diff --git a/packages/core/src/mostlyright/_internal/_pairs.py b/packages/core/src/mostlyright/_internal/_pairs.py index f152d4ad..810f332e 100644 --- a/packages/core/src/mostlyright/_internal/_pairs.py +++ b/packages/core/src/mostlyright/_internal/_pairs.py @@ -31,12 +31,16 @@ This is the primary training/feature surface for AI settlement models. -Forecast join: - - IEM MOS records (`forecast.json`) have `issued_at`; grouped by issued_at to - pick the most-recent run before market close, then temperature_f values for - valid_at timestamps within the settlement window are aggregated (max/min). - - Open-Meteo records (`forecast_series.json`) have no issued_at; all records - in the settlement window are used. temperature_c is converted to F. +Forecast join (records are split by their authoritative ``source`` field — +``source`` prefixed ``open_meteo`` -> Open-Meteo, else IEM MOS; see issue #67): + - IEM MOS records (`forecast.json`, `source="iem.archive"`) are grouped by + issued_at to pick the most-recent run before market close, then + temperature_f values for valid_at timestamps within the settlement window + are aggregated (max/min). + - Open-Meteo records (`forecast_series.json`, `source="open_meteo.*"`) use + all records in the settlement window. temperature_c is converted to F + (or a pre-converted temperature_f is used as-is). NOTE: Phase 20+ OM rows + also carry a derived `issued_at`, so `issued_at` is NOT the discriminator. - If both are available, IEM MOS is preferred. Open-Meteo used as fallback. - If forecast data is unavailable, forecast columns are None - the row is still returned. @@ -180,6 +184,26 @@ def _select_best_run( return best_issued, runs[best_issued] +def _is_open_meteo_record(r: dict[str, Any]) -> bool: + """True if ``r`` is an Open-Meteo forecast record (issue #67). + + The authoritative signal is the ``source`` field: real Open-Meteo rows + carry ``source`` prefixed ``open_meteo`` (.previous_runs / .single_run / + .seamless / .live), while IEM MOS rows carry ``source="iem.archive"``. + + For backward compatibility, the previously-documented *legacy* Open-Meteo + shape — no ``source`` AND no ``issued_at`` (the old ``forecast_series.json`` + discriminator) — is also treated as Open-Meteo. Real IEM MOS rows always + carry an ``issued_at``, so a record lacking both fields can only be a + legacy OM row; this avoids regressing source-less OM callers to null. + """ + src = str(r.get("source") or "") + if src.startswith("open_meteo"): + return True + # Legacy OM shape (pre-Phase-20): no source, no issued_at. + return not src and not r.get("issued_at") + + def _aggregate_fcst_temps_iem( run_records: list[dict[str, Any]], window_start_iso: str, @@ -209,9 +233,13 @@ def _aggregate_fcst_temps_openmeteo( window_start_iso: str, window_end_iso: str, ) -> tuple[float | None, float | None]: - """Aggregate Open-Meteo hourly temperature_c (-> F) over the settlement window. + """Aggregate Open-Meteo hourly temperature (-> F) over the settlement window. - Open-Meteo stores temperature in Celsius. Conversion: F = C * 9/5 + 32. + Open-Meteo rows store temperature in Celsius under ``temperature_c``. + Conversion: F = C * 9/5 + 32. As a fallback, rows that already carry a + pre-converted ``temperature_f`` (the shape ``research._fetch_open_meteo_range`` + emits) are used as-is — without this fallback, source-discriminated OM rows + from the research() wrapper would aggregate to null (issue #67). Args: run_records: All Open-Meteo hourly records for the date. @@ -221,12 +249,17 @@ def _aggregate_fcst_temps_openmeteo( Returns: (fcst_high_f, fcst_low_f) or (None, None) if no records in window. """ - temps_f = [ - r["temperature_c"] * 9 / 5 + 32 - for r in run_records - if r.get("temperature_c") is not None - and window_start_iso <= r.get("valid_at", "") <= window_end_iso - ] + temps_f: list[float] = [] + for r in run_records: + if not (window_start_iso <= r.get("valid_at", "") <= window_end_iso): + continue + temp_c = r.get("temperature_c") + if temp_c is not None: + temps_f.append(temp_c * 9 / 5 + 32) + continue + temp_f = r.get("temperature_f") + if temp_f is not None: + temps_f.append(temp_f) return (max(temps_f), min(temps_f)) if temps_f else (None, None) @@ -250,7 +283,10 @@ def build_pairs_row( climate: NWS CLI record for the date (or None). Must be a dict or None - non-dict values are treated as None. forecasts: All forecast records with valid_at (or None if unavailable). - IEM MOS records have issued_at; Open-Meteo records do not. + Records are split by their ``source`` field: ``source`` prefixed + ``open_meteo`` -> Open-Meteo, everything else (e.g. + ``source="iem.archive"``) -> IEM MOS. ``issued_at`` is NOT used as + the discriminator (Phase 20+ Open-Meteo rows carry one too). forecast_model: Filter IEM MOS records to this model before run selection. None = no filtering (best available run). tz_override: IANA timezone name override for stations not in the known @@ -295,9 +331,17 @@ def build_pairs_row( win_start_iso = win_start.strftime("%Y-%m-%dT%H:%M:%SZ") win_end_iso = win_end.strftime("%Y-%m-%dT%H:%M:%SZ") - # Separate IEM MOS (has issued_at) from Open-Meteo (no issued_at) - iem_records = [r for r in forecasts if r.get("issued_at")] - om_records = [r for r in forecasts if not r.get("issued_at")] + # Separate IEM MOS from Open-Meteo by the authoritative ``source`` + # field (issue #67), with a legacy no-source/no-issued_at fallback — + # see _is_open_meteo_record. ``issued_at`` presence alone is NOT a + # valid discriminator: Phase 20+ Open-Meteo rows carry a derived + # ``issued_at`` (for cycle-math / previous-runs caching), so the old + # ``issued_at``-based split misrouted those rows into the IEM MOS + # aggregation path, silently nulling forecast temps and polluting IEM + # run-selection. research._fetch_open_meteo_range already documents + # this contract ("discriminates via row.get('source')"). + om_records = [r for r in forecasts if _is_open_meteo_record(r)] + iem_records = [r for r in forecasts if not _is_open_meteo_record(r)] # Apply forecast_model filter to IEM MOS records before run selection. # Phase 17 Wave 4 iter-3 review HIGH: case-insensitive match because @@ -339,19 +383,54 @@ def build_pairs_row( # Fall back to Open-Meteo if IEM MOS yielded no temperature data if fcst_high is None and om_records: - fcst_high, fcst_low = _aggregate_fcst_temps_openmeteo( - om_records, win_start_iso, win_end_iso - ) - fcst_model = next((r.get("model") for r in om_records if r.get("model")), "open-meteo") + # Leakage guard (issue #67, codex P1): Phase 20+ OM rows carry a + # derived issued_at. Pre-#67 these flowed through the IEM branch, + # where _select_best_run filtered runs issued AFTER market close. + # The OM branch has no such filter, so apply the cutoff here too — + # otherwise a row from a run issued after settlement (e.g. live / + # single-run cycles mixed into training pairs) would leak its + # temperature/POP/QPF into the pair, not just its timestamp. + # Legacy source-less OM rows have no issued_at provenance and are + # kept (documented all-window behavior — nothing to leak). + cutoff_iso = market_close.strftime("%Y-%m-%dT%H:%M:%SZ") window_om = [ - r for r in om_records if win_start_iso <= r.get("valid_at", "") <= win_end_iso - ] - probs = [ - r["precipitation_probability_pct"] - for r in window_om - if r.get("precipitation_probability_pct") is not None + r + for r in om_records + if win_start_iso <= r.get("valid_at", "") <= win_end_iso + and ((iss := r.get("issued_at")) is None or iss <= cutoff_iso) ] - fcst_pop = max(probs) if probs else None + fcst_high, fcst_low = _aggregate_fcst_temps_openmeteo( + window_om, win_start_iso, win_end_iso + ) + if window_om: + fcst_model = next( + (r.get("model") for r in window_om if r.get("model")), "open-meteo" + ) + # POP: accept the unit-contract ``precipitation_probability_pct`` + # OR the ``pop_6hr_pct`` alias research._fetch_open_meteo_range + # emits. Without the alias, source-discriminated wrapper rows + # would regress POP to None now that they no longer flow through + # the IEM branch. Explicit None-checks preserve a valid 0.0. + probs: list[float] = [] + for r in window_om: + p = r.get("precipitation_probability_pct") + if p is None: + p = r.get("pop_6hr_pct") + if p is not None: + probs.append(p) + fcst_pop = max(probs) if probs else None + # QPF: the OM unit-contract shape carries no QPF, but the + # research wrapper emits ``qpf_6hr_in`` — sum over the window to + # match IEM-branch semantics (else wrapper QPF regresses too). + qpfs_om = [r["qpf_6hr_in"] for r in window_om if r.get("qpf_6hr_in") is not None] + if qpfs_om: + fcst_qpf = sum(qpfs_om) + # ISSUED_AT provenance: most-recent run timestamp (all already + # <= cutoff by the window_om filter above). None for legacy + # source-less rows. + om_issued = [iss for r in window_om if (iss := r.get("issued_at")) is not None] + if om_issued: + fcst_issued = max(om_issued) fcst.update( { diff --git a/packages/core/tests/_internal/test_pairs.py b/packages/core/tests/_internal/test_pairs.py index 1fa4549c..a6197343 100644 --- a/packages/core/tests/_internal/test_pairs.py +++ b/packages/core/tests/_internal/test_pairs.py @@ -86,18 +86,40 @@ def _iem_record( def _om_record( valid_at: str, - temperature_c: float = 28.0, + temperature_c: float | None = 28.0, model: str = "open-meteo-gfs", precipitation_probability_pct: float | None = None, + source: str = "open_meteo.previous_runs", + issued_at: str | None = None, + temperature_f: float | None = None, + pop_6hr_pct: float | None = None, + qpf_6hr_in: float | None = None, ) -> dict: - """Open-Meteo hourly forecast record matching specs/forecast_series.json.""" - return { + """Open-Meteo hourly forecast record matching specs/forecast_series.json. + + Real Open-Meteo rows always carry a ``source`` prefixed ``open_meteo`` — + that field (NOT ``issued_at`` presence) is the authoritative discriminator + from IEM MOS (issue #67). Phase 20+ rows also carry a derived ``issued_at``. + ``temperature_f`` is accepted to mirror the pre-converted shape that + ``research._fetch_open_meteo_range`` emits. + """ + rec: dict = { "valid_at": valid_at, - "temperature_c": temperature_c, "model": model, "precipitation_probability_pct": precipitation_probability_pct, - # No issued_at - this distinguishes Open-Meteo from IEM MOS + "source": source, } + if temperature_c is not None: + rec["temperature_c"] = temperature_c + if temperature_f is not None: + rec["temperature_f"] = temperature_f + if issued_at is not None: + rec["issued_at"] = issued_at + if pop_6hr_pct is not None: + rec["pop_6hr_pct"] = pop_6hr_pct + if qpf_6hr_in is not None: + rec["qpf_6hr_in"] = qpf_6hr_in + return rec # --------------------------------------------------------------------------- @@ -414,6 +436,160 @@ def test_open_meteo_fallback_when_iem_yields_no_window_data(self) -> None: assert row["fcst_high_f"] == pytest.approx(86.0) assert row["fcst_model"] == "open-meteo-gfs" + # ----- issue #67: source-based OM/IEM discrimination ----- + + def test_om_with_derived_issued_at_classified_by_source(self) -> None: + """Issue #67: Phase 20+ OM rows carry a derived ``issued_at`` but must + still be classified as Open-Meteo via their ``source`` prefix, not + misrouted into the IEM MOS aggregation path.""" + om = [ + _om_record( + "2024-07-04T08:00:00Z", + temperature_c=20.0, # 68F + source="open_meteo.previous_runs", + issued_at="2024-07-04T06:00:00Z", + ), + _om_record( + "2024-07-04T14:00:00Z", + temperature_c=32.0, # 89.6F + source="open_meteo.previous_runs", + issued_at="2024-07-04T06:00:00Z", + ), + ] + row = build_pairs_row("2024-07-04", "NYC", [], None, om) + assert row["fcst_high_f"] == pytest.approx(89.6) + assert row["fcst_low_f"] == pytest.approx(68.0) + assert row["fcst_model"] == "open-meteo-gfs" + + def test_iem_preferred_over_om_even_when_om_has_issued_at(self) -> None: + """Issue #67: a hot OM row carrying ``issued_at`` must NOT pollute the + IEM MOS run-selection; IEM still wins and OM stays a fallback.""" + iem = [ + _iem_record( + "2024-07-04T12:00:00Z", + "2024-07-04T14:00:00Z", + temperature_f=89.0, + model="GFS", + ) + ] + om = [ + _om_record( + "2024-07-04T14:00:00Z", + temperature_c=99.0, # very hot - must not win, must not corrupt IEM run + source="open_meteo.previous_runs", + issued_at="2024-07-04T06:00:00Z", + ) + ] + row = build_pairs_row("2024-07-04", "NYC", [], None, iem + om) + assert row["fcst_high_f"] == 89.0 + assert row["fcst_model"] == "GFS" + + def test_om_research_wrapper_shape_temperature_f(self) -> None: + """Issue #67: rows shaped like ``_fetch_open_meteo_range`` output — + ``source`` + derived ``issued_at`` + pre-converted ``temperature_f`` + (no ``temperature_c``) — must still aggregate, not return null.""" + om = [ + _om_record( + "2024-07-04T08:00:00Z", + temperature_c=None, + temperature_f=68.0, + source="open_meteo.previous_runs", + issued_at="2024-07-04T06:00:00Z", + ), + _om_record( + "2024-07-04T14:00:00Z", + temperature_c=None, + temperature_f=89.6, + source="open_meteo.previous_runs", + issued_at="2024-07-04T06:00:00Z", + ), + ] + row = build_pairs_row("2024-07-04", "NYC", [], None, om) + assert row["fcst_high_f"] == pytest.approx(89.6) + assert row["fcst_low_f"] == pytest.approx(68.0) + + def test_om_research_wrapper_pop_and_qpf_survive(self) -> None: + """Issue #67 (codex P2): research-wrapper OM rows carry ``pop_6hr_pct`` + / ``qpf_6hr_in`` (not ``precipitation_probability_pct``). Now that + source routing sends them to the OM branch, those precip columns must + still populate — not regress to None as they would if the OM branch + only read ``precipitation_probability_pct`` and never set QPF.""" + om = [ + _om_record( + "2024-07-04T08:00:00Z", + temperature_c=None, + temperature_f=68.0, + source="open_meteo.previous_runs", + issued_at="2024-07-04T06:00:00Z", + pop_6hr_pct=20.0, + qpf_6hr_in=0.1, + ), + _om_record( + "2024-07-04T14:00:00Z", + temperature_c=None, + temperature_f=89.6, + source="open_meteo.previous_runs", + issued_at="2024-07-04T06:00:00Z", + pop_6hr_pct=60.0, + qpf_6hr_in=0.2, + ), + ] + row = build_pairs_row("2024-07-04", "NYC", [], None, om) + assert row["fcst_pop_6hr_pct"] == 60.0 # max over window + assert row["fcst_qpf_6hr_in"] == pytest.approx(0.3) # sum over window + # Issue #67 (codex P2): OM issued_at provenance must survive routing. + assert row["fcst_issued_at"] == "2024-07-04T06:00:00Z" + + def test_om_after_close_run_excluded_from_aggregation(self) -> None: + """Issue #67 (codex P1, leakage): an OM row from a run issued AFTER + market close must not contribute its temp/POP/QPF to the pair, and its + timestamp must not be exposed. Only the eligible (<=close) run counts.""" + # NYC market close for 2024-07-04 is 21:30Z. A 23:00Z-issued run is + # lookahead — its hot 31C reading must NOT raise fcst_high_f. + om = [ + _om_record( + "2024-07-04T14:00:00Z", + temperature_c=30.0, # 86F - eligible run + issued_at="2024-07-04T06:00:00Z", + ), + _om_record( + "2024-07-04T15:00:00Z", + temperature_c=31.0, # 87.8F - AFTER close, must be excluded + issued_at="2024-07-04T23:00:00Z", + ), + ] + row = build_pairs_row("2024-07-04", "NYC", [], None, om) + assert row["fcst_issued_at"] == "2024-07-04T06:00:00Z" + assert row["fcst_high_f"] == pytest.approx(86.0) # 87.8F leak excluded + assert row["fcst_low_f"] == pytest.approx(86.0) + + def test_legacy_source_less_om_shape_classified_as_om(self) -> None: + """Issue #67 (codex P2): the previously-documented OM shape — no + ``source`` AND no ``issued_at``, carrying ``temperature_c`` — must + still classify as Open-Meteo (a record lacking both fields can only be + legacy OM; real IEM always carries issued_at). Without the legacy + fallback these rows would misroute to the IEM branch and null out.""" + legacy_om = [ + {"valid_at": "2024-07-04T08:00:00Z", "temperature_c": 20.0, "model": "om"}, # 68F + {"valid_at": "2024-07-04T14:00:00Z", "temperature_c": 32.0, "model": "om"}, # 89.6F + ] + row = build_pairs_row("2024-07-04", "NYC", [], None, legacy_om) + assert row["fcst_high_f"] == pytest.approx(89.6) + assert row["fcst_low_f"] == pytest.approx(68.0) + + def test_om_pop_zero_not_dropped(self) -> None: + """A valid 0.0 POP must survive the alias fallback (no truthiness bug).""" + om = [ + _om_record( + "2024-07-04T14:00:00Z", + temperature_c=20.0, + source="open_meteo.previous_runs", + precipitation_probability_pct=0.0, + ) + ] + row = build_pairs_row("2024-07-04", "NYC", [], None, om) + assert row["fcst_pop_6hr_pct"] == 0.0 + def test_fcst_pop_6hr_pct_from_iem(self) -> None: records = [ _iem_record("2024-07-04T12:00:00Z", "2024-07-04T08:00:00Z", pop_6hr_pct=20.0),