-
Notifications
You must be signed in to change notification settings - Fork 4
codex/ogc-actively-monitored #614
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,12 +16,27 @@ | |
| branch_labels: Union[str, Sequence[str], None] = None | ||
| depends_on: Union[str, Sequence[str], None] = None | ||
| DROP_VIEW_SQL = "DROP VIEW IF EXISTS ogc_actively_monitored_wells" | ||
| DROP_MATVIEW_SQL = "DROP MATERIALIZED VIEW IF EXISTS " "ogc_actively_monitored_wells" | ||
| DROP_MATVIEW_SQL = "".join( | ||
| [ | ||
| "DROP MATERIALIZED VIEW IF EXISTS ", | ||
| "ogc_actively_monitored_wells", | ||
| ] | ||
| ) | ||
|
|
||
|
|
||
| def _create_actively_monitored_wells_view() -> str: | ||
| return """ | ||
| CREATE VIEW ogc_actively_monitored_wells AS | ||
| WITH latest_monitoring_status AS ( | ||
| SELECT DISTINCT ON (sh.target_id) | ||
| sh.target_id AS thing_id, | ||
| sh.status_value | ||
| FROM status_history AS sh | ||
| WHERE | ||
| sh.target_table = 'thing' | ||
| AND sh.status_type = 'Monitoring Status' | ||
| ORDER BY sh.target_id, sh.start_date DESC, sh.id DESC | ||
| ) | ||
| SELECT | ||
| wws.id, | ||
| wws.name, | ||
|
|
@@ -43,7 +58,9 @@ def _create_actively_monitored_wells_view() -> str: | |
| FROM "group" AS g | ||
| JOIN group_thing_association AS gta ON gta.group_id = g.id | ||
| JOIN ogc_water_well_summary AS wws ON wws.id = gta.thing_id | ||
| JOIN latest_monitoring_status AS lms ON lms.thing_id = wws.id | ||
| WHERE lower(trim(g.name)) = 'water level network' | ||
| AND lms.status_value = 'Currently monitored' | ||
|
Comment on lines
+61
to
+63
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
This behavioral change is being shipped by editing the already-existing Alembic revision in place rather than adding a new migration revision, so databases that have already applied that revision will never pick up the change. Useful? React with 👍 / 👎. |
||
| """ | ||
|
|
||
|
|
||
|
|
@@ -54,6 +71,7 @@ def upgrade() -> None: | |
| required_tables = { | ||
| "group", | ||
| "group_thing_association", | ||
| "status_history", | ||
| } | ||
|
|
||
| if not required_tables.issubset(existing_tables): | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,64 @@ | ||
| """add project areas OGC view | ||
|
|
||
| Revision ID: t6u7v8w9x0y1 | ||
| Revises: s4t5u6v7w8x9 | ||
| Create Date: 2026-03-19 16:45:00.000000 | ||
| """ | ||
|
|
||
| from typing import Sequence, Union | ||
|
|
||
| from alembic import op | ||
| from sqlalchemy import inspect, text | ||
|
|
||
| # revision identifiers, used by Alembic. | ||
| revision: str = "t6u7v8w9x0y1" | ||
| down_revision: Union[str, Sequence[str], None] = "s4t5u6v7w8x9" | ||
| branch_labels: Union[str, Sequence[str], None] = None | ||
| depends_on: Union[str, Sequence[str], None] = None | ||
|
|
||
| DROP_VIEW_SQL = "DROP VIEW IF EXISTS ogc_project_areas" | ||
|
|
||
|
|
||
| def _create_project_areas_view() -> str: | ||
| return """ | ||
| CREATE VIEW ogc_project_areas AS | ||
| SELECT | ||
| g.id, | ||
| g.name, | ||
| g.description, | ||
| g.group_type, | ||
| g.release_status, | ||
| g.project_area | ||
| FROM "group" AS g | ||
| WHERE g.project_area IS NOT NULL | ||
| """ | ||
|
|
||
|
|
||
def upgrade() -> None:
    """Create the ``ogc_project_areas`` view and attach a COMMENT to it.

    Raises:
        RuntimeError: if the backing ``group`` table or its
            ``project_area`` column is missing, so the migration fails
            with a clear message instead of a raw SQL error.
    """
    connection = op.get_bind()
    inspector = inspect(connection)

    # The view selects from "group"; verify the table exists first.
    public_tables = set(inspector.get_table_names(schema="public"))
    if "group" not in public_tables:
        raise RuntimeError(
            "Cannot create ogc_project_areas. Missing required table: group"
        )

    # The WHERE clause filters on group.project_area; verify the column too.
    group_columns = {column["name"] for column in inspector.get_columns("group")}
    if "project_area" not in group_columns:
        raise RuntimeError(
            "Cannot create ogc_project_areas. "
            "Missing required column: group.project_area"
        )

    # Drop any stale view, create the new one, then describe it in-database.
    op.execute(text(DROP_VIEW_SQL))
    op.execute(text(_create_project_areas_view()))
    op.execute(
        text(
            "COMMENT ON VIEW ogc_project_areas IS "
            "'Project areas for groups with polygon boundaries for pygeoapi.'"
        )
    )
