Skip to content

Commit 9bf4092

Browse files
intel352Copilot
andcommitted
fix: add panic recovery to unprotected goroutines
Prevents observer/callback panics from crashing the entire process. Matches existing recovery patterns used in other goroutine sites. Adds defer/recover blocks to goroutines across 20 files: - Root: reload_orchestrator, contract_verifier, application_observer - Scheduler: dispatcher, shutdown waiter - EventLogger: processing loop, startup emission, shutdown waiter - HTTPServer: runServer - ConfigWatcher: context listener, event loop - EventBus: memory/NATS/Kinesis/Kafka/Redis/durable/custom workers, shutdown waiters, async handlers, event emission, observer notification - Cache: memory cleanup goroutine - ReverseProxy: health checker, dry-run requests, circuit breaker and direct proxy goroutines - LetsEncrypt: certificate renewal goroutine Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 54582c5 commit 9bf4092

20 files changed

Lines changed: 207 additions & 0 deletions

File tree

application_observer.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,11 @@ func (app *ObservableApplication) NotifyObservers(ctx context.Context, event clo
134134
func (app *ObservableApplication) emitEvent(ctx context.Context, event cloudevents.Event) {
135135
// Use a separate goroutine to avoid blocking application operations
136136
go func() {
137+
defer func() {
138+
if r := recover(); r != nil {
139+
app.logger.Error("panic recovered in application event emission", "error", r)
140+
}
141+
}()
137142
if err := app.NotifyObservers(ctx, event); err != nil {
138143
app.logger.Error("Failed to notify observers", "event", event.Type(), "error", err)
139144
}

configwatcher/configwatcher.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,11 @@ func (w *ConfigWatcher) Start(ctx context.Context) error {
7070
w.wg.Add(1)
7171
go func() {
7272
defer w.wg.Done()
73+
defer func() {
74+
if r := recover(); r != nil {
75+
w.logger.Error("panic recovered in config watcher context listener", "error", r)
76+
}
77+
}()
7378
select {
7479
case <-ctx.Done():
7580
_ = w.stopWatching()
@@ -117,6 +122,11 @@ func (w *ConfigWatcher) stopWatching() error {
117122

118123
func (w *ConfigWatcher) eventLoop() {
119124
defer w.wg.Done()
125+
defer func() {
126+
if r := recover(); r != nil {
127+
w.logger.Error("panic recovered in config watcher event loop", "error", r)
128+
}
129+
}()
120130
var timer *time.Timer
121131
changedPaths := make(map[string]struct{})
122132
var mu sync.Mutex

contract_verifier.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,11 @@ func (v *StandardContractVerifier) runReloadWithGuard(module Reloadable, label s
128128
type result struct{ err error }
129129
ch := make(chan result, 1)
130130
go func() {
131+
defer func() {
132+
if r := recover(); r != nil {
133+
ch <- result{err: fmt.Errorf("Reload panicked: %v", r)}
134+
}
135+
}()
131136
ch <- result{err: module.Reload(ctx, nil)}
132137
}()
133138

@@ -170,6 +175,11 @@ func (v *StandardContractVerifier) VerifyHealthContract(provider HealthProvider)
170175
}
171176
ch := make(chan result, 1)
172177
go func() {
178+
defer func() {
179+
if r := recover(); r != nil {
180+
ch <- result{err: fmt.Errorf("HealthCheck panicked: %v", r)}
181+
}
182+
}()
173183
reports, err := provider.HealthCheck(ctx)
174184
ch <- result{reports, err}
175185
}()

eventlogger/module.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,13 @@ func (m *EventLoggerModule) Start(ctx context.Context) error {
340340

341341
// emitStartupOperationalEvents performs the operational event emission without holding the Start mutex.
342342
func (m *EventLoggerModule) emitStartupOperationalEvents(ctx context.Context, sync bool, outputsLen, bufferLen int, targetConfigs []OutputTargetConfig) {
343+
defer func() {
344+
if r := recover(); r != nil {
345+
if m.logger != nil {
346+
m.logger.Error("panic recovered in event logger startup event emission", "error", r)
347+
}
348+
}
349+
}()
343350
// Protect field reads with RLock to avoid race with Stop() which writes m.started under Lock
344351
m.mutex.RLock()
345352
logger := m.logger
@@ -431,6 +438,13 @@ func (m *EventLoggerModule) Stop(ctx context.Context) error {
431438
// Wait for processing with optional timeout
432439
done := make(chan struct{})
433440
go func() {
441+
defer func() {
442+
if r := recover(); r != nil {
443+
if m.logger != nil {
444+
m.logger.Error("panic recovered in event logger shutdown waiter", "error", r)
445+
}
446+
}
447+
}()
434448
m.wg.Wait()
435449
close(done)
436450
}()
@@ -759,6 +773,13 @@ func (m *EventLoggerModule) ObserverID() string {
759773
// processEvents processes events from both event channels.
760774
func (m *EventLoggerModule) processEvents(ctx context.Context) {
761775
defer m.wg.Done()
776+
defer func() {
777+
if r := recover(); r != nil {
778+
if m.logger != nil {
779+
m.logger.Error("panic recovered in event logger processing loop", "error", r)
780+
}
781+
}
782+
}()
762783

763784
flushTicker := time.NewTicker(m.config.FlushInterval)
764785
defer flushTicker.Stop()

httpserver/module.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,11 @@ func (m *HTTPServerModule) Start(ctx context.Context) error {
285285

286286
// runServer starts the HTTP server with appropriate TLS configuration
287287
func (m *HTTPServerModule) runServer(ctx context.Context, addr string) {
288+
defer func() {
289+
if r := recover(); r != nil {
290+
m.logger.Error("panic recovered in HTTP server", "error", r)
291+
}
292+
}()
288293
m.logger.Info("Starting HTTP server", "address", addr)
289294
var err error
290295

modules/cache/memory.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package cache
22

33
import (
44
"context"
5+
"log/slog"
56
"sync"
67
"time"
78

@@ -76,6 +77,11 @@ func (c *MemoryCache) Connect(ctx context.Context) error {
7677
// Start cleanup goroutine with derived context
7778
c.cleanupCtx, c.cancelFunc = context.WithCancel(ctx)
7879
go func() {
80+
defer func() {
81+
if r := recover(); r != nil {
82+
slog.Error("panic recovered in cache cleanup goroutine", "error", r)
83+
}
84+
}()
7985
c.startCleanupTimer(c.cleanupCtx)
8086
}()
8187
return nil

modules/eventbus/custom_memory.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -427,6 +427,11 @@ func (c *CustomMemoryEventBus) matchesTopic(eventTopic, subscriptionTopic string
427427

428428
// handleEvents processes events for a custom subscription
429429
func (c *CustomMemoryEventBus) handleEvents(sub *customMemorySubscription) {
430+
defer func() {
431+
if r := recover(); r != nil {
432+
slog.Error("panic recovered in custom memory event handler", "error", r, "topic", sub.topic)
433+
}
434+
}()
430435
for {
431436
select {
432437
case <-c.ctx.Done():
@@ -469,6 +474,11 @@ func (c *CustomMemoryEventBus) handleEvents(sub *customMemorySubscription) {
469474

470475
// metricsCollector periodically logs metrics
471476
func (c *CustomMemoryEventBus) metricsCollector() {
477+
defer func() {
478+
if r := recover(); r != nil {
479+
slog.Error("panic recovered in custom memory eventbus metrics collector", "error", r)
480+
}
481+
}()
472482
ticker := time.NewTicker(c.config.MetricsInterval)
473483
defer ticker.Stop()
474484

modules/eventbus/durable_memory.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,11 @@ func (d *DurableMemoryEventBus) Stop(ctx context.Context) error {
235235

236236
done := make(chan struct{})
237237
go func() {
238+
defer func() {
239+
if r := recover(); r != nil {
240+
slog.Error("panic recovered in durable memory eventbus shutdown waiter", "error", r)
241+
}
242+
}()
238243
d.wg.Wait()
239244
close(done)
240245
}()

modules/eventbus/kafka.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,11 @@ func (k *KafkaEventBus) Stop(ctx context.Context) error {
267267
// Wait for all workers to finish
268268
done := make(chan struct{})
269269
go func() {
270+
defer func() {
271+
if r := recover(); r != nil {
272+
slog.Error("panic recovered in Kafka eventbus shutdown waiter", "error", r)
273+
}
274+
}()
270275
k.wg.Wait()
271276
close(done)
272277
}()
@@ -368,6 +373,11 @@ func (k *KafkaEventBus) subscribe(ctx context.Context, topic string, handler Eve
368373

369374
// startConsumerGroup starts the Kafka consumer group
370375
func (k *KafkaEventBus) startConsumerGroup() {
376+
defer func() {
377+
if r := recover(); r != nil {
378+
slog.Error("panic recovered in Kafka consumer group", "error", r)
379+
}
380+
}()
371381
handler := &KafkaConsumerGroupHandler{
372382
eventBus: k,
373383
subscriptions: make(map[string]*kafkaSubscription),
@@ -467,5 +477,10 @@ func (k *KafkaEventBus) processEvent(sub *kafkaSubscription, event Event) {
467477

468478
// processEventAsync processes an event asynchronously
469479
func (k *KafkaEventBus) processEventAsync(sub *kafkaSubscription, event Event) {
480+
defer func() {
481+
if r := recover(); r != nil {
482+
slog.Error("panic recovered in Kafka async event handler", "error", r, "topic", event.Type())
483+
}
484+
}()
470485
k.processEvent(sub, event)
471486
}

modules/eventbus/kinesis.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,11 @@ func (k *KinesisEventBus) Stop(ctx context.Context) error {
242242
// Wait for all workers to finish
243243
done := make(chan struct{})
244244
go func() {
245+
defer func() {
246+
if r := recover(); r != nil {
247+
slog.Error("panic recovered in Kinesis eventbus shutdown waiter", "error", r)
248+
}
249+
}()
245250
k.wg.Wait()
246251
close(done)
247252
}()
@@ -402,6 +407,11 @@ func (k *KinesisEventBus) readShard(shardID string) {
402407
delete(k.activeShards, shardID)
403408
k.shardMutex.Unlock()
404409
}()
410+
defer func() {
411+
if r := recover(); r != nil {
412+
slog.Error("panic recovered in Kinesis shard reader", "error", r, "shard", shardID)
413+
}
414+
}()
405415

406416
// Get shard iterator
407417
iterResp, err := k.client.GetShardIterator(k.ctx, &kinesis.GetShardIteratorInput{
@@ -608,5 +618,10 @@ func (k *KinesisEventBus) processEvent(sub *kinesisSubscription, event Event) {
608618

609619
// processEventAsync processes an event asynchronously
610620
func (k *KinesisEventBus) processEventAsync(sub *kinesisSubscription, event Event) {
621+
defer func() {
622+
if r := recover(); r != nil {
623+
slog.Error("panic recovered in Kinesis async event handler", "error", r, "topic", event.Type())
624+
}
625+
}()
611626
k.processEvent(sub, event)
612627
}

0 commit comments

Comments
 (0)