diff --git a/.env.example b/.env.example index dffd3dfd..d8a7547d 100644 --- a/.env.example +++ b/.env.example @@ -20,6 +20,7 @@ TRANSFER_PARALLEL=1 TRANSFER_WELL_SCREENS=True TRANSFER_SENSORS=True TRANSFER_CONTACTS=True +TRANSFER_PERMISSIONS=True TRANSFER_WATERLEVELS=True TRANSFER_WATERLEVELS_PRESSURE=True TRANSFER_WATERLEVELS_ACOUSTIC=True diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index a17335c8..2743428d 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -19,6 +19,7 @@ jobs: POSTGRES_HOST: localhost POSTGRES_PORT: 5432 POSTGRES_USER: postgres + PYGEOAPI_POSTGRES_USER: postgres POSTGRES_PASSWORD: postgres POSTGRES_DB: ocotilloapi_test DB_DRIVER: postgres @@ -98,6 +99,7 @@ jobs: POSTGRES_HOST: localhost POSTGRES_PORT: 5432 POSTGRES_USER: postgres + PYGEOAPI_POSTGRES_USER: postgres POSTGRES_PASSWORD: postgres POSTGRES_DB: ocotilloapi_test DB_DRIVER: postgres diff --git a/transfers/README.md b/transfers/README.md index 48a5743a..08e03234 100644 --- a/transfers/README.md +++ b/transfers/README.md @@ -25,3 +25,44 @@ Avoid ORM-heavy per-row object construction for bulk workloads. - Logs: `transfers/logs/` - Metrics: `transfers/metrics/` + +## Transfer Auditing CLI + +Use the transfer-auditing CLI to compare each source CSV against the current destination Postgres table. + +### Run + +```bash +source .venv/bin/activate +set -a; source .env; set +a +oco transfer-results +``` + +### Useful options + +```bash +oco transfer-results --sample-limit 5 +oco transfer-results --summary-path transfers/metrics/transfer_results_summary.md +``` + +- `--sample-limit`: limits sampled key details retained internally per transfer result. +- `--summary-path`: path to the markdown report. + +If `oco` is not on your PATH, use: + +```bash +python -m cli.cli transfer-results --sample-limit 5 +``` + +### Output + +Default report file: + +- `transfers/metrics/transfer_results_summary.md` + +Summary columns: + +- `Source Rows`: raw row count in the source CSV. +- `Agreed Rows`: rows considered in-scope by transfer rules/toggles. +- `Dest Rows`: current row count in destination table/model. +- `Missing Agreed`: `Agreed Rows - Dest Rows` (positive means destination is short vs agreed source rows). diff --git a/transfers/transfer.py b/transfers/transfer.py index 83b8df3b..ff37d4af 100644 --- a/transfers/transfer.py +++ b/transfers/transfer.py @@ -106,6 +106,7 @@ class TransferOptions: transfer_screens: bool transfer_sensors: bool transfer_contacts: bool + transfer_permissions: bool transfer_waterlevels: bool transfer_pressure: bool transfer_acoustic: bool @@ -147,6 +148,7 @@ def load_transfer_options() -> TransferOptions: transfer_screens=get_bool_env("TRANSFER_WELL_SCREENS", True), transfer_sensors=get_bool_env("TRANSFER_SENSORS", True), transfer_contacts=get_bool_env("TRANSFER_CONTACTS", True), + transfer_permissions=get_bool_env("TRANSFER_PERMISSIONS", True), transfer_waterlevels=get_bool_env("TRANSFER_WATERLEVELS", True), transfer_pressure=get_bool_env("TRANSFER_WATERLEVELS_PRESSURE", True), transfer_acoustic=get_bool_env("TRANSFER_WATERLEVELS_ACOUSTIC", True), @@ -570,9 +572,6 @@ def _transfer_parallel( ) futures[future] = "StratigraphyNew" - future = executor.submit(_execute_permissions_with_timing, "Permissions") - futures[future] = "Permissions" - # Collect results for future in as_completed(futures): name = futures[future] @@ -632,6 +631,17 @@ def _transfer_parallel( if "WeatherPhotos" in results_map and results_map["WeatherPhotos"]: metrics.weather_photos_metrics(*results_map["WeatherPhotos"]) + if opts.transfer_permissions: + # Permissions require contact associations; run after group 1 completes. + try: + result_name, result, elapsed = _execute_permissions_with_timing( + "Permissions" + ) + results_map[result_name] = result + logger.info(f"Task {result_name} completed in {elapsed:.2f}s") + except Exception as e: + logger.critical(f"Task Permissions failed: {e}") + if opts.transfer_major_chemistry: message("TRANSFERRING MAJOR CHEMISTRY") results = _execute_transfer(MajorChemistryTransferer, flags=flags) diff --git a/transfers/transfer_results_builder.py b/transfers/transfer_results_builder.py index 296529cd..42e7c49b 100644 --- a/transfers/transfer_results_builder.py +++ b/transfers/transfer_results_builder.py @@ -7,7 +7,7 @@ import pandas as pd from sqlalchemy import select, func -from db import Deployment, Sensor, Thing +from db import Deployment, PermissionHistory, Sensor, Thing, ThingContactAssociation from db.engine import session_ctx from transfers.sensor_transfer import ( EQUIPMENT_TO_SENSOR_TYPE_MAP, @@ -165,6 +165,76 @@ def _equipment_destination_series(session) -> pd.Series: return pointid + "|" + serial + "|" + installed + "|" + removed +def _permissions_source_series(session) -> pd.Series: + wdf = read_csv("WellData", dtype={"OSEWelltagID": str}) + wdf = replace_nans(wdf) + if "PointID" not in wdf.columns: + return pd.Series([], dtype=object) + + eligible_rows = ( + session.query(Thing.name) + .join(ThingContactAssociation, ThingContactAssociation.thing_id == Thing.id) + .filter(Thing.thing_type == "water well") + .filter(Thing.name.is_not(None)) + .distinct() + .all() + ) + eligible_pointids = {name for (name,) in eligible_rows if name} + if not eligible_pointids: + return pd.Series([], dtype=object) + + rows: list[str] = [] + for row in wdf.itertuples(index=False): + pointid = getattr(row, "PointID", None) + if pointid not in eligible_pointids: + continue + + sample_ok = getattr(row, "SampleOK", None) + if sample_ok is not None: + rows.append( + f"{_normalize_key(pointid)}|Water Chemistry Sample|{bool(sample_ok)}" + ) + + monitor_ok = getattr(row, "MonitorOK", None) + if monitor_ok is not None: + rows.append( + f"{_normalize_key(pointid)}|Water Level Sample|{bool(monitor_ok)}" + ) + + if not rows: + return pd.Series([], dtype=object) + return pd.Series(rows, dtype=object) + + +def _permissions_destination_series(session) -> pd.Series: + sql = ( + select( + Thing.name.label("point_id"), + PermissionHistory.permission_type.label("permission_type"), + PermissionHistory.permission_allowed.label("permission_allowed"), + ) + .select_from(PermissionHistory) + .join(Thing, Thing.id == PermissionHistory.target_id) + .where(PermissionHistory.target_table == "thing") + .where( + PermissionHistory.permission_type.in_( + ("Water Chemistry Sample", "Water Level Sample") + ) + ) + .where(Thing.name.is_not(None)) + ) + rows = session.execute(sql).all() + if not rows: + return pd.Series([], dtype=object) + return pd.Series( + [ + f"{_normalize_key(r.point_id)}|{r.permission_type}|{bool(r.permission_allowed)}" + for r in rows + ], + dtype=object, + ) + + class TransferResultsBuilder: """Compare transfer input CSV keys to destination database keys per transfer.""" @@ -183,6 +253,9 @@ def build(self) -> TransferComparisonResults: ) def _build_one(self, spec: TransferComparisonSpec) -> TransferResult: + if spec.transfer_name == "Permissions": + return self._build_permissions(spec) + source_df = read_csv(spec.source_csv) if spec.source_filter: source_df = spec.source_filter(source_df) @@ -277,6 +350,78 @@ def _build_one(self, spec: TransferComparisonSpec) -> TransferResult: extra_in_destination_sample=extra[: self.sample_limit], ) + def _build_permissions(self, spec: TransferComparisonSpec) -> TransferResult: + source_df = read_csv(spec.source_csv, dtype={"OSEWelltagID": str}) + source_row_count = len(source_df) + enabled = self._is_enabled(spec) + + with session_ctx() as session: + source_series = ( + _permissions_source_series(session) + if enabled + else pd.Series([], dtype=object) + ) + source_keys = set(source_series.unique().tolist()) + source_keyed_row_count = int(source_series.shape[0]) + source_duplicate_key_row_count = source_keyed_row_count - len(source_keys) + agreed_transfer_row_count = source_keyed_row_count + + destination_series = _permissions_destination_series(session) + destination_row_count = int( + session.execute( + select(func.count()) + .select_from(PermissionHistory) + .where(PermissionHistory.target_table == "thing") + .where( + PermissionHistory.permission_type.in_( + ("Water Chemistry Sample", "Water Level Sample") + ) + ) + ).scalar_one() + ) + + if destination_series.empty: + destination_series = pd.Series([], dtype=object) + else: + destination_series = destination_series.astype(str) + + destination_keys = set(destination_series.unique().tolist()) + destination_keyed_row_count = int(destination_series.shape[0]) + destination_duplicate_key_row_count = destination_keyed_row_count - len( + destination_keys + ) + missing = sorted(source_keys - destination_keys) + extra = sorted(destination_keys - source_keys) + transferred_agreed_row_count = int(source_series.isin(destination_keys).sum()) + missing_agreed_row_count = max( + agreed_transfer_row_count - transferred_agreed_row_count, + 0, + ) + + return spec.result_cls( + transfer_name=spec.transfer_name, + source_csv=spec.source_csv, + source_key_column=spec.source_key_column, + destination_model="PermissionHistory", + destination_key_column=spec.destination_key_column, + source_row_count=source_row_count, + agreed_transfer_row_count=agreed_transfer_row_count, + source_keyed_row_count=source_keyed_row_count, + source_key_count=len(source_keys), + source_duplicate_key_row_count=source_duplicate_key_row_count, + destination_row_count=destination_row_count, + destination_keyed_row_count=destination_keyed_row_count, + destination_key_count=len(destination_keys), + destination_duplicate_key_row_count=destination_duplicate_key_row_count, + matched_key_count=len(source_keys & destination_keys), + missing_in_destination_count=len(missing), + extra_in_destination_count=len(extra), + transferred_agreed_row_count=transferred_agreed_row_count, + missing_agreed_row_count=missing_agreed_row_count, + missing_in_destination_sample=missing[: self.sample_limit], + extra_in_destination_sample=extra[: self.sample_limit], + ) + def _is_enabled(self, spec: TransferComparisonSpec) -> bool: if not spec.option_field: return True diff --git a/transfers/transfer_results_specs.py b/transfers/transfer_results_specs.py index c117e7b3..5a23f40b 100644 --- a/transfers/transfer_results_specs.py +++ b/transfers/transfer_results_specs.py @@ -28,6 +28,7 @@ NMA_view_NGWMN_WaterLevels, NMA_view_NGWMN_WellConstruction, Observation, + PermissionHistory, Sensor, Thing, WellScreen, @@ -58,6 +59,7 @@ OtherSiteTypesTransferResult, OutfallWastewaterReturnFlowTransferResult, OwnersDataTransferResult, + PermissionsTransferResult, PerennialStreamsTransferResult, PressureDailyTransferResult, ProjectsTransferResult, @@ -516,6 +518,15 @@ def _record_new_contact( destination_where=lambda m: m.nma_pk_owners.is_not(None), option_field="transfer_contacts", ), + TransferComparisonSpec( + "Permissions", + PermissionsTransferResult, + "WellData", + "PointID|PermissionType|PermissionAllowed", + PermissionHistory, + "thing.name|permission_type|permission_allowed", + option_field="transfer_permissions", + ), TransferComparisonSpec( "WaterLevels", WaterLevelsTransferResult, diff --git a/transfers/transfer_results_types.py b/transfers/transfer_results_types.py index 1163a2c7..5759b7c9 100644 --- a/transfers/transfer_results_types.py +++ b/transfers/transfer_results_types.py @@ -38,6 +38,7 @@ class TransferComparisonResults: "WellData", "WellScreens", "OwnersData", + "Permissions", "WaterLevels", "Equipment", "Projects",