Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 57 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,62 @@ python -m scripts.run_bigquery_loader

See `scripts/bigquery/README.md` and `specs/gcs-bigquery-storage/` for full details.

### Error Store (Dead Letter Queue)

All failed events are stored in a GCS-based dead letter queue for debugging and retry:

**Two Error Types:**
- **Validation Errors**: Missing required fields, invalid schema
- **Processing Errors**: Storage failures, unexpected exceptions

**Storage Structure:**
```
gs://bucket/errors/
date=2026-01-15/
error_type=validation/
error-20260115-100000-abc123.parquet
error_type=processing/
error-20260115-100500-def456.parquet
```

**Create BigQuery Errors Table:**
```bash
cd scripts/bigquery
export PROJECT_ID=my-project DATASET=events
cat create_errors_table.sql | sed "s/{PROJECT_ID}/$PROJECT_ID/g" | sed "s/{DATASET}/$DATASET/g" | bq query --use_legacy_sql=false
```

**Query Errors:**
```sql
-- Find validation errors in last 24 hours
SELECT
error_message,
stream,
COUNT(*) as count
FROM `project.dataset.errors`
WHERE date >= CURRENT_DATE() - 1
AND error_type = 'validation_error'
GROUP BY error_message, stream
ORDER BY count DESC;

-- Get processing errors with stack traces
SELECT
timestamp,
error_message,
JSON_EXTRACT_SCALAR(error_details, '$.exception_type') as exception,
JSON_EXTRACT_SCALAR(error_details, '$.stack_trace') as stack_trace
FROM `project.dataset.errors`
WHERE error_type = 'processing_error'
ORDER BY timestamp DESC
LIMIT 10;
```

**Key Features:**
- Never loses events - all failures stored for debugging
- Automatic 30-day retention (GCS lifecycle rules)
- Full event context (payload, error, timestamp, stream)
- Queryable via BigQuery for pattern analysis

### Custom Storage

Implement the `EventStore` protocol for any backend:
Expand Down Expand Up @@ -486,7 +542,7 @@ uv run ruff format src/
- [x] Prometheus metrics
- [x] EventSubscriptionCoordinator (dual-path architecture)
- [x] Hash-based sequencer for consistent ordering
- [ ] Error handling and dead letter queue (ErrorStore protocol exists, needs implementation)
- [x] Error store with dead letter queue (GCS-based)
- [ ] Performance benchmarks (10k+ events/sec)

