Skip to content

Commit d3de737

Browse files
committed
feat(nma_legacy): establish relationship between NMA_SurfaceWaterPhotos and NMA_SurfaceWaterData via SurfaceID
- make NMA_SurfaceWaterData.surface_id unique to support FK target
- add FK + relationship on NMA_SurfaceWaterPhotos.surface_id with validation
- enforce parent-first ordering by running SurfaceWaterPhotos after SurfaceWaterData
- skip orphan photo rows in transfer using cached SurfaceIDs
- add migration to backfill/cleanup and enforce NOT NULL + FK
- update legacy tests to create photos with real parent SurfaceWaterData
1 parent bfac303 commit d3de737

5 files changed

Lines changed: 189 additions & 19 deletions

File tree

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
"""link surface water photos to surface water data
2+
3+
Revision ID: a1b2c3d4e5f6
4+
Revises: f6e5d4c3b2a1
5+
Create Date: 2026-02-05 11:10:00.000000
6+
7+
"""
8+
9+
from typing import Sequence, Union
10+
11+
import sqlalchemy as sa
12+
from alembic import op
13+
14+
# revision identifiers, used by Alembic.
15+
revision: str = "a1b2c3d4e5f6"
16+
down_revision: Union[str, Sequence[str], None] = "f6e5d4c3b2a1"
17+
branch_labels: Union[str, Sequence[str], None] = None
18+
depends_on: Union[str, Sequence[str], None] = None
19+
20+
21+
def upgrade() -> None:
    """Upgrade schema.

    Links ``NMA_SurfaceWaterPhotos`` to ``NMA_SurfaceWaterData`` through
    ``SurfaceID`` in four steps:

    1. Add a unique constraint on ``NMA_SurfaceWaterData.SurfaceID`` so the
       column can be the target of a foreign key.
    2. Delete photo rows whose ``SurfaceID`` is NULL or has no matching
       parent row.
    3. Add the foreign key with ``ON DELETE CASCADE``.
    4. Make ``NMA_SurfaceWaterPhotos.SurfaceID`` NOT NULL.
    """
    # NOTE(review): this step fails if NMA_SurfaceWaterData already holds
    # duplicate SurfaceID values - confirm the data is clean before running.
    op.create_unique_constraint(
        "uq_surface_water_data_surface_id",
        "NMA_SurfaceWaterData",
        ["SurfaceID"],
    )

    # Orphan cleanup must run BEFORE the foreign key is created: adding the
    # constraint validates every existing row, so a single photo pointing at
    # a missing SurfaceID would abort the whole migration.
    op.execute(
        """
        DELETE FROM "NMA_SurfaceWaterPhotos" p
        WHERE p."SurfaceID" IS NULL
        OR NOT EXISTS (
            SELECT 1
            FROM "NMA_SurfaceWaterData" d
            WHERE d."SurfaceID" = p."SurfaceID"
        )
        """
    )

    op.create_foreign_key(
        "fk_surface_water_photos_surface_id",
        "NMA_SurfaceWaterPhotos",
        "NMA_SurfaceWaterData",
        ["SurfaceID"],
        ["SurfaceID"],
        ondelete="CASCADE",
    )

    # NULL rows were removed by the DELETE above, so NOT NULL can be enforced.
    op.alter_column(
        "NMA_SurfaceWaterPhotos",
        "SurfaceID",
        existing_type=sa.UUID(),
        nullable=False,
    )
53+
54+
55+
def downgrade() -> None:
    """Downgrade schema.

    Makes ``NMA_SurfaceWaterPhotos.SurfaceID`` nullable again, then drops the
    foreign key on the photos table and finally the unique constraint on
    ``NMA_SurfaceWaterData.SurfaceID``.
    """
    photos_table = "NMA_SurfaceWaterPhotos"

    # Relax NOT NULL on the child column.
    op.alter_column(
        photos_table,
        "SurfaceID",
        nullable=True,
        existing_type=sa.UUID(),
    )

    # Drop the foreign key before the unique constraint it points at.
    op.drop_constraint(
        "fk_surface_water_photos_surface_id",
        photos_table,
        type_="foreignkey",
    )
    op.drop_constraint(
        "uq_surface_water_data_surface_id",
        "NMA_SurfaceWaterData",
        type_="unique",
    )

db/nma_legacy.py

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -586,7 +586,7 @@ class NMA_SurfaceWaterData(Base):
586586

587587
# Legacy PK (for audit)
588588
surface_id: Mapped[uuid.UUID] = mapped_column(
589-
"SurfaceID", UUID(as_uuid=True), nullable=False
589+
"SurfaceID", UUID(as_uuid=True), nullable=False, unique=True
590590
)
591591

