From 376a933c0de4dd269cfad03a46e1b55e5880fcee Mon Sep 17 00:00:00 2001 From: Fran Grau Date: Sun, 4 Aug 2024 13:03:44 +0200 Subject: [PATCH 1/2] Added support for jobType filtering on workers and pool workers --- client.go | 29 ++++++++--------------------- ctx.go | 4 +--- job_query.go | 41 +++++++++++++++++++++++++++++++++++++++++ worker.go | 8 +++++--- worker_option.go | 14 ++++++++++++++ 5 files changed, 69 insertions(+), 27 deletions(-) create mode 100644 job_query.go diff --git a/client.go b/client.go index 2f697b3..5bc894f 100644 --- a/client.go +++ b/client.go @@ -186,14 +186,9 @@ func (c *Client) execEnqueue(ctx context.Context, jobs []*Job, q adapter.Queryab // // After the Job has been worked, you must call either Job.Done() or Job.Error() on it // in order to commit transaction to persist Job changes (remove or update it). -func (c *Client) LockJob(ctx context.Context, queue string) (*Job, error) { - sql := `SELECT job_id, queue, priority, run_at, job_type, args, error_count, last_error, created_at -FROM gue_jobs -WHERE queue = $1 AND run_at <= $2 -ORDER BY priority ASC -LIMIT 1 FOR UPDATE SKIP LOCKED` - - return c.execLockJob(ctx, true, sql, queue, time.Now().UTC()) +func (c *Client) LockJob(ctx context.Context, queue string, jobTypes ...string) (*Job, error) { + sql, args := newLockJobQuery(queue, jobTypes) + return c.execLockJob(ctx, true, sql, args...) } // LockJobByID attempts to retrieve a specific Job from the database. @@ -207,11 +202,8 @@ LIMIT 1 FOR UPDATE SKIP LOCKED` // After the Job has been worked, you must call either Job.Done() or Job.Error() on it // in order to commit transaction to persist Job changes (remove or update it). func (c *Client) LockJobByID(ctx context.Context, id ulid.ULID) (*Job, error) { - sql := `SELECT job_id, queue, priority, run_at, job_type, args, error_count, last_error, created_at -FROM gue_jobs -WHERE job_id = $1 FOR UPDATE SKIP LOCKED` - - return c.execLockJob(ctx, false, sql, id.String()) + sql, args := newLockByIDQuery(id) + return c.execLockJob(ctx, false, sql, args...) } // LockNextScheduledJob attempts to retrieve the earliest scheduled Job from the database in the specified queue. @@ -227,14 +219,9 @@ WHERE job_id = $1 FOR UPDATE SKIP LOCKED` // // After the Job has been worked, you must call either Job.Done() or Job.Error() on it // in order to commit transaction to persist Job changes (remove or update it). -func (c *Client) LockNextScheduledJob(ctx context.Context, queue string) (*Job, error) { - sql := `SELECT job_id, queue, priority, run_at, job_type, args, error_count, last_error, created_at -FROM gue_jobs -WHERE queue = $1 AND run_at <= $2 -ORDER BY run_at, priority ASC -LIMIT 1 FOR UPDATE SKIP LOCKED` - - return c.execLockJob(ctx, true, sql, queue, time.Now().UTC()) +func (c *Client) LockNextScheduledJob(ctx context.Context, queue string, queueTypes ...string) (*Job, error) { + sql, args := newLockNextScheduledJobQuery(queue, queueTypes) + return c.execLockJob(ctx, true, sql, args...) } func (c *Client) execLockJob(ctx context.Context, handleErrNoRows bool, sql string, args ...any) (*Job, error) { diff --git a/ctx.go b/ctx.go index 76434ac..4b6b33a 100644 --- a/ctx.go +++ b/ctx.go @@ -4,9 +4,7 @@ import "context" type ctxKey struct{} -var ( - workerIdxKey = ctxKey{} -) +var workerIdxKey = ctxKey{} const ( // WorkerIdxUnknown is returned when worker index in the pool is not set for some reason. diff --git a/job_query.go b/job_query.go new file mode 100644 index 0000000..2d82e2c --- /dev/null +++ b/job_query.go @@ -0,0 +1,41 @@ +package gue + +import ( + "fmt" + "time" + + "github.com/oklog/ulid/v2" +) + +const ( + sqlSelectFrom = "SELECT job_id, queue, priority, run_at, job_type, args, error_count, last_error, created_at FROM gue_jobs" + sqlLimitLock = "LIMIT 1 FOR UPDATE SKIP LOCKED" + sqlOrderByPriority = "ORDER BY priority ASC" + sqlOrderByRunAtPriority = "ORDER BY run_at, priority ASC" +) + +func newLockJobQuery(queue string, jobTypes []string) (string, []any) { + whereClause, args := buildWhereClause(queue, jobTypes) + sql := fmt.Sprintf(`%s %s %s %s`, sqlSelectFrom, whereClause, sqlOrderByPriority, sqlLimitLock) + return sql, args +} + +func newLockByIDQuery(id ulid.ULID) (string, []any) { + return fmt.Sprintf(`%s WHERE job_id = $1 %s`, sqlSelectFrom, sqlLimitLock), []any{id} +} + +func newLockNextScheduledJobQuery(queue string, jobTypes []string) (string, []any) { + whereClause, args := buildWhereClause(queue, jobTypes) + sql := fmt.Sprintf(`%s %s %s %s`, sqlSelectFrom, whereClause, sqlOrderByRunAtPriority, sqlLimitLock) + return sql, args +} + +func buildWhereClause(queue string, jobTypes []string) (string, []any) { + whereClause := `WHERE queue = $1 AND run_at <= $2` + args := []interface{}{queue, time.Now().UTC()} + if len(jobTypes) > 0 { + whereClause += " AND job_type = ANY ($3::TEXT[])" + args = append(args, jobTypes) + } + return whereClause, args +} diff --git a/worker.go b/worker.go index cdee178..868e51f 100644 --- a/worker.go +++ b/worker.go @@ -58,7 +58,7 @@ type HookFunc func(ctx context.Context, j *Job, err error) type WorkMap map[string]WorkFunc // pollFunc is a function that queries the DB for the next job to work on -type pollFunc func(context.Context, string) (*Job, error) +type pollFunc func(context.Context, string, ...string) (*Job, error) // Worker is a single worker that pulls jobs off the specified queue. If no Job // is found, the Worker will sleep for interval seconds. @@ -66,6 +66,7 @@ type Worker struct { wm WorkMap interval time.Duration queue string + jobTypes []string c *Client id string logger adapter.Logger @@ -191,7 +192,7 @@ func (w *Worker) WorkOne(ctx context.Context) (didWork bool) { defer span.End() } - j, err := w.pollFunc(ctx, w.queue) + j, err := w.pollFunc(ctx, w.queue, w.jobTypes...) if err != nil { span.RecordError(fmt.Errorf("worker failed to lock a job: %w", err)) w.mWorked.Add(ctx, 1, metric.WithAttributes(attrJobType.String(""), attrSuccess.Bool(false))) @@ -408,6 +409,7 @@ type WorkerPool struct { wm WorkMap interval time.Duration queue string + jobTypes []string c *Client workers []*Worker id string @@ -468,6 +470,7 @@ func NewWorkerPool(c *Client, wm WorkMap, poolSize int, options ...WorkerPoolOpt w.wm, WithWorkerPollInterval(w.interval), WithWorkerQueue(w.queue), + WithWorkerJobTypes(w.jobTypes), WithWorkerID(fmt.Sprintf("%s/worker-%d", w.id, i)), WithWorkerLogger(w.logger), WithWorkerPollStrategy(w.pollStrategy), @@ -482,7 +485,6 @@ func NewWorkerPool(c *Client, wm WorkMap, poolSize int, options ...WorkerPoolOpt WithWorkerJobTTL(w.jobTTL), WithWorkerUnknownJobWorkFunc(w.unknownJobTypeWF), ) - if err != nil { return nil, fmt.Errorf("could not init worker instance: %w", err) } diff --git a/worker_option.go b/worker_option.go index 3f31a4b..b0399e6 100644 --- a/worker_option.go +++ b/worker_option.go @@ -31,6 +31,13 @@ func WithWorkerQueue(queue string) WorkerOption { } } +// WithWorkerJobTypes limits/filters the job types this worker will fetch from the DB. +func WithWorkerJobTypes(jobTypes []string) WorkerOption { + return func(w *Worker) { + w.jobTypes = jobTypes + } +} + // WithWorkerID sets worker ID for easier identification in logs func WithWorkerID(id string) WorkerOption { return func(w *Worker) { @@ -171,6 +178,13 @@ func WithPoolQueue(queue string) WorkerPoolOption { } } +// WithPoolJobTypes limits/filters the job types this worker will fetch from the DB. +func WithPoolJobTypes(jobTypes ...string) WorkerPoolOption { + return func(w *WorkerPool) { + w.jobTypes = jobTypes + } +} + // WithPoolID sets worker pool ID for easier identification in logs func WithPoolID(id string) WorkerPoolOption { return func(w *WorkerPool) { From 7320687e486759d9919c8390262405d71d171e43 Mon Sep 17 00:00:00 2001 From: Fran Grau Date: Sun, 4 Aug 2024 13:30:58 +0200 Subject: [PATCH 2/2] Updated idx_gue_jobs_selector index to include job_type --- migrations/schema.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/migrations/schema.sql b/migrations/schema.sql index 04a2b6b..f9a2e33 100644 --- a/migrations/schema.sql +++ b/migrations/schema.sql @@ -12,4 +12,4 @@ CREATE TABLE IF NOT EXISTS gue_jobs updated_at TIMESTAMPTZ NOT NULL ); -CREATE INDEX IF NOT EXISTS idx_gue_jobs_selector ON gue_jobs (queue, run_at, priority); +CREATE INDEX IF NOT EXISTS idx_gue_jobs_selector ON gue_jobs (queue, job_type, run_at, priority);