queue is a queue and workflow library with pluggable backends and runtime extensions.
go get github.com/goforj/queue

import (
"context"
"fmt"
"github.com/goforj/queue"
)
func main() {
q, _ := queue.NewWorkerpool(
queue.WithWorkers(2), // optional; default: runtime.NumCPU() (min 1)
)
type EmailPayload struct {
To string `json:"to"`
}
q.Register("emails:send", func(ctx context.Context, m queue.Message) error {
var payload EmailPayload
_ = m.Bind(&payload)
fmt.Println("send to", payload.To)
return nil
})
_ = q.StartWorkers(context.Background())
defer q.Shutdown(context.Background())
_, _ = q.Dispatch(
queue.NewJob("emails:send").
Payload(EmailPayload{To: "user@example.com"}),
)
}

SQL-backed queues (sqlite, mysql, postgres) are durable and convenient, but they trade throughput for operational simplicity. They default to 1 worker, and increasing concurrency may require DB tuning (indexes, connection pool, lock contention). Prefer broker-backed drivers for higher-throughput workloads.

Queue Admin status: the cross-driver admin contract is defined in core (ListJobs, RetryJob, CancelJob, DeleteJob, ClearQueue, QueueHistory), but full queue admin operations are currently implemented only for Redis. Other drivers return ErrQueueAdminUnsupported for unsupported admin actions.
Use root constructors for in-process backends, and driver-module constructors for external backends. See the Driver Constructors API section below for full constructor shapes (New(...) and NewWithConfig(...)).
Driver backends live in separate packages so applications only import/link the optional backend dependencies they actually use (smaller builds, less dependency overhead, cleaner deploys).
package main
import (
"github.com/goforj/queue"
"github.com/goforj/queue/driver/mysqlqueue"
"github.com/goforj/queue/driver/natsqueue"
"github.com/goforj/queue/driver/postgresqueue"
"github.com/goforj/queue/driver/rabbitmqqueue"
"github.com/goforj/queue/driver/redisqueue"
"github.com/goforj/queue/driver/sqlitequeue"
"github.com/goforj/queue/driver/sqsqueue"
)
func main() {
queue.NewSync() // in-process sync
queue.NewWorkerpool() // in-process worker pool
queue.NewNull() // drop-only / disabled mode
sqlitequeue.New("file:queue.db?_busy_timeout=5000") // SQL durable queue (SQLite)
mysqlqueue.New("user:pass@tcp(127.0.0.1:3306)/app") // SQL durable queue (MySQL)
postgresqueue.New("postgres://user:pass@127.0.0.1:5432/app?sslmode=disable") // SQL durable queue (Postgres)
redisqueue.New("127.0.0.1:6379") // Redis/Asynq
natsqueue.New("nats://127.0.0.1:4222") // NATS
sqsqueue.New("us-east-1") // SQS
rabbitmqqueue.New("amqp://guest:guest@127.0.0.1:5672/") // RabbitMQ
}

import (
"context"
"github.com/goforj/queue"
)
type EmailPayload struct {
ID int `json:"id"`
}
func main() {
q, _ := queue.NewWorkerpool()
q.Register("reports:generate", func(ctx context.Context, m queue.Message) error {
return nil
})
q.Register("reports:upload", func(ctx context.Context, m queue.Message) error {
var payload EmailPayload
if err := m.Bind(&payload); err != nil {
return err
}
return nil
})
q.Register("users:notify_report_ready", func(ctx context.Context, m queue.Message) error {
return nil
})
_ = q.StartWorkers(context.Background())
defer q.Shutdown(context.Background())
chainID, _ := q.Chain(
// 1) generate report data
queue.NewJob("reports:generate").Payload(map[string]any{"report_id": "rpt_123"}),
// 2) upload report artifact after generate succeeds
queue.NewJob("reports:upload").Payload(EmailPayload{ID: 123}),
// 3) notify user only after upload succeeds
queue.NewJob("users:notify_report_ready").Payload(map[string]any{"user_id": 123}),
).OnQueue("critical").Dispatch(context.Background())
_ = chainID
}

Use Run(ctx) for long-lived workers: it starts processing, waits for shutdown signals, and performs graceful termination.
import (
"context"
"log"
"os/signal"
"syscall"
"github.com/goforj/queue"
)
func main() {
q, _ := queue.NewWorkerpool()
// Register handlers before starting workers.
q.Register("emails:send", func(ctx context.Context, m queue.Message) error {
return nil
})
// Create a context that is canceled on SIGINT/SIGTERM (Ctrl+C, container stop).
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()
// Run starts workers, blocks until ctx is canceled, then gracefully shuts down.
if err := q.Run(ctx); err != nil {
log.Fatal(err)
}
}

Job: Typed work unit for app handlers.
_, _ = q.Dispatch(
queue.NewJob("emails:send").Payload(EmailPayload{To: "user@example.com"}),
)

Chain: Ordered workflow (A then B then C).
_, _ = q.Chain(
queue.NewJob("reports:generate"),
queue.NewJob("reports:upload"),
queue.NewJob("users:notify_report_ready"),
).Dispatch(context.Background())

Batch: Parallel workflow with callbacks.
_, _ = q.Batch(
queue.NewJob("emails:send"),
queue.NewJob("sms:send"),
).Then(queue.NewJob("notifications:done")).Dispatch(context.Background())

Middleware: Cross-cutting execution policy.
q, _ := queue.New(
queue.Config{Driver: queue.DriverWorkerpool},
queue.WithMiddleware(audit, skipMaintenance, fatalValidation),
)

Events: Lifecycle hooks and observability.
q, _ := queue.New(
queue.Config{Driver: queue.DriverWorkerpool, Observer: queue.NewStatsCollector()},
)

Backends: Driver/runtime transport selection.
q, _ := queue.NewWorkerpool()
rq, _ := redisqueue.New("127.0.0.1:6379")
_, _ = q, rq

