diff --git a/src/component/event.ts b/src/component/event.ts index a002316c..779b2d56 100644 --- a/src/component/event.ts +++ b/src/component/event.ts @@ -6,6 +6,7 @@ import { vResultValidator } from "@convex-dev/workpool"; import type { Doc, Id } from "./_generated/dataModel.js"; import { assert } from "convex-helpers"; import { enqueueWorkflow, getWorkpool, workpoolOptions } from "./pool.js"; +import { checkForOversizedResult } from "./oversizedValues.js"; export async function awaitEvent( ctx: MutationCtx, @@ -40,7 +41,11 @@ export async function awaitEvent( stepId: entry._id, }, }); - entry.step.runResult = event.state.result; + entry.step.runResult = await checkForOversizedResult( + ctx, + event.state.result, + { stepId: entry._id }, + ); entry.step.inProgress = false; entry.step.completedAt = Date.now(); break; @@ -146,7 +151,11 @@ export const send = mutation({ ); assert(step.step.kind === "event", "Step is not an event"); step.step.eventId = event._id; - step.step.runResult = args.result; + step.step.runResult = await checkForOversizedResult( + ctx, + args.result, + { stepId: step._id }, + ); step.step.inProgress = false; step.step.completedAt = Date.now(); await ctx.db.replace(step._id, step); diff --git a/src/component/oversizedValues.ts b/src/component/oversizedValues.ts new file mode 100644 index 00000000..d2728b06 --- /dev/null +++ b/src/component/oversizedValues.ts @@ -0,0 +1,40 @@ +import { type Value, convexToJson, getConvexSize } from "convex/values"; +import type { RunResult } from "@convex-dev/workpool"; +import type { MutationCtx } from "./_generated/server.js"; +import type { Id } from "./_generated/dataModel.js"; + +export const MAX_RETURN_VALUE_SIZE = 800 << 10; // 800 KiB +const PREVIEW_SIZE = 128 << 10; // 128 KB + +function truncatedPreview(returnValue: unknown): string { + const json = JSON.stringify(convexToJson(returnValue as Value)); + if (json.length <= PREVIEW_SIZE * 2) { + return json; + } + return json.slice(0, PREVIEW_SIZE) + "..." + json.slice(-PREVIEW_SIZE); +} + +export function checkReturnValueSize(returnValue: unknown): string | null { + const size = getConvexSize(returnValue as Value | undefined); + if (size > MAX_RETURN_VALUE_SIZE) { + return `Step return value too large (${size} bytes). Maximum is ${MAX_RETURN_VALUE_SIZE} bytes. Preview: ${truncatedPreview(returnValue)}`; + } + return null; +} + +export async function checkForOversizedResult( + _ctx: MutationCtx, + result: RunResult, + _opts: { + stepId: Id<"steps">; + }, +): Promise { + if (result.kind !== "success") { + return result; + } + const sizeError = checkReturnValueSize(result.returnValue); + if (!sizeError) { + return result; + } + return { kind: "failed", error: sizeError }; +} diff --git a/src/component/pool.ts b/src/component/pool.ts index cdaf82fb..862452cb 100644 --- a/src/component/pool.ts +++ b/src/component/pool.ts @@ -22,6 +22,7 @@ import { getDefaultLogger } from "./utils.js"; import { completeHandler } from "./workflow.js"; import type { Doc } from "./_generated/dataModel.js"; import { vWorkflowId, type WorkflowId } from "../types.js"; +import { checkForOversizedResult } from "./oversizedValues.js"; export const workpoolOptions = v.object({ logLevel: v.optional(logLevel), @@ -147,25 +148,11 @@ async function onCompleteHandler( } journalEntry.step.inProgress = false; journalEntry.step.completedAt = Date.now(); - switch (args.result.kind) { - case "success": - journalEntry.step.runResult = { - kind: "success", - returnValue: args.result.returnValue, - }; - break; - case "failed": - journalEntry.step.runResult = { - kind: "failed", - error: args.result.error, - }; - break; - case "canceled": - journalEntry.step.runResult = { - kind: "canceled", - }; - break; - } + journalEntry.step.runResult = await checkForOversizedResult( + ctx, + args.result, + { stepId }, + ); await ctx.db.replace(journalEntry._id, journalEntry); console.debug(`Completed execution of ${stepId}`, journalEntry);