From 107ca2e323b6c11cd1c59c4b6ab3994f0afa9dda Mon Sep 17 00:00:00 2001 From: Pigbibi <20649888+Pigbibi@users.noreply.github.com> Date: Fri, 19 Jun 2026 18:41:12 +0800 Subject: [PATCH] Add shared monitor dispatcher scheduler --- .github/workflows/sync-cloud-run-env.yml | 229 ++++++++++++++++-- application/monitor_dispatcher.py | 271 ++++++++++++++++++++++ main.py | 22 ++ tests/test_monitor_dispatcher.py | 57 +++++ tests/test_request_handling.py | 36 +++ tests/test_sync_cloud_run_env_workflow.sh | 13 +- 6 files changed, 602 insertions(+), 26 deletions(-) create mode 100644 application/monitor_dispatcher.py create mode 100644 tests/test_monitor_dispatcher.py diff --git a/.github/workflows/sync-cloud-run-env.yml b/.github/workflows/sync-cloud-run-env.yml index 7658278..87e64b3 100644 --- a/.github/workflows/sync-cloud-run-env.yml +++ b/.github/workflows/sync-cloud-run-env.yml @@ -53,6 +53,7 @@ env: GCP_WORKLOAD_IDENTITY_PROVIDER: projects/252919773759/locations/global/workloadIdentityPools/github-actions/providers/github-main GCP_WORKLOAD_IDENTITY_SERVICE_ACCOUNT: longbridge-platform-deploy@longbridgequant.iam.gserviceaccount.com GCP_RUNTIME_SERVICE_ACCOUNT: longbridge-platform-runtime@longbridgequant.iam.gserviceaccount.com + GCP_SCHEDULER_SERVICE_ACCOUNT: longbridge-platform-scheduler@longbridgequant.iam.gserviceaccount.com GCP_ARTIFACT_REGISTRY_REPOSITORY: cloud-run-source-deploy concurrency: @@ -99,11 +100,13 @@ jobs: # Set CLOUD_RUN_REGION per Environment so paper/HK/SG can target different regions. 
CLOUD_RUN_REGION: ${{ vars.CLOUD_RUN_REGION }} CLOUD_RUN_SERVICE: ${{ vars.CLOUD_RUN_SERVICE }} + CLOUD_RUN_SERVICE_TARGETS_JSON: ${{ vars.CLOUD_RUN_SERVICE_TARGETS_JSON }} CLOUD_RUN_ENV_SYNC_WAIT_FOR_COMMIT: ${{ vars.CLOUD_RUN_ENV_SYNC_WAIT_FOR_COMMIT }} CLOUD_SCHEDULER_LOCATION: ${{ vars.CLOUD_SCHEDULER_LOCATION }} CLOUD_SCHEDULER_MAIN_TIME: ${{ vars.CLOUD_SCHEDULER_MAIN_TIME }} CLOUD_SCHEDULER_PROBE_TIME: ${{ vars.CLOUD_SCHEDULER_PROBE_TIME }} CLOUD_SCHEDULER_PRECHECK_TIME: ${{ vars.CLOUD_SCHEDULER_PRECHECK_TIME }} + MONITOR_DISPATCHER_OWNER_LABEL: ${{ vars.MONITOR_DISPATCHER_OWNER_LABEL || 'SG' }} ACCOUNT_PREFIX: ${{ vars.ACCOUNT_PREFIX }} TELEGRAM_TOKEN_SECRET_NAME: ${{ vars.TELEGRAM_TOKEN_SECRET_NAME }} LONGPORT_APP_KEY_SECRET_NAME: ${{ vars.LONGPORT_APP_KEY_SECRET_NAME }} @@ -961,6 +964,89 @@ jobs: remove_env_vars+=("RUNTIME_TARGET_ENABLED") fi + monitor_targets_json="$( + python - <<'PY' + import json + import os + import subprocess + + def decode_json(raw, fallback): + raw = str(raw or "").strip() + if not raw: + return fallback + try: + return json.loads(raw) + except json.JSONDecodeError: + return fallback + + def runtime_target_from(source): + runtime_target = source.get("runtime_target") or source.get("runtime_target_json") + if isinstance(runtime_target, str): + runtime_target = decode_json(runtime_target, {}) + return runtime_target if isinstance(runtime_target, dict) else {} + + raw_targets = decode_json(os.environ.get("CLOUD_RUN_SERVICE_TARGETS_JSON"), None) + if isinstance(raw_targets, dict): + source_targets = raw_targets.get("targets") + else: + source_targets = raw_targets + if not isinstance(source_targets, list) or not source_targets: + source_targets = [ + { + "service": os.environ.get("CLOUD_RUN_SERVICE"), + "region": os.environ.get("CLOUD_RUN_REGION"), + "runtime_target": decode_json(os.environ.get("RUNTIME_TARGET_JSON"), {}), + "runtime_target_enabled": os.environ.get("RUNTIME_TARGET_ENABLED", "true"), + } + ] + + project = 
os.environ.get("GCP_PROJECT_ID", "").strip() + output = [] + for source in source_targets: + if not isinstance(source, dict): + continue + runtime_target = runtime_target_from(source) + service_name = ( + source.get("service") + or source.get("service_name") + or source.get("cloud_run_service") + or runtime_target.get("service_name") + ) + region = source.get("region") or source.get("cloud_run_region") or os.environ.get("CLOUD_RUN_REGION") + service_name = str(service_name or "").strip() + region = str(region or "").strip() + if not service_name or not region: + continue + service_url = subprocess.check_output( + [ + "gcloud", + "run", + "services", + "describe", + service_name, + f"--project={project}", + f"--region={region}", + "--format=value(status.url)", + ], + text=True, + ).strip() + if not service_url: + continue + output.append( + { + "service_name": service_name, + "service_url": service_url, + "strategy_profile": runtime_target.get("strategy_profile") or source.get("STRATEGY_PROFILE"), + "account_scope": runtime_target.get("account_scope") or source.get("ACCOUNT_REGION") or source.get("account_scope"), + "runtime_target_enabled": source.get("runtime_target_enabled", source.get("RUNTIME_TARGET_ENABLED", True)), + "scheduler": runtime_target.get("scheduler") if isinstance(runtime_target.get("scheduler"), dict) else {}, + } + ) + print(json.dumps({"targets": output}, separators=(",", ":"))) + PY + )" + env_pairs+=("MONITOR_DISPATCH_TARGETS_JSON=${monitor_targets_json}") + gcloud_args=( run services update "${CLOUD_RUN_SERVICE}" --region "${CLOUD_RUN_REGION}" @@ -1042,33 +1128,42 @@ jobs: exit 1 fi - for suffix in scheduler probe-scheduler precheck-scheduler; do - job_name="${CLOUD_RUN_SERVICE}-${suffix}" - case "${suffix}" in - scheduler) - schedule_time="${main_time}" - scheduler_path="/run" - ;; - probe-scheduler) - schedule_time="${probe_time}" - scheduler_path="/probe" - ;; - precheck-scheduler) - schedule_time="${precheck_time}" - 
scheduler_path="/dry-run" - ;; - esac - - current_schedule="$(gcloud scheduler jobs describe "${job_name}" \ + gcloud run services add-iam-policy-binding "${CLOUD_RUN_SERVICE}" \ + --project="${GCP_PROJECT_ID}" \ + --region="${CLOUD_RUN_REGION}" \ + --member="serviceAccount:${GCP_SCHEDULER_SERVICE_ACCOUNT}" \ + --role="roles/run.invoker" \ + --quiet + gcloud run services add-iam-policy-binding "${CLOUD_RUN_SERVICE}" \ + --project="${GCP_PROJECT_ID}" \ + --region="${CLOUD_RUN_REGION}" \ + --member="serviceAccount:${GCP_RUNTIME_SERVICE_ACCOUNT}" \ + --role="roles/run.invoker" \ + --quiet + + scheduler_job_candidates=("${CLOUD_RUN_SERVICE}-scheduler") + if [[ "${CLOUD_RUN_SERVICE}" == *-service ]]; then + scheduler_job_candidates+=("${CLOUD_RUN_SERVICE%-service}-scheduler") + fi + + job_name="" + current_schedule="" + for candidate_job in "${scheduler_job_candidates[@]}"; do + current_schedule="$(gcloud scheduler jobs describe "${candidate_job}" \ --project="${GCP_PROJECT_ID}" \ --location="${scheduler_location}" \ --format='value(schedule)' 2>/dev/null || true)" - if [ -z "${current_schedule}" ]; then - echo "Cloud Scheduler job ${job_name} was not found in ${scheduler_location}; skipping schedule sync." 
- continue + if [ -n "${current_schedule}" ]; then + job_name="${candidate_job}" + break fi + done + if [ -z "${job_name}" ]; then + job_name="${scheduler_job_candidates[0]}" + fi - desired_schedule="$(CURRENT_SCHEDULE="${current_schedule}" SCHEDULE_TIME="${schedule_time}" python - <<'PY' + if [ -n "${current_schedule}" ]; then + desired_schedule="$(CURRENT_SCHEDULE="${current_schedule}" SCHEDULE_TIME="${main_time}" python - <<'PY' import os current_fields = os.environ["CURRENT_SCHEDULE"].split() @@ -1085,8 +1180,25 @@ jobs: ) PY )" + else + desired_schedule="$(SCHEDULE_TIME="${main_time}" python - <<'PY' + import os + + fields = os.environ["SCHEDULE_TIME"].split() + if len(fields) == 5: + print(" ".join(fields)) + elif len(fields) == 2: + print(" ".join([*fields, "*", "*", "*"])) + else: + raise SystemExit( + f"Cloud Scheduler override must have 2 time fields or 5 cron fields: {os.environ['SCHEDULE_TIME']!r}" + ) + PY + )" + fi - scheduler_uri="${service_url}${scheduler_path}" + scheduler_uri="${service_url}/run" + if [ -n "${current_schedule}" ]; then echo "Updating Cloud Scheduler job ${job_name} schedule to ${desired_schedule}, timezone to ${market_timezone}, and URI to ${scheduler_uri}." gcloud scheduler jobs update http "${job_name}" \ --project="${GCP_PROJECT_ID}" \ @@ -1095,6 +1207,77 @@ jobs: --schedule="${desired_schedule}" \ --time-zone="${market_timezone}" \ --quiet + else + echo "Creating Cloud Scheduler job ${job_name} schedule ${desired_schedule}, timezone ${market_timezone}, and URI ${scheduler_uri}." 
+ gcloud scheduler jobs create http "${job_name}" \ + --project="${GCP_PROJECT_ID}" \ + --location="${scheduler_location}" \ + --uri="${scheduler_uri}" \ + --http-method=POST \ + --oidc-service-account-email="${GCP_SCHEDULER_SERVICE_ACCOUNT}" \ + --oidc-token-audience="${service_url}" \ + --schedule="${desired_schedule}" \ + --time-zone="${market_timezone}" \ + --attempt-deadline=600s \ + --quiet + fi + + if [ "${DEPLOYMENT_LABEL:-}" = "${MONITOR_DISPATCHER_OWNER_LABEL:-SG}" ]; then + monitor_job_name="longbridge-monitor-dispatcher-scheduler" + monitor_uri="${service_url}/monitor-dispatch" + if gcloud scheduler jobs describe "${monitor_job_name}" \ + --project="${GCP_PROJECT_ID}" \ + --location="${scheduler_location}" >/dev/null 2>&1; then + echo "Updating Cloud Scheduler job ${monitor_job_name} to ${monitor_uri}." + gcloud scheduler jobs update http "${monitor_job_name}" \ + --project="${GCP_PROJECT_ID}" \ + --location="${scheduler_location}" \ + --uri="${monitor_uri}" \ + --http-method=POST \ + --oidc-service-account-email="${GCP_SCHEDULER_SERVICE_ACCOUNT}" \ + --oidc-token-audience="${service_url}" \ + --schedule="*/5 * * * *" \ + --time-zone="UTC" \ + --attempt-deadline=180s \ + --quiet + else + echo "Creating Cloud Scheduler job ${monitor_job_name} at ${monitor_uri}." + gcloud scheduler jobs create http "${monitor_job_name}" \ + --project="${GCP_PROJECT_ID}" \ + --location="${scheduler_location}" \ + --uri="${monitor_uri}" \ + --http-method=POST \ + --oidc-service-account-email="${GCP_SCHEDULER_SERVICE_ACCOUNT}" \ + --oidc-token-audience="${service_url}" \ + --schedule="*/5 * * * *" \ + --time-zone="UTC" \ + --attempt-deadline=180s \ + --quiet + fi + else + echo "Skipping shared LongBridge monitor dispatcher scheduler from ${DEPLOYMENT_LABEL}; owner is ${MONITOR_DISPATCHER_OWNER_LABEL:-SG}." 
# Environment variables searched, in priority order, for the dispatch target
# JSON. The first one holding a non-blank value wins.
MONITOR_TARGET_ENV_NAMES = (
    "MONITOR_DISPATCH_TARGETS_JSON",
    "LONGBRIDGE_MONITOR_DISPATCH_TARGETS_JSON",
    "SCHWAB_MONITOR_DISPATCH_TARGETS_JSON",
    "FIRSTRADE_MONITOR_DISPATCH_TARGETS_JSON",
)
# Minutes of history re-examined on every dispatcher tick. The workflow in
# this patch schedules /monitor-dispatch every 5 minutes, so the current
# minute plus 4 lookback minutes covers exactly one interval.
DEFAULT_LOOKBACK_MINUTES = 4
# Per-request timeout, in seconds, handed to the HTTP client.
DEFAULT_TIMEOUT_SECONDS = 120
# Upper bound on concurrent dispatch requests.
DEFAULT_MAX_WORKERS = 4


