"""add normalized chemistry results materialized view

Revision ID: b6f7a8b9c0d1
Revises: l5e6f7a8b9c0
Create Date: 2026-03-04 14:10:00.000000
"""
| 7 | + |
| 8 | +from typing import Sequence, Union |
| 9 | + |
| 10 | +from alembic import op |
| 11 | +from sqlalchemy import inspect, text |
| 12 | + |
# Revision identifiers, used by Alembic to order this migration in the chain.
revision: str = "b6f7a8b9c0d1"
down_revision: Union[str, Sequence[str], None] = "l5e6f7a8b9c0"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
| 18 | + |
# Body of the ``latest_location`` CTE: one row per thing, taken from the
# association rows that have no ``effective_end``; when several such rows
# exist for a thing, the one with the newest ``effective_start`` is kept.
# The surrounding whitespace is stripped so the text can be embedded verbatim.
LATEST_LOCATION_CTE = """
SELECT DISTINCT ON (lta.thing_id)
    lta.thing_id,
    lta.location_id,
    lta.effective_start
FROM location_thing_association AS lta
WHERE lta.effective_end IS NULL
ORDER BY lta.thing_id, lta.effective_start DESC
""".strip()
| 28 | + |
# Static analyte columns for major chemistry pivots.
# Each entry is ``(analyte_key, column_name)``: ``analyte_key`` is the value
# produced by the view's CASE mapping, ``column_name`` is the output column
# (a ``<column_name>_units`` column is generated alongside it).
# Includes aliases observed in current DB values
# (e.g., Ca(total), IONBAL, TAn, TCat, Na+K).
STATIC_ANALYTE_COLUMNS: list[tuple[str, str]] = [
    ("tds", "tds"),
    ("calcium", "calcium"),
    ("calcium_total", "calcium_total"),
    ("magnesium", "magnesium"),
    ("magnesium_total", "magnesium_total"),
    ("sodium", "sodium"),
    ("sodium_total", "sodium_total"),
    ("potassium", "potassium"),
    ("potassium_total", "potassium_total"),
    ("sodium_plus_potassium", "sodium_plus_potassium"),
    ("bicarbonate", "bicarbonate"),
    ("carbonate", "carbonate"),
    ("sulfate", "sulfate"),
    ("chloride", "chloride"),
    ("ion_balance", "ion_balance"),
    ("total_anions", "total_anions"),
    ("total_cations", "total_cations"),
    ("alkalinity", "alkalinity"),
    ("hardness", "hardness"),
    ("specific_conductance", "specific_conductance"),
    ("ph", "ph"),
    ("nitrate", "nitrate"),
    ("fluoride", "fluoride"),
    ("silica", "silica"),
]
| 57 | + |
| 58 | + |
def _static_analyte_select_columns() -> str:
    """Return the SELECT-list fragment that pivots sample values.

    One ``MAX(lr.sample_value) FILTER (WHERE lr.analyte_key = ...)`` expression
    is emitted per entry of ``STATIC_ANALYTE_COLUMNS``, aliased to the entry's
    column name. Expressions are joined with ``",\\n"`` and carry no trailing
    comma, so the caller supplies the separator that follows the fragment.

    The keys and column names are interpolated directly into the SQL text;
    they come from the module-level constant, not from external input.
    """
    return ",\n".join(
        "        MAX(lr.sample_value) FILTER "
        f"(WHERE lr.analyte_key = '{analyte_key}') AS {column_name}"
        for analyte_key, column_name in STATIC_ANALYTE_COLUMNS
    )
| 69 | + |
| 70 | + |
def _static_analyte_unit_columns() -> str:
    """Return the SELECT-list fragment that pivots the units of each analyte.

    One ``MAX(lr.units) FILTER (WHERE lr.analyte_key = ...)`` expression is
    emitted per entry of ``STATIC_ANALYTE_COLUMNS``, aliased to
    ``<column_name>_units``. Expressions are joined with ``",\\n"`` and carry
    no trailing comma, so the caller supplies the separator that follows.

    The keys and column names are interpolated directly into the SQL text;
    they come from the module-level constant, not from external input.
    """
    return ",\n".join(
        "        MAX(lr.units) FILTER "
        f"(WHERE lr.analyte_key = '{analyte_key}') AS {column_name}_units"
        for analyte_key, column_name in STATIC_ANALYTE_COLUMNS
    )