592592
# Legacy FK (for audit)
@@ -617,6 +617,12 @@ class NMA_SurfaceWaterData(Base):
617617

618618
# Relationships
619619
thing: Mapped["Thing"] = relationship("Thing", back_populates="surface_water_data")
620+
surface_water_photos: Mapped[list["NMA_SurfaceWaterPhotos"]] = relationship(
621+
"NMA_SurfaceWaterPhotos",
622+
back_populates="surface_water_data",
623+
cascade="all, delete-orphan",
624+
passive_deletes=True,
625+
)
620626

621627
@validates("thing_id")
622628
def validate_thing_id(self, key, value):
@@ -632,7 +638,7 @@ class NMA_SurfaceWaterPhotos(Base):
632638
"""
633639
Legacy SurfaceWaterPhotos table from NM_Aquifer.
634640
635-
Note: This table is OUT OF SCOPE for refactoring (not a Thing child).
641+
Note: This table is a child of NMA_SurfaceWaterData via SurfaceID.
636642
"""
637643

638644
__tablename__ = "NMA_SurfaceWaterPhotos"
@@ -643,21 +649,39 @@ class NMA_SurfaceWaterPhotos(Base):
643649
)
644650

645651
# FK
646-
# FK not assigned.
652+
surface_id: Mapped[uuid.UUID] = mapped_column(
653+
"SurfaceID",
654+
UUID(as_uuid=True),
655+
ForeignKey("NMA_SurfaceWaterData.SurfaceID", ondelete="CASCADE"),
656+
nullable=False,
657+
)
647658

648659
# Legacy PK (for audit)
649660
# Current `global_id` is also the original PK in the legacy DB
650661

651662
# Legacy FK (for audit)
652-
surface_id: Mapped[Optional[uuid.UUID]] = mapped_column(
653-
"SurfaceID", UUID(as_uuid=True)
654-
)
663+
# surface_id is also the legacy FK in the source table.
655664

656665
# Additional columns
657666
point_id: Mapped[str] = mapped_column("PointID", String(50), nullable=False)
658667
ole_path: Mapped[Optional[str]] = mapped_column("OLEPath", String(50))
659668
object_id: Mapped[Optional[int]] = mapped_column("OBJECTID", Integer, unique=True)
660669

670+
# Relationships
671+
surface_water_data: Mapped["NMA_SurfaceWaterData"] = relationship(
672+
"NMA_SurfaceWaterData", back_populates="surface_water_photos"
673+
)
674+
675+
@validates("surface_id")
676+
def validate_surface_id(self, key, value):
    """Reject a missing parent reference.

    Returns ``value`` unchanged when it is not ``None``.

    Raises:
        ValueError: if ``value`` is ``None``, because a photo row must point
            at a parent NMA_SurfaceWaterData record.
    """
    if value is not None:
        return value
    raise ValueError(
        "NMA_SurfaceWaterPhotos requires a parent NMA_SurfaceWaterData "
        "(surface_id cannot be None)"
    )
684+
661685

