Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion services/water_level_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
WATER_LEVEL_IGNORED_FIELDS,
)
from sqlalchemy import select
from sqlalchemy.exc import MultipleResultsFound
from sqlalchemy.orm import Session, selectinload

REQUIRED_FIELDS: List[str] = list(WATER_LEVEL_REQUIRED_FIELDS)
Expand Down Expand Up @@ -259,7 +260,14 @@ def _validate_rows(
Thing.thing_type == "water well",
)
)
well = session.scalars(sql).one_or_none()
try:
well = session.scalars(sql).one_or_none()
except MultipleResultsFound:
errors.append(
f"Row {idx}: Multiple wells found for well_name_point_id "
f"'{well_name}'"
)
continue
if well is None:
errors.append(f"Row {idx}: Unknown well_name_point_id '{well_name}'")
continue
Expand Down
20 changes: 19 additions & 1 deletion services/well_inventory_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,12 @@ class dialect:
field = "Database error"
else:
error_text = str(e)
field = _extract_field_from_value_error(error_text)
if error_text.startswith(
"Well already exists in database for well_name_point_id "
):
field = "well_name_point_id"
else:
field = _extract_field_from_value_error(error_text)
Comment on lines +283 to +288
Copy link

Copilot AI Apr 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mapping the validation field by checking error_text.startswith(...) is brittle and couples control flow to an exact error-message string. Consider raising a structured/typed exception (or returning a structured error) that carries the target field explicitly, so future message wording changes don’t silently regress field attribution.

Suggested change
if error_text.startswith(
"Well already exists in database for well_name_point_id "
):
field = "well_name_point_id"
else:
field = _extract_field_from_value_error(error_text)
field = _extract_field_from_value_error(error_text)

Copilot uses AI. Check for mistakes.

