Skip to content

Commit 2b46439

Browse files
committed
chore: refactor transfer functions to improve point ID handling and logging
1 parent 49c089e commit 2b46439

2 files changed

Lines changed: 177 additions & 16 deletions

File tree

transfers/transfer.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
from dataclasses import dataclass
2121

2222
from dotenv import load_dotenv
23-
2423
from transfers.thing_transfer import (
2524
transfer_rock_sample_locations,
2625
transfer_springs,
@@ -216,13 +215,16 @@ def transfer_context(name: str, *, pad: int = 10):
216215
logger.info("Finished %s", name)
217216

218217

219-
def _execute_transfer(klass, flags: dict = None):
220-
"""Execute a single transfer class. Thread-safe since each creates its own session."""
218+
def _get_test_pointids():
    """Return the PointIDs listed in ``TRANSFER_TEST_POINTIDS``, or ``None``.

    The environment variable is parsed as a comma-separated list. Whitespace
    around each entry is stripped and empty entries (for example from a
    trailing comma or ``"A, ,B"``) are dropped, so ``" A , B, "`` yields
    ``["A", "B"]``.

    Returns:
        A list of PointID strings, or ``None`` when the variable is unset,
        empty, or contains no usable entries.
    """
    # Read the variable once so the check and the parse see the same value.
    raw = os.getenv("TRANSFER_TEST_POINTIDS")
    if not raw:
        return None

    stripped = (token.strip() for token in raw.split(","))
    pointids = [token for token in stripped if token]

    # A value such as " , " carries no IDs; treat it like an unset variable.
    return pointids or None
224223

225-
transferer = klass(flags=flags, pointids=pointids)
224+
225+
def _execute_transfer(klass, flags: dict = None):
    """Run a single transfer class and return its results.

    ``klass`` is instantiated with ``flags`` and the PointIDs returned by
    ``_get_test_pointids()``, then its ``transfer()`` method is called.

    NOTE(review): the previous docstring described this as thread-safe
    because each transfer creates its own session -- confirm against the
    transferer implementation, which is not visible here.

    Returns:
        A ``(input_df, cleaned_df, errors)`` tuple read from the instance
        after ``transfer()`` has run.
    """
    runner = klass(flags=flags, pointids=_get_test_pointids())
    runner.transfer()
    results = (runner.input_df, runner.cleaned_df, runner.errors)
    return results
228230

@@ -372,7 +374,7 @@ def transfer_all(metrics: Metrics) -> list[ProfileArtifact]:
372374
use_parallel_wells = get_bool_env("TRANSFER_PARALLEL_WELLS", True)
373375
if use_parallel_wells:
374376
logger.info("Using PARALLEL wells transfer")
375-
transferer = WellTransferer(flags=flags)
377+
transferer = WellTransferer(flags=flags, pointids=_get_test_pointids())
376378
transferer.transfer_parallel()
377379
results = (transferer.input_df, transferer.cleaned_df, transferer.errors)
378380
else:

transfers/waterlevels_transfer.py

Lines changed: 170 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,9 @@
1616
import json
1717
import uuid
1818
from datetime import datetime, timezone, timedelta
19+
from typing import Any
1920

2021
import pandas as pd
21-
from sqlalchemy.orm import Session
22-
2322
from db import (
2423
Thing,
2524
Sample,
@@ -31,6 +30,8 @@
3130
Parameter,
3231
)
3332
from db.engine import session_ctx
33+
from sqlalchemy.exc import DatabaseError, SQLAlchemyError
34+
from sqlalchemy.orm import Session
3435
from transfers.transferer import Transferer
3536
from transfers.util import (
3637
filter_to_valid_point_ids,
@@ -72,9 +73,10 @@ def get_contacts_info(
7273

7374

7475
class WaterLevelTransferer(Transferer):
76+
source_table = "WaterLevels"
77+
7578
def __init__(self, *args, **kw):
7679
super().__init__(*args, **kw)
77-
self.source_table = "WaterLevels"
7880
with session_ctx() as session:
7981
groundwater_parameter_id = (
8082
session.query(Parameter)
@@ -94,23 +96,79 @@ def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]:
9496
input_df = read_csv(self.source_table, dtype={"MeasuredBy": str})
9597
cleaned_df = filter_to_valid_point_ids(input_df)
9698
cleaned_df = filter_by_valid_measuring_agency(cleaned_df)
99+
logger.info(
100+
"Prepared %s rows for %s after filtering (%s -> %s)",
101+
len(cleaned_df),
102+
self.source_table,
103+
len(input_df),
104+
len(cleaned_df),
105+
)
97106
return input_df, cleaned_df
98107

99108
def _transfer_hook(self, session: Session) -> None:
109+
stats: dict[str, int] = {
110+
"groups_total": 0,
111+
"groups_processed": 0,
112+
"groups_skipped_missing_thing": 0,
113+
"groups_failed_commit": 0,
114+
"rows_total": 0,
115+
"rows_created": 0,
116+
"rows_skipped_dt": 0,
117+
"rows_skipped_reason": 0,
118+
"rows_skipped_contacts": 0,
119+
"rows_well_destroyed": 0,
120+
"field_events_created": 0,
121+
"field_activities_created": 0,
122+
"samples_created": 0,
123+
"observations_created": 0,
124+
"contacts_created": 0,
125+
"contacts_reused": 0,
126+
}
127+
100128
gwd = self.cleaned_df.groupby(["PointID"])
101-
for index, group in gwd:
129+
total_groups = len(gwd)
130+
for gi, (index, group) in enumerate(gwd, start=1):
131+
stats["groups_total"] += 1
102132
pointid = index[0]
103-
thing = session.query(Thing).where(Thing.name == pointid).first()
133+
logger.info(
134+
"Processing WaterLevels group %s/%s for PointID=%s (%s rows)",
135+
gi,
136+
total_groups,
137+
pointid,
138+
len(group),
139+
)
140+
141+
thing = session.query(Thing).where(Thing.name == pointid).one_or_none()
142+
if thing is None:
143+
stats["groups_skipped_missing_thing"] += 1
144+
logger.warning(
145+
"Skipping PointID=%s because Thing was not found", pointid
146+
)
147+
self._capture_error(pointid, "Thing not found", "PointID")
148+
continue
104149

105150
for i, row in enumerate(group.itertuples()):
151+
stats["rows_total"] += 1
106152
dt_utc = self._get_dt_utc(row)
107153
if dt_utc is None:
154+
stats["rows_skipped_dt"] += 1
108155
continue
109156

110-
# reasons
157+
# reasons
111158
try:
112159
glv = self._get_groundwater_level_reason(row)
113-
except KeyError as e:
160+
except (KeyError, ValueError) as e:
161+
stats["rows_skipped_reason"] += 1
162+
logger.warning(
163+
"Skipping %s due to invalid groundwater level reason: %s",
164+
self._row_context(row),
165+
e,
166+
)
167+
self._capture_error(
168+
row.PointID,
169+
f"invalid groundwater level reason: {e}",
170+
"LevelStatus",
171+
)
114172
continue
115173

116174
release_status = "public" if row.PublicRelease else "private"
@@ -122,9 +180,25 @@ def _transfer_hook(self, session: Session) -> None:
122180
release_status=release_status,
123181
)
124182
session.add(field_event)
183+
stats["field_events_created"] += 1
125184
field_event_participants = self._get_field_event_participants(
126185
session, row, thing
127186
)
187+
stats["contacts_created"] += getattr(
188+
self, "_last_contacts_created_count", 0
189+
)
190+
stats["contacts_reused"] += getattr(
191+
self, "_last_contacts_reused_count", 0
192+
)
193+
194+
if not field_event_participants:
195+
stats["rows_skipped_contacts"] += 1
196+
logger.warning(
197+
"Skipping %s because no field event participants were found",
198+
self._row_context(row),
199+
)
200+
continue
201+
128202
sampler = None
129203
for i, participant in enumerate(field_event_participants):
130204
field_event_participant = FieldEventParticipant(
@@ -143,8 +217,10 @@ def _transfer_hook(self, session: Session) -> None:
143217
== "Well was destroyed (no subsequent water levels should be recorded)"
144218
):
145219
logger.warning(
146-
"Well is destroyed - no field activity/sample/observation will be made"
220+
"Well is destroyed for %s - no field activity/sample/observation will be made",
221+
self._row_context(row),
147222
)
223+
stats["rows_well_destroyed"] += 1
148224
field_event.notes = glv
149225
continue
150226

@@ -156,16 +232,52 @@ def _transfer_hook(self, session: Session) -> None:
156232
release_status=release_status,
157233
)
158234
session.add(field_activity)
235+
stats["field_activities_created"] += 1
159236

160237
# Sample
161238
sample = self._make_sample(row, field_activity, dt_utc, sampler)
162239
session.add(sample)
240+
stats["samples_created"] += 1
163241

164242
# Observation
165243
observation = self._make_observation(row, sample, dt_utc, glv)
166244
session.add(observation)
245+
stats["observations_created"] += 1
246+
stats["rows_created"] += 1
247+
248+
try:
249+
session.commit()
250+
session.expunge_all()
251+
stats["groups_processed"] += 1
252+
except DatabaseError as e:
253+
stats["groups_failed_commit"] += 1
254+
logger.exception(
255+
"Failed committing WaterLevels group for PointID=%s: %s",
256+
pointid,
257+
e,
258+
)
259+
session.rollback()
260+
self._capture_database_error(pointid, e)
261+
except SQLAlchemyError as e:
262+
stats["groups_failed_commit"] += 1
263+
logger.exception(
264+
"SQLAlchemy failure committing WaterLevels group for PointID=%s: %s",
265+
pointid,
266+
e,
267+
)
268+
session.rollback()
269+
self._capture_error(pointid, str(e), "UnknownField")
270+
except Exception as e:
271+
stats["groups_failed_commit"] += 1
272+
logger.exception(
273+
"Unexpected failure committing WaterLevels group for PointID=%s: %s",
274+
pointid,
275+
e,
276+
)
277+
session.rollback()
278+
self._capture_error(pointid, str(e), "UnknownField")
167279

168-
session.commit()
280+
self._log_transfer_summary(stats)
169281

170282
def _make_observation(
171283
self, row: pd.Series, sample: Sample, dt_utc: datetime, glv: str
@@ -265,6 +377,8 @@ def _get_groundwater_level_reason(self, row) -> str:
265377
return glv
266378

267379
def _get_field_event_participants(self, session, row, thing) -> list[Contact]:
380+
self._last_contacts_created_count = 0
381+
self._last_contacts_reused_count = 0
268382
field_event_participants = []
269383
measured_by = None if pd.isna(row.MeasuredBy) else row.MeasuredBy
270384

@@ -277,6 +391,7 @@ def _get_field_event_participants(self, session, row, thing) -> list[Contact]:
277391
for name, organization, role in contact_info:
278392
if (name, organization) in self._created_contacts:
279393
contact = self._created_contacts[(name, organization)]
394+
self._last_contacts_reused_count += 1
280395
else:
281396
try:
282397
# create new contact if not already created
@@ -294,6 +409,7 @@ def _get_field_event_participants(self, session, row, thing) -> list[Contact]:
294409
)
295410

296411
self._created_contacts[(name, organization)] = contact
412+
self._last_contacts_created_count += 1
297413
except Exception as e:
298414
logger.critical(
299415
f"Contact cannot be created: Name {name} | Role {role} | Organization {organization} because of the following: {str(e)}"
@@ -302,8 +418,21 @@ def _get_field_event_participants(self, session, row, thing) -> list[Contact]:
302418

303419
field_event_participants.append(contact)
304420
else:
305-
contact = thing.contacts[0]
306-
field_event_participants.append(contact)
421+
if thing.contacts:
422+
contact = thing.contacts[0]
423+
field_event_participants.append(contact)
424+
self._last_contacts_reused_count += 1
425+
else:
426+
logger.warning(
427+
"Thing for PointID=%s has no contacts; cannot use owner fallback for %s",
428+
row.PointID,
429+
self._row_context(row),
430+
)
431+
self._capture_error(
432+
row.PointID,
433+
"Thing has no contacts for owner fallback",
434+
"MeasuredBy",
435+
)
307436

308437
if len(field_event_participants) == 0:
309438
logger.critical(
@@ -313,6 +442,36 @@ def _get_field_event_participants(self, session, row, thing) -> list[Contact]:
313442

314443
return field_event_participants
315444

445+
def _row_context(self, row: Any) -> str:
446+
return (
447+
f"PointID={getattr(row, 'PointID', None)}, "
448+
f"OBJECTID={getattr(row, 'OBJECTID', None)}, "
449+
f"GlobalID={getattr(row, 'GlobalID', None)}"
450+
)
451+
452+
def _log_transfer_summary(self, stats: dict[str, int]) -> None:
    """Log one INFO line summarizing the counters collected in ``stats``.

    Args:
        stats: counter mapping; every key listed in ``summary_keys`` must be
            present, otherwise the lookup raises ``KeyError``.
    """
    # Order matters: each key fills the matching %s placeholder below.
    summary_keys = (
        "groups_total",
        "groups_processed",
        "groups_skipped_missing_thing",
        "groups_failed_commit",
        "rows_total",
        "rows_created",
        "rows_skipped_dt",
        "rows_skipped_reason",
        "rows_skipped_contacts",
        "rows_well_destroyed",
        "field_events_created",
        "field_activities_created",
        "samples_created",
        "observations_created",
        "contacts_created",
        "contacts_reused",
    )
    summary_values = [stats[key] for key in summary_keys]
    logger.info(
        "WaterLevels summary: groups total=%s processed=%s skipped_missing_thing=%s failed_commit=%s "
        "rows total=%s created=%s skipped_dt=%s skipped_reason=%s skipped_contacts=%s well_destroyed=%s "
        "field_events=%s activities=%s samples=%s observations=%s contacts_created=%s contacts_reused=%s",
        *summary_values,
    )
474+
316475
def _get_dt_utc(self, row) -> datetime | None:
317476
if pd.isna(row.DateMeasured):
318477
logger.critical(

0 commit comments

Comments
 (0)