def _reusable_columns(columns: Iterable[str] | None) -> Iterable[str] | None:
    """Return *columns* in a form that can safely be iterated more than once.

    One-shot iterators (generators, ``map`` objects, ...) return themselves
    from ``iter()``; only those are copied into a tuple. ``None``, strings and
    ordinary collections are handed back untouched so the guarded loader sees
    exactly the object the caller supplied.
    """
    if columns is None:
        return None
    return tuple(columns) if iter(columns) is columns else columns


def load_feature_snapshot_guarded(
    path: str,
    *,
    run_as_of,
    required_columns: Iterable[str] | None = None,
    snapshot_date_columns: Iterable[str] = DEFAULT_SNAPSHOT_DATE_COLUMNS,
    max_snapshot_month_lag: int = DEFAULT_MAX_SNAPSHOT_MONTH_LAG,
    manifest_path: str | None = None,
    require_manifest: bool = False,
    expected_strategy_profile: str | None = None,
    expected_config_name: str | None = None,
    expected_config_path: str | None = None,
    expected_contract_version: str | None = None,
    fallback_mode: str | None = DEFAULT_FEATURE_SNAPSHOT_FALLBACK_MODE,
    fallback_cache_dir: str | Path | None = None,
    fallback_max_stale_days: int | None = DEFAULT_FEATURE_SNAPSHOT_FALLBACK_MAX_STALE_DAYS,
) -> FeatureSnapshotGuardResult:
    """Load a guarded snapshot, optionally falling back to the last valid artifact.

    The snapshot at ``path`` is always loaded through
    ``_load_feature_snapshot_guarded_without_fallback`` first. What happens
    next depends on ``fallback_mode``:

    * ``none`` (the default): the primary result is returned as-is.
    * ``last_valid``: a primary result whose ``snapshot_guard_decision`` is
      ``"proceed"`` is copied into the fallback cache and returned; any other
      primary result triggers a guarded load of the cached copy instead.

    Args:
        path: Snapshot location handed to the guarded loader.
        run_as_of: Run date forwarded to the guarded loader.
        required_columns: Columns the guarded loader must find.
        snapshot_date_columns: Candidate as-of columns for the guarded loader.
        max_snapshot_month_lag: Staleness bound forwarded to the guarded loader.
        manifest_path: Optional manifest location.
        require_manifest: Whether a manifest is mandatory.
        expected_strategy_profile: Expected manifest strategy profile.
        expected_config_name: Expected manifest config name.
        expected_config_path: Config file forwarded to the guarded loader.
        expected_contract_version: Expected manifest contract version.
        fallback_mode: Fallback mode or alias; see
            ``normalize_feature_snapshot_fallback_mode``.
        fallback_cache_dir: Root of the last-valid cache; the module default
            is used when empty.
        fallback_max_stale_days: Maximum age of the cached copy in days;
            ``None`` disables the age check.

    Returns:
        The primary guard result, or the fallback result when the primary load
        did not proceed and ``last_valid`` mode is active.

    Raises:
        ValueError: If ``fallback_mode`` is not a supported mode or alias.
    """
    # Validate the mode before any I/O so a misconfiguration fails fast.
    normalized_fallback_mode = normalize_feature_snapshot_fallback_mode(fallback_mode)
    fallback_enabled = (
        normalized_fallback_mode == FEATURE_SNAPSHOT_FALLBACK_MODE_LAST_VALID
    )

    # The column iterables are consumed by the cache-key computation, the
    # primary load and possibly the fallback load; a generator would be empty
    # after the first of those.
    required = _reusable_columns(required_columns)
    date_columns = _reusable_columns(snapshot_date_columns)

    result = _load_feature_snapshot_guarded_without_fallback(
        path,
        run_as_of=run_as_of,
        required_columns=required,
        snapshot_date_columns=date_columns,
        max_snapshot_month_lag=max_snapshot_month_lag,
        manifest_path=manifest_path,
        require_manifest=require_manifest,
        expected_strategy_profile=expected_strategy_profile,
        expected_config_name=expected_config_name,
        expected_config_path=expected_config_path,
        expected_contract_version=expected_contract_version,
    )
    if not fallback_enabled:
        return result

    fallback_context = _feature_snapshot_fallback_context(
        path=path,
        manifest_path=manifest_path,
        expected_strategy_profile=expected_strategy_profile,
        expected_config_name=expected_config_name,
        expected_contract_version=expected_contract_version,
        required_columns=required,
        snapshot_date_columns=date_columns,
        cache_dir=fallback_cache_dir,
    )

    if result.metadata.get("snapshot_guard_decision") == "proceed":
        try:
            _write_feature_snapshot_last_valid(fallback_context, result.metadata)
        except OSError as exc:
            # Priming the cache is best-effort: a read-only or full cache
            # directory must not discard a snapshot that loaded successfully.
            # The error is surfaced in the metadata instead of being dropped.
            metadata = dict(result.metadata)
            metadata["artifact_fallback_cache_write_error"] = (
                f"{type(exc).__name__}:{exc}"
            )
            return FeatureSnapshotGuardResult(frame=result.frame, metadata=metadata)
        return result

    fallback_result = _load_feature_snapshot_last_valid(
        fallback_context,
        run_as_of=run_as_of,
        required_columns=required,
        snapshot_date_columns=date_columns,
        max_snapshot_month_lag=max_snapshot_month_lag,
        require_manifest=require_manifest,
        expected_strategy_profile=expected_strategy_profile,
        expected_config_name=expected_config_name,
        expected_config_path=expected_config_path,
        expected_contract_version=expected_contract_version,
        failed_metadata=result.metadata,
        max_stale_days=fallback_max_stale_days,
    )
    return fallback_result if fallback_result is not None else result
