Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 191 additions & 2 deletions .github/workflows/invoke-cloud-run.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ jobs:
env:
CLOUD_RUN_REGION: ${{ vars.CLOUD_RUN_REGION }}
CLOUD_RUN_SERVICE: ${{ vars.CLOUD_RUN_SERVICE }}
CLOUD_SCHEDULER_LOCATION: ${{ vars.CLOUD_SCHEDULER_LOCATION }}
steps:
- name: Validate inputs
run: |
Expand Down Expand Up @@ -82,20 +83,127 @@ jobs:
raw_path="/${raw_path}"
fi

service_url="$(
service_json="$(
gcloud run services describe "${CLOUD_RUN_SERVICE}" \
--region "${CLOUD_RUN_REGION}" \
--format='value(status.url)'
--format=json
)"
service_url="$(SERVICE_JSON="${service_json}" python3 - <<'PY'
import json
import os

service = json.loads(os.environ["SERVICE_JSON"])
print((service.get("status") or {}).get("url") or "")
PY
)"
if [ -z "${service_url}" ]; then
echo "Unable to resolve Cloud Run service URL." >&2
exit 1
fi

service_ingress="$(SERVICE_JSON="${service_json}" python3 - <<'PY'
import json
import os

service = json.loads(os.environ["SERVICE_JSON"])
annotations = (service.get("metadata") or {}).get("annotations") or {}
print(annotations.get("run.googleapis.com/ingress-status") or annotations.get("run.googleapis.com/ingress") or "")
PY
)"
latest_ready_revision="$(SERVICE_JSON="${service_json}" python3 - <<'PY'
import json
import os

service = json.loads(os.environ["SERVICE_JSON"])
print((service.get("status") or {}).get("latestReadyRevisionName") or "")
PY
)"
deployed_commit="$(SERVICE_JSON="${service_json}" python3 - <<'PY'
import json
import os

service = json.loads(os.environ["SERVICE_JSON"])
template = ((service.get("spec") or {}).get("template") or {}).get("metadata") or {}
print((template.get("labels") or {}).get("commit-sha") or "")
PY
)"

invoke_method="direct"
scheduler_job=""
scheduler_location=""
if [ "${service_ingress}" = "internal" ]; then
scheduler_location="${CLOUD_SCHEDULER_LOCATION:-${CLOUD_RUN_REGION}}"
case "${raw_path}" in
/)
scheduler_job="${CLOUD_RUN_SERVICE}-scheduler"
;;
/probe)
scheduler_job="${CLOUD_RUN_SERVICE}-probe-scheduler"
;;
/precheck)
scheduler_job="${CLOUD_RUN_SERVICE}-precheck-scheduler"
;;
*)
echo "Cloud Run service ${CLOUD_RUN_SERVICE} has internal ingress, so GitHub-hosted runners cannot curl ${raw_path} directly." >&2
echo "Use one of the scheduler-backed paths: /, /probe, /precheck." >&2
exit 1
;;
esac

scheduler_uri="$(
gcloud scheduler jobs describe "${scheduler_job}" \
--location="${scheduler_location}" \
--format='value(httpTarget.uri)' 2>/dev/null || true
)"
if [ -z "${scheduler_uri}" ]; then
echo "Cloud Scheduler job ${scheduler_job} was not found in ${scheduler_location}." >&2
exit 1
fi
scheduler_path="$(SCHEDULER_URI="${scheduler_uri}" python3 - <<'PY'
import os
from urllib.parse import urlparse

def normalize(path: str) -> str:
clean = (path or "/").rstrip("/")
return clean or "/"

print(normalize(urlparse(os.environ["SCHEDULER_URI"]).path))
PY
)"
requested_path="$(RAW_PATH="${raw_path}" python3 - <<'PY'
import os

clean = (os.environ["RAW_PATH"] or "/").rstrip("/")
print(clean or "/")
PY
)"
if [ "${scheduler_path}" != "${requested_path}" ]; then
echo "Cloud Scheduler job ${scheduler_job} targets ${scheduler_uri}, not ${raw_path}." >&2
exit 1
fi
invoke_method="scheduler"
fi

echo "Cloud Run service: ${CLOUD_RUN_SERVICE}"
echo "Cloud Run region: ${CLOUD_RUN_REGION}"
echo "Cloud Run URL: ${service_url}"
echo "Cloud Run ingress: ${service_ingress:-<empty>}"
echo "Latest ready revision: ${latest_ready_revision:-<empty>}"
echo "Deployed commit: ${deployed_commit:-<empty>}"
echo "Invoke method: ${invoke_method}"
if [ -n "${scheduler_job}" ]; then
echo "Cloud Scheduler job: ${scheduler_job}"
echo "Cloud Scheduler location: ${scheduler_location}"
fi

