Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/tools/peak.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,6 @@ def _process_peaks(self, filepath: str | Path) -> "dict[int | str, Peak]":
)
return peak_info


@dataclass(frozen=True)
class Peak:
"""A single chromatographic peak with optional compound annotation."""
Expand Down
65 changes: 65 additions & 0 deletions src/tools/spectra.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import numpy as np
import numpy.typing as npt
import pymzml
from tools.utils import get_ppm_range


class Spectra:
Expand Down Expand Up @@ -152,3 +153,67 @@ class Spectrum:
intensity: npt.NDArray[np.float64]
polarity: Literal[0, 1, -1]
rtime_unit: str

def _match_peaks(
self, other_spectrum: npt.NDArray[np.float64], ppm_error: float, abs_tol: float = 0
) -> npt.NDArray[np.float64]:
"""
Match peaks from this spectrum against ``exp`` within a ppm tolerance.

Each unmatched peak from ``self`` contributes a row with zero exp values;
each unmatched peak from ``exp`` contributes a row with zero self values.

Returns an (n, 4) array with columns [self_mz, self_int, exp_mz, exp_int].
"""
spec1 = np.column_stack([self.mz, self.intensity])
spec1 = spec1[np.argsort(spec1[:, 0])]
spec2 = other_spectrum[np.argsort(other_spectrum[:, 0])]
matches = []

for i, spec in enumerate(spec1):
lower_bound, upper_bound = get_ppm_range(spec[0], ppm_error, abs_tol)
mask = (spec2[:, 0] >= lower_bound) & (spec2[:, 0] <= upper_bound)
if sum(mask) > 0:
matches.append(
np.hstack([spec, spec2[mask][np.argsort(np.abs(spec2[mask, 0] - spec[0]))[0]]])
)
else:
matches.append(np.r_[spec, [0, 0]])

matches = np.array(matches)
for spec in other_spectrum:
if spec[0] not in matches[:, 2]:
matches = np.vstack((matches, np.r_[[0, 0], spec]))

return matches

def compare_spectra(
self,
other_spectrum: npt.NDArray[np.float64],
ppm_error: float,
function: Callable[[npt.NDArray[np.float64], npt.NDArray[np.float64]], float],
) -> float:
"""
Score this spectrum against ``other_spectrum``.

Parameters
----------
other_spectrum:
Array of shape ``(n, 2)`` with columns ``[mz, intensity]``.

ppm_error:
Mass tolerance in parts-per-million for peak matching.

function:
Scoring function applied to ``(self_intensities, other_intensities)``
extracted from matched peak rows.

Returns
-------
float
Score returned by ``function``, or 0 if ``other_spectrum`` is empty.
"""
if other_spectrum.size == 0:
return 0.0
matches = self._match_peaks(other_spectrum, ppm_error)
return float(function(matches[:, 1], matches[:, 3]))
9 changes: 9 additions & 0 deletions src/tools/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,3 +401,12 @@ def get_formula(element_count: dict[str, int], charge: int) -> str:
formula += sign + magnitude

return formula


def get_ppm_range(mz, ppm_error=0, abs_tol=0):
lower_bound = mz - ppm_error / 1e6 * mz
upper_bound = mz + ppm_error / 1e6 * mz
if abs_tol != 0:
lower_bound += -abs_tol
upper_bound += abs_tol
return lower_bound, upper_bound
114 changes: 114 additions & 0 deletions tests/test_spectra.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,26 @@
from pathlib import Path

import numpy as np
import pytest

from tools import Spectra
from tools.spectra import Spectrum


def _make_spectrum(mz, intensity):
return Spectrum(
spectrum_index=0,
ms_level=2,
rtime=1.0,
scan_index=1,
file=Path("test.mzML"),
mz=np.array(mz, dtype=np.float64),
intensity=np.array(intensity, dtype=np.float64),
polarity=1,
rtime_unit="minute",
)


