Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Changelog

## 0.3.1 (2026-04-21)

- fix: concurrent `agentcli exec` calls on the same gated task no longer double-consume a single approval. A new `claimApproval` primitive performs find-then-consume atomically under an fs-lockfile at `~/.agentcli/state/approvals.ndjson.lock` (openSync `wx` + `Atomics.wait` backoff); stale locks (older than 30s) are broken; `approval_lock_timeout` is raised after 5s of contention
- `enforceApprovalGate` in `src/exec.js` now calls `claimApproval` instead of the separate `findValidApproval` + `consumeApproval` pair; signature verification still runs after the atomic claim and a bad signature refuses execution (the grant is already consumed and its use is audited)
- `claimApproval` added to the barrel export (`src/index.js`)
- 4 new concurrency tests: worker-thread race (N=8 parallel claims, exactly 1 winner), distinct-grant parallelism, stale-lock recovery, lock-timeout behavior

## 0.3.0 (2026-04-21)

- local approval gate enforcement in `agentcli exec`: tasks with `approval.policy: "manual"` refuse to execute unless a matching, unconsumed, unrevoked, unexpired approval record is present (`error_type: approval_required`)
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@amittell/agentcli",
"version": "0.3.0",
"version": "0.3.1",
"description": "Agent-native workflow manifest CLI, schema, and protocol surface for standalone planning and backend compilation.",
"type": "module",
"main": "./src/index.js",
Expand Down
111 changes: 104 additions & 7 deletions src/approvals.js
Original file line number Diff line number Diff line change
@@ -1,19 +1,81 @@
import { appendFileSync, readFileSync, existsSync, mkdirSync } from 'node:fs';
import {
appendFileSync, readFileSync, existsSync, mkdirSync,
openSync, closeSync, writeSync, unlinkSync, statSync,
} from 'node:fs';
import { dirname } from 'node:path';
import { createHash, randomBytes } from 'node:crypto';
import { getProvider, resolveProvider } from './signing/index.js';
import { resolveAllowedSigners, generateAllowedSigners } from './signing/ssh.js';
import { getAgentcliPaths } from './home.js';

// Concurrency note: grantApproval, findValidApproval, and consumeApproval read
// and append to an NDJSON log without advisory locking. Two concurrent
// `agentcli exec` invocations of the same gated task can both observe the
// same pending grant and both consume it. This is an accepted limitation of
// the local single-machine enforcement model; workflows that require
// multi-dispatcher approval coordination should use openclaw-scheduler.
// Concurrency: `claimApproval` is the atomic public primitive that
// enforceApprovalGate uses. It acquires an fs-lock on <approvals>.lock
// (openSync 'wx'), re-reads the log inside the critical section, finds a
// matching pending grant, appends a consume event, and releases the lock.
// Concurrent claims of the same grant serialize to exactly one winner;
// losers re-read and either find a different pending grant or throw
// approval_required. Locks older than LOCK_STALE_MS are treated as
// abandoned (crashed holder) and removed.

const APPROVAL_RECORD_VERSION = 1;
const DEFAULT_TTL_S = 3600;
const LOCK_SUFFIX = '.lock';
const LOCK_TIMEOUT_MS = 5000;
const LOCK_STALE_MS = 30000;
const LOCK_POLL_MS = 25;

// Shared memory used for sync sleep during lock backoff. Allocated once per
// process rather than per-retry.
const LOCK_SLEEP_BUF = new Int32Array(new SharedArrayBuffer(4));

function sleepSync(ms) {
Atomics.wait(LOCK_SLEEP_BUF, 0, 0, ms);
}

