feat: executor mode with query/mutation routing and per-step retries #210

Open
sethconvex wants to merge 19 commits into main from executor-mode-core

Conversation


sethconvex (Contributor) commented Feb 24, 2026

What is Executor Mode?

Executor mode is an alternative execution backend for workflows that trades the workpool's per-step scheduling for long-running executor actions that poll a shared task queue. It's designed for high-throughput, IO-bound workloads — think hundreds or thousands of workflows running concurrently, each making API calls (LLM providers, external services, etc.).

When to use it

Use executor mode when:

  • You're running many concurrent workflows (100s–1000s+)
  • Steps are IO-bound (API calls, LLM requests) rather than CPU-bound
  • You're hitting Convex's 512 concurrent action limit
  • You want lower per-step latency (no scheduling hop between steps)

Stick with the standard workpool when:

  • You have a small number of workflows
  • You need fine-grained per-step scheduling options (runAt, runAfter)
  • You want the workpool's built-in retry and backoff for actions

How it works

Standard (workpool) path:

step starts → workpool schedules a separate function invocation →
function runs → onComplete mutation records result → workflow replays

Each step = 1 scheduled function + 1 completion mutation + 1 workflow replay. At scale, this means thousands of scheduler entries and DB writes.

Executor path:

step starts → task inserted into sharded taskQueue table →
executor polls shard, picks up task → executor runs it inline →
executor batches result into recordResultBatch mutation → workflow replays

Long-running executor actions (one per shard) continuously poll for work, execute tasks with bounded concurrency, and flush results in batches. This eliminates:

  • Per-step function scheduling overhead
  • Per-step onComplete callback invocations
  • The 512 concurrent action limit (actions run inline in the executor)

Shard assignment is deterministic — all steps for a given workflow hash to the same shard, so result recording for one workflow doesn't OCC-conflict with another.

Executor lifecycle: Each executor runs for ~8 minutes, then performs a zero-gap handoff — it schedules a successor, keeps claiming tasks until the successor is ready, then yields. No tasks are dropped during the transition.

Changes in this PR

Schema (src/component/schema.ts)

The previous taskQueue table only supported actions (it had actionName: v.string()). To route queries and mutations through the executor too, we replaced it with:

  • functionType (query/mutation/action) — so the executor knows how to dispatch each task
  • handle — a FunctionHandle string for queries/mutations, or a registered action name for actions
  • Optional retry config — so per-step retry settings survive the trip from journal.ts through the task queue to the executor

New executorEpoch and executorHandoff tables manage executor lifecycle — epoch tracking lets us safely restart all executors (old ones detect they're stale and drain), and handoff documents coordinate zero-gap transitions between predecessor and successor executors.
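For concreteness, here's a minimal sketch of what the reworked taskQueue table could look like in Convex schema terms. The functionType, handle, and retry fields and the by_stepId index come from this PR; the remaining field names and the by_shard index are illustrative guesses, not the exact schema.

import { defineTable } from "convex/server";
import { v } from "convex/values";

// Sketch only: field names beyond functionType/handle/retry are illustrative.
export const taskQueue = defineTable({
  shard: v.number(), // deterministic hash of the workflow ID
  stepId: v.id("steps"),
  workflowId: v.id("workflows"),
  functionType: v.union(
    v.literal("query"),
    v.literal("mutation"),
    v.literal("action"),
  ),
  // FunctionHandle string for queries/mutations; registered action name for actions.
  handle: v.string(),
  args: v.any(),
  retry: v.optional(
    v.object({
      maxAttempts: v.number(),
      initialBackoffMs: v.number(),
      base: v.number(),
    }),
  ),
})
  .index("by_shard", ["shard"])
  .index("by_stepId", ["stepId"]);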

Task queue (src/component/taskQueue.ts)

These functions form the executor's interface to the database. The key design constraint is minimizing OCC conflicts at high throughput:

  • claimTasks — a query (not mutation) so reading tasks from a shard doesn't write anything. This means multiple executors can read the same shard without conflicting. Task deletion happens later, atomically with result recording.
  • recordResultBatch — records results for up to 50 steps in a single mutation (instead of one mutation per step). Returns replay candidates so the executor can trigger workflow progression. Batching here is the biggest throughput win — 50 steps = 1 mutation instead of 50.
  • replayIfReady — a separate per-workflow mutation (not inlined into recordResultBatch) so a replay failure for one workflow doesn't roll back result recording for others.
  • startExecutors — increments the epoch and schedules one executor action per shard. The epoch ensures old executors from a previous startExecutors call drain gracefully.
  • Handoff protocol (handoff, getHandoff) — coordinates the predecessor/successor transition so there's never a gap where tasks sit unclaimed.
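To make the read-only claimTasks design (first bullet above) concrete, here's a minimal sketch of what such a query could look like. The actual implementation may differ; the by_shard index name is an assumption.

import { query } from "./_generated/server.js";
import { v } from "convex/values";

// Sketch: a pure read over one shard's tasks. Nothing is written here, so
// concurrent executors polling the same shard cannot OCC-conflict. Deletion
// happens later in recordResultBatch, atomically with result recording.
export const claimTasks = query({
  args: { shard: v.number(), limit: v.number() },
  handler: async (ctx, { shard, limit }) => {
    return await ctx.db
      .query("taskQueue")
      .withIndex("by_shard", (q) => q.eq("shard", shard))
      .take(limit);
  },
});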

Step routing (src/component/journal.ts)

Previously, only action steps with batchActionName could route through the executor. Queries and mutations always went through the workpool, meaning they paid the full scheduling overhead even when executors were available.

Now, when workflow.executorShards is set:

  • query/mutation steps → insert into taskQueue with the step's retry config (default: 4 attempts, 125ms initial backoff, base 2). These are safe to retry since queries are read-only and mutations are transactional.
  • action steps with batchActionName → insert into taskQueue (no retry by default, since actions may have side effects).
  • Everything else → workpool (unchanged). This preserves the existing behavior for non-executor workflows and for action steps without batchActionName.

Extracted shardForWorkflow() helper to deduplicate the hash logic that was previously inlined in the action case.
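The hash only needs to be deterministic and roughly uniform; any stable string hash works. A sketch (the helper in journal.ts may use a different hash function):

// Sketch: deterministic shard assignment. Every step of a given workflow
// hashes to the same shard, so per-workflow writes never cross shards.
function shardForWorkflow(workflowId: string, numShards: number): number {
  let hash = 0;
  for (let i = 0; i < workflowId.length; i++) {
    hash = (hash * 31 + workflowId.charCodeAt(i)) | 0; // keep in 32-bit range
  }
  return Math.abs(hash) % numShards;
}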

Executor (src/client/index.ts)

The executor previously only knew how to call registered action handlers. Now it dispatches by function type:

  • "query"ctx.runQuery(handle, args) — calls the query function via its FunctionHandle
  • "mutation"ctx.runMutation(handle, args) — calls the mutation function via its FunctionHandle
  • "action" → looks up the handler in the registered handlers map and calls it inline (unchanged behavior, but now keyed by handle instead of actionName)

Added a retry loop wrapping all execution. The workpool has built-in retries (5 attempts, exponential backoff), so without this the executor was a regression for transient failures. The loop reads maxAttempts, initialBackoffMs, and base from the task's retry config (set by journal.ts). Math.max(maxAttempts, 1) guards against misconfigured zero-attempt retries.
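A minimal sketch of that retry wrapper, assuming the config shape described above:

// Sketch: exponential backoff driven by the task's retry config.
async function runWithRetry<T>(
  run: () => Promise<T>,
  retry?: { maxAttempts: number; initialBackoffMs: number; base: number },
): Promise<T> {
  // Math.max guards against a misconfigured maxAttempts of 0.
  const maxAttempts = Math.max(retry?.maxAttempts ?? 1, 1);
  const initialBackoffMs = retry?.initialBackoffMs ?? 125;
  const base = retry?.base ?? 2;
  let lastError: unknown;
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      return await run();
    } catch (e) {
      lastError = e;
      if (attempt < maxAttempts - 1) {
        const backoffMs = initialBackoffMs * Math.pow(base, attempt);
        await new Promise((resolve) => setTimeout(resolve, backoffMs));
      }
    }
  }
  throw lastError;
}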

Added console.error when replay retries are exhausted — without this, a workflow could get silently stuck with all steps completed but no replay triggered.

What doesn't change

  • step.ts, workflowContext.ts — step request/response protocol and runQuery/runMutation/runAction signatures are untouched. Workflows don't need any code changes to benefit from executor routing.
  • Non-executor workflows — the workpool path is fully preserved. If executorShards is not set, behavior is identical to before.
  • Events (awaitEvent/sendEvent) and nested workflows — handled before the function-step dispatch, completely untouched.
  • recordResultBatch / replayIfReady — don't reference function type, work for all step kinds.

Usage

Setup

import { v } from "convex/values";
import { mutation, internalMutation } from "./_generated/server";
import { WorkflowManager } from "@convex-dev/workflow";
import { api, components, internal } from "./_generated/api";

// Create a workflow manager with executor mode enabled
const workflow = new WorkflowManager(components.workflow, {
  executorShards: 40,  // number of executor actions to run in parallel
});

// Register actions to run inline in executors.
// These bypass the 512 concurrent action limit — they run inside
// the long-lived executor actions, not as separate invocations.
export const callLLM = workflow.action("callLLM", {
  args: { prompt: v.string() },
  handler: async (ctx, args) => {
    const response = await fetch("https://api.openai.com/...", { ... });
    return await response.json();
  },
});

export const fetchData = workflow.action("fetchData", {
  args: { url: v.string() },
  handler: async (ctx, args) => {
    const response = await fetch(args.url);
    return await response.json();
  },
});

// Create and export the executor action
export const executor = workflow.executor();
workflow.setExecutorRef(internal.index.executor);

Defining workflows with parallel steps

export const researchWorkflow = workflow.define({
  args: { topic: v.string() },
  handler: async (step, args) => {
    // Sequential: fetch initial data via a query (routed through executor automatically)
    const sources = await step.runQuery(api.research.getSources, { topic: args.topic });

    // Parallel: run multiple actions concurrently using Promise.all
    // All of these run inline in executors — no separate action invocations
    const [analysis1, analysis2, analysis3] = await Promise.all([
      step.runAction(internal.index.callLLM, { prompt: `Analyze ${sources[0]}` }),
      step.runAction(internal.index.callLLM, { prompt: `Analyze ${sources[1]}` }),
      step.runAction(internal.index.callLLM, { prompt: `Analyze ${sources[2]}` }),
    ]);

    // Sequential: save results via a mutation (also routed through executor)
    await step.runMutation(api.research.saveAnalysis, {
      topic: args.topic,
      results: [analysis1, analysis2, analysis3],
    });

    // Final summary
    const summary = await step.runAction(internal.index.callLLM, {
      prompt: `Summarize: ${analysis1}, ${analysis2}, ${analysis3}`,
    });

    return summary;
  },
});

Starting executors and workflows

// Call once to boot up all executor shards (e.g. from a setup mutation)
export const setup = internalMutation(async (ctx) => {
  await workflow.startExecutors(ctx);
});

// Start workflows as usual
export const startResearch = mutation(async (ctx, args: { topic: string }) => {
  const id = await workflow.start(ctx, internal.index.researchWorkflow, { topic: args.topic });
  return id;
});

Tuning: executor knobs and how to pick them

The executor has several constants that control its behavior. Here's what each one does, the tradeoffs, and how to think about picking values.

executorShards (constructor option)

What: Number of executor actions running in parallel, each owning a slice of the task queue.

Tradeoff: More shards = more parallelism and lower latency (fewer tasks per shard), but more running actions and more DB polling. Fewer shards = less overhead, but tasks queue up longer per shard.

How to pick:

  • Start with 40 for most workloads
  • If you're running 10K+ concurrent workflows with IO-bound steps, try 80–100
  • If you're running <100 workflows, 10–20 is fine
  • Each shard is one long-running action, so executorShards = number of persistent actions

CLAIM_LIMIT (800)

What: Maximum number of tasks to read per poll from one shard.

Tradeoff: Higher = fewer round-trips when there's a burst of work, but larger read transactions. Lower = more responsive to new work but more DB queries.

How to pick: 800 is a good default. Only lower it if you see read transaction limits being hit (very large task args).

MAX_CONCURRENCY (200)

What: Maximum tasks executing concurrently within a single executor action.

Tradeoff: Higher = more throughput per shard for IO-bound work, but more memory and more in-flight tasks that need flushing if the executor crashes. Lower = less risk, but underutilizes IO parallelism.

How to pick:

  • For IO-bound work (API calls, LLM requests): 100–300 is the sweet spot
  • For CPU-bound work: keep low (10–50), since Node.js is single-threaded
  • If tasks are very fast (<10ms), you can go higher (500+)
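Bounding in-flight tasks doesn't require anything exotic. Here's a sketch of one way to enforce MAX_CONCURRENCY with plain promise bookkeeping (not necessarily how the executor implements it):

// Sketch: cap concurrent tasks with a Set of in-flight promises.
const MAX_CONCURRENCY = 200;
const inFlight = new Set<Promise<void>>();

async function submit(task: () => Promise<void>): Promise<void> {
  while (inFlight.size >= MAX_CONCURRENCY) {
    await Promise.race(inFlight); // wait until any slot frees up
  }
  const p = task()
    .catch((err) => console.error("task failed", err))
    .finally(() => inFlight.delete(p));
  inFlight.add(p);
}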

POLL_BACKOFF_MS (500) / POLL_BACKOFF_ACTIVE_MS (150)

What: Sleep duration between polls. Uses the shorter ACTIVE interval when tasks are in-flight, the longer one when the shard is idle.

Tradeoff: Shorter = lower latency (tasks picked up faster), but more DB reads. Longer = less DB load, but tasks sit in the queue longer.

How to pick: The defaults work well. If you need sub-100ms pickup latency, lower POLL_BACKOFF_ACTIVE_MS to 50–100. If you're cost-sensitive and latency doesn't matter, raise POLL_BACKOFF_MS to 1000–2000.
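In sketch form, the adaptive backoff is just a conditional sleep at the bottom of the poll loop (helper names are illustrative):

const POLL_BACKOFF_MS = 500;
const POLL_BACKOFF_ACTIVE_MS = 150;

declare function claimAndStartTasks(): Promise<number>; // returns number of tasks claimed
declare function hasInFlightTasks(): boolean;

// Sketch: poll fast while there's work, slow down when the shard is idle.
async function pollLoop(running: () => boolean): Promise<void> {
  while (running()) {
    const claimed = await claimAndStartTasks();
    const sleepMs =
      claimed > 0 || hasInFlightTasks() ? POLL_BACKOFF_ACTIVE_MS : POLL_BACKOFF_MS;
    await new Promise((resolve) => setTimeout(resolve, sleepMs));
  }
}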

FLUSH_BATCH_SIZE (50) / FLUSH_INTERVAL_MS (500)

What: Results are buffered and flushed in batches. BATCH_SIZE controls the max items per flush mutation; INTERVAL_MS controls how often the flush loop runs.

Tradeoff: Larger batches = fewer mutations (more efficient), but higher latency before workflow replay triggers. Smaller batches = faster workflow progression, but more mutation overhead.

How to pick: 50 is a good default. If your workflows have many sequential steps and you need them to progress fast, lower to 10–20. If you have mostly parallel steps and throughput matters more, raise to 100.

RESCHEDULE_MS (8 min)

What: How long an executor runs before handing off to a successor (Convex actions time out at 10 min).

Tradeoff: Longer = fewer handoffs and less overhead, but less margin before the 10-min timeout. Shorter = more frequent restarts, but safer.

How to pick: 8 minutes leaves 2 minutes of margin for the handoff protocol. Only change this if you're seeing timeout issues (lower to 6–7 min) or want to minimize handoff frequency (raise to 9 min, but risky).

Design: how the executor avoids common pitfalls

OCC (Optimistic Concurrency Control) conflicts

At scale, OCC is the #1 throughput killer. The executor avoids it at every level:

  • Deterministic sharding: shardForWorkflow() hashes the workflow ID to a shard number. All steps for one workflow go to the same shard, so recordResultBatch for workflow A never writes to the same documents as workflow B. Different shards = different executors = no cross-workflow OCC conflicts.
  • Separated replay mutations: After recordResultBatch records step results and deletes task queue entries, it returns a list of workflows that might be ready to replay. Each workflow gets its own tiny replayIfReady mutation, so a replay failure for workflow A doesn't roll back results for workflow B.
  • Read-only claiming: claimTasks is a query (not a mutation). Multiple executors can read the same shard without conflicting. Deletion happens later in recordResultBatch, atomically with result recording.

Batched mutations

Instead of one mutation per step result (the workpool's onComplete pattern), the executor buffers results in memory and flushes them in batches:

  1. processTask completes → result pushed to pendingResults[]
  2. A background flush loop runs every FLUSH_INTERVAL_MS (500ms)
  3. Each flush takes up to FLUSH_BATCH_SIZE (50) items and calls recordResultBatch — one mutation that records 50 step results, deletes 50 task queue entries, and returns replay candidates
  4. Replays are then triggered in parallel, each as a separate tiny mutation

This means 50 steps = 1 mutation instead of 50. At 1000 workflows × 4 steps each, that's ~80 mutations instead of ~4000.
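A sketch of the flush step, with the types and argument shapes illustrative rather than exact:

type StepResult = { stepId: string; result: unknown };
type ReplayCandidate = { workflowId: string; workflowHandle: string; generationNumber: number };

const FLUSH_BATCH_SIZE = 50;
const pendingResults: StepResult[] = [];

declare const ctx: { runMutation(fn: unknown, args: unknown): Promise<any> };
declare const component: any; // reference to the component's functions

// Sketch: drain the buffer in batches of FLUSH_BATCH_SIZE.
async function flushOnce(): Promise<void> {
  while (pendingResults.length > 0) {
    const batch = pendingResults.splice(0, FLUSH_BATCH_SIZE);
    // One mutation records up to 50 results, deletes their queue entries,
    // and returns workflows that may be ready to replay.
    const candidates: ReplayCandidate[] = await ctx.runMutation(
      component.taskQueue.recordResultBatch,
      { results: batch },
    );
    // Each replay is its own tiny mutation, so one failure can't roll back
    // another workflow's recorded results.
    await Promise.all(
      candidates.map((c) => ctx.runMutation(component.taskQueue.replayIfReady, c)),
    );
  }
}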

Timeout avoidance

Convex actions have a 10-minute hard timeout. The executor avoids hitting it with a graceful handoff protocol:

  1. After RESCHEDULE_MS (8 min), the executor creates a handoff document and schedules a successor action
  2. The old executor keeps claiming and processing tasks normally while waiting
  3. The new executor starts, reads the handoff doc, signals "ready"
  4. The old executor sees the ready signal, marks "yielded", and drains its in-flight tasks
  5. The new executor clears the handoff doc and takes over

This gives a 2-minute safety margin and ensures zero task gap — at no point are tasks sitting in the queue without an executor claiming them.
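In code terms, the predecessor side of the handoff is a small loop over the handoff document. A sketch with illustrative helper and state names:

declare function scheduleSuccessor(): Promise<void>;
declare function claimAndProcessOnce(): Promise<void>;
declare function readHandoff(): Promise<{ state: "pending" | "ready" } | null>;
declare function markYielded(): Promise<void>;
declare function drainInFlightAndFlush(): Promise<void>;

// Sketch: the predecessor keeps working until the successor signals ready,
// then yields and drains, so tasks are never left unclaimed.
async function handOff(): Promise<void> {
  await scheduleSuccessor();            // (1) create handoff doc, schedule successor
  while (true) {
    await claimAndProcessOnce();        // (2) keep claiming while waiting
    const doc = await readHandoff();
    if (doc?.state === "ready") break;  // (3) successor is up and polling
  }
  await markYielded();                  // (4) hand over claiming rights
  await drainInFlightAndFlush();        // finish in-flight work, flush results
}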

Safe restarts with epochs

Calling startExecutors() multiple times is safe. Each call increments the executorEpoch counter and schedules a fresh set of executors with the new epoch. Every executor checks checkEpoch() on each poll iteration — if its epoch doesn't match the current one, it enters the drain path: finishes any in-flight tasks, flushes results, and exits. This means:

  • Old executors gracefully wind down instead of being killed
  • No tasks are lost — in-progress work completes before the executor exits
  • Stale executors from a crashed startExecutors call can be replaced by simply calling it again
  • Brief overlap of old + new executors is safe since claimTasks is read-only and recordResultBatch checks generationNumber + inProgress to prevent duplicate result recording
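A sketch of the epoch check at the top of each poll iteration (helper names illustrative):

declare function checkEpoch(): Promise<number>;
declare function drainInFlightAndFlush(): Promise<void>;

// Sketch: myEpoch is captured when the executor starts. If a newer
// startExecutors() call bumped the epoch, this executor drains and exits
// without rescheduling a successor.
async function pollIteration(myEpoch: number): Promise<boolean> {
  const currentEpoch = await checkEpoch();
  if (currentEpoch !== myEpoch) {
    await drainInFlightAndFlush(); // finish in-flight work, record results
    return false; // stale: stop polling, do not reschedule
  }
  // ...claim and process tasks as usual...
  return true;
}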

Memory pressure

Currently, the executor does not monitor memory usage. It relies on MAX_CONCURRENCY to bound the number of in-flight tasks. If tasks have very large args or return values, memory could grow unbounded.

Mitigation today: Keep MAX_CONCURRENCY reasonable (200 default) and keep task args/results small. If a task returns a 10MB object, 200 concurrent tasks = ~2GB in memory.

Future work: Add process.memoryUsage() checks to pause claiming when heap usage exceeds a threshold (see Future Work below).
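A sketch of what that check could look like (the threshold is illustrative, not a tested value):

// Sketch of a possible future heap check: pause claiming above a threshold.
const HEAP_PAUSE_THRESHOLD_BYTES = 1.5 * 1024 * 1024 * 1024; // ~1.5 GB, illustrative

function shouldPauseClaiming(): boolean {
  return process.memoryUsage().heapUsed > HEAP_PAUSE_THRESHOLD_BYTES;
}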

Retry defaults

| Step type | Through executor | Through workpool |
| --- | --- | --- |
| Query | 4 attempts, 125ms initial, base 2 | workpool default (5 attempts, 500ms) |
| Mutation | 4 attempts, 125ms initial, base 2 | workpool default (5 attempts, 500ms) |
| Action | No retry (opt-in) | workpool default (no retry unless configured) |

Future work

  • Memory pressure monitoring: Check process.memoryUsage().heapUsed periodically and pause claiming when heap usage exceeds a threshold. Could also trigger early handoff if memory is critically high.
  • Mutation size throttling: recordResultBatch writes multiple step results + deletes task queue entries in a single mutation. At high batch sizes with large return values, this could exceed Convex's per-mutation write limits. Future work: track cumulative bytes written per flush batch and split into multiple mutations when approaching the limit.
  • Convex limit awareness: The executor currently doesn't monitor other Convex limits (document size, transaction read/write limits, scheduler queue depth). Instrumenting these and backing off when approaching limits would prevent executor crashes under edge-case workloads.
  • Scheduler backpressure: At very high throughput, the Convex scheduler can fall behind — if executors are scheduling successors, replaying workflows, and flushing results faster than the scheduler can service them, it may queue up and stop servicing for several minutes while it catches up. If an executor gets killed during this window (e.g. timeout, OCC storm), tasks in its shard have no executor to pick them up until the scheduler processes the rescheduled successor. Future work: detect scheduler lag and throttle executor handoffs / flush rates accordingly.
  • Autoscaling shards: Dynamically adjust the number of executor shards based on queue depth. When the task queue is deep, spin up more shards; when it's empty, let idle executors terminate without rescheduling.
  • Idle executor shutdown: If a shard has had zero tasks for N consecutive polls, skip rescheduling on handoff. A new executor can be spun up on demand when tasks appear (e.g. triggered by the startSteps mutation).
  • Per-action retry configuration: Allow workflow.action("name", { ..., retry: { maxAttempts: 3 } }) to opt specific actions into retry through the executor.
  • Observability: Expose per-shard metrics (tasks claimed, tasks completed, flush latency, retry counts) via a query for dashboards.

Visualizing workflows

See the companion PR (#209) for a timeline visualization dashboard that shows workflow execution in real-time. It serves an HTTP endpoint that renders per-workflow timelines — useful for comparing standard (workpool) vs executor mode side by side, and for spotting bottlenecks like uneven shard distribution or slow steps.

Summary by CodeRabbit

Release Notes

  • New Features

    • Added executor-based batch action execution with sharded task distribution
    • Introduced workflow coordinator for improved scheduling and orchestration
    • Added workflow analytics: count by name pagination, timeline views, and creation time bucketing
  • Improvements

    • Enhanced step execution tracking with automatic result recording and replay
    • Optimized workflow scheduling with coordinator-based routing


coderabbitai Bot commented Feb 24, 2026

📝 Walkthrough

This PR introduces an executor-based batch action system for distributed workflow execution. It adds a coordinator pattern for workflow scheduling, comprehensive task queue infrastructure for executor management, schema extensions for executor tracking, and client APIs for registering batch actions with sharded executor support.

Changes

| Cohort | File(s) | Summary |
| --- | --- | --- |
| Configuration & Dependencies | example/convex/convex.config.ts, package.json | Added workpool middleware registration and updated dependencies (Anthropic SDK ^0.78.0, Convex ^1.32.0). |
| Schema Extensions | example/convex/schema.ts, src/component/schema.ts | Extended schema with benchmarkResults table and new executor infrastructure tables (coordinatorState, executorEpoch, taskQueue, executorHandoff). Added workflow fields (readyToRun, executorShards) and step fields (executorFinishedAt, flushCalledAt). |
| Client API & Batch Actions | src/client/index.ts, src/client/step.ts, src/client/workflowMutation.ts | Introduced WorkflowRateLimitError class, action() and executor() registration methods, executor lifecycle management (setExecutorRef, startExecutors). Extended step execution to track batchActionName for executor routing. |
| Workflow Coordination | src/component/coordinator.ts, src/component/workflow.ts | Implemented coordinator mutations (ensureCoordinatorRunning, coordinator, runWorkflowBatch) for scheduling and executing ready workflows. Enhanced workflow creation to support executorShards and coordinator-based async paths. Added public queries (countByNamePage, countByName, creationTimeBuckets, timelinePage). |
| Task Queue & Executor Infrastructure | src/component/taskQueue.ts, src/component/pool.ts, src/component/event.ts, src/component/journal.ts | Introduced comprehensive taskQueue module with task claiming, result recording, inline replay, and handoff management. Refactored pool.ts to use direct scheduling instead of workpool. Updated journal.ts to support sharded executor paths and batchActionName routing. Simplified event.ts by removing workpool dependencies. |

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant WorkflowManager
    participant Coordinator
    participant TaskQueue
    participant Executor
    participant Workflow

    Client->>WorkflowManager: register action(name, handler)
    WorkflowManager->>WorkflowManager: store handler in executorActionHandlers
    
    Client->>WorkflowManager: create(args, executorShards=N)
    WorkflowManager->>Workflow: create workflow with readyToRun=true, executorShards=N
    
    Coordinator->>Workflow: fetch workflows where readyToRun=true
    Coordinator->>Workflow: claim workflows (set readyToRun=false)
    Coordinator->>Coordinator: batch process runWorkflowBatch
    
    Coordinator->>Workflow: invoke workflow mutation (workflowHandle)
    Workflow->>TaskQueue: enqueue step as task (shard-distributed)
    
    Executor->>TaskQueue: claimTasks(shard)
    TaskQueue->>Executor: return task batch (handle, args, stepId)
    
    Executor->>Executor: execute action via handler
    Executor->>TaskQueue: recordResult(stepId, result)
    TaskQueue->>Workflow: replay workflow if ready
    Workflow->>Workflow: continue execution
sequenceDiagram
    participant StepExecution
    participant Journal
    participant TaskQueue as Task Queue
    participant Executor
    participant WorkflowReplay

    StepExecution->>Journal: startSteps(steps, batchActionNames)
    Journal->>Journal: identify batchActionName for action steps
    
    alt Has batchActionName and executorShards
        Journal->>TaskQueue: insert task (shard, functionType, handle, args, stepId)
        Journal->>Journal: return step with workId=executor:shard:taskId
    else No batchActionName or no executorShards
        Journal->>Journal: direct enqueue via standard path
    end
    
    Executor->>TaskQueue: claimTasks(shard, limit)
    TaskQueue->>Executor: return task batch
    Executor->>Executor: execute task via registered handler
    Executor->>TaskQueue: recordResult(stepId, result, workflowId)
    
    TaskQueue->>Journal: update step result
    TaskQueue->>TaskQueue: check if last in-progress step
    
    alt Last step finished
        TaskQueue->>WorkflowReplay: replayIfReady(workflowId)
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~75 minutes

Suggested reviewers

  • ianmacartney

Poem

🐰 A rabbit's tale of executors grand,
Batch actions now take choreographed stand,
Sharded and coordinated with care,
Tasks queue and replay through executor's fair,
Workflows now scale with distributed flair! 🚀

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 25.00%, below the required threshold of 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (2 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The PR title accurately summarizes the main feature: executor mode with query/mutation routing and per-step retries, which aligns with the comprehensive changes across schema, routing, task queue, and executor implementation. |



sethconvex commented Feb 24, 2026

This stack of pull requests is managed by Graphite. Learn more about stacking.

sethconvex changed the title from "[checkpoint] Manual: Sharded task queue + executor-driven chaining implementation" to "feat: executor mode with query/mutation routing and per-step retries" on Feb 24, 2026

pkg-pr-new Bot commented Feb 24, 2026


npm i https://pkg.pr.new/get-convex/workflow/@convex-dev/workflow@210

commit: de4a686


coderabbitai Bot left a comment


Actionable comments posted: 8

🧹 Nitpick comments (7)
src/component/workflow.ts (1)

291-368: timelinePage fetches all steps per workflow without pagination — potential read amplification.

For each workflow in the paginated result, all steps are fetched via .collect() (line 335). If workflows have many steps (e.g., hundreds), this could significantly amplify reads and risk hitting query read limits. Consider adding a .take(limit) or documenting the expected step count per workflow.

♻️ Optional: limit steps per workflow
         const stepDocs = await ctx.db
           .query("steps")
           .withIndex("workflow", (q) => q.eq("workflowId", wf._id))
-          .collect();
+          .take(100); // Limit to prevent excessive reads for workflows with many steps
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/component/workflow.ts` around lines 291 - 368, timelinePage currently
loads all steps for each workflow by calling
ctx.db.query("steps").withIndex("workflow", (q) => q.eq("workflowId",
wf._id)).collect(), which can cause read amplification for workflows with many
steps; change the query to limit reads (e.g., use .take(limit) or similar paging
on the steps query) and surface the limit via a constant or an arg so callers
know the cap, update usage around stepDocs and stepDocs.map(...) accordingly,
and add a brief comment noting why steps are capped; ensure you modify the query
invocation in timelinePage (the steps lookup) rather than other code paths.
src/component/taskQueue.ts (3)

46-164: Significant code duplication between recordResult and recordResultBatch.

The step-recording logic (journal lookup, generation check, inProgress check, result recording, task deletion, logging) is nearly identical between recordResult (lines 68-130) and recordResultBatch (lines 231-294). The main difference is that recordResult does inline replay while recordResultBatch collects replay candidates.

Extract the shared logic into a helper function to reduce duplication and ensure bug fixes apply to both paths.

♻️ Sketch of a shared helper
async function recordStepResult(
  ctx: MutationCtx,
  logger: Logger,
  stepId: Id<"steps">,
  result: RunResult,
  generationNumber: number,
): Promise<{ workflowId: Id<"workflows">; workflowHandle: string; generationNumber: number } | null> {
  // Shared: deleteTask, journal lookup, generation check, inProgress check,
  // result recording, logging, task deletion, runResult check
  // Returns replay candidate or null
}

Then recordResult calls this + does inline replay, and recordResultBatch calls this in a loop + collects candidates.

Also applies to: 213-307

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/component/taskQueue.ts` around lines 46 - 164, The recordResult and
recordResultBatch handlers duplicate the same step-recording workflow (journal
lookup, generation/inProgress checks, writing runResult, deleteTask, and
logging); extract this into a shared async helper (e.g., recordStepResult(ctx:
MutationCtx, logger: Logger, stepId: Id<"steps">, result: RunResult,
generationNumber: number)) that performs the deleteTask, validates journalEntry
and workflow, applies the runResult to journalEntry, emits the console.event,
deletes the task, and returns either a replay candidate { workflowId,
workflowHandle, generationNumber } or null; then update recordResult to call the
helper and perform inline replay when a candidate is returned, and update
recordResultBatch to call the helper in a loop and collect candidates for
batched replay. Ensure the helper reuses the same symbols: journalEntry,
workflow, deleteTask, and preserves existing error logging and
ctx.db.replace/patch semantics.

21-44: claimTasks is a query (read-only) — tasks are not actually "claimed" (no lock/delete).

The naming claimTasks suggests exclusive claiming, but this is a pure read. Multiple executors or concurrent calls will see the same tasks. This is by design (deletion happens in recordResultBatch), and the executor's inFlightStepIds set prevents local double-processing. However, the name could mislead future maintainers into thinking tasks are atomically claimed.

Consider renaming to peekTasks or fetchTasks, or adding a comment in the function's docstring clarifying the read-only semantics.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/component/taskQueue.ts` around lines 21 - 44, The function claimTasks is
misleading because it's a read-only query (tasks are only removed in
recordResultBatch) — rename claimTasks to peekTasks (or fetchTasks) throughout
the codebase and update any call sites/tests, or if renaming is undesirable, add
a clear docstring/comments on the claimTasks query explaining it is read-only
and that exclusive claiming is enforced by inFlightStepIds and
recordResultBatch; ensure references to claimTasks (calls, exports, and any API
docs) are updated to match the new name or note the semantics.

345-377: diagnose iterates all shards with per-shard queries — O(shards × tasks) reads.

For the default 40 shards, this issues 40 indexed queries and counts results by iterating. For diagnostic purposes this is acceptable, but be aware this could hit read limits if shards have many tasks. Consider adding a take limit per shard or documenting the expected usage pattern (e.g., "call from dashboard, not from hot paths").

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/component/taskQueue.ts` around lines 345 - 377, The diagnose handler does
O(shards × tasks) reads by iterating every task per shard; add a per-shard cap
or document intended use. Modify the query handler (diagnose) to accept an
optional perShardLimit (e.g., add numShards: v.optional(v.number()) and
perShardLimit: v.optional(v.number()) to args) and in the shard loop apply that
cap (use the DB query's pagination/.take(perShardLimit) or break the for-await
loop after reaching the limit) so each shard issues at most that many reads;
alternatively, if you prefer not to change the signature, add a clear
comment/docstring above diagnose explaining this is a dashboard-only diagnostic
and may hit read limits and suggest a safe per-shard limit when calling it.
Ensure you reference the existing symbols diagnose, numShards, shards,
taskQueueCounts and the per-shard loop when making the change.
src/client/index.ts (3)

445-465: Dropping pending results after MAX_FLUSH_RETRIES is acceptable but warrants a log.

Line 459 sets pendingResults.length = 0 which discards unrecorded results. The comment says tasks will be re-processed by the next executor, which is correct since failed recordResultBatch calls mean the tasks remain in the queue. However, the work already done by this executor for those tasks is wasted, and there's no log to indicate this happened.

♻️ Log dropped results
               if (retries >= MAX_FLUSH_RETRIES) {
                 // Give up on remaining items — tasks stay in queue,
                 // next executor run will re-process them.
+                // eslint-disable-next-line no-console
+                console.error(`Dropping ${pendingResults.length} unflushed results after ${MAX_FLUSH_RETRIES} retries`);
                 pendingResults.length = 0;
                 break;
               }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/client/index.ts` around lines 445 - 465, In waitUntilIdle, when retries
reach MAX_FLUSH_RETRIES and the code sets pendingResults.length = 0, add a log
entry so dropped/unrecorded results are visible; call the existing logger (e.g.,
processLogger.warn or client logger used in this module) and include context:
number of dropped items (pendingResults.length before clearing), the retry count
(MAX_FLUSH_RETRIES), and that recordResultBatch/flush failed so these results
will be reprocessed later; keep behavior the same otherwise.

6-7: Placeholder type bypasses all type safety.

type BatchWorkpool = any effectively disables type checking for all this.batch usage. Consider using a minimal structural interface instead.

♻️ Suggested minimal interface
-// BatchWorkpool not yet exported from `@convex-dev/workpool`@0.3.1
-type BatchWorkpool = any;
+// Minimal interface until BatchWorkpool is exported from `@convex-dev/workpool`
+type BatchWorkpool = {
+  action(name: string, opts: { args: Record<string, any>; handler: (...args: any[]) => any }): any;
+  enqueueByHandle(ctx: any, name: string, args: any, opts: any): Promise<any>;
+};
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/client/index.ts` around lines 6 - 7, Replace the unsafe placeholder "type
BatchWorkpool = any" with a minimal structural interface named BatchWorkpool
that declares only the properties and methods actually accessed on this.batch in
this file (e.g., the method names, signatures and any used properties); update
usages to rely on that interface and mark members optional if uncertain, and add
a TODO to switch to the real exported type from `@convex-dev/workpool` once
available. This preserves type safety while remaining compatible until the
package exports the concrete type.

300-314: Flush error recovery unshifts batch but doesn't rethrow — later pendingResults.length > 0 checks may spin silently.

When recordResultBatch fails (line 312), the batch is put back and flush returns. The flushLoop (line 358) will retry on the next interval. However, if the mutation keeps failing (e.g., due to a persistent OCC conflict or a deployment issue), the flush loop will spin every 500ms indefinitely, silently retrying the same failing batch. This could contribute to action timeout without any diagnostic logging.

♻️ Add logging on flush failure
              } catch {
+                // Log so persistent failures are visible in action logs.
+                // eslint-disable-next-line no-console
+                console.warn(`recordResultBatch failed, ${batch.length} items re-queued for retry`);
                 pendingResults.unshift(...batch);
                 return;
               }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/client/index.ts` around lines 300 - 314, The flush function swallows
errors from ctx.runMutation and simply requeues the batch then returns, which
causes silent tight retries; modify the catch in flush (the catch around
ctx.runMutation of component.taskQueue.recordResultBatch) to catch the error as
a variable (e.g., catch (err)), log a descriptive error with context (include
batch size, first item ids, FLUSH_BATCH_SIZE and any request identifiers) using
the module's existing logger (e.g., ctx.logger or processLogger), requeue the
batch as currently done, and then rethrow the error (or return an error
sentinel) so the caller/flushLoop can apply backoff/stop and surface diagnostics
instead of spinning silently.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@package.json`:
- Around line 66-67: The package.json currently lists the dependency
"@anthropic-ai/sdk" which is not used anywhere; either remove the
"@anthropic-ai/sdk" entry from package.json and regenerate your lockfile (run
npm install / yarn install) to keep deps in sync, or if you intend to keep it,
add a brief comment in the repo README or a deps.md explaining its intended
future purpose and where it will be used; locate the dependency by name
("@anthropic-ai/sdk") in package.json and act accordingly.

In `@src/client/index.ts`:
- Around line 372-417: The loop can skip entirely when task.retry?.maxAttempts
is 0 leaving result undefined and pushing result! into pendingResults; in
processTask ensure maxAttempts cannot be zero (e.g., compute an
effectiveMaxAttempts = Math.max(1, task.retry?.maxAttempts ?? 1)) or immediately
set result to a failed RunResult when maxAttempts === 0 (with a clear error
message referencing task.handle/stepId), then use that result when pushing into
pendingResults; update references to maxAttempts in the retry loop to use the
effective value so result is always defined before pendingResults.push.
- Around line 293-355: The flush function currently gives up silently if all 3
retry attempts of ctx.runMutation(component.taskQueue.replayIfReady, ...) fail,
so add an error log and (preferably) re-enqueue the candidate for later retry:
inside the final for (const c of failed) retry loop in flush (the replayIfReady
retry section), after the retry loop exits without success, call a logging
method (e.g., console.error or ctx.logger.error) that includes c.workflowId,
c.generationNumber and c.workflowHandle and then enqueue the candidate into a
durable retry mechanism (e.g., invoke a new or existing mutation like
component.taskQueue.enqueueReplay or push to a persistent retry store) so the
replay won’t be dropped permanently; ensure inFlightStepIds/pendingResults
handling remains unchanged and the log/requeue is placed where a replay has
definitively failed.

In `@src/client/step.ts`:
- Around line 199-202: The regex used to split fnName unnecessarily escapes the
forward slash; update the split call in the safeFunctionName handling
(variables: fnName, parts, baseName, this.batchActionNames) to use /[:/]/
instead of /[:\/]/ so it satisfies ESLint no-useless-escape (i.e., change
fnName.split(/[:\/]/) to fnName.split(/[:/]/)).

In `@src/component/journal.ts`:
- Around line 185-220: The executor-mode taskQueue inserts currently hard-code
retry: DEFAULT_QM_RETRY and ignore per-step settings; change both query and
mutation branches that call ctx.db.insert("taskQueue", {...}) to compute a retry
value from the step (e.g., use step.retry when present, falling back to
DEFAULT_QM_RETRY) and pass that computed value instead of DEFAULT_QM_RETRY;
update the insert call in the blocks using shardForWorkflow(workflow._id as
string, workflow.executorShards) and ensure the same retry semantics/typing as
the workpool.enqueueQuery/workpool.enqueueMutation path are preserved.

In `@src/component/taskQueue.ts`:
- Line 3: Remove the unused imported symbol MutationCtx from the import
statement that currently reads import { mutation, query, type MutationCtx } from
"./_generated/server.js"; — edit the import to only include the used exports
(mutation and query) so MutationCtx is not imported, ensuring the linter warning
goes away.

In `@src/component/workflow.ts`:
- Around line 454-469: The code assumes ctx.db.normalizeId("steps", stepId)!
always returns a valid id which can be null for malformed stepId; change the
executor branch to first compute const normalizedStepId =
ctx.db.normalizeId("steps", stepId) and if normalizedStepId is falsy, log or
warn (including stepId/workId) and skip the taskQueue lookup, otherwise use
normalizedStepId in ctx.db.query(...).withIndex("by_stepId", q => q.eq("stepId",
normalizedStepId)).unique(); this avoids passing null into the query and
prevents unexpected throws when handling step.workId that
startsWith("executor:").
- Line 10: createHandler and completeHandler currently hardcode
DEFAULT_LOG_LEVEL; change them to use getDefaultLogger(ctx) so they respect
user-configured log levels. Update the imports to include getDefaultLogger from
"./logging.js" and replace any createLogger(..., DEFAULT_LOG_LEVEL) usage inside
createHandler and completeHandler with createLogger(..., getDefaultLogger(ctx))
(or fetch the logger once via getDefaultLogger(ctx) and pass it into
createLogger), mirroring how getStatus and cleanup obtain their logger.


ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b1552ab and 372547f.

⛔ Files ignored due to path filters (4)
  • package-lock.json is excluded by !**/package-lock.json
  • src/component/_generated/api.ts is excluded by !**/_generated/**
  • src/component/_generated/component.ts is excluded by !**/_generated/**
  • src/component/_generated/server.ts is excluded by !**/_generated/**
📒 Files selected for processing (12)
  • package.json
  • src/client/index.ts
  • src/client/step.ts
  • src/client/workflowMutation.ts
  • src/component/batch.ts
  • src/component/coordinator.ts
  • src/component/event.ts
  • src/component/journal.ts
  • src/component/pool.ts
  • src/component/schema.ts
  • src/component/taskQueue.ts
  • src/component/workflow.ts

Comment thread package.json Outdated
Comment on lines 66 to 67
"@anthropic-ai/sdk": "^0.75.0",
"async-channel": "^0.2.0"

coderabbitai Bot commented Feb 24, 2026


⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

curl -s https://registry.npmjs.org/@anthropic-ai/sdk | jq -r '.["dist-tags"].latest, .license'

Repository: get-convex/workflow

Length of output: 73


🏁 Script executed:

rg "@anthropic-ai/sdk|from.*anthropic" --type js --type ts --type json -i

Repository: get-convex/workflow

Length of output: 328


🏁 Script executed:

# Search for any anthropic references more broadly
rg -i "anthropic" --type js --type ts --type jsx --type tsx

Repository: get-convex/workflow

Length of output: 90


🏁 Script executed:

# Check if it's listed as a dependency elsewhere (transitive or direct)
grep -r "anthropic" . --include="*.json" --include="*.js" --include="*.ts" --exclude-dir=node_modules --exclude-dir=.git

Repository: get-convex/workflow

Length of output: 394


🏁 Script executed:

# View the package.json to understand project purpose
cat package.json | head -40

Repository: get-convex/workflow

Length of output: 1907


🏁 Script executed:

# Check the project structure
git ls-files | head -30

Repository: get-convex/workflow

Length of output: 806


🏁 Script executed:

# Look for any files that might use anthropic
find . -name "*.md" -o -name "*.js" -o -name "*.ts" | grep -v node_modules | head -20

Repository: get-convex/workflow

Length of output: 666


🏁 Script executed:

cat ./example/convex/transcription.ts

Repository: get-convex/workflow

Length of output: 2738


🏁 Script executed:

# Check the README to see if anthropic is documented as a dependency
cat README.md | grep -A 5 -B 5 -i anthropic

Repository: get-convex/workflow

Length of output: 45


🏁 Script executed:

# Search in all files including test files
find . -type f \( -name "*.ts" -o -name "*.js" \) ! -path "*/node_modules/*" ! -path "*/.git/*" -exec grep -l "anthropic" {} \;

Repository: get-convex/workflow

Length of output: 45


🏁 Script executed:

# Check if it's mentioned in documentation
cat README.md

Repository: get-convex/workflow

Length of output: 20472


🏁 Script executed:

# Search for require() patterns with anthropic
rg "require.*anthropic|dynamic.*anthropic|lazy.*anthropic" --type ts --type js

Repository: get-convex/workflow

Length of output: 45


🏁 Script executed:

# Check if it's used indirectly (transitive dependency check)
cat package.json | grep -A 20 '"dependencies"'

Repository: get-convex/workflow

Length of output: 705


Remove unused @anthropic-ai/sdk dependency or clarify its purpose.

This dependency is not imported or used anywhere in the codebase. The example transcription workflow uses the OpenAI SDK instead. If this is needed for a future feature or external use case, document it; otherwise, remove it.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@package.json` around lines 66 - 67, The package.json currently lists the
dependency "@anthropic-ai/sdk" which is not used anywhere; either remove the
"@anthropic-ai/sdk" entry from package.json and regenerate your lockfile (run
npm install / yarn install) to keep deps in sync, or if you intend to keep it,
add a brief comment in the repo README or a deps.md explaining its intended
future purpose and where it will be used; locate the dependency by name
("@anthropic-ai/sdk") in package.json and act accordingly.

sethconvex (Contributor, Author) replied:


Fixed in a229c69 — removed the unused @anthropic-ai/sdk dependency.


Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!

Comment thread src/client/index.ts
Comment thread src/client/index.ts
Comment thread src/client/step.ts
Comment thread src/component/journal.ts
Comment thread src/component/taskQueue.ts Outdated
Comment thread src/component/workflow.ts Outdated
Comment thread src/component/workflow.ts
Add executor mode: long-running actions poll a sharded task queue,
replacing per-step workpool scheduling for high-throughput workflows.

- Route queries, mutations, and actions through task queue when
  executorShards is set (deterministic shard by workflow ID)
- Per-step retry with exponential backoff (queries/mutations: 4
  attempts by default, actions: opt-in)
- Batched result recording (50 results per mutation)
- Zero-gap executor handoff before 10-min action timeout
- Epoch-based lifecycle for safe restarts via startExecutors

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
sethconvex and others added 2 commits February 23, 2026 23:49
BatchWorkpool was an alternative batch execution approach that was never
released. The executor mode supersedes it entirely, so remove all
references to simplify the codebase.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Remove unused @anthropic-ai/sdk dependency from package.json
- Use getDefaultLogger(ctx) in createHandler and completeHandler to
  respect user-configured log levels instead of hardcoding DEFAULT_LOG_LEVEL
- Null-check normalizeId result in executor cancel path to avoid
  passing null into the taskQueue index query

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

coderabbitai Bot left a comment


Actionable comments posted: 7

♻️ Duplicate comments (6)
package.json (1)

66-67: ⚠️ Potential issue | 🟠 Major

Unused @anthropic-ai/sdk dependency inflates install footprint for all consumers.

This is listed under dependencies (not devDependencies), so every consumer of @convex-dev/workflow will transitively install the Anthropic SDK. No imports or references to this package exist in the codebase. Either remove it or move it to devDependencies if it's only needed for examples/tests.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@package.json` around lines 66 - 67, Remove the unused runtime dependency
"@anthropic-ai/sdk" from the package.json dependencies block (or move it to
devDependencies if used only for tests/examples); search the repo for any
imports/usages of "@anthropic-ai/sdk" to confirm it's unused, update
package.json accordingly (transfer the entry to devDependencies if needed), then
run an install and update lockfile (npm/yarn/pnpm) so consumers won't install
the unused SDK transitively; ensure package.json's "dependencies" no longer
contains "@anthropic-ai/sdk".
src/component/taskQueue.ts (1)

1-6: Unused MutationCtx import has been removed.

The previous review flagged this. The import now correctly includes only mutation and query.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/component/taskQueue.ts` around lines 1 - 6, The unused import MutationCtx
was removed from the import list; ensure there are no remaining references to
MutationCtx elsewhere in this file (e.g., in any function signatures or type
annotations) and that the file only uses the imported symbols mutation and
query; if any remaining references exist replace them with the appropriate
context type or remove them, then run a type-check/build to confirm no
unused-import or unresolved-type errors remain.
src/component/workflow.ts (2)

452-464: normalizeId with ! assertion can pass null on malformed stepId.

This was flagged previously. If stepId (extracted from the "executor:" prefix at line 455) is not a valid "steps" document ID, normalizeId returns null and the ! forwards it into the index query, which could produce unexpected behavior.

🛡️ Proposed fix — guard before querying
              const stepId = (step.workId as string).slice("executor:".length);
-              const taskEntry = await ctx.db
-                .query("taskQueue")
-                .withIndex("by_stepId", (q) =>
-                  q.eq("stepId", ctx.db.normalizeId("steps", stepId)!),
-                )
-                .unique();
-              if (taskEntry) {
-                await ctx.db.delete(taskEntry._id);
-              }
+              const normalizedStepId = ctx.db.normalizeId("steps", stepId);
+              if (normalizedStepId) {
+                const taskEntry = await ctx.db
+                  .query("taskQueue")
+                  .withIndex("by_stepId", (q) =>
+                    q.eq("stepId", normalizedStepId),
+                  )
+                  .unique();
+                if (taskEntry) {
+                  await ctx.db.delete(taskEntry._id);
+                }
+              }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/component/workflow.ts` around lines 452 - 464, The code currently calls
ctx.db.normalizeId("steps", stepId)! and can pass null into the taskQueue index
query; update the executor-prefixed branch (where step.workId and stepId are
derived) to first call const normalizedStepId = ctx.db.normalizeId("steps",
stepId) and guard it (if falsy, skip the DB query/cleanup), then use
normalizedStepId in the .withIndex("by_stepId", q => q.eq("stepId",
normalizedStepId)) call and proceed to delete the taskEntry only when found;
this prevents passing null into the index query and avoids querying with
malformed IDs.

56-58: createHandler still uses hardcoded log level instead of user-configured one.

This was flagged previously: createLogger(DEFAULT_LOG_LEVEL) bypasses the user-configured log level stored in the database. Both getStatus (line 97) and cleanup (line 522) correctly use getDefaultLogger(ctx). The same should apply here and in completeHandler (line 426).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/component/workflow.ts` around lines 56 - 58, The createHandler function
(and similarly completeHandler) currently constructs a logger with
createLogger(DEFAULT_LOG_LEVEL) which ignores the user-configured log level;
replace those calls so they obtain the logger via getDefaultLogger(ctx) instead
(e.g., change const console = createLogger(DEFAULT_LOG_LEVEL) to const console =
getDefaultLogger(ctx)), ensuring ctx is passed into the function scope where
used; update both createHandler and completeHandler to use getDefaultLogger(ctx)
consistently like getStatus and cleanup do.
src/client/index.ts (2)

328-332: Past review finding has been addressed — maxAttempts can no longer be 0.

Math.max(task.retry?.maxAttempts ?? 1, 1) ensures the retry loop always executes at least once, so result is always defined before the push on line 372.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/client/index.ts` around lines 328 - 332, The
Math.max(task.retry?.maxAttempts ?? 1, 1) in processTask already guarantees
maxAttempts is at least 1 so the retry loop will run and result will be defined
before the push; remove any redundant guard or comment that handled a zero
maxAttempts case (or mark that previous check as resolved) and keep the current
definition of maxAttempts, initialBackoffMs and base in the processTask
function.

285-304: Past review finding has been addressed — replay failure is now logged.

The previous review flagged that silently dropping replays after 3 retries could leave workflows stuck. The replaySucceeded flag and console.error at lines 300-303 now surface the failure. Good improvement.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/client/index.ts` around lines 285 - 304, Replace the plain console.error
and the silent catch with structured logging and error capture: inside the retry
loop around ctx.runMutation (calling component.taskQueue.replayIfReady) capture
the thrown error (e.g., lastError) in the catch block, and after the loop when
replaySucceeded is false call the project's logger (not console) — e.g.,
ctx.logger.error or the available component/process logger — and log a message
including c.workflowId and c.generationNumber plus the lastError details so
failures are recorded with context.
🧹 Nitpick comments (9)
src/component/coordinator.ts (1)

46-62: Coordinator always reschedules itself even when fewer than COORDINATOR_READ_BATCH workflows were found.

When ready.length < COORDINATOR_READ_BATCH, all ready workflows have been claimed in this pass, so the next invocation will find 0 and exit. This wastes one scheduler invocation per drain cycle. Consider only rescheduling when ready.length === COORDINATOR_READ_BATCH.

♻️ Suggested optimization
-    // Reschedule self to pick up more
-    await ctx.scheduler.runAfter(0, internal.coordinator.coordinator);
+    // Reschedule self only if there might be more
+    if (ready.length === COORDINATOR_READ_BATCH) {
+      await ctx.scheduler.runAfter(0, internal.coordinator.coordinator);
+    } else {
+      const state = await ctx.db.query("coordinatorState").first();
+      if (state) {
+        await ctx.db.patch(state._id, { scheduled: false });
+      }
+    }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/component/coordinator.ts` around lines 46 - 62, The coordinator currently
always reschedules itself at the end of the loop (the call to
ctx.scheduler.runAfter(0, internal.coordinator.coordinator)), which wastes one
scheduler invocation when fewer than COORDINATOR_READ_BATCH workflows were
returned; change the logic in the coordinator function to only call
ctx.scheduler.runAfter(..., internal.coordinator.coordinator) when ready.length
=== COORDINATOR_READ_BATCH (i.e., a full batch was claimed), leaving out the
reschedule when fewer items were found so the scheduler isn’t invoked
unnecessarily.
src/component/pool.ts (1)

291-321: Duplicated error-handling pattern — consider extracting a shared helper.

The try/catch + ctx.db.patch(workflowId, { runResult: { kind: "failed", error } }) pattern is repeated in onCompleteHandler (Line 206), directRunWorkflow (Line 312), and runWorkflowBatch in coordinator.ts (Line 92). A small runWorkflowSafe(ctx, workflow, generationNumber) helper would reduce duplication.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/component/pool.ts` around lines 291 - 321, Extract the repeated try/catch
+ db.patch failure-handling into a new helper function (e.g., runWorkflowSafe)
and replace the duplicated blocks in directRunWorkflow, onCompleteHandler, and
runWorkflowBatch to call it; runWorkflowSafe(ctx, workflow, generationNumber)
should invoke ctx.runMutation(workflow.workflowHandle as
FunctionHandle<"mutation">, { workflowId: workflow.id, generationNumber }),
catch any error, normalize the message (e instanceof Error ? e.message :
String(e)), log it (reuse createLogger(DEFAULT_LOG_LEVEL) or accept a logger
param), and persist the failure with ctx.db.patch(workflow.id, { runResult: {
kind: "failed", error } }); ensure the helper returns success/failure so callers
can act accordingly.
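For illustration, a minimal sketch of the helper this comment describes, assuming pool.ts's existing imports (MutationCtx, Doc, FunctionHandle, createLogger, DEFAULT_LOG_LEVEL):

async function runWorkflowSafe(
  ctx: MutationCtx,
  workflow: Doc<"workflows">,
  generationNumber: number,
): Promise<boolean> {
  const console = createLogger(DEFAULT_LOG_LEVEL);
  try {
    // Replay the workflow; step scheduling happens inside this mutation.
    await ctx.runMutation(workflow.workflowHandle as FunctionHandle<"mutation">, {
      workflowId: workflow._id,
      generationNumber,
    });
    return true;
  } catch (e) {
    const error = e instanceof Error ? e.message : `Unknown error: ${String(e)}`;
    console.error(`Error running workflow ${workflow._id}: ${error}`);
    // Persist the failure so the workflow doesn't hang as in-progress.
    await ctx.db.patch(workflow._id, { runResult: { kind: "failed", error } });
    return false;
  }
}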
src/component/taskQueue.ts (2)

346-378: diagnose defaults to 40 shards — should match the actual configured shard count.

Line 356 uses numShards ?? 40 as the default. If the actual deployment uses a different shard count, this diagnostic will either miss shards or scan non-existent ones. Consider requiring the argument or reading the shard count from configuration.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/component/taskQueue.ts` around lines 346 - 378, The diagnose handler uses
a hardcoded default shards = numShards ?? 40 which can mismatch deployment;
update diagnose (handler) to not assume 40 — either make numShards a required
arg or read the real shard count from the application's configuration/source
(e.g., use the configured shardCount value from ctx.settings or config service)
and assign shards = numShards ?? config.shardCount, and remove the magic number
40; ensure the function names/variables referenced are diagnose, numShards, and
shards so reviewers can locate and verify the change.

296-303: Replay candidates intentionally skip the in-progress step check — document this for maintainers.

Line 297 only checks workflow.runResult === undefined but doesn't verify whether other in-progress steps remain. This is correct by design (the check is deferred to replayIfReady to minimize the OCC conflict surface of this batch mutation), but a brief inline comment explaining why would help future maintainers avoid "fixing" it.

📝 Suggested comment
      if (workflow.runResult === undefined) {
+       // Don't check inProgress here — that's done in replayIfReady
+       // to keep this batch mutation's read set small (less OCC conflict).
        candidates.set(workflowId, {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/component/taskQueue.ts` around lines 296 - 303, Add a brief inline
comment next to the condition that populates candidates (the block checking
workflow.runResult === undefined and setting candidates.set(workflowId, {
workflowId, workflowHandle: workflow.workflowHandle, generationNumber:
workflow.generationNumber })) explaining that we intentionally do not check for
other in-progress steps here because that verification is deferred to
replayIfReady to minimize the OCC/conflict surface for this batch mutation;
reference replayIfReady and the candidates map in the comment so future
maintainers understand the design choice and where the in-progress-step check
occurs.
src/client/index.ts (3)

133-163: action() stores handlers by name but doesn't prevent duplicates.

Calling action("myAction", ...) twice with the same name silently overwrites the first handler in executorActionHandlers and is a no-op in batchActionNames (Set). Consider throwing on duplicate names to catch copy-paste errors early.

🔧 Proposed guard
-   this.executorActionHandlers.set(name, opts.handler);
+   if (this.executorActionHandlers.has(name)) {
+     throw new Error(`Duplicate batch action name: "${name}"`);
+   }
+   this.executorActionHandlers.set(name, opts.handler);
    this.batchActionNames.add(name);

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/client/index.ts` around lines 133 - 163, The action() method in
WorkflowManager currently overwrites existing handlers when called with the same
name; add a duplicate-name guard by checking executorActionHandlers.has(name)
(or batchActionNames.has(name)) before calling executorActionHandlers.set and
batchActionNames.add, and throw a clear Error (e.g., "action 'X' already
registered") to prevent silent overwrites; keep the rest of the logic (storing
handler and returning internalActionGeneric) unchanged.

461-462: Handoff cleanup errors are silently swallowed, potentially leaving stale handoff docs.

The .catch(() => {}) on the handoff cleanup promise means if the "clear" mutation fails, the executorHandoff document for this shard will persist indefinitely. The next executor will handle it (the getHandoff / cleanup logic at line 434 covers stale docs), so it's not a hard bug, but a log would help diagnose slow handoffs.

🔧 Suggested improvement
-    })().catch(() => {});
+    })().catch((e) => {
+      console.error(`Handoff cleanup failed for shard ${shard}:`, e);
+    });
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/client/index.ts` around lines 461 - 462, The silent .catch on the handoff
cleanup IIFE swallows errors when performing the "clear" mutation for the
executorHandoff shard; change the catch to log the error so failed cleanup is
visible (use the same logger used elsewhere, e.g., processLogger or a relevant
logger in this module) and include the shard/executorHandoff id and error
details in the message; look for the anonymous IIFE that performs the "clear"
mutation for executorHandoff and the related getHandoff cleanup logic to add the
logged .catch.

197-219: Epoch check with epoch === undefined allows stale executors to run indefinitely.

When executor() is called without an epoch argument (line 200: epoch?: number), epoch is undefined. In checkEpoch (line 218), undefined === currentEpoch will always be false when currentEpoch !== 0, so the executor would terminate after the first epoch check. But when currentEpoch === 0 (no epoch row exists), checkEpoch returns true (line 217), bypassing the check entirely.

The implicit contract seems to be that executors launched without an explicit epoch are "legacy" and should run freely when no epoch is tracked. However, once startExecutors is called and sets epoch ≥ 1, these legacy executors will be correctly stopped. This is fine as long as the intent is clear — consider documenting this edge case or defaulting epoch to 0 in the args destructuring to make the behavior explicit.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/client/index.ts` around lines 197 - 219, The epoch argument can be
undefined and that makes the intent unclear; default it to 0 so legacy executors
are explicit. In the internalActionGeneric handler (the handler in
internalActionGeneric), change the args destructuring to set epoch = 0 (e.g.,
const { shard, epoch = 0 } = args) so checkEpoch sees 0 for legacy runs and
behavior is explicit; keep the checkEpoch function as-is (it will treat
currentEpoch === 0 as allowing legacy executors).
src/component/workflow.ts (2)

360-365: as any cast on the return bypasses the return validator.

The page array structure is different from what paginator originally returned, so the generic type doesn't match. The as any cast at line 364 silences TypeScript but means the return validator is the only runtime safety net. The validator looks correct, so this works in practice.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/component/workflow.ts` around lines 360 - 365, The return uses an unsafe
"as any" cast in the function that returns {...result, isDone: result.isDone ||
hitOld, page} which hides a type mismatch between the current page array shape
and the paginator's generic; remove the "as any" cast and fix the types: either
change the function's return generic (or the paginator's generic) to match the
new page element type, or map/transform the page array into the original
expected shape before returning, and update the function signature (and any
related type/interface) so TypeScript enforces the validator at compile time
(refer to the return expression and the "page" property and adjust the
surrounding function/method signature and paginator generic accordingly).

328-358: timelinePage performs N+1 step queries per workflow on the page.

Each workflow in the filtered page triggers a separate ctx.db.query("steps") call (line 330-333). With the default pagination size, this could mean dozens of sub-queries per page. The Promise.all parallelism helps, but in Convex queries are deterministic and execute sequentially within the transaction — Promise.all doesn't parallelize database reads here.

This is acceptable for small page sizes but worth noting as a performance concern if pages grow.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/component/workflow.ts` around lines 328 - 358, The current page build
does an N+1 query by calling ctx.db.query("steps").withIndex("workflow", q =>
q.eq("workflowId", wf._id")) for each workflow inside the Promise.all; instead,
collect all workflow IDs from filtered, perform a single ctx.db.query("steps")
with an index filter that selects all workflowIds (use the index with an
"in"/batch filter) to fetch all step documents at once, then group the returned
step rows by workflowId and map those grouped results to each workflow when
constructing the page (update the code that builds page, stepDocs, and the step
mapping to use the grouped map rather than per-workflow queries).

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b1552ab and f0bd00a.

⛔ Files ignored due to path filters (4)
  • package-lock.json is excluded by !**/package-lock.json
  • src/component/_generated/api.ts is excluded by !**/_generated/**
  • src/component/_generated/component.ts is excluded by !**/_generated/**
  • src/component/_generated/server.ts is excluded by !**/_generated/**
📒 Files selected for processing (11)
  • package.json
  • src/client/index.ts
  • src/client/step.ts
  • src/client/workflowMutation.ts
  • src/component/coordinator.ts
  • src/component/event.ts
  • src/component/journal.ts
  • src/component/pool.ts
  • src/component/schema.ts
  • src/component/taskQueue.ts
  • src/component/workflow.ts

Comment thread src/client/index.ts
Comment on lines +407 to +420
let retries = 0;
while (pendingResults.length > 0) {
  await flush();
  if (pendingResults.length > 0) {
    retries++;
    if (retries >= MAX_FLUSH_RETRIES) {
      // Give up on remaining items — tasks stay in queue,
      // next executor run will re-process them.
      pendingResults.length = 0;
      break;
    }
    await new Promise((r) => setTimeout(r, 200));
  }
}

⚠️ Potential issue | 🟡 Minor

Dropped results after MAX_FLUSH_RETRIES are silently discarded — add a warning log.

When pendingResults.length = 0 is reached at line 415, results for in-flight tasks are permanently lost from this executor run. The comment says tasks stay in the queue for re-processing, which is correct for idempotent queries/mutations, but for actions the duplicate execution may have side effects. At minimum, log the count of dropped items to aid debugging.

🛡️ Proposed fix
              if (retries >= MAX_FLUSH_RETRIES) {
                // Give up on remaining items — tasks stay in queue,
                // next executor run will re-process them.
+               console.error(
+                 `Executor shard ${shard}: dropping ${pendingResults.length} unflushed results after ${MAX_FLUSH_RETRIES} retries`,
+               );
                pendingResults.length = 0;
                break;
              }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let retries = 0;
while (pendingResults.length > 0) {
  await flush();
  if (pendingResults.length > 0) {
    retries++;
    if (retries >= MAX_FLUSH_RETRIES) {
      // Give up on remaining items — tasks stay in queue,
      // next executor run will re-process them.
      console.error(
        `Executor shard ${shard}: dropping ${pendingResults.length} unflushed results after ${MAX_FLUSH_RETRIES} retries`,
      );
      pendingResults.length = 0;
      break;
    }
    await new Promise((r) => setTimeout(r, 200));
  }
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/client/index.ts` around lines 407 - 420, The loop that gives up after
reaching MAX_FLUSH_RETRIES silently discards pendingResults by setting
pendingResults.length = 0; before clearing them, emit a warning log that
includes the number of dropped items (pendingResults.length), the retries count
(retries) and that MAX_FLUSH_RETRIES was hit; implement the log using the
module's existing logger (e.g., logger or processLogger) or console.warn if none
is available, then clear pendingResults as before to preserve behavior in the
flush() / retries loop.

Comment thread src/client/step.ts
Comment on lines +191 to +205
// Detect batch actions by checking if the function name's base matches
// a registered batch action name.
let batchActionName: string | undefined;
if (
  this.batchActionNames &&
  target.kind === "function" &&
  target.functionType === "action"
) {
  const fnName = safeFunctionName(target.function);
  const parts = fnName.split(/[:/]/);
  const baseName = parts[parts.length - 1];
  if (this.batchActionNames.has(baseName)) {
    batchActionName = baseName;
  }
}

⚠️ Potential issue | 🟡 Minor

Base-name-only matching could collide across modules.

safeFunctionName may return paths like "convex/actions:doWork". Taking only the last segment after splitting on [:/] means two different modules both exporting a doWork action would match the same batchActionNames entry. If this is intentional (users must ensure uniqueness), a brief doc comment would help.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/client/step.ts` around lines 191 - 205, The current base-name-only
matching extracts only the last segment from safeFunctionName(target.function)
and may collide across modules; update the logic in the block that references
safeFunctionName, batchActionNames, and batchActionName so it matches against
the full safeFunctionName (or otherwise include module/path namespace) instead
of just baseName, or alternatively convert/expect batchActionNames to contain
fully-qualified names and compare against fnName directly; if you choose to keep
base-name matching, add a brief doc comment above this block documenting the
uniqueness requirement for action names.
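If the registry were switched to fully-qualified names, the detection block collapses to a direct lookup. A sketch, assuming batchActionNames is populated with the same strings safeFunctionName produces (e.g., "actions:doWork"):

let batchActionName: string | undefined;
if (
  this.batchActionNames &&
  target.kind === "function" &&
  target.functionType === "action"
) {
  const fnName = safeFunctionName(target.function);
  // Exact match on the fully-qualified name: no cross-module collisions.
  if (this.batchActionNames.has(fnName)) {
    batchActionName = fnName;
  }
}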

Comment on lines +65 to +103
export const runWorkflowBatch = internalMutation({
  args: {
    items: v.array(
      v.object({
        workflowId: v.id("workflows"),
        generationNumber: v.number(),
        workflowHandle: v.string(),
      }),
    ),
  },
  returns: v.null(),
  handler: async (ctx, { items }) => {
    const console = createLogger(DEFAULT_LOG_LEVEL);
    for (const { workflowId, generationNumber, workflowHandle } of items) {
      const workflow = await ctx.db.get(workflowId);
      if (
        !workflow ||
        workflow.runResult ||
        workflow.generationNumber !== generationNumber
      ) {
        continue;
      }
      try {
        await ctx.runMutation(
          workflowHandle as FunctionHandle<"mutation">,
          { workflowId, generationNumber },
        );
      } catch (e) {
        const error =
          e instanceof Error ? e.message : `Unknown error: ${String(e)}`;
        console.error(
          `Error running workflow ${workflowId}: ${error}`,
        );
        await ctx.db.patch(workflowId, {
          runResult: { kind: "failed", error },
        });
      }
    }
  },

⚠️ Potential issue | 🟠 Major

Running up to 50 workflow mutations in a single transaction is a blast-radius risk.

Each ctx.runMutation(workflowHandle, ...) runs a full workflow replay within the same outer transaction. If the accumulated writes exceed Convex's transaction size limit, or an OCC conflict occurs on any touched document, the entire batch of up to 50 workflows rolls back — even the ones that completed successfully before the failure. Per-item try/catch only handles exceptions from individual mutations, not transaction-level failures.

Consider scheduling each item (or smaller sub-batches) as a separate mutation via ctx.scheduler.runAfter(0, ...) to isolate failures, similar to how enqueueWorkflow in pool.ts works.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/component/coordinator.ts` around lines 65 - 103, The runWorkflowBatch
handler currently calls ctx.runMutation(workflowHandle, ...) for each item
inside the same transaction (runWorkflowBatch), which risks rolling back the
whole batch on transaction-size or OCC failures; change the loop so each
workflow is scheduled as a separate (isolated) mutation using
ctx.scheduler.runAfter(0, ...) rather than invoking ctx.runMutation inline —
mirror the approach used by enqueueWorkflow in pool.ts: for each {workflowId,
generationNumber, workflowHandle} call ctx.scheduler.runAfter(0, { mutation:
workflowHandle, args: { workflowId, generationNumber } }) (or schedule a small
wrapper mutation) and remove the direct ctx.runMutation calls so each workflow
replay runs in its own transaction.
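For illustration, a sketch of the isolation described above: keep the per-item staleness checks, but hand each replay to the scheduler so it runs in its own transaction (handler body only; args/returns unchanged from the code quoted above):

handler: async (ctx, { items }) => {
  for (const { workflowId, generationNumber, workflowHandle } of items) {
    const workflow = await ctx.db.get(workflowId);
    if (
      !workflow ||
      workflow.runResult ||
      workflow.generationNumber !== generationNumber
    ) {
      continue;
    }
    // Each replay runs in its own scheduled mutation, so one failure
    // (OCC conflict, size limit) can't roll back the rest of the batch.
    await ctx.scheduler.runAfter(0, workflowHandle as FunctionHandle<"mutation">, {
      workflowId,
      generationNumber,
    });
  }
},

The trade-off: scheduling reintroduces the per-replay scheduler hop that executor mode tries to avoid, so smaller sub-batches may be a reasonable middle ground.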

Comment thread src/component/journal.ts
Comment on lines +240 to +252
if (stepArgs.batchActionName && workflow.executorShards) {
  // Route through sharded task queue for executor-driven execution.
  const shard = shardForWorkflow(workflow._id as string, workflow.executorShards);
  await ctx.db.insert("taskQueue", {
    shard,
    functionType: "action",
    handle: stepArgs.batchActionName,
    args: step.args,
    stepId,
    workflowId: workflow._id,
    generationNumber,
  });
  workId = `executor:${stepId}`;

⚠️ Potential issue | 🟡 Minor

Action tasks inserted without retry config — confirm this is intentional.

Query and mutation paths set retry: retryConfig, but action tasks routed to the executor omit the retry field entirely. The PR description says "actions no retry unless opted-in," but there's no mechanism here for opting in — a user-provided stepArgs.retry object is silently ignored for executor-routed actions.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/component/journal.ts` around lines 240 - 252, The executor-sharded branch
(inside the if checking stepArgs.batchActionName && workflow.executorShards)
currently inserts an action task without any retry configuration, ignoring any
user-provided stepArgs.retry; update the insert call (the object passed to
ctx.db.insert in this branch) to include a retry field when the user explicitly
provides one (e.g., use stepArgs.retry if present) while leaving retry absent
otherwise to preserve the "no retry unless opted-in" behavior; ensure this
mirrors how query/mutation paths use retryConfig and that stepArgs.retry is
respected before assigning workId = `executor:${stepId}`.
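A sketch of the opt-in behavior the comment asks for: spread the user's retry settings into the insert only when explicitly provided, assuming stepArgs.retry already matches the taskQueue retry validator:

await ctx.db.insert("taskQueue", {
  shard,
  functionType: "action",
  handle: stepArgs.batchActionName,
  args: step.args,
  stepId,
  workflowId: workflow._id,
  generationNumber,
  // Opt-in only: actions get no retries unless the caller asked for them.
  ...(stepArgs.retry ? { retry: stepArgs.retry } : {}),
});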

Comment on lines +47 to +165
export const recordResult = mutation({
  args: {
    stepId: v.id("steps"),
    result: vResultValidator,
    generationNumber: v.number(),
  },
  returns: v.null(),
  handler: async (ctx, { stepId, result, generationNumber }) => {
    const console = createLogger(DEFAULT_LOG_LEVEL);

    // Helper: always delete the task from the queue, even on early return.
    // Stale/orphaned tasks must not block the shard permanently.
    const deleteTask = async () => {
      const taskEntry = await ctx.db
        .query("taskQueue")
        .withIndex("by_stepId", (q) => q.eq("stepId", stepId))
        .unique();
      if (taskEntry) {
        await ctx.db.delete(taskEntry._id);
      }
    };

    // 1. Record the step result (same logic as pool.ts:onCompleteHandler)
    const journalEntry = await ctx.db.get(stepId);
    if (!journalEntry) {
      console.error(`Journal entry not found: ${stepId}`);
      await deleteTask();
      return null;
    }
    const workflowId = journalEntry.workflowId;
    const workflow = await ctx.db.get(workflowId);
    if (!workflow) {
      console.error(`Workflow not found: ${workflowId}`);
      await deleteTask();
      return null;
    }
    if (workflow.generationNumber !== generationNumber) {
      console.error(
        `Workflow: ${workflowId} already has generation number ${workflow.generationNumber} when completing ${stepId}`,
      );
      await deleteTask();
      return null;
    }
    if (!journalEntry.step.inProgress) {
      console.error(
        `Step finished but journal entry not in progress: ${stepId}`,
      );
      await deleteTask();
      return null;
    }

    journalEntry.step.inProgress = false;
    journalEntry.step.completedAt = Date.now();
    switch (result.kind) {
      case "success":
        journalEntry.step.runResult = {
          kind: "success",
          returnValue: result.returnValue,
        };
        break;
      case "failed":
        journalEntry.step.runResult = {
          kind: "failed",
          error: result.error,
        };
        break;
      case "canceled":
        journalEntry.step.runResult = {
          kind: "canceled",
        };
        break;
    }
    await ctx.db.replace(journalEntry._id, journalEntry);

    console.event("stepCompleted", {
      workflowId,
      workflowName: workflow.name,
      status: result.kind,
      stepName: journalEntry.step.name,
      stepNumber: journalEntry.stepNumber,
      durationMs: journalEntry.step.completedAt - journalEntry.step.startedAt,
    });

    // 2. Delete the task from the queue (atomically with result recording)
    await deleteTask();

    if (workflow.runResult !== undefined) {
      return null;
    }

    // 3. Check if this was the last in-progress step → replay workflow inline
    const otherInProgress = await ctx.db
      .query("steps")
      .withIndex("inProgress", (q) =>
        q.eq("step.inProgress", true).eq("workflowId", workflowId),
      )
      .first();
    if (!otherInProgress) {
      try {
        await ctx.runMutation(
          workflow.workflowHandle as FunctionHandle<"mutation">,
          {
            workflowId: workflow._id,
            generationNumber: workflow.generationNumber,
          },
        );
      } catch (e) {
        const error =
          e instanceof Error ? e.message : `Unknown error: ${String(e)}`;
        console.error(`Error running workflow ${workflowId}: ${error}`);
        await ctx.db.patch(workflowId, {
          runResult: { kind: "failed", error },
        });
      }
    }

    return null;
  },
});

🛠️ Refactor suggestion | 🟠 Major

Significant code duplication between recordResult and recordResultBatch.

The journal lookup, validation, result recording, event logging, and task deletion logic (lines 70–131 vs. 243–295) is nearly identical. If a bug is fixed in one path, it needs to be mirrored in the other.

Consider extracting a shared helper, e.g.:

async function processStepResult(
  ctx: MutationCtx,
  console: Logger,
  stepId: Id<"steps">,
  result: RunResult,
  generationNumber: number,
): Promise<{ workflowId: Id<"workflows">; workflowHandle: string } | null> {
  // shared validation, journal update, event logging, task deletion
}

Then recordResult calls it + does inline replay, and recordResultBatch calls it in a loop + accumulates candidates.

Also applies to: 214-308

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/component/taskQueue.ts` around lines 47 - 165, recordResult and
recordResultBatch duplicate the same journal lookup, validation, update, event
logging and task deletion logic; extract that shared flow into a helper (e.g.,
processStepResult) that takes (ctx, console, stepId, result, generationNumber)
and performs: journal/ workflow lookup and validation, mark
step.inProgress=false/ completedAt, set step.runResult, replace the journal
entry, emit the console.event, and delete the queue task (using the existing
deleteTask logic), returning the workflow id and workflowHandle (or null on
early return). Replace the duplicated blocks in recordResult (handler) and
recordResultBatch with calls to processStepResult; keep the replay logic
(ctx.runMutation and error patching) in recordResult and have recordResultBatch
loop over results and accumulate candidate workflows to replay.

Comment on lines +167 to +195
export const startExecutors = mutation({
  args: {
    executorHandle: v.string(),
    numShards: v.number(),
  },
  returns: v.number(), // returns the new epoch
  handler: async (ctx, { executorHandle, numShards }) => {
    // Increment executor epoch — old executors with stale epochs will terminate.
    const existing = await ctx.db
      .query("executorEpoch")
      .first();
    let epoch: number;
    if (existing) {
      epoch = existing.epoch + 1;
      await ctx.db.patch(existing._id, { epoch });
    } else {
      epoch = 1;
      await ctx.db.insert("executorEpoch", { epoch });
    }
    for (let i = 0; i < numShards; i++) {
      await ctx.scheduler.runAfter(
        0,
        executorHandle as FunctionHandle<"action">,
        { shard: i, epoch },
      );
    }
    return epoch;
  },
});

⚠️ Potential issue | 🟡 Minor

startExecutors doesn't validate numShards — zero or negative values produce no executors.

If numShards is 0 or negative, the for loop at line 186 never executes and no executors are scheduled. The epoch is still incremented, which would terminate any existing executors without replacement.

🛡️ Proposed guard
  handler: async (ctx, { executorHandle, numShards }) => {
+   if (numShards < 1) {
+     throw new Error(`numShards must be >= 1, got ${numShards}`);
+   }
    // Increment executor epoch — old executors with stale epochs will terminate.
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
export const startExecutors = mutation({
  args: {
    executorHandle: v.string(),
    numShards: v.number(),
  },
  returns: v.number(), // returns the new epoch
  handler: async (ctx, { executorHandle, numShards }) => {
    if (numShards < 1) {
      throw new Error(`numShards must be >= 1, got ${numShards}`);
    }
    // Increment executor epoch — old executors with stale epochs will terminate.
    const existing = await ctx.db
      .query("executorEpoch")
      .first();
    let epoch: number;
    if (existing) {
      epoch = existing.epoch + 1;
      await ctx.db.patch(existing._id, { epoch });
    } else {
      epoch = 1;
      await ctx.db.insert("executorEpoch", { epoch });
    }
    for (let i = 0; i < numShards; i++) {
      await ctx.scheduler.runAfter(
        0,
        executorHandle as FunctionHandle<"action">,
        { shard: i, epoch },
      );
    }
    return epoch;
  },
});
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/component/taskQueue.ts` around lines 167 - 195, startExecutors increments
the epoch but doesn't validate numShards, so numShards <= 0 will increment epoch
and schedule no executors; add validation at the start of the mutation handler
to ensure numShards is a positive integer (e.g., throw a descriptive error or
return a failure) before touching the DB or epoch; refer to the startExecutors
handler, the numShards param, and ctx.scheduler.runAfter/epoch logic so you
validate and short-circuit early (reject non-integers, zero, or negatives) to
avoid advancing epoch without scheduling executors.

Comment thread src/component/workflow.ts
Comment on lines +266 to +287
export const creationTimeBuckets = query({
  args: {
    name: v.string(),
    createdAfter: v.number(),
    bucketMs: v.number(),
  },
  returns: v.array(v.object({ offsetSec: v.number(), count: v.number() })),
  handler: async (ctx, { name, createdAfter, bucketMs }) => {
    const buckets = new Map<number, number>();
    for await (const wf of ctx.db
      .query("workflows")
      .withIndex("name", (q) => q.eq("name", name))
      .order("desc")) {
      if (wf._creationTime < createdAfter) break;
      const bucket =
        Math.floor((wf._creationTime - createdAfter) / bucketMs) * (bucketMs / 1000);
      buckets.set(bucket, (buckets.get(bucket) || 0) + 1);
    }
    return [...buckets.entries()]
      .map(([offsetSec, count]) => ({ offsetSec, count }))
      .sort((a, b) => a.offsetSec - b.offsetSec);
  },
});

⚠️ Potential issue | 🟡 Minor

creationTimeBuckets scans unbounded rows — risk hitting Convex read limits on large datasets.

Unlike countByNamePage which uses pagination, creationTimeBuckets iterates all matching workflows in a single query call with no pagination or row-count limit. For popular workflow names with many runs, this could exceed the 16MB read limit and throw. Consider adding a paginated variant similar to countByNamePage, or at minimum document the constraint.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/component/workflow.ts` around lines 266 - 287, creationTimeBuckets
currently iterates all matching workflows in a single query and can hit Convex
read limits; change the handler in creationTimeBuckets to paginate like
countByNamePage: run the same ctx.db.query("workflows").withIndex("name", q =>
q.eq("name", name)).order("desc") but fetch results in fixed-size pages (e.g.,
pageSize), loop with the query pagination API (cursor/next) until no more pages
or until wf._creationTime < createdAfter, aggregate counts into the existing
buckets Map per page, then return the sorted entries as before; if you cannot
implement paging now, add a clear comment in the creationTimeBuckets handler
referencing countByNamePage and the 16MB read-limit constraint.
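For illustration, a cursor-style sketch in the spirit of countByNamePage: the client calls repeatedly with the returned cursor, so no single transaction has to read the whole set. The name creationTimeBucketsPage and the pageSize/cursor args are hypothetical:

export const creationTimeBucketsPage = query({
  args: {
    name: v.string(),
    createdAfter: v.number(),
    bucketMs: v.number(),
    cursor: v.optional(v.number()), // _creationTime of the last row seen
    pageSize: v.number(),
  },
  returns: v.object({
    buckets: v.array(v.object({ offsetSec: v.number(), count: v.number() })),
    cursor: v.union(v.number(), v.null()), // null when done
  }),
  handler: async (ctx, { name, createdAfter, bucketMs, cursor, pageSize }) => {
    const rows = await ctx.db
      .query("workflows")
      .withIndex("name", (q) =>
        cursor === undefined
          ? q.eq("name", name)
          : q.eq("name", name).lt("_creationTime", cursor),
      )
      .order("desc")
      .take(pageSize);
    const buckets = new Map<number, number>();
    let done = rows.length < pageSize;
    for (const wf of rows) {
      if (wf._creationTime < createdAfter) {
        done = true;
        break;
      }
      const bucket =
        Math.floor((wf._creationTime - createdAfter) / bucketMs) * (bucketMs / 1000);
      buckets.set(bucket, (buckets.get(bucket) || 0) + 1);
    }
    return {
      buckets: [...buckets.entries()]
        .map(([offsetSec, count]) => ({ offsetSec, count }))
        .sort((a, b) => a.offsetSec - b.offsetSec),
      cursor: done ? null : rows[rows.length - 1]._creationTime,
    };
  },
});

The caller merges pages by summing counts per offsetSec until cursor comes back null. Note the strict lt cursor can skip rows that share a _creationTime at a page boundary; an acceptable imprecision for a diagnostic, but worth knowing.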

When replayIfReady fails after 3 immediate retries, push the candidate
to a rescue queue instead of dropping it. The flush loop retries rescued
candidates every 500ms, and waitUntilIdle drains them before shutdown.
This prevents workflows from getting permanently stuck when all steps
complete but replay fails due to transient OCC conflicts.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
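A condensed sketch of the rescue-queue shape this commit describes, using the executor's surrounding ctx/component bindings; everything except the park-on-failure and the periodic drain is elided:

type ReplayCandidate = { workflowId: string; generationNumber: number };
const rescueQueue: ReplayCandidate[] = [];

// After the 3 immediate retries fail, park the candidate instead of
// dropping it.
if (!replaySucceeded) {
  rescueQueue.push(candidate);
}

// Called from the ~500ms flush loop and again from waitUntilIdle before
// shutdown: retry parked candidates until they go through.
async function drainRescueQueue() {
  const parked = rescueQueue.splice(0, rescueQueue.length);
  for (const c of parked) {
    try {
      await ctx.runMutation(component.taskQueue.replayIfReady, {
        workflowId: c.workflowId,
        generationNumber: c.generationNumber,
      });
    } catch {
      // Still failing (e.g., another OCC conflict); park it again.
      rescueQueue.push(c);
    }
  }
}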
Member
@ianmacartney ianmacartney left a comment

Overall I like the pattern, but I don't really like the fixed 10m executor strategy: it seems too flaky and prone to errors/outages, and it would need a lot of hardening. I think this makes sense as a separate component, and Workflow should be more flexible about what the executor of each step is (scheduler, workpool, executor shards, or BYO). We just need a good interface for the equivalent of a Promise for async, suspendable execution.

Comment thread src/component/workflow.ts
...schedulerOptions,
},
);
await ensureCoordinatorRunning(ctx);
Member

seems like we no longer enqueue here

Comment thread src/component/pool.ts
Comment on lines +186 to +191
const otherInProgress = await ctx.db
  .query("steps")
  .withIndex("inProgress", (q) =>
    q.eq("step.inProgress", true).eq("workflowId", workflowId),
  )
  .first();
Member

note: in a promise.all situation this could keep hitting OCCs on onComplete until they're all in - could cause issues when onComplete is called directly. one thing that could help here is a sense of "de-duping" tasks in the workpool, based on the function handle & args

Comment thread src/component/journal.ts Outdated
stepId,
workflowId: workflow._id,
generationNumber,
retry: retryConfig,
Member

why is there a retry config on the mutation?

Comment thread src/component/journal.ts Outdated
stepId,
workflowId: workflow._id,
generationNumber,
retry: retryConfig,
Member

retry on a query? why?

Comment thread src/client/index.ts
Comment on lines +157 to +164
    return internalActionGeneric({
      handler: async () => {
        throw new Error(
          `${name} should not be called directly — it runs inside executors`,
        );
      },
    }) as any;
  }
Member

seems like you could not return this and it'd work just fine

sethconvex and others added 15 commits February 25, 2026 09:02
- Add WorkflowRateLimitError + per-step-type rate-limit gates
- Fix executor straggler bug (tasks not completing during handoff)
- Serialized flush→replay to eliminate OCC conflicts at scale
- Inline replay into recordResultBatch + tune flush params
- Batched replay mutations for throughput
- Sharded task queue + executor-driven chaining

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…plementation

[checkpoint] Manual: Fixed executor idle timeout and auto-start from benchmark

feat: sharded task queue with executor-driven workflow chaining

Replace single-document BatchWorkpool coordination with a sharded task
queue. Each executor action reads from a non-overlapping shard, eliminating
cross-executor OCC contention. Key changes:

- Atomic task lifecycle: claimTasks reads without deleting, recordResult
  atomically records step result + deletes task + replays workflow
- Epoch-based executor dedup prevents duplicate executors from accumulating
- Stale tasks always cleaned up on error (no permanent shard blocking)
- Executors never idle-terminate, always self-reschedule at 8 min
- Faster coordinator fan-out (batch size 50, read batch 1000)
- Benchmark harness with parallel batch creation via scheduler

10K 4-step workflows: ~120s (6x improvement over BatchWorkpool baseline)
20K 4-step workflows: ~390s with 40 shards, 0 failures

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

feat: batched result recording, consistent sharding, and query fixes

- Batch recordResults into single mutation per flush (500ms interval,
  max 50 items) to eliminate CommitterFullError under high throughput
- Deterministic shard assignment (hash workflowId) so all steps for one
  workflow go to the same executor, eliminating cross-shard OCC conflicts
- Split recording from replay: recordResultBatch returns candidates,
  replayIfReady runs per-workflow replay as separate small mutations
- Change claimTasks from mutation to query (read-only, much cheaper)
- Add countByName query with createdAfter filter for reliable status
  counting in a single transaction (no pagination drift)
- Fix diagnose query to use indexed per-shard queries instead of
  table scan

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
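The commit doesn't include shardForWorkflow itself; one plausible shape for the deterministic assignment it describes (the real implementation may differ):

// Deterministic: a given workflowId always maps to the same shard, so
// result writes for one workflow never contend with another shard's executor.
function shardForWorkflow(workflowId: string, numShards: number): number {
  let hash = 0;
  for (let i = 0; i < workflowId.length; i++) {
    hash = (hash * 31 + workflowId.charCodeAt(i)) | 0; // 32-bit rolling hash
  }
  return Math.abs(hash) % numShards;
}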

fix: retry failed replayIfReady calls to prevent stuck workflows

When replayIfReady fails (OCC conflict or transient error), the
workflow's tasks are already deleted from the queue. Without retry,
nothing will ever trigger the replay again, leaving the workflow
stuck forever.

Now retries failed replays up to 3 times with increasing backoff
(200ms, 400ms, 600ms). This eliminated the 5/20000 stuck workflows
from the previous benchmark run.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
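A sketch of that retry loop (the 200/400/600ms delays come from the commit message; `runReplay` stands in for the replayIfReady call):

```ts
async function replayWithRetry(
  runReplay: () => Promise<void>,
  maxAttempts = 3,
): Promise<boolean> {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      await runReplay();
      return true;
    } catch (err) {
      if (attempt === maxAttempts) {
        console.error(`replay failed after ${maxAttempts} attempts`, err);
        return false; // later commits park these on a rescue queue
      }
      // Increasing backoff: 200ms, 400ms, 600ms.
      await new Promise((r) => setTimeout(r, 200 * attempt));
    }
  }
  return false;
}
```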

feat: benchmark timeline visualization served from HTTP action

Add timelinePage query to the workflow component that returns compact
timeline data (workflow id, creation time, step timings) with efficient
desc-order pagination and createdAfter filtering.

Serve a canvas-based visualization from /benchmark-viz HTTP route that
shows every workflow as a 1px colored bar (blue=extract, green=analyze-a,
orange=analyze-b, red=summarize) with live stats and time scale.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

feat: vizUrl in startBenchmark output, stable timeline polling, half-pixel rows

Return clickable viz URL from startBenchmark. Switch timeline from
unstable paginated subscriptions to polling (status stays reactive).
Add vertical labels every 1000 workflows. Use CSS scaleY(0.5) for
half-pixel row height.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

feat: pre-emptive executor restart and transparent viz colors

Schedule successor executor before draining in-flight tasks to eliminate
the throughput gap between executor generations. Reduce RESCHEDULE_MS
from 8 to 5 minutes for more frequent handoffs. Use transparent colors
for analyze-a (green) and analyze-b (amber) so concurrent steps are
visually distinguishable when overlaid.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

feat: jittered executor restarts and 10x smaller viz rows

Add 0-60s jitter to RESCHEDULE_MS so all 40 shards don't restart
simultaneously. Schedule successor before draining so it cold-starts
during the drain period. Reduce viz row scaleY from 0.5 to 0.1 to
fit 20k workflows in view.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
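The jitter itself is small; a sketch with the constants from the commit messages (everything else assumed):

```ts
// Each shard picks its own deadline so handoffs across the 40 executors
// desynchronize instead of all restarting at once.
const RESCHEDULE_MS = 5 * 60 * 1000; // base executor lifetime
const jitterMs = Math.random() * 60_000; // 0-60s of jitter
const deadline = Date.now() + RESCHEDULE_MS + jitterMs;
// The main loop runs until Date.now() >= deadline, then schedules its
// successor first and drains in-flight tasks while it cold-starts.
```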

[checkpoint] Auto-save at 11:59:31 AM

feat: executor handoff handshake, drain-before-exit, and LLM benchmark mode

Three-phase handoff protocol eliminates throughput gaps at executor restart
boundaries: old executor keeps claiming tasks until successor signals ready,
then yields cleanly. Stale-epoch exits now drain remaining shard tasks to
prevent stranded work.

Also adds creationTimeBuckets diagnostic query, increases batch size to 1000,
and introduces a real Claude LLM benchmark mode alongside the simulated one.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
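A sketch of the handshake from the predecessor's side (document shape and helper names assumed):

```ts
type Handoff = { shard: number; successorReady: boolean };

async function handoffAndExit(
  getHandoff: () => Promise<Handoff>,
  claimAndRunOnce: () => Promise<void>,
  drainInFlight: () => Promise<void>,
) {
  // Phase 1: the successor was scheduled earlier and flips successorReady.
  // Phase 2: keep claiming so shard throughput never dips to zero.
  while (!(await getHandoff()).successorReady) {
    await claimAndRunOnce();
  }
  // Phase 3: stop claiming, finish in-flight work, exit cleanly.
  await drainInFlight();
}
```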

feat: real Claude LLM benchmark mode, queue/execution viz, dynamic scaling

- Add benchmarkMode ("simulated" | "real") flag to all workflows and actions
- Add callClaude() with Anthropic SDK, 20 topic corpus, task-specific prompts
- SDK configured with maxRetries: 10 for rate limit resilience
- Extract executorStartedAt from step results in timelinePage query
- Viz: dim bars for queue wait vs bright bars for execution time
- Viz: dynamic Y-axis scaling to fit any workflow count on one screen
- Disable batch mode (BatchWorkpool not in published workpool)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

[checkpoint] Auto-save at 12:34:31 PM

chore: tune executor concurrency limits and Anthropic retry count

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

feat: paginated status counting and executor improvements

- Add countByNamePage to component for paginated workflow counting
  that stays under Convex's 16MB read limit at 20k+ workflows
- Convert benchmarkStatus to action that loops over pages
- Replace viz's reactive subscription with paginated status polling
- Add retry support and query/mutation routing to executor task queue
- Add executionStartedAt to timeline for queue vs execution distinction
- Fix BatchWorkpool import (type alias for unpublished export)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

feat: streaming executor pipeline with in-flight tracking

Replace batch-and-drain pattern (claim → feed → waitUntilIdle → loop)
with streaming pattern that continuously fills idle slots. Adds local
inFlightStepIds Set to filter already-claimed tasks client-side,
removes waitUntilIdle() from main loop and handoff claiming loop
(kept only at exit/drain points). Adds adaptive polling: 150ms when
tasks are active, 500ms when idle.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
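A sketch of the streaming fill pattern (all names assumed): replacing the claim, feed, waitUntilIdle cycle, the loop tops up free slots on every poll.

```ts
const MAX_CONCURRENCY = 200;
const inFlightStepIds = new Set<string>();

async function streamLoop(
  claim: (limit: number) => Promise<{ stepId: string }[]>,
  run: (task: { stepId: string }) => Promise<void>,
  running: () => boolean,
) {
  while (running()) {
    const free = MAX_CONCURRENCY - inFlightStepIds.size;
    if (free > 0) {
      for (const task of await claim(free)) {
        if (inFlightStepIds.has(task.stepId)) continue; // already claimed here
        inFlightStepIds.add(task.stepId);
        void run(task); // fire-and-forget; flush() confirms completion later
      }
    }
    // Adaptive polling: 150ms while busy, 500ms when the shard is idle.
    await new Promise((r) => setTimeout(r, inFlightStepIds.size > 0 ? 150 : 500));
  }
}
```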

[checkpoint] Auto-save at 5:00:44 PM

[checkpoint] Auto-save at 5:20:44 PM

[checkpoint] Auto-save at 5:35:44 PM

[checkpoint] Auto-save at 5:40:44 PM

[checkpoint] Auto-save at 6:14:45 PM

[checkpoint] Auto-save at 6:29:45 PM

[checkpoint] Auto-save at 7:12:18 PM

MAYBE improvement: fix task re-claim bug + tune executor constants

- Remove premature inFlightStepIds.delete from processTask. Previously,
  completed tasks were removed from tracking immediately but stayed in
  the DB queue until flush. The claiming loop would re-claim them 3-6x
  each, wasting concurrency slots on duplicate executions.
  Now only deleted in flush() after recordResultBatch confirms removal.

- CLAIM_LIMIT 150→800: see full shard queue, no invisible tasks
- MAX_CONCURRENCY 750→200: limit event loop contention
- RESCHEDULE_MS 5min→8min: avoid handoff during benchmark window

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
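The corrected lifecycle in sketch form (names assumed): a stepId leaves the in-flight set only after the batch mutation confirms the queue rows are gone.

```ts
type StepResult = { stepId: string; result: unknown };

const FLUSH_BATCH_SIZE = 50;
const pendingResults: StepResult[] = [];

async function flush(
  recordBatch: (batch: StepResult[]) => Promise<void>,
  inFlightStepIds: Set<string>,
) {
  const batch = pendingResults.splice(0, FLUSH_BATCH_SIZE);
  if (batch.length === 0) return;
  await recordBatch(batch); // deletes the queue rows transactionally
  // Only now is it safe to stop filtering these steps out of claims.
  for (const r of batch) inFlightStepIds.delete(r.stepId);
}
```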

[checkpoint] Auto-save at 7:27:17 PM

[checkpoint] Auto-save at 7:42:18 PM

[checkpoint] Auto-save at 7:47:18 PM

[checkpoint] Auto-save at 7:52:17 PM

[checkpoint] Auto-save at 7:57:18 PM

[checkpoint] Auto-save at 8:07:18 PM

[checkpoint] Auto-save at 8:37:18 PM

Add executor tuning guide and d1024 benchmark results

Documents all executor-mode knobs (executorShards, MAX_CONCURRENCY,
FLUSH_BATCH_SIZE, etc.) with tradeoffs and tuning profiles for different
workload types: short steps, LLM API calls, long-running steps, and
high-volume bursty creation.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

Add comprehensive executor mode clean-room implementation spec

Complete specification covering: architecture, schema changes, component
mutations (claimTasks, recordResultBatch, replayIfReady), client-side
WorkflowManager changes, the full executor action with flush system and
bounded-concurrency pool, handoff protocol, shard assignment, cancel
support, benchmark visualizer with timeline query, pitfalls discovered
through testing, and all constants with rationale.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
… + throughput tuning)

Key changes from base:
- Executor-mode workflows bypass coordinator (inline creation)
- MAX_CONCURRENCY=500, CLAIM_LIMIT=1500, FLUSH_INTERVAL_MS=200
- BATCH_CREATE_SIZE=100 (smaller batches for scheduler concurrency)
- executorFinishedAt tracking + write-delay viz
- Replays coupled with flush (50 per batch, 3 retries with backoff)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace N individual replayIfReady mutations per flush with a single
replayBatchIfReady call, fixing "too many concurrent commits" rate
limiting that was losing ~11 workflows per 20k run. Bump
FLUSH_BATCH_SIZE from 50 to 200 for higher throughput.

Visualizer now streams pages incrementally (draws after each 1000-
workflow page) instead of waiting for all 20k to load.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Previously the visualizer cleared allWorkflows at the start of each
poll cycle, causing a blank flash. Now it builds into a separate array
and only swaps once the incoming count exceeds the previous frame.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
recordResultBatch now accepts replayInline flag to replay ready
workflows in the same mutation, eliminating a separate round-trip.
FLUSH_BATCH_SIZE=50, FLUSH_INTERVAL_MS=100ms for faster flush cycles.

20k benchmark: 20000/20000 completed, 0 failed, ~102-145s span.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Move replay out of recordResultBatch into executor-side
replayBatchIfReady calls, serialized after each flush batch.
This eliminates the inProgress index reads from recordResultBatch
(zero OCC surface) while keeping inter-step latency tight.

Key changes:
- recordResultBatch returns replay candidates instead of replaying inline
- Executor calls replayBatchIfReady after each flush (serialized, not concurrent)
- Viz: add ?sort=shard mode, batched page fetching, error backoff
- Benchmark: smaller batch creation (25), skipExecutorStart flag, inline small counts

20k benchmark: p50=76.7s, p90=82.2s, p99=86.4s, slowest=92s, 0 failures.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
New canvas above the timeline showing a stacked area chart of
concurrent workflows in each phase (extract, analyze-a, analyze-b,
summarize) over time. Y-axis labels show peak concurrency.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Documents benchmark results, architecture, tuning constants,
and failed experiments for the executor flush/replay pipeline.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add drain loop on both executor exit paths (handoff and epoch-mismatch)
that retries up to 10 times with 500ms delays, catching tasks created by
replay sub-mutations during shutdown. Fixes issue where 3+ workflows
would get permanently stuck in the queue.

Also: restore 100 shards, add sort-by-shard toggle and per-phase step
counts to benchmark viz.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Executor now supports rate-limit-aware retries:
- WorkflowRateLimitError lets step handlers signal rate limits
- Per-action-handle gates prevent thundering herd on retry
- Benchmark uses shared gate pattern for Anthropic API 429s
- Switch benchmark model to haiku, bump convex to 1.32

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
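A sketch of the per-handle gate (WorkflowRateLimitError is the PR's name, but its shape here is assumed): one 429 parks every task sharing that handle until the cooldown passes.

```ts
class WorkflowRateLimitError extends Error {
  constructor(public retryAfterMs: number) {
    super("rate limited");
  }
}

const gates = new Map<string, number>(); // handle -> earliest retry (ms epoch)

async function runGated(handle: string, fn: () => Promise<void>) {
  const wait = (gates.get(handle) ?? 0) - Date.now();
  if (wait > 0) await new Promise((r) => setTimeout(r, wait));
  try {
    await fn();
  } catch (err) {
    if (err instanceof WorkflowRateLimitError) {
      gates.set(handle, Date.now() + err.retryAfterMs); // shared cooldown
    }
    throw err; // the caller's retry policy decides whether to re-run
  }
}
```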
These belong in the second PR for documentation and testing.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Bug fix: replay candidates that fail replayBatchIfReady twice now go
into a rescue queue instead of being silently dropped. The flush loop
retries the rescue queue every tick, and waitUntilIdle drains it before
executor shutdown. This prevents workflows from getting permanently
stranded by transient OCC failures.

Review feedback (Ian):
- Remove retry config from query/mutation task queue inserts — queries
  are deterministic and mutations auto-retry on OCC. Only actions get
  application-level retry.
- Remove unnecessary explicit return type on executor().

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
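The rescue path in sketch form (names assumed): twice-failed candidates are parked and retried on every flush tick rather than dropped.

```ts
type ReplayCandidate = { workflowId: string; generationNumber: number };

const replayRescueQueue: ReplayCandidate[] = [];

async function retryRescueQueue(
  replayBatch: (candidates: ReplayCandidate[]) => Promise<void>,
) {
  if (replayRescueQueue.length === 0) return;
  const candidates = replayRescueQueue.splice(0);
  try {
    await replayBatch(candidates);
  } catch {
    // Transient failure: keep the candidates for the next tick.
    replayRescueQueue.push(...candidates);
  }
}
```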
Three fixes to ensure every workflow reaches a terminal state:

1. replayInline in recordResultBatch: replay workflows within the same
   mutation that records step results, eliminating OCC conflicts between
   recordResultBatch and replayBatchIfReady.

2. Durable scheduler safety net: when replayInline skips a candidate
   (other steps still in-progress from a concurrent batch), schedule a
   delayed replayIfReady via ctx.scheduler.runAfter(10s). This is
   committed atomically with the result recording, so it survives
   executor crashes and hard 10-minute action timeouts.

3. Lower RESCHEDULE_MS from 8min to 5min, giving more headroom for
   drain and handoff before the 10-minute hard kill.

Also returns unreplayed candidates to the client (instead of []) so
the existing replayBatchIfReady + rescue queue path provides an
additional fast retry before the scheduled safety net fires.

Verified: 10k real Claude benchmark → 9940 completed, 60 failed
(rate limits), 0 stuck workflows (previously 3-15 stuck per run).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
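A sketch of the safety net (handler internals assumed; only the runAfter pattern is the point): because the scheduler write commits in the same transaction as the results, the delayed replay survives executor crashes and hard timeouts.

```ts
import { v } from "convex/values";
import { mutation } from "./_generated/server";
import { internal } from "./_generated/api";

// Hypothetical helper: replays the workflow if no steps remain in progress.
async function tryReplayInline(ctx: any, workflowId: string): Promise<boolean> {
  // ...check the in-progress index and run the workflow handle...
  return false;
}

export const recordResultBatch = mutation({
  args: { results: v.array(v.any()), replayInline: v.boolean() },
  handler: async (ctx, { results, replayInline }) => {
    for (const r of results) {
      // ...record the step result and delete its task row here...
      const replayed =
        replayInline && (await tryReplayInline(ctx, r.workflowId));
      if (!replayed) {
        // Committed atomically with the result recording above.
        await ctx.scheduler.runAfter(10_000, internal.taskQueue.replayIfReady, {
          workflowId: r.workflowId,
        });
      }
    }
  },
});
```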
@sethconvex sethconvex marked this pull request as ready for review February 26, 2026 16:30

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 2

♻️ Duplicate comments (6)
package.json (1)

66-67: ⚠️ Potential issue | 🟡 Minor

The reintroduced @anthropic-ai/sdk dependency should be usage-verified before release.

Line 66 adds this runtime dependency again. This was previously removed as unused; if it’s still unused, it increases supply-chain and install surface for no functional gain.

#!/bin/bash
set -euo pipefail

# Verify direct imports/usages of Anthropic SDK across source, examples, and docs.
rg -n --hidden \
  -g '!**/node_modules/**' \
  -g '!**/dist/**' \
  -g '*.{ts,tsx,js,jsx,json,md}' \
  -e '@anthropic-ai/sdk' \
  -e 'new[[:space:]]+Anthropic' \
  -e 'anthropic\.'
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@package.json` around lines 66 - 67, The package reintroduces the runtime
dependency "@anthropic-ai/sdk" into package.json but the PR comment requests
usage verification; search the repo for direct imports/usages of
"@anthropic-ai/sdk" (import, require, new Anthropic, anthropic.*) across source,
examples, docs and tests, and if no usage is found remove the dependency from
package.json (and any lockfile update) or if it is required, add a short code
reference (file path and symbol) and a minimal test/example showing its use plus
a package.json comment or release note explaining why it’s needed; ensure any
CI/test changes reflect the dependency addition if kept.
src/client/step.ts (1)

191-205: ⚠️ Potential issue | 🟠 Major

Base-name matching can collide across modules and misroute batch actions.

Line 200 through Line 203 collapses function identity to a basename (doWork), so two distinct actions can be treated as the same batch action.

🔧 Proposed fix (match fully-qualified safe function names)
-        // Detect batch actions by checking if the function name's base matches
-        // a registered batch action name.
+        // Detect batch actions by matching fully-qualified safe function names.
         let batchActionName: string | undefined;
         if (
           this.batchActionNames &&
           target.kind === "function" &&
           target.functionType === "action"
         ) {
           const fnName = safeFunctionName(target.function);
-          const parts = fnName.split(/[:/]/);
-          const baseName = parts[parts.length - 1];
-          if (this.batchActionNames.has(baseName)) {
-            batchActionName = baseName;
+          if (this.batchActionNames.has(fnName)) {
+            batchActionName = fnName;
           }
         }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/client/step.ts` around lines 191 - 205, The code currently reduces
function identity to a basename (using safeFunctionName then taking the last
segment) which can collide across modules; instead compare the full safe
function name against the batchActionNames set. In the block that computes
batchActionName (symbols: batchActionNames, target, safeFunctionName,
batchActionName, functionType), stop extracting baseName via split and check
this.batchActionNames.has(fnName) and if true set batchActionName = fnName;
ensure anywhere that registers names into batchActionNames also uses the same
fully-qualified safeFunctionName format so the check is consistent.
src/component/coordinator.ts (1)

76-103: ⚠️ Potential issue | 🟠 Major

runWorkflowBatch still couples up to 50 workflow replays into one failure domain.

Line 88 through Line 91 executes each workflow via ctx.runMutation inside the same batch mutation. A single OCC/read-write-limit failure can roll back/retry the entire batch.

In Convex, when a mutation calls ctx.runMutation multiple times in a loop, are those calls part of the caller transaction/sub-transaction flow, and can one OCC conflict cause the whole parent mutation to retry or fail?
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/component/coordinator.ts` around lines 76 - 103, The handler currently
calls multiple ctx.runMutation invocations in a loop, coupling up to 50 workflow
replays into a single failure domain; change the logic so each workflowHandle is
invoked in its own top-level mutation/task (so one OCC/read-write failure for
one replay doesn't rollback the whole batch). Concretely: stop awaiting
ctx.runMutation directly inside this handler's transaction loop; instead
schedule/dispatch each run of workflowHandle (the ctx.runMutation call) as an
independent mutation/task (so each has its own try/catch and its own
ctx.db.patch for runResult on failure), referencing handler, ctx.runMutation,
workflowHandle and ctx.db.patch to locate where to detach these calls. Ensure
each workflow's error handling remains per-workflow and updates runResult
individually.
src/component/taskQueue.ts (1)

174-194: ⚠️ Potential issue | 🟠 Major

Validate numShards before bumping executor epoch.

numShards <= 0 (or non-integer) currently advances the epoch and can stop existing executors without scheduling replacements.

💡 Proposed fix
   handler: async (ctx, { executorHandle, numShards }) => {
+    if (!Number.isInteger(numShards) || numShards < 1) {
+      throw new Error(`numShards must be a positive integer, got ${numShards}`);
+    }
     // Increment executor epoch — old executors with stale epochs will terminate.
     const existing = await ctx.db
       .query("executorEpoch")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/component/taskQueue.ts` around lines 174 - 194, The handler currently
bumps the executor epoch before validating numShards, which allows non-positive
or non-integer values to retire executors without scheduling replacements;
modify handler to validate numShards (ensure Number.isInteger(numShards) and
numShards > 0) at the top and throw or return an error before touching the
executorEpoch record if validation fails, then proceed to update/insert
executorEpoch and call ctx.scheduler.runAfter for each shard using the validated
numShards and epoch; reference the handler function, the executorEpoch DB
query/patch/insert logic, and the ctx.scheduler.runAfter/executorHandle
scheduling code when making this change.
src/client/index.ts (1)

486-490: ⚠️ Potential issue | 🟡 Minor

Log before dropping unflushed results after retry exhaustion.

This path clears pendingResults silently, which makes production triage much harder when flush repeatedly fails.

💡 Proposed fix
               if (retries >= MAX_FLUSH_RETRIES) {
                 // Give up on remaining items — tasks stay in queue,
                 // next executor run will re-process them.
+                console.error(
+                  `Executor shard ${shard}: dropping ${pendingResults.length} unflushed results after ${retries} retries (max ${MAX_FLUSH_RETRIES})`,
+                );
                 pendingResults.length = 0;
                 break;
               }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/client/index.ts` around lines 486 - 490, Before clearing pendingResults
when retries >= MAX_FLUSH_RETRIES, add a log statement that records the fact
you're dropping items: include retries, MAX_FLUSH_RETRIES, the number of items
being dropped (pendingResults.length) and any available last error/flush failure
reason; place this log immediately before the line that sets
pendingResults.length = 0 inside the same retry/flush loop so operators can
triage. Use the module's existing logger (e.g., logger or processLogger) so the
message is emitted consistently with other logs and reference pendingResults,
retries and MAX_FLUSH_RETRIES in the message.
src/component/workflow.ts (1)

270-291: ⚠️ Potential issue | 🟠 Major

creationTimeBuckets still performs an unbounded scan for a workflow name.

This endpoint walks all matching rows in one call, which is risky at higher volumes. Please switch to a paginated/batched approach (similar to countByNamePage) for large histories.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/component/workflow.ts` around lines 270 - 291, creationTimeBuckets
currently iterates all matching workflows in a single unbounded scan; change it
to a paginated/batched scan similar to countByNamePage by using
ctx.db.query("workflows").withIndex("name").order("desc") but calling
.limit(pageSize).start(cursor) (or whatever pagination API your DB wrapper
exposes) in a loop, updating the cursor each iteration and breaking when no more
results, and for each page accumulate bucket counts (compute bucket =
Math.floor((wf._creationTime - createdAfter) / bucketMs) * (bucketMs / 1000) and
increment Map entries) before returning the sorted array; keep the same return
shape and parameter handling in creationTimeBuckets and reuse page size / cursor
logic consistent with countByNamePage to avoid unbounded scans.
🧹 Nitpick comments (4)
src/component/pool.ts (1)

97-97: Avoid hard-coding logger level in runtime handlers.

Line 97, Line 257, and Line 298 always use DEFAULT_LOG_LEVEL, which can hide diagnostics when operators configure higher verbosity. Prefer sourcing the configured level (context/config) before falling back.

Also applies to: 257-257, 298-299

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/component/pool.ts` at line 97, The code is hard-coding DEFAULT_LOG_LEVEL
when creating loggers (via createLogger(DEFAULT_LOG_LEVEL)), which ignores
operator-configured verbosity; change each creation site (the createLogger call
near the pool top and the other two occurrences) to first obtain the configured
level from the runtime context or config (e.g., ctx.logLevel,
context.getConfig('logLevel') or similar) and pass that value into createLogger,
falling back to DEFAULT_LOG_LEVEL only if the configured value is absent; update
all three locations (the createLogger/DEFAULT_LOG_LEVEL usages) so they use the
resolved configuredLevel variable instead of the literal DEFAULT_LOG_LEVEL.
src/component/workflow.ts (1)

334-340: timelinePage does N+1 full steps.collect() queries.

Fetching all steps for every workflow in-page can get expensive quickly. Consider limiting step payload (or paginating steps) for this view.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/component/workflow.ts` around lines 334 - 340, The timelinePage code
issues N+1 DB calls by running ctx.db.query("steps").withIndex("workflow", q =>
q.eq("workflowId", wf._id)).collect() for each wf in filtered; replace this with
a single batched query that retrieves steps for all workflows in one call (e.g.,
collect all workflow IDs from filtered, run
ctx.db.query("steps").withIndex("workflow", q => q.in("workflowId",
ids)).collect()), then group the returned step docs by workflowId and attach the
limited/paginated slice to each workflow; also reduce payload by selecting only
needed fields (or applying a per-workflow limit) when constructing the step
lists for timelinePage.
src/client/index.ts (1)

165-169: Reject duplicate executor action names instead of silently overriding.

A second registration with the same name currently overwrites the previous handler in executorActionHandlers.

💡 Proposed fix
     // Store handler for executor to call, register name for
     // batch action detection in step.ts, return a dummy action placeholder.
+    if (this.executorActionHandlers.has(name)) {
+      throw new Error(`Duplicate executor action registration: ${name}`);
+    }
     this.executorActionHandlers.set(name, opts.handler);
     this.batchActionNames.add(name);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/client/index.ts` around lines 165 - 169, Check for an existing
registration before inserting: in the code that currently does
this.executorActionHandlers.set(name, opts.handler) and
this.batchActionNames.add(name), first test
this.executorActionHandlers.has(name) and reject the duplicate (throw an Error
or return a failure) instead of overwriting; if rejecting, do not modify
this.batchActionNames and include the duplicate name in the error message so
callers can diagnose which action is duplicated.
src/component/taskQueue.ts (1)

56-57: Task queue handlers bypass configurable log levels.

Using createLogger(DEFAULT_LOG_LEVEL) here ignores configured runtime log level behavior used elsewhere.

💡 Proposed fix
-import { createLogger, DEFAULT_LOG_LEVEL } from "./logging.js";
+import { getDefaultLogger } from "./utils.js";

 // ...
-const console = createLogger(DEFAULT_LOG_LEVEL);
+// inside each handler, where ctx is available:
+const console = await getDefaultLogger(ctx);

Also applies to: 230-231, 385-386, 421-422

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/component/taskQueue.ts` around lines 56 - 57, Handlers create a logger
with a hardcoded DEFAULT_LOG_LEVEL via the expression
createLogger(DEFAULT_LOG_LEVEL), which bypasses the runtime-configured log
level; replace those calls to use the actual configured runtime log level (e.g.
read from the existing configuration or a central logger) instead of
DEFAULT_LOG_LEVEL — for example, call createLogger(getConfiguredLogLevel()) or
reuse the shared logger instance injected into the module; update every
occurrence of createLogger(DEFAULT_LOG_LEVEL) in taskQueue.ts (including the
instances around the earlier noted positions) so all task queue handlers honor
the configured log level.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b1552ab and bafddef.

⛔ Files ignored due to path filters (5)
  • example/convex/_generated/api.d.ts is excluded by !**/_generated/**
  • package-lock.json is excluded by !**/package-lock.json
  • src/component/_generated/api.ts is excluded by !**/_generated/**
  • src/component/_generated/component.ts is excluded by !**/_generated/**
  • src/component/_generated/server.ts is excluded by !**/_generated/**
📒 Files selected for processing (13)
  • example/convex/convex.config.ts
  • example/convex/schema.ts
  • package.json
  • src/client/index.ts
  • src/client/step.ts
  • src/client/workflowMutation.ts
  • src/component/coordinator.ts
  • src/component/event.ts
  • src/component/journal.ts
  • src/component/pool.ts
  • src/component/schema.ts
  • src/component/taskQueue.ts
  • src/component/workflow.ts

Comment thread src/client/index.ts
Comment on lines +346 to +350
while (flushLoopRunning) {
  await new Promise((r) => setTimeout(r, FLUSH_INTERVAL_MS));
  if (pendingResults.length > 0) {
    await flush();
  }

⚠️ Potential issue | 🟡 Minor

Replay rescue queue is not retried unless new step results arrive.

flushLoop skips flush() when pendingResults is empty, so replayRescueQueue can sit idle until shutdown/final safety-net scheduling.

💡 Proposed fix
         const flushLoop = (async () => {
           while (flushLoopRunning) {
             await new Promise((r) => setTimeout(r, FLUSH_INTERVAL_MS));
-            if (pendingResults.length > 0) {
+            if (pendingResults.length > 0 || replayRescueQueue.length > 0) {
               await flush();
             }
           }
         })();
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/client/index.ts` around lines 346 - 350, The flushLoop currently only
calls flush() when pendingResults.length > 0, so replayRescueQueue can stall
until new results arrive; modify flushLoop (the function containing the while
(flushLoopRunning) loop) to either call flush() unconditionally each interval or
explicitly invoke the logic that processes replayRescueQueue when pendingResults
is empty (e.g., call replayRescueQueue or a helper like flushReplayRescueQueue
inside the loop), ensuring FLUSH_INTERVAL_MS still governs the retry cadence so
rescue items are retried periodically even if pendingResults is empty.

Comment thread src/component/journal.ts
Comment on lines +183 to 193
// Retry config only applies to actions — queries and mutations
// are deterministic and retried automatically by Convex on OCC.
const actionRetryConfig =
  step.functionType === "action"
    ? (stepArgs.retry === false
        ? undefined
        : typeof stepArgs.retry === "object"
          ? stepArgs.retry
          : DEFAULT_QM_RETRY)
    : undefined;
switch (step.functionType) {

⚠️ Potential issue | 🟠 Major

Executor-routed queries/mutations are missing retry policy metadata.

Line 195 through Line 205 and Line 218 through Line 228 enqueue query/mutation tasks without retry, so a transient failure fails the step immediately in executor mode.

🔧 Proposed fix (preserve per-step override with default fallback)
-          // Retry config only applies to actions — queries and mutations
-          // are deterministic and retried automatically by Convex on OCC.
-          const actionRetryConfig =
-            step.functionType === "action"
-              ? (stepArgs.retry === false
-                  ? undefined
-                  : typeof stepArgs.retry === "object"
-                    ? stepArgs.retry
-                    : DEFAULT_QM_RETRY)
-              : undefined;
+          const retryConfig =
+            stepArgs.retry === false
+              ? undefined
+              : typeof stepArgs.retry === "object"
+                ? stepArgs.retry
+                : DEFAULT_QM_RETRY;
@@
             case "query": {
               if (workflow.executorShards) {
                 const shard = shardForWorkflow(workflow._id as string, workflow.executorShards);
                 await ctx.db.insert("taskQueue", {
                   shard,
                   functionType: "query",
                   handle: step.handle,
                   args: step.args,
                   stepId,
                   workflowId: workflow._id,
                   generationNumber,
+                  ...(retryConfig ? { retry: retryConfig } : {}),
                 });
                 workId = `executor:${stepId}`;
@@
             case "mutation": {
               if (workflow.executorShards) {
                 const shard = shardForWorkflow(workflow._id as string, workflow.executorShards);
                 await ctx.db.insert("taskQueue", {
                   shard,
                   functionType: "mutation",
                   handle: step.handle,
                   args: step.args,
                   stepId,
                   workflowId: workflow._id,
                   generationNumber,
+                  ...(retryConfig ? { retry: retryConfig } : {}),
                 });
                 workId = `executor:${stepId}`;
@@
             case "action": {
@@
                 await ctx.db.insert("taskQueue", {
@@
-                  retry: actionRetryConfig,
+                  ...(retryConfig ? { retry: retryConfig } : {}),
                 });

Also applies to: 195-205, 218-228

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/component/journal.ts` around lines 183 - 193, Queries and mutations
enqueued in the executor branches are missing retry metadata; add a
qmRetryConfig computed like actionRetryConfig but applied when step.functionType
is "query" or "mutation" (respecting stepArgs.retry === false, object override,
otherwise DEFAULT_QM_RETRY) and pass that qmRetryConfig into the enqueue calls
inside the switch (step.functionType) cases for "query" and "mutation" so
executor-routed tasks get the same per-step override + default retry behavior.

@ianmacartney
Member

ianmacartney commented Mar 3, 2026 via email

@ianmacartney
Member

I'm currently set as the reviewer but let me know if you feel it's actually watertight enough to consider merging. Given how little code is deleted vs. added, I suspect this could be a new component, similar to the Workpool executor mode.

@sethconvex
Contributor Author

sethconvex commented Mar 3, 2026 via email
