Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,24 @@ jobs:
- name: Run go vet
run: go vet ./...

lint:
name: Lint
needs: deps
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4

- uses: actions/setup-go@v5
with:
go-version-file: go.mod
cache: true

- name: Install staticcheck
run: go install honnef.co/go/tools/cmd/staticcheck@latest

- name: Run staticcheck
run: staticcheck ./...

test:
name: Test
needs: deps
Expand Down
12 changes: 1 addition & 11 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,14 @@ func Example() {

var bodyCount atomic.Int32

// list of tasks to perform
bazookas := []bazooka{
{ammo: 69, targetID: "foo-id", bodyCount: &bodyCount},
{ammo: 42, targetID: "bar-id", bodyCount: &bodyCount},
{ammo: 11, targetID: "qux-id", bodyCount: &bodyCount},
}

// loop over the tasks initializing and
// submitting the tasks to the pool

for _, bazz := range bazookas {
task := workerpool.Task[*bazooka]{
Fn: func(input *bazooka) {
input.Do(context.TODO())
},
Input: &bazz,
}
pool.Submit(task)
pool.Submit(&bazz)
}

if err := pool.GracefulShutdown(); err != nil {
Expand Down
28 changes: 11 additions & 17 deletions workerpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,24 @@ import (
"sync/atomic"
)

// Input wraps a task's execution.
type Input interface {
// Task defines the interface for a task to be executed by a worker pool.
type Task interface {
Do(context.Context)
}

// Task bundles a function with its input.
type Task[T Input] struct {
Fn func(T)
Input T
}

// Option configures a Pool.
type Option[T Input] func(*Pool[T])
type Option[T Task] func(*Pool[T])

// WithBuffer sets the task channel buffer size.
func WithBuffer[T Input](size int) Option[T] {
func WithBuffer[T Task](size int) Option[T] {
return func(p *Pool[T]) {
p.buffer = size
}
}

// Pool maintains fixed worker goroutines processing tasks from a channel.
type Pool[T Input] struct {
tasks chan Task[T] // channel for tasks waiting to be processed
type Pool[T Task] struct {
tasks chan T // channel for tasks waiting to be processed
buffer int // size of the task channel
wg sync.WaitGroup // wait group for worker goroutines

Expand All @@ -47,7 +41,7 @@ type Pool[T Input] struct {
// New creates a pool with numOfWorkers workers.
// The context can be used to stop the pool immediately, skipping any buffered
// tasks. In-flight tasks will still run to completion.
func New[T Input](ctx context.Context, numOfWorkers int, opts ...Option[T]) *Pool[T] {
func New[T Task](ctx context.Context, numOfWorkers int, opts ...Option[T]) *Pool[T] {
if numOfWorkers <= 0 {
numOfWorkers = 1
}
Expand All @@ -64,7 +58,7 @@ func New[T Input](ctx context.Context, numOfWorkers int, opts ...Option[T]) *Poo
opt(p)
}

p.tasks = make(chan Task[T], p.buffer)
p.tasks = make(chan T, p.buffer)

p.wg.Add(numOfWorkers)
for range numOfWorkers {
Expand All @@ -86,20 +80,20 @@ func (p *Pool[T]) worker() {
for {
select {
case task := <-p.tasks:
task.Fn(task.Input)
task.Do(p.ctx)
default:
return
}
}
case task := <-p.tasks:
task.Fn(task.Input)
task.Do(p.ctx)
}
}
}

// Submit sends a task to the pool. Blocks if the task channel is full.
// Returns false if the pool is shutting down or the context was cancelled.
func (p *Pool[T]) Submit(task Task[T]) bool {
func (p *Pool[T]) Submit(task T) bool {
select {
case <-p.ctx.Done(): // forcefully terminate via ctx
return false
Expand Down
Loading
Loading