Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 105 additions & 0 deletions packages/core/src/adapters/jobs/in-memory-job-queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/**
* InMemoryJobQueue — single-process IJobQueue implementation (P45-T4).
*
* Mirrors the contract the PostgresJobQueue must honour so the same wiring
* works with a swap of adapter via `AGENTFORGE_JOB_QUEUE=postgres|memory`.
* Not safe across processes — for multi-replica control planes use the
* Postgres adapter.
*/

import type { AgentJob } from "../../domain/ports/agent-executor.port.js";
import type {
ClaimOptions,
IJobQueue,
} from "../../domain/ports/job-queue.port.js";

interface QueueEntry {
job: AgentJob;
nodeName: string;
claimedBy?: string;
claimedAt?: number;
ttlMs?: number;
}

export interface InMemoryJobQueueOptions {
now?: () => number;
}

export class InMemoryJobQueue implements IJobQueue {
private readonly entries = new Map<string, QueueEntry>();
private now: () => number;

constructor(opts: InMemoryJobQueueOptions = {}) {
this.now = opts.now ?? (() => Date.now());
}

/** Test-only — override the clock without leaking the field publicly. */
_setNow(t: number): void {
this.now = () => t;
}

enqueue(job: AgentJob, nodeName: string): Promise<void> {
// Match Postgres' `ON CONFLICT DO NOTHING`: a duplicate enqueue keeps
// the original entry (and any in-flight claim metadata) instead of
// silently overwriting it.
if (!this.entries.has(job.runId)) {
this.entries.set(job.runId, { job, nodeName });
}
return Promise.resolve();
}

claim(nodeName: string, opts: ClaimOptions = {}): Promise<AgentJob[]> {
const limit = opts.limit ?? 1;
const ttlMs = opts.ttlMs ?? 5 * 60 * 1000;
const now = this.now();
const claimed: AgentJob[] = [];
// Iterate insertion order — Map preserves it. Pick up to `limit`
// pending entries for this node and atomically mark them claimed.
for (const entry of this.entries.values()) {
if (claimed.length >= limit) break;
if (entry.nodeName !== nodeName) continue;
if (entry.claimedBy !== undefined) continue;
entry.claimedBy = nodeName;
entry.claimedAt = now;
entry.ttlMs = ttlMs;
claimed.push(entry.job);
}
return Promise.resolve(claimed);
}

complete(runId: string): Promise<void> {
this.entries.delete(runId);
return Promise.resolve();
}

reclaimStale(maxAgeMs: number): Promise<number> {
const now = this.now();
let count = 0;
for (const entry of this.entries.values()) {
if (entry.claimedBy === undefined || entry.claimedAt === undefined) {
continue;
}
// Per-job TTL wins when set — claim() records the worker's
// declared trust horizon, and that's the right age for *this*
// job. The maxAgeMs argument is the global fallback for jobs
// claimed before per-job TTLs were tracked.
const age = now - entry.claimedAt;
const threshold = entry.ttlMs ?? maxAgeMs;
if (age >= threshold) {
entry.claimedBy = undefined;
entry.claimedAt = undefined;
entry.ttlMs = undefined;
count++;
}
}
return Promise.resolve(count);
}

depth(nodeName: string): Promise<number> {
let count = 0;
for (const entry of this.entries.values()) {
if (entry.nodeName === nodeName) count++;
}
return Promise.resolve(count);
}
}
30 changes: 30 additions & 0 deletions packages/core/src/adapters/leader/local-leader-elector.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/**
* LocalLeaderElector — single-process leader (P45-T5).
* Acquire always succeeds; release flips state. Use for single-replica
* deployments or when no shared store is available.
*/

import type { ILeaderElector } from "../../domain/ports/leader-elector.port.js";