def normalize_feature_snapshot_fallback_mode(value: object) -> str:
    """Map a user-supplied fallback mode (or alias) to its canonical name.

    Falsy values select the default mode. Matching ignores case and
    surrounding whitespace and treats ``-`` like ``_``.

    Raises:
        ValueError: If the value is neither a canonical mode nor a known alias.
    """
    mode = (
        str(value or DEFAULT_FEATURE_SNAPSHOT_FALLBACK_MODE)
        .strip()
        .lower()
        .replace("-", "_")
    )
    aliases = {
        "": FEATURE_SNAPSHOT_FALLBACK_MODE_NONE,
        "off": FEATURE_SNAPSHOT_FALLBACK_MODE_NONE,
        "disabled": FEATURE_SNAPSHOT_FALLBACK_MODE_NONE,
        "false": FEATURE_SNAPSHOT_FALLBACK_MODE_NONE,
        "none": FEATURE_SNAPSHOT_FALLBACK_MODE_NONE,
        "last": FEATURE_SNAPSHOT_FALLBACK_MODE_LAST_VALID,
        "last_valid": FEATURE_SNAPSHOT_FALLBACK_MODE_LAST_VALID,
        "last_valid_snapshot": FEATURE_SNAPSHOT_FALLBACK_MODE_LAST_VALID,
    }
    normalized = aliases.get(mode, mode)
    if normalized not in {
        FEATURE_SNAPSHOT_FALLBACK_MODE_NONE,
        FEATURE_SNAPSHOT_FALLBACK_MODE_LAST_VALID,
    }:
        raise ValueError(
            "unsupported feature snapshot fallback mode "
            f"{value!r}; supported: none, last_valid"
        )
    return normalized


