diff --git a/backend/src/lib/sse.ts b/backend/src/lib/sse.ts new file mode 100644 index 00000000..96d38223 --- /dev/null +++ b/backend/src/lib/sse.ts @@ -0,0 +1,53 @@ +import type { Request, Response } from "express"; + +export function startSseStream( + req: Request, + res: Response, + options: { heartbeatMs?: number } = {}, +) { + const heartbeatMs = options.heartbeatMs ?? 15000; + let closed = false; + + // Disable all socket-level timeouts so long-running tool calls (e.g. + // TrustFoundry agentic search) don't get killed mid-stream. + req.setTimeout(0); + res.setTimeout(0); + if (req.socket) { + req.socket.setTimeout(0); + req.socket.setNoDelay(true); + } + + res.setHeader("Content-Type", "text/event-stream"); + res.setHeader("Cache-Control", "no-cache"); + res.setHeader("Connection", "keep-alive"); + res.setHeader("X-Accel-Buffering", "no"); + res.flushHeaders(); + + const write = (line: string) => { + if (closed || res.writableEnded) return; + res.write(line); + }; + + const heartbeat = setInterval(() => { + write(": keepalive\n\n"); + }, heartbeatMs); + + const abort = new AbortController(); + + const stop = () => { + closed = true; + clearInterval(heartbeat); + if (!abort.signal.aborted) abort.abort(); + }; + + req.on("close", stop); + + return { + write, + signal: abort.signal, + close: () => { + stop(); + req.off("close", stop); + }, + }; +} diff --git a/backend/src/routes/chat.ts b/backend/src/routes/chat.ts index ecf2dfea..581763b4 100644 --- a/backend/src/routes/chat.ts +++ b/backend/src/routes/chat.ts @@ -20,6 +20,7 @@ import { } from "../lib/userSettings"; import { checkProjectAccess } from "../lib/access"; import { safeErrorLog, safeErrorMessage } from "../lib/safeError"; +import { startSseStream } from "../lib/sse"; export const chatRouter = Router(); @@ -574,18 +575,8 @@ chatRouter.post("/", requireAuth, async (req, res) => { workflowCount: Object.keys(workflowStore).length, }); - res.setHeader("Content-Type", "text/event-stream"); - res.setHeader("Cache-Control", "no-cache"); - res.setHeader("Connection", "keep-alive"); - res.setHeader("X-Accel-Buffering", "no"); - res.flushHeaders(); - - const write = (line: string) => res.write(line); - const streamAbort = new AbortController(); - let streamFinished = false; - res.on("close", () => { - if (!streamFinished) streamAbort.abort(); - }); + const stream = startSseStream(req, res); + const { write } = stream; try { write(`data: ${JSON.stringify({ type: "chat_id", chatId })}\n\n`); @@ -601,7 +592,7 @@ chatRouter.post("/", requireAuth, async (req, res) => { includeResearchTools: legalResearchUs, model, apiKeys, - signal: streamAbort.signal, + signal: stream.signal, projectId: resolvedProjectId, }); @@ -684,7 +675,7 @@ chatRouter.post("/", requireAuth, async (req, res) => { /* ignore */ } } finally { - streamFinished = true; + stream.close(); res.end(); } }); diff --git a/backend/src/routes/projectChat.ts b/backend/src/routes/projectChat.ts index 1a3a9eab..e438024d 100644 --- a/backend/src/routes/projectChat.ts +++ b/backend/src/routes/projectChat.ts @@ -20,6 +20,7 @@ import { } from "../lib/userSettings"; import { checkProjectAccess } from "../lib/access"; import { safeErrorLog, safeErrorMessage } from "../lib/safeError"; +import { startSseStream } from "../lib/sse"; const PROJECT_SYSTEM_PROMPT_EXTRA = `PROJECT CONTEXT: You are operating within a project folder that contains a collection of legal documents the user has organised for a single matter. The user's questions will usually refer to one or more documents in this project — your job is to find the relevant files to work on. Use list_documents to see what is available and fetch_documents / read_document to pull in any documents you need before answering. @@ -157,18 +158,8 @@ projectChatRouter.post("/", requireAuth, async (req, res) => { const workflowStore = await buildWorkflowStore(userId, userEmail, db); - res.setHeader("Content-Type", "text/event-stream"); - res.setHeader("Cache-Control", "no-cache"); - res.setHeader("Connection", "keep-alive"); - res.setHeader("X-Accel-Buffering", "no"); - res.flushHeaders(); - - const write = (line: string) => res.write(line); - const streamAbort = new AbortController(); - let streamFinished = false; - res.on("close", () => { - if (!streamFinished) streamAbort.abort(); - }); + const stream = startSseStream(req, res); + const { write } = stream; try { write(`data: ${JSON.stringify({ type: "chat_id", chatId })}\n\n`); @@ -185,7 +176,7 @@ projectChatRouter.post("/", requireAuth, async (req, res) => { includeResearchTools: legalResearchUs, model, apiKeys, - signal: streamAbort.signal, + signal: stream.signal, projectId, }); @@ -265,7 +256,7 @@ projectChatRouter.post("/", requireAuth, async (req, res) => { /* ignore */ } } finally { - streamFinished = true; + stream.close(); res.end(); } }); diff --git a/backend/src/routes/tabular.ts b/backend/src/routes/tabular.ts index 46bea1cb..d0f83d29 100644 --- a/backend/src/routes/tabular.ts +++ b/backend/src/routes/tabular.ts @@ -32,6 +32,7 @@ import { listAccessibleProjectIds, } from "../lib/access"; import { safeErrorLog, safeErrorMessage } from "../lib/safeError"; +import { startSseStream } from "../lib/sse"; function formatPromptSuffix(format?: string, tags?: string[]): string { switch (format) { @@ -946,13 +947,8 @@ tabularRouter.post("/:reviewId/generate", requireAuth, async (req, res) => { }); } - res.setHeader("Content-Type", "text/event-stream"); - res.setHeader("Cache-Control", "no-cache"); - res.setHeader("Connection", "keep-alive"); - res.setHeader("X-Accel-Buffering", "no"); - res.flushHeaders(); - - const write = (line: string) => res.write(line); + const stream = startSseStream(req, res); + const { write } = stream; try { await Promise.all( @@ -1073,6 +1069,7 @@ tabularRouter.post("/:reviewId/generate", requireAuth, async (req, res) => { /* ignore */ } } finally { + stream.close(); res.end(); } }); @@ -1414,17 +1411,8 @@ tabularRouter.post("/:reviewId/chat", requireAuth, async (req, res) => { review.title || "Untitled Review", ); - res.setHeader("Content-Type", "text/event-stream"); - res.setHeader("Cache-Control", "no-cache"); - res.setHeader("Connection", "keep-alive"); - res.setHeader("X-Accel-Buffering", "no"); - res.flushHeaders(); - const write = (line: string) => res.write(line); - const streamAbort = new AbortController(); - let streamFinished = false; - res.on("close", () => { - if (!streamFinished) streamAbort.abort(); - }); + const stream = startSseStream(req, res); + const { write } = stream; if (chatId) { write(`data: ${JSON.stringify({ type: "chat_id", chatId })}\n\n`); @@ -1445,7 +1433,7 @@ tabularRouter.post("/:reviewId/chat", requireAuth, async (req, res) => { extractTabularAnnotations(text, tabularStore), model: tabular_model, apiKeys: api_keys, - signal: streamAbort.signal, + signal: stream.signal, }); const persistedEvents = stripTransientAssistantEvents(events); @@ -1555,7 +1543,7 @@ tabularRouter.post("/:reviewId/chat", requireAuth, async (req, res) => { /* ignore */ } } finally { - streamFinished = true; + stream.close(); res.end(); } });