From fd566e0b388eae6715a47c1b42190177cdbad28e Mon Sep 17 00:00:00 2001 From: amittell Date: Tue, 21 Apr 2026 18:04:19 -0400 Subject: [PATCH] feat(approvals): atomic claim to fix concurrent-exec race MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two concurrent agentcli exec calls on the same gated task could both observe the same pending grant and both append consume events, allowing both to execute from one approval. Fixed by introducing claimApproval, a single atomic primitive that performs find-then-consume under an fs-lockfile. Lock implementation (no new deps): - openSync(lockPath, 'wx') for atomic creation - Atomics.wait on SharedArrayBuffer for sync backoff during contention - Stale locks (older than 30s) are treated as abandoned and broken - approval_lock_timeout (default 5s) surfaces genuine contention enforceApprovalGate in src/exec.js now calls claimApproval instead of findValidApproval + consumeApproval separately. Signature verification still runs after the atomic claim; a bad signature refuses execution with approval_signature_invalid (the consumed grant is audited). Tests added (4): - N=8 worker threads racing on one grant → exactly 1 winner - 2 pending grants + 2 concurrent claims → both succeed, distinct grants - Stale lock (hour-old mtime) is broken and claim proceeds - Fresh lock held past timeout raises approval_lock_timeout claimApproval added to the barrel export. Previously-exported consumeApproval remains for callers that consume without claiming; approvals.js module comment updated to describe the new atomicity. Release cut to 0.3.1 (bug fix, no schema or CLI surface changes). --- CHANGELOG.md | 7 ++ package.json | 2 +- src/approvals.js | 111 +++++++++++++++++++++++++-- src/describe.js | 2 +- src/exec.js | 10 ++- src/index.js | 1 + test/approvals.test.js | 167 ++++++++++++++++++++++++++++++++++++++++- 7 files changed, 286 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1b08ed4..69bd925 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ # Changelog +## 0.3.1 (2026-04-21) + +- fix: concurrent `agentcli exec` calls on the same gated task no longer double-consume a single approval. A new `claimApproval` primitive performs find-then-consume atomically under an fs-lockfile at `~/.agentcli/state/approvals.ndjson.lock` (openSync `wx` + `Atomics.wait` backoff); stale locks (older than 30s) are broken; `approval_lock_timeout` is raised after 5s of contention +- `enforceApprovalGate` in `src/exec.js` now calls `claimApproval` instead of the separate `findValidApproval` + `consumeApproval` pair; signature verification still runs after the atomic claim and a bad signature refuses execution (the grant is already consumed and its use is audited) +- `claimApproval` added to the barrel export (`src/index.js`) +- 4 new concurrency tests: worker-thread race (N=8 parallel claims, exactly 1 winner), distinct-grant parallelism, stale-lock recovery, lock-timeout behavior + ## 0.3.0 (2026-04-21) - local approval gate enforcement in `agentcli exec`: tasks with `approval.policy: "manual"` refuse to execute unless a matching, unconsumed, unrevoked, unexpired approval record is present (`error_type: approval_required`) diff --git a/package.json b/package.json index e5f569b..9ea170b 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@amittell/agentcli", - "version": "0.3.0", + "version": "0.3.1", "description": "Agent-native workflow manifest CLI, schema, and protocol surface for standalone planning and backend compilation.", "type": "module", "main": "./src/index.js", diff --git a/src/approvals.js b/src/approvals.js index 5362bab..f5cb268 100644 --- a/src/approvals.js +++ b/src/approvals.js @@ -1,19 +1,81 @@ -import { appendFileSync, readFileSync, existsSync, mkdirSync } from 'node:fs'; +import { + appendFileSync, readFileSync, existsSync, mkdirSync, + openSync, closeSync, writeSync, unlinkSync, statSync, +} from 'node:fs'; import { dirname } from 'node:path'; import { createHash, randomBytes } from 'node:crypto'; import { getProvider, resolveProvider } from './signing/index.js'; import { resolveAllowedSigners, generateAllowedSigners } from './signing/ssh.js'; import { getAgentcliPaths } from './home.js'; -// Concurrency note: grantApproval, findValidApproval, and consumeApproval read -// and append to an NDJSON log without advisory locking. Two concurrent -// `agentcli exec` invocations of the same gated task can both observe the -// same pending grant and both consume it. This is an accepted limitation of -// the local single-machine enforcement model; workflows that require -// multi-dispatcher approval coordination should use openclaw-scheduler. +// Concurrency: `claimApproval` is the atomic public primitive that +// enforceApprovalGate uses. It acquires an fs-lock on .lock +// (openSync 'wx'), re-reads the log inside the critical section, finds a +// matching pending grant, appends a consume event, and releases the lock. +// Concurrent claims of the same grant serialize to exactly one winner; +// losers re-read and either find a different pending grant or throw +// approval_required. Locks older than LOCK_STALE_MS are treated as +// abandoned (crashed holder) and removed. const APPROVAL_RECORD_VERSION = 1; const DEFAULT_TTL_S = 3600; +const LOCK_SUFFIX = '.lock'; +const LOCK_TIMEOUT_MS = 5000; +const LOCK_STALE_MS = 30000; +const LOCK_POLL_MS = 25; + +// Shared memory used for sync sleep during lock backoff. Allocated once per +// process rather than per-retry. +const LOCK_SLEEP_BUF = new Int32Array(new SharedArrayBuffer(4)); + +function sleepSync(ms) { + Atomics.wait(LOCK_SLEEP_BUF, 0, 0, ms); +} + +function withApprovalsLock(approvalsPath, fn, { + timeoutMs = LOCK_TIMEOUT_MS, + staleMs = LOCK_STALE_MS, + pollMs = LOCK_POLL_MS, + now = () => Date.now(), +} = {}) { + mkdirSync(dirname(approvalsPath), { recursive: true }); + const lockPath = `${approvalsPath}${LOCK_SUFFIX}`; + const deadline = now() + timeoutMs; + let fd; + while (true) { + try { + fd = openSync(lockPath, 'wx'); + writeSync(fd, `${process.pid}\n`); + break; + } catch (err) { + if (err.code !== 'EEXIST') throw err; + // Lock held. Check staleness and potentially break it. + try { + const st = statSync(lockPath); + if (now() - st.mtimeMs > staleMs) { + try { unlinkSync(lockPath); } catch { /* someone else cleaned up */ } + continue; + } + } catch { + // Lock vanished between EEXIST and stat; retry immediately. + continue; + } + if (now() >= deadline) { + throw Object.assign( + new Error(`Timed out acquiring approvals lock at ${lockPath} after ${timeoutMs}ms`), + { code: 'approval_lock_timeout' } + ); + } + sleepSync(pollMs); + } + } + try { + closeSync(fd); + return fn(); + } finally { + try { unlinkSync(lockPath); } catch { /* already removed */ } + } +} export function approvalPolicyRequiresApproval(approval) { if (!approval) return false; @@ -162,6 +224,41 @@ export function findValidApproval({ return candidates[0]; } +// Atomically find a matching pending grant and mark it consumed. +// Returns the consumed grant object, or null if no match. Concurrent callers +// serialize on the approvals lockfile; at most one wins per grant. +export function claimApproval({ + workflowId, + taskId, + taskHash, + approvalId, + executionId, + env = process.env, + now = () => Date.now(), + lockOptions, +}) { + const paths = getAgentcliPaths({ env }); + return withApprovalsLock(paths.approvals, () => { + const grant = findValidApproval({ + workflowId, taskId, taskHash, approvalId, env, now: now(), + }); + if (!grant) return null; + const consumedAt = new Date(now()).toISOString(); + appendFileSync( + paths.approvals, + JSON.stringify({ + v: APPROVAL_RECORD_VERSION, + kind: 'consume', + approval_id: grant.approval_id, + execution_id: executionId, + consumed_at: consumedAt, + }) + '\n', + 'utf8' + ); + return grant; + }, lockOptions); +} + function buildApprovalSignaturePayload(grant) { return canonicalStringify({ v: APPROVAL_RECORD_VERSION, diff --git a/src/describe.js b/src/describe.js index 77e9f48..74bc3d2 100644 --- a/src/describe.js +++ b/src/describe.js @@ -76,7 +76,7 @@ export const COMMAND_DESCRIPTIONS = [ }, { command: 'approve', - summary: 'Grant a local, single-use, ssh-signed approval record for a task whose approval.policy is "manual". Required before agentcli exec will run the task.' + summary: 'Grant a local, single-use, ssh-signed approval record for a task whose approval.policy is "manual". Required before agentcli exec will run the task. Concurrent execs serialize on an fs-lockfile so at most one consumer wins per grant.' }, { command: 'approvals', diff --git a/src/exec.js b/src/exec.js index 40d1156..f032704 100644 --- a/src/exec.js +++ b/src/exec.js @@ -28,8 +28,7 @@ import { approvalPolicyRequiresApproval, approvalPolicyAutoRejects, computeTaskApprovalHash, - findValidApproval, - consumeApproval, + claimApproval, verifyApprovalSignature, } from './approvals.js'; @@ -468,11 +467,12 @@ function enforceApprovalGate({ workflow, task, executionId, approvalId, env }) { } if (!approvalPolicyRequiresApproval(task.approval)) return null; const taskHash = computeTaskApprovalHash({ workflowId: workflow.id, task }); - const grant = findValidApproval({ + const grant = claimApproval({ workflowId: workflow.id, taskId: task.id, taskHash, approvalId, + executionId, env, }); if (!grant) { @@ -489,6 +489,9 @@ function enforceApprovalGate({ workflow, task, executionId, approvalId, env }) { } const sigCheck = verifyApprovalSignature(grant, { env }); if (sigCheck.verified === false) { + // We have already consumed the grant atomically; a bad signature means + // the record is corrupt or forged. Refuse execution and surface the + // consumption in error context so an operator can audit. throw Object.assign( new Error( `Approval ${grant.approval_id} signature verification failed: ${sigCheck.reason || 'invalid signature'}. ` + @@ -497,7 +500,6 @@ function enforceApprovalGate({ workflow, task, executionId, approvalId, env }) { { code: 'approval_signature_invalid' } ); } - consumeApproval({ approvalId: grant.approval_id, executionId, env }); return { approval_id: grant.approval_id, task_hash: grant.task_hash, diff --git a/src/index.js b/src/index.js index 844f7ec..2cd5a64 100644 --- a/src/index.js +++ b/src/index.js @@ -35,6 +35,7 @@ export { listApprovals, findValidApproval, consumeApproval, + claimApproval, revokeApproval, computeTaskApprovalHash, approvalPolicyRequiresApproval, diff --git a/test/approvals.test.js b/test/approvals.test.js index dc914ee..12d6726 100644 --- a/test/approvals.test.js +++ b/test/approvals.test.js @@ -1,8 +1,9 @@ import test from 'node:test'; import assert from 'node:assert/strict'; -import { mkdtempSync, readFileSync, appendFileSync, existsSync, rmSync } from 'node:fs'; +import { mkdtempSync, readFileSync, writeFileSync, appendFileSync, existsSync, rmSync, mkdirSync, utimesSync } from 'node:fs'; import { join } from 'node:path'; import { tmpdir } from 'node:os'; +import { Worker } from 'node:worker_threads'; import { grantApproval, @@ -10,6 +11,7 @@ import { findValidApproval, consumeApproval, revokeApproval, + claimApproval, computeTaskApprovalHash, approvalPolicyRequiresApproval, approvalPolicyAutoRejects, @@ -525,3 +527,166 @@ test('ssh-signed grant: round-trip with allowed_signers auto-bootstrap', async ( rmSync(home, { recursive: true, force: true }); } }); + +// --- Concurrency: claimApproval is atomic across workers --- + +const CLAIM_WORKER_SOURCE = ` +import { parentPort, workerData } from 'node:worker_threads'; +import { claimApproval } from '${new URL('../src/approvals.js', import.meta.url).href}'; + +const { workflowId, taskId, taskHash, env, executionId, approvalId } = workerData; +try { + const grant = claimApproval({ + workflowId, taskId, taskHash, + approvalId: approvalId || undefined, + executionId, + env, + }); + parentPort.postMessage({ ok: true, grant }); +} catch (err) { + parentPort.postMessage({ ok: false, error: err.message, code: err.code }); +} +`; + +function runClaimWorker({ workflowId, taskId, taskHash, env, executionId, approvalId }) { + return new Promise((resolve, reject) => { + const worker = new Worker(CLAIM_WORKER_SOURCE, { + eval: true, + workerData: { workflowId, taskId, taskHash, env, executionId, approvalId }, + }); + worker.once('message', resolve); + worker.once('error', reject); + }); +} + +test('concurrency: N parallel claims on one grant serialize to exactly one winner', async () => { + const { env, cleanup } = isolatedEnv(); + try { + const m = makeManifest({ approval: { policy: 'manual', risk_level: 'medium' } }); + const task = m.workflows[0].tasks[0]; + const taskHash = computeTaskApprovalHash({ workflowId: 'test-wf', task }); + const rec = grantApproval({ + manifest: m, + taskId: 'echo-task', + approver: 'alice', + env, + }); + + const N = 8; + const workerArgs = Array.from({ length: N }, (_, i) => ({ + workflowId: 'test-wf', + taskId: 'echo-task', + taskHash, + env, + executionId: `exec-${i}`, + approvalId: rec.approval_id, + })); + const results = await Promise.all(workerArgs.map(runClaimWorker)); + + const winners = results.filter(r => r.ok && r.grant); + const empties = results.filter(r => r.ok && !r.grant); + const errors = results.filter(r => !r.ok); + + assert.equal(winners.length, 1, `expected exactly one winner, got ${winners.length}`); + assert.equal(empties.length, N - 1, 'losers should receive null, not errors'); + assert.equal(errors.length, 0, `no worker should error: ${JSON.stringify(errors)}`); + assert.equal(winners[0].grant.approval_id, rec.approval_id); + + // Only one consume event was written + const list = listApprovals({ env }); + assert.equal(list.length, 1); + assert.equal(list[0].status, 'consumed'); + } finally { + cleanup(); + } +}); + +test('concurrency: two pending grants + two concurrent claims → both succeed with distinct grants', async () => { + const { env, cleanup } = isolatedEnv(); + try { + const m = makeManifest({ approval: { policy: 'manual', risk_level: 'medium' } }); + const task = m.workflows[0].tasks[0]; + const taskHash = computeTaskApprovalHash({ workflowId: 'test-wf', task }); + const a = grantApproval({ manifest: m, taskId: 'echo-task', approver: 'alice', env }); + const b = grantApproval({ manifest: m, taskId: 'echo-task', approver: 'bob', env }); + + const workerArgs = [ + { workflowId: 'test-wf', taskId: 'echo-task', taskHash, env, executionId: 'exec-1' }, + { workflowId: 'test-wf', taskId: 'echo-task', taskHash, env, executionId: 'exec-2' }, + ]; + const results = await Promise.all(workerArgs.map(runClaimWorker)); + const winners = results.filter(r => r.ok && r.grant); + assert.equal(winners.length, 2, 'both claims should succeed because two grants exist'); + const ids = new Set(winners.map(w => w.grant.approval_id)); + assert.equal(ids.size, 2, 'winners should hold distinct grants'); + assert.ok(ids.has(a.approval_id)); + assert.ok(ids.has(b.approval_id)); + } finally { + cleanup(); + } +}); + +test('concurrency: stale lock is broken and claim proceeds', () => { + const { env, cleanup } = isolatedEnv(); + try { + const m = makeManifest({ approval: { policy: 'manual', risk_level: 'low' } }); + const rec = grantApproval({ manifest: m, taskId: 'echo-task', approver: 'alice', env }); + const taskHash = rec.task_hash; + + // Plant a lock file dated far in the past to simulate a crashed holder. + const paths = getAgentcliPaths({ env }); + const lockPath = `${paths.approvals}.lock`; + mkdirSync(paths.state, { recursive: true }); + writeFileSync(lockPath, 'stale pid\n', 'utf8'); + const hourAgo = (Date.now() - 60 * 60 * 1000) / 1000; + utimesSync(lockPath, hourAgo, hourAgo); + + // Claim should detect staleness, break the lock, and succeed. + const grant = claimApproval({ + workflowId: 'test-wf', + taskId: 'echo-task', + taskHash, + executionId: 'exec-stale', + env, + }); + assert.ok(grant, 'claim should succeed after breaking stale lock'); + assert.equal(grant.approval_id, rec.approval_id); + + // Lock file cleaned up after claim + assert.equal(existsSync(lockPath), false); + } finally { + cleanup(); + } +}); + +test('concurrency: lock held past timeout throws approval_lock_timeout', () => { + const { env, cleanup } = isolatedEnv(); + try { + const m = makeManifest({ approval: { policy: 'manual', risk_level: 'low' } }); + const rec = grantApproval({ manifest: m, taskId: 'echo-task', approver: 'alice', env }); + + // Plant a fresh lock file so the claim cannot acquire it. + const paths = getAgentcliPaths({ env }); + const lockPath = `${paths.approvals}.lock`; + mkdirSync(paths.state, { recursive: true }); + writeFileSync(lockPath, 'another pid\n', 'utf8'); + // Leave mtime fresh (now) so staleness check doesn't fire. + + assert.throws( + () => claimApproval({ + workflowId: 'test-wf', + taskId: 'echo-task', + taskHash: rec.task_hash, + executionId: 'exec-blocked', + env, + lockOptions: { timeoutMs: 100, staleMs: 60000, pollMs: 20 }, + }), + err => err.code === 'approval_lock_timeout' + ); + + // Clean up the planted lock + rmSync(lockPath, { force: true }); + } finally { + cleanup(); + } +});