eventkit

Event ingestion and processing kit for Python.

Overview

eventkit is a production-ready kit for building event collection pipelines. Clone it, customize it, make it yours.

Philosophy: Provide a solid starting point with battle-tested patterns, then get out of your way. Customize for your specific needs.

📚 View Full Documentation

Key Features

  • Flexible ingestion - Accept any JSON payload via a Segment-compatible API
  • Stream-based routing - Separate processing pipelines by event type for isolation and scalability
  • Adapter pattern - Pluggable validators for multiple event formats and sources
  • Pluggable storage - Write to GCS + BigQuery (default) or implement custom backends
  • Error handling - Built-in dead letter queue for validation failures and retries
  • Type-safe - Full Pydantic v2 validation with strict typing throughout
  • Async-first - Built on FastAPI with async/await for high throughput
  • Cloud-native - Designed for Cloud Run, GKE, and containerized deployments

Use Cases

  • Build a lightweight CDP without vendor lock-in
  • Collect product analytics events at scale
  • Centralize event data from multiple sources (web, mobile, server-side)
  • Create custom event processing pipelines with your own business logic
  • Prototype and validate data architectures before committing to enterprise solutions

Quick Start

Clone and customize:

git clone https://github.com/prosdevlab/eventkit.git my-event-pipeline
cd my-event-pipeline
uv sync

Customize for your needs:

from fastapi import FastAPI
from eventkit.api import router as eventkit_router
from eventkit.config import Settings

# Configure storage (GCS + BigQuery default)
settings = Settings(
    gcp_project_id="your-project",
    gcp_gcs_bucket="your-events-bucket",
    gcp_bigquery_dataset="events",
    gcp_bigquery_table="raw_events",
)

# Add eventkit routes
app = FastAPI()
app.include_router(eventkit_router, prefix="/api")

# Start collecting events
# POST /api/collect/users
# POST /api/v1/identify
# POST /api/v1/track

Send your first event:

curl -X POST http://localhost:8000/api/v1/identify \
  -H "Content-Type: application/json" \
  -d '{
    "userId": "user_123",
    "traits": {
      "email": "user@example.com",
      "plan": "enterprise"
    }
  }'

Architecture

┌─────────────────────────────────────────────────────────┐
│  Collection API (FastAPI)                               │
│  • /collect/{stream} - Flexible ingestion               │
│  • /v1/identify, /v1/track - Segment-compatible         │
│  • Accept any JSON, never reject at edge                │
└─────────────────────────────────────────────────────────┘
                          ↓
┌─────────────────────────────────────────────────────────┐
│  Processing Pipeline                                    │
│  • Adapters - Validate & normalize to typed events      │
│  • Validators - Composable field checks                 │
│  • Sequencer - Hash-based routing for consistency       │
│  • EventLoader - Batch events for write efficiency      │
└─────────────────────────────────────────────────────────┘
                          ↓
┌─────────────────────────────────────────────────────────┐
│  Storage Layer (Pluggable)                              │
│  • GCS + BigQuery - Production data warehouse (default) │
│  • Custom - Implement EventStore protocol               │
│                                                         │
│  Warehouse Loader (Background Service)                  │
│  • BigQueryLoader - Batch load GCS → BigQuery           │
│  • Custom - Implement WarehouseLoader protocol          │
└─────────────────────────────────────────────────────────┘

Core Components

Component                      Purpose                                    Pattern
Collection API                 HTTP endpoints for event ingestion         Accept everything, validate downstream
RawEvent                       Flexible container for any JSON            Schema-agnostic at ingestion
Adapters                       Validate and normalize to typed events     Protocol-based, pluggable
Sequencer                      Hash-based event routing                   Consistent ordering per identity
EventLoader                    Batch events before storage                Reduce write amplification
Event Store                    Persist events to storage                  Interface for multiple backends
Error Store                    Dead letter queue for failures             Never lose data, debug later
Warehouse Loader               Load events to data warehouse              Background service for batch loading
EventSubscriptionCoordinator   Pub/Sub subscription management            Enables dual-path architecture
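
To make the Sequencer row concrete: hash-based routing means every event for a given identity lands on the same partition, which is what preserves per-identity ordering. A minimal illustrative sketch (not eventkit's actual implementation):

import hashlib

def partition_for(identity: str, num_partitions: int) -> int:
    # A stable digest: the same identity always maps to the same
    # partition, so one user's events are processed in order.
    digest = hashlib.sha256(identity.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_partitions

# user_123 routes to the same partition on every call
assert partition_for("user_123", 16) == partition_for("user_123", 16)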

Dual-Path Architecture (Advanced)

For applications requiring both real-time processing and cost-optimized storage, eventkit supports consuming the same event stream through multiple coordinators:

from eventkit.streaming import EventSubscriptionCoordinator, EventCoordinatorManager

# Path 1: Real-time processing (<1s latency)
realtime_coordinator = EventSubscriptionCoordinator(
    project_id="my-project",
    subscription="events-realtime",
    handler=process_immediately,  # e.g., profile updates
)

# Path 2: Batched archival (5-10 min latency, cost-optimized)
archive_coordinator = EventSubscriptionCoordinator(
    project_id="my-project",
    subscription="events-archive",
    handler=event_loader.add,  # Batches to GCS
)

# Manage both
manager = EventCoordinatorManager([
    realtime_coordinator,
    archive_coordinator,
])

await manager.start_all()

Use cases:

  • Real-time: Profile updates, personalization, live dashboards
  • Archive: Analytics, compliance, data warehousing

Design Philosophy

eventkit follows battle-tested patterns from production CDP systems:

Flexible Ingestion, Strict Processing

Accept any JSON payload at the collection endpoint, then validate and normalize downstream. This approach maximizes reliability—invalid events are routed to dead letter queues rather than rejected, allowing you to iterate on validation rules without data loss.
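
In code, the pattern looks roughly like this (a sketch: event_store and error_store stand in for instances of the EventStore and ErrorStore protocols, and the adapter calls follow the SegmentSchemaAdapter example later in this README):

from eventkit.adapters import SegmentSchemaAdapter
from eventkit.schema.raw import RawEvent

adapter = SegmentSchemaAdapter()

async def process(payload: dict, event_store, error_store) -> None:
    raw = RawEvent(payload=payload)  # edge: accept anything that parses as JSON
    result = adapter.adapt(raw)
    if result.ok:
        await event_store.store(result.event)   # happy path: persist typed event
    else:
        await error_store.store(result.error)   # dead letter queue: keep it, debug later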

Stream-Based Isolation

Events are routed to named streams (users, events, pages, webhooks) for independent processing. This enables:

  • Different validation rules per stream (see the sketch after this list)
  • Independent scaling and resource allocation
  • Clear separation of concerns
  • Easy addition of new event sources
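
For instance, per-stream validation rules can be a simple mapping from stream name to pipeline (a hypothetical wiring sketch built on the ValidationPipeline API shown under Validation & Adaptation below):

from eventkit.adapters.validators import (
    ValidationPipeline,
    RequiredFieldsValidator,
)

# Each stream carries its own rules; adding a source is adding an entry.
STREAM_PIPELINES = {
    "users": ValidationPipeline([RequiredFieldsValidator(["type", "userId"])]),
    "events": ValidationPipeline([RequiredFieldsValidator(["type", "userId", "event"])]),
    "webhooks": ValidationPipeline([RequiredFieldsValidator(["type"])]),
}

payload = {"type": "track", "userId": "123", "event": "Button Clicked"}
is_valid, error = STREAM_PIPELINES["events"].validate(payload)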

Pluggable Architecture

Every component implements a protocol/interface:

  • Adapters - Bring your own validation logic
  • Storage - Swap the default GCS + BigQuery backend for ClickHouse, PostgreSQL, or S3
  • Processors - Add custom enrichment or transformation stages

Production Patterns

Inspired by open-source CDP architectures:

  • RudderStack - Modular validator pattern
  • PostHog - Modern Python stack (FastAPI, async)
  • Snowplow - Schema-first validation (optional)

Getting Started

eventkit is a kit, not a library. Clone and make it your own:

# 1. Clone the repo
git clone https://github.com/prosdevlab/eventkit.git my-event-pipeline
cd my-event-pipeline

# 2. Install dependencies
uv sync

# 3. Start local dev
docker-compose up -d  # GCS + PubSub emulators
uv run uvicorn eventkit.api.app:app --reload

# 4. Customize for your needs
# - Modify validation rules in src/eventkit/adapters/
# - Add custom storage backends in src/eventkit/stores/
# - Adjust queue behavior in src/eventkit/queues/
# - Make it yours!

See LOCAL_DEV.md for detailed setup.

API Endpoints

Collection Endpoints

# Core collection endpoint (accepts single event or array)
POST /collect/{stream}
POST /collect  # defaults to "default" stream

# Convenience endpoints (Segment-compatible, single events)
POST /v1/identify  # routes to "users" stream
POST /v1/track     # routes to "events" stream
POST /v1/page      # routes to "pages" stream
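
Both shapes from a Python client, for illustration (uses httpx; the payloads are examples, and the path prefix depends on how you mounted the router; Quick Start above mounts it under /api):

import httpx

BASE = "http://localhost:8000"

# Single event to a named stream
httpx.post(f"{BASE}/collect/webhooks", json={"type": "webhook", "source": "stripe"})

# Array of events in one request
httpx.post(f"{BASE}/collect/events", json=[
    {"type": "track", "userId": "user_123", "event": "Signup Started"},
    {"type": "track", "userId": "user_123", "event": "Signup Completed"},
])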

Example Payloads

Identify:

{
  "type": "identify",
  "userId": "user123",
  "traits": {
    "email": "user@example.com",
    "name": "John Doe"
  }
}

Track:

{
  "type": "track",
  "userId": "user123",
  "event": "Button Clicked",
  "properties": {
    "button_id": "cta-signup",
    "page": "/home"
  }
}

Validation & Adaptation

eventkit uses a two-tier architecture for transforming raw events into typed, validated events:

Validators

Composable validators perform lightweight checks on raw event payloads:

from eventkit.adapters.validators import (
    ValidationPipeline,
    RequiredFieldsValidator,
    TypeCheckValidator,
    TimestampValidator,
)

# Build custom validation pipeline
pipeline = ValidationPipeline([
    RequiredFieldsValidator(["type", "userId"]),
    TypeCheckValidator({"type": str, "userId": str}),
    TimestampValidator(),
])

is_valid, error = pipeline.validate({"type": "identify", "userId": "123"})

Built-in Validators:

  • RequiredFieldsValidator - Ensure required fields are present
  • TypeCheckValidator - Validate field types (supports union types)
  • TimestampValidator - Parse ISO 8601 and Unix timestamps
  • ValidationPipeline - Compose validators with fail-fast behavior
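
Custom checks can slot into the same pipeline. A hypothetical example, assuming individual validators return the same (is_valid, error) shape the pipeline result above suggests (check the Validator protocol in src/eventkit/adapters/ for the exact signature):

from eventkit.adapters.validators import ValidationPipeline, RequiredFieldsValidator

class EmailFormatValidator:
    # Hypothetical: reject payloads whose email trait lacks an "@"
    def validate(self, payload: dict) -> tuple[bool, str | None]:
        email = payload.get("traits", {}).get("email")
        if email is not None and "@" not in email:
            return False, f"invalid email: {email!r}"
        return True, None

pipeline = ValidationPipeline([
    RequiredFieldsValidator(["type", "userId"]),
    EmailFormatValidator(),
])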

Adapters

Schema adapters transform validated RawEvent → typed TypedEvent:

from eventkit.adapters import SegmentSchemaAdapter
from eventkit.schema.raw import RawEvent

adapter = SegmentSchemaAdapter()
raw = RawEvent(payload={"type": "identify", "userId": "123"})
result = adapter.adapt(raw)

if result.ok:
    event = result.event  # IdentifyEvent | TrackEvent | PageEvent
else:
    error = result.error  # Route to dead letter queue

Built-in Adapters:

  • SegmentSchemaAdapter - Segment-compatible event spec (identify/track/page)

Future Adapters:

  • CustomSchemaAdapter - Dynamic per-account schemas (in eventkit-schema package)
  • SnowplowAdapter - Snowplow event format
  • AmplitudeAdapter - Amplitude HTTP API format

Storage Options

GCS + BigQuery (Recommended for Production)

Write events to Google Cloud Storage (GCS) as Parquet files, then batch load to BigQuery for analytics. This pattern provides:

  • Cost efficiency: GCS storage is ~50% cheaper than BigQuery active storage
  • Flexibility: Raw events available for reprocessing
  • Pluggable warehouses: Bring your own warehouse (Snowflake, Redshift, etc.)
  • Production-proven: Used by PostHog, RudderStack, and other CDPs

from eventkit.config import Settings

settings = Settings(
    gcp_project_id="my-project",
    gcp_gcs_bucket="my-events",
    gcp_bigquery_dataset="events",
    gcp_bigquery_table="raw_events",
    eventkit_event_store="gcs",  # Default
    eventkit_warehouse_enabled=True,  # Auto-load to BigQuery
)

Setup BigQuery:

# Create tables
cd scripts/bigquery
export PROJECT_ID=my-project DATASET=events
cat create_table.sql | sed "s/{PROJECT_ID}/$PROJECT_ID/g" | sed "s/{DATASET}/$DATASET/g" | bq query --use_legacy_sql=false

Run Standalone Loader (optional):

# Deploy as separate service for independent scaling
python -m scripts.run_bigquery_loader

See scripts/bigquery/README.md and specs/gcs-bigquery-storage/ for full details.

Custom Storage

Implement the EventStore protocol for any backend:

from eventkit.stores import EventStore

class MyCustomStore(EventStore):
    # TypedEvent is the validated event union (IdentifyEvent | TrackEvent | PageEvent)
    async def store(self, event: TypedEvent) -> None: ...
    async def store_batch(self, events: list[TypedEvent]) -> None: ...
    def health_check(self) -> bool: ...
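
For example, a minimal in-memory store for local tests (a sketch against the protocol above, with the same TypedEvent annotation):

class InMemoryStore(EventStore):
    # Keeps events in a list; useful for tests, not production.
    def __init__(self) -> None:
        self.events: list[TypedEvent] = []

    async def store(self, event: TypedEvent) -> None:
        self.events.append(event)

    async def store_batch(self, events: list[TypedEvent]) -> None:
        self.events.extend(events)

    def health_check(self) -> bool:
        return True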

Monitoring & Metrics

eventkit exposes Prometheus metrics on a dedicated port (default: 9090) for production observability.

Metrics Server

The metrics server runs independently from the main API server, isolating monitoring traffic from production requests:

settings = Settings(
    eventkit_metrics_enabled=True,  # Default
    eventkit_metrics_port=9090,      # Default
)

Access metrics:

curl http://localhost:9090/metrics  # Prometheus format
curl http://localhost:9090/health    # Health check

Available Metrics

API Layer:

  • eventkit_api_requests_total - Total HTTP requests (labels: endpoint, method, status)
  • eventkit_api_request_duration_seconds - Request latency histogram (labels: endpoint, method)

Event Processing:

  • eventkit_events_received_total - Events received at ingestion
  • eventkit_events_processed_total - Events successfully processed (label: event_type)
  • eventkit_events_failed_total - Events that failed validation (label: reason)

Storage:

  • eventkit_storage_bytes_written_total - Bytes written to storage
  • eventkit_storage_files_written_total - Files written to storage (label: storage_type)

Queue & Ring Buffer:

  • eventkit_queue_enqueued_total - Events enqueued (label: queue_mode)
  • eventkit_queue_dequeued_total - Events dequeued (label: queue_mode)
  • eventkit_queue_processed_total - Events processed by workers (labels: queue_mode, result)
  • eventkit_queue_depth - Current queue depth gauge (labels: queue_mode, partition)
  • eventkit_ringbuffer_written_total - Events written to ring buffer
  • eventkit_ringbuffer_published_total - Events published from ring buffer (label: result)
  • eventkit_ringbuffer_marked_published_total - Events marked as published
  • eventkit_ringbuffer_size - Total ring buffer size gauge
  • eventkit_ringbuffer_unpublished - Unpublished events gauge

Warehouse Loader:

  • eventkit_warehouse_loader_files_processed_total - Files loaded to warehouse
  • eventkit_warehouse_loader_errors_total - Loader errors

System:

  • eventkit_info - Version and platform info
  • eventkit_uptime_seconds - Process uptime
  • eventkit_component_health - Component health status (labels: component, status)

Grafana Dashboard

Example queries for building dashboards:

# API request rate
rate(eventkit_api_requests_total[5m])

# Event processing throughput
rate(eventkit_events_processed_total[5m])

# Storage write rate (bytes/sec)
rate(eventkit_storage_bytes_written_total[5m])

# Queue depth (monitor backlog)
eventkit_queue_depth

# Error rate
rate(eventkit_events_failed_total[5m]) / rate(eventkit_events_received_total[5m])

Design Principles

  • Counter-focused - Prefer counters over gauges for aggregation
  • Low cardinality labels - No unbounded values (user IDs, etc.)
  • Separate server - Serving metrics on a dedicated port isolates monitoring traffic
  • Standard naming - Follow Prometheus conventions: {namespace}_{verb_noun}_{unit}_{suffix}
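
As an illustration of the naming convention with the standard prometheus_client library (a sketch; eventkit's actual metric definitions live in the source):

from prometheus_client import Counter

# {namespace}_{verb_noun}_{unit}_{suffix}: namespace "eventkit",
# "_total" suffix for counters, bounded labels only.
EVENTS_PROCESSED = Counter(
    "eventkit_events_processed_total",
    "Events successfully processed",
    ["event_type"],  # low cardinality: identify | track | page
)

EVENTS_PROCESSED.labels(event_type="track").inc()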

Development

See LOCAL_DEV.md for detailed local development instructions.

Quick Start:

# Start GCS emulator (for local development)
docker-compose up -d

# Install dependencies
uv sync

# Run API server
export GCP_PROJECT_ID="test-project"
export GCP_GCS_BUCKET="test-events"
uv run uvicorn eventkit.api.app:app --reload

# Run tests
uv run pytest --cov=src/eventkit

Type check:

uv run mypy src/eventkit

Lint:

uv run ruff check src/

Format:

uv run ruff format src/

Roadmap

Core Kit (v0.x) ✅

  • Composable validators (required fields, types, timestamps)
  • Segment-compatible adapter with ValidationPipeline
  • Collection API with stream routing
  • GCS + BigQuery storage backend
  • Ring buffer with Write-Ahead Log
  • Pub/Sub queue integration
  • Event batching and loading
  • Prometheus metrics
  • EventSubscriptionCoordinator (dual-path architecture)
  • Hash-based sequencer for consistent ordering
  • Performance benchmarks (10k+ events/sec validated)
  • Error handling and dead letter queue (ErrorStore protocol exists, needs implementation)

v1.0 - Production Ready

  • Comprehensive examples and use cases
  • Production deployment guides (Cloud Run, GKE, ECS)
  • S3 + Snowflake/Redshift storage adapters
  • Nextra documentation site

Future: Extract Focused Libraries

As patterns stabilize, we may extract reusable components:

  • eventkit-ring-buffer - SQLite WAL durability layer (could be used standalone)
  • eventkit-queues - Queue abstractions (AsyncQueue, PubSub patterns)
  • eventkit-validators - Composable validation framework
  • eventkit-storage - Storage backend protocols and implementations

These would be pip-installable libraries while the kit remains a starting point.

Contributing

We welcome contributions! Whether you're fixing bugs, adding features, or improving documentation, your help makes eventkit better for everyone.

Getting Started

  1. Fork the repository
  2. Create a feature branch (git checkout -b feature/amazing-feature)
  3. Make your changes with tests
  4. Run the test suite (pytest)
  5. Submit a pull request

Code Standards

  • Type hints required (mypy strict mode)
  • Test coverage for new features
  • Follow existing code style (ruff format)
  • Update documentation for API changes

Community

  • Issues - Bug reports and feature requests
  • Discussions - Questions and ideas
  • Pull Requests - Code contributions welcome

License

MIT License - see LICENSE
