Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 22 additions & 11 deletions application/execution_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,17 +311,28 @@ def execute_value_target_plan(
buy_budget = min(buy_budget, order_notional_cap)
quantity = _floor_quantity(buy_budget / price)
if quantity <= 0:
skipped.append(
{
"symbol": symbol,
"reason": "buy_quantity_zero",
**(
{"max_order_notional_usd": round(order_notional_cap, 2)}
if order_notional_cap is not None
else {}
),
}
)
if order_notional_cap is None and investable_cash < price:
skipped.append(
{
"symbol": symbol,
"reason": "insufficient_cash_for_whole_share",
"price": round(price, 2),
"investable_cash": round(investable_cash, 2),
"required_cash_for_one_share": round(price, 2),
}
)
else:
skipped.append(
{
"symbol": symbol,
"reason": "buy_quantity_zero",
**(
{"max_order_notional_usd": round(order_notional_cap, 2)}
if order_notional_cap is not None
else {}
),
}
)
continue
submitted.append(
_submit_order(
Expand Down
37 changes: 33 additions & 4 deletions application/rebalance_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@

LIMIT_SELL_DISCOUNT = 0.995
LIMIT_BUY_PREMIUM = 1.005
EXECUTION_BLOCKING_SKIP_REASONS = frozenset(
{
"buy_quantity_zero",
"insufficient_cash_for_whole_share",
"quote_unavailable",
"sell_quantity_zero",
}
)


def _utcnow() -> datetime:
Expand All @@ -55,6 +63,14 @@ def get_project_id() -> str | None:
return os.getenv("GOOGLE_CLOUD_PROJECT")


def _execution_blocking_skips(skipped_orders: list[dict[str, Any]]) -> list[dict[str, Any]]:
return [
dict(item)
for item in skipped_orders
if str(item.get("reason") or "") in EXECUTION_BLOCKING_SKIP_REASONS
]


def _series_from_price_history(market_data_port, symbol: str) -> pd.Series:
series = market_data_port.get_price_series(symbol)
index = pd.DatetimeIndex([pd.Timestamp(point.as_of) for point in series.points])
Expand Down Expand Up @@ -284,8 +300,12 @@ def run_strategy_cycle(
max_order_notional_usd=settings.max_order_notional_usd,
safe_haven_cash_substitute_threshold_usd=settings.safe_haven_cash_substitute_threshold_usd,
)
submitted_orders = list(execution_result.submitted_orders)
skipped_orders = list(execution_result.skipped_orders)
blocking_skips = _execution_blocking_skips(skipped_orders)
execution_blocked = bool(blocking_skips)
result = {
"ok": True,
"ok": not execution_blocked,
"api_kind": "unofficial-reverse-engineered",
"account": mask_account_id(account),
"strategy_profile": strategy_runtime.profile,
Expand All @@ -298,16 +318,25 @@ def run_strategy_cycle(
"portfolio": plan.get("portfolio", {}),
"allocation": plan.get("allocation", {}),
"execution": plan.get("execution", {}),
"submitted_orders": list(execution_result.submitted_orders),
"skipped_orders": list(execution_result.skipped_orders),
"submitted_orders": submitted_orders,
"skipped_orders": skipped_orders,
"action_done": execution_result.action_done,
}
if execution_blocked:
result["execution_blocked"] = True
result["execution_blocking_skips"] = blocking_skips
result["error"] = "Strategy execution blocked; see execution_blocking_skips."
if strategy_run_persistence_error:
result["strategy_run_persistence_error"] = strategy_run_persistence_error
if persist_strategy_runs:
stage = "DRY_RUN_COMPLETED"
if not settings.dry_run_only:
stage = "SUBMITTED" if execution_result.action_done else "NO_ACTION"
if execution_blocked and execution_result.action_done:
stage = "PARTIAL_SUBMITTED"
elif execution_blocked:
Comment on lines +334 to +336

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Prevent partial retries from re-submitting prior orders

Persisting PARTIAL_SUBMITTED as a non-terminal stage means the same run_period is no longer idempotency-skipped on the next scheduler attempt, so a retry can execute the full plan again and submit symbols that were already submitted in the first attempt. This is especially risky when retries happen before previous orders are reflected in balances/positions, because there is no deduplication against prior submitted_orders or open orders, which can over-size live positions.

Useful? React with 👍 / 👎.

stage = "EXECUTION_BLOCKED"
else:
stage = "SUBMITTED" if execution_result.action_done else "NO_ACTION"
completed_state = build_strategy_run_state(
stage=stage,
account=masked_account,
Expand Down
8 changes: 8 additions & 0 deletions notifications/telegram.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
"order_price_suffix": " @ ${price}",
"order_id_suffix": "(订单号: {order_id})",
"no_order_submitted": "未下单: 原因={reason}",
"execution_blocked_banner": "⚠️ 执行阻塞: {reason}",
"no_rebalance_needed": "✅ 无需调仓",
"no_trades": "✅ 无需调仓",
"no_executable_orders": "无可执行订单",
Expand Down Expand Up @@ -94,6 +95,7 @@
"skip_reason_quote_unavailable": "无法获取报价",
"skip_reason_sell_quantity_zero": "卖出股数为0",
"skip_reason_buy_quantity_zero": "买入股数为0",
"skip_reason_insufficient_cash_for_whole_share": "现金不足以买入一整股",
"skip_reason_unknown": "未知原因",
},
"en": {
Expand Down Expand Up @@ -132,6 +134,7 @@
"order_price_suffix": " @ ${price}",
"order_id_suffix": " (ID: {order_id})",
"no_order_submitted": "No order submitted: reason={reason}",
"execution_blocked_banner": "⚠️ Execution blocked: {reason}",
"no_rebalance_needed": "✅ No rebalance needed",
"no_trades": "✅ No rebalance needed",
"no_executable_orders": "no executable orders",
Expand Down Expand Up @@ -172,6 +175,7 @@
"skip_reason_quote_unavailable": "quote unavailable",
"skip_reason_sell_quantity_zero": "sell quantity rounds to 0",
"skip_reason_buy_quantity_zero": "buy quantity rounds to 0",
"skip_reason_insufficient_cash_for_whole_share": "insufficient cash for one whole share",
"skip_reason_unknown": "unknown reason",
},
}
Expand Down Expand Up @@ -550,6 +554,10 @@ def render_cycle_summary(result: Mapping[str, Any], *, lang: str = "en") -> str:
lines.append(translator("account_label", account=account))
if dry_run_only:
lines.append(translator("dry_run_banner"))
if bool(result.get("execution_blocked")):
blocked = list(result.get("execution_blocking_skips") or skipped)
reason = _format_skipped_reason(blocked, translator=translator)
lines.append(translator("execution_blocked_banner", reason=reason))

dashboard_lines = _format_dashboard_lines(portfolio, execution, translator=translator)
if dashboard_lines:
Expand Down
30 changes: 30 additions & 0 deletions tests/test_execution_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,36 @@ def test_execute_value_target_plan_has_no_default_order_notional_cap():
assert execution_port.orders[0].metadata == {}


def test_execute_value_target_plan_reports_insufficient_cash_for_whole_share():
execution_port = FakeExecutionPort()
result = execute_value_target_plan(
plan={
"allocation": {"targets": {"SPY": 500.0}},
"portfolio": {
"market_values": {"SPY": 0.0},
"sellable_quantities": {},
"liquid_cash": 50.0,
},
"execution": {"current_min_trade": 1.0, "investable_cash": 50.0},
},
market_data_port=FakeMarketDataPort({"SPY": 100.0}),
execution_port=execution_port,
dry_run_only=True,
)

assert result.action_done is False
assert execution_port.orders == []
assert result.skipped_orders == (
{
"symbol": "SPY",
"reason": "insufficient_cash_for_whole_share",
"price": 100.0,
"investable_cash": 50.0,
"required_cash_for_one_share": 100.0,
},
)


def test_execute_value_target_plan_leaves_small_safe_haven_target_as_cash():
execution_port = FakeExecutionPort()
plan = {
Expand Down
91 changes: 89 additions & 2 deletions tests/test_rebalance_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ def fake_client_factory(*args, **kwargs):
assert store.writes == []


def test_run_strategy_cycle_persists_live_no_action_without_duplicate_terminal_stage(monkeypatch):
def test_run_strategy_cycle_persists_live_execution_blocked_without_terminal_stage(monkeypatch):
store = FakeStateStore()
settings = _runtime_settings_with_persistence(
dry_run_only=False,
Expand All @@ -260,7 +260,63 @@ def test_run_strategy_cycle_persists_live_no_action_without_duplicate_terminal_s

latest_payload = store.writes[-2][1]
assert result["action_done"] is False
assert latest_payload["stage"] == "NO_ACTION"
assert result["ok"] is False
assert result["execution_blocked"] is True
assert latest_payload["stage"] == "EXECUTION_BLOCKED"


def test_run_strategy_cycle_persists_live_partial_submission_as_non_terminal(monkeypatch):
store = FakeStateStore()
settings = _runtime_settings_with_persistence(
dry_run_only=False,
live_trading_enabled=True,
live_order_ack=True,
persist_strategy_runs=True,
max_order_notional_usd=1000.0,
)

class PartialRuntime(FakeStrategyRuntime):
managed_symbols = ("AAA", "BBB")

def evaluate(self, **inputs):
assert "portfolio_snapshot" in inputs
return SimpleNamespace(
decision=StrategyDecision(
positions=(
PositionTarget(symbol="AAA", target_value=50.0, role="risk"),
PositionTarget(symbol="BBB", target_value=150.0, role="risk"),
),
diagnostics={"execution_annotations": {"trade_threshold_value": 1.0}},
),
metadata={"strategy_profile": self.profile},
)

class PartialClient(FakeFirstradeClient):
def get_balances(self, _account):
return {"total_value": "100.00", "cash": "60.00", "buying_power": "60.00"}

def get_quote(self, _account, symbol):
prices = {"AAA": "10.00", "BBB": "100.00"}
return {"symbol": symbol, "last": prices[symbol], "bid": "9.90", "ask": "10.10"}

monkeypatch.setattr(
"application.rebalance_service.load_strategy_runtime",
lambda *_args, **_kwargs: PartialRuntime(),
)

result = run_strategy_cycle(
runtime_settings=settings,
credentials=FirstradeCredentials(username="user", password="pass"),
client_factory=PartialClient,
state_store=store,
env_reader=lambda _name, default=None: default,
)

latest_payload = store.writes[-2][1]
assert result["action_done"] is True
assert result["ok"] is False
assert result["execution_blocked"] is True
assert latest_payload["stage"] == "PARTIAL_SUBMITTED"


def test_render_cycle_summary_formats_skipped_orders_in_unified_chinese_template():
Expand Down Expand Up @@ -400,3 +456,34 @@ def test_render_cycle_summary_formats_skipped_orders_in_unified_english_template
assert "信号" not in message
assert "profile:" not in message
assert "targets:" not in message


def test_render_cycle_summary_shows_execution_blocked_banner():
message = render_cycle_summary(
{
"account": "****1234",
"strategy_profile": "mega_cap_leader_rotation_top50_balanced",
"strategy_display_name": "Mega Cap Leader Rotation Top50 Balanced",
"dry_run_only": False,
"execution_blocked": True,
"execution_blocking_skips": [
{"symbol": "NVDA", "reason": "insufficient_cash_for_whole_share"}
],
"portfolio": {
"total_equity": 50.0,
"liquid_cash": 50.0,
"portfolio_rows": (("NVDA",),),
"market_values": {"NVDA": 0.0},
"quantities": {"NVDA": 0},
},
"allocation": {"targets": {"NVDA": 500.0}},
"execution": {},
"submitted_orders": [],
"skipped_orders": [
{"symbol": "NVDA", "reason": "insufficient_cash_for_whole_share"}
],
},
lang="zh",
)

assert "⚠️ 执行阻塞: 现金不足以买入一整股:NVDA" in message