diff --git a/.env.example b/.env.example index 7586f83..9d2d385 100644 --- a/.env.example +++ b/.env.example @@ -51,6 +51,14 @@ AGENT_API_KEY=your-api-key # On timeout, the DuckDB query is cancelled via interrupt(). # QUERY_TIMEOUT_MS=30000 +# How long (ms) to wait for an interrupted query to actually unwind after a +# timeout/cancellation before giving up (default: 30000). The connection is +# only torn down once the native query has stopped, so it is never closed out +# from under a live operation (which can crash the worker process). A query +# that ignores the interrupt past this window has its teardown deferred rather +# than blocking the run forever. +# QUERY_INTERRUPT_GRACE_MS=30000 + # Max concurrent DuckDB queries per project (default: 10). # Limits parallel queries to prevent resource exhaustion. # Additional requests wait up to QUERY_TIMEOUT_MS for a slot. diff --git a/apps/api/src/routes/connections-firebird.integration.test.ts b/apps/api/src/routes/connections-firebird.integration.test.ts index 96c85ef..ca5bf1f 100644 --- a/apps/api/src/routes/connections-firebird.integration.test.ts +++ b/apps/api/src/routes/connections-firebird.integration.test.ts @@ -32,6 +32,7 @@ vi.mock("@archmax/core/services/duckdb", () => ({ getProjectInstance: vi.fn(), testSingleConnection: mocks.testSingleConnection, withQueryTimeout: vi.fn(async (_db: unknown, op: () => Promise) => op()), + safeDisconnect: vi.fn((db: { disconnectSync?: () => void }) => db.disconnectSync?.()), })); import { createTestApp, jsonBody } from "../test-utils/api-client"; diff --git a/apps/api/src/routes/connections-reinit.integration.test.ts b/apps/api/src/routes/connections-reinit.integration.test.ts index 885ba88..3ab2fbe 100644 --- a/apps/api/src/routes/connections-reinit.integration.test.ts +++ b/apps/api/src/routes/connections-reinit.integration.test.ts @@ -32,6 +32,7 @@ vi.mock("@archmax/core/services/duckdb", () => ({ getProjectInstance: mocks.getProjectInstance, testSingleConnection: mocks.testSingleConnection, withQueryTimeout: mocks.withQueryTimeout, + safeDisconnect: vi.fn((db: { disconnectSync?: () => void }) => db.disconnectSync?.()), })); import { createTestApp, jsonBody } from "../test-utils/api-client"; diff --git a/apps/api/src/routes/connections.ts b/apps/api/src/routes/connections.ts index d7de29d..2b9a45c 100644 --- a/apps/api/src/routes/connections.ts +++ b/apps/api/src/routes/connections.ts @@ -3,7 +3,7 @@ import { zValidator } from "@hono/zod-validator"; import { z } from "zod/v4"; import { connectDB } from "@archmax/core/infra/db"; import { Connection, CONNECTION_TYPES, SLUG_PATTERN, slugifyConnectionName, Project, type IConnectionDocument } from "@archmax/core/models/index"; -import { deleteProjectDuckdbFile, disposeProjectInstance, getProjectInstance, testSingleConnection, withQueryTimeout } from "@archmax/core/services/duckdb"; +import { deleteProjectDuckdbFile, disposeProjectInstance, getProjectInstance, safeDisconnect, testSingleConnection, withQueryTimeout } from "@archmax/core/services/duckdb"; import { encryptConnectionCredentials, decryptConnectionCredentials } from "@archmax/core/infra/crypto"; import { customFirebirdEnabled, getEnv } from "@archmax/core/config/env"; import { AppError } from "../utils/errors"; @@ -251,7 +251,7 @@ const app = new Hono() try { await withQueryTimeout(db, () => db.run("SELECT 1"), CONNECTION_TEST_TIMEOUT_MS); } finally { - db.disconnectSync(); + safeDisconnect(db); } return c.json({ ok: true }); } catch (err: unknown) { @@ -297,7 +297,7 @@ const app = new Hono() tableCount += chunk.rowCount; } } finally { - db.disconnectSync(); + safeDisconnect(db); } return c.json({ ok: true as const, tableCount }); } catch (err: unknown) { diff --git a/apps/api/src/routes/data-browser.ts b/apps/api/src/routes/data-browser.ts index 2f73f2b..2704941 100644 --- a/apps/api/src/routes/data-browser.ts +++ b/apps/api/src/routes/data-browser.ts @@ -3,7 +3,7 @@ import { z } from "zod/v4"; import { zValidator } from "@hono/zod-validator"; import { connectDB } from "@archmax/core/infra/db"; import { Connection, Project } from "@archmax/core/models/index"; -import { getProjectInstance, withQueryTimeout } from "@archmax/core/services/duckdb"; +import { getProjectInstance, safeDisconnect, withQueryTimeout } from "@archmax/core/services/duckdb"; import { AppError } from "../utils/errors"; type ProjectInstance = Awaited>; @@ -70,7 +70,7 @@ async function collectRows(instance: ProjectInstance, sql: string): Promise<{ co } return { columns, rows }; } finally { - db.disconnectSync(); + safeDisconnect(db); } } @@ -195,7 +195,7 @@ const app = new Hono() return safeJson(c, { columns, rows, total, page, pageSize }); } finally { - db.disconnectSync(); + safeDisconnect(db); } }); diff --git a/entrypoint.sh b/entrypoint.sh index f12505b..963aade 100644 --- a/entrypoint.sh +++ b/entrypoint.sh @@ -82,6 +82,15 @@ if [ -n "$MISSING" ]; then exec sleep infinity fi +# DuckDB runs every query on a libuv worker thread. The default pool is only 4 +# threads, so a handful of slow or non-interruptible federated queries (e.g. a +# scanner blocked on a stalled upstream socket after a timeout) can occupy the +# entire pool and starve all other async I/O in the process — new chat jobs +# then hang in "Thinking" with nothing able to make progress. Give DuckDB and +# the rest of the app headroom so a few stuck queries cannot freeze everything. +# Operators can override by setting UV_THREADPOOL_SIZE in the environment. +export UV_THREADPOOL_SIZE="${UV_THREADPOOL_SIZE:-16}" + # Supervise a long-running process: restart it whenever it exits. # # The BullMQ worker runs DuckDB (and its mysql/postgres/mssql scanner diff --git a/packages/core/src/services/agent-tools.ts b/packages/core/src/services/agent-tools.ts index dbe1ebe..bc667b1 100644 --- a/packages/core/src/services/agent-tools.ts +++ b/packages/core/src/services/agent-tools.ts @@ -10,6 +10,7 @@ import { scopeSchemaName, stripScopedSchemaQualifier, redactConnectionSecrets, + safeDisconnect, withProjectQuerySlot, withQueryTimeout, withRecoverableProjectInstance, @@ -115,7 +116,10 @@ export function makeExecuteQueryTool(projectId: string) { truncated: rows.length >= MAX_ROWS, }); } finally { - db.disconnectSync(); + // `safeDisconnect` (not `disconnectSync`) so a query that was + // still unwinding after a timeout/cancel interrupt is not + // closed out from under a live native operation. + safeDisconnect(db); } }), ); @@ -293,7 +297,10 @@ export function makeRunModelQueryTool(projectId: string) { truncated: rows.length >= MAX_ROWS, }); } finally { - db.disconnectSync(); + // `safeDisconnect` (not `disconnectSync`) so a query that was + // still unwinding after a timeout/cancel interrupt is not + // closed out from under a live native operation. + safeDisconnect(db); } }); }, diff --git a/packages/core/src/services/duckdb-console.ts b/packages/core/src/services/duckdb-console.ts index 2819ace..26be4b3 100644 --- a/packages/core/src/services/duckdb-console.ts +++ b/packages/core/src/services/duckdb-console.ts @@ -8,6 +8,7 @@ import { getQueryTimeoutMs, isQueryCancelledError, redactConnectionSecrets, + safeDisconnect, withQueryTimeout, } from "./duckdb"; @@ -183,7 +184,9 @@ async function collectQueryRows( } return { columns, rows }; } finally { - db.disconnectSync(); + // `safeDisconnect` so a console query that timed out and is still unwinding + // after the interrupt is not closed out from under a live native operation. + safeDisconnect(db); } } diff --git a/packages/core/src/services/duckdb.test.ts b/packages/core/src/services/duckdb.test.ts index 1f47941..5d4fb8a 100644 --- a/packages/core/src/services/duckdb.test.ts +++ b/packages/core/src/services/duckdb.test.ts @@ -20,7 +20,9 @@ import { buildFirebirdAttachOptions, COMMUNITY_EXTENSIONS, getQueryTimeoutMs, + getQueryInterruptGraceMs, withQueryTimeout, + safeDisconnect, withProjectQuerySlot, getProjectInstance, disposeProjectInstance, @@ -1011,7 +1013,43 @@ describe("getQueryTimeoutMs", () => { }); }); +describe("getQueryInterruptGraceMs", () => { + const origEnv = process.env.QUERY_INTERRUPT_GRACE_MS; + afterEach(() => { + if (origEnv === undefined) delete process.env.QUERY_INTERRUPT_GRACE_MS; + else process.env.QUERY_INTERRUPT_GRACE_MS = origEnv; + }); + + it("returns default 30_000 when env is unset", () => { + delete process.env.QUERY_INTERRUPT_GRACE_MS; + expect(getQueryInterruptGraceMs()).toBe(30_000); + }); + + it("parses numeric env value", () => { + process.env.QUERY_INTERRUPT_GRACE_MS = "5000"; + expect(getQueryInterruptGraceMs()).toBe(5000); + }); + + it("falls back to default for non-positive or non-numeric values", () => { + process.env.QUERY_INTERRUPT_GRACE_MS = "0"; + expect(getQueryInterruptGraceMs()).toBe(30_000); + process.env.QUERY_INTERRUPT_GRACE_MS = "nope"; + expect(getQueryInterruptGraceMs()).toBe(30_000); + }); +}); + describe("withQueryTimeout", () => { + const origGrace = process.env.QUERY_INTERRUPT_GRACE_MS; + beforeEach(() => { + // Keep the post-interrupt settle wait short for tests that intentionally + // use a never-settling operation (the production default is 30s). + process.env.QUERY_INTERRUPT_GRACE_MS = "50"; + }); + afterEach(() => { + if (origGrace === undefined) delete process.env.QUERY_INTERRUPT_GRACE_MS; + else process.env.QUERY_INTERRUPT_GRACE_MS = origGrace; + }); + it("returns the result when the operation completes in time", async () => { const instance = await DuckDBInstance.create(); const db = await instance.connect(); @@ -1048,6 +1086,115 @@ describe("withQueryTimeout", () => { } }); + it("waits for an interrupted op to unwind before rejecting", async () => { + // The timeout fires at 20ms, but the operation only settles at ~80ms. + // withQueryTimeout must not reject (and thus let the caller disconnect) + // until the native operation has actually unwound. + process.env.QUERY_INTERRUPT_GRACE_MS = "1000"; + const instance = await DuckDBInstance.create(); + const db = await instance.connect(); + let settle: () => void = () => {}; + const op = new Promise((_resolve, reject) => { + settle = () => reject(new Error("interrupted")); + }); + try { + const start = Date.now(); + const pending = withQueryTimeout(db, () => op, 20); + setTimeout(() => settle(), 80); + await expect(pending).rejects.toThrow(/timed out after 0\.02s/i); + expect(Date.now() - start).toBeGreaterThanOrEqual(70); + // Op settled within the grace, so nothing is left pending: a plain + // disconnect is safe (safeDisconnect runs synchronously). + const disconnectSpy = vi.spyOn(db, "disconnectSync"); + safeDisconnect(db); + expect(disconnectSpy).toHaveBeenCalledTimes(1); + disconnectSpy.mockRestore(); + } finally { + db.disconnectSync(); + } + }); + + it("defers safeDisconnect until an op that ignores the interrupt settles", async () => { + // timeout=20ms, grace=30ms, op never settles within the grace → the + // connection must NOT be disconnected while the native op is still live. + process.env.QUERY_INTERRUPT_GRACE_MS = "30"; + const instance = await DuckDBInstance.create(); + const db = await instance.connect(); + let settle: () => void = () => {}; + const op = new Promise((_resolve, reject) => { + settle = () => reject(new Error("late")); + }); + const disconnectSpy = vi.spyOn(db, "disconnectSync"); + try { + await expect(withQueryTimeout(db, () => op, 20)).rejects.toThrow(/timed out/i); + // Op still in flight past the grace → disconnect is deferred. + safeDisconnect(db); + expect(disconnectSpy).not.toHaveBeenCalled(); + // Once the op settles, the deferred disconnect runs. + settle(); + await op.catch(() => {}); + await new Promise((r) => setTimeout(r, 0)); + expect(disconnectSpy).toHaveBeenCalledTimes(1); + } finally { + disconnectSpy.mockRestore(); + db.disconnectSync(); + } + }); + + it("defers safeDisconnect until ALL non-settling ops on one connection settle", async () => { + // A single connection runs several queries in sequence (e.g. the + // materialisation pass). If two of them time out without unwinding, the + // later one must not mask the earlier still-live query — safeDisconnect + // must wait for BOTH before closing the connection. + process.env.QUERY_INTERRUPT_GRACE_MS = "20"; + const instance = await DuckDBInstance.create(); + const db = await instance.connect(); + let settleFirst: () => void = () => {}; + let settleSecond: () => void = () => {}; + const first = new Promise((_resolve, reject) => { + settleFirst = () => reject(new Error("late-1")); + }); + const second = new Promise((_resolve, reject) => { + settleSecond = () => reject(new Error("late-2")); + }); + const disconnectSpy = vi.spyOn(db, "disconnectSync"); + try { + await expect(withQueryTimeout(db, () => first, 10)).rejects.toThrow(/timed out/i); + await expect(withQueryTimeout(db, () => second, 10)).rejects.toThrow(/timed out/i); + safeDisconnect(db); + expect(disconnectSpy).not.toHaveBeenCalled(); + // Settling only the second op must NOT disconnect — the first is live. + settleSecond(); + await second.catch(() => {}); + await new Promise((r) => setTimeout(r, 0)); + expect(disconnectSpy).not.toHaveBeenCalled(); + // Once the first op also settles, the deferred disconnect finally runs. + settleFirst(); + await first.catch(() => {}); + await new Promise((r) => setTimeout(r, 0)); + expect(disconnectSpy).toHaveBeenCalledTimes(1); + } finally { + disconnectSpy.mockRestore(); + db.disconnectSync(); + } + }); + + it("rejects immediately for an already-aborted signal without starting the op", async () => { + const instance = await DuckDBInstance.create(); + const db = await instance.connect(); + const controller = new AbortController(); + controller.abort(); + const operation = vi.fn(() => db.run("SELECT 1")); + try { + await expect( + withQueryTimeout(db, operation, 5_000, controller.signal), + ).rejects.toMatchObject({ name: "QueryCancelledError" }); + expect(operation).not.toHaveBeenCalled(); + } finally { + db.disconnectSync(); + } + }); + it("clears the timer when operation succeeds before deadline", async () => { const clearSpy = vi.spyOn(globalThis, "clearTimeout"); const instance = await DuckDBInstance.create(); diff --git a/packages/core/src/services/duckdb.ts b/packages/core/src/services/duckdb.ts index f79cf7f..d632ff8 100644 --- a/packages/core/src/services/duckdb.ts +++ b/packages/core/src/services/duckdb.ts @@ -60,6 +60,33 @@ export function getQueryTimeoutMs(): number { return configured > 0 ? configured : 30_000; } +/** + * After a query times out (or is cancelled) we call `connection.interrupt()` + * and then wait for the in-flight native operation to actually unwind before + * letting the caller disconnect the connection. This is the upper bound on + * that wait: a well-behaved query reacts to `interrupt()` within milliseconds, + * but a federated scanner blocked on a slow upstream socket may take longer. + * If the operation has not settled within this grace, we proceed anyway (and + * `safeDisconnect` defers the actual `disconnectSync` until it finally does), + * so a genuinely non-interruptible query can never wedge the run forever. + */ +export function getQueryInterruptGraceMs(): number { + const configured = Number(process.env.QUERY_INTERRUPT_GRACE_MS); + return configured > 0 ? configured : 30_000; +} + +/** + * Tracks, per connection, a native DuckDB operation that was still unwinding + * when its `withQueryTimeout` rejected (timeout/cancel) and did not settle + * within the interrupt grace. The value resolves (never rejects) once the + * operation finally settles. `safeDisconnect` reads this so it can defer + * `disconnectSync` — closing a connection while a native query is still + * running on a libuv worker thread can trip a native assertion that calls + * `abort()` and takes the whole process down. A `WeakMap` lets entries be + * collected with their connection if a caller forgets to disconnect. + */ +const pendingNativeOps = new WeakMap>(); + /** * Raised when an in-flight DuckDB operation is aborted via an `AbortSignal` * (e.g. the user pressed "stop" on an agent run). Distinct from the @@ -90,14 +117,27 @@ export function isQueryCancelledError(err: unknown): boolean { * Run an async operation against a DuckDB connection with a hard timeout and * optional cooperative cancellation. * - * On timeout, `connection.interrupt()` is called to cancel the in-flight - * query inside DuckDB, then the promise rejects with a timeout error. + * Crucially, when the timeout or an abort `signal` wins the race, this does NOT + * return while the native query is still executing. `Promise.race` settling + * does not cancel the losing promise: the underlying `prepared.run()` keeps + * running on a libuv worker thread. If the caller were to `disconnectSync()` + * the connection at that point (its `finally` block), it would close the + * connection out from under a live native operation — a textbook trigger for a + * native assertion that calls `abort()` and kills the whole worker process — + * and the orphaned query would keep consuming a thread and an upstream + * connection, eventually starving the libuv pool so unrelated jobs hang. * - * When an aborted `signal` is supplied, `connection.interrupt()` is likewise - * called immediately so a long-running query stops promptly (rather than - * blocking the agent run for up to the full timeout), and the promise rejects - * with a `QueryCancelledError`. The timer and abort listener are always - * cleaned up regardless of outcome. + * Instead we: + * 1. call `connection.interrupt()` to ask DuckDB to cancel the query, + * 2. wait for the operation to actually settle (bounded by + * `getQueryInterruptGraceMs()`), then + * 3. reject with the timeout / cancellation error. + * + * If the operation has not settled within the grace (a non-interruptible + * federated scan), we still reject, but register the still-pending operation + * so `safeDisconnect` defers the connection teardown until it finishes rather + * than disconnecting under a live query. The timer and abort listener are + * always cleaned up regardless of outcome. */ export async function withQueryTimeout( connection: DuckDBConnection, @@ -105,27 +145,39 @@ export async function withQueryTimeout( timeoutMs: number = getQueryTimeoutMs(), signal?: AbortSignal, ): Promise { + // Don't even start a query for an already-cancelled run. + if (signal?.aborted) { + throw new QueryCancelledError(); + } + let timer: ReturnType | undefined; let onAbort: (() => void) | undefined; + let lostRace = false; + + // Start the operation once and keep a reference so we can wait for it to + // unwind after interrupting, instead of orphaning it on the race. + const opPromise = operation(); + // A handle that resolves (never rejects) when the native op has settled, so + // we can await it without re-raising its error. Attaching here also marks + // any later rejection as handled (the race only attaches once it settles). + const opSettled = opPromise.then( + () => {}, + () => {}, + ); const timeoutPromise = new Promise((_resolve, reject) => { timer = setTimeout(() => { - try { connection.interrupt(); } catch { /* best-effort */ } + lostRace = true; reject(new Error(`Query timed out after ${timeoutMs / 1000}s`)); }, timeoutMs); }); - const racers: Array> = [operation(), timeoutPromise]; + const racers: Array> = [opPromise, timeoutPromise]; if (signal) { const abortPromise = new Promise((_resolve, reject) => { - if (signal.aborted) { - try { connection.interrupt(); } catch { /* best-effort */ } - reject(new QueryCancelledError()); - return; - } onAbort = () => { - try { connection.interrupt(); } catch { /* best-effort */ } + lostRace = true; reject(new QueryCancelledError()); }; signal.addEventListener("abort", onAbort, { once: true }); @@ -134,13 +186,79 @@ export async function withQueryTimeout( } try { - return await Promise.race(racers); + const result = await Promise.race(racers); + // The operation won the race, so it has already settled. + return result; + } catch (err) { + if (!lostRace) { + // The operation itself rejected (a genuine query error); it has settled. + throw err; + } + // A timeout or abort won. Interrupt the in-flight query and wait for it to + // actually unwind so the caller can safely tear the connection down. + try { connection.interrupt(); } catch { /* best-effort */ } + const settled = await raceSettleWithin(opSettled, getQueryInterruptGraceMs()); + if (!settled) { + // The query ignored the interrupt within the grace window. Hand the + // still-pending settle handle to `safeDisconnect` so it defers the + // teardown rather than disconnecting under a live native operation. + // A single connection can run multiple queries in sequence (e.g. the + // materialisation pass issues many `CREATE OR REPLACE VIEW`s, the data + // browser runs exists/count/data), so MERGE with any handle already + // recorded for this connection instead of overwriting it — otherwise a + // later timeout would mask an earlier still-live query and let + // `safeDisconnect` close the connection underneath it. + const prior = pendingNativeOps.get(connection); + pendingNativeOps.set( + connection, + prior ? Promise.all([prior, opSettled]).then(() => {}) : opSettled, + ); + } + throw err; } finally { clearTimeout(timer); if (signal && onAbort) signal.removeEventListener("abort", onAbort); } } +/** + * Resolve `true` if `settled` resolves within `graceMs`, otherwise `false`. + * The grace timer is cleared as soon as the operation settles so it never + * keeps the event loop alive. + */ +function raceSettleWithin(settled: Promise, graceMs: number): Promise { + return new Promise((resolve) => { + const timer = setTimeout(() => resolve(false), graceMs); + void settled.then(() => { + clearTimeout(timer); + resolve(true); + }); + }); +} + +/** + * Disconnect a DuckDB connection without ever closing it out from under a live + * native operation. In the normal case (query finished, or it unwound after an + * interrupt within the grace) this disconnects synchronously. If a timed-out / + * cancelled query was still unwinding past the grace, `withQueryTimeout` + * recorded its settle handle; we defer the actual `disconnectSync` until that + * resolves so we never trip the native abort path. + * + * Always prefer this over `connection.disconnectSync()` for connections that + * ran queries through `withQueryTimeout`. + */ +export function safeDisconnect(connection: DuckDBConnection): void { + const pending = pendingNativeOps.get(connection); + if (!pending) { + try { connection.disconnectSync(); } catch { /* best-effort */ } + return; + } + pendingNativeOps.delete(connection); + void pending.finally(() => { + try { connection.disconnectSync(); } catch { /* best-effort */ } + }); +} + // ── Per-project query concurrency limiter ──────────────────────────── function getMaxConcurrentQueries(): number { @@ -858,7 +976,9 @@ async function attachConnection(entry: ProjectDuckDB, conn: IConnectionDocument) ); entry.attachedSlugs.add(conn.slug); } finally { - db.disconnectSync(); + // `safeDisconnect` so an ATTACH that timed out and is still unwinding after + // the interrupt is not closed out from under a live native operation. + safeDisconnect(db); } } @@ -900,7 +1020,9 @@ async function attachIcebergCatalog(entry: ProjectDuckDB, conn: IConnectionDocum ); entry.attachedSlugs.add(conn.slug); } finally { - db.disconnectSync(); + // `safeDisconnect` so an ATTACH that timed out and is still unwinding after + // the interrupt is not closed out from under a live native operation. + safeDisconnect(db); } } @@ -1353,7 +1475,10 @@ async function materialiseModelViewsLocked( } } } finally { - db.disconnectSync(); + // `safeDisconnect` so a CREATE SCHEMA / CREATE OR REPLACE VIEW that timed + // out or was cancelled and is still unwinding after the interrupt is not + // closed out from under a live native operation. + safeDisconnect(db); } return result; diff --git a/packages/core/src/services/mcp-tools.ts b/packages/core/src/services/mcp-tools.ts index f27b448..b501d2e 100644 --- a/packages/core/src/services/mcp-tools.ts +++ b/packages/core/src/services/mcp-tools.ts @@ -10,6 +10,7 @@ import { redactConnectionSecrets, getAttachedCatalogSlugs, hardenConnection, + safeDisconnect, withQueryTimeout, withProjectQuerySlot, withRecoverableProjectInstance, @@ -353,7 +354,10 @@ export async function executeScopedQuery( return { text: payload, columns, rows, rowCount: rows.length, truncated: rows.length >= MAX_ROWS }; } finally { - db.disconnectSync(); + // `safeDisconnect` (not `disconnectSync`) so a query still + // unwinding after a timeout/cancel interrupt is not closed out + // from under a live native operation. + safeDisconnect(db); } }); }, diff --git a/packages/core/src/services/sql-ast-validation.ts b/packages/core/src/services/sql-ast-validation.ts index 0108b95..ae32c23 100644 --- a/packages/core/src/services/sql-ast-validation.ts +++ b/packages/core/src/services/sql-ast-validation.ts @@ -1,5 +1,5 @@ import { DuckDBInstance } from "@duckdb/node-api"; -import { withQueryTimeout } from "./duckdb"; +import { safeDisconnect, withQueryTimeout } from "./duckdb"; /** * Structural SQL validator built on DuckDB's own parser via @@ -121,7 +121,9 @@ async function serializeSqlToAst(sql: string): Promise { } return JSON.parse(firstColumn) as SerializeSqlPayload; } finally { - db.disconnectSync(); + // `safeDisconnect` so a parse that timed out and is still unwinding after + // the interrupt is not closed out from under a live native operation. + safeDisconnect(db); } }