Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ AGENT_API_KEY=your-api-key
# On timeout, the DuckDB query is cancelled via interrupt().
# QUERY_TIMEOUT_MS=30000

# How long (ms) to wait for an interrupted query to actually unwind after a
# timeout/cancellation before giving up (default: 30000). The connection is
# only torn down once the native query has stopped, so it is never closed out
# from under a live operation (which can crash the worker process). A query
# that ignores the interrupt past this window has its teardown deferred rather
# than blocking the run forever.
# QUERY_INTERRUPT_GRACE_MS=30000

# Max concurrent DuckDB queries per project (default: 10).
# Limits parallel queries to prevent resource exhaustion.
# Additional requests wait up to QUERY_TIMEOUT_MS for a slot.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ vi.mock("@archmax/core/services/duckdb", () => ({
getProjectInstance: vi.fn(),
testSingleConnection: mocks.testSingleConnection,
withQueryTimeout: vi.fn(async (_db: unknown, op: () => Promise<unknown>) => op()),
safeDisconnect: vi.fn((db: { disconnectSync?: () => void }) => db.disconnectSync?.()),
}));

import { createTestApp, jsonBody } from "../test-utils/api-client";
Expand Down
1 change: 1 addition & 0 deletions apps/api/src/routes/connections-reinit.integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ vi.mock("@archmax/core/services/duckdb", () => ({
getProjectInstance: mocks.getProjectInstance,
testSingleConnection: mocks.testSingleConnection,
withQueryTimeout: mocks.withQueryTimeout,
safeDisconnect: vi.fn((db: { disconnectSync?: () => void }) => db.disconnectSync?.()),
}));

import { createTestApp, jsonBody } from "../test-utils/api-client";
Expand Down
6 changes: 3 additions & 3 deletions apps/api/src/routes/connections.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { zValidator } from "@hono/zod-validator";
import { z } from "zod/v4";
import { connectDB } from "@archmax/core/infra/db";
import { Connection, CONNECTION_TYPES, SLUG_PATTERN, slugifyConnectionName, Project, type IConnectionDocument } from "@archmax/core/models/index";
import { deleteProjectDuckdbFile, disposeProjectInstance, getProjectInstance, testSingleConnection, withQueryTimeout } from "@archmax/core/services/duckdb";
import { deleteProjectDuckdbFile, disposeProjectInstance, getProjectInstance, safeDisconnect, testSingleConnection, withQueryTimeout } from "@archmax/core/services/duckdb";
import { encryptConnectionCredentials, decryptConnectionCredentials } from "@archmax/core/infra/crypto";
import { customFirebirdEnabled, getEnv } from "@archmax/core/config/env";
import { AppError } from "../utils/errors";
Expand Down Expand Up @@ -251,7 +251,7 @@ const app = new Hono()
try {
await withQueryTimeout(db, () => db.run("SELECT 1"), CONNECTION_TEST_TIMEOUT_MS);
} finally {
db.disconnectSync();
safeDisconnect(db);
}
return c.json({ ok: true });
} catch (err: unknown) {
Expand Down Expand Up @@ -297,7 +297,7 @@ const app = new Hono()
tableCount += chunk.rowCount;
}
} finally {
db.disconnectSync();
safeDisconnect(db);
}
return c.json({ ok: true as const, tableCount });
} catch (err: unknown) {
Expand Down
6 changes: 3 additions & 3 deletions apps/api/src/routes/data-browser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { z } from "zod/v4";
import { zValidator } from "@hono/zod-validator";
import { connectDB } from "@archmax/core/infra/db";
import { Connection, Project } from "@archmax/core/models/index";
import { getProjectInstance, withQueryTimeout } from "@archmax/core/services/duckdb";
import { getProjectInstance, safeDisconnect, withQueryTimeout } from "@archmax/core/services/duckdb";
import { AppError } from "../utils/errors";

type ProjectInstance = Awaited<ReturnType<typeof getProjectInstance>>;
Expand Down Expand Up @@ -70,7 +70,7 @@ async function collectRows(instance: ProjectInstance, sql: string): Promise<{ co
}
return { columns, rows };
} finally {
db.disconnectSync();
safeDisconnect(db);
}
}

