Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ TRANSFER_PARALLEL=1
TRANSFER_WELL_SCREENS=True
TRANSFER_SENSORS=True
TRANSFER_CONTACTS=True
TRANSFER_PERMISSIONS=True
TRANSFER_WATERLEVELS=True
TRANSFER_WATERLEVELS_PRESSURE=True
TRANSFER_WATERLEVELS_ACOUSTIC=True
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ jobs:
POSTGRES_HOST: localhost
POSTGRES_PORT: 5432
POSTGRES_USER: postgres
PYGEOAPI_POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: ocotilloapi_test
DB_DRIVER: postgres
Expand Down Expand Up @@ -98,6 +99,7 @@ jobs:
POSTGRES_HOST: localhost
POSTGRES_PORT: 5432
POSTGRES_USER: postgres
PYGEOAPI_POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: ocotilloapi_test
DB_DRIVER: postgres
Expand Down
41 changes: 41 additions & 0 deletions transfers/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,44 @@ Avoid ORM-heavy per-row object construction for bulk workloads.

- Logs: `transfers/logs/`
- Metrics: `transfers/metrics/`

## Transfer Auditing CLI

Use the transfer-auditing CLI to compare each source CSV against the current destination Postgres table.

### Run

```bash
source .venv/bin/activate
set -a; source .env; set +a
oco transfer-results
```

### Useful options

```bash
oco transfer-results --sample-limit 5
oco transfer-results --summary-path transfers/metrics/transfer_results_summary.md
```

- `--sample-limit`: limits sampled key details retained internally per transfer result.
- `--summary-path`: path to the markdown report.

If `oco` is not on your PATH, use:

```bash
python -m cli.cli transfer-results --sample-limit 5
```

### Output

Default report file:

- `transfers/metrics/transfer_results_summary.md`

Summary columns:

- `Source Rows`: raw row count in the source CSV.
- `Agreed Rows`: rows considered in-scope by transfer rules/toggles.
- `Dest Rows`: current row count in destination table/model.
- `Missing Agreed`: agreed source rows whose keys were not found in the destination (positive means the destination is short relative to the agreed source rows).
16 changes: 13 additions & 3 deletions transfers/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ class TransferOptions:
transfer_screens: bool
transfer_sensors: bool
transfer_contacts: bool
transfer_permissions: bool
transfer_waterlevels: bool
transfer_pressure: bool
transfer_acoustic: bool
Expand Down Expand Up @@ -147,6 +148,7 @@ def load_transfer_options() -> TransferOptions:
transfer_screens=get_bool_env("TRANSFER_WELL_SCREENS", True),
transfer_sensors=get_bool_env("TRANSFER_SENSORS", True),
transfer_contacts=get_bool_env("TRANSFER_CONTACTS", True),
transfer_permissions=get_bool_env("TRANSFER_PERMISSIONS", True),
transfer_waterlevels=get_bool_env("TRANSFER_WATERLEVELS", True),
transfer_pressure=get_bool_env("TRANSFER_WATERLEVELS_PRESSURE", True),
transfer_acoustic=get_bool_env("TRANSFER_WATERLEVELS_ACOUSTIC", True),
Expand Down Expand Up @@ -570,9 +572,6 @@ def _transfer_parallel(
)
futures[future] = "StratigraphyNew"

future = executor.submit(_execute_permissions_with_timing, "Permissions")
futures[future] = "Permissions"

