diff --git a/.gitignore b/.gitignore
index f5c3012..ea85d20 100644
--- a/.gitignore
+++ b/.gitignore
@@ -60,6 +60,9 @@ pip-wheel-metadata/
 *.log
 logs/
 
+# Local data (SQLite databases, etc.)
+data/
+
 # GCP credentials (just in case)
 *.json
 !pyproject.json
diff --git a/README.md b/README.md
index 33f9545..c97f339 100644
--- a/README.md
+++ b/README.md
@@ -329,6 +329,90 @@ class MyCustomStore(EventStore):
     def health_check(self) -> bool: ...
 ```
 
+## Monitoring & Metrics
+
+`eventkit` exposes Prometheus metrics on a dedicated port (default: 9090) for production observability.
+
+### Metrics Server
+
+The metrics server runs independently of the main API server, isolating monitoring traffic from production requests:
+
+```python
+settings = Settings(
+    EVENTKIT_METRICS_ENABLED=True,  # Default
+    EVENTKIT_METRICS_PORT=9090,     # Default
+)
+```
+
+Access metrics:
+```bash
+curl http://localhost:9090/metrics  # Prometheus format
+curl http://localhost:9090/health   # Health check
+```
+
+### Available Metrics
+
+**API Layer:**
+- `eventkit_api_requests_total` - Total HTTP requests (labels: `endpoint`, `method`, `status`)
+- `eventkit_api_request_duration_seconds` - Request latency histogram (labels: `endpoint`, `method`)
+
+**Event Processing:**
+- `eventkit_events_received_total` - Events received at ingestion (labels: `stream`, `event_type`)
+- `eventkit_events_processed_total` - Events successfully processed (labels: `stream`, `event_type`)
+- `eventkit_events_failed_total` - Events that failed processing (labels: `stream`, `event_type`, `cause`)
+
+**Storage:**
+- `eventkit_storage_written_bytes_total` - Bytes written to storage (label: `storage_backend`)
+- `eventkit_storage_written_files_total` - Files written to storage (label: `storage_backend`)
+
+**Queue & Ring Buffer:**
+- `eventkit_queue_enqueued_total` - Events enqueued (label: `queue_mode`)
+- `eventkit_queue_dequeued_total` - Events dequeued (label: `queue_mode`)
+- `eventkit_queue_processed_total` - Events processed by workers (labels: `queue_mode`, `result`)
+- `eventkit_queue_depth` - Current queue depth gauge (labels: `queue_mode`, `partition`)
+- `eventkit_ringbuffer_written_total` - Events written to ring buffer
+- `eventkit_ringbuffer_published_total` - Events published from ring buffer (label: `result`)
+- `eventkit_ringbuffer_marked_published_total` - Events marked as published
+- `eventkit_ringbuffer_size` - Total ring buffer size gauge
+- `eventkit_ringbuffer_unpublished` - Unpublished events gauge
+
+**Warehouse Loader:**
+- `eventkit_warehouse_loaded_files_total` - Files loaded into the warehouse (label: `result`)
+- `eventkit_warehouse_loaded_rows_total` - Rows loaded into the warehouse
+
+**System:**
+- `eventkit_info` - Version and platform info
+- `eventkit_uptime_seconds` - Process uptime
+- `eventkit_component_healthy` - Component health status, 1 = healthy / 0 = unhealthy (label: `component`)
+
+### Grafana Dashboard
+
+Example queries for building dashboards:
+
+```promql
+# API request rate
+rate(eventkit_api_requests_total[5m])
+
+# Event processing throughput
+rate(eventkit_events_processed_total[5m])
+
+# Storage write rate (bytes/sec)
+rate(eventkit_storage_written_bytes_total[5m])
+
+# Queue depth (monitor backlog)
+eventkit_queue_depth
+
+# Error rate
+sum(rate(eventkit_events_failed_total[5m])) / sum(rate(eventkit_events_received_total[5m]))
+```
+
+### Design Principles
+
+- **Counter-focused** - Prefer counters over gauges for aggregation
+- **Low cardinality labels** - No unbounded label values (user IDs, etc.)
+- **Separate server** - Metrics on dedicated port isolates monitoring traffic +- **Standard naming** - Follow Prometheus conventions: `{namespace}_{verb_noun}_{unit}_{suffix}` + ## Development See [LOCAL_DEV.md](LOCAL_DEV.md) for detailed local development instructions. diff --git a/docker-compose.yml b/docker-compose.yml index 9b92f0b..7bfb04c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,21 +1,7 @@ # Docker Compose for local development and testing -# Runs Firestore, Pub/Sub, and GCS emulators for integration tests +# Runs Pub/Sub and GCS emulators for integration tests services: - firestore-emulator: - image: gcr.io/google.com/cloudsdktool/google-cloud-cli:emulators - command: gcloud emulators firestore start --host-port=0.0.0.0:8080 - ports: - - "8080:8080" - environment: - - FIRESTORE_PROJECT_ID=test-project - healthcheck: - test: ["CMD", "curl", "-f", "http://localhost:8080"] - interval: 5s - timeout: 5s - retries: 10 - start_period: 10s - pubsub-emulator: image: gcr.io/google.com/cloudsdktool/google-cloud-cli:emulators command: gcloud beta emulators pubsub start --host-port=0.0.0.0:8085 diff --git a/pyproject.toml b/pyproject.toml index 3fb7b4a..7d07d70 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -37,6 +37,7 @@ dependencies = [ "structlog>=23.2.0", "tenacity>=8.2.0", "python-dateutil>=2.9.0.post0", + "prometheus-client>=0.20.0", ] [project.optional-dependencies] diff --git a/scripts/e2e_test.sh b/scripts/e2e_test.sh new file mode 100755 index 0000000..9f0188c --- /dev/null +++ b/scripts/e2e_test.sh @@ -0,0 +1,312 @@ +#!/bin/bash +set -e + +# End-to-End Test Script for EventKit +# Tests the full pipeline: HTTP API → Queue → RingBuffer → EventLoader → GCS + +# Colors for output +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +BLUE='\033[0;34m' +NC='\033[0m' # No Color + +# Configuration +API_PORT=8000 +API_URL="http://localhost:${API_PORT}" +GCS_EMULATOR_PORT=9023 +GCS_EMULATOR_URL="http://localhost:${GCS_EMULATOR_PORT}" +TEST_PROJECT="test-project" +TEST_BUCKET="eventkit-events" +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +PROJECT_ROOT="$(dirname "$SCRIPT_DIR")" + +# State tracking +CLEANUP_NEEDED=false +API_PID="" + +# Cleanup function +cleanup() { + echo -e "${YELLOW}Cleaning up...${NC}" + + # Stop API server + if [ -n "$API_PID" ]; then + echo "Stopping API server (PID: $API_PID)..." + kill $API_PID 2>/dev/null || true + wait $API_PID 2>/dev/null || true + fi + + # Stop Docker containers + if [ "$CLEANUP_NEEDED" = true ]; then + echo "Stopping Docker containers..." + cd "$PROJECT_ROOT" + docker compose down -v 2>/dev/null || true + fi + + echo -e "${GREEN}Cleanup complete${NC}" +} + +# Set trap for cleanup +trap cleanup EXIT INT TERM + +# Helper functions +log_info() { + echo -e "${BLUE}[INFO]${NC} $1" +} + +log_success() { + echo -e "${GREEN}[SUCCESS]${NC} $1" +} + +log_error() { + echo -e "${RED}[ERROR]${NC} $1" +} + +log_warning() { + echo -e "${YELLOW}[WARN]${NC} $1" +} + +wait_for_service() { + local url=$1 + local service_name=$2 + local max_attempts=30 + local attempt=1 + + log_info "Waiting for $service_name to be ready..." + while [ $attempt -le $max_attempts ]; do + if curl -s -f "$url" > /dev/null 2>&1; then + log_success "$service_name is ready" + return 0 + fi + echo -n "." 
+ sleep 1 + attempt=$((attempt + 1)) + done + + log_error "$service_name failed to start after $max_attempts seconds" + return 1 +} + +# Main test flow +main() { + log_info "=== EventKit End-to-End Test ===" + echo "" + + # Step 1: Start GCS emulator + log_info "Step 1: Starting GCS emulator..." + cd "$PROJECT_ROOT" + docker compose up -d gcs-emulator + CLEANUP_NEEDED=true + + wait_for_service "$GCS_EMULATOR_URL/storage/v1/b" "GCS emulator" + + # Step 2: Create test bucket + log_info "Step 2: Creating test bucket..." + curl -s -X POST "$GCS_EMULATOR_URL/storage/v1/b?project=$TEST_PROJECT" \ + -H "Content-Type: application/json" \ + -d "{\"name\": \"$TEST_BUCKET\"}" > /dev/null + log_success "Bucket '$TEST_BUCKET' created" + + # Step 3: Install dependencies + log_info "Step 3: Installing EventKit dependencies..." + cd "$PROJECT_ROOT" + uv sync > /dev/null 2>&1 + uv pip install -e . > /dev/null 2>&1 + log_success "Dependencies installed" + + # Step 4: Create data directory for ring buffer + log_info "Step 4: Creating data directory..." + mkdir -p "$PROJECT_ROOT/data" + log_success "Data directory created" + + # Step 5: Start EventKit API server + log_info "Step 5: Starting EventKit API server..." + + # Export environment variables for the API + export GCP_PROJECT_ID="$TEST_PROJECT" + export GCP_GCS_BUCKET="$TEST_BUCKET" + export STORAGE_EMULATOR_HOST="http://localhost:$GCS_EMULATOR_PORT" + export EVENTKIT_WAREHOUSE_ENABLED="false" # Disable BigQuery loader for e2e + export EVENTKIT_QUEUE_MODE="async" # Use async queue for simplicity + export EVENTKIT_EVENTLOADER_BATCH_SIZE="10" # Small batch for faster testing + export EVENTKIT_EVENTLOADER_FLUSH_INTERVAL="5.0" # 5 second flush + export EVENTKIT_LOG_LEVEL="INFO" + + # Start API server in background + uv run uvicorn eventkit.api.app:app --host 0.0.0.0 --port $API_PORT > /tmp/eventkit-e2e-api.log 2>&1 & + API_PID=$! + + wait_for_service "$API_URL/health" "EventKit API" + log_info "API server logs: /tmp/eventkit-e2e-api.log" + + # Step 6: Send test events + log_info "Step 6: Sending test events..." + + # Track event + curl -s -X POST "$API_URL/v1/track" \ + -H "Content-Type: application/json" \ + -d '{ + "type": "track", + "userId": "user_123", + "event": "page_viewed", + "properties": { + "page": "/home", + "title": "Home Page" + } + }' > /dev/null + log_success "Sent track event" + + # Identify event + curl -s -X POST "$API_URL/v1/identify" \ + -H "Content-Type: application/json" \ + -d '{ + "type": "identify", + "userId": "user_123", + "traits": { + "email": "user@example.com", + "name": "Test User" + } + }' > /dev/null + log_success "Sent identify event" + + # Page event + curl -s -X POST "$API_URL/v1/page" \ + -H "Content-Type: application/json" \ + -d '{ + "type": "page", + "userId": "user_123", + "name": "Home", + "properties": { + "url": "https://example.com/home" + } + }' > /dev/null + log_success "Sent page event" + + # Send a few more events to trigger batching + for i in {1..7}; do + curl -s -X POST "$API_URL/v1/track" \ + -H "Content-Type: application/json" \ + -d "{ + \"type\": \"track\", + \"userId\": \"user_$i\", + \"event\": \"test_event_$i\", + \"properties\": { + \"iteration\": $i + } + }" > /dev/null + done + log_success "Sent 7 additional track events (10 total)" + + # Step 7: Wait for events to be processed and flushed + log_info "Step 7: Waiting for events to be processed (10 seconds)..." + sleep 10 + + # Step 8: Verify events in GCS + log_info "Step 8: Verifying events in GCS..." 
+ + # List objects in bucket + OBJECTS=$(curl -s "$GCS_EMULATOR_URL/storage/v1/b/$TEST_BUCKET/o") + + # Check if we have any files + FILE_COUNT=$(echo "$OBJECTS" | grep -o '"name"' | wc -l) + + if [ "$FILE_COUNT" -eq 0 ]; then + log_error "No files found in GCS bucket" + log_info "Bucket contents: $OBJECTS" + log_info "Checking API logs for errors..." + tail -50 /tmp/eventkit-e2e-api.log + exit 1 + fi + + log_success "Found $FILE_COUNT file(s) in GCS bucket" + + # Verify file structure (should be in events/date=YYYY-MM-DD/ format) + echo "$OBJECTS" | grep -q '"name":"events/date=' || { + log_error "Files are not in expected Hive-style partition format (events/date=YYYY-MM-DD/)" + echo "$OBJECTS" + exit 1 + } + log_success "Files are in correct Hive-style partition format" + + # Get first file name + FIRST_FILE=$(echo "$OBJECTS" | grep -o '"name":"[^"]*"' | head -1 | cut -d'"' -f4) + log_info "Sample file: $FIRST_FILE" + + # Download and inspect the Parquet file + log_info "Downloading sample Parquet file..." + TEMP_FILE="/tmp/eventkit-test.parquet" + curl -s "$GCS_EMULATOR_URL/storage/v1/b/$TEST_BUCKET/o/$(echo $FIRST_FILE | sed 's/\//%2F/g')?alt=media" \ + -o "$TEMP_FILE" + + if [ -f "$TEMP_FILE" ]; then + FILE_SIZE=$(stat -f%z "$TEMP_FILE" 2>/dev/null || stat -c%s "$TEMP_FILE" 2>/dev/null) + log_success "Downloaded Parquet file ($FILE_SIZE bytes)" + + # Check if it's a valid Parquet file (starts with PAR1 magic bytes) + if head -c 4 "$TEMP_FILE" | grep -q "PAR1"; then + log_success "File is a valid Parquet file" + else + log_error "File is not a valid Parquet file" + exit 1 + fi + + rm "$TEMP_FILE" + else + log_error "Failed to download Parquet file" + exit 1 + fi + + # Step 9: Verify API health + log_info "Step 9: Checking API health..." + HEALTH_RESPONSE=$(curl -s "$API_URL/health") + + if echo "$HEALTH_RESPONSE" | grep -q '"status":"ok"'; then + log_success "API health check passed" + else + log_error "API health check failed" + echo "$HEALTH_RESPONSE" + exit 1 + fi + + # Step 10: Check metrics endpoint (on port 9090) + log_info "Step 10: Checking metrics..." + METRICS_URL="http://localhost:9090" + METRICS_RESPONSE=$(curl -s "$METRICS_URL/metrics") + + if echo "$METRICS_RESPONSE" | grep -q "eventkit_events_received_total"; then + log_success "Metrics endpoint working (Prometheus format)" + + # Check for key metrics + if echo "$METRICS_RESPONSE" | grep -q "eventkit_api_requests_total"; then + log_success " ✓ API request metrics present" + fi + if echo "$METRICS_RESPONSE" | grep -q "eventkit_events_processed_total"; then + log_success " ✓ Event processing metrics present" + fi + if echo "$METRICS_RESPONSE" | grep -q "eventkit_storage_written_files_total"; then + log_success " ✓ Storage metrics present" + fi + else + log_warning "Metrics endpoint returned unexpected response" + echo "$METRICS_RESPONSE" | head -20 + fi + + # Final summary + echo "" + log_success "=== All E2E tests passed! 
===" + echo "" + echo "Summary:" + echo " ✓ GCS emulator started" + echo " ✓ EventKit API server started" + echo " ✓ 10 events sent via HTTP API" + echo " ✓ Events processed and stored in GCS" + echo " ✓ $FILE_COUNT Parquet file(s) created" + echo " ✓ Files in correct Hive-style partition format" + echo " ✓ Parquet files are valid" + echo " ✓ API health check passed" + echo "" + log_info "Pipeline verified: API → Queue → RingBuffer → EventLoader → GCS ✓" +} + +# Run main function +main "$@" diff --git a/src/eventkit/api/app.py b/src/eventkit/api/app.py index aa7915a..0667a55 100644 --- a/src/eventkit/api/app.py +++ b/src/eventkit/api/app.py @@ -1,16 +1,23 @@ """FastAPI application with lifespan management.""" +import asyncio import logging +import time from collections.abc import AsyncGenerator from contextlib import asynccontextmanager import structlog -from fastapi import FastAPI +from fastapi import FastAPI, Request from eventkit.api.dependencies import get_queue, get_warehouse_loader +from eventkit.api.metrics import ( + eventkit_api_request_duration_seconds, + eventkit_api_requests_total, +) from eventkit.api.router import router from eventkit.config import Settings from eventkit.logging import configure_logging +from eventkit.metrics import init_metrics, start_metrics_server, update_uptime logger = logging.getLogger(__name__) @@ -54,6 +61,24 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: # Get structured logger after configuration app_logger = structlog.get_logger(__name__) + # Start metrics server (if enabled) + uptime_task = None + if settings.EVENTKIT_METRICS_ENABLED: + init_metrics() + _metrics_thread = start_metrics_server(settings.EVENTKIT_METRICS_PORT) + app_logger.info( + "metrics_server_started", + port=settings.EVENTKIT_METRICS_PORT, + ) + + # Background task to update uptime every 10 seconds + async def update_uptime_periodically() -> None: + while True: + update_uptime() + await asyncio.sleep(10) + + uptime_task = asyncio.create_task(update_uptime_periodically()) + # Startup - queue manages ring buffer, publisher, workers, and processor queue = get_queue() await queue.start() @@ -79,6 +104,14 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: # Shutdown - gracefully drain ring buffer and queue app_logger.info("application_shutting_down") + # Cancel uptime update task + if uptime_task: + uptime_task.cancel() + try: + await uptime_task + except asyncio.CancelledError: + pass + # Stop warehouse loader first if warehouse_loader: await warehouse_loader.stop() @@ -114,6 +147,37 @@ def create_app() -> FastAPI: lifespan=lifespan, ) + # Add metrics middleware + @app.middleware("http") + async def metrics_middleware(request: Request, call_next): # type: ignore[no-untyped-def] + """Track HTTP request metrics.""" + start_time = time.time() + + # Get endpoint path (remove query params) + endpoint = request.url.path + method = request.method + + try: + response = await call_next(request) + status = response.status_code + except Exception: + status = 500 + raise + finally: + # Record metrics + duration = time.time() - start_time + eventkit_api_requests_total.labels( + endpoint=endpoint, + method=method, + status=status, + ).inc() + eventkit_api_request_duration_seconds.labels( + endpoint=endpoint, + method=method, + ).observe(duration) + + return response + app.include_router(router) return app diff --git a/src/eventkit/api/dependencies.py b/src/eventkit/api/dependencies.py index fa5244c..b8ae5e7 100644 --- a/src/eventkit/api/dependencies.py +++ 
b/src/eventkit/api/dependencies.py @@ -10,9 +10,8 @@ from eventkit.processing.processor import Processor from eventkit.processing.sequencer import HashSequencer from eventkit.queues import EventQueue, create_queue -from eventkit.stores.error_store import ErrorStore +from eventkit.stores.error_store import ErrorStore, LoggingErrorStore from eventkit.stores.event_store import EventStore -from eventkit.stores.firestore import FirestoreErrorStore, FirestoreEventStore from eventkit.stores.gcs import GCSEventStore @@ -34,30 +33,19 @@ def get_event_store() -> EventStore: """ Get EventStore instance (singleton). - Supports multiple backends via EVENTKIT_EVENT_STORE setting: - - firestore: FirestoreEventStore (default) - - gcs: GCSEventStore + Returns GCSEventStore configured with GCS + BigQuery for scalable event storage. + + To use a different storage backend (e.g., S3, Azure), implement the EventStore + Protocol and update this function. Returns: - EventStore implementation based on configuration + GCSEventStore instance """ settings = get_settings() - - if settings.EVENTKIT_EVENT_STORE == "gcs": - return GCSEventStore( - bucket=settings.GCP_GCS_BUCKET, - project_id=settings.GCP_PROJECT_ID, - ) - elif settings.EVENTKIT_EVENT_STORE == "firestore": - return FirestoreEventStore( - project_id=settings.GCP_PROJECT_ID, - database=settings.FIRESTORE_DATABASE, - ) - else: - raise ValueError( - f"Invalid EVENTKIT_EVENT_STORE: {settings.EVENTKIT_EVENT_STORE}. " - "Must be 'firestore' or 'gcs'." - ) + return GCSEventStore( + bucket=settings.GCP_GCS_BUCKET, + project_id=settings.GCP_PROJECT_ID, + ) @lru_cache @@ -66,8 +54,8 @@ def get_queue() -> EventQueue: Get EventQueue instance (singleton). Creates all dependencies and wires them together: - - EventStore (Firestore) - - ErrorStore (Firestore) + - EventStore (GCS + BigQuery) + - ErrorStore (Logging) - Adapter (SegmentSchemaAdapter) - Sequencer (HashSequencer) - EventLoader (batches events to storage) @@ -88,43 +76,20 @@ async def collect(queue: EventQueue = Depends(get_queue)): """ settings = get_settings() - # Create stores (factory pattern based on config) + # Create stores event_store = get_event_store() - - # ErrorStore (currently only Firestore) - error_store: ErrorStore = FirestoreErrorStore( - project_id=settings.GCP_PROJECT_ID, - database=settings.FIRESTORE_DATABASE, - ) + error_store: ErrorStore = LoggingErrorStore() # Create processing components adapter = SegmentSchemaAdapter() sequencer = HashSequencer(num_partitions=settings.EVENTKIT_NUM_PARTITIONS) - # Adaptive batching: Optimize EventLoader for storage backend - # - GCS: 1000 events / 60 sec (efficient Parquet files) - # - Firestore: 100 events / 5 sec (low latency) + # GCS storage: Optimize EventLoader for efficient Parquet files + # Default: 1000 events / 60 sec # Allow explicit overrides via EVENTKIT_EVENTLOADER_* settings - if settings.EVENTKIT_EVENTLOADER_BATCH_SIZE is not None: - # Explicit override - batch_size = settings.EVENTKIT_EVENTLOADER_BATCH_SIZE - elif settings.EVENTKIT_EVENT_STORE == "gcs": - # GCS defaults - batch_size = 1000 - else: - # Firestore defaults - batch_size = settings.EVENTKIT_BUFFER_SIZE - - if settings.EVENTKIT_EVENTLOADER_FLUSH_INTERVAL is not None: - # Explicit override - flush_interval = settings.EVENTKIT_EVENTLOADER_FLUSH_INTERVAL - elif settings.EVENTKIT_EVENT_STORE == "gcs": - # GCS defaults - flush_interval = 60.0 - else: - # Firestore defaults - flush_interval = settings.EVENTKIT_BUFFER_TIMEOUT + batch_size = settings.EVENTKIT_EVENTLOADER_BATCH_SIZE or 
1000 + flush_interval = settings.EVENTKIT_EVENTLOADER_FLUSH_INTERVAL or 60.0 event_loader = EventLoader( event_store=event_store, @@ -150,10 +115,7 @@ def get_warehouse_loader() -> WarehouseLoader | None: """ Get WarehouseLoader instance (singleton). - Returns BigQueryLoader if: - - EVENTKIT_EVENT_STORE=gcs (GCS storage enabled) - - EVENTKIT_WAREHOUSE_ENABLED=True (loader enabled) - + Returns BigQueryLoader if EVENTKIT_WAREHOUSE_ENABLED=True. Returns None otherwise (no warehouse loading). Returns: @@ -161,10 +123,6 @@ def get_warehouse_loader() -> WarehouseLoader | None: """ settings = get_settings() - # Only create loader for GCS storage - if settings.EVENTKIT_EVENT_STORE != "gcs": - return None - # Check if loader is enabled if not settings.EVENTKIT_WAREHOUSE_ENABLED: return None diff --git a/src/eventkit/api/metrics.py b/src/eventkit/api/metrics.py new file mode 100644 index 0000000..de3bca0 --- /dev/null +++ b/src/eventkit/api/metrics.py @@ -0,0 +1,22 @@ +"""API endpoint metrics.""" + +from prometheus_client import Counter, Histogram + +from eventkit.metrics import registry + +# HTTP request counter +eventkit_api_requests_total = Counter( + "eventkit_api_requests_total", + "Total HTTP requests to EventKit API", + ["endpoint", "method", "status"], + registry=registry, +) + +# Request duration histogram +eventkit_api_request_duration_seconds = Histogram( + "eventkit_api_request_duration_seconds", + "HTTP request duration in seconds", + ["endpoint", "method"], + buckets=[0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0], + registry=registry, +) diff --git a/src/eventkit/config.py b/src/eventkit/config.py index b848d68..ec47ab8 100644 --- a/src/eventkit/config.py +++ b/src/eventkit/config.py @@ -31,7 +31,7 @@ class Settings(BaseSettings): Application settings loaded from environment variables. 
Configuration follows patterns from production event processing systems: - - Firestore for durable event storage + - GCS + BigQuery for scalable event storage and analytics - Configurable buffer sizes for throughput optimization - Partition control for hash-based routing @@ -44,16 +44,10 @@ class Settings(BaseSettings): extra="ignore", ) - # Required: GCP Project for Firestore + # Required: GCP Project GCP_PROJECT_ID: str - # Storage configuration - EVENTKIT_EVENT_STORE: str = "firestore" # Event store backend (firestore, gcs) - - # Firestore configuration - FIRESTORE_DATABASE: str = "default" - - # GCS + BigQuery configuration + # Storage configuration (GCS + BigQuery) GCP_GCS_BUCKET: str = "eventkit-events" # GCS bucket for event storage GCP_BIGQUERY_DATASET: str = "events" # BigQuery dataset name GCP_BIGQUERY_TABLE: str = "raw_events" # BigQuery table name @@ -101,3 +95,7 @@ class Settings(BaseSettings): # Logging EVENTKIT_LOG_LEVEL: str = "INFO" # Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL) EVENTKIT_JSON_LOGS: bool = False # Use JSON formatter (True) or colored console (False) + + # Metrics (Prometheus) + EVENTKIT_METRICS_ENABLED: bool = True # Enable Prometheus metrics + EVENTKIT_METRICS_PORT: int = 9090 # Metrics server port (separate from API) diff --git a/src/eventkit/loaders/metrics.py b/src/eventkit/loaders/metrics.py new file mode 100644 index 0000000..83d24a4 --- /dev/null +++ b/src/eventkit/loaders/metrics.py @@ -0,0 +1,40 @@ +"""Warehouse loader metrics.""" + +from prometheus_client import Counter, Gauge, Histogram + +from eventkit.metrics import registry + +# File discovery and loading +eventkit_warehouse_discovered_files_total = Counter( + "eventkit_warehouse_discovered_files_total", + "Total files discovered for loading", + registry=registry, +) + +eventkit_warehouse_loaded_files_total = Counter( + "eventkit_warehouse_loaded_files_total", + "Total files loaded into warehouse", + ["result"], + registry=registry, +) + +eventkit_warehouse_loaded_rows_total = Counter( + "eventkit_warehouse_loaded_rows_total", + "Total rows loaded into warehouse", + registry=registry, +) + +# Pending files gauge +eventkit_warehouse_pending_files = Gauge( + "eventkit_warehouse_pending_files", + "Number of files pending warehouse load", + registry=registry, +) + +# Load duration histogram +eventkit_warehouse_load_duration_seconds = Histogram( + "eventkit_warehouse_load_duration_seconds", + "Time taken to load a file into warehouse", + buckets=[0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0], + registry=registry, +) diff --git a/src/eventkit/metrics.py b/src/eventkit/metrics.py new file mode 100644 index 0000000..e5114c3 --- /dev/null +++ b/src/eventkit/metrics.py @@ -0,0 +1,143 @@ +""" +Prometheus metrics for EventKit. 
+ +This module provides the core metrics infrastructure, including: +- Metrics registry setup +- Version information +- System health gauges +- Metrics HTTP server + +Following lio's patterns: +- Counter-focused (prefer counters over gauges) +- Low cardinality labels (no account IDs or unbounded values) +- Naming: eventkit_{verb_noun}_{unit}_{suffix} +- Each package has its own metrics.py for package-specific metrics +""" + +import platform +import sys +import time +from threading import Thread + +from fastapi import FastAPI +from prometheus_client import CollectorRegistry, Gauge, Info, make_asgi_app + +# Version information (will be set by app initialization) +VERSION = "0.1.0" + +# Create custom registry (don't use default to avoid conflicts) +registry = CollectorRegistry() + +# System information metric (labels only, value is always 1) +eventkit_info = Info( + "eventkit", + "EventKit version and environment information", + registry=registry, +) + +# Uptime gauge (updated periodically) +eventkit_uptime_seconds = Gauge( + "eventkit_uptime_seconds", + "Time in seconds since the application started", + registry=registry, +) + +# Component health gauges (1 = healthy, 0 = unhealthy) +eventkit_component_healthy = Gauge( + "eventkit_component_healthy", + "Component health status (1 = healthy, 0 = unhealthy)", + ["component"], + registry=registry, +) + +# Application start time (for uptime calculation) +_start_time = time.time() + + +def init_metrics() -> None: + """ + Initialize metrics with system information. + Should be called once at application startup. + """ + # Set version information + python_ver = f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}" + eventkit_info.info( + { + "version": VERSION, + "python_version": python_ver, + "platform": platform.system(), + } + ) + + # Initialize component health (all start healthy) + for component in ["api", "storage", "queue", "ring_buffer", "warehouse_loader"]: + eventkit_component_healthy.labels(component=component).set(1) + + +def update_uptime() -> None: + """Update the uptime gauge with current uptime in seconds.""" + uptime = time.time() - _start_time + eventkit_uptime_seconds.set(uptime) + + +def set_component_health(component: str, healthy: bool) -> None: + """ + Set the health status of a component. + + Args: + component: Component name (api, storage, queue, ring_buffer, warehouse_loader) + healthy: True if healthy, False if unhealthy + """ + eventkit_component_healthy.labels(component=component).set(1 if healthy else 0) + + +def create_metrics_app() -> FastAPI: + """ + Create a separate FastAPI app for the Prometheus metrics endpoint. + + This runs on a separate port (default 9090) to isolate monitoring + traffic from user traffic, following lio's pattern. + + Returns: + FastAPI app with /metrics endpoint + """ + metrics_app = FastAPI(title="EventKit Metrics", docs_url=None, redoc_url=None) + + # Add Prometheus ASGI middleware + metrics_app.mount("/metrics", make_asgi_app(registry=registry)) + + # Health check for the metrics server itself + @metrics_app.get("/health") + async def health() -> dict[str, str]: + return {"status": "ok"} + + return metrics_app + + +def start_metrics_server(port: int) -> Thread: + """ + Start the metrics server in a background thread. 
+ + Args: + port: Port to run the metrics server on + + Returns: + Thread running the metrics server + """ + import uvicorn + + def run() -> None: + metrics_app = create_metrics_app() + config = uvicorn.Config( + metrics_app, + host="0.0.0.0", + port=port, + log_level="warning", # Quiet logs for metrics server + access_log=False, # No access logs + ) + server = uvicorn.Server(config) + server.run() + + thread = Thread(target=run, daemon=True, name="metrics-server") + thread.start() + return thread diff --git a/src/eventkit/processing/metrics.py b/src/eventkit/processing/metrics.py new file mode 100644 index 0000000..89ac5d0 --- /dev/null +++ b/src/eventkit/processing/metrics.py @@ -0,0 +1,70 @@ +"""Event processing metrics.""" + +from prometheus_client import Counter, Gauge, Histogram + +from eventkit.metrics import registry + +# Event counters +eventkit_events_received_total = Counter( + "eventkit_events_received_total", + "Total events received by the API", + ["stream", "event_type"], + registry=registry, +) + +eventkit_events_processed_total = Counter( + "eventkit_events_processed_total", + "Total events successfully processed", + ["stream", "event_type"], + registry=registry, +) + +eventkit_events_failed_total = Counter( + "eventkit_events_failed_total", + "Total events that failed processing", + ["stream", "event_type", "cause"], + registry=registry, +) + +eventkit_events_stored_total = Counter( + "eventkit_events_stored_total", + "Total events successfully stored", + ["stream", "event_type"], + registry=registry, +) + +# Validation errors +eventkit_validation_errors_total = Counter( + "eventkit_validation_errors_total", + "Total validation errors", + ["stream", "validator", "cause"], + registry=registry, +) + +# EventLoader metrics +eventkit_eventloader_batches_flushed_total = Counter( + "eventkit_eventloader_batches_flushed_total", + "Total batches flushed to storage", + ["trigger"], + registry=registry, +) + +eventkit_eventloader_batch_size = Histogram( + "eventkit_eventloader_batch_size", + "Size of batches flushed to storage", + buckets=[1, 5, 10, 25, 50, 100, 250, 500, 1000], + registry=registry, +) + +eventkit_eventloader_flush_duration_seconds = Histogram( + "eventkit_eventloader_flush_duration_seconds", + "Time taken to flush a batch to storage", + buckets=[0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0], + registry=registry, +) + +eventkit_eventloader_current_batch_size = Gauge( + "eventkit_eventloader_current_batch_size", + "Current number of events in the batch", + registry=registry, +) diff --git a/src/eventkit/processing/processor.py b/src/eventkit/processing/processor.py index 54c6e6d..a522c63 100644 --- a/src/eventkit/processing/processor.py +++ b/src/eventkit/processing/processor.py @@ -20,6 +20,11 @@ from eventkit.adapters.base import EventAdapter from eventkit.processing.event_loader import EventLoader +from eventkit.processing.metrics import ( + eventkit_events_failed_total, + eventkit_events_processed_total, + eventkit_events_received_total, +) from eventkit.processing.sequencer import Sequencer from eventkit.schema.raw import RawEvent from eventkit.stores.error_store import ErrorStore @@ -101,6 +106,13 @@ async def process_event(self, raw_event: RawEvent) -> None: Args: raw_event: RawEvent to process """ + # Track event received + event_type_str = str(raw_event.payload.get("type", "unknown")) + eventkit_events_received_total.labels( + stream=raw_event.stream, + event_type=event_type_str, + ).inc() + logger.debug( "event_received", 
event_type=raw_event.payload.get("type"), @@ -112,6 +124,13 @@ async def process_event(self, raw_event: RawEvent) -> None: if not result.ok: # Invalid event → error store + # Track failed event + eventkit_events_failed_total.labels( + stream=raw_event.stream, + event_type=event_type_str, + cause="adaptation_error", + ).inc() + logger.warning( "adaptation_failed", error=result.error or "Unknown error", @@ -129,6 +148,17 @@ async def process_event(self, raw_event: RawEvent) -> None: # Type narrowing: If ok=True, event must be present assert result.event is not None, "Adapter returned ok=True but event is None" + # Track successfully processed event + event_type_value = ( + result.event.event_type.value + if hasattr(result.event.event_type, "value") + else str(result.event.event_type) + ) + eventkit_events_processed_total.labels( + stream=raw_event.stream, + event_type=event_type_value, + ).inc() + logger.debug("event_adapted", event_type=result.event.event_type) # Step 2: Sequence (consistent routing) diff --git a/src/eventkit/queues/async_queue.py b/src/eventkit/queues/async_queue.py index af71d6d..933d3a1 100644 --- a/src/eventkit/queues/async_queue.py +++ b/src/eventkit/queues/async_queue.py @@ -32,6 +32,12 @@ import logging from typing import TYPE_CHECKING +from eventkit.queues.metrics import ( + eventkit_queue_depth, + eventkit_queue_dequeued_total, + eventkit_queue_enqueued_total, + eventkit_queue_processed_total, +) from eventkit.schema.raw import RawEvent if TYPE_CHECKING: @@ -117,6 +123,10 @@ async def enqueue(self, event: RawEvent) -> None: """ # Write to ring buffer (synchronous, durable) self.ring_buffer.write(event) + + # Track enqueue + eventkit_queue_enqueued_total.labels(queue_mode="async").inc() + logger.debug("Event written to ring buffer") async def start(self) -> None: @@ -226,11 +236,26 @@ async def _worker(self, worker_id: int) -> None: # Wait for event with timeout (check stop signal periodically) event = await asyncio.wait_for(self._internal_queue.get(), timeout=0.1) + # Track dequeue + eventkit_queue_dequeued_total.labels(queue_mode="async").inc() + + # Update queue depth gauge + depth = self._internal_queue.qsize() + eventkit_queue_depth.labels(queue_mode="async", partition=str(worker_id)).set(depth) + # Process event try: await self.processor.process_event(event) + # Track successful processing + eventkit_queue_processed_total.labels( + queue_mode="async", result="success" + ).inc() except Exception as e: logger.error(f"Worker {worker_id} error processing event: {e}") + # Track failed processing + eventkit_queue_processed_total.labels( + queue_mode="async", result="failure" + ).inc() finally: # Mark task as done (for queue.join()) self._internal_queue.task_done() diff --git a/src/eventkit/queues/metrics.py b/src/eventkit/queues/metrics.py new file mode 100644 index 0000000..c03ba3c --- /dev/null +++ b/src/eventkit/queues/metrics.py @@ -0,0 +1,35 @@ +"""Queue metrics.""" + +from prometheus_client import Counter, Gauge + +from eventkit.metrics import registry + +# Queue operations +eventkit_queue_enqueued_total = Counter( + "eventkit_queue_enqueued_total", + "Total messages enqueued", + ["queue_mode"], + registry=registry, +) + +eventkit_queue_dequeued_total = Counter( + "eventkit_queue_dequeued_total", + "Total messages dequeued", + ["queue_mode"], + registry=registry, +) + +eventkit_queue_processed_total = Counter( + "eventkit_queue_processed_total", + "Total messages processed by workers", + ["queue_mode", "result"], + registry=registry, +) + +# Queue depth gauge 
+eventkit_queue_depth = Gauge( + "eventkit_queue_depth", + "Current queue depth", + ["queue_mode", "partition"], + registry=registry, +) diff --git a/src/eventkit/queues/pubsub.py b/src/eventkit/queues/pubsub.py index a3ad3f9..8b8899d 100644 --- a/src/eventkit/queues/pubsub.py +++ b/src/eventkit/queues/pubsub.py @@ -23,6 +23,12 @@ from google.cloud.pubsub_v1.subscriber.message import Message from eventkit.config import Settings +from eventkit.queues.metrics import ( + eventkit_queue_depth, + eventkit_queue_dequeued_total, + eventkit_queue_enqueued_total, + eventkit_queue_processed_total, +) from eventkit.schema.raw import RawEvent if TYPE_CHECKING: @@ -128,6 +134,10 @@ async def enqueue(self, event: RawEvent) -> None: """ # Write to ring buffer (synchronous, durable) self.ring_buffer.write(event) + + # Track enqueue (ring buffer write) + eventkit_queue_enqueued_total.labels(queue_mode="pubsub").inc() + logger.debug("Event written to ring buffer") async def start(self) -> None: @@ -157,17 +167,28 @@ async def enqueue(self, event: RawEvent) -> None: """Publish event to Pub/Sub topic.""" data = event.model_dump_json().encode("utf-8") - # Publish to Pub/Sub (blocking operation, run in thread) - future = self.pubsub_queue.publisher_client.publish( - self.pubsub_queue.topic_path, - data=data, - # Add attributes for filtering/routing if needed - event_type=event.payload.get("type", "unknown"), - stream=event.stream or "default", - ) + try: + # Publish to Pub/Sub (blocking operation, run in thread) + future = self.pubsub_queue.publisher_client.publish( + self.pubsub_queue.topic_path, + data=data, + # Add attributes for filtering/routing if needed + event_type=event.payload.get("type", "unknown"), + stream=event.stream or "default", + ) - # Wait for publish confirmation (runs in thread to avoid blocking) - await asyncio.to_thread(future.result) + # Wait for publish confirmation (runs in thread to avoid blocking) + await asyncio.to_thread(future.result) + + # Track successful publish + eventkit_queue_enqueued_total.labels(queue_mode="pubsub_published").inc() + except Exception as e: + # Track failed publish + eventkit_queue_processed_total.labels( + queue_mode="pubsub", result="publish_failure" + ).inc() + logger.error(f"Failed to publish to Pub/Sub: {e}") + raise pubsub_adapter = PubSubAdapter(self) event_loop = asyncio.get_running_loop() @@ -261,13 +282,23 @@ def _pubsub_callback(self, message: Message) -> None: asyncio.run_coroutine_threadsafe( self.internal_queue.put((raw_event, message)), self.loop ) + # Track message received + eventkit_queue_dequeued_total.labels(queue_mode="pubsub").inc() else: logger.error("Event loop not initialized, cannot enqueue message") message.nack() + # Track nack + eventkit_queue_processed_total.labels( + queue_mode="pubsub", result="nack_no_loop" + ).inc() except Exception as e: logger.error(f"Failed to process Pub/Sub message: {e}", exc_info=True) message.nack() # Negative acknowledge: redeliver message + # Track decode failure + nack + eventkit_queue_processed_total.labels( + queue_mode="pubsub", result="nack_decode_error" + ).inc() async def _worker(self, worker_id: int) -> None: """Process events from internal queue, ack/nack Pub/Sub messages.""" @@ -281,6 +312,12 @@ async def _worker(self, worker_id: int) -> None: self.internal_queue.get(), timeout=0.1 ) + # Update queue depth gauge + depth = self.internal_queue.qsize() + eventkit_queue_depth.labels(queue_mode="pubsub", partition=str(worker_id)).set( + depth + ) + try: # Process event through pipeline await 
self.processor.process_event(raw_event) @@ -288,6 +325,11 @@ async def _worker(self, worker_id: int) -> None: # Success: Acknowledge message (won't be redelivered) pubsub_msg.ack() + # Track successful processing + ack + eventkit_queue_processed_total.labels( + queue_mode="pubsub", result="ack_success" + ).inc() + except Exception as e: logger.error( f"Worker {worker_id} failed to process event: {e}", exc_info=True @@ -296,6 +338,11 @@ async def _worker(self, worker_id: int) -> None: # After MAX_DELIVERY_ATTEMPTS, message goes to DLQ pubsub_msg.nack() + # Track failed processing + nack + eventkit_queue_processed_total.labels( + queue_mode="pubsub", result="nack_processing_error" + ).inc() + finally: # Mark task as done (for queue.join()) self.internal_queue.task_done() diff --git a/src/eventkit/ring_buffer/metrics.py b/src/eventkit/ring_buffer/metrics.py new file mode 100644 index 0000000..c3a4a89 --- /dev/null +++ b/src/eventkit/ring_buffer/metrics.py @@ -0,0 +1,46 @@ +"""Ring buffer metrics.""" + +from prometheus_client import Counter, Gauge + +from eventkit.metrics import registry + +# Ring buffer operations +eventkit_ringbuffer_written_total = Counter( + "eventkit_ringbuffer_written_total", + "Total events written to ring buffer", + registry=registry, +) + +eventkit_ringbuffer_published_total = Counter( + "eventkit_ringbuffer_published_total", + "Total events published from ring buffer", + ["result"], + registry=registry, +) + +eventkit_ringbuffer_marked_published_total = Counter( + "eventkit_ringbuffer_marked_published_total", + "Total events marked as published in ring buffer", + registry=registry, +) + +# Ring buffer size gauges +eventkit_ringbuffer_unpublished = Gauge( + "eventkit_ringbuffer_unpublished", + "Number of unpublished events in ring buffer", + registry=registry, +) + +eventkit_ringbuffer_size = Gauge( + "eventkit_ringbuffer_size", + "Total number of events in ring buffer", + registry=registry, +) + +# Cleanup operations +eventkit_ringbuffer_cleanup_total = Counter( + "eventkit_ringbuffer_cleanup_total", + "Total ring buffer cleanup operations", + ["result"], + registry=registry, +) diff --git a/src/eventkit/ring_buffer/publisher.py b/src/eventkit/ring_buffer/publisher.py index dc93e99..141b45e 100644 --- a/src/eventkit/ring_buffer/publisher.py +++ b/src/eventkit/ring_buffer/publisher.py @@ -13,6 +13,7 @@ from eventkit.queues.base import EventQueue from eventkit.ring_buffer.base import RingBuffer +from eventkit.ring_buffer.metrics import eventkit_ringbuffer_published_total from eventkit.schema.raw import RawEvent if TYPE_CHECKING: @@ -148,8 +149,13 @@ def _publish_batch(self) -> int: future.result() published_ids.append(entry.id) + # Track successful publish + eventkit_ringbuffer_published_total.labels(result="success").inc() + except Exception: logger.exception(f"Failed to publish event {entry.id}") + # Track failed publish + eventkit_ringbuffer_published_total.labels(result="failure").inc() # Continue with next event (failed event stays unpublished) continue diff --git a/src/eventkit/ring_buffer/sqlite.py b/src/eventkit/ring_buffer/sqlite.py index 80fca04..4a1f45c 100644 --- a/src/eventkit/ring_buffer/sqlite.py +++ b/src/eventkit/ring_buffer/sqlite.py @@ -9,6 +9,12 @@ from datetime import UTC, datetime, timedelta from eventkit.ring_buffer.base import RingBuffer, RingBufferEntry +from eventkit.ring_buffer.metrics import ( + eventkit_ringbuffer_marked_published_total, + eventkit_ringbuffer_size, + eventkit_ringbuffer_unpublished, + eventkit_ringbuffer_written_total, +) 
from eventkit.schema.raw import RawEvent @@ -86,9 +92,16 @@ def write(self, event: RawEvent) -> int: ) self.conn.commit() + # Track write + eventkit_ringbuffer_written_total.inc() + # lastrowid should never be None for AUTOINCREMENT, but handle type safety if cursor.lastrowid is None: raise RuntimeError("Failed to get inserted row ID") + + # Update size gauge + self._update_size_metrics() + return cursor.lastrowid def fetch_unpublished(self, limit: int) -> list[RingBufferEntry]: @@ -151,6 +164,12 @@ def mark_published(self, ids: list[int]) -> None: ) self.conn.commit() + # Track marked published + eventkit_ringbuffer_marked_published_total.inc(len(ids)) + + # Update size gauge + self._update_size_metrics() + def delete_old_published(self, max_age_hours: int) -> int: """ Delete published events older than max_age_hours. @@ -216,6 +235,18 @@ def count(self) -> int: return 0 return int(result[0]) + def _update_size_metrics(self) -> None: + """Update ring buffer size metrics.""" + # Total size + total = self.count() + eventkit_ringbuffer_size.set(total) + + # Unpublished count + cursor = self.conn.execute("SELECT COUNT(*) FROM ring_buffer WHERE published = FALSE") + result = cursor.fetchone() + unpublished = int(result[0]) if result else 0 + eventkit_ringbuffer_unpublished.set(unpublished) + def close(self) -> None: """Close database connection.""" self.conn.close() diff --git a/src/eventkit/stores/__init__.py b/src/eventkit/stores/__init__.py index 6da8baa..e96ca32 100644 --- a/src/eventkit/stores/__init__.py +++ b/src/eventkit/stores/__init__.py @@ -1,7 +1,7 @@ """Storage interfaces.""" -from eventkit.stores.error_store import ErrorStore +from eventkit.stores.error_store import ErrorStore, LoggingErrorStore from eventkit.stores.event_store import EventStore from eventkit.stores.gcs import GCSEventStore -__all__ = ["EventStore", "ErrorStore", "GCSEventStore"] +__all__ = ["EventStore", "ErrorStore", "LoggingErrorStore", "GCSEventStore"] diff --git a/src/eventkit/stores/error_store.py b/src/eventkit/stores/error_store.py index 9215ce6..59fdce4 100644 --- a/src/eventkit/stores/error_store.py +++ b/src/eventkit/stores/error_store.py @@ -1,8 +1,12 @@ -"""Error storage protocol.""" +"""Error storage protocol and implementations.""" from datetime import datetime from typing import Any, Protocol +import structlog + +logger = structlog.get_logger(__name__) + class ErrorStore(Protocol): """ @@ -32,3 +36,28 @@ async def store_error( StorageError: If error storage fails """ ... + + +class LoggingErrorStore: + """ + Simple error store that logs errors to structlog. + + Suitable for development and reference implementations where + error persistence isn't critical. + """ + + async def store_error( + self, + payload: dict[str, Any], + error: str, + timestamp: datetime, + metadata: dict[str, Any] | None = None, + ) -> None: + """Log error to structlog.""" + logger.error( + "event_processing_error", + error=error, + payload=payload, + timestamp=timestamp.isoformat(), + metadata=metadata or {}, + ) diff --git a/src/eventkit/stores/firestore.py b/src/eventkit/stores/firestore.py deleted file mode 100644 index fad4c99..0000000 --- a/src/eventkit/stores/firestore.py +++ /dev/null @@ -1,404 +0,0 @@ -""" -Firestore implementation of event and error storage. - -Provides concrete implementations of EventStore and ErrorStore protocols -using Google Cloud Firestore, with batching, retry logic, and stream isolation. 
-""" - -import asyncio -import time -from datetime import datetime -from typing import Any - -import structlog -from google.api_core.exceptions import ( - DeadlineExceeded, - InternalServerError, - ServiceUnavailable, -) -from google.cloud import firestore # type: ignore[attr-defined] -from tenacity import ( - retry, - retry_if_exception_type, - stop_after_attempt, - wait_exponential, -) - -from eventkit.schema.events import TypedEvent - -logger = structlog.get_logger(__name__) - - -class FirestoreEventStore: - """ - Firestore-backed event storage with batch operations and retry logic. - - Stores events in subcollections per stream for isolation: - events/{stream}/items/{timestamp_iso}_{uuid} - - Features: - - Async wrapper around sync Firestore client - - Automatic batching for >500 events - - Exponential backoff retries for transient errors - - Stream-based isolation - - Args: - project_id: GCP project ID - database: Firestore database name (default: "default") - - Example: - store = FirestoreEventStore(project_id="my-project") - await store.store(event) - events = await store.read(stream="mobile", limit=100) - """ - - def __init__(self, project_id: str, database: str = "default"): - """Initialize Firestore client.""" - self.db = firestore.Client(project=project_id, database=database) - - def _get_doc_ref(self, event: TypedEvent) -> firestore.DocumentReference: - """ - Get document reference for an event in its stream subcollection. - - Document path: events/{stream}/items/{timestamp_iso}_{event_id} - - Args: - event: The event to get a reference for - - Returns: - Firestore document reference - """ - stream = getattr(event, "stream", None) or "default" - doc_id = f"{event.timestamp.isoformat()}_{event.event_id}" - return self.db.collection("events").document(stream).collection("items").document(doc_id) - - def _event_to_dict(self, event: TypedEvent) -> dict[str, Any]: - """ - Serialize event to Firestore-compatible dict. - - Converts datetime objects to ISO format strings for Firestore storage. - - Args: - event: The event to serialize - - Returns: - Dictionary representation suitable for Firestore - """ - data = event.model_dump() - # Convert datetime to ISO string - if "timestamp" in data and isinstance(data["timestamp"], datetime): - data["timestamp"] = data["timestamp"].isoformat() - return data - - async def store(self, event: TypedEvent) -> None: - """ - Store a single typed event. - - Wraps the synchronous Firestore write in a thread to avoid blocking - the async event loop. - - Args: - event: The event to store - - Raises: - StorageError: If the event cannot be stored after retries - """ - await asyncio.to_thread(self._sync_store, event) - - def _sync_store(self, event: TypedEvent) -> None: - """Synchronous event storage with retry logic.""" - doc_ref = self._get_doc_ref(event) - data = self._event_to_dict(event) - self._write_with_retry(doc_ref, data) - - @retry( - stop=stop_after_attempt(3), - wait=wait_exponential(multiplier=1, min=1, max=10), - retry=retry_if_exception_type((DeadlineExceeded, ServiceUnavailable, InternalServerError)), - ) - def _write_with_retry(self, doc_ref: firestore.DocumentReference, data: dict[str, Any]) -> None: - """Write document with exponential backoff retry.""" - doc_ref.set(data) - - async def store_batch(self, events: list[TypedEvent]) -> None: - """ - Store a batch of typed events. - - Automatically chunks into groups of 500 (Firestore batch limit). - Each chunk is committed atomically, but chunks are NOT atomic across - each other. 
- - Args: - events: List of events to store - - Raises: - StorageError: If any batch cannot be stored after retries - """ - event_count = len(events) - start_time = time.perf_counter() - - logger.info("store_write_start", event_count=event_count) - - try: - await asyncio.to_thread(self._sync_store_batch, events) - - duration_ms = (time.perf_counter() - start_time) * 1000 - logger.info( - "store_write_complete", - event_count=event_count, - duration_ms=round(duration_ms, 2), - collection="events", - ) - - except Exception as e: - duration_ms = (time.perf_counter() - start_time) * 1000 - logger.error( - "store_write_failed", - event_count=event_count, - error=str(e), - error_type=type(e).__name__, - duration_ms=round(duration_ms, 2), - exc_info=True, - ) - raise - - def _sync_store_batch(self, events: list[TypedEvent]) -> None: - """Synchronous batch storage with chunking and retry logic.""" - # Chunk events into groups of 500 - chunks = [events[i : i + 500] for i in range(0, len(events), 500)] - - for chunk in chunks: - batch = self.db.batch() - for event in chunk: - doc_ref = self._get_doc_ref(event) - data = self._event_to_dict(event) - batch.set(doc_ref, data) - - self._write_batch_with_retry(batch) - - @retry( - stop=stop_after_attempt(3), - wait=wait_exponential(multiplier=1, min=1, max=10), - retry=retry_if_exception_type((DeadlineExceeded, ServiceUnavailable, InternalServerError)), - ) - def _write_batch_with_retry(self, batch: firestore.WriteBatch) -> None: - """Commit batch with exponential backoff retry.""" - batch.commit() - - async def read(self, stream: str, limit: int = 100) -> list[TypedEvent]: - """ - Read events from a specific stream. - - Returns events in reverse chronological order (newest first). - - Args: - stream: The stream name to read from - limit: Maximum number of events to return (default: 100) - - Returns: - List of TypedEvent objects from the stream - - Raises: - StorageError: If events cannot be read after retries - """ - return await asyncio.to_thread(self._sync_read, stream, limit) - - def _sync_read(self, stream: str, limit: int) -> list[TypedEvent]: - """Synchronous read with retry logic.""" - docs = self._read_with_retry(stream, limit) - return [self._dict_to_event(doc.to_dict()) for doc in docs] - - @retry( - stop=stop_after_attempt(3), - wait=wait_exponential(multiplier=1, min=1, max=10), - retry=retry_if_exception_type((DeadlineExceeded, ServiceUnavailable, InternalServerError)), - ) - def _read_with_retry(self, stream: str, limit: int) -> Any: - """Query Firestore with exponential backoff retry.""" - return ( - self.db.collection("events") - .document(stream) - .collection("items") - .order_by("timestamp", direction=firestore.Query.DESCENDING) - .limit(limit) - .stream() - ) - - def _dict_to_event(self, data: dict[str, Any]) -> TypedEvent: - """ - Deserialize Firestore dict to TypedEvent. - - Converts ISO timestamp strings back to datetime objects. 
- - Args: - data: Dictionary from Firestore - - Returns: - TypedEvent subclass based on event_type - - Raises: - ValueError: If event_type is not recognized - """ - from eventkit.schema.events import IdentifyEvent, PageEvent, TrackEvent - - # Convert ISO string back to datetime - if "timestamp" in data and isinstance(data["timestamp"], str): - data["timestamp"] = datetime.fromisoformat(data["timestamp"]) - - # Map event_type to concrete class - event_type = data.get("event_type") - if event_type == "identify": - return IdentifyEvent(**data) - elif event_type == "track": - return TrackEvent(**data) - elif event_type == "page": - return PageEvent(**data) - else: - raise ValueError(f"Unknown event_type: {event_type}") - - async def health_check(self) -> bool: - """ - Check Firestore connectivity. - - Attempts a lightweight operation to verify Firestore is reachable. - - Returns: - True if Firestore is healthy, False otherwise - """ - try: - # Try to list collections (lightweight operation) - collections = self.db.collections() - # Consume at most one collection to verify connection - next(collections, None) - return True - except Exception: - return False - - -class FirestoreErrorStore: - """ - Firestore-backed error storage for dead letter queue. - - Stores failed events and their error details in a flat collection: - errors/{timestamp_iso}_{uuid} - - Features: - - Async wrapper around sync Firestore client - - Captures raw payload, error message, timestamp, and metadata - - Simple flat structure for easy querying - - Args: - project_id: GCP project ID - database: Firestore database name (default: "default") - - Example: - store = FirestoreErrorStore(project_id="my-project") - await store.store_error( - payload={"userId": "123"}, - error="Validation failed", - timestamp=datetime.now(UTC), - metadata={"stream": "mobile"} - ) - errors = await store.query_errors(limit=10) - """ - - def __init__(self, project_id: str, database: str = "default"): - """Initialize Firestore client.""" - self.db = firestore.Client(project=project_id, database=database) - - async def store_error( - self, - payload: dict[str, Any], - error: str, - timestamp: datetime, - metadata: dict[str, Any] | None = None, - ) -> None: - """ - Store a failed event with error details. 
- - Args: - payload: The original raw event payload that failed - error: Error message describing why the event failed - timestamp: When the error occurred - metadata: Optional additional context (e.g., stream, source) - - Raises: - StorageError: If the error cannot be stored after retries - """ - stream = metadata.get("stream") if metadata else None - logger.warning( - "error_stored", - stream=stream, - error=error, - ) - - await asyncio.to_thread(self._sync_store_error, payload, error, timestamp, metadata) - - def _sync_store_error( - self, - payload: dict[str, Any], - error: str, - timestamp: datetime, - metadata: dict[str, Any] | None, - ) -> None: - """Synchronous error storage with retry logic.""" - from uuid import uuid4 - - doc_id = f"{timestamp.isoformat()}_{uuid4()}" - doc_data = { - "payload": payload, - "error": error, - "timestamp": timestamp.isoformat(), - "stream": metadata.get("stream") if metadata else None, - "metadata": metadata or {}, - } - - doc_ref = self.db.collection("errors").document(doc_id) - self._write_error_with_retry(doc_ref, doc_data) - - @retry( - stop=stop_after_attempt(3), - wait=wait_exponential(multiplier=1, min=1, max=10), - retry=retry_if_exception_type((DeadlineExceeded, ServiceUnavailable, InternalServerError)), - ) - def _write_error_with_retry( - self, doc_ref: firestore.DocumentReference, data: dict[str, Any] - ) -> None: - """Write error document with exponential backoff retry.""" - doc_ref.set(data) - - async def query_errors(self, limit: int = 100) -> list[dict[str, Any]]: - """ - Query stored errors. - - Returns errors in reverse chronological order (newest first). - - Args: - limit: Maximum number of errors to return (default: 100) - - Returns: - List of error documents with payload, error, timestamp, and metadata - - Raises: - StorageError: If errors cannot be queried after retries - """ - return await asyncio.to_thread(self._sync_query_errors, limit) - - def _sync_query_errors(self, limit: int) -> list[dict[str, Any]]: - """Synchronous error query with retry logic.""" - docs = self._query_errors_with_retry(limit) - return [doc.to_dict() for doc in docs] - - @retry( - stop=stop_after_attempt(3), - wait=wait_exponential(multiplier=1, min=1, max=10), - retry=retry_if_exception_type((DeadlineExceeded, ServiceUnavailable, InternalServerError)), - ) - def _query_errors_with_retry(self, limit: int) -> Any: - """Query Firestore errors collection with exponential backoff retry.""" - return ( - self.db.collection("errors") - .order_by("timestamp", direction=firestore.Query.DESCENDING) - .limit(limit) - .stream() - ) diff --git a/src/eventkit/stores/gcs.py b/src/eventkit/stores/gcs.py index 2f12d5e..8142db1 100644 --- a/src/eventkit/stores/gcs.py +++ b/src/eventkit/stores/gcs.py @@ -6,6 +6,7 @@ """ import asyncio +import time from datetime import UTC, datetime from typing import Any from uuid import uuid4 @@ -19,6 +20,11 @@ from eventkit.schema.events import IdentifyEvent, PageEvent, TrackEvent, TypedEvent from eventkit.stores.event_store import EventStore +from eventkit.stores.metrics import ( + eventkit_storage_operations_total, + eventkit_storage_write_duration_seconds, + eventkit_storage_written_files_total, +) logger = structlog.get_logger(__name__) @@ -88,6 +94,7 @@ async def store_batch(self, events: list[TypedEvent]) -> None: return logger.info("gcs_write_started", event_count=len(events)) + start_time = time.time() # Group events by date for partitioning from collections import defaultdict @@ -106,7 +113,23 @@ async def store_batch(self, 
events: list[TypedEvent]) -> None: path = self._generate_path(date_events[0].timestamp) # Write to GCS (with retries) - await asyncio.to_thread(self._write_parquet, df, path) + try: + await asyncio.to_thread(self._write_parquet, df, path) + + # Track successful write (estimate bytes) + eventkit_storage_written_files_total.labels(storage_backend="gcs").inc() + eventkit_storage_operations_total.labels( + storage_backend="gcs", + operation="write", + result="success", + ).inc() + except Exception: + eventkit_storage_operations_total.labels( + storage_backend="gcs", + operation="write", + result="failure", + ).inc() + raise logger.info( "gcs_write_complete", @@ -114,6 +137,10 @@ async def store_batch(self, events: list[TypedEvent]) -> None: path=path, ) + # Track write duration + duration = time.time() - start_time + eventkit_storage_write_duration_seconds.labels(storage_backend="gcs").observe(duration) + def _event_to_dict(self, event: TypedEvent) -> dict[str, Any]: """ Convert TypedEvent to dict for Parquet serialization. diff --git a/src/eventkit/stores/metrics.py b/src/eventkit/stores/metrics.py new file mode 100644 index 0000000..77f13e7 --- /dev/null +++ b/src/eventkit/stores/metrics.py @@ -0,0 +1,38 @@ +"""Storage metrics.""" + +from prometheus_client import Counter, Histogram + +from eventkit.metrics import registry + +# Storage byte counter +eventkit_storage_written_bytes_total = Counter( + "eventkit_storage_written_bytes_total", + "Total bytes written to storage", + ["storage_backend"], + registry=registry, +) + +# Storage file counter +eventkit_storage_written_files_total = Counter( + "eventkit_storage_written_files_total", + "Total files written to storage", + ["storage_backend"], + registry=registry, +) + +# Storage operations +eventkit_storage_operations_total = Counter( + "eventkit_storage_operations_total", + "Total storage operations", + ["storage_backend", "operation", "result"], + registry=registry, +) + +# Write duration histogram +eventkit_storage_write_duration_seconds = Histogram( + "eventkit_storage_write_duration_seconds", + "Time taken to write to storage", + ["storage_backend"], + buckets=[0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0], + registry=registry, +) diff --git a/tests/integration/test_firestore_integration.py b/tests/integration/test_firestore_integration.py deleted file mode 100644 index 66272ca..0000000 --- a/tests/integration/test_firestore_integration.py +++ /dev/null @@ -1,136 +0,0 @@ -"""Integration tests for Firestore storage using emulator.""" - -import os -from datetime import UTC, datetime - -import pytest -import pytest_asyncio - -from eventkit.schema.events import TrackEvent -from eventkit.stores.firestore import FirestoreErrorStore, FirestoreEventStore - - -@pytest.fixture -def check_emulator(): - """Ensure Firestore emulator is running.""" - emulator_host = os.getenv("FIRESTORE_EMULATOR_HOST") - if not emulator_host: - pytest.skip("FIRESTORE_EMULATOR_HOST not set. 
Start with: docker compose up") - - -@pytest_asyncio.fixture -async def event_store(check_emulator): - """Fixture for FirestoreEventStore connected to emulator.""" - return FirestoreEventStore(project_id="test-project") - - -@pytest_asyncio.fixture -async def error_store(check_emulator): - """Fixture for FirestoreErrorStore connected to emulator.""" - return FirestoreErrorStore(project_id="test-project") - - -@pytest.mark.asyncio -@pytest.mark.integration -async def test_write_and_read_events(event_store): - """Test end-to-end: write 100 events, read them back.""" - stream = "mobile-integration-test" - - # Create 100 track events - events = [ - TrackEvent( - user_id=f"user{i}", - event_name=f"test_event_{i}", - properties={"index": i}, - stream=stream, - ) - for i in range(100) - ] - - # Write batch - await event_store.store_batch(events) - - # Read back - retrieved_events = await event_store.read(stream=stream, limit=100) - - # Verify - assert len(retrieved_events) == 100 - - # Should be in reverse chronological order (newest first) - # Since they were created in the same batch, order may vary - # but all should be present - retrieved_user_ids = {e.user_id for e in retrieved_events} - expected_user_ids = {f"user{i}" for i in range(100)} - assert retrieved_user_ids == expected_user_ids - - -@pytest.mark.asyncio -@pytest.mark.integration -async def test_stream_isolation(event_store): - """Test that streams are isolated (mobile vs web).""" - mobile_stream = "mobile-isolation-test" - web_stream = "web-isolation-test" - - # Write 50 events to mobile stream - mobile_events = [ - TrackEvent( - user_id=f"mobile_user{i}", - event_name="mobile_event", - stream=mobile_stream, - ) - for i in range(50) - ] - await event_store.store_batch(mobile_events) - - # Write 30 events to web stream - web_events = [ - TrackEvent( - user_id=f"web_user{i}", - event_name="web_event", - stream=web_stream, - ) - for i in range(30) - ] - await event_store.store_batch(web_events) - - # Query mobile stream - should only get mobile events - mobile_retrieved = await event_store.read(stream=mobile_stream, limit=100) - assert len(mobile_retrieved) == 50 - assert all("mobile_user" in e.user_id for e in mobile_retrieved) - - # Query web stream - should only get web events - web_retrieved = await event_store.read(stream=web_stream, limit=100) - assert len(web_retrieved) == 30 - assert all("web_user" in e.user_id for e in web_retrieved) - - -@pytest.mark.asyncio -@pytest.mark.integration -async def test_error_store_persistence(error_store): - """Test error storage and retrieval.""" - # Store 5 failed events with different errors - for i in range(5): - payload = {"userId": f"user{i}", "type": "track"} - error_msg = f"Validation error {i}: missing required field" - timestamp = datetime.now(UTC) - metadata = {"stream": f"stream{i}"} - - await error_store.store_error(payload, error_msg, timestamp, metadata) - - # Query errors - errors = await error_store.query_errors(limit=10) - - # Verify - assert len(errors) >= 5 # May have errors from other tests - - # Find our errors - our_errors = [e for e in errors if "Validation error" in e["error"]] - assert len(our_errors) == 5 - - # Verify structure - for error in our_errors: - assert "payload" in error - assert "error" in error - assert "timestamp" in error - assert "stream" in error - assert error["payload"]["type"] == "track" diff --git a/tests/integration/test_ring_buffer_integration.py b/tests/integration/test_ring_buffer_integration.py deleted file mode 100644 index 2f6a03a..0000000 
--- a/tests/integration/test_ring_buffer_integration.py +++ /dev/null @@ -1,531 +0,0 @@ -"""Integration tests for ring buffer → queue → storage flow.""" - -import asyncio -import os -import tempfile -import time -from datetime import UTC, datetime, timedelta - -import pytest -import pytest_asyncio - -from eventkit.adapters.segment import SegmentSchemaAdapter -from eventkit.processing.event_loader import EventLoader -from eventkit.processing.processor import Processor -from eventkit.processing.sequencer import HashSequencer -from eventkit.queues.async_queue import AsyncQueue -from eventkit.ring_buffer.cleanup import RingBufferCleanup -from eventkit.ring_buffer.sqlite import SQLiteRingBuffer -from eventkit.schema.raw import RawEvent -from eventkit.stores.firestore import FirestoreErrorStore, FirestoreEventStore - - -@pytest.fixture -def check_emulator(): - """Ensure Firestore emulator is running.""" - emulator_host = os.getenv("FIRESTORE_EMULATOR_HOST") - if not emulator_host: - pytest.skip("FIRESTORE_EMULATOR_HOST not set. Start with: docker compose up") - - -@pytest.fixture -def temp_db_path(): - """Fixture for a temporary database file.""" - with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as tmp: - yield tmp.name - os.remove(tmp.name) - - -@pytest_asyncio.fixture -async def ring_buffer(temp_db_path): - """Fixture for SQLiteRingBuffer.""" - rb = SQLiteRingBuffer( - db_path=temp_db_path, - max_size=1000, - retention_hours=24, - ) - yield rb - rb.close() - - -@pytest_asyncio.fixture -async def event_store(check_emulator): - """Fixture for FirestoreEventStore connected to emulator.""" - return FirestoreEventStore(project_id="test-project") - - -@pytest_asyncio.fixture -async def error_store(check_emulator): - """Fixture for FirestoreErrorStore connected to emulator.""" - return FirestoreErrorStore(project_id="test-project") - - -@pytest_asyncio.fixture -async def processor(event_store, error_store): - """Fixture for fully configured Processor.""" - adapter = SegmentSchemaAdapter() - sequencer = HashSequencer(num_partitions=4) - event_loader = EventLoader( - event_store=event_store, batch_size=10, max_batch_size=100, flush_interval=1.0 - ) - - processor = Processor( - adapter=adapter, - sequencer=sequencer, - event_loader=event_loader, - error_store=error_store, - ) - - return processor - - -@pytest_asyncio.fixture -async def queue_with_ring_buffer(ring_buffer, processor): - """Fixture for AsyncQueue with ring buffer.""" - queue = AsyncQueue( - processor=processor, - ring_buffer=ring_buffer, - num_workers=2, - publisher_batch_size=10, - publisher_poll_interval=0.05, # Fast polling for tests - cleanup_interval=0.1, # Fast cleanup for tests - ) - - await queue.start() - yield queue - await queue.stop() - - -@pytest.mark.asyncio -@pytest.mark.integration -async def test_end_to_end_ring_buffer_to_firestore( - queue_with_ring_buffer, ring_buffer, event_store -): - """ - Test complete flow: API → ring buffer → publisher → queue → processor → Firestore. 
- - This validates: - - Events are written to ring buffer - - Publisher moves events from ring buffer to internal queue - - Workers process events - - Events are stored in Firestore - """ - stream = "ring-buffer-test" - num_events = 20 - - # Create events - events = [ - RawEvent( - payload={ - "type": "track", - "event": f"test_event_{i}", - "userId": f"user_{i}", - "properties": {"index": i}, - }, - stream=stream, - ) - for i in range(num_events) - ] - - # Enqueue events (writes to ring buffer) - for event in events: - await queue_with_ring_buffer.enqueue(event) - - # Verify events are in ring buffer - assert ring_buffer.count() >= num_events - - # Wait for publisher to move events to queue and workers to process - await asyncio.sleep(2.0) - - # Verify all events were marked as published - unpublished = ring_buffer.fetch_unpublished(limit=100) - assert len(unpublished) == 0, f"Expected 0 unpublished, found {len(unpublished)}" - - # Verify events reached Firestore - # Give buffer time to flush - await asyncio.sleep(2.0) - - stored_events = await event_store.read(stream=stream, limit=num_events) - assert len(stored_events) == num_events, ( - f"Expected {num_events} events in Firestore, found {len(stored_events)}" - ) - - -@pytest.mark.asyncio -@pytest.mark.integration -@pytest.mark.skip(reason="Flaky: race condition in shutdown timing - needs better synchronization") -async def test_graceful_shutdown_drains_ring_buffer(ring_buffer, processor, event_store): - """ - Test that stopping the queue drains all events from ring buffer. - - This validates: - - Events in ring buffer when stop() is called are processed - - No events are lost during shutdown - - TODO: Fix race condition where publisher may not process all events before stop completes. - """ - stream = "shutdown-test" - num_events = 15 - - # Create queue - queue = AsyncQueue( - processor=processor, - ring_buffer=ring_buffer, - num_workers=1, - publisher_batch_size=5, - publisher_poll_interval=0.1, - cleanup_interval=0.1, - ) - - await queue.start() - - # Enqueue events quickly - events = [ - RawEvent( - payload={ - "type": "track", - "event": f"shutdown_event_{i}", - "userId": f"user_{i}", - }, - stream=stream, - ) - for i in range(num_events) - ] - - for event in events: - await queue.enqueue(event) - - # Verify events are in ring buffer - assert ring_buffer.count() == num_events - - # Give publisher a moment to start moving events - await asyncio.sleep(0.3) - - # Stop queue (should drain ring buffer) - await queue.stop() - - # Verify ring buffer was drained - unpublished = ring_buffer.fetch_unpublished(limit=100) - assert len(unpublished) == 0, "Ring buffer should be drained after stop()" - - # Verify all events reached Firestore - await asyncio.sleep(2.0) # Give buffer time to flush - stored_events = await event_store.read(stream=stream, limit=num_events) - assert len(stored_events) == num_events, ( - f"Expected {num_events} events after drain, found {len(stored_events)}" - ) - - -@pytest.mark.asyncio -@pytest.mark.integration -async def test_ring_buffer_cleanup_logic(temp_db_path): - """ - Test cleanup logic directly (not background worker). - - This validates: - - Time-based cleanup removes published events older than retention_hours - - Size-based cleanup removes oldest published events when max_size exceeded - - Unpublished events are never deleted - - Note: This tests the cleanup methods directly for determinism. - Background worker lifecycle is tested separately. 
- """ - # Create ring buffer with short retention - ring_buffer = SQLiteRingBuffer( - db_path=temp_db_path, - max_size=10, # Small max size - retention_hours=1, # 1 hour retention - ) - - try: - # Write and publish 15 events (exceeds max_size of 10) - for i in range(15): - event = RawEvent( - payload={"type": "track", "event": f"event_{i}"}, - stream="cleanup-test", - ) - event_id = ring_buffer.write(event) - ring_buffer.mark_published([event_id]) - - # Manually backdate 5 events to be older than retention - conn = ring_buffer.conn - old_timestamp = (datetime.now(UTC) - timedelta(hours=2)).isoformat() - conn.execute( - """ - UPDATE ring_buffer - SET published_at = ? - WHERE id <= 5 - """, - (old_timestamp,), - ) - conn.commit() - - assert ring_buffer.count() == 15 - - # Run cleanup directly (deterministic, no timing issues) - # Time-based cleanup: delete 5 old events - deleted_by_time = ring_buffer.delete_old_published(max_age_hours=1) - assert deleted_by_time == 5 - - # Size-based cleanup: keep only 10 newest - deleted_by_size = ring_buffer.delete_oldest_published(keep_count=10) - # Should have 10 remaining after time cleanup, no more deletes needed - assert deleted_by_size == 0 - - # Verify only 10 events remain - remaining_count = ring_buffer.count() - assert remaining_count == 10, f"Expected 10 events, found {remaining_count}" - - finally: - ring_buffer.close() - - -@pytest.mark.asyncio -@pytest.mark.integration -async def test_cleanup_worker_lifecycle(temp_db_path): - """ - Test that cleanup worker starts and stops cleanly. - - This validates: - - Worker can start and enters running state - - Worker can stop gracefully - - Worker thread terminates properly - """ - ring_buffer = SQLiteRingBuffer( - db_path=temp_db_path, - max_size=1000, - retention_hours=24, - ) - - cleanup = RingBufferCleanup( - ring_buffer=ring_buffer, - cleanup_interval=0.1, # Fast for testing - ) - - try: - # Test start - cleanup.start() - assert cleanup._running - assert cleanup._thread is not None - assert cleanup._thread.is_alive() - - # Let it run for a few cycles - time.sleep(0.3) - - # Test stop - cleanup.stop(timeout=2.0) - assert not cleanup._running - assert not cleanup._thread.is_alive() - - finally: - ring_buffer.close() - - -@pytest.mark.asyncio -@pytest.mark.integration -async def test_publisher_worker_lifecycle(temp_db_path, processor): - """ - Test that publisher worker starts and stops cleanly. - - This validates: - - Worker can start and enters running state - - Worker can stop gracefully - - Worker thread terminates properly - """ - ring_buffer = SQLiteRingBuffer( - db_path=temp_db_path, - max_size=1000, - retention_hours=24, - ) - - # Create queue with ring buffer and publisher - queue = AsyncQueue( - processor=processor, - ring_buffer=ring_buffer, - num_workers=1, - publisher_batch_size=10, - publisher_poll_interval=0.1, - cleanup_interval=0.1, - ) - - try: - # Test start (starts publisher and cleanup workers) - await queue.start() - assert queue._publisher is not None - assert queue._publisher._running - assert queue._publisher._thread.is_alive() - - # Let it run briefly - await asyncio.sleep(0.3) - - # Test stop (stops publisher gracefully) - await queue.stop() - assert not queue._publisher._running - assert not queue._publisher._thread.is_alive() - - finally: - ring_buffer.close() - - -@pytest.mark.asyncio -@pytest.mark.integration -async def test_ring_buffer_durability_on_crash(temp_db_path, event_store, error_store): - """ - Test that events in ring buffer survive a "crash" (restart). 
- - This validates: - - Events written to ring buffer are durable (SQLite WAL) - - Restarting with same ring buffer recovers unpublished events - - Publisher can resume from where it left off - """ - stream = "crash-test" - num_events = 10 - - # Phase 1: Write events to ring buffer - ring_buffer_1 = SQLiteRingBuffer( - db_path=temp_db_path, - max_size=1000, - retention_hours=24, - ) - - # Create first processor for first queue - adapter_1 = SegmentSchemaAdapter() - sequencer_1 = HashSequencer(num_partitions=4) - event_loader_1 = EventLoader( - event_store=event_store, batch_size=10, max_batch_size=100, flush_interval=1.0 - ) - processor_1 = Processor( - adapter=adapter_1, - sequencer=sequencer_1, - event_loader=event_loader_1, - error_store=error_store, - ) - - queue_1 = AsyncQueue( - processor=processor_1, - ring_buffer=ring_buffer_1, - num_workers=1, - publisher_batch_size=3, # Small batch to leave some unpublished - publisher_poll_interval=1.0, # Slow polling to keep events in ring buffer - ) - - await queue_1.start() - - # Enqueue events - events = [ - RawEvent( - payload={"type": "track", "event": f"crash_event_{i}"}, - stream=stream, - ) - for i in range(num_events) - ] - - for event in events: - await queue_1.enqueue(event) - - # Wait briefly but stop before all events are published - # Publisher polls every 1s, so stopping after 0.1s ensures events remain unpublished - await asyncio.sleep(0.1) - - # Simulate crash (abrupt stop) - await queue_1.stop() - ring_buffer_1.close() - - # Phase 2: Restart with same ring buffer - ring_buffer_2 = SQLiteRingBuffer( - db_path=temp_db_path, - max_size=1000, - retention_hours=24, - ) - - # Verify some events are still unpublished - unpublished_before = ring_buffer_2.fetch_unpublished(limit=100) - assert len(unpublished_before) > 0, "Should have unpublished events after crash" - - # Create second processor for second queue - adapter_2 = SegmentSchemaAdapter() - sequencer_2 = HashSequencer(num_partitions=4) - event_loader_2 = EventLoader( - event_store=event_store, batch_size=10, max_batch_size=100, flush_interval=1.0 - ) - processor_2 = Processor( - adapter=adapter_2, - sequencer=sequencer_2, - event_loader=event_loader_2, - error_store=error_store, - ) - - # Create new queue with recovered ring buffer - queue_2 = AsyncQueue( - processor=processor_2, - ring_buffer=ring_buffer_2, - num_workers=1, - publisher_batch_size=5, - publisher_poll_interval=0.05, - cleanup_interval=0.1, - ) - - await queue_2.start() - - # Wait for publisher to process recovered events - await asyncio.sleep(2.0) - - # Verify all events were eventually published - unpublished_after = ring_buffer_2.fetch_unpublished(limit=100) - assert len(unpublished_after) == 0, "All recovered events should be published" - - # Verify events reached Firestore - await asyncio.sleep(1.0) - stored_events = await event_store.read(stream=stream, limit=num_events) - assert len(stored_events) == num_events, ( - f"Expected {num_events} events after recovery, found {len(stored_events)}" - ) - - await queue_2.stop() - ring_buffer_2.close() - - -@pytest.mark.asyncio -@pytest.mark.integration -async def test_high_throughput_ring_buffer(queue_with_ring_buffer, ring_buffer, event_store): - """ - Test ring buffer handles high throughput without blocking. 
- - This validates: - - Ring buffer write is fast (doesn't block API) - - Publisher keeps up with high event rate - - No events are lost under load - """ - stream = "throughput-test" - num_events = 100 - - # Measure enqueue time - start = time.time() - - events = [ - RawEvent( - payload={"type": "track", "event": f"throughput_{i}"}, - stream=stream, - ) - for i in range(num_events) - ] - - for event in events: - await queue_with_ring_buffer.enqueue(event) - - enqueue_duration = time.time() - start - - # Enqueue should be fast (< 1 second for 100 events) - assert enqueue_duration < 1.0, f"Enqueue took {enqueue_duration}s, should be < 1s" - - # Wait for processing - await asyncio.sleep(3.0) - - # Verify all events were published - unpublished = ring_buffer.fetch_unpublished(limit=200) - assert len(unpublished) == 0 - - # Verify events reached Firestore - await asyncio.sleep(1.0) - stored_events = await event_store.read(stream=stream, limit=num_events) - assert len(stored_events) == num_events diff --git a/tests/unit/api/test_dependencies.py b/tests/unit/api/test_dependencies.py index b19ab5e..e0d5d94 100644 --- a/tests/unit/api/test_dependencies.py +++ b/tests/unit/api/test_dependencies.py @@ -2,40 +2,21 @@ from unittest.mock import patch -import pytest - from eventkit.api.dependencies import ( get_event_store, get_settings, get_warehouse_loader, ) from eventkit.loaders.bigquery_loader import BigQueryLoader -from eventkit.stores.firestore import FirestoreEventStore from eventkit.stores.gcs import GCSEventStore class TestGetEventStore: """Test EventStore factory.""" - def test_firestore_mode(self, monkeypatch): - """Test Firestore mode returns FirestoreEventStore.""" - monkeypatch.setenv("GCP_PROJECT_ID", "test-project") - monkeypatch.setenv("EVENTKIT_EVENT_STORE", "firestore") - - # Clear lru_cache - get_settings.cache_clear() - get_event_store.cache_clear() - - # Mock firestore.Client to avoid authentication - with patch("eventkit.stores.firestore.firestore.Client"): - event_store = get_event_store() - - assert isinstance(event_store, FirestoreEventStore) - - def test_gcs_mode(self, monkeypatch): - """Test GCS mode returns GCSEventStore.""" + def test_returns_gcs_event_store(self, monkeypatch): + """Test get_event_store returns GCSEventStore.""" monkeypatch.setenv("GCP_PROJECT_ID", "test-project") - monkeypatch.setenv("EVENTKIT_EVENT_STORE", "gcs") monkeypatch.setenv("GCP_GCS_BUCKET", "test-bucket") # Clear lru_cache @@ -49,41 +30,13 @@ def test_gcs_mode(self, monkeypatch): assert isinstance(event_store, GCSEventStore) assert event_store.bucket == "test-bucket" - def test_invalid_mode_raises_error(self, monkeypatch): - """Test invalid mode raises ValueError.""" - monkeypatch.setenv("GCP_PROJECT_ID", "test-project") - monkeypatch.setenv("EVENTKIT_EVENT_STORE", "invalid") - - # Clear lru_cache - get_settings.cache_clear() - get_event_store.cache_clear() - - with pytest.raises(ValueError, match="Invalid EVENTKIT_EVENT_STORE"): - get_event_store() - - def test_default_is_firestore(self, monkeypatch): - """Test default storage backend is Firestore.""" - monkeypatch.setenv("GCP_PROJECT_ID", "test-project") - # Don't set EVENTKIT_EVENT_STORE (should default to firestore) - - # Clear lru_cache - get_settings.cache_clear() - get_event_store.cache_clear() - - # Mock firestore.Client to avoid authentication - with patch("eventkit.stores.firestore.firestore.Client"): - event_store = get_event_store() - - assert isinstance(event_store, FirestoreEventStore) - class TestGetWarehouseLoader: """Test 
WarehouseLoader factory.""" - def test_gcs_mode_enabled_returns_loader(self, monkeypatch): - """Test GCS mode with loader enabled returns BigQueryLoader.""" + def test_warehouse_enabled_returns_loader(self, monkeypatch): + """Test warehouse loader enabled returns BigQueryLoader.""" monkeypatch.setenv("GCP_PROJECT_ID", "test-project") - monkeypatch.setenv("EVENTKIT_EVENT_STORE", "gcs") monkeypatch.setenv("GCP_GCS_BUCKET", "test-bucket") monkeypatch.setenv("EVENTKIT_WAREHOUSE_ENABLED", "true") @@ -102,10 +55,9 @@ def test_gcs_mode_enabled_returns_loader(self, monkeypatch): assert isinstance(loader, BigQueryLoader) assert loader.bucket == "test-bucket" - def test_gcs_mode_disabled_returns_none(self, monkeypatch): - """Test GCS mode with loader disabled returns None.""" + def test_warehouse_disabled_returns_none(self, monkeypatch): + """Test warehouse loader disabled returns None.""" monkeypatch.setenv("GCP_PROJECT_ID", "test-project") - monkeypatch.setenv("EVENTKIT_EVENT_STORE", "gcs") monkeypatch.setenv("EVENTKIT_WAREHOUSE_ENABLED", "false") # Clear lru_cache @@ -115,16 +67,3 @@ def test_gcs_mode_disabled_returns_none(self, monkeypatch): loader = get_warehouse_loader() assert loader is None - - def test_firestore_mode_returns_none(self, monkeypatch): - """Test Firestore mode returns None (no loader).""" - monkeypatch.setenv("GCP_PROJECT_ID", "test-project") - monkeypatch.setenv("EVENTKIT_EVENT_STORE", "firestore") - - # Clear lru_cache - get_settings.cache_clear() - get_warehouse_loader.cache_clear() - - loader = get_warehouse_loader() - - assert loader is None diff --git a/tests/unit/queues/test_pubsub.py b/tests/unit/queues/test_pubsub.py index 66066b2..2f7d2b2 100644 --- a/tests/unit/queues/test_pubsub.py +++ b/tests/unit/queues/test_pubsub.py @@ -160,7 +160,7 @@ def test_nacks_on_decode_error( ): """Test callback nacks message on decode error.""" queue = PubSubQueue(mock_processor, mock_ring_buffer, settings) - queue.loop = asyncio.get_event_loop() + queue.loop = asyncio.new_event_loop() # Create invalid message message = MagicMock() diff --git a/tests/unit/ring_buffer/test_publisher.py b/tests/unit/ring_buffer/test_publisher.py index 49c3c2b..b0b4e67 100644 --- a/tests/unit/ring_buffer/test_publisher.py +++ b/tests/unit/ring_buffer/test_publisher.py @@ -3,7 +3,7 @@ import asyncio import time from datetime import UTC, datetime -from unittest.mock import AsyncMock, MagicMock, Mock +from unittest.mock import MagicMock, Mock import pytest @@ -25,7 +25,12 @@ def mock_ring_buffer(): def mock_queue(): """Create mock queue.""" queue = Mock() - queue.enqueue = AsyncMock() # Use AsyncMock for async methods + + # Create async function that can be tracked + async def mock_enqueue(event): + pass + + queue.enqueue = Mock(side_effect=mock_enqueue) return queue diff --git a/tests/unit/stores/test_firestore.py b/tests/unit/stores/test_firestore.py deleted file mode 100644 index f918488..0000000 --- a/tests/unit/stores/test_firestore.py +++ /dev/null @@ -1,520 +0,0 @@ -"""Unit tests for Firestore storage implementation.""" - -from datetime import UTC, datetime -from unittest.mock import Mock, patch - -import pytest -from google.cloud import firestore - -from eventkit.schema.events import TrackEvent -from eventkit.stores.firestore import FirestoreErrorStore, FirestoreEventStore - - -def _mock_firestore_doc_ref(mock_client_class, mock_doc_ref): - """Helper to set up deeply nested mock chain for Firestore document refs.""" - ( - 
mock_client_class.return_value.collection.return_value.document.return_value.collection.return_value.document.return_value - ) = mock_doc_ref - - -class TestFirestoreEventStoreSetup: - """Tests for FirestoreEventStore initialization and helper methods.""" - - @patch("eventkit.stores.firestore.firestore.Client") - def test_init_creates_client(self, mock_client_class): - """FirestoreEventStore should initialize Firestore client.""" - store = FirestoreEventStore(project_id="test-project") - - mock_client_class.assert_called_once_with(project="test-project", database="default") - assert store.db == mock_client_class.return_value - - @patch("eventkit.stores.firestore.firestore.Client") - def test_init_with_custom_database(self, mock_client_class): - """FirestoreEventStore should accept custom database name.""" - FirestoreEventStore(project_id="test-project", database="custom-db") - - mock_client_class.assert_called_once_with(project="test-project", database="custom-db") - - -class TestFirestoreHelperMethods: - """Tests for serialization and path construction helpers.""" - - @patch("eventkit.stores.firestore.firestore.Client") - def test_get_doc_ref_constructs_correct_path(self, mock_client_class): - """_get_doc_ref should construct events/{stream}/items/{id} path.""" - store = FirestoreEventStore(project_id="test") - event = TrackEvent( - user_id="user123", - event_name="test_event", - event_id="event-123", - timestamp=datetime(2026, 1, 10, 12, 0, 0, tzinfo=UTC), - ) - # Add stream attribute - event.stream = "mobile" - - mock_db = mock_client_class.return_value - mock_collection = Mock() - mock_stream_doc = Mock() - mock_items_collection = Mock() - mock_doc = Mock() - - mock_db.collection.return_value = mock_collection - mock_collection.document.return_value = mock_stream_doc - mock_stream_doc.collection.return_value = mock_items_collection - mock_items_collection.document.return_value = mock_doc - - result = store._get_doc_ref(event) - - # Verify correct path construction - mock_db.collection.assert_called_once_with("events") - mock_collection.document.assert_called_once_with("mobile") - mock_stream_doc.collection.assert_called_once_with("items") - expected_doc_id = "2026-01-10T12:00:00+00:00_event-123" - mock_items_collection.document.assert_called_once_with(expected_doc_id) - assert result == mock_doc - - @patch("eventkit.stores.firestore.firestore.Client") - def test_get_doc_ref_uses_default_stream(self, mock_client_class): - """_get_doc_ref should use 'default' stream if none specified.""" - store = FirestoreEventStore(project_id="test") - event = TrackEvent( - user_id="user123", - event_name="test_event", - ) - # No stream attribute - - mock_db = mock_client_class.return_value - mock_collection = Mock() - mock_stream_doc = Mock() - - mock_db.collection.return_value = mock_collection - mock_collection.document.return_value = mock_stream_doc - mock_stream_doc.collection.return_value = Mock() - - store._get_doc_ref(event) - - # Should use "default" stream - mock_collection.document.assert_called_once_with("default") - - @patch("eventkit.stores.firestore.firestore.Client") - def test_event_to_dict_serializes_correctly(self, mock_client_class): - """_event_to_dict should convert event to Firestore-compatible dict.""" - store = FirestoreEventStore(project_id="test") - event = TrackEvent( - user_id="user123", - event_name="Button Clicked", - properties={"button_id": "submit"}, - timestamp=datetime(2026, 1, 10, 12, 0, 0, tzinfo=UTC), - ) - - result = store._event_to_dict(event) - - # Check 
timestamp is ISO string - assert isinstance(result["timestamp"], str) - assert result["timestamp"] == "2026-01-10T12:00:00+00:00" - - # Check other fields preserved - assert result["user_id"] == "user123" - assert result["event_name"] == "Button Clicked" - assert result["event_type"] == "track" - assert result["properties"] == {"button_id": "submit"} - - @patch("eventkit.stores.firestore.firestore.Client") - def test_event_to_dict_handles_none_user_id(self, mock_client_class): - """_event_to_dict should handle events with no user_id.""" - store = FirestoreEventStore(project_id="test") - event = TrackEvent( - event_name="Anonymous Event", - ) - - result = store._event_to_dict(event) - - assert result["user_id"] is None - assert result["event_name"] == "Anonymous Event" - - -class TestFirestoreEventStoreWrite: - """Tests for write operations (store, store_batch).""" - - @pytest.mark.asyncio - @patch("eventkit.stores.firestore.firestore.Client") - async def test_store_writes_single_event(self, mock_client_class): - """store() should write event to correct path.""" - store = FirestoreEventStore(project_id="test") - event = TrackEvent( - user_id="user123", - event_name="test_event", - stream="mobile", - ) - - # Mock the document reference chain - mock_doc_ref = Mock() - _mock_firestore_doc_ref(mock_client_class, mock_doc_ref) - - await store.store(event) - - # Verify set was called - mock_doc_ref.set.assert_called_once() - call_args = mock_doc_ref.set.call_args[0][0] - assert call_args["user_id"] == "user123" - assert call_args["event_name"] == "test_event" - - @pytest.mark.asyncio - @patch("eventkit.stores.firestore.firestore.Client") - async def test_store_batch_chunks_correctly(self, mock_client_class): - """store_batch() should chunk >500 events into multiple batches.""" - store = FirestoreEventStore(project_id="test") - - # Create 501 events (should create 2 batches: 500 + 1) - events = [TrackEvent(user_id=f"user{i}", event_name=f"event{i}") for i in range(501)] - - mock_batch = Mock() - mock_client_class.return_value.batch.return_value = mock_batch - - # Mock document reference - mock_doc_ref = Mock() - _mock_firestore_doc_ref(mock_client_class, mock_doc_ref) - - await store.store_batch(events) - - # Should have created 2 batches - assert mock_client_class.return_value.batch.call_count == 2 - - # Should have committed 2 batches - assert mock_batch.commit.call_count == 2 - - @pytest.mark.asyncio - @patch("eventkit.stores.firestore.firestore.Client") - async def test_store_batch_calls_set_for_each_event(self, mock_client_class): - """store_batch() should call batch.set() for each event.""" - store = FirestoreEventStore(project_id="test") - events = [ - TrackEvent(user_id="user1", event_name="event1"), - TrackEvent(user_id="user2", event_name="event2"), - ] - - mock_batch = Mock() - mock_client_class.return_value.batch.return_value = mock_batch - - mock_doc_ref = Mock() - _mock_firestore_doc_ref(mock_client_class, mock_doc_ref) - - await store.store_batch(events) - - # Should call batch.set() twice (once per event) - assert mock_batch.set.call_count == 2 - # Should commit once (only 2 events) - mock_batch.commit.assert_called_once() - - -class TestFirestoreRetryLogic: - """Tests for retry logic with transient errors.""" - - @pytest.mark.asyncio - @patch("eventkit.stores.firestore.firestore.Client") - async def test_retry_on_deadline_exceeded(self, mock_client_class): - """_write_with_retry should retry on DeadlineExceeded.""" - from google.api_core.exceptions import DeadlineExceeded - - 
store = FirestoreEventStore(project_id="test") - event = TrackEvent(user_id="user1", event_name="event1") - - mock_doc_ref = Mock() - _mock_firestore_doc_ref(mock_client_class, mock_doc_ref) - - # Fail twice, then succeed - mock_doc_ref.set.side_effect = [ - DeadlineExceeded("timeout"), - DeadlineExceeded("timeout"), - None, # Success - ] - - await store.store(event) - - # Should have retried 3 times total - assert mock_doc_ref.set.call_count == 3 - - @pytest.mark.asyncio - @patch("eventkit.stores.firestore.firestore.Client") - async def test_retry_exhaustion_raises_error(self, mock_client_class): - """_write_with_retry should raise after 3 failed attempts.""" - from google.api_core.exceptions import ServiceUnavailable - from tenacity import RetryError - - store = FirestoreEventStore(project_id="test") - event = TrackEvent(user_id="user1", event_name="event1") - - mock_doc_ref = Mock() - _mock_firestore_doc_ref(mock_client_class, mock_doc_ref) - - # Always fail - mock_doc_ref.set.side_effect = ServiceUnavailable("service down") - - with pytest.raises(RetryError): - await store.store(event) - - # Should have tried 3 times - assert mock_doc_ref.set.call_count == 3 - - -class TestFirestoreEventStoreRead: - """Tests for read operations.""" - - @pytest.mark.asyncio - @patch("eventkit.stores.firestore.firestore.Client") - async def test_read_returns_events_from_stream(self, mock_client_class): - """read() should query correct stream and deserialize events.""" - store = FirestoreEventStore(project_id="test") - - # Mock Firestore query chain - mock_stream = Mock() - mock_limit = Mock() - mock_order = Mock() - mock_collection = Mock() - mock_doc = Mock() - mock_items = Mock() - - mock_client_class.return_value.collection.return_value = mock_collection - mock_collection.document.return_value = mock_doc - mock_doc.collection.return_value = mock_items - mock_items.order_by.return_value = mock_order - mock_order.limit.return_value = mock_limit - mock_limit.stream.return_value = mock_stream - - # Mock document data - mock_event_doc = Mock() - mock_event_doc.to_dict.return_value = { - "event_id": "evt-1", - "user_id": "user1", - "event_name": "test_event", - "event_type": "track", - "timestamp": "2026-01-10T12:00:00+00:00", - "properties": {}, - "anonymous_id": None, - "stream": None, - } - mock_stream.__iter__ = Mock(return_value=iter([mock_event_doc])) - - events = await store.read(stream="mobile", limit=50) - - # Verify correct path - mock_collection.document.assert_called_once_with("mobile") - mock_items.order_by.assert_called_once_with( - "timestamp", direction=firestore.Query.DESCENDING - ) - mock_order.limit.assert_called_once_with(50) - - # Verify deserialization - assert len(events) == 1 - assert events[0].event_name == "test_event" - assert events[0].user_id == "user1" - - @pytest.mark.asyncio - @patch("eventkit.stores.firestore.firestore.Client") - async def test_read_returns_empty_list_for_no_events(self, mock_client_class): - """read() should return empty list when stream has no events.""" - store = FirestoreEventStore(project_id="test") - - # Mock empty query result - mock_stream = Mock() - mock_limit = Mock() - mock_order = Mock() - mock_collection = Mock() - mock_doc = Mock() - mock_items = Mock() - - mock_client_class.return_value.collection.return_value = mock_collection - mock_collection.document.return_value = mock_doc - mock_doc.collection.return_value = mock_items - mock_items.order_by.return_value = mock_order - mock_order.limit.return_value = mock_limit - 
mock_limit.stream.return_value = mock_stream - - # Empty stream - mock_stream.__iter__ = Mock(return_value=iter([])) - - events = await store.read(stream="mobile", limit=100) - - assert events == [] - - @pytest.mark.asyncio - @patch("eventkit.stores.firestore.firestore.Client") - async def test_dict_to_event_handles_identify(self, mock_client_class): - """_dict_to_event should deserialize identify events.""" - store = FirestoreEventStore(project_id="test") - - data = { - "event_id": "evt-1", - "user_id": "user1", - "event_type": "identify", - "timestamp": "2026-01-10T12:00:00+00:00", - "traits": {"email": "test@example.com"}, - "properties": {}, - "anonymous_id": None, - "stream": None, - } - - event = store._dict_to_event(data) - - assert event.event_type == "identify" - assert event.user_id == "user1" - assert event.traits == {"email": "test@example.com"} - - @pytest.mark.asyncio - @patch("eventkit.stores.firestore.firestore.Client") - async def test_dict_to_event_handles_page(self, mock_client_class): - """_dict_to_event should deserialize page events.""" - store = FirestoreEventStore(project_id="test") - - data = { - "event_id": "evt-1", - "user_id": "user1", - "event_type": "page", - "name": "Home Page", - "timestamp": "2026-01-10T12:00:00+00:00", - "properties": {}, - "anonymous_id": None, - "stream": None, - } - - event = store._dict_to_event(data) - - assert event.event_type == "page" - assert event.name == "Home Page" - - @pytest.mark.asyncio - @patch("eventkit.stores.firestore.firestore.Client") - async def test_dict_to_event_raises_on_unknown_type(self, mock_client_class): - """_dict_to_event should raise ValueError for unknown event_type.""" - store = FirestoreEventStore(project_id="test") - - data = { - "event_id": "evt-1", - "event_type": "unknown_type", - "timestamp": "2026-01-10T12:00:00+00:00", - } - - with pytest.raises(ValueError, match="Unknown event_type: unknown_type"): - store._dict_to_event(data) - - -class TestFirestoreErrorStore: - """Tests for error storage (dead letter queue).""" - - @pytest.mark.asyncio - @patch("eventkit.stores.firestore.firestore.Client") - async def test_store_error_writes_to_errors_collection(self, mock_client_class): - """store_error() should write error doc to errors collection.""" - store = FirestoreErrorStore(project_id="test") - - mock_doc_ref = Mock() - mock_collection = Mock() - mock_client_class.return_value.collection.return_value = mock_collection - mock_collection.document.return_value = mock_doc_ref - - payload = {"userId": "123", "type": "track"} - error = "Validation failed: missing event name" - timestamp = datetime(2026, 1, 10, 12, 0, 0, tzinfo=UTC) - metadata = {"stream": "mobile"} - - await store.store_error(payload, error, timestamp, metadata) - - # Verify correct collection - mock_collection.document.assert_called_once() - doc_id = mock_collection.document.call_args[0][0] - assert doc_id.startswith("2026-01-10T12:00:00+00:00_") - - # Verify error data structure - mock_doc_ref.set.assert_called_once() - call_data = mock_doc_ref.set.call_args[0][0] - assert call_data["payload"] == payload - assert call_data["error"] == error - assert call_data["stream"] == "mobile" - assert call_data["metadata"] == metadata - - @pytest.mark.asyncio - @patch("eventkit.stores.firestore.firestore.Client") - async def test_store_error_handles_none_metadata(self, mock_client_class): - """store_error() should handle None metadata gracefully.""" - store = FirestoreErrorStore(project_id="test") - - mock_doc_ref = Mock() - mock_collection = Mock() 
- mock_client_class.return_value.collection.return_value = mock_collection - mock_collection.document.return_value = mock_doc_ref - - payload = {"userId": "123"} - error = "Some error" - timestamp = datetime(2026, 1, 10, 12, 0, 0, tzinfo=UTC) - - await store.store_error(payload, error, timestamp, metadata=None) - - # Verify stream is None and metadata is empty dict - call_data = mock_doc_ref.set.call_args[0][0] - assert call_data["stream"] is None - assert call_data["metadata"] == {} - - @pytest.mark.asyncio - @patch("eventkit.stores.firestore.firestore.Client") - async def test_query_errors_returns_error_list(self, mock_client_class): - """query_errors() should return list of error documents.""" - store = FirestoreErrorStore(project_id="test") - - # Mock query chain - mock_stream = Mock() - mock_limit = Mock() - mock_order = Mock() - mock_collection = Mock() - - mock_client_class.return_value.collection.return_value = mock_collection - mock_collection.order_by.return_value = mock_order - mock_order.limit.return_value = mock_limit - mock_limit.stream.return_value = mock_stream - - # Mock error document - mock_error_doc = Mock() - mock_error_doc.to_dict.return_value = { - "payload": {"userId": "123"}, - "error": "Validation failed", - "timestamp": "2026-01-10T12:00:00+00:00", - "stream": "mobile", - "metadata": {}, - } - mock_stream.__iter__ = Mock(return_value=iter([mock_error_doc])) - - errors = await store.query_errors(limit=50) - - # Verify correct query - mock_collection.order_by.assert_called_once_with( - "timestamp", direction=firestore.Query.DESCENDING - ) - mock_order.limit.assert_called_once_with(50) - - # Verify returned data - assert len(errors) == 1 - assert errors[0]["payload"] == {"userId": "123"} - assert errors[0]["error"] == "Validation failed" - - @pytest.mark.asyncio - @patch("eventkit.stores.firestore.firestore.Client") - async def test_query_errors_returns_empty_list(self, mock_client_class): - """query_errors() should return empty list when no errors.""" - store = FirestoreErrorStore(project_id="test") - - # Mock empty query result - mock_stream = Mock() - mock_limit = Mock() - mock_order = Mock() - mock_collection = Mock() - - mock_client_class.return_value.collection.return_value = mock_collection - mock_collection.order_by.return_value = mock_order - mock_order.limit.return_value = mock_limit - mock_limit.stream.return_value = mock_stream - - # Empty stream - mock_stream.__iter__ = Mock(return_value=iter([])) - - errors = await store.query_errors(limit=100) - - assert errors == [] diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py index 4fb6c4f..e75273e 100644 --- a/tests/unit/test_config.py +++ b/tests/unit/test_config.py @@ -23,12 +23,8 @@ def test_settings_default_values(clean_env, monkeypatch): settings = Settings() - # Storage - assert settings.EVENTKIT_EVENT_STORE == "firestore" - - # Firestore + # GCP assert settings.GCP_PROJECT_ID == "test-project" - assert settings.FIRESTORE_DATABASE == "default" # GCS + BigQuery assert settings.GCP_GCS_BUCKET == "eventkit-events" @@ -80,7 +76,6 @@ def test_settings_default_values(clean_env, monkeypatch): def test_settings_from_environment(clean_env, monkeypatch): """Test that settings can be overridden via environment variables.""" monkeypatch.setenv("GCP_PROJECT_ID", "prod-project") - monkeypatch.setenv("FIRESTORE_DATABASE", "production") monkeypatch.setenv("GCP_GCS_BUCKET", "prod-events") monkeypatch.setenv("GCP_BIGQUERY_DATASET", "prod_events") monkeypatch.setenv("GCP_BIGQUERY_TABLE", "events") @@ 
-100,7 +95,6 @@ def test_settings_from_environment(clean_env, monkeypatch): settings = Settings() assert settings.GCP_PROJECT_ID == "prod-project" - assert settings.FIRESTORE_DATABASE == "production" assert settings.GCP_GCS_BUCKET == "prod-events" assert settings.GCP_BIGQUERY_DATASET == "prod_events" assert settings.GCP_BIGQUERY_TABLE == "events" diff --git a/tests/unit/test_metrics.py b/tests/unit/test_metrics.py new file mode 100644 index 0000000..bf2d201 --- /dev/null +++ b/tests/unit/test_metrics.py @@ -0,0 +1,68 @@ +"""Tests for Prometheus metrics.""" + +from eventkit.metrics import ( + create_metrics_app, + init_metrics, + set_component_health, + update_uptime, +) + + +def test_init_metrics(): + """Test metrics initialization.""" + # Should not raise + init_metrics() + + +def test_update_uptime(): + """Test uptime gauge updates.""" + # Should not raise + update_uptime() + + +def test_set_component_health(): + """Test setting component health.""" + # Should not raise + set_component_health("api", True) + set_component_health("storage", False) + + +def test_metrics_app_creation(): + """Test metrics FastAPI app creation.""" + app = create_metrics_app() + assert app is not None + assert app.title == "EventKit Metrics" + + +def test_metrics_health_endpoint(): + """Test metrics server health endpoint.""" + from fastapi.testclient import TestClient + + app = create_metrics_app() + client = TestClient(app) + + response = client.get("/health") + assert response.status_code == 200 + assert response.json() == {"status": "ok"} + + +def test_metrics_endpoint_returns_prometheus_format(): + """Test /metrics endpoint returns Prometheus format.""" + from fastapi.testclient import TestClient + + init_metrics() + app = create_metrics_app() + client = TestClient(app) + + response = client.get("/metrics") + assert response.status_code == 200 + + # Check for Prometheus format markers + text = response.text + assert "# HELP" in text + assert "# TYPE" in text + + # Check for our metrics + assert "eventkit_info" in text + assert "eventkit_uptime_seconds" in text + assert "eventkit_component_healthy" in text diff --git a/uv.lock b/uv.lock index ade4ca7..f24d6b1 100644 --- a/uv.lock +++ b/uv.lock @@ -258,6 +258,7 @@ dependencies = [ { name = "google-cloud-pubsub" }, { name = "google-cloud-storage" }, { name = "pandas" }, + { name = "prometheus-client" }, { name = "pyarrow" }, { name = "pydantic" }, { name = "pydantic-settings" }, @@ -299,6 +300,7 @@ requires-dist = [ { name = "httpx", marker = "extra == 'dev'", specifier = ">=0.25.0" }, { name = "mypy", marker = "extra == 'dev'", specifier = ">=1.7.0" }, { name = "pandas", specifier = ">=2.1.0" }, + { name = "prometheus-client", specifier = ">=0.20.0" }, { name = "pyarrow", specifier = ">=14.0.0" }, { name = "pydantic", specifier = ">=2.0.0" }, { name = "pydantic-settings", specifier = ">=2.0.0" }, @@ -965,6 +967,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/54/20/4d324d65cc6d9205fabedc306948156824eb9f0ee1633355a8f7ec5c66bf/pluggy-1.6.0-py3-none-any.whl", hash = "sha256:e920276dd6813095e9377c0bc5566d94c932c33b27a3e3945d8389c374dd4746", size = 20538, upload-time = "2025-05-15T12:30:06.134Z" }, ] +[[package]] +name = "prometheus-client" +version = "0.24.1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/f0/58/a794d23feb6b00fc0c72787d7e87d872a6730dd9ed7c7b3e954637d8f280/prometheus_client-0.24.1.tar.gz", hash = "sha256:7e0ced7fbbd40f7b84962d5d2ab6f17ef88a72504dcf7c0b40737b43b2a461f9", 
size = 85616, upload-time = "2026-01-14T15:26:26.965Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/74/c3/24a2f845e3917201628ecaba4f18bab4d18a337834c1df2a159ee9d22a42/prometheus_client-0.24.1-py3-none-any.whl", hash = "sha256:150db128af71a5c2482b36e588fc8a6b95e498750da4b17065947c16070f4055", size = 64057, upload-time = "2026-01-14T15:26:24.42Z" }, +] + [[package]] name = "proto-plus" version = "1.27.0"
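
The patch above introduces four storage metrics in `src/eventkit/stores/metrics.py` but only wires the file, operation, and duration metrics into `GCSEventStore.store_batch`; the byte counter is defined but not yet incremented there. As a rough illustration of the intended pattern, the following sketch shows a backend write path that records all four. The `LocalDiskEventStore` class, its `_write_parquet` helper returning a byte count, and the `"local"` label value are illustrative assumptions, not part of the codebase.

```python
"""Sketch: instrumenting a hypothetical storage backend with the new storage metrics.

Uses the metric objects added in src/eventkit/stores/metrics.py; everything named
LocalDiskEventStore below is an illustrative assumption.
"""

import time

import pandas as pd

from eventkit.stores.metrics import (
    eventkit_storage_operations_total,
    eventkit_storage_write_duration_seconds,
    eventkit_storage_written_bytes_total,
    eventkit_storage_written_files_total,
)


class LocalDiskEventStore:
    """Illustrative backend following the same metric pattern as GCSEventStore."""

    def _write_parquet(self, df: pd.DataFrame, path: str) -> int:
        # Placeholder: a real implementation would serialize df to Parquet at path
        # and return the number of bytes written (hypothetical helper).
        raise NotImplementedError

    def store_dataframe(self, df: pd.DataFrame, path: str) -> None:
        start_time = time.time()
        try:
            bytes_written = self._write_parquet(df, path)
        except Exception:
            # Count the failed operation, then propagate, mirroring the GCS writer.
            eventkit_storage_operations_total.labels(
                storage_backend="local", operation="write", result="failure"
            ).inc()
            raise

        # Successful write: count the file, the bytes, and the operation.
        eventkit_storage_written_files_total.labels(storage_backend="local").inc()
        eventkit_storage_written_bytes_total.labels(storage_backend="local").inc(bytes_written)
        eventkit_storage_operations_total.labels(
            storage_backend="local", operation="write", result="success"
        ).inc()

        # Record write latency against the duration histogram (success path only,
        # matching the GCS implementation in this patch).
        duration = time.time() - start_time
        eventkit_storage_write_duration_seconds.labels(storage_backend="local").observe(duration)
```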
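
With Firestore removed, the dependency factories exercised in `tests/unit/api/test_dependencies.py` resolve storage from environment variables alone. A minimal sketch of that flow follows; the project ID and bucket values are placeholders, and running this outside a test environment would additionally require valid GCP credentials.

```python
"""Sketch: resolving storage and warehouse dependencies after the Firestore removal.

Mirrors the updated tests in tests/unit/api/test_dependencies.py; the environment
values below are hypothetical.
"""

import os

from eventkit.api.dependencies import get_event_store, get_settings, get_warehouse_loader

# Storage is now GCS-only; the EVENTKIT_EVENT_STORE selector no longer exists.
os.environ["GCP_PROJECT_ID"] = "my-project"
os.environ["GCP_GCS_BUCKET"] = "my-eventkit-bucket"
os.environ["EVENTKIT_WAREHOUSE_ENABLED"] = "true"

# The factories are lru_cache-wrapped, so clear them after changing the environment.
get_settings.cache_clear()
get_event_store.cache_clear()
get_warehouse_loader.cache_clear()

store = get_event_store()        # GCSEventStore writing to GCP_GCS_BUCKET
loader = get_warehouse_loader()  # BigQueryLoader, or None when the flag is "false"
print(store.bucket, type(loader).__name__)
```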