diff --git a/.github/workflows/phase1-ci-and-release.yml b/.github/workflows/phase1-ci-and-release.yml new file mode 100644 index 0000000..5c78aa2 --- /dev/null +++ b/.github/workflows/phase1-ci-and-release.yml @@ -0,0 +1,100 @@ +name: phase1-ci-and-release + +on: + pull_request: + push: + branches: ["main", "phase1"] + workflow_dispatch: + inputs: + publish: + description: "Run ordered publish jobs" + required: true + default: "false" + type: choice + options: ["false", "true"] + +jobs: + quality: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: "3.11" + + - name: Install dependencies + run: python -m pip install --upgrade pip pre-commit pytest + + - name: Verify package release order + run: python scripts/verify_release_order.py + + - name: Run tests + run: python -m pytest -q + + - name: Run pre-commit checks + run: pre-commit run --all-files + + publish-predicate-contracts: + runs-on: ubuntu-latest + needs: [quality] + if: github.event_name == 'workflow_dispatch' && inputs.publish == 'true' + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: "3.11" + + - name: Install build tooling + run: python -m pip install --upgrade pip build twine + + - name: Verify release order + run: python scripts/verify_release_order.py + + - name: Build predicate-contracts + run: python -m build predicate_contracts + + - name: Validate distribution metadata + run: twine check predicate_contracts/dist/* + + - name: Publish predicate-contracts to PyPI + env: + TWINE_USERNAME: __token__ + TWINE_PASSWORD: ${{ secrets.PYPI_TOKEN_PREDICATE_CONTRACTS }} + run: twine upload predicate_contracts/dist/* + + publish-predicate-authority: + runs-on: ubuntu-latest + needs: [publish-predicate-contracts] + if: github.event_name == 'workflow_dispatch' && inputs.publish == 'true' + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: "3.11" + + - name: Install build tooling + run: python -m pip install --upgrade pip build twine + + - name: Verify release order + run: python scripts/verify_release_order.py + + - name: Build predicate-authority + run: python -m build predicate_authority + + - name: Validate distribution metadata + run: twine check predicate_authority/dist/* + + - name: Publish predicate-authority to PyPI + env: + TWINE_USERNAME: __token__ + TWINE_PASSWORD: ${{ secrets.PYPI_TOKEN_PREDICATE_AUTHORITY }} + run: twine upload predicate_authority/dist/* diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml new file mode 100644 index 0000000..bc917e0 --- /dev/null +++ b/.github/workflows/tests.yml @@ -0,0 +1,30 @@ +name: tests + +on: + pull_request: + push: + branches: ["main", "phase1"] + workflow_dispatch: + +jobs: + pytest: + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + python-version: ["3.11", "3.12"] + + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Install test dependencies + run: python -m pip install --upgrade pip pytest + + - name: Run tests + run: python -m pytest -q diff --git a/.gitignore b/.gitignore index 0b567b9..028bd58 100644 --- a/.gitignore +++ b/.gitignore @@ -55,3 +55,7 @@ traces/ artifacts/ tmp/ temp/ + +# Local build artifacts from package-level builds +predicate_authority/predicate_authority/ +predicate_contracts/predicate_contracts/ diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index a184f48..3549b8d 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,4 +1,4 @@ -# Pre-commit hooks for AgentIdentity repository +# Pre-commit hooks for predicate-authority repository # Baseline adapted from /Code/Sentience/sdk-python/.pre-commit-config.yaml repos: diff --git a/Makefile b/Makefile index bdba521..3aa3042 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -.PHONY: hooks lint format format-python format-docs lint-docs +.PHONY: hooks lint test examples verify-release-order build-packages format format-python format-docs lint-docs hooks: pre-commit install @@ -6,6 +6,21 @@ hooks: lint: pre-commit run --all-files +test: + python -m pytest -q + +examples: + PYTHONPATH=. python examples/browser_guard_example.py + PYTHONPATH=. python examples/mcp_tool_guard_example.py + PYTHONPATH=. python examples/outbound_http_guard_example.py + +verify-release-order: + python scripts/verify_release_order.py + +build-packages: + python -m build predicate_contracts + python -m build predicate_authority + format: format-python format-docs format-python: diff --git a/README.md b/README.md new file mode 100644 index 0000000..2d049ea --- /dev/null +++ b/README.md @@ -0,0 +1,149 @@ +# Predicate Authority + +**Deterministic Authority for AI Agents: Secure the "Confused Deputy" with your existing Identity stack.** + +[![License](https://img.shields.io/badge/License-MIT%2FApache--2.0-blue.svg)](LICENSE) +[![PyPI - predicate-authority](https://img.shields.io/pypi/v/predicate-authority.svg)](https://pypi.org/project/predicate-authority/) +[![PyPI - predicate-contracts](https://img.shields.io/pypi/v/predicate-contracts.svg)](https://pypi.org/project/predicate-contracts/) + +`predicate-authority` is a production-grade pre-execution authority layer that binds AI agent identity to deterministic state. It bridges standard IdPs (Entra ID, Okta, OIDC) with runtime verification so every sensitive action is authorized, bounded, and provable. + +## Why Predicate Authority? + +Most agent security fails because it relies on static API keys or broad permissions. Predicate introduces short-lived mandates that are cryptographically tied to: + +- `state_hash` (what state the agent is in), +- `intent_hash` (what action it intends to perform), +- policy constraints and required verification labels. + +This closes the confused-deputy gap where an agent can misuse delegated credentials. + +- **Bridge, don't replace**: leverage existing enterprise identity and governance. +- **Fail-closed by design**: deny before execution when state/intent/policy checks fail. +- **Deterministic binding**: authority is tied to runtime evidence, not only identity. +- **Provable controls**: each decision can emit signed proof events for audit pipelines. + +### Why not just use IdP directly? + +You should still use Entra/Okta/OIDC for identity and token issuance. `predicate-authority` adds the runtime control layer those systems do not provide by default for AI agents: + +- pre-execution allow/deny checks right before each sensitive action, +- binding authority to current `state_hash` and `intent_hash`, +- optional required verification labels from runtime checks (currently web-agent only via [predicate-sdk](https://github.com/PredicateSystems/sdk-python) integration), +- fail-closed local enforcement and per-decision proof events. + +In practice: IdP answers **who the principal is**, while `predicate-authority` answers **whether this exact action is allowed right now in this state**. + +## Repository Components + +| Package | Purpose | +| --- | --- | +| `predicate_contracts` | Shared typed contracts and protocols (`ActionRequest`, `PolicyRule`, evidence, decision/proof models). | +| `predicate_authority` | Runtime authorization engine (`PolicyEngine`, `ActionGuard`, mandate signing, proof ledger, telemetry emitter). | +| `examples/` | Browser/MCP/HTTP guard examples using the local Phase 1 runtime. | + +## Phase 1 Status + +Implemented in this repository: + +- local pre-execution `ActionGuard.authorize(...)` and `enforce(...)`, +- signed local mandates with TTL (`LocalMandateSigner`), +- policy evaluation with deny precedence and required verification labels, +- typed [predicate-sdk](https://github.com/PredicateSystems/sdk-python) integration adapter (`predicate_authority.integrations`), +- OpenTelemetry-compatible trace emitter (`OpenTelemetryTraceEmitter`), +- pytest coverage for core authorization, mandate, integration, and telemetry flows. + +Planned in upcoming phases: + +- `predicate-authorityd` sidecar for token lifecycle and local kill-switch, +- enterprise IdP bridge hardening (Entra/Okta/OIDC adapters), +- hosted governance control plane. + +## Installation + +```bash +pip install predicate-authority +``` + +For shared contracts directly: + +```bash +pip install predicate-contracts +``` + +## Quick Start (Phase 1 API) + +```python +from predicate_authority import ActionGuard, InMemoryProofLedger, LocalMandateSigner, PolicyEngine +from predicate_contracts import ( + ActionRequest, + ActionSpec, + PolicyEffect, + PolicyRule, + PrincipalRef, + StateEvidence, + VerificationEvidence, +) + +guard = ActionGuard( + policy_engine=PolicyEngine( + rules=( + PolicyRule( + name="allow-payment-submit", + effect=PolicyEffect.ALLOW, + principals=("agent:payments",), + actions=("http.post",), + resources=("https://finance.example.com/transfers",), + ), + ) + ), + mandate_signer=LocalMandateSigner(secret_key="dev-secret"), + proof_ledger=InMemoryProofLedger(), +) + +request = ActionRequest( + principal=PrincipalRef(principal_id="agent:payments"), + action_spec=ActionSpec( + action="http.post", + resource="https://finance.example.com/transfers", + intent="submit transfer request #1234", + ), + state_evidence=StateEvidence(source="backend", state_hash="state-hash-abc"), + verification_evidence=VerificationEvidence(), +) + +decision = guard.authorize(request) +if not decision.allowed: + raise RuntimeError(f"Authority denied: {decision.reason.value}") +``` + +See runnable examples in: + +- `examples/browser_guard_example.py` +- `examples/mcp_tool_guard_example.py` +- `examples/outbound_http_guard_example.py` + +## Security: Local Kill-Switch Path + +The current Phase 1 runtime supports fail-closed checks and local proof emission. The sidecar model (`predicate-authorityd`) is planned to provide instant local revocation and managed token lifecycle for long-running production agents. + +## Release + +- CI workflow: `.github/workflows/phase1-ci-and-release.yml` +- Release guide: `docs/pypi-release-guide.md` + +Publish order is always: + +1. `predicate-contracts` +2. `predicate-authority` + +## License + +Dual-licensed under **MIT** and **Apache 2.0**: + +- `LICENSE-MIT` +- `LICENSE-APACHE` + +--- + +Copyright (c) 2026 Predicate Systems Inc. diff --git a/docs/better-sdk-opportunity-proposal.md b/docs/better-sdk-opportunity-proposal.md index e47dbcd..de1a54c 100644 --- a/docs/better-sdk-opportunity-proposal.md +++ b/docs/better-sdk-opportunity-proposal.md @@ -15,6 +15,18 @@ This proposal answers the "Better SDK Opportunity" in `northstar.md` by combinin - Caracal's strongest ideas (short-lived mandates, scope checks, fail-closed gateway-style enforcement, immutable authority ledger), - A bridge-first strategy (works with Azure AD/Okta/Auth0 and existing agent stacks). +## Progress Dashboard + +Status snapshot date: 2026-02-16 + +| Phase | Status | ETA | Owner | +| --- | --- | --- | --- | +| Phase 0: Architecture and Spec Lock | Partially complete | 1-2 weeks total (remaining: sign-off + schema/process formalization) | SDK + Platform + Security | +| Phase 1: Local SDK Guard (MVP) | In progress | 3-5 weeks total (remaining: `sdk-python` hooks + OTel export + examples + CI publish flow) | SDK | +| Phase 2: Sidecar + Identity Bridge | Not started (design only) | 4-6 weeks | Platform + Identity | +| Phase 3: Hosted Governance Control Plane | Not started (design only) | 6-8 weeks | Platform + Product | +| Phase 4: Enterprise Hardening and Scale | Not started (design only) | Ongoing (first 4-6 weeks) | Platform + Security + GTM | + ## TL;DR Design @@ -511,6 +523,19 @@ async def web_search_tool(query: str): - Basic policy DSL. - Trace/proof event emission to existing tracer. +Status (as of 2026-02-16): **in progress (MVP scaffold implemented in this `predicate-authority` repository)** + +- Completed in repo: + - `predicate-contracts` package scaffold with typed contracts and protocols. + - `predicate-authority` local `ActionGuard.authorize(...)` + `enforce(...)`. + - Signed local mandates with TTL + verification. + - Local policy evaluation and normalized deny reasons. + - In-memory proof ledger with optional trace emitter interface. + - pytest coverage for policy, mandate signing, and proof emission paths. +- Remaining to close full Phase 1 exit: + - connect CI publish jobs to real package build/publish steps and credentials, + - publish first `predicate-contracts` and `predicate-authority` versions in dependency order. + ## Phase 2: Sidecar and IdP bridge (4-8 weeks) - `predicate-authorityd`. @@ -609,6 +634,14 @@ Exit criteria: - compatibility mapping to existing `sdk-python` step lifecycle approved. - release orchestration design approved for multi-package PyPI publishing (`predicate-contracts` then `predicate-authority`). +Current status: **partially complete** + +- [x] dependency graph/import boundaries documented in this proposal. +- [x] package scaffolding started in this `predicate-authority` repository (`predicate-contracts`, `predicate-authority`). +- [ ] formal design sign-off from SDK/platform/security. +- [ ] versioned schema docs publication process. +- [ ] approved compatibility mapping with `sdk-python` lifecycle owners. + ## Phase 1: Local SDK Guard (MVP) (3-5 weeks) Objective: deliver immediate value with in-process pre-execution authority. @@ -632,6 +665,18 @@ Exit criteria: - developer quickstart validated end-to-end on local-only mode. - CI release pipeline can publish and verify `predicate-contracts` and `predicate-authority` in dependency order. +Current status: **in progress** + +- [x] local `ActionGuard.authorize(...)`. +- [x] signed local mandates. +- [x] local policy evaluation. +- [x] fail-closed deny path with normalized reason enums. +- [x] deterministic regression tests for authorize/deny paths. +- [x] `sdk-python` runtime integration hooks (typed adapter path). +- [x] OpenTelemetry-native authority event export. +- [x] quickstart/examples for browser/MCP/outbound HTTP. +- [x] dependency-ordered package publish pipeline in CI (workflow scaffold). + ## Phase 2: Sidecar + Identity Bridge (4-6 weeks) Objective: production-ready token lifecycle and enterprise identity compatibility. @@ -657,6 +702,8 @@ Exit criteria: - bridge token exchange validated against at least one enterprise IdP. - sidecar survives restart/network partition with fail-closed guarantees. +Current status: **not started (design only)** + ## Phase 3: Hosted Governance Control Plane (6-8 weeks) Objective: ship monetizable cloud governance capabilities. @@ -675,6 +722,8 @@ Exit criteria: - kill-switch propagation meets incident response target. - billable usage pipeline reconciles authority + snapshot credits accurately. +Current status: **not started (design only)** + ## Phase 4: Enterprise Hardening and Scale (ongoing, first 4-6 weeks) Objective: make it enterprise-ready for regulated production. @@ -693,6 +742,8 @@ Exit criteria: - defined SLOs met in staging/load tests. - enterprise onboarding playbook validated with pilot accounts. +Current status: **not started (design only)** + ## Cross-Phase Dependencies - `sdk-python` runtime contract stability (snapshot schema, assertion labels, step metadata). diff --git a/docs/pypi-release-guide.md b/docs/pypi-release-guide.md new file mode 100644 index 0000000..4c61c87 --- /dev/null +++ b/docs/pypi-release-guide.md @@ -0,0 +1,74 @@ +# PyPI Release Guide + +This repo publishes two Python packages in strict order: + +1. `predicate-contracts` +2. `predicate-authority` (depends on `predicate-contracts`) + +## 1) One-time setup + +### Reserve package names on PyPI + +Ensure both package names exist under your organization: + +- `predicate-contracts` +- `predicate-authority` + +### Add GitHub repository secrets + +In GitHub repository settings -> Secrets and variables -> Actions, add: + +- `PYPI_TOKEN_PREDICATE_CONTRACTS` +- `PYPI_TOKEN_PREDICATE_AUTHORITY` + +Use PyPI API tokens scoped to each package where possible. + +## 2) Prepare a release + +1. Update versions: + - `predicate_contracts/pyproject.toml` -> `project.version` + - `predicate_authority/pyproject.toml` -> `project.version` +2. If `predicate-contracts` version changes, update dependency pin in: + - `predicate_authority/pyproject.toml` (`predicate-contracts>=X, `phase1-ci-and-release`. +3. Click **Run workflow** with input `publish=true`. +4. Workflow order is enforced: + - `publish-predicate-contracts` + - `publish-predicate-authority` (runs only after contracts publish succeeds) + +## 4) Verify published artifacts + +```bash +python -m pip install --upgrade predicate-contracts predicate-authority +python - <<'PY' +import predicate_contracts +import predicate_authority +print("ok", predicate_contracts.__name__, predicate_authority.__name__) +PY +``` + +## 5) Manual fallback publish (if needed) + +```bash +python -m pip install --upgrade build twine +python -m build predicate_contracts +twine check predicate_contracts/dist/* +TWINE_USERNAME=__token__ TWINE_PASSWORD="$PYPI_TOKEN_PREDICATE_CONTRACTS" twine upload predicate_contracts/dist/* + +python -m build predicate_authority +twine check predicate_authority/dist/* +TWINE_USERNAME=__token__ TWINE_PASSWORD="$PYPI_TOKEN_PREDICATE_AUTHORITY" twine upload predicate_authority/dist/* +``` diff --git a/examples/README.md b/examples/README.md new file mode 100644 index 0000000..7895665 --- /dev/null +++ b/examples/README.md @@ -0,0 +1,15 @@ +# Phase 1 Examples + +These scripts show the intended local guard patterns for Phase 1: + +- `browser_guard_example.py`: sdk-python style pre-action authorization hook using typed step evidence. +- `mcp_tool_guard_example.py`: guard for MCP tool execution. +- `outbound_http_guard_example.py`: guard for outbound HTTP actions. + +Run with: + +```bash +PYTHONPATH=. python examples/browser_guard_example.py +PYTHONPATH=. python examples/mcp_tool_guard_example.py +PYTHONPATH=. python examples/outbound_http_guard_example.py +``` diff --git a/examples/browser_guard_example.py b/examples/browser_guard_example.py new file mode 100644 index 0000000..cc9ee82 --- /dev/null +++ b/examples/browser_guard_example.py @@ -0,0 +1,48 @@ +from __future__ import annotations + +import secrets + +from predicate_authority import ActionGuard, InMemoryProofLedger, LocalMandateSigner, PolicyEngine +from predicate_authority.integrations import SdkAssertionRecord, SdkStepEvidence, authorize_sdk_step +from predicate_contracts import PolicyEffect, PolicyRule + + +def main() -> None: + guard = ActionGuard( + policy_engine=PolicyEngine( + rules=( + PolicyRule( + name="allow-browser-checkout-submit", + effect=PolicyEffect.ALLOW, + principals=("agent:web-checkout",), + actions=("browser.submit",), + resources=("https://shop.example.com/checkout",), + required_labels=("postcondition.url_contains:/receipt",), + ), + ) + ), + mandate_signer=LocalMandateSigner(secret_key=secrets.token_hex(32)), + proof_ledger=InMemoryProofLedger(), + ) + + step = SdkStepEvidence( + principal_id="agent:web-checkout", + action="browser.submit", + resource="https://shop.example.com/checkout", + intent="submit checkout form", + state_hash="state-hash-from-sdk-python", + assertions=( + SdkAssertionRecord( + label="postcondition.url_contains:/receipt", + passed=True, + ), + ), + ) + result = authorize_sdk_step(guard, step) + print(f"allowed={result.decision.allowed}, reason={result.decision.reason.value}") + if result.decision.mandate is not None: + print(f"mandate_id={result.decision.mandate.claims.mandate_id}") + + +if __name__ == "__main__": + main() diff --git a/examples/mcp_tool_guard_example.py b/examples/mcp_tool_guard_example.py new file mode 100644 index 0000000..f9f8853 --- /dev/null +++ b/examples/mcp_tool_guard_example.py @@ -0,0 +1,50 @@ +from __future__ import annotations + +import secrets + +from predicate_authority import ActionGuard, InMemoryProofLedger, LocalMandateSigner, PolicyEngine +from predicate_contracts import ( + ActionRequest, + ActionSpec, + PolicyEffect, + PolicyRule, + PrincipalRef, + StateEvidence, + VerificationEvidence, +) + + +def main() -> None: + guard = ActionGuard( + policy_engine=PolicyEngine( + rules=( + PolicyRule( + name="allow-safe-tool", + effect=PolicyEffect.ALLOW, + principals=("agent:mcp-assistant",), + actions=("mcp.execute",), + resources=("mcp://tools/web_search",), + ), + ) + ), + mandate_signer=LocalMandateSigner(secret_key=secrets.token_hex(32)), + proof_ledger=InMemoryProofLedger(), + ) + request = ActionRequest( + principal=PrincipalRef(principal_id="agent:mcp-assistant"), + action_spec=ActionSpec( + action="mcp.execute", + resource="mcp://tools/web_search", + intent="search docs for release notes", + ), + state_evidence=StateEvidence(source="mcp", state_hash="tool-context-hash"), + verification_evidence=VerificationEvidence(), + ) + + result = guard.enforce(lambda: "tool-call-result", request) + print(f"result={result.value}") + print(f"mandate_id={result.mandate.claims.mandate_id}") + + +if __name__ == "__main__": + main() diff --git a/examples/outbound_http_guard_example.py b/examples/outbound_http_guard_example.py new file mode 100644 index 0000000..34a5d64 --- /dev/null +++ b/examples/outbound_http_guard_example.py @@ -0,0 +1,55 @@ +from __future__ import annotations + +import secrets + +from predicate_authority import ActionGuard, InMemoryProofLedger, LocalMandateSigner, PolicyEngine +from predicate_contracts import ( + ActionRequest, + ActionSpec, + PolicyEffect, + PolicyRule, + PrincipalRef, + StateEvidence, + VerificationEvidence, +) + + +def perform_http_post() -> str: + # Placeholder for actual outbound HTTP call. + return "201 Created" + + +def main() -> None: + guard = ActionGuard( + policy_engine=PolicyEngine( + rules=( + PolicyRule( + name="allow-vendor-api-post", + effect=PolicyEffect.ALLOW, + principals=("agent:backend-billing",), + actions=("http.post",), + resources=("https://api.vendor.com/invoices",), + ), + ) + ), + mandate_signer=LocalMandateSigner(secret_key=secrets.token_hex(32)), + proof_ledger=InMemoryProofLedger(), + ) + request = ActionRequest( + principal=PrincipalRef(principal_id="agent:backend-billing"), + action_spec=ActionSpec( + action="http.post", + resource="https://api.vendor.com/invoices", + intent="create invoice for order 1432", + ), + state_evidence=StateEvidence(source="backend", state_hash="order-1432-input-hash"), + verification_evidence=VerificationEvidence(), + ) + + result = guard.enforce(perform_http_post, request) + print(f"http_status={result.value}") + print(f"authorized={result.decision.allowed}") + + +if __name__ == "__main__": + main() diff --git a/predicate_authority/README.md b/predicate_authority/README.md new file mode 100644 index 0000000..e933530 --- /dev/null +++ b/predicate_authority/README.md @@ -0,0 +1,11 @@ +# predicate-authority + +`predicate-authority` provides pre-execution authorization for AI agent actions. + +Core pieces: + +- `PolicyEngine` for allow/deny + required verification labels, +- `ActionGuard` for pre-action `authorize` / `enforce`, +- `LocalMandateSigner` for signed short-lived mandates, +- `InMemoryProofLedger` and optional `OpenTelemetryTraceEmitter`, +- typed integration adapters (including `sdk-python` mapping helpers). diff --git a/predicate_authority/__init__.py b/predicate_authority/__init__.py new file mode 100644 index 0000000..6870533 --- /dev/null +++ b/predicate_authority/__init__.py @@ -0,0 +1,20 @@ +from predicate_authority.bridge import IdentityBridge, TokenExchangeResult +from predicate_authority.errors import AuthorizationDeniedError +from predicate_authority.guard import ActionExecutionResult, ActionGuard +from predicate_authority.mandate import LocalMandateSigner +from predicate_authority.policy import PolicyEngine, PolicyMatchResult +from predicate_authority.proof import InMemoryProofLedger +from predicate_authority.telemetry import OpenTelemetryTraceEmitter + +__all__ = [ + "ActionExecutionResult", + "ActionGuard", + "AuthorizationDeniedError", + "IdentityBridge", + "InMemoryProofLedger", + "LocalMandateSigner", + "OpenTelemetryTraceEmitter", + "PolicyEngine", + "PolicyMatchResult", + "TokenExchangeResult", +] diff --git a/predicate_authority/bridge.py b/predicate_authority/bridge.py new file mode 100644 index 0000000..37978c8 --- /dev/null +++ b/predicate_authority/bridge.py @@ -0,0 +1,34 @@ +from __future__ import annotations + +import hashlib +import time +from dataclasses import dataclass + +from predicate_contracts import PrincipalRef, StateEvidence + + +@dataclass(frozen=True) +class TokenExchangeResult: + access_token: str + expires_at_epoch_s: int + token_type: str = "Bearer" + + +class IdentityBridge: + """Local placeholder bridge for Phase 1. + + This keeps an explicit interface so Phase 2 can swap in a real OIDC/Entra bridge. + """ + + def __init__(self, token_ttl_seconds: int = 300) -> None: + self._token_ttl_seconds = token_ttl_seconds + + def exchange_token( + self, subject: PrincipalRef, state_evidence: StateEvidence + ) -> TokenExchangeResult: + expires_at = int(time.time()) + self._token_ttl_seconds + token_seed = f"{subject.principal_id}|{state_evidence.state_hash}|{expires_at}" + token_hash = hashlib.sha256(token_seed.encode("utf-8")).hexdigest() + return TokenExchangeResult( + access_token=f"local.{token_hash}", expires_at_epoch_s=expires_at + ) diff --git a/predicate_authority/errors.py b/predicate_authority/errors.py new file mode 100644 index 0000000..7dd843b --- /dev/null +++ b/predicate_authority/errors.py @@ -0,0 +1,9 @@ +from __future__ import annotations + +from predicate_contracts import AuthorizationDecision + + +class AuthorizationDeniedError(RuntimeError): + def __init__(self, decision: AuthorizationDecision) -> None: + self.decision = decision + super().__init__(f"Authorization denied: {decision.reason.value}") diff --git a/predicate_authority/guard.py b/predicate_authority/guard.py new file mode 100644 index 0000000..2331b0c --- /dev/null +++ b/predicate_authority/guard.py @@ -0,0 +1,68 @@ +from __future__ import annotations + +from collections.abc import Callable +from dataclasses import dataclass +from typing import Generic, TypeVar + +from predicate_authority.errors import AuthorizationDeniedError +from predicate_authority.mandate import LocalMandateSigner +from predicate_authority.policy import PolicyEngine +from predicate_authority.proof import InMemoryProofLedger +from predicate_contracts import ( + ActionRequest, + AuthorizationDecision, + AuthorizationReason, + SignedMandate, +) + +T = TypeVar("T") + + +@dataclass(frozen=True) +class ActionExecutionResult(Generic[T]): + value: T + decision: AuthorizationDecision + mandate: SignedMandate + + +class ActionGuard: + def __init__( + self, + policy_engine: PolicyEngine, + mandate_signer: LocalMandateSigner, + proof_ledger: InMemoryProofLedger, + ) -> None: + self._policy_engine = policy_engine + self._mandate_signer = mandate_signer + self._proof_ledger = proof_ledger + + def authorize(self, request: ActionRequest) -> AuthorizationDecision: + evaluation = self._policy_engine.evaluate(request) + if not evaluation.allowed: + decision = AuthorizationDecision( + allowed=False, + reason=evaluation.reason, + violated_rule=evaluation.matched_rule, + missing_labels=evaluation.missing_labels, + ) + self._proof_ledger.record(decision, request) + return decision + + mandate = self._mandate_signer.issue(request) + decision = AuthorizationDecision( + allowed=True, + reason=AuthorizationReason.ALLOWED, + mandate=mandate, + violated_rule=evaluation.matched_rule, + ) + self._proof_ledger.record(decision, request) + return decision + + def enforce( + self, action_callable: Callable[[], T], request: ActionRequest + ) -> ActionExecutionResult[T]: + decision = self.authorize(request) + if not decision.allowed or decision.mandate is None: + raise AuthorizationDeniedError(decision) + value = action_callable() + return ActionExecutionResult(value=value, decision=decision, mandate=decision.mandate) diff --git a/predicate_authority/integrations/__init__.py b/predicate_authority/integrations/__init__.py new file mode 100644 index 0000000..a655528 --- /dev/null +++ b/predicate_authority/integrations/__init__.py @@ -0,0 +1,15 @@ +from predicate_authority.integrations.sdk_python import ( + SdkAssertionRecord, + SdkPreActionAuthResult, + SdkStepEvidence, + authorize_sdk_step, + to_action_request, +) + +__all__ = [ + "SdkAssertionRecord", + "SdkPreActionAuthResult", + "SdkStepEvidence", + "authorize_sdk_step", + "to_action_request", +] diff --git a/predicate_authority/integrations/sdk_python.py b/predicate_authority/integrations/sdk_python.py new file mode 100644 index 0000000..9bd1f79 --- /dev/null +++ b/predicate_authority/integrations/sdk_python.py @@ -0,0 +1,81 @@ +from __future__ import annotations + +from dataclasses import dataclass + +from predicate_authority.guard import ActionGuard +from predicate_contracts import ( + ActionRequest, + ActionSpec, + AuthorizationDecision, + PrincipalRef, + StateEvidence, + VerificationEvidence, + VerificationSignal, + VerificationStatus, +) + + +@dataclass(frozen=True) +class SdkAssertionRecord: + """Typed view of one sdk-python assertion result.""" + + label: str + passed: bool + required: bool = True + reason: str | None = None + + +@dataclass(frozen=True) +class SdkStepEvidence: + """Runtime evidence needed by authority before a sensitive action.""" + + principal_id: str + action: str + resource: str + intent: str + state_hash: str + state_source: str = "sdk-python" + assertions: tuple[SdkAssertionRecord, ...] = () + tenant_id: str | None = None + session_id: str | None = None + + +def to_action_request(step: SdkStepEvidence) -> ActionRequest: + verification_signals = tuple( + VerificationSignal( + label=assertion.label, + status=VerificationStatus.PASSED if assertion.passed else VerificationStatus.FAILED, + required=assertion.required, + reason=assertion.reason, + ) + for assertion in step.assertions + ) + return ActionRequest( + principal=PrincipalRef( + principal_id=step.principal_id, + tenant_id=step.tenant_id, + session_id=step.session_id, + ), + action_spec=ActionSpec( + action=step.action, + resource=step.resource, + intent=step.intent, + ), + state_evidence=StateEvidence( + source=step.state_source, + state_hash=step.state_hash, + ), + verification_evidence=VerificationEvidence(signals=verification_signals), + ) + + +@dataclass(frozen=True) +class SdkPreActionAuthResult: + request: ActionRequest + decision: AuthorizationDecision + + +def authorize_sdk_step(guard: ActionGuard, step: SdkStepEvidence) -> SdkPreActionAuthResult: + request = to_action_request(step) + decision = guard.authorize(request) + return SdkPreActionAuthResult(request=request, decision=decision) diff --git a/predicate_authority/mandate.py b/predicate_authority/mandate.py new file mode 100644 index 0000000..68346ba --- /dev/null +++ b/predicate_authority/mandate.py @@ -0,0 +1,94 @@ +from __future__ import annotations + +import base64 +import hashlib +import hmac +import json +import time +from dataclasses import asdict + +from predicate_contracts import ActionRequest, MandateClaims, SignedMandate + + +class LocalMandateSigner: + def __init__(self, secret_key: str, ttl_seconds: int = 300) -> None: + if ttl_seconds <= 0: + raise ValueError("ttl_seconds must be > 0") + self._secret_key = secret_key.encode("utf-8") + self._ttl_seconds = ttl_seconds + + def issue(self, request: ActionRequest) -> SignedMandate: + issued_at = int(time.time()) + expires_at = issued_at + self._ttl_seconds + intent_hash = hashlib.sha256(request.action_spec.intent.encode("utf-8")).hexdigest() + mandate_id_seed = ( + f"{request.principal.principal_id}|" + f"{request.action_spec.action}|" + f"{request.action_spec.resource}|" + f"{intent_hash}|" + f"{request.state_evidence.state_hash}|" + f"{issued_at}" + ) + mandate_id = hashlib.sha256(mandate_id_seed.encode("utf-8")).hexdigest()[:24] + + claims = MandateClaims( + mandate_id=mandate_id, + principal_id=request.principal.principal_id, + action=request.action_spec.action, + resource=request.action_spec.resource, + intent_hash=intent_hash, + state_hash=request.state_evidence.state_hash, + issued_at_epoch_s=issued_at, + expires_at_epoch_s=expires_at, + ) + token, signature = self._sign_claims(claims) + return SignedMandate(token=token, claims=claims, signature=signature) + + def verify(self, token: str) -> SignedMandate | None: + parts = token.split(".") + if len(parts) != 3: + return None + + encoded_header, encoded_payload, encoded_signature = parts + signing_input = f"{encoded_header}.{encoded_payload}".encode() + expected_signature = self._hmac(signing_input) + expected_signature_encoded = self._base64url_encode(expected_signature) + if not hmac.compare_digest(expected_signature_encoded, encoded_signature): + return None + + try: + payload_json = self._base64url_decode(encoded_payload).decode("utf-8") + payload = json.loads(payload_json) + claims = MandateClaims(**payload) + except (ValueError, TypeError, json.JSONDecodeError): + return None + + now_epoch = int(time.time()) + if claims.expires_at_epoch_s < now_epoch: + return None + return SignedMandate(token=token, claims=claims, signature=encoded_signature) + + def _sign_claims(self, claims: MandateClaims) -> tuple[str, str]: + header_json = json.dumps( + {"alg": "HS256", "typ": "JWT"}, separators=(",", ":"), sort_keys=True + ) + payload_json = json.dumps(asdict(claims), separators=(",", ":"), sort_keys=True) + + encoded_header = self._base64url_encode(header_json.encode("utf-8")) + encoded_payload = self._base64url_encode(payload_json.encode("utf-8")) + signing_input = f"{encoded_header}.{encoded_payload}".encode() + signature = self._base64url_encode(self._hmac(signing_input)) + token = f"{encoded_header}.{encoded_payload}.{signature}" + return token, signature + + def _hmac(self, payload: bytes) -> bytes: + return hmac.new(self._secret_key, payload, hashlib.sha256).digest() + + @staticmethod + def _base64url_encode(value: bytes) -> str: + return base64.urlsafe_b64encode(value).rstrip(b"=").decode("ascii") + + @staticmethod + def _base64url_decode(value: str) -> bytes: + padding = "=" * ((4 - len(value) % 4) % 4) + return base64.urlsafe_b64decode(value + padding) diff --git a/predicate_authority/policy.py b/predicate_authority/policy.py new file mode 100644 index 0000000..99b9835 --- /dev/null +++ b/predicate_authority/policy.py @@ -0,0 +1,74 @@ +from __future__ import annotations + +from dataclasses import dataclass +from fnmatch import fnmatch + +from predicate_contracts import ActionRequest, AuthorizationReason, PolicyEffect, PolicyRule + + +@dataclass(frozen=True) +class PolicyMatchResult: + allowed: bool + reason: AuthorizationReason + matched_rule: str | None = None + missing_labels: tuple[str, ...] = () + + +class PolicyEngine: + def __init__(self, rules: tuple[PolicyRule, ...]) -> None: + self._rules = rules + + def evaluate(self, request: ActionRequest) -> PolicyMatchResult: + matching_rules = [rule for rule in self._rules if self._matches_rule(rule, request)] + if not matching_rules: + return PolicyMatchResult( + allowed=False, + reason=AuthorizationReason.NO_MATCHING_POLICY, + ) + + for rule in matching_rules: + if rule.effect == PolicyEffect.DENY: + return PolicyMatchResult( + allowed=False, + reason=AuthorizationReason.EXPLICIT_DENY, + matched_rule=rule.name, + ) + + for rule in matching_rules: + if rule.effect != PolicyEffect.ALLOW: + continue + + missing_labels = tuple( + label + for label in rule.required_labels + if not request.verification_evidence.is_label_passed(label) + ) + if missing_labels: + return PolicyMatchResult( + allowed=False, + reason=AuthorizationReason.MISSING_REQUIRED_VERIFICATION, + matched_rule=rule.name, + missing_labels=missing_labels, + ) + + return PolicyMatchResult( + allowed=True, + reason=AuthorizationReason.ALLOWED, + matched_rule=rule.name, + ) + + return PolicyMatchResult( + allowed=False, + reason=AuthorizationReason.NO_MATCHING_POLICY, + ) + + @staticmethod + def _matches_rule(rule: PolicyRule, request: ActionRequest) -> bool: + principal_ok = any( + fnmatch(request.principal.principal_id, pattern) for pattern in rule.principals + ) + action_ok = any(fnmatch(request.action_spec.action, pattern) for pattern in rule.actions) + resource_ok = any( + fnmatch(request.action_spec.resource, pattern) for pattern in rule.resources + ) + return principal_ok and action_ok and resource_ok diff --git a/predicate_authority/proof.py b/predicate_authority/proof.py new file mode 100644 index 0000000..e78471e --- /dev/null +++ b/predicate_authority/proof.py @@ -0,0 +1,28 @@ +from __future__ import annotations + +import time +from dataclasses import dataclass, field + +from predicate_contracts import ActionRequest, AuthorizationDecision, ProofEvent, TraceEmitter + + +@dataclass +class InMemoryProofLedger: + trace_emitter: TraceEmitter | None = None + events: list[ProofEvent] = field(default_factory=list) + + def record(self, decision: AuthorizationDecision, request: ActionRequest) -> ProofEvent: + event = ProofEvent( + event_type="authority.decision", + principal_id=request.principal.principal_id, + action=request.action_spec.action, + resource=request.action_spec.resource, + reason=decision.reason, + allowed=decision.allowed, + mandate_id=decision.mandate.claims.mandate_id if decision.mandate else None, + emitted_at_epoch_s=int(time.time()), + ) + self.events.append(event) + if self.trace_emitter is not None: + self.trace_emitter.emit(event) + return event diff --git a/predicate_authority/pyproject.toml b/predicate_authority/pyproject.toml new file mode 100644 index 0000000..915e2f7 --- /dev/null +++ b/predicate_authority/pyproject.toml @@ -0,0 +1,30 @@ +[build-system] +requires = ["setuptools>=69.0", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "predicate-authority" +version = "0.1.0" +description = "Pre-execution authority enforcement runtime for AI agents." +readme = "README.md" +requires-python = ">=3.11" +license = "MIT OR Apache-2.0" +authors = [{ name = "Predicate Systems" }] +classifiers = [ + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.11", + "Typing :: Typed", +] +dependencies = [ + "predicate-contracts>=0.1.0,<0.2.0", +] + +[project.optional-dependencies] +telemetry = ["opentelemetry-api>=1.24.0"] + +[tool.setuptools] +packages = ["predicate_authority", "predicate_authority.integrations"] + +[tool.setuptools.package-dir] +"predicate_authority" = "." +"predicate_authority.integrations" = "integrations" diff --git a/predicate_authority/telemetry.py b/predicate_authority/telemetry.py new file mode 100644 index 0000000..741eb28 --- /dev/null +++ b/predicate_authority/telemetry.py @@ -0,0 +1,44 @@ +from __future__ import annotations + +from contextlib import AbstractContextManager +from typing import Protocol, cast + +from predicate_contracts import ProofEvent, TraceEmitter + + +class SpanLike(Protocol): + def set_attribute(self, key: str, value: str | bool | int) -> None: ... + + +class TracerLike(Protocol): + def start_as_current_span(self, name: str) -> AbstractContextManager[SpanLike]: ... + + +class OpenTelemetryTraceEmitter(TraceEmitter): + """TraceEmitter backed by OpenTelemetry spans/events.""" + + def __init__(self, tracer: TracerLike | None = None) -> None: + self._tracer = tracer or self._default_tracer() + + def emit(self, event: ProofEvent) -> None: + with self._tracer.start_as_current_span("predicate.authority.decision") as span: + span.set_attribute("predicate.event_type", event.event_type) + span.set_attribute("predicate.principal_id", event.principal_id) + span.set_attribute("predicate.action", event.action) + span.set_attribute("predicate.resource", event.resource) + span.set_attribute("predicate.reason", event.reason.value) + span.set_attribute("predicate.allowed", event.allowed) + span.set_attribute("predicate.emitted_at_epoch_s", event.emitted_at_epoch_s) + if event.mandate_id is not None: + span.set_attribute("predicate.mandate_id", event.mandate_id) + + @staticmethod + def _default_tracer() -> TracerLike: + try: + from opentelemetry import trace + except ImportError as exc: + raise RuntimeError( + "OpenTelemetryTraceEmitter requires 'opentelemetry-api'. " + "Install it or pass an explicit tracer." + ) from exc + return cast(TracerLike, trace.get_tracer("predicate-authority")) diff --git a/predicate_contracts/README.md b/predicate_contracts/README.md new file mode 100644 index 0000000..e2411d4 --- /dev/null +++ b/predicate_contracts/README.md @@ -0,0 +1,9 @@ +# predicate-contracts + +`predicate-contracts` is the shared contract package for Predicate authority workflows. + +It contains: + +- typed data contracts (`ActionRequest`, `PolicyRule`, `AuthorizationDecision`, etc.), +- integration protocols (`StateEvidenceProvider`, `VerificationEvidenceProvider`, `TraceEmitter`), +- no runtime dependency on `sdk-python` internals or authority runtime logic. diff --git a/predicate_contracts/__init__.py b/predicate_contracts/__init__.py new file mode 100644 index 0000000..d453dbf --- /dev/null +++ b/predicate_contracts/__init__.py @@ -0,0 +1,41 @@ +from predicate_contracts.models import ( + ActionRequest, + ActionSpec, + AuthorizationDecision, + AuthorizationReason, + MandateClaims, + PolicyEffect, + PolicyRule, + PrincipalRef, + ProofEvent, + SignedMandate, + StateEvidence, + VerificationEvidence, + VerificationSignal, + VerificationStatus, +) +from predicate_contracts.protocols import ( + StateEvidenceProvider, + TraceEmitter, + VerificationEvidenceProvider, +) + +__all__ = [ + "ActionRequest", + "ActionSpec", + "AuthorizationDecision", + "AuthorizationReason", + "MandateClaims", + "PolicyEffect", + "PolicyRule", + "PrincipalRef", + "ProofEvent", + "SignedMandate", + "StateEvidence", + "StateEvidenceProvider", + "TraceEmitter", + "VerificationEvidence", + "VerificationEvidenceProvider", + "VerificationSignal", + "VerificationStatus", +] diff --git a/predicate_contracts/models.py b/predicate_contracts/models.py new file mode 100644 index 0000000..5c25505 --- /dev/null +++ b/predicate_contracts/models.py @@ -0,0 +1,122 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from enum import Enum + + +class PolicyEffect(str, Enum): + ALLOW = "allow" + DENY = "deny" + + +class VerificationStatus(str, Enum): + PASSED = "passed" + FAILED = "failed" + SKIPPED = "skipped" + + +class AuthorizationReason(str, Enum): + ALLOWED = "allowed" + NO_MATCHING_POLICY = "no_matching_policy" + EXPLICIT_DENY = "explicit_deny" + MISSING_REQUIRED_VERIFICATION = "missing_required_verification" + INVALID_MANDATE = "invalid_mandate" + + +@dataclass(frozen=True) +class PrincipalRef: + principal_id: str + tenant_id: str | None = None + session_id: str | None = None + + +@dataclass(frozen=True) +class ActionSpec: + action: str + resource: str + intent: str + + +@dataclass(frozen=True) +class StateEvidence: + source: str + state_hash: str + schema_version: str = "v1" + confidence: float | None = None + + +@dataclass(frozen=True) +class VerificationSignal: + label: str + status: VerificationStatus + required: bool = True + reason: str | None = None + + +@dataclass(frozen=True) +class VerificationEvidence: + signals: tuple[VerificationSignal, ...] = field(default_factory=tuple) + + def is_label_passed(self, label: str) -> bool: + for signal in self.signals: + if signal.label == label and signal.status == VerificationStatus.PASSED: + return True + return False + + +@dataclass(frozen=True) +class ActionRequest: + principal: PrincipalRef + action_spec: ActionSpec + state_evidence: StateEvidence + verification_evidence: VerificationEvidence + + +@dataclass(frozen=True) +class PolicyRule: + name: str + effect: PolicyEffect + principals: tuple[str, ...] + actions: tuple[str, ...] + resources: tuple[str, ...] + required_labels: tuple[str, ...] = field(default_factory=tuple) + + +@dataclass(frozen=True) +class MandateClaims: + mandate_id: str + principal_id: str + action: str + resource: str + intent_hash: str + state_hash: str + issued_at_epoch_s: int + expires_at_epoch_s: int + + +@dataclass(frozen=True) +class SignedMandate: + token: str + claims: MandateClaims + signature: str + + +@dataclass(frozen=True) +class AuthorizationDecision: + allowed: bool + reason: AuthorizationReason + mandate: SignedMandate | None = None + violated_rule: str | None = None + missing_labels: tuple[str, ...] = field(default_factory=tuple) + + +@dataclass(frozen=True) +class ProofEvent: + event_type: str + principal_id: str + action: str + resource: str + reason: AuthorizationReason + allowed: bool + mandate_id: str | None + emitted_at_epoch_s: int diff --git a/predicate_contracts/protocols.py b/predicate_contracts/protocols.py new file mode 100644 index 0000000..26f719b --- /dev/null +++ b/predicate_contracts/protocols.py @@ -0,0 +1,17 @@ +from __future__ import annotations + +from typing import Protocol + +from predicate_contracts.models import ProofEvent, StateEvidence, VerificationEvidence + + +class StateEvidenceProvider(Protocol): + def get_state_evidence(self) -> StateEvidence: ... + + +class VerificationEvidenceProvider(Protocol): + def get_verification_evidence(self) -> VerificationEvidence: ... + + +class TraceEmitter(Protocol): + def emit(self, event: ProofEvent) -> None: ... diff --git a/predicate_contracts/pyproject.toml b/predicate_contracts/pyproject.toml new file mode 100644 index 0000000..0e4eed1 --- /dev/null +++ b/predicate_contracts/pyproject.toml @@ -0,0 +1,23 @@ +[build-system] +requires = ["setuptools>=69.0", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "predicate-contracts" +version = "0.1.0" +description = "Shared typed contracts for Predicate authority and integrations." +readme = "README.md" +requires-python = ">=3.11" +license = "MIT OR Apache-2.0" +authors = [{ name = "Predicate Systems" }] +classifiers = [ + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.11", + "Typing :: Typed", +] + +[tool.setuptools] +packages = ["predicate_contracts"] + +[tool.setuptools.package-dir] +"predicate_contracts" = "." diff --git a/pyproject.toml b/pyproject.toml index a392cff..ff9876f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -62,3 +62,7 @@ exclude = [ [tool.bandit] exclude_dirs = ["tests", "venv", ".venv", "build", "dist"] skips = ["B101", "B601"] + +[tool.pytest.ini_options] +testpaths = ["tests"] +python_files = ["test_*.py"] diff --git a/scripts/verify_release_order.py b/scripts/verify_release_order.py new file mode 100644 index 0000000..eac08e4 --- /dev/null +++ b/scripts/verify_release_order.py @@ -0,0 +1,41 @@ +from __future__ import annotations + +from dataclasses import dataclass, field + + +@dataclass(frozen=True) +class PackageReleaseSpec: + name: str + depends_on: tuple[str, ...] = field(default_factory=tuple) + + +def verify_release_order( + release_order: tuple[str, ...], package_specs: tuple[PackageReleaseSpec, ...] +) -> None: + package_index = {name: index for index, name in enumerate(release_order)} + for spec in package_specs: + if spec.name not in package_index: + raise SystemExit(f"Package '{spec.name}' missing from release order.") + for dependency in spec.depends_on: + if dependency not in package_index: + raise SystemExit( + f"Dependency '{dependency}' for package '{spec.name}' missing from release order." + ) + if package_index[dependency] > package_index[spec.name]: + raise SystemExit( + f"Invalid order: dependency '{dependency}' must be released before '{spec.name}'." + ) + + +def main() -> None: + package_specs = ( + PackageReleaseSpec(name="predicate-contracts"), + PackageReleaseSpec(name="predicate-authority", depends_on=("predicate-contracts",)), + ) + release_order = ("predicate-contracts", "predicate-authority") + verify_release_order(release_order=release_order, package_specs=package_specs) + print("Release order validated: predicate-contracts -> predicate-authority") + + +if __name__ == "__main__": + main() diff --git a/tests/test_mandate_signer.py b/tests/test_mandate_signer.py new file mode 100644 index 0000000..4ec7a47 --- /dev/null +++ b/tests/test_mandate_signer.py @@ -0,0 +1,45 @@ +from __future__ import annotations + +from predicate_authority import LocalMandateSigner +from predicate_contracts import ( + ActionRequest, + ActionSpec, + PrincipalRef, + StateEvidence, + VerificationEvidence, +) + + +def test_mandate_signature_verifies() -> None: + signer = LocalMandateSigner(secret_key="test-key", ttl_seconds=60) + request = ActionRequest( + principal=PrincipalRef(principal_id="agent:writer"), + action_spec=ActionSpec( + action="mcp.execute", resource="mcp://tools/write_file", intent="write report" + ), + state_evidence=StateEvidence(source="non-web", state_hash="state-xyz"), + verification_evidence=VerificationEvidence(), + ) + + signed = signer.issue(request) + verified = signer.verify(signed.token) + + assert verified is not None + assert verified.claims.mandate_id == signed.claims.mandate_id + assert verified.claims.intent_hash == signed.claims.intent_hash + + +def test_mandate_tamper_is_rejected() -> None: + signer = LocalMandateSigner(secret_key="test-key", ttl_seconds=60) + request = ActionRequest( + principal=PrincipalRef(principal_id="agent:writer"), + action_spec=ActionSpec( + action="mcp.execute", resource="mcp://tools/write_file", intent="write report" + ), + state_evidence=StateEvidence(source="non-web", state_hash="state-xyz"), + verification_evidence=VerificationEvidence(), + ) + signed = signer.issue(request) + + tampered = signed.token[:-1] + ("A" if signed.token[-1] != "A" else "B") + assert signer.verify(tampered) is None diff --git a/tests/test_policy_and_guard.py b/tests/test_policy_and_guard.py new file mode 100644 index 0000000..4f61b1c --- /dev/null +++ b/tests/test_policy_and_guard.py @@ -0,0 +1,104 @@ +from __future__ import annotations + +import pytest + +from predicate_authority import ( + ActionGuard, + AuthorizationDeniedError, + InMemoryProofLedger, + LocalMandateSigner, + PolicyEngine, +) +from predicate_contracts import ( + ActionRequest, + ActionSpec, + AuthorizationReason, + PolicyEffect, + PolicyRule, + PrincipalRef, + StateEvidence, + VerificationEvidence, + VerificationSignal, + VerificationStatus, +) + + +def _build_request(with_verified_label: bool) -> ActionRequest: + signals = ( + VerificationSignal( + label="postcondition.url_contains:/checkout", + status=VerificationStatus.PASSED if with_verified_label else VerificationStatus.FAILED, + required=True, + ), + ) + return ActionRequest( + principal=PrincipalRef(principal_id="agent:checkout"), + action_spec=ActionSpec( + action="http.post", + resource="https://api.vendor.com/orders", + intent="submit order payload", + ), + state_evidence=StateEvidence(source="sdk-python", state_hash="abc123"), + verification_evidence=VerificationEvidence(signals=signals), + ) + + +def _build_guard() -> ActionGuard: + rules = ( + PolicyRule( + name="allow-checkout", + effect=PolicyEffect.ALLOW, + principals=("agent:*",), + actions=("http.*",), + resources=("https://api.vendor.com/*",), + required_labels=("postcondition.url_contains:/checkout",), + ), + ) + return ActionGuard( + policy_engine=PolicyEngine(rules=rules), + mandate_signer=LocalMandateSigner(secret_key="dev-secret", ttl_seconds=120), + proof_ledger=InMemoryProofLedger(), + ) + + +def test_authorize_allows_and_issues_signed_mandate() -> None: + guard = _build_guard() + request = _build_request(with_verified_label=True) + + decision = guard.authorize(request) + + assert decision.allowed is True + assert decision.reason == AuthorizationReason.ALLOWED + assert decision.mandate is not None + assert decision.mandate.claims.principal_id == "agent:checkout" + assert decision.mandate.claims.action == "http.post" + + +def test_authorize_denies_when_required_label_missing() -> None: + guard = _build_guard() + request = _build_request(with_verified_label=False) + + decision = guard.authorize(request) + + assert decision.allowed is False + assert decision.reason == AuthorizationReason.MISSING_REQUIRED_VERIFICATION + assert decision.missing_labels == ("postcondition.url_contains:/checkout",) + + +def test_enforce_executes_callable_when_allowed() -> None: + guard = _build_guard() + request = _build_request(with_verified_label=True) + + result = guard.enforce(lambda: "ok", request) + + assert result.value == "ok" + assert result.decision.allowed is True + assert result.mandate.claims.resource == "https://api.vendor.com/orders" + + +def test_enforce_raises_when_denied() -> None: + guard = _build_guard() + request = _build_request(with_verified_label=False) + + with pytest.raises(AuthorizationDeniedError): + guard.enforce(lambda: "should-not-run", request) diff --git a/tests/test_proof_ledger.py b/tests/test_proof_ledger.py new file mode 100644 index 0000000..e665013 --- /dev/null +++ b/tests/test_proof_ledger.py @@ -0,0 +1,46 @@ +from __future__ import annotations + +from dataclasses import dataclass, field + +from predicate_authority import InMemoryProofLedger +from predicate_contracts import ( + ActionRequest, + ActionSpec, + AuthorizationDecision, + AuthorizationReason, + PrincipalRef, + ProofEvent, + StateEvidence, + TraceEmitter, + VerificationEvidence, +) + + +@dataclass +class RecordingEmitter(TraceEmitter): + events: list[ProofEvent] = field(default_factory=list) + + def emit(self, event: ProofEvent) -> None: + self.events.append(event) + + +def test_proof_ledger_records_and_emits() -> None: + emitter = RecordingEmitter() + ledger = InMemoryProofLedger(trace_emitter=emitter) + + request = ActionRequest( + principal=PrincipalRef(principal_id="agent:ops"), + action_spec=ActionSpec( + action="infra.apply", resource="terraform://workspace/prod", intent="apply" + ), + state_evidence=StateEvidence(source="infra", state_hash="hash-infra-1"), + verification_evidence=VerificationEvidence(), + ) + decision = AuthorizationDecision(allowed=False, reason=AuthorizationReason.EXPLICIT_DENY) + + event = ledger.record(decision, request) + + assert event.allowed is False + assert event.reason == AuthorizationReason.EXPLICIT_DENY + assert len(ledger.events) == 1 + assert len(emitter.events) == 1 diff --git a/tests/test_sdk_python_integration.py b/tests/test_sdk_python_integration.py new file mode 100644 index 0000000..610e61a --- /dev/null +++ b/tests/test_sdk_python_integration.py @@ -0,0 +1,67 @@ +from __future__ import annotations + +from predicate_authority import ActionGuard, InMemoryProofLedger, LocalMandateSigner, PolicyEngine +from predicate_authority.integrations import ( + SdkAssertionRecord, + SdkStepEvidence, + authorize_sdk_step, + to_action_request, +) +from predicate_contracts import AuthorizationReason, PolicyEffect, PolicyRule + + +def _guard() -> ActionGuard: + return ActionGuard( + policy_engine=PolicyEngine( + rules=( + PolicyRule( + name="allow-browser-action-with-label", + effect=PolicyEffect.ALLOW, + principals=("agent:web",), + actions=("browser.click",), + resources=("https://example.com/*",), + required_labels=("postcondition.exists:#receipt",), + ), + ) + ), + mandate_signer=LocalMandateSigner(secret_key="test-secret"), + proof_ledger=InMemoryProofLedger(), + ) + + +def test_to_action_request_maps_assertions_to_verification_signals() -> None: + step = SdkStepEvidence( + principal_id="agent:web", + action="browser.click", + resource="https://example.com/checkout", + intent="click pay", + state_hash="hash-a", + assertions=( + SdkAssertionRecord(label="postcondition.exists:#receipt", passed=True), + SdkAssertionRecord(label="postcondition.url_contains:/receipt", passed=False), + ), + ) + request = to_action_request(step) + + assert request.principal.principal_id == "agent:web" + assert request.state_evidence.state_hash == "hash-a" + assert len(request.verification_evidence.signals) == 2 + assert request.verification_evidence.is_label_passed("postcondition.exists:#receipt") is True + + +def test_authorize_sdk_step_uses_guard_and_returns_decision() -> None: + guard = _guard() + step = SdkStepEvidence( + principal_id="agent:web", + action="browser.click", + resource="https://example.com/checkout", + intent="click pay", + state_hash="hash-a", + assertions=(SdkAssertionRecord(label="postcondition.exists:#receipt", passed=True),), + ) + + result = authorize_sdk_step(guard, step) + + assert result.decision.allowed is True + assert result.decision.reason == AuthorizationReason.ALLOWED + assert result.decision.mandate is not None diff --git a/tests/test_telemetry.py b/tests/test_telemetry.py new file mode 100644 index 0000000..4349179 --- /dev/null +++ b/tests/test_telemetry.py @@ -0,0 +1,62 @@ +from __future__ import annotations + +from contextlib import AbstractContextManager +from dataclasses import dataclass, field +from typing import Any + +from predicate_authority.telemetry import OpenTelemetryTraceEmitter +from predicate_contracts import AuthorizationReason, ProofEvent + + +@dataclass +class FakeSpan: + attributes: dict[str, Any] = field(default_factory=dict) + + def set_attribute(self, key: str, value: str | bool | int) -> None: + self.attributes[key] = value + + +@dataclass +class FakeSpanContextManager(AbstractContextManager[FakeSpan]): + span: FakeSpan + + def __enter__(self) -> FakeSpan: + return self.span + + def __exit__(self, exc_type: Any, exc: Any, tb: Any) -> None: + return None + + +@dataclass +class FakeTracer: + spans: list[FakeSpan] = field(default_factory=list) + + def start_as_current_span(self, name: str) -> FakeSpanContextManager: + span = FakeSpan() + span.set_attribute("span.name", name) + self.spans.append(span) + return FakeSpanContextManager(span=span) + + +def test_open_telemetry_emitter_sets_expected_attributes() -> None: + tracer = FakeTracer() + emitter = OpenTelemetryTraceEmitter(tracer=tracer) + event = ProofEvent( + event_type="authority.decision", + principal_id="agent:ops", + action="infra.apply", + resource="terraform://prod", + reason=AuthorizationReason.ALLOWED, + allowed=True, + mandate_id="mandate123", + emitted_at_epoch_s=1700000000, + ) + + emitter.emit(event) + + assert len(tracer.spans) == 1 + span = tracer.spans[0] + assert span.attributes["span.name"] == "predicate.authority.decision" + assert span.attributes["predicate.principal_id"] == "agent:ops" + assert span.attributes["predicate.allowed"] is True + assert span.attributes["predicate.reason"] == "allowed"