Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 38 additions & 50 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -185,26 +185,34 @@ export const exampleAction = internalAction({

### Starting a workflow

Once you've defined a workflow, you can start it from a mutation or action using
`workflow.start()`.
Once you've defined a workflow, you can start it from any mutation or action
using `start()`:

```ts
import { start } from "@convex-dev/workflow";
import { internal } from "./_generated/api";

export const kickoffWorkflow = mutation({
handler: async (ctx): Promise<WorkflowId> => {
const workflowId = await workflow.start(
ctx,
internal.example.exampleWorkflow,
{ exampleArg: "James" },
);
// Starts the workflow immediately, run asynchronously
const workflowId = await start(ctx, internal.example.exampleWorkflow, {
exampleArg: "James",
});
return workflowId;
},
});
```

You can also call workflows directly from code, the CLI or dashboard:

```sh
npx convex run example:exampleWorkflow '{ "args": { "exampleArg": "James" } }'
```

### Handling the workflow's result with onComplete

You can handle the workflow's result with `onComplete`. This is useful for
cleaning up any resources used by the workflow.
You can handle the workflow's result with `onComplete` by using the `start()`
helper. This is useful for cleaning up any resources used by the workflow.

Note: when you return things from a workflow, you'll need to specify the return
type of your `handler` to break type cycles due to using `internal.*` functions
Expand All @@ -217,20 +225,19 @@ error instead of success. You can also do validation in the `onComplete` handler
to have more control over handling that situation.

```ts
import { vWorkflowId, Workflow } from "@convex-dev/workflow";
import { start, vWorkflowId } from "@convex-dev/workflow";
import { vResultValidator } from "@convex-dev/workpool";
import { workflow } from "./example";

export const foo = mutation({
handler: async (ctx): Promise<WorkflowId> => {
const name = "James";
const workflowId = await workflow.start(
const workflowId = await start(
ctx,
internal.example.exampleWorkflow,
{ name },
{
onComplete: internal.example.handleOnComplete,
context: name, // can be anything
context: { intent: "welcome", for: "James" }, // can be anything
},
);
return workflowId;
Expand All @@ -241,10 +248,11 @@ export const handleOnComplete = mutation({
args: {
workflowId: vWorkflowId,
result: vResultValidator,
context: v.any(), // used to pass through data from the start site.
// used to pass through data from the start site.
context: v.object({ intent: v.string(), for: v.string() }),
},
handler: async (ctx, args): Promise<void> => {
const name = (args.context as { name: string }).name;
const name = args.context.name;
if (args.result.kind === "success") {
const text = args.result.returnValue;
console.log(`${name} result: ${text}`);
Expand Down Expand Up @@ -441,11 +449,13 @@ budget.

### Checking a workflow's status

The `workflow.start()` method returns a `WorkflowId`, which can then be used for
Calling a workflow returns a `WorkflowId` string, which can then be used for
querying a workflow's status.

```ts
import { vWorkflowId, getStatus, WorkflowStatus } from "@convex-dev/workflow";
import { components } from "./_generated/api";
import { query } from "./_generated/server";

export const runWorkflowAndPoll = query({
args: { workflowId: vWorkflowId },
Expand All @@ -471,19 +481,8 @@ executing.
```ts
import { cancel } from "@convex-dev/workflow";

export const kickoffWorkflow = action({
handler: async (ctx): Promise<void> => {
const workflowId = await workflow.start(
ctx,
internal.example.exampleWorkflow,
{ name: "James" },
);
await new Promise((resolve) => setTimeout(resolve, 1000));

// Cancel the workflow after 1 second.
await cancel(ctx, components.workflow, workflowId);
},
});
// ... Inside a mutation or action
await cancel(ctx, components.workflow, workflowId);
```

### Restart a failed workflow
Expand Down Expand Up @@ -540,28 +539,17 @@ After a workflow has completed, you can clean up its storage with `cleanup()`.
Completed workflows are not automatically cleaned up by the system.

```ts
import { cleanup, getStatus } from "@convex-dev/workflow";
import { cleanup, vWorkflowId, vResultValidator } from "@convex-dev/workflow";

export const kickoffWorkflow = action({
handler: async (ctx) => {
const workflowId = await workflow.start(
ctx,
internal.example.exampleWorkflow,
{ name: "James" },
);
try {
while (true) {
const status = await getStatus(ctx, components.workflow, workflowId);
if (status.type === "inProgress") {
await new Promise((resolve) => setTimeout(resolve, 1000));
continue;
}
console.log("Workflow completed with status:", status);
break;
}
} finally {
await cleanup(ctx, components.workflow, workflowId);
}
export const afterWorkflow = mutation({
args: {
workflowId: vWorkflowId,
result: vResultValidator,
context: v.any(),
},
handler: async (ctx, args): Promise<void> => {
// Clean up a completed workflow's storage.
await cleanup(ctx, components.workflow, args.workflowId);
},
});
```
Expand Down
10 changes: 7 additions & 3 deletions example/convex/example.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import { v } from "convex/values";
import { WorkflowId, WorkflowManager } from "@convex-dev/workflow";
import {
WorkflowId,
WorkflowManager,
start,
vWorkflowId,
} from "@convex-dev/workflow";
import { internal } from "./_generated/api.js";
import { internalAction, internalMutation } from "./_generated/server.js";
import { components } from "./_generated/api.js";
import { vWorkflowId } from "@convex-dev/workflow";
import { vResultValidator } from "@convex-dev/workpool";

export const workflow = new WorkflowManager(components.workflow);
Expand Down Expand Up @@ -67,7 +71,7 @@ export const startWorkflow = internalMutation({
returns: v.string(),
handler: async (ctx, args) => {
const location = args.location ?? "San Francisco";
const id: WorkflowId = await workflow.start(
const id: WorkflowId = await start(
ctx,
internal.example.myWorkflow,
{ location },
Expand Down
2 changes: 1 addition & 1 deletion example/convex/oversized.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ export const onComplete = internalMutation({
.withIndex("workflowId", (q) => q.eq("workflowId", args.workflowId))
.first();
if (!flow) return null;
await ctx.db.patch(flow._id, { out: args.result });
await ctx.db.patch("flows", flow._id, { out: args.result });
return null;
},
});
101 changes: 101 additions & 0 deletions example/convex/test/start.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/// <reference types="vite/client" />

import { expect, describe, test, vi, beforeEach, afterEach } from "vitest";
import { initConvexTest } from "../setup.test";
import { components, internal } from "../_generated/api";
import { assert } from "convex-helpers";
import { getStatus, start } from "@convex-dev/workflow";
import { workflow } from "../example";

describe("direct workflow call", () => {
beforeEach(() => {
vi.useFakeTimers();
});
afterEach(() => {
vi.useRealTimers();
});

test("calling workflow mutation directly starts and completes", async () => {
const t = initConvexTest();
const workflowId = await t.mutation(
internal.catchError.catchErrorWorkflow,
{ args: { manualRetries: 0 } },
);
await t.finishAllScheduledFunctions(vi.runAllTimers);
const status = await t.run((ctx) =>
getStatus(ctx, components.workflow, workflowId),
);
expect(status.type).toBe("completed");
assert(status.type === "completed");
expect(status.result).toBe(1);
});

test("direct call to nested child workflow", async () => {
const t = initConvexTest();
const workflowId = await t.mutation(internal.nestedWorkflow.child, {
args: { foo: "hello" },
});
await t.finishAllScheduledFunctions(vi.runAllTimers);
const status = await t.run((ctx) =>
getStatus(ctx, components.workflow, workflowId),
);
expect(status.type).toBe("completed");
assert(status.type === "completed");
expect(status.result).toBe(5);
});
});

describe("start() helper", () => {
beforeEach(() => {
vi.useFakeTimers();
});
afterEach(() => {
vi.useRealTimers();
});

test("start() with startAsync option", async () => {
const t = initConvexTest();
const workflowId = await t.run((ctx) =>
start(
ctx,
internal.catchError.catchErrorWorkflow,
{
manualRetries: 2,
},
{
startAsync: true,
},
),
);
await t.finishAllScheduledFunctions(vi.runAllTimers);
const status = await t.run((ctx) =>
getStatus(ctx, components.workflow, workflowId),
);
expect(status.type).toBe("completed");
assert(status.type === "completed");
expect(status.result).toBe(3);
});
});

describe("backwards compatibility", () => {
beforeEach(() => {
vi.useFakeTimers();
});
afterEach(() => {
vi.useRealTimers();
});

test("workflow.start() still works", async () => {
const t = initConvexTest();
const workflowId = await t.run((ctx) =>
workflow.start(ctx, internal.nestedWorkflow.child, { foo: "test" }),
);
await t.finishAllScheduledFunctions(vi.runAllTimers);
const status = await t.run((ctx) =>
getStatus(ctx, components.workflow, workflowId),
);
expect(status.type).toBe("completed");
assert(status.type === "completed");
expect(status.result).toBe(4);
});
});
16 changes: 1 addition & 15 deletions example/convex/transcription.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { v } from "convex/values";
import { OpenAI } from "openai";
import { internal } from "./_generated/api.js";
import { internalAction, internalMutation } from "./_generated/server.js";
import { internalAction } from "./_generated/server.js";
import { workflow } from "./example.js";

function getOpenAI() {
Expand Down Expand Up @@ -36,20 +36,6 @@ export const transcriptionWorkflow = workflow
console.log(embedding.slice(0, 20));
});

export const startTranscription = internalMutation({
args: {
storageId: v.id("_storage"),
},
handler: async (ctx, args) => {
const id: string = await workflow.start(
ctx,
internal.transcription.transcriptionWorkflow,
{ storageId: args.storageId },
);
return id;
},
});

export const computeTranscription = internalAction({
args: {
storageId: v.id("_storage"),
Expand Down
41 changes: 13 additions & 28 deletions example/convex/userConfirmation.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,4 @@
import {
defineEvent,
sendEvent,
vWorkflowId,
WorkflowId,
} from "@convex-dev/workflow";
import { defineEvent, sendEvent, vWorkflowId } from "@convex-dev/workflow";
import { v } from "convex/values";
import { components, internal } from "./_generated/api";
import { internalAction, internalMutation } from "./_generated/server";
Expand All @@ -17,6 +12,18 @@ export const approvalEvent = defineEvent({
),
});

/**
* Test this from the CLI:
* ```sh
npx convex run userConfirmation:confirmationWorkflow \
'{ "args": { "prompt": "Generate a recipe for me" } }'
* ```
* Copy the ID it returns, then run:
* ```sh
* npx convex run userConfirmation:chooseProposal '{"workflowId":"...", "choice":1}'
* ```
* Watch the logs from `npx convex dev` or `npx convex logs` to see progress.
*/
export const confirmationWorkflow = workflow
.define({
args: { prompt: v.string() },
Expand Down Expand Up @@ -58,25 +65,3 @@ export const chooseProposal = internalMutation({
return true;
},
});

/**
* Test this from the CLI:
* ```sh
* npx convex run userConfirmation:startConfirmationWorkflow
* ```
* Copy the ID it returns, then run:
* ```sh
* npx convex run userConfirmation:chooseProposal '{workflowId:"...", choice:1}'
* ```
* Watch the logs from `npx convex dev` or `npx convex logs` to see progress.
*/
export const startConfirmationWorkflow = internalMutation({
args: { prompt: v.optional(v.string()) },
handler: async (ctx, args): Promise<WorkflowId> => {
return await workflow.start(
ctx,
internal.userConfirmation.confirmationWorkflow,
{ prompt: args.prompt ?? "Generate a recipe for me" },
);
},
});
Loading
Loading