Skip to content

Commit e4d94ae

Browse files
authored
Merge pull request #621 from DataIntegrationGroup/kas-water-level-import
kas-water-level-import
2 parents 78e899d + 256af68 commit e4d94ae

12 files changed

Lines changed: 1830 additions & 315 deletions

cli/cli.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -666,9 +666,16 @@ def water_levels_bulk_upload(
666666
payload = result.payload if isinstance(result.payload, dict) else {}
667667
summary = payload.get("summary", {})
668668
validation_errors = payload.get("validation_errors", [])
669+
rows_with_issues = summary.get("validation_errors_or_warnings", 0)
669670

670-
if result.exit_code == 0:
671+
if result.exit_code == 0 and not rows_with_issues:
671672
typer.secho("[WATER LEVEL IMPORT] SUCCESS", fg=colors["ok"], bold=True)
673+
elif result.exit_code == 0:
674+
typer.secho(
675+
"[WATER LEVEL IMPORT] COMPLETED WITH ISSUES",
676+
fg=colors["issue"],
677+
bold=True,
678+
)
672679
else:
673680
typer.secho(
674681
"[WATER LEVEL IMPORT] COMPLETED WITH ISSUES",
@@ -709,7 +716,6 @@ def water_levels_bulk_upload(
709716
if summary:
710717
processed = summary.get("total_rows_processed", 0)
711718
imported = summary.get("total_rows_imported", 0)
712-
rows_with_issues = summary.get("validation_errors_or_warnings", 0)
713719
typer.secho("SUMMARY", fg=colors["accent"], bold=True)
714720
label_width = 16
715721
value_width = 8

schemas/water_level_csv.py

Lines changed: 215 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,219 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515
# ===============================================================================
16-
from pydantic import BaseModel
16+
from __future__ import annotations
17+
18+
from datetime import datetime, timezone
19+
from typing import Annotated
20+
21+
from core.enums import DataQuality, GroundwaterLevelReason, SampleMethod
22+
from pydantic import (
23+
AliasChoices,
24+
BaseModel,
25+
ConfigDict,
26+
Field,
27+
field_validator,
28+
model_validator,
29+
)
30+
from pydantic.functional_validators import BeforeValidator
31+
32+
from services.util import convert_dt_tz_naive_to_tz_aware
33+
34+
# Canonical names of the columns treated as required for a water-level row.
WATER_LEVEL_REQUIRED_FIELDS = [
    "well_name_point_id",
    "field_event_date_time",
    "field_staff",
    "water_level_date_time",
    "measuring_person",
    "sample_method",
]

# Alternate CSV header -> canonical field name.
WATER_LEVEL_HEADER_ALIASES = {
    "measurement_date_time": "water_level_date_time",
    "sampler": "measuring_person",
    "mp_height_ft": "mp_height",
}

# Headers flagged as ignorable; their own names mark them as "not saved".
WATER_LEVEL_IGNORED_FIELDS = {"hold(not saved)", "cut(not saved)"}

# Short-hand sample method (lower case) -> full sample method label.
SAMPLE_METHOD_ALIASES = {
    "electric tape": "Electric tape measurement (E-probe)",
    "steel tape": "Steel-tape measurement",
}
# Lower-cased full label -> full label, for case-insensitive lookups.
SAMPLE_METHOD_CANONICAL = {
    label.lower(): label for label in SAMPLE_METHOD_ALIASES.values()
}

# Shared by the "obstructed" and "obstruction" short-hands below.
_OBSTRUCTED_REASON = "Obstruction was encountered in the well (no level recorded)"

# Short-hand level status (lower case) -> full groundwater level reason label.
GROUNDWATER_LEVEL_REASON_ALIASES = {
    "dry": "Site was dry",
    "obstructed": _OBSTRUCTED_REASON,
    "obstruction": _OBSTRUCTED_REASON,
    "flowing": (
        "Site was flowing. Water level or head couldn't be measured "
        "w/out additional equipment."
    ),
    "flowing recently": "Site was flowing recently.",
    "pumped": "Site was being pumped",
    "pumped recently": "Site was pumped recently",
    "not affected": "Water level not affected",
    "other": "Other conditions exist that would affect the level (remarks)",
}
75+
76+
77+
def empty_str_to_none(value):
    """Turn a blank or whitespace-only string into ``None``.

    Any other value, including non-string objects and ``None`` itself, is
    returned unchanged.
    """
    is_blank_text = isinstance(value, str) and not value.strip()
    return None if is_blank_text else value
81+
82+
83+
# Optional CSV cell types. ``empty_str_to_none`` is attached as a
# ``BeforeValidator`` so blank or whitespace-only cells reach the field's own
# type validation as ``None`` instead of as an empty string.
OptionalText = Annotated[str | None, BeforeValidator(empty_str_to_none)]
OptionalFloat = Annotated[float | None, BeforeValidator(empty_str_to_none)]
85+
86+
87+
def _normalize_datetime_to_utc(value: datetime | str) -> datetime:
    """Return ``value`` as a timezone-aware datetime expressed in UTC.

    Args:
        value: A ``datetime`` or an ISO 8601 string. Strings may carry
            surrounding whitespace and may end in ``Z``/``z`` for UTC.

    Returns:
        The same instant converted to ``timezone.utc``.

    Raises:
        ValueError: If ``value`` is neither a ``datetime`` nor a string, or if
            the string cannot be parsed by ``datetime.fromisoformat``.
    """
    if isinstance(value, str):
        # This runs as a "before" validator, so the model's
        # ``str_strip_whitespace`` setting has not touched the raw cell yet.
        text = value.strip()
        # ``datetime.fromisoformat`` only accepts a trailing "Z" from Python
        # 3.11 onwards; spell the offset out so older interpreters parse it too.
        if text[-1:] in ("Z", "z"):
            text = f"{text[:-1]}+00:00"
        value = datetime.fromisoformat(text)
    elif not isinstance(value, datetime):
        raise ValueError("value must be a datetime or ISO format string")

    if value.tzinfo is None:
        # NOTE(review): presumably attaches the America/Denver zone to naive
        # values; confirm against services.util.
        value = convert_dt_tz_naive_to_tz_aware(value, "America/Denver")

    return value.astimezone(timezone.utc)
97+
98+
99+
def _canonicalize_enum_value(
    value: str | None, enum_cls, field_name: str
) -> str | None:
    """Resolve ``value`` to the exact spelling of a member value of ``enum_cls``.

    Matching ignores case and surrounding whitespace. ``None`` passes through
    untouched.

    Raises:
        ValueError: If no member of ``enum_cls`` has a matching value; the
            message names ``field_name`` and echoes the original ``value``.
    """
    if value is None:
        return None

    wanted = value.strip().lower()
    canonical = next(
        (member.value for member in enum_cls if member.value.lower() == wanted),
        None,
    )
    if canonical is None:
        raise ValueError(f"Unknown {field_name}: {value}")
    return canonical
111+
112+
113+
class WaterLevelCsvRow(BaseModel):
    """Schema for a single row of a water-level CSV.

    Unknown columns are ignored and string values are stripped of surrounding
    whitespace. Several fields accept an alternate column name through
    ``validation_alias``; blank optional cells are turned into ``None`` by the
    ``OptionalText`` / ``OptionalFloat`` types.
    """

    model_config = ConfigDict(extra="ignore", str_strip_whitespace=True)

    well_name_point_id: str
    field_event_date_time: datetime
    field_staff: str
    field_staff_2: OptionalText = None
    field_staff_3: OptionalText = None
    # Also accepted under the column name "measurement_date_time".
    water_level_date_time: datetime = Field(
        validation_alias=AliasChoices(
            "water_level_date_time",
            "measurement_date_time",
        )
    )
    # Also accepted under the column name "sampler".
    measuring_person: str = Field(
        validation_alias=AliasChoices("measuring_person", "sampler")
    )
    sample_method: str
    # Also accepted under the column name "mp_height_ft".
    mp_height: OptionalFloat = Field(
        default=None,
        validation_alias=AliasChoices("mp_height", "mp_height_ft"),
    )
    level_status: OptionalText = None
    depth_to_water_ft: OptionalFloat = None
    data_quality: OptionalText = None
    water_level_notes: OptionalText = None

    @property
    def measurement_date_time(self) -> datetime:
        """Alias for ``water_level_date_time``."""
        return self.water_level_date_time

    @property
    def sampler(self) -> str:
        """Alias for ``measuring_person``."""
        return self.measuring_person

    @classmethod
    def required_fields(cls) -> list[str]:
        """Return a fresh copy of ``WATER_LEVEL_REQUIRED_FIELDS``."""
        return list(WATER_LEVEL_REQUIRED_FIELDS)

    @classmethod
    def header_aliases(cls) -> dict[str, str]:
        """Return a fresh copy of ``WATER_LEVEL_HEADER_ALIASES``."""
        return dict(WATER_LEVEL_HEADER_ALIASES)

    @classmethod
    def ignored_fields(cls) -> set[str]:
        """Return a fresh copy of ``WATER_LEVEL_IGNORED_FIELDS``."""
        return set(WATER_LEVEL_IGNORED_FIELDS)

    @staticmethod
    def canonicalize_sample_method(value: str) -> str:
        """Expand a sample-method short-hand to its full label.

        Lookup is case-insensitive. A value that is neither a known alias nor
        a known full label is returned stripped but otherwise unchanged.
        """
        normalized = value.strip().lower()
        if normalized in SAMPLE_METHOD_ALIASES:
            return SAMPLE_METHOD_ALIASES[normalized]
        if normalized in SAMPLE_METHOD_CANONICAL:
            return SAMPLE_METHOD_CANONICAL[normalized]
        return value.strip()

    @field_validator("sample_method")
    @classmethod
    def normalize_sample_method(cls, value: str) -> str:
        """Expand aliases, then require the result to be a ``SampleMethod`` value."""
        return _canonicalize_enum_value(
            cls.canonicalize_sample_method(value),
            SampleMethod,
            "sample_method",
        )

    @field_validator(
        "field_event_date_time",
        "water_level_date_time",
        mode="before",
    )
    @classmethod
    def normalize_datetime_field(cls, value: datetime | str) -> datetime:
        """Convert both timestamp fields to UTC before pydantic's own parsing."""
        return _normalize_datetime_to_utc(value)

    @field_validator("depth_to_water_ft")
    @classmethod
    def validate_non_negative_depth_to_water(cls, value: float | None) -> float | None:
        """Reject negative or non-finite depths; ``None`` is allowed.

        Raises:
            ValueError: If the depth is below zero, NaN, or infinite.
        """
        if value is None:
            return value
        if value < 0:
            raise ValueError("depth_to_water_ft must be greater than or equal to 0")
        # NaN compares False against everything and +inf satisfies ``>= 0``, so
        # the check above lets both through; neither is a usable depth.
        if not value < float("inf"):
            raise ValueError("depth_to_water_ft must be a finite number")
        return value

    @field_validator("level_status")
    @classmethod
    def normalize_level_status(cls, value: str | None) -> str | None:
        """Expand aliases, then require a ``GroundwaterLevelReason`` value."""
        if value is not None:
            value = GROUNDWATER_LEVEL_REASON_ALIASES.get(value.strip().lower(), value)
        return _canonicalize_enum_value(value, GroundwaterLevelReason, "level_status")

    @field_validator("data_quality")
    @classmethod
    def normalize_data_quality(cls, value: str | None) -> str | None:
        """Require ``data_quality`` to be a ``DataQuality`` value, or ``None``."""
        return _canonicalize_enum_value(value, DataQuality, "data_quality")

    @model_validator(mode="after")
    def validate_row_constraints(self) -> WaterLevelCsvRow:
        """Enforce the rules that span more than one column.

        Raises:
            ValueError: If ``measuring_person`` is not one of the field staff
                columns, if the water level timestamp precedes the field event
                timestamp, or if both ``depth_to_water_ft`` and
                ``level_status`` are blank.
        """
        field_staff = [
            staff
            for staff in (self.field_staff, self.field_staff_2, self.field_staff_3)
            if staff
        ]
        if self.measuring_person not in field_staff:
            raise ValueError(
                "measuring_person must match one of field_staff, "
                "field_staff_2, or field_staff_3"
            )

        if self.water_level_date_time < self.field_event_date_time:
            raise ValueError(
                "water_level_date_time must be greater than or equal to "
                "field_event_date_time"
            )

        # A row without a measured depth must say why via level_status.
        if self.depth_to_water_ft is None and self.level_status is None:
            raise ValueError("level_status is required when depth_to_water_ft is blank")

        return self
17229

18230

19231
class WaterLevelBulkUploadSummary(BaseModel):
@@ -29,8 +241,8 @@ class WaterLevelBulkUploadRow(BaseModel):
29241
sample_id: int
30242
observation_id: int
31243
measurement_date_time: str
32-
level_status: str
33-
data_quality: str
244+
level_status: str | None
245+
data_quality: str | None
34246

35247

36248
class WaterLevelBulkUploadResponse(BaseModel):

0 commit comments

Comments
 (0)