Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 41 additions & 89 deletions application/rebalance_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,10 @@
parse_strategy_plugin_mounts,
)
from quant_platform_kit.notifications.events import NotificationPublisher, RenderedNotification
from quant_platform_kit.notifications.strategy_plugin_email import (
StrategyPluginEmailAlertMarkerStore,
build_strategy_plugin_alert_context_label as build_email_alert_context_label,
publish_strategy_plugin_email_alerts,
)
from quant_platform_kit.notifications.strategy_plugin_sms import (
StrategyPluginSmsAlertMarkerStore,
publish_strategy_plugin_sms_alerts,
from quant_platform_kit.notifications.strategy_plugin_alerts import (
StrategyPluginAlertStateSettings,
build_strategy_plugin_alert_context_label as build_alert_context_label,
publish_strategy_plugin_alerts as dispatch_strategy_plugin_alerts,
)
from quant_platform_kit.strategy_contracts import build_strategy_evaluation_inputs
from runtime_config_support import PlatformRuntimeSettings, load_platform_runtime_settings
Expand Down Expand Up @@ -216,7 +212,7 @@ def attach_strategy_plugin_result(


def build_strategy_plugin_alert_context_label(settings: PlatformRuntimeSettings) -> str:
return build_email_alert_context_label(
return build_alert_context_label(
platform_id="firstrade",
strategy_profile=settings.strategy_profile,
account_scope=settings.account_region or settings.account_prefix,
Expand All @@ -225,52 +221,21 @@ def build_strategy_plugin_alert_context_label(settings: PlatformRuntimeSettings)
)


def build_strategy_plugin_alert_store(
def build_strategy_plugin_alert_state_settings(
settings: PlatformRuntimeSettings,
*,
env_reader: Callable[[str, str | None], str | None] = os.getenv,
):
explicit_gcs_uri = env_reader("STRATEGY_PLUGIN_ALERT_STATE_GCS_URI", None)
report_gcs_uri = env_reader("EXECUTION_REPORT_GCS_URI", None)
state_bucket = env_reader("FIRSTRADE_GCS_STATE_BUCKET", None)
state_prefix = env_reader("FIRSTRADE_STATE_PREFIX", "firstrade-platform") or "firstrade-platform"
state_gcs_uri = f"gs://{state_bucket}/{state_prefix}" if state_bucket else None
return StrategyPluginEmailAlertMarkerStore(
local_dir=env_reader("STRATEGY_PLUGIN_ALERT_STATE_DIR", None) or "/tmp/quant_strategy_plugin_alerts",
gcs_prefix_uri=explicit_gcs_uri or report_gcs_uri or state_gcs_uri,
return StrategyPluginAlertStateSettings.from_env(
env_reader=env_reader,
gcp_project_id=settings.project_id,
fallback_gcs_prefix_uri=state_gcs_uri,
)


def build_strategy_plugin_sms_alert_store(
settings: PlatformRuntimeSettings,
*,
env_reader: Callable[[str, str | None], str | None] = os.getenv,
):
explicit_gcs_uri = env_reader("STRATEGY_PLUGIN_ALERT_STATE_GCS_URI", None)
report_gcs_uri = env_reader("EXECUTION_REPORT_GCS_URI", None)
state_bucket = env_reader("FIRSTRADE_GCS_STATE_BUCKET", None)
state_prefix = env_reader("FIRSTRADE_STATE_PREFIX", "firstrade-platform") or "firstrade-platform"
state_gcs_uri = f"gs://{state_bucket}/{state_prefix}" if state_bucket else None
return StrategyPluginSmsAlertMarkerStore(
local_dir=env_reader("STRATEGY_PLUGIN_ALERT_STATE_DIR", None) or "/tmp/quant_strategy_plugin_alerts",
gcs_prefix_uri=explicit_gcs_uri or report_gcs_uri or state_gcs_uri,
gcp_project_id=settings.project_id,
)


class StrategyPluginAlertPublishResults:
def __init__(self, *, email_result, sms_result):
self.email_result = email_result
self.sms_result = sms_result

def to_report_fields(self) -> dict[str, Any]:
fields: dict[str, Any] = {}
fields.update(self.email_result.to_report_fields())
fields.update(self.sms_result.to_report_fields())
return fields


def publish_strategy_plugin_alerts(
signals,
*,
Expand All @@ -279,25 +244,34 @@ def publish_strategy_plugin_alerts(
log_message: Callable[..., Any] = print,
env_reader: Callable[[str, str | None], str | None] = os.getenv,
):
email_result = publish_strategy_plugin_email_alerts(
return dispatch_strategy_plugin_alerts(
signals,
email_settings=settings,
notification_settings=settings,
translator=translator,
strategy_label=settings.strategy_profile,
context_label=build_strategy_plugin_alert_context_label(settings),
alert_store=build_strategy_plugin_alert_store(settings, env_reader=env_reader),
state_settings=build_strategy_plugin_alert_state_settings(settings, env_reader=env_reader),
log_message=log_message,
)
sms_result = publish_strategy_plugin_sms_alerts(
signals,
sms_settings=settings,
translator=translator,
strategy_label=settings.strategy_profile,
context_label=build_strategy_plugin_alert_context_label(settings),
alert_store=build_strategy_plugin_sms_alert_store(settings, env_reader=env_reader),
log_message=log_message,
)
return StrategyPluginAlertPublishResults(email_result=email_result, sms_result=sms_result)


def empty_strategy_plugin_alert_report_fields() -> dict[str, Any]:
return {
"strategy_plugin_alert_attempted_count": 0,
"strategy_plugin_alert_sent_count": 0,
"strategy_plugin_alert_skipped_count": 0,
"strategy_plugin_alert_failed_count": 0,
"strategy_plugin_alert_email_attempted_count": 0,
"strategy_plugin_alert_email_sent_count": 0,
"strategy_plugin_alert_email_skipped_count": 0,
"strategy_plugin_alert_email_failed_count": 0,
"strategy_plugin_alert_email_deliveries": [],
"strategy_plugin_alert_sms_attempted_count": 0,
"strategy_plugin_alert_sms_sent_count": 0,
"strategy_plugin_alert_sms_skipped_count": 0,
"strategy_plugin_alert_sms_failed_count": 0,
"strategy_plugin_alert_sms_deliveries": [],
}


def _runtime_metadata_with_execution_policy(
Expand Down Expand Up @@ -423,34 +397,25 @@ def run_strategy_cycle(
}
],
"action_done": False,
"strategy_plugin_alert_email_attempted_count": 0,
"strategy_plugin_alert_email_sent_count": 0,
"strategy_plugin_alert_email_skipped_count": 0,
"strategy_plugin_alert_email_failed_count": 0,
"strategy_plugin_alert_email_deliveries": [],
"strategy_plugin_alert_sms_attempted_count": 0,
"strategy_plugin_alert_sms_sent_count": 0,
"strategy_plugin_alert_sms_skipped_count": 0,
"strategy_plugin_alert_sms_failed_count": 0,
"strategy_plugin_alert_sms_deliveries": [],
**empty_strategy_plugin_alert_report_fields(),
}
return attach_strategy_plugin_result(
result,
signals=strategy_plugin_signals,
error=strategy_plugin_error,
translator=translator,
)
strategy_plugin_alert_email_result = None
strategy_plugin_alert_email_error = None
strategy_plugin_alert_result = None
strategy_plugin_alert_error = None
try:
strategy_plugin_alert_email_result = publish_strategy_plugin_alerts(
strategy_plugin_alert_result = publish_strategy_plugin_alerts(
strategy_plugin_signals,
settings=settings,
translator=translator,
env_reader=env_reader,
)
except Exception as exc:
strategy_plugin_alert_email_error = f"{type(exc).__name__}: {exc}"
strategy_plugin_alert_error = f"{type(exc).__name__}: {exc}"
strategy_run_persisted = False
strategy_run_persistence_error = None
if persist_strategy_runs:
Expand Down Expand Up @@ -527,25 +492,12 @@ def run_strategy_cycle(
result["funding_blocked"] = True
if strategy_run_persistence_error:
result["strategy_run_persistence_error"] = strategy_run_persistence_error
if strategy_plugin_alert_email_result is not None:
result.update(strategy_plugin_alert_email_result.to_report_fields())
if strategy_plugin_alert_result is not None:
result.update(strategy_plugin_alert_result.to_report_fields())
else:
result.update(
{
"strategy_plugin_alert_email_attempted_count": 0,
"strategy_plugin_alert_email_sent_count": 0,
"strategy_plugin_alert_email_skipped_count": 0,
"strategy_plugin_alert_email_failed_count": 0,
"strategy_plugin_alert_email_deliveries": [],
"strategy_plugin_alert_sms_attempted_count": 0,
"strategy_plugin_alert_sms_sent_count": 0,
"strategy_plugin_alert_sms_skipped_count": 0,
"strategy_plugin_alert_sms_failed_count": 0,
"strategy_plugin_alert_sms_deliveries": [],
}
)
if strategy_plugin_alert_email_error:
result["strategy_plugin_alert_email_error"] = strategy_plugin_alert_email_error
result.update(empty_strategy_plugin_alert_report_fields())
if strategy_plugin_alert_error:
result["strategy_plugin_alert_error"] = strategy_plugin_alert_error
attach_strategy_plugin_result(
result,
signals=strategy_plugin_signals,
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ authors = [
]
dependencies = [
"firstrade==0.0.38",
"quant-platform-kit @ git+https://github.com/QuantStrategyLab/QuantPlatformKit.git@d43800180aae1c7fe7051496a6af5d76f2c65879",
"us-equity-strategies @ git+https://github.com/QuantStrategyLab/UsEquityStrategies.git@9661d8bb74e33466fa0ec1efef168b1d1bae8875",
"quant-platform-kit @ git+https://github.com/QuantStrategyLab/QuantPlatformKit.git@532346b6e22d9f9c0a25c5e859f29084d7b27f6c",
"us-equity-strategies @ git+https://github.com/QuantStrategyLab/UsEquityStrategies.git@7ccc46038e6f0fe84a6910a6fa0515252bf15b79",
"google-cloud-storage",
"requests",
]
Expand Down
4 changes: 2 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
flask
gunicorn
firstrade==0.0.38
quant-platform-kit @ git+https://github.com/QuantStrategyLab/QuantPlatformKit.git@d43800180aae1c7fe7051496a6af5d76f2c65879
us-equity-strategies @ git+https://github.com/QuantStrategyLab/UsEquityStrategies.git@9661d8bb74e33466fa0ec1efef168b1d1bae8875
quant-platform-kit @ git+https://github.com/QuantStrategyLab/QuantPlatformKit.git@532346b6e22d9f9c0a25c5e859f29084d7b27f6c
us-equity-strategies @ git+https://github.com/QuantStrategyLab/UsEquityStrategies.git@7ccc46038e6f0fe84a6910a6fa0515252bf15b79
google-cloud-storage
requests
pytest
34 changes: 13 additions & 21 deletions tests/test_rebalance_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,34 +233,28 @@ def test_run_strategy_cycle_loads_strategy_plugin_report_and_sends_email(
crisis_alert_email_sender_password="app-password",
)
messages = []
observed_email_alerts = []
observed_sms_alerts = []
observed_alerts = []

monkeypatch.setattr(
"application.rebalance_service.load_strategy_runtime",
lambda *_args, **_kwargs: FakeStrategyRuntime(),
)

def fake_email_publish(signals, **kwargs):
observed_email_alerts.append((tuple(signals), kwargs))
def fake_dispatch(signals, **kwargs):
observed_alerts.append((tuple(signals), kwargs))
return SimpleNamespace(
sent_count=1,
to_report_fields=lambda: {
"strategy_plugin_alert_attempted_count": 2,
"strategy_plugin_alert_sent_count": 2,
"strategy_plugin_alert_skipped_count": 0,
"strategy_plugin_alert_failed_count": 0,
"strategy_plugin_alert_email_attempted_count": 1,
"strategy_plugin_alert_email_sent_count": 1,
"strategy_plugin_alert_email_skipped_count": 0,
"strategy_plugin_alert_email_failed_count": 0,
"strategy_plugin_alert_email_deliveries": [
{"subject": "Crisis plugin alert", "status": "sent"}
],
},
)

def fake_sms_publish(signals, **kwargs):
observed_sms_alerts.append((tuple(signals), kwargs))
return SimpleNamespace(
sent_count=1,
to_report_fields=lambda: {
"strategy_plugin_alert_sms_attempted_count": 1,
"strategy_plugin_alert_sms_sent_count": 1,
"strategy_plugin_alert_sms_skipped_count": 0,
Expand All @@ -271,8 +265,7 @@ def fake_sms_publish(signals, **kwargs):
},
)

monkeypatch.setattr("application.rebalance_service.publish_strategy_plugin_email_alerts", fake_email_publish)
monkeypatch.setattr("application.rebalance_service.publish_strategy_plugin_sms_alerts", fake_sms_publish)
monkeypatch.setattr("application.rebalance_service.dispatch_strategy_plugin_alerts", fake_dispatch)

result = run_strategy_cycle(
runtime_settings=settings,
Expand All @@ -288,12 +281,11 @@ def fake_sms_publish(signals, **kwargs):
assert result["strategy_plugin_lines"] == (
"🧩 Plugin: Crisis Watch Notice | status: true crisis | notice: defend",
)
assert len(observed_email_alerts) == 1
assert len(observed_sms_alerts) == 1
assert observed_email_alerts[0][0][0].canonical_route == "true_crisis"
assert observed_sms_alerts[0][0][0].canonical_route == "true_crisis"
assert "firstrade" in observed_email_alerts[0][1]["context_label"]
assert "firstrade" in observed_sms_alerts[0][1]["context_label"]
assert len(observed_alerts) == 1
assert observed_alerts[0][0][0].canonical_route == "true_crisis"
assert "firstrade" in observed_alerts[0][1]["context_label"]
assert observed_alerts[0][1]["notification_settings"] is settings
assert observed_alerts[0][1]["state_settings"] is not None
assert result["strategy_plugin_alert_email_deliveries"][0]["status"] == "sent"
assert result["strategy_plugin_alert_sms_deliveries"][0]["status"] == "sent"
assert "🧩 Plugin: Crisis Watch Notice | status: true crisis | notice: defend" in messages[0]
Expand Down