From dff418d8c130d49d86e5a66a75a7ec7aaa71962a Mon Sep 17 00:00:00 2001 From: Tobias Grosse-Puppendahl Date: Sat, 6 Jun 2026 07:25:34 +0200 Subject: [PATCH 1/3] fix(duckdb): stop orphaned queries on timeout/cancel to prevent worker hangs Query timeouts (and user cancellations) raced the native DuckDB operation via Promise.race, but the loser was never cancelled: prepared.run() kept executing on a libuv worker thread while the caller's finally immediately disconnected the connection. Closing a connection out from under a live native query can trip an extension abort() that kills the whole worker/API process, and the orphaned queries starve the (default 4-thread) libuv pool so unrelated jobs hang in "Thinking". withQueryTimeout now interrupts the query and waits for the native operation to actually settle (bounded by QUERY_INTERRUPT_GRACE_MS) before rejecting. The new safeDisconnect helper defers teardown until any still-unwinding query finishes, so a connection is never closed mid-query. Wired safeDisconnect into every query call site (agent tools, MCP tools, SQL AST validation, DuckDB console, connections + data-browser API routes) and raised UV_THREADPOOL_SIZE in the container entrypoint as defense-in-depth. Co-authored-by: Cursor --- .env.example | 8 + .../connections-firebird.integration.test.ts | 1 + .../connections-reinit.integration.test.ts | 1 + apps/api/src/routes/connections.ts | 6 +- apps/api/src/routes/data-browser.ts | 6 +- entrypoint.sh | 9 ++ packages/core/src/services/agent-tools.ts | 11 +- packages/core/src/services/duckdb-console.ts | 5 +- packages/core/src/services/duckdb.test.ts | 109 ++++++++++++++ packages/core/src/services/duckdb.ts | 140 ++++++++++++++++-- packages/core/src/services/mcp-tools.ts | 6 +- .../core/src/services/sql-ast-validation.ts | 6 +- 12 files changed, 280 insertions(+), 28 deletions(-) diff --git a/.env.example b/.env.example index 7586f83..9d2d385 100644 --- a/.env.example +++ b/.env.example @@ -51,6 +51,14 @@ AGENT_API_KEY=your-api-key # On timeout, the DuckDB query is cancelled via interrupt(). # QUERY_TIMEOUT_MS=30000 +# How long (ms) to wait for an interrupted query to actually unwind after a +# timeout/cancellation before giving up (default: 30000). The connection is +# only torn down once the native query has stopped, so it is never closed out +# from under a live operation (which can crash the worker process). A query +# that ignores the interrupt past this window has its teardown deferred rather +# than blocking the run forever. +# QUERY_INTERRUPT_GRACE_MS=30000 + # Max concurrent DuckDB queries per project (default: 10). # Limits parallel queries to prevent resource exhaustion. # Additional requests wait up to QUERY_TIMEOUT_MS for a slot. diff --git a/apps/api/src/routes/connections-firebird.integration.test.ts b/apps/api/src/routes/connections-firebird.integration.test.ts index 96c85ef..ca5bf1f 100644 --- a/apps/api/src/routes/connections-firebird.integration.test.ts +++ b/apps/api/src/routes/connections-firebird.integration.test.ts @@ -32,6 +32,7 @@ vi.mock("@archmax/core/services/duckdb", () => ({ getProjectInstance: vi.fn(), testSingleConnection: mocks.testSingleConnection, withQueryTimeout: vi.fn(async (_db: unknown, op: () => Promise) => op()), + safeDisconnect: vi.fn((db: { disconnectSync?: () => void }) => db.disconnectSync?.()), })); import { createTestApp, jsonBody } from "../test-utils/api-client"; diff --git a/apps/api/src/routes/connections-reinit.integration.test.ts b/apps/api/src/routes/connections-reinit.integration.test.ts index 885ba88..3ab2fbe 100644 --- a/apps/api/src/routes/connections-reinit.integration.test.ts +++ b/apps/api/src/routes/connections-reinit.integration.test.ts @@ -32,6 +32,7 @@ vi.mock("@archmax/core/services/duckdb", () => ({ getProjectInstance: mocks.getProjectInstance, testSingleConnection: mocks.testSingleConnection, withQueryTimeout: mocks.withQueryTimeout, + safeDisconnect: vi.fn((db: { disconnectSync?: () => void }) => db.disconnectSync?.()), })); import { createTestApp, jsonBody } from "../test-utils/api-client"; diff --git a/apps/api/src/routes/connections.ts b/apps/api/src/routes/connections.ts index d7de29d..2b9a45c 100644 --- a/apps/api/src/routes/connections.ts +++ b/apps/api/src/routes/connections.ts @@ -3,7 +3,7 @@ import { zValidator } from "@hono/zod-validator"; import { z } from "zod/v4"; import { connectDB } from "@archmax/core/infra/db"; import { Connection, CONNECTION_TYPES, SLUG_PATTERN, slugifyConnectionName, Project, type IConnectionDocument } from "@archmax/core/models/index"; -import { deleteProjectDuckdbFile, disposeProjectInstance, getProjectInstance, testSingleConnection, withQueryTimeout } from "@archmax/core/services/duckdb"; +import { deleteProjectDuckdbFile, disposeProjectInstance, getProjectInstance, safeDisconnect, testSingleConnection, withQueryTimeout } from "@archmax/core/services/duckdb"; import { encryptConnectionCredentials, decryptConnectionCredentials } from "@archmax/core/infra/crypto"; import { customFirebirdEnabled, getEnv } from "@archmax/core/config/env"; import { AppError } from "../utils/errors"; @@ -251,7 +251,7 @@ const app = new Hono() try { await withQueryTimeout(db, () => db.run("SELECT 1"), CONNECTION_TEST_TIMEOUT_MS); } finally { - db.disconnectSync(); + safeDisconnect(db); } return c.json({ ok: true }); } catch (err: unknown) { @@ -297,7 +297,7 @@ const app = new Hono() tableCount += chunk.rowCount; } } finally { - db.disconnectSync(); + safeDisconnect(db); } return c.json({ ok: true as const, tableCount }); } catch (err: unknown) { diff --git a/apps/api/src/routes/data-browser.ts b/apps/api/src/routes/data-browser.ts index 2f73f2b..2704941 100644 --- a/apps/api/src/routes/data-browser.ts +++ b/apps/api/src/routes/data-browser.ts @@ -3,7 +3,7 @@ import { z } from "zod/v4"; import { zValidator } from "@hono/zod-validator"; import { connectDB } from "@archmax/core/infra/db"; import { Connection, Project } from "@archmax/core/models/index"; -import { getProjectInstance, withQueryTimeout } from "@archmax/core/services/duckdb"; +import { getProjectInstance, safeDisconnect, withQueryTimeout } from "@archmax/core/services/duckdb"; import { AppError } from "../utils/errors"; type ProjectInstance = Awaited>; @@ -70,7 +70,7 @@ async function collectRows(instance: ProjectInstance, sql: string): Promise<{ co } return { columns, rows }; } finally { - db.disconnectSync(); + safeDisconnect(db); } } @@ -195,7 +195,7 @@ const app = new Hono() return safeJson(c, { columns, rows, total, page, pageSize }); } finally { - db.disconnectSync(); + safeDisconnect(db); } }); diff --git a/entrypoint.sh b/entrypoint.sh index f12505b..963aade 100644 --- a/entrypoint.sh +++ b/entrypoint.sh @@ -82,6 +82,15 @@ if [ -n "$MISSING" ]; then exec sleep infinity fi +# DuckDB runs every query on a libuv worker thread. The default pool is only 4 +# threads, so a handful of slow or non-interruptible federated queries (e.g. a +# scanner blocked on a stalled upstream socket after a timeout) can occupy the +# entire pool and starve all other async I/O in the process — new chat jobs +# then hang in "Thinking" with nothing able to make progress. Give DuckDB and +# the rest of the app headroom so a few stuck queries cannot freeze everything. +# Operators can override by setting UV_THREADPOOL_SIZE in the environment. +export UV_THREADPOOL_SIZE="${UV_THREADPOOL_SIZE:-16}" + # Supervise a long-running process: restart it whenever it exits. # # The BullMQ worker runs DuckDB (and its mysql/postgres/mssql scanner diff --git a/packages/core/src/services/agent-tools.ts b/packages/core/src/services/agent-tools.ts index dbe1ebe..bc667b1 100644 --- a/packages/core/src/services/agent-tools.ts +++ b/packages/core/src/services/agent-tools.ts @@ -10,6 +10,7 @@ import { scopeSchemaName, stripScopedSchemaQualifier, redactConnectionSecrets, + safeDisconnect, withProjectQuerySlot, withQueryTimeout, withRecoverableProjectInstance, @@ -115,7 +116,10 @@ export function makeExecuteQueryTool(projectId: string) { truncated: rows.length >= MAX_ROWS, }); } finally { - db.disconnectSync(); + // `safeDisconnect` (not `disconnectSync`) so a query that was + // still unwinding after a timeout/cancel interrupt is not + // closed out from under a live native operation. + safeDisconnect(db); } }), ); @@ -293,7 +297,10 @@ export function makeRunModelQueryTool(projectId: string) { truncated: rows.length >= MAX_ROWS, }); } finally { - db.disconnectSync(); + // `safeDisconnect` (not `disconnectSync`) so a query that was + // still unwinding after a timeout/cancel interrupt is not + // closed out from under a live native operation. + safeDisconnect(db); } }); }, diff --git a/packages/core/src/services/duckdb-console.ts b/packages/core/src/services/duckdb-console.ts index 2819ace..26be4b3 100644 --- a/packages/core/src/services/duckdb-console.ts +++ b/packages/core/src/services/duckdb-console.ts @@ -8,6 +8,7 @@ import { getQueryTimeoutMs, isQueryCancelledError, redactConnectionSecrets, + safeDisconnect, withQueryTimeout, } from "./duckdb"; @@ -183,7 +184,9 @@ async function collectQueryRows( } return { columns, rows }; } finally { - db.disconnectSync(); + // `safeDisconnect` so a console query that timed out and is still unwinding + // after the interrupt is not closed out from under a live native operation. + safeDisconnect(db); } } diff --git a/packages/core/src/services/duckdb.test.ts b/packages/core/src/services/duckdb.test.ts index 1f47941..4335714 100644 --- a/packages/core/src/services/duckdb.test.ts +++ b/packages/core/src/services/duckdb.test.ts @@ -20,7 +20,9 @@ import { buildFirebirdAttachOptions, COMMUNITY_EXTENSIONS, getQueryTimeoutMs, + getQueryInterruptGraceMs, withQueryTimeout, + safeDisconnect, withProjectQuerySlot, getProjectInstance, disposeProjectInstance, @@ -1011,7 +1013,43 @@ describe("getQueryTimeoutMs", () => { }); }); +describe("getQueryInterruptGraceMs", () => { + const origEnv = process.env.QUERY_INTERRUPT_GRACE_MS; + afterEach(() => { + if (origEnv === undefined) delete process.env.QUERY_INTERRUPT_GRACE_MS; + else process.env.QUERY_INTERRUPT_GRACE_MS = origEnv; + }); + + it("returns default 30_000 when env is unset", () => { + delete process.env.QUERY_INTERRUPT_GRACE_MS; + expect(getQueryInterruptGraceMs()).toBe(30_000); + }); + + it("parses numeric env value", () => { + process.env.QUERY_INTERRUPT_GRACE_MS = "5000"; + expect(getQueryInterruptGraceMs()).toBe(5000); + }); + + it("falls back to default for non-positive or non-numeric values", () => { + process.env.QUERY_INTERRUPT_GRACE_MS = "0"; + expect(getQueryInterruptGraceMs()).toBe(30_000); + process.env.QUERY_INTERRUPT_GRACE_MS = "nope"; + expect(getQueryInterruptGraceMs()).toBe(30_000); + }); +}); + describe("withQueryTimeout", () => { + const origGrace = process.env.QUERY_INTERRUPT_GRACE_MS; + beforeEach(() => { + // Keep the post-interrupt settle wait short for tests that intentionally + // use a never-settling operation (the production default is 30s). + process.env.QUERY_INTERRUPT_GRACE_MS = "50"; + }); + afterEach(() => { + if (origGrace === undefined) delete process.env.QUERY_INTERRUPT_GRACE_MS; + else process.env.QUERY_INTERRUPT_GRACE_MS = origGrace; + }); + it("returns the result when the operation completes in time", async () => { const instance = await DuckDBInstance.create(); const db = await instance.connect(); @@ -1048,6 +1086,77 @@ describe("withQueryTimeout", () => { } }); + it("waits for an interrupted op to unwind before rejecting", async () => { + // The timeout fires at 20ms, but the operation only settles at ~80ms. + // withQueryTimeout must not reject (and thus let the caller disconnect) + // until the native operation has actually unwound. + process.env.QUERY_INTERRUPT_GRACE_MS = "1000"; + const instance = await DuckDBInstance.create(); + const db = await instance.connect(); + let settle: () => void = () => {}; + const op = new Promise((_resolve, reject) => { + settle = () => reject(new Error("interrupted")); + }); + try { + const start = Date.now(); + const pending = withQueryTimeout(db, () => op, 20); + setTimeout(() => settle(), 80); + await expect(pending).rejects.toThrow(/timed out after 0\.02s/i); + expect(Date.now() - start).toBeGreaterThanOrEqual(70); + // Op settled within the grace, so nothing is left pending: a plain + // disconnect is safe (safeDisconnect runs synchronously). + const disconnectSpy = vi.spyOn(db, "disconnectSync"); + safeDisconnect(db); + expect(disconnectSpy).toHaveBeenCalledTimes(1); + disconnectSpy.mockRestore(); + } finally { + db.disconnectSync(); + } + }); + + it("defers safeDisconnect until an op that ignores the interrupt settles", async () => { + // timeout=20ms, grace=30ms, op never settles within the grace → the + // connection must NOT be disconnected while the native op is still live. + process.env.QUERY_INTERRUPT_GRACE_MS = "30"; + const instance = await DuckDBInstance.create(); + const db = await instance.connect(); + let settle: () => void = () => {}; + const op = new Promise((_resolve, reject) => { + settle = () => reject(new Error("late")); + }); + const disconnectSpy = vi.spyOn(db, "disconnectSync"); + try { + await expect(withQueryTimeout(db, () => op, 20)).rejects.toThrow(/timed out/i); + // Op still in flight past the grace → disconnect is deferred. + safeDisconnect(db); + expect(disconnectSpy).not.toHaveBeenCalled(); + // Once the op settles, the deferred disconnect runs. + settle(); + await op.catch(() => {}); + await new Promise((r) => setTimeout(r, 0)); + expect(disconnectSpy).toHaveBeenCalledTimes(1); + } finally { + disconnectSpy.mockRestore(); + db.disconnectSync(); + } + }); + + it("rejects immediately for an already-aborted signal without starting the op", async () => { + const instance = await DuckDBInstance.create(); + const db = await instance.connect(); + const controller = new AbortController(); + controller.abort(); + const operation = vi.fn(() => db.run("SELECT 1")); + try { + await expect( + withQueryTimeout(db, operation, 5_000, controller.signal), + ).rejects.toMatchObject({ name: "QueryCancelledError" }); + expect(operation).not.toHaveBeenCalled(); + } finally { + db.disconnectSync(); + } + }); + it("clears the timer when operation succeeds before deadline", async () => { const clearSpy = vi.spyOn(globalThis, "clearTimeout"); const instance = await DuckDBInstance.create(); diff --git a/packages/core/src/services/duckdb.ts b/packages/core/src/services/duckdb.ts index f79cf7f..36d1813 100644 --- a/packages/core/src/services/duckdb.ts +++ b/packages/core/src/services/duckdb.ts @@ -60,6 +60,33 @@ export function getQueryTimeoutMs(): number { return configured > 0 ? configured : 30_000; } +/** + * After a query times out (or is cancelled) we call `connection.interrupt()` + * and then wait for the in-flight native operation to actually unwind before + * letting the caller disconnect the connection. This is the upper bound on + * that wait: a well-behaved query reacts to `interrupt()` within milliseconds, + * but a federated scanner blocked on a slow upstream socket may take longer. + * If the operation has not settled within this grace, we proceed anyway (and + * `safeDisconnect` defers the actual `disconnectSync` until it finally does), + * so a genuinely non-interruptible query can never wedge the run forever. + */ +export function getQueryInterruptGraceMs(): number { + const configured = Number(process.env.QUERY_INTERRUPT_GRACE_MS); + return configured > 0 ? configured : 30_000; +} + +/** + * Tracks, per connection, a native DuckDB operation that was still unwinding + * when its `withQueryTimeout` rejected (timeout/cancel) and did not settle + * within the interrupt grace. The value resolves (never rejects) once the + * operation finally settles. `safeDisconnect` reads this so it can defer + * `disconnectSync` — closing a connection while a native query is still + * running on a libuv worker thread can trip a native assertion that calls + * `abort()` and takes the whole process down. A `WeakMap` lets entries be + * collected with their connection if a caller forgets to disconnect. + */ +const pendingNativeOps = new WeakMap>(); + /** * Raised when an in-flight DuckDB operation is aborted via an `AbortSignal` * (e.g. the user pressed "stop" on an agent run). Distinct from the @@ -90,14 +117,27 @@ export function isQueryCancelledError(err: unknown): boolean { * Run an async operation against a DuckDB connection with a hard timeout and * optional cooperative cancellation. * - * On timeout, `connection.interrupt()` is called to cancel the in-flight - * query inside DuckDB, then the promise rejects with a timeout error. + * Crucially, when the timeout or an abort `signal` wins the race, this does NOT + * return while the native query is still executing. `Promise.race` settling + * does not cancel the losing promise: the underlying `prepared.run()` keeps + * running on a libuv worker thread. If the caller were to `disconnectSync()` + * the connection at that point (its `finally` block), it would close the + * connection out from under a live native operation — a textbook trigger for a + * native assertion that calls `abort()` and kills the whole worker process — + * and the orphaned query would keep consuming a thread and an upstream + * connection, eventually starving the libuv pool so unrelated jobs hang. + * + * Instead we: + * 1. call `connection.interrupt()` to ask DuckDB to cancel the query, + * 2. wait for the operation to actually settle (bounded by + * `getQueryInterruptGraceMs()`), then + * 3. reject with the timeout / cancellation error. * - * When an aborted `signal` is supplied, `connection.interrupt()` is likewise - * called immediately so a long-running query stops promptly (rather than - * blocking the agent run for up to the full timeout), and the promise rejects - * with a `QueryCancelledError`. The timer and abort listener are always - * cleaned up regardless of outcome. + * If the operation has not settled within the grace (a non-interruptible + * federated scan), we still reject, but register the still-pending operation + * so `safeDisconnect` defers the connection teardown until it finishes rather + * than disconnecting under a live query. The timer and abort listener are + * always cleaned up regardless of outcome. */ export async function withQueryTimeout( connection: DuckDBConnection, @@ -105,27 +145,39 @@ export async function withQueryTimeout( timeoutMs: number = getQueryTimeoutMs(), signal?: AbortSignal, ): Promise { + // Don't even start a query for an already-cancelled run. + if (signal?.aborted) { + throw new QueryCancelledError(); + } + let timer: ReturnType | undefined; let onAbort: (() => void) | undefined; + let lostRace = false; + + // Start the operation once and keep a reference so we can wait for it to + // unwind after interrupting, instead of orphaning it on the race. + const opPromise = operation(); + // A handle that resolves (never rejects) when the native op has settled, so + // we can await it without re-raising its error. Attaching here also marks + // any later rejection as handled (the race only attaches once it settles). + const opSettled = opPromise.then( + () => {}, + () => {}, + ); const timeoutPromise = new Promise((_resolve, reject) => { timer = setTimeout(() => { - try { connection.interrupt(); } catch { /* best-effort */ } + lostRace = true; reject(new Error(`Query timed out after ${timeoutMs / 1000}s`)); }, timeoutMs); }); - const racers: Array> = [operation(), timeoutPromise]; + const racers: Array> = [opPromise, timeoutPromise]; if (signal) { const abortPromise = new Promise((_resolve, reject) => { - if (signal.aborted) { - try { connection.interrupt(); } catch { /* best-effort */ } - reject(new QueryCancelledError()); - return; - } onAbort = () => { - try { connection.interrupt(); } catch { /* best-effort */ } + lostRace = true; reject(new QueryCancelledError()); }; signal.addEventListener("abort", onAbort, { once: true }); @@ -134,13 +186,69 @@ export async function withQueryTimeout( } try { - return await Promise.race(racers); + const result = await Promise.race(racers); + // The operation won the race, so it has already settled. + return result; + } catch (err) { + if (!lostRace) { + // The operation itself rejected (a genuine query error); it has settled. + throw err; + } + // A timeout or abort won. Interrupt the in-flight query and wait for it to + // actually unwind so the caller can safely tear the connection down. + try { connection.interrupt(); } catch { /* best-effort */ } + const settled = await raceSettleWithin(opSettled, getQueryInterruptGraceMs()); + if (!settled) { + // The query ignored the interrupt within the grace window. Hand the + // still-pending settle handle to `safeDisconnect` so it defers the + // teardown rather than disconnecting under a live native operation. + pendingNativeOps.set(connection, opSettled); + } + throw err; } finally { clearTimeout(timer); if (signal && onAbort) signal.removeEventListener("abort", onAbort); } } +/** + * Resolve `true` if `settled` resolves within `graceMs`, otherwise `false`. + * The grace timer is cleared as soon as the operation settles so it never + * keeps the event loop alive. + */ +function raceSettleWithin(settled: Promise, graceMs: number): Promise { + return new Promise((resolve) => { + const timer = setTimeout(() => resolve(false), graceMs); + void settled.then(() => { + clearTimeout(timer); + resolve(true); + }); + }); +} + +/** + * Disconnect a DuckDB connection without ever closing it out from under a live + * native operation. In the normal case (query finished, or it unwound after an + * interrupt within the grace) this disconnects synchronously. If a timed-out / + * cancelled query was still unwinding past the grace, `withQueryTimeout` + * recorded its settle handle; we defer the actual `disconnectSync` until that + * resolves so we never trip the native abort path. + * + * Always prefer this over `connection.disconnectSync()` for connections that + * ran queries through `withQueryTimeout`. + */ +export function safeDisconnect(connection: DuckDBConnection): void { + const pending = pendingNativeOps.get(connection); + if (!pending) { + try { connection.disconnectSync(); } catch { /* best-effort */ } + return; + } + pendingNativeOps.delete(connection); + void pending.finally(() => { + try { connection.disconnectSync(); } catch { /* best-effort */ } + }); +} + // ── Per-project query concurrency limiter ──────────────────────────── function getMaxConcurrentQueries(): number { diff --git a/packages/core/src/services/mcp-tools.ts b/packages/core/src/services/mcp-tools.ts index f27b448..b501d2e 100644 --- a/packages/core/src/services/mcp-tools.ts +++ b/packages/core/src/services/mcp-tools.ts @@ -10,6 +10,7 @@ import { redactConnectionSecrets, getAttachedCatalogSlugs, hardenConnection, + safeDisconnect, withQueryTimeout, withProjectQuerySlot, withRecoverableProjectInstance, @@ -353,7 +354,10 @@ export async function executeScopedQuery( return { text: payload, columns, rows, rowCount: rows.length, truncated: rows.length >= MAX_ROWS }; } finally { - db.disconnectSync(); + // `safeDisconnect` (not `disconnectSync`) so a query still + // unwinding after a timeout/cancel interrupt is not closed out + // from under a live native operation. + safeDisconnect(db); } }); }, diff --git a/packages/core/src/services/sql-ast-validation.ts b/packages/core/src/services/sql-ast-validation.ts index 0108b95..ae32c23 100644 --- a/packages/core/src/services/sql-ast-validation.ts +++ b/packages/core/src/services/sql-ast-validation.ts @@ -1,5 +1,5 @@ import { DuckDBInstance } from "@duckdb/node-api"; -import { withQueryTimeout } from "./duckdb"; +import { safeDisconnect, withQueryTimeout } from "./duckdb"; /** * Structural SQL validator built on DuckDB's own parser via @@ -121,7 +121,9 @@ async function serializeSqlToAst(sql: string): Promise { } return JSON.parse(firstColumn) as SerializeSqlPayload; } finally { - db.disconnectSync(); + // `safeDisconnect` so a parse that timed out and is still unwinding after + // the interrupt is not closed out from under a live native operation. + safeDisconnect(db); } } From 7bf46922acd8b235c74ae9cf9816328aefcf73f4 Mon Sep 17 00:00:00 2001 From: Tobias Grosse-Puppendahl Date: Sat, 6 Jun 2026 07:51:07 +0200 Subject: [PATCH 2/3] fix(duckdb): use safeDisconnect on internal attach/materialize timeout paths MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bugbot flagged three in-file callers that ran through withQueryTimeout but still called db.disconnectSync() directly in finally: attachConnection, attachIcebergCatalog, and materialiseModelViewsLocked. If one of those ATTACH / CREATE SCHEMA / CREATE OR REPLACE VIEW operations timed out and did not settle within the interrupt grace, the connection was closed out from under a live native operation — reintroducing the crash/orphan behavior this PR prevents on the MCP execute_query setup/materialization path. Route them through safeDisconnect so the deferred disconnect is honored everywhere. Co-authored-by: Cursor --- packages/core/src/services/duckdb.ts | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/packages/core/src/services/duckdb.ts b/packages/core/src/services/duckdb.ts index 36d1813..1d4eea1 100644 --- a/packages/core/src/services/duckdb.ts +++ b/packages/core/src/services/duckdb.ts @@ -966,7 +966,9 @@ async function attachConnection(entry: ProjectDuckDB, conn: IConnectionDocument) ); entry.attachedSlugs.add(conn.slug); } finally { - db.disconnectSync(); + // `safeDisconnect` so an ATTACH that timed out and is still unwinding after + // the interrupt is not closed out from under a live native operation. + safeDisconnect(db); } } @@ -1008,7 +1010,9 @@ async function attachIcebergCatalog(entry: ProjectDuckDB, conn: IConnectionDocum ); entry.attachedSlugs.add(conn.slug); } finally { - db.disconnectSync(); + // `safeDisconnect` so an ATTACH that timed out and is still unwinding after + // the interrupt is not closed out from under a live native operation. + safeDisconnect(db); } } @@ -1461,7 +1465,10 @@ async function materialiseModelViewsLocked( } } } finally { - db.disconnectSync(); + // `safeDisconnect` so a CREATE SCHEMA / CREATE OR REPLACE VIEW that timed + // out or was cancelled and is still unwinding after the interrupt is not + // closed out from under a live native operation. + safeDisconnect(db); } return result; From 0d42b2937227fab04e028ea8e056596ad33c68c2 Mon Sep 17 00:00:00 2001 From: Tobias Grosse-Puppendahl Date: Sat, 6 Jun 2026 11:39:05 +0200 Subject: [PATCH 3/3] fix(duckdb): merge pending native ops per connection so safeDisconnect waits for all pendingNativeOps tracked at most one settle handle per connection, but a single connection runs several withQueryTimeout calls in sequence (materialisation pass, data-browser exists/count/data). A second timeout overwrote the first's handle, so safeDisconnect could close the connection while the first orphaned query was still live. Merge the new handle with any prior one (Promise.all) so the deferred disconnect waits for every still-running query to unwind. Co-authored-by: Cursor --- packages/core/src/services/duckdb.test.ts | 38 +++++++++++++++++++++++ packages/core/src/services/duckdb.ts | 12 ++++++- 2 files changed, 49 insertions(+), 1 deletion(-) diff --git a/packages/core/src/services/duckdb.test.ts b/packages/core/src/services/duckdb.test.ts index 4335714..5d4fb8a 100644 --- a/packages/core/src/services/duckdb.test.ts +++ b/packages/core/src/services/duckdb.test.ts @@ -1141,6 +1141,44 @@ describe("withQueryTimeout", () => { } }); + it("defers safeDisconnect until ALL non-settling ops on one connection settle", async () => { + // A single connection runs several queries in sequence (e.g. the + // materialisation pass). If two of them time out without unwinding, the + // later one must not mask the earlier still-live query — safeDisconnect + // must wait for BOTH before closing the connection. + process.env.QUERY_INTERRUPT_GRACE_MS = "20"; + const instance = await DuckDBInstance.create(); + const db = await instance.connect(); + let settleFirst: () => void = () => {}; + let settleSecond: () => void = () => {}; + const first = new Promise((_resolve, reject) => { + settleFirst = () => reject(new Error("late-1")); + }); + const second = new Promise((_resolve, reject) => { + settleSecond = () => reject(new Error("late-2")); + }); + const disconnectSpy = vi.spyOn(db, "disconnectSync"); + try { + await expect(withQueryTimeout(db, () => first, 10)).rejects.toThrow(/timed out/i); + await expect(withQueryTimeout(db, () => second, 10)).rejects.toThrow(/timed out/i); + safeDisconnect(db); + expect(disconnectSpy).not.toHaveBeenCalled(); + // Settling only the second op must NOT disconnect — the first is live. + settleSecond(); + await second.catch(() => {}); + await new Promise((r) => setTimeout(r, 0)); + expect(disconnectSpy).not.toHaveBeenCalled(); + // Once the first op also settles, the deferred disconnect finally runs. + settleFirst(); + await first.catch(() => {}); + await new Promise((r) => setTimeout(r, 0)); + expect(disconnectSpy).toHaveBeenCalledTimes(1); + } finally { + disconnectSpy.mockRestore(); + db.disconnectSync(); + } + }); + it("rejects immediately for an already-aborted signal without starting the op", async () => { const instance = await DuckDBInstance.create(); const db = await instance.connect(); diff --git a/packages/core/src/services/duckdb.ts b/packages/core/src/services/duckdb.ts index 1d4eea1..d632ff8 100644 --- a/packages/core/src/services/duckdb.ts +++ b/packages/core/src/services/duckdb.ts @@ -202,7 +202,17 @@ export async function withQueryTimeout( // The query ignored the interrupt within the grace window. Hand the // still-pending settle handle to `safeDisconnect` so it defers the // teardown rather than disconnecting under a live native operation. - pendingNativeOps.set(connection, opSettled); + // A single connection can run multiple queries in sequence (e.g. the + // materialisation pass issues many `CREATE OR REPLACE VIEW`s, the data + // browser runs exists/count/data), so MERGE with any handle already + // recorded for this connection instead of overwriting it — otherwise a + // later timeout would mask an earlier still-live query and let + // `safeDisconnect` close the connection underneath it. + const prior = pendingNativeOps.get(connection); + pendingNativeOps.set( + connection, + prior ? Promise.all([prior, opSettled]).then(() => {}) : opSettled, + ); } throw err; } finally {