From 53fee18538d3cfa1cc821d850afbe45c32e83f86 Mon Sep 17 00:00:00 2001
From: Kelsey Smuczynski
Date: Thu, 26 Mar 2026 12:46:22 -0600
Subject: [PATCH 01/20] feat(water-level-import): normalize standalone CSV schema

Introduce a dedicated schema for the standalone water-level importer and
move header/value normalization into the schema layer.

Changes:
- add WaterLevelCsvRow to schemas/water_level_csv.py
- support source CSV headers for field_staff_2, field_staff_3,
  water_level_date_time, and measuring_person
- preserve compatibility aliases for measurement_date_time, sampler, and
  mp_height_ft
- normalize blank optional values to None
- normalize naive water-level datetimes from America/Denver to UTC
- canonicalize sample method aliases such as electric tape and steel tape
- ignore source-only columns hold(not saved) and cut(not saved)
- wire services/water_level_csv.py to use the shared schema
- add focused schema tests for aliasing and datetime normalization

Impact:
- the standalone importer now understands the new source CSV shape without
  changing persistence behavior yet
- datetime handling is explicit and consistent instead of relying on mixed
  input formats downstream
- later stages can add validation and persistence rules on top of a stable,
  reusable normalization layer
---
 schemas/water_level_csv.py           | 141 +++++++++++++++++++++-
 services/water_level_csv.py          | 173 +++++++--------------------
 tests/test_water_level_csv_schema.py |  72 +++++++++++
 3 files changed, 256 insertions(+), 130 deletions(-)
 create mode 100644 tests/test_water_level_csv_schema.py

diff --git a/schemas/water_level_csv.py b/schemas/water_level_csv.py
index 00d71eaf4..572ad86c2 100644
--- a/schemas/water_level_csv.py
+++ b/schemas/water_level_csv.py
@@ -13,7 +13,142 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
# =============================================================================== -from pydantic import BaseModel +from __future__ import annotations + +from datetime import datetime, timezone +from typing import Annotated + +from pydantic import ( + AliasChoices, + BaseModel, + ConfigDict, + Field, + field_validator, +) +from pydantic.functional_validators import BeforeValidator + +from services.util import convert_dt_tz_naive_to_tz_aware + +WATER_LEVEL_REQUIRED_FIELDS = [ + "well_name_point_id", + "field_event_date_time", + "field_staff", + "water_level_date_time", + "measuring_person", + "sample_method", +] + +WATER_LEVEL_HEADER_ALIASES = { + "measurement_date_time": "water_level_date_time", + "sampler": "measuring_person", + "mp_height_ft": "mp_height", +} + +WATER_LEVEL_IGNORED_FIELDS = { + "hold(not saved)", + "cut(not saved)", +} + +SAMPLE_METHOD_ALIASES = { + "electric tape": "Electric tape measurement (E-probe)", + "steel tape": "Steel-tape measurement", +} +SAMPLE_METHOD_CANONICAL = { + value.lower(): value for value in SAMPLE_METHOD_ALIASES.values() +} + + +def empty_str_to_none(value): + if isinstance(value, str) and value.strip() == "": + return None + return value + + +OptionalText = Annotated[str | None, BeforeValidator(empty_str_to_none)] +OptionalFloat = Annotated[float | None, BeforeValidator(empty_str_to_none)] + + +def _normalize_datetime_to_utc(value: datetime | str) -> datetime: + if isinstance(value, str): + value = datetime.fromisoformat(value) + elif not isinstance(value, datetime): + raise ValueError("value must be a datetime or ISO format string") + + if value.tzinfo is None: + value = convert_dt_tz_naive_to_tz_aware(value, "America/Denver") + + return value.astimezone(timezone.utc) + + +class WaterLevelCsvRow(BaseModel): + model_config = ConfigDict(extra="ignore", str_strip_whitespace=True) + + well_name_point_id: str + field_event_date_time: datetime + field_staff: str + field_staff_2: OptionalText = None + field_staff_3: OptionalText = None + water_level_date_time: datetime = Field( + validation_alias=AliasChoices( + "water_level_date_time", + "measurement_date_time", + ) + ) + measuring_person: str = Field( + validation_alias=AliasChoices("measuring_person", "sampler") + ) + sample_method: str + mp_height: OptionalFloat = Field( + default=None, + validation_alias=AliasChoices("mp_height", "mp_height_ft"), + ) + level_status: OptionalText = None + depth_to_water_ft: OptionalFloat = None + data_quality: OptionalText = None + water_level_notes: OptionalText = None + + @property + def measurement_date_time(self) -> datetime: + return self.water_level_date_time + + @property + def sampler(self) -> str: + return self.measuring_person + + @classmethod + def required_fields(cls) -> list[str]: + return list(WATER_LEVEL_REQUIRED_FIELDS) + + @classmethod + def header_aliases(cls) -> dict[str, str]: + return dict(WATER_LEVEL_HEADER_ALIASES) + + @classmethod + def ignored_fields(cls) -> set[str]: + return set(WATER_LEVEL_IGNORED_FIELDS) + + @staticmethod + def canonicalize_sample_method(value: str) -> str: + normalized = value.strip().lower() + if normalized in SAMPLE_METHOD_ALIASES: + return SAMPLE_METHOD_ALIASES[normalized] + if normalized in SAMPLE_METHOD_CANONICAL: + return SAMPLE_METHOD_CANONICAL[normalized] + return value.strip() + + @field_validator("sample_method") + @classmethod + def normalize_sample_method(cls, value: str) -> str: + return cls.canonicalize_sample_method(value) + + @field_validator( + "field_event_date_time", + "water_level_date_time", + 
mode="before", + ) + @classmethod + def normalize_datetime_field(cls, value: datetime | str) -> datetime: + return _normalize_datetime_to_utc(value) class WaterLevelBulkUploadSummary(BaseModel): @@ -29,8 +164,8 @@ class WaterLevelBulkUploadRow(BaseModel): sample_id: int observation_id: int measurement_date_time: str - level_status: str - data_quality: str + level_status: str | None + data_quality: str | None class WaterLevelBulkUploadResponse(BaseModel): diff --git a/services/water_level_csv.py b/services/water_level_csv.py index f695fcd14..e8a743174 100644 --- a/services/water_level_csv.py +++ b/services/water_level_csv.py @@ -27,43 +27,19 @@ from db import Thing, FieldEvent, FieldActivity, Sample, Observation, Parameter from db.engine import session_ctx -from pydantic import BaseModel, ConfigDict, ValidationError, field_validator +from pydantic import ValidationError +from schemas.water_level_csv import ( + WaterLevelCsvRow, + WATER_LEVEL_REQUIRED_FIELDS, + WATER_LEVEL_HEADER_ALIASES, + WATER_LEVEL_IGNORED_FIELDS, +) from sqlalchemy import select from sqlalchemy.orm import Session -# Required CSV columns for the bulk upload -REQUIRED_FIELDS: List[str] = [ - "field_staff", - "well_name_point_id", - "field_event_date_time", - "measurement_date_time", - "sampler", - "sample_method", - "mp_height", - "level_status", - "depth_to_water_ft", - "data_quality", -] - -HEADER_ALIASES: dict[str, str] = { - "measuring_person": "sampler", - "water_level_date_time": "measurement_date_time", -} - -# Allow-list values for validation. These represent early MVP lexicon values. -VALID_LEVEL_STATUSES = {"stable", "rising", "falling"} -VALID_DATA_QUALITIES = {"approved", "provisional"} -VALID_SAMPLERS = {"groundwater team", "consultant"} - -# Mapping between human-friendly sample methods provided in CSV uploads and -# their canonical lexicon terms stored in the database. -SAMPLE_METHOD_ALIASES = { - "electric tape": "Electric tape measurement (E-probe)", - "steel tape": "Steel-tape measurement", -} -SAMPLE_METHOD_CANONICAL = { - value.lower(): value for value in SAMPLE_METHOD_ALIASES.values() -} +REQUIRED_FIELDS: List[str] = list(WATER_LEVEL_REQUIRED_FIELDS) +HEADER_ALIASES: dict[str, str] = dict(WATER_LEVEL_HEADER_ALIASES) +IGNORED_FIELDS: set[str] = set(WATER_LEVEL_IGNORED_FIELDS) @dataclass @@ -84,91 +60,13 @@ class _ValidatedRow: sample_method_term: str field_event_dt: datetime measurement_dt: datetime - mp_height: float - depth_to_water_ft: float - level_status: str - data_quality: str + mp_height: float | None + depth_to_water_ft: float | None + level_status: str | None + data_quality: str | None water_level_notes: str | None -class WaterLevelCsvRow(BaseModel): - model_config = ConfigDict(extra="ignore", str_strip_whitespace=True) - - field_staff: str - well_name_point_id: str - field_event_date_time: datetime - measurement_date_time: datetime - sampler: str - sample_method: str - mp_height: float - level_status: str - depth_to_water_ft: float - data_quality: str - water_level_notes: str | None = None - - @field_validator( - "field_staff", - "well_name_point_id", - "sampler", - "sample_method", - "level_status", - "data_quality", - ) - @classmethod - def _require_value(cls, value: str) -> str: - if value is None or value == "": - raise ValueError("value is required") - return value - - @field_validator("sampler") - @classmethod - def _validate_sampler(cls, value: str) -> str: - if value.lower() not in VALID_SAMPLERS: - raise ValueError( - f"Invalid sampler '{value}'. 
Expected one of: {sorted(VALID_SAMPLERS)}" - ) - return value - - @field_validator("level_status") - @classmethod - def _validate_level_status(cls, value: str) -> str: - if value.lower() not in VALID_LEVEL_STATUSES: - raise ValueError( - f"Invalid level_status '{value}'. Expected one of: {sorted(VALID_LEVEL_STATUSES)}" - ) - return value - - @field_validator("data_quality") - @classmethod - def _validate_data_quality(cls, value: str) -> str: - if value.lower() not in VALID_DATA_QUALITIES: - raise ValueError( - f"Invalid data_quality '{value}'. Expected one of: {sorted(VALID_DATA_QUALITIES)}" - ) - return value - - @field_validator("sample_method") - @classmethod - def _normalize_sample_method(cls, value: str) -> str: - normalized = value.lower() - if normalized in SAMPLE_METHOD_ALIASES: - return SAMPLE_METHOD_ALIASES[normalized] - if normalized in SAMPLE_METHOD_CANONICAL: - return SAMPLE_METHOD_CANONICAL[normalized] - raise ValueError( - f"Invalid sample_method '{value}'. Expected one of: {sorted(SAMPLE_METHOD_ALIASES.keys())}" - ) - - @field_validator("water_level_notes", mode="before") - @classmethod - def _empty_to_none(cls, value: str | None) -> str | None: - if value is None: - return None - if isinstance(value, str) and value.strip() == "": - return None - return value - - def bulk_upload_water_levels( source_file: str | Path | bytes | BinaryIO, *, pretty_json: bool = False ) -> BulkUploadResult: @@ -180,7 +78,12 @@ def bulk_upload_water_levels( msg = f"File not found: {source_file}" payload = _build_payload([], [], 0, 0, 1, errors=[msg]) stdout = _serialize_payload(payload, pretty_json) - return BulkUploadResult(exit_code=1, stdout=stdout, stderr=msg, payload=payload) + return BulkUploadResult( + exit_code=1, + stdout=stdout, + stderr=msg, + payload=payload, + ) validation_errors: list[str] = [] created_rows: list[dict[str, Any]] = [] @@ -188,7 +91,8 @@ def bulk_upload_water_levels( with session_ctx() as session: parameter_id = _get_groundwater_level_parameter_id(session) - # Validate headers early so we can short-circuit without touching the DB. + # Validate headers early so we can short-circuit + # without touching the DB. header_errors = _validate_headers(headers) if header_errors: validation_errors.extend(header_errors) @@ -198,7 +102,11 @@ def bulk_upload_water_levels( if not validation_errors: try: - created_rows = _create_records(session, parameter_id, valid_rows) + created_rows = _create_records( + session, + parameter_id, + valid_rows, + ) session.commit() except Exception as exc: # pragma: no cover - safety fallback session.rollback() @@ -209,7 +117,7 @@ def bulk_upload_water_levels( summary = { "total_rows_processed": len(csv_rows), - "total_rows_imported": len(created_rows) if not validation_errors else 0, + "total_rows_imported": (len(created_rows) if not validation_errors else 0), "validation_errors_or_warnings": _count_rows_with_issues(validation_errors), } payload = _build_payload( @@ -288,16 +196,22 @@ def _read_csv( for k, v in row.items(): if k is None: continue - key = HEADER_ALIASES.get(k.strip(), k.strip()) + stripped_key = k.strip() + if stripped_key in IGNORED_FIELDS: + continue + key = HEADER_ALIASES.get(stripped_key, stripped_key) value = v.strip() if isinstance(v, str) else v or "" - # If both alias and canonical header are present, preserve first non-empty value. + # If both alias and canonical header are present, + # preserve the first non-empty value. 
if key in normalized_row and normalized_row[key] and not value: continue normalized_row[key] = value rows.append(normalized_row) headers = [ - HEADER_ALIASES.get(h.strip(), h.strip()) for h in (reader.fieldnames or []) + HEADER_ALIASES.get(h.strip(), h.strip()) + for h in (reader.fieldnames or []) + if h is not None and h.strip() not in IGNORED_FIELDS ] return headers, rows @@ -350,10 +264,10 @@ def _validate_rows( raw={**normalized}, well=well, field_staff=model.field_staff, - sampler=model.sampler, + sampler=model.measuring_person, sample_method_term=model.sample_method, field_event_dt=model.field_event_date_time, - measurement_dt=model.measurement_date_time, + measurement_dt=model.water_level_date_time, mp_height=model.mp_height, depth_to_water_ft=model.depth_to_water_ft, level_status=model.level_status, @@ -413,7 +327,8 @@ def _create_records( "field_activity_id": field_activity.id, "sample_id": sample.id, "observation_id": observation.id, - "measurement_date_time": row.raw["measurement_date_time"], + "measurement_date_time": row.raw.get("water_level_date_time") + or row.raw.get("measurement_date_time"), "level_status": row.level_status, "data_quality": row.data_quality, } @@ -431,7 +346,11 @@ def _build_field_event_notes(row: _ValidatedRow) -> str | None: def _build_observation_notes(row: _ValidatedRow) -> str | None: - parts = [f"Level status: {row.level_status}", f"Data quality: {row.data_quality}"] + parts = [] + if row.level_status is not None: + parts.append(f"Level status: {row.level_status}") + if row.data_quality is not None: + parts.append(f"Data quality: {row.data_quality}") notes = " | ".join(parts) return notes or None diff --git a/tests/test_water_level_csv_schema.py b/tests/test_water_level_csv_schema.py new file mode 100644 index 000000000..d83444714 --- /dev/null +++ b/tests/test_water_level_csv_schema.py @@ -0,0 +1,72 @@ +from datetime import datetime, timezone + +from schemas.water_level_csv import WaterLevelCsvRow + + +def test_water_level_csv_row_normalizes_source_headers_and_naive_datetimes(): + row = WaterLevelCsvRow( + well_name_point_id="AR0001", + field_event_date_time="2025-02-15T08:00:00", + field_staff="Tech 1", + field_staff_2="", + field_staff_3="", + water_level_date_time="2025-02-15T10:30:00", + measuring_person="Tech 1", + sample_method="electric tape", + mp_height="1.5", + level_status="Water level not affected", + depth_to_water_ft="45.2", + data_quality="Approved", + water_level_notes="Initial measurement", + ) + + assert row.field_staff_2 is None + assert row.field_staff_3 is None + assert row.sample_method == "Electric tape measurement (E-probe)" + assert row.field_event_date_time == datetime( + 2025, 2, 15, 15, 0, tzinfo=timezone.utc + ) + assert row.water_level_date_time == datetime( + 2025, 2, 15, 17, 30, tzinfo=timezone.utc + ) + + +def test_water_level_csv_row_accepts_legacy_alias_headers(): + row = WaterLevelCsvRow( + well_name_point_id="AR0001", + field_event_date_time="2025-02-15T08:00:00-07:00", + field_staff="Tech 1", + measurement_date_time="2025-02-15T10:30:00-07:00", + sampler="Tech 1", + sample_method="Steel-tape measurement", + mp_height_ft="2.5", + ) + + assert row.measuring_person == "Tech 1" + assert row.sampler == "Tech 1" + assert row.mp_height == 2.5 + assert row.measurement_date_time == datetime( + 2025, 2, 15, 17, 30, tzinfo=timezone.utc + ) + + +def test_water_level_csv_row_normalizes_blank_optional_values_to_none(): + row = WaterLevelCsvRow( + well_name_point_id="AR0001", + field_event_date_time="2025-02-15T08:00:00", + 
field_staff="Tech 1", + water_level_date_time="2025-02-15T10:30:00", + measuring_person="Tech 1", + sample_method="Steel-tape measurement", + mp_height="", + level_status="", + depth_to_water_ft="", + data_quality="", + water_level_notes="", + ) + + assert row.mp_height is None + assert row.level_status is None + assert row.depth_to_water_ft is None + assert row.data_quality is None + assert row.water_level_notes is None From eccf31f9bf15735757ad8303e36dfb3c11e1a50f Mon Sep 17 00:00:00 2001 From: Kelsey Smuczynski Date: Thu, 26 Mar 2026 16:31:29 -0600 Subject: [PATCH 02/20] feat(water-level-import): enhance validation and resolve mp_height Tighten standalone water-level validation and add well-backed measuring point height resolution ahead of the persistence refactor. Changes: - enforce measuring_person, datetime ordering, and conditional level_status validation in WaterLevelCsvRow - reject negative depth_to_water_ft values while allowing negative mp_height values for surveyed reference points below land surface - validate data_quality against the DataQuality enum and normalize shorthand level_status values to canonical groundwater terms - resolve measuring_point_height from CSV input or the well's current MeasuringPointHistory-derived value - reject rows where depth_to_water_ft implies a water level below the bottom of the well - update schema, service, and API tests for the new rules --- schemas/water_level_csv.py | 79 +++++++++++++++++- services/water_level_csv.py | 77 +++++++++++++++++- tests/test_observation.py | 8 +- tests/test_water_level_csv_schema.py | 113 +++++++++++++++++++++++++- tests/test_water_level_csv_service.py | 86 ++++++++++++++++++++ 5 files changed, 351 insertions(+), 12 deletions(-) create mode 100644 tests/test_water_level_csv_service.py diff --git a/schemas/water_level_csv.py b/schemas/water_level_csv.py index 572ad86c2..32f333331 100644 --- a/schemas/water_level_csv.py +++ b/schemas/water_level_csv.py @@ -18,12 +18,14 @@ from datetime import datetime, timezone from typing import Annotated +from core.enums import DataQuality, GroundwaterLevelReason, SampleMethod from pydantic import ( AliasChoices, BaseModel, ConfigDict, Field, field_validator, + model_validator, ) from pydantic.functional_validators import BeforeValidator @@ -56,6 +58,20 @@ SAMPLE_METHOD_CANONICAL = { value.lower(): value for value in SAMPLE_METHOD_ALIASES.values() } +GROUNDWATER_LEVEL_REASON_ALIASES = { + "dry": "Site was dry", + "obstructed": ("Obstruction was encountered in the well (no level recorded)"), + "obstruction": ("Obstruction was encountered in the well (no level recorded)"), + "flowing": ( + "Site was flowing. Water level or head couldn't be measured " + "w/out additional equipment." 
+ ), + "flowing recently": "Site was flowing recently.", + "pumped": "Site was being pumped", + "pumped recently": "Site was pumped recently", + "not affected": "Water level not affected", + "other": "Other conditions exist that would affect the level (remarks)", +} def empty_str_to_none(value): @@ -80,6 +96,20 @@ def _normalize_datetime_to_utc(value: datetime | str) -> datetime: return value.astimezone(timezone.utc) +def _canonicalize_enum_value( + value: str | None, enum_cls, field_name: str +) -> str | None: + if value is None: + return None + + normalized = value.strip().lower() + for item in enum_cls: + if item.value.lower() == normalized: + return item.value + + raise ValueError(f"Unknown {field_name}: {value}") + + class WaterLevelCsvRow(BaseModel): model_config = ConfigDict(extra="ignore", str_strip_whitespace=True) @@ -139,7 +169,11 @@ def canonicalize_sample_method(value: str) -> str: @field_validator("sample_method") @classmethod def normalize_sample_method(cls, value: str) -> str: - return cls.canonicalize_sample_method(value) + return _canonicalize_enum_value( + cls.canonicalize_sample_method(value), + SampleMethod, + "sample_method", + ) @field_validator( "field_event_date_time", @@ -150,6 +184,49 @@ def normalize_sample_method(cls, value: str) -> str: def normalize_datetime_field(cls, value: datetime | str) -> datetime: return _normalize_datetime_to_utc(value) + @field_validator("depth_to_water_ft") + @classmethod + def validate_non_negative_depth_to_water(cls, value: float | None) -> float | None: + if value is not None and value < 0: + raise ValueError("depth_to_water_ft must be greater than or equal to 0") + return value + + @field_validator("level_status") + @classmethod + def normalize_level_status(cls, value: str | None) -> str | None: + if value is not None: + value = GROUNDWATER_LEVEL_REASON_ALIASES.get(value.strip().lower(), value) + return _canonicalize_enum_value(value, GroundwaterLevelReason, "level_status") + + @field_validator("data_quality") + @classmethod + def normalize_data_quality(cls, value: str | None) -> str | None: + return _canonicalize_enum_value(value, DataQuality, "data_quality") + + @model_validator(mode="after") + def validate_row_constraints(self) -> WaterLevelCsvRow: + field_staff = [ + staff + for staff in (self.field_staff, self.field_staff_2, self.field_staff_3) + if staff + ] + if self.measuring_person not in field_staff: + raise ValueError( + "measuring_person must match one of field_staff, " + "field_staff_2, or field_staff_3" + ) + + if self.water_level_date_time < self.field_event_date_time: + raise ValueError( + "water_level_date_time must be greater than or equal to " + "field_event_date_time" + ) + + if self.depth_to_water_ft is None and self.level_status is None: + raise ValueError("level_status is required when depth_to_water_ft is blank") + + return self + class WaterLevelBulkUploadSummary(BaseModel): total_rows_processed: int diff --git a/services/water_level_csv.py b/services/water_level_csv.py index e8a743174..d7cf4f2c0 100644 --- a/services/water_level_csv.py +++ b/services/water_level_csv.py @@ -35,7 +35,7 @@ WATER_LEVEL_IGNORED_FIELDS, ) from sqlalchemy import select -from sqlalchemy.orm import Session +from sqlalchemy.orm import Session, selectinload REQUIRED_FIELDS: List[str] = list(WATER_LEVEL_REQUIRED_FIELDS) HEADER_ALIASES: dict[str, str] = dict(WATER_LEVEL_HEADER_ALIASES) @@ -61,6 +61,9 @@ class _ValidatedRow: field_event_dt: datetime measurement_dt: datetime mp_height: float | None + resolved_mp_height: float | int | 
None + existing_mp_height: float | int | None + mp_height_differs_from_history: bool depth_to_water_ft: float | None level_status: str | None data_quality: str | None @@ -251,13 +254,36 @@ def _validate_rows( well_name = model.well_name_point_id well = wells_by_name.get(well_name) if well is None: - sql = select(Thing).where(Thing.name == well_name) + sql = ( + select(Thing) + .options(selectinload(Thing.measuring_points)) + .where( + Thing.name == well_name, + Thing.thing_type == "water well", + ) + ) well = session.scalars(sql).one_or_none() if well is None: errors.append(f"Row {idx}: Unknown well_name_point_id '{well_name}'") continue wells_by_name[well_name] = well + ( + resolved_mp_height, + existing_mp_height, + mp_height_differs_from_history, + ) = _resolve_measuring_point_height(well, model.mp_height) + + depth_error = _validate_depth_to_water_against_well( + idx, + well, + model.depth_to_water_ft, + resolved_mp_height, + ) + if depth_error: + errors.append(depth_error) + continue + valid_rows.append( _ValidatedRow( row_index=idx, @@ -269,6 +295,9 @@ def _validate_rows( field_event_dt=model.field_event_date_time, measurement_dt=model.water_level_date_time, mp_height=model.mp_height, + resolved_mp_height=resolved_mp_height, + existing_mp_height=existing_mp_height, + mp_height_differs_from_history=mp_height_differs_from_history, depth_to_water_ft=model.depth_to_water_ft, level_status=model.level_status, data_quality=model.data_quality, @@ -279,6 +308,44 @@ def _validate_rows( return valid_rows, errors +def _resolve_measuring_point_height( + well: Thing, csv_mp_height: float | None +) -> tuple[float | int | None, float | int | None, bool]: + existing_mp_height = well.measuring_point_height + if csv_mp_height is not None: + return ( + csv_mp_height, + existing_mp_height, + (existing_mp_height is not None and csv_mp_height != existing_mp_height), + ) + + return existing_mp_height, existing_mp_height, False + + +def _validate_depth_to_water_against_well( + row_index: int, + well: Thing, + depth_to_water_ft: float | None, + resolved_mp_height: float | int | None, +) -> str | None: + if ( + depth_to_water_ft is None + or resolved_mp_height is None + or well.well_depth is None + ): + return None + + corrected_depth_to_water = depth_to_water_ft - resolved_mp_height + if corrected_depth_to_water >= well.well_depth: + return ( + f"Row {row_index}: depth_to_water_ft minus measuring point height " + f"({corrected_depth_to_water}) must be less than well depth " + f"({well.well_depth})" + ) + + return None + + def _create_records( session: Session, parameter_id: int, rows: list[_ValidatedRow] ) -> list[dict[str, Any]]: @@ -327,8 +394,10 @@ def _create_records( "field_activity_id": field_activity.id, "sample_id": sample.id, "observation_id": observation.id, - "measurement_date_time": row.raw.get("water_level_date_time") - or row.raw.get("measurement_date_time"), + "measurement_date_time": ( + row.raw.get("water_level_date_time") + or row.raw.get("measurement_date_time") + ), "level_status": row.level_status, "data_quality": row.data_quality, } diff --git a/tests/test_observation.py b/tests/test_observation.py index 386c823cf..34d2bc41e 100644 --- a/tests/test_observation.py +++ b/tests/test_observation.py @@ -161,12 +161,12 @@ def test_bulk_upload_groundwater_levels_api(water_well_thing): water_well_thing.name, "2025-02-15T08:00:00-07:00", "2025-02-15T10:30:00-07:00", - "Groundwater Team", + "A Lopez", "electric tape", "1.5", - "stable", - "45.2", - "approved", + "Water level not affected", + 
"7.0", + "Water level accurate to within two hundreths of a foot", "Initial measurement", ] ) diff --git a/tests/test_water_level_csv_schema.py b/tests/test_water_level_csv_schema.py index d83444714..09abed2dd 100644 --- a/tests/test_water_level_csv_schema.py +++ b/tests/test_water_level_csv_schema.py @@ -1,7 +1,12 @@ from datetime import datetime, timezone +import pytest +from pydantic import ValidationError + from schemas.water_level_csv import WaterLevelCsvRow +DATA_QUALITY_VALUE = "Water level accurate to within two hundreths of a foot" + def test_water_level_csv_row_normalizes_source_headers_and_naive_datetimes(): row = WaterLevelCsvRow( @@ -16,7 +21,7 @@ def test_water_level_csv_row_normalizes_source_headers_and_naive_datetimes(): mp_height="1.5", level_status="Water level not affected", depth_to_water_ft="45.2", - data_quality="Approved", + data_quality=DATA_QUALITY_VALUE, water_level_notes="Initial measurement", ) @@ -40,6 +45,7 @@ def test_water_level_csv_row_accepts_legacy_alias_headers(): sampler="Tech 1", sample_method="Steel-tape measurement", mp_height_ft="2.5", + depth_to_water_ft="45.2", ) assert row.measuring_person == "Tech 1" @@ -59,14 +65,115 @@ def test_water_level_csv_row_normalizes_blank_optional_values_to_none(): measuring_person="Tech 1", sample_method="Steel-tape measurement", mp_height="", - level_status="", + level_status="Water level not affected", depth_to_water_ft="", data_quality="", water_level_notes="", ) assert row.mp_height is None - assert row.level_status is None + assert row.level_status == "Water level not affected" assert row.depth_to_water_ft is None assert row.data_quality is None assert row.water_level_notes is None + + +def test_water_level_csv_row_requires_measuring_person_to_match_field_staff(): + with pytest.raises(ValidationError) as exc: + WaterLevelCsvRow( + well_name_point_id="AR0001", + field_event_date_time="2025-02-15T08:00:00", + field_staff="Tech 1", + field_staff_2="Tech 2", + water_level_date_time="2025-02-15T10:30:00", + measuring_person="Tech 3", + sample_method="Steel-tape measurement", + depth_to_water_ft="45.2", + ) + + assert ( + "measuring_person must match one of field_staff, field_staff_2, " + "or field_staff_3" + ) in str(exc.value) + + +def test_water_level_csv_row_requires_level_status_when_depth_is_blank(): + with pytest.raises(ValidationError) as exc: + WaterLevelCsvRow( + well_name_point_id="AR0001", + field_event_date_time="2025-02-15T08:00:00", + field_staff="Tech 1", + water_level_date_time="2025-02-15T10:30:00", + measuring_person="Tech 1", + sample_method="Steel-tape measurement", + depth_to_water_ft="", + level_status="", + ) + + assert "level_status is required when depth_to_water_ft is blank" in str(exc.value) + + +def test_water_level_csv_row_rejects_water_level_before_field_event(): + with pytest.raises(ValidationError) as exc: + WaterLevelCsvRow( + well_name_point_id="AR0001", + field_event_date_time="2025-02-15T10:30:00", + field_staff="Tech 1", + water_level_date_time="2025-02-15T08:00:00", + measuring_person="Tech 1", + sample_method="Steel-tape measurement", + depth_to_water_ft="45.2", + ) + + assert ( + "water_level_date_time must be greater than or equal to " + "field_event_date_time" + ) in str(exc.value) + + +def test_water_level_csv_row_canonicalizes_case_insensitive_lexicon_values(): + row = WaterLevelCsvRow( + well_name_point_id="AR0001", + field_event_date_time="2025-02-15T08:00:00", + field_staff="Tech 1", + water_level_date_time="2025-02-15T10:30:00", + measuring_person="Tech 1", + 
sample_method="electric tape measurement (e-probe)", + depth_to_water_ft="", + level_status="dry", + data_quality=DATA_QUALITY_VALUE.lower(), + ) + + assert row.sample_method == "Electric tape measurement (E-probe)" + assert row.level_status == "Site was dry" + assert row.data_quality == DATA_QUALITY_VALUE + + +def test_water_level_csv_row_allows_negative_mp_height(): + row = WaterLevelCsvRow( + well_name_point_id="AR0001", + field_event_date_time="2025-02-15T08:00:00", + field_staff="Tech 1", + water_level_date_time="2025-02-15T10:30:00", + measuring_person="Tech 1", + sample_method="Steel-tape measurement", + mp_height="-0.1", + depth_to_water_ft="45.2", + ) + + assert row.mp_height == -0.1 + + +def test_water_level_csv_row_rejects_negative_depth_to_water(): + with pytest.raises(ValidationError) as exc: + WaterLevelCsvRow( + well_name_point_id="AR0001", + field_event_date_time="2025-02-15T08:00:00", + field_staff="Tech 1", + water_level_date_time="2025-02-15T10:30:00", + measuring_person="Tech 1", + sample_method="Steel-tape measurement", + depth_to_water_ft="-0.1", + ) + + assert "depth_to_water_ft must be greater than or equal to 0" in str(exc.value) diff --git a/tests/test_water_level_csv_service.py b/tests/test_water_level_csv_service.py new file mode 100644 index 000000000..a1c1e1770 --- /dev/null +++ b/tests/test_water_level_csv_service.py @@ -0,0 +1,86 @@ +from datetime import date + +from db import Thing +from db.measuring_point_history import MeasuringPointHistory +from services.water_level_csv import ( + _resolve_measuring_point_height, + _validate_depth_to_water_against_well, +) + + +def _build_well( + *, + well_depth: float | None = None, + measuring_point_height: float | None = None, +) -> Thing: + well = Thing(name="AR0001", thing_type="water well", well_depth=well_depth) + well.measuring_points = [] + if measuring_point_height is not None: + well.measuring_points.append( + MeasuringPointHistory( + start_date=date(2025, 1, 1), + measuring_point_height=measuring_point_height, + ) + ) + return well + + +def test_resolve_measuring_point_height_prefers_csv_value(): + well = _build_well(measuring_point_height=3.5) + + ( + resolved_mp_height, + existing_mp_height, + differs, + ) = _resolve_measuring_point_height(well, 4.0) + + assert resolved_mp_height == 4.0 + assert existing_mp_height == 3.5 + assert differs is True + + +def test_resolve_measuring_point_height_falls_back_to_well_history(): + well = _build_well(measuring_point_height=3.5) + + ( + resolved_mp_height, + existing_mp_height, + differs, + ) = _resolve_measuring_point_height(well, None) + + assert resolved_mp_height == 3.5 + assert existing_mp_height == 3.5 + assert differs is False + + +def test_resolve_measuring_point_height_allows_missing_values(): + well = _build_well() + + ( + resolved_mp_height, + existing_mp_height, + differs, + ) = _resolve_measuring_point_height(well, None) + + assert resolved_mp_height is None + assert existing_mp_height is None + assert differs is False + + +def test_validate_depth_to_water_against_well_rejects_depth_past_bottom(): + well = _build_well(well_depth=10.0) + + error = _validate_depth_to_water_against_well(4, well, 12.5, 1.0) + + assert ( + error == "Row 4: depth_to_water_ft minus measuring point height (11.5) " + "must be less than well depth (10.0)" + ) + + +def test_validate_depth_to_water_against_well_skips_when_height_unavailable(): + well = _build_well(well_depth=10.0) + + error = _validate_depth_to_water_against_well(4, well, 12.5, None) + + assert error is None From 
6a09881afc9b0107caffe3915577c5cdd02bf5ba Mon Sep 17 00:00:00 2001 From: Kelsey Smuczynski Date: Thu, 26 Mar 2026 17:02:18 -0600 Subject: [PATCH 03/20] feat(water-level-import): add idempotent groundwater persistence Switch the standalone importer from placeholder inserts to deterministic, structured groundwater persistence. Changes: - replace random UUID sample names with deterministic well-inventory-style sample names based on well name and measurement timestamp - look up existing standalone groundwater-level samples by deterministic sample name and update them on rerun instead of inserting duplicates - persist samples with sample_matrix="groundwater" - resolve the groundwater-level parameter by name and groundwater matrix, creating it when missing - write structured groundwater observation fields instead of relying on note strings - persist resolved_mp_height to Observation.measuring_point_height - persist level_status to Observation.groundwater_level_reason - persist data_quality to Observation.nma_data_quality - persist water_level_notes directly to sample and observation notes - add tests for deterministic sample naming, idempotent reruns, and API-level structured groundwater persistence Impact: - rerunning the same standalone water-level CSV is now idempotent - imported groundwater observations are stored with the intended groundwater semantics instead of the earlier placeholder behavior - downstream queries can rely on structured observation fields rather than extracting meaning from generated note text --- services/water_level_csv.py | 154 ++++++++++++++++++-------- tests/test_observation.py | 11 +- tests/test_water_level_csv_service.py | 103 ++++++++++++++++- 3 files changed, 217 insertions(+), 51 deletions(-) diff --git a/services/water_level_csv.py b/services/water_level_csv.py index d7cf4f2c0..ba7654d8f 100644 --- a/services/water_level_csv.py +++ b/services/water_level_csv.py @@ -19,7 +19,6 @@ import io import json import re -import uuid from dataclasses import dataclass from datetime import datetime from pathlib import Path @@ -352,39 +351,40 @@ def _create_records( created: list[dict[str, Any]] = [] for row in rows: - field_event = FieldEvent( - thing=row.well, - event_date=row.field_event_dt, - notes=_build_field_event_notes(row), - ) - field_activity = FieldActivity( - field_event=field_event, - activity_type="groundwater level", - notes=f"Sampler: {row.sampler}", - ) - sample = Sample( - field_activity=field_activity, - sample_date=row.measurement_dt, - sample_name=f"wl-{uuid.uuid4()}", - sample_matrix="water", - sample_method=row.sample_method_term, - qc_type="Normal", - notes=row.water_level_notes, - ) - observation = Observation( - sample=sample, - observation_datetime=row.measurement_dt, - parameter_id=parameter_id, - value=row.depth_to_water_ft, - unit="ft", - measuring_point_height=row.mp_height, - groundwater_level_reason=None, - notes=_build_observation_notes(row), - ) - session.add(field_event) - session.add(field_activity) - session.add(sample) - session.add(observation) + sample_name = _build_sample_name(row) + sample = _find_existing_imported_sample(session, row, sample_name) + + if sample is None: + field_event = FieldEvent( + thing=row.well, + event_date=row.field_event_dt, + notes=_build_field_event_notes(row), + ) + field_activity = FieldActivity( + field_event=field_event, + activity_type="groundwater level", + notes=f"Sampler: {row.sampler}", + ) + sample = Sample(field_activity=field_activity) + observation = Observation(sample=sample) + 
session.add(field_event) + session.add(field_activity) + session.add(sample) + session.add(observation) + else: + field_activity = sample.field_activity + field_event = field_activity.field_event + observation = _find_existing_observation(sample, parameter_id) + if observation is None: + observation = Observation(sample=sample) + session.add(observation) + + field_event.event_date = row.field_event_dt + field_event.notes = _build_field_event_notes(row) + field_activity.notes = f"Sampler: {row.sampler}" + + _apply_sample_values(sample, row, sample_name) + _apply_observation_values(observation, row, parameter_id) session.flush() created.append( @@ -406,6 +406,62 @@ def _create_records( return created +def _build_sample_name(row: _ValidatedRow) -> str: + return f"{row.well.name}-WL-{row.measurement_dt.strftime('%Y%m%d%H%M')}" + + +def _find_existing_imported_sample( + session: Session, row: _ValidatedRow, sample_name: str +) -> Sample | None: + sql = ( + select(Sample) + .join(FieldActivity, Sample.field_activity_id == FieldActivity.id) + .join(FieldEvent, FieldActivity.field_event_id == FieldEvent.id) + .join(Thing, FieldEvent.thing_id == Thing.id) + .options( + selectinload(Sample.field_activity).selectinload(FieldActivity.field_event), + selectinload(Sample.observations), + ) + .where( + Thing.name == row.well.name, + Thing.thing_type == "water well", + FieldActivity.activity_type == "groundwater level", + Sample.sample_name == sample_name, + ) + .order_by(Sample.id.asc()) + ) + return session.scalars(sql).first() + + +def _find_existing_observation(sample: Sample, parameter_id: int) -> Observation | None: + for observation in sample.observations: + if observation.parameter_id == parameter_id: + return observation + return sample.observations[0] if sample.observations else None + + +def _apply_sample_values(sample: Sample, row: _ValidatedRow, sample_name: str) -> None: + sample.sample_date = row.measurement_dt + sample.sample_name = sample_name + sample.sample_matrix = "groundwater" + sample.sample_method = row.sample_method_term + sample.qc_type = "Normal" + sample.notes = row.water_level_notes + + +def _apply_observation_values( + observation: Observation, row: _ValidatedRow, parameter_id: int +) -> None: + observation.observation_datetime = row.measurement_dt + observation.parameter_id = parameter_id + observation.value = row.depth_to_water_ft + observation.unit = "ft" + observation.measuring_point_height = row.resolved_mp_height + observation.groundwater_level_reason = row.level_status + observation.nma_data_quality = row.data_quality + observation.notes = row.water_level_notes + + def _build_field_event_notes(row: _ValidatedRow) -> str | None: parts = [f"Field staff: {row.field_staff}"] if row.water_level_notes: @@ -414,22 +470,24 @@ def _build_field_event_notes(row: _ValidatedRow) -> str | None: return notes or None -def _build_observation_notes(row: _ValidatedRow) -> str | None: - parts = [] - if row.level_status is not None: - parts.append(f"Level status: {row.level_status}") - if row.data_quality is not None: - parts.append(f"Data quality: {row.data_quality}") - notes = " | ".join(parts) - return notes or None - - def _get_groundwater_level_parameter_id(session: Session) -> int: - sql = select(Parameter.id).where(Parameter.parameter_name == "groundwater level") + sql = select(Parameter.id).where( + Parameter.parameter_name == "groundwater level", + Parameter.matrix == "groundwater", + ) parameter_id = session.scalars(sql).one_or_none() - if parameter_id is None: - raise 
RuntimeError("Groundwater level parameter is not initialized") - return parameter_id + if parameter_id is not None: + return parameter_id + + parameter = Parameter( + parameter_name="groundwater level", + matrix="groundwater", + parameter_type="Field Parameter", + default_unit="ft", + ) + session.add(parameter) + session.flush() + return parameter.id # ============= EOF ============================================= diff --git a/tests/test_observation.py b/tests/test_observation.py index 34d2bc41e..b68453b00 100644 --- a/tests/test_observation.py +++ b/tests/test_observation.py @@ -188,10 +188,19 @@ def test_bulk_upload_groundwater_levels_api(water_well_thing): with session_ctx() as session: observation = session.get(Observation, row["observation_id"]) assert observation is not None + sample = session.get(Sample, row["sample_id"]) + assert sample is not None + assert sample.sample_name == f"{water_well_thing.name}-WL-202502151730" + assert sample.sample_matrix == "groundwater" + assert observation.groundwater_level_reason == "Water level not affected" + assert ( + observation.nma_data_quality + == "Water level accurate to within two hundreths of a foot" + ) + assert observation.measuring_point_height == 1.5 # cleanup in reverse dependency order if observation: session.delete(observation) - sample = session.get(Sample, row["sample_id"]) if sample: session.delete(sample) field_activity = session.get(FieldActivity, row["field_activity_id"]) diff --git a/tests/test_water_level_csv_service.py b/tests/test_water_level_csv_service.py index a1c1e1770..7f619d7d1 100644 --- a/tests/test_water_level_csv_service.py +++ b/tests/test_water_level_csv_service.py @@ -1,11 +1,16 @@ -from datetime import date +from datetime import date, datetime, timezone +from types import SimpleNamespace -from db import Thing +from db import FieldActivity, FieldEvent, Observation, Sample, Thing from db.measuring_point_history import MeasuringPointHistory +from db.engine import session_ctx from services.water_level_csv import ( + _build_sample_name, _resolve_measuring_point_height, _validate_depth_to_water_against_well, + bulk_upload_water_levels, ) +from sqlalchemy import select def _build_well( @@ -84,3 +89,97 @@ def test_validate_depth_to_water_against_well_skips_when_height_unavailable(): error = _validate_depth_to_water_against_well(4, well, 12.5, None) assert error is None + + +def test_build_sample_name_uses_deterministic_well_inventory_style_format(): + row = SimpleNamespace( + well=SimpleNamespace(name="AR0001"), + measurement_dt=datetime(2025, 2, 15, 10, 30, tzinfo=timezone.utc), + ) + + assert _build_sample_name(row) == "AR0001-WL-202502151030" + + +def test_bulk_upload_water_levels_is_idempotent(water_well_thing): + csv_content = "\n".join( + [ + ",".join( + [ + "field_staff", + "well_name_point_id", + "field_event_date_time", + "measurement_date_time", + "sampler", + "sample_method", + "mp_height", + "level_status", + "depth_to_water_ft", + "data_quality", + "water_level_notes", + ] + ), + ",".join( + [ + "A Lopez", + water_well_thing.name, + "2025-02-15T08:00:00-07:00", + "2025-02-15T10:30:00-07:00", + "A Lopez", + "electric tape", + "1.5", + "Water level not affected", + "7.0", + "Water level accurate to within two hundreths of a foot", + "Initial measurement", + ] + ), + ] + ) + + first = bulk_upload_water_levels(csv_content.encode("utf-8")) + second = bulk_upload_water_levels(csv_content.encode("utf-8")) + + assert first.exit_code == 0, first.payload + assert second.exit_code == 0, second.payload + assert ( 
+ first.payload["water_levels"][0]["sample_id"] + == second.payload["water_levels"][0]["sample_id"] + ) + assert ( + first.payload["water_levels"][0]["observation_id"] + == second.payload["water_levels"][0]["observation_id"] + ) + + with session_ctx() as session: + samples = session.scalars( + select(Sample) + .join(FieldActivity, Sample.field_activity_id == FieldActivity.id) + .join(FieldEvent, FieldActivity.field_event_id == FieldEvent.id) + .join(Thing, FieldEvent.thing_id == Thing.id) + .where( + Thing.id == water_well_thing.id, + FieldActivity.activity_type == "groundwater level", + ) + ).all() + observations = session.scalars( + select(Observation) + .join(Sample, Observation.sample_id == Sample.id) + .join(FieldActivity, Sample.field_activity_id == FieldActivity.id) + .join(FieldEvent, FieldActivity.field_event_id == FieldEvent.id) + .join(Thing, FieldEvent.thing_id == Thing.id) + .where( + Thing.id == water_well_thing.id, + FieldActivity.activity_type == "groundwater level", + ) + ).all() + + assert len(samples) == 1 + assert len(observations) == 1 + assert samples[0].sample_name == "Test Well-WL-202502151730" + assert samples[0].sample_matrix == "groundwater" + assert observations[0].groundwater_level_reason == "Water level not affected" + assert ( + observations[0].nma_data_quality + == "Water level accurate to within two hundreths of a foot" + ) + assert observations[0].measuring_point_height == 1.5 From 2f2f923daa61575f85d738156bb05d9d12d14780 Mon Sep 17 00:00:00 2001 From: Kelsey Smuczynski Date: Fri, 27 Mar 2026 09:34:05 -0600 Subject: [PATCH 04/20] feat(water-level-import): add best-effort row savepoints Change the standalone water-level importer from all-or-nothing writes to best-effort row imports using per-row savepoints. Changes: - stop skipping all persistence when some rows have validation errors - wrap each standalone water-level row import in a nested transaction so one bad row does not roll back successful rows - continue processing remaining rows after row-level persistence errors - record row-specific persistence errors in the existing validation_errors output - report total_rows_imported as the number of successfully written rows even when the file contains failures - add mixed-row service coverage proving a valid row still imports when another row in the same file fails Impact: - standalone water-level CSV imports are now operationally resilient in the same way as well inventory imports - users can import good rows from partially bad files without manually splitting the source CSV first - later stages can refine API and CLI reporting without changing the core best-effort import behavior --- services/water_level_csv.py | 141 +++++++++++++------------- tests/test_water_level_csv_service.py | 66 ++++++++++++ 2 files changed, 137 insertions(+), 70 deletions(-) diff --git a/services/water_level_csv.py b/services/water_level_csv.py index ba7654d8f..d01c9ce05 100644 --- a/services/water_level_csv.py +++ b/services/water_level_csv.py @@ -91,8 +91,6 @@ def bulk_upload_water_levels( created_rows: list[dict[str, Any]] = [] with session_ctx() as session: - parameter_id = _get_groundwater_level_parameter_id(session) - # Validate headers early so we can short-circuit # without touching the DB. 
header_errors = _validate_headers(headers) @@ -102,24 +100,22 @@ def bulk_upload_water_levels( valid_rows, row_errors = _validate_rows(session, csv_rows) validation_errors.extend(row_errors) - if not validation_errors: - try: - created_rows = _create_records( - session, - parameter_id, - valid_rows, - ) - session.commit() - except Exception as exc: # pragma: no cover - safety fallback - session.rollback() - validation_errors.append(str(exc)) - - if validation_errors: - session.rollback() + try: + parameter_id = _get_groundwater_level_parameter_id(session) + created_rows, persistence_errors = _create_records( + session, + parameter_id, + valid_rows, + ) + validation_errors.extend(persistence_errors) + session.commit() + except Exception as exc: # pragma: no cover - safety fallback + session.rollback() + validation_errors.append(str(exc)) summary = { "total_rows_processed": len(csv_rows), - "total_rows_imported": (len(created_rows) if not validation_errors else 0), + "total_rows_imported": len(created_rows), "validation_errors_or_warnings": _count_rows_with_issues(validation_errors), } payload = _build_payload( @@ -347,63 +343,68 @@ def _validate_depth_to_water_against_well( def _create_records( session: Session, parameter_id: int, rows: list[_ValidatedRow] -) -> list[dict[str, Any]]: +) -> tuple[list[dict[str, Any]], list[str]]: created: list[dict[str, Any]] = [] + errors: list[str] = [] for row in rows: - sample_name = _build_sample_name(row) - sample = _find_existing_imported_sample(session, row, sample_name) - - if sample is None: - field_event = FieldEvent( - thing=row.well, - event_date=row.field_event_dt, - notes=_build_field_event_notes(row), - ) - field_activity = FieldActivity( - field_event=field_event, - activity_type="groundwater level", - notes=f"Sampler: {row.sampler}", - ) - sample = Sample(field_activity=field_activity) - observation = Observation(sample=sample) - session.add(field_event) - session.add(field_activity) - session.add(sample) - session.add(observation) - else: - field_activity = sample.field_activity - field_event = field_activity.field_event - observation = _find_existing_observation(sample, parameter_id) - if observation is None: - observation = Observation(sample=sample) - session.add(observation) - - field_event.event_date = row.field_event_dt - field_event.notes = _build_field_event_notes(row) - field_activity.notes = f"Sampler: {row.sampler}" - - _apply_sample_values(sample, row, sample_name) - _apply_observation_values(observation, row, parameter_id) - session.flush() - - created.append( - { - "well_name_point_id": row.raw["well_name_point_id"], - "field_event_id": field_event.id, - "field_activity_id": field_activity.id, - "sample_id": sample.id, - "observation_id": observation.id, - "measurement_date_time": ( - row.raw.get("water_level_date_time") - or row.raw.get("measurement_date_time") - ), - "level_status": row.level_status, - "data_quality": row.data_quality, - } - ) + try: + with session.begin_nested(): + sample_name = _build_sample_name(row) + sample = _find_existing_imported_sample(session, row, sample_name) + + if sample is None: + field_event = FieldEvent( + thing=row.well, + event_date=row.field_event_dt, + notes=_build_field_event_notes(row), + ) + field_activity = FieldActivity( + field_event=field_event, + activity_type="groundwater level", + notes=f"Sampler: {row.sampler}", + ) + sample = Sample(field_activity=field_activity) + observation = Observation(sample=sample) + session.add(field_event) + session.add(field_activity) + 
session.add(sample) + session.add(observation) + else: + field_activity = sample.field_activity + field_event = field_activity.field_event + observation = _find_existing_observation(sample, parameter_id) + if observation is None: + observation = Observation(sample=sample) + session.add(observation) + + field_event.event_date = row.field_event_dt + field_event.notes = _build_field_event_notes(row) + field_activity.notes = f"Sampler: {row.sampler}" + + _apply_sample_values(sample, row, sample_name) + _apply_observation_values(observation, row, parameter_id) + session.flush() + + created.append( + { + "well_name_point_id": row.raw["well_name_point_id"], + "field_event_id": field_event.id, + "field_activity_id": field_activity.id, + "sample_id": sample.id, + "observation_id": observation.id, + "measurement_date_time": ( + row.raw.get("water_level_date_time") + or row.raw.get("measurement_date_time") + ), + "level_status": row.level_status, + "data_quality": row.data_quality, + } + ) + except Exception as exc: # pragma: no cover - exercised via DB tests + errors.append(f"Row {row.row_index}: {exc}") - return created + return created, errors def _build_sample_name(row: _ValidatedRow) -> str: diff --git a/tests/test_water_level_csv_service.py b/tests/test_water_level_csv_service.py index 7f619d7d1..4d8826422 100644 --- a/tests/test_water_level_csv_service.py +++ b/tests/test_water_level_csv_service.py @@ -183,3 +183,69 @@ def test_bulk_upload_water_levels_is_idempotent(water_well_thing): == "Water level accurate to within two hundreths of a foot" ) assert observations[0].measuring_point_height == 1.5 + + +def test_bulk_upload_water_levels_imports_valid_rows_when_other_rows_fail( + water_well_thing, +): + csv_content = "\n".join( + [ + ",".join( + [ + "field_staff", + "well_name_point_id", + "field_event_date_time", + "measurement_date_time", + "sampler", + "sample_method", + "mp_height", + "level_status", + "depth_to_water_ft", + "data_quality", + "water_level_notes", + ] + ), + ",".join( + [ + "A Lopez", + water_well_thing.name, + "2025-02-15T08:00:00-07:00", + "2025-02-15T10:30:00-07:00", + "A Lopez", + "electric tape", + "1.5", + "Water level not affected", + "7.0", + "Water level accurate to within two hundreths of a foot", + "Initial measurement", + ] + ), + ",".join( + [ + "A Lopez", + "Unknown Well", + "2025-02-15T08:00:00-07:00", + "2025-02-15T10:30:00-07:00", + "A Lopez", + "electric tape", + "1.5", + "Water level not affected", + "7.0", + "Water level accurate to within two hundreths of a foot", + "Bad row", + ] + ), + ] + ) + + result = bulk_upload_water_levels(csv_content.encode("utf-8")) + + assert result.exit_code == 1 + assert result.payload["summary"]["total_rows_processed"] == 2 + assert result.payload["summary"]["total_rows_imported"] == 1 + assert result.payload["summary"]["validation_errors_or_warnings"] == 1 + assert len(result.payload["water_levels"]) == 1 + assert ( + "Unknown well_name_point_id 'Unknown Well'" + in result.payload["validation_errors"][0] + ) From 4c3eea000a338e5031bb0296fc26ea94b35865a3 Mon Sep 17 00:00:00 2001 From: Kelsey Smuczynski Date: Fri, 27 Mar 2026 10:14:51 -0600 Subject: [PATCH 05/20] feat(water-level-import): align partial-success API and CLI behavior Update the standalone water-level API and CLI contract to reflect the best-effort row import behavior. 
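In practice, the new success rule reduces to the one-line change this patch
makes in services/water_level_csv.py (quoted from the diff below): an import
exits 0 whenever at least one row was written, or when there was nothing to
report at all.

    exit_code = 0 if created_rows or not validation_errors else 1
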
Changes: - treat mixed-result standalone water-level imports as successful when at least one row imports - keep total-failure imports as non-zero/400 outcomes when no rows are written - preserve row-level validation and persistence errors in the existing validation_errors payload - update the CLI status banner to distinguish clean success from completed-with-issues using summary data instead of exit code alone - add API coverage for partial-success bulk uploads - add CLI coverage for completed-with-issues reporting - update end-to-end CLI water-level expectations to current importer semantics from earlier stages Impact: - API callers no longer see a hard failure when valid rows were imported successfully from a mixed file - CLI users get more accurate status reporting for partial-success imports without losing warning visibility - the external contract now matches the row-level atomicity and best-effort behavior of the importer itself --- cli/cli.py | 10 +++- services/water_level_csv.py | 2 +- tests/test_cli_commands.py | 57 +++++++++++++++--- tests/test_observation.py | 83 +++++++++++++++++++++++++++ tests/test_water_level_csv_service.py | 2 +- 5 files changed, 143 insertions(+), 11 deletions(-) diff --git a/cli/cli.py b/cli/cli.py index 8a91f5f4b..50b2b24a9 100644 --- a/cli/cli.py +++ b/cli/cli.py @@ -666,9 +666,16 @@ def water_levels_bulk_upload( payload = result.payload if isinstance(result.payload, dict) else {} summary = payload.get("summary", {}) validation_errors = payload.get("validation_errors", []) + rows_with_issues = summary.get("validation_errors_or_warnings", 0) - if result.exit_code == 0: + if result.exit_code == 0 and not rows_with_issues: typer.secho("[WATER LEVEL IMPORT] SUCCESS", fg=colors["ok"], bold=True) + elif result.exit_code == 0: + typer.secho( + "[WATER LEVEL IMPORT] COMPLETED WITH ISSUES", + fg=colors["issue"], + bold=True, + ) else: typer.secho( "[WATER LEVEL IMPORT] COMPLETED WITH ISSUES", @@ -709,7 +716,6 @@ def water_levels_bulk_upload( if summary: processed = summary.get("total_rows_processed", 0) imported = summary.get("total_rows_imported", 0) - rows_with_issues = summary.get("validation_errors_or_warnings", 0) typer.secho("SUMMARY", fg=colors["accent"], bold=True) label_width = 16 value_width = 8 diff --git a/services/water_level_csv.py b/services/water_level_csv.py index d01c9ce05..99dd0848f 100644 --- a/services/water_level_csv.py +++ b/services/water_level_csv.py @@ -123,7 +123,7 @@ def bulk_upload_water_levels( ) stdout = _serialize_payload(payload, pretty_json) stderr = "\n".join(validation_errors) - exit_code = 0 if not validation_errors else 1 + exit_code = 0 if created_rows or not validation_errors else 1 return BulkUploadResult( exit_code=exit_code, stdout=stdout, stderr=stderr, payload=payload ) diff --git a/tests/test_cli_commands.py b/tests/test_cli_commands.py index fb351fbd5..656dd91eb 100644 --- a/tests/test_cli_commands.py +++ b/tests/test_cli_commands.py @@ -643,6 +643,39 @@ def fake_upload(file_path, *, pretty_json=False): assert captured["pretty_json"] is True +def test_water_levels_bulk_upload_reports_partial_success(monkeypatch, tmp_path): + csv_file = tmp_path / "water_levels.csv" + csv_file.write_text("col\nvalue\n") + + def fake_upload(_file_path, *, pretty_json=False): + assert pretty_json is False + return SimpleNamespace( + exit_code=0, + stdout="", + stderr="Row 2: Unknown well_name_point_id 'Bad Well'", + payload={ + "summary": { + "total_rows_processed": 2, + "total_rows_imported": 1, + "validation_errors_or_warnings": 1, + 
}, + "validation_errors": ["Row 2: Unknown well_name_point_id 'Bad Well'"], + "water_levels": [{}], + }, + ) + + monkeypatch.setattr("cli.service_adapter.water_levels_csv", fake_upload) + + runner = CliRunner() + result = runner.invoke( + cli, ["water-levels", "bulk-upload", "--file", str(csv_file)] + ) + + assert result.exit_code == 0, result.output + assert "[WATER LEVEL IMPORT] COMPLETED WITH ISSUES" in result.output + assert "rows_with_issues" in result.output + + def test_water_levels_cli_persists_observations(tmp_path, water_well_thing): """ End-to-end CLI invocation should create FieldEvent, Sample, @@ -658,13 +691,17 @@ def _write_csv(path: Path, *, well_name: str, notes: str): ) row = ( f"CLI Tester,{well_name},2025-02-15T08:00:00-07:00," - "2025-02-15T10:30:00-07:00,Groundwater Team,electric tape," - f"1.5,stable,42.5,approved,{notes}" + "2025-02-15T10:30:00-07:00,CLI Tester,electric tape," + f"1.5,Water level not affected,7.0," + "Water level accurate to within two hundreths of a foot," + f"{notes}" ) - csv_text = textwrap.dedent(f"""\ + csv_text = textwrap.dedent( + f"""\ {header} {row} - """) + """ + ) path.write_text(csv_text) unique_notes = f"pytest-{uuid.uuid4()}" @@ -697,10 +734,16 @@ def _write_csv(path: Path, *, well_name: str, notes: str): assert field_event.thing_id == water_well_thing.id assert sample.sample_method == "Electric tape measurement (E-probe)" - assert sample.sample_matrix == "water" - assert observation.value == 42.5 + assert sample.sample_matrix == "groundwater" + assert sample.sample_name == f"{water_well_thing.name}-WL-202502151730" + assert observation.value == 7.0 assert observation.measuring_point_height == 1.5 - assert observation.notes == "Level status: stable | Data quality: approved" + assert observation.notes == unique_notes + assert observation.groundwater_level_reason == "Water level not affected" + assert ( + observation.nma_data_quality + == "Water level accurate to within two hundreths of a foot" + ) assert ( field_event.notes == f"Field staff: CLI Tester | {unique_notes}" ), "Field event notes should capture field staff and notes" diff --git a/tests/test_observation.py b/tests/test_observation.py index b68453b00..d1806788d 100644 --- a/tests/test_observation.py +++ b/tests/test_observation.py @@ -212,6 +212,89 @@ def test_bulk_upload_groundwater_levels_api(water_well_thing): session.commit() +def test_bulk_upload_groundwater_levels_api_partial_success(water_well_thing): + csv_content = ",".join( + [ + "field_staff", + "well_name_point_id", + "field_event_date_time", + "measurement_date_time", + "sampler", + "sample_method", + "mp_height", + "level_status", + "depth_to_water_ft", + "data_quality", + "water_level_notes", + ] + ) + csv_content += "\n" + csv_content += "\n".join( + [ + ",".join( + [ + "A Lopez", + water_well_thing.name, + "2025-02-15T08:00:00-07:00", + "2025-02-15T10:30:00-07:00", + "A Lopez", + "electric tape", + "1.5", + "Water level not affected", + "7.0", + "Water level accurate to within two hundreths of a foot", + "Initial measurement", + ] + ), + ",".join( + [ + "A Lopez", + "Bad Well", + "2025-02-15T08:00:00-07:00", + "2025-02-15T10:30:00-07:00", + "A Lopez", + "electric tape", + "1.5", + "Water level not affected", + "7.0", + "Water level accurate to within two hundreths of a foot", + "Bad row", + ] + ), + ] + ) + + files = { + "file": ("water_levels.csv", csv_content, "text/csv"), + } + + response = client.post("/observation/groundwater-level/bulk-upload", files=files) + data = response.json() + assert 
response.status_code == 200 + assert data["summary"]["total_rows_imported"] == 1 + assert data["summary"]["total_rows_processed"] == 2 + assert data["summary"]["validation_errors_or_warnings"] == 1 + assert len(data["validation_errors"]) == 1 + assert "Bad Well" in data["validation_errors"][0] + + row = data["water_levels"][0] + with session_ctx() as session: + observation = session.get(Observation, row["observation_id"]) + sample = session.get(Sample, row["sample_id"]) + field_activity = session.get(FieldActivity, row["field_activity_id"]) + field_event = session.get(FieldEvent, row["field_event_id"]) + + if observation: + session.delete(observation) + if sample: + session.delete(sample) + if field_activity: + session.delete(field_activity) + if field_event: + session.delete(field_event) + session.commit() + + # PATCH tests ================================================================== diff --git a/tests/test_water_level_csv_service.py b/tests/test_water_level_csv_service.py index 4d8826422..3e10de66f 100644 --- a/tests/test_water_level_csv_service.py +++ b/tests/test_water_level_csv_service.py @@ -240,7 +240,7 @@ def test_bulk_upload_water_levels_imports_valid_rows_when_other_rows_fail( result = bulk_upload_water_levels(csv_content.encode("utf-8")) - assert result.exit_code == 1 + assert result.exit_code == 0 assert result.payload["summary"]["total_rows_processed"] == 2 assert result.payload["summary"]["total_rows_imported"] == 1 assert result.payload["summary"]["validation_errors_or_warnings"] == 1 From 83bc1d94923593870b2bf28b8b7f03b769789b94 Mon Sep 17 00:00:00 2001 From: Kelsey Smuczynski Date: Fri, 27 Mar 2026 12:25:03 -0600 Subject: [PATCH 06/20] fix(well-transfer): exclude monitoring_status from Thing creation Filtered well transfers failed while seeding dev data because CreateWell dumps included monitoring_status, but Thing exposes that as a computed status-history property rather than a writable ORM field. Exclude it from Thing creation so the transfer can persist wells and let monitoring status continue to be recorded through StatusHistory. 
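A condensed sketch of the pattern (the exclusion-set name and the helper are
illustrative, not the exact code in transfers/well_transfer.py; CreateWell and
Thing are the real schema and model):

    # Fields CreateWell carries that Thing must not receive as ORM kwargs;
    # monitoring_status is a computed status-history property on Thing.
    EXCLUDED_THING_FIELDS = {"well_status", "monitoring_status"}

    def _thing_kwargs(data: CreateWell) -> dict:
        # model_dump(exclude=...) is Pydantic v2; excluded fields never reach
        # Thing(**kwargs), so monitoring status stays in StatusHistory.
        return data.model_dump(exclude=EXCLUDED_THING_FIELDS)

    well = Thing(**_thing_kwargs(payload["data"]), thing_type="water well")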
--- tests/test_well_transfer.py | 51 +++++++++++++++++++++++++++++++++++++ transfers/well_transfer.py | 1 + 2 files changed, 52 insertions(+) create mode 100644 tests/test_well_transfer.py diff --git a/tests/test_well_transfer.py b/tests/test_well_transfer.py new file mode 100644 index 000000000..bae3133a6 --- /dev/null +++ b/tests/test_well_transfer.py @@ -0,0 +1,51 @@ +from types import SimpleNamespace + +from schemas.thing import CreateWell +from transfers import well_transfer as wt + + +class _FakeSession: + def __init__(self): + self.added = [] + self.expunge_calls = [] + + def add(self, obj): + self.added.append(obj) + + def expunge(self, obj): + self.expunge_calls.append(obj) + + +def test_persist_well_excludes_monitoring_status_from_thing_kwargs( + monkeypatch, +): + captured_kwargs = {} + + class FakeThing: + def __init__(self, **kwargs): + captured_kwargs.update(kwargs) + + monkeypatch.setattr(wt, "Thing", FakeThing) + + transferer = wt.WellTransferer() + session = _FakeSession() + row = SimpleNamespace(PointID="AR0001", WellID=12, LocationId=34) + payload = { + "data": CreateWell( + name="AR0001", + monitoring_status="Not currently monitored", + ), + "well_purposes": [], + "well_casing_materials": [], + } + batch_errors = [] + + well = transferer._persist_well(session, row, payload, batch_errors) + + assert well is session.added[0] + assert "monitoring_status" not in captured_kwargs + assert captured_kwargs["thing_type"] == "water well" + assert captured_kwargs["nma_pk_welldata"] == 12 + assert captured_kwargs["nma_pk_location"] == 34 + assert batch_errors == [] + assert session.expunge_calls == [] diff --git a/transfers/well_transfer.py b/transfers/well_transfer.py index d8e1c200f..c5b5e5988 100644 --- a/transfers/well_transfer.py +++ b/transfers/well_transfer.py @@ -93,6 +93,7 @@ "is_suitable_for_datalogger", "is_open", "well_status", + "monitoring_status", ] From fa257cd0047a52aeb3f4e0fd9866eb0d6504ff04 Mon Sep 17 00:00:00 2001 From: Kelsey Smuczynski Date: Fri, 27 Mar 2026 15:24:34 -0600 Subject: [PATCH 07/20] fix(water-level-import): harden real-file import cleanup Address the remaining issues found during validation against a real standalone water-level CSV. 
Changes: - coerce measuring point height and well depth values to float before depth-to-water validation so DB-backed Decimal values do not crash the importer - replace the nested savepoint context-manager flow with explicit savepoint handling to avoid the SQLAlchemy "nested transaction already deassociated from connection" warning - add regression coverage for measuring-point history values coming back as Decimal Impact: - real water-level CSV imports no longer fail on float-vs-Decimal depth checks - best-effort row imports complete without the savepoint warning seen in mixed-result runs - rerunning the same real file remains idempotent --- services/water_level_csv.py | 121 ++++++++++++++------------ tests/test_water_level_csv_service.py | 15 ++++ 2 files changed, 79 insertions(+), 57 deletions(-) diff --git a/services/water_level_csv.py b/services/water_level_csv.py index 99dd0848f..f56cd8055 100644 --- a/services/water_level_csv.py +++ b/services/water_level_csv.py @@ -307,6 +307,8 @@ def _resolve_measuring_point_height( well: Thing, csv_mp_height: float | None ) -> tuple[float | int | None, float | int | None, bool]: existing_mp_height = well.measuring_point_height + if existing_mp_height is not None: + existing_mp_height = float(existing_mp_height) if csv_mp_height is not None: return ( csv_mp_height, @@ -323,19 +325,19 @@ def _validate_depth_to_water_against_well( depth_to_water_ft: float | None, resolved_mp_height: float | int | None, ) -> str | None: - if ( - depth_to_water_ft is None - or resolved_mp_height is None - or well.well_depth is None - ): + well_depth = well.well_depth + if well_depth is not None: + well_depth = float(well_depth) + + if depth_to_water_ft is None or resolved_mp_height is None or well_depth is None: return None corrected_depth_to_water = depth_to_water_ft - resolved_mp_height - if corrected_depth_to_water >= well.well_depth: + if corrected_depth_to_water >= well_depth: return ( f"Row {row_index}: depth_to_water_ft minus measuring point height " f"({corrected_depth_to_water}) must be less than well depth " - f"({well.well_depth})" + f"({well_depth})" ) return None @@ -348,60 +350,65 @@ def _create_records( errors: list[str] = [] for row in rows: + savepoint = session.begin_nested() try: - with session.begin_nested(): - sample_name = _build_sample_name(row) - sample = _find_existing_imported_sample(session, row, sample_name) - - if sample is None: - field_event = FieldEvent( - thing=row.well, - event_date=row.field_event_dt, - notes=_build_field_event_notes(row), - ) - field_activity = FieldActivity( - field_event=field_event, - activity_type="groundwater level", - notes=f"Sampler: {row.sampler}", - ) - sample = Sample(field_activity=field_activity) + sample_name = _build_sample_name(row) + sample = _find_existing_imported_sample(session, row, sample_name) + + if sample is None: + field_event = FieldEvent( + thing=row.well, + event_date=row.field_event_dt, + notes=_build_field_event_notes(row), + ) + field_activity = FieldActivity( + field_event=field_event, + activity_type="groundwater level", + notes=f"Sampler: {row.sampler}", + ) + sample = Sample(field_activity=field_activity) + observation = Observation(sample=sample) + session.add(field_event) + session.add(field_activity) + session.add(sample) + session.add(observation) + else: + field_activity = sample.field_activity + field_event = field_activity.field_event + observation = _find_existing_observation(sample, parameter_id) + if observation is None: observation = Observation(sample=sample) - 
session.add(field_event) - session.add(field_activity) - session.add(sample) session.add(observation) - else: - field_activity = sample.field_activity - field_event = field_activity.field_event - observation = _find_existing_observation(sample, parameter_id) - if observation is None: - observation = Observation(sample=sample) - session.add(observation) - - field_event.event_date = row.field_event_dt - field_event.notes = _build_field_event_notes(row) - field_activity.notes = f"Sampler: {row.sampler}" - - _apply_sample_values(sample, row, sample_name) - _apply_observation_values(observation, row, parameter_id) - session.flush() - - created.append( - { - "well_name_point_id": row.raw["well_name_point_id"], - "field_event_id": field_event.id, - "field_activity_id": field_activity.id, - "sample_id": sample.id, - "observation_id": observation.id, - "measurement_date_time": ( - row.raw.get("water_level_date_time") - or row.raw.get("measurement_date_time") - ), - "level_status": row.level_status, - "data_quality": row.data_quality, - } - ) + + field_event.event_date = row.field_event_dt + field_event.notes = _build_field_event_notes(row) + field_activity.notes = f"Sampler: {row.sampler}" + + _apply_sample_values(sample, row, sample_name) + _apply_observation_values(observation, row, parameter_id) + session.flush() + savepoint.commit() + + created.append( + { + "well_name_point_id": row.raw["well_name_point_id"], + "field_event_id": field_event.id, + "field_activity_id": field_activity.id, + "sample_id": sample.id, + "observation_id": observation.id, + "measurement_date_time": ( + row.raw.get("water_level_date_time") + or row.raw.get("measurement_date_time") + ), + "level_status": row.level_status, + "data_quality": row.data_quality, + } + ) except Exception as exc: # pragma: no cover - exercised via DB tests + if savepoint.is_active: + savepoint.rollback() + else: + session.expire_all() errors.append(f"Row {row.row_index}: {exc}") return created, errors diff --git a/tests/test_water_level_csv_service.py b/tests/test_water_level_csv_service.py index 3e10de66f..6b487adad 100644 --- a/tests/test_water_level_csv_service.py +++ b/tests/test_water_level_csv_service.py @@ -1,4 +1,5 @@ from datetime import date, datetime, timezone +from decimal import Decimal from types import SimpleNamespace from db import FieldActivity, FieldEvent, Observation, Sample, Thing @@ -58,6 +59,20 @@ def test_resolve_measuring_point_height_falls_back_to_well_history(): assert differs is False +def test_resolve_measuring_point_height_coerces_decimal_history_value(): + well = _build_well(measuring_point_height=Decimal("3.5")) + + ( + resolved_mp_height, + existing_mp_height, + differs, + ) = _resolve_measuring_point_height(well, None) + + assert resolved_mp_height == 3.5 + assert existing_mp_height == 3.5 + assert differs is False + + def test_resolve_measuring_point_height_allows_missing_values(): well = _build_well() From 44102db035fe9c02672a04feae38dcecd124b18c Mon Sep 17 00:00:00 2001 From: Kelsey Smuczynski Date: Fri, 27 Mar 2026 15:58:04 -0600 Subject: [PATCH 08/20] test(bdd): align water-level CSV feature coverage with current importer behavior - update standalone water-level BDD scenarios for partial-success imports - switch step data to current canonical headers and groundwater vocabularies - add coverage for legacy alias headers and measuring_person validation - fix shared CSV step compatibility by populating file_content in test context --- tests/features/steps/water-levels-csv.py | 147 ++++++++++++++++++----- 
tests/features/water-level-csv.feature | 93 +++++++------- 2 files changed, 161 insertions(+), 79 deletions(-) diff --git a/tests/features/steps/water-levels-csv.py b/tests/features/steps/water-levels-csv.py index 4a8d6b57c..052571631 100644 --- a/tests/features/steps/water-levels-csv.py +++ b/tests/features/steps/water-levels-csv.py @@ -23,27 +23,50 @@ from db import Observation from db.engine import session_ctx from services.water_level_csv import bulk_upload_water_levels +from tests.features.environment import ( + add_location, + add_measuring_point_history, + add_well, +) REQUIRED_FIELDS: List[str] = [ "field_staff", "well_name_point_id", "field_event_date_time", - "measurement_date_time", - "sampler", + "water_level_date_time", + "measuring_person", "sample_method", +] +OPTIONAL_FIELDS = [ + "field_staff_2", + "field_staff_3", "mp_height", "level_status", "depth_to_water_ft", "data_quality", + "water_level_notes", +] +VALID_SAMPLE_METHODS = [ + "Electric tape measurement (E-probe)", + "Steel-tape measurement", +] +VALID_LEVEL_STATUSES = ["Water level not affected", "Site was dry"] +VALID_DATA_QUALITIES = [ + "Water level accurate to within two hundreths of a foot", + "None", ] -OPTIONAL_FIELDS = ["water_level_notes"] -VALID_SAMPLERS = ["Groundwater Team", "Consultant"] -VALID_SAMPLE_METHODS = ["electric tape", "steel tape"] -VALID_LEVEL_STATUSES = ["stable", "rising", "falling"] -VALID_DATA_QUALITIES = ["approved", "provisional"] def _available_well_names(context: Context) -> list[str]: + if "wells" not in context.objects or not context.objects["wells"]: + with session_ctx() as session: + loc_1 = add_location(context, session) + loc_2 = add_location(context, session) + well_1 = add_well(context, session, loc_1, name_num=101) + well_2 = add_well(context, session, loc_2, name_num=102) + add_measuring_point_history(context, session, well_1) + add_measuring_point_history(context, session, well_2) + if not hasattr(context, "well_names"): context.well_names = [well.name for well in context.objects["wells"]] return context.well_names @@ -53,16 +76,19 @@ def _base_row(context: Context, index: int) -> Dict[str, str]: well_names = _available_well_names(context) well_name = well_names[(index - 1) % len(well_names)] measurement_day = 14 + index + field_staff = "A Lopez" if index == 1 else "B Chen" return { - "field_staff": "A Lopez" if index == 1 else "B Chen", + "field_staff": field_staff, + "field_staff_2": "", + "field_staff_3": "", "well_name_point_id": well_name, - "field_event_date_time": f"2025-02-{measurement_day:02d}T08:00:00-07:00", - "measurement_date_time": f"2025-02-{measurement_day:02d}T10:30:00-07:00", - "sampler": VALID_SAMPLERS[(index - 1) % len(VALID_SAMPLERS)], + "field_event_date_time": f"2025-02-{measurement_day:02d}T08:00:00", + "water_level_date_time": f"2025-02-{measurement_day:02d}T10:30:00", + "measuring_person": field_staff, "sample_method": VALID_SAMPLE_METHODS[(index - 1) % len(VALID_SAMPLE_METHODS)], "mp_height": "1.5" if index == 1 else "1.8", "level_status": VALID_LEVEL_STATUSES[(index - 1) % len(VALID_LEVEL_STATUSES)], - "depth_to_water_ft": "45.2" if index == 1 else "47.0", + "depth_to_water_ft": "7.0" if index == 1 else "", "data_quality": VALID_DATA_QUALITIES[(index - 1) % len(VALID_DATA_QUALITIES)], "water_level_notes": "Initial measurement" if index == 1 else "Follow-up", } @@ -89,6 +115,7 @@ def _write_csv_to_context(context: Context) -> None: temp_file.close() context.csv_file = str(Path(temp_file.name)) context.csv_raw_text = csv_text + 
context.file_content = csv_text def _set_rows( @@ -146,21 +173,53 @@ def step_given_each_well_name_point_id_value_matches_an_existing_well(context: C @given( - '"measurement_date_time" values are valid ISO 8601 timestamps with timezone offsets (e.g. "2025-02-15T10:30:00-08:00")' + '"field_event_date_time" values are valid ISO 8601 timezone-naive datetime strings (e.g. "2025-02-15T08:00:00")' ) -def step_step_step(context: Context): +def step_given_field_event_date_time_values_are_valid_naive_iso_datetimes( + context: Context, +): for row in context.csv_rows: - assert row["measurement_date_time"].startswith("2025-02") - assert "T" in row["measurement_date_time"] + assert row["field_event_date_time"].startswith("2025-02") + assert "T" in row["field_event_date_time"] + assert "+" not in row["field_event_date_time"] + assert row["field_event_date_time"].count(":") == 2 -# @given("the water level CSV includes optional fields when available:") -# def step_impl(context: Context): -# field_name = context.table.headings[0] -# optional_fields = [row[field_name].strip() for row in context.table] -# headers = set(context.csv_headers) -# missing = [field for field in optional_fields if field not in headers] -# assert not missing, f"Missing optional headers: {missing}" +@given( + '"water_level_date_time" values are valid ISO 8601 timezone-naive datetime strings (e.g. "2025-02-15T10:30:00")' +) +def step_given_water_level_date_time_values_are_valid_naive_iso_datetimes( + context: Context, +): + for row in context.csv_rows: + assert row["water_level_date_time"].startswith("2025-02") + assert "T" in row["water_level_date_time"] + assert "+" not in row["water_level_date_time"] + assert row["water_level_date_time"].count(":") == 2 + + +@given( + 'when provided, "sample_method", "level_status", and "data_quality" values are valid lexicon values' +) +def step_given_lexicon_values_are_valid(context: Context): + for row in context.csv_rows: + if row.get("sample_method"): + assert row["sample_method"] in VALID_SAMPLE_METHODS + if row.get("level_status"): + assert row["level_status"] in VALID_LEVEL_STATUSES + if row.get("data_quality"): + assert row["data_quality"] in VALID_DATA_QUALITIES + + +@given("the water level CSV includes optional fields when available:") +def step_given_the_water_level_csv_includes_optional_fields_when_available( + context: Context, +): + field_name = context.table.headings[0] + optional_fields = [row[field_name].strip() for row in context.table] + headers = set(context.csv_headers) + missing = [field for field in optional_fields if field not in headers] + assert not missing, f"Missing optional headers: {missing}" @when("I run the CLI command:") @@ -217,10 +276,26 @@ def step_then_stderr_should_be_empty(context: Context): # ============================================================================ # Scenario: Upload succeeds when required columns are present but reordered # ============================================================================ +@given( + "my water level CSV file uses legacy alias headers for measurement date, sampler, and measuring point height" +) +def step_given_my_water_level_csv_file_uses_legacy_alias_headers(context: Context): + rows = _build_valid_rows(context) + alias_rows = [] + for row in rows: + alias_row = dict(row) + alias_row["measurement_date_time"] = alias_row.pop("water_level_date_time") + alias_row["sampler"] = alias_row.pop("measuring_person") + alias_row["mp_height_ft"] = alias_row.pop("mp_height") + alias_rows.append(alias_row) + headers = 
list(alias_rows[0].keys()) + _set_rows(context, alias_rows, headers=headers) + + @given( "my water level CSV file contains all required headers but in a different column order" ) -def step_step_step_2(context: Context): +def step_given_my_water_level_csv_file_contains_reordered_headers(context: Context): rows = _build_valid_rows(context) headers = list(reversed(list(rows[0].keys()))) _set_rows(context, rows, headers=headers) @@ -299,13 +374,13 @@ def step_then_stderr_should_contain_a_validation_error_for_the_required_field( # Scenario: Upload fails due to invalid date formats # ============================================================================ @given( - 'my CSV file contains invalid ISO 8601 date values in the "measurement_date_time" field' + 'my CSV file contains invalid ISO 8601 date values in the "water_level_date_time" field' ) def step_step_step_6(context: Context): rows = _build_valid_rows(context, count=1) - rows[0]["measurement_date_time"] = "02/15/2025 10:30" + rows[0]["water_level_date_time"] = "02/15/2025 10:30" _set_rows(context, rows) - context.invalid_fields = ["measurement_date_time"] + context.invalid_fields = ["water_level_date_time"] @then("stderr should contain validation errors identifying the invalid field and row") @@ -323,7 +398,7 @@ def step_then_stderr_should_contain_validation_errors_identifying_the_invalid_fi # Scenario: Upload fails due to invalid numeric fields # ============================================================================ @given( - 'my CSV file contains values that cannot be parsed as numeric in numeric-required fields such as "mp_height" or "depth_to_water_ft"' + 'my CSV file contains values that cannot be parsed as numeric in numeric fields such as "mp_height" or "depth_to_water_ft"' ) def step_step_step_7(context: Context): rows = _build_valid_rows(context, count=1) @@ -337,21 +412,31 @@ def step_step_step_7(context: Context): # Scenario: Upload fails due to invalid lexicon values # ============================================================================ @given( - 'my CSV file contains invalid lexicon values for "sampler", "sample_method", "level_status", or "data_quality"' + 'my CSV file contains invalid lexicon values for "sample_method", "level_status", or "data_quality"' ) def step_step_step_8(context: Context): rows = _build_valid_rows(context, count=1) - rows[0]["sampler"] = "Unknown Team" rows[0]["sample_method"] = "mystery" rows[0]["level_status"] = "supercharged" rows[0]["data_quality"] = "bad" _set_rows(context, rows) context.invalid_fields = [ - "sampler", "sample_method", "level_status", "data_quality", ] +@given( + "my water level CSV file contains a row where measuring_person is not one of the supplied field staff" +) +def step_given_measuring_person_is_not_one_of_the_supplied_field_staff( + context: Context, +): + rows = _build_valid_rows(context, count=1) + rows[0]["measuring_person"] = "Unexpected Person" + _set_rows(context, rows) + context.invalid_fields = ["measuring_person"] + + # ============= EOF ============================================= diff --git a/tests/features/water-level-csv.feature b/tests/features/water-level-csv.feature index 701718c33..1c3b7511a 100644 --- a/tests/features/water-level-csv.feature +++ b/tests/features/water-level-csv.feature @@ -1,21 +1,10 @@ -# features/cli/bulk_upload_water_levels.feature - @cli @backend @BDMS-TBD Feature: Bulk upload water level entries from CSV via CLI As a hydrogeologist or data specialist I want to upload a CSV file containing water level entry data for 
multiple wells using a CLI command - So that water level records can be created efficiently and accurately in the system - -# Background: -# Given the CLI binary "bdms" is installed and available on the PATH -# And I have a valid CLI configuration for the target environment -# And valid lexicon values exist for: -# | lexicon category | -# | sample_method | -# | level_status | -# | data_quality | + So that groundwater-level records can be created efficiently and accurately in the system @positive @happy_path @BDMS-TBD @cleanup_samples Scenario: Uploading a valid water level entry CSV containing required and optional fields @@ -27,21 +16,21 @@ Feature: Bulk upload water level entries from CSV via CLI | field_staff | | well_name_point_id | | field_event_date_time | + | water_level_date_time | | measuring_person | + | sample_method | And each "well_name_point_id" value matches an existing well And "field_event_date_time" values are valid ISO 8601 timezone-naive datetime strings (e.g. "2025-02-15T08:00:00") And "water_level_date_time" values are valid ISO 8601 timezone-naive datetime strings (e.g. "2025-02-15T10:30:00") And when provided, "sample_method", "level_status", and "data_quality" values are valid lexicon values - And the CSV includes optional fields when available: + And the water level CSV includes optional fields when available: | optional field name | | field_staff_2 | | field_staff_3 | - | water_level_date_time | - | sample_method | - | mp_height | - | level_status | - | depth_to_water_ft | - | data_quality | + | mp_height | + | level_status | + | depth_to_water_ft | + | data_quality | | water_level_notes | When I run the CLI command: """ @@ -57,57 +46,52 @@ Feature: Bulk upload water level entries from CSV via CLI And stdout includes an array of created water level entry objects And stderr should be empty - @positive @validation @column_order @BDMS-TBD @cleanup_samples - Scenario: Upload succeeds when required columns are present but in a different order - Given my water level CSV file contains all required headers but in a different column order - And the CSV includes required fields: - | required field name | - | field_staff | - | well_name_point_id | - | field_event_date_time | - | measuring_person | + @positive @validation @aliases @BDMS-TBD @cleanup_samples + Scenario: Upload succeeds when legacy alias headers are used + Given my water level CSV file uses legacy alias headers for measurement date, sampler, and measuring point height When I run the CLI command: """ - oco water-levels bulk-upload --file ./water_levels.csv + oco water-levels bulk-upload --file ./water_levels.csv --output json """ - # assumes users are entering datetimes as Mountain Time because well location is restricted to New Mexico - Then all datetime objects are assigned the correct Mountain Time timezone offset based on the date value. 
- And the command exits with code 0 + Then the command exits with code 0 + And stdout should be valid JSON And all water level entries are imported And stderr should be empty @positive @validation @extra_columns @BDMS-TBD @cleanup_samples - Scenario: Upload succeeds when CSV contains extra, unknown columns + Scenario: Upload succeeds when CSV contains extra columns Given my water level CSV file contains extra columns but is otherwise valid When I run the CLI command: """ - oco water-levels bulk-upload --file ./water_levels.csv + oco water-levels bulk-upload --file ./water_levels.csv --output json """ Then the command exits with code 0 + And stdout should be valid JSON And all water level entries are imported And stderr should be empty - ########################################################################### - # NEGATIVE VALIDATION SCENARIOS - ########################################################################### - - @negative @validation @BDMS-TBD - Scenario: No water level entries are imported when any row fails validation + @positive @validation @partial_success @BDMS-TBD @cleanup_samples + Scenario: Valid rows are imported when another row fails validation Given my water level CSV contains 3 rows with 2 valid rows and 1 row missing the required "well_name_point_id" When I run the CLI command: """ - oco water-levels bulk-upload --file ./water_levels.csv + oco water-levels bulk-upload --file ./water_levels.csv --output json """ - Then the command exits with a non-zero exit code + Then the command exits with code 0 + And stdout should be valid JSON + And stdout includes a summary containing: + | summary_field | value | + | total_rows_processed | 3 | + | total_rows_imported | 2 | + | validation_errors_or_warnings | 1 | And stderr should contain a validation error for the row missing "well_name_point_id" - And no water level entries are imported @negative @validation @required_fields @BDMS-TBD Scenario Outline: Upload fails when a required field is missing Given my water level CSV file contains a row missing the required "" field When I run the CLI command: """ - oco water-levels bulk-upload --file ./water_levels.csv + oco water-levels bulk-upload --file ./water_levels.csv --output json """ Then the command exits with a non-zero exit code And stderr should contain a validation error for the "" field @@ -118,14 +102,16 @@ Feature: Bulk upload water level entries from CSV via CLI | field_staff | | well_name_point_id | | field_event_date_time | + | water_level_date_time | | measuring_person | + | sample_method | @negative @validation @date_formats @BDMS-TBD Scenario: Upload fails due to invalid date formats Given my CSV file contains invalid ISO 8601 date values in the "water_level_date_time" field When I run the CLI command: """ - oco water-levels bulk-upload --file ./water_levels.csv + oco water-levels bulk-upload --file ./water_levels.csv --output json """ Then the command exits with a non-zero exit code And stderr should contain validation errors identifying the invalid field and row @@ -133,10 +119,10 @@ Feature: Bulk upload water level entries from CSV via CLI @negative @validation @numeric_fields @BDMS-TBD Scenario: Upload fails due to invalid numeric fields - Given my CSV file contains values that cannot be parsed as numeric in numeric-required fields such as "mp_height" or "depth_to_water_ft" + Given my CSV file contains values that cannot be parsed as numeric in numeric fields such as "mp_height" or "depth_to_water_ft" When I run the CLI command: """ - oco water-levels 
bulk-upload --file ./water_levels.csv + oco water-levels bulk-upload --file ./water_levels.csv --output json """ Then the command exits with a non-zero exit code And stderr should contain validation errors identifying the invalid field and row @@ -147,7 +133,18 @@ Feature: Bulk upload water level entries from CSV via CLI Given my CSV file contains invalid lexicon values for "sample_method", "level_status", or "data_quality" When I run the CLI command: """ - oco water-levels bulk-upload --file ./water_levels.csv + oco water-levels bulk-upload --file ./water_levels.csv --output json + """ + Then the command exits with a non-zero exit code + And stderr should contain validation errors identifying the invalid field and row + And no water level entries are imported + + @negative @validation @measuring_person @BDMS-TBD + Scenario: Upload fails when measuring_person does not match supplied field staff + Given my water level CSV file contains a row where measuring_person is not one of the supplied field staff + When I run the CLI command: + """ + oco water-levels bulk-upload --file ./water_levels.csv --output json """ Then the command exits with a non-zero exit code And stderr should contain validation errors identifying the invalid field and row From 0785cac7372e08a9693a42ce1a34e5add28fb5d2 Mon Sep 17 00:00:00 2001 From: ksmuczynski <20096455+ksmuczynski@users.noreply.github.com> Date: Sat, 28 Mar 2026 01:07:05 +0000 Subject: [PATCH 09/20] Formatting changes --- tests/test_cli_commands.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/tests/test_cli_commands.py b/tests/test_cli_commands.py index 656dd91eb..a1d4515f6 100644 --- a/tests/test_cli_commands.py +++ b/tests/test_cli_commands.py @@ -696,12 +696,10 @@ def _write_csv(path: Path, *, well_name: str, notes: str): "Water level accurate to within two hundreths of a foot," f"{notes}" ) - csv_text = textwrap.dedent( - f"""\ + csv_text = textwrap.dedent(f"""\ {header} {row} - """ - ) + """) path.write_text(csv_text) unique_notes = f"pytest-{uuid.uuid4()}" From 452fe0c09b01fbc99a4d1b4e000bcf275f98493f Mon Sep 17 00:00:00 2001 From: Kelsey Smuczynski Date: Mon, 30 Mar 2026 14:42:23 -0600 Subject: [PATCH 10/20] test(water-level-import): ensure unrelated observations are preserved during bulk upload - Add test to validate that bulk water-level uploads do not remove existing unrelated observations - Update testing utility with `get_parameter_id` for parameter resolution - Ensure edge cases where new observations coexist with pre-existing observations function as expected --- services/water_level_csv.py | 2 +- tests/test_water_level_csv_service.py | 119 ++++++++++++++++++++++++++ 2 files changed, 120 insertions(+), 1 deletion(-) diff --git a/services/water_level_csv.py b/services/water_level_csv.py index f56cd8055..6eaa7ae46 100644 --- a/services/water_level_csv.py +++ b/services/water_level_csv.py @@ -445,7 +445,7 @@ def _find_existing_observation(sample: Sample, parameter_id: int) -> Observation for observation in sample.observations: if observation.parameter_id == parameter_id: return observation - return sample.observations[0] if sample.observations else None + return None def _apply_sample_values(sample: Sample, row: _ValidatedRow, sample_name: str) -> None: diff --git a/tests/test_water_level_csv_service.py b/tests/test_water_level_csv_service.py index 6b487adad..3f6084c2d 100644 --- a/tests/test_water_level_csv_service.py +++ b/tests/test_water_level_csv_service.py @@ -5,6 +5,7 @@ from db import FieldActivity, FieldEvent, 
Observation, Sample, Thing from db.measuring_point_history import MeasuringPointHistory from db.engine import session_ctx +from tests import get_parameter_id from services.water_level_csv import ( _build_sample_name, _resolve_measuring_point_height, @@ -200,6 +201,124 @@ def test_bulk_upload_water_levels_is_idempotent(water_well_thing): assert observations[0].measuring_point_height == 1.5 +def test_bulk_upload_water_levels_preserves_unrelated_existing_observations( + water_well_thing, +): + groundwater_parameter_id = get_parameter_id("groundwater level", "Field Parameter") + ph_parameter_id = get_parameter_id("pH", "Field Parameter") + + with session_ctx() as session: + well = session.merge(water_well_thing) + field_event = FieldEvent( + thing=well, + event_date=datetime(2025, 2, 15, 15, 0, tzinfo=timezone.utc), + notes="Existing field event", + ) + field_activity = FieldActivity( + field_event=field_event, + activity_type="groundwater level", + notes="Sampler: Original Sampler", + ) + sample = Sample( + field_activity=field_activity, + sample_date=datetime(2025, 2, 15, 17, 30, tzinfo=timezone.utc), + sample_name="Test Well-WL-202502151730", + sample_matrix="groundwater", + sample_method="Electric tape measurement (E-probe)", + qc_type="Normal", + notes="Existing sample", + ) + unrelated_observation = Observation( + sample=sample, + observation_datetime=datetime(2025, 2, 15, 17, 30, tzinfo=timezone.utc), + parameter_id=ph_parameter_id, + value=7.2, + unit="dimensionless", + notes="Keep me as pH", + ) + session.add_all([field_event, field_activity, sample, unrelated_observation]) + session.commit() + unrelated_observation_id = unrelated_observation.id + + csv_content = "\n".join( + [ + ",".join( + [ + "field_staff", + "well_name_point_id", + "field_event_date_time", + "measurement_date_time", + "sampler", + "sample_method", + "mp_height", + "level_status", + "depth_to_water_ft", + "data_quality", + "water_level_notes", + ] + ), + ",".join( + [ + "A Lopez", + water_well_thing.name, + "2025-02-15T08:00:00-07:00", + "2025-02-15T10:30:00-07:00", + "A Lopez", + "electric tape", + "1.5", + "Water level not affected", + "7.0", + "Water level accurate to within two hundreths of a foot", + "Imported groundwater level", + ] + ), + ] + ) + + result = bulk_upload_water_levels(csv_content.encode("utf-8")) + + assert result.exit_code == 0, result.payload + + with session_ctx() as session: + sample = session.scalars( + select(Sample) + .join(FieldActivity, Sample.field_activity_id == FieldActivity.id) + .join(FieldEvent, FieldActivity.field_event_id == FieldEvent.id) + .join(Thing, FieldEvent.thing_id == Thing.id) + .where( + Thing.id == water_well_thing.id, + FieldActivity.activity_type == "groundwater level", + Sample.sample_name == "Test Well-WL-202502151730", + ) + ).one() + observations = session.scalars( + select(Observation) + .where(Observation.sample_id == sample.id) + .order_by(Observation.id.asc()) + ).all() + + assert len(observations) == 2 + assert observations[0].id == unrelated_observation_id + assert observations[0].parameter_id == ph_parameter_id + assert observations[0].value == 7.2 + assert observations[0].unit == "dimensionless" + assert observations[0].notes == "Keep me as pH" + + groundwater_observations = [ + observation + for observation in observations + if observation.parameter_id == groundwater_parameter_id + ] + assert len(groundwater_observations) == 1 + assert ( + groundwater_observations[0].id + == result.payload["water_levels"][0]["observation_id"] + ) + assert 
groundwater_observations[0].value == 7.0 + assert groundwater_observations[0].unit == "ft" + assert groundwater_observations[0].notes == "Imported groundwater level" + + def test_bulk_upload_water_levels_imports_valid_rows_when_other_rows_fail( water_well_thing, ): From 9741d41c43feac59e045b0aaa0e2e847461cd97a Mon Sep 17 00:00:00 2001 From: Kelsey Smuczynski Date: Mon, 30 Mar 2026 14:59:05 -0600 Subject: [PATCH 11/20] fix(water-level-import): skip persistence when no valid rows exist --- services/water_level_csv.py | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/services/water_level_csv.py b/services/water_level_csv.py index 6eaa7ae46..20b69bf73 100644 --- a/services/water_level_csv.py +++ b/services/water_level_csv.py @@ -100,18 +100,19 @@ def bulk_upload_water_levels( valid_rows, row_errors = _validate_rows(session, csv_rows) validation_errors.extend(row_errors) - try: - parameter_id = _get_groundwater_level_parameter_id(session) - created_rows, persistence_errors = _create_records( - session, - parameter_id, - valid_rows, - ) - validation_errors.extend(persistence_errors) - session.commit() - except Exception as exc: # pragma: no cover - safety fallback - session.rollback() - validation_errors.append(str(exc)) + if valid_rows: + try: + parameter_id = _get_groundwater_level_parameter_id(session) + created_rows, persistence_errors = _create_records( + session, + parameter_id, + valid_rows, + ) + validation_errors.extend(persistence_errors) + session.commit() + except Exception as exc: # pragma: no cover - safety fallback + session.rollback() + validation_errors.append(str(exc)) summary = { "total_rows_processed": len(csv_rows), From e0d7e453187fc788f9a6c304cbb1e53ea4b1a807 Mon Sep 17 00:00:00 2001 From: Kelsey Smuczynski Date: Mon, 30 Mar 2026 15:07:34 -0600 Subject: [PATCH 12/20] fix(water-level-import): handle savepoint initialization failure gracefully - Move `begin_nested` savepoint call inside the try block for proper error handling. - Ensure session expiration when savepoint creation fails. - Add test coverage for savepoint creation failure scenarios. 
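The per-row shape this leaves in _create_records, condensed into a sketch
(_build_row_records is a stand-in for the real record-building body):

    def _import_row(session, row, errors: list[str]) -> None:
        savepoint = None
        try:
            savepoint = session.begin_nested()  # SAVEPOINT; can itself raise
            _build_row_records(session, row)
            session.flush()
            savepoint.commit()
        except Exception as exc:
            # begin_nested() failing leaves nothing to roll back, and a failed
            # flush can deactivate the savepoint; only roll back when active.
            if savepoint is not None and savepoint.is_active:
                savepoint.rollback()
            else:
                session.expire_all()
            errors.append(f"Row {row.row_index}: {exc}")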
--- services/water_level_csv.py | 5 +++-- tests/test_water_level_csv_service.py | 25 +++++++++++++++++++++++++ 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/services/water_level_csv.py b/services/water_level_csv.py index 20b69bf73..7ab5ca7fc 100644 --- a/services/water_level_csv.py +++ b/services/water_level_csv.py @@ -351,8 +351,9 @@ def _create_records( errors: list[str] = [] for row in rows: - savepoint = session.begin_nested() + savepoint = None try: + savepoint = session.begin_nested() sample_name = _build_sample_name(row) sample = _find_existing_imported_sample(session, row, sample_name) @@ -406,7 +407,7 @@ def _create_records( } ) except Exception as exc: # pragma: no cover - exercised via DB tests - if savepoint.is_active: + if savepoint is not None and savepoint.is_active: savepoint.rollback() else: session.expire_all() diff --git a/tests/test_water_level_csv_service.py b/tests/test_water_level_csv_service.py index 3f6084c2d..d0f8d6e15 100644 --- a/tests/test_water_level_csv_service.py +++ b/tests/test_water_level_csv_service.py @@ -8,6 +8,7 @@ from tests import get_parameter_id from services.water_level_csv import ( _build_sample_name, + _create_records, _resolve_measuring_point_height, _validate_depth_to_water_against_well, bulk_upload_water_levels, @@ -116,6 +117,30 @@ def test_build_sample_name_uses_deterministic_well_inventory_style_format(): assert _build_sample_name(row) == "AR0001-WL-202502151030" +def test_create_records_reports_savepoint_creation_failure_as_row_error(): + class BrokenSession: + def __init__(self): + self.expire_all_called = False + + def begin_nested(self): + raise RuntimeError("savepoint failed") + + def expire_all(self): + self.expire_all_called = True + + session = BrokenSession() + + created, errors = _create_records( + session, + parameter_id=1, + rows=[SimpleNamespace(row_index=7)], + ) + + assert created == [] + assert errors == ["Row 7: savepoint failed"] + assert session.expire_all_called is True + + def test_bulk_upload_water_levels_is_idempotent(water_well_thing): csv_content = "\n".join( [ From 8490550ec0f850c6743c02e4390148add51848c0 Mon Sep 17 00:00:00 2001 From: Kelsey Smuczynski Date: Mon, 30 Mar 2026 15:36:01 -0600 Subject: [PATCH 13/20] fix(well-transfer): defer WellTransferer external I/O until needed Avoid loading GCS-backed elevation cache and CSV-backed measuring point data during `WellTransferer` construction. This prevents unit tests that only instantiate the transferer from requiring Google Application Default Credentials, while preserving existing behavior for real transfer paths that actually need those dependencies. --- transfers/well_transfer.py | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/transfers/well_transfer.py b/transfers/well_transfer.py index c5b5e5988..458a2dfcd 100644 --- a/transfers/well_transfer.py +++ b/transfers/well_transfer.py @@ -138,12 +138,24 @@ class WellTransferer(Transferer): def __init__(self, *args, **kw): super().__init__(*args, **kw) - self._cached_elevations = get_cached_elevations() + # Delay external I/O so unit tests can instantiate the transferer + # without requiring GCS credentials or source CSV files. 
+ self._cached_elevations = None self._added_locations = {} self._aquifers = None - self._measuring_point_estimator = MeasuringPointEstimator() + self._measuring_point_estimator = None self._row_by_pointid: dict[str, pd.Series] = {} + def _get_cached_elevations(self) -> dict: + if self._cached_elevations is None: + self._cached_elevations = get_cached_elevations() + return self._cached_elevations + + def _get_measuring_point_estimator(self) -> MeasuringPointEstimator: + if self._measuring_point_estimator is None: + self._measuring_point_estimator = MeasuringPointEstimator() + return self._measuring_point_estimator + def transfer_parallel(self, num_workers: int = None) -> None: """ Transfer wells using parallel processing for improved performance. @@ -300,7 +312,7 @@ def process_batch(batch_idx: int, batch_df: pd.DataFrame) -> dict: logger.info(f"Parallel transfer complete: {n} wells, {len(all_errors)} errors") # Dump cached elevations (minimal after-processing) - dump_cached_elevations(self._cached_elevations) + dump_cached_elevations(self._get_cached_elevations()) def _get_dfs(self): """Load and clean WellData/Location dataframes.""" @@ -658,7 +670,7 @@ def _persist_location(self, session: Session, row, batch_errors: list): """Create a Location from the legacy row.""" try: location, elevation_method, location_notes = make_location( - row, self._cached_elevations + row, self._get_cached_elevations() ) session.add(location) return location, elevation_method, location_notes @@ -745,7 +757,11 @@ def _add_histories(self, session: Session, row, well: Thing) -> None: ) ) else: - mphs = self._measuring_point_estimator.estimate_measuring_point_height(row) + mphs = ( + self._get_measuring_point_estimator().estimate_measuring_point_height( + row + ) + ) added_measuring_point = False for mph, mph_desc, start_date, end_date in zip(*mphs): session.add( @@ -1007,7 +1023,7 @@ def _add_aquifers_parallel(self, session, row, well, local_aquifers, aquifers_lo ) if not existing: local_aquifers.append(aquifer) - except Exception as e: + except Exception: # Race condition - another thread created it session.rollback() aquifer = ( From 81a016c08ef866336068457d68dfa3df454ec4c7 Mon Sep 17 00:00:00 2001 From: Kelsey Smuczynski Date: Mon, 30 Mar 2026 21:28:54 -0600 Subject: [PATCH 14/20] fix(well-transfer): improve aquifer persistence with nested savepoints and better error handling - Use a nested transaction around parallel aquifer creation so a duplicate-key race only rolls back the aquifer insert attempt instead of the full in-flight well/location work. - Catch IntegrityError for the expected unique-constraint race, re-query the existing aquifer, and let unexpected flush failures surface normally for debugging and error handling. - Add unit coverage for the aquifer race path and update contact transfer tests to match the current helper contract. 
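The insert-or-requery shape, condensed into a sketch (AquiferSystem is the
real model; the standalone helper name is illustrative):

    from sqlalchemy.exc import IntegrityError

    def _get_or_create_aquifer(session, name: str, primary_type: str | None):
        aquifer = (
            session.query(AquiferSystem)
            .filter(AquiferSystem.name == name)
            .first()
        )
        if aquifer is not None:
            return aquifer
        try:
            # The SAVEPOINT confines a duplicate-key failure to this insert
            # attempt instead of rolling back in-flight well/location work.
            with session.begin_nested():
                aquifer = AquiferSystem(
                    name=name,
                    primary_aquifer_type=primary_type,
                    geographic_scale=None,
                )
                session.add(aquifer)
                session.flush()
            return aquifer
        except IntegrityError:
            # Expected race: another worker committed the same name first.
            return (
                session.query(AquiferSystem)
                .filter(AquiferSystem.name == name)
                .first()
            )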
--- tests/test_well_transfer.py | 142 ++++++++++++++++++ .../test_contact_with_multiple_wells.py | 32 +++- transfers/well_transfer.py | 39 +++-- 3 files changed, 192 insertions(+), 21 deletions(-) diff --git a/tests/test_well_transfer.py b/tests/test_well_transfer.py index bae3133a6..07ba61bcf 100644 --- a/tests/test_well_transfer.py +++ b/tests/test_well_transfer.py @@ -1,5 +1,9 @@ +import threading from types import SimpleNamespace +import pytest +from sqlalchemy.exc import IntegrityError + from schemas.thing import CreateWell from transfers import well_transfer as wt @@ -16,6 +20,53 @@ def expunge(self, obj): self.expunge_calls.append(obj) +class _FakeQuery: + def __init__(self, session): + self.session = session + + def filter(self, *_args, **_kwargs): + return self + + def first(self): + return self.session.query_results.pop(0) + + +class _FakeSavepoint: + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + +class _FakeAquiferSession: + def __init__(self, query_results, flush_exc=None): + self.added = [] + self.begin_nested_calls = 0 + self.rollback_calls = 0 + self.query_results = list(query_results) + self.flush_exc = flush_exc + + def add(self, obj): + self.added.append(obj) + + def begin_nested(self): + self.begin_nested_calls += 1 + return _FakeSavepoint() + + def flush(self): + if self.flush_exc is not None: + exc = self.flush_exc + self.flush_exc = None + raise exc + + def query(self, _model): + return _FakeQuery(self) + + def rollback(self): + self.rollback_calls += 1 + + def test_persist_well_excludes_monitoring_status_from_thing_kwargs( monkeypatch, ): @@ -49,3 +100,94 @@ def __init__(self, **kwargs): assert captured_kwargs["nma_pk_location"] == 34 assert batch_errors == [] assert session.expunge_calls == [] + + +def test_add_aquifers_parallel_recovers_from_integrity_error(monkeypatch): + class FakeAquiferSystem: + name = "name" + + def __init__(self, name, primary_aquifer_type, geographic_scale): + self.name = name + self.primary_aquifer_type = primary_aquifer_type + self.geographic_scale = geographic_scale + + class FakeThingAquiferAssociation: + def __init__(self, thing, aquifer_system): + self.thing = thing + self.aquifer_system = aquifer_system + + class FakeAquiferType: + def __init__(self, thing_aquifer_association, aquifer_type): + self.thing_aquifer_association = thing_aquifer_association + self.aquifer_type = aquifer_type + + def fake_map_value(value): + if value.startswith("LU_AquiferClass:"): + return "Test Aquifer" + if value.startswith("LU_AquiferType:"): + return "Confined" + raise KeyError(value) + + existing_aquifer = SimpleNamespace(name="Test Aquifer") + session = _FakeAquiferSession( + query_results=[None, existing_aquifer], + flush_exc=IntegrityError("insert", {}, Exception("duplicate key")), + ) + transferer = wt.WellTransferer() + row = SimpleNamespace(PointID="AR0001", AqClass="AQ", AquiferType="A") + well = SimpleNamespace(name="AR0001") + local_aquifers = [] + + monkeypatch.setattr(wt, "AquiferSystem", FakeAquiferSystem) + monkeypatch.setattr(wt, "ThingAquiferAssociation", FakeThingAquiferAssociation) + monkeypatch.setattr(wt, "AquiferType", FakeAquiferType) + monkeypatch.setattr(wt, "extract_aquifer_type_codes", lambda _value: ["A"]) + monkeypatch.setattr(wt.lexicon_mapper, "map_value", fake_map_value) + + transferer._add_aquifers_parallel( + session, row, well, local_aquifers, threading.Lock() + ) + + associations = [ + obj for obj in session.added if isinstance(obj, FakeThingAquiferAssociation) + ] + + 
assert session.begin_nested_calls == 1 + assert session.rollback_calls == 0 + assert associations[0].aquifer_system is existing_aquifer + assert local_aquifers == [existing_aquifer] + + +def test_add_aquifers_parallel_reraises_unexpected_flush_errors(monkeypatch): + class FakeAquiferSystem: + name = "name" + + def __init__(self, name, primary_aquifer_type, geographic_scale): + self.name = name + self.primary_aquifer_type = primary_aquifer_type + self.geographic_scale = geographic_scale + + def fake_map_value(value): + if value.startswith("LU_AquiferClass:"): + return "Test Aquifer" + if value.startswith("LU_AquiferType:"): + return "Confined" + raise KeyError(value) + + session = _FakeAquiferSession( + query_results=[None], + flush_exc=RuntimeError("database unavailable"), + ) + transferer = wt.WellTransferer() + row = SimpleNamespace(PointID="AR0001", AqClass="AQ", AquiferType="A") + well = SimpleNamespace(name="AR0001") + + monkeypatch.setattr(wt, "AquiferSystem", FakeAquiferSystem) + monkeypatch.setattr(wt, "extract_aquifer_type_codes", lambda _value: ["A"]) + monkeypatch.setattr(wt.lexicon_mapper, "map_value", fake_map_value) + + with pytest.raises(RuntimeError, match="database unavailable"): + transferer._add_aquifers_parallel(session, row, well, [], threading.Lock()) + + assert session.begin_nested_calls == 1 + assert session.rollback_calls == 0 diff --git a/tests/transfers/test_contact_with_multiple_wells.py b/tests/transfers/test_contact_with_multiple_wells.py index 40b4b26ea..60af21b50 100644 --- a/tests/transfers/test_contact_with_multiple_wells.py +++ b/tests/transfers/test_contact_with_multiple_wells.py @@ -93,6 +93,8 @@ def test_owner_comment_absent_skips_notes(): def test_ownerkey_fallback_name_when_name_and_org_missing(water_well_thing): with session_ctx() as sess: thing = sess.get(Thing, water_well_thing.id) + contact_by_owner_type = {} + contact_by_name_org = {} row = SimpleNamespace( FirstName=None, LastName=None, @@ -118,12 +120,18 @@ def test_ownerkey_fallback_name_when_name_and_org_missing(water_well_thing): # Should not raise "Either name or organization must be provided." 
contact = _add_first_contact( - sess, row=row, thing=thing, organization=None, added=[] + sess, + row=row, + thing=thing, + organization=None, + added=set(), + contact_by_owner_type=contact_by_owner_type, + contact_by_name_org=contact_by_name_org, ) sess.flush() assert contact is not None - assert contact.name == "Fallback OwnerKey Name" + assert contact.name == "Fallback OwnerKey Name-primary" assert contact.organization is None @@ -131,6 +139,8 @@ def test_ownerkey_dedupes_when_fallback_name_differs(water_well_thing): owner_key = f"OwnerKey-{uuid4()}" with session_ctx() as sess: first_thing = sess.get(Thing, water_well_thing.id) + contact_by_owner_type = {} + contact_by_name_org = {} second_thing = Thing( name=f"Second Well {uuid4()}", thing_type="water well", @@ -184,15 +194,27 @@ def test_ownerkey_dedupes_when_fallback_name_differs(water_well_thing): PhysicalZipCode=None, ) - added = [] + added = set() first_contact = _add_first_contact( - sess, row=complete_row, thing=first_thing, organization=None, added=added + sess, + row=complete_row, + thing=first_thing, + organization=None, + added=added, + contact_by_owner_type=contact_by_owner_type, + contact_by_name_org=contact_by_name_org, ) assert first_contact is not None assert first_contact.name == "Casey Owner" second_contact = _add_first_contact( - sess, row=fallback_row, thing=second_thing, organization=None, added=added + sess, + row=fallback_row, + thing=second_thing, + organization=None, + added=added, + contact_by_owner_type=contact_by_owner_type, + contact_by_name_org=contact_by_name_org, ) sess.flush() diff --git a/transfers/well_transfer.py b/transfers/well_transfer.py index 458a2dfcd..5068acc60 100644 --- a/transfers/well_transfer.py +++ b/transfers/well_transfer.py @@ -25,7 +25,7 @@ import pandas as pd from pandas import isna, notna from pydantic import ValidationError -from sqlalchemy.exc import DatabaseError +from sqlalchemy.exc import DatabaseError, IntegrityError from sqlalchemy.orm import Session from core.enums import ( @@ -902,12 +902,9 @@ def _step_parallel_complete( # Aquifers if notna(row.AquiferType): - try: - self._add_aquifers_parallel( - session, row, well, local_aquifers, aquifers_lock - ) - except Exception as e: - logger.warning(f"Error adding aquifer for {well.name}: {e}") + self._add_aquifers_parallel( + session, row, well, local_aquifers, aquifers_lock + ) # Formation zone formation_code = row.FormationZone if hasattr(row, "FormationZone") else None @@ -1006,13 +1003,14 @@ def _add_aquifers_parallel(self, session, row, well, local_aquifers, aquifers_lo if not aquifer: try: - aquifer = AquiferSystem( - name=aquifer_name, - primary_aquifer_type=primary_type, - geographic_scale=None, - ) - session.add(aquifer) - session.flush() + with session.begin_nested(): + aquifer = AquiferSystem( + name=aquifer_name, + primary_aquifer_type=primary_type, + geographic_scale=None, + ) + session.add(aquifer) + session.flush() logger.info(f"Created aquifer: {aquifer_name}") # Update local cache under lock @@ -1023,14 +1021,23 @@ def _add_aquifers_parallel(self, session, row, well, local_aquifers, aquifers_lo ) if not existing: local_aquifers.append(aquifer) - except Exception: + except IntegrityError: # Race condition - another thread created it - session.rollback() aquifer = ( session.query(AquiferSystem) .filter(AquiferSystem.name == aquifer_name) .first() ) + if aquifer: + with aquifers_lock: + existing = next( + (a for a in local_aquifers if a.name == aquifer_name), + None, + ) + if not existing: + 
local_aquifers.append(aquifer) + else: + raise if aquifer: aquifer_assoc = ThingAquiferAssociation(thing=well, aquifer_system=aquifer) From 623206ed4729789de13a197d988da8e0c17ac029 Mon Sep 17 00:00:00 2001 From: Kelsey Smuczynski Date: Mon, 30 Mar 2026 21:41:06 -0600 Subject: [PATCH 15/20] fix(water-level-import): refine handling of alias and canonical headers in CSV processing --- services/water_level_csv.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/services/water_level_csv.py b/services/water_level_csv.py index 7ab5ca7fc..6b2ba49fe 100644 --- a/services/water_level_csv.py +++ b/services/water_level_csv.py @@ -200,8 +200,9 @@ def _read_csv( continue key = HEADER_ALIASES.get(stripped_key, stripped_key) value = v.strip() if isinstance(v, str) else v or "" - # If both alias and canonical header are present, - # preserve the first non-empty value. + # If both alias and canonical headers are present, keep the later + # non-empty value in CSV column order. An empty later value does not + # overwrite an earlier non-empty value. if key in normalized_row and normalized_row[key] and not value: continue normalized_row[key] = value From fbdc18b4044ea23bba5926a8fbb8244e55f6b1e1 Mon Sep 17 00:00:00 2001 From: Kelsey Smuczynski Date: Mon, 30 Mar 2026 21:52:46 -0600 Subject: [PATCH 16/20] fix(well-transfer): preload shared elevation cache before parallel workers Load cached elevations once before starting parallel well transfer workers. This avoids multiple workers loading the same cache at the same time and makes sure they all use the same shared in-memory data during the transfer. --- tests/test_well_transfer.py | 60 +++++++++++++++++++++++++++++++++++++ transfers/well_transfer.py | 4 +++ 2 files changed, 64 insertions(+) diff --git a/tests/test_well_transfer.py b/tests/test_well_transfer.py index 07ba61bcf..b5d2067d9 100644 --- a/tests/test_well_transfer.py +++ b/tests/test_well_transfer.py @@ -1,6 +1,8 @@ import threading +from contextlib import contextmanager from types import SimpleNamespace +import pandas as pd import pytest from sqlalchemy.exc import IntegrityError @@ -191,3 +193,61 @@ def fake_map_value(value): assert session.begin_nested_calls == 1 assert session.rollback_calls == 0 + + +def test_transfer_parallel_preloads_cached_elevations_before_worker_submission( + monkeypatch, +): + class FakePreloadSession: + def query(self, _model): + return self + + def all(self): + return [] + + def expunge_all(self): + pass + + class FakeFuture: + def result(self): + return {"errors": []} + + class FakeExecutor: + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def submit(self, fn, idx, batch): + assert transferer._cached_elevations == {"source": "preloaded"} + return FakeFuture() + + @contextmanager + def fake_session_ctx(): + yield FakePreloadSession() + + load_calls = [] + dumped = [] + + def fake_get_cached_elevations(): + load_calls.append("load") + return {"source": "preloaded"} + + def fake_dump_cached_elevations(lut): + dumped.append(lut) + + transferer = wt.WellTransferer() + df = pd.DataFrame([{"PointID": "AR0001"}]) + + monkeypatch.setattr(wt, "session_ctx", fake_session_ctx) + monkeypatch.setattr(wt, "get_cached_elevations", fake_get_cached_elevations) + monkeypatch.setattr(wt, "dump_cached_elevations", fake_dump_cached_elevations) + monkeypatch.setattr(wt, "ThreadPoolExecutor", lambda max_workers: FakeExecutor()) + monkeypatch.setattr(wt, "as_completed", lambda futures: list(futures)) + 
monkeypatch.setattr(transferer, "_get_dfs", lambda: (df, df.copy()))
+
+    transferer.transfer_parallel(num_workers=2)
+
+    assert load_calls == ["load"]
+    assert dumped == [{"source": "preloaded"}]
diff --git a/transfers/well_transfer.py b/transfers/well_transfer.py
index 5068acc60..114b7c462 100644
--- a/transfers/well_transfer.py
+++ b/transfers/well_transfer.py
@@ -183,6 +183,10 @@ def transfer_parallel(self, num_workers: int = None) -> None:
             logger.info("No wells to transfer")
             return
 
+        # Pre-load shared cached elevations on the main thread so workers
+        # mutate a single cache instance instead of racing lazy initialization.
+        self._get_cached_elevations()
+
         # Calculate batch size
         batch_size = max(100, n // num_workers)
         batches = [df.iloc[i : i + batch_size] for i in range(0, n, batch_size)]

From a3ce92cdcf17b6ec6064424f0229b811658f8efc Mon Sep 17 00:00:00 2001
From: Kelsey Smuczynski
Date: Mon, 30 Mar 2026 22:17:43 -0600
Subject: [PATCH 17/20] Update services/water_level_csv.py

Accepted Codex's commit suggestion. It seems benign and is mostly focused
on cleanup.

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
---
 services/water_level_csv.py | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/services/water_level_csv.py b/services/water_level_csv.py
index 6b2ba49fe..cf57e643e 100644
--- a/services/water_level_csv.py
+++ b/services/water_level_csv.py
@@ -399,10 +399,7 @@ def _create_records(
             "field_activity_id": field_activity.id,
             "sample_id": sample.id,
             "observation_id": observation.id,
-            "measurement_date_time": (
-                row.raw.get("water_level_date_time")
-                or row.raw.get("measurement_date_time")
-            ),
+            "measurement_date_time": row.raw.get("water_level_date_time"),
             "level_status": row.level_status,
             "data_quality": row.data_quality,
         }

From 91aaaf418031de7e73b519d133cfbe9362ebb850 Mon Sep 17 00:00:00 2001
From: Kelsey Smuczynski
Date: Mon, 30 Mar 2026 22:20:19 -0600
Subject: [PATCH 18/20] fix(well-transfer): preload measuring point estimator
 before parallel workers

---
 tests/test_well_transfer.py | 63 +++++++++++++++++++++++++++++++++++++
 transfers/well_transfer.py  |  1 +
 2 files changed, 64 insertions(+)

diff --git a/tests/test_well_transfer.py b/tests/test_well_transfer.py
index b5d2067d9..05cda5b35 100644
--- a/tests/test_well_transfer.py
+++ b/tests/test_well_transfer.py
@@ -251,3 +251,66 @@ def fake_dump_cached_elevations(lut):
 
     assert load_calls == ["load"]
     assert dumped == [{"source": "preloaded"}]
+
+
+def test_transfer_parallel_preloads_measuring_point_estimator_before_workers(
+    monkeypatch,
+):
+    class FakePreloadSession:
+        def query(self, _model):
+            return self
+
+        def all(self):
+            return []
+
+        def expunge_all(self):
+            pass
+
+    class FakeFuture:
+        def result(self):
+            return {"errors": []}
+
+    class FakeExecutor:
+        def __enter__(self):
+            return self
+
+        def __exit__(self, exc_type, exc, tb):
+            return False
+
+        def submit(self, fn, idx, batch):
+            assert transferer._measuring_point_estimator is estimator
+            return FakeFuture()
+
+    @contextmanager
+    def fake_session_ctx():
+        yield FakePreloadSession()
+
+    dumped = []
+    estimator = object()
+    build_calls = []
+
+    def fake_get_cached_elevations():
+        return {}
+
+    def fake_dump_cached_elevations(lut):
+        dumped.append(lut)
+
+    def fake_estimator_ctor():
+        build_calls.append("build")
+        return estimator
+
+    transferer = wt.WellTransferer()
+    df = pd.DataFrame([{"PointID": "AR0001"}])
+
+    monkeypatch.setattr(wt, "session_ctx", fake_session_ctx)
+    monkeypatch.setattr(wt, "get_cached_elevations", 
fake_get_cached_elevations) + monkeypatch.setattr(wt, "dump_cached_elevations", fake_dump_cached_elevations) + monkeypatch.setattr(wt, "MeasuringPointEstimator", fake_estimator_ctor) + monkeypatch.setattr(wt, "ThreadPoolExecutor", lambda max_workers: FakeExecutor()) + monkeypatch.setattr(wt, "as_completed", lambda futures: list(futures)) + monkeypatch.setattr(transferer, "_get_dfs", lambda: (df, df.copy())) + + transferer.transfer_parallel(num_workers=2) + + assert build_calls == ["build"] + assert dumped == [{}] diff --git a/transfers/well_transfer.py b/transfers/well_transfer.py index 114b7c462..e477eb3b5 100644 --- a/transfers/well_transfer.py +++ b/transfers/well_transfer.py @@ -186,6 +186,7 @@ def transfer_parallel(self, num_workers: int = None) -> None: # Pre-load shared cached elevations on the main thread so workers # mutate a single cache instance instead of racing lazy initialization. self._get_cached_elevations() + self._get_measuring_point_estimator() # Calculate batch size batch_size = max(100, n // num_workers) From a2584c98921a03bf86261a1b77c812a657b46499 Mon Sep 17 00:00:00 2001 From: Kelsey Smuczynski Date: Mon, 30 Mar 2026 22:32:30 -0600 Subject: [PATCH 19/20] feat(water-level-csv): warn when uploaded mp height differs from well history Report a row-level warning when a CSV mp_height differs from the well's existing measuring point height while still importing the row and using the CSV value. This makes the existing measuring point comparison visible to users instead of carrying unused state through validation. Update service and API tests to cover warning-only imports and mixed warning/error uploads. --- services/water_level_csv.py | 8 ++++ tests/test_observation.py | 18 +++++--- tests/test_water_level_csv_service.py | 63 +++++++++++++++++++++++++-- 3 files changed, 80 insertions(+), 9 deletions(-) diff --git a/services/water_level_csv.py b/services/water_level_csv.py index 6b2ba49fe..26c38a83b 100644 --- a/services/water_level_csv.py +++ b/services/water_level_csv.py @@ -392,6 +392,14 @@ def _create_records( session.flush() savepoint.commit() + if row.mp_height_differs_from_history: + errors.append( + "Row " + f"{row.row_index}: CSV mp_height ({row.mp_height}) differs " + "from existing measuring point height " + f"({row.existing_mp_height}); CSV value will be used" + ) + created.append( { "well_name_point_id": row.raw["well_name_point_id"], diff --git a/tests/test_observation.py b/tests/test_observation.py index d1806788d..be43b8086 100644 --- a/tests/test_observation.py +++ b/tests/test_observation.py @@ -180,8 +180,11 @@ def test_bulk_upload_groundwater_levels_api(water_well_thing): assert response.status_code == 200 assert data["summary"]["total_rows_imported"] == 1 assert data["summary"]["total_rows_processed"] == 1 - assert data["summary"]["validation_errors_or_warnings"] == 0 - assert data["validation_errors"] == [] + assert data["summary"]["validation_errors_or_warnings"] == 1 + assert data["validation_errors"] == [ + "Row 1: CSV mp_height (1.5) differs from existing measuring point height " + "(2.0); CSV value will be used" + ] row = data["water_levels"][0] assert row["well_name_point_id"] == water_well_thing.name @@ -273,9 +276,14 @@ def test_bulk_upload_groundwater_levels_api_partial_success(water_well_thing): assert response.status_code == 200 assert data["summary"]["total_rows_imported"] == 1 assert data["summary"]["total_rows_processed"] == 2 - assert data["summary"]["validation_errors_or_warnings"] == 1 - assert len(data["validation_errors"]) == 1 - 
assert "Bad Well" in data["validation_errors"][0] + assert data["summary"]["validation_errors_or_warnings"] == 2 + assert len(data["validation_errors"]) == 2 + assert any( + "CSV mp_height (1.5) differs from existing measuring point height (2.0)" + in message + for message in data["validation_errors"] + ) + assert any("Bad Well" in message for message in data["validation_errors"]) row = data["water_levels"][0] with session_ctx() as session: diff --git a/tests/test_water_level_csv_service.py b/tests/test_water_level_csv_service.py index d0f8d6e15..64c145bd1 100644 --- a/tests/test_water_level_csv_service.py +++ b/tests/test_water_level_csv_service.py @@ -226,6 +226,55 @@ def test_bulk_upload_water_levels_is_idempotent(water_well_thing): assert observations[0].measuring_point_height == 1.5 +def test_bulk_upload_water_levels_warns_when_mp_height_differs_from_history( + water_well_thing, +): + csv_content = "\n".join( + [ + ",".join( + [ + "field_staff", + "well_name_point_id", + "field_event_date_time", + "measurement_date_time", + "sampler", + "sample_method", + "mp_height", + "level_status", + "depth_to_water_ft", + "data_quality", + "water_level_notes", + ] + ), + ",".join( + [ + "A Lopez", + water_well_thing.name, + "2025-02-15T08:00:00-07:00", + "2025-02-15T10:30:00-07:00", + "A Lopez", + "electric tape", + "1.5", + "Water level not affected", + "7.0", + "Water level accurate to within two hundreths of a foot", + "Measurement with warning", + ] + ), + ] + ) + + result = bulk_upload_water_levels(csv_content.encode("utf-8")) + + assert result.exit_code == 0, result.payload + assert result.payload["summary"]["total_rows_imported"] == 1 + assert result.payload["summary"]["validation_errors_or_warnings"] == 1 + assert result.payload["validation_errors"] == [ + "Row 1: CSV mp_height (1.5) differs from existing measuring point height " + "(2.0); CSV value will be used" + ] + + def test_bulk_upload_water_levels_preserves_unrelated_existing_observations( water_well_thing, ): @@ -402,9 +451,15 @@ def test_bulk_upload_water_levels_imports_valid_rows_when_other_rows_fail( assert result.exit_code == 0 assert result.payload["summary"]["total_rows_processed"] == 2 assert result.payload["summary"]["total_rows_imported"] == 1 - assert result.payload["summary"]["validation_errors_or_warnings"] == 1 + assert result.payload["summary"]["validation_errors_or_warnings"] == 2 assert len(result.payload["water_levels"]) == 1 - assert ( - "Unknown well_name_point_id 'Unknown Well'" - in result.payload["validation_errors"][0] + assert len(result.payload["validation_errors"]) == 2 + assert any( + "CSV mp_height (1.5) differs from existing measuring point height (2.0)" + in message + for message in result.payload["validation_errors"] + ) + assert any( + "Unknown well_name_point_id 'Unknown Well'" in message + for message in result.payload["validation_errors"] ) From 256af686a7b5df92252fa463b0cbd8e1df37e39c Mon Sep 17 00:00:00 2001 From: Kelsey Smuczynski Date: Mon, 30 Mar 2026 22:38:30 -0600 Subject: [PATCH 20/20] test(well-transfer): isolate parallel preload tests from external estimator setup --- tests/test_well_transfer.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_well_transfer.py b/tests/test_well_transfer.py index 05cda5b35..373e70b11 100644 --- a/tests/test_well_transfer.py +++ b/tests/test_well_transfer.py @@ -243,6 +243,7 @@ def fake_dump_cached_elevations(lut): monkeypatch.setattr(wt, "session_ctx", fake_session_ctx) monkeypatch.setattr(wt, "get_cached_elevations", 
fake_get_cached_elevations) monkeypatch.setattr(wt, "dump_cached_elevations", fake_dump_cached_elevations) + monkeypatch.setattr(wt, "MeasuringPointEstimator", lambda: object()) monkeypatch.setattr(wt, "ThreadPoolExecutor", lambda max_workers: FakeExecutor()) monkeypatch.setattr(wt, "as_completed", lambda futures: list(futures)) monkeypatch.setattr(transferer, "_get_dfs", lambda: (df, df.copy()))
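
For reference, a minimal sketch of the savepoint-based race recovery that
_add_aquifers_parallel uses in patch 14. session.begin_nested() and
IntegrityError are real SQLAlchemy APIs, and AquiferSystem is the model named
in that diff; the get_or_create_aquifer helper and its signature are
illustrative only, and the model's import path is not shown in this series.

# Sketch: get-or-create under concurrency using a SAVEPOINT.
# Assumes a SQLAlchemy Session and a unique constraint on AquiferSystem.name;
# AquiferSystem must be imported from wherever the repository defines it.
from sqlalchemy.exc import IntegrityError


def get_or_create_aquifer(session, aquifer_name, primary_type):
    aquifer = (
        session.query(AquiferSystem)
        .filter(AquiferSystem.name == aquifer_name)
        .first()
    )
    if aquifer:
        return aquifer
    try:
        # begin_nested() opens a SAVEPOINT. If the flush below violates the
        # unique constraint, only the savepoint rolls back; the outer
        # transaction stays usable for the rest of the batch.
        with session.begin_nested():
            aquifer = AquiferSystem(
                name=aquifer_name,
                primary_aquifer_type=primary_type,
                geographic_scale=None,
            )
            session.add(aquifer)
            session.flush()
        return aquifer
    except IntegrityError:
        # Another worker won the race between our query and our flush;
        # re-read the winning row instead of failing the whole batch.
        return (
            session.query(AquiferSystem)
            .filter(AquiferSystem.name == aquifer_name)
            .first()
        )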
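
Patch 15's header precedence rule can also be read in isolation. The
HEADER_ALIASES entries below mirror the aliases defined in patch 01;
normalize_row is a simplified stand-in for the private _read_csv loop, not its
actual signature.

# Sketch: alias/canonical header precedence. The later non-empty value wins
# in CSV column order; an empty later value never clobbers an earlier one.
HEADER_ALIASES = {
    "measurement_date_time": "water_level_date_time",
    "sampler": "measuring_person",
    "mp_height_ft": "mp_height",
}


def normalize_row(raw_row: dict) -> dict:
    normalized = {}
    for raw_key, raw_value in raw_row.items():  # dicts keep column order
        key = HEADER_ALIASES.get(raw_key.strip(), raw_key.strip())
        value = raw_value.strip() if isinstance(raw_value, str) else raw_value or ""
        # Skip an empty later duplicate so it cannot erase earlier data.
        if key in normalized and normalized[key] and not value:
            continue
        normalized[key] = value
    return normalized


# The alias column carries the value and the later canonical column is blank,
# so the earlier non-empty value survives.
row = {"measurement_date_time": "2025-02-15T10:30", "water_level_date_time": ""}
assert normalize_row(row) == {"water_level_date_time": "2025-02-15T10:30"}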
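
Finally, a compact sketch of the preload-before-workers pattern that patches
16 and 18 apply and patch 20 pins down in tests. WellTransferer,
_get_cached_elevations, and _get_measuring_point_estimator are the names used
in this series; the Transferer class below and its lazy _get_cache helper are
illustrative stand-ins, not the repository's actual implementation.

# Sketch: build lazily initialized shared state on the main thread before
# submitting work, so all workers share one instance instead of racing the
# first initialization.
from concurrent.futures import ThreadPoolExecutor, as_completed


class Transferer:
    def __init__(self):
        self._cache = None  # lazily built, shared across workers

    def _get_cache(self) -> dict:
        # Safe when first called on the main thread; racy if several
        # workers hit it simultaneously while it is still None.
        if self._cache is None:
            self._cache = {"elevations": {}}  # stand-in for a disk load
        return self._cache

    def transfer_parallel(self, batches, num_workers=4):
        self._get_cache()  # preload: workers now only read and mutate it
        with ThreadPoolExecutor(max_workers=num_workers) as pool:
            futures = [pool.submit(self._process, b) for b in batches]
            for future in as_completed(futures):
                future.result()  # surface worker exceptions

    def _process(self, batch):
        cache = self._get_cache()  # already initialized; no race window
        for point_id in batch:
            cache["elevations"].setdefault(point_id, None)


Transferer().transfer_parallel([["AR0001"], ["AR0002"]], num_workers=2)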