Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/architecture/Worker-Gateway-Design.md
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,8 @@ CREATE TABLE sessions (
| ------- | ----------------------------------------------- | ---------------------------- |
| Session | `IDLE` 超过 `idle_timeout`(默认 60min) | → `TERMINATED`,清理 runtime |
| Session | 总存活超过 `max_lifetime`(默认 24h) | → `TERMINATED` |
| Session | `TERMINATED` 超过 `retention_period`(默认 7d) | → `DELETED`(删除 DB 记录) |
| Session | `TERMINATED` cron 类超过 `cron_term_retention`(默认 24h) | → `DELETED`(删除 DB 记录) |
| Session | `TERMINATED` 其他超过 `term_retention`(默认 7d) | → `DELETED`(删除 DB 记录) |

GC 扫描间隔:60s,后台 goroutine 定期执行。

Expand Down
8 changes: 7 additions & 1 deletion docs/explanation/session-lifecycle.md
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,13 @@ IDLE session 的 idle_expires_at = entered_idle_at + IdleTimeout。
到期后 TERMINATED,回收暂停的 Worker 进程。
```

**特别注意**:GC 不执行物理删除(`DELETE FROM sessions`)。TERMINATED 的记录保留作为"resume 决策标志"——它们的存在告诉 Gateway 可以尝试 `--resume` 恢复对话历史。
**第 3 层:TERMINATED 记录清理**
```
TERMINATED session 按 source 差异化保留:
cron 类:updated_at ≤ now - cron_term_retention(默认 24h)→ DELETE
其他:updated_at ≤ now - term_retention(默认 7d)→ DELETE
```
TERMINATED 记录在保留期内作为"resume 决策标志"——它们的存在告诉 Gateway 可以尝试 `--resume` 恢复对话历史。

### Fast Reconnect 优化

Expand Down
2 changes: 2 additions & 0 deletions docs/reference/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ Session 生命周期管理配置。
| 字段 | 类型 | 默认值 | 环境变量 | 说明 |
|------|------|--------|----------|------|
| `retention_period` | duration | `168h` (7天) | `HOTPLEX_SESSION_RETENTION_PERIOD` | 事件和日志的数据保留期。过期数据由 GC 扫描清理 |
| `term_retention` | duration | `168h` (7天) | `HOTPLEX_SESSION_TERM_RETENTION` | 普通 TERMINATED session 数据库记录保留期。过期后物理删除 |
| `cron_term_retention` | duration | `24h` | `HOTPLEX_SESSION_CRON_TERM_RETENTION` | Cron 类 TERMINATED session 数据库记录保留期。过期后物理删除 |
| `gc_scan_interval` | duration | `1m` | — | GC 扫描间隔。定期扫描过期 Session 并清理 |
| `max_concurrent` | int | `1000` | `HOTPLEX_SESSION_MAX_CONCURRENT` | 最大并发 Session 数。达到上限后新请求被拒绝 |

Expand Down
2 changes: 1 addition & 1 deletion docs/reference/glossary.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ Session 垃圾回收机制(`internal/session/manager.go`)。定期扫描(
- **IDLE 超时**:`idle_expires_at ≤ now` → TERMINATED
- **最大生命周期**:`expires_at ≤ now` → TERMINATED
- **僵尸检测**:RUNNING session 的 `LastIO()` 超过 execution_timeout(默认 30 分钟)→ TERMINATED
- **保留期清理**:TERMINATED session 的 `updated_at ≤ now - retention_period` → DELETE
- **保留期清理**:TERMINATED session 按 source 差异化保留:cron 类保留 24h、其他保留 7d(`cron_term_retention` / `term_retention`),过期后 `DELETE FROM sessions`

---

Expand Down
2 changes: 1 addition & 1 deletion docs/specs/Turn-Summary-Spec.md
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ type TurnSummaryData struct {
TotalCostUSD float64

// 额外实现字段
ContextFill int64
ContextFill int64 // 仅从控制通道获取,消除多步 turn 累计膨胀
TotalOutputTok int64
SessionDuration string
WorkDir string
Expand Down
6 changes: 3 additions & 3 deletions e2e/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,8 @@ func (m *mockStore) GetExpiredIdle(ctx context.Context, now time.Time) ([]string
return args.Get(0).([]string), args.Error(1)
}

func (m *mockStore) DeleteTerminated(ctx context.Context, cutoff time.Time) error {
args := m.Called(ctx, cutoff)
func (m *mockStore) DeleteTerminated(ctx context.Context, cronCutoff, defaultCutoff time.Time) error {
args := m.Called(ctx, cronCutoff, defaultCutoff)
return args.Error(0)
}

Expand Down Expand Up @@ -320,7 +320,7 @@ func setupTestGateway(t *testing.T) *testGateway {
store.On("Close").Return(nil)
store.On("GetExpiredMaxLifetime", mock.Anything, mock.AnythingOfType("time.Time")).Return([]string{}, nil)
store.On("GetExpiredIdle", mock.Anything, mock.AnythingOfType("time.Time")).Return([]string{}, nil)
store.On("DeleteTerminated", mock.Anything, mock.AnythingOfType("time.Time")).Return(nil)
store.On("DeleteTerminated", mock.Anything, mock.AnythingOfType("time.Time"), mock.AnythingOfType("time.Time")).Return(nil)
store.On("List", mock.Anything, mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("int"), mock.AnythingOfType("int")).Return([]*session.SessionInfo{}, nil)
// Get falls back to store when session is not in Manager's in-memory map.
// Return not-found for all store lookups (Manager holds sessions in memory after Create).
Expand Down
18 changes: 12 additions & 6 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,9 +503,11 @@ type SecurityConfig struct {

// SessionConfig holds session lifecycle settings.
type SessionConfig struct {
RetentionPeriod time.Duration `mapstructure:"retention_period"`
GCScanInterval time.Duration `mapstructure:"gc_scan_interval"`
MaxConcurrent int `mapstructure:"max_concurrent"`
RetentionPeriod time.Duration `mapstructure:"retention_period"` // max session lifetime (default 7d)
GCScanInterval time.Duration `mapstructure:"gc_scan_interval"` // GC scan interval (default 1m)
MaxConcurrent int `mapstructure:"max_concurrent"` // max concurrent sessions
TermRetention time.Duration `mapstructure:"term_retention"` // DB retention for terminated sessions (default 7d)
CronTermRetention time.Duration `mapstructure:"cron_term_retention"` // DB retention for terminated cron sessions (default 24h)
}

// PoolConfig holds session pool settings.
Expand Down Expand Up @@ -611,9 +613,11 @@ func Default() *Config {
AllowedOrigins: []string{"*"},
},
Session: SessionConfig{
RetentionPeriod: 7 * 24 * time.Hour,
GCScanInterval: 1 * time.Minute,
MaxConcurrent: 1000,
RetentionPeriod: 7 * 24 * time.Hour,
GCScanInterval: 1 * time.Minute,
MaxConcurrent: 1000,
TermRetention: 7 * 24 * time.Hour,
CronTermRetention: 24 * time.Hour,
},
Pool: PoolConfig{
MinSize: 0,
Expand Down Expand Up @@ -818,6 +822,8 @@ func Load(filePath string) (*Config, error) {
_ = v.BindEnv("admin.addr")
_ = v.BindEnv("session.max_concurrent")
_ = v.BindEnv("session.retention_period")
_ = v.BindEnv("session.term_retention")
_ = v.BindEnv("session.cron_term_retention")
_ = v.BindEnv("pool.max_size")
_ = v.BindEnv("pool.max_idle_per_user")
_ = v.BindEnv("pool.max_memory_per_user")
Expand Down
2 changes: 2 additions & 0 deletions internal/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ func TestDefault(t *testing.T) {
require.Equal(t, 5, cfg.Pool.MaxIdlePerUser)
require.Equal(t, 7*24*time.Hour, cfg.Session.RetentionPeriod)
require.Equal(t, 1*time.Minute, cfg.Session.GCScanInterval)
require.Equal(t, 7*24*time.Hour, cfg.Session.TermRetention)
require.Equal(t, 24*time.Hour, cfg.Session.CronTermRetention)
require.False(t, cfg.Security.TLSEnabled)
require.True(t, cfg.Admin.Enabled)
require.Equal(t, "localhost:9999", cfg.Admin.Addr)
Expand Down
2 changes: 1 addition & 1 deletion internal/gateway/bridge_forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (b *Bridge) forwardEvents(w worker.Worker, sessionID string, opts forwardOp
acc.computePerTurnDeltas()

// Query precise context usage from worker via control channel.
// Silently falls back to aggregated Done stats on failure.
// ContextFill stays 0 on failure — no inflated fallback.
if cr, ok := w.(worker.ControlRequester); ok {
fetchContextUsage(cr, acc)
}
Expand Down
4 changes: 2 additions & 2 deletions internal/gateway/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,8 +484,8 @@ func (m *mockSessionStoreForBotID) GetExpiredIdle(ctx context.Context, now time.
return args.Get(0).([]string), args.Error(1)
}

func (m *mockSessionStoreForBotID) DeleteTerminated(ctx context.Context, cutoff time.Time) error {
args := m.Called(ctx, cutoff)
func (m *mockSessionStoreForBotID) DeleteTerminated(ctx context.Context, cronCutoff, defaultCutoff time.Time) error {
args := m.Called(ctx, cronCutoff, defaultCutoff)
return args.Error(0)
}

Expand Down
4 changes: 2 additions & 2 deletions internal/gateway/ctrl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ func (m *mockStore) GetExpiredIdle(ctx context.Context, now time.Time) ([]string
return args.Get(0).([]string), args.Error(1)
}

func (m *mockStore) DeleteTerminated(ctx context.Context, cutoff time.Time) error {
args := m.Called(ctx, cutoff)
func (m *mockStore) DeleteTerminated(ctx context.Context, cronCutoff, defaultCutoff time.Time) error {
args := m.Called(ctx, cronCutoff, defaultCutoff)
return args.Error(0)
}

Expand Down
13 changes: 6 additions & 7 deletions internal/gateway/session_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ type sessionAccumulator struct {
TotalCostUSD float64
TotalInput int64 // cumulative input tokens consumed across turns
TotalOutput int64
ContextWindow int64 // from modelUsage.contextWindow (0 = unknown)
ContextFill int64 // latest turn's input_tokens (total including cache, bounded by ContextWindow)
ContextWindow int64 // from modelUsage.contextWindow or get_context_usage.maxTokens (0 = unknown)
ContextFill int64 // context window fill from get_context_usage control channel (0 if unavailable)
ModelName string // first model seen
StartedAt time.Time
WorkDir string // session working directory
Expand Down Expand Up @@ -58,7 +58,6 @@ func (a *sessionAccumulator) mergePerTurnStats(data events.DoneData) {
events.ToInt64(usage["cache_creation_input_tokens"]) +
events.ToInt64(usage["cache_read_input_tokens"])
a.TotalInput += input
a.ContextFill = input
a.TotalOutput += events.ToInt64(usage["output_tokens"])
a.TotalCacheWrite += events.ToInt64(usage["cache_creation_input_tokens"])
a.TotalCacheRead += events.ToInt64(usage["cache_read_input_tokens"])
Expand All @@ -68,7 +67,6 @@ func (a *sessionAccumulator) mergePerTurnStats(data events.DoneData) {
events.ToInt64(tokens["cache_read"]) +
events.ToInt64(tokens["cache_write"])
a.TotalInput += input
a.ContextFill = input
a.TotalOutput += events.ToInt64(tokens["output"])
a.TotalCacheWrite += events.ToInt64(tokens["cache_write"])
a.TotalCacheRead += events.ToInt64(tokens["cache_read"])
Expand Down Expand Up @@ -125,10 +123,11 @@ func (a *sessionAccumulator) resetPerTurn() {
a.PerTurnCacheWrite = 0
a.PerTurnCacheRead = 0
a.TurnDurationMs = 0
a.ContextFill = 0
}

// mergeContextUsage updates ContextFill, ContextWindow, and ModelName from precise worker control data.
// Called after get_context_usage returns; overrides the aggregated Done event values.
// mergeContextUsage sets ContextFill, ContextWindow, and ModelName from the worker's
// get_context_usage control channel response. This is the sole source for ContextFill.
// Supports partial data: updates ModelName even when MaxTokens is 0 (OCS scenario).
func (a *sessionAccumulator) mergeContextUsage(cu *events.ContextUsageData) {
if cu == nil {
Expand All @@ -146,7 +145,7 @@ func (a *sessionAccumulator) mergeContextUsage(cu *events.ContextUsageData) {
}

// computeContextPct returns context window usage percentage (0-100).
// Data comes from get_context_usage control channel (precise) or Done event usage (fallback).
// Returns 0 if either ContextFill or ContextWindow is unset (control channel unavailable).
func (a *sessionAccumulator) computeContextPct() float64 {
if a.ContextWindow <= 0 || a.ContextFill <= 0 {
return 0
Expand Down
24 changes: 16 additions & 8 deletions internal/gateway/session_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestSessionAccumulator_MergePerTurnStats(t *testing.T) {
})

require.Equal(t, int64(23434), acc.TotalInput)
require.Equal(t, int64(23434), acc.ContextFill)
require.Equal(t, int64(0), acc.ContextFill, "ContextFill must not be set from Done event usage")
require.Equal(t, int64(3821), acc.TotalOutput)
require.Equal(t, int64(200000), acc.ContextWindow)
require.Equal(t, "Sonnet", acc.ModelName)
Expand All @@ -53,7 +53,7 @@ func TestSessionAccumulator_MergePerTurnStats(t *testing.T) {
})

require.Equal(t, int64(8400+2000+500), acc.TotalInput)
require.Equal(t, int64(8400+2000+500), acc.ContextFill)
require.Equal(t, int64(0), acc.ContextFill, "ContextFill must not be set from Done event tokens")
require.Equal(t, int64(3634), acc.TotalOutput)
require.InDelta(t, 0.0234, acc.TotalCostUSD, 0.0001)
})
Expand All @@ -80,13 +80,13 @@ func TestSessionAccumulator_MergePerTurnStats(t *testing.T) {
},
},
})
require.Equal(t, int64(90000), acc.ContextFill, "ContextFill = input + cache_creation + cache_read")
require.Equal(t, int64(0), acc.ContextFill, "ContextFill must not be set from Done event usage")
require.Equal(t, int64(90000), acc.TotalInput, "TotalInput = input + cache_creation + cache_read")
pct := acc.computeContextPct()
require.Equal(t, 45.0, pct, "context % must be 90000/200000 = 45%%")
require.Equal(t, 0.0, pct, "context % must be 0 when ContextFill is not set from control channel")
})

t.Run("context fill overwritten by latest turn", func(t *testing.T) {
t.Run("total input accumulates across turns, context fill only from control channel", func(t *testing.T) {
acc := &sessionAccumulator{StartedAt: time.Now()}

// Turn 1
Expand All @@ -104,10 +104,18 @@ func TestSessionAccumulator_MergePerTurnStats(t *testing.T) {
"total_cost_usd": 0.05,
},
})
require.Equal(t, int64(10000), acc.ContextFill)
require.Equal(t, int64(0), acc.ContextFill, "ContextFill not set by mergePerTurnStats")
require.Equal(t, int64(10000), acc.TotalInput)

// Turn 2: smaller input — ContextFill reflects latest, TotalInput accumulates
// Simulate get_context_usage override for Turn 1
acc.mergeContextUsage(&events.ContextUsageData{TotalTokens: 58000, MaxTokens: 200000})
require.Equal(t, int64(58000), acc.ContextFill, "ContextFill set by mergeContextUsage")

// resetPerTurn clears ContextFill
acc.resetPerTurn()
require.Equal(t, int64(0), acc.ContextFill, "ContextFill cleared by resetPerTurn")

// Turn 2: smaller input — TotalInput accumulates, ContextFill stays 0 until control channel
acc.mergePerTurnStats(events.DoneData{
Stats: map[string]any{
"usage": map[string]any{
Expand All @@ -118,7 +126,7 @@ func TestSessionAccumulator_MergePerTurnStats(t *testing.T) {
},
})

require.Equal(t, int64(5000), acc.ContextFill) // latest turn only
require.Equal(t, int64(0), acc.ContextFill) // not set from Done event
require.Equal(t, int64(15000), acc.TotalInput) // cumulative
require.Equal(t, int64(3000), acc.TotalOutput)
require.Equal(t, int64(200000), acc.ContextWindow)
Expand Down
24 changes: 24 additions & 0 deletions internal/session/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ type Manager struct {
// After this duration, they are evicted from the in-memory map (DB records preserved).
const terminatedSessionTTL = 24 * time.Hour

// Session source constants — used for differential DB retention.
const (
SourceCron = "cron" // cron-triggered session (24h retention)
)

// runningIndex helpers — use riMu (independent of m.mu/ms.mu) to avoid lock ordering issues.

func (m *Manager) addToRunningIndex(id string) {
Expand Down Expand Up @@ -137,6 +142,9 @@ type SessionInfo struct {
// Title is the user-facing session name. Used as DeriveSessionKey input for WebChat sessions.
// Empty for Slack/Feishu sessions (they use DerivePlatformSessionKey instead).
Title string `json:"title,omitempty"`
// Source identifies the session origin: "" (user-initiated) or "cron" (cron-triggered).
// Used for differential DB retention — cron sessions are cleaned up after 24h vs 7d for normal.
Source string `json:"source,omitempty"`
}

// NewManager creates a new session manager using the provided Store.
Expand Down Expand Up @@ -175,6 +183,10 @@ func (m *Manager) Create(ctx context.Context, id, userID string, workerType work
// CreateWithBot creates a new session with explicit bot_id and persists it to SQLite.
func (m *Manager) CreateWithBot(ctx context.Context, id, userID, botID string, workerType worker.WorkerType, allowedTools []string, platform string, platformKey map[string]string, workDir, title string) (*SessionInfo, error) {
now := time.Now()
source := ""
if _, isCron := platformKey["cron_job_id"]; isCron {
source = SourceCron
}
info := &SessionInfo{
ID: id,
UserID: userID,
Expand All @@ -189,6 +201,7 @@ func (m *Manager) CreateWithBot(ctx context.Context, id, userID, botID string, w
PlatformKey: platformKey,
WorkDir: workDir,
Title: title,
Source: source,
}

if err := m.store.Upsert(ctx, info); err != nil {
Expand Down Expand Up @@ -1067,6 +1080,17 @@ func (m *Manager) gc(ctx context.Context) {
m.log.Info("session: gc evicted TERMINATED sessions from memory",
"count", evicted, "ttl", terminatedSessionTTL)
}
// 4. Delete old TERMINATED sessions from DB with source-based retention.
// Cron sessions: CronTermRetention, normal sessions: TermRetention. Events are not cascaded.
cfg := m.cfg
if m.cfgStore != nil {
cfg = m.cfgStore.Load()
}
cronCutoff := now.Add(-cfg.Session.CronTermRetention)
defaultCutoff := now.Add(-cfg.Session.TermRetention)
if err := m.store.DeleteTerminated(ctx, cronCutoff, defaultCutoff); err != nil {
m.log.Error("session: gc (delete_terminated) failed", "err", err)
}
}

// ─── Helpers ─────────────────────────────────────────────────────────────────
Expand Down
Loading