From 856756b9f8cf4ed55e767f224f8911d694d9a95a Mon Sep 17 00:00:00 2001 From: merencia Date: Fri, 19 Jun 2026 11:02:56 -0300 Subject: [PATCH 1/3] feat: deliver timeout/cancel to jobs via this.abortSignal Adds an AbortSignal to the Job base class, available inside run() as `this.abortSignal`. The engine aborts it on timeout (reason: JobTimeout) and on cancellation (reason: JobCanceled), so jobs can stop cooperatively (`fetch(url, { signal: this.abortSignal })`, `throwIfAborted()`, etc.). In inline mode this is the only way a running job can be stopped, so timeout and cancel now work there cooperatively (superseding the previous approach of disabling cancel inline). On cancel the in-memory job is marked canceled so the terminal transition is skipped by shouldRun and does not overwrite the state. The executor now uses an AbortController instead of an EventEmitter and detects aborts via signal.aborted rather than matching the rejection message. Thread mode is unchanged: piscina still terminates the worker on abort. --- packages/core/src/job/abort-reason.ts | 27 +++++++++++++++ packages/core/src/job/index.ts | 1 + packages/core/src/job/job.test.ts | 15 ++++++++ packages/core/src/job/job.ts | 20 +++++++++++ packages/engine/src/engine.ts | 9 ++--- .../src/execution/executor-manager.test.ts | 19 +++++------ .../engine/src/execution/executor-manager.ts | 34 ++++++++++++++----- .../src/shared-runner/inline-runner.test.ts | 7 ++-- .../engine/src/shared-runner/inline-runner.ts | 10 +++--- .../engine/src/shared-runner/job-runner.ts | 7 ++-- .../engine/src/shared-runner/runner-pool.ts | 5 ++- .../engine/src/shared-runner/runner.test.ts | 15 ++++++++ packages/engine/src/shared-runner/runner.ts | 5 +++ 13 files changed, 137 insertions(+), 37 deletions(-) create mode 100644 packages/core/src/job/abort-reason.ts diff --git a/packages/core/src/job/abort-reason.ts b/packages/core/src/job/abort-reason.ts new file mode 100644 index 00000000..14678339 --- /dev/null +++ b/packages/core/src/job/abort-reason.ts @@ -0,0 +1,27 @@ +/** + * Reason set on a job's `abortSignal` when the engine aborts a running job. + * + * Inspect `job.abortSignal.reason` inside `run` to tell why the job is being aborted, e.g. to log + * or clean up differently for a timeout vs an explicit cancellation. + */ +export type AbortReason = JobTimeout | JobCanceled; + +/** + * Set as the `abortSignal.reason` when a job is aborted because it exceeded its `timeout`. + */ +export class JobTimeout extends Error { + constructor(timeoutMs: number) { + super(`Job timed out after ${timeoutMs}ms`); + this.name = "JobTimeout"; + } +} + +/** + * Set as the `abortSignal.reason` when a running job is aborted because it was canceled. + */ +export class JobCanceled extends Error { + constructor() { + super("Job was canceled"); + this.name = "JobCanceled"; + } +} diff --git a/packages/core/src/job/index.ts b/packages/core/src/job/index.ts index 9fc50091..a9016069 100644 --- a/packages/core/src/job/index.ts +++ b/packages/core/src/job/index.ts @@ -1 +1,2 @@ +export * from "./abort-reason"; export * from "./job"; diff --git a/packages/core/src/job/job.test.ts b/packages/core/src/job/job.test.ts index 74706418..4124f5f1 100644 --- a/packages/core/src/job/job.test.ts +++ b/packages/core/src/job/job.test.ts @@ -37,6 +37,21 @@ describe("job.ts", () => { expect(transition.result).toBe("foo bar"); }); + it("exposes a non-aborting abortSignal by default", () => { + const job = new DummyJob(); + expect(job.abortSignal).toBeInstanceOf(AbortSignal); + expect(job.abortSignal.aborted).toBe(false); + }); + + it("injects the abort signal at runtime", () => { + const job = new DummyJob(); + const controller = new AbortController(); + job.injectAbortSignal(controller.signal); + expect(job.abortSignal).toBe(controller.signal); + controller.abort(); + expect(job.abortSignal.aborted).toBe(true); + }); + it("creates a fail transition", () => { const job = new DummyJob(); const transition = job.fail("error"); diff --git a/packages/core/src/job/job.ts b/packages/core/src/job/job.ts index e32bd9c7..59cf2730 100644 --- a/packages/core/src/job/job.ts +++ b/packages/core/src/job/job.ts @@ -79,6 +79,18 @@ export abstract class Job implements JobData { readonly backoff_strategy!: BackoffStrategy; readonly retry_delay!: number | null; + /** + * Signal that fires when the engine aborts this run (timeout or cancellation). Available inside + * `run`. Pass it to abort-aware APIs (e.g. `fetch(url, { signal: this.abortSignal })`) or check it + * cooperatively (`this.abortSignal.throwIfAborted()`, `this.abortSignal.aborted`). The reason is a + * `JobTimeout` or `JobCanceled` (see `abortSignal.reason`). + * + * In `runner: "inline"` mode this is the only way a running job can be stopped. In `"thread"` mode + * the worker is also terminated, so honoring the signal is optional (but enables graceful cleanup). + * Defaults to a signal that never aborts when no run is in progress. + */ + readonly abortSignal: AbortSignal = new AbortController().signal; + /** * Initializes the job and resolves its script path. */ @@ -102,6 +114,14 @@ export abstract class Job implements JobData { Object.assign(this, jobData); } + /** + * Injects the abort signal for this run into the job instance at runtime. + * @param signal The abort signal the engine controls for this execution. + */ + injectAbortSignal(signal: AbortSignal): void { + Object.assign(this, { abortSignal: signal }); + } + /** * The class name of this job. */ diff --git a/packages/engine/src/engine.ts b/packages/engine/src/engine.ts index 0db99da0..4d9da318 100644 --- a/packages/engine/src/engine.ts +++ b/packages/engine/src/engine.ts @@ -58,10 +58,11 @@ export interface EngineConfig { * * - `"thread"` (default): jobs run in a pool of worker threads (piscina). Gives CPU isolation * and lets timeouts/cancellation forcibly abort a running job. - * - `"inline"`: jobs run in the current process/thread, with no worker pool. Timeouts and - * cancellation become best-effort (a running job cannot be forcibly aborted) and a CPU-bound - * job will block the event loop. Useful for single-process setups (serverless, tests, SQLite) - * and required when jobs need access to live in-process state. + * - `"inline"`: jobs run in the current process/thread, with no worker pool. A running job cannot + * be forcibly terminated, so timeouts and cancellation are delivered cooperatively via + * `this.abortSignal` inside the job (the job must honor it to stop early); a job that ignores it + * runs to completion, and a CPU-bound job will block the event loop. Useful for single-process + * setups (serverless, tests, SQLite) and required when jobs need access to live in-process state. * * Defaults to `"thread"`. */ diff --git a/packages/engine/src/execution/executor-manager.test.ts b/packages/engine/src/execution/executor-manager.test.ts index 011b46a0..e88e7b5e 100644 --- a/packages/engine/src/execution/executor-manager.test.ts +++ b/packages/engine/src/execution/executor-manager.test.ts @@ -1,7 +1,6 @@ import { sidequestTest, SidequestTestFixture } from "@/tests/fixture"; import { Backend } from "@sidequest/backend"; import { CompletedResult, JobData, RetryTransition, RunTransition } from "@sidequest/core"; -import EventEmitter from "events"; import { JobTransitioner } from "../job/job-transitioner"; import { grantQueueConfig } from "../queue/grant-queue-config"; import { DummyJob } from "../test-jobs/dummy-job"; @@ -72,7 +71,7 @@ describe("ExecutorManager", () => { expect(executorManager.availableSlotsGlobal()).toEqual(9); await execPromise; - expect(runMock).toBeCalledWith(jobData, expect.any(EventEmitter)); + expect(runMock).toBeCalledWith(jobData, expect.any(AbortSignal)); expect(executorManager.availableSlotsByQueue(queryConfig)).toEqual(1); expect(executorManager.availableSlotsGlobal()).toEqual(10); await executorManager.destroy(); @@ -89,7 +88,7 @@ describe("ExecutorManager", () => { await executorManager.execute(queryConfig, jobData); - expect(inlineRunMock).toBeCalledWith(jobData, expect.any(EventEmitter)); + expect(inlineRunMock).toBeCalledWith(jobData, expect.any(AbortSignal)); expect(runMock).not.toHaveBeenCalled(); await executorManager.destroy(); inlineRunMock.mockReset(); @@ -102,9 +101,9 @@ describe("ExecutorManager", () => { const executorManager = new ExecutorManager(backend, config); let expectedPromise; - runMock.mockImplementationOnce(async (job: JobData, signal: EventEmitter) => { + runMock.mockImplementationOnce(async (job: JobData, signal: AbortSignal) => { const promise = new Promise((_, reject) => { - signal.on("abort", () => { + signal.addEventListener("abort", () => { reject(new Error("The task has been aborted")); }); }); @@ -119,7 +118,7 @@ describe("ExecutorManager", () => { expect(executorManager.availableSlotsGlobal()).toEqual(9); await execPromise; - expect(runMock).toBeCalledWith(jobData, expect.any(EventEmitter)); + expect(runMock).toBeCalledWith(jobData, expect.any(AbortSignal)); expect(runMock).toHaveReturnedWith(expectedPromise); await expect(expectedPromise).rejects.toThrow("The task has been aborted"); expect(executorManager.availableSlotsByQueue(queryConfig)).toEqual(1); @@ -140,9 +139,9 @@ describe("ExecutorManager", () => { const executorManager = new ExecutorManager(backend, config); let expectedPromise; - runMock.mockImplementationOnce(async (job: JobData, signal: EventEmitter) => { + runMock.mockImplementationOnce(async (job: JobData, signal: AbortSignal) => { const promise = new Promise((_, reject) => { - signal.on("abort", () => { + signal.addEventListener("abort", () => { reject(new Error("The task has been aborted")); }); }); @@ -156,7 +155,7 @@ describe("ExecutorManager", () => { expect(executorManager.availableSlotsGlobal()).toEqual(9); await execPromise; - expect(runMock).toBeCalledWith(jobData, expect.any(EventEmitter)); + expect(runMock).toBeCalledWith(jobData, expect.any(AbortSignal)); expect(runMock).toHaveReturnedWith(expectedPromise); await expect(expectedPromise).rejects.toThrow("The task has been aborted"); expect(executorManager.availableSlotsByQueue(queryConfig)).toEqual(1); @@ -186,7 +185,7 @@ describe("ExecutorManager", () => { expect(executorManager.availableSlotsGlobal()).toEqual(9); await execPromise; - expect(runMock).toBeCalledWith(jobData, expect.any(EventEmitter)); + expect(runMock).toBeCalledWith(jobData, expect.any(AbortSignal)); expect(executorManager.availableSlotsByQueue(queryConfig)).toEqual(1); expect(executorManager.availableSlotsGlobal()).toEqual(10); diff --git a/packages/engine/src/execution/executor-manager.ts b/packages/engine/src/execution/executor-manager.ts index 4c7a145b..1dbcfb67 100644 --- a/packages/engine/src/execution/executor-manager.ts +++ b/packages/engine/src/execution/executor-manager.ts @@ -1,6 +1,14 @@ import { Backend } from "@sidequest/backend"; -import { JobData, JobTransitionFactory, logger, QueueConfig, RetryTransition, RunTransition } from "@sidequest/core"; -import EventEmitter from "events"; +import { + JobCanceled, + JobData, + JobTimeout, + JobTransitionFactory, + logger, + QueueConfig, + RetryTransition, + RunTransition, +} from "@sidequest/core"; import { inspect } from "util"; import { NonNullableEngineConfig } from "../engine"; import { JobTransitioner } from "../job/job-transitioner"; @@ -91,6 +99,7 @@ export class ExecutorManager { */ async execute(queueConfig: QueueConfig, job: JobData): Promise { let isRunning = false; + let controller: AbortController | undefined; try { logger("Executor Manager").debug(`Submitting job ${job.id} for execution in queue ${queueConfig.name}`); // We call prepareJob here again to make sure the jobs are in the queues. @@ -100,13 +109,17 @@ export class ExecutorManager { job = await JobTransitioner.apply(this.backend, job, new RunTransition()); isRunning = true; - const signal = new EventEmitter(); + controller = new AbortController(); + const abortController = controller; const cancellationCheck = async () => { while (isRunning) { const watchedJob = await this.backend.getJob(job.id); if (watchedJob!.state === "canceled") { - logger("Executor Manager").debug(`Emitting abort signal for job ${job.id}`); - signal.emit("abort"); + logger("Executor Manager").debug(`Aborting job ${job.id}: canceled`); + // Reflect the cancellation on the in-memory job so the terminal transition below is + // skipped by shouldRun (it requires "running") and does not overwrite the canceled state. + job.state = "canceled"; + abortController.abort(new JobCanceled()); isRunning = false; return; } @@ -117,13 +130,13 @@ export class ExecutorManager { logger("Executor Manager").debug(`Running job ${job.id} in queue ${queueConfig.name}`); - const runPromise = this.jobRunner.run(job, signal); + const runPromise = this.jobRunner.run(job, abortController.signal); if (job.timeout) { void new Promise(() => { setTimeout(() => { logger("Executor Manager").debug(`Job ${job.id} timed out after ${job.timeout}ms, aborting.`); - signal.emit("abort"); + abortController.abort(new JobTimeout(job.timeout!)); void JobTransitioner.apply(this.backend, job, new RetryTransition(`Job timed out after ${job.timeout}ms`)); }, job.timeout!); }); @@ -138,8 +151,11 @@ export class ExecutorManager { } catch (error: unknown) { isRunning = false; const err = error as Error; - if (err.message === "The task has been aborted") { - logger("Executor Manager").debug(`Job ${job.id} was aborted`); + // The thread runner rejects the run when its worker is aborted. Detect that via the signal + // rather than the rejection message (which varies). The terminal state was already set by the + // timeout (retry) or cancellation (canceled) path, so there is nothing more to do here. + if (controller?.signal.aborted) { + logger("Executor Manager").debug(`Job ${job.id} was aborted: ${String(controller.signal.reason)}`); } else { logger("Executor Manager").error(`Unhandled error while executing job ${job.id}: ${err.message}`); await JobTransitioner.apply(this.backend, job, new RetryTransition(err)); diff --git a/packages/engine/src/shared-runner/inline-runner.test.ts b/packages/engine/src/shared-runner/inline-runner.test.ts index 225538e4..74bc540f 100644 --- a/packages/engine/src/shared-runner/inline-runner.test.ts +++ b/packages/engine/src/shared-runner/inline-runner.test.ts @@ -40,9 +40,10 @@ describe("InlineRunner", () => { expect(result).toEqual({ __is_job_transition__: true, type: "completed", result: "dummy job" }); }); - sidequestTest("delegates to the runner with inline enabled (skips config injection)", async ({ config }) => { - await runner.run(jobData); - expect(run).toHaveBeenCalledWith({ jobData, config, inline: true }); + sidequestTest("delegates to the runner with inline enabled and forwards the signal", async ({ config }) => { + const signal = new AbortController().signal; + await runner.run(jobData, signal); + expect(run).toHaveBeenCalledWith({ jobData, config, inline: true, signal }); }); sidequestTest("destroy is a no-op and does not throw", () => { diff --git a/packages/engine/src/shared-runner/inline-runner.ts b/packages/engine/src/shared-runner/inline-runner.ts index 2631717e..f5ceb0fc 100644 --- a/packages/engine/src/shared-runner/inline-runner.ts +++ b/packages/engine/src/shared-runner/inline-runner.ts @@ -19,14 +19,16 @@ export class InlineRunner implements JobRunner { constructor(private nonNullConfig: NonNullableEngineConfig) {} /** - * Runs a job in the current process. The abort signal is intentionally not accepted: inline - * execution has no separate thread to terminate. + * Runs a job in the current process. The abort signal is forwarded to the job (as + * `this.abortSignal`) so it can stop cooperatively: inline execution has no separate thread to + * terminate, so this is the only way timeouts and cancellation can take effect. * @param job The job data to run. + * @param signal Abort signal handed to the job for cooperative cancellation. * @returns A promise resolving to the job result. */ - run(job: JobData): Promise { + run(job: JobData, signal?: AbortSignal): Promise { logger("InlineRunner").debug(`Running job ${job.id} inline`); - return run({ jobData: job, config: this.nonNullConfig, inline: true }); + return run({ jobData: job, config: this.nonNullConfig, inline: true, signal }); } /** diff --git a/packages/engine/src/shared-runner/job-runner.ts b/packages/engine/src/shared-runner/job-runner.ts index 6658b3dc..14d851f0 100644 --- a/packages/engine/src/shared-runner/job-runner.ts +++ b/packages/engine/src/shared-runner/job-runner.ts @@ -1,5 +1,4 @@ import { JobData, JobResult } from "@sidequest/core"; -import EventEmitter from "events"; /** * Abstraction over how a claimed job is actually executed. @@ -12,10 +11,10 @@ export interface JobRunner { /** * Runs a job and resolves with its result. * @param job The job data to run. - * @param signal Optional event emitter used to request cancellation/abort. May be ignored by - * implementations that cannot forcibly abort a running job (e.g. the inline runner). + * @param signal Abort signal for the run. The thread runner uses it to terminate the worker; the + * inline runner forwards it to the job so it can stop cooperatively. */ - run(job: JobData, signal?: EventEmitter): Promise; + run(job: JobData, signal?: AbortSignal): Promise; /** * Releases any resources held by the runner. diff --git a/packages/engine/src/shared-runner/runner-pool.ts b/packages/engine/src/shared-runner/runner-pool.ts index 224d6aca..2461b402 100644 --- a/packages/engine/src/shared-runner/runner-pool.ts +++ b/packages/engine/src/shared-runner/runner-pool.ts @@ -1,5 +1,4 @@ import { JobData, JobResult, logger } from "@sidequest/core"; -import EventEmitter from "events"; import Piscina from "piscina"; import { DEFAULT_RUNNER_PATH } from "../constants"; import { NonNullableEngineConfig } from "../engine"; @@ -31,10 +30,10 @@ export class RunnerPool implements JobRunner { /** * Runs a job in the worker pool. * @param job The job data to run. - * @param signal Optional event emitter for cancellation. + * @param signal Optional abort signal; when it aborts, piscina terminates the worker thread. * @returns A promise resolving to the job result. */ - run(job: JobData, signal?: EventEmitter): Promise { + run(job: JobData, signal?: AbortSignal): Promise { logger("RunnerPool").debug(`Running job ${job.id} in pool`); return this.pool.run({ jobData: job, config: this.nonNullConfig }, { signal }); } diff --git a/packages/engine/src/shared-runner/runner.test.ts b/packages/engine/src/shared-runner/runner.test.ts index 7fde6c15..317fcb63 100644 --- a/packages/engine/src/shared-runner/runner.test.ts +++ b/packages/engine/src/shared-runner/runner.test.ts @@ -65,6 +65,21 @@ describe("runner.ts", () => { expect(importSidequest).not.toHaveBeenCalled(); }); + sidequestTest("injects the abort signal into the job when provided", async ({ config }) => { + const injectSpy = vi.spyOn(DummyJob.prototype, "injectAbortSignal"); + const signal = new AbortController().signal; + await run({ jobData, config, signal }); + expect(injectSpy).toHaveBeenCalledWith(signal); + injectSpy.mockRestore(); + }); + + sidequestTest("does not inject an abort signal when none is provided", async ({ config }) => { + const injectSpy = vi.spyOn(DummyJob.prototype, "injectAbortSignal"); + await run({ jobData, config }); + expect(injectSpy).not.toHaveBeenCalled(); + injectSpy.mockRestore(); + }); + sidequestTest("fails with invalid script", async ({ config }) => { jobData.script = "invalid!"; const result = (await run({ jobData, config })) as FailedResult; diff --git a/packages/engine/src/shared-runner/runner.ts b/packages/engine/src/shared-runner/runner.ts index 458bd1fe..53a3a993 100644 --- a/packages/engine/src/shared-runner/runner.ts +++ b/packages/engine/src/shared-runner/runner.ts @@ -15,10 +15,12 @@ export default async function run({ jobData, config, inline, + signal, }: { jobData: JobData; config: EngineConfig; inline?: boolean; + signal?: AbortSignal; }): Promise { // In inline mode the job runs in the host process, where Sidequest is already configured, so // re-injecting the config is redundant. In a worker thread the module is fresh and needs it. @@ -77,6 +79,9 @@ export default async function run({ const job: Job = new JobClass(...jobData.constructor_args); job.injectJobData(jobData); + if (signal) { + job.injectAbortSignal(signal); + } logger("Runner").debug(`Executing job class "${jobData.class}" with args:`, jobData.args); return job.perform(...jobData.args); From c78251d3dcaaccbb1500d3f870da4899792336b1 Mon Sep 17 00:00:00 2001 From: merencia Date: Fri, 19 Jun 2026 11:39:56 -0300 Subject: [PATCH 2/3] refactor(engine): simplify abort handling in executor Use a single const AbortController (drop the | undefined and the alias) and guard the terminal transition with !controller.signal.aborted instead of mutating the in-memory job state, so a job that ignores the abort signal and runs to completion cannot overwrite the canceled/retry state. Adds a regression test for the ignored-signal case. --- .../src/execution/executor-manager.test.ts | 29 ++++++++++++++++++- .../engine/src/execution/executor-manager.ts | 26 ++++++++--------- 2 files changed, 41 insertions(+), 14 deletions(-) diff --git a/packages/engine/src/execution/executor-manager.test.ts b/packages/engine/src/execution/executor-manager.test.ts index e88e7b5e..872d7722 100644 --- a/packages/engine/src/execution/executor-manager.test.ts +++ b/packages/engine/src/execution/executor-manager.test.ts @@ -1,6 +1,6 @@ import { sidequestTest, SidequestTestFixture } from "@/tests/fixture"; import { Backend } from "@sidequest/backend"; -import { CompletedResult, JobData, RetryTransition, RunTransition } from "@sidequest/core"; +import { CompletedResult, CompleteTransition, JobData, RetryTransition, RunTransition } from "@sidequest/core"; import { JobTransitioner } from "../job/job-transitioner"; import { grantQueueConfig } from "../queue/grant-queue-config"; import { DummyJob } from "../test-jobs/dummy-job"; @@ -132,6 +132,33 @@ describe("ExecutorManager", () => { await executorManager.destroy(); }); + sidequestTest( + "does not overwrite the canceled state when an aborted job ignores the signal", + async ({ backend, config }) => { + await backend.updateJob({ ...jobData, state: "claimed", claimed_at: new Date() }); + + const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 1 }); + const executorManager = new ExecutorManager(backend, config); + + // The job is canceled mid-run but ignores the abort signal and runs to completion. + runMock.mockImplementationOnce(async (job: JobData, signal: AbortSignal) => { + await backend.updateJob({ ...job, state: "canceled" }); + while (!signal.aborted) { + await new Promise((r) => setTimeout(r, 50)); + } + return { __is_job_transition__: true, type: "completed", result: "result" } as CompletedResult; + }); + + await executorManager.execute(queryConfig, jobData); + + // The terminal completion transition must be skipped so the canceled state is preserved. + // eslint-disable-next-line @typescript-eslint/unbound-method + expect(JobTransitioner.apply).not.toHaveBeenCalledWith(backend, expect.anything(), expect.any(CompleteTransition)); + + await executorManager.destroy(); + }, + ); + sidequestTest("should abort job execution on timeout", async ({ backend, config }) => { jobData = await backend.updateJob({ ...jobData, state: "claimed", claimed_at: new Date(), timeout: 100 }); diff --git a/packages/engine/src/execution/executor-manager.ts b/packages/engine/src/execution/executor-manager.ts index 1dbcfb67..743da59c 100644 --- a/packages/engine/src/execution/executor-manager.ts +++ b/packages/engine/src/execution/executor-manager.ts @@ -99,7 +99,7 @@ export class ExecutorManager { */ async execute(queueConfig: QueueConfig, job: JobData): Promise { let isRunning = false; - let controller: AbortController | undefined; + const controller = new AbortController(); try { logger("Executor Manager").debug(`Submitting job ${job.id} for execution in queue ${queueConfig.name}`); // We call prepareJob here again to make sure the jobs are in the queues. @@ -109,17 +109,12 @@ export class ExecutorManager { job = await JobTransitioner.apply(this.backend, job, new RunTransition()); isRunning = true; - controller = new AbortController(); - const abortController = controller; const cancellationCheck = async () => { while (isRunning) { const watchedJob = await this.backend.getJob(job.id); if (watchedJob!.state === "canceled") { logger("Executor Manager").debug(`Aborting job ${job.id}: canceled`); - // Reflect the cancellation on the in-memory job so the terminal transition below is - // skipped by shouldRun (it requires "running") and does not overwrite the canceled state. - job.state = "canceled"; - abortController.abort(new JobCanceled()); + controller.abort(new JobCanceled()); isRunning = false; return; } @@ -130,13 +125,13 @@ export class ExecutorManager { logger("Executor Manager").debug(`Running job ${job.id} in queue ${queueConfig.name}`); - const runPromise = this.jobRunner.run(job, abortController.signal); + const runPromise = this.jobRunner.run(job, controller.signal); if (job.timeout) { void new Promise(() => { setTimeout(() => { logger("Executor Manager").debug(`Job ${job.id} timed out after ${job.timeout}ms, aborting.`); - abortController.abort(new JobTimeout(job.timeout!)); + controller.abort(new JobTimeout(job.timeout!)); void JobTransitioner.apply(this.backend, job, new RetryTransition(`Job timed out after ${job.timeout}ms`)); }, job.timeout!); }); @@ -145,16 +140,21 @@ export class ExecutorManager { const result = await runPromise; isRunning = false; - logger("Executor Manager").debug(`Job ${job.id} completed with result: ${inspect(result)}`); - const transition = JobTransitionFactory.create(result); - await JobTransitioner.apply(this.backend, job, transition); + // If the job was aborted (canceled or timed out) the terminal state was already decided by that + // path. Skip the terminal transition so a job that ignored the signal and ran to completion does + // not overwrite the canceled/retry state. + if (!controller.signal.aborted) { + logger("Executor Manager").debug(`Job ${job.id} completed with result: ${inspect(result)}`); + const transition = JobTransitionFactory.create(result); + await JobTransitioner.apply(this.backend, job, transition); + } } catch (error: unknown) { isRunning = false; const err = error as Error; // The thread runner rejects the run when its worker is aborted. Detect that via the signal // rather than the rejection message (which varies). The terminal state was already set by the // timeout (retry) or cancellation (canceled) path, so there is nothing more to do here. - if (controller?.signal.aborted) { + if (controller.signal.aborted) { logger("Executor Manager").debug(`Job ${job.id} was aborted: ${String(controller.signal.reason)}`); } else { logger("Executor Manager").error(`Unhandled error while executing job ${job.id}: ${err.message}`); From 796b197e27a9ea49348a7c863e634b92b2f1ecfd Mon Sep 17 00:00:00 2001 From: Lucas Merencia <1686635+merencia@users.noreply.github.com> Date: Fri, 19 Jun 2026 14:28:09 -0300 Subject: [PATCH 3/3] feat: cooperative abort in thread mode + abortGracePeriodMs (#183) * feat: cooperative abort in thread mode with configurable hard-kill grace Thread-mode jobs can now honor this.abortSignal too. On timeout/cancel the engine transfers a MessagePort to the worker and posts the abort (with its JobTimeout/JobCanceled reason); the worker turns that into the job's abortSignal. A new abortGracePeriodMs config controls how long to wait for the job to stop cooperatively before forcibly terminating the worker thread. abortGracePeriodMs defaults to 0, which preserves the current behavior: the worker is terminated immediately with no cooperative window and no port overhead. The port machinery is only set up when the grace period is positive. Inline is unaffected (no thread to terminate; grace does not apply). * test: cover the worker-side abort port path Adds a test that drives run() through the abortPort branch (thread mode): a real MessageChannel delivers an abort message, signalFromAbortPort rebuilds the JobTimeout reason, and the job observes it via this.abortSignal. Closes the gap where only the engine-side port posting and the inline signal path were tested. --- packages/core/src/job/abort-reason.ts | 4 + packages/engine/src/engine.ts | 14 ++++ .../src/shared-runner/runner-pool.test.ts | 56 +++++++++++-- .../engine/src/shared-runner/runner-pool.ts | 49 +++++++++++- .../engine/src/shared-runner/runner.test.ts | 78 ++++++++++++++++++- packages/engine/src/shared-runner/runner.ts | 46 ++++++++++- .../engine/src/test-jobs/abort-aware-job.js | 19 +++++ 7 files changed, 252 insertions(+), 14 deletions(-) create mode 100644 packages/engine/src/test-jobs/abort-aware-job.js diff --git a/packages/core/src/job/abort-reason.ts b/packages/core/src/job/abort-reason.ts index 14678339..97ffe8c9 100644 --- a/packages/core/src/job/abort-reason.ts +++ b/packages/core/src/job/abort-reason.ts @@ -10,9 +10,13 @@ export type AbortReason = JobTimeout | JobCanceled; * Set as the `abortSignal.reason` when a job is aborted because it exceeded its `timeout`. */ export class JobTimeout extends Error { + /** The timeout, in milliseconds, that was exceeded. */ + readonly timeoutMs: number; + constructor(timeoutMs: number) { super(`Job timed out after ${timeoutMs}ms`); this.name = "JobTimeout"; + this.timeoutMs = timeoutMs; } } diff --git a/packages/engine/src/engine.ts b/packages/engine/src/engine.ts index 4d9da318..c53e5904 100644 --- a/packages/engine/src/engine.ts +++ b/packages/engine/src/engine.ts @@ -67,6 +67,19 @@ export interface EngineConfig { * Defaults to `"thread"`. */ runner?: "thread" | "inline"; + /** + * Grace period, in milliseconds, between cooperatively aborting a running job (timeout or + * cancellation) and forcibly terminating its worker thread. + * + * Only applies to `runner: "thread"`. When greater than `0`, the abort is first delivered to the + * job via `this.abortSignal` so it can stop and clean up; if it has not finished after this many + * milliseconds, the worker thread is terminated. When `0` (default), the worker is terminated + * immediately with no cooperative window, preserving the previous behavior. Has no effect in + * `runner: "inline"` (there is no thread to terminate). + * + * Defaults to `0`. + */ + abortGracePeriodMs?: number; /** Minimum number of worker threads to use. Defaults to number of CPUs */ minThreads?: number; /** Maximum number of worker threads to use. Defaults to `minThreads * 2` */ @@ -190,6 +203,7 @@ export class Engine { gracefulShutdown: config?.gracefulShutdown ?? true, fork: config?.fork ?? true, runner: config?.runner ?? "thread", + abortGracePeriodMs: config?.abortGracePeriodMs ?? 0, minThreads: config?.minThreads ?? cpus().length, maxThreads: config?.maxThreads ?? cpus().length * 2, idleWorkerTimeout: config?.idleWorkerTimeout ?? 10_000, diff --git a/packages/engine/src/shared-runner/runner-pool.test.ts b/packages/engine/src/shared-runner/runner-pool.test.ts index 8c4bba95..d725f763 100644 --- a/packages/engine/src/shared-runner/runner-pool.test.ts +++ b/packages/engine/src/shared-runner/runner-pool.test.ts @@ -1,8 +1,9 @@ import { sidequestTest, SidequestTestFixture } from "@/tests/fixture"; -import { JobData } from "@sidequest/core"; -import EventEmitter from "events"; +import { JobData, JobTimeout } from "@sidequest/core"; +import { MessagePort } from "node:worker_threads"; import { beforeEach, describe, expect, vi } from "vitest"; import { DummyJob } from "../test-jobs/dummy-job"; +import { AbortPortMessage } from "./runner"; import { RunnerPool } from "./runner-pool"; const piscinaMockInstance = { @@ -39,14 +40,57 @@ describe("RunnerPool", () => { pool = new RunnerPool(config); }); - sidequestTest("should call pool.run with job data", async ({ config }) => { - const emiter = new EventEmitter(); - const result = await pool.run(jobData, emiter); + sidequestTest("passes the abort signal straight to piscina when grace is 0", async ({ config }) => { + const signal = new AbortController().signal; + const result = await pool.run(jobData, signal); expect(result).toEqual({ type: "completed", result: "ok" }); - expect(piscinaMockInstance.run).toHaveBeenCalledWith({ jobData, config }, { signal: emiter }); + expect(piscinaMockInstance.run).toHaveBeenCalledWith({ jobData, config }, { signal }); }); + sidequestTest( + "with a grace period, delivers the abort over a port and hard-kills after the grace", + async ({ config }) => { + vi.useFakeTimers(); + try { + const gracePool = new RunnerPool({ ...config, abortGracePeriodMs: 1000 }); + + let capturedPort: MessagePort | undefined; + let hardKillSignal: AbortSignal | undefined; + piscinaMockInstance.run.mockImplementationOnce( + (value: { abortPort: MessagePort }, opts: { signal: AbortSignal }) => { + capturedPort = value.abortPort; + hardKillSignal = opts.signal; + // Resolve only once piscina is asked to terminate (hard kill). + return new Promise((resolve) => + opts.signal.addEventListener("abort", () => resolve({ type: "completed", result: "killed" })), + ); + }, + ); + + const controller = new AbortController(); + const runPromise = gracePool.run(jobData, controller.signal); + + const message = new Promise((resolve) => + capturedPort!.once("message", (m: AbortPortMessage) => resolve(m)), + ); + + controller.abort(new JobTimeout(5000)); + + expect(await message).toEqual({ kind: "timeout", timeoutMs: 5000 }); + expect(hardKillSignal!.aborted).toBe(false); + + // Grace elapses -> piscina is asked to terminate the worker. + await vi.advanceTimersByTimeAsync(1000); + expect(hardKillSignal!.aborted).toBe(true); + + await runPromise; + } finally { + vi.useRealTimers(); + } + }, + ); + sidequestTest("should call pool.destroy", () => { pool.destroy(); expect(piscinaMockInstance.destroy).toHaveBeenCalled(); diff --git a/packages/engine/src/shared-runner/runner-pool.ts b/packages/engine/src/shared-runner/runner-pool.ts index 2461b402..da034bd7 100644 --- a/packages/engine/src/shared-runner/runner-pool.ts +++ b/packages/engine/src/shared-runner/runner-pool.ts @@ -1,8 +1,10 @@ -import { JobData, JobResult, logger } from "@sidequest/core"; +import { JobData, JobResult, JobTimeout, logger } from "@sidequest/core"; +import { MessageChannel } from "node:worker_threads"; import Piscina from "piscina"; import { DEFAULT_RUNNER_PATH } from "../constants"; import { NonNullableEngineConfig } from "../engine"; import { JobRunner } from "./job-runner"; +import type { AbortPortMessage } from "./runner"; /** * A pool of worker threads for running jobs in parallel using Piscina. @@ -29,13 +31,54 @@ export class RunnerPool implements JobRunner { /** * Runs a job in the worker pool. + * + * With `abortGracePeriodMs === 0` (default), an abort terminates the worker immediately. With a + * positive grace period, the abort is delivered to the job cooperatively over a transferred port + * (so it can stop via `this.abortSignal`), and the worker is only forcibly terminated if it has + * not finished within the grace period. + * * @param job The job data to run. - * @param signal Optional abort signal; when it aborts, piscina terminates the worker thread. + * @param signal Optional abort signal for cancellation/timeout. * @returns A promise resolving to the job result. */ run(job: JobData, signal?: AbortSignal): Promise { logger("RunnerPool").debug(`Running job ${job.id} in pool`); - return this.pool.run({ jobData: job, config: this.nonNullConfig }, { signal }); + const grace = this.nonNullConfig.abortGracePeriodMs; + + if (!signal || grace <= 0) { + // Abort terminates the worker immediately. + return this.pool.run({ jobData: job, config: this.nonNullConfig }, { signal }); + } + + // Deliver the abort cooperatively first, then hard-terminate after the grace period. + const channel = new MessageChannel(); + const hardKill = new AbortController(); + let graceTimer: ReturnType | undefined; + + const onAbort = () => { + const reason: unknown = signal.reason; + const message: AbortPortMessage = + reason instanceof JobTimeout ? { kind: "timeout", timeoutMs: reason.timeoutMs } : { kind: "canceled" }; + channel.port1.postMessage(message); + graceTimer = setTimeout(() => hardKill.abort(), grace); + }; + + if (signal.aborted) { + onAbort(); + } else { + signal.addEventListener("abort", onAbort, { once: true }); + } + + return this.pool + .run( + { jobData: job, config: this.nonNullConfig, abortPort: channel.port2 }, + { transferList: [channel.port2], signal: hardKill.signal }, + ) + .finally(() => { + if (graceTimer) clearTimeout(graceTimer); + signal.removeEventListener("abort", onAbort); + channel.port1.close(); + }); } /** diff --git a/packages/engine/src/shared-runner/runner.test.ts b/packages/engine/src/shared-runner/runner.test.ts index 317fcb63..38378c8f 100644 --- a/packages/engine/src/shared-runner/runner.test.ts +++ b/packages/engine/src/shared-runner/runner.test.ts @@ -1,9 +1,11 @@ import { sidequestTest, SidequestTestFixture } from "@/tests/fixture"; -import { FailedResult, JobData } from "@sidequest/core"; +import { FailedResult, JobCanceled, JobData, JobTimeout } from "@sidequest/core"; import { existsSync, unlinkSync } from "node:fs"; import { unlink, writeFile } from "node:fs/promises"; import { join, resolve } from "node:path"; +import { MessageChannel } from "node:worker_threads"; import { vi } from "vitest"; +import { AbortAwareJob } from "../test-jobs/abort-aware-job"; import { DummyJob } from "../test-jobs/dummy-job"; import { DummyJobWithArgs } from "../test-jobs/dummy-job-with-args"; import { importSidequest } from "../utils/import"; @@ -80,6 +82,80 @@ describe("runner.ts", () => { injectSpy.mockRestore(); }); + sidequestTest("a job receives the abort signal and its reason and stops", async ({ backend, config }) => { + const job = new AbortAwareJob(); + await job.ready(); + const abortJobData = await backend.createNewJob({ + class: job.className, + script: job.script, + args: [], + attempt: 0, + queue: "default", + constructor_args: [], + state: "waiting", + }); + + const controller = new AbortController(); + const resultPromise = run({ jobData: abortJobData, config, inline: true, signal: controller.signal }); + controller.abort(new JobCanceled()); + + // The job resolves once it observes the abort (whether before it starts or while awaiting it). + const result = (await resultPromise) as FailedResult; + expect(result.type).toBe("failed"); + expect(result.error.message).toContain("JobCanceled"); + }); + + sidequestTest("a job sees an already-aborted signal before it starts", async ({ backend, config }) => { + const job = new AbortAwareJob(); + await job.ready(); + const abortJobData = await backend.createNewJob({ + class: job.className, + script: job.script, + args: [], + attempt: 0, + queue: "default", + constructor_args: [], + state: "waiting", + }); + + const controller = new AbortController(); + controller.abort(new JobTimeout(50)); + + const result = (await run({ + jobData: abortJobData, + config, + inline: true, + signal: controller.signal, + })) as FailedResult; + expect(result.type).toBe("failed"); + expect(result.error.message).toContain("aborted before start: JobTimeout"); + }); + + sidequestTest("a thread job is aborted via the abort port (rebuilds the reason)", async ({ backend, config }) => { + const job = new AbortAwareJob(); + await job.ready(); + const abortJobData = await backend.createNewJob({ + class: job.className, + script: job.script, + args: [], + attempt: 0, + queue: "default", + constructor_args: [], + state: "waiting", + }); + + const channel = new MessageChannel(); + // Thread path: no live signal, the abort arrives over the transferred port. + const resultPromise = run({ jobData: abortJobData, config, abortPort: channel.port2 }); + channel.port1.postMessage({ kind: "timeout", timeoutMs: 1234 }); + + const result = (await resultPromise) as FailedResult; + expect(result.type).toBe("failed"); + // The worker reconstructs the JobTimeout reason from the port message. + expect(result.error.message).toContain("JobTimeout"); + channel.port1.close(); + }); + sidequestTest("fails with invalid script", async ({ config }) => { jobData.script = "invalid!"; const result = (await run({ jobData, config })) as FailedResult; diff --git a/packages/engine/src/shared-runner/runner.ts b/packages/engine/src/shared-runner/runner.ts index 53a3a993..3ef3e6bf 100644 --- a/packages/engine/src/shared-runner/runner.ts +++ b/packages/engine/src/shared-runner/runner.ts @@ -1,10 +1,40 @@ -import { Job, JobClassType, JobData, JobResult, logger, resolveScriptPathForJob, toErrorData } from "@sidequest/core"; +import { + Job, + JobCanceled, + JobClassType, + JobData, + JobResult, + JobTimeout, + logger, + resolveScriptPathForJob, + toErrorData, +} from "@sidequest/core"; import { existsSync } from "node:fs"; import { fileURLToPath } from "node:url"; +import { MessagePort } from "node:worker_threads"; import { EngineConfig } from "../engine"; import { importSidequest } from "../utils"; import { findSidequestJobsScriptInParentDirs, MANUAL_SCRIPT_TAG, resolveScriptPath } from "./manual-loader"; +/** Message posted by the engine over the abort port to cooperatively abort a worker-thread job. */ +export type AbortPortMessage = { kind: "timeout"; timeoutMs: number } | { kind: "canceled" }; + +/** + * Builds an {@link AbortSignal} for a worker-thread job from an abort port. + * + * The thread runner cannot receive a live `AbortSignal` across the worker boundary, so the engine + * transfers a {@link MessagePort} and posts an abort message on it; this turns that message into a + * local signal carrying the proper {@link JobTimeout}/{@link JobCanceled} reason. + */ +function signalFromAbortPort(port: MessagePort): AbortSignal { + const controller = new AbortController(); + port.on("message", (message: AbortPortMessage) => { + const reason = message.kind === "timeout" ? new JobTimeout(message.timeoutMs) : new JobCanceled(); + controller.abort(reason); + }); + return controller.signal; +} + /** * Runs a job by dynamically importing its script and executing the specified class. * @param jobData The job data containing script and class information @@ -16,11 +46,13 @@ export default async function run({ config, inline, signal, + abortPort, }: { jobData: JobData; config: EngineConfig; inline?: boolean; signal?: AbortSignal; + abortPort?: MessagePort; }): Promise { // In inline mode the job runs in the host process, where Sidequest is already configured, so // re-injecting the config is redundant. In a worker thread the module is fresh and needs it. @@ -79,12 +111,18 @@ export default async function run({ const job: Job = new JobClass(...jobData.constructor_args); job.injectJobData(jobData); - if (signal) { - job.injectAbortSignal(signal); + // Inline passes a live signal directly; the thread runner gets an abort port it turns into one. + const abortSignal = signal ?? (abortPort ? signalFromAbortPort(abortPort) : undefined); + if (abortSignal) { + job.injectAbortSignal(abortSignal); } logger("Runner").debug(`Executing job class "${jobData.class}" with args:`, jobData.args); - return job.perform(...jobData.args); + try { + return await job.perform(...jobData.args); + } finally { + abortPort?.close(); + } } /** diff --git a/packages/engine/src/test-jobs/abort-aware-job.js b/packages/engine/src/test-jobs/abort-aware-job.js new file mode 100644 index 00000000..bc9704de --- /dev/null +++ b/packages/engine/src/test-jobs/abort-aware-job.js @@ -0,0 +1,19 @@ +import { Job } from "@sidequest/core"; + +/** + * A job that honors `this.abortSignal`: it waits until aborted and reports the abort reason. + * Used to verify cooperative cancellation/timeout end-to-end through the runner. + */ +export class AbortAwareJob extends Job { + async run() { + if (this.abortSignal.aborted) { + return this.fail(`aborted before start: ${this.abortSignal.reason?.name}`); + } + + await new Promise((resolve) => { + this.abortSignal.addEventListener("abort", resolve, { once: true }); + }); + + return this.fail(`aborted: ${this.abortSignal.reason?.name}`); + } +}