-
Notifications
You must be signed in to change notification settings - Fork 4
transfer-fix #501
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
transfer-fix #501
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -114,7 +114,8 @@ def _transfer_hook(self, session: Session) -> None: | |
| """ | ||
| Override transfer hook to use batch upsert for idempotent transfers. | ||
|
|
||
| Uses ON CONFLICT DO UPDATE on nma_GlobalID (the legacy UUID PK, now UNIQUE). | ||
| Uses ON CONFLICT DO UPDATE on (chemistry_sample_info_id, analyte), | ||
| matching uq_minor_trace_chemistry_sample_analyte. | ||
| """ | ||
| df = self.cleaned_df | ||
|
|
||
|
|
@@ -129,8 +130,12 @@ def _transfer_hook(self, session: Session) -> None: | |
| logger.warning("No valid rows to transfer") | ||
| return | ||
|
|
||
| # Dedupe by nma_GlobalID to avoid PK conflicts. | ||
| rows = self._dedupe_rows(row_dicts) | ||
| # Dedupe by the same logical key used by the table unique constraint. | ||
| rows = self._dedupe_rows( | ||
| row_dicts, | ||
| key=["chemistry_sample_info_id", "analyte"], | ||
| include_missing=True, | ||
| ) | ||
| logger.info(f"Upserting {len(rows)} MinorTraceChemistry records") | ||
|
|
||
| insert_stmt = insert(NMA_MinorTraceChemistry) | ||
|
|
@@ -139,9 +144,9 @@ def _transfer_hook(self, session: Session) -> None: | |
| for i in range(0, len(rows), self.batch_size): | ||
| chunk = rows[i : i + self.batch_size] | ||
| logger.info(f"Upserting batch {i}-{i+len(chunk)-1} ({len(chunk)} rows)") | ||
| # Upsert on nma_GlobalID (legacy UUID PK, now UNIQUE) | ||
| # Upsert on unique logical key (chemistry_sample_info_id, analyte) | ||
| stmt = insert_stmt.values(chunk).on_conflict_do_update( | ||
| index_elements=["nma_GlobalID"], | ||
| index_elements=["chemistry_sample_info_id", "analyte"], | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Switching Useful? React with 👍 / 👎.
Comment on lines
+147
to
+149
|
||
| set_={ | ||
| "chemistry_sample_info_id": excluded.chemistry_sample_info_id, | ||
| "nma_chemistry_sample_info_uuid": excluded.nma_chemistry_sample_info_uuid, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
analyteis nullable onNMA_MinorTraceChemistry, but the new dedupe/upsert key includes it andinclude_missing=Truekeeps rows whereanalyteis NULL. In Postgres,ON CONFLICT (chemistry_sample_info_id, analyte)will not match existing rows whenanalyteis NULL, so re-running the transfer will keep inserting additional NULL-analyte rows (non-idempotent growth). Consider treating missinganalyteas an error/skip (capture it in_row_to_dict), or use a different conflict key/fallback path (e.g., upsert onnma_GlobalIDwhenanalyteis missing), or enforceanalyteas NOT NULL in the schema if that’s valid for the dataset.