diff --git a/example/convex/_generated/api.d.ts b/example/convex/_generated/api.d.ts index ca3ee05e..5f69a4d4 100644 --- a/example/convex/_generated/api.d.ts +++ b/example/convex/_generated/api.d.ts @@ -9,7 +9,9 @@ */ import type * as admin from "../admin.js"; +import type * as benchmark from "../benchmark.js"; import type * as example from "../example.js"; +import type * as http from "../http.js"; import type * as nestedWorkflow from "../nestedWorkflow.js"; import type * as passingSignals from "../passingSignals.js"; import type * as transcription from "../transcription.js"; @@ -23,7 +25,9 @@ import type { declare const fullApi: ApiFromModules<{ admin: typeof admin; + benchmark: typeof benchmark; example: typeof example; + http: typeof http; nestedWorkflow: typeof nestedWorkflow; passingSignals: typeof passingSignals; transcription: typeof transcription; @@ -58,4 +62,5 @@ export declare const internal: FilterApi< export declare const components: { workflow: import("@convex-dev/workflow/_generated/component.js").ComponentApi<"workflow">; + workpool: import("@convex-dev/workpool/_generated/component.js").ComponentApi<"workpool">; }; diff --git a/example/convex/convex.config.ts b/example/convex/convex.config.ts index 3ab9cd8b..67a82927 100644 --- a/example/convex/convex.config.ts +++ b/example/convex/convex.config.ts @@ -1,6 +1,8 @@ import { defineApp } from "convex/server"; import workflow from "@convex-dev/workflow/convex.config"; +import workpool from "@convex-dev/workpool/convex.config"; const app = defineApp(); app.use(workflow); +app.use(workpool); export default app; diff --git a/example/convex/schema.ts b/example/convex/schema.ts index 199b70bf..d105e90e 100644 --- a/example/convex/schema.ts +++ b/example/convex/schema.ts @@ -8,4 +8,9 @@ export default defineSchema({ workflowId: vWorkflowId, out: v.any(), }).index("workflowId", ["workflowId"]), + benchmarkResults: defineTable({ + workflowId: vWorkflowId, + result: v.any(), + completedAt: v.number(), + }).index("workflowId", ["workflowId"]), }); diff --git a/package-lock.json b/package-lock.json index cb8bd93c..d92231dd 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9,6 +9,7 @@ "version": "0.3.4", "license": "Apache-2.0", "dependencies": { + "@anthropic-ai/sdk": "^0.78.0", "async-channel": "^0.2.0" }, "devDependencies": { @@ -20,7 +21,7 @@ "@typescript-eslint/eslint-plugin": "8.54.0", "@typescript-eslint/parser": "8.54.0", "chokidar-cli": "3.0.0", - "convex": "1.31.7", + "convex": "^1.32.0", "convex-helpers": "0.1.111", "convex-test": "0.0.41", "cpy-cli": "7.0.0", @@ -82,6 +83,25 @@ "dev": true, "license": "MIT" }, + "node_modules/@anthropic-ai/sdk": { + "version": "0.78.0", + "resolved": "https://registry.npmjs.org/@anthropic-ai/sdk/-/sdk-0.78.0.tgz", + "integrity": "sha512-PzQhR715td/m1UaaN5hHXjYB8Gl2lF9UVhrrGrZeysiF6Rb74Wc9GCB8hzLdzmQtBd1qe89F9OptgB9Za1Ib5w==", + "dependencies": { + "json-schema-to-ts": "^3.1.1" + }, + "bin": { + "anthropic-ai-sdk": "bin/cli" + }, + "peerDependencies": { + "zod": "^3.25.0 || ^4.0.0" + }, + "peerDependenciesMeta": { + "zod": { + "optional": true + } + } + }, "node_modules/@babel/code-frame": { "version": "7.27.1", "resolved": "https://registry.npmjs.org/@babel/code-frame/-/code-frame-7.27.1.tgz", @@ -120,7 +140,6 @@ "integrity": "sha512-e7jT4DxYvIDLk1ZHmU/m/mB19rex9sv0c2ftBtjSBv+kVM/902eh0fINUzD7UwLLNR+jU585GxUJ8/EBfAM5fw==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@babel/code-frame": "^7.27.1", "@babel/generator": "^7.28.5", @@ -302,6 +321,14 @@ "node": ">=6.0.0" } }, + "node_modules/@babel/runtime": { + "version": "7.28.6", + "resolved": "https://registry.npmjs.org/@babel/runtime/-/runtime-7.28.6.tgz", + "integrity": "sha512-05WQkdpL9COIMz4LjTxGpPNCdlpyimKppYNoJ5Di5EUObifl8t4tuLuUBBZEpoLYOmfvIWrsp9fCl0HoPRVTdA==", + "engines": { + "node": ">=6.9.0" + } + }, "node_modules/@babel/template": { "version": "7.27.2", "resolved": "https://registry.npmjs.org/@babel/template/-/template-7.27.2.tgz", @@ -377,7 +404,6 @@ "integrity": "sha512-NKBGBSIKUG584qrS1tyxVpX/AKJKQw5HgjYEnPLC0QsTw79JrGn+qUr8CXFb955Iy7GUdiiUv1rJ6JBGvaKb6w==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@edge-runtime/primitives": "6.0.0" }, @@ -1217,7 +1243,6 @@ "integrity": "sha512-dKYCMuPO1bmrpuogcjQ8z7ICCH3FP6WmxpwC03yjzGfZhj9fTJg6+bS1+UAplekbN2C+M61UNllGOOoAfGCrdQ==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@octokit/auth-token": "^4.0.0", "@octokit/graphql": "^7.1.0", @@ -1723,8 +1748,7 @@ "resolved": "https://registry.npmjs.org/@standard-schema/spec/-/spec-1.1.0.tgz", "integrity": "sha512-l2aFy5jALhniG5HgqrD6jXLi/rUWrKvqN/qJx6yoJsgKhblVd+iqqU4RCXavm/jPityDo5TCvKMnpjKnOriy0w==", "dev": true, - "license": "MIT", - "peer": true + "license": "MIT" }, "node_modules/@types/chai": { "version": "5.2.2", @@ -1770,7 +1794,6 @@ "integrity": "sha512-/Af7O8r1frCVgOz0I62jWUtMohJ0/ZQU/ZoketltOJPZpnb17yoNc9BSoVuV9qlaIXJiPNOpsfq4ByFajSArNQ==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "undici-types": "~7.16.0" } @@ -1827,7 +1850,6 @@ "integrity": "sha512-BtE0k6cjwjLZoZixN0t5AKP0kSzlGu7FctRXYuPAm//aaiZhmfq1JwdYpYr1brzEspYyFeF+8XF5j2VK6oalrA==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@typescript-eslint/scope-manager": "8.54.0", "@typescript-eslint/types": "8.54.0", @@ -2156,7 +2178,6 @@ "integrity": "sha512-NZyJarBfL7nWwIq+FDL6Zp/yHEhePMNnnJ0y3qfieCrmNvYct8uvtiV41UvlSe6apAfk0fY1FbWx+NwfmpvtTg==", "dev": true, "license": "MIT", - "peer": true, "bin": { "acorn": "bin/acorn" }, @@ -2317,7 +2338,6 @@ } ], "license": "MIT", - "peer": true, "dependencies": { "baseline-browser-mapping": "^2.8.19", "caniuse-lite": "^1.0.30001751", @@ -2581,15 +2601,14 @@ "license": "MIT" }, "node_modules/convex": { - "version": "1.31.7", - "resolved": "https://registry.npmjs.org/convex/-/convex-1.31.7.tgz", - "integrity": "sha512-PtNMe1mAIOvA8Yz100QTOaIdgt2rIuWqencVXrb4McdhxBHZ8IJ1eXTnrgCC9HydyilGT1pOn+KNqT14mqn9fQ==", + "version": "1.32.0", + "resolved": "https://registry.npmjs.org/convex/-/convex-1.32.0.tgz", + "integrity": "sha512-5FlajdLpW75pdLS+/CgGH5H6yeRuA+ru50AKJEYbJpmyILUS+7fdTvsdTaQ7ZFXMv0gE8mX4S+S3AtJ94k0mfw==", "dev": true, - "license": "Apache-2.0", - "peer": true, "dependencies": { "esbuild": "0.27.0", - "prettier": "^3.0.0" + "prettier": "^3.0.0", + "ws": "8.18.0" }, "bin": { "convex": "bin/main.js" @@ -2621,7 +2640,6 @@ "integrity": "sha512-0O59Ohi8HVc3+KULxSC6JHsw8cQJyc8gZ7OAfNRVX7T5Wy6LhPx3l8veYN9avKg7UiPlO7m1eBiQMHKclIyXyQ==", "dev": true, "license": "Apache-2.0", - "peer": true, "bin": { "convex-helpers": "bin.cjs" }, @@ -3379,7 +3397,6 @@ "integrity": "sha512-LEyamqS7W5HB3ujJyvi0HQK/dtVINZvd5mAAp9eT5S/ujByGjiZLCzPcHVzuXbpJDJF/cxwHlfceVUDZ2lnSTw==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@eslint-community/eslint-utils": "^4.8.0", "@eslint-community/regexpp": "^4.12.1", @@ -4105,6 +4122,18 @@ "node": "^18.17.0 || >=20.5.0" } }, + "node_modules/json-schema-to-ts": { + "version": "3.1.1", + "resolved": "https://registry.npmjs.org/json-schema-to-ts/-/json-schema-to-ts-3.1.1.tgz", + "integrity": "sha512-+DWg8jCJG2TEnpy7kOm/7/AxaYoaRbjVB4LFZLySZlWn8exGs3A4OLJR966cVvU26N7X9TWxl+Jsw7dzAqKT6g==", + "dependencies": { + "@babel/runtime": "^7.18.3", + "ts-algebra": "^2.0.0" + }, + "engines": { + "node": ">=16" + } + }, "node_modules/json-schema-traverse": { "version": "0.4.1", "resolved": "https://registry.npmjs.org/json-schema-traverse/-/json-schema-traverse-0.4.1.tgz", @@ -5456,7 +5485,6 @@ "integrity": "sha512-5gTmgEY/sqK6gFXLIsQNH19lWb4ebPDLA4SdLP7dsWkIXHWlG66oPuVvXSGFPppYZz8ZDZq0dYYrbHfBCVUb1Q==", "dev": true, "license": "MIT", - "peer": true, "engines": { "node": ">=12" }, @@ -5499,6 +5527,11 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/ts-algebra": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/ts-algebra/-/ts-algebra-2.0.0.tgz", + "integrity": "sha512-FPAhNPFMrkwz76P7cdjdmiShwMynZYN6SgOujD1urY4oNm80Ou9oMdmbR45LotcKOXoy7wSmHkRFE6Mxbrhefw==" + }, "node_modules/ts-api-utils": { "version": "2.4.0", "resolved": "https://registry.npmjs.org/ts-api-utils/-/ts-api-utils-2.4.0.tgz", @@ -5563,7 +5596,6 @@ "integrity": "sha512-jl1vZzPDinLr9eUt3J/t7V6FgNEw9QjvBPdysz9KfQDD41fQrC2Y4vKQdiaUpFT4bXlb1RHhLpp8wtm6M5TgSw==", "dev": true, "license": "Apache-2.0", - "peer": true, "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" @@ -5717,7 +5749,6 @@ "integrity": "sha512-cZn6NDFE7wdTpINgs++ZJ4N49W2vRp8LCKrn3Ob1kYNtOo21vfDoaV5GzBfLU4MovSAB8uNRm4jgzVQZ+mBzPQ==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "esbuild": "^0.25.0", "fdir": "^6.4.4", @@ -5808,7 +5839,6 @@ "integrity": "sha512-M7BAV6Rlcy5u+m6oPhAPFgJTzAioX/6B0DxyvDlo9l8+T3nLKbrczg2WLUyzd45L8RqfUMyGPzekbMvX2Ldkwg==", "dev": true, "license": "MIT", - "peer": true, "engines": { "node": ">=12" }, @@ -6028,6 +6058,27 @@ "integrity": "sha512-l4Sp/DRseor9wL6EvV2+TuQn63dMkPjZ/sp9XkghTEbV9KlPS1xUsZ3u7/IQO4wxtcFB4bgpQPRcR3QCvezPcQ==", "dev": true }, + "node_modules/ws": { + "version": "8.18.0", + "resolved": "https://registry.npmjs.org/ws/-/ws-8.18.0.tgz", + "integrity": "sha512-8VbfWfHLbbwu3+N6OKsOMpBdT4kXPDDB9cJk2bJ6mh9ucxdlnNvH1e+roYkKmN9Nxw2yjz7VzeO9oOz2zJ04Pw==", + "dev": true, + "engines": { + "node": ">=10.0.0" + }, + "peerDependencies": { + "bufferutil": "^4.0.1", + "utf-8-validate": ">=5.0.2" + }, + "peerDependenciesMeta": { + "bufferutil": { + "optional": true + }, + "utf-8-validate": { + "optional": true + } + } + }, "node_modules/y18n": { "version": "4.0.3", "resolved": "https://registry.npmjs.org/y18n/-/y18n-4.0.3.tgz", @@ -6154,9 +6205,8 @@ "version": "3.25.76", "resolved": "https://registry.npmjs.org/zod/-/zod-3.25.76.tgz", "integrity": "sha512-gzUt/qt81nXsFGKIFcC3YnfEAx5NkunCfnDlvuBSSFS02bcXu4Lmea0AFIUwbLWxWPx3d9p8S5QoaujKcNQxcQ==", - "dev": true, + "devOptional": true, "license": "MIT", - "peer": true, "funding": { "url": "https://github.com/sponsors/colinhacks" } diff --git a/package.json b/package.json index cfc5a524..15ecea6b 100644 --- a/package.json +++ b/package.json @@ -63,6 +63,7 @@ "convex-helpers": "^0.1.99" }, "dependencies": { + "@anthropic-ai/sdk": "^0.78.0", "async-channel": "^0.2.0" }, "devDependencies": { @@ -74,7 +75,7 @@ "@typescript-eslint/eslint-plugin": "8.54.0", "@typescript-eslint/parser": "8.54.0", "chokidar-cli": "3.0.0", - "convex": "1.31.7", + "convex": "^1.32.0", "convex-helpers": "0.1.111", "convex-test": "0.0.41", "cpy-cli": "7.0.0", diff --git a/src/client/index.ts b/src/client/index.ts index f957167d..6fbbdc12 100644 --- a/src/client/index.ts +++ b/src/client/index.ts @@ -6,14 +6,20 @@ import type { import { parse } from "convex-helpers/validators"; import { createFunctionHandle, + internalActionGeneric, + + type DefaultFunctionArgs, type FunctionArgs, + type FunctionHandle, type FunctionReference, type FunctionVisibility, + type GenericActionCtx, type GenericDataModel, type GenericMutationCtx, type GenericQueryCtx, type PaginationOptions, type PaginationResult, + type RegisteredAction, type RegisteredMutation, type ReturnValueForOptionalValidator, } from "convex/server"; @@ -46,6 +52,20 @@ export { } from "../types.js"; export type { RunOptions, WorkflowCtx } from "./workflowContext.js"; +/** + * Throw this from an executor action handler to signal a rate limit. + * The executor will wait `retryAfterMs` before retrying the task, + * without counting it as a failure attempt. + */ +export class WorkflowRateLimitError extends Error { + public readonly retryAfterMs: number; + constructor(retryAfterMs: number) { + super(`Rate limited, retry after ${retryAfterMs}ms`); + this.name = "WorkflowRateLimitError"; + this.retryAfterMs = retryAfterMs; + } +} + export type CallbackOptions = { /** * A mutation to run after the function succeeds, fails, or is canceled. @@ -97,12 +117,583 @@ export type WorkflowStatus = | { type: "failed"; error: string }; export class WorkflowManager { + private batchActionNames = new Set(); + private executorShards?: number; + private executorActionHandlers = new Map< + string, + (ctx: GenericActionCtx, args: any) => Promise + >(); + private executorRef: FunctionReference<"action", "internal"> | null = null; + constructor( public component: WorkflowComponent, public options?: { - workpoolOptions: WorkpoolOptions; + workpoolOptions?: WorkpoolOptions; + executorShards?: number; }, - ) {} + ) { + this.executorShards = options?.executorShards; + } + + /** + * Register an action to run inline in batch executors. + * The action handler runs inside long-lived executor actions (no separate + * action invocation, no 512 concurrent action limit). + * + * @param name - A unique name for the batch action. + * @param opts - The action definition (args validator and handler). + * @returns A registered action to export from your Convex module. + */ + action< + Args extends DefaultFunctionArgs = any, + Returns = any, + >( + name: string, + opts: { + args: Record>; + handler: ( + ctx: GenericActionCtx, + args: Args, + ) => Promise; + }, + ): RegisteredAction<"internal", Args, Returns> { + if (!this.executorShards) { + throw new Error( + "WorkflowManager.action() requires `executorShards` in the constructor", + ); + } + // Store handler for executor to call, register name for + // batch action detection in step.ts, return a dummy action placeholder. + this.executorActionHandlers.set(name, opts.handler); + this.batchActionNames.add(name); + // Return a no-op action placeholder — the function ref must exist for + // safeFunctionName but is never invoked directly. + return internalActionGeneric({ + handler: async () => { + throw new Error( + `${name} should not be called directly — it runs inside executors`, + ); + }, + }) as any; + } + + /** + * Create a long-running executor action for the sharded task queue. + * Each executor claims tasks from a single shard, processes them + * concurrently, and chains to the next task atomically. + * + * Export the return value from your Convex module, then call + * `setExecutorRef()` with its reference. + */ + executor() { + if (!this.executorShards) { + throw new Error( + "WorkflowManager.executor() requires `executorShards` in the constructor", + ); + } + const handlers = this.executorActionHandlers; + const component = this.component; + const numShards = this.executorShards; + const getExecutorRef = () => this.executorRef; + + const CLAIM_LIMIT = 1500; + const MAX_CONCURRENCY = 500; + const POLL_BACKOFF_MS = 500; + const POLL_BACKOFF_ACTIVE_MS = 100; + const RESCHEDULE_MS = 5 * 60 * 1000; // 5 minutes — leaves 5 min for drain+handoff before 10-min kill + const FLUSH_INTERVAL_MS = 100; + const FLUSH_BATCH_SIZE = 50; + const MAX_FLUSH_RETRIES = 5; + const HANDOFF_POLL_MS = 500; + const HANDOFF_SUCCESSOR_TIMEOUT_MS = 30_000; + const HANDOFF_PREDECESSOR_TIMEOUT_MS = 30_000; + + return internalActionGeneric({ + handler: async ( + ctx: GenericActionCtx, + args: { shard: number; epoch?: number }, + ) => { + const { shard, epoch } = args; + const startTime = Date.now(); + // Stagger restarts by shard index so at most 1 shard hands off at a time. + const JITTER_WINDOW_MS = 60_000; + const shardSlotMs = Math.floor((shard / numShards) * JITTER_WINDOW_MS); + const perturbMs = Math.floor( + Math.random() * Math.floor(JITTER_WINDOW_MS / numShards), + ); + const jitterMs = shardSlotMs + perturbMs; + + const checkEpoch = async (): Promise => { + const currentEpoch: number = await ctx.runQuery( + component.taskQueue.getExecutorEpoch, + {}, + ); + if (currentEpoch === 0) return true; + return epoch === currentEpoch; + }; + + type Task = { + functionType: "query" | "mutation" | "action"; + handle: string; + args: any; + stepId: string; + workflowId: string; + generationNumber: number; + retry?: { + maxAttempts: number; + initialBackoffMs: number; + base: number; + }; + }; + + type PendingResult = { + stepId: string; + result: RunResult; + generationNumber: number; + executorFinishedAt: number; + }; + + // --- Result batching with serialized replay --- + const pendingResults: PendingResult[] = []; + const inFlightStepIds = new Set(); + let flushing = false; + + // Rescue queue: replay candidates that failed to commit. + // The flush loop retries these on every tick so workflows + // don't get permanently stranded by transient OCC failures. + type ReplayCandidate = { + workflowId: string; + generationNumber: number; + workflowHandle: string; + }; + const replayRescueQueue: ReplayCandidate[] = []; + + // Flush pending results, then replay candidates in a single batch. + // Serializing flush → replay keeps inter-step latency tight: the + // next step is enqueued immediately after recording the result. + const flush = async () => { + if (flushing) return; + flushing = true; + try { + while (pendingResults.length > 0) { + const batch = pendingResults.splice(0, FLUSH_BATCH_SIZE); + let candidates: ReplayCandidate[]; + const flushCalledAt = Date.now(); + try { + candidates = await ctx.runMutation( + component.taskQueue.recordResultBatch, + { + items: batch.map((r) => ({ + stepId: r.stepId, + result: r.result, + generationNumber: r.generationNumber, + executorFinishedAt: r.executorFinishedAt, + flushCalledAt, + })), + replayInline: true, + }, + ); + } catch { + pendingResults.push(...batch); + return; + } + for (const item of batch) { + inFlightStepIds.delete(item.stepId); + } + // Replay all candidates in a single batched mutation + if (candidates.length > 0) { + try { + await ctx.runMutation( + component.taskQueue.replayBatchIfReady, + { candidates }, + ); + } catch { + // Retry once after brief pause + await new Promise((r) => setTimeout(r, 100)); + try { + await ctx.runMutation( + component.taskQueue.replayBatchIfReady, + { candidates }, + ); + } catch { + // Move to rescue queue — flush loop will keep retrying + // so these workflows don't get permanently stranded. + replayRescueQueue.push(...candidates); + } + } + } + } + // Drain rescue queue: retry previously failed replays. + if (replayRescueQueue.length > 0) { + const rescue = replayRescueQueue.splice(0, replayRescueQueue.length); + try { + await ctx.runMutation( + component.taskQueue.replayBatchIfReady, + { candidates: rescue }, + ); + } catch { + // Still failing — put them back for next flush tick. + replayRescueQueue.push(...rescue); + } + } + } finally { + flushing = false; + } + }; + + let flushLoopRunning = true; + const flushLoop = (async () => { + while (flushLoopRunning) { + await new Promise((r) => setTimeout(r, FLUSH_INTERVAL_MS)); + if (pendingResults.length > 0) { + await flush(); + } + } + })(); + + // --- Bounded-concurrency task processor --- + let activeCount = 0; + let resolveIdle: (() => void) | null = null; + const taskBuffer: Task[] = []; + + // Per-step-type rate-limit gates: keyed by action handle so + // different APIs (e.g. OpenAI vs Anthropic) don't gate each other. + // Jitter spreads the thundering herd over a window after the deadline. + const rateLimitGates = new Map(); + const RATE_LIMIT_JITTER_MS = 10000; + const waitForRateLimit = async (handle: string) => { + const until = rateLimitGates.get(handle) ?? 0; + if (until <= Date.now()) return; // no active gate + while ((rateLimitGates.get(handle) ?? 0) > Date.now()) { + const remaining = (rateLimitGates.get(handle) ?? 0) - Date.now(); + await new Promise((r) => setTimeout(r, remaining)); + } + // Jitter so tasks don't all wake up at the exact same instant + await new Promise((r) => setTimeout(r, Math.random() * RATE_LIMIT_JITTER_MS)); + }; + const setRateLimitGate = (handle: string, retryAfterMs: number) => { + const deadline = Date.now() + retryAfterMs; + const current = rateLimitGates.get(handle) ?? 0; + if (deadline > current) { + rateLimitGates.set(handle, deadline); + } + }; + + const processTask = async (task: Task): Promise => { + const maxAttempts = Math.max(task.retry?.maxAttempts ?? 1, 1); + const initialBackoffMs = task.retry?.initialBackoffMs ?? 125; + const base = task.retry?.base ?? 2; + + let result: RunResult | undefined; + let attempt = 0; + let rateLimitRetries = 0; + const MAX_RATE_LIMIT_RETRIES = 20; + while (attempt < maxAttempts) { + // Wait for any active rate-limit gate for this step type + await waitForRateLimit(task.handle); + try { + let returnValue: unknown; + switch (task.functionType) { + case "query": + returnValue = await ctx.runQuery( + task.handle as FunctionHandle<"query">, + task.args, + ); + break; + case "mutation": + returnValue = await ctx.runMutation( + task.handle as FunctionHandle<"mutation">, + task.args, + ); + break; + case "action": { + const handler = handlers.get(task.handle); + if (!handler) { + result = { kind: "failed" as const, error: `Unknown action: ${task.handle}` }; + break; + } + returnValue = await handler(ctx, task.args); + break; + } + } + if (result?.kind === "failed") break; // unknown action — no retry + result = { kind: "success", returnValue: returnValue ?? null }; + break; + } catch (e) { + if (e instanceof WorkflowRateLimitError) { + // Set per-step-type gate — only tasks using the same + // action handle wait; other step types keep running. + setRateLimitGate(task.handle, e.retryAfterMs); + rateLimitRetries++; + if (rateLimitRetries >= MAX_RATE_LIMIT_RETRIES) { + // Give up after too many rate-limit retries to avoid + // holding the executor indefinitely. + const error = `Rate limited ${rateLimitRetries} times, giving up`; + result = { kind: "failed", error }; + break; + } + continue; // retry without consuming an attempt + } + const error = e instanceof Error ? e.message : `Unknown error: ${String(e)}`; + result = { kind: "failed", error }; + attempt++; + if (attempt < maxAttempts) { + const backoff = initialBackoffMs * Math.pow(base, attempt - 1); + await new Promise((r) => setTimeout(r, backoff)); + } + } + } + pendingResults.push({ stepId: task.stepId, result: result!, generationNumber: task.generationNumber, executorFinishedAt: Date.now() }); + }; + + const feedTask = (task: Task) => { + if (activeCount < MAX_CONCURRENCY) { + activeCount++; + runTask(task); + } else { + taskBuffer.push(task); + } + }; + + const runTask = (task: Task) => { + processTask(task) + .catch(() => {}) + .finally(() => { + const next = taskBuffer.shift(); + if (next) { + runTask(next); + } else { + activeCount--; + if (activeCount === 0 && resolveIdle) { + resolveIdle(); + } + } + }); + }; + + // Wait for all tasks to complete, then reliably flush all results. + const waitUntilIdle = async () => { + await new Promise((resolve) => { + if (activeCount === 0) resolve(); + else resolveIdle = resolve; + }); + // Drain all pending results with retries. + let retries = 0; + while (pendingResults.length > 0) { + await flush(); + if (pendingResults.length > 0) { + retries++; + if (retries >= MAX_FLUSH_RETRIES) { + // Give up on remaining items — tasks stay in queue, + // next executor run will re-process them. + pendingResults.length = 0; + break; + } + await new Promise((r) => setTimeout(r, 200)); + } + } + // Drain rescue queue before shutting down so workflows + // aren't stranded by transient replay failures. + let rescueRetries = 0; + while (replayRescueQueue.length > 0 && rescueRetries < MAX_FLUSH_RETRIES) { + await flush(); + if (replayRescueQueue.length > 0) { + rescueRetries++; + await new Promise((r) => setTimeout(r, 500)); + } + } + }; + + // --- Non-blocking handshake --- + // Move handoff cleanup into a background promise so the main + // claiming loop starts immediately. Brief overlap of two executors + // on the same shard is safe (claimTasks is read-only, + // recordResultBatch checks generationNumber + inProgress). + type Handoff = { ready: boolean; yielded: boolean } | null; + const handoffDoc: Handoff = await ctx.runQuery( + component.taskQueue.getHandoff, + { shard }, + ); + const handoffCleanup: Promise = (async () => { + if (!handoffDoc || handoffDoc.yielded) { + if (handoffDoc) { + await ctx.runMutation(component.taskQueue.handoff, { + shard, + action: "clear", + }); + } + return; + } + await ctx.runMutation(component.taskQueue.handoff, { + shard, + action: "ready", + }); + // Poll for yield in background. + const predecessorDeadline = + Date.now() + HANDOFF_PREDECESSOR_TIMEOUT_MS; + while (Date.now() < predecessorDeadline) { + await new Promise((r) => setTimeout(r, HANDOFF_POLL_MS)); + const state: Handoff = await ctx.runQuery( + component.taskQueue.getHandoff, + { shard }, + ); + if (!state || state.yielded) break; + } + await ctx.runMutation(component.taskQueue.handoff, { + shard, + action: "clear", + }); + })().catch(() => {}); + // Main loop starts IMMEDIATELY — zero gap. + + // --- Main loop --- + try { + while (true) { + if (Date.now() - startTime > RESCHEDULE_MS + jitterMs) { + // --- Old executor handoff --- + // Create handoff doc, schedule successor, keep claiming + // until successor is ready, then yield and drain. + if (await checkEpoch()) { + await ctx.runMutation(component.taskQueue.handoff, { + shard, + action: "init", + }); + const ref = getExecutorRef(); + if (ref) { + await ctx.scheduler.runAfter(0, ref, { shard, epoch }); + } + // Handoff claiming loop: keep processing tasks until + // the successor signals ready (or timeout). + const successorDeadline = + Date.now() + HANDOFF_SUCCESSOR_TIMEOUT_MS; + while (Date.now() < successorDeadline) { + if (!(await checkEpoch())) break; + const state: Handoff = await ctx.runQuery( + component.taskQueue.getHandoff, + { shard }, + ); + if (state?.ready) { + await ctx.runMutation(component.taskQueue.handoff, { + shard, + action: "yielded", + }); + break; + } + // Continue claiming tasks (streaming) while waiting. + const tasks: Task[] = await ctx.runQuery( + component.taskQueue.claimTasks, + { shard, limit: CLAIM_LIMIT }, + ); + const newTasks = tasks.filter(t => !inFlightStepIds.has(t.stepId)); + if (newTasks.length > 0) { + for (const task of newTasks) { + inFlightStepIds.add(task.stepId); + feedTask(task); + } + } else { + await new Promise((r) => setTimeout(r, POLL_BACKOFF_MS)); + } + } + + } + // Drain: flush results may trigger replays that create new tasks. + // Loop until the shard is fully empty. Small delay lets + // replay sub-mutations commit before we check. + for (let drain = 0; drain < 10; drain++) { + await waitUntilIdle(); + await new Promise((r) => setTimeout(r, 500)); + const remaining: Task[] = await ctx.runQuery( + component.taskQueue.claimTasks, + { shard, limit: CLAIM_LIMIT }, + ); + const newRemaining = remaining.filter(t => !inFlightStepIds.has(t.stepId)); + if (newRemaining.length === 0) break; + for (const task of newRemaining) { + inFlightStepIds.add(task.stepId); + feedTask(task); + } + } + await waitUntilIdle(); + return null; + } + + // If a newer startExecutors call happened, stop claiming + // new work — but drain any tasks already in the shard first + // so they aren't stranded without an executor. + if (!(await checkEpoch())) { + // Drain all remaining tasks including any created by replays. + // Small delay lets replay sub-mutations commit before we check. + for (let drain = 0; drain < 10; drain++) { + await waitUntilIdle(); + await new Promise((r) => setTimeout(r, 500)); + const drainTasks: Task[] = await ctx.runQuery( + component.taskQueue.claimTasks, + { shard, limit: CLAIM_LIMIT }, + ); + const newDrainTasks = drainTasks.filter(t => !inFlightStepIds.has(t.stepId)); + if (newDrainTasks.length === 0) break; + for (const task of newDrainTasks) { + inFlightStepIds.add(task.stepId); + feedTask(task); + } + } + await waitUntilIdle(); + return null; + } + + if (activeCount + taskBuffer.length < MAX_CONCURRENCY) { + const tasks: Task[] = await ctx.runQuery( + component.taskQueue.claimTasks, + { shard, limit: CLAIM_LIMIT }, + ); + const newTasks = tasks.filter(t => !inFlightStepIds.has(t.stepId)); + if (newTasks.length > 0) { + for (const task of newTasks) { + inFlightStepIds.add(task.stepId); + feedTask(task); + } + continue; // immediately claim more + } + } + + // Sleep: shorter when tasks are active, longer when truly idle. + const sleepMs = activeCount > 0 ? POLL_BACKOFF_ACTIVE_MS : POLL_BACKOFF_MS; + await new Promise((r) => setTimeout(r, sleepMs)); + } + } finally { + flushLoopRunning = false; + await flushLoop; + // Last-resort safety net: if the rescue queue still has items + // after all flush retries, durably schedule individual replay + // mutations so workflows aren't permanently stranded when the + // executor action exits. + for (const candidate of replayRescueQueue) { + try { + await ctx.scheduler.runAfter( + 0, + component.taskQueue.replayIfReady, + candidate, + ); + } catch { + // Best effort — scheduler may reject if at limit. + } + } + replayRescueQueue.length = 0; + await handoffCleanup; + } + }, + }) as any; + } + + /** + * Store the FunctionReference for the executor action, used for + * self-rescheduling before the 10-minute action timeout. + * + * @param ref - The function reference for the exported executor action. + */ + setExecutorRef(ref: FunctionReference<"action", "internal">) { + this.executorRef = ref; + } /** * Define a new workflow. @@ -129,6 +720,7 @@ export class WorkflowManager { this.component, workflow, this.options?.workpoolOptions, + this.batchActionNames.size > 0 ? this.batchActionNames : undefined, ); } @@ -175,6 +767,7 @@ export class WorkflowManager { maxParallelism: this.options?.workpoolOptions?.maxParallelism, onComplete, startAsync: options?.startAsync ?? options?.validateAsync, + executorShards: this.executorShards, }); return workflowId as unknown as WorkflowId; } @@ -219,6 +812,25 @@ export class WorkflowManager { }); } + /** + * Launch executor actions for all shards. Call this once before starting + * executor-mode workflows, or from a mutation that kicks off a benchmark. + * + * @param ctx - The Convex context (mutation). + */ + async startExecutors(ctx: RunMutationCtx) { + if (!this.executorShards || !this.executorRef) { + throw new Error( + "startExecutors requires executorShards and setExecutorRef", + ); + } + const executorHandle = await createFunctionHandle(this.executorRef); + await ctx.runMutation(this.component.taskQueue.startExecutors, { + executorHandle, + numShards: this.executorShards, + }); + } + /** * List workflows, including their name, args, return value etc. * @@ -358,7 +970,6 @@ export class WorkflowManager { result, name: args.name, workflowId: args.workflowId, - workpoolOptions: this.options?.workpoolOptions, })) as EventId; } diff --git a/src/client/step.ts b/src/client/step.ts index 5ff07765..0a2283d0 100644 --- a/src/client/step.ts +++ b/src/client/step.ts @@ -19,6 +19,7 @@ import { type Step, valueSize, } from "../component/schema.js"; +import { safeFunctionName } from "./safeFunctionName.js"; import type { WorkflowComponent } from "./types.js"; import { MAX_JOURNAL_SIZE } from "../shared.js"; import type { EventId, SchedulerOptions } from "../types.js"; @@ -64,6 +65,7 @@ export class StepExecutor { private receiver: BaseChannel, private now: number, private workpoolOptions: WorkpoolOptions | undefined, + private batchActionNames?: Set, ) { this.journalEntrySize = journalEntries.reduce( (size, entry) => size + journalEntrySize(entry), @@ -186,10 +188,27 @@ export class StepExecutor { ...commonFields, args: target.args, }; + // Detect batch actions by checking if the function name's base matches + // a registered batch action name. + let batchActionName: string | undefined; + if ( + this.batchActionNames && + target.kind === "function" && + target.functionType === "action" + ) { + const fnName = safeFunctionName(target.function); + const parts = fnName.split(/[:/]/); + const baseName = parts[parts.length - 1]; + if (this.batchActionNames.has(baseName)) { + batchActionName = baseName; + } + } + return { retry: message.retry, schedulerOptions: message.schedulerOptions, step, + batchActionName, }; }), ); diff --git a/src/client/workflowMutation.ts b/src/client/workflowMutation.ts index 5afb2a6b..a11a8d33 100644 --- a/src/client/workflowMutation.ts +++ b/src/client/workflowMutation.ts @@ -46,6 +46,7 @@ export function workflowMutation( component: WorkflowComponent, registered: WorkflowDefinition, defaultWorkpoolOptions?: WorkpoolOptions, + batchActionNames?: Set, ): RegisteredMutation< "internal", { @@ -127,6 +128,7 @@ export function workflowMutation( channel, Date.now(), workpoolOptions, + batchActionNames, ); setupEnvironment(executor.getGenerationState.bind(executor), workflowId); diff --git a/src/component/_generated/api.ts b/src/component/_generated/api.ts index 9a930507..20f2a5d8 100644 --- a/src/component/_generated/api.ts +++ b/src/component/_generated/api.ts @@ -8,11 +8,13 @@ * @module */ +import type * as coordinator from "../coordinator.js"; import type * as event from "../event.js"; import type * as journal from "../journal.js"; import type * as logging from "../logging.js"; import type * as model from "../model.js"; import type * as pool from "../pool.js"; +import type * as taskQueue from "../taskQueue.js"; import type * as utils from "../utils.js"; import type * as workflow from "../workflow.js"; @@ -24,11 +26,13 @@ import type { import { anyApi, componentsGeneric } from "convex/server"; const fullApi: ApiFromModules<{ + coordinator: typeof coordinator; event: typeof event; journal: typeof journal; logging: typeof logging; model: typeof model; pool: typeof pool; + taskQueue: typeof taskQueue; utils: typeof utils; workflow: typeof workflow; }> = anyApi as any; diff --git a/src/component/_generated/component.ts b/src/component/_generated/component.ts index 83ad1ccf..6250236f 100644 --- a/src/component/_generated/component.ts +++ b/src/component/_generated/component.ts @@ -42,16 +42,6 @@ export type ComponentApi = | { error: string; kind: "failed" } | { kind: "canceled" }; workflowId?: string; - workpoolOptions?: { - defaultRetryBehavior?: { - base: number; - initialBackoffMs: number; - maxAttempts: number; - }; - logLevel?: "DEBUG" | "TRACE" | "INFO" | "REPORT" | "WARN" | "ERROR"; - maxParallelism?: number; - retryActionsByDefault?: boolean; - }; }, string, Name @@ -72,6 +62,7 @@ export type ComponentApi = args: any; argsSize: number; completedAt?: number; + executorFinishedAt?: number; functionType: "query" | "mutation" | "action"; handle: string; inProgress: boolean; @@ -88,6 +79,7 @@ export type ComponentApi = args: any; argsSize: number; completedAt?: number; + executorFinishedAt?: number; handle: string; inProgress: boolean; kind: "workflow"; @@ -104,6 +96,7 @@ export type ComponentApi = argsSize: number; completedAt?: number; eventId?: string; + executorFinishedAt?: number; inProgress: boolean; kind: "event"; name: string; @@ -122,10 +115,12 @@ export type ComponentApi = _creationTime: number; _id: string; args: any; + executorShards?: number; generationNumber: number; logLevel?: any; name?: string; onComplete?: { context?: any; fnHandle: string }; + readyToRun?: boolean; runResult?: | { kind: "success"; returnValue: any } | { error: string; kind: "failed" } @@ -143,6 +138,7 @@ export type ComponentApi = { generationNumber: number; steps: Array<{ + batchActionName?: string; retry?: | boolean | { base: number; initialBackoffMs: number; maxAttempts: number }; @@ -152,6 +148,7 @@ export type ComponentApi = args: any; argsSize: number; completedAt?: number; + executorFinishedAt?: number; functionType: "query" | "mutation" | "action"; handle: string; inProgress: boolean; @@ -168,6 +165,7 @@ export type ComponentApi = args: any; argsSize: number; completedAt?: number; + executorFinishedAt?: number; handle: string; inProgress: boolean; kind: "workflow"; @@ -184,6 +182,7 @@ export type ComponentApi = argsSize: number; completedAt?: number; eventId?: string; + executorFinishedAt?: number; inProgress: boolean; kind: "event"; name: string; @@ -214,6 +213,7 @@ export type ComponentApi = args: any; argsSize: number; completedAt?: number; + executorFinishedAt?: number; functionType: "query" | "mutation" | "action"; handle: string; inProgress: boolean; @@ -230,6 +230,7 @@ export type ComponentApi = args: any; argsSize: number; completedAt?: number; + executorFinishedAt?: number; handle: string; inProgress: boolean; kind: "workflow"; @@ -246,6 +247,7 @@ export type ComponentApi = argsSize: number; completedAt?: number; eventId?: string; + executorFinishedAt?: number; inProgress: boolean; kind: "event"; name: string; @@ -261,6 +263,127 @@ export type ComponentApi = Name >; }; + taskQueue: { + claimTasks: FunctionReference< + "query", + "internal", + { limit: number; shard: number }, + Array<{ + args: any; + functionType: "query" | "mutation" | "action"; + generationNumber: number; + handle: string; + retry?: { + base: number; + initialBackoffMs: number; + maxAttempts: number; + }; + stepId: string; + workflowId: string; + }>, + Name + >; + diagnose: FunctionReference< + "query", + "internal", + { numShards?: number }, + { + executorEpoch: number; + taskQueueCounts: Array<{ count: number; shard: number }>; + totalTasks: number; + }, + Name + >; + getExecutorEpoch: FunctionReference< + "query", + "internal", + {}, + number, + Name + >; + getHandoff: FunctionReference< + "query", + "internal", + { shard: number }, + { ready: boolean; yielded: boolean } | null, + Name + >; + handoff: FunctionReference< + "mutation", + "internal", + { action: "init" | "ready" | "yielded" | "clear"; shard: number }, + null, + Name + >; + recordResult: FunctionReference< + "mutation", + "internal", + { + generationNumber: number; + result: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + stepId: string; + }, + null, + Name + >; + recordResultBatch: FunctionReference< + "mutation", + "internal", + { + items: Array<{ + executorFinishedAt?: number; + flushCalledAt?: number; + generationNumber: number; + result: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + stepId: string; + }>; + replayInline?: boolean; + }, + Array<{ + generationNumber: number; + workflowHandle: string; + workflowId: string; + }>, + Name + >; + replayBatchIfReady: FunctionReference< + "mutation", + "internal", + { + candidates: Array<{ + generationNumber: number; + workflowHandle: string; + workflowId: string; + }>; + }, + null, + Name + >; + replayIfReady: FunctionReference< + "mutation", + "internal", + { + generationNumber: number; + workflowHandle: string; + workflowId: string; + }, + null, + Name + >; + startExecutors: FunctionReference< + "mutation", + "internal", + { executorHandle: string; numShards: number }, + number, + Name + >; + }; workflow: { cancel: FunctionReference< "mutation", @@ -290,10 +413,42 @@ export type ComponentApi = null, Name >; + countByName: FunctionReference< + "query", + "internal", + { createdAfter?: number; name: string }, + { completed: number; failed: number; running: number; total: number }, + Name + >; + countByNamePage: FunctionReference< + "query", + "internal", + { + createdAfter?: number; + name: string; + paginationOpts: { + cursor: string | null; + endCursor?: string | null; + id?: number; + maximumBytesRead?: number; + maximumRowsRead?: number; + numItems: number; + }; + }, + { + completed: number; + continueCursor: string; + failed: number; + isDone: boolean; + running: number; + }, + Name + >; create: FunctionReference< "mutation", "internal", { + executorShards?: number; maxParallelism?: number; onComplete?: { context?: any; fnHandle: string }; startAsync?: boolean; @@ -304,6 +459,13 @@ export type ComponentApi = string, Name >; + creationTimeBuckets: FunctionReference< + "query", + "internal", + { bucketMs: number; createdAfter: number; name: string }, + Array<{ count: number; offsetSec: number }>, + Name + >; getStatus: FunctionReference< "query", "internal", @@ -317,6 +479,7 @@ export type ComponentApi = args: any; argsSize: number; completedAt?: number; + executorFinishedAt?: number; functionType: "query" | "mutation" | "action"; handle: string; inProgress: boolean; @@ -333,6 +496,7 @@ export type ComponentApi = args: any; argsSize: number; completedAt?: number; + executorFinishedAt?: number; handle: string; inProgress: boolean; kind: "workflow"; @@ -349,6 +513,7 @@ export type ComponentApi = argsSize: number; completedAt?: number; eventId?: string; + executorFinishedAt?: number; inProgress: boolean; kind: "event"; name: string; @@ -366,10 +531,12 @@ export type ComponentApi = _creationTime: number; _id: string; args: any; + executorShards?: number; generationNumber: number; logLevel?: any; name?: string; onComplete?: { context?: any; fnHandle: string }; + readyToRun?: boolean; runResult?: | { kind: "success"; returnValue: any } | { error: string; kind: "failed" } @@ -486,5 +653,41 @@ export type ComponentApi = }, Name >; + timelinePage: FunctionReference< + "query", + "internal", + { + createdAfter?: number; + name: string; + paginationOpts: { + cursor: string | null; + endCursor?: string | null; + id?: number; + maximumBytesRead?: number; + maximumRowsRead?: number; + numItems: number; + }; + }, + { + continueCursor: string; + isDone: boolean; + page: Array<{ + createdAt: number; + id: string; + runResult?: "success" | "failed" | "canceled"; + steps: Array<{ + completedAt?: number; + executionStartedAt?: number; + executorFinishedAt?: number; + name: string; + startedAt: number; + stepNumber: number; + }>; + }>; + pageStatus?: "SplitRecommended" | "SplitRequired" | null; + splitCursor?: string | null; + }, + Name + >; }; }; diff --git a/src/component/_generated/server.ts b/src/component/_generated/server.ts index 24994e4e..739b02f7 100644 --- a/src/component/_generated/server.ts +++ b/src/component/_generated/server.ts @@ -107,11 +107,6 @@ export const internalAction: ActionBuilder = */ export const httpAction: HttpActionBuilder = httpActionGeneric; -type GenericCtx = - | GenericActionCtx - | GenericMutationCtx - | GenericQueryCtx; - /** * A set of services for use within Convex query functions. * diff --git a/src/component/coordinator.ts b/src/component/coordinator.ts new file mode 100644 index 00000000..396bb2ca --- /dev/null +++ b/src/component/coordinator.ts @@ -0,0 +1,104 @@ +import type { FunctionHandle } from "convex/server"; +import { v } from "convex/values"; +import { internal } from "./_generated/api.js"; +import { internalMutation, type MutationCtx } from "./_generated/server.js"; +import { createLogger, DEFAULT_LOG_LEVEL } from "./logging.js"; + +const COORDINATOR_READ_BATCH = 1000; +const WORKFLOW_BATCH_SIZE = 50; + +export async function ensureCoordinatorRunning(ctx: MutationCtx) { + const state = await ctx.db.query("coordinatorState").first(); + if (state?.scheduled) { + return; + } + if (state) { + await ctx.db.patch(state._id, { scheduled: true }); + } else { + await ctx.db.insert("coordinatorState", { scheduled: true }); + } + await ctx.scheduler.runAfter(0, internal.coordinator.coordinator); +} + +export const coordinator = internalMutation({ + args: {}, + returns: v.null(), + handler: async (ctx) => { + const ready = await ctx.db + .query("workflows") + .withIndex("readyToRun", (q) => q.eq("readyToRun", true)) + .take(COORDINATOR_READ_BATCH); + + if (ready.length === 0) { + const state = await ctx.db.query("coordinatorState").first(); + if (state) { + await ctx.db.patch(state._id, { scheduled: false }); + } + return; + } + + // Claim all workflows by clearing readyToRun + for (const workflow of ready) { + await ctx.db.patch(workflow._id, { readyToRun: undefined }); + } + + // Fan out in batches + for (let i = 0; i < ready.length; i += WORKFLOW_BATCH_SIZE) { + const batch = ready.slice(i, i + WORKFLOW_BATCH_SIZE); + const items = batch.map((w) => ({ + workflowId: w._id, + generationNumber: w.generationNumber, + workflowHandle: w.workflowHandle, + })); + await ctx.scheduler.runAfter( + 0, + internal.coordinator.runWorkflowBatch, + { items }, + ); + } + + // Reschedule self to pick up more + await ctx.scheduler.runAfter(0, internal.coordinator.coordinator); + }, +}); + +export const runWorkflowBatch = internalMutation({ + args: { + items: v.array( + v.object({ + workflowId: v.id("workflows"), + generationNumber: v.number(), + workflowHandle: v.string(), + }), + ), + }, + returns: v.null(), + handler: async (ctx, { items }) => { + const console = createLogger(DEFAULT_LOG_LEVEL); + for (const { workflowId, generationNumber, workflowHandle } of items) { + const workflow = await ctx.db.get(workflowId); + if ( + !workflow || + workflow.runResult || + workflow.generationNumber !== generationNumber + ) { + continue; + } + try { + await ctx.runMutation( + workflowHandle as FunctionHandle<"mutation">, + { workflowId, generationNumber }, + ); + } catch (e) { + const error = + e instanceof Error ? e.message : `Unknown error: ${String(e)}`; + console.error( + `Error running workflow ${workflowId}: ${error}`, + ); + await ctx.db.patch(workflowId, { + runResult: { kind: "failed", error }, + }); + } + } + }, +}); diff --git a/src/component/event.ts b/src/component/event.ts index a002316c..0ef7c18f 100644 --- a/src/component/event.ts +++ b/src/component/event.ts @@ -5,7 +5,7 @@ import { mutation, type MutationCtx } from "./_generated/server.js"; import { vResultValidator } from "@convex-dev/workpool"; import type { Doc, Id } from "./_generated/dataModel.js"; import { assert } from "convex-helpers"; -import { enqueueWorkflow, getWorkpool, workpoolOptions } from "./pool.js"; +import { enqueueWorkflow } from "./pool.js"; export async function awaitEvent( ctx: MutationCtx, @@ -106,7 +106,6 @@ export const send = mutation({ eventId: v.optional(v.id("events")), name: v.optional(v.string()), result: vResultValidator, - workpoolOptions: v.optional(workpoolOptions), }, returns: v.id("events"), handler: async (ctx, args) => { @@ -169,8 +168,7 @@ export const send = mutation({ if (!anyMoreEvents) { const workflow = await ctx.db.get(workflowId); assert(workflow, `Workflow ${workflowId} not found`); - const workpool = await getWorkpool(ctx, args.workpoolOptions); - await enqueueWorkflow(ctx, workflow, workpool); + await enqueueWorkflow(ctx, workflow); } break; } diff --git a/src/component/journal.ts b/src/component/journal.ts index 62aeafdd..82675b10 100644 --- a/src/component/journal.ts +++ b/src/component/journal.ts @@ -8,7 +8,7 @@ import { workflowDocument, } from "./schema.js"; import { getWorkflow } from "./model.js"; -import { logLevel } from "./logging.js"; +import { createLogger, DEFAULT_LOG_LEVEL, logLevel } from "./logging.js"; import { vRetryBehavior, type WorkId } from "@convex-dev/workpool"; import { getWorkpool, @@ -71,6 +71,20 @@ export const load = query({ }, }); +function shardForWorkflow(workflowId: string, numShards: number): number { + let hash = 0; + for (let i = 0; i < workflowId.length; i++) { + hash = (hash * 31 + workflowId.charCodeAt(i)) | 0; + } + return ((hash % numShards) + numShards) % numShards; +} + +const DEFAULT_QM_RETRY = { + maxAttempts: 4, + initialBackoffMs: 125, + base: 2, +}; + export const startSteps = mutation({ args: { workflowId: v.string(), @@ -85,6 +99,7 @@ export const startSteps = mutation({ v.object({ runAfter: v.optional(v.number()) }), ), ), + batchActionName: v.optional(v.string()), }), ), workpoolOptions: v.optional(workpoolOptions), @@ -96,7 +111,7 @@ export const startSteps = mutation({ } const { generationNumber } = args; const workflow = await getWorkflow(ctx, args.workflowId, generationNumber); - const console = await getDefaultLogger(ctx); + const console = createLogger(DEFAULT_LOG_LEVEL); if (workflow.runResult !== undefined) { throw new Error(`Workflow not running: ${args.workflowId}`); @@ -112,7 +127,7 @@ export const startSteps = mutation({ const entries = await Promise.all( args.steps.map(async (stepArgs, index) => { - const { retry, schedulerOptions } = stepArgs; + const { schedulerOptions } = stepArgs; const stepNumber = stepNumberBase + index; const stepId = await ctx.db.insert("steps", { workflowId: workflow._id, @@ -164,37 +179,91 @@ export const startSteps = mutation({ stepId, workpoolOptions: args.workpoolOptions, }; - let workId: WorkId; + let workId: string; + // Retry config only applies to actions — queries and mutations + // are deterministic and retried automatically by Convex on OCC. + const actionRetryConfig = + step.functionType === "action" + ? (stepArgs.retry === false + ? undefined + : typeof stepArgs.retry === "object" + ? stepArgs.retry + : DEFAULT_QM_RETRY) + : undefined; switch (step.functionType) { case "query": { - workId = await workpool.enqueueQuery( - ctx, - step.handle as FunctionHandle<"query">, - step.args, - { context, onComplete, name, ...schedulerOptions }, - ); + if (workflow.executorShards) { + const shard = shardForWorkflow(workflow._id as string, workflow.executorShards); + await ctx.db.insert("taskQueue", { + shard, + functionType: "query", + handle: step.handle, + args: step.args, + stepId, + workflowId: workflow._id, + generationNumber, + }); + workId = `executor:${stepId}`; + } else { + workId = (await workpool.enqueueQuery( + ctx, + step.handle as FunctionHandle<"query">, + step.args, + { context, onComplete, name, ...schedulerOptions }, + )) as unknown as string; + } break; } case "mutation": { - workId = await workpool.enqueueMutation( - ctx, - step.handle as FunctionHandle<"mutation">, - step.args, - { context, onComplete, name, ...schedulerOptions }, - ); + if (workflow.executorShards) { + const shard = shardForWorkflow(workflow._id as string, workflow.executorShards); + await ctx.db.insert("taskQueue", { + shard, + functionType: "mutation", + handle: step.handle, + args: step.args, + stepId, + workflowId: workflow._id, + generationNumber, + }); + workId = `executor:${stepId}`; + } else { + workId = (await workpool.enqueueMutation( + ctx, + step.handle as FunctionHandle<"mutation">, + step.args, + { context, onComplete, name, ...schedulerOptions }, + )) as unknown as string; + } break; } case "action": { - workId = await workpool.enqueueAction( - ctx, - step.handle as FunctionHandle<"action">, - step.args, - { context, onComplete, name, retry, ...schedulerOptions }, - ); + if (stepArgs.batchActionName && workflow.executorShards) { + // Route through sharded task queue for executor-driven execution. + const shard = shardForWorkflow(workflow._id as string, workflow.executorShards); + await ctx.db.insert("taskQueue", { + shard, + functionType: "action", + handle: stepArgs.batchActionName, + args: step.args, + stepId, + workflowId: workflow._id, + generationNumber, + retry: actionRetryConfig, + }); + workId = `executor:${stepId}`; + } else { + workId = (await workpool.enqueueAction( + ctx, + step.handle as FunctionHandle<"action">, + step.args, + { context, onComplete, name, ...schedulerOptions }, + )) as unknown as string; + } break; } } - step.workId = workId; + step.workId = workId as unknown as WorkId; } await ctx.db.replace(entry._id, entry); diff --git a/src/component/pool.ts b/src/component/pool.ts index ef63afbf..42f65c8a 100644 --- a/src/component/pool.ts +++ b/src/component/pool.ts @@ -17,9 +17,7 @@ import { import { type Infer, v } from "convex/values"; import { components, internal } from "./_generated/api.js"; import { internalMutation, type MutationCtx } from "./_generated/server.js"; -import { logLevel } from "./logging.js"; -import { getWorkflow } from "./model.js"; -import { getDefaultLogger } from "./utils.js"; +import { createLogger, DEFAULT_LOG_LEVEL, logLevel } from "./logging.js"; import { completeHandler } from "./workflow.js"; import type { Doc } from "./_generated/dataModel.js"; import { vWorkflowId, type WorkflowId } from "../types.js"; @@ -96,14 +94,12 @@ async function onCompleteHandler( context: object; }, ) { - const console = await getDefaultLogger(ctx); + const console = createLogger(DEFAULT_LOG_LEVEL); const stepId = "stepId" in args.context && typeof args.context.stepId === "string" ? ctx.db.normalizeId("steps", args.context.stepId) : null; if (!stepId) { - // Write to failures table and return - // So someone can investigate if this ever happens console.error("Invalid onComplete context", args.context); await ctx.db.insert("onCompleteFailures", args); return; @@ -127,7 +123,11 @@ async function onCompleteHandler( return; } const { generationNumber } = args.context; - const workflow = await getWorkflow(ctx, workflowId, null); + const workflow = await ctx.db.get(workflowId); + if (!workflow) { + console.error(`Workflow not found: ${workflowId}`); + return; + } if (workflow.generationNumber !== generationNumber) { console.error( `Workflow: ${workflowId} already has generation number ${workflow.generationNumber} when completing ${stepId}`, @@ -180,26 +180,52 @@ async function onCompleteHandler( } return; } - const workpool = await getWorkpool(ctx, args.context.workpoolOptions); - await enqueueWorkflow(ctx, workflow, workpool); + // Only progress the workflow when no other steps are still running. + // This avoids wasted replay calls (and OCC conflicts) when parallel + // steps (e.g. analyze-a and analyze-b) complete at different times. + const otherInProgress = await ctx.db + .query("steps") + .withIndex("inProgress", (q) => + q.eq("step.inProgress", true).eq("workflowId", workflowId), + ) + .first(); + if (!otherInProgress) { + // Inline progression: run the workflow mutation directly in this + // transaction instead of scheduling a separate directRunWorkflow. + // This keeps step result recording + workflow progression atomic, and + // at high scale (10K+) the saved scheduling hop outweighs the slightly + // larger transaction's OCC window. + try { + await ctx.runMutation( + workflow.workflowHandle as FunctionHandle<"mutation">, + { + workflowId: workflow._id, + generationNumber: workflow.generationNumber, + }, + ); + } catch (e) { + const error = + e instanceof Error ? e.message : `Unknown error: ${String(e)}`; + console.error(`Error running workflow ${workflowId}: ${error}`); + await ctx.db.patch(workflowId, { + runResult: { kind: "failed", error }, + }); + } + } } export async function enqueueWorkflow( ctx: MutationCtx, workflow: Doc<"workflows">, - workpool: Workpool, ) { - const { _id: workflowId, generationNumber, name, workflowHandle } = workflow; - await workpool.enqueueMutation( - ctx, - workflowHandle as FunctionHandle<"mutation">, - { workflowId, generationNumber }, - { - name, - onComplete: internal.pool.handlerOnComplete, - context: { workflowId, generationNumber }, - }, - ); + // Schedule the workflow mutation directly instead of going through the + // coordinator. This avoids OCC contention on workflow documents — the old + // approach patched `readyToRun` on every step completion, which conflicts + // with the coordinator's reads/writes on the same docs at high scale. + await ctx.scheduler.runAfter(0, internal.pool.directRunWorkflow, { + workflowId: workflow._id, + generationNumber: workflow.generationNumber, + }); } export type OnComplete = @@ -228,7 +254,7 @@ export const handlerOnComplete = internalMutation({ if (args.result.kind === "success") { return; } - const console = await getDefaultLogger(ctx); + const console = createLogger(DEFAULT_LOG_LEVEL); if (!validate(handlerOnCompleteContext, args.context)) { console.error("Invalid handlerOnComplete context", args.context); const workflowId = ctx.db.normalizeId( @@ -262,5 +288,37 @@ export const handlerOnComplete = internalMutation({ }); }, }); +export const directRunWorkflow = internalMutation({ + args: { + workflowId: v.id("workflows"), + generationNumber: v.number(), + }, + returns: v.null(), + handler: async (ctx, { workflowId, generationNumber }) => { + const console = createLogger(DEFAULT_LOG_LEVEL); + const workflow = await ctx.db.get(workflowId); + if ( + !workflow || + workflow.runResult || + workflow.generationNumber !== generationNumber + ) { + return; + } + try { + await ctx.runMutation( + workflow.workflowHandle as FunctionHandle<"mutation">, + { workflowId, generationNumber }, + ); + } catch (e) { + const error = + e instanceof Error ? e.message : `Unknown error: ${String(e)}`; + console.error(`Error running workflow ${workflowId}: ${error}`); + await ctx.db.patch(workflowId, { + runResult: { kind: "failed", error }, + }); + } + }, +}); + // eslint-disable-next-line @typescript-eslint/no-unused-vars const console = "THIS IS A REMINDER TO USE getDefaultLogger"; diff --git a/src/component/schema.ts b/src/component/schema.ts index 35e8e0e8..37024209 100644 --- a/src/component/schema.ts +++ b/src/component/schema.ts @@ -50,6 +50,13 @@ const workflowObject = { // Internal execution status, used to totally order mutations. generationNumber: v.number(), + + // Set to true when the workflow is ready for the coordinator to pick up. + readyToRun: v.optional(v.boolean()), + + // When set, action steps with batchActionName route through the sharded + // task queue instead of the batch bridge or workpool. + executorShards: v.optional(v.number()), }; export const workflowDocument = v.object({ @@ -67,6 +74,13 @@ const stepCommonFields = { runResult: v.optional(vResultValidator), startedAt: v.number(), completedAt: v.optional(v.number()), + // When the executor finished processing (before batched flush). + // Gap between this and completedAt = batching/write delay. + executorFinishedAt: v.optional(v.number()), + // When the flush loop called the mutation (after queue wait). + // executorFinishedAt → flushCalledAt = queue wait + // flushCalledAt → completedAt = mutation wait (slot + OCC + execution) + flushCalledAt: v.optional(v.number()), }; export const step = v.union( @@ -176,7 +190,12 @@ export default defineSchema({ logLevel: v.optional(logLevel), maxParallelism: v.optional(v.number()), }), - workflows: defineTable(workflowObject).index("name", ["name"]), + coordinatorState: defineTable({ + scheduled: v.boolean(), + }), + workflows: defineTable(workflowObject) + .index("name", ["name"]) + .index("readyToRun", ["readyToRun"]), steps: defineTable(journalObject) .index("workflow", ["workflowId", "stepNumber"]) .index("inProgress", ["step.inProgress", "workflowId"]), @@ -184,6 +203,30 @@ export default defineSchema({ "workflowId", "state.kind", ]), + executorEpoch: defineTable({ + epoch: v.number(), + }), + taskQueue: defineTable({ + shard: v.number(), + functionType: v.union(v.literal("query"), v.literal("mutation"), v.literal("action")), + handle: v.string(), + args: v.any(), + stepId: v.id("steps"), + workflowId: v.id("workflows"), + generationNumber: v.number(), + retry: v.optional(v.object({ + maxAttempts: v.number(), + initialBackoffMs: v.number(), + base: v.number(), + })), + }) + .index("by_shard", ["shard"]) + .index("by_stepId", ["stepId"]), + executorHandoff: defineTable({ + shard: v.number(), + ready: v.boolean(), + yielded: v.boolean(), + }).index("by_shard", ["shard"]), onCompleteFailures: defineTable( v.union( v.object({ diff --git a/src/component/taskQueue.ts b/src/component/taskQueue.ts new file mode 100644 index 00000000..43d86d0a --- /dev/null +++ b/src/component/taskQueue.ts @@ -0,0 +1,562 @@ +import { v } from "convex/values"; +import { vResultValidator } from "@convex-dev/workpool"; +import { mutation, query } from "./_generated/server.js"; +import { api } from "./_generated/api.js"; +import type { Id } from "./_generated/dataModel.js"; +import { createLogger, DEFAULT_LOG_LEVEL } from "./logging.js"; +import type { FunctionHandle } from "convex/server"; + +const taskResult = v.object({ + functionType: v.union(v.literal("query"), v.literal("mutation"), v.literal("action")), + handle: v.string(), + args: v.any(), + stepId: v.id("steps"), + workflowId: v.id("workflows"), + generationNumber: v.number(), + retry: v.optional(v.object({ + maxAttempts: v.number(), + initialBackoffMs: v.number(), + base: v.number(), + })), +}); + +export const claimTasks = query({ + args: { + shard: v.number(), + limit: v.number(), + }, + returns: v.array(taskResult), + handler: async (ctx, { shard, limit }) => { + // Read tasks without deleting — deletion happens in recordResultBatch + // to ensure atomicity with step result recording. + const tasks = await ctx.db + .query("taskQueue") + .withIndex("by_shard", (q) => q.eq("shard", shard)) + .take(limit); + return tasks.map((task) => ({ + functionType: task.functionType, + handle: task.handle, + args: task.args, + stepId: task.stepId, + workflowId: task.workflowId, + generationNumber: task.generationNumber, + retry: task.retry, + })); + }, +}); + +export const recordResult = mutation({ + args: { + stepId: v.id("steps"), + result: vResultValidator, + generationNumber: v.number(), + }, + returns: v.null(), + handler: async (ctx, { stepId, result, generationNumber }) => { + const console = createLogger(DEFAULT_LOG_LEVEL); + + // Helper: always delete the task from the queue, even on early return. + // Stale/orphaned tasks must not block the shard permanently. + const deleteTask = async () => { + const taskEntry = await ctx.db + .query("taskQueue") + .withIndex("by_stepId", (q) => q.eq("stepId", stepId)) + .unique(); + if (taskEntry) { + await ctx.db.delete(taskEntry._id); + } + }; + + // 1. Record the step result (same logic as pool.ts:onCompleteHandler) + const journalEntry = await ctx.db.get(stepId); + if (!journalEntry) { + console.error(`Journal entry not found: ${stepId}`); + await deleteTask(); + return null; + } + const workflowId = journalEntry.workflowId; + const workflow = await ctx.db.get(workflowId); + if (!workflow) { + console.error(`Workflow not found: ${workflowId}`); + await deleteTask(); + return null; + } + if (workflow.generationNumber !== generationNumber) { + console.error( + `Workflow: ${workflowId} already has generation number ${workflow.generationNumber} when completing ${stepId}`, + ); + await deleteTask(); + return null; + } + if (!journalEntry.step.inProgress) { + console.error( + `Step finished but journal entry not in progress: ${stepId}`, + ); + await deleteTask(); + return null; + } + + journalEntry.step.inProgress = false; + journalEntry.step.completedAt = Date.now(); + switch (result.kind) { + case "success": + journalEntry.step.runResult = { + kind: "success", + returnValue: result.returnValue, + }; + break; + case "failed": + journalEntry.step.runResult = { + kind: "failed", + error: result.error, + }; + break; + case "canceled": + journalEntry.step.runResult = { + kind: "canceled", + }; + break; + } + await ctx.db.replace(journalEntry._id, journalEntry); + + console.event("stepCompleted", { + workflowId, + workflowName: workflow.name, + status: result.kind, + stepName: journalEntry.step.name, + stepNumber: journalEntry.stepNumber, + durationMs: journalEntry.step.completedAt - journalEntry.step.startedAt, + }); + + // 2. Delete the task from the queue (atomically with result recording) + await deleteTask(); + + if (workflow.runResult !== undefined) { + return null; + } + + // 3. Check if this was the last in-progress step → replay workflow inline + const otherInProgress = await ctx.db + .query("steps") + .withIndex("inProgress", (q) => + q.eq("step.inProgress", true).eq("workflowId", workflowId), + ) + .first(); + if (!otherInProgress) { + try { + await ctx.runMutation( + workflow.workflowHandle as FunctionHandle<"mutation">, + { + workflowId: workflow._id, + generationNumber: workflow.generationNumber, + }, + ); + } catch (e) { + const error = + e instanceof Error ? e.message : `Unknown error: ${String(e)}`; + console.error(`Error running workflow ${workflowId}: ${error}`); + await ctx.db.patch(workflowId, { + runResult: { kind: "failed", error }, + }); + } + } + + return null; + }, +}); + +export const startExecutors = mutation({ + args: { + executorHandle: v.string(), + numShards: v.number(), + }, + returns: v.number(), // returns the new epoch + handler: async (ctx, { executorHandle, numShards }) => { + // Increment executor epoch — old executors with stale epochs will terminate. + const existing = await ctx.db + .query("executorEpoch") + .first(); + let epoch: number; + if (existing) { + epoch = existing.epoch + 1; + await ctx.db.patch(existing._id, { epoch }); + } else { + epoch = 1; + await ctx.db.insert("executorEpoch", { epoch }); + } + for (let i = 0; i < numShards; i++) { + await ctx.scheduler.runAfter( + 0, + executorHandle as FunctionHandle<"action">, + { shard: i, epoch }, + ); + } + return epoch; + }, +}); + +export const getExecutorEpoch = query({ + args: {}, + returns: v.number(), + handler: async (ctx) => { + const existing = await ctx.db + .query("executorEpoch") + .first(); + return existing?.epoch ?? 0; + }, +}); + +const replayCandidate = v.object({ + workflowId: v.id("workflows"), + generationNumber: v.number(), + workflowHandle: v.string(), +}); + +export const recordResultBatch = mutation({ + args: { + items: v.array( + v.object({ + stepId: v.id("steps"), + result: vResultValidator, + generationNumber: v.number(), + executorFinishedAt: v.optional(v.number()), + flushCalledAt: v.optional(v.number()), + }), + ), + replayInline: v.optional(v.boolean()), + }, + returns: v.array(replayCandidate), + handler: async (ctx, { items, replayInline }) => { + const console = createLogger(DEFAULT_LOG_LEVEL); + const candidates = new Map< + string, + { workflowId: Id<"workflows">; workflowHandle: string; generationNumber: number } + >(); + + for (const { stepId, result, generationNumber, executorFinishedAt, flushCalledAt } of items) { + const deleteTask = async () => { + const taskEntry = await ctx.db + .query("taskQueue") + .withIndex("by_stepId", (q) => q.eq("stepId", stepId)) + .unique(); + if (taskEntry) { + await ctx.db.delete(taskEntry._id); + } + }; + + const journalEntry = await ctx.db.get(stepId); + if (!journalEntry) { + await deleteTask(); + continue; + } + const workflowId = journalEntry.workflowId; + const workflow = await ctx.db.get(workflowId); + if (!workflow) { + await deleteTask(); + continue; + } + if (workflow.generationNumber !== generationNumber) { + await deleteTask(); + continue; + } + if (!journalEntry.step.inProgress) { + await deleteTask(); + continue; + } + + journalEntry.step.inProgress = false; + journalEntry.step.completedAt = Date.now(); + if (executorFinishedAt) { + journalEntry.step.executorFinishedAt = executorFinishedAt; + } + if (flushCalledAt) { + journalEntry.step.flushCalledAt = flushCalledAt; + } + switch (result.kind) { + case "success": + journalEntry.step.runResult = { + kind: "success", + returnValue: result.returnValue, + }; + break; + case "failed": + journalEntry.step.runResult = { + kind: "failed", + error: result.error, + }; + break; + case "canceled": + journalEntry.step.runResult = { + kind: "canceled", + }; + break; + } + await ctx.db.replace(journalEntry._id, journalEntry); + + console.event("stepCompleted", { + workflowId, + workflowName: workflow.name, + status: result.kind, + stepName: journalEntry.step.name, + stepNumber: journalEntry.stepNumber, + durationMs: journalEntry.step.completedAt - journalEntry.step.startedAt, + }); + + await deleteTask(); + + if (workflow.runResult === undefined) { + candidates.set(workflowId, { + workflowId, + workflowHandle: workflow.workflowHandle, + generationNumber: workflow.generationNumber, + }); + } + } + + // Durable safety net: schedule a delayed replayIfReady for each + // candidate. This is committed atomically with the result recording, + // so it survives executor crashes and hard timeouts. In the normal + // case inline replay handles it first and the scheduled call is a + // cheap no-op (checks runResult, returns early). + if (replayInline) { + for (const candidate of candidates.values()) { + await ctx.scheduler.runAfter( + 10_000, + api.taskQueue.replayIfReady, + candidate, + ); + } + } + + // When replayInline is set, attempt replay within this same mutation. + // This eliminates OCC conflicts for the common case. Candidates where + // other steps are still in-progress are returned to the client so it + // can retry via replayBatchIfReady — this prevents the race where two + // concurrent batches each complete the "last" step but both skip replay + // because neither sees the other's commit (snapshot isolation). + if (replayInline) { + const unreplayed: Array<{ workflowId: Id<"workflows">; workflowHandle: string; generationNumber: number }> = []; + for (const candidate of candidates.values()) { + const { workflowId, generationNumber, workflowHandle } = candidate; + const workflow = await ctx.db.get(workflowId); + if (!workflow || workflow.runResult || workflow.generationNumber !== generationNumber) { + continue; + } + const inProgress = await ctx.db + .query("steps") + .withIndex("inProgress", (q) => + q.eq("step.inProgress", true).eq("workflowId", workflowId), + ) + .first(); + if (inProgress) { + // Other steps still running in a concurrent batch — return to + // client for retry so the workflow isn't permanently stranded. + unreplayed.push(candidate); + continue; + } + try { + await ctx.runMutation( + workflowHandle as FunctionHandle<"mutation">, + { workflowId, generationNumber }, + ); + } catch (e) { + const error = e instanceof Error ? e.message : `Unknown error: ${String(e)}`; + console.error(`Error running workflow ${workflowId}: ${error}`); + await ctx.db.patch(workflowId, { + runResult: { kind: "failed", error }, + }); + } + } + return unreplayed; + } + + // Return candidates — executor handles replay in a concurrent pipeline. + // No inProgress index reads here = minimal OCC surface. + return [...candidates.values()]; + }, +}); + +// Per-workflow replay check. Called from executor after recordResultBatch. +// Small mutation = minimal OCC conflict surface. +export const replayIfReady = mutation({ + args: replayCandidate, + returns: v.null(), + handler: async (ctx, { workflowId, generationNumber, workflowHandle }) => { + const console = createLogger(DEFAULT_LOG_LEVEL); + const workflow = await ctx.db.get(workflowId); + if (!workflow || workflow.runResult || workflow.generationNumber !== generationNumber) { + return null; + } + const inProgress = await ctx.db + .query("steps") + .withIndex("inProgress", (q) => + q.eq("step.inProgress", true).eq("workflowId", workflowId), + ) + .first(); + if (inProgress) { + return null; + } + try { + await ctx.runMutation( + workflowHandle as FunctionHandle<"mutation">, + { workflowId, generationNumber }, + ); + } catch (e) { + const error = e instanceof Error ? e.message : `Unknown error: ${String(e)}`; + console.error(`Error running workflow ${workflowId}: ${error}`); + await ctx.db.patch(workflowId, { + runResult: { kind: "failed", error }, + }); + } + return null; + }, +}); + +// Batched replay: process multiple candidates in a single mutation call. +// Reduces mutation count from N to 1, avoiding "too many concurrent commits". +export const replayBatchIfReady = mutation({ + args: { candidates: v.array(replayCandidate) }, + returns: v.null(), + handler: async (ctx, { candidates }) => { + const console = createLogger(DEFAULT_LOG_LEVEL); + for (const { workflowId, generationNumber, workflowHandle } of candidates) { + const workflow = await ctx.db.get(workflowId); + if (!workflow || workflow.runResult || workflow.generationNumber !== generationNumber) { + continue; + } + const inProgress = await ctx.db + .query("steps") + .withIndex("inProgress", (q) => + q.eq("step.inProgress", true).eq("workflowId", workflowId), + ) + .first(); + if (inProgress) { + continue; + } + try { + await ctx.runMutation( + workflowHandle as FunctionHandle<"mutation">, + { workflowId, generationNumber }, + ); + } catch (e) { + const error = e instanceof Error ? e.message : `Unknown error: ${String(e)}`; + console.error(`Error running workflow ${workflowId}: ${error}`); + await ctx.db.patch(workflowId, { + runResult: { kind: "failed", error }, + }); + } + } + return null; + }, +}); + +export const diagnose = query({ + args: { + numShards: v.optional(v.number()), + }, + returns: v.object({ + taskQueueCounts: v.array(v.object({ shard: v.number(), count: v.number() })), + totalTasks: v.number(), + executorEpoch: v.number(), + }), + handler: async (ctx, { numShards }) => { + const shards = numShards ?? 40; + // Use indexed per-shard queries instead of table scan. + const taskQueueCounts: Array<{ shard: number; count: number }> = []; + let totalTasks = 0; + for (let s = 0; s < shards; s++) { + let count = 0; + for await (const _task of ctx.db + .query("taskQueue") + .withIndex("by_shard", (q) => q.eq("shard", s))) { + count++; + } + if (count > 0) { + taskQueueCounts.push({ shard: s, count }); + totalTasks += count; + } + } + + const existing = await ctx.db.query("executorEpoch").first(); + const executorEpoch = existing?.epoch ?? 0; + + return { taskQueueCounts, totalTasks, executorEpoch }; + }, +}); + +export const handoff = mutation({ + args: { + shard: v.number(), + action: v.union( + v.literal("init"), + v.literal("ready"), + v.literal("yielded"), + v.literal("clear"), + ), + }, + returns: v.null(), + handler: async (ctx, { shard, action }) => { + const existing = await ctx.db + .query("executorHandoff") + .withIndex("by_shard", (q) => q.eq("shard", shard)) + .unique(); + + switch (action) { + case "init": { + // Old executor creates the handoff doc (delete stale first). + if (existing) { + await ctx.db.delete(existing._id); + } + await ctx.db.insert("executorHandoff", { + shard, + ready: false, + yielded: false, + }); + break; + } + case "ready": { + // New executor signals it's warmed up. + if (existing) { + await ctx.db.patch(existing._id, { ready: true }); + } + break; + } + case "yielded": { + // Old executor confirms it has stopped claiming. + if (existing) { + await ctx.db.patch(existing._id, { yielded: true }); + } + break; + } + case "clear": { + // New executor cleans up after handoff completes. + if (existing) { + await ctx.db.delete(existing._id); + } + break; + } + } + return null; + }, +}); + +export const getHandoff = query({ + args: { + shard: v.number(), + }, + returns: v.union( + v.object({ ready: v.boolean(), yielded: v.boolean() }), + v.null(), + ), + handler: async (ctx, { shard }) => { + const existing = await ctx.db + .query("executorHandoff") + .withIndex("by_shard", (q) => q.eq("shard", shard)) + .unique(); + if (!existing) return null; + return { ready: existing.ready, yielded: existing.yielded }; + }, +}); + +// eslint-disable-next-line @typescript-eslint/no-unused-vars +const console = "THIS IS A REMINDER TO USE getDefaultLogger"; diff --git a/src/component/workflow.ts b/src/component/workflow.ts index 0f29818d..31291155 100644 --- a/src/component/workflow.ts +++ b/src/component/workflow.ts @@ -7,7 +7,7 @@ import { } from "convex/server"; import { type Infer, v } from "convex/values"; import { mutation, type MutationCtx, query } from "./_generated/server.js"; -import { type Logger, logLevel } from "./logging.js"; +import { logLevel } from "./logging.js"; import { getWorkflow } from "./model.js"; import { getWorkpool } from "./pool.js"; import schema, { @@ -17,6 +17,7 @@ import schema, { type JournalEntry, } from "./schema.js"; import { getDefaultLogger } from "./utils.js"; +import { ensureCoordinatorRunning } from "./coordinator.js"; import { type WorkflowId, type OnCompleteArgs, @@ -28,7 +29,7 @@ import { type PublicWorkflow, vPublicWorkflow, } from "../types.js"; -import { api, internal } from "./_generated/api.js"; +import { api } from "./_generated/api.js"; import { formatErrorWithStack } from "../shared.js"; import type { Doc, Id } from "./_generated/dataModel.js"; import { paginator } from "convex-helpers/server/pagination"; @@ -40,6 +41,7 @@ const createArgs = v.object({ maxParallelism: v.optional(v.number()), onComplete: v.optional(vOnComplete), startAsync: v.optional(v.boolean()), + executorShards: v.optional(v.number()), // TODO: ttl }); export const create = mutation({ @@ -51,37 +53,31 @@ export const create = mutation({ export async function createHandler( ctx: MutationCtx, args: Infer, - schedulerOptions?: SchedulerOptions, + _schedulerOptions?: SchedulerOptions, ) { const console = await getDefaultLogger(ctx); - await updateMaxParallelism(ctx, console, args.maxParallelism); const workflowId = await ctx.db.insert("workflows", { name: args.workflowName, workflowHandle: args.workflowHandle, args: args.workflowArgs, generationNumber: 0, onComplete: args.onComplete, + readyToRun: args.startAsync && !args.executorShards ? true : undefined, + executorShards: args.executorShards, }); console.debug( `Created workflow ${workflowId}:`, args.workflowArgs, args.workflowHandle, ); - if (args.startAsync) { - const workpool = await getWorkpool(ctx, args); - await workpool.enqueueMutation( - ctx, - args.workflowHandle as FunctionHandle<"mutation">, - { workflowId, generationNumber: 0 }, - { - name: args.workflowName, - onComplete: internal.pool.handlerOnComplete, - context: { workflowId, generationNumber: 0 }, - ...schedulerOptions, - }, - ); + if (args.startAsync && !args.executorShards) { + await ensureCoordinatorRunning(ctx); } else { - // If we can't start it, may as well not create it, eh? Fail fast... + // For executor-mode workflows, always run inline even with startAsync — + // the first replay just inserts a task into the sharded queue, which is + // fast and avoids the coordinator batch-scheduling bottleneck. + // For non-executor workflows without startAsync, this is the existing + // fail-fast path. await ctx.runMutation(args.workflowHandle as FunctionHandle<"mutation">, { workflowId, generationNumber: 0, @@ -197,6 +193,186 @@ export const listByName = query({ }, }); +// Paginated count — each call reads one page within the 16MB read limit. +// Returns partial counts + a cursor. Call in a loop from an action to +// count across arbitrarily many workflows. +export const countByNamePage = query({ + args: { + name: v.string(), + createdAfter: v.optional(v.number()), + paginationOpts: paginationOptsValidator, + }, + returns: v.object({ + completed: v.number(), + failed: v.number(), + running: v.number(), + continueCursor: v.string(), + isDone: v.boolean(), + }), + handler: async (ctx, { name, createdAfter, paginationOpts }) => { + const result = await paginator(ctx.db, schema) + .query("workflows") + .withIndex("name", (q) => q.eq("name", name)) + .order("desc") + .paginate(paginationOpts); + + let completed = 0; + let failed = 0; + let running = 0; + const hitOld = createdAfter + ? result.page.some((wf) => wf._creationTime < createdAfter) + : false; + for (const wf of result.page) { + if (createdAfter && wf._creationTime < createdAfter) break; + if (!wf.runResult) running++; + else if (wf.runResult.kind === "success") completed++; + else failed++; + } + return { + completed, + failed, + running, + continueCursor: result.continueCursor, + isDone: result.isDone || hitOld, + }; + }, +}); + +// Legacy single-call count — works for small result sets (<~15k workflows). +export const countByName = query({ + args: { + name: v.string(), + createdAfter: v.optional(v.number()), + }, + returns: v.object({ + total: v.number(), + completed: v.number(), + failed: v.number(), + running: v.number(), + }), + handler: async (ctx, { name, createdAfter }) => { + let completed = 0; + let failed = 0; + let running = 0; + for await (const wf of ctx.db + .query("workflows") + .withIndex("name", (q) => q.eq("name", name)) + .order("desc")) { + if (createdAfter && wf._creationTime < createdAfter) break; + if (!wf.runResult) running++; + else if (wf.runResult.kind === "success") completed++; + else failed++; + } + return { total: completed + failed + running, completed, failed, running }; + }, +}); + +export const creationTimeBuckets = query({ + args: { + name: v.string(), + createdAfter: v.number(), + bucketMs: v.number(), + }, + returns: v.array(v.object({ offsetSec: v.number(), count: v.number() })), + handler: async (ctx, { name, createdAfter, bucketMs }) => { + const buckets = new Map(); + for await (const wf of ctx.db + .query("workflows") + .withIndex("name", (q) => q.eq("name", name)) + .order("desc")) { + if (wf._creationTime < createdAfter) break; + const bucket = Math.floor((wf._creationTime - createdAfter) / bucketMs) * (bucketMs / 1000); + buckets.set(bucket, (buckets.get(bucket) || 0) + 1); + } + return [...buckets.entries()] + .map(([offsetSec, count]) => ({ offsetSec, count })) + .sort((a, b) => a.offsetSec - b.offsetSec); + }, +}); + +export const timelinePage = query({ + args: { + name: v.string(), + createdAfter: v.optional(v.number()), + paginationOpts: paginationOptsValidator, + }, + returns: vPaginationResult( + v.object({ + id: v.string(), + createdAt: v.number(), + runResult: v.optional(v.union(v.literal("success"), v.literal("failed"), v.literal("canceled"))), + steps: v.array( + v.object({ + stepNumber: v.number(), + name: v.string(), + startedAt: v.number(), + completedAt: v.optional(v.number()), + executionStartedAt: v.optional(v.number()), + executorFinishedAt: v.optional(v.number()), + flushCalledAt: v.optional(v.number()), + }), + ), + }), + ), + handler: async (ctx, { name, createdAfter, paginationOpts }) => { + // Paginate desc (newest first) so we can stop early at createdAfter + const result = await paginator(ctx.db, schema) + .query("workflows") + .withIndex("name", (q) => q.eq("name", name)) + .order("desc") + .paginate(paginationOpts); + + const filtered = createdAfter + ? result.page.filter((wf) => wf._creationTime >= createdAfter) + : result.page; + + // If any workflow on this page is older than createdAfter, we're done + const hitOld = createdAfter + ? result.page.some((wf) => wf._creationTime < createdAfter) + : false; + + const page = await Promise.all( + filtered.map(async (wf) => { + const stepDocs = await ctx.db + .query("steps") + .withIndex("workflow", (q) => q.eq("workflowId", wf._id)) + .collect(); + return { + id: wf._id, + createdAt: wf._creationTime, + runResult: wf.runResult?.kind as "success" | "failed" | "canceled" | undefined, + steps: stepDocs.map((s) => { + // Extract executionStartedAt from the action's return value if present. + // Actions can include `executorStartedAt` in their result to distinguish + // queue wait time from actual execution time. + const rv = s.step.runResult?.kind === "success" + ? (s.step.runResult.returnValue as Record) + : undefined; + const executionStartedAt = typeof rv?.executorStartedAt === "number" + ? rv.executorStartedAt + : undefined; + return { + stepNumber: s.stepNumber, + name: s.step.name, + startedAt: s.step.startedAt, + completedAt: s.step.completedAt, + executionStartedAt, + executorFinishedAt: s.step.executorFinishedAt, + flushCalledAt: s.step.flushCalledAt, + }; + }), + }; + }), + ); + + return { + ...result, + isDone: result.isDone || hitOld, + page, + } as any; + }, +}); + export const listSteps = query({ args: { workflowId: v.id("workflows"), @@ -281,7 +457,25 @@ export async function completeHandler( for (const { step } of inProgress) { if (!step.kind || step.kind === "function") { if (step.workId) { - await workpool.cancel(ctx, step.workId); + // Executor-managed steps use "executor:" prefix — skip workpool cancel. + if (typeof step.workId === "string" && (step.workId as string).startsWith("executor:")) { + // Clean up the task queue entry if it exists. + const stepId = (step.workId as string).slice("executor:".length); + const normalizedStepId = ctx.db.normalizeId("steps", stepId); + if (normalizedStepId) { + const taskEntry = await ctx.db + .query("taskQueue") + .withIndex("by_stepId", (q) => + q.eq("stepId", normalizedStepId), + ) + .unique(); + if (taskEntry) { + await ctx.db.delete(taskEntry._id); + } + } + } else { + await workpool.cancel(ctx, step.workId); + } } } else if (step.kind === "workflow") { if (step.workflowId) { @@ -358,21 +552,5 @@ export const cleanup = mutation({ }, }); -async function updateMaxParallelism( - ctx: MutationCtx, - console: Logger, - maxParallelism: number | undefined, -) { - const config = await ctx.db.query("config").first(); - if (config) { - if (maxParallelism && maxParallelism !== config.maxParallelism) { - console.warn("Updating max parallelism to", maxParallelism); - await ctx.db.patch(config._id, { maxParallelism }); - } - } else { - await ctx.db.insert("config", { maxParallelism }); - } -} - // eslint-disable-next-line @typescript-eslint/no-unused-vars const console = "THIS IS A REMINDER TO USE getDefaultLogger";