Expand Down Expand Up @@ -195,7 +195,7 @@ const app = new Hono()

return safeJson(c, { columns, rows, total, page, pageSize });
} finally {
db.disconnectSync();
safeDisconnect(db);
}
});

Expand Down
9 changes: 9 additions & 0 deletions entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,15 @@ if [ -n "$MISSING" ]; then
exec sleep infinity
fi

# DuckDB runs every query on a libuv worker thread. The default pool is only 4
# threads, so a handful of slow or non-interruptible federated queries (e.g. a
# scanner blocked on a stalled upstream socket after a timeout) can occupy the
# entire pool and starve all other async I/O in the process — new chat jobs
# then hang in "Thinking" with nothing able to make progress. Give DuckDB and
# the rest of the app headroom so a few stuck queries cannot freeze everything.
# Operators can override by setting UV_THREADPOOL_SIZE in the environment.
export UV_THREADPOOL_SIZE="${UV_THREADPOOL_SIZE:-16}"

# Supervise a long-running process: restart it whenever it exits.
#
# The BullMQ worker runs DuckDB (and its mysql/postgres/mssql scanner
Expand Down
11 changes: 9 additions & 2 deletions packages/core/src/services/agent-tools.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
scopeSchemaName,
stripScopedSchemaQualifier,
redactConnectionSecrets,
safeDisconnect,
withProjectQuerySlot,
withQueryTimeout,
withRecoverableProjectInstance,
Expand Down Expand Up @@ -115,7 +116,10 @@ export function makeExecuteQueryTool(projectId: string) {
truncated: rows.length >= MAX_ROWS,
});
} finally {
db.disconnectSync();
// `safeDisconnect` (not `disconnectSync`) so a query that was
// still unwinding after a timeout/cancel interrupt is not
// closed out from under a live native operation.
safeDisconnect(db);
}
}),
);
Expand Down Expand Up @@ -293,7 +297,10 @@ export function makeRunModelQueryTool(projectId: string) {
truncated: rows.length >= MAX_ROWS,
});
} finally {
db.disconnectSync();
// `safeDisconnect` (not `disconnectSync`) so a query that was
// still unwinding after a timeout/cancel interrupt is not
// closed out from under a live native operation.
safeDisconnect(db);
}
});
},
Expand Down
5 changes: 4 additions & 1 deletion packages/core/src/services/duckdb-console.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
getQueryTimeoutMs,
isQueryCancelledError,
redactConnectionSecrets,
safeDisconnect,
withQueryTimeout,
} from "./duckdb";

Expand Down Expand Up @@ -183,7 +184,9 @@ async function collectQueryRows(
}
return { columns, rows };
} finally {
db.disconnectSync();
// `safeDisconnect` so a console query that timed out and is still unwinding
// after the interrupt is not closed out from under a live native operation.
safeDisconnect(db);
}
}

Expand Down
147 changes: 147 additions & 0 deletions packages/core/src/services/duckdb.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import {
buildFirebirdAttachOptions,
COMMUNITY_EXTENSIONS,
getQueryTimeoutMs,
getQueryInterruptGraceMs,
withQueryTimeout,
safeDisconnect,
withProjectQuerySlot,
getProjectInstance,
disposeProjectInstance,
Expand Down Expand Up @@ -1011,7 +1013,43 @@ describe("getQueryTimeoutMs", () => {
});
});

