1818import csv
1919import io
2020import json
21+ import re
2122import uuid
2223from dataclasses import dataclass
2324from datetime import datetime
2425from pathlib import Path
2526from typing import Any , BinaryIO , Iterable , List
2627
28+ from db import Thing , FieldEvent , FieldActivity , Sample , Observation , Parameter
29+ from db .engine import session_ctx
2730from pydantic import BaseModel , ConfigDict , ValidationError , field_validator
2831from sqlalchemy import select
2932from sqlalchemy .orm import Session
3033
31- from db import Thing , FieldEvent , FieldActivity , Sample , Observation , Parameter
32- from db .engine import session_ctx
33-
3434# Required CSV columns for the bulk upload
3535REQUIRED_FIELDS : List [str ] = [
3636 "field_staff" ,
4545 "data_quality" ,
4646]
4747
# Map legacy/alternate CSV header names onto the canonical column names
# used by the importer, so older export formats remain accepted.
HEADER_ALIASES: dict[str, str] = {
    "measuring_person": "sampler",
    "water_level_date_time": "measurement_date_time",
}
52+
# Allow-listed lexicon values accepted by validation (early MVP vocabulary).
VALID_LEVEL_STATUSES = {"rising", "falling", "stable"}
VALID_DATA_QUALITIES = {"provisional", "approved"}
@@ -173,7 +178,7 @@ def bulk_upload_water_levels(
173178 headers , csv_rows = _read_csv (source_file )
174179 except FileNotFoundError :
175180 msg = f"File not found: { source_file } "
176- payload = _build_payload ([], [], 0 , 0 , [msg ])
181+ payload = _build_payload ([], [], 0 , 0 , 1 , errors = [msg ])
177182 stdout = _serialize_payload (payload , pretty_json )
178183 return BulkUploadResult (exit_code = 1 , stdout = stdout , stderr = msg , payload = payload )
179184
@@ -205,7 +210,7 @@ def bulk_upload_water_levels(
205210 summary = {
206211 "total_rows_processed" : len (csv_rows ),
207212 "total_rows_imported" : len (created_rows ) if not validation_errors else 0 ,
208- "validation_errors_or_warnings" : len (validation_errors ),
213+ "validation_errors_or_warnings" : _count_rows_with_issues (validation_errors ),
209214 }
210215 payload = _build_payload (
211216 csv_rows , created_rows , ** summary , errors = validation_errors
@@ -222,6 +227,22 @@ def _serialize_payload(payload: dict[str, Any], pretty: bool) -> str:
222227 return json .dumps (payload , indent = 2 if pretty else None )
223228
224229
230+ def _count_rows_with_issues (errors : list [str ]) -> int :
231+ """
232+ Count unique row numbers represented in validation errors.
233+ Falls back to total error count when row numbers are unavailable.
234+ """
235+ row_ids : set [int ] = set ()
236+ for err in errors :
237+ match = re .match (r"^Row\s+(\d+):" , str (err ))
238+ if match :
239+ row_ids .add (int (match .group (1 )))
240+
241+ if row_ids :
242+ return len (row_ids )
243+ return len (errors )
244+
245+
225246def _build_payload (
226247 csv_rows : Iterable [dict [str , Any ]],
227248 created_rows : list [dict [str , Any ]],
@@ -261,14 +282,23 @@ def _read_csv(
261282
262283 stream = io .StringIO (text )
263284 reader = csv .DictReader (stream )
264- rows = [
265- {
266- k .strip (): (v .strip () if isinstance (v , str ) else v or "" )
267- for k , v in row .items ()
268- }
269- for row in reader
285+ rows : list [dict [str , str ]] = []
286+ for row in reader :
287+ normalized_row : dict [str , str ] = {}
288+ for k , v in row .items ():
289+ if k is None :
290+ continue
291+ key = HEADER_ALIASES .get (k .strip (), k .strip ())
292+ value = v .strip () if isinstance (v , str ) else v or ""
293+ # If both alias and canonical header are present, preserve first non-empty value.
294+ if key in normalized_row and normalized_row [key ] and not value :
295+ continue
296+ normalized_row [key ] = value
297+ rows .append (normalized_row )
298+
299+ headers = [
300+ HEADER_ALIASES .get (h .strip (), h .strip ()) for h in (reader .fieldnames or [])
270301 ]
271- headers = [h .strip () for h in reader .fieldnames or []]
272302 return headers , rows
273303
274304
0 commit comments