| 81 | + |
| 82 | + |
def _create_normalized_chemistry_results_view() -> str:
    """Build the ``CREATE MATERIALIZED VIEW`` statement for the chemistry pivot.

    The statement is assembled from these CTEs, in order:

    * ``latest_location`` - the text of ``LATEST_LOCATION_CTE``.
    * ``chemistry_rows`` - ``NMA_MajorChemistry`` joined to
      ``NMA_Chemistry_SampleInfo``; rows with a NULL ``SampleValue`` are
      dropped, the value is cast to ``double precision``, and the observation
      timestamp is ``AnalysisDate`` falling back to ``CollectionDate``.
    * ``normalized_rows`` - analyte and symbol names are lower-cased and
      stripped of every non ``[a-z0-9]`` character; an empty result is NULL.
    * ``mapped_rows`` - a CASE expression turns the tokens into a canonical
      ``analyte_key`` (NULL when nothing matches).
    * ``latest_results`` - rows with a non-NULL key are numbered per
      ``(thing_id, analyte_key)``, newest observation first, ties broken by the
      higher ``result_id``.

    The final SELECT keeps only ``rn = 1`` rows and groups per thing, emitting
    the pivoted value and unit columns produced by the two helper functions,
    plus the location id and ``point`` of the thing's latest location. Things
    without a ``latest_location`` row are excluded by the inner join.

    NOTE(review): CASE branches are evaluated top to bottom and the short
    symbol/analyte branches (``ca``, ``mg``, ``na``, ``k``) come before the
    ``*total`` branches, so a row whose symbol token is ``ca`` and whose
    analyte token is ``catotal`` maps to ``calcium``, not ``calcium_total``.
    Confirm against real ``Symbol`` values that this is intended.

    Returns:
        The SQL text, ready to be passed to ``sqlalchemy.text``.
    """
    value_columns = _static_analyte_select_columns()
    unit_columns = _static_analyte_unit_columns()
    # The interpolated fragments sit at column 0 because they carry their own
    # indentation; whitespace is not significant to the SQL parser.
    return f"""
    CREATE MATERIALIZED VIEW ogc_normalized_chemistry_results AS
    WITH latest_location AS (
{LATEST_LOCATION_CTE}
    ),
    chemistry_rows AS (
        SELECT
            csi.thing_id,
            mc.id AS result_id,
            COALESCE(mc."AnalysisDate", csi."CollectionDate") AS observation_datetime,
            trim(mc."Analyte") AS analyte_name,
            trim(mc."Symbol") AS symbol_name,
            mc."SampleValue"::double precision AS sample_value,
            mc."Units" AS units
        FROM "NMA_MajorChemistry" AS mc
        JOIN "NMA_Chemistry_SampleInfo" AS csi
            ON csi.id = mc.chemistry_sample_info_id
        WHERE mc."SampleValue" IS NOT NULL
    ),
    normalized_rows AS (
        SELECT
            cr.thing_id,
            cr.result_id,
            cr.observation_datetime,
            NULLIF(
                regexp_replace(
                    lower(trim(coalesce(cr.analyte_name, ''))),
                    '[^a-z0-9]+',
                    '',
                    'g'
                ),
                ''
            ) AS analyte_token,
            NULLIF(
                regexp_replace(
                    lower(trim(coalesce(cr.symbol_name, ''))),
                    '[^a-z0-9]+',
                    '',
                    'g'
                ),
                ''
            ) AS symbol_token,
            cr.sample_value,
            cr.units
        FROM chemistry_rows AS cr
    ),
    mapped_rows AS (
        SELECT
            nr.thing_id,
            nr.result_id,
            nr.observation_datetime,
            CASE
                WHEN coalesce(nr.symbol_token, '') = 'tds'
                    OR coalesce(nr.analyte_token, '') IN ('tds', 'totaldissolvedsolids')
                    THEN 'tds'

                WHEN coalesce(nr.symbol_token, '') = 'ca'
                    OR coalesce(nr.analyte_token, '') = 'ca'
                    THEN 'calcium'
                WHEN coalesce(nr.analyte_token, '') = 'catotal'
                    THEN 'calcium_total'

                WHEN coalesce(nr.symbol_token, '') = 'mg'
                    OR coalesce(nr.analyte_token, '') = 'mg'
                    THEN 'magnesium'
                WHEN coalesce(nr.analyte_token, '') = 'mgtotal'
                    THEN 'magnesium_total'

                WHEN coalesce(nr.symbol_token, '') = 'na'
                    OR coalesce(nr.analyte_token, '') = 'na'
                    THEN 'sodium'
                WHEN coalesce(nr.analyte_token, '') = 'natotal'
                    THEN 'sodium_total'

                WHEN coalesce(nr.symbol_token, '') = 'k'
                    OR coalesce(nr.analyte_token, '') = 'k'
                    THEN 'potassium'
                WHEN coalesce(nr.analyte_token, '') = 'ktotal'
                    THEN 'potassium_total'

                WHEN coalesce(nr.analyte_token, '') = 'nak'
                    THEN 'sodium_plus_potassium'

                WHEN coalesce(nr.symbol_token, '') = 'hco3'
                    OR coalesce(nr.analyte_token, '') = 'hco3'
                    THEN 'bicarbonate'
                WHEN coalesce(nr.symbol_token, '') = 'co3'
                    OR coalesce(nr.analyte_token, '') = 'co3'
                    THEN 'carbonate'
                WHEN coalesce(nr.symbol_token, '') = 'so4'
                    OR coalesce(nr.analyte_token, '') = 'so4'
                    THEN 'sulfate'
                WHEN coalesce(nr.symbol_token, '') = 'cl'
                    OR coalesce(nr.analyte_token, '') = 'cl'
                    THEN 'chloride'

                WHEN coalesce(nr.analyte_token, '') = 'ionbal'
                    THEN 'ion_balance'
                WHEN coalesce(nr.analyte_token, '') = 'tan'
                    THEN 'total_anions'
                WHEN coalesce(nr.analyte_token, '') = 'tcat'
                    THEN 'total_cations'

                WHEN coalesce(nr.analyte_token, '') IN ('alk', 'alkalinity')
                    THEN 'alkalinity'
                WHEN coalesce(nr.analyte_token, '') IN ('hrd', 'hardness')
                    THEN 'hardness'
                WHEN coalesce(nr.analyte_token, '') IN (
                    'condlab',
                    'specificconductance',
                    'specificconductivity',
                    'conductivity'
                )
                    THEN 'specific_conductance'
                WHEN coalesce(nr.symbol_token, '') = 'ph'
                    OR coalesce(nr.analyte_token, '') IN ('ph', 'phl')
                    THEN 'ph'

                WHEN coalesce(nr.symbol_token, '') = 'no3'
                    OR coalesce(nr.analyte_token, '') IN ('no3', 'nitrate')
                    THEN 'nitrate'
                WHEN coalesce(nr.symbol_token, '') = 'f'
                    OR coalesce(nr.analyte_token, '') IN ('f', 'fluoride')
                    THEN 'fluoride'
                WHEN coalesce(nr.symbol_token, '') = 'sio2'
                    OR coalesce(nr.analyte_token, '') IN ('sio2', 'silica')
                    THEN 'silica'

                ELSE NULL
            END AS analyte_key,
            nr.sample_value,
            nr.units
        FROM normalized_rows AS nr
    ),
    latest_results AS (
        SELECT
            mr.thing_id,
            mr.analyte_key,
            mr.sample_value,
            mr.units,
            mr.observation_datetime,
            ROW_NUMBER() OVER (
                PARTITION BY mr.thing_id, mr.analyte_key
                ORDER BY mr.observation_datetime DESC NULLS LAST, mr.result_id DESC
            ) AS rn
        FROM mapped_rows AS mr
        WHERE mr.analyte_key IS NOT NULL
    )
    SELECT
        t.id AS id,
        ll.location_id,
        t.name,
        t.thing_type,
        COUNT(*)::integer AS analyte_count,
        MAX(lr.observation_datetime::date) AS latest_chemistry_date,
{value_columns},
{unit_columns},
        l.point
    FROM latest_results AS lr
    JOIN thing AS t ON t.id = lr.thing_id
    JOIN latest_location AS ll ON ll.thing_id = t.id
    JOIN location AS l ON l.id = ll.location_id
    WHERE lr.rn = 1
    GROUP BY t.id, ll.location_id, t.name, t.thing_type, l.point
    """
