Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .github/workflows/sync-cloud-run-env.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ jobs:
FIRSTRADE_STATE_PREFIX: ${{ vars.FIRSTRADE_STATE_PREFIX }}
FIRSTRADE_STRATEGY_CONFIG_PATH: ${{ vars.FIRSTRADE_STRATEGY_CONFIG_PATH }}
FIRSTRADE_STRATEGY_PLUGIN_MOUNTS_JSON: ${{ vars.FIRSTRADE_STRATEGY_PLUGIN_MOUNTS_JSON }}
FIRSTRADE_MARKET_SIGNAL_HANDOFF_INDEX_URI: ${{ vars.FIRSTRADE_MARKET_SIGNAL_HANDOFF_INDEX_URI }}
FIRSTRADE_MARKET_SIGNAL_HANDOFF_MANIFEST_URI: ${{ vars.FIRSTRADE_MARKET_SIGNAL_HANDOFF_MANIFEST_URI }}
FIRSTRADE_MARKET_SIGNAL_CONSUMPTION_AUDIT_URI: ${{ vars.FIRSTRADE_MARKET_SIGNAL_CONSUMPTION_AUDIT_URI }}
FIRSTRADE_MARKET_SIGNAL_CACHE_DIR: ${{ vars.FIRSTRADE_MARKET_SIGNAL_CACHE_DIR }}
FIRSTRADE_MARKET_SIGNAL_REQUIRED: ${{ vars.FIRSTRADE_MARKET_SIGNAL_REQUIRED }}
STRATEGY_PLUGIN_ALERT_CHANNELS: ${{ vars.STRATEGY_PLUGIN_ALERT_CHANNELS }}
STRATEGY_PLUGIN_ALERT_EMAIL_RECIPIENTS: ${{ vars.STRATEGY_PLUGIN_ALERT_EMAIL_RECIPIENTS }}
STRATEGY_PLUGIN_ALERT_EMAIL_SENDER_EMAIL: ${{ vars.STRATEGY_PLUGIN_ALERT_EMAIL_SENDER_EMAIL }}
Expand Down Expand Up @@ -537,6 +542,11 @@ jobs:
add_optional_env FIRSTRADE_FEATURE_SNAPSHOT_MANIFEST_PATH
add_optional_env FIRSTRADE_STRATEGY_CONFIG_PATH
add_optional_env FIRSTRADE_STRATEGY_PLUGIN_MOUNTS_JSON
add_optional_env FIRSTRADE_MARKET_SIGNAL_HANDOFF_INDEX_URI
add_optional_env FIRSTRADE_MARKET_SIGNAL_HANDOFF_MANIFEST_URI
add_optional_env FIRSTRADE_MARKET_SIGNAL_CONSUMPTION_AUDIT_URI
add_optional_env FIRSTRADE_MARKET_SIGNAL_CACHE_DIR
add_optional_env FIRSTRADE_MARKET_SIGNAL_REQUIRED
add_optional_env STRATEGY_PLUGIN_ALERT_CHANNELS
add_optional_env STRATEGY_PLUGIN_ALERT_EMAIL_RECIPIENTS
add_optional_env STRATEGY_PLUGIN_ALERT_EMAIL_SENDER_EMAIL
Expand Down
21 changes: 20 additions & 1 deletion application/rebalance_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
)
from quant_platform_kit.strategy_contracts import build_strategy_evaluation_inputs
from runtime_config_support import PlatformRuntimeSettings, load_platform_runtime_settings
from market_signal_runtime import resolve_external_market_signal_inputs

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Include the market signal module in packaged installs

When this project is installed from its pyproject.toml rather than run directly from a source checkout, this new import cannot resolve because [tool.setuptools].py-modules still omits the new top-level market_signal_runtime.py. Any packaged/buildpack-style deployment or downstream pip install . consumer that imports application.rebalance_service or strategy_runtime will fail at startup with ModuleNotFoundError; add the module to the setuptools module list or package it another way.

Useful? React with 👍 / 👎.

from strategy_runtime import load_strategy_runtime