function withApprovalsLock(approvalsPath, fn, {
timeoutMs = LOCK_TIMEOUT_MS,
staleMs = LOCK_STALE_MS,
pollMs = LOCK_POLL_MS,
now = () => Date.now(),
} = {}) {
mkdirSync(dirname(approvalsPath), { recursive: true });
const lockPath = `${approvalsPath}${LOCK_SUFFIX}`;
const deadline = now() + timeoutMs;
let fd;
while (true) {
try {
fd = openSync(lockPath, 'wx');
writeSync(fd, `${process.pid}\n`);
break;
} catch (err) {
if (err.code !== 'EEXIST') throw err;
// Lock held. Check staleness and potentially break it.
try {
const st = statSync(lockPath);
if (now() - st.mtimeMs > staleMs) {
try { unlinkSync(lockPath); } catch { /* someone else cleaned up */ }
continue;
}
} catch {
// Lock vanished between EEXIST and stat; retry immediately.
continue;
}
if (now() >= deadline) {
throw Object.assign(
new Error(`Timed out acquiring approvals lock at ${lockPath} after ${timeoutMs}ms`),
{ code: 'approval_lock_timeout' }
);
}
sleepSync(pollMs);
}
}
try {
closeSync(fd);
return fn();
} finally {
try { unlinkSync(lockPath); } catch { /* already removed */ }
}
}

