-
Notifications
You must be signed in to change notification settings - Fork 0
Add Firstrade strategy precheck runtime reports #90
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,7 +4,9 @@ | |
|
|
||
| import os | ||
| import traceback | ||
| from dataclasses import replace | ||
| from datetime import datetime, timezone | ||
| from typing import Any | ||
|
|
||
| from flask import Flask, jsonify, request | ||
|
|
||
|
|
@@ -18,11 +20,22 @@ | |
| from application.rebalance_service import run_strategy_cycle | ||
| from application.session_check_service import run_session_check | ||
| from notifications.telegram import build_sender | ||
| from quant_platform_kit.common.runtime_reports import ( | ||
| append_runtime_report_error, | ||
| build_runtime_report_base, | ||
| finalize_runtime_report, | ||
| persist_runtime_report, | ||
| ) | ||
| from runtime_config_support import PlatformRuntimeSettings, load_platform_runtime_settings | ||
| from strategy_registry import get_platform_profile_status_matrix | ||
|
|
||
| app = Flask(__name__) | ||
|
|
||
|
|
||
| def get_project_id() -> str | None: | ||
| return os.getenv("GOOGLE_CLOUD_PROJECT") | ||
|
|
||
|
|
||
| def _flag(name: str, default: str = "false") -> bool: | ||
| return (os.getenv(name, default) or "").strip().lower() == "true" | ||
|
|
||
|
|
@@ -95,6 +108,170 @@ def _handle_strategy_run_exception(exc: Exception) -> bool: | |
| return _notify_runtime_error(exc) | ||
|
|
||
|
|
||
| def _build_run_id() -> str: | ||
| return datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") | ||
|
|
||
|
|
||
| def _service_name() -> str: | ||
| return os.getenv("K_SERVICE") or "firstrade-quant-service" | ||
|
|
||
|
|
||
| def _runtime_settings(*, dry_run_override: bool | None = None) -> PlatformRuntimeSettings: | ||
| settings = load_platform_runtime_settings(project_id_resolver=get_project_id) | ||
| if dry_run_override is None: | ||
| return settings | ||
| runtime_target = settings.runtime_target | ||
| if runtime_target is not None: | ||
| runtime_target = replace(runtime_target, dry_run_only=bool(dry_run_override)) | ||
| return replace( | ||
| settings, | ||
| dry_run_only=bool(dry_run_override), | ||
| live_trading_enabled=False if dry_run_override else settings.live_trading_enabled, | ||
| runtime_target=runtime_target, | ||
|
Comment on lines
+126
to
+130
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
When Useful? React with 👍 / 👎. |
||
| ) | ||
|
|
||
|
|
||
| def _build_runtime_report(settings: PlatformRuntimeSettings, *, dry_run: bool) -> dict[str, Any]: | ||
| runtime_target = settings.runtime_target | ||
| return build_runtime_report_base( | ||
| platform="firstrade", | ||
| deploy_target="cloud_run", | ||
| service_name=_service_name(), | ||
| strategy_profile=settings.strategy_profile, | ||
| run_id=_build_run_id(), | ||
| run_source="cloud_run", | ||
| runtime_target=runtime_target, | ||
| strategy_domain=settings.strategy_domain, | ||
| account_scope=( | ||
| getattr(runtime_target, "account_scope", None) | ||
| if runtime_target is not None | ||
| else settings.account_region | ||
| ), | ||
| account_region=settings.account_region, | ||
| project_id=settings.project_id or get_project_id(), | ||
| dry_run=dry_run, | ||
| started_at=datetime.now(timezone.utc), | ||
| summary={ | ||
| "strategy_display_name": settings.strategy_display_name, | ||
| "strategy_display_name_localized": settings.strategy_display_name, | ||
| }, | ||
| ) | ||
|
|
||
|
|
||
| def _plugin_report_fields(result: dict[str, Any]) -> dict[str, Any]: | ||
| fields: dict[str, Any] = {} | ||
| for key, value in result.items(): | ||
| if key == "strategy_plugins" or key.startswith("strategy_plugin_alert_"): | ||
| fields[key] = value | ||
| return fields | ||
|
|
||
|
|
||
| def _strategy_result_summary(result: dict[str, Any], *, dry_run: bool) -> dict[str, Any]: | ||
| submitted_orders = list(result.get("submitted_orders") or ()) | ||
| skipped_orders = list(result.get("skipped_orders") or ()) | ||
| execution_notes = list(result.get("execution_notes") or ()) | ||
| orders_previewed_count = len(submitted_orders) if dry_run else 0 | ||
| summary = { | ||
| "action_done": bool(result.get("action_done")), | ||
| "execution_status": result.get("strategy_run_stage"), | ||
| "order_events_count": len(submitted_orders), | ||
| "orders_previewed_count": orders_previewed_count, | ||
| "orders_skipped_count": len(skipped_orders), | ||
| "notes_count": len(execution_notes), | ||
| "dry_run_order_preview_available": bool(dry_run and orders_previewed_count > 0), | ||
| "strategy_run_period": result.get("strategy_run_period"), | ||
| "strategy_run_stage": result.get("strategy_run_stage"), | ||
| "strategy_run_persisted": result.get("strategy_run_persisted"), | ||
| "session_reused": bool(result.get("session_reused")), | ||
| "live_trading_enabled": bool(result.get("live_trading_enabled")), | ||
| **_plugin_report_fields(result), | ||
| } | ||
| if dry_run and submitted_orders: | ||
| summary["orders_previewed"] = submitted_orders | ||
| if skipped_orders: | ||
| summary["orders_skipped"] = skipped_orders | ||
| if execution_notes: | ||
| summary["execution_notes"] = execution_notes | ||
| return summary | ||
|
|
||
|
|
||
| def _strategy_result_diagnostics(result: dict[str, Any]) -> dict[str, Any]: | ||
| diagnostics: dict[str, Any] = {} | ||
| signal_snapshot = result.get("signal_snapshot") | ||
| if signal_snapshot: | ||
| diagnostics["signal_snapshot"] = signal_snapshot | ||
| for key in ( | ||
| "strategy_plugin_error", | ||
| "strategy_plugin_alert_error", | ||
| "strategy_run_persistence_error", | ||
| "execution_blocked", | ||
| "execution_block_retryable", | ||
| "execution_blocking_skips", | ||
| "funding_blocked", | ||
| "error", | ||
| ): | ||
| value = result.get(key) | ||
| if value not in (None, "", [], {}): | ||
| diagnostics[key] = value | ||
| return diagnostics | ||
|
|
||
|
|
||
| def _persist_runtime_report(report: dict[str, Any]) -> str | None: | ||
| persisted = persist_runtime_report( | ||
| report, | ||
| gcs_prefix_uri=os.getenv("EXECUTION_REPORT_GCS_URI"), | ||
| gcp_project_id=get_project_id(), | ||
| ) | ||
| if isinstance(persisted, str): | ||
| return persisted | ||
| return getattr(persisted, "gcs_uri", None) or getattr(persisted, "local_path", None) | ||
|
|
||
|
|
||
| def _run_strategy_cycle_with_report( | ||
| *, | ||
| dry_run_override: bool | None = None, | ||
| send_cycle_notification: bool = True, | ||
| dispatch_plugin_alerts: bool = True, | ||
| ) -> dict[str, Any]: | ||
| settings = _runtime_settings(dry_run_override=dry_run_override) | ||
| dry_run = bool(settings.dry_run_only) | ||
| report = _build_runtime_report(settings, dry_run=dry_run) | ||
| try: | ||
| result = run_strategy_cycle( | ||
| runtime_settings=settings, | ||
| send_cycle_notification=send_cycle_notification, | ||
| dispatch_plugin_alerts=dispatch_plugin_alerts, | ||
| ) | ||
| finalize_runtime_report( | ||
| report, | ||
| status="ok" if result.get("ok", True) else "error", | ||
| summary=_strategy_result_summary(result, dry_run=dry_run), | ||
| diagnostics=_strategy_result_diagnostics(result), | ||
| ) | ||
| try: | ||
| report_path = _persist_runtime_report(report) | ||
| if report_path: | ||
| print(f"execution_report {report_path}", flush=True) | ||
| except Exception as persist_exc: | ||
| print(f"failed to persist execution report: {persist_exc}", flush=True) | ||
| return result | ||
| except Exception as exc: | ||
| append_runtime_report_error( | ||
| report, | ||
| stage="strategy_cycle", | ||
| message=str(exc), | ||
| error_type=type(exc).__name__, | ||
| ) | ||
| finalize_runtime_report(report, status="error") | ||
| try: | ||
| report_path = _persist_runtime_report(report) | ||
| if report_path: | ||
| print(f"execution_report {report_path}", flush=True) | ||
| except Exception as persist_exc: | ||
| print(f"failed to persist execution report: {persist_exc}", flush=True) | ||
| raise | ||
|
|
||
|
|
||
| @app.get("/") | ||
| def health(): | ||
| return jsonify( | ||
|
|
@@ -196,7 +373,7 @@ def run_strategy(): | |
| 403, | ||
| ) | ||
| try: | ||
| return jsonify(run_strategy_cycle()) | ||
| return jsonify(_run_strategy_cycle_with_report()) | ||
| except (FirstradePlatformError, EnvironmentError, ValueError) as exc: | ||
| notification_attempted = _handle_strategy_run_exception(exc) | ||
| return ( | ||
|
|
@@ -226,7 +403,38 @@ def run_strategy(): | |
| @app.post("/precheck") | ||
| @app.get("/precheck") | ||
| def precheck(): | ||
| return health() | ||
| try: | ||
| return jsonify( | ||
| _run_strategy_cycle_with_report( | ||
| dry_run_override=True, | ||
| send_cycle_notification=False, | ||
| dispatch_plugin_alerts=False, | ||
| ) | ||
| ) | ||
| except (FirstradePlatformError, EnvironmentError, ValueError) as exc: | ||
| notification_attempted = _handle_strategy_run_exception(exc) | ||
| return ( | ||
| jsonify( | ||
| { | ||
| "ok": False, | ||
| "error": str(exc), | ||
| "runtime_error_notification_attempted": notification_attempted, | ||
| } | ||
| ), | ||
| 500, | ||
| ) | ||
| except Exception as exc: | ||
| notification_attempted = _handle_strategy_run_exception(exc) | ||
| return ( | ||
| jsonify( | ||
| { | ||
| "ok": False, | ||
| "error": f"{type(exc).__name__}: {exc}", | ||
| "runtime_error_notification_attempted": notification_attempted, | ||
| } | ||
| ), | ||
| 500, | ||
| ) | ||
|
|
||
|
|
||
| @app.post("/probe") | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This run ID only has one-second resolution, but
persist_runtime_reportderives the local/GCS report path fromrun_id. If Cloud Run handles two requests for the same strategy/account in the same second (for example a scheduler retry overlapping a manual/run, with concurrency enabled), the later report overwrites the earlier one, losing one runtime record. Include a unique suffix such as a UUID or request ID in addition to the timestamp.Useful? React with 👍 / 👎.