diff --git a/README.md b/README.md index 374dfd7..8084bd4 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ [![codecov](https://codecov.io/gh/alesr/workerpool/graph/badge.svg?token=4dxDuntYgf)](https://codecov.io/gh/alesr/workerpool) [![Go Report Card](https://goreportcard.com/badge/github.com/alesr/workerpool)](https://goreportcard.com/report/github.com/alesr/workerpool) -[![Go Reference](https://pkg.go.dev/badge/github.com/alesr/workerpool.git.svg)](https://pkg.go.dev/github.com/alesr/workerpool.git) +[![Go Reference](https://pkg.go.dev/badge/github.com/alesr/workerpool.git.svg)](https://pkg.go.dev/github.com/alesr/workerpool) Generic, type-safe handy worker pool in Go. diff --git a/adapters/redisstream/go.mod b/adapters/redisstream/go.mod new file mode 100644 index 0000000..dab9cf3 --- /dev/null +++ b/adapters/redisstream/go.mod @@ -0,0 +1,10 @@ +module github.com/alesr/workerpool/adapters/redisstream + +go 1.26 + +require github.com/redis/go-redis/v9 v9.19.0 + +require ( + github.com/cespare/xxhash/v2 v2.3.0 // indirect + go.uber.org/atomic v1.11.0 // indirect +) diff --git a/adapters/redisstream/go.sum b/adapters/redisstream/go.sum new file mode 100644 index 0000000..41952ed --- /dev/null +++ b/adapters/redisstream/go.sum @@ -0,0 +1,22 @@ +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/klauspost/cpuid/v2 v2.2.10 h1:tBs3QSyvjDyFTq3uoc/9xFpCuOsJQFNPiAhYdw2skhE= +github.com/klauspost/cpuid/v2 v2.2.10/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/redis/go-redis/v9 v9.19.0 h1:XPVaaPSnG6RhYf7p+rmSa9zZfeVAnWsH5h3lxthOm/k= +github.com/redis/go-redis/v9 v9.19.0/go.mod h1:v/M13XI1PVCDcm01VtPFOADfZtHf8YW3baQf57KlIkA= +github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/zeebo/xxh3 v1.1.0 h1:s7DLGDK45Dyfg7++yxI0khrfwq9661w9EN78eP/UZVs= +github.com/zeebo/xxh3 v1.1.0/go.mod h1:IisAie1LELR4xhVinxWS5+zf1lA4p0MW4T+w+W07F5s= +go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= +go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= +golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= +golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= diff --git a/adapters/redisstream/redisstream.go b/adapters/redisstream/redisstream.go new file mode 100644 index 0000000..8c43b78 --- /dev/null +++ b/adapters/redisstream/redisstream.go @@ -0,0 +1,131 @@ +package redisstream + +import ( + "context" + "errors" + "fmt" + "log" + "strings" + "time" + + "github.com/redis/go-redis/v9" +) + +type redisClient interface { + XGroupCreateMkStream(ctx context.Context, stream string, group string, start string) *redis.StatusCmd + XReadGroup(ctx context.Context, a *redis.XReadGroupArgs) *redis.XStreamSliceCmd + XAck(ctx context.Context, stream string, group string, ids ...string) *redis.IntCmd +} + +type TaskHandler interface { + Submit(ctx context.Context, id string, payload map[string]string, ack func() error) error +} + +type StreamOptions struct { + BatchSize int64 + BlockTimeout time.Duration + ErrHandler func(err error) +} + +func DefaultStreamOptions() StreamOptions { + return StreamOptions{ + BatchSize: 10, + BlockTimeout: 2 * time.Second, + ErrHandler: func(err error) { + log.Printf("[redispool] stream error: %v", err) + }, + } +} + +type StreamAdapter struct { + rdb redisClient + streamName string + groupName string + consumerID string + opts StreamOptions +} + +func NewStreamAdapter(rdb redisClient, stream, group, consumer string, opts StreamOptions) *StreamAdapter { + if opts.BatchSize <= 0 { + opts.BatchSize = DefaultStreamOptions().BatchSize + } + if opts.BlockTimeout <= 0 { + opts.BlockTimeout = DefaultStreamOptions().BlockTimeout + } + if opts.ErrHandler == nil { + opts.ErrHandler = DefaultStreamOptions().ErrHandler + } + + return &StreamAdapter{ + rdb: rdb, + streamName: stream, + groupName: group, + consumerID: consumer, + opts: opts, + } +} + +func (a *StreamAdapter) Initialize(ctx context.Context, offset string) error { + if err := a.rdb.XGroupCreateMkStream(ctx, a.streamName, a.groupName, offset).Err(); err != nil { + if strings.HasPrefix(err.Error(), "BUSYGROUP") { + return nil + } + return fmt.Errorf("could not create group stream: %w", err) + } + return nil +} + +// Consume replaces FetchTasks injecting tasks directly into the handler. +func (a *StreamAdapter) Consume(ctx context.Context, handler TaskHandler) { + for { + select { + case <-ctx.Done(): + return + default: + streams, err := a.rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: a.groupName, + Consumer: a.consumerID, + Streams: []string{a.streamName, ">"}, + Count: a.opts.BatchSize, + Block: a.opts.BlockTimeout, + }).Result() + if err != nil { + if errors.Is(err, redis.Nil) { + continue + } + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return + } + a.opts.ErrHandler(err) + + select { + case <-ctx.Done(): + return + case <-time.After(time.Second): + } + continue + } + + for _, stream := range streams { + for _, msg := range stream.Messages { + msgID := msg.ID + + payload := make(map[string]string) + for k, v := range msg.Values { + if str, ok := v.(string); ok { + payload[k] = str + } + } + + if err := handler.Submit(ctx, msg.ID, payload, func() error { + ackCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + return a.rdb.XAck(ackCtx, a.streamName, a.groupName, msgID).Err() + }); err != nil { + a.opts.ErrHandler(fmt.Errorf("could not submit task to handler: %w", err)) + } + } + } + } + } +}