Skip to content

Commit 2d4d8ff

Browse files
committed
feat(migrations): make NMA_SurfaceWaterData.thing_id nullable
1 parent cfb576e commit 2d4d8ff

5 files changed

Lines changed: 143 additions & 148 deletions

File tree

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
"""Make NMA_SurfaceWaterData.thing_id nullable.
2+
3+
Revision ID: i2c3d4e5f6a7
4+
Revises: f1a2b3c4d5e6
5+
Create Date: 2026-02-20 17:40:00.000000
6+
"""
7+
8+
from typing import Sequence, Union
9+
10+
import sqlalchemy as sa
11+
from alembic import op
12+
from sqlalchemy import inspect
13+
14+
# revision identifiers, used by Alembic.
15+
revision: str = "i2c3d4e5f6a7"
16+
down_revision: Union[str, Sequence[str], None] = "f1a2b3c4d5e6"
17+
branch_labels: Union[str, Sequence[str], None] = None
18+
depends_on: Union[str, Sequence[str], None] = None
19+
20+
21+
def upgrade() -> None:
    """Drop the NOT NULL constraint on NMA_SurfaceWaterData.thing_id.

    Legacy rows that cannot be mapped to a Thing are kept as orphans
    instead of being rejected.  The migration is a no-op when either
    the table or the column is absent (e.g. on a fresh database).
    """
    bind = op.get_bind()
    inspector = inspect(bind)

    # Nothing to do when the legacy table was never created.
    if not inspector.has_table("NMA_SurfaceWaterData"):
        return

    # Guard against schema drift: skip when the column is missing.
    column_names = {
        column["name"]
        for column in inspector.get_columns("NMA_SurfaceWaterData")
    }
    if "thing_id" not in column_names:
        return

    op.alter_column(
        "NMA_SurfaceWaterData",
        "thing_id",
        existing_type=sa.Integer(),
        nullable=True,
    )
39+
40+
def downgrade() -> None:
    """Restore the NOT NULL constraint on NMA_SurfaceWaterData.thing_id.

    WARNING: destructive.  Rows with a NULL ``thing_id`` cannot satisfy
    the restored constraint, so they are DELETED before the column is
    tightened.  (The previous docstring claimed the revert happened
    "only when no null thing_id values exist", which contradicted the
    unconditional DELETE below.)  The migration is a no-op when either
    the table or the column is absent.
    """
    bind = op.get_bind()
    inspector = inspect(bind)

    # Nothing to do when the legacy table was never created.
    if not inspector.has_table("NMA_SurfaceWaterData"):
        return

    # Guard against schema drift: skip when the column is missing.
    columns = {col["name"] for col in inspector.get_columns("NMA_SurfaceWaterData")}
    if "thing_id" not in columns:
        return

    # Orphan rows would violate the NOT NULL constraint; remove them first.
    op.execute('DELETE FROM "NMA_SurfaceWaterData" WHERE thing_id IS NULL')
    op.alter_column(
        "NMA_SurfaceWaterData",
        "thing_id",
        existing_type=sa.Integer(),
        nullable=False,
    )

db/nma_legacy.py

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -578,9 +578,9 @@ class NMA_SurfaceWaterData(Base):
578578
object_id: Mapped[int] = mapped_column("OBJECTID", Integer, primary_key=True)
579579

580580
# FK
581-
# FK to Thing - required for all SurfaceWaterData records
582-
thing_id: Mapped[int] = mapped_column(
583-
Integer, ForeignKey("thing.id", ondelete="CASCADE"), nullable=False
581+
# FK to Thing - optional when legacy rows cannot be mapped to a Thing.
582+
thing_id: Mapped[Optional[int]] = mapped_column(
583+
Integer, ForeignKey("thing.id", ondelete="CASCADE"), nullable=True
584584
)
585585

586586
# Legacy PK (for audit)
@@ -615,16 +615,9 @@ class NMA_SurfaceWaterData(Base):
615615
data_source: Mapped[Optional[str]] = mapped_column("DataSource", String(255))
616616

617617
# Relationships
618-
thing: Mapped["Thing"] = relationship("Thing", back_populates="surface_water_data")
619-
620-
@validates("thing_id")
621-
def validate_thing_id(self, key, value):
622-
"""Prevent orphan NMA_SurfaceWaterData - must have a parent Thing."""
623-
if value is None:
624-
raise ValueError(
625-
"NMA_SurfaceWaterData requires a parent Thing (thing_id cannot be None)"
626-
)
627-
return value
618+
thing: Mapped[Optional["Thing"]] = relationship(
619+
"Thing", back_populates="surface_water_data"
620+
)
628621

629622

630623
class NMA_SurfaceWaterPhotos(Base):

transfers/surface_water_data.py

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -62,22 +62,12 @@ def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]:
6262

