Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ FIRSTRADE_PERSIST_SESSION_CACHE=false
FIRSTRADE_GCS_STATE_BUCKET=
FIRSTRADE_STATE_PREFIX=firstrade-platform
FIRSTRADE_PERSIST_ACCOUNT_SNAPSHOT=false
FIRSTRADE_PERSIST_STRATEGY_RUNS=false
FIRSTRADE_ENABLE_LIVE_TRADING=false
FIRSTRADE_RUN_SMOKE_ON_HTTP=false
FIRSTRADE_RUN_SESSION_CHECK_ON_HTTP=false
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/sync-cloud-run-env.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ jobs:
FIRSTRADE_FEATURE_SNAPSHOT_MANIFEST_PATH: ${{ vars.FIRSTRADE_FEATURE_SNAPSHOT_MANIFEST_PATH }}
FIRSTRADE_GCS_STATE_BUCKET: ${{ vars.FIRSTRADE_GCS_STATE_BUCKET }}
FIRSTRADE_PERSIST_ACCOUNT_SNAPSHOT: ${{ vars.FIRSTRADE_PERSIST_ACCOUNT_SNAPSHOT }}
FIRSTRADE_PERSIST_STRATEGY_RUNS: ${{ vars.FIRSTRADE_PERSIST_STRATEGY_RUNS }}
FIRSTRADE_PERSIST_SESSION_CACHE: ${{ vars.FIRSTRADE_PERSIST_SESSION_CACHE }}
FIRSTRADE_RUN_SESSION_CHECK_ON_HTTP: ${{ vars.FIRSTRADE_RUN_SESSION_CHECK_ON_HTTP }}
FIRSTRADE_SESSION_CHECK_INCLUDE_POSITIONS: ${{ vars.FIRSTRADE_SESSION_CHECK_INCLUDE_POSITIONS }}
Expand Down Expand Up @@ -404,6 +405,7 @@ jobs:
add_optional_env FIRSTRADE_GCS_STATE_BUCKET
add_optional_env FIRSTRADE_STATE_PREFIX
add_optional_env FIRSTRADE_PERSIST_ACCOUNT_SNAPSHOT
add_optional_env FIRSTRADE_PERSIST_STRATEGY_RUNS
add_optional_env FIRSTRADE_ENABLE_LIVE_TRADING
add_optional_env FIRSTRADE_RUN_SMOKE_ON_HTTP
add_optional_env FIRSTRADE_RUN_SESSION_CHECK_ON_HTTP
Expand Down
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ commit credentials.
| `FIRSTRADE_GCS_STATE_BUCKET` | Optional | GCS bucket for runtime state JSON, including persisted session cache and account funds snapshots |
| `FIRSTRADE_STATE_PREFIX` | Optional | Object prefix within `FIRSTRADE_GCS_STATE_BUCKET`, default `firstrade-platform` |
| `FIRSTRADE_PERSIST_ACCOUNT_SNAPSHOT` | Optional | Persist compact masked account funds snapshots from `/session-check`. Defaults to `false` |
| `FIRSTRADE_PERSIST_STRATEGY_RUNS` | Optional | Persist `/run` strategy state, plans, and submitted/skipped order results to GCS. Defaults to `false` |
| `ACCOUNT_PREFIX` | Optional | Alert/log prefix, default `FIRSTRADE` |
| `ACCOUNT_REGION` | Optional | Runtime account scope, default `US` |
| `NOTIFY_LANG` | Optional | Notification language, `en` or `zh` |
Expand Down Expand Up @@ -197,6 +198,15 @@ to `accounts/<masked-account>/funds/latest.json` plus a timestamped history path
under the configured GCS prefix. Raw account IDs and login secrets are not
included in the snapshot.

When `FIRSTRADE_PERSIST_STRATEGY_RUNS=true` and a GCS state bucket is configured,
`/run` writes strategy state to
`strategy-runs/<masked-account>/<strategy-profile>/<yyyy-mm>/latest.json` plus a
timestamped history path. The record includes the planned targets, compact
portfolio snapshot, evaluation metadata, submitted orders, skipped orders, and
stage (`ORDERS_PLANNED`, `DRY_RUN_COMPLETED`, or `SUBMITTED`). For live runs,
an existing terminal record in the same account/profile/month blocks duplicate
order submission.

