Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions src/quant_platform_kit/common/strategy_plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import tempfile
from collections.abc import Mapping, Sequence
from dataclasses import dataclass, field
from dataclasses import replace as dataclass_replace
from pathlib import Path
from typing import Any, Callable

Expand Down Expand Up @@ -427,6 +428,52 @@ def build_strategy_plugin_report_payload(signals: Sequence[StrategyPluginSignal]
}


def build_strategy_plugin_metadata(signals: Sequence[StrategyPluginSignal]) -> dict[str, Any]:
"""Build portfolio-snapshot metadata consumed by deterministic strategies."""
plugin_payloads: dict[str, Any] = {}
summaries: dict[str, Any] = {}
for signal in signals:
execution_controls = getattr(signal, "execution_controls", {}) or {}
if not isinstance(execution_controls, Mapping) or not _as_bool(
execution_controls.get("strategy_runtime_metadata_allowed"),
default=False,
):
continue
plugin = str(getattr(signal, "plugin", "") or "").strip()
if not plugin:
continue
payload = dict(getattr(signal, "payload", {}) or {})
plugin_payloads[plugin] = payload
summaries[plugin] = signal.report_summary()
if not plugin_payloads:
return {}
metadata: dict[str, Any] = {
"strategy_plugins": plugin_payloads,
"strategy_plugin_summaries": summaries,
}
metadata.update(plugin_payloads)
return metadata


def attach_strategy_plugin_metadata(snapshot: Any, signals: Sequence[StrategyPluginSignal]) -> Any:
"""Return a snapshot copy with plugin payloads attached to metadata."""
plugin_metadata = build_strategy_plugin_metadata(signals)
if not plugin_metadata:
return snapshot
current_metadata = getattr(snapshot, "metadata", {}) or {}
if not isinstance(current_metadata, Mapping):
current_metadata = {}
merged_metadata = {**dict(current_metadata), **plugin_metadata}
try:
return dataclass_replace(snapshot, metadata=merged_metadata)
except TypeError:
try:
snapshot.metadata = merged_metadata
except Exception:
return snapshot
return snapshot


