Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
53fee18
feat(water-level-import): normalize standalone CSV schema
ksmuczynski Mar 26, 2026
eccf31f
feat(water-level-import): enhance validation and resolve mp_height
ksmuczynski Mar 26, 2026
6a09881
feat(water-level-import): add idempotent groundwater persistence
ksmuczynski Mar 26, 2026
2f2f923
feat(water-level-import): add best-effort row savepoints
ksmuczynski Mar 27, 2026
4c3eea0
feat(water-level-import): align partial-success API and CLI behavior
ksmuczynski Mar 27, 2026
83bc1d9
fix(well-transfer): exclude monitoring_status from Thing creation
ksmuczynski Mar 27, 2026
fa257cd
fix(water-level-import): harden real-file import cleanup
ksmuczynski Mar 27, 2026
44102db
test(bdd): align water-level CSV feature coverage with current import…
ksmuczynski Mar 27, 2026
0785cac
Formatting changes
ksmuczynski Mar 28, 2026
452fe0c
test(water-level-import): ensure unrelated observations are preserved…
ksmuczynski Mar 30, 2026
9741d41
fix(water-level-import): skip persistence when no valid rows exist
ksmuczynski Mar 30, 2026
e0d7e45
fix(water-level-import): handle savepoint initialization failure grac…
ksmuczynski Mar 30, 2026
8490550
fix(well-transfer): defer WellTransferer external I/O until needed
ksmuczynski Mar 30, 2026
81a016c
fix(well-transfer): improve aquifer persistence with nested savepoint…
ksmuczynski Mar 31, 2026
623206e
fix(water-level-import): refine handling of alias and canonical heade…
ksmuczynski Mar 31, 2026
fbdc18b
fix(well-transfer): preload shared elevation cache before parallel wo…
ksmuczynski Mar 31, 2026
a3ce92c
Update services/water_level_csv.py
ksmuczynski Mar 31, 2026
91aaaf4
fix(well-transfer): preload measuring point estimator before parallel…
ksmuczynski Mar 31, 2026
a2584c9
feat(water-level-csv): warn when uploaded mp height differs from well…
ksmuczynski Mar 31, 2026
6ccdc7c
Merge remote-tracking branch 'origin/kas-water-level-import' into kas…
ksmuczynski Mar 31, 2026
256af68
test(well-transfer): isolate parallel preload tests from external est…
ksmuczynski Mar 31, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -666,9 +666,16 @@ def water_levels_bulk_upload(
payload = result.payload if isinstance(result.payload, dict) else {}
summary = payload.get("summary", {})
validation_errors = payload.get("validation_errors", [])
rows_with_issues = summary.get("validation_errors_or_warnings", 0)

if result.exit_code == 0:
if result.exit_code == 0 and not rows_with_issues:
typer.secho("[WATER LEVEL IMPORT] SUCCESS", fg=colors["ok"], bold=True)
elif result.exit_code == 0:
typer.secho(
"[WATER LEVEL IMPORT] COMPLETED WITH ISSUES",
fg=colors["issue"],
bold=True,
)
else:
typer.secho(
"[WATER LEVEL IMPORT] COMPLETED WITH ISSUES",
Expand Down Expand Up @@ -709,7 +716,6 @@ def water_levels_bulk_upload(
if summary:
processed = summary.get("total_rows_processed", 0)
imported = summary.get("total_rows_imported", 0)
rows_with_issues = summary.get("validation_errors_or_warnings", 0)
typer.secho("SUMMARY", fg=colors["accent"], bold=True)
label_width = 16
value_width = 8
Expand Down
218 changes: 215 additions & 3 deletions schemas/water_level_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,219 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# ===============================================================================
from pydantic import BaseModel
from __future__ import annotations

from datetime import datetime, timezone
from typing import Annotated

from core.enums import DataQuality, GroundwaterLevelReason, SampleMethod
from pydantic import (
AliasChoices,
BaseModel,
ConfigDict,
Field,
field_validator,
model_validator,
)
from pydantic.functional_validators import BeforeValidator

from services.util import convert_dt_tz_naive_to_tz_aware

# Canonical names of the fields reported as required for a water-level row.
WATER_LEVEL_REQUIRED_FIELDS = [
    "well_name_point_id",
    "field_event_date_time",
    "field_staff",
    "water_level_date_time",
    "measuring_person",
    "sample_method",
]

# Alternate CSV header -> canonical field name.
WATER_LEVEL_HEADER_ALIASES = {
    "measurement_date_time": "water_level_date_time",
    "sampler": "measuring_person",
    "mp_height_ft": "mp_height",
}

# Header names listed as ignored fields.
WATER_LEVEL_IGNORED_FIELDS = {"hold(not saved)", "cut(not saved)"}

# Lower-cased shorthand -> full sample-method label.
SAMPLE_METHOD_ALIASES = {
    "electric tape": "Electric tape measurement (E-probe)",
    "steel tape": "Steel-tape measurement",
}
# Lower-cased full label -> full label, so lookups can ignore letter case.
SAMPLE_METHOD_CANONICAL = {
    label.lower(): label for label in SAMPLE_METHOD_ALIASES.values()
}

# Two shorthands ("obstructed" and "obstruction") share this label.
_OBSTRUCTION_REASON = "Obstruction was encountered in the well (no level recorded)"

# Lower-cased shorthand -> full groundwater-level-reason label.
GROUNDWATER_LEVEL_REASON_ALIASES = {
    "dry": "Site was dry",
    "obstructed": _OBSTRUCTION_REASON,
    "obstruction": _OBSTRUCTION_REASON,
    "flowing": (
        "Site was flowing. Water level or head couldn't be measured "
        "w/out additional equipment."
    ),
    "flowing recently": "Site was flowing recently.",
    "pumped": "Site was being pumped",
    "pumped recently": "Site was pumped recently",
    "not affected": "Water level not affected",
    "other": "Other conditions exist that would affect the level (remarks)",
}


def empty_str_to_none(value):
    """Return ``None`` for empty or whitespace-only strings; otherwise return ``value`` unchanged."""
    is_blank_text = isinstance(value, str) and not value.strip()
    return None if is_blank_text else value


# Optional field types for CSV cells: empty_str_to_none runs as a "before"
# validator, so blank or whitespace-only cells become None ahead of the
# str/float validation.
OptionalText = Annotated[str | None, BeforeValidator(empty_str_to_none)]
OptionalFloat = Annotated[float | None, BeforeValidator(empty_str_to_none)]


def _normalize_datetime_to_utc(value: datetime | str) -> datetime:
if isinstance(value, str):
value = datetime.fromisoformat(value)
elif not isinstance(value, datetime):
raise ValueError("value must be a datetime or ISO format string")

if value.tzinfo is None:
value = convert_dt_tz_naive_to_tz_aware(value, "America/Denver")

return value.astimezone(timezone.utc)


def _canonicalize_enum_value(
value: str | None, enum_cls, field_name: str
) -> str | None:
if value is None:
return None

normalized = value.strip().lower()
for item in enum_cls:
if item.value.lower() == normalized:
return item.value

raise ValueError(f"Unknown {field_name}: {value}")


class WaterLevelCsvRow(BaseModel):
    """A single water-level CSV row, validated and normalized.

    Unknown columns are ignored and string values are stripped of surrounding
    whitespace. Several columns accept an alternate header name through
    ``validation_alias``. Field validators normalize timestamps to UTC and
    map sample method, level status, and data quality onto their enum values;
    a final model validator enforces the cross-field rules.
    """

    model_config = ConfigDict(extra="ignore", str_strip_whitespace=True)

    well_name_point_id: str
    field_event_date_time: datetime
    field_staff: str
    field_staff_2: OptionalText = None
    field_staff_3: OptionalText = None
    water_level_date_time: datetime = Field(
        validation_alias=AliasChoices(
            "water_level_date_time", "measurement_date_time"
        )
    )
    measuring_person: str = Field(
        validation_alias=AliasChoices("measuring_person", "sampler")
    )
    sample_method: str
    mp_height: OptionalFloat = Field(
        default=None,
        validation_alias=AliasChoices("mp_height", "mp_height_ft"),
    )
    level_status: OptionalText = None
    depth_to_water_ft: OptionalFloat = None
    data_quality: OptionalText = None
    water_level_notes: OptionalText = None

    @property
    def measurement_date_time(self) -> datetime:
        """Read-only alias for ``water_level_date_time``."""
        return self.water_level_date_time

    @property
    def sampler(self) -> str:
        """Read-only alias for ``measuring_person``."""
        return self.measuring_person

    @classmethod
    def required_fields(cls) -> list[str]:
        """Return a fresh list copy of ``WATER_LEVEL_REQUIRED_FIELDS``."""
        return [*WATER_LEVEL_REQUIRED_FIELDS]

    @classmethod
    def header_aliases(cls) -> dict[str, str]:
        """Return a fresh dict copy of ``WATER_LEVEL_HEADER_ALIASES``."""
        return {**WATER_LEVEL_HEADER_ALIASES}

    @classmethod
    def ignored_fields(cls) -> set[str]:
        """Return a fresh set copy of ``WATER_LEVEL_IGNORED_FIELDS``."""
        return {*WATER_LEVEL_IGNORED_FIELDS}

    @staticmethod
    def canonicalize_sample_method(value: str) -> str:
        """Expand a sample-method shorthand or re-case a full label.

        The lookup is case-insensitive; text that matches neither table is
        returned stripped but otherwise unchanged.
        """
        cleaned = value.strip()
        lookup_key = cleaned.lower()
        # Shorthand aliases take precedence over the full-label table.
        for table in (SAMPLE_METHOD_ALIASES, SAMPLE_METHOD_CANONICAL):
            if lookup_key in table:
                return table[lookup_key]
        return cleaned

    @field_validator("sample_method")
    @classmethod
    def normalize_sample_method(cls, value: str) -> str:
        """Resolve aliases, then require a ``SampleMethod`` enum value."""
        expanded = cls.canonicalize_sample_method(value)
        return _canonicalize_enum_value(expanded, SampleMethod, "sample_method")

    @field_validator(
        "field_event_date_time",
        "water_level_date_time",
        mode="before",
    )
    @classmethod
    def normalize_datetime_field(cls, value: datetime | str) -> datetime:
        """Convert the raw timestamp to UTC before datetime validation runs."""
        return _normalize_datetime_to_utc(value)

    @field_validator("depth_to_water_ft")
    @classmethod
    def validate_non_negative_depth_to_water(cls, value: float | None) -> float | None:
        """Reject negative depths; ``None`` passes through."""
        is_negative = value is not None and value < 0
        if is_negative:
            raise ValueError("depth_to_water_ft must be greater than or equal to 0")
        return value

    @field_validator("level_status")
    @classmethod
    def normalize_level_status(cls, value: str | None) -> str | None:
        """Expand a level-status shorthand, then require a ``GroundwaterLevelReason`` value."""
        resolved = (
            value
            if value is None
            else GROUNDWATER_LEVEL_REASON_ALIASES.get(value.strip().lower(), value)
        )
        return _canonicalize_enum_value(
            resolved, GroundwaterLevelReason, "level_status"
        )

    @field_validator("data_quality")
    @classmethod
    def normalize_data_quality(cls, value: str | None) -> str | None:
        """Require a ``DataQuality`` enum value, matched case-insensitively."""
        return _canonicalize_enum_value(value, DataQuality, "data_quality")

    @model_validator(mode="after")
    def validate_row_constraints(self) -> WaterLevelCsvRow:
        """Enforce the rules that span more than one column.

        Raises:
            ValueError: If the measuring person is not among the listed field
                staff, the measurement precedes the field event, or both
                ``depth_to_water_ft`` and ``level_status`` are missing.
        """
        staff_present = list(
            filter(None, (self.field_staff, self.field_staff_2, self.field_staff_3))
        )
        if self.measuring_person not in staff_present:
            raise ValueError(
                "measuring_person must match one of field_staff, "
                "field_staff_2, or field_staff_3"
            )

        measured_before_event = self.water_level_date_time < self.field_event_date_time
        if measured_before_event:
            raise ValueError(
                "water_level_date_time must be greater than or equal to "
                "field_event_date_time"
            )

        # A row with no depth reading must at least say why none was taken.
        lacks_depth_and_status = (
            self.depth_to_water_ft is None and self.level_status is None
        )
        if lacks_depth_and_status:
            raise ValueError("level_status is required when depth_to_water_ft is blank")

        return self


class WaterLevelBulkUploadSummary(BaseModel):
Expand All @@ -29,8 +241,8 @@ class WaterLevelBulkUploadRow(BaseModel):
sample_id: int
observation_id: int
measurement_date_time: str
level_status: str
data_quality: str
level_status: str | None
data_quality: str | None


class WaterLevelBulkUploadResponse(BaseModel):
Expand Down
Loading
Loading