| 251 | + |
| 252 | + |
def upgrade() -> None:
    """Create the ``ogc_normalized_chemistry_results`` materialized view.

    Steps, in order:

    1. Check that every table the view reads exists in the ``public`` schema.
    2. Drop any existing view of the same name, then create it.
    3. Attach a descriptive comment to the view.
    4. Create a unique index on ``id``.

    NOTE(review): the unique index is presumably there so the view can be
    refreshed with ``REFRESH MATERIALIZED VIEW CONCURRENTLY`` - confirm.
    NOTE(review): the COMMENT text says "per location", but rows are keyed by
    ``thing.id`` (the ``id`` column); confirm the wording is intended.

    Raises:
        RuntimeError: if one or more required tables are missing; the message
            lists the missing table names in sorted order.
    """
    required_tables = {
        "thing",
        "location",
        "location_thing_association",
        "NMA_Chemistry_SampleInfo",
        "NMA_MajorChemistry",
    }
    inspector = inspect(op.get_bind())
    # Compute the missing set once; it serves as both the guard and the message.
    missing = sorted(
        required_tables.difference(inspector.get_table_names(schema="public"))
    )
    if missing:
        raise RuntimeError(
            "Cannot create ogc_normalized_chemistry_results. Missing required tables: "
            + ", ".join(missing)
        )

    # Drop first so re-running the migration on a database that already has
    # the view does not fail on CREATE.
    op.execute(
        text("DROP MATERIALIZED VIEW IF EXISTS ogc_normalized_chemistry_results")
    )
    op.execute(text(_create_normalized_chemistry_results_view()))
    op.execute(
        text(
            "COMMENT ON MATERIALIZED VIEW ogc_normalized_chemistry_results IS "
            "'Latest major-chemistry analyte values per location, pivoted into static analyte columns.'"
        )
    )
    op.execute(
        text(
            "CREATE UNIQUE INDEX ux_ogc_normalized_chemistry_results_id "
            "ON ogc_normalized_chemistry_results (id)"
        )
    )
| 288 | + |
| 289 | + |
def downgrade() -> None:
    """Drop the ``ogc_normalized_chemistry_results`` materialized view.

    ``IF EXISTS`` makes the downgrade a no-op when the view is already gone.
    No separate statement drops the unique index created by ``upgrade``;
    PostgreSQL is expected to remove it together with the view (confirm).
    """
    op.execute(
        text("DROP MATERIALIZED VIEW IF EXISTS ogc_normalized_chemistry_results")
    )
0 commit comments