NO TICKET fix(importers): prevent duplicate well-name collisions in well inventory and water-level imports#633
Conversation
…orts Preserve well inventory re-import idempotency while blocking creation of a new water well when the same `Thing.name` already exists in the database. Also keep the water-level importer defensive against ambiguous well lookups so duplicate well names produce row-level validation errors instead of crashing the CLI with `MultipleResultsFound`. This reduces the risk of creating duplicate `Thing` records and makes import failures clearer for operators.
There was a problem hiding this comment.
Pull request overview
Hardens the well inventory and water-level CSV import flows against duplicate Thing.name collisions for water well records, preventing silent duplicate creation and ensuring ambiguous well lookups fail as row-level validation errors instead of crashing.
Changes:
- Well inventory import: after existing “re-import/idempotency” detection, block creation when a
water wellwith the sameThing.namealready exists elsewhere in the DB. - Water-level import: catch
MultipleResultsFoundduring well resolution and surface a row-level validation error instead of aborting. - Adds regression tests covering both the “existing DB well name” inventory case and the “duplicate name matches” water-level case.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
services/well_inventory_csv.py |
Adds DB-wide duplicate-name guard and maps the resulting row error to well_name_point_id. |
services/water_level_csv.py |
Catches MultipleResultsFound from ambiguous well-name lookups and records a row-level validation error. |
tests/test_well_inventory.py |
Adds coverage asserting inventory import fails when the well name already exists in the DB. |
tests/test_water_level_csv_service.py |
Adds coverage ensuring water-level import reports duplicate well-name matches instead of crashing. |
|
|
||
| with session_ctx() as session: | ||
| session.add(Thing(name=row["well_name_point_id"], thing_type="water well")) | ||
| session.commit() | ||
|
|
||
| file_path = tmp_path / "well-inventory-existing-db-well.csv" | ||
| with file_path.open("w", encoding="utf-8", newline="") as f: | ||
| writer = csv.DictWriter(f, fieldnames=list(row.keys())) | ||
| writer.writeheader() | ||
| writer.writerow(row) | ||
|
|
||
| result = well_inventory_csv(file_path) | ||
|
|
||
| assert result.exit_code == 1, result.stderr | ||
| errors = result.payload.get("validation_errors", []) | ||
| assert errors | ||
| assert errors[0]["field"] == "well_name_point_id" | ||
| assert ( | ||
| errors[0]["error"] | ||
| == "Well already exists in database for well_name_point_id 'TEST-0001'" | ||
| ) | ||
|
|
||
| with session_ctx() as session: | ||
| things = ( | ||
| session.query(Thing) | ||
| .filter( | ||
| Thing.name == row["well_name_point_id"], | ||
| Thing.thing_type == "water well", | ||
| ) | ||
| .all() | ||
| ) | ||
| assert len(things) == 1 |
There was a problem hiding this comment.
This test inserts a Thing into the shared session-scoped test database but never deletes it. Because the test suite shares DB state across tests (see tests/conftest.py session fixture), leaving this row behind can make other well-inventory tests flaky by causing unexpected name collisions. Clean up the created Thing (e.g., capture its id and delete it in a finally, or use a fixture that yields and deletes).
| with session_ctx() as session: | |
| session.add(Thing(name=row["well_name_point_id"], thing_type="water well")) | |
| session.commit() | |
| file_path = tmp_path / "well-inventory-existing-db-well.csv" | |
| with file_path.open("w", encoding="utf-8", newline="") as f: | |
| writer = csv.DictWriter(f, fieldnames=list(row.keys())) | |
| writer.writeheader() | |
| writer.writerow(row) | |
| result = well_inventory_csv(file_path) | |
| assert result.exit_code == 1, result.stderr | |
| errors = result.payload.get("validation_errors", []) | |
| assert errors | |
| assert errors[0]["field"] == "well_name_point_id" | |
| assert ( | |
| errors[0]["error"] | |
| == "Well already exists in database for well_name_point_id 'TEST-0001'" | |
| ) | |
| with session_ctx() as session: | |
| things = ( | |
| session.query(Thing) | |
| .filter( | |
| Thing.name == row["well_name_point_id"], | |
| Thing.thing_type == "water well", | |
| ) | |
| .all() | |
| ) | |
| assert len(things) == 1 | |
| thing_name = row["well_name_point_id"] | |
| try: | |
| with session_ctx() as session: | |
| session.add(Thing(name=thing_name, thing_type="water well")) | |
| session.commit() | |
| file_path = tmp_path / "well-inventory-existing-db-well.csv" | |
| with file_path.open("w", encoding="utf-8", newline="") as f: | |
| writer = csv.DictWriter(f, fieldnames=list(row.keys())) | |
| writer.writeheader() | |
| writer.writerow(row) | |
| result = well_inventory_csv(file_path) | |
| assert result.exit_code == 1, result.stderr | |
| errors = result.payload.get("validation_errors", []) | |
| assert errors | |
| assert errors[0]["field"] == "well_name_point_id" | |
| assert ( | |
| errors[0]["error"] | |
| == "Well already exists in database for well_name_point_id 'TEST-0001'" | |
| ) | |
| with session_ctx() as session: | |
| things = ( | |
| session.query(Thing) | |
| .filter( | |
| Thing.name == thing_name, | |
| Thing.thing_type == "water well", | |
| ) | |
| .all() | |
| ) | |
| assert len(things) == 1 | |
| finally: | |
| # Clean up the Thing created for this test to avoid leaking state | |
| with session_ctx() as session: | |
| ( | |
| session.query(Thing) | |
| .filter( | |
| Thing.name == thing_name, | |
| Thing.thing_type == "water well", | |
| ) | |
| .delete(synchronize_session=False) | |
| ) | |
| session.commit() |
| if error_text.startswith( | ||
| "Well already exists in database for well_name_point_id " | ||
| ): | ||
| field = "well_name_point_id" | ||
| else: | ||
| field = _extract_field_from_value_error(error_text) |
There was a problem hiding this comment.
Mapping the validation field by checking error_text.startswith(...) is brittle and couples control flow to an exact error-message string. Consider raising a structured/typed exception (or returning a structured error) that carries the target field explicitly, so future message wording changes don’t silently regress field attribution.
| if error_text.startswith( | |
| "Well already exists in database for well_name_point_id " | |
| ): | |
| field = "well_name_point_id" | |
| else: | |
| field = _extract_field_from_value_error(error_text) | |
| field = _extract_field_from_value_error(error_text) |
Summary
This PR hardens the CSV importers around duplicate water well names.
It updates the well inventory importer so it:
water wellwith the sameThing.namealready exists elsewhere in the databaseThingrecordsIt also keeps the water-level importer defensive so ambiguous well lookups no longer crash the CLI and instead return a row-level validation error.
Why
This PR addresses the following problem / context:
I found that duplicate
Thing.namerecords forwater wellcould exist in the database, and at least one duplicate appears to have been introduced by the well inventory import path.That caused two issues:
Thingwith the same well nameMultipleResultsFoundwhen trying to resolve a well by nameHow
Implementation summary - the following was changed / added / removed:
well_name_point_idvalidation error when the well already existsWhy This Approach
The well inventory importer still needs to support safe reruns of the same import, so the existing
_find_existing_imported_well()logic is evaluated first.Only if the row does not match a prior import do we check whether a well with the same name already exists in the database and block creation.
That gives us:
ThingcreationNotes
Any special considerations, workarounds, or follow-up work to note?
Thing.namerows already in the database. Any pre-existing duplicates should be identified and resolved separately.