-
Notifications
You must be signed in to change notification settings - Fork 0
Add standard signal snapshots #49
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,6 +8,7 @@ | |
|
|
||
| from __future__ import annotations | ||
|
|
||
| import json | ||
| import os | ||
| from collections.abc import Callable, Mapping | ||
| from datetime import datetime, timezone | ||
|
|
@@ -25,6 +26,7 @@ | |
| mask_account_id, | ||
| ) | ||
| from application.runtime_broker_adapters import build_runtime_broker_adapters | ||
| from application.signal_snapshot import build_signal_snapshot | ||
| from application.state_persistence import GcsStateStore, build_gcs_state_store_from_env | ||
| from application.strategy_run_persistence import ( | ||
| build_strategy_run_state, | ||
|
|
@@ -459,6 +461,17 @@ def run_strategy_cycle( | |
| terminal_funding_block=terminal_funding_block, | ||
| action_done=execution_result.action_done, | ||
| ) | ||
| signal_snapshot = build_signal_snapshot( | ||
| platform="firstrade", | ||
| strategy_profile=strategy_runtime.profile, | ||
| execution={ | ||
| **dict(plan.get("execution", {}) or {}), | ||
| "latest_price_source": "firstrade_ohlc_with_live_quote_overlay", | ||
| }, | ||
| allocation=plan.get("allocation", {}), | ||
| metadata=getattr(evaluation, "metadata", None), | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
For strategies that emit standard indicators in Useful? React with 👍 / 👎. |
||
| ) | ||
| print("signal_snapshot " + json.dumps(signal_snapshot, ensure_ascii=False), flush=True) | ||
| result = { | ||
| "ok": not execution_blocked, | ||
| "api_kind": "unofficial-reverse-engineered", | ||
|
|
@@ -474,6 +487,7 @@ def run_strategy_cycle( | |
| "portfolio": plan.get("portfolio", {}), | ||
| "allocation": plan.get("allocation", {}), | ||
| "execution": plan.get("execution", {}), | ||
| "signal_snapshot": signal_snapshot, | ||
| "submitted_orders": submitted_orders, | ||
| "skipped_orders": skipped_orders, | ||
| "execution_notes": execution_notes, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,166 @@ | ||
| """Shared signal snapshot payload helpers for platform reports.""" | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| from collections.abc import Mapping | ||
| from datetime import date, datetime, timezone | ||
| from typing import Any | ||
|
|
||
| SIGNAL_SNAPSHOT_SCHEMA_VERSION = "signal_snapshot.v1" | ||
|
|
||
| _INDICATOR_FIELDS = ( | ||
| "benchmark_symbol", | ||
| "benchmark_price", | ||
| "long_trend_value", | ||
| "exit_line", | ||
| "active_risk_asset", | ||
| "allocation_mode", | ||
| "trend_symbol", | ||
| "trend_price", | ||
| "trend_ma", | ||
| "trend_ma20", | ||
| "trend_ma20_slope", | ||
| "trend_rsi14", | ||
| "trend_rsi14_dynamic_threshold", | ||
| "trend_rsi14_effective_threshold", | ||
| "trend_bb_upper", | ||
| "blend_gate_volatility_delever_metric", | ||
| "blend_gate_volatility_delever_triggered", | ||
| ) | ||
|
|
||
|
|
||
| def _utcnow() -> datetime: | ||
| return datetime.now(timezone.utc) | ||
|
|
||
|
|
||
| def _json_safe(value: Any) -> Any: | ||
| if isinstance(value, datetime): | ||
| return value.isoformat() | ||
| if isinstance(value, date): | ||
| return value.isoformat() | ||
| if isinstance(value, Mapping): | ||
| return {str(key): _json_safe(item) for key, item in value.items()} | ||
| if isinstance(value, (list, tuple, set)): | ||
| return [_json_safe(item) for item in value] | ||
| return value | ||
|
|
||
|
|
||
| def _first_value(*values: Any) -> Any: | ||
| for value in values: | ||
| if value is not None and value != "": | ||
| return value | ||
| return None | ||
|
|
||
|
|
||
| def _merge_signal_sources(*sources: Mapping[str, Any] | None) -> dict[str, Any]: | ||
| merged: dict[str, Any] = {} | ||
| for source in sources: | ||
| if not isinstance(source, Mapping): | ||
| continue | ||
| annotations = source.get("execution_annotations") | ||
| if isinstance(annotations, Mapping): | ||
| merged.update(annotations) | ||
| merged.update(source) | ||
| return merged | ||
|
|
||
|
|
||
| def _normalized_numeric_mapping(value: Any) -> dict[str, float]: | ||
| if not isinstance(value, Mapping): | ||
| return {} | ||
| normalized: dict[str, float] = {} | ||
| for key, raw_value in value.items(): | ||
| symbol = str(key or "").strip().upper() | ||
| if not symbol: | ||
| continue | ||
| try: | ||
| normalized[symbol] = float(raw_value) | ||
| except (TypeError, ValueError): | ||
| continue | ||
| return normalized | ||
|
|
||
|
|
||
| def _target_payload( | ||
| *, | ||
| allocation: Mapping[str, Any] | None, | ||
| explicit_target_weights: Mapping[str, Any] | None, | ||
| ) -> tuple[str | None, dict[str, float], dict[str, float]]: | ||
| allocation = allocation if isinstance(allocation, Mapping) else {} | ||
| target_mode = str(allocation.get("target_mode") or "").strip() or None | ||
| targets = _normalized_numeric_mapping(explicit_target_weights or allocation.get("targets")) | ||
| if target_mode == "value": | ||
| return target_mode, {}, targets | ||
| return target_mode, targets, {} | ||
|
|
||
|
|
||
| def build_signal_snapshot( | ||
| *, | ||
| platform: str, | ||
| strategy_profile: str | None = None, | ||
| generated_at: datetime | None = None, | ||
| diagnostics: Mapping[str, Any] | None = None, | ||
| execution: Mapping[str, Any] | None = None, | ||
| allocation: Mapping[str, Any] | None = None, | ||
| metadata: Mapping[str, Any] | None = None, | ||
| target_weights: Mapping[str, Any] | None = None, | ||
| ) -> dict[str, Any]: | ||
| source = _merge_signal_sources(metadata, diagnostics, execution) | ||
| target_mode, normalized_weights, normalized_values = _target_payload( | ||
| allocation=allocation, | ||
| explicit_target_weights=target_weights, | ||
| ) | ||
| indicators = { | ||
| field: _json_safe(source[field]) | ||
| for field in _INDICATOR_FIELDS | ||
| if source.get(field) not in (None, "") | ||
| } | ||
| snapshot = { | ||
| "schema_version": SIGNAL_SNAPSHOT_SCHEMA_VERSION, | ||
| "platform": str(platform or "").strip(), | ||
| "strategy_profile": _first_value(strategy_profile, source.get("strategy_profile")), | ||
| "strategy_version": source.get("strategy_version"), | ||
| "generated_at": _json_safe(generated_at or _utcnow()), | ||
| "signal_as_of": _json_safe( | ||
| _first_value( | ||
| source.get("signal_as_of"), | ||
| source.get("signal_date"), | ||
| source.get("snapshot_as_of"), | ||
| source.get("trade_date"), | ||
| ) | ||
| ), | ||
| "market_date": _json_safe( | ||
| _first_value( | ||
| source.get("market_date"), | ||
| source.get("signal_date"), | ||
| source.get("snapshot_as_of"), | ||
| source.get("trade_date"), | ||
| ) | ||
| ), | ||
| "effective_date": _json_safe(source.get("effective_date")), | ||
| "latest_price_source": _first_value( | ||
| source.get("latest_price_source"), | ||
| source.get("price_source_mode"), | ||
| source.get("market_data_source"), | ||
| source.get("signal_source"), | ||
| ), | ||
| "quote_overlay_used": source.get("quote_overlay_used"), | ||
| "data_freshness_warning": _first_value( | ||
| source.get("data_freshness_warning"), | ||
| source.get("snapshot_price_fallback_used"), | ||
| ), | ||
| "signal": _first_value( | ||
| source.get("signal_display"), | ||
| source.get("signal_description"), | ||
| source.get("signal_message"), | ||
| ), | ||
| "status": _first_value( | ||
| source.get("status_display"), | ||
| source.get("status_description"), | ||
| source.get("market_status"), | ||
| source.get("canary_status"), | ||
| ), | ||
| "target_mode": target_mode, | ||
| "target_weights": normalized_weights, | ||
| "target_values": normalized_values, | ||
| "indicators": indicators, | ||
| } | ||
| return {key: _json_safe(value) for key, value in snapshot.items()} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When a Firstrade quote lookup fails or returns an unusable quote,
build_market_data_port()catches that exception and evaluates the strategy with OHLC history only, but this hard-coded source still records every snapshot asfirstrade_ohlc_with_live_quote_overlay. In those fallback runs the JSON result, log line, and Telegram summary all claim a live quote overlay influenced the signal even though it did not, which undermines the snapshot's audit value; the market data adapter needs to expose whether the overlay was applied instead of setting this unconditionally.Useful? React with 👍 / 👎.