// Define a struct for your job payload.
type EmailPayload struct {
ID int `json:"id"`
To string `json:"to"`
}
// Fluent builder pattern for job options.
job := queue.NewJob("emails:send").
// Payload can be bytes, structs, maps, or JSON-marshalable values.
// Default payload is empty.
Payload(EmailPayload{ID: 123, To: "user@example.com"}).
// OnQueue sets the queue name.
// Default is empty; broker-style drivers expect an explicit queue.
OnQueue("default").
// Timeout sets per-job execution timeout.
// Default is unset; some drivers may apply driver/runtime defaults.
Timeout(20 * time.Second).
// Retry sets max retries.
// Default is 0, which means one total attempt.
Retry(3).
// Backoff sets retry delay.
// Default is unset; Redis dispatch returns ErrBackoffUnsupported.
Backoff(500 * time.Millisecond).
// Delay schedules first execution in the future.
// Default is 0 (run immediately).
Delay(2 * time.Second).
// UniqueFor deduplicates Type+Payload for a TTL window.
// Default is 0 (no dedupe).
UniqueFor(45 * time.Second)
// Dispatch the job to the queue.
_, _ = q.Dispatch(job)
// In handlers, use Bind to decode payload into a struct.
q.Register("emails:send", func(ctx context.Context, m queue.Message) error {
var payload EmailPayload
if err := m.Bind(&payload); err != nil {
return err
}
return nil
})

Run local + integration-backed benchmarks (requires Docker/testcontainers):
cd docs && GOWORK=off INTEGRATION_BACKEND=all GOCACHE=/tmp/queue-gocache go test -tags=benchrender ./bench -run '^TestRenderBenchmarks$'

| Class | Driver | ns/op | ops/s | B/op | allocs/op |
|---|---|---|---|---|---|
| External | nats | 774 | 1291823 | 1258 | 13 |
| External | redis | 95295 | 10494 | 2113 | 33 |
| External | rabbitmq | 165780 | 6032 | 1882 | 57 |
| External | sqlite | 202380 | 4941 | 1931 | 47 |
| External | postgres | 1056731 | 946 | 3809 | 78 |
| External | sqs | 1873911 | 534 | 94784 | 1082 |
| External | mysql | 2286406 | 437 | 3303 | 62 |
| Local | null | 37 | 26673780 | 128 | 1 |
| Local | sync | 282 | 3539823 | 408 | 6 |
| Local | workerpool | 650 | 1538462 | 456 | 7 |
Use queue.WithMiddleware(...) to apply cross-cutting workflow behavior to workflow job execution (logging, filtering, and error policy).
Common patterns:
- wrap handler execution (before/after logging, timing, tracing)
- skip jobs conditionally (maintenance mode, feature flags)
- convert matched errors into terminal failures (no retry)
var errValidation = errors.New("validation failed")
maintenanceMode := false
audit := queue.MiddlewareFunc(func(ctx context.Context, m queue.Message, next queue.Next) error {
log.Printf("start job=%s", m.JobType)
err := next(ctx, m)
log.Printf("done job=%s err=%v", m.JobType, err)
return err
})
skipMaintenance := queue.SkipWhen{
Predicate: func(context.Context, queue.Message) bool {
return maintenanceMode
},
}
fatalValidation := queue.FailOnError{
When: func(err error) bool {
return errors.Is(err, errValidation)
},
}
q, _ := queue.New(
queue.Config{Driver: queue.DriverWorkerpool},
queue.WithMiddleware(audit, skipMaintenance, fatalValidation),
)
_ = q

Use queue.Observer implementations to capture normalized runtime events across drivers.
collector := queue.NewStatsCollector()
observer := queue.MultiObserver(
collector,
queue.ObserverFunc(func(event queue.Event) {
_ = event.Kind
}),
)
q, _ := queue.New(queue.Config{
Driver: queue.DriverWorkerpool,
Observer: observer,
})
_ = q

- StatsCollector counters are process-local and event-driven.
- In multi-process deployments, aggregate metrics externally (OTel/Prometheus/etc.).
- Prefer backend-native stats when available.
- queue.SupportsNativeStats(q) indicates native driver snapshot support.
- queue.Snapshot(ctx, q, collector) merges native + collector where possible.
events := make(chan queue.Event, 100)
collector := queue.NewStatsCollector()
observer := queue.MultiObserver(
collector,
queue.ChannelObserver{
Events: events,
DropIfFull: true,
},
queue.ObserverFunc(func(e queue.Event) {
_ = e
}),
)
q, _ := queue.New(queue.Config{
Driver: queue.DriverWorkerpool,
Observer: observer,
})
_ = q

Runnable example: examples/observeall/main.go
logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
runtimeObserver := queue.ObserverFunc(func(event queue.Event) {
attemptInfo := fmt.Sprintf("attempt=%d/%d", event.Attempt, event.MaxRetry+1)
jobInfo := fmt.Sprintf("job=%s key=%s queue=%s driver=%s", event.JobType, event.JobKey, event.Queue, event.Driver)
switch event.Kind {
case queue.EventEnqueueAccepted:
logger.Info("Accepted dispatch", "msg", fmt.Sprintf("Accepted %s", jobInfo), "scheduled", event.Scheduled, "at", event.Time.Format(time.RFC3339Nano))
case queue.EventEnqueueRejected:
logger.Error("Dispatch failed", "msg", fmt.Sprintf("Rejected %s", jobInfo), "error", event.Err)
case queue.EventEnqueueDuplicate:
logger.Warn("Skipped duplicate job", "msg", fmt.Sprintf("Duplicate %s", jobInfo))
case queue.EventEnqueueCanceled:
logger.Warn("Canceled dispatch", "msg", fmt.Sprintf("Canceled %s", jobInfo), "error", event.Err)
case queue.EventProcessStarted:
logger.Info("Started processing job", "msg", fmt.Sprintf("Started %s (%s)", jobInfo, attemptInfo), "at", event.Time.Format(time.RFC3339Nano))
case queue.EventProcessSucceeded:
logger.Info("Processed job", "msg", fmt.Sprintf("Processed %s in %s (%s)", jobInfo, event.Duration, attemptInfo))
case queue.EventProcessFailed:
logger.Error("Processing failed", "msg", fmt.Sprintf("Failed %s after %s (%s)", jobInfo, event.Duration, attemptInfo), "error", event.Err)
case queue.EventProcessRetried:
logger.Warn("Retrying job", "msg", fmt.Sprintf("Retry scheduled for %s (%s)", jobInfo, attemptInfo), "error", event.Err)
case queue.EventProcessArchived:
logger.Error("Archived failed job", "msg", fmt.Sprintf("Archived %s after final failure (%s)", jobInfo, attemptInfo), "error", event.Err)
case queue.EventQueuePaused:
logger.Info("Paused queue", "msg", fmt.Sprintf("Paused queue=%s driver=%s", event.Queue, event.Driver))
case queue.EventQueueResumed:
logger.Info("Resumed queue", "msg", fmt.Sprintf("Resumed queue=%s driver=%s", event.Queue, event.Driver))
default:
logger.Info("Queue event", "msg", fmt.Sprintf("kind=%s %s", event.Kind, jobInfo))
}
})
workflowObserver := queue.WorkflowObserverFunc(func(event queue.WorkflowEvent) {
logger.Info("workflow event",
"kind", event.Kind,
"dispatch_id", event.DispatchID,
"job_id", event.JobID,
"chain_id", event.ChainID,
"batch_id", event.BatchID,
"job_type", event.JobType,
"queue", event.Queue,
"attempt", event.Attempt,
"duration", event.Duration,
"err", event.Err,
)
})
q, _ := queue.New(
queue.Config{
Driver: queue.DriverSync,
Observer: runtimeObserver,
},
queue.WithObserver(workflowObserver),
)
_ = q

