Skip to content

Commit a2e8f57

Browse files
committed
feat: create supporting views for pygeoapi OGC API integration
1 parent d635162 commit a2e8f57

2 files changed

Lines changed: 266 additions & 372 deletions

File tree

Lines changed: 261 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,261 @@
1+
"""Create pygeoapi supporting OGC views.
2+
3+
Revision ID: d5e6f7a8b9c0
4+
Revises: c4d5e6f7a8b9
5+
Create Date: 2026-02-25 12:00:00.000000
6+
"""
7+
8+
from typing import Sequence, Union
9+
10+
from alembic import op
11+
from sqlalchemy import inspect, text
12+
13+
# revision identifiers, used by Alembic.
14+
revision: str = "d5e6f7a8b9c0"
15+
down_revision: Union[str, Sequence[str], None] = "c4d5e6f7a8b9"
16+
branch_labels: Union[str, Sequence[str], None] = None
17+
depends_on: Union[str, Sequence[str], None] = None
18+
19+
THING_COLLECTIONS = [
20+
("wells", "water well"),
21+
("springs", "spring"),
22+
("abandoned_wells", "abandoned well"),
23+
("artesian_wells", "artesian well"),
24+
("diversions_surface_water", "diversion of surface water, etc."),
25+
("dry_holes", "dry hole"),
26+
("dug_wells", "dug well"),
27+
("ephemeral_streams", "ephemeral stream"),
28+
("exploration_wells", "exploration well"),
29+
("injection_wells", "injection well"),
30+
("lakes_ponds_reservoirs", "lake, pond or reservoir"),
31+
("meteorological_stations", "meteorological station"),
32+
("monitoring_wells", "monitoring well"),
33+
("observation_wells", "observation well"),
34+
("other_things", "other"),
35+
("outfalls_wastewater_return_flow", "outfall of wastewater or return flow"),
36+
("perennial_streams", "perennial stream"),
37+
("piezometers", "piezometer"),
38+
("production_wells", "production well"),
39+
("rock_sample_locations", "rock sample location"),
40+
("soil_gas_sample_locations", "soil gas sample location"),
41+
("test_wells", "test well"),
42+
]
43+
44+
45+
def _create_thing_view(view_id: str, thing_type: str) -> str:
46+
escaped_thing_type = thing_type.replace("'", "''")
47+
return f"""
48+
CREATE VIEW ogc_{view_id} AS
49+
WITH latest_location AS (
50+
SELECT DISTINCT ON (lta.thing_id)
51+
lta.thing_id,
52+
lta.location_id,
53+
lta.effective_start
54+
FROM location_thing_association AS lta
55+
WHERE lta.effective_end IS NULL
56+
ORDER BY lta.thing_id, lta.effective_start DESC
57+
)
58+
SELECT
59+
t.id,
60+
t.name,
61+
t.thing_type,
62+
t.first_visit_date,
63+
t.spring_type,
64+
t.nma_pk_welldata,
65+
t.well_depth,
66+
t.hole_depth,
67+
t.well_casing_diameter,
68+
t.well_casing_depth,
69+
t.well_completion_date,
70+
t.well_driller_name,
71+
t.well_construction_method,
72+
t.well_pump_type,
73+
t.well_pump_depth,
74+
t.formation_completion_code,
75+
t.nma_formation_zone,
76+
t.release_status,
77+
l.point
78+
FROM thing AS t
79+
JOIN latest_location AS ll ON ll.thing_id = t.id
80+
JOIN location AS l ON l.id = ll.location_id
81+
WHERE t.thing_type = '{escaped_thing_type}'
82+
"""
83+
84+
85+
def _create_latest_depth_view() -> str:
86+
return """
87+
CREATE VIEW ogc_latest_depth_to_water_wells AS
88+
WITH latest_location AS (
89+
SELECT DISTINCT ON (lta.thing_id)
90+
lta.thing_id,
91+
lta.location_id,
92+
lta.effective_start
93+
FROM location_thing_association AS lta
94+
WHERE lta.effective_end IS NULL
95+
ORDER BY lta.thing_id, lta.effective_start DESC
96+
),
97+
ranked_obs AS (
98+
SELECT
99+
fe.thing_id,
100+
o.id AS observation_id,
101+
o.observation_datetime,
102+
o.value,
103+
o.measuring_point_height,
104+
(o.value - o.measuring_point_height) AS depth_to_water_bgs,
105+
ROW_NUMBER() OVER (
106+
PARTITION BY fe.thing_id
107+
ORDER BY o.observation_datetime DESC, o.id DESC
108+
) AS rn
109+
FROM observation AS o
110+
JOIN sample AS s ON s.id = o.sample_id
111+
JOIN field_activity AS fa ON fa.id = s.field_activity_id
112+
JOIN field_event AS fe ON fe.id = fa.field_event_id
113+
JOIN thing AS t ON t.id = fe.thing_id
114+
WHERE
115+
t.thing_type = 'water well'
116+
AND fa.activity_type = 'groundwater level'
117+
AND o.value IS NOT NULL
118+
AND o.measuring_point_height IS NOT NULL
119+
)
120+
SELECT
121+
t.id AS id,
122+
t.name,
123+
t.thing_type,
124+
ro.observation_id,
125+
ro.observation_datetime,
126+
ro.value AS depth_to_water_reference,
127+
ro.measuring_point_height,
128+
ro.depth_to_water_bgs,
129+
l.point
130+
FROM ranked_obs AS ro
131+
JOIN thing AS t ON t.id = ro.thing_id
132+
JOIN latest_location AS ll ON ll.thing_id = t.id
133+
JOIN location AS l ON l.id = ll.location_id
134+
WHERE ro.rn = 1
135+
"""
136+
137+
138+
def _create_latest_depth_fallback_view() -> str:
139+
return """
140+
CREATE VIEW ogc_latest_depth_to_water_wells AS
141+
SELECT
142+
t.id AS id,
143+
t.name,
144+
t.thing_type,
145+
NULL::integer AS observation_id,
146+
NULL::timestamptz AS observation_datetime,
147+
NULL::double precision AS depth_to_water_reference,
148+
NULL::double precision AS measuring_point_height,
149+
NULL::double precision AS depth_to_water_bgs,
150+
l.point
151+
FROM thing AS t
152+
JOIN location_thing_association AS lta ON lta.thing_id = t.id
153+
JOIN location AS l ON l.id = lta.location_id
154+
WHERE FALSE
155+
"""
156+
157+
158+
def _create_avg_tds_view() -> str:
159+
return """
160+
CREATE VIEW ogc_avg_tds_wells AS
161+
WITH latest_location AS (
162+
SELECT DISTINCT ON (lta.thing_id)
163+
lta.thing_id,
164+
lta.location_id,
165+
lta.effective_start
166+
FROM location_thing_association AS lta
167+
WHERE lta.effective_end IS NULL
168+
ORDER BY lta.thing_id, lta.effective_start DESC
169+
),
170+
tds_obs AS (
171+
SELECT
172+
csi.thing_id,
173+
mc.id AS major_chemistry_id,
174+
mc."AnalysisDate" AS analysis_date,
175+
mc."SampleValue" AS sample_value,
176+
mc."Units" AS units
177+
FROM "NMA_MajorChemistry" AS mc
178+
JOIN "NMA_Chemistry_SampleInfo" AS csi
179+
ON csi.id = mc.chemistry_sample_info_id
180+
JOIN thing AS t ON t.id = csi.thing_id
181+
WHERE
182+
t.thing_type = 'water well'
183+
AND mc."SampleValue" IS NOT NULL
184+
AND (
185+
lower(coalesce(mc."Analyte", '')) IN (
186+
'tds',
187+
'total dissolved solids'
188+
)
189+
OR lower(coalesce(mc."Symbol", '')) = 'tds'
190+
)
191+
)
192+
SELECT
193+
t.id AS id,
194+
t.name,
195+
t.thing_type,
196+
COUNT(to2.major_chemistry_id)::integer AS tds_observation_count,
197+
AVG(to2.sample_value)::double precision AS avg_tds_value,
198+
MIN(to2.analysis_date) AS first_tds_observation_datetime,
199+
MAX(to2.analysis_date) AS latest_tds_observation_datetime,
200+
l.point
201+
FROM tds_obs AS to2
202+
JOIN thing AS t ON t.id = to2.thing_id
203+
JOIN latest_location AS ll ON ll.thing_id = t.id
204+
JOIN location AS l ON l.id = ll.location_id
205+
GROUP BY t.id, t.name, t.thing_type, l.point
206+
"""
207+
208+
209+
def _create_avg_tds_fallback_view() -> str:
210+
return """
211+
CREATE VIEW ogc_avg_tds_wells AS
212+
SELECT
213+
t.id AS id,
214+
t.name,
215+
t.thing_type,
216+
NULL::integer AS tds_observation_count,
217+
NULL::double precision AS avg_tds_value,
218+
NULL::timestamptz AS first_tds_observation_datetime,
219+
NULL::timestamptz AS latest_tds_observation_datetime,
220+
l.point
221+
FROM thing AS t
222+
JOIN location_thing_association AS lta ON lta.thing_id = t.id
223+
JOIN location AS l ON l.id = lta.location_id
224+
WHERE FALSE
225+
"""
226+
227+
228+
def upgrade() -> None:
229+
bind = op.get_bind()
230+
inspector = inspect(bind)
231+
232+
required_core = {"thing", "location", "location_thing_association"}
233+
if not required_core.issubset(set(inspector.get_table_names(schema="public"))):
234+
raise RuntimeError(
235+
"Cannot create pygeoapi supporting views: required core tables are missing"
236+
)
237+
238+
for view_id, thing_type in THING_COLLECTIONS:
239+
op.execute(text(f"DROP VIEW IF EXISTS ogc_{view_id}"))
240+
op.execute(text(_create_thing_view(view_id, thing_type)))
241+
242+
op.execute(text("DROP VIEW IF EXISTS ogc_latest_depth_to_water_wells"))
243+
required_depth = {"observation", "sample", "field_activity", "field_event"}
244+
if required_depth.issubset(set(inspector.get_table_names(schema="public"))):
245+
op.execute(text(_create_latest_depth_view()))
246+
else:
247+
op.execute(text(_create_latest_depth_fallback_view()))
248+
249+
op.execute(text("DROP VIEW IF EXISTS ogc_avg_tds_wells"))
250+
required_tds = {"NMA_MajorChemistry", "NMA_Chemistry_SampleInfo"}
251+
if required_tds.issubset(set(inspector.get_table_names(schema="public"))):
252+
op.execute(text(_create_avg_tds_view()))
253+
else:
254+
op.execute(text(_create_avg_tds_fallback_view()))
255+
256+
257+
def downgrade() -> None:
258+
op.execute(text("DROP VIEW IF EXISTS ogc_avg_tds_wells"))
259+
op.execute(text("DROP VIEW IF EXISTS ogc_latest_depth_to_water_wells"))
260+
for view_id, _ in THING_COLLECTIONS:
261+
op.execute(text(f"DROP VIEW IF EXISTS ogc_{view_id}"))

0 commit comments

Comments
 (0)