662686
class NMA_WeatherData(Base):
663687
"""

tests/test_surface_water_photos_legacy.py

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,43 @@
2828
from uuid import uuid4
2929

3030
from db.engine import session_ctx
31-
from db.nma_legacy import NMA_SurfaceWaterPhotos
31+
from db.nma_legacy import NMA_SurfaceWaterData, NMA_SurfaceWaterPhotos
32+
from db.thing import Thing
3233

3334

34-
def test_create_surface_water_photos_all_fields():
35+
def _next_object_id() -> int:
36+
# Use a negative value to avoid collisions with existing legacy OBJECTIDs.
37+
return -(uuid4().int % 2_000_000_000)
38+
39+
40+
def _attach_thing_with_location(session, water_well_thing):
    """Assign a fresh legacy location key to the fixture Thing and commit.

    Re-loads the Thing through ``session`` using ``water_well_thing.id``,
    stores a newly generated UUID (as a string) in ``nma_pk_location``, and
    commits the session.

    Returns:
        A ``(thing, location_id)`` tuple: the Thing loaded via ``session``
        and the UUID that was assigned to it.
    """
    new_location_id = uuid4()
    attached = session.get(Thing, water_well_thing.id)
    attached.nma_pk_location = str(new_location_id)
    session.commit()
    return attached, new_location_id
46+
47+
48+
def _create_surface_water_data(session, water_well_thing):
    """Create and commit a parent NMA_SurfaceWaterData row for photo tests.

    The row is tied to the fixture Thing (after giving it a location key) and
    gets a random ``surface_id`` and ``object_id``.

    Returns:
        The committed NMA_SurfaceWaterData instance.
    """
    parent_thing, loc_id = _attach_thing_with_location(session, water_well_thing)
    fields = {
        "location_id": loc_id,
        "thing_id": parent_thing.id,
        "surface_id": uuid4(),
        "point_id": "SW-1000",
        "object_id": _next_object_id(),
    }
    parent_row = NMA_SurfaceWaterData(**fields)
    session.add(parent_row)
    session.commit()
    return parent_row
60+
61+
62+
def test_create_surface_water_photos_all_fields(water_well_thing):
3563
"""Test creating a surface water photos record with all fields."""
3664
with session_ctx() as session:
65+
parent = _create_surface_water_data(session, water_well_thing)
3766
record = NMA_SurfaceWaterPhotos(
38-
surface_id=uuid4(),
67+
surface_id=parent.surface_id,
3968
point_id="SW-0001",
4069
ole_path="photo.jpg",
4170
object_id=123,
@@ -52,14 +81,17 @@ def test_create_surface_water_photos_all_fields():
5281
assert record.object_id == 123
5382

5483
session.delete(record)
84+
session.delete(parent)
5585
session.commit()
5686

5787

58-
def test_create_surface_water_photos_minimal():
88+
def test_create_surface_water_photos_minimal(water_well_thing):
5989
"""Test creating a surface water photos record with required fields only."""
6090
with session_ctx() as session:
91+
parent = _create_surface_water_data(session, water_well_thing)
6192
record = NMA_SurfaceWaterPhotos(
6293
point_id="SW-0002",
94+
surface_id=parent.surface_id,
6395
global_id=uuid4(),
6496
)
6597
session.add(record)
@@ -68,11 +100,12 @@ def test_create_surface_water_photos_minimal():
68100

69101
assert record.global_id is not None
70102
assert record.point_id == "SW-0002"
71-
assert record.surface_id is None
103+
assert record.surface_id is not None
72104
assert record.ole_path is None
73105
assert record.object_id is None
74106

75107
session.delete(record)
108+
session.delete(parent)
76109
session.commit()
77110

78111

transfers/surface_water_photos.py

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@
2323
from sqlalchemy.dialects.postgresql import insert
2424
from sqlalchemy.orm import Session
2525

26-
from db import NMA_SurfaceWaterPhotos
26+
from db import NMA_SurfaceWaterData, NMA_SurfaceWaterPhotos
27+
from db.engine import session_ctx
2728
from transfers.logger import logger
2829
from transfers.transferer import Transferer
2930
from transfers.util import replace_nans
@@ -37,19 +38,44 @@ class SurfaceWaterPhotosTransferer(Transferer):
3738
def __init__(self, *args, batch_size: int = 1000, **kwargs):
    """Set up the transferer and pre-load the known parent SurfaceIDs.

    Args:
        *args: Forwarded unchanged to the base class initializer.
        batch_size: Stored on the instance as ``self.batch_size``.
        **kwargs: Forwarded unchanged to the base class initializer.
    """
    super().__init__(*args, **kwargs)
    self.batch_size = batch_size

    # Normalized SurfaceIDs of existing parent rows. The set must exist
    # before the builder runs, because the builder adds to it in place.
    self._surface_id_cache: set[str] = set()
    self._build_surface_id_cache()
43+
44+
def _build_surface_id_cache(self) -> None:
    """Fill ``self._surface_id_cache`` with the parent SurfaceIDs.

    Queries ``NMA_SurfaceWaterData.surface_id`` in its own session, adds the
    normalized form of every truthy value to the cache, and logs the
    resulting cache size.
    """
    with session_ctx() as session:
        parent_rows = session.query(NMA_SurfaceWaterData.surface_id).all()
        # Each result row is a 1-tuple; falsy ids are ignored.
        self._surface_id_cache.update(
            self._normalize_surface_id(parent_id)
            for (parent_id,) in parent_rows
            if parent_id
        )
    logger.info(
        "Built SurfaceWaterData cache with %s surface ids",
        len(self._surface_id_cache),
    )
4054

4155
def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Read the source table and return it alongside a cleaned copy.

    Returns:
        ``(raw, cleaned)``: the frame read from ``self.source_table`` via
        ``self._read_csv`` and the result of passing it through
        ``replace_nans``.
    """
    raw_frame = self._read_csv(self.source_table)
    return raw_frame, replace_nans(raw_frame)
4559

