diff --git a/alembic/versions/r2s3t4u5v6w7_add_actively_monitored_wells_pygeoapi_materialized_view.py b/alembic/versions/r2s3t4u5v6w7_add_actively_monitored_wells_pygeoapi_materialized_view.py index 8c674c96..4cbd9612 100644 --- a/alembic/versions/r2s3t4u5v6w7_add_actively_monitored_wells_pygeoapi_materialized_view.py +++ b/alembic/versions/r2s3t4u5v6w7_add_actively_monitored_wells_pygeoapi_materialized_view.py @@ -16,12 +16,27 @@ branch_labels: Union[str, Sequence[str], None] = None depends_on: Union[str, Sequence[str], None] = None DROP_VIEW_SQL = "DROP VIEW IF EXISTS ogc_actively_monitored_wells" -DROP_MATVIEW_SQL = "DROP MATERIALIZED VIEW IF EXISTS " "ogc_actively_monitored_wells" +DROP_MATVIEW_SQL = "".join( + [ + "DROP MATERIALIZED VIEW IF EXISTS ", + "ogc_actively_monitored_wells", + ] +) def _create_actively_monitored_wells_view() -> str: return """ CREATE VIEW ogc_actively_monitored_wells AS + WITH latest_monitoring_status AS ( + SELECT DISTINCT ON (sh.target_id) + sh.target_id AS thing_id, + sh.status_value + FROM status_history AS sh + WHERE + sh.target_table = 'thing' + AND sh.status_type = 'Monitoring Status' + ORDER BY sh.target_id, sh.start_date DESC, sh.id DESC + ) SELECT wws.id, wws.name, @@ -43,7 +58,9 @@ def _create_actively_monitored_wells_view() -> str: FROM "group" AS g JOIN group_thing_association AS gta ON gta.group_id = g.id JOIN ogc_water_well_summary AS wws ON wws.id = gta.thing_id + JOIN latest_monitoring_status AS lms ON lms.thing_id = wws.id WHERE lower(trim(g.name)) = 'water level network' + AND lms.status_value = 'Currently monitored' """ @@ -54,6 +71,7 @@ def upgrade() -> None: required_tables = { "group", "group_thing_association", + "status_history", } if not required_tables.issubset(existing_tables): diff --git a/alembic/versions/t6u7v8w9x0y1_add_project_areas_ogc_view.py b/alembic/versions/t6u7v8w9x0y1_add_project_areas_ogc_view.py new file mode 100644 index 00000000..c03af311 --- /dev/null +++ b/alembic/versions/t6u7v8w9x0y1_add_project_areas_ogc_view.py @@ -0,0 +1,64 @@ +"""add project areas OGC view + +Revision ID: t6u7v8w9x0y1 +Revises: s4t5u6v7w8x9 +Create Date: 2026-03-19 16:45:00.000000 +""" + +from typing import Sequence, Union + +from alembic import op +from sqlalchemy import inspect, text + +# revision identifiers, used by Alembic. +revision: str = "t6u7v8w9x0y1" +down_revision: Union[str, Sequence[str], None] = "s4t5u6v7w8x9" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + +DROP_VIEW_SQL = "DROP VIEW IF EXISTS ogc_project_areas" + + +def _create_project_areas_view() -> str: + return """ + CREATE VIEW ogc_project_areas AS + SELECT + g.id, + g.name, + g.description, + g.group_type, + g.release_status, + g.project_area + FROM "group" AS g + WHERE g.project_area IS NOT NULL + """ + + +def upgrade() -> None: + bind = op.get_bind() + inspector = inspect(bind) + existing_tables = set(inspector.get_table_names(schema="public")) + if "group" not in existing_tables: + raise RuntimeError( + "Cannot create ogc_project_areas. Missing required table: group" + ) + + group_columns = {column["name"] for column in inspector.get_columns("group")} + if "project_area" not in group_columns: + raise RuntimeError( + "Cannot create ogc_project_areas. " + "Missing required column: group.project_area" + ) + + op.execute(text(DROP_VIEW_SQL)) + op.execute(text(_create_project_areas_view())) + op.execute( + text( + "COMMENT ON VIEW ogc_project_areas IS " + "'Project areas for groups with polygon boundaries for pygeoapi.'" + ) + ) + + +def downgrade() -> None: + op.execute(text(DROP_VIEW_SQL)) diff --git a/cli/cli.py b/cli/cli.py index 44e9d02f..be093917 100644 --- a/cli/cli.py +++ b/cli/cli.py @@ -1005,6 +1005,30 @@ def refresh_pygeoapi_materialized_views( typer.echo(f"Refreshed {len(target_views)} materialized view(s).") +@cli.command("import-project-area-boundaries") +def import_project_area_boundaries_command( + layer_url: str = typer.Option( + ( + "https://maps.nmt.edu/server/rest/services/Water/" + "Water_Resources/MapServer/17" + ), + "--layer-url", + help="ArcGIS Feature Layer URL for project area boundaries.", + ), +): + from cli.project_area_import import import_project_area_boundaries + + result = import_project_area_boundaries(layer_url=layer_url) + typer.echo(f"Fetched {result.fetched} feature(s).") + typer.echo(f"Matched {result.matched} group row(s).") + typer.echo(f"Updated {result.updated} group project area(s).") + if result.unmatched_locations: + typer.echo( + "Unmatched locations: " + ", ".join(result.unmatched_locations), + err=True, + ) + + if __name__ == "__main__": cli() diff --git a/cli/project_area_import.py b/cli/project_area_import.py new file mode 100644 index 00000000..2f748530 --- /dev/null +++ b/cli/project_area_import.py @@ -0,0 +1,123 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any + +import httpx +from geoalchemy2 import WKTElement +from shapely.geometry import MultiPolygon, Polygon, shape +from sqlalchemy import func, select + +from db import Group +from db.engine import session_ctx + +PROJECT_AREA_LAYER_URL = "".join( + [ + "https://maps.nmt.edu/server/rest/services/Water/", + "Water_Resources/MapServer/17", + ] +) +PROJECT_AREA_PAGE_SIZE = 1000 + + +@dataclass(frozen=True) +class ProjectAreaImportResult: + fetched: int + matched: int + updated: int + unmatched_locations: tuple[str, ...] + + +def _normalize_name(value: str) -> str: + return value.strip().lower() + + +def _geojson_to_multipolygon_wkt(geometry: dict[str, Any]) -> str: + geom = shape(geometry) + if isinstance(geom, Polygon): + geom = MultiPolygon([geom]) + if not isinstance(geom, MultiPolygon): + raise ValueError( + f"Expected Polygon or MultiPolygon geometry, got {geom.geom_type}" + ) + return geom.wkt + + +def _fetch_project_area_features( + client: httpx.Client, + layer_url: str, +) -> list[dict[str, Any]]: + features: list[dict[str, Any]] = [] + offset = 0 + + while True: + response = client.get( + f"{layer_url}/query", + params={ + "where": "1=1", + "outFields": "location", + "returnGeometry": "true", + "f": "geojson", + "resultOffset": offset, + "resultRecordCount": PROJECT_AREA_PAGE_SIZE, + }, + ) + response.raise_for_status() + payload = response.json() + batch = payload.get("features", []) + if not batch: + break + features.extend(batch) + if not payload.get("exceededTransferLimit"): + break + offset += len(batch) + + return features + + +def import_project_area_boundaries( + layer_url: str = PROJECT_AREA_LAYER_URL, +) -> ProjectAreaImportResult: + with httpx.Client(timeout=60.0) as client: + features = _fetch_project_area_features(client, layer_url) + + unmatched_locations: list[str] = [] + matched = 0 + updated = 0 + + with session_ctx() as session: + for feature in features: + attributes = feature.get("properties", {}) + geometry = feature.get("geometry") + location_name = (attributes.get("location") or "").strip() + + if not location_name or geometry is None: + continue + + groups = session.scalars( + select(Group).where( + func.lower(func.trim(Group.name)) == _normalize_name(location_name) + ) + ).all() + + if not groups: + unmatched_locations.append(location_name) + continue + + matched += len(groups) + project_area = WKTElement( + _geojson_to_multipolygon_wkt(geometry), + srid=4326, + ) + for group in groups: + group.project_area = project_area + updated += 1 + + session.commit() + + return ProjectAreaImportResult( + fetched=len(features), + matched=matched, + updated=updated, + unmatched_locations=tuple(sorted(set(unmatched_locations))), + ) diff --git a/core/pygeoapi-config.yml b/core/pygeoapi-config.yml index f060e499..1bae81d9 100644 --- a/core/pygeoapi-config.yml +++ b/core/pygeoapi-config.yml @@ -264,4 +264,27 @@ resources: table: ogc_actively_monitored_wells geom_field: point + project_areas: + type: collection + title: Project Areas + description: Project groups with polygon project-area boundaries. + keywords: [project-areas, groups, boundaries] + extents: + spatial: + bbox: [-109.05, 31.33, -103.00, 37.00] + crs: http://www.opengis.net/def/crs/OGC/1.3/CRS84 + providers: + - type: feature + name: PostgreSQL + data: + host: {postgres_host} + port: {postgres_port} + dbname: {postgres_db} + user: {postgres_user} + password: {postgres_password_env} + search_path: [public] + id_field: id + table: ogc_project_areas + geom_field: project_area + {thing_collections_block} diff --git a/tests/test_cli_commands.py b/tests/test_cli_commands.py index 47f451ec..6db32370 100644 --- a/tests/test_cli_commands.py +++ b/tests/test_cli_commands.py @@ -125,6 +125,102 @@ def test_refresh_pygeoapi_materialized_views_rejects_invalid_identifier(): assert "Invalid SQL identifier" in result.output +def test_import_project_area_boundaries_updates_matching_groups(monkeypatch): + class FakeGroup: + def __init__(self): + self.project_area = None + + fake_group = FakeGroup() + + class FakeClient: + def __init__(self, *args, **kwargs): + pass + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + monkeypatch.setattr("cli.project_area_import.httpx.Client", FakeClient) + monkeypatch.setattr( + "cli.project_area_import._fetch_project_area_features", + lambda client, layer_url: [ + { + "properties": {"location": "Test Group"}, + "geometry": { + "type": "Polygon", + "coordinates": [ + [ + [-106.9, 33.9], + [-106.7, 33.9], + [-106.7, 34.1], + [-106.9, 34.1], + [-106.9, 33.9], + ] + ], + }, + }, + { + "properties": {"location": "Missing Group"}, + "geometry": { + "type": "Polygon", + "coordinates": [ + [ + [-105.0, 33.0], + [-104.8, 33.0], + [-104.8, 33.2], + [-105.0, 33.2], + [-105.0, 33.0], + ] + ], + }, + }, + ], + ) + + class FakeScalarResult: + def __init__(self, groups): + self._groups = groups + + def all(self): + return self._groups + + class FakeSession: + def __init__(self): + self.commit_called = False + self.scalar_calls = 0 + + def scalars(self, stmt): + self.scalar_calls += 1 + if self.scalar_calls == 1: + return FakeScalarResult([fake_group]) + return FakeScalarResult([]) + + def commit(self): + self.commit_called = True + + class FakeSessionCtx: + def __enter__(self): + self.session = FakeSession() + return self.session + + def __exit__(self, exc_type, exc, tb): + return False + + monkeypatch.setattr("cli.project_area_import.session_ctx", lambda: FakeSessionCtx()) + + runner = CliRunner() + result = runner.invoke(cli, ["import-project-area-boundaries"]) + + assert result.exit_code == 0, result.output + assert "Fetched 2 feature(s)." in result.output + assert "Matched 1 group row(s)." in result.output + assert "Updated 1 group project area(s)." in result.output + assert "Unmatched locations: Missing Group" in result.output + assert fake_group.project_area is not None + + def test_initialize_lexicon_invokes_initializer(monkeypatch): called = {"count": 0} diff --git a/tests/test_ogc.py b/tests/test_ogc.py index 7abcc981..42a0c984 100644 --- a/tests/test_ogc.py +++ b/tests/test_ogc.py @@ -20,7 +20,6 @@ from fastapi.testclient import TestClient from sqlalchemy import text -from core.factory import create_api_app from core.dependencies import ( admin_function, editor_function, @@ -29,12 +28,14 @@ viewer_function, amp_viewer_function, ) +from core.factory import create_api_app from db import ( Group, GroupThingAssociation, NMA_Chemistry_SampleInfo, NMA_MajorChemistry, NMA_MinorTraceChemistry, + StatusHistory, ) from db.engine import session_ctx from tests import override_authentication @@ -422,6 +423,14 @@ def test_ogc_actively_monitored_wells_exposes_water_level_network_group_wells( thing_id=water_well_thing.id, ) session.add(group_assoc) + status_history = StatusHistory( + status_type="Monitoring Status", + status_value="Currently monitored", + start_date=date(2024, 1, 1), + target_id=water_well_thing.id, + target_table="thing", + ) + session.add(status_history) session.commit() row = session.execute( @@ -436,6 +445,59 @@ def test_ogc_actively_monitored_wells_exposes_water_level_network_group_wells( assert row.group_name == "Water Level Network" assert row.group_type == "Monitoring Plan" + session.delete(status_history) + session.delete(group_assoc) + session.delete(group) + session.commit() + + +def test_ogc_actively_monitored_wells_excludes_latest_not_currently_monitored( + water_well_thing, + groundwater_level_observation, +): + with session_ctx() as session: + session.execute(text("REFRESH MATERIALIZED VIEW ogc_water_well_summary")) + session.commit() + + group = Group( + name="Water Level Network", + group_type="Monitoring Plan", + release_status="draft", + ) + session.add(group) + session.flush() + + group_assoc = GroupThingAssociation( + group_id=group.id, + thing_id=water_well_thing.id, + ) + session.add(group_assoc) + currently_monitored = StatusHistory( + status_type="Monitoring Status", + status_value="Currently monitored", + start_date=date(2024, 1, 1), + target_id=water_well_thing.id, + target_table="thing", + ) + not_currently_monitored = StatusHistory( + status_type="Monitoring Status", + status_value="Not currently monitored", + start_date=date(2024, 2, 1), + target_id=water_well_thing.id, + target_table="thing", + ) + session.add_all([currently_monitored, not_currently_monitored]) + session.commit() + + row = session.execute( + text("SELECT id FROM ogc_actively_monitored_wells WHERE id = :thing_id"), + {"thing_id": water_well_thing.id}, + ).one_or_none() + + assert row is None + + session.delete(not_currently_monitored) + session.delete(currently_monitored) session.delete(group_assoc) session.delete(group) session.commit() @@ -457,6 +519,7 @@ def test_ogc_collections(ogc_client): "major_chemistry_results", "minor_chemistry_wells", "actively_monitored_wells", + "project_areas", }.issubset(ids) @@ -469,6 +532,7 @@ def test_ogc_new_collection_items_endpoints(ogc_client): "major_chemistry_results", "minor_chemistry_wells", "actively_monitored_wells", + "project_areas", ): response = ogc_client.get(f"/ogcapi/collections/{collection_id}/items?limit=10") assert response.status_code == 200 @@ -476,6 +540,15 @@ def test_ogc_new_collection_items_endpoints(ogc_client): assert payload["type"] == "FeatureCollection" +def test_ogc_project_areas_items_expose_groups_with_project_areas(ogc_client, group): + response = ogc_client.get("/ogcapi/collections/project_areas/items?limit=20") + + assert response.status_code == 200 + payload = response.json() + ids = {str(feature["id"]) for feature in payload["features"]} + assert str(group.id) in ids + + @pytest.mark.skip("PostGIS spatial operators not available in CI - see issue #449") def test_ogc_locations_items_bbox(location): bbox = "-107.95,33.80,-107.94,33.81"