diff --git a/application/rebalance_service.py b/application/rebalance_service.py index dd7ae34..b2eb983 100644 --- a/application/rebalance_service.py +++ b/application/rebalance_service.py @@ -8,6 +8,7 @@ from __future__ import annotations +import json import os from collections.abc import Callable, Mapping from datetime import datetime, timezone @@ -25,6 +26,7 @@ mask_account_id, ) from application.runtime_broker_adapters import build_runtime_broker_adapters +from application.signal_snapshot import build_signal_snapshot from application.state_persistence import GcsStateStore, build_gcs_state_store_from_env from application.strategy_run_persistence import ( build_strategy_run_state, @@ -459,6 +461,17 @@ def run_strategy_cycle( terminal_funding_block=terminal_funding_block, action_done=execution_result.action_done, ) + signal_snapshot = build_signal_snapshot( + platform="firstrade", + strategy_profile=strategy_runtime.profile, + execution={ + **dict(plan.get("execution", {}) or {}), + "latest_price_source": "firstrade_ohlc_with_live_quote_overlay", + }, + allocation=plan.get("allocation", {}), + metadata=getattr(evaluation, "metadata", None), + ) + print("signal_snapshot " + json.dumps(signal_snapshot, ensure_ascii=False), flush=True) result = { "ok": not execution_blocked, "api_kind": "unofficial-reverse-engineered", @@ -474,6 +487,7 @@ def run_strategy_cycle( "portfolio": plan.get("portfolio", {}), "allocation": plan.get("allocation", {}), "execution": plan.get("execution", {}), + "signal_snapshot": signal_snapshot, "submitted_orders": submitted_orders, "skipped_orders": skipped_orders, "execution_notes": execution_notes, diff --git a/application/signal_snapshot.py b/application/signal_snapshot.py new file mode 100644 index 0000000..4e0463d --- /dev/null +++ b/application/signal_snapshot.py @@ -0,0 +1,166 @@ +"""Shared signal snapshot payload helpers for platform reports.""" + +from __future__ import annotations + +from collections.abc import Mapping +from datetime import date, datetime, timezone +from typing import Any + +SIGNAL_SNAPSHOT_SCHEMA_VERSION = "signal_snapshot.v1" + +_INDICATOR_FIELDS = ( + "benchmark_symbol", + "benchmark_price", + "long_trend_value", + "exit_line", + "active_risk_asset", + "allocation_mode", + "trend_symbol", + "trend_price", + "trend_ma", + "trend_ma20", + "trend_ma20_slope", + "trend_rsi14", + "trend_rsi14_dynamic_threshold", + "trend_rsi14_effective_threshold", + "trend_bb_upper", + "blend_gate_volatility_delever_metric", + "blend_gate_volatility_delever_triggered", +) + + +def _utcnow() -> datetime: + return datetime.now(timezone.utc) + + +def _json_safe(value: Any) -> Any: + if isinstance(value, datetime): + return value.isoformat() + if isinstance(value, date): + return value.isoformat() + if isinstance(value, Mapping): + return {str(key): _json_safe(item) for key, item in value.items()} + if isinstance(value, (list, tuple, set)): + return [_json_safe(item) for item in value] + return value + + +def _first_value(*values: Any) -> Any: + for value in values: + if value is not None and value != "": + return value + return None + + +def _merge_signal_sources(*sources: Mapping[str, Any] | None) -> dict[str, Any]: + merged: dict[str, Any] = {} + for source in sources: + if not isinstance(source, Mapping): + continue + annotations = source.get("execution_annotations") + if isinstance(annotations, Mapping): + merged.update(annotations) + merged.update(source) + return merged + + +def _normalized_numeric_mapping(value: Any) -> dict[str, float]: + if not isinstance(value, Mapping): + return {} + normalized: dict[str, float] = {} + for key, raw_value in value.items(): + symbol = str(key or "").strip().upper() + if not symbol: + continue + try: + normalized[symbol] = float(raw_value) + except (TypeError, ValueError): + continue + return normalized + + +def _target_payload( + *, + allocation: Mapping[str, Any] | None, + explicit_target_weights: Mapping[str, Any] | None, +) -> tuple[str | None, dict[str, float], dict[str, float]]: + allocation = allocation if isinstance(allocation, Mapping) else {} + target_mode = str(allocation.get("target_mode") or "").strip() or None + targets = _normalized_numeric_mapping(explicit_target_weights or allocation.get("targets")) + if target_mode == "value": + return target_mode, {}, targets + return target_mode, targets, {} + + +def build_signal_snapshot( + *, + platform: str, + strategy_profile: str | None = None, + generated_at: datetime | None = None, + diagnostics: Mapping[str, Any] | None = None, + execution: Mapping[str, Any] | None = None, + allocation: Mapping[str, Any] | None = None, + metadata: Mapping[str, Any] | None = None, + target_weights: Mapping[str, Any] | None = None, +) -> dict[str, Any]: + source = _merge_signal_sources(metadata, diagnostics, execution) + target_mode, normalized_weights, normalized_values = _target_payload( + allocation=allocation, + explicit_target_weights=target_weights, + ) + indicators = { + field: _json_safe(source[field]) + for field in _INDICATOR_FIELDS + if source.get(field) not in (None, "") + } + snapshot = { + "schema_version": SIGNAL_SNAPSHOT_SCHEMA_VERSION, + "platform": str(platform or "").strip(), + "strategy_profile": _first_value(strategy_profile, source.get("strategy_profile")), + "strategy_version": source.get("strategy_version"), + "generated_at": _json_safe(generated_at or _utcnow()), + "signal_as_of": _json_safe( + _first_value( + source.get("signal_as_of"), + source.get("signal_date"), + source.get("snapshot_as_of"), + source.get("trade_date"), + ) + ), + "market_date": _json_safe( + _first_value( + source.get("market_date"), + source.get("signal_date"), + source.get("snapshot_as_of"), + source.get("trade_date"), + ) + ), + "effective_date": _json_safe(source.get("effective_date")), + "latest_price_source": _first_value( + source.get("latest_price_source"), + source.get("price_source_mode"), + source.get("market_data_source"), + source.get("signal_source"), + ), + "quote_overlay_used": source.get("quote_overlay_used"), + "data_freshness_warning": _first_value( + source.get("data_freshness_warning"), + source.get("snapshot_price_fallback_used"), + ), + "signal": _first_value( + source.get("signal_display"), + source.get("signal_description"), + source.get("signal_message"), + ), + "status": _first_value( + source.get("status_display"), + source.get("status_description"), + source.get("market_status"), + source.get("canary_status"), + ), + "target_mode": target_mode, + "target_weights": normalized_weights, + "target_values": normalized_values, + "indicators": indicators, + } + return {key: _json_safe(value) for key, value in snapshot.items()} diff --git a/notifications/telegram.py b/notifications/telegram.py index 3af4121..c1a2313 100644 --- a/notifications/telegram.py +++ b/notifications/telegram.py @@ -517,6 +517,37 @@ def _format_timing_lines(execution: Mapping[str, Any], *, translator: Callable[. return [translator("timing_line", value=value)] +def _format_signal_snapshot_line(snapshot: Any, *, lang: str) -> str: + if not isinstance(snapshot, Mapping): + return "" + market_date = str(snapshot.get("market_date") or snapshot.get("signal_as_of") or "").strip() + source = str(snapshot.get("latest_price_source") or "").strip() + overlay = snapshot.get("quote_overlay_used") + warning = snapshot.get("data_freshness_warning") + if not market_date and not source and overlay is None and warning in (None, "", False): + return "" + use_zh = str(lang or "").lower().startswith("zh") + if use_zh: + overlay_text = "是" if overlay is True else "否" if overlay is False else "未知" + parts = [ + f"日期 {market_date or '未知'}", + f"数据源 {source or '未知'}", + f"报价覆盖 {overlay_text}", + ] + if warning not in (None, "", False): + parts.append(f"提示 {warning}") + return "🧾 信号快照: " + " | ".join(parts) + overlay_text = "yes" if overlay is True else "no" if overlay is False else "unknown" + parts = [ + f"date {market_date or 'unknown'}", + f"source {source or 'unknown'}", + f"quote overlay {overlay_text}", + ] + if warning not in (None, "", False): + parts.append(f"warning {warning}") + return "🧾 Signal snapshot: " + " | ".join(parts) + + def _first_summary(value: Any, *, translator: Callable[..., str]) -> str: text = str(value or "").strip() if not text: @@ -689,6 +720,9 @@ def render_cycle_summary(result: Mapping[str, Any], *, lang: str = "en") -> str: lines.append(SEPARATOR) lines.extend(dashboard_lines) lines.extend(_format_timing_lines(execution, translator=translator)) + signal_snapshot_line = _format_signal_snapshot_line(result.get("signal_snapshot"), lang=lang) + if signal_snapshot_line: + lines.append(signal_snapshot_line) lines.extend(_format_signal_lines(execution, translator=translator)) lines.append(SEPARATOR) lines.extend(target_diff_lines)