Skip to content

Commit 830609d

Browse files
authored
Merge pull request #470 from DataIntegrationGroup/kas-bdms-540-NMA_WeatherPhotos-FK-relationship
BDMS-540: Link NMA_WeatherPhotos to NMA_WeatherData
2 parents f174966 + 438ef7d commit 830609d

File tree

5 files changed

+187
-19
lines changed

5 files changed

+187
-19
lines changed
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
"""link weather photos to weather data
2+
3+
Revision ID: b7c8d9e0f1a2
4+
Revises: a1b2c3d4e5f6
5+
Create Date: 2026-02-05 11:20:00.000000
6+
7+
"""
8+
9+
from typing import Sequence, Union
10+
11+
import sqlalchemy as sa
12+
from alembic import op
13+
14+
# revision identifiers, used by Alembic.
15+
revision: str = "b7c8d9e0f1a2"
16+
down_revision: Union[str, Sequence[str], None] = "a1b2c3d4e5f6"
17+
branch_labels: Union[str, Sequence[str], None] = None
18+
depends_on: Union[str, Sequence[str], None] = None
19+
20+
21+
def upgrade() -> None:
22+
"""Upgrade schema."""
23+
op.create_unique_constraint(
24+
"uq_weather_data_weather_id",
25+
"NMA_WeatherData",
26+
["WeatherID"],
27+
)
28+
op.create_foreign_key(
29+
"fk_weather_photos_weather_id",
30+
"NMA_WeatherPhotos",
31+
"NMA_WeatherData",
32+
["WeatherID"],
33+
["WeatherID"],
34+
ondelete="CASCADE",
35+
)
36+
op.execute("""
37+
DELETE FROM "NMA_WeatherPhotos" p
38+
WHERE p."WeatherID" IS NULL
39+
OR NOT EXISTS (
40+
SELECT 1
41+
FROM "NMA_WeatherData" d
42+
WHERE d."WeatherID" = p."WeatherID"
43+
)
44+
""")
45+
op.alter_column(
46+
"NMA_WeatherPhotos",
47+
"WeatherID",
48+
existing_type=sa.UUID(),
49+
nullable=False,
50+
)
51+
52+
53+
def downgrade() -> None:
54+
"""Downgrade schema."""
55+
op.alter_column(
56+
"NMA_WeatherPhotos",
57+
"WeatherID",
58+
existing_type=sa.UUID(),
59+
nullable=True,
60+
)
61+
op.drop_constraint(
62+
"fk_weather_photos_weather_id",
63+
"NMA_WeatherPhotos",
64+
type_="foreignkey",
65+
)
66+
op.drop_constraint(
67+
"uq_weather_data_weather_id",
68+
"NMA_WeatherData",
69+
type_="unique",
70+
)

db/nma_legacy.py

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -702,7 +702,7 @@ class NMA_WeatherData(Base):
702702

703703
# Legacy PK (for audit)
704704
weather_id: Mapped[Optional[uuid.UUID]] = mapped_column(
705-
"WeatherID", UUID(as_uuid=True)
705+
"WeatherID", UUID(as_uuid=True), unique=True
706706
)
707707

708708
# Legacy FK (for audit)
@@ -715,6 +715,12 @@ class NMA_WeatherData(Base):
715715

716716
# Relationships
717717
thing: Mapped["Thing"] = relationship("Thing", back_populates="weather_data")
718+
weather_photos: Mapped[list["NMA_WeatherPhotos"]] = relationship(
719+
"NMA_WeatherPhotos",
720+
back_populates="weather_data",
721+
cascade="all, delete-orphan",
722+
passive_deletes=True,
723+
)
718724

719725
@validates("thing_id")
720726
def validate_thing_id(self, key, value):
@@ -730,7 +736,7 @@ class NMA_WeatherPhotos(Base):
730736
"""
731737
Legacy WeatherPhotos table from NM_Aquifer.
732738
733-
Note: This table is OUT OF SCOPE for refactoring (not a Thing child).
739+
Note: This table is a child of NMA_WeatherData via WeatherID.
734740
"""
735741

736742
__tablename__ = "NMA_WeatherPhotos"
@@ -741,21 +747,39 @@ class NMA_WeatherPhotos(Base):
741747
)
742748

743749
# FK:
744-
# FK not assigned.
750+
weather_id: Mapped[uuid.UUID] = mapped_column(
751+
"WeatherID",
752+
UUID(as_uuid=True),
753+
ForeignKey("NMA_WeatherData.WeatherID", ondelete="CASCADE"),
754+
nullable=False,
755+
)
745756

746757
# Legacy PK (for audit):
747758
# Current `global_id` is also the original PK in the legacy DB
748759