4660
def _transfer_hook(self, session: Session) -> None:
47-
rows = [self._row_dict(row) for row in self.cleaned_df.to_dict("records")]
61+
rows: list[dict[str, Any]] = []
62+
skipped_missing_parent = 0
63+
for raw in self.cleaned_df.to_dict("records"):
64+
record = self._row_dict(raw)
65+
if record is None:
66+
skipped_missing_parent += 1
67+
continue
68+
rows.append(record)
4869
rows = self._dedupe_rows(rows, key="GlobalID")
4970

5071
if not rows:
5172
logger.info("No SurfaceWaterPhotos rows to transfer")
5273
return
74+
if skipped_missing_parent:
75+
logger.warning(
76+
"Skipped %s SurfaceWaterPhotos rows without matching SurfaceWaterData",
77+
skipped_missing_parent,
78+
)
5379

5480
insert_stmt = insert(NMA_SurfaceWaterPhotos)
5581
excluded = insert_stmt.excluded
@@ -74,9 +100,16 @@ def _transfer_hook(self, session: Session) -> None:
74100
session.execute(stmt)
75101
session.commit()
76102

77-
def _row_dict(self, row: dict[str, Any]) -> dict[str, Any]:
103+
def _row_dict(self, row: dict[str, Any]) -> Optional[dict[str, Any]]:
104+
surface_id = self._uuid_val(row.get("SurfaceID"))
105+
if surface_id is None or not self._has_surface_id(surface_id):
106+
logger.warning(
107+
"Skipping SurfaceWaterPhotos SurfaceID=%s - SurfaceWaterData not found",
108+
surface_id,
109+
)
110+
return None
78111
return {
79-
"SurfaceID": self._uuid_val(row.get("SurfaceID")),
112+
"SurfaceID": surface_id,
80113
"PointID": row.get("PointID"),
81114
"OLEPath": row.get("OLEPath"),
82115
"OBJECTID": row.get("OBJECTID"),
@@ -107,6 +140,13 @@ def _uuid_val(self, value: Any) -> Optional[UUID]:
107140
return None
108141
return None
109142

143+
def _has_surface_id(self, surface_id: UUID) -> bool:
144+
return self._normalize_surface_id(surface_id) in self._surface_id_cache
145+
146+
@staticmethod
147+
def _normalize_surface_id(value: UUID) -> str:
148+
return str(value).strip().lower()
149+
110150

111151
def run(batch_size: int = 1000) -> None:
112152
"""Entrypoint to execute the transfer."""

transfers/transfer.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -513,8 +513,6 @@ def _transfer_parallel(
513513
parallel_tasks_1.append(("LinkIdsLocation", LinkIdsLocationDataTransferer))
514514
if opts.transfer_groups:
515515
parallel_tasks_1.append(("Groups", ProjectGroupTransferer))
516-
if opts.transfer_surface_water_photos:
517-
parallel_tasks_1.append(("SurfaceWaterPhotos", SurfaceWaterPhotosTransferer))
518516
if opts.transfer_soil_rock_results:
519517
parallel_tasks_1.append(("SoilRockResults", SoilRockResultsTransferer))
520518
if opts.transfer_weather_photos:
@@ -599,8 +597,6 @@ def _transfer_parallel(
599597
metrics.location_link_ids_metrics(*results_map["LinkIdsLocation"])
600598
if "Groups" in results_map and results_map["Groups"]:
601599
metrics.group_metrics(*results_map["Groups"])
602-
if "SurfaceWaterPhotos" in results_map and results_map["SurfaceWaterPhotos"]:
603-
metrics.surface_water_photos_metrics(*results_map["SurfaceWaterPhotos"])
604600
if "SoilRockResults" in results_map and results_map["SoilRockResults"]:
605601
metrics.soil_rock_results_metrics(*results_map["SoilRockResults"])
606602
if "Assets" in results_map and results_map["Assets"]:
@@ -629,6 +625,11 @@ def _transfer_parallel(
629625
if "WeatherPhotos" in results_map and results_map["WeatherPhotos"]:
630626
metrics.weather_photos_metrics(*results_map["WeatherPhotos"])
631627

628+
if opts.transfer_surface_water_photos:
629+
message("TRANSFERRING SURFACE WATER PHOTOS")
630+
results = _execute_transfer(SurfaceWaterPhotosTransferer, flags=flags)
631+
metrics.surface_water_photos_metrics(*results)
632+
632633
if opts.transfer_major_chemistry:
633634
message("TRANSFERRING MAJOR CHEMISTRY")
634635
results = _execute_transfer(MajorChemistryTransferer, flags=flags)

0 commit comments

Comments
 (0)