From 4dd4aa6b7b664c8c368c739f7d70ddb5e5ae6e74 Mon Sep 17 00:00:00 2001 From: Pigbibi <20649888+Pigbibi@users.noreply.github.com> Date: Thu, 18 Jun 2026 18:09:17 +0800 Subject: [PATCH] Fix internal LongBridge precheck invoke --- .github/workflows/invoke-cloud-run.yml | 193 +++++++++++++++++++++++- application/rebalance_service.py | 2 + application/runtime_composer.py | 2 + application/runtime_dependencies.py | 1 + main.py | 5 + notifications/renderers.py | 10 +- notifications/telegram.py | 2 + tests/test_invoke_cloud_run_workflow.sh | 7 + tests/test_notifications.py | 31 ++++ tests/test_request_handling.py | 65 +++++++- 10 files changed, 309 insertions(+), 9 deletions(-) diff --git a/.github/workflows/invoke-cloud-run.yml b/.github/workflows/invoke-cloud-run.yml index 8c52d3f..b497ca6 100644 --- a/.github/workflows/invoke-cloud-run.yml +++ b/.github/workflows/invoke-cloud-run.yml @@ -39,6 +39,7 @@ jobs: env: CLOUD_RUN_REGION: ${{ vars.CLOUD_RUN_REGION }} CLOUD_RUN_SERVICE: ${{ vars.CLOUD_RUN_SERVICE }} + CLOUD_SCHEDULER_LOCATION: ${{ vars.CLOUD_SCHEDULER_LOCATION }} steps: - name: Validate inputs run: | @@ -82,20 +83,127 @@ jobs: raw_path="/${raw_path}" fi - service_url="$( + service_json="$( gcloud run services describe "${CLOUD_RUN_SERVICE}" \ --region "${CLOUD_RUN_REGION}" \ - --format='value(status.url)' + --format=json + )" + service_url="$(SERVICE_JSON="${service_json}" python3 - <<'PY' + import json + import os + + service = json.loads(os.environ["SERVICE_JSON"]) + print((service.get("status") or {}).get("url") or "") + PY )" if [ -z "${service_url}" ]; then echo "Unable to resolve Cloud Run service URL." >&2 exit 1 fi + service_ingress="$(SERVICE_JSON="${service_json}" python3 - <<'PY' + import json + import os + + service = json.loads(os.environ["SERVICE_JSON"]) + annotations = (service.get("metadata") or {}).get("annotations") or {} + print(annotations.get("run.googleapis.com/ingress-status") or annotations.get("run.googleapis.com/ingress") or "") + PY + )" + latest_ready_revision="$(SERVICE_JSON="${service_json}" python3 - <<'PY' + import json + import os + + service = json.loads(os.environ["SERVICE_JSON"]) + print((service.get("status") or {}).get("latestReadyRevisionName") or "") + PY + )" + deployed_commit="$(SERVICE_JSON="${service_json}" python3 - <<'PY' + import json + import os + + service = json.loads(os.environ["SERVICE_JSON"]) + template = ((service.get("spec") or {}).get("template") or {}).get("metadata") or {} + print((template.get("labels") or {}).get("commit-sha") or "") + PY + )" + + invoke_method="direct" + scheduler_job="" + scheduler_location="" + if [ "${service_ingress}" = "internal" ]; then + scheduler_location="${CLOUD_SCHEDULER_LOCATION:-${CLOUD_RUN_REGION}}" + case "${raw_path}" in + /) + scheduler_job="${CLOUD_RUN_SERVICE}-scheduler" + ;; + /probe) + scheduler_job="${CLOUD_RUN_SERVICE}-probe-scheduler" + ;; + /precheck) + scheduler_job="${CLOUD_RUN_SERVICE}-precheck-scheduler" + ;; + *) + echo "Cloud Run service ${CLOUD_RUN_SERVICE} has internal ingress, so GitHub-hosted runners cannot curl ${raw_path} directly." >&2 + echo "Use one of the scheduler-backed paths: /, /probe, /precheck." >&2 + exit 1 + ;; + esac + + scheduler_uri="$( + gcloud scheduler jobs describe "${scheduler_job}" \ + --location="${scheduler_location}" \ + --format='value(httpTarget.uri)' 2>/dev/null || true + )" + if [ -z "${scheduler_uri}" ]; then + echo "Cloud Scheduler job ${scheduler_job} was not found in ${scheduler_location}." >&2 + exit 1 + fi + scheduler_path="$(SCHEDULER_URI="${scheduler_uri}" python3 - <<'PY' + import os + from urllib.parse import urlparse + + def normalize(path: str) -> str: + clean = (path or "/").rstrip("/") + return clean or "/" + + print(normalize(urlparse(os.environ["SCHEDULER_URI"]).path)) + PY + )" + requested_path="$(RAW_PATH="${raw_path}" python3 - <<'PY' + import os + + clean = (os.environ["RAW_PATH"] or "/").rstrip("/") + print(clean or "/") + PY + )" + if [ "${scheduler_path}" != "${requested_path}" ]; then + echo "Cloud Scheduler job ${scheduler_job} targets ${scheduler_uri}, not ${raw_path}." >&2 + exit 1 + fi + invoke_method="scheduler" + fi + + echo "Cloud Run service: ${CLOUD_RUN_SERVICE}" + echo "Cloud Run region: ${CLOUD_RUN_REGION}" + echo "Cloud Run URL: ${service_url}" + echo "Cloud Run ingress: ${service_ingress:-}" + echo "Latest ready revision: ${latest_ready_revision:-}" + echo "Deployed commit: ${deployed_commit:-}" + echo "Invoke method: ${invoke_method}" + if [ -n "${scheduler_job}" ]; then + echo "Cloud Scheduler job: ${scheduler_job}" + echo "Cloud Scheduler location: ${scheduler_location}" + fi + echo "url=${service_url}" >> "$GITHUB_OUTPUT" echo "path=${raw_path}" >> "$GITHUB_OUTPUT" + echo "invoke_method=${invoke_method}" >> "$GITHUB_OUTPUT" + echo "scheduler_job=${scheduler_job}" >> "$GITHUB_OUTPUT" + echo "scheduler_location=${scheduler_location}" >> "$GITHUB_OUTPUT" - name: Authenticate for service invocation + if: steps.service.outputs.invoke_method == 'direct' id: invoke-auth uses: google-github-actions/auth@v3 with: @@ -106,6 +214,7 @@ jobs: id_token_include_email: true - name: Invoke service + if: steps.service.outputs.invoke_method == 'direct' run: | set -euo pipefail @@ -113,3 +222,83 @@ jobs: --request POST \ --header "Authorization: Bearer ${{ steps.invoke-auth.outputs.id_token }}" \ "${{ steps.service.outputs.url }}${{ steps.service.outputs.path }}" + + - name: Invoke internal service through Cloud Scheduler + if: steps.service.outputs.invoke_method == 'scheduler' + run: | + set -euo pipefail + + started_at="$(date -u +%Y-%m-%dT%H:%M:%SZ)" + scheduler_job="${{ steps.service.outputs.scheduler_job }}" + scheduler_location="${{ steps.service.outputs.scheduler_location }}" + + echo "Triggering ${scheduler_job} at ${started_at}." + gcloud scheduler jobs run "${scheduler_job}" \ + --location="${scheduler_location}" \ + --quiet + + deadline=$((SECONDS + 180)) + while true; do + job_json="$( + gcloud scheduler jobs describe "${scheduler_job}" \ + --location="${scheduler_location}" \ + --format=json + )" + attempt_seen="$(JOB_JSON="${job_json}" STARTED_AT="${started_at}" python3 - <<'PY' + import datetime as dt + import json + import os + + def parse_timestamp(value: str) -> dt.datetime | None: + if not value: + return None + text = value.replace("Z", "+00:00") + return dt.datetime.fromisoformat(text) + + job = json.loads(os.environ["JOB_JSON"]) + last_attempt = parse_timestamp(job.get("lastAttemptTime") or "") + started_at = parse_timestamp(os.environ["STARTED_AT"]) + print("true" if last_attempt and started_at and last_attempt >= started_at else "false") + PY + )" + status_code="$(JOB_JSON="${job_json}" python3 - <<'PY' + import json + import os + + status = (json.loads(os.environ["JOB_JSON"]).get("status") or {}) + print(status.get("code") or "") + PY + )" + status_message="$(JOB_JSON="${job_json}" python3 - <<'PY' + import json + import os + + status = (json.loads(os.environ["JOB_JSON"]).get("status") or {}) + print(status.get("message") or "") + PY + )" + last_attempt_time="$(JOB_JSON="${job_json}" python3 - <<'PY' + import json + import os + + print(json.loads(os.environ["JOB_JSON"]).get("lastAttemptTime") or "") + PY + )" + + if [ "${attempt_seen}" = "true" ]; then + if [ -n "${status_code}" ] && [ "${status_code}" != "0" ]; then + echo "Cloud Scheduler job ${scheduler_job} failed with status ${status_code}: ${status_message}" >&2 + exit 1 + fi + echo "Cloud Scheduler job ${scheduler_job} ran at ${last_attempt_time}." + break + fi + + if [ "${SECONDS}" -ge "${deadline}" ]; then + echo "Timed out waiting for Cloud Scheduler job ${scheduler_job} to record a new attempt." >&2 + exit 1 + fi + + echo "Waiting for Cloud Scheduler job ${scheduler_job} attempt..." + sleep 10 + done diff --git a/application/rebalance_service.py b/application/rebalance_service.py index 9a0666f..c7f4169 100644 --- a/application/rebalance_service.py +++ b/application/rebalance_service.py @@ -371,6 +371,7 @@ def fetch_replanned_state(): strategy_display_name=config.strategy_display_name, dry_run_only=config.dry_run_only, extra_notification_lines=config.extra_notification_lines, + title_key=config.notification_title_key or "rebalance_title", ) ) else: @@ -384,6 +385,7 @@ def fetch_replanned_state(): strategy_display_name=config.strategy_display_name, dry_run_only=config.dry_run_only, extra_notification_lines=config.extra_notification_lines, + title_key=config.notification_title_key or "heartbeat_title", ) ) return execution_result diff --git a/application/runtime_composer.py b/application/runtime_composer.py index 9262f20..c4cbc0e 100644 --- a/application/runtime_composer.py +++ b/application/runtime_composer.py @@ -196,6 +196,7 @@ def build_rebalance_config( *, strategy_plugin_signals=(), strategy_plugin_error: str | None = None, + notification_title_key: str = "", ) -> LongBridgeRebalanceConfig: market_scope_line = self.translator( "market_scope_detail", @@ -231,6 +232,7 @@ def build_rebalance_config( safe_haven_cash_substitute_threshold_usd=self.safe_haven_cash_substitute_threshold_usd, sleeper=self.sleeper, extra_notification_lines=(market_scope_line, *plugin_lines, *plugin_error_lines), + notification_title_key=notification_title_key, strategy_plugin_signals=tuple(strategy_plugin_signals or ()), execution_dedup_enabled=resolve_execution_dedup_enabled( env_reader=self.env_reader, diff --git a/application/runtime_dependencies.py b/application/runtime_dependencies.py index e92a3f0..2f872fc 100644 --- a/application/runtime_dependencies.py +++ b/application/runtime_dependencies.py @@ -26,6 +26,7 @@ class LongBridgeRebalanceConfig: safe_haven_cash_substitute_threshold_usd: float = 1000.0 sleeper: Callable[[float], None] | None = None extra_notification_lines: tuple[str, ...] = () + notification_title_key: str = "" strategy_plugin_signals: tuple[Any, ...] = () execution_dedup_enabled: bool = False execution_state_store: Any = None diff --git a/main.py b/main.py index 7a71a6c..c29263a 100644 --- a/main.py +++ b/main.py @@ -505,6 +505,11 @@ def run_strategy(*, force_run: bool = False, validation_only: bool = False, vali config=composer.build_rebalance_config( strategy_plugin_signals=strategy_plugin_signals, strategy_plugin_error=strategy_plugin_error, + notification_title_key=( + "precheck_title" + if validation_only and validation_label == "precheck" + else "" + ), ), ) signal_snapshot = {} diff --git a/notifications/renderers.py b/notifications/renderers.py index ee674f8..fcf0a3c 100644 --- a/notifications/renderers.py +++ b/notifications/renderers.py @@ -506,9 +506,10 @@ def render_rebalance_notification( strategy_display_name, dry_run_only, extra_notification_lines=(), + title_key="rebalance_title", ) -> RenderedNotification: formatted_logs = "\n".join(f" - {log}" for log in [*logs, *skip_logs, *note_logs]) - detailed_lines = [translator("rebalance_title")] + detailed_lines = [translator(title_key or "rebalance_title")] _append_strategy_line(detailed_lines, strategy_display_name=strategy_display_name, translator=translator) if dry_run_only: detailed_lines.append(translator("dry_run_banner")) @@ -525,7 +526,7 @@ def render_rebalance_notification( ) detailed_lines.extend([separator, translator("order_logs_title"), formatted_logs]) - compact_lines = [translator("rebalance_title")] + compact_lines = [translator(title_key or "rebalance_title")] _append_strategy_line(compact_lines, strategy_display_name=strategy_display_name, translator=translator) if dry_run_only: compact_lines.append(translator("dry_run_banner")) @@ -557,8 +558,9 @@ def render_heartbeat_notification( strategy_display_name, dry_run_only, extra_notification_lines=(), + title_key="heartbeat_title", ) -> RenderedNotification: - detailed_lines = [translator("heartbeat_title")] + detailed_lines = [translator(title_key or "heartbeat_title")] _append_strategy_line(detailed_lines, strategy_display_name=strategy_display_name, translator=translator) if dry_run_only: detailed_lines.append(translator("dry_run_banner")) @@ -594,7 +596,7 @@ def render_heartbeat_notification( + "\n".join(f" - {log}" for log in note_logs) ) - compact_lines = [translator("heartbeat_title")] + compact_lines = [translator(title_key or "heartbeat_title")] _append_strategy_line(compact_lines, strategy_display_name=strategy_display_name, translator=translator) if dry_run_only: compact_lines.append(translator("dry_run_banner")) diff --git a/notifications/telegram.py b/notifications/telegram.py index 2e94052..877cb62 100644 --- a/notifications/telegram.py +++ b/notifications/telegram.py @@ -39,6 +39,7 @@ "income_locked": "🏦 收入层锁定占比: {ratio}", "signal": "🎯 触发信号: {msg}", "heartbeat_title": "💓 【心跳检测】", + "precheck_title": "🧪 【策略预检】", "health_probe_title": "🔎 【连接探针】", "health_probe_error_prefix": "健康探针异常:\n", "equity": "💰 净值: ${value}", @@ -196,6 +197,7 @@ "income_locked": "🏦 Income Locked: {ratio}", "signal": "🎯 Signal: {msg}", "heartbeat_title": "💓 【Heartbeat】", + "precheck_title": "🧪 【Strategy Precheck】", "health_probe_title": "🔎 【Health Probe】", "health_probe_error_prefix": "Health probe error:\n", "equity": "💰 Equity: ${value}", diff --git a/tests/test_invoke_cloud_run_workflow.sh b/tests/test_invoke_cloud_run_workflow.sh index 26c6e3d..4902758 100644 --- a/tests/test_invoke_cloud_run_workflow.sh +++ b/tests/test_invoke_cloud_run_workflow.sh @@ -12,11 +12,18 @@ grep -Fq "google-github-actions/auth@v3" "$workflow_file" grep -Fq "google-github-actions/setup-gcloud@v3" "$workflow_file" grep -Fq "CLOUD_RUN_REGION: \${{ vars.CLOUD_RUN_REGION }}" "$workflow_file" grep -Fq "CLOUD_RUN_SERVICE: \${{ vars.CLOUD_RUN_SERVICE }}" "$workflow_file" +grep -Fq "CLOUD_SCHEDULER_LOCATION: \${{ vars.CLOUD_SCHEDULER_LOCATION }}" "$workflow_file" grep -Fq "longbridge-hk|longbridge-paper|longbridge-sg" "$workflow_file" grep -Fq "gcloud run services describe \"\${CLOUD_RUN_SERVICE}\"" "$workflow_file" +grep -Fq "Cloud Run service \${CLOUD_RUN_SERVICE} has internal ingress" "$workflow_file" +grep -Fq "Use one of the scheduler-backed paths: /, /probe, /precheck." "$workflow_file" +grep -Fq "scheduler_job=\"\${CLOUD_RUN_SERVICE}-precheck-scheduler\"" "$workflow_file" +grep -Fq "Invoke internal service through Cloud Scheduler" "$workflow_file" +grep -Fq "gcloud scheduler jobs run \"\${scheduler_job}\"" "$workflow_file" grep -Fq "token_format: id_token" "$workflow_file" grep -Fq "id_token_audience: \${{ steps.service.outputs.url }}" "$workflow_file" grep -Fq "id_token_include_email: true" "$workflow_file" +grep -Fq "if: steps.service.outputs.invoke_method == 'direct'" "$workflow_file" grep -Fq "curl --fail-with-body --show-error --silent" "$workflow_file" grep -Fq -- "--request POST" "$workflow_file" grep -Fq "steps.invoke-auth.outputs.id_token" "$workflow_file" diff --git a/tests/test_notifications.py b/tests/test_notifications.py index a2b9e1e..9494e38 100644 --- a/tests/test_notifications.py +++ b/tests/test_notifications.py @@ -177,6 +177,37 @@ def test_heartbeat_signal_snapshot_localizes_price_source(self): self.assertIn("📊 市场状态: 🚀 风险开启(SOXX+SOXL)", rendered.compact_text) self.assertNotIn("longbridge_candlesticks", rendered.compact_text) + def test_precheck_heartbeat_uses_precheck_title(self): + rendered = render_heartbeat_notification( + execution={ + "signal_display": "🚀 入场信号 | 原因:QQQ 高于 MA200", + }, + skip_logs=(), + note_logs=(), + translator=build_translator("zh"), + separator="━━━━━━━━━━━━━━━━━━", + strategy_display_name="TQQQ 增长收益", + dry_run_only=True, + title_key="precheck_title", + ) + en_rendered = render_heartbeat_notification( + execution={ + "signal_display": "Entry signal | reason: QQQ is above MA200", + }, + skip_logs=(), + note_logs=(), + translator=build_translator("en"), + separator="━━━━━━━━━━━━━━━━━━", + strategy_display_name="TQQQ Growth Income", + dry_run_only=True, + title_key="precheck_title", + ) + + self.assertIn("🧪 【策略预检】", rendered.compact_text) + self.assertNotIn("💓 【心跳检测】", rendered.compact_text) + self.assertIn("🧪 【Strategy Precheck】", en_rendered.compact_text) + self.assertNotIn("💓 【Heartbeat】", en_rendered.compact_text) + def test_heartbeat_renders_tqqq_volatility_delever_risk_control(self): zh_rendered = render_heartbeat_notification( execution={ diff --git a/tests/test_request_handling.py b/tests/test_request_handling.py index c6582ae..a512b1e 100644 --- a/tests/test_request_handling.py +++ b/tests/test_request_handling.py @@ -489,7 +489,13 @@ def with_prefix(self, message): def build_rebalance_runtime(self, *, silent_cycle_notifications=False): return types.SimpleNamespace() - def build_rebalance_config(self, *, strategy_plugin_signals=(), strategy_plugin_error=None): + def build_rebalance_config( + self, + *, + strategy_plugin_signals=(), + strategy_plugin_error=None, + notification_title_key="", + ): return types.SimpleNamespace() module.build_composer = lambda *, dry_run_only_override=None: FakeComposer() @@ -529,7 +535,7 @@ def test_run_strategy_force_runs_when_market_closed(self): def test_run_strategy_validation_only_uses_dry_run_composer(self): module = load_module() - observed = {"override": None} + observed = {"override": None, "notification_title_key": None} class FakeComposer: def build_reporting_adapters(self): @@ -555,7 +561,14 @@ def build_rebalance_runtime(self, *, silent_cycle_notifications=False): observed["silent_cycle_notifications"] = silent_cycle_notifications return types.SimpleNamespace() - def build_rebalance_config(self, *, strategy_plugin_signals=(), strategy_plugin_error=None): + def build_rebalance_config( + self, + *, + strategy_plugin_signals=(), + strategy_plugin_error=None, + notification_title_key="", + ): + observed["notification_title_key"] = notification_title_key return types.SimpleNamespace() module.build_composer = lambda *, dry_run_only_override=None: observed.__setitem__("override", dry_run_only_override) or FakeComposer() @@ -568,6 +581,52 @@ def build_rebalance_config(self, *, strategy_plugin_signals=(), strategy_plugin_ self.assertTrue(observed["override"]) self.assertTrue(observed["silent_cycle_notifications"]) + self.assertEqual(observed["notification_title_key"], "") + + def test_run_strategy_precheck_sets_precheck_notification_title(self): + module = load_module() + observed = {"notification_title_key": None} + + class FakeComposer: + def build_reporting_adapters(self): + return types.SimpleNamespace( + start_run=lambda: (types.SimpleNamespace(run_id="run-001"), {"status": "pending"}), + log_event=lambda *args, **kwargs: None, + persist_execution_report=lambda report: types.SimpleNamespace(local_path="/tmp/report.json"), + ) + + def build_notification_adapters(self): + return types.SimpleNamespace(publish_cycle_notification=lambda **_kwargs: None) + + def load_strategy_plugin_signals(self, *_args, **_kwargs): + return (), None + + def attach_strategy_plugin_report(self, *_args, **_kwargs): + return None + + def with_prefix(self, message): + return message + + def build_rebalance_runtime(self, *, silent_cycle_notifications=False): + return types.SimpleNamespace() + + def build_rebalance_config( + self, + *, + strategy_plugin_signals=(), + strategy_plugin_error=None, + notification_title_key="", + ): + observed["notification_title_key"] = notification_title_key + return types.SimpleNamespace() + + module.build_composer = lambda *, dry_run_only_override=None: FakeComposer() + module.is_market_open_now = lambda **_kwargs: False + module.run_rebalance_cycle = lambda **_kwargs: None + + module.run_strategy(force_run=True, validation_only=True, validation_label="precheck") + + self.assertEqual(observed["notification_title_key"], "precheck_title") def test_run_strategy_persists_machine_readable_report(self): module = load_module()