6363
def _transfer_hook(self, session: Session) -> None:
6464
rows: list[dict[str, Any]] = []
65-
skipped_missing_thing = 0
6665
for raw in self.cleaned_df.to_dict("records"):
6766
record = self._row_dict(raw)
68-
if record is None:
69-
skipped_missing_thing += 1
70-
continue
7167
rows.append(record)
7268

7369
rows = self._dedupe_rows(rows, key="OBJECTID", include_missing=True)
7470

75-
if skipped_missing_thing:
76-
logger.warning(
77-
"Skipped %s SurfaceWaterData rows without matching Thing",
78-
skipped_missing_thing,
79-
)
80-
8171
insert_stmt = insert(NMA_SurfaceWaterData)
8272
excluded = insert_stmt.excluded
8373

@@ -111,7 +101,7 @@ def _transfer_hook(self, session: Session) -> None:
111101
session.commit()
112102
session.expunge_all()
113103

114-
def _row_dict(self, row: dict[str, Any]) -> Optional[dict[str, Any]]:
104+
def _row_dict(self, row: dict[str, Any]) -> dict[str, Any]:
115105
def val(key: str) -> Optional[Any]:
116106
v = row.get(key)
117107
if pd.isna(v):
@@ -133,12 +123,6 @@ def to_uuid(v: Any) -> Optional[uuid.UUID]:
133123

134124
location_id = to_uuid(val("LocationId"))
135125
thing_id = self._resolve_thing_id(location_id)
136-
if thing_id is None:
137-
logger.warning(
138-
"Skipping SurfaceWaterData LocationId=%s - Thing not found",
139-
location_id,
140-
)
141-
return None
142126

143127
return {
144128
"LocationId": location_id,

transfers/transfer_results_builder.py

Lines changed: 46 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from sqlalchemy import select, func
88

99
from db.engine import session_ctx
10+
from transfers.transfer import load_transfer_options
1011
from transfers.transfer_results_specs import (
1112
TRANSFER_COMPARISON_SPECS,
1213
TransferComparisonSpec,
@@ -15,7 +16,12 @@
1516
TransferComparisonResults,
1617
TransferResult,
1718
)
18-
from transfers.util import read_csv
19+
from transfers.util import (
20+
read_csv,
21+
replace_nans,
22+
get_transferable_wells,
23+
)
24+
import os
1925

2026

2127
def _normalize_key(value: Any) -> str | None:
@@ -56,6 +62,8 @@ class TransferResultsBuilder:
5662

5763
def __init__(self, sample_limit: int = 25):
5864
self.sample_limit = sample_limit
65+
self.transfer_options = load_transfer_options()
66+
self.transfer_limit = int(os.getenv("TRANSFER_LIMIT", "1000"))
5967

6068
def build(self) -> TransferComparisonResults:
6169
results: dict[str, TransferResult] = {}
@@ -70,16 +78,18 @@ def _build_one(self, spec: TransferComparisonSpec) -> TransferResult:
7078
source_df = read_csv(spec.source_csv)
7179
if spec.source_filter:
7280
source_df = spec.source_filter(source_df)
73-
source_series = _normalized_series(source_df, spec.source_key_column)
81+
comparison_df = source_df
82+
enabled = self._is_enabled(spec)
83+
if not enabled:
84+
comparison_df = source_df.iloc[0:0]
85+
elif spec.transfer_name == "WellData":
86+
comparison_df = self._agreed_welldata_df()
87+
88+
source_series = _normalized_series(comparison_df, spec.source_key_column)
7489
source_keys = set(source_series.unique().tolist())
7590
source_keyed_row_count = int(source_series.shape[0])
7691
source_duplicate_key_row_count = source_keyed_row_count - len(source_keys)
77-
agreed_transfer_row_count = int(len(source_df))
78-
if spec.agreed_row_counter is not None:
79-
try:
80-
agreed_transfer_row_count = int(spec.agreed_row_counter())
81-
except Exception:
82-
agreed_transfer_row_count = int(len(source_df))
92+
agreed_transfer_row_count = int(len(comparison_df))
8393

8494
model = spec.destination_model
8595
key_col = getattr(model, spec.destination_key_column)
@@ -134,20 +144,44 @@ def _build_one(self, spec: TransferComparisonSpec) -> TransferResult:
134144
extra_in_destination_sample=extra[: self.sample_limit],
135145
)
136146

