def _read_numeric_env(name, default, convert, is_acceptable):
    """Parse environment variable ``name`` and fall back to ``default``.

    Args:
        name: Environment variable to read.
        default: Value returned when the variable is unparsable or rejected.
            When the variable is unset, ``str(default)`` is what gets parsed.
        convert: Callable turning the raw string into a number (``int``/``float``).
        is_acceptable: Predicate deciding whether the parsed number is usable.

    Returns:
        The parsed number, or ``default`` (after logging) when parsing fails
        or the predicate rejects the value.
    """
    raw_value = os.getenv(name, str(default))
    try:
        parsed = convert(raw_value)
    except (TypeError, ValueError):
        parsed = None
    if parsed is None or not is_acceptable(parsed):
        print(f"Invalid {name}={raw_value!r}; using {default}", flush=True)
        return default
    return parsed


def get_positive_int_env(name, default):
    """Return env var ``name`` as an int > 0, or ``default`` if invalid."""
    return _read_numeric_env(name, default, int, lambda value: value > 0)


def get_non_negative_float_env(name, default):
    """Return env var ``name`` as a finite float >= 0, or ``default`` if invalid.

    ``float()`` happily parses "nan" and "inf"; neither compares as negative,
    so they must be rejected explicitly. An infinite retry delay would make
    ``time.sleep`` raise ``OverflowError`` in the middle of the retry loop.
    """
    import math  # local import: only this helper needs it

    return _read_numeric_env(
        name,
        default,
        float,
        lambda value: math.isfinite(value) and value >= 0,
    )


# Retry tuning for the IB gateway connection, overridable via environment.
IB_CONNECT_ATTEMPTS = get_positive_int_env("IBKR_CONNECT_ATTEMPTS", 3)
IB_CONNECT_RETRY_DELAY_SECONDS = get_non_negative_float_env("IBKR_CONNECT_RETRY_DELAY_SECONDS", 5.0)
IB_CLIENT_ID_RETRY_OFFSET = get_positive_int_env("IBKR_CLIENT_ID_RETRY_OFFSET", 100)


def connect_ib():
    """Connect to the IB gateway, retrying with a shifted client id.

    Each attempt uses ``IB_CLIENT_ID + (attempt - 1) * IB_CLIENT_ID_RETRY_OFFSET``
    as the client id. Between failed attempts the function sleeps for
    ``IB_CONNECT_RETRY_DELAY_SECONDS`` when that delay is positive.

    Returns:
        Whatever ``ibkr_connect_ib`` returns for the first successful attempt.

    Raises:
        The error raised by the final failed attempt, or ``ConnectionError``
        if no attempt was made at all.
    """
    import asyncio  # local import: only needed for the retryable-error tuple

    # ConnectionError and the builtin TimeoutError are OSError subclasses.
    # asyncio.TimeoutError only became an alias of TimeoutError in Python 3.11,
    # so it is listed explicitly to keep retries working on older runtimes.
    # NOTE(review): assumes ibkr_connect_ib can surface asyncio.TimeoutError
    # from the underlying handshake — confirm against QuantPlatformKit.
    retryable_errors = (OSError, asyncio.TimeoutError)

    host = get_ib_host()
    last_error = None
    for attempt in range(1, IB_CONNECT_ATTEMPTS + 1):
        client_id = IB_CLIENT_ID + ((attempt - 1) * IB_CLIENT_ID_RETRY_OFFSET)
        print(
            "Connecting to IB gateway "
            f"{host}:{IB_PORT} "
            f"(mode={RUNTIME_SETTINGS.ib_gateway_mode}, "
            f"client_id={client_id}, "
            f"attempt={attempt}/{IB_CONNECT_ATTEMPTS}, "
            f"timeout={IB_CONNECT_TIMEOUT_SECONDS}s)",
            flush=True,
        )
        try:
            return ibkr_connect_ib(
                host,
                IB_PORT,
                client_id,
                timeout=IB_CONNECT_TIMEOUT_SECONDS,
            )
        except retryable_errors as exc:
            last_error = exc
            print(
                "IB gateway connection attempt failed "
                f"(attempt={attempt}/{IB_CONNECT_ATTEMPTS}, "
                f"client_id={client_id}, "
                f"error_type={type(exc).__name__}, "
                f"error={exc})",
                flush=True,
            )
        # Only pause when another attempt is actually going to follow.
        if attempt < IB_CONNECT_ATTEMPTS and IB_CONNECT_RETRY_DELAY_SECONDS > 0:
            time.sleep(IB_CONNECT_RETRY_DELAY_SECONDS)

    if last_error is None:
        # Reached only if the attempt count was non-positive; avoid `raise None`.
        raise ConnectionError("IB gateway connection was not attempted")
    raise last_error
def test_connect_ib_retries_with_offset_client_ids(strategy_module_factory, monkeypatch):
    """connect_ib retries after timeouts, shifting the client id each attempt."""
    module = strategy_module_factory(
        IBKR_CONNECT_ATTEMPTS="3",
        IBKR_CONNECT_RETRY_DELAY_SECONDS="0",
        IBKR_CLIENT_ID_RETRY_OFFSET="100",
    )
    observed = {"client_ids": []}
    connection = object()

    def fake_ibkr_connect(host, port, client_id, **kwargs):
        observed["client_ids"].append(client_id)
        # Fail the first two attempts so the third one has to succeed.
        if len(observed["client_ids"]) < 3:
            raise TimeoutError("handshake timeout")
        return connection

    monkeypatch.setattr(module, "ibkr_connect_ib", fake_ibkr_connect)

    result = module.connect_ib()

    # The connection from the successful attempt must be handed back to the caller.
    assert result is connection
    assert observed["client_ids"] == [1, 101, 201]


def test_ib_connect_attempts_falls_back_when_invalid(strategy_module_factory):
    """A non-positive IBKR_CONNECT_ATTEMPTS falls back to the default of 3."""
    module = strategy_module_factory(IBKR_CONNECT_ATTEMPTS="0")

    assert module.IB_CONNECT_ATTEMPTS == 3


def test_handle_request_skips_overlapping_post(strategy_module, monkeypatch):
    """A request arriving while the run lock is held is skipped, not executed."""
    observed = {}

    def fail_if_called():
        raise AssertionError("overlapping request should not execute strategy")

    def fake_persist_execution_report(report):
        # Snapshot the report and always return a path, like the real function's
        # result is used in handle_request (it is printed as the report path).
        observed.setdefault("report", dict(report))
        return "/tmp/runtime-report.json"

    monkeypatch.setattr(strategy_module, "run_strategy_core", fail_if_called)
    monkeypatch.setattr(
        strategy_module,
        "persist_execution_report",
        fake_persist_execution_report,
    )

    # Hold the lock to simulate a strategy cycle that is still in progress.
    strategy_module.STRATEGY_RUN_LOCK.acquire()
    try:
        with strategy_module.app.test_request_context("/", method="POST"):
            body, status = strategy_module.handle_request()
    finally:
        strategy_module.STRATEGY_RUN_LOCK.release()

    assert status == 200
    assert body == "Already Running"
    assert observed["report"]["status"] == "skipped"
    assert observed["report"]["diagnostics"]["skip_reason"] == "already_running"