diff --git a/pyproject.toml b/pyproject.toml index 864649e..6938cc1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "quant-platform-kit" -version = "0.7.25" +version = "0.7.26" description = "Shared broker adapters, domain models, execution ports, and notification utilities for QuantStrategyLab strategies." readme = "README.md" requires-python = ">=3.9" diff --git a/src/quant_platform_kit/notifications/__init__.py b/src/quant_platform_kit/notifications/__init__.py index 8a6174f..73b02fd 100644 --- a/src/quant_platform_kit/notifications/__init__.py +++ b/src/quant_platform_kit/notifications/__init__.py @@ -3,6 +3,12 @@ from .email import parse_email_recipients, send_smtp_email from .events import NotificationPublisher, RenderedNotification, publish_rendered_notification from .sms import normalize_sms_recipient, parse_sms_recipients, send_twilio_sms +from .strategy_plugin_alerts import ( + StrategyPluginAlertChannelStores, + StrategyPluginAlertPublishResult, + StrategyPluginAlertStateSettings, + publish_strategy_plugin_alerts, +) from .strategy_plugin_email import ( StrategyPluginEmailAlertDelivery, StrategyPluginEmailAlertMarkerStore, @@ -22,6 +28,9 @@ __all__ = [ "NotificationPublisher", "RenderedNotification", + "StrategyPluginAlertChannelStores", + "StrategyPluginAlertPublishResult", + "StrategyPluginAlertStateSettings", "StrategyPluginEmailAlertDelivery", "StrategyPluginEmailAlertMarkerStore", "StrategyPluginEmailAlertPublishResult", @@ -35,6 +44,7 @@ "parse_email_recipients", "parse_sms_recipients", "publish_rendered_notification", + "publish_strategy_plugin_alerts", "publish_strategy_plugin_email_alerts", "publish_strategy_plugin_sms_alerts", "send_smtp_email", diff --git a/src/quant_platform_kit/notifications/strategy_plugin_alerts.py b/src/quant_platform_kit/notifications/strategy_plugin_alerts.py new file mode 100644 index 0000000..3d02d64 --- /dev/null +++ b/src/quant_platform_kit/notifications/strategy_plugin_alerts.py @@ -0,0 +1,235 @@ +"""Channel dispatcher for strategy plugin alerts.""" + +from __future__ import annotations + +import os +from collections.abc import Callable, Mapping, Sequence +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +from .email import send_smtp_email +from .sms import send_twilio_sms +from .strategy_plugin_email import ( + StrategyPluginEmailAlertMarkerStore, + StrategyPluginEmailAlertPublishResult, + StrategyPluginEmailSettings, + build_strategy_plugin_alert_context_label, + publish_strategy_plugin_email_alerts, +) +from .strategy_plugin_sms import ( + StrategyPluginSmsAlertMarkerStore, + StrategyPluginSmsAlertPublishResult, + StrategyPluginSmsSettings, + publish_strategy_plugin_sms_alerts, +) + +_DEFAULT_ALERT_STATE_DIR = "/tmp/quant_strategy_plugin_alerts" +_CHANNEL_EMAIL = "email" +_CHANNEL_SMS = "sms" +_SUPPORTED_CHANNELS = frozenset({_CHANNEL_EMAIL, _CHANNEL_SMS}) + + +@dataclass(frozen=True) +class StrategyPluginAlertChannelStores: + """Marker stores used by each alert channel.""" + + email: StrategyPluginEmailAlertMarkerStore | object | None = None + sms: StrategyPluginSmsAlertMarkerStore | object | None = None + + @classmethod + def from_mapping( + cls, + value: Mapping[str, object | None] | None, + ) -> "StrategyPluginAlertChannelStores": + if value is None: + return cls() + return cls(email=value.get(_CHANNEL_EMAIL), sms=value.get(_CHANNEL_SMS)) + + +@dataclass(frozen=True) +class StrategyPluginAlertStateSettings: + """Shared marker-store location for strategy plugin alert channels.""" + + local_dir: str | Path | None = _DEFAULT_ALERT_STATE_DIR + gcs_prefix_uri: str | None = None + gcp_project_id: str | None = None + client_factory: Any = None + + @classmethod + def from_env( + cls, + *, + env_reader: Callable[[str, str | None], str | None] = os.getenv, + gcp_project_id: str | None = None, + fallback_gcs_prefix_uri: str | None = None, + default_local_dir: str | Path | None = _DEFAULT_ALERT_STATE_DIR, + ) -> "StrategyPluginAlertStateSettings": + explicit_gcs_uri = env_reader("STRATEGY_PLUGIN_ALERT_STATE_GCS_URI", None) + report_gcs_uri = env_reader("EXECUTION_REPORT_GCS_URI", None) + local_dir = env_reader("STRATEGY_PLUGIN_ALERT_STATE_DIR", None) + return cls( + local_dir=local_dir or default_local_dir, + gcs_prefix_uri=explicit_gcs_uri or report_gcs_uri or fallback_gcs_prefix_uri, + gcp_project_id=gcp_project_id, + ) + + def build_channel_stores(self) -> StrategyPluginAlertChannelStores: + return StrategyPluginAlertChannelStores( + email=StrategyPluginEmailAlertMarkerStore( + local_dir=self.local_dir, + gcs_prefix_uri=self.gcs_prefix_uri, + gcp_project_id=self.gcp_project_id, + client_factory=self.client_factory, + ), + sms=StrategyPluginSmsAlertMarkerStore( + local_dir=self.local_dir, + gcs_prefix_uri=self.gcs_prefix_uri, + gcp_project_id=self.gcp_project_id, + client_factory=self.client_factory, + ), + ) + + +@dataclass(frozen=True) +class StrategyPluginAlertPublishResult: + """Combined delivery result across strategy plugin alert channels.""" + + email_result: StrategyPluginEmailAlertPublishResult | None = None + sms_result: StrategyPluginSmsAlertPublishResult | None = None + + @property + def attempted_count(self) -> int: + return sum(result.attempted_count for result in self._results()) + + @property + def sent_count(self) -> int: + return sum(result.sent_count for result in self._results()) + + @property + def skipped_count(self) -> int: + return sum(result.skipped_count for result in self._results()) + + @property + def failed_count(self) -> int: + return sum(result.failed_count for result in self._results()) + + def to_report_fields(self) -> dict[str, Any]: + fields: dict[str, Any] = { + "strategy_plugin_alert_attempted_count": self.attempted_count, + "strategy_plugin_alert_sent_count": self.sent_count, + "strategy_plugin_alert_skipped_count": self.skipped_count, + "strategy_plugin_alert_failed_count": self.failed_count, + } + if self.email_result is not None: + fields.update(self.email_result.to_report_fields()) + if self.sms_result is not None: + fields.update(self.sms_result.to_report_fields()) + return fields + + def to_summary_fields(self) -> dict[str, int]: + fields = { + "strategy_plugin_alert_sent_count": self.sent_count, + } + if self.email_result is not None: + fields["strategy_plugin_alert_email_sent_count"] = self.email_result.sent_count + if self.sms_result is not None: + fields["strategy_plugin_alert_sms_sent_count"] = self.sms_result.sent_count + return fields + + def attach_to_report(self, report: dict[str, Any]) -> None: + report.setdefault("summary", {}).update(self.to_summary_fields()) + report.setdefault("diagnostics", {}).update(self.to_report_fields()) + + def _results( + self, + ) -> tuple[StrategyPluginEmailAlertPublishResult | StrategyPluginSmsAlertPublishResult, ...]: + return tuple( + result + for result in (self.email_result, self.sms_result) + if result is not None + ) + + +def publish_strategy_plugin_alerts( + signals: Sequence[object], + *, + notification_settings: StrategyPluginEmailSettings | StrategyPluginSmsSettings | object, + translator: Callable[..., str] | None = None, + strategy_label: str | None = None, + context_label: str | None = None, + channels: Sequence[str] | str = (_CHANNEL_EMAIL, _CHANNEL_SMS), + state_settings: StrategyPluginAlertStateSettings | None = None, + alert_stores: StrategyPluginAlertChannelStores | Mapping[str, object | None] | None = None, + send_email_notification: Callable[..., bool] = send_smtp_email, + send_sms_notification: Callable[..., bool] = send_twilio_sms, + log_message: Callable[..., Any] = print, +) -> StrategyPluginAlertPublishResult: + """Publish strategy plugin alerts through the configured notification channels.""" + + selected_channels = _normalize_channels(channels) + stores = _resolve_alert_stores(alert_stores=alert_stores, state_settings=state_settings) + email_result = None + sms_result = None + if _CHANNEL_EMAIL in selected_channels: + email_result = publish_strategy_plugin_email_alerts( + signals, + email_settings=notification_settings, + translator=translator, + strategy_label=strategy_label, + context_label=context_label, + alert_store=stores.email, + send_notification=send_email_notification, + log_message=log_message, + ) + if _CHANNEL_SMS in selected_channels: + sms_result = publish_strategy_plugin_sms_alerts( + signals, + sms_settings=notification_settings, + translator=translator, + strategy_label=strategy_label, + context_label=context_label, + alert_store=stores.sms, + send_notification=send_sms_notification, + log_message=log_message, + ) + return StrategyPluginAlertPublishResult( + email_result=email_result, + sms_result=sms_result, + ) + + +def _resolve_alert_stores( + *, + alert_stores: StrategyPluginAlertChannelStores | Mapping[str, object | None] | None, + state_settings: StrategyPluginAlertStateSettings | None, +) -> StrategyPluginAlertChannelStores: + if isinstance(alert_stores, StrategyPluginAlertChannelStores): + return alert_stores + if isinstance(alert_stores, Mapping): + return StrategyPluginAlertChannelStores.from_mapping(alert_stores) + return (state_settings or StrategyPluginAlertStateSettings.from_env()).build_channel_stores() + + +def _normalize_channels(channels: Sequence[str] | str) -> tuple[str, ...]: + raw_channels = (channels,) if isinstance(channels, str) else tuple(channels) + normalized: list[str] = [] + for channel in raw_channels: + name = str(channel or "").strip().lower() + if not name: + continue + if name not in _SUPPORTED_CHANNELS: + supported = ", ".join(sorted(_SUPPORTED_CHANNELS)) + raise ValueError(f"unsupported strategy plugin alert channel {name!r}; expected one of: {supported}") + if name not in normalized: + normalized.append(name) + return tuple(normalized) + + +__all__ = [ + "StrategyPluginAlertChannelStores", + "StrategyPluginAlertPublishResult", + "StrategyPluginAlertStateSettings", + "build_strategy_plugin_alert_context_label", + "publish_strategy_plugin_alerts", +] diff --git a/tests/test_strategy_plugin_alert_dispatcher.py b/tests/test_strategy_plugin_alert_dispatcher.py new file mode 100644 index 0000000..ae0bdc4 --- /dev/null +++ b/tests/test_strategy_plugin_alert_dispatcher.py @@ -0,0 +1,176 @@ +import tempfile +import unittest +from types import SimpleNamespace + +from quant_platform_kit.notifications.strategy_plugin_alerts import ( + StrategyPluginAlertStateSettings, + publish_strategy_plugin_alerts, +) +from quant_platform_kit.notifications.strategy_plugin_email import StrategyPluginEmailSettings +from quant_platform_kit.notifications.strategy_plugin_sms import StrategyPluginSmsSettings + + +def _alert_signal(): + return SimpleNamespace( + strategy="tqqq_growth_income", + plugin="crisis_response_shadow", + effective_mode="shadow", + as_of="2026-05-24", + canonical_route="true_crisis", + suggested_action="defend", + would_trade_if_enabled=True, + ) + + +class _NotificationSettings: + crisis_alert_email_recipients = "risk@example.com" + crisis_alert_email_sender_email = "bot@example.com" + crisis_alert_email_sender_password = "app-password" + crisis_alert_sms_recipients = "+15165480265" + crisis_alert_sms_account_id = "AC123" + crisis_alert_sms_auth_token = "secret" + crisis_alert_sms_sender = "+15551234567" + + +class StrategyPluginAlertDispatcherTests(unittest.TestCase): + def test_publish_strategy_plugin_alerts_dispatches_enabled_channels(self): + emails = [] + sms_messages = [] + + with tempfile.TemporaryDirectory() as tmp_dir: + result = publish_strategy_plugin_alerts( + [_alert_signal()], + notification_settings=_NotificationSettings(), + strategy_label="TQQQ", + context_label="schwab / tqqq", + state_settings=StrategyPluginAlertStateSettings(local_dir=tmp_dir), + send_email_notification=lambda **kwargs: emails.append(kwargs) or True, + send_sms_notification=lambda **kwargs: sms_messages.append(kwargs) or True, + log_message=lambda *_args, **_kwargs: None, + ) + + self.assertEqual(result.sent_count, 2) + self.assertEqual(result.failed_count, 0) + self.assertIsNotNone(result.email_result) + self.assertIsNotNone(result.sms_result) + self.assertEqual(result.email_result.sent_count, 1) + self.assertEqual(result.sms_result.sent_count, 1) + self.assertEqual(emails[0]["recipients"], ("risk@example.com",)) + self.assertEqual(sms_messages[0]["recipients"], ("+15165480265",)) + + def test_publish_strategy_plugin_alerts_records_channel_dedupe_independently(self): + settings = _NotificationSettings() + + with tempfile.TemporaryDirectory() as tmp_dir: + state_settings = StrategyPluginAlertStateSettings(local_dir=tmp_dir) + first = publish_strategy_plugin_alerts( + [_alert_signal()], + notification_settings=settings, + strategy_label="TQQQ", + context_label="schwab / tqqq", + state_settings=state_settings, + send_email_notification=lambda **_kwargs: True, + send_sms_notification=lambda **_kwargs: True, + log_message=lambda *_args, **_kwargs: None, + ) + second = publish_strategy_plugin_alerts( + [_alert_signal()], + notification_settings=settings, + strategy_label="TQQQ", + context_label="schwab / tqqq", + state_settings=state_settings, + send_email_notification=lambda **_kwargs: True, + send_sms_notification=lambda **_kwargs: True, + log_message=lambda *_args, **_kwargs: None, + ) + + self.assertEqual(first.sent_count, 2) + self.assertEqual(second.sent_count, 0) + self.assertEqual(second.skipped_count, 2) + self.assertIsNotNone(second.email_result) + self.assertEqual(second.email_result.deliveries[0].reason, "duplicate_alert") + self.assertIsNotNone(second.sms_result) + self.assertEqual(second.sms_result.deliveries[0].reason, "duplicate_alert") + + def test_publish_strategy_plugin_alerts_can_target_one_channel(self): + sms_messages = [] + + with tempfile.TemporaryDirectory() as tmp_dir: + result = publish_strategy_plugin_alerts( + [_alert_signal()], + notification_settings=StrategyPluginSmsSettings( + recipients=("+15165480265",), + account_id="AC123", + auth_token="secret", + sender="+15551234567", + ), + channels=("sms",), + strategy_label="TQQQ", + context_label="schwab / tqqq", + state_settings=StrategyPluginAlertStateSettings(local_dir=tmp_dir), + send_email_notification=lambda **_kwargs: self.fail("email should not run"), + send_sms_notification=lambda **kwargs: sms_messages.append(kwargs) or True, + log_message=lambda *_args, **_kwargs: None, + ) + + self.assertIsNone(result.email_result) + self.assertIsNotNone(result.sms_result) + self.assertEqual(result.sent_count, 1) + self.assertTrue(sms_messages) + + def test_publish_strategy_plugin_alerts_attach_to_report(self): + with tempfile.TemporaryDirectory() as tmp_dir: + result = publish_strategy_plugin_alerts( + [_alert_signal()], + notification_settings=StrategyPluginEmailSettings( + recipients=("risk@example.com",), + sender_email="bot@example.com", + sender_password="app-password", + ), + channels=("email",), + strategy_label="TQQQ", + context_label="schwab / tqqq", + state_settings=StrategyPluginAlertStateSettings(local_dir=tmp_dir), + send_email_notification=lambda **_kwargs: True, + log_message=lambda *_args, **_kwargs: None, + ) + report = {} + + result.attach_to_report(report) + + self.assertEqual(report["summary"]["strategy_plugin_alert_sent_count"], 1) + self.assertEqual(report["summary"]["strategy_plugin_alert_email_sent_count"], 1) + self.assertEqual(report["diagnostics"]["strategy_plugin_alert_sent_count"], 1) + self.assertEqual(report["diagnostics"]["strategy_plugin_alert_email_sent_count"], 1) + self.assertNotIn("strategy_plugin_alert_sms_sent_count", report["summary"]) + + def test_publish_strategy_plugin_alerts_rejects_unknown_channel(self): + with tempfile.TemporaryDirectory() as tmp_dir: + with self.assertRaisesRegex(ValueError, "unsupported strategy plugin alert channel"): + publish_strategy_plugin_alerts( + [_alert_signal()], + notification_settings=_NotificationSettings(), + channels=("pager",), + state_settings=StrategyPluginAlertStateSettings(local_dir=tmp_dir), + log_message=lambda *_args, **_kwargs: None, + ) + + def test_strategy_plugin_alert_state_settings_reads_env_with_fallback(self): + values = { + "STRATEGY_PLUGIN_ALERT_STATE_DIR": "/tmp/custom-alerts", + "EXECUTION_REPORT_GCS_URI": "gs://reports/runtime", + } + + settings = StrategyPluginAlertStateSettings.from_env( + env_reader=lambda name, default=None: values.get(name, default), + gcp_project_id="project-a", + fallback_gcs_prefix_uri="gs://state/fallback", + ) + + self.assertEqual(settings.local_dir, "/tmp/custom-alerts") + self.assertEqual(settings.gcs_prefix_uri, "gs://reports/runtime") + self.assertEqual(settings.gcp_project_id, "project-a") + + +if __name__ == "__main__": + unittest.main()