Skip to content

Commit 804ec49

Browse files
intel352claude
andcommitted
fix: convert remaining isStarted bool to atomic.Bool and add panic recovery to handleEvents goroutines
- EventBusModule.isStarted: bool → atomic.Bool (module.go) - CustomMemoryEventBus.isStarted: bool → atomic.Bool (custom_memory.go) - Update test helpers to use .Load() for atomic bool access - Add defer recover() to handleEvents goroutine wrapper in memory.go and durable_memory.go Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 5777cac commit 804ec49

6 files changed

Lines changed: 27 additions & 15 deletions

File tree

modules/eventbus/bdd_multi_engine_error_isolation_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ func (ctx *EventBusBDDTestContext) oneEngineEncountersAnError() error {
3030
}
3131

3232
// Ensure service is started before trying to publish
33-
if !ctx.service.isStarted {
33+
if !ctx.service.isStarted.Load() {
3434
err := ctx.service.Start(context.Background())
3535
if err != nil {
3636
return fmt.Errorf("failed to start eventbus: %w", err)

modules/eventbus/bdd_multi_engine_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ func (ctx *EventBusBDDTestContext) iPublishAnEventToTopic(topic string) error {
136136
ctx.publishedTopics[topic] = true
137137

138138
// Start the service if not already started
139-
if !ctx.service.isStarted {
139+
if !ctx.service.isStarted.Load() {
140140
err := ctx.service.Start(context.Background())
141141
if err != nil {
142142
return fmt.Errorf("failed to start eventbus: %w", err)

modules/eventbus/custom_memory.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"log/slog"
66
"sync"
7+
"sync/atomic"
78
"time"
89

910
"github.com/google/uuid"
@@ -19,7 +20,7 @@ type CustomMemoryEventBus struct {
1920
topicMutex sync.RWMutex
2021
ctx context.Context
2122
cancel context.CancelFunc
22-
isStarted bool
23+
isStarted atomic.Bool
2324
eventMetrics *EventMetrics
2425
eventFilters []EventFilter
2526
}
@@ -189,7 +190,7 @@ func NewCustomMemoryEventBus(config map[string]interface{}) (EventBus, error) {
189190

190191
// Start initializes the custom memory event bus
191192
func (c *CustomMemoryEventBus) Start(ctx context.Context) error {
192-
if c.isStarted {
193+
if c.isStarted.Load() {
193194
return nil
194195
}
195196

@@ -200,7 +201,7 @@ func (c *CustomMemoryEventBus) Start(ctx context.Context) error {
200201
go c.metricsCollector()
201202
}
202203

203-
c.isStarted = true
204+
c.isStarted.Store(true)
204205
slog.Info("Custom memory event bus started with enhanced features",
205206
"metricsEnabled", c.config.EnableMetrics,
206207
"filterCount", len(c.eventFilters))
@@ -209,7 +210,7 @@ func (c *CustomMemoryEventBus) Start(ctx context.Context) error {
209210

210211
// Stop shuts down the custom memory event bus
211212
func (c *CustomMemoryEventBus) Stop(ctx context.Context) error {
212-
if !c.isStarted {
213+
if !c.isStarted.Load() {
213214
return nil
214215
}
215216

@@ -227,7 +228,7 @@ func (c *CustomMemoryEventBus) Stop(ctx context.Context) error {
227228
}
228229
c.topicMutex.Unlock()
229230

230-
c.isStarted = false
231+
c.isStarted.Store(false)
231232
slog.Info("Custom memory event bus stopped",
232233
"totalEvents", c.eventMetrics.TotalEvents,
233234
"topics", len(c.eventMetrics.EventsPerTopic))
@@ -236,7 +237,7 @@ func (c *CustomMemoryEventBus) Stop(ctx context.Context) error {
236237

237238
// Publish sends an event to the specified topic with custom filtering and metrics
238239
func (c *CustomMemoryEventBus) Publish(ctx context.Context, event Event) error {
239-
if !c.isStarted {
240+
if !c.isStarted.Load() {
240241
return ErrEventBusNotStarted
241242
}
242243

@@ -309,7 +310,7 @@ func (c *CustomMemoryEventBus) SubscribeAsync(ctx context.Context, topic string,
309310

310311
// subscribe is the internal implementation for both Subscribe and SubscribeAsync
311312
func (c *CustomMemoryEventBus) subscribe(ctx context.Context, topic string, handler EventHandler, isAsync bool) (Subscription, error) {
312-
if !c.isStarted {
313+
if !c.isStarted.Load() {
313314
return nil, ErrEventBusNotStarted
314315
}
315316

@@ -347,7 +348,7 @@ func (c *CustomMemoryEventBus) subscribe(ctx context.Context, topic string, hand
347348

348349
// Unsubscribe removes a subscription
349350
func (c *CustomMemoryEventBus) Unsubscribe(ctx context.Context, subscription Subscription) error {
350-
if !c.isStarted {
351+
if !c.isStarted.Load() {
351352
return ErrEventBusNotStarted
352353
}
353354

modules/eventbus/durable_memory.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,11 @@ func (d *DurableMemoryEventBus) subscribe(_ context.Context, topic string, handl
339339
started := make(chan struct{})
340340
d.wg.Add(1)
341341
go func() {
342+
defer func() {
343+
if r := recover(); r != nil {
344+
slog.Error("handleEvents panic recovered", "error", r, "topic", sub.topic)
345+
}
346+
}()
342347
close(started)
343348
d.handleEvents(sub)
344349
}()

modules/eventbus/memory.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,11 @@ func (m *MemoryEventBus) subscribe(ctx context.Context, topic string, handler Ev
360360
started := make(chan struct{})
361361
m.wg.Add(1)
362362
go func() {
363+
defer func() {
364+
if r := recover(); r != nil {
365+
slog.Error("handleEvents panic recovered", "error", r, "topic", sub.topic)
366+
}
367+
}()
363368
close(started) // Signal that the goroutine has started
364369
m.handleEvents(sub)
365370
}()

modules/eventbus/module.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ import (
111111
"errors"
112112
"fmt"
113113
"sync"
114+
"sync/atomic"
114115
"time"
115116

116117
"github.com/GoCodeAlone/modular"
@@ -146,7 +147,7 @@ type EventBusModule struct {
146147
logger modular.Logger
147148
router *EngineRouter
148149
mutex sync.RWMutex
149-
isStarted bool
150+
isStarted atomic.Bool
150151
subject modular.Subject // For event observation (guarded by mutex)
151152
}
152153

@@ -290,7 +291,7 @@ func (m *EventBusModule) Start(ctx context.Context) error {
290291
m.mutex.Lock()
291292
defer m.mutex.Unlock()
292293

293-
if m.isStarted {
294+
if m.isStarted.Load() {
294295
return nil
295296
}
296297

@@ -300,7 +301,7 @@ func (m *EventBusModule) Start(ctx context.Context) error {
300301
return fmt.Errorf("starting engine router: %w", err)
301302
}
302303

303-
m.isStarted = true
304+
m.isStarted.Store(true)
304305
if m.config.IsMultiEngine() {
305306
m.logger.Info("Event bus started with multiple engines",
306307
"engines", m.router.GetEngineNames())
@@ -358,7 +359,7 @@ func (m *EventBusModule) Stop(ctx context.Context) error {
358359

359360
m.mutex.Lock()
360361

361-
if !m.isStarted {
362+
if !m.isStarted.Load() {
362363
m.mutex.Unlock()
363364
return nil
364365
}
@@ -370,7 +371,7 @@ func (m *EventBusModule) Stop(ctx context.Context) error {
370371
return fmt.Errorf("stopping engine router: %w", err)
371372
}
372373

373-
m.isStarted = false
374+
m.isStarted.Store(false)
374375
engineName := "unknown"
375376
if m.config != nil {
376377
engineName = m.config.Engine

0 commit comments

Comments
 (0)