Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 108 additions & 29 deletions packages/core/src/mostlyright/_internal/_pairs.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,16 @@

This is the primary training/feature surface for AI settlement models.

Forecast join:
- IEM MOS records (`forecast.json`) have `issued_at`; grouped by issued_at to
pick the most-recent run before market close, then temperature_f values for
valid_at timestamps within the settlement window are aggregated (max/min).
- Open-Meteo records (`forecast_series.json`) have no issued_at; all records
in the settlement window are used. temperature_c is converted to F.
Forecast join (records are split by their authoritative ``source`` field —
``source`` prefixed ``open_meteo`` -> Open-Meteo, else IEM MOS; see issue #67):
- IEM MOS records (`forecast.json`, `source="iem.archive"`) are grouped by
issued_at to pick the most-recent run before market close, then
temperature_f values for valid_at timestamps within the settlement window
are aggregated (max/min).
- Open-Meteo records (`forecast_series.json`, `source="open_meteo.*"`) use
all records in the settlement window. temperature_c is converted to F
(or a pre-converted temperature_f is used as-is). NOTE: Phase 20+ OM rows
also carry a derived `issued_at`, so `issued_at` is NOT the discriminator.
- If both are available, IEM MOS is preferred. Open-Meteo used as fallback.
- If forecast data is unavailable, forecast columns are None - the row is
still returned.
Expand Down Expand Up @@ -180,6 +184,26 @@ def _select_best_run(
return best_issued, runs[best_issued]


def _is_open_meteo_record(r: dict[str, Any]) -> bool:
"""True if ``r`` is an Open-Meteo forecast record (issue #67).

The authoritative signal is the ``source`` field: real Open-Meteo rows
carry ``source`` prefixed ``open_meteo`` (.previous_runs / .single_run /
.seamless / .live), while IEM MOS rows carry ``source="iem.archive"``.

For backward compatibility, the previously-documented *legacy* Open-Meteo
shape — no ``source`` AND no ``issued_at`` (the old ``forecast_series.json``
discriminator) — is also treated as Open-Meteo. Real IEM MOS rows always
carry an ``issued_at``, so a record lacking both fields can only be a
legacy OM row; this avoids regressing source-less OM callers to null.
"""
src = str(r.get("source") or "")
if src.startswith("open_meteo"):
return True
# Legacy OM shape (pre-Phase-20): no source, no issued_at.
return not src and not r.get("issued_at")


def _aggregate_fcst_temps_iem(
run_records: list[dict[str, Any]],
window_start_iso: str,
Expand Down Expand Up @@ -209,9 +233,13 @@ def _aggregate_fcst_temps_openmeteo(
window_start_iso: str,
window_end_iso: str,
) -> tuple[float | None, float | None]:
"""Aggregate Open-Meteo hourly temperature_c (-> F) over the settlement window.
"""Aggregate Open-Meteo hourly temperature (-> F) over the settlement window.

Open-Meteo stores temperature in Celsius. Conversion: F = C * 9/5 + 32.
Open-Meteo rows store temperature in Celsius under ``temperature_c``.
Conversion: F = C * 9/5 + 32. As a fallback, rows that already carry a
pre-converted ``temperature_f`` (the shape ``research._fetch_open_meteo_range``
emits) are used as-is — without this fallback, source-discriminated OM rows
from the research() wrapper would aggregate to null (issue #67).

Args:
run_records: All Open-Meteo hourly records for the date.
Expand All @@ -221,12 +249,17 @@ def _aggregate_fcst_temps_openmeteo(
Returns:
(fcst_high_f, fcst_low_f) or (None, None) if no records in window.
"""
temps_f = [
r["temperature_c"] * 9 / 5 + 32
for r in run_records
if r.get("temperature_c") is not None
and window_start_iso <= r.get("valid_at", "") <= window_end_iso
]
temps_f: list[float] = []
for r in run_records:
if not (window_start_iso <= r.get("valid_at", "") <= window_end_iso):
continue
temp_c = r.get("temperature_c")
if temp_c is not None:
temps_f.append(temp_c * 9 / 5 + 32)
continue
temp_f = r.get("temperature_f")
if temp_f is not None:
temps_f.append(temp_f)
return (max(temps_f), min(temps_f)) if temps_f else (None, None)


Expand All @@ -250,7 +283,10 @@ def build_pairs_row(
climate: NWS CLI record for the date (or None). Must be a dict or None
- non-dict values are treated as None.
forecasts: All forecast records with valid_at (or None if unavailable).
IEM MOS records have issued_at; Open-Meteo records do not.
Records are split by their ``source`` field: ``source`` prefixed
``open_meteo`` -> Open-Meteo, everything else (e.g.
``source="iem.archive"``) -> IEM MOS. ``issued_at`` is NOT used as
the discriminator (Phase 20+ Open-Meteo rows carry one too).
forecast_model: Filter IEM MOS records to this model before run
selection. None = no filtering (best available run).
tz_override: IANA timezone name override for stations not in the known
Expand Down Expand Up @@ -295,9 +331,17 @@ def build_pairs_row(
win_start_iso = win_start.strftime("%Y-%m-%dT%H:%M:%SZ")
win_end_iso = win_end.strftime("%Y-%m-%dT%H:%M:%SZ")

# Separate IEM MOS (has issued_at) from Open-Meteo (no issued_at)
iem_records = [r for r in forecasts if r.get("issued_at")]
om_records = [r for r in forecasts if not r.get("issued_at")]
# Separate IEM MOS from Open-Meteo by the authoritative ``source``
# field (issue #67), with a legacy no-source/no-issued_at fallback —
# see _is_open_meteo_record. ``issued_at`` presence alone is NOT a
# valid discriminator: Phase 20+ Open-Meteo rows carry a derived
# ``issued_at`` (for cycle-math / previous-runs caching), so the old
# ``issued_at``-based split misrouted those rows into the IEM MOS
# aggregation path, silently nulling forecast temps and polluting IEM
# run-selection. research._fetch_open_meteo_range already documents
# this contract ("discriminates via row.get('source')").
om_records = [r for r in forecasts if _is_open_meteo_record(r)]
iem_records = [r for r in forecasts if not _is_open_meteo_record(r)]

# Apply forecast_model filter to IEM MOS records before run selection.
# Phase 17 Wave 4 iter-3 review HIGH: case-insensitive match because
Expand Down Expand Up @@ -339,19 +383,54 @@ def build_pairs_row(

# Fall back to Open-Meteo if IEM MOS yielded no temperature data
if fcst_high is None and om_records:
fcst_high, fcst_low = _aggregate_fcst_temps_openmeteo(
om_records, win_start_iso, win_end_iso
)
fcst_model = next((r.get("model") for r in om_records if r.get("model")), "open-meteo")
# Leakage guard (issue #67, codex P1): Phase 20+ OM rows carry a
# derived issued_at. Pre-#67 these flowed through the IEM branch,
# where _select_best_run filtered runs issued AFTER market close.
# The OM branch has no such filter, so apply the cutoff here too —
# otherwise a row from a run issued after settlement (e.g. live /
# single-run cycles mixed into training pairs) would leak its
# temperature/POP/QPF into the pair, not just its timestamp.
# Legacy source-less OM rows have no issued_at provenance and are
# kept (documented all-window behavior — nothing to leak).
cutoff_iso = market_close.strftime("%Y-%m-%dT%H:%M:%SZ")
window_om = [
r for r in om_records if win_start_iso <= r.get("valid_at", "") <= win_end_iso
]
probs = [
r["precipitation_probability_pct"]
for r in window_om
if r.get("precipitation_probability_pct") is not None
r
for r in om_records
if win_start_iso <= r.get("valid_at", "") <= win_end_iso
and ((iss := r.get("issued_at")) is None or iss <= cutoff_iso)
]
fcst_pop = max(probs) if probs else None
fcst_high, fcst_low = _aggregate_fcst_temps_openmeteo(
window_om, win_start_iso, win_end_iso
)
if window_om:
fcst_model = next(
(r.get("model") for r in window_om if r.get("model")), "open-meteo"
)
# POP: accept the unit-contract ``precipitation_probability_pct``
# OR the ``pop_6hr_pct`` alias research._fetch_open_meteo_range
# emits. Without the alias, source-discriminated wrapper rows
# would regress POP to None now that they no longer flow through
# the IEM branch. Explicit None-checks preserve a valid 0.0.
probs: list[float] = []
for r in window_om:
p = r.get("precipitation_probability_pct")
if p is None:
p = r.get("pop_6hr_pct")
if p is not None:
probs.append(p)
fcst_pop = max(probs) if probs else None
# QPF: the OM unit-contract shape carries no QPF, but the
# research wrapper emits ``qpf_6hr_in`` — sum over the window to
# match IEM-branch semantics (else wrapper QPF regresses too).
qpfs_om = [r["qpf_6hr_in"] for r in window_om if r.get("qpf_6hr_in") is not None]
if qpfs_om:
fcst_qpf = sum(qpfs_om)
# ISSUED_AT provenance: most-recent run timestamp (all already
# <= cutoff by the window_om filter above). None for legacy
# source-less rows.
om_issued = [iss for r in window_om if (iss := r.get("issued_at")) is not None]
if om_issued:
fcst_issued = max(om_issued)

fcst.update(
{
Expand Down
186 changes: 181 additions & 5 deletions packages/core/tests/_internal/test_pairs.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,18 +86,40 @@ def _iem_record(

def _om_record(
valid_at: str,
temperature_c: float = 28.0,
temperature_c: float | None = 28.0,
model: str = "open-meteo-gfs",
precipitation_probability_pct: float | None = None,
source: str = "open_meteo.previous_runs",
issued_at: str | None = None,
temperature_f: float | None = None,
pop_6hr_pct: float | None = None,
qpf_6hr_in: float | None = None,
) -> dict:
"""Open-Meteo hourly forecast record matching specs/forecast_series.json."""
return {
"""Open-Meteo hourly forecast record matching specs/forecast_series.json.

Real Open-Meteo rows always carry a ``source`` prefixed ``open_meteo`` —
that field (NOT ``issued_at`` presence) is the authoritative discriminator
from IEM MOS (issue #67). Phase 20+ rows also carry a derived ``issued_at``.
``temperature_f`` is accepted to mirror the pre-converted shape that
``research._fetch_open_meteo_range`` emits.
"""
rec: dict = {
"valid_at": valid_at,
"temperature_c": temperature_c,
"model": model,
"precipitation_probability_pct": precipitation_probability_pct,
# No issued_at - this distinguishes Open-Meteo from IEM MOS
"source": source,
}
if temperature_c is not None:
rec["temperature_c"] = temperature_c
if temperature_f is not None:
rec["temperature_f"] = temperature_f
if issued_at is not None:
rec["issued_at"] = issued_at
if pop_6hr_pct is not None:
rec["pop_6hr_pct"] = pop_6hr_pct
if qpf_6hr_in is not None:
rec["qpf_6hr_in"] = qpf_6hr_in
return rec


# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -414,6 +436,160 @@ def test_open_meteo_fallback_when_iem_yields_no_window_data(self) -> None:
assert row["fcst_high_f"] == pytest.approx(86.0)
assert row["fcst_model"] == "open-meteo-gfs"

# ----- issue #67: source-based OM/IEM discrimination -----

def test_om_with_derived_issued_at_classified_by_source(self) -> None:
"""Issue #67: Phase 20+ OM rows carry a derived ``issued_at`` but must
still be classified as Open-Meteo via their ``source`` prefix, not
misrouted into the IEM MOS aggregation path."""
om = [
_om_record(
"2024-07-04T08:00:00Z",
temperature_c=20.0, # 68F
source="open_meteo.previous_runs",
issued_at="2024-07-04T06:00:00Z",
),
_om_record(
"2024-07-04T14:00:00Z",
temperature_c=32.0, # 89.6F
source="open_meteo.previous_runs",
issued_at="2024-07-04T06:00:00Z",
),
]
row = build_pairs_row("2024-07-04", "NYC", [], None, om)
assert row["fcst_high_f"] == pytest.approx(89.6)
assert row["fcst_low_f"] == pytest.approx(68.0)
assert row["fcst_model"] == "open-meteo-gfs"

def test_iem_preferred_over_om_even_when_om_has_issued_at(self) -> None:
"""Issue #67: a hot OM row carrying ``issued_at`` must NOT pollute the
IEM MOS run-selection; IEM still wins and OM stays a fallback."""
iem = [
_iem_record(
"2024-07-04T12:00:00Z",
"2024-07-04T14:00:00Z",
temperature_f=89.0,
model="GFS",
)
]
om = [
_om_record(
"2024-07-04T14:00:00Z",
temperature_c=99.0, # very hot - must not win, must not corrupt IEM run
source="open_meteo.previous_runs",
issued_at="2024-07-04T06:00:00Z",
)
]
row = build_pairs_row("2024-07-04", "NYC", [], None, iem + om)
assert row["fcst_high_f"] == 89.0
assert row["fcst_model"] == "GFS"

def test_om_research_wrapper_shape_temperature_f(self) -> None:
"""Issue #67: rows shaped like ``_fetch_open_meteo_range`` output —
``source`` + derived ``issued_at`` + pre-converted ``temperature_f``
(no ``temperature_c``) — must still aggregate, not return null."""
om = [
_om_record(
"2024-07-04T08:00:00Z",
temperature_c=None,
temperature_f=68.0,
source="open_meteo.previous_runs",
issued_at="2024-07-04T06:00:00Z",
),
_om_record(
"2024-07-04T14:00:00Z",
temperature_c=None,
temperature_f=89.6,
source="open_meteo.previous_runs",
issued_at="2024-07-04T06:00:00Z",
),
]
row = build_pairs_row("2024-07-04", "NYC", [], None, om)
assert row["fcst_high_f"] == pytest.approx(89.6)
assert row["fcst_low_f"] == pytest.approx(68.0)

def test_om_research_wrapper_pop_and_qpf_survive(self) -> None:
"""Issue #67 (codex P2): research-wrapper OM rows carry ``pop_6hr_pct``
/ ``qpf_6hr_in`` (not ``precipitation_probability_pct``). Now that
source routing sends them to the OM branch, those precip columns must
still populate — not regress to None as they would if the OM branch
only read ``precipitation_probability_pct`` and never set QPF."""
om = [
_om_record(
"2024-07-04T08:00:00Z",
temperature_c=None,
temperature_f=68.0,
source="open_meteo.previous_runs",
issued_at="2024-07-04T06:00:00Z",
pop_6hr_pct=20.0,
qpf_6hr_in=0.1,
),
_om_record(
"2024-07-04T14:00:00Z",
temperature_c=None,
temperature_f=89.6,
source="open_meteo.previous_runs",
issued_at="2024-07-04T06:00:00Z",
pop_6hr_pct=60.0,
qpf_6hr_in=0.2,
),
]
row = build_pairs_row("2024-07-04", "NYC", [], None, om)
assert row["fcst_pop_6hr_pct"] == 60.0 # max over window
assert row["fcst_qpf_6hr_in"] == pytest.approx(0.3) # sum over window
# Issue #67 (codex P2): OM issued_at provenance must survive routing.
assert row["fcst_issued_at"] == "2024-07-04T06:00:00Z"

def test_om_after_close_run_excluded_from_aggregation(self) -> None:
"""Issue #67 (codex P1, leakage): an OM row from a run issued AFTER
market close must not contribute its temp/POP/QPF to the pair, and its
timestamp must not be exposed. Only the eligible (<=close) run counts."""
# NYC market close for 2024-07-04 is 21:30Z. A 23:00Z-issued run is
# lookahead — its hot 31C reading must NOT raise fcst_high_f.
om = [
_om_record(
"2024-07-04T14:00:00Z",
temperature_c=30.0, # 86F - eligible run
issued_at="2024-07-04T06:00:00Z",
),
_om_record(
"2024-07-04T15:00:00Z",
temperature_c=31.0, # 87.8F - AFTER close, must be excluded
issued_at="2024-07-04T23:00:00Z",
),
]
row = build_pairs_row("2024-07-04", "NYC", [], None, om)
assert row["fcst_issued_at"] == "2024-07-04T06:00:00Z"
assert row["fcst_high_f"] == pytest.approx(86.0) # 87.8F leak excluded
assert row["fcst_low_f"] == pytest.approx(86.0)

def test_legacy_source_less_om_shape_classified_as_om(self) -> None:
"""Issue #67 (codex P2): the previously-documented OM shape — no
``source`` AND no ``issued_at``, carrying ``temperature_c`` — must
still classify as Open-Meteo (a record lacking both fields can only be
legacy OM; real IEM always carries issued_at). Without the legacy
fallback these rows would misroute to the IEM branch and null out."""
legacy_om = [
{"valid_at": "2024-07-04T08:00:00Z", "temperature_c": 20.0, "model": "om"}, # 68F
{"valid_at": "2024-07-04T14:00:00Z", "temperature_c": 32.0, "model": "om"}, # 89.6F
]
row = build_pairs_row("2024-07-04", "NYC", [], None, legacy_om)
assert row["fcst_high_f"] == pytest.approx(89.6)
assert row["fcst_low_f"] == pytest.approx(68.0)

def test_om_pop_zero_not_dropped(self) -> None:
"""A valid 0.0 POP must survive the alias fallback (no truthiness bug)."""
om = [
_om_record(
"2024-07-04T14:00:00Z",
temperature_c=20.0,
source="open_meteo.previous_runs",
precipitation_probability_pct=0.0,
)
]
row = build_pairs_row("2024-07-04", "NYC", [], None, om)
assert row["fcst_pop_6hr_pct"] == 0.0

def test_fcst_pop_6hr_pct_from_iem(self) -> None:
records = [
_iem_record("2024-07-04T12:00:00Z", "2024-07-04T08:00:00Z", pop_6hr_pct=20.0),
Expand Down
Loading