From 313d0dc6982c9a96deb972699875392b46d372db Mon Sep 17 00:00:00 2001 From: merencia Date: Fri, 19 Jun 2026 17:46:54 -0300 Subject: [PATCH] fix: tolerate a deleted job row when recording the terminal state When a job's row is deleted while it runs (cleanup routine, explicit delete, or a test truncating the table), the final transition's updateJob throws "Cannot update job, not found." Since execute() is fire-and-forget, that surfaced as an unhandled rejection that could crash and restart the engine fork (which in turn made scheduled jobs re-fire). Apply the terminal transition through a helper that swallows and logs such failures (recording the state of a job that no longer exists is safe to skip), and guard the dispatcher's fire-and-forget execute() with a .catch so a single job can never crash the engine. --- packages/engine/src/execution/dispatcher.ts | 7 +++-- .../src/execution/executor-manager.test.ts | 24 ++++++++++++++++ .../engine/src/execution/executor-manager.ts | 28 +++++++++++++++++-- 3 files changed, 54 insertions(+), 5 deletions(-) diff --git a/packages/engine/src/execution/dispatcher.ts b/packages/engine/src/execution/dispatcher.ts index 493fe82c..eec75ba3 100644 --- a/packages/engine/src/execution/dispatcher.ts +++ b/packages/engine/src/execution/dispatcher.ts @@ -60,8 +60,11 @@ export class Dispatcher { // because the execution is not awaited. This way we ensure that available slots // are correctly calculated. this.executorManager.queueJob(queue, job); - // does not await for job execution. - void this.executorManager.execute(queue, job); + // does not await for job execution. Guard against any unexpected rejection so a single + // job can never crash the engine with an unhandled promise rejection. + void this.executorManager.execute(queue, job).catch((error: unknown) => { + logger("Dispatcher").error(`Unexpected error executing job ${job.id}:`, error); + }); } } diff --git a/packages/engine/src/execution/executor-manager.test.ts b/packages/engine/src/execution/executor-manager.test.ts index 70a03fb9..4c4d9092 100644 --- a/packages/engine/src/execution/executor-manager.test.ts +++ b/packages/engine/src/execution/executor-manager.test.ts @@ -261,6 +261,30 @@ describe("ExecutorManager", () => { getJobSpy.mockRestore(); }); + sidequestTest("does not crash when the final transition fails (job row gone)", async ({ backend, config }) => { + const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 1 }); + const executorManager = new ExecutorManager(backend, config); + + // RunTransition succeeds; the terminal transition fails because the row was deleted mid-run. + // eslint-disable-next-line @typescript-eslint/unbound-method + vi.mocked(JobTransitioner.apply) + .mockImplementationOnce((_backend: Backend, job: JobData) => job) + .mockImplementationOnce(() => { + throw new Error("Cannot update job, not found."); + }); + runMock.mockResolvedValue({ + __is_job_transition__: true, + type: "completed", + result: "ok", + } satisfies CompletedResult); + + // The fire-and-forget executor must not reject, and must free the job from the active set. + await expect(executorManager.execute(queryConfig, jobData)).resolves.toBeUndefined(); + expect(executorManager.totalActiveWorkers()).toBe(0); + + await executorManager.destroy(); + }); + sidequestTest("should abort job execution on timeout", async ({ backend, config }) => { jobData = await backend.updateJob({ ...jobData, state: "claimed", claimed_at: new Date(), timeout: 100 }); diff --git a/packages/engine/src/execution/executor-manager.ts b/packages/engine/src/execution/executor-manager.ts index 68253d33..89b7f255 100644 --- a/packages/engine/src/execution/executor-manager.ts +++ b/packages/engine/src/execution/executor-manager.ts @@ -4,6 +4,7 @@ import { JobCanceled, JobData, JobTimeout, + JobTransition, JobTransitionFactory, logger, QueueConfig, @@ -151,7 +152,7 @@ export class ExecutorManager { // The job ran to a conclusion and returned a state (even if a timeout/cancel was signaled); // respect it and transition accordingly. logger("Executor Manager").debug(`Job ${job.id} settled with result: ${inspect(result)}`); - await JobTransitioner.apply(this.backend, job, JobTransitionFactory.create(result)); + await this.applyTerminalTransition(job, JobTransitionFactory.create(result)); } catch (error: unknown) { isRunning = false; const err = error as Error; @@ -162,10 +163,10 @@ export class ExecutorManager { const reason: unknown = controller.signal.reason; logger("Executor Manager").debug(`Job ${job.id} was hard-killed (${String(reason)}): ${err.message}`); const transition = reason instanceof JobTimeout ? new RetryTransition(reason) : new CancelTransition(); - await JobTransitioner.apply(this.backend, job, transition); + await this.applyTerminalTransition(job, transition); } else { logger("Executor Manager").error(`Unhandled error while executing job ${job.id}: ${err.message}`); - await JobTransitioner.apply(this.backend, job, new RetryTransition(err)); + await this.applyTerminalTransition(job, new RetryTransition(err)); } } finally { isRunning = false; @@ -177,6 +178,27 @@ export class ExecutorManager { } } + /** + * Applies a job's final transition, tolerating the job row having disappeared. + * + * A job's row can be deleted while it runs (cleanup routine, an explicit delete, or a test + * truncating the table). Recording its terminal state is then impossible and safe to skip. This + * must never throw: `execute` is fire-and-forget, so an error here would surface as an unhandled + * rejection. + * + * @param job The job being finalized. + * @param transition The terminal transition to apply. + */ + private async applyTerminalTransition(job: JobData, transition: JobTransition): Promise { + try { + await JobTransitioner.apply(this.backend, job, transition); + } catch (error) { + logger("Executor Manager").warn( + `Could not record terminal state for job ${job.id} (it may no longer exist): ${error instanceof Error ? error.message : String(error)}`, + ); + } + } + /** * Destroys the runner pool and releases resources. */