Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 16 additions & 14 deletions src/tools/spectra.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,24 @@ class Spectra:
("hour", "minute"): lambda x: x * 60,
}

def __init__(self, filepaths: list[str | Path]):
def __init__(
self,
filepaths: list[str | Path],
rtime_unit: Literal["seconds", "minute", "hour"] = "seconds",
):
"""
Initialize from a list of mzML file paths.

Parameters
----------
filepaths : list[str or Path]
Paths to mzML files to parse.

rtime_unit : {'seconds', 'minute', 'hour'}
Target retention time unit. All parsed spectra are converted to
this unit. Defaults to 'seconds'.
"""
self.rtime_unit: str = "unknown"
self.rtime_unit: str = self._configure_retention_time_unit(rtime_unit)
self.spectra = self._read_mzml_files(filepaths)

def __len__(self) -> int:
Expand Down Expand Up @@ -69,10 +77,7 @@ def _configure_retention_time_unit(self, unit: str) -> str:

def _configure_retention_time(self, rtime: float, unit: str) -> float:
"""
Convert a retention time value to the collection's established unit.

Sets the collection's unit from the first spectrum encountered, then converts
all subsequent values to match.
Convert a retention time value to the collection's target unit.

Parameters
----------
Expand All @@ -84,19 +89,16 @@ def _configure_retention_time(self, rtime: float, unit: str) -> float:
Returns
-------
float
Retention time converted to the collection's established unit.
Retention time converted to ``self.rtime_unit``.
"""
unit = self._configure_retention_time_unit(unit)

# establish the target unit
if self.rtime_unit == "unknown":
self.rtime_unit = unit
return float(rtime)

if self.rtime_unit == unit:
return float(rtime)

return self.CONVERSIONS[(unit, self.rtime_unit)](float(rtime))
converted_value = self.CONVERSIONS[(unit, self.rtime_unit)](float(rtime))
self.rtime_unit = unit
return converted_value

def _read_mzml_files(self, filepaths: list[str | Path]) -> "list[Spectrum]":
"""
Expand Down Expand Up @@ -130,7 +132,7 @@ def _read_mzml_files(self, filepaths: list[str | Path]) -> "list[Spectrum]":
ms_level=spec.ms_level,
rtime=rtime,
scan_index=spec.ID,
file=Path(run.path_or_file),
file=Path(run.path_or_file).name,
mz=spec.mz,
intensity=spec.i,
polarity=polarity,
Expand Down
3 changes: 2 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ def spectra(data_dir):
data_dir / "L01.mzML",
data_dir / "LB01.mzML",
data_dir / "T01A.mzML",
]
],
rtime_unit="minute",
)


Expand Down
59 changes: 33 additions & 26 deletions tests/test_spectra.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def test_spectra_rtime_unit(spectra):
assert all(sp.rtime_unit == spectra.rtime_unit for sp in spectra)

s = Spectra(filepaths=[])
assert s.rtime_unit == "unknown"
assert s.rtime_unit == "seconds"
s._configure_retention_time(1.0, "minute")
assert s.rtime_unit == "minute"
for unit in ("seconds", "minute", "hour"):
Expand All @@ -51,12 +51,12 @@ def test_spectra_rtime_unit(spectra):

def test_spectrum_file_paths(data_dir, spectra):
expected_files = {
data_dir / "Blank1A.mzML",
data_dir / "GAS01.mzML",
data_dir / "GB01.mzML",
data_dir / "L01.mzML",
data_dir / "LB01.mzML",
data_dir / "T01A.mzML",
"Blank1A.mzML",
"GAS01.mzML",
"GB01.mzML",
"L01.mzML",
"LB01.mzML",
"T01A.mzML",
}
assert {sp.file for sp in spectra} == expected_files

Expand Down Expand Up @@ -99,10 +99,12 @@ def test_match_peaks_all_matched():

matches = spec._match_peaks(other, ppm_error=20)

expected = np.array([
[100.0, 0.5, 100.0005, 0.6],
[200.0, 0.8, 200.001, 0.9],
])
expected = np.array(
[
[100.0, 0.5, 100.0005, 0.6],
[200.0, 0.8, 200.001, 0.9],
]
)
assert matches.shape == (2, 4)
assert np.allclose(matches, expected)

Expand All @@ -113,12 +115,14 @@ def test_match_peaks_partial_match():

matches = spec._match_peaks(other, ppm_error=10)

expected = np.array([
[100.0, 0.5, 100.0, 0.6],
[200.0, 0.8, 0.0, 0.0],
[300.0, 0.3, 0.0, 0.0],
[0.0, 0.0, 500.0, 0.7],
])
expected = np.array(
[
[100.0, 0.5, 100.0, 0.6],
[200.0, 0.8, 0.0, 0.0],
[300.0, 0.3, 0.0, 0.0],
[0.0, 0.0, 500.0, 0.7],
]
)
assert matches.shape == (4, 4)
assert np.allclose(matches, expected)

Expand All @@ -129,10 +133,12 @@ def test_match_peaks_no_match():

matches = spec._match_peaks(other, ppm_error=10)

expected = np.array([
[100.0, 0.5, 0.0, 0.0],
[0.0, 0.0, 300.0, 0.8],
])
expected = np.array(
[
[100.0, 0.5, 0.0, 0.0],
[0.0, 0.0, 300.0, 0.8],
]
)
assert matches.shape == (2, 4)
assert np.allclose(matches, expected)

Expand All @@ -144,10 +150,12 @@ def test_match_peaks_closest_chosen():
matches = spec._match_peaks(other, ppm_error=20)

# 100.0005 is closer to 100.0 than 99.999, so it is the peak that gets matched
expected = np.array([
[100.0, 0.5, 100.0005, 0.6],
[0.0, 0.0, 99.999, 0.4],
])
expected = np.array(
[
[100.0, 0.5, 100.0005, 0.6],
[0.0, 0.0, 99.999, 0.4],
]
)
assert matches.shape == (2, 4)
assert np.allclose(matches, expected)

Expand All @@ -163,7 +171,6 @@ def test_match_peaks_abs_tol():
assert np.allclose(with_abs_tol, [[100.0, 0.5, 100.005, 0.6]])



def test_compare_spectra_empty_other():
spec = _make_spectrum(mz=[100.0], intensity=[0.5])
result = spec.compare_spectra(np.empty((0, 2)), ppm_error=10, function=np.dot)
Expand Down
Loading