## Cloud Run Shape

`main.py` exposes:
Expand Down Expand Up @@ -230,6 +240,7 @@ runtime service account object read/write access, and set:
- `FIRSTRADE_REUSE_SESSION=true`
- `FIRSTRADE_PERSIST_SESSION_CACHE=true`
- `FIRSTRADE_PERSIST_ACCOUNT_SNAPSHOT=true`
- `FIRSTRADE_PERSIST_STRATEGY_RUNS=true`
- `FIRSTRADE_GCS_STATE_BUCKET=<bucket-name>`
- `FIRSTRADE_STATE_PREFIX=firstrade-platform`
- `FIRSTRADE_RUN_SESSION_CHECK_ON_HTTP=true`
Expand Down
112 changes: 112 additions & 0 deletions application/rebalance_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import os
from collections.abc import Callable, Mapping
from datetime import datetime, timezone
from typing import Any

import pandas as pd
Expand All @@ -24,6 +25,14 @@
mask_account_id,
)
from application.runtime_broker_adapters import build_runtime_broker_adapters
from application.state_persistence import GcsStateStore, build_gcs_state_store_from_env
from application.strategy_run_persistence import (
build_strategy_run_state,
is_duplicate_live_run,
persist_strategy_run_state,
read_latest_strategy_run_state,
resolve_strategy_run_period,
)
from decision_mapper import map_strategy_decision_to_plan
from notifications.telegram import build_sender, build_translator, render_cycle_summary
from quant_platform_kit.common.runtime_inputs import (
Expand All @@ -38,6 +47,10 @@
LIMIT_BUY_PREMIUM = 1.005


def _utcnow() -> datetime:
return datetime.now(timezone.utc)


def get_project_id() -> str | None:
return os.getenv("GOOGLE_CLOUD_PROJECT")

Expand Down Expand Up @@ -137,11 +150,15 @@ def run_strategy_cycle(
runtime_settings: PlatformRuntimeSettings | None = None,
credentials: FirstradeCredentials | None = None,
client_factory: Callable[..., FirstradeBrokerClient] = FirstradeBrokerClient,
state_store: GcsStateStore | None = None,
notification_sender: Callable[[str], None] | None = None,
env_reader: Callable[[str, str | None], str | None] = os.getenv,
) -> dict[str, Any]:
now = _utcnow()
settings = runtime_settings or load_platform_runtime_settings(project_id_resolver=get_project_id)
resolved_credentials = credentials or FirstradeCredentials.from_env(env_reader)
store = state_store or build_gcs_state_store_from_env(env_reader)
persist_strategy_runs = bool(settings.persist_strategy_runs and store is not None)
client = _connect_client(
credentials=resolved_credentials,
live_trading_enabled=settings.live_trading_enabled,
Expand Down Expand Up @@ -193,6 +210,70 @@ def run_strategy_cycle(
plan,
threshold_usd=settings.safe_haven_cash_substitute_threshold_usd,
)
run_period = resolve_strategy_run_period(
now=now,
plan=plan,
evaluation_metadata=getattr(evaluation, "metadata", None),
)
masked_account = mask_account_id(account)
existing_run = None
if persist_strategy_runs and not settings.dry_run_only:
existing_run = read_latest_strategy_run_state(
store=store,
account=masked_account,
strategy_profile=strategy_runtime.profile,
run_period=run_period,
)
if is_duplicate_live_run(existing_run):
return {
"ok": True,
"api_kind": "unofficial-reverse-engineered",
"account": masked_account,
"strategy_profile": strategy_runtime.profile,
"strategy_display_name": strategy_runtime.display_name,
"dry_run_only": settings.dry_run_only,
"live_trading_enabled": settings.live_trading_enabled,
"session_reused": bool(getattr(client, "session_reused", False)),
"strategy_run_period": run_period,
"strategy_run_persisted": False,
"idempotency_skipped": True,
"existing_strategy_run_stage": existing_run.get("stage"),
"existing_strategy_run_as_of": existing_run.get("as_of"),
"submitted_orders": [],
"skipped_orders": [
{
"reason": "duplicate_live_strategy_run",
"run_period": run_period,
}
],
"action_done": False,
}
strategy_run_persisted = False
strategy_run_persistence_error = None
if persist_strategy_runs:
planned_state = build_strategy_run_state(
stage="ORDERS_PLANNED",
account=masked_account,
strategy_profile=strategy_runtime.profile,
strategy_display_name=strategy_runtime.display_name,
run_period=run_period,
dry_run_only=settings.dry_run_only,
live_trading_enabled=settings.live_trading_enabled,
session_reused=bool(getattr(client, "session_reused", False)),
portfolio_snapshot=plan.get("portfolio", {}),
evaluation_metadata=getattr(evaluation, "metadata", None),
plan=plan,
now=now,
)
try:
strategy_run_persisted = persist_strategy_run_state(
store=store,
state=planned_state,
now=now,
)
except Exception as exc:
strategy_run_persisted = False
strategy_run_persistence_error = f"{type(exc).__name__}: {exc}"
execution_result = execute_value_target_plan(
plan=plan,
market_data_port=market_data_port,
Expand All @@ -212,13 +293,44 @@ def run_strategy_cycle(
"dry_run_only": settings.dry_run_only,
"live_trading_enabled": settings.live_trading_enabled,
"session_reused": bool(getattr(client, "session_reused", False)),
"strategy_run_period": run_period,
"strategy_run_persisted": strategy_run_persisted,
"portfolio": plan.get("portfolio", {}),
"allocation": plan.get("allocation", {}),
"execution": plan.get("execution", {}),
"submitted_orders": list(execution_result.submitted_orders),
"skipped_orders": list(execution_result.skipped_orders),
"action_done": execution_result.action_done,
}
if strategy_run_persistence_error:
result["strategy_run_persistence_error"] = strategy_run_persistence_error
if persist_strategy_runs:
completed_state = build_strategy_run_state(
stage="DRY_RUN_COMPLETED" if settings.dry_run_only else "SUBMITTED",

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Don't mark no-op live cycles as SUBMITTED

This sets stage to SUBMITTED for every non-dry-run cycle, even when execution_result.action_done is false and no orders were actually submitted. Because duplicate detection treats SUBMITTED as terminal, a no-op live run (for example, all deltas below threshold) will block later runs in the same month that might legitimately need to submit orders after portfolio/market changes. The persisted stage should reflect whether a submission actually occurred.

Useful? React with 👍 / 👎.

account=masked_account,
strategy_profile=strategy_runtime.profile,
strategy_display_name=strategy_runtime.display_name,
run_period=run_period,
dry_run_only=settings.dry_run_only,
live_trading_enabled=settings.live_trading_enabled,
session_reused=bool(getattr(client, "session_reused", False)),
portfolio_snapshot=plan.get("portfolio", {}),
evaluation_metadata=getattr(evaluation, "metadata", None),
plan=plan,
submitted_orders=list(execution_result.submitted_orders),
skipped_orders=list(execution_result.skipped_orders),
action_done=execution_result.action_done,
now=now,
)
try:
result["strategy_run_persisted"] = persist_strategy_run_state(
store=store,
state=completed_state,
now=now,
)
except Exception as exc:
result["strategy_run_persisted"] = False
result["strategy_run_persistence_error"] = f"{type(exc).__name__}: {exc}"
Comment on lines +331 to +333

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Fail run when terminal strategy state cannot be persisted

The duplicate-run guard depends entirely on reading a terminal persisted stage (is_duplicate_live_run), but this except swallows persistence failures after execution. In a live cycle where orders were submitted, a transient GCS write failure here leaves the latest state as ORDERS_PLANNED (or missing), so the next invocation for the same month will not trip idempotency and can submit the same orders again. For live trading, returning success after failing to persist the terminal state breaks the duplicate-submission protection this change is adding.

Useful? React with 👍 / 👎.

try:
result["notification_sent"] = _publish_cycle_notification(
result,
Expand Down
Loading