diff --git a/agentveil_mcp_proxy/README.md b/agentveil_mcp_proxy/README.md index 0da8196..102d5c4 100644 --- a/agentveil_mcp_proxy/README.md +++ b/agentveil_mcp_proxy/README.md @@ -28,6 +28,13 @@ script. No additional extras are required. ## Quick Start +For the full step-by-step customer cold path (install → init → doctor → +configure downstream → run → export evidence → offline verify) and the +honest list of what the bundle currently does and does not prove, see +[`docs/MCP_PROXY_QUICKSTART.md`](../docs/MCP_PROXY_QUICKSTART.md). + +The short form is: + Create a local proxy identity, config, and control grant: ```bash @@ -221,6 +228,8 @@ exhaustive; review patterns for your specific downstream server. |---|---| | `init` | Create encrypted identity, config, and control grant. | | `doctor` | Validate local files and control grant. | +| `doctor --check-backend` | Add a read-only preflight that the backend is reachable and the proxy identity is registered. | +| `register` | Register the existing proxy identity with the configured backend. | | `run` | Run stdio passthrough, the proxy mode used by MCP clients. | | `reissue-grant` | Refresh the local control grant before expiry. | | `export-evidence ` | Export durable evidence bundle for offline verification. | diff --git a/agentveil_mcp_proxy/approval/manager.py b/agentveil_mcp_proxy/approval/manager.py index 3b224f1..416d89c 100644 --- a/agentveil_mcp_proxy/approval/manager.py +++ b/agentveil_mcp_proxy/approval/manager.py @@ -122,6 +122,7 @@ def request_approval( created_at=now, expires_at=record_expires_at, runtime_decision=None if active_similar_grant is not None else runtime_decision, + approval_token_hash=self.approval_server.token_hash, granted_by_request_id=( None if active_similar_grant is None else active_similar_grant.request_id ), @@ -178,6 +179,45 @@ def request_approval( ) return self._deny(request_id, "user_denied") + def record_runtime_allow( + self, + classification: ClassifiedToolCall, + *, + runtime_decision: RuntimeGateDecision, + ) -> ApprovalOutcome: + """Persist a verified Runtime Gate ALLOW decision before forwarding.""" + + request_id = self._write_runtime_decision_record( + classification, + runtime_decision=runtime_decision, + ) + return ApprovalOutcome( + request_id, + ApprovalStatus.APPROVED.value, + "runtime_gate_allow", + ) + + def record_runtime_block( + self, + classification: ClassifiedToolCall, + *, + runtime_decision: RuntimeGateDecision, + ) -> None: + """Persist a verified Runtime Gate BLOCK decision as terminal evidence.""" + + request_id = self._write_runtime_decision_record( + classification, + runtime_decision=runtime_decision, + ) + try: + self.evidence_store.transition( + request_id, + ApprovalStatus.BLOCKED.value, + error_class="runtime_gate_block", + ) + except ApprovalEvidenceError as exc: + raise ApprovalFlowError("runtime decision evidence persistence failed") from exc + def record_execution_result(self, outcome: ApprovalOutcome, response: dict[str, Any]) -> None: """Append execution result evidence for an approved downstream call.""" @@ -277,6 +317,7 @@ def _pending_record( created_at: int, expires_at: int | None, runtime_decision: RuntimeGateDecision | None, + approval_token_hash: str | None = None, granted_by_request_id: str | None = None, ) -> PendingApproval: return PendingApproval( @@ -297,11 +338,32 @@ def _pending_record( expires_at=expires_at, decision_audit_id=None if runtime_decision is None else runtime_decision.audit_id, decision_receipt_sha256=None if runtime_decision is None else runtime_decision.receipt_digest, - approval_token_hash=self.approval_server.token_hash, + approval_token_hash=approval_token_hash, matched_policy_rule=classification.policy_evaluation.policy_rule_id, granted_by_request_id=granted_by_request_id, ) + def _write_runtime_decision_record( + self, + classification: ClassifiedToolCall, + *, + runtime_decision: RuntimeGateDecision, + ) -> str: + now = int(time.time()) + request_id = str(uuid.uuid4()) + record = self._pending_record( + classification, + request_id=request_id, + created_at=now, + expires_at=None, + runtime_decision=runtime_decision, + ) + try: + self.evidence_store.write_pending(record) + except ApprovalEvidenceError as exc: + raise ApprovalFlowError("runtime decision evidence persistence failed") from exc + return request_id + def _prompt_for( self, classification: ClassifiedToolCall, diff --git a/agentveil_mcp_proxy/cli.py b/agentveil_mcp_proxy/cli.py index f656258..f8567da 100644 --- a/agentveil_mcp_proxy/cli.py +++ b/agentveil_mcp_proxy/cli.py @@ -27,6 +27,7 @@ from agentveil.agent import AVPAgent from agentveil.delegation import DelegationInvalid, verify_delegation +from agentveil.exceptions import AVPError, AVPNotFoundError, AVPValidationError from agentveil_mcp_proxy.approval import ( ApprovalManager, ApprovalServer, @@ -635,6 +636,68 @@ def _load_proxy_agent( raise ProxyCliError("proxy identity invalid", exit_code=1) from exc +def _check_backend_preflight( + *, + identity: Mapping[str, Any], + config: ProxyConfig, + passphrase: str | None, + timeout_seconds: float = 5.0, +) -> str | None: + """Issue two read-only GETs to verify backend readiness. + + Returns a failure description on the first failure, or ``None`` on + success. Network and SDK exceptions are sanitized to category + + sender; raw response bodies are not surfaced. No state is mutated + on the backend (only ``GET /v1/health`` and + ``GET /v1/onboarding/{did}``). + """ + + try: + agent = _load_proxy_agent( + identity=identity, + config=config, + passphrase=passphrase, + timeout=timeout_seconds, + ) + except ProxyCliError as exc: + return f"backend preflight skipped: {exc}" + + base_url = config.avp.base_url + try: + agent.health() + except AVPError as exc: + return ( + f"backend health check failed at {base_url}: " + f"status {exc.status_code}" + ) + except Exception as exc: + return ( + f"backend unreachable at {base_url}: " + f"{type(exc).__name__}" + ) + + try: + agent.get_onboarding_status() + except AVPNotFoundError: + did = identity.get("did") + return ( + f"agent {did} is not registered with backend at {base_url}; " + "run `agentveil-mcp-proxy register` to register this identity" + ) + except AVPError as exc: + return ( + "backend onboarding status check failed: " + f"status {exc.status_code}" + ) + except Exception as exc: + return ( + "backend onboarding status unreachable: " + f"{type(exc).__name__}" + ) + + return None + + def doctor_proxy( *, home: Path | None = None, @@ -642,11 +705,20 @@ def doctor_proxy( passphrase: str | None = None, passphrase_file: Path | None = None, out: TextIO | None = None, + check_backend: bool = False, ) -> int: - """Validate local proxy files without starting transport.""" + """Validate local proxy files without starting transport. + + When ``check_backend`` is True, also issue two read-only GET + requests against the configured backend (``/v1/health`` and + ``/v1/onboarding/{did}``) to confirm reachability and that the + proxy agent identity is registered. No backend state is mutated. + Skipped if any local check already failed. + """ out = out or sys.stdout paths = proxy_paths(home, config_path) + backend_ok = False try: config = load_proxy_config(paths.config_path) identity_path = paths.identity_path(config.avp.agent_name) @@ -703,6 +775,17 @@ def doctor_proxy( else: failures.append(f"control grant invalid: {exc}") + if check_backend and not failures: + backend_failure = _check_backend_preflight( + identity=identity, + config=config, + passphrase=identity_passphrase, + ) + if backend_failure is not None: + failures.append(backend_failure) + else: + backend_ok = True + if failures: for failure in failures: print(f"FAIL: {failure}", file=out) @@ -719,6 +802,11 @@ def doctor_proxy( f"{config.circuit_breaker.cooldown_seconds}s cooldown)", file=out, ) + if backend_ok: + print( + f"OK: backend reachable at {config.avp.base_url}, agent registered", + file=out, + ) for warning in warnings: print(f"WARN: {warning}", file=out) return 0 @@ -727,6 +815,126 @@ def doctor_proxy( return 1 +def _rewrite_proxy_identity_after_register( + *, + identity_path: Path, + agent: Any, + passphrase: str | None, +) -> None: + """Persist updated registration status in the proxy's identity format. + + ``AVPAgent.register(...)`` calls ``self.save()`` which writes a + different file layout to the same path as the proxy identity. We + block that save during ``register_proxy`` and rewrite here using + the proxy's own helpers so the file format and encryption are + preserved. + """ + + if passphrase is None: + payload = plaintext_identity_payload(agent) + else: + payload = encrypted_identity_payload(agent, passphrase) + _secure_write_json(identity_path, payload, force=True) + + +def register_proxy( + *, + home: Path | None = None, + config_path: Path | None = None, + passphrase: str | None = None, + passphrase_file: Path | None = None, + out: TextIO | None = None, +) -> int: + """Register the existing proxy identity with the configured backend. + + Reuses the same identity file ``init`` created (preserving the DID + and the encrypted-at-rest format), calls the SDK's + ``AVPAgent.register()`` against the same ``base_url`` from the + proxy config, and rewrites the identity file with + ``registered: true``. Backend network errors are sanitized to + category + status code; private key material and raw response + bodies are never printed. + """ + + out = out or sys.stdout + paths = proxy_paths(home, config_path) + config = load_proxy_config(paths.config_path) + identity_path = paths.identity_path(config.avp.agent_name) + identity = _read_json(identity_path, "agent identity") + identity_passphrase = _resolve_existing_identity_passphrase( + identity, + passphrase=passphrase, + passphrase_file=passphrase_file, + ) + agent = _load_proxy_agent( + identity=identity, + config=config, + passphrase=identity_passphrase, + ) + + # ``AVPAgent.register`` calls ``self.save()`` internally which would + # rewrite ~/.avp/agents/.json in the SDK's plaintext format + # and DOWNGRADE the proxy's encrypted identity. Replace with a + # no-op on this instance; we rewrite the file ourselves below using + # the proxy's own payload helpers. + agent.save = lambda *_args, **_kwargs: None # type: ignore[method-assign] + + base_url = config.avp.base_url + try: + agent.register() + except AVPValidationError as exc: + if getattr(exc, "status_code", 0) == 409: + # ``AVPAgent.register`` raises 409 before it gets to set its + # ``_is_registered`` / ``_is_verified`` flags, so the loaded + # agent's in-memory state still reads False. The backend has + # already accepted this DID, so reflect that locally before + # writing the identity file; otherwise the rewritten file + # would say ``registered: false`` while the CLI told the user + # the identity is already registered. + if hasattr(agent, "_is_registered"): + agent._is_registered = True + if hasattr(agent, "_is_verified"): + agent._is_verified = True + _rewrite_proxy_identity_after_register( + identity_path=identity_path, + agent=agent, + passphrase=identity_passphrase, + ) + print( + f"OK: agent {agent.did} already registered at {base_url}", + file=out, + ) + return 0 + print( + f"FAIL: registration rejected at {base_url}: status {exc.status_code}", + file=out, + ) + return 1 + except AVPError as exc: + print( + f"FAIL: registration failed at {base_url}: status {exc.status_code}", + file=out, + ) + return 1 + except Exception as exc: + print( + f"FAIL: backend unreachable at {base_url}: {type(exc).__name__}", + file=out, + ) + return 1 + + _rewrite_proxy_identity_after_register( + identity_path=identity_path, + agent=agent, + passphrase=identity_passphrase, + ) + print( + f"OK: agent {agent.did} registered at {base_url}", + file=out, + ) + return 0 + + def _grant_scope_for_reissue(scope: Any) -> tuple[list[str], dict[str, Any] | None]: if not isinstance(scope, list): raise ProxyCliError("control grant scope invalid", exit_code=1) @@ -1111,6 +1319,14 @@ def build_parser() -> argparse.ArgumentParser: doctor = subparsers.add_parser("doctor", help="Validate local proxy config and files") _add_common_path_args(doctor) _add_passphrase_args(doctor) + doctor.add_argument( + "--check-backend", + action="store_true", + help=( + "Issue read-only GETs to verify the configured backend is " + "reachable and the proxy agent is registered." + ), + ) run = subparsers.add_parser("run", help="Run stdio MCP passthrough") _add_common_path_args(run) @@ -1119,6 +1335,13 @@ def build_parser() -> argparse.ArgumentParser: run.add_argument("--auto-deny", action="store_true", help="Deny every approval-required action") run.add_argument("--headless-policy", type=Path, default=None, help="Headless approval policy JSON path") + register = subparsers.add_parser( + "register", + help="Register the existing proxy identity with the configured backend", + ) + _add_common_path_args(register) + _add_passphrase_args(register) + reissue = subparsers.add_parser("reissue-grant", help="Issue a fresh local control grant") _add_common_path_args(reissue) _add_passphrase_args(reissue) @@ -1184,6 +1407,7 @@ def main(argv: list[str] | None = None) -> int: config_path=args.config, passphrase=args.passphrase, passphrase_file=args.passphrase_file, + check_backend=args.check_backend, ) if args.command == "run": return run_proxy( @@ -1195,6 +1419,13 @@ def main(argv: list[str] | None = None) -> int: auto_deny=args.auto_deny, headless_policy_path=args.headless_policy, ) + if args.command == "register": + return register_proxy( + home=args.home, + config_path=args.config, + passphrase=args.passphrase, + passphrase_file=args.passphrase_file, + ) if args.command == "reissue-grant": reissue_grant( home=args.home, diff --git a/agentveil_mcp_proxy/evidence/store.py b/agentveil_mcp_proxy/evidence/store.py index defb0f0..a1fd9da 100644 --- a/agentveil_mcp_proxy/evidence/store.py +++ b/agentveil_mcp_proxy/evidence/store.py @@ -84,6 +84,9 @@ class ApprovalStatus(str, Enum): ApprovalStatus.DENIED.value, ApprovalStatus.EXPIRED.value, ApprovalStatus.INVALIDATED.value, + ApprovalStatus.EXECUTED.value, + ApprovalStatus.BLOCKED.value, + ApprovalStatus.ERROR.value, }, ApprovalStatus.APPROVED.value: { ApprovalStatus.EXECUTED.value, diff --git a/agentveil_mcp_proxy/passthrough.py b/agentveil_mcp_proxy/passthrough.py index 358984d..0dc0a00 100644 --- a/agentveil_mcp_proxy/passthrough.py +++ b/agentveil_mcp_proxy/passthrough.py @@ -352,6 +352,25 @@ def _blocked_error( return jsonrpc_error(request_id, JSONRPC_POLICY_BLOCKED, message, data=data) +def _runtime_evidence_unavailable_error( + request_id: Any, + decision: RuntimeGateDecision, +) -> dict[str, Any]: + data: dict[str, Any] = { + "status": "blocked", + "reason": "runtime_gate_evidence_unavailable", + "decision": decision.decision, + } + if decision.audit_id is not None: + data["audit_id"] = decision.audit_id + return jsonrpc_error( + request_id, + JSONRPC_POLICY_BLOCKED, + "runtime decision evidence unavailable", + data=data, + ) + + def _approval_required_error( request_id: Any, *, @@ -723,7 +742,16 @@ def _runtime_gate_error_response( self._record_runtime_gate_events() if decision.decision == DECISION_ALLOW: - return None, None + if self.approval_manager is None: + return None, None + try: + outcome = self.approval_manager.record_runtime_allow( + classification, + runtime_decision=decision, + ) + except ApprovalFlowError: + return _runtime_evidence_unavailable_error(request_id, decision), None + return None, outcome if decision.decision == DECISION_WAITING: return self._approval_flow_response( classification, @@ -732,6 +760,14 @@ def _runtime_gate_error_response( runtime_decision=decision, ) if decision.decision == DECISION_BLOCK: + if self.approval_manager is not None: + try: + self.approval_manager.record_runtime_block( + classification, + runtime_decision=decision, + ) + except ApprovalFlowError: + return _runtime_evidence_unavailable_error(request_id, decision), None return _blocked_error( request_id, "blocked by AVP Runtime Gate", diff --git a/agentveil_paperclip/doctor.py b/agentveil_paperclip/doctor.py index 0bccfca..dde1425 100644 --- a/agentveil_paperclip/doctor.py +++ b/agentveil_paperclip/doctor.py @@ -9,6 +9,7 @@ from __future__ import annotations +import os import shutil from dataclasses import dataclass from pathlib import Path @@ -34,11 +35,16 @@ class DoctorReport: codex_mcp_config: CheckResult -def _which(command: str) -> Path | None: +def _which(command: str) -> str | None: """Resolve `command` on PATH without executing it.""" - resolved = shutil.which(command) - return Path(resolved) if resolved else None + return shutil.which(command) + + +def _default_home() -> Path: + """Resolve the user home, honoring explicit HOME overrides in tests.""" + + return Path(os.environ.get("HOME") or Path.home()) def _claude_mcp_config_present(home: Path, cwd: Path) -> tuple[bool, str | None]: @@ -81,7 +87,7 @@ def collect_doctor_report( ) -> DoctorReport: """Run the read-only diagnostic checks and return a structured report.""" - home = home or Path.home() + home = home or _default_home() cwd = cwd or Path.cwd() proxy_path = _which("agentveil-mcp-proxy") diff --git a/docs/MCP_PROXY_QUICKSTART.md b/docs/MCP_PROXY_QUICKSTART.md new file mode 100644 index 0000000..1be5854 --- /dev/null +++ b/docs/MCP_PROXY_QUICKSTART.md @@ -0,0 +1,382 @@ +# MCP Proxy Quickstart + +This is the cold customer path for wrapping one downstream MCP server with the +AgentVeil MCP Proxy and producing an offline-verifiable evidence bundle. Every +step describes behavior that is implemented and locally verifiable. Anything +this quickstart does not cover yet is listed under +[What this quickstart does NOT prove](#what-this-quickstart-does-not-prove). + +For day-2 operations, headless mode, multi-IDE deployment, evidence vacuum, and +identity migration, see [`MCP_PROXY_OPERATIONS.md`](MCP_PROXY_OPERATIONS.md). + +## Prerequisites + +- Python 3.10+ on macOS or Linux. Windows is supported but the proxy README + flags a known orphan-process race on Windows; use a supervisor on Windows. +- A downstream MCP server you want to wrap. The downstream can be any MCP + server: filesystem, GitHub, custom company server. You provide its launch + command and arguments. +- Backend access: the proxy verifies signed AgentVeil Runtime Gate + `decision_receipt/2` artifacts against a pinned signer-DID set. For + `https://agentveil.dev` the trusted DIDs are bundled with the SDK; for any + other base URL pass `--trusted-signer-did` to `init`. + +## Install + +```bash +pip install agentveil +``` + +This installs the core `agentveil` SDK and registers the +`agentveil-mcp-proxy` console script. + +## Step 1 — `agentveil-mcp-proxy init` + +```bash +agentveil-mcp-proxy init +``` + +By default this creates an **encrypted local identity**, a self-issued +delegation receipt scoped to the `mcp_proxy` category, and a starter config at +`~/.avp/mcp-proxy/config.json`. The encrypted-identity passphrase is collected +interactively, from `--passphrase-file`, or from the `AVP_PROXY_PASSPHRASE` +environment variable. See +[`MCP_PROXY_OPERATIONS.md#security-trade-offs-by-passphrase-source`](MCP_PROXY_OPERATIONS.md#security-trade-offs-by-passphrase-source). + +### What `init` actually does + +- Generates a fresh Ed25519 keypair locally. +- Writes the identity, control grant, and config under `~/.avp/` with + owner-only permissions (0o600). +- Issues a local self-signed delegation receipt (issuer = subject = the new + agent DID), scoped to the `mcp_proxy` category. + +### What `init` does NOT do + +- It does not register the new agent with the AgentVeil backend. +- It does not contact `--base-url` at all. +- Until the agent is registered with the backend (see Step 3 + `agentveil-mcp-proxy register`), the proxy cannot exchange Runtime + Gate decisions with `https://agentveil.dev`. + +Custom base URL example: + +```bash +agentveil-mcp-proxy init \ + --base-url https://your-private-avp.example \ + --trusted-signer-did did:key:zYourPinnedSignerDid +``` + +## Step 2 — `agentveil-mcp-proxy doctor` (local-only) + +```bash +agentveil-mcp-proxy doctor +``` + +This validates local files only. Output on a fresh successful `init` looks +like: + +```text +OK: config /home/me/.avp/mcp-proxy/config.json +OK: identity /home/me/.avp/agents/agentveil-mcp-proxy.json +OK: control grant /home/me/.avp/mcp-proxy/agentveil-mcp-proxy.control-grant.json +OK: trusted signers 2 +OK: circuit breaker thresholds (5 failures, 60s window, 30s cooldown) +``` + +`doctor` checks: file permissions are 0600, the identity decrypts with the +configured passphrase, the agent DID matches its stored identity, the control +grant is signed by that identity, issuer/subject match, the trusted-signer DID +set is non-empty, and the control grant has not expired. + +`doctor` exits non-zero on any of these failures, with a specific FAIL line. +It is intentionally offline: it does not call the backend. + +## Step 3 — `agentveil-mcp-proxy register` + +```bash +agentveil-mcp-proxy register +``` + +Registers the exact identity created by `init` (same DID, same key) with +the backend at the `base_url` from your proxy config. This is the bridge +between local identity creation and the next step +(`doctor --check-backend`). Registration is required once per identity; +re-runs are idempotent. + +### What `register` actually does + +- Loads the same proxy identity file `doctor` and `run` use (encrypted + with the same passphrase mechanism). +- Calls the SDK's `AVPAgent.register()` against the configured + `base_url`: registration POST → Proof-of-Work solve → verify POST. +- Updates the local identity file's `registered` flag to `true`. +- Preserves the identity's encrypted-at-rest format (no plaintext + downgrade). + +### What `register` does NOT do + +- It does not provision an agent card / capabilities / endpoint URL / + provider on the backend. If you need those, use the `agentveil` SDK + directly with the keyword arguments documented on + `AVPAgent.register(...)`. +- It does not exchange Runtime Gate decisions or attestations on its + own; that happens later when the proxy actually serves tool calls. +- It does not prove the proxy can ship a verified evidence bundle — + that still requires Steps 7-9 below. + +### Expected output + +Success: + +```text +OK: agent did:key:z6Mk... registered at https://agentveil.dev +``` + +Idempotent re-run when the agent already exists on the backend: + +```text +OK: agent did:key:z6Mk... already registered at https://agentveil.dev +``` + +Sanitized failure shapes: + +```text +FAIL: registration rejected at : status 4xx +FAIL: registration failed at : status 5xx +FAIL: backend unreachable at : +``` + +Raw response bodies and stack traces are never printed. Re-run after +fixing the underlying issue. + +## Step 4 — `agentveil-mcp-proxy doctor --check-backend` + +```bash +agentveil-mcp-proxy doctor --check-backend +``` + +Adds two read-only HTTP GETs against the configured backend: + +- `GET /v1/health` — confirms the backend is reachable. +- `GET /v1/onboarding/{did}` — confirms the proxy agent identity is + registered with the backend. + +On success the output adds one extra line: + +```text +OK: backend reachable at https://agentveil.dev, agent registered +``` + +On failure the output starts with one of: + +```text +FAIL: backend unreachable at : +FAIL: backend health check failed at : status +FAIL: agent is not registered with backend at ; run `agentveil-mcp-proxy register` to register this identity +FAIL: backend onboarding status check failed: status +``` + +These failures are sanitized — raw response bodies and stack traces are not +printed. If you see "not registered", re-run Step 3 +(`agentveil-mcp-proxy register`) before continuing. + +`doctor` skips the backend preflight when any local check has already +failed. + +## Step 5 — Configure the downstream MCP server + +Edit `~/.avp/mcp-proxy/config.json` and set `downstream.command` and +`downstream.args` to the MCP server you want to wrap. Example for a +filesystem MCP server: + +```json +{ + "downstream": { + "name": "filesystem", + "command": "npx", + "args": ["-y", "@modelcontextprotocol/server-filesystem", "/Users/me/work"] + } +} +``` + +`name` is the server label the proxy uses internally and in evidence records. +`env` and `env_passthrough` are also supported; see +[`MCP_PROXY_OPERATIONS.md`](MCP_PROXY_OPERATIONS.md). The proxy refuses to +forward any `AVP_*` environment variable to the downstream — those names are +reserved for proxy-internal secrets. + +## Step 6 — Point your MCP client at the proxy + +Configure your IDE / MCP client to run `agentveil-mcp-proxy run` as its +server entry point. The MCP Proxy README documents wiring for Claude Desktop, +Cursor, Windsurf, and VS Code: +[`agentveil_mcp_proxy/README.md`](../agentveil_mcp_proxy/README.md). + +When the client starts the proxy: + +```bash +agentveil-mcp-proxy run +``` + +The proxy: re-validates the local files via `doctor`, launches the downstream +MCP server, starts the local HTTP approval UI, and begins serving stdio +JSON-RPC. + +## Step 7 — Trigger one MCP tool call + +Use your MCP client (Claude Desktop, Cursor, Windsurf, VS Code, or any stdio +JSON-RPC client) to invoke a tool on the downstream server. The proxy +classifies the call, applies local policy, and: + +- **ALLOW** policy decisions are forwarded to the downstream server. If the + policy rule routes the call through Runtime Gate (`ASK_BACKEND`), the + verified backend `decision_receipt/2` and the downstream result hash are + recorded into the local SQLite evidence store **when an evidence store is + configured for the run** (this is the default for `agentveil-mcp-proxy run`). +- **APPROVAL** policy decisions open the local browser approval UI bound to + the exact payload hash and matched policy rule. +- **BLOCK** policy decisions return a JSON-RPC error and never reach the + downstream server. Runtime Gate BLOCK decisions taken on the `ASK_BACKEND` + path are also recorded into the local evidence store with the backend's + signed receipt attached. +- **OBSERVE** policy decisions are forwarded without further gating. + +Slice-level note: recording verified Runtime Gate `ALLOW` and `BLOCK` +decisions into the local evidence store is a recent change. Before it +landed, only the approval-required path created proxy-side evidence +records. See `agentveil_mcp_proxy/passthrough.py` and +`agentveil_mcp_proxy/approval/manager.py` for the exact flow. + +## Step 8 — Export the evidence bundle + +After one or more tool calls: + +```bash +agentveil-mcp-proxy export-evidence ./my-bundle.json +``` + +The CLI returns a summary line of the form: + +```text +Evidence exported: ./my-bundle.json (N records, M signed receipts) +``` + +`N` is the count of local evidence records (one per gated tool call); `M` is +the count of backend-signed `decision_receipt/2` artifacts the proxy was +able to fetch via `agent.get_decision_receipt(audit_id)` and embed in the +bundle. If a record references an `audit_id` but the receipt could not be +fetched or the digest did not match, the CLI prints a `WARN` and the +bundle's `unverified_receipt_count` increases. + +The exported bundle file is written with 0600 permissions. + +## Step 9 — Verify the bundle offline + +```bash +agentveil-mcp-proxy verify ./my-bundle.json \ + --trusted-signer-did did:key:zYourPinnedSignerDid +``` + +Pass `--trusted-signer-did` to require an explicit pinned set. If you omit +the flag the verifier will use the signer set embedded in the bundle and +will emit a `WARN` about trusting bundle-embedded signers. For +due-diligence verification, pass the flag. + +Successful output: + +```text +OK: bundle integrity verified, N records, M signed receipts +``` + +The verifier checks, offline: + +1. Local record chain `prev_event_hash` and `record_hash` for every record. +2. Bundle-level `chain_root_hash` matches the last record's hash. +3. For every embedded signed receipt: + - The SHA-256 of the byte-exact JCS receipt matches its key in + `signed_receipts`. + - The DataIntegrityProof / `eddsa-jcs-2022` signature verifies against + one of the pinned signer DIDs. + - `schema_version` is in the accepted set (`decision_receipt/1`, + `decision_receipt/2`). + - `audit_id` is present and well-formed. +4. Field cross-checks between record and embedded receipt: + - `record.payload_hash` == `receipt.payload_hash` + - `record.risk_class` == `receipt.client_risk_class` + - `record.policy_context_hash` == `receipt.client_policy_context_hash` + - `record.decision_audit_id` == `receipt.audit_id` + +The verifier exits non-zero with a specific `EvidenceVerificationError` +message on any of these failures. + +## What this quickstart proves + +After the steps above, the bundle you produced contains, for each gated +tool call: + +- A privacy-preserving local evidence record describing the action class, + risk class, payload hash, policy rule, and decision audit ID. +- The backend-signed Runtime Gate `decision_receipt/2` artifact for every + `ASK_BACKEND` Runtime Gate decision (`ALLOW` / `BLOCK` / + `WAITING_FOR_HUMAN_APPROVAL`) that the proxy issued. +- The downstream result hash for forwarded calls that completed. + +These artifacts are independently verifiable offline against the pinned +backend signer DID set. + +## What this quickstart does NOT prove + +This list is the honest counterpart to the section above. The bundle +**does not** currently contain or prove any of the following: + +- That the **production** AgentVeil backend at `https://agentveil.dev` + signed receipts in any internal demo artifact you may have seen. The + Action Proof Pack v1.2 internal proof harness runs against a local dev + backend with a deterministic dev key, not the production signer set. v1.2 + is an internal proof, not customer production evidence. +- A backend-signed `human_approval_receipt/2` for the WAITING / + approval-required path. The MCP Proxy currently uses a local browser + approval UI; it does not call the backend Human Control API. The bundle + carries the proxy-side local approval record but not a backend-signed + approval artifact. +- A backend-signed `execution_receipt/2`. The backend `/v1/execute` + endpoint exists for capabilities the backend itself adapts; the MCP + Proxy forwards the call to your downstream MCP server instead. There is + no backend-attested execution claim for proxy-forwarded calls. +- Control over agent actions that bypass the MCP Proxy. The proxy gates + only calls that flow through it; raw shell, raw API access, or any path + the MCP client takes without going through the proxy are out of scope. +- Sandbox replacement. The proxy does not contain or restrict the + downstream MCP server's process; use OS-level sandboxing + (container/VM/sandbox) separately if process containment matters. +- A fix to model behavior or model alignment. The proxy controls actions, + not model reasoning or output. + +## Failure modes you may hit + +- `FAIL: agent is not registered with backend at ` — the + proxy identity was created locally but never onboarded. Run Step 3 + (`agentveil-mcp-proxy register`) and then re-run + `doctor --check-backend`. +- `WARN: control grant expires in N days` — the local self-issued + delegation is approaching expiry. Run + `agentveil-mcp-proxy reissue-grant`. +- `WARN: ... default_trust_from_bundle ...` from `verify` — you omitted + `--trusted-signer-did`. For due-diligence verification, always pass the + flag. +- `WARN: N records have decision_audit_id but no matching signed receipt + in bundle` from `export-evidence` — the receipt fetch failed or the + digest did not match. Check network reachability and that the agent + identity matches the agent that produced the records. + +## Where to go next + +- Operations / day-2 reference: + [`MCP_PROXY_OPERATIONS.md`](MCP_PROXY_OPERATIONS.md). +- MCP Proxy README with IDE wiring examples: + [`agentveil_mcp_proxy/README.md`](../agentveil_mcp_proxy/README.md). +- Data handling boundaries: + [`DATA_HANDLING.md`](DATA_HANDLING.md). +- SDK / API surface: + [`API.md`](API.md). diff --git a/tests/test_mcp_proxy_cli.py b/tests/test_mcp_proxy_cli.py index 8c1a905..dde4d5c 100644 --- a/tests/test_mcp_proxy_cli.py +++ b/tests/test_mcp_proxy_cli.py @@ -13,6 +13,7 @@ import agentveil_mcp_proxy.cli as proxy_cli from agentveil.delegation import verify_delegation +from agentveil.exceptions import AVPNotFoundError, AVPServerError, AVPValidationError from agentveil_mcp_proxy.cli import ( AGENTVEIL_DEV_SIGNER_DIDS, MIN_IDENTITY_PASSPHRASE_LENGTH, @@ -22,6 +23,7 @@ init_proxy, main, proxy_paths, + register_proxy, reissue_grant, run_proxy, ) @@ -620,6 +622,379 @@ def test_doctor_fails_when_grant_already_expired(tmp_path): assert secret not in output +def test_doctor_fails_when_identity_file_missing(tmp_path): + home = tmp_path / "avp-home" + init_proxy(home=home, agent_name="proxy", passphrase=TEST_PASSPHRASE) + identity_path = proxy_paths(home).identity_path("proxy") + identity_path.unlink() + + out = io.StringIO() + code = doctor_proxy(home=home, passphrase=TEST_PASSPHRASE, out=out) + + assert code == 1 + output = out.getvalue() + assert "FAIL: agent identity not found" in output + + +class _StubBackendAgent: + """Minimal stub for `--check-backend` tests. + + Counts ``health()`` and ``get_onboarding_status()`` invocations so a + regression test can assert the doctor does not call the backend + without ``--check-backend``. + """ + + def __init__(self, *, did: str, health_raises: Exception | None = None, + onboarding_raises: Exception | None = None): + self.did = did + self._health_raises = health_raises + self._onboarding_raises = onboarding_raises + self.health_calls = 0 + self.onboarding_calls = 0 + + def health(self) -> dict: + self.health_calls += 1 + if self._health_raises is not None: + raise self._health_raises + return {"status": "ok"} + + def get_onboarding_status(self) -> dict: + self.onboarding_calls += 1 + if self._onboarding_raises is not None: + raise self._onboarding_raises + return {"status": "verified"} + + +def _install_stub_agent(monkeypatch, agent: _StubBackendAgent) -> None: + monkeypatch.setattr( + proxy_cli, + "_load_proxy_agent", + lambda **_kwargs: agent, + ) + + +def test_doctor_local_only_does_not_call_backend(tmp_path, monkeypatch): + home = tmp_path / "avp-home" + result = init_proxy(home=home, agent_name="proxy", passphrase=TEST_PASSPHRASE) + identity = _load(result.identity_path) + stub = _StubBackendAgent(did=identity["did"]) + _install_stub_agent(monkeypatch, stub) + + out = io.StringIO() + code = doctor_proxy(home=home, passphrase=TEST_PASSPHRASE, out=out) + + assert code == 0 + assert stub.health_calls == 0 + assert stub.onboarding_calls == 0 + assert "backend reachable" not in out.getvalue() + + +def test_doctor_check_backend_succeeds_with_registered_agent(tmp_path, monkeypatch): + home = tmp_path / "avp-home" + result = init_proxy(home=home, agent_name="proxy", passphrase=TEST_PASSPHRASE) + identity = _load(result.identity_path) + stub = _StubBackendAgent(did=identity["did"]) + _install_stub_agent(monkeypatch, stub) + + out = io.StringIO() + code = doctor_proxy( + home=home, + passphrase=TEST_PASSPHRASE, + out=out, + check_backend=True, + ) + + assert code == 0 + assert stub.health_calls == 1 + assert stub.onboarding_calls == 1 + output = out.getvalue() + assert "OK: backend reachable at " in output + assert "agent registered" in output + + +def test_doctor_check_backend_fails_when_backend_unreachable(tmp_path, monkeypatch): + home = tmp_path / "avp-home" + result = init_proxy(home=home, agent_name="proxy", passphrase=TEST_PASSPHRASE) + identity = _load(result.identity_path) + stub = _StubBackendAgent( + did=identity["did"], + health_raises=ConnectionError("connection refused"), + ) + _install_stub_agent(monkeypatch, stub) + + out = io.StringIO() + code = doctor_proxy( + home=home, + passphrase=TEST_PASSPHRASE, + out=out, + check_backend=True, + ) + + assert code == 1 + output = out.getvalue() + assert "FAIL: backend unreachable at" in output + assert stub.health_calls == 1 + assert stub.onboarding_calls == 0 + assert "connection refused" not in output + + +def test_doctor_check_backend_fails_when_agent_not_registered(tmp_path, monkeypatch): + home = tmp_path / "avp-home" + result = init_proxy(home=home, agent_name="proxy", passphrase=TEST_PASSPHRASE) + identity = _load(result.identity_path) + stub = _StubBackendAgent( + did=identity["did"], + onboarding_raises=AVPNotFoundError("agent not found", 404, "agent not found"), + ) + _install_stub_agent(monkeypatch, stub) + + out = io.StringIO() + code = doctor_proxy( + home=home, + passphrase=TEST_PASSPHRASE, + out=out, + check_backend=True, + ) + + assert code == 1 + output = out.getvalue() + assert "FAIL: agent " in output + assert "is not registered with backend at" in output + assert stub.health_calls == 1 + assert stub.onboarding_calls == 1 + + +def test_doctor_check_backend_fails_on_server_error(tmp_path, monkeypatch): + home = tmp_path / "avp-home" + result = init_proxy(home=home, agent_name="proxy", passphrase=TEST_PASSPHRASE) + identity = _load(result.identity_path) + stub = _StubBackendAgent( + did=identity["did"], + health_raises=AVPServerError("server error", 500, "server error"), + ) + _install_stub_agent(monkeypatch, stub) + + out = io.StringIO() + code = doctor_proxy( + home=home, + passphrase=TEST_PASSPHRASE, + out=out, + check_backend=True, + ) + + assert code == 1 + output = out.getvalue() + assert "FAIL: backend health check failed at" in output + assert "status 500" in output + assert stub.health_calls == 1 + assert stub.onboarding_calls == 0 + + +def test_doctor_check_backend_skipped_when_local_checks_fail(tmp_path, monkeypatch): + home = tmp_path / "avp-home" + result = init_proxy(home=home, agent_name="proxy", passphrase=TEST_PASSPHRASE) + identity = _load(result.identity_path) + stub = _StubBackendAgent(did=identity["did"]) + _install_stub_agent(monkeypatch, stub) + + # Break a local check (empty signer set) so backend preflight is skipped. + config = _load(result.config_path) + config["avp"]["trusted_signer_dids"] = [] + result.config_path.write_text(json.dumps(config), encoding="utf-8") + os.chmod(result.config_path, 0o600) + + out = io.StringIO() + code = doctor_proxy( + home=home, + passphrase=TEST_PASSPHRASE, + out=out, + check_backend=True, + ) + + assert code == 1 + assert stub.health_calls == 0 + assert stub.onboarding_calls == 0 + assert "trusted_signer_dids" in out.getvalue() + + +def _install_fake_register(monkeypatch, *, raises=None, capture_dids=None): + """Replace ``AVPAgent.register`` with a deterministic fake. + + ``raises`` — exception instance to raise instead of completing. + ``capture_dids`` — optional list to append the agent DID for each call. + """ + + from agentveil.agent import AVPAgent + + def fake_register(self): + if capture_dids is not None: + capture_dids.append(self.did) + if raises is not None: + raise raises + self._is_registered = True + self._is_verified = True + return {"did": self.did, "onboarding_pending": True} + + monkeypatch.setattr(AVPAgent, "register", fake_register) + + +def test_register_loads_existing_proxy_did(tmp_path, monkeypatch): + home = tmp_path / "avp-home" + result = init_proxy(home=home, agent_name="proxy", passphrase=TEST_PASSPHRASE) + identity_before = _load(result.identity_path) + captured: list[str] = [] + _install_fake_register(monkeypatch, capture_dids=captured) + + out = io.StringIO() + code = register_proxy(home=home, passphrase=TEST_PASSPHRASE, out=out) + + assert code == 0 + assert captured == [identity_before["did"]] + # Identity file still exists, still encrypted, same DID, registered flag set. + identity_after = _load(result.identity_path) + assert identity_after["did"] == identity_before["did"] + assert identity_after["encrypted"] is True + assert identity_after.get("registered") is True + + +def test_register_success_prints_sanitized_ok_and_no_secret(tmp_path, monkeypatch): + home = tmp_path / "avp-home" + result = init_proxy(home=home, agent_name="proxy", passphrase=TEST_PASSPHRASE) + identity = _load(result.identity_path) + secret = _secret_material(identity) + _install_fake_register(monkeypatch) + + out = io.StringIO() + code = register_proxy(home=home, passphrase=TEST_PASSPHRASE, out=out) + + output = out.getvalue() + assert code == 0 + assert "OK: agent " in output + assert identity["did"] in output + assert secret not in output + assert "private_key" not in output + assert "encryption_salt" not in output + + +def test_register_backend_failure_prints_sanitized_fail(tmp_path, monkeypatch): + home = tmp_path / "avp-home" + result = init_proxy(home=home, agent_name="proxy", passphrase=TEST_PASSPHRASE) + secret = _secret_material(_load(result.identity_path)) + _install_fake_register( + monkeypatch, + raises=AVPServerError("server boom raw body", 500, "raw response detail"), + ) + + out = io.StringIO() + code = register_proxy(home=home, passphrase=TEST_PASSPHRASE, out=out) + + output = out.getvalue() + assert code == 1 + assert "FAIL: registration failed at " in output + assert "status 500" in output + assert "server boom raw body" not in output + assert "raw response detail" not in output + assert secret not in output + + +def test_register_backend_unreachable_prints_sanitized_fail(tmp_path, monkeypatch): + home = tmp_path / "avp-home" + result = init_proxy(home=home, agent_name="proxy", passphrase=TEST_PASSPHRASE) + secret = _secret_material(_load(result.identity_path)) + _install_fake_register( + monkeypatch, + raises=ConnectionError("connect timeout to host"), + ) + + out = io.StringIO() + code = register_proxy(home=home, passphrase=TEST_PASSPHRASE, out=out) + + output = out.getvalue() + assert code == 1 + assert "FAIL: backend unreachable at " in output + assert "ConnectionError" in output + assert "connect timeout to host" not in output + assert secret not in output + + +def test_register_already_registered_returns_ok(tmp_path, monkeypatch): + home = tmp_path / "avp-home" + result = init_proxy(home=home, agent_name="proxy", passphrase=TEST_PASSPHRASE) + identity_before = _load(result.identity_path) + _install_fake_register( + monkeypatch, + raises=AVPValidationError("agent already exists", 409, "conflict"), + ) + + out = io.StringIO() + code = register_proxy(home=home, passphrase=TEST_PASSPHRASE, out=out) + + output = out.getvalue() + assert code == 0 + assert "already registered" in output + + # The 409 path must keep the identity encrypted and update the + # local `registered` / `verified` flags to True so the file is not + # left in a stale state contradicting the CLI message. + identity_after = _load(result.identity_path) + assert identity_after["did"] == identity_before["did"] + assert identity_after["encrypted"] is True + assert "private_key_hex" not in identity_after + assert identity_after.get("registered") is True + assert identity_after.get("verified") is True + if os.name != "nt": + assert _mode(result.identity_path) == 0o600 + + +def test_register_encrypted_identity_requires_passphrase(tmp_path, monkeypatch): + home = tmp_path / "avp-home" + init_proxy(home=home, agent_name="proxy", passphrase=TEST_PASSPHRASE) + _install_fake_register(monkeypatch) + monkeypatch.delenv("AVP_PROXY_PASSPHRASE", raising=False) + + out = io.StringIO() + try: + register_proxy(home=home, out=out) + except ProxyCliError as exc: + assert "passphrase" in str(exc) + assert exc.exit_code == 1 + else: + raise AssertionError("expected register to require encrypted identity passphrase") + + +def test_register_does_not_downgrade_encrypted_identity_to_plaintext(tmp_path, monkeypatch): + home = tmp_path / "avp-home" + result = init_proxy(home=home, agent_name="proxy", passphrase=TEST_PASSPHRASE) + encrypted_blob_before = _load(result.identity_path).get("encrypted_blob") + assert encrypted_blob_before # sanity: init produced an encrypted identity + _install_fake_register(monkeypatch) + + register_proxy(home=home, passphrase=TEST_PASSPHRASE, out=io.StringIO()) + + identity_after = _load(result.identity_path) + assert identity_after["encrypted"] is True + # No plaintext private key should ever appear in the file after register. + assert "private_key_hex" not in identity_after + if os.name != "nt": + assert _mode(result.identity_path) == 0o600 + + +def test_register_wired_through_main(tmp_path, monkeypatch, capsys): + home = tmp_path / "avp-home" + init_proxy(home=home, agent_name="proxy", passphrase=TEST_PASSPHRASE) + _install_fake_register(monkeypatch) + + exit_code = main([ + "register", + "--home", str(home), + "--passphrase", TEST_PASSPHRASE, + ]) + out, _err = capsys.readouterr() + + assert exit_code == 0 + assert "OK: agent " in out + + def test_reissue_grant_creates_new_grant_with_default_ttl(tmp_path): home = tmp_path / "avp-home" result = init_proxy(home=home, agent_name="proxy", passphrase=TEST_PASSPHRASE) diff --git a/tests/test_mcp_proxy_runtime_gate.py b/tests/test_mcp_proxy_runtime_gate.py index 2f89439..147a786 100644 --- a/tests/test_mcp_proxy_runtime_gate.py +++ b/tests/test_mcp_proxy_runtime_gate.py @@ -17,8 +17,15 @@ from agentveil.agent import AVPAgent from agentveil.delegation import _public_key_to_did +from agentveil_mcp_proxy.approval import ApprovalManager, ApprovalServer from agentveil_mcp_proxy.circuit_breaker import CircuitBreaker, CircuitBreakerConfig from agentveil_mcp_proxy.classification import ToolCallClassifier +from agentveil_mcp_proxy.evidence import ( + ApprovalEvidenceStore, + ApprovalStatus, + export_evidence_bundle, + verify_evidence_bundle, +) from agentveil_mcp_proxy.passthrough import ( DownstreamConfig, JSONRPC_APPROVAL_REQUIRED, @@ -210,6 +217,20 @@ def get_decision_receipt(self, audit_id: str) -> str: raise AssertionError("inline decision_receipt_jcs should avoid a receipt fetch") +class FetchableRecordingAgent(RecordingAgent): + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.receipts: dict[str, str] = {} + + def runtime_evaluate(self, **kwargs): + response = super().runtime_evaluate(**kwargs) + self.receipts[response["audit_id"]] = response["decision_receipt_jcs"] + return response + + def get_decision_receipt(self, audit_id: str) -> str: + return self.receipts[audit_id] + + class SequencedReceiptAgent: did = AGENT_DID @@ -269,7 +290,13 @@ def _echo_downstream(tmp_path: Path, log_path: Path) -> Path: return script -def _passthrough(tmp_path: Path, gate: object, config: ProxyConfig) -> tuple[McpPassthrough, Path]: +def _passthrough( + tmp_path: Path, + gate: object, + config: ProxyConfig, + *, + approval_manager: ApprovalManager | None = None, +) -> tuple[McpPassthrough, Path]: log_path = tmp_path / "downstream.log" passthrough = McpPassthrough( DownstreamConfig( @@ -280,6 +307,7 @@ def _passthrough(tmp_path: Path, gate: object, config: ProxyConfig) -> tuple[Mcp ), classifier=ToolCallClassifier(config, server_name="github"), runtime_gate_factory=lambda: gate, + approval_manager=approval_manager, ) return passthrough, log_path @@ -553,6 +581,92 @@ def test_verified_allow_forwards_downstream(tmp_path): assert len(gate.calls) == 1 +def test_verified_allow_records_runtime_receipt_and_downstream_result(tmp_path): + config = _config() + digest = "aa" * 32 + gate = StaticGate(RuntimeGateDecision( + decision="ALLOW", + audit_id=AUDIT_ID, + approval_id=None, + receipt_digest=digest, + receipt_body={}, + )) + with ApprovalEvidenceStore(tmp_path / "evidence.sqlite") as store: + manager = ApprovalManager( + evidence_store=store, + approval_server=ApprovalServer(), + config=config, + client_id="pytest", + session_id="session-runtime-allow", + ) + passthrough, log_path = _passthrough( + tmp_path, + gate, + config, + approval_manager=manager, + ) + client_out = io.StringIO() + + assert passthrough.run_stdio(io.StringIO(_tool_call()), client_out) == 0 + + records = store.list_records() + + assert _responses(client_out.getvalue()) == [{ + "jsonrpc": "2.0", + "id": "call-1", + "result": {"content": [{"type": "text", "text": "forwarded"}]}, + }] + assert log_path.read_text(encoding="utf-8").splitlines() == ["tools/call"] + assert len(records) == 1 + record = records[0] + assert record.status == ApprovalStatus.EXECUTED.value + assert record.decision_audit_id == AUDIT_ID + assert record.decision_receipt_sha256 == digest + assert record.result_status == "executed" + assert record.result_hash is not None + assert record.approval_token_hash is None + assert record.approval_decided_by is None + + +def test_verified_allow_export_bundle_attaches_signed_decision_receipt(tmp_path): + config = _config() + agent = FetchableRecordingAgent(decision="ALLOW") + gate = RuntimeGateClient(agent=agent, config=config, control_grant={"id": "grant"}) + bundle_path = tmp_path / "evidence-bundle.json" + with ApprovalEvidenceStore(tmp_path / "evidence.sqlite") as store: + manager = ApprovalManager( + evidence_store=store, + approval_server=ApprovalServer(), + config=config, + client_id="pytest", + session_id="session-runtime-bundle", + ) + passthrough, _log_path = _passthrough( + tmp_path, + gate, + config, + approval_manager=manager, + ) + client_out = io.StringIO() + + assert passthrough.run_stdio(io.StringIO(_tool_call()), client_out) == 0 + + bundle = export_evidence_bundle( + store, + bundle_path, + proxy_identity_did=AGENT_DID, + trusted_signer_dids=[BACKEND_DID], + receipt_fetcher=agent.get_decision_receipt, + ) + + result = verify_evidence_bundle(bundle, trusted_signer_dids=[BACKEND_DID]) + assert result.valid is True + assert result.record_count == 1 + assert result.signed_receipt_count == 1 + assert result.unverified_receipt_count == 0 + assert result.warnings == () + + def test_block_does_not_forward_and_returns_sanitized_error(tmp_path): config = _config() gate = StaticGate(RuntimeGateDecision( @@ -576,6 +690,52 @@ def test_block_does_not_forward_and_returns_sanitized_error(tmp_path): assert not log_path.exists() +def test_verified_block_records_runtime_receipt_without_forwarding(tmp_path): + config = _config() + digest = "aa" * 32 + gate = StaticGate(RuntimeGateDecision( + decision="BLOCK", + audit_id=AUDIT_ID, + approval_id=None, + receipt_digest=digest, + receipt_body={}, + )) + with ApprovalEvidenceStore(tmp_path / "evidence.sqlite") as store: + manager = ApprovalManager( + evidence_store=store, + approval_server=ApprovalServer(), + config=config, + client_id="pytest", + session_id="session-runtime-block", + ) + passthrough, log_path = _passthrough( + tmp_path, + gate, + config, + approval_manager=manager, + ) + client_out = io.StringIO() + + assert passthrough.run_stdio(io.StringIO(_tool_call()), client_out) == 0 + + records = store.list_records() + + response = _responses(client_out.getvalue())[0] + assert response["error"]["code"] == JSONRPC_POLICY_BLOCKED + assert response["error"]["data"]["reason"] == "runtime_gate_block" + assert SECRET not in client_out.getvalue() + assert not log_path.exists() + assert len(records) == 1 + record = records[0] + assert record.status == ApprovalStatus.BLOCKED.value + assert record.decision_audit_id == AUDIT_ID + assert record.decision_receipt_sha256 == digest + assert record.result_status == "blocked" + assert record.error_class == "runtime_gate_block" + assert record.approval_token_hash is None + assert record.approval_decided_by is None + + def test_waiting_does_not_forward_and_returns_approval_required_shape(tmp_path): config = _config() gate = StaticGate(RuntimeGateDecision(