diff --git a/README.md b/README.md index 335b67c..8006b2e 100644 --- a/README.md +++ b/README.md @@ -71,7 +71,7 @@ commit credentials. | `FIRSTRADE_ACCOUNT` | Optional | Required when multiple accounts are returned | | `STRATEGY_PROFILE` | Yes for runtime | Shared US equity strategy profile | | `FIRSTRADE_DRY_RUN_ONLY` | Optional | Defaults to `true` for platform runtime | -| `FIRSTRADE_REUSE_SESSION` | Optional | Reuse cached Firstrade session headers inside the same warm runtime instance before logging in again. Defaults to `false` | +| `FIRSTRADE_REUSE_SESSION` | Optional | Try cached Firstrade session headers before logging in again. Defaults to `false` | | `FIRSTRADE_SESSION_CACHE_TTL_SECONDS` | Optional | Max age for local session header reuse when `FIRSTRADE_REUSE_SESSION=true`. Defaults to `21600` | | `FIRSTRADE_PERSIST_SESSION_CACHE` | Optional | Persist Firstrade session headers to the configured GCS state bucket when `FIRSTRADE_REUSE_SESSION=true`. Defaults to `false` | | `FIRSTRADE_GCS_STATE_BUCKET` | Optional | GCS bucket for runtime state JSON, including persisted session cache and account funds snapshots | @@ -214,6 +214,31 @@ included in the snapshot. With the default environment, `/run` previews orders only. It can submit live orders only when every live-trading gate above is enabled. +## Runtime State And Schedulers + +The deployed Firstrade runtime keeps trading disabled unless the explicit live +order gates are changed: + +- `FIRSTRADE_DRY_RUN_ONLY=true` +- `FIRSTRADE_RUN_STRATEGY_ON_HTTP=false` +- `FIRSTRADE_ENABLE_LIVE_TRADING=false` +- `FIRSTRADE_LIVE_ORDER_ACK=false` + +For session keepalive tests, create a private GCS bucket, grant the Cloud Run +runtime service account object read/write access, and set: + +- `FIRSTRADE_REUSE_SESSION=true` +- `FIRSTRADE_PERSIST_SESSION_CACHE=true` +- `FIRSTRADE_PERSIST_ACCOUNT_SNAPSHOT=true` +- `FIRSTRADE_GCS_STATE_BUCKET=` +- `FIRSTRADE_STATE_PREFIX=firstrade-platform` +- `FIRSTRADE_RUN_SESSION_CHECK_ON_HTTP=true` + +The `/session-check` scheduler can safely run more often than the strategy +scheduler because it is read-only. A typical test schedule is every 30 minutes +during US regular market hours. The route logs `session_reused=true|false` and +writes the latest masked funds snapshot plus timestamped history to GCS. + ## License And Upstream Compliance This repository is MIT licensed. The upstream `firstrade` package is also MIT diff --git a/application/account_payload_utils.py b/application/account_payload_utils.py new file mode 100644 index 0000000..dcafd01 --- /dev/null +++ b/application/account_payload_utils.py @@ -0,0 +1,76 @@ +"""Helpers for normalizing Firstrade account payload shapes.""" + +from __future__ import annotations + +from collections.abc import Mapping +from typing import Any + + +def float_or_none(value: Any) -> float | None: + if value in (None, ""): + return None + try: + return float(str(value).replace(",", "")) + except (TypeError, ValueError): + return None + + +def flatten_values(payload: Any, prefix: str = "") -> dict[str, Any]: + values: dict[str, Any] = {} + if isinstance(payload, Mapping): + for key, value in payload.items(): + child_key = f"{prefix}.{key}" if prefix else str(key) + values.update(flatten_values(value, child_key)) + elif isinstance(payload, list): + for index, value in enumerate(payload): + values.update(flatten_values(value, f"{prefix}.{index}")) + else: + values[prefix] = payload + return values + + +def first_numeric_by_keywords(payload: Any, keywords: tuple[str, ...]) -> float | None: + for key, value in flatten_values(payload).items(): + key_lower = key.lower() + if all(keyword in key_lower for keyword in keywords): + number = float_or_none(value) + if number is not None: + return number + return None + + +def selected_numeric_metrics(payload: Any, keywords: tuple[str, ...]) -> dict[str, float]: + metrics: dict[str, float] = {} + for key, value in flatten_values(payload).items(): + key_lower = key.lower() + if not any(keyword in key_lower for keyword in keywords): + continue + number = float_or_none(value) + if number is not None: + metrics[key] = number + return metrics + + +def iter_position_rows(payload: Any) -> list[Mapping[str, Any]]: + if isinstance(payload, Mapping): + for key in ("items", "positions", "data", "result"): + value = payload.get(key) + if isinstance(value, list): + return [row for row in value if isinstance(row, Mapping)] + if "symbol" in payload: + return [payload] + if isinstance(payload, list): + return [row for row in payload if isinstance(row, Mapping)] + return [] + + +def get_first(row: Mapping[str, Any], *keys: str) -> Any: + for key in keys: + if key in row: + return row[key] + lower_map = {str(key).lower(): value for key, value in row.items()} + for key in keys: + lowered = key.lower() + if lowered in lower_map: + return lower_map[lowered] + return None diff --git a/application/runtime_broker_adapters.py b/application/runtime_broker_adapters.py index c559d5d..931383f 100644 --- a/application/runtime_broker_adapters.py +++ b/application/runtime_broker_adapters.py @@ -2,13 +2,18 @@ from __future__ import annotations -from collections.abc import Callable, Mapping +from collections.abc import Callable from dataclasses import dataclass from datetime import datetime, timezone -from typing import Any import pandas as pd +from application.account_payload_utils import ( + first_numeric_by_keywords, + float_or_none, + get_first, + iter_position_rows, +) from application.firstrade_client import ( FirstradeBrokerClient, StockOrderRequest, @@ -34,65 +39,6 @@ def _utcnow() -> datetime: return datetime.now(timezone.utc) -def _float_or_none(value: Any) -> float | None: - if value in (None, ""): - return None - try: - return float(str(value).replace(",", "")) - except (TypeError, ValueError): - return None - - -def _flatten_values(payload: Any, prefix: str = "") -> dict[str, Any]: - values: dict[str, Any] = {} - if isinstance(payload, Mapping): - for key, value in payload.items(): - child_key = f"{prefix}.{key}" if prefix else str(key) - values.update(_flatten_values(value, child_key)) - elif isinstance(payload, list): - for index, value in enumerate(payload): - values.update(_flatten_values(value, f"{prefix}.{index}")) - else: - values[prefix] = payload - return values - - -def _first_numeric_by_keywords(payload: Any, keywords: tuple[str, ...]) -> float | None: - flat = _flatten_values(payload) - for key, value in flat.items(): - key_lower = key.lower() - if all(keyword in key_lower for keyword in keywords): - number = _float_or_none(value) - if number is not None: - return number - return None - - -def _iter_position_rows(payload: Any) -> list[Mapping[str, Any]]: - if isinstance(payload, Mapping): - for key in ("items", "positions", "data", "result"): - value = payload.get(key) - if isinstance(value, list): - return [row for row in value if isinstance(row, Mapping)] - if "symbol" in payload: - return [payload] - if isinstance(payload, list): - return [row for row in payload if isinstance(row, Mapping)] - return [] - - -def _get_first(row: Mapping[str, Any], *keys: str) -> Any: - for key in keys: - if key in row: - return row[key] - lower_map = {str(key).lower(): value for key, value in row.items()} - for key in keys: - lowered = key.lower() - if lowered in lower_map: - return lower_map[lowered] - return None - - @dataclass(frozen=True) class FirstradeBrokerAdapters: client: FirstradeBrokerClient @@ -120,15 +66,15 @@ def load_quote(symbol: str) -> QuoteSnapshot: if cached is not None: return cached payload = self.client.get_quote(self.account, normalized) - price = _float_or_none(payload.get("last")) + price = float_or_none(payload.get("last")) if price is None: raise ValueError(f"Firstrade quote did not include a numeric last price for {normalized}.") snapshot = QuoteSnapshot( symbol=normalized, as_of=self.clock(), last_price=price, - bid_price=_float_or_none(payload.get("bid")), - ask_price=_float_or_none(payload.get("ask")), + bid_price=float_or_none(payload.get("bid")), + ask_price=float_or_none(payload.get("ask")), currency="USD", ) quote_cache[normalized] = snapshot @@ -150,7 +96,7 @@ def load_price_series(symbol: str) -> PriceSeries: PricePoint( as_of=datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc), close=close, - volume=_float_or_none(candle[5] if len(candle) > 5 else None), + volume=float_or_none(candle[5] if len(candle) > 5 else None), ) ) if not points: @@ -189,45 +135,45 @@ def build_price_history(self, market_data_port: MarketDataPort, symbol: str): def build_portfolio_snapshot(self) -> PortfolioSnapshot: balances = self.client.get_balances(self.account) positions_payload = self.client.get_positions(self.account) - rows = _iter_position_rows(positions_payload) + rows = iter_position_rows(positions_payload) positions = [] managed = set(self.strategy_symbols) for row in rows: - raw_symbol = _get_first(row, "symbol", "ticker", "security_symbol") + raw_symbol = get_first(row, "symbol", "ticker", "security_symbol") if not raw_symbol: continue symbol = self.normalize_symbol(raw_symbol) if managed and symbol not in managed: continue - quantity = _float_or_none(_get_first(row, "quantity", "shares", "qty")) + quantity = float_or_none(get_first(row, "quantity", "shares", "qty")) if quantity is None: continue positions.append( Position( symbol=symbol, quantity=quantity, - market_value=_float_or_none( - _get_first(row, "market_value", "marketValue", "value", "current_value") + market_value=float_or_none( + get_first(row, "market_value", "marketValue", "value", "current_value") ) or 0.0, - average_cost=_float_or_none( - _get_first(row, "average_cost", "avg_cost", "cost_basis", "averagePrice") + average_cost=float_or_none( + get_first(row, "average_cost", "avg_cost", "cost_basis", "averagePrice") ), currency="USD", account_id=mask_account_id(self.account), ) ) total_equity = ( - _first_numeric_by_keywords(balances, ("total", "value")) - or _first_numeric_by_keywords(balances, ("equity",)) + first_numeric_by_keywords(balances, ("total", "value")) + or first_numeric_by_keywords(balances, ("equity",)) or sum(position.market_value for position in positions) ) return PortfolioSnapshot( as_of=self.clock(), total_equity=float(total_equity or 0.0), - buying_power=_first_numeric_by_keywords(balances, ("buying",)) - or _first_numeric_by_keywords(balances, ("bp",)), - cash_balance=_first_numeric_by_keywords(balances, ("cash",)), + buying_power=first_numeric_by_keywords(balances, ("buying",)) + or first_numeric_by_keywords(balances, ("bp",)), + cash_balance=first_numeric_by_keywords(balances, ("cash",)), positions=tuple(positions), metadata={ "broker": "firstrade", diff --git a/application/session_check_service.py b/application/session_check_service.py index acd5443..21bb167 100644 --- a/application/session_check_service.py +++ b/application/session_check_service.py @@ -7,6 +7,12 @@ from datetime import datetime, timezone from typing import Any +from application.account_payload_utils import ( + float_or_none, + get_first, + iter_position_rows, + selected_numeric_metrics, +) from application.firstrade_client import ( FirstradeBrokerClient, FirstradeCredentials, @@ -40,78 +46,18 @@ def _safe_key(value: str) -> str: return "".join(ch if ch.isalnum() else "_" for ch in str(value or "")) or "unknown" -def _float_or_none(value: Any) -> float | None: - if value in (None, ""): - return None - try: - return float(str(value).replace(",", "")) - except (TypeError, ValueError): - return None - - -def _flatten_values(payload: Any, prefix: str = "") -> dict[str, Any]: - values: dict[str, Any] = {} - if isinstance(payload, Mapping): - for key, value in payload.items(): - child_key = f"{prefix}.{key}" if prefix else str(key) - values.update(_flatten_values(value, child_key)) - elif isinstance(payload, list): - for index, value in enumerate(payload): - values.update(_flatten_values(value, f"{prefix}.{index}")) - else: - values[prefix] = payload - return values - - -def _selected_balance_metrics(payload: Any) -> dict[str, float]: - metrics: dict[str, float] = {} - for key, value in _flatten_values(payload).items(): - lowered = key.lower() - if not any(keyword in lowered for keyword in BALANCE_KEYWORDS): - continue - number = _float_or_none(value) - if number is not None: - metrics[key] = number - return metrics - - -def _iter_position_rows(payload: Any) -> list[Mapping[str, Any]]: - if isinstance(payload, Mapping): - for key in ("items", "positions", "data", "result"): - value = payload.get(key) - if isinstance(value, list): - return [row for row in value if isinstance(row, Mapping)] - if "symbol" in payload: - return [payload] - if isinstance(payload, list): - return [row for row in payload if isinstance(row, Mapping)] - return [] - - -def _get_first(row: Mapping[str, Any], *keys: str) -> Any: - for key in keys: - if key in row: - return row[key] - lower_map = {str(key).lower(): value for key, value in row.items()} - for key in keys: - lowered = key.lower() - if lowered in lower_map: - return lower_map[lowered] - return None - - def _compact_positions(payload: Any) -> list[dict[str, Any]]: positions: list[dict[str, Any]] = [] - for row in _iter_position_rows(payload): - symbol = _get_first(row, "symbol", "ticker", "security_symbol") + for row in iter_position_rows(payload): + symbol = get_first(row, "symbol", "ticker", "security_symbol") if not symbol: continue positions.append( { "symbol": str(symbol).strip().upper(), - "quantity": _float_or_none(_get_first(row, "quantity", "shares", "qty")), - "market_value": _float_or_none( - _get_first(row, "market_value", "marketValue", "value", "current_value") + "quantity": float_or_none(get_first(row, "quantity", "shares", "qty")), + "market_value": float_or_none( + get_first(row, "market_value", "marketValue", "value", "current_value") ), } ) @@ -133,7 +79,7 @@ def build_account_funds_snapshot( "account": mask_account_id(account), "session_reused": session_reused, "account_summaries": account_summaries, - "balance_metrics": _selected_balance_metrics(balances), + "balance_metrics": selected_numeric_metrics(balances, BALANCE_KEYWORDS), } if positions_payload is not None: snapshot["positions"] = _compact_positions(positions_payload)