Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ Welcome to the world of Convex workflows.
- Specify retry behavior on a per-step basis, along with a default policy.
- Specify how many workflow steps can run in parallel to manage load.
- Cancel long-running workflows.
- Retry previously-failed workflows from a specific step.
- Clean up workflows after they're done.

```ts
Expand Down Expand Up @@ -369,6 +370,55 @@ export const kickoffWorkflow = action({
});
```

### Retrying a failed workflow

If you want to re-run a workflow from a specific point, you can do so with
`workflow.retry(...)`.

If a certain step failed, you can retry from that step onwards by providing the
step number, name, or function reference (`internal.foo.bar` e.g.). Events are
named after the event name or event ID if no name is given. You can get the step
number by listing the steps and using the `stepNumber`.

```ts
// Retry from a step number (0-indexed)
await workflow.retry(ctx, workflowId, { from: 2 });

// Retry from a step by name
await workflow.retry(ctx, workflowId, { from: "eventName" });

// Retry from a step by function reference
await workflow.retry(ctx, workflowId, {
from: internal.example.myAction,
});
```

If a name or function reference is provided, it will be used to find the last
step with that name or function reference, and delete all subsequent steps, so
the workflow will start from that step when re-executing.

If the failure was from the workflow handler itself and you don't want to drop
any previous steps, you don't have to specify a step to retry from.

```ts
await workflow.retry(ctx, args.workflowId);
```

Like `workflow.start()`, you can pass `startAsync: true` to enqueue the retry
via the workpool instead of running it immediately:

```ts
await workflow.retry(ctx, workflowId, { startAsync: true });
```

By default it will execute the handler in the same transaction so any errors
will be immediately visible. However, this means that on a handler error, the
restart itself will also be rolled back and the workflow will be unchanged.

If you want to retry the workflow in a separate transaction, you can do so by
passing `startAsync: true`. This will enqueue the retry via the workpool instead
of running it immediately.

### Cleaning up a workflow

After a workflow has completed, you can clean up its storage with
Expand Down
38 changes: 38 additions & 0 deletions src/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,44 @@ export class WorkflowManager {
}
}

/**
* Retry a previously-failed workflow, optionally from a specific step.
*
* @param ctx - The Convex context.
* @param workflowId - The workflow ID.
* @param options - Options for the retry.
* @param options.from - The step to retry from. Can be a step number,
* a step name, or the function / workflow `internal.foo.bar`.
* Steps from this point onwards will be deleted before restarting.
* @param options.startAsync - If true, the workflow will be enqueued
* via the workpool instead of running immediately.
*/
async retry(
ctx: RunMutationCtx,
workflowId: WorkflowId,
options?: {
from?: number | string | FunctionReference<any, any>;
startAsync?: boolean;
},
): Promise<void> {
let from: number | string | undefined;
if (options?.from !== undefined) {
if (
typeof options.from === "number" ||
typeof options.from === "string"
) {
from = options.from;
} else {
from = safeFunctionName(options.from);
}
}
await ctx.runMutation(this.component.workflow.retry, {
workflowId,
from,
startAsync: options?.startAsync,
});
}

/**
* Cancel a running workflow.
*
Expand Down
9 changes: 8 additions & 1 deletion src/component/_generated/component.ts
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ export type ComponentApi<Name extends string | undefined = string | undefined> =
cleanup: FunctionReference<
"mutation",
"internal",
{ workflowId: string },
{ force?: boolean; workflowId: string },
boolean,
Name
>;
Expand Down Expand Up @@ -486,5 +486,12 @@ export type ComponentApi<Name extends string | undefined = string | undefined> =
},
Name
>;
retry: FunctionReference<
"mutation",
"internal",
{ from?: number | string; startAsync?: boolean; workflowId: string },
null,
Name
>;
};
};
2 changes: 1 addition & 1 deletion src/component/journal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ export const load = query({

export const startSteps = mutation({
args: {
workflowId: v.string(),
workflowId: v.id("workflows"),
generationNumber: v.number(),
steps: v.array(
v.object({
Expand Down
28 changes: 5 additions & 23 deletions src/component/model.ts
Original file line number Diff line number Diff line change
@@ -1,37 +1,19 @@
import type { Id } from "./_generated/dataModel.js";
import type { QueryCtx } from "./_generated/server.js";

export async function getWorkflow(
ctx: QueryCtx,
workflowIdStr: string,
expectedGenerationNumber: number | null,
workflowId: Id<"workflows">,
expectedGenerationNumber: number,
) {
const workflowId = ctx.db.normalizeId("workflows", workflowIdStr);
if (!workflowId) {
throw new Error(`Invalid workflow ID: ${workflowIdStr}`);
}
const workflow = await ctx.db.get(workflowId);
const workflow = await ctx.db.get("workflows", workflowId);
if (!workflow) {
throw new Error(`Workflow not found: ${workflowId}`);
}
if (
expectedGenerationNumber !== null &&
workflow.generationNumber !== expectedGenerationNumber
) {
if (workflow.generationNumber !== expectedGenerationNumber) {
throw new Error(
`Invalid generation number: ${expectedGenerationNumber} for workflow ${workflow.name} (${workflowId})`,
);
}
return workflow;
}

export async function getJournalEntry(ctx: QueryCtx, journalIdStr: string) {
const journalId = ctx.db.normalizeId("steps", journalIdStr);
if (!journalId) {
throw new Error(`Invalid journal ID: ${journalIdStr}`);
}
const journalEntry = await ctx.db.get(journalId);
if (!journalEntry) {
throw new Error(`Journal entry not found: ${journalId}`);
}
return journalEntry;
}
13 changes: 9 additions & 4 deletions src/component/pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import { type Infer, v } from "convex/values";
import { components, internal } from "./_generated/api.js";
import { internalMutation, type MutationCtx } from "./_generated/server.js";
import { logLevel } from "./logging.js";
import { getWorkflow } from "./model.js";
import { getDefaultLogger } from "./utils.js";
import { completeHandler } from "./workflow.js";
import type { Doc } from "./_generated/dataModel.js";
Expand Down Expand Up @@ -109,7 +108,12 @@ async function onCompleteHandler(
return;
}
const journalEntry = await ctx.db.get(stepId);
assert(journalEntry, `Journal entry not found: ${stepId}`);
if (!journalEntry) {
console.error(
`Journal entry not found: ${stepId}. This is likely because it was already cleaned up.`,
);
return;
}
const workflowId = journalEntry.workflowId;

if (
Expand All @@ -127,10 +131,11 @@ async function onCompleteHandler(
return;
}
const { generationNumber } = args.context;
const workflow = await getWorkflow(ctx, workflowId, null);
const workflow = await ctx.db.get("workflows", workflowId);
assert(workflow, `Workflow not found: ${workflowId}`);
if (workflow.generationNumber !== generationNumber) {
console.error(
`Workflow: ${workflowId} already has generation number ${workflow.generationNumber} when completing ${stepId}`,
`Workflow: ${workflowId} already has generation number ${workflow.generationNumber} when completing ${stepId}. Expected ${generationNumber}`,
);
return;
}
Expand Down
Loading
Loading