Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,9 @@ full guarded strategy cycle:
`GLOBAL_TELEGRAM_CHAT_ID` are configured
- send independent SMTP email alerts for escalated strategy plugin signals when
`CRISIS_ALERT_*` is configured
- write email alert results into the response and suppress duplicate plugin
alert keys through `STRATEGY_PLUGIN_ALERT_STATE_GCS_URI`, `EXECUTION_REPORT_GCS_URI`,
or the configured Firstrade state bucket

The default mode remains dry-run. A live HTTP-triggered strategy order requires
all of these gates:
Expand Down Expand Up @@ -313,6 +316,8 @@ Firstrade 登录、账户/行情读取、下单转换、安全闸和部署 wirin
- 配置 `TELEGRAM_TOKEN` 和 `GLOBAL_TELEGRAM_CHAT_ID` 后发送运行摘要
- 读取通用策略插件信号,并在危机类插件触发时通过 `CRISIS_ALERT_*`
配置发送独立邮件告警
- 在响应中写入邮件告警结果,并通过 `STRATEGY_PLUGIN_ALERT_STATE_GCS_URI`、
`EXECUTION_REPORT_GCS_URI` 或已配置的 Firstrade state bucket 抑制重复插件告警 key
- 在你再次确认后,才允许极小金额实盘验证
- 通用 `us_equity` 策略 profile 的平台层接入

