Skip to content

Commit bfac303

Browse files
committed
feat(nma_legacy): add thing_id foreign key to NMA_WeatherData and establish relationship with Thing
- add thing_id FK + relationship on NMA_WeatherData with validation
- cache Thing.nma_pk_location and resolve thing_id in WeatherData transfer
- skip unlinked WeatherData rows to prevent orphaned children
- add Alembic migration to backfill thing_id and enforce NOT NULL
- update WeatherData legacy tests to include Thing linkage
1 parent 53df9c9 commit bfac303

5 files changed

Lines changed: 176 additions & 17 deletions

File tree

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
"""add thing_id to NMA_WeatherData
2+
3+
Revision ID: f6e5d4c3b2a1
4+
Revises: c7f8a9b0c1d2
5+
Create Date: 2026-02-05 10:40:00.000000
6+
7+
"""
8+
9+
from typing import Sequence, Union
10+
11+
import sqlalchemy as sa
12+
from alembic import op
13+
14+
# revision identifiers, used by Alembic.
15+
revision: str = "f6e5d4c3b2a1"
16+
down_revision: Union[str, Sequence[str], None] = "c7f8a9b0c1d2"
17+
branch_labels: Union[str, Sequence[str], None] = None
18+
depends_on: Union[str, Sequence[str], None] = None
19+
20+
21+
def upgrade() -> None:
    """Add ``thing_id`` to ``NMA_WeatherData`` and link every row to a Thing.

    Steps, in order:

    1. Add ``thing_id`` as a nullable integer column so existing rows
       survive the ``ADD COLUMN``.
    2. Create the ``fk_weather_data_thing_id`` foreign key to ``thing.id``
       with ``ON DELETE CASCADE``.
    3. Backfill ``thing_id`` by matching ``"LocationId"`` against
       ``thing.nma_pk_location``.
    4. Delete every row that is still unlinked, then make the column
       ``NOT NULL``.

    WARNING: step 4 permanently removes weather rows that cannot be linked
    to a Thing; ``downgrade`` does not bring them back.
    """
    op.add_column(
        "NMA_WeatherData",
        sa.Column("thing_id", sa.Integer(), nullable=True),
    )
    op.create_foreign_key(
        "fk_weather_data_thing_id",
        "NMA_WeatherData",
        "thing",
        ["thing_id"],
        ["id"],
        ondelete="CASCADE",
    )
    # Backfill thing_id based on LocationId -> Thing.nma_pk_location.
    # Both sides are trimmed and lowercased before comparison, the same way
    # the WeatherData transfer normalises location ids. Without this, a
    # value that differs only in letter case or surrounding spaces would
    # fail to match and the row would be deleted by the statement below.
    op.execute(
        """
        UPDATE "NMA_WeatherData" wd
        SET thing_id = t.id
        FROM thing t
        WHERE t.nma_pk_location IS NOT NULL
          AND wd."LocationId" IS NOT NULL
          AND lower(btrim(t.nma_pk_location))
              = lower(btrim(wd."LocationId"::text))
        """
    )
    # Remove any rows that cannot be linked to a Thing, then enforce NOT NULL
    op.execute('DELETE FROM "NMA_WeatherData" WHERE thing_id IS NULL')
    op.alter_column(
        "NMA_WeatherData", "thing_id", existing_type=sa.Integer(), nullable=False
    )
51+
52+
53+
def downgrade() -> None:
    """Revert the schema change made by ``upgrade``.

    Drops the ``fk_weather_data_thing_id`` foreign key and then the
    ``thing_id`` column. Only the schema is reverted: rows that ``upgrade``
    deleted because they had no matching Thing are not recreated.
    """
    table_name = "NMA_WeatherData"
    # Undo the upgrade steps in reverse order: constraint first, then column.
    op.drop_constraint("fk_weather_data_thing_id", table_name, type_="foreignkey")
    op.drop_column(table_name, "thing_id")

db/nma_legacy.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -663,7 +663,7 @@ class NMA_WeatherData(Base):
663663
"""
664664
Legacy WeatherData table from AMPAPI.
665665
666-
Note: This table is OUT OF SCOPE for refactoring (not a Thing child).
666+
Note: This table is a Thing child and must link to a parent Thing.
667667
"""
668668