| Type | EventKind | Meaning |
|---|---|---|
| queue | enqueue_accepted | Job accepted by driver for enqueue. |
| queue | enqueue_rejected | Job enqueue failed. |
| queue | enqueue_duplicate | Duplicate job rejected due to uniqueness key. |
| queue | enqueue_canceled | Context cancellation prevented enqueue. |
| queue | process_started | Worker began processing job. |
| queue | process_succeeded | Handler returned success. |
| queue | process_failed | Handler returned error. |
| queue | process_retried | Driver scheduled retry attempt. |
| queue | process_archived | Job moved to terminal failure state. |
| queue | queue_paused | Queue was paused (driver supports pause). |
| queue | queue_resumed | Queue was resumed. |
| workflow | dispatch_started | Workflow runtime accepted a dispatch request and created a dispatch record. |
| workflow | dispatch_succeeded | Dispatch was successfully enqueued to the underlying queue runtime. |
| workflow | dispatch_failed | Dispatch failed before job execution could start. |
| workflow | job_started | A workflow job handler started execution. |
| workflow | job_succeeded | A workflow job handler completed successfully. |
| workflow | job_failed | A workflow job handler returned an error. |
| workflow | chain_started | A chain workflow was created and started. |
| workflow | chain_advanced | Chain progressed from one node to the next node. |
| workflow | chain_completed | Chain reached terminal success. |
| workflow | chain_failed | Chain reached terminal failure. |
| workflow | batch_started | A batch workflow was created and started. |
| workflow | batch_progressed | Batch state changed as jobs completed/failed. |
| workflow | batch_completed | Batch reached terminal success (or allowed-failure completion). |
| workflow | batch_failed | Batch reached terminal failure. |
| workflow | batch_cancelled | Batch was cancelled before normal completion. |
| workflow | callback_started | Chain/batch callback execution started. |
| workflow | callback_succeeded | Chain/batch callback completed successfully. |
| workflow | callback_failed | Chain/batch callback returned an error. |
Runnable examples live in the separate examples module (./examples).
They are not included when applications import github.com/goforj/queue, which keeps dependency graphs and build/link overhead smaller.
Queue admin APIs are part of the core contract so additional drivers can implement them over time.
At this time, full admin operations (ListJobs, RetryJob, CancelJob, DeleteJob, ClearQueue) are Redis-only.
Use queue.SupportsQueueAdmin(q) (or handle queue.ErrQueueAdminUnsupported) to gate admin workflows per runtime.
The API section below is autogenerated; do not edit between the markers.
CancelJob cancels a job when supported.
q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
return
}
err = queue.CancelJob(context.Background(), q, "job-id")
_ = err

CancelJob cancels a job via queue admin capability when supported.
q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
return
}
if !queue.SupportsQueueAdmin(q) {
return
}
err = q.CancelJob(context.Background(), "job-id")
_ = err

ClearQueue clears queue jobs when supported.
q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
return
}
err = queue.ClearQueue(context.Background(), q, "default")
_ = err

ClearQueue clears queue jobs via queue admin capability when supported.
q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
return
}
if !queue.SupportsQueueAdmin(q) {
return
}
err = q.ClearQueue(context.Background(), "default")
_ = err

DeleteJob deletes a job when supported.
q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
return
}
err = queue.DeleteJob(context.Background(), q, "default", "job-id")
_ = err

DeleteJob deletes a job via queue admin capability when supported.
q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
return
}
if !queue.SupportsQueueAdmin(q) {
return
}
err = q.DeleteJob(context.Background(), "default", "job-id")
_ = err

History returns queue history points via queue admin capability when supported.
q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
return
}
points, err := q.History(context.Background(), "default", queue.QueueHistoryHour)
_ = err

ListJobs lists jobs for a queue and state when supported.
q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
return
}
_, err = queue.ListJobs(context.Background(), q, queue.ListJobsOptions{
Queue: "default",
State: queue.JobStatePending,
})
_ = err

ListJobs lists jobs via queue admin capability when supported.
q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
return
}
_, err = q.ListJobs(context.Background(), queue.ListJobsOptions{
Queue: "default",
State: queue.JobStatePending,
})
_ = err

Normalize returns a safe options payload with defaults applied.
opts := queue.ListJobsOptions{Queue: "", State: "", Page: 0, PageSize: 1000}
normalized := opts.Normalize()
fmt.Println(normalized.Queue, normalized.State, normalized.Page, normalized.PageSize)
// Output: default pending 1 500

QueueHistory returns queue history points when supported.
q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
return
}
_, err = queue.QueueHistory(context.Background(), q, "default", queue.QueueHistoryHour)
_ = err

RetryJob retries (runs now) a job when supported.
q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
return
}
err = queue.RetryJob(context.Background(), q, "default", "job-id")
_ = err

RetryJob retries (runs now) a job via queue admin capability when supported.
q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
return
}
if !queue.SupportsQueueAdmin(q) {
return
}
err = q.RetryJob(context.Background(), "default", "job-id")
_ = err

SinglePointHistory converts a snapshot into a single current-history point. This helper is intended for driver modules that do not expose historical buckets.
snapshot := queue.StatsSnapshot{
ByQueue: map[string]queue.QueueCounters{
"default": {Processed: 12, Failed: 1},
},
}
points := queue.SinglePointHistory(snapshot, "default")
fmt.Println(len(points), points[0].Processed, points[0].Failed)
// Output: 1 12 1

SupportsQueueAdmin reports whether queue admin operations are available.
q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
return
}
fmt.Println(queue.SupportsQueueAdmin(q))
// Output: true

TimelineHistoryFromSnapshot records queue counters and returns windowed points. This is intended for drivers that don't expose native multi-point history.
snapshot := queue.StatsSnapshot{
ByQueue: map[string]queue.QueueCounters{
"default": {Processed: 5, Failed: 1},
},
}
points := queue.TimelineHistoryFromSnapshot(snapshot, "default", queue.QueueHistoryHour)
fmt.Println(len(points) >= 1)
// Output: true

