From e1ec144af6af080639bf92f15f144d6b1b76c0cd Mon Sep 17 00:00:00 2001 From: Pigbibi <20649888+Pigbibi@users.noreply.github.com> Date: Sat, 23 May 2026 14:27:41 +0800 Subject: [PATCH] Add Firstrade session check persistence --- .env.example | 8 + .github/workflows/sync-cloud-run-env.yml | 12 ++ README.md | 28 ++- application/firstrade_client.py | 87 +++++++-- application/session_check_service.py | 217 +++++++++++++++++++++++ application/state_persistence.py | 119 +++++++++++++ main.py | 26 +++ pyproject.toml | 1 + tests/test_firstrade_client.py | 58 ++++++ tests/test_request_handling.py | 29 +++ tests/test_session_check_service.py | 93 ++++++++++ 11 files changed, 662 insertions(+), 16 deletions(-) create mode 100644 application/session_check_service.py create mode 100644 application/state_persistence.py create mode 100644 tests/test_session_check_service.py diff --git a/.env.example b/.env.example index bc43c23..9aaa327 100644 --- a/.env.example +++ b/.env.example @@ -24,8 +24,16 @@ GLOBAL_TELEGRAM_CHAT_ID= # Runtime safety controls. FIRSTRADE_COOKIE_DIR=.runtime/firstrade-cookies +FIRSTRADE_REUSE_SESSION=false +FIRSTRADE_SESSION_CACHE_TTL_SECONDS=21600 +FIRSTRADE_PERSIST_SESSION_CACHE=false +FIRSTRADE_GCS_STATE_BUCKET= +FIRSTRADE_STATE_PREFIX=firstrade-platform +FIRSTRADE_PERSIST_ACCOUNT_SNAPSHOT=false FIRSTRADE_ENABLE_LIVE_TRADING=false FIRSTRADE_RUN_SMOKE_ON_HTTP=false +FIRSTRADE_RUN_SESSION_CHECK_ON_HTTP=false +FIRSTRADE_SESSION_CHECK_INCLUDE_POSITIONS=false FIRSTRADE_RUN_STRATEGY_ON_HTTP=false FIRSTRADE_LIVE_ORDER_ACK=false FIRSTRADE_MAX_ORDER_NOTIONAL_USD=25 diff --git a/.github/workflows/sync-cloud-run-env.yml b/.github/workflows/sync-cloud-run-env.yml index 39d3e57..2ab3097 100644 --- a/.github/workflows/sync-cloud-run-env.yml +++ b/.github/workflows/sync-cloud-run-env.yml @@ -50,6 +50,12 @@ jobs: FIRSTRADE_SMOKE_SYMBOL: ${{ vars.FIRSTRADE_SMOKE_SYMBOL }} FIRSTRADE_FEATURE_SNAPSHOT_PATH: ${{ vars.FIRSTRADE_FEATURE_SNAPSHOT_PATH }} FIRSTRADE_FEATURE_SNAPSHOT_MANIFEST_PATH: ${{ vars.FIRSTRADE_FEATURE_SNAPSHOT_MANIFEST_PATH }} + FIRSTRADE_GCS_STATE_BUCKET: ${{ vars.FIRSTRADE_GCS_STATE_BUCKET }} + FIRSTRADE_PERSIST_ACCOUNT_SNAPSHOT: ${{ vars.FIRSTRADE_PERSIST_ACCOUNT_SNAPSHOT }} + FIRSTRADE_PERSIST_SESSION_CACHE: ${{ vars.FIRSTRADE_PERSIST_SESSION_CACHE }} + FIRSTRADE_RUN_SESSION_CHECK_ON_HTTP: ${{ vars.FIRSTRADE_RUN_SESSION_CHECK_ON_HTTP }} + FIRSTRADE_SESSION_CHECK_INCLUDE_POSITIONS: ${{ vars.FIRSTRADE_SESSION_CHECK_INCLUDE_POSITIONS }} + FIRSTRADE_STATE_PREFIX: ${{ vars.FIRSTRADE_STATE_PREFIX }} FIRSTRADE_STRATEGY_CONFIG_PATH: ${{ vars.FIRSTRADE_STRATEGY_CONFIG_PATH }} FIRSTRADE_STRATEGY_PLUGIN_MOUNTS_JSON: ${{ vars.FIRSTRADE_STRATEGY_PLUGIN_MOUNTS_JSON }} FIRSTRADE_TECH_RUNTIME_EXECUTION_WINDOW_TRADING_DAYS: ${{ vars.FIRSTRADE_TECH_RUNTIME_EXECUTION_WINDOW_TRADING_DAYS }} @@ -394,8 +400,14 @@ jobs: add_optional_env FIRSTRADE_DRY_RUN_ONLY add_optional_env FIRSTRADE_REUSE_SESSION add_optional_env FIRSTRADE_SESSION_CACHE_TTL_SECONDS + add_optional_env FIRSTRADE_PERSIST_SESSION_CACHE + add_optional_env FIRSTRADE_GCS_STATE_BUCKET + add_optional_env FIRSTRADE_STATE_PREFIX + add_optional_env FIRSTRADE_PERSIST_ACCOUNT_SNAPSHOT add_optional_env FIRSTRADE_ENABLE_LIVE_TRADING add_optional_env FIRSTRADE_RUN_SMOKE_ON_HTTP + add_optional_env FIRSTRADE_RUN_SESSION_CHECK_ON_HTTP + add_optional_env FIRSTRADE_SESSION_CHECK_INCLUDE_POSITIONS add_optional_env FIRSTRADE_RUN_STRATEGY_ON_HTTP add_optional_env FIRSTRADE_LIVE_ORDER_ACK add_optional_env FIRSTRADE_MAX_ORDER_NOTIONAL_USD diff --git a/README.md b/README.md index d1dad65..335b67c 100644 --- a/README.md +++ b/README.md @@ -73,6 +73,10 @@ commit credentials. | `FIRSTRADE_DRY_RUN_ONLY` | Optional | Defaults to `true` for platform runtime | | `FIRSTRADE_REUSE_SESSION` | Optional | Reuse cached Firstrade session headers inside the same warm runtime instance before logging in again. Defaults to `false` | | `FIRSTRADE_SESSION_CACHE_TTL_SECONDS` | Optional | Max age for local session header reuse when `FIRSTRADE_REUSE_SESSION=true`. Defaults to `21600` | +| `FIRSTRADE_PERSIST_SESSION_CACHE` | Optional | Persist Firstrade session headers to the configured GCS state bucket when `FIRSTRADE_REUSE_SESSION=true`. Defaults to `false` | +| `FIRSTRADE_GCS_STATE_BUCKET` | Optional | GCS bucket for runtime state JSON, including persisted session cache and account funds snapshots | +| `FIRSTRADE_STATE_PREFIX` | Optional | Object prefix within `FIRSTRADE_GCS_STATE_BUCKET`, default `firstrade-platform` | +| `FIRSTRADE_PERSIST_ACCOUNT_SNAPSHOT` | Optional | Persist compact masked account funds snapshots from `/session-check`. Defaults to `false` | | `ACCOUNT_PREFIX` | Optional | Alert/log prefix, default `FIRSTRADE` | | `ACCOUNT_REGION` | Optional | Runtime account scope, default `US` | | `NOTIFY_LANG` | Optional | Notification language, `en` or `zh` | @@ -81,6 +85,8 @@ commit credentials. | `FIRSTRADE_COOKIE_DIR` | Optional | Cookie cache directory, default `.runtime/firstrade-cookies` | | `FIRSTRADE_ENABLE_LIVE_TRADING` | Optional | Must be `true` before any live order can be submitted | | `FIRSTRADE_RUN_SMOKE_ON_HTTP` | Optional | Must be `true` before `/smoke` performs a real login/quote | +| `FIRSTRADE_RUN_SESSION_CHECK_ON_HTTP` | Optional | Must be `true` before `/session-check` performs a read-only login/session/account-state check | +| `FIRSTRADE_SESSION_CHECK_INCLUDE_POSITIONS` | Optional | Include compact symbol/quantity/market-value positions in `/session-check` funds snapshots. Defaults to `false` | | `FIRSTRADE_RUN_STRATEGY_ON_HTTP` | Optional | Must be `true` before `/run` performs strategy evaluation and order routing | | `FIRSTRADE_LIVE_ORDER_ACK` | Optional | Must be `true` before `/run` can submit live orders | | `FIRSTRADE_MAX_ORDER_NOTIONAL_USD` | Optional | Single-order cap for strategy-generated orders, default `25` | @@ -175,11 +181,21 @@ The strategy execution service uses whole-share limit orders for generated strategy orders. If the notional cap is below the current price of a target symbol, that order is skipped instead of being enlarged. -`FIRSTRADE_REUSE_SESSION=true` reduces repeated login attempts while the same -Cloud Run instance stays warm. It stores the current session headers only in the -container-local cookie directory and tries that session before calling Firstrade -login again. A cold start, new revision, expired session, or broker-side -invalidation still falls back to a fresh login. +`FIRSTRADE_REUSE_SESSION=true` reduces repeated login attempts by trying cached +session headers before calling Firstrade login again. By default this cache is +container-local. When `FIRSTRADE_PERSIST_SESSION_CACHE=true` and +`FIRSTRADE_GCS_STATE_BUCKET` is set, the same cache is also written to GCS so a +cold start can try the last known session first. Expired sessions, new broker +sessions from another device, or broker-side invalidation still fall back to a +fresh login. + +`/session-check` is a read-only route for session keepalive experiments and +account-state persistence. It connects to Firstrade, selects the account, reads +balances, optionally reads positions, and returns a compact masked funds +snapshot. With `FIRSTRADE_PERSIST_ACCOUNT_SNAPSHOT=true`, it writes the snapshot +to `accounts//funds/latest.json` plus a timestamped history path +under the configured GCS prefix. Raw account IDs and login secrets are not +included in the snapshot. ## Cloud Run Shape @@ -190,6 +206,8 @@ invalidation still falls back to a fresh login. - `/probe` health metadata only - `/profiles` shared US equity strategy matrix - `/smoke` login + quote only when `FIRSTRADE_RUN_SMOKE_ON_HTTP=true` +- `/session-check` read-only session/account-state check only when + `FIRSTRADE_RUN_SESSION_CHECK_ON_HTTP=true` - `/run` strategy evaluation + guarded order routing only when `FIRSTRADE_RUN_STRATEGY_ON_HTTP=true` diff --git a/application/firstrade_client.py b/application/firstrade_client.py index 16026c7..c802c02 100644 --- a/application/firstrade_client.py +++ b/application/firstrade_client.py @@ -13,6 +13,8 @@ from time import time from typing import Any, Callable +from application.state_persistence import GcsStateStore + class FirstradePlatformError(RuntimeError): """Base error for platform integration failures.""" @@ -42,6 +44,9 @@ class FirstradeCredentials: cookie_dir: str = ".runtime/firstrade-cookies" reuse_session: bool = False session_cache_ttl_seconds: int = 21_600 + persist_session_cache: bool = False + gcs_state_bucket: str = "" + gcs_state_prefix: str = "firstrade-platform" debug: bool = False @classmethod @@ -63,6 +68,13 @@ def from_env(cls, env: Callable[[str, str | None], str | None] = os.getenv) -> " env("FIRSTRADE_SESSION_CACHE_TTL_SECONDS", "21600"), default=21_600, ), + persist_session_cache=(env("FIRSTRADE_PERSIST_SESSION_CACHE", "false") or "") + .strip() + .lower() + == "true", + gcs_state_bucket=(env("FIRSTRADE_GCS_STATE_BUCKET", "") or "").strip(), + gcs_state_prefix=env("FIRSTRADE_STATE_PREFIX", "firstrade-platform") + or "firstrade-platform", debug=(env("FIRSTRADE_DEBUG", "false") or "").lower() == "true", ) @@ -195,6 +207,7 @@ def __init__( order_factory: Callable[[Any], Any] | None = None, quote_factory: Callable[[Any, str, str], Any] | None = None, ohlc_factory: Callable[[Any, str, str], Any] | None = None, + session_cache_store: GcsStateStore | None = None, ) -> None: self.credentials = credentials self.live_trading_enabled = live_trading_enabled @@ -203,6 +216,7 @@ def __init__( self._order_factory = order_factory self._quote_factory = quote_factory self._ohlc_factory = ohlc_factory + self._session_cache_store = session_cache_store self.session: Any | None = None self.account_data: Any | None = None self.session_reused = False @@ -261,19 +275,31 @@ def _load_session_cache(self, cookie_dir: Path) -> dict[str, Any] | None: try: payload = json.loads(path.read_text()) except (OSError, json.JSONDecodeError): + payload = None + if self._is_valid_session_cache_payload(payload): + return payload + store = self._session_state_store() + if store is None: return None - if not isinstance(payload, dict): + try: + persisted_payload = store.read_json(self._session_state_key()) + except Exception: return None + if self._is_valid_session_cache_payload(persisted_payload): + return persisted_payload + return None + + def _is_valid_session_cache_payload(self, payload: Any) -> bool: + if not isinstance(payload, dict): + return False try: saved_at = float(payload.get("saved_at") or 0.0) except (TypeError, ValueError): - return None + return False ttl = max(1, int(self.credentials.session_cache_ttl_seconds or 1)) if saved_at <= 0.0 or (time() - saved_at) > ttl: - return None - if not payload.get("ftat") or not payload.get("sid"): - return None - return payload + return False + return bool(payload.get("ftat") and payload.get("sid")) def _try_cached_session( self, @@ -288,10 +314,18 @@ def _try_cached_session( try: from firstrade import urls - session.session.headers.update(urls.session_headers()) - session.session.headers["access-token"] = urls.access_token() - session.session.headers["ftat"] = str(payload["ftat"]) - session.session.headers["sid"] = str(payload["sid"]) + if hasattr(session, "build_session_from_tokens"): + session.build_session_from_tokens(payload) + else: + session.session.headers.update(urls.session_headers()) + session.session.headers["access-token"] = str( + payload.get("access-token") or urls.access_token() + ) + session.session.headers["ftat"] = str(payload["ftat"]) + session.session.headers["sid"] = str(payload["sid"]) + cookies = payload.get("cookies") + if isinstance(cookies, dict) and hasattr(session.session, "cookies"): + session.session.cookies.update(cookies) account_data = account_data_factory(session) except Exception: try: @@ -302,15 +336,25 @@ def _try_cached_session( self.session = session self.account_data = account_data self.session_reused = True + self._save_session_cache(cookie_dir) return True def _save_session_cache(self, cookie_dir: Path) -> None: if not self.credentials.reuse_session or self.session is None: return - headers = getattr(getattr(self.session, "session", None), "headers", {}) or {} + session_obj = getattr(self.session, "session", None) + headers = getattr(session_obj, "headers", {}) or {} + cookies = {} + if hasattr(session_obj, "cookies"): + try: + cookies = session_obj.cookies.get_dict() + except Exception: + cookies = {} payload = { + "access-token": headers.get("access-token"), "ftat": headers.get("ftat"), "sid": headers.get("sid"), + "cookies": cookies, "saved_at": time(), } if not payload["ftat"] or not payload["sid"]: @@ -318,8 +362,29 @@ def _save_session_cache(self, cookie_dir: Path) -> None: try: self._session_cache_path(cookie_dir).write_text(json.dumps(payload), encoding="utf-8") except OSError: + pass + store = self._session_state_store() + if store is None: + return + try: + store.write_json(self._session_state_key(), payload) + except Exception: return + def _session_state_store(self) -> GcsStateStore | None: + if self._session_cache_store is not None: + return self._session_cache_store + if not self.credentials.persist_session_cache or not self.credentials.gcs_state_bucket: + return None + return GcsStateStore( + bucket=self.credentials.gcs_state_bucket, + prefix=self.credentials.gcs_state_prefix, + ) + + def _session_state_key(self) -> str: + safe_username = "".join(ch for ch in self.credentials.username if ch.isalnum() or ch in ("-", "_")) + return f"sessions/{safe_username or 'unknown'}/latest.json" + def require_connected(self) -> tuple[Any, Any]: if self.session is None or self.account_data is None: raise FirstradePlatformError("Firstrade client is not connected.") diff --git a/application/session_check_service.py b/application/session_check_service.py new file mode 100644 index 0000000..acd5443 --- /dev/null +++ b/application/session_check_service.py @@ -0,0 +1,217 @@ +"""Read-only Firstrade session and account-state checks.""" + +from __future__ import annotations + +import os +from collections.abc import Callable, Mapping +from datetime import datetime, timezone +from typing import Any + +from application.firstrade_client import ( + FirstradeBrokerClient, + FirstradeCredentials, + is_live_trading_enabled, + mask_account_id, +) +from application.state_persistence import GcsStateStore, build_gcs_state_store_from_env + +BALANCE_KEYWORDS = ( + "cash", + "available", + "avail", + "withdraw", + "buying", + "bp", + "equity", + "value", + "margin", +) + + +def _flag(name: str, default: str = "false", env: Callable[[str, str | None], str | None] = os.getenv) -> bool: + return (env(name, default) or "").strip().lower() == "true" + + +def _utcnow() -> datetime: + return datetime.now(timezone.utc) + + +def _safe_key(value: str) -> str: + return "".join(ch if ch.isalnum() else "_" for ch in str(value or "")) or "unknown" + + +def _float_or_none(value: Any) -> float | None: + if value in (None, ""): + return None + try: + return float(str(value).replace(",", "")) + except (TypeError, ValueError): + return None + + +def _flatten_values(payload: Any, prefix: str = "") -> dict[str, Any]: + values: dict[str, Any] = {} + if isinstance(payload, Mapping): + for key, value in payload.items(): + child_key = f"{prefix}.{key}" if prefix else str(key) + values.update(_flatten_values(value, child_key)) + elif isinstance(payload, list): + for index, value in enumerate(payload): + values.update(_flatten_values(value, f"{prefix}.{index}")) + else: + values[prefix] = payload + return values + + +def _selected_balance_metrics(payload: Any) -> dict[str, float]: + metrics: dict[str, float] = {} + for key, value in _flatten_values(payload).items(): + lowered = key.lower() + if not any(keyword in lowered for keyword in BALANCE_KEYWORDS): + continue + number = _float_or_none(value) + if number is not None: + metrics[key] = number + return metrics + + +def _iter_position_rows(payload: Any) -> list[Mapping[str, Any]]: + if isinstance(payload, Mapping): + for key in ("items", "positions", "data", "result"): + value = payload.get(key) + if isinstance(value, list): + return [row for row in value if isinstance(row, Mapping)] + if "symbol" in payload: + return [payload] + if isinstance(payload, list): + return [row for row in payload if isinstance(row, Mapping)] + return [] + + +def _get_first(row: Mapping[str, Any], *keys: str) -> Any: + for key in keys: + if key in row: + return row[key] + lower_map = {str(key).lower(): value for key, value in row.items()} + for key in keys: + lowered = key.lower() + if lowered in lower_map: + return lower_map[lowered] + return None + + +def _compact_positions(payload: Any) -> list[dict[str, Any]]: + positions: list[dict[str, Any]] = [] + for row in _iter_position_rows(payload): + symbol = _get_first(row, "symbol", "ticker", "security_symbol") + if not symbol: + continue + positions.append( + { + "symbol": str(symbol).strip().upper(), + "quantity": _float_or_none(_get_first(row, "quantity", "shares", "qty")), + "market_value": _float_or_none( + _get_first(row, "market_value", "marketValue", "value", "current_value") + ), + } + ) + return positions + + +def build_account_funds_snapshot( + *, + account: str, + account_summaries: list[dict[str, Any]], + balances: Mapping[str, Any], + positions_payload: Any | None, + session_reused: bool, + now: datetime | None = None, +) -> dict[str, Any]: + as_of = now or _utcnow() + snapshot: dict[str, Any] = { + "as_of": as_of.isoformat(), + "account": mask_account_id(account), + "session_reused": session_reused, + "account_summaries": account_summaries, + "balance_metrics": _selected_balance_metrics(balances), + } + if positions_payload is not None: + snapshot["positions"] = _compact_positions(positions_payload) + return snapshot + + +def persist_funds_snapshot( + *, + store: GcsStateStore, + snapshot: dict[str, Any], + now: datetime | None = None, +) -> bool: + as_of = now or _utcnow() + account_key = _safe_key(str(snapshot.get("account") or "unknown")) + stamp = as_of.strftime("%Y%m%dT%H%M%SZ") + store.write_json(f"accounts/{account_key}/funds/latest.json", snapshot) + store.write_json(f"accounts/{account_key}/funds/history/{as_of:%Y/%m/%d}/{stamp}.json", snapshot) + return True + + +def run_session_check( + *, + credentials: FirstradeCredentials | None = None, + client_factory: Callable[..., FirstradeBrokerClient] = FirstradeBrokerClient, + state_store: GcsStateStore | None = None, + env_reader: Callable[[str, str | None], str | None] = os.getenv, + now: datetime | None = None, +) -> dict[str, Any]: + resolved_credentials = credentials or FirstradeCredentials.from_env(env_reader) + store = state_store or build_gcs_state_store_from_env(env_reader) + client = client_factory( + resolved_credentials, + live_trading_enabled=is_live_trading_enabled(env_reader), + ).connect() + session_reused = bool(getattr(client, "session_reused", False)) + account = client.select_account(env_reader("FIRSTRADE_ACCOUNT", "") or None) + account_summaries = client.list_account_summaries() + balances = client.get_balances(account) + positions_payload = ( + client.get_positions(account) + if _flag("FIRSTRADE_SESSION_CHECK_INCLUDE_POSITIONS", "false", env_reader) + else None + ) + snapshot = build_account_funds_snapshot( + account=account, + account_summaries=account_summaries, + balances=balances, + positions_payload=positions_payload, + session_reused=session_reused, + now=now, + ) + snapshot_persisted = False + snapshot_error = None + if store is not None and _flag("FIRSTRADE_PERSIST_ACCOUNT_SNAPSHOT", "false", env_reader): + try: + snapshot_persisted = persist_funds_snapshot( + store=store, + snapshot=snapshot, + now=now, + ) + except Exception as exc: + snapshot_error = f"{type(exc).__name__}: {exc}" + + print( + "Firstrade session-check " + f"session_reused={session_reused} " + f"account={mask_account_id(account)} " + f"snapshot_persisted={snapshot_persisted}", + flush=True, + ) + result: dict[str, Any] = { + "ok": True, + "api_kind": "unofficial-reverse-engineered", + "account": mask_account_id(account), + "session_reused": session_reused, + "funds_snapshot": snapshot, + "snapshot_persisted": snapshot_persisted, + } + if snapshot_error: + result["snapshot_error"] = snapshot_error + return result diff --git a/application/state_persistence.py b/application/state_persistence.py new file mode 100644 index 0000000..36fa480 --- /dev/null +++ b/application/state_persistence.py @@ -0,0 +1,119 @@ +"""Lightweight runtime state persistence. + +The service intentionally avoids heavyweight Google client libraries. Cloud Run +already has a metadata server token, and the JSON APIs are enough for small +session and account-state payloads. +""" + +from __future__ import annotations + +import json +import os +from dataclasses import dataclass +from typing import Any, Callable +from urllib.parse import quote, urlencode + +import requests + +METADATA_TOKEN_URL = ( + "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token" +) + + +class StatePersistenceError(RuntimeError): + """Raised when the configured state store cannot read or write.""" + + +def _normalize_prefix(prefix: str | None) -> str: + return str(prefix or "").strip("/") + + +@dataclass(frozen=True) +class GcsStateStore: + bucket: str + prefix: str = "firstrade-platform" + timeout_seconds: float = 10.0 + token_getter: Callable[[], str] | None = None + + def __post_init__(self) -> None: + if not str(self.bucket or "").strip(): + raise ValueError("GCS state bucket must be non-empty.") + + def read_json(self, key: str) -> dict[str, Any] | None: + object_name = self._object_name(key) + url = ( + f"https://storage.googleapis.com/storage/v1/b/{quote(self.bucket, safe='')}/o/" + f"{quote(object_name, safe='')}?alt=media" + ) + response = requests.get( + url, + headers=self._auth_headers(), + timeout=self.timeout_seconds, + ) + if response.status_code == 404: + return None + if response.status_code != 200: + raise StatePersistenceError( + f"GCS read failed for {object_name}: HTTP {response.status_code}" + ) + payload = response.json() + if not isinstance(payload, dict): + raise StatePersistenceError(f"GCS payload for {object_name} is not a JSON object.") + return payload + + def write_json(self, key: str, payload: dict[str, Any]) -> bool: + object_name = self._object_name(key) + query = urlencode({"uploadType": "media", "name": object_name}) + url = f"https://storage.googleapis.com/upload/storage/v1/b/{quote(self.bucket, safe='')}/o?{query}" + response = requests.post( + url, + headers={ + **self._auth_headers(), + "Content-Type": "application/json; charset=utf-8", + }, + data=json.dumps(payload, ensure_ascii=False, separators=(",", ":")).encode("utf-8"), + timeout=self.timeout_seconds, + ) + if response.status_code not in (200, 201): + raise StatePersistenceError( + f"GCS write failed for {object_name}: HTTP {response.status_code}" + ) + return True + + def _object_name(self, key: str) -> str: + clean_key = str(key or "").strip("/") + if not clean_key: + raise ValueError("State key must be non-empty.") + prefix = _normalize_prefix(self.prefix) + return f"{prefix}/{clean_key}" if prefix else clean_key + + def _auth_headers(self) -> dict[str, str]: + token = self.token_getter() if self.token_getter is not None else _metadata_access_token() + return {"Authorization": f"Bearer {token}"} + + +def _metadata_access_token() -> str: + response = requests.get( + METADATA_TOKEN_URL, + headers={"Metadata-Flavor": "Google"}, + timeout=3.0, + ) + if response.status_code != 200: + raise StatePersistenceError(f"Metadata token request failed: HTTP {response.status_code}") + payload = response.json() + token = payload.get("access_token") + if not token: + raise StatePersistenceError("Metadata token response did not include access_token.") + return str(token) + + +def build_gcs_state_store_from_env( + env: Callable[[str, str | None], str | None] = os.getenv, +) -> GcsStateStore | None: + bucket = (env("FIRSTRADE_GCS_STATE_BUCKET", "") or "").strip() + if not bucket: + return None + return GcsStateStore( + bucket=bucket, + prefix=env("FIRSTRADE_STATE_PREFIX", "firstrade-platform") or "firstrade-platform", + ) diff --git a/main.py b/main.py index b9e7c77..9a5686c 100644 --- a/main.py +++ b/main.py @@ -15,6 +15,7 @@ mask_account_id, ) from application.rebalance_service import run_strategy_cycle +from application.session_check_service import run_session_check from strategy_registry import get_platform_profile_status_matrix app = Flask(__name__) @@ -33,6 +34,7 @@ def health(): "strategy_domain": "us_equity", "live_trading_enabled": is_live_trading_enabled(), "smoke_on_http": _flag("FIRSTRADE_RUN_SMOKE_ON_HTTP"), + "session_check_on_http": _flag("FIRSTRADE_RUN_SESSION_CHECK_ON_HTTP"), "strategy_run_on_http": _flag("FIRSTRADE_RUN_STRATEGY_ON_HTTP"), "as_of": datetime.now(timezone.utc).isoformat(), } @@ -82,6 +84,30 @@ def smoke(): return jsonify({"ok": False, "error": str(exc)}), 500 +@app.post("/session-check") +@app.get("/session-check") +def session_check(): + if not _flag("FIRSTRADE_RUN_SESSION_CHECK_ON_HTTP"): + return ( + jsonify( + { + "ok": False, + "error": ( + "Set FIRSTRADE_RUN_SESSION_CHECK_ON_HTTP=true to allow HTTP-triggered " + "read-only Firstrade session and account-state checks." + ), + } + ), + 403, + ) + try: + return jsonify(run_session_check()) + except FirstradePlatformError as exc: + return jsonify({"ok": False, "error": str(exc)}), 500 + except Exception as exc: + return jsonify({"ok": False, "error": f"{type(exc).__name__}: {exc}"}), 500 + + @app.post("/") @app.post("/run") @app.get("/run") diff --git a/pyproject.toml b/pyproject.toml index 7910b4e..9bacc2f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,6 +16,7 @@ dependencies = [ "firstrade==0.0.38", "quant-platform-kit @ git+https://github.com/QuantStrategyLab/QuantPlatformKit.git@663e80be60b0da80e81513b711c579d221a2111d", "us-equity-strategies @ git+https://github.com/QuantStrategyLab/UsEquityStrategies.git@4d5cd0f5dc389edebc648028202fd116934ca325", + "requests", ] [tool.pytest.ini_options] diff --git a/tests/test_firstrade_client.py b/tests/test_firstrade_client.py index 4d3b5ab..399a356 100644 --- a/tests/test_firstrade_client.py +++ b/tests/test_firstrade_client.py @@ -177,3 +177,61 @@ def test_client_reuses_cached_session_without_logging_in_again(tmp_path): assert second_client.session_reused is True assert ReusableFakeSession.login_calls == 1 + + +class FakeStateStore: + def __init__(self): + self.payloads = {} + self.writes = 0 + + def read_json(self, key): + return self.payloads.get(key) + + def write_json(self, key, payload): + self.payloads[key] = dict(payload) + self.writes += 1 + return True + + +def test_client_reuses_persisted_session_cache_when_local_cache_is_missing(tmp_path): + ReusableFakeSession.login_calls = 0 + store = FakeStateStore() + credentials = FirstradeCredentials( + username="user", + password="pass", + cookie_dir=str(tmp_path / "first"), + reuse_session=True, + session_cache_ttl_seconds=3600, + persist_session_cache=True, + ) + + first_client = FirstradeBrokerClient( + credentials, + session_factory=ReusableFakeSession, + account_data_factory=HeaderCheckingAccountData, + order_factory=FakeOrder, + session_cache_store=store, + ).connect() + assert first_client.session_reused is False + assert ReusableFakeSession.login_calls == 1 + assert store.writes == 1 + + second_credentials = FirstradeCredentials( + username="user", + password="pass", + cookie_dir=str(tmp_path / "second"), + reuse_session=True, + session_cache_ttl_seconds=3600, + persist_session_cache=True, + ) + second_client = FirstradeBrokerClient( + second_credentials, + session_factory=ReusableFakeSession, + account_data_factory=HeaderCheckingAccountData, + order_factory=FakeOrder, + session_cache_store=store, + ).connect() + + assert second_client.session_reused is True + assert ReusableFakeSession.login_calls == 1 + assert store.writes == 2 diff --git a/tests/test_request_handling.py b/tests/test_request_handling.py index 86734d0..4a9dc6b 100644 --- a/tests/test_request_handling.py +++ b/tests/test_request_handling.py @@ -28,6 +28,35 @@ def test_run_endpoint_calls_strategy_cycle_when_gate_enabled(monkeypatch): assert response.get_json() == {"ok": True, "action_done": False} +def test_session_check_endpoint_is_disabled_without_explicit_http_gate(monkeypatch): + monkeypatch.delenv("FIRSTRADE_RUN_SESSION_CHECK_ON_HTTP", raising=False) + client = main.app.test_client() + + response = client.get("/session-check") + + assert response.status_code == 403 + assert response.get_json()["ok"] is False + + +def test_session_check_endpoint_calls_service_when_gate_enabled(monkeypatch): + monkeypatch.setenv("FIRSTRADE_RUN_SESSION_CHECK_ON_HTTP", "true") + monkeypatch.setattr( + main, + "run_session_check", + lambda: {"ok": True, "session_reused": True, "snapshot_persisted": True}, + ) + client = main.app.test_client() + + response = client.post("/session-check") + + assert response.status_code == 200 + assert response.get_json() == { + "ok": True, + "session_reused": True, + "snapshot_persisted": True, + } + + def test_root_post_calls_strategy_cycle_when_gate_enabled(monkeypatch): monkeypatch.setenv("FIRSTRADE_RUN_STRATEGY_ON_HTTP", "true") monkeypatch.setattr(main, "run_strategy_cycle", lambda: {"ok": True, "action_done": False}) diff --git a/tests/test_session_check_service.py b/tests/test_session_check_service.py new file mode 100644 index 0000000..7fcc5fe --- /dev/null +++ b/tests/test_session_check_service.py @@ -0,0 +1,93 @@ +from __future__ import annotations + +from datetime import datetime, timezone + +from application.firstrade_client import FirstradeCredentials +from application.session_check_service import build_account_funds_snapshot, run_session_check + + +class FakeClient: + def __init__(self, _credentials, *, live_trading_enabled=False): + self.live_trading_enabled = live_trading_enabled + self.session_reused = True + + def connect(self): + return self + + def select_account(self, requested_account=None): + return requested_account or "12345678" + + def list_account_summaries(self): + return [{"account": "****5678", "total_value": "100.00"}] + + def get_balances(self, _account): + return { + "result": { + "total_account_value": "100.00", + "cash_balance": "40.00", + "margin_buying_power": "80.00", + "unrelated": "ignored", + } + } + + def get_positions(self, _account): + return { + "items": [ + {"symbol": "SPY", "quantity": "2", "market_value": "900.50"}, + {"ticker": "QQQ", "qty": "1", "value": "450.25"}, + ] + } + + +class FakeStateStore: + def __init__(self): + self.writes = [] + + def write_json(self, key, payload): + self.writes.append((key, payload)) + return True + + +def test_build_account_funds_snapshot_masks_account_and_compacts_values(): + snapshot = build_account_funds_snapshot( + account="12345678", + account_summaries=[{"account": "****5678", "total_value": "100.00"}], + balances={"total_account_value": "100.00", "cash_balance": "40.00", "note": "x"}, + positions_payload={"items": [{"symbol": "SPY", "quantity": "2", "market_value": "900.50"}]}, + session_reused=True, + now=datetime(2026, 5, 23, 1, 2, 3, tzinfo=timezone.utc), + ) + + assert snapshot["account"] == "****5678" + assert snapshot["session_reused"] is True + assert snapshot["balance_metrics"] == { + "total_account_value": 100.0, + "cash_balance": 40.0, + } + assert snapshot["positions"] == [ + {"symbol": "SPY", "quantity": 2.0, "market_value": 900.5} + ] + + +def test_run_session_check_persists_funds_snapshot_when_enabled(): + store = FakeStateStore() + now = datetime(2026, 5, 23, 1, 2, 3, tzinfo=timezone.utc) + + result = run_session_check( + credentials=FirstradeCredentials(username="user", password="pass"), + client_factory=FakeClient, + state_store=store, + env_reader=lambda name, default=None: { + "FIRSTRADE_PERSIST_ACCOUNT_SNAPSHOT": "true", + "FIRSTRADE_SESSION_CHECK_INCLUDE_POSITIONS": "true", + }.get(name, default), + now=now, + ) + + assert result["ok"] is True + assert result["session_reused"] is True + assert result["snapshot_persisted"] is True + assert len(store.writes) == 2 + assert store.writes[0][0] == "accounts/____5678/funds/latest.json" + assert store.writes[1][0] == "accounts/____5678/funds/history/2026/05/23/20260523T010203Z.json" + assert store.writes[0][1]["positions"][0]["symbol"] == "SPY"