echo "url=${service_url}" >> "$GITHUB_OUTPUT"
echo "path=${raw_path}" >> "$GITHUB_OUTPUT"
echo "invoke_method=${invoke_method}" >> "$GITHUB_OUTPUT"
echo "scheduler_job=${scheduler_job}" >> "$GITHUB_OUTPUT"
echo "scheduler_location=${scheduler_location}" >> "$GITHUB_OUTPUT"

- name: Authenticate for service invocation
if: steps.service.outputs.invoke_method == 'direct'
id: invoke-auth
uses: google-github-actions/auth@v3
with:
Expand All @@ -106,10 +214,91 @@ jobs:
id_token_include_email: true

- name: Invoke service
if: steps.service.outputs.invoke_method == 'direct'
run: |
set -euo pipefail

curl --fail-with-body --show-error --silent \
--request POST \
--header "Authorization: Bearer ${{ steps.invoke-auth.outputs.id_token }}" \
"${{ steps.service.outputs.url }}${{ steps.service.outputs.path }}"

- name: Invoke internal service through Cloud Scheduler
if: steps.service.outputs.invoke_method == 'scheduler'
run: |
set -euo pipefail

started_at="$(date -u +%Y-%m-%dT%H:%M:%SZ)"
scheduler_job="${{ steps.service.outputs.scheduler_job }}"
scheduler_location="${{ steps.service.outputs.scheduler_location }}"

echo "Triggering ${scheduler_job} at ${started_at}."
gcloud scheduler jobs run "${scheduler_job}" \
--location="${scheduler_location}" \
--quiet

deadline=$((SECONDS + 180))
while true; do
job_json="$(
gcloud scheduler jobs describe "${scheduler_job}" \
--location="${scheduler_location}" \
--format=json
)"
attempt_seen="$(JOB_JSON="${job_json}" STARTED_AT="${started_at}" python3 - <<'PY'
import datetime as dt
import json
import os

def parse_timestamp(value: str) -> dt.datetime | None:
if not value:
return None
text = value.replace("Z", "+00:00")
return dt.datetime.fromisoformat(text)

job = json.loads(os.environ["JOB_JSON"])
last_attempt = parse_timestamp(job.get("lastAttemptTime") or "")
started_at = parse_timestamp(os.environ["STARTED_AT"])
print("true" if last_attempt and started_at and last_attempt >= started_at else "false")
PY
)"
status_code="$(JOB_JSON="${job_json}" python3 - <<'PY'
import json
import os

status = (json.loads(os.environ["JOB_JSON"]).get("status") or {})
print(status.get("code") or "")
PY
)"
status_message="$(JOB_JSON="${job_json}" python3 - <<'PY'
import json
import os

status = (json.loads(os.environ["JOB_JSON"]).get("status") or {})
print(status.get("message") or "")
PY
)"
last_attempt_time="$(JOB_JSON="${job_json}" python3 - <<'PY'
import json
import os

print(json.loads(os.environ["JOB_JSON"]).get("lastAttemptTime") or "")
PY
)"

if [ "${attempt_seen}" = "true" ]; then
if [ -n "${status_code}" ] && [ "${status_code}" != "0" ]; then
echo "Cloud Scheduler job ${scheduler_job} failed with status ${status_code}: ${status_message}" >&2
exit 1
fi
Comment on lines +288 to +292

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Wait for the scheduler attempt to finish

Cloud Scheduler documents lastAttemptTime as the time an attempt started and status as the target response for the last attempted execution, so for /precheck runs that take more than a moment this branch can fire before the Cloud Run request has completed (or while status is still empty/stale from a previous run). Because an empty status is treated as success and the loop exits, the workflow can pass even if the forced scheduler invocation later returns 500; keep polling until the new attempt has a populated terminal status.

Useful? React with 👍 / 👎.

echo "Cloud Scheduler job ${scheduler_job} ran at ${last_attempt_time}."
break
fi

if [ "${SECONDS}" -ge "${deadline}" ]; then
echo "Timed out waiting for Cloud Scheduler job ${scheduler_job} to record a new attempt." >&2
exit 1
fi

