From e6ab9c121d5f58036a46ab2dc8aabb3777b548e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=94=B3=E6=99=97?= Date: Tue, 23 Jun 2026 11:06:01 +0800 Subject: [PATCH] =?UTF-8?q?fix(dashboard):=20SSE=20=E5=BB=BA=E8=BF=9E?= =?UTF-8?q?=E5=9B=9E=E6=94=BE=E6=9C=AC=E8=BF=9B=E7=A8=8B=E5=85=B3=E9=97=AD?= =?UTF-8?q?=E7=9A=84=E4=BC=9A=E8=AF=9D=EF=BC=8C=E4=BF=AE=20restore=20?= =?UTF-8?q?=E6=9C=9F=20zombie=20row=20=E4=B8=A2=E5=A4=B1/=E6=AE=8B?= =?UTF-8?q?=E7=95=99=20stale-active?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit #277 的 follow-up(Codex 二审第三轮发现)。 #277 让 /api/events 建连先 subscribe 再回放当前活跃会话,确定性修住了「恢复后仍 active」的行;但只遍历 active Map。restore 期间的 zombie(backing pane 单独死掉) 是「先 announceSessionRow 再立刻 closeSession 从 Map 删除」,这俩事件都早于一个正 在 descriptor→restore 窗口里重连的 dashboard 的 SSE 订阅 → 全丢;等 SSE 连上时 zombie 已不在 active Map,active-only 回放补不到它(它此刻是 closed row)。新建 aggregator 永远看不到它;若 dashboard 原本缓存它为 active,hydrateSessions 只 upsert 不删 absent → 残留 stale active(已死会话显示成活的)。 修:snapshot-on-connect 在回放活跃会话后,再回放「closedAt ≥ 进程启动时刻」的已关 会话(composeRowFromClosed 作 session.spawned)。按进程启动时刻过滤 = 只补本 run 的关闭(含 restore 期 zombie),不回放整个 closed 历史(那本就由 GET /api/sessions hydrate 提供)。用 session.spawned 因目标行可能未知,两端按 sessionId upsert,closed 行的 status 覆盖任何 stale active 条目。补回归测试。 --- src/core/dashboard-ipc-server.ts | 28 ++++++++++++++++++++++ test/dashboard-ipc.test.ts | 41 ++++++++++++++++++++++++++++++++ 2 files changed, 69 insertions(+) diff --git a/src/core/dashboard-ipc-server.ts b/src/core/dashboard-ipc-server.ts index 1320da3e..3db4f189 100644 --- a/src/core/dashboard-ipc-server.ts +++ b/src/core/dashboard-ipc-server.ts @@ -76,6 +76,12 @@ import type { DaemonSession } from './types.js'; import { attachSkillPolicy, detachSkillPolicy } from './skills/im-command.js'; import { readSkillRegistry } from '../services/skill-registry-store.js'; +// Daemon process start (module load ≈ daemon boot). Used by the SSE snapshot +// replay to bound the "recently closed" set to sessions that flipped +// active→closed during THIS run — i.e. restore-time zombies — without replaying +// the entire closed-session history on every connect. +const PROCESS_START_MS = Date.now(); + export interface IpcServerHandle { port: number; close: () => Promise; @@ -1602,9 +1608,31 @@ ipcRoute('GET', '/api/events', (_req, res) => { // aggregator and the browser store upsert by sessionId, so any row also // delivered live just refreshes the same entry. try { + const activeIds = new Set(); for (const ds of listActiveSessions()) { + activeIds.add(ds.session.sessionId); res.write(`event: session.spawned\ndata: ${JSON.stringify({ session: composeRowFromActive(ds) })}\n\n`); } + // Also replay sessions CLOSED during this run as `session.spawned` carrying a + // closed row. The active-only replay above can't cover a restore-time zombie: + // restoreActiveSessions registers it, announces it, then immediately probes it + // 'missing' and closeSession()s it (evicting it from the active Map) — all + // before a racing dashboard's SSE subscription exists. By connect time it is + // neither in the active Map nor was it a closed row at the dashboard's early + // (pre-restore) hydrate, so without this it stays invisible (or, if the + // dashboard cached it active from before the restart, lingers as a stale + // active row — hydrateSessions only upserts, never deletes absent rows). + // Bounded to closedAt >= PROCESS_START_MS so we replay only this run's + // closures (the full closed history is already served by GET /api/sessions + // on hydrate). `session.spawned` (not session.update) because the row may be + // unknown to the client — both consumers upsert by sessionId, and the closed + // row's status:'closed' overwrites any stale active entry. + for (const s of sessionStore.listSessions()) { + if (s.status !== 'closed' || activeIds.has(s.sessionId)) continue; + const closedMs = s.closedAt ? Date.parse(s.closedAt) : NaN; + if (!Number.isFinite(closedMs) || closedMs < PROCESS_START_MS) continue; + res.write(`event: session.spawned\ndata: ${JSON.stringify({ session: composeRowFromClosed(s) })}\n\n`); + } } catch (err) { logger.warn(`[dashboard-ipc] /api/events snapshot replay failed: ${err}`); } diff --git a/test/dashboard-ipc.test.ts b/test/dashboard-ipc.test.ts index 548f2c3f..4ef601b0 100644 --- a/test/dashboard-ipc.test.ts +++ b/test/dashboard-ipc.test.ts @@ -332,6 +332,47 @@ describe('GET /api/events', () => { workerPool.setActiveSessionsRegistry(new Map()); } }); + + it('replays this-run closed sessions as session.spawned (zombie-close visibility)', async () => { + // A restore-time zombie is registered, announced, then immediately + // closeSession()'d (evicted from the active Map) — all before a racing + // dashboard's SSE subscription exists. By connect time it's gone from the Map, + // so the active-only replay can't surface it. The closed-since-process-start + // replay must still deliver it as a closed row so the dashboard doesn't lose + // it (or keep a stale active entry). + const dataDir = mkdtempSync(join(tmpdir(), 'dashboard-ipc-sse-closed-')); + const prevDataDir = process.env.SESSION_DATA_DIR; + const prevConfigDataDir = config.session.dataDir; + const registry = new Map(); + try { + config.session.dataDir = dataDir; + sessionStore.init(); + workerPool.setActiveSessionsRegistry(registry); // empty — zombie already evicted + + const session = sessionStore.createSession('oc_zombie', 'om_zombie', 'zombie topic', 'group'); + session.larkAppId = ''; + session.scope = 'thread'; + session.cliId = 'codex' as any; + sessionStore.updateSession(session); + sessionStore.closeSession(session.sessionId); // closedAt = now ≥ PROCESS_START_MS + + handle = await startIpcServer({ port: 0, host: '127.0.0.1' }); + const ev = await readSseEvent( + `http://127.0.0.1:${handle.port}/api/events`, + e => e.type === 'session.spawned' && e.body?.session?.sessionId === session.sessionId, + ); + expect(ev).not.toBeNull(); + expect(ev!.body.session.status).toBe('closed'); + expect(typeof ev!.body.session.closedAt).toBe('number'); + } finally { + workerPool.setActiveSessionsRegistry(new Map()); + sessionStore.init(); + if (prevDataDir === undefined) delete process.env.SESSION_DATA_DIR; + else process.env.SESSION_DATA_DIR = prevDataDir; + config.session.dataDir = prevConfigDataDir; + rmSync(dataDir, { recursive: true, force: true }); + } + }); }); describe('GET /api/sessions/:sessionId/write-link', () => {