@dataclass(frozen=True)
class MonitorWindow:
    """A monitor check that can be dispatched to a target service."""

    # Short label reported in dispatch results (e.g. "probe").
    name: str
    # URL path appended to the target's service URL.
    path: str
    # Key inside the target's "scheduler" mapping holding the cron schedule.
    scheduler_key: str


MONITOR_WINDOWS = (
    MonitorWindow("probe", "/probe", "probe_time"),
    MonitorWindow("precheck", "/dry-run", "precheck_time"),
)


def load_monitor_targets(env: Mapping[str, str] | None = None) -> list[dict[str, Any]]:
    """Load monitor dispatch targets from the first populated env variable.

    Args:
        env: Mapping to read from. ``None`` (the default) means the process
            environment. Any mapping that is passed, including an empty one,
            is used as-is.

    Returns:
        A list of target dicts. Entries of the JSON array that are not
        objects are dropped. An empty list is returned when none of
        ``MONITOR_TARGET_ENV_NAMES`` holds a non-blank value.

    Raises:
        ValueError: The payload is neither a JSON array nor an object whose
            ``targets`` member is an array. Malformed JSON raises
            ``json.JSONDecodeError``, which is a ``ValueError`` subclass.
    """
    # Test ``is None`` rather than truthiness: an explicitly supplied empty
    # mapping must not fall through to the real process environment.
    source = os.environ if env is None else env
    raw = ""
    for name in MONITOR_TARGET_ENV_NAMES:
        raw = str(source.get(name) or "").strip()
        if raw:
            break
    if not raw:
        return []
    payload = json.loads(raw)
    targets = payload.get("targets") if isinstance(payload, dict) else payload
    if not isinstance(targets, list):
        raise ValueError("monitor dispatch targets must be a JSON array or an object with targets")
    return [dict(target) for target in targets if isinstance(target, Mapping)]


