From f5265710d77d4960b23c8028ec03cf1706346876 Mon Sep 17 00:00:00 2001 From: Oleg Bk Date: Wed, 27 May 2026 20:13:55 +0200 Subject: [PATCH 1/3] feat(mcp-proxy): close customer cold path Record verified Runtime Gate ALLOW/BLOCK decisions in local evidence, add backend registration and doctor preflight commands, and document the customer quickstart from init through offline verification. Includes tests for evidence export, backend preflight, registration edge cases, and receipt verification. Implemented with assistance from Codex. --- agentveil_mcp_proxy/README.md | 9 + agentveil_mcp_proxy/approval/manager.py | 64 +++- agentveil_mcp_proxy/cli.py | 233 ++++++++++++++- agentveil_mcp_proxy/evidence/store.py | 3 + agentveil_mcp_proxy/passthrough.py | 38 ++- docs/MCP_PROXY_QUICKSTART.md | 382 ++++++++++++++++++++++++ tests/test_mcp_proxy_cli.py | 373 +++++++++++++++++++++++ tests/test_mcp_proxy_runtime_gate.py | 162 +++++++++- 8 files changed, 1260 insertions(+), 4 deletions(-) create mode 100644 docs/MCP_PROXY_QUICKSTART.md diff --git a/agentveil_mcp_proxy/README.md b/agentveil_mcp_proxy/README.md index 0da8196..102d5c4 100644 --- a/agentveil_mcp_proxy/README.md +++ b/agentveil_mcp_proxy/README.md @@ -28,6 +28,13 @@ script. No additional extras are required. ## Quick Start +For the full step-by-step customer cold path (install → init → doctor → +configure downstream → run → export evidence → offline verify) and the +honest list of what the bundle currently does and does not prove, see +[`docs/MCP_PROXY_QUICKSTART.md`](../docs/MCP_PROXY_QUICKSTART.md). + +The short form is: + Create a local proxy identity, config, and control grant: ```bash @@ -221,6 +228,8 @@ exhaustive; review patterns for your specific downstream server. |---|---| | `init` | Create encrypted identity, config, and control grant. | | `doctor` | Validate local files and control grant. | +| `doctor --check-backend` | Add a read-only preflight that the backend is reachable and the proxy identity is registered. | +| `register` | Register the existing proxy identity with the configured backend. | | `run` | Run stdio passthrough, the proxy mode used by MCP clients. | | `reissue-grant` | Refresh the local control grant before expiry. | | `export-evidence ` | Export durable evidence bundle for offline verification. | diff --git a/agentveil_mcp_proxy/approval/manager.py b/agentveil_mcp_proxy/approval/manager.py index 3b224f1..416d89c 100644 --- a/agentveil_mcp_proxy/approval/manager.py +++ b/agentveil_mcp_proxy/approval/manager.py @@ -122,6 +122,7 @@ def request_approval( created_at=now, expires_at=record_expires_at, runtime_decision=None if active_similar_grant is not None else runtime_decision, + approval_token_hash=self.approval_server.token_hash, granted_by_request_id=( None if active_similar_grant is None else active_similar_grant.request_id ), @@ -178,6 +179,45 @@ def request_approval( ) return self._deny(request_id, "user_denied") + def record_runtime_allow( + self, + classification: ClassifiedToolCall, + *, + runtime_decision: RuntimeGateDecision, + ) -> ApprovalOutcome: + """Persist a verified Runtime Gate ALLOW decision before forwarding.""" + + request_id = self._write_runtime_decision_record( + classification, + runtime_decision=runtime_decision, + ) + return ApprovalOutcome( + request_id, + ApprovalStatus.APPROVED.value, + "runtime_gate_allow", + ) + + def record_runtime_block( + self, + classification: ClassifiedToolCall, + *, + runtime_decision: RuntimeGateDecision, + ) -> None: + """Persist a verified Runtime Gate BLOCK decision as terminal evidence.""" + + request_id = self._write_runtime_decision_record( + classification, + runtime_decision=runtime_decision, + ) + try: + self.evidence_store.transition( + request_id, + ApprovalStatus.BLOCKED.value, + error_class="runtime_gate_block", + ) + except ApprovalEvidenceError as exc: + raise ApprovalFlowError("runtime decision evidence persistence failed") from exc + def record_execution_result(self, outcome: ApprovalOutcome, response: dict[str, Any]) -> None: """Append execution result evidence for an approved downstream call.""" @@ -277,6 +317,7 @@ def _pending_record( created_at: int, expires_at: int | None, runtime_decision: RuntimeGateDecision | None, + approval_token_hash: str | None = None, granted_by_request_id: str | None = None, ) -> PendingApproval: return PendingApproval( @@ -297,11 +338,32 @@ def _pending_record( expires_at=expires_at, decision_audit_id=None if runtime_decision is None else runtime_decision.audit_id, decision_receipt_sha256=None if runtime_decision is None else runtime_decision.receipt_digest, - approval_token_hash=self.approval_server.token_hash, + approval_token_hash=approval_token_hash, matched_policy_rule=classification.policy_evaluation.policy_rule_id, granted_by_request_id=granted_by_request_id, ) + def _write_runtime_decision_record( + self, + classification: ClassifiedToolCall, + *, + runtime_decision: RuntimeGateDecision, + ) -> str: + now = int(time.time()) + request_id = str(uuid.uuid4()) + record = self._pending_record( + classification, + request_id=request_id, + created_at=now, + expires_at=None, + runtime_decision=runtime_decision, + ) + try: + self.evidence_store.write_pending(record) + except ApprovalEvidenceError as exc: + raise ApprovalFlowError("runtime decision evidence persistence failed") from exc + return request_id + def _prompt_for( self, classification: ClassifiedToolCall, diff --git a/agentveil_mcp_proxy/cli.py b/agentveil_mcp_proxy/cli.py index f656258..f8567da 100644 --- a/agentveil_mcp_proxy/cli.py +++ b/agentveil_mcp_proxy/cli.py @@ -27,6 +27,7 @@ from agentveil.agent import AVPAgent from agentveil.delegation import DelegationInvalid, verify_delegation +from agentveil.exceptions import AVPError, AVPNotFoundError, AVPValidationError from agentveil_mcp_proxy.approval import ( ApprovalManager, ApprovalServer, @@ -635,6 +636,68 @@ def _load_proxy_agent( raise ProxyCliError("proxy identity invalid", exit_code=1) from exc +def _check_backend_preflight( + *, + identity: Mapping[str, Any], + config: ProxyConfig, + passphrase: str | None, + timeout_seconds: float = 5.0, +) -> str | None: + """Issue two read-only GETs to verify backend readiness. + + Returns a failure description on the first failure, or ``None`` on + success. Network and SDK exceptions are sanitized to category + + sender; raw response bodies are not surfaced. No state is mutated + on the backend (only ``GET /v1/health`` and + ``GET /v1/onboarding/{did}``). + """ + + try: + agent = _load_proxy_agent( + identity=identity, + config=config, + passphrase=passphrase, + timeout=timeout_seconds, + ) + except ProxyCliError as exc: + return f"backend preflight skipped: {exc}" + + base_url = config.avp.base_url + try: + agent.health() + except AVPError as exc: + return ( + f"backend health check failed at {base_url}: " + f"status {exc.status_code}" + ) + except Exception as exc: + return ( + f"backend unreachable at {base_url}: " + f"{type(exc).__name__}" + ) + + try: + agent.get_onboarding_status() + except AVPNotFoundError: + did = identity.get("did") + return ( + f"agent {did} is not registered with backend at {base_url}; " + "run `agentveil-mcp-proxy register` to register this identity" + ) + except AVPError as exc: + return ( + "backend onboarding status check failed: " + f"status {exc.status_code}" + ) + except Exception as exc: + return ( + "backend onboarding status unreachable: " + f"{type(exc).__name__}" + ) + + return None + + def doctor_proxy( *, home: Path | None = None, @@ -642,11 +705,20 @@ def doctor_proxy( passphrase: str | None = None, passphrase_file: Path | None = None, out: TextIO | None = None, + check_backend: bool = False, ) -> int: - """Validate local proxy files without starting transport.""" + """Validate local proxy files without starting transport. + + When ``check_backend`` is True, also issue two read-only GET + requests against the configured backend (``/v1/health`` and + ``/v1/onboarding/{did}``) to confirm reachability and that the + proxy agent identity is registered. No backend state is mutated. + Skipped if any local check already failed. + """ out = out or sys.stdout paths = proxy_paths(home, config_path) + backend_ok = False try: config = load_proxy_config(paths.config_path) identity_path = paths.identity_path(config.avp.agent_name) @@ -703,6 +775,17 @@ def doctor_proxy( else: failures.append(f"control grant invalid: {exc}") + if check_backend and not failures: + backend_failure = _check_backend_preflight( + identity=identity, + config=config, + passphrase=identity_passphrase, + ) + if backend_failure is not None: + failures.append(backend_failure) + else: + backend_ok = True + if failures: for failure in failures: print(f"FAIL: {failure}", file=out) @@ -719,6 +802,11 @@ def doctor_proxy( f"{config.circuit_breaker.cooldown_seconds}s cooldown)", file=out, ) + if backend_ok: + print( + f"OK: backend reachable at {config.avp.base_url}, agent registered", + file=out, + ) for warning in warnings: print(f"WARN: {warning}", file=out) return 0 @@ -727,6 +815,126 @@ def doctor_proxy( return 1 +def _rewrite_proxy_identity_after_register( + *, + identity_path: Path, + agent: Any, + passphrase: str | None, +) -> None: + """Persist updated registration status in the proxy's identity format. + + ``AVPAgent.register(...)`` calls ``self.save()`` which writes a + different file layout to the same path as the proxy identity. We + block that save during ``register_proxy`` and rewrite here using + the proxy's own helpers so the file format and encryption are + preserved. + """ + + if passphrase is None: + payload = plaintext_identity_payload(agent) + else: + payload = encrypted_identity_payload(agent, passphrase) + _secure_write_json(identity_path, payload, force=True) + + +def register_proxy( + *, + home: Path | None = None, + config_path: Path | None = None, + passphrase: str | None = None, + passphrase_file: Path | None = None, + out: TextIO | None = None, +) -> int: + """Register the existing proxy identity with the configured backend. + + Reuses the same identity file ``init`` created (preserving the DID + and the encrypted-at-rest format), calls the SDK's + ``AVPAgent.register()`` against the same ``base_url`` from the + proxy config, and rewrites the identity file with + ``registered: true``. Backend network errors are sanitized to + category + status code; private key material and raw response + bodies are never printed. + """ + + out = out or sys.stdout + paths = proxy_paths(home, config_path) + config = load_proxy_config(paths.config_path) + identity_path = paths.identity_path(config.avp.agent_name) + identity = _read_json(identity_path, "agent identity") + identity_passphrase = _resolve_existing_identity_passphrase( + identity, + passphrase=passphrase, + passphrase_file=passphrase_file, + ) + agent = _load_proxy_agent( + identity=identity, + config=config, + passphrase=identity_passphrase, + ) + + # ``AVPAgent.register`` calls ``self.save()`` internally which would + # rewrite ~/.avp/agents/.json in the SDK's plaintext format + # and DOWNGRADE the proxy's encrypted identity. Replace with a + # no-op on this instance; we rewrite the file ourselves below using + # the proxy's own payload helpers. + agent.save = lambda *_args, **_kwargs: None # type: ignore[method-assign] + + base_url = config.avp.base_url + try: + agent.register() + except AVPValidationError as exc: + if getattr(exc, "status_code", 0) == 409: + # ``AVPAgent.register`` raises 409 before it gets to set its + # ``_is_registered`` / ``_is_verified`` flags, so the loaded + # agent's in-memory state still reads False. The backend has + # already accepted this DID, so reflect that locally before + # writing the identity file; otherwise the rewritten file + # would say ``registered: false`` while the CLI told the user + # the identity is already registered. + if hasattr(agent, "_is_registered"): + agent._is_registered = True + if hasattr(agent, "_is_verified"): + agent._is_verified = True + _rewrite_proxy_identity_after_register( + identity_path=identity_path, + agent=agent, + passphrase=identity_passphrase, + ) + print( + f"OK: agent {agent.did} already registered at {base_url}", + file=out, + ) + return 0 + print( + f"FAIL: registration rejected at {base_url}: status {exc.status_code}", + file=out, + ) + return 1 + except AVPError as exc: + print( + f"FAIL: registration failed at {base_url}: status {exc.status_code}", + file=out, + ) + return 1 + except Exception as exc: + print( + f"FAIL: backend unreachable at {base_url}: {type(exc).__name__}", + file=out, + ) + return 1 + + _rewrite_proxy_identity_after_register( + identity_path=identity_path, + agent=agent, + passphrase=identity_passphrase, + ) + print( + f"OK: agent {agent.did} registered at {base_url}", + file=out, + ) + return 0 + + def _grant_scope_for_reissue(scope: Any) -> tuple[list[str], dict[str, Any] | None]: if not isinstance(scope, list): raise ProxyCliError("control grant scope invalid", exit_code=1) @@ -1111,6 +1319,14 @@ def build_parser() -> argparse.ArgumentParser: doctor = subparsers.add_parser("doctor", help="Validate local proxy config and files") _add_common_path_args(doctor) _add_passphrase_args(doctor) + doctor.add_argument( + "--check-backend", + action="store_true", + help=( + "Issue read-only GETs to verify the configured backend is " + "reachable and the proxy agent is registered." + ), + ) run = subparsers.add_parser("run", help="Run stdio MCP passthrough") _add_common_path_args(run) @@ -1119,6 +1335,13 @@ def build_parser() -> argparse.ArgumentParser: run.add_argument("--auto-deny", action="store_true", help="Deny every approval-required action") run.add_argument("--headless-policy", type=Path, default=None, help="Headless approval policy JSON path") + register = subparsers.add_parser( + "register", + help="Register the existing proxy identity with the configured backend", + ) + _add_common_path_args(register) + _add_passphrase_args(register) + reissue = subparsers.add_parser("reissue-grant", help="Issue a fresh local control grant") _add_common_path_args(reissue) _add_passphrase_args(reissue) @@ -1184,6 +1407,7 @@ def main(argv: list[str] | None = None) -> int: config_path=args.config, passphrase=args.passphrase, passphrase_file=args.passphrase_file, + check_backend=args.check_backend, ) if args.command == "run": return run_proxy( @@ -1195,6 +1419,13 @@ def main(argv: list[str] | None = None) -> int: auto_deny=args.auto_deny, headless_policy_path=args.headless_policy, ) + if args.command == "register": + return register_proxy( + home=args.home, + config_path=args.config, + passphrase=args.passphrase, + passphrase_file=args.passphrase_file, + ) if args.command == "reissue-grant": reissue_grant( home=args.home, diff --git a/agentveil_mcp_proxy/evidence/store.py b/agentveil_mcp_proxy/evidence/store.py index defb0f0..a1fd9da 100644 --- a/agentveil_mcp_proxy/evidence/store.py +++ b/agentveil_mcp_proxy/evidence/store.py @@ -84,6 +84,9 @@ class ApprovalStatus(str, Enum): ApprovalStatus.DENIED.value, ApprovalStatus.EXPIRED.value, ApprovalStatus.INVALIDATED.value, + ApprovalStatus.EXECUTED.value, + ApprovalStatus.BLOCKED.value, + ApprovalStatus.ERROR.value, }, ApprovalStatus.APPROVED.value: { ApprovalStatus.EXECUTED.value, diff --git a/agentveil_mcp_proxy/passthrough.py b/agentveil_mcp_proxy/passthrough.py index 358984d..0dc0a00 100644 --- a/agentveil_mcp_proxy/passthrough.py +++ b/agentveil_mcp_proxy/passthrough.py @@ -352,6 +352,25 @@ def _blocked_error( return jsonrpc_error(request_id, JSONRPC_POLICY_BLOCKED, message, data=data) +def _runtime_evidence_unavailable_error( + request_id: Any, + decision: RuntimeGateDecision, +) -> dict[str, Any]: + data: dict[str, Any] = { + "status": "blocked", + "reason": "runtime_gate_evidence_unavailable", + "decision": decision.decision, + } + if decision.audit_id is not None: + data["audit_id"] = decision.audit_id + return jsonrpc_error( + request_id, + JSONRPC_POLICY_BLOCKED, + "runtime decision evidence unavailable", + data=data, + ) + + def _approval_required_error( request_id: Any, *, @@ -723,7 +742,16 @@ def _runtime_gate_error_response( self._record_runtime_gate_events() if decision.decision == DECISION_ALLOW: - return None, None + if self.approval_manager is None: + return None, None + try: + outcome = self.approval_manager.record_runtime_allow( + classification, + runtime_decision=decision, + ) + except ApprovalFlowError: + return _runtime_evidence_unavailable_error(request_id, decision), None + return None, outcome if decision.decision == DECISION_WAITING: return self._approval_flow_response( classification, @@ -732,6 +760,14 @@ def _runtime_gate_error_response( runtime_decision=decision, ) if decision.decision == DECISION_BLOCK: + if self.approval_manager is not None: + try: + self.approval_manager.record_runtime_block( + classification, + runtime_decision=decision, + ) + except ApprovalFlowError: + return _runtime_evidence_unavailable_error(request_id, decision), None return _blocked_error( request_id, "blocked by AVP Runtime Gate", diff --git a/docs/MCP_PROXY_QUICKSTART.md b/docs/MCP_PROXY_QUICKSTART.md new file mode 100644 index 0000000..1be5854 --- /dev/null +++ b/docs/MCP_PROXY_QUICKSTART.md @@ -0,0 +1,382 @@ +# MCP Proxy Quickstart + +This is the cold customer path for wrapping one downstream MCP server with the +AgentVeil MCP Proxy and producing an offline-verifiable evidence bundle. Every +step describes behavior that is implemented and locally verifiable. Anything +this quickstart does not cover yet is listed under +[What this quickstart does NOT prove](#what-this-quickstart-does-not-prove). + +For day-2 operations, headless mode, multi-IDE deployment, evidence vacuum, and +identity migration, see [`MCP_PROXY_OPERATIONS.md`](MCP_PROXY_OPERATIONS.md). + +## Prerequisites + +- Python 3.10+ on macOS or Linux. Windows is supported but the proxy README + flags a known orphan-process race on Windows; use a supervisor on Windows. +- A downstream MCP server you want to wrap. The downstream can be any MCP + server: filesystem, GitHub, custom company server. You provide its launch + command and arguments. +- Backend access: the proxy verifies signed AgentVeil Runtime Gate + `decision_receipt/2` artifacts against a pinned signer-DID set. For + `https://agentveil.dev` the trusted DIDs are bundled with the SDK; for any + other base URL pass `--trusted-signer-did` to `init`. + +## Install + +```bash +pip install agentveil +``` + +This installs the core `agentveil` SDK and registers the +`agentveil-mcp-proxy` console script. + +## Step 1 — `agentveil-mcp-proxy init` + +```bash +agentveil-mcp-proxy init +``` + +By default this creates an **encrypted local identity**, a self-issued +delegation receipt scoped to the `mcp_proxy` category, and a starter config at +`~/.avp/mcp-proxy/config.json`. The encrypted-identity passphrase is collected +interactively, from `--passphrase-file`, or from the `AVP_PROXY_PASSPHRASE` +environment variable. See +[`MCP_PROXY_OPERATIONS.md#security-trade-offs-by-passphrase-source`](MCP_PROXY_OPERATIONS.md#security-trade-offs-by-passphrase-source). + +### What `init` actually does + +- Generates a fresh Ed25519 keypair locally. +- Writes the identity, control grant, and config under `~/.avp/` with + owner-only permissions (0o600). +- Issues a local self-signed delegation receipt (issuer = subject = the new + agent DID), scoped to the `mcp_proxy` category. + +### What `init` does NOT do + +- It does not register the new agent with the AgentVeil backend. +- It does not contact `--base-url` at all. +- Until the agent is registered with the backend (see Step 3 + `agentveil-mcp-proxy register`), the proxy cannot exchange Runtime + Gate decisions with `https://agentveil.dev`. + +Custom base URL example: + +```bash +agentveil-mcp-proxy init \ + --base-url https://your-private-avp.example \ + --trusted-signer-did did:key:zYourPinnedSignerDid +``` + +## Step 2 — `agentveil-mcp-proxy doctor` (local-only) + +```bash +agentveil-mcp-proxy doctor +``` + +This validates local files only. Output on a fresh successful `init` looks +like: + +```text +OK: config /home/me/.avp/mcp-proxy/config.json +OK: identity /home/me/.avp/agents/agentveil-mcp-proxy.json +OK: control grant /home/me/.avp/mcp-proxy/agentveil-mcp-proxy.control-grant.json +OK: trusted signers 2 +OK: circuit breaker thresholds (5 failures, 60s window, 30s cooldown) +``` + +`doctor` checks: file permissions are 0600, the identity decrypts with the +configured passphrase, the agent DID matches its stored identity, the control +grant is signed by that identity, issuer/subject match, the trusted-signer DID +set is non-empty, and the control grant has not expired. + +`doctor` exits non-zero on any of these failures, with a specific FAIL line. +It is intentionally offline: it does not call the backend. + +## Step 3 — `agentveil-mcp-proxy register` + +```bash +agentveil-mcp-proxy register +``` + +Registers the exact identity created by `init` (same DID, same key) with +the backend at the `base_url` from your proxy config. This is the bridge +between local identity creation and the next step +(`doctor --check-backend`). Registration is required once per identity; +re-runs are idempotent. + +### What `register` actually does + +- Loads the same proxy identity file `doctor` and `run` use (encrypted + with the same passphrase mechanism). +- Calls the SDK's `AVPAgent.register()` against the configured + `base_url`: registration POST → Proof-of-Work solve → verify POST. +- Updates the local identity file's `registered` flag to `true`. +- Preserves the identity's encrypted-at-rest format (no plaintext + downgrade). + +### What `register` does NOT do + +- It does not provision an agent card / capabilities / endpoint URL / + provider on the backend. If you need those, use the `agentveil` SDK + directly with the keyword arguments documented on + `AVPAgent.register(...)`. +- It does not exchange Runtime Gate decisions or attestations on its + own; that happens later when the proxy actually serves tool calls. +- It does not prove the proxy can ship a verified evidence bundle — + that still requires Steps 7-9 below. + +### Expected output + +Success: + +```text +OK: agent did:key:z6Mk... registered at https://agentveil.dev +``` + +Idempotent re-run when the agent already exists on the backend: + +```text +OK: agent did:key:z6Mk... already registered at https://agentveil.dev +``` + +Sanitized failure shapes: + +```text +FAIL: registration rejected at : status 4xx +FAIL: registration failed at : status 5xx +FAIL: backend unreachable at : +``` + +Raw response bodies and stack traces are never printed. Re-run after +fixing the underlying issue. + +## Step 4 — `agentveil-mcp-proxy doctor --check-backend` + +```bash +agentveil-mcp-proxy doctor --check-backend +``` + +Adds two read-only HTTP GETs against the configured backend: + +- `GET /v1/health` — confirms the backend is reachable. +- `GET /v1/onboarding/{did}` — confirms the proxy agent identity is + registered with the backend. + +On success the output adds one extra line: + +```text +OK: backend reachable at https://agentveil.dev, agent registered +``` + +On failure the output starts with one of: + +```text +FAIL: backend unreachable at : +FAIL: backend health check failed at : status +FAIL: agent is not registered with backend at ; run `agentveil-mcp-proxy register` to register this identity +FAIL: backend onboarding status check failed: status +``` + +These failures are sanitized — raw response bodies and stack traces are not +printed. If you see "not registered", re-run Step 3 +(`agentveil-mcp-proxy register`) before continuing. + +`doctor` skips the backend preflight when any local check has already +failed. + +## Step 5 — Configure the downstream MCP server + +Edit `~/.avp/mcp-proxy/config.json` and set `downstream.command` and +`downstream.args` to the MCP server you want to wrap. Example for a +filesystem MCP server: + +```json +{ + "downstream": { + "name": "filesystem", + "command": "npx", + "args": ["-y", "@modelcontextprotocol/server-filesystem", "/Users/me/work"] + } +} +``` + +`name` is the server label the proxy uses internally and in evidence records. +`env` and `env_passthrough` are also supported; see +[`MCP_PROXY_OPERATIONS.md`](MCP_PROXY_OPERATIONS.md). The proxy refuses to +forward any `AVP_*` environment variable to the downstream — those names are +reserved for proxy-internal secrets. + +## Step 6 — Point your MCP client at the proxy + +Configure your IDE / MCP client to run `agentveil-mcp-proxy run` as its +server entry point. The MCP Proxy README documents wiring for Claude Desktop, +Cursor, Windsurf, and VS Code: +[`agentveil_mcp_proxy/README.md`](../agentveil_mcp_proxy/README.md). + +When the client starts the proxy: + +```bash +agentveil-mcp-proxy run +``` + +The proxy: re-validates the local files via `doctor`, launches the downstream +MCP server, starts the local HTTP approval UI, and begins serving stdio +JSON-RPC. + +## Step 7 — Trigger one MCP tool call + +Use your MCP client (Claude Desktop, Cursor, Windsurf, VS Code, or any stdio +JSON-RPC client) to invoke a tool on the downstream server. The proxy +classifies the call, applies local policy, and: + +- **ALLOW** policy decisions are forwarded to the downstream server. If the + policy rule routes the call through Runtime Gate (`ASK_BACKEND`), the + verified backend `decision_receipt/2` and the downstream result hash are + recorded into the local SQLite evidence store **when an evidence store is + configured for the run** (this is the default for `agentveil-mcp-proxy run`). +- **APPROVAL** policy decisions open the local browser approval UI bound to + the exact payload hash and matched policy rule. +- **BLOCK** policy decisions return a JSON-RPC error and never reach the + downstream server. Runtime Gate BLOCK decisions taken on the `ASK_BACKEND` + path are also recorded into the local evidence store with the backend's + signed receipt attached. +- **OBSERVE** policy decisions are forwarded without further gating. + +Slice-level note: recording verified Runtime Gate `ALLOW` and `BLOCK` +decisions into the local evidence store is a recent change. Before it +landed, only the approval-required path created proxy-side evidence +records. See `agentveil_mcp_proxy/passthrough.py` and +`agentveil_mcp_proxy/approval/manager.py` for the exact flow. + +## Step 8 — Export the evidence bundle + +After one or more tool calls: + +```bash +agentveil-mcp-proxy export-evidence ./my-bundle.json +``` + +The CLI returns a summary line of the form: + +```text +Evidence exported: ./my-bundle.json (N records, M signed receipts) +``` + +`N` is the count of local evidence records (one per gated tool call); `M` is +the count of backend-signed `decision_receipt/2` artifacts the proxy was +able to fetch via `agent.get_decision_receipt(audit_id)` and embed in the +bundle. If a record references an `audit_id` but the receipt could not be +fetched or the digest did not match, the CLI prints a `WARN` and the +bundle's `unverified_receipt_count` increases. + +The exported bundle file is written with 0600 permissions. + +## Step 9 — Verify the bundle offline + +```bash +agentveil-mcp-proxy verify ./my-bundle.json \ + --trusted-signer-did did:key:zYourPinnedSignerDid +``` + +Pass `--trusted-signer-did` to require an explicit pinned set. If you omit +the flag the verifier will use the signer set embedded in the bundle and +will emit a `WARN` about trusting bundle-embedded signers. For +due-diligence verification, pass the flag. + +Successful output: + +```text +OK: bundle integrity verified, N records, M signed receipts +``` + +The verifier checks, offline: + +1. Local record chain `prev_event_hash` and `record_hash` for every record. +2. Bundle-level `chain_root_hash` matches the last record's hash. +3. For every embedded signed receipt: + - The SHA-256 of the byte-exact JCS receipt matches its key in + `signed_receipts`. + - The DataIntegrityProof / `eddsa-jcs-2022` signature verifies against + one of the pinned signer DIDs. + - `schema_version` is in the accepted set (`decision_receipt/1`, + `decision_receipt/2`). + - `audit_id` is present and well-formed. +4. Field cross-checks between record and embedded receipt: + - `record.payload_hash` == `receipt.payload_hash` + - `record.risk_class` == `receipt.client_risk_class` + - `record.policy_context_hash` == `receipt.client_policy_context_hash` + - `record.decision_audit_id` == `receipt.audit_id` + +The verifier exits non-zero with a specific `EvidenceVerificationError` +message on any of these failures. + +## What this quickstart proves + +After the steps above, the bundle you produced contains, for each gated +tool call: + +- A privacy-preserving local evidence record describing the action class, + risk class, payload hash, policy rule, and decision audit ID. +- The backend-signed Runtime Gate `decision_receipt/2` artifact for every + `ASK_BACKEND` Runtime Gate decision (`ALLOW` / `BLOCK` / + `WAITING_FOR_HUMAN_APPROVAL`) that the proxy issued. +- The downstream result hash for forwarded calls that completed. + +These artifacts are independently verifiable offline against the pinned +backend signer DID set. + +## What this quickstart does NOT prove + +This list is the honest counterpart to the section above. The bundle +**does not** currently contain or prove any of the following: + +- That the **production** AgentVeil backend at `https://agentveil.dev` + signed receipts in any internal demo artifact you may have seen. The + Action Proof Pack v1.2 internal proof harness runs against a local dev + backend with a deterministic dev key, not the production signer set. v1.2 + is an internal proof, not customer production evidence. +- A backend-signed `human_approval_receipt/2` for the WAITING / + approval-required path. The MCP Proxy currently uses a local browser + approval UI; it does not call the backend Human Control API. The bundle + carries the proxy-side local approval record but not a backend-signed + approval artifact. +- A backend-signed `execution_receipt/2`. The backend `/v1/execute` + endpoint exists for capabilities the backend itself adapts; the MCP + Proxy forwards the call to your downstream MCP server instead. There is + no backend-attested execution claim for proxy-forwarded calls. +- Control over agent actions that bypass the MCP Proxy. The proxy gates + only calls that flow through it; raw shell, raw API access, or any path + the MCP client takes without going through the proxy are out of scope. +- Sandbox replacement. The proxy does not contain or restrict the + downstream MCP server's process; use OS-level sandboxing + (container/VM/sandbox) separately if process containment matters. +- A fix to model behavior or model alignment. The proxy controls actions, + not model reasoning or output. + +## Failure modes you may hit + +- `FAIL: agent is not registered with backend at ` — the + proxy identity was created locally but never onboarded. Run Step 3 + (`agentveil-mcp-proxy register`) and then re-run + `doctor --check-backend`. +- `WARN: control grant expires in N days` — the local self-issued + delegation is approaching expiry. Run + `agentveil-mcp-proxy reissue-grant`. +- `WARN: ... default_trust_from_bundle ...` from `verify` — you omitted + `--trusted-signer-did`. For due-diligence verification, always pass the + flag. +- `WARN: N records have decision_audit_id but no matching signed receipt + in bundle` from `export-evidence` — the receipt fetch failed or the + digest did not match. Check network reachability and that the agent + identity matches the agent that produced the records. + +## Where to go next + +- Operations / day-2 reference: + [`MCP_PROXY_OPERATIONS.md`](MCP_PROXY_OPERATIONS.md). +- MCP Proxy README with IDE wiring examples: + [`agentveil_mcp_proxy/README.md`](../agentveil_mcp_proxy/README.md). +- Data handling boundaries: + [`DATA_HANDLING.md`](DATA_HANDLING.md). +- SDK / API surface: + [`API.md`](API.md). diff --git a/tests/test_mcp_proxy_cli.py b/tests/test_mcp_proxy_cli.py index 8c1a905..e335ac3 100644 --- a/tests/test_mcp_proxy_cli.py +++ b/tests/test_mcp_proxy_cli.py @@ -13,6 +13,7 @@ import agentveil_mcp_proxy.cli as proxy_cli from agentveil.delegation import verify_delegation +from agentveil.exceptions import AVPNotFoundError, AVPServerError, AVPValidationError from agentveil_mcp_proxy.cli import ( AGENTVEIL_DEV_SIGNER_DIDS, MIN_IDENTITY_PASSPHRASE_LENGTH, @@ -22,6 +23,7 @@ init_proxy, main, proxy_paths, + register_proxy, reissue_grant, run_proxy, ) @@ -620,6 +622,377 @@ def test_doctor_fails_when_grant_already_expired(tmp_path): assert secret not in output +def test_doctor_fails_when_identity_file_missing(tmp_path): + home = tmp_path / "avp-home" + init_proxy(home=home, agent_name="proxy", passphrase=TEST_PASSPHRASE) + identity_path = proxy_paths(home).identity_path("proxy") + identity_path.unlink() + + out = io.StringIO() + code = doctor_proxy(home=home, passphrase=TEST_PASSPHRASE, out=out) + + assert code == 1 + output = out.getvalue() + assert "FAIL: agent identity not found" in output + + +class _StubBackendAgent: + """Minimal stub for `--check-backend` tests. + + Counts ``health()`` and ``get_onboarding_status()`` invocations so a + regression test can assert the doctor does not call the backend + without ``--check-backend``. + """ + + def __init__(self, *, did: str, health_raises: Exception | None = None, + onboarding_raises: Exception | None = None): + self.did = did + self._health_raises = health_raises + self._onboarding_raises = onboarding_raises + self.health_calls = 0 + self.onboarding_calls = 0 + + def health(self) -> dict: + self.health_calls += 1 + if self._health_raises is not None: + raise self._health_raises + return {"status": "ok"} + + def get_onboarding_status(self) -> dict: + self.onboarding_calls += 1 + if self._onboarding_raises is not None: + raise self._onboarding_raises + return {"status": "verified"} + + +def _install_stub_agent(monkeypatch, agent: _StubBackendAgent) -> None: + monkeypatch.setattr( + proxy_cli, + "_load_proxy_agent", + lambda **_kwargs: agent, + ) + + +def test_doctor_local_only_does_not_call_backend(tmp_path, monkeypatch): + home = tmp_path / "avp-home" + result = init_proxy(home=home, agent_name="proxy", passphrase=TEST_PASSPHRASE) + identity = _load(result.identity_path) + stub = _StubBackendAgent(did=identity["did"]) + _install_stub_agent(monkeypatch, stub) + + out = io.StringIO() + code = doctor_proxy(home=home, passphrase=TEST_PASSPHRASE, out=out) + + assert code == 0 + assert stub.health_calls == 0 + assert stub.onboarding_calls == 0 + assert "backend reachable" not in out.getvalue() + + +def test_doctor_check_backend_succeeds_with_registered_agent(tmp_path, monkeypatch): + home = tmp_path / "avp-home" + result = init_proxy(home=home, agent_name="proxy", passphrase=TEST_PASSPHRASE) + identity = _load(result.identity_path) + stub = _StubBackendAgent(did=identity["did"]) + _install_stub_agent(monkeypatch, stub) + + out = io.StringIO() + code = doctor_proxy( + home=home, + passphrase=TEST_PASSPHRASE, + out=out, + check_backend=True, + ) + + assert code == 0 + assert stub.health_calls == 1 + assert stub.onboarding_calls == 1 + output = out.getvalue() + assert "OK: backend reachable at " in output + assert "agent registered" in output + + +def test_doctor_check_backend_fails_when_backend_unreachable(tmp_path, monkeypatch): + home = tmp_path / "avp-home" + result = init_proxy(home=home, agent_name="proxy", passphrase=TEST_PASSPHRASE) + identity = _load(result.identity_path) + stub = _StubBackendAgent( + did=identity["did"], + health_raises=ConnectionError("connection refused"), + ) + _install_stub_agent(monkeypatch, stub) + + out = io.StringIO() + code = doctor_proxy( + home=home, + passphrase=TEST_PASSPHRASE, + out=out, + check_backend=True, + ) + + assert code == 1 + output = out.getvalue() + assert "FAIL: backend unreachable at" in output + assert stub.health_calls == 1 + assert stub.onboarding_calls == 0 + assert "connection refused" not in output + + +def test_doctor_check_backend_fails_when_agent_not_registered(tmp_path, monkeypatch): + home = tmp_path / "avp-home" + result = init_proxy(home=home, agent_name="proxy", passphrase=TEST_PASSPHRASE) + identity = _load(result.identity_path) + stub = _StubBackendAgent( + did=identity["did"], + onboarding_raises=AVPNotFoundError("agent not found", 404, "agent not found"), + ) + _install_stub_agent(monkeypatch, stub) + + out = io.StringIO() + code = doctor_proxy( + home=home, + passphrase=TEST_PASSPHRASE, + out=out, + check_backend=True, + ) + + assert code == 1 + output = out.getvalue() + assert "FAIL: agent " in output + assert "is not registered with backend at" in output + assert stub.health_calls == 1 + assert stub.onboarding_calls == 1 + + +def test_doctor_check_backend_fails_on_server_error(tmp_path, monkeypatch): + home = tmp_path / "avp-home" + result = init_proxy(home=home, agent_name="proxy", passphrase=TEST_PASSPHRASE) + identity = _load(result.identity_path) + stub = _StubBackendAgent( + did=identity["did"], + health_raises=AVPServerError("server error", 500, "server error"), + ) + _install_stub_agent(monkeypatch, stub) + + out = io.StringIO() + code = doctor_proxy( + home=home, + passphrase=TEST_PASSPHRASE, + out=out, + check_backend=True, + ) + + assert code == 1 + output = out.getvalue() + assert "FAIL: backend health check failed at" in output + assert "status 500" in output + assert stub.health_calls == 1 + assert stub.onboarding_calls == 0 + + +def test_doctor_check_backend_skipped_when_local_checks_fail(tmp_path, monkeypatch): + home = tmp_path / "avp-home" + result = init_proxy(home=home, agent_name="proxy", passphrase=TEST_PASSPHRASE) + identity = _load(result.identity_path) + stub = _StubBackendAgent(did=identity["did"]) + _install_stub_agent(monkeypatch, stub) + + # Break a local check (empty signer set) so backend preflight is skipped. + config = _load(result.config_path) + config["avp"]["trusted_signer_dids"] = [] + result.config_path.write_text(json.dumps(config), encoding="utf-8") + os.chmod(result.config_path, 0o600) + + out = io.StringIO() + code = doctor_proxy( + home=home, + passphrase=TEST_PASSPHRASE, + out=out, + check_backend=True, + ) + + assert code == 1 + assert stub.health_calls == 0 + assert stub.onboarding_calls == 0 + assert "trusted_signer_dids" in out.getvalue() + + +def _install_fake_register(monkeypatch, *, raises=None, capture_dids=None): + """Replace ``AVPAgent.register`` with a deterministic fake. + + ``raises`` — exception instance to raise instead of completing. + ``capture_dids`` — optional list to append the agent DID for each call. + """ + + from agentveil.agent import AVPAgent + + def fake_register(self): + if capture_dids is not None: + capture_dids.append(self.did) + if raises is not None: + raise raises + self._is_registered = True + self._is_verified = True + return {"did": self.did, "onboarding_pending": True} + + monkeypatch.setattr(AVPAgent, "register", fake_register) + + +def test_register_loads_existing_proxy_did(tmp_path, monkeypatch): + home = tmp_path / "avp-home" + result = init_proxy(home=home, agent_name="proxy", passphrase=TEST_PASSPHRASE) + identity_before = _load(result.identity_path) + captured: list[str] = [] + _install_fake_register(monkeypatch, capture_dids=captured) + + out = io.StringIO() + code = register_proxy(home=home, passphrase=TEST_PASSPHRASE, out=out) + + assert code == 0 + assert captured == [identity_before["did"]] + # Identity file still exists, still encrypted, same DID, registered flag set. + identity_after = _load(result.identity_path) + assert identity_after["did"] == identity_before["did"] + assert identity_after["encrypted"] is True + assert identity_after.get("registered") is True + + +def test_register_success_prints_sanitized_ok_and_no_secret(tmp_path, monkeypatch): + home = tmp_path / "avp-home" + result = init_proxy(home=home, agent_name="proxy", passphrase=TEST_PASSPHRASE) + identity = _load(result.identity_path) + secret = _secret_material(identity) + _install_fake_register(monkeypatch) + + out = io.StringIO() + code = register_proxy(home=home, passphrase=TEST_PASSPHRASE, out=out) + + output = out.getvalue() + assert code == 0 + assert "OK: agent " in output + assert identity["did"] in output + assert secret not in output + assert "private_key" not in output + assert "encryption_salt" not in output + + +def test_register_backend_failure_prints_sanitized_fail(tmp_path, monkeypatch): + home = tmp_path / "avp-home" + result = init_proxy(home=home, agent_name="proxy", passphrase=TEST_PASSPHRASE) + secret = _secret_material(_load(result.identity_path)) + _install_fake_register( + monkeypatch, + raises=AVPServerError("server boom raw body", 500, "raw response detail"), + ) + + out = io.StringIO() + code = register_proxy(home=home, passphrase=TEST_PASSPHRASE, out=out) + + output = out.getvalue() + assert code == 1 + assert "FAIL: registration failed at " in output + assert "status 500" in output + assert "server boom raw body" not in output + assert "raw response detail" not in output + assert secret not in output + + +def test_register_backend_unreachable_prints_sanitized_fail(tmp_path, monkeypatch): + home = tmp_path / "avp-home" + result = init_proxy(home=home, agent_name="proxy", passphrase=TEST_PASSPHRASE) + secret = _secret_material(_load(result.identity_path)) + _install_fake_register( + monkeypatch, + raises=ConnectionError("connect timeout to host"), + ) + + out = io.StringIO() + code = register_proxy(home=home, passphrase=TEST_PASSPHRASE, out=out) + + output = out.getvalue() + assert code == 1 + assert "FAIL: backend unreachable at " in output + assert "ConnectionError" in output + assert "connect timeout to host" not in output + assert secret not in output + + +def test_register_already_registered_returns_ok(tmp_path, monkeypatch): + home = tmp_path / "avp-home" + result = init_proxy(home=home, agent_name="proxy", passphrase=TEST_PASSPHRASE) + identity_before = _load(result.identity_path) + _install_fake_register( + monkeypatch, + raises=AVPValidationError("agent already exists", 409, "conflict"), + ) + + out = io.StringIO() + code = register_proxy(home=home, passphrase=TEST_PASSPHRASE, out=out) + + output = out.getvalue() + assert code == 0 + assert "already registered" in output + + # The 409 path must keep the identity encrypted and update the + # local `registered` / `verified` flags to True so the file is not + # left in a stale state contradicting the CLI message. + identity_after = _load(result.identity_path) + assert identity_after["did"] == identity_before["did"] + assert identity_after["encrypted"] is True + assert "private_key_hex" not in identity_after + assert identity_after.get("registered") is True + assert identity_after.get("verified") is True + assert _mode(result.identity_path) == 0o600 + + +def test_register_encrypted_identity_requires_passphrase(tmp_path, monkeypatch): + home = tmp_path / "avp-home" + init_proxy(home=home, agent_name="proxy", passphrase=TEST_PASSPHRASE) + _install_fake_register(monkeypatch) + monkeypatch.delenv("AVP_PROXY_PASSPHRASE", raising=False) + + out = io.StringIO() + try: + register_proxy(home=home, out=out) + except ProxyCliError as exc: + assert "passphrase" in str(exc) + assert exc.exit_code == 1 + else: + raise AssertionError("expected register to require encrypted identity passphrase") + + +def test_register_does_not_downgrade_encrypted_identity_to_plaintext(tmp_path, monkeypatch): + home = tmp_path / "avp-home" + result = init_proxy(home=home, agent_name="proxy", passphrase=TEST_PASSPHRASE) + encrypted_blob_before = _load(result.identity_path).get("encrypted_blob") + assert encrypted_blob_before # sanity: init produced an encrypted identity + _install_fake_register(monkeypatch) + + register_proxy(home=home, passphrase=TEST_PASSPHRASE, out=io.StringIO()) + + identity_after = _load(result.identity_path) + assert identity_after["encrypted"] is True + # No plaintext private key should ever appear in the file after register. + assert "private_key_hex" not in identity_after + assert _mode(result.identity_path) == 0o600 + + +def test_register_wired_through_main(tmp_path, monkeypatch, capsys): + home = tmp_path / "avp-home" + init_proxy(home=home, agent_name="proxy", passphrase=TEST_PASSPHRASE) + _install_fake_register(monkeypatch) + + exit_code = main([ + "register", + "--home", str(home), + "--passphrase", TEST_PASSPHRASE, + ]) + out, _err = capsys.readouterr() + + assert exit_code == 0 + assert "OK: agent " in out + + def test_reissue_grant_creates_new_grant_with_default_ttl(tmp_path): home = tmp_path / "avp-home" result = init_proxy(home=home, agent_name="proxy", passphrase=TEST_PASSPHRASE) diff --git a/tests/test_mcp_proxy_runtime_gate.py b/tests/test_mcp_proxy_runtime_gate.py index 2f89439..147a786 100644 --- a/tests/test_mcp_proxy_runtime_gate.py +++ b/tests/test_mcp_proxy_runtime_gate.py @@ -17,8 +17,15 @@ from agentveil.agent import AVPAgent from agentveil.delegation import _public_key_to_did +from agentveil_mcp_proxy.approval import ApprovalManager, ApprovalServer from agentveil_mcp_proxy.circuit_breaker import CircuitBreaker, CircuitBreakerConfig from agentveil_mcp_proxy.classification import ToolCallClassifier +from agentveil_mcp_proxy.evidence import ( + ApprovalEvidenceStore, + ApprovalStatus, + export_evidence_bundle, + verify_evidence_bundle, +) from agentveil_mcp_proxy.passthrough import ( DownstreamConfig, JSONRPC_APPROVAL_REQUIRED, @@ -210,6 +217,20 @@ def get_decision_receipt(self, audit_id: str) -> str: raise AssertionError("inline decision_receipt_jcs should avoid a receipt fetch") +class FetchableRecordingAgent(RecordingAgent): + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.receipts: dict[str, str] = {} + + def runtime_evaluate(self, **kwargs): + response = super().runtime_evaluate(**kwargs) + self.receipts[response["audit_id"]] = response["decision_receipt_jcs"] + return response + + def get_decision_receipt(self, audit_id: str) -> str: + return self.receipts[audit_id] + + class SequencedReceiptAgent: did = AGENT_DID @@ -269,7 +290,13 @@ def _echo_downstream(tmp_path: Path, log_path: Path) -> Path: return script -def _passthrough(tmp_path: Path, gate: object, config: ProxyConfig) -> tuple[McpPassthrough, Path]: +def _passthrough( + tmp_path: Path, + gate: object, + config: ProxyConfig, + *, + approval_manager: ApprovalManager | None = None, +) -> tuple[McpPassthrough, Path]: log_path = tmp_path / "downstream.log" passthrough = McpPassthrough( DownstreamConfig( @@ -280,6 +307,7 @@ def _passthrough(tmp_path: Path, gate: object, config: ProxyConfig) -> tuple[Mcp ), classifier=ToolCallClassifier(config, server_name="github"), runtime_gate_factory=lambda: gate, + approval_manager=approval_manager, ) return passthrough, log_path @@ -553,6 +581,92 @@ def test_verified_allow_forwards_downstream(tmp_path): assert len(gate.calls) == 1 +def test_verified_allow_records_runtime_receipt_and_downstream_result(tmp_path): + config = _config() + digest = "aa" * 32 + gate = StaticGate(RuntimeGateDecision( + decision="ALLOW", + audit_id=AUDIT_ID, + approval_id=None, + receipt_digest=digest, + receipt_body={}, + )) + with ApprovalEvidenceStore(tmp_path / "evidence.sqlite") as store: + manager = ApprovalManager( + evidence_store=store, + approval_server=ApprovalServer(), + config=config, + client_id="pytest", + session_id="session-runtime-allow", + ) + passthrough, log_path = _passthrough( + tmp_path, + gate, + config, + approval_manager=manager, + ) + client_out = io.StringIO() + + assert passthrough.run_stdio(io.StringIO(_tool_call()), client_out) == 0 + + records = store.list_records() + + assert _responses(client_out.getvalue()) == [{ + "jsonrpc": "2.0", + "id": "call-1", + "result": {"content": [{"type": "text", "text": "forwarded"}]}, + }] + assert log_path.read_text(encoding="utf-8").splitlines() == ["tools/call"] + assert len(records) == 1 + record = records[0] + assert record.status == ApprovalStatus.EXECUTED.value + assert record.decision_audit_id == AUDIT_ID + assert record.decision_receipt_sha256 == digest + assert record.result_status == "executed" + assert record.result_hash is not None + assert record.approval_token_hash is None + assert record.approval_decided_by is None + + +def test_verified_allow_export_bundle_attaches_signed_decision_receipt(tmp_path): + config = _config() + agent = FetchableRecordingAgent(decision="ALLOW") + gate = RuntimeGateClient(agent=agent, config=config, control_grant={"id": "grant"}) + bundle_path = tmp_path / "evidence-bundle.json" + with ApprovalEvidenceStore(tmp_path / "evidence.sqlite") as store: + manager = ApprovalManager( + evidence_store=store, + approval_server=ApprovalServer(), + config=config, + client_id="pytest", + session_id="session-runtime-bundle", + ) + passthrough, _log_path = _passthrough( + tmp_path, + gate, + config, + approval_manager=manager, + ) + client_out = io.StringIO() + + assert passthrough.run_stdio(io.StringIO(_tool_call()), client_out) == 0 + + bundle = export_evidence_bundle( + store, + bundle_path, + proxy_identity_did=AGENT_DID, + trusted_signer_dids=[BACKEND_DID], + receipt_fetcher=agent.get_decision_receipt, + ) + + result = verify_evidence_bundle(bundle, trusted_signer_dids=[BACKEND_DID]) + assert result.valid is True + assert result.record_count == 1 + assert result.signed_receipt_count == 1 + assert result.unverified_receipt_count == 0 + assert result.warnings == () + + def test_block_does_not_forward_and_returns_sanitized_error(tmp_path): config = _config() gate = StaticGate(RuntimeGateDecision( @@ -576,6 +690,52 @@ def test_block_does_not_forward_and_returns_sanitized_error(tmp_path): assert not log_path.exists() +def test_verified_block_records_runtime_receipt_without_forwarding(tmp_path): + config = _config() + digest = "aa" * 32 + gate = StaticGate(RuntimeGateDecision( + decision="BLOCK", + audit_id=AUDIT_ID, + approval_id=None, + receipt_digest=digest, + receipt_body={}, + )) + with ApprovalEvidenceStore(tmp_path / "evidence.sqlite") as store: + manager = ApprovalManager( + evidence_store=store, + approval_server=ApprovalServer(), + config=config, + client_id="pytest", + session_id="session-runtime-block", + ) + passthrough, log_path = _passthrough( + tmp_path, + gate, + config, + approval_manager=manager, + ) + client_out = io.StringIO() + + assert passthrough.run_stdio(io.StringIO(_tool_call()), client_out) == 0 + + records = store.list_records() + + response = _responses(client_out.getvalue())[0] + assert response["error"]["code"] == JSONRPC_POLICY_BLOCKED + assert response["error"]["data"]["reason"] == "runtime_gate_block" + assert SECRET not in client_out.getvalue() + assert not log_path.exists() + assert len(records) == 1 + record = records[0] + assert record.status == ApprovalStatus.BLOCKED.value + assert record.decision_audit_id == AUDIT_ID + assert record.decision_receipt_sha256 == digest + assert record.result_status == "blocked" + assert record.error_class == "runtime_gate_block" + assert record.approval_token_hash is None + assert record.approval_decided_by is None + + def test_waiting_does_not_forward_and_returns_approval_required_shape(tmp_path): config = _config() gate = StaticGate(RuntimeGateDecision( From 64cce6d8502371ffc8e504920a03338c9868f23a Mon Sep 17 00:00:00 2001 From: Oleg Bk Date: Thu, 28 May 2026 12:10:44 +0200 Subject: [PATCH 2/3] fix(paperclip): preserve which paths on Windows Keep shutil.which results as strings in the Paperclip doctor so tests and show-paths output preserve the resolved path instead of normalizing POSIX-like fake paths through pathlib on Windows. Implemented with assistance from Codex. --- agentveil_paperclip/doctor.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/agentveil_paperclip/doctor.py b/agentveil_paperclip/doctor.py index 0bccfca..002922b 100644 --- a/agentveil_paperclip/doctor.py +++ b/agentveil_paperclip/doctor.py @@ -34,11 +34,10 @@ class DoctorReport: codex_mcp_config: CheckResult -def _which(command: str) -> Path | None: +def _which(command: str) -> str | None: """Resolve `command` on PATH without executing it.""" - resolved = shutil.which(command) - return Path(resolved) if resolved else None + return shutil.which(command) def _claude_mcp_config_present(home: Path, cwd: Path) -> tuple[bool, str | None]: From 20f162370b798e2bc5f50dd0f204e4f2f3cc53d1 Mon Sep 17 00:00:00 2001 From: Oleg Bk Date: Thu, 28 May 2026 12:18:28 +0200 Subject: [PATCH 3/3] fix(ci): handle Windows path compatibility Honor explicit HOME overrides in the Paperclip doctor and make POSIX mode assertions conditional in register tests, since Windows does not preserve chmod semantics in the same way. Implemented with assistance from Codex. --- agentveil_paperclip/doctor.py | 9 ++++++++- tests/test_mcp_proxy_cli.py | 6 ++++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/agentveil_paperclip/doctor.py b/agentveil_paperclip/doctor.py index 002922b..dde1425 100644 --- a/agentveil_paperclip/doctor.py +++ b/agentveil_paperclip/doctor.py @@ -9,6 +9,7 @@ from __future__ import annotations +import os import shutil from dataclasses import dataclass from pathlib import Path @@ -40,6 +41,12 @@ def _which(command: str) -> str | None: return shutil.which(command) +def _default_home() -> Path: + """Resolve the user home, honoring explicit HOME overrides in tests.""" + + return Path(os.environ.get("HOME") or Path.home()) + + def _claude_mcp_config_present(home: Path, cwd: Path) -> tuple[bool, str | None]: """Report whether any plausible Claude MCP config exists. @@ -80,7 +87,7 @@ def collect_doctor_report( ) -> DoctorReport: """Run the read-only diagnostic checks and return a structured report.""" - home = home or Path.home() + home = home or _default_home() cwd = cwd or Path.cwd() proxy_path = _which("agentveil-mcp-proxy") diff --git a/tests/test_mcp_proxy_cli.py b/tests/test_mcp_proxy_cli.py index e335ac3..dde4d5c 100644 --- a/tests/test_mcp_proxy_cli.py +++ b/tests/test_mcp_proxy_cli.py @@ -942,7 +942,8 @@ def test_register_already_registered_returns_ok(tmp_path, monkeypatch): assert "private_key_hex" not in identity_after assert identity_after.get("registered") is True assert identity_after.get("verified") is True - assert _mode(result.identity_path) == 0o600 + if os.name != "nt": + assert _mode(result.identity_path) == 0o600 def test_register_encrypted_identity_requires_passphrase(tmp_path, monkeypatch): @@ -974,7 +975,8 @@ def test_register_does_not_downgrade_encrypted_identity_to_plaintext(tmp_path, m assert identity_after["encrypted"] is True # No plaintext private key should ever appear in the file after register. assert "private_key_hex" not in identity_after - assert _mode(result.identity_path) == 0o600 + if os.name != "nt": + assert _mode(result.identity_path) == 0o600 def test_register_wired_through_main(tmp_path, monkeypatch, capsys):