From f3a33fed8eff78521b639d8120d83e6b4a5aa8c0 Mon Sep 17 00:00:00 2001 From: Oleg Bk Date: Thu, 28 May 2026 15:01:42 +0200 Subject: [PATCH] feat(egress): add controlled_egress SDK helper Add an SDK-owned HTTPS egress helper that evaluates Runtime Gate, performs the send only on ALLOW, and emits agent-signed egress_receipt/1 artifacts with dual hash handling.\n\nIncludes SDK-local receipt signer/verifier coverage, DID binding checks, and focused stub-driven tests.\n\nImplemented by Claude and reviewed with assistance from Codex. --- agentveil/__init__.py | 16 + agentveil/agent.py | 44 ++- agentveil/egress.py | 511 +++++++++++++++++++++++++++++++ tests/test_egress_sdk_wrapper.py | 420 +++++++++++++++++++++++++ 4 files changed, 990 insertions(+), 1 deletion(-) create mode 100644 agentveil/egress.py create mode 100644 tests/test_egress_sdk_wrapper.py diff --git a/agentveil/__init__.py b/agentveil/__init__.py index 119bbc6..2ae365b 100644 --- a/agentveil/__init__.py +++ b/agentveil/__init__.py @@ -13,6 +13,15 @@ """ from agentveil.agent import AVPAgent +from agentveil.egress import ( + ControlledEgressOutcome, + EgressPolicyViolationError, + EgressReceiptProofError, + EgressReceiptVerificationError, + controlled_egress, + sign_egress_receipt, + verify_egress_receipt, +) from agentveil.mock import AVPMockAgent from agentveil.proof import ProofVerificationError, verify_proof_packet, verify_signed_jcs from agentveil.results import ControlledActionOutcome, IntegrationPreflightReport, ProofPacket @@ -32,9 +41,16 @@ "AVPAgent", "AVPMockAgent", "ControlledActionOutcome", + "ControlledEgressOutcome", + "EgressPolicyViolationError", + "EgressReceiptProofError", + "EgressReceiptVerificationError", "IntegrationPreflightReport", "ProofPacket", "ProofVerificationError", + "controlled_egress", + "sign_egress_receipt", + "verify_egress_receipt", "verify_proof_packet", "verify_signed_jcs", "avp_tracked", diff --git a/agentveil/agent.py b/agentveil/agent.py index fbb0e97..3902923 100644 --- a/agentveil/agent.py +++ b/agentveil/agent.py @@ -19,7 +19,7 @@ from copy import deepcopy from datetime import datetime, timedelta, timezone from importlib.metadata import PackageNotFoundError, version -from typing import Any, Optional +from typing import Any, Mapping, Optional import httpx from nacl.signing import SigningKey, VerifyKey @@ -1623,6 +1623,48 @@ def controlled_action( reason=decision.get("reason", "runtime_gate_blocked"), ) + def controlled_egress( + self, + *, + host: str, + port: int, + method: str, + path: str, + headers: Optional[Mapping[str, str]], + body: bytes, + credential_or_principal_class: str, + policy_id: str, + delegation_receipt: Mapping[str, Any], + timeout_seconds: float = 30.0, + ) -> "ControlledEgressOutcome": + """Perform an HTTPS egress through the AVP-controlled boundary. + + Returns a ``ControlledEgressOutcome`` carrying an **agent-signed** + ``egress_receipt/1`` for ALLOW/BLOCK/FAILED outcomes (no receipt + on ``WAITING_FOR_HUMAN_APPROVAL`` in v0.1). The helper performs + the HTTPS send itself; calls that bypass this method are not + recorded. + + Receipts are signed by the agent's own identity, not by the AVP + backend. Verify with + ``verify_egress_receipt(receipt_jcs, trusted_signer_dids=[self.did])``. + """ + from agentveil.egress import controlled_egress as _controlled_egress + + return _controlled_egress( + agent=self, + host=host, + port=port, + method=method, + path=path, + headers=headers, + body=body, + credential_or_principal_class=credential_or_principal_class, + policy_id=policy_id, + delegation_receipt=delegation_receipt, + timeout_seconds=timeout_seconds, + ) + def execute_after_approval( self, audit_id: str, diff --git a/agentveil/egress.py b/agentveil/egress.py new file mode 100644 index 0000000..b1cc638 --- /dev/null +++ b/agentveil/egress.py @@ -0,0 +1,511 @@ +"""Agent-signed EgressReceipt v0 signer, verifier, and controlled-egress helper. + +EgressReceipt v0 proves a network-egress attempt that the AVP-controlled +boundary (this SDK helper) actually performed. Receipts are signed with the +agent's own identity DID. They are **agent-signed**, not backend-signed. + +Boundary: + - The helper performs the HTTPS send itself via embedded ``httpx.Client``. + Calls that route through other code paths (raw ``requests.post(...)``, + raw sockets, subprocesses, other libraries) are NOT recorded. + - Runtime Gate evaluates the egress; the helper does not open the + connection on BLOCK or WAITING. + - A future slice may add a backend ``/v1/egress/sign`` endpoint for + backend-attested EgressReceipts. v0 is local-signing only. + +Dual hash convention: + - ``payload_digest_hex = sha256(body).hex()`` — plain 64 hex chars, + used in ``egress_receipt/1.payload_hash``. + - ``runtime_payload_hash = "sha256:" + payload_digest_hex`` — prefixed + form required by Runtime Gate's request schema (matches the pattern + ``^sha256:[0-9a-f]{64}$``). + +Schema and signer/verifier here intentionally duplicate the v0 contract +shipped in the AVP backend (``app/core/egress_control/proof.py``). The +sibling lives in the AVP repo; future consolidation may extract a shared +``agentveil-protocol`` package. +""" + +from __future__ import annotations + +import hashlib +import json +import re +import time +import uuid +from collections.abc import Collection, Mapping +from dataclasses import dataclass +from typing import Any, Callable, Optional + +import base58 +import httpx +import jcs +from nacl.signing import SigningKey, VerifyKey + +SCHEMA_VERSION = "egress_receipt/1" +EVALUATOR_VERSION = "egress-control/0.1.0" +ACTION_NETWORK_EGRESS = "network.egress" +PROOF_TYPE = "DataIntegrityProof" +CRYPTOSUITE = "eddsa-jcs-2022" + +ED25519_MULTICODEC = bytes([0xED, 0x01]) + +_REQUIRED_STRING_FIELDS = ( + "receipt_id", + "agent_did", + "action", + "destination", + "protocol", + "method", + "credential_or_principal_class", + "payload_hash", + "policy_id", + "decision", + "outcome", + "evaluator_version", +) + +_SHA256_HEX_RE = re.compile(r"^[0-9a-fA-F]{64}$") +_RUNTIME_DECISIONS = frozenset({"ALLOW", "BLOCK", "WAITING_FOR_HUMAN_APPROVAL"}) + + +class EgressReceiptProofError(ValueError): + """Raised when an unsigned EgressReceipt body is malformed.""" + + +class EgressReceiptVerificationError(ValueError): + """Raised when a signed EgressReceipt fails offline verification.""" + + +class EgressPolicyViolationError(RuntimeError): + """Raised when Runtime Gate returns an unparseable response.""" + + +@dataclass(frozen=True) +class ControlledEgressOutcome: + """Result of an ``AVPAgent.controlled_egress(...)`` call. + + ``status`` values: + - ``sent`` — Runtime Gate returned ALLOW and the helper completed + the HTTPS send. Receipt is agent-signed with ``outcome=SENT``. + - ``blocked`` — Runtime Gate returned BLOCK. Helper did NOT open + the connection. Receipt is agent-signed with ``outcome=BLOCKED``. + - ``approval_required`` — Runtime Gate returned + ``WAITING_FOR_HUMAN_APPROVAL``. Helper did NOT open the + connection. No receipt is emitted in v0.1. + - ``failed`` — Runtime Gate returned ALLOW but the helper's HTTP + send raised. Receipt is agent-signed with ``outcome=FAILED``. + The sanitized error class is captured; raw exception messages + and tracebacks are not stored. + """ + + status: str + decision: dict[str, Any] + receipt_jcs: Optional[str] + receipt: Optional[dict[str, Any]] + audit_id: Optional[str] + send_result: Optional[dict[str, Any]] + error_class: Optional[str] = None + + +def _did_from_public_key(public_key: bytes) -> str: + multicodec_key = ED25519_MULTICODEC + public_key + return "did:key:z" + base58.b58encode(multicodec_key).decode("ascii") + + +def _public_key_from_did(did: str) -> bytes: + if not isinstance(did, str) or not did.startswith("did:key:z"): + raise EgressReceiptVerificationError("signer DID must be did:key") + try: + decoded = base58.b58decode(did[len("did:key:z"):]) + except Exception as exc: + raise EgressReceiptVerificationError("signer DID is not valid base58") from exc + if len(decoded) < 2 or decoded[:2] != ED25519_MULTICODEC: + raise EgressReceiptVerificationError("signer DID is not Ed25519 did:key") + public_key = decoded[2:] + if len(public_key) != 32: + raise EgressReceiptVerificationError( + "signer DID has invalid Ed25519 public key" + ) + return public_key + + +def _validate_body(body: dict[str, Any]) -> None: + if not isinstance(body, dict): + raise EgressReceiptProofError("body must be a dict") + if "proof" in body: + raise EgressReceiptProofError( + "body must not include 'proof'; pass unsigned fields" + ) + if body.get("schema_version") != SCHEMA_VERSION: + raise EgressReceiptProofError( + f"body must include schema_version={SCHEMA_VERSION!r}" + ) + for field_name in _REQUIRED_STRING_FIELDS: + if not isinstance(body.get(field_name), str) or not body[field_name]: + raise EgressReceiptProofError( + f"body must include non-empty string field {field_name!r}" + ) + if body["action"] != ACTION_NETWORK_EGRESS: + raise EgressReceiptProofError( + f"body action must be {ACTION_NETWORK_EGRESS!r}" + ) + if body["evaluator_version"] != EVALUATOR_VERSION: + raise EgressReceiptProofError( + f"body evaluator_version must be {EVALUATOR_VERSION!r}" + ) + if not _SHA256_HEX_RE.match(body["payload_hash"]): + raise EgressReceiptProofError( + "payload_hash must be a 64-character SHA-256 hex digest" + ) + if "approval_id" not in body: + raise EgressReceiptProofError("body must include 'approval_id'") + if body["approval_id"] is not None and ( + not isinstance(body["approval_id"], str) or not body["approval_id"] + ): + raise EgressReceiptProofError( + "approval_id must be a non-empty string or null" + ) + + +def sign_egress_receipt(*, body: dict[str, Any], signing_seed: bytes) -> str: + """Sign an EgressReceipt body and return canonical JCS text. + + The receipt is signed with the agent's identity (not the backend's). + The body's ``agent_did`` MUST equal the DID derived from + ``signing_seed`` — agent-signed receipts must self-attest, and any + mismatch would break offline audit by allowing one identity to + sign a receipt claiming another identity performed the egress. + """ + + _validate_body(body) + if not isinstance(signing_seed, (bytes, bytearray)) or len(signing_seed) != 32: + raise EgressReceiptProofError("signing_seed must be 32 bytes") + signing_key = SigningKey(bytes(signing_seed)) + issuer_pubkey = bytes(signing_key.verify_key) + issuer_did = _did_from_public_key(issuer_pubkey) + if body["agent_did"] != issuer_did: + raise EgressReceiptProofError( + "body.agent_did must equal the DID derived from signing_seed" + ) + canonical_body = jcs.canonicalize(body) + signature = signing_key.sign(canonical_body).signature + proof_value = "z" + base58.b58encode(signature).decode("ascii") + verification_method = f"{issuer_did}#{issuer_did[len('did:key:'):]}" + signed: dict[str, Any] = { + **body, + "proof": { + "type": PROOF_TYPE, + "cryptosuite": CRYPTOSUITE, + "verificationMethod": verification_method, + "proofValue": proof_value, + }, + } + return jcs.canonicalize(signed).decode("utf-8") + + +def verify_egress_receipt( + receipt_jcs: str, + *, + trusted_signer_dids: Optional[Collection[str]] = None, +) -> dict[str, Any]: + """Verify an agent-signed EgressReceipt offline. + + The receipt is signed by the agent's identity (not the AVP backend). + Callers should pin the agent DID(s) they trust via + ``trusted_signer_dids``. + """ + + if not isinstance(receipt_jcs, str) or not receipt_jcs: + raise EgressReceiptVerificationError("receipt_jcs must be a non-empty string") + try: + receipt = json.loads(receipt_jcs) + except json.JSONDecodeError as exc: + raise EgressReceiptVerificationError("receipt is not valid JSON") from exc + if not isinstance(receipt, dict): + raise EgressReceiptVerificationError("receipt must be a JSON object") + + proof = receipt.pop("proof", None) + if not isinstance(proof, dict): + raise EgressReceiptVerificationError("receipt proof missing") + if proof.get("type") != PROOF_TYPE: + raise EgressReceiptVerificationError("receipt proof type unsupported") + if proof.get("cryptosuite") != CRYPTOSUITE: + raise EgressReceiptVerificationError("receipt cryptosuite unsupported") + + verification_method = proof.get("verificationMethod") + proof_value = proof.get("proofValue") + if not isinstance(verification_method, str) or "#" not in verification_method: + raise EgressReceiptVerificationError( + "receipt verification method invalid" + ) + if not isinstance(proof_value, str) or not proof_value.startswith("z"): + raise EgressReceiptVerificationError("receipt proof value invalid") + + signer_did = verification_method.split("#", 1)[0] + if trusted_signer_dids is not None and signer_did not in set(trusted_signer_dids): + raise EgressReceiptVerificationError("receipt signer is not trusted") + + public_key = _public_key_from_did(signer_did) + try: + signature = base58.b58decode(proof_value[1:]) + VerifyKey(public_key).verify(jcs.canonicalize(receipt), signature) + except Exception as exc: + raise EgressReceiptVerificationError("receipt signature invalid") from exc + + try: + _validate_body(receipt) + except EgressReceiptProofError as exc: + raise EgressReceiptVerificationError(str(exc)) from exc + # Bind body.agent_did to the proof signer DID. Without this check a + # receipt signed by one agent but claiming another agent_did would + # verify and misattribute egress in offline audit. + if receipt.get("agent_did") != signer_did: + raise EgressReceiptVerificationError( + "receipt agent_did does not match proof signer DID" + ) + return receipt + + +def _sanitize_error_class(exc: BaseException) -> str: + """Return only the exception class name; never the message or traceback.""" + return type(exc).__name__ + + +def _coerce_decision(response: Any) -> dict[str, Any]: + if not isinstance(response, Mapping): + raise EgressPolicyViolationError("runtime gate response is not a mapping") + decision = response.get("decision") + if not isinstance(decision, str) or decision not in _RUNTIME_DECISIONS: + raise EgressPolicyViolationError( + "runtime gate decision missing or unsupported" + ) + return dict(response) + + +def _sign_outcome_receipt( + *, + agent_did: str, + destination: str, + method: str, + credential_or_principal_class: str, + payload_digest_hex: str, + policy_id: str, + decision_value: str, + outcome: str, + signing_seed: bytes, +) -> str: + """Build and sign the agent-signed ``egress_receipt/1`` body.""" + + body: dict[str, Any] = { + "schema_version": SCHEMA_VERSION, + "receipt_id": f"urn:uuid:{uuid.uuid4()}", + "agent_did": agent_did, + "action": ACTION_NETWORK_EGRESS, + "destination": destination, + "protocol": "https", + "method": method, + "credential_or_principal_class": credential_or_principal_class, + "payload_hash": payload_digest_hex, + "policy_id": policy_id, + "decision": decision_value, + "approval_id": None, + "outcome": outcome, + "evaluator_version": EVALUATOR_VERSION, + } + return sign_egress_receipt(body=body, signing_seed=signing_seed) + + +def controlled_egress( + *, + agent: Any, + host: str, + port: int, + method: str, + path: str, + headers: Optional[Mapping[str, str]], + body: bytes, + credential_or_principal_class: str, + policy_id: str, + delegation_receipt: Mapping[str, Any], + timeout_seconds: float = 30.0, + _http_client_factory: Optional[Callable[..., Any]] = None, +) -> ControlledEgressOutcome: + """Perform an HTTPS egress through the AVP-controlled boundary. + + The helper computes the payload digest in two forms: + + - ``payload_digest_hex`` (plain 64 hex chars) is recorded in the + ``egress_receipt/1`` body's ``payload_hash`` field. + - ``runtime_payload_hash`` (``"sha256:" + payload_digest_hex``) is + sent to ``/v1/runtime/evaluate`` per the Runtime Gate request + schema. + + On ALLOW the helper performs the HTTPS request itself via an + embedded ``httpx.Client`` and signs an **agent-signed** + ``egress_receipt/1`` with ``outcome=SENT`` (or ``FAILED`` on + connection-class error). On BLOCK the helper signs ``outcome=BLOCKED`` + and does NOT open the connection. On ``WAITING_FOR_HUMAN_APPROVAL`` + the helper returns ``status="approval_required"`` with no receipt; + the connection is not opened. + + The receipt is **agent-signed**, not backend-signed. Callers verify + against the agent's DID via ``verify_egress_receipt(..., + trusted_signer_dids=[agent.did])``. + + ``_http_client_factory`` is a private hook for tests; production + callers should leave it unset so the helper uses ``httpx.Client``. + """ + + if not isinstance(body, (bytes, bytearray)): + raise EgressReceiptProofError("body must be bytes") + if not isinstance(host, str) or not host: + raise EgressReceiptProofError("host must be a non-empty string") + if not isinstance(port, int) or port <= 0 or port > 65535: + raise EgressReceiptProofError("port must be a TCP port in 1..65535") + if not isinstance(method, str) or not method: + raise EgressReceiptProofError("method must be a non-empty string") + if not isinstance(path, str) or not path.startswith("/"): + raise EgressReceiptProofError("path must be a string starting with '/'") + if ( + not isinstance(credential_or_principal_class, str) + or not credential_or_principal_class + ): + raise EgressReceiptProofError( + "credential_or_principal_class must be a non-empty string" + ) + if not isinstance(policy_id, str) or not policy_id: + raise EgressReceiptProofError("policy_id must be a non-empty string") + if not isinstance(delegation_receipt, Mapping): + raise EgressReceiptProofError("delegation_receipt must be a mapping") + + body_bytes = bytes(body) + payload_digest_hex = hashlib.sha256(body_bytes).hexdigest() + runtime_payload_hash = f"sha256:{payload_digest_hex}" + destination = f"{host}:{port}" + normalized_method = method.upper() + + try: + gate_response = agent.runtime_evaluate( + action=ACTION_NETWORK_EGRESS, + resource=destination, + environment="production", + delegation_receipt=dict(delegation_receipt), + payload_hash=runtime_payload_hash, + risk_class="write", + ) + except Exception as exc: + raise EgressPolicyViolationError( + f"runtime gate call failed: {_sanitize_error_class(exc)}" + ) from exc + + decision = _coerce_decision(gate_response) + decision_value = decision["decision"] + audit_id_raw = decision.get("audit_id") + audit_id = audit_id_raw if isinstance(audit_id_raw, str) else None + + if decision_value == "WAITING_FOR_HUMAN_APPROVAL": + return ControlledEgressOutcome( + status="approval_required", + decision=decision, + receipt_jcs=None, + receipt=None, + audit_id=audit_id, + send_result=None, + ) + + if decision_value == "BLOCK": + receipt_jcs = _sign_outcome_receipt( + agent_did=agent.did, + destination=destination, + method=normalized_method, + credential_or_principal_class=credential_or_principal_class, + payload_digest_hex=payload_digest_hex, + policy_id=policy_id, + decision_value="BLOCK", + outcome="BLOCKED", + signing_seed=agent._private_key, + ) + return ControlledEgressOutcome( + status="blocked", + decision=decision, + receipt_jcs=receipt_jcs, + receipt=json.loads(receipt_jcs), + audit_id=audit_id, + send_result=None, + ) + + # ALLOW path. Helper owns the HTTPS send. + client_factory = _http_client_factory or httpx.Client + started_at = time.monotonic() + try: + with client_factory(timeout=timeout_seconds) as client: + response = client.request( + method=normalized_method, + url=f"https://{destination}{path}", + headers=dict(headers) if headers is not None else None, + content=body_bytes, + ) + except Exception as exc: + elapsed = time.monotonic() - started_at + error_class = _sanitize_error_class(exc) + receipt_jcs = _sign_outcome_receipt( + agent_did=agent.did, + destination=destination, + method=normalized_method, + credential_or_principal_class=credential_or_principal_class, + payload_digest_hex=payload_digest_hex, + policy_id=policy_id, + decision_value="ALLOW", + outcome="FAILED", + signing_seed=agent._private_key, + ) + return ControlledEgressOutcome( + status="failed", + decision=decision, + receipt_jcs=receipt_jcs, + receipt=json.loads(receipt_jcs), + audit_id=audit_id, + send_result={"elapsed_seconds": round(elapsed, 6)}, + error_class=error_class, + ) + + elapsed = time.monotonic() - started_at + receipt_jcs = _sign_outcome_receipt( + agent_did=agent.did, + destination=destination, + method=normalized_method, + credential_or_principal_class=credential_or_principal_class, + payload_digest_hex=payload_digest_hex, + policy_id=policy_id, + decision_value="ALLOW", + outcome="SENT", + signing_seed=agent._private_key, + ) + return ControlledEgressOutcome( + status="sent", + decision=decision, + receipt_jcs=receipt_jcs, + receipt=json.loads(receipt_jcs), + audit_id=audit_id, + send_result={ + "status_code": int(getattr(response, "status_code", 0)), + "elapsed_seconds": round(elapsed, 6), + }, + ) + + +__all__ = [ + "ACTION_NETWORK_EGRESS", + "CRYPTOSUITE", + "EVALUATOR_VERSION", + "PROOF_TYPE", + "SCHEMA_VERSION", + "ControlledEgressOutcome", + "EgressPolicyViolationError", + "EgressReceiptProofError", + "EgressReceiptVerificationError", + "controlled_egress", + "sign_egress_receipt", + "verify_egress_receipt", +] diff --git a/tests/test_egress_sdk_wrapper.py b/tests/test_egress_sdk_wrapper.py new file mode 100644 index 0000000..85d7a71 --- /dev/null +++ b/tests/test_egress_sdk_wrapper.py @@ -0,0 +1,420 @@ +"""Unit coverage for ``AVPAgent.controlled_egress`` and agent-signed +``egress_receipt/1`` round-trip. + +Tests are stub-driven: ``AVPAgent.runtime_evaluate`` is patched per test +to control the backend's response, and an in-process HTTP client factory +is injected via ``_http_client_factory``. No real network IO. + +These tests exercise the SDK helper's own state machine. They do NOT +claim product-real backend ALLOW/BLOCK behavior — that is PR-A's +concern. +""" + +from __future__ import annotations + +import hashlib +import json +from typing import Any, Optional +from unittest.mock import patch + +import httpx +import jcs +import pytest +from nacl.signing import SigningKey + +import base58 + +from agentveil import ( + AVPAgent, + ControlledEgressOutcome, + EgressReceiptProofError, + EgressReceiptVerificationError, + sign_egress_receipt, + verify_egress_receipt, +) +from agentveil.egress import ( + ACTION_NETWORK_EGRESS, + CRYPTOSUITE, + ED25519_MULTICODEC, + EVALUATOR_VERSION, + PROOF_TYPE, + SCHEMA_VERSION, +) + + +_DELEGATION_RECEIPT = {"id": "urn:uuid:delegation"} +_POLICY_ID = "policy-egress-payments-v0" +_CREDENTIAL_CLASS = "payment_processor_api_key" +_HOST = "api.stripe.com" +_PORT = 443 +_METHOD = "POST" +_PATH = "/v1/charges" +_BODY = b'{"amount": 1000, "currency": "usd"}' + + +def _make_agent() -> AVPAgent: + sk = SigningKey.generate() + return AVPAgent( + "http://localhost:8000", + bytes(sk), + name="egress-test", + timeout=1.0, + ) + + +class _StubHttpResponse: + def __init__(self, status_code: int = 200): + self.status_code = status_code + + +class _StubHttpClient: + """Records the single ``request(...)`` call shape and returns a stub.""" + + def __init__( + self, + *, + response: Optional[_StubHttpResponse] = None, + raises: Optional[BaseException] = None, + ): + self.response = response or _StubHttpResponse() + self.raises = raises + self.last_call: Optional[dict[str, Any]] = None + self.call_count = 0 + + def __enter__(self) -> "_StubHttpClient": + return self + + def __exit__(self, *_args: object) -> None: + return None + + def request( + self, + *, + method: str, + url: str, + headers: Any = None, + content: Any = None, + ) -> _StubHttpResponse: + self.call_count += 1 + self.last_call = { + "method": method, + "url": url, + "headers": headers, + "content": content, + } + if self.raises is not None: + raise self.raises + return self.response + + +def _stub_factory(stub: _StubHttpClient): + def _factory(**_kwargs: Any) -> _StubHttpClient: + return stub + + return _factory + + +def _allow_decision(audit_id: str = "urn:uuid:audit-allow") -> dict[str, Any]: + return { + "audit_id": audit_id, + "decision": "ALLOW", + "reason": "write_action_within_scope", + } + + +def _block_decision(audit_id: str = "urn:uuid:audit-block") -> dict[str, Any]: + return { + "audit_id": audit_id, + "decision": "BLOCK", + "reason": "category_not_allowed", + } + + +def _waiting_decision(audit_id: str = "urn:uuid:audit-waiting") -> dict[str, Any]: + return { + "audit_id": audit_id, + "decision": "WAITING_FOR_HUMAN_APPROVAL", + "reason": "destructive_production_action", + } + + +def _call_controlled_egress( + agent: AVPAgent, + *, + http_factory, + body: bytes = _BODY, +) -> ControlledEgressOutcome: + from agentveil.egress import controlled_egress as _controlled_egress + + return _controlled_egress( + agent=agent, + host=_HOST, + port=_PORT, + method=_METHOD, + path=_PATH, + headers={"Authorization": "Bearer test", "Content-Type": "application/json"}, + body=body, + credential_or_principal_class=_CREDENTIAL_CLASS, + policy_id=_POLICY_ID, + delegation_receipt=_DELEGATION_RECEIPT, + _http_client_factory=http_factory, + ) + + +def test_allow_path_signs_receipt_and_sends_via_helper(): + agent = _make_agent() + stub = _StubHttpClient(response=_StubHttpResponse(status_code=200)) + + with patch.object(agent, "runtime_evaluate", return_value=_allow_decision()): + outcome = _call_controlled_egress(agent, http_factory=_stub_factory(stub)) + + assert outcome.status == "sent" + assert outcome.receipt is not None + assert outcome.receipt["decision"] == "ALLOW" + assert outcome.receipt["outcome"] == "SENT" + assert outcome.receipt["agent_did"] == agent.did + assert outcome.receipt["destination"] == f"{_HOST}:{_PORT}" + assert outcome.receipt["action"] == ACTION_NETWORK_EGRESS + assert outcome.receipt["protocol"] == "https" + assert outcome.receipt["method"] == "POST" + assert outcome.audit_id == "urn:uuid:audit-allow" + assert outcome.send_result == { + "status_code": 200, + "elapsed_seconds": outcome.send_result["elapsed_seconds"], + } + + # Receipt verifies offline against the agent's DID. The receipt is + # agent-signed, not backend-signed. + verified = verify_egress_receipt( + outcome.receipt_jcs, trusted_signer_dids=[agent.did] + ) + assert verified["agent_did"] == agent.did + assert verified["decision"] == "ALLOW" + assert verified["outcome"] == "SENT" + + # Helper actually opened the connection. + assert stub.call_count == 1 + assert stub.last_call is not None + assert stub.last_call["method"] == "POST" + assert stub.last_call["url"] == f"https://{_HOST}:{_PORT}{_PATH}" + assert stub.last_call["content"] == _BODY + + +def test_block_path_does_not_send_and_signs_blocked_receipt(): + agent = _make_agent() + stub = _StubHttpClient() + + with patch.object(agent, "runtime_evaluate", return_value=_block_decision()): + outcome = _call_controlled_egress(agent, http_factory=_stub_factory(stub)) + + assert outcome.status == "blocked" + assert stub.call_count == 0 + assert outcome.receipt is not None + assert outcome.receipt["decision"] == "BLOCK" + assert outcome.receipt["outcome"] == "BLOCKED" + assert outcome.send_result is None + + verified = verify_egress_receipt( + outcome.receipt_jcs, trusted_signer_dids=[agent.did] + ) + assert verified["decision"] == "BLOCK" + assert verified["outcome"] == "BLOCKED" + + +def test_waiting_path_does_not_send_and_emits_no_receipt(): + agent = _make_agent() + stub = _StubHttpClient() + + with patch.object(agent, "runtime_evaluate", return_value=_waiting_decision()): + outcome = _call_controlled_egress(agent, http_factory=_stub_factory(stub)) + + assert outcome.status == "approval_required" + assert outcome.receipt is None + assert outcome.receipt_jcs is None + assert stub.call_count == 0 + + +def test_allow_path_with_connect_error_emits_failed_receipt_without_raw_message(): + agent = _make_agent() + raw_secret_text = "private socket detail with token sk_live_DO_NOT_LEAK" + stub = _StubHttpClient(raises=httpx.ConnectError(raw_secret_text)) + + with patch.object(agent, "runtime_evaluate", return_value=_allow_decision()): + outcome = _call_controlled_egress(agent, http_factory=_stub_factory(stub)) + + assert outcome.status == "failed" + assert outcome.receipt is not None + assert outcome.receipt["decision"] == "ALLOW" + assert outcome.receipt["outcome"] == "FAILED" + assert outcome.error_class == "ConnectError" + # Sanitization: no raw exception message anywhere in the receipt JCS or + # in the outcome's serialized fields. + assert raw_secret_text not in outcome.receipt_jcs + assert raw_secret_text not in repr(outcome) + assert stub.call_count == 1 + + +def test_dual_hash_runtime_gate_prefixed_receipt_plain(): + agent = _make_agent() + body = b"hello dual-hash" + expected_digest = hashlib.sha256(body).hexdigest() + expected_runtime_payload_hash = f"sha256:{expected_digest}" + stub = _StubHttpClient() + + with patch.object( + agent, "runtime_evaluate", return_value=_allow_decision() + ) as eval_mock: + outcome = _call_controlled_egress( + agent, http_factory=_stub_factory(stub), body=body + ) + + eval_mock.assert_called_once() + call_kwargs = eval_mock.call_args.kwargs + assert call_kwargs["action"] == ACTION_NETWORK_EGRESS + assert call_kwargs["payload_hash"] == expected_runtime_payload_hash + + assert outcome.receipt is not None + assert outcome.receipt["payload_hash"] == expected_digest + # Plain form, no "sha256:" prefix in the receipt body. + assert ":" not in outcome.receipt["payload_hash"] + assert len(outcome.receipt["payload_hash"]) == 64 + + +def test_tampered_receipt_fails_signature_verification(): + agent = _make_agent() + stub = _StubHttpClient() + + with patch.object(agent, "runtime_evaluate", return_value=_allow_decision()): + outcome = _call_controlled_egress(agent, http_factory=_stub_factory(stub)) + + parsed = json.loads(outcome.receipt_jcs) + parsed["destination"] = "evil.example.com:443" + tampered = jcs.canonicalize(parsed).decode("utf-8") + + with pytest.raises(EgressReceiptVerificationError, match="signature invalid"): + verify_egress_receipt(tampered, trusted_signer_dids=[agent.did]) + + +def test_wrong_trusted_signer_did_fails_verification(): + agent = _make_agent() + other_did = AVPAgent( + "http://localhost:8000", bytes(SigningKey.generate()), name="other" + ).did + stub = _StubHttpClient() + + with patch.object(agent, "runtime_evaluate", return_value=_allow_decision()): + outcome = _call_controlled_egress(agent, http_factory=_stub_factory(stub)) + + with pytest.raises(EgressReceiptVerificationError, match="not trusted"): + verify_egress_receipt(outcome.receipt_jcs, trusted_signer_dids=[other_did]) + + +def test_runtime_gate_call_uses_network_egress_action_and_correct_kwargs(): + agent = _make_agent() + stub = _StubHttpClient() + + with patch.object( + agent, "runtime_evaluate", return_value=_allow_decision() + ) as eval_mock: + _call_controlled_egress(agent, http_factory=_stub_factory(stub)) + + eval_mock.assert_called_once() + call_kwargs = eval_mock.call_args.kwargs + assert call_kwargs["action"] == "network.egress" + assert call_kwargs["resource"] == f"{_HOST}:{_PORT}" + assert call_kwargs["environment"] == "production" + assert call_kwargs["risk_class"] == "write" + assert call_kwargs["payload_hash"].startswith("sha256:") + assert call_kwargs["delegation_receipt"] == _DELEGATION_RECEIPT + + +# =========================================================================== +# Agent-DID binding (signer + verifier must self-attest) +# =========================================================================== + + +def _valid_egress_body(*, agent_did: str) -> dict: + return { + "schema_version": SCHEMA_VERSION, + "receipt_id": "urn:uuid:00000000-0000-4000-8000-000000000abc", + "agent_did": agent_did, + "action": ACTION_NETWORK_EGRESS, + "destination": f"{_HOST}:{_PORT}", + "protocol": "https", + "method": "POST", + "credential_or_principal_class": _CREDENTIAL_CLASS, + "payload_hash": "a" * 64, + "policy_id": _POLICY_ID, + "decision": "ALLOW", + "approval_id": None, + "outcome": "SENT", + "evaluator_version": EVALUATOR_VERSION, + } + + +def _did_from_seed(seed: bytes) -> str: + pubkey = bytes(SigningKey(seed).verify_key) + return "did:key:z" + base58.b58encode(ED25519_MULTICODEC + pubkey).decode("ascii") + + +def _sign_egress_body_bypassing_agent_binding( + body: dict, signing_seed: bytes +) -> str: + """Local-only signer mirroring ``sign_egress_receipt`` minus the + ``agent_did``-binding check. Used only by the verifier-binding + regression test to construct a forged-attribution receipt that the + public signer would refuse to produce. + """ + signing_key = SigningKey(signing_seed) + issuer_pubkey = bytes(signing_key.verify_key) + issuer_did = "did:key:z" + base58.b58encode( + ED25519_MULTICODEC + issuer_pubkey + ).decode("ascii") + canonical_body = jcs.canonicalize(body) + signature = signing_key.sign(canonical_body).signature + proof_value = "z" + base58.b58encode(signature).decode("ascii") + verification_method = f"{issuer_did}#{issuer_did[len('did:key:'):]}" + signed = { + **body, + "proof": { + "type": PROOF_TYPE, + "cryptosuite": CRYPTOSUITE, + "verificationMethod": verification_method, + "proofValue": proof_value, + }, + } + return jcs.canonicalize(signed).decode("utf-8") + + +def test_signer_rejects_body_agent_did_mismatching_signing_seed(): + """``sign_egress_receipt`` must self-attest: body.agent_did MUST equal + the DID derived from signing_seed. Otherwise one agent could mint a + receipt claiming another agent's identity. + """ + signer_seed = bytes(SigningKey.generate()) + foreign_seed = bytes(SigningKey.generate()) + foreign_did = _did_from_seed(foreign_seed) + body = _valid_egress_body(agent_did=foreign_did) + + with pytest.raises(EgressReceiptProofError, match="agent_did"): + sign_egress_receipt(body=body, signing_seed=signer_seed) + + +def test_verifier_rejects_receipt_with_agent_did_mismatching_signer_did(): + """``verify_egress_receipt`` must reject a receipt whose signed + body's ``agent_did`` does not equal the proof signer DID. Without + this check a receipt with a valid signature but a forged + ``agent_did`` field would verify and misattribute the egress. + """ + actual_seed = bytes(SigningKey.generate()) + actual_did = _did_from_seed(actual_seed) + other_seed = bytes(SigningKey.generate()) + other_did = _did_from_seed(other_seed) + assert actual_did != other_did + + # Body declares ``other_did`` but is signed with ``actual_seed``. + body = _valid_egress_body(agent_did=other_did) + forged_jcs = _sign_egress_body_bypassing_agent_binding(body, actual_seed) + + with pytest.raises(EgressReceiptVerificationError, match="agent_did"): + verify_egress_receipt(forged_jcs, trusted_signer_dids=[actual_did])