### v1.0
Expand Down
19 changes: 19 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ dev = [
"ruff>=0.1.0",
"mypy>=1.7.0",
"httpx>=0.25.0", # For testing FastAPI
"locust>=2.20.0", # Load testing
"psutil>=5.9.0", # Resource monitoring
# Type stubs for mypy
"types-python-dateutil>=2.8.0",
"pandas-stubs>=2.1.0",
]
clickhouse = [
"clickhouse-driver>=0.2.6",
Expand Down Expand Up @@ -79,11 +84,25 @@ strict = true
warn_return_any = true
warn_unused_configs = true
disallow_untyped_defs = true
# Disable import-untyped for packages without stubs
disable_error_code = ["import-untyped", "attr-defined"]

[[tool.mypy.overrides]]
module = "google.cloud"
ignore_missing_imports = true

[[tool.mypy.overrides]]
module = "google.cloud.*"
ignore_missing_imports = true

[[tool.mypy.overrides]]
module = "prometheus_client.*"
ignore_missing_imports = true

[[tool.mypy.overrides]]
module = "pyarrow.*"
ignore_missing_imports = true

[dependency-groups]
dev = [
"types-python-dateutil>=2.9.0.20251115",
Expand Down
74 changes: 74 additions & 0 deletions scripts/bigquery/create_errors_table.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
-- Create errors table for dead letter queue
--
-- Usage:
-- export PROJECT_ID=your-project
-- export DATASET=events
-- cat create_errors_table.sql | sed "s/{PROJECT_ID}/$PROJECT_ID/g" | sed "s/{DATASET}/$DATASET/g" | bq query --use_legacy_sql=false

CREATE TABLE IF NOT EXISTS `{PROJECT_ID}.{DATASET}.errors` (
error_id STRING NOT NULL,
timestamp TIMESTAMP NOT NULL,
error_type STRING NOT NULL, -- validation_error | processing_error
error_message STRING NOT NULL,
error_details JSON,
stream STRING NOT NULL,
original_payload JSON NOT NULL,
retry_count INT64 DEFAULT 0,
retry_after TIMESTAMP,
date DATE NOT NULL
)
PARTITION BY date
CLUSTER BY error_type, stream
OPTIONS(
description="Dead letter queue for failed events. All events that fail validation or processing are stored here with full context for debugging.",
partition_expiration_days=30
);

-- Create metadata table for tracking loaded error files
CREATE TABLE IF NOT EXISTS `{PROJECT_ID}.{DATASET}._loaded_error_files` (
file_path STRING NOT NULL,
loaded_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP(),
row_count INT64,
error_type STRING
)
PARTITION BY DATE(loaded_at)
OPTIONS(
description="Metadata tracking which error files have been loaded to prevent duplicates"
);

-- Example queries for debugging

-- Find validation errors in last 24 hours
-- SELECT
-- error_type,
-- error_message,
-- stream,
-- COUNT(*) as count
-- FROM `{PROJECT_ID}.{DATASET}.errors`
-- WHERE date >= CURRENT_DATE() - 1
-- AND error_type = 'validation_error'
-- GROUP BY error_type, error_message, stream
-- ORDER BY count DESC;

-- Find processing errors with stack traces
-- SELECT
-- timestamp,
-- error_message,
-- JSON_EXTRACT_SCALAR(error_details, '$.exception_type') as exception,
-- JSON_EXTRACT_SCALAR(error_details, '$.stack_trace') as stack_trace,
-- original_payload
-- FROM `{PROJECT_ID}.{DATASET}.errors`
-- WHERE error_type = 'processing_error'
-- ORDER BY timestamp DESC
-- LIMIT 10;

-- Error rate by stream
-- SELECT
-- stream,
-- error_type,
-- COUNT(*) as error_count,
-- ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (PARTITION BY stream), 2) as pct
-- FROM `{PROJECT_ID}.{DATASET}.errors`
-- WHERE date >= CURRENT_DATE() - 7
-- GROUP BY stream, error_type
-- ORDER BY stream, error_count DESC;
10 changes: 7 additions & 3 deletions src/eventkit/api/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@
from eventkit.processing.processor import Processor
from eventkit.processing.sequencer import HashSequencer
from eventkit.queues import EventQueue, create_queue
from eventkit.stores.error_store import ErrorStore, LoggingErrorStore
from eventkit.stores.error_store import ErrorStore
from eventkit.stores.event_store import EventStore
from eventkit.stores.gcs import GCSEventStore
from eventkit.stores.gcs_error import GCSErrorStore


@lru_cache
Expand Down Expand Up @@ -55,7 +56,7 @@ def get_queue() -> EventQueue:

Creates all dependencies and wires them together:
- EventStore (GCS + BigQuery)
- ErrorStore (Logging)
- ErrorStore (GCS-based dead letter queue)
- Adapter (SegmentSchemaAdapter)
- Sequencer (HashSequencer)
- EventLoader (batches events to storage)
Expand All @@ -78,7 +79,10 @@ async def collect(queue: EventQueue = Depends(get_queue)):

# Create stores
event_store = get_event_store()
error_store: ErrorStore = LoggingErrorStore()
error_store: ErrorStore = GCSErrorStore(
project_id=settings.GCP_PROJECT_ID,
bucket_name=settings.GCP_GCS_BUCKET,
)

# Create processing components
adapter = SegmentSchemaAdapter()
Expand Down
2 changes: 1 addition & 1 deletion src/eventkit/loaders/bigquery_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import time

import structlog
from google.cloud import bigquery, storage # type: ignore[attr-defined]
from google.cloud import bigquery, storage

logger = structlog.get_logger(__name__)

Expand Down
107 changes: 67 additions & 40 deletions src/eventkit/processing/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
- Lifecycle management (start/stop for graceful shutdown)
"""

from datetime import UTC, datetime

import structlog

from eventkit.adapters.base import EventAdapter
Expand Down Expand Up @@ -119,55 +117,84 @@ async def process_event(self, raw_event: RawEvent) -> None:
stream=raw_event.stream,
)

# Step 1: Adapt (validate & normalize)
result = self.adapter.adapt(raw_event)
try:
# Step 1: Adapt (validate & normalize)
result = self.adapter.adapt(raw_event)

if not result.ok:
# Invalid event → error store
# Track failed event
eventkit_events_failed_total.labels(
stream=raw_event.stream,
event_type=event_type_str,
cause="adaptation_error",
).inc()

logger.warning(
"adaptation_failed",
error=result.error or "Unknown error",
event_type=raw_event.payload.get("type"),
stream=raw_event.stream,
)
await self.error_store.store_error(
raw_event=raw_event,
error_message=result.error or "Unknown error",
error_type="validation_error",
error_details={"adapter": self.adapter.__class__.__name__},
)
return

# Type narrowing: If ok=True, event must be present
assert result.event is not None, "Adapter returned ok=True but event is None"

# Track successfully processed event
event_type_value = (
result.event.event_type.value
if hasattr(result.event.event_type, "value")
else str(result.event.event_type)
)
eventkit_events_processed_total.labels(
stream=raw_event.stream,
event_type=event_type_value,
).inc()

logger.debug("event_adapted", event_type=result.event.event_type)

# Step 2: Sequence (consistent routing)
partition_id = self.sequencer.get_partition_id(result.event)

logger.debug("event_sequenced", partition=partition_id)

# Step 3: Load (batch to storage)
await self.event_loader.enqueue(result.event, partition_id)

except Exception as e:
# Unexpected processing error → error store
import traceback

if not result.ok:
# Invalid event → error store
# Track failed event
eventkit_events_failed_total.labels(
stream=raw_event.stream,
event_type=event_type_str,
cause="adaptation_error",
cause="processing_error",
).inc()

logger.warning(
"adaptation_failed",
error=result.error or "Unknown error",
logger.error(
"processing_error",
error=str(e),
event_type=raw_event.payload.get("type"),
stream=raw_event.stream,
exc_info=True,
)

await self.error_store.store_error(
payload=raw_event.payload,
error=result.error or "Unknown error",
timestamp=datetime.now(UTC),
metadata={"stream": raw_event.stream},
raw_event=raw_event,
error_message=str(e),
error_type="processing_error",
error_details={
"exception_type": type(e).__name__,
"stack_trace": traceback.format_exc(),
},
)
return

# Type narrowing: If ok=True, event must be present
assert result.event is not None, "Adapter returned ok=True but event is None"

# Track successfully processed event
event_type_value = (
result.event.event_type.value
if hasattr(result.event.event_type, "value")
else str(result.event.event_type)
)
eventkit_events_processed_total.labels(
stream=raw_event.stream,
event_type=event_type_value,
).inc()

logger.debug("event_adapted", event_type=result.event.event_type)

# Step 2: Sequence (consistent routing)
partition_id = self.sequencer.get_partition_id(result.event)

logger.debug("event_sequenced", partition=partition_id)

# Step 3: Load (batch to storage)
await self.event_loader.enqueue(result.event, partition_id)

async def start(self) -> None:
"""
Expand Down
2 changes: 1 addition & 1 deletion src/eventkit/queues/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import logging
from typing import TYPE_CHECKING

from google.cloud import pubsub_v1 # type: ignore[attr-defined]
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber.message import Message

from eventkit.config import Settings
Expand Down
9 changes: 8 additions & 1 deletion src/eventkit/stores/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,12 @@
from eventkit.stores.error_store import ErrorStore, LoggingErrorStore
from eventkit.stores.event_store import EventStore
from eventkit.stores.gcs import GCSEventStore
from eventkit.stores.gcs_error import GCSErrorStore

__all__ = ["EventStore", "ErrorStore", "LoggingErrorStore", "GCSEventStore"]
__all__ = [
"EventStore",
"ErrorStore",
"LoggingErrorStore",
"GCSEventStore",
"GCSErrorStore",
]
Loading
Loading