From 4133ce8f411e5d021699f1de2ea750b4747dbacd Mon Sep 17 00:00:00 2001 From: Mandar Nilange Date: Fri, 8 May 2026 07:52:38 +0530 Subject: [PATCH 1/3] feat(platform): P40 config validation + P45 distributed control plane MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit P40 — Configuration validation - T4: tests for PostgresStateStore.preflight() (impl already in place) - T5: nodes/docker-availability.ts — preflight + capability filter wired into node-start so unreachable Docker strips 'docker' from advertised capabilities instead of crashing on first dispatch - T6: nodes/ssh-preflight.ts — startup ping for ssh-typed nodes; unreachable hosts get a loud warning and are marked offline P45 — Distributed topology - T3: PostgresEventBus over LISTEN/NOTIFY + buildEventBus factory selecting via AGENTFORGE_EVENT_BUS=postgres|memory; wired into platform-cli - T4: IJobQueue port + InMemoryJobQueue (default) + PostgresJobQueue using FOR UPDATE SKIP LOCKED for atomic claims across replicas; migration 003-job-queue.sql provisions agent_jobs with claim metadata + indexes - T5: ILeaderElector port + LocalLeaderElector + PostgresLeaderElector (pg_advisory_lock with hashed bigint lock ids) + runWhenLeader helper to gate singleton interval loops (reconciler, scheduler) - T6: IActiveRunCounter port + InMemoryActiveRunCounter + DbActiveRunCounter; LocalAgentScheduler.schedule() is now async and re-reads counts on every decision so two replicas never disagree on load; migration 004-active-run-index.sql adds the partial index for the active-runs query Tests: 1643 passing, typecheck clean, lint clean. T7 (multi-replica integration test in CI) deferred — all building blocks are in place; Postgres-backed orchestration test is its own task. 
--- .../src/adapters/jobs/in-memory-job-queue.ts | 95 ++++++++++++++ .../adapters/leader/local-leader-elector.ts | 25 ++++ .../src/control-plane/leader-gated-loop.ts | 47 +++++++ .../src/control-plane/pipeline-controller.ts | 2 +- packages/core/src/control-plane/scheduler.ts | 80 +++++++++--- .../domain/ports/active-run-counter.port.ts | 21 +++ .../core/src/domain/ports/job-queue.port.ts | 52 ++++++++ .../src/domain/ports/leader-elector.port.ts | 28 ++++ .../adapters/jobs/in-memory-job-queue.test.ts | 99 ++++++++++++++ .../leader/local-leader-elector.test.ts | 38 ++++++ .../control-plane/leader-gated-loop.test.ts | 95 ++++++++++++++ .../tests/control-plane/scheduler.test.ts | 34 ++--- .../control-plane/stateless-scheduler.test.ts | 104 +++++++++++++++ .../src/adapters/events/event-bus-factory.ts | 45 +++++++ .../src/adapters/events/postgres-event-bus.ts | 92 +++++++++++++ .../src/adapters/jobs/postgres-job-queue.ts | 86 +++++++++++++ .../leader/postgres-leader-elector.ts | 92 +++++++++++++ .../scheduling/db-active-run-counter.ts | 51 ++++++++ .../platform/src/cli/commands/node-start.ts | 19 ++- .../platform/src/nodes/docker-availability.ts | 87 +++++++++++++ packages/platform/src/nodes/ssh-preflight.ts | 55 ++++++++ packages/platform/src/platform-cli.ts | 29 ++++- .../migrations/postgres/003-job-queue.sql | 24 ++++ .../postgres/004-active-run-index.sql | 7 + .../adapters/events/event-bus-factory.test.ts | 61 +++++++++ .../events/postgres-event-bus.test.ts | 120 +++++++++++++++++ .../adapters/jobs/postgres-job-queue.test.ts | 118 +++++++++++++++++ .../leader/postgres-leader-elector.test.ts | 93 ++++++++++++++ .../scheduling/db-active-run-counter.test.ts | 57 +++++++++ .../tests/nodes/docker-availability.test.ts | 67 ++++++++++ .../tests/nodes/ssh-preflight.test.ts | 121 ++++++++++++++++++ .../platform/tests/state/pg-store.test.ts | 15 +++ 32 files changed, 1919 insertions(+), 40 deletions(-) create mode 100644 packages/core/src/adapters/jobs/in-memory-job-queue.ts 
create mode 100644 packages/core/src/adapters/leader/local-leader-elector.ts create mode 100644 packages/core/src/control-plane/leader-gated-loop.ts create mode 100644 packages/core/src/domain/ports/active-run-counter.port.ts create mode 100644 packages/core/src/domain/ports/job-queue.port.ts create mode 100644 packages/core/src/domain/ports/leader-elector.port.ts create mode 100644 packages/core/tests/adapters/jobs/in-memory-job-queue.test.ts create mode 100644 packages/core/tests/adapters/leader/local-leader-elector.test.ts create mode 100644 packages/core/tests/control-plane/leader-gated-loop.test.ts create mode 100644 packages/core/tests/control-plane/stateless-scheduler.test.ts create mode 100644 packages/platform/src/adapters/events/event-bus-factory.ts create mode 100644 packages/platform/src/adapters/events/postgres-event-bus.ts create mode 100644 packages/platform/src/adapters/jobs/postgres-job-queue.ts create mode 100644 packages/platform/src/adapters/leader/postgres-leader-elector.ts create mode 100644 packages/platform/src/adapters/scheduling/db-active-run-counter.ts create mode 100644 packages/platform/src/nodes/docker-availability.ts create mode 100644 packages/platform/src/nodes/ssh-preflight.ts create mode 100644 packages/platform/src/state/migrations/postgres/003-job-queue.sql create mode 100644 packages/platform/src/state/migrations/postgres/004-active-run-index.sql create mode 100644 packages/platform/tests/adapters/events/event-bus-factory.test.ts create mode 100644 packages/platform/tests/adapters/events/postgres-event-bus.test.ts create mode 100644 packages/platform/tests/adapters/jobs/postgres-job-queue.test.ts create mode 100644 packages/platform/tests/adapters/leader/postgres-leader-elector.test.ts create mode 100644 packages/platform/tests/adapters/scheduling/db-active-run-counter.test.ts create mode 100644 packages/platform/tests/nodes/docker-availability.test.ts create mode 100644 packages/platform/tests/nodes/ssh-preflight.test.ts diff 
--git a/packages/core/src/adapters/jobs/in-memory-job-queue.ts b/packages/core/src/adapters/jobs/in-memory-job-queue.ts new file mode 100644 index 0000000..3b8d001 --- /dev/null +++ b/packages/core/src/adapters/jobs/in-memory-job-queue.ts @@ -0,0 +1,95 @@ +/** + * InMemoryJobQueue — single-process IJobQueue implementation (P45-T4). + * + * Mirrors the contract the PostgresJobQueue must honour so the same wiring + * works with a swap of adapter via `AGENTFORGE_JOB_QUEUE=postgres|memory`. + * Not safe across processes — for multi-replica control planes use the + * Postgres adapter. + */ + +import type { AgentJob } from "../../domain/ports/agent-executor.port.js"; +import type { + ClaimOptions, + IJobQueue, +} from "../../domain/ports/job-queue.port.js"; + +interface QueueEntry { + job: AgentJob; + nodeName: string; + claimedBy?: string; + claimedAt?: number; + ttlMs?: number; +} + +export interface InMemoryJobQueueOptions { + now?: () => number; +} + +export class InMemoryJobQueue implements IJobQueue { + private readonly entries = new Map(); + private now: () => number; + + constructor(opts: InMemoryJobQueueOptions = {}) { + this.now = opts.now ?? (() => Date.now()); + } + + /** Test-only — override the clock without leaking the field publicly. */ + _setNow(t: number): void { + this.now = () => t; + } + + enqueue(job: AgentJob, nodeName: string): Promise { + this.entries.set(job.runId, { job, nodeName }); + return Promise.resolve(); + } + + claim(nodeName: string, opts: ClaimOptions = {}): Promise { + const limit = opts.limit ?? 1; + const ttlMs = opts.ttlMs ?? 5 * 60 * 1000; + const now = this.now(); + const claimed: AgentJob[] = []; + // Iterate insertion order — Map preserves it. Pick up to `limit` + // pending entries for this node and atomically mark them claimed. 
+ for (const entry of this.entries.values()) { + if (claimed.length >= limit) break; + if (entry.nodeName !== nodeName) continue; + if (entry.claimedBy !== undefined) continue; + entry.claimedBy = nodeName; + entry.claimedAt = now; + entry.ttlMs = ttlMs; + claimed.push(entry.job); + } + return Promise.resolve(claimed); + } + + complete(runId: string): Promise { + this.entries.delete(runId); + return Promise.resolve(); + } + + reclaimStale(maxAgeMs: number): Promise { + const now = this.now(); + let count = 0; + for (const entry of this.entries.values()) { + if (entry.claimedBy === undefined || entry.claimedAt === undefined) { + continue; + } + const age = now - entry.claimedAt; + if (age >= maxAgeMs) { + entry.claimedBy = undefined; + entry.claimedAt = undefined; + entry.ttlMs = undefined; + count++; + } + } + return Promise.resolve(count); + } + + depth(nodeName: string): Promise { + let count = 0; + for (const entry of this.entries.values()) { + if (entry.nodeName === nodeName) count++; + } + return Promise.resolve(count); + } +} diff --git a/packages/core/src/adapters/leader/local-leader-elector.ts b/packages/core/src/adapters/leader/local-leader-elector.ts new file mode 100644 index 0000000..6742755 --- /dev/null +++ b/packages/core/src/adapters/leader/local-leader-elector.ts @@ -0,0 +1,25 @@ +/** + * LocalLeaderElector — single-process leader (P45-T5). + * Acquire always succeeds; release flips state. Use for single-replica + * deployments or when no shared store is available. 
+ */ + +import type { ILeaderElector } from "../../domain/ports/leader-elector.port.js"; + +export class LocalLeaderElector implements ILeaderElector { + private readonly held = new Set(); + + acquire(lockName: string): Promise { + this.held.add(lockName); + return Promise.resolve(true); + } + + release(lockName: string): Promise { + this.held.delete(lockName); + return Promise.resolve(); + } + + isLeader(lockName: string): boolean { + return this.held.has(lockName); + } +} diff --git a/packages/core/src/control-plane/leader-gated-loop.ts b/packages/core/src/control-plane/leader-gated-loop.ts new file mode 100644 index 0000000..d451f35 --- /dev/null +++ b/packages/core/src/control-plane/leader-gated-loop.ts @@ -0,0 +1,47 @@ +/** + * runWhenLeader — wraps a singleton interval loop in leader election (P45-T5). + * + * Each tick: try to acquire the lock if we don't already hold it; if we + * become (or remain) leader, run `body`. Otherwise skip and let another + * replica do the work. The released-lock case is automatic on process + * exit (Postgres advisory locks live with the session) — failover is + * picked up at the next tick. + */ + +import type { ILeaderElector } from "../domain/ports/leader-elector.port.js"; + +export function runWhenLeader( + elector: ILeaderElector, + lockName: string, + body: () => Promise, + intervalMs: number, +): () => void { + let stopped = false; + + const tick = async (): Promise => { + if (stopped) return; + let leader = elector.isLeader(lockName); + if (!leader) { + leader = await elector.acquire(lockName); + } + if (!leader) return; + try { + await body(); + } catch { + // Swallow — the interval must keep running so a transient + // failure doesn't permanently disable the singleton loop. + } + }; + + const handle = setInterval(() => { + void tick(); + }, intervalMs); + + return () => { + stopped = true; + clearInterval(handle); + // Best-effort release; if the elector has not held the lock this + // is a no-op. 
+ void elector.release(lockName); + }; +} diff --git a/packages/core/src/control-plane/pipeline-controller.ts b/packages/core/src/control-plane/pipeline-controller.ts index 320d73d..5d88052 100644 --- a/packages/core/src/control-plane/pipeline-controller.ts +++ b/packages/core/src/control-plane/pipeline-controller.ts @@ -393,7 +393,7 @@ export class PipelineController { let selectedNode: (typeof nodePool)[0] | null = null; if (agentDef && nodePool.length > 0) { try { - selectedNode = this.scheduler.schedule(agentDef, nodePool); + selectedNode = await this.scheduler.schedule(agentDef, nodePool); } catch { // No node satisfies requirements — fall back to local } diff --git a/packages/core/src/control-plane/scheduler.ts b/packages/core/src/control-plane/scheduler.ts index 0a9b8f6..362a991 100644 --- a/packages/core/src/control-plane/scheduler.ts +++ b/packages/core/src/control-plane/scheduler.ts @@ -1,13 +1,18 @@ /** - * LocalAgentScheduler — schedules agents to local nodes. - * Respects nodeAffinity requirements and maxConcurrentRuns limits. - * Future phases will add remote node scheduling. + * LocalAgentScheduler — schedules agents to nodes (P45-T6). + * + * Counts are read via an injected IActiveRunCounter so the scheduler is + * stateless across control-plane replicas. The default in-memory counter + * preserves single-process semantics; the platform supplies a DB-backed + * counter that queries `agent_runs` so two replicas never disagree about + * a node's load. 
*/ import type { AgentDefinitionYaml, NodeDefinitionYaml, } from "../definitions/parser.js"; +import type { IActiveRunCounter } from "../domain/ports/active-run-counter.port.js"; export interface INodeRegistry { get(name: string): { status: string } | undefined; @@ -19,33 +24,49 @@ export interface IAgentScheduler { schedule( agent: AgentDefinitionYaml, nodePool: NodeDefinitionYaml[], - ): NodeDefinitionYaml; + ): Promise; recordRunStarted(nodeName: string): void; recordRunCompleted(nodeName: string): void; - getActiveRunCount(nodeName: string): number; + getActiveRunCount(nodeName: string): Promise; +} + +export interface LocalSchedulerOptions { + /** + * Counter for active runs. If omitted, an in-process counter backs the + * historical in-memory Map. Multi-replica deployments + * inject a DB-backed counter so all replicas share one truth. + */ + counter?: IActiveRunCounter; } export class LocalAgentScheduler implements IAgentScheduler { - private readonly activeRuns = new Map(); private readonly registry?: INodeRegistry; + private readonly counter: IActiveRunCounter; - constructor(registry?: INodeRegistry) { + constructor(registry?: INodeRegistry, opts: LocalSchedulerOptions = {}) { this.registry = registry; + this.counter = opts.counter ?? new InMemoryActiveRunCounter(); } - schedule( + async schedule( agent: AgentDefinitionYaml, nodePool: NodeDefinitionYaml[], - ): NodeDefinitionYaml { + ): Promise { const required = agent.spec.nodeAffinity?.required?.map((r) => r.capability) ?? []; const preferred = agent.spec.nodeAffinity?.preferred?.map((p) => p.capability) ?? []; - const candidates = nodePool.filter((node) => { + // Read every candidate's load from the counter — fresh on each call + // so a peer replica's recent dispatch is reflected immediately. 
+ const counts = await Promise.all( + nodePool.map((n) => this.counter.count(n.metadata.name)), + ); + + const candidates = nodePool.filter((node, i) => { const caps = node.spec.capabilities; const maxRuns = node.spec.resources?.maxConcurrentRuns ?? Infinity; - const active = this.getActiveRunCount(node.metadata.name); + const active = counts[i]; const registration = this.registry?.get(node.metadata.name); const isOffline = registration?.status === "offline"; return ( @@ -69,23 +90,44 @@ export class LocalAgentScheduler implements IAgentScheduler { })); scored.sort((a, b) => b.score - a.score); - const selected = scored[0].node; - - return selected; + return scored[0].node; } recordRunStarted(nodeName: string): void { - this.activeRuns.set(nodeName, (this.activeRuns.get(nodeName) ?? 0) + 1); + void this.counter.recordStarted(nodeName); this.registry?.recordRunStarted(nodeName); } recordRunCompleted(nodeName: string): void { - const current = this.activeRuns.get(nodeName) ?? 0; - this.activeRuns.set(nodeName, Math.max(0, current - 1)); + void this.counter.recordCompleted(nodeName); this.registry?.recordRunCompleted(nodeName); } - getActiveRunCount(nodeName: string): number { - return this.activeRuns.get(nodeName) ?? 0; + getActiveRunCount(nodeName: string): Promise { + return this.counter.count(nodeName); + } +} + +/** + * In-process counter — preserves the historical Map + * behaviour for callers that don't pass a counter explicitly. Not safe + * across processes. + */ +export class InMemoryActiveRunCounter implements IActiveRunCounter { + private readonly counts = new Map(); + + count(nodeName: string): Promise { + return Promise.resolve(this.counts.get(nodeName) ?? 0); + } + + recordStarted(nodeName: string): Promise { + this.counts.set(nodeName, (this.counts.get(nodeName) ?? 0) + 1); + return Promise.resolve(); + } + + recordCompleted(nodeName: string): Promise { + const current = this.counts.get(nodeName) ?? 
0; + this.counts.set(nodeName, Math.max(0, current - 1)); + return Promise.resolve(); } } diff --git a/packages/core/src/domain/ports/active-run-counter.port.ts b/packages/core/src/domain/ports/active-run-counter.port.ts new file mode 100644 index 0000000..1f1ba92 --- /dev/null +++ b/packages/core/src/domain/ports/active-run-counter.port.ts @@ -0,0 +1,21 @@ +/** + * IActiveRunCounter — pluggable per-node active-run counter (P45-T6). + * + * The scheduler reads counts via this port before every dispatch decision. + * + * - InMemoryActiveRunCounter: single-process default, backs the in-memory map. + * - DbActiveRunCounter: queries `agent_runs WHERE status IN ('running','scheduled') + * GROUP BY node_name` so multiple control-plane replicas see the same + * truth and can never over-schedule a node beyond its maxConcurrentRuns. + */ + +export interface IActiveRunCounter { + /** Active-run count for `nodeName`. Always async to allow DB-backed impls. */ + count(nodeName: string): Promise; + + /** Record that a run has started — may be a no-op for stateless impls. */ + recordStarted(nodeName: string): Promise; + + /** Record that a run has completed — may be a no-op for stateless impls. */ + recordCompleted(nodeName: string): Promise; +} diff --git a/packages/core/src/domain/ports/job-queue.port.ts b/packages/core/src/domain/ports/job-queue.port.ts new file mode 100644 index 0000000..163a34b --- /dev/null +++ b/packages/core/src/domain/ports/job-queue.port.ts @@ -0,0 +1,52 @@ +/** + * IJobQueue — pluggable dispatch queue for agent jobs across control-plane + * replicas (P45-T4). + * + * The single-process default is the in-memory adapter (zero new infra). + * The Postgres adapter (P45-T4) uses `SELECT ... FOR UPDATE SKIP LOCKED` + * so multiple control-plane replicas can claim jobs without lost dispatches + * or duplicate work. + * + * Lifecycle: + * 1. Control plane calls enqueue(job, nodeName) when scheduler picks a node. + * 2. 
Worker polls claim(nodeName, { limit, ttlMs }); receives 0..N jobs. + * 3. On result POST control plane calls complete(runId) (or fail). + * 4. A periodic reclaimStale(maxAgeMs) sweeps abandoned claims back to + * pending so a different worker can pick them up. + */ + +import type { AgentJob } from "./agent-executor.port.js"; + +export interface ClaimOptions { + /** Maximum number of jobs to claim in this call. Default 1. */ + limit?: number; + /** + * Claim TTL — how long the worker is trusted to hold the job before a + * sweep can reclaim it. Default 5 minutes. + */ + ttlMs?: number; +} + +export interface IJobQueue { + /** Push a job onto the queue for `nodeName` to pick up. */ + enqueue(job: AgentJob, nodeName: string): Promise; + + /** + * Atomically claim up to `limit` jobs for the named worker. Idempotent + * for repeat callers — already-claimed jobs are skipped, never returned + * twice. + */ + claim(nodeName: string, opts?: ClaimOptions): Promise; + + /** Mark a claimed job complete and remove it from the queue. */ + complete(runId: string): Promise; + + /** + * Release claims older than `maxAgeMs` so a different worker can retry. + * Returns the count of reclaimed jobs. + */ + reclaimStale(maxAgeMs: number): Promise; + + /** Best-effort introspection — number of pending+claimed jobs for `nodeName`. */ + depth(nodeName: string): Promise; +} diff --git a/packages/core/src/domain/ports/leader-elector.port.ts b/packages/core/src/domain/ports/leader-elector.port.ts new file mode 100644 index 0000000..4ef4824 --- /dev/null +++ b/packages/core/src/domain/ports/leader-elector.port.ts @@ -0,0 +1,28 @@ +/** + * ILeaderElector — pluggable leader election for control-plane singleton + * loops (P45-T5). + * + * Today PipelineRecoveryService and AgentScheduler each run their interval + * loops on every replica → duplicate reconciler races, duplicate + * scheduling decisions. 
The Postgres adapter uses session-scoped + * `pg_advisory_lock()` so only one replica becomes leader; the lock + * is released automatically when the holder process exits, so a standby + * replica can take over on its next acquire attempt. + * + * The `LocalLeaderElector` is the single-process default — always leader. + */ + +export interface ILeaderElector { + /** + * Attempt to acquire the lock for `lockName`. Returns true if this + * caller is now the leader. Calling `acquire` while already leader is + * a no-op that returns true. + */ + acquire(lockName: string): Promise; + + /** Release the lock. Idempotent. */ + release(lockName: string): Promise; + + /** Is this elector currently the leader for `lockName`? */ + isLeader(lockName: string): boolean; +} diff --git a/packages/core/tests/adapters/jobs/in-memory-job-queue.test.ts b/packages/core/tests/adapters/jobs/in-memory-job-queue.test.ts new file mode 100644 index 0000000..cc530d8 --- /dev/null +++ b/packages/core/tests/adapters/jobs/in-memory-job-queue.test.ts @@ -0,0 +1,99 @@ +/** + * Tests for InMemoryJobQueue — single-process default for IJobQueue (P45-T4). + * Mirrors the contract that PostgresJobQueue must also satisfy. 
+ */ +import { describe, expect, it } from "vitest"; +import { InMemoryJobQueue } from "../../../src/adapters/jobs/in-memory-job-queue.js"; +import type { AgentJob } from "../../../src/domain/ports/agent-executor.port.js"; + +function job(runId: string): AgentJob { + return { + runId, + agentId: "analyst", + agentDefinition: { metadata: { name: "analyst" }, spec: {} }, + model: { + provider: "anthropic", + name: "claude-sonnet", + maxTokens: 4096, + }, + workdir: "/tmp/work", + outputDir: "/tmp/out", + } as unknown as AgentJob; +} + +describe("InMemoryJobQueue", () => { + it("returns empty when nothing is enqueued", async () => { + const q = new InMemoryJobQueue(); + const claimed = await q.claim("worker-a"); + expect(claimed).toEqual([]); + }); + + it("returns enqueued job to a single claimer", async () => { + const q = new InMemoryJobQueue(); + await q.enqueue(job("r1"), "worker-a"); + const claimed = await q.claim("worker-a"); + expect(claimed.map((j) => j.runId)).toEqual(["r1"]); + }); + + it("does not return the same job twice (claim is exclusive)", async () => { + const q = new InMemoryJobQueue(); + await q.enqueue(job("r1"), "worker-a"); + const first = await q.claim("worker-a"); + const second = await q.claim("worker-a"); + expect(first.map((j) => j.runId)).toEqual(["r1"]); + expect(second).toEqual([]); + }); + + it("respects limit", async () => { + const q = new InMemoryJobQueue(); + await q.enqueue(job("r1"), "worker-a"); + await q.enqueue(job("r2"), "worker-a"); + await q.enqueue(job("r3"), "worker-a"); + const claimed = await q.claim("worker-a", { limit: 2 }); + expect(claimed.map((j) => j.runId)).toEqual(["r1", "r2"]); + }); + + it("isolates queues per nodeName", async () => { + const q = new InMemoryJobQueue(); + await q.enqueue(job("r1"), "worker-a"); + await q.enqueue(job("r2"), "worker-b"); + const aJobs = await q.claim("worker-a"); + expect(aJobs.map((j) => j.runId)).toEqual(["r1"]); + const bJobs = await q.claim("worker-b"); + 
expect(bJobs.map((j) => j.runId)).toEqual(["r2"]); + }); + + it("complete removes the job entirely", async () => { + const q = new InMemoryJobQueue(); + await q.enqueue(job("r1"), "worker-a"); + await q.claim("worker-a"); + await q.complete("r1"); + expect(await q.depth("worker-a")).toBe(0); + // reclaimStale should not return the completed job + const reclaimed = await q.reclaimStale(0); + expect(reclaimed).toBe(0); + }); + + it("reclaimStale returns claimed-but-stale jobs to the pool", async () => { + const q = new InMemoryJobQueue({ now: () => 1_000 }); + await q.enqueue(job("r1"), "worker-a"); + await q.claim("worker-a", { ttlMs: 100 }); + // advance virtual clock past ttl + (q as unknown as { _setNow: (n: number) => void })._setNow(2_000); + const reclaimed = await q.reclaimStale(100); + expect(reclaimed).toBe(1); + const reclaimedJobs = await q.claim("worker-a"); + expect(reclaimedJobs.map((j) => j.runId)).toEqual(["r1"]); + }); + + it("depth reports both pending and claimed jobs", async () => { + const q = new InMemoryJobQueue(); + await q.enqueue(job("r1"), "worker-a"); + await q.enqueue(job("r2"), "worker-a"); + expect(await q.depth("worker-a")).toBe(2); + await q.claim("worker-a", { limit: 1 }); + expect(await q.depth("worker-a")).toBe(2); + await q.complete("r1"); + expect(await q.depth("worker-a")).toBe(1); + }); +}); diff --git a/packages/core/tests/adapters/leader/local-leader-elector.test.ts b/packages/core/tests/adapters/leader/local-leader-elector.test.ts new file mode 100644 index 0000000..9c46f98 --- /dev/null +++ b/packages/core/tests/adapters/leader/local-leader-elector.test.ts @@ -0,0 +1,38 @@ +/** + * Tests for LocalLeaderElector — single-process default (P45-T5). + * Always leader. Used in single-replica or non-Postgres deployments. 
+ */ +import { describe, expect, it } from "vitest"; +import { LocalLeaderElector } from "../../../src/adapters/leader/local-leader-elector.js"; + +describe("LocalLeaderElector", () => { + it("acquire always returns true", async () => { + const e = new LocalLeaderElector(); + expect(await e.acquire("foo")).toBe(true); + }); + + it("isLeader is true after acquire", async () => { + const e = new LocalLeaderElector(); + await e.acquire("foo"); + expect(e.isLeader("foo")).toBe(true); + }); + + it("isLeader is false before any acquire call", () => { + const e = new LocalLeaderElector(); + expect(e.isLeader("foo")).toBe(false); + }); + + it("release flips isLeader back to false", async () => { + const e = new LocalLeaderElector(); + await e.acquire("foo"); + await e.release("foo"); + expect(e.isLeader("foo")).toBe(false); + }); + + it("isolates locks by name", async () => { + const e = new LocalLeaderElector(); + await e.acquire("a"); + expect(e.isLeader("a")).toBe(true); + expect(e.isLeader("b")).toBe(false); + }); +}); diff --git a/packages/core/tests/control-plane/leader-gated-loop.test.ts b/packages/core/tests/control-plane/leader-gated-loop.test.ts new file mode 100644 index 0000000..62a2b81 --- /dev/null +++ b/packages/core/tests/control-plane/leader-gated-loop.test.ts @@ -0,0 +1,95 @@ +/** + * Tests for runWhenLeader — the wrapper that wires ILeaderElector into the + * existing setInterval loops (PipelineRecovery, AgentScheduler) so only one + * replica executes the body per tick (P45-T5). 
+ */ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { LocalLeaderElector } from "../../src/adapters/leader/local-leader-elector.js"; +import { runWhenLeader } from "../../src/control-plane/leader-gated-loop.js"; +import type { ILeaderElector } from "../../src/domain/ports/leader-elector.port.js"; + +describe("runWhenLeader", () => { + beforeEach(() => { + vi.useFakeTimers(); + }); + afterEach(() => { + vi.useRealTimers(); + }); + + it("runs the body on each interval when leader", async () => { + const elector = new LocalLeaderElector(); + const body = vi.fn().mockResolvedValue(undefined); + const stop = runWhenLeader(elector, "lock-a", body, 1000); + + await vi.advanceTimersByTimeAsync(1000); + await vi.advanceTimersByTimeAsync(1000); + await vi.advanceTimersByTimeAsync(1000); + expect(body).toHaveBeenCalledTimes(3); + stop(); + }); + + it("skips the body when not leader", async () => { + const elector: ILeaderElector = { + acquire: vi.fn().mockResolvedValue(false), + release: vi.fn().mockResolvedValue(undefined), + isLeader: vi.fn().mockReturnValue(false), + }; + const body = vi.fn().mockResolvedValue(undefined); + const stop = runWhenLeader(elector, "lock-a", body, 1000); + + await vi.advanceTimersByTimeAsync(3000); + expect(body).not.toHaveBeenCalled(); + stop(); + }); + + it("attempts to re-acquire each tick (so a downed leader is replaced)", async () => { + const acquire = vi.fn().mockResolvedValue(false); + const elector: ILeaderElector = { + acquire, + release: vi.fn().mockResolvedValue(undefined), + isLeader: vi.fn().mockReturnValue(false), + }; + const stop = runWhenLeader(elector, "lock-a", async () => {}, 1000); + await vi.advanceTimersByTimeAsync(3000); + expect(acquire).toHaveBeenCalledTimes(3); + stop(); + }); + + it("stops triggering body after stop() is called", async () => { + const elector = new LocalLeaderElector(); + const body = vi.fn().mockResolvedValue(undefined); + const stop = runWhenLeader(elector, 
"lock-a", body, 1000); + await vi.advanceTimersByTimeAsync(1000); + stop(); + await vi.advanceTimersByTimeAsync(5000); + expect(body).toHaveBeenCalledTimes(1); + }); + + it("releases the lock when stop() is called", async () => { + const release = vi.fn().mockResolvedValue(undefined); + const elector: ILeaderElector = { + acquire: vi.fn().mockResolvedValue(true), + release, + isLeader: vi.fn().mockReturnValue(true), + }; + const stop = runWhenLeader(elector, "lock-a", async () => {}, 1000); + await vi.advanceTimersByTimeAsync(1000); + stop(); + // release runs in background after stop(); drain microtasks + await vi.runAllTimersAsync(); + expect(release).toHaveBeenCalledWith("lock-a"); + }); + + it("swallows body errors so the loop keeps running", async () => { + const elector = new LocalLeaderElector(); + const body = vi + .fn() + .mockRejectedValueOnce(new Error("boom")) + .mockResolvedValue(undefined); + const stop = runWhenLeader(elector, "lock-a", body, 1000); + await vi.advanceTimersByTimeAsync(1000); + await vi.advanceTimersByTimeAsync(1000); + expect(body).toHaveBeenCalledTimes(2); + stop(); + }); +}); diff --git a/packages/core/tests/control-plane/scheduler.test.ts b/packages/core/tests/control-plane/scheduler.test.ts index e92e136..2ed0da3 100644 --- a/packages/core/tests/control-plane/scheduler.test.ts +++ b/packages/core/tests/control-plane/scheduler.test.ts @@ -47,74 +47,74 @@ const LOCAL_NODE = makeNode("local", ["llm-access", "git"]); const DOCKER_NODE = makeNode("local-docker", ["llm-access", "git", "docker"]); describe("LocalAgentScheduler", () => { - it("schedules an agent with no required capabilities to any node", () => { + it("schedules an agent with no required capabilities to any node", async () => { const scheduler = new LocalAgentScheduler(); const agent = makeAgent("analyst"); - const node = scheduler.schedule(agent, [LOCAL_NODE]); + const node = await scheduler.schedule(agent, [LOCAL_NODE]); expect(node.metadata.name).toBe("local"); }); - 
it("schedules agent requiring docker to node with docker", () => { + it("schedules agent requiring docker to node with docker", async () => { const scheduler = new LocalAgentScheduler(); const agent = makeAgent("developer", ["docker"]); - const node = scheduler.schedule(agent, [LOCAL_NODE, DOCKER_NODE]); + const node = await scheduler.schedule(agent, [LOCAL_NODE, DOCKER_NODE]); expect(node.metadata.name).toBe("local-docker"); }); - it("throws when no node satisfies required capabilities", () => { + it("throws when no node satisfies required capabilities", async () => { const scheduler = new LocalAgentScheduler(); const agent = makeAgent("developer", ["docker"]); - expect(() => scheduler.schedule(agent, [LOCAL_NODE])).toThrow( + await expect(scheduler.schedule(agent, [LOCAL_NODE])).rejects.toThrow( /no available node/i, ); }); - it("throws when all nodes are at capacity", () => { + it("throws when all nodes are at capacity", async () => { const scheduler = new LocalAgentScheduler(); const agent = makeAgent("analyst"); const fullNode = makeNode("local", ["llm-access"], 2); scheduler.recordRunStarted("local"); scheduler.recordRunStarted("local"); - expect(() => scheduler.schedule(agent, [fullNode])).toThrow( + await expect(scheduler.schedule(agent, [fullNode])).rejects.toThrow( /no available node/i, ); }); - it("respects maxConcurrentRuns and picks node with capacity", () => { + it("respects maxConcurrentRuns and picks node with capacity", async () => { const scheduler = new LocalAgentScheduler(); const agent = makeAgent("analyst"); const full = makeNode("busy", ["llm-access"], 1); const free = makeNode("free", ["llm-access"], 2); scheduler.recordRunStarted("busy"); - const node = scheduler.schedule(agent, [full, free]); + const node = await scheduler.schedule(agent, [full, free]); expect(node.metadata.name).toBe("free"); }); - it("tracks active runs and decrements on completion", () => { + it("tracks active runs and decrements on completion", async () => { const 
scheduler = new LocalAgentScheduler(); scheduler.recordRunStarted("local"); scheduler.recordRunStarted("local"); - expect(scheduler.getActiveRunCount("local")).toBe(2); + expect(await scheduler.getActiveRunCount("local")).toBe(2); scheduler.recordRunCompleted("local"); - expect(scheduler.getActiveRunCount("local")).toBe(1); + expect(await scheduler.getActiveRunCount("local")).toBe(1); }); - it("prefers node matching preferred capabilities", () => { + it("prefers node matching preferred capabilities", async () => { const scheduler = new LocalAgentScheduler(); const agent = makeAgent("analyst", [], ["docker"]); - const node = scheduler.schedule(agent, [LOCAL_NODE, DOCKER_NODE]); + const node = await scheduler.schedule(agent, [LOCAL_NODE, DOCKER_NODE]); expect(node.metadata.name).toBe("local-docker"); }); }); describe("LocalAgentScheduler without registry", () => { - it("schedules normally when registry not provided", () => { + it("schedules normally when registry not provided", async () => { const scheduler = new LocalAgentScheduler(); const agent = makeAgent("analyst"); - const node = scheduler.schedule(agent, [LOCAL_NODE]); + const node = await scheduler.schedule(agent, [LOCAL_NODE]); expect(node.metadata.name).toBe("local"); }); }); diff --git a/packages/core/tests/control-plane/stateless-scheduler.test.ts b/packages/core/tests/control-plane/stateless-scheduler.test.ts new file mode 100644 index 0000000..5b928a5 --- /dev/null +++ b/packages/core/tests/control-plane/stateless-scheduler.test.ts @@ -0,0 +1,104 @@ +/** + * Tests for the stateless scheduling path (P45-T6). + * + * Scheduler.schedule consults an IActiveRunCounter before each decision so + * two control-plane replicas reading the same DB never disagree about the + * load on a node. 
+ */ +import { describe, expect, it, vi } from "vitest"; +import { LocalAgentScheduler } from "../../src/control-plane/scheduler.js"; +import type { + AgentDefinitionYaml, + NodeDefinitionYaml, +} from "../../src/definitions/parser.js"; +import type { IActiveRunCounter } from "../../src/domain/ports/active-run-counter.port.js"; + +function makeNode( + name: string, + capabilities: string[], + maxConcurrentRuns = 2, +): NodeDefinitionYaml { + return { + apiVersion: "agentforge/v1", + kind: "NodeDefinition", + metadata: { name, type: "local" }, + spec: { + connection: { type: "local" }, + capabilities, + resources: { maxConcurrentRuns }, + }, + }; +} + +function makeAgent( + name: string, + requiredCaps: string[] = [], + preferredCaps: string[] = [], +): AgentDefinitionYaml { + return { + apiVersion: "agentforge/v1", + kind: "AgentDefinition", + metadata: { name, displayName: name, phase: "1" }, + spec: { + executor: "pi-ai", + systemPrompt: { file: `prompts/${name}.md` }, + outputs: [], + nodeAffinity: { + required: requiredCaps.map((c) => ({ capability: c })), + preferred: preferredCaps.map((c) => ({ capability: c })), + }, + }, + }; +} + +function fakeCounter(counts: Record): IActiveRunCounter { + return { + count: vi.fn(async (name: string) => counts[name] ?? 
0), + recordStarted: vi.fn().mockResolvedValue(undefined), + recordCompleted: vi.fn().mockResolvedValue(undefined), + }; +} + +describe("LocalAgentScheduler stateless path (P45-T6)", () => { + it("schedule queries the counter (not the internal map) before each decision", async () => { + const counter = fakeCounter({ "node-a": 0 }); + const scheduler = new LocalAgentScheduler(undefined, { counter }); + const node = makeNode("node-a", ["llm-access"]); + const picked = await scheduler.schedule(makeAgent("a"), [node]); + expect(picked.metadata.name).toBe("node-a"); + expect(counter.count).toHaveBeenCalledWith("node-a"); + }); + + it("skips a node when the counter says it has reached maxConcurrentRuns", async () => { + // node-a is at capacity (2 of 2); node-b is idle (0 of 2). + const counter = fakeCounter({ "node-a": 2, "node-b": 0 }); + const scheduler = new LocalAgentScheduler(undefined, { counter }); + const nodeA = makeNode("node-a", ["llm-access"], 2); + const nodeB = makeNode("node-b", ["llm-access"], 2); + const picked = await scheduler.schedule(makeAgent("a"), [nodeA, nodeB]); + expect(picked.metadata.name).toBe("node-b"); + }); + + it("throws when every candidate is at capacity according to the counter", async () => { + const counter = fakeCounter({ "node-a": 2 }); + const scheduler = new LocalAgentScheduler(undefined, { counter }); + const nodeA = makeNode("node-a", ["llm-access"], 2); + await expect(scheduler.schedule(makeAgent("a"), [nodeA])).rejects.toThrow( + /no available node/i, + ); + }); + + it("re-reads counts on each call (no internal staleness across replicas)", async () => { + const counts: Record = { "node-a": 0 }; + const counter = fakeCounter(counts); + const scheduler = new LocalAgentScheduler(undefined, { counter }); + const node = makeNode("node-a", ["llm-access"], 2); + await scheduler.schedule(makeAgent("a"), [node]); + // Simulate another replica claiming a slot on node-a. 
/** Options accepted by {@link buildEventBus}. */
export interface EventBusFactoryOptions {
  /** Connection string passed to PostgresEventBus when the postgres bus is selected. */
  postgresUrl?: string;
  /** Channel name forwarded to PostgresEventBus as its second constructor argument. */
  channel?: string;
  /** Sink for the fallback warning; defaults to `console.warn`. */
  warn?: (msg: string) => void;
}

/**
 * Build the event bus selected by the AGENTFORGE_EVENT_BUS environment variable.
 *
 * - unset, empty, or "memory" → InMemoryEventBus
 * - "postgres"                → PostgresEventBus when `opts.postgresUrl` is set;
 *                               otherwise warn and fall back to InMemoryEventBus
 *
 * The value is trimmed and lower-cased before matching, so a set-but-empty
 * variable (common with `${VAR:-}` style templating) or stray whitespace
 * behaves like "unset" instead of aborting startup.
 *
 * @param opts - Postgres URL / channel and an optional warning sink.
 * @returns The selected event bus instance.
 * @throws Error when the variable holds any other value.
 */
export function buildEventBus(opts: EventBusFactoryOptions = {}): IEventBus {
  const raw = process.env.AGENTFORGE_EVENT_BUS?.trim().toLowerCase();
  const choice = raw === undefined || raw === "" ? "memory" : raw;
  const warn = opts.warn ?? ((msg: string) => console.warn(msg));

  if (choice === "memory") return new InMemoryEventBus();

  if (choice === "postgres") {
    if (!opts.postgresUrl) {
      // Misconfiguration should degrade to single-process events, not crash.
      warn(
        "AGENTFORGE_EVENT_BUS=postgres but no Postgres URL is configured — " +
          "falling back to InMemoryEventBus. Set AGENTFORGE_POSTGRES_URL to enable cross-replica events.",
      );
      return new InMemoryEventBus();
    }
    return new PostgresEventBus(opts.postgresUrl, opts.channel);
  }

  throw new Error(
    `Unknown AGENTFORGE_EVENT_BUS=${choice} — supported values: memory, postgres`,
  );
}
/**
 * Largest payload, in UTF-8 bytes, that emit() forwards through NOTIFY.
 * Kept below Postgres' 8000-byte NOTIFY payload limit.
 */
const NOTIFY_PAYLOAD_LIMIT = 7900;

/**
 * IEventBus that fans events out to other processes through Postgres
 * LISTEN/NOTIFY while delivering to in-process subscribers synchronously.
 *
 * Lifecycle: construct → `start()` (connect + LISTEN) → `emit`/`subscribe`
 * → `close()`.
 */
export class PostgresEventBus implements IEventBus {
  private readonly client: pg.Client;
  private readonly listeners = new Set<(event: PipelineEvent) => void>();
  /**
   * Backend PID of this bus's own session. Postgres also delivers a NOTIFY to
   * the session that issued it, so notifications carrying this PID are echoes
   * of events emit() already delivered locally.
   */
  private backendPid: number | undefined;

  /**
   * @param connectionString - Postgres connection string for the dedicated client.
   * @param channel - LISTEN/NOTIFY channel shared by all replicas.
   */
  constructor(
    connectionString: string,
    private readonly channel: string = "agentforge",
  ) {
    this.client = new pg.Client({ connectionString });
  }

  /**
   * Connect, register the notification handler and start listening.
   *
   * @throws whatever `connect()` or the LISTEN query rejects with.
   */
  async start(): Promise<void> {
    // A long-lived client emits 'error' when its connection drops; without a
    // listener Node treats that as an unhandled error and kills the process.
    this.client.on("error", (err: Error) => {
      console.warn(`PostgresEventBus connection error: ${err.message}`);
    });
    await this.client.connect();
    this.client.on("notification", (msg) => {
      if (!msg.payload) return;
      // Skip our own echo — emit() already delivered this event locally.
      if (this.backendPid !== undefined && msg.processId === this.backendPid) {
        return;
      }
      let event: PipelineEvent;
      try {
        event = JSON.parse(msg.payload) as PipelineEvent;
      } catch {
        // Not one of our payloads (or truncated); nothing useful to deliver.
        return;
      }
      this.deliver(event);
    });
    // Channel names come from config / code, not user input. Quoting them
    // would force Postgres to treat them as case-sensitive identifiers
    // which is not what we want here.
    await this.client.query(`LISTEN ${this.channel}`);
    this.backendPid = await this.readBackendPid();
  }

  /**
   * Deliver `event` to local subscribers synchronously, then forward it to
   * peers via `pg_notify` on a best-effort basis. Events whose JSON encoding
   * exceeds NOTIFY_PAYLOAD_LIMIT bytes are delivered locally only.
   */
  emit(event: PipelineEvent): void {
    this.deliver(event);
    const payload = JSON.stringify(event);
    // The Postgres limit is in bytes; string length undercounts any
    // non-ASCII text, so measure the UTF-8 encoding.
    if (Buffer.byteLength(payload, "utf8") > NOTIFY_PAYLOAD_LIMIT) return;
    void this.client
      .query("SELECT pg_notify($1, $2)", [this.channel, payload])
      .catch(() => {
        // Best-effort: if the LISTEN connection drops, peers won't see
        // the event. Local subscribers were already notified above.
      });
  }

  /**
   * Register a listener for every event seen by this bus.
   *
   * @returns a function that removes the listener again.
   */
  subscribe(listener: (event: PipelineEvent) => void): () => void {
    this.listeners.add(listener);
    return () => {
      this.listeners.delete(listener);
    };
  }

  /** Invoke every listener, isolating failures so one cannot starve the rest. */
  private deliver(event: PipelineEvent): void {
    for (const listener of this.listeners) {
      try {
        listener(event);
      } catch {
        // Resilient: do not let one bad subscriber break the rest.
      }
    }
  }

  /**
   * Look up this session's backend PID. Returns undefined when it cannot be
   * determined, in which case echo suppression is simply skipped.
   */
  private async readBackendPid(): Promise<number | undefined> {
    try {
      const result = await this.client.query("SELECT pg_backend_pid() AS pid");
      const pid: unknown = result?.rows?.[0]?.pid;
      return typeof pid === "number" ? pid : undefined;
    } catch {
      return undefined;
    }
  }

  /** Stop listening (best-effort) and end the client connection. */
  async close(): Promise<void> {
    try {
      await this.client.query(`UNLISTEN ${this.channel}`);
    } catch {
      // Connection may already be closed.
    }
    await this.client.end();
  }
}

/**
 * IJobQueue backed by the `agent_jobs` table. Claims use
 * `FOR UPDATE SKIP LOCKED` so concurrent callers never receive the same row.
 */
export class PostgresJobQueue implements IJobQueue {
  private readonly pool: pg.Pool;

  /** @param connectionString - Postgres connection string for the pool. */
  constructor(connectionString: string) {
    this.pool = new pg.Pool({ connectionString });
  }

  /**
   * Insert `job` for `nodeName`. Re-enqueueing an existing `run_id` is a
   * no-op (`ON CONFLICT DO NOTHING`).
   */
  async enqueue(job: AgentJob, nodeName: string): Promise<void> {
    await this.pool.query(
      `INSERT INTO agent_jobs (run_id, node_name, payload, enqueued_at)
       VALUES ($1, $2, $3, now())
       ON CONFLICT (run_id) DO NOTHING`,
      [job.runId, nodeName, JSON.stringify(job)],
    );
  }

  /**
   * Atomically claim up to `opts.limit` (default 1) unclaimed jobs for
   * `nodeName`, oldest first, recording `opts.ttlMs` (default 5 minutes).
   *
   * @returns the claimed jobs; empty when nothing is eligible or limit < 1.
   */
  async claim(nodeName: string, opts: ClaimOptions = {}): Promise<AgentJob[]> {
    const limit = opts.limit ?? 1;
    const ttlMs = opts.ttlMs ?? 5 * 60 * 1000;
    if (limit < 1) return [];
    // Single statement: pick eligible rows with FOR UPDATE SKIP LOCKED
    // (so a concurrent replica can't grab the same row), then UPDATE
    // to mark them claimed and RETURN the payload. CTE keeps it atomic
    // inside one query — no transaction round-trip needed.
    const { rows } = await this.pool.query(
      `WITH eligible AS (
         SELECT run_id FROM agent_jobs
         WHERE node_name = $1 AND claimed_by IS NULL
         ORDER BY enqueued_at
         LIMIT $2
         FOR UPDATE SKIP LOCKED
       )
       UPDATE agent_jobs SET
         claimed_by = $1,
         claimed_at = now(),
         claim_ttl_ms = $3
       WHERE run_id IN (SELECT run_id FROM eligible)
       RETURNING run_id, payload`,
      [nodeName, limit, ttlMs],
    );
    return rows.map((row: { payload: unknown }) =>
      PostgresJobQueue.decodePayload(row.payload),
    );
  }

  /**
   * Turn a `payload` column value into an AgentJob. The column is JSONB,
   * which node-postgres parses into an object by default; calling
   * JSON.parse on that object would throw after the rows were already
   * marked claimed. Strings are still parsed for drivers/configurations
   * that return raw text.
   */
  private static decodePayload(raw: unknown): AgentJob {
    return (typeof raw === "string" ? JSON.parse(raw) : raw) as AgentJob;
  }

  /** Remove the job for `runId` from the queue. */
  async complete(runId: string): Promise<void> {
    await this.pool.query("DELETE FROM agent_jobs WHERE run_id = $1", [runId]);
  }

  /**
   * Release every claim that is at least `maxAgeMs` old so the job can be
   * claimed again.
   *
   * @returns the number of rows released.
   */
  async reclaimStale(maxAgeMs: number): Promise<number> {
    const { rowCount } = await this.pool.query(
      `UPDATE agent_jobs SET
         claimed_by = NULL,
         claimed_at = NULL,
         claim_ttl_ms = NULL
       WHERE claimed_at IS NOT NULL
         AND (EXTRACT(EPOCH FROM (now() - claimed_at)) * 1000) >= $1`,
      [maxAgeMs],
    );
    return rowCount ?? 0;
  }

  /** Number of rows (claimed or not) currently queued for `nodeName`. */
  async depth(nodeName: string): Promise<number> {
    const { rows } = await this.pool.query(
      "SELECT count(*)::int AS count FROM agent_jobs WHERE node_name = $1",
      [nodeName],
    );
    return Number(rows[0]?.count ?? 0);
  }

  /** End the underlying pool. */
  async close(): Promise<void> {
    await this.pool.end();
  }
}
/** A held advisory lock together with the pooled session that owns it. */
interface LockHandle {
  client: pg.PoolClient;
}

/**
 * ILeaderElector built on Postgres session-level advisory locks.
 *
 * Each held lock pins one client checked out of the pool: advisory locks
 * belong to the session, so the connection must stay checked out for as
 * long as leadership is held.
 */
export class PostgresLeaderElector implements ILeaderElector {
  private readonly pool: pg.Pool;
  private readonly held = new Map<string, LockHandle>();

  /** @param connectionString - Postgres connection string for the pool. */
  constructor(connectionString: string) {
    this.pool = new pg.Pool({ connectionString });
  }

  /**
   * Try to take the lock for `lockName` without blocking.
   *
   * @returns true when this instance holds the lock (including when it
   *   already held it), false when another session holds it.
   * @throws whatever the pool checkout or the lock query rejects with.
   */
  async acquire(lockName: string): Promise<boolean> {
    if (this.held.has(lockName)) return true;
    const lockId = lockNameToBigInt(lockName);
    const client = await this.pool.connect();
    try {
      const { rows } = await client.query(
        "SELECT pg_try_advisory_lock($1) AS pg_try_advisory_lock",
        [lockId],
      );
      const acquired = rows[0]?.pg_try_advisory_lock === true;
      if (!acquired) {
        client.release();
        return false;
      }
      this.held.set(lockName, { client });
      return true;
    } catch (err) {
      client.release();
      throw err;
    }
  }

  /**
   * Give up the lock for `lockName`; a no-op when it is not held.
   *
   * If the unlock query fails, the connection is destroyed instead of being
   * returned to the pool: returning it would keep the session — and with it
   * the advisory lock — alive, blocking every other replica while this
   * instance no longer considers itself leader.
   *
   * @throws the unlock query's error, after the connection was discarded.
   */
  async release(lockName: string): Promise<void> {
    const handle = this.held.get(lockName);
    if (!handle) return;
    // Stop reporting leadership before the round-trip so a concurrent
    // acquire() cannot short-circuit on a lock that is being released.
    this.held.delete(lockName);
    try {
      await handle.client.query(
        "SELECT pg_advisory_unlock($1) AS pg_advisory_unlock",
        [lockNameToBigInt(lockName)],
      );
    } catch (err) {
      // Truthy argument → pool destroys the client; closing the session
      // makes Postgres drop its advisory locks.
      handle.client.release(err instanceof Error ? err : true);
      throw err;
    }
    handle.client.release();
  }

  /** Whether this instance currently records `lockName` as held. */
  isLeader(lockName: string): boolean {
    return this.held.has(lockName);
  }

  /**
   * Release every held lock and end the pool. A failing release does not
   * prevent the remaining locks from being released or the pool from ending.
   */
  async close(): Promise<void> {
    try {
      for (const name of [...this.held.keys()]) {
        try {
          await this.release(name);
        } catch {
          // release() already destroyed that session, which frees its lock.
        }
      }
    } finally {
      await this.pool.end();
    }
  }
}

/**
 * Hash a lock name to a non-negative 64-bit integer, returned as a decimal
 * string. Postgres advisory locks take a bigint; the first 8 bytes of the
 * SHA-256 digest are used with the sign bit cleared. A string is returned
 * so the value reaches the driver without passing through a JS number.
 */
function lockNameToBigInt(name: string): string {
  const digest = createHash("sha256").update(name).digest();
  // Take 8 bytes and clear the high bit so the value is never negative.
  const buf = digest.subarray(0, 8);
  buf[0] &= 0x7f;
  return buf.readBigInt64BE(0).toString();
}

/** Run statuses that count towards a node's load. */
const ACTIVE_STATUSES = ["pending", "scheduled", "running"] as const;

/**
 * SQL literal list built from ACTIVE_STATUSES so the query and the exported
 * constant cannot drift apart. Values are compile-time constants, never
 * user input; the rendered text is `'pending','scheduled','running'`.
 */
const ACTIVE_STATUS_SQL_LIST = ACTIVE_STATUSES.map((s) => `'${s}'`).join(",");

/**
 * IActiveRunCounter that counts active rows in `agent_runs` on every call
 * instead of tracking them in process memory.
 */
export class DbActiveRunCounter implements IActiveRunCounter {
  private readonly pool: pg.Pool;

  /** @param connectionString - Postgres connection string for the pool. */
  constructor(connectionString: string) {
    this.pool = new pg.Pool({ connectionString });
  }

  /** Number of `agent_runs` rows for `nodeName` in an active status. */
  async count(nodeName: string): Promise<number> {
    const { rows } = await this.pool.query(
      `SELECT count(*)::int AS count
         FROM agent_runs
        WHERE node_name = $1
          AND status IN (${ACTIVE_STATUS_SQL_LIST})`,
      [nodeName],
    );
    return Number(rows[0]?.count ?? 0);
  }

  // recordStarted/recordCompleted are intentionally no-ops: createAgentRun
  // already inserts a row with status='pending', and updateAgentRun flips
  // it through scheduled→running→succeeded/failed. The DB is the source of
  // truth — the in-memory map this adapter replaces is gone.
  async recordStarted(_nodeName: string): Promise<void> {}

  async recordCompleted(_nodeName: string): Promise<void> {}

  /** End the underlying pool. */
  async close(): Promise<void> {
    await this.pool.end();
  }

  /** Statuses considered "active" for load accounting. Exported for visibility. */
  static readonly ACTIVE_STATUSES: readonly string[] = ACTIVE_STATUSES;
}
/** Knobs for the docker reachability check. */
export interface DockerAvailabilityOptions {
  /** Custom probe — returns true when docker is reachable. */
  probe?: () => Promise<boolean>;
  /** Override the unix socket path (default `/var/run/docker.sock`). */
  socketPath?: string;
  /** Override DOCKER_HOST (e.g. tcp://host:port); takes priority over socket. */
  dockerHost?: string;
  /** Probe timeout in ms (default 1500). */
  timeoutMs?: number;
}

const DEFAULT_SOCKET = "/var/run/docker.sock";
const DEFAULT_TIMEOUT_MS = 1500;

/**
 * Report whether the docker daemon answers the probe.
 *
 * Uses `opts.probe` when supplied, otherwise the built-in socket/TCP connect
 * probe. Any error raised while probing counts as "not available".
 *
 * @param opts - Probe override and connection settings.
 * @returns the probe's result, or false when the probe throws or rejects.
 */
export async function checkDockerAvailability(
  opts: DockerAvailabilityOptions = {},
): Promise<boolean> {
  const runProbe = opts.probe ?? defaultProbe(opts);
  let reachable = false;
  try {
    reachable = await runProbe();
  } catch {
    // A failing probe is treated the same as an unreachable daemon.
  }
  return reachable;
}
/**
 * Return `capabilities` without "docker" when the daemon is unreachable.
 *
 * Emits exactly one warning through `warn` when (and only when) "docker"
 * was declared and is being removed. Always returns a fresh array and never
 * mutates the input.
 *
 * @param capabilities - Capabilities the node was started with.
 * @param dockerAvailable - Result of the docker reachability check.
 * @param warn - Sink for the downgrade warning.
 * @returns the capabilities the node should actually advertise.
 */
export function filterDockerCapability(
  capabilities: readonly string[],
  dockerAvailable: boolean,
  warn: (msg: string) => void,
): string[] {
  if (dockerAvailable || !capabilities.includes("docker")) {
    return [...capabilities];
  }

  warn(
    "docker capability declared but the docker daemon is unavailable — " +
      "removing 'docker' from this node's effective capabilities. " +
      "Set DOCKER_HOST or start the daemon to restore docker scheduling.",
  );
  return capabilities.filter((c) => c !== "docker");
}

/**
 * Work out where the probe should connect.
 *
 * - `tcp://host:port` (host may be a bracketed IPv6 literal) → TCP target
 * - `unix:///path/to.sock`                                   → that socket
 * - anything else / unset                                    → `socketPath`
 */
function resolveProbeTarget(
  dockerHost: string | undefined,
  socketPath: string,
): { host: string; port: number } | { path: string } {
  const tcp = dockerHost?.match(/^tcp:\/\/(\[[^\]]+\]|[^:/]+):(\d+)/);
  if (tcp) {
    const [, rawHost = "", rawPort = ""] = tcp;
    // net.connect wants the bare address, without the URL-style brackets.
    return { host: rawHost.replace(/^\[|\]$/g, ""), port: Number(rawPort) };
  }
  const unix = dockerHost?.match(/^unix:\/\/(.+)$/);
  if (unix?.[1]) return { path: unix[1] };
  return { path: socketPath };
}

/**
 * Build the default probe: a plain connect attempt against the docker
 * endpoint that resolves true on connect and false on error or timeout.
 *
 * The endpoint comes from `opts.dockerHost`, falling back to the DOCKER_HOST
 * environment variable (read when this function is called), and finally to
 * `opts.socketPath` / the default socket.
 */
function defaultProbe(opts: DockerAvailabilityOptions): () => Promise<boolean> {
  const timeoutMs = opts.timeoutMs ?? DEFAULT_TIMEOUT_MS;
  const dockerHost = opts.dockerHost ?? process.env.DOCKER_HOST;
  const target = resolveProbeTarget(dockerHost, opts.socketPath ?? DEFAULT_SOCKET);

  return () =>
    new Promise<boolean>((resolve) => {
      const socket = connect({ ...target, timeout: timeoutMs });

      // Several of the events below can fire for one attempt; only the
      // first one decides the outcome.
      let settled = false;
      const finish = (ok: boolean): void => {
        if (settled) return;
        settled = true;
        socket.destroy();
        resolve(ok);
      };
      socket.once("connect", () => finish(true));
      socket.once("error", () => finish(false));
      socket.once("timeout", () => finish(false));
    });
}
/** Inputs for {@link validateSshNodesAtStartup}. */
export interface SshPreflightOptions {
  /** All node runtimes; only those with connection.type "ssh" are probed. */
  runtimes: readonly INodeRuntime[];
  /** Sink for operator-facing warnings. */
  warn: (msg: string) => void;
  /** Called with the node name for every unreachable ssh node. */
  markOffline: (nodeName: string) => void;
}

/** Outcome of probing one ssh node. */
export interface SshPreflightResult {
  name: string;
  reachable: boolean;
}

/**
 * Ping every ssh-typed runtime concurrently and report which are reachable.
 *
 * For each unreachable node `markOffline` is invoked and a warning is
 * emitted. A runtime whose `ping()` throws is treated as unreachable. A
 * throwing `markOffline` is reported through `warn` rather than rejecting
 * the whole preflight, because callers may start this check without
 * awaiting it and a rejection would then go unhandled.
 *
 * @param opts - Runtimes to inspect plus the warning and offline callbacks.
 * @returns one result per ssh runtime, in input order.
 */
export async function validateSshNodesAtStartup(
  opts: SshPreflightOptions,
): Promise<SshPreflightResult[]> {
  const sshRuntimes = opts.runtimes.filter(
    (r) => r.nodeDefinition.spec.connection?.type === "ssh",
  );
  return Promise.all(sshRuntimes.map((rt) => probeSshRuntime(rt, opts)));
}

/** Probe a single ssh runtime and apply the offline/warn policy. */
async function probeSshRuntime(
  rt: INodeRuntime,
  opts: SshPreflightOptions,
): Promise<SshPreflightResult> {
  const name = rt.nodeDefinition.metadata.name;
  const host = rt.nodeDefinition.spec.connection?.host ?? "";

  let reachable = false;
  try {
    reachable = await rt.ping();
  } catch {
    // A probe that throws is indistinguishable from an unreachable host.
    reachable = false;
  }

  if (!reachable) {
    try {
      opts.markOffline(name);
    } catch (err) {
      // Keep the remaining nodes' checks and warnings alive.
      opts.warn(
        `Failed to mark SSH node "${name}" offline: ${
          err instanceof Error ? err.message : String(err)
        }`,
      );
    }
    opts.warn(
      `SSH node "${name}" at ${host} is unreachable — marking offline. ` +
        "Verify host, port, key, and connection.user before scheduling.",
    );
  }
  return { name, reachable };
}
import { flushTelemetry, initTelemetry } from "./observability/init.js"; import { @@ -405,7 +407,21 @@ const pipelineController = new PipelineController( agentExecutor, ); -const eventBus = new InMemoryEventBus(); +// P45-T3: pluggable event bus. AGENTFORGE_EVENT_BUS=postgres + an +// AGENTFORGE_POSTGRES_URL switches to LISTEN/NOTIFY so multiple control-plane +// replicas see the same SSE stream. Default stays in-memory for the +// single-process case. +const eventBus = buildEventBus({ postgresUrl: POSTGRES_URL }); +if (eventBus instanceof PostgresEventBus) { + try { + await eventBus.start(); + } catch (err) { + console.error( + `Failed to start Postgres event bus: ${err instanceof Error ? err.message : String(err)}`, + ); + process.exit(1); + } +} // Crash recovery const _recoveryService = new PipelineRecoveryService(stateStore, eventBus, { @@ -440,6 +456,15 @@ const nodeHealthChecker = new NodeHealthChecker( nodeRuntimes, metrics, ); +// SSH preflight (P40-T6): warn early on unreachable ssh hosts so operators +// don't discover the failure via a cryptic dispatch error minutes later. +// The general health checker still runs below — preflight just gives a +// loud, immediate signal at startup for the ssh-specific configuration class. +void validateSshNodesAtStartup({ + runtimes: nodeRuntimes, + warn: (msg) => console.warn(msg), + markOffline: (name) => nodeRegistry.markOffline(name), +}); void nodeHealthChecker.checkAll(); // Init and templates don't need the DI container — register with platform templates merged in. diff --git a/packages/platform/src/state/migrations/postgres/003-job-queue.sql b/packages/platform/src/state/migrations/postgres/003-job-queue.sql new file mode 100644 index 0000000..5e9ce38 --- /dev/null +++ b/packages/platform/src/state/migrations/postgres/003-job-queue.sql @@ -0,0 +1,24 @@ +-- P45-T4: DB-backed agent job queue with claim semantics. +-- Replaces per-replica in-memory pendingRunQueues map. 
Workers claim rows +-- atomically via FOR UPDATE SKIP LOCKED so multiple control-plane replicas +-- can dispatch without lost or duplicate work. + +CREATE TABLE IF NOT EXISTS agent_jobs ( + run_id TEXT PRIMARY KEY, + node_name TEXT NOT NULL, + payload JSONB NOT NULL, + enqueued_at TIMESTAMPTZ NOT NULL DEFAULT now(), + claimed_by TEXT, + claimed_at TIMESTAMPTZ, + claim_ttl_ms INTEGER +); + +-- Hot path: claim() filters by node_name + (claimed_by IS NULL). +CREATE INDEX IF NOT EXISTS idx_agent_jobs_pending + ON agent_jobs (node_name, enqueued_at) + WHERE claimed_by IS NULL; + +-- Sweep path: reclaimStale() filters claimed rows by claimed_at. +CREATE INDEX IF NOT EXISTS idx_agent_jobs_claimed_at + ON agent_jobs (claimed_at) + WHERE claimed_at IS NOT NULL; diff --git a/packages/platform/src/state/migrations/postgres/004-active-run-index.sql b/packages/platform/src/state/migrations/postgres/004-active-run-index.sql new file mode 100644 index 0000000..5612bac --- /dev/null +++ b/packages/platform/src/state/migrations/postgres/004-active-run-index.sql @@ -0,0 +1,7 @@ +-- P45-T6: partial index for the per-node active-run count query used by +-- the stateless scheduler. Filters on the small "active" subset so every +-- scheduling decision is O(active rows for node), not O(all agent runs). + +CREATE INDEX IF NOT EXISTS idx_agent_runs_active_by_node + ON agent_runs (node_name) + WHERE status IN ('pending', 'scheduled', 'running'); diff --git a/packages/platform/tests/adapters/events/event-bus-factory.test.ts b/packages/platform/tests/adapters/events/event-bus-factory.test.ts new file mode 100644 index 0000000..c8aa0c4 --- /dev/null +++ b/packages/platform/tests/adapters/events/event-bus-factory.test.ts @@ -0,0 +1,61 @@ +/** + * Tests for buildEventBus — selects the IEventBus adapter based on + * AGENTFORGE_EVENT_BUS env var (P45-T3). 
+ */ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; + +vi.mock("pg", () => { + class MockClient { + query = vi.fn().mockResolvedValue({ rows: [] }); + on = vi.fn(); + connect = vi.fn().mockResolvedValue(undefined); + end = vi.fn().mockResolvedValue(undefined); + } + return { default: { Client: MockClient } }; +}); + +import { InMemoryEventBus } from "@mandarnilange/agentforge-core/adapters/events/in-memory-event-bus.js"; +import { buildEventBus } from "../../../src/adapters/events/event-bus-factory.js"; +import { PostgresEventBus } from "../../../src/adapters/events/postgres-event-bus.js"; + +const ORIGINAL_ENV = process.env.AGENTFORGE_EVENT_BUS; + +describe("buildEventBus", () => { + beforeEach(() => { + delete process.env.AGENTFORGE_EVENT_BUS; + }); + afterEach(() => { + if (ORIGINAL_ENV === undefined) delete process.env.AGENTFORGE_EVENT_BUS; + else process.env.AGENTFORGE_EVENT_BUS = ORIGINAL_ENV; + }); + + it("returns InMemoryEventBus by default", () => { + const bus = buildEventBus({ postgresUrl: "postgresql://localhost/test" }); + expect(bus).toBeInstanceOf(InMemoryEventBus); + }); + + it("returns InMemoryEventBus when AGENTFORGE_EVENT_BUS=memory", () => { + process.env.AGENTFORGE_EVENT_BUS = "memory"; + const bus = buildEventBus({ postgresUrl: "postgresql://localhost/test" }); + expect(bus).toBeInstanceOf(InMemoryEventBus); + }); + + it("returns PostgresEventBus when AGENTFORGE_EVENT_BUS=postgres and url is set", () => { + process.env.AGENTFORGE_EVENT_BUS = "postgres"; + const bus = buildEventBus({ postgresUrl: "postgresql://localhost/test" }); + expect(bus).toBeInstanceOf(PostgresEventBus); + }); + + it("falls back to InMemoryEventBus when AGENTFORGE_EVENT_BUS=postgres but no url", () => { + process.env.AGENTFORGE_EVENT_BUS = "postgres"; + const bus = buildEventBus({ postgresUrl: undefined }); + expect(bus).toBeInstanceOf(InMemoryEventBus); + }); + + it("rejects unknown event-bus types with a clear error", () => { + 
+ process.env.AGENTFORGE_EVENT_BUS = "kafka"; + expect(() => + buildEventBus({ postgresUrl: "postgresql://localhost/test" }), + ).toThrow(/AGENTFORGE_EVENT_BUS.*kafka/); + }); +}); diff --git a/packages/platform/tests/adapters/events/postgres-event-bus.test.ts b/packages/platform/tests/adapters/events/postgres-event-bus.test.ts new file mode 100644 index 0000000..2828068 --- /dev/null +++ b/packages/platform/tests/adapters/events/postgres-event-bus.test.ts @@ -0,0 +1,120 @@ +/** + * Tests for PostgresEventBus — LISTEN/NOTIFY-backed IEventBus (P45-T3). + * + * Verifies the contract: + * - emit() issues `NOTIFY <channel>, <json>` so peer replicas + * subscribed to the same channel receive the event. + * - subscribe() registers a notification handler and returns an + * unsubscribe function. + * - The same JSON shape that peers receive round-trips through + * in-process subscribers (so dashboards in the emitting process + * don't have to wait for the LISTEN round-trip). + */ +import type { + IEventBus, + PipelineEvent, +} from "@mandarnilange/agentforge-core/domain/ports/event-bus.port.js"; +import { beforeEach, describe, expect, it, vi } from "vitest"; + +const { mockClientQuery, mockClientOn, mockConnect, mockEnd } = vi.hoisted( + () => ({ + mockClientQuery: vi.fn().mockResolvedValue({ rows: [] }), + mockClientOn: vi.fn(), + mockConnect: vi.fn(), + mockEnd: vi.fn().mockResolvedValue(undefined), + }), +); + +vi.mock("pg", () => { + class MockClient { + query = mockClientQuery; + on = mockClientOn; + connect = mockConnect.mockResolvedValue(undefined); + end = mockEnd; + } + return { default: { Client: MockClient } }; +}); + +import { PostgresEventBus } from "../../../src/adapters/events/postgres-event-bus.js"; + +describe("PostgresEventBus", () => { + let bus: IEventBus & PostgresEventBus; + + beforeEach(async () => { + vi.clearAllMocks(); + bus = new PostgresEventBus("postgresql://localhost/test", "agentforge"); + await bus.start(); + }); + + it("LISTENs on the configured channel 
after start", () => { + const sql = mockClientQuery.mock.calls.map((c) => c[0]).join("\n"); + expect(sql).toMatch(/^LISTEN/im); + expect(sql).toMatch(/agentforge/); + }); + + it("emit issues NOTIFY with a JSON payload", async () => { + const event: PipelineEvent = { + type: "run_updated", + runId: "r1", + status: "succeeded", + }; + bus.emit(event); + // emit is sync but issues query asynchronously — flush microtasks + await new Promise((r) => setImmediate(r)); + const notifyCall = mockClientQuery.mock.calls.find((c) => + (c[0] as string).startsWith("SELECT pg_notify"), + ); + expect(notifyCall).toBeDefined(); + const params = notifyCall?.[1] as unknown[]; + const payload = JSON.parse(params[1] as string); + expect(payload.type).toBe("run_updated"); + expect(payload.runId).toBe("r1"); + }); + + it("delivers locally-emitted events to in-process subscribers", () => { + const received: PipelineEvent[] = []; + bus.subscribe((e) => received.push(e)); + bus.emit({ type: "node_online", nodeName: "alpha" }); + expect(received).toHaveLength(1); + expect(received[0]).toMatchObject({ + type: "node_online", + nodeName: "alpha", + }); + }); + + it("delivers events received via NOTIFY to subscribers", async () => { + const received: PipelineEvent[] = []; + bus.subscribe((e) => received.push(e)); + // Replay the notification handler bound to client.on('notification', ...) 
+ const handler = mockClientOn.mock.calls.find( + (c) => c[0] === "notification", + )?.[1] as (msg: { payload: string }) => void; + expect(handler).toBeDefined(); + handler({ + payload: JSON.stringify({ + type: "node_offline", + nodeName: "beta", + }), + }); + expect(received).toEqual([{ type: "node_offline", nodeName: "beta" }]); + }); + + it("subscribe returns an unsubscribe function", () => { + const received: PipelineEvent[] = []; + const unsub = bus.subscribe((e) => received.push(e)); + bus.emit({ type: "node_online", nodeName: "alpha" }); + unsub(); + bus.emit({ type: "node_online", nodeName: "beta" }); + expect(received).toHaveLength(1); + }); + + it("ignores malformed payloads from the LISTEN channel", () => { + const received: PipelineEvent[] = []; + bus.subscribe((e) => received.push(e)); + const handler = mockClientOn.mock.calls.find( + (c) => c[0] === "notification", + )?.[1] as (msg: { payload: string }) => void; + handler({ payload: "{ this is not json" }); + expect(received).toHaveLength(0); + }); +}); diff --git a/packages/platform/tests/adapters/jobs/postgres-job-queue.test.ts b/packages/platform/tests/adapters/jobs/postgres-job-queue.test.ts new file mode 100644 index 0000000..11af30f --- /dev/null +++ b/packages/platform/tests/adapters/jobs/postgres-job-queue.test.ts @@ -0,0 +1,118 @@ +/** + * Tests for PostgresJobQueue using a mocked pg.Pool (P45-T4). + * + * Verifies the SQL contract: enqueue inserts into agent_jobs, claim uses + * `FOR UPDATE SKIP LOCKED` so concurrent replicas never see the same row, + * complete deletes by runId, reclaimStale releases claims older than the + * threshold. 
+ */ +import type { AgentJob } from "@mandarnilange/agentforge-core/domain/ports/agent-executor.port.js"; +import { beforeEach, describe, expect, it, vi } from "vitest"; + +const { mockQuery } = vi.hoisted(() => ({ mockQuery: vi.fn() })); + +vi.mock("pg", () => { + class MockPool { + query = mockQuery; + end = vi.fn(); + } + return { default: { Pool: MockPool } }; +}); + +import { PostgresJobQueue } from "../../../src/adapters/jobs/postgres-job-queue.js"; + +function job(runId: string): AgentJob { + return { + runId, + agentId: "analyst", + agentDefinition: { metadata: { name: "analyst" }, spec: {} }, + model: { + provider: "anthropic", + name: "claude-sonnet", + maxTokens: 4096, + }, + workdir: "/tmp/work", + outputDir: "/tmp/out", + } as unknown as AgentJob; +} + +describe("PostgresJobQueue", () => { + let queue: PostgresJobQueue; + beforeEach(() => { + vi.clearAllMocks(); + queue = new PostgresJobQueue("postgresql://localhost/test"); + }); + + describe("enqueue()", () => { + it("inserts a row with run_id, node_name and serialized payload", async () => { + mockQuery.mockResolvedValueOnce({ rows: [] }); + await queue.enqueue(job("r1"), "worker-a"); + const sql = mockQuery.mock.calls[0][0] as string; + const params = mockQuery.mock.calls[0][1] as unknown[]; + expect(sql).toMatch(/INSERT INTO agent_jobs/); + expect(params[0]).toBe("r1"); + expect(params[1]).toBe("worker-a"); + // payload should be JSON-serialized + expect(JSON.parse(params[2] as string).runId).toBe("r1"); + }); + }); + + describe("claim()", () => { + it("uses FOR UPDATE SKIP LOCKED for atomic claim across replicas", async () => { + mockQuery.mockResolvedValueOnce({ rows: [] }); + await queue.claim("worker-a"); + const sql = mockQuery.mock.calls[0][0] as string; + expect(sql).toMatch(/FOR UPDATE SKIP LOCKED/i); + }); + + it("returns deserialized AgentJob payloads", async () => { + mockQuery.mockResolvedValueOnce({ + rows: [ + { run_id: "r1", payload: JSON.stringify(job("r1")) }, + { run_id: "r2", 
payload: JSON.stringify(job("r2")) }, + ], + }); + const claimed = await queue.claim("worker-a", { limit: 5 }); + expect(claimed.map((j) => j.runId)).toEqual(["r1", "r2"]); + }); + + it("filters by node_name and pending claim", async () => { + mockQuery.mockResolvedValueOnce({ rows: [] }); + await queue.claim("worker-a", { limit: 3 }); + const params = mockQuery.mock.calls[0][1] as unknown[]; + // worker name and limit should be bound parameters + expect(params).toContain("worker-a"); + expect(params).toContain(3); + }); + }); + + describe("complete()", () => { + it("DELETEs the row", async () => { + mockQuery.mockResolvedValueOnce({ rows: [] }); + await queue.complete("r1"); + const sql = mockQuery.mock.calls[0][0] as string; + expect(sql).toMatch(/DELETE FROM agent_jobs/); + expect(mockQuery.mock.calls[0][1]).toEqual(["r1"]); + }); + }); + + describe("reclaimStale()", () => { + it("clears claimed_by/claimed_at when claim age exceeds threshold", async () => { + mockQuery.mockResolvedValueOnce({ rowCount: 2, rows: [] }); + const released = await queue.reclaimStale(60_000); + expect(released).toBe(2); + const sql = mockQuery.mock.calls[0][0] as string; + expect(sql).toMatch(/UPDATE agent_jobs/); + expect(sql).toMatch(/claimed_by\s*=\s*NULL/i); + expect(sql).toMatch(/claimed_at\s*=\s*NULL/i); + }); + }); + + describe("depth()", () => { + it("counts rows for a node regardless of claim state", async () => { + mockQuery.mockResolvedValueOnce({ rows: [{ count: "3" }] }); + const d = await queue.depth("worker-a"); + expect(d).toBe(3); + }); + }); +}); diff --git a/packages/platform/tests/adapters/leader/postgres-leader-elector.test.ts b/packages/platform/tests/adapters/leader/postgres-leader-elector.test.ts new file mode 100644 index 0000000..3beb2cf --- /dev/null +++ b/packages/platform/tests/adapters/leader/postgres-leader-elector.test.ts @@ -0,0 +1,93 @@ +/** + * Tests for PostgresLeaderElector using a mocked pg client (P45-T5). 
+ * + * Uses `pg_try_advisory_lock()` over a single dedicated client so + * the lock survives only as long as that session — replica crash → lock + * auto-released → another replica acquires immediately. + */ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +const { mockClientQuery, mockConnect, mockRelease } = vi.hoisted(() => ({ + mockClientQuery: vi.fn(), + mockConnect: vi.fn(), + mockRelease: vi.fn(), +})); + +vi.mock("pg", () => { + class MockPool { + connect = mockConnect.mockImplementation(async () => ({ + query: mockClientQuery, + release: mockRelease, + })); + end = vi.fn(); + } + return { default: { Pool: MockPool } }; +}); + +import { PostgresLeaderElector } from "../../../src/adapters/leader/postgres-leader-elector.js"; + +describe("PostgresLeaderElector", () => { + let elector: PostgresLeaderElector; + + beforeEach(() => { + vi.clearAllMocks(); + elector = new PostgresLeaderElector("postgresql://localhost/test"); + }); + + it("acquire calls pg_try_advisory_lock and returns true on success", async () => { + mockClientQuery.mockResolvedValueOnce({ + rows: [{ pg_try_advisory_lock: true }], + }); + expect(await elector.acquire("agentforge-reconciler")).toBe(true); + const sql = mockClientQuery.mock.calls[0][0] as string; + expect(sql).toMatch(/pg_try_advisory_lock/i); + }); + + it("acquire returns false when another holder owns the lock", async () => { + mockClientQuery.mockResolvedValueOnce({ + rows: [{ pg_try_advisory_lock: false }], + }); + expect(await elector.acquire("agentforge-reconciler")).toBe(false); + }); + + it("isLeader reflects last successful acquire", async () => { + mockClientQuery.mockResolvedValueOnce({ + rows: [{ pg_try_advisory_lock: true }], + }); + await elector.acquire("name-a"); + expect(elector.isLeader("name-a")).toBe(true); + expect(elector.isLeader("name-b")).toBe(false); + }); + + it("release calls pg_advisory_unlock", async () => { + mockClientQuery.mockResolvedValueOnce({ + rows: [{ pg_try_advisory_lock: 
true }], + }); + await elector.acquire("name-a"); + mockClientQuery.mockResolvedValueOnce({ + rows: [{ pg_advisory_unlock: true }], + }); + await elector.release("name-a"); + expect(elector.isLeader("name-a")).toBe(false); + const lastSql = mockClientQuery.mock.calls.at(-1)?.[0] as string; + expect(lastSql).toMatch(/pg_advisory_unlock/i); + }); + + it("hashes lock names to a stable bigint to avoid collisions", async () => { + // Two distinct names produce two distinct lock ids; reusing the same + // name produces the same id (release + re-acquire round-trip). + mockClientQuery.mockResolvedValue({ + rows: [{ pg_try_advisory_lock: true, pg_advisory_unlock: true }], + }); + await elector.acquire("agentforge-reconciler"); + const firstParams = mockClientQuery.mock.calls[0][1] as unknown[]; + await elector.release("agentforge-reconciler"); + await elector.acquire("agentforge-reconciler"); + const reAcquireParams = mockClientQuery.mock.calls.at(-1)?.[1] as unknown[]; + expect(reAcquireParams[0]).toBe(firstParams[0]); + + await elector.acquire("agentforge-scheduler"); + const schedulerParams = mockClientQuery.mock.calls.at(-1)?.[1] as unknown[]; + expect(schedulerParams[0]).not.toBe(firstParams[0]); + }); +}); diff --git a/packages/platform/tests/adapters/scheduling/db-active-run-counter.test.ts b/packages/platform/tests/adapters/scheduling/db-active-run-counter.test.ts new file mode 100644 index 0000000..80a31a6 --- /dev/null +++ b/packages/platform/tests/adapters/scheduling/db-active-run-counter.test.ts @@ -0,0 +1,57 @@ +/** + * Tests for DbActiveRunCounter — queries agent_runs to compute live load + * across replicas (P45-T6). 
+ */ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +const { mockQuery } = vi.hoisted(() => ({ mockQuery: vi.fn() })); + +vi.mock("pg", () => { + class MockPool { + query = mockQuery; + end = vi.fn(); + } + return { default: { Pool: MockPool } }; +}); + +import { DbActiveRunCounter } from "../../../src/adapters/scheduling/db-active-run-counter.js"; + +describe("DbActiveRunCounter", () => { + let counter: DbActiveRunCounter; + + beforeEach(() => { + vi.clearAllMocks(); + counter = new DbActiveRunCounter("postgresql://localhost/test"); + }); + + it("count() queries agent_runs filtered by node and active status", async () => { + mockQuery.mockResolvedValueOnce({ rows: [{ count: "3" }] }); + const n = await counter.count("worker-a"); + expect(n).toBe(3); + const sql = mockQuery.mock.calls[0][0] as string; + expect(sql).toMatch(/FROM agent_runs/); + expect(sql).toMatch(/node_name\s*=\s*\$1/); + expect(sql).toMatch(/status\s+IN/i); + const params = mockQuery.mock.calls[0][1] as unknown[]; + expect(params).toContain("worker-a"); + }); + + it("count() includes both 'running' and 'scheduled' / 'pending' statuses", async () => { + mockQuery.mockResolvedValueOnce({ rows: [{ count: "0" }] }); + await counter.count("worker-a"); + const sql = mockQuery.mock.calls[0][0] as string; + expect(sql).toMatch(/'running'/); + expect(sql).toMatch(/'scheduled'|'pending'/); + }); + + it("count() returns 0 when no rows match", async () => { + mockQuery.mockResolvedValueOnce({ rows: [{ count: "0" }] }); + expect(await counter.count("worker-a")).toBe(0); + }); + + it("recordStarted/recordCompleted are no-ops (DB is the source of truth)", async () => { + await counter.recordStarted("worker-a"); + await counter.recordCompleted("worker-a"); + expect(mockQuery).not.toHaveBeenCalled(); + }); +}); diff --git a/packages/platform/tests/nodes/docker-availability.test.ts b/packages/platform/tests/nodes/docker-availability.test.ts new file mode 100644 index 0000000..918e259 --- 
/dev/null +++ b/packages/platform/tests/nodes/docker-availability.test.ts @@ -0,0 +1,67 @@ +/** + * Tests for docker availability preflight (P40-T5). + * + * checkDockerAvailability() probes the docker socket; filterDockerCapability() + * strips "docker" from the effective capability list when the socket is + * unreachable so the node still registers, but with truthful capabilities. + */ +import { describe, expect, it, vi } from "vitest"; +import { + checkDockerAvailability, + filterDockerCapability, +} from "../../src/nodes/docker-availability.js"; + +describe("checkDockerAvailability()", () => { + it("returns true when the docker probe succeeds", async () => { + const probe = vi.fn().mockResolvedValue(true); + const ok = await checkDockerAvailability({ probe }); + expect(ok).toBe(true); + expect(probe).toHaveBeenCalledOnce(); + }); + + it("returns false when the docker probe rejects", async () => { + const probe = vi.fn().mockRejectedValue(new Error("ENOENT")); + const ok = await checkDockerAvailability({ probe }); + expect(ok).toBe(false); + }); + + it("returns false when the docker probe resolves false (timeout / 5xx)", async () => { + const probe = vi.fn().mockResolvedValue(false); + const ok = await checkDockerAvailability({ probe }); + expect(ok).toBe(false); + }); +}); + +describe("filterDockerCapability()", () => { + it("returns capabilities unchanged when docker is available", () => { + const warn = vi.fn(); + const out = filterDockerCapability( + ["llm-access", "docker", "gpu"], + true, + warn, + ); + expect(out).toEqual(["llm-access", "docker", "gpu"]); + expect(warn).not.toHaveBeenCalled(); + }); + + it("removes docker and warns once when docker is unavailable", () => { + const warn = vi.fn(); + const out = filterDockerCapability( + ["llm-access", "docker", "gpu"], + false, + warn, + ); + expect(out).toEqual(["llm-access", "gpu"]); + expect(warn).toHaveBeenCalledOnce(); + expect(warn).toHaveBeenCalledWith( + 
expect.stringMatching(/docker.*unavailable/i), + ); + }); + + it("is a no-op when docker is not in the capability list", () => { + const warn = vi.fn(); + const out = filterDockerCapability(["llm-access", "gpu"], false, warn); + expect(out).toEqual(["llm-access", "gpu"]); + expect(warn).not.toHaveBeenCalled(); + }); +}); diff --git a/packages/platform/tests/nodes/ssh-preflight.test.ts b/packages/platform/tests/nodes/ssh-preflight.test.ts new file mode 100644 index 0000000..8c77cd9 --- /dev/null +++ b/packages/platform/tests/nodes/ssh-preflight.test.ts @@ -0,0 +1,121 @@ +/** + * Tests for SSH node preflight (P40-T6). + * + * On platform startup, ssh-typed node definitions get a lightweight reach + * check via runtime.ping(). Unreachable nodes log a warning and are marked + * offline in the registry instead of silently appearing healthy until the + * first scheduled job fails. + */ +import type { NodeDefinitionYaml } from "@mandarnilange/agentforge-core/definitions/parser.js"; +import type { INodeRuntime } from "@mandarnilange/agentforge-core/domain/ports/node-runtime.port.js"; +import { describe, expect, it, vi } from "vitest"; +import { validateSshNodesAtStartup } from "../../src/nodes/ssh-preflight.js"; + +function sshDef(name: string, host = "host.example"): NodeDefinitionYaml { + return { + apiVersion: "agentforge.dev/v1", + kind: "NodeDefinition", + metadata: { name, type: "remote" }, + spec: { + capabilities: ["llm-access"], + connection: { type: "ssh", host, user: "agent" }, + }, + } as NodeDefinitionYaml; +} + +function localDef(name: string): NodeDefinitionYaml { + return { + apiVersion: "agentforge.dev/v1", + kind: "NodeDefinition", + metadata: { name, type: "local" }, + spec: { capabilities: ["llm-access"] }, + } as NodeDefinitionYaml; +} + +function fakeRuntime( + def: NodeDefinitionYaml, + pingResult: boolean, +): INodeRuntime { + return { + nodeDefinition: def, + ping: vi.fn().mockResolvedValue(pingResult), + execute: vi.fn(), + } as unknown as 
INodeRuntime; +} + +describe("validateSshNodesAtStartup()", () => { + it("only pings ssh-typed nodes (skips local)", async () => { + const localRt = fakeRuntime(localDef("alpha"), true); + const sshRt = fakeRuntime(sshDef("beta"), true); + const warn = vi.fn(); + const markOffline = vi.fn(); + await validateSshNodesAtStartup({ + runtimes: [localRt, sshRt], + warn, + markOffline, + }); + expect(localRt.ping).not.toHaveBeenCalled(); + expect(sshRt.ping).toHaveBeenCalledOnce(); + }); + + it("marks unreachable ssh nodes offline and warns", async () => { + const sshRt = fakeRuntime(sshDef("beta", "10.0.0.99"), false); + const warn = vi.fn(); + const markOffline = vi.fn(); + await validateSshNodesAtStartup({ + runtimes: [sshRt], + warn, + markOffline, + }); + expect(markOffline).toHaveBeenCalledWith("beta"); + expect(warn).toHaveBeenCalledOnce(); + expect(warn).toHaveBeenCalledWith( + expect.stringMatching(/ssh.*beta.*unreachable/i), + ); + }); + + it("does not warn or mark offline when ssh is reachable", async () => { + const sshRt = fakeRuntime(sshDef("beta"), true); + const warn = vi.fn(); + const markOffline = vi.fn(); + await validateSshNodesAtStartup({ + runtimes: [sshRt], + warn, + markOffline, + }); + expect(markOffline).not.toHaveBeenCalled(); + expect(warn).not.toHaveBeenCalled(); + }); + + it("returns a structured report of node names and reachability", async () => { + const ok = fakeRuntime(sshDef("ok"), true); + const bad = fakeRuntime(sshDef("bad"), false); + const result = await validateSshNodesAtStartup({ + runtimes: [ok, bad], + warn: vi.fn(), + markOffline: vi.fn(), + }); + expect(result).toEqual([ + { name: "ok", reachable: true }, + { name: "bad", reachable: false }, + ]); + }); + + it("treats ping rejection as unreachable rather than throwing", async () => { + const sshRt = { + nodeDefinition: sshDef("beta"), + ping: vi.fn().mockRejectedValue(new Error("auth failed")), + execute: vi.fn(), + } as unknown as INodeRuntime; + const warn = vi.fn(); + 
const markOffline = vi.fn(); + const result = await validateSshNodesAtStartup({ + runtimes: [sshRt], + warn, + markOffline, + }); + expect(result).toEqual([{ name: "beta", reachable: false }]); + expect(markOffline).toHaveBeenCalledWith("beta"); + expect(warn).toHaveBeenCalledOnce(); + }); +}); diff --git a/packages/platform/tests/state/pg-store.test.ts b/packages/platform/tests/state/pg-store.test.ts index fcbe60e..b5a5c6d 100644 --- a/packages/platform/tests/state/pg-store.test.ts +++ b/packages/platform/tests/state/pg-store.test.ts @@ -144,6 +144,21 @@ describe("PostgresStateStore", () => { }); }); + describe("preflight()", () => { + it("issues SELECT 1 to verify connectivity", async () => { + mockQuery.mockResolvedValueOnce({ rows: [{ "?column?": 1 }] }); + await store.preflight(); + expect(mockQuery).toHaveBeenCalledWith("SELECT 1"); + }); + + it("wraps pg errors with a friendly message that hints at the env var", async () => { + mockQuery.mockRejectedValueOnce(new Error("ECONNREFUSED 127.0.0.1:5432")); + await expect(store.preflight()).rejects.toThrow( + /preflight failed.*AGENTFORGE_POSTGRES_URL/i, + ); + }); + }); + describe("createPipelineRun()", () => { it("inserts a pipeline run and returns it", async () => { mockQuery.mockResolvedValueOnce({ rows: [] }); From 6777929e6d13a3c4db4864794cf4c9fc4841a3b5 Mon Sep 17 00:00:00 2001 From: Mandar Nilange Date: Fri, 8 May 2026 08:12:53 +0530 Subject: [PATCH 2/3] fix: address CodeRabbit review findings on P40+P45 Major (12) - Scheduler: Promise.allSettled so one failed counter call doesn't block every dispatch; treat failed nodes as full - Scheduler: log counter.recordStarted/Completed rejections instead of void-discarding them - PostgresJobQueue: tolerate corrupted payload rows (warn + skip); COALESCE per-job claim_ttl_ms in reclaimStale so each job ages on its own deadline - InMemoryJobQueue: same per-job ttlMs honour in reclaimStale; enqueue is now a no-op on duplicate runId to match Postgres' ON CONFLICT - 
PostgresEventBus: separate notify pool from LISTEN client (pg recommends the listener stay idle); start() is idempotent; close() ends both client + pool; channel name validated at construct so LISTEN/UNLISTEN identifier interpolation can't be hijacked - platform-cli: await SSH preflight before health-checker so they don't race on node status; close eventBus on shutdown so PG connections don't leak - DbActiveRunCounter: SQL builds IN clause from ACTIVE_STATUSES via ANY($::text[]) so the constant can't drift out of sync - docker-availability: replace fragile tcp:// regex with new URL parser (handles IPv6, ignores path segments, normalizes ports) Minor (2) - leader-gated-loop: log body errors instead of silently swallowing them - node-start: lower-case capability list before docker check so --capabilities Docker still triggers preflight Doc nits - PostgresLeaderElector: pool sizing + hash collision JSDoc - Parallelize lock release in close() - buildEventBus test asserts the misconfiguration warning fires Skipped (with rationale) - PgLite-based integration tests (3 nitpicks): mock-based tests fully validate the SQL contract; PgLite migration is its own task - close() race with in-flight pg_notify (1 nitpick): close is best-effort, acceptable for shutdown - _setNow @internal annotation (1 nitpick): underscore convention works - Pool sizing options (1 nitpick): defaults reasonable for current load - CHECK constraint on agent_jobs (1 nitpick): nice-to-have Tests: 1651 passing (+8), typecheck clean, lint clean. 
--- .../src/adapters/jobs/in-memory-job-queue.ts | 14 +++- .../src/control-plane/leader-gated-loop.ts | 11 ++- packages/core/src/control-plane/scheduler.ts | 32 ++++++++- .../adapters/jobs/in-memory-job-queue.test.ts | 26 +++++++ .../control-plane/leader-gated-loop.test.ts | 5 +- .../control-plane/stateless-scheduler.test.ts | 17 +++++ .../src/adapters/events/postgres-event-bus.ts | 41 +++++++++-- .../src/adapters/jobs/postgres-job-queue.ts | 22 +++++- .../leader/postgres-leader-elector.ts | 19 +++-- .../scheduling/db-active-run-counter.ts | 7 +- .../platform/src/cli/commands/node-start.ts | 7 +- .../platform/src/nodes/docker-availability.ts | 29 ++++++-- packages/platform/src/platform-cli.ts | 14 +++- .../adapters/events/event-bus-factory.test.ts | 15 +++- .../events/postgres-event-bus.test.ts | 70 +++++++++++++++---- .../adapters/jobs/postgres-job-queue.test.ts | 20 ++++++ .../scheduling/db-active-run-counter.test.ts | 12 ++-- 17 files changed, 309 insertions(+), 52 deletions(-) diff --git a/packages/core/src/adapters/jobs/in-memory-job-queue.ts b/packages/core/src/adapters/jobs/in-memory-job-queue.ts index 3b8d001..0b5daa8 100644 --- a/packages/core/src/adapters/jobs/in-memory-job-queue.ts +++ b/packages/core/src/adapters/jobs/in-memory-job-queue.ts @@ -39,7 +39,12 @@ export class InMemoryJobQueue implements IJobQueue { } enqueue(job: AgentJob, nodeName: string): Promise { - this.entries.set(job.runId, { job, nodeName }); + // Match Postgres' `ON CONFLICT DO NOTHING`: a duplicate enqueue keeps + // the original entry (and any in-flight claim metadata) instead of + // silently overwriting it. 
+ if (!this.entries.has(job.runId)) { + this.entries.set(job.runId, { job, nodeName }); + } return Promise.resolve(); } @@ -74,8 +79,13 @@ export class InMemoryJobQueue implements IJobQueue { if (entry.claimedBy === undefined || entry.claimedAt === undefined) { continue; } + // Per-job TTL wins when set — claim() records the worker's + // declared trust horizon, and that's the right age for *this* + // job. The maxAgeMs argument is the global fallback for jobs + // claimed before per-job TTLs were tracked. const age = now - entry.claimedAt; - if (age >= maxAgeMs) { + const threshold = entry.ttlMs ?? maxAgeMs; + if (age >= threshold) { entry.claimedBy = undefined; entry.claimedAt = undefined; entry.ttlMs = undefined; diff --git a/packages/core/src/control-plane/leader-gated-loop.ts b/packages/core/src/control-plane/leader-gated-loop.ts index d451f35..23d5bca 100644 --- a/packages/core/src/control-plane/leader-gated-loop.ts +++ b/packages/core/src/control-plane/leader-gated-loop.ts @@ -27,9 +27,14 @@ export function runWhenLeader( if (!leader) return; try { await body(); - } catch { - // Swallow — the interval must keep running so a transient - // failure doesn't permanently disable the singleton loop. + } catch (err) { + // Surface the failure but keep the interval running — a transient + // error must not permanently disable the singleton loop. + console.error( + `runWhenLeader: body for lock "${lockName}" threw: ${ + err instanceof Error ? err.message : String(err) + }`, + ); } }; diff --git a/packages/core/src/control-plane/scheduler.ts b/packages/core/src/control-plane/scheduler.ts index 362a991..06688b8 100644 --- a/packages/core/src/control-plane/scheduler.ts +++ b/packages/core/src/control-plane/scheduler.ts @@ -59,9 +59,20 @@ export class LocalAgentScheduler implements IAgentScheduler { // Read every candidate's load from the counter — fresh on each call // so a peer replica's recent dispatch is reflected immediately. 
- const counts = await Promise.all( + // allSettled (not all) so a single hung/failed counter call only + // disqualifies that node instead of blocking every dispatch. + const settled = await Promise.allSettled( nodePool.map((n) => this.counter.count(n.metadata.name)), ); + const counts = settled.map((s, i) => { + if (s.status === "fulfilled") return s.value; + console.warn( + `Scheduler: counter.count("${nodePool[i].metadata.name}") failed — treating as full. ${ + s.reason instanceof Error ? s.reason.message : String(s.reason) + }`, + ); + return Number.POSITIVE_INFINITY; + }); const candidates = nodePool.filter((node, i) => { const caps = node.spec.capabilities; @@ -94,12 +105,27 @@ export class LocalAgentScheduler implements IAgentScheduler { } recordRunStarted(nodeName: string): void { - void this.counter.recordStarted(nodeName); + // Fire-and-forget: counter.recordStarted is a no-op for the + // stateless DB adapter, but a future adapter may have side effects. + // Surface failures so they don't disappear. + this.counter.recordStarted(nodeName).catch((err) => { + console.warn( + `Scheduler: counter.recordStarted("${nodeName}") failed: ${ + err instanceof Error ? err.message : String(err) + }`, + ); + }); this.registry?.recordRunStarted(nodeName); } recordRunCompleted(nodeName: string): void { - void this.counter.recordCompleted(nodeName); + this.counter.recordCompleted(nodeName).catch((err) => { + console.warn( + `Scheduler: counter.recordCompleted("${nodeName}") failed: ${ + err instanceof Error ? 
err.message : String(err) + }`, + ); + }); this.registry?.recordRunCompleted(nodeName); } diff --git a/packages/core/tests/adapters/jobs/in-memory-job-queue.test.ts b/packages/core/tests/adapters/jobs/in-memory-job-queue.test.ts index cc530d8..b24dc3c 100644 --- a/packages/core/tests/adapters/jobs/in-memory-job-queue.test.ts +++ b/packages/core/tests/adapters/jobs/in-memory-job-queue.test.ts @@ -35,6 +35,17 @@ describe("InMemoryJobQueue", () => { expect(claimed.map((j) => j.runId)).toEqual(["r1"]); }); + it("enqueue with a duplicate runId is a no-op (matches Postgres ON CONFLICT DO NOTHING)", async () => { + const q = new InMemoryJobQueue(); + await q.enqueue(job("r1"), "worker-a"); + await q.claim("worker-a"); // r1 is now claimed + await q.enqueue(job("r1"), "worker-b"); // duplicate runId — must not reset + const second = await q.claim("worker-a"); + expect(second).toEqual([]); // still claimed, not overwritten + const onB = await q.claim("worker-b"); + expect(onB).toEqual([]); // never moved to worker-b + }); + it("does not return the same job twice (claim is exclusive)", async () => { const q = new InMemoryJobQueue(); await q.enqueue(job("r1"), "worker-a"); @@ -86,6 +97,21 @@ describe("InMemoryJobQueue", () => { expect(reclaimedJobs.map((j) => j.runId)).toEqual(["r1"]); }); + it("reclaimStale honours per-job ttlMs over the maxAgeMs argument", async () => { + // Job A claimed with a tight 100ms TTL; Job B with the default 300_000ms. + // At t+200ms (with maxAgeMs=50ms) only A's per-job TTL has elapsed. 
+ const q = new InMemoryJobQueue({ now: () => 0 }); + await q.enqueue(job("ra"), "worker-a"); + await q.enqueue(job("rb"), "worker-a"); + await q.claim("worker-a", { ttlMs: 100, limit: 1 }); + await q.claim("worker-a", { ttlMs: 300_000, limit: 1 }); + (q as unknown as { _setNow: (n: number) => void })._setNow(200); + const reclaimed = await q.reclaimStale(50); + expect(reclaimed).toBe(1); + const next = await q.claim("worker-a"); + expect(next.map((j) => j.runId)).toEqual(["ra"]); + }); + it("depth reports both pending and claimed jobs", async () => { const q = new InMemoryJobQueue(); await q.enqueue(job("r1"), "worker-a"); diff --git a/packages/core/tests/control-plane/leader-gated-loop.test.ts b/packages/core/tests/control-plane/leader-gated-loop.test.ts index 62a2b81..192c5ed 100644 --- a/packages/core/tests/control-plane/leader-gated-loop.test.ts +++ b/packages/core/tests/control-plane/leader-gated-loop.test.ts @@ -80,8 +80,9 @@ describe("runWhenLeader", () => { expect(release).toHaveBeenCalledWith("lock-a"); }); - it("swallows body errors so the loop keeps running", async () => { + it("swallows body errors so the loop keeps running, logging each one", async () => { const elector = new LocalLeaderElector(); + const errSpy = vi.spyOn(console, "error").mockImplementation(() => {}); const body = vi .fn() .mockRejectedValueOnce(new Error("boom")) @@ -90,6 +91,8 @@ describe("runWhenLeader", () => { await vi.advanceTimersByTimeAsync(1000); await vi.advanceTimersByTimeAsync(1000); expect(body).toHaveBeenCalledTimes(2); + expect(errSpy).toHaveBeenCalledWith(expect.stringMatching(/lock-a.*boom/)); stop(); + errSpy.mockRestore(); }); }); diff --git a/packages/core/tests/control-plane/stateless-scheduler.test.ts b/packages/core/tests/control-plane/stateless-scheduler.test.ts index 5b928a5..5822005 100644 --- a/packages/core/tests/control-plane/stateless-scheduler.test.ts +++ b/packages/core/tests/control-plane/stateless-scheduler.test.ts @@ -101,4 +101,21 @@ 
describe("LocalAgentScheduler stateless path (P45-T6)", () => { ); expect(counter.count).toHaveBeenCalledTimes(2); }); + + it("treats a single failing counter call as 'capacity unknown' and skips that node", async () => { + const counter: IActiveRunCounter = { + count: vi.fn(async (name: string) => { + if (name === "node-a") throw new Error("DB hiccup"); + return 0; + }), + recordStarted: vi.fn().mockResolvedValue(undefined), + recordCompleted: vi.fn().mockResolvedValue(undefined), + }; + const scheduler = new LocalAgentScheduler(undefined, { counter }); + const nodeA = makeNode("node-a", ["llm-access"], 2); + const nodeB = makeNode("node-b", ["llm-access"], 2); + // node-a's counter rejected; node-b is healthy and idle → must be picked. + const picked = await scheduler.schedule(makeAgent("a"), [nodeA, nodeB]); + expect(picked.metadata.name).toBe("node-b"); + }); }); diff --git a/packages/platform/src/adapters/events/postgres-event-bus.ts b/packages/platform/src/adapters/events/postgres-event-bus.ts index 27f3d06..6f99a47 100644 --- a/packages/platform/src/adapters/events/postgres-event-bus.ts +++ b/packages/platform/src/adapters/events/postgres-event-bus.ts @@ -7,6 +7,10 @@ * subscribers also receive the event synchronously so the emitting * process's UI does not wait for the round-trip through Postgres. * + * The listener client is dedicated — pg recommends keeping it idle so + * notifications are delivered promptly. Outgoing `pg_notify` calls go + * through a separate pool to avoid contending with the listener. + * * Payloads exceed Postgres' 8000-byte NOTIFY limit only for unusual * StatusUpdate shapes — at that point we drop the cross-replica * delivery rather than fail the emit. @@ -19,19 +23,37 @@ import type { import pg from "pg"; const NOTIFY_PAYLOAD_LIMIT = 7900; +// Strict identifier guard. LISTEN/UNLISTEN take SQL identifiers, not +// parameters, so callers must pre-validate the channel name. 
+const IDENTIFIER_RE = /^[A-Za-z_][A-Za-z0-9_]*$/; export class PostgresEventBus implements IEventBus { private readonly client: pg.Client; + /** + * Separate pool for outgoing pg_notify calls. The pg docs recommend + * the LISTEN client stay idle — issuing other queries on the same + * connection can delay or drop notifications. Pool size is small + * because emit is fire-and-forget and short-lived. + */ + private readonly notifyPool: pg.Pool; private readonly listeners = new Set<(event: PipelineEvent) => void>(); + private started = false; constructor( connectionString: string, private readonly channel: string = "agentforge", ) { + if (!IDENTIFIER_RE.test(channel)) { + throw new Error( + `Invalid channel name "${channel}" — must match ${IDENTIFIER_RE} (LISTEN/UNLISTEN take identifiers, not parameters).`, + ); + } this.client = new pg.Client({ connectionString }); + this.notifyPool = new pg.Pool({ connectionString, max: 2 }); } async start(): Promise { + if (this.started) return; await this.client.connect(); this.client.on("notification", (msg) => { if (!msg.payload) return; @@ -43,10 +65,9 @@ export class PostgresEventBus implements IEventBus { } this.deliver(event); }); - // Channel names come from config / code, not user input. Quoting them - // would force Postgres to treat them as case-sensitive identifiers - // which is not what we want here. + // Channel is constructor-validated; safe to interpolate. await this.client.query(`LISTEN ${this.channel}`); + this.started = true; } emit(event: PipelineEvent): void { @@ -55,12 +76,17 @@ export class PostgresEventBus implements IEventBus { // waiting for the round-trip. this.deliver(event); const payload = JSON.stringify(event); - if (payload.length > NOTIFY_PAYLOAD_LIMIT) return; - void this.client + if (payload.length > NOTIFY_PAYLOAD_LIMIT) { + console.warn( + `PostgresEventBus: dropping cross-replica delivery — payload (${payload.length}B) exceeds NOTIFY limit (${NOTIFY_PAYLOAD_LIMIT}B). 
type=${event.type}`, + ); + return; + } + void this.notifyPool .query("SELECT pg_notify($1, $2)", [this.channel, payload]) .catch(() => { - // Best-effort: if the LISTEN connection drops, peers won't see - // the event. Local subscribers were already notified above. + // Best-effort: if the notify pool is unavailable, peers won't + // see the event. Local subscribers were already notified above. }); } @@ -88,5 +114,6 @@ export class PostgresEventBus implements IEventBus { // Connection may already be closed. } await this.client.end(); + await this.notifyPool.end(); } } diff --git a/packages/platform/src/adapters/jobs/postgres-job-queue.ts b/packages/platform/src/adapters/jobs/postgres-job-queue.ts index 9fa3c4d..7366f6c 100644 --- a/packages/platform/src/adapters/jobs/postgres-job-queue.ts +++ b/packages/platform/src/adapters/jobs/postgres-job-queue.ts @@ -52,7 +52,22 @@ export class PostgresJobQueue implements IJobQueue { RETURNING run_id, payload`, [nodeName, limit, ttlMs], ); - return rows.map((r) => JSON.parse(r.payload) as AgentJob); + // Resilient parse: a single corrupted payload (DB drift, manual + // edit) shouldn't poison the whole claim — drop the bad row and + // log so the operator can investigate. + const parsed: AgentJob[] = []; + for (const r of rows) { + try { + parsed.push(JSON.parse(r.payload) as AgentJob); + } catch (err) { + console.warn( + `PostgresJobQueue: dropping unparseable payload for run_id=${r.run_id}: ${ + err instanceof Error ? err.message : String(err) + }`, + ); + } + } + return parsed; } async complete(runId: string): Promise { @@ -60,13 +75,16 @@ export class PostgresJobQueue implements IJobQueue { } async reclaimStale(maxAgeMs: number): Promise { + // COALESCE so each row uses its own claim_ttl_ms when set, falling + // back to maxAgeMs only when the row predates per-job TTL tracking. 
const { rowCount } = await this.pool.query( `UPDATE agent_jobs SET claimed_by = NULL, claimed_at = NULL, claim_ttl_ms = NULL WHERE claimed_at IS NOT NULL - AND (EXTRACT(EPOCH FROM (now() - claimed_at)) * 1000) >= $1`, + AND (EXTRACT(EPOCH FROM (now() - claimed_at)) * 1000) + >= COALESCE(claim_ttl_ms, $1)`, [maxAgeMs], ); return rowCount ?? 0; diff --git a/packages/platform/src/adapters/leader/postgres-leader-elector.ts b/packages/platform/src/adapters/leader/postgres-leader-elector.ts index a52d12f..281f610 100644 --- a/packages/platform/src/adapters/leader/postgres-leader-elector.ts +++ b/packages/platform/src/adapters/leader/postgres-leader-elector.ts @@ -9,6 +9,12 @@ * Lock names are hashed into a bigint so callers can use human-readable * names ("agentforge-reconciler", "agentforge-scheduler") without picking * magic numbers. + * + * Pool sizing: each held lock checks out one pg.PoolClient for its session + * lifetime. Size the underlying Postgres `max_connections` to accommodate + * (concurrent_lock_count × replica_count) plus headroom for ordinary + * queries. The default Postgres pool is created here with library defaults; + * pass a tuned pool via subclass / DI when load profile demands it. */ import { createHash } from "node:crypto"; @@ -69,9 +75,10 @@ export class PostgresLeaderElector implements ILeaderElector { } async close(): Promise { - for (const name of [...this.held.keys()]) { - await this.release(name); - } + // Release all held locks in parallel so shutdown is not gated on the + // slowest pg_advisory_unlock round-trip. Snapshot the key set first + // to avoid mutation during iteration. + await Promise.all([...this.held.keys()].map((name) => this.release(name))); await this.pool.end(); } } @@ -79,8 +86,10 @@ export class PostgresLeaderElector implements ILeaderElector { /** * Hash a lock name to a signed 64-bit int. 
Postgres advisory locks take a * bigint; sha256 truncated to 8 bytes is collision-resistant for the small - * fixed set of agentforge lock names. Returns a string so the pg driver - * passes it as a numeric parameter without precision loss. + * fixed set of agentforge lock names (collision probability ≈ 2^-63 per + * pair — negligible for the 5-10 lock names we ever expect to declare). + * Returns a string so the pg driver passes it as a numeric parameter + * without precision loss. */ function lockNameToBigInt(name: string): string { const digest = createHash("sha256").update(name).digest(); diff --git a/packages/platform/src/adapters/scheduling/db-active-run-counter.ts b/packages/platform/src/adapters/scheduling/db-active-run-counter.ts index aff3419..569f2cf 100644 --- a/packages/platform/src/adapters/scheduling/db-active-run-counter.ts +++ b/packages/platform/src/adapters/scheduling/db-active-run-counter.ts @@ -20,12 +20,15 @@ export class DbActiveRunCounter implements IActiveRunCounter { } async count(nodeName: string): Promise { + // Build the IN clause from ACTIVE_STATUSES so the constant can't drift + // out of sync with the SQL. status = ANY($2::text[]) is equivalent to + // IN (...) but takes the array as a single bound parameter. const { rows } = await this.pool.query( `SELECT count(*)::int AS count FROM agent_runs WHERE node_name = $1 - AND status IN ('pending','scheduled','running')`, - [nodeName], + AND status = ANY($2::text[])`, + [nodeName, [...ACTIVE_STATUSES]], ); return Number(rows[0]?.count ?? 
0); } diff --git a/packages/platform/src/cli/commands/node-start.ts b/packages/platform/src/cli/commands/node-start.ts index 3a56862..358e7de 100644 --- a/packages/platform/src/cli/commands/node-start.ts +++ b/packages/platform/src/cli/commands/node-start.ts @@ -210,9 +210,14 @@ export function registerNodeStartCommand(program: Command): void { .option("--host ", "HTTP server host", "0.0.0.0") .action(async (opts: NodeStartOptions) => { const nodeName = opts.name ?? `node-${process.pid}`; + // Capability matching is canonicalised to lowercase so a user + // passing `--capabilities Docker,GPU` still triggers the docker + // preflight and the scheduler's `nodeAffinity.required: [docker]` + // matches as expected. const declaredCapabilities = (opts.capabilities ?? "llm-access") .split(",") - .map((c) => c.trim()); + .map((c) => c.trim().toLowerCase()) + .filter((c) => c.length > 0); // Docker preflight (P40-T5): verify the daemon is actually reachable // before we register a node that claims docker capability. On failure // we strip "docker" from the effective list and log a warning so the diff --git a/packages/platform/src/nodes/docker-availability.ts b/packages/platform/src/nodes/docker-availability.ts index 0dd8921..e5c99c3 100644 --- a/packages/platform/src/nodes/docker-availability.ts +++ b/packages/platform/src/nodes/docker-availability.ts @@ -65,13 +65,19 @@ function defaultProbe(opts: DockerAvailabilityOptions): () => Promise { const dockerHost = opts.dockerHost ?? process.env.DOCKER_HOST; const socketPath = opts.socketPath ?? DEFAULT_SOCKET; + // URL parser handles IPv6 brackets, ignores path segments, and + // normalizes ports. Falls back to the unix socket for any non-tcp + // scheme (unix://, ssh://, npipe://) — those need their own runtimes + // which dockerode handles, but for a *reachability* probe the unix + // socket is the right default on every host this code runs on. 
+ const tcpTarget = parseTcpDockerHost(dockerHost); + return () => new Promise((resolve) => { - const tcpMatch = dockerHost?.match(/^tcp:\/\/([^:]+):(\d+)/); - const socket = tcpMatch + const socket = tcpTarget ? connect({ - host: tcpMatch[1], - port: Number(tcpMatch[2]), + host: tcpTarget.host, + port: tcpTarget.port, timeout: timeoutMs, }) : connect({ path: socketPath, timeout: timeoutMs }); @@ -85,3 +91,18 @@ function defaultProbe(opts: DockerAvailabilityOptions): () => Promise { socket.once("timeout", () => cleanup(false)); }); } + +function parseTcpDockerHost( + dockerHost: string | undefined, +): { host: string; port: number } | null { + if (!dockerHost?.startsWith("tcp://")) return null; + try { + const url = new URL(dockerHost); + const port = Number(url.port); + if (!Number.isFinite(port) || port <= 0) return null; + // url.hostname strips IPv6 brackets, which is what `connect()` wants. + return { host: url.hostname, port }; + } catch { + return null; + } +} diff --git a/packages/platform/src/platform-cli.ts b/packages/platform/src/platform-cli.ts index e7d6b1b..9fc0c50 100644 --- a/packages/platform/src/platform-cli.ts +++ b/packages/platform/src/platform-cli.ts @@ -458,9 +458,10 @@ const nodeHealthChecker = new NodeHealthChecker( ); // SSH preflight (P40-T6): warn early on unreachable ssh hosts so operators // don't discover the failure via a cryptic dispatch error minutes later. -// The general health checker still runs below — preflight just gives a -// loud, immediate signal at startup for the ssh-specific configuration class. -void validateSshNodesAtStartup({ +// Awaited before checkAll() so the general health-check loop doesn't race +// with preflight and re-mark a node "online" between the preflight +// rejection and the registry write. 
+await validateSshNodesAtStartup({ runtimes: nodeRuntimes, warn: (msg) => console.warn(msg), markOffline: (name) => nodeRegistry.markOffline(name), @@ -566,6 +567,13 @@ registerNodeStartCommand(program); void program.parseAsync().then(async () => { if (pgRefreshInterval) clearInterval(pgRefreshInterval); + if (eventBus instanceof PostgresEventBus) { + try { + await eventBus.close(); + } catch { + // Best-effort: connection may already be torn down by the OS. + } + } await stateStore.close(); if (sqliteDefinitionStore) sqliteDefinitionStore.close(); if (pgDefinitionStore) await pgDefinitionStore.close(); diff --git a/packages/platform/tests/adapters/events/event-bus-factory.test.ts b/packages/platform/tests/adapters/events/event-bus-factory.test.ts index c8aa0c4..3a56edb 100644 --- a/packages/platform/tests/adapters/events/event-bus-factory.test.ts +++ b/packages/platform/tests/adapters/events/event-bus-factory.test.ts @@ -11,7 +11,11 @@ vi.mock("pg", () => { connect = vi.fn().mockResolvedValue(undefined); end = vi.fn().mockResolvedValue(undefined); } - return { default: { Client: MockClient } }; + class MockPool { + query = vi.fn().mockResolvedValue({ rows: [] }); + end = vi.fn().mockResolvedValue(undefined); + } + return { default: { Client: MockClient, Pool: MockPool } }; }); import { InMemoryEventBus } from "@mandarnilange/agentforge-core/adapters/events/in-memory-event-bus.js"; @@ -48,8 +52,15 @@ describe("buildEventBus", () => { it("falls back to InMemoryEventBus when AGENTFORGE_EVENT_BUS=postgres but no url", () => { process.env.AGENTFORGE_EVENT_BUS = "postgres"; - const bus = buildEventBus({ postgresUrl: undefined }); + const warn = vi.fn(); + const bus = buildEventBus({ postgresUrl: undefined, warn }); expect(bus).toBeInstanceOf(InMemoryEventBus); + // Operator must see the misconfiguration — silent fallback would + // hide a broken multi-replica deployment. 
+ expect(warn).toHaveBeenCalledOnce(); + expect(warn).toHaveBeenCalledWith( + expect.stringMatching(/AGENTFORGE_EVENT_BUS=postgres.*falling back/i), + ); }); it("rejects unknown event-bus types with a clear error", () => { diff --git a/packages/platform/tests/adapters/events/postgres-event-bus.test.ts b/packages/platform/tests/adapters/events/postgres-event-bus.test.ts index 2828068..94b5a7a 100644 --- a/packages/platform/tests/adapters/events/postgres-event-bus.test.ts +++ b/packages/platform/tests/adapters/events/postgres-event-bus.test.ts @@ -16,14 +16,21 @@ import type { } from "@mandarnilange/agentforge-core/domain/ports/event-bus.port.js"; import { beforeEach, describe, expect, it, vi } from "vitest"; -const { mockClientQuery, mockClientOn, mockConnect, mockEnd } = vi.hoisted( - () => ({ - mockClientQuery: vi.fn().mockResolvedValue({ rows: [] }), - mockClientOn: vi.fn(), - mockConnect: vi.fn(), - mockEnd: vi.fn().mockResolvedValue(undefined), - }), -); +const { + mockClientQuery, + mockClientOn, + mockConnect, + mockEnd, + mockPoolQuery, + mockPoolEnd, +} = vi.hoisted(() => ({ + mockClientQuery: vi.fn().mockResolvedValue({ rows: [] }), + mockClientOn: vi.fn(), + mockConnect: vi.fn(), + mockEnd: vi.fn().mockResolvedValue(undefined), + mockPoolQuery: vi.fn().mockResolvedValue({ rows: [] }), + mockPoolEnd: vi.fn().mockResolvedValue(undefined), +})); vi.mock("pg", () => { class MockClient { @@ -32,7 +39,11 @@ vi.mock("pg", () => { connect = mockConnect.mockResolvedValue(undefined); end = mockEnd; } - return { default: { Client: MockClient } }; + class MockPool { + query = mockPoolQuery; + end = mockPoolEnd; + } + return { default: { Client: MockClient, Pool: MockPool } }; }); import { PostgresEventBus } from "../../../src/adapters/events/postgres-event-bus.js"; @@ -52,19 +63,23 @@ describe("PostgresEventBus", () => { expect(sql).toMatch(/agentforge/); }); - it("emit issues NOTIFY with a JSON payload", async () => { + it("emit issues NOTIFY on a separate pool 
(listener client stays idle)", async () => { const event: PipelineEvent = { type: "run_updated", runId: "r1", status: "succeeded", }; bus.emit(event); - // emit is sync but issues query asynchronously — flush microtasks await new Promise((r) => setImmediate(r)); - const notifyCall = mockClientQuery.mock.calls.find((c) => + // pg_notify must go through the notify pool, not the LISTEN client. + const notifyCall = mockPoolQuery.mock.calls.find((c) => (c[0] as string).startsWith("SELECT pg_notify"), ); expect(notifyCall).toBeDefined(); + const onListenerCall = mockClientQuery.mock.calls.find((c) => + (c[0] as string).startsWith("SELECT pg_notify"), + ); + expect(onListenerCall).toBeUndefined(); const params = notifyCall?.[1] as unknown[]; const payload = JSON.parse(params[1] as string); expect(payload.type).toBe("run_updated"); @@ -117,4 +132,35 @@ describe("PostgresEventBus", () => { handler({ payload: "{ this is not json" }); expect(received).toHaveLength(0); }); + + it("rejects channel names with non-identifier characters at construct", () => { + // Identifier interpolation in LISTEN/UNLISTEN — guard at construct + // time so unsafe channel names never reach the SQL layer. 
+ expect( + () => + new ( + PostgresEventBus as unknown as new ( + url: string, + channel: string, + ) => unknown + )("postgresql://localhost/test", "agent;DROP TABLE users;--"), + ).toThrow(/channel name/i); + }); + + it("close() ends both the listener client and the notify pool", async () => { + await bus.close(); + expect(mockEnd).toHaveBeenCalled(); + expect(mockPoolEnd).toHaveBeenCalled(); + }); + + it("start() is idempotent (no second LISTEN on repeat call)", async () => { + const listenCallsBefore = mockClientQuery.mock.calls.filter((c) => + (c[0] as string).startsWith("LISTEN"), + ).length; + await bus.start(); + const listenCallsAfter = mockClientQuery.mock.calls.filter((c) => + (c[0] as string).startsWith("LISTEN"), + ).length; + expect(listenCallsAfter).toBe(listenCallsBefore); + }); }); diff --git a/packages/platform/tests/adapters/jobs/postgres-job-queue.test.ts b/packages/platform/tests/adapters/jobs/postgres-job-queue.test.ts index 11af30f..17f4cc6 100644 --- a/packages/platform/tests/adapters/jobs/postgres-job-queue.test.ts +++ b/packages/platform/tests/adapters/jobs/postgres-job-queue.test.ts @@ -76,6 +76,17 @@ describe("PostgresJobQueue", () => { expect(claimed.map((j) => j.runId)).toEqual(["r1", "r2"]); }); + it("skips rows with corrupted payloads instead of failing the whole claim", async () => { + mockQuery.mockResolvedValueOnce({ + rows: [ + { run_id: "ok", payload: JSON.stringify(job("ok")) }, + { run_id: "bad", payload: "{ not valid json" }, + ], + }); + const claimed = await queue.claim("worker-a", { limit: 5 }); + expect(claimed.map((j) => j.runId)).toEqual(["ok"]); + }); + it("filters by node_name and pending claim", async () => { mockQuery.mockResolvedValueOnce({ rows: [] }); await queue.claim("worker-a", { limit: 3 }); @@ -106,6 +117,15 @@ describe("PostgresJobQueue", () => { expect(sql).toMatch(/claimed_by\s*=\s*NULL/i); expect(sql).toMatch(/claimed_at\s*=\s*NULL/i); }); + + it("uses per-job claim_ttl_ms when set, falling back to 
maxAgeMs", async () => { + mockQuery.mockResolvedValueOnce({ rowCount: 0, rows: [] }); + await queue.reclaimStale(60_000); + const sql = mockQuery.mock.calls[0][0] as string; + // SQL should COALESCE per-job ttl with the parameter so each row + // is checked against its own deadline. + expect(sql).toMatch(/COALESCE\(claim_ttl_ms/i); + }); }); describe("depth()", () => { diff --git a/packages/platform/tests/adapters/scheduling/db-active-run-counter.test.ts b/packages/platform/tests/adapters/scheduling/db-active-run-counter.test.ts index 80a31a6..15702e5 100644 --- a/packages/platform/tests/adapters/scheduling/db-active-run-counter.test.ts +++ b/packages/platform/tests/adapters/scheduling/db-active-run-counter.test.ts @@ -31,17 +31,19 @@ describe("DbActiveRunCounter", () => { const sql = mockQuery.mock.calls[0][0] as string; expect(sql).toMatch(/FROM agent_runs/); expect(sql).toMatch(/node_name\s*=\s*\$1/); - expect(sql).toMatch(/status\s+IN/i); + expect(sql).toMatch(/status\s*=\s*ANY/i); const params = mockQuery.mock.calls[0][1] as unknown[]; expect(params).toContain("worker-a"); }); - it("count() includes both 'running' and 'scheduled' / 'pending' statuses", async () => { + it("count() passes ACTIVE_STATUSES as a single array parameter", async () => { mockQuery.mockResolvedValueOnce({ rows: [{ count: "0" }] }); await counter.count("worker-a"); - const sql = mockQuery.mock.calls[0][0] as string; - expect(sql).toMatch(/'running'/); - expect(sql).toMatch(/'scheduled'|'pending'/); + const params = mockQuery.mock.calls[0][1] as unknown[]; + const arrayParam = params.find((p) => Array.isArray(p)) as string[]; + expect(arrayParam).toEqual( + expect.arrayContaining(["pending", "scheduled", "running"]), + ); }); it("count() returns 0 when no rows match", async () => { From ffcb4ed93a5adc373d44f7bf15bc535129e95cb4 Mon Sep 17 00:00:00 2001 From: Mandar Nilange Date: Fri, 8 May 2026 08:35:31 +0530 Subject: [PATCH 3/3] fix: address 8 CodeRabbit findings on PR #14 MIME-Version: 
1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Major correctness fixes - LocalLeaderElector.acquire(): returns false when the lock is already held — two runWhenLeader instances on the same process can no longer both think they're leader - runWhenLeader: in-flight reentrancy guard (setInterval doesn't await async bodies, so a slow reconciler scan would have N ticks racing); belt-and-braces .catch on tick() so timer-boundary rejections never surface as unhandled - PostgresJobQueue.claim(): DELETE corrupted-payload rows so the reclaimStale → re-claim → JSON.parse-throws cycle can't poison the FIFO head forever - PostgresLeaderElector: pool client is destroyed (release(true)) on acquire query failure and on pg_advisory_unlock failure — prevents a session that may still hold an advisory lock from being returned to the pool, which would split-brain leadership Minor lifecycle / robustness fixes - PostgresEventBus.start(): if LISTEN throws after a successful connect(), tear down the client and rebuild it so a retry can succeed without "Client is already connected" - PostgresEventBus.close(): allSettled so notifyPool.end() runs even when the listener client.end() rejects - docker-availability probe: persistent no-op "error" listener absorbs late events that arrive after cleanup() destroys the socket (e.g. TCP RST + ETIMEDOUT race) — prevents an unhandled error crashing the process at startup Test polish - DbActiveRunCounter: empty-rows mock is now `rows: []` so the rows[0]?.count fallback path is actually exercised Tests: 1659 passing (+8 new), typecheck clean, lint clean. 
--- .../adapters/leader/local-leader-elector.ts | 5 ++ .../src/control-plane/leader-gated-loop.ts | 22 ++++++- .../leader/local-leader-elector.test.ts | 16 ++++++ .../control-plane/leader-gated-loop.test.ts | 20 +++++++ .../src/adapters/events/postgres-event-bus.ts | 57 ++++++++++++++----- .../src/adapters/jobs/postgres-job-queue.ts | 11 ++++ .../leader/postgres-leader-elector.ts | 21 ++++++- .../platform/src/nodes/docker-availability.ts | 8 +++ .../events/postgres-event-bus.test.ts | 38 +++++++++++++ .../adapters/jobs/postgres-job-queue.test.ts | 20 +++++++ .../leader/postgres-leader-elector.test.ts | 21 +++++++ .../scheduling/db-active-run-counter.test.ts | 6 +- 12 files changed, 223 insertions(+), 22 deletions(-) diff --git a/packages/core/src/adapters/leader/local-leader-elector.ts b/packages/core/src/adapters/leader/local-leader-elector.ts index 6742755..7d7744d 100644 --- a/packages/core/src/adapters/leader/local-leader-elector.ts +++ b/packages/core/src/adapters/leader/local-leader-elector.ts @@ -10,6 +10,11 @@ export class LocalLeaderElector implements ILeaderElector { private readonly held = new Set(); acquire(lockName: string): Promise { + // Mutual exclusion: only the *first* caller for a given name wins. + // Without this, two runWhenLeader instances in the same process (e.g. + // reconciler + scheduler that share a lock by mistake) would both + // believe they are leader, defeating the contract. 
+ if (this.held.has(lockName)) return Promise.resolve(false); this.held.add(lockName); return Promise.resolve(true); } diff --git a/packages/core/src/control-plane/leader-gated-loop.ts b/packages/core/src/control-plane/leader-gated-loop.ts index 23d5bca..adfe054 100644 --- a/packages/core/src/control-plane/leader-gated-loop.ts +++ b/packages/core/src/control-plane/leader-gated-loop.ts @@ -17,14 +17,20 @@ export function runWhenLeader( intervalMs: number, ): () => void { let stopped = false; + // Reentrancy guard: setInterval fires the callback regardless of whether + // the previous async invocation has resolved. Without this flag, a body + // that runs longer than `intervalMs` (e.g. a slow reconciler scan) would + // have N ticks all in flight at once, each holding leader rights. + let inFlight = false; const tick = async (): Promise => { - if (stopped) return; + if (stopped || inFlight) return; let leader = elector.isLeader(lockName); if (!leader) { leader = await elector.acquire(lockName); } if (!leader) return; + inFlight = true; try { await body(); } catch (err) { @@ -35,11 +41,23 @@ export function runWhenLeader( err instanceof Error ? err.message : String(err) }`, ); + } finally { + inFlight = false; } }; const handle = setInterval(() => { - void tick(); + // `tick` is async; attach .catch to absorb any rejection from the + // outer pre-body code (acquire/isLeader). Body errors are already + // caught inside tick — this is just a belt-and-braces guard against + // unhandled rejections at the timer boundary. + tick().catch((err) => { + console.error( + `runWhenLeader: pre-body error for lock "${lockName}": ${ + err instanceof Error ? 
err.message : String(err) + }`, + ); + }); }, intervalMs); return () => { diff --git a/packages/core/tests/adapters/leader/local-leader-elector.test.ts b/packages/core/tests/adapters/leader/local-leader-elector.test.ts index 9c46f98..9347073 100644 --- a/packages/core/tests/adapters/leader/local-leader-elector.test.ts +++ b/packages/core/tests/adapters/leader/local-leader-elector.test.ts @@ -35,4 +35,20 @@ describe("LocalLeaderElector", () => { expect(e.isLeader("a")).toBe(true); expect(e.isLeader("b")).toBe(false); }); + + it("acquire returns false on a second call to the same name (mutual exclusion)", async () => { + // Two parallel call sites (e.g. reconciler + scheduler that mistakenly + // share a lock name, or two runWhenLeader instances on the same process) + // must not both think they're leader. + const e = new LocalLeaderElector(); + expect(await e.acquire("foo")).toBe(true); + expect(await e.acquire("foo")).toBe(false); + }); + + it("acquire succeeds again after release", async () => { + const e = new LocalLeaderElector(); + await e.acquire("foo"); + await e.release("foo"); + expect(await e.acquire("foo")).toBe(true); + }); }); diff --git a/packages/core/tests/control-plane/leader-gated-loop.test.ts b/packages/core/tests/control-plane/leader-gated-loop.test.ts index 192c5ed..44f4e7b 100644 --- a/packages/core/tests/control-plane/leader-gated-loop.test.ts +++ b/packages/core/tests/control-plane/leader-gated-loop.test.ts @@ -95,4 +95,24 @@ describe("runWhenLeader", () => { stop(); errSpy.mockRestore(); }); + + it("does not start a second tick while the previous one is still running", async () => { + // Body takes 5 intervals to resolve. Ticks must not overlap, so over + // 4 intervals we should only ever see 1 invocation in flight. 
+ const elector = new LocalLeaderElector(); + let running = 0; + let maxConcurrent = 0; + const body = vi.fn(async () => { + running++; + maxConcurrent = Math.max(maxConcurrent, running); + await new Promise((r) => setTimeout(r, 5_000)); + running--; + }); + const stop = runWhenLeader(elector, "lock-a", body, 1_000); + await vi.advanceTimersByTimeAsync(4_000); + expect(maxConcurrent).toBe(1); + stop(); + // drain in-flight body + await vi.runAllTimersAsync(); + }); }); diff --git a/packages/platform/src/adapters/events/postgres-event-bus.ts b/packages/platform/src/adapters/events/postgres-event-bus.ts index 6f99a47..4e535af 100644 --- a/packages/platform/src/adapters/events/postgres-event-bus.ts +++ b/packages/platform/src/adapters/events/postgres-event-bus.ts @@ -28,7 +28,8 @@ const NOTIFY_PAYLOAD_LIMIT = 7900; const IDENTIFIER_RE = /^[A-Za-z_][A-Za-z0-9_]*$/; export class PostgresEventBus implements IEventBus { - private readonly client: pg.Client; + private readonly connectionString: string; + private client: pg.Client; /** * Separate pool for outgoing pg_notify calls. 
The pg docs recommend * the LISTEN client stay idle — issuing other queries on the same @@ -48,14 +49,14 @@ export class PostgresEventBus implements IEventBus { `Invalid channel name "${channel}" — must match ${IDENTIFIER_RE} (LISTEN/UNLISTEN take identifiers, not parameters).`, ); } - this.client = new pg.Client({ connectionString }); + this.connectionString = connectionString; + this.client = this.makeClient(); this.notifyPool = new pg.Pool({ connectionString, max: 2 }); } - async start(): Promise { - if (this.started) return; - await this.client.connect(); - this.client.on("notification", (msg) => { + private makeClient(): pg.Client { + const c = new pg.Client({ connectionString: this.connectionString }); + c.on("notification", (msg) => { if (!msg.payload) return; let event: PipelineEvent; try { @@ -65,8 +66,28 @@ export class PostgresEventBus implements IEventBus { } this.deliver(event); }); - // Channel is constructor-validated; safe to interpolate. - await this.client.query(`LISTEN ${this.channel}`); + return c; + } + + async start(): Promise { + if (this.started) return; + await this.client.connect(); + try { + // Channel is constructor-validated; safe to interpolate. + await this.client.query(`LISTEN ${this.channel}`); + } catch (err) { + // LISTEN failed but connect succeeded → calling start() again on + // the same client would throw "Client is already connected" and + // also leak the previous notification listener. Tear down and + // rebuild so the caller can retry cleanly. + try { + await this.client.end(); + } catch { + // already broken; nothing to recover + } + this.client = this.makeClient(); + throw err; + } this.started = true; } @@ -108,12 +129,18 @@ export class PostgresEventBus implements IEventBus { } async close(): Promise { - try { - await this.client.query(`UNLISTEN ${this.channel}`); - } catch { - // Connection may already be closed. 
- } - await this.client.end(); - await this.notifyPool.end(); + // allSettled so a failure tearing down the listener client does not + // leak the notify pool's connections (or vice versa). + await Promise.allSettled([ + (async () => { + try { + await this.client.query(`UNLISTEN ${this.channel}`); + } catch { + // Connection may already be closed. + } + await this.client.end(); + })(), + this.notifyPool.end(), + ]); } } diff --git a/packages/platform/src/adapters/jobs/postgres-job-queue.ts b/packages/platform/src/adapters/jobs/postgres-job-queue.ts index 7366f6c..2c1342e 100644 --- a/packages/platform/src/adapters/jobs/postgres-job-queue.ts +++ b/packages/platform/src/adapters/jobs/postgres-job-queue.ts @@ -56,10 +56,12 @@ export class PostgresJobQueue implements IJobQueue { // edit) shouldn't poison the whole claim — drop the bad row and // log so the operator can investigate. const parsed: AgentJob[] = []; + const corrupt: string[] = []; for (const r of rows) { try { parsed.push(JSON.parse(r.payload) as AgentJob); } catch (err) { + corrupt.push(r.run_id); console.warn( `PostgresJobQueue: dropping unparseable payload for run_id=${r.run_id}: ${ err instanceof Error ? err.message : String(err) @@ -67,6 +69,15 @@ export class PostgresJobQueue implements IJobQueue { ); } } + if (corrupt.length > 0) { + // Delete the poisoned row(s) so reclaimStale doesn't keep recycling + // them back into the queue head — without this, a single bad row + // would block the FIFO until manually purged. 
+ await this.pool.query( + "DELETE FROM agent_jobs WHERE run_id = ANY($1::text[])", + [corrupt], + ); + } return parsed; } diff --git a/packages/platform/src/adapters/leader/postgres-leader-elector.ts b/packages/platform/src/adapters/leader/postgres-leader-elector.ts index 281f610..12db023 100644 --- a/packages/platform/src/adapters/leader/postgres-leader-elector.ts +++ b/packages/platform/src/adapters/leader/postgres-leader-elector.ts @@ -44,13 +44,19 @@ export class PostgresLeaderElector implements ILeaderElector { ); const acquired = rows[0]?.pg_try_advisory_lock === true; if (!acquired) { + // Lock held elsewhere — we never opened a session-scoped lock, + // so a plain release is safe. client.release(); return false; } this.held.set(lockName, { client }); return true; } catch (err) { - client.release(); + // Query threw — connection is in an unknown state. Destroying it + // (release(true)) prevents pg from putting a possibly-locked + // session back into circulation, where the next caller would + // inherit either a held lock or a poisoned client. + client.release(true); throw err; } } @@ -59,14 +65,23 @@ export class PostgresLeaderElector implements ILeaderElector { const handle = this.held.get(lockName); if (!handle) return; const lockId = lockNameToBigInt(lockName); + // Always drop the in-memory handle so isLeader() flips false, even + // when the unlock RPC fails — at that point the only safe thing is + // to destroy the connection (forcing pg to close the session, which + // auto-releases any advisory locks). + this.held.delete(lockName); try { await handle.client.query( "SELECT pg_advisory_unlock($1) AS pg_advisory_unlock", [lockId], ); - } finally { handle.client.release(); - this.held.delete(lockName); + } catch (err) { + // Unlock failed — the session may still hold the lock. Forcing + // destruction closes the connection, which Postgres treats as a + // session end and releases all advisory locks held by it. 
+ handle.client.release(true); + throw err; } } diff --git a/packages/platform/src/nodes/docker-availability.ts b/packages/platform/src/nodes/docker-availability.ts index e5c99c3..ad9e19e 100644 --- a/packages/platform/src/nodes/docker-availability.ts +++ b/packages/platform/src/nodes/docker-availability.ts @@ -82,6 +82,14 @@ function defaultProbe(opts: DockerAvailabilityOptions): () => Promise { }) : connect({ path: socketPath, timeout: timeoutMs }); + // Persistent no-op error sink. After cleanup() destroys the + // socket, the OS may emit a *second* "error" (e.g. the TCP + // RST + ETIMEDOUT race CR flagged). The once("error", ...) + // handler below has already consumed the first; without this + // permanent listener the second event would surface as an + // unhandled error and crash the process at startup. + socket.on("error", () => {}); + const cleanup = (ok: boolean) => { socket.destroy(); resolve(ok); diff --git a/packages/platform/tests/adapters/events/postgres-event-bus.test.ts b/packages/platform/tests/adapters/events/postgres-event-bus.test.ts index 94b5a7a..0d30f6b 100644 --- a/packages/platform/tests/adapters/events/postgres-event-bus.test.ts +++ b/packages/platform/tests/adapters/events/postgres-event-bus.test.ts @@ -163,4 +163,42 @@ describe("PostgresEventBus", () => { ).length; expect(listenCallsAfter).toBe(listenCallsBefore); }); + + it("start() recovers from a failed LISTEN by tearing down the connect and allowing retry", async () => { + // Fresh bus that hasn't started yet. + vi.clearAllMocks(); + const fresh = new PostgresEventBus( + "postgresql://localhost/test", + "agentforge", + ) as PostgresEventBus & { start: () => Promise }; + // First start: connect succeeds, LISTEN fails. + mockClientQuery.mockRejectedValueOnce(new Error("LISTEN failed")); + await expect(fresh.start()).rejects.toThrow(/LISTEN failed/); + // Retry: connect must NOT be called twice on the same client (would + // throw "Client is already connected"). 
Implementation should either + // recreate the client or skip connect on retry. + mockClientQuery.mockResolvedValueOnce({ rows: [] }); + mockConnect.mockClear(); + await fresh.start(); + // Second start either re-connects on a fresh client or no-ops connect; + // either way LISTEN must have been issued again. + const listenCalls = mockClientQuery.mock.calls.filter((c) => + (c[0] as string).startsWith("LISTEN"), + ); + expect(listenCalls.length).toBeGreaterThanOrEqual(1); + }); + + it("close() ends notify pool even when client.end() rejects", async () => { + vi.clearAllMocks(); + const fresh = new PostgresEventBus( + "postgresql://localhost/test", + "agentforge", + ); + mockClientQuery.mockResolvedValueOnce({ rows: [] }); // LISTEN + await fresh.start(); + mockClientQuery.mockResolvedValueOnce({ rows: [] }); // UNLISTEN + mockEnd.mockRejectedValueOnce(new Error("client end failed")); + await fresh.close(); + expect(mockPoolEnd).toHaveBeenCalled(); + }); }); diff --git a/packages/platform/tests/adapters/jobs/postgres-job-queue.test.ts b/packages/platform/tests/adapters/jobs/postgres-job-queue.test.ts index 17f4cc6..c572479 100644 --- a/packages/platform/tests/adapters/jobs/postgres-job-queue.test.ts +++ b/packages/platform/tests/adapters/jobs/postgres-job-queue.test.ts @@ -83,10 +83,30 @@ describe("PostgresJobQueue", () => { { run_id: "bad", payload: "{ not valid json" }, ], }); + // 2nd query: the cleanup DELETE for the poisoned row. + mockQuery.mockResolvedValueOnce({ rowCount: 1, rows: [] }); const claimed = await queue.claim("worker-a", { limit: 5 }); expect(claimed.map((j) => j.runId)).toEqual(["ok"]); }); + it("DELETEs corrupted rows so reclaimStale can't recycle them forever", async () => { + // Without delete: the row stays claimed_by=worker, JSON.parse keeps + // throwing, and reclaimStale eventually frees it for another doomed + // claim — head-of-line poison forever. 
+ mockQuery.mockResolvedValueOnce({ + rows: [{ run_id: "bad", payload: "not json" }], + }); + mockQuery.mockResolvedValueOnce({ rowCount: 1, rows: [] }); + await queue.claim("worker-a"); + const deleteCall = mockQuery.mock.calls.find( + (c) => + typeof c[0] === "string" && + (c[0] as string).match(/DELETE FROM agent_jobs/i), + ); + expect(deleteCall).toBeDefined(); + expect(deleteCall?.[1]).toEqual([["bad"]]); + }); + it("filters by node_name and pending claim", async () => { mockQuery.mockResolvedValueOnce({ rows: [] }); await queue.claim("worker-a", { limit: 3 }); diff --git a/packages/platform/tests/adapters/leader/postgres-leader-elector.test.ts b/packages/platform/tests/adapters/leader/postgres-leader-elector.test.ts index 3beb2cf..205609d 100644 --- a/packages/platform/tests/adapters/leader/postgres-leader-elector.test.ts +++ b/packages/platform/tests/adapters/leader/postgres-leader-elector.test.ts @@ -73,6 +73,27 @@ describe("PostgresLeaderElector", () => { expect(lastSql).toMatch(/pg_advisory_unlock/i); }); + it("destroys the connection (release(true)) when acquire query throws", async () => { + mockClientQuery.mockRejectedValueOnce(new Error("connection refused")); + await expect(elector.acquire("name-a")).rejects.toThrow(); + // Connection in unknown state must be evicted from the pool — passing + // no arg risks returning a session-scoped lock holder back into circulation. + expect(mockRelease).toHaveBeenCalledWith(true); + }); + + it("destroys the connection (release(true)) when pg_advisory_unlock throws", async () => { + mockClientQuery.mockResolvedValueOnce({ + rows: [{ pg_try_advisory_lock: true }], + }); + await elector.acquire("name-a"); + mockClientQuery.mockRejectedValueOnce(new Error("server crashed")); + // Even if the unlock RPC failed, we must not return the still-locked + // session back to the pool — split-brain risk. 
+ await expect(elector.release("name-a")).rejects.toThrow(); + expect(mockRelease).toHaveBeenCalledWith(true); + expect(elector.isLeader("name-a")).toBe(false); + }); + it("hashes lock names to a stable bigint to avoid collisions", async () => { // Two distinct names produce two distinct lock ids; reusing the same // name produces the same id (release + re-acquire round-trip). diff --git a/packages/platform/tests/adapters/scheduling/db-active-run-counter.test.ts b/packages/platform/tests/adapters/scheduling/db-active-run-counter.test.ts index 15702e5..9a6be57 100644 --- a/packages/platform/tests/adapters/scheduling/db-active-run-counter.test.ts +++ b/packages/platform/tests/adapters/scheduling/db-active-run-counter.test.ts @@ -46,8 +46,10 @@ describe("DbActiveRunCounter", () => { ); }); - it("count() returns 0 when no rows match", async () => { - mockQuery.mockResolvedValueOnce({ rows: [{ count: "0" }] }); + it("count() returns 0 when no rows match (exercises the rows[0] fallback)", async () => { + // Empty result set is the actual no-row case; the previous + // [{count: "0"}] mock skipped the fallback path entirely. + mockQuery.mockResolvedValueOnce({ rows: [] }); expect(await counter.count("worker-a")).toBe(0); });