diff --git a/application/rebalance_service.py b/application/rebalance_service.py index eae9674..3056cbc 100644 --- a/application/rebalance_service.py +++ b/application/rebalance_service.py @@ -8,6 +8,8 @@ from collections.abc import Mapping from datetime import datetime +from notifications.events import NotificationPublisher, RenderedNotification + _ZH_REASON_REPLACEMENTS = ( ("feature snapshot guard blocked execution", "特征快照校验阻止执行"), ("feature snapshot required", "需要特征快照"), @@ -303,6 +305,10 @@ def run_strategy( sleeper=_noop_sleep, ): print(with_prefix(f"[{datetime.now()}] Starting strategy..."), flush=True) + notification_publisher = NotificationPublisher( + log_message=lambda message: print(with_prefix(message), flush=True), + send_message=send_tg_message, + ) token = refresh_token_if_needed( fetch_token_from_secret(project_id, secret_name), @@ -636,8 +642,12 @@ def record_dry_run(symbol, side, quantity, price, *, order_type): ) compact_lines.extend([separator, translator("order_logs_title"), formatted_logs]) compact_tg_message = "\n".join(compact_lines) - print(with_prefix(detailed_tg_message), flush=True) - send_tg_message(compact_tg_message) + notification_publisher.publish( + RenderedNotification( + detailed_text=detailed_tg_message, + compact_text=compact_tg_message, + ) + ) else: no_trade_lines = [ translator("heartbeat_title"), @@ -703,8 +713,12 @@ def record_dry_run(symbol, side, quantity, price, *, order_type): compact_no_trade_lines.extend([separator, translator("notes_title")]) compact_no_trade_lines.extend(f" - {log}" for log in note_logs) compact_no_trade_message = "\n".join(compact_no_trade_lines) - print(with_prefix(no_trade_message), flush=True) - send_tg_message(compact_no_trade_message) + notification_publisher.publish( + RenderedNotification( + detailed_text=no_trade_message, + compact_text=compact_no_trade_message, + ) + ) def safe_quote_last_price(quote_context, symbol, *, fetch_last_price, notify_issue): diff --git a/main.py b/main.py index 628caca..49b0cb9 100644 --- a/main.py +++ b/main.py @@ -24,6 +24,7 @@ send_order_status_message as notifications_send_order_status_message, submit_order_with_alert as notifications_submit_order_with_alert, ) +from notifications.events import NotificationPublisher, RenderedNotification from notifications.telegram import ( build_issue_notifier, build_prefixer, @@ -131,6 +132,20 @@ def with_prefix(message: str) -> str: def send_tg_message(message): return build_sender(TG_TOKEN, TG_CHAT_ID, with_prefix_fn=with_prefix)(message) + +def publish_notification(*, detailed_text, compact_text): + publisher = NotificationPublisher( + log_message=lambda message: print(with_prefix(message), flush=True), + send_message=send_tg_message, + ) + publisher.publish( + RenderedNotification( + detailed_text=detailed_text, + compact_text=compact_text, + ) + ) + + def notify_issue(title, detail): return build_issue_notifier(with_prefix_fn=with_prefix, send_tg_message_fn=send_tg_message)(title, detail) @@ -447,8 +462,10 @@ def run_strategy(): error_message=str(exc), ) err = traceback.format_exc() - print(with_prefix(f"Strategy error:\n{err}"), flush=True) - send_tg_message(f"{t('error_title')}\n{err}") + publish_notification( + detailed_text=f"Strategy error:\n{err}", + compact_text=f"{t('error_title')}\n{err}", + ) finally: try: report_path = persist_execution_report(report) diff --git a/notifications/events.py b/notifications/events.py new file mode 100644 index 0000000..bab787d --- /dev/null +++ b/notifications/events.py @@ -0,0 +1,44 @@ +"""Notification event envelope and delivery helpers.""" + +from __future__ import annotations + +from collections.abc import Callable +from dataclasses import dataclass + + +@dataclass(frozen=True) +class RenderedNotification: + """Rendered notification payload split by sink.""" + + detailed_text: str + compact_text: str + + +@dataclass(frozen=True) +class NotificationPublisher: + """Publish rendered notifications to the configured sinks.""" + + log_message: Callable[[str], None] + send_message: Callable[[str], None] + + def publish(self, notification: RenderedNotification) -> None: + publish_rendered_notification( + notification, + log_message=self.log_message, + send_message=self.send_message, + ) + + +def publish_rendered_notification( + notification: RenderedNotification, + *, + log_message: Callable[[str], None], + send_message: Callable[[str], None], +) -> None: + """Write the detailed log copy and send the compact user notification.""" + detailed = str(notification.detailed_text or "").strip() + compact = str(notification.compact_text or "").strip() + if detailed: + log_message(detailed) + if compact: + send_message(compact) diff --git a/notifications/telegram.py b/notifications/telegram.py index 5eb71be..0199a6a 100644 --- a/notifications/telegram.py +++ b/notifications/telegram.py @@ -4,6 +4,8 @@ import requests +from notifications.events import NotificationPublisher, RenderedNotification + SIGNAL_ICONS = { "hold": "💎", @@ -223,9 +225,18 @@ def send_tg_message(message): def build_issue_notifier(*, with_prefix_fn, send_tg_message_fn): + publisher = NotificationPublisher( + log_message=lambda message: print(with_prefix_fn(message), flush=True), + send_message=send_tg_message_fn, + ) + def notify_issue(title, detail): message = f"{title}\n{detail}" - print(with_prefix_fn(message), flush=True) - send_tg_message_fn(message) + publisher.publish( + RenderedNotification( + detailed_text=message, + compact_text=message, + ) + ) return notify_issue diff --git a/tests/test_notification_events.py b/tests/test_notification_events.py new file mode 100644 index 0000000..06bbe44 --- /dev/null +++ b/tests/test_notification_events.py @@ -0,0 +1,60 @@ +import unittest + +from notifications.events import ( + NotificationPublisher, + RenderedNotification, + publish_rendered_notification, +) + + +class NotificationEventsTests(unittest.TestCase): + def test_publish_rendered_notification_splits_log_and_send_sinks(self): + logged = [] + sent = [] + + publish_rendered_notification( + RenderedNotification( + detailed_text="detailed copy", + compact_text="compact copy", + ), + log_message=logged.append, + send_message=sent.append, + ) + + self.assertEqual(logged, ["detailed copy"]) + self.assertEqual(sent, ["compact copy"]) + + def test_publish_rendered_notification_skips_empty_sinks(self): + logged = [] + sent = [] + + publish_rendered_notification( + RenderedNotification(detailed_text=" ", compact_text=""), + log_message=logged.append, + send_message=sent.append, + ) + + self.assertEqual(logged, []) + self.assertEqual(sent, []) + + def test_notification_publisher_uses_configured_sinks(self): + logged = [] + sent = [] + publisher = NotificationPublisher( + log_message=logged.append, + send_message=sent.append, + ) + + publisher.publish( + RenderedNotification( + detailed_text="detailed copy", + compact_text="compact copy", + ) + ) + + self.assertEqual(logged, ["detailed copy"]) + self.assertEqual(sent, ["compact copy"]) + + +if __name__ == "__main__": + unittest.main()