diff --git a/cli/cli.py b/cli/cli.py index 8a91f5f4..50b2b24a 100644 --- a/cli/cli.py +++ b/cli/cli.py @@ -666,9 +666,16 @@ def water_levels_bulk_upload( payload = result.payload if isinstance(result.payload, dict) else {} summary = payload.get("summary", {}) validation_errors = payload.get("validation_errors", []) + rows_with_issues = summary.get("validation_errors_or_warnings", 0) - if result.exit_code == 0: + if result.exit_code == 0 and not rows_with_issues: typer.secho("[WATER LEVEL IMPORT] SUCCESS", fg=colors["ok"], bold=True) + elif result.exit_code == 0: + typer.secho( + "[WATER LEVEL IMPORT] COMPLETED WITH ISSUES", + fg=colors["issue"], + bold=True, + ) else: typer.secho( "[WATER LEVEL IMPORT] COMPLETED WITH ISSUES", @@ -709,7 +716,6 @@ def water_levels_bulk_upload( if summary: processed = summary.get("total_rows_processed", 0) imported = summary.get("total_rows_imported", 0) - rows_with_issues = summary.get("validation_errors_or_warnings", 0) typer.secho("SUMMARY", fg=colors["accent"], bold=True) label_width = 16 value_width = 8 diff --git a/schemas/water_level_csv.py b/schemas/water_level_csv.py index 00d71eaf..32f33333 100644 --- a/schemas/water_level_csv.py +++ b/schemas/water_level_csv.py @@ -13,7 +13,219 @@ # See the License for the specific language governing permissions and # limitations under the License. # =============================================================================== -from pydantic import BaseModel +from __future__ import annotations + +from datetime import datetime, timezone +from typing import Annotated + +from core.enums import DataQuality, GroundwaterLevelReason, SampleMethod +from pydantic import ( + AliasChoices, + BaseModel, + ConfigDict, + Field, + field_validator, + model_validator, +) +from pydantic.functional_validators import BeforeValidator + +from services.util import convert_dt_tz_naive_to_tz_aware + +WATER_LEVEL_REQUIRED_FIELDS = [ + "well_name_point_id", + "field_event_date_time", + "field_staff", + "water_level_date_time", + "measuring_person", + "sample_method", +] + +WATER_LEVEL_HEADER_ALIASES = { + "measurement_date_time": "water_level_date_time", + "sampler": "measuring_person", + "mp_height_ft": "mp_height", +} + +WATER_LEVEL_IGNORED_FIELDS = { + "hold(not saved)", + "cut(not saved)", +} + +SAMPLE_METHOD_ALIASES = { + "electric tape": "Electric tape measurement (E-probe)", + "steel tape": "Steel-tape measurement", +} +SAMPLE_METHOD_CANONICAL = { + value.lower(): value for value in SAMPLE_METHOD_ALIASES.values() +} +GROUNDWATER_LEVEL_REASON_ALIASES = { + "dry": "Site was dry", + "obstructed": ("Obstruction was encountered in the well (no level recorded)"), + "obstruction": ("Obstruction was encountered in the well (no level recorded)"), + "flowing": ( + "Site was flowing. Water level or head couldn't be measured " + "w/out additional equipment." + ), + "flowing recently": "Site was flowing recently.", + "pumped": "Site was being pumped", + "pumped recently": "Site was pumped recently", + "not affected": "Water level not affected", + "other": "Other conditions exist that would affect the level (remarks)", +} + + +def empty_str_to_none(value): + if isinstance(value, str) and value.strip() == "": + return None + return value + + +OptionalText = Annotated[str | None, BeforeValidator(empty_str_to_none)] +OptionalFloat = Annotated[float | None, BeforeValidator(empty_str_to_none)] + + +def _normalize_datetime_to_utc(value: datetime | str) -> datetime: + if isinstance(value, str): + value = datetime.fromisoformat(value) + elif not isinstance(value, datetime): + raise ValueError("value must be a datetime or ISO format string") + + if value.tzinfo is None: + value = convert_dt_tz_naive_to_tz_aware(value, "America/Denver") + + return value.astimezone(timezone.utc) + + +def _canonicalize_enum_value( + value: str | None, enum_cls, field_name: str +) -> str | None: + if value is None: + return None + + normalized = value.strip().lower() + for item in enum_cls: + if item.value.lower() == normalized: + return item.value + + raise ValueError(f"Unknown {field_name}: {value}") + + +class WaterLevelCsvRow(BaseModel): + model_config = ConfigDict(extra="ignore", str_strip_whitespace=True) + + well_name_point_id: str + field_event_date_time: datetime + field_staff: str + field_staff_2: OptionalText = None + field_staff_3: OptionalText = None + water_level_date_time: datetime = Field( + validation_alias=AliasChoices( + "water_level_date_time", + "measurement_date_time", + ) + ) + measuring_person: str = Field( + validation_alias=AliasChoices("measuring_person", "sampler") + ) + sample_method: str + mp_height: OptionalFloat = Field( + default=None, + validation_alias=AliasChoices("mp_height", "mp_height_ft"), + ) + level_status: OptionalText = None + depth_to_water_ft: OptionalFloat = None + data_quality: OptionalText = None + water_level_notes: OptionalText = None + + @property + def measurement_date_time(self) -> datetime: + return self.water_level_date_time + + @property + def sampler(self) -> str: + return self.measuring_person + + @classmethod + def required_fields(cls) -> list[str]: + return list(WATER_LEVEL_REQUIRED_FIELDS) + + @classmethod + def header_aliases(cls) -> dict[str, str]: + return dict(WATER_LEVEL_HEADER_ALIASES) + + @classmethod + def ignored_fields(cls) -> set[str]: + return set(WATER_LEVEL_IGNORED_FIELDS) + + @staticmethod + def canonicalize_sample_method(value: str) -> str: + normalized = value.strip().lower() + if normalized in SAMPLE_METHOD_ALIASES: + return SAMPLE_METHOD_ALIASES[normalized] + if normalized in SAMPLE_METHOD_CANONICAL: + return SAMPLE_METHOD_CANONICAL[normalized] + return value.strip() + + @field_validator("sample_method") + @classmethod + def normalize_sample_method(cls, value: str) -> str: + return _canonicalize_enum_value( + cls.canonicalize_sample_method(value), + SampleMethod, + "sample_method", + ) + + @field_validator( + "field_event_date_time", + "water_level_date_time", + mode="before", + ) + @classmethod + def normalize_datetime_field(cls, value: datetime | str) -> datetime: + return _normalize_datetime_to_utc(value) + + @field_validator("depth_to_water_ft") + @classmethod + def validate_non_negative_depth_to_water(cls, value: float | None) -> float | None: + if value is not None and value < 0: + raise ValueError("depth_to_water_ft must be greater than or equal to 0") + return value + + @field_validator("level_status") + @classmethod + def normalize_level_status(cls, value: str | None) -> str | None: + if value is not None: + value = GROUNDWATER_LEVEL_REASON_ALIASES.get(value.strip().lower(), value) + return _canonicalize_enum_value(value, GroundwaterLevelReason, "level_status") + + @field_validator("data_quality") + @classmethod + def normalize_data_quality(cls, value: str | None) -> str | None: + return _canonicalize_enum_value(value, DataQuality, "data_quality") + + @model_validator(mode="after") + def validate_row_constraints(self) -> WaterLevelCsvRow: + field_staff = [ + staff + for staff in (self.field_staff, self.field_staff_2, self.field_staff_3) + if staff + ] + if self.measuring_person not in field_staff: + raise ValueError( + "measuring_person must match one of field_staff, " + "field_staff_2, or field_staff_3" + ) + + if self.water_level_date_time < self.field_event_date_time: + raise ValueError( + "water_level_date_time must be greater than or equal to " + "field_event_date_time" + ) + + if self.depth_to_water_ft is None and self.level_status is None: + raise ValueError("level_status is required when depth_to_water_ft is blank") + + return self class WaterLevelBulkUploadSummary(BaseModel): @@ -29,8 +241,8 @@ class WaterLevelBulkUploadRow(BaseModel): sample_id: int observation_id: int measurement_date_time: str - level_status: str - data_quality: str + level_status: str | None + data_quality: str | None class WaterLevelBulkUploadResponse(BaseModel): diff --git a/services/water_level_csv.py b/services/water_level_csv.py index f695fcd1..895e4099 100644 --- a/services/water_level_csv.py +++ b/services/water_level_csv.py @@ -19,7 +19,6 @@ import io import json import re -import uuid from dataclasses import dataclass from datetime import datetime from pathlib import Path @@ -27,43 +26,19 @@ from db import Thing, FieldEvent, FieldActivity, Sample, Observation, Parameter from db.engine import session_ctx -from pydantic import BaseModel, ConfigDict, ValidationError, field_validator +from pydantic import ValidationError +from schemas.water_level_csv import ( + WaterLevelCsvRow, + WATER_LEVEL_REQUIRED_FIELDS, + WATER_LEVEL_HEADER_ALIASES, + WATER_LEVEL_IGNORED_FIELDS, +) from sqlalchemy import select -from sqlalchemy.orm import Session - -# Required CSV columns for the bulk upload -REQUIRED_FIELDS: List[str] = [ - "field_staff", - "well_name_point_id", - "field_event_date_time", - "measurement_date_time", - "sampler", - "sample_method", - "mp_height", - "level_status", - "depth_to_water_ft", - "data_quality", -] - -HEADER_ALIASES: dict[str, str] = { - "measuring_person": "sampler", - "water_level_date_time": "measurement_date_time", -} - -# Allow-list values for validation. These represent early MVP lexicon values. -VALID_LEVEL_STATUSES = {"stable", "rising", "falling"} -VALID_DATA_QUALITIES = {"approved", "provisional"} -VALID_SAMPLERS = {"groundwater team", "consultant"} - -# Mapping between human-friendly sample methods provided in CSV uploads and -# their canonical lexicon terms stored in the database. -SAMPLE_METHOD_ALIASES = { - "electric tape": "Electric tape measurement (E-probe)", - "steel tape": "Steel-tape measurement", -} -SAMPLE_METHOD_CANONICAL = { - value.lower(): value for value in SAMPLE_METHOD_ALIASES.values() -} +from sqlalchemy.orm import Session, selectinload + +REQUIRED_FIELDS: List[str] = list(WATER_LEVEL_REQUIRED_FIELDS) +HEADER_ALIASES: dict[str, str] = dict(WATER_LEVEL_HEADER_ALIASES) +IGNORED_FIELDS: set[str] = set(WATER_LEVEL_IGNORED_FIELDS) @dataclass @@ -84,91 +59,16 @@ class _ValidatedRow: sample_method_term: str field_event_dt: datetime measurement_dt: datetime - mp_height: float - depth_to_water_ft: float - level_status: str - data_quality: str + mp_height: float | None + resolved_mp_height: float | int | None + existing_mp_height: float | int | None + mp_height_differs_from_history: bool + depth_to_water_ft: float | None + level_status: str | None + data_quality: str | None water_level_notes: str | None -class WaterLevelCsvRow(BaseModel): - model_config = ConfigDict(extra="ignore", str_strip_whitespace=True) - - field_staff: str - well_name_point_id: str - field_event_date_time: datetime - measurement_date_time: datetime - sampler: str - sample_method: str - mp_height: float - level_status: str - depth_to_water_ft: float - data_quality: str - water_level_notes: str | None = None - - @field_validator( - "field_staff", - "well_name_point_id", - "sampler", - "sample_method", - "level_status", - "data_quality", - ) - @classmethod - def _require_value(cls, value: str) -> str: - if value is None or value == "": - raise ValueError("value is required") - return value - - @field_validator("sampler") - @classmethod - def _validate_sampler(cls, value: str) -> str: - if value.lower() not in VALID_SAMPLERS: - raise ValueError( - f"Invalid sampler '{value}'. Expected one of: {sorted(VALID_SAMPLERS)}" - ) - return value - - @field_validator("level_status") - @classmethod - def _validate_level_status(cls, value: str) -> str: - if value.lower() not in VALID_LEVEL_STATUSES: - raise ValueError( - f"Invalid level_status '{value}'. Expected one of: {sorted(VALID_LEVEL_STATUSES)}" - ) - return value - - @field_validator("data_quality") - @classmethod - def _validate_data_quality(cls, value: str) -> str: - if value.lower() not in VALID_DATA_QUALITIES: - raise ValueError( - f"Invalid data_quality '{value}'. Expected one of: {sorted(VALID_DATA_QUALITIES)}" - ) - return value - - @field_validator("sample_method") - @classmethod - def _normalize_sample_method(cls, value: str) -> str: - normalized = value.lower() - if normalized in SAMPLE_METHOD_ALIASES: - return SAMPLE_METHOD_ALIASES[normalized] - if normalized in SAMPLE_METHOD_CANONICAL: - return SAMPLE_METHOD_CANONICAL[normalized] - raise ValueError( - f"Invalid sample_method '{value}'. Expected one of: {sorted(SAMPLE_METHOD_ALIASES.keys())}" - ) - - @field_validator("water_level_notes", mode="before") - @classmethod - def _empty_to_none(cls, value: str | None) -> str | None: - if value is None: - return None - if isinstance(value, str) and value.strip() == "": - return None - return value - - def bulk_upload_water_levels( source_file: str | Path | bytes | BinaryIO, *, pretty_json: bool = False ) -> BulkUploadResult: @@ -180,15 +80,19 @@ def bulk_upload_water_levels( msg = f"File not found: {source_file}" payload = _build_payload([], [], 0, 0, 1, errors=[msg]) stdout = _serialize_payload(payload, pretty_json) - return BulkUploadResult(exit_code=1, stdout=stdout, stderr=msg, payload=payload) + return BulkUploadResult( + exit_code=1, + stdout=stdout, + stderr=msg, + payload=payload, + ) validation_errors: list[str] = [] created_rows: list[dict[str, Any]] = [] with session_ctx() as session: - parameter_id = _get_groundwater_level_parameter_id(session) - - # Validate headers early so we can short-circuit without touching the DB. + # Validate headers early so we can short-circuit + # without touching the DB. header_errors = _validate_headers(headers) if header_errors: validation_errors.extend(header_errors) @@ -196,20 +100,23 @@ def bulk_upload_water_levels( valid_rows, row_errors = _validate_rows(session, csv_rows) validation_errors.extend(row_errors) - if not validation_errors: + if valid_rows: try: - created_rows = _create_records(session, parameter_id, valid_rows) + parameter_id = _get_groundwater_level_parameter_id(session) + created_rows, persistence_errors = _create_records( + session, + parameter_id, + valid_rows, + ) + validation_errors.extend(persistence_errors) session.commit() except Exception as exc: # pragma: no cover - safety fallback session.rollback() validation_errors.append(str(exc)) - if validation_errors: - session.rollback() - summary = { "total_rows_processed": len(csv_rows), - "total_rows_imported": len(created_rows) if not validation_errors else 0, + "total_rows_imported": len(created_rows), "validation_errors_or_warnings": _count_rows_with_issues(validation_errors), } payload = _build_payload( @@ -217,7 +124,7 @@ def bulk_upload_water_levels( ) stdout = _serialize_payload(payload, pretty_json) stderr = "\n".join(validation_errors) - exit_code = 0 if not validation_errors else 1 + exit_code = 0 if created_rows or not validation_errors else 1 return BulkUploadResult( exit_code=exit_code, stdout=stdout, stderr=stderr, payload=payload ) @@ -288,16 +195,23 @@ def _read_csv( for k, v in row.items(): if k is None: continue - key = HEADER_ALIASES.get(k.strip(), k.strip()) + stripped_key = k.strip() + if stripped_key in IGNORED_FIELDS: + continue + key = HEADER_ALIASES.get(stripped_key, stripped_key) value = v.strip() if isinstance(v, str) else v or "" - # If both alias and canonical header are present, preserve first non-empty value. + # If both alias and canonical headers are present, keep the later + # non-empty value in CSV column order. An empty later value does not + # overwrite an earlier non-empty value. if key in normalized_row and normalized_row[key] and not value: continue normalized_row[key] = value rows.append(normalized_row) headers = [ - HEADER_ALIASES.get(h.strip(), h.strip()) for h in (reader.fieldnames or []) + HEADER_ALIASES.get(h.strip(), h.strip()) + for h in (reader.fieldnames or []) + if h is not None and h.strip() not in IGNORED_FIELDS ] return headers, rows @@ -337,24 +251,50 @@ def _validate_rows( well_name = model.well_name_point_id well = wells_by_name.get(well_name) if well is None: - sql = select(Thing).where(Thing.name == well_name) + sql = ( + select(Thing) + .options(selectinload(Thing.measuring_points)) + .where( + Thing.name == well_name, + Thing.thing_type == "water well", + ) + ) well = session.scalars(sql).one_or_none() if well is None: errors.append(f"Row {idx}: Unknown well_name_point_id '{well_name}'") continue wells_by_name[well_name] = well + ( + resolved_mp_height, + existing_mp_height, + mp_height_differs_from_history, + ) = _resolve_measuring_point_height(well, model.mp_height) + + depth_error = _validate_depth_to_water_against_well( + idx, + well, + model.depth_to_water_ft, + resolved_mp_height, + ) + if depth_error: + errors.append(depth_error) + continue + valid_rows.append( _ValidatedRow( row_index=idx, raw={**normalized}, well=well, field_staff=model.field_staff, - sampler=model.sampler, + sampler=model.measuring_person, sample_method_term=model.sample_method, field_event_dt=model.field_event_date_time, - measurement_dt=model.measurement_date_time, + measurement_dt=model.water_level_date_time, mp_height=model.mp_height, + resolved_mp_height=resolved_mp_height, + existing_mp_height=existing_mp_height, + mp_height_differs_from_history=mp_height_differs_from_history, depth_to_water_ft=model.depth_to_water_ft, level_status=model.level_status, data_quality=model.data_quality, @@ -365,61 +305,177 @@ def _validate_rows( return valid_rows, errors +def _resolve_measuring_point_height( + well: Thing, csv_mp_height: float | None +) -> tuple[float | int | None, float | int | None, bool]: + existing_mp_height = well.measuring_point_height + if existing_mp_height is not None: + existing_mp_height = float(existing_mp_height) + if csv_mp_height is not None: + return ( + csv_mp_height, + existing_mp_height, + (existing_mp_height is not None and csv_mp_height != existing_mp_height), + ) + + return existing_mp_height, existing_mp_height, False + + +def _validate_depth_to_water_against_well( + row_index: int, + well: Thing, + depth_to_water_ft: float | None, + resolved_mp_height: float | int | None, +) -> str | None: + well_depth = well.well_depth + if well_depth is not None: + well_depth = float(well_depth) + + if depth_to_water_ft is None or resolved_mp_height is None or well_depth is None: + return None + + corrected_depth_to_water = depth_to_water_ft - resolved_mp_height + if corrected_depth_to_water >= well_depth: + return ( + f"Row {row_index}: depth_to_water_ft minus measuring point height " + f"({corrected_depth_to_water}) must be less than well depth " + f"({well_depth})" + ) + + return None + + def _create_records( session: Session, parameter_id: int, rows: list[_ValidatedRow] -) -> list[dict[str, Any]]: +) -> tuple[list[dict[str, Any]], list[str]]: created: list[dict[str, Any]] = [] + errors: list[str] = [] for row in rows: - field_event = FieldEvent( - thing=row.well, - event_date=row.field_event_dt, - notes=_build_field_event_notes(row), - ) - field_activity = FieldActivity( - field_event=field_event, - activity_type="groundwater level", - notes=f"Sampler: {row.sampler}", - ) - sample = Sample( - field_activity=field_activity, - sample_date=row.measurement_dt, - sample_name=f"wl-{uuid.uuid4()}", - sample_matrix="water", - sample_method=row.sample_method_term, - qc_type="Normal", - notes=row.water_level_notes, - ) - observation = Observation( - sample=sample, - observation_datetime=row.measurement_dt, - parameter_id=parameter_id, - value=row.depth_to_water_ft, - unit="ft", - measuring_point_height=row.mp_height, - groundwater_level_reason=None, - notes=_build_observation_notes(row), + savepoint = None + try: + savepoint = session.begin_nested() + sample_name = _build_sample_name(row) + sample = _find_existing_imported_sample(session, row, sample_name) + + if sample is None: + field_event = FieldEvent( + thing=row.well, + event_date=row.field_event_dt, + notes=_build_field_event_notes(row), + ) + field_activity = FieldActivity( + field_event=field_event, + activity_type="groundwater level", + notes=f"Sampler: {row.sampler}", + ) + sample = Sample(field_activity=field_activity) + observation = Observation(sample=sample) + session.add(field_event) + session.add(field_activity) + session.add(sample) + session.add(observation) + else: + field_activity = sample.field_activity + field_event = field_activity.field_event + observation = _find_existing_observation(sample, parameter_id) + if observation is None: + observation = Observation(sample=sample) + session.add(observation) + + field_event.event_date = row.field_event_dt + field_event.notes = _build_field_event_notes(row) + field_activity.notes = f"Sampler: {row.sampler}" + + _apply_sample_values(sample, row, sample_name) + _apply_observation_values(observation, row, parameter_id) + session.flush() + savepoint.commit() + + if row.mp_height_differs_from_history: + errors.append( + "Row " + f"{row.row_index}: CSV mp_height ({row.mp_height}) differs " + "from existing measuring point height " + f"({row.existing_mp_height}); CSV value will be used" + ) + + created.append( + { + "well_name_point_id": row.raw["well_name_point_id"], + "field_event_id": field_event.id, + "field_activity_id": field_activity.id, + "sample_id": sample.id, + "observation_id": observation.id, + "measurement_date_time": row.raw.get("water_level_date_time"), + "level_status": row.level_status, + "data_quality": row.data_quality, + } + ) + except Exception as exc: # pragma: no cover - exercised via DB tests + if savepoint is not None and savepoint.is_active: + savepoint.rollback() + else: + session.expire_all() + errors.append(f"Row {row.row_index}: {exc}") + + return created, errors + + +def _build_sample_name(row: _ValidatedRow) -> str: + return f"{row.well.name}-WL-{row.measurement_dt.strftime('%Y%m%d%H%M')}" + + +def _find_existing_imported_sample( + session: Session, row: _ValidatedRow, sample_name: str +) -> Sample | None: + sql = ( + select(Sample) + .join(FieldActivity, Sample.field_activity_id == FieldActivity.id) + .join(FieldEvent, FieldActivity.field_event_id == FieldEvent.id) + .join(Thing, FieldEvent.thing_id == Thing.id) + .options( + selectinload(Sample.field_activity).selectinload(FieldActivity.field_event), + selectinload(Sample.observations), ) - session.add(field_event) - session.add(field_activity) - session.add(sample) - session.add(observation) - session.flush() - - created.append( - { - "well_name_point_id": row.raw["well_name_point_id"], - "field_event_id": field_event.id, - "field_activity_id": field_activity.id, - "sample_id": sample.id, - "observation_id": observation.id, - "measurement_date_time": row.raw["measurement_date_time"], - "level_status": row.level_status, - "data_quality": row.data_quality, - } + .where( + Thing.name == row.well.name, + Thing.thing_type == "water well", + FieldActivity.activity_type == "groundwater level", + Sample.sample_name == sample_name, ) + .order_by(Sample.id.asc()) + ) + return session.scalars(sql).first() + + +def _find_existing_observation(sample: Sample, parameter_id: int) -> Observation | None: + for observation in sample.observations: + if observation.parameter_id == parameter_id: + return observation + return None + + +def _apply_sample_values(sample: Sample, row: _ValidatedRow, sample_name: str) -> None: + sample.sample_date = row.measurement_dt + sample.sample_name = sample_name + sample.sample_matrix = "groundwater" + sample.sample_method = row.sample_method_term + sample.qc_type = "Normal" + sample.notes = row.water_level_notes + - return created +def _apply_observation_values( + observation: Observation, row: _ValidatedRow, parameter_id: int +) -> None: + observation.observation_datetime = row.measurement_dt + observation.parameter_id = parameter_id + observation.value = row.depth_to_water_ft + observation.unit = "ft" + observation.measuring_point_height = row.resolved_mp_height + observation.groundwater_level_reason = row.level_status + observation.nma_data_quality = row.data_quality + observation.notes = row.water_level_notes def _build_field_event_notes(row: _ValidatedRow) -> str | None: @@ -430,18 +486,24 @@ def _build_field_event_notes(row: _ValidatedRow) -> str | None: return notes or None -def _build_observation_notes(row: _ValidatedRow) -> str | None: - parts = [f"Level status: {row.level_status}", f"Data quality: {row.data_quality}"] - notes = " | ".join(parts) - return notes or None - - def _get_groundwater_level_parameter_id(session: Session) -> int: - sql = select(Parameter.id).where(Parameter.parameter_name == "groundwater level") + sql = select(Parameter.id).where( + Parameter.parameter_name == "groundwater level", + Parameter.matrix == "groundwater", + ) parameter_id = session.scalars(sql).one_or_none() - if parameter_id is None: - raise RuntimeError("Groundwater level parameter is not initialized") - return parameter_id + if parameter_id is not None: + return parameter_id + + parameter = Parameter( + parameter_name="groundwater level", + matrix="groundwater", + parameter_type="Field Parameter", + default_unit="ft", + ) + session.add(parameter) + session.flush() + return parameter.id # ============= EOF ============================================= diff --git a/tests/features/steps/water-levels-csv.py b/tests/features/steps/water-levels-csv.py index 4a8d6b57..05257163 100644 --- a/tests/features/steps/water-levels-csv.py +++ b/tests/features/steps/water-levels-csv.py @@ -23,27 +23,50 @@ from db import Observation from db.engine import session_ctx from services.water_level_csv import bulk_upload_water_levels +from tests.features.environment import ( + add_location, + add_measuring_point_history, + add_well, +) REQUIRED_FIELDS: List[str] = [ "field_staff", "well_name_point_id", "field_event_date_time", - "measurement_date_time", - "sampler", + "water_level_date_time", + "measuring_person", "sample_method", +] +OPTIONAL_FIELDS = [ + "field_staff_2", + "field_staff_3", "mp_height", "level_status", "depth_to_water_ft", "data_quality", + "water_level_notes", +] +VALID_SAMPLE_METHODS = [ + "Electric tape measurement (E-probe)", + "Steel-tape measurement", +] +VALID_LEVEL_STATUSES = ["Water level not affected", "Site was dry"] +VALID_DATA_QUALITIES = [ + "Water level accurate to within two hundreths of a foot", + "None", ] -OPTIONAL_FIELDS = ["water_level_notes"] -VALID_SAMPLERS = ["Groundwater Team", "Consultant"] -VALID_SAMPLE_METHODS = ["electric tape", "steel tape"] -VALID_LEVEL_STATUSES = ["stable", "rising", "falling"] -VALID_DATA_QUALITIES = ["approved", "provisional"] def _available_well_names(context: Context) -> list[str]: + if "wells" not in context.objects or not context.objects["wells"]: + with session_ctx() as session: + loc_1 = add_location(context, session) + loc_2 = add_location(context, session) + well_1 = add_well(context, session, loc_1, name_num=101) + well_2 = add_well(context, session, loc_2, name_num=102) + add_measuring_point_history(context, session, well_1) + add_measuring_point_history(context, session, well_2) + if not hasattr(context, "well_names"): context.well_names = [well.name for well in context.objects["wells"]] return context.well_names @@ -53,16 +76,19 @@ def _base_row(context: Context, index: int) -> Dict[str, str]: well_names = _available_well_names(context) well_name = well_names[(index - 1) % len(well_names)] measurement_day = 14 + index + field_staff = "A Lopez" if index == 1 else "B Chen" return { - "field_staff": "A Lopez" if index == 1 else "B Chen", + "field_staff": field_staff, + "field_staff_2": "", + "field_staff_3": "", "well_name_point_id": well_name, - "field_event_date_time": f"2025-02-{measurement_day:02d}T08:00:00-07:00", - "measurement_date_time": f"2025-02-{measurement_day:02d}T10:30:00-07:00", - "sampler": VALID_SAMPLERS[(index - 1) % len(VALID_SAMPLERS)], + "field_event_date_time": f"2025-02-{measurement_day:02d}T08:00:00", + "water_level_date_time": f"2025-02-{measurement_day:02d}T10:30:00", + "measuring_person": field_staff, "sample_method": VALID_SAMPLE_METHODS[(index - 1) % len(VALID_SAMPLE_METHODS)], "mp_height": "1.5" if index == 1 else "1.8", "level_status": VALID_LEVEL_STATUSES[(index - 1) % len(VALID_LEVEL_STATUSES)], - "depth_to_water_ft": "45.2" if index == 1 else "47.0", + "depth_to_water_ft": "7.0" if index == 1 else "", "data_quality": VALID_DATA_QUALITIES[(index - 1) % len(VALID_DATA_QUALITIES)], "water_level_notes": "Initial measurement" if index == 1 else "Follow-up", } @@ -89,6 +115,7 @@ def _write_csv_to_context(context: Context) -> None: temp_file.close() context.csv_file = str(Path(temp_file.name)) context.csv_raw_text = csv_text + context.file_content = csv_text def _set_rows( @@ -146,21 +173,53 @@ def step_given_each_well_name_point_id_value_matches_an_existing_well(context: C @given( - '"measurement_date_time" values are valid ISO 8601 timestamps with timezone offsets (e.g. "2025-02-15T10:30:00-08:00")' + '"field_event_date_time" values are valid ISO 8601 timezone-naive datetime strings (e.g. "2025-02-15T08:00:00")' ) -def step_step_step(context: Context): +def step_given_field_event_date_time_values_are_valid_naive_iso_datetimes( + context: Context, +): for row in context.csv_rows: - assert row["measurement_date_time"].startswith("2025-02") - assert "T" in row["measurement_date_time"] + assert row["field_event_date_time"].startswith("2025-02") + assert "T" in row["field_event_date_time"] + assert "+" not in row["field_event_date_time"] + assert row["field_event_date_time"].count(":") == 2 -# @given("the water level CSV includes optional fields when available:") -# def step_impl(context: Context): -# field_name = context.table.headings[0] -# optional_fields = [row[field_name].strip() for row in context.table] -# headers = set(context.csv_headers) -# missing = [field for field in optional_fields if field not in headers] -# assert not missing, f"Missing optional headers: {missing}" +@given( + '"water_level_date_time" values are valid ISO 8601 timezone-naive datetime strings (e.g. "2025-02-15T10:30:00")' +) +def step_given_water_level_date_time_values_are_valid_naive_iso_datetimes( + context: Context, +): + for row in context.csv_rows: + assert row["water_level_date_time"].startswith("2025-02") + assert "T" in row["water_level_date_time"] + assert "+" not in row["water_level_date_time"] + assert row["water_level_date_time"].count(":") == 2 + + +@given( + 'when provided, "sample_method", "level_status", and "data_quality" values are valid lexicon values' +) +def step_given_lexicon_values_are_valid(context: Context): + for row in context.csv_rows: + if row.get("sample_method"): + assert row["sample_method"] in VALID_SAMPLE_METHODS + if row.get("level_status"): + assert row["level_status"] in VALID_LEVEL_STATUSES + if row.get("data_quality"): + assert row["data_quality"] in VALID_DATA_QUALITIES + + +@given("the water level CSV includes optional fields when available:") +def step_given_the_water_level_csv_includes_optional_fields_when_available( + context: Context, +): + field_name = context.table.headings[0] + optional_fields = [row[field_name].strip() for row in context.table] + headers = set(context.csv_headers) + missing = [field for field in optional_fields if field not in headers] + assert not missing, f"Missing optional headers: {missing}" @when("I run the CLI command:") @@ -217,10 +276,26 @@ def step_then_stderr_should_be_empty(context: Context): # ============================================================================ # Scenario: Upload succeeds when required columns are present but reordered # ============================================================================ +@given( + "my water level CSV file uses legacy alias headers for measurement date, sampler, and measuring point height" +) +def step_given_my_water_level_csv_file_uses_legacy_alias_headers(context: Context): + rows = _build_valid_rows(context) + alias_rows = [] + for row in rows: + alias_row = dict(row) + alias_row["measurement_date_time"] = alias_row.pop("water_level_date_time") + alias_row["sampler"] = alias_row.pop("measuring_person") + alias_row["mp_height_ft"] = alias_row.pop("mp_height") + alias_rows.append(alias_row) + headers = list(alias_rows[0].keys()) + _set_rows(context, alias_rows, headers=headers) + + @given( "my water level CSV file contains all required headers but in a different column order" ) -def step_step_step_2(context: Context): +def step_given_my_water_level_csv_file_contains_reordered_headers(context: Context): rows = _build_valid_rows(context) headers = list(reversed(list(rows[0].keys()))) _set_rows(context, rows, headers=headers) @@ -299,13 +374,13 @@ def step_then_stderr_should_contain_a_validation_error_for_the_required_field( # Scenario: Upload fails due to invalid date formats # ============================================================================ @given( - 'my CSV file contains invalid ISO 8601 date values in the "measurement_date_time" field' + 'my CSV file contains invalid ISO 8601 date values in the "water_level_date_time" field' ) def step_step_step_6(context: Context): rows = _build_valid_rows(context, count=1) - rows[0]["measurement_date_time"] = "02/15/2025 10:30" + rows[0]["water_level_date_time"] = "02/15/2025 10:30" _set_rows(context, rows) - context.invalid_fields = ["measurement_date_time"] + context.invalid_fields = ["water_level_date_time"] @then("stderr should contain validation errors identifying the invalid field and row") @@ -323,7 +398,7 @@ def step_then_stderr_should_contain_validation_errors_identifying_the_invalid_fi # Scenario: Upload fails due to invalid numeric fields # ============================================================================ @given( - 'my CSV file contains values that cannot be parsed as numeric in numeric-required fields such as "mp_height" or "depth_to_water_ft"' + 'my CSV file contains values that cannot be parsed as numeric in numeric fields such as "mp_height" or "depth_to_water_ft"' ) def step_step_step_7(context: Context): rows = _build_valid_rows(context, count=1) @@ -337,21 +412,31 @@ def step_step_step_7(context: Context): # Scenario: Upload fails due to invalid lexicon values # ============================================================================ @given( - 'my CSV file contains invalid lexicon values for "sampler", "sample_method", "level_status", or "data_quality"' + 'my CSV file contains invalid lexicon values for "sample_method", "level_status", or "data_quality"' ) def step_step_step_8(context: Context): rows = _build_valid_rows(context, count=1) - rows[0]["sampler"] = "Unknown Team" rows[0]["sample_method"] = "mystery" rows[0]["level_status"] = "supercharged" rows[0]["data_quality"] = "bad" _set_rows(context, rows) context.invalid_fields = [ - "sampler", "sample_method", "level_status", "data_quality", ] +@given( + "my water level CSV file contains a row where measuring_person is not one of the supplied field staff" +) +def step_given_measuring_person_is_not_one_of_the_supplied_field_staff( + context: Context, +): + rows = _build_valid_rows(context, count=1) + rows[0]["measuring_person"] = "Unexpected Person" + _set_rows(context, rows) + context.invalid_fields = ["measuring_person"] + + # ============= EOF ============================================= diff --git a/tests/features/water-level-csv.feature b/tests/features/water-level-csv.feature index 701718c3..1c3b7511 100644 --- a/tests/features/water-level-csv.feature +++ b/tests/features/water-level-csv.feature @@ -1,21 +1,10 @@ -# features/cli/bulk_upload_water_levels.feature - @cli @backend @BDMS-TBD Feature: Bulk upload water level entries from CSV via CLI As a hydrogeologist or data specialist I want to upload a CSV file containing water level entry data for multiple wells using a CLI command - So that water level records can be created efficiently and accurately in the system - -# Background: -# Given the CLI binary "bdms" is installed and available on the PATH -# And I have a valid CLI configuration for the target environment -# And valid lexicon values exist for: -# | lexicon category | -# | sample_method | -# | level_status | -# | data_quality | + So that groundwater-level records can be created efficiently and accurately in the system @positive @happy_path @BDMS-TBD @cleanup_samples Scenario: Uploading a valid water level entry CSV containing required and optional fields @@ -27,21 +16,21 @@ Feature: Bulk upload water level entries from CSV via CLI | field_staff | | well_name_point_id | | field_event_date_time | + | water_level_date_time | | measuring_person | + | sample_method | And each "well_name_point_id" value matches an existing well And "field_event_date_time" values are valid ISO 8601 timezone-naive datetime strings (e.g. "2025-02-15T08:00:00") And "water_level_date_time" values are valid ISO 8601 timezone-naive datetime strings (e.g. "2025-02-15T10:30:00") And when provided, "sample_method", "level_status", and "data_quality" values are valid lexicon values - And the CSV includes optional fields when available: + And the water level CSV includes optional fields when available: | optional field name | | field_staff_2 | | field_staff_3 | - | water_level_date_time | - | sample_method | - | mp_height | - | level_status | - | depth_to_water_ft | - | data_quality | + | mp_height | + | level_status | + | depth_to_water_ft | + | data_quality | | water_level_notes | When I run the CLI command: """ @@ -57,57 +46,52 @@ Feature: Bulk upload water level entries from CSV via CLI And stdout includes an array of created water level entry objects And stderr should be empty - @positive @validation @column_order @BDMS-TBD @cleanup_samples - Scenario: Upload succeeds when required columns are present but in a different order - Given my water level CSV file contains all required headers but in a different column order - And the CSV includes required fields: - | required field name | - | field_staff | - | well_name_point_id | - | field_event_date_time | - | measuring_person | + @positive @validation @aliases @BDMS-TBD @cleanup_samples + Scenario: Upload succeeds when legacy alias headers are used + Given my water level CSV file uses legacy alias headers for measurement date, sampler, and measuring point height When I run the CLI command: """ - oco water-levels bulk-upload --file ./water_levels.csv + oco water-levels bulk-upload --file ./water_levels.csv --output json """ - # assumes users are entering datetimes as Mountain Time because well location is restricted to New Mexico - Then all datetime objects are assigned the correct Mountain Time timezone offset based on the date value. - And the command exits with code 0 + Then the command exits with code 0 + And stdout should be valid JSON And all water level entries are imported And stderr should be empty @positive @validation @extra_columns @BDMS-TBD @cleanup_samples - Scenario: Upload succeeds when CSV contains extra, unknown columns + Scenario: Upload succeeds when CSV contains extra columns Given my water level CSV file contains extra columns but is otherwise valid When I run the CLI command: """ - oco water-levels bulk-upload --file ./water_levels.csv + oco water-levels bulk-upload --file ./water_levels.csv --output json """ Then the command exits with code 0 + And stdout should be valid JSON And all water level entries are imported And stderr should be empty - ########################################################################### - # NEGATIVE VALIDATION SCENARIOS - ########################################################################### - - @negative @validation @BDMS-TBD - Scenario: No water level entries are imported when any row fails validation + @positive @validation @partial_success @BDMS-TBD @cleanup_samples + Scenario: Valid rows are imported when another row fails validation Given my water level CSV contains 3 rows with 2 valid rows and 1 row missing the required "well_name_point_id" When I run the CLI command: """ - oco water-levels bulk-upload --file ./water_levels.csv + oco water-levels bulk-upload --file ./water_levels.csv --output json """ - Then the command exits with a non-zero exit code + Then the command exits with code 0 + And stdout should be valid JSON + And stdout includes a summary containing: + | summary_field | value | + | total_rows_processed | 3 | + | total_rows_imported | 2 | + | validation_errors_or_warnings | 1 | And stderr should contain a validation error for the row missing "well_name_point_id" - And no water level entries are imported @negative @validation @required_fields @BDMS-TBD Scenario Outline: Upload fails when a required field is missing Given my water level CSV file contains a row missing the required "" field When I run the CLI command: """ - oco water-levels bulk-upload --file ./water_levels.csv + oco water-levels bulk-upload --file ./water_levels.csv --output json """ Then the command exits with a non-zero exit code And stderr should contain a validation error for the "" field @@ -118,14 +102,16 @@ Feature: Bulk upload water level entries from CSV via CLI | field_staff | | well_name_point_id | | field_event_date_time | + | water_level_date_time | | measuring_person | + | sample_method | @negative @validation @date_formats @BDMS-TBD Scenario: Upload fails due to invalid date formats Given my CSV file contains invalid ISO 8601 date values in the "water_level_date_time" field When I run the CLI command: """ - oco water-levels bulk-upload --file ./water_levels.csv + oco water-levels bulk-upload --file ./water_levels.csv --output json """ Then the command exits with a non-zero exit code And stderr should contain validation errors identifying the invalid field and row @@ -133,10 +119,10 @@ Feature: Bulk upload water level entries from CSV via CLI @negative @validation @numeric_fields @BDMS-TBD Scenario: Upload fails due to invalid numeric fields - Given my CSV file contains values that cannot be parsed as numeric in numeric-required fields such as "mp_height" or "depth_to_water_ft" + Given my CSV file contains values that cannot be parsed as numeric in numeric fields such as "mp_height" or "depth_to_water_ft" When I run the CLI command: """ - oco water-levels bulk-upload --file ./water_levels.csv + oco water-levels bulk-upload --file ./water_levels.csv --output json """ Then the command exits with a non-zero exit code And stderr should contain validation errors identifying the invalid field and row @@ -147,7 +133,18 @@ Feature: Bulk upload water level entries from CSV via CLI Given my CSV file contains invalid lexicon values for "sample_method", "level_status", or "data_quality" When I run the CLI command: """ - oco water-levels bulk-upload --file ./water_levels.csv + oco water-levels bulk-upload --file ./water_levels.csv --output json + """ + Then the command exits with a non-zero exit code + And stderr should contain validation errors identifying the invalid field and row + And no water level entries are imported + + @negative @validation @measuring_person @BDMS-TBD + Scenario: Upload fails when measuring_person does not match supplied field staff + Given my water level CSV file contains a row where measuring_person is not one of the supplied field staff + When I run the CLI command: + """ + oco water-levels bulk-upload --file ./water_levels.csv --output json """ Then the command exits with a non-zero exit code And stderr should contain validation errors identifying the invalid field and row diff --git a/tests/test_cli_commands.py b/tests/test_cli_commands.py index fb351fbd..a1d4515f 100644 --- a/tests/test_cli_commands.py +++ b/tests/test_cli_commands.py @@ -643,6 +643,39 @@ def fake_upload(file_path, *, pretty_json=False): assert captured["pretty_json"] is True +def test_water_levels_bulk_upload_reports_partial_success(monkeypatch, tmp_path): + csv_file = tmp_path / "water_levels.csv" + csv_file.write_text("col\nvalue\n") + + def fake_upload(_file_path, *, pretty_json=False): + assert pretty_json is False + return SimpleNamespace( + exit_code=0, + stdout="", + stderr="Row 2: Unknown well_name_point_id 'Bad Well'", + payload={ + "summary": { + "total_rows_processed": 2, + "total_rows_imported": 1, + "validation_errors_or_warnings": 1, + }, + "validation_errors": ["Row 2: Unknown well_name_point_id 'Bad Well'"], + "water_levels": [{}], + }, + ) + + monkeypatch.setattr("cli.service_adapter.water_levels_csv", fake_upload) + + runner = CliRunner() + result = runner.invoke( + cli, ["water-levels", "bulk-upload", "--file", str(csv_file)] + ) + + assert result.exit_code == 0, result.output + assert "[WATER LEVEL IMPORT] COMPLETED WITH ISSUES" in result.output + assert "rows_with_issues" in result.output + + def test_water_levels_cli_persists_observations(tmp_path, water_well_thing): """ End-to-end CLI invocation should create FieldEvent, Sample, @@ -658,8 +691,10 @@ def _write_csv(path: Path, *, well_name: str, notes: str): ) row = ( f"CLI Tester,{well_name},2025-02-15T08:00:00-07:00," - "2025-02-15T10:30:00-07:00,Groundwater Team,electric tape," - f"1.5,stable,42.5,approved,{notes}" + "2025-02-15T10:30:00-07:00,CLI Tester,electric tape," + f"1.5,Water level not affected,7.0," + "Water level accurate to within two hundreths of a foot," + f"{notes}" ) csv_text = textwrap.dedent(f"""\ {header} @@ -697,10 +732,16 @@ def _write_csv(path: Path, *, well_name: str, notes: str): assert field_event.thing_id == water_well_thing.id assert sample.sample_method == "Electric tape measurement (E-probe)" - assert sample.sample_matrix == "water" - assert observation.value == 42.5 + assert sample.sample_matrix == "groundwater" + assert sample.sample_name == f"{water_well_thing.name}-WL-202502151730" + assert observation.value == 7.0 assert observation.measuring_point_height == 1.5 - assert observation.notes == "Level status: stable | Data quality: approved" + assert observation.notes == unique_notes + assert observation.groundwater_level_reason == "Water level not affected" + assert ( + observation.nma_data_quality + == "Water level accurate to within two hundreths of a foot" + ) assert ( field_event.notes == f"Field staff: CLI Tester | {unique_notes}" ), "Field event notes should capture field staff and notes" diff --git a/tests/test_observation.py b/tests/test_observation.py index 386c823c..be43b808 100644 --- a/tests/test_observation.py +++ b/tests/test_observation.py @@ -161,12 +161,12 @@ def test_bulk_upload_groundwater_levels_api(water_well_thing): water_well_thing.name, "2025-02-15T08:00:00-07:00", "2025-02-15T10:30:00-07:00", - "Groundwater Team", + "A Lopez", "electric tape", "1.5", - "stable", - "45.2", - "approved", + "Water level not affected", + "7.0", + "Water level accurate to within two hundreths of a foot", "Initial measurement", ] ) @@ -180,18 +180,30 @@ def test_bulk_upload_groundwater_levels_api(water_well_thing): assert response.status_code == 200 assert data["summary"]["total_rows_imported"] == 1 assert data["summary"]["total_rows_processed"] == 1 - assert data["summary"]["validation_errors_or_warnings"] == 0 - assert data["validation_errors"] == [] + assert data["summary"]["validation_errors_or_warnings"] == 1 + assert data["validation_errors"] == [ + "Row 1: CSV mp_height (1.5) differs from existing measuring point height " + "(2.0); CSV value will be used" + ] row = data["water_levels"][0] assert row["well_name_point_id"] == water_well_thing.name with session_ctx() as session: observation = session.get(Observation, row["observation_id"]) assert observation is not None + sample = session.get(Sample, row["sample_id"]) + assert sample is not None + assert sample.sample_name == f"{water_well_thing.name}-WL-202502151730" + assert sample.sample_matrix == "groundwater" + assert observation.groundwater_level_reason == "Water level not affected" + assert ( + observation.nma_data_quality + == "Water level accurate to within two hundreths of a foot" + ) + assert observation.measuring_point_height == 1.5 # cleanup in reverse dependency order if observation: session.delete(observation) - sample = session.get(Sample, row["sample_id"]) if sample: session.delete(sample) field_activity = session.get(FieldActivity, row["field_activity_id"]) @@ -203,6 +215,94 @@ def test_bulk_upload_groundwater_levels_api(water_well_thing): session.commit() +def test_bulk_upload_groundwater_levels_api_partial_success(water_well_thing): + csv_content = ",".join( + [ + "field_staff", + "well_name_point_id", + "field_event_date_time", + "measurement_date_time", + "sampler", + "sample_method", + "mp_height", + "level_status", + "depth_to_water_ft", + "data_quality", + "water_level_notes", + ] + ) + csv_content += "\n" + csv_content += "\n".join( + [ + ",".join( + [ + "A Lopez", + water_well_thing.name, + "2025-02-15T08:00:00-07:00", + "2025-02-15T10:30:00-07:00", + "A Lopez", + "electric tape", + "1.5", + "Water level not affected", + "7.0", + "Water level accurate to within two hundreths of a foot", + "Initial measurement", + ] + ), + ",".join( + [ + "A Lopez", + "Bad Well", + "2025-02-15T08:00:00-07:00", + "2025-02-15T10:30:00-07:00", + "A Lopez", + "electric tape", + "1.5", + "Water level not affected", + "7.0", + "Water level accurate to within two hundreths of a foot", + "Bad row", + ] + ), + ] + ) + + files = { + "file": ("water_levels.csv", csv_content, "text/csv"), + } + + response = client.post("/observation/groundwater-level/bulk-upload", files=files) + data = response.json() + assert response.status_code == 200 + assert data["summary"]["total_rows_imported"] == 1 + assert data["summary"]["total_rows_processed"] == 2 + assert data["summary"]["validation_errors_or_warnings"] == 2 + assert len(data["validation_errors"]) == 2 + assert any( + "CSV mp_height (1.5) differs from existing measuring point height (2.0)" + in message + for message in data["validation_errors"] + ) + assert any("Bad Well" in message for message in data["validation_errors"]) + + row = data["water_levels"][0] + with session_ctx() as session: + observation = session.get(Observation, row["observation_id"]) + sample = session.get(Sample, row["sample_id"]) + field_activity = session.get(FieldActivity, row["field_activity_id"]) + field_event = session.get(FieldEvent, row["field_event_id"]) + + if observation: + session.delete(observation) + if sample: + session.delete(sample) + if field_activity: + session.delete(field_activity) + if field_event: + session.delete(field_event) + session.commit() + + # PATCH tests ================================================================== diff --git a/tests/test_water_level_csv_schema.py b/tests/test_water_level_csv_schema.py new file mode 100644 index 00000000..09abed2d --- /dev/null +++ b/tests/test_water_level_csv_schema.py @@ -0,0 +1,179 @@ +from datetime import datetime, timezone + +import pytest +from pydantic import ValidationError + +from schemas.water_level_csv import WaterLevelCsvRow + +DATA_QUALITY_VALUE = "Water level accurate to within two hundreths of a foot" + + +def test_water_level_csv_row_normalizes_source_headers_and_naive_datetimes(): + row = WaterLevelCsvRow( + well_name_point_id="AR0001", + field_event_date_time="2025-02-15T08:00:00", + field_staff="Tech 1", + field_staff_2="", + field_staff_3="", + water_level_date_time="2025-02-15T10:30:00", + measuring_person="Tech 1", + sample_method="electric tape", + mp_height="1.5", + level_status="Water level not affected", + depth_to_water_ft="45.2", + data_quality=DATA_QUALITY_VALUE, + water_level_notes="Initial measurement", + ) + + assert row.field_staff_2 is None + assert row.field_staff_3 is None + assert row.sample_method == "Electric tape measurement (E-probe)" + assert row.field_event_date_time == datetime( + 2025, 2, 15, 15, 0, tzinfo=timezone.utc + ) + assert row.water_level_date_time == datetime( + 2025, 2, 15, 17, 30, tzinfo=timezone.utc + ) + + +def test_water_level_csv_row_accepts_legacy_alias_headers(): + row = WaterLevelCsvRow( + well_name_point_id="AR0001", + field_event_date_time="2025-02-15T08:00:00-07:00", + field_staff="Tech 1", + measurement_date_time="2025-02-15T10:30:00-07:00", + sampler="Tech 1", + sample_method="Steel-tape measurement", + mp_height_ft="2.5", + depth_to_water_ft="45.2", + ) + + assert row.measuring_person == "Tech 1" + assert row.sampler == "Tech 1" + assert row.mp_height == 2.5 + assert row.measurement_date_time == datetime( + 2025, 2, 15, 17, 30, tzinfo=timezone.utc + ) + + +def test_water_level_csv_row_normalizes_blank_optional_values_to_none(): + row = WaterLevelCsvRow( + well_name_point_id="AR0001", + field_event_date_time="2025-02-15T08:00:00", + field_staff="Tech 1", + water_level_date_time="2025-02-15T10:30:00", + measuring_person="Tech 1", + sample_method="Steel-tape measurement", + mp_height="", + level_status="Water level not affected", + depth_to_water_ft="", + data_quality="", + water_level_notes="", + ) + + assert row.mp_height is None + assert row.level_status == "Water level not affected" + assert row.depth_to_water_ft is None + assert row.data_quality is None + assert row.water_level_notes is None + + +def test_water_level_csv_row_requires_measuring_person_to_match_field_staff(): + with pytest.raises(ValidationError) as exc: + WaterLevelCsvRow( + well_name_point_id="AR0001", + field_event_date_time="2025-02-15T08:00:00", + field_staff="Tech 1", + field_staff_2="Tech 2", + water_level_date_time="2025-02-15T10:30:00", + measuring_person="Tech 3", + sample_method="Steel-tape measurement", + depth_to_water_ft="45.2", + ) + + assert ( + "measuring_person must match one of field_staff, field_staff_2, " + "or field_staff_3" + ) in str(exc.value) + + +def test_water_level_csv_row_requires_level_status_when_depth_is_blank(): + with pytest.raises(ValidationError) as exc: + WaterLevelCsvRow( + well_name_point_id="AR0001", + field_event_date_time="2025-02-15T08:00:00", + field_staff="Tech 1", + water_level_date_time="2025-02-15T10:30:00", + measuring_person="Tech 1", + sample_method="Steel-tape measurement", + depth_to_water_ft="", + level_status="", + ) + + assert "level_status is required when depth_to_water_ft is blank" in str(exc.value) + + +def test_water_level_csv_row_rejects_water_level_before_field_event(): + with pytest.raises(ValidationError) as exc: + WaterLevelCsvRow( + well_name_point_id="AR0001", + field_event_date_time="2025-02-15T10:30:00", + field_staff="Tech 1", + water_level_date_time="2025-02-15T08:00:00", + measuring_person="Tech 1", + sample_method="Steel-tape measurement", + depth_to_water_ft="45.2", + ) + + assert ( + "water_level_date_time must be greater than or equal to " + "field_event_date_time" + ) in str(exc.value) + + +def test_water_level_csv_row_canonicalizes_case_insensitive_lexicon_values(): + row = WaterLevelCsvRow( + well_name_point_id="AR0001", + field_event_date_time="2025-02-15T08:00:00", + field_staff="Tech 1", + water_level_date_time="2025-02-15T10:30:00", + measuring_person="Tech 1", + sample_method="electric tape measurement (e-probe)", + depth_to_water_ft="", + level_status="dry", + data_quality=DATA_QUALITY_VALUE.lower(), + ) + + assert row.sample_method == "Electric tape measurement (E-probe)" + assert row.level_status == "Site was dry" + assert row.data_quality == DATA_QUALITY_VALUE + + +def test_water_level_csv_row_allows_negative_mp_height(): + row = WaterLevelCsvRow( + well_name_point_id="AR0001", + field_event_date_time="2025-02-15T08:00:00", + field_staff="Tech 1", + water_level_date_time="2025-02-15T10:30:00", + measuring_person="Tech 1", + sample_method="Steel-tape measurement", + mp_height="-0.1", + depth_to_water_ft="45.2", + ) + + assert row.mp_height == -0.1 + + +def test_water_level_csv_row_rejects_negative_depth_to_water(): + with pytest.raises(ValidationError) as exc: + WaterLevelCsvRow( + well_name_point_id="AR0001", + field_event_date_time="2025-02-15T08:00:00", + field_staff="Tech 1", + water_level_date_time="2025-02-15T10:30:00", + measuring_person="Tech 1", + sample_method="Steel-tape measurement", + depth_to_water_ft="-0.1", + ) + + assert "depth_to_water_ft must be greater than or equal to 0" in str(exc.value) diff --git a/tests/test_water_level_csv_service.py b/tests/test_water_level_csv_service.py new file mode 100644 index 00000000..64c145bd --- /dev/null +++ b/tests/test_water_level_csv_service.py @@ -0,0 +1,465 @@ +from datetime import date, datetime, timezone +from decimal import Decimal +from types import SimpleNamespace + +from db import FieldActivity, FieldEvent, Observation, Sample, Thing +from db.measuring_point_history import MeasuringPointHistory +from db.engine import session_ctx +from tests import get_parameter_id +from services.water_level_csv import ( + _build_sample_name, + _create_records, + _resolve_measuring_point_height, + _validate_depth_to_water_against_well, + bulk_upload_water_levels, +) +from sqlalchemy import select + + +def _build_well( + *, + well_depth: float | None = None, + measuring_point_height: float | None = None, +) -> Thing: + well = Thing(name="AR0001", thing_type="water well", well_depth=well_depth) + well.measuring_points = [] + if measuring_point_height is not None: + well.measuring_points.append( + MeasuringPointHistory( + start_date=date(2025, 1, 1), + measuring_point_height=measuring_point_height, + ) + ) + return well + + +def test_resolve_measuring_point_height_prefers_csv_value(): + well = _build_well(measuring_point_height=3.5) + + ( + resolved_mp_height, + existing_mp_height, + differs, + ) = _resolve_measuring_point_height(well, 4.0) + + assert resolved_mp_height == 4.0 + assert existing_mp_height == 3.5 + assert differs is True + + +def test_resolve_measuring_point_height_falls_back_to_well_history(): + well = _build_well(measuring_point_height=3.5) + + ( + resolved_mp_height, + existing_mp_height, + differs, + ) = _resolve_measuring_point_height(well, None) + + assert resolved_mp_height == 3.5 + assert existing_mp_height == 3.5 + assert differs is False + + +def test_resolve_measuring_point_height_coerces_decimal_history_value(): + well = _build_well(measuring_point_height=Decimal("3.5")) + + ( + resolved_mp_height, + existing_mp_height, + differs, + ) = _resolve_measuring_point_height(well, None) + + assert resolved_mp_height == 3.5 + assert existing_mp_height == 3.5 + assert differs is False + + +def test_resolve_measuring_point_height_allows_missing_values(): + well = _build_well() + + ( + resolved_mp_height, + existing_mp_height, + differs, + ) = _resolve_measuring_point_height(well, None) + + assert resolved_mp_height is None + assert existing_mp_height is None + assert differs is False + + +def test_validate_depth_to_water_against_well_rejects_depth_past_bottom(): + well = _build_well(well_depth=10.0) + + error = _validate_depth_to_water_against_well(4, well, 12.5, 1.0) + + assert ( + error == "Row 4: depth_to_water_ft minus measuring point height (11.5) " + "must be less than well depth (10.0)" + ) + + +def test_validate_depth_to_water_against_well_skips_when_height_unavailable(): + well = _build_well(well_depth=10.0) + + error = _validate_depth_to_water_against_well(4, well, 12.5, None) + + assert error is None + + +def test_build_sample_name_uses_deterministic_well_inventory_style_format(): + row = SimpleNamespace( + well=SimpleNamespace(name="AR0001"), + measurement_dt=datetime(2025, 2, 15, 10, 30, tzinfo=timezone.utc), + ) + + assert _build_sample_name(row) == "AR0001-WL-202502151030" + + +def test_create_records_reports_savepoint_creation_failure_as_row_error(): + class BrokenSession: + def __init__(self): + self.expire_all_called = False + + def begin_nested(self): + raise RuntimeError("savepoint failed") + + def expire_all(self): + self.expire_all_called = True + + session = BrokenSession() + + created, errors = _create_records( + session, + parameter_id=1, + rows=[SimpleNamespace(row_index=7)], + ) + + assert created == [] + assert errors == ["Row 7: savepoint failed"] + assert session.expire_all_called is True + + +def test_bulk_upload_water_levels_is_idempotent(water_well_thing): + csv_content = "\n".join( + [ + ",".join( + [ + "field_staff", + "well_name_point_id", + "field_event_date_time", + "measurement_date_time", + "sampler", + "sample_method", + "mp_height", + "level_status", + "depth_to_water_ft", + "data_quality", + "water_level_notes", + ] + ), + ",".join( + [ + "A Lopez", + water_well_thing.name, + "2025-02-15T08:00:00-07:00", + "2025-02-15T10:30:00-07:00", + "A Lopez", + "electric tape", + "1.5", + "Water level not affected", + "7.0", + "Water level accurate to within two hundreths of a foot", + "Initial measurement", + ] + ), + ] + ) + + first = bulk_upload_water_levels(csv_content.encode("utf-8")) + second = bulk_upload_water_levels(csv_content.encode("utf-8")) + + assert first.exit_code == 0, first.payload + assert second.exit_code == 0, second.payload + assert ( + first.payload["water_levels"][0]["sample_id"] + == second.payload["water_levels"][0]["sample_id"] + ) + assert ( + first.payload["water_levels"][0]["observation_id"] + == second.payload["water_levels"][0]["observation_id"] + ) + + with session_ctx() as session: + samples = session.scalars( + select(Sample) + .join(FieldActivity, Sample.field_activity_id == FieldActivity.id) + .join(FieldEvent, FieldActivity.field_event_id == FieldEvent.id) + .join(Thing, FieldEvent.thing_id == Thing.id) + .where( + Thing.id == water_well_thing.id, + FieldActivity.activity_type == "groundwater level", + ) + ).all() + observations = session.scalars( + select(Observation) + .join(Sample, Observation.sample_id == Sample.id) + .join(FieldActivity, Sample.field_activity_id == FieldActivity.id) + .join(FieldEvent, FieldActivity.field_event_id == FieldEvent.id) + .join(Thing, FieldEvent.thing_id == Thing.id) + .where( + Thing.id == water_well_thing.id, + FieldActivity.activity_type == "groundwater level", + ) + ).all() + + assert len(samples) == 1 + assert len(observations) == 1 + assert samples[0].sample_name == "Test Well-WL-202502151730" + assert samples[0].sample_matrix == "groundwater" + assert observations[0].groundwater_level_reason == "Water level not affected" + assert ( + observations[0].nma_data_quality + == "Water level accurate to within two hundreths of a foot" + ) + assert observations[0].measuring_point_height == 1.5 + + +def test_bulk_upload_water_levels_warns_when_mp_height_differs_from_history( + water_well_thing, +): + csv_content = "\n".join( + [ + ",".join( + [ + "field_staff", + "well_name_point_id", + "field_event_date_time", + "measurement_date_time", + "sampler", + "sample_method", + "mp_height", + "level_status", + "depth_to_water_ft", + "data_quality", + "water_level_notes", + ] + ), + ",".join( + [ + "A Lopez", + water_well_thing.name, + "2025-02-15T08:00:00-07:00", + "2025-02-15T10:30:00-07:00", + "A Lopez", + "electric tape", + "1.5", + "Water level not affected", + "7.0", + "Water level accurate to within two hundreths of a foot", + "Measurement with warning", + ] + ), + ] + ) + + result = bulk_upload_water_levels(csv_content.encode("utf-8")) + + assert result.exit_code == 0, result.payload + assert result.payload["summary"]["total_rows_imported"] == 1 + assert result.payload["summary"]["validation_errors_or_warnings"] == 1 + assert result.payload["validation_errors"] == [ + "Row 1: CSV mp_height (1.5) differs from existing measuring point height " + "(2.0); CSV value will be used" + ] + + +def test_bulk_upload_water_levels_preserves_unrelated_existing_observations( + water_well_thing, +): + groundwater_parameter_id = get_parameter_id("groundwater level", "Field Parameter") + ph_parameter_id = get_parameter_id("pH", "Field Parameter") + + with session_ctx() as session: + well = session.merge(water_well_thing) + field_event = FieldEvent( + thing=well, + event_date=datetime(2025, 2, 15, 15, 0, tzinfo=timezone.utc), + notes="Existing field event", + ) + field_activity = FieldActivity( + field_event=field_event, + activity_type="groundwater level", + notes="Sampler: Original Sampler", + ) + sample = Sample( + field_activity=field_activity, + sample_date=datetime(2025, 2, 15, 17, 30, tzinfo=timezone.utc), + sample_name="Test Well-WL-202502151730", + sample_matrix="groundwater", + sample_method="Electric tape measurement (E-probe)", + qc_type="Normal", + notes="Existing sample", + ) + unrelated_observation = Observation( + sample=sample, + observation_datetime=datetime(2025, 2, 15, 17, 30, tzinfo=timezone.utc), + parameter_id=ph_parameter_id, + value=7.2, + unit="dimensionless", + notes="Keep me as pH", + ) + session.add_all([field_event, field_activity, sample, unrelated_observation]) + session.commit() + unrelated_observation_id = unrelated_observation.id + + csv_content = "\n".join( + [ + ",".join( + [ + "field_staff", + "well_name_point_id", + "field_event_date_time", + "measurement_date_time", + "sampler", + "sample_method", + "mp_height", + "level_status", + "depth_to_water_ft", + "data_quality", + "water_level_notes", + ] + ), + ",".join( + [ + "A Lopez", + water_well_thing.name, + "2025-02-15T08:00:00-07:00", + "2025-02-15T10:30:00-07:00", + "A Lopez", + "electric tape", + "1.5", + "Water level not affected", + "7.0", + "Water level accurate to within two hundreths of a foot", + "Imported groundwater level", + ] + ), + ] + ) + + result = bulk_upload_water_levels(csv_content.encode("utf-8")) + + assert result.exit_code == 0, result.payload + + with session_ctx() as session: + sample = session.scalars( + select(Sample) + .join(FieldActivity, Sample.field_activity_id == FieldActivity.id) + .join(FieldEvent, FieldActivity.field_event_id == FieldEvent.id) + .join(Thing, FieldEvent.thing_id == Thing.id) + .where( + Thing.id == water_well_thing.id, + FieldActivity.activity_type == "groundwater level", + Sample.sample_name == "Test Well-WL-202502151730", + ) + ).one() + observations = session.scalars( + select(Observation) + .where(Observation.sample_id == sample.id) + .order_by(Observation.id.asc()) + ).all() + + assert len(observations) == 2 + assert observations[0].id == unrelated_observation_id + assert observations[0].parameter_id == ph_parameter_id + assert observations[0].value == 7.2 + assert observations[0].unit == "dimensionless" + assert observations[0].notes == "Keep me as pH" + + groundwater_observations = [ + observation + for observation in observations + if observation.parameter_id == groundwater_parameter_id + ] + assert len(groundwater_observations) == 1 + assert ( + groundwater_observations[0].id + == result.payload["water_levels"][0]["observation_id"] + ) + assert groundwater_observations[0].value == 7.0 + assert groundwater_observations[0].unit == "ft" + assert groundwater_observations[0].notes == "Imported groundwater level" + + +def test_bulk_upload_water_levels_imports_valid_rows_when_other_rows_fail( + water_well_thing, +): + csv_content = "\n".join( + [ + ",".join( + [ + "field_staff", + "well_name_point_id", + "field_event_date_time", + "measurement_date_time", + "sampler", + "sample_method", + "mp_height", + "level_status", + "depth_to_water_ft", + "data_quality", + "water_level_notes", + ] + ), + ",".join( + [ + "A Lopez", + water_well_thing.name, + "2025-02-15T08:00:00-07:00", + "2025-02-15T10:30:00-07:00", + "A Lopez", + "electric tape", + "1.5", + "Water level not affected", + "7.0", + "Water level accurate to within two hundreths of a foot", + "Initial measurement", + ] + ), + ",".join( + [ + "A Lopez", + "Unknown Well", + "2025-02-15T08:00:00-07:00", + "2025-02-15T10:30:00-07:00", + "A Lopez", + "electric tape", + "1.5", + "Water level not affected", + "7.0", + "Water level accurate to within two hundreths of a foot", + "Bad row", + ] + ), + ] + ) + + result = bulk_upload_water_levels(csv_content.encode("utf-8")) + + assert result.exit_code == 0 + assert result.payload["summary"]["total_rows_processed"] == 2 + assert result.payload["summary"]["total_rows_imported"] == 1 + assert result.payload["summary"]["validation_errors_or_warnings"] == 2 + assert len(result.payload["water_levels"]) == 1 + assert len(result.payload["validation_errors"]) == 2 + assert any( + "CSV mp_height (1.5) differs from existing measuring point height (2.0)" + in message + for message in result.payload["validation_errors"] + ) + assert any( + "Unknown well_name_point_id 'Unknown Well'" in message + for message in result.payload["validation_errors"] + ) diff --git a/tests/test_well_transfer.py b/tests/test_well_transfer.py new file mode 100644 index 00000000..373e70b1 --- /dev/null +++ b/tests/test_well_transfer.py @@ -0,0 +1,317 @@ +import threading +from contextlib import contextmanager +from types import SimpleNamespace + +import pandas as pd +import pytest +from sqlalchemy.exc import IntegrityError + +from schemas.thing import CreateWell +from transfers import well_transfer as wt + + +class _FakeSession: + def __init__(self): + self.added = [] + self.expunge_calls = [] + + def add(self, obj): + self.added.append(obj) + + def expunge(self, obj): + self.expunge_calls.append(obj) + + +class _FakeQuery: + def __init__(self, session): + self.session = session + + def filter(self, *_args, **_kwargs): + return self + + def first(self): + return self.session.query_results.pop(0) + + +class _FakeSavepoint: + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + +class _FakeAquiferSession: + def __init__(self, query_results, flush_exc=None): + self.added = [] + self.begin_nested_calls = 0 + self.rollback_calls = 0 + self.query_results = list(query_results) + self.flush_exc = flush_exc + + def add(self, obj): + self.added.append(obj) + + def begin_nested(self): + self.begin_nested_calls += 1 + return _FakeSavepoint() + + def flush(self): + if self.flush_exc is not None: + exc = self.flush_exc + self.flush_exc = None + raise exc + + def query(self, _model): + return _FakeQuery(self) + + def rollback(self): + self.rollback_calls += 1 + + +def test_persist_well_excludes_monitoring_status_from_thing_kwargs( + monkeypatch, +): + captured_kwargs = {} + + class FakeThing: + def __init__(self, **kwargs): + captured_kwargs.update(kwargs) + + monkeypatch.setattr(wt, "Thing", FakeThing) + + transferer = wt.WellTransferer() + session = _FakeSession() + row = SimpleNamespace(PointID="AR0001", WellID=12, LocationId=34) + payload = { + "data": CreateWell( + name="AR0001", + monitoring_status="Not currently monitored", + ), + "well_purposes": [], + "well_casing_materials": [], + } + batch_errors = [] + + well = transferer._persist_well(session, row, payload, batch_errors) + + assert well is session.added[0] + assert "monitoring_status" not in captured_kwargs + assert captured_kwargs["thing_type"] == "water well" + assert captured_kwargs["nma_pk_welldata"] == 12 + assert captured_kwargs["nma_pk_location"] == 34 + assert batch_errors == [] + assert session.expunge_calls == [] + + +def test_add_aquifers_parallel_recovers_from_integrity_error(monkeypatch): + class FakeAquiferSystem: + name = "name" + + def __init__(self, name, primary_aquifer_type, geographic_scale): + self.name = name + self.primary_aquifer_type = primary_aquifer_type + self.geographic_scale = geographic_scale + + class FakeThingAquiferAssociation: + def __init__(self, thing, aquifer_system): + self.thing = thing + self.aquifer_system = aquifer_system + + class FakeAquiferType: + def __init__(self, thing_aquifer_association, aquifer_type): + self.thing_aquifer_association = thing_aquifer_association + self.aquifer_type = aquifer_type + + def fake_map_value(value): + if value.startswith("LU_AquiferClass:"): + return "Test Aquifer" + if value.startswith("LU_AquiferType:"): + return "Confined" + raise KeyError(value) + + existing_aquifer = SimpleNamespace(name="Test Aquifer") + session = _FakeAquiferSession( + query_results=[None, existing_aquifer], + flush_exc=IntegrityError("insert", {}, Exception("duplicate key")), + ) + transferer = wt.WellTransferer() + row = SimpleNamespace(PointID="AR0001", AqClass="AQ", AquiferType="A") + well = SimpleNamespace(name="AR0001") + local_aquifers = [] + + monkeypatch.setattr(wt, "AquiferSystem", FakeAquiferSystem) + monkeypatch.setattr(wt, "ThingAquiferAssociation", FakeThingAquiferAssociation) + monkeypatch.setattr(wt, "AquiferType", FakeAquiferType) + monkeypatch.setattr(wt, "extract_aquifer_type_codes", lambda _value: ["A"]) + monkeypatch.setattr(wt.lexicon_mapper, "map_value", fake_map_value) + + transferer._add_aquifers_parallel( + session, row, well, local_aquifers, threading.Lock() + ) + + associations = [ + obj for obj in session.added if isinstance(obj, FakeThingAquiferAssociation) + ] + + assert session.begin_nested_calls == 1 + assert session.rollback_calls == 0 + assert associations[0].aquifer_system is existing_aquifer + assert local_aquifers == [existing_aquifer] + + +def test_add_aquifers_parallel_reraises_unexpected_flush_errors(monkeypatch): + class FakeAquiferSystem: + name = "name" + + def __init__(self, name, primary_aquifer_type, geographic_scale): + self.name = name + self.primary_aquifer_type = primary_aquifer_type + self.geographic_scale = geographic_scale + + def fake_map_value(value): + if value.startswith("LU_AquiferClass:"): + return "Test Aquifer" + if value.startswith("LU_AquiferType:"): + return "Confined" + raise KeyError(value) + + session = _FakeAquiferSession( + query_results=[None], + flush_exc=RuntimeError("database unavailable"), + ) + transferer = wt.WellTransferer() + row = SimpleNamespace(PointID="AR0001", AqClass="AQ", AquiferType="A") + well = SimpleNamespace(name="AR0001") + + monkeypatch.setattr(wt, "AquiferSystem", FakeAquiferSystem) + monkeypatch.setattr(wt, "extract_aquifer_type_codes", lambda _value: ["A"]) + monkeypatch.setattr(wt.lexicon_mapper, "map_value", fake_map_value) + + with pytest.raises(RuntimeError, match="database unavailable"): + transferer._add_aquifers_parallel(session, row, well, [], threading.Lock()) + + assert session.begin_nested_calls == 1 + assert session.rollback_calls == 0 + + +def test_transfer_parallel_preloads_cached_elevations_before_worker_submission( + monkeypatch, +): + class FakePreloadSession: + def query(self, _model): + return self + + def all(self): + return [] + + def expunge_all(self): + pass + + class FakeFuture: + def result(self): + return {"errors": []} + + class FakeExecutor: + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def submit(self, fn, idx, batch): + assert transferer._cached_elevations == {"source": "preloaded"} + return FakeFuture() + + @contextmanager + def fake_session_ctx(): + yield FakePreloadSession() + + load_calls = [] + dumped = [] + + def fake_get_cached_elevations(): + load_calls.append("load") + return {"source": "preloaded"} + + def fake_dump_cached_elevations(lut): + dumped.append(lut) + + transferer = wt.WellTransferer() + df = pd.DataFrame([{"PointID": "AR0001"}]) + + monkeypatch.setattr(wt, "session_ctx", fake_session_ctx) + monkeypatch.setattr(wt, "get_cached_elevations", fake_get_cached_elevations) + monkeypatch.setattr(wt, "dump_cached_elevations", fake_dump_cached_elevations) + monkeypatch.setattr(wt, "MeasuringPointEstimator", lambda: object()) + monkeypatch.setattr(wt, "ThreadPoolExecutor", lambda max_workers: FakeExecutor()) + monkeypatch.setattr(wt, "as_completed", lambda futures: list(futures)) + monkeypatch.setattr(transferer, "_get_dfs", lambda: (df, df.copy())) + + transferer.transfer_parallel(num_workers=2) + + assert load_calls == ["load"] + assert dumped == [{"source": "preloaded"}] + + +def test_transfer_parallel_preloads_measuring_point_estimator_before_workers( + monkeypatch, +): + class FakePreloadSession: + def query(self, _model): + return self + + def all(self): + return [] + + def expunge_all(self): + pass + + class FakeFuture: + def result(self): + return {"errors": []} + + class FakeExecutor: + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def submit(self, fn, idx, batch): + assert transferer._measuring_point_estimator is estimator + return FakeFuture() + + @contextmanager + def fake_session_ctx(): + yield FakePreloadSession() + + dumped = [] + estimator = object() + build_calls = [] + + def fake_get_cached_elevations(): + return {} + + def fake_dump_cached_elevations(lut): + dumped.append(lut) + + def fake_estimator_ctor(): + build_calls.append("build") + return estimator + + transferer = wt.WellTransferer() + df = pd.DataFrame([{"PointID": "AR0001"}]) + + monkeypatch.setattr(wt, "session_ctx", fake_session_ctx) + monkeypatch.setattr(wt, "get_cached_elevations", fake_get_cached_elevations) + monkeypatch.setattr(wt, "dump_cached_elevations", fake_dump_cached_elevations) + monkeypatch.setattr(wt, "MeasuringPointEstimator", fake_estimator_ctor) + monkeypatch.setattr(wt, "ThreadPoolExecutor", lambda max_workers: FakeExecutor()) + monkeypatch.setattr(wt, "as_completed", lambda futures: list(futures)) + monkeypatch.setattr(transferer, "_get_dfs", lambda: (df, df.copy())) + + transferer.transfer_parallel(num_workers=2) + + assert build_calls == ["build"] + assert dumped == [{}] diff --git a/tests/transfers/test_contact_with_multiple_wells.py b/tests/transfers/test_contact_with_multiple_wells.py index 40b4b26e..60af21b5 100644 --- a/tests/transfers/test_contact_with_multiple_wells.py +++ b/tests/transfers/test_contact_with_multiple_wells.py @@ -93,6 +93,8 @@ def test_owner_comment_absent_skips_notes(): def test_ownerkey_fallback_name_when_name_and_org_missing(water_well_thing): with session_ctx() as sess: thing = sess.get(Thing, water_well_thing.id) + contact_by_owner_type = {} + contact_by_name_org = {} row = SimpleNamespace( FirstName=None, LastName=None, @@ -118,12 +120,18 @@ def test_ownerkey_fallback_name_when_name_and_org_missing(water_well_thing): # Should not raise "Either name or organization must be provided." contact = _add_first_contact( - sess, row=row, thing=thing, organization=None, added=[] + sess, + row=row, + thing=thing, + organization=None, + added=set(), + contact_by_owner_type=contact_by_owner_type, + contact_by_name_org=contact_by_name_org, ) sess.flush() assert contact is not None - assert contact.name == "Fallback OwnerKey Name" + assert contact.name == "Fallback OwnerKey Name-primary" assert contact.organization is None @@ -131,6 +139,8 @@ def test_ownerkey_dedupes_when_fallback_name_differs(water_well_thing): owner_key = f"OwnerKey-{uuid4()}" with session_ctx() as sess: first_thing = sess.get(Thing, water_well_thing.id) + contact_by_owner_type = {} + contact_by_name_org = {} second_thing = Thing( name=f"Second Well {uuid4()}", thing_type="water well", @@ -184,15 +194,27 @@ def test_ownerkey_dedupes_when_fallback_name_differs(water_well_thing): PhysicalZipCode=None, ) - added = [] + added = set() first_contact = _add_first_contact( - sess, row=complete_row, thing=first_thing, organization=None, added=added + sess, + row=complete_row, + thing=first_thing, + organization=None, + added=added, + contact_by_owner_type=contact_by_owner_type, + contact_by_name_org=contact_by_name_org, ) assert first_contact is not None assert first_contact.name == "Casey Owner" second_contact = _add_first_contact( - sess, row=fallback_row, thing=second_thing, organization=None, added=added + sess, + row=fallback_row, + thing=second_thing, + organization=None, + added=added, + contact_by_owner_type=contact_by_owner_type, + contact_by_name_org=contact_by_name_org, ) sess.flush() diff --git a/transfers/well_transfer.py b/transfers/well_transfer.py index d8e1c200..e477eb3b 100644 --- a/transfers/well_transfer.py +++ b/transfers/well_transfer.py @@ -25,7 +25,7 @@ import pandas as pd from pandas import isna, notna from pydantic import ValidationError -from sqlalchemy.exc import DatabaseError +from sqlalchemy.exc import DatabaseError, IntegrityError from sqlalchemy.orm import Session from core.enums import ( @@ -93,6 +93,7 @@ "is_suitable_for_datalogger", "is_open", "well_status", + "monitoring_status", ] @@ -137,12 +138,24 @@ class WellTransferer(Transferer): def __init__(self, *args, **kw): super().__init__(*args, **kw) - self._cached_elevations = get_cached_elevations() + # Delay external I/O so unit tests can instantiate the transferer + # without requiring GCS credentials or source CSV files. + self._cached_elevations = None self._added_locations = {} self._aquifers = None - self._measuring_point_estimator = MeasuringPointEstimator() + self._measuring_point_estimator = None self._row_by_pointid: dict[str, pd.Series] = {} + def _get_cached_elevations(self) -> dict: + if self._cached_elevations is None: + self._cached_elevations = get_cached_elevations() + return self._cached_elevations + + def _get_measuring_point_estimator(self) -> MeasuringPointEstimator: + if self._measuring_point_estimator is None: + self._measuring_point_estimator = MeasuringPointEstimator() + return self._measuring_point_estimator + def transfer_parallel(self, num_workers: int = None) -> None: """ Transfer wells using parallel processing for improved performance. @@ -170,6 +183,11 @@ def transfer_parallel(self, num_workers: int = None) -> None: logger.info("No wells to transfer") return + # Pre-load shared cached elevations on the main thread so workers + # mutate a single cache instance instead of racing lazy initialization. + self._get_cached_elevations() + self._get_measuring_point_estimator() + # Calculate batch size batch_size = max(100, n // num_workers) batches = [df.iloc[i : i + batch_size] for i in range(0, n, batch_size)] @@ -299,7 +317,7 @@ def process_batch(batch_idx: int, batch_df: pd.DataFrame) -> dict: logger.info(f"Parallel transfer complete: {n} wells, {len(all_errors)} errors") # Dump cached elevations (minimal after-processing) - dump_cached_elevations(self._cached_elevations) + dump_cached_elevations(self._get_cached_elevations()) def _get_dfs(self): """Load and clean WellData/Location dataframes.""" @@ -657,7 +675,7 @@ def _persist_location(self, session: Session, row, batch_errors: list): """Create a Location from the legacy row.""" try: location, elevation_method, location_notes = make_location( - row, self._cached_elevations + row, self._get_cached_elevations() ) session.add(location) return location, elevation_method, location_notes @@ -744,7 +762,11 @@ def _add_histories(self, session: Session, row, well: Thing) -> None: ) ) else: - mphs = self._measuring_point_estimator.estimate_measuring_point_height(row) + mphs = ( + self._get_measuring_point_estimator().estimate_measuring_point_height( + row + ) + ) added_measuring_point = False for mph, mph_desc, start_date, end_date in zip(*mphs): session.add( @@ -885,12 +907,9 @@ def _step_parallel_complete( # Aquifers if notna(row.AquiferType): - try: - self._add_aquifers_parallel( - session, row, well, local_aquifers, aquifers_lock - ) - except Exception as e: - logger.warning(f"Error adding aquifer for {well.name}: {e}") + self._add_aquifers_parallel( + session, row, well, local_aquifers, aquifers_lock + ) # Formation zone formation_code = row.FormationZone if hasattr(row, "FormationZone") else None @@ -989,13 +1008,14 @@ def _add_aquifers_parallel(self, session, row, well, local_aquifers, aquifers_lo if not aquifer: try: - aquifer = AquiferSystem( - name=aquifer_name, - primary_aquifer_type=primary_type, - geographic_scale=None, - ) - session.add(aquifer) - session.flush() + with session.begin_nested(): + aquifer = AquiferSystem( + name=aquifer_name, + primary_aquifer_type=primary_type, + geographic_scale=None, + ) + session.add(aquifer) + session.flush() logger.info(f"Created aquifer: {aquifer_name}") # Update local cache under lock @@ -1006,14 +1026,23 @@ def _add_aquifers_parallel(self, session, row, well, local_aquifers, aquifers_lo ) if not existing: local_aquifers.append(aquifer) - except Exception as e: + except IntegrityError: # Race condition - another thread created it - session.rollback() aquifer = ( session.query(AquiferSystem) .filter(AquiferSystem.name == aquifer_name) .first() ) + if aquifer: + with aquifers_lock: + existing = next( + (a for a in local_aquifers if a.name == aquifer_name), + None, + ) + if not existing: + local_aquifers.append(aquifer) + else: + raise if aquifer: aquifer_assoc = ThingAquiferAssociation(thing=well, aquifer_system=aquifer)