From 2fec3154e0dee3baf860a3283cd32ce568a85b06 Mon Sep 17 00:00:00 2001 From: merencia Date: Fri, 19 Jun 2026 11:36:43 -0300 Subject: [PATCH 1/2] feat: cooperative abort in thread mode with configurable hard-kill grace Thread-mode jobs can now honor this.abortSignal too. On timeout/cancel the engine transfers a MessagePort to the worker and posts the abort (with its JobTimeout/JobCanceled reason); the worker turns that into the job's abortSignal. A new abortGracePeriodMs config controls how long to wait for the job to stop cooperatively before forcibly terminating the worker thread. abortGracePeriodMs defaults to 0, which preserves the current behavior: the worker is terminated immediately with no cooperative window and no port overhead. The port machinery is only set up when the grace period is positive. Inline is unaffected (no thread to terminate; grace does not apply). --- packages/core/src/job/abort-reason.ts | 4 ++ packages/engine/src/engine.ts | 14 +++++ .../src/shared-runner/runner-pool.test.ts | 56 +++++++++++++++++-- .../engine/src/shared-runner/runner-pool.ts | 49 +++++++++++++++- .../engine/src/shared-runner/runner.test.ts | 52 ++++++++++++++++- packages/engine/src/shared-runner/runner.ts | 46 +++++++++++++-- .../engine/src/test-jobs/abort-aware-job.js | 19 +++++++ 7 files changed, 226 insertions(+), 14 deletions(-) create mode 100644 packages/engine/src/test-jobs/abort-aware-job.js diff --git a/packages/core/src/job/abort-reason.ts b/packages/core/src/job/abort-reason.ts index 14678339..97ffe8c9 100644 --- a/packages/core/src/job/abort-reason.ts +++ b/packages/core/src/job/abort-reason.ts @@ -10,9 +10,13 @@ export type AbortReason = JobTimeout | JobCanceled; * Set as the `abortSignal.reason` when a job is aborted because it exceeded its `timeout`. */ export class JobTimeout extends Error { + /** The timeout, in milliseconds, that was exceeded. */ + readonly timeoutMs: number; + constructor(timeoutMs: number) { super(`Job timed out after ${timeoutMs}ms`); this.name = "JobTimeout"; + this.timeoutMs = timeoutMs; } } diff --git a/packages/engine/src/engine.ts b/packages/engine/src/engine.ts index 4d9da318..c53e5904 100644 --- a/packages/engine/src/engine.ts +++ b/packages/engine/src/engine.ts @@ -67,6 +67,19 @@ export interface EngineConfig { * Defaults to `"thread"`. */ runner?: "thread" | "inline"; + /** + * Grace period, in milliseconds, between cooperatively aborting a running job (timeout or + * cancellation) and forcibly terminating its worker thread. + * + * Only applies to `runner: "thread"`. When greater than `0`, the abort is first delivered to the + * job via `this.abortSignal` so it can stop and clean up; if it has not finished after this many + * milliseconds, the worker thread is terminated. When `0` (default), the worker is terminated + * immediately with no cooperative window, preserving the previous behavior. Has no effect in + * `runner: "inline"` (there is no thread to terminate). + * + * Defaults to `0`. + */ + abortGracePeriodMs?: number; /** Minimum number of worker threads to use. Defaults to number of CPUs */ minThreads?: number; /** Maximum number of worker threads to use. Defaults to `minThreads * 2` */ @@ -190,6 +203,7 @@ export class Engine { gracefulShutdown: config?.gracefulShutdown ?? true, fork: config?.fork ?? true, runner: config?.runner ?? "thread", + abortGracePeriodMs: config?.abortGracePeriodMs ?? 0, minThreads: config?.minThreads ?? cpus().length, maxThreads: config?.maxThreads ?? cpus().length * 2, idleWorkerTimeout: config?.idleWorkerTimeout ?? 10_000, diff --git a/packages/engine/src/shared-runner/runner-pool.test.ts b/packages/engine/src/shared-runner/runner-pool.test.ts index 8c4bba95..d725f763 100644 --- a/packages/engine/src/shared-runner/runner-pool.test.ts +++ b/packages/engine/src/shared-runner/runner-pool.test.ts @@ -1,8 +1,9 @@ import { sidequestTest, SidequestTestFixture } from "@/tests/fixture"; -import { JobData } from "@sidequest/core"; -import EventEmitter from "events"; +import { JobData, JobTimeout } from "@sidequest/core"; +import { MessagePort } from "node:worker_threads"; import { beforeEach, describe, expect, vi } from "vitest"; import { DummyJob } from "../test-jobs/dummy-job"; +import { AbortPortMessage } from "./runner"; import { RunnerPool } from "./runner-pool"; const piscinaMockInstance = { @@ -39,14 +40,57 @@ describe("RunnerPool", () => { pool = new RunnerPool(config); }); - sidequestTest("should call pool.run with job data", async ({ config }) => { - const emiter = new EventEmitter(); - const result = await pool.run(jobData, emiter); + sidequestTest("passes the abort signal straight to piscina when grace is 0", async ({ config }) => { + const signal = new AbortController().signal; + const result = await pool.run(jobData, signal); expect(result).toEqual({ type: "completed", result: "ok" }); - expect(piscinaMockInstance.run).toHaveBeenCalledWith({ jobData, config }, { signal: emiter }); + expect(piscinaMockInstance.run).toHaveBeenCalledWith({ jobData, config }, { signal }); }); + sidequestTest( + "with a grace period, delivers the abort over a port and hard-kills after the grace", + async ({ config }) => { + vi.useFakeTimers(); + try { + const gracePool = new RunnerPool({ ...config, abortGracePeriodMs: 1000 }); + + let capturedPort: MessagePort | undefined; + let hardKillSignal: AbortSignal | undefined; + piscinaMockInstance.run.mockImplementationOnce( + (value: { abortPort: MessagePort }, opts: { signal: AbortSignal }) => { + capturedPort = value.abortPort; + hardKillSignal = opts.signal; + // Resolve only once piscina is asked to terminate (hard kill). + return new Promise((resolve) => + opts.signal.addEventListener("abort", () => resolve({ type: "completed", result: "killed" })), + ); + }, + ); + + const controller = new AbortController(); + const runPromise = gracePool.run(jobData, controller.signal); + + const message = new Promise((resolve) => + capturedPort!.once("message", (m: AbortPortMessage) => resolve(m)), + ); + + controller.abort(new JobTimeout(5000)); + + expect(await message).toEqual({ kind: "timeout", timeoutMs: 5000 }); + expect(hardKillSignal!.aborted).toBe(false); + + // Grace elapses -> piscina is asked to terminate the worker. + await vi.advanceTimersByTimeAsync(1000); + expect(hardKillSignal!.aborted).toBe(true); + + await runPromise; + } finally { + vi.useRealTimers(); + } + }, + ); + sidequestTest("should call pool.destroy", () => { pool.destroy(); expect(piscinaMockInstance.destroy).toHaveBeenCalled(); diff --git a/packages/engine/src/shared-runner/runner-pool.ts b/packages/engine/src/shared-runner/runner-pool.ts index 2461b402..da034bd7 100644 --- a/packages/engine/src/shared-runner/runner-pool.ts +++ b/packages/engine/src/shared-runner/runner-pool.ts @@ -1,8 +1,10 @@ -import { JobData, JobResult, logger } from "@sidequest/core"; +import { JobData, JobResult, JobTimeout, logger } from "@sidequest/core"; +import { MessageChannel } from "node:worker_threads"; import Piscina from "piscina"; import { DEFAULT_RUNNER_PATH } from "../constants"; import { NonNullableEngineConfig } from "../engine"; import { JobRunner } from "./job-runner"; +import type { AbortPortMessage } from "./runner"; /** * A pool of worker threads for running jobs in parallel using Piscina. @@ -29,13 +31,54 @@ export class RunnerPool implements JobRunner { /** * Runs a job in the worker pool. + * + * With `abortGracePeriodMs === 0` (default), an abort terminates the worker immediately. With a + * positive grace period, the abort is delivered to the job cooperatively over a transferred port + * (so it can stop via `this.abortSignal`), and the worker is only forcibly terminated if it has + * not finished within the grace period. + * * @param job The job data to run. - * @param signal Optional abort signal; when it aborts, piscina terminates the worker thread. + * @param signal Optional abort signal for cancellation/timeout. * @returns A promise resolving to the job result. */ run(job: JobData, signal?: AbortSignal): Promise { logger("RunnerPool").debug(`Running job ${job.id} in pool`); - return this.pool.run({ jobData: job, config: this.nonNullConfig }, { signal }); + const grace = this.nonNullConfig.abortGracePeriodMs; + + if (!signal || grace <= 0) { + // Abort terminates the worker immediately. + return this.pool.run({ jobData: job, config: this.nonNullConfig }, { signal }); + } + + // Deliver the abort cooperatively first, then hard-terminate after the grace period. + const channel = new MessageChannel(); + const hardKill = new AbortController(); + let graceTimer: ReturnType | undefined; + + const onAbort = () => { + const reason: unknown = signal.reason; + const message: AbortPortMessage = + reason instanceof JobTimeout ? { kind: "timeout", timeoutMs: reason.timeoutMs } : { kind: "canceled" }; + channel.port1.postMessage(message); + graceTimer = setTimeout(() => hardKill.abort(), grace); + }; + + if (signal.aborted) { + onAbort(); + } else { + signal.addEventListener("abort", onAbort, { once: true }); + } + + return this.pool + .run( + { jobData: job, config: this.nonNullConfig, abortPort: channel.port2 }, + { transferList: [channel.port2], signal: hardKill.signal }, + ) + .finally(() => { + if (graceTimer) clearTimeout(graceTimer); + signal.removeEventListener("abort", onAbort); + channel.port1.close(); + }); } /** diff --git a/packages/engine/src/shared-runner/runner.test.ts b/packages/engine/src/shared-runner/runner.test.ts index 317fcb63..6f42b117 100644 --- a/packages/engine/src/shared-runner/runner.test.ts +++ b/packages/engine/src/shared-runner/runner.test.ts @@ -1,9 +1,10 @@ import { sidequestTest, SidequestTestFixture } from "@/tests/fixture"; -import { FailedResult, JobData } from "@sidequest/core"; +import { FailedResult, JobCanceled, JobData, JobTimeout } from "@sidequest/core"; import { existsSync, unlinkSync } from "node:fs"; import { unlink, writeFile } from "node:fs/promises"; import { join, resolve } from "node:path"; import { vi } from "vitest"; +import { AbortAwareJob } from "../test-jobs/abort-aware-job"; import { DummyJob } from "../test-jobs/dummy-job"; import { DummyJobWithArgs } from "../test-jobs/dummy-job-with-args"; import { importSidequest } from "../utils/import"; @@ -80,6 +81,55 @@ describe("runner.ts", () => { injectSpy.mockRestore(); }); + sidequestTest("a job receives the abort signal and its reason and stops", async ({ backend, config }) => { + const job = new AbortAwareJob(); + await job.ready(); + const abortJobData = await backend.createNewJob({ + class: job.className, + script: job.script, + args: [], + attempt: 0, + queue: "default", + constructor_args: [], + state: "waiting", + }); + + const controller = new AbortController(); + const resultPromise = run({ jobData: abortJobData, config, inline: true, signal: controller.signal }); + controller.abort(new JobCanceled()); + + // The job resolves once it observes the abort (whether before it starts or while awaiting it). + const result = (await resultPromise) as FailedResult; + expect(result.type).toBe("failed"); + expect(result.error.message).toContain("JobCanceled"); + }); + + sidequestTest("a job sees an already-aborted signal before it starts", async ({ backend, config }) => { + const job = new AbortAwareJob(); + await job.ready(); + const abortJobData = await backend.createNewJob({ + class: job.className, + script: job.script, + args: [], + attempt: 0, + queue: "default", + constructor_args: [], + state: "waiting", + }); + + const controller = new AbortController(); + controller.abort(new JobTimeout(50)); + + const result = (await run({ + jobData: abortJobData, + config, + inline: true, + signal: controller.signal, + })) as FailedResult; + expect(result.type).toBe("failed"); + expect(result.error.message).toContain("aborted before start: JobTimeout"); + }); + sidequestTest("fails with invalid script", async ({ config }) => { jobData.script = "invalid!"; const result = (await run({ jobData, config })) as FailedResult; diff --git a/packages/engine/src/shared-runner/runner.ts b/packages/engine/src/shared-runner/runner.ts index 53a3a993..3ef3e6bf 100644 --- a/packages/engine/src/shared-runner/runner.ts +++ b/packages/engine/src/shared-runner/runner.ts @@ -1,10 +1,40 @@ -import { Job, JobClassType, JobData, JobResult, logger, resolveScriptPathForJob, toErrorData } from "@sidequest/core"; +import { + Job, + JobCanceled, + JobClassType, + JobData, + JobResult, + JobTimeout, + logger, + resolveScriptPathForJob, + toErrorData, +} from "@sidequest/core"; import { existsSync } from "node:fs"; import { fileURLToPath } from "node:url"; +import { MessagePort } from "node:worker_threads"; import { EngineConfig } from "../engine"; import { importSidequest } from "../utils"; import { findSidequestJobsScriptInParentDirs, MANUAL_SCRIPT_TAG, resolveScriptPath } from "./manual-loader"; +/** Message posted by the engine over the abort port to cooperatively abort a worker-thread job. */ +export type AbortPortMessage = { kind: "timeout"; timeoutMs: number } | { kind: "canceled" }; + +/** + * Builds an {@link AbortSignal} for a worker-thread job from an abort port. + * + * The thread runner cannot receive a live `AbortSignal` across the worker boundary, so the engine + * transfers a {@link MessagePort} and posts an abort message on it; this turns that message into a + * local signal carrying the proper {@link JobTimeout}/{@link JobCanceled} reason. + */ +function signalFromAbortPort(port: MessagePort): AbortSignal { + const controller = new AbortController(); + port.on("message", (message: AbortPortMessage) => { + const reason = message.kind === "timeout" ? new JobTimeout(message.timeoutMs) : new JobCanceled(); + controller.abort(reason); + }); + return controller.signal; +} + /** * Runs a job by dynamically importing its script and executing the specified class. * @param jobData The job data containing script and class information @@ -16,11 +46,13 @@ export default async function run({ config, inline, signal, + abortPort, }: { jobData: JobData; config: EngineConfig; inline?: boolean; signal?: AbortSignal; + abortPort?: MessagePort; }): Promise { // In inline mode the job runs in the host process, where Sidequest is already configured, so // re-injecting the config is redundant. In a worker thread the module is fresh and needs it. @@ -79,12 +111,18 @@ export default async function run({ const job: Job = new JobClass(...jobData.constructor_args); job.injectJobData(jobData); - if (signal) { - job.injectAbortSignal(signal); + // Inline passes a live signal directly; the thread runner gets an abort port it turns into one. + const abortSignal = signal ?? (abortPort ? signalFromAbortPort(abortPort) : undefined); + if (abortSignal) { + job.injectAbortSignal(abortSignal); } logger("Runner").debug(`Executing job class "${jobData.class}" with args:`, jobData.args); - return job.perform(...jobData.args); + try { + return await job.perform(...jobData.args); + } finally { + abortPort?.close(); + } } /** diff --git a/packages/engine/src/test-jobs/abort-aware-job.js b/packages/engine/src/test-jobs/abort-aware-job.js new file mode 100644 index 00000000..bc9704de --- /dev/null +++ b/packages/engine/src/test-jobs/abort-aware-job.js @@ -0,0 +1,19 @@ +import { Job } from "@sidequest/core"; + +/** + * A job that honors `this.abortSignal`: it waits until aborted and reports the abort reason. + * Used to verify cooperative cancellation/timeout end-to-end through the runner. + */ +export class AbortAwareJob extends Job { + async run() { + if (this.abortSignal.aborted) { + return this.fail(`aborted before start: ${this.abortSignal.reason?.name}`); + } + + await new Promise((resolve) => { + this.abortSignal.addEventListener("abort", resolve, { once: true }); + }); + + return this.fail(`aborted: ${this.abortSignal.reason?.name}`); + } +} From 2aa0ce952d65f54541cb47ba7a0506241fc57e08 Mon Sep 17 00:00:00 2001 From: merencia Date: Fri, 19 Jun 2026 14:24:12 -0300 Subject: [PATCH 2/2] test: cover the worker-side abort port path Adds a test that drives run() through the abortPort branch (thread mode): a real MessageChannel delivers an abort message, signalFromAbortPort rebuilds the JobTimeout reason, and the job observes it via this.abortSignal. Closes the gap where only the engine-side port posting and the inline signal path were tested. --- .../engine/src/shared-runner/runner.test.ts | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/packages/engine/src/shared-runner/runner.test.ts b/packages/engine/src/shared-runner/runner.test.ts index 6f42b117..38378c8f 100644 --- a/packages/engine/src/shared-runner/runner.test.ts +++ b/packages/engine/src/shared-runner/runner.test.ts @@ -3,6 +3,7 @@ import { FailedResult, JobCanceled, JobData, JobTimeout } from "@sidequest/core" import { existsSync, unlinkSync } from "node:fs"; import { unlink, writeFile } from "node:fs/promises"; import { join, resolve } from "node:path"; +import { MessageChannel } from "node:worker_threads"; import { vi } from "vitest"; import { AbortAwareJob } from "../test-jobs/abort-aware-job"; import { DummyJob } from "../test-jobs/dummy-job"; @@ -130,6 +131,31 @@ describe("runner.ts", () => { expect(result.error.message).toContain("aborted before start: JobTimeout"); }); + sidequestTest("a thread job is aborted via the abort port (rebuilds the reason)", async ({ backend, config }) => { + const job = new AbortAwareJob(); + await job.ready(); + const abortJobData = await backend.createNewJob({ + class: job.className, + script: job.script, + args: [], + attempt: 0, + queue: "default", + constructor_args: [], + state: "waiting", + }); + + const channel = new MessageChannel(); + // Thread path: no live signal, the abort arrives over the transferred port. + const resultPromise = run({ jobData: abortJobData, config, abortPort: channel.port2 }); + channel.port1.postMessage({ kind: "timeout", timeoutMs: 1234 }); + + const result = (await resultPromise) as FailedResult; + expect(result.type).toBe("failed"); + // The worker reconstructs the JobTimeout reason from the port message. + expect(result.error.message).toContain("JobTimeout"); + channel.port1.close(); + }); + sidequestTest("fails with invalid script", async ({ config }) => { jobData.script = "invalid!"; const result = (await run({ jobData, config })) as FailedResult;