def dispatch_due_monitors(
    targets: Sequence[Mapping[str, Any]],
    *,
    now: dt.datetime | None = None,
    lookback_minutes: int = DEFAULT_LOOKBACK_MINUTES,
    timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
    max_workers: int = DEFAULT_MAX_WORKERS,
    token_fetcher: Callable[[str], str] | None = None,
    post_fn: Callable[..., Any] | None = None,
) -> dict[str, Any]:
    """POST to every target monitor window whose schedule is currently due.

    Args:
        targets: Target descriptors, as returned by ``load_monitor_targets``.
        now: Reference instant; defaults to the current UTC time. A naive
            value is interpreted as UTC.
        lookback_minutes: How many minutes before ``now`` still count as due.
        timeout_seconds: Timeout passed to ``post_fn`` for each request.
        max_workers: Maximum number of concurrent requests.
        token_fetcher: Callable mapping an audience URL to a bearer token;
            defaults to ``_fetch_id_token``.
        post_fn: Callable with the ``requests.post`` calling convention;
            defaults to ``requests.post``.

    Returns:
        A summary dict with ``ok`` (True when every dispatch got a 2xx
        response, or nothing was due), ``total_targets``, ``dispatches_due``
        and ``results`` (one entry per dispatch, sorted by service name and
        then window name).
    """
    now_utc = _as_utc(now or dt.datetime.now(dt.timezone.utc))
    due_dispatches = list(_iter_due_dispatches(targets, now_utc=now_utc, lookback_minutes=lookback_minutes))
    token_fetcher = token_fetcher or _fetch_id_token
    post_fn = post_fn or requests.post
    if not due_dispatches:
        return {
            "ok": True,
            "total_targets": len(targets),
            "dispatches_due": 0,
            "results": [],
        }

    # Never start more threads than there are requests, and always at least one.
    workers = max(1, min(int(max_workers or 1), len(due_dispatches)))
    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = [
            executor.submit(
                _send_dispatch,
                dispatch,
                timeout_seconds=timeout_seconds,
                token_fetcher=token_fetcher,
                post_fn=post_fn,
            )
            for dispatch in due_dispatches
        ]
        results: list[dict[str, Any]] = [future.result() for future in as_completed(futures)]
    # Completion order is nondeterministic; sort so callers see stable output.
    results.sort(key=lambda item: (str(item.get("service_name") or ""), str(item.get("window") or "")))
    return {
        "ok": all(bool(result.get("ok")) for result in results),
        "total_targets": len(targets),
        "dispatches_due": len(due_dispatches),
        "results": results,
    }