def _feature_snapshot_fallback_context(
    *,
    path: str,
    manifest_path: str | None,
    expected_strategy_profile: str | None,
    expected_config_name: str | None,
    expected_contract_version: str | None,
    required_columns: Iterable[str] | None,
    snapshot_date_columns: Iterable[str],
    cache_dir: str | Path | None,
) -> dict[str, Any]:
    """Describe where the last-valid copy for this load request is cached.

    The cache sub-directory is named after a 24-character SHA-256 prefix of
    the request (source paths, expected manifest identity and column sets), so
    requests that differ in any of those never share a cached artifact.

    Note: both column arguments are iterated here; callers that also need them
    afterwards must pass re-iterable collections.
    """
    raw_path = str(path or "").strip()
    raw_manifest = str(manifest_path or "").strip()
    payload = {
        "path": raw_path,
        "manifest_path": raw_manifest,
        "expected_strategy_profile": str(expected_strategy_profile or "").strip(),
        "expected_config_name": str(expected_config_name or "").strip(),
        "expected_contract_version": str(expected_contract_version or "").strip(),
        "required_columns": tuple(str(column) for column in (required_columns or ())),
        "snapshot_date_columns": tuple(str(column) for column in snapshot_date_columns),
    }
    digest = hashlib.sha256(
        json.dumps(payload, sort_keys=True, separators=(",", ":"), default=str).encode("utf-8")
    ).hexdigest()[:24]
    cache_root = Path(cache_dir or DEFAULT_FEATURE_SNAPSHOT_FALLBACK_CACHE_DIR) / digest
    # Keep the source extension on the cached copy; fall back to a neutral
    # suffix when the source path has none.
    snapshot_suffix = Path(raw_path).suffix or ".snapshot"
    return {
        "cache_root": cache_root,
        "record_path": cache_root / "record.json",
        "snapshot_path": cache_root / f"snapshot{snapshot_suffix}",
        "manifest_path": cache_root / "snapshot.manifest.json",
        "source_path": raw_path,
        "source_manifest_path": raw_manifest or _resolve_manifest_reference(raw_path, manifest_path),
        "cache_key_payload": payload,
    }


def _unlink_if_present(path: Path) -> None:
    """Delete *path*, treating an already-missing file as success."""
    try:
        path.unlink()
    except FileNotFoundError:
        pass


def _write_feature_snapshot_last_valid(
    fallback_context: Mapping[str, Any],
    metadata: Mapping[str, Any],
) -> None:
    """Copy a successfully guarded snapshot (and manifest) into the cache.

    Nothing is written when the metadata does not point at an existing local
    snapshot file. All files are staged under ``*.tmp`` names and renamed into
    place only once every copy has succeeded, with ``record.json`` last, so a
    failure part-way through leaves the previous cache entry intact.

    Raises:
        OSError: If copying, hashing or renaming fails.
    """
    source_snapshot = _path_from_metadata(
        metadata.get("snapshot_local_path") or metadata.get("snapshot_path")
    )
    if source_snapshot is None or not source_snapshot.exists():
        return

    cache_root = Path(fallback_context["cache_root"])
    snapshot_path = Path(fallback_context["snapshot_path"])
    manifest_path = Path(fallback_context["manifest_path"])
    record_path = Path(fallback_context["record_path"])
    cache_root.mkdir(parents=True, exist_ok=True)

    staged_snapshot = snapshot_path.with_name(snapshot_path.name + ".tmp")
    staged_manifest = manifest_path.with_name(manifest_path.name + ".tmp")
    staged_record = record_path.with_name(record_path.name + ".tmp")

    shutil.copy2(source_snapshot, staged_snapshot)

    source_manifest = _path_from_metadata(
        metadata.get("snapshot_manifest_local_path") or metadata.get("snapshot_manifest_path")
    )
    manifest_copied = source_manifest is not None and source_manifest.exists()
    if manifest_copied:
        shutil.copy2(source_manifest, staged_manifest)

    record = {
        "schema_version": "feature_snapshot_last_valid.v1",
        "saved_at": datetime.now(timezone.utc).isoformat(),
        "source_path": fallback_context["source_path"],
        "source_manifest_path": fallback_context["source_manifest_path"],
        "snapshot_path": str(snapshot_path),
        "manifest_path": str(manifest_path) if manifest_copied else None,
        "snapshot_as_of": _json_safe_value(metadata.get("snapshot_as_of")),
        "snapshot_sha256": _sha256_file(staged_snapshot),
        "manifest_sha256": _sha256_file(staged_manifest) if manifest_copied else None,
        "cache_key_payload": fallback_context["cache_key_payload"],
    }
    staged_record.write_text(json.dumps(record, sort_keys=True, indent=2), encoding="utf-8")

    # Publish: everything above succeeded, so swap the staged files in.
    staged_snapshot.replace(snapshot_path)
    if manifest_copied:
        staged_manifest.replace(manifest_path)
    else:
        # A manifest left over from an earlier snapshot would otherwise be
        # paired with this snapshot on the next fallback load.
        _unlink_if_present(manifest_path)
    staged_record.replace(record_path)


