💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 7a51c38596
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you:
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review"
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
```python
.where(PermissionHistory.target_table == "thing")
.where(
    PermissionHistory.permission_type.in_(
        ("Water Chemistry Sample", "Water Level Sample")
    )
)
```
Scope permission destination query to water wells
The permissions source side is explicitly limited to Thing.thing_type == "water well", but the destination query only filters by target_table and permission_type. If the database has PermissionHistory rows for non-well things using the same permission types, this audit will report false extras and inflate destination counts, so source and destination are no longer compared over the same population.
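One way to scope the destination count, sketched under the assumption that the project's `Thing` model carries the `thing_type` column used on the source side. Model and column names mirror the diff in this PR, but this is an illustrative sketch, not the PR's actual code:

```python
# Hedged sketch: scope the destination count to water wells by joining Thing,
# so source and destination are compared over the same population.
# `Thing` and `PermissionHistory` stand in for the project's real models.
from sqlalchemy import func, select


def count_well_permission_rows(session, PermissionHistory, Thing):
    """Count PermissionHistory rows restricted to water-well Things."""
    stmt = (
        select(func.count())
        .select_from(PermissionHistory)
        # Join Thing so the count can be filtered by thing_type, matching
        # the Thing.thing_type == "water well" filter on the source side.
        .join(Thing, Thing.id == PermissionHistory.target_id)
        .where(PermissionHistory.target_table == "thing")
        .where(Thing.thing_type == "water well")
        .where(
            PermissionHistory.permission_type.in_(
                ("Water Chemistry Sample", "Water Level Sample")
            )
        )
    )
    return int(session.execute(stmt).scalar_one())
```

The join is the only addition over the original query; orphaned `PermissionHistory` rows (no matching `Thing`) drop out of the count as a side effect of the inner join.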
```python
sample_ok = getattr(row, "SampleOK", None)
if sample_ok is not None:
    rows.append(
        f"{_normalize_key(pointid)}|Water Chemistry Sample|{bool(sample_ok)}"
```
Parse permission flags instead of casting with bool()
Casting CSV values with bool(sample_ok) treats any non-empty string as True, so values like "False" or "0" will be interpreted as allowed permissions when these columns are string-typed. That produces incorrect source keys and therefore incorrect missing/extra counts in transfer-results output for those datasets.
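A minimal sketch of explicit flag parsing. The accepted spellings are an assumption about what the CSV export emits, and `parse_permission_flag` is a hypothetical helper name, not something in this PR:

```python
from typing import Optional

# Assumed spellings; extend to match the values the CSV export actually uses.
_TRUE_VALUES = {"true", "t", "1", "yes", "y"}
_FALSE_VALUES = {"false", "f", "0", "no", "n"}


def parse_permission_flag(value) -> Optional[bool]:
    """Parse a CSV permission flag; return None for missing/unrecognized values."""
    if value is None:
        return None
    if isinstance(value, bool):
        return value
    text = str(value).strip().lower()
    if text in _TRUE_VALUES:
        return True
    if text in _FALSE_VALUES:
        return False
    return None  # unrecognized (including empty): surface it rather than guess
```

Returning `None` for unrecognized values lets the caller skip the row, the same way the existing code skips `None` flags, instead of silently coercing `"False"` to `True`.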
Pull request overview
This PR adds support for auditing and orchestrating a “Permissions” transfer, including a new transfer toggle and updated transfer-results reporting.
Changes:
- Adds a new `Permissions` entry to the transfer-results auditing specs/types and implements custom comparison logic for `PermissionHistory`.
- Updates the transfer orchestration to run permissions after parallel group 1 (after contacts are available) and adds an env toggle `TRANSFER_PERMISSIONS`.
- Updates docs and CI environment configuration.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| `transfers/transfer_results_types.py` | Registers `Permissions` as a transfer-results type so it appears in auditing output. |
| `transfers/transfer_results_specs.py` | Adds `Permissions` to `TRANSFER_COMPARISON_SPECS` and imports `PermissionHistory` and the result type. |
| `transfers/transfer_results_builder.py` | Implements permissions-specific source/destination key building and result computation. |
| `transfers/transfer.py` | Adds the `transfer_permissions` option/env toggle and runs permissions after group 1 completes. |
| `transfers/README.md` | Documents the transfer auditing CLI usage and output. |
| `.github/workflows/tests.yml` | Adds `PYGEOAPI_POSTGRES_USER` to the CI env. |
| `.env.example` | Adds the `TRANSFER_PERMISSIONS` toggle. |
```python
try:
    result_name, result, elapsed = _execute_permissions_with_timing(
        "Permissions"
    )
    results_map[result_name] = result
    logger.info(f"Task {result_name} completed in {elapsed:.2f}s")
except Exception as e:
    logger.critical(f"Task Permissions failed: {e}")
```
The sequential permissions transfer failure log only includes str(e) and drops the stack trace, while other parallel-transfer error paths log traceback.format_exc(). Logging the full traceback here would make diagnosing transfer failures significantly easier.
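A sketch of the traceback-preserving variant. `run_permissions_task` is a hypothetical wrapper used here for illustration; only the `traceback.format_exc()` call is the point:

```python
import logging
import traceback

logger = logging.getLogger("transfers.permissions")


def run_permissions_task(execute):
    """Run a transfer callable, logging the full stack trace on failure."""
    try:
        return execute()
    except Exception as e:
        # traceback.format_exc() renders the active exception's stack trace,
        # matching what the parallel-transfer error paths already log.
        logger.critical(f"Task Permissions failed: {e}\n{traceback.format_exc()}")
        return None
```

`logger.exception(...)` inside the `except` block is an equivalent stdlib shortcut (it logs at ERROR level with the traceback attached).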
```python
eligible_pointids = {name for (name,) in eligible_rows if name}
if not eligible_pointids:
    return pd.Series([], dtype=object)

rows: list[str] = []
for row in wdf.itertuples(index=False):
    pointid = getattr(row, "PointID", None)
    if pointid not in eligible_pointids:
        continue

    sample_ok = getattr(row, "SampleOK", None)
    if sample_ok is not None:
        rows.append(
            f"{_normalize_key(pointid)}|Water Chemistry Sample|{bool(sample_ok)}"
        )

    monitor_ok = getattr(row, "MonitorOK", None)
    if monitor_ok is not None:
        rows.append(
            f"{_normalize_key(pointid)}|Water Level Sample|{bool(monitor_ok)}"
```
In _permissions_source_series, membership is checked against eligible_pointids using the raw PointID value from the CSV, but the produced keys are normalized/lowercased via _normalize_key. This can incorrectly drop eligible wells when casing/whitespace differs between CSV and DB values. Normalize both sides (e.g., build eligible_pointids as normalized keys and compare with _normalize_key(pointid)).
Suggested change:

```diff
-eligible_pointids = {name for (name,) in eligible_rows if name}
-if not eligible_pointids:
-    return pd.Series([], dtype=object)
-rows: list[str] = []
-for row in wdf.itertuples(index=False):
-    pointid = getattr(row, "PointID", None)
-    if pointid not in eligible_pointids:
-        continue
-    sample_ok = getattr(row, "SampleOK", None)
-    if sample_ok is not None:
-        rows.append(
-            f"{_normalize_key(pointid)}|Water Chemistry Sample|{bool(sample_ok)}"
-        )
-    monitor_ok = getattr(row, "MonitorOK", None)
-    if monitor_ok is not None:
-        rows.append(
-            f"{_normalize_key(pointid)}|Water Level Sample|{bool(monitor_ok)}"
+eligible_pointids = {
+    _normalize_key(name) for (name,) in eligible_rows if _normalize_key(name)
+}
+if not eligible_pointids:
+    return pd.Series([], dtype=object)
+rows: list[str] = []
+for row in wdf.itertuples(index=False):
+    pointid = getattr(row, "PointID", None)
+    normalized_pointid = _normalize_key(pointid)
+    if not normalized_pointid or normalized_pointid not in eligible_pointids:
+        continue
+    sample_ok = getattr(row, "SampleOK", None)
+    if sample_ok is not None:
+        rows.append(
+            f"{normalized_pointid}|Water Chemistry Sample|{bool(sample_ok)}"
+        )
+    monitor_ok = getattr(row, "MonitorOK", None)
+    if monitor_ok is not None:
+        rows.append(
+            f"{normalized_pointid}|Water Level Sample|{bool(monitor_ok)}"
```
```python
rows: list[str] = []
for row in wdf.itertuples(index=False):
    pointid = getattr(row, "PointID", None)
    if pointid not in eligible_pointids:
        continue

    sample_ok = getattr(row, "SampleOK", None)
    if sample_ok is not None:
        rows.append(
            f"{_normalize_key(pointid)}|Water Chemistry Sample|{bool(sample_ok)}"
        )
```
_permissions_source_series appends permission rows for every matching WellData record, which can inflate counts and introduce duplicate keys if WellData contains duplicate PointID rows. The actual permissions transfer de-duplicates by well (visited) and effectively uses the first matching WellData value per well. Consider de-duping WellData by PointID (stable) before iterating, or aggregating per PointID to mirror transfer behavior.
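A minimal sketch of the stable de-duplication, assuming `wdf` is the `WellData` DataFrame with a `PointID` column as shown in the snippets above (`dedupe_well_data` is an illustrative helper, not code from this PR):

```python
import pandas as pd


def dedupe_well_data(wdf: pd.DataFrame) -> pd.DataFrame:
    """Keep the first WellData row per PointID, preserving input order."""
    # keep="first" mirrors the transfer's visited-set behavior, which
    # effectively uses the first matching WellData value per well.
    return wdf.drop_duplicates(subset="PointID", keep="first")
```

Running this once before the `itertuples` loop keeps the source key series one-row-per-well without changing which value wins.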
```python
with session_ctx() as session:
    source_series = (
        _permissions_source_series(session)
        if enabled
        else pd.Series([], dtype=object)
    )
    source_keys = set(source_series.unique().tolist())
    source_keyed_row_count = int(source_series.shape[0])
    source_duplicate_key_row_count = source_keyed_row_count - len(source_keys)
    agreed_transfer_row_count = source_keyed_row_count
```
_build_permissions reads WellData into source_df to count rows, but _permissions_source_series re-reads WellData internally. This duplicates I/O and parsing work for large CSVs. Consider passing the already-loaded source_df (or a pre-cleaned version) into _permissions_source_series so the file is only read once.
Suggested change:

```diff
-with session_ctx() as session:
-    source_series = (
-        _permissions_source_series(session)
-        if enabled
-        else pd.Series([], dtype=object)
-    )
-    source_keys = set(source_series.unique().tolist())
-    source_keyed_row_count = int(source_series.shape[0])
-    source_duplicate_key_row_count = source_keyed_row_count - len(source_keys)
-    agreed_transfer_row_count = source_keyed_row_count
+if enabled:
+    # Derive the source key series directly from the already-loaded CSV
+    if spec.source_key_column in source_df.columns:
+        source_series = source_df[spec.source_key_column].astype(str)
+    else:
+        source_series = pd.Series([], dtype=object)
+else:
+    source_series = pd.Series([], dtype=object)
+source_keys = set(source_series.unique().tolist())
+source_keyed_row_count = int(source_series.shape[0])
+source_duplicate_key_row_count = source_keyed_row_count - len(source_keys)
+agreed_transfer_row_count = source_keyed_row_count
+with session_ctx() as session:
```
```python
destination_row_count = int(
    session.execute(
        select(func.count())
        .select_from(PermissionHistory)
        .where(PermissionHistory.target_table == "thing")
        .where(
            PermissionHistory.permission_type.in_(
                ("Water Chemistry Sample", "Water Level Sample")
            )
        )
    ).scalar_one()
)
```
destination_row_count for permissions is computed without the same constraints used to build destination_series (inner join to Thing, Thing.name non-null). This can make the reported destination row count inconsistent with the key series (e.g., orphaned PermissionHistory rows or unnamed Things). Align the count query with _permissions_destination_series filters (and any additional filters like end_date is NULL).
Suggested change:

```diff
-destination_row_count = int(
-    session.execute(
-        select(func.count())
-        .select_from(PermissionHistory)
-        .where(PermissionHistory.target_table == "thing")
-        .where(
-            PermissionHistory.permission_type.in_(
-                ("Water Chemistry Sample", "Water Level Sample")
-            )
-        )
-    ).scalar_one()
-)
+destination_row_count = int(destination_series.shape[0])
```
```python
sql = (
    select(
        Thing.name.label("point_id"),
        PermissionHistory.permission_type.label("permission_type"),
        PermissionHistory.permission_allowed.label("permission_allowed"),
    )
    .select_from(PermissionHistory)
    .join(Thing, Thing.id == PermissionHistory.target_id)
    .where(PermissionHistory.target_table == "thing")
    .where(
        PermissionHistory.permission_type.in_(
            ("Water Chemistry Sample", "Water Level Sample")
        )
    )
```
_permissions_destination_series pulls all PermissionHistory rows for the permission types, including historical/ended records. This can create false matches (a well may match a historical value even if the current active permission differs) and inflate extra counts over time. Consider restricting to the active/current records (e.g., end_date IS NULL) and, if multiples exist, selecting the latest start_date per (target_id, permission_type) to match how the API surfaces permissions.
Suggested change:

```diff
-sql = (
-    select(
-        Thing.name.label("point_id"),
-        PermissionHistory.permission_type.label("permission_type"),
-        PermissionHistory.permission_allowed.label("permission_allowed"),
-    )
-    .select_from(PermissionHistory)
-    .join(Thing, Thing.id == PermissionHistory.target_id)
-    .where(PermissionHistory.target_table == "thing")
-    .where(
-        PermissionHistory.permission_type.in_(
-            ("Water Chemistry Sample", "Water Level Sample")
-        )
-    )
+# Select only active permission records (end_date IS NULL) and, for each
+# (target_id, permission_type), keep the record with the latest start_date.
+latest_active_subq = (
+    select(
+        PermissionHistory.target_id.label("target_id"),
+        PermissionHistory.permission_type.label("permission_type"),
+        func.max(PermissionHistory.start_date).label("max_start_date"),
+    )
+    .where(PermissionHistory.target_table == "thing")
+    .where(
+        PermissionHistory.permission_type.in_(
+            ("Water Chemistry Sample", "Water Level Sample")
+        )
+    )
+    .where(PermissionHistory.end_date.is_(None))
+    .group_by(
+        PermissionHistory.target_id,
+        PermissionHistory.permission_type,
+    )
+    .subquery()
+)
+sql = (
+    select(
+        Thing.name.label("point_id"),
+        PermissionHistory.permission_type.label("permission_type"),
+        PermissionHistory.permission_allowed.label("permission_allowed"),
+    )
+    .select_from(PermissionHistory)
+    .join(
+        latest_active_subq,
+        (
+            (PermissionHistory.target_id == latest_active_subq.c.target_id)
+            & (
+                PermissionHistory.permission_type
+                == latest_active_subq.c.permission_type
+            )
+            & (
+                PermissionHistory.start_date
+                == latest_active_subq.c.max_start_date
+            )
+        ),
+    )
+    .join(Thing, Thing.id == PermissionHistory.target_id)
```
Summary
This PR fixes permissions transfer ordering and adds first-class permissions coverage to transfer options and transfer validation reporting.
Problem
Permissions were silently missing because:
- The `Permissions` transfer ran in parallel with `Contacts`, but permissions require contact associations to exist first.
- `transfer-results` did not include any permissions comparison spec, so missing `PermissionHistory` rows were not reported.

Changes
Sequencing fix:
- The permissions transfer now runs after parallel group 1 completes, once contact associations exist.

New transfer option:
- `TransferOptions.transfer_permissions`, controlled by the env toggle `TRANSFER_PERMISSIONS` (default `True`).
- Added `TRANSFER_PERMISSIONS=True` to `.env.example`.

Transfer-results permissions validation:
- New `Permissions` transfer-result type.
- Added a `Permissions` entry to `TRANSFER_COMPARISON_SPECS` with `option_field="transfer_permissions"`.
- Source keys are built from `WellData` (`SampleOK`, `MonitorOK`) for wells that have contact associations, destination keys from `PermissionHistory`, and both sides are compared as `pointid|permission_type|permission_allowed`.
- The check is skipped when `TRANSFER_PERMISSIONS=false`.

Why this works
Permissions transfer now executes only after its prerequisites are present.
Validation now explicitly checks permissions, so zero/missing permissions are surfaced in `oco transfer-results` output instead of being invisible.

Files changed
- `transfers/transfer.py`
- `.env.example`
- `transfers/transfer_results_types.py`
- `transfers/transfer_results_specs.py`
- `transfers/transfer_results_builder.py`

Validation
- `python -m py_compile` on the updated transfer-results modules and `transfer.py` passed.
- `pytest` could not be fully run in the sandbox due to a blocked DB connection to `localhost:5432`.