Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions application/rebalance_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from __future__ import annotations

import json
import os
from collections.abc import Callable, Mapping
from datetime import datetime, timezone
Expand All @@ -25,6 +26,7 @@
mask_account_id,
)
from application.runtime_broker_adapters import build_runtime_broker_adapters
from application.signal_snapshot import build_signal_snapshot
from application.state_persistence import GcsStateStore, build_gcs_state_store_from_env
from application.strategy_run_persistence import (
build_strategy_run_state,
Expand Down Expand Up @@ -459,6 +461,17 @@ def run_strategy_cycle(
terminal_funding_block=terminal_funding_block,
action_done=execution_result.action_done,
)
signal_snapshot = build_signal_snapshot(
platform="firstrade",
strategy_profile=strategy_runtime.profile,
execution={
**dict(plan.get("execution", {}) or {}),
"latest_price_source": "firstrade_ohlc_with_live_quote_overlay",

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Record whether the quote overlay was actually used

When a Firstrade quote lookup fails or returns an unusable quote, build_market_data_port() catches that exception and evaluates the strategy with OHLC history only, but this hard-coded source still records every snapshot as firstrade_ohlc_with_live_quote_overlay. In those fallback runs the JSON result, log line, and Telegram summary all claim a live quote overlay influenced the signal even though it did not, which undermines the snapshot's audit value; the market data adapter needs to expose whether the overlay was applied instead of setting this unconditionally.

Useful? React with 👍 / 👎.

},
allocation=plan.get("allocation", {}),
metadata=getattr(evaluation, "metadata", None),

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Include decision diagnostics in signal snapshots

For strategies that emit standard indicators in evaluation.decision.diagnostics (for example trend_ma20, trend_rsi14, or blend_gate_volatility_delever_triggered), this call never passes those diagnostics to build_signal_snapshot(). The value-target plan only carries the execution annotation subset, so these indicator fields are silently omitted from signal_snapshot.v1 even though the snapshot helper explicitly supports a diagnostics source; pass the decision diagnostics here as well.

Useful? React with 👍 / 👎.

)
print("signal_snapshot " + json.dumps(signal_snapshot, ensure_ascii=False), flush=True)
result = {
"ok": not execution_blocked,
"api_kind": "unofficial-reverse-engineered",
Expand All @@ -474,6 +487,7 @@ def run_strategy_cycle(
"portfolio": plan.get("portfolio", {}),
"allocation": plan.get("allocation", {}),
"execution": plan.get("execution", {}),
"signal_snapshot": signal_snapshot,
"submitted_orders": submitted_orders,
"skipped_orders": skipped_orders,
"execution_notes": execution_notes,
Expand Down
166 changes: 166 additions & 0 deletions application/signal_snapshot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
"""Shared signal snapshot payload helpers for platform reports."""

from __future__ import annotations

from collections.abc import Mapping
from datetime import date, datetime, timezone
from typing import Any

SIGNAL_SNAPSHOT_SCHEMA_VERSION = "signal_snapshot.v1"

_INDICATOR_FIELDS = (
"benchmark_symbol",
"benchmark_price",
"long_trend_value",
"exit_line",
"active_risk_asset",
"allocation_mode",
"trend_symbol",
"trend_price",
"trend_ma",
"trend_ma20",
"trend_ma20_slope",
"trend_rsi14",
"trend_rsi14_dynamic_threshold",
"trend_rsi14_effective_threshold",
"trend_bb_upper",
"blend_gate_volatility_delever_metric",
"blend_gate_volatility_delever_triggered",
)


def _utcnow() -> datetime:
return datetime.now(timezone.utc)


def _json_safe(value: Any) -> Any:
if isinstance(value, datetime):
return value.isoformat()
if isinstance(value, date):
return value.isoformat()
if isinstance(value, Mapping):
return {str(key): _json_safe(item) for key, item in value.items()}
if isinstance(value, (list, tuple, set)):
return [_json_safe(item) for item in value]
return value


def _first_value(*values: Any) -> Any:
for value in values:
if value is not None and value != "":
return value
return None


def _merge_signal_sources(*sources: Mapping[str, Any] | None) -> dict[str, Any]:
merged: dict[str, Any] = {}
for source in sources:
if not isinstance(source, Mapping):
continue
annotations = source.get("execution_annotations")
if isinstance(annotations, Mapping):
merged.update(annotations)
merged.update(source)
return merged


def _normalized_numeric_mapping(value: Any) -> dict[str, float]:
if not isinstance(value, Mapping):
return {}
normalized: dict[str, float] = {}
for key, raw_value in value.items():
symbol = str(key or "").strip().upper()
if not symbol:
continue
try:
normalized[symbol] = float(raw_value)
except (TypeError, ValueError):
continue
return normalized


def _target_payload(
*,
allocation: Mapping[str, Any] | None,
explicit_target_weights: Mapping[str, Any] | None,
) -> tuple[str | None, dict[str, float], dict[str, float]]:
allocation = allocation if isinstance(allocation, Mapping) else {}
target_mode = str(allocation.get("target_mode") or "").strip() or None
targets = _normalized_numeric_mapping(explicit_target_weights or allocation.get("targets"))
if target_mode == "value":
return target_mode, {}, targets
return target_mode, targets, {}


def build_signal_snapshot(
*,
platform: str,
strategy_profile: str | None = None,
generated_at: datetime | None = None,
diagnostics: Mapping[str, Any] | None = None,
execution: Mapping[str, Any] | None = None,
allocation: Mapping[str, Any] | None = None,
metadata: Mapping[str, Any] | None = None,
target_weights: Mapping[str, Any] | None = None,
) -> dict[str, Any]:
source = _merge_signal_sources(metadata, diagnostics, execution)
target_mode, normalized_weights, normalized_values = _target_payload(
allocation=allocation,
explicit_target_weights=target_weights,
)
indicators = {
field: _json_safe(source[field])
for field in _INDICATOR_FIELDS
if source.get(field) not in (None, "")
}
snapshot = {
"schema_version": SIGNAL_SNAPSHOT_SCHEMA_VERSION,
"platform": str(platform or "").strip(),
"strategy_profile": _first_value(strategy_profile, source.get("strategy_profile")),
"strategy_version": source.get("strategy_version"),
"generated_at": _json_safe(generated_at or _utcnow()),
"signal_as_of": _json_safe(
_first_value(
source.get("signal_as_of"),
source.get("signal_date"),
source.get("snapshot_as_of"),
source.get("trade_date"),
)
),
"market_date": _json_safe(
_first_value(
source.get("market_date"),
source.get("signal_date"),
source.get("snapshot_as_of"),
source.get("trade_date"),
)
),
"effective_date": _json_safe(source.get("effective_date")),
"latest_price_source": _first_value(
source.get("latest_price_source"),
source.get("price_source_mode"),
source.get("market_data_source"),
source.get("signal_source"),
),
"quote_overlay_used": source.get("quote_overlay_used"),
"data_freshness_warning": _first_value(
source.get("data_freshness_warning"),
source.get("snapshot_price_fallback_used"),
),
"signal": _first_value(
source.get("signal_display"),
source.get("signal_description"),
source.get("signal_message"),
),
"status": _first_value(
source.get("status_display"),
source.get("status_description"),
source.get("market_status"),
source.get("canary_status"),
),
"target_mode": target_mode,
"target_weights": normalized_weights,
"target_values": normalized_values,
"indicators": indicators,
}
return {key: _json_safe(value) for key, value in snapshot.items()}
34 changes: 34 additions & 0 deletions notifications/telegram.py
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,37 @@ def _format_timing_lines(execution: Mapping[str, Any], *, translator: Callable[.
return [translator("timing_line", value=value)]


def _format_signal_snapshot_line(snapshot: Any, *, lang: str) -> str:
if not isinstance(snapshot, Mapping):
return ""
market_date = str(snapshot.get("market_date") or snapshot.get("signal_as_of") or "").strip()
source = str(snapshot.get("latest_price_source") or "").strip()
overlay = snapshot.get("quote_overlay_used")
warning = snapshot.get("data_freshness_warning")
if not market_date and not source and overlay is None and warning in (None, "", False):
return ""
use_zh = str(lang or "").lower().startswith("zh")
if use_zh:
overlay_text = "是" if overlay is True else "否" if overlay is False else "未知"
parts = [
f"日期 {market_date or '未知'}",
f"数据源 {source or '未知'}",
f"报价覆盖 {overlay_text}",
]
if warning not in (None, "", False):
parts.append(f"提示 {warning}")
return "🧾 信号快照: " + " | ".join(parts)
overlay_text = "yes" if overlay is True else "no" if overlay is False else "unknown"
parts = [
f"date {market_date or 'unknown'}",
f"source {source or 'unknown'}",
f"quote overlay {overlay_text}",
]
if warning not in (None, "", False):
parts.append(f"warning {warning}")
return "🧾 Signal snapshot: " + " | ".join(parts)


def _first_summary(value: Any, *, translator: Callable[..., str]) -> str:
text = str(value or "").strip()
if not text:
Expand Down Expand Up @@ -689,6 +720,9 @@ def render_cycle_summary(result: Mapping[str, Any], *, lang: str = "en") -> str:
lines.append(SEPARATOR)
lines.extend(dashboard_lines)
lines.extend(_format_timing_lines(execution, translator=translator))
signal_snapshot_line = _format_signal_snapshot_line(result.get("signal_snapshot"), lang=lang)
if signal_snapshot_line:
lines.append(signal_snapshot_line)
lines.extend(_format_signal_lines(execution, translator=translator))
lines.append(SEPARATOR)
lines.extend(target_diff_lines)
Expand Down