-
Notifications
You must be signed in to change notification settings - Fork 17
allow ctx.run(ctx => {...}) #231
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
bce1289
64a90d6
c760955
fc3fcde
5dcfa27
f758fad
7ae937f
1afbaee
ed25c97
f12a238
03e100a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,131 @@ | ||
| import { | ||
| defineEvent, | ||
| type EventId, | ||
| vEventId, | ||
| vWorkflowId, | ||
| WorkflowManager, | ||
| type WorkflowId, | ||
| } from "@convex-dev/workflow"; | ||
| import { v } from "convex/values"; | ||
| import { components, internal } from "./_generated/api"; | ||
| import { internalMutation } from "./_generated/server"; | ||
|
|
||
| const workflow = new WorkflowManager(components.workflow, { | ||
| internalMutation, | ||
| }); | ||
|
|
||
| const approvalEvent = defineEvent({ | ||
| name: "approval", | ||
| validator: v.union( | ||
| v.object({ kind: v.literal("approved") }), | ||
| v.object({ kind: v.literal("timeout") }), | ||
| ), | ||
| }); | ||
|
|
||
| /** | ||
| * A workflow that waits for an approval event but times out after 30 seconds. | ||
| * | ||
| * It uses `step.run()` to schedule a timeout function via `ctx.scheduler`, | ||
| * then awaits the event. If the event resolves with a real approval, the | ||
| * timeout function is canceled. If the timeout fires first, the workflow | ||
| * handles it gracefully. | ||
| */ | ||
| export const eventTimeoutWorkflow = workflow.define({ | ||
| args: {}, | ||
| returns: v.string(), | ||
| handler: async (step): Promise<string> => { | ||
| // 1. Create the event so we have an ID to pass to the timeout function. | ||
| const eventId = await step.runMutation( | ||
| internal.eventTimeout.createApprovalEvent, | ||
| { workflowId: step.workflowId }, | ||
| ); | ||
|
|
||
| // 2. Schedule a function that will complete the event with { kind: "timeout" } | ||
| // after 30 seconds, unless it's canceled first. | ||
| const scheduledFnId = await step.run( | ||
| async (ctx) => { | ||
| return ctx.scheduler.runAfter( | ||
| 30_000, | ||
| internal.eventTimeout.timeoutEvent, | ||
| { eventId }, | ||
| ); | ||
| }, | ||
| { name: "scheduleTimeout" }, | ||
| ); | ||
|
|
||
| // 3. Wait for the event — either a real approval or the timeout. | ||
| const result = await step.awaitEvent({ ...approvalEvent, id: eventId }); | ||
|
|
||
| // 4. If we got a real approval, cancel the scheduled timeout function. | ||
| if (result.kind === "approved") { | ||
| await step.run( | ||
| async (ctx) => { | ||
| const scheduled = await ctx.db.system.get(scheduledFnId); | ||
| if (scheduled?.state.kind === "pending") | ||
| await ctx.scheduler.cancel(scheduledFnId); | ||
| }, | ||
| { name: "cancelTimeout" }, | ||
| ); | ||
| return "approved"; | ||
| } | ||
|
|
||
| return "timed out"; | ||
| }, | ||
| }); | ||
|
|
||
| // ── Helper mutations ────────────────────────── | ||
|
|
||
| export const createApprovalEvent = internalMutation({ | ||
| args: { workflowId: vWorkflowId }, | ||
| returns: vEventId("approval"), | ||
| handler: async (ctx, args): Promise<EventId<"approval">> => { | ||
| return await workflow.createEvent(ctx, { | ||
| name: "approval", | ||
| workflowId: args.workflowId, | ||
| }); | ||
| }, | ||
| }); | ||
|
|
||
| export const timeoutEvent = internalMutation({ | ||
| args: { eventId: vEventId("approval") }, | ||
| handler: async (ctx, args) => { | ||
| await workflow.sendEvent(ctx, { | ||
| ...approvalEvent, | ||
| id: args.eventId, | ||
| value: { kind: "timeout" }, | ||
| }); | ||
| }, | ||
| }); | ||
|
|
||
| export const approve = internalMutation({ | ||
| args: { eventId: vEventId("approval") }, | ||
| handler: async (ctx, args) => { | ||
| await workflow.sendEvent(ctx, { | ||
| ...approvalEvent, | ||
| id: args.eventId, | ||
| value: { kind: "approved" }, | ||
| }); | ||
| }, | ||
| }); | ||
|
|
||
| /** | ||
| * Test this from the CLI: | ||
| * ```sh | ||
| * npx convex run eventTimeout:startEventTimeout | ||
| * ``` | ||
| * Then either approve before 30s: | ||
| * ```sh | ||
| * npx convex run eventTimeout:approve '{"eventId":"..."}' | ||
| * ``` | ||
| * Or wait 30s for the timeout to fire automatically. | ||
| */ | ||
| export const startEventTimeout = internalMutation({ | ||
| args: {}, | ||
| handler: async (ctx): Promise<WorkflowId> => { | ||
| return await workflow.start( | ||
| ctx, | ||
| internal.eventTimeout.eventTimeoutWorkflow, | ||
| {}, | ||
| ); | ||
| }, | ||
|
Comment on lines
+111
to
+130
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. CLI example cannot be run as documented. The docstring instructs users to call Consider either:
🤖 Prompt for AI Agents |
||
| }); | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Journal the captured IDs as
depsin both inline steps.Both handlers close over
eventId/scheduledFnIdbut currently journal{}. If this workflow is replayed or restarted before those steps, the old inline entries can be reused for a different event or scheduled function, so the timeout won’t be scheduled or canceled correctly.Suggested change
const scheduledFnId = await step.run( async (ctx) => { return ctx.scheduler.runAfter( 30_000, internal.eventTimeout.timeoutEvent, { eventId }, ); }, - { name: "scheduleTimeout" }, + { + name: "scheduleTimeout", + deps: { eventId }, + }, ); @@ await step.run( async (ctx) => { const scheduled = await ctx.db.system.get(scheduledFnId); if (scheduled?.state.kind === "pending") await ctx.scheduler.cancel(scheduledFnId); }, - { name: "cancelTimeout" }, + { + name: "cancelTimeout", + deps: { scheduledFnId }, + }, );Also applies to: 60-67
🤖 Prompt for AI Agents