From 701899b68aa1ab58a3d5ad1f37b4bfe1edd3ab9d Mon Sep 17 00:00:00 2001 From: helloiamvu Date: Sat, 6 Jun 2026 14:37:44 +0200 Subject: [PATCH 1/6] fix(pairs): discriminate OM/IEM forecasts by source, not issued_at (#67) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 20+ Open-Meteo rows carry a derived issued_at, so the old issued_at-presence split in build_pairs_row() misrouted them into the IEM MOS aggregation path — silently nulling forecast temps and polluting IEM run-selection when both sources are combined. Discriminate by the authoritative source field instead (open_meteo* -> Open-Meteo, else IEM), matching the contract research._fetch_open_meteo_range already documents. Also teach _aggregate_fcst_temps_openmeteo to fall back to a pre-converted temperature_f so source-discriminated rows from the research() wrapper (which emits temperature_f, not temperature_c) aggregate correctly instead of returning null. TDD: 3 new regression tests in test_pairs.py cover the derived-issued_at classification, IEM-still-preferred, and the research-wrapper temperature_f shape. --- .../core/src/mostlyright/_internal/_pairs.py | 47 ++++++--- packages/core/tests/_internal/test_pairs.py | 98 ++++++++++++++++++- 2 files changed, 128 insertions(+), 17 deletions(-) diff --git a/packages/core/src/mostlyright/_internal/_pairs.py b/packages/core/src/mostlyright/_internal/_pairs.py index f152d4ad..456191c9 100644 --- a/packages/core/src/mostlyright/_internal/_pairs.py +++ b/packages/core/src/mostlyright/_internal/_pairs.py @@ -209,9 +209,13 @@ def _aggregate_fcst_temps_openmeteo( window_start_iso: str, window_end_iso: str, ) -> tuple[float | None, float | None]: - """Aggregate Open-Meteo hourly temperature_c (-> F) over the settlement window. + """Aggregate Open-Meteo hourly temperature (-> F) over the settlement window. - Open-Meteo stores temperature in Celsius. Conversion: F = C * 9/5 + 32. + Open-Meteo rows store temperature in Celsius under ``temperature_c``. + Conversion: F = C * 9/5 + 32. As a fallback, rows that already carry a + pre-converted ``temperature_f`` (the shape ``research._fetch_open_meteo_range`` + emits) are used as-is — without this fallback, source-discriminated OM rows + from the research() wrapper would aggregate to null (issue #67). Args: run_records: All Open-Meteo hourly records for the date. @@ -221,12 +225,17 @@ def _aggregate_fcst_temps_openmeteo( Returns: (fcst_high_f, fcst_low_f) or (None, None) if no records in window. """ - temps_f = [ - r["temperature_c"] * 9 / 5 + 32 - for r in run_records - if r.get("temperature_c") is not None - and window_start_iso <= r.get("valid_at", "") <= window_end_iso - ] + temps_f: list[float] = [] + for r in run_records: + if not (window_start_iso <= r.get("valid_at", "") <= window_end_iso): + continue + temp_c = r.get("temperature_c") + if temp_c is not None: + temps_f.append(temp_c * 9 / 5 + 32) + continue + temp_f = r.get("temperature_f") + if temp_f is not None: + temps_f.append(temp_f) return (max(temps_f), min(temps_f)) if temps_f else (None, None) @@ -250,7 +259,10 @@ def build_pairs_row( climate: NWS CLI record for the date (or None). Must be a dict or None - non-dict values are treated as None. forecasts: All forecast records with valid_at (or None if unavailable). - IEM MOS records have issued_at; Open-Meteo records do not. + Records are split by their ``source`` field: ``source`` prefixed + ``open_meteo`` -> Open-Meteo, everything else (e.g. + ``source="iem.archive"``) -> IEM MOS. ``issued_at`` is NOT used as + the discriminator (Phase 20+ Open-Meteo rows carry one too). forecast_model: Filter IEM MOS records to this model before run selection. None = no filtering (best available run). tz_override: IANA timezone name override for stations not in the known @@ -295,9 +307,20 @@ def build_pairs_row( win_start_iso = win_start.strftime("%Y-%m-%dT%H:%M:%SZ") win_end_iso = win_end.strftime("%Y-%m-%dT%H:%M:%SZ") - # Separate IEM MOS (has issued_at) from Open-Meteo (no issued_at) - iem_records = [r for r in forecasts if r.get("issued_at")] - om_records = [r for r in forecasts if not r.get("issued_at")] + # Separate IEM MOS from Open-Meteo by the authoritative ``source`` + # field (issue #67). ``issued_at`` presence is NOT a valid + # discriminator: Phase 20+ Open-Meteo rows carry a derived + # ``issued_at`` (for cycle-math / previous-runs caching), so the old + # ``issued_at``-based split misrouted those rows into the IEM MOS + # aggregation path, silently nulling forecast temps and polluting IEM + # run-selection. IEM rows carry ``source="iem.archive"``; Open-Meteo + # rows carry ``source`` prefixed ``open_meteo`` (.previous_runs / + # .single_run / .seamless / .live). research._fetch_open_meteo_range + # already documents this contract ("discriminates via row.get('source')"). + om_records = [r for r in forecasts if str(r.get("source") or "").startswith("open_meteo")] + iem_records = [ + r for r in forecasts if not str(r.get("source") or "").startswith("open_meteo") + ] # Apply forecast_model filter to IEM MOS records before run selection. # Phase 17 Wave 4 iter-3 review HIGH: case-insensitive match because diff --git a/packages/core/tests/_internal/test_pairs.py b/packages/core/tests/_internal/test_pairs.py index 1fa4549c..a73f1b36 100644 --- a/packages/core/tests/_internal/test_pairs.py +++ b/packages/core/tests/_internal/test_pairs.py @@ -86,18 +86,34 @@ def _iem_record( def _om_record( valid_at: str, - temperature_c: float = 28.0, + temperature_c: float | None = 28.0, model: str = "open-meteo-gfs", precipitation_probability_pct: float | None = None, + source: str = "open_meteo.previous_runs", + issued_at: str | None = None, + temperature_f: float | None = None, ) -> dict: - """Open-Meteo hourly forecast record matching specs/forecast_series.json.""" - return { + """Open-Meteo hourly forecast record matching specs/forecast_series.json. + + Real Open-Meteo rows always carry a ``source`` prefixed ``open_meteo`` — + that field (NOT ``issued_at`` presence) is the authoritative discriminator + from IEM MOS (issue #67). Phase 20+ rows also carry a derived ``issued_at``. + ``temperature_f`` is accepted to mirror the pre-converted shape that + ``research._fetch_open_meteo_range`` emits. + """ + rec: dict = { "valid_at": valid_at, - "temperature_c": temperature_c, "model": model, "precipitation_probability_pct": precipitation_probability_pct, - # No issued_at - this distinguishes Open-Meteo from IEM MOS + "source": source, } + if temperature_c is not None: + rec["temperature_c"] = temperature_c + if temperature_f is not None: + rec["temperature_f"] = temperature_f + if issued_at is not None: + rec["issued_at"] = issued_at + return rec # --------------------------------------------------------------------------- @@ -414,6 +430,78 @@ def test_open_meteo_fallback_when_iem_yields_no_window_data(self) -> None: assert row["fcst_high_f"] == pytest.approx(86.0) assert row["fcst_model"] == "open-meteo-gfs" + # ----- issue #67: source-based OM/IEM discrimination ----- + + def test_om_with_derived_issued_at_classified_by_source(self) -> None: + """Issue #67: Phase 20+ OM rows carry a derived ``issued_at`` but must + still be classified as Open-Meteo via their ``source`` prefix, not + misrouted into the IEM MOS aggregation path.""" + om = [ + _om_record( + "2024-07-04T08:00:00Z", + temperature_c=20.0, # 68F + source="open_meteo.previous_runs", + issued_at="2024-07-04T06:00:00Z", + ), + _om_record( + "2024-07-04T14:00:00Z", + temperature_c=32.0, # 89.6F + source="open_meteo.previous_runs", + issued_at="2024-07-04T06:00:00Z", + ), + ] + row = build_pairs_row("2024-07-04", "NYC", [], None, om) + assert row["fcst_high_f"] == pytest.approx(89.6) + assert row["fcst_low_f"] == pytest.approx(68.0) + assert row["fcst_model"] == "open-meteo-gfs" + + def test_iem_preferred_over_om_even_when_om_has_issued_at(self) -> None: + """Issue #67: a hot OM row carrying ``issued_at`` must NOT pollute the + IEM MOS run-selection; IEM still wins and OM stays a fallback.""" + iem = [ + _iem_record( + "2024-07-04T12:00:00Z", + "2024-07-04T14:00:00Z", + temperature_f=89.0, + model="GFS", + ) + ] + om = [ + _om_record( + "2024-07-04T14:00:00Z", + temperature_c=99.0, # very hot - must not win, must not corrupt IEM run + source="open_meteo.previous_runs", + issued_at="2024-07-04T06:00:00Z", + ) + ] + row = build_pairs_row("2024-07-04", "NYC", [], None, iem + om) + assert row["fcst_high_f"] == 89.0 + assert row["fcst_model"] == "GFS" + + def test_om_research_wrapper_shape_temperature_f(self) -> None: + """Issue #67: rows shaped like ``_fetch_open_meteo_range`` output — + ``source`` + derived ``issued_at`` + pre-converted ``temperature_f`` + (no ``temperature_c``) — must still aggregate, not return null.""" + om = [ + _om_record( + "2024-07-04T08:00:00Z", + temperature_c=None, + temperature_f=68.0, + source="open_meteo.previous_runs", + issued_at="2024-07-04T06:00:00Z", + ), + _om_record( + "2024-07-04T14:00:00Z", + temperature_c=None, + temperature_f=89.6, + source="open_meteo.previous_runs", + issued_at="2024-07-04T06:00:00Z", + ), + ] + row = build_pairs_row("2024-07-04", "NYC", [], None, om) + assert row["fcst_high_f"] == pytest.approx(89.6) + assert row["fcst_low_f"] == pytest.approx(68.0) + def test_fcst_pop_6hr_pct_from_iem(self) -> None: records = [ _iem_record("2024-07-04T12:00:00Z", "2024-07-04T08:00:00Z", pop_6hr_pct=20.0), From 9ca5b4886f1e20ea1a85d49de67c58f48e3fb89f Mon Sep 17 00:00:00 2001 From: helloiamvu Date: Sat, 6 Jun 2026 14:40:03 +0200 Subject: [PATCH 2/6] docs(pairs): update module docstring to reflect source-based forecast split (#67) --- .../core/src/mostlyright/_internal/_pairs.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/packages/core/src/mostlyright/_internal/_pairs.py b/packages/core/src/mostlyright/_internal/_pairs.py index 456191c9..c6e06a2b 100644 --- a/packages/core/src/mostlyright/_internal/_pairs.py +++ b/packages/core/src/mostlyright/_internal/_pairs.py @@ -31,12 +31,16 @@ This is the primary training/feature surface for AI settlement models. -Forecast join: - - IEM MOS records (`forecast.json`) have `issued_at`; grouped by issued_at to - pick the most-recent run before market close, then temperature_f values for - valid_at timestamps within the settlement window are aggregated (max/min). - - Open-Meteo records (`forecast_series.json`) have no issued_at; all records - in the settlement window are used. temperature_c is converted to F. +Forecast join (records are split by their authoritative ``source`` field — +``source`` prefixed ``open_meteo`` -> Open-Meteo, else IEM MOS; see issue #67): + - IEM MOS records (`forecast.json`, `source="iem.archive"`) are grouped by + issued_at to pick the most-recent run before market close, then + temperature_f values for valid_at timestamps within the settlement window + are aggregated (max/min). + - Open-Meteo records (`forecast_series.json`, `source="open_meteo.*"`) use + all records in the settlement window. temperature_c is converted to F + (or a pre-converted temperature_f is used as-is). NOTE: Phase 20+ OM rows + also carry a derived `issued_at`, so `issued_at` is NOT the discriminator. - If both are available, IEM MOS is preferred. Open-Meteo used as fallback. - If forecast data is unavailable, forecast columns are None - the row is still returned. From 5d9187ba812ec5cad2af6fe1ec2f97c8050433a7 Mon Sep 17 00:00:00 2001 From: helloiamvu Date: Sat, 6 Jun 2026 14:45:22 +0200 Subject: [PATCH 3/6] fix(pairs): preserve OM pop/qpf after source routing (#67, codex P2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Codex review caught a regression: research._fetch_open_meteo_range emits OM rows carrying pop_6hr_pct / qpf_6hr_in (not precipitation_probability_pct and no QPF read). Pre-#67 those rows flowed through the IEM branch, which set both fields; after source-routing they hit the OM branch, which only read precipitation_probability_pct and never set QPF — silently nulling precip columns for research(forecast_source="open_meteo"). OM branch now accepts the pop_6hr_pct alias (explicit None-checks keep a valid 0.0) and sums qpf_6hr_in over the window, matching IEM semantics. +2 regression tests. --- .../core/src/mostlyright/_internal/_pairs.py | 24 +++++++-- packages/core/tests/_internal/test_pairs.py | 49 +++++++++++++++++++ 2 files changed, 68 insertions(+), 5 deletions(-) diff --git a/packages/core/src/mostlyright/_internal/_pairs.py b/packages/core/src/mostlyright/_internal/_pairs.py index c6e06a2b..5d1fee8a 100644 --- a/packages/core/src/mostlyright/_internal/_pairs.py +++ b/packages/core/src/mostlyright/_internal/_pairs.py @@ -373,12 +373,26 @@ def build_pairs_row( window_om = [ r for r in om_records if win_start_iso <= r.get("valid_at", "") <= win_end_iso ] - probs = [ - r["precipitation_probability_pct"] - for r in window_om - if r.get("precipitation_probability_pct") is not None - ] + # POP: accept the unit-contract ``precipitation_probability_pct`` + # OR the ``pop_6hr_pct`` alias that research._fetch_open_meteo_range + # emits. Without the alias, source-discriminated wrapper rows (which + # carry pop_6hr_pct, not precipitation_probability_pct) would regress + # POP to None now that they no longer flow through the IEM branch + # (issue #67). Explicit None-checks preserve a valid 0.0 reading. + probs: list[float] = [] + for r in window_om: + p = r.get("precipitation_probability_pct") + if p is None: + p = r.get("pop_6hr_pct") + if p is not None: + probs.append(p) fcst_pop = max(probs) if probs else None + # QPF: the OM unit-contract shape carries no QPF, but the research + # wrapper emits ``qpf_6hr_in`` — sum it over the window to match the + # IEM-branch semantics (else wrapper QPF regresses to None too). + qpfs_om = [r["qpf_6hr_in"] for r in window_om if r.get("qpf_6hr_in") is not None] + if qpfs_om: + fcst_qpf = sum(qpfs_om) fcst.update( { diff --git a/packages/core/tests/_internal/test_pairs.py b/packages/core/tests/_internal/test_pairs.py index a73f1b36..cfc05ed5 100644 --- a/packages/core/tests/_internal/test_pairs.py +++ b/packages/core/tests/_internal/test_pairs.py @@ -92,6 +92,8 @@ def _om_record( source: str = "open_meteo.previous_runs", issued_at: str | None = None, temperature_f: float | None = None, + pop_6hr_pct: float | None = None, + qpf_6hr_in: float | None = None, ) -> dict: """Open-Meteo hourly forecast record matching specs/forecast_series.json. @@ -113,6 +115,10 @@ def _om_record( rec["temperature_f"] = temperature_f if issued_at is not None: rec["issued_at"] = issued_at + if pop_6hr_pct is not None: + rec["pop_6hr_pct"] = pop_6hr_pct + if qpf_6hr_in is not None: + rec["qpf_6hr_in"] = qpf_6hr_in return rec @@ -502,6 +508,49 @@ def test_om_research_wrapper_shape_temperature_f(self) -> None: assert row["fcst_high_f"] == pytest.approx(89.6) assert row["fcst_low_f"] == pytest.approx(68.0) + def test_om_research_wrapper_pop_and_qpf_survive(self) -> None: + """Issue #67 (codex P2): research-wrapper OM rows carry ``pop_6hr_pct`` + / ``qpf_6hr_in`` (not ``precipitation_probability_pct``). Now that + source routing sends them to the OM branch, those precip columns must + still populate — not regress to None as they would if the OM branch + only read ``precipitation_probability_pct`` and never set QPF.""" + om = [ + _om_record( + "2024-07-04T08:00:00Z", + temperature_c=None, + temperature_f=68.0, + source="open_meteo.previous_runs", + issued_at="2024-07-04T06:00:00Z", + pop_6hr_pct=20.0, + qpf_6hr_in=0.1, + ), + _om_record( + "2024-07-04T14:00:00Z", + temperature_c=None, + temperature_f=89.6, + source="open_meteo.previous_runs", + issued_at="2024-07-04T06:00:00Z", + pop_6hr_pct=60.0, + qpf_6hr_in=0.2, + ), + ] + row = build_pairs_row("2024-07-04", "NYC", [], None, om) + assert row["fcst_pop_6hr_pct"] == 60.0 # max over window + assert row["fcst_qpf_6hr_in"] == pytest.approx(0.3) # sum over window + + def test_om_pop_zero_not_dropped(self) -> None: + """A valid 0.0 POP must survive the alias fallback (no truthiness bug).""" + om = [ + _om_record( + "2024-07-04T14:00:00Z", + temperature_c=20.0, + source="open_meteo.previous_runs", + precipitation_probability_pct=0.0, + ) + ] + row = build_pairs_row("2024-07-04", "NYC", [], None, om) + assert row["fcst_pop_6hr_pct"] == 0.0 + def test_fcst_pop_6hr_pct_from_iem(self) -> None: records = [ _iem_record("2024-07-04T12:00:00Z", "2024-07-04T08:00:00Z", pop_6hr_pct=20.0), From cd458c68e4927495f5cbf7f64dc5d255c8c4e3af Mon Sep 17 00:00:00 2001 From: helloiamvu Date: Sat, 6 Jun 2026 14:48:08 +0200 Subject: [PATCH 4/6] fix(pairs): keep legacy source-less OM shape classified as OM (#67, codex P2) Codex flagged that the pure source-prefix split regressed the previously documented Open-Meteo shape (no source, no issued_at, temperature_c) to the IEM branch -> null temps. Extract _is_open_meteo_record(): source prefixed open_meteo OR (no source AND no issued_at). Real IEM rows always carry an issued_at, so a record missing both can only be legacy OM. +1 regression test. --- .../core/src/mostlyright/_internal/_pairs.py | 37 ++++++++++++++----- packages/core/tests/_internal/test_pairs.py | 14 +++++++ 2 files changed, 41 insertions(+), 10 deletions(-) diff --git a/packages/core/src/mostlyright/_internal/_pairs.py b/packages/core/src/mostlyright/_internal/_pairs.py index 5d1fee8a..b98827ae 100644 --- a/packages/core/src/mostlyright/_internal/_pairs.py +++ b/packages/core/src/mostlyright/_internal/_pairs.py @@ -184,6 +184,26 @@ def _select_best_run( return best_issued, runs[best_issued] +def _is_open_meteo_record(r: dict[str, Any]) -> bool: + """True if ``r`` is an Open-Meteo forecast record (issue #67). + + The authoritative signal is the ``source`` field: real Open-Meteo rows + carry ``source`` prefixed ``open_meteo`` (.previous_runs / .single_run / + .seamless / .live), while IEM MOS rows carry ``source="iem.archive"``. + + For backward compatibility, the previously-documented *legacy* Open-Meteo + shape — no ``source`` AND no ``issued_at`` (the old ``forecast_series.json`` + discriminator) — is also treated as Open-Meteo. Real IEM MOS rows always + carry an ``issued_at``, so a record lacking both fields can only be a + legacy OM row; this avoids regressing source-less OM callers to null. + """ + src = str(r.get("source") or "") + if src.startswith("open_meteo"): + return True + # Legacy OM shape (pre-Phase-20): no source, no issued_at. + return not src and not r.get("issued_at") + + def _aggregate_fcst_temps_iem( run_records: list[dict[str, Any]], window_start_iso: str, @@ -312,19 +332,16 @@ def build_pairs_row( win_end_iso = win_end.strftime("%Y-%m-%dT%H:%M:%SZ") # Separate IEM MOS from Open-Meteo by the authoritative ``source`` - # field (issue #67). ``issued_at`` presence is NOT a valid - # discriminator: Phase 20+ Open-Meteo rows carry a derived + # field (issue #67), with a legacy no-source/no-issued_at fallback — + # see _is_open_meteo_record. ``issued_at`` presence alone is NOT a + # valid discriminator: Phase 20+ Open-Meteo rows carry a derived # ``issued_at`` (for cycle-math / previous-runs caching), so the old # ``issued_at``-based split misrouted those rows into the IEM MOS # aggregation path, silently nulling forecast temps and polluting IEM - # run-selection. IEM rows carry ``source="iem.archive"``; Open-Meteo - # rows carry ``source`` prefixed ``open_meteo`` (.previous_runs / - # .single_run / .seamless / .live). research._fetch_open_meteo_range - # already documents this contract ("discriminates via row.get('source')"). - om_records = [r for r in forecasts if str(r.get("source") or "").startswith("open_meteo")] - iem_records = [ - r for r in forecasts if not str(r.get("source") or "").startswith("open_meteo") - ] + # run-selection. research._fetch_open_meteo_range already documents + # this contract ("discriminates via row.get('source')"). + om_records = [r for r in forecasts if _is_open_meteo_record(r)] + iem_records = [r for r in forecasts if not _is_open_meteo_record(r)] # Apply forecast_model filter to IEM MOS records before run selection. # Phase 17 Wave 4 iter-3 review HIGH: case-insensitive match because diff --git a/packages/core/tests/_internal/test_pairs.py b/packages/core/tests/_internal/test_pairs.py index cfc05ed5..b8990795 100644 --- a/packages/core/tests/_internal/test_pairs.py +++ b/packages/core/tests/_internal/test_pairs.py @@ -538,6 +538,20 @@ def test_om_research_wrapper_pop_and_qpf_survive(self) -> None: assert row["fcst_pop_6hr_pct"] == 60.0 # max over window assert row["fcst_qpf_6hr_in"] == pytest.approx(0.3) # sum over window + def test_legacy_source_less_om_shape_classified_as_om(self) -> None: + """Issue #67 (codex P2): the previously-documented OM shape — no + ``source`` AND no ``issued_at``, carrying ``temperature_c`` — must + still classify as Open-Meteo (a record lacking both fields can only be + legacy OM; real IEM always carries issued_at). Without the legacy + fallback these rows would misroute to the IEM branch and null out.""" + legacy_om = [ + {"valid_at": "2024-07-04T08:00:00Z", "temperature_c": 20.0, "model": "om"}, # 68F + {"valid_at": "2024-07-04T14:00:00Z", "temperature_c": 32.0, "model": "om"}, # 89.6F + ] + row = build_pairs_row("2024-07-04", "NYC", [], None, legacy_om) + assert row["fcst_high_f"] == pytest.approx(89.6) + assert row["fcst_low_f"] == pytest.approx(68.0) + def test_om_pop_zero_not_dropped(self) -> None: """A valid 0.0 POP must survive the alias fallback (no truthiness bug).""" om = [ From b26a5081576cd3b49f5208922d4f611a9ebf5a0b Mon Sep 17 00:00:00 2001 From: helloiamvu Date: Sat, 6 Jun 2026 14:51:05 +0200 Subject: [PATCH 5/6] fix(pairs): preserve OM issued_at provenance after source routing (#67, codex P2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Codex flagged leakage-safety provenance loss: Phase 20+ OM rows carry a derived issued_at; pre-#67 the IEM branch set fcst_issued via _select_best_run, but after source routing the OM branch left fcst_issued_at None for forecast_source="open_meteo". Restore it as the most-recent OM issued_at at-or-before market close — never leaking a run issued after settlement. +2 regression tests. --- .../core/src/mostlyright/_internal/_pairs.py | 15 +++++++++++++ packages/core/tests/_internal/test_pairs.py | 22 +++++++++++++++++++ 2 files changed, 37 insertions(+) diff --git a/packages/core/src/mostlyright/_internal/_pairs.py b/packages/core/src/mostlyright/_internal/_pairs.py index b98827ae..440eb010 100644 --- a/packages/core/src/mostlyright/_internal/_pairs.py +++ b/packages/core/src/mostlyright/_internal/_pairs.py @@ -410,6 +410,21 @@ def build_pairs_row( qpfs_om = [r["qpf_6hr_in"] for r in window_om if r.get("qpf_6hr_in") is not None] if qpfs_om: fcst_qpf = sum(qpfs_om) + # ISSUED_AT provenance (leakage-safety): Phase 20+ OM rows carry a + # derived issued_at. Pre-#67 these rows flowed through the IEM + # branch, which set fcst_issued via _select_best_run; after source + # routing the OM branch must restore that provenance itself or + # fcst_issued_at regresses to None for forecast_source="open_meteo". + # Use the most-recent issued_at at-or-before market close so the + # exposed timestamp never leaks a forecast issued after settlement. + cutoff_iso = market_close.strftime("%Y-%m-%dT%H:%M:%SZ") + om_issued = [ + iss + for r in window_om + if (iss := r.get("issued_at")) is not None and iss <= cutoff_iso + ] + if om_issued: + fcst_issued = max(om_issued) fcst.update( { diff --git a/packages/core/tests/_internal/test_pairs.py b/packages/core/tests/_internal/test_pairs.py index b8990795..a83aea1b 100644 --- a/packages/core/tests/_internal/test_pairs.py +++ b/packages/core/tests/_internal/test_pairs.py @@ -537,6 +537,28 @@ def test_om_research_wrapper_pop_and_qpf_survive(self) -> None: row = build_pairs_row("2024-07-04", "NYC", [], None, om) assert row["fcst_pop_6hr_pct"] == 60.0 # max over window assert row["fcst_qpf_6hr_in"] == pytest.approx(0.3) # sum over window + # Issue #67 (codex P2): OM issued_at provenance must survive routing. + assert row["fcst_issued_at"] == "2024-07-04T06:00:00Z" + + def test_om_issued_at_provenance_never_leaks_past_market_close(self) -> None: + """Issue #67 (codex P2): fcst_issued_at exposes the most-recent OM run + at-or-before market close — never a run issued after settlement.""" + # NYC market close for 2024-07-04 is 21:30Z. A 23:00Z-issued run must + # NOT be exposed; the 06:00Z run is the latest eligible. + om = [ + _om_record( + "2024-07-04T14:00:00Z", + temperature_c=30.0, + issued_at="2024-07-04T06:00:00Z", + ), + _om_record( + "2024-07-04T15:00:00Z", + temperature_c=31.0, + issued_at="2024-07-04T23:00:00Z", # after market close - must be ignored + ), + ] + row = build_pairs_row("2024-07-04", "NYC", [], None, om) + assert row["fcst_issued_at"] == "2024-07-04T06:00:00Z" def test_legacy_source_less_om_shape_classified_as_om(self) -> None: """Issue #67 (codex P2): the previously-documented OM shape — no From 12582eee6433dad6de973d73fcd4729f9efef305 Mon Sep 17 00:00:00 2001 From: helloiamvu Date: Sat, 6 Jun 2026 14:54:39 +0200 Subject: [PATCH 6/6] fix(pairs): exclude after-close OM runs from aggregation (#67, codex P1 leakage) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Codex P1: my provenance fix filtered issued_at only for the displayed timestamp; the OM aggregation still summed window rows issued AFTER market close, leaking their temp/POP/QPF into the training pair (lookahead). Pre-#67 _select_best_run applied this cutoff; the OM branch did not. Now filter window_om by issued_at <= market_close before aggregating temps/POP/QPF/ issued_at. Legacy source-less rows (no issued_at) are kept — nothing to leak. Strengthened test asserts the after-close temp is excluded, not just hidden. --- .../core/src/mostlyright/_internal/_pairs.py | 86 ++++++++++--------- packages/core/tests/_internal/test_pairs.py | 19 ++-- 2 files changed, 57 insertions(+), 48 deletions(-) diff --git a/packages/core/src/mostlyright/_internal/_pairs.py b/packages/core/src/mostlyright/_internal/_pairs.py index 440eb010..810f332e 100644 --- a/packages/core/src/mostlyright/_internal/_pairs.py +++ b/packages/core/src/mostlyright/_internal/_pairs.py @@ -383,48 +383,54 @@ def build_pairs_row( # Fall back to Open-Meteo if IEM MOS yielded no temperature data if fcst_high is None and om_records: - fcst_high, fcst_low = _aggregate_fcst_temps_openmeteo( - om_records, win_start_iso, win_end_iso - ) - fcst_model = next((r.get("model") for r in om_records if r.get("model")), "open-meteo") - window_om = [ - r for r in om_records if win_start_iso <= r.get("valid_at", "") <= win_end_iso - ] - # POP: accept the unit-contract ``precipitation_probability_pct`` - # OR the ``pop_6hr_pct`` alias that research._fetch_open_meteo_range - # emits. Without the alias, source-discriminated wrapper rows (which - # carry pop_6hr_pct, not precipitation_probability_pct) would regress - # POP to None now that they no longer flow through the IEM branch - # (issue #67). Explicit None-checks preserve a valid 0.0 reading. - probs: list[float] = [] - for r in window_om: - p = r.get("precipitation_probability_pct") - if p is None: - p = r.get("pop_6hr_pct") - if p is not None: - probs.append(p) - fcst_pop = max(probs) if probs else None - # QPF: the OM unit-contract shape carries no QPF, but the research - # wrapper emits ``qpf_6hr_in`` — sum it over the window to match the - # IEM-branch semantics (else wrapper QPF regresses to None too). - qpfs_om = [r["qpf_6hr_in"] for r in window_om if r.get("qpf_6hr_in") is not None] - if qpfs_om: - fcst_qpf = sum(qpfs_om) - # ISSUED_AT provenance (leakage-safety): Phase 20+ OM rows carry a - # derived issued_at. Pre-#67 these rows flowed through the IEM - # branch, which set fcst_issued via _select_best_run; after source - # routing the OM branch must restore that provenance itself or - # fcst_issued_at regresses to None for forecast_source="open_meteo". - # Use the most-recent issued_at at-or-before market close so the - # exposed timestamp never leaks a forecast issued after settlement. + # Leakage guard (issue #67, codex P1): Phase 20+ OM rows carry a + # derived issued_at. Pre-#67 these flowed through the IEM branch, + # where _select_best_run filtered runs issued AFTER market close. + # The OM branch has no such filter, so apply the cutoff here too — + # otherwise a row from a run issued after settlement (e.g. live / + # single-run cycles mixed into training pairs) would leak its + # temperature/POP/QPF into the pair, not just its timestamp. + # Legacy source-less OM rows have no issued_at provenance and are + # kept (documented all-window behavior — nothing to leak). cutoff_iso = market_close.strftime("%Y-%m-%dT%H:%M:%SZ") - om_issued = [ - iss - for r in window_om - if (iss := r.get("issued_at")) is not None and iss <= cutoff_iso + window_om = [ + r + for r in om_records + if win_start_iso <= r.get("valid_at", "") <= win_end_iso + and ((iss := r.get("issued_at")) is None or iss <= cutoff_iso) ] - if om_issued: - fcst_issued = max(om_issued) + fcst_high, fcst_low = _aggregate_fcst_temps_openmeteo( + window_om, win_start_iso, win_end_iso + ) + if window_om: + fcst_model = next( + (r.get("model") for r in window_om if r.get("model")), "open-meteo" + ) + # POP: accept the unit-contract ``precipitation_probability_pct`` + # OR the ``pop_6hr_pct`` alias research._fetch_open_meteo_range + # emits. Without the alias, source-discriminated wrapper rows + # would regress POP to None now that they no longer flow through + # the IEM branch. Explicit None-checks preserve a valid 0.0. + probs: list[float] = [] + for r in window_om: + p = r.get("precipitation_probability_pct") + if p is None: + p = r.get("pop_6hr_pct") + if p is not None: + probs.append(p) + fcst_pop = max(probs) if probs else None + # QPF: the OM unit-contract shape carries no QPF, but the + # research wrapper emits ``qpf_6hr_in`` — sum over the window to + # match IEM-branch semantics (else wrapper QPF regresses too). + qpfs_om = [r["qpf_6hr_in"] for r in window_om if r.get("qpf_6hr_in") is not None] + if qpfs_om: + fcst_qpf = sum(qpfs_om) + # ISSUED_AT provenance: most-recent run timestamp (all already + # <= cutoff by the window_om filter above). None for legacy + # source-less rows. + om_issued = [iss for r in window_om if (iss := r.get("issued_at")) is not None] + if om_issued: + fcst_issued = max(om_issued) fcst.update( { diff --git a/packages/core/tests/_internal/test_pairs.py b/packages/core/tests/_internal/test_pairs.py index a83aea1b..a6197343 100644 --- a/packages/core/tests/_internal/test_pairs.py +++ b/packages/core/tests/_internal/test_pairs.py @@ -540,25 +540,28 @@ def test_om_research_wrapper_pop_and_qpf_survive(self) -> None: # Issue #67 (codex P2): OM issued_at provenance must survive routing. assert row["fcst_issued_at"] == "2024-07-04T06:00:00Z" - def test_om_issued_at_provenance_never_leaks_past_market_close(self) -> None: - """Issue #67 (codex P2): fcst_issued_at exposes the most-recent OM run - at-or-before market close — never a run issued after settlement.""" - # NYC market close for 2024-07-04 is 21:30Z. A 23:00Z-issued run must - # NOT be exposed; the 06:00Z run is the latest eligible. + def test_om_after_close_run_excluded_from_aggregation(self) -> None: + """Issue #67 (codex P1, leakage): an OM row from a run issued AFTER + market close must not contribute its temp/POP/QPF to the pair, and its + timestamp must not be exposed. Only the eligible (<=close) run counts.""" + # NYC market close for 2024-07-04 is 21:30Z. A 23:00Z-issued run is + # lookahead — its hot 31C reading must NOT raise fcst_high_f. om = [ _om_record( "2024-07-04T14:00:00Z", - temperature_c=30.0, + temperature_c=30.0, # 86F - eligible run issued_at="2024-07-04T06:00:00Z", ), _om_record( "2024-07-04T15:00:00Z", - temperature_c=31.0, - issued_at="2024-07-04T23:00:00Z", # after market close - must be ignored + temperature_c=31.0, # 87.8F - AFTER close, must be excluded + issued_at="2024-07-04T23:00:00Z", ), ] row = build_pairs_row("2024-07-04", "NYC", [], None, om) assert row["fcst_issued_at"] == "2024-07-04T06:00:00Z" + assert row["fcst_high_f"] == pytest.approx(86.0) # 87.8F leak excluded + assert row["fcst_low_f"] == pytest.approx(86.0) def test_legacy_source_less_om_shape_classified_as_om(self) -> None: """Issue #67 (codex P2): the previously-documented OM shape — no