diff --git a/README.md b/README.md index b770a18a..91bd5b20 100644 --- a/README.md +++ b/README.md @@ -185,26 +185,34 @@ export const exampleAction = internalAction({ ### Starting a workflow -Once you've defined a workflow, you can start it from a mutation or action using -`workflow.start()`. +Once you've defined a workflow, you can start it from any mutation or action +using `start()`: ```ts +import { start } from "@convex-dev/workflow"; +import { internal } from "./_generated/api"; + export const kickoffWorkflow = mutation({ handler: async (ctx): Promise => { - const workflowId = await workflow.start( - ctx, - internal.example.exampleWorkflow, - { exampleArg: "James" }, - ); + // Starts the workflow immediately, run asynchronously + const workflowId = await start(ctx, internal.example.exampleWorkflow, { + exampleArg: "James", + }); return workflowId; }, }); ``` +You can also call workflows directly from code, the CLI or dashboard: + +```sh +npx convex run example:exampleWorkflow '{ "args": { "exampleArg": "James" } }' +``` + ### Handling the workflow's result with onComplete -You can handle the workflow's result with `onComplete`. This is useful for -cleaning up any resources used by the workflow. +You can handle the workflow's result with `onComplete` by using the `start()` +helper. This is useful for cleaning up any resources used by the workflow. Note: when you return things from a workflow, you'll need to specify the return type of your `handler` to break type cycles due to using `internal.*` functions @@ -217,20 +225,19 @@ error instead of success. You can also do validation in the `onComplete` handler to have more control over handling that situation. ```ts -import { vWorkflowId, Workflow } from "@convex-dev/workflow"; +import { start, vWorkflowId } from "@convex-dev/workflow"; import { vResultValidator } from "@convex-dev/workpool"; -import { workflow } from "./example"; export const foo = mutation({ handler: async (ctx): Promise => { const name = "James"; - const workflowId = await workflow.start( + const workflowId = await start( ctx, internal.example.exampleWorkflow, { name }, { onComplete: internal.example.handleOnComplete, - context: name, // can be anything + context: { intent: "welcome", for: "James" }, // can be anything }, ); return workflowId; @@ -241,10 +248,11 @@ export const handleOnComplete = mutation({ args: { workflowId: vWorkflowId, result: vResultValidator, - context: v.any(), // used to pass through data from the start site. + // used to pass through data from the start site. + context: v.object({ intent: v.string(), for: v.string() }), }, handler: async (ctx, args): Promise => { - const name = (args.context as { name: string }).name; + const name = args.context.name; if (args.result.kind === "success") { const text = args.result.returnValue; console.log(`${name} result: ${text}`); @@ -441,11 +449,13 @@ budget. ### Checking a workflow's status -The `workflow.start()` method returns a `WorkflowId`, which can then be used for +Calling a workflow returns a `WorkflowId` string, which can then be used for querying a workflow's status. ```ts import { vWorkflowId, getStatus, WorkflowStatus } from "@convex-dev/workflow"; +import { components } from "./_generated/api"; +import { query } from "./_generated/server"; export const runWorkflowAndPoll = query({ args: { workflowId: vWorkflowId }, @@ -471,19 +481,8 @@ executing. ```ts import { cancel } from "@convex-dev/workflow"; -export const kickoffWorkflow = action({ - handler: async (ctx): Promise => { - const workflowId = await workflow.start( - ctx, - internal.example.exampleWorkflow, - { name: "James" }, - ); - await new Promise((resolve) => setTimeout(resolve, 1000)); - - // Cancel the workflow after 1 second. - await cancel(ctx, components.workflow, workflowId); - }, -}); +// ... Inside a mutation or action +await cancel(ctx, components.workflow, workflowId); ``` ### Restart a failed workflow @@ -540,28 +539,17 @@ After a workflow has completed, you can clean up its storage with `cleanup()`. Completed workflows are not automatically cleaned up by the system. ```ts -import { cleanup, getStatus } from "@convex-dev/workflow"; +import { cleanup, vWorkflowId, vResultValidator } from "@convex-dev/workflow"; -export const kickoffWorkflow = action({ - handler: async (ctx) => { - const workflowId = await workflow.start( - ctx, - internal.example.exampleWorkflow, - { name: "James" }, - ); - try { - while (true) { - const status = await getStatus(ctx, components.workflow, workflowId); - if (status.type === "inProgress") { - await new Promise((resolve) => setTimeout(resolve, 1000)); - continue; - } - console.log("Workflow completed with status:", status); - break; - } - } finally { - await cleanup(ctx, components.workflow, workflowId); - } +export const afterWorkflow = mutation({ + args: { + workflowId: vWorkflowId, + result: vResultValidator, + context: v.any(), + }, + handler: async (ctx, args): Promise => { + // Clean up a completed workflow's storage. + await cleanup(ctx, components.workflow, args.workflowId); }, }); ``` diff --git a/example/convex/example.ts b/example/convex/example.ts index c83cb942..9afc6d47 100644 --- a/example/convex/example.ts +++ b/example/convex/example.ts @@ -1,9 +1,13 @@ import { v } from "convex/values"; -import { WorkflowId, WorkflowManager } from "@convex-dev/workflow"; +import { + WorkflowId, + WorkflowManager, + start, + vWorkflowId, +} from "@convex-dev/workflow"; import { internal } from "./_generated/api.js"; import { internalAction, internalMutation } from "./_generated/server.js"; import { components } from "./_generated/api.js"; -import { vWorkflowId } from "@convex-dev/workflow"; import { vResultValidator } from "@convex-dev/workpool"; export const workflow = new WorkflowManager(components.workflow); @@ -67,7 +71,7 @@ export const startWorkflow = internalMutation({ returns: v.string(), handler: async (ctx, args) => { const location = args.location ?? "San Francisco"; - const id: WorkflowId = await workflow.start( + const id: WorkflowId = await start( ctx, internal.example.myWorkflow, { location }, diff --git a/example/convex/oversized.ts b/example/convex/oversized.ts index 03ec9f4c..18a544e2 100644 --- a/example/convex/oversized.ts +++ b/example/convex/oversized.ts @@ -68,7 +68,7 @@ export const onComplete = internalMutation({ .withIndex("workflowId", (q) => q.eq("workflowId", args.workflowId)) .first(); if (!flow) return null; - await ctx.db.patch(flow._id, { out: args.result }); + await ctx.db.patch("flows", flow._id, { out: args.result }); return null; }, }); diff --git a/example/convex/test/start.test.ts b/example/convex/test/start.test.ts new file mode 100644 index 00000000..d6f6ff93 --- /dev/null +++ b/example/convex/test/start.test.ts @@ -0,0 +1,101 @@ +/// + +import { expect, describe, test, vi, beforeEach, afterEach } from "vitest"; +import { initConvexTest } from "../setup.test"; +import { components, internal } from "../_generated/api"; +import { assert } from "convex-helpers"; +import { getStatus, start } from "@convex-dev/workflow"; +import { workflow } from "../example"; + +describe("direct workflow call", () => { + beforeEach(() => { + vi.useFakeTimers(); + }); + afterEach(() => { + vi.useRealTimers(); + }); + + test("calling workflow mutation directly starts and completes", async () => { + const t = initConvexTest(); + const workflowId = await t.mutation( + internal.catchError.catchErrorWorkflow, + { args: { manualRetries: 0 } }, + ); + await t.finishAllScheduledFunctions(vi.runAllTimers); + const status = await t.run((ctx) => + getStatus(ctx, components.workflow, workflowId), + ); + expect(status.type).toBe("completed"); + assert(status.type === "completed"); + expect(status.result).toBe(1); + }); + + test("direct call to nested child workflow", async () => { + const t = initConvexTest(); + const workflowId = await t.mutation(internal.nestedWorkflow.child, { + args: { foo: "hello" }, + }); + await t.finishAllScheduledFunctions(vi.runAllTimers); + const status = await t.run((ctx) => + getStatus(ctx, components.workflow, workflowId), + ); + expect(status.type).toBe("completed"); + assert(status.type === "completed"); + expect(status.result).toBe(5); + }); +}); + +describe("start() helper", () => { + beforeEach(() => { + vi.useFakeTimers(); + }); + afterEach(() => { + vi.useRealTimers(); + }); + + test("start() with startAsync option", async () => { + const t = initConvexTest(); + const workflowId = await t.run((ctx) => + start( + ctx, + internal.catchError.catchErrorWorkflow, + { + manualRetries: 2, + }, + { + startAsync: true, + }, + ), + ); + await t.finishAllScheduledFunctions(vi.runAllTimers); + const status = await t.run((ctx) => + getStatus(ctx, components.workflow, workflowId), + ); + expect(status.type).toBe("completed"); + assert(status.type === "completed"); + expect(status.result).toBe(3); + }); +}); + +describe("backwards compatibility", () => { + beforeEach(() => { + vi.useFakeTimers(); + }); + afterEach(() => { + vi.useRealTimers(); + }); + + test("workflow.start() still works", async () => { + const t = initConvexTest(); + const workflowId = await t.run((ctx) => + workflow.start(ctx, internal.nestedWorkflow.child, { foo: "test" }), + ); + await t.finishAllScheduledFunctions(vi.runAllTimers); + const status = await t.run((ctx) => + getStatus(ctx, components.workflow, workflowId), + ); + expect(status.type).toBe("completed"); + assert(status.type === "completed"); + expect(status.result).toBe(4); + }); +}); diff --git a/example/convex/transcription.ts b/example/convex/transcription.ts index f797779e..1e7a47dc 100644 --- a/example/convex/transcription.ts +++ b/example/convex/transcription.ts @@ -1,7 +1,7 @@ import { v } from "convex/values"; import { OpenAI } from "openai"; import { internal } from "./_generated/api.js"; -import { internalAction, internalMutation } from "./_generated/server.js"; +import { internalAction } from "./_generated/server.js"; import { workflow } from "./example.js"; function getOpenAI() { @@ -36,20 +36,6 @@ export const transcriptionWorkflow = workflow console.log(embedding.slice(0, 20)); }); -export const startTranscription = internalMutation({ - args: { - storageId: v.id("_storage"), - }, - handler: async (ctx, args) => { - const id: string = await workflow.start( - ctx, - internal.transcription.transcriptionWorkflow, - { storageId: args.storageId }, - ); - return id; - }, -}); - export const computeTranscription = internalAction({ args: { storageId: v.id("_storage"), diff --git a/example/convex/userConfirmation.ts b/example/convex/userConfirmation.ts index 4daf4158..fb1c4ab0 100644 --- a/example/convex/userConfirmation.ts +++ b/example/convex/userConfirmation.ts @@ -1,9 +1,4 @@ -import { - defineEvent, - sendEvent, - vWorkflowId, - WorkflowId, -} from "@convex-dev/workflow"; +import { defineEvent, sendEvent, vWorkflowId } from "@convex-dev/workflow"; import { v } from "convex/values"; import { components, internal } from "./_generated/api"; import { internalAction, internalMutation } from "./_generated/server"; @@ -17,6 +12,18 @@ export const approvalEvent = defineEvent({ ), }); +/** + * Test this from the CLI: + * ```sh + npx convex run userConfirmation:confirmationWorkflow \ + '{ "args": { "prompt": "Generate a recipe for me" } }' + * ``` + * Copy the ID it returns, then run: + * ```sh + * npx convex run userConfirmation:chooseProposal '{"workflowId":"...", "choice":1}' + * ``` + * Watch the logs from `npx convex dev` or `npx convex logs` to see progress. + */ export const confirmationWorkflow = workflow .define({ args: { prompt: v.string() }, @@ -58,25 +65,3 @@ export const chooseProposal = internalMutation({ return true; }, }); - -/** - * Test this from the CLI: - * ```sh - * npx convex run userConfirmation:startConfirmationWorkflow - * ``` - * Copy the ID it returns, then run: - * ```sh - * npx convex run userConfirmation:chooseProposal '{workflowId:"...", choice:1}' - * ``` - * Watch the logs from `npx convex dev` or `npx convex logs` to see progress. - */ -export const startConfirmationWorkflow = internalMutation({ - args: { prompt: v.optional(v.string()) }, - handler: async (ctx, args): Promise => { - return await workflow.start( - ctx, - internal.userConfirmation.confirmationWorkflow, - { prompt: args.prompt ?? "Generate a recipe for me" }, - ); - }, -}); diff --git a/src/client/index.ts b/src/client/index.ts index b75e59cf..3873d350 100644 --- a/src/client/index.ts +++ b/src/client/index.ts @@ -42,8 +42,9 @@ export { } from "../types.js"; export type { RunOptions, WorkflowCtx } from "./workflowContext.js"; export type { WorkflowArgs } from "./workflowMutation.js"; +export { vResultValidator } from "@convex-dev/workpool"; -export type CallbackOptions = { +export type CallbackOptions = { /** * A mutation to run after the function succeeds, fails, or is canceled. * The context type is for your use, feel free to provide a validator for it. @@ -51,9 +52,9 @@ export type CallbackOptions = { * ```ts * export const completion = internalMutation({ * args: { - * workId: workIdValidator, + * workflowId: vWorkflowId, + * result: vResultValidator, * context: v.any(), - * result: resultValidator, * }, * handler: async (ctx, args) => { * console.log(args.result, "Got Context back -> ", args.context, Date.now() - args.context); @@ -64,14 +65,14 @@ export type CallbackOptions = { onComplete?: FunctionReference< "mutation", FunctionVisibility, - OnCompleteArgs + OnCompleteArgs > | null; /** * A context object to pass to the `onComplete` mutation. * Useful for passing data from the enqueue site to the onComplete site. */ - context?: unknown; + context?: Context; }; export type WorkflowDefinition< @@ -100,18 +101,23 @@ export type WorkflowStatus = /** * Define a new workflow with typed args and optional return validator. * - * * @example * ```ts - * export const doSomething = defineWorkflow({ + * export const myWorkflow = defineWorkflow(components.workflow, { * args: { amount: v.number() }, * returns: v.object({ total: v.number() }), * }).handler(async (step, args) => { * ...workflow implementation * }); + * ``` * - * // Start from a mutation or action: - * const id = await workflow.start(ctx, internal.myFile.myWorkflow, { amount }); + * Start the workflow from a mutation or action: + * ```ts + * const workflowId = await start(ctx, internal.myFile.myWorkflow, { amount: 42 }); + * ``` + * Or call it directly: + * ```ts + * const workflowId = await ctx.runMutation(internal.myFile.myWorkflow, { args: { ...myArgs } }); * ``` */ export function defineWorkflow< @@ -130,11 +136,7 @@ export function defineWorkflow< step: WorkflowCtx, args: ObjectType, ) => Promise>, - ): RegisteredMutation< - "internal", - WorkflowArgs, - ReturnValueForOptionalValidator - >; + ): RegisteredMutation<"internal", WorkflowArgs, WorkflowId>; } { return { handler: (fn) => @@ -142,56 +144,68 @@ export function defineWorkflow< }; } -export interface Workflow< - AV extends PropertyValidators, - RV extends Validator | void, -> { - /** - * Define the workflow handler function. - * Returns a registered mutation to export from your Convex module. - */ - handler( - fn: ( - step: WorkflowCtx, - args: ObjectType, - ) => Promise>, - ): RegisteredMutation< - "internal", - WorkflowArgs, - ReturnValueForOptionalValidator - >; +// ── Standalone workflow management functions ───────────────────────── +// These take ctx first, then a workflow component, so they can be +// used without a WorkflowManager instance. +type StartOptions = CallbackOptions & { /** - * Kick off a defined workflow. - * - * @param ctx - The Convex context. - * @param args - The workflow arguments. - * @param options - The workflow options. - * @returns The workflow ID. + * By default, during creation the workflow will be initiated immediately. + * With `startAsync` set to true, the workflow will be created but will + * start asynchronously via the internal workpool. + * @default false */ - start( - ctx: RunMutationCtx, - args: ObjectType, - options?: CallbackOptions & { - /** - * By default, during creation the workflow will be initiated immediately. - * The benefit is that you catch errors earlier (e.g. passing a bad - * workflow reference or catch arg validation). - * - * With `startAsync` set to true, the workflow will be created but will - * start asynchronously via the internal workpool. - * You can use this to queue up a lot of work, - * or make `start` return faster (you still get a workflowId back). - * @default false - */ - startAsync?: boolean; - }, - ): Promise; -} + startAsync?: boolean; +}; -// ── Standalone workflow management functions ───────────────────────── -// These take ctx first, then a workflow component, so they can be -// used without a WorkflowManager instance. +/** + * Start a workflow + * + * It will run asynchronously, returning a workflow ID to monitor the progress. + * + * By default it will start running the handler as part of "start" unless + * `startAsync` is set to true. + * + * ```ts + * const id = await start(ctx, internal.myFile.myWorkflow, { ...args }, { + * onComplete: internal.myFile.handleComplete, + * context: { ...passed through to onComplete }, + * }); + * ``` + * + * @param ctx - The Convex mutation or action context. + * @param workflow - The workflow to start (e.g. `internal.myFile.myWorkflow`). + * @param args - The workflow arguments. + * @param options - Options like `onComplete`, `context`, `startAsync`. + * @returns The workflow ID. + */ +export async function start< + Context = unknown, + F extends FunctionReference<"mutation", "internal"> = FunctionReference< + "mutation", + "internal" + >, +>( + ctx: RunMutationCtx, + workflow: F, + args: FunctionArgs["args"], + options?: StartOptions, +): Promise { + const formatted: Record = { args }; + if (options?.onComplete) { + formatted.onComplete = await createFunctionHandle(options.onComplete); + } + if (options?.context !== undefined) { + formatted.context = options.context; + } + if (options?.startAsync !== undefined) { + formatted.startAsync = options.startAsync; + } + return (await ctx.runMutation( + workflow as any, + formatted as any, + )) as unknown as WorkflowId; +} /** * Get a workflow's status. @@ -466,6 +480,15 @@ export class WorkflowManager { /** * Define a new workflow. * + * Start the workflow from a mutation or action: + * ```ts + * const workflowId = await start(ctx, internal.myFile.myWorkflow, { ...myArgs }); + * ``` + * Or call it directly: + * ```ts + * const workflowId = await ctx.runMutation(internal.myFile.myWorkflow, { args: { ...myArgs } }); + * ``` + * * @param workflow - The workflow definition. * @returns The workflow mutation. */ @@ -476,11 +499,7 @@ export class WorkflowManager { workflow: WorkflowDefinition & { handler: WorkflowHandler; }, - ): RegisteredMutation< - "internal", - WorkflowArgs, - ReturnValueForOptionalValidator - >; + ): RegisteredMutation<"internal", WorkflowArgs, WorkflowId>; define< ArgsValidator extends PropertyValidators, ReturnsValidator extends Validator | void, @@ -496,11 +515,7 @@ export class WorkflowManager { step: WorkflowCtx, args: ObjectType, ) => Promise>, - ): RegisteredMutation< - "internal", - WorkflowArgs, - ReturnValueForOptionalValidator - >; + ): RegisteredMutation<"internal", WorkflowArgs, WorkflowId>; }; define< ArgsValidator extends PropertyValidators, @@ -533,18 +548,29 @@ export class WorkflowManager { } /** - * Kick off a defined workflow. + * Start a workflow. + * + * Alternative to `start` (`import { start } from "@convex-dev/workflow"`). + * + * This is slightly more efficient than calling `start` when passing + * `startAsync: true`, and slightly less efficient in the default case. * * @param ctx - The Convex context. * @param workflow - The workflow to start (e.g. `internal.index.exampleWorkflow`). * @param args - The workflow arguments. * @returns The workflow ID. */ - async start>( + async start< + Context = unknown, + F extends FunctionReference<"mutation", "internal"> = FunctionReference< + "mutation", + "internal" + >, + >( ctx: RunMutationCtx, workflow: F, args: FunctionArgs["args"], - options?: CallbackOptions & { + options?: CallbackOptions & { /** * By default, during creation the workflow will be initiated immediately. * The benefit is that you catch errors earlier (e.g. passing a bad @@ -557,10 +583,11 @@ export class WorkflowManager { * @default false */ startAsync?: boolean; - /** @deprecated Use `startAsync` instead. */ - validateAsync?: boolean; }, ): Promise { + if (!options?.startAsync) { + return start(ctx, workflow, args, options); + } const handle = await createFunctionHandle(workflow); const onComplete = options?.onComplete ? { @@ -574,7 +601,7 @@ export class WorkflowManager { workflowArgs: args, maxParallelism: this.options?.workpoolOptions?.maxParallelism, onComplete, - startAsync: options?.startAsync ?? options?.validateAsync, + startAsync: true, }); return workflowId as unknown as WorkflowId; } diff --git a/src/client/workflowMutation.ts b/src/client/workflowMutation.ts index 10f22577..e8ec29fa 100644 --- a/src/client/workflowMutation.ts +++ b/src/client/workflowMutation.ts @@ -1,3 +1,4 @@ +import { type RunResult, type WorkpoolOptions } from "@convex-dev/workpool"; import { BaseChannel } from "async-channel"; import { assert } from "convex-helpers"; import { validate, ValidationError } from "convex-helpers/validators"; @@ -5,44 +6,60 @@ import { createFunctionHandle, internalMutationGeneric, makeFunctionReference, + type FunctionHandle, type RegisteredMutation, } from "convex/server"; import { asObjectValidator, + v, type ObjectType, type PropertyValidators, - v, } from "convex/values"; import { createLogger } from "../component/logging.js"; import { type JournalEntry } from "../component/schema.js"; +import { formatErrorWithStack } from "../shared.js"; +import { vWorkflowId, type OnCompleteArgs, type WorkflowId } from "../types.js"; import { setupEnvironment } from "./environment.js"; import type { WorkflowDefinition, WorkflowHandler } from "./index.js"; import { StepExecutor, type StepRequest, type WorkerResult } from "./step.js"; -import { createWorkflowCtx } from "./workflowContext.js"; -import { type RunResult, type WorkpoolOptions } from "@convex-dev/workpool"; import { type WorkflowComponent } from "./types.js"; -import { vWorkflowId } from "../types.js"; -import { formatErrorWithStack } from "../shared.js"; -import { safeFunctionName } from "./safeFunctionName.js"; +import { createWorkflowCtx } from "./workflowContext.js"; -const workflowArgs = v.union( +export type WorkflowArgs = { + /** + * The arguments to pass to the Workflow handler. + */ + args: ObjectType; + /** + * Whether to enqueue the Workflow for asynchronous execution only. + * By default it will start evaluating the handler's first step in the + * current transaction. + */ + startAsync?: boolean; + /** + * A function handle (created with createFunctionHandle) that will be called + * when the Workflow completes. + */ + onComplete?: FunctionHandle<"mutation", OnCompleteArgs>; + /** + * Any extra context to pass to the Workflow. + */ + context?: Context; +}; +const vWorkflowArgs = v.union( v.object({ workflowId: vWorkflowId, generationNumber: v.number(), }), v.object({ - fn: v.string(), + fn: v.optional(v.string()), args: v.any(), + startAsync: v.optional(v.boolean()), + onComplete: v.optional(v.string()), + context: v.optional(v.any()), }), ); -export type WorkflowArgs = { - fn: "You should not call this directly, call workflow.start instead"; - args: ObjectType; -}; - -const INVALID_WORKFLOW_MESSAGE = `Invalid arguments for workflow: Did you invoke the workflow with ctx.runMutation() instead of workflow.start()? Pro tip: to start a workflow directly from the CLI or dashboard, you can use args '{ fn: "path/to/file:workflowName", args: { ...your workflow args } }'`; - // This function is defined in the calling component but then gets passed by // function handle to the workflow component for execution. This function runs // one "poll" of the workflow, replaying its execution from the journal until @@ -53,39 +70,65 @@ export function workflowMutation( handler: WorkflowHandler; }, defaultWorkpoolOptions?: WorkpoolOptions, - boundFn?: string, -): RegisteredMutation<"internal", WorkflowArgs, void> { +): RegisteredMutation<"internal", WorkflowArgs, WorkflowId> { const workpoolOptions = { ...defaultWorkpoolOptions, ...registered.workpoolOptions, }; return internalMutationGeneric({ - handler: async (ctx, args) => { - if (!validate(workflowArgs, args) && boundFn && "args" in args) { - // Bound workflow called directly with { args: ... } - const fn = makeFunctionReference(boundFn); - const workflowId = await ctx.runMutation(component.workflow.create, { - workflowName: safeFunctionName(fn), - workflowHandle: await createFunctionHandle(fn), - workflowArgs: args.args, - maxParallelism: workpoolOptions.maxParallelism, - }); - return workflowId; - } - if (!validate(workflowArgs, args)) { - throw new Error(INVALID_WORKFLOW_MESSAGE); + handler: async (ctx, args): Promise => { + if (!validate(vWorkflowArgs, args)) { + if (!("workflowId" in args) && !("args" in args)) { + const console = createLogger(workpoolOptions?.logLevel); + console.error( + `Invalid arguments for workflow: When calling it directly, use '{ args: { ...your workflow args } }'`, + ); + } + assert(validate(vWorkflowArgs, args, { throw: true })); } - if ("fn" in args) { - const fn = makeFunctionReference(args.fn); - const workflowId = await ctx.runMutation(component.workflow.create, { - workflowName: safeFunctionName(fn), + let workflowId: WorkflowId, generationNumber: number; + + // Direct call { args: {...}, onComplete?, context?, startAsync? } + if ("args" in args) { + const metadata = await getFunctionMetadata(); + // CLI/dashboard format { fn: "path/to:fn", args: {...} } (deprecated) + if ("fn" in args && typeof args.fn === "string") { + const console = createLogger(workpoolOptions?.logLevel); + if (args.fn !== metadata.name) { + console.error( + `[workflow] Error: calling workflow with { fn: "${args.fn}", args } ` + + `but the function name does not match the workflow name ${metadata.name}. Use { args: { ...yourArgs } } without "fn" to start this workflow, ` + + `or use the start() function.`, + ); + throw new Error(`Invalid workflow function reference: ${args.fn}`); + } + console.warn( + `[workflow] Deprecation warning: calling a workflow with { fn, args } is deprecated. You no longer need to pass "fn". Use { args: { ...yourArgs } } to start a workflow directly.`, + ); + } + const fn = makeFunctionReference(metadata.name); + const onComplete = + typeof args.onComplete === "string" + ? { fnHandle: args.onComplete, context: args.context } + : undefined; + workflowId = (await ctx.runMutation(component.workflow.create, { + workflowName: metadata.name, workflowHandle: await createFunctionHandle(fn), workflowArgs: args.args, maxParallelism: workpoolOptions.maxParallelism, - }); - return workflowId; + onComplete, + startAsync: args.startAsync ?? undefined, + createOnly: !args.startAsync, // either start async or run inline here + })) as WorkflowId; + if (args.startAsync) { + return workflowId; + } + generationNumber = 0; + } else { + workflowId = args.workflowId; + generationNumber = args.generationNumber; } - const { workflowId, generationNumber } = args; + const { workflow, logLevel, journalEntries, ok } = await ctx.runQuery( component.journal.load, { workflowId, shortCircuit: true }, @@ -99,17 +142,17 @@ export function workflowMutation( generationNumber, runResult: { kind: "failed", error: "Failed to load journal" }, }); - return; + return workflowId; } if (workflow.generationNumber !== generationNumber) { console.error( `Invalid generation number: ${generationNumber} running workflow ${workflow.name} (${workflowId})`, ); - return; + return workflowId; } if (workflow.runResult?.kind === "success") { console.log(`Workflow ${workflowId} completed, returning.`); - return; + return workflowId; } if (inProgress.length > 0) { console.log( @@ -118,7 +161,7 @@ export function workflowMutation( .map((entry) => `${entry.step.name} (${entry._id})`) .join(", "), ); - return; + return workflowId; } for (const journalEntry of journalEntries) { assert( @@ -205,9 +248,47 @@ export function workflowMutation( } finally { restoreEnvironment(); } + return workflowId; }, - }) as any; + }); } // eslint-disable-next-line @typescript-eslint/no-unused-vars -const console = "THIS IS A REMINDER TO USE getDefaultLogger"; +const console = "THIS IS A REMINDER TO USE createLogger"; + +// TODO: replace with ctx.meta.getFunctionMetadata() in 1.36+ +export async function getFunctionMetadata(): Promise<{ + name: string; + componentPath: string; +}> { + const syscalls = (global as any).Convex; + return JSON.parse( + await syscalls.asyncSyscall("1.0/getFunctionMetadata", JSON.stringify({})), + ); +} + +type TransactionMetric = { + used: number; + remaining: number; +}; + +type TransactionMetrics = { + bytesRead: TransactionMetric; + bytesWritten: TransactionMetric; + databaseQueries: TransactionMetric; + documentsRead: TransactionMetric; + documentsWritten: TransactionMetric; + functionsScheduled: TransactionMetric; + scheduledFunctionArgsBytes: TransactionMetric; +}; + +// TODO: replace with ctx.meta.getTransactionMetrics() in 1.36+ +export async function getTransactionMetrics(): Promise { + const syscalls = (global as any).Convex; + return JSON.parse( + await syscalls.asyncSyscall( + "1.0/getTransactionMetrics", + JSON.stringify({}), + ), + ); +} diff --git a/src/component/_generated/component.ts b/src/component/_generated/component.ts index 0c75eeae..3a52bea4 100644 --- a/src/component/_generated/component.ts +++ b/src/component/_generated/component.ts @@ -336,6 +336,7 @@ export type ComponentApi = "mutation", "internal", { + createOnly?: boolean; maxParallelism?: number; onComplete?: { context?: any; fnHandle: string }; startAsync?: boolean; diff --git a/src/component/schema.ts b/src/component/schema.ts index 75443203..c3250506 100644 --- a/src/component/schema.ts +++ b/src/component/schema.ts @@ -1,14 +1,13 @@ import { vResultValidator, vWorkIdValidator } from "@convex-dev/workpool"; +import { deprecated, literals } from "convex-helpers/validators"; import { defineSchema, defineTable } from "convex/server"; import { type Infer, v } from "convex/values"; import { logLevel } from "./logging.js"; -import { deprecated, literals } from "convex-helpers/validators"; export const vOnComplete = v.object({ fnHandle: v.string(), // mutation context: v.optional(v.any()), }); -export type OnComplete = Infer; const workflowObject = { name: v.optional(v.string()), diff --git a/src/component/workflow.ts b/src/component/workflow.ts index 04bf3684..9105ee09 100644 --- a/src/component/workflow.ts +++ b/src/component/workflow.ts @@ -45,6 +45,7 @@ const createArgs = v.object({ maxParallelism: v.optional(v.number()), onComplete: v.optional(vOnComplete), startAsync: v.optional(v.boolean()), + createOnly: v.optional(v.boolean()), // TODO: ttl }); export const create = mutation({ @@ -73,6 +74,10 @@ export async function createHandler( args.workflowHandle, ); if (args.startAsync) { + assert( + !args.createOnly, + "Cannot startAsync and createOnly at the same time", + ); const workpool = await getWorkpool(ctx, args); await workpool.enqueueMutation( ctx, @@ -85,7 +90,7 @@ export async function createHandler( ...schedulerOptions, }, ); - } else { + } else if (!args.createOnly) { // If we can't start it, may as well not create it, eh? Fail fast... await ctx.runMutation(args.workflowHandle as FunctionHandle<"mutation">, { workflowId, diff --git a/src/types.ts b/src/types.ts index c4b6ad69..3c9988d6 100644 --- a/src/types.ts +++ b/src/types.ts @@ -4,6 +4,7 @@ import { type RunResult, type WorkId, } from "@convex-dev/workpool"; +import type { FunctionHandle } from "convex/server"; import { v, type Infer, @@ -105,7 +106,13 @@ export type SchedulerOptions = runAt?: never; }; -export type OnCompleteArgs = { +// The argument to "workflow.create" / calling the function directly +export type OnComplete = { + fnHandle: FunctionHandle<"mutation", OnCompleteArgs>; + context?: Context; +}; + +export type OnCompleteArgs = { /** * The ID of the work that completed. */ @@ -114,7 +121,7 @@ export type OnCompleteArgs = { * The context object passed when enqueuing the work. * Useful for passing data from the enqueue site to the onComplete site. */ - context: unknown; + context: Context; /** * The result of the run that completed. */