diff --git a/CHANGELOG.md b/CHANGELOG.md index 996b8d62..63ad9616 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # Changelog +## 0.3.5-alpha.0 + +- Uses getConvexSize instead of bespoke sizing logic (requires convex ^1.31.7) +- Improves error messaging for journal entry mismatches + ## 0.3.4 - Adds `list` and `listByName` APIs (credit: dantman) to list workflows with diff --git a/example/convex/userConfirmation.ts b/example/convex/userConfirmation.ts index 14326ccc..2a1d63e8 100644 --- a/example/convex/userConfirmation.ts +++ b/example/convex/userConfirmation.ts @@ -1,6 +1,7 @@ import { defineEvent, vWorkflowId, + WorkflowId, WorkflowManager, } from "@convex-dev/workflow"; import { v } from "convex/values"; @@ -57,3 +58,25 @@ export const chooseProposal = internalMutation({ return true; }, }); + +/** + * Test this from the CLI: + * ```sh + * npx convex run userConfirmation:startConfirmationWorkflow + * ``` + * Copy the ID it returns, then run: + * ```sh + * npx convex run userConfirmation:chooseProposal '{workflowId:"...", choice:1}' + * ``` + * Watch the logs from `npx convex dev` or `npx convex logs` to see progress. + */ +export const startConfirmationWorkflow = internalMutation({ + args: { prompt: v.optional(v.string()) }, + handler: async (ctx, args): Promise => { + return await workflow.start( + ctx, + internal.userConfirmation.confirmationWorkflow, + { prompt: args.prompt ?? "Generate a recipe for me" }, + ); + }, +}); diff --git a/package-lock.json b/package-lock.json index c7fac0e7..4b4fffa3 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@convex-dev/workflow", - "version": "0.3.4", + "version": "0.3.5-alpha.0", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@convex-dev/workflow", - "version": "0.3.4", + "version": "0.3.5-alpha.0", "license": "Apache-2.0", "dependencies": { "async-channel": "^0.2.0" diff --git a/package.json b/package.json index 44e16e24..d47e7a02 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@convex-dev/workflow", - "version": "0.3.4", + "version": "0.3.5-alpha.0", "description": "Convex component for durably executing workflows.", "keywords": [ "convex", @@ -59,7 +59,7 @@ }, "peerDependencies": { "@convex-dev/workpool": "^0.3.0", - "convex": "^1.24.8", + "convex": "^1.31.7", "convex-helpers": "^0.1.99" }, "dependencies": { diff --git a/src/client/step.ts b/src/client/step.ts index 5ff07765..5a12a152 100644 --- a/src/client/step.ts +++ b/src/client/step.ts @@ -12,16 +12,12 @@ import { type GenericDataModel, type GenericMutationCtx, } from "convex/server"; -import { convexToJson, type Value } from "convex/values"; -import { - type JournalEntry, - journalEntrySize, - type Step, - valueSize, -} from "../component/schema.js"; +import { convexToJson, getConvexSize, type Value } from "convex/values"; +import { type JournalEntry, type Step } from "../component/schema.js"; import type { WorkflowComponent } from "./types.js"; import { MAX_JOURNAL_SIZE } from "../shared.js"; import type { EventId, SchedulerOptions } from "../types.js"; +import { pick } from "convex-helpers"; export type WorkerResult = | { type: "handlerDone"; runResult: RunResult } @@ -34,7 +30,7 @@ export type StepRequest = { kind: "function"; functionType: FunctionType; function: FunctionReference; - args: unknown; + args: Record; } | { kind: "event"; @@ -43,7 +39,7 @@ export type StepRequest = { | { kind: "workflow"; function: FunctionReference<"mutation", "internal">; - args: unknown; + args: Record; }; retry: RetryBehavior | boolean | undefined; schedulerOptions: SchedulerOptions; @@ -66,7 +62,7 @@ export class StepExecutor { private workpoolOptions: WorkpoolOptions | undefined, ) { this.journalEntrySize = journalEntries.reduce( - (size, entry) => size + journalEntrySize(entry), + (size, entry) => size + getConvexSize(entry), 0, ); @@ -126,14 +122,18 @@ export class StepExecutor { `Assertion failed: not blocked but have in-progress journal entry`, ); } - const stepArgsJson = JSON.stringify(convexToJson(entry.step.args)); - const messageArgsJson = JSON.stringify( - convexToJson(message.target.args as Value), + const stepJson = JSON.stringify( + convexToJson(pick(entry.step, ["name", "args", "kind"])), ); - if (stepArgsJson !== messageArgsJson) { - throw new Error( - `Journal entry mismatch: ${entry.step.args} !== ${message.target.args}`, - ); + const messageJson = JSON.stringify( + convexToJson({ + name: message.name, + args: message.target.args as Value, + kind: message.target.kind, + }), + ); + if (stepJson !== messageJson) { + throw new Error(`Journal entry mismatch: ${stepJson} !== ${messageJson}`); } if (entry.step.runResult === undefined) { throw new Error( @@ -156,11 +156,12 @@ export class StepExecutor { async startSteps(messages: StepRequest[]): Promise { const steps = await Promise.all( messages.map(async (message) => { + const args = message.target.args ?? {}; const commonFields = { inProgress: true, name: message.name, - args: message.target.args, - argsSize: valueSize(message.target.args as Value), + args, + argsSize: getConvexSize(args as Value), runResult: undefined, startedAt: this.now, completedAt: undefined, @@ -203,7 +204,7 @@ export class StepExecutor { }, )) as JournalEntry[]; for (const entry of entries) { - this.journalEntrySize += journalEntrySize(entry); + this.journalEntrySize += getConvexSize(entry); if (this.journalEntrySize > MAX_JOURNAL_SIZE) { throw new Error( journalSizeError(this.journalEntrySize, this.workflowId) + diff --git a/src/client/workflowContext.ts b/src/client/workflowContext.ts index 4dd07e97..ac369995 100644 --- a/src/client/workflowContext.ts +++ b/src/client/workflowContext.ts @@ -163,7 +163,7 @@ async function runFunction< sender: BaseChannel, functionType: FunctionType, f: F, - args: unknown, + args: Record | undefined, opts?: RunOptions & RetryOption, ): Promise { const { name, retry, ...schedulerOptions } = opts ?? {}; @@ -173,7 +173,7 @@ async function runFunction< kind: "function", functionType, function: f, - args, + args: args ?? {}, }, retry, schedulerOptions, diff --git a/src/component/journal.ts b/src/component/journal.ts index 62aeafdd..d1c71df2 100644 --- a/src/component/journal.ts +++ b/src/component/journal.ts @@ -1,9 +1,8 @@ -import { v } from "convex/values"; +import { getConvexSize, v } from "convex/values"; import { mutation, query } from "./_generated/server.js"; import { journalDocument, type JournalEntry, - journalEntrySize, step, workflowDocument, } from "./schema.js"; @@ -62,7 +61,7 @@ export const load = query({ .query("steps") .withIndex("workflow", (q) => q.eq("workflowId", workflowId))) { journalEntries.push(entry); - journalSize += journalEntrySize(entry); + journalSize += getConvexSize(entry); if (journalSize > MAX_JOURNAL_SIZE) { return { journalEntries, workflow, logLevel, ok: false }; } diff --git a/src/component/schema.ts b/src/component/schema.ts index 35e8e0e8..5501424e 100644 --- a/src/component/schema.ts +++ b/src/component/schema.ts @@ -1,36 +1,9 @@ -import { - vResultValidator, - type RunResult, - vWorkIdValidator, -} from "@convex-dev/workpool"; +import { vResultValidator, vWorkIdValidator } from "@convex-dev/workpool"; import { defineSchema, defineTable } from "convex/server"; -import { convexToJson, type Infer, v, type Value } from "convex/values"; +import { type Infer, v } from "convex/values"; import { logLevel } from "./logging.js"; import { deprecated, literals } from "convex-helpers/validators"; -export function valueSize(value: Value): number { - return JSON.stringify(convexToJson(value)).length; -} - -export function resultSize(result: RunResult): number { - let size = 0; - size += result.kind.length; - switch (result.kind) { - case "success": { - size += 8 + valueSize(result.returnValue); - break; - } - case "failed": { - size += result.error.length; - break; - } - case "canceled": { - break; - } - } - return size; -} - export const vOnComplete = v.object({ fnHandle: v.string(), // mutation context: v.optional(v.any()), @@ -92,51 +65,12 @@ export const step = v.union( ); export type Step = Infer; -function stepSize(step: Step): number { - let size = 0; - size += step.name.length; - size += 1; // inProgress - if (step.kind) size += step.kind.length; - switch (step.kind) { - case undefined: - case "function": - size += step.handle.length; - size += step.functionType.length; - size += step.workId?.length ?? 0; - break; - case "workflow": - size += step.handle.length; - size += step.workflowId?.length ?? 0; - break; - case "event": - size += step.eventId?.length ?? 0; - break; - } - size += 8 + step.argsSize; - if (step.runResult) { - size += resultSize(step.runResult); - } - size += 8; // startedAt - size += 8; // completedAt - return size; -} - const journalObject = { workflowId: v.id("workflows"), stepNumber: v.number(), step, }; -export function journalEntrySize(entry: JournalEntry): number { - let size = 0; - size += entry.workflowId.length; - size += 8; // stepNumber - size += stepSize(entry.step); - size += entry._id.length; - size += 8; // _creationTime - return size; -} - export const journalDocument = v.object({ _id: v.string(), _creationTime: v.number(),