feat: P40 config validation + P45 distributed control plane#14
Conversation
P40 — Configuration validation - T4: tests for PostgresStateStore.preflight() (impl already in place) - T5: nodes/docker-availability.ts — preflight + capability filter wired into node-start so unreachable Docker strips 'docker' from advertised capabilities instead of crashing on first dispatch - T6: nodes/ssh-preflight.ts — startup ping for ssh-typed nodes; unreachable hosts get a loud warning and are marked offline P45 — Distributed topology - T3: PostgresEventBus over LISTEN/NOTIFY + buildEventBus factory selecting via AGENTFORGE_EVENT_BUS=postgres|memory; wired into platform-cli - T4: IJobQueue port + InMemoryJobQueue (default) + PostgresJobQueue using FOR UPDATE SKIP LOCKED for atomic claims across replicas; migration 003-job-queue.sql provisions agent_jobs with claim metadata + indexes - T5: ILeaderElector port + LocalLeaderElector + PostgresLeaderElector (pg_advisory_lock with hashed bigint lock ids) + runWhenLeader helper to gate singleton interval loops (reconciler, scheduler) - T6: IActiveRunCounter port + InMemoryActiveRunCounter + DbActiveRunCounter; LocalAgentScheduler.schedule() is now async and re-reads counts on every decision so two replicas never disagree on load; migration 004-active-run-index.sql adds the partial index for the active-runs query Tests: 1643 passing, typecheck clean, lint clean. T7 (multi-replica integration test in CI) deferred — all building blocks are in place; Postgres-backed orchestration test is its own task.
Major (12) - Scheduler: Promise.allSettled so one failed counter call doesn't block every dispatch; treat failed nodes as full - Scheduler: log counter.recordStarted/Completed rejections instead of void-discarding them - PostgresJobQueue: tolerate corrupted payload rows (warn + skip); COALESCE per-job claim_ttl_ms in reclaimStale so each job ages on its own deadline - InMemoryJobQueue: same per-job ttlMs honour in reclaimStale; enqueue is now a no-op on duplicate runId to match Postgres' ON CONFLICT - PostgresEventBus: separate notify pool from LISTEN client (pg recommends the listener stay idle); start() is idempotent; close() ends both client + pool; channel name validated at construct so LISTEN/UNLISTEN identifier interpolation can't be hijacked - platform-cli: await SSH preflight before health-checker so they don't race on node status; close eventBus on shutdown so PG connections don't leak - DbActiveRunCounter: SQL builds IN clause from ACTIVE_STATUSES via ANY($::text[]) so the constant can't drift out of sync - docker-availability: replace fragile tcp:// regex with new URL parser (handles IPv6, ignores path segments, normalizes ports) Minor (2) - leader-gated-loop: log body errors instead of silently swallowing them - node-start: lower-case capability list before docker check so --capabilities Docker still triggers preflight Doc nits - PostgresLeaderElector: pool sizing + hash collision JSDoc - Parallelize lock release in close() - buildEventBus test asserts the misconfiguration warning fires Skipped (with rationale) - PgLite-based integration tests (3 nitpicks): mock-based tests fully validate the SQL contract; PgLite migration is its own task - close() race with in-flight pg_notify (1 nitpick): close is best-effort, acceptable for shutdown - _setNow @internal annotation (1 nitpick): underscore convention works - Pool sizing options (1 nitpick): defaults reasonable for current load - CHECK constraint on agent_jobs (1 nitpick): nice-to-have Tests: 1651 passing (+8), typecheck clean, lint clean.
|
Warning Rate limit exceeded
You’ve run out of usage credits. Purchase more in the billing tab. ⌛ How to resolve this issue?After the wait time has elapsed, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information. ℹ️ Review info⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (12)
📝 WalkthroughWalkthroughThis PR introduces distributed infrastructure for job queue management, leader election, and active-run counting via pluggable domain ports, with in-memory and Postgres implementations. The scheduler is refactored to query async active-run counters for load-aware node selection. A leader-gated loop enables singleton background operations across replicas. An event-bus factory provides pluggable Postgres LISTEN/NOTIFY multi-replica pub/sub. Node startup gains Docker availability and SSH reachability preflight checks. Database migrations provision the job queue table and active-run index. ChangesDistributed Job Queue and Leader Election
Estimated code review effort🎯 4 (Complex) | ⏱️ ~75 minutes Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Tip 💬 Introducing Slack Agent: The best way for teams to turn conversations into code.Slack Agent is built on CodeRabbit's deep understanding of your code, so your team can collaborate across the entire SDLC without losing context.
Built for teams:
One agent for your entire SDLC. Right inside Slack. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 8
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@packages/core/src/adapters/leader/local-leader-elector.ts`:
- Around line 12-15: The acquire(lockName: string) method currently always
returns true allowing multiple callers to think they hold the same lock; update
acquire to check the internal held set first (this.held) and only add lockName
and return true when it is not already present, otherwise return false (as a
Promise<boolean>) so that concurrent attempts for the same lockName fail; keep
the return type Promise<boolean> and preserve existing behavior for unique
lockNames.
In `@packages/core/src/control-plane/leader-gated-loop.ts`:
- Around line 21-51: The leader loop allows overlapping executions because
setInterval fires without awaiting tick(), and elector.release() is
fire-and-forget so rejections are unhandled; change the loop to a
self-scheduling async function (replace setInterval usage with an async loop
that awaits tick before scheduling the next run) or add a running flag inside
tick() to skip if already running (use the tick function and body() to gate
concurrent runs), and update the shutdown cleanup to await or catch the Promise
returned by elector.release(lockName) (reference tick, body, elector.acquire,
elector.release, setInterval, and handle) so any rejection is handled (catch
errors) instead of causing unhandled promise rejections.
In `@packages/platform/src/adapters/events/postgres-event-bus.ts`:
- Around line 110-118: The close() method currently awaits this.client.end()
then this.notifyPool.end(), so if this.client.end() rejects notifyPool.end() is
skipped and the pool leaks; change close() to isolate teardowns by first
attempting UNLISTEN with try/catch, then call both this.client.end() and
this.notifyPool.end() concurrently using Promise.allSettled (or otherwise await
both independently) and handle/rethrow any errors as needed so both shutdown
operations are always attempted; update references in close(), this.client.end,
and this.notifyPool.end to use Promise.allSettled to await their results.
- Around line 55-71: start() currently connects the pg Client and registers a
"notification" handler before issuing the LISTEN query, which leaves the client
connected and accumulates listeners if LISTEN fails; change start() so that on
any error after a successful this.client.connect() you remove the registered
notification listener and close the client (e.g.,
this.client.removeListener("notification", handler) and await this.client.end())
before rethrowing the error, and only set this.started = true after the LISTEN
query succeeds (or alternatively register the handler only after successful
LISTEN); reference symbols: start(), this.client.connect(),
this.client.on("notification", ...), this.client.query(`LISTEN
${this.channel}`), this.client.removeListener, this.client.end(), and
this.started.
In `@packages/platform/src/adapters/jobs/postgres-job-queue.ts`:
- Around line 58-69: Currently, when PostgresJobQueue.claim() parses rows it
silently drops rows whose payload fails JSON.parse (the parsed: AgentJob[]
loop), leaving those rows still marked claimed so reclaimStale() will unclaim
and they get re-claimed repeatedly; update the loop in postgres-job-queue.ts to
delete the DB row when JSON.parse(r.payload) throws (use the same run_id from
the row) instead of silently skipping it, ensuring callers will not need to call
complete() for that run and the bad payload is removed permanently; keep logging
the parse error and perform the delete via the queue's DB client within the
catch block so the corrupted row cannot re-enter the claim cycle.
In `@packages/platform/src/adapters/leader/postgres-leader-elector.ts`:
- Around line 36-70: The acquire() and release() paths must destroy pooled
clients when an error occurs to avoid returning a session that still holds an
advisory lock; change the acquire() catch to call client.release(true) instead
of release(), and in release() restructure the unlock call so you try { await
handle.client.query("SELECT pg_advisory_unlock(...)"); handle.client.release();
this.held.delete(lockName); } catch (err) { handle.client.release(true);
this.held.delete(lockName); throw err; } — i.e. call client.release(true) on
errors and only call client.release() and then delete the held entry when unlock
succeeds (rethrow the error after destroying the client).
In `@packages/platform/src/nodes/docker-availability.ts`:
- Around line 85-91: The socket's `once("error", ...)` handler can allow a
second "error" event to go unhandled; after registering the `once` handlers add
a persistent no-op sink (e.g., `const noop = () => {}; socket.on("error",
noop);`) and remove that sink inside the `cleanup` function (e.g.,
`socket.removeListener("error", noop)`) so `cleanup` still destroys the socket
and resolves but prevents a subsequent unhandled error; reference the `socket`
variable and the `cleanup` function in docker-availability.ts.
In `@packages/platform/tests/adapters/scheduling/db-active-run-counter.test.ts`:
- Around line 49-51: The test for db-active-run-counter.count does not exercise
the "no rows match" fallback because mockQuery currently returns rows: [{ count:
"0" }]; change the mocked response in the test (the
mockQuery.mockResolvedValueOnce call in
packages/platform/tests/adapters/scheduling/db-active-run-counter.test.ts) to
return rows: [] so counter.count("worker-a") executes the rows[0] missing-path
and returns 0 as intended.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: d1a8c057-e466-48d9-80a1-c47dc377811e
📒 Files selected for processing (32)
packages/core/src/adapters/jobs/in-memory-job-queue.tspackages/core/src/adapters/leader/local-leader-elector.tspackages/core/src/control-plane/leader-gated-loop.tspackages/core/src/control-plane/pipeline-controller.tspackages/core/src/control-plane/scheduler.tspackages/core/src/domain/ports/active-run-counter.port.tspackages/core/src/domain/ports/job-queue.port.tspackages/core/src/domain/ports/leader-elector.port.tspackages/core/tests/adapters/jobs/in-memory-job-queue.test.tspackages/core/tests/adapters/leader/local-leader-elector.test.tspackages/core/tests/control-plane/leader-gated-loop.test.tspackages/core/tests/control-plane/scheduler.test.tspackages/core/tests/control-plane/stateless-scheduler.test.tspackages/platform/src/adapters/events/event-bus-factory.tspackages/platform/src/adapters/events/postgres-event-bus.tspackages/platform/src/adapters/jobs/postgres-job-queue.tspackages/platform/src/adapters/leader/postgres-leader-elector.tspackages/platform/src/adapters/scheduling/db-active-run-counter.tspackages/platform/src/cli/commands/node-start.tspackages/platform/src/nodes/docker-availability.tspackages/platform/src/nodes/ssh-preflight.tspackages/platform/src/platform-cli.tspackages/platform/src/state/migrations/postgres/003-job-queue.sqlpackages/platform/src/state/migrations/postgres/004-active-run-index.sqlpackages/platform/tests/adapters/events/event-bus-factory.test.tspackages/platform/tests/adapters/events/postgres-event-bus.test.tspackages/platform/tests/adapters/jobs/postgres-job-queue.test.tspackages/platform/tests/adapters/leader/postgres-leader-elector.test.tspackages/platform/tests/adapters/scheduling/db-active-run-counter.test.tspackages/platform/tests/nodes/docker-availability.test.tspackages/platform/tests/nodes/ssh-preflight.test.tspackages/platform/tests/state/pg-store.test.ts
Major correctness fixes - LocalLeaderElector.acquire(): returns false when the lock is already held — two runWhenLeader instances on the same process can no longer both think they're leader - runWhenLeader: in-flight reentrancy guard (setInterval doesn't await async bodies, so a slow reconciler scan would have N ticks racing); belt-and-braces .catch on tick() so timer-boundary rejections never surface as unhandled - PostgresJobQueue.claim(): DELETE corrupted-payload rows so the reclaimStale → re-claim → JSON.parse-throws cycle can't poison the FIFO head forever - PostgresLeaderElector: pool client is destroyed (release(true)) on acquire query failure and on pg_advisory_unlock failure — prevents a session that may still hold an advisory lock from being returned to the pool, which would split-brain leadership Minor lifecycle / robustness fixes - PostgresEventBus.start(): if LISTEN throws after a successful connect(), tear down the client and rebuild it so a retry can succeed without "Client is already connected" - PostgresEventBus.close(): allSettled so notifyPool.end() runs even when the listener client.end() rejects - docker-availability probe: persistent no-op "error" listener absorbs late events that arrive after cleanup() destroys the socket (e.g. TCP RST + ETIMEDOUT race) — prevents an unhandled error crashing the process at startup Test polish - DbActiveRunCounter: empty-rows mock is now `rows: []` so the rows[0]?.count fallback path is actually exercised Tests: 1659 passing (+8 new), typecheck clean, lint clean.
Summary
Implements P40 (Configuration Validation & Startup Warnings) and the bulk of P45 (Distributed Topology). Adds the missing pieces that let multiple control-plane replicas share state without lost dispatches, duplicate reconciler runs, or split-brain scheduling.
P40 — Config validation (3 tasks)
SELECT 1) — friendly exit on missing or unreachable DB; tests addeddockerfrom advertised capabilities (warn, don't crash)P45 — Distributed control plane (4 tasks)
PostgresEventBusoverLISTEN/NOTIFYwith separate notify pool (listener stays idle); selected viaAGENTFORGE_EVENT_BUS=postgres|memoryagent_jobstable +FOR UPDATE SKIP LOCKEDclaims so multi-replica dispatch is atomic. Per-jobclaim_ttl_mshonoured byreclaimStalepg_advisory_lock(session-scoped, auto-released on disconnect) +runWhenLeaderhelper for singleton interval loops (reconciler, scheduler)IActiveRunCounterport; platform plugs in a DB-backed counter that queriesagent_runs WHERE status IN ('pending','scheduled','running')on every decision. Partial index addedArchitecture
IJobQueue,ILeaderElector,IActiveRunCounterInMemoryJobQueue,LocalLeaderElector,InMemoryActiveRunCounterPostgresJobQueue,PostgresLeaderElector,DbActiveRunCounter,PostgresEventBusLocalAgentScheduler.schedule()is now async (usesPromise.allSettledso a single hung counter doesn't block all dispatch)003-job-queue.sql,004-active-run-index.sqlDeferred
P45-T7 — multi-replica integration test in CI (real Postgres + 2 control planes + 2 workers). All building blocks are in place; orchestration test is its own task.
CodeRabbit review
23 findings → 17 fixed, 6 skipped with rationale (3 PgLite migrations, close-race, pool-sizing nit,
@internalannotation). See commit6777929for the fix details.Test plan
tsc --build)biome check)AGENTFORGE_EVENT_BUS=postgresand verify SSE events span replicas (deferred to P45-T7)Summary by CodeRabbit
New Features
Improvements