From bce12898d5657b2b78ccda9add25b9cb33af48b9 Mon Sep 17 00:00:00 2001 From: Ian Macartney <366683+ianmacartney@users.noreply.github.com> Date: Mon, 23 Mar 2026 22:43:02 -0700 Subject: [PATCH 01/11] allow ctx.run --- src/client/index.ts | 32 +++++++- src/client/step.ts | 76 ++++++++++++------ src/client/stepContext.test.ts | 109 +++++++++++++++++++++++++- src/client/workflowContext.ts | 59 +++++++++++++- src/client/workflowMutation.ts | 20 +++-- src/component/_generated/component.ts | 41 +++++++++- src/component/journal.ts | 6 +- src/component/schema.ts | 4 + src/component/workflow.ts | 11 ++- src/types.ts | 2 + 10 files changed, 324 insertions(+), 36 deletions(-) diff --git a/src/client/index.ts b/src/client/index.ts index b75e59cf..cb8c875c 100644 --- a/src/client/index.ts +++ b/src/client/index.ts @@ -12,6 +12,7 @@ import { type GenericDataModel, type GenericMutationCtx, type GenericQueryCtx, + type MutationBuilder, type PaginationOptions, type PaginationResult, type RegisteredMutation, @@ -77,17 +78,38 @@ export type CallbackOptions = { export type WorkflowDefinition< ArgsValidator extends PropertyValidators, ReturnsValidator extends Validator | void = any, + DataModel extends GenericDataModel = GenericDataModel, > = { args?: ArgsValidator; returns?: ReturnsValidator; workpoolOptions?: WorkpoolRetryOptions; + /** + * Provide your app's `internalMutation` (from `_generated/server`) to get + * a fully typed `ctx` in `ctx.run()` handlers, with your data model's + * tables available on `ctx.db`. This also lets any custom middleware + * you've configured run around the workflow. + * + * ```ts + * import { internalMutation } from "./_generated/server"; + * workflow.define({ + * internalMutation, + * handler: async (ctx, args) => { + * const user = await ctx.run(async (ctx) => { + * return ctx.db.query("users").first(); // fully typed + * }); + * }, + * }); + * ``` + */ + internalMutation?: MutationBuilder; }; export type WorkflowHandler< ArgsValidator extends PropertyValidators, ReturnsValidator extends Validator | void, + DataModel extends GenericDataModel, > = ( - step: WorkflowCtx, + step: WorkflowCtx, args: ObjectType, ) => Promise>; @@ -472,9 +494,10 @@ export class WorkflowManager { define< ArgsValidator extends PropertyValidators, ReturnsValidator extends Validator | void, + DataModel extends GenericDataModel = GenericDataModel, >( - workflow: WorkflowDefinition & { - handler: WorkflowHandler; + workflow: WorkflowDefinition & { + handler: WorkflowHandler; }, ): RegisteredMutation< "internal", @@ -484,8 +507,9 @@ export class WorkflowManager { define< ArgsValidator extends PropertyValidators, ReturnsValidator extends Validator | void, + DataModel extends GenericDataModel = GenericDataModel, >( - workflow: WorkflowDefinition, + workflow: WorkflowDefinition, ): { /** * Define the workflow handler function. diff --git a/src/client/step.ts b/src/client/step.ts index 3a4e5b30..1357202b 100644 --- a/src/client/step.ts +++ b/src/client/step.ts @@ -44,6 +44,13 @@ export type StepRequest = { | { kind: "sleep"; args: Record; + } + | { + kind: "inline"; + handler: ( + ctx: GenericMutationCtx, + ) => Promise; + args: Record; }; retry: RetryBehavior | boolean | undefined; inline: boolean; @@ -160,30 +167,47 @@ export class StepExecutor { let runResult: RunResult | undefined; if (message.inline) { - if (target.kind !== "function" || target.functionType === "action") { + if (target.kind === "inline") { + try { + const returnValue = (await target.handler(this.ctx)) ?? null; + runResult = { kind: "success", returnValue }; + } catch (error: unknown) { + runResult = { + kind: "failed", + error: formatErrorWithStack(error), + }; + } + } else if ( + target.kind === "function" && + target.functionType !== "action" + ) { + try { + const result = + target.functionType === "query" + ? await this.ctx.runQuery( + target.function as FunctionReference< + typeof target.functionType + >, + target.args, + ) + : await this.ctx.runMutation( + target.function as FunctionReference< + typeof target.functionType + >, + target.args, + ); + runResult = { kind: "success", returnValue: result ?? null }; + } catch (error: unknown) { + runResult = { + kind: "failed", + error: formatErrorWithStack(error), + }; + } + } else { throw new Error( - "Inline execution is only supported for queries and mutations.", + "Inline execution is only supported for queries, mutations, and inline handlers.", ); } - try { - const result = - target.functionType === "query" - ? await this.ctx.runQuery( - target.function as FunctionReference< - typeof target.functionType - >, - target.args, - ) - : await this.ctx.runMutation( - target.function as FunctionReference< - typeof target.functionType - >, - target.args, - ); - runResult = { kind: "success", returnValue: result ?? null }; - } catch (error: unknown) { - runResult = { kind: "failed", error: formatErrorWithStack(error) }; - } } const commonFields = { @@ -226,8 +250,16 @@ export class StepExecutor { ...commonFields, }; break; - default: + case "inline": + step = { + kind: "inline" as const, + ...commonFields, + }; + break; + default: { + const _: never = target; throw new Error(`Unknown step kind: ${(target as any).kind}`); + } } return { retry: message.retry, diff --git a/src/client/stepContext.test.ts b/src/client/stepContext.test.ts index e8476f50..fa711adc 100644 --- a/src/client/stepContext.test.ts +++ b/src/client/stepContext.test.ts @@ -21,7 +21,7 @@ function fakeFuncRef(name: string) { function journalEntry( overrides: { name?: string; - kind?: "function" | "workflow" | "event"; + kind?: "function" | "workflow" | "event" | "inline"; args?: Record; runResult?: RunResult; stepNumber?: number; @@ -67,6 +67,15 @@ function journalEntry( }, } as unknown as JournalEntry; } + if (kind === "inline") { + return { + ...base, + step: { + kind: "inline", + ...stepCommon, + }, + } as unknown as JournalEntry; + } return { ...base, step: { @@ -344,6 +353,104 @@ describe("StepExecutor + WorkflowCtx integration", () => { expect(result).toEqual([1, 2, 3]); }); + + it("ctx.run replays a successful inline handler", async () => { + const channel = new BaseChannel(0); + const ctx = createWorkflowCtx("wf-11" as any, channel); + + const entry = journalEntry({ + name: "run", + kind: "inline", + args: {}, + runResult: { kind: "success", returnValue: 99 }, + }); + + const [result] = await Promise.all([ + ctx.run(async () => 99), + replayFromJournal(channel, [entry]), + ]); + + expect(result).toBe(99); + }); + + it("ctx.run replays a failed inline handler", async () => { + const channel = new BaseChannel(0); + const ctx = createWorkflowCtx("wf-12" as any, channel); + + const entry = journalEntry({ + name: "run", + kind: "inline", + args: {}, + runResult: { kind: "failed", error: "inline boom" }, + }); + + const [error] = await Promise.all([ + ctx.run(async () => { + throw new Error("inline boom"); + }).catch((e: Error) => e), + replayFromJournal(channel, [entry]), + ]); + + expect(error).toBeInstanceOf(Error); + expect((error as Error).message).toBe("inline boom"); + }); + + it("ctx.run uses custom name when provided", async () => { + const channel = new BaseChannel(0); + const ctx = createWorkflowCtx("wf-13" as any, channel); + + const entry = journalEntry({ + name: "myCustomStep", + kind: "inline", + args: {}, + runResult: { kind: "success", returnValue: "named" }, + }); + + const handler = async () => { + return ctx.run(async () => "named", { name: "myCustomStep" }); + }; + + const [result] = await Promise.all([ + handler(), + replayFromJournal(channel, [entry]), + ]); + + expect(result).toBe("named"); + }); + + it("ctx.run works sequentially with other steps", async () => { + const channel = new BaseChannel(0); + const ctx = createWorkflowCtx("wf-14" as any, channel); + + const entries = [ + journalEntry({ + name: "run", + kind: "inline", + args: {}, + runResult: { kind: "success", returnValue: "inline-result" }, + stepNumber: 0, + }), + journalEntry({ + name: "step2", + args: {}, + runResult: { kind: "success", returnValue: "action-result" }, + stepNumber: 1, + }), + ]; + + const handler = async () => { + const a = await ctx.run(async () => "inline-result"); + const b = await ctx.runAction(fakeFuncRef("step2") as any, {}); + return [a, b]; + }; + + const [results] = await Promise.all([ + handler(), + replayFromJournal(channel, entries), + ]); + + expect(results).toEqual(["inline-result", "action-result"]); + }); }); describe("unstableArgs", () => { diff --git a/src/client/workflowContext.ts b/src/client/workflowContext.ts index 18f44f17..04cc6ec7 100644 --- a/src/client/workflowContext.ts +++ b/src/client/workflowContext.ts @@ -7,6 +7,8 @@ import type { FunctionReturnType, FunctionType, FunctionVisibility, + GenericDataModel, + GenericMutationCtx, } from "convex/server"; import type { Validator } from "convex/values"; import type { EventId, SchedulerOptions, WorkflowId } from "../types.js"; @@ -39,7 +41,9 @@ type InlineArgs = inline?: false; }; -export type WorkflowCtx = { +export type WorkflowCtx< + DataModel extends GenericDataModel = GenericDataModel, +> = { /** * The ID of the workflow currently running. */ @@ -95,6 +99,43 @@ export type WorkflowCtx = { opts?: RunOptions, ): Promise>; + /** + * Run a handler inline within the workflow's mutation transaction. + * The result is journaled like any other step, so on replay it returns the + * saved value without re-executing the handler. + * + * This gives direct access to the underlying mutation context, allowing + * database reads/writes, running queries, and scheduling functions all + * within the same transaction as the workflow handler. + * + * The handler can read from variables in the enclosing scope, but should + * not modify them — on replay the handler is skipped and the journaled + * result is returned, so any side effects outside the handler's return + * value will not be replayed. + * + * To get a fully typed `ctx` with your data model, provide your app's + * `internalMutation` in the workflow definition: + * + * ```ts + * import { internalMutation } from "./_generated/server"; + * workflow.define({ + * internalMutation, + * handler: async (ctx, args) => { + * const user = await ctx.run(async (ctx) => { + * return ctx.db.query("users").first(); // fully typed + * }); + * }, + * }); + * ``` + * + * @param handler - A function receiving the mutation context to run inline. + * @param opts - Options for naming the step. + */ + run( + handler: (ctx: GenericMutationCtx) => T | Promise, + opts?: { name?: string }, + ): Promise; + /** * Blocks until a matching event is sent to this workflow. * @@ -150,6 +191,22 @@ export function createWorkflowCtx( return runFunction(sender, "action", action, args, opts); }, + run: async (handler, opts?) => { + return run(sender, { + name: opts?.name ?? "run", + target: { + kind: "inline", + handler: handler as ( + ctx: GenericMutationCtx, + ) => Promise, + args: {} as Record, + }, + retry: undefined, + inline: true, + schedulerOptions: {}, + }) as any; + }, + runWorkflow: async (workflow, args, opts?) => { const { name, unstableArgs, ...schedulerOptions } = opts ?? {}; return run(sender, { diff --git a/src/client/workflowMutation.ts b/src/client/workflowMutation.ts index 10f22577..20d68a7f 100644 --- a/src/client/workflowMutation.ts +++ b/src/client/workflowMutation.ts @@ -5,6 +5,8 @@ import { createFunctionHandle, internalMutationGeneric, makeFunctionReference, + type GenericDataModel, + type GenericMutationCtx, type RegisteredMutation, } from "convex/server"; import { @@ -18,7 +20,7 @@ import { type JournalEntry } from "../component/schema.js"; import { setupEnvironment } from "./environment.js"; import type { WorkflowDefinition, WorkflowHandler } from "./index.js"; import { StepExecutor, type StepRequest, type WorkerResult } from "./step.js"; -import { createWorkflowCtx } from "./workflowContext.js"; +import { createWorkflowCtx, type WorkflowCtx } from "./workflowContext.js"; import { type RunResult, type WorkpoolOptions } from "@convex-dev/workpool"; import { type WorkflowComponent } from "./types.js"; import { vWorkflowId } from "../types.js"; @@ -47,9 +49,12 @@ const INVALID_WORKFLOW_MESSAGE = `Invalid arguments for workflow: Did you invoke // function handle to the workflow component for execution. This function runs // one "poll" of the workflow, replaying its execution from the journal until // it blocks next. -export function workflowMutation( +export function workflowMutation< + ArgsValidator extends PropertyValidators, + DataModel extends GenericDataModel = GenericDataModel, +>( component: WorkflowComponent, - registered: WorkflowDefinition & { + registered: WorkflowDefinition & { handler: WorkflowHandler; }, defaultWorkpoolOptions?: WorkpoolOptions, @@ -59,7 +64,9 @@ export function workflowMutation( ...defaultWorkpoolOptions, ...registered.workpoolOptions, }; - return internalMutationGeneric({ + const mutationBuilder = (registered.internalMutation ?? + internalMutationGeneric) as typeof internalMutationGeneric; + return mutationBuilder({ handler: async (ctx, args) => { if (!validate(workflowArgs, args) && boundFn && "args" in args) { // Bound workflow called directly with { args: ... } @@ -129,7 +136,10 @@ export function workflowMutation( const channel = new BaseChannel( workpoolOptions.maxParallelism ?? 10, ); - const step = createWorkflowCtx(workflowId, channel); + const step = createWorkflowCtx( + workflowId, + channel, + ) as unknown as WorkflowCtx; const executor = new StepExecutor( workflowId, generationNumber, diff --git a/src/component/_generated/component.ts b/src/component/_generated/component.ts index 0c75eeae..03f06dd1 100644 --- a/src/component/_generated/component.ts +++ b/src/component/_generated/component.ts @@ -126,6 +126,19 @@ export type ComponentApi = | { kind: "canceled" }; startedAt: number; workId?: string; + } + | { + args: any; + argsSize: number; + completedAt?: number; + inProgress: boolean; + kind: "inline"; + name: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; }; stepNumber: number; workflowId: string; @@ -220,6 +233,19 @@ export type ComponentApi = | { kind: "canceled" }; startedAt: number; workId?: string; + } + | { + args: any; + argsSize: number; + completedAt?: number; + inProgress: boolean; + kind: "inline"; + name: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; }; }>; workflowId: string; @@ -413,6 +439,19 @@ export type ComponentApi = | { kind: "canceled" }; startedAt: number; workId?: string; + } + | { + args: any; + argsSize: number; + completedAt?: number; + inProgress: boolean; + kind: "inline"; + name: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; }; stepNumber: number; workflowId: string; @@ -524,7 +563,7 @@ export type ComponentApi = args: any; completedAt?: number; eventId?: string; - kind: "function" | "workflow" | "event" | "sleep"; + kind: "function" | "workflow" | "event" | "sleep" | "inline"; name: string; nestedWorkflowId?: string; runResult?: diff --git a/src/component/journal.ts b/src/component/journal.ts index aa48574c..af5444b4 100644 --- a/src/component/journal.ts +++ b/src/component/journal.ts @@ -162,6 +162,10 @@ export const startSteps = mutation({ step.workflowId = workflowId; } else if (step.runResult) { // Already completed inline by the caller — nothing to enqueue. + assert( + !step.kind || step.kind === "function" || step.kind === "inline", + `Unexpected inline-completed step kind: ${step.kind}`, + ); console.event("stepCompleted", { workflowId: entry.workflowId, workflowName: workflow.name, @@ -181,7 +185,7 @@ export const startSteps = mutation({ {}, { context, onComplete, name, ...schedulerOptions }, ); - } else { + } else if (!step.kind || step.kind === "function") { const context: OnCompleteContext = { generationNumber, stepId, diff --git a/src/component/schema.ts b/src/component/schema.ts index 75443203..5a8086a0 100644 --- a/src/component/schema.ts +++ b/src/component/schema.ts @@ -67,6 +67,10 @@ export const step = v.union( workId: v.optional(vWorkIdValidator), ...stepCommonFields, }), + v.object({ + kind: v.literal("inline"), + ...stepCommonFields, + }), ); export type Step = Infer; diff --git a/src/component/workflow.ts b/src/component/workflow.ts index 04bf3684..ef1e7c2d 100644 --- a/src/component/workflow.ts +++ b/src/component/workflow.ts @@ -160,6 +160,8 @@ function publicStep(step: JournalEntry): WorkflowStep { kind: "workflow", nestedWorkflowId: publicWorkflowId(step.step.workflowId!), }; + // It didn't used to be set + case undefined: case "function": return { ...commonFields, @@ -172,8 +174,15 @@ function publicStep(step: JournalEntry): WorkflowStep { kind: "sleep", workId: step.step.workId!, }; - default: + case "inline": + return { + ...commonFields, + kind: "inline", + }; + default: { + const _: never = step.step; throw new Error(`Unknown step kind: ${(step.step as any).kind}`); + } } } diff --git a/src/types.ts b/src/types.ts index c4b6ad69..a293ced7 100644 --- a/src/types.ts +++ b/src/types.ts @@ -58,6 +58,7 @@ export type WorkflowStep = { | { kind: "workflow"; nestedWorkflowId: WorkflowId } | { kind: "event"; eventId: EventId } | { kind: "sleep"; workId: WorkId } + | { kind: "inline" } ); export const vWorkflowStep = v.object({ @@ -77,6 +78,7 @@ export const vWorkflowStep = v.object({ v.literal("workflow"), v.literal("event"), v.literal("sleep"), + v.literal("inline"), ), workId: v.optional(vWorkIdValidator), nestedWorkflowId: v.optional(vWorkflowId), From 64a90d67f94d6f9e996e5fa8c0d0804df7349bee Mon Sep 17 00:00:00 2001 From: Ian Macartney <366683+ianmacartney@users.noreply.github.com> Date: Wed, 25 Mar 2026 17:06:21 -0700 Subject: [PATCH 02/11] change around types --- src/client/index.ts | 38 ++++++++++++++++--------- src/client/step.ts | 22 ++++++++------ src/client/stepContext.test.ts | 29 +++++++++---------- src/client/workflowContext.ts | 10 ++++--- src/client/workflowMutation.ts | 10 ++----- src/component/_generated/component.ts | 41 +-------------------------- src/component/journal.ts | 2 +- src/component/schema.ts | 4 --- src/component/workflow.ts | 5 ---- src/types.ts | 2 -- 10 files changed, 62 insertions(+), 101 deletions(-) diff --git a/src/client/index.ts b/src/client/index.ts index cb8c875c..79dc505b 100644 --- a/src/client/index.ts +++ b/src/client/index.ts @@ -139,9 +139,10 @@ export type WorkflowStatus = export function defineWorkflow< AV extends PropertyValidators, RV extends Validator | void = void, + DM extends GenericDataModel = GenericDataModel, >( component: WorkflowComponent, - config: WorkflowDefinition, + config: WorkflowDefinition, ): { /** * Define the workflow handler function. @@ -149,7 +150,7 @@ export function defineWorkflow< */ handler( fn: ( - step: WorkflowCtx, + step: WorkflowCtx, args: ObjectType, ) => Promise>, ): RegisteredMutation< @@ -167,6 +168,7 @@ export function defineWorkflow< export interface Workflow< AV extends PropertyValidators, RV extends Validator | void, + DM extends GenericDataModel = GenericDataModel, > { /** * Define the workflow handler function. @@ -174,7 +176,7 @@ export interface Workflow< */ handler( fn: ( - step: WorkflowCtx, + step: WorkflowCtx, args: ObjectType, ) => Promise>, ): RegisteredMutation< @@ -517,7 +519,7 @@ export class WorkflowManager { */ handler( fn: ( - step: WorkflowCtx, + step: WorkflowCtx, args: ObjectType, ) => Promise>, ): RegisteredMutation< @@ -529,16 +531,21 @@ export class WorkflowManager { define< ArgsValidator extends PropertyValidators, ReturnsValidator extends Validator | void, + DataModel extends GenericDataModel = GenericDataModel, >( - workflow: WorkflowDefinition & { - handler?: WorkflowHandler; + workflow: WorkflowDefinition & { + handler?: WorkflowHandler; }, ): unknown { if (workflow.handler) { return workflowMutation( this.component, - workflow as WorkflowDefinition & { - handler: WorkflowHandler; + workflow as WorkflowDefinition< + ArgsValidator, + ReturnsValidator, + DataModel + > & { + handler: WorkflowHandler; }, this.options?.workpoolOptions, ); @@ -547,13 +554,16 @@ export class WorkflowManager { // to support, in order to get the maxParallelism / etc. in there. // Direct users of defineWorkflow should instead configure those values // via configuring the component directly. - return defineWorkflow(this.component, { - ...workflow, - workpoolOptions: { - ...this.options?.workpoolOptions, - ...workflow.workpoolOptions, + return defineWorkflow( + this.component, + { + ...workflow, + workpoolOptions: { + ...this.options?.workpoolOptions, + ...workflow.workpoolOptions, + }, }, - }); + ); } /** diff --git a/src/client/step.ts b/src/client/step.ts index 1357202b..f3e04246 100644 --- a/src/client/step.ts +++ b/src/client/step.ts @@ -137,13 +137,17 @@ export class StepExecutor { entry.step, message.unstableArgs ? ["name", "kind"] : ["name", "kind", "args"], ); - const messageFields = message.unstableArgs - ? { name: message.name, kind: message.target.kind } - : { - name: message.name, - kind: message.target.kind, - args: message.target.args as Value, - }; + const messageFields = { + name: message.name, + kind: message.target.kind, + args: message.target.args as Value | undefined, + }; + if (message.unstableArgs) { + delete messageFields.args; + } + if (message.target.kind === "inline") { + messageFields.kind = "function"; + } const stepJson = JSON.stringify(convexToJson(stepFields)); const messageJson = JSON.stringify(convexToJson(messageFields)); if (stepJson !== messageJson) { @@ -252,7 +256,9 @@ export class StepExecutor { break; case "inline": step = { - kind: "inline" as const, + kind: "function", + functionType: "mutation", + handle: "inline", ...commonFields, }; break; diff --git a/src/client/stepContext.test.ts b/src/client/stepContext.test.ts index fa711adc..d1a5e78b 100644 --- a/src/client/stepContext.test.ts +++ b/src/client/stepContext.test.ts @@ -21,7 +21,9 @@ function fakeFuncRef(name: string) { function journalEntry( overrides: { name?: string; - kind?: "function" | "workflow" | "event" | "inline"; + kind?: "function" | "workflow" | "event"; + functionType?: "query" | "mutation" | "action"; + handle?: string; args?: Record; runResult?: RunResult; stepNumber?: number; @@ -51,8 +53,8 @@ function journalEntry( ...base, step: { kind: "function", - functionType: "action", - handle: "handle", + functionType: overrides.functionType ?? "action", + handle: overrides.handle ?? "handle", ...stepCommon, }, } as unknown as JournalEntry; @@ -67,15 +69,6 @@ function journalEntry( }, } as unknown as JournalEntry; } - if (kind === "inline") { - return { - ...base, - step: { - kind: "inline", - ...stepCommon, - }, - } as unknown as JournalEntry; - } return { ...base, step: { @@ -360,7 +353,8 @@ describe("StepExecutor + WorkflowCtx integration", () => { const entry = journalEntry({ name: "run", - kind: "inline", + functionType: "mutation", + handle: "inline", args: {}, runResult: { kind: "success", returnValue: 99 }, }); @@ -379,7 +373,8 @@ describe("StepExecutor + WorkflowCtx integration", () => { const entry = journalEntry({ name: "run", - kind: "inline", + functionType: "mutation", + handle: "inline", args: {}, runResult: { kind: "failed", error: "inline boom" }, }); @@ -401,7 +396,8 @@ describe("StepExecutor + WorkflowCtx integration", () => { const entry = journalEntry({ name: "myCustomStep", - kind: "inline", + functionType: "mutation", + handle: "inline", args: {}, runResult: { kind: "success", returnValue: "named" }, }); @@ -425,7 +421,8 @@ describe("StepExecutor + WorkflowCtx integration", () => { const entries = [ journalEntry({ name: "run", - kind: "inline", + functionType: "mutation", + handle: "inline", args: {}, runResult: { kind: "success", returnValue: "inline-result" }, stepNumber: 0, diff --git a/src/client/workflowContext.ts b/src/client/workflowContext.ts index 04cc6ec7..b10a1678 100644 --- a/src/client/workflowContext.ts +++ b/src/client/workflowContext.ts @@ -173,10 +173,12 @@ export type OptionalRestArgs< ? [args?: Record, opts?: Opts] : [args: FuncRef["_args"], opts?: Opts]; -export function createWorkflowCtx( +export function createWorkflowCtx< + DataModel extends GenericDataModel = GenericDataModel, +>( workflowId: WorkflowId, sender: BaseChannel, -) { +): WorkflowCtx { return { workflowId, runQuery: async (query, args, opts?) => { @@ -196,7 +198,7 @@ export function createWorkflowCtx( name: opts?.name ?? "run", target: { kind: "inline", - handler: handler as ( + handler: handler as unknown as ( ctx: GenericMutationCtx, ) => Promise, args: {} as Record, @@ -254,7 +256,7 @@ export function createWorkflowCtx( } return result as any; }, - } satisfies WorkflowCtx; + }; } async function runFunction< diff --git a/src/client/workflowMutation.ts b/src/client/workflowMutation.ts index 20d68a7f..7884ec5e 100644 --- a/src/client/workflowMutation.ts +++ b/src/client/workflowMutation.ts @@ -6,7 +6,6 @@ import { internalMutationGeneric, makeFunctionReference, type GenericDataModel, - type GenericMutationCtx, type RegisteredMutation, } from "convex/server"; import { @@ -20,7 +19,7 @@ import { type JournalEntry } from "../component/schema.js"; import { setupEnvironment } from "./environment.js"; import type { WorkflowDefinition, WorkflowHandler } from "./index.js"; import { StepExecutor, type StepRequest, type WorkerResult } from "./step.js"; -import { createWorkflowCtx, type WorkflowCtx } from "./workflowContext.js"; +import { createWorkflowCtx } from "./workflowContext.js"; import { type RunResult, type WorkpoolOptions } from "@convex-dev/workpool"; import { type WorkflowComponent } from "./types.js"; import { vWorkflowId } from "../types.js"; @@ -55,7 +54,7 @@ export function workflowMutation< >( component: WorkflowComponent, registered: WorkflowDefinition & { - handler: WorkflowHandler; + handler: WorkflowHandler; }, defaultWorkpoolOptions?: WorkpoolOptions, boundFn?: string, @@ -136,10 +135,7 @@ export function workflowMutation< const channel = new BaseChannel( workpoolOptions.maxParallelism ?? 10, ); - const step = createWorkflowCtx( - workflowId, - channel, - ) as unknown as WorkflowCtx; + const step = createWorkflowCtx(workflowId, channel); const executor = new StepExecutor( workflowId, generationNumber, diff --git a/src/component/_generated/component.ts b/src/component/_generated/component.ts index 03f06dd1..0c75eeae 100644 --- a/src/component/_generated/component.ts +++ b/src/component/_generated/component.ts @@ -126,19 +126,6 @@ export type ComponentApi = | { kind: "canceled" }; startedAt: number; workId?: string; - } - | { - args: any; - argsSize: number; - completedAt?: number; - inProgress: boolean; - kind: "inline"; - name: string; - runResult?: - | { kind: "success"; returnValue: any } - | { error: string; kind: "failed" } - | { kind: "canceled" }; - startedAt: number; }; stepNumber: number; workflowId: string; @@ -233,19 +220,6 @@ export type ComponentApi = | { kind: "canceled" }; startedAt: number; workId?: string; - } - | { - args: any; - argsSize: number; - completedAt?: number; - inProgress: boolean; - kind: "inline"; - name: string; - runResult?: - | { kind: "success"; returnValue: any } - | { error: string; kind: "failed" } - | { kind: "canceled" }; - startedAt: number; }; }>; workflowId: string; @@ -439,19 +413,6 @@ export type ComponentApi = | { kind: "canceled" }; startedAt: number; workId?: string; - } - | { - args: any; - argsSize: number; - completedAt?: number; - inProgress: boolean; - kind: "inline"; - name: string; - runResult?: - | { kind: "success"; returnValue: any } - | { error: string; kind: "failed" } - | { kind: "canceled" }; - startedAt: number; }; stepNumber: number; workflowId: string; @@ -563,7 +524,7 @@ export type ComponentApi = args: any; completedAt?: number; eventId?: string; - kind: "function" | "workflow" | "event" | "sleep" | "inline"; + kind: "function" | "workflow" | "event" | "sleep"; name: string; nestedWorkflowId?: string; runResult?: diff --git a/src/component/journal.ts b/src/component/journal.ts index af5444b4..6e6c3082 100644 --- a/src/component/journal.ts +++ b/src/component/journal.ts @@ -163,7 +163,7 @@ export const startSteps = mutation({ } else if (step.runResult) { // Already completed inline by the caller — nothing to enqueue. assert( - !step.kind || step.kind === "function" || step.kind === "inline", + !step.kind || step.kind === "function", `Unexpected inline-completed step kind: ${step.kind}`, ); console.event("stepCompleted", { diff --git a/src/component/schema.ts b/src/component/schema.ts index 5a8086a0..75443203 100644 --- a/src/component/schema.ts +++ b/src/component/schema.ts @@ -67,10 +67,6 @@ export const step = v.union( workId: v.optional(vWorkIdValidator), ...stepCommonFields, }), - v.object({ - kind: v.literal("inline"), - ...stepCommonFields, - }), ); export type Step = Infer; diff --git a/src/component/workflow.ts b/src/component/workflow.ts index ef1e7c2d..1cffeddb 100644 --- a/src/component/workflow.ts +++ b/src/component/workflow.ts @@ -174,11 +174,6 @@ function publicStep(step: JournalEntry): WorkflowStep { kind: "sleep", workId: step.step.workId!, }; - case "inline": - return { - ...commonFields, - kind: "inline", - }; default: { const _: never = step.step; throw new Error(`Unknown step kind: ${(step.step as any).kind}`); diff --git a/src/types.ts b/src/types.ts index a293ced7..c4b6ad69 100644 --- a/src/types.ts +++ b/src/types.ts @@ -58,7 +58,6 @@ export type WorkflowStep = { | { kind: "workflow"; nestedWorkflowId: WorkflowId } | { kind: "event"; eventId: EventId } | { kind: "sleep"; workId: WorkId } - | { kind: "inline" } ); export const vWorkflowStep = v.object({ @@ -78,7 +77,6 @@ export const vWorkflowStep = v.object({ v.literal("workflow"), v.literal("event"), v.literal("sleep"), - v.literal("inline"), ), workId: v.optional(vWorkIdValidator), nestedWorkflowId: v.optional(vWorkflowId), From c760955ea780172ee92fd0e8ca7da1081193146e Mon Sep 17 00:00:00 2001 From: Ian Macartney <366683+ianmacartney@users.noreply.github.com> Date: Fri, 3 Apr 2026 11:47:27 -0700 Subject: [PATCH 03/11] use inline syntax in example --- example/convex/example.ts | 35 +++++++++++------------------------ 1 file changed, 11 insertions(+), 24 deletions(-) diff --git a/example/convex/example.ts b/example/convex/example.ts index c83cb942..e3f7026e 100644 --- a/example/convex/example.ts +++ b/example/convex/example.ts @@ -13,6 +13,7 @@ export const myWorkflow = workflow args: { location: v.string(), }, + internalMutation, workpoolOptions: { retryActionsByDefault: true, }, @@ -52,9 +53,16 @@ export const myWorkflow = workflow console.timeLog("weather", temperature); // Wait a beat before writing the result. await step.sleep(100, { name: "cooldown" }); - await step.runMutation(internal.example.updateFlow, { - workflowId: step.workflowId, - out: { name, celsius, farenheit, windSpeed, windGust }, + await step.run(async (ctx) => { + const flow = await ctx.db + .query("flows") + .withIndex("workflowId", (q) => q.eq("workflowId", step.workflowId)) + .first(); + if (flow) { + await ctx.db.patch("flows", flow._id, { + out: { name, celsius, farenheit, windSpeed, windGust }, + }); + } }); console.timeEnd("overall"); return { name, celsius, farenheit, windSpeed, windGust }; @@ -173,24 +181,3 @@ export const getWeather = internalAction({ }; }, }); - -export const updateFlow = internalMutation({ - args: { - workflowId: vWorkflowId, - out: v.any(), - }, - returns: v.null(), - handler: async (ctx, args) => { - const flow = await ctx.db - .query("flows") - .withIndex("workflowId", (q) => q.eq("workflowId", args.workflowId)) - .first(); - if (!flow) { - console.warn(`Flow not found: ${args.workflowId}`); - return; - } - await ctx.db.patch("flows", flow._id, { - out: args.out, - }); - }, -}); From fc3fcde7d953f38f03cc877feec19d1e47f8f56b Mon Sep 17 00:00:00 2001 From: Ian Macartney <366683+ianmacartney@users.noreply.github.com> Date: Fri, 3 Apr 2026 17:30:57 -0700 Subject: [PATCH 04/11] add DataModel types to steps --- src/client/step.ts | 18 +++++++++--------- src/client/workflowContext.ts | 11 ++++++----- src/client/workflowMutation.ts | 2 +- 3 files changed, 16 insertions(+), 15 deletions(-) diff --git a/src/client/step.ts b/src/client/step.ts index f3e04246..b9a059f0 100644 --- a/src/client/step.ts +++ b/src/client/step.ts @@ -23,7 +23,7 @@ export type WorkerResult = | { type: "handlerDone"; runResult: RunResult } | { type: "executorBlocked" }; -export type StepRequest = { +export type StepRequest = { name: string; target: | { @@ -47,9 +47,7 @@ export type StepRequest = { } | { kind: "inline"; - handler: ( - ctx: GenericMutationCtx, - ) => Promise; + handler: (ctx: GenericMutationCtx) => Promise; args: Record; }; retry: RetryBehavior | boolean | undefined; @@ -60,16 +58,16 @@ export type StepRequest = { resolve: (result: RunResult) => void; }; -export class StepExecutor { +export class StepExecutor { private journalEntrySize: number; constructor( private workflowId: string, private generationNumber: number, - private ctx: GenericMutationCtx, + private ctx: GenericMutationCtx, private component: WorkflowComponent, private journalEntries: Array, - private receiver: BaseChannel, + private receiver: BaseChannel>, private now: number, private workpoolOptions: WorkpoolOptions | undefined, ) { @@ -127,7 +125,7 @@ export class StepExecutor { }; } - completeMessage(message: StepRequest, entry: JournalEntry) { + completeMessage(message: StepRequest, entry: JournalEntry) { if (entry.step.inProgress) { throw new Error( `Assertion failed: not blocked but have in-progress journal entry`, @@ -163,7 +161,9 @@ export class StepExecutor { message.resolve(entry.step.runResult); } - async startSteps(messages: StepRequest[]): Promise { + async startSteps( + messages: StepRequest[], + ): Promise { const steps = await Promise.all( messages.map(async (message) => { const args = message.target.args ?? {}; diff --git a/src/client/workflowContext.ts b/src/client/workflowContext.ts index b10a1678..bf5134ac 100644 --- a/src/client/workflowContext.ts +++ b/src/client/workflowContext.ts @@ -177,7 +177,7 @@ export function createWorkflowCtx< DataModel extends GenericDataModel = GenericDataModel, >( workflowId: WorkflowId, - sender: BaseChannel, + sender: BaseChannel>, ): WorkflowCtx { return { workflowId, @@ -261,8 +261,9 @@ export function createWorkflowCtx< async function runFunction< F extends FunctionReference, + DM extends GenericDataModel, >( - sender: BaseChannel, + sender: BaseChannel>, functionType: FunctionType, f: F, args: Record | undefined, @@ -294,9 +295,9 @@ async function runFunction< }); } -async function run( - sender: BaseChannel, - request: Omit, +async function run( + sender: BaseChannel>, + request: Omit, "resolve">, ): Promise { let send: Promise; const p = new Promise((resolve) => { diff --git a/src/client/workflowMutation.ts b/src/client/workflowMutation.ts index 7884ec5e..51edfa0e 100644 --- a/src/client/workflowMutation.ts +++ b/src/client/workflowMutation.ts @@ -132,7 +132,7 @@ export function workflowMutation< `Assertion failed: not blocked but have in-progress journal entry`, ); } - const channel = new BaseChannel( + const channel = new BaseChannel>( workpoolOptions.maxParallelism ?? 10, ); const step = createWorkflowCtx(workflowId, channel); From 5dcfa27cfff3bbf0622feebf97dc015cd949c7f9 Mon Sep 17 00:00:00 2001 From: Ian Macartney <366683+ianmacartney@users.noreply.github.com> Date: Fri, 3 Apr 2026 17:31:48 -0700 Subject: [PATCH 05/11] format --- src/client/stepContext.test.ts | 10 +- src/client/workflowContext.ts | 233 ++++++++++++++++----------------- 2 files changed, 122 insertions(+), 121 deletions(-) diff --git a/src/client/stepContext.test.ts b/src/client/stepContext.test.ts index d1a5e78b..1cb72ad5 100644 --- a/src/client/stepContext.test.ts +++ b/src/client/stepContext.test.ts @@ -380,9 +380,11 @@ describe("StepExecutor + WorkflowCtx integration", () => { }); const [error] = await Promise.all([ - ctx.run(async () => { - throw new Error("inline boom"); - }).catch((e: Error) => e), + ctx + .run(async () => { + throw new Error("inline boom"); + }) + .catch((e: Error) => e), replayFromJournal(channel, [entry]), ]); @@ -422,7 +424,7 @@ describe("StepExecutor + WorkflowCtx integration", () => { journalEntry({ name: "run", functionType: "mutation", - handle: "inline", + handle: "inline", args: {}, runResult: { kind: "success", returnValue: "inline-result" }, stepNumber: 0, diff --git a/src/client/workflowContext.ts b/src/client/workflowContext.ts index bf5134ac..d9535688 100644 --- a/src/client/workflowContext.ts +++ b/src/client/workflowContext.ts @@ -41,129 +41,128 @@ type InlineArgs = inline?: false; }; -export type WorkflowCtx< - DataModel extends GenericDataModel = GenericDataModel, -> = { - /** - * The ID of the workflow currently running. - */ - workflowId: WorkflowId; - /** - * Run a query with the given name and arguments. - * - * @param query - The query to run, like `internal.index.exampleQuery`. - * @param args - The arguments to the query function. - * @param opts - Options for scheduling and naming the query. - */ - runQuery>( - query: Query, - ...args: OptionalRestArgs - ): Promise>; +export type WorkflowCtx = + { + /** + * The ID of the workflow currently running. + */ + workflowId: WorkflowId; + /** + * Run a query with the given name and arguments. + * + * @param query - The query to run, like `internal.index.exampleQuery`. + * @param args - The arguments to the query function. + * @param opts - Options for scheduling and naming the query. + */ + runQuery>( + query: Query, + ...args: OptionalRestArgs + ): Promise>; - /** - * Run a mutation with the given name and arguments. - * - * @param mutation - The mutation to run, like `internal.index.exampleMutation`. - * @param args - The arguments to the mutation function. - * @param opts - Options for scheduling and naming the mutation. - */ - runMutation< - Mutation extends FunctionReference<"mutation", FunctionVisibility>, - >( - mutation: Mutation, - ...args: OptionalRestArgs - ): Promise>; + /** + * Run a mutation with the given name and arguments. + * + * @param mutation - The mutation to run, like `internal.index.exampleMutation`. + * @param args - The arguments to the mutation function. + * @param opts - Options for scheduling and naming the mutation. + */ + runMutation< + Mutation extends FunctionReference<"mutation", FunctionVisibility>, + >( + mutation: Mutation, + ...args: OptionalRestArgs + ): Promise>; - /** - * Run an action with the given name and arguments. - * - * @param action - The action to run, like `internal.index.exampleAction`. - * @param args - The arguments to the action function. - * @param opts - Options for retrying, scheduling and naming the action. - */ - runAction>( - action: Action, - ...args: OptionalRestArgs - ): Promise>; + /** + * Run an action with the given name and arguments. + * + * @param action - The action to run, like `internal.index.exampleAction`. + * @param args - The arguments to the action function. + * @param opts - Options for retrying, scheduling and naming the action. + */ + runAction>( + action: Action, + ...args: OptionalRestArgs + ): Promise>; - /** - * Run a workflow with the given name and arguments. - * - * @param workflow - The workflow to run, like `internal.index.exampleWorkflow`. - * @param args - The arguments to the workflow function. - * @param opts - Options for retrying, scheduling and naming the workflow. - */ - runWorkflow>( - workflow: Workflow, - args: FunctionArgs["args"], - opts?: RunOptions, - ): Promise>; + /** + * Run a workflow with the given name and arguments. + * + * @param workflow - The workflow to run, like `internal.index.exampleWorkflow`. + * @param args - The arguments to the workflow function. + * @param opts - Options for retrying, scheduling and naming the workflow. + */ + runWorkflow>( + workflow: Workflow, + args: FunctionArgs["args"], + opts?: RunOptions, + ): Promise>; - /** - * Run a handler inline within the workflow's mutation transaction. - * The result is journaled like any other step, so on replay it returns the - * saved value without re-executing the handler. - * - * This gives direct access to the underlying mutation context, allowing - * database reads/writes, running queries, and scheduling functions all - * within the same transaction as the workflow handler. - * - * The handler can read from variables in the enclosing scope, but should - * not modify them — on replay the handler is skipped and the journaled - * result is returned, so any side effects outside the handler's return - * value will not be replayed. - * - * To get a fully typed `ctx` with your data model, provide your app's - * `internalMutation` in the workflow definition: - * - * ```ts - * import { internalMutation } from "./_generated/server"; - * workflow.define({ - * internalMutation, - * handler: async (ctx, args) => { - * const user = await ctx.run(async (ctx) => { - * return ctx.db.query("users").first(); // fully typed - * }); - * }, - * }); - * ``` - * - * @param handler - A function receiving the mutation context to run inline. - * @param opts - Options for naming the step. - */ - run( - handler: (ctx: GenericMutationCtx) => T | Promise, - opts?: { name?: string }, - ): Promise; + /** + * Run a handler inline within the workflow's mutation transaction. + * The result is journaled like any other step, so on replay it returns the + * saved value without re-executing the handler. + * + * This gives direct access to the underlying mutation context, allowing + * database reads/writes, running queries, and scheduling functions all + * within the same transaction as the workflow handler. + * + * The handler can read from variables in the enclosing scope, but should + * not modify them — on replay the handler is skipped and the journaled + * result is returned, so any side effects outside the handler's return + * value will not be replayed. + * + * To get a fully typed `ctx` with your data model, provide your app's + * `internalMutation` in the workflow definition: + * + * ```ts + * import { internalMutation } from "./_generated/server"; + * workflow.define({ + * internalMutation, + * handler: async (ctx, args) => { + * const user = await ctx.run(async (ctx) => { + * return ctx.db.query("users").first(); // fully typed + * }); + * }, + * }); + * ``` + * + * @param handler - A function receiving the mutation context to run inline. + * @param opts - Options for naming the step. + */ + run( + handler: (ctx: GenericMutationCtx) => T | Promise, + opts?: { name?: string }, + ): Promise; - /** - * Blocks until a matching event is sent to this workflow. - * - * If an ID is specified, an event with that ID must already exist and must - * not already be "awaited" or "consumed". - * - * If a name is specified, the first available event is consumed that matches - * the name. If there is no available event, it will create one with that name - * with status "awaited". - * @param event - */ - awaitEvent( - event: ( - | { name: Name; id?: EventId } - | { name?: Name; id: EventId } - ) & { - validator?: Validator; - }, - ): Promise; + /** + * Blocks until a matching event is sent to this workflow. + * + * If an ID is specified, an event with that ID must already exist and must + * not already be "awaited" or "consumed". + * + * If a name is specified, the first available event is consumed that matches + * the name. If there is no available event, it will create one with that name + * with status "awaited". + * @param event + */ + awaitEvent( + event: ( + | { name: Name; id?: EventId } + | { name?: Name; id: EventId } + ) & { + validator?: Validator; + }, + ): Promise; - /** - * Suspend execution for the given duration. - * - * @param duration - The number of milliseconds to sleep. - * @param opts - Optionally name the step. Default: "sleep" - */ - sleep(duration: number, opts?: { name?: string }): Promise; -}; + /** + * Suspend execution for the given duration. + * + * @param duration - The number of milliseconds to sleep. + * @param opts - Optionally name the step. Default: "sleep" + */ + sleep(duration: number, opts?: { name?: string }): Promise; + }; export type OptionalRestArgs< Opts, From f758fad93c1c6e144e080b1a555d88479f5ad284 Mon Sep 17 00:00:00 2001 From: Ian Macartney <366683+ianmacartney@users.noreply.github.com> Date: Fri, 3 Apr 2026 17:33:30 -0700 Subject: [PATCH 06/11] guard --- src/client/stepContext.test.ts | 114 +++++++++++++++++++++++++++++++++ src/client/workflowContext.ts | 30 ++++++++- 2 files changed, 141 insertions(+), 3 deletions(-) diff --git a/src/client/stepContext.test.ts b/src/client/stepContext.test.ts index 1cb72ad5..efa7c908 100644 --- a/src/client/stepContext.test.ts +++ b/src/client/stepContext.test.ts @@ -450,6 +450,120 @@ describe("StepExecutor + WorkflowCtx integration", () => { expect(results).toEqual(["inline-result", "action-result"]); }); + + it("throws when calling step methods inside ctx.run()", async () => { + const channel = new BaseChannel(0); + const ctx = createWorkflowCtx("wf-15" as any, channel); + + // Read the inline message from the channel, invoke its handler (which + // sets the lock), and resolve based on the handler outcome. + const executeInline = async () => { + const message = await channel.get(); + if (message.target.kind !== "inline") throw new Error("expected inline"); + try { + const result = await message.target.handler({} as any); + message.resolve({ kind: "success", returnValue: result }); + } catch (e) { + message.resolve({ + kind: "failed", + error: (e as Error).message, + }); + } + }; + + const [error] = await Promise.all([ + ctx + .run(async () => { + // This should throw — the guard fires before the channel push. + await ctx.runMutation(fakeFuncRef("bad") as any, {}); + }) + .catch((e: Error) => e), + executeInline(), + ]); + + expect(error).toBeInstanceOf(Error); + expect((error as Error).message).toMatch( + /Cannot call step methods inside a step\.run\(\) handler/, + ); + }); + + it("ctx.run() works normally after a previous ctx.run() completes", async () => { + const channel = new BaseChannel(0); + const ctx = createWorkflowCtx("wf-16" as any, channel); + + const entries = [ + journalEntry({ + name: "run", + functionType: "mutation", + handle: "inline", + args: {}, + runResult: { kind: "success", returnValue: "first" }, + stepNumber: 0, + }), + journalEntry({ + name: "run", + functionType: "mutation", + handle: "inline", + args: {}, + runResult: { kind: "success", returnValue: "second" }, + stepNumber: 1, + }), + ]; + + const handler = async () => { + const a = await ctx.run(async () => "first"); + const b = await ctx.run(async () => "second"); + return [a, b]; + }; + + const [results] = await Promise.all([ + handler(), + replayFromJournal(channel, entries), + ]); + + expect(results).toEqual(["first", "second"]); + }); + + it("resets the lock flag even when the inline handler throws", async () => { + const channel = new BaseChannel(0); + const ctx = createWorkflowCtx("wf-17" as any, channel); + + const entries = [ + journalEntry({ + name: "run", + functionType: "mutation", + handle: "inline", + args: {}, + runResult: { kind: "failed", error: "handler error" }, + stepNumber: 0, + }), + journalEntry({ + name: "step2", + args: {}, + runResult: { kind: "success", returnValue: "ok" }, + stepNumber: 1, + }), + ]; + + const handler = async () => { + try { + await ctx.run(async () => { + throw new Error("handler error"); + }); + } catch { + // expected + } + // This should work — the lock must have been released by try/finally. + return ctx.runAction(fakeFuncRef("step2") as any, {}); + }; + + const [result] = await Promise.all([ + handler(), + replayFromJournal(channel, entries), + ]); + + expect(result).toBe("ok"); + }); }); describe("unstableArgs", () => { diff --git a/src/client/workflowContext.ts b/src/client/workflowContext.ts index d9535688..4a0c1c5b 100644 --- a/src/client/workflowContext.ts +++ b/src/client/workflowContext.ts @@ -178,37 +178,59 @@ export function createWorkflowCtx< workflowId: WorkflowId, sender: BaseChannel>, ): WorkflowCtx { + let locked = false; + const guardNotInlined = () => { + if (locked) { + throw new Error( + "Cannot call step methods inside a step.run() handler. " + + "Use the `ctx` argument passed to the handler instead, or " + + "move this call outside of step.run().", + ); + } + }; + return { workflowId, runQuery: async (query, args, opts?) => { + guardNotInlined(); return runFunction(sender, "query", query, args, opts); }, runMutation: async (mutation, args, opts?) => { + guardNotInlined(); return runFunction(sender, "mutation", mutation, args, opts); }, runAction: async (action, args, opts?) => { + guardNotInlined(); return runFunction(sender, "action", action, args, opts); }, run: async (handler, opts?) => { + guardNotInlined(); return run(sender, { name: opts?.name ?? "run", target: { kind: "inline", - handler: handler as unknown as ( - ctx: GenericMutationCtx, - ) => Promise, + handler: async (ctx: GenericMutationCtx) => { + locked = true; + try { + return await handler(ctx); + } finally { + locked = false; + } + }, args: {} as Record, }, retry: undefined, inline: true, schedulerOptions: {}, + unstableArgs: false, }) as any; }, runWorkflow: async (workflow, args, opts?) => { + guardNotInlined(); const { name, unstableArgs, ...schedulerOptions } = opts ?? {}; return run(sender, { name: name ?? safeFunctionName(workflow), @@ -225,6 +247,7 @@ export function createWorkflowCtx< }, sleep: async (duration, opts?) => { + guardNotInlined(); await run(sender, { name: opts?.name ?? "sleep", target: { @@ -239,6 +262,7 @@ export function createWorkflowCtx< }, awaitEvent: async (event) => { + guardNotInlined(); const result = await run(sender, { name: event.name ?? event.id ?? "Event", target: { From 7ae937f379d62ceb65cd0d596c03bf8663ef1121 Mon Sep 17 00:00:00 2001 From: Ian Macartney <366683+ianmacartney@users.noreply.github.com> Date: Fri, 3 Apr 2026 17:40:50 -0700 Subject: [PATCH 07/11] add example using scheduler to timeout an event --- example/convex/_generated/api.d.ts | 2 + example/convex/eventTimeout.ts | 130 +++++++++++++++++++++++++++++ 2 files changed, 132 insertions(+) create mode 100644 example/convex/eventTimeout.ts diff --git a/example/convex/_generated/api.d.ts b/example/convex/_generated/api.d.ts index 3c3163ae..3be30f6b 100644 --- a/example/convex/_generated/api.d.ts +++ b/example/convex/_generated/api.d.ts @@ -11,6 +11,7 @@ import type * as admin from "../admin.js"; import type * as catchError from "../catchError.js"; import type * as e2e from "../e2e.js"; +import type * as eventTimeout from "../eventTimeout.js"; import type * as example from "../example.js"; import type * as inlineTest from "../inlineTest.js"; import type * as nestedWorkflow from "../nestedWorkflow.js"; @@ -30,6 +31,7 @@ declare const fullApi: ApiFromModules<{ admin: typeof admin; catchError: typeof catchError; e2e: typeof e2e; + eventTimeout: typeof eventTimeout; example: typeof example; inlineTest: typeof inlineTest; nestedWorkflow: typeof nestedWorkflow; diff --git a/example/convex/eventTimeout.ts b/example/convex/eventTimeout.ts new file mode 100644 index 00000000..9bcb6dea --- /dev/null +++ b/example/convex/eventTimeout.ts @@ -0,0 +1,130 @@ +import { + defineEvent, + type EventId, + vEventId, + vWorkflowId, + WorkflowManager, + type WorkflowId, +} from "@convex-dev/workflow"; +import { v } from "convex/values"; +import { components, internal } from "./_generated/api"; +import { internalMutation } from "./_generated/server"; + +const workflow = new WorkflowManager(components.workflow); + +const approvalEvent = defineEvent({ + name: "approval", + validator: v.union( + v.object({ kind: v.literal("approved") }), + v.object({ kind: v.literal("timeout") }), + ), +}); + +/** + * A workflow that waits for an approval event but times out after 30 seconds. + * + * It uses `step.run()` to schedule a timeout function via `ctx.scheduler`, + * then awaits the event. If the event resolves with a real approval, the + * timeout function is canceled. If the timeout fires first, the workflow + * handles it gracefully. + */ +export const eventTimeoutWorkflow = workflow.define({ + args: {}, + returns: v.string(), + internalMutation, + handler: async (step): Promise => { + // 1. Create the event so we have an ID to pass to the timeout function. + const eventId = await step.runMutation( + internal.eventTimeout.createApprovalEvent, + { workflowId: step.workflowId }, + ); + + // 2. Schedule a function that will complete the event with { kind: "timeout" } + // after 30 seconds, unless it's canceled first. + const scheduledFnId = await step.run( + async (ctx) => { + return ctx.scheduler.runAfter( + 30_000, + internal.eventTimeout.timeoutEvent, + { eventId }, + ); + }, + { name: "scheduleTimeout" }, + ); + + // 3. Wait for the event — either a real approval or the timeout. + const result = await step.awaitEvent({ ...approvalEvent, id: eventId }); + + // 4. If we got a real approval, cancel the scheduled timeout function. + if (result.kind === "approved") { + await step.run( + async (ctx) => { + const scheduled = await ctx.db.system.get(scheduledFnId); + if (scheduled?.state.kind === "pending") + await ctx.scheduler.cancel(scheduledFnId); + }, + { name: "cancelTimeout" }, + ); + return "approved"; + } + + return "timed out"; + }, +}); + +// ── Helper mutations ────────────────────────── + +export const createApprovalEvent = internalMutation({ + args: { workflowId: vWorkflowId }, + returns: vEventId("approval"), + handler: async (ctx, args): Promise> => { + return await workflow.createEvent(ctx, { + name: "approval", + workflowId: args.workflowId, + }); + }, +}); + +export const timeoutEvent = internalMutation({ + args: { eventId: vEventId("approval") }, + handler: async (ctx, args) => { + await workflow.sendEvent(ctx, { + ...approvalEvent, + id: args.eventId, + value: { kind: "timeout" }, + }); + }, +}); + +export const approve = internalMutation({ + args: { eventId: vEventId("approval") }, + handler: async (ctx, args) => { + await workflow.sendEvent(ctx, { + ...approvalEvent, + id: args.eventId, + value: { kind: "approved" }, + }); + }, +}); + +/** + * Test this from the CLI: + * ```sh + * npx convex run eventTimeout:startEventTimeout + * ``` + * Then either approve before 30s: + * ```sh + * npx convex run eventTimeout:approve '{"eventId":"..."}' + * ``` + * Or wait 30s for the timeout to fire automatically. + */ +export const startEventTimeout = internalMutation({ + args: {}, + handler: async (ctx): Promise => { + return await workflow.start( + ctx, + internal.eventTimeout.eventTimeoutWorkflow, + {}, + ); + }, +}); From 1afbaee177e0ed8aaf24dde3f1cd3b7b51e06b94 Mon Sep 17 00:00:00 2001 From: Ian Macartney <366683+ianmacartney@users.noreply.github.com> Date: Fri, 3 Apr 2026 17:42:36 -0700 Subject: [PATCH 08/11] handle parallel step.run executions --- src/client/stepContext.test.ts | 82 ++++++++++++++++++++++++++++++++++ src/client/workflowContext.ts | 8 ++-- 2 files changed, 86 insertions(+), 4 deletions(-) diff --git a/src/client/stepContext.test.ts b/src/client/stepContext.test.ts index efa7c908..d27a0bd8 100644 --- a/src/client/stepContext.test.ts +++ b/src/client/stepContext.test.ts @@ -564,6 +564,88 @@ describe("StepExecutor + WorkflowCtx integration", () => { expect(result).toBe("ok"); }); + + it("parallel step.run() calls via Promise.all work correctly", async () => { + const channel = new BaseChannel(0); + const ctx = createWorkflowCtx("wf-18" as any, channel); + + // Simulate the executor batching and running both inline handlers + // concurrently (as the real executor does via Promise.all). + const executeInlines = async () => { + const msg1 = await channel.get(); + const msg2 = await channel.get(); + // Run both handlers concurrently, just like the real executor. + await Promise.all( + [msg1, msg2].map(async (msg) => { + if (msg.target.kind !== "inline") + throw new Error("expected inline"); + try { + const result = await msg.target.handler({} as any); + msg.resolve({ kind: "success", returnValue: result }); + } catch (e) { + msg.resolve({ kind: "failed", error: (e as Error).message }); + } + }), + ); + }; + + const [results] = await Promise.all([ + Promise.all([ + ctx.run(async () => "a"), + ctx.run(async () => "b"), + ]), + executeInlines(), + ]); + + expect(results).toEqual(["a", "b"]); + }); + + it("guard still fires inside each parallel step.run() handler", async () => { + const channel = new BaseChannel(0); + const ctx = createWorkflowCtx("wf-19" as any, channel); + + const executeInlines = async () => { + const msg1 = await channel.get(); + const msg2 = await channel.get(); + await Promise.all( + [msg1, msg2].map(async (msg) => { + if (msg.target.kind !== "inline") + throw new Error("expected inline"); + try { + const result = await msg.target.handler({} as any); + msg.resolve({ kind: "success", returnValue: result }); + } catch (e) { + msg.resolve({ kind: "failed", error: (e as Error).message }); + } + }), + ); + }; + + const [errors] = await Promise.all([ + Promise.all([ + ctx + .run(async () => { + await ctx.runMutation(fakeFuncRef("bad1") as any, {}); + }) + .catch((e: Error) => e), + ctx + .run(async () => { + await ctx.runMutation(fakeFuncRef("bad2") as any, {}); + }) + .catch((e: Error) => e), + ]), + executeInlines(), + ]); + + expect(errors[0]).toBeInstanceOf(Error); + expect((errors[0] as Error).message).toMatch( + /Cannot call step methods inside a step\.run\(\) handler/, + ); + expect(errors[1]).toBeInstanceOf(Error); + expect((errors[1] as Error).message).toMatch( + /Cannot call step methods inside a step\.run\(\) handler/, + ); + }); }); describe("unstableArgs", () => { diff --git a/src/client/workflowContext.ts b/src/client/workflowContext.ts index 4a0c1c5b..f189894b 100644 --- a/src/client/workflowContext.ts +++ b/src/client/workflowContext.ts @@ -178,9 +178,9 @@ export function createWorkflowCtx< workflowId: WorkflowId, sender: BaseChannel>, ): WorkflowCtx { - let locked = false; + let inlineDepth = 0; const guardNotInlined = () => { - if (locked) { + if (inlineDepth > 0) { throw new Error( "Cannot call step methods inside a step.run() handler. " + "Use the `ctx` argument passed to the handler instead, or " + @@ -213,11 +213,11 @@ export function createWorkflowCtx< target: { kind: "inline", handler: async (ctx: GenericMutationCtx) => { - locked = true; + inlineDepth++; try { return await handler(ctx); } finally { - locked = false; + inlineDepth--; } }, args: {} as Record, From ed25c97d775653cf84d85ef89628e1c55baabcd4 Mon Sep 17 00:00:00 2001 From: Ian Macartney <366683+ianmacartney@users.noreply.github.com> Date: Fri, 3 Apr 2026 18:36:12 -0700 Subject: [PATCH 09/11] allow specifying deps for inline run step --- src/client/step.ts | 2 +- src/client/stepContext.test.ts | 64 ++++++++++++++++++++++++++++++++++ src/client/workflowContext.ts | 16 +++++++-- 3 files changed, 78 insertions(+), 4 deletions(-) diff --git a/src/client/step.ts b/src/client/step.ts index b9a059f0..0bf6118d 100644 --- a/src/client/step.ts +++ b/src/client/step.ts @@ -48,7 +48,7 @@ export type StepRequest = { | { kind: "inline"; handler: (ctx: GenericMutationCtx) => Promise; - args: Record; + args: Record; }; retry: RetryBehavior | boolean | undefined; inline: boolean; diff --git a/src/client/stepContext.test.ts b/src/client/stepContext.test.ts index d27a0bd8..be7fbd28 100644 --- a/src/client/stepContext.test.ts +++ b/src/client/stepContext.test.ts @@ -646,6 +646,70 @@ describe("StepExecutor + WorkflowCtx integration", () => { /Cannot call step methods inside a step\.run\(\) handler/, ); }); + + it("ctx.run() journals deps and replays when they match", async () => { + const channel = new BaseChannel(0); + const ctx = createWorkflowCtx("wf-20" as any, channel); + + const entry = journalEntry({ + name: "run", + functionType: "mutation", + handle: "inline", + args: { userId: "u1", count: 3 }, + runResult: { kind: "success", returnValue: "done" }, + }); + + const [result] = await Promise.all([ + ctx.run(async () => "done", { + deps: { userId: "u1", count: 3 }, + }), + replayFromJournal(channel, [entry]), + ]); + + expect(result).toBe("done"); + }); + + it("ctx.run() sends deps through the channel as args", async () => { + const channel = new BaseChannel(0); + const ctx = createWorkflowCtx("wf-21" as any, channel); + + const deps = { userId: "u1", count: 3 }; + + // Read the message from the channel and verify the args match deps. + const inspectMessage = async () => { + const message = await channel.get(); + expect(message.target.args).toEqual(deps); + expect(message.target.kind).toBe("inline"); + message.resolve({ kind: "success", returnValue: "ok" }); + }; + + const [result] = await Promise.all([ + ctx.run(async () => "ok", { deps }), + inspectMessage(), + ]); + + expect(result).toBe("ok"); + }); + + it("ctx.run() without deps still journals empty args", async () => { + const channel = new BaseChannel(0); + const ctx = createWorkflowCtx("wf-22" as any, channel); + + const entry = journalEntry({ + name: "run", + functionType: "mutation", + handle: "inline", + args: {}, + runResult: { kind: "success", returnValue: 42 }, + }); + + const [result] = await Promise.all([ + ctx.run(async () => 42), + replayFromJournal(channel, [entry]), + ]); + + expect(result).toBe(42); + }); }); describe("unstableArgs", () => { diff --git a/src/client/workflowContext.ts b/src/client/workflowContext.ts index f189894b..e8498ee1 100644 --- a/src/client/workflowContext.ts +++ b/src/client/workflowContext.ts @@ -128,11 +128,21 @@ export type WorkflowCtx = * ``` * * @param handler - A function receiving the mutation context to run inline. - * @param opts - Options for naming the step. + * @param opts - Options for naming the step and declaring dependencies. */ run( handler: (ctx: GenericMutationCtx) => T | Promise, - opts?: { name?: string }, + opts?: { + name?: string; + /** + * Dependencies that are validated and journaled as part of this step. + * On replay, the saved deps are compared against the current deps — + * if they differ, the workflow detects a mismatch and re-executes. + * Use this to capture values from the enclosing scope that the handler + * depends on. + */ + deps?: Record; + }, ): Promise; /** @@ -220,7 +230,7 @@ export function createWorkflowCtx< inlineDepth--; } }, - args: {} as Record, + args: opts?.deps ?? {}, }, retry: undefined, inline: true, From f12a2385c98e6d85449f7a8a695a70e8f4ce69db Mon Sep 17 00:00:00 2001 From: Ian Macartney <366683+ianmacartney@users.noreply.github.com> Date: Fri, 10 Apr 2026 02:09:12 -0700 Subject: [PATCH 10/11] have internalMutation be a function on WorkflowManager --- example/convex/eventTimeout.ts | 5 ++- example/convex/example.ts | 5 ++- src/client/index.ts | 68 ++++++++++++++++------------------ src/client/workflowContext.ts | 9 +---- src/client/workflowMutation.ts | 4 +- 5 files changed, 43 insertions(+), 48 deletions(-) diff --git a/example/convex/eventTimeout.ts b/example/convex/eventTimeout.ts index 9bcb6dea..2137e3ea 100644 --- a/example/convex/eventTimeout.ts +++ b/example/convex/eventTimeout.ts @@ -10,7 +10,9 @@ import { v } from "convex/values"; import { components, internal } from "./_generated/api"; import { internalMutation } from "./_generated/server"; -const workflow = new WorkflowManager(components.workflow); +const workflow = new WorkflowManager(components.workflow, { + internalMutation, +}); const approvalEvent = defineEvent({ name: "approval", @@ -31,7 +33,6 @@ const approvalEvent = defineEvent({ export const eventTimeoutWorkflow = workflow.define({ args: {}, returns: v.string(), - internalMutation, handler: async (step): Promise => { // 1. Create the event so we have an ID to pass to the timeout function. const eventId = await step.runMutation( diff --git a/example/convex/example.ts b/example/convex/example.ts index e3f7026e..05222345 100644 --- a/example/convex/example.ts +++ b/example/convex/example.ts @@ -6,14 +6,15 @@ import { components } from "./_generated/api.js"; import { vWorkflowId } from "@convex-dev/workflow"; import { vResultValidator } from "@convex-dev/workpool"; -export const workflow = new WorkflowManager(components.workflow); +export const workflow = new WorkflowManager(components.workflow, { + internalMutation, +}); export const myWorkflow = workflow .define({ args: { location: v.string(), }, - internalMutation, workpoolOptions: { retryActionsByDefault: true, }, diff --git a/src/client/index.ts b/src/client/index.ts index 79dc505b..e33b0ad0 100644 --- a/src/client/index.ts +++ b/src/client/index.ts @@ -78,36 +78,16 @@ export type CallbackOptions = { export type WorkflowDefinition< ArgsValidator extends PropertyValidators, ReturnsValidator extends Validator | void = any, - DataModel extends GenericDataModel = GenericDataModel, > = { args?: ArgsValidator; returns?: ReturnsValidator; workpoolOptions?: WorkpoolRetryOptions; - /** - * Provide your app's `internalMutation` (from `_generated/server`) to get - * a fully typed `ctx` in `ctx.run()` handlers, with your data model's - * tables available on `ctx.db`. This also lets any custom middleware - * you've configured run around the workflow. - * - * ```ts - * import { internalMutation } from "./_generated/server"; - * workflow.define({ - * internalMutation, - * handler: async (ctx, args) => { - * const user = await ctx.run(async (ctx) => { - * return ctx.db.query("users").first(); // fully typed - * }); - * }, - * }); - * ``` - */ - internalMutation?: MutationBuilder; }; export type WorkflowHandler< ArgsValidator extends PropertyValidators, ReturnsValidator extends Validator | void, - DataModel extends GenericDataModel, + DataModel extends GenericDataModel = GenericDataModel, > = ( step: WorkflowCtx, args: ObjectType, @@ -142,7 +122,9 @@ export function defineWorkflow< DM extends GenericDataModel = GenericDataModel, >( component: WorkflowComponent, - config: WorkflowDefinition, + config: WorkflowDefinition & { + internalMutation?: MutationBuilder; + }, ): { /** * Define the workflow handler function. @@ -479,11 +461,27 @@ export async function cleanup( }); } -export class WorkflowManager { +export class WorkflowManager< + DataModel extends GenericDataModel = GenericDataModel, +> { constructor( public component: WorkflowComponent, public options?: { - workpoolOptions: WorkpoolOptions; + workpoolOptions?: WorkpoolOptions; + /** + * Provide your app's `internalMutation` (from `_generated/server`) to get + * a fully typed `ctx` in `step.run()` handlers, with your data model's + * tables available on `ctx.db`. This also lets any custom middleware + * you've configured run around the workflow. + * + * ```ts + * import { internalMutation } from "./_generated/server"; + * const workflow = new WorkflowManager(components.workflow, { + * internalMutation, + * }); + * ``` + */ + internalMutation?: MutationBuilder; }, ) {} @@ -496,9 +494,8 @@ export class WorkflowManager { define< ArgsValidator extends PropertyValidators, ReturnsValidator extends Validator | void, - DataModel extends GenericDataModel = GenericDataModel, >( - workflow: WorkflowDefinition & { + workflow: WorkflowDefinition & { handler: WorkflowHandler; }, ): RegisteredMutation< @@ -509,9 +506,8 @@ export class WorkflowManager { define< ArgsValidator extends PropertyValidators, ReturnsValidator extends Validator | void, - DataModel extends GenericDataModel = GenericDataModel, >( - workflow: WorkflowDefinition, + workflow: WorkflowDefinition, ): { /** * Define the workflow handler function. @@ -531,21 +527,21 @@ export class WorkflowManager { define< ArgsValidator extends PropertyValidators, ReturnsValidator extends Validator | void, - DataModel extends GenericDataModel = GenericDataModel, >( - workflow: WorkflowDefinition & { + workflow: WorkflowDefinition & { handler?: WorkflowHandler; }, ): unknown { + const withMutation = { + ...workflow, + internalMutation: this.options?.internalMutation, + }; if (workflow.handler) { return workflowMutation( this.component, - workflow as WorkflowDefinition< - ArgsValidator, - ReturnsValidator, - DataModel - > & { + withMutation as WorkflowDefinition & { handler: WorkflowHandler; + internalMutation?: MutationBuilder; }, this.options?.workpoolOptions, ); @@ -557,7 +553,7 @@ export class WorkflowManager { return defineWorkflow( this.component, { - ...workflow, + ...withMutation, workpoolOptions: { ...this.options?.workpoolOptions, ...workflow.workpoolOptions, diff --git a/src/client/workflowContext.ts b/src/client/workflowContext.ts index e8498ee1..0bc8d7eb 100644 --- a/src/client/workflowContext.ts +++ b/src/client/workflowContext.ts @@ -113,17 +113,12 @@ export type WorkflowCtx = * value will not be replayed. * * To get a fully typed `ctx` with your data model, provide your app's - * `internalMutation` in the workflow definition: + * `internalMutation` when creating the `WorkflowManager`: * * ```ts * import { internalMutation } from "./_generated/server"; - * workflow.define({ + * const workflow = new WorkflowManager(components.workflow, { * internalMutation, - * handler: async (ctx, args) => { - * const user = await ctx.run(async (ctx) => { - * return ctx.db.query("users").first(); // fully typed - * }); - * }, * }); * ``` * diff --git a/src/client/workflowMutation.ts b/src/client/workflowMutation.ts index 51edfa0e..2b281f1d 100644 --- a/src/client/workflowMutation.ts +++ b/src/client/workflowMutation.ts @@ -6,6 +6,7 @@ import { internalMutationGeneric, makeFunctionReference, type GenericDataModel, + type MutationBuilder, type RegisteredMutation, } from "convex/server"; import { @@ -53,8 +54,9 @@ export function workflowMutation< DataModel extends GenericDataModel = GenericDataModel, >( component: WorkflowComponent, - registered: WorkflowDefinition & { + registered: WorkflowDefinition & { handler: WorkflowHandler; + internalMutation?: MutationBuilder; }, defaultWorkpoolOptions?: WorkpoolOptions, boundFn?: string, From 03e100aca08e52e4a61b989371ec06c893cbcacc Mon Sep 17 00:00:00 2001 From: Ian Macartney <366683+ianmacartney@users.noreply.github.com> Date: Wed, 15 Apr 2026 21:45:52 -0700 Subject: [PATCH 11/11] unstableArgs for inline --- src/client/workflowContext.ts | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/client/workflowContext.ts b/src/client/workflowContext.ts index 0bc8d7eb..5be8be1d 100644 --- a/src/client/workflowContext.ts +++ b/src/client/workflowContext.ts @@ -135,6 +135,8 @@ export type WorkflowCtx = * if they differ, the workflow detects a mismatch and re-executes. * Use this to capture values from the enclosing scope that the handler * depends on. + * If you pass {}, it will not check for dependency mismatches, akin to + * unstableArgs in step.run* */ deps?: Record; }, @@ -213,6 +215,10 @@ export function createWorkflowCtx< run: async (handler, opts?) => { guardNotInlined(); + // allow {} to behave like unstableArgs + const unstableArgs = + typeof opts?.deps === "object" && Object.keys(opts.deps).length === 0; + return run(sender, { name: opts?.name ?? "run", target: { @@ -230,7 +236,7 @@ export function createWorkflowCtx< retry: undefined, inline: true, schedulerOptions: {}, - unstableArgs: false, + unstableArgs, }) as any; },