From 0d56300f1d7bf3ce070b96070e51beee9e06d13b Mon Sep 17 00:00:00 2001 From: lifujie25 Date: Wed, 25 Mar 2026 09:40:33 +0800 Subject: [PATCH 01/11] fix(webui): sync TemplateResponse fix and add multi-arch DockerHub publish --- .github/upstream-steelydk.digest | 1 + .github/workflows/docker-publish.yml | 108 ++++++++++++++++++++------- README.md | 20 ++++- src/web/app.py | 20 ++--- 4 files changed, 114 insertions(+), 35 deletions(-) create mode 100644 .github/upstream-steelydk.digest diff --git a/.github/upstream-steelydk.digest b/.github/upstream-steelydk.digest new file mode 100644 index 00000000..86e43146 --- /dev/null +++ b/.github/upstream-steelydk.digest @@ -0,0 +1 @@ +sha256:661c6386fd75a2499db0f3e75db0d8e60d62aa789c09602122cc36574390a03e diff --git a/.github/workflows/docker-publish.yml b/.github/workflows/docker-publish.yml index 874d40b5..2768b97c 100644 --- a/.github/workflows/docker-publish.yml +++ b/.github/workflows/docker-publish.yml @@ -1,53 +1,110 @@ -name: Docker Image CI +name: Docker Publish (Multi-Arch) on: push: - branches: [ "master", "main" ] - # 当发布新版本时触发 - tags: [ 'v*.*.*' ] + branches: ["master", "main"] + tags: ["v*.*.*"] pull_request: - branches: [ "master", "main" ] + branches: ["master", "main"] + workflow_dispatch: + schedule: + - cron: "0 */6 * * *" env: - # GitHub Container Registry 的地址 - REGISTRY: ghcr.io - # 镜像名称,默认为 GitHub 用户名/仓库名 - IMAGE_NAME: ${{ github.repository }} + IMAGE_NAME: lifj25/codex-console + UPSTREAM_IMAGE: steelydk/codex-console:latest + UPSTREAM_DIGEST_FILE: .github/upstream-steelydk.digest jobs: + prepare: + runs-on: ubuntu-latest + permissions: + contents: write + outputs: + should_build: ${{ steps.decide.outputs.should_build }} + upstream_digest: ${{ steps.digest.outputs.upstream_digest }} + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Read upstream image digest + id: digest + run: | + set -euo pipefail + digest="$(docker buildx imagetools inspect "${UPSTREAM_IMAGE}" | awk '/^Digest:/ {print $2; exit}')" + echo "upstream_digest=${digest}" >> "$GITHUB_OUTPUT" + + - name: Decide whether to build + id: decide + run: | + set -euo pipefail + if [ "${{ github.event_name }}" != "schedule" ]; then + echo "should_build=true" >> "$GITHUB_OUTPUT" + exit 0 + fi + + last_digest="" + if [ -f "${UPSTREAM_DIGEST_FILE}" ]; then + last_digest="$(tr -d '\r\n' < "${UPSTREAM_DIGEST_FILE}")" + fi + + if [ "${last_digest}" = "${{ steps.digest.outputs.upstream_digest }}" ]; then + echo "should_build=false" >> "$GITHUB_OUTPUT" + else + echo "should_build=true" >> "$GITHUB_OUTPUT" + fi + + - name: Update tracked upstream digest + if: github.event_name == 'schedule' && steps.decide.outputs.should_build == 'true' + run: | + set -euo pipefail + echo "${{ steps.digest.outputs.upstream_digest }}" > "${UPSTREAM_DIGEST_FILE}" + + - name: Commit updated digest file + if: github.event_name == 'schedule' && steps.decide.outputs.should_build == 'true' + run: | + set -euo pipefail + git config user.name "github-actions[bot]" + git config user.email "github-actions[bot]@users.noreply.github.com" + git add "${UPSTREAM_DIGEST_FILE}" + if git diff --cached --quiet; then + exit 0 + fi + git commit -m "chore: update upstream digest [skip ci]" + git push + build-and-push-image: runs-on: ubuntu-latest + needs: prepare + if: needs.prepare.outputs.should_build == 'true' permissions: contents: read - packages: write - # 如果需要签名生成的镜像,可以使用 id-token: write - steps: - name: Checkout repository uses: actions/checkout@v4 - # 设置 Docker Buildx 用于构建多平台镜像 (可选) + - name: Set up QEMU + uses: docker/setup-qemu-action@v3 + - name: Set up Docker Buildx uses: docker/setup-buildx-action@v3 - # 登录到 Docker 镜像仓库 - # 如果只是在 PR 中测试构建,则跳过登录 - - name: Log in to the Container registry + - name: Log in to Docker Hub if: github.event_name != 'pull_request' uses: docker/login-action@v3 with: - registry: ${{ env.REGISTRY }} - username: ${{ github.actor }} - password: ${{ secrets.GITHUB_TOKEN }} + username: ${{ secrets.DOCKERHUB_USERNAME }} + password: ${{ secrets.DOCKERHUB_TOKEN }} - # 提取 Docker 镜像的元数据(标签、注释等) - name: Extract metadata (tags, labels) for Docker id: meta uses: docker/metadata-action@v5 with: - images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} + images: docker.io/${{ env.IMAGE_NAME }} tags: | - type=schedule type=ref,event=branch type=ref,event=pr type=semver,pattern={{version}} @@ -56,13 +113,14 @@ jobs: type=sha type=raw,value=latest,enable={{is_default_branch}} - # 构建并推送 Docker 镜像 - - name: Build and push Docker image + - name: Build and push Docker image (linux/amd64,linux/arm64) uses: docker/build-push-action@v5 with: context: . + platforms: linux/amd64,linux/arm64 push: ${{ github.event_name != 'pull_request' }} + provenance: false tags: ${{ steps.meta.outputs.tags }} labels: ${{ steps.meta.outputs.labels }} cache-from: type=gha - cache-to: type=gha,mode=max \ No newline at end of file + cache-to: type=gha,mode=max diff --git a/README.md b/README.md index e99b6d02..f4cdd052 100644 --- a/README.md +++ b/README.md @@ -141,7 +141,7 @@ docker run -d \ -e WEBUI_ACCESS_PASSWORD=your_secure_password \ -v $(pwd)/data:/app/data \ --name codex-console \ - ghcr.io//codex-console:latest + lifj25/codex-console:latest ``` 说明: @@ -156,6 +156,24 @@ docker run -d \ `-v $(pwd)/data:/app/data` 很重要,这会把数据库和账号数据持久化到宿主机。否则容器一重启,数据也可能跟着表演消失术。 +### Docker 镜像自动发布(amd64 + arm64) + +仓库内置了 GitHub Actions 工作流,会在 `main/master` 推送时自动构建并发布多架构镜像到 Docker Hub: + +- `linux/amd64` +- `linux/arm64` + +工作流文件: + +`/.github/workflows/docker-publish.yml` + +需要在 GitHub 仓库 Secrets 中配置: + +- `DOCKERHUB_USERNAME` +- `DOCKERHUB_TOKEN` + +另外工作流每 6 小时会检查一次 `steelydk/codex-console:latest` 的 digest,检测到变更时自动触发重建并发布。 + ## 使用远程 PostgreSQL ```bash diff --git a/src/web/app.py b/src/web/app.py index 09fb509e..c0c16a13 100644 --- a/src/web/app.py +++ b/src/web/app.py @@ -108,8 +108,9 @@ def _redirect_to_login(request: Request) -> RedirectResponse: async def login_page(request: Request, next: Optional[str] = "/"): """登录页面""" return templates.TemplateResponse( - "login.html", - {"request": request, "error": "", "next": next or "/"} + request=request, + name="login.html", + context={"request": request, "error": "", "next": next or "/"} ) @app.post("/login") @@ -118,8 +119,9 @@ async def login_submit(request: Request, password: str = Form(...), next: Option expected = get_settings().webui_access_password.get_secret_value() if not secrets.compare_digest(password, expected): return templates.TemplateResponse( - "login.html", - {"request": request, "error": "密码错误", "next": next or "/"}, + request=request, + name="login.html", + context={"request": request, "error": "密码错误", "next": next or "/"}, status_code=401 ) @@ -139,33 +141,33 @@ async def index(request: Request): """首页 - 注册页面""" if not _is_authenticated(request): return _redirect_to_login(request) - return templates.TemplateResponse("index.html", {"request": request}) + return templates.TemplateResponse(request=request, name="index.html", context={"request": request}) @app.get("/accounts", response_class=HTMLResponse) async def accounts_page(request: Request): """账号管理页面""" if not _is_authenticated(request): return _redirect_to_login(request) - return templates.TemplateResponse("accounts.html", {"request": request}) + return templates.TemplateResponse(request=request, name="accounts.html", context={"request": request}) @app.get("/email-services", response_class=HTMLResponse) async def email_services_page(request: Request): """邮箱服务管理页面""" if not _is_authenticated(request): return _redirect_to_login(request) - return templates.TemplateResponse("email_services.html", {"request": request}) + return templates.TemplateResponse(request=request, name="email_services.html", context={"request": request}) @app.get("/settings", response_class=HTMLResponse) async def settings_page(request: Request): """设置页面""" if not _is_authenticated(request): return _redirect_to_login(request) - return templates.TemplateResponse("settings.html", {"request": request}) + return templates.TemplateResponse(request=request, name="settings.html", context={"request": request}) @app.get("/payment", response_class=HTMLResponse) async def payment_page(request: Request): """支付页面""" - return templates.TemplateResponse("payment.html", {"request": request}) + return templates.TemplateResponse(request=request, name="payment.html", context={"request": request}) @app.on_event("startup") async def startup_event(): From e10aa9c1c2b6d79c3a0542a5a1c5c193d984e184 Mon Sep 17 00:00:00 2001 From: lifujie25 Date: Wed, 25 Mar 2026 09:43:19 +0800 Subject: [PATCH 02/11] chore(ci): remove steelydk digest tracking --- .github/upstream-steelydk.digest | 1 - .github/workflows/docker-publish.yml | 66 ---------------------------- README.md | 2 - 3 files changed, 69 deletions(-) delete mode 100644 .github/upstream-steelydk.digest diff --git a/.github/upstream-steelydk.digest b/.github/upstream-steelydk.digest deleted file mode 100644 index 86e43146..00000000 --- a/.github/upstream-steelydk.digest +++ /dev/null @@ -1 +0,0 @@ -sha256:661c6386fd75a2499db0f3e75db0d8e60d62aa789c09602122cc36574390a03e diff --git a/.github/workflows/docker-publish.yml b/.github/workflows/docker-publish.yml index 2768b97c..05c7e08f 100644 --- a/.github/workflows/docker-publish.yml +++ b/.github/workflows/docker-publish.yml @@ -7,79 +7,13 @@ on: pull_request: branches: ["master", "main"] workflow_dispatch: - schedule: - - cron: "0 */6 * * *" env: IMAGE_NAME: lifj25/codex-console - UPSTREAM_IMAGE: steelydk/codex-console:latest - UPSTREAM_DIGEST_FILE: .github/upstream-steelydk.digest jobs: - prepare: - runs-on: ubuntu-latest - permissions: - contents: write - outputs: - should_build: ${{ steps.decide.outputs.should_build }} - upstream_digest: ${{ steps.digest.outputs.upstream_digest }} - steps: - - name: Checkout repository - uses: actions/checkout@v4 - - - name: Set up Docker Buildx - uses: docker/setup-buildx-action@v3 - - - name: Read upstream image digest - id: digest - run: | - set -euo pipefail - digest="$(docker buildx imagetools inspect "${UPSTREAM_IMAGE}" | awk '/^Digest:/ {print $2; exit}')" - echo "upstream_digest=${digest}" >> "$GITHUB_OUTPUT" - - - name: Decide whether to build - id: decide - run: | - set -euo pipefail - if [ "${{ github.event_name }}" != "schedule" ]; then - echo "should_build=true" >> "$GITHUB_OUTPUT" - exit 0 - fi - - last_digest="" - if [ -f "${UPSTREAM_DIGEST_FILE}" ]; then - last_digest="$(tr -d '\r\n' < "${UPSTREAM_DIGEST_FILE}")" - fi - - if [ "${last_digest}" = "${{ steps.digest.outputs.upstream_digest }}" ]; then - echo "should_build=false" >> "$GITHUB_OUTPUT" - else - echo "should_build=true" >> "$GITHUB_OUTPUT" - fi - - - name: Update tracked upstream digest - if: github.event_name == 'schedule' && steps.decide.outputs.should_build == 'true' - run: | - set -euo pipefail - echo "${{ steps.digest.outputs.upstream_digest }}" > "${UPSTREAM_DIGEST_FILE}" - - - name: Commit updated digest file - if: github.event_name == 'schedule' && steps.decide.outputs.should_build == 'true' - run: | - set -euo pipefail - git config user.name "github-actions[bot]" - git config user.email "github-actions[bot]@users.noreply.github.com" - git add "${UPSTREAM_DIGEST_FILE}" - if git diff --cached --quiet; then - exit 0 - fi - git commit -m "chore: update upstream digest [skip ci]" - git push - build-and-push-image: runs-on: ubuntu-latest - needs: prepare - if: needs.prepare.outputs.should_build == 'true' permissions: contents: read steps: diff --git a/README.md b/README.md index f4cdd052..ee93047d 100644 --- a/README.md +++ b/README.md @@ -172,8 +172,6 @@ docker run -d \ - `DOCKERHUB_USERNAME` - `DOCKERHUB_TOKEN` -另外工作流每 6 小时会检查一次 `steelydk/codex-console:latest` 的 digest,检测到变更时自动触发重建并发布。 - ## 使用远程 PostgreSQL ```bash From 34fd4d6f68b22ad72fb474df164a81b4222cfe96 Mon Sep 17 00:00:00 2001 From: lifujie25 Date: Wed, 25 Mar 2026 14:54:40 +0800 Subject: [PATCH 03/11] feat(schedule): persist cron scheduler with TZ support and default batch settings --- Dockerfile | 2 +- docker-compose.yml | 3 +- src/config/settings.py | 34 +- src/web/app.py | 25 + src/web/routes/registration.py | 883 +++++++++++++++++++++++++++++---- static/js/app.js | 194 +++++++- static/js/settings.js | 230 ++++++++- templates/index.html | 24 +- templates/settings.html | 119 +++++ 9 files changed, 1376 insertions(+), 138 deletions(-) diff --git a/Dockerfile b/Dockerfile index db633977..6561b69e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -7,7 +7,7 @@ WORKDIR /app # 设置环境变量 ENV PYTHONDONTWRITEBYTECODE=1 \ PYTHONUNBUFFERED=1 \ - # WebUI 默认配置 + TZ=Asia/Shanghai \ WEBUI_HOST=0.0.0.0 \ WEBUI_PORT=1455 \ LOG_LEVEL=info \ diff --git a/docker-compose.yml b/docker-compose.yml index e579408f..829e2810 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -10,10 +10,11 @@ services: - WEBUI_PORT=1455 - DEBUG=0 - LOG_LEVEL=info + - TZ=${TZ:-Asia/Shanghai} # 如果需要访问密码,可以在这里取消注释并设置 - WEBUI_ACCESS_PASSWORD=admin123 volumes: # 挂载数据目录以持久化数据库和日志 - ./data:/app/data - ./logs:/app/logs - restart: unless-stopped \ No newline at end of file + restart: unless-stopped diff --git a/src/config/settings.py b/src/config/settings.py index 2803afd5..6c53f6b4 100644 --- a/src/config/settings.py +++ b/src/config/settings.py @@ -6,7 +6,7 @@ import os from typing import Optional, Dict, Any, Type, List from enum import Enum -from pydantic import BaseModel, field_validator +from pydantic import BaseModel, Field, field_validator from pydantic.types import SecretStr from dataclasses import dataclass @@ -248,6 +248,30 @@ class SettingDefinition: category=SettingCategory.REGISTRATION, description="注册间隔最大值(秒)" ), + "registration_schedule_enabled": SettingDefinition( + db_key="registration.schedule_enabled", + default_value=False, + category=SettingCategory.REGISTRATION, + description="是否启用定时注册" + ), + "registration_schedule_interval_minutes": SettingDefinition( + db_key="registration.schedule_interval_minutes", + default_value=30, + category=SettingCategory.REGISTRATION, + description="定时注册触发间隔(分钟)" + ), + "registration_schedule_cron": SettingDefinition( + db_key="registration.schedule_cron", + default_value="*/30 * * * *", + category=SettingCategory.REGISTRATION, + description="定时注册 Cron(5位)" + ), + "registration_schedule_payload": SettingDefinition( + db_key="registration.schedule_payload", + default_value={}, + category=SettingCategory.REGISTRATION, + description="定时注册请求参数快照" + ), # 邮箱服务配置 "email_service_priority": SettingDefinition( @@ -400,6 +424,10 @@ class SettingDefinition: "registration_default_password_length": int, "registration_sleep_min": int, "registration_sleep_max": int, + "registration_schedule_enabled": bool, + "registration_schedule_interval_minutes": int, + "registration_schedule_cron": str, + "registration_schedule_payload": dict, "email_service_priority": dict, "tempmail_timeout": int, "tempmail_max_retries": int, @@ -663,6 +691,10 @@ def proxy_url(self) -> Optional[str]: registration_default_password_length: int = 12 registration_sleep_min: int = 5 registration_sleep_max: int = 30 + registration_schedule_enabled: bool = False + registration_schedule_interval_minutes: int = 30 + registration_schedule_cron: str = "*/30 * * * *" + registration_schedule_payload: Dict[str, Any] = Field(default_factory=dict) # 邮箱服务配置 email_service_priority: Dict[str, int] = {"tempmail": 0, "outlook": 1, "moe_mail": 2} diff --git a/src/web/app.py b/src/web/app.py index c0c16a13..154d2c53 100644 --- a/src/web/app.py +++ b/src/web/app.py @@ -4,10 +4,12 @@ """ import logging +import os import sys import secrets import hmac import hashlib +import time from typing import Optional from pathlib import Path @@ -46,6 +48,18 @@ def _build_static_asset_version(static_dir: Path) -> str: return str(latest_mtime or 1) +def _ensure_process_timezone() -> str: + """统一进程时区:支持传入 TZ,未传默认 Asia/Shanghai。""" + tz_name = (os.environ.get("TZ") or "Asia/Shanghai").strip() or "Asia/Shanghai" + os.environ["TZ"] = tz_name + if hasattr(time, "tzset"): + try: + time.tzset() + except Exception as e: + logger.warning(f"设置时区失败(TZ={tz_name}): {e}") + return tz_name + + def create_app() -> FastAPI: """创建 FastAPI 应用实例""" settings = get_settings() @@ -174,6 +188,10 @@ async def startup_event(): """应用启动事件""" import asyncio from ..database.init_db import initialize_database + from .routes.registration import restore_scheduled_registration_from_settings + + # 统一时区(容器可通过 TZ 传入,不传默认 Asia/Shanghai) + tz_name = _ensure_process_timezone() # 确保数据库已初始化(reload 模式下子进程也需要初始化) try: @@ -185,9 +203,16 @@ async def startup_event(): loop = asyncio.get_event_loop() task_manager.set_loop(loop) + # 从持久化设置恢复定时注册任务 + try: + await restore_scheduled_registration_from_settings() + except Exception as e: + logger.warning(f"恢复定时注册失败: {e}") + logger.info("=" * 50) logger.info(f"{settings.app_name} v{settings.app_version} 启动中,程序正在伸懒腰...") logger.info(f"调试模式: {settings.debug}") + logger.info(f"运行时区: {tz_name}") logger.info(f"数据库连接已接好线: {settings.database_url}") logger.info("=" * 50) diff --git a/src/web/routes/registration.py b/src/web/routes/registration.py index daa92e5e..f19c6bc3 100644 --- a/src/web/routes/registration.py +++ b/src/web/routes/registration.py @@ -4,10 +4,13 @@ import asyncio import logging +import os import uuid import random -from datetime import datetime -from typing import List, Optional, Dict, Tuple +import threading +from datetime import datetime, timedelta, timezone +from typing import List, Optional, Dict, Tuple, Any +from zoneinfo import ZoneInfo, ZoneInfoNotFoundError from fastapi import APIRouter, HTTPException, Query, BackgroundTasks from pydantic import BaseModel, Field @@ -28,6 +31,26 @@ # 批量任务存储 batch_tasks: Dict[str, dict] = {} +# 定时注册任务存储(内存态,重启后清空) +scheduled_registration_lock = threading.Lock() +scheduled_registration: Dict[str, Any] = { + "enabled": False, + "schedule_cron": "*/30 * * * *", + "timezone": "Asia/Shanghai", + "registration_mode": "batch", # single | batch + "payload": None, # 保留请求快照 + "next_run_at": None, + "last_run_at": None, + "last_error": None, + "total_runs": 0, + "success_runs": 0, + "failed_runs": 0, + "skipped_runs": 0, + "active_task_uuid": None, + "active_batch_id": None, +} +scheduled_registration_runner: Optional[asyncio.Task] = None + # ============== Proxy Helper Functions ============== @@ -81,7 +104,7 @@ class RegistrationTaskCreate(BaseModel): class BatchRegistrationRequest(BaseModel): """批量注册请求""" - count: int = 1 + count: int = 100 email_service_type: str = "tempmail" proxy: Optional[str] = None email_service_config: Optional[dict] = None @@ -175,6 +198,47 @@ class OutlookBatchRegistrationResponse(BaseModel): service_ids: List[int] # 实际要注册的服务 ID +class ScheduledRegistrationRequest(BaseModel): + """定时注册请求""" + schedule_cron: str = "*/30 * * * *" # 5 位 Cron: 分 时 日 月 周 + schedule_interval_minutes: Optional[int] = Field(default=None, ge=1, le=1440) # 兼容旧参数 + run_immediately: bool = True + registration_mode: str = "batch" # single | batch + email_service_type: str = "tempmail" + proxy: Optional[str] = None + email_service_config: Optional[dict] = None + email_service_id: Optional[int] = None + count: int = 100 + interval_min: int = 5 + interval_max: int = 30 + concurrency: int = 1 + mode: str = "pipeline" + auto_upload_cpa: bool = False + cpa_service_ids: List[int] = [] + auto_upload_sub2api: bool = False + sub2api_service_ids: List[int] = [] + auto_upload_tm: bool = False + tm_service_ids: List[int] = [] + + +class ScheduledRegistrationStatusResponse(BaseModel): + """定时注册状态响应""" + enabled: bool + schedule_cron: str + timezone: str + registration_mode: str + payload: Dict[str, Any] = Field(default_factory=dict) + next_run_at: Optional[str] = None + last_run_at: Optional[str] = None + last_error: Optional[str] = None + total_runs: int = 0 + success_runs: int = 0 + failed_runs: int = 0 + skipped_runs: int = 0 + active_task_uuid: Optional[str] = None + active_batch_id: Optional[str] = None + + # ============== Helper Functions ============== def task_to_response(task: RegistrationTask) -> RegistrationTaskResponse: @@ -221,6 +285,592 @@ def _normalize_email_service_config( return normalized +def _get_schedule_timezone_name() -> str: + """获取定时任务使用的时区名称:优先环境变量 TZ,否则 Asia/Shanghai。""" + tz_name = (os.environ.get("TZ") or "Asia/Shanghai").strip() + return tz_name or "Asia/Shanghai" + + +def _get_schedule_timezone(): + """获取时区对象。""" + tz_name = _get_schedule_timezone_name() + try: + return ZoneInfo(tz_name) + except ZoneInfoNotFoundError: + logger.warning(f"无效 TZ={tz_name},将回退到 Asia/Shanghai") + try: + return ZoneInfo("Asia/Shanghai") + except ZoneInfoNotFoundError: + # 某些精简镜像不包含 tzdata,兜底为固定东八区,确保与默认时区一致 + logger.warning("系统缺少 Asia/Shanghai 时区数据,回退到固定 +08:00") + return timezone(timedelta(hours=8), name="Asia/Shanghai") + + +def _now_in_schedule_tz() -> datetime: + """获取当前调度时区时间。""" + return datetime.now(_get_schedule_timezone()) + + +def _dt_to_iso(value: Optional[datetime]) -> Optional[str]: + """将时间转换为 ISO 字符串。""" + if not value: + return None + if value.tzinfo is None: + value = value.replace(tzinfo=_get_schedule_timezone()) + return value.isoformat() + + +def _legacy_interval_to_cron(interval_minutes: int) -> str: + """将旧版 interval(分钟)转换为 5 位 Cron。""" + if interval_minutes < 1 or interval_minutes > 1440: + raise ValueError("旧版 interval 必须在 1-1440 分钟之间") + + if interval_minutes <= 59: + return f"*/{interval_minutes} * * * *" + if interval_minutes == 60: + return "0 * * * *" + if interval_minutes % 60 == 0: + hours = interval_minutes // 60 + if 1 <= hours <= 23: + return f"0 */{hours} * * *" + if hours == 24: + return "0 0 * * *" + + raise ValueError("旧版 interval 仅支持 1-59、整小时或 24 小时,请改用 5 位 Cron 表达式") + + +def _cron_to_legacy_interval_minutes(cron_expr: str) -> int: + """尽力将 Cron 回写到旧字段(仅用于兼容)。""" + parts = " ".join((cron_expr or "").split()).split(" ") + if len(parts) != 5: + return 30 + + minute, hour, day, month, weekday = parts + if hour == "*" and day == "*" and month == "*" and weekday == "*" and minute.startswith("*/"): + try: + val = int(minute[2:]) + if 1 <= val <= 59: + return val + except ValueError: + return 30 + + if minute == "0" and hour == "*" and day == "*" and month == "*" and weekday == "*": + return 60 + + if minute == "0" and day == "*" and month == "*" and weekday == "*" and hour.startswith("*/"): + try: + val = int(hour[2:]) + if 1 <= val <= 23: + return val * 60 + except ValueError: + return 30 + + if minute == "0" and hour == "0" and day == "*" and month == "*" and weekday == "*": + return 1440 + + return 30 + + +def _parse_cron_field(field: str, min_value: int, max_value: int, field_name: str, is_weekday: bool = False) -> set: + """解析单个 Cron 字段,支持 *, */n, a, a-b, a-b/n, a,b,c。""" + values = set() + tokens = [token.strip() for token in field.split(",") if token.strip()] + if not tokens: + raise ValueError(f"{field_name} 字段不能为空") + + for token in tokens: + step = 1 + base = token + if "/" in token: + base, step_str = token.split("/", 1) + if not step_str.isdigit(): + raise ValueError(f"{field_name} 字段步长无效: {token}") + step = int(step_str) + if step <= 0: + raise ValueError(f"{field_name} 字段步长必须大于 0: {token}") + + def _normalize(v: int) -> int: + if is_weekday and v == 7: + return 0 + return v + + def _validate(v: int) -> int: + nv = _normalize(v) + upper = 6 if is_weekday else max_value + lower = min_value + if nv < lower or nv > upper: + raise ValueError(f"{field_name} 字段超出范围 {min_value}-{max_value}: {token}") + return nv + + if base in ("*", ""): + for raw in range(min_value, max_value + 1, step): + values.add(_normalize(raw)) + continue + + if "-" in base: + start_str, end_str = base.split("-", 1) + if not start_str.isdigit() or not end_str.isdigit(): + raise ValueError(f"{field_name} 字段范围无效: {token}") + start = int(start_str) + end = int(end_str) + if start > end: + raise ValueError(f"{field_name} 字段范围无效: {token}") + for raw in range(start, end + 1, step): + values.add(_validate(raw)) + continue + + if not base.isdigit(): + raise ValueError(f"{field_name} 字段值无效: {token}") + + start = int(base) + if step == 1: + values.add(_validate(start)) + else: + for raw in range(start, max_value + 1, step): + values.add(_validate(raw)) + + return values + + +def _parse_cron_expression(cron_expr: str) -> Tuple[str, Dict[str, Any]]: + """解析 5 位 Cron 表达式。""" + normalized = " ".join((cron_expr or "").split()) + parts = normalized.split(" ") + if len(parts) != 5: + raise ValueError("Cron 必须是 5 位(分 时 日 月 周)") + + minute_field, hour_field, day_field, month_field, weekday_field = parts + parsed = { + "minute": _parse_cron_field(minute_field, 0, 59, "分钟"), + "hour": _parse_cron_field(hour_field, 0, 23, "小时"), + "day": _parse_cron_field(day_field, 1, 31, "日期"), + "month": _parse_cron_field(month_field, 1, 12, "月份"), + "weekday": _parse_cron_field(weekday_field, 0, 7, "星期", is_weekday=True), + "day_any": day_field == "*", + "weekday_any": weekday_field == "*", + } + return normalized, parsed + + +def _validate_schedule_cron_or_raise(cron_expr: str) -> str: + """校验 Cron 并返回规范化结果。""" + try: + normalized, _ = _parse_cron_expression(cron_expr) + return normalized + except ValueError as e: + raise HTTPException(status_code=400, detail=f"Cron 表达式无效: {e}") + + +def _cron_day_matches(dt: datetime, parsed: Dict[str, Any]) -> bool: + """Cron 的 day/month/week 匹配规则(与标准 cron 一致)。""" + day_match = dt.day in parsed["day"] + weekday_now = (dt.weekday() + 1) % 7 # Python: Mon=0, Cron: Sun=0 + weekday_match = weekday_now in parsed["weekday"] + + if parsed["day_any"] and parsed["weekday_any"]: + return True + if parsed["day_any"]: + return weekday_match + if parsed["weekday_any"]: + return day_match + return day_match or weekday_match + + +def _cron_matches(dt: datetime, parsed: Dict[str, Any]) -> bool: + return ( + dt.minute in parsed["minute"] + and dt.hour in parsed["hour"] + and dt.month in parsed["month"] + and _cron_day_matches(dt, parsed) + ) + + +def _get_next_run_from_cron(cron_expr: str, after_dt: Optional[datetime] = None) -> datetime: + """计算下一次触发时间(调度时区)。""" + normalized, parsed = _parse_cron_expression(cron_expr) + tz = _get_schedule_timezone() + now = (after_dt or _now_in_schedule_tz()).astimezone(tz) + probe = now.replace(second=0, microsecond=0) + timedelta(minutes=1) + + # 最多搜索 2 年,足够覆盖常见 Cron 场景 + for _ in range(2 * 366 * 24 * 60): + if _cron_matches(probe, parsed): + return probe + probe += timedelta(minutes=1) + + raise RuntimeError(f"无法计算下一次触发时间,请检查 Cron: {normalized}") + + +def _get_scheduled_registration_status() -> ScheduledRegistrationStatusResponse: + """获取定时注册状态快照。""" + with scheduled_registration_lock: + state = scheduled_registration.copy() + + return ScheduledRegistrationStatusResponse( + enabled=bool(state.get("enabled")), + schedule_cron=state.get("schedule_cron") or "*/30 * * * *", + timezone=state.get("timezone") or _get_schedule_timezone_name(), + registration_mode=state.get("registration_mode") or "batch", + payload=state.get("payload") or {}, + next_run_at=_dt_to_iso(state.get("next_run_at")), + last_run_at=_dt_to_iso(state.get("last_run_at")), + last_error=state.get("last_error"), + total_runs=int(state.get("total_runs") or 0), + success_runs=int(state.get("success_runs") or 0), + failed_runs=int(state.get("failed_runs") or 0), + skipped_runs=int(state.get("skipped_runs") or 0), + active_task_uuid=state.get("active_task_uuid"), + active_batch_id=state.get("active_batch_id"), + ) + + +def _save_scheduled_registration_settings(enabled: bool, schedule_cron: str, payload: Optional[dict] = None): + """将定时注册配置持久化到 settings。""" + from ...config.settings import update_settings + + cron_normalized = _validate_schedule_cron_or_raise(schedule_cron) + update_settings( + registration_schedule_enabled=enabled, + registration_schedule_interval_minutes=_cron_to_legacy_interval_minutes(cron_normalized), + registration_schedule_cron=cron_normalized, + registration_schedule_payload=payload or {}, + ) + + +async def restore_scheduled_registration_from_settings(): + """应用启动时从 settings 恢复定时注册配置。""" + global scheduled_registration_runner + settings = get_settings() + tz_name = _get_schedule_timezone_name() + + enabled = bool(settings.registration_schedule_enabled) + saved_cron = (getattr(settings, "registration_schedule_cron", "") or "").strip() + legacy_interval = int(settings.registration_schedule_interval_minutes or 30) + raw_payload = settings.registration_schedule_payload or {} + + fallback_cron = saved_cron or _legacy_interval_to_cron(legacy_interval) + payload: dict = {} + schedule_cron = fallback_cron + + if raw_payload: + try: + normalized_payload = dict(raw_payload) + if not normalized_payload.get("schedule_cron"): + legacy = normalized_payload.get("schedule_interval_minutes") + if legacy is not None: + normalized_payload["schedule_cron"] = _legacy_interval_to_cron(int(legacy)) + else: + normalized_payload["schedule_cron"] = fallback_cron + request_payload = ScheduledRegistrationRequest(**normalized_payload) + payload = request_payload.model_dump(exclude_none=True) + schedule_cron = payload.get("schedule_cron", fallback_cron) + except Exception as e: + logger.warning(f"定时注册配置恢复失败,将自动禁用: {e}") + enabled = False + payload = {} + try: + _save_scheduled_registration_settings(False, fallback_cron, {}) + except Exception: + pass + elif enabled: + logger.warning("定时注册已启用但缺少有效配置,已自动禁用") + enabled = False + try: + _save_scheduled_registration_settings(False, fallback_cron, {}) + except Exception: + pass + + try: + schedule_cron = _validate_schedule_cron_or_raise(schedule_cron) + except HTTPException as e: + logger.warning(f"定时注册 Cron 无效,将自动禁用: {e.detail}") + enabled = False + payload = {} + schedule_cron = "*/30 * * * *" + try: + _save_scheduled_registration_settings(False, schedule_cron, {}) + except Exception: + pass + now = _now_in_schedule_tz() + with scheduled_registration_lock: + scheduled_registration["enabled"] = enabled and bool(payload) + scheduled_registration["schedule_cron"] = schedule_cron + scheduled_registration["timezone"] = tz_name + scheduled_registration["registration_mode"] = payload.get("registration_mode", "batch") if payload else "batch" + scheduled_registration["payload"] = payload + scheduled_registration["next_run_at"] = _get_next_run_from_cron(schedule_cron, now) if (enabled and payload) else None + scheduled_registration["last_run_at"] = None + scheduled_registration["last_error"] = None + scheduled_registration["total_runs"] = 0 + scheduled_registration["success_runs"] = 0 + scheduled_registration["failed_runs"] = 0 + scheduled_registration["skipped_runs"] = 0 + scheduled_registration["active_task_uuid"] = None + scheduled_registration["active_batch_id"] = None + + if scheduled_registration["enabled"]: + if not scheduled_registration_runner or scheduled_registration_runner.done(): + scheduled_registration_runner = asyncio.create_task(_scheduled_registration_loop()) + logger.info( + "已从 settings 恢复定时注册:cron=%s, tz=%s, mode=%s", + scheduled_registration["schedule_cron"], + scheduled_registration["timezone"], + scheduled_registration["registration_mode"], + ) + + +def _validate_email_service_type(email_service_type: str): + """验证邮箱服务类型。""" + try: + EmailServiceType(email_service_type) + except ValueError: + raise HTTPException( + status_code=400, + detail=f"无效的邮箱服务类型: {email_service_type}" + ) + + +def _validate_batch_params(count: int, interval_min: int, interval_max: int, concurrency: int, mode: str): + """验证批量参数。""" + if count < 1 or count > 100: + raise HTTPException(status_code=400, detail="注册数量必须在 1-100 之间") + + if interval_min < 0 or interval_max < interval_min: + raise HTTPException(status_code=400, detail="间隔时间参数无效") + + if not 1 <= concurrency <= 50: + raise HTTPException(status_code=400, detail="并发数必须在 1-50 之间") + + if mode not in ("parallel", "pipeline"): + raise HTTPException(status_code=400, detail="模式必须为 parallel 或 pipeline") + + +async def _enqueue_single_registration( + request: RegistrationTaskCreate, + background_tasks: Optional[BackgroundTasks] = None, +) -> RegistrationTaskResponse: + """创建并启动单次注册任务。""" + _validate_email_service_type(request.email_service_type) + + task_uuid = str(uuid.uuid4()) + with get_db() as db: + task = crud.create_registration_task( + db, + task_uuid=task_uuid, + proxy=request.proxy + ) + + args = ( + task_uuid, + request.email_service_type, + request.proxy, + request.email_service_config, + request.email_service_id, + "", + "", + request.auto_upload_cpa, + request.cpa_service_ids, + request.auto_upload_sub2api, + request.sub2api_service_ids, + request.auto_upload_tm, + request.tm_service_ids, + ) + + if background_tasks is not None: + background_tasks.add_task(run_registration_task, *args) + else: + asyncio.create_task(run_registration_task(*args)) + + return task_to_response(task) + + +async def _enqueue_batch_registration( + request: BatchRegistrationRequest, + background_tasks: Optional[BackgroundTasks] = None, +) -> BatchRegistrationResponse: + """创建并启动批量注册任务。""" + _validate_email_service_type(request.email_service_type) + _validate_batch_params( + count=request.count, + interval_min=request.interval_min, + interval_max=request.interval_max, + concurrency=request.concurrency, + mode=request.mode, + ) + + batch_id = str(uuid.uuid4()) + task_uuids = [] + + with get_db() as db: + for _ in range(request.count): + task_uuid = str(uuid.uuid4()) + crud.create_registration_task( + db, + task_uuid=task_uuid, + proxy=request.proxy + ) + task_uuids.append(task_uuid) + + with get_db() as db: + tasks = [crud.get_registration_task(db, task_uuid) for task_uuid in task_uuids] + + args = ( + batch_id, + task_uuids, + request.email_service_type, + request.proxy, + request.email_service_config, + request.email_service_id, + request.interval_min, + request.interval_max, + request.concurrency, + request.mode, + request.auto_upload_cpa, + request.cpa_service_ids, + request.auto_upload_sub2api, + request.sub2api_service_ids, + request.auto_upload_tm, + request.tm_service_ids, + ) + + if background_tasks is not None: + background_tasks.add_task(run_batch_registration, *args) + else: + asyncio.create_task(run_batch_registration(*args)) + + return BatchRegistrationResponse( + batch_id=batch_id, + count=request.count, + tasks=[task_to_response(task) for task in tasks if task] + ) + + +def _has_active_scheduled_job() -> bool: + """检查上一次定时触发的任务是否还在运行。""" + with scheduled_registration_lock: + active_task_uuid = scheduled_registration.get("active_task_uuid") + active_batch_id = scheduled_registration.get("active_batch_id") + + if active_task_uuid: + with get_db() as db: + task = crud.get_registration_task(db, active_task_uuid) + if task and task.status in ("pending", "running"): + return True + + if active_batch_id: + batch = batch_tasks.get(active_batch_id) + if batch and not batch.get("finished", False): + return True + + return False + + +async def _run_scheduled_registration_once(): + """执行一次定时注册触发。""" + with scheduled_registration_lock: + payload = dict(scheduled_registration.get("payload") or {}) + registration_mode = scheduled_registration.get("registration_mode") or "batch" + + if not payload: + raise RuntimeError("定时任务配置为空") + + common_kwargs = { + "email_service_type": payload.get("email_service_type") or "tempmail", + "proxy": payload.get("proxy"), + "email_service_config": payload.get("email_service_config"), + "email_service_id": payload.get("email_service_id"), + "auto_upload_cpa": bool(payload.get("auto_upload_cpa")), + "cpa_service_ids": payload.get("cpa_service_ids") or [], + "auto_upload_sub2api": bool(payload.get("auto_upload_sub2api")), + "sub2api_service_ids": payload.get("sub2api_service_ids") or [], + "auto_upload_tm": bool(payload.get("auto_upload_tm")), + "tm_service_ids": payload.get("tm_service_ids") or [], + } + + if registration_mode == "single": + response = await _enqueue_single_registration(RegistrationTaskCreate(**common_kwargs)) + with scheduled_registration_lock: + scheduled_registration["active_task_uuid"] = response.task_uuid + scheduled_registration["active_batch_id"] = None + logger.info(f"定时注册触发成功(单次任务): {response.task_uuid}") + return + + if registration_mode == "batch": + response = await _enqueue_batch_registration(BatchRegistrationRequest( + **common_kwargs, + count=int(payload.get("count") or 100), + interval_min=int(payload.get("interval_min") or 5), + interval_max=int(payload.get("interval_max") or 30), + concurrency=int(payload.get("concurrency") or 1), + mode=payload.get("mode") or "pipeline", + )) + with scheduled_registration_lock: + scheduled_registration["active_task_uuid"] = None + scheduled_registration["active_batch_id"] = response.batch_id + logger.info(f"定时注册触发成功(批量任务): {response.batch_id}") + return + + raise RuntimeError(f"不支持的定时注册模式: {registration_mode}") + + +async def _scheduled_registration_loop(): + """后台循环:到点触发注册任务。""" + global scheduled_registration_runner + logger.info("定时注册调度器已启动 (tz=%s)", _get_schedule_timezone_name()) + + try: + while True: + await asyncio.sleep(1) + + with scheduled_registration_lock: + enabled = bool(scheduled_registration.get("enabled")) + schedule_cron = scheduled_registration.get("schedule_cron") or "*/30 * * * *" + next_run_at = scheduled_registration.get("next_run_at") + + if not enabled: + break + + now = _now_in_schedule_tz() + if not next_run_at: + with scheduled_registration_lock: + scheduled_registration["next_run_at"] = _get_next_run_from_cron(schedule_cron, now) + continue + + if now < next_run_at: + continue + + # 默认一个定时器只维持一条执行链,上一轮还在跑就跳过本轮 + if _has_active_scheduled_job(): + with scheduled_registration_lock: + scheduled_registration["next_run_at"] = _get_next_run_from_cron(schedule_cron, now) + scheduled_registration["skipped_runs"] = int(scheduled_registration.get("skipped_runs") or 0) + 1 + logger.info("定时注册本轮跳过:上一轮任务仍在执行") + continue + + with scheduled_registration_lock: + scheduled_registration["last_run_at"] = now + scheduled_registration["next_run_at"] = _get_next_run_from_cron(schedule_cron, now) + scheduled_registration["last_error"] = None + + try: + await _run_scheduled_registration_once() + with scheduled_registration_lock: + scheduled_registration["total_runs"] = int(scheduled_registration.get("total_runs") or 0) + 1 + scheduled_registration["success_runs"] = int(scheduled_registration.get("success_runs") or 0) + 1 + except Exception as e: + err_message = e.detail if isinstance(e, HTTPException) else str(e) + logger.error(f"定时注册触发失败: {err_message}") + with scheduled_registration_lock: + scheduled_registration["total_runs"] = int(scheduled_registration.get("total_runs") or 0) + 1 + scheduled_registration["failed_runs"] = int(scheduled_registration.get("failed_runs") or 0) + 1 + scheduled_registration["last_error"] = err_message + except asyncio.CancelledError: + logger.info("定时注册调度器已停止") + raise + finally: + scheduled_registration_runner = None + + def _run_sync_registration_task(task_uuid: str, email_service_type: str, proxy: Optional[str], email_service_config: Optional[dict], email_service_id: Optional[int] = None, log_prefix: str = "", batch_id: str = "", auto_upload_cpa: bool = False, cpa_service_ids: List[int] = None, auto_upload_sub2api: bool = False, sub2api_service_ids: List[int] = None, auto_upload_tm: bool = False, tm_service_ids: List[int] = None): """ 在线程池中执行的同步注册任务 @@ -814,44 +1464,7 @@ async def start_registration( - proxy: 代理地址 - email_service_config: 邮箱服务配置(outlook 需要提供账户信息) """ - # 验证邮箱服务类型 - try: - EmailServiceType(request.email_service_type) - except ValueError: - raise HTTPException( - status_code=400, - detail=f"无效的邮箱服务类型: {request.email_service_type}" - ) - - # 创建任务 - task_uuid = str(uuid.uuid4()) - - with get_db() as db: - task = crud.create_registration_task( - db, - task_uuid=task_uuid, - proxy=request.proxy - ) - - # 在后台运行注册任务 - background_tasks.add_task( - run_registration_task, - task_uuid, - request.email_service_type, - request.proxy, - request.email_service_config, - request.email_service_id, - "", - "", - request.auto_upload_cpa, - request.cpa_service_ids, - request.auto_upload_sub2api, - request.sub2api_service_ids, - request.auto_upload_tm, - request.tm_service_ids, - ) - - return task_to_response(task) + return await _enqueue_single_registration(request, background_tasks) @router.post("/batch", response_model=BatchRegistrationResponse) @@ -868,71 +1481,7 @@ async def start_batch_registration( - interval_min: 最小间隔秒数 - interval_max: 最大间隔秒数 """ - # 验证参数 - if request.count < 1 or request.count > 100: - raise HTTPException(status_code=400, detail="注册数量必须在 1-100 之间") - - try: - EmailServiceType(request.email_service_type) - except ValueError: - raise HTTPException( - status_code=400, - detail=f"无效的邮箱服务类型: {request.email_service_type}" - ) - - if request.interval_min < 0 or request.interval_max < request.interval_min: - raise HTTPException(status_code=400, detail="间隔时间参数无效") - - if not 1 <= request.concurrency <= 50: - raise HTTPException(status_code=400, detail="并发数必须在 1-50 之间") - - if request.mode not in ("parallel", "pipeline"): - raise HTTPException(status_code=400, detail="模式必须为 parallel 或 pipeline") - - # 创建批量任务 - batch_id = str(uuid.uuid4()) - task_uuids = [] - - with get_db() as db: - for _ in range(request.count): - task_uuid = str(uuid.uuid4()) - task = crud.create_registration_task( - db, - task_uuid=task_uuid, - proxy=request.proxy - ) - task_uuids.append(task_uuid) - - # 获取所有任务 - with get_db() as db: - tasks = [crud.get_registration_task(db, uuid) for uuid in task_uuids] - - # 在后台运行批量注册 - background_tasks.add_task( - run_batch_registration, - batch_id, - task_uuids, - request.email_service_type, - request.proxy, - request.email_service_config, - request.email_service_id, - request.interval_min, - request.interval_max, - request.concurrency, - request.mode, - request.auto_upload_cpa, - request.cpa_service_ids, - request.auto_upload_sub2api, - request.sub2api_service_ids, - request.auto_upload_tm, - request.tm_service_ids, - ) - - return BatchRegistrationResponse( - batch_id=batch_id, - count=request.count, - tasks=[task_to_response(t) for t in tasks if t] - ) + return await _enqueue_batch_registration(request, background_tasks) @router.get("/batch/{batch_id}") @@ -970,6 +1519,128 @@ async def cancel_batch(batch_id: str): return {"success": True, "message": "批量任务取消请求已提交,正在让它们有序收工"} +@router.get("/schedule/status", response_model=ScheduledRegistrationStatusResponse) +async def get_scheduled_registration_status(): + """获取定时注册状态。""" + return _get_scheduled_registration_status() + + +@router.post("/schedule/start", response_model=ScheduledRegistrationStatusResponse) +async def start_scheduled_registration(request: ScheduledRegistrationRequest): + """启动(或覆盖)定时注册任务。""" + global scheduled_registration_runner + + if request.registration_mode not in ("single", "batch"): + raise HTTPException(status_code=400, detail="定时注册模式必须为 single 或 batch") + + _validate_email_service_type(request.email_service_type) + if request.registration_mode == "batch": + _validate_batch_params( + count=request.count, + interval_min=request.interval_min, + interval_max=request.interval_max, + concurrency=request.concurrency, + mode=request.mode, + ) + + # 兼容旧参数:schedule_interval_minutes -> schedule_cron + try: + if request.schedule_interval_minutes is not None: + schedule_cron = _legacy_interval_to_cron(int(request.schedule_interval_minutes)) + else: + schedule_cron = request.schedule_cron + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + + schedule_cron = _validate_schedule_cron_or_raise(schedule_cron) + payload_to_store = request.model_dump(exclude_none=True) + payload_to_store["schedule_cron"] = schedule_cron + payload_to_store.pop("schedule_interval_minutes", None) + + now = _now_in_schedule_tz() + first_run_at = now if request.run_immediately else _get_next_run_from_cron(schedule_cron, now) + tz_name = _get_schedule_timezone_name() + + with scheduled_registration_lock: + scheduled_registration["enabled"] = True + scheduled_registration["schedule_cron"] = schedule_cron + scheduled_registration["timezone"] = tz_name + scheduled_registration["registration_mode"] = request.registration_mode + scheduled_registration["payload"] = payload_to_store + scheduled_registration["next_run_at"] = first_run_at + scheduled_registration["last_run_at"] = None + scheduled_registration["last_error"] = None + scheduled_registration["total_runs"] = 0 + scheduled_registration["success_runs"] = 0 + scheduled_registration["failed_runs"] = 0 + scheduled_registration["skipped_runs"] = 0 + scheduled_registration["active_task_uuid"] = None + scheduled_registration["active_batch_id"] = None + + try: + _save_scheduled_registration_settings( + enabled=True, + schedule_cron=schedule_cron, + payload=payload_to_store, + ) + except Exception as e: + logger.error(f"保存定时注册设置失败: {e}") + raise HTTPException(status_code=500, detail=f"保存定时注册设置失败: {e}") + + # 修改定时配置时,明确重启调度器,避免沿用旧触发链 + old_runner = scheduled_registration_runner + if old_runner and not old_runner.done(): + old_runner.cancel() + try: + await old_runner + except asyncio.CancelledError: + pass + + scheduled_registration_runner = asyncio.create_task(_scheduled_registration_loop()) + + logger.info( + "定时注册已启动:cron=%s, tz=%s, mode=%s, first_run_at=%s", + schedule_cron, + tz_name, + request.registration_mode, + first_run_at.isoformat() + ) + return _get_scheduled_registration_status() + + +@router.post("/schedule/stop", response_model=ScheduledRegistrationStatusResponse) +async def stop_scheduled_registration(): + """停止定时注册任务。""" + global scheduled_registration_runner + + with scheduled_registration_lock: + schedule_cron = scheduled_registration.get("schedule_cron") or "*/30 * * * *" + payload = dict(scheduled_registration.get("payload") or {}) + scheduled_registration["enabled"] = False + scheduled_registration["next_run_at"] = None + + try: + _save_scheduled_registration_settings( + enabled=False, + schedule_cron=schedule_cron, + payload=payload, + ) + except Exception as e: + logger.error(f"保存定时注册设置失败: {e}") + raise HTTPException(status_code=500, detail=f"保存定时注册设置失败: {e}") + + runner = scheduled_registration_runner + if runner and not runner.done(): + runner.cancel() + try: + await runner + except asyncio.CancelledError: + pass + + logger.info("定时注册已停止") + return _get_scheduled_registration_status() + + @router.get("/tasks", response_model=TaskListResponse) async def list_tasks( page: int = Query(1, ge=1), diff --git a/static/js/app.js b/static/js/app.js index 543dc0b4..ca3f9b12 100644 --- a/static/js/app.js +++ b/static/js/app.js @@ -9,6 +9,7 @@ let currentBatch = null; let logPollingInterval = null; let batchPollingInterval = null; let accountsPollingInterval = null; +let scheduleStatusPollingInterval = null; let isBatchMode = false; let isOutlookBatchMode = false; let outlookAccounts = []; @@ -94,30 +95,39 @@ const elements = { autoUploadTm: document.getElementById('auto-upload-tm'), tmServiceSelectGroup: document.getElementById('tm-service-select-group'), tmServiceSelect: document.getElementById('tm-service-select'), + // 定时注册 + scheduleCronExpression: document.getElementById('schedule-cron-expression'), + scheduleStartBtn: document.getElementById('schedule-start-btn'), + scheduleStopBtn: document.getElementById('schedule-stop-btn'), + scheduleStatusText: document.getElementById('schedule-status-text'), }; // 初始化 -document.addEventListener('DOMContentLoaded', () => { +document.addEventListener('DOMContentLoaded', async () => { initEventListeners(); - loadAvailableServices(); + handleModeChange({ target: elements.regMode }); // 默认模式初始化(默认批量) + await loadAvailableServices(); + await initAutoUploadOptions(); + await loadScheduledRegistrationStatus(); + startScheduleStatusPolling(); loadRecentAccounts(); startAccountsPolling(); initVisibilityReconnect(); restoreActiveTask(); - initAutoUploadOptions(); }); // 初始化注册后自动操作选项(CPA / Sub2API / TM) async function initAutoUploadOptions() { await Promise.all([ - loadServiceSelect('/cpa-services?enabled=true', elements.cpaServiceSelect, elements.autoUploadCpa, elements.cpaServiceSelectGroup), + loadServiceSelect('/cpa-services?enabled=true', elements.cpaServiceSelect, elements.autoUploadCpa, elements.cpaServiceSelectGroup, { defaultCheckedWhenAvailable: true }), loadServiceSelect('/sub2api-services?enabled=true', elements.sub2apiServiceSelect, elements.autoUploadSub2api, elements.sub2apiServiceSelectGroup), loadServiceSelect('/tm-services?enabled=true', elements.tmServiceSelect, elements.autoUploadTm, elements.tmServiceSelectGroup), ]); } // 通用:构建自定义多选下拉组件并处理联动 -async function loadServiceSelect(apiPath, container, checkbox, selectGroup) { +async function loadServiceSelect(apiPath, container, checkbox, selectGroup, options = {}) { + const defaultCheckedWhenAvailable = Boolean(options.defaultCheckedWhenAvailable); if (!checkbox || !container) return; let services = []; try { @@ -126,11 +136,21 @@ async function loadServiceSelect(apiPath, container, checkbox, selectGroup) { if (!services || services.length === 0) { checkbox.disabled = true; + checkbox.checked = false; checkbox.title = '请先在设置中添加对应服务'; const label = checkbox.closest('label'); if (label) label.style.opacity = '0.5'; container.innerHTML = '
暂无可用服务
'; + if (selectGroup) selectGroup.style.display = 'none'; } else { + checkbox.disabled = false; + checkbox.title = ''; + const label = checkbox.closest('label'); + if (label) label.style.opacity = '1'; + if (defaultCheckedWhenAvailable) { + checkbox.checked = true; + } + const items = services.map(s => `