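System Technical Implementation

Last updated: 2026-04-16 | Module: System Technical Implementation

For architecture diagrams and the pipeline, see ../architecture.md; for interface definitions, see ../CODE_WIKI.md

Back to documentation index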


1. Overall System Architecture

1.1 Processing Pipeline

Inbound → Chunker → Arranger → Canonicalizer → CacheInjector → Hasher → Dedup → Outbound → Proxy

For the full architecture diagram and state machine, see ../architecture.md
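
The stage order above can be read as a chain of transformations applied to one shared request value. The following is a minimal sketch under stated assumptions: the `Stage` function type, the simplified `Envelope`, and the `named` helper are all hypothetical, and the real interfaces (see ../CODE_WIKI.md) may look quite different.

```go
package main

import (
	"errors"
	"fmt"
)

// Envelope is a simplified, hypothetical stand-in for the request value
// that flows through the pipeline.
type Envelope struct {
	Model string
	Trace []string // names of the stages that have run, in order
}

// Stage is a hypothetical signature for one pipeline step.
type Stage func(*Envelope) error

// named returns a Stage that only records that it ran.
func named(name string) Stage {
	return func(e *Envelope) error {
		e.Trace = append(e.Trace, name)
		return nil
	}
}

// Run applies the stages in order and stops at the first error.
func Run(e *Envelope, stages ...Stage) error {
	if e == nil {
		return errors.New("pipeline: nil envelope")
	}
	for _, s := range stages {
		if err := s(e); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	e := &Envelope{Model: "deepseek-chat"}
	err := Run(e,
		named("inbound"), named("chunker"), named("arranger"),
		named("canonicalizer"), named("cacheinject"), named("hasher"),
		named("dedup"), named("outbound"), named("proxy"),
	)
	fmt.Println(e.Trace, err)
}
```

Keeping every stage behind the same function shape is one way to let a stage move into a separate service later without changing its callers.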

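1.2 MVP Monolith vs. Microservice Evolution

The MVP uses a monolithic architecture: all modules compile into a single binary. Modules are decoupled through internal interfaces, so they can later be split into independent microservices as needed.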

MVP (monolith)               Evolution (microservices)
┌──────────────────┐         ┌────────────┐ ┌────────────┐
│  token-router    │         │  gateway   │ │  billing   │
│  (single binary) │  ───>   │  service   │ │  service   │
│                  │         ├────────────┤ ├────────────┤
│  - inbound       │         │  provider  │ │  cache     │
│  - chunker       │         │  service   │ │  service   │
│  - arranger      │         ├────────────┤ ├────────────┤
│  - canonicalizer │         │  monitor   │ │  auth      │
│  - cacheinject   │         │  service   │ │  service   │
│  - outbound      │         └────────────┘ └────────────┘
│  - dedup         │
│  - billing       │
│  - monitor       │
└──────────────────┘

Split priority: auth service > billing service > cache service > monitoring service.


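2. Technology Stack

| Layer | Technology | Version | Rationale |
|---|---|---|---|
| Backend language | Go | 1.24+ | High concurrency, single binary, compiled language |
| Web framework | Gin | - | Lightweight and fast, mature middleware ecosystem |
| ORM | GORM | - | Mainstream Go ORM, automatic migrations |
| Database | PostgreSQL | 16 | ACID, JSON support, mature ecosystem |
| Cache | Redis | 7 | High-performance key-value store, Pub/Sub |
| Time-series extension | TimescaleDB | - | Native PostgreSQL extension, optimized time-series queries |
| Message queue | NATS | - | Lightweight and fast, native Go integration |
| Containerization | Docker | - | Standardized deployment |
| Orchestration | Kubernetes | - | Elastic scaling, service discovery |
| Monitoring | Prometheus + Grafana | - | Cloud-native monitoring standard |
| Logging | Zap + Loki | - | Structured logging plus log aggregation |
| CI/CD | GitHub Actions | - | Integrated with the code repository |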

3. Project Layout

tokenrouter/
├── cmd/
│   └── server/
│       └── main.go                 # Entry point: initializes config, database, and routes, then starts the server
├── internal/
│   ├── server/
│   │   └── pipeline.go             # ChatPipeline and the production chat handler
│   ├── inbound/
│   │   ├── adapter.go              # InboundAdapter interface
│   │   ├── registry.go             # Inbound adapter registry
│   │   └── openai.go               # OpenAI inbound adapter
│   ├── envelope/
│   │   └── envelope.go             # Envelope / Message / Tool definitions
│   ├── block/
│   │   └── block.go                # Block definition and type constants
│   ├── chunker/
│   │   └── chunker.go              # Static chunker
│   ├── arranger/
│   │   └── arranger.go             # Arranger: System merging / Tool sorting / History truncation
│   ├── canonicalizer/
│   │   └── canonicalizer.go        # Serialization canonicalizer: deterministic JSON output
│   ├── cacheinject/
│   │   ├── engine.go               # Cache injection engine entry point
│   │   ├── injector.go             # Injector interface
│   │   ├── registry.go             # Injector registry
│   │   ├── openai.go               # OpenAI / DeepSeek pass-through strategy (MVP v0.1)
│   │   └── anthropic.go            # Anthropic cache_control injector (MVP stub)
│   ├── hasher/
│   │   └── hasher.go               # PrefixHash / FullHash computation
│   ├── dedup/
│   │   └── dedup.go                # Deduplicator for non-streaming requests
│   ├── observer/
│   │   └── observer.go             # Traffic observation (reserved for Phase 2; currently NoopObserver)
│   ├── outbound/
│   │   ├── adapter.go              # OutboundAdapter interface
│   │   ├── registry.go             # Outbound adapter registry
│   │   ├── deepseek/
│   │   │   └── deepseek.go         # DeepSeek outbound adapter (MVP v0.1, OpenAI-compatible)
│   │   ├── openai/
│   │   │   └── openai.go           # OpenAI outbound adapter (reserved stub)
│   │   └── anthropic/
│   │       └── anthropic.go        # Anthropic outbound adapter (reserved stub)
│   ├── proxy/
│   │   ├── proxy.go                # Non-streaming HTTP forwarding (Forward)
│   │   ├── stream.go               # SSE streaming proxy (ProxyStream)
│   │   └── pool.go                 # Connection pool management
│   ├── billing/
│   │   ├── token_counter.go        # TokenCounts / CostBreakdown definitions
│   │   ├── price_engine.go         # Price calculation engine
│   │   ├── pricing_table.go        # Model pricing table (in-memory cache)
│   │   └── quota.go                # Quota management (GORM + in-memory cache)
│   ├── usage/
│   │   └── recorder.go             # Synchronous/asynchronous usage writer
│   ├── monitor/
│   │   ├── metrics.go              # Prometheus metric registration
│   │   └── collector.go            # Metric collection helpers
│   ├── middleware/
│   │   ├── auth.go                 # API key authentication middleware
│   │   ├── ratelimit.go            # Token-bucket rate limiting middleware (in-memory)
│   │   ├── cors.go                 # CORS middleware
│   │   └── logging.go              # Request logging middleware
│   ├── model/
│   │   ├── user.go                 # User model
│   │   ├── api_key.go              # APIKey model
│   │   ├── request.go              # Request model
│   │   ├── cache_stats.go          # CacheStats model
│   │   ├── model_pricing.go        # ModelPricing model
│   │   └── daily_usage.go          # DailyUsage model
│   └── admin/
│       ├── handler.go              # Admin API handlers
│       └── models.go               # Admin request/response DTOs
├── pkg/
│   ├── config/
│   │   └── config.go               # Config loading (.env + environment variables)
│   ├── logger/
│   │   └── logger.go               # Zap logging wrapper
│   ├── httputil/
│   │   └── client.go               # HTTP client utilities
│   └── crypto/
│       └── hash.go                 # API key hashing utilities
├── tests/
│   ├── integration/
│   │   └── forward_test.go         # Integration tests based on httptest mocks
│   └── e2e/
│       ├── helper_test.go          # Pipeline assembly + shared assertions
│       ├── fast_test.go            # Fast Suite (//go:build e2e)
│       └── full_test.go            # Full Suite (//go:build e2e_full)
├── migrations/
│   ├── 000_create_users.up.sql
│   ├── 001_create_api_keys.up.sql
│   ├── 002_create_requests.up.sql
│   ├── 003_create_cache_stats.up.sql
│   ├── 004_create_model_pricing.up.sql
│   ├── 005_create_daily_usage.up.sql
│   ├── 006_seed_pricing.up.sql
│   └── 006_seed_pricing.down.sql
├── deployments/
│   ├── docker/
│   │   └── Dockerfile
│   └── docker-compose.yml
├── .env.example
├── go.mod
├── go.sum
└── Makefile

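4. API Gateway Layer

4.1 Unified OpenAI Interface

The gateway exposes an OpenAI-compatible interface, so existing clients can connect without modification.

| Endpoint | Method | Description |
|---|---|---|
| /v1/chat/completions | POST | Chat completions (streaming and non-streaming) |
| /v1/models | GET | List of available models |
| /health | GET | Health check |

4.2 Request Processing Flow

For the full pipeline and state machine, see ../architecture.md

4.3 Gin Route Registration

// Actual route registration in cmd/server/main.go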

engine.GET("/health", func(c *gin.Context) {
    c.JSON(http.StatusOK, gin.H{"status": "ok"})
})
engine.GET("/metrics", gin.WrapH(promhttp.Handler()))
engine.GET("/v1/models", modelHandler.ListModels)

v1Group := engine.Group("/v1")
v1Group.Use(middleware.AuthMiddlewareWithStore(authStore))
if cfg.RateLimitEnabled {
    v1Group.Use(middleware.RateLimitMiddleware(rateLimiter))
}
v1Group.POST("/chat/completions", pipeline.Handle)

adminGroup := engine.Group("/admin")
adminGroup.Use(middleware.AuthMiddlewareWithStore(authStore))

4.4 Authentication Middleware

type AuthStore interface {
    LookupAPIKey(ctx context.Context, keyHash string) (APIKeyIdentity, error)
}

func AuthMiddlewareWithStore(store AuthStore) gin.HandlerFunc {
    return func(c *gin.Context) {
        authHeader := c.GetHeader("Authorization")
        // ... parse "Bearer <key>" ...
        apiKey := parts[1]

        // Full hash lookup avoids collisions in the display prefix.
        keyHash := crypto.HashKey(apiKey)
        identity, err := store.LookupAPIKey(c.Request.Context(), keyHash)
        if err != nil {
            c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "invalid api key"})
            return
        }
        if len(identity.KeyHash) != len(keyHash) ||
            subtle.ConstantTimeCompare([]byte(identity.KeyHash), []byte(keyHash)) != 1 {
            c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "invalid api key"})
            return
        }

        // Set user/key IDs in both Gin context and request context
        c.Set(string(UserIDKey), identity.UserID)
        c.Set(string(APIKeyIDKey), identity.APIKeyID)
        ctx := context.WithValue(c.Request.Context(), UserIDKey, identity.UserID)
        ctx = context.WithValue(ctx, APIKeyIDKey, identity.APIKeyID)
        c.Request = c.Request.WithContext(ctx)
        c.Next()
    }
}

Production wiring uses NewCachedAuthStore(NewDBAuthStore(db), cfg.AuthCacheTTL). Only successful lookups are cached, so newly-created API keys are not blocked by negative cache entries. Revoked keys may remain accepted until the positive cache TTL expires.
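
The positive-only caching behavior described above can be sketched as follows. This is an illustration under stated assumptions, not the actual NewCachedAuthStore: the `PositiveCache` type, the `Lookup` function type, `ErrNotFound`, and the simplified `Identity` are hypothetical names introduced here.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// ErrNotFound is a hypothetical error for an unknown key hash.
var ErrNotFound = errors.New("auth: key not found")

// Identity is a simplified stand-in for APIKeyIdentity.
type Identity struct {
	UserID   string
	APIKeyID string
}

// Lookup is a hypothetical function form of AuthStore.LookupAPIKey.
type Lookup func(keyHash string) (Identity, error)

type entry struct {
	id      Identity
	expires time.Time
}

// PositiveCache caches successful lookups only; failed lookups always
// fall through to the backing store on the next call.
type PositiveCache struct {
	mu    sync.Mutex
	ttl   time.Duration
	next  Lookup
	items map[string]entry
}

func NewPositiveCache(next Lookup, ttl time.Duration) *PositiveCache {
	return &PositiveCache{ttl: ttl, next: next, items: make(map[string]entry)}
}

func (c *PositiveCache) Lookup(keyHash string) (Identity, error) {
	c.mu.Lock()
	e, ok := c.items[keyHash]
	c.mu.Unlock()
	if ok && time.Now().Before(e.expires) {
		return e.id, nil
	}
	id, err := c.next(keyHash)
	if err != nil {
		return Identity{}, err // not cached, so a key created later is found at once
	}
	c.mu.Lock()
	c.items[keyHash] = entry{id: id, expires: time.Now().Add(c.ttl)}
	c.mu.Unlock()
	return id, nil
}

func main() {
	known := map[string]Identity{}
	store := func(h string) (Identity, error) {
		if id, ok := known[h]; ok {
			return id, nil
		}
		return Identity{}, ErrNotFound
	}
	c := NewPositiveCache(store, time.Minute)

	_, err := c.Lookup("abc")             // miss: nothing is cached
	known["abc"] = Identity{UserID: "u1"} // the key is created afterwards
	id, _ := c.Lookup("abc")              // found immediately
	delete(known, "abc")                  // the key is revoked in the store
	stale, _ := c.Lookup("abc")           // still accepted until the TTL expires
	fmt.Println(err, id.UserID, stale.UserID)
}
```

The last lookup shows the trade-off stated above: a shorter TTL narrows the window in which a revoked key is still accepted, at the cost of more database lookups.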

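4.5 Rate Limiting Middleware

In-memory window-counter rate limiting that caps the request rate per user ID (or per client IP when the request is unauthenticated). The request path reads and updates only the bucket for the current key; a background janitor periodically removes expired buckets, so requests never scan the full bucket map as the number of users grows.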

type RateLimiter struct {
    buckets map[string]*bucket
    mu      sync.Mutex
    limit   int
    window  time.Duration
}

func NewRateLimiter(limit int, window time.Duration) *RateLimiter

func (rl *RateLimiter) Allow(key string) bool

func (rl *RateLimiter) Stop()

func RateLimitMiddleware(limiter *RateLimiter) gin.HandlerFunc {
    return func(c *gin.Context) {
        userID := GetUserID(c.Request.Context())
        if userID == "" {
            userID = c.ClientIP()
        }
        if !limiter.Allow(userID) {
            c.AbortWithStatusJSON(http.StatusTooManyRequests, gin.H{
                "error": "rate limit exceeded",
            })
            return
        }
        c.Next()
    }
}
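
The bodies of Allow and the janitor are not shown above. A minimal sketch of how a fixed-window counter with a separate sweep could work is given below; `WindowLimiter`, its `now` clock hook, and `Sweep` are hypothetical names, and the real RateLimiter internals may differ.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// bucket holds the request count for one key within its current window.
type bucket struct {
	count       int
	windowStart time.Time
}

// WindowLimiter is a hypothetical fixed-window limiter. Its fields mirror
// the RateLimiter struct above; the method bodies are assumptions.
type WindowLimiter struct {
	mu      sync.Mutex
	buckets map[string]*bucket
	limit   int
	window  time.Duration
	now     func() time.Time // injectable clock, handy in tests
}

func NewWindowLimiter(limit int, window time.Duration) *WindowLimiter {
	return &WindowLimiter{
		buckets: make(map[string]*bucket),
		limit:   limit,
		window:  window,
		now:     time.Now,
	}
}

// Allow touches only the bucket for key; it never scans other buckets.
func (l *WindowLimiter) Allow(key string) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	now := l.now()
	b, ok := l.buckets[key]
	if !ok || now.Sub(b.windowStart) >= l.window {
		b = &bucket{windowStart: now} // start a fresh window for this key
		l.buckets[key] = b
	}
	if b.count >= l.limit {
		return false
	}
	b.count++
	return true
}

// Sweep deletes buckets whose window has ended and reports how many were
// removed. A background janitor would call it on a ticker, which keeps the
// full-map scan off the request path.
func (l *WindowLimiter) Sweep() int {
	l.mu.Lock()
	defer l.mu.Unlock()
	now := l.now()
	removed := 0
	for k, b := range l.buckets {
		if now.Sub(b.windowStart) >= l.window {
			delete(l.buckets, k)
			removed++
		}
	}
	return removed
}

func main() {
	clock := time.Unix(0, 0)
	l := NewWindowLimiter(2, time.Minute)
	l.now = func() time.Time { return clock }

	fmt.Println(l.Allow("u1"), l.Allow("u1"), l.Allow("u1")) // the third call exceeds the limit
	clock = clock.Add(time.Minute)
	fmt.Println(l.Sweep(), l.Allow("u1")) // the window ended, so the counter restarts
}
```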

5. Streaming Proxy Layer

5.1 SSE Streaming Proxy Core

func (p *StreamProxy) ProxyStream(
    ctx context.Context,
    w http.ResponseWriter,
    method string,
    url string,
    headers map[string]string,
    body []byte,
) error {
    req, err := http.NewRequestWithContext(ctx, method, url, bytes.NewReader(body))
    // ... set headers ...

    resp, err := p.client.Do(req)
    // ... error handling ...
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusOK {
        // Forward upstream status/body to the client, then return UpstreamStatusError
        // so callers can log/measure the failure.
    }

    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")
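    flusher, ok := w.(http.Flusher)
    if !ok {
        // Without Flush, events would sit in the buffer instead of streaming.
        return fmt.Errorf("proxy: response writer does not support flushing")
    }

    reader := bufio.NewReader(resp.Body)
    for {
        line, err := reader.ReadBytes('\n')
        if err != nil {
            if err == io.EOF {
                fmt.Fprintf(w, "data: [DONE]\n\n")
                flusher.Flush()
                return nil
            }
            if ctx.Err() != nil {
                return fmt.Errorf("proxy: stream context done: %w", ctx.Err())
            }
            return fmt.Errorf("proxy: read stream: %w", err)
        }
        // Stop early if the client has gone away.
        if _, werr := w.Write(line); werr != nil {
            return fmt.Errorf("proxy: write stream: %w", werr)
        }
        flusher.Flush()
    }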
}

ChatPipeline.Handle logs every ProxyStream error. If the stream proxy failed before writing any response headers or body, the handler returns 502 with a JSON error body. If the proxy already forwarded an upstream status/body or started an SSE stream, the handler does not overwrite the response.
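
One way to tell whether anything has already reached the client is to wrap the response writer and record the first write. The sketch below assumes this approach. `trackingWriter`, `handleStream`, the `demo` helper, and the JSON error body are hypothetical, and ChatPipeline.Handle may detect the condition differently.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// trackingWriter records whether a status line or body bytes were sent.
type trackingWriter struct {
	http.ResponseWriter
	wrote bool
}

func (t *trackingWriter) WriteHeader(code int) {
	t.wrote = true
	t.ResponseWriter.WriteHeader(code)
}

func (t *trackingWriter) Write(b []byte) (int, error) {
	t.wrote = true
	return t.ResponseWriter.Write(b)
}

// Flush keeps streaming working when the wrapped writer supports it.
func (t *trackingWriter) Flush() {
	if f, ok := t.ResponseWriter.(http.Flusher); ok {
		f.Flush()
	}
}

// handleStream runs proxy and writes a 502 JSON error only when proxy
// failed before sending anything to the client.
func handleStream(w http.ResponseWriter, proxy func(http.ResponseWriter) error) {
	tw := &trackingWriter{ResponseWriter: w}
	if err := proxy(tw); err != nil {
		// A real handler would log err here in both cases.
		if tw.wrote {
			return // a response is already in flight; do not overwrite it
		}
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusBadGateway)
		fmt.Fprint(w, `{"error":"upstream unavailable"}`)
	}
}

var errUpstream = errors.New("proxy: upstream failed")

// failBeforeWrite simulates a proxy that fails before writing anything.
func failBeforeWrite(http.ResponseWriter) error { return errUpstream }

// failMidStream simulates a proxy that fails after one SSE event.
func failMidStream(w http.ResponseWriter) error {
	fmt.Fprint(w, "data: partial\n\n")
	return errUpstream
}

// demo runs handleStream against an in-memory recorder.
func demo(proxy func(http.ResponseWriter) error) (int, string) {
	rec := httptest.NewRecorder()
	handleStream(rec, proxy)
	return rec.Code, rec.Body.String()
}

func main() {
	fmt.Println(demo(failBeforeWrite))
	fmt.Println(demo(failMidStream))
}
```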

5.2 Connection Pool Management

type ConnPool struct {
    pools map[string]*http.Client // key: provider baseURL
    mu    sync.RWMutex
}

func NewConnPool() *ConnPool {
    return &ConnPool{
        pools: make(map[string]*http.Client),
    }
}

func (p *ConnPool) Get(baseURL string) *http.Client {
    p.mu.RLock()
    client, ok := p.pools[baseURL]
    p.mu.RUnlock()
    if ok {
        return client
    }

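    p.mu.Lock()
    defer p.mu.Unlock()

    // Re-check under the write lock: another goroutine may have created the
    // client between RUnlock and Lock.
    if client, ok := p.pools[baseURL]; ok {
        return client
    }

    client = &http.Client{
        Transport: &http.Transport{
            MaxIdleConns:        100,
            MaxIdleConnsPerHost: 20,
            IdleConnTimeout:     90 * time.Second,
            TLSHandshakeTimeout: 10 * time.Second,
        },
        Timeout: 5 * time.Minute, // long timeout for streaming requests
    }
    p.pools[baseURL] = client
    return client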
}

6. Database Design

6.1 Core Table Schemas

users table

CREATE TABLE users (
    id            UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    email         VARCHAR(255) UNIQUE NOT NULL,
    name          VARCHAR(255) NOT NULL,
    plan          VARCHAR(50) NOT NULL DEFAULT 'free', -- free/pro/enterprise
    quota_limit   BIGINT NOT NULL DEFAULT 1000000,     -- monthly token quota
    created_at    TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at    TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

api_keys table

CREATE TABLE api_keys (
    id            UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id       UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    name          VARCHAR(100) NOT NULL,
    key_hash      VARCHAR(64) UNIQUE NOT NULL,         -- SHA-256 hash; authentication looks up by this column
    key_prefix    VARCHAR(8) NOT NULL,                  -- display prefix; not used for authentication lookup
    revoked       BOOLEAN NOT NULL DEFAULT FALSE,
    rate_limit    INT NOT NULL DEFAULT 60,              -- requests per minute
    expires_at    TIMESTAMPTZ,
    created_at    TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_api_keys_user ON api_keys(user_id);
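
The key_hash and key_prefix columns can be filled as sketched below. A hex-encoded SHA-256 digest is exactly 64 characters, which matches VARCHAR(64). This is an illustration only: the real crypto.HashKey may add a salt or use a different encoding, and `NewAPIKey` with its "tr-" prefix is a hypothetical helper.

```go
package main

import (
	"crypto/rand"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// HashKey returns the hex-encoded SHA-256 digest of an API key.
// The result is always 64 characters long.
func HashKey(apiKey string) string {
	sum := sha256.Sum256([]byte(apiKey))
	return hex.EncodeToString(sum[:])
}

// KeyPrefix returns at most the first 8 characters of the key, for display.
func KeyPrefix(apiKey string) string {
	if len(apiKey) <= 8 {
		return apiKey
	}
	return apiKey[:8]
}

// NewAPIKey generates a random key. The "tr-" prefix and the 24-byte
// length are assumptions made for this example.
func NewAPIKey() (string, error) {
	buf := make([]byte, 24)
	if _, err := rand.Read(buf); err != nil {
		return "", err
	}
	return "tr-" + hex.EncodeToString(buf), nil
}

func main() {
	key, err := NewAPIKey()
	if err != nil {
		panic(err)
	}
	// Only the hash and the prefix are stored; the plaintext key is shown
	// to the user once and then discarded.
	fmt.Println(KeyPrefix(key), len(HashKey(key)))
}
```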

requests table

CREATE TABLE requests (
    id            UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id       UUID NOT NULL REFERENCES users(id),
    api_key_id    UUID NOT NULL REFERENCES api_keys(id),
    model         VARCHAR(100) NOT NULL,
    provider      VARCHAR(50) NOT NULL,
    prompt_tokens INT NOT NULL DEFAULT 0,
    completion_tokens INT NOT NULL DEFAULT 0,
    cache_read_tokens INT NOT NULL DEFAULT 0,
    cache_write_tokens INT NOT NULL DEFAULT 0,
    cost_usd      DECIMAL(10, 6) NOT NULL DEFAULT 0,
    latency_ms    INT NOT NULL DEFAULT 0,
    status        VARCHAR(20) NOT NULL DEFAULT 'success',
    created_at    TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_requests_user_created ON requests(user_id, created_at);
CREATE INDEX idx_requests_model ON requests(model);

cache_stats table

CREATE TABLE cache_stats (
    id            UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id       UUID NOT NULL REFERENCES users(id),
    model         VARCHAR(100) NOT NULL,
    provider      VARCHAR(50) NOT NULL,
    hit_count     INT NOT NULL DEFAULT 0,
    miss_count    INT NOT NULL DEFAULT 0,
    saved_tokens  BIGINT NOT NULL DEFAULT 0,
    saved_usd     DECIMAL(10, 6) NOT NULL DEFAULT 0,
    date          DATE NOT NULL,
    created_at    TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    UNIQUE(user_id, model, provider, date)
);

model_pricing table

CREATE TABLE model_pricing (
    id            UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    model         VARCHAR(100) UNIQUE NOT NULL,
    provider      VARCHAR(50) NOT NULL,
    prompt_price  DECIMAL(10, 6) NOT NULL,   -- price per million tokens
    completion_price DECIMAL(10, 6) NOT NULL,
    cache_read_price  DECIMAL(10, 6) NOT NULL DEFAULT 0,
    cache_write_price DECIMAL(10, 6) NOT NULL DEFAULT 0,
    effective_from TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at    TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

daily_usage table (TimescaleDB hypertable)

CREATE TABLE daily_usage (
    time          TIMESTAMPTZ NOT NULL,
    user_id       UUID NOT NULL,
    model         VARCHAR(100) NOT NULL,
    provider      VARCHAR(50) NOT NULL,
    prompt_tokens BIGINT NOT NULL DEFAULT 0,
    completion_tokens BIGINT NOT NULL DEFAULT 0,
    cache_read_tokens BIGINT NOT NULL DEFAULT 0,
    request_count INT NOT NULL DEFAULT 0,
    cost_usd      DECIMAL(10, 6) NOT NULL DEFAULT 0,
    cache_hit_count INT NOT NULL DEFAULT 0
);

SELECT create_hypertable('daily_usage', 'time', chunk_time_interval => INTERVAL '1 day');

CREATE INDEX idx_daily_usage_user ON daily_usage(user_id, time DESC);

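6.2 Redis Data Structures

| Key pattern | Type | Purpose | TTL |
|---|---|---|---|
| ratelimit:{user_id} | String (counter) | Per-user rate limiting | 60s |
| cache:config:{model} | Hash | Cache strategy configuration per model | 30min |
| cache:stats:{user_id}:{date} | Hash | Per-user cache statistics for the day | 24h |
| dedup:inflight:{hash} | String | Marker for an in-flight request | 2min |
| provider:health:{name} | String | Provider health status | 30s |
| quota:{user_id} | Hash | Remaining user quota | - |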

7. Billing and Quotas

7.1 Token Metering

type TokenUsage struct {
    PromptTokens     int `json:"prompt_tokens"`
    CompletionTokens int `json:"completion_tokens"`
    CacheReadTokens  int `json:"cache_read_tokens"`
    CacheWriteTokens int `json:"cache_write_tokens"`
}

func CountTokens(resp *outbound.Response) TokenUsage {
    if resp.Usage == nil {
        return TokenUsage{}
    }
    return TokenUsage{
        PromptTokens:     resp.Usage.PromptTokens,
        CompletionTokens: resp.Usage.CompletionTokens,
        CacheReadTokens:  resp.Usage.CacheReadTokens,
        CacheWriteTokens: resp.Usage.CacheWriteTokens,
    }
}

7.2 Price Calculation Engine

type PriceEngine struct {
    table *PricingTable
}

func NewPriceEngine(table *PricingTable) *PriceEngine

func (e *PriceEngine) Calculate(modelName string, counts TokenCounts) (CostBreakdown, error) {
    pricing, err := e.table.Get(modelName)
    if err != nil {
        return CostBreakdown{}, fmt.Errorf("billing: %w", err)
    }
    cb := CostBreakdown{
        PromptCost:     float64(counts.PromptTokens) * pricing.PromptPrice,
        CompletionCost: float64(counts.CompletionTokens) * pricing.CompletionPrice,
        CacheReadCost:  float64(counts.CacheReadTokens) * pricing.CacheReadPrice,
        CacheWriteCost: float64(counts.CacheWriteTokens) * pricing.CacheWritePrice,
    }
    cb.TotalCost = cb.PromptCost + cb.CompletionCost + cb.CacheReadCost + cb.CacheWriteCost
    return cb, nil
}
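
The model_pricing table stores prices per million tokens, while Calculate multiplies token counts by the price directly. That is only correct if PricingTable already converts to per-token prices, which is not shown here. The sketch below assumes the opposite case, where prices arrive unchanged in USD per million tokens, and makes the conversion explicit. The `Pricing` type and `Cost` function are hypothetical; the prices come from the deepseek-chat row in section 7.4, and the cache-write price is assumed to be 0.

```go
package main

import "fmt"

const tokensPerMillion = 1_000_000

// Pricing holds prices in USD per million tokens, as in model_pricing.
type Pricing struct {
	PromptPrice, CompletionPrice, CacheReadPrice, CacheWritePrice float64
}

// TokenCounts mirrors the four counters used for billing.
type TokenCounts struct {
	PromptTokens, CompletionTokens, CacheReadTokens, CacheWriteTokens int
}

// Cost converts token counts to USD using per-million-token prices.
func Cost(p Pricing, c TokenCounts) float64 {
	total := float64(c.PromptTokens)*p.PromptPrice +
		float64(c.CompletionTokens)*p.CompletionPrice +
		float64(c.CacheReadTokens)*p.CacheReadPrice +
		float64(c.CacheWriteTokens)*p.CacheWritePrice
	return total / tokensPerMillion
}

func main() {
	p := Pricing{PromptPrice: 0.14, CompletionPrice: 0.28, CacheReadPrice: 0.014}
	c := TokenCounts{
		PromptTokens:     1_000_000,
		CompletionTokens: 500_000,
		CacheReadTokens:  2_000_000,
	}
	// 0.14 + 0.14 + 0.028 = 0.308 USD
	fmt.Printf("%.6f USD\n", Cost(p, c))
}
```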

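7.3 Three-Tier Quotas

The MVP quota check is a soft quota: CheckQuota reads the usage recorded for the current month and then decides whether to admit the request, with no atomic pre-deduction under concurrency. Monthly usage is counted as prompt_tokens + completion_tokens + cache_read_tokens + cache_write_tokens, which matches the token basis used for billing.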
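
A soft check of this kind reduces to a sum and a comparison. The sketch below is an assumption-laden illustration: the `MonthlyUsage` type and this `CheckQuota` signature are hypothetical, and whether the real check uses a strict or non-strict comparison is not specified.

```go
package main

import "fmt"

// MonthlyUsage holds the four token counters that make up monthly usage.
type MonthlyUsage struct {
	PromptTokens, CompletionTokens, CacheReadTokens, CacheWriteTokens int64
}

// Total is the figure compared against the monthly quota.
func (u MonthlyUsage) Total() int64 {
	return u.PromptTokens + u.CompletionTokens + u.CacheReadTokens + u.CacheWriteTokens
}

// CheckQuota admits the request while recorded usage is below the limit.
// It reserves nothing, so concurrent requests that all pass the check can
// together push usage past the limit.
func CheckQuota(used MonthlyUsage, limit int64) bool {
	return used.Total() < limit
}

func main() {
	used := MonthlyUsage{
		PromptTokens:     600_000,
		CompletionTokens: 300_000,
		CacheReadTokens:  90_000,
	}
	fmt.Println(used.Total(), CheckQuota(used, 1_000_000))
}
```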

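Usage and billing records for completed requests are written through internal/usage.Recorder. Production uses AsyncRecorder by default: the request path only enqueues a model.Request onto a bounded queue, and background workers write batches to the requests table according to USAGE_BATCH_SIZE and USAGE_FLUSH_INTERVAL. When the queue is full the handler returns 503, so billing events are never dropped silently; with USAGE_ASYNC_ENABLED=false the recorder falls back to synchronous database writes.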
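
The bounded-queue behavior can be sketched with a buffered channel and a non-blocking send. This is a simplified illustration, not the actual AsyncRecorder: the `Record` type, the constructor signature, the `flush` callback, and `ErrQueueFull` are hypothetical, and a single worker is used for brevity.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// ErrQueueFull is returned when the bounded queue cannot accept a record.
var ErrQueueFull = errors.New("usage: queue full")

// Record is a simplified stand-in for model.Request.
type Record struct {
	Model  string
	Tokens int
}

// AsyncRecorder batches records on a background goroutine.
type AsyncRecorder struct {
	queue chan Record
	done  chan struct{}
}

// NewAsyncRecorder starts one worker. interval must be positive.
func NewAsyncRecorder(queueSize, batchSize int, interval time.Duration, flush func([]Record)) *AsyncRecorder {
	r := &AsyncRecorder{queue: make(chan Record, queueSize), done: make(chan struct{})}
	go r.run(batchSize, interval, flush)
	return r
}

// Enqueue never blocks the request path: it reports ErrQueueFull instead,
// which a handler could map to HTTP 503. It must not be called after Close.
func (r *AsyncRecorder) Enqueue(rec Record) error {
	select {
	case r.queue <- rec:
		return nil
	default:
		return ErrQueueFull
	}
}

func (r *AsyncRecorder) run(batchSize int, interval time.Duration, flush func([]Record)) {
	defer close(r.done)
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	batch := make([]Record, 0, batchSize)
	emit := func() {
		if len(batch) > 0 {
			flush(batch)
			batch = make([]Record, 0, batchSize)
		}
	}
	for {
		select {
		case rec, ok := <-r.queue:
			if !ok {
				emit() // queue closed: write what is left, then stop
				return
			}
			batch = append(batch, rec)
			if len(batch) >= batchSize {
				emit()
			}
		case <-ticker.C:
			emit() // flush partial batches so records do not wait forever
		}
	}
}

// Close stops accepting records and waits for the final flush.
func (r *AsyncRecorder) Close() {
	close(r.queue)
	<-r.done
}

func main() {
	written := 0
	r := NewAsyncRecorder(8, 2, 50*time.Millisecond, func(b []Record) { written += len(b) })
	for i := 0; i < 5; i++ {
		if err := r.Enqueue(Record{Model: "deepseek-chat", Tokens: 10}); err != nil {
			fmt.Println("rejected:", err)
		}
	}
	r.Close()
	fmt.Println("written:", written)
}
```

Rejecting at enqueue time trades availability for billing completeness: under sustained overload clients see 503 responses, but no completed request goes unbilled.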

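| Tier | Monthly token quota | Rate limit | Price multiplier |
|---|---|---|---|
| Free | 1,000,000 | 20 req/min | 1.0x |
| Pro | 50,000,000 | 120 req/min | 0.8x |
| Enterprise | Custom | Custom | Negotiated pricing |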

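7.4 Example Model Pricing Table

| Model | Provider | Prompt ($/MTok) | Completion ($/MTok) | Cache Read ($/MTok) | Status |
|---|---|---|---|---|---|
| deepseek-chat | DeepSeek | 0.14 | 0.28 | 0.014 | Integrated in MVP v0.1 |
| gpt-4o | OpenAI | 2.50 | 10.00 | 1.25 | Reserved; planned for Phase 1.1 |
| gpt-4o-mini | OpenAI | 0.15 | 0.60 | 0.075 | Reserved; planned for Phase 1.1 |
| claude-sonnet-4-20250514 | Anthropic | 3.00 | 15.00 | 0.30 | Reserved; planned for Phase 1.1 |
| claude-haiku-4-20250414 | Anthropic | 0.80 | 4.00 | 0.08 | Reserved; planned for Phase 1.1 |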

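8. Monitoring and Observability

8.1 Prometheus Metrics (10)

| Metric | Type | Labels | Description |
|---|---|---|---|
| tokenrouter_requests_total | Counter | model, provider, status | Total requests |
| tokenrouter_request_duration_seconds | Histogram | model, provider | Request latency distribution |
| tokenrouter_tokens_total | Counter | model, type | Tokens consumed |
| tokenrouter_cache_hits_total | Counter | model, provider | Cache hits |
| tokenrouter_cache_misses_total | Counter | model, provider | Cache misses |
| tokenrouter_cost_usd_total | Counter | model | Cumulative cost |
| tokenrouter_active_streams | Gauge | provider | Currently active streaming connections |
| tokenrouter_provider_errors_total | Counter | provider, error_type | Provider errors |
| tokenrouter_quota_usage_percent | Gauge | plan | Quota usage percentage |
| tokenrouter_dedup_hits_total | Counter | model | Requests served by dedup reuse |

8.2 Dashboards (5)

| Dashboard | Key charts |
|---|---|
| System overview | QPS, P50/P95/P99 latency, error rate, active connections |
| Cache performance | Cache hit rate, tokens saved, cost saved |
| Billing | Real-time cost, cost by model, top 10 users by cost |
| Provider health | Per-provider QPS, error rate, latency comparison |
| Resource usage | CPU/memory/disk, connection pool utilization, goroutine count |

8.3 Alert Rules (5)

| Rule | Condition | Severity |
|---|---|---|
| High error rate | error_rate > 5% for 5 minutes | Critical |
| Cache hit rate drop | cache_hit_rate < 50% for 15 minutes | Warning |
| Provider unavailable | provider_error_rate > 50% for 2 minutes | Critical |
| Quota nearly exhausted | quota_usage > 90% | Warning |
| Dedup anomaly | dedup_wait_timeout > 1% for 10 minutes | Warning |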

9. Deployment and Operations

9.1 Docker Multi-Stage Build

# Build stage
FROM golang:1.24-alpine AS builder
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=0 GOOS=linux go build -o tokenrouter ./cmd/server

# Runtime stage
FROM alpine:3.19
RUN apk --no-cache add ca-certificates tzdata
WORKDIR /app
COPY --from=builder /app/tokenrouter .
COPY --from=builder /app/migrations ./migrations
COPY --from=builder /app/.env.example .env.example

EXPOSE 8080
CMD ["./tokenrouter"]

9.2 docker-compose

version: '3.8'
services:
  tokenrouter:
    build:
      context: ..
      dockerfile: deployments/docker/Dockerfile
    ports:
      - "8080:8080"
    environment:
      DATABASE_URL: postgres://tokenrouter:tokenrouter@postgres:5432/tokenrouter?sslmode=disable
      REDIS_URL: redis://redis:6379/0
    depends_on:
      postgres:
        condition: service_healthy
      redis:
        condition: service_healthy

  postgres:
    image: postgres:16-alpine
    environment:
      POSTGRES_DB: tokenrouter
      POSTGRES_USER: tokenrouter
      POSTGRES_PASSWORD: tokenrouter
    volumes:
      - pgdata:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U tokenrouter"]
      interval: 5s
      timeout: 5s
      retries: 5

  redis:
    image: redis:7-alpine
    volumes:
      - redisdata:/data
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s

  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml

  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    volumes:
      - grafanadata:/var/lib/grafana

volumes:
  pgdata:
  redisdata:
  grafanadata:

9.3 Kubernetes Deployment

apiVersion: apps/v1
kind: Deployment
metadata:
  name: token-router
spec:
  replicas: 3
  selector:
    matchLabels:
      app: token-router
  template:
    metadata:
      labels:
        app: token-router
    spec:
      containers:
        - name: token-router
          image: tokenrouter/server:latest
          ports:
            - containerPort: 8080
          envFrom:
            - configMapRef:
                name: token-router-config
            - secretRef:
                name: token-router-secrets
          resources:
            requests:
              cpu: 250m
              memory: 256Mi
            limits:
              cpu: "1"
              memory: 512Mi
          livenessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 10
            periodSeconds: 15
          readinessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 5
            periodSeconds: 10
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: token-router-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: token-router
  minReplicas: 3
  maxReplicas: 20
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Pods
      pods:
        metric:
          name: tokenrouter_active_streams
        target:
          type: AverageValue
          averageValue: "500"

9.4 CI/CD Pipeline

# .github/workflows/ci.yml
name: CI/CD
on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-go@v5
        with:
          go-version: '1.24'
      - run: go test ./... -race -coverprofile=coverage.out
      - run: go vet ./...

  build:
    needs: test
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Build Docker image
        run: docker build -f deployments/docker/Dockerfile -t tokenrouter/server:${{ github.sha }} .

  deploy:
    needs: build
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    steps:
      - name: Deploy to K8s
        run: |
          kubectl set image deployment/token-router \
            token-router=tokenrouter/server:${{ github.sha }}