export function approvalPolicyRequiresApproval(approval) {
if (!approval) return false;
Expand Down Expand Up @@ -162,6 +224,41 @@ export function findValidApproval({
return candidates[0];
}

// Atomically find a matching pending grant and mark it consumed.
// Returns the consumed grant object, or null if no match. Concurrent callers
// serialize on the approvals lockfile; at most one wins per grant.
export function claimApproval({
workflowId,
taskId,
taskHash,
approvalId,
executionId,
env = process.env,
now = () => Date.now(),
lockOptions,
}) {
const paths = getAgentcliPaths({ env });
return withApprovalsLock(paths.approvals, () => {
const grant = findValidApproval({
workflowId, taskId, taskHash, approvalId, env, now: now(),
});
if (!grant) return null;
const consumedAt = new Date(now()).toISOString();
appendFileSync(
paths.approvals,
JSON.stringify({
v: APPROVAL_RECORD_VERSION,
kind: 'consume',
approval_id: grant.approval_id,
execution_id: executionId,
consumed_at: consumedAt,
}) + '\n',
'utf8'
);
return grant;
}, lockOptions);
}

function buildApprovalSignaturePayload(grant) {
return canonicalStringify({
v: APPROVAL_RECORD_VERSION,
Expand Down
2 changes: 1 addition & 1 deletion src/describe.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ export const COMMAND_DESCRIPTIONS = [
},
{
command: 'approve',
summary: 'Grant a local, single-use, ssh-signed approval record for a task whose approval.policy is "manual". Required before agentcli exec will run the task.'
summary: 'Grant a local, single-use, ssh-signed approval record for a task whose approval.policy is "manual". Required before agentcli exec will run the task. Concurrent execs serialize on an fs-lockfile so at most one consumer wins per grant.'
},
{
command: 'approvals',
Expand Down
10 changes: 6 additions & 4 deletions src/exec.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ import {
approvalPolicyRequiresApproval,
approvalPolicyAutoRejects,
computeTaskApprovalHash,
findValidApproval,
consumeApproval,
claimApproval,
verifyApprovalSignature,
} from './approvals.js';

Expand Down Expand Up @@ -468,11 +467,12 @@ function enforceApprovalGate({ workflow, task, executionId, approvalId, env }) {
}
if (!approvalPolicyRequiresApproval(task.approval)) return null;
const taskHash = computeTaskApprovalHash({ workflowId: workflow.id, task });
const grant = findValidApproval({
const grant = claimApproval({
workflowId: workflow.id,
taskId: task.id,
taskHash,
approvalId,
executionId,
env,
});
if (!grant) {
Expand All @@ -489,6 +489,9 @@ function enforceApprovalGate({ workflow, task, executionId, approvalId, env }) {
}
const sigCheck = verifyApprovalSignature(grant, { env });
if (sigCheck.verified === false) {
// We have already consumed the grant atomically; a bad signature means
// the record is corrupt or forged. Refuse execution and surface the
// consumption in error context so an operator can audit.
throw Object.assign(
new Error(
`Approval ${grant.approval_id} signature verification failed: ${sigCheck.reason || 'invalid signature'}. ` +
Expand All @@ -497,7 +500,6 @@ function enforceApprovalGate({ workflow, task, executionId, approvalId, env }) {
{ code: 'approval_signature_invalid' }
);
}
consumeApproval({ approvalId: grant.approval_id, executionId, env });
return {
approval_id: grant.approval_id,
task_hash: grant.task_hash,
Expand Down
1 change: 1 addition & 0 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ export {
listApprovals,
findValidApproval,
consumeApproval,
claimApproval,
revokeApproval,
computeTaskApprovalHash,
approvalPolicyRequiresApproval,
Expand Down
167 changes: 166 additions & 1 deletion test/approvals.test.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
import test from 'node:test';
import assert from 'node:assert/strict';
import { mkdtempSync, readFileSync, appendFileSync, existsSync, rmSync } from 'node:fs';
import { mkdtempSync, readFileSync, writeFileSync, appendFileSync, existsSync, rmSync, mkdirSync, utimesSync } from 'node:fs';
import { join } from 'node:path';
import { tmpdir } from 'node:os';
import { Worker } from 'node:worker_threads';

import {
grantApproval,
listApprovals,
findValidApproval,
consumeApproval,
revokeApproval,
claimApproval,
computeTaskApprovalHash,
approvalPolicyRequiresApproval,
approvalPolicyAutoRejects,
Expand Down Expand Up @@ -525,3 +527,166 @@ test('ssh-signed grant: round-trip with allowed_signers auto-bootstrap', async (
rmSync(home, { recursive: true, force: true });
}
});

// --- Concurrency: claimApproval is atomic across workers ---

const CLAIM_WORKER_SOURCE = `
import { parentPort, workerData } from 'node:worker_threads';
import { claimApproval } from '${new URL('../src/approvals.js', import.meta.url).href}';

const { workflowId, taskId, taskHash, env, executionId, approvalId } = workerData;
try {
const grant = claimApproval({
workflowId, taskId, taskHash,
approvalId: approvalId || undefined,
executionId,
env,
});
parentPort.postMessage({ ok: true, grant });
} catch (err) {
parentPort.postMessage({ ok: false, error: err.message, code: err.code });
}
`;

function runClaimWorker({ workflowId, taskId, taskHash, env, executionId, approvalId }) {
return new Promise((resolve, reject) => {
const worker = new Worker(CLAIM_WORKER_SOURCE, {
eval: true,
workerData: { workflowId, taskId, taskHash, env, executionId, approvalId },
});
worker.once('message', resolve);
worker.once('error', reject);
});
}

test('concurrency: N parallel claims on one grant serialize to exactly one winner', async () => {
const { env, cleanup } = isolatedEnv();
try {
const m = makeManifest({ approval: { policy: 'manual', risk_level: 'medium' } });
const task = m.workflows[0].tasks[0];
const taskHash = computeTaskApprovalHash({ workflowId: 'test-wf', task });
const rec = grantApproval({
manifest: m,
taskId: 'echo-task',
approver: 'alice',
env,
});

const N = 8;
const workerArgs = Array.from({ length: N }, (_, i) => ({
workflowId: 'test-wf',
taskId: 'echo-task',
taskHash,
env,
executionId: `exec-${i}`,
approvalId: rec.approval_id,
}));
const results = await Promise.all(workerArgs.map(runClaimWorker));

const winners = results.filter(r => r.ok && r.grant);
const empties = results.filter(r => r.ok && !r.grant);
const errors = results.filter(r => !r.ok);

assert.equal(winners.length, 1, `expected exactly one winner, got ${winners.length}`);
assert.equal(empties.length, N - 1, 'losers should receive null, not errors');
assert.equal(errors.length, 0, `no worker should error: ${JSON.stringify(errors)}`);
assert.equal(winners[0].grant.approval_id, rec.approval_id);

// Only one consume event was written
const list = listApprovals({ env });
assert.equal(list.length, 1);
assert.equal(list[0].status, 'consumed');
} finally {
cleanup();
}
});

test('concurrency: two pending grants + two concurrent claims → both succeed with distinct grants', async () => {
const { env, cleanup } = isolatedEnv();
try {
const m = makeManifest({ approval: { policy: 'manual', risk_level: 'medium' } });
const task = m.workflows[0].tasks[0];
const taskHash = computeTaskApprovalHash({ workflowId: 'test-wf', task });
const a = grantApproval({ manifest: m, taskId: 'echo-task', approver: 'alice', env });
const b = grantApproval({ manifest: m, taskId: 'echo-task', approver: 'bob', env });

const workerArgs = [
{ workflowId: 'test-wf', taskId: 'echo-task', taskHash, env, executionId: 'exec-1' },
{ workflowId: 'test-wf', taskId: 'echo-task', taskHash, env, executionId: 'exec-2' },
];
const results = await Promise.all(workerArgs.map(runClaimWorker));
const winners = results.filter(r => r.ok && r.grant);
assert.equal(winners.length, 2, 'both claims should succeed because two grants exist');
const ids = new Set(winners.map(w => w.grant.approval_id));
assert.equal(ids.size, 2, 'winners should hold distinct grants');
assert.ok(ids.has(a.approval_id));
assert.ok(ids.has(b.approval_id));
} finally {
cleanup();
}
});

test('concurrency: stale lock is broken and claim proceeds', () => {
const { env, cleanup } = isolatedEnv();
try {
const m = makeManifest({ approval: { policy: 'manual', risk_level: 'low' } });
const rec = grantApproval({ manifest: m, taskId: 'echo-task', approver: 'alice', env });
const taskHash = rec.task_hash;

// Plant a lock file dated far in the past to simulate a crashed holder.
const paths = getAgentcliPaths({ env });
const lockPath = `${paths.approvals}.lock`;
mkdirSync(paths.state, { recursive: true });
writeFileSync(lockPath, 'stale pid\n', 'utf8');
const hourAgo = (Date.now() - 60 * 60 * 1000) / 1000;
utimesSync(lockPath, hourAgo, hourAgo);

// Claim should detect staleness, break the lock, and succeed.
const grant = claimApproval({
workflowId: 'test-wf',
taskId: 'echo-task',
taskHash,
executionId: 'exec-stale',
env,
});
assert.ok(grant, 'claim should succeed after breaking stale lock');
assert.equal(grant.approval_id, rec.approval_id);

// Lock file cleaned up after claim
assert.equal(existsSync(lockPath), false);
} finally {
cleanup();
}
});

test('concurrency: lock held past timeout throws approval_lock_timeout', () => {
const { env, cleanup } = isolatedEnv();
try {
const m = makeManifest({ approval: { policy: 'manual', risk_level: 'low' } });
const rec = grantApproval({ manifest: m, taskId: 'echo-task', approver: 'alice', env });

// Plant a fresh lock file so the claim cannot acquire it.
const paths = getAgentcliPaths({ env });
const lockPath = `${paths.approvals}.lock`;
mkdirSync(paths.state, { recursive: true });
writeFileSync(lockPath, 'another pid\n', 'utf8');
// Leave mtime fresh (now) so staleness check doesn't fire.

assert.throws(
() => claimApproval({
workflowId: 'test-wf',
taskId: 'echo-task',
taskHash: rec.task_hash,
executionId: 'exec-blocked',
env,
lockOptions: { timeoutMs: 100, staleMs: 60000, pollMs: 20 },
}),
err => err.code === 'approval_lock_timeout'
);

// Clean up the planted lock
rmSync(lockPath, { force: true });
} finally {
cleanup();
}
});