diff --git a/example/convex/_generated/api.d.ts b/example/convex/_generated/api.d.ts index a01f78be..3c3163ae 100644 --- a/example/convex/_generated/api.d.ts +++ b/example/convex/_generated/api.d.ts @@ -14,6 +14,7 @@ import type * as e2e from "../e2e.js"; import type * as example from "../example.js"; import type * as inlineTest from "../inlineTest.js"; import type * as nestedWorkflow from "../nestedWorkflow.js"; +import type * as oversized from "../oversized.js"; import type * as passingSignals from "../passingSignals.js"; import type * as test_oldSyntax from "../test/oldSyntax.js"; import type * as transcription from "../transcription.js"; @@ -32,6 +33,7 @@ declare const fullApi: ApiFromModules<{ example: typeof example; inlineTest: typeof inlineTest; nestedWorkflow: typeof nestedWorkflow; + oversized: typeof oversized; passingSignals: typeof passingSignals; "test/oldSyntax": typeof test_oldSyntax; transcription: typeof transcription; diff --git a/example/convex/oversized.test.ts b/example/convex/oversized.test.ts new file mode 100644 index 00000000..45444e13 --- /dev/null +++ b/example/convex/oversized.test.ts @@ -0,0 +1,111 @@ +/// + +import { expect, describe, test, vi, beforeEach, afterEach } from "vitest"; +import { initConvexTest } from "./setup.test"; +import { components, internal } from "./_generated/api"; +import { assert } from "convex-helpers"; +import { getStatus } from "@convex-dev/workflow"; +import { workflow } from "./example"; + +describe("oversized values", () => { + beforeEach(() => { + vi.useFakeTimers(); + }); + afterEach(() => { + vi.useRealTimers(); + }); + + test("large step return value fails workflow and calls onComplete", async () => { + const t = initConvexTest(); + const workflowId = await t.mutation(async (ctx) => { + const wfId = await workflow.start( + ctx, + internal.oversized.largeReturnWorkflow, + {}, + { + onComplete: internal.oversized.onComplete, + context: {}, + startAsync: true, + }, + ); + await ctx.db.insert("flows", { + workflowId: wfId, + in: "largeReturn", + out: null, + }); + return wfId; + }); + await t.finishAllScheduledFunctions(vi.runAllTimers); + + const status = await t.run((ctx) => + getStatus(ctx, components.workflow, workflowId), + ); + expect(status.type).toBe("failed"); + assert(status.type === "failed"); + expect(status.error).toContain("Step return value too large"); + + const flow = await t.query(async (ctx) => { + return ctx.db + .query("flows") + .withIndex("workflowId", (q) => q.eq("workflowId", workflowId)) + .first(); + }); + expect(flow).not.toBeNull(); + assert(flow); + expect(flow.out).not.toBeNull(); + expect(flow.out.kind).toBe("failed"); + expect(flow.out.error).toContain("Step return value too large"); + }); + + test("large event value fails workflow and calls onComplete", async () => { + const t = initConvexTest(); + const workflowId = await t.mutation(async (ctx) => { + const wfId = await workflow.start( + ctx, + internal.oversized.eventWorkflow, + {}, + { + onComplete: internal.oversized.onComplete, + context: {}, + startAsync: true, + }, + ); + await ctx.db.insert("flows", { + workflowId: wfId, + in: "eventWorkflow", + out: null, + }); + return wfId; + }); + // Let the workflow start and reach the awaitEvent point + await t.finishAllScheduledFunctions(vi.runAllTimers); + + const status = await t.run((ctx) => + getStatus(ctx, components.workflow, workflowId), + ); + expect(status.type).toBe("inProgress"); + + // Send the oversized event + await t.mutation(internal.oversized.sendBigEvent, { workflowId }); + await t.finishAllScheduledFunctions(vi.runAllTimers); + + const status2 = await t.run((ctx) => + getStatus(ctx, components.workflow, workflowId), + ); + expect(status2.type).toBe("failed"); + assert(status2.type === "failed"); + expect(status2.error).toContain("Step return value too large"); + expect(status2.error).toContain("900002 bytes"); + + const flow = await t.query(async (ctx) => { + return ctx.db + .query("flows") + .withIndex("workflowId", (q) => q.eq("workflowId", workflowId)) + .first(); + }); + expect(flow).not.toBeNull(); + expect(flow!.out).not.toBeNull(); + expect(flow!.out.kind).toBe("failed"); + expect(flow!.out.error).toContain("Step return value too large"); + }); +}); diff --git a/example/convex/oversized.ts b/example/convex/oversized.ts new file mode 100644 index 00000000..03ec9f4c --- /dev/null +++ b/example/convex/oversized.ts @@ -0,0 +1,74 @@ +import { v } from "convex/values"; +import { sendEvent, defineEvent } from "@convex-dev/workflow"; +import { components, internal } from "./_generated/api.js"; +import { internalAction, internalMutation } from "./_generated/server.js"; +import { vWorkflowId } from "@convex-dev/workflow"; +import { vResultValidator } from "@convex-dev/workpool"; +import { workflow } from "./example.js"; + +// Action that returns a value larger than 800KB. +export const largeReturnAction = internalAction({ + args: {}, + returns: v.string(), + handler: async (): Promise => { + return "x".repeat(900_000); + }, +}); + +export const largeReturnWorkflow = workflow + .define({ + args: {}, + }) + .handler(async (step) => { + const result = await step.runAction( + internal.oversized.largeReturnAction, + {}, + ); + return result; + }); + +export const bigEvent = defineEvent({ + name: "bigEvent", + validator: v.string(), +}); + +export const eventWorkflow = workflow + .define({ + args: {}, + }) + .handler(async (step) => { + const result = await step.awaitEvent(bigEvent); + return result; + }); + +export const sendBigEvent = internalMutation({ + args: { + workflowId: vWorkflowId, + }, + returns: v.null(), + handler: async (ctx, args) => { + await sendEvent(ctx, components.workflow, { + ...bigEvent, + workflowId: args.workflowId, + value: "y".repeat(900_000), + }); + }, +}); + +export const onComplete = internalMutation({ + args: { + workflowId: vWorkflowId, + result: vResultValidator, + context: v.any(), + }, + returns: v.null(), + handler: async (ctx, args) => { + const flow = await ctx.db + .query("flows") + .withIndex("workflowId", (q) => q.eq("workflowId", args.workflowId)) + .first(); + if (!flow) return null; + await ctx.db.patch(flow._id, { out: args.result }); + return null; + }, +}); diff --git a/src/component/_generated/api.ts b/src/component/_generated/api.ts index 9a930507..3e00bc83 100644 --- a/src/component/_generated/api.ts +++ b/src/component/_generated/api.ts @@ -12,6 +12,7 @@ import type * as event from "../event.js"; import type * as journal from "../journal.js"; import type * as logging from "../logging.js"; import type * as model from "../model.js"; +import type * as oversizedValues from "../oversizedValues.js"; import type * as pool from "../pool.js"; import type * as utils from "../utils.js"; import type * as workflow from "../workflow.js"; @@ -28,6 +29,7 @@ const fullApi: ApiFromModules<{ journal: typeof journal; logging: typeof logging; model: typeof model; + oversizedValues: typeof oversizedValues; pool: typeof pool; utils: typeof utils; workflow: typeof workflow; diff --git a/src/component/event.ts b/src/component/event.ts index a002316c..f0fef08f 100644 --- a/src/component/event.ts +++ b/src/component/event.ts @@ -6,6 +6,7 @@ import { vResultValidator } from "@convex-dev/workpool"; import type { Doc, Id } from "./_generated/dataModel.js"; import { assert } from "convex-helpers"; import { enqueueWorkflow, getWorkpool, workpoolOptions } from "./pool.js"; +import { checkForOversizedResult } from "./oversizedValues.js"; export async function awaitEvent( ctx: MutationCtx, @@ -40,7 +41,7 @@ export async function awaitEvent( stepId: entry._id, }, }); - entry.step.runResult = event.state.result; + entry.step.runResult = checkForOversizedResult(event.state.result); entry.step.inProgress = false; entry.step.completedAt = Date.now(); break; @@ -146,7 +147,7 @@ export const send = mutation({ ); assert(step.step.kind === "event", "Step is not an event"); step.step.eventId = event._id; - step.step.runResult = args.result; + step.step.runResult = checkForOversizedResult(args.result); step.step.inProgress = false; step.step.completedAt = Date.now(); await ctx.db.replace(step._id, step); diff --git a/src/component/oversizedValues.ts b/src/component/oversizedValues.ts new file mode 100644 index 00000000..ee6905ab --- /dev/null +++ b/src/component/oversizedValues.ts @@ -0,0 +1,34 @@ +import { type Value, convexToJson, getConvexSize } from "convex/values"; +import type { RunResult } from "@convex-dev/workpool"; + +export const MAX_RETURN_VALUE_SIZE = 800 << 10; // 800 KiB +const PREVIEW_SIZE = 128 << 10; // 128 KB + +function truncatedPreview(returnValue: unknown): string { + const json = JSON.stringify(convexToJson(returnValue as Value)); + if (json.length <= PREVIEW_SIZE * 2) { + return json; + } + return json.slice(0, PREVIEW_SIZE) + "..." + json.slice(-PREVIEW_SIZE); +} + +export function checkReturnValueSize( + returnValue: Value | undefined, +): string | null { + const size = getConvexSize(returnValue); + if (size > MAX_RETURN_VALUE_SIZE) { + return `Step return value too large (${size} bytes). Maximum is ${MAX_RETURN_VALUE_SIZE} bytes. Preview: ${truncatedPreview(returnValue)}`; + } + return null; +} + +export function checkForOversizedResult(result: RunResult): RunResult { + if (result.kind !== "success") { + return result; + } + const sizeError = checkReturnValueSize(result.returnValue); + if (!sizeError) { + return result; + } + return { kind: "failed", error: sizeError }; +} diff --git a/src/component/pool.ts b/src/component/pool.ts index cdaf82fb..87d5a921 100644 --- a/src/component/pool.ts +++ b/src/component/pool.ts @@ -22,6 +22,7 @@ import { getDefaultLogger } from "./utils.js"; import { completeHandler } from "./workflow.js"; import type { Doc } from "./_generated/dataModel.js"; import { vWorkflowId, type WorkflowId } from "../types.js"; +import { checkForOversizedResult } from "./oversizedValues.js"; export const workpoolOptions = v.object({ logLevel: v.optional(logLevel), @@ -147,32 +148,15 @@ async function onCompleteHandler( } journalEntry.step.inProgress = false; journalEntry.step.completedAt = Date.now(); - switch (args.result.kind) { - case "success": - journalEntry.step.runResult = { - kind: "success", - returnValue: args.result.returnValue, - }; - break; - case "failed": - journalEntry.step.runResult = { - kind: "failed", - error: args.result.error, - }; - break; - case "canceled": - journalEntry.step.runResult = { - kind: "canceled", - }; - break; - } + const runResult = checkForOversizedResult(args.result); + journalEntry.step.runResult = runResult; await ctx.db.replace(journalEntry._id, journalEntry); console.debug(`Completed execution of ${stepId}`, journalEntry); console.event("stepCompleted", { workflowId, workflowName: workflow.name, - status: args.result.kind, + status: journalEntry.step.runResult.kind, stepName: journalEntry.step.name, stepNumber: journalEntry.stepNumber, durationMs: journalEntry.step.completedAt - journalEntry.step.startedAt, @@ -180,7 +164,7 @@ async function onCompleteHandler( if (workflow.runResult !== undefined) { if (workflow.runResult.kind !== "canceled") { console.error( - `Workflow: ${workflowId} already ${workflow.runResult.kind} when completing ${stepId} with status ${args.result.kind}`, + `Workflow: ${workflowId} already ${workflow.runResult.kind} when completing ${stepId} with status ${journalEntry.step.runResult.kind}`, ); } return;