# Collect results
for future in as_completed(futures):
name = futures[future]
Expand Down Expand Up @@ -632,6 +631,17 @@ def _transfer_parallel(
if "WeatherPhotos" in results_map and results_map["WeatherPhotos"]:
metrics.weather_photos_metrics(*results_map["WeatherPhotos"])

if opts.transfer_permissions:
# Permissions require contact associations; run after group 1 completes.
try:
result_name, result, elapsed = _execute_permissions_with_timing(
"Permissions"
)
results_map[result_name] = result
logger.info(f"Task {result_name} completed in {elapsed:.2f}s")
except Exception as e:
logger.critical(f"Task Permissions failed: {e}")
Comment on lines +636 to +643
Copy link

Copilot AI Feb 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The sequential permissions transfer failure log only includes str(e) and drops the stack trace, while other parallel-transfer error paths log traceback.format_exc(). Logging the full traceback here would make diagnosing transfer failures significantly easier.

Copilot uses AI. Check for mistakes.

if opts.transfer_major_chemistry:
message("TRANSFERRING MAJOR CHEMISTRY")
results = _execute_transfer(MajorChemistryTransferer, flags=flags)
Expand Down
147 changes: 146 additions & 1 deletion transfers/transfer_results_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import pandas as pd
from sqlalchemy import select, func

from db import Deployment, Sensor, Thing
from db import Deployment, PermissionHistory, Sensor, Thing, ThingContactAssociation
from db.engine import session_ctx
from transfers.sensor_transfer import (
EQUIPMENT_TO_SENSOR_TYPE_MAP,
Expand Down Expand Up @@ -165,6 +165,76 @@ def _equipment_destination_series(session) -> pd.Series:
return pointid + "|" + serial + "|" + installed + "|" + removed


def _parse_permission_flag(value) -> bool | None:
    """Parse a CSV permission flag into a bool, or ``None`` when absent.

    ``bool()`` on a string treats any non-empty text as True, so values like
    ``"False"`` or ``"0"`` would be misread as allowed.  Parse the common
    textual forms explicitly; fall back to truthiness for numeric values.
    """
    if value is None:
        return None
    if isinstance(value, bool):
        return value
    if isinstance(value, str):
        text = value.strip().lower()
        if not text:
            return None
        if text in ("true", "t", "yes", "y", "1"):
            return True
        if text in ("false", "f", "no", "n", "0"):
            return False
        # Unrecognized text: keep the original non-empty-string-is-True
        # behavior rather than silently dropping the row.
        return True
    return bool(value)


def _permissions_source_series(session) -> pd.Series:
    """Build the source-side permission key series from the WellData CSV.

    Keys have the form ``<normalized pointid>|<permission type>|<True/False>``.
    Only wells that exist as ``water well`` Things with at least one contact
    association are considered, mirroring the transfer's eligibility rules.
    """
    wdf = read_csv("WellData", dtype={"OSEWelltagID": str})
    wdf = replace_nans(wdf)
    if "PointID" not in wdf.columns:
        return pd.Series([], dtype=object)

    eligible_rows = (
        session.query(Thing.name)
        .join(ThingContactAssociation, ThingContactAssociation.thing_id == Thing.id)
        .filter(Thing.thing_type == "water well")
        .filter(Thing.name.is_not(None))
        .distinct()
        .all()
    )
    # Normalize the DB-side names so the membership test matches the
    # normalized CSV keys; comparing raw values can drop eligible wells when
    # casing or whitespace differs between CSV and DB.
    eligible_pointids = {_normalize_key(name) for (name,) in eligible_rows if name}
    eligible_pointids.discard("")
    if not eligible_pointids:
        return pd.Series([], dtype=object)

    # The transfer effectively uses the first matching WellData row per well;
    # de-dupe by PointID (stable, keeps first) so duplicate CSV rows don't
    # inflate source counts or introduce duplicate keys.
    wdf = wdf.drop_duplicates(subset="PointID", keep="first")

    rows: list[str] = []
    for row in wdf.itertuples(index=False):
        pointid = getattr(row, "PointID", None)
        key = _normalize_key(pointid)
        if not key or key not in eligible_pointids:
            continue

        sample_ok = _parse_permission_flag(getattr(row, "SampleOK", None))
        if sample_ok is not None:
            rows.append(f"{key}|Water Chemistry Sample|{sample_ok}")

        monitor_ok = _parse_permission_flag(getattr(row, "MonitorOK", None))
        if monitor_ok is not None:
            rows.append(f"{key}|Water Level Sample|{monitor_ok}")

    if not rows:
        return pd.Series([], dtype=object)
    return pd.Series(rows, dtype=object)


def _permissions_destination_series(session) -> pd.Series:
    """Build the destination-side permission key series from PermissionHistory.

    Keys mirror ``_permissions_source_series``:
    ``<normalized pointid>|<permission type>|<True/False>``.  The query is
    scoped to named water-well Things so both sides of the audit compare the
    same population; without the thing-type filter, permissions on non-well
    things would surface as false "extras".
    """
    sql = (
        select(
            Thing.name.label("point_id"),
            PermissionHistory.permission_type.label("permission_type"),
            PermissionHistory.permission_allowed.label("permission_allowed"),
        )
        .select_from(PermissionHistory)
        .join(Thing, Thing.id == PermissionHistory.target_id)
        .where(PermissionHistory.target_table == "thing")
        .where(
            PermissionHistory.permission_type.in_(
                ("Water Chemistry Sample", "Water Level Sample")
            )
        )
        # Match the source-side eligibility filter (water wells only).
        .where(Thing.thing_type == "water well")
        .where(Thing.name.is_not(None))
    )
    rows = session.execute(sql).all()
    if not rows:
        return pd.Series([], dtype=object)
    return pd.Series(
        [
            f"{_normalize_key(r.point_id)}|{r.permission_type}|{bool(r.permission_allowed)}"
            for r in rows
        ],
        dtype=object,
    )


class TransferResultsBuilder:
"""Compare transfer input CSV keys to destination database keys per transfer."""

Expand All @@ -183,6 +253,9 @@ def build(self) -> TransferComparisonResults:
)

def _build_one(self, spec: TransferComparisonSpec) -> TransferResult:
if spec.transfer_name == "Permissions":
return self._build_permissions(spec)

source_df = read_csv(spec.source_csv)
if spec.source_filter:
source_df = spec.source_filter(source_df)
Expand Down Expand Up @@ -277,6 +350,78 @@ def _build_one(self, spec: TransferComparisonSpec) -> TransferResult:
extra_in_destination_sample=extra[: self.sample_limit],
)