LIMIT_SELL_DISCOUNT = 0.995
Expand Down Expand Up @@ -118,15 +119,30 @@ def build_market_inputs(
market_data_port,
benchmark_symbol: str,
strategy_runtime_config: Mapping[str, Any],
strategy_profile: str | None = None,
runtime_settings: PlatformRuntimeSettings | None = None,
log_message: Callable[[str], None] = print,
) -> dict[str, Any]:
inputs: dict[str, Any] = {}
if runtime_settings is not None and strategy_profile is not None:
inputs.update(
resolve_external_market_signal_inputs(
strategy_profile=strategy_profile,
available_inputs=available_inputs,
runtime_settings=runtime_settings,
logger=log_message,
)
)
if "market_history" in available_inputs:
inputs["market_history"] = _build_market_history_loader(market_data_port)
if "benchmark_history" in available_inputs:
inputs["benchmark_history"] = _build_price_history(market_data_port, benchmark_symbol)
if "qqq_history" in available_inputs:
inputs["qqq_history"] = _build_price_history(market_data_port, benchmark_symbol)
if "derived_indicators" in available_inputs or "indicators" in available_inputs:
if (
("derived_indicators" in available_inputs and "derived_indicators" not in inputs)
or ("indicators" in available_inputs and "indicators" not in inputs)
):
indicators = _build_derived_indicators(
market_data_port,
trend_ma_window=int(strategy_runtime_config.get("trend_ma_window", 150)),
Expand Down Expand Up @@ -343,6 +359,9 @@ def run_strategy_cycle(
market_data_port=market_data_port,
benchmark_symbol=benchmark_symbol,
strategy_runtime_config=strategy_runtime.merged_runtime_config,
strategy_profile=settings.strategy_profile,
runtime_settings=settings,
log_message=log_message,

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Pass a defined logger into market input building

In run_strategy_cycle this keyword argument is evaluated before build_market_inputs is called, but log_message is not defined in this function or module scope. Any strategy cycle that reaches market input construction will raise NameError, including profiles that do not use market signals, so scheduled/live rebalance runs stop before evaluation. Use an existing logger such as lambda message: print(message, flush=True) or add a log_message parameter.

Useful? React with 👍 / 👎.

)
evaluation_inputs = build_strategy_evaluation_inputs(
available_inputs=available_inputs,
Expand Down
97 changes: 97 additions & 0 deletions market_signal_runtime.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
from __future__ import annotations

from datetime import date, datetime
from pathlib import Path
from typing import Any, Callable, Iterable

from us_equity_strategies.signals import (
IBIT_SMART_DCA_MARKET_SIGNAL_CONSUMER,
MARKET_SIGNAL_REFERENCE_CONSUMPTION_AUDIT,
MARKET_SIGNAL_REFERENCE_PLATFORM_HANDOFF,
MARKET_SIGNAL_REFERENCE_PLATFORM_HANDOFF_INDEX,
extract_consumer_market_signal_inputs_from_reference,
)


IBIT_SMART_DCA_PROFILE = "ibit_smart_dca"
DEFAULT_MARKET_SIGNAL_CACHE_DIR = "/tmp/quant-platform-market-signals"


def resolve_external_market_signal_inputs(
*,
strategy_profile: str,
available_inputs: Iterable[str],
runtime_settings: Any,
as_of: Any = None,
logger: Callable[[str], None] = print,
client_factory: Any = None,
) -> dict[str, Any]:
if str(strategy_profile or "").strip().lower() != IBIT_SMART_DCA_PROFILE:
return {}
if "derived_indicators" not in {str(item) for item in available_inputs or ()}:
return {}

reference_type, reference = _market_signal_reference(runtime_settings)
if reference is None:
if bool(getattr(runtime_settings, "market_signal_required", False)):
raise RuntimeError("IBIT external market signal is required but no signal reference is configured")
return {"derived_indicators": {}}

market_inputs, metadata = extract_consumer_market_signal_inputs_from_reference(
reference,
reference_type=reference_type,
consumer=IBIT_SMART_DCA_MARKET_SIGNAL_CONSUMER,
cache_dir=_market_signal_cache_dir(runtime_settings),
as_of=_market_signal_as_of(as_of),
client_factory=client_factory,
)
logger(
"market_signal_inputs_loaded | "
f"profile={strategy_profile} reference_type={metadata.get('reference_type')} "
f"source_uri={metadata.get('source_uri') or reference} "
f"materialized_count={metadata.get('materialized_count')}"
)
return dict(market_inputs)


def _market_signal_reference(runtime_settings: Any) -> tuple[str, str | None]:
consumption_audit_uri = _optional_string(
getattr(runtime_settings, "market_signal_consumption_audit_uri", None)
)
if consumption_audit_uri:
return MARKET_SIGNAL_REFERENCE_CONSUMPTION_AUDIT, consumption_audit_uri

handoff_manifest_uri = _optional_string(
getattr(runtime_settings, "market_signal_handoff_manifest_uri", None)
)
if handoff_manifest_uri:
return MARKET_SIGNAL_REFERENCE_PLATFORM_HANDOFF, handoff_manifest_uri

handoff_index_uri = _optional_string(
getattr(runtime_settings, "market_signal_handoff_index_uri", None)
)
if handoff_index_uri:
return MARKET_SIGNAL_REFERENCE_PLATFORM_HANDOFF_INDEX, handoff_index_uri

return MARKET_SIGNAL_REFERENCE_PLATFORM_HANDOFF_INDEX, None


def _market_signal_cache_dir(runtime_settings: Any) -> Path:
configured = _optional_string(getattr(runtime_settings, "market_signal_cache_dir", None))
return Path(configured or DEFAULT_MARKET_SIGNAL_CACHE_DIR)


def _market_signal_as_of(value: Any) -> str | None:
if value is None:
return None
if isinstance(value, datetime):
return value.date().isoformat()
if isinstance(value, date):
return value.isoformat()
text = str(value).strip()
return text[:10] if text else None


def _optional_string(value: Any) -> str | None:
text = str(value or "").strip()
return text or None
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ authors = [
]
dependencies = [
"firstrade==0.0.39",
"quant-platform-kit @ git+https://github.com/QuantStrategyLab/QuantPlatformKit.git@b846c9d777a450e95d23c264853997d671f47dd9",
"us-equity-strategies @ git+https://github.com/QuantStrategyLab/UsEquityStrategies.git@031a5bbd5a5ec64a57225a2b24f6569359dc9b11",
"quant-platform-kit @ git+https://github.com/QuantStrategyLab/QuantPlatformKit.git@6a273601b5213844168d4642a7f3c9c4ed3ce162",
"us-equity-strategies @ git+https://github.com/QuantStrategyLab/UsEquityStrategies.git@3b557b349566caa14b5630aea614fd10a0310f1d",
"google-cloud-storage",
"requests",
]
Expand Down
4 changes: 2 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
flask
gunicorn
firstrade==0.0.39
quant-platform-kit @ git+https://github.com/QuantStrategyLab/QuantPlatformKit.git@b846c9d777a450e95d23c264853997d671f47dd9
us-equity-strategies @ git+https://github.com/QuantStrategyLab/UsEquityStrategies.git@031a5bbd5a5ec64a57225a2b24f6569359dc9b11
quant-platform-kit @ git+https://github.com/QuantStrategyLab/QuantPlatformKit.git@6a273601b5213844168d4642a7f3c9c4ed3ce162
us-equity-strategies @ git+https://github.com/QuantStrategyLab/UsEquityStrategies.git@3b557b349566caa14b5630aea614fd10a0310f1d
google-cloud-storage
google-auth
requests
Expand Down
28 changes: 28 additions & 0 deletions runtime_config_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ class PlatformRuntimeSettings:
dca_mode: str | None = None
dca_base_investment_usd: float | None = None
runtime_execution_window_trading_days: int | None = None
market_signal_handoff_index_uri: str | None = None
market_signal_handoff_manifest_uri: str | None = None
market_signal_consumption_audit_uri: str | None = None
market_signal_cache_dir: str | None = None
market_signal_required: bool = False
feature_snapshot_path: str | None = None
feature_snapshot_manifest_path: str | None = None
strategy_config_path: str | None = None
Expand Down Expand Up @@ -179,6 +184,29 @@ def load_platform_runtime_settings(
runtime_execution_window_trading_days=_runtime_execution_window_trading_days_env(
strategy_definition.profile
),
market_signal_handoff_index_uri=_first_non_empty(
os.getenv("FIRSTRADE_MARKET_SIGNAL_HANDOFF_INDEX_URI"),
os.getenv("MARKET_SIGNAL_HANDOFF_INDEX_URI"),
),
market_signal_handoff_manifest_uri=_first_non_empty(
os.getenv("FIRSTRADE_MARKET_SIGNAL_HANDOFF_MANIFEST_URI"),
os.getenv("MARKET_SIGNAL_HANDOFF_MANIFEST_URI"),
),
market_signal_consumption_audit_uri=_first_non_empty(
os.getenv("FIRSTRADE_MARKET_SIGNAL_CONSUMPTION_AUDIT_URI"),
os.getenv("MARKET_SIGNAL_CONSUMPTION_AUDIT_URI"),
),
market_signal_cache_dir=_first_non_empty(
os.getenv("FIRSTRADE_MARKET_SIGNAL_CACHE_DIR"),
os.getenv("MARKET_SIGNAL_CACHE_DIR"),
),
market_signal_required=resolve_bool_value(
_first_non_empty(
os.getenv("FIRSTRADE_MARKET_SIGNAL_REQUIRED"),
os.getenv("MARKET_SIGNAL_REQUIRED"),
"false",
)
),
feature_snapshot_path=runtime_paths.feature_snapshot_path,
feature_snapshot_manifest_path=runtime_paths.feature_snapshot_manifest_path,
strategy_config_path=runtime_paths.strategy_config_path,
Expand Down
13 changes: 12 additions & 1 deletion strategy_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
build_strategy_context_from_available_inputs,
)
from runtime_config_support import PlatformRuntimeSettings
from market_signal_runtime import resolve_external_market_signal_inputs
from strategy_loader import (
load_strategy_entrypoint_for_profile,
load_strategy_runtime_adapter_for_profile,
Expand Down Expand Up @@ -76,11 +77,21 @@ def evaluate(
)

as_of = datetime.now(timezone.utc)
resolved_available_inputs = dict(available_inputs)
resolved_available_inputs.update(
resolve_external_market_signal_inputs(
strategy_profile=self.profile,
available_inputs=self.runtime_adapter.available_inputs or self.entrypoint.manifest.required_inputs,
runtime_settings=self.runtime_settings,
as_of=as_of,
logger=self.logger,
)
)
ctx = build_strategy_context_from_available_inputs(
entrypoint=self.entrypoint,
runtime_adapter=self.runtime_adapter,
as_of=as_of,
available_inputs=available_inputs,
available_inputs=resolved_available_inputs,
runtime_config=runtime_config,
)
decision = self.entrypoint.evaluate(ctx)
Expand Down
82 changes: 82 additions & 0 deletions tests/test_market_signal_runtime.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
from __future__ import annotations

from datetime import datetime, timezone
from types import SimpleNamespace

import pytest

import market_signal_runtime


def test_non_ibit_profile_does_not_load_market_signal():
settings = SimpleNamespace(market_signal_required=True)

assert (
market_signal_runtime.resolve_external_market_signal_inputs(
strategy_profile="soxl_soxx_trend_income",
available_inputs={"derived_indicators"},
runtime_settings=settings,
)
== {}
)


def test_ibit_without_reference_provides_empty_indicator_input():
settings = SimpleNamespace(market_signal_required=False)

assert market_signal_runtime.resolve_external_market_signal_inputs(
strategy_profile="ibit_smart_dca",
available_inputs={"derived_indicators"},
runtime_settings=settings,
) == {"derived_indicators": {}}


def test_ibit_required_reference_missing_raises():
settings = SimpleNamespace(market_signal_required=True)

with pytest.raises(RuntimeError, match="external market signal is required"):
market_signal_runtime.resolve_external_market_signal_inputs(
strategy_profile="ibit_smart_dca",
available_inputs={"derived_indicators"},
runtime_settings=settings,
)


def test_ibit_handoff_index_reference_is_extracted(monkeypatch, tmp_path):
calls: dict[str, object] = {}

def fake_extract(reference, *, reference_type, consumer, cache_dir, as_of, client_factory=None):
calls["extract"] = (reference, reference_type, consumer, cache_dir, as_of, client_factory)
return {"derived_indicators": {"BTC": {"mvrv_z_score": 1.0}}}, {
"reference_type": reference_type,
"source_uri": reference,
"materialized_count": 2,
}

monkeypatch.setattr(
market_signal_runtime,
"extract_consumer_market_signal_inputs_from_reference",
fake_extract,
)
settings = SimpleNamespace(
market_signal_handoff_index_uri="gs://signals/platform_handoffs/index.json",
market_signal_cache_dir=str(tmp_path),
market_signal_required=False,
)

assert market_signal_runtime.resolve_external_market_signal_inputs(
strategy_profile="ibit_smart_dca",
available_inputs={"derived_indicators"},
runtime_settings=settings,
as_of=datetime(2026, 6, 19, tzinfo=timezone.utc),
logger=lambda _message: None,
client_factory=object,
) == {"derived_indicators": {"BTC": {"mvrv_z_score": 1.0}}}
assert calls["extract"] == (
"gs://signals/platform_handoffs/index.json",
"platform_handoff_index",
"us_equity:ibit_smart_dca",
tmp_path,
"2026-06-19",
object,
)
5 changes: 5 additions & 0 deletions tests/test_sync_cloud_run_env_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ def test_sync_cloud_run_env_workflow_syncs_strategy_plugin_alert_settings():
"INCOME_LAYER_ENABLED",
"INCOME_LAYER_START_USD",
"INCOME_LAYER_MAX_RATIO",
"FIRSTRADE_MARKET_SIGNAL_HANDOFF_INDEX_URI",
"FIRSTRADE_MARKET_SIGNAL_HANDOFF_MANIFEST_URI",
"FIRSTRADE_MARKET_SIGNAL_CONSUMPTION_AUDIT_URI",
"FIRSTRADE_MARKET_SIGNAL_CACHE_DIR",
"FIRSTRADE_MARKET_SIGNAL_REQUIRED",
):
assert f"{name}: ${{{{ vars.{name} }}}}" in workflow
assert f"add_optional_env {name}" in workflow
Expand Down
Loading