def test_spectra_len(data_dir, spectra):
assert len(spectra) == 600
s = Spectra(filepaths=[data_dir / "Blank1A.mzML"])
Expand Down Expand Up @@ -70,3 +87,100 @@ def test_configure_retention_time_minute_to_hour():
s._configure_retention_time(0.0, "hour")
result = s._configure_retention_time(60.0, "minute")
assert result == pytest.approx(1.0)


def test_match_peaks_all_matched():
"""
All self peaks have a counterpart in other_spectrum within the ppm window.
No unmatched rows should appear on either side.
"""
spec = _make_spectrum(mz=[100.0, 200.0], intensity=[0.5, 0.8])
other = np.array([[100.0005, 0.6], [200.001, 0.9]]) # both within 20 ppm

matches = spec._match_peaks(other, ppm_error=20)

expected = np.array([
[100.0, 0.5, 100.0005, 0.6],
[200.0, 0.8, 200.001, 0.9],
])
assert matches.shape == (2, 4)
assert np.allclose(matches, expected)


def test_match_peaks_partial_match():
spec = _make_spectrum(mz=[100.0, 200.0, 300.0], intensity=[0.5, 0.8, 0.3])
other = np.array([[100.0, 0.6], [500.0, 0.7]])

matches = spec._match_peaks(other, ppm_error=10)

expected = np.array([
[100.0, 0.5, 100.0, 0.6],
[200.0, 0.8, 0.0, 0.0],
[300.0, 0.3, 0.0, 0.0],
[0.0, 0.0, 500.0, 0.7],
])
assert matches.shape == (4, 4)
assert np.allclose(matches, expected)


def test_match_peaks_no_match():
spec = _make_spectrum(mz=[100.0], intensity=[0.5])
other = np.array([[300.0, 0.8]])

matches = spec._match_peaks(other, ppm_error=10)

expected = np.array([
[100.0, 0.5, 0.0, 0.0],
[0.0, 0.0, 300.0, 0.8],
])
assert matches.shape == (2, 4)
assert np.allclose(matches, expected)


def test_match_peaks_closest_chosen():
spec = _make_spectrum(mz=[100.0], intensity=[0.5])
other = np.array([[99.999, 0.4], [100.0005, 0.6]])

matches = spec._match_peaks(other, ppm_error=20)

# 99.999 is closer to 100.0 than 100.0005
expected = np.array([
[100.0, 0.5, 100.0005, 0.6],
[0.0, 0.0, 99.999, 0.4],
])
assert matches.shape == (2, 4)
assert np.allclose(matches, expected)


def test_match_peaks_abs_tol():
spec = _make_spectrum(mz=[100.0], intensity=[0.5])
other = np.array([[100.005, 0.6]])

without_abs_tol = spec._match_peaks(other, ppm_error=10, abs_tol=0)
with_abs_tol = spec._match_peaks(other, ppm_error=10, abs_tol=0.005)

assert np.allclose(without_abs_tol, [[100.0, 0.5, 0.0, 0.0], [0.0, 0.0, 100.005, 0.6]])
assert np.allclose(with_abs_tol, [[100.0, 0.5, 100.005, 0.6]])



def test_compare_spectra_empty_other():
spec = _make_spectrum(mz=[100.0], intensity=[0.5])
result = spec.compare_spectra(np.empty((0, 2)), ppm_error=10, function=np.dot)
assert result == 0.0


def test_compare_spectra_matched_peaks():
spec = _make_spectrum(mz=[100.0, 200.0], intensity=[0.5, 0.8])
other = np.array([[100.0, 0.6], [200.0, 0.9]])

result = spec.compare_spectra(other, ppm_error=10, function=np.dot)
assert result == pytest.approx(1.02)


def test_compare_spectra_no_match():
spec = _make_spectrum(mz=[100.0], intensity=[0.5])
other = np.array([[300.0, 0.8]])

result = spec.compare_spectra(other, ppm_error=10, function=np.dot)
assert result == pytest.approx(0.0)
Loading