147+
def _is_enabled(self, spec: TransferComparisonSpec) -> bool:
    """Return True when the transfer described by *spec* is switched on.

    Specs without an ``option_field`` are always considered enabled;
    otherwise the flag is read from the loaded transfer options,
    defaulting to True when the attribute is absent.
    """
    option_name = spec.option_field
    if not option_name:
        return True
    return bool(getattr(self.transfer_options, option_name, True))
151+
152+
def _agreed_welldata_df(self) -> pd.DataFrame:
    """Build the cleaned WellData frame the transfer actually agrees to move.

    Mirrors the WellData transfer pipeline: join location attributes onto
    the well rows, keep groundwater sites that have coordinates, drop
    PointIDs that appear more than once, and apply the configured limit.
    """
    well_df = read_csv("WellData", dtype={"OSEWelltagID": str})

    location_df = read_csv("Location")
    # PointID/SSMA_TimeStamp would collide with well columns on join.
    location_df = location_df.drop(
        ["PointID", "SSMA_TimeStamp"], axis=1, errors="ignore"
    )

    well_df = well_df.join(location_df.set_index("LocationId"), on="LocationId")

    # Groundwater sites only, and only those with usable coordinates.
    well_df = well_df[well_df["SiteType"] == "GW"]
    well_df = well_df[well_df["Easting"].notna() & well_df["Northing"].notna()]
    well_df = replace_nans(well_df)

    cleaned = get_transferable_wells(well_df)

    # Ambiguous PointIDs are excluded entirely: every duplicated row goes.
    duplicated_mask = cleaned["PointID"].duplicated(keep=False)
    if duplicated_mask.any():
        duplicate_ids = set(cleaned.loc[duplicated_mask, "PointID"])
        cleaned = cleaned[~cleaned["PointID"].isin(duplicate_ids)]

    if self.transfer_limit > 0:
        cleaned = cleaned.head(self.transfer_limit)
    return cleaned
171+
137172
@staticmethod
def write_summary(path: Path, comparison: TransferComparisonResults) -> None:
    """Write a markdown summary table of the comparison results to *path*.

    Emits one row per transfer, sorted by transfer name, including a
    shortfall column (``Missing Agreed`` = agreed rows minus rows that
    actually landed in the destination).
    """
    lines = [
        f"generated_at={comparison.generated_at}",
        "",
        "| Transfer | Source CSV | Source Rows | Agreed Rows | Dest Model | Dest Rows | Missing Agreed |",
        "|---|---|---:|---:|---|---:|---:|",
    ]
    for transfer_name in sorted(comparison.results.keys()):
        result = comparison.results[transfer_name]
        shortfall = result.agreed_transfer_row_count - result.destination_row_count
        lines.append(
            f"| {transfer_name} | {result.source_csv} | {result.source_row_count} | "
            f"{result.agreed_transfer_row_count} | "
            f"{result.destination_model} | {result.destination_row_count} | {shortfall} |"
        )
    path.write_text("\n".join(lines) + "\n")

0 commit comments

Comments (0)