From 5d2b8f55f0445b3e481b02e5f1e39168772aabe2 Mon Sep 17 00:00:00 2001 From: Pigbibi <20649888+Pigbibi@users.noreply.github.com> Date: Mon, 25 May 2026 20:42:31 +0800 Subject: [PATCH] Use shared strategy plugin alert dispatcher --- application/rebalance_service.py | 130 ++++++++++--------------------- pyproject.toml | 4 +- requirements.txt | 4 +- tests/test_rebalance_service.py | 34 ++++---- 4 files changed, 58 insertions(+), 114 deletions(-) diff --git a/application/rebalance_service.py b/application/rebalance_service.py index cb65067..2e6aa33 100644 --- a/application/rebalance_service.py +++ b/application/rebalance_service.py @@ -51,14 +51,10 @@ parse_strategy_plugin_mounts, ) from quant_platform_kit.notifications.events import NotificationPublisher, RenderedNotification -from quant_platform_kit.notifications.strategy_plugin_email import ( - StrategyPluginEmailAlertMarkerStore, - build_strategy_plugin_alert_context_label as build_email_alert_context_label, - publish_strategy_plugin_email_alerts, -) -from quant_platform_kit.notifications.strategy_plugin_sms import ( - StrategyPluginSmsAlertMarkerStore, - publish_strategy_plugin_sms_alerts, +from quant_platform_kit.notifications.strategy_plugin_alerts import ( + StrategyPluginAlertStateSettings, + build_strategy_plugin_alert_context_label as build_alert_context_label, + publish_strategy_plugin_alerts as dispatch_strategy_plugin_alerts, ) from quant_platform_kit.strategy_contracts import build_strategy_evaluation_inputs from runtime_config_support import PlatformRuntimeSettings, load_platform_runtime_settings @@ -216,7 +212,7 @@ def attach_strategy_plugin_result( def build_strategy_plugin_alert_context_label(settings: PlatformRuntimeSettings) -> str: - return build_email_alert_context_label( + return build_alert_context_label( platform_id="firstrade", strategy_profile=settings.strategy_profile, account_scope=settings.account_region or settings.account_prefix, @@ -225,52 +221,21 @@ def build_strategy_plugin_alert_context_label(settings: PlatformRuntimeSettings) ) -def build_strategy_plugin_alert_store( +def build_strategy_plugin_alert_state_settings( settings: PlatformRuntimeSettings, *, env_reader: Callable[[str, str | None], str | None] = os.getenv, ): - explicit_gcs_uri = env_reader("STRATEGY_PLUGIN_ALERT_STATE_GCS_URI", None) - report_gcs_uri = env_reader("EXECUTION_REPORT_GCS_URI", None) state_bucket = env_reader("FIRSTRADE_GCS_STATE_BUCKET", None) state_prefix = env_reader("FIRSTRADE_STATE_PREFIX", "firstrade-platform") or "firstrade-platform" state_gcs_uri = f"gs://{state_bucket}/{state_prefix}" if state_bucket else None - return StrategyPluginEmailAlertMarkerStore( - local_dir=env_reader("STRATEGY_PLUGIN_ALERT_STATE_DIR", None) or "/tmp/quant_strategy_plugin_alerts", - gcs_prefix_uri=explicit_gcs_uri or report_gcs_uri or state_gcs_uri, + return StrategyPluginAlertStateSettings.from_env( + env_reader=env_reader, gcp_project_id=settings.project_id, + fallback_gcs_prefix_uri=state_gcs_uri, ) -def build_strategy_plugin_sms_alert_store( - settings: PlatformRuntimeSettings, - *, - env_reader: Callable[[str, str | None], str | None] = os.getenv, -): - explicit_gcs_uri = env_reader("STRATEGY_PLUGIN_ALERT_STATE_GCS_URI", None) - report_gcs_uri = env_reader("EXECUTION_REPORT_GCS_URI", None) - state_bucket = env_reader("FIRSTRADE_GCS_STATE_BUCKET", None) - state_prefix = env_reader("FIRSTRADE_STATE_PREFIX", "firstrade-platform") or "firstrade-platform" - state_gcs_uri = f"gs://{state_bucket}/{state_prefix}" if state_bucket else None - return StrategyPluginSmsAlertMarkerStore( - local_dir=env_reader("STRATEGY_PLUGIN_ALERT_STATE_DIR", None) or "/tmp/quant_strategy_plugin_alerts", - gcs_prefix_uri=explicit_gcs_uri or report_gcs_uri or state_gcs_uri, - gcp_project_id=settings.project_id, - ) - - -class StrategyPluginAlertPublishResults: - def __init__(self, *, email_result, sms_result): - self.email_result = email_result - self.sms_result = sms_result - - def to_report_fields(self) -> dict[str, Any]: - fields: dict[str, Any] = {} - fields.update(self.email_result.to_report_fields()) - fields.update(self.sms_result.to_report_fields()) - return fields - - def publish_strategy_plugin_alerts( signals, *, @@ -279,25 +244,34 @@ def publish_strategy_plugin_alerts( log_message: Callable[..., Any] = print, env_reader: Callable[[str, str | None], str | None] = os.getenv, ): - email_result = publish_strategy_plugin_email_alerts( + return dispatch_strategy_plugin_alerts( signals, - email_settings=settings, + notification_settings=settings, translator=translator, strategy_label=settings.strategy_profile, context_label=build_strategy_plugin_alert_context_label(settings), - alert_store=build_strategy_plugin_alert_store(settings, env_reader=env_reader), + state_settings=build_strategy_plugin_alert_state_settings(settings, env_reader=env_reader), log_message=log_message, ) - sms_result = publish_strategy_plugin_sms_alerts( - signals, - sms_settings=settings, - translator=translator, - strategy_label=settings.strategy_profile, - context_label=build_strategy_plugin_alert_context_label(settings), - alert_store=build_strategy_plugin_sms_alert_store(settings, env_reader=env_reader), - log_message=log_message, - ) - return StrategyPluginAlertPublishResults(email_result=email_result, sms_result=sms_result) + + +def empty_strategy_plugin_alert_report_fields() -> dict[str, Any]: + return { + "strategy_plugin_alert_attempted_count": 0, + "strategy_plugin_alert_sent_count": 0, + "strategy_plugin_alert_skipped_count": 0, + "strategy_plugin_alert_failed_count": 0, + "strategy_plugin_alert_email_attempted_count": 0, + "strategy_plugin_alert_email_sent_count": 0, + "strategy_plugin_alert_email_skipped_count": 0, + "strategy_plugin_alert_email_failed_count": 0, + "strategy_plugin_alert_email_deliveries": [], + "strategy_plugin_alert_sms_attempted_count": 0, + "strategy_plugin_alert_sms_sent_count": 0, + "strategy_plugin_alert_sms_skipped_count": 0, + "strategy_plugin_alert_sms_failed_count": 0, + "strategy_plugin_alert_sms_deliveries": [], + } def _runtime_metadata_with_execution_policy( @@ -423,16 +397,7 @@ def run_strategy_cycle( } ], "action_done": False, - "strategy_plugin_alert_email_attempted_count": 0, - "strategy_plugin_alert_email_sent_count": 0, - "strategy_plugin_alert_email_skipped_count": 0, - "strategy_plugin_alert_email_failed_count": 0, - "strategy_plugin_alert_email_deliveries": [], - "strategy_plugin_alert_sms_attempted_count": 0, - "strategy_plugin_alert_sms_sent_count": 0, - "strategy_plugin_alert_sms_skipped_count": 0, - "strategy_plugin_alert_sms_failed_count": 0, - "strategy_plugin_alert_sms_deliveries": [], + **empty_strategy_plugin_alert_report_fields(), } return attach_strategy_plugin_result( result, @@ -440,17 +405,17 @@ def run_strategy_cycle( error=strategy_plugin_error, translator=translator, ) - strategy_plugin_alert_email_result = None - strategy_plugin_alert_email_error = None + strategy_plugin_alert_result = None + strategy_plugin_alert_error = None try: - strategy_plugin_alert_email_result = publish_strategy_plugin_alerts( + strategy_plugin_alert_result = publish_strategy_plugin_alerts( strategy_plugin_signals, settings=settings, translator=translator, env_reader=env_reader, ) except Exception as exc: - strategy_plugin_alert_email_error = f"{type(exc).__name__}: {exc}" + strategy_plugin_alert_error = f"{type(exc).__name__}: {exc}" strategy_run_persisted = False strategy_run_persistence_error = None if persist_strategy_runs: @@ -527,25 +492,12 @@ def run_strategy_cycle( result["funding_blocked"] = True if strategy_run_persistence_error: result["strategy_run_persistence_error"] = strategy_run_persistence_error - if strategy_plugin_alert_email_result is not None: - result.update(strategy_plugin_alert_email_result.to_report_fields()) + if strategy_plugin_alert_result is not None: + result.update(strategy_plugin_alert_result.to_report_fields()) else: - result.update( - { - "strategy_plugin_alert_email_attempted_count": 0, - "strategy_plugin_alert_email_sent_count": 0, - "strategy_plugin_alert_email_skipped_count": 0, - "strategy_plugin_alert_email_failed_count": 0, - "strategy_plugin_alert_email_deliveries": [], - "strategy_plugin_alert_sms_attempted_count": 0, - "strategy_plugin_alert_sms_sent_count": 0, - "strategy_plugin_alert_sms_skipped_count": 0, - "strategy_plugin_alert_sms_failed_count": 0, - "strategy_plugin_alert_sms_deliveries": [], - } - ) - if strategy_plugin_alert_email_error: - result["strategy_plugin_alert_email_error"] = strategy_plugin_alert_email_error + result.update(empty_strategy_plugin_alert_report_fields()) + if strategy_plugin_alert_error: + result["strategy_plugin_alert_error"] = strategy_plugin_alert_error attach_strategy_plugin_result( result, signals=strategy_plugin_signals, diff --git a/pyproject.toml b/pyproject.toml index 584eb09..2b0c092 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,8 +14,8 @@ authors = [ ] dependencies = [ "firstrade==0.0.38", - "quant-platform-kit @ git+https://github.com/QuantStrategyLab/QuantPlatformKit.git@d43800180aae1c7fe7051496a6af5d76f2c65879", - "us-equity-strategies @ git+https://github.com/QuantStrategyLab/UsEquityStrategies.git@9661d8bb74e33466fa0ec1efef168b1d1bae8875", + "quant-platform-kit @ git+https://github.com/QuantStrategyLab/QuantPlatformKit.git@532346b6e22d9f9c0a25c5e859f29084d7b27f6c", + "us-equity-strategies @ git+https://github.com/QuantStrategyLab/UsEquityStrategies.git@7ccc46038e6f0fe84a6910a6fa0515252bf15b79", "google-cloud-storage", "requests", ] diff --git a/requirements.txt b/requirements.txt index bcacdd1..83f5373 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,8 +1,8 @@ flask gunicorn firstrade==0.0.38 -quant-platform-kit @ git+https://github.com/QuantStrategyLab/QuantPlatformKit.git@d43800180aae1c7fe7051496a6af5d76f2c65879 -us-equity-strategies @ git+https://github.com/QuantStrategyLab/UsEquityStrategies.git@9661d8bb74e33466fa0ec1efef168b1d1bae8875 +quant-platform-kit @ git+https://github.com/QuantStrategyLab/QuantPlatformKit.git@532346b6e22d9f9c0a25c5e859f29084d7b27f6c +us-equity-strategies @ git+https://github.com/QuantStrategyLab/UsEquityStrategies.git@7ccc46038e6f0fe84a6910a6fa0515252bf15b79 google-cloud-storage requests pytest diff --git a/tests/test_rebalance_service.py b/tests/test_rebalance_service.py index a58d0e0..4ff7e3c 100644 --- a/tests/test_rebalance_service.py +++ b/tests/test_rebalance_service.py @@ -233,19 +233,21 @@ def test_run_strategy_cycle_loads_strategy_plugin_report_and_sends_email( crisis_alert_email_sender_password="app-password", ) messages = [] - observed_email_alerts = [] - observed_sms_alerts = [] + observed_alerts = [] monkeypatch.setattr( "application.rebalance_service.load_strategy_runtime", lambda *_args, **_kwargs: FakeStrategyRuntime(), ) - def fake_email_publish(signals, **kwargs): - observed_email_alerts.append((tuple(signals), kwargs)) + def fake_dispatch(signals, **kwargs): + observed_alerts.append((tuple(signals), kwargs)) return SimpleNamespace( - sent_count=1, to_report_fields=lambda: { + "strategy_plugin_alert_attempted_count": 2, + "strategy_plugin_alert_sent_count": 2, + "strategy_plugin_alert_skipped_count": 0, + "strategy_plugin_alert_failed_count": 0, "strategy_plugin_alert_email_attempted_count": 1, "strategy_plugin_alert_email_sent_count": 1, "strategy_plugin_alert_email_skipped_count": 0, @@ -253,14 +255,6 @@ def fake_email_publish(signals, **kwargs): "strategy_plugin_alert_email_deliveries": [ {"subject": "Crisis plugin alert", "status": "sent"} ], - }, - ) - - def fake_sms_publish(signals, **kwargs): - observed_sms_alerts.append((tuple(signals), kwargs)) - return SimpleNamespace( - sent_count=1, - to_report_fields=lambda: { "strategy_plugin_alert_sms_attempted_count": 1, "strategy_plugin_alert_sms_sent_count": 1, "strategy_plugin_alert_sms_skipped_count": 0, @@ -271,8 +265,7 @@ def fake_sms_publish(signals, **kwargs): }, ) - monkeypatch.setattr("application.rebalance_service.publish_strategy_plugin_email_alerts", fake_email_publish) - monkeypatch.setattr("application.rebalance_service.publish_strategy_plugin_sms_alerts", fake_sms_publish) + monkeypatch.setattr("application.rebalance_service.dispatch_strategy_plugin_alerts", fake_dispatch) result = run_strategy_cycle( runtime_settings=settings, @@ -288,12 +281,11 @@ def fake_sms_publish(signals, **kwargs): assert result["strategy_plugin_lines"] == ( "🧩 Plugin: Crisis Watch Notice | status: true crisis | notice: defend", ) - assert len(observed_email_alerts) == 1 - assert len(observed_sms_alerts) == 1 - assert observed_email_alerts[0][0][0].canonical_route == "true_crisis" - assert observed_sms_alerts[0][0][0].canonical_route == "true_crisis" - assert "firstrade" in observed_email_alerts[0][1]["context_label"] - assert "firstrade" in observed_sms_alerts[0][1]["context_label"] + assert len(observed_alerts) == 1 + assert observed_alerts[0][0][0].canonical_route == "true_crisis" + assert "firstrade" in observed_alerts[0][1]["context_label"] + assert observed_alerts[0][1]["notification_settings"] is settings + assert observed_alerts[0][1]["state_settings"] is not None assert result["strategy_plugin_alert_email_deliveries"][0]["status"] == "sent" assert result["strategy_plugin_alert_sms_deliveries"][0]["status"] == "sent" assert "🧩 Plugin: Crisis Watch Notice | status: true crisis | notice: defend" in messages[0]