def translate_strategy_plugin_value(
category: str,
raw_value: str | None,
Expand Down Expand Up @@ -532,6 +579,41 @@ def build_strategy_plugin_alert_scope_note(
)


def build_strategy_plugin_ai_audit_note(
signal: StrategyPluginSignal,
*,
translator: Callable[..., str] | None = None,
) -> str | None:
payload = getattr(signal, "payload", {}) or {}
if not isinstance(payload, Mapping):
return None
ai_audit = payload.get("ai_audit")
if not isinstance(ai_audit, Mapping) or not _as_bool(ai_audit.get("enabled"), default=False):
return None
status = _normalize_strategy_plugin_field(_optional_string(ai_audit.get("status")))
if status == "ok":
verdict = _optional_string(ai_audit.get("verdict")) or "unknown"
assessment = _optional_string(ai_audit.get("route_assessment")) or "unknown"
summary = _optional_string(ai_audit.get("summary")) or "no summary"
return _translate(
translator,
"strategy_plugin_alert_ai_audit",
fallback="AI audit: {status} | verdict={verdict} | assessment={assessment} | {summary}",
status=status,
verdict=verdict,
assessment=assessment,
summary=summary,
)
reason = _optional_string(ai_audit.get("skip_reason")) or _optional_string(ai_audit.get("error")) or "no detail"
return _translate(
translator,
"strategy_plugin_alert_ai_audit_status",
fallback="AI audit: {status} | {reason}",
status=status,
reason=reason,
)


def build_strategy_plugin_alert_key(
signal: StrategyPluginSignal,
*,
Expand Down Expand Up @@ -593,6 +675,7 @@ def build_strategy_plugin_alert_messages(
strategy = str(strategy_label or getattr(signal, "strategy", None) or "").strip() or "unknown"
guidance = build_strategy_plugin_alert_guidance(signal, translator=translator)
scope_note = build_strategy_plugin_alert_scope_note(signal, translator=translator)
ai_audit_note = build_strategy_plugin_ai_audit_note(signal, translator=translator)
subject = _translate(
translator,
"strategy_plugin_alert_subject",
Expand Down Expand Up @@ -665,6 +748,8 @@ def build_strategy_plugin_alert_messages(
guidance=guidance,
)
)
if ai_audit_note:
body_lines.append(ai_audit_note)
if scope_note:
body_lines.append(
_translate(
Expand All @@ -686,6 +771,9 @@ def build_strategy_plugin_alert_messages(
"context_label": context or None,
"guidance": guidance,
"scope_note": scope_note,
"ai_audit": getattr(signal, "payload", {}).get("ai_audit")
if isinstance(getattr(signal, "payload", {}), Mapping)
else None,
}
messages.append(
StrategyPluginAlertMessage(
Expand Down
2 changes: 2 additions & 0 deletions src/quant_platform_kit/ibkr/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from .market_data import (
fetch_historical_price_candles,
fetch_historical_price_series,
fetch_option_chain_snapshot,
fetch_quote_snapshots,
)
from .portfolio import fetch_portfolio_snapshot
Expand All @@ -23,6 +24,7 @@
"connect_ib",
"ensure_event_loop",
"fetch_historical_price_candles",
"fetch_option_chain_snapshot",
"submit_order_intent",
"fetch_historical_price_series",
"fetch_quote_snapshots",
Expand Down
198 changes: 193 additions & 5 deletions src/quant_platform_kit/ibkr/execution.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

from datetime import date, datetime
from typing import Any, Callable

from quant_platform_kit.common.models import ExecutionReport, OrderIntent
Expand All @@ -24,23 +25,203 @@ def _build_stock_contract(
return stock_factory(symbol, exchange, currency)


def _normalize_option_expiration(value: Any) -> str:
text = str(value or "").strip()
if len(text) == 8 and text.isdigit():
return text
if not text:
raise ValueError("Option OrderIntent.metadata.expiration is required.")
if isinstance(value, datetime):
return value.date().strftime("%Y%m%d")
if isinstance(value, date):
return value.strftime("%Y%m%d")
try:
return datetime.fromisoformat(text[:10]).date().strftime("%Y%m%d")
except ValueError as exc:
raise ValueError(f"Invalid option expiration: {value!r}") from exc


def _normalize_option_right(value: Any) -> str:
text = str(value or "").strip().upper()
if text in {"CALL", "C"}:
return "C"
if text in {"PUT", "P"}:
return "P"
raise ValueError("Option OrderIntent.metadata.right must be C/call or P/put.")


def _build_option_contract(
order_intent: OrderIntent,
*,
option_factory: Callable[..., Any] | None = None,
exchange: str = "SMART",
currency: str = "USD",
) -> Any:
metadata = dict(order_intent.metadata or {})
underlier = str(metadata.get("underlier") or order_intent.symbol or "").strip().upper()
if not underlier:
raise ValueError("Option OrderIntent requires symbol or metadata.underlier.")
expiration = _normalize_option_expiration(metadata.get("expiration"))
right = _normalize_option_right(metadata.get("right"))
try:
strike = float(metadata.get("strike"))
except (TypeError, ValueError) as exc:
raise ValueError("Option OrderIntent.metadata.strike is required.") from exc
if strike <= 0.0:
raise ValueError("Option OrderIntent.metadata.strike must be positive.")
if option_factory is None:
from ib_insync import Option

option_factory = Option
return option_factory(
underlier,
expiration,
strike,
right,
exchange=exchange,
currency=currency,
)


def _is_option_intent(order_intent: OrderIntent) -> bool:
metadata = dict(order_intent.metadata or {})
return (
str(metadata.get("asset_class") or "").strip().lower() == "option"
or str(metadata.get("security_type") or "").strip().upper() == "OPT"
or str(metadata.get("security_type") or "").strip().upper() == "BAG"
or str(metadata.get("intent_type") or "").strip() == "single_leg_option"
or str(metadata.get("intent_type") or "").strip() == "multi_leg_option"
)


def _is_combo_option_intent(order_intent: OrderIntent) -> bool:
metadata = dict(order_intent.metadata or {})
return (
str(metadata.get("asset_class") or "").strip().lower() == "option"
and str(metadata.get("intent_type") or "").strip() == "multi_leg_option"
Comment on lines +100 to +101

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Route BAG/multi-leg intents to combo builder

When callers identify a combo with metadata["security_type"] == "BAG" or only metadata["intent_type"] == "multi_leg_option", _is_option_intent() recognizes it as an option, but this narrower check returns false unless asset_class is also exactly option. Those intents then fall through to _build_option_contract() and fail on missing top-level right/strike instead of building the legs, so IBKR-style BAG orders are not submitable unless they include an extra, undocumented asset_class field.

Useful? React with 👍 / 👎.

)


def _leg_action(value: Any) -> str:
text = str(value or "").strip().lower()
if text.startswith("buy"):
return "BUY"
if text.startswith("sell"):
return "SELL"
raise ValueError(f"Unsupported option combo leg action: {value!r}")


def _build_option_combo_contract(
ib: Any,
order_intent: OrderIntent,
*,
option_factory: Callable[..., Any] | None = None,
combo_contract_factory: Callable[..., Any] | None = None,
combo_leg_factory: Callable[..., Any] | None = None,
exchange: str = "SMART",
currency: str = "USD",
) -> Any:
metadata = dict(order_intent.metadata or {})
underlier = str(metadata.get("underlier") or order_intent.symbol or "").strip().upper()
legs = tuple(metadata.get("legs") or ())
if not underlier or not legs:
raise ValueError("Multi-leg option OrderIntent requires metadata.underlier and metadata.legs.")
if combo_contract_factory is None:
from ib_insync import Contract

combo_contract_factory = Contract
if combo_leg_factory is None:
from ib_insync import ComboLeg

combo_leg_factory = ComboLeg

combo_legs = []
for leg in legs:
if not isinstance(leg, dict):
raise ValueError("Option combo legs must be mappings.")
option_contract = _build_option_contract(
OrderIntent(
symbol=underlier,
side=_leg_action(leg.get("action")),
quantity=1,
metadata={
"underlier": underlier,
"expiration": leg.get("expiration") or metadata.get("expiration"),
"right": leg.get("right"),
"strike": leg.get("strike"),
},
),
option_factory=option_factory,
exchange=exchange,
currency=currency,
)
qualified = ib.qualifyContracts(option_contract)
qualified_contract = qualified[0] if qualified else option_contract
con_id = getattr(qualified_contract, "conId", None)
if con_id is None:
raise ValueError("Qualified option combo leg did not expose conId.")
combo_legs.append(
combo_leg_factory(
conId=con_id,
ratio=int(leg.get("ratio") or 1),
action=_leg_action(leg.get("action")),
exchange=exchange,
)
)

contract = combo_contract_factory()
contract.symbol = underlier
contract.secType = "BAG"
contract.exchange = exchange
contract.currency = currency
contract.comboLegs = combo_legs
return contract


def _normalize_order_side(side: str) -> str:
text = str(side or "").strip().lower()
if text.startswith("buy"):
return "BUY"
if text.startswith("sell"):
return "SELL"
raise ValueError(f"Unsupported order side: {side!r}")


def submit_order_intent(
ib: Any,
order_intent: OrderIntent,
*,
account_id: str | None = None,
wait_seconds: float = 1.0,
stock_factory: Callable[..., Any] | None = None,
option_factory: Callable[..., Any] | None = None,
combo_contract_factory: Callable[..., Any] | None = None,
combo_leg_factory: Callable[..., Any] | None = None,
market_order_factory: Callable[..., Any] | None = None,
limit_order_factory: Callable[..., Any] | None = None,
) -> ExecutionReport:
contract = _build_stock_contract(
order_intent.symbol,
stock_factory=stock_factory,
)
metadata = dict(order_intent.metadata or {})
if _is_combo_option_intent(order_intent):
contract = _build_option_combo_contract(
ib,
order_intent,
option_factory=option_factory,
combo_contract_factory=combo_contract_factory,
combo_leg_factory=combo_leg_factory,
)
elif _is_option_intent(order_intent):
contract = _build_option_contract(
order_intent,
option_factory=option_factory,
)
else:
contract = _build_stock_contract(
order_intent.symbol,
stock_factory=stock_factory,
)
ib.qualifyContracts(contract)

side = order_intent.side.upper()
side = _normalize_order_side(order_intent.side)
order_type = order_intent.order_type.lower()
if order_type == "market":
if market_order_factory is None:
Expand Down Expand Up @@ -90,5 +271,12 @@ def submit_order_intent(
"order_type": order_type,
"time_in_force": getattr(order, "tif", None),
"account_id": resolved_account_id,
"asset_class": metadata.get("asset_class"),
"intent_type": metadata.get("intent_type"),
"underlier": metadata.get("underlier"),
"right": metadata.get("right"),
"expiration": metadata.get("expiration"),
"strike": metadata.get("strike"),
"legs": metadata.get("legs"),
},
)
Loading