-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmodule.go
More file actions
729 lines (665 loc) · 24 KB
/
module.go
File metadata and controls
729 lines (665 loc) · 24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
// Package eventbus provides a flexible event-driven messaging system for the modular framework.
//
// This module enables decoupled communication between application components through
// an event bus pattern. It supports both synchronous and asynchronous event processing,
// multiple event bus engines, and configurable event handling strategies.
//
// # Features
//
// The eventbus module offers the following capabilities:
// - Topic-based event publishing and subscription
// - Synchronous and asynchronous event processing
// - Multiple engine support (memory, Redis, Kafka, NATS, Kinesis)
// - Configurable worker pools for async processing
// - CloudEvents 1.0 compliant event model with extensions
// - Subscription management with unique identifiers
// - Event TTL and retention policies
//
// # Configuration
//
// The module can be configured through the EventBusConfig structure:
//
//	config := &EventBusConfig{
//		Engine:                 "memory", // or "redis", "kafka"
//		MaxEventQueueSize:      1000,     // events per topic queue
//		DefaultEventBufferSize: 10,       // subscription channel buffer
//		WorkerCount:            5,        // async processing workers
//		EventTTL:               3600,     // event time-to-live in seconds
//		RetentionDays:          7,        // event history retention
//		ExternalBrokerURL:      "",       // for external brokers
//		ExternalBrokerUser:     "",       // broker authentication
//		ExternalBrokerPassword: "",       // broker password
//	}
//
// # Service Registration
//
// The module registers itself as a service for dependency injection:
//
// // Get the event bus service
// eventBus := app.GetService("eventbus.provider").(*EventBusModule)
//
// // Publish an event
// err := eventBus.Publish(ctx, "user.created", userData)
//
// // Subscribe to events
// subscription, err := eventBus.Subscribe(ctx, "user.*", userEventHandler)
//
// # Usage Examples
//
// Basic event publishing:
//
// err := eventBus.Publish(ctx, "order.placed", orderData)
//
// Event subscription patterns:
//
// // Synchronous subscription
// subscription, err := eventBus.Subscribe(ctx, "user.updated", func(ctx context.Context, event Event) error {
// var user UserData
// if err := event.DataAs(&user); err != nil {
// return err
// }
// return updateUserCache(user)
// })
//
// // Asynchronous subscription for heavy processing
// asyncSub, err := eventBus.SubscribeAsync(ctx, "image.uploaded", func(ctx context.Context, event Event) error {
// var imageData ImageData
// if err := event.DataAs(&imageData); err != nil {
// return err
// }
// return processImageThumbnails(imageData)
// })
//
// // Wildcard subscriptions
// allOrdersSub, err := eventBus.Subscribe(ctx, "order.*", orderEventHandler)
//
// Subscription management:
//
// // Check subscription details
// fmt.Printf("Subscribed to: %s (ID: %s, Async: %v)",
// subscription.Topic(), subscription.ID(), subscription.IsAsync())
//
// // Cancel specific subscriptions
// err := eventBus.Unsubscribe(ctx, subscription)
//
// // Or cancel through the subscription itself
// err := subscription.Cancel()
//
// # Event Processing Patterns
//
// The module supports different event processing patterns:
//
// **Synchronous Processing**: Events are processed immediately in the same goroutine
// that published them. Best for lightweight operations and when ordering is important.
//
// **Asynchronous Processing**: Events are queued and processed by worker goroutines.
// Best for heavy operations, external API calls, or when you don't want to block
// the publisher.
//
// # Engine Support
//
// Currently supported engines:
// - **memory**: In-process event bus using Go channels
// - **redis**: Distributed event bus using Redis pub/sub
// - **kafka**: Enterprise event bus using Apache Kafka
// - **nats**: Cloud-native event bus using NATS
// - **kinesis**: AWS Kinesis stream-based event bus
package eventbus
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/GoCodeAlone/modular"
cloudevents "github.com/cloudevents/sdk-go/v2"
cevent "github.com/cloudevents/sdk-go/v2/event"
"github.com/google/uuid"
)
// Identifiers under which the eventbus module and its service are known.
const (
	// ModuleName is the unique identifier for the eventbus module. NewModule
	// assigns it as the module name.
	ModuleName = "eventbus"

	// ServiceName is the name of the service this module provides. Other
	// modules use it to request the event bus through dependency injection.
	ServiceName = "eventbus.provider"
)
// EventBusModule provides event-driven messaging capabilities for the modular framework.
// It implements a publish-subscribe pattern with support for multiple event bus engines,
// asynchronous processing, and flexible subscription management.
//
// The module implements the following interfaces:
//   - modular.Module: Basic module lifecycle
//   - modular.Configurable: Configuration management
//   - modular.ServiceAware: Service dependency management
//   - modular.Startable: Startup logic
//   - modular.Stoppable: Shutdown logic
//   - modular.ObservableModule: Event observation and emission
//   - EventBus: Event publishing and subscription interface
//
// Event processing is thread-safe and supports concurrent publishers and subscribers.
//
// The zero value is not ready for use: config, logger and router are only
// populated by Init, so construct the module with NewModule and let the
// application initialize it before publishing or subscribing.
type EventBusModule struct {
	// name is the module identifier returned by Name; NewModule sets it to ModuleName.
	name string
	// config is the module configuration loaded by Init (nil before Init runs).
	config *EventBusConfig
	// logger is the application logger captured by Init (nil before Init runs).
	logger modular.Logger
	// router dispatches publish/subscribe calls to the configured engine(s);
	// it is created by Init and is nil before that.
	router *EngineRouter
	// mutex guards subject and serializes Start and Stop.
	mutex sync.RWMutex
	// isStarted is set by a successful Start and cleared by a successful Stop.
	isStarted atomic.Bool
	// subject is the observer subject stored by RegisterObservers and used for
	// event emission; access it only while holding mutex.
	subject modular.Subject
}
// DeliveryStats represents basic delivery outcomes for an engine or aggregate.
// These counters are monotonically increasing from module start. They are
// intentionally simple (uint64) to keep overhead negligible; consumers wanting
// rates should compute deltas externally.
//
// PerEngineStats returns one DeliveryStats value per engine name.
type DeliveryStats struct {
	// Delivered counts deliveries reported as successful.
	// NOTE(review): the exact counting rules live in the engines, not in this file.
	Delivered uint64 `json:"delivered" yaml:"delivered"`
	// Dropped counts deliveries reported as dropped.
	// NOTE(review): what qualifies as a drop is engine-defined — confirm there.
	Dropped uint64 `json:"dropped" yaml:"dropped"`
}
// NewModule builds an event bus module whose name is ModuleName.
// Only the name is populated here; configuration, logger and engine router
// are filled in later by Init.
//
// Example:
//
//	app.RegisterModule(eventbus.NewModule())
func NewModule() modular.Module {
	module := new(EventBusModule)
	module.name = ModuleName
	return module
}
// Name reports the module's identifier (ModuleName for modules created with
// NewModule). RegisterConfig and Init use the same value as the configuration
// section key.
func (m *EventBusModule) Name() string { return m.name }
// RegisterConfig registers the module's configuration section, keyed by the
// module name, with a provider wrapping the default configuration below.
// It always returns nil.
//
// Defaults set here:
//   - Engine: "memory"
//   - MaxEventQueueSize: 1000
//   - DefaultEventBufferSize: 10
//   - WorkerCount: 5
//   - RetentionDays: 7
//   - ExternalBrokerURL / ExternalBrokerUser / ExternalBrokerPassword: empty
//
// EventTTL is not assigned here, so it keeps whatever value it receives
// elsewhere (for example from struct-tag defaults or loaded configuration;
// neither is visible in this file).
func (m *EventBusModule) RegisterConfig(app modular.Application) error {
	defaults := EventBusConfig{
		Engine:                 "memory",
		MaxEventQueueSize:      1000,
		DefaultEventBufferSize: 10,
		WorkerCount:            5,
		RetentionDays:          7,
		ExternalBrokerURL:      "",
		ExternalBrokerUser:     "",
		ExternalBrokerPassword: "",
	}

	provider := modular.NewStdConfigProvider(&defaults)
	app.RegisterConfigSection(m.Name(), provider)
	return nil
}
// errUnexpectedConfigType is returned by Init when the registered
// configuration section does not hold a *EventBusConfig.
var errUnexpectedConfigType = errors.New("eventbus config section has unexpected type")