def _iter_due_dispatches(
    targets: Sequence[Mapping[str, Any]],
    *,
    now_utc: dt.datetime,
    lookback_minutes: int,
) -> Iterable[dict[str, Any]]:
    """Yield one dispatch descriptor per enabled target window that is due.

    Targets that are disabled or lack a ``service_url`` are skipped, as are
    windows without a configured schedule. Each schedule is evaluated in the
    target's own scheduler timezone.
    """
    for target in targets:
        if not _target_enabled(target):
            continue
        service_url = str(target.get("service_url") or "").rstrip("/")
        if not service_url:
            continue
        scheduler = target.get("scheduler")
        if not isinstance(scheduler, Mapping):
            scheduler = {}
        local_now = now_utc.astimezone(_target_timezone(scheduler))
        for window in MONITOR_WINDOWS:
            schedule = str(scheduler.get(window.scheduler_key) or "").strip()
            if not schedule:
                continue
            if not _cron_due_within_window(schedule, local_now=local_now, lookback_minutes=lookback_minutes):
                continue
            yield {
                "service_name": str(target.get("service_name") or ""),
                "strategy_profile": str(target.get("strategy_profile") or ""),
                "account_scope": str(target.get("account_scope") or target.get("account_group") or ""),
                "window": window.name,
                "url": f"{service_url}{window.path}",
                # The bare service URL is used as the ID-token audience.
                "audience": service_url,
            }


