Skip to content

Commit 924652c

Browse files
committed
feat: add materialized views for latest TDS, depth to water trend, and water well summary
1 parent a18c1d8 commit 924652c

9 files changed

Lines changed: 701 additions & 28 deletions

alembic/versions/d5e6f7a8b9c0_create_pygeoapi_supporting_views.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,7 @@ def _create_thing_view(view_id: str, thing_type: str) -> str:
7171
SELECT
7272
t.id,
7373
t.name,
74-
t.thing_type,
7574
t.first_visit_date,
76-
t.spring_type,
7775
t.nma_pk_welldata,
7876
t.well_depth,
7977
t.hole_depth,
@@ -87,6 +85,7 @@ def _create_thing_view(view_id: str, thing_type: str) -> str:
8785
t.formation_completion_code,
8886
t.nma_formation_zone,
8987
t.release_status,
88+
l.elevation,
9089
l.point
9190
FROM thing AS t
9291
JOIN latest_location AS ll ON ll.thing_id = t.id
@@ -152,7 +151,7 @@ def _create_avg_tds_view() -> str:
152151
SELECT
153152
csi.thing_id,
154153
mc.id AS major_chemistry_id,
155-
mc."AnalysisDate" AS analysis_date,
154+
COALESCE(mc."AnalysisDate", csi."CollectionDate")::date AS observation_date,
156155
mc."SampleValue" AS sample_value,
157156
mc."Units" AS units
158157
FROM "NMA_MajorChemistry" AS mc
@@ -176,8 +175,8 @@ def _create_avg_tds_view() -> str:
176175
t.thing_type,
177176
COUNT(to2.major_chemistry_id)::integer AS tds_observation_count,
178177
AVG(to2.sample_value)::double precision AS avg_tds_value,
179-
MIN(to2.analysis_date) AS first_tds_observation_datetime,
180-
MAX(to2.analysis_date) AS latest_tds_observation_datetime,
178+
MIN(to2.observation_date) AS first_tds_observation_date,
179+
MAX(to2.observation_date) AS last_tds_observation_date,
181180
l.point
182181
FROM tds_obs AS to2
183182
JOIN thing AS t ON t.id = to2.thing_id
Lines changed: 223 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
"""add latest tds pygeoapi materialized view
2+
3+
Revision ID: i2b3c4d5e6f7
4+
Revises: d5e6f7a8b9c0
5+
Create Date: 2026-03-02 11:00:00.000000
6+
"""
7+
8+
from typing import Sequence, Union
9+
10+
from alembic import op
11+
from sqlalchemy import inspect, text
12+
13+
# revision identifiers, used by Alembic.
14+
revision: str = "i2b3c4d5e6f7"
15+
down_revision: Union[str, Sequence[str], None] = "d5e6f7a8b9c0"
16+
branch_labels: Union[str, Sequence[str], None] = None
17+
depends_on: Union[str, Sequence[str], None] = None
18+
19+
LATEST_LOCATION_CTE = """
20+
SELECT DISTINCT ON (lta.thing_id)
21+
lta.thing_id,
22+
lta.location_id,
23+
lta.effective_start
24+
FROM location_thing_association AS lta
25+
WHERE lta.effective_end IS NULL
26+
ORDER BY lta.thing_id, lta.effective_start DESC
27+
""".strip()
28+
29+
30+
def _create_latest_tds_view() -> str:
31+
return f"""
32+
CREATE VIEW ogc_latest_tds_wells AS
33+
WITH latest_location AS (
34+
{LATEST_LOCATION_CTE}
35+
),
36+
tds_obs AS (
37+
SELECT
38+
csi.thing_id,
39+
mc.id AS major_chemistry_id,
40+
COALESCE(mc."AnalysisDate", csi."CollectionDate")::date AS observation_date,
41+
mc."SampleValue" AS sample_value,
42+
mc."Units" AS units
43+
FROM "NMA_MajorChemistry" AS mc
44+
JOIN "NMA_Chemistry_SampleInfo" AS csi
45+
ON csi.id = mc.chemistry_sample_info_id
46+
JOIN thing AS t ON t.id = csi.thing_id
47+
WHERE
48+
t.thing_type = 'water well'
49+
AND mc."SampleValue" IS NOT NULL
50+
AND (
51+
lower(coalesce(mc."Analyte", '')) IN (
52+
'tds',
53+
'total dissolved solids'
54+
)
55+
OR lower(coalesce(mc."Symbol", '')) = 'tds'
56+
)
57+
),
58+
ranked_tds AS (
59+
SELECT
60+
to2.thing_id,
61+
to2.major_chemistry_id,
62+
to2.observation_date,
63+
to2.sample_value,
64+
to2.units,
65+
ROW_NUMBER() OVER (
66+
PARTITION BY to2.thing_id
67+
ORDER BY to2.observation_date DESC NULLS LAST, to2.major_chemistry_id DESC
68+
) AS rn
69+
FROM tds_obs AS to2
70+
)
71+
SELECT
72+
t.id AS id,
73+
t.name,
74+
t.thing_type,
75+
rt.major_chemistry_id,
76+
rt.observation_date AS latest_tds_observation_date,
77+
rt.sample_value AS latest_tds_value,
78+
rt.units AS latest_tds_units,
79+
l.point
80+
FROM ranked_tds AS rt
81+
JOIN thing AS t ON t.id = rt.thing_id
82+
JOIN latest_location AS ll ON ll.thing_id = t.id
83+
JOIN location AS l ON l.id = ll.location_id
84+
WHERE rt.rn = 1
85+
"""
86+
87+
88+
def _create_avg_tds_view() -> str:
89+
return f"""
90+
CREATE MATERIALIZED VIEW ogc_avg_tds_wells AS
91+
WITH latest_location AS (
92+
{LATEST_LOCATION_CTE}
93+
),
94+
tds_obs AS (
95+
SELECT
96+
csi.thing_id,
97+
mc.id AS major_chemistry_id,
98+
COALESCE(mc."AnalysisDate", csi."CollectionDate")::date AS observation_date,
99+
mc."SampleValue" AS sample_value,
100+
mc."Units" AS units
101+
FROM "NMA_MajorChemistry" AS mc
102+
JOIN "NMA_Chemistry_SampleInfo" AS csi
103+
ON csi.id = mc.chemistry_sample_info_id
104+
JOIN thing AS t ON t.id = csi.thing_id
105+
WHERE
106+
t.thing_type = 'water well'
107+
AND mc."SampleValue" IS NOT NULL
108+
AND (
109+
lower(coalesce(mc."Analyte", '')) IN (
110+
'tds',
111+
'total dissolved solids'
112+
)
113+
OR lower(coalesce(mc."Symbol", '')) = 'tds'
114+
)
115+
)
116+
SELECT
117+
t.id AS id,
118+
t.name,
119+
t.thing_type,
120+
COUNT(to2.major_chemistry_id)::integer AS tds_observation_count,
121+
AVG(to2.sample_value)::double precision AS avg_tds_value,
122+
MIN(to2.observation_date) AS first_tds_observation_date,
123+
MAX(to2.observation_date) AS last_tds_observation_date,
124+
l.point
125+
FROM tds_obs AS to2
126+
JOIN thing AS t ON t.id = to2.thing_id
127+
JOIN latest_location AS ll ON ll.thing_id = t.id
128+
JOIN location AS l ON l.id = ll.location_id
129+
GROUP BY t.id, t.name, t.thing_type, l.point
130+
"""
131+
132+
133+
def _create_avg_tds_view_with_datetime_columns() -> str:
134+
return f"""
135+
CREATE MATERIALIZED VIEW ogc_avg_tds_wells AS
136+
WITH latest_location AS (
137+
{LATEST_LOCATION_CTE}
138+
),
139+
tds_obs AS (
140+
SELECT
141+
csi.thing_id,
142+
mc.id AS major_chemistry_id,
143+
mc."AnalysisDate" AS analysis_date,
144+
mc."SampleValue" AS sample_value,
145+
mc."Units" AS units
146+
FROM "NMA_MajorChemistry" AS mc
147+
JOIN "NMA_Chemistry_SampleInfo" AS csi
148+
ON csi.id = mc.chemistry_sample_info_id
149+
JOIN thing AS t ON t.id = csi.thing_id
150+
WHERE
151+
t.thing_type = 'water well'
152+
AND mc."SampleValue" IS NOT NULL
153+
AND (
154+
lower(coalesce(mc."Analyte", '')) IN (
155+
'tds',
156+
'total dissolved solids'
157+
)
158+
OR lower(coalesce(mc."Symbol", '')) = 'tds'
159+
)
160+
)
161+
SELECT
162+
t.id AS id,
163+
t.name,
164+
t.thing_type,
165+
COUNT(to2.major_chemistry_id)::integer AS tds_observation_count,
166+
AVG(to2.sample_value)::double precision AS avg_tds_value,
167+
MIN(to2.analysis_date::date) AS first_tds_observation_date,
168+
MAX(to2.analysis_date::date) AS last_tds_observation_date,
169+
l.point
170+
FROM tds_obs AS to2
171+
JOIN thing AS t ON t.id = to2.thing_id
172+
JOIN latest_location AS ll ON ll.thing_id = t.id
173+
JOIN location AS l ON l.id = ll.location_id
174+
GROUP BY t.id, t.name, t.thing_type, l.point
175+
"""
176+
177+
178+
def upgrade() -> None:
179+
bind = op.get_bind()
180+
inspector = inspect(bind)
181+
existing_tables = set(inspector.get_table_names(schema="public"))
182+
required_tds = {"NMA_MajorChemistry", "NMA_Chemistry_SampleInfo"}
183+
184+
if not required_tds.issubset(existing_tables):
185+
missing_tds_tables = sorted(t for t in required_tds if t not in existing_tables)
186+
missing_tds_tables_str = ", ".join(missing_tds_tables)
187+
raise RuntimeError(
188+
"Cannot create TDS views. The following required "
189+
f"tables are missing: {missing_tds_tables_str}"
190+
)
191+
192+
op.execute(text("DROP MATERIALIZED VIEW IF EXISTS ogc_avg_tds_wells"))
193+
op.execute(text("DROP VIEW IF EXISTS ogc_latest_tds_wells"))
194+
op.execute(text("DROP MATERIALIZED VIEW IF EXISTS ogc_latest_tds_wells"))
195+
196+
op.execute(text(_create_avg_tds_view()))
197+
op.execute(
198+
text(
199+
"COMMENT ON MATERIALIZED VIEW ogc_avg_tds_wells IS "
200+
"'Average TDS per well from major chemistry results for pygeoapi.'"
201+
)
202+
)
203+
op.execute(
204+
text("CREATE UNIQUE INDEX ux_ogc_avg_tds_wells_id " "ON ogc_avg_tds_wells (id)")
205+
)
206+
207+
op.execute(text(_create_latest_tds_view()))
208+
op.execute(
209+
text(
210+
"COMMENT ON VIEW ogc_latest_tds_wells IS "
211+
"'Latest TDS per well from major chemistry results for pygeoapi.'"
212+
)
213+
)
214+
215+
216+
def downgrade() -> None:
217+
op.execute(text("DROP MATERIALIZED VIEW IF EXISTS ogc_avg_tds_wells"))
218+
op.execute(text("DROP VIEW IF EXISTS ogc_latest_tds_wells"))
219+
op.execute(text("DROP MATERIALIZED VIEW IF EXISTS ogc_latest_tds_wells"))
220+
op.execute(text(_create_avg_tds_view_with_datetime_columns()))
221+
op.execute(
222+
text("CREATE UNIQUE INDEX ux_ogc_avg_tds_wells_id " "ON ogc_avg_tds_wells (id)")
223+
)
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
"""add depth to water trend materialized view
2+
3+
Revision ID: k4d5e6f7a8b9
4+
Revises: i2b3c4d5e6f7
5+
Create Date: 2026-03-02 19:15:00.000000
6+
"""
7+
8+
from typing import Sequence, Union
9+
10+
from alembic import op
11+
from sqlalchemy import inspect, text
12+
13+
# revision identifiers, used by Alembic.
14+
revision: str = "k4d5e6f7a8b9"
15+
down_revision: Union[str, Sequence[str], None] = "i2b3c4d5e6f7"
16+
branch_labels: Union[str, Sequence[str], None] = None
17+
depends_on: Union[str, Sequence[str], None] = None
18+
19+
LATEST_LOCATION_CTE = """
20+
SELECT DISTINCT ON (lta.thing_id)
21+
lta.thing_id,
22+
lta.location_id,
23+
lta.effective_start
24+
FROM location_thing_association AS lta
25+
WHERE lta.effective_end IS NULL
26+
ORDER BY lta.thing_id, lta.effective_start DESC
27+
""".strip()
28+
29+
30+
def _create_depth_to_water_trend_view() -> str:
31+
return f"""
32+
CREATE MATERIALIZED VIEW ogc_depth_to_water_trend_wells AS
33+
WITH latest_location AS (
34+
{LATEST_LOCATION_CTE}
35+
),
36+
obs AS (
37+
SELECT
38+
fe.thing_id,
39+
o.observation_datetime,
40+
(o.value - COALESCE(o.measuring_point_height, 0)) AS depth_to_water_bgs
41+
FROM observation AS o
42+
JOIN sample AS s ON s.id = o.sample_id
43+
JOIN field_activity AS fa ON fa.id = s.field_activity_id
44+
JOIN field_event AS fe ON fe.id = fa.field_event_id
45+
JOIN thing AS t ON t.id = fe.thing_id
46+
WHERE
47+
t.thing_type = 'water well'
48+
AND fa.activity_type = 'groundwater level'
49+
AND o.value IS NOT NULL
50+
AND o.observation_datetime IS NOT NULL
51+
),
52+
agg AS (
53+
SELECT
54+
ob.thing_id,
55+
COUNT(*)::integer AS record_count,
56+
MIN(ob.observation_datetime) AS first_observation_datetime,
57+
MAX(ob.observation_datetime) AS last_observation_datetime,
58+
EXTRACT(EPOCH FROM (MAX(ob.observation_datetime) - MIN(ob.observation_datetime)))
59+
/ 31557600.0 AS span_years,
60+
REGR_SLOPE(
61+
ob.depth_to_water_bgs,
62+
EXTRACT(EPOCH FROM ob.observation_datetime)
63+
) * 31557600.0 AS slope_ft_per_year
64+
FROM obs AS ob
65+
GROUP BY ob.thing_id
66+
)
67+
SELECT
68+
t.id AS id,
69+
t.name,
70+
t.thing_type,
71+
a.record_count,
72+
a.first_observation_datetime,
73+
a.last_observation_datetime,
74+
a.span_years,
75+
a.slope_ft_per_year,
76+
CASE
77+
WHEN a.record_count >= 10 OR (a.record_count >= 4 AND a.span_years >= 2.0) THEN
78+
CASE
79+
WHEN a.slope_ft_per_year > 0.25 THEN 'increasing'
80+
WHEN a.slope_ft_per_year < -0.25 THEN 'decreasing'
81+
ELSE 'stable'
82+
END
83+
ELSE 'not enough data'
84+
END AS trend_category,
85+
l.point
86+
FROM agg AS a
87+
JOIN thing AS t ON t.id = a.thing_id
88+
JOIN latest_location AS ll ON ll.thing_id = t.id
89+
JOIN location AS l ON l.id = ll.location_id
90+
"""
91+
92+
93+
def upgrade() -> None:
94+
bind = op.get_bind()
95+
inspector = inspect(bind)
96+
existing_tables = set(inspector.get_table_names(schema="public"))
97+
required_tables = {
98+
"thing",
99+
"location",
100+
"location_thing_association",
101+
"observation",
102+
"sample",
103+
"field_activity",
104+
"field_event",
105+
}
106+
107+
if not required_tables.issubset(existing_tables):
108+
missing = sorted(t for t in required_tables if t not in existing_tables)
109+
raise RuntimeError(
110+
"Cannot create ogc_depth_to_water_trend_wells. Missing required tables: "
111+
+ ", ".join(missing)
112+
)
113+
114+
op.execute(text("DROP MATERIALIZED VIEW IF EXISTS ogc_depth_to_water_trend_wells"))
115+
op.execute(text(_create_depth_to_water_trend_view()))
116+
op.execute(
117+
text(
118+
"COMMENT ON MATERIALIZED VIEW ogc_depth_to_water_trend_wells IS "
119+
"'Depth-to-water trend classification for water wells.'"
120+
)
121+
)
122+
op.execute(
123+
text(
124+
"CREATE UNIQUE INDEX ux_ogc_depth_to_water_trend_wells_id "
125+
"ON ogc_depth_to_water_trend_wells (id)"
126+
)
127+
)
128+
129+
130+
def downgrade() -> None:
131+
op.execute(text("DROP MATERIALIZED VIEW IF EXISTS ogc_depth_to_water_trend_wells"))

0 commit comments

Comments
 (0)