diff --git a/example/convex/_generated/api.d.ts b/example/convex/_generated/api.d.ts index 3c3163a..3be30f6 100644 --- a/example/convex/_generated/api.d.ts +++ b/example/convex/_generated/api.d.ts @@ -11,6 +11,7 @@ import type * as admin from "../admin.js"; import type * as catchError from "../catchError.js"; import type * as e2e from "../e2e.js"; +import type * as eventTimeout from "../eventTimeout.js"; import type * as example from "../example.js"; import type * as inlineTest from "../inlineTest.js"; import type * as nestedWorkflow from "../nestedWorkflow.js"; @@ -30,6 +31,7 @@ declare const fullApi: ApiFromModules<{ admin: typeof admin; catchError: typeof catchError; e2e: typeof e2e; + eventTimeout: typeof eventTimeout; example: typeof example; inlineTest: typeof inlineTest; nestedWorkflow: typeof nestedWorkflow; diff --git a/example/convex/eventTimeout.ts b/example/convex/eventTimeout.ts new file mode 100644 index 0000000..2137e3e --- /dev/null +++ b/example/convex/eventTimeout.ts @@ -0,0 +1,131 @@ +import { + defineEvent, + type EventId, + vEventId, + vWorkflowId, + WorkflowManager, + type WorkflowId, +} from "@convex-dev/workflow"; +import { v } from "convex/values"; +import { components, internal } from "./_generated/api"; +import { internalMutation } from "./_generated/server"; + +const workflow = new WorkflowManager(components.workflow, { + internalMutation, +}); + +const approvalEvent = defineEvent({ + name: "approval", + validator: v.union( + v.object({ kind: v.literal("approved") }), + v.object({ kind: v.literal("timeout") }), + ), +}); + +/** + * A workflow that waits for an approval event but times out after 30 seconds. + * + * It uses `step.run()` to schedule a timeout function via `ctx.scheduler`, + * then awaits the event. If the event resolves with a real approval, the + * timeout function is canceled. If the timeout fires first, the workflow + * handles it gracefully. + */ +export const eventTimeoutWorkflow = workflow.define({ + args: {}, + returns: v.string(), + handler: async (step): Promise => { + // 1. Create the event so we have an ID to pass to the timeout function. + const eventId = await step.runMutation( + internal.eventTimeout.createApprovalEvent, + { workflowId: step.workflowId }, + ); + + // 2. Schedule a function that will complete the event with { kind: "timeout" } + // after 30 seconds, unless it's canceled first. + const scheduledFnId = await step.run( + async (ctx) => { + return ctx.scheduler.runAfter( + 30_000, + internal.eventTimeout.timeoutEvent, + { eventId }, + ); + }, + { name: "scheduleTimeout" }, + ); + + // 3. Wait for the event — either a real approval or the timeout. + const result = await step.awaitEvent({ ...approvalEvent, id: eventId }); + + // 4. If we got a real approval, cancel the scheduled timeout function. + if (result.kind === "approved") { + await step.run( + async (ctx) => { + const scheduled = await ctx.db.system.get(scheduledFnId); + if (scheduled?.state.kind === "pending") + await ctx.scheduler.cancel(scheduledFnId); + }, + { name: "cancelTimeout" }, + ); + return "approved"; + } + + return "timed out"; + }, +}); + +// ── Helper mutations ────────────────────────── + +export const createApprovalEvent = internalMutation({ + args: { workflowId: vWorkflowId }, + returns: vEventId("approval"), + handler: async (ctx, args): Promise> => { + return await workflow.createEvent(ctx, { + name: "approval", + workflowId: args.workflowId, + }); + }, +}); + +export const timeoutEvent = internalMutation({ + args: { eventId: vEventId("approval") }, + handler: async (ctx, args) => { + await workflow.sendEvent(ctx, { + ...approvalEvent, + id: args.eventId, + value: { kind: "timeout" }, + }); + }, +}); + +export const approve = internalMutation({ + args: { eventId: vEventId("approval") }, + handler: async (ctx, args) => { + await workflow.sendEvent(ctx, { + ...approvalEvent, + id: args.eventId, + value: { kind: "approved" }, + }); + }, +}); + +/** + * Test this from the CLI: + * ```sh + * npx convex run eventTimeout:startEventTimeout + * ``` + * Then either approve before 30s: + * ```sh + * npx convex run eventTimeout:approve '{"eventId":"..."}' + * ``` + * Or wait 30s for the timeout to fire automatically. + */ +export const startEventTimeout = internalMutation({ + args: {}, + handler: async (ctx): Promise => { + return await workflow.start( + ctx, + internal.eventTimeout.eventTimeoutWorkflow, + {}, + ); + }, +}); diff --git a/example/convex/example.ts b/example/convex/example.ts index c83cb94..0522234 100644 --- a/example/convex/example.ts +++ b/example/convex/example.ts @@ -6,7 +6,9 @@ import { components } from "./_generated/api.js"; import { vWorkflowId } from "@convex-dev/workflow"; import { vResultValidator } from "@convex-dev/workpool"; -export const workflow = new WorkflowManager(components.workflow); +export const workflow = new WorkflowManager(components.workflow, { + internalMutation, +}); export const myWorkflow = workflow .define({ @@ -52,9 +54,16 @@ export const myWorkflow = workflow console.timeLog("weather", temperature); // Wait a beat before writing the result. await step.sleep(100, { name: "cooldown" }); - await step.runMutation(internal.example.updateFlow, { - workflowId: step.workflowId, - out: { name, celsius, farenheit, windSpeed, windGust }, + await step.run(async (ctx) => { + const flow = await ctx.db + .query("flows") + .withIndex("workflowId", (q) => q.eq("workflowId", step.workflowId)) + .first(); + if (flow) { + await ctx.db.patch("flows", flow._id, { + out: { name, celsius, farenheit, windSpeed, windGust }, + }); + } }); console.timeEnd("overall"); return { name, celsius, farenheit, windSpeed, windGust }; @@ -173,24 +182,3 @@ export const getWeather = internalAction({ }; }, }); - -export const updateFlow = internalMutation({ - args: { - workflowId: vWorkflowId, - out: v.any(), - }, - returns: v.null(), - handler: async (ctx, args) => { - const flow = await ctx.db - .query("flows") - .withIndex("workflowId", (q) => q.eq("workflowId", args.workflowId)) - .first(); - if (!flow) { - console.warn(`Flow not found: ${args.workflowId}`); - return; - } - await ctx.db.patch("flows", flow._id, { - out: args.out, - }); - }, -}); diff --git a/src/client/index.ts b/src/client/index.ts index b75e59c..e33b0ad 100644 --- a/src/client/index.ts +++ b/src/client/index.ts @@ -12,6 +12,7 @@ import { type GenericDataModel, type GenericMutationCtx, type GenericQueryCtx, + type MutationBuilder, type PaginationOptions, type PaginationResult, type RegisteredMutation, @@ -86,8 +87,9 @@ export type WorkflowDefinition< export type WorkflowHandler< ArgsValidator extends PropertyValidators, ReturnsValidator extends Validator | void, + DataModel extends GenericDataModel = GenericDataModel, > = ( - step: WorkflowCtx, + step: WorkflowCtx, args: ObjectType, ) => Promise>; @@ -117,9 +119,12 @@ export type WorkflowStatus = export function defineWorkflow< AV extends PropertyValidators, RV extends Validator | void = void, + DM extends GenericDataModel = GenericDataModel, >( component: WorkflowComponent, - config: WorkflowDefinition, + config: WorkflowDefinition & { + internalMutation?: MutationBuilder; + }, ): { /** * Define the workflow handler function. @@ -127,7 +132,7 @@ export function defineWorkflow< */ handler( fn: ( - step: WorkflowCtx, + step: WorkflowCtx, args: ObjectType, ) => Promise>, ): RegisteredMutation< @@ -145,6 +150,7 @@ export function defineWorkflow< export interface Workflow< AV extends PropertyValidators, RV extends Validator | void, + DM extends GenericDataModel = GenericDataModel, > { /** * Define the workflow handler function. @@ -152,7 +158,7 @@ export interface Workflow< */ handler( fn: ( - step: WorkflowCtx, + step: WorkflowCtx, args: ObjectType, ) => Promise>, ): RegisteredMutation< @@ -455,11 +461,27 @@ export async function cleanup( }); } -export class WorkflowManager { +export class WorkflowManager< + DataModel extends GenericDataModel = GenericDataModel, +> { constructor( public component: WorkflowComponent, public options?: { - workpoolOptions: WorkpoolOptions; + workpoolOptions?: WorkpoolOptions; + /** + * Provide your app's `internalMutation` (from `_generated/server`) to get + * a fully typed `ctx` in `step.run()` handlers, with your data model's + * tables available on `ctx.db`. This also lets any custom middleware + * you've configured run around the workflow. + * + * ```ts + * import { internalMutation } from "./_generated/server"; + * const workflow = new WorkflowManager(components.workflow, { + * internalMutation, + * }); + * ``` + */ + internalMutation?: MutationBuilder; }, ) {} @@ -474,7 +496,7 @@ export class WorkflowManager { ReturnsValidator extends Validator | void, >( workflow: WorkflowDefinition & { - handler: WorkflowHandler; + handler: WorkflowHandler; }, ): RegisteredMutation< "internal", @@ -493,7 +515,7 @@ export class WorkflowManager { */ handler( fn: ( - step: WorkflowCtx, + step: WorkflowCtx, args: ObjectType, ) => Promise>, ): RegisteredMutation< @@ -507,14 +529,19 @@ export class WorkflowManager { ReturnsValidator extends Validator | void, >( workflow: WorkflowDefinition & { - handler?: WorkflowHandler; + handler?: WorkflowHandler; }, ): unknown { + const withMutation = { + ...workflow, + internalMutation: this.options?.internalMutation, + }; if (workflow.handler) { return workflowMutation( this.component, - workflow as WorkflowDefinition & { - handler: WorkflowHandler; + withMutation as WorkflowDefinition & { + handler: WorkflowHandler; + internalMutation?: MutationBuilder; }, this.options?.workpoolOptions, ); @@ -523,13 +550,16 @@ export class WorkflowManager { // to support, in order to get the maxParallelism / etc. in there. // Direct users of defineWorkflow should instead configure those values // via configuring the component directly. - return defineWorkflow(this.component, { - ...workflow, - workpoolOptions: { - ...this.options?.workpoolOptions, - ...workflow.workpoolOptions, + return defineWorkflow( + this.component, + { + ...withMutation, + workpoolOptions: { + ...this.options?.workpoolOptions, + ...workflow.workpoolOptions, + }, }, - }); + ); } /** diff --git a/src/client/step.ts b/src/client/step.ts index 3a4e5b3..0bf6118 100644 --- a/src/client/step.ts +++ b/src/client/step.ts @@ -23,7 +23,7 @@ export type WorkerResult = | { type: "handlerDone"; runResult: RunResult } | { type: "executorBlocked" }; -export type StepRequest = { +export type StepRequest = { name: string; target: | { @@ -44,6 +44,11 @@ export type StepRequest = { | { kind: "sleep"; args: Record; + } + | { + kind: "inline"; + handler: (ctx: GenericMutationCtx) => Promise; + args: Record; }; retry: RetryBehavior | boolean | undefined; inline: boolean; @@ -53,16 +58,16 @@ export type StepRequest = { resolve: (result: RunResult) => void; }; -export class StepExecutor { +export class StepExecutor { private journalEntrySize: number; constructor( private workflowId: string, private generationNumber: number, - private ctx: GenericMutationCtx, + private ctx: GenericMutationCtx, private component: WorkflowComponent, private journalEntries: Array, - private receiver: BaseChannel, + private receiver: BaseChannel>, private now: number, private workpoolOptions: WorkpoolOptions | undefined, ) { @@ -120,7 +125,7 @@ export class StepExecutor { }; } - completeMessage(message: StepRequest, entry: JournalEntry) { + completeMessage(message: StepRequest, entry: JournalEntry) { if (entry.step.inProgress) { throw new Error( `Assertion failed: not blocked but have in-progress journal entry`, @@ -130,13 +135,17 @@ export class StepExecutor { entry.step, message.unstableArgs ? ["name", "kind"] : ["name", "kind", "args"], ); - const messageFields = message.unstableArgs - ? { name: message.name, kind: message.target.kind } - : { - name: message.name, - kind: message.target.kind, - args: message.target.args as Value, - }; + const messageFields = { + name: message.name, + kind: message.target.kind, + args: message.target.args as Value | undefined, + }; + if (message.unstableArgs) { + delete messageFields.args; + } + if (message.target.kind === "inline") { + messageFields.kind = "function"; + } const stepJson = JSON.stringify(convexToJson(stepFields)); const messageJson = JSON.stringify(convexToJson(messageFields)); if (stepJson !== messageJson) { @@ -152,7 +161,9 @@ export class StepExecutor { message.resolve(entry.step.runResult); } - async startSteps(messages: StepRequest[]): Promise { + async startSteps( + messages: StepRequest[], + ): Promise { const steps = await Promise.all( messages.map(async (message) => { const args = message.target.args ?? {}; @@ -160,30 +171,47 @@ export class StepExecutor { let runResult: RunResult | undefined; if (message.inline) { - if (target.kind !== "function" || target.functionType === "action") { + if (target.kind === "inline") { + try { + const returnValue = (await target.handler(this.ctx)) ?? null; + runResult = { kind: "success", returnValue }; + } catch (error: unknown) { + runResult = { + kind: "failed", + error: formatErrorWithStack(error), + }; + } + } else if ( + target.kind === "function" && + target.functionType !== "action" + ) { + try { + const result = + target.functionType === "query" + ? await this.ctx.runQuery( + target.function as FunctionReference< + typeof target.functionType + >, + target.args, + ) + : await this.ctx.runMutation( + target.function as FunctionReference< + typeof target.functionType + >, + target.args, + ); + runResult = { kind: "success", returnValue: result ?? null }; + } catch (error: unknown) { + runResult = { + kind: "failed", + error: formatErrorWithStack(error), + }; + } + } else { throw new Error( - "Inline execution is only supported for queries and mutations.", + "Inline execution is only supported for queries, mutations, and inline handlers.", ); } - try { - const result = - target.functionType === "query" - ? await this.ctx.runQuery( - target.function as FunctionReference< - typeof target.functionType - >, - target.args, - ) - : await this.ctx.runMutation( - target.function as FunctionReference< - typeof target.functionType - >, - target.args, - ); - runResult = { kind: "success", returnValue: result ?? null }; - } catch (error: unknown) { - runResult = { kind: "failed", error: formatErrorWithStack(error) }; - } } const commonFields = { @@ -226,8 +254,18 @@ export class StepExecutor { ...commonFields, }; break; - default: + case "inline": + step = { + kind: "function", + functionType: "mutation", + handle: "inline", + ...commonFields, + }; + break; + default: { + const _: never = target; throw new Error(`Unknown step kind: ${(target as any).kind}`); + } } return { retry: message.retry, diff --git a/src/client/stepContext.test.ts b/src/client/stepContext.test.ts index e8476f5..be7fbd2 100644 --- a/src/client/stepContext.test.ts +++ b/src/client/stepContext.test.ts @@ -22,6 +22,8 @@ function journalEntry( overrides: { name?: string; kind?: "function" | "workflow" | "event"; + functionType?: "query" | "mutation" | "action"; + handle?: string; args?: Record; runResult?: RunResult; stepNumber?: number; @@ -51,8 +53,8 @@ function journalEntry( ...base, step: { kind: "function", - functionType: "action", - handle: "handle", + functionType: overrides.functionType ?? "action", + handle: overrides.handle ?? "handle", ...stepCommon, }, } as unknown as JournalEntry; @@ -344,6 +346,370 @@ describe("StepExecutor + WorkflowCtx integration", () => { expect(result).toEqual([1, 2, 3]); }); + + it("ctx.run replays a successful inline handler", async () => { + const channel = new BaseChannel(0); + const ctx = createWorkflowCtx("wf-11" as any, channel); + + const entry = journalEntry({ + name: "run", + functionType: "mutation", + handle: "inline", + args: {}, + runResult: { kind: "success", returnValue: 99 }, + }); + + const [result] = await Promise.all([ + ctx.run(async () => 99), + replayFromJournal(channel, [entry]), + ]); + + expect(result).toBe(99); + }); + + it("ctx.run replays a failed inline handler", async () => { + const channel = new BaseChannel(0); + const ctx = createWorkflowCtx("wf-12" as any, channel); + + const entry = journalEntry({ + name: "run", + functionType: "mutation", + handle: "inline", + args: {}, + runResult: { kind: "failed", error: "inline boom" }, + }); + + const [error] = await Promise.all([ + ctx + .run(async () => { + throw new Error("inline boom"); + }) + .catch((e: Error) => e), + replayFromJournal(channel, [entry]), + ]); + + expect(error).toBeInstanceOf(Error); + expect((error as Error).message).toBe("inline boom"); + }); + + it("ctx.run uses custom name when provided", async () => { + const channel = new BaseChannel(0); + const ctx = createWorkflowCtx("wf-13" as any, channel); + + const entry = journalEntry({ + name: "myCustomStep", + functionType: "mutation", + handle: "inline", + args: {}, + runResult: { kind: "success", returnValue: "named" }, + }); + + const handler = async () => { + return ctx.run(async () => "named", { name: "myCustomStep" }); + }; + + const [result] = await Promise.all([ + handler(), + replayFromJournal(channel, [entry]), + ]); + + expect(result).toBe("named"); + }); + + it("ctx.run works sequentially with other steps", async () => { + const channel = new BaseChannel(0); + const ctx = createWorkflowCtx("wf-14" as any, channel); + + const entries = [ + journalEntry({ + name: "run", + functionType: "mutation", + handle: "inline", + args: {}, + runResult: { kind: "success", returnValue: "inline-result" }, + stepNumber: 0, + }), + journalEntry({ + name: "step2", + args: {}, + runResult: { kind: "success", returnValue: "action-result" }, + stepNumber: 1, + }), + ]; + + const handler = async () => { + const a = await ctx.run(async () => "inline-result"); + const b = await ctx.runAction(fakeFuncRef("step2") as any, {}); + return [a, b]; + }; + + const [results] = await Promise.all([ + handler(), + replayFromJournal(channel, entries), + ]); + + expect(results).toEqual(["inline-result", "action-result"]); + }); + + it("throws when calling step methods inside ctx.run()", async () => { + const channel = new BaseChannel(0); + const ctx = createWorkflowCtx("wf-15" as any, channel); + + // Read the inline message from the channel, invoke its handler (which + // sets the lock), and resolve based on the handler outcome. + const executeInline = async () => { + const message = await channel.get(); + if (message.target.kind !== "inline") throw new Error("expected inline"); + try { + const result = await message.target.handler({} as any); + message.resolve({ kind: "success", returnValue: result }); + } catch (e) { + message.resolve({ + kind: "failed", + error: (e as Error).message, + }); + } + }; + + const [error] = await Promise.all([ + ctx + .run(async () => { + // This should throw — the guard fires before the channel push. + await ctx.runMutation(fakeFuncRef("bad") as any, {}); + }) + .catch((e: Error) => e), + executeInline(), + ]); + + expect(error).toBeInstanceOf(Error); + expect((error as Error).message).toMatch( + /Cannot call step methods inside a step\.run\(\) handler/, + ); + }); + + it("ctx.run() works normally after a previous ctx.run() completes", async () => { + const channel = new BaseChannel(0); + const ctx = createWorkflowCtx("wf-16" as any, channel); + + const entries = [ + journalEntry({ + name: "run", + functionType: "mutation", + handle: "inline", + args: {}, + runResult: { kind: "success", returnValue: "first" }, + stepNumber: 0, + }), + journalEntry({ + name: "run", + functionType: "mutation", + handle: "inline", + args: {}, + runResult: { kind: "success", returnValue: "second" }, + stepNumber: 1, + }), + ]; + + const handler = async () => { + const a = await ctx.run(async () => "first"); + const b = await ctx.run(async () => "second"); + return [a, b]; + }; + + const [results] = await Promise.all([ + handler(), + replayFromJournal(channel, entries), + ]); + + expect(results).toEqual(["first", "second"]); + }); + + it("resets the lock flag even when the inline handler throws", async () => { + const channel = new BaseChannel(0); + const ctx = createWorkflowCtx("wf-17" as any, channel); + + const entries = [ + journalEntry({ + name: "run", + functionType: "mutation", + handle: "inline", + args: {}, + runResult: { kind: "failed", error: "handler error" }, + stepNumber: 0, + }), + journalEntry({ + name: "step2", + args: {}, + runResult: { kind: "success", returnValue: "ok" }, + stepNumber: 1, + }), + ]; + + const handler = async () => { + try { + await ctx.run(async () => { + throw new Error("handler error"); + }); + } catch { + // expected + } + // This should work — the lock must have been released by try/finally. + return ctx.runAction(fakeFuncRef("step2") as any, {}); + }; + + const [result] = await Promise.all([ + handler(), + replayFromJournal(channel, entries), + ]); + + expect(result).toBe("ok"); + }); + + it("parallel step.run() calls via Promise.all work correctly", async () => { + const channel = new BaseChannel(0); + const ctx = createWorkflowCtx("wf-18" as any, channel); + + // Simulate the executor batching and running both inline handlers + // concurrently (as the real executor does via Promise.all). + const executeInlines = async () => { + const msg1 = await channel.get(); + const msg2 = await channel.get(); + // Run both handlers concurrently, just like the real executor. + await Promise.all( + [msg1, msg2].map(async (msg) => { + if (msg.target.kind !== "inline") + throw new Error("expected inline"); + try { + const result = await msg.target.handler({} as any); + msg.resolve({ kind: "success", returnValue: result }); + } catch (e) { + msg.resolve({ kind: "failed", error: (e as Error).message }); + } + }), + ); + }; + + const [results] = await Promise.all([ + Promise.all([ + ctx.run(async () => "a"), + ctx.run(async () => "b"), + ]), + executeInlines(), + ]); + + expect(results).toEqual(["a", "b"]); + }); + + it("guard still fires inside each parallel step.run() handler", async () => { + const channel = new BaseChannel(0); + const ctx = createWorkflowCtx("wf-19" as any, channel); + + const executeInlines = async () => { + const msg1 = await channel.get(); + const msg2 = await channel.get(); + await Promise.all( + [msg1, msg2].map(async (msg) => { + if (msg.target.kind !== "inline") + throw new Error("expected inline"); + try { + const result = await msg.target.handler({} as any); + msg.resolve({ kind: "success", returnValue: result }); + } catch (e) { + msg.resolve({ kind: "failed", error: (e as Error).message }); + } + }), + ); + }; + + const [errors] = await Promise.all([ + Promise.all([ + ctx + .run(async () => { + await ctx.runMutation(fakeFuncRef("bad1") as any, {}); + }) + .catch((e: Error) => e), + ctx + .run(async () => { + await ctx.runMutation(fakeFuncRef("bad2") as any, {}); + }) + .catch((e: Error) => e), + ]), + executeInlines(), + ]); + + expect(errors[0]).toBeInstanceOf(Error); + expect((errors[0] as Error).message).toMatch( + /Cannot call step methods inside a step\.run\(\) handler/, + ); + expect(errors[1]).toBeInstanceOf(Error); + expect((errors[1] as Error).message).toMatch( + /Cannot call step methods inside a step\.run\(\) handler/, + ); + }); + + it("ctx.run() journals deps and replays when they match", async () => { + const channel = new BaseChannel(0); + const ctx = createWorkflowCtx("wf-20" as any, channel); + + const entry = journalEntry({ + name: "run", + functionType: "mutation", + handle: "inline", + args: { userId: "u1", count: 3 }, + runResult: { kind: "success", returnValue: "done" }, + }); + + const [result] = await Promise.all([ + ctx.run(async () => "done", { + deps: { userId: "u1", count: 3 }, + }), + replayFromJournal(channel, [entry]), + ]); + + expect(result).toBe("done"); + }); + + it("ctx.run() sends deps through the channel as args", async () => { + const channel = new BaseChannel(0); + const ctx = createWorkflowCtx("wf-21" as any, channel); + + const deps = { userId: "u1", count: 3 }; + + // Read the message from the channel and verify the args match deps. + const inspectMessage = async () => { + const message = await channel.get(); + expect(message.target.args).toEqual(deps); + expect(message.target.kind).toBe("inline"); + message.resolve({ kind: "success", returnValue: "ok" }); + }; + + const [result] = await Promise.all([ + ctx.run(async () => "ok", { deps }), + inspectMessage(), + ]); + + expect(result).toBe("ok"); + }); + + it("ctx.run() without deps still journals empty args", async () => { + const channel = new BaseChannel(0); + const ctx = createWorkflowCtx("wf-22" as any, channel); + + const entry = journalEntry({ + name: "run", + functionType: "mutation", + handle: "inline", + args: {}, + runResult: { kind: "success", returnValue: 42 }, + }); + + const [result] = await Promise.all([ + ctx.run(async () => 42), + replayFromJournal(channel, [entry]), + ]); + + expect(result).toBe(42); + }); }); describe("unstableArgs", () => { diff --git a/src/client/workflowContext.ts b/src/client/workflowContext.ts index 18f44f1..5be8be1 100644 --- a/src/client/workflowContext.ts +++ b/src/client/workflowContext.ts @@ -7,6 +7,8 @@ import type { FunctionReturnType, FunctionType, FunctionVisibility, + GenericDataModel, + GenericMutationCtx, } from "convex/server"; import type { Validator } from "convex/values"; import type { EventId, SchedulerOptions, WorkflowId } from "../types.js"; @@ -39,90 +41,135 @@ type InlineArgs = inline?: false; }; -export type WorkflowCtx = { - /** - * The ID of the workflow currently running. - */ - workflowId: WorkflowId; - /** - * Run a query with the given name and arguments. - * - * @param query - The query to run, like `internal.index.exampleQuery`. - * @param args - The arguments to the query function. - * @param opts - Options for scheduling and naming the query. - */ - runQuery>( - query: Query, - ...args: OptionalRestArgs - ): Promise>; +export type WorkflowCtx = + { + /** + * The ID of the workflow currently running. + */ + workflowId: WorkflowId; + /** + * Run a query with the given name and arguments. + * + * @param query - The query to run, like `internal.index.exampleQuery`. + * @param args - The arguments to the query function. + * @param opts - Options for scheduling and naming the query. + */ + runQuery>( + query: Query, + ...args: OptionalRestArgs + ): Promise>; - /** - * Run a mutation with the given name and arguments. - * - * @param mutation - The mutation to run, like `internal.index.exampleMutation`. - * @param args - The arguments to the mutation function. - * @param opts - Options for scheduling and naming the mutation. - */ - runMutation< - Mutation extends FunctionReference<"mutation", FunctionVisibility>, - >( - mutation: Mutation, - ...args: OptionalRestArgs - ): Promise>; + /** + * Run a mutation with the given name and arguments. + * + * @param mutation - The mutation to run, like `internal.index.exampleMutation`. + * @param args - The arguments to the mutation function. + * @param opts - Options for scheduling and naming the mutation. + */ + runMutation< + Mutation extends FunctionReference<"mutation", FunctionVisibility>, + >( + mutation: Mutation, + ...args: OptionalRestArgs + ): Promise>; - /** - * Run an action with the given name and arguments. - * - * @param action - The action to run, like `internal.index.exampleAction`. - * @param args - The arguments to the action function. - * @param opts - Options for retrying, scheduling and naming the action. - */ - runAction>( - action: Action, - ...args: OptionalRestArgs - ): Promise>; + /** + * Run an action with the given name and arguments. + * + * @param action - The action to run, like `internal.index.exampleAction`. + * @param args - The arguments to the action function. + * @param opts - Options for retrying, scheduling and naming the action. + */ + runAction>( + action: Action, + ...args: OptionalRestArgs + ): Promise>; - /** - * Run a workflow with the given name and arguments. - * - * @param workflow - The workflow to run, like `internal.index.exampleWorkflow`. - * @param args - The arguments to the workflow function. - * @param opts - Options for retrying, scheduling and naming the workflow. - */ - runWorkflow>( - workflow: Workflow, - args: FunctionArgs["args"], - opts?: RunOptions, - ): Promise>; + /** + * Run a workflow with the given name and arguments. + * + * @param workflow - The workflow to run, like `internal.index.exampleWorkflow`. + * @param args - The arguments to the workflow function. + * @param opts - Options for retrying, scheduling and naming the workflow. + */ + runWorkflow>( + workflow: Workflow, + args: FunctionArgs["args"], + opts?: RunOptions, + ): Promise>; - /** - * Blocks until a matching event is sent to this workflow. - * - * If an ID is specified, an event with that ID must already exist and must - * not already be "awaited" or "consumed". - * - * If a name is specified, the first available event is consumed that matches - * the name. If there is no available event, it will create one with that name - * with status "awaited". - * @param event - */ - awaitEvent( - event: ( - | { name: Name; id?: EventId } - | { name?: Name; id: EventId } - ) & { - validator?: Validator; - }, - ): Promise; + /** + * Run a handler inline within the workflow's mutation transaction. + * The result is journaled like any other step, so on replay it returns the + * saved value without re-executing the handler. + * + * This gives direct access to the underlying mutation context, allowing + * database reads/writes, running queries, and scheduling functions all + * within the same transaction as the workflow handler. + * + * The handler can read from variables in the enclosing scope, but should + * not modify them — on replay the handler is skipped and the journaled + * result is returned, so any side effects outside the handler's return + * value will not be replayed. + * + * To get a fully typed `ctx` with your data model, provide your app's + * `internalMutation` when creating the `WorkflowManager`: + * + * ```ts + * import { internalMutation } from "./_generated/server"; + * const workflow = new WorkflowManager(components.workflow, { + * internalMutation, + * }); + * ``` + * + * @param handler - A function receiving the mutation context to run inline. + * @param opts - Options for naming the step and declaring dependencies. + */ + run( + handler: (ctx: GenericMutationCtx) => T | Promise, + opts?: { + name?: string; + /** + * Dependencies that are validated and journaled as part of this step. + * On replay, the saved deps are compared against the current deps — + * if they differ, the workflow detects a mismatch and re-executes. + * Use this to capture values from the enclosing scope that the handler + * depends on. + * If you pass {}, it will not check for dependency mismatches, akin to + * unstableArgs in step.run* + */ + deps?: Record; + }, + ): Promise; - /** - * Suspend execution for the given duration. - * - * @param duration - The number of milliseconds to sleep. - * @param opts - Optionally name the step. Default: "sleep" - */ - sleep(duration: number, opts?: { name?: string }): Promise; -}; + /** + * Blocks until a matching event is sent to this workflow. + * + * If an ID is specified, an event with that ID must already exist and must + * not already be "awaited" or "consumed". + * + * If a name is specified, the first available event is consumed that matches + * the name. If there is no available event, it will create one with that name + * with status "awaited". + * @param event + */ + awaitEvent( + event: ( + | { name: Name; id?: EventId } + | { name?: Name; id: EventId } + ) & { + validator?: Validator; + }, + ): Promise; + + /** + * Suspend execution for the given duration. + * + * @param duration - The number of milliseconds to sleep. + * @param opts - Optionally name the step. Default: "sleep" + */ + sleep(duration: number, opts?: { name?: string }): Promise; + }; export type OptionalRestArgs< Opts, @@ -132,25 +179,69 @@ export type OptionalRestArgs< ? [args?: Record, opts?: Opts] : [args: FuncRef["_args"], opts?: Opts]; -export function createWorkflowCtx( +export function createWorkflowCtx< + DataModel extends GenericDataModel = GenericDataModel, +>( workflowId: WorkflowId, - sender: BaseChannel, -) { + sender: BaseChannel>, +): WorkflowCtx { + let inlineDepth = 0; + const guardNotInlined = () => { + if (inlineDepth > 0) { + throw new Error( + "Cannot call step methods inside a step.run() handler. " + + "Use the `ctx` argument passed to the handler instead, or " + + "move this call outside of step.run().", + ); + } + }; + return { workflowId, runQuery: async (query, args, opts?) => { + guardNotInlined(); return runFunction(sender, "query", query, args, opts); }, runMutation: async (mutation, args, opts?) => { + guardNotInlined(); return runFunction(sender, "mutation", mutation, args, opts); }, runAction: async (action, args, opts?) => { + guardNotInlined(); return runFunction(sender, "action", action, args, opts); }, + run: async (handler, opts?) => { + guardNotInlined(); + // allow {} to behave like unstableArgs + const unstableArgs = + typeof opts?.deps === "object" && Object.keys(opts.deps).length === 0; + + return run(sender, { + name: opts?.name ?? "run", + target: { + kind: "inline", + handler: async (ctx: GenericMutationCtx) => { + inlineDepth++; + try { + return await handler(ctx); + } finally { + inlineDepth--; + } + }, + args: opts?.deps ?? {}, + }, + retry: undefined, + inline: true, + schedulerOptions: {}, + unstableArgs, + }) as any; + }, + runWorkflow: async (workflow, args, opts?) => { + guardNotInlined(); const { name, unstableArgs, ...schedulerOptions } = opts ?? {}; return run(sender, { name: name ?? safeFunctionName(workflow), @@ -167,6 +258,7 @@ export function createWorkflowCtx( }, sleep: async (duration, opts?) => { + guardNotInlined(); await run(sender, { name: opts?.name ?? "sleep", target: { @@ -181,6 +273,7 @@ export function createWorkflowCtx( }, awaitEvent: async (event) => { + guardNotInlined(); const result = await run(sender, { name: event.name ?? event.id ?? "Event", target: { @@ -197,13 +290,14 @@ export function createWorkflowCtx( } return result as any; }, - } satisfies WorkflowCtx; + }; } async function runFunction< F extends FunctionReference, + DM extends GenericDataModel, >( - sender: BaseChannel, + sender: BaseChannel>, functionType: FunctionType, f: F, args: Record | undefined, @@ -235,9 +329,9 @@ async function runFunction< }); } -async function run( - sender: BaseChannel, - request: Omit, +async function run( + sender: BaseChannel>, + request: Omit, "resolve">, ): Promise { let send: Promise; const p = new Promise((resolve) => { diff --git a/src/client/workflowMutation.ts b/src/client/workflowMutation.ts index 10f2257..2b281f1 100644 --- a/src/client/workflowMutation.ts +++ b/src/client/workflowMutation.ts @@ -5,6 +5,8 @@ import { createFunctionHandle, internalMutationGeneric, makeFunctionReference, + type GenericDataModel, + type MutationBuilder, type RegisteredMutation, } from "convex/server"; import { @@ -47,10 +49,14 @@ const INVALID_WORKFLOW_MESSAGE = `Invalid arguments for workflow: Did you invoke // function handle to the workflow component for execution. This function runs // one "poll" of the workflow, replaying its execution from the journal until // it blocks next. -export function workflowMutation( +export function workflowMutation< + ArgsValidator extends PropertyValidators, + DataModel extends GenericDataModel = GenericDataModel, +>( component: WorkflowComponent, registered: WorkflowDefinition & { - handler: WorkflowHandler; + handler: WorkflowHandler; + internalMutation?: MutationBuilder; }, defaultWorkpoolOptions?: WorkpoolOptions, boundFn?: string, @@ -59,7 +65,9 @@ export function workflowMutation( ...defaultWorkpoolOptions, ...registered.workpoolOptions, }; - return internalMutationGeneric({ + const mutationBuilder = (registered.internalMutation ?? + internalMutationGeneric) as typeof internalMutationGeneric; + return mutationBuilder({ handler: async (ctx, args) => { if (!validate(workflowArgs, args) && boundFn && "args" in args) { // Bound workflow called directly with { args: ... } @@ -126,10 +134,10 @@ export function workflowMutation( `Assertion failed: not blocked but have in-progress journal entry`, ); } - const channel = new BaseChannel( + const channel = new BaseChannel>( workpoolOptions.maxParallelism ?? 10, ); - const step = createWorkflowCtx(workflowId, channel); + const step = createWorkflowCtx(workflowId, channel); const executor = new StepExecutor( workflowId, generationNumber, diff --git a/src/component/journal.ts b/src/component/journal.ts index aa48574..6e6c308 100644 --- a/src/component/journal.ts +++ b/src/component/journal.ts @@ -162,6 +162,10 @@ export const startSteps = mutation({ step.workflowId = workflowId; } else if (step.runResult) { // Already completed inline by the caller — nothing to enqueue. + assert( + !step.kind || step.kind === "function", + `Unexpected inline-completed step kind: ${step.kind}`, + ); console.event("stepCompleted", { workflowId: entry.workflowId, workflowName: workflow.name, @@ -181,7 +185,7 @@ export const startSteps = mutation({ {}, { context, onComplete, name, ...schedulerOptions }, ); - } else { + } else if (!step.kind || step.kind === "function") { const context: OnCompleteContext = { generationNumber, stepId, diff --git a/src/component/workflow.ts b/src/component/workflow.ts index 04bf368..1cffedd 100644 --- a/src/component/workflow.ts +++ b/src/component/workflow.ts @@ -160,6 +160,8 @@ function publicStep(step: JournalEntry): WorkflowStep { kind: "workflow", nestedWorkflowId: publicWorkflowId(step.step.workflowId!), }; + // It didn't used to be set + case undefined: case "function": return { ...commonFields, @@ -172,8 +174,10 @@ function publicStep(step: JournalEntry): WorkflowStep { kind: "sleep", workId: step.step.workId!, }; - default: + default: { + const _: never = step.step; throw new Error(`Unknown step kind: ${(step.step as any).kind}`); + } } }