Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions example/convex/_generated/api.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import type * as inlineTest from "../inlineTest.js";
import type * as nestedWorkflow from "../nestedWorkflow.js";
import type * as oversized from "../oversized.js";
import type * as passingSignals from "../passingSignals.js";
import type * as test_contextRoundtrip from "../test/contextRoundtrip.js";
import type * as test_oldSyntax from "../test/oldSyntax.js";
import type * as transcription from "../transcription.js";
import type * as userConfirmation from "../userConfirmation.js";
Expand All @@ -35,6 +36,7 @@ declare const fullApi: ApiFromModules<{
nestedWorkflow: typeof nestedWorkflow;
oversized: typeof oversized;
passingSignals: typeof passingSignals;
"test/contextRoundtrip": typeof test_contextRoundtrip;
"test/oldSyntax": typeof test_oldSyntax;
transcription: typeof transcription;
userConfirmation: typeof userConfirmation;
Expand Down
157 changes: 157 additions & 0 deletions example/convex/test/contextRoundtrip.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/// <reference types="vite/client" />

import { describe, test, expect, vi, beforeEach, afterEach } from "vitest";
import { createFunctionHandle } from "convex/server";
import { assert } from "convex-helpers";
import { workflow } from "../example";
import { internal } from "../_generated/api";
import { initConvexTest } from "../setup.test";

describe("context round-trips through failure paths", () => {
beforeEach(() => {
vi.useFakeTimers();
});
afterEach(() => {
vi.useRealTimers();
});

test("[1] direct call + handler throws", async () => {
const t = initConvexTest();
const ctxValue = { case: "directThrow", marker: 12345 };
const workflowId = await t.mutation(async (ctx) => {
const onCompleteHandle = await createFunctionHandle(
internal.test.contextRoundtrip.captureOnComplete,
);
const wfId = await ctx.runMutation(
internal.test.contextRoundtrip.throwingWorkflow,
{
args: {},
onComplete: onCompleteHandle,
context: ctxValue,
startAsync: true,
},
);
await ctx.db.insert("flows", {
workflowId: wfId,
in: "directThrow",
out: null,
});
return wfId;
});
await t.finishAllScheduledFunctions(vi.runAllTimers);

const flow = await t.query(async (ctx) =>
ctx.db
.query("flows")
.withIndex("workflowId", (q) => q.eq("workflowId", workflowId))
.first(),
);
assert(flow);
expect(flow.out?.result?.kind).toBe("failed");
expect(flow.out?.capturedContext).toEqual(ctxValue);
});

test("[2] start() + handler throws", async () => {
const t = initConvexTest();
const ctxValue = { case: "startThrow", marker: 23456 };
const workflowId = await t.mutation(async (ctx) => {
const wfId = await workflow.start(
ctx,
internal.test.contextRoundtrip.throwingWorkflow,
{},
{
onComplete: internal.test.contextRoundtrip.captureOnComplete,
context: ctxValue,
startAsync: true,
},
);
await ctx.db.insert("flows", {
workflowId: wfId,
in: "startThrow",
out: null,
});
return wfId;
});
await t.finishAllScheduledFunctions(vi.runAllTimers);

const flow = await t.query(async (ctx) =>
ctx.db
.query("flows")
.withIndex("workflowId", (q) => q.eq("workflowId", workflowId))
.first(),
);
assert(flow);
expect(flow.out?.result?.kind).toBe("failed");
expect(flow.out?.capturedContext).toEqual(ctxValue);
});

test("[3] direct call + oversized return", async () => {
const t = initConvexTest();
const ctxValue = { case: "directOversized", marker: 34567 };
const workflowId = await t.mutation(async (ctx) => {
const onCompleteHandle = await createFunctionHandle(
internal.test.contextRoundtrip.captureOnComplete,
);
const wfId = await ctx.runMutation(
internal.oversized.largeReturnWorkflow,
{
args: {},
onComplete: onCompleteHandle,
context: ctxValue,
startAsync: true,
},
);
await ctx.db.insert("flows", {
workflowId: wfId,
in: "directOversized",
out: null,
});
return wfId;
});
await t.finishAllScheduledFunctions(vi.runAllTimers);

const flow = await t.query(async (ctx) =>
ctx.db
.query("flows")
.withIndex("workflowId", (q) => q.eq("workflowId", workflowId))
.first(),
);
assert(flow);
expect(flow.out?.result?.kind).toBe("failed");
expect(flow.out?.capturedContext).toEqual(ctxValue);
});

test("[4] start() + oversized return", async () => {
const t = initConvexTest();
const ctxValue = { case: "startOversized", marker: 45678 };
const workflowId = await t.mutation(async (ctx) => {
const wfId = await workflow.start(
ctx,
internal.oversized.largeReturnWorkflow,
{},
{
onComplete: internal.test.contextRoundtrip.captureOnComplete,
context: ctxValue,
startAsync: true,
},
);
await ctx.db.insert("flows", {
workflowId: wfId,
in: "startOversized",
out: null,
});
return wfId;
});
await t.finishAllScheduledFunctions(vi.runAllTimers);

const flow = await t.query(async (ctx) =>
ctx.db
.query("flows")
.withIndex("workflowId", (q) => q.eq("workflowId", workflowId))
.first(),
);
assert(flow);
expect(flow.out?.result?.kind).toBe("failed");
expect(flow.out?.capturedContext).toEqual(ctxValue);
});
});
35 changes: 35 additions & 0 deletions example/convex/test/contextRoundtrip.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { v } from "convex/values";
import { vWorkflowId } from "@convex-dev/workflow";
import { vResultValidator } from "@convex-dev/workpool";
import { internalMutation } from "../_generated/server.js";
import { workflow } from "../example.js";

export const throwingWorkflow = workflow
.define({
args: {},
})
.handler(async () => {
throw new Error("intentional failure");
});

// onComplete that captures both the result and the received context, so tests
// can verify the context round-trips through every failure path.
export const captureOnComplete = internalMutation({
args: {
workflowId: vWorkflowId,
result: vResultValidator,
context: v.any(),
},
returns: v.null(),
handler: async (ctx, args) => {
const flow = await ctx.db
.query("flows")
.withIndex("workflowId", (q) => q.eq("workflowId", args.workflowId))
.first();
if (!flow) return null;
await ctx.db.patch("flows", flow._id, {
out: { result: args.result, capturedContext: args.context },
});
return null;
},
});
67 changes: 37 additions & 30 deletions src/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,36 +44,43 @@ export type { RunOptions, WorkflowCtx } from "./workflowContext.js";
export type { WorkflowArgs } from "./workflowMutation.js";
export { vResultValidator } from "@convex-dev/workpool";

export type CallbackOptions<Context = unknown> = {
/**
* A mutation to run after the function succeeds, fails, or is canceled.
* The context type is for your use, feel free to provide a validator for it.
* e.g.
* ```ts
* export const completion = internalMutation({
* args: {
* workflowId: vWorkflowId,
* result: vResultValidator,
* context: v.any(),
* },
* handler: async (ctx, args) => {
* console.log(args.result, "Got Context back -> ", args.context, Date.now() - args.context);
* },
* });
* ```
*/
onComplete?: FunctionReference<
"mutation",
FunctionVisibility,
OnCompleteArgs<Context>
> | null;

/**
* A context object to pass to the `onComplete` mutation.
* Useful for passing data from the enqueue site to the onComplete site.
*/
context?: Context;
};
export type CallbackOptions<Context = unknown> =
| {
/**
* A mutation to run after the workflow succeeds, fails, or is canceled.
* The context type is for your use, feel free to provide a validator for it.
*
* If you don't need `context`, you can set the validator to optional
* with `v.optional(v.any())` and pass `context: undefined`.
*
* ```ts
* export const completion = internalMutation({
* args: {
* workflowId: vWorkflowId,
* result: vResultValidator,
* context: v.optional(v.any()),
* },
* handler: async (ctx, args) => {
* console.log(args.result, "Got Context back -> ", args.context);
* },
* });
* ```
*/
onComplete: FunctionReference<
"mutation",
FunctionVisibility,
OnCompleteArgs<Context>
>;
/**
* A context object to pass to the `onComplete` mutation.
* Useful for passing data from the enqueue site to the onComplete site.
*/
context: Context;
}
| {
onComplete?: undefined;
context?: undefined;
};

export type WorkflowDefinition<
ArgsValidator extends PropertyValidators,
Expand Down
27 changes: 17 additions & 10 deletions src/client/workflowMutation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,23 @@ export type WorkflowArgs<V extends PropertyValidators, Context = unknown> = {
* current transaction.
*/
startAsync?: boolean;
/**
* A function handle (created with createFunctionHandle) that will be called
* when the Workflow completes.
*/
onComplete?: FunctionHandle<"mutation", OnCompleteArgs<Context>>;
/**
* Any extra context to pass to the Workflow.
*/
context?: Context;
};
} & (
| {
/**
* A function handle (created with createFunctionHandle) that will be
* called when the Workflow completes.
*/
onComplete: FunctionHandle<"mutation", OnCompleteArgs<Context>>;
/**
* Context forwarded to the `onComplete` mutation.
*/
context: Context;
}
| {
onComplete?: undefined;
context?: undefined;
}
);
const vWorkflowArgs = v.union(
v.object({
workflowId: vWorkflowId,
Expand Down
Loading