diff --git a/src/core/dashboard-ipc-server.ts b/src/core/dashboard-ipc-server.ts index 1320da3e..3db4f189 100644 --- a/src/core/dashboard-ipc-server.ts +++ b/src/core/dashboard-ipc-server.ts @@ -76,6 +76,12 @@ import type { DaemonSession } from './types.js'; import { attachSkillPolicy, detachSkillPolicy } from './skills/im-command.js'; import { readSkillRegistry } from '../services/skill-registry-store.js'; +// Daemon process start (module load ≈ daemon boot). Used by the SSE snapshot +// replay to bound the "recently closed" set to sessions that flipped +// active→closed during THIS run — i.e. restore-time zombies — without replaying +// the entire closed-session history on every connect. +const PROCESS_START_MS = Date.now(); + export interface IpcServerHandle { port: number; close: () => Promise; @@ -1602,9 +1608,31 @@ ipcRoute('GET', '/api/events', (_req, res) => { // aggregator and the browser store upsert by sessionId, so any row also // delivered live just refreshes the same entry. try { + const activeIds = new Set(); for (const ds of listActiveSessions()) { + activeIds.add(ds.session.sessionId); res.write(`event: session.spawned\ndata: ${JSON.stringify({ session: composeRowFromActive(ds) })}\n\n`); } + // Also replay sessions CLOSED during this run as `session.spawned` carrying a + // closed row. The active-only replay above can't cover a restore-time zombie: + // restoreActiveSessions registers it, announces it, then immediately probes it + // 'missing' and closeSession()s it (evicting it from the active Map) — all + // before a racing dashboard's SSE subscription exists. By connect time it is + // neither in the active Map nor was it a closed row at the dashboard's early + // (pre-restore) hydrate, so without this it stays invisible (or, if the + // dashboard cached it active from before the restart, lingers as a stale + // active row — hydrateSessions only upserts, never deletes absent rows). + // Bounded to closedAt >= PROCESS_START_MS so we replay only this run's + // closures (the full closed history is already served by GET /api/sessions + // on hydrate). `session.spawned` (not session.update) because the row may be + // unknown to the client — both consumers upsert by sessionId, and the closed + // row's status:'closed' overwrites any stale active entry. + for (const s of sessionStore.listSessions()) { + if (s.status !== 'closed' || activeIds.has(s.sessionId)) continue; + const closedMs = s.closedAt ? Date.parse(s.closedAt) : NaN; + if (!Number.isFinite(closedMs) || closedMs < PROCESS_START_MS) continue; + res.write(`event: session.spawned\ndata: ${JSON.stringify({ session: composeRowFromClosed(s) })}\n\n`); + } } catch (err) { logger.warn(`[dashboard-ipc] /api/events snapshot replay failed: ${err}`); } diff --git a/test/dashboard-ipc.test.ts b/test/dashboard-ipc.test.ts index 548f2c3f..4ef601b0 100644 --- a/test/dashboard-ipc.test.ts +++ b/test/dashboard-ipc.test.ts @@ -332,6 +332,47 @@ describe('GET /api/events', () => { workerPool.setActiveSessionsRegistry(new Map()); } }); + + it('replays this-run closed sessions as session.spawned (zombie-close visibility)', async () => { + // A restore-time zombie is registered, announced, then immediately + // closeSession()'d (evicted from the active Map) — all before a racing + // dashboard's SSE subscription exists. By connect time it's gone from the Map, + // so the active-only replay can't surface it. The closed-since-process-start + // replay must still deliver it as a closed row so the dashboard doesn't lose + // it (or keep a stale active entry). + const dataDir = mkdtempSync(join(tmpdir(), 'dashboard-ipc-sse-closed-')); + const prevDataDir = process.env.SESSION_DATA_DIR; + const prevConfigDataDir = config.session.dataDir; + const registry = new Map(); + try { + config.session.dataDir = dataDir; + sessionStore.init(); + workerPool.setActiveSessionsRegistry(registry); // empty — zombie already evicted + + const session = sessionStore.createSession('oc_zombie', 'om_zombie', 'zombie topic', 'group'); + session.larkAppId = ''; + session.scope = 'thread'; + session.cliId = 'codex' as any; + sessionStore.updateSession(session); + sessionStore.closeSession(session.sessionId); // closedAt = now ≥ PROCESS_START_MS + + handle = await startIpcServer({ port: 0, host: '127.0.0.1' }); + const ev = await readSseEvent( + `http://127.0.0.1:${handle.port}/api/events`, + e => e.type === 'session.spawned' && e.body?.session?.sessionId === session.sessionId, + ); + expect(ev).not.toBeNull(); + expect(ev!.body.session.status).toBe('closed'); + expect(typeof ev!.body.session.closedAt).toBe('number'); + } finally { + workerPool.setActiveSessionsRegistry(new Map()); + sessionStore.init(); + if (prevDataDir === undefined) delete process.env.SESSION_DATA_DIR; + else process.env.SESSION_DATA_DIR = prevDataDir; + config.session.dataDir = prevConfigDataDir; + rmSync(dataDir, { recursive: true, force: true }); + } + }); }); describe('GET /api/sessions/:sessionId/write-link', () => {