749760
# Legacy FK (for audit):
750-
weather_id: Mapped[Optional[uuid.UUID]] = mapped_column(
751-
"WeatherID", UUID(as_uuid=True)
752-
)
761+
# weather_id is also the legacy FK in the source table.
753762

754763
# Additional columns
755764
point_id: Mapped[str] = mapped_column("PointID", String(50), nullable=False)
756765
ole_path: Mapped[Optional[str]] = mapped_column("OLEPath", String(50))
757766
object_id: Mapped[Optional[int]] = mapped_column("OBJECTID", Integer, unique=True)
758767

768+
# Relationships
769+
weather_data: Mapped["NMA_WeatherData"] = relationship(
770+
"NMA_WeatherData", back_populates="weather_photos"
771+
)
772+
773+
@validates("weather_id")
774+
def validate_weather_id(self, key, value):
775+
"""Prevent orphan NMA_WeatherPhotos - must have a parent NMA_WeatherData."""
776+
if value is None:
777+
raise ValueError(
778+
"NMA_WeatherPhotos requires a parent NMA_WeatherData "
779+
"(weather_id cannot be None)"
780+
)
781+
return value
782+
759783

760784
class NMA_Soil_Rock_Results(Base):
761785
"""

tests/test_weather_photos_legacy.py

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,43 @@
2828
from uuid import uuid4
2929

3030
from db.engine import session_ctx
31-
from db.nma_legacy import NMA_WeatherPhotos
31+
from db.nma_legacy import NMA_WeatherData, NMA_WeatherPhotos
32+
from db.thing import Thing
3233

3334

34-
def test_create_weather_photos_all_fields():
35+
def _next_object_id() -> int:
36+
# Use a negative value to avoid collisions with existing legacy OBJECTIDs.
37+
return -(uuid4().int % 2_000_000_000)
38+
39+
40+
def _attach_thing_with_location(session, water_well_thing):
41+
location_id = uuid4()
42+
thing = session.get(Thing, water_well_thing.id)
43+
thing.nma_pk_location = str(location_id)
44+
session.commit()
45+
return thing, location_id
46+
47+
48+
def _create_weather_data(session, water_well_thing):
49+
thing, location_id = _attach_thing_with_location(session, water_well_thing)
50+
record = NMA_WeatherData(
51+
object_id=_next_object_id(),
52+
location_id=location_id,
53+
point_id="WX-1000",
54+
weather_id=uuid4(),
55+
thing_id=thing.id,
56+
)
57+
session.add(record)
58+
session.commit()
59+
return record
60+
61+
62+
def test_create_weather_photos_all_fields(water_well_thing):
3563
"""Test creating a weather photos record with all fields."""
3664
with session_ctx() as session:
65+
parent = _create_weather_data(session, water_well_thing)
3766
record = NMA_WeatherPhotos(
38-
weather_id=uuid4(),
67+
weather_id=parent.weather_id,
3968
point_id="WP-0001",
4069
ole_path="weather.jpg",
4170
object_id=321,
@@ -52,14 +81,17 @@ def test_create_weather_photos_all_fields():
5281
assert record.object_id == 321
5382

5483
session.delete(record)
84+
session.delete(parent)
5585
session.commit()
5686

5787

58-
def test_create_weather_photos_minimal():
88+
def test_create_weather_photos_minimal(water_well_thing):
5989
"""Test creating a weather photos record with required fields only."""
6090
with session_ctx() as session:
91+
parent = _create_weather_data(session, water_well_thing)
6192
record = NMA_WeatherPhotos(
6293
point_id="WP-0002",
94+
weather_id=parent.weather_id,
6395
global_id=uuid4(),
6496
)
6597
session.add(record)
@@ -68,11 +100,12 @@ def test_create_weather_photos_minimal():
68100

69101
assert record.global_id is not None
70102
assert record.point_id == "WP-0002"
71-
assert record.weather_id is None
103+
assert record.weather_id is not None
72104
assert record.ole_path is None
73105
assert record.object_id is None
74106

75107
session.delete(record)
108+
session.delete(parent)
76109
session.commit()
77110

78111

transfers/transfer.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -515,8 +515,6 @@ def _transfer_parallel(
515515
parallel_tasks_1.append(("Groups", ProjectGroupTransferer))
516516
if opts.transfer_soil_rock_results:
517517
parallel_tasks_1.append(("SoilRockResults", SoilRockResultsTransferer))
518-
if opts.transfer_weather_photos:
519-
parallel_tasks_1.append(("WeatherPhotos", WeatherPhotosTransferer))
520518
if opts.transfer_assets:
521519
parallel_tasks_1.append(("Assets", AssetTransferer))
522520
if opts.transfer_associated_data:
@@ -622,14 +620,17 @@ def _transfer_parallel(
622620
)
623621
if "WeatherData" in results_map and results_map["WeatherData"]:
624622
metrics.weather_data_metrics(*results_map["WeatherData"])
625-
if "WeatherPhotos" in results_map and results_map["WeatherPhotos"]:
626-
metrics.weather_photos_metrics(*results_map["WeatherPhotos"])
627623

628624
if opts.transfer_surface_water_photos:
629625
message("TRANSFERRING SURFACE WATER PHOTOS")
630626
results = _execute_transfer(SurfaceWaterPhotosTransferer, flags=flags)
631627
metrics.surface_water_photos_metrics(*results)
632628

629+
if opts.transfer_weather_photos:
630+
message("TRANSFERRING WEATHER PHOTOS")
631+
results = _execute_transfer(WeatherPhotosTransferer, flags=flags)
632+
metrics.weather_photos_metrics(*results)
633+
633634
if opts.transfer_major_chemistry:
634635
message("TRANSFERRING MAJOR CHEMISTRY")
635636
results = _execute_transfer(MajorChemistryTransferer, flags=flags)

transfers/weather_photos.py

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@
2323
from sqlalchemy.dialects.postgresql import insert
2424
from sqlalchemy.orm import Session
2525

26-
from db import NMA_WeatherPhotos
26+
from db import NMA_WeatherData, NMA_WeatherPhotos
27+
from db.engine import session_ctx
2728
from transfers.logger import logger
2829
from transfers.transferer import Transferer
2930
from transfers.util import replace_nans
@@ -37,19 +38,44 @@ class WeatherPhotosTransferer(Transferer):
3738
def __init__(self, *args, batch_size: int = 1000, **kwargs):
3839
super().__init__(*args, **kwargs)
3940
self.batch_size = batch_size
41+
self._weather_id_cache: set[str] = set()
42+
self._build_weather_id_cache()
43+
44+
def _build_weather_id_cache(self) -> None:
45+
with session_ctx() as session:
46+
weather_ids = session.query(NMA_WeatherData.weather_id).all()
47+
for (weather_id,) in weather_ids:
48+
if weather_id:
49+
self._weather_id_cache.add(self._normalize_weather_id(weather_id))
50+
logger.info(
51+
"Built WeatherData cache with %s weather ids",
52+
len(self._weather_id_cache),
53+
)
4054

4155
def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]:
4256
df = self._read_csv(self.source_table)
4357
cleaned_df = replace_nans(df)
4458
return df, cleaned_df
4559

4660
def _transfer_hook(self, session: Session) -> None:
47-
rows = [self._row_dict(row) for row in self.cleaned_df.to_dict("records")]
61+
rows: list[dict[str, Any]] = []
62+
skipped_missing_parent = 0
63+
for raw in self.cleaned_df.to_dict("records"):
64+
record = self._row_dict(raw)
65+
if record is None:
66+
skipped_missing_parent += 1
67+
continue
68+
rows.append(record)
4869
rows = self._dedupe_rows(rows, key="GlobalID")
4970

5071
if not rows:
5172
logger.info("No WeatherPhotos rows to transfer")
5273
return
74+
if skipped_missing_parent:
75+
logger.warning(
76+
"Skipped %s WeatherPhotos rows without matching WeatherData",
77+
skipped_missing_parent,
78+
)
5379

5480
insert_stmt = insert(NMA_WeatherPhotos)
5581
excluded = insert_stmt.excluded
@@ -74,9 +100,16 @@ def _transfer_hook(self, session: Session) -> None:
74100
session.execute(stmt)
75101
session.commit()
76102

77-
def _row_dict(self, row: dict[str, Any]) -> dict[str, Any]:
103+
def _row_dict(self, row: dict[str, Any]) -> Optional[dict[str, Any]]:
104+
weather_id = self._uuid_val(row.get("WeatherID"))
105+
if weather_id is None or not self._has_weather_id(weather_id):
106+
logger.warning(
107+
"Skipping WeatherPhotos WeatherID=%s - WeatherData not found",
108+
weather_id,
109+
)
110+
return None
78111
return {
79-
"WeatherID": self._uuid_val(row.get("WeatherID")),
112+
"WeatherID": weather_id,
80113
"PointID": row.get("PointID"),
81114
"OLEPath": row.get("OLEPath"),
82115
"OBJECTID": row.get("OBJECTID"),
@@ -107,6 +140,13 @@ def _uuid_val(self, value: Any) -> Optional[UUID]:
107140
return None
108141
return None
109142

143+
def _has_weather_id(self, weather_id: UUID) -> bool:
144+
return self._normalize_weather_id(weather_id) in self._weather_id_cache
145+
146+
@staticmethod
147+
def _normalize_weather_id(value: UUID) -> str:
148+
return str(value).strip().lower()
149+
110150

111151
def run(batch_size: int = 1000) -> None:
112152
"""Entrypoint to execute the transfer."""

0 commit comments

Comments
 (0)