diff --git a/packages/core/src/adapters/jobs/in-memory-job-queue.ts b/packages/core/src/adapters/jobs/in-memory-job-queue.ts new file mode 100644 index 0000000..0b5daa8 --- /dev/null +++ b/packages/core/src/adapters/jobs/in-memory-job-queue.ts @@ -0,0 +1,105 @@ +/** + * InMemoryJobQueue — single-process IJobQueue implementation (P45-T4). + * + * Mirrors the contract the PostgresJobQueue must honour so the same wiring + * works with a swap of adapter via `AGENTFORGE_JOB_QUEUE=postgres|memory`. + * Not safe across processes — for multi-replica control planes use the + * Postgres adapter. + */ + +import type { AgentJob } from "../../domain/ports/agent-executor.port.js"; +import type { + ClaimOptions, + IJobQueue, +} from "../../domain/ports/job-queue.port.js"; + +interface QueueEntry { + job: AgentJob; + nodeName: string; + claimedBy?: string; + claimedAt?: number; + ttlMs?: number; +} + +export interface InMemoryJobQueueOptions { + now?: () => number; +} + +export class InMemoryJobQueue implements IJobQueue { + private readonly entries = new Map(); + private now: () => number; + + constructor(opts: InMemoryJobQueueOptions = {}) { + this.now = opts.now ?? (() => Date.now()); + } + + /** Test-only — override the clock without leaking the field publicly. */ + _setNow(t: number): void { + this.now = () => t; + } + + enqueue(job: AgentJob, nodeName: string): Promise { + // Match Postgres' `ON CONFLICT DO NOTHING`: a duplicate enqueue keeps + // the original entry (and any in-flight claim metadata) instead of + // silently overwriting it. + if (!this.entries.has(job.runId)) { + this.entries.set(job.runId, { job, nodeName }); + } + return Promise.resolve(); + } + + claim(nodeName: string, opts: ClaimOptions = {}): Promise { + const limit = opts.limit ?? 1; + const ttlMs = opts.ttlMs ?? 5 * 60 * 1000; + const now = this.now(); + const claimed: AgentJob[] = []; + // Iterate insertion order — Map preserves it. Pick up to `limit` + // pending entries for this node and atomically mark them claimed. + for (const entry of this.entries.values()) { + if (claimed.length >= limit) break; + if (entry.nodeName !== nodeName) continue; + if (entry.claimedBy !== undefined) continue; + entry.claimedBy = nodeName; + entry.claimedAt = now; + entry.ttlMs = ttlMs; + claimed.push(entry.job); + } + return Promise.resolve(claimed); + } + + complete(runId: string): Promise { + this.entries.delete(runId); + return Promise.resolve(); + } + + reclaimStale(maxAgeMs: number): Promise { + const now = this.now(); + let count = 0; + for (const entry of this.entries.values()) { + if (entry.claimedBy === undefined || entry.claimedAt === undefined) { + continue; + } + // Per-job TTL wins when set — claim() records the worker's + // declared trust horizon, and that's the right age for *this* + // job. The maxAgeMs argument is the global fallback for jobs + // claimed before per-job TTLs were tracked. + const age = now - entry.claimedAt; + const threshold = entry.ttlMs ?? maxAgeMs; + if (age >= threshold) { + entry.claimedBy = undefined; + entry.claimedAt = undefined; + entry.ttlMs = undefined; + count++; + } + } + return Promise.resolve(count); + } + + depth(nodeName: string): Promise { + let count = 0; + for (const entry of this.entries.values()) { + if (entry.nodeName === nodeName) count++; + } + return Promise.resolve(count); + } +} diff --git a/packages/core/src/adapters/leader/local-leader-elector.ts b/packages/core/src/adapters/leader/local-leader-elector.ts new file mode 100644 index 0000000..7d7744d --- /dev/null +++ b/packages/core/src/adapters/leader/local-leader-elector.ts @@ -0,0 +1,30 @@ +/** + * LocalLeaderElector — single-process leader (P45-T5). + * Acquire always succeeds; release flips state. Use for single-replica + * deployments or when no shared store is available. + */ + +import type { ILeaderElector } from "../../domain/ports/leader-elector.port.js"; + +export class LocalLeaderElector implements ILeaderElector { + private readonly held = new Set(); + + acquire(lockName: string): Promise { + // Mutual exclusion: only the *first* caller for a given name wins. + // Without this, two runWhenLeader instances in the same process (e.g. + // reconciler + scheduler that share a lock by mistake) would both + // believe they are leader, defeating the contract. + if (this.held.has(lockName)) return Promise.resolve(false); + this.held.add(lockName); + return Promise.resolve(true); + } + + release(lockName: string): Promise { + this.held.delete(lockName); + return Promise.resolve(); + } + + isLeader(lockName: string): boolean { + return this.held.has(lockName); + } +} diff --git a/packages/core/src/control-plane/leader-gated-loop.ts b/packages/core/src/control-plane/leader-gated-loop.ts new file mode 100644 index 0000000..adfe054 --- /dev/null +++ b/packages/core/src/control-plane/leader-gated-loop.ts @@ -0,0 +1,70 @@ +/** + * runWhenLeader — wraps a singleton interval loop in leader election (P45-T5). + * + * Each tick: try to acquire the lock if we don't already hold it; if we + * become (or remain) leader, run `body`. Otherwise skip and let another + * replica do the work. The released-lock case is automatic on process + * exit (Postgres advisory locks live with the session) — failover is + * picked up at the next tick. + */ + +import type { ILeaderElector } from "../domain/ports/leader-elector.port.js"; + +export function runWhenLeader( + elector: ILeaderElector, + lockName: string, + body: () => Promise, + intervalMs: number, +): () => void { + let stopped = false; + // Reentrancy guard: setInterval fires the callback regardless of whether + // the previous async invocation has resolved. Without this flag, a body + // that runs longer than `intervalMs` (e.g. a slow reconciler scan) would + // have N ticks all in flight at once, each holding leader rights. + let inFlight = false; + + const tick = async (): Promise => { + if (stopped || inFlight) return; + let leader = elector.isLeader(lockName); + if (!leader) { + leader = await elector.acquire(lockName); + } + if (!leader) return; + inFlight = true; + try { + await body(); + } catch (err) { + // Surface the failure but keep the interval running — a transient + // error must not permanently disable the singleton loop. + console.error( + `runWhenLeader: body for lock "${lockName}" threw: ${ + err instanceof Error ? err.message : String(err) + }`, + ); + } finally { + inFlight = false; + } + }; + + const handle = setInterval(() => { + // `tick` is async; attach .catch to absorb any rejection from the + // outer pre-body code (acquire/isLeader). Body errors are already + // caught inside tick — this is just a belt-and-braces guard against + // unhandled rejections at the timer boundary. + tick().catch((err) => { + console.error( + `runWhenLeader: pre-body error for lock "${lockName}": ${ + err instanceof Error ? err.message : String(err) + }`, + ); + }); + }, intervalMs); + + return () => { + stopped = true; + clearInterval(handle); + // Best-effort release; if the elector has not held the lock this + // is a no-op. + void elector.release(lockName); + }; +} diff --git a/packages/core/src/control-plane/pipeline-controller.ts b/packages/core/src/control-plane/pipeline-controller.ts index 320d73d..5d88052 100644 --- a/packages/core/src/control-plane/pipeline-controller.ts +++ b/packages/core/src/control-plane/pipeline-controller.ts @@ -393,7 +393,7 @@ export class PipelineController { let selectedNode: (typeof nodePool)[0] | null = null; if (agentDef && nodePool.length > 0) { try { - selectedNode = this.scheduler.schedule(agentDef, nodePool); + selectedNode = await this.scheduler.schedule(agentDef, nodePool); } catch { // No node satisfies requirements — fall back to local } diff --git a/packages/core/src/control-plane/scheduler.ts b/packages/core/src/control-plane/scheduler.ts index 0a9b8f6..06688b8 100644 --- a/packages/core/src/control-plane/scheduler.ts +++ b/packages/core/src/control-plane/scheduler.ts @@ -1,13 +1,18 @@ /** - * LocalAgentScheduler — schedules agents to local nodes. - * Respects nodeAffinity requirements and maxConcurrentRuns limits. - * Future phases will add remote node scheduling. + * LocalAgentScheduler — schedules agents to nodes (P45-T6). + * + * Counts are read via an injected IActiveRunCounter so the scheduler is + * stateless across control-plane replicas. The default in-memory counter + * preserves single-process semantics; the platform supplies a DB-backed + * counter that queries `agent_runs` so two replicas never disagree about + * a node's load. */ import type { AgentDefinitionYaml, NodeDefinitionYaml, } from "../definitions/parser.js"; +import type { IActiveRunCounter } from "../domain/ports/active-run-counter.port.js"; export interface INodeRegistry { get(name: string): { status: string } | undefined; @@ -19,33 +24,60 @@ export interface IAgentScheduler { schedule( agent: AgentDefinitionYaml, nodePool: NodeDefinitionYaml[], - ): NodeDefinitionYaml; + ): Promise; recordRunStarted(nodeName: string): void; recordRunCompleted(nodeName: string): void; - getActiveRunCount(nodeName: string): number; + getActiveRunCount(nodeName: string): Promise; +} + +export interface LocalSchedulerOptions { + /** + * Counter for active runs. If omitted, an in-process counter backs the + * historical in-memory Map. Multi-replica deployments + * inject a DB-backed counter so all replicas share one truth. + */ + counter?: IActiveRunCounter; } export class LocalAgentScheduler implements IAgentScheduler { - private readonly activeRuns = new Map(); private readonly registry?: INodeRegistry; + private readonly counter: IActiveRunCounter; - constructor(registry?: INodeRegistry) { + constructor(registry?: INodeRegistry, opts: LocalSchedulerOptions = {}) { this.registry = registry; + this.counter = opts.counter ?? new InMemoryActiveRunCounter(); } - schedule( + async schedule( agent: AgentDefinitionYaml, nodePool: NodeDefinitionYaml[], - ): NodeDefinitionYaml { + ): Promise { const required = agent.spec.nodeAffinity?.required?.map((r) => r.capability) ?? []; const preferred = agent.spec.nodeAffinity?.preferred?.map((p) => p.capability) ?? []; - const candidates = nodePool.filter((node) => { + // Read every candidate's load from the counter — fresh on each call + // so a peer replica's recent dispatch is reflected immediately. + // allSettled (not all) so a single hung/failed counter call only + // disqualifies that node instead of blocking every dispatch. + const settled = await Promise.allSettled( + nodePool.map((n) => this.counter.count(n.metadata.name)), + ); + const counts = settled.map((s, i) => { + if (s.status === "fulfilled") return s.value; + console.warn( + `Scheduler: counter.count("${nodePool[i].metadata.name}") failed — treating as full. ${ + s.reason instanceof Error ? s.reason.message : String(s.reason) + }`, + ); + return Number.POSITIVE_INFINITY; + }); + + const candidates = nodePool.filter((node, i) => { const caps = node.spec.capabilities; const maxRuns = node.spec.resources?.maxConcurrentRuns ?? Infinity; - const active = this.getActiveRunCount(node.metadata.name); + const active = counts[i]; const registration = this.registry?.get(node.metadata.name); const isOffline = registration?.status === "offline"; return ( @@ -69,23 +101,59 @@ export class LocalAgentScheduler implements IAgentScheduler { })); scored.sort((a, b) => b.score - a.score); - const selected = scored[0].node; - - return selected; + return scored[0].node; } recordRunStarted(nodeName: string): void { - this.activeRuns.set(nodeName, (this.activeRuns.get(nodeName) ?? 0) + 1); + // Fire-and-forget: counter.recordStarted is a no-op for the + // stateless DB adapter, but a future adapter may have side effects. + // Surface failures so they don't disappear. + this.counter.recordStarted(nodeName).catch((err) => { + console.warn( + `Scheduler: counter.recordStarted("${nodeName}") failed: ${ + err instanceof Error ? err.message : String(err) + }`, + ); + }); this.registry?.recordRunStarted(nodeName); } recordRunCompleted(nodeName: string): void { - const current = this.activeRuns.get(nodeName) ?? 0; - this.activeRuns.set(nodeName, Math.max(0, current - 1)); + this.counter.recordCompleted(nodeName).catch((err) => { + console.warn( + `Scheduler: counter.recordCompleted("${nodeName}") failed: ${ + err instanceof Error ? err.message : String(err) + }`, + ); + }); this.registry?.recordRunCompleted(nodeName); } - getActiveRunCount(nodeName: string): number { - return this.activeRuns.get(nodeName) ?? 0; + getActiveRunCount(nodeName: string): Promise { + return this.counter.count(nodeName); + } +} + +/** + * In-process counter — preserves the historical Map + * behaviour for callers that don't pass a counter explicitly. Not safe + * across processes. + */ +export class InMemoryActiveRunCounter implements IActiveRunCounter { + private readonly counts = new Map(); + + count(nodeName: string): Promise { + return Promise.resolve(this.counts.get(nodeName) ?? 0); + } + + recordStarted(nodeName: string): Promise { + this.counts.set(nodeName, (this.counts.get(nodeName) ?? 0) + 1); + return Promise.resolve(); + } + + recordCompleted(nodeName: string): Promise { + const current = this.counts.get(nodeName) ?? 0; + this.counts.set(nodeName, Math.max(0, current - 1)); + return Promise.resolve(); } } diff --git a/packages/core/src/domain/ports/active-run-counter.port.ts b/packages/core/src/domain/ports/active-run-counter.port.ts new file mode 100644 index 0000000..1f1ba92 --- /dev/null +++ b/packages/core/src/domain/ports/active-run-counter.port.ts @@ -0,0 +1,21 @@ +/** + * IActiveRunCounter — pluggable per-node active-run counter (P45-T6). + * + * The scheduler reads counts via this port before every dispatch decision. + * + * - MemoryActiveRunCounter: single-process default, backs the in-memory map. + * - DbActiveRunCounter: queries `agent_runs WHERE status IN ('running','scheduled') + * GROUP BY node_name` so multiple control-plane replicas see the same + * truth and can never over-schedule a node beyond its maxConcurrentRuns. + */ + +export interface IActiveRunCounter { + /** Active-run count for `nodeName`. Always async to allow DB-backed impls. */ + count(nodeName: string): Promise; + + /** Record that a run has started — may be a no-op for stateless impls. */ + recordStarted(nodeName: string): Promise; + + /** Record that a run has completed — may be a no-op for stateless impls. */ + recordCompleted(nodeName: string): Promise; +} diff --git a/packages/core/src/domain/ports/job-queue.port.ts b/packages/core/src/domain/ports/job-queue.port.ts new file mode 100644 index 0000000..163a34b --- /dev/null +++ b/packages/core/src/domain/ports/job-queue.port.ts @@ -0,0 +1,52 @@ +/** + * IJobQueue — pluggable dispatch queue for agent jobs across control-plane + * replicas (P45-T4). + * + * The single-process default is the in-memory adapter (zero new infra). + * The Postgres adapter (P45-T4) uses `SELECT ... FOR UPDATE SKIP LOCKED` + * so multiple control-plane replicas can claim jobs without lost dispatches + * or duplicate work. + * + * Lifecycle: + * 1. Control plane calls enqueue(job, nodeName) when scheduler picks a node. + * 2. Worker polls claim(nodeName, { limit, ttlMs }); receives 0..N jobs. + * 3. On result POST control plane calls complete(runId) (or fail). + * 4. A periodic reclaimStale(maxAgeMs) sweeps abandoned claims back to + * pending so a different worker can pick them up. + */ + +import type { AgentJob } from "./agent-executor.port.js"; + +export interface ClaimOptions { + /** Maximum number of jobs to claim in this call. Default 1. */ + limit?: number; + /** + * Claim TTL — how long the worker is trusted to hold the job before a + * sweep can reclaim it. Default 5 minutes. + */ + ttlMs?: number; +} + +export interface IJobQueue { + /** Push a job onto the queue for `nodeName` to pick up. */ + enqueue(job: AgentJob, nodeName: string): Promise; + + /** + * Atomically claim up to `limit` jobs for the named worker. Idempotent + * for repeat callers — already-claimed jobs are skipped, never returned + * twice. + */ + claim(nodeName: string, opts?: ClaimOptions): Promise; + + /** Mark a claimed job complete and remove it from the queue. */ + complete(runId: string): Promise; + + /** + * Release claims older than `maxAgeMs` so a different worker can retry. + * Returns the count of reclaimed jobs. + */ + reclaimStale(maxAgeMs: number): Promise; + + /** Best-effort introspection — number of pending+claimed jobs for `nodeName`. */ + depth(nodeName: string): Promise; +} diff --git a/packages/core/src/domain/ports/leader-elector.port.ts b/packages/core/src/domain/ports/leader-elector.port.ts new file mode 100644 index 0000000..4ef4824 --- /dev/null +++ b/packages/core/src/domain/ports/leader-elector.port.ts @@ -0,0 +1,28 @@ +/** + * ILeaderElector — pluggable leader election for control-plane singleton + * loops (P45-T5). + * + * Today PipelineRecoveryService and AgentScheduler each run their interval + * loops on every replica → duplicate reconciler races, duplicate + * scheduling decisions. The Postgres adapter uses session-scoped + * `pg_advisory_lock()` so only one replica becomes leader; the lock + * is released automatically when the holder process exits, so failover is + * instant. + * + * The `LocalLeaderElector` is the single-process default — always leader. + */ + +export interface ILeaderElector { + /** + * Attempt to acquire the lock for `lockName`. Returns true if this + * caller is now the leader. Calling `acquire` while already leader is + * a no-op that returns true. + */ + acquire(lockName: string): Promise; + + /** Release the lock. Idempotent. */ + release(lockName: string): Promise; + + /** Is this elector currently the leader for `lockName`? */ + isLeader(lockName: string): boolean; +} diff --git a/packages/core/tests/adapters/jobs/in-memory-job-queue.test.ts b/packages/core/tests/adapters/jobs/in-memory-job-queue.test.ts new file mode 100644 index 0000000..b24dc3c --- /dev/null +++ b/packages/core/tests/adapters/jobs/in-memory-job-queue.test.ts @@ -0,0 +1,125 @@ +/** + * Tests for InMemoryJobQueue — single-process default for IJobQueue (P45-T4). + * Mirrors the contract that PostgresJobQueue must also satisfy. + */ +import { describe, expect, it } from "vitest"; +import { InMemoryJobQueue } from "../../../src/adapters/jobs/in-memory-job-queue.js"; +import type { AgentJob } from "../../../src/domain/ports/agent-executor.port.js"; + +function job(runId: string): AgentJob { + return { + runId, + agentId: "analyst", + agentDefinition: { metadata: { name: "analyst" }, spec: {} }, + model: { + provider: "anthropic", + name: "claude-sonnet", + maxTokens: 4096, + }, + workdir: "/tmp/work", + outputDir: "/tmp/out", + } as unknown as AgentJob; +} + +describe("InMemoryJobQueue", () => { + it("returns empty when nothing is enqueued", async () => { + const q = new InMemoryJobQueue(); + const claimed = await q.claim("worker-a"); + expect(claimed).toEqual([]); + }); + + it("returns enqueued job to a single claimer", async () => { + const q = new InMemoryJobQueue(); + await q.enqueue(job("r1"), "worker-a"); + const claimed = await q.claim("worker-a"); + expect(claimed.map((j) => j.runId)).toEqual(["r1"]); + }); + + it("enqueue with a duplicate runId is a no-op (matches Postgres ON CONFLICT DO NOTHING)", async () => { + const q = new InMemoryJobQueue(); + await q.enqueue(job("r1"), "worker-a"); + await q.claim("worker-a"); // r1 is now claimed + await q.enqueue(job("r1"), "worker-b"); // duplicate runId — must not reset + const second = await q.claim("worker-a"); + expect(second).toEqual([]); // still claimed, not overwritten + const onB = await q.claim("worker-b"); + expect(onB).toEqual([]); // never moved to worker-b + }); + + it("does not return the same job twice (claim is exclusive)", async () => { + const q = new InMemoryJobQueue(); + await q.enqueue(job("r1"), "worker-a"); + const first = await q.claim("worker-a"); + const second = await q.claim("worker-a"); + expect(first.map((j) => j.runId)).toEqual(["r1"]); + expect(second).toEqual([]); + }); + + it("respects limit", async () => { + const q = new InMemoryJobQueue(); + await q.enqueue(job("r1"), "worker-a"); + await q.enqueue(job("r2"), "worker-a"); + await q.enqueue(job("r3"), "worker-a"); + const claimed = await q.claim("worker-a", { limit: 2 }); + expect(claimed.map((j) => j.runId)).toEqual(["r1", "r2"]); + }); + + it("isolates queues per nodeName", async () => { + const q = new InMemoryJobQueue(); + await q.enqueue(job("r1"), "worker-a"); + await q.enqueue(job("r2"), "worker-b"); + const aJobs = await q.claim("worker-a"); + expect(aJobs.map((j) => j.runId)).toEqual(["r1"]); + const bJobs = await q.claim("worker-b"); + expect(bJobs.map((j) => j.runId)).toEqual(["r2"]); + }); + + it("complete removes the job entirely", async () => { + const q = new InMemoryJobQueue(); + await q.enqueue(job("r1"), "worker-a"); + await q.claim("worker-a"); + await q.complete("r1"); + expect(await q.depth("worker-a")).toBe(0); + // reclaimStale should not return the completed job + const reclaimed = await q.reclaimStale(0); + expect(reclaimed).toBe(0); + }); + + it("reclaimStale returns claimed-but-stale jobs to the pool", async () => { + const q = new InMemoryJobQueue({ now: () => 1_000 }); + await q.enqueue(job("r1"), "worker-a"); + await q.claim("worker-a", { ttlMs: 100 }); + // advance virtual clock past ttl + (q as unknown as { _setNow: (n: number) => void })._setNow(2_000); + const reclaimed = await q.reclaimStale(100); + expect(reclaimed).toBe(1); + const reclaimedJobs = await q.claim("worker-a"); + expect(reclaimedJobs.map((j) => j.runId)).toEqual(["r1"]); + }); + + it("reclaimStale honours per-job ttlMs over the maxAgeMs argument", async () => { + // Job A claimed with a tight 100ms TTL; Job B with the default 300_000ms. + // At t+200ms (with maxAgeMs=50ms) only A's per-job TTL has elapsed. + const q = new InMemoryJobQueue({ now: () => 0 }); + await q.enqueue(job("ra"), "worker-a"); + await q.enqueue(job("rb"), "worker-a"); + await q.claim("worker-a", { ttlMs: 100, limit: 1 }); + await q.claim("worker-a", { ttlMs: 300_000, limit: 1 }); + (q as unknown as { _setNow: (n: number) => void })._setNow(200); + const reclaimed = await q.reclaimStale(50); + expect(reclaimed).toBe(1); + const next = await q.claim("worker-a"); + expect(next.map((j) => j.runId)).toEqual(["ra"]); + }); + + it("depth reports both pending and claimed jobs", async () => { + const q = new InMemoryJobQueue(); + await q.enqueue(job("r1"), "worker-a"); + await q.enqueue(job("r2"), "worker-a"); + expect(await q.depth("worker-a")).toBe(2); + await q.claim("worker-a", { limit: 1 }); + expect(await q.depth("worker-a")).toBe(2); + await q.complete("r1"); + expect(await q.depth("worker-a")).toBe(1); + }); +}); diff --git a/packages/core/tests/adapters/leader/local-leader-elector.test.ts b/packages/core/tests/adapters/leader/local-leader-elector.test.ts new file mode 100644 index 0000000..9347073 --- /dev/null +++ b/packages/core/tests/adapters/leader/local-leader-elector.test.ts @@ -0,0 +1,54 @@ +/** + * Tests for LocalLeaderElector — single-process default (P45-T5). + * Always leader. Used in single-replica or non-Postgres deployments. + */ +import { describe, expect, it } from "vitest"; +import { LocalLeaderElector } from "../../../src/adapters/leader/local-leader-elector.js"; + +describe("LocalLeaderElector", () => { + it("acquire always returns true", async () => { + const e = new LocalLeaderElector(); + expect(await e.acquire("foo")).toBe(true); + }); + + it("isLeader is true after acquire", async () => { + const e = new LocalLeaderElector(); + await e.acquire("foo"); + expect(e.isLeader("foo")).toBe(true); + }); + + it("isLeader is false before any acquire call", () => { + const e = new LocalLeaderElector(); + expect(e.isLeader("foo")).toBe(false); + }); + + it("release flips isLeader back to false", async () => { + const e = new LocalLeaderElector(); + await e.acquire("foo"); + await e.release("foo"); + expect(e.isLeader("foo")).toBe(false); + }); + + it("isolates locks by name", async () => { + const e = new LocalLeaderElector(); + await e.acquire("a"); + expect(e.isLeader("a")).toBe(true); + expect(e.isLeader("b")).toBe(false); + }); + + it("acquire returns false on a second call to the same name (mutual exclusion)", async () => { + // Two parallel call sites (e.g. reconciler + scheduler that mistakenly + // share a lock name, or two runWhenLeader instances on the same process) + // must not both think they're leader. + const e = new LocalLeaderElector(); + expect(await e.acquire("foo")).toBe(true); + expect(await e.acquire("foo")).toBe(false); + }); + + it("acquire succeeds again after release", async () => { + const e = new LocalLeaderElector(); + await e.acquire("foo"); + await e.release("foo"); + expect(await e.acquire("foo")).toBe(true); + }); +}); diff --git a/packages/core/tests/control-plane/leader-gated-loop.test.ts b/packages/core/tests/control-plane/leader-gated-loop.test.ts new file mode 100644 index 0000000..44f4e7b --- /dev/null +++ b/packages/core/tests/control-plane/leader-gated-loop.test.ts @@ -0,0 +1,118 @@ +/** + * Tests for runWhenLeader — the wrapper that wires ILeaderElector into the + * existing setInterval loops (PipelineRecovery, AgentScheduler) so only one + * replica executes the body per tick (P45-T5). + */ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { LocalLeaderElector } from "../../src/adapters/leader/local-leader-elector.js"; +import { runWhenLeader } from "../../src/control-plane/leader-gated-loop.js"; +import type { ILeaderElector } from "../../src/domain/ports/leader-elector.port.js"; + +describe("runWhenLeader", () => { + beforeEach(() => { + vi.useFakeTimers(); + }); + afterEach(() => { + vi.useRealTimers(); + }); + + it("runs the body on each interval when leader", async () => { + const elector = new LocalLeaderElector(); + const body = vi.fn().mockResolvedValue(undefined); + const stop = runWhenLeader(elector, "lock-a", body, 1000); + + await vi.advanceTimersByTimeAsync(1000); + await vi.advanceTimersByTimeAsync(1000); + await vi.advanceTimersByTimeAsync(1000); + expect(body).toHaveBeenCalledTimes(3); + stop(); + }); + + it("skips the body when not leader", async () => { + const elector: ILeaderElector = { + acquire: vi.fn().mockResolvedValue(false), + release: vi.fn().mockResolvedValue(undefined), + isLeader: vi.fn().mockReturnValue(false), + }; + const body = vi.fn().mockResolvedValue(undefined); + const stop = runWhenLeader(elector, "lock-a", body, 1000); + + await vi.advanceTimersByTimeAsync(3000); + expect(body).not.toHaveBeenCalled(); + stop(); + }); + + it("attempts to re-acquire each tick (so a downed leader is replaced)", async () => { + const acquire = vi.fn().mockResolvedValue(false); + const elector: ILeaderElector = { + acquire, + release: vi.fn().mockResolvedValue(undefined), + isLeader: vi.fn().mockReturnValue(false), + }; + const stop = runWhenLeader(elector, "lock-a", async () => {}, 1000); + await vi.advanceTimersByTimeAsync(3000); + expect(acquire).toHaveBeenCalledTimes(3); + stop(); + }); + + it("stops triggering body after stop() is called", async () => { + const elector = new LocalLeaderElector(); + const body = vi.fn().mockResolvedValue(undefined); + const stop = runWhenLeader(elector, "lock-a", body, 1000); + await vi.advanceTimersByTimeAsync(1000); + stop(); + await vi.advanceTimersByTimeAsync(5000); + expect(body).toHaveBeenCalledTimes(1); + }); + + it("releases the lock when stop() is called", async () => { + const release = vi.fn().mockResolvedValue(undefined); + const elector: ILeaderElector = { + acquire: vi.fn().mockResolvedValue(true), + release, + isLeader: vi.fn().mockReturnValue(true), + }; + const stop = runWhenLeader(elector, "lock-a", async () => {}, 1000); + await vi.advanceTimersByTimeAsync(1000); + stop(); + // release runs in background after stop(); drain microtasks + await vi.runAllTimersAsync(); + expect(release).toHaveBeenCalledWith("lock-a"); + }); + + it("swallows body errors so the loop keeps running, logging each one", async () => { + const elector = new LocalLeaderElector(); + const errSpy = vi.spyOn(console, "error").mockImplementation(() => {}); + const body = vi + .fn() + .mockRejectedValueOnce(new Error("boom")) + .mockResolvedValue(undefined); + const stop = runWhenLeader(elector, "lock-a", body, 1000); + await vi.advanceTimersByTimeAsync(1000); + await vi.advanceTimersByTimeAsync(1000); + expect(body).toHaveBeenCalledTimes(2); + expect(errSpy).toHaveBeenCalledWith(expect.stringMatching(/lock-a.*boom/)); + stop(); + errSpy.mockRestore(); + }); + + it("does not start a second tick while the previous one is still running", async () => { + // Body takes 5 intervals to resolve. Ticks must not overlap, so over + // 4 intervals we should only ever see 1 invocation in flight. + const elector = new LocalLeaderElector(); + let running = 0; + let maxConcurrent = 0; + const body = vi.fn(async () => { + running++; + maxConcurrent = Math.max(maxConcurrent, running); + await new Promise((r) => setTimeout(r, 5_000)); + running--; + }); + const stop = runWhenLeader(elector, "lock-a", body, 1_000); + await vi.advanceTimersByTimeAsync(4_000); + expect(maxConcurrent).toBe(1); + stop(); + // drain in-flight body + await vi.runAllTimersAsync(); + }); +}); diff --git a/packages/core/tests/control-plane/scheduler.test.ts b/packages/core/tests/control-plane/scheduler.test.ts index e92e136..2ed0da3 100644 --- a/packages/core/tests/control-plane/scheduler.test.ts +++ b/packages/core/tests/control-plane/scheduler.test.ts @@ -47,74 +47,74 @@ const LOCAL_NODE = makeNode("local", ["llm-access", "git"]); const DOCKER_NODE = makeNode("local-docker", ["llm-access", "git", "docker"]); describe("LocalAgentScheduler", () => { - it("schedules an agent with no required capabilities to any node", () => { + it("schedules an agent with no required capabilities to any node", async () => { const scheduler = new LocalAgentScheduler(); const agent = makeAgent("analyst"); - const node = scheduler.schedule(agent, [LOCAL_NODE]); + const node = await scheduler.schedule(agent, [LOCAL_NODE]); expect(node.metadata.name).toBe("local"); }); - it("schedules agent requiring docker to node with docker", () => { + it("schedules agent requiring docker to node with docker", async () => { const scheduler = new LocalAgentScheduler(); const agent = makeAgent("developer", ["docker"]); - const node = scheduler.schedule(agent, [LOCAL_NODE, DOCKER_NODE]); + const node = await scheduler.schedule(agent, [LOCAL_NODE, DOCKER_NODE]); expect(node.metadata.name).toBe("local-docker"); }); - it("throws when no node satisfies required capabilities", () => { + it("throws when no node satisfies required capabilities", async () => { const scheduler = new LocalAgentScheduler(); const agent = makeAgent("developer", ["docker"]); - expect(() => scheduler.schedule(agent, [LOCAL_NODE])).toThrow( + await expect(scheduler.schedule(agent, [LOCAL_NODE])).rejects.toThrow( /no available node/i, ); }); - it("throws when all nodes are at capacity", () => { + it("throws when all nodes are at capacity", async () => { const scheduler = new LocalAgentScheduler(); const agent = makeAgent("analyst"); const fullNode = makeNode("local", ["llm-access"], 2); scheduler.recordRunStarted("local"); scheduler.recordRunStarted("local"); - expect(() => scheduler.schedule(agent, [fullNode])).toThrow( + await expect(scheduler.schedule(agent, [fullNode])).rejects.toThrow( /no available node/i, ); }); - it("respects maxConcurrentRuns and picks node with capacity", () => { + it("respects maxConcurrentRuns and picks node with capacity", async () => { const scheduler = new LocalAgentScheduler(); const agent = makeAgent("analyst"); const full = makeNode("busy", ["llm-access"], 1); const free = makeNode("free", ["llm-access"], 2); scheduler.recordRunStarted("busy"); - const node = scheduler.schedule(agent, [full, free]); + const node = await scheduler.schedule(agent, [full, free]); expect(node.metadata.name).toBe("free"); }); - it("tracks active runs and decrements on completion", () => { + it("tracks active runs and decrements on completion", async () => { const scheduler = new LocalAgentScheduler(); scheduler.recordRunStarted("local"); scheduler.recordRunStarted("local"); - expect(scheduler.getActiveRunCount("local")).toBe(2); + expect(await scheduler.getActiveRunCount("local")).toBe(2); scheduler.recordRunCompleted("local"); - expect(scheduler.getActiveRunCount("local")).toBe(1); + expect(await scheduler.getActiveRunCount("local")).toBe(1); }); - it("prefers node matching preferred capabilities", () => { + it("prefers node matching preferred capabilities", async () => { const scheduler = new LocalAgentScheduler(); const agent = makeAgent("analyst", [], ["docker"]); - const node = scheduler.schedule(agent, [LOCAL_NODE, DOCKER_NODE]); + const node = await scheduler.schedule(agent, [LOCAL_NODE, DOCKER_NODE]); expect(node.metadata.name).toBe("local-docker"); }); }); describe("LocalAgentScheduler without registry", () => { - it("schedules normally when registry not provided", () => { + it("schedules normally when registry not provided", async () => { const scheduler = new LocalAgentScheduler(); const agent = makeAgent("analyst"); - const node = scheduler.schedule(agent, [LOCAL_NODE]); + const node = await scheduler.schedule(agent, [LOCAL_NODE]); expect(node.metadata.name).toBe("local"); }); }); diff --git a/packages/core/tests/control-plane/stateless-scheduler.test.ts b/packages/core/tests/control-plane/stateless-scheduler.test.ts new file mode 100644 index 0000000..5822005 --- /dev/null +++ b/packages/core/tests/control-plane/stateless-scheduler.test.ts @@ -0,0 +1,121 @@ +/** + * Tests for the stateless scheduling path (P45-T6). + * + * Scheduler.schedule consults an IActiveRunCounter before each decision so + * two control-plane replicas reading the same DB never disagree about the + * load on a node. + */ +import { describe, expect, it, vi } from "vitest"; +import { LocalAgentScheduler } from "../../src/control-plane/scheduler.js"; +import type { + AgentDefinitionYaml, + NodeDefinitionYaml, +} from "../../src/definitions/parser.js"; +import type { IActiveRunCounter } from "../../src/domain/ports/active-run-counter.port.js"; + +function makeNode( + name: string, + capabilities: string[], + maxConcurrentRuns = 2, +): NodeDefinitionYaml { + return { + apiVersion: "agentforge/v1", + kind: "NodeDefinition", + metadata: { name, type: "local" }, + spec: { + connection: { type: "local" }, + capabilities, + resources: { maxConcurrentRuns }, + }, + }; +} + +function makeAgent( + name: string, + requiredCaps: string[] = [], + preferredCaps: string[] = [], +): AgentDefinitionYaml { + return { + apiVersion: "agentforge/v1", + kind: "AgentDefinition", + metadata: { name, displayName: name, phase: "1" }, + spec: { + executor: "pi-ai", + systemPrompt: { file: `prompts/${name}.md` }, + outputs: [], + nodeAffinity: { + required: requiredCaps.map((c) => ({ capability: c })), + preferred: preferredCaps.map((c) => ({ capability: c })), + }, + }, + }; +} + +function fakeCounter(counts: Record): IActiveRunCounter { + return { + count: vi.fn(async (name: string) => counts[name] ?? 0), + recordStarted: vi.fn().mockResolvedValue(undefined), + recordCompleted: vi.fn().mockResolvedValue(undefined), + }; +} + +describe("LocalAgentScheduler stateless path (P45-T6)", () => { + it("schedule queries the counter (not the internal map) before each decision", async () => { + const counter = fakeCounter({ "node-a": 0 }); + const scheduler = new LocalAgentScheduler(undefined, { counter }); + const node = makeNode("node-a", ["llm-access"]); + const picked = await scheduler.schedule(makeAgent("a"), [node]); + expect(picked.metadata.name).toBe("node-a"); + expect(counter.count).toHaveBeenCalledWith("node-a"); + }); + + it("skips a node when the counter says it has reached maxConcurrentRuns", async () => { + // node-a is at capacity (2 of 2); node-b is idle (0 of 2). + const counter = fakeCounter({ "node-a": 2, "node-b": 0 }); + const scheduler = new LocalAgentScheduler(undefined, { counter }); + const nodeA = makeNode("node-a", ["llm-access"], 2); + const nodeB = makeNode("node-b", ["llm-access"], 2); + const picked = await scheduler.schedule(makeAgent("a"), [nodeA, nodeB]); + expect(picked.metadata.name).toBe("node-b"); + }); + + it("throws when every candidate is at capacity according to the counter", async () => { + const counter = fakeCounter({ "node-a": 2 }); + const scheduler = new LocalAgentScheduler(undefined, { counter }); + const nodeA = makeNode("node-a", ["llm-access"], 2); + await expect(scheduler.schedule(makeAgent("a"), [nodeA])).rejects.toThrow( + /no available node/i, + ); + }); + + it("re-reads counts on each call (no internal staleness across replicas)", async () => { + const counts: Record = { "node-a": 0 }; + const counter = fakeCounter(counts); + const scheduler = new LocalAgentScheduler(undefined, { counter }); + const node = makeNode("node-a", ["llm-access"], 2); + await scheduler.schedule(makeAgent("a"), [node]); + // Simulate another replica claiming a slot on node-a. + counts["node-a"] = 2; + await expect(scheduler.schedule(makeAgent("a"), [node])).rejects.toThrow( + /no available node/i, + ); + expect(counter.count).toHaveBeenCalledTimes(2); + }); + + it("treats a single failing counter call as 'capacity unknown' and skips that node", async () => { + const counter: IActiveRunCounter = { + count: vi.fn(async (name: string) => { + if (name === "node-a") throw new Error("DB hiccup"); + return 0; + }), + recordStarted: vi.fn().mockResolvedValue(undefined), + recordCompleted: vi.fn().mockResolvedValue(undefined), + }; + const scheduler = new LocalAgentScheduler(undefined, { counter }); + const nodeA = makeNode("node-a", ["llm-access"], 2); + const nodeB = makeNode("node-b", ["llm-access"], 2); + // node-a's counter rejected; node-b is healthy and idle → must be picked. + const picked = await scheduler.schedule(makeAgent("a"), [nodeA, nodeB]); + expect(picked.metadata.name).toBe("node-b"); + }); +}); diff --git a/packages/platform/src/adapters/events/event-bus-factory.ts b/packages/platform/src/adapters/events/event-bus-factory.ts new file mode 100644 index 0000000..93e99a8 --- /dev/null +++ b/packages/platform/src/adapters/events/event-bus-factory.ts @@ -0,0 +1,45 @@ +/** + * Event-bus selector (P45-T3). + * + * Reads AGENTFORGE_EVENT_BUS at boot: + * - "memory" / unset → InMemoryEventBus (single-process default) + * - "postgres" → PostgresEventBus over LISTEN/NOTIFY when a URL + * is available; otherwise warn and fall back to + * memory rather than crash on a misconfigured env. + * + * The factory is intentionally narrow: callers configure once at startup + * and pass the resulting bus into PipelineController, GateController, + * dashboard SSE, etc. + */ + +import { InMemoryEventBus } from "@mandarnilange/agentforge-core/adapters/events/in-memory-event-bus.js"; +import type { IEventBus } from "@mandarnilange/agentforge-core/domain/ports/event-bus.port.js"; +import { PostgresEventBus } from "./postgres-event-bus.js"; + +export interface EventBusFactoryOptions { + postgresUrl?: string; + channel?: string; + warn?: (msg: string) => void; +} + +export function buildEventBus(opts: EventBusFactoryOptions = {}): IEventBus { + const choice = (process.env.AGENTFORGE_EVENT_BUS ?? "memory").toLowerCase(); + const warn = opts.warn ?? ((msg) => console.warn(msg)); + + if (choice === "memory") return new InMemoryEventBus(); + + if (choice === "postgres") { + if (!opts.postgresUrl) { + warn( + "AGENTFORGE_EVENT_BUS=postgres but no Postgres URL is configured — " + + "falling back to InMemoryEventBus. Set AGENTFORGE_POSTGRES_URL to enable cross-replica events.", + ); + return new InMemoryEventBus(); + } + return new PostgresEventBus(opts.postgresUrl, opts.channel); + } + + throw new Error( + `Unknown AGENTFORGE_EVENT_BUS=${choice} — supported values: memory, postgres`, + ); +} diff --git a/packages/platform/src/adapters/events/postgres-event-bus.ts b/packages/platform/src/adapters/events/postgres-event-bus.ts new file mode 100644 index 0000000..4e535af --- /dev/null +++ b/packages/platform/src/adapters/events/postgres-event-bus.ts @@ -0,0 +1,146 @@ +/** + * PostgresEventBus — multi-replica IEventBus over LISTEN/NOTIFY (P45-T3). + * + * Each control-plane replica opens a dedicated pg client and `LISTEN`s on + * a shared channel. `emit` issues `NOTIFY` so every replica's subscribers + * (dashboard SSE, reconcilers, etc.) see the same event stream. Local + * subscribers also receive the event synchronously so the emitting + * process's UI does not wait for the round-trip through Postgres. + * + * The listener client is dedicated — pg recommends keeping it idle so + * notifications are delivered promptly. Outgoing `pg_notify` calls go + * through a separate pool to avoid contending with the listener. + * + * Payloads exceed Postgres' 8000-byte NOTIFY limit only for unusual + * StatusUpdate shapes — at that point we drop the cross-replica + * delivery rather than fail the emit. + */ + +import type { + IEventBus, + PipelineEvent, +} from "@mandarnilange/agentforge-core/domain/ports/event-bus.port.js"; +import pg from "pg"; + +const NOTIFY_PAYLOAD_LIMIT = 7900; +// Strict identifier guard. LISTEN/UNLISTEN take SQL identifiers, not +// parameters, so callers must pre-validate the channel name. +const IDENTIFIER_RE = /^[A-Za-z_][A-Za-z0-9_]*$/; + +export class PostgresEventBus implements IEventBus { + private readonly connectionString: string; + private client: pg.Client; + /** + * Separate pool for outgoing pg_notify calls. The pg docs recommend + * the LISTEN client stay idle — issuing other queries on the same + * connection can delay or drop notifications. Pool size is small + * because emit is fire-and-forget and short-lived. + */ + private readonly notifyPool: pg.Pool; + private readonly listeners = new Set<(event: PipelineEvent) => void>(); + private started = false; + + constructor( + connectionString: string, + private readonly channel: string = "agentforge", + ) { + if (!IDENTIFIER_RE.test(channel)) { + throw new Error( + `Invalid channel name "${channel}" — must match ${IDENTIFIER_RE} (LISTEN/UNLISTEN take identifiers, not parameters).`, + ); + } + this.connectionString = connectionString; + this.client = this.makeClient(); + this.notifyPool = new pg.Pool({ connectionString, max: 2 }); + } + + private makeClient(): pg.Client { + const c = new pg.Client({ connectionString: this.connectionString }); + c.on("notification", (msg) => { + if (!msg.payload) return; + let event: PipelineEvent; + try { + event = JSON.parse(msg.payload) as PipelineEvent; + } catch { + return; + } + this.deliver(event); + }); + return c; + } + + async start(): Promise { + if (this.started) return; + await this.client.connect(); + try { + // Channel is constructor-validated; safe to interpolate. + await this.client.query(`LISTEN ${this.channel}`); + } catch (err) { + // LISTEN failed but connect succeeded → calling start() again on + // the same client would throw "Client is already connected" and + // also leak the previous notification listener. Tear down and + // rebuild so the caller can retry cleanly. + try { + await this.client.end(); + } catch { + // already broken; nothing to recover + } + this.client = this.makeClient(); + throw err; + } + this.started = true; + } + + emit(event: PipelineEvent): void { + // Notify peers asynchronously; deliver locally synchronously so a + // dashboard rendered in the emitting process sees the event without + // waiting for the round-trip. + this.deliver(event); + const payload = JSON.stringify(event); + if (payload.length > NOTIFY_PAYLOAD_LIMIT) { + console.warn( + `PostgresEventBus: dropping cross-replica delivery — payload (${payload.length}B) exceeds NOTIFY limit (${NOTIFY_PAYLOAD_LIMIT}B). type=${event.type}`, + ); + return; + } + void this.notifyPool + .query("SELECT pg_notify($1, $2)", [this.channel, payload]) + .catch(() => { + // Best-effort: if the notify pool is unavailable, peers won't + // see the event. Local subscribers were already notified above. + }); + } + + subscribe(listener: (event: PipelineEvent) => void): () => void { + this.listeners.add(listener); + return () => { + this.listeners.delete(listener); + }; + } + + private deliver(event: PipelineEvent): void { + for (const listener of this.listeners) { + try { + listener(event); + } catch { + // Resilient: do not let one bad subscriber break the rest. + } + } + } + + async close(): Promise { + // allSettled so a failure tearing down the listener client does not + // leak the notify pool's connections (or vice versa). + await Promise.allSettled([ + (async () => { + try { + await this.client.query(`UNLISTEN ${this.channel}`); + } catch { + // Connection may already be closed. + } + await this.client.end(); + })(), + this.notifyPool.end(), + ]); + } +} diff --git a/packages/platform/src/adapters/jobs/postgres-job-queue.ts b/packages/platform/src/adapters/jobs/postgres-job-queue.ts new file mode 100644 index 0000000..2c1342e --- /dev/null +++ b/packages/platform/src/adapters/jobs/postgres-job-queue.ts @@ -0,0 +1,115 @@ +/** + * PostgresJobQueue — multi-replica IJobQueue (P45-T4). + * + * Uses `SELECT ... FOR UPDATE SKIP LOCKED` so any number of control-plane + * replicas can call `claim()` concurrently without lost dispatches or + * duplicate work. Migration `003-job-queue.sql` provisions the table. + */ + +import type { AgentJob } from "@mandarnilange/agentforge-core/domain/ports/agent-executor.port.js"; +import type { + ClaimOptions, + IJobQueue, +} from "@mandarnilange/agentforge-core/domain/ports/job-queue.port.js"; +import pg from "pg"; + +export class PostgresJobQueue implements IJobQueue { + private readonly pool: pg.Pool; + + constructor(connectionString: string) { + this.pool = new pg.Pool({ connectionString }); + } + + async enqueue(job: AgentJob, nodeName: string): Promise { + await this.pool.query( + `INSERT INTO agent_jobs (run_id, node_name, payload, enqueued_at) + VALUES ($1, $2, $3, now()) + ON CONFLICT (run_id) DO NOTHING`, + [job.runId, nodeName, JSON.stringify(job)], + ); + } + + async claim(nodeName: string, opts: ClaimOptions = {}): Promise { + const limit = opts.limit ?? 1; + const ttlMs = opts.ttlMs ?? 5 * 60 * 1000; + // Single statement: pick eligible rows with FOR UPDATE SKIP LOCKED + // (so a concurrent replica can't grab the same row), then UPDATE + // to mark them claimed and RETURN the payload. CTE keeps it atomic + // inside one query — no transaction round-trip needed. + const { rows } = await this.pool.query( + `WITH eligible AS ( + SELECT run_id FROM agent_jobs + WHERE node_name = $1 AND claimed_by IS NULL + ORDER BY enqueued_at + LIMIT $2 + FOR UPDATE SKIP LOCKED + ) + UPDATE agent_jobs SET + claimed_by = $1, + claimed_at = now(), + claim_ttl_ms = $3 + WHERE run_id IN (SELECT run_id FROM eligible) + RETURNING run_id, payload`, + [nodeName, limit, ttlMs], + ); + // Resilient parse: a single corrupted payload (DB drift, manual + // edit) shouldn't poison the whole claim — drop the bad row and + // log so the operator can investigate. + const parsed: AgentJob[] = []; + const corrupt: string[] = []; + for (const r of rows) { + try { + parsed.push(JSON.parse(r.payload) as AgentJob); + } catch (err) { + corrupt.push(r.run_id); + console.warn( + `PostgresJobQueue: dropping unparseable payload for run_id=${r.run_id}: ${ + err instanceof Error ? err.message : String(err) + }`, + ); + } + } + if (corrupt.length > 0) { + // Delete the poisoned row(s) so reclaimStale doesn't keep recycling + // them back into the queue head — without this, a single bad row + // would block the FIFO until manually purged. + await this.pool.query( + "DELETE FROM agent_jobs WHERE run_id = ANY($1::text[])", + [corrupt], + ); + } + return parsed; + } + + async complete(runId: string): Promise { + await this.pool.query("DELETE FROM agent_jobs WHERE run_id = $1", [runId]); + } + + async reclaimStale(maxAgeMs: number): Promise { + // COALESCE so each row uses its own claim_ttl_ms when set, falling + // back to maxAgeMs only when the row predates per-job TTL tracking. + const { rowCount } = await this.pool.query( + `UPDATE agent_jobs SET + claimed_by = NULL, + claimed_at = NULL, + claim_ttl_ms = NULL + WHERE claimed_at IS NOT NULL + AND (EXTRACT(EPOCH FROM (now() - claimed_at)) * 1000) + >= COALESCE(claim_ttl_ms, $1)`, + [maxAgeMs], + ); + return rowCount ?? 0; + } + + async depth(nodeName: string): Promise { + const { rows } = await this.pool.query( + "SELECT count(*)::int AS count FROM agent_jobs WHERE node_name = $1", + [nodeName], + ); + return Number(rows[0]?.count ?? 0); + } + + async close(): Promise { + await this.pool.end(); + } +} diff --git a/packages/platform/src/adapters/leader/postgres-leader-elector.ts b/packages/platform/src/adapters/leader/postgres-leader-elector.ts new file mode 100644 index 0000000..12db023 --- /dev/null +++ b/packages/platform/src/adapters/leader/postgres-leader-elector.ts @@ -0,0 +1,116 @@ +/** + * PostgresLeaderElector — pg_advisory_lock-based singleton election (P45-T5). + * + * Holds a dedicated pg client (not a pool checkout) so the session lives + * for the lifetime of the leader. When the process exits or the session + * drops, Postgres auto-releases the lock and another replica's next + * `pg_try_advisory_lock` call returns true → instant failover. + * + * Lock names are hashed into a bigint so callers can use human-readable + * names ("agentforge-reconciler", "agentforge-scheduler") without picking + * magic numbers. + * + * Pool sizing: each held lock checks out one pg.PoolClient for its session + * lifetime. Size the underlying Postgres `max_connections` to accommodate + * (concurrent_lock_count × replica_count) plus headroom for ordinary + * queries. The default Postgres pool is created here with library defaults; + * pass a tuned pool via subclass / DI when load profile demands it. + */ + +import { createHash } from "node:crypto"; +import type { ILeaderElector } from "@mandarnilange/agentforge-core/domain/ports/leader-elector.port.js"; +import pg from "pg"; + +interface LockHandle { + client: pg.PoolClient; +} + +export class PostgresLeaderElector implements ILeaderElector { + private readonly pool: pg.Pool; + private readonly held = new Map(); + + constructor(connectionString: string) { + this.pool = new pg.Pool({ connectionString }); + } + + async acquire(lockName: string): Promise { + if (this.held.has(lockName)) return true; + const lockId = lockNameToBigInt(lockName); + const client = (await this.pool.connect()) as pg.PoolClient; + try { + const { rows } = await client.query( + "SELECT pg_try_advisory_lock($1) AS pg_try_advisory_lock", + [lockId], + ); + const acquired = rows[0]?.pg_try_advisory_lock === true; + if (!acquired) { + // Lock held elsewhere — we never opened a session-scoped lock, + // so a plain release is safe. + client.release(); + return false; + } + this.held.set(lockName, { client }); + return true; + } catch (err) { + // Query threw — connection is in an unknown state. Destroying it + // (release(true)) prevents pg from putting a possibly-locked + // session back into circulation, where the next caller would + // inherit either a held lock or a poisoned client. + client.release(true); + throw err; + } + } + + async release(lockName: string): Promise { + const handle = this.held.get(lockName); + if (!handle) return; + const lockId = lockNameToBigInt(lockName); + // Always drop the in-memory handle so isLeader() flips false, even + // when the unlock RPC fails — at that point the only safe thing is + // to destroy the connection (forcing pg to close the session, which + // auto-releases any advisory locks). + this.held.delete(lockName); + try { + await handle.client.query( + "SELECT pg_advisory_unlock($1) AS pg_advisory_unlock", + [lockId], + ); + handle.client.release(); + } catch (err) { + // Unlock failed — the session may still hold the lock. Forcing + // destruction closes the connection, which Postgres treats as a + // session end and releases all advisory locks held by it. + handle.client.release(true); + throw err; + } + } + + isLeader(lockName: string): boolean { + return this.held.has(lockName); + } + + async close(): Promise { + // Release all held locks in parallel so shutdown is not gated on the + // slowest pg_advisory_unlock round-trip. Snapshot the key set first + // to avoid mutation during iteration. + await Promise.all([...this.held.keys()].map((name) => this.release(name))); + await this.pool.end(); + } +} + +/** + * Hash a lock name to a signed 64-bit int. Postgres advisory locks take a + * bigint; sha256 truncated to 8 bytes is collision-resistant for the small + * fixed set of agentforge lock names (collision probability ≈ 2^-63 per + * pair — negligible for the 5-10 lock names we ever expect to declare). + * Returns a string so the pg driver passes it as a numeric parameter + * without precision loss. + */ +function lockNameToBigInt(name: string): string { + const digest = createHash("sha256").update(name).digest(); + // Take 8 bytes, force the high bit to 0 to keep it positive (Postgres + // bigint range is signed but our values are well within int63). + const buf = digest.subarray(0, 8); + buf[0] &= 0x7f; + return buf.readBigInt64BE(0).toString(); +} diff --git a/packages/platform/src/adapters/scheduling/db-active-run-counter.ts b/packages/platform/src/adapters/scheduling/db-active-run-counter.ts new file mode 100644 index 0000000..569f2cf --- /dev/null +++ b/packages/platform/src/adapters/scheduling/db-active-run-counter.ts @@ -0,0 +1,54 @@ +/** + * DbActiveRunCounter — multi-replica IActiveRunCounter (P45-T6). + * + * Queries `agent_runs` GROUP BY node_name on each call so any number of + * control-plane replicas see the same load truth. Backed by a partial + * index (`agent_runs(node_name, status) WHERE status IN ...`) provisioned + * by migration `004-active-run-index.sql` to keep the query cheap. + */ + +import type { IActiveRunCounter } from "@mandarnilange/agentforge-core/domain/ports/active-run-counter.port.js"; +import pg from "pg"; + +const ACTIVE_STATUSES = ["pending", "scheduled", "running"] as const; + +export class DbActiveRunCounter implements IActiveRunCounter { + private readonly pool: pg.Pool; + + constructor(connectionString: string) { + this.pool = new pg.Pool({ connectionString }); + } + + async count(nodeName: string): Promise { + // Build the IN clause from ACTIVE_STATUSES so the constant can't drift + // out of sync with the SQL. status = ANY($2::text[]) is equivalent to + // IN (...) but takes the array as a single bound parameter. + const { rows } = await this.pool.query( + `SELECT count(*)::int AS count + FROM agent_runs + WHERE node_name = $1 + AND status = ANY($2::text[])`, + [nodeName, [...ACTIVE_STATUSES]], + ); + return Number(rows[0]?.count ?? 0); + } + + // recordStarted/recordCompleted are intentionally no-ops: createAgentRun + // already inserts a row with status='pending', and updateAgentRun flips + // it through scheduled→running→succeeded/failed. The DB is the source of + // truth — the in-memory map this adapter replaces is gone. + recordStarted(_nodeName: string): Promise { + return Promise.resolve(); + } + + recordCompleted(_nodeName: string): Promise { + return Promise.resolve(); + } + + async close(): Promise { + await this.pool.end(); + } + + /** Statuses considered "active" for load accounting. Exported for visibility. */ + static readonly ACTIVE_STATUSES: readonly string[] = ACTIVE_STATUSES; +} diff --git a/packages/platform/src/cli/commands/node-start.ts b/packages/platform/src/cli/commands/node-start.ts index 6af9cdb..358e7de 100644 --- a/packages/platform/src/cli/commands/node-start.ts +++ b/packages/platform/src/cli/commands/node-start.ts @@ -20,6 +20,10 @@ import type { } from "@mandarnilange/agentforge-core/domain/ports/agent-executor.port.js"; import chalk from "chalk"; import type { Command } from "commander"; +import { + checkDockerAvailability, + filterDockerCapability, +} from "../../nodes/docker-availability.js"; interface NodeStartOptions { controlPlaneUrl: string; @@ -206,9 +210,27 @@ export function registerNodeStartCommand(program: Command): void { .option("--host ", "HTTP server host", "0.0.0.0") .action(async (opts: NodeStartOptions) => { const nodeName = opts.name ?? `node-${process.pid}`; - const capabilities = (opts.capabilities ?? "llm-access") + // Capability matching is canonicalised to lowercase so a user + // passing `--capabilities Docker,GPU` still triggers the docker + // preflight and the scheduler's `nodeAffinity.required: [docker]` + // matches as expected. + const declaredCapabilities = (opts.capabilities ?? "llm-access") .split(",") - .map((c) => c.trim()); + .map((c) => c.trim().toLowerCase()) + .filter((c) => c.length > 0); + // Docker preflight (P40-T5): verify the daemon is actually reachable + // before we register a node that claims docker capability. On failure + // we strip "docker" from the effective list and log a warning so the + // scheduler does not route docker-required jobs to a node that would + // immediately fail at execute time. + const dockerOk = declaredCapabilities.includes("docker") + ? await checkDockerAvailability() + : true; + const capabilities = filterDockerCapability( + declaredCapabilities, + dockerOk, + (msg) => console.warn(chalk.yellow(msg)), + ); const maxConcurrentRuns = Number.parseInt( opts.maxConcurrentRuns ?? "3", 10, diff --git a/packages/platform/src/nodes/docker-availability.ts b/packages/platform/src/nodes/docker-availability.ts new file mode 100644 index 0000000..ad9e19e --- /dev/null +++ b/packages/platform/src/nodes/docker-availability.ts @@ -0,0 +1,116 @@ +/** + * Docker socket preflight (P40-T5). + * + * When a node advertises the "docker" capability we probe the local docker + * daemon at registration time. Unavailable → warn once and strip "docker" + * from the effective capability list so the node still registers (other + * capabilities remain) but the scheduler does not route docker-required + * jobs to a node that would immediately fail. + * + * The probe is injectable so callers can swap in a real dockerode ping or + * a unix-socket connect; default uses a lightweight `/var/run/docker.sock` + * (or DOCKER_HOST) connect attempt. + */ + +import { connect } from "node:net"; + +export interface DockerAvailabilityOptions { + /** Custom probe — returns true when docker is reachable. */ + probe?: () => Promise; + /** Override the unix socket path (default `/var/run/docker.sock`). */ + socketPath?: string; + /** Override DOCKER_HOST (e.g. tcp://host:port); takes priority over socket. */ + dockerHost?: string; + /** Probe timeout in ms (default 1500). */ + timeoutMs?: number; +} + +const DEFAULT_SOCKET = "/var/run/docker.sock"; +const DEFAULT_TIMEOUT_MS = 1500; + +export async function checkDockerAvailability( + opts: DockerAvailabilityOptions = {}, +): Promise { + const probe = opts.probe ?? defaultProbe(opts); + try { + return await probe(); + } catch { + return false; + } +} + +/** + * Removes "docker" from `capabilities` when the daemon is unreachable. + * Logs a single warning explaining the downgrade. Idempotent and safe when + * "docker" is not present. + */ +export function filterDockerCapability( + capabilities: readonly string[], + dockerAvailable: boolean, + warn: (msg: string) => void, +): string[] { + if (!capabilities.includes("docker")) return [...capabilities]; + if (dockerAvailable) return [...capabilities]; + + warn( + "docker capability declared but the docker daemon is unavailable — " + + "removing 'docker' from this node's effective capabilities. " + + "Set DOCKER_HOST or start the daemon to restore docker scheduling.", + ); + return capabilities.filter((c) => c !== "docker"); +} + +function defaultProbe(opts: DockerAvailabilityOptions): () => Promise { + const timeoutMs = opts.timeoutMs ?? DEFAULT_TIMEOUT_MS; + const dockerHost = opts.dockerHost ?? process.env.DOCKER_HOST; + const socketPath = opts.socketPath ?? DEFAULT_SOCKET; + + // URL parser handles IPv6 brackets, ignores path segments, and + // normalizes ports. Falls back to the unix socket for any non-tcp + // scheme (unix://, ssh://, npipe://) — those need their own runtimes + // which dockerode handles, but for a *reachability* probe the unix + // socket is the right default on every host this code runs on. + const tcpTarget = parseTcpDockerHost(dockerHost); + + return () => + new Promise((resolve) => { + const socket = tcpTarget + ? connect({ + host: tcpTarget.host, + port: tcpTarget.port, + timeout: timeoutMs, + }) + : connect({ path: socketPath, timeout: timeoutMs }); + + // Persistent no-op error sink. After cleanup() destroys the + // socket, the OS may emit a *second* "error" (e.g. the TCP + // RST + ETIMEDOUT race CR flagged). The once("error", ...) + // handler below has already consumed the first; without this + // permanent listener the second event would surface as an + // unhandled error and crash the process at startup. + socket.on("error", () => {}); + + const cleanup = (ok: boolean) => { + socket.destroy(); + resolve(ok); + }; + socket.once("connect", () => cleanup(true)); + socket.once("error", () => cleanup(false)); + socket.once("timeout", () => cleanup(false)); + }); +} + +function parseTcpDockerHost( + dockerHost: string | undefined, +): { host: string; port: number } | null { + if (!dockerHost?.startsWith("tcp://")) return null; + try { + const url = new URL(dockerHost); + const port = Number(url.port); + if (!Number.isFinite(port) || port <= 0) return null; + // url.hostname strips IPv6 brackets, which is what `connect()` wants. + return { host: url.hostname, port }; + } catch { + return null; + } +} diff --git a/packages/platform/src/nodes/ssh-preflight.ts b/packages/platform/src/nodes/ssh-preflight.ts new file mode 100644 index 0000000..e0598f6 --- /dev/null +++ b/packages/platform/src/nodes/ssh-preflight.ts @@ -0,0 +1,55 @@ +/** + * SSH preflight validation (P40-T6). + * + * On platform startup, every node whose connection.type is "ssh" gets a + * lightweight TCP/SSH reach check. Unreachable nodes are marked offline in + * the registry and surfaced as a startup warning so operators learn about + * misconfigured hosts immediately rather than via a cryptic dispatch failure + * minutes later. + * + * The actual probe is delegated to the runtime's `ping()` (already + * implemented in SshNodeRuntime) so the policy here is just: who do we + * probe, what do we do with the result, and how do we report it. + */ + +import type { INodeRuntime } from "@mandarnilange/agentforge-core/domain/ports/node-runtime.port.js"; + +export interface SshPreflightOptions { + runtimes: readonly INodeRuntime[]; + warn: (msg: string) => void; + markOffline: (nodeName: string) => void; +} + +export interface SshPreflightResult { + name: string; + reachable: boolean; +} + +export async function validateSshNodesAtStartup( + opts: SshPreflightOptions, +): Promise { + const sshRuntimes = opts.runtimes.filter( + (r) => r.nodeDefinition.spec.connection?.type === "ssh", + ); + const checks = await Promise.all( + sshRuntimes.map(async (rt) => { + const name = rt.nodeDefinition.metadata.name; + const host = rt.nodeDefinition.spec.connection?.host ?? ""; + let reachable = false; + try { + reachable = await rt.ping(); + } catch { + reachable = false; + } + if (!reachable) { + opts.markOffline(name); + opts.warn( + `SSH node "${name}" at ${host} is unreachable — marking offline. ` + + "Verify host, port, key, and connection.user before scheduling.", + ); + } + return { name, reachable }; + }), + ); + return checks; +} diff --git a/packages/platform/src/platform-cli.ts b/packages/platform/src/platform-cli.ts index e60d915..9fc0c50 100644 --- a/packages/platform/src/platform-cli.ts +++ b/packages/platform/src/platform-cli.ts @@ -8,7 +8,6 @@ import { mkdirSync, readFileSync } from "node:fs"; import { join, resolve } from "node:path"; import { fileURLToPath } from "node:url"; -import { InMemoryEventBus } from "@mandarnilange/agentforge-core/adapters/events/in-memory-event-bus.js"; import { setRuntimeDefinitionStore } from "@mandarnilange/agentforge-core/agents/definition-source.js"; import { registerDashboardCommand } from "@mandarnilange/agentforge-core/cli/commands/dashboard.js"; import { registerExecCommand } from "@mandarnilange/agentforge-core/cli/commands/exec.js"; @@ -36,6 +35,8 @@ import { setDiscoveredSchemas } from "@mandarnilange/agentforge-core/schemas/ind import { buildSchemaValidators } from "@mandarnilange/agentforge-core/schemas/schema-discovery.js"; import { SqliteStateStore } from "@mandarnilange/agentforge-core/state/store.js"; import { Command } from "commander"; +import { buildEventBus } from "./adapters/events/event-bus-factory.js"; +import { PostgresEventBus } from "./adapters/events/postgres-event-bus.js"; import { PgDefinitionStore } from "./adapters/store/pg-definition-store.js"; import { SqliteDefinitionStore } from "./adapters/store/sqlite-definition-store.js"; import type { DefinitionPersistSink } from "./cli/commands/apply.js"; @@ -47,6 +48,7 @@ import { PipelineRateLimiter } from "./control-plane/rate-limiter.js"; import { NodeHealthChecker } from "./nodes/health-check.js"; import { LocalNodeRuntime } from "./nodes/local-runtime.js"; import { NodeRegistry } from "./nodes/registry.js"; +import { validateSshNodesAtStartup } from "./nodes/ssh-preflight.js"; import { SshNodeRuntime } from "./nodes/ssh-runtime.js"; import { flushTelemetry, initTelemetry } from "./observability/init.js"; import { @@ -405,7 +407,21 @@ const pipelineController = new PipelineController( agentExecutor, ); -const eventBus = new InMemoryEventBus(); +// P45-T3: pluggable event bus. AGENTFORGE_EVENT_BUS=postgres + an +// AGENTFORGE_POSTGRES_URL switches to LISTEN/NOTIFY so multiple control-plane +// replicas see the same SSE stream. Default stays in-memory for the +// single-process case. +const eventBus = buildEventBus({ postgresUrl: POSTGRES_URL }); +if (eventBus instanceof PostgresEventBus) { + try { + await eventBus.start(); + } catch (err) { + console.error( + `Failed to start Postgres event bus: ${err instanceof Error ? err.message : String(err)}`, + ); + process.exit(1); + } +} // Crash recovery const _recoveryService = new PipelineRecoveryService(stateStore, eventBus, { @@ -440,6 +456,16 @@ const nodeHealthChecker = new NodeHealthChecker( nodeRuntimes, metrics, ); +// SSH preflight (P40-T6): warn early on unreachable ssh hosts so operators +// don't discover the failure via a cryptic dispatch error minutes later. +// Awaited before checkAll() so the general health-check loop doesn't race +// with preflight and re-mark a node "online" between the preflight +// rejection and the registry write. +await validateSshNodesAtStartup({ + runtimes: nodeRuntimes, + warn: (msg) => console.warn(msg), + markOffline: (name) => nodeRegistry.markOffline(name), +}); void nodeHealthChecker.checkAll(); // Init and templates don't need the DI container — register with platform templates merged in. @@ -541,6 +567,13 @@ registerNodeStartCommand(program); void program.parseAsync().then(async () => { if (pgRefreshInterval) clearInterval(pgRefreshInterval); + if (eventBus instanceof PostgresEventBus) { + try { + await eventBus.close(); + } catch { + // Best-effort: connection may already be torn down by the OS. + } + } await stateStore.close(); if (sqliteDefinitionStore) sqliteDefinitionStore.close(); if (pgDefinitionStore) await pgDefinitionStore.close(); diff --git a/packages/platform/src/state/migrations/postgres/003-job-queue.sql b/packages/platform/src/state/migrations/postgres/003-job-queue.sql new file mode 100644 index 0000000..5e9ce38 --- /dev/null +++ b/packages/platform/src/state/migrations/postgres/003-job-queue.sql @@ -0,0 +1,24 @@ +-- P45-T4: DB-backed agent job queue with claim semantics. +-- Replaces per-replica in-memory pendingRunQueues map. Workers claim rows +-- atomically via FOR UPDATE SKIP LOCKED so multiple control-plane replicas +-- can dispatch without lost or duplicate work. + +CREATE TABLE IF NOT EXISTS agent_jobs ( + run_id TEXT PRIMARY KEY, + node_name TEXT NOT NULL, + payload JSONB NOT NULL, + enqueued_at TIMESTAMPTZ NOT NULL DEFAULT now(), + claimed_by TEXT, + claimed_at TIMESTAMPTZ, + claim_ttl_ms INTEGER +); + +-- Hot path: claim() filters by node_name + (claimed_by IS NULL). +CREATE INDEX IF NOT EXISTS idx_agent_jobs_pending + ON agent_jobs (node_name, enqueued_at) + WHERE claimed_by IS NULL; + +-- Sweep path: reclaimStale() filters claimed rows by claimed_at. +CREATE INDEX IF NOT EXISTS idx_agent_jobs_claimed_at + ON agent_jobs (claimed_at) + WHERE claimed_at IS NOT NULL; diff --git a/packages/platform/src/state/migrations/postgres/004-active-run-index.sql b/packages/platform/src/state/migrations/postgres/004-active-run-index.sql new file mode 100644 index 0000000..5612bac --- /dev/null +++ b/packages/platform/src/state/migrations/postgres/004-active-run-index.sql @@ -0,0 +1,7 @@ +-- P45-T6: partial index for the per-node active-run count query used by +-- the stateless scheduler. Filters on the small "active" subset so every +-- scheduling decision is O(active rows for node), not O(all agent runs). + +CREATE INDEX IF NOT EXISTS idx_agent_runs_active_by_node + ON agent_runs (node_name) + WHERE status IN ('pending', 'scheduled', 'running'); diff --git a/packages/platform/tests/adapters/events/event-bus-factory.test.ts b/packages/platform/tests/adapters/events/event-bus-factory.test.ts new file mode 100644 index 0000000..3a56edb --- /dev/null +++ b/packages/platform/tests/adapters/events/event-bus-factory.test.ts @@ -0,0 +1,72 @@ +/** + * Tests for buildEventBus — selects the IEventBus adapter based on + * AGENTFORGE_EVENT_BUS env var (P45-T3). + */ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; + +vi.mock("pg", () => { + class MockClient { + query = vi.fn().mockResolvedValue({ rows: [] }); + on = vi.fn(); + connect = vi.fn().mockResolvedValue(undefined); + end = vi.fn().mockResolvedValue(undefined); + } + class MockPool { + query = vi.fn().mockResolvedValue({ rows: [] }); + end = vi.fn().mockResolvedValue(undefined); + } + return { default: { Client: MockClient, Pool: MockPool } }; +}); + +import { InMemoryEventBus } from "@mandarnilange/agentforge-core/adapters/events/in-memory-event-bus.js"; +import { buildEventBus } from "../../../src/adapters/events/event-bus-factory.js"; +import { PostgresEventBus } from "../../../src/adapters/events/postgres-event-bus.js"; + +const ORIGINAL_ENV = process.env.AGENTFORGE_EVENT_BUS; + +describe("buildEventBus", () => { + beforeEach(() => { + delete process.env.AGENTFORGE_EVENT_BUS; + }); + afterEach(() => { + if (ORIGINAL_ENV === undefined) delete process.env.AGENTFORGE_EVENT_BUS; + else process.env.AGENTFORGE_EVENT_BUS = ORIGINAL_ENV; + }); + + it("returns InMemoryEventBus by default", () => { + const bus = buildEventBus({ postgresUrl: "postgresql://localhost/test" }); + expect(bus).toBeInstanceOf(InMemoryEventBus); + }); + + it("returns InMemoryEventBus when AGENTFORGE_EVENT_BUS=memory", () => { + process.env.AGENTFORGE_EVENT_BUS = "memory"; + const bus = buildEventBus({ postgresUrl: "postgresql://localhost/test" }); + expect(bus).toBeInstanceOf(InMemoryEventBus); + }); + + it("returns PostgresEventBus when AGENTFORGE_EVENT_BUS=postgres and url is set", () => { + process.env.AGENTFORGE_EVENT_BUS = "postgres"; + const bus = buildEventBus({ postgresUrl: "postgresql://localhost/test" }); + expect(bus).toBeInstanceOf(PostgresEventBus); + }); + + it("falls back to InMemoryEventBus when AGENTFORGE_EVENT_BUS=postgres but no url", () => { + process.env.AGENTFORGE_EVENT_BUS = "postgres"; + const warn = vi.fn(); + const bus = buildEventBus({ postgresUrl: undefined, warn }); + expect(bus).toBeInstanceOf(InMemoryEventBus); + // Operator must see the misconfiguration — silent fallback would + // hide a broken multi-replica deployment. + expect(warn).toHaveBeenCalledOnce(); + expect(warn).toHaveBeenCalledWith( + expect.stringMatching(/AGENTFORGE_EVENT_BUS=postgres.*falling back/i), + ); + }); + + it("rejects unknown event-bus types with a clear error", () => { + process.env.AGENTFORGE_EVENT_BUS = "kafka"; + expect(() => + buildEventBus({ postgresUrl: "postgresql://localhost/test" }), + ).toThrow(/AGENTFORGE_EVENT_BUS.*kafka/); + }); +}); diff --git a/packages/platform/tests/adapters/events/postgres-event-bus.test.ts b/packages/platform/tests/adapters/events/postgres-event-bus.test.ts new file mode 100644 index 0000000..0d30f6b --- /dev/null +++ b/packages/platform/tests/adapters/events/postgres-event-bus.test.ts @@ -0,0 +1,204 @@ +/** + * Tests for PostgresEventBus — LISTEN/NOTIFY-backed IEventBus (P45-T3). + * + * Verifies the contract: + * - emit() issues `NOTIFY , ` so peer replicas + * subscribed to the same channel receive the event. + * - subscribe() registers a notification handler and returns an + * unsubscribe function. + * - The same JSON shape that peers receive round-trips through + * in-process subscribers (so dashboards in the emitting process + * don't have to wait for the LISTEN round-trip). + */ +import type { + IEventBus, + PipelineEvent, +} from "@mandarnilange/agentforge-core/domain/ports/event-bus.port.js"; +import { beforeEach, describe, expect, it, vi } from "vitest"; + +const { + mockClientQuery, + mockClientOn, + mockConnect, + mockEnd, + mockPoolQuery, + mockPoolEnd, +} = vi.hoisted(() => ({ + mockClientQuery: vi.fn().mockResolvedValue({ rows: [] }), + mockClientOn: vi.fn(), + mockConnect: vi.fn(), + mockEnd: vi.fn().mockResolvedValue(undefined), + mockPoolQuery: vi.fn().mockResolvedValue({ rows: [] }), + mockPoolEnd: vi.fn().mockResolvedValue(undefined), +})); + +vi.mock("pg", () => { + class MockClient { + query = mockClientQuery; + on = mockClientOn; + connect = mockConnect.mockResolvedValue(undefined); + end = mockEnd; + } + class MockPool { + query = mockPoolQuery; + end = mockPoolEnd; + } + return { default: { Client: MockClient, Pool: MockPool } }; +}); + +import { PostgresEventBus } from "../../../src/adapters/events/postgres-event-bus.js"; + +describe("PostgresEventBus", () => { + let bus: IEventBus & PostgresEventBus; + + beforeEach(async () => { + vi.clearAllMocks(); + bus = new PostgresEventBus("postgresql://localhost/test", "agentforge"); + await bus.start(); + }); + + it("LISTENs on the configured channel after start", () => { + const sql = mockClientQuery.mock.calls.map((c) => c[0]).join("\n"); + expect(sql).toMatch(/^LISTEN/im); + expect(sql).toMatch(/agentforge/); + }); + + it("emit issues NOTIFY on a separate pool (listener client stays idle)", async () => { + const event: PipelineEvent = { + type: "run_updated", + runId: "r1", + status: "succeeded", + }; + bus.emit(event); + await new Promise((r) => setImmediate(r)); + // pg_notify must go through the notify pool, not the LISTEN client. + const notifyCall = mockPoolQuery.mock.calls.find((c) => + (c[0] as string).startsWith("SELECT pg_notify"), + ); + expect(notifyCall).toBeDefined(); + const onListenerCall = mockClientQuery.mock.calls.find((c) => + (c[0] as string).startsWith("SELECT pg_notify"), + ); + expect(onListenerCall).toBeUndefined(); + const params = notifyCall?.[1] as unknown[]; + const payload = JSON.parse(params[1] as string); + expect(payload.type).toBe("run_updated"); + expect(payload.runId).toBe("r1"); + }); + + it("delivers locally-emitted events to in-process subscribers", () => { + const received: PipelineEvent[] = []; + bus.subscribe((e) => received.push(e)); + bus.emit({ type: "node_online", nodeName: "alpha" }); + expect(received).toHaveLength(1); + expect(received[0]).toMatchObject({ + type: "node_online", + nodeName: "alpha", + }); + }); + + it("delivers events received via NOTIFY to subscribers", async () => { + const received: PipelineEvent[] = []; + bus.subscribe((e) => received.push(e)); + // Replay the notification handler bound to client.on('notification', ...) + const handler = mockClientOn.mock.calls.find( + (c) => c[0] === "notification", + )?.[1] as (msg: { payload: string }) => void; + expect(handler).toBeDefined(); + handler({ + payload: JSON.stringify({ + type: "node_offline", + nodeName: "beta", + }), + }); + expect(received).toEqual([{ type: "node_offline", nodeName: "beta" }]); + }); + + it("subscribe returns an unsubscribe function", () => { + const received: PipelineEvent[] = []; + const unsub = bus.subscribe((e) => received.push(e)); + bus.emit({ type: "node_online", nodeName: "alpha" }); + unsub(); + bus.emit({ type: "node_online", nodeName: "beta" }); + expect(received).toHaveLength(1); + }); + + it("ignores malformed payloads from the LISTEN channel", () => { + const received: PipelineEvent[] = []; + bus.subscribe((e) => received.push(e)); + const handler = mockClientOn.mock.calls.find( + (c) => c[0] === "notification", + )?.[1] as (msg: { payload: string }) => void; + handler({ payload: "{ this is not json" }); + expect(received).toHaveLength(0); + }); + + it("rejects channel names with non-identifier characters at construct", () => { + // Identifier interpolation in LISTEN/UNLISTEN — guard at construct + // time so unsafe channel names never reach the SQL layer. + expect( + () => + new ( + PostgresEventBus as unknown as new ( + url: string, + channel: string, + ) => unknown + )("postgresql://localhost/test", "agent;DROP TABLE users;--"), + ).toThrow(/channel name/i); + }); + + it("close() ends both the listener client and the notify pool", async () => { + await bus.close(); + expect(mockEnd).toHaveBeenCalled(); + expect(mockPoolEnd).toHaveBeenCalled(); + }); + + it("start() is idempotent (no second LISTEN on repeat call)", async () => { + const listenCallsBefore = mockClientQuery.mock.calls.filter((c) => + (c[0] as string).startsWith("LISTEN"), + ).length; + await bus.start(); + const listenCallsAfter = mockClientQuery.mock.calls.filter((c) => + (c[0] as string).startsWith("LISTEN"), + ).length; + expect(listenCallsAfter).toBe(listenCallsBefore); + }); + + it("start() recovers from a failed LISTEN by tearing down the connect and allowing retry", async () => { + // Fresh bus that hasn't started yet. + vi.clearAllMocks(); + const fresh = new PostgresEventBus( + "postgresql://localhost/test", + "agentforge", + ) as PostgresEventBus & { start: () => Promise }; + // First start: connect succeeds, LISTEN fails. + mockClientQuery.mockRejectedValueOnce(new Error("LISTEN failed")); + await expect(fresh.start()).rejects.toThrow(/LISTEN failed/); + // Retry: connect must NOT be called twice on the same client (would + // throw "Client is already connected"). Implementation should either + // recreate the client or skip connect on retry. + mockClientQuery.mockResolvedValueOnce({ rows: [] }); + mockConnect.mockClear(); + await fresh.start(); + // Second start either re-connects on a fresh client or no-ops connect; + // either way LISTEN must have been issued again. + const listenCalls = mockClientQuery.mock.calls.filter((c) => + (c[0] as string).startsWith("LISTEN"), + ); + expect(listenCalls.length).toBeGreaterThanOrEqual(1); + }); + + it("close() ends notify pool even when client.end() rejects", async () => { + vi.clearAllMocks(); + const fresh = new PostgresEventBus( + "postgresql://localhost/test", + "agentforge", + ); + mockClientQuery.mockResolvedValueOnce({ rows: [] }); // LISTEN + await fresh.start(); + mockClientQuery.mockResolvedValueOnce({ rows: [] }); // UNLISTEN + mockEnd.mockRejectedValueOnce(new Error("client end failed")); + await fresh.close(); + expect(mockPoolEnd).toHaveBeenCalled(); + }); +}); diff --git a/packages/platform/tests/adapters/jobs/postgres-job-queue.test.ts b/packages/platform/tests/adapters/jobs/postgres-job-queue.test.ts new file mode 100644 index 0000000..c572479 --- /dev/null +++ b/packages/platform/tests/adapters/jobs/postgres-job-queue.test.ts @@ -0,0 +1,158 @@ +/** + * Tests for PostgresJobQueue using a mocked pg.Pool (P45-T4). + * + * Verifies the SQL contract: enqueue inserts into agent_jobs, claim uses + * `FOR UPDATE SKIP LOCKED` so concurrent replicas never see the same row, + * complete deletes by runId, reclaimStale releases claims older than the + * threshold. + */ +import type { AgentJob } from "@mandarnilange/agentforge-core/domain/ports/agent-executor.port.js"; +import { beforeEach, describe, expect, it, vi } from "vitest"; + +const { mockQuery } = vi.hoisted(() => ({ mockQuery: vi.fn() })); + +vi.mock("pg", () => { + class MockPool { + query = mockQuery; + end = vi.fn(); + } + return { default: { Pool: MockPool } }; +}); + +import { PostgresJobQueue } from "../../../src/adapters/jobs/postgres-job-queue.js"; + +function job(runId: string): AgentJob { + return { + runId, + agentId: "analyst", + agentDefinition: { metadata: { name: "analyst" }, spec: {} }, + model: { + provider: "anthropic", + name: "claude-sonnet", + maxTokens: 4096, + }, + workdir: "/tmp/work", + outputDir: "/tmp/out", + } as unknown as AgentJob; +} + +describe("PostgresJobQueue", () => { + let queue: PostgresJobQueue; + beforeEach(() => { + vi.clearAllMocks(); + queue = new PostgresJobQueue("postgresql://localhost/test"); + }); + + describe("enqueue()", () => { + it("inserts a row with run_id, node_name and serialized payload", async () => { + mockQuery.mockResolvedValueOnce({ rows: [] }); + await queue.enqueue(job("r1"), "worker-a"); + const sql = mockQuery.mock.calls[0][0] as string; + const params = mockQuery.mock.calls[0][1] as unknown[]; + expect(sql).toMatch(/INSERT INTO agent_jobs/); + expect(params[0]).toBe("r1"); + expect(params[1]).toBe("worker-a"); + // payload should be JSON-serialized + expect(JSON.parse(params[2] as string).runId).toBe("r1"); + }); + }); + + describe("claim()", () => { + it("uses FOR UPDATE SKIP LOCKED for atomic claim across replicas", async () => { + mockQuery.mockResolvedValueOnce({ rows: [] }); + await queue.claim("worker-a"); + const sql = mockQuery.mock.calls[0][0] as string; + expect(sql).toMatch(/FOR UPDATE SKIP LOCKED/i); + }); + + it("returns deserialized AgentJob payloads", async () => { + mockQuery.mockResolvedValueOnce({ + rows: [ + { run_id: "r1", payload: JSON.stringify(job("r1")) }, + { run_id: "r2", payload: JSON.stringify(job("r2")) }, + ], + }); + const claimed = await queue.claim("worker-a", { limit: 5 }); + expect(claimed.map((j) => j.runId)).toEqual(["r1", "r2"]); + }); + + it("skips rows with corrupted payloads instead of failing the whole claim", async () => { + mockQuery.mockResolvedValueOnce({ + rows: [ + { run_id: "ok", payload: JSON.stringify(job("ok")) }, + { run_id: "bad", payload: "{ not valid json" }, + ], + }); + // 2nd query: the cleanup DELETE for the poisoned row. + mockQuery.mockResolvedValueOnce({ rowCount: 1, rows: [] }); + const claimed = await queue.claim("worker-a", { limit: 5 }); + expect(claimed.map((j) => j.runId)).toEqual(["ok"]); + }); + + it("DELETEs corrupted rows so reclaimStale can't recycle them forever", async () => { + // Without delete: the row stays claimed_by=worker, JSON.parse keeps + // throwing, and reclaimStale eventually frees it for another doomed + // claim — head-of-line poison forever. + mockQuery.mockResolvedValueOnce({ + rows: [{ run_id: "bad", payload: "not json" }], + }); + mockQuery.mockResolvedValueOnce({ rowCount: 1, rows: [] }); + await queue.claim("worker-a"); + const deleteCall = mockQuery.mock.calls.find( + (c) => + typeof c[0] === "string" && + (c[0] as string).match(/DELETE FROM agent_jobs/i), + ); + expect(deleteCall).toBeDefined(); + expect(deleteCall?.[1]).toEqual([["bad"]]); + }); + + it("filters by node_name and pending claim", async () => { + mockQuery.mockResolvedValueOnce({ rows: [] }); + await queue.claim("worker-a", { limit: 3 }); + const params = mockQuery.mock.calls[0][1] as unknown[]; + // worker name and limit should be bound parameters + expect(params).toContain("worker-a"); + expect(params).toContain(3); + }); + }); + + describe("complete()", () => { + it("DELETEs the row", async () => { + mockQuery.mockResolvedValueOnce({ rows: [] }); + await queue.complete("r1"); + const sql = mockQuery.mock.calls[0][0] as string; + expect(sql).toMatch(/DELETE FROM agent_jobs/); + expect(mockQuery.mock.calls[0][1]).toEqual(["r1"]); + }); + }); + + describe("reclaimStale()", () => { + it("clears claimed_by/claimed_at when claim age exceeds threshold", async () => { + mockQuery.mockResolvedValueOnce({ rowCount: 2, rows: [] }); + const released = await queue.reclaimStale(60_000); + expect(released).toBe(2); + const sql = mockQuery.mock.calls[0][0] as string; + expect(sql).toMatch(/UPDATE agent_jobs/); + expect(sql).toMatch(/claimed_by\s*=\s*NULL/i); + expect(sql).toMatch(/claimed_at\s*=\s*NULL/i); + }); + + it("uses per-job claim_ttl_ms when set, falling back to maxAgeMs", async () => { + mockQuery.mockResolvedValueOnce({ rowCount: 0, rows: [] }); + await queue.reclaimStale(60_000); + const sql = mockQuery.mock.calls[0][0] as string; + // SQL should COALESCE per-job ttl with the parameter so each row + // is checked against its own deadline. + expect(sql).toMatch(/COALESCE\(claim_ttl_ms/i); + }); + }); + + describe("depth()", () => { + it("counts rows for a node regardless of claim state", async () => { + mockQuery.mockResolvedValueOnce({ rows: [{ count: "3" }] }); + const d = await queue.depth("worker-a"); + expect(d).toBe(3); + }); + }); +}); diff --git a/packages/platform/tests/adapters/leader/postgres-leader-elector.test.ts b/packages/platform/tests/adapters/leader/postgres-leader-elector.test.ts new file mode 100644 index 0000000..205609d --- /dev/null +++ b/packages/platform/tests/adapters/leader/postgres-leader-elector.test.ts @@ -0,0 +1,114 @@ +/** + * Tests for PostgresLeaderElector using a mocked pg client (P45-T5). + * + * Uses `pg_try_advisory_lock()` over a single dedicated client so + * the lock survives only as long as that session — replica crash → lock + * auto-released → another replica acquires immediately. + */ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +const { mockClientQuery, mockConnect, mockRelease } = vi.hoisted(() => ({ + mockClientQuery: vi.fn(), + mockConnect: vi.fn(), + mockRelease: vi.fn(), +})); + +vi.mock("pg", () => { + class MockPool { + connect = mockConnect.mockImplementation(async () => ({ + query: mockClientQuery, + release: mockRelease, + })); + end = vi.fn(); + } + return { default: { Pool: MockPool } }; +}); + +import { PostgresLeaderElector } from "../../../src/adapters/leader/postgres-leader-elector.js"; + +describe("PostgresLeaderElector", () => { + let elector: PostgresLeaderElector; + + beforeEach(() => { + vi.clearAllMocks(); + elector = new PostgresLeaderElector("postgresql://localhost/test"); + }); + + it("acquire calls pg_try_advisory_lock and returns true on success", async () => { + mockClientQuery.mockResolvedValueOnce({ + rows: [{ pg_try_advisory_lock: true }], + }); + expect(await elector.acquire("agentforge-reconciler")).toBe(true); + const sql = mockClientQuery.mock.calls[0][0] as string; + expect(sql).toMatch(/pg_try_advisory_lock/i); + }); + + it("acquire returns false when another holder owns the lock", async () => { + mockClientQuery.mockResolvedValueOnce({ + rows: [{ pg_try_advisory_lock: false }], + }); + expect(await elector.acquire("agentforge-reconciler")).toBe(false); + }); + + it("isLeader reflects last successful acquire", async () => { + mockClientQuery.mockResolvedValueOnce({ + rows: [{ pg_try_advisory_lock: true }], + }); + await elector.acquire("name-a"); + expect(elector.isLeader("name-a")).toBe(true); + expect(elector.isLeader("name-b")).toBe(false); + }); + + it("release calls pg_advisory_unlock", async () => { + mockClientQuery.mockResolvedValueOnce({ + rows: [{ pg_try_advisory_lock: true }], + }); + await elector.acquire("name-a"); + mockClientQuery.mockResolvedValueOnce({ + rows: [{ pg_advisory_unlock: true }], + }); + await elector.release("name-a"); + expect(elector.isLeader("name-a")).toBe(false); + const lastSql = mockClientQuery.mock.calls.at(-1)?.[0] as string; + expect(lastSql).toMatch(/pg_advisory_unlock/i); + }); + + it("destroys the connection (release(true)) when acquire query throws", async () => { + mockClientQuery.mockRejectedValueOnce(new Error("connection refused")); + await expect(elector.acquire("name-a")).rejects.toThrow(); + // Connection in unknown state must be evicted from the pool — passing + // no arg risks returning a session-scoped lock holder back into circulation. + expect(mockRelease).toHaveBeenCalledWith(true); + }); + + it("destroys the connection (release(true)) when pg_advisory_unlock throws", async () => { + mockClientQuery.mockResolvedValueOnce({ + rows: [{ pg_try_advisory_lock: true }], + }); + await elector.acquire("name-a"); + mockClientQuery.mockRejectedValueOnce(new Error("server crashed")); + // Even if the unlock RPC failed, we must not return the still-locked + // session back to the pool — split-brain risk. + await expect(elector.release("name-a")).rejects.toThrow(); + expect(mockRelease).toHaveBeenCalledWith(true); + expect(elector.isLeader("name-a")).toBe(false); + }); + + it("hashes lock names to a stable bigint to avoid collisions", async () => { + // Two distinct names produce two distinct lock ids; reusing the same + // name produces the same id (release + re-acquire round-trip). + mockClientQuery.mockResolvedValue({ + rows: [{ pg_try_advisory_lock: true, pg_advisory_unlock: true }], + }); + await elector.acquire("agentforge-reconciler"); + const firstParams = mockClientQuery.mock.calls[0][1] as unknown[]; + await elector.release("agentforge-reconciler"); + await elector.acquire("agentforge-reconciler"); + const reAcquireParams = mockClientQuery.mock.calls.at(-1)?.[1] as unknown[]; + expect(reAcquireParams[0]).toBe(firstParams[0]); + + await elector.acquire("agentforge-scheduler"); + const schedulerParams = mockClientQuery.mock.calls.at(-1)?.[1] as unknown[]; + expect(schedulerParams[0]).not.toBe(firstParams[0]); + }); +}); diff --git a/packages/platform/tests/adapters/scheduling/db-active-run-counter.test.ts b/packages/platform/tests/adapters/scheduling/db-active-run-counter.test.ts new file mode 100644 index 0000000..9a6be57 --- /dev/null +++ b/packages/platform/tests/adapters/scheduling/db-active-run-counter.test.ts @@ -0,0 +1,61 @@ +/** + * Tests for DbActiveRunCounter — queries agent_runs to compute live load + * across replicas (P45-T6). + */ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +const { mockQuery } = vi.hoisted(() => ({ mockQuery: vi.fn() })); + +vi.mock("pg", () => { + class MockPool { + query = mockQuery; + end = vi.fn(); + } + return { default: { Pool: MockPool } }; +}); + +import { DbActiveRunCounter } from "../../../src/adapters/scheduling/db-active-run-counter.js"; + +describe("DbActiveRunCounter", () => { + let counter: DbActiveRunCounter; + + beforeEach(() => { + vi.clearAllMocks(); + counter = new DbActiveRunCounter("postgresql://localhost/test"); + }); + + it("count() queries agent_runs filtered by node and active status", async () => { + mockQuery.mockResolvedValueOnce({ rows: [{ count: "3" }] }); + const n = await counter.count("worker-a"); + expect(n).toBe(3); + const sql = mockQuery.mock.calls[0][0] as string; + expect(sql).toMatch(/FROM agent_runs/); + expect(sql).toMatch(/node_name\s*=\s*\$1/); + expect(sql).toMatch(/status\s*=\s*ANY/i); + const params = mockQuery.mock.calls[0][1] as unknown[]; + expect(params).toContain("worker-a"); + }); + + it("count() passes ACTIVE_STATUSES as a single array parameter", async () => { + mockQuery.mockResolvedValueOnce({ rows: [{ count: "0" }] }); + await counter.count("worker-a"); + const params = mockQuery.mock.calls[0][1] as unknown[]; + const arrayParam = params.find((p) => Array.isArray(p)) as string[]; + expect(arrayParam).toEqual( + expect.arrayContaining(["pending", "scheduled", "running"]), + ); + }); + + it("count() returns 0 when no rows match (exercises the rows[0] fallback)", async () => { + // Empty result set is the actual no-row case; the previous + // [{count: "0"}] mock skipped the fallback path entirely. + mockQuery.mockResolvedValueOnce({ rows: [] }); + expect(await counter.count("worker-a")).toBe(0); + }); + + it("recordStarted/recordCompleted are no-ops (DB is the source of truth)", async () => { + await counter.recordStarted("worker-a"); + await counter.recordCompleted("worker-a"); + expect(mockQuery).not.toHaveBeenCalled(); + }); +}); diff --git a/packages/platform/tests/nodes/docker-availability.test.ts b/packages/platform/tests/nodes/docker-availability.test.ts new file mode 100644 index 0000000..918e259 --- /dev/null +++ b/packages/platform/tests/nodes/docker-availability.test.ts @@ -0,0 +1,67 @@ +/** + * Tests for docker availability preflight (P40-T5). + * + * checkDockerAvailability() probes the docker socket; filterDockerCapability() + * strips "docker" from the effective capability list when the socket is + * unreachable so the node still registers, but with truthful capabilities. + */ +import { describe, expect, it, vi } from "vitest"; +import { + checkDockerAvailability, + filterDockerCapability, +} from "../../src/nodes/docker-availability.js"; + +describe("checkDockerAvailability()", () => { + it("returns true when the docker probe succeeds", async () => { + const probe = vi.fn().mockResolvedValue(true); + const ok = await checkDockerAvailability({ probe }); + expect(ok).toBe(true); + expect(probe).toHaveBeenCalledOnce(); + }); + + it("returns false when the docker probe rejects", async () => { + const probe = vi.fn().mockRejectedValue(new Error("ENOENT")); + const ok = await checkDockerAvailability({ probe }); + expect(ok).toBe(false); + }); + + it("returns false when the docker probe resolves false (timeout / 5xx)", async () => { + const probe = vi.fn().mockResolvedValue(false); + const ok = await checkDockerAvailability({ probe }); + expect(ok).toBe(false); + }); +}); + +describe("filterDockerCapability()", () => { + it("returns capabilities unchanged when docker is available", () => { + const warn = vi.fn(); + const out = filterDockerCapability( + ["llm-access", "docker", "gpu"], + true, + warn, + ); + expect(out).toEqual(["llm-access", "docker", "gpu"]); + expect(warn).not.toHaveBeenCalled(); + }); + + it("removes docker and warns once when docker is unavailable", () => { + const warn = vi.fn(); + const out = filterDockerCapability( + ["llm-access", "docker", "gpu"], + false, + warn, + ); + expect(out).toEqual(["llm-access", "gpu"]); + expect(warn).toHaveBeenCalledOnce(); + expect(warn).toHaveBeenCalledWith( + expect.stringMatching(/docker.*unavailable/i), + ); + }); + + it("is a no-op when docker is not in the capability list", () => { + const warn = vi.fn(); + const out = filterDockerCapability(["llm-access", "gpu"], false, warn); + expect(out).toEqual(["llm-access", "gpu"]); + expect(warn).not.toHaveBeenCalled(); + }); +}); diff --git a/packages/platform/tests/nodes/ssh-preflight.test.ts b/packages/platform/tests/nodes/ssh-preflight.test.ts new file mode 100644 index 0000000..8c77cd9 --- /dev/null +++ b/packages/platform/tests/nodes/ssh-preflight.test.ts @@ -0,0 +1,121 @@ +/** + * Tests for SSH node preflight (P40-T6). + * + * On platform startup, ssh-typed node definitions get a lightweight reach + * check via runtime.ping(). Unreachable nodes log a warning and are marked + * offline in the registry instead of silently appearing healthy until the + * first scheduled job fails. + */ +import type { NodeDefinitionYaml } from "@mandarnilange/agentforge-core/definitions/parser.js"; +import type { INodeRuntime } from "@mandarnilange/agentforge-core/domain/ports/node-runtime.port.js"; +import { describe, expect, it, vi } from "vitest"; +import { validateSshNodesAtStartup } from "../../src/nodes/ssh-preflight.js"; + +function sshDef(name: string, host = "host.example"): NodeDefinitionYaml { + return { + apiVersion: "agentforge.dev/v1", + kind: "NodeDefinition", + metadata: { name, type: "remote" }, + spec: { + capabilities: ["llm-access"], + connection: { type: "ssh", host, user: "agent" }, + }, + } as NodeDefinitionYaml; +} + +function localDef(name: string): NodeDefinitionYaml { + return { + apiVersion: "agentforge.dev/v1", + kind: "NodeDefinition", + metadata: { name, type: "local" }, + spec: { capabilities: ["llm-access"] }, + } as NodeDefinitionYaml; +} + +function fakeRuntime( + def: NodeDefinitionYaml, + pingResult: boolean, +): INodeRuntime { + return { + nodeDefinition: def, + ping: vi.fn().mockResolvedValue(pingResult), + execute: vi.fn(), + } as unknown as INodeRuntime; +} + +describe("validateSshNodesAtStartup()", () => { + it("only pings ssh-typed nodes (skips local)", async () => { + const localRt = fakeRuntime(localDef("alpha"), true); + const sshRt = fakeRuntime(sshDef("beta"), true); + const warn = vi.fn(); + const markOffline = vi.fn(); + await validateSshNodesAtStartup({ + runtimes: [localRt, sshRt], + warn, + markOffline, + }); + expect(localRt.ping).not.toHaveBeenCalled(); + expect(sshRt.ping).toHaveBeenCalledOnce(); + }); + + it("marks unreachable ssh nodes offline and warns", async () => { + const sshRt = fakeRuntime(sshDef("beta", "10.0.0.99"), false); + const warn = vi.fn(); + const markOffline = vi.fn(); + await validateSshNodesAtStartup({ + runtimes: [sshRt], + warn, + markOffline, + }); + expect(markOffline).toHaveBeenCalledWith("beta"); + expect(warn).toHaveBeenCalledOnce(); + expect(warn).toHaveBeenCalledWith( + expect.stringMatching(/ssh.*beta.*unreachable/i), + ); + }); + + it("does not warn or mark offline when ssh is reachable", async () => { + const sshRt = fakeRuntime(sshDef("beta"), true); + const warn = vi.fn(); + const markOffline = vi.fn(); + await validateSshNodesAtStartup({ + runtimes: [sshRt], + warn, + markOffline, + }); + expect(markOffline).not.toHaveBeenCalled(); + expect(warn).not.toHaveBeenCalled(); + }); + + it("returns a structured report of node names and reachability", async () => { + const ok = fakeRuntime(sshDef("ok"), true); + const bad = fakeRuntime(sshDef("bad"), false); + const result = await validateSshNodesAtStartup({ + runtimes: [ok, bad], + warn: vi.fn(), + markOffline: vi.fn(), + }); + expect(result).toEqual([ + { name: "ok", reachable: true }, + { name: "bad", reachable: false }, + ]); + }); + + it("treats ping rejection as unreachable rather than throwing", async () => { + const sshRt = { + nodeDefinition: sshDef("beta"), + ping: vi.fn().mockRejectedValue(new Error("auth failed")), + execute: vi.fn(), + } as unknown as INodeRuntime; + const warn = vi.fn(); + const markOffline = vi.fn(); + const result = await validateSshNodesAtStartup({ + runtimes: [sshRt], + warn, + markOffline, + }); + expect(result).toEqual([{ name: "beta", reachable: false }]); + expect(markOffline).toHaveBeenCalledWith("beta"); + expect(warn).toHaveBeenCalledOnce(); + }); +}); diff --git a/packages/platform/tests/state/pg-store.test.ts b/packages/platform/tests/state/pg-store.test.ts index fcbe60e..b5a5c6d 100644 --- a/packages/platform/tests/state/pg-store.test.ts +++ b/packages/platform/tests/state/pg-store.test.ts @@ -144,6 +144,21 @@ describe("PostgresStateStore", () => { }); }); + describe("preflight()", () => { + it("issues SELECT 1 to verify connectivity", async () => { + mockQuery.mockResolvedValueOnce({ rows: [{ "?column?": 1 }] }); + await store.preflight(); + expect(mockQuery).toHaveBeenCalledWith("SELECT 1"); + }); + + it("wraps pg errors with a friendly message that hints at the env var", async () => { + mockQuery.mockRejectedValueOnce(new Error("ECONNREFUSED 127.0.0.1:5432")); + await expect(store.preflight()).rejects.toThrow( + /preflight failed.*AGENTFORGE_POSTGRES_URL/i, + ); + }); + }); + describe("createPipelineRun()", () => { it("inserts a pipeline run and returns it", async () => { mockQuery.mockResolvedValueOnce({ rows: [] });