New creates the high-level Queue API based on Config.Driver.
q, err := queue.New(queue.Config{Driver: queue.DriverWorkerpool})
if err != nil {
return
}
type EmailPayload struct {
ID int `json:"id"`
}
q.Register("emails:send", func(ctx context.Context, m queue.Message) error {
var payload EmailPayload
if err := m.Bind(&payload); err != nil {
return err
}
_ = payload
return nil
})
_ = q.WithWorkers(1).StartWorkers(context.Background()) // optional; default: runtime.NumCPU() (min 1)
defer q.Shutdown(context.Background())
_, _ = q.Dispatch(
queue.NewJob("emails:send").
Payload(EmailPayload{ID: 1}).
OnQueue("default"),
)

NewNull creates a Queue on the null backend.
q, err := queue.NewNull()
if err != nil {
return
}

NewStatsCollector creates an event collector for queue counters.
collector := queue.NewStatsCollector()

NewSync creates a Queue on the synchronous in-process backend.
q, err := queue.NewSync()
if err != nil {
return
}

NewWorkerpool creates a Queue on the in-process workerpool backend.
q, err := queue.NewWorkerpool()
if err != nil {
return
}

Backoff sets delay between retries.
job := queue.NewJob("emails:send").Backoff(500 * time.Millisecond)

Bind unmarshals job payload JSON into dst.
type EmailPayload struct {
ID int `json:"id"`
To string `json:"to"`
}
job := queue.NewJob("emails:send").Payload(EmailPayload{
ID: 1,
To: "user@example.com",
})
var payload EmailPayload
if err := job.Bind(&payload); err != nil {
return
}
_ = payload.To

Delay defers execution by duration.
job := queue.NewJob("emails:send").Delay(300 * time.Millisecond)

NewJob creates a job value with a required job type.
job := queue.NewJob("emails:send")

OnQueue sets the target queue name.
job := queue.NewJob("emails:send").OnQueue("critical")

Payload sets job payload from common value types.
Example: payload bytes
jobBytes := queue.NewJob("emails:send").Payload([]byte(`{"id":1}`))

Example: payload struct
type Meta struct {
Nested bool `json:"nested"`
}
type EmailPayload struct {
ID int `json:"id"`
To string `json:"to"`
Meta Meta `json:"meta"`
}
jobStruct := queue.NewJob("emails:send").Payload(EmailPayload{
ID: 1,
To: "user@example.com",
Meta: Meta{Nested: true},
})

Example: payload map
jobMap := queue.NewJob("emails:send").Payload(map[string]any{
"id": 1,
"to": "user@example.com",
"meta": map[string]any{"nested": true},
})

PayloadBytes returns a copy of job payload bytes.
job := queue.NewJob("emails:send").Payload([]byte(`{"id":1}`))
payload := job.PayloadBytes()

PayloadJSON marshals payload as JSON.
job := queue.NewJob("emails:send").PayloadJSON(map[string]int{"id": 1})

Retry sets max retry attempts.
job := queue.NewJob("emails:send").Retry(4)

Timeout sets per-job execution timeout.
job := queue.NewJob("emails:send").Timeout(10 * time.Second)

UniqueFor enables uniqueness dedupe within the given TTL.
job := queue.NewJob("emails:send").UniqueFor(45 * time.Second)

Active returns active count for a queue.
snapshot := queue.StatsSnapshot{
ByQueue: map[string]queue.QueueCounters{
"default": {Active: 2},
},
}
fmt.Println(snapshot.Active("default"))
// Output: 2

Archived returns archived count for a queue.
snapshot := queue.StatsSnapshot{
ByQueue: map[string]queue.QueueCounters{
"default": {Archived: 7},
},
}
fmt.Println(snapshot.Archived("default"))
// Output: 7

Failed returns failed count for a queue.
snapshot := queue.StatsSnapshot{
ByQueue: map[string]queue.QueueCounters{
"default": {Failed: 2},
},
}
fmt.Println(snapshot.Failed("default"))
// Output: 2

MultiObserver fans out events to multiple observers.
events := make(chan queue.Event, 2)
observer := queue.MultiObserver(
queue.ChannelObserver{Events: events},
queue.ObserverFunc(func(queue.Event) {}),
)
observer.Observe(queue.Event{Kind: queue.EventEnqueueAccepted})
fmt.Println(len(events))
// Output: 1

Observe forwards an event to the configured channel.
ch := make(chan queue.Event, 1)
observer := queue.ChannelObserver{Events: ch}
observer.Observe(queue.Event{Kind: queue.EventProcessStarted, Queue: "default"})
event := <-ch

Observe handles a queue runtime event.
var observer queue.Observer
observer.Observe(queue.Event{
Kind: queue.EventEnqueueAccepted,
Driver: queue.DriverSync,
Queue: "default",
})

Observe calls the wrapped function.
logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
observer := queue.ObserverFunc(func(event queue.Event) {
logger.Info("queue event",
"kind", event.Kind,
"driver", event.Driver,
"queue", event.Queue,
"job_type", event.JobType,
"attempt", event.Attempt,
"max_retry", event.MaxRetry,
"duration", event.Duration,
"err", event.Err,
)
})
observer.Observe(queue.Event{
Kind: queue.EventProcessSucceeded,
Driver: queue.DriverSync,
Queue: "default",
JobType: "emails:send",
})

Observe records an event and updates normalized counters.
collector := queue.NewStatsCollector()
collector.Observe(queue.Event{
Kind: queue.EventEnqueueAccepted,
Driver: queue.DriverSync,
Queue: "default",
Time: time.Now(),
})

Pause pauses queue consumption for drivers that support it.
q, _ := queue.NewSync()
_ = queue.Pause(context.Background(), q, "default")
snapshot, _ := queue.Snapshot(context.Background(), q, nil)
fmt.Println(snapshot.Paused("default"))
// Output: 1

Paused returns paused count for a queue.
collector := queue.NewStatsCollector()
collector.Observe(queue.Event{
Kind: queue.EventQueuePaused,
Driver: queue.DriverSync,
Queue: "default",
Time: time.Now(),
})
snapshot := collector.Snapshot()
fmt.Println(snapshot.Paused("default"))
// Output: 1

Pending returns pending count for a queue.
snapshot := queue.StatsSnapshot{
ByQueue: map[string]queue.QueueCounters{
"default": {Pending: 3},
},
}
fmt.Println(snapshot.Pending("default"))
// Output: 3