def _send_dispatch(
    dispatch: Mapping[str, Any],
    *,
    timeout_seconds: int,
    token_fetcher: Callable[[str], str],
    post_fn: Callable[..., Any],
) -> dict[str, Any]:
    """POST one dispatch and describe the outcome; never raises.

    Returns:
        The dispatch's identifying fields plus ``status_code`` and ``ok``
        (True for a 2xx response). On any exception ``status_code`` is 0,
        ``ok`` is False and ``error_type`` / ``error`` describe the failure.
    """
    url = str(dispatch.get("url") or "")
    audience = str(dispatch.get("audience") or "")
    base_result = {
        "service_name": dispatch.get("service_name"),
        "strategy_profile": dispatch.get("strategy_profile"),
        "account_scope": dispatch.get("account_scope"),
        "window": dispatch.get("window"),
        "url": url,
    }
    try:
        token = token_fetcher(audience)
        response = post_fn(
            url,
            headers={
                "Authorization": f"Bearer {token}",
                "User-Agent": "platform-monitor-dispatcher",
            },
            timeout=timeout_seconds,
        )
        status_code = int(getattr(response, "status_code", 0) or 0)
    except Exception as exc:
        # Deliberately broad: one failing target must be reported in the
        # summary rather than abort the remaining dispatches.
        return {
            **base_result,
            "status_code": 0,
            "ok": False,
            "error_type": type(exc).__name__,
            "error": str(exc),
        }
    return {
        **base_result,
        "status_code": status_code,
        "ok": 200 <= status_code < 300,
    }


def _target_enabled(target: Mapping[str, Any]) -> bool:
    """Return False only when the target is explicitly switched off.

    ``runtime_target_enabled`` takes precedence over ``enabled``. A missing
    flag means enabled. Non-bool values are compared case-insensitively
    against the usual "off" spellings.
    """
    value = target.get("runtime_target_enabled")
    if value is None:
        value = target.get("enabled")
    if value is None:
        return True
    if isinstance(value, bool):
        return value
    return str(value).strip().lower() not in {"0", "false", "no", "off", "disabled"}


