diff --git a/packages/docs/production/execution-modes.md b/packages/docs/production/execution-modes.md index 98528d3e..5216ce97 100644 --- a/packages/docs/production/execution-modes.md +++ b/packages/docs/production/execution-modes.md @@ -36,10 +36,6 @@ Use `fork: false` when: - You're running an **integration test** and want to avoid IPC and process teardown flakiness. - Your jobs need access to **live, in-process state** that can't cross a process boundary, for example a dependency-injection container. -::: danger No crash isolation with `fork: false` -With the default `fork: true`, a job that throws an unhandled exception or calls `process.exit()` only takes down the engine fork, and Sidequest restarts it. With `fork: false`, the engine shares your application's process: **a misbehaving job can crash your whole app.** Only use it when you understand and accept that. -::: - ## `runner`: thread pool vs inline ```typescript @@ -159,9 +155,7 @@ this.abortSignal.addEventListener("abort", () => { | `runner: "thread"`, `abortGracePeriodMs: 0` (default) | No (worker is killed immediately) | Killed right away. | | `runner: "thread"`, `abortGracePeriodMs > 0` | Yes, for the grace window | Killed after the grace period. | -::: danger Inline timeout/cancel only work if your job honors the signal -In `runner: "inline"` there is no way to forcibly stop a job. If your job does not pass `this.abortSignal` to its async work or check `this.abortSignal.aborted` / `throwIfAborted()`, then **timeouts and cancellation have no effect**: the job keeps running until it returns on its own. Treat `this.abortSignal` as mandatory for any long-running inline job. -::: +So in inline mode `this.abortSignal` is effectively mandatory for any long-running job: a job that does not honor it keeps running until it returns on its own (timeouts and cancellation cannot stop it). ### `abortGracePeriodMs`: graceful kill for thread jobs diff --git a/packages/engine/src/execution/executor-manager.test.ts b/packages/engine/src/execution/executor-manager.test.ts index 4c4d9092..850bfb78 100644 --- a/packages/engine/src/execution/executor-manager.test.ts +++ b/packages/engine/src/execution/executor-manager.test.ts @@ -236,31 +236,6 @@ describe("ExecutorManager", () => { await executorManager.destroy(); }); - sidequestTest("aborts a running job when its row no longer exists", async ({ backend, config }) => { - await backend.updateJob({ ...jobData, state: "claimed", claimed_at: new Date() }); - - const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 1 }); - const executorManager = new ExecutorManager(backend, config); - - // Simulate the row being deleted/truncated while the job runs: the watcher must still stop it. - const getJobSpy = vi.spyOn(backend, "getJob").mockResolvedValue(undefined); - runMock.mockImplementationOnce( - (_job: JobData, signal: AbortSignal) => - new Promise((_, reject) => { - signal.addEventListener("abort", () => reject(new Error("The task has been aborted"))); - }), - ); - - await executorManager.execute(queryConfig, jobData); - - // eslint-disable-next-line @typescript-eslint/unbound-method - expect(JobTransitioner.apply).toHaveBeenCalledWith(backend, jobData, expect.any(CancelTransition)); - expect(executorManager.totalActiveWorkers()).toBe(0); - - await executorManager.destroy(); - getJobSpy.mockRestore(); - }); - sidequestTest("does not crash when the final transition fails (job row gone)", async ({ backend, config }) => { const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 1 }); const executorManager = new ExecutorManager(backend, config); diff --git a/packages/engine/src/execution/executor-manager.ts b/packages/engine/src/execution/executor-manager.ts index 89b7f255..1468ba36 100644 --- a/packages/engine/src/execution/executor-manager.ts +++ b/packages/engine/src/execution/executor-manager.ts @@ -115,13 +115,8 @@ export class ExecutorManager { const cancellationCheck = async () => { while (isRunning) { const watchedJob = await this.backend.getJob(job.id); - // Abort when the job was canceled, and also when its row no longer exists (deleted or - // truncated): the record is gone, so the run must be stopped rather than left to block - // shutdown forever. - if (!watchedJob || watchedJob.state === "canceled") { - logger("Executor Manager").debug( - `Aborting job ${job.id}: ${watchedJob ? "canceled" : "row no longer exists"}`, - ); + if (watchedJob?.state === "canceled") { + logger("Executor Manager").debug(`Aborting job ${job.id}: canceled`); controller.abort(new JobCanceled()); isRunning = false; return; @@ -157,12 +152,15 @@ export class ExecutorManager { isRunning = false; const err = error as Error; if (controller.signal.aborted) { - // The run produced no result because the worker was hard-killed (thread). The abort reason - // decides the terminal state: a timeout becomes a retry, anything else (cancellation) becomes - // canceled. The rejection is logged so a real error during the abort is not lost. + // The run produced no result because the worker was hard-killed (thread). Only a clear + // cancellation maps to canceled; every other abort reason (timeout, or anything else) defaults + // to a retry as a failsafe. The rejection is logged so a real error during the abort is kept. const reason: unknown = controller.signal.reason; logger("Executor Manager").debug(`Job ${job.id} was hard-killed (${String(reason)}): ${err.message}`); - const transition = reason instanceof JobTimeout ? new RetryTransition(reason) : new CancelTransition(); + const transition = + reason instanceof JobCanceled + ? new CancelTransition() + : new RetryTransition(reason instanceof Error ? reason : new Error(`Job aborted: ${String(reason)}`)); await this.applyTerminalTransition(job, transition); } else { logger("Executor Manager").error(`Unhandled error while executing job ${job.id}: ${err.message}`); diff --git a/packages/engine/src/shared-runner/runner-pool.test.ts b/packages/engine/src/shared-runner/runner-pool.test.ts index 48f21275..5059c9b4 100644 --- a/packages/engine/src/shared-runner/runner-pool.test.ts +++ b/packages/engine/src/shared-runner/runner-pool.test.ts @@ -47,6 +47,14 @@ describe("RunnerPool", () => { expect(piscinaMockInstance.run).toHaveBeenCalledWith({ jobData, config }, { signal }); }); + sidequestTest("rejects without running the job when the signal is already aborted", async () => { + const controller = new AbortController(); + controller.abort(new Error("already gone")); + + await expect(pool.run(jobData, controller.signal)).rejects.toThrow("already gone"); + expect(piscinaMockInstance.run).not.toHaveBeenCalled(); + }); + sidequestTest( "with a grace period, delivers the abort over a port and hard-kills after the grace", async ({ config }) => { diff --git a/packages/engine/src/shared-runner/runner-pool.ts b/packages/engine/src/shared-runner/runner-pool.ts index 244c37dd..1ed8b20b 100644 --- a/packages/engine/src/shared-runner/runner-pool.ts +++ b/packages/engine/src/shared-runner/runner-pool.ts @@ -42,6 +42,12 @@ export class RunnerPool implements JobRunner { */ run(job: JobData, signal?: AbortSignal): Promise { logger("RunnerPool").debug(`Running job ${job.id} in pool`); + + // Already aborted before we could start (e.g. canceled between claim and dispatch): don't run it. + if (signal?.aborted) { + return Promise.reject(signal.reason instanceof Error ? signal.reason : new Error("Job aborted before execution")); + } + const grace = this.nonNullConfig.abortGracePeriodMs; if (!signal || grace <= 0) { @@ -59,11 +65,7 @@ export class RunnerPool implements JobRunner { graceTimer = setTimeout(() => hardKill.abort(), grace); }; - if (signal.aborted) { - onAbort(); - } else { - signal.addEventListener("abort", onAbort, { once: true }); - } + signal.addEventListener("abort", onAbort, { once: true }); return this.pool .run( diff --git a/tests/integration/shared-test-suite.js b/tests/integration/shared-test-suite.js index 125b6ae6..531d98c5 100644 --- a/tests/integration/shared-test-suite.js +++ b/tests/integration/shared-test-suite.js @@ -310,7 +310,9 @@ export function createIntegrationTestSuite(Sidequest, jobs, moduleType = "ESM") queues: [{ name: "default" }], }); - const jobData = await Sidequest.build(TimeoutJob).enqueue(1000000); + // A few seconds: long enough to reliably cancel while running, short enough that it cannot + // outlive the suite's teardown (which truncates the table) if the cancel watcher is mid-poll. + const jobData = await Sidequest.build(TimeoutJob).enqueue(3000); await vi.waitUntil(() => Sidequest.job.get(jobData.id).then((job) => job?.state === "running"), 5000); // Cancel the job while it's running