Processed returns processed count for a queue.
snapshot := queue.StatsSnapshot{
ByQueue: map[string]queue.QueueCounters{
"default": {Processed: 11},
},
}
fmt.Println(snapshot.Processed("default"))
// Output: 11

Queue returns queue counters for a queue name.
collector := queue.NewStatsCollector()
collector.Observe(queue.Event{
Kind: queue.EventEnqueueAccepted,
Driver: queue.DriverSync,
Queue: "default",
Time: time.Now(),
})
snapshot := collector.Snapshot()
counters, ok := snapshot.Queue("default")
fmt.Println(ok, counters.Pending)
// Output: true 1

Queues returns sorted queue names present in the snapshot.
collector := queue.NewStatsCollector()
collector.Observe(queue.Event{
Kind: queue.EventEnqueueAccepted,
Driver: queue.DriverSync,
Queue: "critical",
Time: time.Now(),
})
snapshot := collector.Snapshot()
names := snapshot.Queues()
fmt.Println(len(names), names[0])
// Output: 1 critical

Ready validates backend readiness for the provided queue runtime.
q, _ := queue.NewSync()
fmt.Println(queue.Ready(context.Background(), q) == nil)
// true

Resume resumes queue consumption for drivers that support it.
q, _ := queue.NewSync()
_ = queue.Pause(context.Background(), q, "default")
_ = queue.Resume(context.Background(), q, "default")
snapshot, _ := queue.Snapshot(context.Background(), q, nil)
fmt.Println(snapshot.Paused("default"))
// Output: 0

RetryCount returns retry count for a queue.
snapshot := queue.StatsSnapshot{
ByQueue: map[string]queue.QueueCounters{
"default": {Retry: 1},
},
}
fmt.Println(snapshot.RetryCount("default"))
// Output: 1

SafeObserve delivers an event to an observer and recovers observer panics.
This is an advanced helper intended for driver-module implementations.
Scheduled returns scheduled count for a queue.
snapshot := queue.StatsSnapshot{
ByQueue: map[string]queue.QueueCounters{
"default": {Scheduled: 4},
},
}
fmt.Println(snapshot.Scheduled("default"))
// Output: 4

Snapshot returns driver-native stats, falling back to collector data.
q, _ := queue.NewSync()
snapshot, _ := q.Stats(context.Background())
_, ok := snapshot.Queue("default")
fmt.Println(ok)
// Output: true

Snapshot returns a copy of collected counters.
collector := queue.NewStatsCollector()
collector.Observe(queue.Event{
Kind: queue.EventEnqueueAccepted,
Driver: queue.DriverSync,
Queue: "default",
Time: time.Now(),
})
collector.Observe(queue.Event{
Kind: queue.EventProcessStarted,
Driver: queue.DriverSync,
Queue: "default",
JobKey: "job-1",
Time: time.Now(),
})
collector.Observe(queue.Event{
Kind: queue.EventProcessSucceeded,
Driver: queue.DriverSync,
Queue: "default",
JobKey: "job-1",
Duration: 12 * time.Millisecond,
Time: time.Now(),
})
snapshot := collector.Snapshot()
counters, _ := snapshot.Queue("default")
throughput, _ := snapshot.Throughput("default")
fmt.Printf("queues=%v\n", snapshot.Queues())
fmt.Printf("counters=%+v\n", counters)
fmt.Printf("hour=%+v\n", throughput.Hour)
// Output:
// queues=[default]
// counters={Pending:0 Active:0 Scheduled:0 Retry:0 Archived:0 Processed:1 Failed:0 Paused:0 AvgWait:0s AvgRun:12ms}
// hour={Processed:1 Failed:0}

SupportsNativeStats reports whether a queue runtime exposes native stats snapshots.
q, _ := queue.NewSync()
fmt.Println(queue.SupportsNativeStats(q))
// Output: true

SupportsPause reports whether a queue runtime supports Pause/Resume.
q, _ := queue.NewSync()
fmt.Println(queue.SupportsPause(q))
// Output: true

Throughput returns rolling throughput windows for a queue name.
collector := queue.NewStatsCollector()
collector.Observe(queue.Event{
Kind: queue.EventProcessSucceeded,
Driver: queue.DriverSync,
Queue: "default",
Time: time.Now(),
})
snapshot := collector.Snapshot()
throughput, ok := snapshot.Throughput("default")
fmt.Printf("ok=%v hour=%+v day=%+v week=%+v\n", ok, throughput.Hour, throughput.Day, throughput.Week)
// Output: ok=true hour={Processed:1 Failed:0} day={Processed:1 Failed:0} week={Processed:1 Failed:0}

Batch creates a batch builder for fan-out workflow execution.
q, err := queue.NewSync()
if err != nil {
return
}
q.Register("emails:send", func(ctx context.Context, m queue.Message) error { return nil })
_, _ = q.Batch(
queue.NewJob("emails:send").Payload(map[string]any{"id": 1}),
queue.NewJob("emails:send").Payload(map[string]any{"id": 2}),
).Name("send-emails").OnQueue("default").Dispatch(context.Background())

Chain creates a chain builder for sequential workflow execution.
q, err := queue.NewSync()
if err != nil {
return
}
q.Register("first", func(ctx context.Context, m queue.Message) error { return nil })
q.Register("second", func(ctx context.Context, m queue.Message) error { return nil })
_, _ = q.Chain(
queue.NewJob("first"),
queue.NewJob("second"),
).OnQueue("default").Dispatch(context.Background())

Dispatch enqueues a high-level job using context.Background.
q, err := queue.NewSync()
if err != nil {
return
}
q.Register("emails:send", func(ctx context.Context, m queue.Message) error { return nil })
job := queue.NewJob("emails:send").Payload(map[string]any{"id": 1}).OnQueue("default")
_, _ = q.Dispatch(job)

DispatchCtx enqueues a high-level job using the provided context.
Driver reports the configured backend driver for the underlying queue runtime.
q, err := queue.NewSync()
if err != nil {
return
}
fmt.Println(q.Driver())
// Output: sync

FindBatch returns current batch state by ID.
q, err := queue.NewSync()
if err != nil {
return
}
q.Register("emails:send", func(ctx context.Context, m queue.Message) error { return nil })
batchID, err := q.Batch(queue.NewJob("emails:send")).Dispatch(context.Background())
if err != nil {
return
}
_, _ = q.FindBatch(context.Background(), batchID)

FindChain returns current chain state by ID.
q, err := queue.NewSync()
if err != nil {
return
}
q.Register("first", func(ctx context.Context, m queue.Message) error { return nil })
chainID, err := q.Chain(queue.NewJob("first")).Dispatch(context.Background())
if err != nil {
return
}
_, _ = q.FindChain(context.Background(), chainID)

