Skip to content

Commit 34ef61c

Browse files
Copilotintel352
andauthored
Fix memory eventbus antipatterns causing silent drops and unbounded growth; add durable-memory engine (#188)
* Initial plan * Fix eventbus memory engine antipatterns: worker pool buffer, drop logging, history cap, timer race Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> * Add durable-memory engine: per-subscriber FIFO queue with backpressure, zero event loss Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> * Address review feedback: fix MaxDurableQueueDepth validator, nolint explanation, engine comment Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: intel352 <77607+intel352@users.noreply.github.com>
1 parent 6306be0 commit 34ef61c

7 files changed

Lines changed: 906 additions & 15 deletions

File tree

modules/eventbus/README.md

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ The EventBus Module provides a publish-subscribe messaging system for Modular ap
1515
- **Worker Pool Management**: Configurable worker pools for async event processing
1616

1717
### Supported Engines
18-
- **Memory**: In-process event bus using Go channels (default)
18+
- **Memory**: In-process event bus using Go channels (default). Configurable delivery modes: `drop`, `block`, `timeout`.
19+
- **Durable Memory**: In-process event bus that **never drops events**. Uses a per-subscriber FIFO queue and blocks publishers (backpressure) when the queue is full. Ideal when event loss is unacceptable within a single process.
1920
- **Redis**: Distributed messaging using Redis pub/sub
2021
- **Kafka**: Enterprise messaging using Apache Kafka
2122
- **Kinesis**: AWS-native streaming using Amazon Kinesis
@@ -169,6 +170,7 @@ Tuning Guidance:
169170
- Start with `drop` in high-throughput low-criticality paths where occasional loss is acceptable.
170171
- Use `timeout` with a modest `publishBlockTimeout` (e.g. 5-50ms) for balanced fairness and latency in mixed-speed subscriber sets.
171172
- Reserve `block` for critical fan-out where all subscribers must process every event and you are comfortable applying backpressure to publishers.
173+
- Use `durable-memory` when you need zero event loss within a process without the operational overhead of an external broker.
172174

173175
Example (balanced):
174176
```yaml
@@ -184,6 +186,56 @@ eventbus:
184186
rotateSubscriberOrder: true
185187
```
186188

189+
### Durable Memory Engine (Zero Event Loss)
190+
191+
The `durable-memory` engine is an in-process alternative to `memory` that **never drops events**. Instead of dropping events when a subscriber is busy, publishers block (backpressure) until the subscriber's queue has space. Memory usage is bounded by `maxDurableQueueDepth × number-of-subscribers`.
192+
193+
**When to use:**
194+
- Event loss is unacceptable and the application runs in a single process
195+
- You need the simplicity of in-process delivery with stronger guarantees than `memory`+`block` mode
196+
- You want bounded memory without the operational overhead of Redis or Kafka
197+
198+
**Single-engine configuration:**
199+
```yaml
200+
eventbus:
201+
engine: durable-memory
202+
maxEventQueueSize: 1000 # fallback queue depth if maxDurableQueueDepth is 0
203+
maxDurableQueueDepth: 500 # per-subscriber queue depth (0 = use maxEventQueueSize)
204+
```
205+
206+
**Multi-engine configuration (mixed with a lossy fast path):**
207+
```yaml
208+
eventbus:
209+
engines:
210+
- name: "fast"
211+
type: "memory"
212+
config:
213+
workerCount: 8
214+
deliveryMode: drop
215+
- name: "critical"
216+
type: "durable-memory"
217+
config:
218+
maxDurableQueueDepth: 200
219+
routing:
220+
- topics: ["payment.*", "order.*"]
221+
engine: "critical"
222+
- topics: ["*"]
223+
engine: "fast"
224+
```
225+
226+
**Trade-offs vs `memory` + `block`:**
227+
228+
| | `durable-memory` | `memory` + `block` |
229+
|---|---|---|
230+
| Event loss | None | None |
231+
| Backpressure unit | Per-subscriber queue | Per-subscriber channel slot |
232+
| Queue structure | Linked list (unbounded growth before cap) | Fixed-size Go channel |
233+
| Memory bound | `maxDurableQueueDepth` | `defaultEventBufferSize` |
234+
| Async handler parallelism | Sequential per subscriber | Shared worker pool |
235+
236+
**Important note on cross-process durability:**
237+
`durable-memory` only protects against in-process loss (e.g. a slow subscriber). It does **not** survive process restarts or crashes. For durable-across-restarts guarantees, use Redis, Kafka, or Kinesis.
238+
187239
### Metrics Export (Prometheus & Datadog)
188240

189241
Delivery statistics (delivered vs dropped) can be exported via the built-in Prometheus Collector or a Datadog StatsD exporter.

modules/eventbus/config.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ type EngineConfig struct {
2020
Name string `json:"name" yaml:"name" validate:"required"`
2121

2222
// Type specifies the engine implementation to use.
23-
// Supported values: "memory", "redis", "kafka", "kinesis", "custom"
24-
Type string `json:"type" yaml:"type" validate:"required,oneof=memory redis kafka kinesis custom"`
23+
// Supported values: "memory", "redis", "kafka", "kinesis", "nats", "custom", "durable-memory"
24+
Type string `json:"type" yaml:"type" validate:"required,oneof=memory redis kafka kinesis nats custom durable-memory"`
2525

2626
// Config contains engine-specific configuration as a map.
2727
// The structure depends on the engine type.
@@ -76,10 +76,10 @@ type EventBusConfig struct {
7676
// --- Single Engine Configuration (Legacy Support) ---
7777

7878
// Engine specifies the event bus engine to use for single-engine mode.
79-
// Supported values: "memory", "redis", "kafka", "kinesis"
79+
// Supported values: "memory", "redis", "kafka", "kinesis", "nats", "custom", "durable-memory"
8080
// Default: "memory"
8181
// Note: This field is used only when Engines is empty (legacy mode)
82-
Engine string `json:"engine,omitempty" yaml:"engine,omitempty" validate:"omitempty,oneof=memory redis kafka kinesis" env:"ENGINE"`
82+
Engine string `json:"engine,omitempty" yaml:"engine,omitempty" validate:"omitempty,oneof=memory redis kafka kinesis nats custom durable-memory" env:"ENGINE"`
8383

8484
// MaxEventQueueSize is the maximum number of events to queue per topic.
8585
// When this limit is reached, new events may be dropped or publishers
@@ -109,6 +109,13 @@ type EventBusConfig struct {
109109
// PublishBlockTimeout is used when DeliveryMode == "timeout". Zero means no wait.
110110
PublishBlockTimeout time.Duration `json:"publishBlockTimeout,omitempty" yaml:"publishBlockTimeout,omitempty" env:"PUBLISH_BLOCK_TIMEOUT"`
111111

112+
// MaxDurableQueueDepth is the per-subscriber queue depth for the "durable-memory" engine.
113+
// When a subscriber's queue is full, publishers block (backpressure) until the subscriber
114+
// consumes an event, ensuring zero event loss.
115+
// When 0 (default), the value of MaxEventQueueSize is used.
116+
// Set to a positive value to override independently of MaxEventQueueSize.
117+
MaxDurableQueueDepth int `json:"maxDurableQueueDepth,omitempty" yaml:"maxDurableQueueDepth,omitempty" validate:"omitempty" env:"MAX_DURABLE_QUEUE_DEPTH"`
118+
112119
// RotateSubscriberOrder when true rotates the ordering of subscribers per publish
113120
// to reduce starvation and provide fairer drop distribution.
114121
RotateSubscriberOrder bool `json:"rotateSubscriberOrder,omitempty" yaml:"rotateSubscriberOrder,omitempty" env:"ROTATE_SUBSCRIBER_ORDER"`

0 commit comments

Comments
 (0)