From 2a88d121f0da5e09ef2beece97b66cc88de4aca1 Mon Sep 17 00:00:00 2001 From: Pigbibi <20649888+Pigbibi@users.noreply.github.com> Date: Sun, 24 May 2026 16:18:32 +0800 Subject: [PATCH] Persist Firstrade strategy run state --- .env.example | 1 + .github/workflows/sync-cloud-run-env.yml | 2 + README.md | 11 ++ application/rebalance_service.py | 112 +++++++++++++ application/strategy_run_persistence.py | 195 +++++++++++++++++++++++ runtime_config_support.py | 2 + tests/test_rebalance_service.py | 92 +++++++++++ 7 files changed, 415 insertions(+) create mode 100644 application/strategy_run_persistence.py diff --git a/.env.example b/.env.example index 9aaa327..72e8dee 100644 --- a/.env.example +++ b/.env.example @@ -30,6 +30,7 @@ FIRSTRADE_PERSIST_SESSION_CACHE=false FIRSTRADE_GCS_STATE_BUCKET= FIRSTRADE_STATE_PREFIX=firstrade-platform FIRSTRADE_PERSIST_ACCOUNT_SNAPSHOT=false +FIRSTRADE_PERSIST_STRATEGY_RUNS=false FIRSTRADE_ENABLE_LIVE_TRADING=false FIRSTRADE_RUN_SMOKE_ON_HTTP=false FIRSTRADE_RUN_SESSION_CHECK_ON_HTTP=false diff --git a/.github/workflows/sync-cloud-run-env.yml b/.github/workflows/sync-cloud-run-env.yml index 1567a52..0626935 100644 --- a/.github/workflows/sync-cloud-run-env.yml +++ b/.github/workflows/sync-cloud-run-env.yml @@ -52,6 +52,7 @@ jobs: FIRSTRADE_FEATURE_SNAPSHOT_MANIFEST_PATH: ${{ vars.FIRSTRADE_FEATURE_SNAPSHOT_MANIFEST_PATH }} FIRSTRADE_GCS_STATE_BUCKET: ${{ vars.FIRSTRADE_GCS_STATE_BUCKET }} FIRSTRADE_PERSIST_ACCOUNT_SNAPSHOT: ${{ vars.FIRSTRADE_PERSIST_ACCOUNT_SNAPSHOT }} + FIRSTRADE_PERSIST_STRATEGY_RUNS: ${{ vars.FIRSTRADE_PERSIST_STRATEGY_RUNS }} FIRSTRADE_PERSIST_SESSION_CACHE: ${{ vars.FIRSTRADE_PERSIST_SESSION_CACHE }} FIRSTRADE_RUN_SESSION_CHECK_ON_HTTP: ${{ vars.FIRSTRADE_RUN_SESSION_CHECK_ON_HTTP }} FIRSTRADE_SESSION_CHECK_INCLUDE_POSITIONS: ${{ vars.FIRSTRADE_SESSION_CHECK_INCLUDE_POSITIONS }} @@ -404,6 +405,7 @@ jobs: add_optional_env FIRSTRADE_GCS_STATE_BUCKET add_optional_env FIRSTRADE_STATE_PREFIX add_optional_env FIRSTRADE_PERSIST_ACCOUNT_SNAPSHOT + add_optional_env FIRSTRADE_PERSIST_STRATEGY_RUNS add_optional_env FIRSTRADE_ENABLE_LIVE_TRADING add_optional_env FIRSTRADE_RUN_SMOKE_ON_HTTP add_optional_env FIRSTRADE_RUN_SESSION_CHECK_ON_HTTP diff --git a/README.md b/README.md index 8006b2e..a1bcb71 100644 --- a/README.md +++ b/README.md @@ -77,6 +77,7 @@ commit credentials. | `FIRSTRADE_GCS_STATE_BUCKET` | Optional | GCS bucket for runtime state JSON, including persisted session cache and account funds snapshots | | `FIRSTRADE_STATE_PREFIX` | Optional | Object prefix within `FIRSTRADE_GCS_STATE_BUCKET`, default `firstrade-platform` | | `FIRSTRADE_PERSIST_ACCOUNT_SNAPSHOT` | Optional | Persist compact masked account funds snapshots from `/session-check`. Defaults to `false` | +| `FIRSTRADE_PERSIST_STRATEGY_RUNS` | Optional | Persist `/run` strategy state, plans, and submitted/skipped order results to GCS. Defaults to `false` | | `ACCOUNT_PREFIX` | Optional | Alert/log prefix, default `FIRSTRADE` | | `ACCOUNT_REGION` | Optional | Runtime account scope, default `US` | | `NOTIFY_LANG` | Optional | Notification language, `en` or `zh` | @@ -197,6 +198,15 @@ to `accounts//funds/latest.json` plus a timestamped history path under the configured GCS prefix. Raw account IDs and login secrets are not included in the snapshot. +When `FIRSTRADE_PERSIST_STRATEGY_RUNS=true` and a GCS state bucket is configured, +`/run` writes strategy state to +`strategy-runs////latest.json` plus a +timestamped history path. The record includes the planned targets, compact +portfolio snapshot, evaluation metadata, submitted orders, skipped orders, and +stage (`ORDERS_PLANNED`, `DRY_RUN_COMPLETED`, or `SUBMITTED`). For live runs, +an existing terminal record in the same account/profile/month blocks duplicate +order submission. + ## Cloud Run Shape `main.py` exposes: @@ -230,6 +240,7 @@ runtime service account object read/write access, and set: - `FIRSTRADE_REUSE_SESSION=true` - `FIRSTRADE_PERSIST_SESSION_CACHE=true` - `FIRSTRADE_PERSIST_ACCOUNT_SNAPSHOT=true` +- `FIRSTRADE_PERSIST_STRATEGY_RUNS=true` - `FIRSTRADE_GCS_STATE_BUCKET=` - `FIRSTRADE_STATE_PREFIX=firstrade-platform` - `FIRSTRADE_RUN_SESSION_CHECK_ON_HTTP=true` diff --git a/application/rebalance_service.py b/application/rebalance_service.py index 62496fe..aa052d2 100644 --- a/application/rebalance_service.py +++ b/application/rebalance_service.py @@ -10,6 +10,7 @@ import os from collections.abc import Callable, Mapping +from datetime import datetime, timezone from typing import Any import pandas as pd @@ -24,6 +25,14 @@ mask_account_id, ) from application.runtime_broker_adapters import build_runtime_broker_adapters +from application.state_persistence import GcsStateStore, build_gcs_state_store_from_env +from application.strategy_run_persistence import ( + build_strategy_run_state, + is_duplicate_live_run, + persist_strategy_run_state, + read_latest_strategy_run_state, + resolve_strategy_run_period, +) from decision_mapper import map_strategy_decision_to_plan from notifications.telegram import build_sender, build_translator, render_cycle_summary from quant_platform_kit.common.runtime_inputs import ( @@ -38,6 +47,10 @@ LIMIT_BUY_PREMIUM = 1.005 +def _utcnow() -> datetime: + return datetime.now(timezone.utc) + + def get_project_id() -> str | None: return os.getenv("GOOGLE_CLOUD_PROJECT") @@ -137,11 +150,15 @@ def run_strategy_cycle( runtime_settings: PlatformRuntimeSettings | None = None, credentials: FirstradeCredentials | None = None, client_factory: Callable[..., FirstradeBrokerClient] = FirstradeBrokerClient, + state_store: GcsStateStore | None = None, notification_sender: Callable[[str], None] | None = None, env_reader: Callable[[str, str | None], str | None] = os.getenv, ) -> dict[str, Any]: + now = _utcnow() settings = runtime_settings or load_platform_runtime_settings(project_id_resolver=get_project_id) resolved_credentials = credentials or FirstradeCredentials.from_env(env_reader) + store = state_store or build_gcs_state_store_from_env(env_reader) + persist_strategy_runs = bool(settings.persist_strategy_runs and store is not None) client = _connect_client( credentials=resolved_credentials, live_trading_enabled=settings.live_trading_enabled, @@ -193,6 +210,70 @@ def run_strategy_cycle( plan, threshold_usd=settings.safe_haven_cash_substitute_threshold_usd, ) + run_period = resolve_strategy_run_period( + now=now, + plan=plan, + evaluation_metadata=getattr(evaluation, "metadata", None), + ) + masked_account = mask_account_id(account) + existing_run = None + if persist_strategy_runs and not settings.dry_run_only: + existing_run = read_latest_strategy_run_state( + store=store, + account=masked_account, + strategy_profile=strategy_runtime.profile, + run_period=run_period, + ) + if is_duplicate_live_run(existing_run): + return { + "ok": True, + "api_kind": "unofficial-reverse-engineered", + "account": masked_account, + "strategy_profile": strategy_runtime.profile, + "strategy_display_name": strategy_runtime.display_name, + "dry_run_only": settings.dry_run_only, + "live_trading_enabled": settings.live_trading_enabled, + "session_reused": bool(getattr(client, "session_reused", False)), + "strategy_run_period": run_period, + "strategy_run_persisted": False, + "idempotency_skipped": True, + "existing_strategy_run_stage": existing_run.get("stage"), + "existing_strategy_run_as_of": existing_run.get("as_of"), + "submitted_orders": [], + "skipped_orders": [ + { + "reason": "duplicate_live_strategy_run", + "run_period": run_period, + } + ], + "action_done": False, + } + strategy_run_persisted = False + strategy_run_persistence_error = None + if persist_strategy_runs: + planned_state = build_strategy_run_state( + stage="ORDERS_PLANNED", + account=masked_account, + strategy_profile=strategy_runtime.profile, + strategy_display_name=strategy_runtime.display_name, + run_period=run_period, + dry_run_only=settings.dry_run_only, + live_trading_enabled=settings.live_trading_enabled, + session_reused=bool(getattr(client, "session_reused", False)), + portfolio_snapshot=plan.get("portfolio", {}), + evaluation_metadata=getattr(evaluation, "metadata", None), + plan=plan, + now=now, + ) + try: + strategy_run_persisted = persist_strategy_run_state( + store=store, + state=planned_state, + now=now, + ) + except Exception as exc: + strategy_run_persisted = False + strategy_run_persistence_error = f"{type(exc).__name__}: {exc}" execution_result = execute_value_target_plan( plan=plan, market_data_port=market_data_port, @@ -212,6 +293,8 @@ def run_strategy_cycle( "dry_run_only": settings.dry_run_only, "live_trading_enabled": settings.live_trading_enabled, "session_reused": bool(getattr(client, "session_reused", False)), + "strategy_run_period": run_period, + "strategy_run_persisted": strategy_run_persisted, "portfolio": plan.get("portfolio", {}), "allocation": plan.get("allocation", {}), "execution": plan.get("execution", {}), @@ -219,6 +302,35 @@ def run_strategy_cycle( "skipped_orders": list(execution_result.skipped_orders), "action_done": execution_result.action_done, } + if strategy_run_persistence_error: + result["strategy_run_persistence_error"] = strategy_run_persistence_error + if persist_strategy_runs: + completed_state = build_strategy_run_state( + stage="DRY_RUN_COMPLETED" if settings.dry_run_only else "SUBMITTED", + account=masked_account, + strategy_profile=strategy_runtime.profile, + strategy_display_name=strategy_runtime.display_name, + run_period=run_period, + dry_run_only=settings.dry_run_only, + live_trading_enabled=settings.live_trading_enabled, + session_reused=bool(getattr(client, "session_reused", False)), + portfolio_snapshot=plan.get("portfolio", {}), + evaluation_metadata=getattr(evaluation, "metadata", None), + plan=plan, + submitted_orders=list(execution_result.submitted_orders), + skipped_orders=list(execution_result.skipped_orders), + action_done=execution_result.action_done, + now=now, + ) + try: + result["strategy_run_persisted"] = persist_strategy_run_state( + store=store, + state=completed_state, + now=now, + ) + except Exception as exc: + result["strategy_run_persisted"] = False + result["strategy_run_persistence_error"] = f"{type(exc).__name__}: {exc}" try: result["notification_sent"] = _publish_cycle_notification( result, diff --git a/application/strategy_run_persistence.py b/application/strategy_run_persistence.py new file mode 100644 index 0000000..cc2b98c --- /dev/null +++ b/application/strategy_run_persistence.py @@ -0,0 +1,195 @@ +"""Persistence helpers for strategy execution runs.""" + +from __future__ import annotations + +import math +from collections.abc import Mapping +from dataclasses import asdict, is_dataclass +from datetime import date, datetime, timezone +from typing import Any + +from application.state_persistence import GcsStateStore + +LIVE_TERMINAL_STAGES = frozenset({"SUBMITTED", "RECONCILED", "COMPLETED"}) + + +def utcnow() -> datetime: + return datetime.now(timezone.utc) + + +def safe_key(value: str) -> str: + return "".join(ch if ch.isalnum() else "_" for ch in str(value or "")) or "unknown" + + +def _jsonable(value: Any) -> Any: + if value is None or isinstance(value, (str, bool, int)): + return value + if isinstance(value, float): + return value if math.isfinite(value) else None + if isinstance(value, (datetime, date)): + return value.isoformat() + if hasattr(value, "isoformat") and callable(value.isoformat): + try: + return value.isoformat() + except Exception: + pass + if is_dataclass(value): + return _jsonable(asdict(value)) + if isinstance(value, Mapping): + return {str(key): _jsonable(item) for key, item in value.items()} + if isinstance(value, (list, tuple, set, frozenset)): + return [_jsonable(item) for item in value] + return str(value) + + +def _coerce_month_period(value: Any) -> str | None: + if value is None: + return None + try: + parsed = datetime.fromisoformat(str(value).replace("Z", "+00:00")) + except ValueError: + text = str(value).strip() + if len(text) >= 7 and text[4] == "-": + return text[:7] + return None + return f"{parsed.year:04d}-{parsed.month:02d}" + + +def resolve_strategy_run_period( + *, + now: datetime, + plan: Mapping[str, Any] | None = None, + evaluation_metadata: Mapping[str, Any] | None = None, +) -> str: + execution = dict((plan or {}).get("execution") or {}) + candidates = ( + (evaluation_metadata or {}).get("snapshot_as_of"), + execution.get("signal_date"), + execution.get("effective_date"), + now, + ) + for candidate in candidates: + period = _coerce_month_period(candidate) + if period: + return period + return f"{now.year:04d}-{now.month:02d}" + + +def strategy_run_state_key(*, account: str, strategy_profile: str, run_period: str) -> str: + return ( + f"strategy-runs/{safe_key(account)}/{safe_key(strategy_profile)}/" + f"{safe_key(run_period)}/latest.json" + ) + + +def strategy_run_history_key( + *, + account: str, + strategy_profile: str, + run_period: str, + stage: str, + now: datetime, +) -> str: + stamp = now.strftime("%Y%m%dT%H%M%SZ") + return ( + f"strategy-runs/{safe_key(account)}/{safe_key(strategy_profile)}/" + f"{safe_key(run_period)}/history/{now:%Y/%m/%d}/{stamp}-{safe_key(stage)}.json" + ) + + +def read_latest_strategy_run_state( + *, + store: GcsStateStore, + account: str, + strategy_profile: str, + run_period: str, +) -> dict[str, Any] | None: + return store.read_json( + strategy_run_state_key( + account=account, + strategy_profile=strategy_profile, + run_period=run_period, + ) + ) + + +def is_duplicate_live_run(existing_state: Mapping[str, Any] | None) -> bool: + if not existing_state: + return False + if bool(existing_state.get("dry_run_only")): + return False + return str(existing_state.get("stage") or "").strip().upper() in LIVE_TERMINAL_STAGES + + +def build_strategy_run_state( + *, + stage: str, + account: str, + strategy_profile: str, + strategy_display_name: str, + run_period: str, + dry_run_only: bool, + live_trading_enabled: bool, + session_reused: bool, + portfolio_snapshot: Mapping[str, Any] | None = None, + evaluation_metadata: Mapping[str, Any] | None = None, + plan: Mapping[str, Any] | None = None, + submitted_orders: list[dict[str, Any]] | tuple[dict[str, Any], ...] = (), + skipped_orders: list[dict[str, Any]] | tuple[dict[str, Any], ...] = (), + action_done: bool = False, + error: str | None = None, + now: datetime | None = None, +) -> dict[str, Any]: + as_of = now or utcnow() + payload = { + "stage": stage, + "as_of": as_of.isoformat(), + "account": account, + "strategy_profile": strategy_profile, + "strategy_display_name": strategy_display_name, + "run_period": run_period, + "dry_run_only": dry_run_only, + "live_trading_enabled": live_trading_enabled, + "session_reused": session_reused, + "portfolio_snapshot": dict(portfolio_snapshot or {}), + "evaluation_metadata": dict(evaluation_metadata or {}), + "plan": dict(plan or {}), + "submitted_orders": list(submitted_orders), + "skipped_orders": list(skipped_orders), + "action_done": action_done, + } + if error: + payload["error"] = error + return _jsonable(payload) + + +def persist_strategy_run_state( + *, + store: GcsStateStore, + state: Mapping[str, Any], + now: datetime | None = None, +) -> bool: + as_of = now or utcnow() + account = str(state.get("account") or "unknown") + strategy_profile = str(state.get("strategy_profile") or "unknown") + run_period = str(state.get("run_period") or f"{as_of.year:04d}-{as_of.month:02d}") + stage = str(state.get("stage") or "UNKNOWN") + store.write_json( + strategy_run_state_key( + account=account, + strategy_profile=strategy_profile, + run_period=run_period, + ), + dict(state), + ) + store.write_json( + strategy_run_history_key( + account=account, + strategy_profile=strategy_profile, + run_period=run_period, + stage=stage, + now=as_of, + ), + dict(state), + ) + return True diff --git a/runtime_config_support.py b/runtime_config_support.py index a6ad523..d81760e 100644 --- a/runtime_config_support.py +++ b/runtime_config_support.py @@ -42,6 +42,7 @@ class PlatformRuntimeSettings: run_strategy_on_http: bool live_order_ack: bool max_order_notional_usd: float + persist_strategy_runs: bool = False safe_haven_cash_substitute_threshold_usd: float = DEFAULT_SAFE_HAVEN_CASH_SUBSTITUTE_THRESHOLD_USD debug_position_snapshot: bool = False income_threshold_usd: float | None = None @@ -107,6 +108,7 @@ def load_platform_runtime_settings( live_trading_enabled=resolve_bool_value(os.getenv("FIRSTRADE_ENABLE_LIVE_TRADING")), run_strategy_on_http=resolve_bool_value(os.getenv("FIRSTRADE_RUN_STRATEGY_ON_HTTP")), live_order_ack=resolve_bool_value(os.getenv("FIRSTRADE_LIVE_ORDER_ACK")), + persist_strategy_runs=resolve_bool_value(os.getenv("FIRSTRADE_PERSIST_STRATEGY_RUNS")), max_order_notional_usd=( resolve_optional_float_env(os.environ, "FIRSTRADE_MAX_ORDER_NOTIONAL_USD") or 25.0 diff --git a/tests/test_rebalance_service.py b/tests/test_rebalance_service.py index 96c479b..0204e84 100644 --- a/tests/test_rebalance_service.py +++ b/tests/test_rebalance_service.py @@ -1,5 +1,6 @@ from __future__ import annotations +from datetime import datetime, timezone from types import SimpleNamespace from application.firstrade_client import FirstradeCredentials @@ -28,6 +29,13 @@ def _runtime_settings() -> PlatformRuntimeSettings: ) +def _runtime_settings_with_persistence(**overrides) -> PlatformRuntimeSettings: + base = _runtime_settings() + values = dict(base.__dict__) + values.update(overrides) + return PlatformRuntimeSettings(**values) + + class FakeFirstradeClient: def __init__(self, _credentials, *, live_trading_enabled=False): self.live_trading_enabled = live_trading_enabled @@ -81,6 +89,20 @@ def evaluate(self, **inputs): ) +class FakeStateStore: + def __init__(self): + self.payloads = {} + self.writes = [] + + def read_json(self, key): + return self.payloads.get(key) + + def write_json(self, key, payload): + self.payloads[key] = dict(payload) + self.writes.append((key, dict(payload))) + return True + + def test_notification_i18n_keys_are_aligned(): assert set(I18N["zh"]) == set(I18N["en"]) assert build_translator("zh")("account_label", account="****1234") == "🆔 账户: ****1234" @@ -143,6 +165,76 @@ def fake_client_factory(*args, **kwargs): assert "🧪 Dry-run limit buy AAA: 2 shares @ $10.05" in messages[0] +def test_run_strategy_cycle_persists_strategy_run_state(monkeypatch): + store = FakeStateStore() + + monkeypatch.setattr( + "application.rebalance_service.load_strategy_runtime", + lambda *_args, **_kwargs: FakeStrategyRuntime(), + ) + + result = run_strategy_cycle( + runtime_settings=_runtime_settings_with_persistence(persist_strategy_runs=True), + credentials=FirstradeCredentials(username="user", password="pass"), + client_factory=FakeFirstradeClient, + state_store=store, + env_reader=lambda _name, default=None: default, + ) + + stages = [payload["stage"] for _key, payload in store.writes if _key.endswith("latest.json")] + assert stages == ["ORDERS_PLANNED", "DRY_RUN_COMPLETED"] + assert result["strategy_run_persisted"] is True + assert result["strategy_run_period"] + latest_payload = store.writes[-2][1] + assert latest_payload["stage"] == "DRY_RUN_COMPLETED" + assert latest_payload["submitted_orders"][0]["symbol"] == "AAA" + assert latest_payload["plan"]["allocation"]["targets"]["AAA"] == 50.0 + + +def test_run_strategy_cycle_skips_duplicate_live_monthly_run(monkeypatch): + store = FakeStateStore() + settings = _runtime_settings_with_persistence( + dry_run_only=False, + live_trading_enabled=True, + live_order_ack=True, + persist_strategy_runs=True, + ) + key = "strategy-runs/____5678/tqqq_growth_income/2026_05/latest.json" + store.payloads[key] = { + "stage": "SUBMITTED", + "as_of": "2026-05-01T01:02:03+00:00", + "dry_run_only": False, + } + observed = {} + + def fake_client_factory(*args, **kwargs): + client = FakeFirstradeClient(*args, **kwargs) + observed["client"] = client + return client + + monkeypatch.setattr( + "application.rebalance_service.load_strategy_runtime", + lambda *_args, **_kwargs: FakeStrategyRuntime(), + ) + monkeypatch.setattr( + "application.rebalance_service._utcnow", + lambda: datetime(2026, 5, 15, tzinfo=timezone.utc), + ) + + result = run_strategy_cycle( + runtime_settings=settings, + credentials=FirstradeCredentials(username="user", password="pass"), + client_factory=fake_client_factory, + state_store=store, + env_reader=lambda _name, default=None: default, + ) + + assert result["idempotency_skipped"] is True + assert result["action_done"] is False + assert observed["client"].orders == [] + assert store.writes == [] + + def test_render_cycle_summary_formats_skipped_orders_in_unified_chinese_template(): message = render_cycle_summary( {