Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions packages/engine/src/execution/dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,11 @@ export class Dispatcher {
// because the execution is not awaited. This way we ensure that available slots
// are correctly calculated.
this.executorManager.queueJob(queue, job);
// does not await for job execution.
void this.executorManager.execute(queue, job);
// does not await for job execution. Guard against any unexpected rejection so a single
// job can never crash the engine with an unhandled promise rejection.
void this.executorManager.execute(queue, job).catch((error: unknown) => {
logger("Dispatcher").error(`Unexpected error executing job ${job.id}:`, error);
});
}
}

Expand Down
24 changes: 24 additions & 0 deletions packages/engine/src/execution/executor-manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,30 @@ describe("ExecutorManager", () => {
getJobSpy.mockRestore();
});

sidequestTest("does not crash when the final transition fails (job row gone)", async ({ backend, config }) => {
  const queueConfig = await grantQueueConfig(backend, { name: "default", concurrency: 1 });
  const manager = new ExecutorManager(backend, config);

  // First call to the transitioner hands the job straight back; the second call throws,
  // standing in for a backend update against a row that no longer exists.
  // eslint-disable-next-line @typescript-eslint/unbound-method
  const applyMock = vi.mocked(JobTransitioner.apply);
  applyMock.mockImplementationOnce((_backend: Backend, job: JobData) => job);
  applyMock.mockImplementationOnce(() => {
    throw new Error("Cannot update job, not found.");
  });

  // The runner itself reports a successful completion.
  runMock.mockResolvedValue({
    __is_job_transition__: true,
    type: "completed",
    result: "ok",
  } satisfies CompletedResult);

  // execute() has to resolve (never reject) even though the last transition threw,
  // and afterwards no worker may still be counted as active.
  await expect(manager.execute(queueConfig, jobData)).resolves.toBeUndefined();
  expect(manager.totalActiveWorkers()).toBe(0);

  await manager.destroy();
});

sidequestTest("should abort job execution on timeout", async ({ backend, config }) => {
jobData = await backend.updateJob({ ...jobData, state: "claimed", claimed_at: new Date(), timeout: 100 });

Expand Down
28 changes: 25 additions & 3 deletions packages/engine/src/execution/executor-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
JobCanceled,
JobData,
JobTimeout,
JobTransition,
JobTransitionFactory,
logger,
QueueConfig,
Expand Down Expand Up @@ -151,7 +152,7 @@ export class ExecutorManager {
// The job ran to a conclusion and returned a state (even if a timeout/cancel was signaled);
// respect it and transition accordingly.
logger("Executor Manager").debug(`Job ${job.id} settled with result: ${inspect(result)}`);
await JobTransitioner.apply(this.backend, job, JobTransitionFactory.create(result));
await this.applyTerminalTransition(job, JobTransitionFactory.create(result));
} catch (error: unknown) {
isRunning = false;
const err = error as Error;
Expand All @@ -162,10 +163,10 @@ export class ExecutorManager {
const reason: unknown = controller.signal.reason;
logger("Executor Manager").debug(`Job ${job.id} was hard-killed (${String(reason)}): ${err.message}`);
const transition = reason instanceof JobTimeout ? new RetryTransition(reason) : new CancelTransition();
await JobTransitioner.apply(this.backend, job, transition);
await this.applyTerminalTransition(job, transition);
} else {
logger("Executor Manager").error(`Unhandled error while executing job ${job.id}: ${err.message}`);
await JobTransitioner.apply(this.backend, job, new RetryTransition(err));
await this.applyTerminalTransition(job, new RetryTransition(err));
}
} finally {
isRunning = false;
Expand All @@ -177,6 +178,27 @@ export class ExecutorManager {
}
}

/**
 * Best-effort recording of a job's terminal state.
 *
 * Delegates to `JobTransitioner.apply` and, if that call throws, logs a warning instead of
 * propagating the error. The job row may have been removed while the job was running (cleanup
 * routine, explicit delete, or a test truncating the table), in which case there is nothing left
 * to update and skipping is safe. Because `execute` is started without being awaited, letting an
 * error escape from here would show up as an unhandled promise rejection.
 *
 * @param job The job whose final state is being recorded.
 * @param transition The terminal transition to apply to the job.
 * @returns A promise that settles once the attempt has finished, whether or not it succeeded.
 */
private async applyTerminalTransition(job: JobData, transition: JobTransition): Promise<void> {
  try {
    await JobTransitioner.apply(this.backend, job, transition);
  } catch (failure: unknown) {
    // Thrown values are not guaranteed to be Error instances; fall back to their string form.
    const detail = failure instanceof Error ? failure.message : String(failure);
    logger("Executor Manager").warn(
      `Could not record terminal state for job ${job.id} (it may no longer exist): ${detail}`,
    );
  }
}

/**
* Destroys the runner pool and releases resources.
*/
Expand Down