export class LocalLeaderElector implements ILeaderElector {
private readonly held = new Set<string>();

acquire(lockName: string): Promise<boolean> {
// Mutual exclusion: only the *first* caller for a given name wins.
// Without this, two runWhenLeader instances in the same process (e.g.
// reconciler + scheduler that share a lock by mistake) would both
// believe they are leader, defeating the contract.
if (this.held.has(lockName)) return Promise.resolve(false);
this.held.add(lockName);
return Promise.resolve(true);
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

release(lockName: string): Promise<void> {
this.held.delete(lockName);
return Promise.resolve();
}

isLeader(lockName: string): boolean {
return this.held.has(lockName);
}
}
70 changes: 70 additions & 0 deletions packages/core/src/control-plane/leader-gated-loop.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/**
* runWhenLeader — wraps a singleton interval loop in leader election (P45-T5).
*
* Each tick: try to acquire the lock if we don't already hold it; if we
* become (or remain) leader, run `body`. Otherwise skip and let another
* replica do the work. The released-lock case is automatic on process
* exit (Postgres advisory locks live with the session) — failover is
* picked up at the next tick.
*/

import type { ILeaderElector } from "../domain/ports/leader-elector.port.js";

export function runWhenLeader(
elector: ILeaderElector,
lockName: string,
body: () => Promise<void>,
intervalMs: number,
): () => void {
let stopped = false;
// Reentrancy guard: setInterval fires the callback regardless of whether
// the previous async invocation has resolved. Without this flag, a body
// that runs longer than `intervalMs` (e.g. a slow reconciler scan) would
// have N ticks all in flight at once, each holding leader rights.
let inFlight = false;

const tick = async (): Promise<void> => {
if (stopped || inFlight) return;
let leader = elector.isLeader(lockName);
if (!leader) {
leader = await elector.acquire(lockName);
}
if (!leader) return;
inFlight = true;
try {
await body();
} catch (err) {
// Surface the failure but keep the interval running — a transient
// error must not permanently disable the singleton loop.
console.error(
`runWhenLeader: body for lock "${lockName}" threw: ${
err instanceof Error ? err.message : String(err)
}`,
);
} finally {
inFlight = false;
}
};

const handle = setInterval(() => {
// `tick` is async; attach .catch to absorb any rejection from the
// outer pre-body code (acquire/isLeader). Body errors are already
// caught inside tick — this is just a belt-and-braces guard against
// unhandled rejections at the timer boundary.
tick().catch((err) => {
console.error(
`runWhenLeader: pre-body error for lock "${lockName}": ${
err instanceof Error ? err.message : String(err)
}`,
);
});
}, intervalMs);

return () => {
stopped = true;
clearInterval(handle);
// Best-effort release; if the elector has not held the lock this
// is a no-op.
void elector.release(lockName);
};
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}
2 changes: 1 addition & 1 deletion packages/core/src/control-plane/pipeline-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ export class PipelineController {
let selectedNode: (typeof nodePool)[0] | null = null;
if (agentDef && nodePool.length > 0) {
try {
selectedNode = this.scheduler.schedule(agentDef, nodePool);
selectedNode = await this.scheduler.schedule(agentDef, nodePool);
} catch {
// No node satisfies requirements — fall back to local
}
Expand Down
106 changes: 87 additions & 19 deletions packages/core/src/control-plane/scheduler.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
/**
* LocalAgentScheduler — schedules agents to local nodes.
* Respects nodeAffinity requirements and maxConcurrentRuns limits.
* Future phases will add remote node scheduling.
* LocalAgentScheduler — schedules agents to nodes (P45-T6).
*
* Counts are read via an injected IActiveRunCounter so the scheduler is
* stateless across control-plane replicas. The default in-memory counter
* preserves single-process semantics; the platform supplies a DB-backed
* counter that queries `agent_runs` so two replicas never disagree about
* a node's load.
*/

import type {
AgentDefinitionYaml,
NodeDefinitionYaml,
} from "../definitions/parser.js";
import type { IActiveRunCounter } from "../domain/ports/active-run-counter.port.js";

export interface INodeRegistry {
get(name: string): { status: string } | undefined;
Expand All @@ -19,33 +24,60 @@ export interface IAgentScheduler {
schedule(
agent: AgentDefinitionYaml,
nodePool: NodeDefinitionYaml[],
): NodeDefinitionYaml;
): Promise<NodeDefinitionYaml>;
recordRunStarted(nodeName: string): void;
recordRunCompleted(nodeName: string): void;
getActiveRunCount(nodeName: string): number;
getActiveRunCount(nodeName: string): Promise<number>;
}

export interface LocalSchedulerOptions {
/**
* Counter for active runs. If omitted, an in-process counter backs the
* historical in-memory Map<string, number>. Multi-replica deployments
* inject a DB-backed counter so all replicas share one truth.
*/
counter?: IActiveRunCounter;
}

export class LocalAgentScheduler implements IAgentScheduler {
private readonly activeRuns = new Map<string, number>();
private readonly registry?: INodeRegistry;
private readonly counter: IActiveRunCounter;

constructor(registry?: INodeRegistry) {
constructor(registry?: INodeRegistry, opts: LocalSchedulerOptions = {}) {
this.registry = registry;
this.counter = opts.counter ?? new InMemoryActiveRunCounter();
}

schedule(
async schedule(
agent: AgentDefinitionYaml,
nodePool: NodeDefinitionYaml[],
): NodeDefinitionYaml {
): Promise<NodeDefinitionYaml> {
const required =
agent.spec.nodeAffinity?.required?.map((r) => r.capability) ?? [];
const preferred =
agent.spec.nodeAffinity?.preferred?.map((p) => p.capability) ?? [];

const candidates = nodePool.filter((node) => {
// Read every candidate's load from the counter — fresh on each call
// so a peer replica's recent dispatch is reflected immediately.
// allSettled (not all) so a single hung/failed counter call only
// disqualifies that node instead of blocking every dispatch.
const settled = await Promise.allSettled(
nodePool.map((n) => this.counter.count(n.metadata.name)),
);
const counts = settled.map((s, i) => {
if (s.status === "fulfilled") return s.value;
console.warn(
`Scheduler: counter.count("${nodePool[i].metadata.name}") failed — treating as full. ${
s.reason instanceof Error ? s.reason.message : String(s.reason)
}`,
);
return Number.POSITIVE_INFINITY;
});

const candidates = nodePool.filter((node, i) => {
const caps = node.spec.capabilities;
const maxRuns = node.spec.resources?.maxConcurrentRuns ?? Infinity;
const active = this.getActiveRunCount(node.metadata.name);
const active = counts[i];
const registration = this.registry?.get(node.metadata.name);
const isOffline = registration?.status === "offline";
return (
Expand All @@ -69,23 +101,59 @@ export class LocalAgentScheduler implements IAgentScheduler {
}));

scored.sort((a, b) => b.score - a.score);
const selected = scored[0].node;

return selected;
return scored[0].node;
}

recordRunStarted(nodeName: string): void {
this.activeRuns.set(nodeName, (this.activeRuns.get(nodeName) ?? 0) + 1);
// Fire-and-forget: counter.recordStarted is a no-op for the
// stateless DB adapter, but a future adapter may have side effects.
// Surface failures so they don't disappear.
this.counter.recordStarted(nodeName).catch((err) => {
console.warn(
`Scheduler: counter.recordStarted("${nodeName}") failed: ${
err instanceof Error ? err.message : String(err)
}`,
);
});
this.registry?.recordRunStarted(nodeName);
}

recordRunCompleted(nodeName: string): void {
const current = this.activeRuns.get(nodeName) ?? 0;
this.activeRuns.set(nodeName, Math.max(0, current - 1));
this.counter.recordCompleted(nodeName).catch((err) => {
console.warn(
`Scheduler: counter.recordCompleted("${nodeName}") failed: ${
err instanceof Error ? err.message : String(err)
}`,
);
});
this.registry?.recordRunCompleted(nodeName);
}

getActiveRunCount(nodeName: string): number {
return this.activeRuns.get(nodeName) ?? 0;
getActiveRunCount(nodeName: string): Promise<number> {
return this.counter.count(nodeName);
}
}

/**
* In-process counter — preserves the historical Map<string, number>
* behaviour for callers that don't pass a counter explicitly. Not safe
* across processes.
*/
export class InMemoryActiveRunCounter implements IActiveRunCounter {
private readonly counts = new Map<string, number>();

count(nodeName: string): Promise<number> {
return Promise.resolve(this.counts.get(nodeName) ?? 0);
}

recordStarted(nodeName: string): Promise<void> {
this.counts.set(nodeName, (this.counts.get(nodeName) ?? 0) + 1);
return Promise.resolve();
}

recordCompleted(nodeName: string): Promise<void> {
const current = this.counts.get(nodeName) ?? 0;
this.counts.set(nodeName, Math.max(0, current - 1));
return Promise.resolve();
}
}
21 changes: 21 additions & 0 deletions packages/core/src/domain/ports/active-run-counter.port.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/**
* IActiveRunCounter — pluggable per-node active-run counter (P45-T6).
*
* The scheduler reads counts via this port before every dispatch decision.
*
* - MemoryActiveRunCounter: single-process default, backs the in-memory map.
* - DbActiveRunCounter: queries `agent_runs WHERE status IN ('running','scheduled')
* GROUP BY node_name` so multiple control-plane replicas see the same
* truth and can never over-schedule a node beyond its maxConcurrentRuns.
*/

export interface IActiveRunCounter {
/** Active-run count for `nodeName`. Always async to allow DB-backed impls. */
count(nodeName: string): Promise<number>;

/** Record that a run has started — may be a no-op for stateless impls. */
recordStarted(nodeName: string): Promise<void>;

/** Record that a run has completed — may be a no-op for stateless impls. */
recordCompleted(nodeName: string): Promise<void>;
}
Loading