-
Notifications
You must be signed in to change notification settings - Fork 0
Persist Firstrade strategy run state #16
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -10,6 +10,7 @@ | |
|
|
||
| import os | ||
| from collections.abc import Callable, Mapping | ||
| from datetime import datetime, timezone | ||
| from typing import Any | ||
|
|
||
| import pandas as pd | ||
|
|
@@ -24,6 +25,14 @@ | |
| mask_account_id, | ||
| ) | ||
| from application.runtime_broker_adapters import build_runtime_broker_adapters | ||
| from application.state_persistence import GcsStateStore, build_gcs_state_store_from_env | ||
| from application.strategy_run_persistence import ( | ||
| build_strategy_run_state, | ||
| is_duplicate_live_run, | ||
| persist_strategy_run_state, | ||
| read_latest_strategy_run_state, | ||
| resolve_strategy_run_period, | ||
| ) | ||
| from decision_mapper import map_strategy_decision_to_plan | ||
| from notifications.telegram import build_sender, build_translator, render_cycle_summary | ||
| from quant_platform_kit.common.runtime_inputs import ( | ||
|
|
@@ -38,6 +47,10 @@ | |
| LIMIT_BUY_PREMIUM = 1.005 | ||
|
|
||
|
|
||
| def _utcnow() -> datetime: | ||
| return datetime.now(timezone.utc) | ||
|
|
||
|
|
||
| def get_project_id() -> str | None: | ||
| return os.getenv("GOOGLE_CLOUD_PROJECT") | ||
|
|
||
|
|
@@ -137,11 +150,15 @@ def run_strategy_cycle( | |
| runtime_settings: PlatformRuntimeSettings | None = None, | ||
| credentials: FirstradeCredentials | None = None, | ||
| client_factory: Callable[..., FirstradeBrokerClient] = FirstradeBrokerClient, | ||
| state_store: GcsStateStore | None = None, | ||
| notification_sender: Callable[[str], None] | None = None, | ||
| env_reader: Callable[[str, str | None], str | None] = os.getenv, | ||
| ) -> dict[str, Any]: | ||
| now = _utcnow() | ||
| settings = runtime_settings or load_platform_runtime_settings(project_id_resolver=get_project_id) | ||
| resolved_credentials = credentials or FirstradeCredentials.from_env(env_reader) | ||
| store = state_store or build_gcs_state_store_from_env(env_reader) | ||
| persist_strategy_runs = bool(settings.persist_strategy_runs and store is not None) | ||
| client = _connect_client( | ||
| credentials=resolved_credentials, | ||
| live_trading_enabled=settings.live_trading_enabled, | ||
|
|
@@ -193,6 +210,70 @@ def run_strategy_cycle( | |
| plan, | ||
| threshold_usd=settings.safe_haven_cash_substitute_threshold_usd, | ||
| ) | ||
| run_period = resolve_strategy_run_period( | ||
| now=now, | ||
| plan=plan, | ||
| evaluation_metadata=getattr(evaluation, "metadata", None), | ||
| ) | ||
| masked_account = mask_account_id(account) | ||
| existing_run = None | ||
| if persist_strategy_runs and not settings.dry_run_only: | ||
| existing_run = read_latest_strategy_run_state( | ||
| store=store, | ||
| account=masked_account, | ||
| strategy_profile=strategy_runtime.profile, | ||
| run_period=run_period, | ||
| ) | ||
| if is_duplicate_live_run(existing_run): | ||
| return { | ||
| "ok": True, | ||
| "api_kind": "unofficial-reverse-engineered", | ||
| "account": masked_account, | ||
| "strategy_profile": strategy_runtime.profile, | ||
| "strategy_display_name": strategy_runtime.display_name, | ||
| "dry_run_only": settings.dry_run_only, | ||
| "live_trading_enabled": settings.live_trading_enabled, | ||
| "session_reused": bool(getattr(client, "session_reused", False)), | ||
| "strategy_run_period": run_period, | ||
| "strategy_run_persisted": False, | ||
| "idempotency_skipped": True, | ||
| "existing_strategy_run_stage": existing_run.get("stage"), | ||
| "existing_strategy_run_as_of": existing_run.get("as_of"), | ||
| "submitted_orders": [], | ||
| "skipped_orders": [ | ||
| { | ||
| "reason": "duplicate_live_strategy_run", | ||
| "run_period": run_period, | ||
| } | ||
| ], | ||
| "action_done": False, | ||
| } | ||
| strategy_run_persisted = False | ||
| strategy_run_persistence_error = None | ||
| if persist_strategy_runs: | ||
| planned_state = build_strategy_run_state( | ||
| stage="ORDERS_PLANNED", | ||
| account=masked_account, | ||
| strategy_profile=strategy_runtime.profile, | ||
| strategy_display_name=strategy_runtime.display_name, | ||
| run_period=run_period, | ||
| dry_run_only=settings.dry_run_only, | ||
| live_trading_enabled=settings.live_trading_enabled, | ||
| session_reused=bool(getattr(client, "session_reused", False)), | ||
| portfolio_snapshot=plan.get("portfolio", {}), | ||
| evaluation_metadata=getattr(evaluation, "metadata", None), | ||
| plan=plan, | ||
| now=now, | ||
| ) | ||
| try: | ||
| strategy_run_persisted = persist_strategy_run_state( | ||
| store=store, | ||
| state=planned_state, | ||
| now=now, | ||
| ) | ||
| except Exception as exc: | ||
| strategy_run_persisted = False | ||
| strategy_run_persistence_error = f"{type(exc).__name__}: {exc}" | ||
| execution_result = execute_value_target_plan( | ||
| plan=plan, | ||
| market_data_port=market_data_port, | ||
|
|
@@ -212,13 +293,44 @@ def run_strategy_cycle( | |
| "dry_run_only": settings.dry_run_only, | ||
| "live_trading_enabled": settings.live_trading_enabled, | ||
| "session_reused": bool(getattr(client, "session_reused", False)), | ||
| "strategy_run_period": run_period, | ||
| "strategy_run_persisted": strategy_run_persisted, | ||
| "portfolio": plan.get("portfolio", {}), | ||
| "allocation": plan.get("allocation", {}), | ||
| "execution": plan.get("execution", {}), | ||
| "submitted_orders": list(execution_result.submitted_orders), | ||
| "skipped_orders": list(execution_result.skipped_orders), | ||
| "action_done": execution_result.action_done, | ||
| } | ||
| if strategy_run_persistence_error: | ||
| result["strategy_run_persistence_error"] = strategy_run_persistence_error | ||
| if persist_strategy_runs: | ||
| completed_state = build_strategy_run_state( | ||
| stage="DRY_RUN_COMPLETED" if settings.dry_run_only else "SUBMITTED", | ||
| account=masked_account, | ||
| strategy_profile=strategy_runtime.profile, | ||
| strategy_display_name=strategy_runtime.display_name, | ||
| run_period=run_period, | ||
| dry_run_only=settings.dry_run_only, | ||
| live_trading_enabled=settings.live_trading_enabled, | ||
| session_reused=bool(getattr(client, "session_reused", False)), | ||
| portfolio_snapshot=plan.get("portfolio", {}), | ||
| evaluation_metadata=getattr(evaluation, "metadata", None), | ||
| plan=plan, | ||
| submitted_orders=list(execution_result.submitted_orders), | ||
| skipped_orders=list(execution_result.skipped_orders), | ||
| action_done=execution_result.action_done, | ||
| now=now, | ||
| ) | ||
| try: | ||
| result["strategy_run_persisted"] = persist_strategy_run_state( | ||
| store=store, | ||
| state=completed_state, | ||
| now=now, | ||
| ) | ||
| except Exception as exc: | ||
| result["strategy_run_persisted"] = False | ||
| result["strategy_run_persistence_error"] = f"{type(exc).__name__}: {exc}" | ||
|
Comment on lines
+331
to
+333
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The duplicate-run guard depends entirely on reading a terminal persisted stage ( Useful? React with 👍 / 👎. |
||
| try: | ||
| result["notification_sent"] = _publish_cycle_notification( | ||
| result, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This sets
stagetoSUBMITTEDfor every non-dry-run cycle, even whenexecution_result.action_doneis false and no orders were actually submitted. Because duplicate detection treatsSUBMITTEDas terminal, a no-op live run (for example, all deltas below threshold) will block later runs in the same month that might legitimately need to submit orders after portfolio/market changes. The persisted stage should reflect whether a submission actually occurred.Useful? React with 👍 / 👎.