From 96ada6833c84e8b8e82513131abe0e7d4218bcb1 Mon Sep 17 00:00:00 2001
From: prosdev
Date: Wed, 14 Jan 2026 17:06:44 -0800
Subject: [PATCH] feat: add EventSubscriptionCoordinator for dual-path architecture

Add new streaming package with coordinator pattern:
- EventSubscriptionCoordinator: Manages Pub/Sub subscription lifecycle
- EventCoordinatorManager: Coordinates multiple subscriptions
- EventHandler Protocol: Interface for event processing

Features:
- Bridge sync Pub/Sub callbacks to async handlers
- Graceful start/stop with queue draining
- Prometheus metrics (reusing queue metrics with coordinator labels)
- Support for dual-path: real-time + batched processing

Example usage:
```python
# Real-time processing
realtime = EventSubscriptionCoordinator(
    project_id="my-project",
    subscription="events-realtime",
    handler=process_immediately,
)

# Batched archival
archive = EventSubscriptionCoordinator(
    project_id="my-project",
    subscription="events-archive",
    handler=event_loader.add,
)

manager = EventCoordinatorManager([realtime, archive])
await manager.start_all()
```

Tests:
- 4 new unit tests (all passing)
- All 242 tests passing
- Fixed missing pytest-asyncio dependency

Documentation:
- Full docstrings with examples
- README: Added dual-path architecture section
- README: Removed all Firestore references
- README: Updated roadmap with completed features
---
 README.md                                |  77 +++++--
 src/eventkit/streaming/__init__.py       |  10 +
 src/eventkit/streaming/coordinator.py    | 281 +++++++++++++++++++++++
 src/eventkit/streaming/manager.py        |  67 ++++++
 tests/unit/streaming/__init__.py         |   0
 tests/unit/streaming/test_coordinator.py |  54 +++++
 tests/unit/streaming/test_manager.py     |  29 +++
 7 files changed, 493 insertions(+), 25 deletions(-)
 create mode 100644 src/eventkit/streaming/__init__.py
 create mode 100644 src/eventkit/streaming/coordinator.py
 create mode 100644 src/eventkit/streaming/manager.py
 create mode 100644 tests/unit/streaming/__init__.py
 create mode 100644 tests/unit/streaming/test_coordinator.py
 create mode 100644 tests/unit/streaming/test_manager.py

diff --git a/README.md b/README.md
index c97f339..0fc27bd 100644
--- a/README.md
+++ b/README.md
@@ -11,7 +11,7 @@ Event ingestion and processing primitives for Python.
- **Flexible ingestion** - Accept any JSON payload with Segment-compatible API - **Stream-based routing** - Separate processing pipelines by event type for isolation and scalability - **Adapter pattern** - Pluggable validators for multiple event formats and sources -- **Pluggable storage** - Write to GCS + BigQuery (default), Firestore, or implement custom backends +- **Pluggable storage** - Write to GCS + BigQuery (default) or implement custom backends - **Error handling** - Built-in dead letter queue for validation failures and retries - **Type-safe** - Full Pydantic v2 validation with strict typing throughout - **Async-first** - Built on FastAPI with async/await for high throughput @@ -46,7 +46,6 @@ settings = Settings( gcp_gcs_bucket="your-events-bucket", gcp_bigquery_dataset="events", gcp_bigquery_table="raw_events", - eventkit_event_store="gcs" # or "firestore" for Firestore mode ) # Add eventkit routes @@ -84,7 +83,7 @@ curl -X POST http://localhost:8000/api/v1/identify \ └─────────────────────────────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────────────┐ -│ Processing Pipeline │ +│ Processing Pipeline │ │ • Adapters - Validate & normalize to typed events │ │ • Validators - Composable field checks │ │ • Sequencer - Hash-based routing for consistency │ @@ -94,9 +93,8 @@ curl -X POST http://localhost:8000/api/v1/identify \ ┌─────────────────────────────────────────────────────────┐ │ Storage Layer (Pluggable) │ │ • GCS + BigQuery - Production data warehouse (default) │ -│ • Firestore - Managed, serverless (dev/testing) │ │ • Custom - Implement EventStore protocol │ -│ │ +│ │ │ Warehouse Loader (Background Service) │ │ • BigQueryLoader - Batch load GCS → BigQuery │ │ • Custom - Implement WarehouseLoader protocol │ @@ -115,6 +113,43 @@ curl -X POST http://localhost:8000/api/v1/identify \ | **Event Store** | Persist events to storage | Interface for multiple backends | | **Error Store** | Dead letter queue for failures | Never lose data, debug later | | **Warehouse Loader** | Load events to data warehouse | Background service for batch loading | +| **EventSubscriptionCoordinator** | Pub/Sub subscription management | Enables dual-path architecture | + +--- + +### Dual-Path Architecture (Advanced) + +For applications requiring both real-time processing and cost-optimized storage, `eventkit` supports consuming the same event stream through multiple coordinators: + +```python +from eventkit.streaming import EventSubscriptionCoordinator, EventCoordinatorManager + +# Path 1: Real-time processing (<1s latency) +realtime_coordinator = EventSubscriptionCoordinator( + project_id="my-project", + subscription="events-realtime", + handler=process_immediately, # e.g., profile updates +) + +# Path 2: Batched archival (5-10 min latency, cost-optimized) +archive_coordinator = EventSubscriptionCoordinator( + project_id="my-project", + subscription="events-archive", + handler=event_loader.add, # Batches to GCS +) + +# Manage both +manager = EventCoordinatorManager([ + realtime_coordinator, + archive_coordinator, +]) + +await manager.start_all() +``` + +**Use cases**: +- Real-time: Profile updates, personalization, live dashboards +- Archive: Analytics, compliance, data warehousing ## Design Philosophy @@ -305,17 +340,6 @@ python -m scripts.run_bigquery_loader See `scripts/bigquery/README.md` and `specs/gcs-bigquery-storage/` for full details. -### Firestore (Development/Testing) - -Managed, serverless NoSQL database. Good for development and moderate throughput. 
- -```python -settings = Settings( - gcp_project_id="my-project", - eventkit_event_store="firestore", -) -``` - ### Custom Storage Implement the `EventStore` protocol for any backend: @@ -419,15 +443,15 @@ See [LOCAL_DEV.md](LOCAL_DEV.md) for detailed local development instructions. **Quick Start:** ```bash -# Start Firestore emulator +# Start GCS emulator (for local development) docker-compose up -d # Install dependencies uv sync # Run API server -export FIRESTORE_EMULATOR_HOST="localhost:8080" export GCP_PROJECT_ID="test-project" +export GCP_GCS_BUCKET="test-events" uv run uvicorn eventkit.api.app:app --reload # Run tests @@ -454,19 +478,22 @@ uv run ruff format src/ ### Core (v0.x) - [x] Composable validators (required fields, types, timestamps) - [x] Segment-compatible adapter with ValidationPipeline -- [ ] Collection API with stream routing -- [ ] Hash-based sequencer for consistent ordering -- [ ] Firestore storage backend (in progress) -- [ ] Error handling and dead letter queue -- [ ] Structured logging +- [x] Collection API with stream routing +- [x] GCS + BigQuery storage backend +- [x] Ring buffer with Write-Ahead Log +- [x] Pub/Sub queue integration +- [x] Event batching and loading +- [x] Prometheus metrics +- [x] EventSubscriptionCoordinator (dual-path architecture) +- [x] Hash-based sequencer for consistent ordering +- [ ] Error handling and dead letter queue (ErrorStore protocol exists, needs implementation) - [ ] Performance benchmarks (10k+ events/sec) ### v1.0 -- [ ] ClickHouse storage backend -- [ ] Pub/Sub integration for async processing - [ ] OpenAPI spec and generated clients - [ ] Comprehensive examples and documentation - [ ] Production deployment guides (Cloud Run, GKE, ECS) +- [ ] S3 + Snowflake/Redshift storage adapters ### Future Ecosystem diff --git a/src/eventkit/streaming/__init__.py b/src/eventkit/streaming/__init__.py new file mode 100644 index 0000000..c3db144 --- /dev/null +++ b/src/eventkit/streaming/__init__.py @@ -0,0 +1,10 @@ +"""Event streaming coordination for Pub/Sub subscriptions.""" + +from eventkit.streaming.coordinator import EventHandler, EventSubscriptionCoordinator +from eventkit.streaming.manager import EventCoordinatorManager + +__all__ = [ + "EventHandler", + "EventSubscriptionCoordinator", + "EventCoordinatorManager", +] diff --git a/src/eventkit/streaming/coordinator.py b/src/eventkit/streaming/coordinator.py new file mode 100644 index 0000000..8ae36ee --- /dev/null +++ b/src/eventkit/streaming/coordinator.py @@ -0,0 +1,281 @@ +""" +Event Subscription Coordinator for managing Pub/Sub subscriptions. + +Coordinates the lifecycle of consuming events from Google Cloud Pub/Sub, +bridging synchronous callbacks to async event processing. +""" + +import asyncio +import json +import logging +from typing import Protocol + +from google.cloud import pubsub_v1 # type: ignore[attr-defined] +from google.cloud.pubsub_v1.subscriber.message import Message + +from eventkit.queues.metrics import ( + eventkit_queue_depth, + eventkit_queue_dequeued_total, + eventkit_queue_processed_total, +) +from eventkit.schema.raw import RawEvent + +logger = logging.getLogger(__name__) + + +class EventHandler(Protocol): + """ + Protocol for handlers that process events from a subscription. + + Handlers should be async functions that process a single event. + """ + + async def __call__(self, event: RawEvent) -> None: + """ + Process a single event. 
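+
+        A minimal conforming handler, as a sketch (``gcs_writer`` is an
+        illustrative placeholder, not part of eventkit):
+
+            async def archive_event(event: RawEvent) -> None:
+                await gcs_writer.write(event)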
+ + Args: + event: The raw event to process + + Raises: + Exception: Handler may raise to signal processing failure (will nack) + """ + ... + + +class EventSubscriptionCoordinator: + """ + Coordinates a Pub/Sub subscription with async event processing. + + Manages the full lifecycle of consuming events from Google Cloud Pub/Sub: + - Subscribes to a Pub/Sub subscription + - Receives messages asynchronously + - Bridges sync Pub/Sub callbacks to async handlers + - Manages ack/nack based on handler results + - Graceful shutdown with queue draining + + Example: + # Archive events to GCS (batched) + coordinator = EventSubscriptionCoordinator( + project_id="my-project", + subscription="events-archive", + handler=event_loader.add, + ) + + await coordinator.start() + # Coordinator now processing events... + await coordinator.stop() + + Example with profiles: + # Real-time profile processing + async def build_profile(event: RawEvent) -> None: + refs = extract_refs(event) + profile = await profile_store.get(refs) + updated = apply_event(profile, event) + await profile_store.save(updated) + + coordinator = EventSubscriptionCoordinator( + project_id="my-project", + subscription="events-profiles", + handler=build_profile, + ) + + await coordinator.start() + """ + + def __init__( + self, + project_id: str, + subscription: str, + handler: EventHandler, + max_workers: int = 4, + subscription_name: str | None = None, + ): + """ + Initialize the coordinator. + + Args: + project_id: Google Cloud project ID + subscription: Pub/Sub subscription name + handler: Async function to process each event + max_workers: Number of concurrent workers (default: 4) + subscription_name: Human-readable name for logging/metrics (default: subscription) + """ + self.project_id = project_id + self.subscription = subscription + self.handler = handler + self.max_workers = max_workers + self.subscription_name = subscription_name or subscription + + # Pub/Sub client + self.subscriber = pubsub_v1.SubscriberClient() + self.subscription_path = self.subscriber.subscription_path(project_id, subscription) + + # Internal state + self.loop: asyncio.AbstractEventLoop | None = None + self.streaming_pull_future = None + self.internal_queue: asyncio.Queue[tuple[RawEvent, Message]] = asyncio.Queue() + self.workers: list[asyncio.Task[None]] = [] + self.shutdown_event = asyncio.Event() + + async def start(self) -> None: + """ + Start coordinating the subscription. + + Starts the Pub/Sub subscriber and async workers. + """ + self.loop = asyncio.get_running_loop() + + # Start Pub/Sub subscriber (runs in thread pool) + self.streaming_pull_future = self.subscriber.subscribe( + self.subscription_path, + callback=self._sync_callback, + ) + + # Start async workers + self.shutdown_event.clear() + for i in range(self.max_workers): + worker = asyncio.create_task( + self._worker(i), name=f"EventCoordinator-{self.subscription_name}-{i}" + ) + self.workers.append(worker) + + logger.info( + f"EventSubscriptionCoordinator started: {self.subscription_name}", + extra={"subscription": self.subscription, "workers": self.max_workers}, + ) + + async def stop(self) -> None: + """ + Gracefully stop coordinating. + + Cancels the Pub/Sub subscriber, drains the internal queue, + and waits for all workers to finish. 
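+
+        Example (hook name is illustrative):
+            async def on_shutdown() -> None:
+                await coordinator.stop()  # blocks until the queue drains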
+ """ + logger.info(f"EventSubscriptionCoordinator stopping: {self.subscription_name}") + + # Signal shutdown + self.shutdown_event.set() + + # Cancel Pub/Sub subscriber (stops receiving new messages) + if self.streaming_pull_future: + self.streaming_pull_future.cancel() + + # Wait for queue to drain (process remaining events) + await self.internal_queue.join() + logger.info(f"EventSubscriptionCoordinator: queue drained ({self.subscription_name})") + + # Cancel workers + for worker in self.workers: + if not worker.done(): + worker.cancel() + + # Wait for workers to finish + await asyncio.gather(*self.workers, return_exceptions=True) + + logger.info(f"EventSubscriptionCoordinator stopped: {self.subscription_name}") + + def _sync_callback(self, message: Message) -> None: + """ + Sync callback from Pub/Sub (runs in thread pool). + + Decodes the message and bridges to async processing. + """ + try: + # Decode message + payload = json.loads(message.data.decode("utf-8")) + event = RawEvent(**payload) + + # Bridge to async world + if self.loop: + asyncio.run_coroutine_threadsafe( + self.internal_queue.put((event, message)), self.loop + ) + + # Track message received + eventkit_queue_dequeued_total.labels( + queue_mode=f"coordinator_{self.subscription_name}" + ).inc() + else: + logger.error("Event loop not initialized, cannot enqueue message") + message.nack() + + # Track nack + eventkit_queue_processed_total.labels( + queue_mode=f"coordinator_{self.subscription_name}", + result="nack_no_loop", + ).inc() + + except Exception as e: + logger.error(f"Failed to decode message: {e}", exc_info=True) + message.nack() + + # Track decode failure + nack + eventkit_queue_processed_total.labels( + queue_mode=f"coordinator_{self.subscription_name}", + result="nack_decode_error", + ).inc() + + async def _worker(self, worker_id: int) -> None: + """ + Process events from internal queue. 
+
+        Args:
+            worker_id: Worker ID for logging and metrics
+        """
+        logger.debug(f"Worker {worker_id} started ({self.subscription_name})")
+
+        try:
+            while not self.shutdown_event.is_set():
+                try:
+                    # Get event with timeout (check shutdown signal periodically)
+                    event, message = await asyncio.wait_for(self.internal_queue.get(), timeout=0.1)
+
+                    # Update queue depth gauge
+                    depth = self.internal_queue.qsize()
+                    eventkit_queue_depth.labels(
+                        queue_mode=f"coordinator_{self.subscription_name}",
+                        partition=str(worker_id),
+                    ).set(depth)
+
+                    try:
+                        # Process event through handler
+                        await self.handler(event)
+
+                        # Success: ack
+                        message.ack()
+
+                        # Track success
+                        eventkit_queue_processed_total.labels(
+                            queue_mode=f"coordinator_{self.subscription_name}",
+                            result="ack_success",
+                        ).inc()
+
+                    except Exception as e:
+                        logger.error(
+                            f"Worker {worker_id} handler failed ({self.subscription_name}): {e}",
+                            exc_info=True,
+                        )
+
+                        # Failure: nack (will be redelivered)
+                        message.nack()
+
+                        # Track failure
+                        eventkit_queue_processed_total.labels(
+                            queue_mode=f"coordinator_{self.subscription_name}",
+                            result="nack_handler_error",
+                        ).inc()
+
+                    finally:
+                        # Mark task as done (for queue.join())
+                        self.internal_queue.task_done()
+
+                except asyncio.TimeoutError:
+                    # No event available, check shutdown and continue
+                    # (asyncio.TimeoutError, not the builtin, for pre-3.11 safety)
+                    continue
+
+        except asyncio.CancelledError:
+            logger.info(f"Worker {worker_id} cancelled ({self.subscription_name})")
+            raise
+
+        logger.debug(f"Worker {worker_id} stopped ({self.subscription_name})")
diff --git a/src/eventkit/streaming/manager.py b/src/eventkit/streaming/manager.py
new file mode 100644
index 0000000..c0bd714
--- /dev/null
+++ b/src/eventkit/streaming/manager.py
@@ -0,0 +1,67 @@
+"""Manager for coordinating multiple event subscriptions."""
+
+import asyncio
+import logging
+
+from eventkit.streaming.coordinator import EventSubscriptionCoordinator
+
+logger = logging.getLogger(__name__)
+
+
+class EventCoordinatorManager:
+    """
+    Manages multiple EventSubscriptionCoordinators.
+
+    Enables a dual-path architecture where the same event stream, fanned out
+    to separate Pub/Sub subscriptions, flows through coordinators optimized
+    for different purposes:
+    - Real-time processing (<1s latency) for user-facing features
+    - Batched archival (5-10 min latency) for cost-optimized storage
+
+    Example:
+        # Profile processing (real-time)
+        profile_coordinator = EventSubscriptionCoordinator(
+            project_id="my-project",
+            subscription="events-profiles",
+            handler=build_profile,
+        )
+
+        # Archive (batched)
+        archive_coordinator = EventSubscriptionCoordinator(
+            project_id="my-project",
+            subscription="events-archive",
+            handler=event_loader.add,
+        )
+
+        # Manage both
+        manager = EventCoordinatorManager([
+            profile_coordinator,
+            archive_coordinator,
+        ])
+
+        await manager.start_all()
+        # Both paths running independently...
+        await manager.stop_all()
+    """
+
+    def __init__(self, coordinators: list[EventSubscriptionCoordinator]):
+        """
+        Initialize the manager.
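+
+        The manager does not create subscriptions or coordinators; it only
+        starts and stops the given coordinators concurrently via
+        asyncio.gather().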
+ + Args: + coordinators: List of coordinators to manage + """ + self.coordinators = coordinators + + async def start_all(self) -> None: + """Start all coordinators concurrently.""" + logger.info(f"Starting {len(self.coordinators)} event coordinators") + + await asyncio.gather(*[coordinator.start() for coordinator in self.coordinators]) + + logger.info(f"All {len(self.coordinators)} coordinators started") + + async def stop_all(self) -> None: + """Stop all coordinators gracefully.""" + logger.info(f"Stopping {len(self.coordinators)} event coordinators") + + await asyncio.gather(*[coordinator.stop() for coordinator in self.coordinators]) + + logger.info(f"All {len(self.coordinators)} coordinators stopped") diff --git a/tests/unit/streaming/__init__.py b/tests/unit/streaming/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/unit/streaming/test_coordinator.py b/tests/unit/streaming/test_coordinator.py new file mode 100644 index 0000000..8f0081d --- /dev/null +++ b/tests/unit/streaming/test_coordinator.py @@ -0,0 +1,54 @@ +"""Unit tests for EventSubscriptionCoordinator.""" + +from unittest.mock import Mock, patch + +import pytest + +from eventkit.streaming.coordinator import EventSubscriptionCoordinator + + +@pytest.fixture +def mock_pubsub_client(): + """Mock Pub/Sub client to avoid authentication.""" + with patch("eventkit.streaming.coordinator.pubsub_v1.SubscriberClient") as mock: + client_instance = Mock() + client_instance.subscription_path.return_value = "projects/test/subscriptions/test-sub" + mock.return_value = client_instance + yield client_instance + + +class TestEventSubscriptionCoordinator: + """Tests for EventSubscriptionCoordinator.""" + + def test_initialization(self, mock_pubsub_client): + """Test coordinator initializes correctly.""" + + async def handler(event): + pass + + coordinator = EventSubscriptionCoordinator( + project_id="test-project", + subscription="test-sub", + handler=handler, + max_workers=4, + ) + + assert coordinator.project_id == "test-project" + assert coordinator.subscription == "test-sub" + assert coordinator.max_workers == 4 + assert coordinator.subscription_name == "test-sub" + + def test_initialization_with_custom_name(self, mock_pubsub_client): + """Test coordinator with custom subscription name.""" + + async def handler(event): + pass + + coordinator = EventSubscriptionCoordinator( + project_id="test-project", + subscription="test-sub", + handler=handler, + subscription_name="My Custom Sub", + ) + + assert coordinator.subscription_name == "My Custom Sub" diff --git a/tests/unit/streaming/test_manager.py b/tests/unit/streaming/test_manager.py new file mode 100644 index 0000000..fbd919a --- /dev/null +++ b/tests/unit/streaming/test_manager.py @@ -0,0 +1,29 @@ +"""Unit tests for EventCoordinatorManager.""" + +from eventkit.streaming.manager import EventCoordinatorManager + + +class TestEventCoordinatorManager: + """Tests for EventCoordinatorManager.""" + + def test_initialization(self): + """Test manager initializes with coordinators list.""" + coordinators = [] + + manager = EventCoordinatorManager(coordinators) + + assert manager.coordinators == coordinators + assert len(manager.coordinators) == 0 + + def test_initialization_with_coordinators(self): + """Test manager stores coordinator references.""" + from unittest.mock import Mock + + coord1 = Mock() + coord2 = Mock() + + manager = EventCoordinatorManager([coord1, coord2]) + + assert len(manager.coordinators) == 2 + assert manager.coordinators[0] == coord1 + assert 
manager.coordinators[1] == coord2
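
Usage note: a minimal sketch of wiring the manager into a FastAPI lifespan,
mirroring the commit-message example above. `process_immediately` and
`event_loader` are the same illustrative placeholders used there, not symbols
added by this patch:

```python
from contextlib import asynccontextmanager

from fastapi import FastAPI

from eventkit.streaming import (
    EventCoordinatorManager,
    EventSubscriptionCoordinator,
)


@asynccontextmanager
async def lifespan(app: FastAPI):
    # One coordinator per Pub/Sub subscription; both subscriptions are assumed
    # to fan out from the same event topic.
    manager = EventCoordinatorManager([
        EventSubscriptionCoordinator(
            project_id="my-project",
            subscription="events-realtime",
            handler=process_immediately,  # placeholder real-time handler
        ),
        EventSubscriptionCoordinator(
            project_id="my-project",
            subscription="events-archive",
            handler=event_loader.add,  # placeholder batched loader
        ),
    ])
    await manager.start_all()
    try:
        yield
    finally:
        # Drain internal queues and stop workers before the app exits.
        await manager.stop_all()


app = FastAPI(lifespan=lifespan)
```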