Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 27 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@

Generic, type-safe handy worker pool in Go.


It executes tasks using a fixed number of worker goroutines passed to the pool constructor.

Task submission is coordinated through a buffered channel. The buffer size is configurable and defines the maximum number of tasks that can be queued before backpressure is applied. When the buffer is full, `Submit` blocks until capacity becomes available.
Two implementations are available:

- **Bounded** (default): tasks flow through a buffered channel. The buffer size is configurable via `WithBuffer`. When the buffer is full, `Submit` blocks until capacity becomes available. Use this when you want backpressure and bounded memory usage.
- **Unbounded** (opt-in via `WithUnboundedQueue`): tasks are buffered in a dynamic slice. `Submit` never blocks on capacity. Use this when you must never block the caller, at the cost of unbounded memory growth during backpressure.

The pool supports both immediate cancellation via context and graceful shutdown.

Expand All @@ -21,7 +23,9 @@ Immediate cancellation stops workers and drops queued tasks that have not yet st
go get github.com/alesr/workerpool
```

How:
## Usage

### Bounded pool (default)

```go
type bazooka struct {
Expand All @@ -30,7 +34,6 @@ type bazooka struct {
bodyCount *atomic.Int32
}

// Do simulate some bazooking
func (b *bazooka) Do(ctx context.Context) {
b.ammo--
fmt.Fprintln(os.Stderr, "bazooking: "+b.targetID)
Expand All @@ -41,19 +44,34 @@ func main() {
pool := workerpool.New[*bazooka](context.TODO(), 3)

var bodyCount atomic.Int32

bazookas := []bazooka{
{ammo: 69, targetID: "foo-id", bodyCount: &bodyCount},
{ammo: 42, targetID: "bar-id", bodyCount: &bodyCount},
{ammo: 11, targetID: "qux-id", bodyCount: &bodyCount},
}

for i := range bazookas {
_ = pool.Submit(context.TODO(), &bazookas[i])
}

_ = pool.GracefulShutdown()

fmt.Printf("Body count: %d\n", bodyCount.Load())

fmt.Printf("Body count: %d\n", bodyCount.Load())
}
```

### Unbounded pool (opt-in)

Use `WithUnboundedQueue` to create a pool where `Submit` never blocks:

```go
func main() {
pool := workerpool.New(context.TODO(), 3, workerpool.WithUnboundedQueue[*bazooka]())

// ... same usage as the bounded pool ...
}
```

When the backpressure is undesired, the unbounded variant
gives callers a guarantee that `Submit` will always return immediately.
59 changes: 59 additions & 0 deletions boundedpool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package workerpool

import "context"

// boundedPool implements a fixed-size channel-backed pool
type boundedPool[T Task] struct {
poolCore[T]
}

func newBoundedPool[T Task](ctx context.Context, workers int, buffer int) *boundedPool[T] {
poolCtx, cancel := context.WithCancel(ctx)

p := &boundedPool[T]{}
p.ctx = poolCtx
p.cancel = cancel
p.stop = make(chan struct{})
p.dispatch = make(chan entry[T], buffer)

p.wg.Add(workers)
for range workers {
go p.worker()
}
return p
}

func (p *boundedPool[T]) worker() {
defer p.wg.Done()
for {
select {
case <-p.ctx.Done():
p.ungracefulStop.Store(true)
return
case <-p.stop:
for {
select {
case entry := <-p.dispatch:
entry.job.Do(entry.ctx)
default:
return
}
}
case entry := <-p.dispatch:
entry.job.Do(entry.ctx)
}
}
}

func (p *boundedPool[T]) Submit(ctx context.Context, task T) error {
select {
case <-ctx.Done():
return ErrTaskCancelled
case <-p.ctx.Done():
return ErrPoolClosed
case <-p.stop:
return ErrPoolClosed
case p.dispatch <- entry[T]{ctx: ctx, job: task}:
return nil
}
}
Loading
Loading