Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
"""add sample point fields to minor trace

Revision ID: e71807682f57
Revises: h1b2c3d4e5f6
Create Date: 2026-02-10 20:07:25.586385

"""

from typing import Sequence, Union

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
# This migration's own id.
revision: str = "e71807682f57"
# The migration this one applies on top of.
down_revision: Union[str, Sequence[str], None] = "h1b2c3d4e5f6"
# No branch labels or cross-branch dependencies for this revision.
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Upgrade schema.

    Add the legacy ``nma_SamplePointID`` column to
    ``NMA_MinorTraceChemistry`` in three steps so the migration is safe
    on a table that already holds rows:

    1. Add the column as nullable with an empty-string server default,
       which backfills every existing row.
    2. Tighten the column to NOT NULL once all rows carry a value.
    3. Drop the temporary default so future inserts must supply the
       value explicitly.
    """
    table_name = "NMA_MinorTraceChemistry"
    column_name = "nma_SamplePointID"
    column_type = sa.String(length=10)

    # Step 1: nullable + temporary default -> existing rows get ''.
    op.add_column(
        table_name,
        sa.Column(
            column_name,
            column_type,
            nullable=True,
            server_default=sa.text("''"),
        ),
    )

    # Step 2: every row is now non-NULL, so the constraint is safe to add.
    op.alter_column(
        table_name,
        column_name,
        existing_type=column_type,
        nullable=False,
    )

    # Step 3: remove the default; inserts must provide the value from now on.
    op.alter_column(
        table_name,
        column_name,
        existing_type=column_type,
        server_default=None,
    )


def downgrade() -> None:
    """Downgrade schema.

    A single column drop reverses all three upgrade steps at once.
    """
    op.drop_column("NMA_MinorTraceChemistry", "nma_SamplePointID")
4 changes: 3 additions & 1 deletion core/initializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ def init_lexicon(path: str = None) -> None:
)

association_rows = []
seen_links = set()
for term_dict in terms:
term_id = existing_terms.get(term_dict["term"])
if term_id is None:
Expand All @@ -177,8 +178,9 @@ def init_lexicon(path: str = None) -> None:
if category_id is None:
continue
key = (term_id, category_id)
if key in existing_links:
if key in existing_links or key in seen_links:
continue
seen_links.add(key)
association_rows.append(
{"term_id": term_id, "category_id": category_id}
)
Expand Down
4 changes: 4 additions & 0 deletions db/nma_legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,7 @@ class NMA_MinorTraceChemistry(Base):
- nma_global_id: Original UUID PK, now UNIQUE for audit
- chemistry_sample_info_id: Integer FK to NMA_Chemistry_SampleInfo.id
- nma_chemistry_sample_info_uuid: Legacy UUID FK for audit
- nma_sample_point_id: Legacy SamplePointID string
- nma_wclab_id: Legacy WCLab_ID string (audit)
"""

Expand Down Expand Up @@ -807,6 +808,9 @@ class NMA_MinorTraceChemistry(Base):
)

