diff --git a/docs/architecture/Worker-Gateway-Design.md b/docs/architecture/Worker-Gateway-Design.md index a6f4a8cd..e11d00b1 100644 --- a/docs/architecture/Worker-Gateway-Design.md +++ b/docs/architecture/Worker-Gateway-Design.md @@ -241,7 +241,8 @@ CREATE TABLE sessions ( | ------- | ----------------------------------------------- | ---------------------------- | | Session | `IDLE` 超过 `idle_timeout`(默认 60min) | → `TERMINATED`,清理 runtime | | Session | 总存活超过 `max_lifetime`(默认 24h) | → `TERMINATED` | -| Session | `TERMINATED` 超过 `retention_period`(默认 7d) | → `DELETED`(删除 DB 记录) | +| Session | `TERMINATED` cron 类超过 `cron_term_retention`(默认 24h) | → `DELETED`(删除 DB 记录) | +| Session | `TERMINATED` 其他超过 `term_retention`(默认 7d) | → `DELETED`(删除 DB 记录) | GC 扫描间隔:60s,后台 goroutine 定期执行。 diff --git a/docs/explanation/session-lifecycle.md b/docs/explanation/session-lifecycle.md index bd71082a..3afc6545 100644 --- a/docs/explanation/session-lifecycle.md +++ b/docs/explanation/session-lifecycle.md @@ -184,7 +184,13 @@ IDLE session 的 idle_expires_at = entered_idle_at + IdleTimeout。 到期后 TERMINATED,回收暂停的 Worker 进程。 ``` -**特别注意**:GC 不执行物理删除(`DELETE FROM sessions`)。TERMINATED 的记录保留作为"resume 决策标志"——它们的存在告诉 Gateway 可以尝试 `--resume` 恢复对话历史。 +**第 3 层:TERMINATED 记录清理** +``` +TERMINATED session 按 source 差异化保留: + cron 类:updated_at ≤ now - cron_term_retention(默认 24h)→ DELETE + 其他:updated_at ≤ now - term_retention(默认 7d)→ DELETE +``` +TERMINATED 记录在保留期内作为"resume 决策标志"——它们的存在告诉 Gateway 可以尝试 `--resume` 恢复对话历史。 ### Fast Reconnect 优化 diff --git a/docs/reference/configuration.md b/docs/reference/configuration.md index 874e52a1..d6d8630d 100644 --- a/docs/reference/configuration.md +++ b/docs/reference/configuration.md @@ -182,6 +182,8 @@ Session 生命周期管理配置。 | 字段 | 类型 | 默认值 | 环境变量 | 说明 | |------|------|--------|----------|------| | `retention_period` | duration | `168h` (7天) | `HOTPLEX_SESSION_RETENTION_PERIOD` | 事件和日志的数据保留期。过期数据由 GC 扫描清理 | +| `term_retention` | duration | `168h` (7天) | `HOTPLEX_SESSION_TERM_RETENTION` | 普通 TERMINATED session 数据库记录保留期。过期后物理删除 | +| `cron_term_retention` | duration | `24h` | `HOTPLEX_SESSION_CRON_TERM_RETENTION` | Cron 类 TERMINATED session 数据库记录保留期。过期后物理删除 | | `gc_scan_interval` | duration | `1m` | — | GC 扫描间隔。定期扫描过期 Session 并清理 | | `max_concurrent` | int | `1000` | `HOTPLEX_SESSION_MAX_CONCURRENT` | 最大并发 Session 数。达到上限后新请求被拒绝 | diff --git a/docs/reference/glossary.md b/docs/reference/glossary.md index 02d6753e..b47c6928 100644 --- a/docs/reference/glossary.md +++ b/docs/reference/glossary.md @@ -97,7 +97,7 @@ Session 垃圾回收机制(`internal/session/manager.go`)。定期扫描( - **IDLE 超时**:`idle_expires_at ≤ now` → TERMINATED - **最大生命周期**:`expires_at ≤ now` → TERMINATED - **僵尸检测**:RUNNING session 的 `LastIO()` 超过 execution_timeout(默认 30 分钟)→ TERMINATED -- **保留期清理**:TERMINATED session 的 `updated_at ≤ now - retention_period` → DELETE +- **保留期清理**:TERMINATED session 按 source 差异化保留:cron 类保留 24h、其他保留 7d(`cron_term_retention` / `term_retention`),过期后 `DELETE FROM sessions` --- diff --git a/docs/specs/Turn-Summary-Spec.md b/docs/specs/Turn-Summary-Spec.md index 95823f21..b910e4bc 100644 --- a/docs/specs/Turn-Summary-Spec.md +++ b/docs/specs/Turn-Summary-Spec.md @@ -217,7 +217,7 @@ type TurnSummaryData struct { TotalCostUSD float64 // 额外实现字段 - ContextFill int64 + ContextFill int64 // 仅从控制通道获取,消除多步 turn 累计膨胀 TotalOutputTok int64 SessionDuration string WorkDir string diff --git a/e2e/helper_test.go b/e2e/helper_test.go index 182d6d75..ee6fde45 100644 --- a/e2e/helper_test.go +++ b/e2e/helper_test.go @@ -258,8 +258,8 @@ func (m *mockStore) GetExpiredIdle(ctx context.Context, now time.Time) ([]string return args.Get(0).([]string), args.Error(1) } -func (m *mockStore) DeleteTerminated(ctx context.Context, cutoff time.Time) error { - args := m.Called(ctx, cutoff) +func (m *mockStore) DeleteTerminated(ctx context.Context, cronCutoff, defaultCutoff time.Time) error { + args := m.Called(ctx, cronCutoff, defaultCutoff) return args.Error(0) } @@ -320,7 +320,7 @@ func setupTestGateway(t *testing.T) *testGateway { store.On("Close").Return(nil) store.On("GetExpiredMaxLifetime", mock.Anything, mock.AnythingOfType("time.Time")).Return([]string{}, nil) store.On("GetExpiredIdle", mock.Anything, mock.AnythingOfType("time.Time")).Return([]string{}, nil) - store.On("DeleteTerminated", mock.Anything, mock.AnythingOfType("time.Time")).Return(nil) + store.On("DeleteTerminated", mock.Anything, mock.AnythingOfType("time.Time"), mock.AnythingOfType("time.Time")).Return(nil) store.On("List", mock.Anything, mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("int"), mock.AnythingOfType("int")).Return([]*session.SessionInfo{}, nil) // Get falls back to store when session is not in Manager's in-memory map. // Return not-found for all store lookups (Manager holds sessions in memory after Create). diff --git a/internal/config/config.go b/internal/config/config.go index 780d663c..847e0d75 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -503,9 +503,11 @@ type SecurityConfig struct { // SessionConfig holds session lifecycle settings. type SessionConfig struct { - RetentionPeriod time.Duration `mapstructure:"retention_period"` - GCScanInterval time.Duration `mapstructure:"gc_scan_interval"` - MaxConcurrent int `mapstructure:"max_concurrent"` + RetentionPeriod time.Duration `mapstructure:"retention_period"` // max session lifetime (default 7d) + GCScanInterval time.Duration `mapstructure:"gc_scan_interval"` // GC scan interval (default 1m) + MaxConcurrent int `mapstructure:"max_concurrent"` // max concurrent sessions + TermRetention time.Duration `mapstructure:"term_retention"` // DB retention for terminated sessions (default 7d) + CronTermRetention time.Duration `mapstructure:"cron_term_retention"` // DB retention for terminated cron sessions (default 24h) } // PoolConfig holds session pool settings. @@ -611,9 +613,11 @@ func Default() *Config { AllowedOrigins: []string{"*"}, }, Session: SessionConfig{ - RetentionPeriod: 7 * 24 * time.Hour, - GCScanInterval: 1 * time.Minute, - MaxConcurrent: 1000, + RetentionPeriod: 7 * 24 * time.Hour, + GCScanInterval: 1 * time.Minute, + MaxConcurrent: 1000, + TermRetention: 7 * 24 * time.Hour, + CronTermRetention: 24 * time.Hour, }, Pool: PoolConfig{ MinSize: 0, @@ -818,6 +822,8 @@ func Load(filePath string) (*Config, error) { _ = v.BindEnv("admin.addr") _ = v.BindEnv("session.max_concurrent") _ = v.BindEnv("session.retention_period") + _ = v.BindEnv("session.term_retention") + _ = v.BindEnv("session.cron_term_retention") _ = v.BindEnv("pool.max_size") _ = v.BindEnv("pool.max_idle_per_user") _ = v.BindEnv("pool.max_memory_per_user") diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 5d3796d7..a3fa2c92 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -21,6 +21,8 @@ func TestDefault(t *testing.T) { require.Equal(t, 5, cfg.Pool.MaxIdlePerUser) require.Equal(t, 7*24*time.Hour, cfg.Session.RetentionPeriod) require.Equal(t, 1*time.Minute, cfg.Session.GCScanInterval) + require.Equal(t, 7*24*time.Hour, cfg.Session.TermRetention) + require.Equal(t, 24*time.Hour, cfg.Session.CronTermRetention) require.False(t, cfg.Security.TLSEnabled) require.True(t, cfg.Admin.Enabled) require.Equal(t, "localhost:9999", cfg.Admin.Addr) diff --git a/internal/gateway/bridge_forward.go b/internal/gateway/bridge_forward.go index c19f2faf..60f76fe8 100644 --- a/internal/gateway/bridge_forward.go +++ b/internal/gateway/bridge_forward.go @@ -174,7 +174,7 @@ func (b *Bridge) forwardEvents(w worker.Worker, sessionID string, opts forwardOp acc.computePerTurnDeltas() // Query precise context usage from worker via control channel. - // Silently falls back to aggregated Done stats on failure. + // ContextFill stays 0 on failure — no inflated fallback. if cr, ok := w.(worker.ControlRequester); ok { fetchContextUsage(cr, acc) } diff --git a/internal/gateway/conn_test.go b/internal/gateway/conn_test.go index 9dad81ca..96914d7e 100644 --- a/internal/gateway/conn_test.go +++ b/internal/gateway/conn_test.go @@ -484,8 +484,8 @@ func (m *mockSessionStoreForBotID) GetExpiredIdle(ctx context.Context, now time. return args.Get(0).([]string), args.Error(1) } -func (m *mockSessionStoreForBotID) DeleteTerminated(ctx context.Context, cutoff time.Time) error { - args := m.Called(ctx, cutoff) +func (m *mockSessionStoreForBotID) DeleteTerminated(ctx context.Context, cronCutoff, defaultCutoff time.Time) error { + args := m.Called(ctx, cronCutoff, defaultCutoff) return args.Error(0) } diff --git a/internal/gateway/ctrl_test.go b/internal/gateway/ctrl_test.go index 38397a45..c2db7403 100644 --- a/internal/gateway/ctrl_test.go +++ b/internal/gateway/ctrl_test.go @@ -57,8 +57,8 @@ func (m *mockStore) GetExpiredIdle(ctx context.Context, now time.Time) ([]string return args.Get(0).([]string), args.Error(1) } -func (m *mockStore) DeleteTerminated(ctx context.Context, cutoff time.Time) error { - args := m.Called(ctx, cutoff) +func (m *mockStore) DeleteTerminated(ctx context.Context, cronCutoff, defaultCutoff time.Time) error { + args := m.Called(ctx, cronCutoff, defaultCutoff) return args.Error(0) } diff --git a/internal/gateway/session_stats.go b/internal/gateway/session_stats.go index ac47a303..f286a029 100644 --- a/internal/gateway/session_stats.go +++ b/internal/gateway/session_stats.go @@ -16,8 +16,8 @@ type sessionAccumulator struct { TotalCostUSD float64 TotalInput int64 // cumulative input tokens consumed across turns TotalOutput int64 - ContextWindow int64 // from modelUsage.contextWindow (0 = unknown) - ContextFill int64 // latest turn's input_tokens (total including cache, bounded by ContextWindow) + ContextWindow int64 // from modelUsage.contextWindow or get_context_usage.maxTokens (0 = unknown) + ContextFill int64 // context window fill from get_context_usage control channel (0 if unavailable) ModelName string // first model seen StartedAt time.Time WorkDir string // session working directory @@ -58,7 +58,6 @@ func (a *sessionAccumulator) mergePerTurnStats(data events.DoneData) { events.ToInt64(usage["cache_creation_input_tokens"]) + events.ToInt64(usage["cache_read_input_tokens"]) a.TotalInput += input - a.ContextFill = input a.TotalOutput += events.ToInt64(usage["output_tokens"]) a.TotalCacheWrite += events.ToInt64(usage["cache_creation_input_tokens"]) a.TotalCacheRead += events.ToInt64(usage["cache_read_input_tokens"]) @@ -68,7 +67,6 @@ func (a *sessionAccumulator) mergePerTurnStats(data events.DoneData) { events.ToInt64(tokens["cache_read"]) + events.ToInt64(tokens["cache_write"]) a.TotalInput += input - a.ContextFill = input a.TotalOutput += events.ToInt64(tokens["output"]) a.TotalCacheWrite += events.ToInt64(tokens["cache_write"]) a.TotalCacheRead += events.ToInt64(tokens["cache_read"]) @@ -125,10 +123,11 @@ func (a *sessionAccumulator) resetPerTurn() { a.PerTurnCacheWrite = 0 a.PerTurnCacheRead = 0 a.TurnDurationMs = 0 + a.ContextFill = 0 } -// mergeContextUsage updates ContextFill, ContextWindow, and ModelName from precise worker control data. -// Called after get_context_usage returns; overrides the aggregated Done event values. +// mergeContextUsage sets ContextFill, ContextWindow, and ModelName from the worker's +// get_context_usage control channel response. This is the sole source for ContextFill. // Supports partial data: updates ModelName even when MaxTokens is 0 (OCS scenario). func (a *sessionAccumulator) mergeContextUsage(cu *events.ContextUsageData) { if cu == nil { @@ -146,7 +145,7 @@ func (a *sessionAccumulator) mergeContextUsage(cu *events.ContextUsageData) { } // computeContextPct returns context window usage percentage (0-100). -// Data comes from get_context_usage control channel (precise) or Done event usage (fallback). +// Returns 0 if either ContextFill or ContextWindow is unset (control channel unavailable). func (a *sessionAccumulator) computeContextPct() float64 { if a.ContextWindow <= 0 || a.ContextFill <= 0 { return 0 diff --git a/internal/gateway/session_stats_test.go b/internal/gateway/session_stats_test.go index f1aa9ac1..badb4400 100644 --- a/internal/gateway/session_stats_test.go +++ b/internal/gateway/session_stats_test.go @@ -31,7 +31,7 @@ func TestSessionAccumulator_MergePerTurnStats(t *testing.T) { }) require.Equal(t, int64(23434), acc.TotalInput) - require.Equal(t, int64(23434), acc.ContextFill) + require.Equal(t, int64(0), acc.ContextFill, "ContextFill must not be set from Done event usage") require.Equal(t, int64(3821), acc.TotalOutput) require.Equal(t, int64(200000), acc.ContextWindow) require.Equal(t, "Sonnet", acc.ModelName) @@ -53,7 +53,7 @@ func TestSessionAccumulator_MergePerTurnStats(t *testing.T) { }) require.Equal(t, int64(8400+2000+500), acc.TotalInput) - require.Equal(t, int64(8400+2000+500), acc.ContextFill) + require.Equal(t, int64(0), acc.ContextFill, "ContextFill must not be set from Done event tokens") require.Equal(t, int64(3634), acc.TotalOutput) require.InDelta(t, 0.0234, acc.TotalCostUSD, 0.0001) }) @@ -80,13 +80,13 @@ func TestSessionAccumulator_MergePerTurnStats(t *testing.T) { }, }, }) - require.Equal(t, int64(90000), acc.ContextFill, "ContextFill = input + cache_creation + cache_read") + require.Equal(t, int64(0), acc.ContextFill, "ContextFill must not be set from Done event usage") require.Equal(t, int64(90000), acc.TotalInput, "TotalInput = input + cache_creation + cache_read") pct := acc.computeContextPct() - require.Equal(t, 45.0, pct, "context % must be 90000/200000 = 45%%") + require.Equal(t, 0.0, pct, "context % must be 0 when ContextFill is not set from control channel") }) - t.Run("context fill overwritten by latest turn", func(t *testing.T) { + t.Run("total input accumulates across turns, context fill only from control channel", func(t *testing.T) { acc := &sessionAccumulator{StartedAt: time.Now()} // Turn 1 @@ -104,10 +104,18 @@ func TestSessionAccumulator_MergePerTurnStats(t *testing.T) { "total_cost_usd": 0.05, }, }) - require.Equal(t, int64(10000), acc.ContextFill) + require.Equal(t, int64(0), acc.ContextFill, "ContextFill not set by mergePerTurnStats") require.Equal(t, int64(10000), acc.TotalInput) - // Turn 2: smaller input — ContextFill reflects latest, TotalInput accumulates + // Simulate get_context_usage override for Turn 1 + acc.mergeContextUsage(&events.ContextUsageData{TotalTokens: 58000, MaxTokens: 200000}) + require.Equal(t, int64(58000), acc.ContextFill, "ContextFill set by mergeContextUsage") + + // resetPerTurn clears ContextFill + acc.resetPerTurn() + require.Equal(t, int64(0), acc.ContextFill, "ContextFill cleared by resetPerTurn") + + // Turn 2: smaller input — TotalInput accumulates, ContextFill stays 0 until control channel acc.mergePerTurnStats(events.DoneData{ Stats: map[string]any{ "usage": map[string]any{ @@ -118,7 +126,7 @@ func TestSessionAccumulator_MergePerTurnStats(t *testing.T) { }, }) - require.Equal(t, int64(5000), acc.ContextFill) // latest turn only + require.Equal(t, int64(0), acc.ContextFill) // not set from Done event require.Equal(t, int64(15000), acc.TotalInput) // cumulative require.Equal(t, int64(3000), acc.TotalOutput) require.Equal(t, int64(200000), acc.ContextWindow) diff --git a/internal/session/manager.go b/internal/session/manager.go index cfa444de..3515b1a9 100644 --- a/internal/session/manager.go +++ b/internal/session/manager.go @@ -60,6 +60,11 @@ type Manager struct { // After this duration, they are evicted from the in-memory map (DB records preserved). const terminatedSessionTTL = 24 * time.Hour +// Session source constants — used for differential DB retention. +const ( + SourceCron = "cron" // cron-triggered session (24h retention) +) + // runningIndex helpers — use riMu (independent of m.mu/ms.mu) to avoid lock ordering issues. func (m *Manager) addToRunningIndex(id string) { @@ -137,6 +142,9 @@ type SessionInfo struct { // Title is the user-facing session name. Used as DeriveSessionKey input for WebChat sessions. // Empty for Slack/Feishu sessions (they use DerivePlatformSessionKey instead). Title string `json:"title,omitempty"` + // Source identifies the session origin: "" (user-initiated) or "cron" (cron-triggered). + // Used for differential DB retention — cron sessions are cleaned up after 24h vs 7d for normal. + Source string `json:"source,omitempty"` } // NewManager creates a new session manager using the provided Store. @@ -175,6 +183,10 @@ func (m *Manager) Create(ctx context.Context, id, userID string, workerType work // CreateWithBot creates a new session with explicit bot_id and persists it to SQLite. func (m *Manager) CreateWithBot(ctx context.Context, id, userID, botID string, workerType worker.WorkerType, allowedTools []string, platform string, platformKey map[string]string, workDir, title string) (*SessionInfo, error) { now := time.Now() + source := "" + if _, isCron := platformKey["cron_job_id"]; isCron { + source = SourceCron + } info := &SessionInfo{ ID: id, UserID: userID, @@ -189,6 +201,7 @@ func (m *Manager) CreateWithBot(ctx context.Context, id, userID, botID string, w PlatformKey: platformKey, WorkDir: workDir, Title: title, + Source: source, } if err := m.store.Upsert(ctx, info); err != nil { @@ -1067,6 +1080,17 @@ func (m *Manager) gc(ctx context.Context) { m.log.Info("session: gc evicted TERMINATED sessions from memory", "count", evicted, "ttl", terminatedSessionTTL) } + // 4. Delete old TERMINATED sessions from DB with source-based retention. + // Cron sessions: CronTermRetention, normal sessions: TermRetention. Events are not cascaded. + cfg := m.cfg + if m.cfgStore != nil { + cfg = m.cfgStore.Load() + } + cronCutoff := now.Add(-cfg.Session.CronTermRetention) + defaultCutoff := now.Add(-cfg.Session.TermRetention) + if err := m.store.DeleteTerminated(ctx, cronCutoff, defaultCutoff); err != nil { + m.log.Error("session: gc (delete_terminated) failed", "err", err) + } } // ─── Helpers ───────────────────────────────────────────────────────────────── diff --git a/internal/session/manager_test.go b/internal/session/manager_test.go index 74f55544..dcb6fec8 100644 --- a/internal/session/manager_test.go +++ b/internal/session/manager_test.go @@ -54,8 +54,8 @@ func (m *mockStore) GetExpiredIdle(ctx context.Context, now time.Time) ([]string return args.Get(0).([]string), args.Error(1) } -func (m *mockStore) DeleteTerminated(ctx context.Context, cutoff time.Time) error { - args := m.Called(ctx, cutoff) +func (m *mockStore) DeleteTerminated(ctx context.Context, cronCutoff, defaultCutoff time.Time) error { + args := m.Called(ctx, cronCutoff, defaultCutoff) return args.Error(0) } @@ -1433,7 +1433,7 @@ func TestManager_GC_ZombieDetection(t *testing.T) { store.On("Upsert", mock.Anything, mock.AnythingOfType("*session.SessionInfo")).Return(nil) store.On("GetExpiredMaxLifetime", mock.Anything, mock.AnythingOfType("time.Time")).Return([]string(nil), nil) store.On("GetExpiredIdle", mock.Anything, mock.AnythingOfType("time.Time")).Return([]string(nil), nil) - store.On("DeleteTerminated", mock.Anything, mock.AnythingOfType("time.Time")).Return(nil) + store.On("DeleteTerminated", mock.Anything, mock.AnythingOfType("time.Time"), mock.AnythingOfType("time.Time")).Return(nil) m.gc(ctx) @@ -1476,7 +1476,7 @@ func TestManager_GC_NoZombieWhenRecentIO(t *testing.T) { store.On("GetExpiredMaxLifetime", mock.Anything, mock.AnythingOfType("time.Time")).Return([]string(nil), nil) store.On("GetExpiredIdle", mock.Anything, mock.AnythingOfType("time.Time")).Return([]string(nil), nil) - store.On("DeleteTerminated", mock.Anything, mock.AnythingOfType("time.Time")).Return(nil) + store.On("DeleteTerminated", mock.Anything, mock.AnythingOfType("time.Time"), mock.AnythingOfType("time.Time")).Return(nil) m.gc(ctx) @@ -1500,7 +1500,7 @@ func TestManager_GC_ExpiredMaxLifetime(t *testing.T) { Return([]string{"sess_maxlife"}, nil) store.On("GetExpiredIdle", mock.Anything, mock.AnythingOfType("time.Time")). Return([]string(nil), nil) - store.On("DeleteTerminated", mock.Anything, mock.AnythingOfType("time.Time")). + store.On("DeleteTerminated", mock.Anything, mock.AnythingOfType("time.Time"), mock.AnythingOfType("time.Time")). Return(nil) store.On("Close").Return(nil) m, err := NewManager(ctx, nil, cfg, nil, store) @@ -1541,7 +1541,7 @@ func TestManager_GC_ExpiredIdleTimeout(t *testing.T) { Return([]string(nil), nil) store.On("GetExpiredIdle", mock.Anything, mock.AnythingOfType("time.Time")). Return([]string{"sess_idle_exp"}, nil) - store.On("DeleteTerminated", mock.Anything, mock.AnythingOfType("time.Time")). + store.On("DeleteTerminated", mock.Anything, mock.AnythingOfType("time.Time"), mock.AnythingOfType("time.Time")). Return(nil) store.On("Close").Return(nil) m, err := NewManager(ctx, nil, cfg, nil, store) @@ -1581,8 +1581,7 @@ func TestManager_GC_NoRetentionCleanup(t *testing.T) { Return([]string(nil), nil) store.On("GetExpiredIdle", mock.Anything, mock.AnythingOfType("time.Time")). Return([]string(nil), nil) - // DeleteTerminated is NOT expected — retention cleanup is intentionally - // removed so that TERMINATED records serve as "resume decision flags". + store.On("DeleteTerminated", mock.Anything, mock.AnythingOfType("time.Time"), mock.AnythingOfType("time.Time")).Return(nil) store.On("Close").Return(nil) m, err := NewManager(ctx, nil, cfg, nil, store) @@ -1603,6 +1602,7 @@ func TestManager_GC_TerminatedSessionPreserved(t *testing.T) { Return([]string(nil), nil) store.On("GetExpiredIdle", mock.Anything, mock.AnythingOfType("time.Time")). Return([]string(nil), nil) + store.On("DeleteTerminated", mock.Anything, mock.AnythingOfType("time.Time"), mock.AnythingOfType("time.Time")).Return(nil) store.On("Close").Return(nil) m, err := NewManager(ctx, nil, cfg, nil, store) @@ -1646,14 +1646,14 @@ func TestManager_GC_TerminatedSession_DBError_NoImpact(t *testing.T) { store := new(mockStore) store.Test(t) - // Even if the store had a DeleteTerminated method that errored, GC should - // be unaffected because retention cleanup is no longer performed. + // DeleteTerminated error is logged but does not propagate. store.On("GetExpiredMaxLifetime", mock.Anything, mock.AnythingOfType("time.Time")). Return([]string(nil), errors.New("db error")) store.On("GetExpiredIdle", mock.Anything, mock.AnythingOfType("time.Time")). Return([]string(nil), errors.New("db error")) store.On("Close").Return(nil) + store.On("DeleteTerminated", mock.Anything, mock.AnythingOfType("time.Time"), mock.AnythingOfType("time.Time")).Return(errors.New("db error")) m, err := NewManager(ctx, nil, cfg, nil, store) require.NoError(t, err) @@ -1694,7 +1694,7 @@ func TestManager_GC_NoPanicOnStoreErrors(t *testing.T) { Return([]string(nil), errors.New("db error")) store.On("GetExpiredIdle", mock.Anything, mock.AnythingOfType("time.Time")). Return([]string(nil), errors.New("db error")) - // DeleteTerminated no longer called — retention cleanup removed. + store.On("DeleteTerminated", mock.Anything, mock.AnythingOfType("time.Time"), mock.AnythingOfType("time.Time")).Return(errors.New("db error")) store.On("Close").Return(nil) m, err := NewManager(ctx, nil, cfg, nil, store) @@ -2344,6 +2344,7 @@ func TestGC_EvictsOldTerminatedSessions(t *testing.T) { store.On("GetExpiredMaxLifetime", mock.Anything, mock.AnythingOfType("time.Time")).Return([]string(nil), nil) store.On("GetExpiredIdle", mock.Anything, mock.AnythingOfType("time.Time")).Return([]string(nil), nil) store.On("Close").Return(nil) + store.On("DeleteTerminated", mock.Anything, mock.AnythingOfType("time.Time"), mock.AnythingOfType("time.Time")).Return(nil) m, err := NewManager(ctx, nil, cfg, nil, store) require.NoError(t, err) diff --git a/internal/session/sql/migrations/011_session_source.sql b/internal/session/sql/migrations/011_session_source.sql new file mode 100644 index 00000000..e2357a5c --- /dev/null +++ b/internal/session/sql/migrations/011_session_source.sql @@ -0,0 +1,8 @@ +-- +goose Up +ALTER TABLE sessions ADD COLUMN source TEXT NOT NULL DEFAULT '' CHECK(source IN ('', 'cron')); +CREATE INDEX IF NOT EXISTS idx_sessions_source_state_updated ON sessions(source, state, updated_at); + +-- +goose Down +DROP INDEX IF EXISTS idx_sessions_source_state_updated; +-- Note: ALTER TABLE DROP COLUMN is not supported before SQLite 3.35.0 (2020-12-01). +-- To fully revert, recreate the sessions table without the source column. diff --git a/internal/session/sql/queries/sessions.upsert_session.sql b/internal/session/sql/queries/sessions.upsert_session.sql index e8742e63..9b559059 100644 --- a/internal/session/sql/queries/sessions.upsert_session.sql +++ b/internal/session/sql/queries/sessions.upsert_session.sql @@ -1,5 +1,5 @@ -INSERT INTO sessions (id, user_id, owner_id, bot_id, worker_session_id, worker_type, state, platform, platform_key_json, work_dir, title, created_at, updated_at, expires_at, idle_expires_at, context_json) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) +INSERT INTO sessions (id, user_id, owner_id, bot_id, worker_session_id, worker_type, state, platform, platform_key_json, work_dir, title, created_at, updated_at, expires_at, idle_expires_at, context_json, source) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET state=excluded.state, owner_id=CASE WHEN excluded.owner_id != '' THEN excluded.owner_id ELSE sessions.owner_id END, @@ -7,4 +7,5 @@ INSERT INTO sessions (id, user_id, owner_id, bot_id, worker_session_id, worker_t expires_at=excluded.expires_at, idle_expires_at=excluded.idle_expires_at, title=CASE WHEN excluded.title != '' THEN excluded.title ELSE sessions.title END, - context_json=excluded.context_json; + context_json=excluded.context_json, + source=CASE WHEN excluded.source != '' THEN excluded.source ELSE sessions.source END; diff --git a/internal/session/sql/queries/store.delete_terminated.sql b/internal/session/sql/queries/store.delete_terminated.sql index 26760a39..c55ad2e5 100644 --- a/internal/session/sql/queries/store.delete_terminated.sql +++ b/internal/session/sql/queries/store.delete_terminated.sql @@ -1,2 +1,7 @@ --- delete_terminated deletes terminated sessions older than cutoff. -DELETE FROM sessions WHERE state=? AND updated_at <= ?; +-- delete_terminated removes terminated sessions older than the respective cutoffs. +-- cronCutoff applies to source='cron'; defaultCutoff applies to all other sessions. +-- Events lifecycle is managed independently — session deletion does not cascade. +DELETE FROM sessions WHERE state = ? AND ( + (source = 'cron' AND updated_at <= ?) OR + (source != 'cron' AND updated_at <= ?) +); diff --git a/internal/session/sql/queries/store.get_session.sql b/internal/session/sql/queries/store.get_session.sql index ee8d62b2..09e1a262 100644 --- a/internal/session/sql/queries/store.get_session.sql +++ b/internal/session/sql/queries/store.get_session.sql @@ -1,3 +1,3 @@ --- upsert_session inserts or updates a session record. -SELECT id, user_id, COALESCE(owner_id, user_id), worker_session_id, worker_type, state, bot_id, platform, platform_key_json, COALESCE(work_dir, ''), COALESCE(title, ''), created_at, updated_at, expires_at, idle_expires_at, context_json +-- get_session loads a session by ID. +SELECT id, user_id, COALESCE(owner_id, user_id), worker_session_id, worker_type, state, bot_id, platform, platform_key_json, COALESCE(work_dir, ''), COALESCE(title, ''), created_at, updated_at, expires_at, idle_expires_at, context_json, source FROM sessions WHERE id = ?; diff --git a/internal/session/sql/queries/store.list_sessions.sql b/internal/session/sql/queries/store.list_sessions.sql index f9aade54..ae8e6d4b 100644 --- a/internal/session/sql/queries/store.list_sessions.sql +++ b/internal/session/sql/queries/store.list_sessions.sql @@ -1,6 +1,6 @@ -- list_sessions lists sessions with pagination, excluding soft-deleted. -- Filters by user_id and platform if provided. -SELECT id, user_id, COALESCE(owner_id, user_id), worker_session_id, worker_type, state, bot_id, platform, platform_key_json, COALESCE(work_dir, ''), COALESCE(title, ''), created_at, updated_at, expires_at, idle_expires_at, context_json +SELECT id, user_id, COALESCE(owner_id, user_id), worker_session_id, worker_type, state, bot_id, platform, platform_key_json, COALESCE(work_dir, ''), COALESCE(title, ''), created_at, updated_at, expires_at, idle_expires_at, context_json, source FROM sessions WHERE state != 'deleted' AND (? = '' OR user_id = ?) diff --git a/internal/session/store.go b/internal/session/store.go index 6998baac..eba28fb4 100644 --- a/internal/session/store.go +++ b/internal/session/store.go @@ -20,7 +20,7 @@ type Store interface { List(ctx context.Context, userID, platform string, limit, offset int) ([]*SessionInfo, error) GetExpiredMaxLifetime(ctx context.Context, now time.Time) ([]string, error) GetExpiredIdle(ctx context.Context, now time.Time) ([]string, error) - DeleteTerminated(ctx context.Context, cutoff time.Time) error + DeleteTerminated(ctx context.Context, cronCutoff, defaultCutoff time.Time) error DeletePhysical(ctx context.Context, id string) error Compact(ctx context.Context, threshold float64) error GetSessionsByState(ctx context.Context, state events.SessionState) ([]string, error) @@ -86,7 +86,7 @@ func (s *SQLiteStore) Upsert(ctx context.Context, info *SessionInfo) error { info.ID, info.UserID, info.OwnerID, info.BotID, info.WorkerSessionID, info.WorkerType, string(info.State), info.Platform, string(platformKeyJSON), info.WorkDir, info.Title, info.CreatedAt, info.UpdatedAt, info.ExpiresAt, info.IdleExpiresAt, - string(ctxJSON), + string(ctxJSON), info.Source, ) if err != nil { return fmt.Errorf("session store: upsert: %w", err) @@ -105,7 +105,7 @@ func scanSession(sc rowScanner) (*SessionInfo, error) { err := sc.Scan( &info.ID, &info.UserID, &info.OwnerID, &info.WorkerSessionID, &info.WorkerType, &info.State, &info.BotID, &info.Platform, &platformKeyStr, &info.WorkDir, &info.Title, - &createdAt, &updatedAt, &expiresAt, &idleExpiresAt, &ctxJSON, + &createdAt, &updatedAt, &expiresAt, &idleExpiresAt, &ctxJSON, &info.Source, ) if err != nil { return nil, err @@ -196,8 +196,8 @@ func (s *SQLiteStore) GetExpiredIdle(ctx context.Context, now time.Time) ([]stri } // Events lifecycle is managed independently — session deletion does not cascade to events. -func (s *SQLiteStore) DeleteTerminated(ctx context.Context, cutoff time.Time) error { - _, err := s.db.ExecContext(ctx, queries["store.delete_terminated"], events.StateTerminated, cutoff) +func (s *SQLiteStore) DeleteTerminated(ctx context.Context, cronCutoff, defaultCutoff time.Time) error { + _, err := s.db.ExecContext(ctx, queries["store.delete_terminated"], events.StateTerminated, cronCutoff, defaultCutoff) if err != nil { return fmt.Errorf("session store: delete terminated: %w", err) } diff --git a/internal/session/store_test.go b/internal/session/store_test.go index 6f3aecf6..1f62f5da 100644 --- a/internal/session/store_test.go +++ b/internal/session/store_test.go @@ -173,11 +173,46 @@ func TestSQLiteStore_DeleteTerminated(t *testing.T) { store, _ := helperDB(t) ctx := context.Background() - helperUpsert(t, store, "sess_term", "user1", events.StateTerminated) - - cutoff := time.Now().Add(time.Hour) - err := store.DeleteTerminated(ctx, cutoff) + now := time.Now() + // Cron session: terminated 25h ago → should be deleted (cutoff 24h) + require.NoError(t, store.Upsert(ctx, &SessionInfo{ + ID: "cron_old", UserID: "u1", WorkerType: "claude_code", + State: events.StateTerminated, Source: SourceCron, + CreatedAt: now.Add(-25 * time.Hour), UpdatedAt: now.Add(-25 * time.Hour), + })) + // Normal session: terminated 8d ago → should be deleted (cutoff 7d) + require.NoError(t, store.Upsert(ctx, &SessionInfo{ + ID: "normal_old", UserID: "u1", WorkerType: "claude_code", + State: events.StateTerminated, + CreatedAt: now.Add(-8 * 24 * time.Hour), UpdatedAt: now.Add(-8 * 24 * time.Hour), + })) + // Cron session: terminated 12h ago → should survive (cutoff 24h) + require.NoError(t, store.Upsert(ctx, &SessionInfo{ + ID: "cron_recent", UserID: "u1", WorkerType: "claude_code", + State: events.StateTerminated, Source: SourceCron, + CreatedAt: now.Add(-12 * time.Hour), UpdatedAt: now.Add(-12 * time.Hour), + })) + // Normal session: terminated 3d ago → should survive (cutoff 7d) + require.NoError(t, store.Upsert(ctx, &SessionInfo{ + ID: "normal_recent", UserID: "u1", WorkerType: "claude_code", + State: events.StateTerminated, + CreatedAt: now.Add(-3 * 24 * time.Hour), UpdatedAt: now.Add(-3 * 24 * time.Hour), + })) + + cronCutoff := now.Add(-24 * time.Hour) + defaultCutoff := now.Add(-7 * 24 * time.Hour) + err := store.DeleteTerminated(ctx, cronCutoff, defaultCutoff) require.NoError(t, err) + + _, err = store.Get(ctx, "cron_old") + require.ErrorIs(t, err, ErrSessionNotFound, "old cron session should be deleted") + _, err = store.Get(ctx, "normal_old") + require.ErrorIs(t, err, ErrSessionNotFound, "old normal session should be deleted") + + _, err = store.Get(ctx, "cron_recent") + require.NoError(t, err, "recent cron session should survive") + _, err = store.Get(ctx, "normal_recent") + require.NoError(t, err, "recent normal session should survive") } // ─── SQLiteStore: GetSessionsByState ───────────────────────────────────────── diff --git a/internal/worker/opencodeserver/commands.go b/internal/worker/opencodeserver/commands.go index 25aeffb6..808fa7be 100644 --- a/internal/worker/opencodeserver/commands.go +++ b/internal/worker/opencodeserver/commands.go @@ -110,11 +110,15 @@ func (c *ServerCommander) queryContextUsage(ctx context.Context) (map[string]any return nil, fmt.Errorf("opencode context query: %w", err) } var totalInput, totalOutput, totalReasoning, totalCacheRead, totalCacheWrite int + var lastInput, lastCacheRead, lastCacheWrite int var model string for _, msg := range messages { if msg.Info.Role != "assistant" || msg.Info.Tokens == nil { continue } + lastInput = msg.Info.Tokens.Input + lastCacheRead = msg.Info.Tokens.Cache.Read + lastCacheWrite = msg.Info.Tokens.Cache.Write totalInput += msg.Info.Tokens.Input totalOutput += msg.Info.Tokens.Output totalReasoning += msg.Info.Tokens.Reasoning @@ -124,9 +128,11 @@ func (c *ServerCommander) queryContextUsage(ctx context.Context) (map[string]any model = msg.Info.Model.ProviderID + "/" + msg.Info.Model.ModelID } } - totalTokens := totalInput + totalOutput + totalReasoning + totalCacheRead + totalCacheWrite + // Context fill = last assistant message's total input tokens (input + cache read + cache write). + // This represents the actual context window usage for the most recent API call. + contextFill := lastInput + lastCacheRead + lastCacheWrite return map[string]any{ - "totalTokens": totalTokens, + "totalTokens": contextFill, "maxTokens": 0, "percentage": 0, "model": model, diff --git a/internal/worker/opencodeserver/commands_test.go b/internal/worker/opencodeserver/commands_test.go index afd7ee6e..95a273f0 100644 --- a/internal/worker/opencodeserver/commands_test.go +++ b/internal/worker/opencodeserver/commands_test.go @@ -88,7 +88,7 @@ func TestServerCommanderQueryContextUsage(t *testing.T) { resp, err := c.SendControlRequest(context.Background(), "get_context_usage", nil) require.NoError(t, err) - require.Equal(t, 400, resp["totalTokens"]) + require.Equal(t, 100+30+20, resp["totalTokens"], "totalTokens = last message input + cache_read + cache_write") require.Equal(t, "anthropic/claude-sonnet-4", resp["model"]) require.Len(t, resp["categories"], 5) } @@ -115,7 +115,7 @@ func TestServerCommanderQueryContextUsageMultipleMessages(t *testing.T) { resp, err := c.SendControlRequest(context.Background(), "get_context_usage", nil) require.NoError(t, err) - require.Equal(t, 100+200+50+10+5+50+100+25+5+3, resp["totalTokens"]) + require.Equal(t, 50+5+3, resp["totalTokens"], "totalTokens = last assistant message input + cache_read + cache_write") require.Equal(t, "openai/gpt-4", resp["model"]) categories := resp["categories"].([]map[string]any) require.Len(t, categories, 5) diff --git a/pkg/events/events.go b/pkg/events/events.go index 60f99823..ae073b69 100644 --- a/pkg/events/events.go +++ b/pkg/events/events.go @@ -378,6 +378,11 @@ type WorkerCommandData struct { } // ContextUsageData carries context window usage breakdown from a worker. +// TotalTokens semantics differ by worker: +// - Claude Code: total input tokens (including cache) from the last API call. +// - OCS: last assistant message's input + cache_read + cache_write. +// +// In both cases this represents the actual context window fill, not cumulative totals. type ContextUsageData struct { TotalTokens int `json:"total_tokens"` MaxTokens int `json:"max_tokens"`