Pause pauses consumption for a queue when supported by the underlying driver. See the README "Queue Backends" table for Pause/Resume support and docs/backend-guarantees.md (Capability Matrix) for broader backend differences.
q, err := queue.NewSync()
if err != nil {
return
}
if queue.SupportsPause(q) {
_ = q.Pause(context.Background(), "default")
}

Prune deletes old workflow state records.
q, err := queue.NewSync()
if err != nil {
return
}
_ = q.Prune(context.Background(), time.Now().Add(-24*time.Hour))

Ready validates queue backend readiness for dispatch/worker operation.
q, err := queue.NewSync()
if err != nil {
return
}
fmt.Println(q.Ready(context.Background()) == nil)
// true

Register binds a handler for a high-level job type.
q, err := queue.NewSync()
if err != nil {
return
}
type EmailPayload struct {
ID int `json:"id"`
}
q.Register("emails:send", func(ctx context.Context, m queue.Message) error {
var payload EmailPayload
if err := m.Bind(&payload); err != nil {
return err
}
_ = payload
return nil
})

Resume resumes consumption for a queue when supported by the underlying driver.
q, err := queue.NewSync()
if err != nil {
return
}
if queue.SupportsPause(q) {
_ = q.Resume(context.Background(), "default")
}

Run starts worker processing, blocks until ctx is canceled, then gracefully shuts down.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
q, err := queue.NewWorkerpool()
if err != nil {
return
}
q.Register("emails:send", func(ctx context.Context, m queue.Message) error { return nil })
go func() {
time.Sleep(100 * time.Millisecond)
cancel()
}()
_ = q.Run(ctx)

Shutdown drains workers and closes underlying resources.
q, err := queue.NewWorkerpool()
if err != nil {
return
}
_ = q.StartWorkers(context.Background())
_ = q.Shutdown(context.Background())

StartWorkers starts worker processing.
q, err := queue.NewWorkerpool()
if err != nil {
return
}
_ = q.StartWorkers(context.Background())

Stats returns a normalized snapshot when supported by the underlying driver.
q, err := queue.NewSync()
if err != nil {
return
}
if queue.SupportsNativeStats(q) {
_, _ = q.Stats(context.Background())
}

WithClock overrides the workflow runtime clock.
q, err := queue.New(
queue.Config{Driver: queue.DriverSync},
queue.WithClock(func() time.Time { return time.Unix(0, 0) }),
)
if err != nil {
return
}

WithMiddleware appends queue workflow middleware.
mw := queue.MiddlewareFunc(func(ctx context.Context, m queue.Message, next queue.Next) error {
return next(ctx, m)
})
q, err := queue.New(queue.Config{Driver: queue.DriverSync}, queue.WithMiddleware(mw))
if err != nil {
return
}

WithObserver installs a workflow lifecycle observer.
observer := queue.WorkflowObserverFunc(func(event queue.WorkflowEvent) {
_ = event.Kind
})
q, err := queue.New(queue.Config{Driver: queue.DriverSync}, queue.WithObserver(observer))
if err != nil {
return
}

WithStore overrides the workflow orchestration store.
var store queue.WorkflowStore
q, err := queue.New(queue.Config{Driver: queue.DriverSync}, queue.WithStore(store))
if err != nil {
return
}

WithWorkers sets desired worker concurrency before StartWorkers. It applies to high-level queue constructors (for example NewWorkerpool/New/NewSync).
q, err := queue.NewWorkerpool(
queue.WithWorkers(4), // optional; default: runtime.NumCPU() (min 1)
)
if err != nil {
return
}

WithWorkers sets desired worker concurrency before StartWorkers.
q, err := queue.NewWorkerpool()
if err != nil {
return
}
q.WithWorkers(4) // optional; default: runtime.NumCPU() (min 1)

AssertCount fails when dispatch count is not expected.
fake := queue.NewFake()
_ = fake.Dispatch(queue.NewJob("emails:send"))
fake.AssertCount(t, 1)

AssertDispatched fails when jobType was not dispatched.
fake := queue.NewFake()
_ = fake.Dispatch(queue.NewJob("emails:send"))
fake.AssertDispatched(t, "emails:send")

AssertDispatchedOn fails when jobType was not dispatched on queueName.
fake := queue.NewFake()
_ = fake.Dispatch(
queue.NewJob("emails:send").
OnQueue("critical"),
)
fake.AssertDispatchedOn(t, "critical", "emails:send")

AssertDispatchedTimes fails when jobType dispatch count does not match expected.
fake := queue.NewFake()
_ = fake.Dispatch(queue.NewJob("emails:send"))
_ = fake.Dispatch(queue.NewJob("emails:send"))
fake.AssertDispatchedTimes(t, "emails:send", 2)

AssertNotDispatched fails when jobType was dispatched.
fake := queue.NewFake()
_ = fake.Dispatch(queue.NewJob("emails:send"))
fake.AssertNotDispatched(t, "emails:cancel")

AssertNothingDispatched fails when any dispatch was recorded.
fake := queue.NewFake()
fake.AssertNothingDispatched(t)

Dispatch records a typed job payload in-memory using the fake default queue.
fake := queue.NewFake()
err := fake.Dispatch(queue.NewJob("emails:send").OnQueue("default"))

DispatchCtx submits a typed job payload using the provided context.
fake := queue.NewFake()
ctx := context.Background()
err := fake.DispatchCtx(ctx, queue.NewJob("emails:send").OnQueue("default"))
fmt.Println(err == nil)
// Output: true

Driver returns the active queue driver.
fake := queue.NewFake()
driver := fake.Driver()

NewFake creates a queue fake that records dispatches and provides assertions.
fake := queue.NewFake()
_ = fake.Dispatch(
queue.NewJob("emails:send").
Payload(map[string]any{"id": 1}).
OnQueue("critical"),
)
records := fake.Records()
fmt.Println(len(records), records[0].Queue, records[0].Job.Type)
// Output: 1 critical emails:send

Ready validates fake queue readiness.
fake := queue.NewFake()
fmt.Println(fake.Ready(context.Background()) == nil)
// true

Records returns a copy of all dispatch records.
fake := queue.NewFake()
_ = fake.Dispatch(queue.NewJob("emails:send").OnQueue("default"))
records := fake.Records()
fmt.Println(len(records), records[0].Job.Type)
// Output: 1 emails:send

