Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions src/core/dashboard-ipc-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ import type { DaemonSession } from './types.js';
import { attachSkillPolicy, detachSkillPolicy } from './skills/im-command.js';
import { readSkillRegistry } from '../services/skill-registry-store.js';

// Daemon process start (module load ≈ daemon boot). Used by the SSE snapshot
// replay to bound the "recently closed" set to sessions that flipped
// active→closed during THIS run — i.e. restore-time zombies — without replaying
// the entire closed-session history on every connect.
const PROCESS_START_MS = Date.now();

export interface IpcServerHandle {
port: number;
close: () => Promise<void>;
Expand Down Expand Up @@ -1602,9 +1608,31 @@ ipcRoute('GET', '/api/events', (_req, res) => {
// aggregator and the browser store upsert by sessionId, so any row also
// delivered live just refreshes the same entry.
try {
const activeIds = new Set<string>();
for (const ds of listActiveSessions()) {
activeIds.add(ds.session.sessionId);
res.write(`event: session.spawned\ndata: ${JSON.stringify({ session: composeRowFromActive(ds) })}\n\n`);
}
// Also replay sessions CLOSED during this run as `session.spawned` carrying a
// closed row. The active-only replay above can't cover a restore-time zombie:
// restoreActiveSessions registers it, announces it, then immediately probes it
// 'missing' and closeSession()s it (evicting it from the active Map) — all
// before a racing dashboard's SSE subscription exists. By connect time it is
// neither in the active Map nor was it a closed row at the dashboard's early
// (pre-restore) hydrate, so without this it stays invisible (or, if the
// dashboard cached it active from before the restart, lingers as a stale
// active row — hydrateSessions only upserts, never deletes absent rows).
// Bounded to closedAt >= PROCESS_START_MS so we replay only this run's
// closures (the full closed history is already served by GET /api/sessions
// on hydrate). `session.spawned` (not session.update) because the row may be
// unknown to the client — both consumers upsert by sessionId, and the closed
// row's status:'closed' overwrites any stale active entry.
for (const s of sessionStore.listSessions()) {
if (s.status !== 'closed' || activeIds.has(s.sessionId)) continue;
const closedMs = s.closedAt ? Date.parse(s.closedAt) : NaN;
if (!Number.isFinite(closedMs) || closedMs < PROCESS_START_MS) continue;
res.write(`event: session.spawned\ndata: ${JSON.stringify({ session: composeRowFromClosed(s) })}\n\n`);
}
} catch (err) {
logger.warn(`[dashboard-ipc] /api/events snapshot replay failed: ${err}`);
}
Expand Down
41 changes: 41 additions & 0 deletions test/dashboard-ipc.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,47 @@ describe('GET /api/events', () => {
workerPool.setActiveSessionsRegistry(new Map());
}
});

it('replays this-run closed sessions as session.spawned (zombie-close visibility)', async () => {
// A restore-time zombie is registered, announced, then immediately
// closeSession()'d (evicted from the active Map) — all before a racing
// dashboard's SSE subscription exists. By connect time it's gone from the Map,
// so the active-only replay can't surface it. The closed-since-process-start
// replay must still deliver it as a closed row so the dashboard doesn't lose
// it (or keep a stale active entry).
const dataDir = mkdtempSync(join(tmpdir(), 'dashboard-ipc-sse-closed-'));
const prevDataDir = process.env.SESSION_DATA_DIR;
const prevConfigDataDir = config.session.dataDir;
const registry = new Map<string, any>();
try {
config.session.dataDir = dataDir;
sessionStore.init();
workerPool.setActiveSessionsRegistry(registry); // empty — zombie already evicted

const session = sessionStore.createSession('oc_zombie', 'om_zombie', 'zombie topic', 'group');
session.larkAppId = '';
session.scope = 'thread';
session.cliId = 'codex' as any;
sessionStore.updateSession(session);
sessionStore.closeSession(session.sessionId); // closedAt = now ≥ PROCESS_START_MS

handle = await startIpcServer({ port: 0, host: '127.0.0.1' });
const ev = await readSseEvent(
`http://127.0.0.1:${handle.port}/api/events`,
e => e.type === 'session.spawned' && e.body?.session?.sessionId === session.sessionId,
);
expect(ev).not.toBeNull();
expect(ev!.body.session.status).toBe('closed');
expect(typeof ev!.body.session.closedAt).toBe('number');
} finally {
workerPool.setActiveSessionsRegistry(new Map());
sessionStore.init();
if (prevDataDir === undefined) delete process.env.SESSION_DATA_DIR;
else process.env.SESSION_DATA_DIR = prevDataDir;
config.session.dataDir = prevConfigDataDir;
rmSync(dataDir, { recursive: true, force: true });
}
});
});

describe('GET /api/sessions/:sessionId/write-link', () => {
Expand Down