Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,22 @@ import (
"time"
)

var DefaultBackoffPolicy = BackoffPolicy{
BaseDelay: 1 * time.Second,
MaxDelay: 30 * time.Second,
UseJitter: true,
JitterRangeMs: 300,
}

// Backoff defines the interface for calculating backoff delays between retries.
// Implementations of this interface can provide custom logic for determining
// how long to wait before retrying a failed task based on the number of retries.
type Backoff interface {
// Calculate returns the duration to wait before the next retry attempt.
// The input parameter retries indicates how many times the task has already been retried.
Calculate(retries int) time.Duration
}

// BackoffPolicy defines the configuration for handling retries with backoff logic.
// It provides settings for base delay, maximum delay, jitter, and the range for jitter.
type BackoffPolicy struct {
Expand Down
12 changes: 3 additions & 9 deletions manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type ManagerOption func(*ManagerConfig)

// ManagerConfig holds configuration options for the Manager.
type ManagerConfig struct {
BackoffPolicy *BackoffPolicy
BackoffPolicy Backoff
}

// Manager is responsible for managing the lifecycle of workers,
Expand All @@ -37,7 +37,7 @@ type Manager struct {
// Example usage:
//
// manager := NewManager(bp, numOfWorkers, WithBackoffPolicy(customBackoffPolicy))
func WithBackoffPolicy(bp *BackoffPolicy) ManagerOption {
func WithBackoffPolicy(bp Backoff) ManagerOption {
return func(cfg *ManagerConfig) {
cfg.BackoffPolicy = bp
}
Expand Down Expand Up @@ -79,14 +79,8 @@ func NewManager(broker Broker, wf WorkerFactory, numWorkers int, opts ...Manager
opt(managerConfig)
}

// Set default backoff if cfg.BackoffPolicy == nil
if managerConfig.BackoffPolicy == nil {
managerConfig.BackoffPolicy = &BackoffPolicy{
BaseDelay: 1 * time.Second,
MaxDelay: 30 * time.Second,
UseJitter: true,
JitterRangeMs: 300,
}
managerConfig.BackoffPolicy = &DefaultBackoffPolicy
}

manager := &Manager{broker: broker, ctx: ctx, cancel: cancel}
Expand Down
4 changes: 2 additions & 2 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type WorkerFactory func(cfg WorkerConfig) Worker
type WorkerConfig struct {
ID int
Broker Broker
Backoff *BackoffPolicy
Backoff Backoff
WG *sync.WaitGroup
}

Expand All @@ -86,7 +86,7 @@ type WorkerConfig struct {
type DefaultWorker struct {
id int
broker Broker
backoff *BackoffPolicy
backoff Backoff
handlers map[string]TaskHandlerFunc
wg *sync.WaitGroup
}
Expand Down