From e667a90cb5f655e5998fd2f4b7506ef14e1b4599 Mon Sep 17 00:00:00 2001 From: merencia Date: Thu, 18 Jun 2026 18:25:24 -0300 Subject: [PATCH] fix: do not attempt to cancel running jobs in inline mode Cancelling a running job relies on aborting its worker thread. The inline runner has no thread to abort, so the abort signal had no effect and, unlike the thread runner, the job ran to completion and its terminal transition overwrote the canceled state (the in-memory job stayed "running", so shouldRun passed). Skip the cancellation polling loop entirely when runner is "inline": an in-flight job runs to completion and cancellation of running jobs is not supported there. Pending jobs can still be cancelled (the dispatcher never claims them). --- packages/engine/src/engine.ts | 9 +++--- .../src/execution/executor-manager.test.ts | 20 +++++++++++++ .../engine/src/execution/executor-manager.ts | 30 +++++++++++-------- 3 files changed, 43 insertions(+), 16 deletions(-) diff --git a/packages/engine/src/engine.ts b/packages/engine/src/engine.ts index 0db99da0..9f93819e 100644 --- a/packages/engine/src/engine.ts +++ b/packages/engine/src/engine.ts @@ -58,10 +58,11 @@ export interface EngineConfig { * * - `"thread"` (default): jobs run in a pool of worker threads (piscina). Gives CPU isolation * and lets timeouts/cancellation forcibly abort a running job. - * - `"inline"`: jobs run in the current process/thread, with no worker pool. Timeouts and - * cancellation become best-effort (a running job cannot be forcibly aborted) and a CPU-bound - * job will block the event loop. Useful for single-process setups (serverless, tests, SQLite) - * and required when jobs need access to live in-process state. + * - `"inline"`: jobs run in the current process/thread, with no worker pool. A running job + * cannot be aborted, so timeouts are best-effort and cancelling an in-flight job is not + * supported (it runs to completion; pending jobs can still be cancelled). A CPU-bound job will + * block the event loop. Useful for single-process setups (serverless, tests, SQLite) and + * required when jobs need access to live in-process state. * * Defaults to `"thread"`. */ diff --git a/packages/engine/src/execution/executor-manager.test.ts b/packages/engine/src/execution/executor-manager.test.ts index 011b46a0..92804f15 100644 --- a/packages/engine/src/execution/executor-manager.test.ts +++ b/packages/engine/src/execution/executor-manager.test.ts @@ -95,6 +95,26 @@ describe("ExecutorManager", () => { inlineRunMock.mockReset(); }); + sidequestTest("does not poll for cancellation in inline mode", async ({ backend, config }) => { + inlineRunMock.mockResolvedValue({ + __is_job_transition__: true, + type: "completed", + result: "result", + } satisfies CompletedResult); + // The cancellation poll is the only thing that reads the job back during a run, so if inline + // skips it, getJob is never called. + const getJobSpy = vi.spyOn(backend, "getJob"); + const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 1 }); + const executorManager = new ExecutorManager(backend, { ...config, runner: "inline" }); + + await executorManager.execute(queryConfig, jobData); + + expect(getJobSpy).not.toHaveBeenCalled(); + await executorManager.destroy(); + getJobSpy.mockRestore(); + inlineRunMock.mockReset(); + }); + sidequestTest("should abort job execution on job cancel", async ({ backend, config }) => { await backend.updateJob({ ...jobData, state: "claimed", claimed_at: new Date() }); diff --git a/packages/engine/src/execution/executor-manager.ts b/packages/engine/src/execution/executor-manager.ts index 4c7a145b..c348c903 100644 --- a/packages/engine/src/execution/executor-manager.ts +++ b/packages/engine/src/execution/executor-manager.ts @@ -101,19 +101,25 @@ export class ExecutorManager { isRunning = true; const signal = new EventEmitter(); - const cancellationCheck = async () => { - while (isRunning) { - const watchedJob = await this.backend.getJob(job.id); - if (watchedJob!.state === "canceled") { - logger("Executor Manager").debug(`Emitting abort signal for job ${job.id}`); - signal.emit("abort"); - isRunning = false; - return; + // Cancelling a running job works by aborting its worker thread. The inline runner has no + // separate thread to abort, so cancellation of an in-flight job is not supported there: we + // skip the polling loop entirely and let the job run to completion. (Pending jobs can still + // be cancelled — the dispatcher never claims them.) + if (this.nonNullConfig.runner !== "inline") { + const cancellationCheck = async () => { + while (isRunning) { + const watchedJob = await this.backend.getJob(job.id); + if (watchedJob!.state === "canceled") { + logger("Executor Manager").debug(`Emitting abort signal for job ${job.id}`); + signal.emit("abort"); + isRunning = false; + return; + } + await new Promise((r) => setTimeout(r, 1000)); } - await new Promise((r) => setTimeout(r, 1000)); - } - }; - void cancellationCheck(); + }; + void cancellationCheck(); + } logger("Executor Manager").debug(`Running job ${job.id} in queue ${queueConfig.name}`);