-
Notifications
You must be signed in to change notification settings - Fork 4
chore: refactor transfer functions to improve point ID handling and logging #518
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
|
|
@@ -16,10 +16,9 @@ | |||||||
| import json | ||||||||
| import uuid | ||||||||
| from datetime import datetime, timezone, timedelta | ||||||||
| from typing import Any | ||||||||
|
|
||||||||
| import pandas as pd | ||||||||
| from sqlalchemy.orm import Session | ||||||||
|
|
||||||||
| from db import ( | ||||||||
| Thing, | ||||||||
| Sample, | ||||||||
|
|
@@ -31,6 +30,8 @@ | |||||||
| Parameter, | ||||||||
| ) | ||||||||
| from db.engine import session_ctx | ||||||||
| from sqlalchemy.exc import DatabaseError, SQLAlchemyError | ||||||||
| from sqlalchemy.orm import Session | ||||||||
| from transfers.transferer import Transferer | ||||||||
| from transfers.util import ( | ||||||||
| filter_to_valid_point_ids, | ||||||||
|
|
@@ -72,9 +73,10 @@ def get_contacts_info( | |||||||
|
|
||||||||
|
|
||||||||
| class WaterLevelTransferer(Transferer): | ||||||||
| source_table = "WaterLevels" | ||||||||
|
|
||||||||
| def __init__(self, *args, **kw): | ||||||||
| super().__init__(*args, **kw) | ||||||||
| self.source_table = "WaterLevels" | ||||||||
| with session_ctx() as session: | ||||||||
| groundwater_parameter_id = ( | ||||||||
| session.query(Parameter) | ||||||||
|
|
@@ -94,23 +96,79 @@ def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]: | |||||||
| input_df = read_csv(self.source_table, dtype={"MeasuredBy": str}) | ||||||||
| cleaned_df = filter_to_valid_point_ids(input_df) | ||||||||
| cleaned_df = filter_by_valid_measuring_agency(cleaned_df) | ||||||||
| logger.info( | ||||||||
| "Prepared %s rows for %s after filtering (%s -> %s)", | ||||||||
| len(cleaned_df), | ||||||||
| self.source_table, | ||||||||
| len(input_df), | ||||||||
| len(cleaned_df), | ||||||||
| ) | ||||||||
| return input_df, cleaned_df | ||||||||
|
|
||||||||
| def _transfer_hook(self, session: Session) -> None: | ||||||||
| stats: dict[str, int] = { | ||||||||
| "groups_total": 0, | ||||||||
| "groups_processed": 0, | ||||||||
| "groups_skipped_missing_thing": 0, | ||||||||
| "groups_failed_commit": 0, | ||||||||
| "rows_total": 0, | ||||||||
| "rows_created": 0, | ||||||||
| "rows_skipped_dt": 0, | ||||||||
| "rows_skipped_reason": 0, | ||||||||
| "rows_skipped_contacts": 0, | ||||||||
| "rows_well_destroyed": 0, | ||||||||
| "field_events_created": 0, | ||||||||
| "field_activities_created": 0, | ||||||||
| "samples_created": 0, | ||||||||
| "observations_created": 0, | ||||||||
| "contacts_created": 0, | ||||||||
| "contacts_reused": 0, | ||||||||
| } | ||||||||
|
|
||||||||
| gwd = self.cleaned_df.groupby(["PointID"]) | ||||||||
| for index, group in gwd: | ||||||||
| total_groups = len(gwd) | ||||||||
| for gi, (index, group) in enumerate(gwd, start=1): | ||||||||
| stats["groups_total"] += 1 | ||||||||
| pointid = index[0] | ||||||||
| thing = session.query(Thing).where(Thing.name == pointid).first() | ||||||||
| logger.info( | ||||||||
| "Processing WaterLevels group %s/%s for PointID=%s (%s rows)", | ||||||||
| gi, | ||||||||
| total_groups, | ||||||||
| pointid, | ||||||||
| len(group), | ||||||||
| ) | ||||||||
|
|
||||||||
| thing = session.query(Thing).where(Thing.name == pointid).one_or_none() | ||||||||
| if thing is None: | ||||||||
| stats["groups_skipped_missing_thing"] += 1 | ||||||||
| logger.warning( | ||||||||
| "Skipping PointID=%s because Thing was not found", pointid | ||||||||
| ) | ||||||||
| self._capture_error(pointid, "Thing not found", "PointID") | ||||||||
| continue | ||||||||
|
|
||||||||
| for i, row in enumerate(group.itertuples()): | ||||||||
| stats["rows_total"] += 1 | ||||||||
| dt_utc = self._get_dt_utc(row) | ||||||||
| if dt_utc is None: | ||||||||
| stats["rows_skipped_dt"] += 1 | ||||||||
| continue | ||||||||
|
|
||||||||
| # reasons | ||||||||
| # reasons | ||||||||
| try: | ||||||||
| glv = self._get_groundwater_level_reason(row) | ||||||||
| except KeyError as e: | ||||||||
| except (KeyError, ValueError) as e: | ||||||||
| stats["rows_skipped_reason"] += 1 | ||||||||
| logger.warning( | ||||||||
| "Skipping %s due to invalid groundwater level reason: %s", | ||||||||
| self._row_context(row), | ||||||||
| e, | ||||||||
| ) | ||||||||
| self._capture_error( | ||||||||
| row.PointID, | ||||||||
| f"invalid groundwater level reason: {e}", | ||||||||
| "LevelStatus", | ||||||||
| ) | ||||||||
| continue | ||||||||
|
|
||||||||
| release_status = "public" if row.PublicRelease else "private" | ||||||||
|
|
@@ -122,9 +180,25 @@ def _transfer_hook(self, session: Session) -> None: | |||||||
| release_status=release_status, | ||||||||
| ) | ||||||||
| session.add(field_event) | ||||||||
| stats["field_events_created"] += 1 | ||||||||
| field_event_participants = self._get_field_event_participants( | ||||||||
| session, row, thing | ||||||||
| ) | ||||||||
| stats["contacts_created"] += getattr( | ||||||||
| self, "_last_contacts_created_count", 0 | ||||||||
| ) | ||||||||
| stats["contacts_reused"] += getattr( | ||||||||
| self, "_last_contacts_reused_count", 0 | ||||||||
| ) | ||||||||
|
|
||||||||
| if not field_event_participants: | ||||||||
| stats["rows_skipped_contacts"] += 1 | ||||||||
| logger.warning( | ||||||||
| "Skipping %s because no field event participants were found", | ||||||||
| self._row_context(row), | ||||||||
| ) | ||||||||
|
||||||||
| ) | |
| ) | |
| session.expunge(field_event) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoid committing orphan field events on skipped rows
This branch skips rows with no participants, but field_event has already been added to the session earlier in the same iteration, so the later group commit still persists a standalone FieldEvent without participants/activity/sample/observation. This is now reachable for rows with missing/invalid MeasuredBy or owner fallback with no thing contacts, and it silently creates inconsistent transfer data despite logging that the row was skipped.
Useful? React with 👍 / 👎.
Copilot
AI
Feb 15, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The loop variable 'i' on line 203 shadows the outer loop variable 'i' from line 150. While this doesn't cause a bug in this case since the outer 'i' is not used after line 150, it's better practice to use a different variable name (e.g., 'participant_idx' or 'p_idx') to avoid confusion and potential bugs if the code is modified later.
Copilot
AI
Feb 15, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The base Transferer.transfer() method calls session.commit() after _transfer_hook returns (line 65 of transferer.py). Since this implementation now commits per-group inside the loop, there will be a final commit after all groups are processed. While this is not necessarily wrong, it's redundant since all changes have already been committed. Consider whether this design is intentional or if the base class commit should be bypassed for this implementation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using `.one_or_none()` here will raise `MultipleResultsFound` if more than one `Thing` shares the same `name`, and that exception is not caught in this loop, so one duplicate can abort the whole water-level transfer. The model does not enforce `Thing.name` uniqueness (see `db/thing.py`), so this is a real runtime regression from the previous `.first()` behavior unless duplicates are explicitly handled.
Useful? React with 👍 / 👎.