def _build_permissions(self, spec: TransferComparisonSpec) -> TransferResult:
    """Build the permissions TransferResult using composite comparison keys.

    Permissions have no single source key column, so keys are synthesized as
    ``pointid|permission_type|allowed`` on both sides and compared as sets.
    """
    source_df = read_csv(spec.source_csv, dtype={"OSEWelltagID": str})
    source_row_count = len(source_df)
    enabled = self._is_enabled(spec)

    with session_ctx() as session:
        source_series = (
            _permissions_source_series(session)
            if enabled
            else pd.Series([], dtype=object)
        )
        source_keys = set(source_series.unique().tolist())
        source_keyed_row_count = int(source_series.shape[0])
        source_duplicate_key_row_count = source_keyed_row_count - len(source_keys)
        agreed_transfer_row_count = source_keyed_row_count

        destination_series = _permissions_destination_series(session)
        # Count with the same constraints used to build the key series (inner
        # join to a named Thing) so the reported destination row count stays
        # consistent with the keys; orphaned PermissionHistory rows or
        # unnamed Things would otherwise skew the count.
        destination_row_count = int(
            session.execute(
                select(func.count())
                .select_from(PermissionHistory)
                .join(Thing, Thing.id == PermissionHistory.target_id)
                .where(PermissionHistory.target_table == "thing")
                .where(
                    PermissionHistory.permission_type.in_(
                        ("Water Chemistry Sample", "Water Level Sample")
                    )
                )
                .where(Thing.name.is_not(None))
            ).scalar_one()
        )

    if destination_series.empty:
        destination_series = pd.Series([], dtype=object)
    else:
        destination_series = destination_series.astype(str)

    destination_keys = set(destination_series.unique().tolist())
    destination_keyed_row_count = int(destination_series.shape[0])
    destination_duplicate_key_row_count = destination_keyed_row_count - len(
        destination_keys
    )
    missing = sorted(source_keys - destination_keys)
    extra = sorted(destination_keys - source_keys)
    transferred_agreed_row_count = int(source_series.isin(destination_keys).sum())
    # Clamp at zero: the destination may legitimately hold more rows than the
    # agreed source set.
    missing_agreed_row_count = max(
        agreed_transfer_row_count - transferred_agreed_row_count,
        0,
    )

    return spec.result_cls(
        transfer_name=spec.transfer_name,
        source_csv=spec.source_csv,
        source_key_column=spec.source_key_column,
        destination_model="PermissionHistory",
        destination_key_column=spec.destination_key_column,
        source_row_count=source_row_count,
        agreed_transfer_row_count=agreed_transfer_row_count,
        source_keyed_row_count=source_keyed_row_count,
        source_key_count=len(source_keys),
        source_duplicate_key_row_count=source_duplicate_key_row_count,
        destination_row_count=destination_row_count,
        destination_keyed_row_count=destination_keyed_row_count,
        destination_key_count=len(destination_keys),
        destination_duplicate_key_row_count=destination_duplicate_key_row_count,
        matched_key_count=len(source_keys & destination_keys),
        missing_in_destination_count=len(missing),
        extra_in_destination_count=len(extra),
        transferred_agreed_row_count=transferred_agreed_row_count,
        missing_agreed_row_count=missing_agreed_row_count,
        missing_in_destination_sample=missing[: self.sample_limit],
        extra_in_destination_sample=extra[: self.sample_limit],
    )

def _is_enabled(self, spec: TransferComparisonSpec) -> bool:
if not spec.option_field:
return True
Expand Down
11 changes: 11 additions & 0 deletions transfers/transfer_results_specs.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
NMA_view_NGWMN_WaterLevels,
NMA_view_NGWMN_WellConstruction,
Observation,
PermissionHistory,
Sensor,
Thing,
WellScreen,
Expand Down Expand Up @@ -58,6 +59,7 @@
OtherSiteTypesTransferResult,
OutfallWastewaterReturnFlowTransferResult,
OwnersDataTransferResult,
PermissionsTransferResult,
PerennialStreamsTransferResult,
PressureDailyTransferResult,
ProjectsTransferResult,
Expand Down Expand Up @@ -516,6 +518,15 @@ def _record_new_contact(
destination_where=lambda m: m.nma_pk_owners.is_not(None),
option_field="transfer_contacts",
),
TransferComparisonSpec(
"Permissions",
PermissionsTransferResult,
"WellData",
"PointID|PermissionType|PermissionAllowed",
PermissionHistory,
"thing.name|permission_type|permission_allowed",
option_field="transfer_permissions",
),
TransferComparisonSpec(
"WaterLevels",
WaterLevelsTransferResult,
Expand Down
1 change: 1 addition & 0 deletions transfers/transfer_results_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class TransferComparisonResults:
"WellData",
"WellScreens",
"OwnersData",
"Permissions",
"WaterLevels",
"Equipment",
"Projects",
Expand Down