Event ingestion and processing kit for Python.
eventkit is a production-ready kit for building event collection pipelines. Clone it, customize it, make it yours.
Philosophy: Provide a solid starting point with battle-tested patterns, then get out of your way. Customize for your specific needs.
- Flexible ingestion - Accept any JSON payload with Segment-compatible API
- Stream-based routing - Separate processing pipelines by event type for isolation and scalability
- Adapter pattern - Pluggable validators for multiple event formats and sources
- Pluggable storage - Write to GCS + BigQuery (default) or implement custom backends
- Error handling - Built-in dead letter queue for validation failures and retries
- Type-safe - Full Pydantic v2 validation with strict typing throughout
- Async-first - Built on FastAPI with async/await for high throughput
- Cloud-native - Designed for Cloud Run, GKE, and containerized deployments
- Build a lightweight CDP without vendor lock-in
- Collect product analytics events at scale
- Centralize event data from multiple sources (web, mobile, server-side)
- Create custom event processing pipelines with your own business logic
- Prototype and validate data architectures before committing to enterprise solutions
Clone and customize:
git clone https://github.com/prosdevlab/eventkit.git my-event-pipeline
cd my-event-pipeline
uv sync
Customize for your needs:
from fastapi import FastAPI
from eventkit.api import router as eventkit_router
from eventkit.config import Settings
# Configure storage (GCS + BigQuery default)
settings = Settings(
gcp_project_id="your-project",
gcp_gcs_bucket="your-events-bucket",
gcp_bigquery_dataset="events",
gcp_bigquery_table="raw_events",
)
# Add eventkit routes
app = FastAPI()
app.include_router(eventkit_router, prefix="/api")
# Start collecting events
# POST /api/collect/users
# POST /api/v1/identify
# POST /api/v1/track
Send your first event:
curl -X POST http://localhost:8000/api/v1/identify \
-H "Content-Type: application/json" \
-d '{
"userId": "user_123",
"traits": {
"email": "user@example.com",
"plan": "enterprise"
}
}'
┌─────────────────────────────────────────────────────────┐
│ Collection API (FastAPI) │
│ • /collect/{stream} - Flexible ingestion │
│ • /v1/identify, /v1/track - Segment-compatible │
│ • Accept any JSON, never reject at edge │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ Processing Pipeline │
│ • Adapters - Validate & normalize to typed events │
│ • Validators - Composable field checks │
│ • Sequencer - Hash-based routing for consistency │
│ • EventLoader - Batch events for write efficiency │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ Storage Layer (Pluggable) │
│ • GCS + BigQuery - Production data warehouse (default) │
│ • Custom - Implement EventStore protocol │
│ │
│ Warehouse Loader (Background Service) │
│ • BigQueryLoader - Batch load GCS → BigQuery │
│ • Custom - Implement WarehouseLoader protocol │
└─────────────────────────────────────────────────────────┘
| Component | Purpose | Pattern |
|---|---|---|
| Collection API | HTTP endpoints for event ingestion | Accept everything, validate downstream |
| RawEvent | Flexible container for any JSON | Schema-agnostic at ingestion |
| Adapters | Validate and normalize to typed events | Protocol-based, pluggable |
| Sequencer | Hash-based event routing | Consistent ordering per identity |
| EventLoader | Batch events before storage | Reduce write amplification |
| Event Store | Persist events to storage | Interface for multiple backends |
| Error Store | Dead letter queue for failures | Never lose data, debug later |
| Warehouse Loader | Load events to data warehouse | Background service for batch loading |
| EventSubscriptionCoordinator | Pub/Sub subscription management | Enables dual-path architecture |
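To make the table concrete, here is a rough sketch of how these pieces fit together for a single event. It is illustrative only: it reuses the `RawEvent` and `SegmentSchemaAdapter` APIs shown later in this README, and the `error_store.store(...)` call is a hypothetical stand-in for the dead letter queue.

```python
# Illustrative flow: accept anything at the edge, adapt downstream, never drop data.
# The adapter/result API matches the SegmentSchemaAdapter example below;
# error_store.store(raw, error) is a hypothetical signature for the dead letter queue.
from eventkit.adapters import SegmentSchemaAdapter
from eventkit.schema.raw import RawEvent

adapter = SegmentSchemaAdapter()

async def process_one(payload: dict, event_store, error_store) -> None:
    raw = RawEvent(payload=payload)      # schema-agnostic at ingestion
    result = adapter.adapt(raw)          # validate + normalize to a typed event
    if result.ok:
        await event_store.store(result.event)
    else:
        await error_store.store(raw, result.error)  # hypothetical dead-letter call
```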
For applications requiring both real-time processing and cost-optimized storage, eventkit supports consuming the same event stream through multiple coordinators:
from eventkit.streaming import EventSubscriptionCoordinator, EventCoordinatorManager
# Path 1: Real-time processing (<1s latency)
realtime_coordinator = EventSubscriptionCoordinator(
project_id="my-project",
subscription="events-realtime",
handler=process_immediately, # e.g., profile updates
)
# Path 2: Batched archival (5-10 min latency, cost-optimized)
archive_coordinator = EventSubscriptionCoordinator(
project_id="my-project",
subscription="events-archive",
handler=event_loader.add, # Batches to GCS
)
# Manage both
manager = EventCoordinatorManager([
realtime_coordinator,
archive_coordinator,
])
await manager.start_all()
Use cases:
- Real-time: Profile updates, personalization, live dashboards
- Archive: Analytics, compliance, data warehousing
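For reference, a real-time handler like the `process_immediately` used above might look something like the sketch below. The handler signature is an assumption (one decoded event dict per call), and the in-memory `profiles` dict is a stand-in for a real profile store.

```python
# Sketch of a real-time handler; assumes the coordinator passes one decoded
# event dict per call (check the coordinator's handler signature).
profiles: dict[str, dict] = {}  # stand-in for a real profile store

async def process_immediately(event: dict) -> None:
    if event.get("type") == "identify":
        # keep profiles fresh for personalization and live dashboards
        profiles.setdefault(event["userId"], {}).update(event.get("traits", {}))
```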
eventkit follows battle-tested patterns from production CDP systems:
Accept any JSON payload at the collection endpoint, then validate and normalize downstream. This approach maximizes reliability—invalid events are routed to dead letter queues rather than rejected, allowing you to iterate on validation rules without data loss.
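For example, the flexible `/collect` endpoint will take payloads that no adapter knows about yet. A sketch using `httpx`, assuming the quick-start app above is running on localhost:8000 with the router mounted under `/api`:

```python
# Post arbitrary JSON (a single event or an array) to a named stream.
# Nothing is rejected at the edge; validation happens downstream.
import httpx

events = [
    {"type": "track", "userId": "user_123", "event": "Signed Up"},
    {"source": "webhook", "payload": {"anything": "goes"}},  # not Segment-shaped
]

resp = httpx.post("http://localhost:8000/api/collect/webhooks", json=events)
resp.raise_for_status()
```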
Events are routed to named streams (users, events, pages, webhooks) for independent processing. This enables:
- Different validation rules per stream
- Independent scaling and resource allocation
- Clear separation of concerns
- Easy addition of new event sources
Every component implements a protocol/interface:
- Adapters - Bring your own validation logic (see the adapter sketch below)
- Storage - Swap the default GCS + BigQuery backend for ClickHouse, PostgreSQL, or S3
- Processors - Add custom enrichment or transformation stages
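As a sketch of what "bring your own" looks like for adapters: `AdaptResult` below is a hypothetical stand-in for the result type the real adapter protocol returns (`.ok` / `.event` / `.error`, as in the `SegmentSchemaAdapter` example later in this README), and real adapters return typed Pydantic events rather than plain dicts.

```python
# Hypothetical custom adapter: normalize third-party webhooks into track-style events.
from dataclasses import dataclass
from typing import Any

@dataclass
class AdaptResult:               # illustrative stand-in for the real result type
    ok: bool
    event: Any = None
    error: str | None = None

class WebhookAdapter:
    def adapt(self, raw) -> AdaptResult:
        payload = raw.payload
        if "event_name" not in payload:
            return AdaptResult(ok=False, error="missing event_name")
        return AdaptResult(
            ok=True,
            event={
                "type": "track",
                "userId": payload.get("user_id", "anonymous"),
                "event": payload["event_name"],
                "properties": payload.get("data", {}),
            },
        )
```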
Inspired by open-source CDP architectures:
- RudderStack - Modular validator pattern
- PostHog - Modern Python stack (FastAPI, async)
- Snowplow - Schema-first validation (optional)
eventkit is a kit, not a library. Clone it and make it your own:
# 1. Clone the repo
git clone https://github.com/prosdevlab/eventkit.git my-event-pipeline
cd my-event-pipeline
# 2. Install dependencies
uv sync
# 3. Start local dev
docker-compose up -d # GCS + PubSub emulators
uv run uvicorn eventkit.api.app:app --reload
# 4. Customize for your needs
# - Modify validation rules in src/eventkit/adapters/
# - Add custom storage backends in src/eventkit/stores/
# - Adjust queue behavior in src/eventkit/queues/
# - Make it yours!
See LOCAL_DEV.md for detailed setup.
# Core collection endpoint (accepts single event or array)
POST /collect/{stream}
POST /collect # defaults to "default" stream
# Convenience endpoints (Segment-compatible, single events)
POST /v1/identify # routes to "users" stream
POST /v1/track # routes to "events" stream
POST /v1/page      # routes to "pages" stream
Identify:
{
"type": "identify",
"userId": "user123",
"traits": {
"email": "user@example.com",
"name": "John Doe"
}
}
Track:
{
"type": "track",
"userId": "user123",
"event": "Button Clicked",
"properties": {
"button_id": "cta-signup",
"page": "/home"
}
}
eventkit uses a two-tier architecture for transforming raw events into typed, validated events:
Composable validators perform lightweight checks on raw event payloads:
from eventkit.adapters.validators import (
ValidationPipeline,
RequiredFieldsValidator,
TypeCheckValidator,
TimestampValidator,
)
# Build custom validation pipeline
pipeline = ValidationPipeline([
RequiredFieldsValidator(["type", "userId"]),
TypeCheckValidator({"type": str, "userId": str}),
TimestampValidator(),
])
is_valid, error = pipeline.validate({"type": "identify", "userId": "123"})
Built-in Validators:
- `RequiredFieldsValidator` - Ensure required fields are present
- `TypeCheckValidator` - Validate field types (supports union types)
- `TimestampValidator` - Parse ISO 8601 and Unix timestamps
- `ValidationPipeline` - Compose validators with fail-fast behavior
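Custom validators can slot into the same pipeline. The sketch below assumes each validator follows the same `(is_valid, error)` convention the pipeline returns above; check the validator protocol in `src/eventkit/adapters/validators/` for the exact signature.

```python
# Hypothetical custom validator: enforce an allow-list of track event names.
class AllowedEventNamesValidator:
    def __init__(self, allowed: set[str]) -> None:
        self.allowed = allowed

    def validate(self, payload: dict) -> tuple[bool, str | None]:
        if payload.get("type") != "track":
            return True, None              # only applies to track events
        name = payload.get("event")
        if name in self.allowed:
            return True, None
        return False, f"event '{name}' is not in the allow-list"
```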
Schema adapters transform validated RawEvent → typed TypedEvent:
from eventkit.adapters import SegmentSchemaAdapter
from eventkit.schema.raw import RawEvent
adapter = SegmentSchemaAdapter()
raw = RawEvent(payload={"type": "identify", "userId": "123"})
result = adapter.adapt(raw)
if result.ok:
    event = result.event  # IdentifyEvent | TrackEvent | PageEvent
else:
    error = result.error  # Route to dead letter queue
Built-in Adapters:
- `SegmentSchemaAdapter` - Segment-compatible event spec (identify/track/page)
Future Adapters:
- `CustomSchemaAdapter` - Dynamic per-account schemas (in `eventkit-schema` package)
- `SnowplowAdapter` - Snowplow event format
- `AmplitudeAdapter` - Amplitude HTTP API format
Write events to Google Cloud Storage (GCS) as Parquet files, then batch load to BigQuery for analytics. This pattern provides:
- Cost efficiency: GCS storage is ~50% cheaper than BigQuery active storage
- Flexibility: Raw events available for reprocessing
- Pluggable warehouses: Bring your own warehouse (Snowflake, Redshift, etc.)
- Production-proven: Used by PostHog, RudderStack, and other CDPs
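Under the hood, the write path boils down to serializing a batch of typed events to a Parquet file and uploading it to the configured GCS bucket. A rough sketch of that step with pyarrow (illustrative, not eventkit's actual implementation; it assumes events are Pydantic v2 models):

```python
# Sketch: serialize a batch of typed events to Parquet before uploading to GCS.
import pyarrow as pa
import pyarrow.parquet as pq

def write_batch(events: list, path: str) -> None:
    records = [e.model_dump(mode="json") for e in events]  # Pydantic v2 models -> dicts
    table = pa.Table.from_pylist(records)
    pq.write_table(table, path)
```

The default GCS + BigQuery backend is configured through Settings: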
from eventkit.config import Settings
settings = Settings(
gcp_project_id="my-project",
gcp_gcs_bucket="my-events",
gcp_bigquery_dataset="events",
gcp_bigquery_table="raw_events",
eventkit_event_store="gcs", # Default
eventkit_warehouse_enabled=True, # Auto-load to BigQuery
)
Setup BigQuery:
# Create tables
cd scripts/bigquery
export PROJECT_ID=my-project DATASET=events
cat create_table.sql | sed "s/{PROJECT_ID}/$PROJECT_ID/g" | sed "s/{DATASET}/$DATASET/g" | bq query --use_legacy_sql=false
Run Standalone Loader (optional):
# Deploy as separate service for independent scaling
python -m scripts.run_bigquery_loader
See scripts/bigquery/README.md and specs/gcs-bigquery-storage/ for full details.
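Once the loader has run, a quick sanity check with the google-cloud-bigquery client (a sketch; the project, dataset, and table names follow the Settings example above, so adjust them to yours):

```python
# Count loaded events in BigQuery (assumes the dataset/table from the config above).
from google.cloud import bigquery

client = bigquery.Client(project="my-project")
rows = client.query("SELECT COUNT(*) AS n FROM `my-project.events.raw_events`").result()
print(next(iter(rows)).n)
```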
Implement the EventStore protocol for any backend:
from eventkit.stores import EventStore
class MyCustomStore(EventStore):
    async def store(self, event: TypedEvent) -> None: ...
    async def store_batch(self, events: list[TypedEvent]) -> None: ...
    def health_check(self) -> bool: ...
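As an example of how small a custom backend can be, here is a sketch that appends events to a local JSONL file. It assumes typed events are Pydantic v2 models (`model_dump_json`) and uses `aiofiles` for async file I/O; a production backend would add batching, rotation, and error handling.

```python
# Hypothetical JSONL-backed store implementing the EventStore protocol.
import aiofiles  # assumption: any async file I/O approach would work

from eventkit.stores import EventStore

class JsonlEventStore(EventStore):
    def __init__(self, path: str) -> None:
        self.path = path

    async def store(self, event) -> None:
        await self.store_batch([event])

    async def store_batch(self, events) -> None:
        async with aiofiles.open(self.path, "a") as f:
            for event in events:
                await f.write(event.model_dump_json() + "\n")

    def health_check(self) -> bool:
        return True
```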
eventkit exposes Prometheus metrics on a dedicated port (default: 9090) for production observability.
The metrics server runs independently from the main API server, isolating monitoring traffic from production requests:
settings = Settings(
eventkit_metrics_enabled=True, # Default
eventkit_metrics_port=9090, # Default
)
Access metrics:
curl http://localhost:9090/metrics # Prometheus format
curl http://localhost:9090/health   # Health check
API Layer:
- `eventkit_api_requests_total` - Total HTTP requests (labels: `endpoint`, `method`, `status`)
- `eventkit_api_request_duration_seconds` - Request latency histogram (labels: `endpoint`, `method`)
Event Processing:
- `eventkit_events_received_total` - Events received at ingestion
- `eventkit_events_processed_total` - Events successfully processed (label: `event_type`)
- `eventkit_events_failed_total` - Events that failed validation (label: `reason`)
Storage:
- `eventkit_storage_bytes_written_total` - Bytes written to storage
- `eventkit_storage_files_written_total` - Files written to storage (label: `storage_type`)
Queue & Ring Buffer:
- `eventkit_queue_enqueued_total` - Events enqueued (label: `queue_mode`)
- `eventkit_queue_dequeued_total` - Events dequeued (label: `queue_mode`)
- `eventkit_queue_processed_total` - Events processed by workers (labels: `queue_mode`, `result`)
- `eventkit_queue_depth` - Current queue depth gauge (labels: `queue_mode`, `partition`)
- `eventkit_ringbuffer_written_total` - Events written to ring buffer
- `eventkit_ringbuffer_published_total` - Events published from ring buffer (label: `result`)
- `eventkit_ringbuffer_marked_published_total` - Events marked as published
- `eventkit_ringbuffer_size` - Total ring buffer size gauge
- `eventkit_ringbuffer_unpublished` - Unpublished events gauge
Warehouse Loader:
- `eventkit_warehouse_loader_files_processed_total` - Files loaded to warehouse
- `eventkit_warehouse_loader_errors_total` - Loader errors
System:
- `eventkit_info` - Version and platform info
- `eventkit_uptime_seconds` - Process uptime
- `eventkit_component_health` - Component health status (labels: `component`, `status`)
Example queries for building dashboards:
# API request rate
rate(eventkit_api_requests_total[5m])
# Event processing throughput
rate(eventkit_events_processed_total[5m])
# Storage write rate (bytes/sec)
rate(eventkit_storage_bytes_written_total[5m])
# Queue depth (monitor backlog)
eventkit_queue_depth
# Error rate
rate(eventkit_events_failed_total[5m]) / rate(eventkit_events_received_total[5m])
- Counter-focused - Prefer counters over gauges for aggregation
- Low cardinality labels - No unbounded values (user IDs, etc.)
- Separate server - Metrics on dedicated port isolates monitoring traffic
- Standard naming - Follow Prometheus conventions:
{namespace}_{verb_noun}_{unit}_{suffix}
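For example, a new metric added in a fork might be declared like this with `prometheus_client` (an assumption about the underlying metrics library; the metric name and the `stage` label are hypothetical):

```python
# Hypothetical new counter following the naming convention above.
from prometheus_client import Counter

enrichment_failed = Counter(
    "eventkit_enrichment_failed_total",        # {namespace}_{verb_noun}_{suffix}
    "Events that failed a custom enrichment stage",
    labelnames=["stage"],                      # keep label cardinality low
)

enrichment_failed.labels(stage="geoip").inc()
```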
See LOCAL_DEV.md for detailed local development instructions.
Quick Start:
# Start GCS emulator (for local development)
docker-compose up -d
# Install dependencies
uv sync
# Run API server
export GCP_PROJECT_ID="test-project"
export GCP_GCS_BUCKET="test-events"
uv run uvicorn eventkit.api.app:app --reload
# Run tests
uv run pytest --cov=src/eventkit
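A minimal smoke test against the collection API might look like the sketch below (it assumes the emulator-backed local setup above; the route prefix and expected status code may differ in your configuration):

```python
# Hypothetical smoke test for the collection endpoint using FastAPI's TestClient.
from fastapi.testclient import TestClient

from eventkit.api.app import app

def test_collect_accepts_arbitrary_json():
    client = TestClient(app)
    resp = client.post("/collect/webhooks", json={"anything": "goes"})
    assert resp.status_code < 400
```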
Type check:
uv run mypy src/eventkit
Lint:
uv run ruff check src/
Format:
uv run ruff format src/
- Composable validators (required fields, types, timestamps)
- Segment-compatible adapter with ValidationPipeline
- Collection API with stream routing
- GCS + BigQuery storage backend
- Ring buffer with Write-Ahead Log
- Pub/Sub queue integration
- Event batching and loading
- Prometheus metrics
- EventSubscriptionCoordinator (dual-path architecture)
- Hash-based sequencer for consistent ordering
- Performance benchmarks (10k+ events/sec validated)
- Error handling and dead letter queue (ErrorStore protocol exists, needs implementation)
- Comprehensive examples and use cases
- Production deployment guides (Cloud Run, GKE, ECS)
- S3 + Snowflake/Redshift storage adapters
- Nextra documentation site
As patterns stabilize, we may extract reusable components:
- eventkit-ring-buffer - SQLite WAL durability layer (could be used standalone)
- eventkit-queues - Queue abstractions (AsyncQueue, PubSub patterns)
- eventkit-validators - Composable validation framework
- eventkit-storage - Storage backend protocols and implementations
These would be pip-installable libraries while the kit remains a starting point.
We welcome contributions! Whether you're fixing bugs, adding features, or improving documentation, your help makes eventkit better for everyone.
- Fork the repository
- Create a feature branch (`git checkout -b feature/amazing-feature`)
- Make your changes with tests
- Run the test suite (`pytest`)
- Submit a pull request
- Type hints required (mypy strict mode)
- Test coverage for new features
- Follow existing code style (ruff format)
- Update documentation for API changes
- Issues - Bug reports and feature requests
- Discussions - Questions and ideas
- Pull Requests - Code contributions welcome
MIT License - see LICENSE