Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions example/convex/_generated/api.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import type * as e2e from "../e2e.js";
import type * as example from "../example.js";
import type * as inlineTest from "../inlineTest.js";
import type * as nestedWorkflow from "../nestedWorkflow.js";
import type * as oversized from "../oversized.js";
import type * as passingSignals from "../passingSignals.js";
import type * as test_oldSyntax from "../test/oldSyntax.js";
import type * as transcription from "../transcription.js";
Expand All @@ -32,6 +33,7 @@ declare const fullApi: ApiFromModules<{
example: typeof example;
inlineTest: typeof inlineTest;
nestedWorkflow: typeof nestedWorkflow;
oversized: typeof oversized;
passingSignals: typeof passingSignals;
"test/oldSyntax": typeof test_oldSyntax;
transcription: typeof transcription;
Expand Down
111 changes: 111 additions & 0 deletions example/convex/oversized.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/// <reference types="vite/client" />

import { expect, describe, test, vi, beforeEach, afterEach } from "vitest";
import { initConvexTest } from "./setup.test";
import { components, internal } from "./_generated/api";
import { assert } from "convex-helpers";
import { getStatus } from "@convex-dev/workflow";
import { workflow } from "./example";

describe("oversized values", () => {
beforeEach(() => {
vi.useFakeTimers();
});
afterEach(() => {
vi.useRealTimers();
});

test("large step return value fails workflow and calls onComplete", async () => {
const t = initConvexTest();
const workflowId = await t.mutation(async (ctx) => {
const wfId = await workflow.start(
ctx,
internal.oversized.largeReturnWorkflow,
{},
{
onComplete: internal.oversized.onComplete,
context: {},
startAsync: true,
},
);
await ctx.db.insert("flows", {
workflowId: wfId,
in: "largeReturn",
out: null,
});
return wfId;
});
await t.finishAllScheduledFunctions(vi.runAllTimers);

const status = await t.run((ctx) =>
getStatus(ctx, components.workflow, workflowId),
);
expect(status.type).toBe("failed");
assert(status.type === "failed");
expect(status.error).toContain("Step return value too large");

const flow = await t.query(async (ctx) => {
return ctx.db
.query("flows")
.withIndex("workflowId", (q) => q.eq("workflowId", workflowId))
.first();
});
expect(flow).not.toBeNull();
assert(flow);
expect(flow.out).not.toBeNull();
expect(flow.out.kind).toBe("failed");
expect(flow.out.error).toContain("Step return value too large");
});

test("large event value fails workflow and calls onComplete", async () => {
const t = initConvexTest();
const workflowId = await t.mutation(async (ctx) => {
const wfId = await workflow.start(
ctx,
internal.oversized.eventWorkflow,
{},
{
onComplete: internal.oversized.onComplete,
context: {},
startAsync: true,
},
);
await ctx.db.insert("flows", {
workflowId: wfId,
in: "eventWorkflow",
out: null,
});
return wfId;
});
// Let the workflow start and reach the awaitEvent point
await t.finishAllScheduledFunctions(vi.runAllTimers);

const status = await t.run((ctx) =>
getStatus(ctx, components.workflow, workflowId),
);
expect(status.type).toBe("inProgress");

// Send the oversized event
await t.mutation(internal.oversized.sendBigEvent, { workflowId });
await t.finishAllScheduledFunctions(vi.runAllTimers);

const status2 = await t.run((ctx) =>
getStatus(ctx, components.workflow, workflowId),
);
expect(status2.type).toBe("failed");
assert(status2.type === "failed");
expect(status2.error).toContain("Step return value too large");
expect(status2.error).toContain("900002 bytes");

const flow = await t.query(async (ctx) => {
return ctx.db
.query("flows")
.withIndex("workflowId", (q) => q.eq("workflowId", workflowId))
.first();
});
expect(flow).not.toBeNull();
expect(flow!.out).not.toBeNull();
expect(flow!.out.kind).toBe("failed");
expect(flow!.out.error).toContain("Step return value too large");
});
});
74 changes: 74 additions & 0 deletions example/convex/oversized.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import { v } from "convex/values";
import { sendEvent, defineEvent } from "@convex-dev/workflow";
import { components, internal } from "./_generated/api.js";
import { internalAction, internalMutation } from "./_generated/server.js";
import { vWorkflowId } from "@convex-dev/workflow";
import { vResultValidator } from "@convex-dev/workpool";
import { workflow } from "./example.js";

// Action that returns a value larger than 800KB.
export const largeReturnAction = internalAction({
args: {},
returns: v.string(),
handler: async (): Promise<string> => {
return "x".repeat(900_000);
},
});

export const largeReturnWorkflow = workflow
.define({
args: {},
})
.handler(async (step) => {
const result = await step.runAction(
internal.oversized.largeReturnAction,
{},
);
return result;
});

export const bigEvent = defineEvent({
name: "bigEvent",
validator: v.string(),
});

export const eventWorkflow = workflow
.define({
args: {},
})
.handler(async (step) => {
const result = await step.awaitEvent(bigEvent);
return result;
});

export const sendBigEvent = internalMutation({
args: {
workflowId: vWorkflowId,
},
returns: v.null(),
handler: async (ctx, args) => {
await sendEvent(ctx, components.workflow, {
...bigEvent,
workflowId: args.workflowId,
value: "y".repeat(900_000),
});
},
});

export const onComplete = internalMutation({
args: {
workflowId: vWorkflowId,
result: vResultValidator,
context: v.any(),
},
returns: v.null(),
handler: async (ctx, args) => {
const flow = await ctx.db
.query("flows")
.withIndex("workflowId", (q) => q.eq("workflowId", args.workflowId))
.first();
if (!flow) return null;
await ctx.db.patch(flow._id, { out: args.result });
return null;
},
});
2 changes: 2 additions & 0 deletions src/component/_generated/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import type * as event from "../event.js";
import type * as journal from "../journal.js";
import type * as logging from "../logging.js";
import type * as model from "../model.js";
import type * as oversizedValues from "../oversizedValues.js";
import type * as pool from "../pool.js";
import type * as utils from "../utils.js";
import type * as workflow from "../workflow.js";
Expand All @@ -28,6 +29,7 @@ const fullApi: ApiFromModules<{
journal: typeof journal;
logging: typeof logging;
model: typeof model;
oversizedValues: typeof oversizedValues;
pool: typeof pool;
utils: typeof utils;
workflow: typeof workflow;
Expand Down
5 changes: 3 additions & 2 deletions src/component/event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { vResultValidator } from "@convex-dev/workpool";
import type { Doc, Id } from "./_generated/dataModel.js";
import { assert } from "convex-helpers";
import { enqueueWorkflow, getWorkpool, workpoolOptions } from "./pool.js";
import { checkForOversizedResult } from "./oversizedValues.js";

export async function awaitEvent(
ctx: MutationCtx,
Expand Down Expand Up @@ -40,7 +41,7 @@ export async function awaitEvent(
stepId: entry._id,
},
});
entry.step.runResult = event.state.result;
entry.step.runResult = checkForOversizedResult(event.state.result);
entry.step.inProgress = false;
entry.step.completedAt = Date.now();
break;
Expand Down Expand Up @@ -146,7 +147,7 @@ export const send = mutation({
);
assert(step.step.kind === "event", "Step is not an event");
step.step.eventId = event._id;
step.step.runResult = args.result;
step.step.runResult = checkForOversizedResult(args.result);
step.step.inProgress = false;
step.step.completedAt = Date.now();
await ctx.db.replace(step._id, step);
Expand Down
34 changes: 34 additions & 0 deletions src/component/oversizedValues.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import { type Value, convexToJson, getConvexSize } from "convex/values";
import type { RunResult } from "@convex-dev/workpool";

export const MAX_RETURN_VALUE_SIZE = 800 << 10; // 800 KiB
const PREVIEW_SIZE = 128 << 10; // 128 KB

function truncatedPreview(returnValue: unknown): string {
const json = JSON.stringify(convexToJson(returnValue as Value));
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that this could fail if the return value contains a BigInt. Maybe consider wrapping the JSON.stringify call in a try/catch?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

convexToJson will turn bigint (any Convex Value) into a json-serializable value

if (json.length <= PREVIEW_SIZE * 2) {
return json;
}
return json.slice(0, PREVIEW_SIZE) + "..." + json.slice(-PREVIEW_SIZE);
}

export function checkReturnValueSize(
returnValue: Value | undefined,
): string | null {
const size = getConvexSize(returnValue);
if (size > MAX_RETURN_VALUE_SIZE) {
return `Step return value too large (${size} bytes). Maximum is ${MAX_RETURN_VALUE_SIZE} bytes. Preview: ${truncatedPreview(returnValue)}`;
}
return null;
}
Comment thread
ianmacartney marked this conversation as resolved.

export function checkForOversizedResult(result: RunResult): RunResult {
if (result.kind !== "success") {
return result;
}
const sizeError = checkReturnValueSize(result.returnValue);
if (!sizeError) {
return result;
}
return { kind: "failed", error: sizeError };
}
26 changes: 5 additions & 21 deletions src/component/pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import { getDefaultLogger } from "./utils.js";
import { completeHandler } from "./workflow.js";
import type { Doc } from "./_generated/dataModel.js";
import { vWorkflowId, type WorkflowId } from "../types.js";
import { checkForOversizedResult } from "./oversizedValues.js";

export const workpoolOptions = v.object({
logLevel: v.optional(logLevel),
Expand Down Expand Up @@ -147,40 +148,23 @@ async function onCompleteHandler(
}
journalEntry.step.inProgress = false;
journalEntry.step.completedAt = Date.now();
switch (args.result.kind) {
case "success":
journalEntry.step.runResult = {
kind: "success",
returnValue: args.result.returnValue,
};
break;
case "failed":
journalEntry.step.runResult = {
kind: "failed",
error: args.result.error,
};
break;
case "canceled":
journalEntry.step.runResult = {
kind: "canceled",
};
break;
}
const runResult = checkForOversizedResult(args.result);
journalEntry.step.runResult = runResult;
await ctx.db.replace(journalEntry._id, journalEntry);
console.debug(`Completed execution of ${stepId}`, journalEntry);

console.event("stepCompleted", {
workflowId,
workflowName: workflow.name,
status: args.result.kind,
status: journalEntry.step.runResult.kind,
stepName: journalEntry.step.name,
stepNumber: journalEntry.stepNumber,
durationMs: journalEntry.step.completedAt - journalEntry.step.startedAt,
});
if (workflow.runResult !== undefined) {
if (workflow.runResult.kind !== "canceled") {
console.error(
`Workflow: ${workflowId} already ${workflow.runResult.kind} when completing ${stepId} with status ${args.result.kind}`,
`Workflow: ${workflowId} already ${workflow.runResult.kind} when completing ${stepId} with status ${journalEntry.step.runResult.kind}`,
);
}
return;
Expand Down
Loading