def _load_feature_snapshot_last_valid(
    fallback_context: Mapping[str, Any],
    *,
    run_as_of,
    required_columns: Iterable[str] | None,
    snapshot_date_columns: Iterable[str],
    max_snapshot_month_lag: int,
    require_manifest: bool,
    expected_strategy_profile: str | None,
    expected_config_name: str | None,
    expected_config_path: str | None,
    expected_contract_version: str | None,
    failed_metadata: Mapping[str, Any],
    max_stale_days: int | None,
) -> FeatureSnapshotGuardResult | None:
    """Load the cached last-valid snapshot after the primary load failed.

    The cached copy is accepted only if its record is readable, it is not
    older than ``max_stale_days``, its bytes still match the recorded SHA-256,
    and it passes the same guard as a primary snapshot. On success the
    returned metadata points at the original source paths and carries
    ``artifact_fallback_*`` keys describing the fallback. On any failure the
    primary failure metadata is returned with ``artifact_fallback_fail_reason``
    set and ``frame=None``.
    """
    record_path = Path(fallback_context["record_path"])
    snapshot_path = Path(fallback_context["snapshot_path"])
    manifest_path = Path(fallback_context["manifest_path"])
    if not record_path.exists() or not snapshot_path.exists():
        return _feature_snapshot_fallback_failure(
            failed_metadata,
            reason="last_valid_missing",
            fallback_context=fallback_context,
        )
    try:
        record = json.loads(record_path.read_text(encoding="utf-8"))
    except (OSError, ValueError) as exc:
        # ValueError covers both malformed JSON and undecodable bytes.
        return _feature_snapshot_fallback_failure(
            failed_metadata,
            reason=f"last_valid_record_invalid:{type(exc).__name__}:{exc}",
            fallback_context=fallback_context,
        )
    if not isinstance(record, Mapping):
        return _feature_snapshot_fallback_failure(
            failed_metadata,
            reason="last_valid_record_invalid:not_mapping",
            fallback_context=fallback_context,
        )
    stale_reason = _feature_snapshot_fallback_stale_reason(record, max_stale_days=max_stale_days)
    if stale_reason:
        return _feature_snapshot_fallback_failure(
            failed_metadata,
            reason=stale_reason,
            fallback_context=fallback_context,
            record=record,
        )

    # The record stores the hash of the snapshot it was written for; a
    # mismatch means the cache was truncated or only partially refreshed.
    expected_sha256 = record.get("snapshot_sha256")
    if expected_sha256 and _sha256_file(snapshot_path) != expected_sha256:
        return _feature_snapshot_fallback_failure(
            failed_metadata,
            reason="last_valid_snapshot_sha256_mismatch",
            fallback_context=fallback_context,
            record=record,
        )

    # Trust a cached manifest only when the record says one was saved with
    # this snapshot; a stray file from an earlier entry must not be used.
    has_manifest = bool(record.get("manifest_path")) and manifest_path.exists()
    if require_manifest and not has_manifest:
        return _feature_snapshot_fallback_failure(
            failed_metadata,
            reason="last_valid_manifest_missing",
            fallback_context=fallback_context,
            record=record,
        )

    cached_manifest = str(manifest_path) if has_manifest else None
    fallback_result = _load_feature_snapshot_guarded_without_fallback(
        str(snapshot_path),
        run_as_of=run_as_of,
        required_columns=required_columns,
        snapshot_date_columns=snapshot_date_columns,
        max_snapshot_month_lag=max_snapshot_month_lag,
        manifest_path=cached_manifest,
        require_manifest=require_manifest,
        expected_strategy_profile=expected_strategy_profile,
        expected_config_name=expected_config_name,
        expected_config_path=expected_config_path,
        expected_contract_version=expected_contract_version,
    )
    metadata = dict(fallback_result.metadata)
    if metadata.get("snapshot_guard_decision") != "proceed":
        return _feature_snapshot_fallback_failure(
            failed_metadata,
            reason=str(
                metadata.get("fail_reason")
                or metadata.get("no_op_reason")
                or "last_valid_guard_failed"
            ),
            fallback_context=fallback_context,
            record=record,
        )

    metadata.update(
        {
            # Report the original source locations; the cached copies are
            # exposed through the *_local_path and artifact_fallback_* keys.
            "feature_snapshot_path": fallback_context["source_path"],
            "snapshot_path": fallback_context["source_path"],
            "snapshot_manifest_path": fallback_context["source_manifest_path"],
            "snapshot_local_path": str(snapshot_path),
            "snapshot_manifest_local_path": cached_manifest,
            "artifact_fallback_used": True,
            "artifact_fallback_mode": FEATURE_SNAPSHOT_FALLBACK_MODE_LAST_VALID,
            "artifact_fallback_reason": failed_metadata.get("fail_reason")
            or failed_metadata.get("no_op_reason"),
            "artifact_fallback_saved_at": record.get("saved_at"),
            "artifact_fallback_cache_dir": str(fallback_context["cache_root"]),
            "artifact_fallback_snapshot_path": str(snapshot_path),
            "artifact_fallback_manifest_path": cached_manifest,
        }
    )
    return FeatureSnapshotGuardResult(frame=fallback_result.frame, metadata=metadata)


