diff --git a/alembic/versions/e71807682f57_add_sample_point_fields_to_minor_trace.py b/alembic/versions/e71807682f57_add_sample_point_fields_to_minor_trace.py new file mode 100644 index 000000000..c8cb463dc --- /dev/null +++ b/alembic/versions/e71807682f57_add_sample_point_fields_to_minor_trace.py @@ -0,0 +1,53 @@ +"""add sample point fields to minor trace + +Revision ID: e71807682f57 +Revises: h1b2c3d4e5f6 +Create Date: 2026-02-10 20:07:25.586385 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "e71807682f57" +down_revision: Union[str, Sequence[str], None] = "h1b2c3d4e5f6" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Upgrade schema.""" + # Step 1: add the column as nullable with a temporary default so existing rows get a value. + op.add_column( + "NMA_MinorTraceChemistry", + sa.Column( + "nma_SamplePointID", + sa.String(length=10), + nullable=True, + server_default=sa.text("''"), + ), + ) + + # Step 2: enforce NOT NULL now that all existing rows have a non-NULL value. + op.alter_column( + "NMA_MinorTraceChemistry", + "nma_SamplePointID", + existing_type=sa.String(length=10), + nullable=False, + ) + + # Step 3: drop the temporary default so future inserts must supply a value explicitly. + op.alter_column( + "NMA_MinorTraceChemistry", + "nma_SamplePointID", + existing_type=sa.String(length=10), + server_default=None, + ) + + +def downgrade() -> None: + """Downgrade schema.""" + op.drop_column("NMA_MinorTraceChemistry", "nma_SamplePointID") diff --git a/core/initializers.py b/core/initializers.py index 4ffbfb744..c3fe058fc 100644 --- a/core/initializers.py +++ b/core/initializers.py @@ -168,6 +168,7 @@ def init_lexicon(path: str = None) -> None: ) association_rows = [] + seen_links = set() for term_dict in terms: term_id = existing_terms.get(term_dict["term"]) if term_id is None: @@ -177,8 +178,9 @@ def init_lexicon(path: str = None) -> None: if category_id is None: continue key = (term_id, category_id) - if key in existing_links: + if key in existing_links or key in seen_links: continue + seen_links.add(key) association_rows.append( {"term_id": term_id, "category_id": category_id} ) diff --git a/db/nma_legacy.py b/db/nma_legacy.py index 557c415ad..f07942b15 100644 --- a/db/nma_legacy.py +++ b/db/nma_legacy.py @@ -774,6 +774,7 @@ class NMA_MinorTraceChemistry(Base): - nma_global_id: Original UUID PK, now UNIQUE for audit - chemistry_sample_info_id: Integer FK to NMA_Chemistry_SampleInfo.id - nma_chemistry_sample_info_uuid: Legacy UUID FK for audit + - nma_sample_point_id: Legacy SamplePointID string - nma_wclab_id: Legacy WCLab_ID string (audit) """ @@ -807,6 +808,9 @@ class NMA_MinorTraceChemistry(Base): ) # Additional columns + nma_sample_point_id: Mapped[str] = mapped_column( + "nma_SamplePointID", String(10), nullable=False + ) analyte: Mapped[Optional[str]] = mapped_column("analyte", String(50)) symbol: Mapped[Optional[str]] = mapped_column("symbol", String(10)) sample_value: Mapped[Optional[float]] = mapped_column("sample_value", Float) diff --git a/tests/integration/test_admin_minor_trace_chemistry.py b/tests/integration/test_admin_minor_trace_chemistry.py index fcdcd539a..f5cf0d0fa 100644 --- a/tests/integration/test_admin_minor_trace_chemistry.py +++ b/tests/integration/test_admin_minor_trace_chemistry.py @@ -104,6 +104,7 @@ def minor_trace_chemistry_record(): chemistry = NMA_MinorTraceChemistry( nma_global_id=uuid.uuid4(), chemistry_sample_info_id=sample_info.id, # Integer FK + nma_sample_point_id=sample_info.nma_sample_point_id, analyte="Arsenic", symbol="As", sample_value=0.005, diff --git a/tests/test_minor_trace_chemistry_transfer.py b/tests/test_minor_trace_chemistry_transfer.py index 2d38e1a19..58ecc01ec 100644 --- a/tests/test_minor_trace_chemistry_transfer.py +++ b/tests/test_minor_trace_chemistry_transfer.py @@ -36,3 +36,40 @@ def test_row_to_dict_includes_wclab_id(): row_dict = transfer._row_to_dict(row) assert row_dict["nma_WCLab_ID"] == "LAB-123" + assert row_dict["nma_sample_point_id"] == "POINT-1" + + +def test_row_to_dict_missing_sample_point_id_returns_none_and_captures_error(): + # Bypass __init__ so we can stub the cache without hitting the DB. + transfer = MinorTraceChemistryTransferer.__new__(MinorTraceChemistryTransferer) + sample_pt_id = uuid.uuid4() + transfer._sample_info_cache = {sample_pt_id: 1} + transfer.flags = {} + transfer.errors = [] + + row = pd.Series( + { + "SamplePtID": str(sample_pt_id), + "GlobalID": str(uuid.uuid4()), + # SamplePointID intentionally missing + "Analyte": "Ca", + "SampleValue": 10.5, + "Units": "mg/L", + "Symbol": None, + "AnalysisMethod": "ICP", + "AnalysisDate": "2024-01-01 00:00:00.000", + "Notes": "note", + "AnalysesAgency": "Lab", + "Uncertainty": 0.1, + "Volume": "2", + "VolumeUnit": "L", + "WCLab_ID": "LAB-123", + } + ) + + row_dict = transfer._row_to_dict(row) + assert row_dict is None + assert len(transfer.errors) == 1 + error = transfer.errors[0] + assert error["field"] == "SamplePointID" + assert "Missing SamplePointID" in error["error"] diff --git a/tests/test_nma_chemistry_lineage.py b/tests/test_nma_chemistry_lineage.py index 4ad4a8ea7..f0853958d 100644 --- a/tests/test_nma_chemistry_lineage.py +++ b/tests/test_nma_chemistry_lineage.py @@ -134,6 +134,7 @@ def test_nma_minor_trace_chemistry_columns(): "id", # Integer PK "nma_global_id", # Legacy UUID "chemistry_sample_info_id", # Integer FK + "nma_sample_point_id", # Legacy sample point id # from legacy "analyte", "sample_value", @@ -173,6 +174,7 @@ def test_nma_minor_trace_chemistry_save_all_columns(shared_thing): mtc = NMA_MinorTraceChemistry( nma_global_id=_next_global_id(), chemistry_sample_info=sample_info, + nma_sample_point_id=sample_info.nma_sample_point_id, analyte="As", sample_value=0.015, units="mg/L", @@ -193,6 +195,7 @@ def test_nma_minor_trace_chemistry_save_all_columns(shared_thing): assert mtc.id is not None # Integer PK assert mtc.nma_global_id is not None # Legacy UUID assert mtc.chemistry_sample_info_id == sample_info.id # Integer FK + assert mtc.nma_sample_point_id == sample_info.nma_sample_point_id assert mtc.analyte == "As" assert mtc.sample_value == 0.015 assert mtc.units == "mg/L" @@ -398,6 +401,7 @@ def test_assign_sample_info_to_mtc(shared_thing): mtc = NMA_MinorTraceChemistry( nma_global_id=_next_global_id(), chemistry_sample_info=sample_info, # OO: assign object + nma_sample_point_id=sample_info.nma_sample_point_id, analyte="Pb", ) session.add(mtc) @@ -430,6 +434,7 @@ def test_append_mtc_to_sample_info(shared_thing): mtc = NMA_MinorTraceChemistry( nma_global_id=_next_global_id(), + nma_sample_point_id=sample_info.nma_sample_point_id, analyte="Fe", ) sample_info.minor_trace_chemistries.append(mtc) @@ -451,6 +456,7 @@ def test_mtc_requires_chemistry_sample_info(): with session_ctx() as session: mtc = NMA_MinorTraceChemistry( nma_global_id=_next_global_id(), + nma_sample_point_id=_next_sample_point_id(), analyte="Cu", # No chemistry_sample_info_id - should fail ) @@ -484,6 +490,7 @@ def test_full_lineage_navigation(shared_thing): mtc = NMA_MinorTraceChemistry( nma_global_id=_next_global_id(), chemistry_sample_info=sample_info, + nma_sample_point_id=sample_info.nma_sample_point_id, analyte="Zn", ) session.add(mtc) @@ -521,6 +528,7 @@ def test_reverse_lineage_navigation(shared_thing): mtc = NMA_MinorTraceChemistry( nma_global_id=_next_global_id(), chemistry_sample_info=sample_info, + nma_sample_point_id=sample_info.nma_sample_point_id, analyte="Mn", ) session.add(mtc) @@ -557,6 +565,7 @@ def test_cascade_delete_sample_info_deletes_mtc(shared_thing): mtc = NMA_MinorTraceChemistry( nma_global_id=_next_global_id(), chemistry_sample_info=sample_info, + nma_sample_point_id=sample_info.nma_sample_point_id, analyte="Cd", ) session.add(mtc) @@ -678,11 +687,13 @@ def test_multiple_mtc_per_sample_info(shared_thing): mtc1 = NMA_MinorTraceChemistry( nma_global_id=_next_global_id(), chemistry_sample_info=sample_info, + nma_sample_point_id=sample_info.nma_sample_point_id, analyte="As", ) mtc2 = NMA_MinorTraceChemistry( nma_global_id=_next_global_id(), chemistry_sample_info=sample_info, + nma_sample_point_id=sample_info.nma_sample_point_id, analyte="Pb", ) session.add_all([mtc1, mtc2]) diff --git a/transfers/field_parameters_transfer.py b/transfers/field_parameters_transfer.py index 3a894222e..adc8f23f4 100644 --- a/transfers/field_parameters_transfer.py +++ b/transfers/field_parameters_transfer.py @@ -57,10 +57,7 @@ def _transfer_hook(self, session: Session) -> None: Uses ON CONFLICT DO UPDATE on nma_GlobalID (legacy UUID PK, now UNIQUE). """ - limit = self.flags.get("LIMIT", 0) df = self.cleaned_df - if limit > 0: - df = df.head(limit) row_dicts = [] for row in df.itertuples(): diff --git a/transfers/minor_trace_chemistry_transfer.py b/transfers/minor_trace_chemistry_transfer.py index ed1d16da7..230767929 100644 --- a/transfers/minor_trace_chemistry_transfer.py +++ b/transfers/minor_trace_chemistry_transfer.py @@ -25,6 +25,7 @@ - nma_global_id: Legacy UUID PK (GlobalID), UNIQUE for audit - chemistry_sample_info_id: Integer FK to NMA_Chemistry_SampleInfo.id - nma_chemistry_sample_info_uuid: Legacy UUID FK for audit +- nma_sample_point_id: Legacy SamplePointID string """ from __future__ import annotations @@ -115,10 +116,7 @@ def _transfer_hook(self, session: Session) -> None: Uses ON CONFLICT DO UPDATE on nma_GlobalID (the legacy UUID PK, now UNIQUE). """ - limit = self.flags.get("LIMIT", 0) df = self.cleaned_df - if limit > 0: - df = df.head(limit) # Convert rows to dicts row_dicts = [] @@ -147,6 +145,7 @@ def _transfer_hook(self, session: Session) -> None: set_={ "chemistry_sample_info_id": excluded.chemistry_sample_info_id, "nma_chemistry_sample_info_uuid": excluded.nma_chemistry_sample_info_uuid, + "nma_sample_point_id": excluded.nma_sample_point_id, "sample_value": excluded.sample_value, "units": excluded.units, "symbol": excluded.symbol, @@ -176,6 +175,15 @@ def _row_to_dict(self, row) -> Optional[dict[str, Any]]: ) return None + sample_point_id = self._safe_str(row, "SamplePointID") + if sample_point_id is None: + self._capture_error( + legacy_sample_pt_id, + f"Missing SamplePointID for SamplePtID: {legacy_sample_pt_id}", + "SamplePointID", + ) + return None + # Look up Integer FK from cache chemistry_sample_info_id = self._sample_info_cache.get(legacy_sample_pt_id) if chemistry_sample_info_id is None: @@ -203,6 +211,7 @@ def _row_to_dict(self, row) -> Optional[dict[str, Any]]: "chemistry_sample_info_id": chemistry_sample_info_id, # Legacy UUID FK for audit "nma_chemistry_sample_info_uuid": legacy_sample_pt_id, + "nma_sample_point_id": sample_point_id, # Data columns "analyte": self._safe_str(row, "Analyte"), "sample_value": self._safe_float(row, "SampleValue"), diff --git a/transfers/radionuclides.py b/transfers/radionuclides.py index 8b4ad9dfc..1a8713ec8 100644 --- a/transfers/radionuclides.py +++ b/transfers/radionuclides.py @@ -38,7 +38,6 @@ from db import NMA_Radionuclides from transfers.logger import logger from transfers.transferer import ChemistryTransferer -from transfers.util import read_csv class RadionuclidesTransferer(ChemistryTransferer): @@ -50,15 +49,10 @@ class RadionuclidesTransferer(ChemistryTransferer): source_table = "Radionuclides" - def __init__(self, *args, batch_size: int = 1000, **kwargs): + def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._parse_dates = ["AnalysisDate"] - def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]: - input_df = read_csv(self.source_table, parse_dates=self._parse_dates) - cleaned_df = self._filter_to_valid_sample_infos(input_df) - return input_df, cleaned_df - def _transfer_hook(self, session: Session) -> None: row_dicts = [] skipped_global_id = 0 diff --git a/transfers/transferer.py b/transfers/transferer.py index e6fe93e34..afef86e34 100644 --- a/transfers/transferer.py +++ b/transfers/transferer.py @@ -314,7 +314,7 @@ def _build_sample_info_cache(self) -> None: ) def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]: - input_df = read_csv(self.source_table, parse_dates=self._parse_dates) + input_df = self._read_csv(self.source_table, parse_dates=self._parse_dates) cleaned_df = self._filter_to_valid_sample_infos(input_df) return input_df, cleaned_df @@ -332,10 +332,10 @@ def _filter_to_valid_sample_infos(self, df: pd.DataFrame) -> pd.DataFrame: inverted_df = df[~mask].copy() if not inverted_df.empty: for _, row in inverted_df.iterrows(): - pointid = row["SamplePointID"] + sample_pt_id = row.get("SamplePtID") self._capture_error( - pointid, - f"No matching ChemistrySampleInfo for SamplePtID: {pointid}", + sample_pt_id, + f"No matching ChemistrySampleInfo for SamplePtID: {sample_pt_id}", "SamplePtID", ) @@ -343,8 +343,9 @@ def _filter_to_valid_sample_infos(self, df: pd.DataFrame) -> pd.DataFrame: if before_count > after_count: skipped = before_count - after_count + table_name = self.source_table or self.__class__.__name__ logger.warning( - f"Filtered out {skipped} FieldParameters records without matching " + f"Filtered out {skipped} {table_name} records without matching " f"ChemistrySampleInfo ({after_count} valid, {skipped} orphan records prevented)" )