diff --git a/.github/workflows/sync-cloud-run-env.yml b/.github/workflows/sync-cloud-run-env.yml index 54c4cc6..b06f78e 100644 --- a/.github/workflows/sync-cloud-run-env.yml +++ b/.github/workflows/sync-cloud-run-env.yml @@ -11,6 +11,7 @@ env: GCP_WORKLOAD_IDENTITY_PROVIDER: projects/1088907247379/locations/global/workloadIdentityPools/github-actions/providers/github-main GCP_WORKLOAD_IDENTITY_SERVICE_ACCOUNT: firstrade-platform-deploy@firstradequant.iam.gserviceaccount.com GCP_RUNTIME_SERVICE_ACCOUNT: firstrade-platform-runtime@firstradequant.iam.gserviceaccount.com + GCP_SCHEDULER_SERVICE_ACCOUNT: firstrade-platform-scheduler@firstradequant.iam.gserviceaccount.com GCP_ARTIFACT_REGISTRY_HOSTNAME: us-central1-docker.pkg.dev GCP_ARTIFACT_REGISTRY_REPOSITORY: cloud-run-source-deploy @@ -587,6 +588,33 @@ jobs: add_optional_secret FIRSTRADE_MFA_PHONE FIRSTRADE_MFA_PHONE_SECRET_NAME FIRSTRADE_MFA_PHONE add_optional_secret FIRSTRADE_MFA_CODE FIRSTRADE_MFA_CODE_SECRET_NAME FIRSTRADE_MFA_CODE + service_url="$(gcloud run services describe "${CLOUD_RUN_SERVICE}" \ + --project="${GCP_PROJECT_ID}" \ + --region="${CLOUD_RUN_REGION}" \ + --format='value(status.url)' 2>/dev/null || true)" + if [ -z "${service_url}" ]; then + echo "Unable to resolve Cloud Run service URL for ${CLOUD_RUN_SERVICE}; cannot sync monitor dispatcher targets." >&2 + exit 1 + fi + monitor_targets_json="$( + SERVICE_URL="${service_url}" python - <<'PY' + import json + import os + + runtime_target = json.loads(os.environ.get("RUNTIME_TARGET_JSON") or "{}") + target = { + "service_name": os.environ.get("CLOUD_RUN_SERVICE"), + "service_url": os.environ.get("SERVICE_URL"), + "strategy_profile": runtime_target.get("strategy_profile"), + "account_scope": runtime_target.get("account_scope"), + "runtime_target_enabled": os.environ.get("RUNTIME_TARGET_ENABLED", "true"), + "scheduler": runtime_target.get("scheduler") if isinstance(runtime_target.get("scheduler"), dict) else {}, + } + print(json.dumps({"targets": [target]}, separators=(",", ":"))) + PY + )" + env_pairs+=("MONITOR_DISPATCH_TARGETS_JSON=${monitor_targets_json}") + gcloud_args=( run services update "${CLOUD_RUN_SERVICE}" --region "${CLOUD_RUN_REGION}" @@ -655,46 +683,42 @@ jobs: exit 1 fi - for suffix in scheduler probe-scheduler precheck-scheduler; do - case "${suffix}" in - scheduler) - schedule_time="${main_time}" - scheduler_path="/run" - ;; - probe-scheduler) - schedule_time="${probe_time}" - scheduler_path="/probe" - ;; - precheck-scheduler) - schedule_time="${precheck_time}" - scheduler_path="/dry-run" - ;; - esac - - scheduler_job_candidates=("${CLOUD_RUN_SERVICE}-${suffix}") - if [[ "${CLOUD_RUN_SERVICE}" == *-service ]]; then - scheduler_job_candidates+=("${CLOUD_RUN_SERVICE%-service}-${suffix}") - fi + gcloud run services add-iam-policy-binding "${CLOUD_RUN_SERVICE}" \ + --project="${GCP_PROJECT_ID}" \ + --region="${CLOUD_RUN_REGION}" \ + --member="serviceAccount:${GCP_SCHEDULER_SERVICE_ACCOUNT}" \ + --role="roles/run.invoker" \ + --quiet + gcloud run services add-iam-policy-binding "${CLOUD_RUN_SERVICE}" \ + --project="${GCP_PROJECT_ID}" \ + --region="${CLOUD_RUN_REGION}" \ + --member="serviceAccount:${GCP_RUNTIME_SERVICE_ACCOUNT}" \ + --role="roles/run.invoker" \ + --quiet - job_name="" - current_schedule="" - for candidate_job in "${scheduler_job_candidates[@]}"; do - current_schedule="$(gcloud scheduler jobs describe "${candidate_job}" \ - --project="${GCP_PROJECT_ID}" \ - --location="${scheduler_location}" \ - --format='value(schedule)' 2>/dev/null || true)" - if [ -n "${current_schedule}" ]; then - job_name="${candidate_job}" - break - fi - done + scheduler_job_candidates=("${CLOUD_RUN_SERVICE}-scheduler") + if [[ "${CLOUD_RUN_SERVICE}" == *-service ]]; then + scheduler_job_candidates+=("${CLOUD_RUN_SERVICE%-service}-scheduler") + fi - if [ -z "${current_schedule}" ]; then - echo "Cloud Scheduler job for ${CLOUD_RUN_SERVICE} ${suffix} was not found in ${scheduler_location}; skipping schedule sync." - continue + job_name="" + current_schedule="" + for candidate_job in "${scheduler_job_candidates[@]}"; do + current_schedule="$(gcloud scheduler jobs describe "${candidate_job}" \ + --project="${GCP_PROJECT_ID}" \ + --location="${scheduler_location}" \ + --format='value(schedule)' 2>/dev/null || true)" + if [ -n "${current_schedule}" ]; then + job_name="${candidate_job}" + break fi + done + if [ -z "${job_name}" ]; then + job_name="${scheduler_job_candidates[0]}" + fi - desired_schedule="$(CURRENT_SCHEDULE="${current_schedule}" SCHEDULE_TIME="${schedule_time}" python - <<'PY' + if [ -n "${current_schedule}" ]; then + desired_schedule="$(CURRENT_SCHEDULE="${current_schedule}" SCHEDULE_TIME="${main_time}" python - <<'PY' import os current_fields = os.environ["CURRENT_SCHEDULE"].split() @@ -711,8 +735,25 @@ jobs: ) PY )" + else + desired_schedule="$(SCHEDULE_TIME="${main_time}" python - <<'PY' + import os - scheduler_uri="${service_url}${scheduler_path}" + fields = os.environ["SCHEDULE_TIME"].split() + if len(fields) == 5: + print(" ".join(fields)) + elif len(fields) == 2: + print(" ".join([*fields, "*", "*", "*"])) + else: + raise SystemExit( + f"Cloud Scheduler override must have 2 time fields or 5 cron fields: {os.environ['SCHEDULE_TIME']!r}" + ) + PY + )" + fi + + scheduler_uri="${service_url}/run" + if [ -n "${current_schedule}" ]; then echo "Updating Cloud Scheduler job ${job_name} schedule to ${desired_schedule}, timezone to ${market_timezone}, and URI to ${scheduler_uri}." gcloud scheduler jobs update http "${job_name}" \ --project="${GCP_PROJECT_ID}" \ @@ -721,6 +762,75 @@ jobs: --schedule="${desired_schedule}" \ --time-zone="${market_timezone}" \ --quiet + else + echo "Creating Cloud Scheduler job ${job_name} schedule ${desired_schedule}, timezone ${market_timezone}, and URI ${scheduler_uri}." + gcloud scheduler jobs create http "${job_name}" \ + --project="${GCP_PROJECT_ID}" \ + --location="${scheduler_location}" \ + --uri="${scheduler_uri}" \ + --http-method=POST \ + --oidc-service-account-email="${GCP_SCHEDULER_SERVICE_ACCOUNT}" \ + --oidc-token-audience="${service_url}" \ + --schedule="${desired_schedule}" \ + --time-zone="${market_timezone}" \ + --attempt-deadline=600s \ + --quiet + fi + + monitor_job_name="firstrade-monitor-dispatcher-scheduler" + monitor_uri="${service_url}/monitor-dispatch" + if gcloud scheduler jobs describe "${monitor_job_name}" \ + --project="${GCP_PROJECT_ID}" \ + --location="${scheduler_location}" >/dev/null 2>&1; then + echo "Updating Cloud Scheduler job ${monitor_job_name} to ${monitor_uri}." + gcloud scheduler jobs update http "${monitor_job_name}" \ + --project="${GCP_PROJECT_ID}" \ + --location="${scheduler_location}" \ + --uri="${monitor_uri}" \ + --http-method=POST \ + --oidc-service-account-email="${GCP_SCHEDULER_SERVICE_ACCOUNT}" \ + --oidc-token-audience="${service_url}" \ + --schedule="*/5 * * * *" \ + --time-zone="UTC" \ + --attempt-deadline=180s \ + --quiet + else + echo "Creating Cloud Scheduler job ${monitor_job_name} at ${monitor_uri}." + gcloud scheduler jobs create http "${monitor_job_name}" \ + --project="${GCP_PROJECT_ID}" \ + --location="${scheduler_location}" \ + --uri="${monitor_uri}" \ + --http-method=POST \ + --oidc-service-account-email="${GCP_SCHEDULER_SERVICE_ACCOUNT}" \ + --oidc-token-audience="${service_url}" \ + --schedule="*/5 * * * *" \ + --time-zone="UTC" \ + --attempt-deadline=180s \ + --quiet + fi + + legacy_jobs=( + "${CLOUD_RUN_SERVICE}-probe-scheduler" + "${CLOUD_RUN_SERVICE}-precheck-scheduler" + "${CLOUD_RUN_SERVICE}-session-check-scheduler" + ) + if [[ "${CLOUD_RUN_SERVICE}" == *-service ]]; then + legacy_jobs+=( + "${CLOUD_RUN_SERVICE%-service}-probe-scheduler" + "${CLOUD_RUN_SERVICE%-service}-precheck-scheduler" + "${CLOUD_RUN_SERVICE%-service}-session-check-scheduler" + ) + fi + for legacy_job in "${legacy_jobs[@]}"; do + if gcloud scheduler jobs describe "${legacy_job}" \ + --project="${GCP_PROJECT_ID}" \ + --location="${scheduler_location}" >/dev/null 2>&1; then + echo "Deleting legacy Cloud Scheduler job ${legacy_job}; monitor dispatcher now owns probe/precheck." + gcloud scheduler jobs delete "${legacy_job}" \ + --project="${GCP_PROJECT_ID}" \ + --location="${scheduler_location}" \ + --quiet + fi done - name: Clean up old Cloud Run revisions and images diff --git a/application/monitor_dispatcher.py b/application/monitor_dispatcher.py new file mode 100644 index 0000000..1d59160 --- /dev/null +++ b/application/monitor_dispatcher.py @@ -0,0 +1,271 @@ +"""Dispatch shared monitor checks to platform Cloud Run services.""" + +from __future__ import annotations + +import datetime as dt +import json +import os +from concurrent.futures import ThreadPoolExecutor, as_completed +from dataclasses import dataclass +from typing import Any, Callable, Iterable, Mapping, Sequence +from zoneinfo import ZoneInfo + +import requests +from google.auth.transport.requests import Request +from google.oauth2 import id_token + + +MONITOR_TARGET_ENV_NAMES = ( + "MONITOR_DISPATCH_TARGETS_JSON", + "LONGBRIDGE_MONITOR_DISPATCH_TARGETS_JSON", + "SCHWAB_MONITOR_DISPATCH_TARGETS_JSON", + "FIRSTRADE_MONITOR_DISPATCH_TARGETS_JSON", +) +DEFAULT_LOOKBACK_MINUTES = 4 +DEFAULT_TIMEOUT_SECONDS = 120 +DEFAULT_MAX_WORKERS = 4 + + +@dataclass(frozen=True) +class MonitorWindow: + name: str + path: str + scheduler_key: str + + +MONITOR_WINDOWS = ( + MonitorWindow("probe", "/probe", "probe_time"), + MonitorWindow("precheck", "/dry-run", "precheck_time"), +) + + +def load_monitor_targets(env: Mapping[str, str] | None = None) -> list[dict[str, Any]]: + env = env or os.environ + raw = "" + for name in MONITOR_TARGET_ENV_NAMES: + raw = str(env.get(name) or "").strip() + if raw: + break + if not raw: + return [] + payload = json.loads(raw) + if isinstance(payload, dict): + targets = payload.get("targets") + else: + targets = payload + if not isinstance(targets, list): + raise ValueError("monitor dispatch targets must be a JSON array or an object with targets") + return [dict(target) for target in targets if isinstance(target, Mapping)] + + +def dispatch_due_monitors( + targets: Sequence[Mapping[str, Any]], + *, + now: dt.datetime | None = None, + lookback_minutes: int = DEFAULT_LOOKBACK_MINUTES, + timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS, + max_workers: int = DEFAULT_MAX_WORKERS, + token_fetcher: Callable[[str], str] | None = None, + post_fn: Callable[..., Any] | None = None, +) -> dict[str, Any]: + now_utc = _as_utc(now or dt.datetime.now(dt.timezone.utc)) + due_dispatches = list(_iter_due_dispatches(targets, now_utc=now_utc, lookback_minutes=lookback_minutes)) + token_fetcher = token_fetcher or _fetch_id_token + post_fn = post_fn or requests.post + if not due_dispatches: + return { + "ok": True, + "total_targets": len(targets), + "dispatches_due": 0, + "results": [], + } + + workers = max(1, min(int(max_workers or 1), len(due_dispatches))) + results: list[dict[str, Any]] = [] + with ThreadPoolExecutor(max_workers=workers) as executor: + futures = [ + executor.submit( + _send_dispatch, + dispatch, + timeout_seconds=timeout_seconds, + token_fetcher=token_fetcher, + post_fn=post_fn, + ) + for dispatch in due_dispatches + ] + for future in as_completed(futures): + results.append(future.result()) + results.sort(key=lambda item: (str(item.get("service_name") or ""), str(item.get("window") or ""))) + return { + "ok": all(bool(result.get("ok")) for result in results), + "total_targets": len(targets), + "dispatches_due": len(due_dispatches), + "results": results, + } + + +def _iter_due_dispatches( + targets: Sequence[Mapping[str, Any]], + *, + now_utc: dt.datetime, + lookback_minutes: int, +) -> Iterable[dict[str, Any]]: + for target in targets: + if not _target_enabled(target): + continue + service_url = str(target.get("service_url") or "").rstrip("/") + if not service_url: + continue + scheduler = target.get("scheduler") if isinstance(target.get("scheduler"), Mapping) else {} + timezone = _target_timezone(scheduler) + local_now = now_utc.astimezone(timezone) + for window in MONITOR_WINDOWS: + schedule = str(scheduler.get(window.scheduler_key) or "").strip() + if not schedule: + continue + if _cron_due_within_window(schedule, local_now=local_now, lookback_minutes=lookback_minutes): + yield { + "service_name": str(target.get("service_name") or ""), + "strategy_profile": str(target.get("strategy_profile") or ""), + "account_scope": str(target.get("account_scope") or target.get("account_group") or ""), + "window": window.name, + "url": f"{service_url}{window.path}", + "audience": service_url, + } + + +def _send_dispatch( + dispatch: Mapping[str, Any], + *, + timeout_seconds: int, + token_fetcher: Callable[[str], str], + post_fn: Callable[..., Any], +) -> dict[str, Any]: + url = str(dispatch.get("url") or "") + audience = str(dispatch.get("audience") or "") + base_result = { + "service_name": dispatch.get("service_name"), + "strategy_profile": dispatch.get("strategy_profile"), + "account_scope": dispatch.get("account_scope"), + "window": dispatch.get("window"), + "url": url, + } + try: + token = token_fetcher(audience) + response = post_fn( + url, + headers={ + "Authorization": f"Bearer {token}", + "User-Agent": "platform-monitor-dispatcher", + }, + timeout=timeout_seconds, + ) + status_code = int(getattr(response, "status_code", 0) or 0) + return { + **base_result, + "status_code": status_code, + "ok": 200 <= status_code < 300, + } + except Exception as exc: + return { + **base_result, + "status_code": 0, + "ok": False, + "error_type": type(exc).__name__, + "error": str(exc), + } + + +def _target_enabled(target: Mapping[str, Any]) -> bool: + value = target.get("runtime_target_enabled") + if value is None: + value = target.get("enabled") + if value is None: + return True + if isinstance(value, bool): + return value + return str(value).strip().lower() not in {"0", "false", "no", "off", "disabled"} + + +def _target_timezone(scheduler: Mapping[str, Any]) -> dt.tzinfo: + try: + return ZoneInfo(str(scheduler.get("timezone") or "UTC")) + except Exception: + return dt.timezone.utc + + +def _cron_due_within_window(schedule: str, *, local_now: dt.datetime, lookback_minutes: int) -> bool: + fields = schedule.split() + if len(fields) != 5: + return False + lookback = max(0, int(lookback_minutes or 0)) + floor_now = local_now.replace(second=0, microsecond=0) + for minute_offset in range(lookback + 1): + candidate = floor_now - dt.timedelta(minutes=minute_offset) + if _cron_matches(fields, candidate): + return True + return False + + +def _cron_matches(fields: Sequence[str], value: dt.datetime) -> bool: + minute, hour, day_of_month, month, day_of_week = fields + cron_weekday = (value.weekday() + 1) % 7 + return ( + _field_matches(minute, value.minute, 0, 59) + and _field_matches(hour, value.hour, 0, 23) + and _field_matches(day_of_month, value.day, 1, 31) + and _field_matches(month, value.month, 1, 12) + and _field_matches(day_of_week, cron_weekday, 0, 7, sunday_alias=True) + ) + + +def _field_matches(field: str, value: int, min_value: int, max_value: int, *, sunday_alias: bool = False) -> bool: + for part in field.split(","): + part = part.strip() + if not part: + continue + if _part_matches(part, value, min_value, max_value, sunday_alias=sunday_alias): + return True + return False + + +def _part_matches(part: str, value: int, min_value: int, max_value: int, *, sunday_alias: bool = False) -> bool: + if "/" in part: + base, step_text = part.split("/", 1) + try: + step = int(step_text) + except ValueError: + return False + if step <= 0: + return False + else: + base = part + step = 1 + if base == "*": + start, end = min_value, max_value + elif "-" in base: + start_text, end_text = base.split("-", 1) + try: + start, end = int(start_text), int(end_text) + except ValueError: + return False + else: + try: + start = end = int(base) + except ValueError: + return False + if sunday_alias and value == 0 and start == end == 7: + return True + if value < start or value > end: + return False + return (value - start) % step == 0 + + +def _as_utc(value: dt.datetime) -> dt.datetime: + if value.tzinfo is None: + return value.replace(tzinfo=dt.timezone.utc) + return value.astimezone(dt.timezone.utc) + + +def _fetch_id_token(audience: str) -> str: + return id_token.fetch_id_token(Request(), audience) diff --git a/main.py b/main.py index bd6a053..21762d7 100644 --- a/main.py +++ b/main.py @@ -10,6 +10,7 @@ from flask import Flask, jsonify, request +from application.monitor_dispatcher import dispatch_due_monitors, load_monitor_targets from application.firstrade_client import ( FirstradeBrokerClient, FirstradeCredentials, @@ -483,5 +484,26 @@ def probe(): return session_check() +@app.post("/monitor-dispatch") +@app.get("/monitor-dispatch") +def monitor_dispatch(): + if request.method == "GET": + return jsonify({"ok": True, "message": "use POST to dispatch due monitor checks"}) + try: + return jsonify(dispatch_due_monitors(load_monitor_targets())) + except Exception as exc: + notification_attempted = _handle_strategy_run_exception(exc) + return ( + jsonify( + { + "ok": False, + "error": f"{type(exc).__name__}: {exc}", + "runtime_error_notification_attempted": notification_attempted, + } + ), + 500, + ) + + if __name__ == "__main__": app.run(host="0.0.0.0", port=int(os.getenv("PORT", "8080"))) diff --git a/requirements.txt b/requirements.txt index 918f145..e614423 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,5 +4,6 @@ firstrade==0.0.39 quant-platform-kit @ git+https://github.com/QuantStrategyLab/QuantPlatformKit.git@b846c9d777a450e95d23c264853997d671f47dd9 us-equity-strategies @ git+https://github.com/QuantStrategyLab/UsEquityStrategies.git@0cd90a37f3ea5e4b6fe27c9a435834df7df3a25f google-cloud-storage +google-auth requests pytest diff --git a/tests/test_monitor_dispatcher.py b/tests/test_monitor_dispatcher.py new file mode 100644 index 0000000..65ad16d --- /dev/null +++ b/tests/test_monitor_dispatcher.py @@ -0,0 +1,57 @@ +import datetime as dt + +from application.monitor_dispatcher import dispatch_due_monitors + + +def test_dispatch_due_monitors_selects_probe_window(): + targets = [ + { + "service_name": "firstrade-quant-service", + "service_url": "https://svc-sg.example.run.app", + "strategy_profile": "ibit_smart_dca", + "account_scope": "US", + "scheduler": { + "timezone": "America/New_York", + "probe_time": "35 9,15 * * *", + "precheck_time": "45 9 * * *", + }, + } + ] + calls = [] + + class Response: + status_code = 200 + + result = dispatch_due_monitors( + targets, + now=dt.datetime(2026, 6, 18, 13, 35, tzinfo=dt.timezone.utc), + token_fetcher=lambda audience: f"token:{audience}", + post_fn=lambda url, **kwargs: calls.append((url, kwargs)) or Response(), + ) + + assert result["ok"] is True + assert result["dispatches_due"] == 1 + assert calls[0][0] == "https://svc-sg.example.run.app/probe" + assert calls[0][1]["headers"]["Authorization"] == "Bearer token:https://svc-sg.example.run.app" + + +def test_dispatch_due_monitors_skips_disabled_target(): + result = dispatch_due_monitors( + [ + { + "service_name": "firstrade-quant-service", + "service_url": "https://svc-hk.example.run.app", + "runtime_target_enabled": "false", + "scheduler": { + "timezone": "America/New_York", + "probe_time": "35 9 * * *", + }, + } + ], + now=dt.datetime(2026, 6, 18, 13, 35, tzinfo=dt.timezone.utc), + token_fetcher=lambda _audience: "token", + post_fn=lambda *_args, **_kwargs: None, + ) + + assert result["dispatches_due"] == 0 + assert result["results"] == [] diff --git a/tests/test_request_handling.py b/tests/test_request_handling.py index 1c6d01a..90989f1 100644 --- a/tests/test_request_handling.py +++ b/tests/test_request_handling.py @@ -21,6 +21,7 @@ def test_cloud_run_route_contracts_are_registered(): "/smoke": ["GET"], "/run": ["GET", "POST"], "/dry-run": ["GET", "POST"], + "/monitor-dispatch": ["GET", "POST"], "/probe": ["GET", "POST"], "/static/": ["GET"], } @@ -66,6 +67,29 @@ def test_run_endpoint_calls_strategy_cycle_when_gate_enabled(monkeypatch): assert response.get_json() == {"ok": True, "action_done": False} +def test_run_endpoint_returns_200_for_terminal_funding_block(monkeypatch): + monkeypatch.setenv("FIRSTRADE_RUN_STRATEGY_ON_HTTP", "true") + monkeypatch.setattr( + main, + "_run_strategy_cycle_with_report", + lambda **_kwargs: { + "ok": False, + "execution_blocked": True, + "execution_block_retryable": False, + "funding_blocked": True, + "error": "Strategy execution blocked; see execution_blocking_skips.", + }, + ) + client = main.app.test_client() + + response = client.post("/run") + + assert response.status_code == 200 + payload = response.get_json() + assert payload["funding_blocked"] is True + assert payload["execution_block_retryable"] is False + + def test_probe_endpoint_is_disabled_without_explicit_http_gate(monkeypatch): monkeypatch.delenv("FIRSTRADE_RUN_SESSION_CHECK_ON_HTTP", raising=False) client = main.app.test_client() @@ -246,3 +270,21 @@ def fake_run_strategy_cycle_with_report(**kwargs): } assert probe_response.status_code == 200 assert probe_response.get_json()["ok"] is True + + +def test_monitor_dispatch_post_dispatches_due_targets(monkeypatch): + observed = {} + + def fake_dispatch(targets): + observed["targets"] = targets + return {"ok": True, "dispatches_due": 0} + + monkeypatch.setattr(main, "load_monitor_targets", lambda: [{"service_name": "firstrade-quant-service"}]) + monkeypatch.setattr(main, "dispatch_due_monitors", fake_dispatch) + client = main.app.test_client() + + response = client.post("/monitor-dispatch") + + assert response.status_code == 200 + assert response.get_json()["dispatches_due"] == 0 + assert observed["targets"][0]["service_name"] == "firstrade-quant-service" diff --git a/tests/test_sync_cloud_run_env_workflow.py b/tests/test_sync_cloud_run_env_workflow.py index cea5044..3ca06b6 100644 --- a/tests/test_sync_cloud_run_env_workflow.py +++ b/tests/test_sync_cloud_run_env_workflow.py @@ -121,6 +121,8 @@ def test_sync_cloud_run_env_workflow_syncs_scheduler_from_runtime_target(): workflow = workflow_path.read_text(encoding="utf-8") assert "Sync Cloud Scheduler schedule" in workflow + assert "GCP_SCHEDULER_SERVICE_ACCOUNT: firstrade-platform-scheduler@firstradequant.iam.gserviceaccount.com" in workflow + assert "MONITOR_DISPATCH_TARGETS_JSON=${monitor_targets_json}" in workflow assert 'scheduler_location="${CLOUD_SCHEDULER_LOCATION:-${CLOUD_RUN_REGION}}"' in workflow assert 'raw_runtime_target = os.environ.get("RUNTIME_TARGET_JSON", "").strip()' in workflow assert 'scheduler = runtime_target.get("scheduler") if isinstance(runtime_target, dict) else {}' in workflow @@ -128,11 +130,16 @@ def test_sync_cloud_run_env_workflow_syncs_scheduler_from_runtime_target(): assert 'configured_time("main_time", "CLOUD_SCHEDULER_MAIN_TIME", "45 15")' in workflow assert 'configured_time("probe_time", "CLOUD_SCHEDULER_PROBE_TIME", "35 9,15")' in workflow assert 'configured_time("precheck_time", "CLOUD_SCHEDULER_PRECHECK_TIME", "45 9")' in workflow - assert 'scheduler_job_candidates=("${CLOUD_RUN_SERVICE}-${suffix}")' in workflow - assert 'scheduler_job_candidates+=("${CLOUD_RUN_SERVICE%-service}-${suffix}")' in workflow + assert 'scheduler_job_candidates=("${CLOUD_RUN_SERVICE}-scheduler")' in workflow + assert 'scheduler_job_candidates+=("${CLOUD_RUN_SERVICE%-service}-scheduler")' in workflow assert 'if len(time_fields) == 5:' in workflow assert 'print(" ".join(time_fields))' in workflow assert 'print(" ".join([*time_fields, *current_fields[2:]]))' in workflow assert 'gcloud scheduler jobs update http "${job_name}"' in workflow + assert 'gcloud scheduler jobs create http "${job_name}"' in workflow + assert 'monitor_job_name="firstrade-monitor-dispatcher-scheduler"' in workflow + assert 'monitor_uri="${service_url}/monitor-dispatch"' in workflow + assert '"${CLOUD_RUN_SERVICE}-session-check-scheduler"' in workflow + assert 'gcloud scheduler jobs delete "${legacy_job}"' in workflow assert '--schedule="${desired_schedule}"' in workflow assert '--time-zone="${market_timezone}"' in workflow