From 86d72631c76f27c668b234704b91241bec8fa9ee Mon Sep 17 00:00:00 2001 From: Pigbibi <20649888+Pigbibi@users.noreply.github.com> Date: Thu, 28 May 2026 14:34:42 +0800 Subject: [PATCH] Add plugin metadata and IBKR option support --- .../common/strategy_plugins.py | 88 ++++++++ src/quant_platform_kit/ibkr/__init__.py | 2 + src/quant_platform_kit/ibkr/execution.py | 198 +++++++++++++++++- src/quant_platform_kit/ibkr/market_data.py | 187 +++++++++++++++++ src/quant_platform_kit/ibkr/portfolio.py | 24 ++- tests/test_ibkr_execution.py | 113 ++++++++++ tests/test_ibkr_market_data.py | 57 +++++ tests/test_ibkr_portfolio.py | 17 ++ tests/test_strategy_plugins.py | 78 +++++++ 9 files changed, 757 insertions(+), 7 deletions(-) diff --git a/src/quant_platform_kit/common/strategy_plugins.py b/src/quant_platform_kit/common/strategy_plugins.py index f3598a9..33c1793 100644 --- a/src/quant_platform_kit/common/strategy_plugins.py +++ b/src/quant_platform_kit/common/strategy_plugins.py @@ -7,6 +7,7 @@ import tempfile from collections.abc import Mapping, Sequence from dataclasses import dataclass, field +from dataclasses import replace as dataclass_replace from pathlib import Path from typing import Any, Callable @@ -427,6 +428,52 @@ def build_strategy_plugin_report_payload(signals: Sequence[StrategyPluginSignal] } +def build_strategy_plugin_metadata(signals: Sequence[StrategyPluginSignal]) -> dict[str, Any]: + """Build portfolio-snapshot metadata consumed by deterministic strategies.""" + plugin_payloads: dict[str, Any] = {} + summaries: dict[str, Any] = {} + for signal in signals: + execution_controls = getattr(signal, "execution_controls", {}) or {} + if not isinstance(execution_controls, Mapping) or not _as_bool( + execution_controls.get("strategy_runtime_metadata_allowed"), + default=False, + ): + continue + plugin = str(getattr(signal, "plugin", "") or "").strip() + if not plugin: + continue + payload = dict(getattr(signal, "payload", {}) or {}) + plugin_payloads[plugin] = payload + summaries[plugin] = signal.report_summary() + if not plugin_payloads: + return {} + metadata: dict[str, Any] = { + "strategy_plugins": plugin_payloads, + "strategy_plugin_summaries": summaries, + } + metadata.update(plugin_payloads) + return metadata + + +def attach_strategy_plugin_metadata(snapshot: Any, signals: Sequence[StrategyPluginSignal]) -> Any: + """Return a snapshot copy with plugin payloads attached to metadata.""" + plugin_metadata = build_strategy_plugin_metadata(signals) + if not plugin_metadata: + return snapshot + current_metadata = getattr(snapshot, "metadata", {}) or {} + if not isinstance(current_metadata, Mapping): + current_metadata = {} + merged_metadata = {**dict(current_metadata), **plugin_metadata} + try: + return dataclass_replace(snapshot, metadata=merged_metadata) + except TypeError: + try: + snapshot.metadata = merged_metadata + except Exception: + return snapshot + return snapshot + + def translate_strategy_plugin_value( category: str, raw_value: str | None, @@ -532,6 +579,41 @@ def build_strategy_plugin_alert_scope_note( ) +def build_strategy_plugin_ai_audit_note( + signal: StrategyPluginSignal, + *, + translator: Callable[..., str] | None = None, +) -> str | None: + payload = getattr(signal, "payload", {}) or {} + if not isinstance(payload, Mapping): + return None + ai_audit = payload.get("ai_audit") + if not isinstance(ai_audit, Mapping) or not _as_bool(ai_audit.get("enabled"), default=False): + return None + status = _normalize_strategy_plugin_field(_optional_string(ai_audit.get("status"))) + if status == "ok": + verdict = _optional_string(ai_audit.get("verdict")) or "unknown" + assessment = _optional_string(ai_audit.get("route_assessment")) or "unknown" + summary = _optional_string(ai_audit.get("summary")) or "no summary" + return _translate( + translator, + "strategy_plugin_alert_ai_audit", + fallback="AI audit: {status} | verdict={verdict} | assessment={assessment} | {summary}", + status=status, + verdict=verdict, + assessment=assessment, + summary=summary, + ) + reason = _optional_string(ai_audit.get("skip_reason")) or _optional_string(ai_audit.get("error")) or "no detail" + return _translate( + translator, + "strategy_plugin_alert_ai_audit_status", + fallback="AI audit: {status} | {reason}", + status=status, + reason=reason, + ) + + def build_strategy_plugin_alert_key( signal: StrategyPluginSignal, *, @@ -593,6 +675,7 @@ def build_strategy_plugin_alert_messages( strategy = str(strategy_label or getattr(signal, "strategy", None) or "").strip() or "unknown" guidance = build_strategy_plugin_alert_guidance(signal, translator=translator) scope_note = build_strategy_plugin_alert_scope_note(signal, translator=translator) + ai_audit_note = build_strategy_plugin_ai_audit_note(signal, translator=translator) subject = _translate( translator, "strategy_plugin_alert_subject", @@ -665,6 +748,8 @@ def build_strategy_plugin_alert_messages( guidance=guidance, ) ) + if ai_audit_note: + body_lines.append(ai_audit_note) if scope_note: body_lines.append( _translate( @@ -686,6 +771,9 @@ def build_strategy_plugin_alert_messages( "context_label": context or None, "guidance": guidance, "scope_note": scope_note, + "ai_audit": getattr(signal, "payload", {}).get("ai_audit") + if isinstance(getattr(signal, "payload", {}), Mapping) + else None, } messages.append( StrategyPluginAlertMessage( diff --git a/src/quant_platform_kit/ibkr/__init__.py b/src/quant_platform_kit/ibkr/__init__.py index 8958c88..64ae2e7 100644 --- a/src/quant_platform_kit/ibkr/__init__.py +++ b/src/quant_platform_kit/ibkr/__init__.py @@ -3,6 +3,7 @@ from .market_data import ( fetch_historical_price_candles, fetch_historical_price_series, + fetch_option_chain_snapshot, fetch_quote_snapshots, ) from .portfolio import fetch_portfolio_snapshot @@ -23,6 +24,7 @@ "connect_ib", "ensure_event_loop", "fetch_historical_price_candles", + "fetch_option_chain_snapshot", "submit_order_intent", "fetch_historical_price_series", "fetch_quote_snapshots", diff --git a/src/quant_platform_kit/ibkr/execution.py b/src/quant_platform_kit/ibkr/execution.py index 5ff3b73..4080685 100644 --- a/src/quant_platform_kit/ibkr/execution.py +++ b/src/quant_platform_kit/ibkr/execution.py @@ -1,5 +1,6 @@ from __future__ import annotations +from datetime import date, datetime from typing import Any, Callable from quant_platform_kit.common.models import ExecutionReport, OrderIntent @@ -24,6 +25,168 @@ def _build_stock_contract( return stock_factory(symbol, exchange, currency) +def _normalize_option_expiration(value: Any) -> str: + text = str(value or "").strip() + if len(text) == 8 and text.isdigit(): + return text + if not text: + raise ValueError("Option OrderIntent.metadata.expiration is required.") + if isinstance(value, datetime): + return value.date().strftime("%Y%m%d") + if isinstance(value, date): + return value.strftime("%Y%m%d") + try: + return datetime.fromisoformat(text[:10]).date().strftime("%Y%m%d") + except ValueError as exc: + raise ValueError(f"Invalid option expiration: {value!r}") from exc + + +def _normalize_option_right(value: Any) -> str: + text = str(value or "").strip().upper() + if text in {"CALL", "C"}: + return "C" + if text in {"PUT", "P"}: + return "P" + raise ValueError("Option OrderIntent.metadata.right must be C/call or P/put.") + + +def _build_option_contract( + order_intent: OrderIntent, + *, + option_factory: Callable[..., Any] | None = None, + exchange: str = "SMART", + currency: str = "USD", +) -> Any: + metadata = dict(order_intent.metadata or {}) + underlier = str(metadata.get("underlier") or order_intent.symbol or "").strip().upper() + if not underlier: + raise ValueError("Option OrderIntent requires symbol or metadata.underlier.") + expiration = _normalize_option_expiration(metadata.get("expiration")) + right = _normalize_option_right(metadata.get("right")) + try: + strike = float(metadata.get("strike")) + except (TypeError, ValueError) as exc: + raise ValueError("Option OrderIntent.metadata.strike is required.") from exc + if strike <= 0.0: + raise ValueError("Option OrderIntent.metadata.strike must be positive.") + if option_factory is None: + from ib_insync import Option + + option_factory = Option + return option_factory( + underlier, + expiration, + strike, + right, + exchange=exchange, + currency=currency, + ) + + +def _is_option_intent(order_intent: OrderIntent) -> bool: + metadata = dict(order_intent.metadata or {}) + return ( + str(metadata.get("asset_class") or "").strip().lower() == "option" + or str(metadata.get("security_type") or "").strip().upper() == "OPT" + or str(metadata.get("security_type") or "").strip().upper() == "BAG" + or str(metadata.get("intent_type") or "").strip() == "single_leg_option" + or str(metadata.get("intent_type") or "").strip() == "multi_leg_option" + ) + + +def _is_combo_option_intent(order_intent: OrderIntent) -> bool: + metadata = dict(order_intent.metadata or {}) + return ( + str(metadata.get("asset_class") or "").strip().lower() == "option" + and str(metadata.get("intent_type") or "").strip() == "multi_leg_option" + ) + + +def _leg_action(value: Any) -> str: + text = str(value or "").strip().lower() + if text.startswith("buy"): + return "BUY" + if text.startswith("sell"): + return "SELL" + raise ValueError(f"Unsupported option combo leg action: {value!r}") + + +def _build_option_combo_contract( + ib: Any, + order_intent: OrderIntent, + *, + option_factory: Callable[..., Any] | None = None, + combo_contract_factory: Callable[..., Any] | None = None, + combo_leg_factory: Callable[..., Any] | None = None, + exchange: str = "SMART", + currency: str = "USD", +) -> Any: + metadata = dict(order_intent.metadata or {}) + underlier = str(metadata.get("underlier") or order_intent.symbol or "").strip().upper() + legs = tuple(metadata.get("legs") or ()) + if not underlier or not legs: + raise ValueError("Multi-leg option OrderIntent requires metadata.underlier and metadata.legs.") + if combo_contract_factory is None: + from ib_insync import Contract + + combo_contract_factory = Contract + if combo_leg_factory is None: + from ib_insync import ComboLeg + + combo_leg_factory = ComboLeg + + combo_legs = [] + for leg in legs: + if not isinstance(leg, dict): + raise ValueError("Option combo legs must be mappings.") + option_contract = _build_option_contract( + OrderIntent( + symbol=underlier, + side=_leg_action(leg.get("action")), + quantity=1, + metadata={ + "underlier": underlier, + "expiration": leg.get("expiration") or metadata.get("expiration"), + "right": leg.get("right"), + "strike": leg.get("strike"), + }, + ), + option_factory=option_factory, + exchange=exchange, + currency=currency, + ) + qualified = ib.qualifyContracts(option_contract) + qualified_contract = qualified[0] if qualified else option_contract + con_id = getattr(qualified_contract, "conId", None) + if con_id is None: + raise ValueError("Qualified option combo leg did not expose conId.") + combo_legs.append( + combo_leg_factory( + conId=con_id, + ratio=int(leg.get("ratio") or 1), + action=_leg_action(leg.get("action")), + exchange=exchange, + ) + ) + + contract = combo_contract_factory() + contract.symbol = underlier + contract.secType = "BAG" + contract.exchange = exchange + contract.currency = currency + contract.comboLegs = combo_legs + return contract + + +def _normalize_order_side(side: str) -> str: + text = str(side or "").strip().lower() + if text.startswith("buy"): + return "BUY" + if text.startswith("sell"): + return "SELL" + raise ValueError(f"Unsupported order side: {side!r}") + + def submit_order_intent( ib: Any, order_intent: OrderIntent, @@ -31,16 +194,34 @@ def submit_order_intent( account_id: str | None = None, wait_seconds: float = 1.0, stock_factory: Callable[..., Any] | None = None, + option_factory: Callable[..., Any] | None = None, + combo_contract_factory: Callable[..., Any] | None = None, + combo_leg_factory: Callable[..., Any] | None = None, market_order_factory: Callable[..., Any] | None = None, limit_order_factory: Callable[..., Any] | None = None, ) -> ExecutionReport: - contract = _build_stock_contract( - order_intent.symbol, - stock_factory=stock_factory, - ) + metadata = dict(order_intent.metadata or {}) + if _is_combo_option_intent(order_intent): + contract = _build_option_combo_contract( + ib, + order_intent, + option_factory=option_factory, + combo_contract_factory=combo_contract_factory, + combo_leg_factory=combo_leg_factory, + ) + elif _is_option_intent(order_intent): + contract = _build_option_contract( + order_intent, + option_factory=option_factory, + ) + else: + contract = _build_stock_contract( + order_intent.symbol, + stock_factory=stock_factory, + ) ib.qualifyContracts(contract) - side = order_intent.side.upper() + side = _normalize_order_side(order_intent.side) order_type = order_intent.order_type.lower() if order_type == "market": if market_order_factory is None: @@ -90,5 +271,12 @@ def submit_order_intent( "order_type": order_type, "time_in_force": getattr(order, "tif", None), "account_id": resolved_account_id, + "asset_class": metadata.get("asset_class"), + "intent_type": metadata.get("intent_type"), + "underlier": metadata.get("underlier"), + "right": metadata.get("right"), + "expiration": metadata.get("expiration"), + "strike": metadata.get("strike"), + "legs": metadata.get("legs"), }, ) diff --git a/src/quant_platform_kit/ibkr/market_data.py b/src/quant_platform_kit/ibkr/market_data.py index fa650a0..7e1bd4d 100644 --- a/src/quant_platform_kit/ibkr/market_data.py +++ b/src/quant_platform_kit/ibkr/market_data.py @@ -20,6 +20,18 @@ def _coerce_as_of(value: Any) -> datetime: raise ValueError(f"Unsupported IBKR date value: {value!r}") from exc +def _coerce_expiration(value: Any) -> date | None: + text = str(value or "").strip() + if not text: + return None + if len(text) == 8 and text.isdigit(): + text = f"{text[:4]}-{text[4:6]}-{text[6:]}" + try: + return datetime.fromisoformat(text[:10]).date() + except ValueError: + return None + + def _build_stock_contract( symbol: str, *, @@ -34,6 +46,30 @@ def _build_stock_contract( return stock_factory(symbol, exchange, currency) +def _build_option_contract( + symbol: str, + expiration: str, + strike: float, + right: str, + *, + exchange: str, + currency: str, + option_factory: Callable[..., Any] | None, +) -> Any: + if option_factory is None: + from ib_insync import Option + + option_factory = Option + return option_factory( + symbol, + expiration, + float(strike), + str(right).strip().upper(), + exchange=exchange, + currency=currency, + ) + + def _normalize_duration_for_ibkr(duration: str) -> str: text = str(duration or "").strip() match = re.fullmatch(r"(\d+)\s*([A-Za-z]+)", text) @@ -286,3 +322,154 @@ def fetch_quote_snapshots( _set_market_data_type(ib, 1) return snapshots + + +def _select_option_chain_params(chains: Any) -> Any | None: + candidates = list(chains or ()) + if not candidates: + return None + for chain in candidates: + if str(getattr(chain, "exchange", "") or "").strip().upper() == "SMART": + return chain + return candidates[0] + + +def _ticker_delta(ticker: Any) -> float | None: + for greeks_name in ("modelGreeks", "bidGreeks", "askGreeks", "lastGreeks"): + greeks = getattr(ticker, greeks_name, None) + delta = _coerce_positive_price(abs(getattr(greeks, "delta", 0.0))) if greeks is not None else None + if delta is not None: + return float(delta) + return None + + +def fetch_option_chain_snapshot( + ib: Any, + underlier: str, + *, + rights: tuple[str, ...] = ("C", "P"), + min_dte: int = 25, + max_dte: int = 930, + target_dte: int | None = None, + max_expirations: int = 3, + strike_range_pct: tuple[float, float] = (0.50, 1.35), + max_contracts: int = 80, + wait_seconds: float = 3.0, + exchange: str = "SMART", + currency: str = "USD", + stock_factory: Callable[..., Any] | None = None, + option_factory: Callable[..., Any] | None = None, +) -> dict[str, Any]: + """Fetch a bounded option-chain snapshot suitable for strategy-side contract selection.""" + + symbol = str(underlier or "").strip().upper() + if not symbol: + raise ValueError("underlier is required.") + + stock_contract = _build_stock_contract( + symbol, + exchange=exchange, + currency=currency, + stock_factory=stock_factory, + ) + qualified = ib.qualifyContracts(stock_contract) + qualified_stock = qualified[0] if qualified else stock_contract + con_id = getattr(qualified_stock, "conId", None) + if con_id is None: + raise ValueError(f"IBKR stock contract for {symbol} did not expose conId.") + + spot_snapshot = fetch_quote_snapshots( + ib, + (symbol,), + wait_seconds=wait_seconds, + exchange=exchange, + currency=currency, + stock_factory=stock_factory, + ) + spot = float(spot_snapshot[symbol].last_price) if symbol in spot_snapshot else 0.0 + if spot <= 0.0: + raise ValueError(f"Could not resolve underlying price for {symbol}.") + + chains = ib.reqSecDefOptParams(symbol, "", "STK", con_id) + chain = _select_option_chain_params(chains) + if chain is None: + return {"underlier": symbol, "spot": spot, "contracts": ()} + + as_of = datetime.utcnow().date() + target_dte = int(target_dte if target_dte is not None else (min_dte + max_dte) / 2) + expirations = [] + for raw_expiration in getattr(chain, "expirations", ()) or (): + expiration_date = _coerce_expiration(raw_expiration) + if expiration_date is None: + continue + dte = (expiration_date - as_of).days + if int(min_dte) <= dte <= int(max_dte): + expirations.append((abs(dte - target_dte), raw_expiration, expiration_date, dte)) + expirations = sorted(expirations)[: max(1, int(max_expirations or 1))] + + low_ratio, high_ratio = strike_range_pct + min_strike = spot * max(0.01, float(low_ratio)) + max_strike = spot * max(float(high_ratio), float(low_ratio)) + strikes = [ + float(strike) + for strike in getattr(chain, "strikes", ()) or () + if min_strike <= float(strike) <= max_strike + ] + if max_contracts > 0 and expirations and rights: + max_strikes = max(1, int(max_contracts // (len(expirations) * len(rights)) or 1)) + if len(strikes) > max_strikes: + if max_strikes == 1: + strikes = [min(strikes, key=lambda strike: abs(strike - spot))] + else: + last_index = len(strikes) - 1 + indices = { + round(index * last_index / (max_strikes - 1)) + for index in range(max_strikes) + } + strikes = [strikes[index] for index in sorted(indices)] + + requested: list[tuple[Any, str, float, str, date, int, Any]] = [] + for _dte_gap, raw_expiration, expiration_date, dte in expirations: + expiration_text = str(raw_expiration).replace("-", "") + for right in rights: + normalized_right = str(right or "").strip().upper()[0] + for strike in strikes: + contract = _build_option_contract( + symbol, + expiration_text, + strike, + normalized_right, + exchange=exchange, + currency=currency, + option_factory=option_factory, + ) + ib.qualifyContracts(contract) + ticker = ib.reqMktData(contract, "", False, False) + requested.append((contract, normalized_right, strike, expiration_text, expiration_date, dte, ticker)) + + _wait_for_market_data(ib, wait_seconds) + rows = [] + for contract, right, strike, _expiration_text, expiration_date, dte, ticker in requested: + ib.cancelMktData(contract) + bid = _coerce_positive_price(getattr(ticker, "bid", None)) + ask = _coerce_positive_price(getattr(ticker, "ask", None)) + last = _coerce_positive_price(getattr(ticker, "last", None)) + rows.append( + { + "underlier": symbol, + "right": right, + "expiration": expiration_date.isoformat(), + "dte": dte, + "strike": float(strike), + "bid": bid, + "ask": ask, + "last": last, + "mid": ((bid + ask) / 2.0) if bid is not None and ask is not None else last, + "delta": _ticker_delta(ticker), + } + ) + return { + "underlier": symbol, + "spot": spot, + "contracts": tuple(rows), + } diff --git a/src/quant_platform_kit/ibkr/portfolio.py b/src/quant_platform_kit/ibkr/portfolio.py index 3695fc7..972408a 100644 --- a/src/quant_platform_kit/ibkr/portfolio.py +++ b/src/quant_platform_kit/ibkr/portfolio.py @@ -41,17 +41,34 @@ def fetch_portfolio_snapshot( time_module.sleep(wait_seconds) positions = [] + option_positions = [] for raw_position in ib.positions(): account_id = str(getattr(raw_position, "account", "") or "").strip() or None if not _matches_account(account_id, selected_account_ids): continue if raw_position.position == 0: continue + contract = raw_position.contract quantity = float(raw_position.position) average_cost = float(raw_position.avgCost) + if str(getattr(contract, "secType", "") or "").strip().upper() == "OPT": + option_positions.append( + { + "underlier": str(getattr(contract, "symbol", "") or "").strip().upper(), + "local_symbol": str(getattr(contract, "localSymbol", "") or "").strip(), + "expiration": str(getattr(contract, "lastTradeDateOrContractMonth", "") or "").strip(), + "right": str(getattr(contract, "right", "") or "").strip().upper(), + "strike": float(getattr(contract, "strike", 0.0) or 0.0), + "quantity": quantity, + "average_cost": average_cost, + "cost_basis": abs(quantity * average_cost), + "account_id": account_id, + } + ) + continue positions.append( Position( - symbol=raw_position.contract.symbol, + symbol=contract.symbol, quantity=quantity, market_value=quantity * average_cost, average_cost=average_cost, @@ -78,5 +95,8 @@ def fetch_portfolio_snapshot( total_equity=total_equity, buying_power=buying_power, positions=tuple(positions), - metadata={"account_ids": selected_account_ids}, + metadata={ + "account_ids": selected_account_ids, + "option_positions": tuple(option_positions), + }, ) diff --git a/tests/test_ibkr_execution.py b/tests/test_ibkr_execution.py index 817e034..cd68875 100644 --- a/tests/test_ibkr_execution.py +++ b/tests/test_ibkr_execution.py @@ -1,6 +1,7 @@ from __future__ import annotations from dataclasses import dataclass +from types import SimpleNamespace import unittest from quant_platform_kit.common.models import OrderIntent @@ -14,6 +15,16 @@ class FakeContract: currency: str +@dataclass +class FakeOptionContract: + symbol: str + lastTradeDateOrContractMonth: str + strike: float + right: str + exchange: str + currency: str + + class FakeMarketOrder: def __init__(self, side, quantity): self.side = side @@ -29,6 +40,14 @@ def __init__(self, side, quantity, limit_price): self.tif = None +class FakeComboLeg: + def __init__(self, conId, ratio, action, exchange): + self.conId = conId + self.ratio = ratio + self.action = action + self.exchange = exchange + + class FakeTrade: def __init__(self, status="Submitted", filled=0, avg_fill_price=0, order_id=123): self.orderStatus = type( @@ -113,6 +132,100 @@ def test_submit_order_intent_rejects_conflicting_account_id(self) -> None: market_order_factory=FakeMarketOrder, ) + def test_submit_order_intent_builds_single_leg_option_contract(self) -> None: + ib = FakeIB() + + report = submit_order_intent( + ib, + OrderIntent( + symbol="TQQQ", + side="buy_to_open", + quantity=2, + order_type="limit", + limit_price=32.5, + time_in_force="DAY", + metadata={ + "asset_class": "option", + "intent_type": "single_leg_option", + "underlier": "TQQQ", + "right": "C", + "expiration": "2028-01-21", + "strike": 70.0, + }, + ), + wait_seconds=0, + option_factory=FakeOptionContract, + limit_order_factory=FakeLimitOrder, + ) + + contract, order = ib.orders[0] + self.assertEqual(contract.symbol, "TQQQ") + self.assertEqual(contract.lastTradeDateOrContractMonth, "20280121") + self.assertEqual(contract.right, "C") + self.assertEqual(contract.strike, 70.0) + self.assertEqual(order.side, "BUY") + self.assertEqual(report.raw_payload["asset_class"], "option") + + def test_submit_order_intent_builds_option_combo_contract(self) -> None: + class ComboIB(FakeIB): + def __init__(self): + super().__init__() + self.next_con_id = 100 + + def qualifyContracts(self, contract): + if hasattr(contract, "right"): + contract.conId = self.next_con_id + self.next_con_id += 1 + self.qualified_contract = contract + return [contract] + + ib = ComboIB() + report = submit_order_intent( + ib, + OrderIntent( + symbol="SOXX", + side="sell", + quantity=1, + order_type="limit", + limit_price=1.25, + time_in_force="DAY", + metadata={ + "asset_class": "option", + "intent_type": "multi_leg_option", + "underlier": "SOXX", + "expiration": "2026-07-17", + "legs": ( + { + "action": "sell_to_open", + "right": "P", + "expiration": "2026-07-17", + "strike": 200.0, + "ratio": 1, + }, + { + "action": "buy_to_open", + "right": "P", + "expiration": "2026-07-17", + "strike": 180.0, + "ratio": 1, + }, + ), + }, + ), + wait_seconds=0, + option_factory=FakeOptionContract, + combo_contract_factory=SimpleNamespace, + combo_leg_factory=FakeComboLeg, + limit_order_factory=FakeLimitOrder, + ) + + contract, order = ib.orders[0] + self.assertEqual(contract.secType, "BAG") + self.assertEqual(contract.symbol, "SOXX") + self.assertEqual([leg.action for leg in contract.comboLegs], ["SELL", "BUY"]) + self.assertEqual(order.side, "SELL") + self.assertEqual(report.raw_payload["intent_type"], "multi_leg_option") + if __name__ == "__main__": unittest.main() diff --git a/tests/test_ibkr_market_data.py b/tests/test_ibkr_market_data.py index e140e2e..8e38fee 100644 --- a/tests/test_ibkr_market_data.py +++ b/tests/test_ibkr_market_data.py @@ -2,11 +2,13 @@ from dataclasses import dataclass from datetime import date +from types import SimpleNamespace import unittest from quant_platform_kit.ibkr.market_data import ( fetch_historical_price_candles, fetch_historical_price_series, + fetch_option_chain_snapshot, fetch_quote_snapshots, ) @@ -18,6 +20,16 @@ class FakeContract: currency: str +@dataclass +class FakeOptionContract: + symbol: str + lastTradeDateOrContractMonth: str + strike: float + right: str + exchange: str + currency: str + + @dataclass class FakeBar: date: date @@ -287,6 +299,51 @@ def reqMktData(self, contract, *_args): self.assertEqual(snapshots["SPY"].last_price, 101.8) self.assertEqual(ib.market_data_type_calls, [3, 4, 1, 1]) + def test_fetch_option_chain_snapshot_returns_bounded_contract_rows(self) -> None: + class OptionChainIB(FakeIB): + def qualifyContracts(self, contract): + self.qualified.append(contract) + if not hasattr(contract, "right"): + contract.conId = 12345 + return [contract] + + def reqSecDefOptParams(self, symbol, fut_fop_exchange, sec_type, con_id): + self.option_params_request = (symbol, fut_fop_exchange, sec_type, con_id) + return [ + SimpleNamespace( + exchange="SMART", + expirations={"20280121"}, + strikes={50.0, 70.0, 90.0, 150.0}, + ) + ] + + def reqMktData(self, contract, *_args): + if hasattr(contract, "right"): + ticker = FakeTicker(-1.0, close=float("nan"), bid=29.0, ask=31.0) + ticker.modelGreeks = SimpleNamespace(delta=0.74) + return ticker + return FakeTicker(102.5, close=101.8, bid=102.4, ask=102.6) + + ib = OptionChainIB() + snapshot = fetch_option_chain_snapshot( + ib, + "TQQQ", + rights=("C",), + min_dte=540, + max_dte=930, + target_dte=730, + wait_seconds=0, + stock_factory=FakeContract, + option_factory=FakeOptionContract, + ) + + self.assertEqual(snapshot["underlier"], "TQQQ") + self.assertEqual(snapshot["spot"], 102.5) + self.assertEqual(snapshot["contracts"][0]["right"], "C") + self.assertEqual(snapshot["contracts"][0]["delta"], 0.74) + self.assertEqual(snapshot["contracts"][0]["mid"], 30.0) + self.assertEqual(ib.option_params_request, ("TQQQ", "", "STK", 12345)) + if __name__ == "__main__": unittest.main() diff --git a/tests/test_ibkr_portfolio.py b/tests/test_ibkr_portfolio.py index 2d29343..2102252 100644 --- a/tests/test_ibkr_portfolio.py +++ b/tests/test_ibkr_portfolio.py @@ -21,6 +21,19 @@ def positions(self): position=3, avgCost=100.0, ), + SimpleNamespace( + account="U18308207", + contract=SimpleNamespace( + symbol="TQQQ", + secType="OPT", + lastTradeDateOrContractMonth="20280121", + right="C", + strike=70.0, + localSymbol="TQQQ 280121C00070000", + ), + position=2, + avgCost=3200.0, + ), SimpleNamespace( account="U15998061", contract=SimpleNamespace(symbol="AAPL"), @@ -50,6 +63,9 @@ def test_fetch_portfolio_snapshot_filters_by_account_id(self) -> None: self.assertEqual(tuple(position.symbol for position in snapshot.positions), ("TQQQ",)) self.assertEqual(snapshot.positions[0].account_id, "U18308207") self.assertEqual(snapshot.metadata["account_ids"], ("U18308207",)) + self.assertEqual(snapshot.metadata["option_positions"][0]["underlier"], "TQQQ") + self.assertEqual(snapshot.metadata["option_positions"][0]["right"], "C") + self.assertEqual(snapshot.metadata["option_positions"][0]["strike"], 70.0) def test_fetch_portfolio_snapshot_sums_selected_accounts(self) -> None: snapshot = fetch_portfolio_snapshot( @@ -61,6 +77,7 @@ def test_fetch_portfolio_snapshot_sums_selected_accounts(self) -> None: self.assertEqual(snapshot.total_equity, 3000.0) self.assertEqual(snapshot.buying_power, 750.0) self.assertEqual(tuple(position.symbol for position in snapshot.positions), ("TQQQ", "AAPL")) + self.assertEqual(len(snapshot.metadata["option_positions"]), 1) if __name__ == "__main__": diff --git a/tests/test_strategy_plugins.py b/tests/test_strategy_plugins.py index 14387f8..45308d9 100644 --- a/tests/test_strategy_plugins.py +++ b/tests/test_strategy_plugins.py @@ -1,8 +1,10 @@ import json import tempfile import unittest +from datetime import datetime, timezone from pathlib import Path +from quant_platform_kit.common.models import PortfolioSnapshot from quant_platform_kit.common.strategy_plugins import ( CRISIS_RESPONSE_SHADOW_SUPPORTED_STRATEGIES, DEFAULT_STRATEGY_PLUGIN_DEFINITIONS, @@ -15,6 +17,7 @@ STRATEGY_PLUGIN_ALERT_CHANNEL_TELEGRAM, TACO_REBOUND_SHADOW_SUPPORTED_STRATEGIES, StrategyPluginDefinition, + attach_strategy_plugin_metadata, build_strategy_plugin_alert_messages, build_strategy_plugin_notification_lines, build_strategy_plugin_report_payload, @@ -44,6 +47,7 @@ def _signal_payload(*, strategy="tqqq_growth_income", plugin="crisis_response_sh "live_allocation_mutation_allowed": False, "repository_broker_write_allowed": False, "repository_allocation_mutation_allowed": False, + "strategy_runtime_metadata_allowed": False, }, } @@ -289,6 +293,53 @@ def test_build_strategy_plugin_report_payload_uses_compact_summary(self): self.assertEqual(report_payload["strategy_plugins"][0]["plugin"], "crisis_response_shadow") self.assertNotIn("payload", report_payload["strategy_plugins"][0]) + def test_attach_strategy_plugin_metadata_adds_payloads_to_snapshot(self): + signal = validate_strategy_plugin_signal_payload( + { + **_signal_payload(plugin=PLUGIN_TACO_REBOUND_SHADOW), + "canonical_route": "taco_rebound", + "rebound_context_active": True, + "execution_controls": { + **_signal_payload()["execution_controls"], + "strategy_runtime_metadata_allowed": True, + }, + } + ) + snapshot = PortfolioSnapshot( + as_of=datetime(2026, 4, 17, tzinfo=timezone.utc), + total_equity=100000.0, + metadata={"account_hash": "demo"}, + ) + + enriched = attach_strategy_plugin_metadata(snapshot, (signal,)) + + self.assertIsNot(enriched, snapshot) + self.assertEqual(enriched.metadata["account_hash"], "demo") + self.assertEqual(enriched.metadata[PLUGIN_TACO_REBOUND_SHADOW]["canonical_route"], "taco_rebound") + self.assertEqual( + enriched.metadata["strategy_plugins"][PLUGIN_TACO_REBOUND_SHADOW]["canonical_route"], + "taco_rebound", + ) + + def test_attach_strategy_plugin_metadata_ignores_shadow_artifact_without_runtime_guard(self): + signal = validate_strategy_plugin_signal_payload( + { + **_signal_payload(plugin=PLUGIN_TACO_REBOUND_SHADOW), + "canonical_route": "taco_rebound", + "rebound_context_active": True, + } + ) + snapshot = PortfolioSnapshot( + as_of=datetime(2026, 4, 17, tzinfo=timezone.utc), + total_equity=100000.0, + metadata={"account_hash": "demo"}, + ) + + enriched = attach_strategy_plugin_metadata(snapshot, (signal,)) + + self.assertIs(enriched, snapshot) + self.assertEqual(enriched.metadata, {"account_hash": "demo"}) + def test_strategy_plugin_notification_lines_use_translator_when_available(self): signal = validate_strategy_plugin_signal_payload(_signal_payload()) translations = { @@ -366,6 +417,33 @@ def test_strategy_plugin_true_crisis_builds_generic_alert_message(self): self.assertTrue(alerts[0].metadata["would_trade_if_enabled"]) self.assertEqual(alerts[0].metadata["guidance"], "reduce leverage or move to cash") + def test_strategy_plugin_alert_message_includes_ai_audit_note_when_present(self): + signal = validate_strategy_plugin_signal_payload( + { + **_signal_payload(), + "canonical_route": "true_crisis", + "suggested_action": "defend", + "would_trade_if_enabled": True, + "ai_audit": { + "enabled": True, + "status": "ok", + "verdict": "agree", + "route_assessment": "confirm_true_crisis", + "summary": "Evidence is coherent; keep deterministic route unchanged.", + "final_route_unchanged": True, + }, + }, + source_uri="gs://bucket/latest_signal.json", + ) + + alerts = build_strategy_plugin_alert_messages([signal], strategy_label="TQQQ Growth Income") + + self.assertEqual(len(alerts), 1) + self.assertIn("AI audit: ok", alerts[0].body) + self.assertIn("verdict=agree", alerts[0].body) + self.assertIn("Evidence is coherent", alerts[0].body) + self.assertEqual(alerts[0].metadata["ai_audit"]["final_route_unchanged"], True) + def test_taco_rebound_notification_alerts_without_trade_flag(self): signal = validate_strategy_plugin_signal_payload( {