|
|
||
|
|
||
def downgrade() -> None:
    """Remove the ``ogc_project_areas`` view (no-op if it does not exist)."""
    op.execute(text(DROP_VIEW_SQL))
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1005,6 +1005,30 @@ def refresh_pygeoapi_materialized_views( | |
| typer.echo(f"Refreshed {len(target_views)} materialized view(s).") | ||
|
|
||
|
|
||
| @cli.command("import-project-area-boundaries") | ||
| def import_project_area_boundaries_command( | ||
| layer_url: str = typer.Option( | ||
| ( | ||
| "https://maps.nmt.edu/server/rest/services/Water/" | ||
| "Water_Resources/MapServer/17" | ||
| ), | ||
| "--layer-url", | ||
| help="ArcGIS Feature Layer URL for project area boundaries.", | ||
| ), | ||
| ): | ||
| from cli.project_area_import import import_project_area_boundaries | ||
|
|
||
| result = import_project_area_boundaries(layer_url=layer_url) | ||
| typer.echo(f"Fetched {result.fetched} feature(s).") | ||
| typer.echo(f"Matched {result.matched} group row(s).") | ||
| typer.echo(f"Updated {result.updated} group project area(s).") | ||
|
Comment on lines
+1008
to
+1024
|
||
| if result.unmatched_locations: | ||
| typer.echo( | ||
| "Unmatched locations: " + ", ".join(result.unmatched_locations), | ||
| err=True, | ||
| ) | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| cli() | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,123 @@ | ||
| from __future__ import annotations | ||
|
|
||
| from dataclasses import dataclass | ||
| from typing import Any | ||
|
|
||
| import httpx | ||
| from geoalchemy2 import WKTElement | ||
| from shapely.geometry import MultiPolygon, Polygon, shape | ||
| from sqlalchemy import func, select | ||
|
|
||
| from db import Group | ||
| from db.engine import session_ctx | ||
|
|
||
# Default ArcGIS Feature Layer with the project-area polygons.
PROJECT_AREA_LAYER_URL = (
    "https://maps.nmt.edu/server/rest/services/Water/"
    "Water_Resources/MapServer/17"
)
# Features requested per query page (resultRecordCount).
PROJECT_AREA_PAGE_SIZE = 1000
|
|
||
|
|
||
@dataclass(frozen=True)
class ProjectAreaImportResult:
    """Immutable summary of one project-area import run."""

    # Total features returned by the ArcGIS layer (including skipped ones).
    fetched: int
    # Group rows whose normalized name matched a feature's location.
    matched: int
    # Group rows whose project_area was assigned during this run.
    updated: int
    # Sorted, de-duplicated location names that matched no group.
    unmatched_locations: tuple[str, ...]
|
|
||
|
|
||
| def _normalize_name(value: str) -> str: | ||
| return value.strip().lower() | ||
|
|
||
|
|
||
def _geojson_to_multipolygon_wkt(geometry: dict[str, Any]) -> str:
    """Convert a GeoJSON geometry mapping to MultiPolygon WKT.

    A bare Polygon is promoted to a single-member MultiPolygon; any other
    geometry type is rejected.

    Raises:
        ValueError: if the geometry is neither Polygon nor MultiPolygon.
    """
    geom = shape(geometry)
    if isinstance(geom, Polygon):
        # Promote to MultiPolygon so the column type stays uniform.
        return MultiPolygon([geom]).wkt
    if isinstance(geom, MultiPolygon):
        return geom.wkt
    raise ValueError(
        f"Expected Polygon or MultiPolygon geometry, got {geom.geom_type}"
    )
|
|
||
|
|
||
def _fetch_project_area_features(
    client: httpx.Client,
    layer_url: str,
) -> list[dict[str, Any]]:
    """Page through the ArcGIS query endpoint and return all GeoJSON features.

    Stops when a page comes back empty or the service no longer reports
    ``exceededTransferLimit`` (i.e. the last page has been delivered).
    """
    collected: list[dict[str, Any]] = []
    next_offset = 0

    while True:
        response = client.get(
            f"{layer_url}/query",
            params={
                "where": "1=1",
                "outFields": "location",
                "returnGeometry": "true",
                "f": "geojson",
                "resultOffset": next_offset,
                "resultRecordCount": PROJECT_AREA_PAGE_SIZE,
            },
        )
        response.raise_for_status()
        payload = response.json()

        batch = payload.get("features", [])
        if not batch:
            return collected
        collected.extend(batch)

        # ArcGIS sets exceededTransferLimit when more pages remain.
        if not payload.get("exceededTransferLimit"):
            return collected
        next_offset += len(batch)
|
|
||
|
|
||
def import_project_area_boundaries(
    layer_url: str = PROJECT_AREA_LAYER_URL,
) -> ProjectAreaImportResult:
    """Copy project-area polygons from an ArcGIS layer onto matching groups.

    Downloads every feature from ``layer_url``, matches each feature's
    ``location`` attribute against ``Group.name`` (case- and
    whitespace-insensitive on both sides), and writes the feature geometry
    to ``Group.project_area`` as an SRID-4326 MultiPolygon WKT element.

    Returns:
        ProjectAreaImportResult with fetch/match/update counts and the
        sorted, de-duplicated location names that matched no group.
    """
    # Fetch all pages first; the HTTP client is closed before DB work begins.
    with httpx.Client(timeout=60.0) as client:
        features = _fetch_project_area_features(client, layer_url)

    unmatched_locations: list[str] = []
    matched = 0
    updated = 0

    with session_ctx() as session:
        for feature in features:
            attributes = feature.get("properties", {})
            geometry = feature.get("geometry")
            location_name = (attributes.get("location") or "").strip()

            # Features with no usable name or geometry are skipped silently:
            # they count toward `fetched` but not toward `unmatched`.
            if not location_name or geometry is None:
                continue

            # NOTE(review): matching is by normalized name only; groups whose
            # names differ from the layer's `location` values will never be
            # updated — confirm name is the intended join key.
            groups = session.scalars(
                select(Group).where(
                    func.lower(func.trim(Group.name)) == _normalize_name(location_name)
                )
            ).all()

            if not groups:
                unmatched_locations.append(location_name)
                continue

            matched += len(groups)
            # One shared WKTElement per feature; srid=4326 — presumably
            # matching the layer's GeoJSON (WGS84) output — confirm.
            project_area = WKTElement(
                _geojson_to_multipolygon_wkt(geometry),
                srid=4326,
            )
            for group in groups:
                group.project_area = project_area
                updated += 1

        # Explicit commit — assumes session_ctx does not commit on exit;
        # verify against db.engine.session_ctx.
        session.commit()

    return ProjectAreaImportResult(
        fetched=len(features),
        matched=matched,
        updated=updated,
        unmatched_locations=tuple(sorted(set(unmatched_locations))),
    )
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -125,6 +125,102 @@ def test_refresh_pygeoapi_materialized_views_rejects_invalid_identifier(): | |||||
| assert "Invalid SQL identifier" in result.output | ||||||
|
|
||||||
|
|
||||||
def test_import_project_area_boundaries_updates_matching_groups(monkeypatch):
    """CLI import command: one feature matches a group, one does not.

    Fakes the HTTP client, the feature fetch, and the DB session so the
    command runs without network or database access, then checks the
    echoed fetch/match/update counts and the unmatched-location report.
    """
    # Minimal stand-in for an ORM Group row; only project_area is written.
    class FakeGroup:
        def __init__(self):
            self.project_area = None

    fake_group = FakeGroup()

    # Context-manager stub replacing httpx.Client; never performs I/O
    # because _fetch_project_area_features is also patched below.
    class FakeClient:
        def __init__(self, *args, **kwargs):
            pass

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc, tb):
            return False

    monkeypatch.setattr("cli.project_area_import.httpx.Client", FakeClient)
    # Two features: "Test Group" will match, "Missing Group" will not.
    monkeypatch.setattr(
        "cli.project_area_import._fetch_project_area_features",
        lambda client, layer_url: [
            {
                "properties": {"location": "Test Group"},
                "geometry": {
                    "type": "Polygon",
                    "coordinates": [
                        [
                            [-106.9, 33.9],
                            [-106.7, 33.9],
                            [-106.7, 34.1],
                            [-106.9, 34.1],
                            [-106.9, 33.9],
                        ]
                    ],
                },
            },
            {
                "properties": {"location": "Missing Group"},
                "geometry": {
                    "type": "Polygon",
                    "coordinates": [
                        [
                            [-105.0, 33.0],
                            [-104.8, 33.0],
                            [-104.8, 33.2],
                            [-105.0, 33.2],
                            [-105.0, 33.0],
                        ]
                    ],
                },
            },
        ],
    )

    # Mimics SQLAlchemy's ScalarResult just enough for .all().
    class FakeScalarResult:
        def __init__(self, groups):
            self._groups = groups

        def all(self):
            return self._groups

    # First scalars() call (for "Test Group") returns one group; the
    # second (for "Missing Group") returns none — order-dependent on the
    # feature list above.
    class FakeSession:
        def __init__(self):
            self.commit_called = False
            self.scalar_calls = 0

        def scalars(self, stmt):
            self.scalar_calls += 1
            if self.scalar_calls == 1:
                return FakeScalarResult([fake_group])
            return FakeScalarResult([])

        def commit(self):
            self.commit_called = True

    class FakeSessionCtx:
        def __enter__(self):
            self.session = FakeSession()
            return self.session

        def __exit__(self, exc_type, exc, tb):
            return False

    monkeypatch.setattr("cli.project_area_import.session_ctx", lambda: FakeSessionCtx())

    runner = CliRunner()
    result = runner.invoke(cli, ["import-project-area-boundaries"])

    assert result.exit_code == 0, result.output
    assert "Fetched 2 feature(s)." in result.output
    assert "Matched 1 group row(s)." in result.output
    assert "Updated 1 group project area(s)." in result.output
    # NOTE(review): the unmatched-locations line is emitted with err=True
    # (stderr); this assertion on result.output only passes when CliRunner
    # mixes stderr into output. On click >= 8.2 (separate streams), check
    # result.stderr instead — confirm the pinned click version.
    assert "Unmatched locations: Missing Group" in result.output
|
||||||
| assert "Unmatched locations: Missing Group" in result.output | |
| assert "Unmatched locations: Missing Group" in result.stderr |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`Thing.monitoring_status` only treats rows with `end_date is None` as current (services/util.py:260-264), but this CTE now ranks every `Monitoring Status` row regardless of `end_date`. For wells whose latest/current status has been closed out without a replacement, `ogc_actively_monitored_wells` will still be driven by stale history and return the wrong feature set. Useful? React with 👍 / 👎.