def _feature_snapshot_fallback_failure(
    failed_metadata: Mapping[str, Any],
    *,
    reason: str,
    fallback_context: Mapping[str, Any],
    record: Mapping[str, Any] | None = None,
) -> FeatureSnapshotGuardResult:
    """Build the result returned when the fallback itself cannot be used.

    The primary failure metadata is copied and extended with the fallback
    diagnostics; the frame is always ``None``.
    """
    metadata = dict(failed_metadata)
    metadata.update(
        {
            "artifact_fallback_used": False,
            "artifact_fallback_mode": FEATURE_SNAPSHOT_FALLBACK_MODE_LAST_VALID,
            "artifact_fallback_fail_reason": reason,
            "artifact_fallback_cache_dir": str(fallback_context["cache_root"]),
        }
    )
    if record is not None:
        metadata["artifact_fallback_saved_at"] = record.get("saved_at")
    return FeatureSnapshotGuardResult(frame=None, metadata=metadata)


def _feature_snapshot_fallback_stale_reason(
    record: Mapping[str, Any],
    *,
    max_stale_days: int | None,
) -> str | None:
    """Return why the cached record is too old to use, or ``None`` if usable.

    ``max_stale_days=None`` disables the age check. A limit that cannot be
    read as an integer, a negative limit, or a missing/unparseable
    ``saved_at`` each yield a reason string rather than an exception, so a bad
    setting degrades into a reported fallback failure. A ``saved_at`` without
    timezone information is interpreted as UTC.
    """
    if max_stale_days is None:
        return None
    try:
        max_days = int(max_stale_days)
    except (TypeError, ValueError):
        return "last_valid_max_stale_days_invalid"
    if max_days < 0:
        return "last_valid_max_stale_days_negative"

    saved_at = record.get("saved_at")
    try:
        saved_ts = pd.Timestamp(saved_at)
    except (TypeError, ValueError, OverflowError):
        return "last_valid_saved_at_invalid"
    if pd.isna(saved_ts):
        return "last_valid_saved_at_invalid"
    if saved_ts.tzinfo is None:
        saved_ts = saved_ts.tz_localize(timezone.utc)

    now_ts = pd.Timestamp(datetime.now(timezone.utc))
    if saved_ts < now_ts - pd.Timedelta(days=max_days):
        return "last_valid_stale:saved_at=" + str(saved_at)
    return None


