From 3e5284c400fce73b0efd12f4217c46d2176c19dc Mon Sep 17 00:00:00 2001 From: Lucas Merencia <1686635+merencia@users.noreply.github.com> Date: Thu, 18 Jun 2026 17:25:38 -0300 Subject: [PATCH 01/10] feat: add inline execution mode (runner: "inline") (#178) Adds a `runner` engine config option: - "thread" (default): the existing piscina worker-thread pool. - "inline": runs jobs in the current process/thread, with no pool. Introduces a `JobRunner` interface implemented by both the existing `RunnerPool` and a new `InlineRunner`; `ExecutorManager` picks the impl from config. The `runner()` function gains an `inline` flag to skip the redundant `Sidequest.configure` injection when the job runs in the host process. Inline mode trades CPU isolation and forcible timeout/cancellation (best-effort only) for single-process execution. Groundwork for the no-fork mode and the NestJS integration; also useful for serverless, tests, and SQLite. --- packages/engine/src/engine.ts | 14 +++++ .../src/execution/executor-manager.test.ts | 24 +++++++++ .../engine/src/execution/executor-manager.ts | 19 ++++--- packages/engine/src/shared-runner/index.ts | 2 + .../src/shared-runner/inline-runner.test.ts | 51 +++++++++++++++++++ .../engine/src/shared-runner/inline-runner.ts | 38 ++++++++++++++ .../engine/src/shared-runner/job-runner.ts | 24 +++++++++ .../engine/src/shared-runner/runner-pool.ts | 3 +- .../engine/src/shared-runner/runner.test.ts | 13 +++++ packages/engine/src/shared-runner/runner.ts | 16 +++++- 10 files changed, 193 insertions(+), 11 deletions(-) create mode 100644 packages/engine/src/shared-runner/inline-runner.test.ts create mode 100644 packages/engine/src/shared-runner/inline-runner.ts create mode 100644 packages/engine/src/shared-runner/job-runner.ts diff --git a/packages/engine/src/engine.ts b/packages/engine/src/engine.ts index cb8daf87..07a0f3b7 100644 --- a/packages/engine/src/engine.ts +++ b/packages/engine/src/engine.ts @@ -40,6 +40,19 @@ export interface EngineConfig { 
cleanupFinishedJobsOlderThan?: number; /** Whether to enable graceful shutdown handling. Defaults to `true` */ gracefulShutdown?: boolean; + /** + * How jobs are executed. + * + * - `"thread"` (default): jobs run in a pool of worker threads (piscina). Gives CPU isolation + * and lets timeouts/cancellation forcibly abort a running job. + * - `"inline"`: jobs run in the current process/thread, with no worker pool. Timeouts and + * cancellation become best-effort (a running job cannot be forcibly aborted) and a CPU-bound + * job will block the event loop. Useful for single-process setups (serverless, tests, SQLite) + * and required when jobs need access to live in-process state. + * + * Defaults to `"thread"`. + */ + runner?: "thread" | "inline"; /** Minimum number of worker threads to use. Defaults to number of CPUs */ minThreads?: number; /** Maximum number of worker threads to use. Defaults to `minThreads * 2` */ @@ -155,6 +168,7 @@ export class Engine { json: config?.logger?.json ?? false, }, gracefulShutdown: config?.gracefulShutdown ?? true, + runner: config?.runner ?? "thread", minThreads: config?.minThreads ?? cpus().length, maxThreads: config?.maxThreads ?? cpus().length * 2, idleWorkerTimeout: config?.idleWorkerTimeout ?? 
10_000, diff --git a/packages/engine/src/execution/executor-manager.test.ts b/packages/engine/src/execution/executor-manager.test.ts index 0565887f..011b46a0 100644 --- a/packages/engine/src/execution/executor-manager.test.ts +++ b/packages/engine/src/execution/executor-manager.test.ts @@ -8,6 +8,7 @@ import { DummyJob } from "../test-jobs/dummy-job"; import { ExecutorManager } from "./executor-manager"; const runMock = vi.hoisted(() => vi.fn()); +const inlineRunMock = vi.hoisted(() => vi.fn()); vi.mock("../shared-runner", () => ({ RunnerPool: vi.fn().mockImplementation(function () { @@ -16,6 +17,12 @@ vi.mock("../shared-runner", () => ({ destroy: vi.fn(), }; }), + InlineRunner: vi.fn().mockImplementation(function () { + return { + run: inlineRunMock, + destroy: vi.fn(), + }; + }), })); vi.mock("../job/job-transitioner", () => ({ @@ -71,6 +78,23 @@ describe("ExecutorManager", () => { await executorManager.destroy(); }); + sidequestTest("uses the inline runner when runner is 'inline'", async ({ backend, config }) => { + inlineRunMock.mockResolvedValue({ + __is_job_transition__: true, + type: "completed", + result: "result", + } satisfies CompletedResult); + const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 1 }); + const executorManager = new ExecutorManager(backend, { ...config, runner: "inline" }); + + await executorManager.execute(queryConfig, jobData); + + expect(inlineRunMock).toBeCalledWith(jobData, expect.any(EventEmitter)); + expect(runMock).not.toHaveBeenCalled(); + await executorManager.destroy(); + inlineRunMock.mockReset(); + }); + sidequestTest("should abort job execution on job cancel", async ({ backend, config }) => { await backend.updateJob({ ...jobData, state: "claimed", claimed_at: new Date() }); diff --git a/packages/engine/src/execution/executor-manager.ts b/packages/engine/src/execution/executor-manager.ts index f97b357d..4c7a145b 100644 --- a/packages/engine/src/execution/executor-manager.ts +++ 
b/packages/engine/src/execution/executor-manager.ts @@ -4,7 +4,7 @@ import EventEmitter from "events"; import { inspect } from "util"; import { NonNullableEngineConfig } from "../engine"; import { JobTransitioner } from "../job/job-transitioner"; -import { RunnerPool } from "../shared-runner"; +import { InlineRunner, JobRunner, RunnerPool } from "../shared-runner"; /** * Manages job execution and worker concurrency for Sidequest. @@ -12,7 +12,7 @@ import { RunnerPool } from "../shared-runner"; export class ExecutorManager { private activeByQueue: Record>; private activeJobs: Set; - private runnerPool: RunnerPool; + private jobRunner: JobRunner; /** * Creates a new ExecutorManager. @@ -25,7 +25,10 @@ export class ExecutorManager { ) { this.activeByQueue = {}; this.activeJobs = new Set(); - this.runnerPool = new RunnerPool(this.nonNullConfig); + this.jobRunner = + this.nonNullConfig.runner === "inline" + ? new InlineRunner(this.nonNullConfig) + : new RunnerPool(this.nonNullConfig); } /** @@ -114,7 +117,7 @@ export class ExecutorManager { logger("Executor Manager").debug(`Running job ${job.id} in queue ${queueConfig.name}`); - const runPromise = this.runnerPool.run(job, signal); + const runPromise = this.jobRunner.run(job, signal); if (job.timeout) { void new Promise(() => { @@ -155,13 +158,13 @@ export class ExecutorManager { await new Promise((resolve, reject) => { const checkJobs = () => { if (this.totalActiveWorkers() === 0) { - logger("ExecutorManager").info("All active jobs finished. Destroying runner pool."); + logger("ExecutorManager").info("All active jobs finished. Destroying runner."); try { - this.runnerPool.destroy(); - logger("ExecutorManager").debug("Runner pool destroyed. Returning."); + this.jobRunner.destroy(); + logger("ExecutorManager").debug("Runner destroyed. 
Returning."); resolve(); } catch (error) { - logger("ExecutorManager").error("Error while destroying runner pool:", error); + logger("ExecutorManager").error("Error while destroying runner:", error); reject(error as Error); } } else { diff --git a/packages/engine/src/shared-runner/index.ts b/packages/engine/src/shared-runner/index.ts index e2d80d65..2112f5b0 100644 --- a/packages/engine/src/shared-runner/index.ts +++ b/packages/engine/src/shared-runner/index.ts @@ -1,4 +1,6 @@ import * as runner from "./runner"; +export * from "./inline-runner"; +export * from "./job-runner"; export * from "./manual-loader"; export * from "./runner-pool"; export { runner as run }; diff --git a/packages/engine/src/shared-runner/inline-runner.test.ts b/packages/engine/src/shared-runner/inline-runner.test.ts new file mode 100644 index 00000000..225538e4 --- /dev/null +++ b/packages/engine/src/shared-runner/inline-runner.test.ts @@ -0,0 +1,51 @@ +import { sidequestTest, SidequestTestFixture } from "@/tests/fixture"; +import { JobData } from "@sidequest/core"; +import { beforeEach, describe, expect, vi } from "vitest"; +import { DummyJob } from "../test-jobs/dummy-job"; +import { InlineRunner } from "./inline-runner"; + +// Spy on the runner module's default export so we can assert how the InlineRunner delegates to it. 
+vi.mock("./runner", async (importOriginal) => { + const original = await importOriginal(); + return { default: vi.fn(original.default), injectSidequestConfig: original.injectSidequestConfig }; +}); + +import run from "./runner"; + +describe("InlineRunner", () => { + let runner: InlineRunner; + let jobData: JobData; + + beforeEach(async ({ backend, config }) => { + vi.clearAllMocks(); + + const job = new DummyJob(); + await job.ready(); + + jobData = await backend.createNewJob({ + class: job.className, + script: job.script, + args: [], + attempt: 0, + queue: "default", + constructor_args: [], + state: "waiting", + }); + + runner = new InlineRunner(config); + }); + + sidequestTest("runs the job in-process and returns its result", async () => { + const result = await runner.run(jobData); + expect(result).toEqual({ __is_job_transition__: true, type: "completed", result: "dummy job" }); + }); + + sidequestTest("delegates to the runner with inline enabled (skips config injection)", async ({ config }) => { + await runner.run(jobData); + expect(run).toHaveBeenCalledWith({ jobData, config, inline: true }); + }); + + sidequestTest("destroy is a no-op and does not throw", () => { + expect(() => runner.destroy()).not.toThrow(); + }); +}); diff --git a/packages/engine/src/shared-runner/inline-runner.ts b/packages/engine/src/shared-runner/inline-runner.ts new file mode 100644 index 00000000..2631717e --- /dev/null +++ b/packages/engine/src/shared-runner/inline-runner.ts @@ -0,0 +1,38 @@ +import { JobData, JobResult, logger } from "@sidequest/core"; +import { NonNullableEngineConfig } from "../engine"; +import { JobRunner } from "./job-runner"; +import run from "./runner"; + +/** + * Runs jobs in the current process/thread instead of a worker thread pool. + * + * Used by the inline execution mode (`runner: "inline"`). 
Unlike {@link RunnerPool}, a running job + * cannot be forcibly aborted: cancellation and timeouts are best-effort only, and a CPU-bound job + * will block the event loop. In exchange, jobs run in the host process and can reach live + * in-process state (the basis for framework integrations like NestJS). + */ +export class InlineRunner implements JobRunner { + /** + * Creates a new InlineRunner. + * @param nonNullConfig The non-nullable engine configuration. + */ + constructor(private nonNullConfig: NonNullableEngineConfig) {} + + /** + * Runs a job in the current process. The abort signal is intentionally not accepted: inline + * execution has no separate thread to terminate. + * @param job The job data to run. + * @returns A promise resolving to the job result. + */ + run(job: JobData): Promise { + logger("InlineRunner").debug(`Running job ${job.id} inline`); + return run({ jobData: job, config: this.nonNullConfig, inline: true }); + } + + /** + * Releases resources. No-op for the inline runner. + */ + destroy(): void { + // There is no pool to tear down. In-flight jobs are awaited by the ExecutorManager. + } +} diff --git a/packages/engine/src/shared-runner/job-runner.ts b/packages/engine/src/shared-runner/job-runner.ts new file mode 100644 index 00000000..6658b3dc --- /dev/null +++ b/packages/engine/src/shared-runner/job-runner.ts @@ -0,0 +1,24 @@ +import { JobData, JobResult } from "@sidequest/core"; +import EventEmitter from "events"; + +/** + * Abstraction over how a claimed job is actually executed. + * + * Implemented by the thread-based {@link RunnerPool} (piscina worker pool) and the + * {@link InlineRunner} (same-process execution). The {@link ExecutorManager} picks one based on + * the engine's `runner` configuration. + */ +export interface JobRunner { + /** + * Runs a job and resolves with its result. + * @param job The job data to run. + * @param signal Optional event emitter used to request cancellation/abort. 
May be ignored by + * implementations that cannot forcibly abort a running job (e.g. the inline runner). + */ + run(job: JobData, signal?: EventEmitter): Promise; + + /** + * Releases any resources held by the runner. + */ + destroy(): void; +} diff --git a/packages/engine/src/shared-runner/runner-pool.ts b/packages/engine/src/shared-runner/runner-pool.ts index ab17d3fe..224d6aca 100644 --- a/packages/engine/src/shared-runner/runner-pool.ts +++ b/packages/engine/src/shared-runner/runner-pool.ts @@ -3,11 +3,12 @@ import EventEmitter from "events"; import Piscina from "piscina"; import { DEFAULT_RUNNER_PATH } from "../constants"; import { NonNullableEngineConfig } from "../engine"; +import { JobRunner } from "./job-runner"; /** * A pool of worker threads for running jobs in parallel using Piscina. */ -export class RunnerPool { +export class RunnerPool implements JobRunner { /** The underlying Piscina worker pool. */ private readonly pool: Piscina; diff --git a/packages/engine/src/shared-runner/runner.test.ts b/packages/engine/src/shared-runner/runner.test.ts index 555cd270..7fde6c15 100644 --- a/packages/engine/src/shared-runner/runner.test.ts +++ b/packages/engine/src/shared-runner/runner.test.ts @@ -52,6 +52,19 @@ describe("runner.ts", () => { expect(result).toEqual({ __is_job_transition__: true, type: "completed", result: "dummy job" }); }); + sidequestTest("injects the config when not inline", async ({ config }) => { + vi.mocked(importSidequest).mockClear(); + await run({ jobData, config }); + expect(importSidequest).toHaveBeenCalled(); + }); + + sidequestTest("skips config injection when inline", async ({ config }) => { + vi.mocked(importSidequest).mockClear(); + const result = await run({ jobData, config, inline: true }); + expect(result).toEqual({ __is_job_transition__: true, type: "completed", result: "dummy job" }); + expect(importSidequest).not.toHaveBeenCalled(); + }); + sidequestTest("fails with invalid script", async ({ config }) => { jobData.script = 
"invalid!"; const result = (await run({ jobData, config })) as FailedResult; diff --git a/packages/engine/src/shared-runner/runner.ts b/packages/engine/src/shared-runner/runner.ts index 8e31abde..458bd1fe 100644 --- a/packages/engine/src/shared-runner/runner.ts +++ b/packages/engine/src/shared-runner/runner.ts @@ -11,8 +11,20 @@ import { findSidequestJobsScriptInParentDirs, MANUAL_SCRIPT_TAG, resolveScriptPa * @param config The non-nullable engine configuration. * @returns A promise resolving to the job result. */ -export default async function run({ jobData, config }: { jobData: JobData; config: EngineConfig }): Promise { - await injectSidequestConfig(config); +export default async function run({ + jobData, + config, + inline, +}: { + jobData: JobData; + config: EngineConfig; + inline?: boolean; +}): Promise { + // In inline mode the job runs in the host process, where Sidequest is already configured, so + // re-injecting the config is redundant. In a worker thread the module is fresh and needs it. + if (!inline) { + await injectSidequestConfig(config); + } let script: Record = {}; try { From 274dd70944a6937ab472d795f76c36f97300af5e Mon Sep 17 00:00:00 2001 From: Lucas Merencia <1686635+merencia@users.noreply.github.com> Date: Thu, 18 Jun 2026 17:43:10 -0300 Subject: [PATCH 02/10] feat: add no-fork execution mode (fork: false) (#179) Adds a `fork` engine config option: - true (default): the engine runs in a child_process.fork, as today. - false: the engine runs in the host process, no fork. Extracts the worker runtime (dispatcher loop + stale/cleanup cron routines) out of MainWorker into a shared WorkerRuntime, used both by the forked worker and by the in-process path. WorkerRuntime now also stops its cron tasks on shutdown, which the forked worker previously relied on process exit for. Engine.start() runs the runtime in-process when fork is false; Engine.close() tears it down. 
No-fork trades crash isolation (a job crash can take down the host) for in-process execution. Groundwork for the NestJS integration; also useful for serverless and tests. --- packages/engine/src/engine.test.ts | 37 ++++++ packages/engine/src/engine.ts | 34 ++++- packages/engine/src/workers/index.ts | 1 + packages/engine/src/workers/main.test.ts | 48 +------ packages/engine/src/workers/main.ts | 112 +---------------- .../engine/src/workers/worker-runtime.test.ts | 119 ++++++++++++++++++ packages/engine/src/workers/worker-runtime.ts | 108 ++++++++++++++++ 7 files changed, 308 insertions(+), 151 deletions(-) create mode 100644 packages/engine/src/workers/worker-runtime.test.ts create mode 100644 packages/engine/src/workers/worker-runtime.ts diff --git a/packages/engine/src/engine.test.ts b/packages/engine/src/engine.test.ts index b21f5028..70ece959 100644 --- a/packages/engine/src/engine.test.ts +++ b/packages/engine/src/engine.test.ts @@ -24,6 +24,23 @@ vi.mock("child_process", () => ({ }), })); +// Mock the in-process worker runtime so the no-fork path doesn't run a real dispatcher loop. 
+const workerRuntimeMocks = vi.hoisted(() => ({ + start: vi.fn().mockResolvedValue(undefined), + shutdown: vi.fn().mockResolvedValue(undefined), +})); + +vi.mock("./workers/worker-runtime", () => ({ + WorkerRuntime: vi.fn(function () { + return { + start: workerRuntimeMocks.start, + shutdown: workerRuntimeMocks.shutdown, + }; + }), +})); + +import { fork } from "child_process"; + export class ParameterizedJob extends DummyJob { constructor( public param1: string, @@ -399,6 +416,26 @@ describe("Engine", () => { await engine.close(); }); + sidequestTest("runs in-process and skips fork when fork is false", async () => { + vi.mocked(fork).mockClear(); + workerRuntimeMocks.start.mockClear(); + workerRuntimeMocks.shutdown.mockClear(); + + const engine = new Engine(); + + await engine.start({ + backend: { driver: "@sidequest/sqlite-backend", config: ":memory:" }, + fork: false, + gracefulShutdown: false, + }); + + expect(workerRuntimeMocks.start).toHaveBeenCalledTimes(1); + expect(fork).not.toHaveBeenCalled(); + + await engine.close(); + expect(workerRuntimeMocks.shutdown).toHaveBeenCalledTimes(1); + }); + sidequestTest("should warn when starting already started engine", async () => { const engine = new Engine(); const config = { diff --git a/packages/engine/src/engine.ts b/packages/engine/src/engine.ts index 07a0f3b7..0db99da0 100644 --- a/packages/engine/src/engine.ts +++ b/packages/engine/src/engine.ts @@ -13,6 +13,7 @@ import { JobBuilder, JobBuilderDefaults } from "./job/job-builder"; import { grantQueueConfig, QueueDefaults } from "./queue/grant-queue-config"; import { findSidequestJobsScriptInParentDirs, resolveScriptPath } from "./shared-runner"; import { clearGracefulShutdown, gracefulShutdown } from "./utils/shutdown"; +import { WorkerRuntime } from "./workers/worker-runtime"; /** * Configuration options for the Sidequest engine. 
@@ -40,6 +41,18 @@ export interface EngineConfig { cleanupFinishedJobsOlderThan?: number; /** Whether to enable graceful shutdown handling. Defaults to `true` */ gracefulShutdown?: boolean; + /** + * Whether to run the engine in a forked child process. + * + * - `true` (default): the engine runs in a `child_process.fork`, isolating job-code crashes from + * the host application. + * - `false`: the engine runs in the host process. A crash in job code can take down the host, but + * jobs can reach live in-process state. Useful for single-process setups (serverless, tests) + * and required by framework integrations that rely on in-process execution. + * + * Defaults to `true`. + */ + fork?: boolean; /** * How jobs are executed. * @@ -136,6 +149,12 @@ export class Engine { */ private mainWorker?: ChildProcess; + /** + * Worker runtime when the engine runs in-process (`fork: false`). + * Mutually exclusive with {@link mainWorker}. + */ + private inProcessRuntime?: WorkerRuntime; + /** * Flag indicating whether the engine is currently shutting down. * This is used to prevent multiple shutdown attempts and ensure graceful shutdown behavior. @@ -168,6 +187,7 @@ export class Engine { json: config?.logger?.json ?? false, }, gracefulShutdown: config?.gracefulShutdown ?? true, + fork: config?.fork ?? true, runner: config?.runner ?? "thread", minThreads: config?.minThreads ?? cpus().length, maxThreads: config?.maxThreads ?? cpus().length * 2, @@ -255,7 +275,7 @@ export class Engine { * @param config Optional configuration object. 
*/ async start(config: EngineConfig): Promise { - if (this.mainWorker) { + if (this.mainWorker || this.inProcessRuntime) { logger("Engine").warn("Sidequest engine already started"); return; } @@ -270,6 +290,14 @@ export class Engine { } } + if (!nonNullConfig.fork) { + logger("Engine").info("Starting Sidequest in-process (fork disabled)"); + this.inProcessRuntime = new WorkerRuntime(dependencyRegistry.get(Dependency.Backend)!, nonNullConfig); + await this.inProcessRuntime.start(); + gracefulShutdown(this.close.bind(this), "Engine", nonNullConfig.gracefulShutdown); + return; + } + return new Promise((resolve, reject) => { const timeout = setTimeout(() => { reject(new Error("Timeout on starting sidequest fork!")); @@ -337,12 +365,16 @@ export class Engine { this.mainWorker.send({ type: "shutdown" }); await promise; } + if (this.inProcessRuntime) { + await this.inProcessRuntime.shutdown(); + } try { await dependencyRegistry.get(Dependency.Backend)?.close(); } catch (error) { logger("Engine").error("Error closing backend:", error); } this.mainWorker = undefined; + this.inProcessRuntime = undefined; // Reset the shutting down flag after closing // This allows the engine to be reconfigured or restarted later clearGracefulShutdown(); diff --git a/packages/engine/src/workers/index.ts b/packages/engine/src/workers/index.ts index d66d2cdb..9a327bf0 100644 --- a/packages/engine/src/workers/index.ts +++ b/packages/engine/src/workers/index.ts @@ -1 +1,2 @@ export * from "./main"; +export * from "./worker-runtime"; diff --git a/packages/engine/src/workers/main.test.ts b/packages/engine/src/workers/main.test.ts index a5dedb5d..b193e525 100644 --- a/packages/engine/src/workers/main.test.ts +++ b/packages/engine/src/workers/main.test.ts @@ -4,8 +4,6 @@ import { randomUUID } from "node:crypto"; import { beforeEach, describe, expect, vi } from "vitest"; import { Dispatcher } from "../execution/dispatcher"; import { grantQueueConfig } from "../queue"; -import { cleanupFinishedJobs } 
from "../routines/cleanup-finished-job"; -import { releaseStaleJobs } from "../routines/release-stale-jobs"; import { MainWorker } from "./main"; const runMock = vi.hoisted(() => vi.fn()); @@ -20,7 +18,7 @@ vi.mock("../shared-runner", () => ({ })); const cronMocks = vi.hoisted(() => ({ - schedule: vi.fn().mockReturnValue({ execute: vi.fn() }), + schedule: vi.fn().mockReturnValue({ execute: vi.fn(), stop: vi.fn() }), })); vi.mock("node-cron", () => ({ @@ -60,53 +58,15 @@ describe("main.ts", () => { } await worker.runWorker(config); vi.resetAllMocks(); + // resetAllMocks clears the return value, so re-establish the scheduled-task shape used by + // WorkerRuntime (which calls task.stop() on shutdown). + cronMocks.schedule.mockReturnValue({ execute: vi.fn(), stop: vi.fn() }); }); afterEach(async () => { await worker.shutdown(); }); - describe("startCron", () => { - sidequestTest("should schedule both cron jobs", async () => { - await worker.startCron(60, 600_000, 60_000, 60, 0); - - expect(cronMocks.schedule).toHaveBeenCalledTimes(2); - expect(cronMocks.schedule).toHaveBeenCalledWith("*/60 * * * *", expect.any(Function)); - }); - - sidequestTest("should call releaseStaleJobs when release cron executes", async () => { - await worker.startCron(60, 600_000, 60_000, 60, 0); - - const cronCallback = cronMocks.schedule.mock.calls[0][1] as () => unknown; - - await cronCallback(); - - expect(releaseStaleJobs).toHaveBeenCalledWith(expect.any(Object), 600_000, 60_000); - }); - - sidequestTest("should call cleanupFinishedJobs when cleanup cron executes", async () => { - await worker.startCron(60, 600_000, 60_000, 60, 0); - - const cronCallback = cronMocks.schedule.mock.calls[1][1] as () => unknown; - - await cronCallback(); - - expect(cleanupFinishedJobs).toHaveBeenCalledWith(expect.any(Object), 0); - }); - - sidequestTest("should handle errors and log them when releaseStaleJobs fails", async () => { - const error = new Error("fail"); - (releaseStaleJobs as unknown as 
ReturnType).mockRejectedValueOnce(error); - - await worker.startCron(60, 600_000, 60_000, 60, 0); - - const cronCallback = cronMocks.schedule.mock.calls[0][1] as () => unknown; - - await expect(cronCallback()).resolves.toBeUndefined(); - expect(releaseStaleJobs).toHaveBeenCalled(); - }); - }); - describe("runWorker", () => { sidequestTest("should call startCron after starting the worker", async ({ config }) => { const mockWorkerRun = vi.fn().mockResolvedValueOnce(undefined); diff --git a/packages/engine/src/workers/main.ts b/packages/engine/src/workers/main.ts index 85353b63..c0ea6347 100644 --- a/packages/engine/src/workers/main.ts +++ b/packages/engine/src/workers/main.ts @@ -1,20 +1,13 @@ -import { Backend } from "@sidequest/backend"; import { logger } from "@sidequest/core"; -import cron from "node-cron"; import { WORKER_PROCESS_FLAG } from "../constants"; import { Engine, EngineConfig, NonNullableEngineConfig } from "../engine"; -import { Dispatcher } from "../execution/dispatcher"; -import { ExecutorManager } from "../execution/executor-manager"; -import { QueueManager } from "../execution/queue-manager"; -import { cleanupFinishedJobs } from "../routines/cleanup-finished-job"; -import { releaseStaleJobs } from "../routines/release-stale-jobs"; import { gracefulShutdown } from "../utils/shutdown"; +import { WorkerRuntime } from "./worker-runtime"; export class MainWorker { shuttingDown = false; - private dispatcher: Dispatcher | undefined; + private runtime?: WorkerRuntime; private engine = new Engine(); - private backend?: Backend; /** * Starts a Sidequest worker process with the given configuration. 
@@ -24,23 +17,8 @@ export class MainWorker { if (!this.shuttingDown) { try { const nonNullConfig = await this.engine.configure({ ...sidequestConfig, skipMigration: true }); - this.backend = this.engine.getBackend()!; - - this.dispatcher = new Dispatcher( - this.backend, - new QueueManager(this.backend, nonNullConfig.queues, nonNullConfig.queueDefaults), - new ExecutorManager(this.backend, nonNullConfig), - nonNullConfig.jobPollingInterval, - ); - this.dispatcher.start(); - - await this.startCron( - nonNullConfig.releaseStaleJobsIntervalMin, - nonNullConfig.releaseStaleJobsMaxStaleMs, - nonNullConfig.releaseStaleJobsMaxClaimedMs, - nonNullConfig.cleanupFinishedJobsIntervalMin, - nonNullConfig.cleanupFinishedJobsOlderThan, - ); + this.runtime = new WorkerRuntime(this.engine.getBackend()!, nonNullConfig); + await this.runtime.start(); } catch (error) { logger("Worker").error(error); process.exit(1); @@ -56,91 +34,13 @@ export class MainWorker { async shutdown() { if (!this.shuttingDown) { this.shuttingDown = true; - logger("Worker").debug("Shutting down dispatcher"); - await this.dispatcher?.stop(); + logger("Worker").debug("Shutting down worker runtime"); + await this.runtime?.shutdown(); logger("Worker").debug("Shutting down engine"); await this.engine.close(); logger("Worker").debug("Main worker completely shut down"); } } - - /** - * Starts cron job for releasing stale jobs. - * Also executes the task immediately. - */ - async startAndExecuteStaleJobsReleaseCron( - intervalMin: number, - maxStaleMs: number, - maxClaimedMs: number, - ): Promise { - if (!this.backend) { - throw new Error("Backend is not initialized. 
Cannot start stale jobs release cron."); - } - - logger("Worker").debug(`Starting stale jobs release cron with interval: ${intervalMin} minutes`); - const releaseTask = cron.schedule(`*/${intervalMin} * * * *`, async () => { - try { - logger("Worker").debug("Running stale jobs release task"); - await releaseStaleJobs(this.backend!, maxStaleMs, maxClaimedMs); - } catch (error: unknown) { - logger("Worker").error("Error on running ReleaseStaleJob!", error); - } - }); - return releaseTask.execute(); - } - - /** - * Starts cron job for cleaning up finished jobs. - * Also executes the task immediately. - */ - async startAndExecuteFinishedJobsCleanupCron(intervalMin: number, cutoffMs: number): Promise { - if (!this.backend) { - throw new Error("Backend is not initialized. Cannot start finished jobs cleanup cron."); - } - - logger("Worker").debug(`Starting finished jobs cleanup cron with interval: ${intervalMin} minutes`); - const cleanupTask = cron.schedule(`*/${intervalMin} * * * *`, async () => { - try { - logger("Worker").debug("Running finished jobs cleanup task"); - await cleanupFinishedJobs(this.backend!, cutoffMs); - } catch (error: unknown) { - logger("Worker").error("Error on running CleanupJob!", error); - } - }); - return cleanupTask.execute(); - } - - /** - * Starts cron jobs for releasing stale jobs and cleaning up finished jobs. - * - * @param staleIntervalMin Interval in minutes for releasing stale jobs, or false to disable. - * @param maxStaleMs Maximum age in milliseconds for stale jobs. - * @param maxClaimedMs Maximum age in milliseconds for claimed jobs. - * @param cleanupIntervalMin Interval in minutes for cleaning up finished jobs, or false to disable - * @param cleanupCutoffMs Maximum age in milliseconds for finished jobs to be cleaned up. 
- */ - async startCron( - staleIntervalMin: number | false, - maxStaleMs: number, - maxClaimedMs: number, - cleanupIntervalMin: number | false, - cleanupCutoffMs: number, - ) { - logger("Worker").debug("Starting cron jobs"); - const promises: Promise[] = []; - - if (staleIntervalMin !== false) { - promises.push(this.startAndExecuteStaleJobsReleaseCron(staleIntervalMin, maxStaleMs, maxClaimedMs)); - } - - if (cleanupIntervalMin !== false) { - promises.push(this.startAndExecuteFinishedJobsCleanupCron(cleanupIntervalMin, cleanupCutoffMs)); - } - - await Promise.all(promises).catch((error) => { - logger("Worker").error(error); - }); - } } // Gate the bootstrap on the explicit flag the engine passes when forking, not on `!!process.send`. diff --git a/packages/engine/src/workers/worker-runtime.test.ts b/packages/engine/src/workers/worker-runtime.test.ts new file mode 100644 index 00000000..15765b0c --- /dev/null +++ b/packages/engine/src/workers/worker-runtime.test.ts @@ -0,0 +1,119 @@ +import { sidequestTest, SidequestTestFixture } from "@/tests/fixture"; +import { beforeEach, describe, expect, vi } from "vitest"; +import { Dispatcher } from "../execution/dispatcher"; +import { cleanupFinishedJobs } from "../routines/cleanup-finished-job"; +import { releaseStaleJobs } from "../routines/release-stale-jobs"; +import { WorkerRuntime } from "./worker-runtime"; + +const runMock = vi.hoisted(() => vi.fn()); + +vi.mock("../shared-runner", () => ({ + RunnerPool: vi.fn(function () { + return { + run: runMock, + destroy: vi.fn(), + }; + }), +})); + +const cronMocks = vi.hoisted(() => ({ + schedule: vi.fn(), +})); + +vi.mock("node-cron", () => ({ + default: { + schedule: cronMocks.schedule, + }, +})); + +vi.mock("../routines/cleanup-finished-job", () => ({ + cleanupFinishedJobs: vi.fn(() => undefined), +})); + +vi.mock("../routines/release-stale-jobs", () => ({ + releaseStaleJobs: vi.fn(() => undefined), +})); + +describe("WorkerRuntime", () => { + let runtime: WorkerRuntime; + + 
beforeEach(async ({ backend, config }) => { + await backend.migrate(); + vi.clearAllMocks(); + cronMocks.schedule.mockReturnValue({ execute: vi.fn(), stop: vi.fn() }); + runtime = new WorkerRuntime(backend, config); + }); + + describe("start", () => { + sidequestTest("starts the dispatcher and schedules both cron routines", async () => { + const dispatcherStart = vi.spyOn(Dispatcher.prototype, "start").mockImplementation(() => undefined); + + await runtime.start(); + + expect(dispatcherStart).toHaveBeenCalled(); + expect(cronMocks.schedule).toHaveBeenCalledTimes(2); + expect(cronMocks.schedule).toHaveBeenCalledWith("*/60 * * * *", expect.any(Function)); + }); + }); + + describe("startCron", () => { + sidequestTest("runs releaseStaleJobs when the stale cron executes", async ({ config }) => { + await runtime.startCron(); + + const cronCallback = cronMocks.schedule.mock.calls[0][1] as () => unknown; + await cronCallback(); + + expect(releaseStaleJobs).toHaveBeenCalledWith( + expect.any(Object), + config.releaseStaleJobsMaxStaleMs, + config.releaseStaleJobsMaxClaimedMs, + ); + }); + + sidequestTest("runs cleanupFinishedJobs when the cleanup cron executes", async ({ config }) => { + await runtime.startCron(); + + const cronCallback = cronMocks.schedule.mock.calls[1][1] as () => unknown; + await cronCallback(); + + expect(cleanupFinishedJobs).toHaveBeenCalledWith(expect.any(Object), config.cleanupFinishedJobsOlderThan); + }); + + sidequestTest("does not schedule routines that are disabled", async ({ backend, config }) => { + const disabled = new WorkerRuntime(backend, { + ...config, + releaseStaleJobsIntervalMin: false, + cleanupFinishedJobsIntervalMin: false, + }); + + await disabled.startCron(); + + expect(cronMocks.schedule).not.toHaveBeenCalled(); + }); + + sidequestTest("swallows and logs errors thrown by a routine", async () => { + (releaseStaleJobs as unknown as ReturnType).mockRejectedValueOnce(new Error("fail")); + + await runtime.startCron(); + + const 
cronCallback = cronMocks.schedule.mock.calls[0][1] as () => unknown; + await expect(cronCallback()).resolves.toBeUndefined(); + expect(releaseStaleJobs).toHaveBeenCalled(); + }); + }); + + describe("shutdown", () => { + sidequestTest("stops the scheduled cron tasks and drains the dispatcher", async () => { + const stop = vi.fn(); + cronMocks.schedule.mockReturnValue({ execute: vi.fn(), stop }); + vi.spyOn(Dispatcher.prototype, "start").mockImplementation(() => undefined); + const dispatcherStop = vi.spyOn(Dispatcher.prototype, "stop").mockResolvedValue(undefined); + + await runtime.start(); + await runtime.shutdown(); + + expect(stop).toHaveBeenCalledTimes(2); + expect(dispatcherStop).toHaveBeenCalled(); + }); + }); +}); diff --git a/packages/engine/src/workers/worker-runtime.ts b/packages/engine/src/workers/worker-runtime.ts new file mode 100644 index 00000000..3a061019 --- /dev/null +++ b/packages/engine/src/workers/worker-runtime.ts @@ -0,0 +1,108 @@ +import { Backend } from "@sidequest/backend"; +import { logger } from "@sidequest/core"; +import cron, { ScheduledTask } from "node-cron"; +import { NonNullableEngineConfig } from "../engine"; +import { Dispatcher } from "../execution/dispatcher"; +import { ExecutorManager } from "../execution/executor-manager"; +import { QueueManager } from "../execution/queue-manager"; +import { cleanupFinishedJobs } from "../routines/cleanup-finished-job"; +import { releaseStaleJobs } from "../routines/release-stale-jobs"; + +/** + * Owns the runtime side of the engine: the dispatcher loop plus the stale-job and finished-job + * cron routines. + * + * It runs either inside the forked worker process (driven by {@link MainWorker}) or directly in the + * host process when the engine is started with `fork: false`. It does NOT own the backend + * lifecycle: closing the backend remains the responsibility of the {@link Engine} that created it. 
+ */ +export class WorkerRuntime { + private dispatcher: Dispatcher; + private cronTasks: ScheduledTask[] = []; + + /** + * Creates a new WorkerRuntime. + * @param backend The backend instance. + * @param config The non-nullable engine configuration. + */ + constructor( + private backend: Backend, + private config: NonNullableEngineConfig, + ) { + this.dispatcher = new Dispatcher( + backend, + new QueueManager(backend, config.queues, config.queueDefaults), + new ExecutorManager(backend, config), + config.jobPollingInterval, + ); + } + + /** + * Starts the dispatcher loop and the cron routines. + */ + async start(): Promise { + this.dispatcher.start(); + await this.startCron(); + } + + /** + * Stops the cron routines and drains the dispatcher. Unlike the forked worker (which relies on + * process exit), the in-process runtime must explicitly stop its cron tasks to avoid leaks. + */ + async shutdown(): Promise { + logger("WorkerRuntime").debug("Stopping cron routines"); + // ScheduledTask.stop() returns `void | Promise`; normalize before aggregating. + await Promise.all(this.cronTasks.map((task) => Promise.resolve(task.stop()))); + this.cronTasks = []; + logger("WorkerRuntime").debug("Stopping dispatcher"); + await this.dispatcher.stop(); + } + + /** + * Schedules the stale-job and finished-job cron routines according to the config and runs each + * once immediately. Either routine can be disabled with a `false` interval. 
+ */ + async startCron(): Promise { + const promises: Promise[] = []; + + if (this.config.releaseStaleJobsIntervalMin !== false) { + promises.push( + this.scheduleAndRun(this.config.releaseStaleJobsIntervalMin, "ReleaseStaleJob", () => + releaseStaleJobs( + this.backend, + this.config.releaseStaleJobsMaxStaleMs, + this.config.releaseStaleJobsMaxClaimedMs, + ), + ), + ); + } + + if (this.config.cleanupFinishedJobsIntervalMin !== false) { + promises.push( + this.scheduleAndRun(this.config.cleanupFinishedJobsIntervalMin, "CleanupJob", () => + cleanupFinishedJobs(this.backend, this.config.cleanupFinishedJobsOlderThan), + ), + ); + } + + await Promise.all(promises).catch((error) => logger("WorkerRuntime").error(error)); + } + + /** + * Schedules a recurring task at the given minute interval, tracks it for shutdown, and triggers + * an immediate run. + */ + private scheduleAndRun(intervalMin: number, name: string, task: () => Promise): Promise { + logger("WorkerRuntime").debug(`Starting ${name} cron with interval: ${intervalMin} minutes`); + const scheduled = cron.schedule(`*/${intervalMin} * * * *`, async () => { + try { + logger("WorkerRuntime").debug(`Running ${name} task`); + await task(); + } catch (error: unknown) { + logger("WorkerRuntime").error(`Error on running ${name}!`, error); + } + }); + this.cronTasks.push(scheduled); + return scheduled.execute(); + } +} From 437b70ed8b817a5f76171eb5c6a130297cda4560 Mon Sep 17 00:00:00 2001 From: Lucas Merencia <1686635+merencia@users.noreply.github.com> Date: Fri, 19 Jun 2026 14:28:32 -0300 Subject: [PATCH 03/10] feat: deliver timeout/cancel to jobs via this.abortSignal (#182) * feat: deliver timeout/cancel to jobs via this.abortSignal Adds an AbortSignal to the Job base class, available inside run() as `this.abortSignal`. The engine aborts it on timeout (reason: JobTimeout) and on cancellation (reason: JobCanceled), so jobs can stop cooperatively (`fetch(url, { signal: this.abortSignal })`, `throwIfAborted()`, etc.). 
In inline mode this is the only way a running job can be stopped, so timeout and cancel now work there cooperatively (superseding the previous approach of disabling cancel inline). On cancel the in-memory job is marked canceled so the terminal transition is skipped by shouldRun and does not overwrite the state. The executor now uses an AbortController instead of an EventEmitter and detects aborts via signal.aborted rather than matching the rejection message. Thread mode is unchanged: piscina still terminates the worker on abort. * refactor(engine): simplify abort handling in executor Use a single const AbortController (drop the | undefined and the alias) and guard the terminal transition with !controller.signal.aborted instead of mutating the in-memory job state, so a job that ignores the abort signal and runs to completion cannot overwrite the canceled/retry state. Adds a regression test for the ignored-signal case. * feat: cooperative abort in thread mode + abortGracePeriodMs (#183) * feat: cooperative abort in thread mode with configurable hard-kill grace Thread-mode jobs can now honor this.abortSignal too. On timeout/cancel the engine transfers a MessagePort to the worker and posts the abort (with its JobTimeout/JobCanceled reason); the worker turns that into the job's abortSignal. A new abortGracePeriodMs config controls how long to wait for the job to stop cooperatively before forcibly terminating the worker thread. abortGracePeriodMs defaults to 0, which preserves the current behavior: the worker is terminated immediately with no cooperative window and no port overhead. The port machinery is only set up when the grace period is positive. Inline is unaffected (no thread to terminate; grace does not apply). 
* test: cover the worker-side abort port path Adds a test that drives run() through the abortPort branch (thread mode): a real MessageChannel delivers an abort message, signalFromAbortPort rebuilds the JobTimeout reason, and the job observes it via this.abortSignal. Closes the gap where only the engine-side port posting and the inline signal path were tested. --- packages/core/src/job/abort-reason.ts | 31 +++++++ packages/core/src/job/index.ts | 1 + packages/core/src/job/job.test.ts | 15 +++ packages/core/src/job/job.ts | 20 ++++ packages/engine/src/engine.ts | 23 ++++- .../src/execution/executor-manager.test.ts | 48 +++++++--- .../engine/src/execution/executor-manager.ts | 40 +++++--- .../src/shared-runner/inline-runner.test.ts | 7 +- .../engine/src/shared-runner/inline-runner.ts | 10 +- .../engine/src/shared-runner/job-runner.ts | 7 +- .../src/shared-runner/runner-pool.test.ts | 56 +++++++++-- .../engine/src/shared-runner/runner-pool.ts | 52 ++++++++++- .../engine/src/shared-runner/runner.test.ts | 93 ++++++++++++++++++- packages/engine/src/shared-runner/runner.ts | 47 +++++++++- .../engine/src/test-jobs/abort-aware-job.js | 19 ++++ 15 files changed, 417 insertions(+), 52 deletions(-) create mode 100644 packages/core/src/job/abort-reason.ts create mode 100644 packages/engine/src/test-jobs/abort-aware-job.js diff --git a/packages/core/src/job/abort-reason.ts b/packages/core/src/job/abort-reason.ts new file mode 100644 index 00000000..97ffe8c9 --- /dev/null +++ b/packages/core/src/job/abort-reason.ts @@ -0,0 +1,31 @@ +/** + * Reason set on a job's `abortSignal` when the engine aborts a running job. + * + * Inspect `job.abortSignal.reason` inside `run` to tell why the job is being aborted, e.g. to log + * or clean up differently for a timeout vs an explicit cancellation. + */ +export type AbortReason = JobTimeout | JobCanceled; + +/** + * Set as the `abortSignal.reason` when a job is aborted because it exceeded its `timeout`. 
+ */ +export class JobTimeout extends Error { + /** The timeout, in milliseconds, that was exceeded. */ + readonly timeoutMs: number; + + constructor(timeoutMs: number) { + super(`Job timed out after ${timeoutMs}ms`); + this.name = "JobTimeout"; + this.timeoutMs = timeoutMs; + } +} + +/** + * Set as the `abortSignal.reason` when a running job is aborted because it was canceled. + */ +export class JobCanceled extends Error { + constructor() { + super("Job was canceled"); + this.name = "JobCanceled"; + } +} diff --git a/packages/core/src/job/index.ts b/packages/core/src/job/index.ts index 9fc50091..a9016069 100644 --- a/packages/core/src/job/index.ts +++ b/packages/core/src/job/index.ts @@ -1 +1,2 @@ +export * from "./abort-reason"; export * from "./job"; diff --git a/packages/core/src/job/job.test.ts b/packages/core/src/job/job.test.ts index 74706418..4124f5f1 100644 --- a/packages/core/src/job/job.test.ts +++ b/packages/core/src/job/job.test.ts @@ -37,6 +37,21 @@ describe("job.ts", () => { expect(transition.result).toBe("foo bar"); }); + it("exposes a non-aborting abortSignal by default", () => { + const job = new DummyJob(); + expect(job.abortSignal).toBeInstanceOf(AbortSignal); + expect(job.abortSignal.aborted).toBe(false); + }); + + it("injects the abort signal at runtime", () => { + const job = new DummyJob(); + const controller = new AbortController(); + job.injectAbortSignal(controller.signal); + expect(job.abortSignal).toBe(controller.signal); + controller.abort(); + expect(job.abortSignal.aborted).toBe(true); + }); + it("creates a fail transition", () => { const job = new DummyJob(); const transition = job.fail("error"); diff --git a/packages/core/src/job/job.ts b/packages/core/src/job/job.ts index e32bd9c7..59cf2730 100644 --- a/packages/core/src/job/job.ts +++ b/packages/core/src/job/job.ts @@ -79,6 +79,18 @@ export abstract class Job implements JobData { readonly backoff_strategy!: BackoffStrategy; readonly retry_delay!: number | null; + /** + * Signal 
that fires when the engine aborts this run (timeout or cancellation). Available inside + * `run`. Pass it to abort-aware APIs (e.g. `fetch(url, { signal: this.abortSignal })`) or check it + * cooperatively (`this.abortSignal.throwIfAborted()`, `this.abortSignal.aborted`). The reason is a + * `JobTimeout` or `JobCanceled` (see `abortSignal.reason`). + * + * In `runner: "inline"` mode this is the only way a running job can be stopped. In `"thread"` mode + * the worker is also terminated, so honoring the signal is optional (but enables graceful cleanup). + * Defaults to a signal that never aborts when no run is in progress. + */ + readonly abortSignal: AbortSignal = new AbortController().signal; + /** * Initializes the job and resolves its script path. */ @@ -102,6 +114,14 @@ export abstract class Job implements JobData { Object.assign(this, jobData); } + /** + * Injects the abort signal for this run into the job instance at runtime. + * @param signal The abort signal the engine controls for this execution. + */ + injectAbortSignal(signal: AbortSignal): void { + Object.assign(this, { abortSignal: signal }); + } + /** * The class name of this job. */ diff --git a/packages/engine/src/engine.ts b/packages/engine/src/engine.ts index 0db99da0..c53e5904 100644 --- a/packages/engine/src/engine.ts +++ b/packages/engine/src/engine.ts @@ -58,14 +58,28 @@ export interface EngineConfig { * * - `"thread"` (default): jobs run in a pool of worker threads (piscina). Gives CPU isolation * and lets timeouts/cancellation forcibly abort a running job. - * - `"inline"`: jobs run in the current process/thread, with no worker pool. Timeouts and - * cancellation become best-effort (a running job cannot be forcibly aborted) and a CPU-bound - * job will block the event loop. Useful for single-process setups (serverless, tests, SQLite) - * and required when jobs need access to live in-process state. + * - `"inline"`: jobs run in the current process/thread, with no worker pool. 
A running job cannot + * be forcibly terminated, so timeouts and cancellation are delivered cooperatively via + * `this.abortSignal` inside the job (the job must honor it to stop early); a job that ignores it + * runs to completion, and a CPU-bound job will block the event loop. Useful for single-process + * setups (serverless, tests, SQLite) and required when jobs need access to live in-process state. * * Defaults to `"thread"`. */ runner?: "thread" | "inline"; + /** + * Grace period, in milliseconds, between cooperatively aborting a running job (timeout or + * cancellation) and forcibly terminating its worker thread. + * + * Only applies to `runner: "thread"`. When greater than `0`, the abort is first delivered to the + * job via `this.abortSignal` so it can stop and clean up; if it has not finished after this many + * milliseconds, the worker thread is terminated. When `0` (default), the worker is terminated + * immediately with no cooperative window, preserving the previous behavior. Has no effect in + * `runner: "inline"` (there is no thread to terminate). + * + * Defaults to `0`. + */ + abortGracePeriodMs?: number; /** Minimum number of worker threads to use. Defaults to number of CPUs */ minThreads?: number; /** Maximum number of worker threads to use. Defaults to `minThreads * 2` */ @@ -189,6 +203,7 @@ export class Engine { gracefulShutdown: config?.gracefulShutdown ?? true, fork: config?.fork ?? true, runner: config?.runner ?? "thread", + abortGracePeriodMs: config?.abortGracePeriodMs ?? 0, minThreads: config?.minThreads ?? cpus().length, maxThreads: config?.maxThreads ?? cpus().length * 2, idleWorkerTimeout: config?.idleWorkerTimeout ?? 
10_000, diff --git a/packages/engine/src/execution/executor-manager.test.ts b/packages/engine/src/execution/executor-manager.test.ts index 011b46a0..872d7722 100644 --- a/packages/engine/src/execution/executor-manager.test.ts +++ b/packages/engine/src/execution/executor-manager.test.ts @@ -1,7 +1,6 @@ import { sidequestTest, SidequestTestFixture } from "@/tests/fixture"; import { Backend } from "@sidequest/backend"; -import { CompletedResult, JobData, RetryTransition, RunTransition } from "@sidequest/core"; -import EventEmitter from "events"; +import { CompletedResult, CompleteTransition, JobData, RetryTransition, RunTransition } from "@sidequest/core"; import { JobTransitioner } from "../job/job-transitioner"; import { grantQueueConfig } from "../queue/grant-queue-config"; import { DummyJob } from "../test-jobs/dummy-job"; @@ -72,7 +71,7 @@ describe("ExecutorManager", () => { expect(executorManager.availableSlotsGlobal()).toEqual(9); await execPromise; - expect(runMock).toBeCalledWith(jobData, expect.any(EventEmitter)); + expect(runMock).toBeCalledWith(jobData, expect.any(AbortSignal)); expect(executorManager.availableSlotsByQueue(queryConfig)).toEqual(1); expect(executorManager.availableSlotsGlobal()).toEqual(10); await executorManager.destroy(); @@ -89,7 +88,7 @@ describe("ExecutorManager", () => { await executorManager.execute(queryConfig, jobData); - expect(inlineRunMock).toBeCalledWith(jobData, expect.any(EventEmitter)); + expect(inlineRunMock).toBeCalledWith(jobData, expect.any(AbortSignal)); expect(runMock).not.toHaveBeenCalled(); await executorManager.destroy(); inlineRunMock.mockReset(); @@ -102,9 +101,9 @@ describe("ExecutorManager", () => { const executorManager = new ExecutorManager(backend, config); let expectedPromise; - runMock.mockImplementationOnce(async (job: JobData, signal: EventEmitter) => { + runMock.mockImplementationOnce(async (job: JobData, signal: AbortSignal) => { const promise = new Promise((_, reject) => { - signal.on("abort", () => { 
+ signal.addEventListener("abort", () => { reject(new Error("The task has been aborted")); }); }); @@ -119,7 +118,7 @@ describe("ExecutorManager", () => { expect(executorManager.availableSlotsGlobal()).toEqual(9); await execPromise; - expect(runMock).toBeCalledWith(jobData, expect.any(EventEmitter)); + expect(runMock).toBeCalledWith(jobData, expect.any(AbortSignal)); expect(runMock).toHaveReturnedWith(expectedPromise); await expect(expectedPromise).rejects.toThrow("The task has been aborted"); expect(executorManager.availableSlotsByQueue(queryConfig)).toEqual(1); @@ -133,6 +132,33 @@ describe("ExecutorManager", () => { await executorManager.destroy(); }); + sidequestTest( + "does not overwrite the canceled state when an aborted job ignores the signal", + async ({ backend, config }) => { + await backend.updateJob({ ...jobData, state: "claimed", claimed_at: new Date() }); + + const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 1 }); + const executorManager = new ExecutorManager(backend, config); + + // The job is canceled mid-run but ignores the abort signal and runs to completion. + runMock.mockImplementationOnce(async (job: JobData, signal: AbortSignal) => { + await backend.updateJob({ ...job, state: "canceled" }); + while (!signal.aborted) { + await new Promise((r) => setTimeout(r, 50)); + } + return { __is_job_transition__: true, type: "completed", result: "result" } as CompletedResult; + }); + + await executorManager.execute(queryConfig, jobData); + + // The terminal completion transition must be skipped so the canceled state is preserved. 
+ // eslint-disable-next-line @typescript-eslint/unbound-method + expect(JobTransitioner.apply).not.toHaveBeenCalledWith(backend, expect.anything(), expect.any(CompleteTransition)); + + await executorManager.destroy(); + }, + ); + sidequestTest("should abort job execution on timeout", async ({ backend, config }) => { jobData = await backend.updateJob({ ...jobData, state: "claimed", claimed_at: new Date(), timeout: 100 }); @@ -140,9 +166,9 @@ describe("ExecutorManager", () => { const executorManager = new ExecutorManager(backend, config); let expectedPromise; - runMock.mockImplementationOnce(async (job: JobData, signal: EventEmitter) => { + runMock.mockImplementationOnce(async (job: JobData, signal: AbortSignal) => { const promise = new Promise((_, reject) => { - signal.on("abort", () => { + signal.addEventListener("abort", () => { reject(new Error("The task has been aborted")); }); }); @@ -156,7 +182,7 @@ describe("ExecutorManager", () => { expect(executorManager.availableSlotsGlobal()).toEqual(9); await execPromise; - expect(runMock).toBeCalledWith(jobData, expect.any(EventEmitter)); + expect(runMock).toBeCalledWith(jobData, expect.any(AbortSignal)); expect(runMock).toHaveReturnedWith(expectedPromise); await expect(expectedPromise).rejects.toThrow("The task has been aborted"); expect(executorManager.availableSlotsByQueue(queryConfig)).toEqual(1); @@ -186,7 +212,7 @@ describe("ExecutorManager", () => { expect(executorManager.availableSlotsGlobal()).toEqual(9); await execPromise; - expect(runMock).toBeCalledWith(jobData, expect.any(EventEmitter)); + expect(runMock).toBeCalledWith(jobData, expect.any(AbortSignal)); expect(executorManager.availableSlotsByQueue(queryConfig)).toEqual(1); expect(executorManager.availableSlotsGlobal()).toEqual(10); diff --git a/packages/engine/src/execution/executor-manager.ts b/packages/engine/src/execution/executor-manager.ts index 4c7a145b..743da59c 100644 --- a/packages/engine/src/execution/executor-manager.ts +++ 
b/packages/engine/src/execution/executor-manager.ts @@ -1,6 +1,14 @@ import { Backend } from "@sidequest/backend"; -import { JobData, JobTransitionFactory, logger, QueueConfig, RetryTransition, RunTransition } from "@sidequest/core"; -import EventEmitter from "events"; +import { + JobCanceled, + JobData, + JobTimeout, + JobTransitionFactory, + logger, + QueueConfig, + RetryTransition, + RunTransition, +} from "@sidequest/core"; import { inspect } from "util"; import { NonNullableEngineConfig } from "../engine"; import { JobTransitioner } from "../job/job-transitioner"; @@ -91,6 +99,7 @@ export class ExecutorManager { */ async execute(queueConfig: QueueConfig, job: JobData): Promise { let isRunning = false; + const controller = new AbortController(); try { logger("Executor Manager").debug(`Submitting job ${job.id} for execution in queue ${queueConfig.name}`); // We call prepareJob here again to make sure the jobs are in the queues. @@ -100,13 +109,12 @@ export class ExecutorManager { job = await JobTransitioner.apply(this.backend, job, new RunTransition()); isRunning = true; - const signal = new EventEmitter(); const cancellationCheck = async () => { while (isRunning) { const watchedJob = await this.backend.getJob(job.id); if (watchedJob!.state === "canceled") { - logger("Executor Manager").debug(`Emitting abort signal for job ${job.id}`); - signal.emit("abort"); + logger("Executor Manager").debug(`Aborting job ${job.id}: canceled`); + controller.abort(new JobCanceled()); isRunning = false; return; } @@ -117,13 +125,13 @@ export class ExecutorManager { logger("Executor Manager").debug(`Running job ${job.id} in queue ${queueConfig.name}`); - const runPromise = this.jobRunner.run(job, signal); + const runPromise = this.jobRunner.run(job, controller.signal); if (job.timeout) { void new Promise(() => { setTimeout(() => { logger("Executor Manager").debug(`Job ${job.id} timed out after ${job.timeout}ms, aborting.`); - signal.emit("abort"); + controller.abort(new 
JobTimeout(job.timeout!)); void JobTransitioner.apply(this.backend, job, new RetryTransition(`Job timed out after ${job.timeout}ms`)); }, job.timeout!); }); @@ -132,14 +140,22 @@ export class ExecutorManager { const result = await runPromise; isRunning = false; - logger("Executor Manager").debug(`Job ${job.id} completed with result: ${inspect(result)}`); - const transition = JobTransitionFactory.create(result); - await JobTransitioner.apply(this.backend, job, transition); + // If the job was aborted (canceled or timed out) the terminal state was already decided by that + // path. Skip the terminal transition so a job that ignored the signal and ran to completion does + // not overwrite the canceled/retry state. + if (!controller.signal.aborted) { + logger("Executor Manager").debug(`Job ${job.id} completed with result: ${inspect(result)}`); + const transition = JobTransitionFactory.create(result); + await JobTransitioner.apply(this.backend, job, transition); + } } catch (error: unknown) { isRunning = false; const err = error as Error; - if (err.message === "The task has been aborted") { - logger("Executor Manager").debug(`Job ${job.id} was aborted`); + // The thread runner rejects the run when its worker is aborted. Detect that via the signal + // rather than the rejection message (which varies). The terminal state was already set by the + // timeout (retry) or cancellation (canceled) path, so there is nothing more to do here. 
+ if (controller.signal.aborted) { + logger("Executor Manager").debug(`Job ${job.id} was aborted: ${String(controller.signal.reason)}`); } else { logger("Executor Manager").error(`Unhandled error while executing job ${job.id}: ${err.message}`); await JobTransitioner.apply(this.backend, job, new RetryTransition(err)); diff --git a/packages/engine/src/shared-runner/inline-runner.test.ts b/packages/engine/src/shared-runner/inline-runner.test.ts index 225538e4..74bc540f 100644 --- a/packages/engine/src/shared-runner/inline-runner.test.ts +++ b/packages/engine/src/shared-runner/inline-runner.test.ts @@ -40,9 +40,10 @@ describe("InlineRunner", () => { expect(result).toEqual({ __is_job_transition__: true, type: "completed", result: "dummy job" }); }); - sidequestTest("delegates to the runner with inline enabled (skips config injection)", async ({ config }) => { - await runner.run(jobData); - expect(run).toHaveBeenCalledWith({ jobData, config, inline: true }); + sidequestTest("delegates to the runner with inline enabled and forwards the signal", async ({ config }) => { + const signal = new AbortController().signal; + await runner.run(jobData, signal); + expect(run).toHaveBeenCalledWith({ jobData, config, inline: true, signal }); }); sidequestTest("destroy is a no-op and does not throw", () => { diff --git a/packages/engine/src/shared-runner/inline-runner.ts b/packages/engine/src/shared-runner/inline-runner.ts index 2631717e..f5ceb0fc 100644 --- a/packages/engine/src/shared-runner/inline-runner.ts +++ b/packages/engine/src/shared-runner/inline-runner.ts @@ -19,14 +19,16 @@ export class InlineRunner implements JobRunner { constructor(private nonNullConfig: NonNullableEngineConfig) {} /** - * Runs a job in the current process. The abort signal is intentionally not accepted: inline - * execution has no separate thread to terminate. + * Runs a job in the current process. 
The abort signal is forwarded to the job (as + * `this.abortSignal`) so it can stop cooperatively: inline execution has no separate thread to + * terminate, so this is the only way timeouts and cancellation can take effect. * @param job The job data to run. + * @param signal Abort signal handed to the job for cooperative cancellation. * @returns A promise resolving to the job result. */ - run(job: JobData): Promise { + run(job: JobData, signal?: AbortSignal): Promise { logger("InlineRunner").debug(`Running job ${job.id} inline`); - return run({ jobData: job, config: this.nonNullConfig, inline: true }); + return run({ jobData: job, config: this.nonNullConfig, inline: true, signal }); } /** diff --git a/packages/engine/src/shared-runner/job-runner.ts b/packages/engine/src/shared-runner/job-runner.ts index 6658b3dc..14d851f0 100644 --- a/packages/engine/src/shared-runner/job-runner.ts +++ b/packages/engine/src/shared-runner/job-runner.ts @@ -1,5 +1,4 @@ import { JobData, JobResult } from "@sidequest/core"; -import EventEmitter from "events"; /** * Abstraction over how a claimed job is actually executed. @@ -12,10 +11,10 @@ export interface JobRunner { /** * Runs a job and resolves with its result. * @param job The job data to run. - * @param signal Optional event emitter used to request cancellation/abort. May be ignored by - * implementations that cannot forcibly abort a running job (e.g. the inline runner). + * @param signal Abort signal for the run. The thread runner uses it to terminate the worker; the + * inline runner forwards it to the job so it can stop cooperatively. */ - run(job: JobData, signal?: EventEmitter): Promise; + run(job: JobData, signal?: AbortSignal): Promise; /** * Releases any resources held by the runner. 
diff --git a/packages/engine/src/shared-runner/runner-pool.test.ts b/packages/engine/src/shared-runner/runner-pool.test.ts index 8c4bba95..d725f763 100644 --- a/packages/engine/src/shared-runner/runner-pool.test.ts +++ b/packages/engine/src/shared-runner/runner-pool.test.ts @@ -1,8 +1,9 @@ import { sidequestTest, SidequestTestFixture } from "@/tests/fixture"; -import { JobData } from "@sidequest/core"; -import EventEmitter from "events"; +import { JobData, JobTimeout } from "@sidequest/core"; +import { MessagePort } from "node:worker_threads"; import { beforeEach, describe, expect, vi } from "vitest"; import { DummyJob } from "../test-jobs/dummy-job"; +import { AbortPortMessage } from "./runner"; import { RunnerPool } from "./runner-pool"; const piscinaMockInstance = { @@ -39,14 +40,57 @@ describe("RunnerPool", () => { pool = new RunnerPool(config); }); - sidequestTest("should call pool.run with job data", async ({ config }) => { - const emiter = new EventEmitter(); - const result = await pool.run(jobData, emiter); + sidequestTest("passes the abort signal straight to piscina when grace is 0", async ({ config }) => { + const signal = new AbortController().signal; + const result = await pool.run(jobData, signal); expect(result).toEqual({ type: "completed", result: "ok" }); - expect(piscinaMockInstance.run).toHaveBeenCalledWith({ jobData, config }, { signal: emiter }); + expect(piscinaMockInstance.run).toHaveBeenCalledWith({ jobData, config }, { signal }); }); + sidequestTest( + "with a grace period, delivers the abort over a port and hard-kills after the grace", + async ({ config }) => { + vi.useFakeTimers(); + try { + const gracePool = new RunnerPool({ ...config, abortGracePeriodMs: 1000 }); + + let capturedPort: MessagePort | undefined; + let hardKillSignal: AbortSignal | undefined; + piscinaMockInstance.run.mockImplementationOnce( + (value: { abortPort: MessagePort }, opts: { signal: AbortSignal }) => { + capturedPort = value.abortPort; + hardKillSignal = 
opts.signal; + // Resolve only once piscina is asked to terminate (hard kill). + return new Promise((resolve) => + opts.signal.addEventListener("abort", () => resolve({ type: "completed", result: "killed" })), + ); + }, + ); + + const controller = new AbortController(); + const runPromise = gracePool.run(jobData, controller.signal); + + const message = new Promise((resolve) => + capturedPort!.once("message", (m: AbortPortMessage) => resolve(m)), + ); + + controller.abort(new JobTimeout(5000)); + + expect(await message).toEqual({ kind: "timeout", timeoutMs: 5000 }); + expect(hardKillSignal!.aborted).toBe(false); + + // Grace elapses -> piscina is asked to terminate the worker. + await vi.advanceTimersByTimeAsync(1000); + expect(hardKillSignal!.aborted).toBe(true); + + await runPromise; + } finally { + vi.useRealTimers(); + } + }, + ); + sidequestTest("should call pool.destroy", () => { pool.destroy(); expect(piscinaMockInstance.destroy).toHaveBeenCalled(); diff --git a/packages/engine/src/shared-runner/runner-pool.ts b/packages/engine/src/shared-runner/runner-pool.ts index 224d6aca..da034bd7 100644 --- a/packages/engine/src/shared-runner/runner-pool.ts +++ b/packages/engine/src/shared-runner/runner-pool.ts @@ -1,9 +1,10 @@ -import { JobData, JobResult, logger } from "@sidequest/core"; -import EventEmitter from "events"; +import { JobData, JobResult, JobTimeout, logger } from "@sidequest/core"; +import { MessageChannel } from "node:worker_threads"; import Piscina from "piscina"; import { DEFAULT_RUNNER_PATH } from "../constants"; import { NonNullableEngineConfig } from "../engine"; import { JobRunner } from "./job-runner"; +import type { AbortPortMessage } from "./runner"; /** * A pool of worker threads for running jobs in parallel using Piscina. @@ -30,13 +31,54 @@ export class RunnerPool implements JobRunner { /** * Runs a job in the worker pool. + * + * With `abortGracePeriodMs === 0` (default), an abort terminates the worker immediately. 
With a + * positive grace period, the abort is delivered to the job cooperatively over a transferred port + * (so it can stop via `this.abortSignal`), and the worker is only forcibly terminated if it has + * not finished within the grace period. + * * @param job The job data to run. - * @param signal Optional event emitter for cancellation. + * @param signal Optional abort signal for cancellation/timeout. * @returns A promise resolving to the job result. */ - run(job: JobData, signal?: EventEmitter): Promise { + run(job: JobData, signal?: AbortSignal): Promise { logger("RunnerPool").debug(`Running job ${job.id} in pool`); - return this.pool.run({ jobData: job, config: this.nonNullConfig }, { signal }); + const grace = this.nonNullConfig.abortGracePeriodMs; + + if (!signal || grace <= 0) { + // Abort terminates the worker immediately. + return this.pool.run({ jobData: job, config: this.nonNullConfig }, { signal }); + } + + // Deliver the abort cooperatively first, then hard-terminate after the grace period. + const channel = new MessageChannel(); + const hardKill = new AbortController(); + let graceTimer: ReturnType | undefined; + + const onAbort = () => { + const reason: unknown = signal.reason; + const message: AbortPortMessage = + reason instanceof JobTimeout ? 
{ kind: "timeout", timeoutMs: reason.timeoutMs } : { kind: "canceled" }; + channel.port1.postMessage(message); + graceTimer = setTimeout(() => hardKill.abort(), grace); + }; + + if (signal.aborted) { + onAbort(); + } else { + signal.addEventListener("abort", onAbort, { once: true }); + } + + return this.pool + .run( + { jobData: job, config: this.nonNullConfig, abortPort: channel.port2 }, + { transferList: [channel.port2], signal: hardKill.signal }, + ) + .finally(() => { + if (graceTimer) clearTimeout(graceTimer); + signal.removeEventListener("abort", onAbort); + channel.port1.close(); + }); } /** diff --git a/packages/engine/src/shared-runner/runner.test.ts b/packages/engine/src/shared-runner/runner.test.ts index 7fde6c15..38378c8f 100644 --- a/packages/engine/src/shared-runner/runner.test.ts +++ b/packages/engine/src/shared-runner/runner.test.ts @@ -1,9 +1,11 @@ import { sidequestTest, SidequestTestFixture } from "@/tests/fixture"; -import { FailedResult, JobData } from "@sidequest/core"; +import { FailedResult, JobCanceled, JobData, JobTimeout } from "@sidequest/core"; import { existsSync, unlinkSync } from "node:fs"; import { unlink, writeFile } from "node:fs/promises"; import { join, resolve } from "node:path"; +import { MessageChannel } from "node:worker_threads"; import { vi } from "vitest"; +import { AbortAwareJob } from "../test-jobs/abort-aware-job"; import { DummyJob } from "../test-jobs/dummy-job"; import { DummyJobWithArgs } from "../test-jobs/dummy-job-with-args"; import { importSidequest } from "../utils/import"; @@ -65,6 +67,95 @@ describe("runner.ts", () => { expect(importSidequest).not.toHaveBeenCalled(); }); + sidequestTest("injects the abort signal into the job when provided", async ({ config }) => { + const injectSpy = vi.spyOn(DummyJob.prototype, "injectAbortSignal"); + const signal = new AbortController().signal; + await run({ jobData, config, signal }); + expect(injectSpy).toHaveBeenCalledWith(signal); + injectSpy.mockRestore(); + }); + + 
sidequestTest("does not inject an abort signal when none is provided", async ({ config }) => { + const injectSpy = vi.spyOn(DummyJob.prototype, "injectAbortSignal"); + await run({ jobData, config }); + expect(injectSpy).not.toHaveBeenCalled(); + injectSpy.mockRestore(); + }); + + sidequestTest("a job receives the abort signal and its reason and stops", async ({ backend, config }) => { + const job = new AbortAwareJob(); + await job.ready(); + const abortJobData = await backend.createNewJob({ + class: job.className, + script: job.script, + args: [], + attempt: 0, + queue: "default", + constructor_args: [], + state: "waiting", + }); + + const controller = new AbortController(); + const resultPromise = run({ jobData: abortJobData, config, inline: true, signal: controller.signal }); + controller.abort(new JobCanceled()); + + // The job resolves once it observes the abort (whether before it starts or while awaiting it). + const result = (await resultPromise) as FailedResult; + expect(result.type).toBe("failed"); + expect(result.error.message).toContain("JobCanceled"); + }); + + sidequestTest("a job sees an already-aborted signal before it starts", async ({ backend, config }) => { + const job = new AbortAwareJob(); + await job.ready(); + const abortJobData = await backend.createNewJob({ + class: job.className, + script: job.script, + args: [], + attempt: 0, + queue: "default", + constructor_args: [], + state: "waiting", + }); + + const controller = new AbortController(); + controller.abort(new JobTimeout(50)); + + const result = (await run({ + jobData: abortJobData, + config, + inline: true, + signal: controller.signal, + })) as FailedResult; + expect(result.type).toBe("failed"); + expect(result.error.message).toContain("aborted before start: JobTimeout"); + }); + + sidequestTest("a thread job is aborted via the abort port (rebuilds the reason)", async ({ backend, config }) => { + const job = new AbortAwareJob(); + await job.ready(); + const abortJobData = await 
backend.createNewJob({ + class: job.className, + script: job.script, + args: [], + attempt: 0, + queue: "default", + constructor_args: [], + state: "waiting", + }); + + const channel = new MessageChannel(); + // Thread path: no live signal, the abort arrives over the transferred port. + const resultPromise = run({ jobData: abortJobData, config, abortPort: channel.port2 }); + channel.port1.postMessage({ kind: "timeout", timeoutMs: 1234 }); + + const result = (await resultPromise) as FailedResult; + expect(result.type).toBe("failed"); + // The worker reconstructs the JobTimeout reason from the port message. + expect(result.error.message).toContain("JobTimeout"); + channel.port1.close(); + }); + sidequestTest("fails with invalid script", async ({ config }) => { jobData.script = "invalid!"; const result = (await run({ jobData, config })) as FailedResult; diff --git a/packages/engine/src/shared-runner/runner.ts b/packages/engine/src/shared-runner/runner.ts index 458bd1fe..3ef3e6bf 100644 --- a/packages/engine/src/shared-runner/runner.ts +++ b/packages/engine/src/shared-runner/runner.ts @@ -1,10 +1,40 @@ -import { Job, JobClassType, JobData, JobResult, logger, resolveScriptPathForJob, toErrorData } from "@sidequest/core"; +import { + Job, + JobCanceled, + JobClassType, + JobData, + JobResult, + JobTimeout, + logger, + resolveScriptPathForJob, + toErrorData, +} from "@sidequest/core"; import { existsSync } from "node:fs"; import { fileURLToPath } from "node:url"; +import { MessagePort } from "node:worker_threads"; import { EngineConfig } from "../engine"; import { importSidequest } from "../utils"; import { findSidequestJobsScriptInParentDirs, MANUAL_SCRIPT_TAG, resolveScriptPath } from "./manual-loader"; +/** Message posted by the engine over the abort port to cooperatively abort a worker-thread job. 
*/ +export type AbortPortMessage = { kind: "timeout"; timeoutMs: number } | { kind: "canceled" }; + +/** + * Builds an {@link AbortSignal} for a worker-thread job from an abort port. + * + * The thread runner cannot receive a live `AbortSignal` across the worker boundary, so the engine + * transfers a {@link MessagePort} and posts an abort message on it; this turns that message into a + * local signal carrying the proper {@link JobTimeout}/{@link JobCanceled} reason. + */ +function signalFromAbortPort(port: MessagePort): AbortSignal { + const controller = new AbortController(); + port.on("message", (message: AbortPortMessage) => { + const reason = message.kind === "timeout" ? new JobTimeout(message.timeoutMs) : new JobCanceled(); + controller.abort(reason); + }); + return controller.signal; +} + /** * Runs a job by dynamically importing its script and executing the specified class. * @param jobData The job data containing script and class information @@ -15,10 +45,14 @@ export default async function run({ jobData, config, inline, + signal, + abortPort, }: { jobData: JobData; config: EngineConfig; inline?: boolean; + signal?: AbortSignal; + abortPort?: MessagePort; }): Promise { // In inline mode the job runs in the host process, where Sidequest is already configured, so // re-injecting the config is redundant. In a worker thread the module is fresh and needs it. @@ -77,9 +111,18 @@ export default async function run({ const job: Job = new JobClass(...jobData.constructor_args); job.injectJobData(jobData); + // Inline passes a live signal directly; the thread runner gets an abort port it turns into one. + const abortSignal = signal ?? (abortPort ? 
signalFromAbortPort(abortPort) : undefined); + if (abortSignal) { + job.injectAbortSignal(abortSignal); + } logger("Runner").debug(`Executing job class "${jobData.class}" with args:`, jobData.args); - return job.perform(...jobData.args); + try { + return await job.perform(...jobData.args); + } finally { + abortPort?.close(); + } } /** diff --git a/packages/engine/src/test-jobs/abort-aware-job.js b/packages/engine/src/test-jobs/abort-aware-job.js new file mode 100644 index 00000000..bc9704de --- /dev/null +++ b/packages/engine/src/test-jobs/abort-aware-job.js @@ -0,0 +1,19 @@ +import { Job } from "@sidequest/core"; + +/** + * A job that honors `this.abortSignal`: it waits until aborted and reports the abort reason. + * Used to verify cooperative cancellation/timeout end-to-end through the runner. + */ +export class AbortAwareJob extends Job { + async run() { + if (this.abortSignal.aborted) { + return this.fail(`aborted before start: ${this.abortSignal.reason?.name}`); + } + + await new Promise((resolve) => { + this.abortSignal.addEventListener("abort", resolve, { once: true }); + }); + + return this.fail(`aborted: ${this.abortSignal.reason?.name}`); + } +} From 8b75656a5e16f56084ec8dc21077d857c2371e8d Mon Sep 17 00:00:00 2001 From: merencia Date: Fri, 19 Jun 2026 14:34:54 -0300 Subject: [PATCH 04/10] style: wrap long assertion in executor-manager test (prettier) --- packages/engine/src/execution/executor-manager.test.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/packages/engine/src/execution/executor-manager.test.ts b/packages/engine/src/execution/executor-manager.test.ts index 872d7722..0e8b6b1e 100644 --- a/packages/engine/src/execution/executor-manager.test.ts +++ b/packages/engine/src/execution/executor-manager.test.ts @@ -153,7 +153,11 @@ describe("ExecutorManager", () => { // The terminal completion transition must be skipped so the canceled state is preserved. 
// eslint-disable-next-line @typescript-eslint/unbound-method - expect(JobTransitioner.apply).not.toHaveBeenCalledWith(backend, expect.anything(), expect.any(CompleteTransition)); + expect(JobTransitioner.apply).not.toHaveBeenCalledWith( + backend, + expect.anything(), + expect.any(CompleteTransition), + ); await executorManager.destroy(); }, From a8a71b432a41fd49c34ee150b64e8be92e6210f1 Mon Sep 17 00:00:00 2001 From: Lucas Merencia <1686635+merencia@users.noreply.github.com> Date: Fri, 19 Jun 2026 16:22:23 -0300 Subject: [PATCH 05/10] fix: unify timeout/cancel handling; decide terminal state when the run settles (#184) * fix: in inline mode, let the job's result decide on timeout/cancel An inline run cannot be force-stopped, so eagerly applying a RetryTransition when the timeout fires (while the job keeps running) re-queued the job and could run a second copy concurrently. In inline mode the timeout/cancel now only signal via this.abortSignal; the terminal state is decided when the job actually settles: a job that honors the signal returns its own result, one that ignores it simply completes. Thread mode is unchanged (the worker is terminated, so the abort still decides). Also capture the timeout handle and clearTimeout on settle so a job that finishes before its timeout no longer leaves a dangling timer (it kept the event loop alive and retained job/backend refs until it fired). * refactor: address code-review follow-ups (robustness + cleanup) - executor-manager: guard the cancellation poll against a missing job row (watchedJob?.state) and attach a .catch so the polling loop can't die on an unhandled rejection; log the underlying rejection when a job is aborted so a real post-abort error is not silently lost. - core: move the abort-reason wire format and its (de)serialization into @sidequest/core (AbortReasonMessage + serializeAbortReason/deserializeAbortReason) so the encode (runner-pool) and decode (runner) no longer duplicate the mapping. 
- runner: replace the nested ternary for picking the abort signal with an explicit if-chain (the two inputs are mutually exclusive by construction). * refactor: unify timeout/cancel handling around the run's settle Replace the inline-vs-thread special-casing with a single rule that holds for every runner/grace combination: - The terminal transition is only applied once the run actually ends, so a still-running job is never re-queued underneath itself (fixes the double execution in inline and in thread with abortGracePeriodMs > 0). - If the job returned a state (resolved), respect it and transition to that state, even if a timeout/cancel was signaled. - If the run was hard-killed (rejected, no result), the abort reason decides: timeout -> retry, cancellation -> canceled. - A real error -> retry, as before. The timeout timer now only signals the abort; it no longer eagerly applies a retry. Removes the `inline` branch in execute(). --- packages/core/src/job/abort-reason.test.ts | 24 ++++ packages/core/src/job/abort-reason.ts | 23 ++++ .../src/execution/executor-manager.test.ts | 125 ++++++++++++++---- .../engine/src/execution/executor-manager.ts | 49 ++++--- .../src/shared-runner/runner-pool.test.ts | 7 +- .../engine/src/shared-runner/runner-pool.ts | 8 +- packages/engine/src/shared-runner/runner.ts | 24 ++-- 7 files changed, 192 insertions(+), 68 deletions(-) create mode 100644 packages/core/src/job/abort-reason.test.ts diff --git a/packages/core/src/job/abort-reason.test.ts b/packages/core/src/job/abort-reason.test.ts new file mode 100644 index 00000000..1aecbf4a --- /dev/null +++ b/packages/core/src/job/abort-reason.test.ts @@ -0,0 +1,24 @@ +import { describe, expect, it } from "vitest"; +import { deserializeAbortReason, JobCanceled, JobTimeout, serializeAbortReason } from "./abort-reason"; + +describe("abort-reason", () => { + it("round-trips a JobTimeout reason through the wire form", () => { + const message = serializeAbortReason(new JobTimeout(1500)); + 
expect(message).toEqual({ kind: "timeout", timeoutMs: 1500 }); + + const reason = deserializeAbortReason(message); + expect(reason).toBeInstanceOf(JobTimeout); + expect((reason as JobTimeout).timeoutMs).toBe(1500); + }); + + it("round-trips a JobCanceled reason through the wire form", () => { + const message = serializeAbortReason(new JobCanceled()); + expect(message).toEqual({ kind: "canceled" }); + expect(deserializeAbortReason(message)).toBeInstanceOf(JobCanceled); + }); + + it("treats any non-timeout reason as canceled", () => { + expect(serializeAbortReason(new Error("boom"))).toEqual({ kind: "canceled" }); + expect(serializeAbortReason(undefined)).toEqual({ kind: "canceled" }); + }); +}); diff --git a/packages/core/src/job/abort-reason.ts b/packages/core/src/job/abort-reason.ts index 97ffe8c9..b9611981 100644 --- a/packages/core/src/job/abort-reason.ts +++ b/packages/core/src/job/abort-reason.ts @@ -29,3 +29,26 @@ export class JobCanceled extends Error { this.name = "JobCanceled"; } } + +/** + * Structured-clone-safe wire form of an {@link AbortReason}, used to convey the reason to a job + * running in a worker thread (a live {@link AbortSignal} cannot cross the thread boundary). + */ +export type AbortReasonMessage = { kind: "timeout"; timeoutMs: number } | { kind: "canceled" }; + +/** + * Encodes an abort reason into its wire form. Anything that is not a {@link JobTimeout} is treated + * as a cancellation. + * @param reason The abort reason (typically `signal.reason`). + */ +export function serializeAbortReason(reason: unknown): AbortReasonMessage { + return reason instanceof JobTimeout ? { kind: "timeout", timeoutMs: reason.timeoutMs } : { kind: "canceled" }; +} + +/** + * Rebuilds the proper {@link AbortReason} instance from its wire form. + * @param message The wire-form message. + */ +export function deserializeAbortReason(message: AbortReasonMessage): AbortReason { + return message.kind === "timeout" ? 
new JobTimeout(message.timeoutMs) : new JobCanceled(); +} diff --git a/packages/engine/src/execution/executor-manager.test.ts b/packages/engine/src/execution/executor-manager.test.ts index 0e8b6b1e..0b6d27b8 100644 --- a/packages/engine/src/execution/executor-manager.test.ts +++ b/packages/engine/src/execution/executor-manager.test.ts @@ -1,6 +1,13 @@ import { sidequestTest, SidequestTestFixture } from "@/tests/fixture"; import { Backend } from "@sidequest/backend"; -import { CompletedResult, CompleteTransition, JobData, RetryTransition, RunTransition } from "@sidequest/core"; +import { + CancelTransition, + CompletedResult, + CompleteTransition, + JobData, + RetryTransition, + RunTransition, +} from "@sidequest/core"; import { JobTransitioner } from "../job/job-transitioner"; import { grantQueueConfig } from "../queue/grant-queue-config"; import { DummyJob } from "../test-jobs/dummy-job"; @@ -94,6 +101,54 @@ describe("ExecutorManager", () => { inlineRunMock.mockReset(); }); + sidequestTest( + "inline: a job that ignores the timeout completes instead of being retried", + async ({ backend, config }) => { + jobData = await backend.updateJob({ ...jobData, state: "claimed", claimed_at: new Date(), timeout: 20 }); + const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 1 }); + const executorManager = new ExecutorManager(backend, { ...config, runner: "inline" }); + + // The job ignores the abort signal and completes after the timeout has already fired. + inlineRunMock.mockImplementationOnce( + () => + new Promise((resolve) => + setTimeout(() => resolve({ __is_job_transition__: true, type: "completed", result: "done" }), 60), + ), + ); + + await executorManager.execute(queryConfig, jobData); + + // Timeout fired (signal aborted) but inline applies the job's own result: completed, not a retry. 
+ // eslint-disable-next-line @typescript-eslint/unbound-method + expect(JobTransitioner.apply).toHaveBeenCalledWith(backend, jobData, expect.any(CompleteTransition)); + // eslint-disable-next-line @typescript-eslint/unbound-method + expect(JobTransitioner.apply).not.toHaveBeenCalledWith(backend, jobData, expect.any(RetryTransition)); + await executorManager.destroy(); + inlineRunMock.mockReset(); + }, + ); + + sidequestTest( + "inline: a job that ignores cancellation completes (its result wins)", + async ({ backend, config }) => { + // Pre-cancel in the DB so the first cancellation poll observes it immediately. JobTransitioner is + // mocked here, so the RunTransition does not overwrite the persisted state. + jobData = await backend.updateJob({ ...jobData, state: "canceled" }); + const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 1 }); + const executorManager = new ExecutorManager(backend, { ...config, runner: "inline" }); + + inlineRunMock.mockResolvedValue({ __is_job_transition__: true, type: "completed", result: "done" }); + + await executorManager.execute(queryConfig, jobData); + + // Inline cannot force-stop the job; it completed, so its result is applied. 
+ // eslint-disable-next-line @typescript-eslint/unbound-method + expect(JobTransitioner.apply).toHaveBeenCalledWith(backend, jobData, expect.any(CompleteTransition)); + await executorManager.destroy(); + inlineRunMock.mockReset(); + }, + ); + sidequestTest("should abort job execution on job cancel", async ({ backend, config }) => { await backend.updateJob({ ...jobData, state: "claimed", claimed_at: new Date() }); @@ -132,36 +187,54 @@ describe("ExecutorManager", () => { await executorManager.destroy(); }); - sidequestTest( - "does not overwrite the canceled state when an aborted job ignores the signal", - async ({ backend, config }) => { - await backend.updateJob({ ...jobData, state: "claimed", claimed_at: new Date() }); + sidequestTest("respects a job's returned result even after a cancel was signaled", async ({ backend, config }) => { + await backend.updateJob({ ...jobData, state: "claimed", claimed_at: new Date() }); - const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 1 }); - const executorManager = new ExecutorManager(backend, config); - - // The job is canceled mid-run but ignores the abort signal and runs to completion. - runMock.mockImplementationOnce(async (job: JobData, signal: AbortSignal) => { - await backend.updateJob({ ...job, state: "canceled" }); - while (!signal.aborted) { - await new Promise((r) => setTimeout(r, 50)); - } - return { __is_job_transition__: true, type: "completed", result: "result" } as CompletedResult; + const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 1 }); + const executorManager = new ExecutorManager(backend, config); + + // The job is canceled mid-run but ignores the abort signal and returns a completed result. 
+ runMock.mockImplementationOnce(async (job: JobData, signal: AbortSignal) => { + await backend.updateJob({ ...job, state: "canceled" }); + while (!signal.aborted) { + await new Promise((r) => setTimeout(r, 50)); + } + return { __is_job_transition__: true, type: "completed", result: "result" } as CompletedResult; + }); + + await executorManager.execute(queryConfig, jobData); + + // The job returned a state, so it is respected: the completion is applied. + // eslint-disable-next-line @typescript-eslint/unbound-method + expect(JobTransitioner.apply).toHaveBeenCalledWith(backend, jobData, expect.any(CompleteTransition)); + + await executorManager.destroy(); + }); + + sidequestTest("a hard-killed canceled job transitions to canceled", async ({ backend, config }) => { + await backend.updateJob({ ...jobData, state: "claimed", claimed_at: new Date() }); + + const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 1 }); + const executorManager = new ExecutorManager(backend, config); + + // The job is canceled and never produces a result (the worker is terminated -> the run rejects). + runMock.mockImplementationOnce(async (job: JobData, signal: AbortSignal) => { + await backend.updateJob({ ...job, state: "canceled" }); + return new Promise((_, reject) => { + signal.addEventListener("abort", () => reject(new Error("The task has been aborted"))); }); + }); - await executorManager.execute(queryConfig, jobData); + await executorManager.execute(queryConfig, jobData); - // The terminal completion transition must be skipped so the canceled state is preserved. - // eslint-disable-next-line @typescript-eslint/unbound-method - expect(JobTransitioner.apply).not.toHaveBeenCalledWith( - backend, - expect.anything(), - expect.any(CompleteTransition), - ); + // No result: the abort reason (canceled) decides the terminal state. 
+ // eslint-disable-next-line @typescript-eslint/unbound-method + expect(JobTransitioner.apply).toHaveBeenCalledWith(backend, jobData, expect.any(CancelTransition)); + // eslint-disable-next-line @typescript-eslint/unbound-method + expect(JobTransitioner.apply).not.toHaveBeenCalledWith(backend, jobData, expect.any(CompleteTransition)); - await executorManager.destroy(); - }, - ); + await executorManager.destroy(); + }); sidequestTest("should abort job execution on timeout", async ({ backend, config }) => { jobData = await backend.updateJob({ ...jobData, state: "claimed", claimed_at: new Date(), timeout: 100 }); diff --git a/packages/engine/src/execution/executor-manager.ts b/packages/engine/src/execution/executor-manager.ts index 743da59c..b06bef68 100644 --- a/packages/engine/src/execution/executor-manager.ts +++ b/packages/engine/src/execution/executor-manager.ts @@ -1,5 +1,6 @@ import { Backend } from "@sidequest/backend"; import { + CancelTransition, JobCanceled, JobData, JobTimeout, @@ -100,6 +101,7 @@ export class ExecutorManager { async execute(queueConfig: QueueConfig, job: JobData): Promise { let isRunning = false; const controller = new AbortController(); + let timeoutHandle: ReturnType | undefined; try { logger("Executor Manager").debug(`Submitting job ${job.id} for execution in queue ${queueConfig.name}`); // We call prepareJob here again to make sure the jobs are in the queues. @@ -111,8 +113,10 @@ export class ExecutorManager { isRunning = true; const cancellationCheck = async () => { while (isRunning) { + // The row can be missing transiently or if it was deleted; treat that as "not canceled" + // rather than dereferencing undefined and crashing the polling loop. 
const watchedJob = await this.backend.getJob(job.id); - if (watchedJob!.state === "canceled") { + if (watchedJob?.state === "canceled") { logger("Executor Manager").debug(`Aborting job ${job.id}: canceled`); controller.abort(new JobCanceled()); isRunning = false; @@ -121,47 +125,50 @@ export class ExecutorManager { await new Promise((r) => setTimeout(r, 1000)); } }; - void cancellationCheck(); + void cancellationCheck().catch((error) => { + logger("Executor Manager").error(`Cancellation watcher for job ${job.id} failed:`, error); + }); logger("Executor Manager").debug(`Running job ${job.id} in queue ${queueConfig.name}`); const runPromise = this.jobRunner.run(job, controller.signal); if (job.timeout) { - void new Promise(() => { - setTimeout(() => { - logger("Executor Manager").debug(`Job ${job.id} timed out after ${job.timeout}ms, aborting.`); - controller.abort(new JobTimeout(job.timeout!)); - void JobTransitioner.apply(this.backend, job, new RetryTransition(`Job timed out after ${job.timeout}ms`)); - }, job.timeout!); - }); + // Only signal the abort here. The terminal transition is decided when the run actually ends + // (resolve or reject) so a still-running job is never re-queued underneath itself. + timeoutHandle = setTimeout(() => { + logger("Executor Manager").debug(`Job ${job.id} timed out after ${job.timeout}ms, aborting.`); + controller.abort(new JobTimeout(job.timeout!)); + }, job.timeout); } const result = await runPromise; isRunning = false; - // If the job was aborted (canceled or timed out) the terminal state was already decided by that - // path. Skip the terminal transition so a job that ignored the signal and ran to completion does - // not overwrite the canceled/retry state. 
- if (!controller.signal.aborted) { - logger("Executor Manager").debug(`Job ${job.id} completed with result: ${inspect(result)}`); - const transition = JobTransitionFactory.create(result); - await JobTransitioner.apply(this.backend, job, transition); - } + // The job ran to a conclusion and returned a state (even if a timeout/cancel was signaled); + // respect it and transition accordingly. + logger("Executor Manager").debug(`Job ${job.id} settled with result: ${inspect(result)}`); + await JobTransitioner.apply(this.backend, job, JobTransitionFactory.create(result)); } catch (error: unknown) { isRunning = false; const err = error as Error; - // The thread runner rejects the run when its worker is aborted. Detect that via the signal - // rather than the rejection message (which varies). The terminal state was already set by the - // timeout (retry) or cancellation (canceled) path, so there is nothing more to do here. if (controller.signal.aborted) { - logger("Executor Manager").debug(`Job ${job.id} was aborted: ${String(controller.signal.reason)}`); + // The run produced no result because the worker was hard-killed (thread). The abort reason + // decides the terminal state: a timeout becomes a retry, anything else (cancellation) becomes + // canceled. The rejection is logged so a real error during the abort is not lost. + const reason: unknown = controller.signal.reason; + logger("Executor Manager").debug(`Job ${job.id} was hard-killed (${String(reason)}): ${err.message}`); + const transition = reason instanceof JobTimeout ? 
new RetryTransition(reason) : new CancelTransition(); + await JobTransitioner.apply(this.backend, job, transition); } else { logger("Executor Manager").error(`Unhandled error while executing job ${job.id}: ${err.message}`); await JobTransitioner.apply(this.backend, job, new RetryTransition(err)); } } finally { isRunning = false; + if (timeoutHandle) { + clearTimeout(timeoutHandle); + } this.activeByQueue[queueConfig.name].delete(job.id); this.activeJobs.delete(job.id); } diff --git a/packages/engine/src/shared-runner/runner-pool.test.ts b/packages/engine/src/shared-runner/runner-pool.test.ts index d725f763..48f21275 100644 --- a/packages/engine/src/shared-runner/runner-pool.test.ts +++ b/packages/engine/src/shared-runner/runner-pool.test.ts @@ -1,9 +1,8 @@ import { sidequestTest, SidequestTestFixture } from "@/tests/fixture"; -import { JobData, JobTimeout } from "@sidequest/core"; +import { AbortReasonMessage, JobData, JobTimeout } from "@sidequest/core"; import { MessagePort } from "node:worker_threads"; import { beforeEach, describe, expect, vi } from "vitest"; import { DummyJob } from "../test-jobs/dummy-job"; -import { AbortPortMessage } from "./runner"; import { RunnerPool } from "./runner-pool"; const piscinaMockInstance = { @@ -71,8 +70,8 @@ describe("RunnerPool", () => { const controller = new AbortController(); const runPromise = gracePool.run(jobData, controller.signal); - const message = new Promise((resolve) => - capturedPort!.once("message", (m: AbortPortMessage) => resolve(m)), + const message = new Promise((resolve) => + capturedPort!.once("message", (m: AbortReasonMessage) => resolve(m)), ); controller.abort(new JobTimeout(5000)); diff --git a/packages/engine/src/shared-runner/runner-pool.ts b/packages/engine/src/shared-runner/runner-pool.ts index da034bd7..244c37dd 100644 --- a/packages/engine/src/shared-runner/runner-pool.ts +++ b/packages/engine/src/shared-runner/runner-pool.ts @@ -1,10 +1,9 @@ -import { JobData, JobResult, JobTimeout, logger } 
from "@sidequest/core"; +import { JobData, JobResult, logger, serializeAbortReason } from "@sidequest/core"; import { MessageChannel } from "node:worker_threads"; import Piscina from "piscina"; import { DEFAULT_RUNNER_PATH } from "../constants"; import { NonNullableEngineConfig } from "../engine"; import { JobRunner } from "./job-runner"; -import type { AbortPortMessage } from "./runner"; /** * A pool of worker threads for running jobs in parallel using Piscina. @@ -56,10 +55,7 @@ export class RunnerPool implements JobRunner { let graceTimer: ReturnType | undefined; const onAbort = () => { - const reason: unknown = signal.reason; - const message: AbortPortMessage = - reason instanceof JobTimeout ? { kind: "timeout", timeoutMs: reason.timeoutMs } : { kind: "canceled" }; - channel.port1.postMessage(message); + channel.port1.postMessage(serializeAbortReason(signal.reason)); graceTimer = setTimeout(() => hardKill.abort(), grace); }; diff --git a/packages/engine/src/shared-runner/runner.ts b/packages/engine/src/shared-runner/runner.ts index 3ef3e6bf..41c5d0b3 100644 --- a/packages/engine/src/shared-runner/runner.ts +++ b/packages/engine/src/shared-runner/runner.ts @@ -1,10 +1,10 @@ import { + AbortReasonMessage, + deserializeAbortReason, Job, - JobCanceled, JobClassType, JobData, JobResult, - JobTimeout, logger, resolveScriptPathForJob, toErrorData, @@ -16,21 +16,17 @@ import { EngineConfig } from "../engine"; import { importSidequest } from "../utils"; import { findSidequestJobsScriptInParentDirs, MANUAL_SCRIPT_TAG, resolveScriptPath } from "./manual-loader"; -/** Message posted by the engine over the abort port to cooperatively abort a worker-thread job. */ -export type AbortPortMessage = { kind: "timeout"; timeoutMs: number } | { kind: "canceled" }; - /** * Builds an {@link AbortSignal} for a worker-thread job from an abort port. 
* * The thread runner cannot receive a live `AbortSignal` across the worker boundary, so the engine * transfers a {@link MessagePort} and posts an abort message on it; this turns that message into a - * local signal carrying the proper {@link JobTimeout}/{@link JobCanceled} reason. + * local signal carrying the proper `JobTimeout`/`JobCanceled` reason. */ function signalFromAbortPort(port: MessagePort): AbortSignal { const controller = new AbortController(); - port.on("message", (message: AbortPortMessage) => { - const reason = message.kind === "timeout" ? new JobTimeout(message.timeoutMs) : new JobCanceled(); - controller.abort(reason); + port.on("message", (message: AbortReasonMessage) => { + controller.abort(deserializeAbortReason(message)); }); return controller.signal; } @@ -111,8 +107,14 @@ export default async function run({ const job: Job = new JobClass(...jobData.constructor_args); job.injectJobData(jobData); - // Inline passes a live signal directly; the thread runner gets an abort port it turns into one. - const abortSignal = signal ?? (abortPort ? signalFromAbortPort(abortPort) : undefined); + // Exactly one of these is provided: inline passes a live signal; the thread runner passes an abort + // port it turns into one. + let abortSignal: AbortSignal | undefined; + if (signal) { + abortSignal = signal; + } else if (abortPort) { + abortSignal = signalFromAbortPort(abortPort); + } if (abortSignal) { job.injectAbortSignal(abortSignal); } From 90ff994cfb4af2372cf5280e09bd26f9c698b081 Mon Sep 17 00:00:00 2001 From: Lucas Merencia <1686635+merencia@users.noreply.github.com> Date: Fri, 19 Jun 2026 17:01:55 -0300 Subject: [PATCH 06/10] fix: abort a running job when its row is gone, not just when canceled (#185) The cancellation watcher polls the job row to detect cancellation. 
If the row no longer exists (deleted or truncated while the job runs), it could never observe "canceled", so a long-running, non-cooperative job was never aborted and blocked engine shutdown (executorManager.destroy waits for active jobs). Treat a missing row the same as a cancellation: abort the run so it stops and shutdown can proceed. Fixes the integration suite hang where a test truncates the DB and then stops Sidequest while a long TimeoutJob is still running. --- .../src/execution/executor-manager.test.ts | 25 +++++++++++++++++++ .../engine/src/execution/executor-manager.ts | 11 +++++--- 2 files changed, 32 insertions(+), 4 deletions(-) diff --git a/packages/engine/src/execution/executor-manager.test.ts b/packages/engine/src/execution/executor-manager.test.ts index 0b6d27b8..70a03fb9 100644 --- a/packages/engine/src/execution/executor-manager.test.ts +++ b/packages/engine/src/execution/executor-manager.test.ts @@ -236,6 +236,31 @@ describe("ExecutorManager", () => { await executorManager.destroy(); }); + sidequestTest("aborts a running job when its row no longer exists", async ({ backend, config }) => { + await backend.updateJob({ ...jobData, state: "claimed", claimed_at: new Date() }); + + const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 1 }); + const executorManager = new ExecutorManager(backend, config); + + // Simulate the row being deleted/truncated while the job runs: the watcher must still stop it. 
+ const getJobSpy = vi.spyOn(backend, "getJob").mockResolvedValue(undefined); + runMock.mockImplementationOnce( + (_job: JobData, signal: AbortSignal) => + new Promise((_, reject) => { + signal.addEventListener("abort", () => reject(new Error("The task has been aborted"))); + }), + ); + + await executorManager.execute(queryConfig, jobData); + + // eslint-disable-next-line @typescript-eslint/unbound-method + expect(JobTransitioner.apply).toHaveBeenCalledWith(backend, jobData, expect.any(CancelTransition)); + expect(executorManager.totalActiveWorkers()).toBe(0); + + await executorManager.destroy(); + getJobSpy.mockRestore(); + }); + sidequestTest("should abort job execution on timeout", async ({ backend, config }) => { jobData = await backend.updateJob({ ...jobData, state: "claimed", claimed_at: new Date(), timeout: 100 }); diff --git a/packages/engine/src/execution/executor-manager.ts b/packages/engine/src/execution/executor-manager.ts index b06bef68..68253d33 100644 --- a/packages/engine/src/execution/executor-manager.ts +++ b/packages/engine/src/execution/executor-manager.ts @@ -113,11 +113,14 @@ export class ExecutorManager { isRunning = true; const cancellationCheck = async () => { while (isRunning) { - // The row can be missing transiently or if it was deleted; treat that as "not canceled" - // rather than dereferencing undefined and crashing the polling loop. const watchedJob = await this.backend.getJob(job.id); - if (watchedJob?.state === "canceled") { - logger("Executor Manager").debug(`Aborting job ${job.id}: canceled`); + // Abort when the job was canceled, and also when its row no longer exists (deleted or + // truncated): the record is gone, so the run must be stopped rather than left to block + // shutdown forever. + if (!watchedJob || watchedJob.state === "canceled") { + logger("Executor Manager").debug( + `Aborting job ${job.id}: ${watchedJob ? 
"canceled" : "row no longer exists"}`, + ); controller.abort(new JobCanceled()); isRunning = false; return; From e669e1b7a88d3d5c5b01745dd3e06f4b3a943d28 Mon Sep 17 00:00:00 2001 From: Lucas Merencia <1686635+merencia@users.noreply.github.com> Date: Fri, 19 Jun 2026 17:32:55 -0300 Subject: [PATCH 07/10] docs: execution modes + cooperative timeout/cancellation (#186) * docs: document execution modes and cooperative timeout/cancellation Adds a new "Execution Modes" page covering the fork and runner options, the four fork/runner combinations and when to use each (serverless, tests, SQLite, framework integrations), abortGracePeriodMs, and the cooperative timeout/cancellation model via this.abortSignal. Includes disclaimers for the sharp edges: no-fork removes crash isolation; inline jobs block the event loop and cannot be force-stopped (so timeouts and cancellation only work if the job honors this.abortSignal); grace=0 gives no cooperative window in thread mode; canceling a running inline job is best-effort. Also documents this.abortSignal in the job control page, the new options in the configuration reference, and cross-links from how-it-works and the lifecycle page. 
* docs: drop references to the not-yet-released @sidequest/nestjs package --- packages/docs/.vitepress/config.mts | 1 + .../docs/getting-started/configuration.md | 58 ++--- packages/docs/guide/jobs/lifecycle.md | 6 +- packages/docs/guide/jobs/running.md | 63 ++++-- packages/docs/introduction/how-it-works.md | 4 + packages/docs/production/execution-modes.md | 200 ++++++++++++++++++ 6 files changed, 288 insertions(+), 44 deletions(-) create mode 100644 packages/docs/production/execution-modes.md diff --git a/packages/docs/.vitepress/config.mts b/packages/docs/.vitepress/config.mts index fdac912f..f341bd3b 100644 --- a/packages/docs/.vitepress/config.mts +++ b/packages/docs/.vitepress/config.mts @@ -99,6 +99,7 @@ export default defineConfig({ collapsed: false, items: [ { text: "Backends", link: "/backends" }, + { text: "Execution Modes", link: "/execution-modes" }, { text: "Graceful Shutdown", link: "/graceful-shutdown" }, { text: "Cleanup", link: "/cleanup" }, { text: "Manual Job Resolution", link: "/manual-resolution" }, diff --git a/packages/docs/getting-started/configuration.md b/packages/docs/getting-started/configuration.md index 8be70380..442b4f95 100644 --- a/packages/docs/getting-started/configuration.md +++ b/packages/docs/getting-started/configuration.md @@ -122,6 +122,9 @@ await Sidequest.start({ minThreads: 4, maxThreads: 8, idleWorkerTimeout: 10000, // 10 seconds + fork: true, // run the engine in a child process + runner: "thread", // "thread" (worker pool) or "inline" + abortGracePeriodMs: 0, // grace before force-killing a timed-out/canceled thread job // 4. 
Migration and startup skipMigration: false, @@ -179,32 +182,35 @@ await Sidequest.start({ ### Configuration Options -| Option | Description | Default | -| -------------------------------- | --------------------------------------------------------------------------------------------------------------------------------- | --------------------------- | -| `backend.driver` | Backend driver package name (SQLite, Postgres, MySQL, MongoDB) | `@sidequest/sqlite-backend` | -| `backend.config` | Backend-specific connection string or [Knex configuration object](https://knexjs.org/guide/#configuration-options) | `./sidequest.sqlite` | -| `dashboard.enabled` | Whether to enable the dashboard web interface | `true` | -| `dashboard.port` | Port for the dashboard web interface | `8678` | -| `dashboard.auth` | Basic auth configuration with `user` and `password`. If omitted, no auth is required. | `undefined` | -| `queues` | Array of queue configurations with name, concurrency, priority, and state | `[]` | -| `maxConcurrentJobs` | Maximum number of jobs processed simultaneously across all queues | `10` | -| `minThreads` | Minimum number of worker threads to use | Number of CPU cores | -| `maxThreads` | Maximum number of worker threads to use | `minThreads * 2` | -| `idleWorkerTimeout` | Timeout (milliseconds) for idle workers before they are terminated | `10000` (10 seconds) | -| `skipMigration` | Whether to skip database migration on startup | `false` | -| `releaseStaleJobsIntervalMin` | Frequency (minutes) for releasing stale jobs. Set to `false` to disable | `60` | -| `releaseStaleJobsMaxStaleMs` | Maximum age (milliseconds) for a running job to be considered stale | `600000` (10 minutes) | -| `releaseStaleJobsMaxClaimedMs` | Maximum age (milliseconds) for a claimed job to be considered stale | `60000` (1 minute) | -| `cleanupFinishedJobsIntervalMin` | Frequency (minutes) for cleaning up finished jobs. 
Set to `false` to disable | `60` | -| `cleanupFinishedJobsOlderThan` | Age (milliseconds) after which finished jobs are deleted | `2592000000` (30 days) | -| `logger.level` | Minimum log level (`debug`, `info`, `warn`, `error`) | `info` | -| `logger.json` | Whether to output logs in JSON format | `false` | -| `gracefulShutdown` | Whether to enable graceful shutdown handling | `true` | -| `jobDefaults` | Default values for new jobs. Used while enqueueing | `undefined` | -| `queueDefaults` | Default values for auto-created queues | `undefined` | -| `manualJobResolution` | Whether to manually resolve job classes. See [Manual Job Resolution](/production/manual-resolution) | `false` | -| `jobsFilePath` | Optional path to the file where job classes are exported. Ignored if `manualJobResolution` is `false`. | `undefined` | -| `jobPollingInterval` | Interval (milliseconds) for polling new jobs to process. Increase this number to reduce DB load at the cost of job start latency. | `100` (100 milliseconds) | +| Option | Description | Default | +| -------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | --------------------------- | +| `backend.driver` | Backend driver package name (SQLite, Postgres, MySQL, MongoDB) | `@sidequest/sqlite-backend` | +| `backend.config` | Backend-specific connection string or [Knex configuration object](https://knexjs.org/guide/#configuration-options) | `./sidequest.sqlite` | +| `dashboard.enabled` | Whether to enable the dashboard web interface | `true` | +| `dashboard.port` | Port for the dashboard web interface | `8678` | +| `dashboard.auth` | Basic auth configuration with `user` and `password`. If omitted, no auth is required. 
| `undefined` | +| `queues` | Array of queue configurations with name, concurrency, priority, and state | `[]` | +| `maxConcurrentJobs` | Maximum number of jobs processed simultaneously across all queues | `10` | +| `fork` | Run the engine in a child process (crash isolation). Set `false` to run in-process. See [Execution Modes](/production/execution-modes#fork-process-isolation) | `true` | +| `runner` | How jobs run: `"thread"` (worker pool) or `"inline"` (current thread). See [Execution Modes](/production/execution-modes#runner-thread-pool-vs-inline) | `"thread"` | +| `abortGracePeriodMs` | Grace period (ms) before a timed-out/canceled job's worker **thread** is force-killed. `0` kills immediately. No effect with `runner: "inline"`. See [Execution Modes](/production/execution-modes#cooperative-timeout-and-cancellation) | `0` | +| `minThreads` | Minimum number of worker threads to use (`runner: "thread"` only) | Number of CPU cores | +| `maxThreads` | Maximum number of worker threads to use (`runner: "thread"` only) | `minThreads * 2` | +| `idleWorkerTimeout` | Timeout (milliseconds) for idle workers before they are terminated (`runner: "thread"` only) | `10000` (10 seconds) | +| `skipMigration` | Whether to skip database migration on startup | `false` | +| `releaseStaleJobsIntervalMin` | Frequency (minutes) for releasing stale jobs. Set to `false` to disable | `60` | +| `releaseStaleJobsMaxStaleMs` | Maximum age (milliseconds) for a running job to be considered stale | `600000` (10 minutes) | +| `releaseStaleJobsMaxClaimedMs` | Maximum age (milliseconds) for a claimed job to be considered stale | `60000` (1 minute) | +| `cleanupFinishedJobsIntervalMin` | Frequency (minutes) for cleaning up finished jobs. 
Set to `false` to disable | `60` | +| `cleanupFinishedJobsOlderThan` | Age (milliseconds) after which finished jobs are deleted | `2592000000` (30 days) | +| `logger.level` | Minimum log level (`debug`, `info`, `warn`, `error`) | `info` | +| `logger.json` | Whether to output logs in JSON format | `false` | +| `gracefulShutdown` | Whether to enable graceful shutdown handling | `true` | +| `jobDefaults` | Default values for new jobs. Used while enqueueing | `undefined` | +| `queueDefaults` | Default values for auto-created queues | `undefined` | +| `manualJobResolution` | Whether to manually resolve job classes. See [Manual Job Resolution](/production/manual-resolution) | `false` | +| `jobsFilePath` | Optional path to the file where job classes are exported. Ignored if `manualJobResolution` is `false`. | `undefined` | +| `jobPollingInterval` | Interval (milliseconds) for polling new jobs to process. Increase this number to reduce DB load at the cost of job start latency. | `100` (100 milliseconds) | ::: danger If `auth` is not configured and `dashboard: true` is enabled in production, the dashboard will be publicly accessible. This is a security risk and **not recommended**. diff --git a/packages/docs/guide/jobs/lifecycle.md b/packages/docs/guide/jobs/lifecycle.md index 304d8544..53ab0d5e 100644 --- a/packages/docs/guide/jobs/lifecycle.md +++ b/packages/docs/guide/jobs/lifecycle.md @@ -67,7 +67,11 @@ Jobs can be manually canceled at any point before completion: - Waiting jobs are immediately marked as `canceled` - Claimed jobs are marked as `canceled` before execution an are prevented from running -- Running jobs receive a cancellation signal and transition to `canceled` +- Running jobs receive a cancellation signal via `this.abortSignal` and transition to `canceled` + +::: warning +Stopping a _running_ job depends on the [execution mode](/production/execution-modes#cooperative-timeout-and-cancellation). With the default thread pool the worker is terminated. 
In `runner: "inline"` mode the job cannot be force-stopped, so it must honor `this.abortSignal`; a running inline job that ignores it finishes with its own result instead of `canceled`. The same applies to job timeouts. +::: ## Best Practices diff --git a/packages/docs/guide/jobs/running.md b/packages/docs/guide/jobs/running.md index cf2e6c27..09a9fbe7 100644 --- a/packages/docs/guide/jobs/running.md +++ b/packages/docs/guide/jobs/running.md @@ -35,15 +35,16 @@ When you need finer control — fail without retrying, retry with a custom delay Before `run()` executes, Sidequest injects read-only properties onto `this`: -| Property | Type | Description | -| ------------------- | ----------- | -------------------------------- | -| `this.id` | `string` | Job ID | -| `this.attempt` | `number` | Current attempt number (1-based) | -| `this.max_attempts` | `number` | Maximum allowed attempts | -| `this.queue` | `string` | Queue the job is running in | -| `this.state` | `string` | Current state (`"running"`) | -| `this.inserted_at` | `Date` | When the job was first enqueued | -| `this.args` | `unknown[]` | The run arguments | +| Property | Type | Description | +| ------------------- | ------------- | --------------------------------------------------------------------------------------------------- | +| `this.id` | `string` | Job ID | +| `this.attempt` | `number` | Current attempt number (1-based) | +| `this.max_attempts` | `number` | Maximum allowed attempts | +| `this.queue` | `string` | Queue the job is running in | +| `this.state` | `string` | Current state (`"running"`) | +| `this.inserted_at` | `Date` | When the job was first enqueued | +| `this.args` | `unknown[]` | The run arguments | +| `this.abortSignal` | `AbortSignal` | Aborts when the job times out or is canceled. See [below](#responding-to-timeout-and-cancellation). | ::: warning These properties are only available inside `run()`. They are `undefined` in the constructor. 
@@ -57,9 +58,10 @@ These methods let you explicitly transition the job to a specific lifecycle stat You must **`return`** the result of every flow control method. Calling one without returning it is a no-op — the transition won't happen. ```typescript -this.fail("reason"); // ❌ does nothing +this.fail("reason"); // ❌ does nothing return this.fail("reason"); // ✅ transitions to failed ``` + ::: ### `return this.complete(result)` @@ -129,15 +131,42 @@ async run(payload: unknown) { Use `snooze` for time-based deferrals: rate limit windows, maintenance modes, business hours. +## Responding to timeout and cancellation + +When a job exceeds its `timeout`, or is canceled (via the dashboard or `Sidequest.job.cancel(id)`), Sidequest aborts `this.abortSignal`. Use it to stop your work promptly: + +```typescript +async run(url: string) { + // Pass it to any abort-aware API; it cancels automatically. + const res = await fetch(url, { signal: this.abortSignal }); + + // Or check it cooperatively in loops / between steps. + for (const item of await res.json()) { + this.abortSignal.throwIfAborted(); // throws if timed out / canceled + await process(item); + } +} +``` + +`this.abortSignal.reason` is a `JobTimeout` or `JobCanceled` (both exported from `sidequest`) so you can react differently to each. + +::: danger Whether the signal can actually stop the job depends on the execution mode + +- In the default thread pool the worker is terminated, so honoring the signal is optional. With `abortGracePeriodMs: 0` (the default) the worker is killed immediately and the job never observes the signal; set a positive grace period to give the job a cooperative window to clean up. +- In **`runner: "inline"` mode there is no way to forcibly stop a job.** If your job ignores `this.abortSignal`, timeouts and cancellation **will not stop it**: it runs to completion. Honoring the signal is mandatory for long-running inline jobs. + +See [Execution Modes](/production/execution-modes#cooperative-timeout-and-cancellation) for the full behavior across modes. 
+::: + ## Choosing the right method -| Situation | Use | -|---|---| -| Normal completion | `return result` or `return this.complete(result)` | -| Permanent, unrecoverable error | `return this.fail(reason)` | -| Transient error, controlled retry delay | `return this.retry(reason, delay)` | -| Not the right time — try again later | `return this.snooze(delay)` | -| Unexpected error — let Sidequest decide | `throw error` | +| Situation | Use | +| --------------------------------------- | ------------------------------------------------- | +| Normal completion | `return result` or `return this.complete(result)` | +| Permanent, unrecoverable error | `return this.fail(reason)` | +| Transient error, controlled retry delay | `return this.retry(reason, delay)` | +| Not the right time — try again later | `return this.snooze(delay)` | +| Unexpected error — let Sidequest decide | `throw error` | ## Best practices diff --git a/packages/docs/introduction/how-it-works.md b/packages/docs/introduction/how-it-works.md index bccf7c42..771b16c7 100644 --- a/packages/docs/introduction/how-it-works.md +++ b/packages/docs/introduction/how-it-works.md @@ -30,6 +30,10 @@ Your app process Because the engine is a separate process, a job that calls `process.exit()` or throws an unhandled exception will kill the engine process but **not your app**. The engine restarts automatically. +::: tip +This forked, worker-thread model is the default and the right choice for most deployments. For serverless runtimes, test suites, or framework integrations that need jobs to share live in-process state, you can run the engine in-process and/or run jobs inline. See [Execution Modes](/production/execution-modes). +::: + ## How jobs are claimed The Dispatcher polls the database at a configurable interval (default: **100 ms**). 
When it finds waiting jobs that fit within queue concurrency limits, it claims them atomically: diff --git a/packages/docs/production/execution-modes.md b/packages/docs/production/execution-modes.md new file mode 100644 index 00000000..98528d3e --- /dev/null +++ b/packages/docs/production/execution-modes.md @@ -0,0 +1,200 @@ +--- +outline: deep +title: Execution Modes +description: Choose how and where Sidequest runs your jobs (forked vs in-process, thread pool vs inline) and how cooperative timeout/cancellation works. +--- + +# Execution Modes + +By default Sidequest runs your jobs with **two layers of isolation**: the engine runs in a forked child process, and each job runs in its own worker thread inside that process (see [How It Works](/introduction/how-it-works)). This is the most robust setup and what you want in most deployments. + +Some environments and integrations need a different trade-off. Two independent options let you change where and how jobs run: + +- [`fork`](#fork-process-isolation): run the engine in a child process (default) or in your application's process. +- [`runner`](#runner-thread-pool-vs-inline): run each job in a worker thread pool (default) or inline in the current thread. + +They are orthogonal: `fork` controls the **process**, `runner` controls the **thread**. A related option, [`abortGracePeriodMs`](#cooperative-timeout-and-cancellation), controls how timeouts and cancellations stop a running job. + +::: tip TL;DR +Keep the defaults (`fork: true`, `runner: "thread"`) unless you have a concrete reason not to. Reach for `inline` + `fork: false` for serverless, test suites, or framework integrations that need jobs to share live in-process state. 
+::: + +## `fork`: process isolation + +```typescript +await Sidequest.start({ fork: false }); // default: true +``` + +| Value | Where the engine runs | Crash isolation | +| ---------------- | -------------------------- | -------------------------------------------------------------------------------------------------- | +| `true` (default) | A `child_process.fork` | A job crash (or `process.exit()`) kills the fork, not your app. The engine restarts automatically. | +| `false` | Your application's process | No isolation. An uncaught error in job code can take down your app. | + +Use `fork: false` when: + +- You can't spawn child processes (many **serverless / edge** runtimes). +- You're running an **integration test** and want to avoid IPC and process teardown flakiness. +- Your jobs need access to **live, in-process state** that can't cross a process boundary, for example a dependency-injection container. + +::: danger No crash isolation with `fork: false` +With the default `fork: true`, a job that throws an unhandled exception or calls `process.exit()` only takes down the engine fork, and Sidequest restarts it. With `fork: false`, the engine shares your application's process: **a misbehaving job can crash your whole app.** Only use it when you understand and accept that. +::: + +## `runner`: thread pool vs inline + +```typescript +await Sidequest.start({ runner: "inline" }); // default: "thread" +``` + +| Value | How a job runs | CPU isolation | Can be force-stopped? 
| +| -------------------- | ------------------------------------------------------------------ | ------------- | ------------------------------------- | +| `"thread"` (default) | In a [piscina](https://github.com/piscinajs/piscina) worker thread | Yes | Yes (the worker thread is terminated) | +| `"inline"` | Directly in the current thread, no pool | No | **No** | + +With `runner: "thread"`, `minThreads` / `maxThreads` / `idleWorkerTimeout` size the pool, and a job can be forcibly stopped by terminating its worker thread. + +With `runner: "inline"`, there is no pool and no separate thread. This is required when jobs must reach state that lives in the current thread, and it's handy for single-process setups. But it comes with two important consequences: + +::: warning Inline jobs block the event loop +An inline job runs on the same thread as everything else in that process: the dispatcher, and your app too if `fork: false`. A **CPU-bound** inline job will starve all of it until it finishes. Keep inline jobs I/O-bound, or use the thread pool for heavy work. +::: + +::: danger Inline jobs cannot be forcibly stopped +There is no separate thread to terminate, so Sidequest **cannot** kill a running inline job. Timeouts and cancellation only work if the job **cooperates** with the abort signal (see [Cooperative timeout and cancellation](#cooperative-timeout-and-cancellation) below). A job that ignores the signal runs to completion no matter what. +::: + +## Choosing a combination + +`fork` and `runner` combine into four setups: + +| `fork` | `runner` | Crash isolation | CPU isolation | Typical use | +| ------- | -------- | ---------------- | ------------- | ----------------------------------------------------------------------- | +| `true` | `thread` | ✅ | ✅ | **Default.** Production. | +| `true` | `inline` | ✅ (engine fork) | ❌ | Lighter execution with crash isolation kept; e.g. SQLite single-writer. 
| +| `false` | `thread` | ❌ | ✅ | Run in-process but still isolate CPU per job. | +| `false` | `inline` | ❌ | ❌ | Serverless, tests, and integrations that need live in-process state. | + +::: code-group + +```typescript [Serverless / single-process] +// No child process, no worker threads: everything in one place. +await Sidequest.start({ + fork: false, + runner: "inline", + backend: { driver: "@sidequest/postgres-backend", config: process.env.DATABASE_URL }, +}); +``` + +```typescript [SQLite] +// SQLite is single-writer; running jobs inline avoids cross-thread write contention. +await Sidequest.start({ + runner: "inline", + maxConcurrentJobs: 1, + backend: { driver: "@sidequest/sqlite-backend", config: "./jobs.sqlite" }, +}); +``` + +```typescript [Integration tests] +await Sidequest.start({ + fork: false, // no IPC to wait on + runner: "inline", // deterministic, in-process execution + backend: { driver: "@sidequest/sqlite-backend", config: ":memory:" }, +}); +``` + +::: + +::: warning SQLite and concurrency +SQLite allows a single writer. Concurrency above 1 against the same file leads to `SQLITE_BUSY`. Keep `maxConcurrentJobs: 1`, use a separate `.sqlite` file from your app, or use a server database (Postgres/MySQL) for real concurrency. This is independent of the execution mode. +::: + +## Cooperative timeout and cancellation + +A job is stopped early in two cases: it exceeds its `timeout`, or it is canceled (via the dashboard or `Sidequest.job.cancel(id)`). How that actually stops the job depends on the mode. + +Sidequest hands every job an [`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) at `this.abortSignal`. When a timeout or cancellation fires, that signal aborts. Your job can observe it and stop: + +```typescript +import { Job } from "sidequest"; + +export class SyncContactsJob extends Job { + async run(accountId: string) { + // 1. Hand the signal to anything that accepts one; it aborts automatically. 
+ const res = await fetch(`https://api.example.com/${accountId}/contacts`, { + signal: this.abortSignal, + }); + const contacts = await res.json(); + + // 2. For long loops or CPU work, check it cooperatively. + for (const contact of contacts) { + this.abortSignal.throwIfAborted(); // bail out promptly on timeout/cancel + await upsert(contact); + } + + return this.complete({ synced: contacts.length }); + } +} +``` + +`this.abortSignal.reason` tells you _why_ it aborted. It is a `JobTimeout` or a `JobCanceled`: + +```typescript +import { JobTimeout, JobCanceled } from "sidequest"; + +this.abortSignal.addEventListener("abort", () => { + const reason = this.abortSignal.reason; + if (reason instanceof JobTimeout) { + // exceeded `timeout` + } else if (reason instanceof JobCanceled) { + // canceled by an operator + } +}); +``` + +### When does the job actually receive the signal? + +| Mode | Gets a live `abortSignal`? | If the job ignores it | +| ----------------------------------------------------- | --------------------------------- | --------------------------------------------- | +| `runner: "inline"` | **Always** | Runs to completion (cannot be force-stopped). | +| `runner: "thread"`, `abortGracePeriodMs: 0` (default) | No (worker is killed immediately) | Killed right away. | +| `runner: "thread"`, `abortGracePeriodMs > 0` | Yes, for the grace window | Killed after the grace period. | + +::: danger Inline timeout/cancel only work if your job honors the signal +In `runner: "inline"` there is no way to forcibly stop a job. If your job does not pass `this.abortSignal` to its async work or check `this.abortSignal.aborted` / `throwIfAborted()`, then **timeouts and cancellation have no effect**: the job keeps running until it returns on its own. Treat `this.abortSignal` as mandatory for any long-running inline job. 
+::: + +### `abortGracePeriodMs`: graceful kill for thread jobs + +```typescript +await Sidequest.start({ abortGracePeriodMs: 5000 }); // default: 0 +``` + +Applies only to `runner: "thread"`. It controls the window between _signaling_ an abort and _forcibly terminating_ the worker thread: + +- `0` (default): the worker is terminated immediately. The job is not given a chance to react, and `this.abortSignal` is not delivered to it. This is the historical behavior. +- `> 0`: the abort is delivered to the job via `this.abortSignal` first; if the job has not finished after this many milliseconds, the worker thread is terminated. Use this to let thread jobs clean up (close handles, flush buffers) before being killed. + +::: tip +A positive grace period allocates a small message channel per job to deliver the abort into the worker. The cost only applies while a grace period is configured, and only matters for the rare cancel/timeout. Leave it at `0` unless your thread jobs need graceful shutdown. +::: + +### What state does the job end in? + +The terminal state is decided when the run **actually ends**, never while it is still running (so a job is never re-queued while a copy of it is still in flight): + +| What happened | Terminal state | +| -------------------------------------------------------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------- | +| The job returned a value/transition (it finished) | Whatever the job returned (`completed`, `failed`, a retry, etc.). This holds **even if** a timeout/cancel was signaled but the job finished anyway. | +| The worker was hard-killed by a **timeout** (thread, no result) | Retried (or `failed` if no attempts remain). | +| The worker was hard-killed by a **cancellation** (thread, no result) | `canceled`. | +| The job threw an unexpected error | Retried (or `failed`). 
| + +::: warning Canceling a running inline job is best-effort +Because an inline job's result is respected once it returns, a running inline job that **ignores** a cancellation and finishes will be recorded with its own result (e.g. `completed`), not `canceled`. Cancellation of a _running_ inline job only takes effect if the job honors `this.abortSignal`. Canceling a **waiting** job always works (it is simply never claimed). +::: + +## Next steps + +- [Execution and Control](/guide/jobs/running): using `this.abortSignal` inside `run()` +- [Configuration reference](/getting-started/configuration): all engine options +- [Graceful Shutdown](/production/graceful-shutdown): draining jobs on shutdown From 8470de52bf3b70fc040cec08e051567bfe6c95b4 Mon Sep 17 00:00:00 2001 From: Lucas Merencia <1686635+merencia@users.noreply.github.com> Date: Fri, 19 Jun 2026 18:53:14 -0300 Subject: [PATCH 08/10] fix: tolerate a deleted job row when recording the terminal state (#187) When a job's row is deleted while it runs (cleanup routine, explicit delete, or a test truncating the table), the final transition's updateJob throws "Cannot update job, not found." Since execute() is fire-and-forget, that surfaced as an unhandled rejection that could crash and restart the engine fork (which in turn made scheduled jobs re-fire). Apply the terminal transition through a helper that swallows and logs such failures (recording the state of a job that no longer exists is safe to skip), and guard the dispatcher's fire-and-forget execute() with a .catch so a single job can never crash the engine. 
--- packages/engine/src/execution/dispatcher.ts | 7 +++-- .../src/execution/executor-manager.test.ts | 24 ++++++++++++++++ .../engine/src/execution/executor-manager.ts | 28 +++++++++++++++++-- 3 files changed, 54 insertions(+), 5 deletions(-) diff --git a/packages/engine/src/execution/dispatcher.ts b/packages/engine/src/execution/dispatcher.ts index 493fe82c..eec75ba3 100644 --- a/packages/engine/src/execution/dispatcher.ts +++ b/packages/engine/src/execution/dispatcher.ts @@ -60,8 +60,11 @@ export class Dispatcher { // because the execution is not awaited. This way we ensure that available slots // are correctly calculated. this.executorManager.queueJob(queue, job); - // does not await for job execution. - void this.executorManager.execute(queue, job); + // does not await for job execution. Guard against any unexpected rejection so a single + // job can never crash the engine with an unhandled promise rejection. + void this.executorManager.execute(queue, job).catch((error: unknown) => { + logger("Dispatcher").error(`Unexpected error executing job ${job.id}:`, error); + }); } } diff --git a/packages/engine/src/execution/executor-manager.test.ts b/packages/engine/src/execution/executor-manager.test.ts index 70a03fb9..4c4d9092 100644 --- a/packages/engine/src/execution/executor-manager.test.ts +++ b/packages/engine/src/execution/executor-manager.test.ts @@ -261,6 +261,30 @@ describe("ExecutorManager", () => { getJobSpy.mockRestore(); }); + sidequestTest("does not crash when the final transition fails (job row gone)", async ({ backend, config }) => { + const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 1 }); + const executorManager = new ExecutorManager(backend, config); + + // RunTransition succeeds; the terminal transition fails because the row was deleted mid-run. 
+ // eslint-disable-next-line @typescript-eslint/unbound-method + vi.mocked(JobTransitioner.apply) + .mockImplementationOnce((_backend: Backend, job: JobData) => job) + .mockImplementationOnce(() => { + throw new Error("Cannot update job, not found."); + }); + runMock.mockResolvedValue({ + __is_job_transition__: true, + type: "completed", + result: "ok", + } satisfies CompletedResult); + + // The fire-and-forget executor must not reject, and must free the job from the active set. + await expect(executorManager.execute(queryConfig, jobData)).resolves.toBeUndefined(); + expect(executorManager.totalActiveWorkers()).toBe(0); + + await executorManager.destroy(); + }); + sidequestTest("should abort job execution on timeout", async ({ backend, config }) => { jobData = await backend.updateJob({ ...jobData, state: "claimed", claimed_at: new Date(), timeout: 100 }); diff --git a/packages/engine/src/execution/executor-manager.ts b/packages/engine/src/execution/executor-manager.ts index 68253d33..89b7f255 100644 --- a/packages/engine/src/execution/executor-manager.ts +++ b/packages/engine/src/execution/executor-manager.ts @@ -4,6 +4,7 @@ import { JobCanceled, JobData, JobTimeout, + JobTransition, JobTransitionFactory, logger, QueueConfig, @@ -151,7 +152,7 @@ export class ExecutorManager { // The job ran to a conclusion and returned a state (even if a timeout/cancel was signaled); // respect it and transition accordingly. 
logger("Executor Manager").debug(`Job ${job.id} settled with result: ${inspect(result)}`); - await JobTransitioner.apply(this.backend, job, JobTransitionFactory.create(result)); + await this.applyTerminalTransition(job, JobTransitionFactory.create(result)); } catch (error: unknown) { isRunning = false; const err = error as Error; @@ -162,10 +163,10 @@ export class ExecutorManager { const reason: unknown = controller.signal.reason; logger("Executor Manager").debug(`Job ${job.id} was hard-killed (${String(reason)}): ${err.message}`); const transition = reason instanceof JobTimeout ? new RetryTransition(reason) : new CancelTransition(); - await JobTransitioner.apply(this.backend, job, transition); + await this.applyTerminalTransition(job, transition); } else { logger("Executor Manager").error(`Unhandled error while executing job ${job.id}: ${err.message}`); - await JobTransitioner.apply(this.backend, job, new RetryTransition(err)); + await this.applyTerminalTransition(job, new RetryTransition(err)); } } finally { isRunning = false; @@ -177,6 +178,27 @@ export class ExecutorManager { } } + /** + * Applies a job's final transition, tolerating the job row having disappeared. + * + * A job's row can be deleted while it runs (cleanup routine, an explicit delete, or a test + * truncating the table). Recording its terminal state is then impossible and safe to skip. This + * must never throw: `execute` is fire-and-forget, so an error here would surface as an unhandled + * rejection. + * + * @param job The job being finalized. + * @param transition The terminal transition to apply. + */ + private async applyTerminalTransition(job: JobData, transition: JobTransition): Promise<void> { + try { + await JobTransitioner.apply(this.backend, job, transition); + } catch (error) { + logger("Executor Manager").warn( + `Could not record terminal state for job ${job.id} (it may no longer exist): ${error instanceof Error ? 
error.message : String(error)}`, + ); + } + } + /** * Destroys the runner pool and releases resources. */ From 464449fdbb193872b526970fd2deca77df57e37a Mon Sep 17 00:00:00 2001 From: Lucas Merencia <1686635+merencia@users.noreply.github.com> Date: Sat, 20 Jun 2026 18:09:19 -0300 Subject: [PATCH 09/10] fix: address PR #180 review feedback (#188) - executor: stop treating a missing job row as a cancellation in the watcher. Jobs are not deleted in normal operation; the integration test's mid-run truncate is handled by the resilient terminal transition instead. The test's long job is shortened so it cannot outlive teardown. - executor: on a hard-kill, default to a retry and only map to canceled when the abort reason is clearly a JobCanceled (failsafe for unknown abort reasons). - runner-pool: if the signal is already aborted on entry, reject without submitting the job to the pool instead of running it anyway. - docs: remove the duplicated crash-isolation and inline-abort danger callouts from the execution-modes page (the tables already convey them). --- packages/docs/production/execution-modes.md | 8 +----- .../src/execution/executor-manager.test.ts | 25 ------------------- .../engine/src/execution/executor-manager.ts | 20 +++++++-------- .../src/shared-runner/runner-pool.test.ts | 8 ++++++ .../engine/src/shared-runner/runner-pool.ts | 12 +++++---- tests/integration/shared-test-suite.js | 4 ++- 6 files changed, 28 insertions(+), 49 deletions(-) diff --git a/packages/docs/production/execution-modes.md b/packages/docs/production/execution-modes.md index 98528d3e..5216ce97 100644 --- a/packages/docs/production/execution-modes.md +++ b/packages/docs/production/execution-modes.md @@ -36,10 +36,6 @@ Use `fork: false` when: - You're running an **integration test** and want to avoid IPC and process teardown flakiness. - Your jobs need access to **live, in-process state** that can't cross a process boundary, for example a dependency-injection container. 
-::: danger No crash isolation with `fork: false` -With the default `fork: true`, a job that throws an unhandled exception or calls `process.exit()` only takes down the engine fork, and Sidequest restarts it. With `fork: false`, the engine shares your application's process: **a misbehaving job can crash your whole app.** Only use it when you understand and accept that. -::: - ## `runner`: thread pool vs inline ```typescript @@ -159,9 +155,7 @@ this.abortSignal.addEventListener("abort", () => { | `runner: "thread"`, `abortGracePeriodMs: 0` (default) | No (worker is killed immediately) | Killed right away. | | `runner: "thread"`, `abortGracePeriodMs > 0` | Yes, for the grace window | Killed after the grace period. | -::: danger Inline timeout/cancel only work if your job honors the signal -In `runner: "inline"` there is no way to forcibly stop a job. If your job does not pass `this.abortSignal` to its async work or check `this.abortSignal.aborted` / `throwIfAborted()`, then **timeouts and cancellation have no effect**: the job keeps running until it returns on its own. Treat `this.abortSignal` as mandatory for any long-running inline job. -::: +So in inline mode `this.abortSignal` is effectively mandatory for any long-running job: a job that does not honor it keeps running until it returns on its own (timeouts and cancellation cannot stop it). 
### `abortGracePeriodMs`: graceful kill for thread jobs diff --git a/packages/engine/src/execution/executor-manager.test.ts b/packages/engine/src/execution/executor-manager.test.ts index 4c4d9092..850bfb78 100644 --- a/packages/engine/src/execution/executor-manager.test.ts +++ b/packages/engine/src/execution/executor-manager.test.ts @@ -236,31 +236,6 @@ describe("ExecutorManager", () => { await executorManager.destroy(); }); - sidequestTest("aborts a running job when its row no longer exists", async ({ backend, config }) => { - await backend.updateJob({ ...jobData, state: "claimed", claimed_at: new Date() }); - - const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 1 }); - const executorManager = new ExecutorManager(backend, config); - - // Simulate the row being deleted/truncated while the job runs: the watcher must still stop it. - const getJobSpy = vi.spyOn(backend, "getJob").mockResolvedValue(undefined); - runMock.mockImplementationOnce( - (_job: JobData, signal: AbortSignal) => - new Promise((_, reject) => { - signal.addEventListener("abort", () => reject(new Error("The task has been aborted"))); - }), - ); - - await executorManager.execute(queryConfig, jobData); - - // eslint-disable-next-line @typescript-eslint/unbound-method - expect(JobTransitioner.apply).toHaveBeenCalledWith(backend, jobData, expect.any(CancelTransition)); - expect(executorManager.totalActiveWorkers()).toBe(0); - - await executorManager.destroy(); - getJobSpy.mockRestore(); - }); - sidequestTest("does not crash when the final transition fails (job row gone)", async ({ backend, config }) => { const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 1 }); const executorManager = new ExecutorManager(backend, config); diff --git a/packages/engine/src/execution/executor-manager.ts b/packages/engine/src/execution/executor-manager.ts index 89b7f255..1468ba36 100644 --- a/packages/engine/src/execution/executor-manager.ts +++ 
b/packages/engine/src/execution/executor-manager.ts @@ -115,13 +115,8 @@ export class ExecutorManager { const cancellationCheck = async () => { while (isRunning) { const watchedJob = await this.backend.getJob(job.id); - // Abort when the job was canceled, and also when its row no longer exists (deleted or - // truncated): the record is gone, so the run must be stopped rather than left to block - // shutdown forever. - if (!watchedJob || watchedJob.state === "canceled") { - logger("Executor Manager").debug( - `Aborting job ${job.id}: ${watchedJob ? "canceled" : "row no longer exists"}`, - ); + if (watchedJob?.state === "canceled") { + logger("Executor Manager").debug(`Aborting job ${job.id}: canceled`); controller.abort(new JobCanceled()); isRunning = false; return; @@ -157,12 +152,15 @@ export class ExecutorManager { isRunning = false; const err = error as Error; if (controller.signal.aborted) { - // The run produced no result because the worker was hard-killed (thread). The abort reason - // decides the terminal state: a timeout becomes a retry, anything else (cancellation) becomes - // canceled. The rejection is logged so a real error during the abort is not lost. + // The run produced no result because the worker was hard-killed (thread). Only a clear + // cancellation maps to canceled; every other abort reason (timeout, or anything else) defaults + // to a retry as a failsafe. The rejection is logged so a real error during the abort is kept. const reason: unknown = controller.signal.reason; logger("Executor Manager").debug(`Job ${job.id} was hard-killed (${String(reason)}): ${err.message}`); - const transition = reason instanceof JobTimeout ? new RetryTransition(reason) : new CancelTransition(); + const transition = + reason instanceof JobCanceled + ? new CancelTransition() + : new RetryTransition(reason instanceof Error ? 
reason : new Error(`Job aborted: ${String(reason)}`)); await this.applyTerminalTransition(job, transition); } else { logger("Executor Manager").error(`Unhandled error while executing job ${job.id}: ${err.message}`); diff --git a/packages/engine/src/shared-runner/runner-pool.test.ts b/packages/engine/src/shared-runner/runner-pool.test.ts index 48f21275..5059c9b4 100644 --- a/packages/engine/src/shared-runner/runner-pool.test.ts +++ b/packages/engine/src/shared-runner/runner-pool.test.ts @@ -47,6 +47,14 @@ describe("RunnerPool", () => { expect(piscinaMockInstance.run).toHaveBeenCalledWith({ jobData, config }, { signal }); }); + sidequestTest("rejects without running the job when the signal is already aborted", async () => { + const controller = new AbortController(); + controller.abort(new Error("already gone")); + + await expect(pool.run(jobData, controller.signal)).rejects.toThrow("already gone"); + expect(piscinaMockInstance.run).not.toHaveBeenCalled(); + }); + sidequestTest( "with a grace period, delivers the abort over a port and hard-kills after the grace", async ({ config }) => { diff --git a/packages/engine/src/shared-runner/runner-pool.ts b/packages/engine/src/shared-runner/runner-pool.ts index 244c37dd..1ed8b20b 100644 --- a/packages/engine/src/shared-runner/runner-pool.ts +++ b/packages/engine/src/shared-runner/runner-pool.ts @@ -42,6 +42,12 @@ export class RunnerPool implements JobRunner { */ run(job: JobData, signal?: AbortSignal): Promise { logger("RunnerPool").debug(`Running job ${job.id} in pool`); + + // Already aborted before we could start (e.g. canceled between claim and dispatch): don't run it. + if (signal?.aborted) { + return Promise.reject(signal.reason instanceof Error ? 
signal.reason : new Error("Job aborted before execution")); + } + const grace = this.nonNullConfig.abortGracePeriodMs; if (!signal || grace <= 0) { @@ -59,11 +65,7 @@ export class RunnerPool implements JobRunner { graceTimer = setTimeout(() => hardKill.abort(), grace); }; - if (signal.aborted) { - onAbort(); - } else { - signal.addEventListener("abort", onAbort, { once: true }); - } + signal.addEventListener("abort", onAbort, { once: true }); return this.pool .run( diff --git a/tests/integration/shared-test-suite.js b/tests/integration/shared-test-suite.js index 125b6ae6..531d98c5 100644 --- a/tests/integration/shared-test-suite.js +++ b/tests/integration/shared-test-suite.js @@ -310,7 +310,9 @@ export function createIntegrationTestSuite(Sidequest, jobs, moduleType = "ESM") queues: [{ name: "default" }], }); - const jobData = await Sidequest.build(TimeoutJob).enqueue(1000000); + // A few seconds: long enough to reliably cancel while running, short enough that it cannot + // outlive the suite's teardown (which truncates the table) if the cancel watcher is mid-poll. + const jobData = await Sidequest.build(TimeoutJob).enqueue(3000); await vi.waitUntil(() => Sidequest.job.get(jobData.id).then((job) => job?.state === "running"), 5000); // Cancel the job while it's running From 0d609bbcc079350f00cbe791b8caf7e50963d90b Mon Sep 17 00:00:00 2001 From: merencia Date: Sat, 20 Jun 2026 18:28:26 -0300 Subject: [PATCH 10/10] docs: document new run() params and JSDoc convention --- CLAUDE.md | 1 + packages/engine/src/shared-runner/runner.ts | 8 +++++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/CLAUDE.md b/CLAUDE.md index 57e89cb9..771beb38 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -116,6 +116,7 @@ These are the things that aren't visually loud in either code or docs: - **Don't wrap things "just in case."** No backwards-compat shims, no feature flags for hypothetical use cases, no validation at internal boundaries. 
- **Match existing structure.** New engine concerns go under `packages/engine/src/`; cross-cutting types belong in `@sidequest/core`. Don't create a new package for a small piece. - **Tests live next to the code.** `foo.ts` + `foo.test.ts` in the same folder; integration tests under `tests/integration/`. Backend changes must keep `@sidequest/backend-test` green for every driver. +- **JSDoc every exported entity.** `CONTRIBUTING.md` requires JSDoc-style docstrings on all exports; match that when adding public API. - **Commits follow Conventional Commits** (commitlint + Husky enforce it). semantic-release publishes from `master`. ## Scope guard diff --git a/packages/engine/src/shared-runner/runner.ts b/packages/engine/src/shared-runner/runner.ts index 41c5d0b3..65219224 100644 --- a/packages/engine/src/shared-runner/runner.ts +++ b/packages/engine/src/shared-runner/runner.ts @@ -33,8 +33,14 @@ function signalFromAbortPort(port: MessagePort): AbortSignal { /** * Runs a job by dynamically importing its script and executing the specified class. - * @param jobData The job data containing script and class information + * @param jobData The job data containing script and class information. * @param config The non-nullable engine configuration. + * @param inline Whether the job runs inline in the host process. When true, the Sidequest config is + * not re-injected (the host process is already configured). + * @param signal Abort signal handed to the job as `this.abortSignal` (used by the inline runner, + * which executes in the same process). + * @param abortPort Port the thread runner uses to receive the abort cooperatively across the worker + * boundary; it is turned into the job's `this.abortSignal`. Mutually exclusive with `signal`. * @returns A promise resolving to the job result. */ export default async function run({