README.md (129 changes: 42 additions & 87 deletions)
@@ -1,10 +1,12 @@
# eventkit

-Event ingestion and processing primitives for Python.
+Event ingestion and processing kit for Python.

## Overview

-`eventkit` is a high-performance, type-safe library for building event collection pipelines. It provides the core infrastructure for customer data platforms, product analytics, and event-driven architectures.
+`eventkit` is a production-ready **kit** for building event collection pipelines. Clone it, customize it, make it yours.

+**Philosophy**: Provide a solid starting point with battle-tested patterns, then get out of your way. Customize for your specific needs.
+
### Key Features

@@ -27,13 +29,15 @@ Event ingestion and processing primitives for Python.

## Quick Start

-Install from PyPI:
+Clone and customize:

```bash
-pip install eventkit
+git clone https://github.com/prosdevlab/eventkit.git my-event-pipeline
+cd my-event-pipeline
+uv sync
```

-Add to your FastAPI application:
+Customize for your needs:

```python
from fastapi import FastAPI
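# NOTE: the diff collapses the rest of this snippet. Everything below this
# comment is a hypothetical completion, not part of the PR. It assumes the
# collection app is exposed at `eventkit.api.app` (the uvicorn target used
# in Getting Started below); treat the names and mount path as illustrative.
from eventkit.api.app import app as collector

app = FastAPI()
app.mount("/v1", collector)  # forward /v1/* traffic to the eventkit collection app
```
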
@@ -181,25 +185,31 @@ Inspired by open-source CDP architectures:
- [PostHog](https://github.com/PostHog/posthog) - Modern Python stack (FastAPI, async)
- [Snowplow](https://github.com/snowplow/snowplow) - Schema-first validation (optional)

-## Installation
+## Getting Started

-**Basic:**
-```bash
-pip install eventkit
-```
-
-**With ClickHouse support:**
-```bash
-pip install eventkit[clickhouse]
-```
-
-**Development:**
-```bash
-git clone https://github.com/prosdev/eventkit.git
-cd eventkit
-pip install -e ".[dev]"
-```
+**EventKit is a kit**, not a library. Clone and make it your own:
+
+```bash
+# 1. Clone the repo
+git clone https://github.com/prosdevlab/eventkit.git my-event-pipeline
+cd my-event-pipeline
+
+# 2. Install dependencies
+uv sync
+
+# 3. Start local dev
+docker-compose up -d  # GCS + PubSub emulators
+uv run uvicorn eventkit.api.app:app --reload
+
+# 4. Customize for your needs
+# - Modify validation rules in src/eventkit/adapters/
+# - Add custom storage backends in src/eventkit/stores/
+# - Adjust queue behavior in src/eventkit/queues/
+# - Make it yours!
+```
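
The compose file itself isn't shown in this diff. As a rough, hypothetical stand-in for step 3 (image names, flags, and ports are assumptions, not taken from this repo):

```bash
# Hypothetical emulator setup -- not from this PR. Assumes the public
# fsouza/fake-gcs-server image and the gcloud Pub/Sub emulator component.
docker run -d -p 4443:4443 fsouza/fake-gcs-server -scheme http -port 4443
gcloud beta emulators pubsub start --host-port=0.0.0.0:8085 &
```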

See [LOCAL_DEV.md](LOCAL_DEV.md) for detailed setup.
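
Since step 4 above points at `src/eventkit/adapters/`, here is a minimal sketch of what a composable validator could look like. The protocol name and signatures are assumptions for illustration; the repo's actual interfaces aren't visible in this diff.

```python
# Hypothetical validator sketch -- names and signatures are assumptions,
# not the repo's actual API.
from typing import Any, Protocol


class EventValidator(Protocol):
    def validate(self, event: dict[str, Any]) -> list[str]:
        """Return error messages; an empty list means the event is valid."""
        ...


class RequiredFields:
    """Reject events missing any of the given top-level fields."""

    def __init__(self, *fields: str) -> None:
        self.fields = fields

    def validate(self, event: dict[str, Any]) -> list[str]:
        return [f"missing required field: {name}" for name in self.fields if name not in event]
```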

## API Endpoints

### Collection Endpoints
@@ -340,62 +350,6 @@ python -m scripts.run_bigquery_loader

See `scripts/bigquery/README.md` and `specs/gcs-bigquery-storage/` for full details.

-### Error Store (Dead Letter Queue)
-
-All failed events are stored in a GCS-based dead letter queue for debugging and retry:
-
-**Two Error Types:**
-- **Validation Errors**: Missing required fields, invalid schema
-- **Processing Errors**: Storage failures, unexpected exceptions
-
-**Storage Structure:**
-```
-gs://bucket/errors/
-  date=2026-01-15/
-    error_type=validation/
-      error-20260115-100000-abc123.parquet
-    error_type=processing/
-      error-20260115-100500-def456.parquet
-```
-
-**Create BigQuery Errors Table:**
-```bash
-cd scripts/bigquery
-export PROJECT_ID=my-project DATASET=events
-cat create_errors_table.sql | sed "s/{PROJECT_ID}/$PROJECT_ID/g" | sed "s/{DATASET}/$DATASET/g" | bq query --use_legacy_sql=false
-```
-
-**Query Errors:**
-```sql
--- Find validation errors in last 24 hours
-SELECT
-  error_message,
-  stream,
-  COUNT(*) as count
-FROM `project.dataset.errors`
-WHERE date >= CURRENT_DATE() - 1
-  AND error_type = 'validation_error'
-GROUP BY error_message, stream
-ORDER BY count DESC;
-
--- Get processing errors with stack traces
-SELECT
-  timestamp,
-  error_message,
-  JSON_EXTRACT_SCALAR(error_details, '$.exception_type') as exception,
-  JSON_EXTRACT_SCALAR(error_details, '$.stack_trace') as stack_trace
-FROM `project.dataset.errors`
-WHERE error_type = 'processing_error'
-ORDER BY timestamp DESC
-LIMIT 10;
-```
-
-**Key Features:**
-- Never loses events - all failures stored for debugging
-- Automatic 30-day retention (GCS lifecycle rules)
-- Full event context (payload, error, timestamp, stream)
-- Queryable via BigQuery for pattern analysis
-
### Custom Storage

Implement the `EventStore` protocol for any backend:
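
The usage example is collapsed out of this diff. As a hypothetical sketch of what satisfying the protocol might look like (a single async `store` method is an assumption; the real protocol methods aren't shown here):

```python
# Hypothetical sketch -- the actual EventStore protocol is collapsed out
# of this diff; an async `store(events)` method is an assumption.
from typing import Any, Iterable, Protocol


class EventStore(Protocol):
    async def store(self, events: Iterable[dict[str, Any]]) -> None: ...


class StdoutStore:
    """Toy backend that writes each event to stdout."""

    async def store(self, events: Iterable[dict[str, Any]]) -> None:
        for event in events:
            print(event)
```
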
@@ -531,7 +485,7 @@ uv run ruff format src/

## Roadmap

-### Core (v0.x)
+### Core Kit (v0.x)
- [x] Composable validators (required fields, types, timestamps)
- [x] Segment-compatible adapter with ValidationPipeline
- [x] Collection API with stream routing
@@ -542,24 +496,25 @@
- [x] Prometheus metrics
- [x] EventSubscriptionCoordinator (dual-path architecture)
- [x] Hash-based sequencer for consistent ordering
-- [x] Error store with dead letter queue (GCS-based)
-- [ ] Performance benchmarks (10k+ events/sec)
+- [x] Performance benchmarks (10k+ events/sec validated)
+- [ ] Error handling and dead letter queue (ErrorStore protocol exists, needs implementation)
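
To make the "hash-based sequencer for consistent ordering" item above concrete: the usual approach is to hash a partition key so all events for the same key land in the same lane and stay ordered relative to each other. A toy illustration (lane count and key choice are assumptions, not the repo's implementation):

```python
# Toy illustration of hash-based sequencing -- not the repo's code.
import hashlib


def lane_for(key: str, num_lanes: int = 16) -> int:
    """Map a partition key (e.g. user_id) to a stable lane index."""
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_lanes
```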

-### v1.0
-- [ ] OpenAPI spec and generated clients
-- [ ] Comprehensive examples and documentation
+### v1.0 - Production Ready
+- [ ] Comprehensive examples and use cases
- [ ] Production deployment guides (Cloud Run, GKE, ECS)
- [ ] S3 + Snowflake/Redshift storage adapters
- [ ] Nextra documentation site

-### Future: Extract Focused Libraries

+### Future Ecosystem
+As patterns stabilize, we may extract reusable components:

-These capabilities are intentionally scoped as separate packages to keep the core focused:
+- **eventkit-ring-buffer** - SQLite WAL durability layer (could be used standalone)
+- **eventkit-queues** - Queue abstractions (AsyncQueue, PubSub patterns)
+- **eventkit-validators** - Composable validation framework
+- **eventkit-storage** - Storage backend protocols and implementations

-- **eventkit-profiles** - Profile building and field-level merge strategies
-- **eventkit-identity** - Graph-based identity resolution across devices
-- **eventkit-enrichment** - IP geolocation, user agent parsing, company enrichment
-- **eventkit-destinations** - Activate data to marketing and analytics tools
-- **eventkit-privacy** - GDPR/CCPA compliance utilities (deletion, anonymization)
+These would be pip-installable libraries while the kit remains a starting point.

## Contributing