// Init wires the module to the application: it loads the module's
// configuration section, captures the application logger, validates the
// configuration and builds the engine router.
//
// Steps, in order:
//  1. Fetch the config section registered under the module name.
//  2. Check that the section holds a *EventBusConfig (an error is returned
//     instead of panicking when it does not).
//  3. Store the config and logger on the module.
//  4. Validate the configuration.
//  5. Create the engine router and hand it a reference to this module.
//  6. Log the engine setup and emit an EventTypeConfigLoaded event.
//
// It returns a wrapped error if the section is missing, has the wrong type,
// fails validation, or the router cannot be created.
func (m *EventBusModule) Init(app modular.Application) error {
	// Retrieve the registered config section for access
	cfg, err := app.GetConfigSection(m.name)
	if err != nil {
		return fmt.Errorf("failed to get config section '%s': %w", m.name, err)
	}

	// Use the comma-ok form so a mis-registered section surfaces as an error
	// rather than a panic during application startup.
	raw := cfg.GetConfig()
	config, ok := raw.(*EventBusConfig)
	if !ok {
		return fmt.Errorf("%w: got %T, want *EventBusConfig", errUnexpectedConfigType, raw)
	}
	m.config = config
	m.logger = app.Logger()

	// Validate configuration
	if err := m.config.ValidateConfig(); err != nil {
		return fmt.Errorf("invalid eventbus configuration: %w", err)
	}

	// Initialize the engine router
	m.router, err = NewEngineRouter(m.config)
	if err != nil {
		return fmt.Errorf("failed to create engine router: %w", err)
	}

	// Set module reference for memory engines to enable event emission
	m.router.SetModuleReference(m)

	if m.config.IsMultiEngine() {
		m.logger.Info("Initialized multi-engine eventbus",
			"engines", len(m.config.Engines),
			"routing_rules", len(m.config.Routing))
		for _, engine := range m.config.Engines {
			m.logger.Debug("Configured engine", "name", engine.Name, "type", engine.Type)
		}
	} else {
		m.logger.Info("Initialized single-engine eventbus", "engine", m.config.Engine)
	}

	// Emit config loaded event. A fresh background context is used because
	// Init receives no context of its own.
	m.emitEvent(modular.WithSynchronousNotification(context.Background()), EventTypeConfigLoaded, map[string]interface{}{
		"engine":         m.config.Engine,
		"max_queue_size": m.config.MaxEventQueueSize,
		"worker_count":   m.config.WorkerCount,
		"event_ttl":      m.config.EventTTL,
		"retention_days": m.config.RetentionDays,
	})

	m.logger.Info("Event bus module initialized")
	return nil
}
// Start starts the engine router (and through it every configured engine)
// and marks the module as started.
//
// The call is idempotent: if the module is already started it returns nil
// without touching the router. The module mutex is held for the whole call,
// so concurrent Start/Stop calls are serialized.
//
// On success an EventTypeBusStarted event carrying the configured engine
// name and worker count is emitted from a separate goroutine. If the router
// fails to start, the error is returned wrapped and the module stays
// stopped.
func (m *EventBusModule) Start(ctx context.Context) error {
	m.logger.Info("Starting event bus module")

	m.mutex.Lock()
	defer m.mutex.Unlock()

	if m.isStarted.Load() {
		return nil
	}

	// Start the engine router (which starts all engines)
	if err := m.router.Start(ctx); err != nil {
		return fmt.Errorf("starting engine router: %w", err)
	}
	m.isStarted.Store(true)

	if m.config.IsMultiEngine() {
		m.logger.Info("Event bus started with multiple engines",
			"engines", m.router.GetEngineNames())
	} else {
		m.logger.Info("Event bus started")
	}

	// Build the bus-started payload directly from the configuration. The
	// previous code wrapped these values in a CloudEvent as *data* and then
	// read them back via Extensions(), which does not contain the data
	// payload, so observers received nil for both fields.
	engine, workers := "unknown", 0
	if m.config != nil {
		engine, workers = m.config.Engine, m.config.WorkerCount
	}
	payload := map[string]interface{}{
		"engine":  engine,
		"workers": workers,
	}

	// emitEvent takes the read lock, which cannot be acquired while this
	// call still holds the write lock, so the emission must run in its own
	// goroutine rather than inline.
	go func() {
		defer func() {
			if r := recover(); r != nil {
				m.logger.Error("observer panic", "error", r)
			}
		}()
		// Use helper to silence benign missing-subject cases
		m.emitEvent(ctx, EventTypeBusStarted, payload)
	}()

	return nil
}
// Stop stops the engine router (and through it every configured engine) and
// marks the module as stopped.
//
// The call is idempotent: when the module is not started it returns nil
// immediately. If the router fails to stop, the wrapped error is returned
// and the module remains marked as started.
//
// After a successful stop, and once the module mutex has been released, an
// EventTypeBusStopped event carrying the engine name is emitted on the
// calling goroutine.
func (m *EventBusModule) Stop(ctx context.Context) error {
	m.logger.Info("Stopping event bus module")

	// Everything that needs the write lock happens inside this closure so
	// the lock is released before logging and event emission below.
	engine, wasRunning, err := func() (string, bool, error) {
		m.mutex.Lock()
		defer m.mutex.Unlock()

		if !m.isStarted.Load() {
			return "", false, nil
		}

		// Stop the engine router (which stops all engines)
		if stopErr := m.router.Stop(ctx); stopErr != nil {
			return "", false, fmt.Errorf("stopping engine router: %w", stopErr)
		}
		m.isStarted.Store(false)

		if m.config == nil {
			return "unknown", true, nil
		}
		return m.config.Engine, true, nil
	}()
	if err != nil {
		return err
	}
	if !wasRunning {
		return nil
	}

	m.logger.Info("Event bus stopped")

	// emitEvent acquires the read lock itself, so it must run only after the
	// write lock above has been released.
	m.emitEvent(ctx, EventTypeBusStopped, map[string]interface{}{
		"engine": engine,
	})

	return nil
}
// Dependencies lists the names of modules this module depends on.
// The eventbus module declares none, so the result is always nil.
func (m *EventBusModule) Dependencies() []string { return nil }
// ProvidesServices declares the single service this module offers: the
// module itself, registered under ServiceName ("eventbus.provider") so other
// modules can obtain the event bus through dependency injection.
func (m *EventBusModule) ProvidesServices() []modular.ServiceProvider {
	busService := modular.ServiceProvider{
		Name:        ServiceName,
		Description: "Event bus for message distribution",
		Instance:    m,
	}
	return []modular.ServiceProvider{busService}
}
// RequiresServices lists the services this module needs from others.
// The eventbus module requires none, so the result is always nil.
func (m *EventBusModule) RequiresServices() []modular.ServiceDependency { return nil }
// Constructor returns the dependency-injection constructor for the module.
// The returned function ignores both the application and the supplied
// services and simply hands back this same module instance with a nil error.
func (m *EventBusModule) Constructor() modular.ModuleConstructor {
	construct := func(_ modular.Application, _ map[string]any) (modular.Module, error) {
		return m, nil
	}
	return construct
}
// Publish wraps payload in a CloudEvent and hands it to the engine router.
//
// The event's type is the topic, its source comes from the module
// configuration, its ID is a freshly generated UUID, its time is the current
// time, and payload is attached as "application/json" data.
//
// Which subscribers receive the event, and whether wildcard topics match, is
// decided by the router and the engine it selects for the topic.
//
// An error is returned when the payload cannot be attached to the event or
// when the router fails to publish. After the router call, an
// EventTypeMessagePublished or EventTypeMessageFailed observer event
// (including the call duration in milliseconds) is emitted from a separate
// goroutine.
//
// Example:
//
//	err := eventBus.Publish(ctx, "user.created", userData)
//	err := eventBus.Publish(ctx, "order.payment.failed", paymentData)
func (m *EventBusModule) Publish(ctx context.Context, topic string, payload interface{}) error {
	evt := cevent.New()
	evt.SetType(topic)
	evt.SetSource(m.config.Source)
	evt.SetID(uuid.New().String())
	evt.SetTime(time.Now())
	if err := evt.SetData("application/json", payload); err != nil {
		return fmt.Errorf("failed to set event data: %w", err)
	}

	// Time only the router call so the reported duration excludes event
	// construction above.
	began := time.Now()
	publishErr := m.router.Publish(ctx, evt)
	elapsedMs := time.Since(began).Milliseconds()

	if publishErr == nil {
		// Emit message published event
		go m.emitEvent(ctx, EventTypeMessagePublished, map[string]interface{}{
			"topic":       topic,
			"duration_ms": elapsedMs,
		})
		return nil
	}

	// Emit message failed event
	go m.emitEvent(ctx, EventTypeMessageFailed, map[string]interface{}{
		"topic":       topic,
		"error":       publishErr.Error(),
		"duration_ms": elapsedMs,
	})
	return fmt.Errorf("publishing event to topic %s: %w", topic, publishErr)
}
// Subscribe registers handler for topic through the engine router's
// synchronous subscription path and returns the resulting Subscription.
//
// With multiple engines, the router decides which engine owns the topic.
// How and when the handler is invoked is determined by that engine.
//
// On success an EventTypeSubscriptionCreated observer event (topic,
// subscription ID, async=false) is emitted from a separate goroutine. If the
// router rejects the subscription, a wrapped error and a nil Subscription
// are returned.
//
// Example:
//
//	subscription, err := eventBus.Subscribe(ctx, "user.login", func(ctx context.Context, event Event) error {
//		var user UserData
//		if err := event.DataAs(&user); err != nil {
//			return err
//		}
//		return updateLastLoginTime(user.ID)
//	})
func (m *EventBusModule) Subscribe(ctx context.Context, topic string, handler EventHandler) (Subscription, error) {
	subscription, err := m.router.Subscribe(ctx, topic, handler)
	if err != nil {
		return nil, fmt.Errorf("subscribing to topic %s: %w", topic, err)
	}

	// Emit subscription created event
	details := map[string]interface{}{
		"topic":           topic,
		"subscription_id": subscription.ID(),
		"async":           false,
	}
	go m.emitEvent(ctx, EventTypeSubscriptionCreated, details)

	return subscription, nil
}
// SubscribeAsync registers handler for topic through the engine router's
// asynchronous subscription path and returns the resulting Subscription.
//
// With multiple engines, the router decides which engine owns the topic.
// How events are queued and dispatched to the handler is determined by that
// engine.
//
// On success an EventTypeSubscriptionCreated observer event (topic,
// subscription ID, async=true) is emitted from a separate goroutine. If the
// router rejects the subscription, a wrapped error and a nil Subscription
// are returned.
//
// Example:
//
//	subscription, err := eventBus.SubscribeAsync(ctx, "image.uploaded", func(ctx context.Context, event Event) error {
//		var imageData ImageData
//		if err := event.DataAs(&imageData); err != nil {
//			return err
//		}
//		return generateThumbnails(imageData)
//	})
func (m *EventBusModule) SubscribeAsync(ctx context.Context, topic string, handler EventHandler) (Subscription, error) {
	subscription, err := m.router.SubscribeAsync(ctx, topic, handler)
	if err != nil {
		return nil, fmt.Errorf("subscribing async to topic %s: %w", topic, err)
	}

	// Emit subscription created event
	details := map[string]interface{}{
		"topic":           topic,
		"subscription_id": subscription.ID(),
		"async":           true,
	}
	go m.emitEvent(ctx, EventTypeSubscriptionCreated, details)

	return subscription, nil
}
// Unsubscribe asks the engine router to cancel subscription.
//
// The subscription's topic and ID are read before the router call so they
// can still be reported afterwards. On success an
// EventTypeSubscriptionRemoved observer event is emitted from a separate
// goroutine; on failure the router's error is returned wrapped and no event
// is emitted.
//
// Example:
//
//	err := eventBus.Unsubscribe(ctx, subscription)
func (m *EventBusModule) Unsubscribe(ctx context.Context, subscription Subscription) error {
	// Capture identifying details up front, before the router is asked to
	// cancel the subscription.
	details := map[string]interface{}{
		"topic":           subscription.Topic(),
		"subscription_id": subscription.ID(),
	}

	if err := m.router.Unsubscribe(ctx, subscription); err != nil {
		return fmt.Errorf("unsubscribing: %w", err)
	}

	// Emit subscription removed event
	go m.emitEvent(ctx, EventTypeSubscriptionRemoved, details)

	return nil
}
// Topics returns the topics reported by the engine router. This can be
// useful for debugging, monitoring, or building administrative interfaces
// that show current event bus activity.
//
// Like Stats and PerEngineStats, it is safe to call before the module has
// been initialized: when no router has been built yet it returns nil instead
// of dereferencing a nil router.
//
// Example:
//
//	activeTopics := eventBus.Topics()
//	for _, topic := range activeTopics {
//		count := eventBus.SubscriberCount(topic)
//		fmt.Printf("Topic: %s, Subscribers: %d\n", topic, count)
//	}
func (m *EventBusModule) Topics() []string {
	if m.router == nil {
		return nil
	}
	return m.router.Topics()
}
// SubscriberCount returns the number of subscribers the engine router
// reports for topic.
//
// Like Stats and PerEngineStats, it is safe to call before the module has
// been initialized: when no router has been built yet it returns 0 instead
// of dereferencing a nil router.
//
// Example:
//
//	count := eventBus.SubscriberCount("user.created")
//	if count == 0 {
//		log.Warn("No subscribers for user creation events")
//	}
func (m *EventBusModule) SubscriberCount(topic string) int {
	if m.router == nil {
		return 0
	}
	return m.router.SubscriberCount(topic)
}
// GetRouter exposes the underlying engine router for advanced,
// engine-specific operations such as checking which engine a topic routes
// to. The result is nil until Init has built the router.
//
// Example:
//
//	router := eventBus.GetRouter()
//	engine := router.GetEngineForTopic("user.created")
//	fmt.Printf("Topic routes to engine: %s", engine)
func (m *EventBusModule) GetRouter() *EngineRouter { return m.router }
// Stats returns the delivery statistics aggregated by the engine router
// across the engines that expose them. It is intended for lightweight
// monitoring, metrics and tests. When no router has been built yet, both
// counters are reported as zero.
func (m *EventBusModule) Stats() (delivered uint64, dropped uint64) {
	if router := m.router; router != nil {
		return router.CollectStats()
	}
	return 0, 0
}
// PerEngineStats returns the delivery statistics collected by the engine
// router, keyed by engine name. It is safe to call before the router exists:
// in that case an empty, non-nil map is returned.
func (m *EventBusModule) PerEngineStats() map[string]DeliveryStats {
	if router := m.router; router != nil {
		return router.CollectPerEngineStats()
	}
	return map[string]DeliveryStats{}
}
// Static errors for err113 compliance: the blank assignment keeps an explicit
// reference to the package-level ErrNoSubjectForEventEmission error.
var _ = ErrNoSubjectForEventEmission
// RegisterObservers implements the ObservableModule interface.
// The eventbus module does not subscribe to any events here; it only stores
// subject (under the module mutex) so that EmitEvent and emitEvent can later
// notify observers through it. It always returns nil.
func (m *EventBusModule) RegisterObservers(subject modular.Subject) error {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	m.subject = subject
	return nil
}
// EmitEvent implements the ObservableModule interface by forwarding event to
// the subject stored by RegisterObservers.
//
// It returns ErrNoSubjectForEventEmission when no subject has been
// registered. Otherwise the notification runs in its own goroutine so event
// bus operations are never blocked by observers, and nil is returned
// immediately. A notification error is logged at debug level, and a panic
// raised during notification is recovered and logged as an error.
func (m *EventBusModule) EmitEvent(ctx context.Context, event cloudevents.Event) error {
	m.mutex.RLock()
	target := m.subject
	m.mutex.RUnlock()

	if target == nil {
		return ErrNoSubjectForEventEmission
	}

	notify := func() {
		defer func() {
			recovered := recover()
			if recovered == nil || m.logger == nil {
				return
			}
			m.logger.Error("panic recovered in eventbus observer notification", "error", recovered, "event_type", event.Type())
		}()

		err := target.NotifyObservers(ctx, event)
		if err != nil && m.logger != nil {
			m.logger.Debug("Failed to notify observers", "error", err, "event_type", event.Type())
		}
	}

	// Use a goroutine to prevent blocking eventbus operations with event emission
	go notify()
	return nil
}
// emitEvent builds a CloudEvent of the given type (source
// "eventbus-service", data as payload) and passes it to EmitEvent.
//
// Emission is best-effort: when no subject is registered the call returns
// without creating an event, errors from EmitEvent are not propagated, and a
// panic raised while emitting is recovered and logged. The method takes the
// module's read lock, so it must not be called while the caller holds the
// write lock.
func (m *EventBusModule) emitEvent(ctx context.Context, eventType string, data map[string]interface{}) {
	defer func() {
		if recovered := recover(); recovered != nil && m.logger != nil {
			m.logger.Error("panic recovered in eventbus event emission", "error", recovered, "event_type", eventType)
		}
	}()

	// Skip event emission if no subject is available (non-observable application)
	m.mutex.RLock()
	hasSubject := m.subject != nil
	m.mutex.RUnlock()
	if !hasSubject {
		return
	}

	evt := modular.NewCloudEvent(eventType, "eventbus-service", data, nil)

	err := m.EmitEvent(ctx, evt)
	switch {
	case err == nil:
		// Handed off to EmitEvent successfully.
	case errors.Is(err, ErrNoSubjectForEventEmission):
		// The subject was cleared between the check above and EmitEvent;
		// skip quietly so non-observable apps run cleanly.
	default:
		// Any other error is deliberately dropped here; further error
		// logging is left to EmitEvent itself.
	}
}
// GetRegisteredEventTypes implements the ObservableModule interface.
// It returns a freshly allocated list of every event type this module
// declares it can emit, covering message, topic, subscription, bus lifecycle
// and configuration events.
func (m *EventBusModule) GetRegisteredEventTypes() []string {
	eventTypes := []string{
		// Message events
		EventTypeMessagePublished,
		EventTypeMessageReceived,
		EventTypeMessageFailed,
		// Topic events
		EventTypeTopicCreated,
		EventTypeTopicDeleted,
		// Subscription events
		EventTypeSubscriptionCreated,
		EventTypeSubscriptionRemoved,
		// Bus lifecycle and configuration events
		EventTypeBusStarted,
		EventTypeBusStopped,
		EventTypeConfigLoaded,
	}
	return eventTypes
}