diff --git a/.github/workflows/prepare_test_data.yaml b/.github/workflows/prepare_test_data.yaml index e694d2de..dfd182e1 100644 --- a/.github/workflows/prepare_test_data.yaml +++ b/.github/workflows/prepare_test_data.yaml @@ -4,10 +4,10 @@ on: schedule: - cron: "0 0 1 * *" # run once a month to prevent artifact expiration workflow_dispatch: -# uncomment and adjust the branch name if you need to add new datasets to the artifact -# push: -# branches: -# - main + # uncomment and adjust the branch name if you need to add new datasets to the artifact + push: + branches: + - fix/macsima-cycle-parsing jobs: prepare-data: @@ -57,6 +57,14 @@ jobs: # Spatial Genomics seqFISH v2 curl -O https://s3.embl.de/spatialdata/raw_data/seqfish-2-test-dataset.zip + # ------- + # MACSima OMAP datasets are licensed as CC BY 4.0 + # OMAP23 for current format + curl https://zenodo.org/api/records/14008816/files-archive -o OMAP23.zip + + # OMAP10 for legacy format + curl https://zenodo.org/api/records/7875938/files-archive -o OMAP10.zip + - name: Unzip files run: | cd ./data diff --git a/src/spatialdata_io/readers/macsima.py b/src/spatialdata_io/readers/macsima.py index 8bbe0532..fce16a9c 100644 --- a/src/spatialdata_io/readers/macsima.py +++ b/src/spatialdata_io/readers/macsima.py @@ -1,5 +1,7 @@ from __future__ import annotations +import os +import re import warnings from collections import defaultdict from copy import deepcopy @@ -13,6 +15,7 @@ import pandas as pd import spatialdata as sd from dask_image.imread import imread +from ome_types import OME, from_tiff from spatialdata import SpatialData from spatialdata._logging import logger @@ -44,6 +47,12 @@ class ChannelMetadata: name: str cycle: int + imagetype: str + well: str + roi: int + fluorophore: str + exposure: float + clone: str | None = None # For example DAPI doesnt have a clone @dataclass @@ -61,39 +70,41 @@ def from_paths( imread_kwargs: Mapping[str, Any], skip_rounds: list[int] | None = None, ) -> MultiChannelImage: - cycles = [] - channels = [] + channel_metadata: list[ChannelMetadata] = [] for p in path_files: - cycle = parse_name_to_cycle(p.stem) - cycles.append(cycle) try: - with warnings.catch_warnings(): - warnings.simplefilter("ignore") - channel_names = parse_channels(p) - if len(channel_names) > 1: - warnings.warn( - f"Found multiple channels in OME-TIFF file {p}. Only the first one will be used.", - UserWarning, - stacklevel=2, - ) - channels.append(channel_names[0]) + metadata = parse_metadata(p) except ValueError as e: warnings.warn( f"Cannot parse OME metadata from {p}. Error: {e}. Skipping this file.", UserWarning, stacklevel=2 ) + continue + + channel_metadata.append( + ChannelMetadata( + name=metadata["name"], + cycle=metadata["cycle"], + imagetype=metadata["imagetype"], + well=metadata["well"], + roi=metadata["roi"], + fluorophore=metadata["fluorophore"], + clone=metadata["clone"], + exposure=metadata["exposure"], + ) + ) - if len(path_files) != len(cycles) or len(path_files) != len(channels): - raise ValueError("Length of path_files, cycles and channels must be the same.") + if len(path_files) != len(channel_metadata): + raise ValueError("Length of path_files and metadata must be the same.") # if any of round_channels is in skip_rounds, remove that round from the list and from path_files if skip_rounds: logger.info(f"Skipping cycles: {skip_rounds}") - path_files, cycles, channels = map( + path_files, channel_metadata = map( list, zip( *[ - (p, c, ch) - for p, c, ch in zip(path_files, cycles, channels, strict=True) - if c not in skip_rounds + (p, ch_meta) + for p, ch_meta in zip(path_files, channel_metadata, strict=True) + if ch_meta.cycle not in skip_rounds ], strict=True, ), @@ -108,7 +119,7 @@ def from_paths( # create MultiChannelImage object with imgs and metadata output = cls( data=imgs, - metadata=[ChannelMetadata(name=ch, cycle=c) for c, ch in zip(cycles, channels, strict=True)], + metadata=channel_metadata, ) return output @@ -150,6 +161,30 @@ def get_cycles(self) -> list[int]: """Get the cycle numbers.""" return [c.cycle for c in self.metadata] + def get_image_types(self) -> list[str | None]: + """Get the staining types (stain or bleach).""" + return [c.imagetype for c in self.metadata] + + def get_wells(self) -> list[str | None]: + """Get the wells.""" + return [c.well for c in self.metadata] + + def get_rois(self) -> list[int | None]: + """Get the ROIs.""" + return [c.roi for c in self.metadata] + + def get_fluorophores(self) -> list[str | None]: + """Get the fluorophores.""" + return [c.fluorophore for c in self.metadata] + + def get_clones(self) -> list[str | None]: + """Get the clones.""" + return [c.clone for c in self.metadata] + + def get_exposures(self) -> list[float | None]: + """Get the exposures.""" + return [c.exposure for c in self.metadata] + def sort_by_channel(self) -> None: """Sort the channels by cycle number.""" self.data = [d for _, d in sorted(zip(self.metadata, self.data, strict=True), key=lambda x: x[0].cycle)] @@ -300,12 +335,249 @@ def macsima( raise NotImplementedError("Parsing raw MACSima data is not yet implemented.") -def parse_name_to_cycle(name: str) -> int: - """Parse the cycle number from the name of the image.""" - cycle = name.split("_")[0] - if "-" in cycle: - cycle = cycle.split("-")[1] - return int(cycle) +def _collect_map_annotation_values(ome: OME) -> dict[str, Any]: + """Collapse structured_annotations from OME into dictionary. + + Collects all key/value pairs from all map_annotations ins structured_annotations into a single flat dictionary. + If a key appears multiple times across annotations, the *first* + occurrence wins and later occurrences are ignored. + """ + merged: dict[str, Any] = {} + + sa = getattr(ome, "structured_annotations", None) + map_annotations = getattr(sa, "map_annotations", []) if sa else [] + + for ma in map_annotations: + raw_value = ma.value + value = raw_value.dict() + + for k, v in value.items(): + if k not in merged: + merged[k] = v + else: + # For instances where we have different values for the same repeated key, raise a warning. + if v != merged[k]: + warnings.warn( + f"Found different value for {k}: {v}. The parser will only use the first found value, which is {merged[k]}!", + UserWarning, + stacklevel=2, + ) + + return merged + + +def _get_software_version(ma_values: dict[str, Any]) -> str: + """Extract the software version string from the flattened map-annotation values. + + Supports both: + - 'Software version' (v0) + - 'SoftwareVersion' (v1) + """ + for key in ("SoftwareVersion", "Software version"): + v = ma_values.get(key) + if isinstance(v, str) and v.strip(): + return v.strip() + + raise ValueError("Could not extract Software Version from OME metadata.") + + +def _get_software_major_version(version: str) -> int: + """Parse the major component of a semantic version string.""" + s = version.strip() + if s.startswith(("v", "V")): + s = s[1:] + parts = s.split(".") + if not parts: + raise ValueError("Could not extract major software version part from version string.") + + major = int(parts[0]) + logger.debug(f"Found major software version {major}") + + return major + + +def _parse_v0_ome_metadata(ome: OME) -> dict[str, Any]: + """Parse Legacy Format of OME Metadata (software version 0.x.x).""" + logger.debug("Parsing OME metadata expecting version 0 format") + + metadata: dict[str, Any] = { + "name": None, + "clone": None, + "fluorophore": None, + "cycle": None, + "imagetype": None, + "well": None, + "roi": None, + "exposure": None, + } + + antigen = None + clone = None + + if ome.screens: + screen0 = ome.screens[0] + reagents = getattr(screen0, "reagents", []) + if reagents: + r0 = reagents[0] + name = getattr(r0, "name", None) + if isinstance(name, str) and name: + if "__" in name: + antigen, clone = name.split("__", 1) + else: + antigen = name + clone = None + + metadata["name"] = antigen + metadata["clone"] = clone + + ma_values = _collect_map_annotation_values(ome) + + if "Fluorochrome" in ma_values: + metadata["fluorophore"] = ma_values["Fluorochrome"] + + if "Exposure time" in ma_values: + exp_time = ma_values["Exposure time"] + try: + metadata["exposure"] = float(exp_time) + except (TypeError, ValueError): + metadata["exposure"] = None + + if "Cycle" in ma_values: + cyc = ma_values["Cycle"] + try: + metadata["cycle"] = int(cyc) + except (TypeError, ValueError): + metadata["cycle"] = None + + if "ROI ID" in ma_values: + roi = ma_values["ROI ID"] + try: + metadata["roi"] = int(roi) + except (TypeError, ValueError): + metadata["roi"] = None + + if "MICS cycle type" in ma_values: + metadata["imagetype"] = ma_values["MICS cycle type"] + + well = None + if ome.plates: + plate0 = ome.plates[0] + wells = getattr(plate0, "wells", []) + if wells: + w0 = wells[0] + ext_id = getattr(w0, "external_identifier", None) + if isinstance(ext_id, str) and ext_id: + well = ext_id + + metadata["well"] = well + + # Add _background suffix to marker name of bleach images, to distinguis them from stain image + if metadata["imagetype"] == "BleachCycle": + metadata["name"] = metadata["name"] + "_background" + + return metadata + + +def _parse_v1_ome_metadata(ome: OME) -> dict[str, Any]: + """Parse v1 format of OME metadata (software version 1.x.x).""" + logger.debug("Parsing OME metadata expecting version 1 format") + + metadata: dict[str, Any] = { + "name": None, + "clone": None, + "fluorophore": None, + "cycle": None, + "imagetype": None, + "well": None, + "roi": None, + "exposure": None, + } + + ma_values = _collect_map_annotation_values(ome) + + if "Clone" in ma_values: + metadata["clone"] = ma_values["Clone"] + + antigen_name = None + if "Biomarker" in ma_values and ma_values["Biomarker"]: + antigen_name = ma_values["Biomarker"] + elif "Dye" in ma_values and ma_values["Dye"]: + antigen_name = ma_values["Dye"] + + metadata["name"] = antigen_name + + if "Fluorochrome" in ma_values and ma_values["Fluorochrome"]: + metadata["fluorophore"] = ma_values["Fluorochrome"] + elif "Dye" in ma_values and ma_values["Dye"]: + metadata["fluorophore"] = ma_values["Dye"] + + if "ExposureTime" in ma_values: + exp_time = ma_values["ExposureTime"] + try: + metadata["exposure"] = float(exp_time) + except (TypeError, ValueError): + metadata["exposure"] = None + + if "Cycle" in ma_values: + cyc = ma_values["Cycle"] + try: + metadata["cycle"] = int(cyc) + except (TypeError, ValueError): + metadata["cycle"] = None + + if "RoiId" in ma_values: + roi = ma_values["RoiId"] + try: + metadata["roi"] = int(roi) + except (TypeError, ValueError): + metadata["roi"] = None + + if "ScanType" in ma_values: + metadata["imagetype"] = ma_values["ScanType"] + + well = None + if ome.plates: + plate0 = ome.plates[0] + wells = getattr(plate0, "wells", []) + if wells: + w0 = wells[0] + ext_id = getattr(w0, "external_identifier", None) + if isinstance(ext_id, str) and ext_id: + well = ext_id + + metadata["well"] = well + + # Add _background suffix to marker name of bleach images, to distinguis them from stain image + if metadata["imagetype"] == "B": + metadata["name"] = metadata["name"] + "_background" + + return metadata + + +def _parse_ome_metadata(ome: OME) -> dict[str, Any]: + """Extract the software version from OME metadata and parse with appropriate parser.""" + ma_values = _collect_map_annotation_values(ome) + version_str = _get_software_version(ma_values) + major = _get_software_major_version(version_str) + + if major == 0: + return _parse_v0_ome_metadata(ome) + elif major == 1: + return _parse_v1_ome_metadata(ome) + else: + raise ValueError("Unknown software version, cannot determine parser") + + +def parse_metadata(path: Path) -> dict[str, Any]: + """Parse metadata for a file. + + All metadata is extracted from the OME metadata. + """ + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + ome = from_tiff(path) + + return _parse_ome_metadata(ome) def parse_processed_folder( @@ -396,7 +668,9 @@ def create_sdata( [i for i in range(len(mci.metadata)) if i not in nuclei_idx_without_first_and_last], ) - pixels_to_microns = parse_physical_size(path_files[0]) + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + pixels_to_microns = parse_physical_size(path_files[0]) image_element = create_image_element( mci, @@ -440,12 +714,30 @@ def create_sdata( def create_table(mci: MultiChannelImage) -> ad.AnnData: cycles = mci.get_cycles() names = mci.get_channel_names() + imagetypes = mci.get_image_types() + wells = mci.get_wells() + rois = mci.get_rois() + fluorophores = mci.get_fluorophores() + clones = mci.get_clones() + exposures = mci.get_exposures() + df = pd.DataFrame( { "name": names, "cycle": cycles, + "imagetype": imagetypes, + "well": wells, + "ROI": rois, + "fluorophore": fluorophores, + "clone": clones, + "exposure": exposures, } ) + + # Replace missing data. This happens mostly in the clone column. + df = df.replace({None: pd.NA, "": pd.NA}) + df.index = df.index.astype(str) + table = ad.AnnData(var=df) table.var_names = names return sd.models.TableModel.parse(table) diff --git a/tests/test_macsima.py b/tests/test_macsima.py index 63d6ca24..d05710b5 100644 --- a/tests/test_macsima.py +++ b/tests/test_macsima.py @@ -7,6 +7,15 @@ import dask.array as da import pytest from click.testing import CliRunner +from ome_types import OME +from ome_types.model import ( + MapAnnotation, + Plate, + Reagent, + Screen, + StructuredAnnotations, + Well, +) from spatialdata import read_zarr from spatialdata.models import get_channel_names @@ -14,27 +23,55 @@ from spatialdata_io.readers.macsima import ( ChannelMetadata, MultiChannelImage, + _collect_map_annotation_values, + _get_software_major_version, + _get_software_version, + _parse_ome_metadata, + _parse_v0_ome_metadata, + _parse_v1_ome_metadata, macsima, - parse_name_to_cycle, ) from tests._utils import skip_if_below_python_version RNG = da.random.default_rng(seed=0) -if not (Path("./data/Lung_adc_demo").exists() or Path("./data/MACSimaData_HCA").exists()): +if not (Path("./data/OMAP10").exists() or Path("./data/OMAP23").exists()): pytest.skip( - "Requires the Lung_adc_demo or MACSimaData_HCA datasets, please check " - "https://github.com/giovp/spatialdata-sandbox/macsima/Readme.md for instructions on how to get the data.", + "Requires the OMAP10 or OMAP23 datasets. " + "The OMAP10 dataset can be downloaded from https://zenodo.org/records/7875938 " + "The OMAP23 dataset can be downloaded from https://zenodo.org/records/14008816", allow_module_level=True, ) +# Helper to create ChannelMetadata with some defaults +def make_ChannelMetadata( + name: str, + cycle: int, + fluorophore: str | None = None, + exposure: float | None = None, + imagetype: str | None = None, + well: str | None = None, + roi: int | None = None, +) -> ChannelMetadata: + """Helper to construct ChannelMetadata with required defaults.""" + return ChannelMetadata( + name=name, + cycle=cycle, + fluorophore=fluorophore or "", + exposure=exposure if exposure is not None else 0.0, + imagetype=imagetype or "StainCycle", + well=well or "A01", + roi=roi if roi is not None else 0, + ) + + @skip_if_below_python_version() @pytest.mark.parametrize( "dataset,expected", [ - ("Lung_adc_demo", {"y": (0, 15460), "x": (0, 13864)}), - ("MACSimaData_HCA/HumanLiverH35", {"y": (0, 1154), "x": (0, 1396)}), + ("OMAP10", {"y": (0, 7797), "x": (0, 9407)}), + ("OMAP23", {"y": (0, 7703), "x": (0, 9329)}), ], ) def test_image_size(dataset: str, expected: dict[str, Any]) -> None: @@ -42,7 +79,7 @@ def test_image_size(dataset: str, expected: dict[str, Any]) -> None: f = Path("./data") / dataset assert f.is_dir() - sdata = macsima(f) + sdata = macsima(f, transformations=False) # Do not transform to make it easier to compare against pixel dimensions el = sdata[list(sdata.images.keys())[0]] cs = sdata.coordinate_systems[0] @@ -54,7 +91,7 @@ def test_image_size(dataset: str, expected: dict[str, Any]) -> None: @skip_if_below_python_version() @pytest.mark.parametrize( "dataset,expected", - [("Lung_adc_demo", 116), ("MACSimaData_HCA/HumanLiverH35", 102)], + [("OMAP10", 31), ("OMAP23", 29)], ) def test_total_channels(dataset: str, expected: int) -> None: f = Path("./data") / dataset @@ -67,12 +104,12 @@ def test_total_channels(dataset: str, expected: int) -> None: assert channels == expected -@skip_if_below_python_version() +# @skip_if_below_python_version() @pytest.mark.parametrize( "dataset,expected", [ - ("Lung_adc_demo", ["R0 DAPI", "R1 CD68", "R1 CD163"]), - ("MACSimaData_HCA/HumanLiverH35", ["R0 DAPI", "R1 PE", "R1 DAPI"]), + ("OMAP10", ["R1 DAPI", "R1 CD11c", "R1 Actin"]), + ("OMAP23", ["R1 CD3", "R1 DAPI", "R1 CD274"]), ], ) def test_channel_names(dataset: str, expected: list[str]) -> None: @@ -90,8 +127,8 @@ def test_channel_names(dataset: str, expected: list[str]) -> None: @pytest.mark.parametrize( "dataset,expected", [ - ("Lung_adc_demo", 68), - ("MACSimaData_HCA/HumanLiverH35", 51), + ("OMAP10", 22), + ("OMAP23", 15), ], ) def test_total_rounds(dataset: str, expected: list[int]) -> None: @@ -107,11 +144,11 @@ def test_total_rounds(dataset: str, expected: list[int]) -> None: @pytest.mark.parametrize( "dataset,skip_rounds,expected", [ - ("Lung_adc_demo", list(range(2, 68)), ["DAPI (1)", "CD68", "CD163", "DAPI (2)", "Control"]), + ("OMAP10", list(range(2, 23)), ["DAPI", "CD11c", "Actin", "CD15"]), ( - "MACSimaData_HCA/HumanLiverH35", - list(range(2, 51)), - ["DAPI (1)", "PE", "CD14", "Vimentin", "DAPI (2)", "WT1"], + "OMAP23", + list(range(3, 16)), + ["CD3", "DAPI", "CD274", "CD279"], ), ], ) @@ -130,8 +167,8 @@ def test_skip_rounds(dataset: str, skip_rounds: list[int], expected: list[str]) @pytest.mark.parametrize( "dataset,expected", [ - ("Lung_adc_demo", [0, 1, 1]), - ("MACSimaData_HCA/HumanLiverH35", [0, 1, 1]), + ("OMAP10", [1, 1, 1]), + ("OMAP23", [1, 1, 1]), ], ) def test_cycle_metadata(dataset: str, expected: list[str]) -> None: @@ -140,7 +177,7 @@ def test_cycle_metadata(dataset: str, expected: list[str]) -> None: sdata = macsima(f, c_subset=3) table = sdata[list(sdata.tables.keys())[0]] - # get the channel names + # get the channel cycles cycles = table.var["cycle"] assert list(cycles) == expected @@ -150,25 +187,15 @@ def test_parsing_style() -> None: macsima(Path(), parsing_style="not_a_parsing_style") -@pytest.mark.parametrize( - "name,expected", - [ - ("C-002_S-000_S_FITC_R-01_W-C-1_ROI-01_A-CD147_C-REA282.tif", 2), - ("001_S_R-01_W-B-1_ROI-01_A-CD14REA599ROI1_C-REA599.ome.tif", 1), - ], -) -def test_parsing_of_name_to_cycle(name: str, expected: int) -> None: - result = parse_name_to_cycle(name) - assert result == expected - - def test_mci_sort_by_channel() -> None: sizes = [100, 200, 300] c_names = ["test11", "test3", "test2"] cycles = [2, 0, 1] mci = MultiChannelImage( data=[RNG.random((size, size), chunks=(10, 10)) for size in sizes], - metadata=[ChannelMetadata(name=c_name, cycle=cycle) for c_name, cycle in zip(c_names, cycles, strict=False)], + metadata=[ + make_ChannelMetadata(name=c_name, cycle=cycle) for c_name, cycle in zip(c_names, cycles, strict=False) + ], ) assert mci.get_channel_names() == c_names assert [x.shape[0] for x in mci.data] == sizes @@ -182,7 +209,7 @@ def test_mci_array_reference() -> None: arr2 = RNG.random((200, 200), chunks=(10, 10)) mci = MultiChannelImage( data=[arr1, arr2], - metadata=[ChannelMetadata(name="test1", cycle=0), ChannelMetadata(name="test2", cycle=1)], + metadata=[make_ChannelMetadata(name="test1", cycle=0), make_ChannelMetadata(name="test2", cycle=1)], ) orig_arr1 = arr1.copy() @@ -206,8 +233,8 @@ def test_mci_array_reference() -> None: @skip_if_below_python_version() -@pytest.mark.parametrize("dataset", ["Lung_adc_demo", "MACSimaData_HCA/HumanLiverH35"]) -def test_cli_macimsa(runner: CliRunner, dataset: str) -> None: +@pytest.mark.parametrize("dataset", ["OMAP10", "OMAP23"]) +def test_cli_macsima(runner: CliRunner, dataset: str) -> None: f = Path("./data") / dataset assert f.is_dir() with TemporaryDirectory() as tmpdir: @@ -229,3 +256,279 @@ def test_cli_macimsa(runner: CliRunner, dataset: str) -> None: ) assert result.exit_code == 0, result.output _ = read_zarr(output_zarr) + + +def test_collect_map_annotation_values_with_no_duplicate_keys() -> None: + ome = OME( + structured_annotations=StructuredAnnotations( + map_annotations=[ + MapAnnotation(value={"a": "1", "b": "2"}), + MapAnnotation(value={"c": "3"}), + ] + ) + ) + + result = _collect_map_annotation_values(ome) + + assert result == {"a": "1", "b": "2", "c": "3"} + + +def test_collect_map_annotations_values_with_duplicate_keys_identical_values() -> None: + ome = OME( + structured_annotations=StructuredAnnotations( + map_annotations=[ + MapAnnotation(value={"a": "1", "b": "2"}), + MapAnnotation(value={"b": "2", "c": "3"}), + ] + ) + ) + + result = _collect_map_annotation_values(ome) + # Key should only be returned once + assert result == {"a": "1", "b": "2", "c": "3"} + + +def test_collect_map_annotations_values_with_duplicate_keys_different_values() -> None: + ome = OME( + structured_annotations=StructuredAnnotations( + map_annotations=[ + MapAnnotation(value={"a": "1", "b": "2"}), + MapAnnotation(value={"b": "99", "c": "3"}), + ] + ) + ) + import re + + # The parser should throw a warning when duplicate keys with different values are found + with pytest.warns( + UserWarning, + match=re.escape("Found different value for b: 99. The parser will only use the first found value, which is 2!"), + ): + result = _collect_map_annotation_values(ome) + + # The parser should return only the first found value. + assert result == {"a": "1", "b": "2", "c": "3"} + + +def test_collect_map_annotation_values_handles_missing_sa_and_empty_list() -> None: + # No structured_annotations at all + ome1 = OME() + assert _collect_map_annotation_values(ome1) == {} + + # structured_annotations present but with empty map_annotation list + ome2 = OME(structured_annotations=StructuredAnnotations(map_annotations=[])) + assert _collect_map_annotation_values(ome2) == {} + + +@pytest.mark.parametrize( + "ma_values, expected", + [ + ({"SoftwareVersion": " 1.2.3 "}, "1.2.3"), + ({"Software version": " v0.9.0"}, "v0.9.0"), + ], +) +def test_get_software_version_success(ma_values: dict[str, str], expected: str) -> None: + assert _get_software_version(ma_values) == expected + + +@pytest.mark.parametrize( + "ma_values", + [ + ({}), + ({"SoftwareVersion": ""}), + ({"SoftwareVersion": " "}), + ({"Software version": ""}), + ({"Software version": None}), + ], +) +def test_get_software_version_failure(ma_values: dict[str, str | None]) -> None: + with pytest.raises(ValueError, match="Could not extract Software Version"): + _get_software_version(ma_values) + + +@pytest.mark.parametrize( + "version, expected", + [ + ("1.2.3", 1), + (" 2.0.0 ", 2), + ("v3.4.5", 3), + ("V4.0.1", 4), + ("10", 10), + ], +) +def test_get_software_major_version_success(version: str, expected: int) -> None: + assert _get_software_major_version(version) == expected + + +def test_get_software_major_version_empty_parts_raises() -> None: + with pytest.raises(ValueError): + _get_software_major_version("") + + +def test_parse_v0_ome_metadata_basic_extraction_and_conversions() -> None: + ome = OME( + structured_annotations=StructuredAnnotations( + map_annotations=[ + MapAnnotation( + value={ + "Fluorochrome": "AF488", + "Exposure time": "123.4", + "Cycle": "5", + "ROI ID": "7", + "MICS cycle type": "StainCycle", + } + ) + ] + ), + screens=[Screen(reagents=[Reagent(name="CD3__OKT3")])], + plates=[Plate(wells=[Well(column=0, row=0, external_identifier="A01")])], + ) + + md = _parse_v0_ome_metadata(ome) + + assert md["name"] == "CD3" + assert md["clone"] == "OKT3" + assert md["fluorophore"] == "AF488" + assert md["exposure"] == pytest.approx(123.4) + assert md["cycle"] == 5 + assert md["roi"] == 7 + assert md["imagetype"] == "StainCycle" + assert md["well"] == "A01" + + +def test_parse_v0_ome_metadata_handles_missing_or_invalid_numeric_fields() -> None: + ome = OME( + structured_annotations=StructuredAnnotations( + map_annotations=[ + MapAnnotation( + value={ + "Exposure time": "not-a-number", + "Cycle": "NaN", + "ROI ID": "x", + } + ) + ] + ), + screens=[Screen(reagents=[Reagent(name="MarkerOnly")])], + plates=[Plate(wells=[Well(column=0, row=0, external_identifier=None)])], + ) + + md = _parse_v0_ome_metadata(ome) + + # name from reagent without "__" + assert md["name"] == "MarkerOnly" + assert md["clone"] is None + assert md["exposure"] is None + assert md["cycle"] is None + assert md["roi"] is None + # well remains None + assert md["well"] is None + + +def test_parse_v0_ome_metadata_bleach_cycle_appends_background() -> None: + ome = OME( + structured_annotations=StructuredAnnotations( + map_annotations=[ + MapAnnotation( + value={ + "MICS cycle type": "BleachCycle", + } + ) + ] + ), + screens=[Screen(reagents=[Reagent(name="CD4__RPA-T4")])], + ) + + md = _parse_v0_ome_metadata(ome) + + assert md["imagetype"] == "BleachCycle" + assert md["name"] == "CD4_background" + + +def test_parse_v1_ome_metadata_primary_keys_biomarker_and_clone() -> None: + ome = OME( + structured_annotations=StructuredAnnotations( + map_annotations=[ + MapAnnotation( + value={ + "Clone": "OKT3", + "Biomarker": "CD3", + "Fluorochrome": "AF488", + "ExposureTime": "45.6", + "Cycle": "3", + "RoiId": "10", + "ScanType": "S", + } + ) + ] + ), + plates=[Plate(wells=[Well(column=0, row=0, external_identifier="B02")])], + ) + + md = _parse_v1_ome_metadata(ome) + + assert md["name"] == "CD3" + assert md["clone"] == "OKT3" + assert md["fluorophore"] == "AF488" + assert md["exposure"] == pytest.approx(45.6) + assert md["cycle"] == 3 + assert md["roi"] == 10 + assert md["imagetype"] == "S" + assert md["well"] == "B02" + + +def test_parse_v1_ome_metadata_invalid_numerics_become_none() -> None: + ome = OME( + structured_annotations=StructuredAnnotations( + map_annotations=[ + MapAnnotation( + value={ + "ExposureTime": "x", + "Cycle": "NaN", + "RoiId": "ABC", + } + ) + ] + ), + ) + + md = _parse_v1_ome_metadata(ome) + + assert md["exposure"] is None + assert md["cycle"] is None + assert md["roi"] is None + + +def make_ome_with_version(version_value: str, extra_ma: dict[str, Any] | None = None) -> OME: + base = {"SoftwareVersion": version_value} + if extra_ma: + base.update(extra_ma) + return OME(structured_annotations=StructuredAnnotations(map_annotations=[MapAnnotation(value=base)])) + + +def test_parse_ome_metadata_dispatches_to_v0() -> None: + ome = make_ome_with_version("0.9.0") + # enrich some so v0 parser has something to see + ome.screens = [Screen(reagents=[Reagent(name="Marker0")])] + + md = _parse_ome_metadata(ome) + + # Assert that the v0 parser was used by checking a field + # that can only come from v0 parsing logic (e.g. name from reagent) + assert "name" in md + assert md["name"] == "Marker0" + + +def test_parse_ome_metadata_dispatches_to_v1() -> None: + ome = make_ome_with_version("1.0.0", extra_ma={"Biomarker": "CD3"}) + + md = _parse_ome_metadata(ome) + + assert md["name"] == "CD3" + + +def test_parse_ome_metadata_unknown_major_raises() -> None: + ome = make_ome_with_version("2.0.0") + + with pytest.raises(ValueError, match="Unknown software version"): + _parse_ome_metadata(ome)