def _path_from_metadata(value: object) -> Path | None:
    """Turn a metadata path entry into a local ``Path``.

    Returns ``None`` for empty values and for ``gs://`` references, which
    cannot be copied with local file operations.
    """
    text = str(value or "").strip()
    if not text or text.startswith("gs://"):
        return None
    return Path(text)


def _json_safe_value(value: object) -> object:
    """Convert *value* into something ``json.dumps`` accepts.

    Timestamps and datetimes become ISO-8601 strings, JSON scalars pass
    through unchanged, and anything else is stringified.
    """
    if isinstance(value, (pd.Timestamp, datetime)):
        return value.isoformat()
    if isinstance(value, (str, int, float, bool)) or value is None:
        return value
    return str(value)
def test_guard_uses_last_valid_snapshot_when_current_source_missing(self) -> None:
    """A primed last-valid cache serves the snapshot once the source is gone.

    The first guarded load succeeds against the real files and must write one
    cache record. After the snapshot and manifest are deleted, an identical
    call has to proceed from the cached copy and report the original failure
    as the fallback reason.
    """
    with tempfile.TemporaryDirectory() as tmp_dir:
        tmp_path = Path(tmp_dir)
        cache_dir = tmp_path / "fallback-cache"
        snapshot_path = tmp_path / "snapshot.csv"
        manifest_path = tmp_path / "snapshot.csv.manifest.json"
        config_path = tmp_path / "config.json"
        snapshot_path.write_text(
            "as_of,symbol,close\n2026-04-01,QQQ,500\n",
            encoding="utf-8",
        )
        config_path.write_text('{"name": "feature_snapshot_strategy"}', encoding="utf-8")
        snapshot_sha256 = hashlib.sha256(snapshot_path.read_bytes()).hexdigest()
        config_sha256 = hashlib.sha256(config_path.read_bytes()).hexdigest()
        manifest_path.write_text(
            json.dumps(
                {
                    "snapshot_as_of": "2026-04-01",
                    "strategy_profile": "feature_snapshot_strategy",
                    "config_name": "feature_snapshot_strategy",
                    "contract_version": "feature_snapshot_strategy.feature_snapshot.v1",
                    "snapshot_sha256": snapshot_sha256,
                    "config_sha256": config_sha256,
                }
            ),
            encoding="utf-8",
        )

        # Both calls must be identical so they resolve to the same cache key.
        guard_kwargs = {
            "run_as_of": "2026-04-02",
            "required_columns": ("as_of", "symbol", "close"),
            "manifest_path": str(manifest_path),
            "require_manifest": True,
            "expected_strategy_profile": "feature_snapshot_strategy",
            "expected_config_name": "feature_snapshot_strategy",
            "expected_config_path": str(config_path),
            "expected_contract_version": "feature_snapshot_strategy.feature_snapshot.v1",
            "fallback_mode": "last_valid",
            "fallback_cache_dir": cache_dir,
        }

        first_result = load_feature_snapshot_guarded(str(snapshot_path), **guard_kwargs)

        # Check the priming call before removing the source, so a broken
        # primary load is not misreported as a fallback failure.
        self.assertIsNotNone(first_result.frame)
        self.assertEqual(first_result.metadata["snapshot_guard_decision"], "proceed")
        self.assertIsNot(first_result.metadata.get("artifact_fallback_used"), True)
        self.assertEqual(len(list(cache_dir.glob("*/record.json"))), 1)

        snapshot_path.unlink()
        manifest_path.unlink()

        fallback_result = load_feature_snapshot_guarded(str(snapshot_path), **guard_kwargs)

        self.assertIsNotNone(fallback_result.frame)
        self.assertEqual(fallback_result.metadata["snapshot_guard_decision"], "proceed")
        self.assertIs(fallback_result.metadata["artifact_fallback_used"], True)
        self.assertEqual(fallback_result.metadata["artifact_fallback_mode"], "last_valid")
        self.assertIn("feature_snapshot_missing", fallback_result.metadata["artifact_fallback_reason"])