-
Notifications
You must be signed in to change notification settings - Fork 0
Add IBIT market signal runtime input support #113
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -61,6 +61,7 @@ | |
| ) | ||
| from quant_platform_kit.strategy_contracts import build_strategy_evaluation_inputs | ||
| from runtime_config_support import PlatformRuntimeSettings, load_platform_runtime_settings | ||
| from market_signal_runtime import resolve_external_market_signal_inputs | ||
| from strategy_runtime import load_strategy_runtime | ||
|
|
||
| LIMIT_SELL_DISCOUNT = 0.995 | ||
|
|
@@ -118,15 +119,30 @@ def build_market_inputs( | |
| market_data_port, | ||
| benchmark_symbol: str, | ||
| strategy_runtime_config: Mapping[str, Any], | ||
| strategy_profile: str | None = None, | ||
| runtime_settings: PlatformRuntimeSettings | None = None, | ||
| log_message: Callable[[str], None] = print, | ||
| ) -> dict[str, Any]: | ||
| inputs: dict[str, Any] = {} | ||
| if runtime_settings is not None and strategy_profile is not None: | ||
| inputs.update( | ||
| resolve_external_market_signal_inputs( | ||
| strategy_profile=strategy_profile, | ||
| available_inputs=available_inputs, | ||
| runtime_settings=runtime_settings, | ||
| logger=log_message, | ||
| ) | ||
| ) | ||
| if "market_history" in available_inputs: | ||
| inputs["market_history"] = _build_market_history_loader(market_data_port) | ||
| if "benchmark_history" in available_inputs: | ||
| inputs["benchmark_history"] = _build_price_history(market_data_port, benchmark_symbol) | ||
| if "qqq_history" in available_inputs: | ||
| inputs["qqq_history"] = _build_price_history(market_data_port, benchmark_symbol) | ||
| if "derived_indicators" in available_inputs or "indicators" in available_inputs: | ||
| if ( | ||
| ("derived_indicators" in available_inputs and "derived_indicators" not in inputs) | ||
| or ("indicators" in available_inputs and "indicators" not in inputs) | ||
| ): | ||
| indicators = _build_derived_indicators( | ||
| market_data_port, | ||
| trend_ma_window=int(strategy_runtime_config.get("trend_ma_window", 150)), | ||
|
|
@@ -343,6 +359,9 @@ def run_strategy_cycle( | |
| market_data_port=market_data_port, | ||
| benchmark_symbol=benchmark_symbol, | ||
| strategy_runtime_config=strategy_runtime.merged_runtime_config, | ||
| strategy_profile=settings.strategy_profile, | ||
| runtime_settings=settings, | ||
| log_message=log_message, | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
In Useful? React with 👍 / 👎. |
||
| ) | ||
| evaluation_inputs = build_strategy_evaluation_inputs( | ||
| available_inputs=available_inputs, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,97 @@ | ||
| from __future__ import annotations | ||
|
|
||
| from datetime import date, datetime | ||
| from pathlib import Path | ||
| from typing import Any, Callable, Iterable | ||
|
|
||
| from us_equity_strategies.signals import ( | ||
| IBIT_SMART_DCA_MARKET_SIGNAL_CONSUMER, | ||
| MARKET_SIGNAL_REFERENCE_CONSUMPTION_AUDIT, | ||
| MARKET_SIGNAL_REFERENCE_PLATFORM_HANDOFF, | ||
| MARKET_SIGNAL_REFERENCE_PLATFORM_HANDOFF_INDEX, | ||
| extract_consumer_market_signal_inputs_from_reference, | ||
| ) | ||
|
|
||
|
|
||
| IBIT_SMART_DCA_PROFILE = "ibit_smart_dca" | ||
| DEFAULT_MARKET_SIGNAL_CACHE_DIR = "/tmp/quant-platform-market-signals" | ||
|
|
||
|
|
||
| def resolve_external_market_signal_inputs( | ||
| *, | ||
| strategy_profile: str, | ||
| available_inputs: Iterable[str], | ||
| runtime_settings: Any, | ||
| as_of: Any = None, | ||
| logger: Callable[[str], None] = print, | ||
| client_factory: Any = None, | ||
| ) -> dict[str, Any]: | ||
| if str(strategy_profile or "").strip().lower() != IBIT_SMART_DCA_PROFILE: | ||
| return {} | ||
| if "derived_indicators" not in {str(item) for item in available_inputs or ()}: | ||
| return {} | ||
|
|
||
| reference_type, reference = _market_signal_reference(runtime_settings) | ||
| if reference is None: | ||
| if bool(getattr(runtime_settings, "market_signal_required", False)): | ||
| raise RuntimeError("IBIT external market signal is required but no signal reference is configured") | ||
| return {"derived_indicators": {}} | ||
|
|
||
| market_inputs, metadata = extract_consumer_market_signal_inputs_from_reference( | ||
| reference, | ||
| reference_type=reference_type, | ||
| consumer=IBIT_SMART_DCA_MARKET_SIGNAL_CONSUMER, | ||
| cache_dir=_market_signal_cache_dir(runtime_settings), | ||
| as_of=_market_signal_as_of(as_of), | ||
| client_factory=client_factory, | ||
| ) | ||
| logger( | ||
| "market_signal_inputs_loaded | " | ||
| f"profile={strategy_profile} reference_type={metadata.get('reference_type')} " | ||
| f"source_uri={metadata.get('source_uri') or reference} " | ||
| f"materialized_count={metadata.get('materialized_count')}" | ||
| ) | ||
| return dict(market_inputs) | ||
|
|
||
|
|
||
| def _market_signal_reference(runtime_settings: Any) -> tuple[str, str | None]: | ||
| consumption_audit_uri = _optional_string( | ||
| getattr(runtime_settings, "market_signal_consumption_audit_uri", None) | ||
| ) | ||
| if consumption_audit_uri: | ||
| return MARKET_SIGNAL_REFERENCE_CONSUMPTION_AUDIT, consumption_audit_uri | ||
|
|
||
| handoff_manifest_uri = _optional_string( | ||
| getattr(runtime_settings, "market_signal_handoff_manifest_uri", None) | ||
| ) | ||
| if handoff_manifest_uri: | ||
| return MARKET_SIGNAL_REFERENCE_PLATFORM_HANDOFF, handoff_manifest_uri | ||
|
|
||
| handoff_index_uri = _optional_string( | ||
| getattr(runtime_settings, "market_signal_handoff_index_uri", None) | ||
| ) | ||
| if handoff_index_uri: | ||
| return MARKET_SIGNAL_REFERENCE_PLATFORM_HANDOFF_INDEX, handoff_index_uri | ||
|
|
||
| return MARKET_SIGNAL_REFERENCE_PLATFORM_HANDOFF_INDEX, None | ||
|
|
||
|
|
||
| def _market_signal_cache_dir(runtime_settings: Any) -> Path: | ||
| configured = _optional_string(getattr(runtime_settings, "market_signal_cache_dir", None)) | ||
| return Path(configured or DEFAULT_MARKET_SIGNAL_CACHE_DIR) | ||
|
|
||
|
|
||
| def _market_signal_as_of(value: Any) -> str | None: | ||
| if value is None: | ||
| return None | ||
| if isinstance(value, datetime): | ||
| return value.date().isoformat() | ||
| if isinstance(value, date): | ||
| return value.isoformat() | ||
| text = str(value).strip() | ||
| return text[:10] if text else None | ||
|
|
||
|
|
||
| def _optional_string(value: Any) -> str | None: | ||
| text = str(value or "").strip() | ||
| return text or None |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,82 @@ | ||
| from __future__ import annotations | ||
|
|
||
| from datetime import datetime, timezone | ||
| from types import SimpleNamespace | ||
|
|
||
| import pytest | ||
|
|
||
| import market_signal_runtime | ||
|
|
||
|
|
||
| def test_non_ibit_profile_does_not_load_market_signal(): | ||
| settings = SimpleNamespace(market_signal_required=True) | ||
|
|
||
| assert ( | ||
| market_signal_runtime.resolve_external_market_signal_inputs( | ||
| strategy_profile="soxl_soxx_trend_income", | ||
| available_inputs={"derived_indicators"}, | ||
| runtime_settings=settings, | ||
| ) | ||
| == {} | ||
| ) | ||
|
|
||
|
|
||
| def test_ibit_without_reference_provides_empty_indicator_input(): | ||
| settings = SimpleNamespace(market_signal_required=False) | ||
|
|
||
| assert market_signal_runtime.resolve_external_market_signal_inputs( | ||
| strategy_profile="ibit_smart_dca", | ||
| available_inputs={"derived_indicators"}, | ||
| runtime_settings=settings, | ||
| ) == {"derived_indicators": {}} | ||
|
|
||
|
|
||
| def test_ibit_required_reference_missing_raises(): | ||
| settings = SimpleNamespace(market_signal_required=True) | ||
|
|
||
| with pytest.raises(RuntimeError, match="external market signal is required"): | ||
| market_signal_runtime.resolve_external_market_signal_inputs( | ||
| strategy_profile="ibit_smart_dca", | ||
| available_inputs={"derived_indicators"}, | ||
| runtime_settings=settings, | ||
| ) | ||
|
|
||
|
|
||
| def test_ibit_handoff_index_reference_is_extracted(monkeypatch, tmp_path): | ||
| calls: dict[str, object] = {} | ||
|
|
||
| def fake_extract(reference, *, reference_type, consumer, cache_dir, as_of, client_factory=None): | ||
| calls["extract"] = (reference, reference_type, consumer, cache_dir, as_of, client_factory) | ||
| return {"derived_indicators": {"BTC": {"mvrv_z_score": 1.0}}}, { | ||
| "reference_type": reference_type, | ||
| "source_uri": reference, | ||
| "materialized_count": 2, | ||
| } | ||
|
|
||
| monkeypatch.setattr( | ||
| market_signal_runtime, | ||
| "extract_consumer_market_signal_inputs_from_reference", | ||
| fake_extract, | ||
| ) | ||
| settings = SimpleNamespace( | ||
| market_signal_handoff_index_uri="gs://signals/platform_handoffs/index.json", | ||
| market_signal_cache_dir=str(tmp_path), | ||
| market_signal_required=False, | ||
| ) | ||
|
|
||
| assert market_signal_runtime.resolve_external_market_signal_inputs( | ||
| strategy_profile="ibit_smart_dca", | ||
| available_inputs={"derived_indicators"}, | ||
| runtime_settings=settings, | ||
| as_of=datetime(2026, 6, 19, tzinfo=timezone.utc), | ||
| logger=lambda _message: None, | ||
| client_factory=object, | ||
| ) == {"derived_indicators": {"BTC": {"mvrv_z_score": 1.0}}} | ||
| assert calls["extract"] == ( | ||
| "gs://signals/platform_handoffs/index.json", | ||
| "platform_handoff_index", | ||
| "us_equity:ibit_smart_dca", | ||
| tmp_path, | ||
| "2026-06-19", | ||
| object, | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When this project is installed from its
pyproject.tomlrather than run directly from a source checkout, this new import cannot resolve because[tool.setuptools].py-modulesstill omits the new top-levelmarket_signal_runtime.py. Any packaged/buildpack-style deployment or downstreampip install .consumer that importsapplication.rebalance_serviceorstrategy_runtimewill fail at startup withModuleNotFoundError; add the module to the setuptools module list or package it another way.Useful? React with 👍 / 👎.