Skip to content

Commit 1e4cbd4

Browse files
authored
Merge pull request #613 from DataIntegrationGroup/codex/ogc-actively-monitored
Add actively monitored wells OGC collection
2 parents 771b9b3 + e341f5f commit 1e4cbd4

7 files changed

Lines changed: 333 additions & 111 deletions

alembic/versions/c7f8a9b0c1d2_add_thing_id_to_nma_surface_water_data.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,10 @@ def upgrade() -> None:
4444
# Remove any rows that cannot be linked to a Thing, then enforce NOT NULL
4545
op.execute('DELETE FROM "NMA_SurfaceWaterData" WHERE thing_id IS NULL')
4646
op.alter_column(
47-
"NMA_SurfaceWaterData", "thing_id", existing_type=sa.Integer(), nullable=False
47+
"NMA_SurfaceWaterData",
48+
"thing_id",
49+
existing_type=sa.Integer(),
50+
nullable=False,
4851
)
4952

5053

alembic/versions/d5e6f7a8b9c0_create_pygeoapi_supporting_views.py

Lines changed: 40 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,10 @@
3434
("monitoring_wells", "monitoring well"),
3535
("observation_wells", "observation well"),
3636
("other_things", "other"),
37-
("outfalls_wastewater_return_flow", "outfall of wastewater or return flow"),
37+
(
38+
"outfalls_wastewater_return_flow",
39+
"outfall of wastewater or return flow",
40+
),
3841
("perennial_streams", "perennial stream"),
3942
("piezometers", "piezometer"),
4043
("production_wells", "production well"),
@@ -107,8 +110,11 @@ def _create_latest_depth_view() -> str:
107110
o.observation_datetime,
108111
o.value,
109112
o.measuring_point_height,
110-
-- Treat NULL measuring_point_height as 0 when computing depth_to_water_bgs
111-
(o.value - COALESCE(o.measuring_point_height, 0)) AS depth_to_water_bgs,
113+
-- Treat NULL measuring_point_height as 0 when computing
114+
-- depth_to_water_bgs.
115+
(
116+
o.value - COALESCE(o.measuring_point_height, 0)
117+
) AS depth_to_water_bgs,
112118
ROW_NUMBER() OVER (
113119
PARTITION BY fe.thing_id
114120
ORDER BY o.observation_datetime DESC, o.id DESC
@@ -151,7 +157,10 @@ def _create_avg_tds_view() -> str:
151157
SELECT
152158
csi.thing_id,
153159
mc.id AS major_chemistry_id,
154-
COALESCE(mc."AnalysisDate", csi."CollectionDate")::date AS observation_date,
160+
COALESCE(
161+
mc."AnalysisDate",
162+
csi."CollectionDate"
163+
)::date AS observation_date,
155164
mc."SampleValue" AS sample_value,
156165
mc."Units" AS units
157166
FROM "NMA_MajorChemistry" AS mc
@@ -193,15 +202,16 @@ def _drop_view_or_materialized_view(view_name: str) -> None:
193202

194203
def _create_matview_indexes() -> None:
195204
# Required so REFRESH MATERIALIZED VIEW CONCURRENTLY can run.
205+
avg_tds_index_sql = (
206+
"CREATE UNIQUE INDEX ux_ogc_avg_tds_wells_id " "ON ogc_avg_tds_wells (id)"
207+
)
196208
op.execute(
197209
text(
198210
"CREATE UNIQUE INDEX ux_ogc_latest_depth_to_water_wells_id "
199211
"ON ogc_latest_depth_to_water_wells (id)"
200212
)
201213
)
202-
op.execute(
203-
text("CREATE UNIQUE INDEX ux_ogc_avg_tds_wells_id " "ON ogc_avg_tds_wells (id)")
204-
)
214+
op.execute(text(avg_tds_index_sql))
205215

206216

207217
def _create_refresh_function() -> str:
@@ -220,7 +230,11 @@ def _create_refresh_function() -> str:
220230
WHERE schemaname = 'public'
221231
AND matviewname LIKE 'ogc_%'
222232
LOOP
223-
matview_fqname := format('%I.%I', matview_record.schemaname, matview_record.matviewname);
233+
matview_fqname := format(
234+
'%I.%I',
235+
matview_record.schemaname,
236+
matview_record.matviewname
237+
);
224238
EXECUTE format('REFRESH MATERIALIZED VIEW %s', matview_fqname);
225239
END LOOP;
226240
END;
@@ -235,10 +249,15 @@ def upgrade() -> None:
235249
required_core = {"thing", "location", "location_thing_association"}
236250
existing_tables = set(inspector.get_table_names(schema="public"))
237251
if not required_core.issubset(existing_tables):
238-
missing_tables = sorted(t for t in required_core if t not in existing_tables)
252+
missing_tables = sorted(
253+
table_name
254+
for table_name in required_core
255+
if table_name not in existing_tables
256+
)
239257
missing_tables_str = ", ".join(missing_tables)
240258
raise RuntimeError(
241-
"Cannot create pygeoapi supporting views. The following required core "
259+
"Cannot create pygeoapi supporting views. "
260+
"The following required core "
242261
f"tables are missing: {missing_tables_str}"
243262
)
244263

@@ -255,7 +274,8 @@ def upgrade() -> None:
255274
)
256275
missing_depth_tables_str = ", ".join(missing_depth_tables)
257276
raise RuntimeError(
258-
"Cannot create ogc_latest_depth_to_water_wells. The following required "
277+
"Cannot create ogc_latest_depth_to_water_wells. "
278+
"The following required "
259279
f"tables are missing: {missing_depth_tables_str}"
260280
)
261281
op.execute(text(_create_latest_depth_view()))
@@ -269,7 +289,11 @@ def upgrade() -> None:
269289
_drop_view_or_materialized_view("ogc_avg_tds_wells")
270290
required_tds = {"NMA_MajorChemistry", "NMA_Chemistry_SampleInfo"}
271291
if not required_tds.issubset(existing_tables):
272-
missing_tds_tables = sorted(t for t in required_tds if t not in existing_tables)
292+
missing_tds_tables = sorted(
293+
table_name
294+
for table_name in required_tds
295+
if table_name not in existing_tables
296+
)
273297
missing_tds_tables_str = ", ".join(missing_tds_tables)
274298
raise RuntimeError(
275299
"Cannot create ogc_avg_tds_wells. The following required "
@@ -288,7 +312,10 @@ def upgrade() -> None:
288312

289313

290314
def downgrade() -> None:
291-
op.execute(text(f"DROP FUNCTION IF EXISTS public.{REFRESH_FUNCTION_NAME}()"))
315+
drop_refresh_function_sql = (
316+
f"DROP FUNCTION IF EXISTS public.{REFRESH_FUNCTION_NAME}()"
317+
)
318+
op.execute(text(drop_refresh_function_sql))
292319
_drop_view_or_materialized_view("ogc_avg_tds_wells")
293320
_drop_view_or_materialized_view("ogc_latest_depth_to_water_wells")
294321
for view_id, _ in THING_COLLECTIONS:
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
"""add actively monitored wells pygeoapi view
2+
3+
Revision ID: r2s3t4u5v6w7
4+
Revises: p9c0d1e2f3a4
5+
Create Date: 2026-03-19 10:10:00.000000
6+
"""
7+
8+
from typing import Sequence, Union
9+
10+
from alembic import op
11+
from sqlalchemy import inspect, text
12+
13+
# revision identifiers, used by Alembic.
14+
revision: str = "r2s3t4u5v6w7"
15+
down_revision: Union[str, Sequence[str], None] = "p9c0d1e2f3a4"
16+
branch_labels: Union[str, Sequence[str], None] = None
17+
depends_on: Union[str, Sequence[str], None] = None
18+
DROP_VIEW_SQL = "DROP VIEW IF EXISTS ogc_actively_monitored_wells"
19+
DROP_MATVIEW_SQL = "DROP MATERIALIZED VIEW IF EXISTS " "ogc_actively_monitored_wells"
20+
21+
22+
def _create_actively_monitored_wells_view() -> str:
23+
return """
24+
CREATE VIEW ogc_actively_monitored_wells AS
25+
SELECT
26+
wws.id,
27+
wws.name,
28+
'water well'::text AS thing_type,
29+
wws.well_depth,
30+
wws.elevation,
31+
wws.elevation_method,
32+
wws.formation_zone,
33+
wws.total_water_levels,
34+
wws.last_water_level,
35+
wws.last_water_level_datetime,
36+
wws.min_water_level,
37+
wws.max_water_level,
38+
wws.water_level_trend_ft_per_year,
39+
g.id AS group_id,
40+
g.name AS group_name,
41+
g.group_type,
42+
wws.point
43+
FROM "group" AS g
44+
JOIN group_thing_association AS gta ON gta.group_id = g.id
45+
JOIN ogc_water_well_summary AS wws ON wws.id = gta.thing_id
46+
WHERE lower(trim(g.name)) = 'water level network'
47+
"""
48+
49+
50+
def upgrade() -> None:
51+
bind = op.get_bind()
52+
inspector = inspect(bind)
53+
existing_tables = set(inspector.get_table_names(schema="public"))
54+
required_tables = {
55+
"group",
56+
"group_thing_association",
57+
}
58+
59+
if not required_tables.issubset(existing_tables):
60+
missing = sorted(
61+
table_name
62+
for table_name in required_tables
63+
if table_name not in existing_tables
64+
)
65+
raise RuntimeError(
66+
"Cannot create ogc_actively_monitored_wells. "
67+
f"Missing required tables: {', '.join(missing)}"
68+
)
69+
70+
has_summary = bind.execute(
71+
text(
72+
"SELECT 1 FROM pg_matviews "
73+
"WHERE schemaname = 'public' "
74+
"AND matviewname = 'ogc_water_well_summary'"
75+
)
76+
).scalar()
77+
if has_summary != 1:
78+
raise RuntimeError(
79+
"Cannot create ogc_actively_monitored_wells. "
80+
"Missing required materialized view: ogc_water_well_summary"
81+
)
82+
83+
op.execute(text(DROP_VIEW_SQL))
84+
op.execute(text(DROP_MATVIEW_SQL))
85+
op.execute(text(_create_actively_monitored_wells_view()))
86+
op.execute(
87+
text(
88+
"COMMENT ON VIEW ogc_actively_monitored_wells IS "
89+
"'Wells in the Water Level Network group for pygeoapi.'"
90+
)
91+
)
92+
93+
94+
def downgrade() -> None:
95+
op.execute(text(DROP_VIEW_SQL))
96+
op.execute(text(DROP_MATVIEW_SQL))
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
"""drop unused well-type OGC views
2+
3+
Revision ID: s4t5u6v7w8x9
4+
Revises: r2s3t4u5v6w7
5+
Create Date: 2026-03-19 14:30:00.000000
6+
"""
7+
8+
import re
9+
from typing import Sequence, Union
10+
11+
from alembic import op
12+
from sqlalchemy import text
13+
14+
# revision identifiers, used by Alembic.
15+
revision: str = "s4t5u6v7w8x9"
16+
down_revision: Union[str, Sequence[str], None] = "r2s3t4u5v6w7"
17+
branch_labels: Union[str, Sequence[str], None] = None
18+
depends_on: Union[str, Sequence[str], None] = None
19+
20+
REMOVED_THING_COLLECTIONS = [
21+
("abandoned_wells", "abandoned well"),
22+
("artesian_wells", "artesian well"),
23+
("dry_holes", "dry hole"),
24+
("dug_wells", "dug well"),
25+
("exploration_wells", "exploration well"),
26+
("injection_wells", "injection well"),
27+
("monitoring_wells", "monitoring well"),
28+
("observation_wells", "observation well"),
29+
("piezometers", "piezometer"),
30+
("production_wells", "production well"),
31+
("test_wells", "test well"),
32+
]
33+
34+
LATEST_LOCATION_CTE = """
35+
SELECT DISTINCT ON (lta.thing_id)
36+
lta.thing_id,
37+
lta.location_id,
38+
lta.effective_start
39+
FROM location_thing_association AS lta
40+
WHERE lta.effective_end IS NULL
41+
ORDER BY lta.thing_id, lta.effective_start DESC
42+
""".strip()
43+
44+
45+
def _safe_view_id(view_id: str) -> str:
46+
if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", view_id):
47+
raise ValueError(f"Unsafe view id: {view_id!r}")
48+
return view_id
49+
50+
51+
def _drop_view_or_materialized_view(view_name: str) -> None:
52+
op.execute(text(f"DROP VIEW IF EXISTS {view_name}"))
53+
op.execute(text(f"DROP MATERIALIZED VIEW IF EXISTS {view_name}"))
54+
55+
56+
def _create_thing_view(view_id: str, thing_type: str) -> str:
57+
safe_view_id = _safe_view_id(view_id)
58+
escaped_thing_type = thing_type.replace("'", "''")
59+
return f"""
60+
CREATE VIEW ogc_{safe_view_id} AS
61+
WITH latest_location AS (
62+
{LATEST_LOCATION_CTE}
63+
)
64+
SELECT
65+
t.id,
66+
t.name,
67+
t.first_visit_date,
68+
t.nma_pk_welldata,
69+
t.well_depth,
70+
t.hole_depth,
71+
t.well_casing_diameter,
72+
t.well_casing_depth,
73+
t.well_completion_date,
74+
t.well_driller_name,
75+
t.well_construction_method,
76+
t.well_pump_type,
77+
t.well_pump_depth,
78+
t.formation_completion_code,
79+
t.nma_formation_zone,
80+
t.release_status,
81+
l.elevation,
82+
l.point
83+
FROM thing AS t
84+
JOIN latest_location AS ll ON ll.thing_id = t.id
85+
JOIN location AS l ON l.id = ll.location_id
86+
WHERE t.thing_type = '{escaped_thing_type}'
87+
"""
88+
89+
90+
def upgrade() -> None:
91+
for view_id, _ in REMOVED_THING_COLLECTIONS:
92+
_drop_view_or_materialized_view(f"ogc_{_safe_view_id(view_id)}")
93+
94+
95+
def downgrade() -> None:
96+
for view_id, thing_type in REMOVED_THING_COLLECTIONS:
97+
safe_view_id = _safe_view_id(view_id)
98+
_drop_view_or_materialized_view(f"ogc_{safe_view_id}")
99+
op.execute(text(_create_thing_view(view_id, thing_type)))

core/pygeoapi-config.yml

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,4 +241,27 @@ resources:
241241
table: ogc_minor_chemistry_wells
242242
geom_field: point
243243

244+
actively_monitored_wells:
245+
type: collection
246+
title: Actively Monitored Wells
247+
description: Wells in the collaborative network currently flagged as actively monitored.
248+
keywords: [water-wells, monitoring, collaborative-network, actively-monitored]
249+
extents:
250+
spatial:
251+
bbox: [-109.05, 31.33, -103.00, 37.00]
252+
crs: http://www.opengis.net/def/crs/OGC/1.3/CRS84
253+
providers:
254+
- type: feature
255+
name: PostgreSQL
256+
data:
257+
host: {postgres_host}
258+
port: {postgres_port}
259+
dbname: {postgres_db}
260+
user: {postgres_user}
261+
password: {postgres_password_env}
262+
search_path: [public]
263+
id_field: id
264+
table: ogc_actively_monitored_wells
265+
geom_field: point
266+
244267
{thing_collections_block}

0 commit comments

Comments
 (0)