Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 8 additions & 21 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,14 +186,9 @@ func (c *Client) execEnqueue(ctx context.Context, jobs []*Job, q adapter.Queryab
//
// After the Job has been worked, you must call either Job.Done() or Job.Error() on it
// in order to commit transaction to persist Job changes (remove or update it).
func (c *Client) LockJob(ctx context.Context, queue string) (*Job, error) {
sql := `SELECT job_id, queue, priority, run_at, job_type, args, error_count, last_error, created_at
FROM gue_jobs
WHERE queue = $1 AND run_at <= $2
ORDER BY priority ASC
LIMIT 1 FOR UPDATE SKIP LOCKED`

return c.execLockJob(ctx, true, sql, queue, time.Now().UTC())
func (c *Client) LockJob(ctx context.Context, queue string, jobTypes ...string) (*Job, error) {
sql, args := newLockJobQuery(queue, jobTypes)
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is nice from the DRY point of view, but makes code readability way worse

I prefer having full queries here

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is that the job_type condition is now not fixed, as the previous conditions were.
So to keep it there we need:

  • Define the first part of the SQL
  • Define the base Where clause
  • Define the base args
  • If there are job types, append to the where clause the condition and append to args the array
  • Add the end of the query
  • Call the exec
    As you can see, it gets messy, with a ton of duplicated lines across functions.

Could you suggest another approach that would not make those functions messy?

PS: I've always tried to separate logic from SQL as much as possible, just as you separate HTML/CSS from logic.

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you suggest another approach that would not make those functions messy?

sql := `SELECT ... `
if len(jobTypes) > 0 {
  sql = `SELECT ...`
}

and tests that will cover branching

return c.execLockJob(ctx, true, sql, args...)
}

// LockJobByID attempts to retrieve a specific Job from the database.
Expand All @@ -207,11 +202,8 @@ LIMIT 1 FOR UPDATE SKIP LOCKED`
// After the Job has been worked, you must call either Job.Done() or Job.Error() on it
// in order to commit transaction to persist Job changes (remove or update it).
func (c *Client) LockJobByID(ctx context.Context, id ulid.ULID) (*Job, error) {
sql := `SELECT job_id, queue, priority, run_at, job_type, args, error_count, last_error, created_at
FROM gue_jobs
WHERE job_id = $1 FOR UPDATE SKIP LOCKED`

return c.execLockJob(ctx, false, sql, id.String())
sql, args := newLockByIDQuery(id)
return c.execLockJob(ctx, false, sql, args...)
}

// LockNextScheduledJob attempts to retrieve the earliest scheduled Job from the database in the specified queue.
Expand All @@ -227,14 +219,9 @@ WHERE job_id = $1 FOR UPDATE SKIP LOCKED`
//
// After the Job has been worked, you must call either Job.Done() or Job.Error() on it
// in order to commit transaction to persist Job changes (remove or update it).
func (c *Client) LockNextScheduledJob(ctx context.Context, queue string) (*Job, error) {
sql := `SELECT job_id, queue, priority, run_at, job_type, args, error_count, last_error, created_at
FROM gue_jobs
WHERE queue = $1 AND run_at <= $2
ORDER BY run_at, priority ASC
LIMIT 1 FOR UPDATE SKIP LOCKED`

return c.execLockJob(ctx, true, sql, queue, time.Now().UTC())
func (c *Client) LockNextScheduledJob(ctx context.Context, queue string, queueTypes ...string) (*Job, error) {
sql, args := newLockNextScheduledJobQuery(queue, queueTypes)
return c.execLockJob(ctx, true, sql, args...)
}

func (c *Client) execLockJob(ctx context.Context, handleErrNoRows bool, sql string, args ...any) (*Job, error) {
Expand Down
4 changes: 1 addition & 3 deletions ctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ import "context"

type ctxKey struct{}

var (
workerIdxKey = ctxKey{}
)
var workerIdxKey = ctxKey{}

const (
// WorkerIdxUnknown is returned when worker index in the pool is not set for some reason.
Expand Down
41 changes: 41 additions & 0 deletions job_query.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package gue

import (
"fmt"
"time"

"github.com/oklog/ulid/v2"
)

// SQL fragments that the lock-query builders in this file combine into full
// statements.
const (
// sqlSelectFrom selects the job columns from the gue_jobs table.
sqlSelectFrom = "SELECT job_id, queue, priority, run_at, job_type, args, error_count, last_error, created_at FROM gue_jobs"
// sqlLimitLock limits the result to one row and locks it for update,
// skipping rows that are already locked.
sqlLimitLock = "LIMIT 1 FOR UPDATE SKIP LOCKED"
// sqlOrderByPriority orders candidate rows by ascending priority.
sqlOrderByPriority = "ORDER BY priority ASC"
// sqlOrderByRunAtPriority orders candidate rows by run_at first, then by
// ascending priority.
sqlOrderByRunAtPriority = "ORDER BY run_at, priority ASC"
)

// newLockJobQuery assembles the statement used to lock the next job of the
// given queue, ordered by priority, together with its positional arguments.
// The WHERE clause and the arguments come from buildWhereClause, so jobTypes
// is applied as a filter only when it is non-empty.
func newLockJobQuery(queue string, jobTypes []string) (string, []any) {
	filter, params := buildWhereClause(queue, jobTypes)
	// Fragments are separated by exactly one space: SELECT, WHERE, ORDER BY, LIMIT/lock.
	query := sqlSelectFrom + " " + filter + " " + sqlOrderByPriority + " " + sqlLimitLock
	return query, params
}

// newLockByIDQuery assembles the statement used to lock the job with the given
// ID, together with its single positional argument ($1).
//
// The ID is bound in its textual form (id.String()), which is what LockJobByID
// passed before the query builders were extracted. Binding the ulid.ULID value
// itself would hand it to the driver through its driver.Valuer implementation,
// which produces the binary encoding instead of the text form.
func newLockByIDQuery(id ulid.ULID) (string, []any) {
	query := fmt.Sprintf(`%s WHERE job_id = $1 %s`, sqlSelectFrom, sqlLimitLock)
	return query, []any{id.String()}
}

// newLockNextScheduledJobQuery assembles the statement used to lock the next
// job of the given queue, ordered by run_at and then priority, together with
// its positional arguments. The WHERE clause and the arguments come from
// buildWhereClause, so jobTypes is applied as a filter only when it is
// non-empty.
func newLockNextScheduledJobQuery(queue string, jobTypes []string) (string, []any) {
	filter, params := buildWhereClause(queue, jobTypes)
	// Fragments are separated by exactly one space: SELECT, WHERE, ORDER BY, LIMIT/lock.
	query := sqlSelectFrom + " " + filter + " " + sqlOrderByRunAtPriority + " " + sqlLimitLock
	return query, params
}

// buildWhereClause returns the WHERE clause shared by the queue-polling lock
// queries and the positional arguments that go with it.
//
// $1 is always bound to queue and $2 to the current UTC time, so only rows
// whose run_at is not later than now match. When jobTypes is non-empty, an
// extra condition restricts job_type to those values and jobTypes is bound as
// $3; otherwise no job-type condition is added.
func buildWhereClause(queue string, jobTypes []string) (string, []any) {
	const (
		baseCond    = `WHERE queue = $1 AND run_at <= $2`
		jobTypeCond = " AND job_type = ANY ($3::TEXT[])"
	)

	now := time.Now().UTC()
	if len(jobTypes) == 0 {
		return baseCond, []any{queue, now}
	}
	return baseCond + jobTypeCond, []any{queue, now, jobTypes}
}
2 changes: 1 addition & 1 deletion migrations/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ CREATE TABLE IF NOT EXISTS gue_jobs
updated_at TIMESTAMPTZ NOT NULL
);

CREATE INDEX IF NOT EXISTS idx_gue_jobs_selector ON gue_jobs (queue, run_at, priority);
CREATE INDEX IF NOT EXISTS idx_gue_jobs_selector ON gue_jobs (queue, job_type, run_at, priority);
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is actually a backward-compatibility-breaking change

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it a breaking change? If the index already exists, the line will be ignored.
Do you want me to add an additional index for this?

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because without this index change the new functionality may cause performance issues, and given that gue is just a library, the app using it may start experiencing serious issues, e.g. running out of available DB connections because gue worker queries are taking too long

previously all DB changes were part of the major version update

I'm fine to let this index change in the minor version update, but then you need to prepare a migration that can be mentioned in the change notes

8 changes: 5 additions & 3 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,15 @@ type HookFunc func(ctx context.Context, j *Job, err error)
type WorkMap map[string]WorkFunc

// pollFunc is a function that queries the DB for the next job to work on
type pollFunc func(context.Context, string) (*Job, error)
type pollFunc func(context.Context, string, ...string) (*Job, error)

// Worker is a single worker that pulls jobs off the specified queue. If no Job
// is found, the Worker will sleep for interval seconds.
type Worker struct {
wm WorkMap
interval time.Duration
queue string
jobTypes []string
c *Client
id string
logger adapter.Logger
Expand Down Expand Up @@ -191,7 +192,7 @@ func (w *Worker) WorkOne(ctx context.Context) (didWork bool) {
defer span.End()
}

j, err := w.pollFunc(ctx, w.queue)
j, err := w.pollFunc(ctx, w.queue, w.jobTypes...)
if err != nil {
span.RecordError(fmt.Errorf("worker failed to lock a job: %w", err))
w.mWorked.Add(ctx, 1, metric.WithAttributes(attrJobType.String(""), attrSuccess.Bool(false)))
Expand Down Expand Up @@ -408,6 +409,7 @@ type WorkerPool struct {
wm WorkMap
interval time.Duration
queue string
jobTypes []string
c *Client
workers []*Worker
id string
Expand Down Expand Up @@ -468,6 +470,7 @@ func NewWorkerPool(c *Client, wm WorkMap, poolSize int, options ...WorkerPoolOpt
w.wm,
WithWorkerPollInterval(w.interval),
WithWorkerQueue(w.queue),
WithWorkerJobTypes(w.jobTypes),
WithWorkerID(fmt.Sprintf("%s/worker-%d", w.id, i)),
WithWorkerLogger(w.logger),
WithWorkerPollStrategy(w.pollStrategy),
Expand All @@ -482,7 +485,6 @@ func NewWorkerPool(c *Client, wm WorkMap, poolSize int, options ...WorkerPoolOpt
WithWorkerJobTTL(w.jobTTL),
WithWorkerUnknownJobWorkFunc(w.unknownJobTypeWF),
)

if err != nil {
return nil, fmt.Errorf("could not init worker instance: %w", err)
}
Expand Down
14 changes: 14 additions & 0 deletions worker_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ func WithWorkerQueue(queue string) WorkerOption {
}
}

// WithWorkerJobTypes limits/filters the job types this worker will fetch from the DB.
func WithWorkerJobTypes(jobTypes []string) WorkerOption {
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be variadic to keep the options consistent

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will change! 👍

return func(w *Worker) {
w.jobTypes = jobTypes
}
}

// WithWorkerID sets worker ID for easier identification in logs
func WithWorkerID(id string) WorkerOption {
return func(w *Worker) {
Expand Down Expand Up @@ -171,6 +178,13 @@ func WithPoolQueue(queue string) WorkerPoolOption {
}
}

// WithPoolJobTypes sets the job types stored on the worker pool, limiting the
// job types that will be fetched from the DB.
func WithPoolJobTypes(jobTypes ...string) WorkerPoolOption {
	return func(pool *WorkerPool) {
		pool.jobTypes = jobTypes
	}
}

// WithPoolID sets worker pool ID for easier identification in logs
func WithPoolID(id string) WorkerPoolOption {
return func(w *WorkerPool) {
Expand Down