From e188cfcbb1850a2551d9e39c8a2a511f5e5eb71d Mon Sep 17 00:00:00 2001 From: Alessandro Resta Date: Thu, 11 Jun 2026 13:56:22 +0300 Subject: [PATCH 1/3] Extend pool with unbounded queue option Also split the implementation in core, bounded and unbounded pools for readability --- boundedpool.go | 59 +++++++++++++++++++++ unboundedpool.go | 125 +++++++++++++++++++++++++++++++++++++++++++ workerpool.go | 135 +++++++++++++++++------------------------------ 3 files changed, 231 insertions(+), 88 deletions(-) create mode 100644 boundedpool.go create mode 100644 unboundedpool.go diff --git a/boundedpool.go b/boundedpool.go new file mode 100644 index 0000000..535f191 --- /dev/null +++ b/boundedpool.go @@ -0,0 +1,59 @@ +package workerpool + +import "context" + +// boundedPool implements a fixed-size channel-backed pool +type boundedPool[T Task] struct { + poolCore[T] +} + +func newBoundedPool[T Task](ctx context.Context, workers int, buffer int) *boundedPool[T] { + poolCtx, cancel := context.WithCancel(ctx) + + p := &boundedPool[T]{} + p.ctx = poolCtx + p.cancel = cancel + p.stop = make(chan struct{}) + p.dispatch = make(chan entry[T], buffer) + + p.wg.Add(workers) + for range workers { + go p.worker() + } + return p +} + +func (p *boundedPool[T]) worker() { + defer p.wg.Done() + for { + select { + case <-p.ctx.Done(): + p.ungracefulStop.Store(true) + return + case <-p.stop: + for { + select { + case entry := <-p.dispatch: + entry.job.Do(entry.ctx) + default: + return + } + } + case entry := <-p.dispatch: + entry.job.Do(entry.ctx) + } + } +} + +func (p *boundedPool[T]) Submit(ctx context.Context, task T) error { + select { + case <-ctx.Done(): + return ErrTaskCancelled + case <-p.ctx.Done(): + return ErrPoolClosed + case <-p.stop: + return ErrPoolClosed + case p.dispatch <- entry[T]{ctx: ctx, job: task}: + return nil + } +} diff --git a/unboundedpool.go b/unboundedpool.go new file mode 100644 index 0000000..2b9f720 --- /dev/null +++ b/unboundedpool.go @@ -0,0 +1,125 @@ +package workerpool + +import "context" + +// unboundedPool implements a memory-backed pool with unbounded queuing. +// Submit writes to a non-blocking ingestion channel; a broker goroutine +// drains it into a dynamic slice and feeds tasks to workers +type unboundedPool[T Task] struct { + poolCore[T] + ingress chan entry[T] +} + +func newUnboundedPool[T Task](ctx context.Context, workers int) *unboundedPool[T] { + poolCtx, cancel := context.WithCancel(ctx) + + ingressBuf := max(workers, 1) + + p := &unboundedPool[T]{} + + p.ctx = poolCtx + p.cancel = cancel + p.stop = make(chan struct{}) + p.ingress = make(chan entry[T], ingressBuf) + p.dispatch = make(chan entry[T], workers) + + p.wg.Add(workers) + for range workers { + go p.worker() + } + + go p.broker() + + return p +} + +func (p *unboundedPool[T]) worker() { + defer p.wg.Done() + + for { + select { + case <-p.ctx.Done(): + p.ungracefulStop.Store(true) + return + case entry, ok := <-p.dispatch: + if !ok { + return + } + entry.job.Do(entry.ctx) + } + } +} + +func (p *unboundedPool[T]) broker() { + var queue []entry[T] + + for { + var ( + front entry[T] + dispatchCh chan entry[T] + ) + + if len(queue) > 0 { + front = queue[0] + dispatchCh = p.dispatch + } + + select { + case <-p.ctx.Done(): + return + case <-p.stop: + drainLoop: + for { + select { + case e := <-p.ingress: + queue = append(queue, e) + default: + break drainLoop + } + } + + for _, e := range queue { + select { + case <-p.ctx.Done(): + return + case p.dispatch <- e: + } + } + close(p.dispatch) + return + case e := <-p.ingress: + queue = append(queue, e) + case dispatchCh <- front: + queue = queue[1:] + } + } +} + +func (p *unboundedPool[T]) Submit(ctx context.Context, task T) error { + e := entry[T]{ctx: ctx, job: task} + + select { + case <-p.stop: + return ErrPoolClosed + case <-p.ctx.Done(): + return ErrPoolClosed + default: + } + + select { + case p.ingress <- e: + return nil + default: + } + + select { + case p.ingress <- e: + return nil + case <-ctx.Done(): + return ErrTaskCancelled + case <-p.ctx.Done(): + return ErrPoolClosed + case <-p.stop: + return ErrPoolClosed + } +} diff --git a/workerpool.go b/workerpool.go index ae5e430..9c5e281 100644 --- a/workerpool.go +++ b/workerpool.go @@ -7,6 +7,11 @@ import ( "sync/atomic" ) +var ( + ErrPoolClosed = errors.New("pool is closed") + ErrTaskCancelled = errors.New("task context cancelled") +) + // Task defines the interface for a task to be executed by a worker pool. type Task interface { Do(context.Context) @@ -17,111 +22,70 @@ type entry[T Task] struct { job T } +// Pool is the interface that wraps the basic worker pool operations. +type Pool[T Task] interface { + Submit(ctx context.Context, task T) error + GracefulShutdown() error +} + +type config struct { + bufferSize int + unbounded bool +} + // Option configures a Pool. -type Option[T Task] func(*Pool[T]) +type Option[T Task] func(*config) -// WithBuffer sets the task channel buffer size. +// WithBuffer sets the task channel buffer size for a bounded pool. func WithBuffer[T Task](size int) Option[T] { - return func(p *Pool[T]) { - p.buffer = size + return func(cfg *config) { + cfg.bufferSize = size + cfg.unbounded = false } } -// Pool maintains fixed worker goroutines processing tasks from a channel. -type Pool[T Task] struct { - entries chan entry[T] // channel for jobs waiting to be processed - buffer int // size of the task channel - wg sync.WaitGroup // wait group for worker goroutines +// WithUnboundedQueue configures the pool to use an unbounded in-memory queue. +// Submit never blocks on capacity — tasks are buffered in a dynamic slice. +func WithUnboundedQueue[T Task]() Option[T] { + return func(cfg *config) { + cfg.unbounded = true + } +} - // immediate termination - ctx context.Context - cancel context.CancelFunc - ungracefulStop atomic.Bool +// poolCore holds shared lifecycle state for bounded and unbounded pool implementations. +type poolCore[T Task] struct { + ctx context.Context + cancel context.CancelFunc - // graceful shutdown - stop chan struct{} - shutdownOnce sync.Once + stop chan struct{} + dispatch chan entry[T] + + wg sync.WaitGroup + shutdownOnce sync.Once + ungracefulStop atomic.Bool } -// New creates a pool with number of available workers. -// The context can be used to stop the pool immediately, skipping any buffered -// tasks. In-flight tasks will still run to completion. -func New[T Task](ctx context.Context, workers int, opts ...Option[T]) *Pool[T] { +// New creates a worker pool. Returns a bounded channel-backed pool by default. +// Use WithUnboundedQueue to get a memory-backed pool that never blocks on Submit. +func New[T Task](ctx context.Context, workers int, opts ...Option[T]) Pool[T] { if workers <= 0 { workers = 1 } - poolCtx, cancel := context.WithCancel(ctx) - - p := &Pool[T]{ - ctx: poolCtx, - cancel: cancel, - stop: make(chan struct{}), - } - + var cfg config for _, opt := range opts { - opt(p) - } - - p.entries = make(chan entry[T], p.buffer) - - p.wg.Add(workers) - for range workers { - go p.worker() + opt(&cfg) } - return p -} -func (p *Pool[T]) worker() { - defer p.wg.Done() - for { - select { - case <-p.ctx.Done(): - // exit without draining buffered tasks - p.ungracefulStop.Store(true) - return - case <-p.stop: - // drain remaining buffered tasks before exiting - for { - select { - case entry := <-p.entries: - entry.job.Do(entry.ctx) - default: - // channel is empty. since p.stop is closed, - // no more tasks can be submitted - return - } - } - case entry := <-p.entries: - entry.job.Do(entry.ctx) - } - } -} - -var ( - ErrPoolClosed = errors.New("pool is closed") - ErrTaskCancelled = errors.New("task context cancelled") -) - -// Submit sends a task to the pool. Blocks if the task channel is full. -// Returns false if the pool is shutting down or the context was cancelled. -func (p *Pool[T]) Submit(ctx context.Context, task T) error { - select { - case <-ctx.Done(): - return ErrTaskCancelled - case <-p.ctx.Done(): // forcefully terminate via ctx - return ErrPoolClosed - case <-p.stop: // terminated via graceful shutdown - return ErrPoolClosed - case p.entries <- entry[T]{ctx: ctx, job: task}: - return nil + if cfg.unbounded { + return newUnboundedPool[T](ctx, workers) } + return newBoundedPool[T](ctx, workers, cfg.bufferSize) } // GracefulShutdown stops accepting new tasks, drains all buffered tasks, // and waits for in-flight tasks to complete before returning. -// Returns an error if the ctx was cancelled before shutdown completed. -func (p *Pool[T]) GracefulShutdown() error { +func (p *poolCore[T]) GracefulShutdown() error { if p.ungracefulStop.Load() { return errors.New("pool was forcefully terminated before shutdown") } @@ -130,11 +94,6 @@ func (p *Pool[T]) GracefulShutdown() error { close(p.stop) p.wg.Wait() p.cancel() - - // only close(p.entries) with a lock here and - // a read lock in Submit otherwise senders will panic =] - // but it's just a good to have, since p.stop is closed - // and submit already checks for that }) return nil } From 7b3af8fad2bf1c8c291bf8d182330ea894793f06 Mon Sep 17 00:00:00 2001 From: Alessandro Resta Date: Thu, 11 Jun 2026 13:57:45 +0300 Subject: [PATCH 2/3] Add unit and update example tests --- boundedpool_test.go | 269 ++++++++++++++++++++++++++++++++++ example_test.go | 28 +++- unboundedpool_test.go | 136 ++++++++++++++++++ workerpool_test.go | 325 +++++++++--------------------------------- 4 files changed, 494 insertions(+), 264 deletions(-) create mode 100644 boundedpool_test.go create mode 100644 unboundedpool_test.go diff --git a/boundedpool_test.go b/boundedpool_test.go new file mode 100644 index 0000000..1bdeb19 --- /dev/null +++ b/boundedpool_test.go @@ -0,0 +1,269 @@ +package workerpool + +import ( + "context" + "sync" + "sync/atomic" + "testing" + "testing/synctest" +) + +func TestPool_WithBuffer(t *testing.T) { + t.Parallel() + + pool := New(context.TODO(), 3, WithBuffer[testTask](10)) + + var counter atomic.Int32 + + // submit 20 tasks + for i := 1; i <= 20; i++ { + task := testTask{ + value: i, + counter: &counter, + } + + if err := pool.Submit(context.TODO(), task); err != nil { + t.Error("failed to submit task") + } + } + + if err := pool.GracefulShutdown(); err != nil { + t.Errorf("unexpected shutdown error: %v", err) + } + + // sum of 1..20 = 210 + if got := counter.Load(); got != 210 { + t.Errorf("expected counter = 210, got %d", got) + } +} + +func TestPool_Submit(t *testing.T) { + t.Parallel() + + t.Run("queues and executes tasks", func(t *testing.T) { + t.Parallel() + + pool := New[testTask](context.TODO(), 3) + + var counter atomic.Int32 + for i := 1; i <= 10; i++ { + task := testTask{ + value: i, + counter: &counter, + } + + if err := pool.Submit(context.TODO(), task); err != nil { + t.Error("failed to submit task") + } + } + + if err := pool.GracefulShutdown(); err != nil { + t.Errorf("shutdown error: %v", err) + } + + // sum of 1..10 = 55 + if got := counter.Load(); got != 55 { + t.Errorf("expected counter = 55, got %d", got) + } + }) + + t.Run("second submit blocks when the worker is busy", func(t *testing.T) { + t.Parallel() + + synctest.Test(t, func(t *testing.T) { + pool := New[testTask](t.Context(), 1) + + counter := &atomic.Int32{} + blocker := make(chan struct{}) + + go func() { + pool.Submit(context.TODO(), testTask{ + fn: func(_ context.Context) { + <-blocker + }, + value: 1, + counter: counter, + }) + }() + + synctest.Wait() + + // second Submit must block since channel is unbuffered and worker is busy + task2Submitted := make(chan struct{}) + go func() { + pool.Submit(context.TODO(), testTask{ + value: 2, + counter: counter, + }) + close(task2Submitted) + }() + + // task2 goroutine is durably blocked on the channel send + synctest.Wait() + + select { + case <-task2Submitted: + t.Error("submit should be blocked on unbuffered channel") + default: + // task2Submitted is not yet closed. Submit is still blocked + } + + close(blocker) // release the worker + + synctest.Wait() // worker processes both tasks and task2 goroutine exits + + select { + case <-task2Submitted: + // task2Submitted is closed and Submit completed after worker was unblocked, as expected + default: + t.Error("Submit should have completed after worker was unblocked") + } + + if err := pool.GracefulShutdown(); err != nil { + t.Errorf("unexpected shutdown error: %v", err) + } + + if got := counter.Load(); got != 3 { + t.Errorf("expected counter = 3, got %d", got) + } + }) + }) + + t.Run("task observes its own context cancellation via Do", func(t *testing.T) { + t.Parallel() + + synctest.Test(t, func(t *testing.T) { + taskCtx, cancelTask := context.WithCancel(t.Context()) + + pool := New[testTask](t.Context(), 1) + + var counter atomic.Int32 + taskDone := make(chan struct{}) + + pool.Submit(taskCtx, testTask{ + fn: func(ctx context.Context) { + <-ctx.Done() // block + close(taskDone) + }, + value: 1, + counter: &counter, + }) + + synctest.Wait() // worker is now blocked + cancelTask() + + // wait until the <-ctx.Done() is complete (so it can close taskDone) + synctest.Wait() + + select { + case <-taskDone: + // task observed its ctx cancellation + default: + t.Error("task should have observed context cancellation") + } + + if err := pool.GracefulShutdown(); err != nil { + t.Errorf("unexpected shutdown error: %v", err) + } + }) + }) + + t.Run("handles concurrent submissions", func(t *testing.T) { + t.Parallel() + + const workers, tasks = 5, 100 + + pool := New[testTask](context.TODO(), workers) + + var ( + counter atomic.Int32 + wg sync.WaitGroup + ) + + wg.Add(tasks) + + for i := range tasks { + go func(val int) { + defer wg.Done() + + task := testTask{ + value: 1, + counter: &counter, + } + pool.Submit(context.TODO(), task) + }(i) + } + + wg.Wait() + + if err := pool.GracefulShutdown(); err != nil { + t.Errorf("unexpected shutdown error: %v", err) + } + + if got := counter.Load(); got != tasks { + t.Errorf("expected counter = %d, got %d", tasks, got) + } + }) + + t.Run("blocks caller until capacity is available", func(t *testing.T) { + t.Parallel() + + synctest.Test(t, func(t *testing.T) { + pool := New[testTask](context.TODO(), 1) + + var counter atomic.Int32 + blocker := make(chan struct{}) + + // submit blocking task + task1 := testTask{ + fn: func(_ context.Context) { + <-blocker // block here + }, + value: 1, + counter: &counter, + } + + go pool.Submit(context.TODO(), task1) + + synctest.Wait() // let worker pick up task1 + + // try to submit second task should block since channel is unbuffered + task2Submitted := make(chan struct{}) + + go func() { + pool.Submit(context.TODO(), testTask{ + value: 2, + counter: &counter, + }) + close(task2Submitted) + }() + + synctest.Wait() // worker still blocked + + select { + case <-task2Submitted: + t.Error("Submit should be blocked while worker is busy") + default: + // expected + } + + close(blocker) // release the worker + synctest.Wait() // worker processes both tasks; task2 goroutine exits + + select { + case <-task2Submitted: + // expected: Submit completed + default: + t.Error("Submit should have completed after worker was unblocked") + } + + if err := pool.GracefulShutdown(); err != nil { + t.Errorf("unexpected shutdown error: %v", err) + } + + if got := counter.Load(); got != 3 { + t.Errorf("expected counter = 3, got %d", got) + } + }) + }) +} diff --git a/example_test.go b/example_test.go index c91917f..7290834 100644 --- a/example_test.go +++ b/example_test.go @@ -22,9 +22,7 @@ func (b *bazooka) Do(ctx context.Context) { b.bodyCount.Add(1) } -func Example() { - // starts a pool with 3 workers - // use the context to cancel the pool without waiting for buffered tasks to complete +func Example_bounded() { pool := workerpool.New[*bazooka](context.TODO(), 3) var bodyCount atomic.Int32 @@ -47,3 +45,27 @@ func Example() { // Output: // Body count: 3 } + +func Example_unbounded() { + pool := workerpool.New(context.TODO(), 3, workerpool.WithUnboundedQueue[*bazooka]()) + + var bodyCount atomic.Int32 + + bazookas := []bazooka{ + {ammo: 69, targetID: "foo-id", bodyCount: &bodyCount}, + {ammo: 42, targetID: "bar-id", bodyCount: &bodyCount}, + {ammo: 11, targetID: "qux-id", bodyCount: &bodyCount}, + } + + for i := range bazookas { + _ = pool.Submit(context.TODO(), &bazookas[i]) + } + + if err := pool.GracefulShutdown(); err != nil { + fmt.Fprintf(os.Stderr, "shutdown error: %v\n", err) + } + + fmt.Printf("Body count: %d\n", bodyCount.Load()) + // Output: + // Body count: 3 +} diff --git a/unboundedpool_test.go b/unboundedpool_test.go new file mode 100644 index 0000000..ca0ecd2 --- /dev/null +++ b/unboundedpool_test.go @@ -0,0 +1,136 @@ +package workerpool + +import ( + "context" + "errors" + "sync" + "sync/atomic" + "testing" + "testing/synctest" +) + +func TestPool_Unbounded(t *testing.T) { + t.Parallel() + + t.Run("queues and executes tasks", func(t *testing.T) { + t.Parallel() + + pool := New[testTask](context.TODO(), 3, WithUnboundedQueue[testTask]()) + + var counter atomic.Int32 + for i := 1; i <= 10; i++ { + task := testTask{ + value: i, + counter: &counter, + } + if err := pool.Submit(context.TODO(), task); err != nil { + t.Error("failed to submit task") + } + } + + if err := pool.GracefulShutdown(); err != nil { + t.Errorf("shutdown error: %v", err) + } + + if got := counter.Load(); got != 55 { + t.Errorf("expected counter = 55, got %d", got) + } + }) + + t.Run("handles concurrent submissions", func(t *testing.T) { + t.Parallel() + + const workers, tasks = 5, 100 + + pool := New[testTask](context.TODO(), workers, WithUnboundedQueue[testTask]()) + + var ( + counter atomic.Int32 + wg sync.WaitGroup + ) + + wg.Add(tasks) + + for i := range tasks { + go func(val int) { + defer wg.Done() + task := testTask{ + value: 1, + counter: &counter, + } + pool.Submit(context.TODO(), task) + }(i) + } + + wg.Wait() + + if err := pool.GracefulShutdown(); err != nil { + t.Errorf("unexpected shutdown error: %v", err) + } + + if got := counter.Load(); got != tasks { + t.Errorf("expected counter = %d, got %d", tasks, got) + } + }) + + t.Run("Submit after shutdown returns ErrPoolClosed", func(t *testing.T) { + t.Parallel() + + synctest.Test(t, func(t *testing.T) { + pool := New[testTask](t.Context(), 2, WithUnboundedQueue[testTask]()) + + if err := pool.GracefulShutdown(); err != nil { + t.Errorf("shutdown error: %v", err) + } + + err := pool.Submit(context.TODO(), testTask{value: 1}) + if err == nil { + t.Error("expected error after shutdown") + } + if !errors.Is(err, ErrPoolClosed) { + t.Errorf("expected ErrPoolClosed, got %v", err) + } + }) + }) + + t.Run("graceful shutdown drains all queued tasks", func(t *testing.T) { + t.Parallel() + + synctest.Test(t, func(t *testing.T) { + pool := New[testTask](t.Context(), 1, WithUnboundedQueue[testTask]()) + + var counter atomic.Int32 + blocker := make(chan struct{}) + + pool.Submit(context.TODO(), testTask{ + fn: func(_ context.Context) { + <-blocker + }, + value: 1, counter: &counter, + }) + + synctest.Wait() + + for range 5 { + pool.Submit(context.TODO(), testTask{ + value: 1, counter: &counter, + }) + } + + shutdownDone := make(chan error, 1) + go func() { + shutdownDone <- pool.GracefulShutdown() + }() + + close(blocker) + + if err := <-shutdownDone; err != nil { + t.Errorf("unexpected shutdown error: %v", err) + } + + if got := counter.Load(); got != 6 { + t.Errorf("expected counter = 6, got %d", got) + } + }) + }) +} diff --git a/workerpool_test.go b/workerpool_test.go index 78fc437..2bfae64 100644 --- a/workerpool_test.go +++ b/workerpool_test.go @@ -3,7 +3,6 @@ package workerpool import ( "context" "errors" - "sync" "sync/atomic" "testing" "testing/synctest" @@ -51,266 +50,6 @@ func TestNew(t *testing.T) { }) } -func TestPool_WithBuffer(t *testing.T) { - t.Parallel() - - pool := New(context.TODO(), 3, WithBuffer[testTask](10)) - - var counter atomic.Int32 - - // submit 20 tasks - for i := 1; i <= 20; i++ { - task := testTask{ - value: i, - counter: &counter, - } - - if err := pool.Submit(context.TODO(), task); err != nil { - t.Error("failed to submit task") - } - } - - if err := pool.GracefulShutdown(); err != nil { - t.Errorf("unexpected shutdown error: %v", err) - } - - // sum of 1..20 = 210 - if got := counter.Load(); got != 210 { - t.Errorf("expected counter = 210, got %d", got) - } -} - -func TestPool_Submit(t *testing.T) { - t.Parallel() - - t.Run("queues and executes tasks", func(t *testing.T) { - t.Parallel() - - pool := New[testTask](context.TODO(), 3) - - var counter atomic.Int32 - for i := 1; i <= 10; i++ { - task := testTask{ - value: i, - counter: &counter, - } - - if err := pool.Submit(context.TODO(), task); err != nil { - t.Error("failed to submit task") - } - } - - if err := pool.GracefulShutdown(); err != nil { - t.Errorf("shutdown error: %v", err) - } - - // sum of 1..10 = 55 - if got := counter.Load(); got != 55 { - t.Errorf("expected counter = 55, got %d", got) - } - }) - - t.Run("second submit blocks when the worker is busy”", func(t *testing.T) { - t.Parallel() - - synctest.Test(t, func(t *testing.T) { - pool := New[testTask](t.Context(), 1) - - counter := &atomic.Int32{} - blocker := make(chan struct{}) - - go func() { - pool.Submit(context.TODO(), testTask{ - fn: func(_ context.Context) { - <-blocker - }, - value: 1, - counter: counter, - }) - }() - - synctest.Wait() - - // second Submit must block since channel is unbuffered and worker is busy - task2Submitted := make(chan struct{}) - go func() { - pool.Submit(context.TODO(), testTask{ - value: 2, - counter: counter, - }) - close(task2Submitted) - }() - - // task2 goroutine is durably blocked on the channel send - synctest.Wait() - - select { - case <-task2Submitted: - t.Error("submit should be blocked on unbuffered channel") - default: - // task2Submitted is not yet closed. Submit is still blocked - } - - close(blocker) // release the worker - - synctest.Wait() // worker processes both tasks and task2 goroutine exits - - select { - case <-task2Submitted: - // task2Submitted is closed and Submit completed after worker was unblocked, as expected - default: - t.Error("Submit should have completed after worker was unblocked") - } - - if err := pool.GracefulShutdown(); err != nil { - t.Errorf("unexpected shutdown error: %v", err) - } - - if got := counter.Load(); got != 3 { - t.Errorf("expected counter = 3, got %d", got) - } - }) - }) - - t.Run("task observes its own context cancellation via Do", func(t *testing.T) { - t.Parallel() - - synctest.Test(t, func(t *testing.T) { - taskCtx, cancelTask := context.WithCancel(t.Context()) - - pool := New[testTask](t.Context(), 1) - - var counter atomic.Int32 - taskDone := make(chan struct{}) - - pool.Submit(taskCtx, testTask{ - fn: func(ctx context.Context) { - <-ctx.Done() // block - close(taskDone) - }, - value: 1, - counter: &counter, - }) - - synctest.Wait() // worker is now blocked - cancelTask() - - // wait until the <-ctx.Done() is complete (so it can close taskDone) - synctest.Wait() - - select { - case <-taskDone: - // task observed its ctx cancellation - default: - t.Error("task should have observed context cancellation") - } - - if err := pool.GracefulShutdown(); err != nil { - t.Errorf("unexpected shutdown error: %v", err) - } - }) - }) - - t.Run("handles concurrent submissions", func(t *testing.T) { - t.Parallel() - - const workers, tasks = 5, 100 - - pool := New[testTask](context.TODO(), workers) - - var ( - counter atomic.Int32 - wg sync.WaitGroup - ) - - wg.Add(tasks) - - for i := range tasks { - go func(val int) { - defer wg.Done() - - task := testTask{ - value: 1, - counter: &counter, - } - pool.Submit(context.TODO(), task) - }(i) - } - - wg.Wait() - - if err := pool.GracefulShutdown(); err != nil { - t.Errorf("unexpected shutdown error: %v", err) - } - - if got := counter.Load(); got != tasks { - t.Errorf("expected counter = %d, got %d", tasks, got) - } - }) - - t.Run("blocks caller until capacity is available", func(t *testing.T) { - t.Parallel() - - synctest.Test(t, func(t *testing.T) { - pool := New[testTask](context.TODO(), 1) - - var counter atomic.Int32 - blocker := make(chan struct{}) - - // submit blocking task - task1 := testTask{ - fn: func(_ context.Context) { - <-blocker // block here - }, - value: 1, - counter: &counter, - } - - go pool.Submit(context.TODO(), task1) - - synctest.Wait() // let worker pick up task1 - - // try to submit second task should block since channel is unbuffered - task2Submitted := make(chan struct{}) - - go func() { - pool.Submit(context.TODO(), testTask{ - value: 2, - counter: &counter, - }) - close(task2Submitted) - }() - - synctest.Wait() // worker still blocked - - select { - case <-task2Submitted: - t.Error("Submit should be blocked while worker is busy") - default: - // expected - } - - close(blocker) // release the worker - synctest.Wait() // worker processes both tasks; task2 goroutine exits - - select { - case <-task2Submitted: - // expected: Submit completed - default: - t.Error("Submit should have completed after worker was unblocked") - } - - if err := pool.GracefulShutdown(); err != nil { - t.Errorf("unexpected shutdown error: %v", err) - } - - if got := counter.Load(); got != 3 { - t.Errorf("expected counter = 3, got %d", got) - } - }) - }) -} - func TestPool_GracefulShutdown(t *testing.T) { t.Parallel() @@ -465,3 +204,67 @@ func TestPool_GracefulShutdown(t *testing.T) { }) }) } + +func TestPool_ImmediateShutdown(t *testing.T) { + t.Parallel() + + t.Run("pool stops when context is cancelled", func(t *testing.T) { + t.Parallel() + + synctest.Test(t, func(t *testing.T) { + ctx, cancel := context.WithCancel(t.Context()) + pool := New[testTask](ctx, 2) + + var counter atomic.Int32 + blocker := make(chan struct{}) + + pool.Submit(context.TODO(), testTask{ + fn: func(_ context.Context) { + <-blocker + }, + value: 1, counter: &counter, + }) + + synctest.Wait() + + cancel() // immediate shutdown + synctest.Wait() + + close(blocker) // release the task — but pool is already stopped + }) + }) + + t.Run("GracefulShutdown returns error after immediate stop", func(t *testing.T) { + t.Parallel() + + synctest.Test(t, func(t *testing.T) { + ctx, cancel := context.WithCancel(t.Context()) + pool := New[testTask](ctx, 2) + + cancel() + synctest.Wait() + + err := pool.GracefulShutdown() + if err == nil { + t.Error("expected error when calling GracefulShutdown after immediate stop") + } + }) + }) + + t.Run("Submit returns ErrPoolClosed after immediate stop", func(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + pool := New[testTask](ctx, 2) + + cancel() + + err := pool.Submit(context.TODO(), testTask{value: 1}) + if err == nil { + t.Error("expected error after immediate stop") + } + if !errors.Is(err, ErrPoolClosed) { + t.Errorf("expected ErrPoolClosed, got %v", err) + } + }) +} From 44c3897bc886502f79e683abedf7b6d8724bc8ec Mon Sep 17 00:00:00 2001 From: Alessandro Resta Date: Thu, 11 Jun 2026 14:09:56 +0300 Subject: [PATCH 3/3] Update readme --- README.md | 36 +++++++++++++++++++++++++++--------- 1 file changed, 27 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 8084bd4..9098ed2 100644 --- a/README.md +++ b/README.md @@ -6,10 +6,12 @@ Generic, type-safe handy worker pool in Go. - It executes tasks using a fixed number of worker goroutines passed to the pool constructor. -Task submission is coordinated through a buffered channel. The buffer size is configurable and defines the maximum number of tasks that can be queued before backpressure is applied. When the buffer is full, `Submit` blocks until capacity becomes available. +Two implementations are available: + +- **Bounded** (default): tasks flow through a buffered channel. The buffer size is configurable via `WithBuffer`. When the buffer is full, `Submit` blocks until capacity becomes available. Use this when you want backpressure and bounded memory usage. +- **Unbounded** (opt-in via `WithUnboundedQueue`): tasks are buffered in a dynamic slice. `Submit` never blocks on capacity. Use this when you must never block the caller, at the cost of unbounded memory growth during backpressure. The pool supports both immediate cancellation via context and graceful shutdown. @@ -21,7 +23,9 @@ Immediate cancellation stops workers and drops queued tasks that have not yet st go get github.com/alesr/workerpool ``` -How: +## Usage + +### Bounded pool (default) ```go type bazooka struct { @@ -30,7 +34,6 @@ type bazooka struct { bodyCount *atomic.Int32 } -// Do simulate some bazooking func (b *bazooka) Do(ctx context.Context) { b.ammo-- fmt.Fprintln(os.Stderr, "bazooking: "+b.targetID) @@ -41,19 +44,34 @@ func main() { pool := workerpool.New[*bazooka](context.TODO(), 3) var bodyCount atomic.Int32 - + bazookas := []bazooka{ {ammo: 69, targetID: "foo-id", bodyCount: &bodyCount}, {ammo: 42, targetID: "bar-id", bodyCount: &bodyCount}, {ammo: 11, targetID: "qux-id", bodyCount: &bodyCount}, } - + for i := range bazookas { _ = pool.Submit(context.TODO(), &bazookas[i]) } - + _ = pool.GracefulShutdown() - - fmt.Printf("Body count: %d\n", bodyCount.Load()) + + fmt.Printf("Body count: %d\n", bodyCount.Load()) +} +``` + +### Unbounded pool (opt-in) + +Use `WithUnboundedQueue` to create a pool where `Submit` never blocks: + +```go +func main() { + pool := workerpool.New(context.TODO(), 3, workerpool.WithUnboundedQueue[*bazooka]()) + + // ... same usage as the bounded pool ... } ``` + +When the backpressure is undesired, the unbounded variant +gives callers a guarantee that `Submit` will always return immediately.