Conversation
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 35b15e4af5
```python
        field_event_participants.append(contact)
        self._last_contacts_reused_count += 1
    else:
        owner_contact_id = self._owner_contact_id_by_pointid.get(row.PointID)
```
Avoid static owner-contact lookup during parallel transfer
This lookup now depends on self._owner_contact_id_by_pointid, which is built once in __init__ and never refreshed, so owner fallbacks can become stale while the transfer is running. In this repo, Contacts and WaterLevels are launched together in parallel (transfers/transfer.py lines 509-513), so if contact/association rows are committed after WaterLevels starts, owner-measured rows will be treated as missing contacts and skipped even though the owner association exists later in the same run.
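One way to avoid the staleness is a read-through cache that falls back to a live query on a miss instead of concluding the owner association is absent. This is a minimal sketch, not the repo's code: `fetch_owner_contact_id` is a hypothetical stand-in for the real session query.

```python
class OwnerContactCache:
    """Read-through cache: a miss triggers a live lookup rather than a skip."""

    def __init__(self, fetch_owner_contact_id):
        self._fetch = fetch_owner_contact_id  # hypothetical live-query callable
        self._cache = {}

    def get(self, point_id):
        if point_id not in self._cache:
            contact_id = self._fetch(point_id)
            if contact_id is None:
                # Do not cache negative results: the owner association may be
                # committed later in the same run by the parallel Contacts transfer.
                return None
            self._cache[point_id] = contact_id
        return self._cache[point_id]
```

Positive results are cached so the hot path keeps its single-dict-lookup cost; only misses pay for a query.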
```python
self._thing_id_by_pointid = {
    name: thing_id
    for name, thing_id in session.query(Thing.name, Thing.id).all()
}
```
Detect duplicate Thing names before caching IDs
Building self._thing_id_by_pointid with a dict silently overwrites earlier rows when multiple Thing records share the same name; unlike the previous per-group one_or_none() query, this no longer fails fast on ambiguous matches and can attach all water-level rows for that PointID to an arbitrary Thing. Since Thing.name is not unique in the model, this is a real data-integrity regression whenever duplicate names exist.
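A sketch of restoring the fail-fast behavior while keeping the single upfront query. `rows` stands in for the result of `session.query(Thing.name, Thing.id).all()`:

```python
def build_thing_id_by_pointid(rows):
    """Build the name -> id cache, raising on ambiguous Thing names."""
    thing_id_by_pointid = {}
    duplicates = set()
    for name, thing_id in rows:
        if name in thing_id_by_pointid:
            duplicates.add(name)
        thing_id_by_pointid[name] = thing_id
    if duplicates:
        # Mirror the old per-group one_or_none() failure instead of silently
        # attaching water-level rows to an arbitrary Thing.
        raise ValueError(f"Duplicate Thing names: {sorted(duplicates)}")
    return thing_id_by_pointid
```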
Pull request overview
This PR updates the WaterLevels transfer to reduce per-row ORM overhead by caching lookup data and using batched inserts for FieldEvent → FieldActivity → Sample → Observation creation.
Changes:
- Add in-memory caches for `Thing` IDs (by PointID) and fallback "owner" contact IDs to avoid repeated DB queries.
- Replace per-row ORM object creation with batched `INSERT ... RETURNING` flows to create FieldEvents/Participants/Activities/Samples/Observations more efficiently.
- Refactor observation construction by extracting shared parsing into `_get_observation_parts` and adding an insert-row builder.
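The batched `INSERT ... RETURNING` pattern can be sketched with plain SQLite (the real code goes through the ORM session; the `field_event` table and column here are illustrative, and `RETURNING` requires SQLite 3.35+):

```python
import sqlite3

# Illustrative schema; the real FieldEvent table lives in the ORM model.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE field_event (id INTEGER PRIMARY KEY, point_id TEXT)")

def insert_field_events(conn, point_ids):
    # One multi-row INSERT ... RETURNING instead of one round trip per row.
    placeholders = ", ".join("(?)" for _ in point_ids)
    cur = conn.execute(
        f"INSERT INTO field_event (point_id) VALUES {placeholders} RETURNING id",
        list(point_ids),
    )
    ids = [row[0] for row in cur.fetchall()]
    conn.commit()
    return ids
```

The returned IDs are then fed into the child inserts (Participants, Activities, and so on), which is what lets the transfer avoid per-row flushes.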
```python
def _get_observation_parts(
    self, row: pd.Series
) -> tuple[float | None, float | None, str | None]:
    if pd.isna(row.MPHeight):
        if pd.notna(row.DepthToWater) and pd.notna(row.DepthToWaterBGS):
            logger.warning(
```
_get_observation_parts can compute measuring_point_height = None (e.g., when MPHeight and DepthToWater are NULL). Later in this helper the code adds DepthToWaterBGS + measuring_point_height, which will raise a TypeError when DepthToWaterBGS is present but measuring_point_height is None. Consider treating missing MP height as 0 for that calculation (as the log message suggests), or guard the addition when measuring_point_height is None.
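A sketch of the guarded arithmetic, treating a missing MP height as 0 as the log message suggests. `derive_depth_to_water` is a hypothetical reconstruction of just the problematic step, not the repo's helper; column semantics follow the source names.

```python
import math

def derive_depth_to_water(depth_to_water_bgs, measuring_point_height):
    if depth_to_water_bgs is None or math.isnan(depth_to_water_bgs):
        return None
    # Guard: measuring_point_height can be None when both MPHeight and
    # DepthToWater are NULL; default it to 0 instead of raising TypeError.
    mp = measuring_point_height if measuring_point_height is not None else 0.0
    return depth_to_water_bgs + mp
```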
```python
session.commit()
session.expunge_all()
stats["groups_processed"] += 1
```
Contacts created during a group are cached in self._created_contacts before the group transaction is committed. If any of the subsequent batch inserts fail and the transaction is rolled back, those cached Contact objects/IDs may not exist in the DB anymore, but later rows/groups can still reuse them and attempt to insert FieldEventParticipant rows referencing missing contact_id values (FK failures or confusing errors). Consider only adding contacts to the cache after a successful commit, or tracking which contacts were created in the current group and removing them from the cache on rollback.
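The "promote to the shared cache only after commit" idea can be sketched as a two-level cache. The session/commit wiring is hypothetical; the pattern is the point.

```python
class ContactCache:
    """Contacts created in the current group stay pending until commit."""

    def __init__(self):
        self._created_contacts = {}  # committed, safe to reuse across groups
        self._pending = {}           # created in the current (uncommitted) group

    def add(self, key, contact_id):
        self._pending[key] = contact_id

    def get(self, key):
        # Pending contacts are visible within the current group only.
        return self._pending.get(key, self._created_contacts.get(key))

    def on_commit(self):
        self._created_contacts.update(self._pending)
        self._pending.clear()

    def on_rollback(self):
        # Rolled-back contacts no longer exist in the DB; drop them so later
        # groups don't insert FieldEventParticipant rows with missing contact_id.
        self._pending.clear()
```

The transfer would call `on_commit()` right after `session.commit()` succeeds and `on_rollback()` in the exception path.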
```python
def _get_observation_parts(
    self, row: pd.Series
) -> tuple[float | None, float | None, str | None]:
    if pd.isna(row.MPHeight):
        if pd.notna(row.DepthToWater) and pd.notna(row.DepthToWaterBGS):
            logger.warning(
```
This helper now centralizes the value/measuring_point_height derivation and has a couple of tricky NULL combinations (e.g., MPHeight NULL + DepthToWater NULL + DepthToWaterBGS present). There are existing unit tests for _make_observation, but none that cover these edge cases in _get_observation_parts; adding a focused test here would help prevent regressions and catch the TypeError scenario.
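The suggested edge-case test might look like the following. `get_observation_parts` here is a local stand-in with the guarded semantics (missing MP height treated as 0); a real test would import the transfer's `_get_observation_parts` instead.

```python
import unittest

def get_observation_parts(mp_height, depth_to_water, depth_to_water_bgs):
    # Stand-in for the helper: prefer DepthToWater, else derive from BGS depth.
    value = depth_to_water
    if value is None and depth_to_water_bgs is not None:
        value = depth_to_water_bgs + (mp_height if mp_height is not None else 0.0)
    return value, mp_height

class GetObservationPartsEdgeCases(unittest.TestCase):
    def test_mpheight_null_dtw_null_bgs_present(self):
        # The combination that previously raised TypeError.
        value, mp = get_observation_parts(None, None, 7.0)
        self.assertEqual(value, 7.0)
        self.assertIsNone(mp)

    def test_all_null(self):
        value, mp = get_observation_parts(None, None, None)
        self.assertIsNone(value)
```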