From 4bc8658639a83923a808c0fc793633602e415115 Mon Sep 17 00:00:00 2001 From: Ian Macartney <366683+ianmacartney@users.noreply.github.com> Date: Wed, 18 Feb 2026 20:09:21 -0800 Subject: [PATCH 1/5] allow resuming --- src/client/index.ts | 35 +++++++ src/component/_generated/component.ts | 9 +- src/component/workflow.ts | 144 ++++++++++++++++++++++---- 3 files changed, 168 insertions(+), 20 deletions(-) diff --git a/src/client/index.ts b/src/client/index.ts index f957167d..7f75f514 100644 --- a/src/client/index.ts +++ b/src/client/index.ts @@ -207,6 +207,41 @@ export class WorkflowManager { } } + /** + * Retry a previously-failed workflow, optionally from a specific step. + * + * @param ctx - The Convex context. + * @param workflowId - The workflow ID. + * @param options - Options for the retry. + * @param options.from - The step to retry from. Can be a step number, + * a step name, or a FunctionReference (uses the function name). + * Steps from this point onwards will be deleted and re-executed. + * @param options.startAsync - If true, the workflow will be enqueued + * via the workpool instead of running immediately. + */ + async retry( + ctx: RunMutationCtx, + workflowId: WorkflowId, + options?: { + from?: number | string | FunctionReference; + startAsync?: boolean; + }, + ): Promise { + let from: number | string | undefined; + if (options?.from !== undefined) { + if (typeof options.from === "number" || typeof options.from === "string") { + from = options.from; + } else { + from = safeFunctionName(options.from); + } + } + await ctx.runMutation(this.component.workflow.retry, { + workflowId, + from, + startAsync: options?.startAsync, + }); + } + /** * Cancel a running workflow. 
* diff --git a/src/component/_generated/component.ts b/src/component/_generated/component.ts index 83ad1ccf..fde004ea 100644 --- a/src/component/_generated/component.ts +++ b/src/component/_generated/component.ts @@ -272,7 +272,7 @@ export type ComponentApi = cleanup: FunctionReference< "mutation", "internal", - { workflowId: string }, + { force?: boolean; workflowId: string }, boolean, Name >; @@ -486,5 +486,12 @@ export type ComponentApi = }, Name >; + retry: FunctionReference< + "mutation", + "internal", + { from?: number | string; startAsync?: boolean; workflowId: string }, + null, + Name + >; }; }; diff --git a/src/component/workflow.ts b/src/component/workflow.ts index 9ff8901c..93dca218 100644 --- a/src/component/workflow.ts +++ b/src/component/workflow.ts @@ -217,6 +217,107 @@ export const listSteps = query({ }, }); +const retryArgs = v.object({ + workflowId: v.id("workflows"), + from: v.optional(v.union(v.number(), v.string())), + startAsync: v.optional(v.boolean()), +}); + +export const retry = mutation({ + args: retryArgs, + returns: v.null(), + handler: retryHandler, +}); + +export async function retryHandler( + ctx: MutationCtx, + args: Infer, +) { + const workflow = await ctx.db.get(args.workflowId); + assert(workflow, `Workflow not found: ${args.workflowId}`); + const console = await getDefaultLogger(ctx); + + if (!workflow.runResult) { + throw new Error(`Workflow is still running: ${args.workflowId}`); + } + + // Delete steps from the specified point + if (args.from !== undefined) { + if (typeof args.from === "number") { + const stepsToDelete = await ctx.db + .query("steps") + .withIndex("workflow", (q) => + q + .eq("workflowId", args.workflowId) + .gte("stepNumber", args.from as number), + ) + .collect(); + if (stepsToDelete.length === 0) { + throw new Error( + `Step number ${args.from} not found in workflow ${args.workflowId}`, + ); + } + await deleteSteps(ctx, stepsToDelete); + } else { + // Walk backwards to find step by name, collecting steps 
to delete + const stepsDesc = ctx.db + .query("steps") + .withIndex("workflow", (q) => q.eq("workflowId", args.workflowId)) + .order("desc"); + let found = false; + const toDelete: Doc<"steps">[] = []; + for await (const step of stepsDesc) { + toDelete.push(step); + if (step.step.name === args.from) { + found = true; + break; + } + } + if (!found) { + throw new Error( + `Step "${args.from}" not found in workflow ${args.workflowId}`, + ); + } + await deleteSteps(ctx, toDelete); + } + } + + // Increment generation number and clear result + const generationNumber = workflow.generationNumber + 1; + await ctx.db.patch(args.workflowId, { + generationNumber, + runResult: undefined, + }); + + console.event("retry", { + workflowId: args.workflowId, + name: workflow.name, + from: args.from, + }); + + if (args.startAsync) { + const workpool = await getWorkpool(ctx, {}); + await workpool.enqueueMutation( + ctx, + workflow.workflowHandle as FunctionHandle<"mutation">, + { workflowId: args.workflowId, generationNumber }, + { + name: workflow.name, + onComplete: internal.pool.handlerOnComplete, + context: { workflowId: args.workflowId, generationNumber }, + }, + ); + } else { + await ctx.runMutation( + workflow.workflowHandle as FunctionHandle<"mutation">, + { + workflowId: args.workflowId, + generationNumber, + }, + ); + } +} + export const cancel = mutation({ args: { workflowId: v.id("workflows"), @@ -325,6 +426,7 @@ export async function completeHandler( export const cleanup = mutation({ args: { workflowId: v.string(), + force: v.optional(v.boolean()), }, returns: v.boolean(), handler: async (ctx, args) => { @@ -339,10 +441,13 @@ export const cleanup = mutation({ const logger = await getDefaultLogger(ctx); // TODO: allow cleaning up a workflow from inside it / in the onComplete hook if (!workflow.runResult) { - logger.debug( - `Can't clean up workflow ${workflowId} since it hasn't completed.`, - ); - return false; + if (!args.force) { + logger.debug( + `Can't clean up workflow 
${workflowId} since it hasn't completed.`, + ); + return false; + } + logger.debug(`Workflow ${workflowId} is not completed, forcing anyways`); } logger.debug(`Cleaning up workflow ${workflowId}`, workflow); await ctx.db.delete(workflowId); @@ -350,21 +455,7 @@ export const cleanup = mutation({ .query("steps") .withIndex("workflow", (q) => q.eq("workflowId", workflowId)) .collect(); - for (const journalEntry of journalEntries) { - logger.debug("Deleting journal entry", journalEntry); - await ctx.db.delete(journalEntry._id); - if (journalEntry.step.kind === "event" && journalEntry.step.eventId) { - await ctx.db.delete(journalEntry.step.eventId); - } else if ( - journalEntry.step.kind === "workflow" && - journalEntry.step.workflowId - ) { - const workpool = await getWorkpool(ctx, {}); - await workpool.enqueueMutation(ctx, api.workflow.cleanup, { - workflowId: journalEntry.step.workflowId, - }); - } - } + await deleteSteps(ctx, journalEntries); return true; }, }); @@ -385,5 +476,20 @@ async function updateMaxParallelism( } } +async function deleteSteps(ctx: MutationCtx, steps: Doc<"steps">[]) { + for (const entry of steps) { + await ctx.db.delete(entry._id); + if (entry.step.kind === "event" && entry.step.eventId) { + await ctx.db.delete(entry.step.eventId); + } else if (entry.step.kind === "workflow" && entry.step.workflowId) { + const workpool = await getWorkpool(ctx, {}); + await workpool.enqueueMutation(ctx, api.workflow.cleanup, { + workflowId: entry.step.workflowId, + force: true, + }); + } + } +} + // eslint-disable-next-line @typescript-eslint/no-unused-vars const console = "THIS IS A REMINDER TO USE getDefaultLogger"; From a2aa3bb60861398997aa51fba78517cd28598090 Mon Sep 17 00:00:00 2001 From: Ian Macartney <366683+ianmacartney@users.noreply.github.com> Date: Wed, 18 Feb 2026 20:28:32 -0800 Subject: [PATCH 2/5] tests --- src/component/workflow.test.ts | 249 +++++++++++++++++++++++++++++++++ 1 file changed, 249 insertions(+) diff --git 
a/src/component/workflow.test.ts b/src/component/workflow.test.ts index 7f507a69..d7be54f6 100644 --- a/src/component/workflow.test.ts +++ b/src/component/workflow.test.ts @@ -44,6 +44,255 @@ describe("workflow", () => { expect(workflow2.workflow.runResult).toMatchObject({ kind: "canceled" }); }); + test("retry a failed workflow", async () => { + const t = initConvexTest(); + const id = await t.mutation(api.workflow.create, { + workflowName: "test", + workflowHandle: "function://internal.example.exampleWorkflow", + workflowArgs: { location: "San Francisco" }, + startAsync: true, + }); + // Complete with failure + await t.mutation(api.workflow.complete, { + workflowId: id, + generationNumber: 0, + runResult: { kind: "failed", error: "something went wrong" }, + }); + const before = await t.query(api.workflow.getStatus, { workflowId: id }); + expect(before.workflow.runResult).toMatchObject({ kind: "failed" }); + expect(before.workflow.generationNumber).toBe(0); + + // Retry + await t.mutation(api.workflow.retry, { + workflowId: id, + startAsync: true, + }); + const after = await t.query(api.workflow.getStatus, { workflowId: id }); + expect(after.workflow.runResult).toBeUndefined(); + expect(after.workflow.generationNumber).toBe(1); + }); + + test("retry throws if workflow is still running", async () => { + const t = initConvexTest(); + const id = await t.mutation(api.workflow.create, { + workflowName: "test", + workflowHandle: "function://internal.example.exampleWorkflow", + workflowArgs: {}, + startAsync: true, + }); + await expect( + t.mutation(api.workflow.retry, { workflowId: id, startAsync: true }), + ).rejects.toThrow("still running"); + }); + + test("retry from step number deletes steps from that point", async () => { + const t = initConvexTest(); + const id = await t.mutation(api.workflow.create, { + workflowName: "test", + workflowHandle: "function://internal.example.exampleWorkflow", + workflowArgs: {}, + startAsync: true, + }); + // Insert some steps + 
await t.run(async (ctx) => { + for (let i = 0; i < 3; i++) { + await ctx.db.insert("steps", { + workflowId: id, + stepNumber: i, + step: { + kind: "function" as const, + functionType: "mutation" as const, + handle: "function://test", + name: `step${i}`, + inProgress: false, + argsSize: 0, + args: {}, + runResult: { kind: "success", returnValue: null }, + startedAt: Date.now(), + completedAt: Date.now(), + }, + }); + } + }); + // Complete with failure + await t.mutation(api.workflow.complete, { + workflowId: id, + generationNumber: 0, + runResult: { kind: "failed", error: "step2 failed" }, + }); + + // Retry from step 1 + await t.mutation(api.workflow.retry, { + workflowId: id, + from: 1, + startAsync: true, + }); + + // Only step0 should remain + await t.run(async (ctx) => { + const steps = await ctx.db + .query("steps") + .withIndex("workflow", (q) => q.eq("workflowId", id)) + .collect(); + expect(steps).toHaveLength(1); + expect(steps[0].stepNumber).toBe(0); + expect(steps[0].step.name).toBe("step0"); + }); + }); + + test("retry from step name deletes that step and subsequent ones", async () => { + const t = initConvexTest(); + const id = await t.mutation(api.workflow.create, { + workflowName: "test", + workflowHandle: "function://internal.example.exampleWorkflow", + workflowArgs: {}, + startAsync: true, + }); + // Insert steps with names + await t.run(async (ctx) => { + const names = ["fetch", "process", "save"]; + for (let i = 0; i < names.length; i++) { + await ctx.db.insert("steps", { + workflowId: id, + stepNumber: i, + step: { + kind: "function" as const, + functionType: "action" as const, + handle: "function://test", + name: names[i], + inProgress: false, + argsSize: 0, + args: {}, + runResult: { kind: "success", returnValue: null }, + startedAt: Date.now(), + completedAt: Date.now(), + }, + }); + } + }); + await t.mutation(api.workflow.complete, { + workflowId: id, + generationNumber: 0, + runResult: { kind: "failed", error: "save failed" }, + }); + + // 
Retry from "process" + await t.mutation(api.workflow.retry, { + workflowId: id, + from: "process", + startAsync: true, + }); + + // Only "fetch" (step 0) should remain + await t.run(async (ctx) => { + const steps = await ctx.db + .query("steps") + .withIndex("workflow", (q) => q.eq("workflowId", id)) + .collect(); + expect(steps).toHaveLength(1); + expect(steps[0].step.name).toBe("fetch"); + }); + }); + + test("retry from unknown step name throws", async () => { + const t = initConvexTest(); + const id = await t.mutation(api.workflow.create, { + workflowName: "test", + workflowHandle: "function://internal.example.exampleWorkflow", + workflowArgs: {}, + startAsync: true, + }); + await t.mutation(api.workflow.complete, { + workflowId: id, + generationNumber: 0, + runResult: { kind: "failed", error: "oops" }, + }); + await expect( + t.mutation(api.workflow.retry, { + workflowId: id, + from: "nonexistent", + startAsync: true, + }), + ).rejects.toThrow('Step "nonexistent" not found'); + }); + + test("retry from nonexistent step number throws", async () => { + const t = initConvexTest(); + const id = await t.mutation(api.workflow.create, { + workflowName: "test", + workflowHandle: "function://internal.example.exampleWorkflow", + workflowArgs: {}, + startAsync: true, + }); + await t.mutation(api.workflow.complete, { + workflowId: id, + generationNumber: 0, + runResult: { kind: "failed", error: "oops" }, + }); + await expect( + t.mutation(api.workflow.retry, { + workflowId: id, + from: 5, + startAsync: true, + }), + ).rejects.toThrow("Step number 5 not found"); + }); + + test("retry deletes associated event steps", async () => { + const t = initConvexTest(); + const id = await t.mutation(api.workflow.create, { + workflowName: "test", + workflowHandle: "function://internal.example.exampleWorkflow", + workflowArgs: {}, + startAsync: true, + }); + // Insert an event and an event step referencing it + let eventId: any; + await t.run(async (ctx) => { + eventId = await 
ctx.db.insert("events", { + workflowId: id, + name: "approval", + state: { kind: "created" }, + }); + await ctx.db.insert("steps", { + workflowId: id, + stepNumber: 0, + step: { + kind: "event" as const, + name: "waitForApproval", + inProgress: false, + argsSize: 0, + args: { eventId }, + eventId, + startedAt: Date.now(), + completedAt: Date.now(), + runResult: { kind: "success", returnValue: null }, + }, + }); + }); + await t.mutation(api.workflow.complete, { + workflowId: id, + generationNumber: 0, + runResult: { kind: "failed", error: "oops" }, + }); + + // Retry from step 0 — should delete the event too + await t.mutation(api.workflow.retry, { + workflowId: id, + from: 0, + startAsync: true, + }); + await t.run(async (ctx) => { + const steps = await ctx.db + .query("steps") + .withIndex("workflow", (q) => q.eq("workflowId", id)) + .collect(); + expect(steps).toHaveLength(0); + const event = await ctx.db.get(eventId); + expect(event).toBeNull(); + }); + }); + test("cleaning up a workflow", async () => { const t = initConvexTest(); const id = await t.mutation(api.workflow.create, { From f9571b0e9a361b269e10192e3e2d2998b3bd868a Mon Sep 17 00:00:00 2001 From: Ian Macartney <366683+ianmacartney@users.noreply.github.com> Date: Wed, 18 Feb 2026 20:33:57 -0800 Subject: [PATCH 3/5] format --- src/client/index.ts | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/client/index.ts b/src/client/index.ts index 7f75f514..fed57adc 100644 --- a/src/client/index.ts +++ b/src/client/index.ts @@ -214,8 +214,8 @@ export class WorkflowManager { * @param workflowId - The workflow ID. * @param options - Options for the retry. * @param options.from - The step to retry from. Can be a step number, - * a step name, or a FunctionReference (uses the function name). - * Steps from this point onwards will be deleted and re-executed. + * a step name, or the function / workflow `internal.foo.bar`. + * Steps from this point onwards will be deleted before restarting. 
* @param options.startAsync - If true, the workflow will be enqueued * via the workpool instead of running immediately. */ @@ -229,7 +229,10 @@ export class WorkflowManager { ): Promise { let from: number | string | undefined; if (options?.from !== undefined) { - if (typeof options.from === "number" || typeof options.from === "string") { + if ( + typeof options.from === "number" || + typeof options.from === "string" + ) { from = options.from; } else { from = safeFunctionName(options.from); From 87fed4c665d7b4bb67b2a7a40e0fc4878d647ce5 Mon Sep 17 00:00:00 2001 From: Ian Macartney <366683+ianmacartney@users.noreply.github.com> Date: Wed, 18 Feb 2026 20:53:39 -0800 Subject: [PATCH 4/5] docs --- README.md | 50 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/README.md b/README.md index c88cab59..3b6f356a 100644 --- a/README.md +++ b/README.md @@ -24,6 +24,7 @@ Welcome to the world of Convex workflows. - Specify retry behavior on a per-step basis, along with a default policy. - Specify how many workflow steps can run in parallel to manage load. - Cancel long-running workflows. +- Retry previously-failed workflows from a specific step. - Clean up workflows after they're done. ```ts @@ -369,6 +370,55 @@ export const kickoffWorkflow = action({ }); ``` +### Retrying a failed workflow + +If you want to re-run a workflow from a specific point, you can do so with +`workflow.retry(...)`. + +If a certain step failed, you can retry from that step onwards by providing the +step number, name, or function reference (`internal.foo.bar` e.g.). Events are +named after the event name or event ID if no name is given. You can get the step +number by listing the steps and using the `stepNumber`. 
+ +```ts +// Retry from a step number (0-indexed) +await workflow.retry(ctx, workflowId, { from: 2 }); + +// Retry from a step by name +await workflow.retry(ctx, workflowId, { from: "eventName" }); + +// Retry from a step by function reference +await workflow.retry(ctx, workflowId, { + from: internal.example.myAction, +}); +``` + +If a name or function reference is provided, it will be used to find the last +step with that name or function reference, and delete that step and all +subsequent steps, so the workflow will start from that step when re-executing. + +If the failure was from the workflow handler itself and you don't want to drop +any previous steps, you don't have to specify a step to retry from. + +```ts +await workflow.retry(ctx, args.workflowId); +``` + +Like `workflow.start()`, you can pass `startAsync: true` to enqueue the retry +via the workpool instead of running it immediately: + +```ts +await workflow.retry(ctx, workflowId, { startAsync: true }); +``` + +By default it will execute the handler in the same transaction so any errors +will be immediately visible. However, this means that on a handler error, the +restart itself will also be rolled back and the workflow will be unchanged. + +If you want to retry the workflow in a separate transaction, you can do so by +passing `startAsync: true`. This will enqueue the retry via the workpool instead +of running it immediately. 
+ ### Cleaning up a workflow After a workflow has completed, you can clean up its storage with From 28c53f9db0a044d26325141b1c3c359f900d87af Mon Sep 17 00:00:00 2001 From: Ian Macartney <366683+ianmacartney@users.noreply.github.com> Date: Wed, 18 Feb 2026 21:17:49 -0800 Subject: [PATCH 5/5] turn getting entry into warning when missing and refactor getting workflow when not checking generation number --- src/component/journal.ts | 2 +- src/component/model.ts | 28 +++++----------------------- src/component/pool.ts | 13 +++++++++---- 3 files changed, 15 insertions(+), 28 deletions(-) diff --git a/src/component/journal.ts b/src/component/journal.ts index d1c71df2..8edd31df 100644 --- a/src/component/journal.ts +++ b/src/component/journal.ts @@ -72,7 +72,7 @@ export const load = query({ export const startSteps = mutation({ args: { - workflowId: v.string(), + workflowId: v.id("workflows"), generationNumber: v.number(), steps: v.array( v.object({ diff --git a/src/component/model.ts b/src/component/model.ts index 8a5f46e2..5f147533 100644 --- a/src/component/model.ts +++ b/src/component/model.ts @@ -1,37 +1,19 @@ +import type { Id } from "./_generated/dataModel.js"; import type { QueryCtx } from "./_generated/server.js"; export async function getWorkflow( ctx: QueryCtx, - workflowIdStr: string, - expectedGenerationNumber: number | null, + workflowId: Id<"workflows">, + expectedGenerationNumber: number, ) { - const workflowId = ctx.db.normalizeId("workflows", workflowIdStr); - if (!workflowId) { - throw new Error(`Invalid workflow ID: ${workflowIdStr}`); - } - const workflow = await ctx.db.get(workflowId); + const workflow = await ctx.db.get("workflows", workflowId); if (!workflow) { throw new Error(`Workflow not found: ${workflowId}`); } - if ( - expectedGenerationNumber !== null && - workflow.generationNumber !== expectedGenerationNumber - ) { + if (workflow.generationNumber !== expectedGenerationNumber) { throw new Error( `Invalid generation number: 
${expectedGenerationNumber} for workflow ${workflow.name} (${workflowId})`, ); } return workflow; } - -export async function getJournalEntry(ctx: QueryCtx, journalIdStr: string) { - const journalId = ctx.db.normalizeId("steps", journalIdStr); - if (!journalId) { - throw new Error(`Invalid journal ID: ${journalIdStr}`); - } - const journalEntry = await ctx.db.get(journalId); - if (!journalEntry) { - throw new Error(`Journal entry not found: ${journalId}`); - } - return journalEntry; -} diff --git a/src/component/pool.ts b/src/component/pool.ts index ef63afbf..cdaf82fb 100644 --- a/src/component/pool.ts +++ b/src/component/pool.ts @@ -18,7 +18,6 @@ import { type Infer, v } from "convex/values"; import { components, internal } from "./_generated/api.js"; import { internalMutation, type MutationCtx } from "./_generated/server.js"; import { logLevel } from "./logging.js"; -import { getWorkflow } from "./model.js"; import { getDefaultLogger } from "./utils.js"; import { completeHandler } from "./workflow.js"; import type { Doc } from "./_generated/dataModel.js"; @@ -109,7 +108,12 @@ async function onCompleteHandler( return; } const journalEntry = await ctx.db.get(stepId); - assert(journalEntry, `Journal entry not found: ${stepId}`); + if (!journalEntry) { + console.error( + `Journal entry not found: ${stepId}. 
This is likely because it was already cleaned up.`, + ); + return; + } const workflowId = journalEntry.workflowId; if ( @@ -127,10 +131,11 @@ async function onCompleteHandler( return; } const { generationNumber } = args.context; - const workflow = await getWorkflow(ctx, workflowId, null); + const workflow = await ctx.db.get("workflows", workflowId); + assert(workflow, `Workflow not found: ${workflowId}`); if (workflow.generationNumber !== generationNumber) { console.error( - `Workflow: ${workflowId} already has generation number ${workflow.generationNumber} when completing ${stepId}`, + `Workflow: ${workflowId} already has generation number ${workflow.generationNumber} when completing ${stepId}. Expected ${generationNumber}`, ); return; }