echo "Waiting for Cloud Scheduler job ${scheduler_job} attempt..."
sleep 10
done
2 changes: 2 additions & 0 deletions application/rebalance_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@ def fetch_replanned_state():
strategy_display_name=config.strategy_display_name,
dry_run_only=config.dry_run_only,
extra_notification_lines=config.extra_notification_lines,
title_key=config.notification_title_key or "rebalance_title",
)
)
else:
Expand All @@ -384,6 +385,7 @@ def fetch_replanned_state():
strategy_display_name=config.strategy_display_name,
dry_run_only=config.dry_run_only,
extra_notification_lines=config.extra_notification_lines,
title_key=config.notification_title_key or "heartbeat_title",
)
)
return execution_result
2 changes: 2 additions & 0 deletions application/runtime_composer.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ def build_rebalance_config(
*,
strategy_plugin_signals=(),
strategy_plugin_error: str | None = None,
notification_title_key: str = "",
) -> LongBridgeRebalanceConfig:
market_scope_line = self.translator(
"market_scope_detail",
Expand Down Expand Up @@ -231,6 +232,7 @@ def build_rebalance_config(
safe_haven_cash_substitute_threshold_usd=self.safe_haven_cash_substitute_threshold_usd,
sleeper=self.sleeper,
extra_notification_lines=(market_scope_line, *plugin_lines, *plugin_error_lines),
notification_title_key=notification_title_key,
strategy_plugin_signals=tuple(strategy_plugin_signals or ()),
execution_dedup_enabled=resolve_execution_dedup_enabled(
env_reader=self.env_reader,
Expand Down
1 change: 1 addition & 0 deletions application/runtime_dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class LongBridgeRebalanceConfig:
safe_haven_cash_substitute_threshold_usd: float = 1000.0
sleeper: Callable[[float], None] | None = None
extra_notification_lines: tuple[str, ...] = ()
notification_title_key: str = ""
strategy_plugin_signals: tuple[Any, ...] = ()
execution_dedup_enabled: bool = False
execution_state_store: Any = None
Expand Down
5 changes: 5 additions & 0 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,11 @@ def run_strategy(*, force_run: bool = False, validation_only: bool = False, vali
config=composer.build_rebalance_config(
strategy_plugin_signals=strategy_plugin_signals,
strategy_plugin_error=strategy_plugin_error,
notification_title_key=(
"precheck_title"
if validation_only and validation_label == "precheck"
else ""
),
),
)
signal_snapshot = {}
Expand Down
10 changes: 6 additions & 4 deletions notifications/renderers.py
Original file line number Diff line number Diff line change
Expand Up @@ -506,9 +506,10 @@ def render_rebalance_notification(
strategy_display_name,
dry_run_only,
extra_notification_lines=(),
title_key="rebalance_title",
) -> RenderedNotification:
formatted_logs = "\n".join(f" - {log}" for log in [*logs, *skip_logs, *note_logs])
detailed_lines = [translator("rebalance_title")]
detailed_lines = [translator(title_key or "rebalance_title")]
_append_strategy_line(detailed_lines, strategy_display_name=strategy_display_name, translator=translator)
if dry_run_only:
detailed_lines.append(translator("dry_run_banner"))
Expand All @@ -525,7 +526,7 @@ def render_rebalance_notification(
)
detailed_lines.extend([separator, translator("order_logs_title"), formatted_logs])

compact_lines = [translator("rebalance_title")]
compact_lines = [translator(title_key or "rebalance_title")]
_append_strategy_line(compact_lines, strategy_display_name=strategy_display_name, translator=translator)
if dry_run_only:
compact_lines.append(translator("dry_run_banner"))
Expand Down Expand Up @@ -557,8 +558,9 @@ def render_heartbeat_notification(
strategy_display_name,
dry_run_only,
extra_notification_lines=(),
title_key="heartbeat_title",
) -> RenderedNotification:
detailed_lines = [translator("heartbeat_title")]
detailed_lines = [translator(title_key or "heartbeat_title")]
_append_strategy_line(detailed_lines, strategy_display_name=strategy_display_name, translator=translator)
if dry_run_only:
detailed_lines.append(translator("dry_run_banner"))
Expand Down Expand Up @@ -594,7 +596,7 @@ def render_heartbeat_notification(
+ "\n".join(f" - {log}" for log in note_logs)
)

compact_lines = [translator("heartbeat_title")]
compact_lines = [translator(title_key or "heartbeat_title")]
_append_strategy_line(compact_lines, strategy_display_name=strategy_display_name, translator=translator)
if dry_run_only:
compact_lines.append(translator("dry_run_banner"))
Expand Down
2 changes: 2 additions & 0 deletions notifications/telegram.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
"income_locked": "🏦 收入层锁定占比: {ratio}",
"signal": "🎯 触发信号: {msg}",
"heartbeat_title": "💓 【心跳检测】",
"precheck_title": "🧪 【策略预检】",
"health_probe_title": "🔎 【连接探针】",
"health_probe_error_prefix": "健康探针异常:\n",
"equity": "💰 净值: ${value}",
Expand Down Expand Up @@ -196,6 +197,7 @@
"income_locked": "🏦 Income Locked: {ratio}",
"signal": "🎯 Signal: {msg}",
"heartbeat_title": "💓 【Heartbeat】",
"precheck_title": "🧪 【Strategy Precheck】",
"health_probe_title": "🔎 【Health Probe】",
"health_probe_error_prefix": "Health probe error:\n",
"equity": "💰 Equity: ${value}",
Expand Down
7 changes: 7 additions & 0 deletions tests/test_invoke_cloud_run_workflow.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,18 @@ grep -Fq "google-github-actions/auth@v3" "$workflow_file"
grep -Fq "google-github-actions/setup-gcloud@v3" "$workflow_file"
grep -Fq "CLOUD_RUN_REGION: \${{ vars.CLOUD_RUN_REGION }}" "$workflow_file"
grep -Fq "CLOUD_RUN_SERVICE: \${{ vars.CLOUD_RUN_SERVICE }}" "$workflow_file"
grep -Fq "CLOUD_SCHEDULER_LOCATION: \${{ vars.CLOUD_SCHEDULER_LOCATION }}" "$workflow_file"
grep -Fq "longbridge-hk|longbridge-paper|longbridge-sg" "$workflow_file"
grep -Fq "gcloud run services describe \"\${CLOUD_RUN_SERVICE}\"" "$workflow_file"
grep -Fq "Cloud Run service \${CLOUD_RUN_SERVICE} has internal ingress" "$workflow_file"
grep -Fq "Use one of the scheduler-backed paths: /, /probe, /precheck." "$workflow_file"
grep -Fq "scheduler_job=\"\${CLOUD_RUN_SERVICE}-precheck-scheduler\"" "$workflow_file"
grep -Fq "Invoke internal service through Cloud Scheduler" "$workflow_file"
grep -Fq "gcloud scheduler jobs run \"\${scheduler_job}\"" "$workflow_file"
grep -Fq "token_format: id_token" "$workflow_file"
grep -Fq "id_token_audience: \${{ steps.service.outputs.url }}" "$workflow_file"
grep -Fq "id_token_include_email: true" "$workflow_file"
grep -Fq "if: steps.service.outputs.invoke_method == 'direct'" "$workflow_file"
grep -Fq "curl --fail-with-body --show-error --silent" "$workflow_file"
grep -Fq -- "--request POST" "$workflow_file"
grep -Fq "steps.invoke-auth.outputs.id_token" "$workflow_file"
31 changes: 31 additions & 0 deletions tests/test_notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,37 @@ def test_heartbeat_signal_snapshot_localizes_price_source(self):
self.assertIn("📊 市场状态: 🚀 风险开启(SOXX+SOXL)", rendered.compact_text)
self.assertNotIn("longbridge_candlesticks", rendered.compact_text)

def test_precheck_heartbeat_uses_precheck_title(self):
rendered = render_heartbeat_notification(
execution={
"signal_display": "🚀 入场信号 | 原因:QQQ 高于 MA200",
},
skip_logs=(),
note_logs=(),
translator=build_translator("zh"),
separator="━━━━━━━━━━━━━━━━━━",
strategy_display_name="TQQQ 增长收益",
dry_run_only=True,
title_key="precheck_title",
)
en_rendered = render_heartbeat_notification(
execution={
"signal_display": "Entry signal | reason: QQQ is above MA200",
},
skip_logs=(),
note_logs=(),
translator=build_translator("en"),
separator="━━━━━━━━━━━━━━━━━━",
strategy_display_name="TQQQ Growth Income",
dry_run_only=True,
title_key="precheck_title",
)

self.assertIn("🧪 【策略预检】", rendered.compact_text)
self.assertNotIn("💓 【心跳检测】", rendered.compact_text)
self.assertIn("🧪 【Strategy Precheck】", en_rendered.compact_text)
self.assertNotIn("💓 【Heartbeat】", en_rendered.compact_text)

def test_heartbeat_renders_tqqq_volatility_delever_risk_control(self):
zh_rendered = render_heartbeat_notification(
execution={
Expand Down
Loading