Skip to content
Open
2 changes: 2 additions & 0 deletions example/convex/_generated/api.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import type * as admin from "../admin.js";
import type * as catchError from "../catchError.js";
import type * as e2e from "../e2e.js";
import type * as eventTimeout from "../eventTimeout.js";
import type * as example from "../example.js";
import type * as inlineTest from "../inlineTest.js";
import type * as nestedWorkflow from "../nestedWorkflow.js";
Expand All @@ -30,6 +31,7 @@ declare const fullApi: ApiFromModules<{
admin: typeof admin;
catchError: typeof catchError;
e2e: typeof e2e;
eventTimeout: typeof eventTimeout;
example: typeof example;
inlineTest: typeof inlineTest;
nestedWorkflow: typeof nestedWorkflow;
Expand Down
131 changes: 131 additions & 0 deletions example/convex/eventTimeout.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
import {
defineEvent,
type EventId,
vEventId,
vWorkflowId,
WorkflowManager,
type WorkflowId,
} from "@convex-dev/workflow";
import { v } from "convex/values";
import { components, internal } from "./_generated/api";
import { internalMutation } from "./_generated/server";

const workflow = new WorkflowManager(components.workflow, {
internalMutation,
});

const approvalEvent = defineEvent({
name: "approval",
validator: v.union(
v.object({ kind: v.literal("approved") }),
v.object({ kind: v.literal("timeout") }),
),
});

/**
* A workflow that waits for an approval event but times out after 30 seconds.
*
* It uses `step.run()` to schedule a timeout function via `ctx.scheduler`,
* then awaits the event. If the event resolves with a real approval, the
* timeout function is canceled. If the timeout fires first, the workflow
* handles it gracefully.
*/
export const eventTimeoutWorkflow = workflow.define({
args: {},
returns: v.string(),
handler: async (step): Promise<string> => {
// 1. Create the event so we have an ID to pass to the timeout function.
const eventId = await step.runMutation(
internal.eventTimeout.createApprovalEvent,
{ workflowId: step.workflowId },
);

// 2. Schedule a function that will complete the event with { kind: "timeout" }
// after 30 seconds, unless it's canceled first.
const scheduledFnId = await step.run(
async (ctx) => {
return ctx.scheduler.runAfter(
30_000,
internal.eventTimeout.timeoutEvent,
{ eventId },
);
},
{ name: "scheduleTimeout" },
);
Comment on lines +45 to +54
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Journal the captured IDs as deps in both inline steps.

Both handlers close over eventId / scheduledFnId but currently journal {}. If this workflow is replayed or restarted before those steps, the old inline entries can be reused for a different event or scheduled function, so the timeout won’t be scheduled or canceled correctly.

Suggested change
     const scheduledFnId = await step.run(
       async (ctx) => {
         return ctx.scheduler.runAfter(
           30_000,
           internal.eventTimeout.timeoutEvent,
           { eventId },
         );
       },
-      { name: "scheduleTimeout" },
+      {
+        name: "scheduleTimeout",
+        deps: { eventId },
+      },
     );
@@
       await step.run(
         async (ctx) => {
           const scheduled = await ctx.db.system.get(scheduledFnId);
           if (scheduled?.state.kind === "pending")
             await ctx.scheduler.cancel(scheduledFnId);
         },
-        { name: "cancelTimeout" },
+        {
+          name: "cancelTimeout",
+          deps: { scheduledFnId },
+        },
       );

Also applies to: 60-67

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@example/convex/eventTimeout.ts` around lines 44 - 53, The inline step runs
are closing over eventId and scheduledFnId but journal an empty deps object,
risking reuse of stale inline entries; update both step.run calls (the
scheduling step that calls ctx.scheduler.runAfter with
internal.eventTimeout.timeoutEvent and the cancellation step that uses
scheduledFnId) to pass deps that include the closed-over values (e.g., deps: {
eventId } for the scheduler step and deps: { eventId, scheduledFnId } for the
cancel step) so replay/restart correctly distinguishes entries and
schedules/cancels the right timeout.


// 3. Wait for the event — either a real approval or the timeout.
const result = await step.awaitEvent({ ...approvalEvent, id: eventId });

// 4. If we got a real approval, cancel the scheduled timeout function.
if (result.kind === "approved") {
await step.run(
async (ctx) => {
const scheduled = await ctx.db.system.get(scheduledFnId);

Check failure on line 63 in example/convex/eventTimeout.ts

View workflow job for this annotation

GitHub Actions / Test and lint

Database get call should include an explicit table name as the first argument. Expected: db.get("_scheduled_functions", ...)
if (scheduled?.state.kind === "pending")
await ctx.scheduler.cancel(scheduledFnId);
},
{ name: "cancelTimeout" },
);
return "approved";
}

return "timed out";
},
});

// ── Helper mutations ──────────────────────────

export const createApprovalEvent = internalMutation({
args: { workflowId: vWorkflowId },
returns: vEventId("approval"),
handler: async (ctx, args): Promise<EventId<"approval">> => {
return await workflow.createEvent(ctx, {
name: "approval",
workflowId: args.workflowId,
});
},
});

export const timeoutEvent = internalMutation({
args: { eventId: vEventId("approval") },
handler: async (ctx, args) => {
await workflow.sendEvent(ctx, {
...approvalEvent,
id: args.eventId,
value: { kind: "timeout" },
});
},
});

export const approve = internalMutation({
args: { eventId: vEventId("approval") },
handler: async (ctx, args) => {
await workflow.sendEvent(ctx, {
...approvalEvent,
id: args.eventId,
value: { kind: "approved" },
});
},
});

/**
* Test this from the CLI:
* ```sh
* npx convex run eventTimeout:startEventTimeout
* ```
* Then either approve before 30s:
* ```sh
* npx convex run eventTimeout:approve '{"eventId":"..."}'
* ```
* Or wait 30s for the timeout to fire automatically.
*/
export const startEventTimeout = internalMutation({
args: {},
handler: async (ctx): Promise<WorkflowId> => {
return await workflow.start(
ctx,
internal.eventTimeout.eventTimeoutWorkflow,
{},
);
},
Comment on lines +111 to +130
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

CLI example cannot be run as documented.

The docstring instructs users to call approve '{"eventId":"..."}', but startEventTimeout only returns the WorkflowId. The eventId is created inside the workflow handler and is never exposed to the caller.

Consider either:

  1. Persisting the eventId to a table keyed by workflowId so users can look it up
  2. Returning both workflowId and eventId from the workflow's first step
  3. Adding a helper query that retrieves the approval event for a given workflow
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@example/convex/eventTimeout.ts` around lines 110 - 129, The CLI can't approve
because startEventTimeout (which calls workflow.start with
internal.eventTimeout.eventTimeoutWorkflow) only returns a WorkflowId while the
workflow generates the eventId internally; persist the eventId keyed by the
workflowId so callers can look it up. Update the eventTimeoutWorkflow handler to
write the generated eventId into a small table (e.g., EventApproval table) keyed
by the WorkflowId returned by workflow.start, and update startEventTimeout to
return the WorkflowId (unchanged) while adding a helper query (or document) that
reads EventApproval[workflowId] to get the eventId; reference startEventTimeout,
workflow.start, internal.eventTimeout.eventTimeoutWorkflow, and the new
EventApproval mapping in your changes.

});
38 changes: 13 additions & 25 deletions example/convex/example.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import { components } from "./_generated/api.js";
import { vWorkflowId } from "@convex-dev/workflow";
import { vResultValidator } from "@convex-dev/workpool";

export const workflow = new WorkflowManager(components.workflow);
export const workflow = new WorkflowManager(components.workflow, {
internalMutation,
});

export const myWorkflow = workflow
.define({
Expand Down Expand Up @@ -52,9 +54,16 @@ export const myWorkflow = workflow
console.timeLog("weather", temperature);
// Wait a beat before writing the result.
await step.sleep(100, { name: "cooldown" });
await step.runMutation(internal.example.updateFlow, {
workflowId: step.workflowId,
out: { name, celsius, farenheit, windSpeed, windGust },
await step.run(async (ctx) => {
const flow = await ctx.db
.query("flows")
.withIndex("workflowId", (q) => q.eq("workflowId", step.workflowId))
.first();
if (flow) {
await ctx.db.patch("flows", flow._id, {
out: { name, celsius, farenheit, windSpeed, windGust },
});
}
});
console.timeEnd("overall");
return { name, celsius, farenheit, windSpeed, windGust };
Expand Down Expand Up @@ -173,24 +182,3 @@ export const getWeather = internalAction({
};
},
});

export const updateFlow = internalMutation({
args: {
workflowId: vWorkflowId,
out: v.any(),
},
returns: v.null(),
handler: async (ctx, args) => {
const flow = await ctx.db
.query("flows")
.withIndex("workflowId", (q) => q.eq("workflowId", args.workflowId))
.first();
if (!flow) {
console.warn(`Flow not found: ${args.workflowId}`);
return;
}
await ctx.db.patch("flows", flow._id, {
out: args.out,
});
},
});
64 changes: 47 additions & 17 deletions src/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
type GenericDataModel,
type GenericMutationCtx,
type GenericQueryCtx,
type MutationBuilder,
type PaginationOptions,
type PaginationResult,
type RegisteredMutation,
Expand Down Expand Up @@ -86,8 +87,9 @@ export type WorkflowDefinition<
export type WorkflowHandler<
ArgsValidator extends PropertyValidators,
ReturnsValidator extends Validator<any, "required", any> | void,
DataModel extends GenericDataModel = GenericDataModel,
> = (
step: WorkflowCtx,
step: WorkflowCtx<DataModel>,
args: ObjectType<ArgsValidator>,
) => Promise<ReturnValueForOptionalValidator<ReturnsValidator>>;

Expand Down Expand Up @@ -117,17 +119,20 @@ export type WorkflowStatus =
export function defineWorkflow<
AV extends PropertyValidators,
RV extends Validator<any, "required", any> | void = void,
DM extends GenericDataModel = GenericDataModel,
>(
component: WorkflowComponent,
config: WorkflowDefinition<AV, RV>,
config: WorkflowDefinition<AV, RV> & {
internalMutation?: MutationBuilder<DM, "internal">;
},
): {
/**
* Define the workflow handler function.
* Returns a registered mutation to export from your Convex module.
*/
handler(
fn: (
step: WorkflowCtx,
step: WorkflowCtx<DM>,
args: ObjectType<AV>,
) => Promise<ReturnValueForOptionalValidator<RV>>,
): RegisteredMutation<
Expand All @@ -145,14 +150,15 @@ export function defineWorkflow<
export interface Workflow<
AV extends PropertyValidators,
RV extends Validator<any, "required", any> | void,
DM extends GenericDataModel = GenericDataModel,
> {
/**
* Define the workflow handler function.
* Returns a registered mutation to export from your Convex module.
*/
handler(
fn: (
step: WorkflowCtx,
step: WorkflowCtx<DM>,
args: ObjectType<AV>,
) => Promise<ReturnValueForOptionalValidator<RV>>,
): RegisteredMutation<
Expand Down Expand Up @@ -455,11 +461,27 @@ export async function cleanup(
});
}

export class WorkflowManager {
export class WorkflowManager<
DataModel extends GenericDataModel = GenericDataModel,
> {
constructor(
public component: WorkflowComponent,
public options?: {
workpoolOptions: WorkpoolOptions;
workpoolOptions?: WorkpoolOptions;
/**
* Provide your app's `internalMutation` (from `_generated/server`) to get
* a fully typed `ctx` in `step.run()` handlers, with your data model's
* tables available on `ctx.db`. This also lets any custom middleware
* you've configured run around the workflow.
*
* ```ts
* import { internalMutation } from "./_generated/server";
* const workflow = new WorkflowManager(components.workflow, {
* internalMutation,
* });
* ```
*/
internalMutation?: MutationBuilder<DataModel, "internal">;
},
) {}

Expand All @@ -474,7 +496,7 @@ export class WorkflowManager {
ReturnsValidator extends Validator<unknown, "required", string> | void,
>(
workflow: WorkflowDefinition<ArgsValidator, ReturnsValidator> & {
handler: WorkflowHandler<ArgsValidator, ReturnsValidator>;
handler: WorkflowHandler<ArgsValidator, ReturnsValidator, DataModel>;
},
): RegisteredMutation<
"internal",
Expand All @@ -493,7 +515,7 @@ export class WorkflowManager {
*/
handler(
fn: (
step: WorkflowCtx,
step: WorkflowCtx<DataModel>,
args: ObjectType<ArgsValidator>,
) => Promise<ReturnValueForOptionalValidator<ReturnsValidator>>,
): RegisteredMutation<
Expand All @@ -507,14 +529,19 @@ export class WorkflowManager {
ReturnsValidator extends Validator<unknown, "required", string> | void,
>(
workflow: WorkflowDefinition<ArgsValidator, ReturnsValidator> & {
handler?: WorkflowHandler<ArgsValidator, ReturnsValidator>;
handler?: WorkflowHandler<ArgsValidator, ReturnsValidator, DataModel>;
},
): unknown {
const withMutation = {
...workflow,
internalMutation: this.options?.internalMutation,
};
if (workflow.handler) {
return workflowMutation(
this.component,
workflow as WorkflowDefinition<ArgsValidator, ReturnsValidator> & {
handler: WorkflowHandler<ArgsValidator, ReturnsValidator>;
withMutation as WorkflowDefinition<ArgsValidator, ReturnsValidator> & {
handler: WorkflowHandler<ArgsValidator, ReturnsValidator, DataModel>;
internalMutation?: MutationBuilder<DataModel, "internal">;
},
this.options?.workpoolOptions,
);
Expand All @@ -523,13 +550,16 @@ export class WorkflowManager {
// to support, in order to get the maxParallelism / etc. in there.
// Direct users of defineWorkflow should instead configure those values
// via configuring the component directly.
return defineWorkflow<ArgsValidator, ReturnsValidator>(this.component, {
...workflow,
workpoolOptions: {
...this.options?.workpoolOptions,
...workflow.workpoolOptions,
return defineWorkflow<ArgsValidator, ReturnsValidator, DataModel>(
this.component,
{
...withMutation,
workpoolOptions: {
...this.options?.workpoolOptions,
...workflow.workpoolOptions,
},
},
});
);
}

/**
Expand Down
Loading
Loading