def _target_timezone(scheduler: Mapping[str, Any]) -> dt.tzinfo:
    """Resolve the scheduler's ``timezone`` key, defaulting to UTC."""
    try:
        return ZoneInfo(str(scheduler.get("timezone") or "UTC"))
    except Exception:
        # NOTE(review): an unknown or unloadable zone silently becomes UTC,
        # which shifts the target's schedule instead of failing loudly.
        return dt.timezone.utc


def _cron_due_within_window(schedule: str, *, local_now: dt.datetime, lookback_minutes: int) -> bool:
    """Return True if ``schedule`` fires at ``local_now`` or within the lookback.

    Args:
        schedule: A 5-field cron expression, or a 2-field ``"minute hour"``
            shorthand that is expanded with ``* * *``. The workflow's
            scheduler overrides accept the same two forms.
        local_now: Current time in the schedule's timezone.
        lookback_minutes: Number of earlier minutes that still count as due.
            Negative or empty values are treated as 0.

    Returns:
        False for any other field count, or when no minute in the window
        matches.
    """
    fields = schedule.split()
    if len(fields) == 2:
        fields = [*fields, "*", "*", "*"]
    if len(fields) != 5:
        return False
    lookback = max(0, int(lookback_minutes or 0))
    floor_now = local_now.replace(second=0, microsecond=0)
    zone = floor_now.tzinfo
    # Subtracting a timedelta from an aware datetime moves the wall clock
    # without re-evaluating the UTC offset, so step back in UTC and convert
    # each candidate. This keeps the window correct across DST transitions.
    floor_utc = floor_now.astimezone(dt.timezone.utc) if zone is not None else None
    for minute_offset in range(lookback + 1):
        delta = dt.timedelta(minutes=minute_offset)
        if floor_utc is None:
            candidate = floor_now - delta
        else:
            candidate = (floor_utc - delta).astimezone(zone)
        if _cron_matches(fields, candidate):
            return True
    return False


def _cron_field_is_restricted(field: str) -> bool:
    """Return True when a cron day field does not start with ``*``."""
    return not field.strip().startswith("*")


def _cron_matches(fields: Sequence[str], value: dt.datetime) -> bool:
    """Return True if the five cron ``fields`` match the datetime ``value``.

    Follows crontab semantics for the two day fields: when both day-of-month
    and day-of-week are restricted, a match on either one is sufficient.
    Otherwise both must match.
    """
    minute, hour, day_of_month, month, day_of_week = fields
    # Python: Monday=0..Sunday=6. Cron: Sunday=0..Saturday=6.
    cron_weekday = (value.weekday() + 1) % 7
    if not (
        _field_matches(minute, value.minute, 0, 59)
        and _field_matches(hour, value.hour, 0, 23)
        and _field_matches(month, value.month, 1, 12)
    ):
        return False
    dom_ok = _field_matches(day_of_month, value.day, 1, 31)
    dow_ok = _field_matches(day_of_week, cron_weekday, 0, 7, sunday_alias=True)
    if _cron_field_is_restricted(day_of_month) and _cron_field_is_restricted(day_of_week):
        return dom_ok or dow_ok
    return dom_ok and dow_ok


def _field_matches(field: str, value: int, min_value: int, max_value: int, *, sunday_alias: bool = False) -> bool:
    """Return True if any comma-separated part of ``field`` matches ``value``."""
    return any(
        _part_matches(part, value, min_value, max_value, sunday_alias=sunday_alias)
        for part in (piece.strip() for piece in field.split(","))
        if part
    )


def _part_matches(part: str, value: int, min_value: int, max_value: int, *, sunday_alias: bool = False) -> bool:
    """Match ``value`` against one cron part: ``*``, ``N``, ``A-B``, each with optional ``/step``.

    Args:
        part: A single cron field element, without commas.
        value: The datetime component under test.
        min_value: Lower bound substituted for ``*``.
        max_value: Upper bound substituted for ``*``.
        sunday_alias: When True, Sunday (``value == 0``) is also tested as
            ``7`` so that ``7``, ``5-7`` and ``1-7`` all include Sunday.

    Returns:
        False for unparsable numbers or a non-positive step.
    """
    if "/" in part:
        base, step_text = part.split("/", 1)
        try:
            step = int(step_text)
        except ValueError:
            return False
        if step <= 0:
            return False
    else:
        base = part
        step = 1
    if base == "*":
        start, end = min_value, max_value
    elif "-" in base:
        start_text, end_text = base.split("-", 1)
        try:
            start, end = int(start_text), int(end_text)
        except ValueError:
            return False
    else:
        try:
            start = end = int(base)
        except ValueError:
            return False
    candidates = (value, 7) if sunday_alias and value == 0 else (value,)
    return any(
        start <= candidate <= end and (candidate - start) % step == 0
        for candidate in candidates
    )


