diff --git a/example/convex/_generated/api.d.ts b/example/convex/_generated/api.d.ts
index a01f78be..3c3163ae 100644
--- a/example/convex/_generated/api.d.ts
+++ b/example/convex/_generated/api.d.ts
@@ -14,6 +14,7 @@ import type * as e2e from "../e2e.js";
import type * as example from "../example.js";
import type * as inlineTest from "../inlineTest.js";
import type * as nestedWorkflow from "../nestedWorkflow.js";
+import type * as oversized from "../oversized.js";
import type * as passingSignals from "../passingSignals.js";
import type * as test_oldSyntax from "../test/oldSyntax.js";
import type * as transcription from "../transcription.js";
@@ -32,6 +33,7 @@ declare const fullApi: ApiFromModules<{
example: typeof example;
inlineTest: typeof inlineTest;
nestedWorkflow: typeof nestedWorkflow;
+ oversized: typeof oversized;
passingSignals: typeof passingSignals;
"test/oldSyntax": typeof test_oldSyntax;
transcription: typeof transcription;
diff --git a/example/convex/oversized.test.ts b/example/convex/oversized.test.ts
new file mode 100644
index 00000000..45444e13
--- /dev/null
+++ b/example/convex/oversized.test.ts
@@ -0,0 +1,111 @@
+///
+
+import { expect, describe, test, vi, beforeEach, afterEach } from "vitest";
+import { initConvexTest } from "./setup.test";
+import { components, internal } from "./_generated/api";
+import { assert } from "convex-helpers";
+import { getStatus } from "@convex-dev/workflow";
+import { workflow } from "./example";
+
+describe("oversized values", () => {
+ beforeEach(() => {
+ vi.useFakeTimers();
+ });
+ afterEach(() => {
+ vi.useRealTimers();
+ });
+
+ test("large step return value fails workflow and calls onComplete", async () => {
+ const t = initConvexTest();
+ const workflowId = await t.mutation(async (ctx) => {
+ const wfId = await workflow.start(
+ ctx,
+ internal.oversized.largeReturnWorkflow,
+ {},
+ {
+ onComplete: internal.oversized.onComplete,
+ context: {},
+ startAsync: true,
+ },
+ );
+ await ctx.db.insert("flows", {
+ workflowId: wfId,
+ in: "largeReturn",
+ out: null,
+ });
+ return wfId;
+ });
+ await t.finishAllScheduledFunctions(vi.runAllTimers);
+
+ const status = await t.run((ctx) =>
+ getStatus(ctx, components.workflow, workflowId),
+ );
+ expect(status.type).toBe("failed");
+ assert(status.type === "failed");
+ expect(status.error).toContain("Step return value too large");
+
+ const flow = await t.query(async (ctx) => {
+ return ctx.db
+ .query("flows")
+ .withIndex("workflowId", (q) => q.eq("workflowId", workflowId))
+ .first();
+ });
+ expect(flow).not.toBeNull();
+ assert(flow);
+ expect(flow.out).not.toBeNull();
+ expect(flow.out.kind).toBe("failed");
+ expect(flow.out.error).toContain("Step return value too large");
+ });
+
+ test("large event value fails workflow and calls onComplete", async () => {
+ const t = initConvexTest();
+ const workflowId = await t.mutation(async (ctx) => {
+ const wfId = await workflow.start(
+ ctx,
+ internal.oversized.eventWorkflow,
+ {},
+ {
+ onComplete: internal.oversized.onComplete,
+ context: {},
+ startAsync: true,
+ },
+ );
+ await ctx.db.insert("flows", {
+ workflowId: wfId,
+ in: "eventWorkflow",
+ out: null,
+ });
+ return wfId;
+ });
+ // Let the workflow start and reach the awaitEvent point
+ await t.finishAllScheduledFunctions(vi.runAllTimers);
+
+ const status = await t.run((ctx) =>
+ getStatus(ctx, components.workflow, workflowId),
+ );
+ expect(status.type).toBe("inProgress");
+
+ // Send the oversized event
+ await t.mutation(internal.oversized.sendBigEvent, { workflowId });
+ await t.finishAllScheduledFunctions(vi.runAllTimers);
+
+ const status2 = await t.run((ctx) =>
+ getStatus(ctx, components.workflow, workflowId),
+ );
+ expect(status2.type).toBe("failed");
+ assert(status2.type === "failed");
+ expect(status2.error).toContain("Step return value too large");
+ expect(status2.error).toContain("900002 bytes");
+
+ const flow = await t.query(async (ctx) => {
+ return ctx.db
+ .query("flows")
+ .withIndex("workflowId", (q) => q.eq("workflowId", workflowId))
+ .first();
+ });
+ expect(flow).not.toBeNull();
+ expect(flow!.out).not.toBeNull();
+ expect(flow!.out.kind).toBe("failed");
+ expect(flow!.out.error).toContain("Step return value too large");
+ });
+});
diff --git a/example/convex/oversized.ts b/example/convex/oversized.ts
new file mode 100644
index 00000000..03ec9f4c
--- /dev/null
+++ b/example/convex/oversized.ts
@@ -0,0 +1,74 @@
+import { v } from "convex/values";
+import { sendEvent, defineEvent } from "@convex-dev/workflow";
+import { components, internal } from "./_generated/api.js";
+import { internalAction, internalMutation } from "./_generated/server.js";
+import { vWorkflowId } from "@convex-dev/workflow";
+import { vResultValidator } from "@convex-dev/workpool";
+import { workflow } from "./example.js";
+
+// Action that returns a value larger than 800KB.
+export const largeReturnAction = internalAction({
+ args: {},
+ returns: v.string(),
+ handler: async (): Promise => {
+ return "x".repeat(900_000);
+ },
+});
+
+export const largeReturnWorkflow = workflow
+ .define({
+ args: {},
+ })
+ .handler(async (step) => {
+ const result = await step.runAction(
+ internal.oversized.largeReturnAction,
+ {},
+ );
+ return result;
+ });
+
+export const bigEvent = defineEvent({
+ name: "bigEvent",
+ validator: v.string(),
+});
+
+export const eventWorkflow = workflow
+ .define({
+ args: {},
+ })
+ .handler(async (step) => {
+ const result = await step.awaitEvent(bigEvent);
+ return result;
+ });
+
+export const sendBigEvent = internalMutation({
+ args: {
+ workflowId: vWorkflowId,
+ },
+ returns: v.null(),
+ handler: async (ctx, args) => {
+ await sendEvent(ctx, components.workflow, {
+ ...bigEvent,
+ workflowId: args.workflowId,
+ value: "y".repeat(900_000),
+ });
+ },
+});
+
+export const onComplete = internalMutation({
+ args: {
+ workflowId: vWorkflowId,
+ result: vResultValidator,
+ context: v.any(),
+ },
+ returns: v.null(),
+ handler: async (ctx, args) => {
+ const flow = await ctx.db
+ .query("flows")
+ .withIndex("workflowId", (q) => q.eq("workflowId", args.workflowId))
+ .first();
+ if (!flow) return null;
+ await ctx.db.patch(flow._id, { out: args.result });
+ return null;
+ },
+});
diff --git a/src/component/_generated/api.ts b/src/component/_generated/api.ts
index 9a930507..3e00bc83 100644
--- a/src/component/_generated/api.ts
+++ b/src/component/_generated/api.ts
@@ -12,6 +12,7 @@ import type * as event from "../event.js";
import type * as journal from "../journal.js";
import type * as logging from "../logging.js";
import type * as model from "../model.js";
+import type * as oversizedValues from "../oversizedValues.js";
import type * as pool from "../pool.js";
import type * as utils from "../utils.js";
import type * as workflow from "../workflow.js";
@@ -28,6 +29,7 @@ const fullApi: ApiFromModules<{
journal: typeof journal;
logging: typeof logging;
model: typeof model;
+ oversizedValues: typeof oversizedValues;
pool: typeof pool;
utils: typeof utils;
workflow: typeof workflow;
diff --git a/src/component/event.ts b/src/component/event.ts
index a002316c..f0fef08f 100644
--- a/src/component/event.ts
+++ b/src/component/event.ts
@@ -6,6 +6,7 @@ import { vResultValidator } from "@convex-dev/workpool";
import type { Doc, Id } from "./_generated/dataModel.js";
import { assert } from "convex-helpers";
import { enqueueWorkflow, getWorkpool, workpoolOptions } from "./pool.js";
+import { checkForOversizedResult } from "./oversizedValues.js";
export async function awaitEvent(
ctx: MutationCtx,
@@ -40,7 +41,7 @@ export async function awaitEvent(
stepId: entry._id,
},
});
- entry.step.runResult = event.state.result;
+ entry.step.runResult = checkForOversizedResult(event.state.result);
entry.step.inProgress = false;
entry.step.completedAt = Date.now();
break;
@@ -146,7 +147,7 @@ export const send = mutation({
);
assert(step.step.kind === "event", "Step is not an event");
step.step.eventId = event._id;
- step.step.runResult = args.result;
+ step.step.runResult = checkForOversizedResult(args.result);
step.step.inProgress = false;
step.step.completedAt = Date.now();
await ctx.db.replace(step._id, step);
diff --git a/src/component/oversizedValues.ts b/src/component/oversizedValues.ts
new file mode 100644
index 00000000..ee6905ab
--- /dev/null
+++ b/src/component/oversizedValues.ts
@@ -0,0 +1,34 @@
+import { type Value, convexToJson, getConvexSize } from "convex/values";
+import type { RunResult } from "@convex-dev/workpool";
+
+export const MAX_RETURN_VALUE_SIZE = 800 << 10; // 800 KiB
+const PREVIEW_SIZE = 128 << 10; // 128 KB
+
+function truncatedPreview(returnValue: unknown): string {
+ const json = JSON.stringify(convexToJson(returnValue as Value));
+ if (json.length <= PREVIEW_SIZE * 2) {
+ return json;
+ }
+ return json.slice(0, PREVIEW_SIZE) + "..." + json.slice(-PREVIEW_SIZE);
+}
+
+export function checkReturnValueSize(
+ returnValue: Value | undefined,
+): string | null {
+ const size = getConvexSize(returnValue);
+ if (size > MAX_RETURN_VALUE_SIZE) {
+ return `Step return value too large (${size} bytes). Maximum is ${MAX_RETURN_VALUE_SIZE} bytes. Preview: ${truncatedPreview(returnValue)}`;
+ }
+ return null;
+}
+
+export function checkForOversizedResult(result: RunResult): RunResult {
+ if (result.kind !== "success") {
+ return result;
+ }
+ const sizeError = checkReturnValueSize(result.returnValue);
+ if (!sizeError) {
+ return result;
+ }
+ return { kind: "failed", error: sizeError };
+}
diff --git a/src/component/pool.ts b/src/component/pool.ts
index cdaf82fb..87d5a921 100644
--- a/src/component/pool.ts
+++ b/src/component/pool.ts
@@ -22,6 +22,7 @@ import { getDefaultLogger } from "./utils.js";
import { completeHandler } from "./workflow.js";
import type { Doc } from "./_generated/dataModel.js";
import { vWorkflowId, type WorkflowId } from "../types.js";
+import { checkForOversizedResult } from "./oversizedValues.js";
export const workpoolOptions = v.object({
logLevel: v.optional(logLevel),
@@ -147,32 +148,15 @@ async function onCompleteHandler(
}
journalEntry.step.inProgress = false;
journalEntry.step.completedAt = Date.now();
- switch (args.result.kind) {
- case "success":
- journalEntry.step.runResult = {
- kind: "success",
- returnValue: args.result.returnValue,
- };
- break;
- case "failed":
- journalEntry.step.runResult = {
- kind: "failed",
- error: args.result.error,
- };
- break;
- case "canceled":
- journalEntry.step.runResult = {
- kind: "canceled",
- };
- break;
- }
+ const runResult = checkForOversizedResult(args.result);
+ journalEntry.step.runResult = runResult;
await ctx.db.replace(journalEntry._id, journalEntry);
console.debug(`Completed execution of ${stepId}`, journalEntry);
console.event("stepCompleted", {
workflowId,
workflowName: workflow.name,
- status: args.result.kind,
+ status: journalEntry.step.runResult.kind,
stepName: journalEntry.step.name,
stepNumber: journalEntry.stepNumber,
durationMs: journalEntry.step.completedAt - journalEntry.step.startedAt,
@@ -180,7 +164,7 @@ async function onCompleteHandler(
if (workflow.runResult !== undefined) {
if (workflow.runResult.kind !== "canceled") {
console.error(
- `Workflow: ${workflowId} already ${workflow.runResult.kind} when completing ${stepId} with status ${args.result.kind}`,
+ `Workflow: ${workflowId} already ${workflow.runResult.kind} when completing ${stepId} with status ${journalEntry.step.runResult.kind}`,
);
}
return;