describe("getQueryInterruptGraceMs", () => {
  const GRACE_ENV = "QUERY_INTERRUPT_GRACE_MS";
  // Captured once when the suite is defined; every test below is free to
  // overwrite or delete the variable because afterEach puts this value back.
  const savedGrace = process.env[GRACE_ENV];

  afterEach(() => {
    if (savedGrace !== undefined) {
      process.env[GRACE_ENV] = savedGrace;
    } else {
      delete process.env[GRACE_ENV];
    }
  });

  it("returns default 30_000 when env is unset", () => {
    delete process.env[GRACE_ENV];
    expect(getQueryInterruptGraceMs()).toBe(30_000);
  });

  it("parses numeric env value", () => {
    process.env[GRACE_ENV] = "5000";
    expect(getQueryInterruptGraceMs()).toBe(5000);
  });

  it("falls back to default for non-positive or non-numeric values", () => {
    // "0" covers the non-positive case, "nope" the non-numeric one.
    for (const rejected of ["0", "nope"]) {
      process.env[GRACE_ENV] = rejected;
      expect(getQueryInterruptGraceMs()).toBe(30_000);
    }
  });
});

describe("withQueryTimeout", () => {
// Snapshot of the grace setting taken when the suite is defined, so the
// afterEach hook below can put the environment back the way it found it.
const origGrace = process.env.QUERY_INTERRUPT_GRACE_MS;
beforeEach(() => {
// Keep the post-interrupt settle wait short for tests that intentionally
// use a never-settling operation (the production default is 30s).
process.env.QUERY_INTERRUPT_GRACE_MS = "50";
});
afterEach(() => {
// Restore the snapshot: remove the variable if it was originally unset,
// otherwise write the original value back. This also undoes any per-test
// override of the grace value.
if (origGrace === undefined) delete process.env.QUERY_INTERRUPT_GRACE_MS;
else process.env.QUERY_INTERRUPT_GRACE_MS = origGrace;
});

it("returns the result when the operation completes in time", async () => {
const instance = await DuckDBInstance.create();
const db = await instance.connect();
Expand Down Expand Up @@ -1048,6 +1086,115 @@ describe("withQueryTimeout", () => {
}
});

it("waits for an interrupted op to unwind before rejecting", async () => {
  // The timeout fires at 20ms, but the operation only settles at ~80ms.
  // withQueryTimeout must not reject (and thus let the caller disconnect)
  // until the native operation has actually unwound.
  process.env.QUERY_INTERRUPT_GRACE_MS = "1000";
  const instance = await DuckDBInstance.create();
  const db = await instance.connect();
  // `op` is a hand-controlled promise standing in for a query: it only
  // rejects when `settle()` is invoked by the timer scheduled below.
  let settle: () => void = () => {};
  const op = new Promise<never>((_resolve, reject) => {
    settle = () => reject(new Error("interrupted"));
  });
  try {
    const start = Date.now();
    const pending = withQueryTimeout(db, () => op, 20);
    setTimeout(() => settle(), 80);
    await expect(pending).rejects.toThrow(/timed out after 0\.02s/i);
    // 70 rather than 80 leaves slack for timer granularity while still
    // proving the rejection did not arrive at the 20ms timeout.
    expect(Date.now() - start).toBeGreaterThanOrEqual(70);
    // Op settled within the grace, so nothing is left pending: a plain
    // disconnect is safe (safeDisconnect runs synchronously).
    const disconnectSpy = vi.spyOn(db, "disconnectSync");
    try {
      safeDisconnect(db);
      expect(disconnectSpy).toHaveBeenCalledTimes(1);
    } finally {
      // Restore even when an assertion above throws, matching the sibling
      // tests that restore their spy in `finally`.
      disconnectSpy.mockRestore();
    }
  } finally {
    db.disconnectSync();
  }
});

it("defers safeDisconnect until an op that ignores the interrupt settles", async () => {
// timeout=20ms, grace=30ms, op never settles within the grace → the
// connection must NOT be disconnected while the native op is still live.
process.env.QUERY_INTERRUPT_GRACE_MS = "30";
const instance = await DuckDBInstance.create();
const db = await instance.connect();
// `op` is a hand-controlled promise standing in for a query: it stays
// pending until the test calls `settle()`, which rejects it.
let settle: () => void = () => {};
const op = new Promise<never>((_resolve, reject) => {
settle = () => reject(new Error("late"));
});
// Spy installed before the query runs so every disconnectSync call made
// during the test is counted.
const disconnectSpy = vi.spyOn(db, "disconnectSync");
try {
await expect(withQueryTimeout(db, () => op, 20)).rejects.toThrow(/timed out/i);
// Op still in flight past the grace → disconnect is deferred.
safeDisconnect(db);
expect(disconnectSpy).not.toHaveBeenCalled();
// Once the op settles, the deferred disconnect runs.
settle();
// Wait for the rejection to land; the no-op catch keeps it from failing the test.
await op.catch(() => {});
// Yield one timer tick before asserting — presumably so the deferred
// disconnect chained on the op's settlement has had a chance to run.
await new Promise((r) => setTimeout(r, 0));
expect(disconnectSpy).toHaveBeenCalledTimes(1);
} finally {
// Remove the spy first, then close the connection as final cleanup.
disconnectSpy.mockRestore();
db.disconnectSync();
}
});

it("defers safeDisconnect until ALL non-settling ops on one connection settle", async () => {
// A single connection runs several queries in sequence (e.g. the
// materialisation pass). If two of them time out without unwinding, the
// later one must not mask the earlier still-live query — safeDisconnect
// must wait for BOTH before closing the connection.
process.env.QUERY_INTERRUPT_GRACE_MS = "20";
const instance = await DuckDBInstance.create();
const db = await instance.connect();
// Two hand-controlled promises standing in for queries: each stays pending
// until its own settle function is called, which rejects it.
let settleFirst: () => void = () => {};
let settleSecond: () => void = () => {};
const first = new Promise<never>((_resolve, reject) => {
settleFirst = () => reject(new Error("late-1"));
});
const second = new Promise<never>((_resolve, reject) => {
settleSecond = () => reject(new Error("late-2"));
});
// Spy installed before either query runs so every disconnectSync call made
// during the test is counted.
const disconnectSpy = vi.spyOn(db, "disconnectSync");
try {
// Run the two ops back to back on the same connection; each uses a 10ms
// timeout and is expected to reject with a timeout error while the
// underlying promise is still pending.
await expect(withQueryTimeout(db, () => first, 10)).rejects.toThrow(/timed out/i);
await expect(withQueryTimeout(db, () => second, 10)).rejects.toThrow(/timed out/i);
safeDisconnect(db);
expect(disconnectSpy).not.toHaveBeenCalled();
// Settling only the second op must NOT disconnect — the first is live.
settleSecond();
// Wait for the rejection to land; the no-op catch keeps it from failing the test.
await second.catch(() => {});
// Yield one timer tick before asserting — presumably so any callback
// chained on the op's settlement has had a chance to run.
await new Promise((r) => setTimeout(r, 0));
expect(disconnectSpy).not.toHaveBeenCalled();
// Once the first op also settles, the deferred disconnect finally runs.
settleFirst();
await first.catch(() => {});
await new Promise((r) => setTimeout(r, 0));
expect(disconnectSpy).toHaveBeenCalledTimes(1);
} finally {
// Remove the spy first, then close the connection as final cleanup.
disconnectSpy.mockRestore();
db.disconnectSync();
}
});

it("rejects immediately for an already-aborted signal without starting the op", async () => {
  const duck = await DuckDBInstance.create();
  const db = await duck.connect();

  // Trip the signal before it is ever handed to withQueryTimeout.
  const aborter = new AbortController();
  aborter.abort();

  // Wrapped in a mock so the test can prove the query callback never ran.
  const operation = vi.fn(() => db.run("SELECT 1"));

  try {
    const attempt = withQueryTimeout(db, operation, 5_000, aborter.signal);
    await expect(attempt).rejects.toMatchObject({ name: "QueryCancelledError" });
    expect(operation).not.toHaveBeenCalled();
  } finally {
    db.disconnectSync();
  }
});

it("clears the timer when operation succeeds before deadline", async () => {
const clearSpy = vi.spyOn(globalThis, "clearTimeout");
const instance = await DuckDBInstance.create();
Expand Down
Loading
Loading