diff --git a/CHANGELOG.md b/CHANGELOG.md index 1b08ed4..69bd925 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ # Changelog +## 0.3.1 (2026-04-21) + +- fix: concurrent `agentcli exec` calls on the same gated task no longer double-consume a single approval. A new `claimApproval` primitive performs find-then-consume atomically under an fs-lockfile at `~/.agentcli/state/approvals.ndjson.lock` (openSync `wx` + `Atomics.wait` backoff); stale locks (older than 30s) are broken; `approval_lock_timeout` is raised after 5s of contention +- `enforceApprovalGate` in `src/exec.js` now calls `claimApproval` instead of the separate `findValidApproval` + `consumeApproval` pair; signature verification still runs after the atomic claim and a bad signature refuses execution (the grant is already consumed and its use is audited) +- `claimApproval` added to the barrel export (`src/index.js`) +- 4 new concurrency tests: worker-thread race (N=8 parallel claims, exactly 1 winner), distinct-grant parallelism, stale-lock recovery, lock-timeout behavior + ## 0.3.0 (2026-04-21) - local approval gate enforcement in `agentcli exec`: tasks with `approval.policy: "manual"` refuse to execute unless a matching, unconsumed, unrevoked, unexpired approval record is present (`error_type: approval_required`) diff --git a/package.json b/package.json index e5f569b..9ea170b 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@amittell/agentcli", - "version": "0.3.0", + "version": "0.3.1", "description": "Agent-native workflow manifest CLI, schema, and protocol surface for standalone planning and backend compilation.", "type": "module", "main": "./src/index.js", diff --git a/src/approvals.js b/src/approvals.js index 5362bab..f5cb268 100644 --- a/src/approvals.js +++ b/src/approvals.js @@ -1,19 +1,81 @@ -import { appendFileSync, readFileSync, existsSync, mkdirSync } from 'node:fs'; +import { + appendFileSync, readFileSync, existsSync, mkdirSync, + openSync, closeSync, writeSync, unlinkSync, statSync, +} from 'node:fs'; import { dirname } from 'node:path'; import { createHash, randomBytes } from 'node:crypto'; import { getProvider, resolveProvider } from './signing/index.js'; import { resolveAllowedSigners, generateAllowedSigners } from './signing/ssh.js'; import { getAgentcliPaths } from './home.js'; -// Concurrency note: grantApproval, findValidApproval, and consumeApproval read -// and append to an NDJSON log without advisory locking. Two concurrent -// `agentcli exec` invocations of the same gated task can both observe the -// same pending grant and both consume it. This is an accepted limitation of -// the local single-machine enforcement model; workflows that require -// multi-dispatcher approval coordination should use openclaw-scheduler. +// Concurrency: `claimApproval` is the atomic public primitive that +// enforceApprovalGate uses. It acquires an fs-lock on .lock +// (openSync 'wx'), re-reads the log inside the critical section, finds a +// matching pending grant, appends a consume event, and releases the lock. +// Concurrent claims of the same grant serialize to exactly one winner; +// losers re-read and either find a different pending grant or throw +// approval_required. Locks older than LOCK_STALE_MS are treated as +// abandoned (crashed holder) and removed. const APPROVAL_RECORD_VERSION = 1; const DEFAULT_TTL_S = 3600; +const LOCK_SUFFIX = '.lock'; +const LOCK_TIMEOUT_MS = 5000; +const LOCK_STALE_MS = 30000; +const LOCK_POLL_MS = 25; + +// Shared memory used for sync sleep during lock backoff. Allocated once per +// process rather than per-retry. +const LOCK_SLEEP_BUF = new Int32Array(new SharedArrayBuffer(4)); + +function sleepSync(ms) { + Atomics.wait(LOCK_SLEEP_BUF, 0, 0, ms); +} + +function withApprovalsLock(approvalsPath, fn, { + timeoutMs = LOCK_TIMEOUT_MS, + staleMs = LOCK_STALE_MS, + pollMs = LOCK_POLL_MS, + now = () => Date.now(), +} = {}) { + mkdirSync(dirname(approvalsPath), { recursive: true }); + const lockPath = `${approvalsPath}${LOCK_SUFFIX}`; + const deadline = now() + timeoutMs; + let fd; + while (true) { + try { + fd = openSync(lockPath, 'wx'); + writeSync(fd, `${process.pid}\n`); + break; + } catch (err) { + if (err.code !== 'EEXIST') throw err; + // Lock held. Check staleness and potentially break it. + try { + const st = statSync(lockPath); + if (now() - st.mtimeMs > staleMs) { + try { unlinkSync(lockPath); } catch { /* someone else cleaned up */ } + continue; + } + } catch { + // Lock vanished between EEXIST and stat; retry immediately. + continue; + } + if (now() >= deadline) { + throw Object.assign( + new Error(`Timed out acquiring approvals lock at ${lockPath} after ${timeoutMs}ms`), + { code: 'approval_lock_timeout' } + ); + } + sleepSync(pollMs); + } + } + try { + closeSync(fd); + return fn(); + } finally { + try { unlinkSync(lockPath); } catch { /* already removed */ } + } +} export function approvalPolicyRequiresApproval(approval) { if (!approval) return false; @@ -162,6 +224,41 @@ export function findValidApproval({ return candidates[0]; } +// Atomically find a matching pending grant and mark it consumed. +// Returns the consumed grant object, or null if no match. Concurrent callers +// serialize on the approvals lockfile; at most one wins per grant. +export function claimApproval({ + workflowId, + taskId, + taskHash, + approvalId, + executionId, + env = process.env, + now = () => Date.now(), + lockOptions, +}) { + const paths = getAgentcliPaths({ env }); + return withApprovalsLock(paths.approvals, () => { + const grant = findValidApproval({ + workflowId, taskId, taskHash, approvalId, env, now: now(), + }); + if (!grant) return null; + const consumedAt = new Date(now()).toISOString(); + appendFileSync( + paths.approvals, + JSON.stringify({ + v: APPROVAL_RECORD_VERSION, + kind: 'consume', + approval_id: grant.approval_id, + execution_id: executionId, + consumed_at: consumedAt, + }) + '\n', + 'utf8' + ); + return grant; + }, lockOptions); +} + function buildApprovalSignaturePayload(grant) { return canonicalStringify({ v: APPROVAL_RECORD_VERSION, diff --git a/src/describe.js b/src/describe.js index 77e9f48..74bc3d2 100644 --- a/src/describe.js +++ b/src/describe.js @@ -76,7 +76,7 @@ export const COMMAND_DESCRIPTIONS = [ }, { command: 'approve', - summary: 'Grant a local, single-use, ssh-signed approval record for a task whose approval.policy is "manual". Required before agentcli exec will run the task.' + summary: 'Grant a local, single-use, ssh-signed approval record for a task whose approval.policy is "manual". Required before agentcli exec will run the task. Concurrent execs serialize on an fs-lockfile so at most one consumer wins per grant.' }, { command: 'approvals', diff --git a/src/exec.js b/src/exec.js index 40d1156..f032704 100644 --- a/src/exec.js +++ b/src/exec.js @@ -28,8 +28,7 @@ import { approvalPolicyRequiresApproval, approvalPolicyAutoRejects, computeTaskApprovalHash, - findValidApproval, - consumeApproval, + claimApproval, verifyApprovalSignature, } from './approvals.js'; @@ -468,11 +467,12 @@ function enforceApprovalGate({ workflow, task, executionId, approvalId, env }) { } if (!approvalPolicyRequiresApproval(task.approval)) return null; const taskHash = computeTaskApprovalHash({ workflowId: workflow.id, task }); - const grant = findValidApproval({ + const grant = claimApproval({ workflowId: workflow.id, taskId: task.id, taskHash, approvalId, + executionId, env, }); if (!grant) { @@ -489,6 +489,9 @@ function enforceApprovalGate({ workflow, task, executionId, approvalId, env }) { } const sigCheck = verifyApprovalSignature(grant, { env }); if (sigCheck.verified === false) { + // We have already consumed the grant atomically; a bad signature means + // the record is corrupt or forged. Refuse execution and surface the + // consumption in error context so an operator can audit. throw Object.assign( new Error( `Approval ${grant.approval_id} signature verification failed: ${sigCheck.reason || 'invalid signature'}. ` + @@ -497,7 +500,6 @@ function enforceApprovalGate({ workflow, task, executionId, approvalId, env }) { { code: 'approval_signature_invalid' } ); } - consumeApproval({ approvalId: grant.approval_id, executionId, env }); return { approval_id: grant.approval_id, task_hash: grant.task_hash, diff --git a/src/index.js b/src/index.js index 844f7ec..2cd5a64 100644 --- a/src/index.js +++ b/src/index.js @@ -35,6 +35,7 @@ export { listApprovals, findValidApproval, consumeApproval, + claimApproval, revokeApproval, computeTaskApprovalHash, approvalPolicyRequiresApproval, diff --git a/test/approvals.test.js b/test/approvals.test.js index dc914ee..12d6726 100644 --- a/test/approvals.test.js +++ b/test/approvals.test.js @@ -1,8 +1,9 @@ import test from 'node:test'; import assert from 'node:assert/strict'; -import { mkdtempSync, readFileSync, appendFileSync, existsSync, rmSync } from 'node:fs'; +import { mkdtempSync, readFileSync, writeFileSync, appendFileSync, existsSync, rmSync, mkdirSync, utimesSync } from 'node:fs'; import { join } from 'node:path'; import { tmpdir } from 'node:os'; +import { Worker } from 'node:worker_threads'; import { grantApproval, @@ -10,6 +11,7 @@ import { findValidApproval, consumeApproval, revokeApproval, + claimApproval, computeTaskApprovalHash, approvalPolicyRequiresApproval, approvalPolicyAutoRejects, @@ -525,3 +527,166 @@ test('ssh-signed grant: round-trip with allowed_signers auto-bootstrap', async ( rmSync(home, { recursive: true, force: true }); } }); + +// --- Concurrency: claimApproval is atomic across workers --- + +const CLAIM_WORKER_SOURCE = ` +import { parentPort, workerData } from 'node:worker_threads'; +import { claimApproval } from '${new URL('../src/approvals.js', import.meta.url).href}'; + +const { workflowId, taskId, taskHash, env, executionId, approvalId } = workerData; +try { + const grant = claimApproval({ + workflowId, taskId, taskHash, + approvalId: approvalId || undefined, + executionId, + env, + }); + parentPort.postMessage({ ok: true, grant }); +} catch (err) { + parentPort.postMessage({ ok: false, error: err.message, code: err.code }); +} +`; + +function runClaimWorker({ workflowId, taskId, taskHash, env, executionId, approvalId }) { + return new Promise((resolve, reject) => { + const worker = new Worker(CLAIM_WORKER_SOURCE, { + eval: true, + workerData: { workflowId, taskId, taskHash, env, executionId, approvalId }, + }); + worker.once('message', resolve); + worker.once('error', reject); + }); +} + +test('concurrency: N parallel claims on one grant serialize to exactly one winner', async () => { + const { env, cleanup } = isolatedEnv(); + try { + const m = makeManifest({ approval: { policy: 'manual', risk_level: 'medium' } }); + const task = m.workflows[0].tasks[0]; + const taskHash = computeTaskApprovalHash({ workflowId: 'test-wf', task }); + const rec = grantApproval({ + manifest: m, + taskId: 'echo-task', + approver: 'alice', + env, + }); + + const N = 8; + const workerArgs = Array.from({ length: N }, (_, i) => ({ + workflowId: 'test-wf', + taskId: 'echo-task', + taskHash, + env, + executionId: `exec-${i}`, + approvalId: rec.approval_id, + })); + const results = await Promise.all(workerArgs.map(runClaimWorker)); + + const winners = results.filter(r => r.ok && r.grant); + const empties = results.filter(r => r.ok && !r.grant); + const errors = results.filter(r => !r.ok); + + assert.equal(winners.length, 1, `expected exactly one winner, got ${winners.length}`); + assert.equal(empties.length, N - 1, 'losers should receive null, not errors'); + assert.equal(errors.length, 0, `no worker should error: ${JSON.stringify(errors)}`); + assert.equal(winners[0].grant.approval_id, rec.approval_id); + + // Only one consume event was written + const list = listApprovals({ env }); + assert.equal(list.length, 1); + assert.equal(list[0].status, 'consumed'); + } finally { + cleanup(); + } +}); + +test('concurrency: two pending grants + two concurrent claims → both succeed with distinct grants', async () => { + const { env, cleanup } = isolatedEnv(); + try { + const m = makeManifest({ approval: { policy: 'manual', risk_level: 'medium' } }); + const task = m.workflows[0].tasks[0]; + const taskHash = computeTaskApprovalHash({ workflowId: 'test-wf', task }); + const a = grantApproval({ manifest: m, taskId: 'echo-task', approver: 'alice', env }); + const b = grantApproval({ manifest: m, taskId: 'echo-task', approver: 'bob', env }); + + const workerArgs = [ + { workflowId: 'test-wf', taskId: 'echo-task', taskHash, env, executionId: 'exec-1' }, + { workflowId: 'test-wf', taskId: 'echo-task', taskHash, env, executionId: 'exec-2' }, + ]; + const results = await Promise.all(workerArgs.map(runClaimWorker)); + const winners = results.filter(r => r.ok && r.grant); + assert.equal(winners.length, 2, 'both claims should succeed because two grants exist'); + const ids = new Set(winners.map(w => w.grant.approval_id)); + assert.equal(ids.size, 2, 'winners should hold distinct grants'); + assert.ok(ids.has(a.approval_id)); + assert.ok(ids.has(b.approval_id)); + } finally { + cleanup(); + } +}); + +test('concurrency: stale lock is broken and claim proceeds', () => { + const { env, cleanup } = isolatedEnv(); + try { + const m = makeManifest({ approval: { policy: 'manual', risk_level: 'low' } }); + const rec = grantApproval({ manifest: m, taskId: 'echo-task', approver: 'alice', env }); + const taskHash = rec.task_hash; + + // Plant a lock file dated far in the past to simulate a crashed holder. + const paths = getAgentcliPaths({ env }); + const lockPath = `${paths.approvals}.lock`; + mkdirSync(paths.state, { recursive: true }); + writeFileSync(lockPath, 'stale pid\n', 'utf8'); + const hourAgo = (Date.now() - 60 * 60 * 1000) / 1000; + utimesSync(lockPath, hourAgo, hourAgo); + + // Claim should detect staleness, break the lock, and succeed. + const grant = claimApproval({ + workflowId: 'test-wf', + taskId: 'echo-task', + taskHash, + executionId: 'exec-stale', + env, + }); + assert.ok(grant, 'claim should succeed after breaking stale lock'); + assert.equal(grant.approval_id, rec.approval_id); + + // Lock file cleaned up after claim + assert.equal(existsSync(lockPath), false); + } finally { + cleanup(); + } +}); + +test('concurrency: lock held past timeout throws approval_lock_timeout', () => { + const { env, cleanup } = isolatedEnv(); + try { + const m = makeManifest({ approval: { policy: 'manual', risk_level: 'low' } }); + const rec = grantApproval({ manifest: m, taskId: 'echo-task', approver: 'alice', env }); + + // Plant a fresh lock file so the claim cannot acquire it. + const paths = getAgentcliPaths({ env }); + const lockPath = `${paths.approvals}.lock`; + mkdirSync(paths.state, { recursive: true }); + writeFileSync(lockPath, 'another pid\n', 'utf8'); + // Leave mtime fresh (now) so staleness check doesn't fire. + + assert.throws( + () => claimApproval({ + workflowId: 'test-wf', + taskId: 'echo-task', + taskHash: rec.task_hash, + executionId: 'exec-blocked', + env, + lockOptions: { timeoutMs: 100, staleMs: 60000, pollMs: 20 }, + }), + err => err.code === 'approval_lock_timeout' + ); + + // Clean up the planted lock + rmSync(lockPath, { force: true }); + } finally { + cleanup(); + } +});