Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions agentveil_mcp_proxy/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ script. No additional extras are required.

## Quick Start

For the full step-by-step customer cold path (install → init → doctor →
configure downstream → run → export evidence → offline verify) and the
honest list of what the bundle currently does and does not prove, see
[`docs/MCP_PROXY_QUICKSTART.md`](../docs/MCP_PROXY_QUICKSTART.md).

The short form is:

Create a local proxy identity, config, and control grant:

```bash
Expand Down Expand Up @@ -221,6 +228,8 @@ exhaustive; review patterns for your specific downstream server.
|---|---|
| `init` | Create encrypted identity, config, and control grant. |
| `doctor` | Validate local files and control grant. |
| `doctor --check-backend` | Add a read-only preflight that the backend is reachable and the proxy identity is registered. |
| `register` | Register the existing proxy identity with the configured backend. |
| `run` | Run stdio passthrough, the proxy mode used by MCP clients. |
| `reissue-grant` | Refresh the local control grant before expiry. |
| `export-evidence <path>` | Export durable evidence bundle for offline verification. |
Expand Down
64 changes: 63 additions & 1 deletion agentveil_mcp_proxy/approval/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ def request_approval(
created_at=now,
expires_at=record_expires_at,
runtime_decision=None if active_similar_grant is not None else runtime_decision,
approval_token_hash=self.approval_server.token_hash,
granted_by_request_id=(
None if active_similar_grant is None else active_similar_grant.request_id
),
Expand Down Expand Up @@ -178,6 +179,45 @@ def request_approval(
)
return self._deny(request_id, "user_denied")

def record_runtime_allow(
self,
classification: ClassifiedToolCall,
*,
runtime_decision: RuntimeGateDecision,
) -> ApprovalOutcome:
"""Persist a verified Runtime Gate ALLOW decision before forwarding."""

request_id = self._write_runtime_decision_record(
classification,
runtime_decision=runtime_decision,
)
return ApprovalOutcome(
request_id,
ApprovalStatus.APPROVED.value,
"runtime_gate_allow",
)

def record_runtime_block(
self,
classification: ClassifiedToolCall,
*,
runtime_decision: RuntimeGateDecision,
) -> None:
"""Persist a verified Runtime Gate BLOCK decision as terminal evidence."""

request_id = self._write_runtime_decision_record(
classification,
runtime_decision=runtime_decision,
)
try:
self.evidence_store.transition(
request_id,
ApprovalStatus.BLOCKED.value,
error_class="runtime_gate_block",
)
except ApprovalEvidenceError as exc:
raise ApprovalFlowError("runtime decision evidence persistence failed") from exc

def record_execution_result(self, outcome: ApprovalOutcome, response: dict[str, Any]) -> None:
"""Append execution result evidence for an approved downstream call."""

Expand Down Expand Up @@ -277,6 +317,7 @@ def _pending_record(
created_at: int,
expires_at: int | None,
runtime_decision: RuntimeGateDecision | None,
approval_token_hash: str | None = None,
granted_by_request_id: str | None = None,
) -> PendingApproval:
return PendingApproval(
Expand All @@ -297,11 +338,32 @@ def _pending_record(
expires_at=expires_at,
decision_audit_id=None if runtime_decision is None else runtime_decision.audit_id,
decision_receipt_sha256=None if runtime_decision is None else runtime_decision.receipt_digest,
approval_token_hash=self.approval_server.token_hash,
approval_token_hash=approval_token_hash,
matched_policy_rule=classification.policy_evaluation.policy_rule_id,
granted_by_request_id=granted_by_request_id,
)

def _write_runtime_decision_record(
self,
classification: ClassifiedToolCall,
*,
runtime_decision: RuntimeGateDecision,
) -> str:
now = int(time.time())
request_id = str(uuid.uuid4())
record = self._pending_record(
classification,
request_id=request_id,
created_at=now,
expires_at=None,
runtime_decision=runtime_decision,
)
try:
self.evidence_store.write_pending(record)
except ApprovalEvidenceError as exc:
raise ApprovalFlowError("runtime decision evidence persistence failed") from exc
return request_id

def _prompt_for(
self,
classification: ClassifiedToolCall,
Expand Down
Loading
Loading