# Additional columns
nma_sample_point_id: Mapped[str] = mapped_column(
"nma_SamplePointID", String(10), nullable=False
)
analyte: Mapped[Optional[str]] = mapped_column("analyte", String(50))
symbol: Mapped[Optional[str]] = mapped_column("symbol", String(10))
sample_value: Mapped[Optional[float]] = mapped_column("sample_value", Float)
Expand Down
1 change: 1 addition & 0 deletions tests/integration/test_admin_minor_trace_chemistry.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ def minor_trace_chemistry_record():
chemistry = NMA_MinorTraceChemistry(
nma_global_id=uuid.uuid4(),
chemistry_sample_info_id=sample_info.id, # Integer FK
nma_sample_point_id=sample_info.nma_sample_point_id,
analyte="Arsenic",
symbol="As",
sample_value=0.005,
Expand Down
37 changes: 37 additions & 0 deletions tests/test_minor_trace_chemistry_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,40 @@ def test_row_to_dict_includes_wclab_id():

row_dict = transfer._row_to_dict(row)
assert row_dict["nma_WCLab_ID"] == "LAB-123"
assert row_dict["nma_sample_point_id"] == "POINT-1"


def test_row_to_dict_missing_sample_point_id_returns_none_and_captures_error():
# Bypass __init__ so we can stub the cache without hitting the DB.
transfer = MinorTraceChemistryTransferer.__new__(MinorTraceChemistryTransferer)
sample_pt_id = uuid.uuid4()
transfer._sample_info_cache = {sample_pt_id: 1}
transfer.flags = {}
transfer.errors = []

row = pd.Series(
{
"SamplePtID": str(sample_pt_id),
"GlobalID": str(uuid.uuid4()),
# SamplePointID intentionally missing
"Analyte": "Ca",
"SampleValue": 10.5,
"Units": "mg/L",
"Symbol": None,
"AnalysisMethod": "ICP",
"AnalysisDate": "2024-01-01 00:00:00.000",
"Notes": "note",
"AnalysesAgency": "Lab",
"Uncertainty": 0.1,
"Volume": "2",
"VolumeUnit": "L",
"WCLab_ID": "LAB-123",
}
)

row_dict = transfer._row_to_dict(row)
assert row_dict is None
assert len(transfer.errors) == 1
error = transfer.errors[0]
assert error["field"] == "SamplePointID"
assert "Missing SamplePointID" in error["error"]
11 changes: 11 additions & 0 deletions tests/test_nma_chemistry_lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ def test_nma_minor_trace_chemistry_columns():
"id", # Integer PK
"nma_global_id", # Legacy UUID
"chemistry_sample_info_id", # Integer FK
"nma_sample_point_id", # Legacy sample point id
# from legacy
"analyte",
"sample_value",
Expand Down Expand Up @@ -173,6 +174,7 @@ def test_nma_minor_trace_chemistry_save_all_columns(shared_thing):
mtc = NMA_MinorTraceChemistry(
nma_global_id=_next_global_id(),
chemistry_sample_info=sample_info,
nma_sample_point_id=sample_info.nma_sample_point_id,
analyte="As",
sample_value=0.015,
units="mg/L",
Expand All @@ -193,6 +195,7 @@ def test_nma_minor_trace_chemistry_save_all_columns(shared_thing):
assert mtc.id is not None # Integer PK
assert mtc.nma_global_id is not None # Legacy UUID
assert mtc.chemistry_sample_info_id == sample_info.id # Integer FK
assert mtc.nma_sample_point_id == sample_info.nma_sample_point_id
assert mtc.analyte == "As"
assert mtc.sample_value == 0.015
assert mtc.units == "mg/L"
Expand Down Expand Up @@ -398,6 +401,7 @@ def test_assign_sample_info_to_mtc(shared_thing):
mtc = NMA_MinorTraceChemistry(
nma_global_id=_next_global_id(),
chemistry_sample_info=sample_info, # OO: assign object
nma_sample_point_id=sample_info.nma_sample_point_id,
analyte="Pb",
)
session.add(mtc)
Expand Down Expand Up @@ -430,6 +434,7 @@ def test_append_mtc_to_sample_info(shared_thing):

mtc = NMA_MinorTraceChemistry(
nma_global_id=_next_global_id(),
nma_sample_point_id=sample_info.nma_sample_point_id,
analyte="Fe",
)
sample_info.minor_trace_chemistries.append(mtc)
Expand All @@ -451,6 +456,7 @@ def test_mtc_requires_chemistry_sample_info():
with session_ctx() as session:
mtc = NMA_MinorTraceChemistry(
nma_global_id=_next_global_id(),
nma_sample_point_id=_next_sample_point_id(),
analyte="Cu",
# No chemistry_sample_info_id - should fail
)
Expand Down Expand Up @@ -484,6 +490,7 @@ def test_full_lineage_navigation(shared_thing):
mtc = NMA_MinorTraceChemistry(
nma_global_id=_next_global_id(),
chemistry_sample_info=sample_info,
nma_sample_point_id=sample_info.nma_sample_point_id,
analyte="Zn",
)
session.add(mtc)
Expand Down Expand Up @@ -521,6 +528,7 @@ def test_reverse_lineage_navigation(shared_thing):
mtc = NMA_MinorTraceChemistry(
nma_global_id=_next_global_id(),
chemistry_sample_info=sample_info,
nma_sample_point_id=sample_info.nma_sample_point_id,
analyte="Mn",
)
session.add(mtc)
Expand Down Expand Up @@ -557,6 +565,7 @@ def test_cascade_delete_sample_info_deletes_mtc(shared_thing):
mtc = NMA_MinorTraceChemistry(
nma_global_id=_next_global_id(),
chemistry_sample_info=sample_info,
nma_sample_point_id=sample_info.nma_sample_point_id,
analyte="Cd",
)
session.add(mtc)
Expand Down Expand Up @@ -678,11 +687,13 @@ def test_multiple_mtc_per_sample_info(shared_thing):
mtc1 = NMA_MinorTraceChemistry(
nma_global_id=_next_global_id(),
chemistry_sample_info=sample_info,
nma_sample_point_id=sample_info.nma_sample_point_id,
analyte="As",
)
mtc2 = NMA_MinorTraceChemistry(
nma_global_id=_next_global_id(),
chemistry_sample_info=sample_info,
nma_sample_point_id=sample_info.nma_sample_point_id,
analyte="Pb",
)
session.add_all([mtc1, mtc2])
Expand Down
3 changes: 0 additions & 3 deletions transfers/field_parameters_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,7 @@ def _transfer_hook(self, session: Session) -> None:

Uses ON CONFLICT DO UPDATE on nma_GlobalID (legacy UUID PK, now UNIQUE).
"""
limit = self.flags.get("LIMIT", 0)
df = self.cleaned_df
if limit > 0:
df = df.head(limit)

row_dicts = []
for row in df.itertuples():
Expand Down
15 changes: 12 additions & 3 deletions transfers/minor_trace_chemistry_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
- nma_global_id: Legacy UUID PK (GlobalID), UNIQUE for audit
- chemistry_sample_info_id: Integer FK to NMA_Chemistry_SampleInfo.id
- nma_chemistry_sample_info_uuid: Legacy UUID FK for audit
- nma_sample_point_id: Legacy SamplePointID string
"""

from __future__ import annotations
Expand Down Expand Up @@ -115,10 +116,7 @@ def _transfer_hook(self, session: Session) -> None:

Uses ON CONFLICT DO UPDATE on nma_GlobalID (the legacy UUID PK, now UNIQUE).
"""
limit = self.flags.get("LIMIT", 0)
df = self.cleaned_df
if limit > 0:
df = df.head(limit)

# Convert rows to dicts
row_dicts = []
Expand Down Expand Up @@ -147,6 +145,7 @@ def _transfer_hook(self, session: Session) -> None:
set_={
"chemistry_sample_info_id": excluded.chemistry_sample_info_id,
"nma_chemistry_sample_info_uuid": excluded.nma_chemistry_sample_info_uuid,
"nma_sample_point_id": excluded.nma_sample_point_id,
"sample_value": excluded.sample_value,
"units": excluded.units,
"symbol": excluded.symbol,
Expand Down Expand Up @@ -176,6 +175,15 @@ def _row_to_dict(self, row) -> Optional[dict[str, Any]]:
)
return None

sample_point_id = self._safe_str(row, "SamplePointID")
if sample_point_id is None:
self._capture_error(
legacy_sample_pt_id,
f"Missing SamplePointID for SamplePtID: {legacy_sample_pt_id}",
"SamplePointID",
)
return None

# Look up Integer FK from cache
chemistry_sample_info_id = self._sample_info_cache.get(legacy_sample_pt_id)
if chemistry_sample_info_id is None:
Expand Down Expand Up @@ -203,6 +211,7 @@ def _row_to_dict(self, row) -> Optional[dict[str, Any]]:
"chemistry_sample_info_id": chemistry_sample_info_id,
# Legacy UUID FK for audit
"nma_chemistry_sample_info_uuid": legacy_sample_pt_id,
"nma_sample_point_id": sample_point_id,
# Data columns
"analyte": self._safe_str(row, "Analyte"),
"sample_value": self._safe_float(row, "SampleValue"),
Expand Down
8 changes: 1 addition & 7 deletions transfers/radionuclides.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
from db import NMA_Radionuclides
from transfers.logger import logger
from transfers.transferer import ChemistryTransferer
from transfers.util import read_csv


class RadionuclidesTransferer(ChemistryTransferer):
Expand All @@ -50,15 +49,10 @@ class RadionuclidesTransferer(ChemistryTransferer):

source_table = "Radionuclides"

def __init__(self, *args, batch_size: int = 1000, **kwargs):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._parse_dates = ["AnalysisDate"]

def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]:
input_df = read_csv(self.source_table, parse_dates=self._parse_dates)
cleaned_df = self._filter_to_valid_sample_infos(input_df)
return input_df, cleaned_df

def _transfer_hook(self, session: Session) -> None:
row_dicts = []
skipped_global_id = 0
Expand Down
11 changes: 6 additions & 5 deletions transfers/transferer.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ def _build_sample_info_cache(self) -> None:
)

def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]:
input_df = read_csv(self.source_table, parse_dates=self._parse_dates)
input_df = self._read_csv(self.source_table, parse_dates=self._parse_dates)
cleaned_df = self._filter_to_valid_sample_infos(input_df)
return input_df, cleaned_df

Expand All @@ -332,19 +332,20 @@ def _filter_to_valid_sample_infos(self, df: pd.DataFrame) -> pd.DataFrame:
inverted_df = df[~mask].copy()
if not inverted_df.empty:
for _, row in inverted_df.iterrows():
pointid = row["SamplePointID"]
sample_pt_id = row.get("SamplePtID")
self._capture_error(
pointid,
f"No matching ChemistrySampleInfo for SamplePtID: {pointid}",
sample_pt_id,
f"No matching ChemistrySampleInfo for SamplePtID: {sample_pt_id}",
"SamplePtID",
)

after_count = len(filtered_df)

if before_count > after_count:
skipped = before_count - after_count
table_name = self.source_table or self.__class__.__name__
logger.warning(
f"Filtered out {skipped} FieldParameters records without matching "
f"Filtered out {skipped} {table_name} records without matching "
f"ChemistrySampleInfo ({after_count} valid, {skipped} orphan records prevented)"
)

Expand Down
Loading