From ad36dfcf2059fa3dfc05ebd13d29b3d43a7d9af0 Mon Sep 17 00:00:00 2001 From: Pigbibi <20649888+Pigbibi@users.noreply.github.com> Date: Sat, 20 Jun 2026 01:34:08 +0800 Subject: [PATCH] Add IBIT market signal runtime input support --- .github/workflows/sync-cloud-run-env.yml | 10 +++ application/rebalance_service.py | 21 ++++- market_signal_runtime.py | 97 +++++++++++++++++++++++ pyproject.toml | 4 +- requirements.txt | 4 +- runtime_config_support.py | 28 +++++++ strategy_runtime.py | 13 ++- tests/test_market_signal_runtime.py | 82 +++++++++++++++++++ tests/test_sync_cloud_run_env_workflow.py | 5 ++ 9 files changed, 258 insertions(+), 6 deletions(-) create mode 100644 market_signal_runtime.py create mode 100644 tests/test_market_signal_runtime.py diff --git a/.github/workflows/sync-cloud-run-env.yml b/.github/workflows/sync-cloud-run-env.yml index b06f78e..3c0cfbe 100644 --- a/.github/workflows/sync-cloud-run-env.yml +++ b/.github/workflows/sync-cloud-run-env.yml @@ -73,6 +73,11 @@ jobs: FIRSTRADE_STATE_PREFIX: ${{ vars.FIRSTRADE_STATE_PREFIX }} FIRSTRADE_STRATEGY_CONFIG_PATH: ${{ vars.FIRSTRADE_STRATEGY_CONFIG_PATH }} FIRSTRADE_STRATEGY_PLUGIN_MOUNTS_JSON: ${{ vars.FIRSTRADE_STRATEGY_PLUGIN_MOUNTS_JSON }} + FIRSTRADE_MARKET_SIGNAL_HANDOFF_INDEX_URI: ${{ vars.FIRSTRADE_MARKET_SIGNAL_HANDOFF_INDEX_URI }} + FIRSTRADE_MARKET_SIGNAL_HANDOFF_MANIFEST_URI: ${{ vars.FIRSTRADE_MARKET_SIGNAL_HANDOFF_MANIFEST_URI }} + FIRSTRADE_MARKET_SIGNAL_CONSUMPTION_AUDIT_URI: ${{ vars.FIRSTRADE_MARKET_SIGNAL_CONSUMPTION_AUDIT_URI }} + FIRSTRADE_MARKET_SIGNAL_CACHE_DIR: ${{ vars.FIRSTRADE_MARKET_SIGNAL_CACHE_DIR }} + FIRSTRADE_MARKET_SIGNAL_REQUIRED: ${{ vars.FIRSTRADE_MARKET_SIGNAL_REQUIRED }} STRATEGY_PLUGIN_ALERT_CHANNELS: ${{ vars.STRATEGY_PLUGIN_ALERT_CHANNELS }} STRATEGY_PLUGIN_ALERT_EMAIL_RECIPIENTS: ${{ vars.STRATEGY_PLUGIN_ALERT_EMAIL_RECIPIENTS }} STRATEGY_PLUGIN_ALERT_EMAIL_SENDER_EMAIL: ${{ vars.STRATEGY_PLUGIN_ALERT_EMAIL_SENDER_EMAIL }} @@ -537,6 +542,11 @@ jobs: add_optional_env FIRSTRADE_FEATURE_SNAPSHOT_MANIFEST_PATH add_optional_env FIRSTRADE_STRATEGY_CONFIG_PATH add_optional_env FIRSTRADE_STRATEGY_PLUGIN_MOUNTS_JSON + add_optional_env FIRSTRADE_MARKET_SIGNAL_HANDOFF_INDEX_URI + add_optional_env FIRSTRADE_MARKET_SIGNAL_HANDOFF_MANIFEST_URI + add_optional_env FIRSTRADE_MARKET_SIGNAL_CONSUMPTION_AUDIT_URI + add_optional_env FIRSTRADE_MARKET_SIGNAL_CACHE_DIR + add_optional_env FIRSTRADE_MARKET_SIGNAL_REQUIRED add_optional_env STRATEGY_PLUGIN_ALERT_CHANNELS add_optional_env STRATEGY_PLUGIN_ALERT_EMAIL_RECIPIENTS add_optional_env STRATEGY_PLUGIN_ALERT_EMAIL_SENDER_EMAIL diff --git a/application/rebalance_service.py b/application/rebalance_service.py index 744b8f0..f802610 100644 --- a/application/rebalance_service.py +++ b/application/rebalance_service.py @@ -61,6 +61,7 @@ ) from quant_platform_kit.strategy_contracts import build_strategy_evaluation_inputs from runtime_config_support import PlatformRuntimeSettings, load_platform_runtime_settings +from market_signal_runtime import resolve_external_market_signal_inputs from strategy_runtime import load_strategy_runtime LIMIT_SELL_DISCOUNT = 0.995 @@ -118,15 +119,30 @@ def build_market_inputs( market_data_port, benchmark_symbol: str, strategy_runtime_config: Mapping[str, Any], + strategy_profile: str | None = None, + runtime_settings: PlatformRuntimeSettings | None = None, + log_message: Callable[[str], None] = print, ) -> dict[str, Any]: inputs: dict[str, Any] = {} + if runtime_settings is not None and strategy_profile is not None: + inputs.update( + resolve_external_market_signal_inputs( + strategy_profile=strategy_profile, + available_inputs=available_inputs, + runtime_settings=runtime_settings, + logger=log_message, + ) + ) if "market_history" in available_inputs: inputs["market_history"] = _build_market_history_loader(market_data_port) if "benchmark_history" in available_inputs: inputs["benchmark_history"] = _build_price_history(market_data_port, benchmark_symbol) if "qqq_history" in available_inputs: inputs["qqq_history"] = _build_price_history(market_data_port, benchmark_symbol) - if "derived_indicators" in available_inputs or "indicators" in available_inputs: + if ( + ("derived_indicators" in available_inputs and "derived_indicators" not in inputs) + or ("indicators" in available_inputs and "indicators" not in inputs) + ): indicators = _build_derived_indicators( market_data_port, trend_ma_window=int(strategy_runtime_config.get("trend_ma_window", 150)), @@ -343,6 +359,9 @@ def run_strategy_cycle( market_data_port=market_data_port, benchmark_symbol=benchmark_symbol, strategy_runtime_config=strategy_runtime.merged_runtime_config, + strategy_profile=settings.strategy_profile, + runtime_settings=settings, + log_message=log_message, ) evaluation_inputs = build_strategy_evaluation_inputs( available_inputs=available_inputs, diff --git a/market_signal_runtime.py b/market_signal_runtime.py new file mode 100644 index 0000000..7225378 --- /dev/null +++ b/market_signal_runtime.py @@ -0,0 +1,97 @@ +from __future__ import annotations + +from datetime import date, datetime +from pathlib import Path +from typing import Any, Callable, Iterable + +from us_equity_strategies.signals import ( + IBIT_SMART_DCA_MARKET_SIGNAL_CONSUMER, + MARKET_SIGNAL_REFERENCE_CONSUMPTION_AUDIT, + MARKET_SIGNAL_REFERENCE_PLATFORM_HANDOFF, + MARKET_SIGNAL_REFERENCE_PLATFORM_HANDOFF_INDEX, + extract_consumer_market_signal_inputs_from_reference, +) + + +IBIT_SMART_DCA_PROFILE = "ibit_smart_dca" +DEFAULT_MARKET_SIGNAL_CACHE_DIR = "/tmp/quant-platform-market-signals" + + +def resolve_external_market_signal_inputs( + *, + strategy_profile: str, + available_inputs: Iterable[str], + runtime_settings: Any, + as_of: Any = None, + logger: Callable[[str], None] = print, + client_factory: Any = None, +) -> dict[str, Any]: + if str(strategy_profile or "").strip().lower() != IBIT_SMART_DCA_PROFILE: + return {} + if "derived_indicators" not in {str(item) for item in available_inputs or ()}: + return {} + + reference_type, reference = _market_signal_reference(runtime_settings) + if reference is None: + if bool(getattr(runtime_settings, "market_signal_required", False)): + raise RuntimeError("IBIT external market signal is required but no signal reference is configured") + return {"derived_indicators": {}} + + market_inputs, metadata = extract_consumer_market_signal_inputs_from_reference( + reference, + reference_type=reference_type, + consumer=IBIT_SMART_DCA_MARKET_SIGNAL_CONSUMER, + cache_dir=_market_signal_cache_dir(runtime_settings), + as_of=_market_signal_as_of(as_of), + client_factory=client_factory, + ) + logger( + "market_signal_inputs_loaded | " + f"profile={strategy_profile} reference_type={metadata.get('reference_type')} " + f"source_uri={metadata.get('source_uri') or reference} " + f"materialized_count={metadata.get('materialized_count')}" + ) + return dict(market_inputs) + + +def _market_signal_reference(runtime_settings: Any) -> tuple[str, str | None]: + consumption_audit_uri = _optional_string( + getattr(runtime_settings, "market_signal_consumption_audit_uri", None) + ) + if consumption_audit_uri: + return MARKET_SIGNAL_REFERENCE_CONSUMPTION_AUDIT, consumption_audit_uri + + handoff_manifest_uri = _optional_string( + getattr(runtime_settings, "market_signal_handoff_manifest_uri", None) + ) + if handoff_manifest_uri: + return MARKET_SIGNAL_REFERENCE_PLATFORM_HANDOFF, handoff_manifest_uri + + handoff_index_uri = _optional_string( + getattr(runtime_settings, "market_signal_handoff_index_uri", None) + ) + if handoff_index_uri: + return MARKET_SIGNAL_REFERENCE_PLATFORM_HANDOFF_INDEX, handoff_index_uri + + return MARKET_SIGNAL_REFERENCE_PLATFORM_HANDOFF_INDEX, None + + +def _market_signal_cache_dir(runtime_settings: Any) -> Path: + configured = _optional_string(getattr(runtime_settings, "market_signal_cache_dir", None)) + return Path(configured or DEFAULT_MARKET_SIGNAL_CACHE_DIR) + + +def _market_signal_as_of(value: Any) -> str | None: + if value is None: + return None + if isinstance(value, datetime): + return value.date().isoformat() + if isinstance(value, date): + return value.isoformat() + text = str(value).strip() + return text[:10] if text else None + + +def _optional_string(value: Any) -> str | None: + text = str(value or "").strip() + return text or None diff --git a/pyproject.toml b/pyproject.toml index cdef4a7..21a39c1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,8 +14,8 @@ authors = [ ] dependencies = [ "firstrade==0.0.39", - "quant-platform-kit @ git+https://github.com/QuantStrategyLab/QuantPlatformKit.git@b846c9d777a450e95d23c264853997d671f47dd9", - "us-equity-strategies @ git+https://github.com/QuantStrategyLab/UsEquityStrategies.git@361338f60900182e3be535cd5fd2be2b9a07b422", + "quant-platform-kit @ git+https://github.com/QuantStrategyLab/QuantPlatformKit.git@6a273601b5213844168d4642a7f3c9c4ed3ce162", + "us-equity-strategies @ git+https://github.com/QuantStrategyLab/UsEquityStrategies.git@3b557b349566caa14b5630aea614fd10a0310f1d", "google-cloud-storage", "requests", ] diff --git a/requirements.txt b/requirements.txt index 1188166..cc8377d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,8 +1,8 @@ flask gunicorn firstrade==0.0.39 -quant-platform-kit @ git+https://github.com/QuantStrategyLab/QuantPlatformKit.git@b846c9d777a450e95d23c264853997d671f47dd9 -us-equity-strategies @ git+https://github.com/QuantStrategyLab/UsEquityStrategies.git@53de72b2ebd3f142d72988f3a327a56e89d59171 +quant-platform-kit @ git+https://github.com/QuantStrategyLab/QuantPlatformKit.git@6a273601b5213844168d4642a7f3c9c4ed3ce162 +us-equity-strategies @ git+https://github.com/QuantStrategyLab/UsEquityStrategies.git@3b557b349566caa14b5630aea614fd10a0310f1d google-cloud-storage google-auth requests diff --git a/runtime_config_support.py b/runtime_config_support.py index d1c72e7..2561aaa 100644 --- a/runtime_config_support.py +++ b/runtime_config_support.py @@ -59,6 +59,11 @@ class PlatformRuntimeSettings: dca_mode: str | None = None dca_base_investment_usd: float | None = None runtime_execution_window_trading_days: int | None = None + market_signal_handoff_index_uri: str | None = None + market_signal_handoff_manifest_uri: str | None = None + market_signal_consumption_audit_uri: str | None = None + market_signal_cache_dir: str | None = None + market_signal_required: bool = False feature_snapshot_path: str | None = None feature_snapshot_manifest_path: str | None = None strategy_config_path: str | None = None @@ -179,6 +184,29 @@ def load_platform_runtime_settings( runtime_execution_window_trading_days=_runtime_execution_window_trading_days_env( strategy_definition.profile ), + market_signal_handoff_index_uri=_first_non_empty( + os.getenv("FIRSTRADE_MARKET_SIGNAL_HANDOFF_INDEX_URI"), + os.getenv("MARKET_SIGNAL_HANDOFF_INDEX_URI"), + ), + market_signal_handoff_manifest_uri=_first_non_empty( + os.getenv("FIRSTRADE_MARKET_SIGNAL_HANDOFF_MANIFEST_URI"), + os.getenv("MARKET_SIGNAL_HANDOFF_MANIFEST_URI"), + ), + market_signal_consumption_audit_uri=_first_non_empty( + os.getenv("FIRSTRADE_MARKET_SIGNAL_CONSUMPTION_AUDIT_URI"), + os.getenv("MARKET_SIGNAL_CONSUMPTION_AUDIT_URI"), + ), + market_signal_cache_dir=_first_non_empty( + os.getenv("FIRSTRADE_MARKET_SIGNAL_CACHE_DIR"), + os.getenv("MARKET_SIGNAL_CACHE_DIR"), + ), + market_signal_required=resolve_bool_value( + _first_non_empty( + os.getenv("FIRSTRADE_MARKET_SIGNAL_REQUIRED"), + os.getenv("MARKET_SIGNAL_REQUIRED"), + "false", + ) + ), feature_snapshot_path=runtime_paths.feature_snapshot_path, feature_snapshot_manifest_path=runtime_paths.feature_snapshot_manifest_path, strategy_config_path=runtime_paths.strategy_config_path, diff --git a/strategy_runtime.py b/strategy_runtime.py index 4a619ca..7443edb 100644 --- a/strategy_runtime.py +++ b/strategy_runtime.py @@ -18,6 +18,7 @@ build_strategy_context_from_available_inputs, ) from runtime_config_support import PlatformRuntimeSettings +from market_signal_runtime import resolve_external_market_signal_inputs from strategy_loader import ( load_strategy_entrypoint_for_profile, load_strategy_runtime_adapter_for_profile, @@ -76,11 +77,21 @@ def evaluate( ) as_of = datetime.now(timezone.utc) + resolved_available_inputs = dict(available_inputs) + resolved_available_inputs.update( + resolve_external_market_signal_inputs( + strategy_profile=self.profile, + available_inputs=self.runtime_adapter.available_inputs or self.entrypoint.manifest.required_inputs, + runtime_settings=self.runtime_settings, + as_of=as_of, + logger=self.logger, + ) + ) ctx = build_strategy_context_from_available_inputs( entrypoint=self.entrypoint, runtime_adapter=self.runtime_adapter, as_of=as_of, - available_inputs=available_inputs, + available_inputs=resolved_available_inputs, runtime_config=runtime_config, ) decision = self.entrypoint.evaluate(ctx) diff --git a/tests/test_market_signal_runtime.py b/tests/test_market_signal_runtime.py new file mode 100644 index 0000000..3e6dbbb --- /dev/null +++ b/tests/test_market_signal_runtime.py @@ -0,0 +1,82 @@ +from __future__ import annotations + +from datetime import datetime, timezone +from types import SimpleNamespace + +import pytest + +import market_signal_runtime + + +def test_non_ibit_profile_does_not_load_market_signal(): + settings = SimpleNamespace(market_signal_required=True) + + assert ( + market_signal_runtime.resolve_external_market_signal_inputs( + strategy_profile="soxl_soxx_trend_income", + available_inputs={"derived_indicators"}, + runtime_settings=settings, + ) + == {} + ) + + +def test_ibit_without_reference_provides_empty_indicator_input(): + settings = SimpleNamespace(market_signal_required=False) + + assert market_signal_runtime.resolve_external_market_signal_inputs( + strategy_profile="ibit_smart_dca", + available_inputs={"derived_indicators"}, + runtime_settings=settings, + ) == {"derived_indicators": {}} + + +def test_ibit_required_reference_missing_raises(): + settings = SimpleNamespace(market_signal_required=True) + + with pytest.raises(RuntimeError, match="external market signal is required"): + market_signal_runtime.resolve_external_market_signal_inputs( + strategy_profile="ibit_smart_dca", + available_inputs={"derived_indicators"}, + runtime_settings=settings, + ) + + +def test_ibit_handoff_index_reference_is_extracted(monkeypatch, tmp_path): + calls: dict[str, object] = {} + + def fake_extract(reference, *, reference_type, consumer, cache_dir, as_of, client_factory=None): + calls["extract"] = (reference, reference_type, consumer, cache_dir, as_of, client_factory) + return {"derived_indicators": {"BTC": {"mvrv_z_score": 1.0}}}, { + "reference_type": reference_type, + "source_uri": reference, + "materialized_count": 2, + } + + monkeypatch.setattr( + market_signal_runtime, + "extract_consumer_market_signal_inputs_from_reference", + fake_extract, + ) + settings = SimpleNamespace( + market_signal_handoff_index_uri="gs://signals/platform_handoffs/index.json", + market_signal_cache_dir=str(tmp_path), + market_signal_required=False, + ) + + assert market_signal_runtime.resolve_external_market_signal_inputs( + strategy_profile="ibit_smart_dca", + available_inputs={"derived_indicators"}, + runtime_settings=settings, + as_of=datetime(2026, 6, 19, tzinfo=timezone.utc), + logger=lambda _message: None, + client_factory=object, + ) == {"derived_indicators": {"BTC": {"mvrv_z_score": 1.0}}} + assert calls["extract"] == ( + "gs://signals/platform_handoffs/index.json", + "platform_handoff_index", + "us_equity:ibit_smart_dca", + tmp_path, + "2026-06-19", + object, + ) diff --git a/tests/test_sync_cloud_run_env_workflow.py b/tests/test_sync_cloud_run_env_workflow.py index 3ca06b6..672c504 100644 --- a/tests/test_sync_cloud_run_env_workflow.py +++ b/tests/test_sync_cloud_run_env_workflow.py @@ -49,6 +49,11 @@ def test_sync_cloud_run_env_workflow_syncs_strategy_plugin_alert_settings(): "INCOME_LAYER_ENABLED", "INCOME_LAYER_START_USD", "INCOME_LAYER_MAX_RATIO", + "FIRSTRADE_MARKET_SIGNAL_HANDOFF_INDEX_URI", + "FIRSTRADE_MARKET_SIGNAL_HANDOFF_MANIFEST_URI", + "FIRSTRADE_MARKET_SIGNAL_CONSUMPTION_AUDIT_URI", + "FIRSTRADE_MARKET_SIGNAL_CACHE_DIR", + "FIRSTRADE_MARKET_SIGNAL_REQUIRED", ): assert f"{name}: ${{{{ vars.{name} }}}}" in workflow assert f"add_optional_env {name}" in workflow