def _as_utc(value: dt.datetime) -> dt.datetime:
    """Return ``value`` as an aware UTC datetime; naive input is taken as UTC."""
    if value.tzinfo is None:
        return value.replace(tzinfo=dt.timezone.utc)
    return value.astimezone(dt.timezone.utc)


def _fetch_id_token(audience: str) -> str:
    """Fetch an ID token for ``audience`` via ``google.oauth2.id_token``."""
    return id_token.fetch_id_token(Request(), audience)
@app.route("/monitor-dispatch", methods=["POST", "GET"])
def handle_monitor_dispatch():
    """Serve the shared monitor dispatcher endpoint.

    GET answers with a plain-text hint. Any other method loads the
    configured targets, dispatches the monitor checks that are due and
    returns the summary as a JSON body with status 200. Exceptions raised
    while loading or dispatching are handed to
    ``_handle_route_runtime_error``.
    """
    if request_method() != "GET":
        try:
            summary = dispatch_due_monitors(load_monitor_targets())
        except Exception as exc:
            return _handle_route_runtime_error(exc, route_label="POST /monitor-dispatch")
        response_headers = {"Content-Type": "application/json"}
        return json.dumps(summary, ensure_ascii=False), 200, response_headers
    return "Monitor Dispatch OK - use POST to dispatch due monitor checks", 200


def request_method() -> str:
    """Return the current Flask request's HTTP method, or "GET" on any failure.

    Flask is imported lazily inside the ``try``; an import error or a
    missing request context both yield ``"GET"``.
    """
    try:
        from flask import request

        method = request.method
    except Exception:
        method = "GET"
    return method
def test_dispatch_due_monitors_skips_disabled_target():
    """A target whose runtime_target_enabled flag is "false" produces no dispatch."""
    disabled_target = {
        "service_name": "longbridge-quant-hk-service",
        "service_url": "https://svc-hk.example.run.app",
        "runtime_target_enabled": "false",
        "scheduler": {
            "timezone": "America/New_York",
            "probe_time": "35 9 * * *",
        },
    }
    # 13:35 UTC is 09:35 in New York, so the probe would be due if enabled.
    moment = dt.datetime(2026, 6, 18, 13, 35, tzinfo=dt.timezone.utc)

    summary = dispatch_due_monitors(
        [disabled_target],
        now=moment,
        token_fetcher=lambda _audience: "token",
        post_fn=lambda *_args, **_kwargs: None,
    )

    assert summary["dispatches_due"] == 0
    assert summary["results"] == []
def test_handle_monitor_dispatch_post_dispatches_due_targets(self):
    """A POST forwards the loaded targets to the dispatcher and returns its JSON summary."""
    module = load_module()
    captured = []
    stub_targets = [{"service_name": "longbridge-quant-sg-service"}]

    def fake_dispatch(targets):
        captured.append(targets)
        return {"ok": True, "dispatches_due": 0}

    patch_attr = unittest.mock.patch.object
    with patch_attr(module, "request_method", lambda: "POST"):
        with patch_attr(module, "load_monitor_targets", lambda: stub_targets):
            with patch_attr(module, "dispatch_due_monitors", fake_dispatch):
                body, status, headers = module.handle_monitor_dispatch()

    self.assertEqual(status, 200)
    self.assertEqual(headers["Content-Type"], "application/json")
    self.assertIn('"dispatches_due": 0', body)
    self.assertEqual(captured[0][0]["service_name"], "longbridge-quant-sg-service")
