From 7f83e4c83121fce58eb0d81ade65aeb780217a9b Mon Sep 17 00:00:00 2001 From: Kelsey Smuczynski Date: Wed, 1 Apr 2026 14:46:10 -0600 Subject: [PATCH] fix(importers): prevent duplicate well-name collisions during CSV imports Preserve well inventory re-import idempotency while blocking creation of a new water well when the same `Thing.name` already exists in the database. Also keep the water-level importer defensive against ambiguous well lookups so duplicate well names produce row-level validation errors instead of crashing the CLI with `MultipleResultsFound`. This reduces the risk of creating duplicate `Thing` records and makes import failures clearer for operators. --- services/water_level_csv.py | 10 ++++- services/well_inventory_csv.py | 20 ++++++++- tests/test_water_level_csv_service.py | 62 +++++++++++++++++++++++++++ tests/test_well_inventory.py | 36 ++++++++++++++++ 4 files changed, 126 insertions(+), 2 deletions(-) diff --git a/services/water_level_csv.py b/services/water_level_csv.py index 895e4099..7ae651ca 100644 --- a/services/water_level_csv.py +++ b/services/water_level_csv.py @@ -34,6 +34,7 @@ WATER_LEVEL_IGNORED_FIELDS, ) from sqlalchemy import select +from sqlalchemy.exc import MultipleResultsFound from sqlalchemy.orm import Session, selectinload REQUIRED_FIELDS: List[str] = list(WATER_LEVEL_REQUIRED_FIELDS) @@ -259,7 +260,14 @@ def _validate_rows( Thing.thing_type == "water well", ) ) - well = session.scalars(sql).one_or_none() + try: + well = session.scalars(sql).one_or_none() + except MultipleResultsFound: + errors.append( + f"Row {idx}: Multiple wells found for well_name_point_id " + f"'{well_name}'" + ) + continue if well is None: errors.append(f"Row {idx}: Unknown well_name_point_id '{well_name}'") continue diff --git a/services/well_inventory_csv.py b/services/well_inventory_csv.py index 2a3dcc2f..1289658f 100644 --- a/services/well_inventory_csv.py +++ b/services/well_inventory_csv.py @@ -280,7 +280,12 @@ class dialect: field = "Database error" else: error_text = str(e) - field = _extract_field_from_value_error(error_text) + if error_text.startswith( + "Well already exists in database for well_name_point_id " + ): + field = "well_name_point_id" + else: + field = _extract_field_from_value_error(error_text) logging.error( f"Error while importing row {row_number} ('{current_row_id}'): {error_text}" @@ -666,6 +671,19 @@ def _add_csv_row(session: Session, group: Group, model: WellInventoryRow, user) if existing_well is not None: return existing_well.name + existing_named_well = session.scalars( + select(Thing) + .where( + Thing.name == model.well_name_point_id, + Thing.thing_type == "water well", + ) + .order_by(Thing.id.asc()) + ).first() + if existing_named_well is not None: + raise ValueError( + f"Well already exists in database for well_name_point_id '{model.well_name_point_id}'" + ) + # -------------------- # Location and associated tables # -------------------- diff --git a/tests/test_water_level_csv_service.py b/tests/test_water_level_csv_service.py index 64c145bd..e4b01d9c 100644 --- a/tests/test_water_level_csv_service.py +++ b/tests/test_water_level_csv_service.py @@ -463,3 +463,65 @@ def test_bulk_upload_water_levels_imports_valid_rows_when_other_rows_fail( "Unknown well_name_point_id 'Unknown Well'" in message for message in result.payload["validation_errors"] ) + + +def test_bulk_upload_water_levels_reports_duplicate_well_name_matches(): + with session_ctx() as session: + well_one = Thing(name="Duplicate Well", thing_type="water well") + well_two = Thing(name="Duplicate Well", thing_type="water well") + session.add_all([well_one, well_two]) + session.commit() + well_one_id = well_one.id + well_two_id = well_two.id + + csv_content = "\n".join( + [ + ",".join( + [ + "field_staff", + "well_name_point_id", + "field_event_date_time", + "measurement_date_time", + "sampler", + "sample_method", + "mp_height", + "level_status", + "depth_to_water_ft", + "data_quality", + "water_level_notes", + ] + ), + ",".join( + [ + "A Lopez", + "Duplicate Well", + "2025-02-15T08:00:00-07:00", + "2025-02-15T10:30:00-07:00", + "A Lopez", + "electric tape", + "1.5", + "Water level not affected", + "7.0", + "Water level accurate to within two hundreths of a foot", + "Initial measurement", + ] + ), + ] + ) + + try: + result = bulk_upload_water_levels(csv_content.encode("utf-8")) + + assert result.exit_code == 1 + assert result.payload["summary"]["total_rows_processed"] == 1 + assert result.payload["summary"]["total_rows_imported"] == 0 + assert result.payload["validation_errors"] == [ + "Row 1: Multiple wells found for well_name_point_id 'Duplicate Well'" + ] + finally: + with session_ctx() as session: + for well_id in (well_one_id, well_two_id): + well = session.get(Thing, well_id) + if well is not None: + session.delete(well) + session.commit() diff --git a/tests/test_well_inventory.py b/tests/test_well_inventory.py index e3a5f45a..4e3be31b 100644 --- a/tests/test_well_inventory.py +++ b/tests/test_well_inventory.py @@ -833,6 +833,42 @@ def test_upload_duplicate_well_ids(self): errors = result.payload.get("validation_errors", []) assert any("Duplicate" in str(e) for e in errors) + def test_upload_fails_when_well_name_already_exists_in_database(self, tmp_path): + """Upload fails when a water well with the same Thing.name already exists.""" + row = _minimal_valid_well_inventory_row() + + with session_ctx() as session: + session.add(Thing(name=row["well_name_point_id"], thing_type="water well")) + session.commit() + + file_path = tmp_path / "well-inventory-existing-db-well.csv" + with file_path.open("w", encoding="utf-8", newline="") as f: + writer = csv.DictWriter(f, fieldnames=list(row.keys())) + writer.writeheader() + writer.writerow(row) + + result = well_inventory_csv(file_path) + + assert result.exit_code == 1, result.stderr + errors = result.payload.get("validation_errors", []) + assert errors + assert errors[0]["field"] == "well_name_point_id" + assert ( + errors[0]["error"] + == "Well already exists in database for well_name_point_id 'TEST-0001'" + ) + + with session_ctx() as session: + things = ( + session.query(Thing) + .filter( + Thing.name == row["well_name_point_id"], + Thing.thing_type == "water well", + ) + .all() + ) + assert len(things) == 1 + def test_upload_blank_well_name_point_id_autogenerates(self, tmp_path): """Upload succeeds when well_name_point_id is blank and auto-generates IDs.""" source_path = Path("tests/features/data/well-inventory-valid.csv")