logging.error(
f"Error while importing row {row_number} ('{current_row_id}'): {error_text}"
Expand Down Expand Up @@ -666,6 +671,19 @@ def _add_csv_row(session: Session, group: Group, model: WellInventoryRow, user)
if existing_well is not None:
return existing_well.name

existing_named_well = session.scalars(
select(Thing)
.where(
Thing.name == model.well_name_point_id,
Thing.thing_type == "water well",
)
.order_by(Thing.id.asc())
).first()
if existing_named_well is not None:
raise ValueError(
f"Well already exists in database for well_name_point_id '{model.well_name_point_id}'"
)

# --------------------
# Location and associated tables
# --------------------
Expand Down
62 changes: 62 additions & 0 deletions tests/test_water_level_csv_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,3 +463,65 @@ def test_bulk_upload_water_levels_imports_valid_rows_when_other_rows_fail(
"Unknown well_name_point_id 'Unknown Well'" in message
for message in result.payload["validation_errors"]
)


def test_bulk_upload_water_levels_reports_duplicate_well_name_matches():
    """Ambiguous well names must be reported as a row-level validation error.

    Seeds two 'water well' Things sharing one name, uploads a single-row CSV
    referencing that name, and expects the row to be rejected with a
    "Multiple wells found" message. The seeded wells are always removed
    afterwards so the shared test database is left unchanged.
    """
    # Seed two wells with an identical name so the lookup becomes ambiguous.
    with session_ctx() as session:
        first = Thing(name="Duplicate Well", thing_type="water well")
        second = Thing(name="Duplicate Well", thing_type="water well")
        session.add_all([first, second])
        session.commit()
        created_ids = (first.id, second.id)

    header = [
        "field_staff",
        "well_name_point_id",
        "field_event_date_time",
        "measurement_date_time",
        "sampler",
        "sample_method",
        "mp_height",
        "level_status",
        "depth_to_water_ft",
        "data_quality",
        "water_level_notes",
    ]
    data_row = [
        "A Lopez",
        "Duplicate Well",
        "2025-02-15T08:00:00-07:00",
        "2025-02-15T10:30:00-07:00",
        "A Lopez",
        "electric tape",
        "1.5",
        "Water level not affected",
        "7.0",
        "Water level accurate to within two hundreths of a foot",
        "Initial measurement",
    ]
    csv_content = "\n".join([",".join(header), ",".join(data_row)])

    try:
        result = bulk_upload_water_levels(csv_content.encode("utf-8"))

        assert result.exit_code == 1
        summary = result.payload["summary"]
        assert summary["total_rows_processed"] == 1
        assert summary["total_rows_imported"] == 0
        assert result.payload["validation_errors"] == [
            "Row 1: Multiple wells found for well_name_point_id 'Duplicate Well'"
        ]
    finally:
        # Remove the seeded wells so later tests don't collide on the name.
        with session_ctx() as session:
            for thing_id in created_ids:
                thing = session.get(Thing, thing_id)
                if thing is not None:
                    session.delete(thing)
            session.commit()
36 changes: 36 additions & 0 deletions tests/test_well_inventory.py
Original file line number Diff line number Diff line change
Expand Up @@ -833,6 +833,42 @@ def test_upload_duplicate_well_ids(self):
errors = result.payload.get("validation_errors", [])
assert any("Duplicate" in str(e) for e in errors)

def test_upload_fails_when_well_name_already_exists_in_database(self, tmp_path):
    """Upload fails when a water well with the same Thing.name already exists.

    Seeds a 'water well' Thing whose name matches the CSV row's
    well_name_point_id, uploads the CSV, and expects a validation error
    attributed to the well_name_point_id field. The seeded Thing is deleted
    in a finally block so the shared test database is not polluted — leaving
    it behind would cause name collisions in other well-inventory tests.
    """
    row = _minimal_valid_well_inventory_row()
    well_name = row["well_name_point_id"]

    try:
        # Pre-create the conflicting well so the import must reject the row.
        with session_ctx() as session:
            session.add(Thing(name=well_name, thing_type="water well"))
            session.commit()

        file_path = tmp_path / "well-inventory-existing-db-well.csv"
        with file_path.open("w", encoding="utf-8", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=list(row.keys()))
            writer.writeheader()
            writer.writerow(row)

        result = well_inventory_csv(file_path)

        assert result.exit_code == 1, result.stderr
        errors = result.payload.get("validation_errors", [])
        assert errors
        assert errors[0]["field"] == "well_name_point_id"
        assert (
            errors[0]["error"]
            == "Well already exists in database for well_name_point_id 'TEST-0001'"
        )

        # The failed import must not have created a second well.
        with session_ctx() as session:
            things = (
                session.query(Thing)
                .filter(
                    Thing.name == well_name,
                    Thing.thing_type == "water well",
                )
                .all()
            )
            assert len(things) == 1
    finally:
        # Clean up the seeded Thing regardless of assertion outcome.
        with session_ctx() as session:
            (
                session.query(Thing)
                .filter(
                    Thing.name == well_name,
                    Thing.thing_type == "water well",
                )
                .delete(synchronize_session=False)
            )
            session.commit()
Comment on lines +839 to +870
Copy link

Copilot AI Apr 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test inserts a Thing into the shared session-scoped test database but never deletes it. Because the test suite shares DB state across tests (see tests/conftest.py session fixture), leaving this row behind can make other well-inventory tests flaky by causing unexpected name collisions. Clean up the created Thing (e.g., capture its id and delete it in a finally, or use a fixture that yields and deletes).

Suggested change
with session_ctx() as session:
session.add(Thing(name=row["well_name_point_id"], thing_type="water well"))
session.commit()
file_path = tmp_path / "well-inventory-existing-db-well.csv"
with file_path.open("w", encoding="utf-8", newline="") as f:
writer = csv.DictWriter(f, fieldnames=list(row.keys()))
writer.writeheader()
writer.writerow(row)
result = well_inventory_csv(file_path)
assert result.exit_code == 1, result.stderr
errors = result.payload.get("validation_errors", [])
assert errors
assert errors[0]["field"] == "well_name_point_id"
assert (
errors[0]["error"]
== "Well already exists in database for well_name_point_id 'TEST-0001'"
)
with session_ctx() as session:
things = (
session.query(Thing)
.filter(
Thing.name == row["well_name_point_id"],
Thing.thing_type == "water well",
)
.all()
)
assert len(things) == 1
thing_name = row["well_name_point_id"]
try:
with session_ctx() as session:
session.add(Thing(name=thing_name, thing_type="water well"))
session.commit()
file_path = tmp_path / "well-inventory-existing-db-well.csv"
with file_path.open("w", encoding="utf-8", newline="") as f:
writer = csv.DictWriter(f, fieldnames=list(row.keys()))
writer.writeheader()
writer.writerow(row)
result = well_inventory_csv(file_path)
assert result.exit_code == 1, result.stderr
errors = result.payload.get("validation_errors", [])
assert errors
assert errors[0]["field"] == "well_name_point_id"
assert (
errors[0]["error"]
== "Well already exists in database for well_name_point_id 'TEST-0001'"
)
with session_ctx() as session:
things = (
session.query(Thing)
.filter(
Thing.name == thing_name,
Thing.thing_type == "water well",
)
.all()
)
assert len(things) == 1
finally:
# Clean up the Thing created for this test to avoid leaking state
with session_ctx() as session:
(
session.query(Thing)
.filter(
Thing.name == thing_name,
Thing.thing_type == "water well",
)
.delete(synchronize_session=False)
)
session.commit()

Copilot uses AI. Check for mistakes.

def test_upload_blank_well_name_point_id_autogenerates(self, tmp_path):
"""Upload succeeds when well_name_point_id is blank and auto-generates IDs."""
source_path = Path("tests/features/data/well-inventory-valid.csv")
Expand Down
Loading