669669
__tablename__ = "NMA_WeatherData"
@@ -672,7 +672,9 @@ class NMA_WeatherData(Base):
672672
object_id: Mapped[int] = mapped_column("OBJECTID", Integer, primary_key=True)
673673

674674
# FK
675-
# FK not assigned.
675+
thing_id: Mapped[int] = mapped_column(
676+
Integer, ForeignKey("thing.id", ondelete="CASCADE"), nullable=False
677+
)
676678

677679
# Legacy PK (for audit)
678680
weather_id: Mapped[Optional[uuid.UUID]] = mapped_column(
@@ -687,6 +689,18 @@ class NMA_WeatherData(Base):
687689
# Additional columns
688690
point_id: Mapped[str] = mapped_column("PointID", String(10))
689691

692+
# Relationships
693+
thing: Mapped["Thing"] = relationship("Thing", back_populates="weather_data")
694+
695+
@validates("thing_id")
def validate_thing_id(self, key, value):
    """Reject a missing parent so an NMA_WeatherData row is never orphaned.

    Returns the value unchanged when it is set; raises ``ValueError`` when
    ``thing_id`` is assigned ``None``.
    """
    if value is not None:
        return value
    raise ValueError(
        "NMA_WeatherData requires a parent Thing (thing_id cannot be None)"
    )
703+
690704

691705
class NMA_WeatherPhotos(Base):
692706
"""

db/thing.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
NMA_Stratigraphy,
5656
NMA_SurfaceWaterData,
5757
NMA_WaterLevelsContinuous_Pressure_Daily,
58+
NMA_WeatherData,
5859
)
5960

6061

@@ -368,6 +369,12 @@ class Thing(
368369
cascade="all, delete-orphan",
369370
passive_deletes=True,
370371
)
372+
weather_data: Mapped[List["NMA_WeatherData"]] = relationship(
373+
"NMA_WeatherData",
374+
back_populates="thing",
375+
cascade="all, delete-orphan",
376+
passive_deletes=True,
377+
)
371378

372379
# --- Association Proxies ---
373380
assets: AssociationProxy[list["Asset"]] = association_proxy(

tests/test_weather_data_legacy.py

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,22 +28,33 @@
2828

2929
from db.engine import session_ctx
3030
from db.nma_legacy import NMA_WeatherData
31+
from db.thing import Thing
3132

3233

3334
def _next_object_id() -> int:
3435
# Use a negative value to avoid collisions with existing legacy OBJECTIDs.
3536
return -(uuid4().int % 2_000_000_000)
3637

3738

39+
def _attach_thing_with_location(session, water_well_thing):
    """Give the fixture Thing a fresh legacy location id and commit it.

    Loads the Thing through ``session`` by the fixture's id, stores a newly
    generated UUID (as a string) in ``nma_pk_location``, commits, and returns
    ``(thing, location_id)`` where ``location_id`` is the UUID object.
    """
    new_location_id = uuid4()
    attached_thing = session.get(Thing, water_well_thing.id)
    attached_thing.nma_pk_location = str(new_location_id)
    session.commit()
    return attached_thing, new_location_id
45+
46+
3847
# ===================== CREATE tests ==========================
39-
def test_create_weather_data_all_fields():
48+
def test_create_weather_data_all_fields(water_well_thing):
4049
"""Test creating a weather data record with all migrated fields."""
4150
with session_ctx() as session:
51+
thing, location_id = _attach_thing_with_location(session, water_well_thing)
4252
record = NMA_WeatherData(
4353
object_id=_next_object_id(),
44-
location_id=uuid4(),
54+
location_id=location_id,
4555
point_id="WX-1001",
4656
weather_id=uuid4(),
57+
thing_id=thing.id,
4758
)
4859
session.add(record)
4960
session.commit()
@@ -58,33 +69,39 @@ def test_create_weather_data_all_fields():
5869
session.commit()
5970

6071

61-
def test_create_weather_data_minimal():
72+
def test_create_weather_data_minimal(water_well_thing):
6273
"""Test creating a weather data record with minimal fields."""
6374
with session_ctx() as session:
75+
thing, location_id = _attach_thing_with_location(session, water_well_thing)
6476
record = NMA_WeatherData(
6577
object_id=_next_object_id(),
6678
point_id="WX-1002",
79+
location_id=location_id,
80+
thing_id=thing.id,
6781
)
6882
session.add(record)
6983
session.commit()
7084
session.refresh(record)
7185

7286
assert record.object_id is not None
7387
assert record.point_id == "WX-1002"
74-
assert record.location_id is None
88+
assert record.location_id is not None
7589
assert record.weather_id is None
7690

7791
session.delete(record)
7892
session.commit()
7993

8094

8195
# ===================== READ tests ==========================
82-
def test_read_weather_data_by_object_id():
96+
def test_read_weather_data_by_object_id(water_well_thing):
8397
"""Test reading a specific weather data record by OBJECTID."""
8498
with session_ctx() as session:
99+
thing, location_id = _attach_thing_with_location(session, water_well_thing)
85100
record = NMA_WeatherData(
86101
object_id=_next_object_id(),
87102
point_id="WX-1003",
103+
location_id=location_id,
104+
thing_id=thing.id,
88105
)
89106
session.add(record)
90107
session.commit()
@@ -98,16 +115,21 @@ def test_read_weather_data_by_object_id():
98115
session.commit()
99116

100117

101-
def test_query_weather_data_by_point_id():
118+
def test_query_weather_data_by_point_id(water_well_thing):
102119
"""Test querying weather data by point_id."""
103120
with session_ctx() as session:
121+
thing, location_id = _attach_thing_with_location(session, water_well_thing)
104122
record1 = NMA_WeatherData(
105123
object_id=_next_object_id(),
106124
point_id="WX-1004",
125+
location_id=location_id,
126+
thing_id=thing.id,
107127
)
108128
record2 = NMA_WeatherData(
109129
object_id=_next_object_id(),
110130
point_id="WX-1005",
131+
location_id=location_id,
132+
thing_id=thing.id,
111133
)
112134
session.add_all([record1, record2])
113135
session.commit()
@@ -126,12 +148,15 @@ def test_query_weather_data_by_point_id():
126148

127149

128150
# ===================== UPDATE tests ==========================
129-
def test_update_weather_data():
151+
def test_update_weather_data(water_well_thing):
130152
"""Test updating a weather data record."""
131153
with session_ctx() as session:
154+
thing, location_id = _attach_thing_with_location(session, water_well_thing)
132155
record = NMA_WeatherData(
133156
object_id=_next_object_id(),
134157
point_id="WX-1006",
158+
location_id=location_id,
159+
thing_id=thing.id,
135160
)
136161
session.add(record)
137162
session.commit()
@@ -151,12 +176,15 @@ def test_update_weather_data():
151176

152177

153178
# ===================== DELETE tests ==========================
154-
def test_delete_weather_data():
179+
def test_delete_weather_data(water_well_thing):
155180
"""Test deleting a weather data record."""
156181
with session_ctx() as session:
182+
thing, location_id = _attach_thing_with_location(session, water_well_thing)
157183
record = NMA_WeatherData(
158184
object_id=_next_object_id(),
159185
point_id="WX-1007",
186+
location_id=location_id,
187+
thing_id=thing.id,
160188
)
161189
session.add(record)
162190
session.commit()
@@ -178,6 +206,7 @@ def test_weather_data_has_all_migrated_columns():
178206
"point_id",
179207
"weather_id",
180208
"object_id",
209+
"thing_id",
181210
]
182211

183212
for column in expected_columns:

transfers/weather_data.py

Lines changed: 56 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@
2323
from sqlalchemy.dialects.postgresql import insert
2424
from sqlalchemy.orm import Session
2525

26-
from db import NMA_WeatherData
26+
from db import NMA_WeatherData, Thing
27+
from db.engine import session_ctx
2728
from transfers.logger import logger
2829
from transfers.transferer import Transferer
2930
from transfers.util import read_csv
@@ -39,16 +40,43 @@ class WeatherDataTransferer(Transferer):
3940
def __init__(self, *args, batch_size: int = 1000, **kwargs):
4041
super().__init__(*args, **kwargs)
4142
self.batch_size = batch_size
43+
self._thing_id_by_location_id: dict[str, int] = {}
44+
self._build_thing_id_cache()
45+
46+
def _build_thing_id_cache(self) -> None:
    """Populate the normalised-location-id -> Thing id lookup table.

    Reads ``(Thing.id, Thing.nma_pk_location)`` for every Thing, skips rows
    whose location is empty or normalises to an empty string, and stores the
    rest in ``self._thing_id_by_location_id``. A later Thing with the same
    normalised location overwrites an earlier one.
    """
    cache = self._thing_id_by_location_id
    with session_ctx() as session:
        pairs = session.query(Thing.id, Thing.nma_pk_location).all()
        for thing_id, raw_location in pairs:
            if not raw_location:
                continue
            normalized = self._normalize_location_id(raw_location)
            if not normalized:
                continue
            cache[normalized] = thing_id
    logger.info("Built Thing cache with %s location ids", len(cache))
4258

4359
def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]:
4460
df = read_csv(self.source_table)
4561
return df, df
4662

4763
def _transfer_hook(self, session: Session) -> None:
48-
rows = self._dedupe_rows(
49-
[self._row_dict(row) for row in self.cleaned_df.to_dict("records")],
50-
key="OBJECTID",
51-
)
64+
rows: list[dict[str, Any]] = []
65+
skipped_missing_thing = 0
66+
for raw in self.cleaned_df.to_dict("records"):
67+
record = self._row_dict(raw)
68+
if record is None:
69+
skipped_missing_thing += 1
70+
continue
71+
rows.append(record)
72+
73+
rows = self._dedupe_rows(rows, key="OBJECTID")
74+
75+
if skipped_missing_thing:
76+
logger.warning(
77+
"Skipped %s WeatherData rows without matching Thing",
78+
skipped_missing_thing,
79+
)
5280

5381
insert_stmt = insert(NMA_WeatherData)
5482
excluded = insert_stmt.excluded
@@ -61,6 +89,7 @@ def _transfer_hook(self, session: Session) -> None:
6189
stmt = insert_stmt.values(chunk).on_conflict_do_update(
6290
index_elements=["OBJECTID"],
6391
set_={
92+
"thing_id": excluded["thing_id"],
6493
"LocationId": excluded.LocationId,
6594
"PointID": excluded.PointID,
6695
"WeatherID": excluded.WeatherID,
@@ -71,7 +100,7 @@ def _transfer_hook(self, session: Session) -> None:
71100
session.commit()
72101
session.expunge_all()
73102

74-
def _row_dict(self, row: dict[str, Any]) -> dict[str, Any]:
103+
def _row_dict(self, row: dict[str, Any]) -> Optional[dict[str, Any]]:
75104
def val(key: str) -> Optional[Any]:
76105
v = row.get(key)
77106
if pd.isna(v):
@@ -87,11 +116,21 @@ def to_uuid(v: Any) -> Optional[uuid.UUID]:
87116
return uuid.UUID(v)
88117
return None
89118

119+
location_id = to_uuid(val("LocationId"))
120+
thing_id = self._resolve_thing_id(location_id)
121+
if thing_id is None:
122+
logger.warning(
123+
"Skipping WeatherData LocationId=%s - Thing not found",
124+
location_id,
125+
)
126+
return None
127+
90128
return {
91-
"LocationId": to_uuid(val("LocationId")),
129+
"LocationId": location_id,
92130
"PointID": val("PointID"),
93131
"WeatherID": to_uuid(val("WeatherID")),
94132
"OBJECTID": val("OBJECTID"),
133+
"thing_id": thing_id,
95134
}
96135

97136
def _dedupe_rows(
@@ -111,6 +150,16 @@ def _dedupe_rows(
111150
deduped[row_key] = row
112151
return list(deduped.values()) + passthrough
113152

153+
def _resolve_thing_id(self, location_id: Optional[uuid.UUID]) -> Optional[int]:
154+
if location_id is None:
155+
return None
156+
key = self._normalize_location_id(str(location_id))
157+
return self._thing_id_by_location_id.get(key)
158+
159+
@staticmethod
160+
def _normalize_location_id(value: str) -> str:
161+
return value.strip().lower()
162+
114163

115164
def run(batch_size: int = 1000) -> None:
116165
"""Entrypoint to execute the transfer."""

0 commit comments

Comments
 (0)