diff --git a/.briefs/issue-64-brief.md b/.briefs/issue-64-brief.md new file mode 100644 index 0000000..f4b9956 --- /dev/null +++ b/.briefs/issue-64-brief.md @@ -0,0 +1,105 @@ +# Brief: Fix Open-Meteo Rate Limiting (Issue #64) + +**Repo:** `mostlyrightmd/mostlyright-sdk` @ `16d62de` (v1.5.2) +**Branch:** `fix/64-open-meteo-rate-limiting` (off `main`) +**Issue:** +**Labels:** bug + enhancement + +## Problem + +Three compounding issues cause users to hit Open-Meteo free-tier rate limits during backtests: + +1. **Forecast cache exists but is never called** — `read_forecast_cache` / `write_forecast_cache` in `cache.py:542-571` are only referenced in `test_cache_forecasts.py`. `_fetch_open_meteo_range()` in `research.py:1384` calls the network directly every time. A quant iterating on a model re-fetches identical historical data on every run. +2. **Politeness throttle counts requests, not weight** — `_OM_POLITE_DELAY_S = 0.2` caps at ~300 req/min nominally, but Open-Meteo bills by *weighted* call cost (`max(vars/10, 1) × max(days/14, 1) × locations`). A 1-year window with 18 variables weighs ~47 calls — exhausts the 600/min ceiling in ~13 stations. +3. **Over-fetching 18 variables on the `research()` path** — `_OM_VARIABLES_TO_FETCH` always requests 18 hourly variables, but `_fetch_open_meteo_range()` in `research.py:1384` only consumes `temp_c`, `precip_probability`, and `precipitation_mm` from the returned DataFrame. The other 15 variables are fetched, billed, and discarded. + +## Scope + +### Fix 1: Wire forecast cache into `_fetch_open_meteo_range` (highest impact) + +**File:** `packages/core/src/mostlyright/research.py` — function `_fetch_open_meteo_range()` (line 1384) + +**What to do:** +- After fetching the DataFrame from `fetch_open_meteo()`, group rows by `(station, source, model, year, month)` and write each group to the cache using `write_forecast_cache()`. 
+- Before the `fetch_open_meteo()` call, attempt to read cached data covering `[from_date, to_date]` using `read_forecast_cache()` for each month in the range. +- On cache hit: reconstruct the DataFrame from the cached rows (they're `list[dict]`) and skip the network call for that month range. +- On cache miss (partial or full): fetch from network for the missing portion, cache the result, and concatenate with any cached data. +- **Never cache `live` or `seamless` source rows** — `read_forecast_cache` and `write_forecast_cache` already enforce this (they return None / no-op for live/seamless sources). +- **Never cache the current UTC month** — already enforced by the cache functions. + +**Cache key partitioning:** The cache is partitioned by `(station, source, model, year, month)` as Parquet files. For `_fetch_open_meteo_range`, the `source` will be `"open_meteo.previous_runs"` (since `mode="training"` is hardcoded in the caller at line 1404). + +**Key consideration:** `read_forecast_cache` returns `list[dict]` — these are row dicts as stored, NOT the same format as `_parse_om_row` output. The cached rows are what `write_forecast_cache` receives, which in this case would be the row dicts from the fetched DataFrame. We need to ensure the cached format round-trips correctly through `_fetch_open_meteo_range`'s grouping logic. **Recommendation:** cache at the DataFrame level (convert to list[dict] for write, reconstruct DataFrame from list[dict] on read) rather than trying to cache the already-grouped `{date_iso: [row]}` structure. + +**What NOT to change:** Do not modify `fetch_open_meteo()` itself. The cache should be applied at the `_fetch_open_meteo_range` level in `research.py`, which is the orchestration layer. This keeps the fetcher as a pure HTTP→DataFrame function (testable, composable). 
+ +### Fix 2: Throttle by weight, not request count + +**File:** `packages/weather/src/mostlyright/weather/_fetchers/_open_meteo.py` + +**What to do:** +- The file already has `_OM_POLITE_DELAY_S = 0.2` and the fetcher sleeps this amount after each HTTP call. +- After each successful HTTP call, calculate the weighted cost using `_weighted_call_cost(num_vars, num_days)` and scale the sleep: `time.sleep(_OM_POLITE_DELAY_S * ceil(weight))`. +- This means a 1.8-weight call sleeps 0.4s, a 3.9-weight call sleeps 0.8s, etc. — keeping the effective rate under 600/min regardless of per-call weight. +- **Also:** chunk date ranges >14 days client-side. Add `_chunk_date_range(from_date, to_date, max_days=14)` that splits into ≤14-day windows and issues separate HTTP calls per chunk, concatenating the DataFrames. This keeps per-call weight at ~1.8 (18 vars / 10 × 1) instead of ballooning. Single-Runs API is exempt (it uses `run=` and returns the full horizon). +- The chunking + weight-aware throttle together mean the existing `_OM_POLITE_DELAY_S` becomes a base rate that scales up proportionally. + +**Constants to add:** +```python +_OM_MAX_DAYS_PER_CALL: int = 14 # Open-Meteo weight threshold +_OM_VAR_FREE_BUDGET: int = 10 # Open-Meteo free variable count per call +``` + +### Fix 3: Trim variables on the `research()` path + +**File:** `packages/core/src/mostlyright/research.py` — function `_fetch_open_meteo_range()` (line 1384) + +**What to do:** +- Pass `variables=("temperature_2m", "precipitation_probability", "precipitation")` to the `fetch_open_meteo()` call at line 1404 instead of letting it default to the full 18-variable set. +- This drops the per-call variable weight from 1.8 to 1.0, cutting the weighted cost by roughly 44% (1.0 / 1.8 ≈ 0.56). +- The `fetch_open_meteo()` function already supports a `variables` parameter (added in a prior change — check `_validate_variables()` at line ~195 of `_open_meteo.py`). If not yet present, it needs to be added. 
+- The returned DataFrame will have all canonical columns but only 3 will be populated; the rest will be null. This is fine — `_fetch_open_meteo_range` already handles null values (lines 1431-1449). +- **Do NOT change the default behavior of `fetch_open_meteo()`** — the standalone API should still fetch all 18 variables for users who want the full DataFrame. Only trim on the `research()` orchestration path. + +**IMPORTANT — verify `variables` param exists:** Check if `fetch_open_meteo()` already accepts `variables=`. Look for `_validate_variables()` and a `variables` parameter in the function signature. If it doesn't exist yet, it needs to be added to `_open_meteo.py` first (including the validation, the hourly param builder adjustment, and the row parser handling for missing columns). + +## Files to Modify + +| File | Change | +|------|--------| +| `packages/core/src/mostlyright/research.py` | Fix 1 (cache wiring) + Fix 3 (variable trim) in `_fetch_open_meteo_range()` | +| `packages/weather/src/mostlyright/weather/_fetchers/_open_meteo.py` | Fix 2 (weight-aware throttle + chunking) + possibly `variables` param if missing | + +## Files to Create (Tests) + +| File | What to test | +|------|-------------| +| `packages/core/tests/test_open_meteo_cache_wiring.py` | Cache hit/miss/partial in `_fetch_open_meteo_range` (mock HTTP, verify cache read/write) | +| `packages/weather/tests/test_open_meteo_throttle.py` | Weight-aware sleep calculation, chunking logic | +| `packages/core/tests/test_open_meteo_variables_trim.py` | `_fetch_open_meteo_range` passes trimmed variables, returned DF has only 3 non-null columns | + +## Testing Approach + +- **Fix 1 (cache):** Mock `fetch_open_meteo` to return a known DataFrame. First call → cache miss → network fetch → cache write. Second call → cache hit → no network call → same data. Verify with `unittest.mock.patch`. +- **Fix 2 (throttle):** Test `_chunk_date_range()` directly (pure function, no mocks needed). 
Test `_weighted_call_cost()` directly. Test that `fetch_open_meteo` with a long date range issues the correct number of chunked requests (mock HTTP, count calls). +- **Fix 3 (variables):** Mock `fetch_open_meteo`, verify it receives `variables=("temperature_2m", "precipitation_probability", "precipitation")` when called from `_fetch_open_meteo_range`. Verify the returned grouped dict still has `temperature_f`, `pop_6hr_pct`, `qpf_6hr_in` populated correctly. + +## Constraints + +- **Branch from `main`.** Do not commit directly to main. +- **TDD:** Write tests first, then implement. +- **Run `uv run ruff check --fix . && uv run ruff format .` before committing.** +- **Run `uv run pytest -m "not live" -q` to verify.** +- **Backward compatible:** `fetch_open_meteo()` standalone API must still fetch all 18 variables by default. +- **Cache format:** `list[dict]` of row dicts (Parquet-backed via `write_forecast_cache`). +- **Never cache live/seamless/current-month** — already enforced by cache functions. +- **Don't break `_fetch_open_meteo_range`'s return type:** `dict[str, list[dict[str, Any]]]` — keyed by settlement date ISO. + +## Acceptance Criteria + +1. `_fetch_open_meteo_range("KNYC", "2024-06-01", "2024-12-31", model="gfs_global")` caches results on first call and reads from cache on second call (no network). +2. `fetch_open_meteo("KNYC", "2024-01-01", "2024-12-31", model="gfs_global")` with a 1-year window issues ≤27 chunked requests (2024 is a leap year, so ceil(366/14) = 27) instead of 1 unbounded-weight call. +3. `_fetch_open_meteo_range` requests only 3 variables instead of 18. +4. Weighted sleep scales with call cost (verify with time mocking or assert sleep duration in tests). +5. All existing tests pass (`uv run pytest -m "not live" -q`). +6. New tests cover cache hit/miss, chunking, variable trimming. 
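The weight figures quoted in this brief can be checked with a short standalone calculation. The sketch below is illustrative only: it assumes the fractional formula from the Problem section (`max(vars/10, 1) × max(days/14, 1) × locations`), which has not been verified here against Open-Meteo's published pricing, and the names `weighted_cost`, `chunk_date_range`, and `inclusive_days` are hypothetical stand-ins rather than the SDK's own helpers.

```python
from datetime import date, timedelta
from math import ceil

# Thresholds as stated in the brief (assumptions, not confirmed against Open-Meteo docs).
MAX_DAYS_PER_CALL = 14
VAR_FREE_BUDGET = 10
POLITE_DELAY_S = 0.2


def weighted_cost(num_vars: int, num_days: int, locations: int = 1) -> float:
    """Fractional weight from the brief: max(vars/10, 1) * max(days/14, 1) * locations."""
    var_factor = max(num_vars / VAR_FREE_BUDGET, 1.0)
    day_factor = max(num_days / MAX_DAYS_PER_CALL, 1.0)
    return var_factor * day_factor * locations


def inclusive_days(from_date: str, to_date: str) -> int:
    """Number of calendar days in the inclusive ISO range [from_date, to_date]."""
    return (date.fromisoformat(to_date) - date.fromisoformat(from_date)).days + 1


def chunk_date_range(
    from_date: str, to_date: str, max_days: int = MAX_DAYS_PER_CALL
) -> list[tuple[str, str]]:
    """Split an inclusive ISO date range into windows of at most max_days days."""
    end = date.fromisoformat(to_date)
    cur = date.fromisoformat(from_date)
    chunks: list[tuple[str, str]] = []
    while cur <= end:
        chunk_end = min(cur + timedelta(days=max_days - 1), end)
        chunks.append((cur.isoformat(), chunk_end.isoformat()))
        cur = chunk_end + timedelta(days=1)
    return chunks


# One unchunked calendar-2024 request with all 18 variables.
year_days = inclusive_days("2024-01-01", "2024-12-31")
unchunked = weighted_cost(18, year_days)

# The same window split into 14-day chunks, with 18 and with 3 variables.
chunks = chunk_date_range("2024-01-01", "2024-12-31")
per_chunk_18 = weighted_cost(18, MAX_DAYS_PER_CALL)
per_chunk_3 = weighted_cost(3, MAX_DAYS_PER_CALL)

# Weight-aware sleep for a full-width 18-variable chunk.
sleep_18 = POLITE_DELAY_S * ceil(per_chunk_18)

print(year_days, round(unchunked, 2), len(chunks), per_chunk_18, per_chunk_3, sleep_18)
```

Under these assumptions the unchunked year weighs about 47, the chunked window needs 27 requests, and trimming to 3 variables brings each chunk down to a weight of 1.0. A formula that rounds each factor up with `ceil` would report 54 for the same unchunked request, so tests should pin whichever variant the implementation actually uses.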
diff --git a/.briefs/issue-64-gemini-review-brief.md b/.briefs/issue-64-gemini-review-brief.md new file mode 100644 index 0000000..4a4ccfb --- /dev/null +++ b/.briefs/issue-64-gemini-review-brief.md @@ -0,0 +1,272 @@ +# Gemini Review Brief: Issue #64 — Open-Meteo Rate Limiting Fix + +## Context + +You are reviewing a code change for Issue #64 in the `mostlyright-sdk` repo. This is a public SDK for weather prediction market research. The code was written by Claude Code and has already had one adversarial review from Blenda (the infrastructure agent). We need a second independent review from a different model perspective. + +**Repo:** `mostlyrightmd/mostlyright-sdk` @ `16d62de` (v1.5.2, on `main` — changes are uncommitted working tree) +**Issue:** https://github.com/mostlyrightmd/mostlyright-sdk/issues/64 + +## What Changed + +Two source files modified, three test files created: + +| File | Change | +|------|--------| +| `packages/core/src/mostlyright/research.py` | Cache wiring (Fix 1) + variable trimming (Fix 3) in `_fetch_open_meteo_range()` | +| `packages/weather/src/mostlyright/weather/_fetchers/_open_meteo.py` | Weight-aware throttle + date chunking (Fix 2) + `variables=` param + `_validate_variables()` | +| `packages/core/tests/test_open_meteo_cache_wiring.py` | Tests for cache hit/miss and variable trimming on research path | +| `packages/weather/tests/test_open_meteo_variables_param.py` | Tests for `variables=` param (trim, unknown rejection, Single Runs no suffix) | +| `packages/weather/tests/test_open_meteo_window_chunking.py` | Tests for chunking (under/over 14 days, Single Runs exempt) | + +## The Diff (research.py) + +```diff +--- a/packages/core/src/mostlyright/research.py ++++ b/packages/core/src/mostlyright/research.py + ++_OM_RESEARCH_VARIABLES: tuple[str, ...] 
= ( ++ "temperature_2m", ++ "precipitation", ++ "precipitation_probability", ++) ++_OM_RESEARCH_SOURCE: str = "open_meteo.previous_runs" ++ ++ + def _fetch_open_meteo_range( + info: StationInfo, + from_date: str, + to_date: str, + *, + model: str, + ) -> dict[str, list[dict[str, Any]]]: +- """Phase 20 OM-05 — fetch Open-Meteo forecasts grouped by settlement date. +- Wraps ``mostlyright.weather._fetchers._open_meteo.fetch_open_meteo`` in +- training mode (Previous Runs API) and pivots its tabular DataFrame +- into the ``{date_iso: [forecast_row, ...]}`` shape that +- ``build_pairs(forecasts_by_date=...)`` expects. Each row carries +- ``model`` / ``issued_at`` / ``valid_at`` / ``temperature_f`` / +- ``pop_6hr_pct`` / ``qpf_6hr_in`` keys for build_pairs_row compatibility. ++ """Phase 20 OM-05 — fetch Open-Meteo forecasts grouped by settlement date. ++ Reads from the Phase 20 forecast cache before hitting the network. On a ++ cache miss the fetcher writes each elapsed month's rows back so subsequent ++ calls for the same window are served from disk. Only the 3 variables ++ consumed by the pairs join are requested (Fix 3 — cuts weighted call cost). ++ Returns the ``{date_iso: [forecast_row, ...]}`` shape that ++ ``build_pairs(forecasts_by_date=...)`` expects. + """ ++ from datetime import date as _date ++ from datetime import timedelta as _timedelta ++ + import pandas as pd + + from mostlyright.weather._fetchers._open_meteo import fetch_open_meteo ++ from mostlyright.weather.cache import read_forecast_cache, write_forecast_cache ++ ++ # Enumerate (year, month) partitions covered by [from_date, to_date]. ++ start = _date.fromisoformat(from_date) ++ end = _date.fromisoformat(to_date) ++ months: list[tuple[int, int]] = [] ++ cur = _date(start.year, start.month, 1) ++ while cur <= end: ++ months.append((cur.year, cur.month)) ++ cur = _date(cur.year + (cur.month // 12), (cur.month % 12) + 1, 1) ++ ++ # Serve cached partitions; collect months that need a network fetch. 
++ all_rows: list[dict[str, Any]] = [] ++ missing: list[tuple[int, int]] = [] ++ for y, m in months: ++ hit = read_forecast_cache(info.icao, _OM_RESEARCH_SOURCE, model, y, m) ++ if hit is not None: ++ all_rows.extend(hit) ++ else: ++ missing.append((y, m)) ++ ++ # Fetch the missing span and populate the cache. ++ if missing: ++ miss_start = max(_date(missing[0][0], missing[0][1], 1), start) ++ miss_end_y, miss_end_m = missing[-1] ++ last_day = _date(miss_end_y + (miss_end_m // 12), (miss_end_m % 12) + 1, 1) - _timedelta( ++ days=1 ++ ) ++ miss_end = min(last_day, end) ++ ++ df_fetched = fetch_open_meteo( ++ info.icao, ++ miss_start.isoformat(), ++ miss_end.isoformat(), ++ model=model, ++ mode="training", ++ variables=_OM_RESEARCH_VARIABLES, ++ ) ++ ++ if df_fetched is not None and not df_fetched.empty: ++ for y, m in missing: ++ mask = (df_fetched["valid_at"].dt.year == y) & ( ++ df_fetched["valid_at"].dt.month == m ++ ) ++ month_rows = df_fetched[mask].to_dict("records") ++ if month_rows: ++ write_forecast_cache(info.icao, _OM_RESEARCH_SOURCE, model, y, m, month_rows) ++ all_rows.extend(df_fetched.to_dict("records")) + +- df = fetch_open_meteo(info.icao, from_date, to_date, model=model, mode="training") + groups: dict[str, list[dict[str, Any]]] = {} +- if df is None or df.empty: ++ if not all_rows: + return groups +- for _, row in df.iterrows(): ++ ++ for row in all_rows: + ftime = row.get("valid_at") + if ftime is None or (isinstance(ftime, float) and ftime != ftime): + continue +``` + +## The Diff (_open_meteo.py) + +```diff +--- a/packages/weather/src/mostlyright/weather/_fetchers/_open_meteo.py ++++ b/packages/weather/src/mostlyright/weather/_fetchers/_open_meteo.py + ++from datetime import UTC, date, datetime, timedelta ++from math import ceil + ++#: Open-Meteo per-call weight thresholds (free tier billing model). 
++_OM_MAX_DAYS_PER_CALL: int = 14 ++_OM_VAR_FREE_BUDGET: int = 10 + ++def _chunk_date_range( ++ from_date: str, ++ to_date: str, ++ max_days: int = _OM_MAX_DAYS_PER_CALL, ++) -> list[tuple[str, str]]: ++ """Split [from_date, to_date] into ≤max_days-day chunks.""" ++ start = date.fromisoformat(from_date) ++ end = date.fromisoformat(to_date) ++ chunks: list[tuple[str, str]] = [] ++ cur = start ++ while cur <= end: ++ chunk_end = min(cur + timedelta(days=max_days - 1), end) ++ chunks.append((cur.isoformat(), chunk_end.isoformat())) ++ cur = chunk_end + timedelta(days=1) ++ return chunks ++ ++def _weighted_call_cost(num_vars: int, num_days: int) -> float: ++ """Open-Meteo weighted call cost: ceil(vars/10) * ceil(days/14).""" ++ return float(ceil(num_vars / _OM_VAR_FREE_BUDGET) * ceil(num_days / _OM_MAX_DAYS_PER_CALL)) ++ ++def _validate_variables(variables: tuple[str, ...] | None) -> tuple[str, ...]: ++ """Validate caller-supplied variables; return full default set when None.""" ++ if variables is None: ++ return _OM_VARIABLES_TO_FETCH ++ unknown = [v for v in variables if v not in _OM_VAR_TO_COLUMN] ++ if unknown: ++ raise ValueError( ++ f"unknown OM variable(s) — {unknown!r}; allowed: {sorted(_OM_VAR_TO_COLUMN)}" ++ ) ++ return tuple(variables) + +-def _build_hourly_param(endpoint: str) -> str: ++def _build_hourly_param( ++ endpoint: str, ++ variables: tuple[str, ...] = _OM_VARIABLES_TO_FETCH, ++) -> str: + if endpoint == OPEN_METEO_PREVIOUS_RUNS_URL: +- return ",".join(f"{v}_previous_day1" for v in _OM_VARIABLES_TO_FETCH) ++ return ",".join(f"{v}_previous_day1" for v in variables) +- return ",".join(_OM_VARIABLES_TO_FETCH) ++ return ",".join(variables) + + # In fetch_open_meteo(): ++ vars_to_fetch = _validate_variables(variables) ++ ++ # Chunk date ranges >14 days for Previous Runs API (no issued_at). ++ # Single Runs uses run= and returns a full 168h horizon — no chunking. 
++ if issued_at is None and endpoint == OPEN_METEO_PREVIOUS_RUNS_URL: ++ chunks = _chunk_date_range(from_date, to_date) ++ else: ++ chunks = [(from_date, to_date)] ++ ++ close_client = client is None + if client is None: + client = httpx.Client(timeout=timeout) +- close_client = True +- +- retrieved_at = datetime.now(UTC) +- payload: dict[str, Any] = {} ++ frames: list[pd.DataFrame] = [] + try: +- for attempt in range(_MAX_RETRIES + 1): +- # ... retry logic unchanged ... +- time.sleep(_OM_POLITE_DELAY_S) ++ for chunk_from, chunk_to in chunks: ++ params = { ... "hourly": _build_hourly_param(endpoint, vars_to_fetch), ... } ++ # ... retry loop per chunk (same 429/404 handling) ... ++ ++ # Weight-aware polite delay scales with per-call cost. ++ num_days = (date.fromisoformat(chunk_to) - date.fromisoformat(chunk_from)).days + 1 ++ cost = _weighted_call_cost(len(vars_to_fetch), num_days) ++ time.sleep(_OM_POLITE_DELAY_S * ceil(cost)) ++ ++ if payload: ++ frames.append(_project_payload_to_dataframe(...)) + finally: + if close_client: + client.close() ++ ++ if not frames: ++ return _empty_df() ++ if len(frames) == 1: ++ return frames[0] ++ return pd.concat(frames, ignore_index=True) +``` + +## The Test Files + +### test_open_meteo_cache_wiring.py (155 lines) +- Mocks `fetch_open_meteo`, verifies cache file written on first call, cache hit on second call (no network), and that `variables=` kwarg passes exactly 3 variables. + +### test_open_meteo_variables_param.py (152 lines) +- Uses `httpx.MockTransport` to verify: default call requests 18 vars, trimmed call requests 3, unknown variable raises ValueError before HTTP, Single Runs has no `_previous_day1` suffix. + +### test_open_meteo_window_chunking.py (143 lines) +- Uses `httpx.MockTransport` to verify: 7-day window = 1 call, 30-day window = 2-3 chunks (each ≤13 days), Single Runs mode = 1 call regardless of window size. + +## Project Rules (CLAUDE.md) + +These MUST be followed. Flag any violations: + +1. 
**Never commit directly to main.** Always branch + PR. Branch name: `fix/64-open-meteo-rate-limiting`. +2. **TDD mandatory.** Tests first, RED → GREEN → REFACTOR. 80% coverage minimum. +3. **Two-reviewer loop** (Codex + Python Architect) before merging to `merged-vision`. +4. **Pre-commit hooks mandatory** — `uv run ruff check --fix . && uv run ruff format .` before committing. +5. **Pre-push hooks mandatory** — `uv run pytest -m "not live"` before pushing. No `--no-verify`. +6. **Dual-SDK rule:** Any public API change must include a TS parity section. +7. **All API calls direct from SDK.** No hosted API client calls anywhere. +8. **Branch workflow:** Feature branches off `merged-vision` (but this is a fix, so off `main` is acceptable per the issue workflow). +9. **Documentation:** Update CHANGELOG.md and relevant docs. + +## Previous Review Findings (Blenda) + +Already identified — verify these are handled: + +1. **Missing partial cache hit test** — No test covers the case where some months are cached and some are missing (e.g., month 1 hits cache, month 2 misses → fetch only month 2, concatenate). +2. **NaT timestamp round-trip** — `df.to_dict("records")` converts pandas NaT timestamps. On cache read, these come back as... what? Could break the downstream `isinstance(ftime, float) and ftime != ftime` NaN check. +3. **Branch discipline** — Changes are on `main`, need to be on a branch. +4. **`_parse_om_row` with subset variables** — When only 3 variables are requested, `_parse_om_row` still iterates over all 18 `_OM_VAR_TO_COLUMN` entries. The unrequested variables will have `None` values from `series[idx]` falling through to the `else` branch when the key doesn't exist in `hourly_payload`. Verify this doesn't cause index errors on the `idx >= len(series)` check when `series` is None. + +## What to Review + +Please provide: + +1. **Correctness:** Any logic bugs, edge cases, race conditions? +2. **API design:** Does `variables=` param fit the SDK's conventions? 
Is `_validate_variables` in the right place? +3. **Cache design:** Is caching at the `_fetch_open_meteo_range` level correct, or should it be lower (in `fetch_open_meteo` itself)? What about cache invalidation? +4. **Weight calculation:** Does `ceil(vars/10) * ceil(days/14)` match Open-Meteo's actual billing? Check against https://open-meteo.com/en/pricing. +5. **Chunking logic:** Any off-by-one errors in `_chunk_date_range`? What about a 1-day window or same-day from/to? +6. **Test coverage:** What's missing beyond the partial cache hit test? +7. **Performance:** The cache writes `to_dict("records")` which materializes all rows. For a 1-year window with hourly data, that's ~8,760 dicts per station. Is Parquet round-trip actually faster than the HTTP call for small windows? +8. **CLAUDE.md violations:** Any rules broken? +9. **TS parity:** Does this change need a TS parity note? +10. **Anything else** that a second pair of eyes catches? diff --git a/.briefs/issue-pairs-source-misclassification.md b/.briefs/issue-pairs-source-misclassification.md new file mode 100644 index 0000000..4eaccee --- /dev/null +++ b/.briefs/issue-pairs-source-misclassification.md @@ -0,0 +1,120 @@ +# Issue Report Draft: Source Misclassification in `build_pairs_row` (`_pairs.py`) + +## Title (proposed) +`bug(pairs): build_pairs_row misclassifies Open-Meteo records as IEM MOS when both sources requested — causes incorrect run selection and data corruption` + +## Labels (proposed) +`bug` + +## How Discovered +Found by Gemini 2.5 Pro during adversarial review of PR #64 (Open-Meteo rate limiting). The review scope was cache wiring + throttling, but the reviewer traced the data flow downstream and identified a pre-existing bug in the pairs join that becomes more impactful now that Open-Meteo data is cached and reliable. 
+ +## Problem + +In `packages/core/src/mostlyright/_internal/_pairs.py`, `build_pairs_row()` separates IEM MOS and Open-Meteo forecast records using the **presence of `issued_at`**: + +```python +# Current code (line ~297-298) +iem_records = [r for r in forecasts if r.get("issued_at")] +om_records = [r for r in forecasts if not r.get("issued_at")] +``` + +This split is incorrect. **Phase 20 Open-Meteo Previous Runs records carry a derived `issued_at`** (cycle math: `valid_at - publish_lag`, floored to model cycle hours). This means Open-Meteo records **do** have `issued_at` set, and get classified as `iem_records`. + +### Impact + +When `forecast_source=["iem_mos", "open_meteo"]` (or when `forecast_source=None` which defaults to `("iem_mos",)` but may include both): + +1. Open-Meteo records are mixed into the IEM MOS pool. +2. Both sources' runs are grouped together under `_select_best_run(iem_records, market_close)`. +3. Run selection may pick an Open-Meteo cycle as the "best" IEM run (wrong model metadata). +4. The IEM-specific `_aggregate_fcst_temps_iem` path processes Open-Meteo rows, which carry different column names (`temp_c` vs `temperature_f`, `precip_probability` vs `precipitation_probability_pct`). +5. **Data corruption:** incorrect temperature/precipitation values in the output pairs. + +When only `forecast_source="open_meteo"` is requested, the bug is masked because there are no IEM records to confuse — all records end up in `iem_records` but `_select_best_run` on a single run still works. The bug only manifests when **both sources are requested simultaneously**. + +### Why It Wasn't Caught + +- Open-Meteo `issued_at` was added in Phase 20 to support leakage detection. +- The existing test fixtures for `build_pairs_row` likely use records without `issued_at` (matching the old Open-Meteo seamless behavior where `issued_at` was null by design). 
+- The `research()` single-source path (`forecast_source="open_meteo"`) works despite the misclassification because `_select_best_run` still picks the only available run. +- CI skips `@pytest.mark.live` tests, so the mixed-source path may not be exercised in CI. + +## Proposed Fix + +Replace the `issued_at` presence check with an explicit source field inspection: + +```python +iem_records = [ + r for r in forecasts + if not r.get("source", "").startswith("open_meteo") +] +om_records = [ + r for r in forecasts + if r.get("source", "").startswith("open_meteo") +] +``` + +This is unambiguous — every record carries a `source` field (set by the fetchers: `"iem_mos"` for IEM, `"open_meteo.previous_runs"` / `"open_meteo.single_run"` / `"open_meteo.seamless"` / `"open_meteo.live"` for Open-Meteo). + +### Secondary Issue: Open-Meteo Fallback Block Uses IEM Column Names + +The current fallback block (when IEM MOS yields no data and OM records exist) calls `_aggregate_fcst_temps_openmeteo()` which expects a specific column format. If `_fetch_open_meteo_range` is the source (via #64's cache wiring), the rows carry `temperature_f`, `pop_6hr_pct`, and `qpf_6hr_in` (converted from Celsius in `_fetch_open_meteo_range` lines ~1449-1468). But the fallback block looks for `precipitation_probability_pct` — a different column name than what the research path produces. 
+ +A proposed fix would inline the aggregation and handle both column name conventions: + +```python +if fcst_high is None and om_records: + om_with_issued = [r for r in om_records if r.get("issued_at")] + om_no_issued = [r for r in om_records if not r.get("issued_at")] + + best_om_records = [] + if om_with_issued: + best_issued, best_om_records = _select_best_run(om_with_issued, market_close) + else: + best_om_records = om_no_issued + + if best_om_records: + # Restrict to the settlement window once; both aggregations below reuse it. + window_om = [ + r for r in best_om_records + if win_start_iso <= r.get("valid_at", "") <= win_end_iso + ] + temps_f = [] + for r in window_om: + if r.get("temperature_f") is not None: + temps_f.append(r["temperature_f"]) + elif r.get("temperature_c") is not None: + temps_f.append(r["temperature_c"] * 9 / 5 + 32) + if temps_f: + fcst_high = max(temps_f) + fcst_low = min(temps_f) + + # Support both pop_6hr_pct (research path) and precipitation_probability_pct (legacy) + probs = [] + for r in window_om: + if r.get("pop_6hr_pct") is not None: + probs.append(r["pop_6hr_pct"]) + elif r.get("precipitation_probability_pct") is not None: + probs.append(r["precipitation_probability_pct"]) + fcst_pop = max(probs) if probs else None +``` + +## Scope Decision Needed + +This fix touches `build_pairs_row` — the core join function that every `research()` call passes through. Options: + +1. **Bundle with this issue** — smallest PR, but mixes a #64 rate-limiting fix with a pairs-join correctness fix. +2. **Separate issue** (recommended) — `bug(pairs): Open-Meteo records misclassified as IEM in build_pairs_row`. Clean scope, independent review. Can reference #64 as the discovery context. + +## Test Cases Needed + +1. **Mixed source classification** — `build_pairs_row` with both IEM MOS and Open-Meteo records; verify OM records (with `issued_at`) are NOT placed in `iem_records`. +2. 
**Column name compatibility** — OM records from `_fetch_open_meteo_range` (carrying `temperature_f`, `pop_6hr_pct`, `qpf_6hr_in`) produce correct `fcst_high`, `fcst_low`, `fcst_pop`, `fcst_qpf` in the output. +3. **Single source regression** — `forecast_source="iem_mos"` only and `forecast_source="open_meteo"` only still produce correct results (no regression). + +## TS Parity + +If the TS SDK has an equivalent `build_pairs_row` or join function, the same source classification bug likely exists there. The TS parity note should reference `CROSS-SDK-SYNC.md`. + +## References + +- Discovered during review of: #64 (`fix(weather): wire forecast cache + weight-aware throttle + variable trim`) +- Related: Phase 20 OM-05 (`_fetch_open_meteo_range` — the function that produces the OM rows with `issued_at`) +- Related: `_aggregate_fcst_temps_openmeteo` (the existing helper that handles the fallback, may need column name update) diff --git a/packages/core/src/mostlyright/research.py b/packages/core/src/mostlyright/research.py index 940622f..6edef59 100644 --- a/packages/core/src/mostlyright/research.py +++ b/packages/core/src/mostlyright/research.py @@ -1381,6 +1381,14 @@ def _validate_research_kwargs( _FORECAST_SOURCES_ALLOWED: frozenset[str] = frozenset({"iem_mos", "open_meteo"}) +_OM_RESEARCH_VARIABLES: tuple[str, ...] = ( + "temperature_2m", + "precipitation", + "precipitation_probability", +) +_OM_RESEARCH_SOURCE: str = "open_meteo.previous_runs" + + def _fetch_open_meteo_range( info: StationInfo, from_date: str, @@ -1390,24 +1398,105 @@ def _fetch_open_meteo_range( ) -> dict[str, list[dict[str, Any]]]: """Phase 20 OM-05 — fetch Open-Meteo forecasts grouped by settlement date. - Wraps ``mostlyright.weather._fetchers._open_meteo.fetch_open_meteo`` in - training mode (Previous Runs API) and pivots its tabular DataFrame - into the ``{date_iso: [forecast_row, ...]}`` shape that - ``build_pairs(forecasts_by_date=...)`` expects. 
Each row carries - ``model`` / ``issued_at`` / ``valid_at`` / ``temperature_f`` / - ``pop_6hr_pct`` / ``qpf_6hr_in`` keys for build_pairs_row compatibility. + Reads from the Phase 20 forecast cache before hitting the network. On a + cache miss the fetcher writes each elapsed month's rows back so subsequent + calls for the same window are served from disk. Only the 3 variables + consumed by the pairs join are requested (Fix 3 — cuts weighted call cost). + + Returns the ``{date_iso: [forecast_row, ...]}`` shape that + ``build_pairs(forecasts_by_date=...)`` expects. """ + from datetime import date as _date + from datetime import timedelta as _timedelta + import pandas as pd from mostlyright.weather._fetchers._open_meteo import fetch_open_meteo + from mostlyright.weather.cache import read_forecast_cache, write_forecast_cache + + # Enumerate (year, month) partitions covered by [from_date, to_date]. + start = _date.fromisoformat(from_date) + end = _date.fromisoformat(to_date) + months: list[tuple[int, int]] = [] + cur = _date(start.year, start.month, 1) + while cur <= end: + months.append((cur.year, cur.month)) + cur = _date(cur.year + (cur.month // 12), (cur.month % 12) + 1, 1) + + # Serve cached partitions; collect months that need a network fetch. + # Pass month boundaries as need_from/need_to so a partition written for + # only a subset of days (e.g. June 1-2) doesn't masquerade as a full hit. 
+ all_rows: list[dict[str, Any]] = [] + missing: list[tuple[int, int]] = [] + for y, m in months: + month_first = f"{y:04d}-{m:02d}-01" + month_last = (_date(y + (m // 12), (m % 12) + 1, 1) - _timedelta(days=1)).isoformat() + hit = read_forecast_cache( + info.icao, + _OM_RESEARCH_SOURCE, + model, + y, + m, + need_from=month_first, + need_to=month_last, + ) + if hit is not None: + all_rows.extend(hit) + else: + missing.append((y, m)) + + # Fetch missing months as full calendar months so each written partition + # carries complete coverage metadata and subsequent requests for any + # subrange of those months hit the cache. + if missing: + miss_start = _date(missing[0][0], missing[0][1], 1) + miss_end_y, miss_end_m = missing[-1] + miss_end = _date(miss_end_y + (miss_end_m // 12), (miss_end_m % 12) + 1, 1) - _timedelta( + days=1 + ) + + df_fetched = fetch_open_meteo( + info.icao, + miss_start.isoformat(), + miss_end.isoformat(), + model=model, + mode="training", + variables=_OM_RESEARCH_VARIABLES, + ) + + if df_fetched is not None and not df_fetched.empty: + for y, m in missing: + mask = (df_fetched["valid_at"].dt.year == y) & ( + df_fetched["valid_at"].dt.month == m + ) + month_rows = df_fetched[mask].to_dict("records") + if month_rows: + cleaned_rows = [ + {k: (None if pd.isna(v) else v) for k, v in r.items()} for r in month_rows + ] + month_from = f"{y:04d}-{m:02d}-01" + month_to = ( + _date(y + (m // 12), (m % 12) + 1, 1) - _timedelta(days=1) + ).isoformat() + write_forecast_cache( + info.icao, + _OM_RESEARCH_SOURCE, + model, + y, + m, + cleaned_rows, + from_date=month_from, + to_date=month_to, + ) + all_rows.extend(cleaned_rows) - df = fetch_open_meteo(info.icao, from_date, to_date, model=model, mode="training") groups: dict[str, list[dict[str, Any]]] = {} - if df is None or df.empty: + if not all_rows: return groups - for _, row in df.iterrows(): + + for row in all_rows: ftime = row.get("valid_at") - if ftime is None or (isinstance(ftime, float) and ftime != 
ftime): + if ftime is None or pd.isna(ftime): continue try: ftime_dt = pd.to_datetime(ftime, utc=True) @@ -1421,8 +1510,7 @@ def _fetch_open_meteo_range( try: issued_iso = ( pd.to_datetime(issued_at, utc=True).strftime("%Y-%m-%dT%H:%M:%SZ") - if issued_at is not None - and not (isinstance(issued_at, float) and issued_at != issued_at) + if issued_at is not None and not pd.isna(issued_at) else None ) except Exception: @@ -1430,21 +1518,21 @@ def _fetch_open_meteo_range( valid_iso = ftime_dt.strftime("%Y-%m-%dT%H:%M:%SZ") temp_c = row.get("temp_c") temperature_f: float | None = None - if temp_c is not None and not (isinstance(temp_c, float) and temp_c != temp_c): + if temp_c is not None and not pd.isna(temp_c): try: temperature_f = float(temp_c) * 9.0 / 5.0 + 32.0 except (TypeError, ValueError): temperature_f = None pop_prob = row.get("precip_probability") pop_6hr_pct: float | None = None - if pop_prob is not None and not (isinstance(pop_prob, float) and pop_prob != pop_prob): + if pop_prob is not None and not pd.isna(pop_prob): try: pop_6hr_pct = float(pop_prob) * 100.0 except (TypeError, ValueError): pop_6hr_pct = None precip_mm = row.get("precipitation_mm") qpf_6hr_in: float | None = None - if precip_mm is not None and not (isinstance(precip_mm, float) and precip_mm != precip_mm): + if precip_mm is not None and not pd.isna(precip_mm): try: qpf_6hr_in = float(precip_mm) / 25.4 except (TypeError, ValueError): diff --git a/packages/core/tests/test_open_meteo_cache_wiring.py b/packages/core/tests/test_open_meteo_cache_wiring.py new file mode 100644 index 0000000..5d60966 --- /dev/null +++ b/packages/core/tests/test_open_meteo_cache_wiring.py @@ -0,0 +1,458 @@ +"""Issue #64 Fix 1: ``_fetch_open_meteo_range`` reads and writes the +Phase-20 forecast cache. + +Previous-runs / single-runs / seamless forecast data is immutable, but the +cache built in Phase 20 OM-06 had no production caller. 
A second call to +``research(..., forecast_source="open_meteo")`` with the same args must +serve the cached parquet rather than re-fetching. +""" + +from __future__ import annotations + +from pathlib import Path +from typing import Any +from unittest.mock import patch + +import pandas as pd +import pytest +from mostlyright._internal._stations import STATIONS + + +def _fake_om_payload_df(from_date: str, to_date: str) -> pd.DataFrame: + """Build a minimal Open-Meteo response DataFrame across [from_date, to_date].""" + rows: list[dict[str, Any]] = [] + cur = pd.Timestamp(from_date, tz="UTC") + end = pd.Timestamp(to_date, tz="UTC") + pd.Timedelta(days=1) + while cur < end: + rows.append( + { + "station": "KNYC", + "issued_at": cur - pd.Timedelta(days=1), + "valid_at": cur, + "forecast_hour": 24, + "model": "gfs_global", + "source": "open_meteo.previous_runs", + "temp_c": 20.0, + "dew_point_c": None, + "wind_speed_ms": None, + "wind_dir_deg": None, + "precip_probability": 0.10, + "sky_cover_pct": None, + "apparent_temp_c": None, + "shortwave_radiation_wm2": None, + "direct_radiation_wm2": None, + "cape_jkg": None, + "precipitation_mm": 0.5, + "cloud_cover_pct": None, + "surface_pressure_hpa": None, + "pressure_msl_hpa": None, + "freezing_level_m": None, + "snow_depth_m": None, + "visibility_m": None, + "wind_gusts_ms": None, + "weather_code": None, + "retrieved_at": pd.Timestamp.now(tz="UTC"), + } + ) + cur = cur + pd.Timedelta(hours=1) + return pd.DataFrame(rows) + + +def test_fetch_open_meteo_range_writes_forecast_cache( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + """First call must hit fetch_open_meteo AND populate the cache parquet.""" + monkeypatch.setenv("MOSTLYRIGHT_CACHE_DIR", str(tmp_path)) + from mostlyright.research import _fetch_open_meteo_range + + info = STATIONS["NYC"] + df = _fake_om_payload_df("2024-06-01", "2024-06-02") + + call_count = {"n": 0} + + def fake_fetch(*args: Any, **kwargs: Any) -> pd.DataFrame: + call_count["n"] += 1 + 
return df + + with patch( + "mostlyright.weather._fetchers._open_meteo.fetch_open_meteo", + side_effect=fake_fetch, + ): + out = _fetch_open_meteo_range(info, "2024-06-01", "2024-06-02", model="gfs_global") + + assert call_count["n"] >= 1 + assert out # produced some dates + # Cache file must exist for the 2024-06 partition. + from mostlyright.weather.cache import forecast_cache_path + + cache_file = forecast_cache_path("KNYC", "open_meteo.previous_runs", "gfs_global", 2024, 6) + assert cache_file.exists(), f"expected cache parquet at {cache_file}" + + +def test_fetch_open_meteo_range_second_call_uses_cache( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + """Second call with the same args must serve from cache (no fetch_open_meteo).""" + monkeypatch.setenv("MOSTLYRIGHT_CACHE_DIR", str(tmp_path)) + from mostlyright.research import _fetch_open_meteo_range + + info = STATIONS["NYC"] + df = _fake_om_payload_df("2024-06-01", "2024-06-02") + + call_count = {"n": 0} + + def fake_fetch(*args: Any, **kwargs: Any) -> pd.DataFrame: + call_count["n"] += 1 + return df + + with patch( + "mostlyright.weather._fetchers._open_meteo.fetch_open_meteo", + side_effect=fake_fetch, + ): + _fetch_open_meteo_range(info, "2024-06-01", "2024-06-02", model="gfs_global") + first_n = call_count["n"] + # Second call — exactly the same args. + out2 = _fetch_open_meteo_range(info, "2024-06-01", "2024-06-02", model="gfs_global") + + assert first_n >= 1 + # Second call must NOT increment call_count — cache hit. + assert call_count["n"] == first_n, ( + f"second call refetched: count went {first_n} -> {call_count['n']}" + ) + # And it must still return non-empty groups. 
+ assert out2 + + +def test_fetch_open_meteo_range_trims_to_three_pairs_variables( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + """Research path must request only the 3 pairs-join variables, not all 18.""" + monkeypatch.setenv("MOSTLYRIGHT_CACHE_DIR", str(tmp_path)) + from mostlyright.research import _fetch_open_meteo_range + + info = STATIONS["NYC"] + df = _fake_om_payload_df("2024-06-01", "2024-06-02") + + captured: list[dict[str, Any]] = [] + + def fake_fetch(*args: Any, **kwargs: Any) -> pd.DataFrame: + captured.append(dict(kwargs)) + return df + + with patch( + "mostlyright.weather._fetchers._open_meteo.fetch_open_meteo", + side_effect=fake_fetch, + ): + _fetch_open_meteo_range(info, "2024-06-01", "2024-06-02", model="gfs_global") + + assert captured, "expected fetch_open_meteo to be called" + vars_passed = captured[0].get("variables") + assert vars_passed is not None, "expected variables= kwarg on the research path" + assert set(vars_passed) == { + "temperature_2m", + "precipitation", + "precipitation_probability", + } + + +def test_fetch_open_meteo_range_partial_cache_hit( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + """If June and August are missing but July is cached, the fetch span covers [June, August]. + Only June and August records are added from the fetch result, and July is served from cache, + ensuring no duplicate July records are added to all_rows. 
+ """ + monkeypatch.setenv("MOSTLYRIGHT_CACHE_DIR", str(tmp_path)) + from mostlyright.research import _fetch_open_meteo_range + from mostlyright.weather.cache import forecast_cache_path, write_forecast_cache + + info = STATIONS["NYC"] + model = "gfs_global" + source = "open_meteo.previous_runs" + + # Pre-cache July 2024 + july_cached_rows = [ + { + "station": "KNYC", + "issued_at": pd.Timestamp("2024-07-15T12:00:00Z"), + "valid_at": pd.Timestamp("2024-07-16T12:00:00Z"), + "model": model, + "source": source, + "temp_c": 25.0, + "precip_probability": 0.0, + "precipitation_mm": 0.0, + } + ] + write_forecast_cache( + "KNYC", + source, + model, + 2024, + 7, + july_cached_rows, + from_date="2024-07-01", + to_date="2024-07-31", + ) + + # June & August fetched data + june_rows = _fake_om_payload_df("2024-06-15", "2024-06-15") + august_rows = _fake_om_payload_df("2024-08-15", "2024-08-15") + # The fetcher returns the whole fetched DataFrame including July data + july_fetched_rows = _fake_om_payload_df("2024-07-15", "2024-07-15") + df_fetched = pd.concat([june_rows, july_fetched_rows, august_rows], ignore_index=True) + + captured: list[dict[str, Any]] = [] + + def fake_fetch(*args: Any, **kwargs: Any) -> pd.DataFrame: + captured.append(kwargs) + return df_fetched + + with patch( + "mostlyright.weather._fetchers._open_meteo.fetch_open_meteo", + side_effect=fake_fetch, + ): + # Request June to August + out = _fetch_open_meteo_range(info, "2024-06-01", "2024-08-31", model=model) + + assert len(captured) == 1 + # Check that June and August caches are written + assert forecast_cache_path("KNYC", source, model, 2024, 6).exists() + assert forecast_cache_path("KNYC", source, model, 2024, 8).exists() + + # The returned July date must correspond to the CACHED July data (temp_c=25.0 -> temperature_f=77.0) + # and NOT the fetched July data (temp_c=20.0 -> temperature_f=68.0). 
+ july_fcst_rows = out.get("2024-07-16", []) + assert july_fcst_rows, "July forecast rows must exist" + # Ensure there is exactly 1 July row, not duplicates + assert len(july_fcst_rows) == 1 + assert july_fcst_rows[0]["temperature_f"] == pytest.approx(77.0) + + +def test_fetch_open_meteo_range_handles_nat( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + """Rows with pd.NaT valid_at or issued_at must be handled gracefully without crashing.""" + monkeypatch.setenv("MOSTLYRIGHT_CACHE_DIR", str(tmp_path)) + from mostlyright.research import _fetch_open_meteo_range + + info = STATIONS["NYC"] + model = "gfs_global" + + df_with_nat = _fake_om_payload_df("2024-06-01", "2024-06-02") + # Inject NaT values + df_with_nat.loc[0, "valid_at"] = pd.NaT + df_with_nat.loc[1, "issued_at"] = pd.NaT + + def fake_fetch(*args: Any, **kwargs: Any) -> pd.DataFrame: + return df_with_nat + + with patch( + "mostlyright.weather._fetchers._open_meteo.fetch_open_meteo", + side_effect=fake_fetch, + ): + out = _fetch_open_meteo_range(info, "2024-06-01", "2024-06-02", model=model) + + # Should run to completion and produce some non-empty results for the non-NaT valid_at rows + assert out + + +# --------------------------------------------------------------------------- +# Coverage metadata tests (fix/66) +# --------------------------------------------------------------------------- + + +def test_forecast_cache_partial_month_triggers_refetch( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + """Cache June 1-2, then request June 15-20 → cache miss, full month re-fetched.""" + monkeypatch.setenv("MOSTLYRIGHT_CACHE_DIR", str(tmp_path)) + from mostlyright.research import _fetch_open_meteo_range + from mostlyright.weather.cache import forecast_cache_path, write_forecast_cache + + info = STATIONS["NYC"] + model = "gfs_global" + source = "open_meteo.previous_runs" + + # Simulate broken PR #66 state: partition with metadata covering only June 1-2. 
+ partial_rows = [ + { + "station": "KNYC", + "issued_at": pd.Timestamp("2024-05-31T12:00:00Z"), + "valid_at": pd.Timestamp("2024-06-01T12:00:00Z"), + "model": model, + "source": source, + "temp_c": 20.0, + "precip_probability": 0.0, + "precipitation_mm": 0.0, + } + ] + write_forecast_cache( + "KNYC", source, model, 2024, 6, partial_rows, from_date="2024-06-01", to_date="2024-06-02" + ) + + # Full-month fetch returns data covering June 1-30. + full_june_df = _fake_om_payload_df("2024-06-01", "2024-06-30") + call_count = {"n": 0} + + def fake_fetch(*args: Any, **kwargs: Any) -> pd.DataFrame: + call_count["n"] += 1 + return full_june_df + + with patch( + "mostlyright.weather._fetchers._open_meteo.fetch_open_meteo", + side_effect=fake_fetch, + ): + out = _fetch_open_meteo_range(info, "2024-06-15", "2024-06-20", model=model) + + assert call_count["n"] == 1, "expected exactly one network fetch" + assert out, "expected non-empty result for June 15-20" + + # Partition must be overwritten with full-month metadata. + import pyarrow.parquet as pq + from mostlyright.weather.cache import _FORECAST_CACHE_FROM_KEY, _FORECAST_CACHE_TO_KEY + + table = pq.read_table(forecast_cache_path("KNYC", source, model, 2024, 6)) + md = table.schema.metadata or {} + assert md.get(_FORECAST_CACHE_FROM_KEY) == b"2024-06-01" + assert md.get(_FORECAST_CACHE_TO_KEY) == b"2024-06-30" + + +def test_forecast_cache_full_month_hit_no_refetch( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + """Full-month partition → any subrange request is served from cache, no network call.""" + monkeypatch.setenv("MOSTLYRIGHT_CACHE_DIR", str(tmp_path)) + from mostlyright.research import _fetch_open_meteo_range + from mostlyright.weather.cache import write_forecast_cache + + info = STATIONS["NYC"] + model = "gfs_global" + source = "open_meteo.previous_runs" + + # Pre-cache all of June with full-month metadata. 
+ june_rows = [ + { + "station": "KNYC", + "issued_at": pd.Timestamp(f"2024-06-{d:02d}T00:00:00Z") - pd.Timedelta(days=1), + "valid_at": pd.Timestamp(f"2024-06-{d:02d}T12:00:00Z"), + "model": model, + "source": source, + "temp_c": float(d), + "precip_probability": 0.0, + "precipitation_mm": 0.0, + } + for d in range(1, 31) + ] + write_forecast_cache( + "KNYC", source, model, 2024, 6, june_rows, from_date="2024-06-01", to_date="2024-06-30" + ) + + call_count = {"n": 0} + + def fake_fetch(*args: Any, **kwargs: Any) -> pd.DataFrame: + call_count["n"] += 1 + return _fake_om_payload_df("2024-06-01", "2024-06-30") + + with patch( + "mostlyright.weather._fetchers._open_meteo.fetch_open_meteo", + side_effect=fake_fetch, + ): + out = _fetch_open_meteo_range(info, "2024-06-01", "2024-06-20", model=model) + + assert call_count["n"] == 0, "full-month cache hit should not trigger any network fetch" + assert out, "expected non-empty result" + + +def test_forecast_cache_backwards_compat_no_metadata( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + """Partition written without range metadata (old code) → treated as miss, re-fetched, repaired.""" + monkeypatch.setenv("MOSTLYRIGHT_CACHE_DIR", str(tmp_path)) + from mostlyright.research import _fetch_open_meteo_range + from mostlyright.weather.cache import ( + _FORECAST_CACHE_FROM_KEY, + _FORECAST_CACHE_TO_KEY, + forecast_cache_path, + write_forecast_cache, + ) + + info = STATIONS["NYC"] + model = "gfs_global" + source = "open_meteo.previous_runs" + + # Write partition the old way — no from_date/to_date kwargs → no range metadata. + old_rows = [ + { + "station": "KNYC", + "issued_at": pd.Timestamp("2024-06-01T00:00:00Z"), + "valid_at": pd.Timestamp("2024-06-01T12:00:00Z"), + "model": model, + "source": source, + "temp_c": 20.0, + "precip_probability": 0.0, + "precipitation_mm": 0.0, + } + ] + write_forecast_cache("KNYC", source, model, 2024, 6, old_rows) + + # Verify: no metadata in the file we just wrote. 
+ import pyarrow.parquet as pq + + old_table = pq.read_table(forecast_cache_path("KNYC", source, model, 2024, 6)) + assert _FORECAST_CACHE_FROM_KEY not in (old_table.schema.metadata or {}) + + full_june_df = _fake_om_payload_df("2024-06-01", "2024-06-30") + call_count = {"n": 0} + + def fake_fetch(*args: Any, **kwargs: Any) -> pd.DataFrame: + call_count["n"] += 1 + return full_june_df + + with patch( + "mostlyright.weather._fetchers._open_meteo.fetch_open_meteo", + side_effect=fake_fetch, + ): + _fetch_open_meteo_range(info, "2024-06-01", "2024-06-15", model=model) + + assert call_count["n"] == 1, "old partition without metadata must trigger re-fetch" + + # After re-fetch, partition must have range metadata. + new_table = pq.read_table(forecast_cache_path("KNYC", source, model, 2024, 6)) + md = new_table.schema.metadata or {} + assert md.get(_FORECAST_CACHE_FROM_KEY) == b"2024-06-01" + assert md.get(_FORECAST_CACHE_TO_KEY) == b"2024-06-30" + + +def test_forecast_cache_current_month_never_cached( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + """Current UTC month is never written to or served from cache.""" + monkeypatch.setenv("MOSTLYRIGHT_CACHE_DIR", str(tmp_path)) + from datetime import UTC, datetime + + from mostlyright.research import _fetch_open_meteo_range + from mostlyright.weather.cache import forecast_cache_path + + info = STATIONS["NYC"] + model = "gfs_global" + source = "open_meteo.previous_runs" + + now = datetime.now(UTC) + cur_year, cur_month = now.year, now.month + from_iso = f"{cur_year}-{cur_month:02d}-01" + to_iso = f"{cur_year}-{cur_month:02d}-05" + + df = _fake_om_payload_df(from_iso, to_iso) + + def fake_fetch(*args: Any, **kwargs: Any) -> pd.DataFrame: + return df + + with patch( + "mostlyright.weather._fetchers._open_meteo.fetch_open_meteo", + side_effect=fake_fetch, + ): + _fetch_open_meteo_range(info, from_iso, to_iso, model=model) + + # No cache file should be written for the current UTC month. 
+ cache_file = forecast_cache_path("KNYC", source, model, cur_year, cur_month) + assert not cache_file.exists(), "current UTC month must never be cached" diff --git a/packages/weather/src/mostlyright/weather/_fetchers/_open_meteo.py b/packages/weather/src/mostlyright/weather/_fetchers/_open_meteo.py index 214b97b..99972db 100644 --- a/packages/weather/src/mostlyright/weather/_fetchers/_open_meteo.py +++ b/packages/weather/src/mostlyright/weather/_fetchers/_open_meteo.py @@ -36,7 +36,8 @@ import logging import time -from datetime import UTC, datetime +from datetime import UTC, date, datetime, timedelta +from math import ceil from typing import Any, Literal import httpx @@ -62,6 +63,10 @@ #: Polite floor — 5 req/s single-worker (tighter than the documented 600/min). _OM_POLITE_DELAY_S: float = 0.2 +#: Open-Meteo per-call weight thresholds (free tier billing model). +_OM_MAX_DAYS_PER_CALL: int = 14 +_OM_VAR_FREE_BUDGET: int = 10 + #: Retry-After cap (mirrors ``_kalshi_client._RETRY_AFTER_CAP_SECONDS``). _RETRY_AFTER_CAP_SECONDS: float = 60.0 _MAX_RETRIES: int = 3 @@ -150,6 +155,40 @@ ) +def _chunk_date_range( + from_date: str, + to_date: str, + max_days: int = _OM_MAX_DAYS_PER_CALL, +) -> list[tuple[str, str]]: + """Split [from_date, to_date] into ≤max_days-day chunks.""" + start = date.fromisoformat(from_date) + end = date.fromisoformat(to_date) + chunks: list[tuple[str, str]] = [] + cur = start + while cur <= end: + chunk_end = min(cur + timedelta(days=max_days - 1), end) + chunks.append((cur.isoformat(), chunk_end.isoformat())) + cur = chunk_end + timedelta(days=1) + return chunks + + +def _weighted_call_cost(num_vars: int, num_days: int) -> float: + """Integer upper bound on Open-Meteo's weighted call cost: ceil(vars/10) * ceil(days/14).""" + return float(ceil(num_vars / _OM_VAR_FREE_BUDGET) * ceil(num_days / _OM_MAX_DAYS_PER_CALL)) + + +def _validate_variables(variables: tuple[str, ...] 
| None) -> tuple[str, ...]: + """Validate caller-supplied variables; return full default set when None.""" + if variables is None: + return _OM_VARIABLES_TO_FETCH + unknown = [v for v in variables if v not in _OM_VAR_TO_COLUMN] + if unknown: + raise ValueError( + f"unknown OM variable(s) — {unknown!r}; allowed: {sorted(_OM_VAR_TO_COLUMN)}" + ) + return tuple(variables) + + def _station_to_lat_lon(station: str) -> tuple[float, float]: """Resolve an ICAO (or 3-letter US) station to ``(latitude, longitude)``. @@ -221,7 +260,10 @@ def _dispatch_endpoint( raise ValueError(f"mode must be one of {sorted(_VALID_MODES)}; got {mode!r}") -def _build_hourly_param(endpoint: str) -> str: +def _build_hourly_param( + endpoint: str, + variables: tuple[str, ...] = _OM_VARIABLES_TO_FETCH, +) -> str: """Build the comma-separated ``hourly=...`` URL param. Previous Runs API: suffix every variable with ``_previous_day1`` @@ -229,9 +271,9 @@ def _build_hourly_param(endpoint: str) -> str: Single Runs / Seamless: no suffix (exact-cycle / seamless stream). """ if endpoint == OPEN_METEO_PREVIOUS_RUNS_URL: - return ",".join(f"{v}_previous_day1" for v in _OM_VARIABLES_TO_FETCH) + return ",".join(f"{v}_previous_day1" for v in variables) # Single Runs / Seamless / Live: bare variable names (no suffix). - return ",".join(_OM_VARIABLES_TO_FETCH) + return ",".join(variables) def _parse_value(value: Any) -> float | None: @@ -471,6 +513,7 @@ def fetch_open_meteo( mode: Mode = "training", issued_at: str | None = None, allow_leakage: bool = False, + variables: tuple[str, ...] | None = None, client: httpx.Client | None = None, timeout: float = HTTP_TIMEOUT, ) -> pd.DataFrame: @@ -488,6 +531,9 @@ def fetch_open_meteo( cycle provenance. allow_leakage: Required ``True`` when ``mode='seamless'``; raises :class:`OpenMeteoSeamlessLeakageError` otherwise. + variables: Subset of :data:`_OM_VARIABLES_TO_FETCH` to request. + ``None`` (default) requests all 18. 
Unknown names raise + :class:`ValueError` before any HTTP request. client: Optional :class:`httpx.Client` (test-injection seam). timeout: Per-request timeout in seconds. @@ -496,11 +542,14 @@ def fetch_open_meteo( (with canonical columns + dtypes) on 404 or empty response. Raises: - ValueError: unknown model, unknown mode, or unknown station. + ValueError: unknown model, unknown mode, unknown station, or unknown + variable name (checked before any HTTP request). OpenMeteoSeamlessLeakageError: ``mode='seamless'`` without ``allow_leakage=True``. Raised BEFORE any HTTP request. NotImplementedError: ``mode='live'`` (deferred to PLAN-05). """ + vars_to_fetch = _validate_variables(variables) + if model not in OPEN_METEO_MODELS: raise ValueError( f"model must be one of {sorted(OPEN_METEO_MODELS)[:5]}... " @@ -514,71 +563,90 @@ def fetch_open_meteo( ) lat, lon = _station_to_lat_lon(station) - params: dict[str, Any] = { - "latitude": lat, - "longitude": lon, - "models": model, - "hourly": _build_hourly_param(endpoint), - "timezone": "UTC", - "timeformat": "iso8601", - } - if endpoint == OPEN_METEO_SINGLE_RUNS_URL: - # Single-Runs API rejects start_date/end_date; use run= only. - # The response contains the full horizon (up to 168 h); we clip - # to [from_date, to_date] after parsing (see below). - params["run"] = issued_at + + # Chunk date ranges >14 days for Previous Runs API (no issued_at). + # Single Runs uses run= and returns a full 168h horizon — no chunking. 
+ if issued_at is None and endpoint == OPEN_METEO_PREVIOUS_RUNS_URL: + chunks = _chunk_date_range(from_date, to_date) else: - params["start_date"] = from_date - params["end_date"] = to_date + chunks = [(from_date, to_date)] - close_client = False + close_client = client is None if client is None: client = httpx.Client(timeout=timeout) - close_client = True - retrieved_at = datetime.now(UTC) - payload: dict[str, Any] = {} + frames: list[pd.DataFrame] = [] try: - for attempt in range(_MAX_RETRIES + 1): - try: - resp = client.get(endpoint, params=params) - resp.raise_for_status() - payload = resp.json() - break - except httpx.HTTPStatusError as exc: - status = getattr(exc.response, "status_code", None) - if status == 404: - log.debug("open_meteo 404 on %s; skipping", endpoint) - return _empty_df() - if status == 429 and attempt < _MAX_RETRIES: - retry_after = _parse_retry_after_seconds( - exc.response.headers.get("Retry-After") - ) - sleep_for = max(retry_after, _OM_POLITE_DELAY_S * (attempt + 1)) - log.warning( - "open_meteo 429 — sleeping %.1fs (attempt %d)", - sleep_for, - attempt + 1, + for chunk_from, chunk_to in chunks: + params: dict[str, Any] = { + "latitude": lat, + "longitude": lon, + "models": model, + "hourly": _build_hourly_param(endpoint, vars_to_fetch), + "timezone": "UTC", + "timeformat": "iso8601", + } + if endpoint == OPEN_METEO_SINGLE_RUNS_URL: + # Single-Runs API rejects start_date/end_date; use run= only. + # The response contains the full horizon (up to 168 h); we clip + # to [from_date, to_date] after parsing (see below). 
+ params["run"] = issued_at + else: + params["start_date"] = chunk_from + params["end_date"] = chunk_to + + retrieved_at = datetime.now(UTC) + payload: dict[str, Any] = {} + for attempt in range(_MAX_RETRIES + 1): + try: + resp = client.get(endpoint, params=params) + resp.raise_for_status() + payload = resp.json() + break + except httpx.HTTPStatusError as exc: + status = getattr(exc.response, "status_code", None) + if status == 404: + log.debug("open_meteo 404 on %s; skipping", endpoint) + payload = {} + break + if status == 429 and attempt < _MAX_RETRIES: + retry_after = _parse_retry_after_seconds( + exc.response.headers.get("Retry-After") + ) + sleep_for = max(retry_after, _OM_POLITE_DELAY_S * (attempt + 1)) + log.warning( + "open_meteo 429 — sleeping %.1fs (attempt %d)", + sleep_for, + attempt + 1, + ) + time.sleep(sleep_for) + continue + raise + + # Weight-aware polite delay scales with per-call cost. + num_days = (date.fromisoformat(chunk_to) - date.fromisoformat(chunk_from)).days + 1 + cost = _weighted_call_cost(len(vars_to_fetch), num_days) + time.sleep(_OM_POLITE_DELAY_S * ceil(cost)) + + if payload: + frames.append( + _project_payload_to_dataframe( + payload, + station=station, + model=model, + endpoint=endpoint, + issued_at_str=issued_at, + retrieved_at=retrieved_at, ) - time.sleep(sleep_for) - continue - raise - time.sleep(_OM_POLITE_DELAY_S) + ) finally: if close_client: client.close() - if not payload: + if not frames: return _empty_df() - df = _project_payload_to_dataframe( - payload, - station=station, - model=model, - endpoint=endpoint, - issued_at_str=issued_at, - retrieved_at=retrieved_at, - ) + df = pd.concat(frames, ignore_index=True) if len(frames) > 1 else frames[0] # Single-Runs returns the full horizon from run=; clip to requested window. 
if endpoint == OPEN_METEO_SINGLE_RUNS_URL and not df.empty: diff --git a/packages/weather/src/mostlyright/weather/cache.py b/packages/weather/src/mostlyright/weather/cache.py index ce25ba9..d2c3ded 100644 --- a/packages/weather/src/mostlyright/weather/cache.py +++ b/packages/weather/src/mostlyright/weather/cache.py @@ -534,6 +534,10 @@ def forecast_cache_path( return raw +_FORECAST_CACHE_FROM_KEY = b"_forecast_cache_from" +_FORECAST_CACHE_TO_KEY = b"_forecast_cache_to" + + def _is_seamless_source(source: str | None) -> bool: """True if ``source`` is the banned Open-Meteo seamless endpoint.""" return source == "open_meteo.seamless" @@ -545,6 +549,9 @@ def read_forecast_cache( model: str, year: int, month: int, + *, + need_from: str | None = None, + need_to: str | None = None, ) -> list[dict] | None: """Return cached forecast rows for the partition key or ``None`` on miss. @@ -552,6 +559,8 @@ def read_forecast_cache( - the partition file does not exist - ``source`` is live or seamless (never cached) - (year, month) is the current UTC month (cycles may still publish) + - the partition has no coverage metadata (old format; re-fetch to repair) + - the cached range does not cover [need_from, need_to] when supplied """ if _is_live_source(source) or _is_seamless_source(source): return None @@ -565,6 +574,36 @@ def read_forecast_cache( table = pq.read_table(path) except (FileNotFoundError, OSError): return None + metadata = table.schema.metadata or {} + cached_from = metadata.get(_FORECAST_CACHE_FROM_KEY, b"").decode("utf-8") + cached_to = metadata.get(_FORECAST_CACHE_TO_KEY, b"").decode("utf-8") + if not cached_from or not cached_to: + logger.debug( + "forecast cache: no range metadata for %s/%s %04d-%02d; treating as miss", + station, + model, + year, + month, + ) + return None + if ( + need_from is not None + and need_to is not None + and (cached_from > need_from or cached_to < need_to) + ): + logger.debug( + "forecast cache: partial coverage for %s/%s %04d-%02d " + 
"(cached=%s..%s need=%s..%s); treating as miss", + station, + model, + year, + month, + cached_from, + cached_to, + need_from, + need_to, + ) + return None return table.to_pylist() @@ -575,6 +614,9 @@ def write_forecast_cache( year: int, month: int, rows: list[dict], + *, + from_date: str | None = None, + to_date: str | None = None, ) -> None: """Atomically write ``rows`` to the forecast cache partition. @@ -582,6 +624,10 @@ def write_forecast_cache( - ``source`` is live or seamless (never cached) - (year, month) is the current UTC month (cycles may still publish) - ``rows`` is empty + + Pass ``from_date`` and ``to_date`` (ISO-8601, inclusive) to embed coverage + metadata in the parquet. ``read_forecast_cache`` treats partitions without + this metadata as cache misses so callers should always supply them. """ if _is_live_source(source) or _is_seamless_source(source): logger.debug( @@ -606,6 +652,11 @@ def write_forecast_cache( if not rows: return table = pa.Table.from_pylist(rows) + if from_date is not None and to_date is not None: + md = dict(table.schema.metadata or {}) + md[_FORECAST_CACHE_FROM_KEY] = from_date.encode("utf-8") + md[_FORECAST_CACHE_TO_KEY] = to_date.encode("utf-8") + table = table.replace_schema_metadata(md) _atomic_write(forecast_cache_path(station, source, model, year, month), table) @@ -633,6 +684,8 @@ def invalidate_forecast( "DEFAULT_ROOT", "_CACHE_SCHEMA_VERSION", # Phase 18 PREC-04 "_CACHE_SCHEMA_VERSION_KEY", # Phase 18 PREC-04 + "_FORECAST_CACHE_FROM_KEY", + "_FORECAST_CACHE_TO_KEY", "_has_cached_year", "cache_path", "climate_cache_path", diff --git a/packages/weather/tests/test_cache_forecasts.py b/packages/weather/tests/test_cache_forecasts.py index b038480..1ee3455 100644 --- a/packages/weather/tests/test_cache_forecasts.py +++ b/packages/weather/tests/test_cache_forecasts.py @@ -38,7 +38,16 @@ def test_write_then_read_forecast_cache(tmp_path: Path, monkeypatch: pytest.Monk "temp_c": 22.5, } ] - write_forecast_cache("KNYC", 
"open_meteo.previous_runs", "gfs_global", 2024, 6, rows) + write_forecast_cache( + "KNYC", + "open_meteo.previous_runs", + "gfs_global", + 2024, + 6, + rows, + from_date="2024-06-01", + to_date="2024-06-30", + ) got = read_forecast_cache("KNYC", "open_meteo.previous_runs", "gfs_global", 2024, 6) assert got is not None assert len(got) == 1 diff --git a/packages/weather/tests/test_open_meteo_variables_param.py b/packages/weather/tests/test_open_meteo_variables_param.py new file mode 100644 index 0000000..034d1ef --- /dev/null +++ b/packages/weather/tests/test_open_meteo_variables_param.py @@ -0,0 +1,152 @@ +"""Issue #64 Fix 3: variables= param trims the OM hourly variable list. + +The pairs join in ``research()`` only consumes temperature, precipitation, +and precipitation_probability; over-fetching 18 variables nearly doubles the +weighted Open-Meteo call cost. ``fetch_open_meteo(variables=...)`` lets the +caller (``_fetch_open_meteo_range``) request only the columns it actually +needs while the standalone DataFrame API keeps the full 18-variable default. 
+""" + +from __future__ import annotations + +from unittest.mock import MagicMock + +import httpx +from mostlyright.weather._fetchers._open_meteo import ( + _OM_VARIABLES_TO_FETCH, + fetch_open_meteo, +) + + +def _hourly_param(url: str) -> list[str]: + """Extract the hourly= query value, URL-decoded into a list of names.""" + qp = httpx.QueryParams(httpx.URL(url).query) + raw = qp.get("hourly", "") + assert raw, f"no hourly= in {url!r}" + return raw.split(",") + + +def test_default_variables_unchanged_full_18() -> None: + """Standalone API: bare call still requests the full 18-variable set.""" + calls: list[str] = [] + + def handler(request: httpx.Request) -> httpx.Response: + calls.append(str(request.url)) + return httpx.Response( + 200, + json={ + "latitude": 40.78, + "longitude": -73.97, + "elevation": 51.0, + "hourly_units": {"time": "iso8601"}, + "hourly": {"time": []}, + }, + ) + + transport = httpx.MockTransport(handler) + client = httpx.Client(transport=transport) + fetch_open_meteo( + "NYC", + "2024-06-01", + "2024-06-01", + model="gfs_global", + mode="training", + client=client, + ) + requested = _hourly_param(calls[0]) + # 18 variables with _previous_day1 suffix + assert len(requested) == len(_OM_VARIABLES_TO_FETCH) + assert all(v.endswith("_previous_day1") for v in requested) + + +def test_variables_param_trims_request_previous_runs() -> None: + """variables=('temperature_2m', 'precipitation', 'precipitation_probability') + must produce exactly 3 hourly params with the previous_day1 suffix.""" + calls: list[str] = [] + + def handler(request: httpx.Request) -> httpx.Response: + calls.append(str(request.url)) + return httpx.Response( + 200, + json={ + "latitude": 40.78, + "longitude": -73.97, + "elevation": 51.0, + "hourly_units": {"time": "iso8601"}, + "hourly": {"time": []}, + }, + ) + + transport = httpx.MockTransport(handler) + client = httpx.Client(transport=transport) + fetch_open_meteo( + "NYC", + "2024-06-01", + "2024-06-01", + model="gfs_global", 
+ mode="training", + variables=("temperature_2m", "precipitation", "precipitation_probability"), + client=client, + ) + requested = _hourly_param(calls[0]) + assert sorted(requested) == sorted( + [ + "temperature_2m_previous_day1", + "precipitation_previous_day1", + "precipitation_probability_previous_day1", + ] + ) + + +def test_variables_param_rejects_unknown_variable() -> None: + """Unknown variable name must raise ValueError before any HTTP request.""" + import pytest + + client = MagicMock(spec=httpx.Client) + with pytest.raises(ValueError, match=r"unknown.*variable"): + fetch_open_meteo( + "NYC", + "2024-06-01", + "2024-06-01", + model="gfs_global", + mode="training", + variables=("temperature_2m", "bogus_variable_42"), + client=client, + ) + assert not client.get.called + + +def test_variables_param_single_runs_no_suffix() -> None: + """Single-Runs API uses bare variable names (no _previous_day1 suffix).""" + calls: list[str] = [] + + def handler(request: httpx.Request) -> httpx.Response: + calls.append(str(request.url)) + return httpx.Response( + 200, + json={ + "latitude": 40.78, + "longitude": -73.97, + "elevation": 51.0, + "hourly_units": {"time": "iso8601", "temperature_2m": "°C"}, + "hourly": { + "time": ["2024-06-01T06:00"], + "temperature_2m": [22.0], + }, + }, + ) + + transport = httpx.MockTransport(handler) + client = httpx.Client(transport=transport) + fetch_open_meteo( + "NYC", + "2024-06-01", + "2024-06-01", + model="gfs_global", + mode="training", + issued_at="2024-06-01T06:00", + variables=("temperature_2m",), + client=client, + ) + requested = _hourly_param(calls[0]) + assert requested == ["temperature_2m"] diff --git a/packages/weather/tests/test_open_meteo_window_chunking.py b/packages/weather/tests/test_open_meteo_window_chunking.py new file mode 100644 index 0000000..ea891e4 --- /dev/null +++ b/packages/weather/tests/test_open_meteo_window_chunking.py @@ -0,0 +1,143 @@ +"""Issue #64 Fix 2: chunk windows >14 days so per-call weighted cost 
stays ≤1.x. + +Open-Meteo's free tier bills by weighted call cost, +max(vars/10, 1) × max(days/14, 1) × locations, so the weight scales linearly +with window length past 14 days and with variable count past 10. A 1-year +window with the default 18 variables is a ~47-weighted single call, exhausting +the 600/min budget after ~13 stations. The fetcher's own docstring warns +"longer windows must chunk client-side"; these tests pin that behaviour. +""" + +from __future__ import annotations + +import httpx +import pandas as pd +from mostlyright.weather._fetchers._open_meteo import fetch_open_meteo + + +def _payload_for_window(from_date: str, to_date: str) -> dict: + """Build a minimal Open-Meteo payload covering [from_date, to_date].""" + start = pd.Timestamp(from_date) + end = pd.Timestamp(to_date) + pd.Timedelta(days=1) + hours = [] + cur = start + while cur < end: + hours.append(cur.strftime("%Y-%m-%dT%H:%M")) + cur = cur + pd.Timedelta(hours=1) + n = len(hours) + return { + "latitude": 40.78, + "longitude": -73.97, + "elevation": 51.0, + "hourly_units": { + "time": "iso8601", + "temperature_2m_previous_day1": "°C", + }, + "hourly": { + "time": hours, + "temperature_2m_previous_day1": [20.0] * n, + }, + } + + +def test_window_under_14_days_single_call() -> None: + """A 7-day window should produce exactly one HTTP call.""" + calls: list[dict[str, str]] = [] + + def handler(request: httpx.Request) -> httpx.Response: + qp = dict(httpx.QueryParams(request.url.query)) + calls.append(qp) + return httpx.Response(200, json=_payload_for_window(qp["start_date"], qp["end_date"])) + + transport = httpx.MockTransport(handler) + client = httpx.Client(transport=transport) + fetch_open_meteo( + "NYC", + "2024-06-01", + "2024-06-07", + model="gfs_global", + mode="training", + variables=("temperature_2m",), + client=client, + ) + assert len(calls) == 1 + assert calls[0]["start_date"] == "2024-06-01" + assert calls[0]["end_date"] == "2024-06-07" + + +def test_window_over_14_days_is_chunked() -> None: + """A 30-day window must be split into ≤14-day chunks (3 
calls).""" + calls: list[dict[str, str]] = [] + + def handler(request: httpx.Request) -> httpx.Response: + qp = dict(httpx.QueryParams(request.url.query)) + calls.append(qp) + return httpx.Response(200, json=_payload_for_window(qp["start_date"], qp["end_date"])) + + transport = httpx.MockTransport(handler) + client = httpx.Client(transport=transport) + df = fetch_open_meteo( + "NYC", + "2024-06-01", + "2024-06-30", + model="gfs_global", + mode="training", + variables=("temperature_2m",), + client=client, + ) + # ceil(30 / 14) == 3 chunks + assert len(calls) == 3 + # Every chunk must span at most 14 days. + for qp in calls: + start = pd.Timestamp(qp["start_date"]) + end = pd.Timestamp(qp["end_date"]) + assert (end - start).days <= 13, ( + f"chunk {qp['start_date']}..{qp['end_date']} exceeds 14 days" + ) + # Concatenation must still cover the whole 30 days. + chunk_starts = sorted(qp["start_date"] for qp in calls) + chunk_ends = sorted(qp["end_date"] for qp in calls) + assert chunk_starts[0] == "2024-06-01" + assert chunk_ends[-1] == "2024-06-30" + # The merged DataFrame must contain rows across the full range. 
+ assert not df.empty + assert df["valid_at"].min() <= pd.Timestamp("2024-06-02", tz="UTC") + assert df["valid_at"].max() >= pd.Timestamp("2024-06-29", tz="UTC") + + +def test_single_runs_mode_not_chunked() -> None: + """Single-Runs uses run=, returns full 168h horizon; chunking does not apply.""" + calls: list[dict[str, str]] = [] + + def handler(request: httpx.Request) -> httpx.Response: + qp = dict(httpx.QueryParams(request.url.query)) + calls.append(qp) + hours = [ + (pd.Timestamp("2024-06-01T06:00") + pd.Timedelta(hours=i)).isoformat() + for i in range(168) + ] + return httpx.Response( + 200, + json={ + "latitude": 40.78, + "longitude": -73.97, + "elevation": 51.0, + "hourly_units": {"time": "iso8601", "temperature_2m": "°C"}, + "hourly": {"time": hours, "temperature_2m": list(range(168))}, + }, + ) + + transport = httpx.MockTransport(handler) + client = httpx.Client(transport=transport) + fetch_open_meteo( + "NYC", + "2024-06-01", + "2024-06-30", # asking for 30 days; Single-Runs returns 7 days from run= + model="gfs_global", + mode="training", + issued_at="2024-06-01T06:00", + client=client, + ) + # Single-Runs uses run= once; no client-side chunking. + assert len(calls) == 1 + assert calls[0].get("run") == "2024-06-01T06:00"
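For reviewers, the arithmetic these chunking tests rely on can be sketched in a few lines. This is a minimal illustration, not the fetcher's actual implementation: `weighted_cost` and `chunk_window` are hypothetical helper names, and the cost formula is the one quoted in the brief (`max(vars/10, 1) × max(days/14, 1) × locations`), assumed here to be accurate.

```python
from __future__ import annotations

from datetime import date, timedelta


def weighted_cost(n_vars: int, n_days: int, n_locations: int = 1) -> float:
    """Weighted call cost per the formula quoted in the brief (assumed accurate)."""
    return max(n_vars / 10, 1) * max(n_days / 14, 1) * n_locations


def chunk_window(from_date: str, to_date: str, max_days: int = 14) -> list[tuple[str, str]]:
    """Split an inclusive [from_date, to_date] range into chunks of at most max_days days.

    Hypothetical helper: the real fetcher may chunk differently.
    """
    start = date.fromisoformat(from_date)
    end = date.fromisoformat(to_date)
    if end < start:
        raise ValueError("to_date must not precede from_date")
    chunks: list[tuple[str, str]] = []
    cur = start
    while cur <= end:
        # Both endpoints are inclusive, so a 14-day chunk ends 13 days after its start.
        chunk_end = min(cur + timedelta(days=max_days - 1), end)
        chunks.append((cur.isoformat(), chunk_end.isoformat()))
        cur = chunk_end + timedelta(days=1)
    return chunks
```

Under these assumptions, a 30-day window yields three chunks (14 + 14 + 2 days), and each chunk's day-count factor is 1, so the per-call weight is driven only by the variable count.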