Skip to content

Conversation

@prosdev
Copy link
Contributor

@prosdev prosdev commented Jan 15, 2026

Summary

Adds a new abstraction to enable dual-path event processing (real-time + batched), laying the foundation for future profile building.

Changes

New Components

  • EventSubscriptionCoordinator: Manages Pub/Sub subscription lifecycle, bridges sync callbacks to async handlers
  • EventCoordinatorManager: Coordinates multiple subscription coordinators
  • EventHandler Protocol: Clean interface for event processing

Features

  • Graceful start/stop with queue draining
  • Configurable worker pool (default: 4)
  • Full Prometheus metrics integration (reuses existing queue metrics)
  • Support for dual-path architecture (e.g., real-time profiles + archival)

Documentation

  • README: Added "Dual-Path Architecture (Advanced)" section
  • README: Removed all Firestore references
  • README: Updated roadmap with completed features
  • Full docstrings with real-world examples

Tests

  • 4 new unit tests (all passing)
  • All 242 tests passing
  • Fixed missing pytest-asyncio dependency

Example Usage

# Real-time processing
realtime = EventSubscriptionCoordinator(
    subscription="events-realtime",
    handler=process_immediately,
)

# Batched archival  
archive = EventSubscriptionCoordinator(
    subscription="events-archive",
    handler=event_loader.add,
)

manager = EventCoordinatorManager([realtime, archive])
await manager.start_all()

What This Enables

  • Future profile building project can use coordinator for real-time processing
  • Archive path can continue using EventLoader
  • Both paths process same events independently with different latency/cost tradeoffs

Related

  • Spec: specs/015-stream-subscription-coordinator.md
  • Notes: notes/projects/eventkit-impl/015-stream-subscription-coordinator.md

@prosdev prosdev force-pushed the feat/event-subscription-coordinator branch 2 times, most recently from 35de94a to 3dec678 Compare January 15, 2026 02:02
Add new streaming package with coordinator pattern:
- EventSubscriptionCoordinator: Manages Pub/Sub subscription lifecycle
- EventCoordinatorManager: Coordinates multiple subscriptions
- EventHandler Protocol: Interface for event processing

Features:
- Bridge sync Pub/Sub callbacks to async handlers
- Graceful start/stop with queue draining
- Prometheus metrics (reusing queue metrics with coordinator labels)
- Support for dual-path: real-time + batched processing

Example usage:
```python
# Real-time processing
realtime = EventSubscriptionCoordinator(
    subscription="events-realtime",
    handler=process_immediately,
)

# Batched archival
archive = EventSubscriptionCoordinator(
    subscription="events-archive",
    handler=event_loader.add,
)

manager = EventCoordinatorManager([realtime, archive])
await manager.start_all()
```

Tests:
- 4 new unit tests (all passing)
- All 242 tests passing
- Fixed missing pytest-asyncio dependency

Documentation:
- Full docstrings with examples
- README: Added dual-path architecture section
- README: Removed all Firestore references
- README: Updated roadmap with completed features
@prosdev prosdev force-pushed the feat/event-subscription-coordinator branch from 3dec678 to 96ada68 Compare January 15, 2026 02:05
@prosdev prosdev merged commit 05d1c38 into main Jan 15, 2026
2 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants