diff --git a/.gitignore b/.gitignore index 2c1dabb..3c87bbc 100644 --- a/.gitignore +++ b/.gitignore @@ -115,3 +115,6 @@ Thumbs.db # Claude Code .claude/ + +# Superpowers documentation (development artifacts) +docs/superpowers/ diff --git a/deploy/cloudrun/README.md b/deploy/cloudrun/README.md index 942acb1..0ec06cc 100644 --- a/deploy/cloudrun/README.md +++ b/deploy/cloudrun/README.md @@ -13,6 +13,7 @@ Deploy the Red Hat Lightspeed Agent for Google Cloud to Google Cloud Run for pro - [3. Set Up Cloud SQL Database](#3-set-up-cloud-sql-database) - [4. Redis Setup for Rate Limiting](#4-redis-setup-for-rate-limiting) - [5. Configure Secrets](#5-configure-secrets) + - [5a. Configure Secret Rotation Schedule](#5a-configure-secret-rotation-schedule) - [6. Copy MCP Image to GCR](#6-copy-mcp-image-to-gcr) - [7. Deploy](#7-deploy) - [Service Configuration](#service-configuration) @@ -439,6 +440,108 @@ echo -n "postgresql+asyncpg://sessions:$SESSION_DB_PASSWORD@/agent_sessions?host # The CA certificate is stored separately (see Redis Setup step 3). ``` +### 5a. Configure Secret Rotation Schedule + +After secrets are populated, bootstrap the rotation schedule metadata and trigger plumbing: + +```bash +./deploy/cloudrun/setup-secret-rotation.sh +``` + +This script configures: + +- **Secret Manager rotation metadata** (`next_rotation_time` + `rotation_period`) for: + - `redhat-sso-client-secret` + - `gma-client-secret` +- **Secret Manager event notifications** by attaching a Pub/Sub topic to those secrets +- **Pub/Sub subscription** to receive `SECRET_ROTATE` events + +> Note: this does not rotate secret values by itself. It configures schedule + `SECRET_ROTATE` notifications so a rotator worker can handle updates. + +**Configure Pub/Sub Push Endpoint (after deployment):** + +After deploying the marketplace handler to Cloud Run, configure the subscription to push events to the `/rotation` endpoint: + +```bash +# Get the marketplace handler URL +MARKETPLACE_HANDLER_URL=$(gcloud run services describe marketplace-handler \ + --region=us-central1 \ + --format='value(status.url)') + +# Get the Cloud Run service account (used for OIDC authentication) +SERVICE_ACCOUNT=$(gcloud run services describe marketplace-handler \ + --region=us-central1 \ + --format='value(spec.template.spec.serviceAccountName)') + +# Configure push endpoint with OIDC authentication +gcloud pubsub subscriptions modify secret-rotation-trigger-sub \ + --push-endpoint="${MARKETPLACE_HANDLER_URL}/rotation" \ + --push-auth-service-account="${SERVICE_ACCOUNT}" \ + --project="${GOOGLE_CLOUD_PROJECT}" +``` + +This configures: +- **Push endpoint**: `https:///rotation` +- **OIDC authentication**: Pub/Sub signs requests with service account JWT +- **Audience**: Marketplace handler validates JWT audience matches service URL + +**Testing the rotation workflow:** + +⚠️ **Prerequisites:** The rotation endpoint is deployed and functional, but secret retrieval requires implementing API-based providers in `src/lightspeed_agent/rotation/providers.py`: + +1. **Implement `RedHatSSOSecretProvider._fetch_secret_from_api()`** + - Authenticate with Red Hat Identity Management API + - Request new OAuth client secret for SSO + - Return the generated secret as a string + +2. **Implement `GMASecretProvider._fetch_secret_from_api()`** + - Authenticate with Google Marketplace Admin API + - Request client secret regeneration + - Return the generated secret as a string + +3. **Register providers in `create_default_registry()`** + ```python + # Uncomment these lines in providers.py: + registry.register("redhat-sso-client-secret", RedHatSSOSecretProvider()) + registry.register("gma-client-secret", GMASecretProvider()) + ``` + +**Current state:** Rotation endpoint will return 500 error with "No provider registered for secret" until providers are implemented. + +Once providers are implemented and registered: + +```bash +# Trigger immediate rotation for testing +gcloud secrets update redhat-sso-client-secret \ + --next-rotation-time="$(date -u +%Y-%m-%dT%H:%M:%SZ)" \ + --project="${GOOGLE_CLOUD_PROJECT}" + +# Monitor rotation logs +gcloud logging read "resource.type=cloud_run_revision \ + AND resource.labels.service_name=marketplace-handler \ + AND jsonPayload.message=~'event_type=secret_rotation_completed'" \ + --limit=10 \ + --format=json \ + --project="${GOOGLE_CLOUD_PROJECT}" +``` + +**What happens during rotation:** + +1. **Secret Manager triggers event** when `next_rotation_time` is reached +2. **Pub/Sub pushes notification** to `/rotation` endpoint with OIDC JWT token +3. **Rotation endpoint validates** the Pub/Sub OIDC token (transport security) +4. **Registry routes to provider** based on secret name in the event +5. **Provider fetches new secret** from upstream API (Red Hat Identity or GMA) +6. **Provider validates secret** (32+ bytes, 10+ unique characters) +7. **Secret Manager stores new version** via `add_secret_version` API +8. **Endpoint logs completion** with `event_type=secret_rotation_completed` + +**Error handling:** +- Invalid OIDC token → 401 response (not retried) +- Missing provider → 500 response (Pub/Sub retries) +- API failure → 500 response (Pub/Sub retries) +- Secret Manager failure → 500 response (Pub/Sub retries) + ### 6. Copy MCP Image to GCR Cloud Run doesn't support Quay.io directly. Copy the MCP server image to GCR. diff --git a/deploy/cloudrun/cleanup.sh b/deploy/cloudrun/cleanup.sh index 58e0862..250e0d9 100755 --- a/deploy/cloudrun/cleanup.sh +++ b/deploy/cloudrun/cleanup.sh @@ -52,6 +52,8 @@ PUBSUB_INVOKER_SA="${PUBSUB_INVOKER_NAME}@${PROJECT_ID}.iam.gserviceaccount.com" # Pub/Sub configuration PUBSUB_TOPIC="${PUBSUB_TOPIC:-marketplace-entitlements}" PUBSUB_SUBSCRIPTION="${PUBSUB_SUBSCRIPTION:-${PUBSUB_TOPIC}-sub}" +ROTATION_TOPIC="${ROTATION_TOPIC:-secret-rotation-trigger}" +ROTATION_SUBSCRIPTION="${ROTATION_SUBSCRIPTION:-secret-rotation-trigger-sub}" # Parse arguments FORCE=false @@ -82,9 +84,11 @@ echo "" echo " - Cloud Run services: $SERVICE_NAME, $HANDLER_SERVICE_NAME" echo " - Pub/Sub topic: $PUBSUB_TOPIC" echo " - Pub/Sub subscription: $PUBSUB_SUBSCRIPTION" +echo " - Rotation Pub/Sub topic: $ROTATION_TOPIC" +echo " - Rotation Pub/Sub subscription: $ROTATION_SUBSCRIPTION" echo " - Secrets: redhat-sso-client-id, redhat-sso-client-secret, database-url," echo " session-database-url, gma-client-id, gma-client-secret, dcr-encryption-key," -echo " rate-limit-redis-url" +echo " rate-limit-redis-url, redis-ca-cert" echo " - Service accounts: $SERVICE_ACCOUNT" echo " $PUBSUB_INVOKER_SA" echo "" @@ -170,6 +174,7 @@ secrets=( "gma-client-secret" "dcr-encryption-key" "rate-limit-redis-url" + "redis-ca-cert" ) for secret in "${secrets[@]}"; do @@ -183,6 +188,29 @@ for secret in "${secrets[@]}"; do fi done +# ============================================================================= +# Step 3b: Delete Secret Rotation Topic and Subscription +# ============================================================================= +log_info "Deleting secret rotation Pub/Sub resources..." + +if gcloud pubsub subscriptions describe "$ROTATION_SUBSCRIPTION" --project="$PROJECT_ID" &>/dev/null; then + gcloud pubsub subscriptions delete "$ROTATION_SUBSCRIPTION" \ + --project="$PROJECT_ID" \ + --quiet + log_info "Rotation Pub/Sub subscription '$ROTATION_SUBSCRIPTION' deleted" +else + log_info "Rotation Pub/Sub subscription '$ROTATION_SUBSCRIPTION' does not exist, skipping" +fi + +if gcloud pubsub topics describe "$ROTATION_TOPIC" --project="$PROJECT_ID" &>/dev/null; then + gcloud pubsub topics delete "$ROTATION_TOPIC" \ + --project="$PROJECT_ID" \ + --quiet + log_info "Rotation Pub/Sub topic '$ROTATION_TOPIC' deleted" +else + log_info "Rotation Pub/Sub topic '$ROTATION_TOPIC' does not exist, skipping" +fi + # ============================================================================= # Step 4: Remove IAM Bindings and Delete Service Account # ============================================================================= @@ -266,6 +294,7 @@ echo "" echo "The following resources have been removed:" echo " - Cloud Run services ($SERVICE_NAME, $HANDLER_SERVICE_NAME)" echo " - Pub/Sub topic and subscription" +echo " - Secret rotation Pub/Sub subscription and trigger topic" echo " - Secret Manager secrets" echo " - Service accounts (runtime + Pub/Sub invoker) and IAM bindings" echo "" diff --git a/deploy/cloudrun/setup-secret-rotation.sh b/deploy/cloudrun/setup-secret-rotation.sh new file mode 100755 index 0000000..98b19ad --- /dev/null +++ b/deploy/cloudrun/setup-secret-rotation.sh @@ -0,0 +1,126 @@ +#!/bin/bash +# ============================================================================= +# Secret Rotation Bootstrap for Cloud Run Deployment +# ============================================================================= +# +# Configures Secret Manager-native rotation plumbing for production: +# 1) Secret Manager rotation metadata (rotation period + next rotation time) +# 2) Secret Manager event notifications to Pub/Sub topics on each secret +# 3) Pub/Sub subscription to receive SECRET_ROTATE events for future rotator workflows +# +# This script does NOT rotate secret values by itself. +# It only creates schedules and notification plumbing. +# +# Usage: +# ./deploy/cloudrun/setup-secret-rotation.sh +# +# Prerequisite: +# Run ./deploy/cloudrun/setup.sh first. +# +# Optional environment variables: +# GOOGLE_CLOUD_PROJECT Required. GCP project id. +# GOOGLE_CLOUD_LOCATION Optional. Scheduler region (default: us-central1). +# ROTATION_TOPIC Optional. Pub/Sub topic (default: secret-rotation-trigger). +# ROTATION_SUBSCRIPTION Optional. Pub/Sub subscription name +# (default: secret-rotation-trigger-sub). +# ROTATION_NEXT_TIME Optional. RFC3339 UTC timestamp for next rotation metadata. +# +# ============================================================================= + +set -euo pipefail + +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +RED='\033[0;31m' +NC='\033[0m' + +log_info() { echo -e "${GREEN}[INFO]${NC} $1"; } +log_warn() { echo -e "${YELLOW}[WARN]${NC} $1"; } +log_error() { echo -e "${RED}[ERROR]${NC} $1"; } + +PROJECT_ID="${GOOGLE_CLOUD_PROJECT:-}" +REGION="${GOOGLE_CLOUD_LOCATION:-us-central1}" +ROTATION_TOPIC="${ROTATION_TOPIC:-secret-rotation-trigger}" +ROTATION_SUBSCRIPTION="${ROTATION_SUBSCRIPTION:-secret-rotation-trigger-sub}" +# Use first day of next month at midnight UTC by default. +ROTATION_NEXT_TIME="${ROTATION_NEXT_TIME:-$(date -u -d "$(date -u +%Y-%m-01) +1 month" +%Y-%m-01T00:00:00Z)}" +FULL_TOPIC_NAME="projects/${PROJECT_ID}/topics/${ROTATION_TOPIC}" + +if [[ -z "$PROJECT_ID" ]]; then + log_error "GOOGLE_CLOUD_PROJECT environment variable is required" + echo " export GOOGLE_CLOUD_PROJECT=your-project-id" + exit 1 +fi + +# Secret rotation definitions: +# secret_name|rotation_period_seconds +# NOTE: currently set to 3600s (1 hour) for testing. +ROTATION_DEFINITIONS=( + "redhat-sso-client-secret|3600" + "gma-client-secret|3600" +) + +log_info "Configuring secret rotation bootstrap for project: $PROJECT_ID" +log_info "Pub/Sub region (for subscription): $REGION" +log_info "Rotation event topic: $FULL_TOPIC_NAME" +log_info "Rotation event subscription: $ROTATION_SUBSCRIPTION" +log_info "Initial next rotation timestamp: $ROTATION_NEXT_TIME" + +if ! gcloud pubsub topics describe "$ROTATION_TOPIC" --project="$PROJECT_ID" &>/dev/null; then + log_info "Creating Pub/Sub topic: $ROTATION_TOPIC" + gcloud pubsub topics create "$ROTATION_TOPIC" --project="$PROJECT_ID" +else + log_info "Pub/Sub topic already exists: $ROTATION_TOPIC" +fi + +# Create Secret Manager service identity (publisher principal for notifications). +log_info "Ensuring Secret Manager service identity exists..." +gcloud beta services identity create \ + --service="secretmanager.googleapis.com" \ + --project="$PROJECT_ID" --quiet >/dev/null 2>&1 || true + +project_number=$(gcloud projects describe "$PROJECT_ID" --format='value(projectNumber)') +sm_service_account="service-${project_number}@gcp-sa-secretmanager.iam.gserviceaccount.com" + +log_info "Granting Pub/Sub publisher role to Secret Manager service account..." +gcloud pubsub topics add-iam-policy-binding "$ROTATION_TOPIC" \ + --member="serviceAccount:${sm_service_account}" \ + --role="roles/pubsub.publisher" \ + --project="$PROJECT_ID" --quiet >/dev/null + +if ! gcloud pubsub subscriptions describe "$ROTATION_SUBSCRIPTION" --project="$PROJECT_ID" &>/dev/null; then + log_info "Creating Pub/Sub subscription: $ROTATION_SUBSCRIPTION" + gcloud pubsub subscriptions create "$ROTATION_SUBSCRIPTION" \ + --topic="$ROTATION_TOPIC" \ + --project="$PROJECT_ID" --quiet +else + log_info "Pub/Sub subscription already exists: $ROTATION_SUBSCRIPTION" +fi + +for definition in "${ROTATION_DEFINITIONS[@]}"; do + IFS='|' read -r secret_name rotation_period <<< "$definition" + + if ! gcloud secrets describe "$secret_name" --project="$PROJECT_ID" &>/dev/null; then + log_warn "Secret not found, skipping: $secret_name" + continue + fi + + log_info "Configuring rotation + topic notification for $secret_name" + gcloud secrets update "$secret_name" \ + --project="$PROJECT_ID" \ + --add-topics="$FULL_TOPIC_NAME" \ + --next-rotation-time="$ROTATION_NEXT_TIME" \ + --rotation-period="${rotation_period}s" \ + --quiet +done + +echo "" +log_info "Secret rotation bootstrap complete." +echo "" +echo "What is now configured:" +echo " - Secret Manager rotation metadata on 2 secrets" +echo " - Secret event notifications (including SECRET_ROTATE) to topic: $FULL_TOPIC_NAME" +echo " - Pub/Sub subscription to receive rotation events: $ROTATION_SUBSCRIPTION" +echo "" +echo "Next step:" +echo " - Deploy a rotator worker subscribed to '$ROTATION_SUBSCRIPTION' and handle eventType=SECRET_ROTATE." diff --git a/deploy/cloudrun/setup.sh b/deploy/cloudrun/setup.sh index 9e013be..eeb9e7d 100755 --- a/deploy/cloudrun/setup.sh +++ b/deploy/cloudrun/setup.sh @@ -358,14 +358,17 @@ echo " echo -n 'rediss://REDIS_IP:6378/0' | gcloud secrets versions add rate-l echo " # Download and store the Redis server CA certificate for TLS verification:" echo " gcloud redis instances describe lightspeed-redis --region=\$REGION --project=$PROJECT_ID --format='value(serverCaCerts[0].cert)' | gcloud secrets versions add redis-ca-cert --data-file=- --project=$PROJECT_ID" echo "" -echo "3. Copy the MCP server image to GCR (Cloud Run doesn't support Quay.io):" +echo "3. Configure secret rotation schedules and Secret Manager Pub/Sub notifications:" +echo " ./deploy/cloudrun/setup-secret-rotation.sh" +echo "" +echo "4. Copy the MCP server image to GCR (Cloud Run doesn't support Quay.io):" echo " docker pull quay.io/redhat-services-prod/insights-management-tenant/insights-mcp/red-hat-lightspeed-mcp:latest" echo " docker tag quay.io/redhat-services-prod/insights-management-tenant/insights-mcp/red-hat-lightspeed-mcp:latest gcr.io/$PROJECT_ID/red-hat-lightspeed-mcp:latest" echo " docker push gcr.io/$PROJECT_ID/red-hat-lightspeed-mcp:latest" echo "" -echo "4. Build and deploy the agent (includes MCP sidecar):" +echo "5. Build and deploy the agent (includes MCP sidecar):" echo " ./deploy/cloudrun/deploy.sh --build --service all --allow-unauthenticated" echo "" -echo "5. Get the service URL:" +echo "6. Get the service URL:" echo " gcloud run services describe $SERVICE_NAME --region=$REGION --project=$PROJECT_ID --format='value(status.url)'" echo "" diff --git a/src/lightspeed_agent/marketplace/app.py b/src/lightspeed_agent/marketplace/app.py index 7582f27..47b8b48 100644 --- a/src/lightspeed_agent/marketplace/app.py +++ b/src/lightspeed_agent/marketplace/app.py @@ -18,6 +18,7 @@ from lightspeed_agent.config import get_settings from lightspeed_agent.marketplace.router import router as handler_router +from lightspeed_agent.rotation import router as rotation_router from lightspeed_agent.probes import start_probe_server, stop_probe_server from lightspeed_agent.ratelimit import RateLimitMiddleware, get_redis_rate_limiter from lightspeed_agent.security import RequestBodyLimitMiddleware, SecurityHeadersMiddleware @@ -123,6 +124,10 @@ def create_app() -> FastAPI: # This provides the /dcr endpoint that handles both Pub/Sub and DCR app.include_router(handler_router) + # Include the rotation router + # This provides the /rotation endpoint for Secret Manager Pub/Sub events + app.include_router(rotation_router) + # Add rate limiting middleware for /dcr endpoint (IP-based, no auth on this service) app.add_middleware(RateLimitMiddleware, rate_limited_paths={"/dcr"}) diff --git a/src/lightspeed_agent/rotation/__init__.py b/src/lightspeed_agent/rotation/__init__.py new file mode 100644 index 0000000..a759df7 --- /dev/null +++ b/src/lightspeed_agent/rotation/__init__.py @@ -0,0 +1,49 @@ +"""Secret rotation workflow package.""" + +from lightspeed_agent.rotation.gcp import GoogleSecretManagerVersionWriter +from lightspeed_agent.rotation.providers import ( + GMASecretProvider, + MissingSecretValueError, + RedHatSSOSecretProvider, + SecretProviderRegistry, + SecretValueProvider, + create_default_registry, +) +from lightspeed_agent.rotation.pubsub_jwt import ( + PubSubCertificateCache, + PubSubJWTValidator, + get_pubsub_jwt_validator, +) +from lightspeed_agent.rotation.router import router +from lightspeed_agent.rotation.workflow import ( + SUPPORTED_SECRET_NAMES, + RotationEvent, + RotationResult, + RotationWorkflow, + parse_rotation_event, +) + +__all__ = [ + # Providers + "SecretValueProvider", + "RedHatSSOSecretProvider", + "GMASecretProvider", + "SecretProviderRegistry", + "create_default_registry", + "MissingSecretValueError", + # Google Cloud + "GoogleSecretManagerVersionWriter", + # Pub/Sub JWT + "PubSubCertificateCache", + "PubSubJWTValidator", + "get_pubsub_jwt_validator", + # Router + "router", + # Workflow + "RotationEvent", + "RotationResult", + "RotationWorkflow", + "SUPPORTED_SECRET_NAMES", + "parse_rotation_event", +] + diff --git a/src/lightspeed_agent/rotation/gcp.py b/src/lightspeed_agent/rotation/gcp.py new file mode 100644 index 0000000..09664de --- /dev/null +++ b/src/lightspeed_agent/rotation/gcp.py @@ -0,0 +1,62 @@ +"""Google Cloud adapters for secret rotation workflows.""" + +from __future__ import annotations + +import logging + +logger = logging.getLogger(__name__) + + +class GoogleSecretManagerVersionWriter: + """Write secret versions using google-cloud-secret-manager.""" + + def __init__(self) -> None: + # Imported lazily so this module can be imported in environments where + # google-cloud-secret-manager is not installed. + from google.cloud import secretmanager # type: ignore[import-not-found] + + self._client = secretmanager.SecretManagerServiceClient() + + def add_secret_version(self, project_id: str, secret_name: str, secret_value: bytearray) -> str: + """Add a new secret version to Google Secret Manager. + + Args: + project_id: GCP project ID + secret_name: Name of the secret + secret_value: The secret payload as a bytearray + + Returns: + Full resource name of the created version + (e.g., "projects/my-project/secrets/my-secret/versions/123") + + Raises: + google.api_core.exceptions.GoogleAPIError: For all GCP API errors. + Exceptions are logged and re-raised to trigger Pub/Sub retry. + """ + parent = f"projects/{project_id}/secrets/{secret_name}" + + try: + response = self._client.add_secret_version( + request={"parent": parent, "payload": {"data": bytes(secret_value)}} + ) + version_name = str(response.name) + + logger.info( + "Successfully added secret version (project_id=%s, secret_name=%s, version=%s)", + project_id, + secret_name, + version_name, + ) + return version_name + + except Exception as e: + # Log the error with context before re-raising + # This helps diagnose issues while still allowing Pub/Sub to retry + logger.exception( + "Failed to add secret version to Secret Manager " + "(project_id=%s, secret_name=%s, error_type=%s)", + project_id, + secret_name, + type(e).__name__, + ) + raise diff --git a/src/lightspeed_agent/rotation/providers.py b/src/lightspeed_agent/rotation/providers.py new file mode 100644 index 0000000..d6ec9b7 --- /dev/null +++ b/src/lightspeed_agent/rotation/providers.py @@ -0,0 +1,290 @@ +"""Secret value provider implementations for rotation workflows. + +This module supports pluggable providers for different secret sources. +Each secret can be retrieved from its own API/service: +- Red Hat SSO secrets → Red Hat Identity API +- GMA secrets → Google Marketplace API +- etc. +""" + +from __future__ import annotations + +import logging +from abc import ABC, abstractmethod + +from lightspeed_agent.rotation.workflow import RotationEvent + +logger = logging.getLogger(__name__) + + +class MissingSecretValueError(RuntimeError): + """Raised when no value is available for a rotation event.""" + + +class SecretValueProvider(ABC): + """Base class for secret value providers. + + Each provider retrieves secret values from a specific source + (API, service, environment, etc.) and returns them as mutable + bytearrays for secure memory handling. + + Subclasses must implement: + - _fetch_secret_from_api(): API-specific secret retrieval logic + + Subclasses inherit: + - validate_secret_value(): Shared validation logic + - get_next_secret_value(): Template method that orchestrates fetch + validate + convert + """ + + @abstractmethod + def _fetch_secret_from_api(self, event: RotationEvent) -> str: + """Fetch secret value from the provider's API. + + This is the only method subclasses need to implement. + Each provider calls its specific API here. + + Args: + event: The rotation event containing secret metadata + + Returns: + The new secret value as a string + + Raises: + MissingSecretValueError: If the secret cannot be retrieved from the API + """ + + def validate_secret_value(self, value: str, secret_name: str) -> None: + """Validate that a secret meets minimum security requirements. + + This method is inherited by all provider subclasses. + + Args: + value: The secret value to validate + secret_name: Name of the secret (for error messages) + + Raises: + ValueError: If the secret doesn't meet minimum requirements + """ + # OAuth 2.0 client secrets should be at least 32 bytes + byte_len = len(value.encode("utf-8")) + if byte_len < 32: + raise ValueError( + f"Secret value for '{secret_name}' is too short " + f"({byte_len} bytes). Minimum: 32 bytes for OAuth client secrets." + ) + + # Basic entropy check: require at least 10 unique characters + unique_chars = len(set(value)) + if unique_chars < 10: + raise ValueError( + f"Secret value for '{secret_name}' has insufficient entropy " + f"(only {unique_chars} unique characters)." + ) + + def get_next_secret_value(self, event: RotationEvent) -> bytearray: + """Get the next secret value for rotation (template method). + + This method orchestrates the rotation workflow: + 1. Fetch secret from API (calls abstract method) + 2. Validate secret quality (calls inherited method) + 3. Convert to mutable bytearray + + Subclasses should NOT override this - override _fetch_secret_from_api() instead. + + Args: + event: The rotation event containing secret metadata + + Returns: + The new secret value as a mutable bytearray + + Raises: + MissingSecretValueError: If the secret cannot be retrieved + ValueError: If the secret doesn't meet validation requirements + """ + # 1. Fetch from API (abstract - each provider implements differently) + secret_value = self._fetch_secret_from_api(event) + + # 2. Validate (concrete - same for all providers) + self.validate_secret_value(secret_value, event.secret_name) + + # 3. Convert to bytearray (concrete - same for all providers) + # bytearray(str, encoding) encodes directly into a mutable buffer, + # avoiding an explicit intermediate bytes object at the Python level. + return bytearray(secret_value, "utf-8") + + +class RedHatSSOSecretProvider(SecretValueProvider): + """Provider for Red Hat SSO client secrets. + + Retrieves new OAuth client secrets from Red Hat Identity API. + + TODO: Implement integration with Red Hat Identity Management API + to generate/rotate OAuth client secrets for SSO. + """ + + def _fetch_secret_from_api(self, event: RotationEvent) -> str: + """Fetch next Red Hat SSO client secret from Red Hat Identity API. + + TODO: Replace this placeholder with actual Red Hat Identity API call: + 1. Authenticate with Red Hat Identity API + 2. Request new client secret for the SSO client + 3. Return the generated secret as a string + + Args: + event: Rotation event with project_id and secret_name + + Returns: + New client secret as string + + Raises: + MissingSecretValueError: If API call fails + """ + raise NotImplementedError( + "Red Hat SSO secret provider not yet implemented. " + "Implement _fetch_secret_from_api() with Red Hat Identity API integration." + ) + + +class GMASecretProvider(SecretValueProvider): + """Provider for GMA (Google Marketplace API) client secrets. + + Retrieves new OAuth client secrets from Google Marketplace Admin API. + + TODO: Implement integration with GMA API to generate/rotate + OAuth client secrets for marketplace integration. + """ + + def _fetch_secret_from_api(self, event: RotationEvent) -> str: + """Fetch next GMA client secret from Google Marketplace API. + + TODO: Replace this placeholder with actual GMA API call: + 1. Authenticate with GMA API (using GMA service credentials) + 2. Request new client secret regeneration + 3. Return the generated secret as a string + + Args: + event: Rotation event with project_id and secret_name + + Returns: + New client secret as string + + Raises: + MissingSecretValueError: If API call fails + """ + raise NotImplementedError( + "GMA secret provider not yet implemented. " + "Implement _fetch_secret_from_api() with GMA API integration." + ) + + +class SecretProviderRegistry: + """Registry that routes secrets to their appropriate providers. + + Maps secret names to provider instances. This allows each secret + to be retrieved from a different source (API, service, etc.). + + Implements the SecretValueProvider protocol from workflow.py (duck typing), + but does NOT inherit from the ABC since it doesn't fetch from an API itself - + it delegates to registered providers. + + Usage: + registry = SecretProviderRegistry() + registry.register("redhat-sso-client-secret", RedHatSSOSecretProvider()) + registry.register("gma-client-secret", GMASecretProvider()) + + # Use directly in RotationWorkflow: + workflow = RotationWorkflow(value_provider=registry, ...) + result = workflow.handle_event(...) + """ + + def __init__(self) -> None: + self._providers: dict[str, SecretValueProvider] = {} + self._default_provider: SecretValueProvider | None = None + + def register(self, secret_name: str, provider: SecretValueProvider) -> None: + """Register a provider for a specific secret. + + Args: + secret_name: The secret name (e.g., "redhat-sso-client-secret") + provider: The provider instance to use for this secret + """ + self._providers[secret_name] = provider + logger.info("Registered provider %s for secret '%s'", type(provider).__name__, secret_name) + + def set_default_provider(self, provider: SecretValueProvider) -> None: + """Set the default provider for secrets without specific providers. + + Args: + provider: The provider to use as fallback + """ + self._default_provider = provider + logger.info("Set default provider to %s", type(provider).__name__) + + def get_provider(self, secret_name: str) -> SecretValueProvider: + """Get the provider for a specific secret. + + Args: + secret_name: The secret name to look up + + Returns: + The provider instance for this secret + + Raises: + MissingSecretValueError: If no provider is registered for this secret + """ + provider = self._providers.get(secret_name) + if provider is not None: + return provider + + if self._default_provider is not None: + logger.info( + "Using default provider %s for secret '%s'", + type(self._default_provider).__name__, + secret_name, + ) + return self._default_provider + + raise MissingSecretValueError( + f"No provider registered for secret '{secret_name}' and no default provider set." + ) + + def get_next_secret_value(self, event: RotationEvent) -> bytearray: + """Get the next secret value by delegating to the appropriate provider. + + Looks up the provider for the secret name in the event and delegates + the actual retrieval to that provider. + + Args: + event: The rotation event containing secret metadata + + Returns: + The new secret value as a mutable bytearray + + Raises: + MissingSecretValueError: If no provider is registered or retrieval fails + ValueError: If the secret doesn't meet validation requirements + """ + provider = self.get_provider(event.secret_name) + return provider.get_next_secret_value(event) + + +def create_default_registry() -> SecretProviderRegistry: + """Create the default provider registry. + + Register API-based providers for each secret type. + + TODO: Implement the provider classes: + - RedHatSSOSecretProvider: Integrate with Red Hat Identity API + - GMASecretProvider: Integrate with Google Marketplace API + + Returns: + Configured SecretProviderRegistry instance + """ + registry = SecretProviderRegistry() + + # TODO: Uncomment when provider implementations are ready + # registry.register("redhat-sso-client-secret", RedHatSSOSecretProvider()) + # registry.register("gma-client-secret", GMASecretProvider()) + + logger.info("Created provider registry (providers not yet registered)") + return registry diff --git a/src/lightspeed_agent/rotation/pubsub_jwt.py b/src/lightspeed_agent/rotation/pubsub_jwt.py new file mode 100644 index 0000000..1d324d3 --- /dev/null +++ b/src/lightspeed_agent/rotation/pubsub_jwt.py @@ -0,0 +1,242 @@ +"""Pub/Sub JWT validator for rotation endpoint verification. + +This module validates OIDC tokens from Google Cloud Pub/Sub push subscriptions. +Unlike DCR's GoogleJWTValidator (which validates Google Cloud Marketplace +software_statement JWTs), this validates standard Google OAuth tokens used +to authenticate Pub/Sub push requests. + +Security: Rotation events have no application-level auth (no software_statement). +Transport security via Pub/Sub OIDC tokens is the only security layer. +""" + +import asyncio +import logging +import time +from typing import Any + +import httpx +import jwt +from cryptography import x509 +from cryptography.hazmat.primitives import serialization +from jwt.exceptions import DecodeError, ExpiredSignatureError, InvalidTokenError + +from lightspeed_agent.config import get_settings + +logger = logging.getLogger(__name__) + +# Google's OAuth certificate endpoint (v1 returns X.509 PEM format) +GOOGLE_OAUTH_CERTS_URL = "https://www.googleapis.com/oauth2/v1/certs" + +# Expected issuer for Pub/Sub OIDC tokens +GOOGLE_OAUTH_ISSUER = "https://accounts.google.com" + + +class PubSubCertificateCache: + """Cache for Google's X.509 certificates used to sign Pub/Sub OIDC tokens.""" + + def __init__(self, cert_url: str = GOOGLE_OAUTH_CERTS_URL, cache_ttl: int = 3600): + """Initialize the certificate cache. + + Args: + cert_url: URL to fetch Google's X.509 certificates. + cache_ttl: Cache time-to-live in seconds (default: 1 hour). + """ + self._cert_url = cert_url + self._cache_ttl = cache_ttl + self._certificates: dict[str, Any] = {} + self._last_fetch: float = 0 + self._lock = asyncio.Lock() + + async def get_public_key(self, kid: str) -> Any | None: + """Get the public key for a given key ID. + + Args: + kid: Key ID from JWT header. + + Returns: + Public key or None if not found. + """ + await self._ensure_fresh() + return self._certificates.get(kid) + + async def _ensure_fresh(self) -> None: + """Ensure the cache is fresh, fetching new certificates if needed.""" + current_time = time.monotonic() + if current_time - self._last_fetch < self._cache_ttl and self._certificates: + return + + async with self._lock: + # Double-check after acquiring lock + if current_time - self._last_fetch < self._cache_ttl and self._certificates: + return + + await self._fetch_certificates() + + async def _fetch_certificates(self) -> None: + """Fetch certificates from Google's endpoint.""" + try: + async with httpx.AsyncClient() as client: + response = await client.get(self._cert_url, timeout=10.0) + response.raise_for_status() + certs_data = response.json() + + self._certificates = {} + for kid, cert_pem in certs_data.items(): + try: + # Parse X.509 certificate and extract public key + cert = x509.load_pem_x509_certificate(cert_pem.encode()) + public_key = cert.public_key() + # Convert to PEM format for jose library + pem = public_key.public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ) + self._certificates[kid] = pem.decode() + except Exception as e: + logger.warning("Failed to parse certificate for kid %s: %s", kid, e) + + self._last_fetch = time.monotonic() + logger.info("Fetched %d certificates from Google OAuth", len(self._certificates)) + + except httpx.HTTPError as e: + logger.error("Failed to fetch Google OAuth certificates: %s", e) + if not self._certificates: + raise RuntimeError(f"Failed to fetch certificates: {e}") from e + + async def force_refresh(self) -> None: + """Force a refresh of the certificate cache.""" + self._last_fetch = 0 + await self._ensure_fresh() + + +class PubSubJWTValidator: + """Validator for Google Pub/Sub OIDC push tokens.""" + + def __init__(self, expected_audience: str | None = None): + """Initialize the validator. + + Args: + expected_audience: Expected audience (agent provider URL). + Uses settings.agent_provider_url if not provided. + """ + self._settings = get_settings() + self._expected_audience = expected_audience or self._settings.agent_provider_url + self._cert_cache = PubSubCertificateCache() + + async def _decode_without_verification(self, token: str) -> dict[str, Any] | None: + """Decode a token JWT without signature or issuer verification. + + Used in development mode (SKIP_JWT_VALIDATION=true) to allow testing + with any OIDC token, not just Google's OAuth tokens. + + Returns: + Claims dict on success, None on decode error. + """ + logger.warning("Skipping Pub/Sub JWT signature/issuer validation - development mode") + try: + claims = jwt.decode( + token, + options={ + "verify_signature": False, + "verify_exp": False, + "verify_aud": False, + }, + algorithms=["RS256"], + ) + except DecodeError as e: + logger.warning("Failed to decode JWT (dev mode): %s", e) + return None + + logger.info( + "Dev mode: accepted Pub/Sub push token (iss: %s, aud: %s)", + claims.get("iss"), + claims.get("aud"), + ) + return claims + + async def validate_push_token(self, token: str) -> dict[str, Any] | None: + """Validate a Pub/Sub push token JWT from Google. + + Args: + token: The JWT string to validate. + + Returns: + Claims dict on success, None on failure (logs warnings). + """ + if self._settings.skip_jwt_validation: + return await self._decode_without_verification(token) + + try: + # Decode header to get key ID + unverified_header = jwt.get_unverified_header(token) + except DecodeError as e: + logger.warning("Failed to decode JWT header: %s", e) + return None + + kid = unverified_header.get("kid") + if not kid: + logger.warning("JWT header missing 'kid' claim") + return None + + # Verify algorithm is RS256 + alg = unverified_header.get("alg") + if alg != "RS256": + logger.warning("Unsupported algorithm: %s. Expected RS256", alg) + return None + + # Get the signing key from Google's certificates + public_key = await self._cert_cache.get_public_key(kid) + if not public_key: + # Key not found, try refreshing the cache + await self._cert_cache.force_refresh() + public_key = await self._cert_cache.get_public_key(kid) + + if not public_key: + logger.warning("Key with ID '%s' not found in Google OAuth certificates", kid) + return None + + # Validate and decode the JWT + try: + claims = jwt.decode( + token, + public_key, + algorithms=["RS256"], + audience=self._expected_audience, + issuer=GOOGLE_OAUTH_ISSUER, + options={ + "verify_aud": True, + "verify_exp": True, + "verify_iat": True, + "verify_iss": True, + "require": ["iss", "iat", "exp", "aud", "sub"], + }, + ) + except ExpiredSignatureError: + logger.warning("Pub/Sub push token has expired") + return None + except InvalidTokenError as e: + logger.warning("JWT validation failed: %s", e) + return None + + logger.info( + "Validated Pub/Sub push token (sub: %s, email: %s)", + claims.get("sub"), + claims.get("email"), + ) + return claims + + +# Global validator instance +_pubsub_jwt_validator: PubSubJWTValidator | None = None + + +def get_pubsub_jwt_validator() -> PubSubJWTValidator: + """Get the global Pub/Sub JWT validator instance. + + Returns: + PubSubJWTValidator instance. + """ + global _pubsub_jwt_validator + if _pubsub_jwt_validator is None: + _pubsub_jwt_validator = PubSubJWTValidator() + return _pubsub_jwt_validator diff --git a/src/lightspeed_agent/rotation/router.py b/src/lightspeed_agent/rotation/router.py new file mode 100644 index 0000000..dc2df11 --- /dev/null +++ b/src/lightspeed_agent/rotation/router.py @@ -0,0 +1,125 @@ +"""Rotation endpoint router for Secret Manager Pub/Sub events.""" + +import logging + +from fastapi import APIRouter, HTTPException, Request +from fastapi.responses import JSONResponse + +from lightspeed_agent.rotation.gcp import GoogleSecretManagerVersionWriter +from lightspeed_agent.rotation.providers import create_default_registry +from lightspeed_agent.rotation.pubsub_jwt import get_pubsub_jwt_validator +from lightspeed_agent.rotation.workflow import RotationWorkflow + +logger = logging.getLogger(__name__) + +router = APIRouter(tags=["Secret Rotation"]) + +# Module-level workflow singleton (reuses Secret Manager gRPC client across requests) +_workflow = RotationWorkflow( + secret_writer=GoogleSecretManagerVersionWriter(), + value_provider=create_default_registry(), +) + + +@router.post("/rotation") +async def rotation_handler(request: Request) -> JSONResponse: + """Handle Secret Manager rotation events from Pub/Sub push. + + Request format (Pub/Sub push): + { + "message": { + "messageId": "abc123", + "data": "", # Not used for rotation + "attributes": { + "eventType": "SECRET_ROTATE", + "secretId": "projects/my-project/secrets/redhat-sso-client-secret" + } + }, + "subscription": "projects/my-project/subscriptions/secret-rotation-trigger-sub" + } + + Returns: + 200: Success or ignored events (Pub/Sub acks) + 401: Invalid Pub/Sub OIDC token + 400: Invalid request format + 500: Workflow failures (Pub/Sub retries) + """ + # Parse request body + try: + body = await request.json() + except Exception as e: + logger.warning("Invalid JSON body in rotation request: %s", e) + raise HTTPException(status_code=400, detail="Invalid JSON body") from e + + # Extract Pub/Sub message + message = body.get("message", {}) + if not message: + logger.warning("Missing 'message' field in rotation request") + raise HTTPException(status_code=400, detail="Missing Pub/Sub message") + + message_id = message.get("messageId", "unknown") + attributes = message.get("attributes", {}) + secret_id = attributes.get("secretId", "") + + logger.info( + "Processing Secret Manager rotation event (event_type=rotation_event_received, " + "message_id=%s, secret_id=%s)", + message_id, + secret_id, + ) + + # Verify Pub/Sub OIDC token + auth_header = request.headers.get("authorization", "") + token = auth_header.removeprefix("Bearer ").strip() + + if not token: + logger.warning( + "Missing Authorization header (event_type=rotation_auth_failed, message_id=%s)", + message_id, + ) + raise HTTPException(status_code=401, detail="Missing Authorization header") + + validator = get_pubsub_jwt_validator() + claims = await validator.validate_push_token(token) + + if claims is None: + logger.warning( + "Invalid Pub/Sub OIDC token (event_type=rotation_auth_failed, message_id=%s)", + message_id, + ) + raise HTTPException(status_code=401, detail="Invalid Pub/Sub token") + + # Call rotation workflow (synchronous) + try: + result = _workflow.handle_event(attributes=attributes) + except Exception as e: + logger.exception( + "Secret rotation failed (event_type=secret_rotation_failed, " + "message_id=%s, secret_id=%s, error=%s)", + message_id, + secret_id, + type(e).__name__, + ) + raise HTTPException( + status_code=500, + detail=f"Rotation workflow failed: {type(e).__name__}", + ) from e + + # Log completion + logger.info( + "Secret rotation completed (event_type=secret_rotation_completed, " + "secret_name=%s, status=%s, version=%s, reason=%s, message_id=%s)", + result.secret_name or "unknown", + result.status, + result.secret_version or "none", + result.reason, + message_id, + ) + + return JSONResponse( + content={ + "status": result.status, + "version": result.secret_version, + "secret_name": result.secret_name, + } + ) diff --git a/src/lightspeed_agent/rotation/workflow.py b/src/lightspeed_agent/rotation/workflow.py new file mode 100644 index 0000000..f01ed6a --- /dev/null +++ b/src/lightspeed_agent/rotation/workflow.py @@ -0,0 +1,134 @@ +"""Secret rotation workflow primitives. + +This module provides a small, testable workflow that: +1) parses Secret Manager Pub/Sub event attributes +2) filters to supported secrets +3) requests a new secret value from a provider +4) stores a new Secret Manager version + +Actual secret generation is delegated to provider implementations. +""" + +from __future__ import annotations + +from collections.abc import Mapping +from dataclasses import dataclass +from typing import Protocol + +SUPPORTED_SECRET_NAMES = frozenset( + { + "redhat-sso-client-secret", + "gma-client-secret", + } +) + + +@dataclass(frozen=True) +class RotationEvent: + """Parsed Secret Manager rotation event.""" + + project_id: str + secret_name: str + + +@dataclass(frozen=True) +class RotationResult: + """Outcome returned by the workflow handler.""" + + status: str + reason: str + secret_name: str | None = None + secret_version: str | None = None + + +class SecretManagerVersionWriter(Protocol): + """Protocol for writing a new Secret Manager version.""" + + def add_secret_version(self, project_id: str, secret_name: str, secret_value: bytearray) -> str: + """Add a new secret version and return its full resource name.""" + + +class SecretValueProvider(Protocol): + """Protocol for generating/acquiring the next secret value.""" + + def get_next_secret_value(self, event: RotationEvent) -> bytearray: + """Return the next value for the rotated secret.""" + + +def parse_rotation_event(attributes: Mapping[str, str]) -> RotationEvent | None: + """Parse Secret Manager event attributes into a RotationEvent. + + Expected Secret Manager attributes include: + - eventType: e.g. SECRET_ROTATE + - secretId: projects//secrets/ + """ + event_type = attributes.get("eventType", "") + if event_type != "SECRET_ROTATE": + return None + + secret_id = attributes.get("secretId", "") + parts = secret_id.split("/") + if len(parts) != 4 or parts[0] != "projects" or parts[2] != "secrets": + return None + + project_id = parts[1] + secret_name = parts[3] + if not project_id or not secret_name: + return None + + return RotationEvent(project_id=project_id, secret_name=secret_name) + + +class RotationWorkflow: + """Workflow that handles Secret Manager rotation notifications.""" + + def __init__( + self, + *, + secret_writer: SecretManagerVersionWriter, + value_provider: SecretValueProvider, + ) -> None: + self._secret_writer = secret_writer + self._value_provider = value_provider + + def handle_event( + self, + *, + attributes: Mapping[str, str], + ) -> RotationResult: + """Handle one rotation event notification. + + Returns a RotationResult for all outcomes, including ignored/unsupported + events, so push and pull consumers can ack cleanly without catching. + Exceptions from the secret writer propagate so Pub/Sub retries on + transient GCP failures. + """ + event = parse_rotation_event(attributes) + if event is None: + return RotationResult(status="ignored", reason="not_a_secret_rotate_event") + + if event.secret_name not in SUPPORTED_SECRET_NAMES: + return RotationResult( + status="ignored", + reason="unsupported_secret", + secret_name=event.secret_name, + ) + + next_value = self._value_provider.get_next_secret_value(event) + try: + version_name = self._secret_writer.add_secret_version( + event.project_id, + event.secret_name, + next_value, + ) + finally: + # Zero the mutable buffer regardless of success or failure. + for i in range(len(next_value)): + next_value[i] = 0 + + return RotationResult( + status="rotated", + reason="secret_version_added", + secret_name=event.secret_name, + secret_version=version_name, + ) diff --git a/tests/test_pubsub_jwt.py b/tests/test_pubsub_jwt.py new file mode 100644 index 0000000..7211c7b --- /dev/null +++ b/tests/test_pubsub_jwt.py @@ -0,0 +1,281 @@ +"""Tests for Pub/Sub JWT validator for rotation endpoint.""" + +import time +from unittest.mock import patch + +import pytest + +from lightspeed_agent.rotation.pubsub_jwt import ( + PubSubCertificateCache, + PubSubJWTValidator, + get_pubsub_jwt_validator, +) + + +class TestPubSubCertificateCache: + """Tests for PubSubCertificateCache (Google OAuth certificate caching).""" + + @pytest.mark.asyncio + async def test_certificate_cache_fetches_and_stores(self): + """Test that the cache fetches and stores certificates.""" + # Mock the certificate parsing since we're testing the caching logic + fake_public_key_pem = ( + "-----BEGIN PUBLIC KEY-----\nMOCK_PUBLIC_KEY\n-----END PUBLIC KEY-----" + ) + + cache = PubSubCertificateCache() + + # Mock the _fetch_certificates method to directly populate the cache + async def mock_fetch(): + cache._certificates = { + "key1": fake_public_key_pem, + "key2": fake_public_key_pem, + } + cache._last_fetch = time.monotonic() + + with patch.object(cache, "_fetch_certificates", side_effect=mock_fetch): + # First call should fetch + public_key1 = await cache.get_public_key("key1") + assert public_key1 == fake_public_key_pem + assert cache._fetch_certificates.call_count == 1 + + # Second call should use cache + public_key2 = await cache.get_public_key("key2") + assert public_key2 == fake_public_key_pem + assert cache._fetch_certificates.call_count == 1 # No additional fetch + + +class TestPubSubJWTValidator: + """Tests for PubSubJWTValidator (Pub/Sub OIDC token validation).""" + + @pytest.mark.asyncio + async def test_validator_accepts_valid_token(self): + """Test that a valid token with correct issuer and audience is accepted.""" + from lightspeed_agent.config import Settings + + # Create settings with skip_jwt_validation=False to test actual validation + prod_settings = Settings( + skip_jwt_validation=False, + agent_provider_url="https://localhost:8000", + debug=False, + ) + + # Mock get_settings to return prod_settings + with patch("lightspeed_agent.rotation.pubsub_jwt.get_settings", return_value=prod_settings): + # Create a mock token with valid claims + mock_claims = { + "iss": "https://accounts.google.com", + "aud": "https://localhost:8000", + "sub": "service-account@example.iam.gserviceaccount.com", + "email": "service-account@example.iam.gserviceaccount.com", + "iat": int(time.time()), + "exp": int(time.time()) + 3600, + } + + # Create a mock token string (we'll mock jwt.decode to return our claims) + mock_token = "mock.jwt.token" + mock_header = {"kid": "test-key-id", "alg": "RS256"} + fake_public_key = "-----BEGIN PUBLIC KEY-----\nMOCK_KEY\n-----END PUBLIC KEY-----" + + validator = PubSubJWTValidator(expected_audience="https://localhost:8000") + + with ( + patch("jwt.get_unverified_header", return_value=mock_header), + patch("jwt.decode", return_value=mock_claims), + patch.object( + validator._cert_cache, "get_public_key", return_value=fake_public_key + ), + ): + claims = await validator.validate_push_token(mock_token) + + assert claims is not None + assert claims["iss"] == "https://accounts.google.com" + assert claims["aud"] == "https://localhost:8000" + assert claims["sub"] == "service-account@example.iam.gserviceaccount.com" + + @pytest.mark.asyncio + async def test_validator_rejects_invalid_issuer(self): + """Test that a token with wrong issuer returns None.""" + from jwt.exceptions import InvalidTokenError + + from lightspeed_agent.config import Settings + + # Create settings with skip_jwt_validation=False to test actual validation + prod_settings = Settings( + skip_jwt_validation=False, + agent_provider_url="https://localhost:8000", + debug=False, + ) + + # Mock get_settings to return prod_settings + with patch("lightspeed_agent.rotation.pubsub_jwt.get_settings", return_value=prod_settings): + mock_token = "mock.jwt.token" + mock_header = {"kid": "test-key-id", "alg": "RS256"} + fake_public_key = "-----BEGIN PUBLIC KEY-----\nMOCK_KEY\n-----END PUBLIC KEY-----" + + validator = PubSubJWTValidator(expected_audience="https://localhost:8000") + + with ( + patch("jwt.get_unverified_header", return_value=mock_header), + # PyJWT will raise InvalidTokenError when issuer validation fails + patch("jwt.decode", side_effect=InvalidTokenError("Invalid issuer")), + patch.object( + validator._cert_cache, "get_public_key", return_value=fake_public_key + ), + ): + claims = await validator.validate_push_token(mock_token) + + # Should return None due to invalid issuer + assert claims is None + + @pytest.mark.asyncio + async def test_validator_skips_verification_in_dev_mode(self): + """Test that dev mode (SKIP_JWT_VALIDATION=true) bypasses verification.""" + from lightspeed_agent.config import Settings + + # Create settings with skip_jwt_validation=True + dev_settings = Settings( + skip_jwt_validation=True, + agent_provider_url="https://localhost:8000", + debug=True, + ) + + # Mock get_settings to return dev_settings + with patch("lightspeed_agent.rotation.pubsub_jwt.get_settings", return_value=dev_settings): + validator = PubSubJWTValidator(expected_audience="https://localhost:8000") + + # Create a token with any issuer (should be accepted in dev mode) + mock_claims = { + "iss": "https://any-issuer.com", + "aud": "https://any-audience.com", + "sub": "test-user", + } + + mock_token = "mock.jwt.token" + + with patch("jwt.decode", return_value=mock_claims): + claims = await validator.validate_push_token(mock_token) + + # In dev mode, should accept any token + assert claims is not None + assert claims["iss"] == "https://any-issuer.com" + + @pytest.mark.asyncio + async def test_validator_rejects_missing_kid(self): + """Test that a token with missing kid in header returns None.""" + from lightspeed_agent.config import Settings + + prod_settings = Settings( + skip_jwt_validation=False, + agent_provider_url="https://localhost:8000", + debug=False, + ) + + with patch("lightspeed_agent.rotation.pubsub_jwt.get_settings", return_value=prod_settings): + validator = PubSubJWTValidator(expected_audience="https://localhost:8000") + mock_token = "mock.jwt.token" + mock_header = {"alg": "RS256"} # No kid + + with patch("jwt.get_unverified_header", return_value=mock_header): + claims = await validator.validate_push_token(mock_token) + assert claims is None + + @pytest.mark.asyncio + async def test_validator_rejects_wrong_algorithm(self): + """Test that a token with non-RS256 algorithm returns None.""" + from lightspeed_agent.config import Settings + + prod_settings = Settings( + skip_jwt_validation=False, + agent_provider_url="https://localhost:8000", + debug=False, + ) + + with patch("lightspeed_agent.rotation.pubsub_jwt.get_settings", return_value=prod_settings): + validator = PubSubJWTValidator(expected_audience="https://localhost:8000") + mock_token = "mock.jwt.token" + mock_header = {"kid": "test-key", "alg": "HS256"} # Wrong algorithm + + with patch("jwt.get_unverified_header", return_value=mock_header): + claims = await validator.validate_push_token(mock_token) + assert claims is None + + @pytest.mark.asyncio + async def test_validator_rejects_expired_token(self): + """Test that an expired token returns None.""" + from jwt.exceptions import ExpiredSignatureError + + from lightspeed_agent.config import Settings + + prod_settings = Settings( + skip_jwt_validation=False, + agent_provider_url="https://localhost:8000", + debug=False, + ) + + with patch("lightspeed_agent.rotation.pubsub_jwt.get_settings", return_value=prod_settings): + validator = PubSubJWTValidator(expected_audience="https://localhost:8000") + mock_token = "mock.jwt.token" + mock_header = {"kid": "test-key", "alg": "RS256"} + fake_public_key = "-----BEGIN PUBLIC KEY-----\nMOCK_KEY\n-----END PUBLIC KEY-----" + + with ( + patch("jwt.get_unverified_header", return_value=mock_header), + patch("jwt.decode", side_effect=ExpiredSignatureError), + patch.object( + validator._cert_cache, "get_public_key", return_value=fake_public_key + ), + ): + claims = await validator.validate_push_token(mock_token) + assert claims is None + + @pytest.mark.asyncio + async def test_validator_refreshes_cache_on_missing_key(self): + """Test that validator refreshes cert cache when key is not found.""" + from lightspeed_agent.config import Settings + + prod_settings = Settings( + skip_jwt_validation=False, + agent_provider_url="https://localhost:8000", + debug=False, + ) + + with patch("lightspeed_agent.rotation.pubsub_jwt.get_settings", return_value=prod_settings): + validator = PubSubJWTValidator(expected_audience="https://localhost:8000") + mock_token = "mock.jwt.token" + mock_header = {"kid": "test-key", "alg": "RS256"} + mock_claims = { + "iss": "https://accounts.google.com", + "aud": "https://localhost:8000", + "sub": "test@example.com", + "iat": int(time.time()), + "exp": int(time.time()) + 3600, + } + fake_public_key = "-----BEGIN PUBLIC KEY-----\nMOCK_KEY\n-----END PUBLIC KEY-----" + + with ( + patch("jwt.get_unverified_header", return_value=mock_header), + patch("jwt.decode", return_value=mock_claims), + # First call returns None, second call (after refresh) returns key + patch.object( + validator._cert_cache, + "get_public_key", + side_effect=[None, fake_public_key], + ), + patch.object(validator._cert_cache, "force_refresh") as mock_refresh, + ): + claims = await validator.validate_push_token(mock_token) + + # Should have called force_refresh when key was not found + mock_refresh.assert_called_once() + # Should succeed after refresh + assert claims is not None + + @pytest.mark.asyncio + async def test_get_pubsub_jwt_validator_singleton(self): + """Test that get_pubsub_jwt_validator returns singleton instance.""" + validator1 = get_pubsub_jwt_validator() + validator2 = get_pubsub_jwt_validator() + + # Should be the same instance + assert validator1 is validator2 diff --git a/tests/test_rotation_providers.py b/tests/test_rotation_providers.py new file mode 100644 index 0000000..652cdc7 --- /dev/null +++ b/tests/test_rotation_providers.py @@ -0,0 +1,70 @@ +"""Tests for rotation secret value validation and template method pattern.""" + +from __future__ import annotations + +import pytest + +from lightspeed_agent.rotation.providers import SecretValueProvider +from lightspeed_agent.rotation.workflow import RotationEvent + + +class TestSecretValueProvider: + """Test the abstract base class validation logic.""" + + class FakeProvider(SecretValueProvider): + """Fake provider for testing inherited methods.""" + + def __init__(self, secret_to_return: str) -> None: + self.secret_to_return = secret_to_return + + def _fetch_secret_from_api(self, event: RotationEvent) -> str: + """Return the pre-configured secret.""" + return self.secret_to_return + + def test_validation_rejects_too_short(self) -> None: + """Secret validation should reject secrets shorter than 32 bytes.""" + short_secret = "short-val!" # Only 10 bytes + provider = self.FakeProvider(short_secret) + + with pytest.raises(ValueError, match="too short.*10 bytes.*Minimum: 32 bytes"): + provider.validate_secret_value(short_secret, "test-secret") + + def test_validation_rejects_low_entropy(self) -> None: + """Secret validation should reject secrets with low entropy.""" + # 40-byte secret with only 2 unique characters (low entropy) + low_entropy_secret = "a" * 39 + "b" + provider = self.FakeProvider(low_entropy_secret) + + with pytest.raises(ValueError, match="insufficient entropy.*only 2 unique characters"): + provider.validate_secret_value(low_entropy_secret, "test-secret") + + def test_validation_accepts_valid_secret(self) -> None: + """Secret validation should accept secrets meeting all requirements.""" + # Valid 32+ byte secret with 10+ unique characters + valid_secret = "abcdefghij1234567890KLMNOPQRSTUV" + provider = self.FakeProvider(valid_secret) + + # Should not raise + provider.validate_secret_value(valid_secret, "test-secret") + + def test_template_method_orchestrates_fetch_validate_convert(self) -> None: + """Template method should fetch, validate, and convert to bytearray.""" + valid_secret = "abcdefghij1234567890KLMNOPQRSTUV" + provider = self.FakeProvider(valid_secret) + + event = RotationEvent(project_id="test-proj", secret_name="test-secret") + result = provider.get_next_secret_value(event) + + # Should return bytearray of the secret + assert isinstance(result, bytearray) + assert bytes(result) == valid_secret.encode("utf-8") + + def test_template_method_rejects_invalid_secret(self) -> None: + """Template method should validate and reject invalid secrets.""" + invalid_secret = "tooshort" # Only 8 bytes + provider = self.FakeProvider(invalid_secret) + + event = RotationEvent(project_id="test-proj", secret_name="test-secret") + + with pytest.raises(ValueError, match="too short"): + provider.get_next_secret_value(event) diff --git a/tests/test_rotation_router.py b/tests/test_rotation_router.py new file mode 100644 index 0000000..6ffe607 --- /dev/null +++ b/tests/test_rotation_router.py @@ -0,0 +1,252 @@ +"""Tests for rotation endpoint router.""" + +from __future__ import annotations + +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +from fastapi import FastAPI +from fastapi.testclient import TestClient + +from lightspeed_agent.rotation.workflow import RotationResult + + +@pytest.fixture +def mock_validator() -> AsyncMock: + """Mock Pub/Sub JWT validator that always returns valid claims.""" + validator = AsyncMock() + validator.validate_push_token.return_value = { + "iss": "https://accounts.google.com", + "aud": "https://test-agent.example.com", + } + return validator + + +@pytest.fixture +def mock_workflow() -> MagicMock: + """Mock rotation workflow that returns success.""" + workflow = MagicMock() + workflow.handle_event.return_value = RotationResult( + status="rotated", + reason="secret_version_added", + secret_name="redhat-sso-client-secret", + secret_version="projects/test-proj/secrets/redhat-sso-client-secret/versions/42", + ) + return workflow + + +def test_rotation_endpoint_success( + mock_validator: AsyncMock, + mock_workflow: MagicMock, +) -> None: + """Should handle valid rotation event and return 200.""" + from lightspeed_agent.rotation.router import router + + app = FastAPI() + app.include_router(router) + client = TestClient(app) + + # Pub/Sub push message format + request_body = { + "message": { + "messageId": "msg-123", + "attributes": { + "eventType": "SECRET_ROTATE", + "secretId": "projects/test-proj/secrets/redhat-sso-client-secret", + }, + }, + "subscription": "projects/test-proj/subscriptions/rotation-sub", + } + + with patch( + "lightspeed_agent.rotation.router.get_pubsub_jwt_validator", + return_value=mock_validator, + ), patch("lightspeed_agent.rotation.router._workflow", mock_workflow): + response = client.post( + "/rotation", + json=request_body, + headers={"Authorization": "Bearer fake-token"}, + ) + + assert response.status_code == 200 + data = response.json() + assert data["status"] == "rotated" + assert "versions/42" in data["version"] + + # Verify workflow was called with correct attributes + mock_workflow.handle_event.assert_called_once() + call_kwargs = mock_workflow.handle_event.call_args.kwargs + assert call_kwargs["attributes"]["eventType"] == "SECRET_ROTATE" + + +def test_rotation_endpoint_missing_auth_header(mock_workflow: MagicMock) -> None: + """Should return 401 when Authorization header is missing.""" + from lightspeed_agent.rotation.router import router + + app = FastAPI() + app.include_router(router) + client = TestClient(app) + + request_body = { + "message": { + "messageId": "msg-123", + "attributes": {"eventType": "SECRET_ROTATE"}, + } + } + + response = client.post("/rotation", json=request_body) + + assert response.status_code == 401 + assert "Missing Authorization header" in response.json()["detail"] + + +def test_rotation_endpoint_invalid_token(mock_workflow: MagicMock) -> None: + """Should return 401 when token validation fails.""" + from lightspeed_agent.rotation.router import router + + app = FastAPI() + app.include_router(router) + client = TestClient(app) + + request_body = { + "message": { + "messageId": "msg-123", + "attributes": {"eventType": "SECRET_ROTATE"}, + } + } + + # Mock validator that rejects token + invalid_validator = AsyncMock() + invalid_validator.validate_push_token.return_value = None + + with patch( + "lightspeed_agent.rotation.router.get_pubsub_jwt_validator", + return_value=invalid_validator, + ): + response = client.post( + "/rotation", + json=request_body, + headers={"Authorization": "Bearer invalid-token"}, + ) + + assert response.status_code == 401 + assert "Invalid Pub/Sub token" in response.json()["detail"] + + +def test_rotation_endpoint_missing_message(mock_validator: AsyncMock) -> None: + """Should return 400 when message field is missing.""" + from lightspeed_agent.rotation.router import router + + app = FastAPI() + app.include_router(router) + client = TestClient(app) + + request_body = {"subscription": "projects/test/subscriptions/sub"} + + with patch( + "lightspeed_agent.rotation.router.get_pubsub_jwt_validator", + return_value=mock_validator, + ): + response = client.post( + "/rotation", + json=request_body, + headers={"Authorization": "Bearer fake-token"}, + ) + + assert response.status_code == 400 + assert "Missing Pub/Sub message" in response.json()["detail"] + + +def test_rotation_endpoint_workflow_failure( + mock_validator: AsyncMock, + mock_workflow: MagicMock, +) -> None: + """Should return 500 when workflow raises exception.""" + from lightspeed_agent.rotation.router import router + + app = FastAPI() + app.include_router(router) + client = TestClient(app) + + request_body = { + "message": { + "messageId": "msg-123", + "attributes": {"eventType": "SECRET_ROTATE"}, + } + } + + # Mock workflow that raises exception + failing_workflow = MagicMock() + failing_workflow.handle_event.side_effect = ValueError("Test error") + + with patch( + "lightspeed_agent.rotation.router.get_pubsub_jwt_validator", + return_value=mock_validator, + ), patch("lightspeed_agent.rotation.router._workflow", failing_workflow): + response = client.post( + "/rotation", + json=request_body, + headers={"Authorization": "Bearer fake-token"}, + ) + + assert response.status_code == 500 + assert "Rotation workflow failed" in response.json()["detail"] + + +def test_rotation_endpoint_ignored_event( + mock_validator: AsyncMock, + mock_workflow: MagicMock, +) -> None: + """Should return 200 for ignored events (unsupported secret).""" + from lightspeed_agent.rotation.router import router + + app = FastAPI() + app.include_router(router) + client = TestClient(app) + + request_body = { + "message": { + "messageId": "msg-123", + "attributes": { + "eventType": "SECRET_ROTATE", + "secretId": "projects/test/secrets/unsupported-secret", + }, + } + } + + # Mock workflow that returns ignored status + ignored_workflow = MagicMock() + ignored_workflow.handle_event.return_value = RotationResult( + status="ignored", + reason="unsupported_secret", + secret_name="unsupported-secret", + ) + + with patch( + "lightspeed_agent.rotation.router.get_pubsub_jwt_validator", + return_value=mock_validator, + ), patch("lightspeed_agent.rotation.router._workflow", ignored_workflow): + response = client.post( + "/rotation", + json=request_body, + headers={"Authorization": "Bearer fake-token"}, + ) + + assert response.status_code == 200 + data = response.json() + assert data["status"] == "ignored" + + +def test_rotation_router_integrated_in_marketplace_app() -> None: + """Integration test: rotation router should be included in marketplace app. + + This test will FAIL until Task 5 integrates the rotation router into the + marketplace app. + """ + from lightspeed_agent.marketplace.app import create_app + + app = create_app() + + # Verify that /rotation endpoint is registered + endpoints = [route.path for route in app.routes] + assert "/rotation" in endpoints, f"Expected /rotation endpoint, found: {endpoints}" diff --git a/tests/test_rotation_workflow.py b/tests/test_rotation_workflow.py new file mode 100644 index 0000000..328c398 --- /dev/null +++ b/tests/test_rotation_workflow.py @@ -0,0 +1,182 @@ +from __future__ import annotations + +from dataclasses import dataclass +from unittest.mock import AsyncMock, patch + +import pytest + +from lightspeed_agent.rotation.pubsub_jwt import PubSubCertificateCache +from lightspeed_agent.rotation.workflow import RotationEvent, RotationWorkflow, parse_rotation_event + + +@dataclass +class FakeSecretWriter: + calls: list[tuple[str, str, bytes]] + + def add_secret_version(self, project_id: str, secret_name: str, secret_value: bytearray) -> str: + self.calls.append((project_id, secret_name, bytes(secret_value))) + return f"projects/{project_id}/secrets/{secret_name}/versions/2" + + +@dataclass +class FakeProvider: + value: bytes = b"next-secret-value" + + def get_next_secret_value(self, event: RotationEvent) -> bytearray: + assert event.secret_name in { + "redhat-sso-client-secret", + "gma-client-secret", + } + return bytearray(self.value) + + +def test_parse_rotation_event_returns_none_for_non_rotation() -> None: + result = parse_rotation_event( + {"eventType": "SECRET_UPDATE", "secretId": "projects/p/secrets/s"} + ) + assert result is None + + +def test_parse_rotation_event_parses_secret_rotate() -> None: + result = parse_rotation_event( + { + "eventType": "SECRET_ROTATE", + "secretId": "projects/test-project/secrets/redhat-sso-client-secret", + } + ) + assert result is not None + assert result.project_id == "test-project" + assert result.secret_name == "redhat-sso-client-secret" + + +def test_workflow_ignores_unsupported_secret() -> None: + writer = FakeSecretWriter(calls=[]) + provider = FakeProvider() + workflow = RotationWorkflow(secret_writer=writer, value_provider=provider) + + result = workflow.handle_event( + attributes={ + "eventType": "SECRET_ROTATE", + "secretId": "projects/test-project/secrets/unsupported-secret", + } + ) + + assert result.status == "ignored" + assert result.reason == "unsupported_secret" + assert writer.calls == [] + + +def test_workflow_rotates_supported_secret() -> None: + writer = FakeSecretWriter(calls=[]) + provider = FakeProvider(value=b"brand-new-value") + workflow = RotationWorkflow(secret_writer=writer, value_provider=provider) + + result = workflow.handle_event( + attributes={ + "eventType": "SECRET_ROTATE", + "secretId": "projects/test-project/secrets/gma-client-secret", + }, + ) + + assert result.status == "rotated" + assert result.secret_name == "gma-client-secret" + assert result.secret_version == "projects/test-project/secrets/gma-client-secret/versions/2" + assert writer.calls == [("test-project", "gma-client-secret", b"brand-new-value")] + + +@pytest.mark.asyncio +async def test_certificate_cache_parses_x509_pem_format() -> None: + """Should parse X.509 PEM certificates returned by Google OAuth v1 endpoint.""" + cache = PubSubCertificateCache() + + # Valid X.509 PEM certificate format (self-signed test cert) + cert_data = { + "test-key-id": ( + "-----BEGIN CERTIFICATE-----\n" + "MIICtDCCAZygAwIBAgIUDyF5NVWvQdW92NQyZ+8muSDzQWUwDQYJKoZIhvcNAQEL\n" + "BQAwFDESMBAGA1UEAwwJdGVzdC1jZXJ0MB4XDTI2MDQyNjExMTgxNVoXDTI3MDQy\n" + "NjExMTgxNVowFDESMBAGA1UEAwwJdGVzdC1jZXJ0MIIBIjANBgkqhkiG9w0BAQEF\n" + "AAOCAQ8AMIIBCgKCAQEAz/lRq+GRzANbrFdg0HmYO1589h3C2gsVZxwdOFNcQyPI\n" + "8fyBuhh5X967NjzeXtiIphp5H2+jFYwJPSij054862htGwGGt6qzH8Qrsqf36Lnk\n" + "sFXQNVnyMhqqZpZA3agNtK+t7py1KX7XgrtLXPs66dlrpzH66d4rlXFE6T9/yQdD\n" + "mQeKicSwk90iQa7tvBAVfm7eK5Zm56HEy2tzP76JZswLLPUBOkSqPIxhTKtVpiiU\n" + "30xbzYyJhF59BKqVvPg6FJXeUauaRvfL9IamcAftA06CYIIl+Pjts7cJTwI0qbCL\n" + "cCMtYy3lyIEc/Tyk/Z1wjevLkYmC56eJsCj88c+QeQIDAQABMA0GCSqGSIb3DQEB\n" + "CwUAA4IBAQBZZOo9jZvLEdubB1gXlGYcOevkOJ92XwlocpQIxMg9oitkmqANrY5R\n" + "6zFDCBT+9C6+STkWiD3WW30wT3/XandBO8iTcckfr/iyzEW6NlkXY7WEPBy1WR0b\n" + "HrREUh/8XuqjGSDD8+bgxIqyi7U5LQu4S3ZOUMV+/X5jaLDr5uWDGFDK998ID2gT\n" + "pqWPheZqdA/IXXQ6s7YXWkUGV77sTC3aqnT8IcfFKnzuiCavFzkD5F9+M0E3NDyl\n" + "BKeYDZIq3P2YEbksCv3GjOfmLYvHALR0h9YrGKTuToOWdZc7t+Z2cqK2cSXOFd3b\n" + "fG0y8U7CY0+Q2Qs2CoAyxbHc2kEyUjTy\n" + "-----END CERTIFICATE-----\n" + ), + } + + async def mock_get(*args, **kwargs): + response = AsyncMock() + response.json = lambda: cert_data + response.raise_for_status = lambda: None + return response + + with patch("httpx.AsyncClient") as mock_client_class: + mock_client = AsyncMock() + mock_client.get = mock_get + mock_client_class.return_value.__aenter__.return_value = mock_client + + await cache._fetch_certificates() + + # Verify the certificate was parsed and public key was extracted + assert len(cache._certificates) == 1 + assert "test-key-id" in cache._certificates + # Public key should be PEM-encoded string starting with BEGIN PUBLIC KEY + public_key_pem = cache._certificates["test-key-id"] + assert isinstance(public_key_pem, str) + assert public_key_pem.startswith("-----BEGIN PUBLIC KEY-----") + assert public_key_pem.endswith("-----END PUBLIC KEY-----\n") + + +@pytest.mark.asyncio +async def test_certificate_cache_handles_malformed_cert() -> None: + """Should skip malformed certificates and continue parsing others.""" + cache = PubSubCertificateCache() + + cert_data = { + "bad-cert": "not-a-valid-pem-cert", + "good-cert": ( + "-----BEGIN CERTIFICATE-----\n" + "MIICtDCCAZygAwIBAgIUDyF5NVWvQdW92NQyZ+8muSDzQWUwDQYJKoZIhvcNAQEL\n" + "BQAwFDESMBAGA1UEAwwJdGVzdC1jZXJ0MB4XDTI2MDQyNjExMTgxNVoXDTI3MDQy\n" + "NjExMTgxNVowFDESMBAGA1UEAwwJdGVzdC1jZXJ0MIIBIjANBgkqhkiG9w0BAQEF\n" + "AAOCAQ8AMIIBCgKCAQEAz/lRq+GRzANbrFdg0HmYO1589h3C2gsVZxwdOFNcQyPI\n" + "8fyBuhh5X967NjzeXtiIphp5H2+jFYwJPSij054862htGwGGt6qzH8Qrsqf36Lnk\n" + "sFXQNVnyMhqqZpZA3agNtK+t7py1KX7XgrtLXPs66dlrpzH66d4rlXFE6T9/yQdD\n" + "mQeKicSwk90iQa7tvBAVfm7eK5Zm56HEy2tzP76JZswLLPUBOkSqPIxhTKtVpiiU\n" + "30xbzYyJhF59BKqVvPg6FJXeUauaRvfL9IamcAftA06CYIIl+Pjts7cJTwI0qbCL\n" + "cCMtYy3lyIEc/Tyk/Z1wjevLkYmC56eJsCj88c+QeQIDAQABMA0GCSqGSIb3DQEB\n" + "CwUAA4IBAQBZZOo9jZvLEdubB1gXlGYcOevkOJ92XwlocpQIxMg9oitkmqANrY5R\n" + "6zFDCBT+9C6+STkWiD3WW30wT3/XandBO8iTcckfr/iyzEW6NlkXY7WEPBy1WR0b\n" + "HrREUh/8XuqjGSDD8+bgxIqyi7U5LQu4S3ZOUMV+/X5jaLDr5uWDGFDK998ID2gT\n" + "pqWPheZqdA/IXXQ6s7YXWkUGV77sTC3aqnT8IcfFKnzuiCavFzkD5F9+M0E3NDyl\n" + "BKeYDZIq3P2YEbksCv3GjOfmLYvHALR0h9YrGKTuToOWdZc7t+Z2cqK2cSXOFd3b\n" + "fG0y8U7CY0+Q2Qs2CoAyxbHc2kEyUjTy\n" + "-----END CERTIFICATE-----\n" + ), + } + + async def mock_get(*args, **kwargs): + response = AsyncMock() + response.json = lambda: cert_data + response.raise_for_status = lambda: None + return response + + with patch("httpx.AsyncClient") as mock_client_class: + mock_client = AsyncMock() + mock_client.get = mock_get + mock_client_class.return_value.__aenter__.return_value = mock_client + + await cache._fetch_certificates() + + # Should have skipped the bad cert but parsed the good one + assert len(cache._certificates) == 1 + assert "good-cert" in cache._certificates + assert "bad-cert" not in cache._certificates