Register associates a handler with a job type.
fake := queue.NewFake()
fake.Register("emails:send", func(context.Context, queue.Job) error { return nil })

Reset clears all recorded dispatches.
fake := queue.NewFake()
_ = fake.Dispatch(queue.NewJob("emails:send").OnQueue("default"))
fmt.Println(len(fake.Records()))
fake.Reset()
fmt.Println(len(fake.Records()))
// Output:
// 1
// 0

Shutdown drains running work and releases resources.
fake := queue.NewFake()
err := fake.Shutdown(context.Background())

StartWorkers starts worker execution.
fake := queue.NewFake()
err := fake.StartWorkers(context.Background())

Workers sets desired worker concurrency before StartWorkers.
fake := queue.NewFake()
q := fake.Workers(4)
fmt.Println(q != nil)
// Output: true

New creates a high-level Queue using the MySQL SQL backend.
q, err := mysqlqueue.New(
"user:pass@tcp(127.0.0.1:3306)/queue?parseTime=true",
queue.WithWorkers(4), // optional; default: 1 worker
)
if err != nil {
return
}

NewWithConfig creates a high-level Queue using an explicit MySQL SQL driver config.
q, err := mysqlqueue.NewWithConfig(
mysqlqueue.Config{
DriverBaseConfig: queueconfig.DriverBaseConfig{
DefaultQueue: "critical", // default if empty: "default"
Observer: nil, // default: nil
},
DB: nil, // optional; provide *sql.DB instead of DSN
DSN: "user:pass@tcp(127.0.0.1:3306)/queue?parseTime=true", // optional if DB is set
ProcessingRecoveryGrace: 2 * time.Second, // default if <=0: 2s
ProcessingLeaseNoTimeout: 5 * time.Minute, // default if <=0: 5m
},
queue.WithWorkers(4), // optional; default: 1 worker
)
if err != nil {
return
}

New creates a high-level Queue using the NATS backend.
q, err := natsqueue.New(
"nats://127.0.0.1:4222",
queue.WithWorkers(4), // optional; default: runtime.NumCPU() (min 1)
)
if err != nil {
return
}

NewWithConfig creates a high-level Queue using an explicit NATS driver config.
q, err := natsqueue.NewWithConfig(
natsqueue.Config{
DriverBaseConfig: queueconfig.DriverBaseConfig{
DefaultQueue: "critical", // default if empty: "default"
Observer: nil, // default: nil
},
URL: "nats://127.0.0.1:4222", // required
},
queue.WithWorkers(4), // optional; default: runtime.NumCPU() (min 1)
)
if err != nil {
return
}

New creates a high-level Queue using the Postgres SQL backend.
q, err := postgresqueue.New(
"postgres://user:pass@127.0.0.1:5432/queue?sslmode=disable",
queue.WithWorkers(4), // optional; default: 1 worker
)
if err != nil {
return
}

NewWithConfig creates a high-level Queue using an explicit Postgres SQL driver config.
q, err := postgresqueue.NewWithConfig(
postgresqueue.Config{
DriverBaseConfig: queueconfig.DriverBaseConfig{
DefaultQueue: "critical", // default if empty: "default"
Observer: nil, // default: nil
},
DB: nil, // optional; provide *sql.DB instead of DSN
DSN: "postgres://user:pass@127.0.0.1:5432/queue?sslmode=disable", // optional if DB is set
ProcessingRecoveryGrace: 2 * time.Second, // default if <=0: 2s
ProcessingLeaseNoTimeout: 5 * time.Minute, // default if <=0: 5m
},
queue.WithWorkers(4), // optional; default: 1 worker
)
if err != nil {
return
}

New creates a high-level Queue using the RabbitMQ backend.
q, err := rabbitmqqueue.New(
"amqp://guest:guest@127.0.0.1:5672/",
queue.WithWorkers(4), // optional; default: runtime.NumCPU() (min 1)
)
if err != nil {
return
}

NewWithConfig creates a high-level Queue using an explicit RabbitMQ driver config.
q, err := rabbitmqqueue.NewWithConfig(
rabbitmqqueue.Config{
DriverBaseConfig: queueconfig.DriverBaseConfig{
DefaultQueue: "critical", // default if empty: "default"
Observer: nil, // default: nil
},
URL: "amqp://guest:guest@127.0.0.1:5672/", // required
},
queue.WithWorkers(4), // optional; default: runtime.NumCPU() (min 1)
)
if err != nil {
return
}

New creates a high-level Queue using the Redis backend.
q, err := redisqueue.New(
"127.0.0.1:6379",
queue.WithWorkers(4), // optional; default: runtime.NumCPU() (min 1)
)
if err != nil {
return
}

NewWithConfig creates a high-level Queue using an explicit Redis driver config.
q, err := redisqueue.NewWithConfig(
redisqueue.Config{
DriverBaseConfig: queueconfig.DriverBaseConfig{
DefaultQueue: "critical", // default if empty: "default"
Observer: nil, // default: nil
},
Addr: "127.0.0.1:6379", // required
Password: "", // optional; default empty
DB: 0, // optional; default 0
ServerLogger: nil, // optional; default backend logger
ServerLogLevel: redisqueue.ServerLogLevelDefault, // optional
},
queue.WithWorkers(4), // optional; default: runtime.NumCPU() (min 1)
)
if err != nil {
return
}

New creates a high-level Queue using the SQLite SQL backend.
q, err := sqlitequeue.New(
"file:queue.db?_busy_timeout=5000",
queue.WithWorkers(4), // optional; default: 1 worker
)
if err != nil {
return
}

NewWithConfig creates a high-level Queue using an explicit SQLite SQL driver config.
q, err := sqlitequeue.NewWithConfig(
sqlitequeue.Config{
DriverBaseConfig: queueconfig.DriverBaseConfig{
DefaultQueue: "critical", // default if empty: "default"
Observer: nil, // default: nil
},
DB: nil, // optional; provide *sql.DB instead of DSN
DSN: "file:queue.db?_busy_timeout=5000", // optional if DB is set
ProcessingRecoveryGrace: 2 * time.Second, // default if <=0: 2s
ProcessingLeaseNoTimeout: 5 * time.Minute, // default if <=0: 5m
},
queue.WithWorkers(4), // optional; default: 1 worker
)
if err != nil {
return
}

New creates a high-level Queue using the SQS backend.
q, err := sqsqueue.New(
"us-east-1",
queue.WithWorkers(4), // optional; default: runtime.NumCPU() (min 1)
)
if err != nil {
return
}