a/tests/test_sync_cloud_run_env_workflow.sh b/tests/test_sync_cloud_run_env_workflow.sh index da8b034..ed75a8b 100644 --- a/tests/test_sync_cloud_run_env_workflow.sh +++ b/tests/test_sync_cloud_run_env_workflow.sh @@ -26,6 +26,8 @@ grep -Fq 'CLOUD_SCHEDULER_LOCATION: ${{ vars.CLOUD_SCHEDULER_LOCATION }}' "$work grep -Fq 'CLOUD_SCHEDULER_MAIN_TIME: ${{ vars.CLOUD_SCHEDULER_MAIN_TIME }}' "$workflow_file" grep -Fq 'CLOUD_SCHEDULER_PROBE_TIME: ${{ vars.CLOUD_SCHEDULER_PROBE_TIME }}' "$workflow_file" grep -Fq 'CLOUD_SCHEDULER_PRECHECK_TIME: ${{ vars.CLOUD_SCHEDULER_PRECHECK_TIME }}' "$workflow_file" +grep -Fq 'CLOUD_RUN_SERVICE_TARGETS_JSON: ${{ vars.CLOUD_RUN_SERVICE_TARGETS_JSON }}' "$workflow_file" +grep -Fq 'GCP_SCHEDULER_SERVICE_ACCOUNT: longbridge-platform-scheduler@longbridgequant.iam.gserviceaccount.com' "$workflow_file" grep -Fq 'Skipping Cloud Run commit wait because CLOUD_RUN_ENV_SYNC_WAIT_FOR_COMMIT is disabled.' "$workflow_file" grep -Fq 'permissions:' "$workflow_file" grep -Fq 'id-token: write' "$workflow_file" @@ -216,6 +218,7 @@ grep -Fq 'join_by_delimiter()' "$workflow_file" grep -Fq 'gcloud_args+=(--remove-secrets "$(IFS=,; echo "${remove_secret_vars[*]}")")' "$workflow_file" grep -Fq 'gcloud_args+=(--update-secrets "$(IFS=,; echo "${secret_pairs[*]}")")' "$workflow_file" grep -Fq -- '--update-env-vars "^|^$(join_by_delimiter "|" "${env_pairs[@]}")"' "$workflow_file" +grep -Fq 'MONITOR_DISPATCH_TARGETS_JSON=${monitor_targets_json}' "$workflow_file" grep -Fq 'Sync Cloud Scheduler schedule' "$workflow_file" grep -Fq 'RUNTIME_TARGET_JSON: ${{ steps.strategy_requirements.outputs.runtime_target_json }}' "$workflow_file" grep -Fq 'scheduler_location="${CLOUD_SCHEDULER_LOCATION:-${CLOUD_RUN_REGION}}"' "$workflow_file" @@ -226,13 +229,17 @@ grep -Fq 'timezone = "Asia/Hong_Kong" if market == "HK" else "America/New_York"' grep -Fq 'configured_time("main_time", "CLOUD_SCHEDULER_MAIN_TIME", "45 15")' "$workflow_file" grep -Fq 'configured_time("probe_time", 
"CLOUD_SCHEDULER_PROBE_TIME", "35 9,15")' "$workflow_file" grep -Fq 'configured_time("precheck_time", "CLOUD_SCHEDULER_PRECHECK_TIME", "45 9")' "$workflow_file" -grep -Fq 'for suffix in scheduler probe-scheduler precheck-scheduler; do' "$workflow_file" -grep -Fq 'current_schedule="$(gcloud scheduler jobs describe "${job_name}"' "$workflow_file" -grep -Fq 'desired_schedule="$(CURRENT_SCHEDULE="${current_schedule}" SCHEDULE_TIME="${schedule_time}" python - <<' "$workflow_file" +grep -Fq 'scheduler_job_candidates=("${CLOUD_RUN_SERVICE}-scheduler")' "$workflow_file" +grep -Fq 'current_schedule="$(gcloud scheduler jobs describe "${candidate_job}"' "$workflow_file" +grep -Fq 'desired_schedule="$(CURRENT_SCHEDULE="${current_schedule}" SCHEDULE_TIME="${main_time}" python - <<' "$workflow_file" grep -Fq 'if len(time_fields) == 5:' "$workflow_file" grep -Fq 'print(" ".join(time_fields))' "$workflow_file" grep -Fq 'print(" ".join([*time_fields, *current_fields[2:]]))' "$workflow_file" grep -Fq 'gcloud scheduler jobs update http "${job_name}"' "$workflow_file" +grep -Fq 'gcloud scheduler jobs create http "${job_name}"' "$workflow_file" +grep -Fq 'monitor_job_name="longbridge-monitor-dispatcher-scheduler"' "$workflow_file" +grep -Fq 'monitor_uri="${service_url}/monitor-dispatch"' "$workflow_file" +grep -Fq 'gcloud scheduler jobs delete "${legacy_job}"' "$workflow_file" grep -Fq -- '--schedule="${desired_schedule}"' "$workflow_file" grep -Fq -- '--time-zone="${market_timezone}"' "$workflow_file"