Expand Down
98 changes: 53 additions & 45 deletions application/rebalance_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,17 @@
required_semiconductor_rotation_history_lookback,
)
from quant_platform_kit.common.strategy_plugins import (
build_strategy_plugin_alert_messages,
build_strategy_plugin_notification_lines,
build_strategy_plugin_report_payload,
load_configured_strategy_plugin_signals,
parse_strategy_plugin_mounts,
)
from quant_platform_kit.notifications.email import send_smtp_email
from quant_platform_kit.notifications.events import NotificationPublisher, RenderedNotification
from quant_platform_kit.notifications.strategy_plugin_email import (
StrategyPluginEmailAlertMarkerStore,
build_strategy_plugin_alert_context_label as build_email_alert_context_label,
publish_strategy_plugin_email_alerts,
)
from quant_platform_kit.strategy_contracts import build_strategy_evaluation_inputs
from runtime_config_support import PlatformRuntimeSettings, load_platform_runtime_settings
from strategy_runtime import load_strategy_runtime
Expand Down Expand Up @@ -208,36 +211,30 @@ def attach_strategy_plugin_result(
return result


def _call_log_message(log_message: Callable[..., Any], text: str) -> None:
try:
log_message(text, flush=True)
except TypeError:
log_message(text)
def build_strategy_plugin_alert_context_label(settings: PlatformRuntimeSettings) -> str:
return build_email_alert_context_label(
platform_id="firstrade",
strategy_profile=settings.strategy_profile,
account_scope=settings.account_region or settings.account_prefix,
service_name=settings.account_prefix,
runtime_target=settings.runtime_target,
)


def send_crisis_alert_email(
alert_message,
*,
def build_strategy_plugin_alert_store(
settings: PlatformRuntimeSettings,
smtp_module=None,
log_message: Callable[..., Any] = print,
) -> bool:
send_kwargs: dict[str, Any] = {}
if smtp_module is not None:
send_kwargs["smtp_module"] = smtp_module
return send_smtp_email(
subject=alert_message.subject,
body=alert_message.body,
smtp_host=getattr(settings, "crisis_alert_smtp_host", None),
smtp_port=getattr(settings, "crisis_alert_smtp_port", 587),
sender=getattr(settings, "crisis_alert_email_from", None),
recipients=getattr(settings, "crisis_alert_email_to", ()),
username=getattr(settings, "crisis_alert_smtp_username", None),
password=getattr(settings, "crisis_alert_smtp_password", None),
use_starttls=getattr(settings, "crisis_alert_smtp_starttls", True),
use_ssl=getattr(settings, "crisis_alert_smtp_ssl", False),
printer=lambda text, **_kwargs: _call_log_message(log_message, text),
**send_kwargs,
*,
env_reader: Callable[[str, str | None], str | None] = os.getenv,
):
explicit_gcs_uri = env_reader("STRATEGY_PLUGIN_ALERT_STATE_GCS_URI", None)
report_gcs_uri = env_reader("EXECUTION_REPORT_GCS_URI", None)
state_bucket = env_reader("FIRSTRADE_GCS_STATE_BUCKET", None)
state_prefix = env_reader("FIRSTRADE_STATE_PREFIX", "firstrade-platform") or "firstrade-platform"
state_gcs_uri = f"gs://{state_bucket}/{state_prefix}" if state_bucket else None
return StrategyPluginEmailAlertMarkerStore(
local_dir=env_reader("STRATEGY_PLUGIN_ALERT_STATE_DIR", None) or "/tmp/quant_strategy_plugin_alerts",
gcs_prefix_uri=explicit_gcs_uri or report_gcs_uri or state_gcs_uri,
gcp_project_id=settings.project_id,
)


Expand All @@ -247,22 +244,17 @@ def publish_strategy_plugin_alerts(
settings: PlatformRuntimeSettings,
translator: Callable[..., str],
log_message: Callable[..., Any] = print,
) -> int:
sent_count = 0
for alert_message in build_strategy_plugin_alert_messages(
env_reader: Callable[[str, str | None], str | None] = os.getenv,
):
return publish_strategy_plugin_email_alerts(
signals,
email_settings=settings,
translator=translator,
strategy_label=settings.strategy_profile,
):
if send_crisis_alert_email(
alert_message,
settings=settings,
log_message=log_message,
):
sent_count += 1
if sent_count:
_call_log_message(log_message, f"strategy_plugin_alert_email_sent count={sent_count}")
return sent_count
context_label=build_strategy_plugin_alert_context_label(settings),
alert_store=build_strategy_plugin_alert_store(settings, env_reader=env_reader),
log_message=log_message,
)


def _runtime_metadata_with_execution_policy(
Expand Down Expand Up @@ -388,21 +380,26 @@ def run_strategy_cycle(
}
],
"action_done": False,
"strategy_plugin_alert_email_attempted_count": 0,
"strategy_plugin_alert_email_sent_count": 0,
"strategy_plugin_alert_email_skipped_count": 0,
"strategy_plugin_alert_email_failed_count": 0,
"strategy_plugin_alert_email_deliveries": [],
}
return attach_strategy_plugin_result(
result,
signals=strategy_plugin_signals,
error=strategy_plugin_error,
translator=translator,
)
strategy_plugin_alert_email_sent_count = 0
strategy_plugin_alert_email_result = None
strategy_plugin_alert_email_error = None
try:
strategy_plugin_alert_email_sent_count = publish_strategy_plugin_alerts(
strategy_plugin_alert_email_result = publish_strategy_plugin_alerts(
strategy_plugin_signals,
settings=settings,
translator=translator,
env_reader=env_reader,
)
except Exception as exc:
strategy_plugin_alert_email_error = f"{type(exc).__name__}: {exc}"
Expand Down Expand Up @@ -482,7 +479,18 @@ def run_strategy_cycle(
result["funding_blocked"] = True
if strategy_run_persistence_error:
result["strategy_run_persistence_error"] = strategy_run_persistence_error
result["strategy_plugin_alert_email_sent_count"] = strategy_plugin_alert_email_sent_count
if strategy_plugin_alert_email_result is not None:
result.update(strategy_plugin_alert_email_result.to_report_fields())
else:
result.update(
{
"strategy_plugin_alert_email_attempted_count": 0,
"strategy_plugin_alert_email_sent_count": 0,
"strategy_plugin_alert_email_skipped_count": 0,
"strategy_plugin_alert_email_failed_count": 0,
"strategy_plugin_alert_email_deliveries": [],
}
)
if strategy_plugin_alert_email_error:
result["strategy_plugin_alert_email_error"] = strategy_plugin_alert_email_error
attach_strategy_plugin_result(
Expand Down
4 changes: 2 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
flask
gunicorn
firstrade==0.0.38
quant-platform-kit @ git+https://github.com/QuantStrategyLab/QuantPlatformKit.git@1b6febbba7df81179ad7579f430c26a811c0e1a8
us-equity-strategies @ git+https://github.com/QuantStrategyLab/UsEquityStrategies.git@1636271a3e0c17fc0c5da363f67eabe114eeff48
quant-platform-kit @ git+https://github.com/QuantStrategyLab/QuantPlatformKit.git@ba67541711228f5a72a294def0e5cc24cc5479f3
us-equity-strategies @ git+https://github.com/QuantStrategyLab/UsEquityStrategies.git@305f2cc0748ec08d001deabc3add6c4eff7fe7ba
google-cloud-storage
requests
pytest
26 changes: 20 additions & 6 deletions tests/test_rebalance_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,10 +239,23 @@ def test_run_strategy_cycle_loads_strategy_plugin_report_and_sends_email(
"application.rebalance_service.load_strategy_runtime",
lambda *_args, **_kwargs: FakeStrategyRuntime(),
)
monkeypatch.setattr(
"application.rebalance_service.send_crisis_alert_email",
lambda alert_message, **_kwargs: observed_alerts.append(alert_message) or True,
)

def fake_publish(signals, **kwargs):
observed_alerts.append((tuple(signals), kwargs))
return SimpleNamespace(
sent_count=1,
to_report_fields=lambda: {
"strategy_plugin_alert_email_attempted_count": 1,
"strategy_plugin_alert_email_sent_count": 1,
"strategy_plugin_alert_email_skipped_count": 0,
"strategy_plugin_alert_email_failed_count": 0,
"strategy_plugin_alert_email_deliveries": [
{"subject": "Crisis plugin alert", "status": "sent"}
],
},
)

monkeypatch.setattr("application.rebalance_service.publish_strategy_plugin_email_alerts", fake_publish)

result = run_strategy_cycle(
runtime_settings=settings,
Expand All @@ -258,8 +271,9 @@ def test_run_strategy_cycle_loads_strategy_plugin_report_and_sends_email(
"🧩 Plugin: Crisis Watch Notice | status: true crisis | notice: defend",
)
assert len(observed_alerts) == 1
assert observed_alerts[0].subject == "🚨 Crisis plugin alert: Crisis Watch Notice | true crisis"
assert "Would trade if enabled: true" in observed_alerts[0].body
assert observed_alerts[0][0][0].canonical_route == "true_crisis"
assert "firstrade" in observed_alerts[0][1]["context_label"]
assert result["strategy_plugin_alert_email_deliveries"][0]["status"] == "sent"
assert "🧩 Plugin: Crisis Watch Notice | status: true crisis | notice: defend" in messages[0]


Expand Down