NewWithConfig creates a high-level Queue using an explicit SQS driver config.
q, err := sqsqueue.NewWithConfig(
sqsqueue.Config{
DriverBaseConfig: queueconfig.DriverBaseConfig{
DefaultQueue: "critical", // default if empty: "default"
Observer: nil, // default: nil
},
Region: "us-east-1", // default if empty: "us-east-1"
Endpoint: "", // optional; set for LocalStack/custom endpoint
AccessKey: "", // optional; static credentials
SecretKey: "", // optional; static credentials
},
queue.WithWorkers(4), // optional; default: runtime.NumCPU() (min 1)
)
if err != nil {
return
}

Examples in this section assume they are used inside tests and t is a *testing.T (or testing.TB).
AssertBatchCount fails if total recorded workflow batch count does not match n.
f := queuefake.New()
_, _ = f.Workflow().Batch(bus.NewJob("a", nil)).Dispatch(nil)
f.AssertBatchCount(t, 1)

AssertBatched fails unless at least one recorded workflow batch matches predicate.
f := queuefake.New()
_, _ = f.Workflow().Batch(bus.NewJob("a", nil), bus.NewJob("b", nil)).Dispatch(nil)
f.AssertBatched(t, func(spec bus.BatchSpec) bool { return len(spec.JobTypes) == 2 })

AssertChained fails if no recorded workflow chain matches expected job type order.
f := queuefake.New()
_, _ = f.Workflow().Chain(bus.NewJob("a", nil), bus.NewJob("b", nil)).Dispatch(nil)
f.AssertChained(t, []string{"a", "b"})

AssertCount fails when total dispatch count is not expected.
f := queuefake.New()
q := f.Queue()
_ = q.Dispatch(queue.NewJob("a"))
_ = q.Dispatch(queue.NewJob("b"))
f.AssertCount(t, 2)

AssertDispatched fails when jobType was not dispatched.
f := queuefake.New()
_ = f.Queue().Dispatch(queue.NewJob("emails:send"))
f.AssertDispatched(t, "emails:send")

AssertDispatchedOn fails when jobType was not dispatched on queueName.
f := queuefake.New()
_ = f.Queue().Dispatch(queue.NewJob("emails:send").OnQueue("critical"))
f.AssertDispatchedOn(t, "critical", "emails:send")

AssertDispatchedTimes fails when jobType dispatch count does not match expected.
f := queuefake.New()
q := f.Queue()
_ = q.Dispatch(queue.NewJob("emails:send"))
_ = q.Dispatch(queue.NewJob("emails:send"))
f.AssertDispatchedTimes(t, "emails:send", 2)

AssertNotDispatched fails when jobType was dispatched.
f := queuefake.New()
f.AssertNotDispatched(t, "emails:send")

AssertNothingBatched fails if any workflow batch was recorded.
f := queuefake.New()
f.AssertNothingBatched(t)

AssertNothingDispatched fails when any dispatch was recorded.
f := queuefake.New()
f.AssertNothingDispatched(t)

AssertNothingWorkflowDispatched fails when any workflow dispatch was recorded.
f := queuefake.New()
f.AssertNothingWorkflowDispatched(t)

AssertWorkflowDispatched fails when jobType was not workflow-dispatched.
f := queuefake.New()
_, _ = f.Workflow().Chain(bus.NewJob("a", nil)).Dispatch(nil)
f.AssertWorkflowDispatched(t, "a")

AssertWorkflowDispatchedOn fails when jobType was not workflow-dispatched on queueName.
f := queuefake.New()
_, _ = f.Workflow().Chain(bus.NewJob("a", nil)).OnQueue("critical").Dispatch(nil)
f.AssertWorkflowDispatchedOn(t, "critical", "a")

AssertWorkflowDispatchedTimes fails when workflow dispatch count for jobType does not match expected.
f := queuefake.New()
wf := f.Workflow()
_, _ = wf.Chain(bus.NewJob("a", nil)).Dispatch(nil)
_, _ = wf.Chain(bus.NewJob("a", nil)).Dispatch(nil)
f.AssertWorkflowDispatchedTimes(t, "a", 2)

AssertWorkflowNotDispatched fails when jobType was workflow-dispatched.
f := queuefake.New()
f.AssertWorkflowNotDispatched(t, "emails:send")

Count returns the total number of recorded dispatches.
f := queuefake.New()
q := f.Queue()
_ = q.Dispatch(queue.NewJob("a"))
_ = q.Dispatch(queue.NewJob("b"))
_ = f.Count()

CountJob returns how many times a job type was dispatched.
f := queuefake.New()
q := f.Queue()
_ = q.Dispatch(queue.NewJob("emails:send"))
_ = q.Dispatch(queue.NewJob("emails:send"))
_ = f.CountJob("emails:send")

CountOn returns how many times a job type was dispatched on a queue.
f := queuefake.New()
q := f.Queue()
_ = q.Dispatch(queue.NewJob("emails:send").OnQueue("critical"))
_ = f.CountOn("critical", "emails:send")

New creates a fake queue harness backed by queue.NewFake().
f := queuefake.New()
q := f.Queue()
_ = q.Dispatch(queue.NewJob("emails:send").OnQueue("default"))
f.AssertDispatched(t, "emails:send")
f.AssertCount(t, 1)

Queue returns the queue fake to inject into code under test.
f := queuefake.New()
q := f.Queue()
_ = q.Dispatch(queue.NewJob("emails:send").OnQueue("default"))

Records returns a copy of recorded dispatches.
f := queuefake.New()
_ = f.Queue().Dispatch(queue.NewJob("emails:send"))
records := f.Records()

Reset clears recorded dispatches.
f := queuefake.New()
q := f.Queue()
_ = q.Dispatch(queue.NewJob("emails:send"))
f.Reset()
f.AssertNothingDispatched(t)

Workflow returns the workflow/orchestration fake for chain/batch assertions.
f := queuefake.New()
wf := f.Workflow()
_, _ = wf.Chain(
bus.NewJob("a", nil),
bus.NewJob("b", nil),
).Dispatch(context.Background())
f.AssertChained(t, []string{"a", "b"})

Unit tests (root module):
go test ./...

Integration tests (separate integration module):
go test -tags=integration ./integration/...

Select specific backends with INTEGRATION_BACKEND (comma-separated), for example:
INTEGRATION_BACKEND=sqlite go test -tags=integration ./integration/...
INTEGRATION_BACKEND=redis,rabbitmq go test -tags=integration ./integration/... -count=1
INTEGRATION_BACKEND=all go test -tags=integration ./integration/... -count=1

Matrix status and backend integration notes are tracked in docs/integration-scenarios.md.
