77 changes: 52 additions & 25 deletions README.md
@@ -11,7 +11,7 @@ Event ingestion and processing primitives for Python.
- **Flexible ingestion** - Accept any JSON payload with Segment-compatible API
- **Stream-based routing** - Separate processing pipelines by event type for isolation and scalability
- **Adapter pattern** - Pluggable validators for multiple event formats and sources
- **Pluggable storage** - Write to GCS + BigQuery (default), Firestore, or implement custom backends
- **Pluggable storage** - Write to GCS + BigQuery (default) or implement custom backends
- **Error handling** - Built-in dead letter queue for validation failures and retries
- **Type-safe** - Full Pydantic v2 validation with strict typing throughout
- **Async-first** - Built on FastAPI with async/await for high throughput
@@ -46,7 +46,6 @@ settings = Settings(
gcp_gcs_bucket="your-events-bucket",
gcp_bigquery_dataset="events",
gcp_bigquery_table="raw_events",
eventkit_event_store="gcs" # or "firestore" for Firestore mode
)

# Add eventkit routes
@@ -84,7 +83,7 @@ curl -X POST http://localhost:8000/api/v1/identify \
└─────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│ Processing Pipeline
│ Processing Pipeline │
│ • Adapters - Validate & normalize to typed events │
│ • Validators - Composable field checks │
│ • Sequencer - Hash-based routing for consistency │
@@ -94,9 +93,8 @@ curl -X POST http://localhost:8000/api/v1/identify \
┌─────────────────────────────────────────────────────────┐
│ Storage Layer (Pluggable) │
│ • GCS + BigQuery - Production data warehouse (default) │
│ • Firestore - Managed, serverless (dev/testing) │
│ • Custom - Implement EventStore protocol │
│ │
│ Warehouse Loader (Background Service) │
│ • BigQueryLoader - Batch load GCS → BigQuery │
│ • Custom - Implement WarehouseLoader protocol │
@@ -115,6 +113,43 @@ curl -X POST http://localhost:8000/api/v1/identify \
| **Event Store** | Persist events to storage | Interface for multiple backends |
| **Error Store** | Dead letter queue for failures | Never lose data, debug later |
| **Warehouse Loader** | Load events to data warehouse | Background service for batch loading |
| **EventSubscriptionCoordinator** | Pub/Sub subscription management | Enables dual-path architecture |
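
The hash-based sequencer in the processing pipeline above is what preserves per-key ordering without global coordination: each event is routed by a stable hash of its partition key (for example `user_id`), so events for the same key always flow through the same pipeline. A minimal sketch of that idea (illustrative only; `route_to_partition` is not part of eventkit's API):

```python
import hashlib


def route_to_partition(partition_key: str, num_partitions: int) -> int:
    """Deterministically map a key (e.g. user_id) to a partition index.

    Events sharing a key always land on the same partition, which gives
    per-key ordering without any global coordination.
    """
    digest = hashlib.sha256(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_partitions


# Example: events for the same user always route together
assert route_to_partition("user-42", 8) == route_to_partition("user-42", 8)
```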

---

### Dual-Path Architecture (Advanced)

For applications requiring both real-time processing and cost-optimized storage, `eventkit` supports consuming the same event stream through multiple coordinators:

```python
from eventkit.streaming import EventSubscriptionCoordinator, EventCoordinatorManager

# Path 1: Real-time processing (<1s latency)
realtime_coordinator = EventSubscriptionCoordinator(
project_id="my-project",
subscription="events-realtime",
handler=process_immediately, # e.g., profile updates
)

# Path 2: Batched archival (5-10 min latency, cost-optimized)
archive_coordinator = EventSubscriptionCoordinator(
project_id="my-project",
subscription="events-archive",
handler=event_loader.add, # Batches to GCS
)

# Manage both
manager = EventCoordinatorManager([
realtime_coordinator,
archive_coordinator,
])

await manager.start_all()
```

**Use cases**:
- Real-time: Profile updates, personalization, live dashboards
- Archive: Analytics, compliance, data warehousing
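
For the real-time path, `process_immediately` above might be an async callable along these lines (a rough sketch; the exact `EventHandler` signature and decoded event shape are defined in `eventkit.streaming`, and the field names here are assumptions):

```python
from typing import Any


async def process_immediately(event: dict[str, Any]) -> None:
    """Handle one event as soon as it arrives (hypothetical handler)."""
    # e.g. push identify traits straight into a profile store
    if event.get("type") == "identify":
        user_id = event.get("userId")
        traits = event.get("traits", {})
        print(f"updating profile {user_id} with {traits}")  # stand-in for real work
```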

## Design Philosophy

@@ -305,17 +340,6 @@ python -m scripts.run_bigquery_loader

See `scripts/bigquery/README.md` and `specs/gcs-bigquery-storage/` for full details.
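
Conceptually, the loader's job is a BigQuery batch load over files staged in GCS. A rough equivalent using the BigQuery client library directly (the object layout and `.jsonl` suffix are assumptions; `scripts.run_bigquery_loader` wraps the real logic):

```python
from google.cloud import bigquery

client = bigquery.Client(project="my-project")
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
)

# Load every staged batch file from the events bucket into the raw table.
load_job = client.load_table_from_uri(
    "gs://your-events-bucket/events/*.jsonl",  # assumed object layout
    "my-project.events.raw_events",
    job_config=job_config,
)
load_job.result()  # block until the batch load finishes
```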

### Firestore (Development/Testing)

Managed, serverless NoSQL database. Good for development and moderate throughput.

```python
settings = Settings(
gcp_project_id="my-project",
eventkit_event_store="firestore",
)
```

### Custom Storage

Implement the `EventStore` protocol for any backend:
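
A minimal sketch of the shape, assuming the protocol reduces to an async write method (the authoritative `EventStore` definition and its exact method names live in the eventkit source):

```python
from typing import Any, Protocol


class EventStore(Protocol):
    """Illustrative only; see eventkit's source for the real protocol."""

    async def write(self, events: list[dict[str, Any]]) -> None: ...


class InMemoryEventStore:
    """Toy backend that appends to a list (handy in tests)."""

    def __init__(self) -> None:
        self.events: list[dict[str, Any]] = []

    async def write(self, events: list[dict[str, Any]]) -> None:
        self.events.extend(events)
```
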
@@ -419,15 +443,15 @@ See [LOCAL_DEV.md](LOCAL_DEV.md) for detailed local development instructions.

**Quick Start:**
```bash
# Start Firestore emulator
# Start GCS emulator (for local development)
docker-compose up -d

# Install dependencies
uv sync

# Run API server
export FIRESTORE_EMULATOR_HOST="localhost:8080"
export GCP_PROJECT_ID="test-project"
export GCP_GCS_BUCKET="test-events"
uv run uvicorn eventkit.api.app:app --reload

# Run tests
@@ -454,19 +478,22 @@ uv run ruff format src/
### Core (v0.x)
- [x] Composable validators (required fields, types, timestamps)
- [x] Segment-compatible adapter with ValidationPipeline
- [ ] Collection API with stream routing
- [ ] Hash-based sequencer for consistent ordering
- [ ] Firestore storage backend (in progress)
- [ ] Error handling and dead letter queue
- [ ] Structured logging
- [x] Collection API with stream routing
- [x] GCS + BigQuery storage backend
- [x] Ring buffer with Write-Ahead Log
- [x] Pub/Sub queue integration
- [x] Event batching and loading
- [x] Prometheus metrics
- [x] EventSubscriptionCoordinator (dual-path architecture)
- [x] Hash-based sequencer for consistent ordering
- [ ] Error handling and dead letter queue (ErrorStore protocol exists, needs implementation)
- [ ] Performance benchmarks (10k+ events/sec)

### v1.0
- [ ] ClickHouse storage backend
- [ ] Pub/Sub integration for async processing
- [ ] OpenAPI spec and generated clients
- [ ] Comprehensive examples and documentation
- [ ] Production deployment guides (Cloud Run, GKE, ECS)
- [ ] S3 + Snowflake/Redshift storage adapters

### Future Ecosystem

10 changes: 10 additions & 0 deletions src/eventkit/streaming/__init__.py
@@ -0,0 +1,10 @@
"""Event streaming coordination for Pub/Sub subscriptions."""

from eventkit.streaming.coordinator import EventHandler, EventSubscriptionCoordinator
from eventkit.streaming.manager import EventCoordinatorManager

__all__ = [
"EventHandler",
"EventSubscriptionCoordinator",
"EventCoordinatorManager",
]