feat: executor mode with query/mutation routing and per-step retries #210

sethconvex wants to merge 19 commits into main
Conversation
📝 Walkthrough

This PR introduces an executor-based batch action system for distributed workflow execution. It adds a coordinator pattern for workflow scheduling, comprehensive task queue infrastructure for executor management, schema extensions for executor tracking, and client APIs for registering batch actions with sharded executor support.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant WorkflowManager
    participant Coordinator
    participant TaskQueue
    participant Executor
    participant Workflow
    Client->>WorkflowManager: register action(name, handler)
    WorkflowManager->>WorkflowManager: store handler in executorActionHandlers
    Client->>WorkflowManager: create(args, executorShards=N)
    WorkflowManager->>Workflow: create workflow with readyToRun=true, executorShards=N
    Coordinator->>Workflow: fetch workflows where readyToRun=true
    Coordinator->>Workflow: claim workflows (set readyToRun=false)
    Coordinator->>Coordinator: batch process runWorkflowBatch
    Coordinator->>Workflow: invoke workflow mutation (workflowHandle)
    Workflow->>TaskQueue: enqueue step as task (shard-distributed)
    Executor->>TaskQueue: claimTasks(shard)
    TaskQueue->>Executor: return task batch (handle, args, stepId)
    Executor->>Executor: execute action via handler
    Executor->>TaskQueue: recordResult(stepId, result)
    TaskQueue->>Workflow: replay workflow if ready
    Workflow->>Workflow: continue execution
```
```mermaid
sequenceDiagram
    participant StepExecution
    participant Journal
    participant TaskQueue as Task Queue
    participant Executor
    participant WorkflowReplay
    StepExecution->>Journal: startSteps(steps, batchActionNames)
    Journal->>Journal: identify batchActionName for action steps
    alt Has batchActionName and executorShards
        Journal->>TaskQueue: insert task (shard, functionType, handle, args, stepId)
        Journal->>Journal: return step with workId=executor:shard:taskId
    else No batchActionName or no executorShards
        Journal->>Journal: direct enqueue via standard path
    end
    Executor->>TaskQueue: claimTasks(shard, limit)
    TaskQueue->>Executor: return task batch
    Executor->>Executor: execute task via registered handler
    Executor->>TaskQueue: recordResult(stepId, result, workflowId)
    TaskQueue->>Journal: update step result
    TaskQueue->>TaskQueue: check if last in-progress step
    alt Last step finished
        TaskQueue->>WorkflowReplay: replayIfReady(workflowId)
    end
```
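Both diagrams route each step to a shard derived deterministically from the workflow ID, so that one executor per shard sees all of a workflow's tasks. A minimal sketch of that idea follows; the FNV-1a hash and modulo step are assumptions, and the component's actual `shardForWorkflow` may hash differently.

```typescript
// Deterministic shard selection: the same workflow ID always maps to the
// same shard index in [0, numShards).
function fnv1a(s: string): number {
  let h = 0x811c9dc5;
  for (let i = 0; i < s.length; i++) {
    h ^= s.charCodeAt(i);
    h = Math.imul(h, 0x01000193); // FNV prime, 32-bit wrap
  }
  return h >>> 0; // force unsigned 32-bit
}

// Hypothetical counterpart of the component's shardForWorkflow(workflowId, executorShards).
function shardForWorkflow(workflowId: string, numShards: number): number {
  return fnv1a(workflowId) % numShards;
}
```

Determinism is the point: no coordination is needed to decide which executor owns a task, since any caller computes the same shard from the same ID.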
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~75 minutes

Suggested reviewers
🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
This stack of pull requests is managed by Graphite.
Actionable comments posted: 8
🧹 Nitpick comments (7)
src/component/workflow.ts (1)
291-368: `timelinePage` fetches all steps per workflow without pagination — potential read amplification.

For each workflow in the paginated result, all steps are fetched via `.collect()` (line 335). If workflows have many steps (e.g., hundreds), this could significantly amplify reads and risk hitting query read limits. Consider adding a `.take(limit)` or documenting the expected step count per workflow.

♻️ Optional: limit steps per workflow

```diff
  const stepDocs = await ctx.db
    .query("steps")
    .withIndex("workflow", (q) => q.eq("workflowId", wf._id))
-   .collect();
+   .take(100); // Limit to prevent excessive reads for workflows with many steps
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/component/workflow.ts` around lines 291 - 368, timelinePage currently loads all steps for each workflow by calling ctx.db.query("steps").withIndex("workflow", (q) => q.eq("workflowId", wf._id)).collect(), which can cause read amplification for workflows with many steps; change the query to limit reads (e.g., use .take(limit) or similar paging on the steps query) and surface the limit via a constant or an arg so callers know the cap, update usage around stepDocs and stepDocs.map(...) accordingly, and add a brief comment noting why steps are capped; ensure you modify the query invocation in timelinePage (the steps lookup) rather than other code paths.

src/component/taskQueue.ts (3)
46-164: Significant code duplication between `recordResult` and `recordResultBatch`.

The step-recording logic (journal lookup, generation check, inProgress check, result recording, task deletion, logging) is nearly identical between `recordResult` (lines 68-130) and `recordResultBatch` (lines 231-294). The main difference is that `recordResult` does inline replay while `recordResultBatch` collects replay candidates. Extract the shared logic into a helper function to reduce duplication and ensure bug fixes apply to both paths.

♻️ Sketch of a shared helper

```typescript
async function recordStepResult(
  ctx: MutationCtx,
  logger: Logger,
  stepId: Id<"steps">,
  result: RunResult,
  generationNumber: number,
): Promise<{
  workflowId: Id<"workflows">;
  workflowHandle: string;
  generationNumber: number;
} | null> {
  // Shared: deleteTask, journal lookup, generation check, inProgress check,
  // result recording, logging, task deletion, runResult check
  // Returns replay candidate or null
}
```

Then `recordResult` calls this + does inline replay, and `recordResultBatch` calls this in a loop + collects candidates.

Also applies to: 213-307
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/component/taskQueue.ts` around lines 46 - 164, The recordResult and recordResultBatch handlers duplicate the same step-recording workflow (journal lookup, generation/inProgress checks, writing runResult, deleteTask, and logging); extract this into a shared async helper (e.g., recordStepResult(ctx: MutationCtx, logger: Logger, stepId: Id<"steps">, result: RunResult, generationNumber: number)) that performs the deleteTask, validates journalEntry and workflow, applies the runResult to journalEntry, emits the console.event, deletes the task, and returns either a replay candidate { workflowId, workflowHandle, generationNumber } or null; then update recordResult to call the helper and perform inline replay when a candidate is returned, and update recordResultBatch to call the helper in a loop and collect candidates for batched replay. Ensure the helper reuses the same symbols: journalEntry, workflow, deleteTask, and preserves existing error logging and ctx.db.replace/patch semantics.
21-44: `claimTasks` is a query (read-only) — tasks are not actually "claimed" (no lock/delete).

The naming `claimTasks` suggests exclusive claiming, but this is a pure read. Multiple executors or concurrent calls will see the same tasks. This is by design (deletion happens in `recordResultBatch`), and the executor's `inFlightStepIds` set prevents local double-processing. However, the name could mislead future maintainers into thinking tasks are atomically claimed.

Consider renaming to `peekTasks` or `fetchTasks`, or adding a comment in the function's docstring clarifying the read-only semantics.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/component/taskQueue.ts` around lines 21 - 44, The function claimTasks is misleading because it's a read-only query (tasks are only removed in recordResultBatch) — rename claimTasks to peekTasks (or fetchTasks) throughout the codebase and update any call sites/tests, or if renaming is undesirable, add a clear docstring/comments on the claimTasks query explaining it is read-only and that exclusive claiming is enforced by inFlightStepIds and recordResultBatch; ensure references to claimTasks (calls, exports, and any API docs) are updated to match the new name or note the semantics.
345-377: `diagnose` iterates all shards with per-shard queries — O(shards × tasks) reads.

For the default 40 shards, this issues 40 indexed queries and counts results by iterating. For diagnostic purposes this is acceptable, but be aware this could hit read limits if shards have many tasks. Consider adding a `take` limit per shard or documenting the expected usage pattern (e.g., "call from dashboard, not from hot paths").

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/component/taskQueue.ts` around lines 345 - 377, The diagnose handler does O(shards × tasks) reads by iterating every task per shard; add a per-shard cap or document intended use. Modify the query handler (diagnose) to accept an optional perShardLimit (e.g., add numShards: v.optional(v.number()) and perShardLimit: v.optional(v.number()) to args) and in the shard loop apply that cap (use the DB query's pagination/.take(perShardLimit) or break the for-await loop after reaching the limit) so each shard issues at most that many reads; alternatively, if you prefer not to change the signature, add a clear comment/docstring above diagnose explaining this is a dashboard-only diagnostic and may hit read limits and suggest a safe per-shard limit when calling it. Ensure you reference the existing symbols diagnose, numShards, shards, taskQueueCounts and the per-shard loop when making the change.

src/client/index.ts (3)
445-465: Dropping pending results after MAX_FLUSH_RETRIES is acceptable but warrants a log.

Line 459 sets `pendingResults.length = 0`, which discards unrecorded results. The comment says tasks will be re-processed by the next executor, which is correct since failed `recordResultBatch` calls mean the tasks remain in the queue. However, the work already done by this executor for those tasks is wasted, and there's no log to indicate this happened.

♻️ Log dropped results

```diff
  if (retries >= MAX_FLUSH_RETRIES) {
    // Give up on remaining items — tasks stay in queue,
    // next executor run will re-process them.
+   // eslint-disable-next-line no-console
+   console.error(`Dropping ${pendingResults.length} unflushed results after ${MAX_FLUSH_RETRIES} retries`);
    pendingResults.length = 0;
    break;
  }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/client/index.ts` around lines 445 - 465, In waitUntilIdle, when retries reach MAX_FLUSH_RETRIES and the code sets pendingResults.length = 0, add a log entry so dropped/unrecorded results are visible; call the existing logger (e.g., processLogger.warn or client logger used in this module) and include context: number of dropped items (pendingResults.length before clearing), the retry count (MAX_FLUSH_RETRIES), and that recordResultBatch/flush failed so these results will be reprocessed later; keep behavior the same otherwise.
6-7: Placeholder type bypasses all type safety.

`type BatchWorkpool = any` effectively disables type checking for all `this.batch` usage. Consider using a minimal structural interface instead.

♻️ Suggested minimal interface

```diff
-// BatchWorkpool not yet exported from `@convex-dev/workpool`@0.3.1
-type BatchWorkpool = any;
+// Minimal interface until BatchWorkpool is exported from `@convex-dev/workpool`
+type BatchWorkpool = {
+  action(name: string, opts: { args: Record<string, any>; handler: (...args: any[]) => any }): any;
+  enqueueByHandle(ctx: any, name: string, args: any, opts: any): Promise<any>;
+};
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/client/index.ts` around lines 6 - 7, Replace the unsafe placeholder "type BatchWorkpool = any" with a minimal structural interface named BatchWorkpool that declares only the properties and methods actually accessed on this.batch in this file (e.g., the method names, signatures and any used properties); update usages to rely on that interface and mark members optional if uncertain, and add a TODO to switch to the real exported type from `@convex-dev/workpool` once available. This preserves type safety while remaining compatible until the package exports the concrete type.
300-314: Flush error recovery unshifts batch but doesn't rethrow — later `pendingResults.length > 0` checks may spin silently.

When `recordResultBatch` fails (line 312), the batch is put back and `flush` returns. The `flushLoop` (line 358) will retry on the next interval. However, if the mutation keeps failing (e.g., due to a persistent OCC conflict or a deployment issue), the flush loop will spin every 500ms indefinitely, silently retrying the same failing batch. This could contribute to action timeout without any diagnostic logging.

♻️ Add logging on flush failure

```diff
  } catch {
+   // Log so persistent failures are visible in action logs.
+   // eslint-disable-next-line no-console
+   console.warn(`recordResultBatch failed, ${batch.length} items re-queued for retry`);
    pendingResults.unshift(...batch);
    return;
  }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/client/index.ts` around lines 300 - 314, The flush function swallows errors from ctx.runMutation and simply requeues the batch then returns, which causes silent tight retries; modify the catch in flush (the catch around ctx.runMutation of component.taskQueue.recordResultBatch) to catch the error as a variable (e.g., catch (err)), log a descriptive error with context (include batch size, first item ids, FLUSH_BATCH_SIZE and any request identifiers) using the module's existing logger (e.g., ctx.logger or processLogger), requeue the batch as currently done, and then rethrow the error (or return an error sentinel) so the caller/flushLoop can apply backoff/stop and surface diagnostics instead of spinning silently.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@package.json`:
- Around line 66-67: The package.json currently lists the dependency
"@anthropic-ai/sdk" which is not used anywhere; either remove the
"@anthropic-ai/sdk" entry from package.json and regenerate your lockfile (run
npm install / yarn install) to keep deps in sync, or if you intend to keep it,
add a brief comment in the repo README or a deps.md explaining its intended
future purpose and where it will be used; locate the dependency by name
("@anthropic-ai/sdk") in package.json and act accordingly.
In `@src/client/index.ts`:
- Around line 372-417: The loop can skip entirely when task.retry?.maxAttempts
is 0 leaving result undefined and pushing result! into pendingResults; in
processTask ensure maxAttempts cannot be zero (e.g., compute an
effectiveMaxAttempts = Math.max(1, task.retry?.maxAttempts ?? 1)) or immediately
set result to a failed RunResult when maxAttempts === 0 (with a clear error
message referencing task.handle/stepId), then use that result when pushing into
pendingResults; update references to maxAttempts in the retry loop to use the
effective value so result is always defined before pendingResults.push.
- Around line 293-355: The flush function currently gives up silently if all 3
retry attempts of ctx.runMutation(component.taskQueue.replayIfReady, ...) fail,
so add an error log and (preferably) re-enqueue the candidate for later retry:
inside the final for (const c of failed) retry loop in flush (the replayIfReady
retry section), after the retry loop exits without success, call a logging
method (e.g., console.error or ctx.logger.error) that includes c.workflowId,
c.generationNumber and c.workflowHandle and then enqueue the candidate into a
durable retry mechanism (e.g., invoke a new or existing mutation like
component.taskQueue.enqueueReplay or push to a persistent retry store) so the
replay won’t be dropped permanently; ensure inFlightStepIds/pendingResults
handling remains unchanged and the log/requeue is placed where a replay has
definitively failed.
In `@src/client/step.ts`:
- Around line 199-202: The regex used to split fnName unnecessarily escapes the
forward slash; update the split call in the safeFunctionName handling
(variables: fnName, parts, baseName, this.batchActionNames) to use /[:/]/
instead of /[:\/]/ so it satisfies ESLint no-useless-escape (i.e., change
fnName.split(/[:\/]/) to fnName.split(/[:/]/)).
In `@src/component/journal.ts`:
- Around line 185-220: The executor-mode taskQueue inserts currently hard-code
retry: DEFAULT_QM_RETRY and ignore per-step settings; change both query and
mutation branches that call ctx.db.insert("taskQueue", {...}) to compute a retry
value from the step (e.g., use step.retry when present, falling back to
DEFAULT_QM_RETRY) and pass that computed value instead of DEFAULT_QM_RETRY;
update the insert call in the blocks using shardForWorkflow(workflow._id as
string, workflow.executorShards) and ensure the same retry semantics/typing as
the workpool.enqueueQuery/workpool.enqueueMutation path are preserved.
In `@src/component/taskQueue.ts`:
- Line 3: Remove the unused imported symbol MutationCtx from the import
statement that currently reads import { mutation, query, type MutationCtx } from
"./_generated/server.js"; — edit the import to only include the used exports
(mutation and query) so MutationCtx is not imported, ensuring the linter warning
goes away.
In `@src/component/workflow.ts`:
- Around line 454-469: The code assumes ctx.db.normalizeId("steps", stepId)!
always returns a valid id which can be null for malformed stepId; change the
executor branch to first compute const normalizedStepId =
ctx.db.normalizeId("steps", stepId) and if normalizedStepId is falsy, log or
warn (including stepId/workId) and skip the taskQueue lookup, otherwise use
normalizedStepId in ctx.db.query(...).withIndex("by_stepId", q => q.eq("stepId",
normalizedStepId)).unique(); this avoids passing null into the query and
prevents unexpected throws when handling step.workId that
startsWith("executor:").
- Line 10: createHandler and completeHandler currently hardcode
DEFAULT_LOG_LEVEL; change them to use getDefaultLogger(ctx) so they respect
user-configured log levels. Update the imports to include getDefaultLogger from
"./logging.js" and replace any createLogger(..., DEFAULT_LOG_LEVEL) usage inside
createHandler and completeHandler with createLogger(..., getDefaultLogger(ctx))
(or fetch the logger once via getDefaultLogger(ctx) and pass it into
createLogger), mirroring how getStatus and cleanup obtain their logger.
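One of the inline fixes above, the `src/client/step.ts` regex change, is purely cosmetic: inside a character class, `/` needs no escaping, so `[:/]` and `[:\/]` match the same characters. A quick standalone check (`splitFunctionName` is an illustrative helper, not the client's actual function):

```typescript
// Splits "dir/file:fn" style function names on ':' or '/'.
// The forward slash needs no escaping inside a character class,
// which is what ESLint's no-useless-escape rule flags.
function splitFunctionName(fnName: string): string[] {
  return fnName.split(/[:/]/);
}
```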
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (4)
- `package-lock.json` is excluded by `!**/package-lock.json`
- `src/component/_generated/api.ts` is excluded by `!**/_generated/**`
- `src/component/_generated/component.ts` is excluded by `!**/_generated/**`
- `src/component/_generated/server.ts` is excluded by `!**/_generated/**`
📒 Files selected for processing (12)
- `package.json`
- `src/client/index.ts`
- `src/client/step.ts`
- `src/client/workflowMutation.ts`
- `src/component/batch.ts`
- `src/component/coordinator.ts`
- `src/component/event.ts`
- `src/component/journal.ts`
- `src/component/pool.ts`
- `src/component/schema.ts`
- `src/component/taskQueue.ts`
- `src/component/workflow.ts`
```json
    "@anthropic-ai/sdk": "^0.75.0",
    "async-channel": "^0.2.0"
```
🧩 Analysis chain
🏁 Script executed:

```shell
curl -s https://registry.npmjs.org/@anthropic-ai/sdk | jq -r '.["dist-tags"].latest, .license'
```

Repository: get-convex/workflow (output length: 73)

🏁 Script executed:

```shell
rg "@anthropic-ai/sdk|from.*anthropic" --type js --type ts --type json -i
```

Repository: get-convex/workflow (output length: 328)

🏁 Script executed:

```shell
# Search for any anthropic references more broadly
rg -i "anthropic" --type js --type ts --type jsx --type tsx
```

Repository: get-convex/workflow (output length: 90)

🏁 Script executed:

```shell
# Check if it's listed as a dependency elsewhere (transitive or direct)
grep -r "anthropic" . --include="*.json" --include="*.js" --include="*.ts" --exclude-dir=node_modules --exclude-dir=.git
```

Repository: get-convex/workflow (output length: 394)

🏁 Script executed:

```shell
# View the package.json to understand project purpose
cat package.json | head -40
```

Repository: get-convex/workflow (output length: 1907)

🏁 Script executed:

```shell
# Check the project structure
git ls-files | head -30
```

Repository: get-convex/workflow (output length: 806)

🏁 Script executed:

```shell
# Look for any files that might use anthropic
find . -name "*.md" -o -name "*.js" -o -name "*.ts" | grep -v node_modules | head -20
```

Repository: get-convex/workflow (output length: 666)

🏁 Script executed:

```shell
cat ./example/convex/transcription.ts
```

Repository: get-convex/workflow (output length: 2738)

🏁 Script executed:

```shell
# Check the README to see if anthropic is documented as a dependency
cat README.md | grep -A 5 -B 5 -i anthropic
```

Repository: get-convex/workflow (output length: 45)

🏁 Script executed:

```shell
# Search in all files including test files
find . -type f \( -name "*.ts" -o -name "*.js" \) ! -path "*/node_modules/*" ! -path "*/.git/*" -exec grep -l "anthropic" {} \;
```

Repository: get-convex/workflow (output length: 45)

🏁 Script executed:

```shell
# Check if it's mentioned in documentation
cat README.md
```

Repository: get-convex/workflow (output length: 20472)

🏁 Script executed:

```shell
# Search for require() patterns with anthropic
rg "require.*anthropic|dynamic.*anthropic|lazy.*anthropic" --type ts --type js
```

Repository: get-convex/workflow (output length: 45)

🏁 Script executed:

```shell
# Check if it's used indirectly (transitive dependency check)
cat package.json | grep -A 20 '"dependencies"'
```

Repository: get-convex/workflow (output length: 705)
Remove unused @anthropic-ai/sdk dependency or clarify its purpose.
This dependency is not imported or used anywhere in the codebase. The example transcription workflow uses the OpenAI SDK instead. If this is needed for a future feature or external use case, document it; otherwise, remove it.
Fixed in a229c69 — removed the unused @anthropic-ai/sdk dependency.
Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!
Add executor mode: long-running actions poll a sharded task queue, replacing per-step workpool scheduling for high-throughput workflows.

- Route queries, mutations, and actions through the task queue when executorShards is set (deterministic shard by workflow ID)
- Per-step retry with exponential backoff (queries/mutations: 4 attempts by default; actions: opt-in)
- Batched result recording (50 results per mutation)
- Zero-gap executor handoff before the 10-min action timeout
- Epoch-based lifecycle for safe restarts via startExecutors

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
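The per-step exponential backoff described in the commit message can be illustrated with a small schedule helper. The 250 ms base and doubling factor below are assumed values for the sketch, not the component's actual defaults:

```typescript
// Computes the wait before each retry: no delay precedes the first
// attempt, so maxAttempts attempts imply maxAttempts - 1 waits, each
// `base` times longer than the last.
function backoffDelaysMs(
  maxAttempts: number,
  initialBackoffMs: number,
  base = 2,
): number[] {
  const delays: number[] = [];
  for (let attempt = 0; attempt < maxAttempts - 1; attempt++) {
    delays.push(initialBackoffMs * Math.pow(base, attempt));
  }
  return delays;
}
```

With the default-style 4 attempts and a 250 ms base, the waits would be 250, 500, and 1000 ms between attempts.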
91a3b55 to ff68a69 (Compare)
BatchWorkpool was an alternative batch execution approach that was never released. The executor mode supersedes it entirely, so remove all references to simplify the codebase.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Remove unused @anthropic-ai/sdk dependency from package.json
- Use getDefaultLogger(ctx) in createHandler and completeHandler to respect user-configured log levels instead of hardcoding DEFAULT_LOG_LEVEL
- Null-check normalizeId result in executor cancel path to avoid passing null into the taskQueue index query

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Actionable comments posted: 7
♻️ Duplicate comments (6)
package.json (1)
66-67: ⚠️ Potential issue | 🟠 Major

Unused `@anthropic-ai/sdk` dependency inflates install footprint for all consumers.

This is listed under `dependencies` (not `devDependencies`), so every consumer of `@convex-dev/workflow` will transitively install the Anthropic SDK. No imports or references to this package exist in the codebase. Either remove it or move it to `devDependencies` if it's only needed for examples/tests.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@package.json` around lines 66 - 67, Remove the unused runtime dependency "@anthropic-ai/sdk" from the package.json dependencies block (or move it to devDependencies if used only for tests/examples); search the repo for any imports/usages of "@anthropic-ai/sdk" to confirm it's unused, update package.json accordingly (transfer the entry to devDependencies if needed), then run an install and update lockfile (npm/yarn/pnpm) so consumers won't install the unused SDK transitively; ensure package.json's "dependencies" no longer contains "@anthropic-ai/sdk".

src/component/taskQueue.ts (1)
1-6: Unused `MutationCtx` import has been removed.

The previous review flagged this. The import now correctly includes only `mutation` and `query`.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/component/taskQueue.ts` around lines 1 - 6, The unused import MutationCtx was removed from the import list; ensure there are no remaining references to MutationCtx elsewhere in this file (e.g., in any function signatures or type annotations) and that the file only uses the imported symbols mutation and query; if any remaining references exist replace them with the appropriate context type or remove them, then run a type-check/build to confirm no unused-import or unresolved-type errors remain.

src/component/workflow.ts (2)
452-464: `normalizeId` with `!` assertion can pass `null` on malformed `stepId`.

This was flagged previously. If `stepId` (extracted from the `"executor:"` prefix at line 455) is not a valid `"steps"` document ID, `normalizeId` returns `null` and the `!` forwards it into the index query, which could produce unexpected behavior.

🛡️ Proposed fix — guard before querying

```diff
  const stepId = (step.workId as string).slice("executor:".length);
- const taskEntry = await ctx.db
-   .query("taskQueue")
-   .withIndex("by_stepId", (q) =>
-     q.eq("stepId", ctx.db.normalizeId("steps", stepId)!),
-   )
-   .unique();
- if (taskEntry) {
-   await ctx.db.delete(taskEntry._id);
- }
+ const normalizedStepId = ctx.db.normalizeId("steps", stepId);
+ if (normalizedStepId) {
+   const taskEntry = await ctx.db
+     .query("taskQueue")
+     .withIndex("by_stepId", (q) =>
+       q.eq("stepId", normalizedStepId),
+     )
+     .unique();
+   if (taskEntry) {
+     await ctx.db.delete(taskEntry._id);
+   }
+ }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/component/workflow.ts` around lines 452 - 464, The code currently calls ctx.db.normalizeId("steps", stepId)! and can pass null into the taskQueue index query; update the executor-prefixed branch (where step.workId and stepId are derived) to first call const normalizedStepId = ctx.db.normalizeId("steps", stepId) and guard it (if falsy, skip the DB query/cleanup), then use normalizedStepId in the .withIndex("by_stepId", q => q.eq("stepId", normalizedStepId)) call and proceed to delete the taskEntry only when found; this prevents passing null into the index query and avoids querying with malformed IDs.
56-58: `createHandler` still uses a hardcoded log level instead of the user-configured one.

This was flagged previously: `createLogger(DEFAULT_LOG_LEVEL)` bypasses the user-configured log level stored in the database. Both `getStatus` (line 97) and `cleanup` (line 522) correctly use `getDefaultLogger(ctx)`. The same should apply here and in `completeHandler` (line 426).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/component/workflow.ts` around lines 56 - 58, The createHandler function (and similarly completeHandler) currently constructs a logger with createLogger(DEFAULT_LOG_LEVEL) which ignores the user-configured log level; replace those calls so they obtain the logger via getDefaultLogger(ctx) instead (e.g., change const console = createLogger(DEFAULT_LOG_LEVEL) to const console = getDefaultLogger(ctx)), ensuring ctx is passed into the function scope where used; update both createHandler and completeHandler to use getDefaultLogger(ctx) consistently like getStatus and cleanup do.
src/client/index.ts (2)
328-332: Past review finding has been addressed — `maxAttempts` can no longer be 0.

`Math.max(task.retry?.maxAttempts ?? 1, 1)` ensures the retry loop always executes at least once, so `result` is always defined before the push on line 372.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/client/index.ts` around lines 328 - 332, The Math.max(task.retry?.maxAttempts ?? 1, 1) in processTask already guarantees maxAttempts is at least 1 so the retry loop will run and result will be defined before the push; remove any redundant guard or comment that handled a zero maxAttempts case (or mark that previous check as resolved) and keep the current definition of maxAttempts, initialBackoffMs and base in the processTask function.
285-304: Past review finding has been addressed — replay failure is now logged.

The previous review flagged that silently dropping replays after 3 retries could leave workflows stuck. The `replaySucceeded` flag and `console.error` at lines 300-303 now surface the failure. Good improvement.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/client/index.ts` around lines 285 - 304, Replace the plain console.error and the silent catch with structured logging and error capture: inside the retry loop around ctx.runMutation (calling component.taskQueue.replayIfReady) capture the thrown error (e.g., lastError) in the catch block, and after the loop when replaySucceeded is false call the project's logger (not console) — e.g., ctx.logger.error or the available component/process logger — and log a message including c.workflowId and c.generationNumber plus the lastError details so failures are recorded with context.
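The capture-and-log pattern the prompt asks for can be sketched as a small helper. This is an assumption-laden sketch: `replayWithLogging`, the candidate shape, and the injected `replayIfReady` and `log` callbacks are hypothetical stand-ins for the component's actual replay mutation and logger.

```typescript
// Sketch: retry a replay a few times, remember the last error, and
// surface it with workflow context only if every attempt fails.
interface ReplayCandidate {
  workflowId: string;
  generationNumber: number;
}

async function replayWithLogging(
  candidate: ReplayCandidate,
  replayIfReady: (c: ReplayCandidate) => Promise<void>,
  log: (msg: string) => void,
  maxRetries = 3,
): Promise<boolean> {
  let lastError: unknown;
  for (let i = 0; i < maxRetries; i++) {
    try {
      await replayIfReady(candidate);
      return true;
    } catch (e) {
      lastError = e; // keep the most recent failure for the final log line
    }
  }
  log(
    `replay failed for workflow ${candidate.workflowId} ` +
      `(generation ${candidate.generationNumber}): ${String(lastError)}`,
  );
  return false;
}
```

Injecting the logger keeps the helper testable and lets the caller route the message to whatever structured logger the project uses.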
🧹 Nitpick comments (9)
src/component/coordinator.ts (1)
46-62: Coordinator always reschedules itself even when fewer than `COORDINATOR_READ_BATCH` workflows were found.

When `ready.length < COORDINATOR_READ_BATCH`, all ready workflows have been claimed in this pass, so the next invocation will find 0 and exit. This wastes one scheduler invocation per drain cycle. Consider only rescheduling when `ready.length === COORDINATOR_READ_BATCH`.

♻️ Suggested optimization

```diff
- // Reschedule self to pick up more
- await ctx.scheduler.runAfter(0, internal.coordinator.coordinator);
+ // Reschedule self only if there might be more
+ if (ready.length === COORDINATOR_READ_BATCH) {
+   await ctx.scheduler.runAfter(0, internal.coordinator.coordinator);
+ } else {
+   const state = await ctx.db.query("coordinatorState").first();
+   if (state) {
+     await ctx.db.patch(state._id, { scheduled: false });
+   }
+ }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/component/coordinator.ts` around lines 46 - 62, The coordinator currently always reschedules itself at the end of the loop (the call to ctx.scheduler.runAfter(0, internal.coordinator.coordinator)), which wastes one scheduler invocation when fewer than COORDINATOR_READ_BATCH workflows were returned; change the logic in the coordinator function to only call ctx.scheduler.runAfter(..., internal.coordinator.coordinator) when ready.length === COORDINATOR_READ_BATCH (i.e., a full batch was claimed), leaving out the reschedule when fewer items were found so the scheduler isn't invoked unnecessarily.
src/component/pool.ts (1)
291-321: Duplicated error-handling pattern — consider extracting a shared helper.

The try/catch + `ctx.db.patch(workflowId, { runResult: { kind: "failed", error } })` pattern is repeated in `onCompleteHandler` (line 206), `directRunWorkflow` (line 312), and `runWorkflowBatch` in `coordinator.ts` (line 92). A small `runWorkflowSafe(ctx, workflow, generationNumber)` helper would reduce duplication.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/component/pool.ts` around lines 291 - 321, Extract the repeated try/catch + db.patch failure-handling into a new helper function (e.g., runWorkflowSafe) and replace the duplicated blocks in directRunWorkflow, onCompleteHandler, and runWorkflowBatch to call it; runWorkflowSafe(ctx, workflow, generationNumber) should invoke ctx.runMutation(workflow.workflowHandle as FunctionHandle<"mutation">, { workflowId: workflow.id, generationNumber }), catch any error, normalize the message (e instanceof Error ? e.message : String(e)), log it (reuse createLogger(DEFAULT_LOG_LEVEL) or accept a logger param), and persist the failure with ctx.db.patch(workflow.id, { runResult: { kind: "failed", error } }); ensure the helper returns success/failure so callers can act accordingly.
src/component/taskQueue.ts (2)
346-378: `diagnose` defaults to 40 shards — should match the actual configured shard count.

Line 356 uses `numShards ?? 40` as the default. If the actual deployment uses a different shard count, this diagnostic will either miss shards or scan non-existent ones. Consider requiring the argument or reading the shard count from configuration.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/component/taskQueue.ts` around lines 346 - 378, The diagnose handler uses a hardcoded default shards = numShards ?? 40 which can mismatch deployment; update diagnose (handler) to not assume 40 — either make numShards a required arg or read the real shard count from the application's configuration/source (e.g., use the configured shardCount value from ctx.settings or config service) and assign shards = numShards ?? config.shardCount, and remove the magic number 40; ensure the function names/variables referenced are diagnose, numShards, and shards so reviewers can locate and verify the change.
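The argument-or-config fallback can be sketched as a small pure helper. `resolveShardCount` and `QueueConfig` are hypothetical names for illustration, not part of the component's API.

```typescript
// Sketch: prefer an explicit argument, fall back to configured shardCount,
// and reject the call outright if neither yields a valid count.
interface QueueConfig {
  shardCount: number;
}

function resolveShardCount(
  numShards: number | undefined,
  config: QueueConfig | null,
): number {
  const shards = numShards ?? config?.shardCount;
  if (shards === undefined || !Number.isInteger(shards) || shards <= 0) {
    throw new Error(
      "shard count must be a positive integer (pass numShards or configure shardCount)",
    );
  }
  return shards;
}
```

Centralizing this check also removes the magic `40` and gives `startExecutors`-style validation (reject zero, negatives, and non-integers) a single home.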
296-303: Replay candidates intentionally skip the in-progress step check — document this for maintainers.

Line 297 only checks `workflow.runResult === undefined` but doesn't verify whether other in-progress steps remain. This is correct by design (the check is deferred to `replayIfReady` to minimize the OCC conflict surface of this batch mutation), but a brief inline comment explaining why would help future maintainers avoid "fixing" it.

📝 Suggested comment

```diff
  if (workflow.runResult === undefined) {
+   // Don't check inProgress here — that's done in replayIfReady
+   // to keep this batch mutation's read set small (less OCC conflict).
    candidates.set(workflowId, {
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/component/taskQueue.ts` around lines 296 - 303, Add a brief inline comment next to the condition that populates candidates (the block checking workflow.runResult === undefined and setting candidates.set(workflowId, { workflowId, workflowHandle: workflow.workflowHandle, generationNumber: workflow.generationNumber })) explaining that we intentionally do not check for other in-progress steps here because that verification is deferred to replayIfReady to minimize the OCC/conflict surface for this batch mutation; reference replayIfReady and the candidates map in the comment so future maintainers understand the design choice and where the in-progress-step check occurs.
src/client/index.ts (3)
133-163: `action()` stores handlers by name but doesn't prevent duplicates.

Calling `action("myAction", ...)` twice with the same name silently overwrites the first handler in `executorActionHandlers` and is a no-op in `batchActionNames` (a `Set`). Consider throwing on duplicate names to catch copy-paste errors early.

🔧 Proposed guard

```diff
- this.executorActionHandlers.set(name, opts.handler);
+ if (this.executorActionHandlers.has(name)) {
+   throw new Error(`Duplicate batch action name: "${name}"`);
+ }
+ this.executorActionHandlers.set(name, opts.handler);
  this.batchActionNames.add(name);
```

(Move the `has` check before the `set`.)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/client/index.ts` around lines 133 - 163, The action() method in WorkflowManager currently overwrites existing handlers when called with the same name; add a duplicate-name guard by checking executorActionHandlers.has(name) (or batchActionNames.has(name)) before calling executorActionHandlers.set and batchActionNames.add, and throw a clear Error (e.g., "action 'X' already registered") to prevent silent overwrites; keep the rest of the logic (storing handler and returning internalActionGeneric) unchanged.
461-462: Handoff cleanup errors are silently swallowed, potentially leaving stale handoff docs.

The `.catch(() => {})` on the handoff cleanup promise means if the `"clear"` mutation fails, the `executorHandoff` document for this shard will persist indefinitely. The next executor will handle it (the `getHandoff` / cleanup logic at line 434 covers stale docs), so it's not a hard bug, but a log would help diagnose slow handoffs.

🔧 Suggested improvement

```diff
- })().catch(() => {});
+ })().catch((e) => {
+   console.error(`Handoff cleanup failed for shard ${shard}:`, e);
+ });
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/client/index.ts` around lines 461 - 462, The silent .catch on the handoff cleanup IIFE swallows errors when performing the "clear" mutation for the executorHandoff shard; change the catch to log the error so failed cleanup is visible (use the same logger used elsewhere, e.g., processLogger or a relevant logger in this module) and include the shard/executorHandoff id and error details in the message; look for the anonymous IIFE that performs the "clear" mutation for executorHandoff and the related getHandoff cleanup logic to add the logged .catch.
197-219: Epoch check with `epoch === undefined` allows stale executors to run indefinitely.

When `executor()` is called without an `epoch` argument (line 200: `epoch?: number`), `epoch` is `undefined`. In `checkEpoch` (line 218), `undefined === currentEpoch` will always be `false` when `currentEpoch !== 0`, so the executor would terminate after the first epoch check. But when `currentEpoch === 0` (no epoch row exists), `checkEpoch` returns `true` (line 217), bypassing the check entirely.

The implicit contract seems to be that executors launched without an explicit epoch are "legacy" and should run freely when no epoch is tracked. However, once `startExecutors` is called and sets epoch ≥ 1, these legacy executors will be correctly stopped. This is fine as long as the intent is clear — consider documenting this edge case or defaulting `epoch` to `0` in the args destructuring to make the behavior explicit.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/client/index.ts` around lines 197 - 219, The epoch argument can be undefined and that makes the intent unclear; default it to 0 so legacy executors are explicit. In the internalActionGeneric handler (the handler in internalActionGeneric), change the args destructuring to set epoch = 0 (e.g., const { shard, epoch = 0 } = args) so checkEpoch sees 0 for legacy runs and behavior is explicit; keep the checkEpoch function as-is (it will treat currentEpoch === 0 as allowing legacy executors).
src/component/workflow.ts (2)
360-365: `as any` cast on the return bypasses the return validator.

The `page` array structure is different from what `paginator` originally returned, so the generic type doesn't match. The `as any` cast at line 364 silences TypeScript but means the return validator is the only runtime safety net. The validator looks correct, so this works in practice.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/component/workflow.ts` around lines 360 - 365, The return uses an unsafe "as any" cast in the function that returns {...result, isDone: result.isDone || hitOld, page} which hides a type mismatch between the current page array shape and the paginator's generic; remove the "as any" cast and fix the types: either change the function's return generic (or the paginator's generic) to match the new page element type, or map/transform the page array into the original expected shape before returning, and update the function signature (and any related type/interface) so TypeScript enforces the validator at compile time (refer to the return expression and the "page" property and adjust the surrounding function/method signature and paginator generic accordingly).
328-358: `timelinePage` performs N+1 step queries per workflow on the page.

Each workflow in the filtered page triggers a separate `ctx.db.query("steps")` call (lines 330-333). With the default pagination size, this could mean dozens of sub-queries per page. The `Promise.all` parallelism helps, but in Convex queries are deterministic and execute sequentially within the transaction — `Promise.all` doesn't parallelize database reads here.

This is acceptable for small page sizes but worth noting as a performance concern if pages grow.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/component/workflow.ts` around lines 328 - 358, The current page build does an N+1 query by calling ctx.db.query("steps").withIndex("workflow", q => q.eq("workflowId", wf._id)) for each workflow inside the Promise.all; instead, collect all workflow IDs from filtered, perform a single ctx.db.query("steps") with an index filter that selects all workflowIds (use the index with an "in"/batch filter) to fetch all step documents at once, then group the returned step rows by workflowId and map those grouped results to each workflow when constructing the page (update the code that builds page, stepDocs, and the step mapping to use the grouped map rather than per-workflow queries).
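The in-memory grouping half of this suggestion can be sketched independently of the database layer. The row shapes below are simplified stand-ins for the actual documents, and note one caveat: Convex index queries do not take an `in` filter directly, so the single fetch itself would still need a suitable index range or per-batch strategy.

```typescript
// Sketch: instead of one "steps" query per workflow, fetch the relevant
// step rows once and group them in memory by workflowId.
interface StepRow {
  workflowId: string;
  name: string;
}

function groupStepsByWorkflow(steps: StepRow[]): Map<string, StepRow[]> {
  const byWorkflow = new Map<string, StepRow[]>();
  for (const step of steps) {
    const group = byWorkflow.get(step.workflowId);
    if (group) {
      group.push(step);
    } else {
      byWorkflow.set(step.workflowId, [step]);
    }
  }
  return byWorkflow;
}

// Building the page then becomes a Map lookup instead of a query:
function attachSteps<W extends { _id: string }>(
  workflows: W[],
  byWorkflow: Map<string, StepRow[]>,
): (W & { steps: StepRow[] })[] {
  return workflows.map((wf) => ({
    ...wf,
    steps: byWorkflow.get(wf._id) ?? [],
  }));
}
```

This turns the per-page cost into one fetch plus O(steps) grouping, regardless of how many workflows are on the page.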
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/client/index.ts`:
- Around line 407-420: The loop that gives up after reaching MAX_FLUSH_RETRIES
silently discards pendingResults by setting pendingResults.length = 0; before
clearing them, emit a warning log that includes the number of dropped items
(pendingResults.length), the retries count (retries) and that MAX_FLUSH_RETRIES
was hit; implement the log using the module's existing logger (e.g., logger or
processLogger) or console.warn if none is available, then clear pendingResults
as before to preserve behavior in the flush() / retries loop.
In `@src/client/step.ts`:
- Around line 191-205: The current base-name-only matching extracts only the
last segment from safeFunctionName(target.function) and may collide across
modules; update the logic in the block that references safeFunctionName,
batchActionNames, and batchActionName so it matches against the full
safeFunctionName (or otherwise include module/path namespace) instead of just
baseName, or alternatively convert/expect batchActionNames to contain
fully-qualified names and compare against fnName directly; if you choose to keep
base-name matching, add a brief doc comment above this block documenting the
uniqueness requirement for action names.
In `@src/component/coordinator.ts`:
- Around line 65-103: The runWorkflowBatch handler currently calls
ctx.runMutation(workflowHandle, ...) for each item inside the same transaction
(runWorkflowBatch), which risks rolling back the whole batch on transaction-size
or OCC failures; change the loop so each workflow is scheduled as a separate
(isolated) mutation using ctx.scheduler.runAfter(0, ...) rather than invoking
ctx.runMutation inline — mirror the approach used by enqueueWorkflow in pool.ts:
for each {workflowId, generationNumber, workflowHandle} call
ctx.scheduler.runAfter(0, { mutation: workflowHandle, args: { workflowId,
generationNumber } }) (or schedule a small wrapper mutation) and remove the
direct ctx.runMutation calls so each workflow replay runs in its own
transaction.
In `@src/component/journal.ts`:
- Around line 240-252: The executor-sharded branch (inside the if checking
stepArgs.batchActionName && workflow.executorShards) currently inserts an action
task without any retry configuration, ignoring any user-provided stepArgs.retry;
update the insert call (the object passed to ctx.db.insert in this branch) to
include a retry field when the user explicitly provides one (e.g., use
stepArgs.retry if present) while leaving retry absent otherwise to preserve the
"no retry unless opted-in" behavior; ensure this mirrors how query/mutation
paths use retryConfig and that stepArgs.retry is respected before assigning
workId = `executor:${stepId}`.
In `@src/component/taskQueue.ts`:
- Around line 47-165: recordResult and recordResultBatch duplicate the same
journal lookup, validation, update, event logging and task deletion logic;
extract that shared flow into a helper (e.g., processStepResult) that takes
(ctx, console, stepId, result, generationNumber) and performs: journal/ workflow
lookup and validation, mark step.inProgress=false/ completedAt, set
step.runResult, replace the journal entry, emit the console.event, and delete
the queue task (using the existing deleteTask logic), returning the workflow id
and workflowHandle (or null on early return). Replace the duplicated blocks in
recordResult (handler) and recordResultBatch with calls to processStepResult;
keep the replay logic (ctx.runMutation and error patching) in recordResult and
have recordResultBatch loop over results and accumulate candidate workflows to
replay.
- Around line 167-195: startExecutors increments the epoch but doesn't validate
numShards, so numShards <= 0 will increment epoch and schedule no executors; add
validation at the start of the mutation handler to ensure numShards is a
positive integer (e.g., throw a descriptive error or return a failure) before
touching the DB or epoch; refer to the startExecutors handler, the numShards
param, and ctx.scheduler.runAfter/epoch logic so you validate and short-circuit
early (reject non-integers, zero, or negatives) to avoid advancing epoch without
scheduling executors.
In `@src/component/workflow.ts`:
- Around line 266-287: creationTimeBuckets currently iterates all matching
workflows in a single query and can hit Convex read limits; change the handler
in creationTimeBuckets to paginate like countByNamePage: run the same
ctx.db.query("workflows").withIndex("name", q => q.eq("name",
name)).order("desc") but fetch results in fixed-size pages (e.g., pageSize),
loop with the query pagination API (cursor/next) until no more pages or until
wf._creationTime < createdAfter, aggregate counts into the existing buckets Map
per page, then return the sorted entries as before; if you cannot implement
paging now, add a clear comment in the creationTimeBuckets handler referencing
countByNamePage and the 16MB read-limit constraint.
---
Duplicate comments:
In `@package.json`:
- Around line 66-67: Remove the unused runtime dependency "@anthropic-ai/sdk"
from the package.json dependencies block (or move it to devDependencies if used
only for tests/examples); search the repo for any imports/usages of
"@anthropic-ai/sdk" to confirm it's unused, update package.json accordingly
(transfer the entry to devDependencies if needed), then run an install and
update lockfile (npm/yarn/pnpm) so consumers won't install the unused SDK
transitively; ensure package.json's "dependencies" no longer contains
"@anthropic-ai/sdk".
In `@src/client/index.ts`:
- Around line 328-332: The Math.max(task.retry?.maxAttempts ?? 1, 1) in
processTask already guarantees maxAttempts is at least 1 so the retry loop will
run and result will be defined before the push; remove any redundant guard or
comment that handled a zero maxAttempts case (or mark that previous check as
resolved) and keep the current definition of maxAttempts, initialBackoffMs and
base in the processTask function.
- Around line 285-304: Replace the plain console.error and the silent catch with
structured logging and error capture: inside the retry loop around
ctx.runMutation (calling component.taskQueue.replayIfReady) capture the thrown
error (e.g., lastError) in the catch block, and after the loop when
replaySucceeded is false call the project's logger (not console) — e.g.,
ctx.logger.error or the available component/process logger — and log a message
including c.workflowId and c.generationNumber plus the lastError details so
failures are recorded with context.
In `@src/component/taskQueue.ts`:
- Around line 1-6: The unused import MutationCtx was removed from the import
list; ensure there are no remaining references to MutationCtx elsewhere in this
file (e.g., in any function signatures or type annotations) and that the file
only uses the imported symbols mutation and query; if any remaining references
exist replace them with the appropriate context type or remove them, then run a
type-check/build to confirm no unused-import or unresolved-type errors remain.
In `@src/component/workflow.ts`:
- Around line 452-464: The code currently calls ctx.db.normalizeId("steps",
stepId)! and can pass null into the taskQueue index query; update the
executor-prefixed branch (where step.workId and stepId are derived) to first
call const normalizedStepId = ctx.db.normalizeId("steps", stepId) and guard it
(if falsy, skip the DB query/cleanup), then use normalizedStepId in the
.withIndex("by_stepId", q => q.eq("stepId", normalizedStepId)) call and proceed
to delete the taskEntry only when found; this prevents passing null into the
index query and avoids querying with malformed IDs.
- Around line 56-58: The createHandler function (and similarly completeHandler)
currently constructs a logger with createLogger(DEFAULT_LOG_LEVEL) which ignores
the user-configured log level; replace those calls so they obtain the logger via
getDefaultLogger(ctx) instead (e.g., change const console =
createLogger(DEFAULT_LOG_LEVEL) to const console = getDefaultLogger(ctx)),
ensuring ctx is passed into the function scope where used; update both
createHandler and completeHandler to use getDefaultLogger(ctx) consistently like
getStatus and cleanup do.
---
Nitpick comments:
In `@src/client/index.ts`:
- Around line 133-163: The action() method in WorkflowManager currently
overwrites existing handlers when called with the same name; add a
duplicate-name guard by checking executorActionHandlers.has(name) (or
batchActionNames.has(name)) before calling executorActionHandlers.set and
batchActionNames.add, and throw a clear Error (e.g., "action 'X' already
registered") to prevent silent overwrites; keep the rest of the logic (storing
handler and returning internalActionGeneric) unchanged.
- Around line 461-462: The silent .catch on the handoff cleanup IIFE swallows
errors when performing the "clear" mutation for the executorHandoff shard;
change the catch to log the error so failed cleanup is visible (use the same
logger used elsewhere, e.g., processLogger or a relevant logger in this module)
and include the shard/executorHandoff id and error details in the message; look
for the anonymous IIFE that performs the "clear" mutation for executorHandoff
and the related getHandoff cleanup logic to add the logged .catch.
- Around line 197-219: The epoch argument can be undefined and that makes the
intent unclear; default it to 0 so legacy executors are explicit. In the
internalActionGeneric handler (the handler in internalActionGeneric), change the
args destructuring to set epoch = 0 (e.g., const { shard, epoch = 0 } = args) so
checkEpoch sees 0 for legacy runs and behavior is explicit; keep the checkEpoch
function as-is (it will treat currentEpoch === 0 as allowing legacy executors).
In `@src/component/coordinator.ts`:
- Around line 46-62: The coordinator currently always reschedules itself at the
end of the loop (the call to ctx.scheduler.runAfter(0,
internal.coordinator.coordinator)), which wastes one scheduler invocation when
fewer than COORDINATOR_READ_BATCH workflows were returned; change the logic in
the coordinator function to only call ctx.scheduler.runAfter(...,
internal.coordinator.coordinator) when ready.length === COORDINATOR_READ_BATCH
(i.e., a full batch was claimed), leaving out the reschedule when fewer items
were found so the scheduler isn’t invoked unnecessarily.
In `@src/component/pool.ts`:
- Around line 291-321: Extract the repeated try/catch + db.patch
failure-handling into a new helper function (e.g., runWorkflowSafe) and replace
the duplicated blocks in directRunWorkflow, onCompleteHandler, and
runWorkflowBatch to call it; runWorkflowSafe(ctx, workflow, generationNumber)
should invoke ctx.runMutation(workflow.workflowHandle as
FunctionHandle<"mutation">, { workflowId: workflow.id, generationNumber }),
catch any error, normalize the message (e instanceof Error ? e.message :
String(e)), log it (reuse createLogger(DEFAULT_LOG_LEVEL) or accept a logger
param), and persist the failure with ctx.db.patch(workflow.id, { runResult: {
kind: "failed", error } }); ensure the helper returns success/failure so callers
can act accordingly.
In `@src/component/taskQueue.ts`:
- Around line 346-378: The diagnose handler uses a hardcoded default shards =
numShards ?? 40 which can mismatch deployment; update diagnose (handler) to not
assume 40 — either make numShards a required arg or read the real shard count
from the application's configuration/source (e.g., use the configured shardCount
value from ctx.settings or config service) and assign shards = numShards ??
config.shardCount, and remove the magic number 40; ensure the function
names/variables referenced are diagnose, numShards, and shards so reviewers can
locate and verify the change.
- Around line 296-303: Add a brief inline comment next to the condition that
populates candidates (the block checking workflow.runResult === undefined and
setting candidates.set(workflowId, { workflowId, workflowHandle:
workflow.workflowHandle, generationNumber: workflow.generationNumber }))
explaining that we intentionally do not check for other in-progress steps here
because that verification is deferred to replayIfReady to minimize the
OCC/conflict surface for this batch mutation; reference replayIfReady and the
candidates map in the comment so future maintainers understand the design choice
and where the in-progress-step check occurs.
In `@src/component/workflow.ts`:
- Around line 360-365: The return uses an unsafe "as any" cast in the function
that returns {...result, isDone: result.isDone || hitOld, page} which hides a
type mismatch between the current page array shape and the paginator's generic;
remove the "as any" cast and fix the types: either change the function's return
generic (or the paginator's generic) to match the new page element type, or
map/transform the page array into the original expected shape before returning,
and update the function signature (and any related type/interface) so TypeScript
enforces the validator at compile time (refer to the return expression and the
"page" property and adjust the surrounding function/method signature and
paginator generic accordingly).
- Around line 328-358: The current page build does an N+1 query by calling
ctx.db.query("steps").withIndex("workflow", q => q.eq("workflowId", wf._id))
for each workflow inside the Promise.all; instead, collect all workflow IDs from
filtered, perform a single ctx.db.query("steps") with an index filter that
selects all workflowIds (use the index with an "in"/batch filter) to fetch all
step documents at once, then group the returned step rows by workflowId and map
those grouped results to each workflow when constructing the page (update the
code that builds page, stepDocs, and the step mapping to use the grouped map
rather than per-workflow queries).
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (4)
- `package-lock.json` is excluded by `!**/package-lock.json`
- `src/component/_generated/api.ts` is excluded by `!**/_generated/**`
- `src/component/_generated/component.ts` is excluded by `!**/_generated/**`
- `src/component/_generated/server.ts` is excluded by `!**/_generated/**`
📒 Files selected for processing (11)
- `package.json`
- `src/client/index.ts`
- `src/client/step.ts`
- `src/client/workflowMutation.ts`
- `src/component/coordinator.ts`
- `src/component/event.ts`
- `src/component/journal.ts`
- `src/component/pool.ts`
- `src/component/schema.ts`
- `src/component/taskQueue.ts`
- `src/component/workflow.ts`
```ts
let retries = 0;
while (pendingResults.length > 0) {
  await flush();
  if (pendingResults.length > 0) {
    retries++;
    if (retries >= MAX_FLUSH_RETRIES) {
      // Give up on remaining items — tasks stay in queue,
      // next executor run will re-process them.
      pendingResults.length = 0;
      break;
    }
    await new Promise((r) => setTimeout(r, 200));
  }
}
```
Dropped results after `MAX_FLUSH_RETRIES` are silently discarded — add a warning log.
When `pendingResults.length = 0` is reached at line 415, results for in-flight tasks are permanently lost from this executor run. The comment says tasks stay in the queue for re-processing, which is correct for idempotent queries/mutations, but for actions the duplicate execution may have side effects. At minimum, log the count of dropped items to aid debugging.
🛡️ Proposed fix
```diff
  if (retries >= MAX_FLUSH_RETRIES) {
    // Give up on remaining items — tasks stay in queue,
    // next executor run will re-process them.
+   console.error(
+     `Executor shard ${shard}: dropping ${pendingResults.length} unflushed results after ${MAX_FLUSH_RETRIES} retries`,
+   );
    pendingResults.length = 0;
    break;
  }
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```ts
let retries = 0;
while (pendingResults.length > 0) {
  await flush();
  if (pendingResults.length > 0) {
    retries++;
    if (retries >= MAX_FLUSH_RETRIES) {
      // Give up on remaining items — tasks stay in queue,
      // next executor run will re-process them.
      console.error(
        `Executor shard ${shard}: dropping ${pendingResults.length} unflushed results after ${MAX_FLUSH_RETRIES} retries`,
      );
      pendingResults.length = 0;
      break;
    }
    await new Promise((r) => setTimeout(r, 200));
  }
}
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/client/index.ts` around lines 407 - 420, The loop that gives up after
reaching MAX_FLUSH_RETRIES silently discards pendingResults by setting
pendingResults.length = 0; before clearing them, emit a warning log that
includes the number of dropped items (pendingResults.length), the retries count
(retries) and that MAX_FLUSH_RETRIES was hit; implement the log using the
module's existing logger (e.g., logger or processLogger) or console.warn if none
is available, then clear pendingResults as before to preserve behavior in the
flush() / retries loop.
```ts
// Detect batch actions by checking if the function name's base matches
// a registered batch action name.
let batchActionName: string | undefined;
if (
  this.batchActionNames &&
  target.kind === "function" &&
  target.functionType === "action"
) {
  const fnName = safeFunctionName(target.function);
  const parts = fnName.split(/[:/]/);
  const baseName = parts[parts.length - 1];
  if (this.batchActionNames.has(baseName)) {
    batchActionName = baseName;
  }
}
```
Base-name-only matching could collide across modules.
safeFunctionName may return paths like "convex/actions:doWork". Taking only the last segment after splitting on [:/] means two different modules both exporting a doWork action would match the same batchActionNames entry. If this is intentional (users must ensure uniqueness), a brief doc comment would help.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/client/step.ts` around lines 191 - 205, The current base-name-only
matching extracts only the last segment from safeFunctionName(target.function)
and may collide across modules; update the logic in the block that references
safeFunctionName, batchActionNames, and batchActionName so it matches against
the full safeFunctionName (or otherwise include module/path namespace) instead
of just baseName, or alternatively convert/expect batchActionNames to contain
fully-qualified names and compare against fnName directly; if you choose to keep
base-name matching, add a brief doc comment above this block documenting the
uniqueness requirement for action names.
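One way to avoid the collision, sketched here with a hypothetical `matchBatchAction` helper (not from the PR), is to prefer a fully-qualified match and fall back to the base name only when no qualified entry exists:

```typescript
// Hypothetical helper: prefer fully-qualified matches ("convex/actions:doWork")
// over bare base names ("doWork"), so two modules exporting the same action
// name can't collide unless the registry itself uses bare names.
function matchBatchAction(
  fnName: string,
  batchActionNames: Set<string>,
): string | undefined {
  if (batchActionNames.has(fnName)) {
    return fnName; // exact, fully-qualified match
  }
  const parts = fnName.split(/[:/]/);
  const baseName = parts[parts.length - 1];
  return batchActionNames.has(baseName) ? baseName : undefined;
}
```

This keeps backward compatibility with bare-name registries while letting users disambiguate by registering the full path.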
```ts
export const runWorkflowBatch = internalMutation({
  args: {
    items: v.array(
      v.object({
        workflowId: v.id("workflows"),
        generationNumber: v.number(),
        workflowHandle: v.string(),
      }),
    ),
  },
  returns: v.null(),
  handler: async (ctx, { items }) => {
    const console = createLogger(DEFAULT_LOG_LEVEL);
    for (const { workflowId, generationNumber, workflowHandle } of items) {
      const workflow = await ctx.db.get(workflowId);
      if (
        !workflow ||
        workflow.runResult ||
        workflow.generationNumber !== generationNumber
      ) {
        continue;
      }
      try {
        await ctx.runMutation(
          workflowHandle as FunctionHandle<"mutation">,
          { workflowId, generationNumber },
        );
      } catch (e) {
        const error =
          e instanceof Error ? e.message : `Unknown error: ${String(e)}`;
        console.error(`Error running workflow ${workflowId}: ${error}`);
        await ctx.db.patch(workflowId, {
          runResult: { kind: "failed", error },
        });
      }
    }
  },
});
```
Running up to 50 workflow mutations in a single transaction is a blast-radius risk.
Each ctx.runMutation(workflowHandle, ...) runs a full workflow replay within the same outer transaction. If the accumulated writes exceed Convex's transaction size limit, or an OCC conflict occurs on any touched document, the entire batch of up to 50 workflows rolls back — even the ones that completed successfully before the failure. Per-item try/catch only handles exceptions from individual mutations, not transaction-level failures.
Consider scheduling each item (or smaller sub-batches) as a separate mutation via ctx.scheduler.runAfter(0, ...) to isolate failures, similar to how enqueueWorkflow in pool.ts works.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/component/coordinator.ts` around lines 65 - 103, The runWorkflowBatch
handler currently calls ctx.runMutation(workflowHandle, ...) for each item
inside the same transaction (runWorkflowBatch), which risks rolling back the
whole batch on transaction-size or OCC failures; change the loop so each
workflow is scheduled as a separate (isolated) mutation using
ctx.scheduler.runAfter(0, ...) rather than invoking ctx.runMutation inline —
mirror the approach used by enqueueWorkflow in pool.ts: for each {workflowId,
generationNumber, workflowHandle} call ctx.scheduler.runAfter(0, { mutation:
workflowHandle, args: { workflowId, generationNumber } }) (or schedule a small
wrapper mutation) and remove the direct ctx.runMutation calls so each workflow
replay runs in its own transaction.
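The sub-batching alternative is mechanical; a minimal sketch (the `chunk` helper is illustrative, not from the PR) splits the items into small groups, each of which would then be scheduled as its own mutation via `ctx.scheduler.runAfter(0, ...)` so one failing group cannot roll back the others:

```typescript
// Illustrative helper: split a batch into fixed-size sub-batches. Each
// sub-batch is then scheduled as a separate mutation, so it commits or
// fails in its own transaction instead of sharing the outer one.
function chunk<T>(items: T[], size: number): T[][] {
  const out: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    out.push(items.slice(i, i + size));
  }
  return out;
}
```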
```ts
if (stepArgs.batchActionName && workflow.executorShards) {
  // Route through sharded task queue for executor-driven execution.
  const shard = shardForWorkflow(workflow._id as string, workflow.executorShards);
  await ctx.db.insert("taskQueue", {
    shard,
    functionType: "action",
    handle: stepArgs.batchActionName,
    args: step.args,
    stepId,
    workflowId: workflow._id,
    generationNumber,
  });
  workId = `executor:${stepId}`;
```
Action tasks inserted without retry config — confirm this is intentional.
Query and mutation paths set retry: retryConfig, but action tasks routed to the executor omit the retry field entirely. The PR description says "actions no retry unless opted-in," but there's no mechanism here for opting in — a user-provided stepArgs.retry object is silently ignored for executor-routed actions.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/component/journal.ts` around lines 240 - 252, The executor-sharded branch
(inside the if checking stepArgs.batchActionName && workflow.executorShards)
currently inserts an action task without any retry configuration, ignoring any
user-provided stepArgs.retry; update the insert call (the object passed to
ctx.db.insert in this branch) to include a retry field when the user explicitly
provides one (e.g., use stepArgs.retry if present) while leaving retry absent
otherwise to preserve the "no retry unless opted-in" behavior; ensure this
mirrors how query/mutation paths use retryConfig and that stepArgs.retry is
respected before assigning workId = `executor:${stepId}`.
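The opt-in shape is a conditional spread; a sketch with an illustrative `buildExecutorTask` helper (the field names mirror the insert above but are not from the PR):

```typescript
// Illustrative: build the task document, attaching retry config only when
// the user opted in via stepArgs.retry, preserving "no retry unless opted-in".
type RetryConfig = { maxAttempts: number; initialBackoffMs: number };

function buildExecutorTask(
  base: Record<string, unknown>,
  userRetry?: RetryConfig,
): Record<string, unknown> {
  return {
    ...base,
    // Spread in retry only when explicitly provided; otherwise the field
    // is absent from the document entirely.
    ...(userRetry ? { retry: userRetry } : {}),
  };
}
```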
```ts
export const recordResult = mutation({
  args: {
    stepId: v.id("steps"),
    result: vResultValidator,
    generationNumber: v.number(),
  },
  returns: v.null(),
  handler: async (ctx, { stepId, result, generationNumber }) => {
    const console = createLogger(DEFAULT_LOG_LEVEL);

    // Helper: always delete the task from the queue, even on early return.
    // Stale/orphaned tasks must not block the shard permanently.
    const deleteTask = async () => {
      const taskEntry = await ctx.db
        .query("taskQueue")
        .withIndex("by_stepId", (q) => q.eq("stepId", stepId))
        .unique();
      if (taskEntry) {
        await ctx.db.delete(taskEntry._id);
      }
    };

    // 1. Record the step result (same logic as pool.ts:onCompleteHandler)
    const journalEntry = await ctx.db.get(stepId);
    if (!journalEntry) {
      console.error(`Journal entry not found: ${stepId}`);
      await deleteTask();
      return null;
    }
    const workflowId = journalEntry.workflowId;
    const workflow = await ctx.db.get(workflowId);
    if (!workflow) {
      console.error(`Workflow not found: ${workflowId}`);
      await deleteTask();
      return null;
    }
    if (workflow.generationNumber !== generationNumber) {
      console.error(
        `Workflow: ${workflowId} already has generation number ${workflow.generationNumber} when completing ${stepId}`,
      );
      await deleteTask();
      return null;
    }
    if (!journalEntry.step.inProgress) {
      console.error(
        `Step finished but journal entry not in progress: ${stepId}`,
      );
      await deleteTask();
      return null;
    }

    journalEntry.step.inProgress = false;
    journalEntry.step.completedAt = Date.now();
    switch (result.kind) {
      case "success":
        journalEntry.step.runResult = {
          kind: "success",
          returnValue: result.returnValue,
        };
        break;
      case "failed":
        journalEntry.step.runResult = {
          kind: "failed",
          error: result.error,
        };
        break;
      case "canceled":
        journalEntry.step.runResult = {
          kind: "canceled",
        };
        break;
    }
    await ctx.db.replace(journalEntry._id, journalEntry);

    console.event("stepCompleted", {
      workflowId,
      workflowName: workflow.name,
      status: result.kind,
      stepName: journalEntry.step.name,
      stepNumber: journalEntry.stepNumber,
      durationMs: journalEntry.step.completedAt - journalEntry.step.startedAt,
    });

    // 2. Delete the task from the queue (atomically with result recording)
    await deleteTask();

    if (workflow.runResult !== undefined) {
      return null;
    }

    // 3. Check if this was the last in-progress step → replay workflow inline
    const otherInProgress = await ctx.db
      .query("steps")
      .withIndex("inProgress", (q) =>
        q.eq("step.inProgress", true).eq("workflowId", workflowId),
      )
      .first();
    if (!otherInProgress) {
      try {
        await ctx.runMutation(
          workflow.workflowHandle as FunctionHandle<"mutation">,
          {
            workflowId: workflow._id,
            generationNumber: workflow.generationNumber,
          },
        );
      } catch (e) {
        const error =
          e instanceof Error ? e.message : `Unknown error: ${String(e)}`;
        console.error(`Error running workflow ${workflowId}: ${error}`);
        await ctx.db.patch(workflowId, {
          runResult: { kind: "failed", error },
        });
      }
    }

    return null;
  },
});
```
🛠️ Refactor suggestion | 🟠 Major
Significant code duplication between recordResult and recordResultBatch.
The journal lookup, validation, result recording, event logging, and task deletion logic (lines 70–131 vs. 243–295) is nearly identical. If a bug is fixed in one path, it needs to be mirrored in the other.
Consider extracting a shared helper, e.g.:

```ts
async function processStepResult(
  ctx: MutationCtx,
  console: Logger,
  stepId: Id<"steps">,
  result: RunResult,
  generationNumber: number,
): Promise<{ workflowId: Id<"workflows">; workflowHandle: string } | null> {
  // shared validation, journal update, event logging, task deletion
}
```

Then recordResult calls it and does the inline replay, while recordResultBatch calls it in a loop and accumulates candidates.
Also applies to: 214-308
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/component/taskQueue.ts` around lines 47 - 165, recordResult and
recordResultBatch duplicate the same journal lookup, validation, update, event
logging and task deletion logic; extract that shared flow into a helper (e.g.,
processStepResult) that takes (ctx, console, stepId, result, generationNumber)
and performs: journal/ workflow lookup and validation, mark
step.inProgress=false/ completedAt, set step.runResult, replace the journal
entry, emit the console.event, and delete the queue task (using the existing
deleteTask logic), returning the workflow id and workflowHandle (or null on
early return). Replace the duplicated blocks in recordResult (handler) and
recordResultBatch with calls to processStepResult; keep the replay logic
(ctx.runMutation and error patching) in recordResult and have recordResultBatch
loop over results and accumulate candidate workflows to replay.
```ts
export const startExecutors = mutation({
  args: {
    executorHandle: v.string(),
    numShards: v.number(),
  },
  returns: v.number(), // returns the new epoch
  handler: async (ctx, { executorHandle, numShards }) => {
    // Increment executor epoch — old executors with stale epochs will terminate.
    const existing = await ctx.db
      .query("executorEpoch")
      .first();
    let epoch: number;
    if (existing) {
      epoch = existing.epoch + 1;
      await ctx.db.patch(existing._id, { epoch });
    } else {
      epoch = 1;
      await ctx.db.insert("executorEpoch", { epoch });
    }
    for (let i = 0; i < numShards; i++) {
      await ctx.scheduler.runAfter(
        0,
        executorHandle as FunctionHandle<"action">,
        { shard: i, epoch },
      );
    }
    return epoch;
  },
});
```
startExecutors doesn't validate numShards — zero or negative values produce no executors.
If numShards is 0 or negative, the for loop at line 186 never executes and no executors are scheduled. The epoch is still incremented, which would terminate any existing executors without replacement.
🛡️ Proposed guard

```diff
 handler: async (ctx, { executorHandle, numShards }) => {
+  if (numShards < 1) {
+    throw new Error(`numShards must be >= 1, got ${numShards}`);
+  }
   // Increment executor epoch — old executors with stale epochs will terminate.
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```ts
export const startExecutors = mutation({
  args: {
    executorHandle: v.string(),
    numShards: v.number(),
  },
  returns: v.number(), // returns the new epoch
  handler: async (ctx, { executorHandle, numShards }) => {
    if (numShards < 1) {
      throw new Error(`numShards must be >= 1, got ${numShards}`);
    }
    // Increment executor epoch — old executors with stale epochs will terminate.
    const existing = await ctx.db
      .query("executorEpoch")
      .first();
    let epoch: number;
    if (existing) {
      epoch = existing.epoch + 1;
      await ctx.db.patch(existing._id, { epoch });
    } else {
      epoch = 1;
      await ctx.db.insert("executorEpoch", { epoch });
    }
    for (let i = 0; i < numShards; i++) {
      await ctx.scheduler.runAfter(
        0,
        executorHandle as FunctionHandle<"action">,
        { shard: i, epoch },
      );
    }
    return epoch;
  },
});
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/component/taskQueue.ts` around lines 167 - 195, startExecutors increments
the epoch but doesn't validate numShards, so numShards <= 0 will increment epoch
and schedule no executors; add validation at the start of the mutation handler
to ensure numShards is a positive integer (e.g., throw a descriptive error or
return a failure) before touching the DB or epoch; refer to the startExecutors
handler, the numShards param, and ctx.scheduler.runAfter/epoch logic so you
validate and short-circuit early (reject non-integers, zero, or negatives) to
avoid advancing epoch without scheduling executors.
```ts
export const creationTimeBuckets = query({
  args: {
    name: v.string(),
    createdAfter: v.number(),
    bucketMs: v.number(),
  },
  returns: v.array(v.object({ offsetSec: v.number(), count: v.number() })),
  handler: async (ctx, { name, createdAfter, bucketMs }) => {
    const buckets = new Map<number, number>();
    for await (const wf of ctx.db
      .query("workflows")
      .withIndex("name", (q) => q.eq("name", name))
      .order("desc")) {
      if (wf._creationTime < createdAfter) break;
      const bucket =
        Math.floor((wf._creationTime - createdAfter) / bucketMs) *
        (bucketMs / 1000);
      buckets.set(bucket, (buckets.get(bucket) || 0) + 1);
    }
    return [...buckets.entries()]
      .map(([offsetSec, count]) => ({ offsetSec, count }))
      .sort((a, b) => a.offsetSec - b.offsetSec);
  },
});
```
creationTimeBuckets scans unbounded rows — risk hitting Convex read limits on large datasets.
Unlike countByNamePage which uses pagination, creationTimeBuckets iterates all matching workflows in a single query call with no pagination or row-count limit. For popular workflow names with many runs, this could exceed the 16MB read limit and throw. Consider adding a paginated variant similar to countByNamePage, or at minimum document the constraint.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/component/workflow.ts` around lines 266 - 287, creationTimeBuckets
currently iterates all matching workflows in a single query and can hit Convex
read limits; change the handler in creationTimeBuckets to paginate like
countByNamePage: run the same ctx.db.query("workflows").withIndex("name", q =>
q.eq("name", name)).order("desc") but fetch results in fixed-size pages (e.g.,
pageSize), loop with the query pagination API (cursor/next) until no more pages
or until wf._creationTime < createdAfter, aggregate counts into the existing
buckets Map per page, then return the sorted entries as before; if you cannot
implement paging now, add a clear comment in the creationTimeBuckets handler
referencing countByNamePage and the 16MB read-limit constraint.
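Whichever way the read-limit issue is resolved, the bucketing arithmetic itself is easy to factor out and test in isolation; a pure sketch (the `bucketCounts` helper is illustrative, not from the PR) that mirrors the offset computation in the query above:

```typescript
// Illustrative pure version of the bucketing logic: map creation times to
// second-offsets from createdAfter, rounded down to bucket boundaries.
function bucketCounts(
  creationTimes: number[],
  createdAfter: number,
  bucketMs: number,
): { offsetSec: number; count: number }[] {
  const buckets = new Map<number, number>();
  for (const t of creationTimes) {
    if (t < createdAfter) continue;
    const offsetSec =
      Math.floor((t - createdAfter) / bucketMs) * (bucketMs / 1000);
    buckets.set(offsetSec, (buckets.get(offsetSec) ?? 0) + 1);
  }
  return [...buckets.entries()]
    .map(([offsetSec, count]) => ({ offsetSec, count }))
    .sort((a, b) => a.offsetSec - b.offsetSec);
}
```

A paginated variant could call this per page and merge the resulting maps.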
When replayIfReady fails after 3 immediate retries, push the candidate to a rescue queue instead of dropping it. The flush loop retries rescued candidates every 500ms, and waitUntilIdle drains them before shutdown. This prevents workflows from getting permanently stuck when all steps complete but replay fails due to transient OCC conflicts. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
4ffc4e1 to de4a686
ianmacartney
left a comment
overall I like the pattern but don't really like the fixed executor 10m strategy - seems too flaky & prone to errors / outages and would need a lot of hardening. I think it makes sense as a separate component, and to make Workflow more flexible about what the executor of each step is - scheduler, workpool, executor shards, or BYO. Just need a good interface for the equivalent of a Promise for async suspendable execution
```ts
    ...schedulerOptions,
  },
);
await ensureCoordinatorRunning(ctx);
```
seems like we no longer enqueue here
```ts
const otherInProgress = await ctx.db
  .query("steps")
  .withIndex("inProgress", (q) =>
    q.eq("step.inProgress", true).eq("workflowId", workflowId),
  )
  .first();
```
note: in a promise.all situation this could keep hitting OCCs on onComplete until they're all in - could cause issues when onComplete is called directly. one thing that could help here is a sense of "de-duping" tasks in the workpool, based on the function handle & args
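The de-duping idea can be sketched as a key function (names illustrative, not from the PR): key each enqueued task by function handle plus canonicalized args, and skip enqueueing when the key is already pending.

```typescript
// Illustrative dedup key for workpool tasks: function handle plus args with
// keys in a stable order, so two enqueues with the same handle and args
// produce the same key and collapse to one pending task.
function taskDedupKey(handle: string, args: Record<string, unknown>): string {
  // Passing a sorted key array as JSON.stringify's replacer fixes field order.
  const sortedArgs = JSON.stringify(args, Object.keys(args).sort());
  return `${handle}:${sortedArgs}`;
}
```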
```ts
stepId,
workflowId: workflow._id,
generationNumber,
retry: retryConfig,
```
why is there a retry config on the mutation?
```ts
  return internalActionGeneric({
    handler: async () => {
      throw new Error(
        `${name} should not be called directly — it runs inside executors`,
      );
    },
  }) as any;
}
```
seems like you could not return this and it'd work just fine
- Add WorkflowRateLimitError + per-step-type rate-limit gates
- Fix executor straggler bug (tasks not completing during handoff)
- Serialized flush→replay to eliminate OCC conflicts at scale
- Inline replay into recordResultBatch + tune flush params
- Batched replay mutations for throughput
- Sharded task queue + executor-driven chaining

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…plementation
[checkpoint] Manual: Fixed executor idle timeout and auto-start from benchmark
feat: sharded task queue with executor-driven workflow chaining
Replace single-document BatchWorkpool coordination with a sharded task
queue. Each executor action reads from a non-overlapping shard, eliminating
cross-executor OCC contention. Key changes:
- Atomic task lifecycle: claimTasks reads without deleting, recordResult
atomically records step result + deletes task + replays workflow
- Epoch-based executor dedup prevents duplicate executors from accumulating
- Stale tasks always cleaned up on error (no permanent shard blocking)
- Executors never idle-terminate, always self-reschedule at 8 min
- Faster coordinator fan-out (batch size 50, read batch 1000)
- Benchmark harness with parallel batch creation via scheduler
10K 4-step workflows: ~120s (6x improvement over BatchWorkpool baseline)
20K 4-step workflows: ~390s with 40 shards, 0 failures
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
feat: batched result recording, consistent sharding, and query fixes
- Batch recordResults into single mutation per flush (500ms interval,
max 50 items) to eliminate CommitterFullError under high throughput
- Deterministic shard assignment (hash workflowId) so all steps for one
workflow go to the same executor, eliminating cross-shard OCC conflicts
- Split recording from replay: recordResultBatch returns candidates,
replayIfReady runs per-workflow replay as separate small mutations
- Change claimTasks from mutation to query (read-only, much cheaper)
- Add countByName query with createdAfter filter for reliable status
counting in a single transaction (no pagination drift)
- Fix diagnose query to use indexed per-shard queries instead of
table scan
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
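The "deterministic shard assignment (hash workflowId)" above can be sketched as a small pure function. The specific hash below is an assumption: `shardForWorkflow`'s body isn't shown in this excerpt, and any stable string hash works as long as every step of a workflow maps to the same shard.

```typescript
// A minimal sketch of deterministic shard assignment. The 31-multiplier
// string hash here is one stable choice, not necessarily the PR's. It maps
// the same workflowId to the same shard on every call, so all steps of one
// workflow land on one executor.
function shardForWorkflowSketch(workflowId: string, numShards: number): number {
  let hash = 0;
  for (let i = 0; i < workflowId.length; i++) {
    hash = (Math.imul(hash, 31) + workflowId.charCodeAt(i)) | 0;
  }
  return Math.abs(hash) % numShards;
}
```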
fix: retry failed replayIfReady calls to prevent stuck workflows
When replayIfReady fails (OCC conflict or transient error), the
workflow's tasks are already deleted from the queue. Without retry,
nothing will ever trigger the replay again, leaving the workflow
stuck forever.
Now retries failed replays up to 3 times with increasing backoff
(200ms, 400ms, 600ms). This eliminated the 5/20000 stuck workflows
from the previous benchmark run.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
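The backoff schedule described here (200ms, 400ms, 600ms) is linear in the attempt number; a minimal sketch with an illustrative helper name:

```typescript
// Illustrative: linear backoff for replayIfReady retries. Delays are 200ms,
// 400ms, 600ms for attempts 1..3; -1 signals "give up" past the retry cap.
const MAX_REPLAY_RETRIES = 3;

function replayBackoffMs(attempt: number): number {
  if (attempt < 1 || attempt > MAX_REPLAY_RETRIES) return -1;
  return 200 * attempt;
}
```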
feat: benchmark timeline visualization served from HTTP action
Add timelinePage query to the workflow component that returns compact
timeline data (workflow id, creation time, step timings) with efficient
desc-order pagination and createdAfter filtering.
Serve a canvas-based visualization from /benchmark-viz HTTP route that
shows every workflow as a 1px colored bar (blue=extract, green=analyze-a,
orange=analyze-b, red=summarize) with live stats and time scale.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
feat: vizUrl in startBenchmark output, stable timeline polling, half-pixel rows
Return clickable viz URL from startBenchmark. Switch timeline from
unstable paginated subscriptions to polling (status stays reactive).
Add vertical labels every 1000 workflows. Use CSS scaleY(0.5) for
half-pixel row height.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
feat: pre-emptive executor restart and transparent viz colors
Schedule successor executor before draining in-flight tasks to eliminate
the throughput gap between executor generations. Reduce RESCHEDULE_MS
from 8 to 5 minutes for more frequent handoffs. Use transparent colors
for analyze-a (green) and analyze-b (amber) so concurrent steps are
visually distinguishable when overlaid.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
feat: jittered executor restarts and 10x smaller viz rows
Add 0-60s jitter to RESCHEDULE_MS so all 40 shards don't restart
simultaneously. Schedule successor before draining so it cold-starts
during the drain period. Reduce viz row scaleY from 0.5 to 0.1 to
fit 20k workflows in view.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
[checkpoint] Auto-save at 11:59:31 AM
feat: executor handoff handshake, drain-before-exit, and LLM benchmark mode
Three-phase handoff protocol eliminates throughput gaps at executor restart
boundaries: old executor keeps claiming tasks until successor signals ready,
then yields cleanly. Stale-epoch exits now drain remaining shard tasks to
prevent stranded work.
Also adds creationTimeBuckets diagnostic query, increases batch size to 1000,
and introduces a real Claude LLM benchmark mode alongside the simulated one.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
feat: real Claude LLM benchmark mode, queue/execution viz, dynamic scaling
- Add benchmarkMode ("simulated" | "real") flag to all workflows and actions
- Add callClaude() with Anthropic SDK, 20 topic corpus, task-specific prompts
- SDK configured with maxRetries: 10 for rate limit resilience
- Extract executorStartedAt from step results in timelinePage query
- Viz: dim bars for queue wait vs bright bars for execution time
- Viz: dynamic Y-axis scaling to fit any workflow count on one screen
- Disable batch mode (BatchWorkpool not in published workpool)
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
[checkpoint] Auto-save at 12:34:31 PM
chore: tune executor concurrency limits and Anthropic retry count
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
feat: paginated status counting and executor improvements
- Add countByNamePage to component for paginated workflow counting
that stays under Convex's 16MB read limit at 20k+ workflows
- Convert benchmarkStatus to action that loops over pages
- Replace viz's reactive subscription with paginated status polling
- Add retry support and query/mutation routing to executor task queue
- Add executionStartedAt to timeline for queue vs execution distinction
- Fix BatchWorkpool import (type alias for unpublished export)
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
feat: streaming executor pipeline with in-flight tracking
Replace batch-and-drain pattern (claim → feed → waitUntilIdle → loop)
with streaming pattern that continuously fills idle slots. Adds local
inFlightStepIds Set to filter already-claimed tasks client-side,
removes waitUntilIdle() from main loop and handoff claiming loop
(kept only at exit/drain points). Adds adaptive polling: 150ms when
tasks are active, 500ms when idle.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
[checkpoint] Auto-save at 5:00:44 PM
[checkpoint] Auto-save at 5:20:44 PM
[checkpoint] Auto-save at 5:35:44 PM
[checkpoint] Auto-save at 5:40:44 PM
[checkpoint] Auto-save at 6:14:45 PM
[checkpoint] Auto-save at 6:29:45 PM
[checkpoint] Auto-save at 7:12:18 PM
MAYBE improvement: fix task re-claim bug + tune executor constants
- Remove premature inFlightStepIds.delete from processTask. Previously,
completed tasks were removed from tracking immediately but stayed in
the DB queue until flush. The claiming loop would re-claim them 3-6x
each, wasting concurrency slots on duplicate executions.
Now only deleted in flush() after recordResultBatch confirms removal.
- CLAIM_LIMIT 150→800: see full shard queue, no invisible tasks
- MAX_CONCURRENCY 750→200: limit event loop contention
- RESCHEDULE_MS 5min→8min: avoid handoff during benchmark window
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
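The client-side claim filtering described above can be sketched as a pure function (names illustrative, not the PR's exact code): freshly claimed tasks are checked against a local in-flight set, and ids leave the set only once flush confirms their removal from the queue.

```typescript
// Illustrative: filter freshly-claimed tasks against a local in-flight set so
// tasks that are done but awaiting flush aren't re-claimed and re-executed.
interface ClaimedTask {
  stepId: string;
}

function filterNewTasks<T extends ClaimedTask>(
  claimed: T[],
  inFlightStepIds: Set<string>,
): T[] {
  const fresh = claimed.filter((t) => !inFlightStepIds.has(t.stepId));
  for (const t of fresh) {
    // Removed from the set only after recordResultBatch confirms deletion,
    // never at task completion time.
    inFlightStepIds.add(t.stepId);
  }
  return fresh;
}
```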
[checkpoint] Auto-save at 7:27:17 PM
[checkpoint] Auto-save at 7:42:18 PM
[checkpoint] Auto-save at 7:47:18 PM
[checkpoint] Auto-save at 7:52:17 PM
[checkpoint] Auto-save at 7:57:18 PM
[checkpoint] Auto-save at 8:07:18 PM
[checkpoint] Auto-save at 8:37:18 PM
Add executor tuning guide and d1024 benchmark results
Documents all executor-mode knobs (executorShards, MAX_CONCURRENCY,
FLUSH_BATCH_SIZE, etc.) with tradeoffs and tuning profiles for different
workload types: short steps, LLM API calls, long-running steps, and
high-volume bursty creation.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add comprehensive executor mode clean-room implementation spec
Complete specification covering: architecture, schema changes, component
mutations (claimTasks, recordResultBatch, replayIfReady), client-side
WorkflowManager changes, the full executor action with flush system and
bounded-concurrency pool, handoff protocol, shard assignment, cancel
support, benchmark visualizer with timeline query, pitfalls discovered
through testing, and all constants with rationale.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
… + throughput tuning)

Key changes from base:
- Executor-mode workflows bypass coordinator (inline creation)
- MAX_CONCURRENCY=500, CLAIM_LIMIT=1500, FLUSH_INTERVAL_MS=200
- BATCH_CREATE_SIZE=100 (smaller batches for scheduler concurrency)
- executorFinishedAt tracking + write-delay viz
- Replays coupled with flush (50 per batch, 3 retries with backoff)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace N individual replayIfReady mutations per flush with a single replayBatchIfReady call, fixing "too many concurrent commits" rate limiting that was losing ~11 workflows per 20k run. Bump FLUSH_BATCH_SIZE from 50 to 200 for higher throughput. Visualizer now streams pages incrementally (draws after each 1000- workflow page) instead of waiting for all 20k to load. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Previously the visualizer cleared allWorkflows at the start of each poll cycle, causing a blank flash. Now it builds into a separate array and only swaps once the incoming count exceeds the previous frame. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
recordResultBatch now accepts replayInline flag to replay ready workflows in the same mutation, eliminating a separate round-trip. FLUSH_BATCH_SIZE=50, FLUSH_INTERVAL_MS=100ms for faster flush cycles. 20k benchmark: 20000/20000 completed, 0 failed, ~102-145s span. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Move replay out of recordResultBatch into executor-side replayBatchIfReady calls, serialized after each flush batch. This eliminates the inProgress index reads from recordResultBatch (zero OCC surface) while keeping inter-step latency tight.

Key changes:
- recordResultBatch returns replay candidates instead of replaying inline
- Executor calls replayBatchIfReady after each flush (serialized, not concurrent)
- Viz: add ?sort=shard mode, batched page fetching, error backoff
- Benchmark: smaller batch creation (25), skipExecutorStart flag, inline small counts

20k benchmark: p50=76.7s, p90=82.2s, p99=86.4s, slowest=92s, 0 failures.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
New canvas above the timeline showing a stacked area chart of concurrent workflows in each phase (extract, analyze-a, analyze-b, summarize) over time. Y-axis labels show peak concurrency. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Documents benchmark results, architecture, tuning constants, and failed experiments for the executor flush/replay pipeline. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add drain loop on both executor exit paths (handoff and epoch-mismatch) that retries up to 10 times with 500ms delays, catching tasks created by replay sub-mutations during shutdown. Fixes issue where 3+ workflows would get permanently stuck in the queue. Also: restore 100 shards, add sort-by-shard toggle and per-phase step counts to benchmark viz. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Executor now supports rate-limit-aware retries: - WorkflowRateLimitError lets step handlers signal rate limits - Per-action-handle gates prevent thundering herd on retry - Benchmark uses shared gate pattern for Anthropic API 429s - Switch benchmark model to haiku, bump convex to 1.32 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
These belong in the second PR for documentation and testing. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Bug fix: replay candidates that fail replayBatchIfReady twice now go into a rescue queue instead of being silently dropped. The flush loop retries the rescue queue every tick, and waitUntilIdle drains it before executor shutdown. This prevents workflows from getting permanently stranded by transient OCC failures. Review feedback (Ian): - Remove retry config from query/mutation task queue inserts — queries are deterministic and mutations auto-retry on OCC. Only actions get application-level retry. - Remove unnecessary explicit return type on executor(). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Three fixes to ensure every workflow reaches a terminal state: 1. replayInline in recordResultBatch: replay workflows within the same mutation that records step results, eliminating OCC conflicts between recordResultBatch and replayBatchIfReady. 2. Durable scheduler safety net: when replayInline skips a candidate (other steps still in-progress from a concurrent batch), schedule a delayed replayIfReady via ctx.scheduler.runAfter(10s). This is committed atomically with the result recording, so it survives executor crashes and hard 10-minute action timeouts. 3. Lower RESCHEDULE_MS from 8min to 5min, giving more headroom for drain and handoff before the 10-minute hard kill. Also returns unreplayed candidates to the client (instead of []) so the existing replayBatchIfReady + rescue queue path provides an additional fast retry before the scheduled safety net fires. Verified: 10k real Claude benchmark → 9940 completed, 60 failed (rate limits), 0 stuck workflows (previously 3-15 stuck per run). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Actionable comments posted: 2
♻️ Duplicate comments (6)
package.json (1)
66-67: ⚠️ Potential issue | 🟡 Minor
Reintroduced @anthropic-ai/sdk should be usage-verified before release.
Line 66 adds this runtime dependency again. This was previously removed as unused; if it’s still unused, it increases supply-chain and install surface for no functional gain.

```bash
#!/bin/bash
set -euo pipefail
# Verify direct imports/usages of Anthropic SDK across source, examples, and docs.
rg -n --hidden \
  -g '!**/node_modules/**' \
  -g '!**/dist/**' \
  '@anthropic-ai/sdk|from[[:space:]]+["'\'']@anthropic-ai/sdk["'\'']|new[[:space:]]+Anthropic|anthropic\.' \
  --type ts --type js --type tsx --type jsx --type json --type md
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@package.json` around lines 66 - 67, The package reintroduces the runtime dependency "@anthropic-ai/sdk" into package.json but the PR comment requests usage verification; search the repo for direct imports/usages of "@anthropic-ai/sdk" (import, require, new Anthropic, anthropic.*) across source, examples, docs and tests, and if no usage is found remove the dependency from package.json (and any lockfile update) or if it is required, add a short code reference (file path and symbol) and a minimal test/example showing its use plus a package.json comment or release note explaining why it’s needed; ensure any CI/test changes reflect the dependency addition if kept.

src/client/step.ts (1)
191-205: ⚠️ Potential issue | 🟠 Major
Base-name matching can collide across modules and misroute batch actions.
Line 200 through Line 203 collapses function identity to a basename (doWork), so two distinct actions can be treated as the same batch action.
🔧 Proposed fix (match fully-qualified safe function names)

```diff
-    // Detect batch actions by checking if the function name's base matches
-    // a registered batch action name.
+    // Detect batch actions by matching fully-qualified safe function names.
     let batchActionName: string | undefined;
     if (
       this.batchActionNames &&
       target.kind === "function" &&
       target.functionType === "action"
     ) {
       const fnName = safeFunctionName(target.function);
-      const parts = fnName.split(/[:/]/);
-      const baseName = parts[parts.length - 1];
-      if (this.batchActionNames.has(baseName)) {
-        batchActionName = baseName;
+      if (this.batchActionNames.has(fnName)) {
+        batchActionName = fnName;
       }
     }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/client/step.ts` around lines 191 - 205, The code currently reduces function identity to a basename (using safeFunctionName then taking the last segment) which can collide across modules; instead compare the full safe function name against the batchActionNames set. In the block that computes batchActionName (symbols: batchActionNames, target, safeFunctionName, batchActionName, functionType), stop extracting baseName via split and check this.batchActionNames.has(fnName) and if true set batchActionName = fnName; ensure anywhere that registers names into batchActionNames also uses the same fully-qualified safeFunctionName format so the check is consistent.

src/component/coordinator.ts (1)
76-103: ⚠️ Potential issue | 🟠 Major
runWorkflowBatch still couples up to 50 workflow replays into one failure domain.
Line 88 through Line 91 executes each workflow via ctx.runMutation inside the same batch mutation. A single OCC/read-write-limit failure can roll back/retry the entire batch.
In Convex, when a mutation calls ctx.runMutation multiple times in a loop, are those calls part of the caller transaction/sub-transaction flow, and can one OCC conflict cause the whole parent mutation to retry or fail?
Verify each finding against the current code and only fix it if needed. In `@src/component/coordinator.ts` around lines 76 - 103, The handler currently calls multiple ctx.runMutation invocations in a loop, coupling up to 50 workflow replays into a single failure domain; change the logic so each workflowHandle is invoked in its own top-level mutation/task (so one OCC/read-write failure for one replay doesn't rollback the whole batch). Concretely: stop awaiting ctx.runMutation directly inside this handler's transaction loop; instead schedule/dispatch each run of workflowHandle (the ctx.runMutation call) as an independent mutation/task (so each has its own try/catch and its own ctx.db.patch for runResult on failure), referencing handler, ctx.runMutation, workflowHandle and ctx.db.patch to locate where to detach these calls. Ensure each workflow's error handling remains per-workflow and updates runResult individually.

src/component/taskQueue.ts (1)
174-194: ⚠️ Potential issue | 🟠 Major
Validate numShards before bumping executor epoch.
numShards <= 0 (or non-integer) currently advances epoch and can stop existing executors without scheduling replacements.
💡 Proposed fix

```diff
   handler: async (ctx, { executorHandle, numShards }) => {
+    if (!Number.isInteger(numShards) || numShards < 1) {
+      throw new Error(`numShards must be a positive integer, got ${numShards}`);
+    }
     // Increment executor epoch — old executors with stale epochs will terminate.
     const existing = await ctx.db
       .query("executorEpoch")
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/component/taskQueue.ts` around lines 174 - 194, The handler currently bumps the executor epoch before validating numShards, which allows non-positive or non-integer values to retire executors without scheduling replacements; modify handler to validate numShards (ensure Number.isInteger(numShards) and numShards > 0) at the top and throw or return an error before touching the executorEpoch record if validation fails, then proceed to update/insert executorEpoch and call ctx.scheduler.runAfter for each shard using the validated numShards and epoch; reference the handler function, the executorEpoch DB query/patch/insert logic, and the ctx.scheduler.runAfter/executorHandle scheduling code when making this change.

src/client/index.ts (1)
486-490: ⚠️ Potential issue | 🟡 Minor
Log before dropping unflushed results after retry exhaustion.
This path clears pendingResults silently, which makes production triage much harder when flush repeatedly fails.
💡 Proposed fix

```diff
       if (retries >= MAX_FLUSH_RETRIES) {
         // Give up on remaining items — tasks stay in queue,
         // next executor run will re-process them.
+        console.error(
+          `Executor shard ${shard}: dropping ${pendingResults.length} unflushed results after ${retries} retries (max ${MAX_FLUSH_RETRIES})`,
+        );
         pendingResults.length = 0;
         break;
       }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/client/index.ts` around lines 486 - 490, Before clearing pendingResults when retries >= MAX_FLUSH_RETRIES, add a log statement that records the fact you're dropping items: include retries, MAX_FLUSH_RETRIES, the number of items being dropped (pendingResults.length) and any available last error/flush failure reason; place this log immediately before the line that sets pendingResults.length = 0 inside the same retry/flush loop so operators can triage. Use the module's existing logger (e.g., logger or processLogger) so the message is emitted consistently with other logs and reference pendingResults, retries and MAX_FLUSH_RETRIES in the message.

src/component/workflow.ts (1)
270-291: ⚠️ Potential issue | 🟠 Major
creationTimeBuckets still performs an unbounded scan for a workflow name.
This endpoint walks all matching rows in one call, which is risky at higher volumes. Please switch to a paginated/batched approach (similar to countByNamePage) for large histories.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/component/workflow.ts` around lines 270 - 291, creationTimeBuckets currently iterates all matching workflows in a single unbounded scan; change it to a paginated/batched scan similar to countByNamePage by using ctx.db.query("workflows").withIndex("name").order("desc") but calling .limit(pageSize).start(cursor) (or whatever pagination API your DB wrapper exposes) in a loop, updating the cursor each iteration and breaking when no more results, and for each page accumulate bucket counts (compute bucket = Math.floor((wf._creationTime - createdAfter) / bucketMs) * (bucketMs / 1000) and increment Map entries) before returning the sorted array; keep the same return shape and parameter handling in creationTimeBuckets and reuse page size / cursor logic consistent with countByNamePage to avoid unbounded scans.
🧹 Nitpick comments (4)
src/component/pool.ts (1)
97-97: Avoid hard-coding logger level in runtime handlers.
Line 97, Line 257, and Line 298 always use DEFAULT_LOG_LEVEL, which can hide diagnostics when operators configure higher verbosity. Prefer sourcing the configured level (context/config) before falling back.
Also applies to: 257-257, 298-299
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/component/pool.ts` at line 97, The code is hard-coding DEFAULT_LOG_LEVEL when creating loggers (via createLogger(DEFAULT_LOG_LEVEL)), which ignores operator-configured verbosity; change each creation site (the createLogger call near the pool top and the other two occurrences) to first obtain the configured level from the runtime context or config (e.g., ctx.logLevel, context.getConfig('logLevel') or similar) and pass that value into createLogger, falling back to DEFAULT_LOG_LEVEL only if the configured value is absent; update all three locations (the createLogger/DEFAULT_LOG_LEVEL usages) so they use the resolved configuredLevel variable instead of the literal DEFAULT_LOG_LEVEL.

src/component/workflow.ts (1)
334-340: timelinePage does N+1 full steps.collect() queries.
Fetching all steps for every workflow in-page can get expensive quickly. Consider limiting step payload (or paginating steps) for this view.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/component/workflow.ts` around lines 334 - 340, The timelinePage code issues N+1 DB calls by running ctx.db.query("steps").withIndex("workflow", q => q.eq("workflowId", wf._id)).collect() for each wf in filtered; replace this with a single batched query that retrieves steps for all workflows in one call (e.g., collect all workflow IDs from filtered, run ctx.db.query("steps").withIndex("workflow", q => q.in("workflowId", ids)).collect()), then group the returned step docs by workflowId and attach the limited/paginated slice to each workflow; also reduce payload by selecting only needed fields (or applying a per-workflow limit) when constructing the step lists for timelinePage.

src/client/index.ts (1)
165-169: Reject duplicate executor action names instead of silently overriding.
A second registration with the same name currently overwrites the previous handler in executorActionHandlers.
💡 Proposed fix

```diff
     // Store handler for executor to call, register name for
     // batch action detection in step.ts, return a dummy action placeholder.
+    if (this.executorActionHandlers.has(name)) {
+      throw new Error(`Duplicate executor action registration: ${name}`);
+    }
     this.executorActionHandlers.set(name, opts.handler);
     this.batchActionNames.add(name);
```
Verify each finding against the current code and only fix it if needed. In `@src/client/index.ts` around lines 165 - 169, Check for an existing registration before inserting: in the code that currently does this.executorActionHandlers.set(name, opts.handler) and this.batchActionNames.add(name), first test this.executorActionHandlers.has(name) and reject the duplicate (throw an Error or return a failure) instead of overwriting; if rejecting, do not modify this.batchActionNames and include the duplicate name in the error message so callers can diagnose which action is duplicated.

src/component/taskQueue.ts (1)
56-57: Task queue handlers bypass configurable log levels.
Using createLogger(DEFAULT_LOG_LEVEL) here ignores configured runtime log level behavior used elsewhere.
💡 Proposed fix

```diff
-import { createLogger, DEFAULT_LOG_LEVEL } from "./logging.js";
+import { getDefaultLogger } from "./utils.js";
 // ...
-const console = createLogger(DEFAULT_LOG_LEVEL);
+const console = await getDefaultLogger(ctx);
```

Also applies to: 230-231, 385-386, 421-422
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/component/taskQueue.ts` around lines 56 - 57, Handlers create a logger with a hardcoded DEFAULT_LOG_LEVEL via the expression createLogger(DEFAULT_LOG_LEVEL), which bypasses the runtime-configured log level; replace those calls to use the actual configured runtime log level (e.g. read from the existing configuration or a central logger) instead of DEFAULT_LOG_LEVEL — for example, call createLogger(getConfiguredLogLevel()) or reuse the shared logger instance injected into the module; update every occurrence of createLogger(DEFAULT_LOG_LEVEL) in taskQueue.ts (including the instances around the earlier noted positions) so all task queue handlers honor the configured log level.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (5)
example/convex/_generated/api.d.ts is excluded by !**/_generated/**
package-lock.json is excluded by !**/package-lock.json
src/component/_generated/api.ts is excluded by !**/_generated/**
src/component/_generated/component.ts is excluded by !**/_generated/**
src/component/_generated/server.ts is excluded by !**/_generated/**
📒 Files selected for processing (13)
example/convex/convex.config.ts
example/convex/schema.ts
package.json
src/client/index.ts
src/client/step.ts
src/client/workflowMutation.ts
src/component/coordinator.ts
src/component/event.ts
src/component/journal.ts
src/component/pool.ts
src/component/schema.ts
src/component/taskQueue.ts
src/component/workflow.ts
```ts
while (flushLoopRunning) {
  await new Promise((r) => setTimeout(r, FLUSH_INTERVAL_MS));
  if (pendingResults.length > 0) {
    await flush();
  }
```
Replay rescue queue is not retried unless new step results arrive.
flushLoop skips flush() when pendingResults is empty, so replayRescueQueue can sit idle until shutdown/final safety-net scheduling.
💡 Proposed fix
const flushLoop = (async () => {
while (flushLoopRunning) {
await new Promise((r) => setTimeout(r, FLUSH_INTERVAL_MS));
- if (pendingResults.length > 0) {
+ if (pendingResults.length > 0 || replayRescueQueue.length > 0) {
await flush();
}
}
})();

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```ts
const flushLoop = (async () => {
  while (flushLoopRunning) {
    await new Promise((r) => setTimeout(r, FLUSH_INTERVAL_MS));
    if (pendingResults.length > 0 || replayRescueQueue.length > 0) {
      await flush();
    }
  }
})();
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/client/index.ts` around lines 346 - 350, The flushLoop currently only
calls flush() when pendingResults.length > 0, so replayRescueQueue can stall
until new results arrive; modify flushLoop (the function containing the while
(flushLoopRunning) loop) to either call flush() unconditionally each interval or
explicitly invoke the logic that processes replayRescueQueue when pendingResults
is empty (e.g., call replayRescueQueue or a helper like flushReplayRescueQueue
inside the loop), ensuring FLUSH_INTERVAL_MS still governs the retry cadence so
rescue items are retried periodically even if pendingResults is empty.
```ts
// Retry config only applies to actions — queries and mutations
// are deterministic and retried automatically by Convex on OCC.
const actionRetryConfig =
  step.functionType === "action"
    ? (stepArgs.retry === false
        ? undefined
        : typeof stepArgs.retry === "object"
          ? stepArgs.retry
          : DEFAULT_QM_RETRY)
    : undefined;
switch (step.functionType) {
```
Executor-routed queries/mutations are missing retry policy metadata.
Line 195 through Line 205 and Line 218 through Line 228 enqueue query/mutation tasks without retry, so transient failures can fail immediately in executor mode.
🔧 Proposed fix (preserve per-step override with default fallback)
- // Retry config only applies to actions — queries and mutations
- // are deterministic and retried automatically by Convex on OCC.
- const actionRetryConfig =
- step.functionType === "action"
- ? (stepArgs.retry === false
- ? undefined
- : typeof stepArgs.retry === "object"
- ? stepArgs.retry
- : DEFAULT_QM_RETRY)
- : undefined;
+ const retryConfig =
+ stepArgs.retry === false
+ ? undefined
+ : typeof stepArgs.retry === "object"
+ ? stepArgs.retry
+ : DEFAULT_QM_RETRY;
@@
case "query": {
if (workflow.executorShards) {
const shard = shardForWorkflow(workflow._id as string, workflow.executorShards);
await ctx.db.insert("taskQueue", {
shard,
functionType: "query",
handle: step.handle,
args: step.args,
stepId,
workflowId: workflow._id,
generationNumber,
+ ...(retryConfig ? { retry: retryConfig } : {}),
});
workId = `executor:${stepId}`;
@@
case "mutation": {
if (workflow.executorShards) {
const shard = shardForWorkflow(workflow._id as string, workflow.executorShards);
await ctx.db.insert("taskQueue", {
shard,
functionType: "mutation",
handle: step.handle,
args: step.args,
stepId,
workflowId: workflow._id,
generationNumber,
+ ...(retryConfig ? { retry: retryConfig } : {}),
});
workId = `executor:${stepId}`;
@@
case "action": {
@@
await ctx.db.insert("taskQueue", {
@@
- retry: actionRetryConfig,
+ ...(retryConfig ? { retry: retryConfig } : {}),
});

Also applies to: 195-205, 218-228
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/component/journal.ts` around lines 183 - 193, Queries and mutations
enqueued in the executor branches are missing retry metadata; add a
qmRetryConfig computed like actionRetryConfig but applied when step.functionType
is "query" or "mutation" (respecting stepArgs.retry === false, object override,
otherwise DEFAULT_QM_RETRY) and pass that qmRetryConfig into the enqueue calls
inside the switch (step.functionType) cases for "query" and "mutation" so
executor-routed tasks get the same per-step override + default retry behavior.
Overall I'm still very nervous about the error handling and recovery when
the actions fail - as well as exceeding memory limits and transaction limits for parallel mutations being run.
Loading 1MB+ of context for an LLM call is not unreasonable.
I'm currently set as the reviewer but let me know if you feel it's actually watertight enough to consider merging. Given how little code is deleted vs. added, I suspect this could be a new component, similar to the Workpool executor mode.
I think this one is more valuable than the workpool version, or at least further along. I suspect it is useful as a template for agents to build more specific modes on top of. No need for a review. I think town is testing something along these lines.

What is Executor Mode?
Executor mode is an alternative execution backend for workflows that trades the workpool's per-step scheduling for long-running executor actions that poll a shared task queue. It's designed for high-throughput, IO-bound workloads — think hundreds or thousands of workflows running concurrently, each making API calls (LLM providers, external services, etc.).
When to use it
Use executor mode when:
Stick with the standard workpool when:
How it works
Standard (workpool) path:
Each step = 1 scheduled function + 1 completion mutation + 1 workflow replay. At scale, this means thousands of scheduler entries and DB writes.
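To put rough numbers on that overhead, here is a back-of-envelope sketch. The 20,000-workflow, 4-step benchmark and the 50-result flush batch are figures taken from the commit notes above; the counts are illustrative, not measured:

```typescript
// Back-of-envelope scale comparison (assumed figures: 20k workflows with
// 4 steps each, results flushed 50 at a time in executor mode).
const workflows = 20_000;
const stepsPerWorkflow = 4;
const steps = workflows * stepsPerWorkflow; // 80,000 steps total

// Workpool path: one scheduled function, one completion mutation, and one
// workflow replay per step.
const workpoolCalls = steps * 3; // 240,000 function executions

// Executor path: step results recorded in batches of 50 per mutation.
const executorRecordMutations = Math.ceil(steps / 50); // 1,600 mutations

console.log({ steps, workpoolCalls, executorRecordMutations });
```

The exact counts shift with retries and replay batching, but the roughly two-orders-of-magnitude gap in write traffic is the point.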
Executor path:
Long-running executor actions (one per shard) continuously poll for work, execute tasks with bounded concurrency, and flush results in batches. This eliminates:
Shard assignment is deterministic — all steps for a given workflow hash to the same shard, so result recording for one workflow doesn't OCC-conflict with another.
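The deterministic assignment can be sketched as a pure hash of the workflow ID (a minimal sketch — the actual hash inside the component's `shardForWorkflow()` may differ; FNV-1a is shown here as an assumed example):

```typescript
// Hypothetical sketch: map a workflow ID to a stable shard index.
// What matters is determinism — every step of a given workflow must
// land on the same shard, so result recording for different workflows
// never contends on the same documents.
function shardForWorkflow(workflowId: string, numShards: number): number {
  // FNV-1a 32-bit hash over the ID's UTF-16 code units.
  let hash = 0x811c9dc5;
  for (let i = 0; i < workflowId.length; i++) {
    hash ^= workflowId.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193);
  }
  // >>> 0 converts to unsigned before taking the modulus.
  return (hash >>> 0) % numShards;
}
```

Because the mapping depends only on the ID, the same workflow always hashes to the same shard, on every executor, with no coordination.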
Executor lifecycle: Each executor runs for ~8 minutes, then performs a zero-gap handoff — it schedules a successor, keeps claiming tasks until the successor is ready, then yields. No tasks are dropped during the transition.
Changes in this PR
Schema (`src/component/schema.ts`)

The previous `taskQueue` table only supported actions (it had `actionName: v.string()`). To route queries and mutations through the executor too, we replaced it with:
- `functionType` (query/mutation/action) — so the executor knows how to dispatch each task
- `handle` — a FunctionHandle string for queries/mutations, or a registered action name for actions
- `retry` config — so per-step retry settings survive the trip from `journal.ts` through the task queue to the executor

New `executorEpoch` and `executorHandoff` tables manage executor lifecycle — epoch tracking lets us safely restart all executors (old ones detect they're stale and drain), and handoff documents coordinate zero-gap transitions between predecessor and successor executors.

Task queue (`src/component/taskQueue.ts`)

These functions form the executor's interface to the database. The key design constraint is minimizing OCC conflicts at high throughput:
- `claimTasks` — a query (not a mutation), so reading tasks from a shard doesn't write anything. This means multiple executors can read the same shard without conflicting. Task deletion happens later, atomically with result recording.
- `recordResultBatch` — records results for up to 50 steps in a single mutation (instead of one mutation per step). Returns replay candidates so the executor can trigger workflow progression. Batching here is the biggest throughput win — 50 steps = 1 mutation instead of 50.
- `replayIfReady` — a separate per-workflow mutation (not inlined into `recordResultBatch`), so a replay failure for one workflow doesn't roll back result recording for others.
- `startExecutors` — increments the epoch and schedules one executor action per shard. The epoch ensures old executors from a previous `startExecutors` call drain gracefully.
- `handoff` / `getHandoff` — coordinate the predecessor/successor transition so there's never a gap where tasks sit unclaimed.

Step routing (`src/component/journal.ts`)

Previously, only action steps with a `batchActionName` could route through the executor. Queries and mutations always went through the workpool, meaning they paid the full scheduling overhead even when executors were available. Now, when
`workflow.executorShards` is set:
- queries and mutations → insert into `taskQueue` with the step's retry config (default: 4 attempts, 125ms initial backoff, base 2). These are safe to retry since queries are read-only and mutations are transactional.
- actions with a `batchActionName` → insert into `taskQueue` (no retry by default, since actions may have side effects).
- actions without a `batchActionName` → continue to run through the workpool.
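The routing rules above can be sketched as a pure decision function (the `Step`/`Route` shapes and `routeStep` name are illustrative, not the actual `journal.ts` types):

```typescript
// Hypothetical step shape, for illustration only.
type Step = {
  functionType: "query" | "mutation" | "action";
  batchActionName?: string;
};

type RetryConfig = { maxAttempts: number; initialBackoffMs: number; base: number };

type Route =
  | { target: "taskQueue"; retry: RetryConfig | null }
  | { target: "workpool" };

// Mirrors the routing described above: queries/mutations go to the task
// queue with the default retry config when executorShards is set; actions
// only go there when they have a batchActionName, with no retry by default
// since they may have side effects.
function routeStep(step: Step, executorShards: number | undefined): Route {
  if (!executorShards) return { target: "workpool" };
  if (step.functionType === "query" || step.functionType === "mutation") {
    return {
      target: "taskQueue",
      retry: { maxAttempts: 4, initialBackoffMs: 125, base: 2 },
    };
  }
  if (step.batchActionName) return { target: "taskQueue", retry: null };
  return { target: "workpool" };
}
```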
Extracted a `shardForWorkflow()` helper to deduplicate the hash logic that was previously inlined in the action case.

Executor (`src/client/index.ts`)

The executor previously only knew how to call registered action handlers. Now it dispatches by function type:
- `"query"` → `ctx.runQuery(handle, args)` — calls the query function via its FunctionHandle
- `"mutation"` → `ctx.runMutation(handle, args)` — calls the mutation function via its FunctionHandle
- `"action"` → looks up the handler in the registered handlers map and calls it inline (unchanged behavior, but now keyed by `handle` instead of `actionName`)

Added a retry loop wrapping all execution. The workpool has built-in retries (5 attempts, exponential backoff), so without this the executor was a regression for transient failures. The loop reads `maxAttempts`, `initialBackoffMs`, and `base` from the task's retry config (set by `journal.ts`). `Math.max(maxAttempts, 1)` guards against misconfigured zero-attempt retries.
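The retry loop can be sketched standalone like this (a simplified version under the defaults stated above; `withRetries` is a hypothetical name, not the executor's actual function):

```typescript
type RetryConfig = { maxAttempts: number; initialBackoffMs: number; base: number };

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Runs fn up to maxAttempts times with exponential backoff between
// attempts. Math.max(..., 1) guards against a misconfigured
// zero-attempt config, matching the guard described above.
async function withRetries<T>(fn: () => Promise<T>, retry: RetryConfig): Promise<T> {
  const attempts = Math.max(retry.maxAttempts, 1);
  let lastError: unknown;
  for (let attempt = 0; attempt < attempts; attempt++) {
    try {
      return await fn();
    } catch (e) {
      lastError = e;
      if (attempt < attempts - 1) {
        // With the defaults (125ms, base 2): 125ms, 250ms, 500ms, ...
        await sleep(retry.initialBackoffMs * Math.pow(retry.base, attempt));
      }
    }
  }
  throw lastError;
}
```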
Added a `console.error` when replay retries are exhausted — without this, a workflow could get silently stuck with all steps completed but no replay triggered.

What doesn't change

- `step.ts`, `workflowContext.ts` — the step request/response protocol and `runQuery`/`runMutation`/`runAction` signatures are untouched. Workflows don't need any code changes to benefit from executor routing.
- When `executorShards` is not set, behavior is identical to before.
- Events (`awaitEvent`/`sendEvent`) and nested workflows — handled before the function-step dispatch, completely untouched.
- `recordResultBatch`/`replayIfReady` — don't reference function type, and work for all step kinds.

Usage
Setup
Defining workflows with parallel steps
Starting executors and workflows
Tuning: executor knobs and how to pick them
The executor has several constants that control its behavior. Here's what each one does, the tradeoffs, and how to think about picking values.
`executorShards` (constructor option)

What: Number of executor actions running in parallel, each owning a slice of the task queue.
Tradeoff: More shards = more parallelism and lower latency (fewer tasks per shard), but more running actions and more DB polling. Fewer shards = less overhead, but tasks queue up longer per shard.
How to pick: `executorShards` = number of persistent actions you want running.

`CLAIM_LIMIT` (800)

What: Maximum number of tasks to read per poll from one shard.
Tradeoff: Higher = fewer round-trips when there's a burst of work, but larger read transactions. Lower = more responsive to new work but more DB queries.
How to pick: 800 is a good default. Only lower it if you see read transaction limits being hit (very large task args).
`MAX_CONCURRENCY` (200)

What: Maximum tasks executing concurrently within a single executor action.
Tradeoff: Higher = more throughput per shard for IO-bound work, but more memory and more in-flight tasks that need flushing if the executor crashes. Lower = less risk, but underutilizes IO parallelism.
How to pick:
`POLL_BACKOFF_MS` (500) / `POLL_BACKOFF_ACTIVE_MS` (150)

What: Sleep duration between polls. Uses the shorter `ACTIVE` interval when tasks are in-flight, the longer one when the shard is idle.

Tradeoff: Shorter = lower latency (tasks picked up faster), but more DB reads. Longer = less DB load, but tasks sit in the queue longer.
How to pick: The defaults work well. If you need sub-100ms pickup latency, lower `POLL_BACKOFF_ACTIVE_MS` to 50–100. If you're cost-sensitive and latency doesn't matter, raise `POLL_BACKOFF_MS` to 1000–2000.

`FLUSH_BATCH_SIZE` (50) / `FLUSH_INTERVAL_MS` (500)

What: Results are buffered and flushed in batches. `BATCH_SIZE` controls max items per flush mutation, `INTERVAL_MS` controls how often the flush loop runs.

Tradeoff: Larger batches = fewer mutations (more efficient), but higher latency before workflow replay triggers. Smaller batches = faster workflow progression, but more mutation overhead.
How to pick: 50 is a good default. If your workflows have many sequential steps and you need them to progress fast, lower to 10–20. If you have mostly parallel steps and throughput matters more, raise to 100.
`RESCHEDULE_MS` (8 min)

What: How long an executor runs before handing off to a successor (Convex actions time out at 10 min).
Tradeoff: Longer = fewer handoffs and less overhead, but less margin before the 10-min timeout. Shorter = more frequent restarts, but safer.
How to pick: 8 minutes leaves 2 minutes of margin for the handoff protocol. Only change this if you're seeing timeout issues (lower to 6–7 min) or want to minimize handoff frequency (raise to 9 min, but risky).
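Taken together, the knobs form a small config surface. A sketch of the defaults discussed above as a constants object (the names match the constants in this section; grouping them into one object is illustrative):

```typescript
// Default executor tuning constants, with the values described above.
const EXECUTOR_DEFAULTS = {
  CLAIM_LIMIT: 800,             // max tasks read per poll from one shard
  MAX_CONCURRENCY: 200,         // max tasks executing concurrently per executor
  POLL_BACKOFF_MS: 500,         // sleep between polls when the shard is idle
  POLL_BACKOFF_ACTIVE_MS: 150,  // sleep between polls when tasks are in-flight
  FLUSH_BATCH_SIZE: 50,         // max results per recordResultBatch mutation
  FLUSH_INTERVAL_MS: 500,       // how often the flush loop runs
  RESCHEDULE_MS: 8 * 60 * 1000, // run time before the zero-gap handoff
} as const;

// Poll cadence selection: use the shorter interval while work is in flight.
function pollBackoff(inFlight: number): number {
  return inFlight > 0
    ? EXECUTOR_DEFAULTS.POLL_BACKOFF_ACTIVE_MS
    : EXECUTOR_DEFAULTS.POLL_BACKOFF_MS;
}
```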
Design: how the executor avoids common pitfalls
OCC (Optimistic Concurrency Control) conflicts
At scale, OCC is the #1 throughput killer. The executor avoids it at every level:
- Shard affinity: `shardForWorkflow()` hashes the workflow ID to a shard number. All steps for one workflow go to the same shard, so `recordResultBatch` for workflow A never writes to the same documents as workflow B. Different shards = different executors = no cross-workflow OCC conflicts.
- Per-workflow replays: when `recordResultBatch` records step results and deletes task queue entries, it returns a list of workflows that might be ready to replay. Each workflow gets its own tiny `replayIfReady` mutation, so a replay failure for workflow A doesn't roll back results for workflow B.
- Read-only claiming: `claimTasks` is a query (not a mutation). Multiple executors can read the same shard without conflicting. Deletion happens later in `recordResultBatch`, atomically with result recording.

Batched mutations
Instead of one mutation per step result (the workpool's `onComplete` pattern), the executor buffers results in memory and flushes them in batches:
- `processTask` completes → result pushed to `pendingResults[]`
- a flush loop runs every `FLUSH_INTERVAL_MS` (500ms)
- each flush takes up to `FLUSH_BATCH_SIZE` (50) items and calls `recordResultBatch` — one mutation that records 50 step results, deletes 50 task queue entries, and returns replay candidates

This means 50 steps = 1 mutation instead of 50. At 1000 workflows × 4 steps each, that's ~80 mutations instead of ~4000.
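The buffer-and-flush design can be sketched as follows (`ResultBuffer` and `StepResult` are hypothetical names; `recordBatch` stands in for the `recordResultBatch` mutation):

```typescript
type StepResult = { stepId: string; workflowId: string; result: unknown };

// Stand-in for the recordResultBatch mutation: records a batch of
// results and returns workflow IDs that may be ready to replay.
type RecordBatch = (batch: StepResult[]) => Promise<string[]>;

// Buffers results in memory and flushes up to batchSize per mutation,
// mirroring the pendingResults[] / FLUSH_BATCH_SIZE design above.
class ResultBuffer {
  private pendingResults: StepResult[] = [];
  constructor(private recordBatch: RecordBatch, private batchSize = 50) {}

  push(result: StepResult) {
    this.pendingResults.push(result);
  }

  // One flush pass: drain the buffer in batchSize chunks, one mutation
  // per chunk, collecting replay candidates from each.
  async flush(): Promise<string[]> {
    const replayCandidates: string[] = [];
    while (this.pendingResults.length > 0) {
      const batch = this.pendingResults.splice(0, this.batchSize);
      replayCandidates.push(...(await this.recordBatch(batch)));
    }
    return replayCandidates;
  }
}
```

With 200 buffered results and a batch size of 50, one `flush()` pass issues 4 mutations — the same batching arithmetic behind the ~80-vs-~4000 figure above.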
Timeout avoidance
Convex actions have a 10-minute hard timeout. The executor avoids hitting it with a graceful handoff protocol: after `RESCHEDULE_MS` (8 min), the executor creates a handoff document and schedules a successor action, then keeps claiming tasks until the successor is ready before yielding.

This gives a 2-minute safety margin and ensures zero task gap — at no point are tasks sitting in the queue without an executor claiming them.
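The predecessor's per-poll decision can be sketched as a small predicate (hypothetical shapes and names; the real coordination goes through the `handoff`/`getHandoff` functions):

```typescript
// Stand-in for the handoff document the successor updates when it starts.
type HandoffDoc = { successorReady: boolean };

// Predecessor's decision on each poll: before the RESCHEDULE_MS deadline,
// keep claiming normally; after it, keep claiming only until the successor
// has signaled readiness. This overlap is what guarantees no window where
// tasks sit in the queue unclaimed.
function shouldKeepClaiming(
  startedAtMs: number,
  nowMs: number,
  rescheduleMs: number,
  handoff: HandoffDoc | null,
): boolean {
  const pastDeadline = nowMs - startedAtMs >= rescheduleMs;
  if (!pastDeadline) return true; // normal operation
  return !(handoff && handoff.successorReady); // overlap until successor is ready
}
```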
Safe restarts with epochs
Calling `startExecutors()` multiple times is safe. Each call increments the `executorEpoch` counter and schedules a fresh set of executors with the new epoch. Every executor checks `checkEpoch()` on each poll iteration — if its epoch doesn't match the current one, it enters the drain path: finishes any in-flight tasks, flushes results, and exits. This means:
- a stale set of executors from a previous `startExecutors` call can be replaced by simply calling it again
- the brief overlap between old and new executors is harmless: `claimTasks` is read-only, and `recordResultBatch` checks `generationNumber` + `inProgress` to prevent duplicate result recording

Memory pressure
Currently, the executor does not monitor memory usage. It relies on `MAX_CONCURRENCY` to bound the number of in-flight tasks. If tasks have very large args or return values, memory could grow unbounded.

Mitigation today: Keep `MAX_CONCURRENCY` reasonable (200 default) and keep task args/results small. If a task returns a 10MB object, 200 concurrent tasks = ~2GB in memory.

Future work: Add `process.memoryUsage()` checks to pause claiming when heap usage exceeds a threshold (see Future Work below).

Retry defaults
Future work
- Memory monitoring: check `process.memoryUsage().heapUsed` periodically and pause claiming when heap usage exceeds a threshold. Could also trigger early handoff if memory is critically high.
- Mutation write limits: `recordResultBatch` writes multiple step results + deletes task queue entries in a single mutation. At high batch sizes with large return values, this could exceed Convex's per-mutation write limits. Future work: track cumulative bytes written per flush batch and split into multiple mutations when approaching the limit.
- … (`startSteps` mutation).
- Per-step retry opt-in: `workflow.action("name", { ..., retry: { maxAttempts: 3 } })` to opt specific actions into retry through the executor.

Visualizing workflows
See the companion PR (#209) for a timeline visualization dashboard that shows workflow execution in real-time. It serves an HTTP endpoint that renders per-workflow timelines — useful for comparing standard (workpool) vs executor mode side by side, and for spotting bottlenecks like uneven shard distribution or slow steps.