Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/sync-cloud-run-env.yml
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,8 @@ jobs:
gcloud_args=(
run services update "${CLOUD_RUN_SERVICE}"
--region "${CLOUD_RUN_REGION}"
--concurrency 1
--max-instances 1
--remove-env-vars "$(IFS=,; echo "${remove_env_vars[*]}")"
--update-env-vars "$(IFS=,; echo "${env_pairs[*]}")"
)
Expand Down
2 changes: 1 addition & 1 deletion gunicorn.conf.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# gunicorn.conf.py
timeout = 120
workers = 1
threads = 8
threads = 1
96 changes: 82 additions & 14 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""IBKR strategy runner for shared us_equity strategy profiles."""
import os
import threading
import time
import traceback
from datetime import datetime, timezone
Expand Down Expand Up @@ -56,6 +57,7 @@
app = Flask(__name__)
ensure_event_loop = ibkr_ensure_event_loop
NEW_YORK_TZ = ZoneInfo("America/New_York")
STRATEGY_RUN_LOCK = threading.Lock()


# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -160,6 +162,32 @@ def get_ib_connect_timeout_seconds():
return timeout_seconds


def get_positive_int_env(name, default):
raw_value = os.getenv(name, str(default))
try:
parsed = int(raw_value)
except (TypeError, ValueError):
print(f"Invalid {name}={raw_value!r}; using {default}", flush=True)
return default
if parsed <= 0:
print(f"Invalid {name}={raw_value!r}; using {default}", flush=True)
return default
return parsed


def get_non_negative_float_env(name, default):
raw_value = os.getenv(name, str(default))
try:
parsed = float(raw_value)
except (TypeError, ValueError):
print(f"Invalid {name}={raw_value!r}; using {default}", flush=True)
return default
if parsed < 0:
print(f"Invalid {name}={raw_value!r}; using {default}", flush=True)
return default
return parsed


def _env_flag(name: str) -> bool:
return str(os.getenv(name) or "").strip().lower() in {"1", "true", "yes", "on"}

Expand All @@ -172,6 +200,9 @@ def _env_flag(name: str) -> bool:
IB_PORT = get_ib_port()
IB_CLIENT_ID = RUNTIME_SETTINGS.ib_client_id
IB_CONNECT_TIMEOUT_SECONDS = get_ib_connect_timeout_seconds()
IB_CONNECT_ATTEMPTS = get_positive_int_env("IBKR_CONNECT_ATTEMPTS", 3)
IB_CONNECT_RETRY_DELAY_SECONDS = get_non_negative_float_env("IBKR_CONNECT_RETRY_DELAY_SECONDS", 5.0)
IB_CLIENT_ID_RETRY_OFFSET = get_positive_int_env("IBKR_CLIENT_ID_RETRY_OFFSET", 100)
STRATEGY_PROFILE = RUNTIME_SETTINGS.strategy_profile
STRATEGY_DISPLAY_NAME = RUNTIME_SETTINGS.strategy_display_name
ACCOUNT_GROUP = RUNTIME_SETTINGS.account_group
Expand Down Expand Up @@ -255,6 +286,8 @@ def t(key, **kwargs):
"strategy_artifact_dir": RUNTIME_SETTINGS.strategy_artifact_dir,
"strategy_display_name": STRATEGY_DISPLAY_NAME,
"strategy_display_name_localized": strategy_display_name,
"ib_connect_attempts": IB_CONNECT_ATTEMPTS,
"ib_client_id_retry_offset": IB_CLIENT_ID_RETRY_OFFSET,
},
)
LAST_CYCLE_DETAILS: dict[str, object] = {}
Expand All @@ -271,20 +304,39 @@ def send_tg_message(message):

def connect_ib():
host = get_ib_host()
print(
"Connecting to IB gateway "
f"{host}:{IB_PORT} "
f"(mode={RUNTIME_SETTINGS.ib_gateway_mode}, "
f"client_id={IB_CLIENT_ID}, "
f"timeout={IB_CONNECT_TIMEOUT_SECONDS}s)",
flush=True,
)
return ibkr_connect_ib(
host,
IB_PORT,
IB_CLIENT_ID,
timeout=IB_CONNECT_TIMEOUT_SECONDS,
)
last_error = None
for attempt in range(1, IB_CONNECT_ATTEMPTS + 1):
client_id = IB_CLIENT_ID + ((attempt - 1) * IB_CLIENT_ID_RETRY_OFFSET)
print(
"Connecting to IB gateway "
f"{host}:{IB_PORT} "
f"(mode={RUNTIME_SETTINGS.ib_gateway_mode}, "
f"client_id={client_id}, "
f"attempt={attempt}/{IB_CONNECT_ATTEMPTS}, "
f"timeout={IB_CONNECT_TIMEOUT_SECONDS}s)",
flush=True,
)
try:
return ibkr_connect_ib(
host,
IB_PORT,
client_id,
timeout=IB_CONNECT_TIMEOUT_SECONDS,
)
except (ConnectionError, TimeoutError, OSError) as exc:
last_error = exc
print(
"IB gateway connection attempt failed "
f"(attempt={attempt}/{IB_CONNECT_ATTEMPTS}, "
f"client_id={client_id}, "
f"error_type={type(exc).__name__}, "
f"error={exc})",
flush=True,
)
if attempt < IB_CONNECT_ATTEMPTS and IB_CONNECT_RETRY_DELAY_SECONDS > 0:
time.sleep(IB_CONNECT_RETRY_DELAY_SECONDS)

raise last_error


def log_runtime_event(log_context, event, **fields):
Expand Down Expand Up @@ -568,13 +620,27 @@ def handle_request():
LAST_CYCLE_DETAILS = {}
log_context = build_request_log_context()
report = build_execution_report(log_context)
lock_acquired = STRATEGY_RUN_LOCK.acquire(blocking=False)
try:
log_runtime_event(
log_context,
"strategy_cycle_received",
message="Received strategy execution request",
http_method=request.method,
)
if not lock_acquired:
log_runtime_event(
log_context,
"strategy_cycle_already_running",
message="Another strategy execution is already running; skip overlapping request",
severity="WARNING",
)
finalize_runtime_report(
report,
status="skipped",
diagnostics={"skip_reason": "already_running"},
)
return "Already Running", 200
if not is_market_open_today():
log_runtime_event(
log_context,
Expand Down Expand Up @@ -654,6 +720,8 @@ def handle_request():
print(error_msg, flush=True)
return "Error", 500
finally:
if lock_acquired:
STRATEGY_RUN_LOCK.release()
try:
report_path = persist_execution_report(report)
print(f"execution_report {report_path}", flush=True)
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
flask
gunicorn
quant-platform-kit @ git+https://github.com/QuantStrategyLab/QuantPlatformKit.git@v0.7.18
quant-platform-kit @ git+https://github.com/QuantStrategyLab/QuantPlatformKit.git@v0.7.19
us-equity-strategies @ git+https://github.com/QuantStrategyLab/UsEquityStrategies.git@main
pandas
numpy
Expand Down
27 changes: 27 additions & 0 deletions tests/test_event_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,33 @@ def fake_ibkr_connect(host, port, client_id, **kwargs):
assert observed["args"] == ("127.0.0.1", 4001, 1, {"timeout": 60})


def test_connect_ib_retries_with_offset_client_ids(strategy_module_factory, monkeypatch):
module = strategy_module_factory(
IBKR_CONNECT_ATTEMPTS="3",
IBKR_CONNECT_RETRY_DELAY_SECONDS="0",
IBKR_CLIENT_ID_RETRY_OFFSET="100",
)
observed = {"client_ids": []}

def fake_ibkr_connect(host, port, client_id, **kwargs):
observed["client_ids"].append(client_id)
if len(observed["client_ids"]) < 3:
raise TimeoutError("handshake timeout")
return object()

monkeypatch.setattr(module, "ibkr_connect_ib", fake_ibkr_connect)

module.connect_ib()

assert observed["client_ids"] == [1, 101, 201]


def test_ib_connect_attempts_falls_back_when_invalid(strategy_module_factory):
module = strategy_module_factory(IBKR_CONNECT_ATTEMPTS="0")

assert module.IB_CONNECT_ATTEMPTS == 3


def test_ib_connect_timeout_can_be_overridden(strategy_module_factory):
module = strategy_module_factory(IBKR_CONNECT_TIMEOUT_SECONDS="75")

Expand Down
26 changes: 26 additions & 0 deletions tests/test_request_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,32 @@ def fake_run_strategy_core():
assert observed["called"] is True


def test_handle_request_skips_overlapping_post(strategy_module, monkeypatch):
observed = {}

def fail_if_called():
raise AssertionError("overlapping request should not execute strategy")

monkeypatch.setattr(strategy_module, "run_strategy_core", fail_if_called)
monkeypatch.setattr(
strategy_module,
"persist_execution_report",
lambda report: observed.setdefault("report", dict(report)) or "/tmp/runtime-report.json",
)

strategy_module.STRATEGY_RUN_LOCK.acquire()
try:
with strategy_module.app.test_request_context("/", method="POST"):
body, status = strategy_module.handle_request()
finally:
strategy_module.STRATEGY_RUN_LOCK.release()

assert status == 200
assert body == "Already Running"
assert observed["report"]["status"] == "skipped"
assert observed["report"]["diagnostics"]["skip_reason"] == "already_running"


def test_handle_request_emits_structured_runtime_events(strategy_module, monkeypatch):
observed = []

Expand Down
2 changes: 2 additions & 0 deletions tests/test_sync_cloud_run_env_workflow.sh
Original file line number Diff line number Diff line change
Expand Up @@ -86,5 +86,7 @@ grep -Fq '"IB_GATEWAY_INSTANCE_NAME"' "$workflow_file"
grep -Fq '"IB_GATEWAY_MODE"' "$workflow_file"
grep -Fq -- '--remove-secrets "$(IFS=,; echo "${remove_secret_vars[*]}")"' "$workflow_file"
grep -Fq -- '--update-secrets "$(IFS=,; echo "${secret_pairs[*]}")"' "$workflow_file"
grep -Fq -- '--concurrency 1' "$workflow_file"
grep -Fq -- '--max-instances 1' "$workflow_file"
grep -Fq -- '--remove-env-vars "$(IFS=,; echo "${remove_env_vars[*]}")"' "$workflow_file"
grep -Fq -- '--update-env-vars "$(IFS=,; echo "${env_pairs[*]}")"' "$workflow_file"