Merged
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed

- Stuck job detection now accounts for worker-level timeouts as well as client-level timeouts. [PR #1125](https://github.com/riverqueue/river/pull/1125).

## [0.30.0] - 2026-01-11

### Fixed
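
The changelog entry above distinguishes a worker-level timeout from the client-level one. As a rough illustration of where each comes from (a minimal sketch against the public `river` API; the `SleepArgs`/`SleepWorker` names are made up for this example), a worker sets its own timeout by overriding `Timeout`, while the client-wide default would come from something like `river.Config{JobTimeout: ...}`:

```go
package example

import (
	"context"
	"time"

	"github.com/riverqueue/river"
)

type SleepArgs struct{}

func (SleepArgs) Kind() string { return "sleep" }

type SleepWorker struct {
	// WorkerDefaults supplies default implementations, including a zero
	// Timeout that falls back to the client-level setting.
	river.WorkerDefaults[SleepArgs]
}

// Worker-level timeout: overrides the client's JobTimeout for this worker.
func (w *SleepWorker) Timeout(job *river.Job[SleepArgs]) time.Duration {
	return 30 * time.Second
}

func (w *SleepWorker) Work(ctx context.Context, job *river.Job[SleepArgs]) error {
	// A Work implementation that ignores ctx past its deadline is what the
	// stuck-job watcher is meant to flag; with this change it is flagged
	// whether the timeout came from the worker or the client.
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(time.Minute):
		return nil
	}
}
```
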
14 changes: 0 additions & 14 deletions internal/execution/execution.go
@@ -2,7 +2,6 @@ package execution

import (
"context"
"time"

"github.com/riverqueue/river/rivertype"
)
@@ -13,19 +12,6 @@ type ContextKeyInsideTestWorker struct{}

type Func func(ctx context.Context) error

// MaybeApplyTimeout returns a context that will be cancelled after the given
// timeout. If the timeout is <= 0, the context will not be timed out, but it
// will still have a cancel function returned. In either case the cancel
// function should be called after execution (in a defer).
func MaybeApplyTimeout(ctx context.Context, timeout time.Duration) (context.Context, context.CancelFunc) {
// No timeout if a -1 was specified.
if timeout > 0 {
return context.WithTimeout(ctx, timeout)
}

return context.WithCancel(ctx)
}
PR author: This was doing very little (it's the same as an `if jobTimeout > 0 {` check) and I needed the conditional for something else, so I just pulled it into execute.


// MiddlewareChain chains together the given middleware functions, returning a
// single function that applies them all in reverse order.
func MiddlewareChain(globalMiddleware []rivertype.Middleware, workerMiddleware []rivertype.WorkerMiddleware, doInner Func, jobRow *rivertype.JobRow) Func {
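
To make the author's comment concrete: a minimal, self-contained sketch (not code from this PR) of what the removed helper did versus the inlined `timeout > 0` branch that replaces it in `execute` below:

```go
package example

import (
	"context"
	"time"
)

// What MaybeApplyTimeout did: apply a timeout only when one is configured,
// otherwise just make the context cancelable.
func maybeApplyTimeout(ctx context.Context, timeout time.Duration) (context.Context, context.CancelFunc) {
	if timeout > 0 {
		return context.WithTimeout(ctx, timeout)
	}
	return context.WithCancel(ctx)
}

// The inlined equivalent, which also gives the caller a branch to hang other
// timeout-dependent work on (as execute now does with stuck detection).
func workWithOptionalTimeout(ctx context.Context, timeout time.Duration, work func(context.Context) error) error {
	if timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, timeout)
		defer cancel()
	}
	return work(ctx)
}
```
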
117 changes: 62 additions & 55 deletions internal/jobexecutor/job_executor.go
@@ -177,59 +177,6 @@ func (e *JobExecutor) execute(ctx context.Context) (res *jobExecutorResult) {
metadataUpdates := make(map[string]any)
ctx = context.WithValue(ctx, ContextKeyMetadataUpdates, metadataUpdates)

// Watches for jobs that may have become stuck. i.e. They've run longer than
// their job timeout (plus a small margin) and don't appear to be responding
// to context cancellation (unfortunately, quite an easy error to make in
// Go).
//
// Currently we don't do anything if we notice a job is stuck. Knowing about
// stuck jobs is just used for informational purposes in the producer in
// generating periodic stats.
if e.ClientJobTimeout > 0 {
// We add a WithoutCancel here so that this inner goroutine becomes
// immune to all context cancellations _except_ the one where it's
// cancelled because we leave JobExecutor.execute.
//
// This shadows the context outside the e.ClientJobTimeout > 0 check.
ctx, cancel := context.WithCancel(context.WithoutCancel(ctx))
defer cancel()

go func() {
const stuckThresholdDefault = 5 * time.Second

select {
case <-ctx.Done():
// context cancelled as we leave JobExecutor.execute

case <-time.After(e.ClientJobTimeout + cmp.Or(e.StuckThresholdOverride, stuckThresholdDefault)):
e.ProducerCallbacks.Stuck()

e.Logger.WarnContext(ctx, e.Name+": Job appears to be stuck",
slog.Int64("job_id", e.JobRow.ID),
slog.String("kind", e.JobRow.Kind),
slog.Duration("timeout", e.ClientJobTimeout),
)

// context cancelled as we leave JobExecutor.execute
<-ctx.Done()

// In case the executor ever becomes unstuck, inform the
// producer. However, if we got all the way here there's a good
// chance this will never happen (the worker is really stuck and
// will never return).
defer e.ProducerCallbacks.Unstuck()

defer func() {
e.Logger.InfoContext(ctx, e.Name+": Job became unstuck",
slog.Duration("duration", time.Since(e.start)),
slog.Int64("job_id", e.JobRow.ID),
slog.String("kind", e.JobRow.Kind),
)
}()
}
}()
}

defer func() {
if recovery := recover(); recovery != nil {
e.Logger.ErrorContext(ctx, e.Name+": panic recovery; possible bug with Worker",
@@ -278,8 +225,15 @@ func (e *JobExecutor) execute(ctx context.Context) (res *jobExecutorResult) {
}

jobTimeout := cmp.Or(e.WorkUnit.Timeout(), e.ClientJobTimeout)
ctx, cancel := execution.MaybeApplyTimeout(ctx, jobTimeout)
defer cancel()

if jobTimeout > 0 {
var timeoutCancel context.CancelFunc
ctx, timeoutCancel = context.WithTimeout(ctx, jobTimeout)
defer timeoutCancel()

watchStuckCancel := e.watchStuck(ctx)
defer watchStuckCancel()
Comment on lines +234 to +235

Reviewer: IMO this is so much more readable than the large inline block before. I don't feel like I lose the context and flow of the surrounding code, while the fairly complex stuck-job watching logic is nicely contained in a helper. 👏

PR author: Yeah, +1. Should have done this from the beginning.

}

err := e.WorkUnit.Work(ctx)

@@ -305,6 +259,59 @@ func (e *JobExecutor) execute(ctx context.Context) (res *jobExecutorResult) {
return &jobExecutorResult{Err: executeFunc(ctx), MetadataUpdates: metadataUpdates}
}

// Watches for jobs that may have become stuck. i.e. They've run longer than
// their job timeout (plus a small margin) and don't appear to be responding to
// context cancellation (unfortunately, quite an easy error to make in Go).
//
// Currently we don't do anything if we notice a job is stuck. Knowing about
// stuck jobs is just used for informational purposes in the producer in
// generating periodic stats.
func (e *JobExecutor) watchStuck(ctx context.Context) context.CancelFunc {
// We add a WithoutCancel here so that this inner goroutine becomes
// immune to all context cancellations _except_ the one where it's
// cancelled because we leave JobExecutor.execute.
//
// This shadows the context outside the e.ClientJobTimeout > 0 check.
ctx, cancel := context.WithCancel(context.WithoutCancel(ctx))

go func() {
const stuckThresholdDefault = 5 * time.Second

select {
case <-ctx.Done():
// context cancelled as we leave JobExecutor.execute

case <-time.After(e.ClientJobTimeout + cmp.Or(e.StuckThresholdOverride, stuckThresholdDefault)):
e.ProducerCallbacks.Stuck()

e.Logger.WarnContext(ctx, e.Name+": Job appears to be stuck",
slog.Int64("job_id", e.JobRow.ID),
slog.String("kind", e.JobRow.Kind),
slog.Duration("timeout", e.ClientJobTimeout),
)

// context cancelled as we leave JobExecutor.execute
<-ctx.Done()

// In case the executor ever becomes unstuck, inform the
// producer. However, if we got all the way here there's a good
// chance this will never happen (the worker is really stuck and
// will never return).
defer e.ProducerCallbacks.Unstuck()

defer func() {
e.Logger.InfoContext(ctx, e.Name+": Job became unstuck",
slog.Duration("duration", time.Since(e.start)),
slog.Int64("job_id", e.JobRow.ID),
slog.String("kind", e.JobRow.Kind),
)
}()
}
}()

return cancel
}

func (e *JobExecutor) invokeErrorHandler(ctx context.Context, res *jobExecutorResult) bool {
invokeAndHandlePanic := func(funcName string, errorHandler func() *ErrorHandlerResult) *ErrorHandlerResult {
defer func() {
50 changes: 49 additions & 1 deletion internal/jobexecutor/job_executor_test.go
@@ -709,7 +709,7 @@ func TestJobExecutor_Execute(t *testing.T) {
executor.StuckThresholdOverride = 1 * time.Nanosecond // must be greater than 0 to take effect
}

t.Run("StuckDetectionActivates", func(t *testing.T) {
t.Run("StuckDetectionActivatesForClientTimeout", func(t *testing.T) {
t.Parallel()

executor, bundle := setup(t)
@@ -748,6 +748,54 @@
riversharedtest.WaitOrTimeout(t, informProducerUnstuckReceived)
})

t.Run("StuckDetectionActivatesForWorkerTimeout", func(t *testing.T) {
t.Parallel()

executor, bundle := setup(t)

// Does not use configureStuckDetection to avoid ClientJobTimeout being
// set. Instead, customizableWorkUnit.timeout is set below.
executor.StuckThresholdOverride = 1 * time.Nanosecond // must be greater than 0 to take effect

var (
informProducerStuckReceived = make(chan struct{})
informProducerUnstuckReceived = make(chan struct{})
)
executor.ProducerCallbacks.Stuck = func() {
t.Log("Job executor reported stuck")
close(informProducerStuckReceived)
}
executor.ProducerCallbacks.Unstuck = func() {
t.Log("Job executor reported unstuck (after being stuck)")
close(informProducerUnstuckReceived)
}

workUnit := &customizableWorkUnit{
timeout: 5 * time.Millisecond,
work: func() error {
riversharedtest.WaitOrTimeout(t, informProducerStuckReceived)

select {
case <-informProducerUnstuckReceived:
require.FailNow(t, "Executor should not have reported unstuck immediately")
case <-time.After(10 * time.Millisecond):
t.Log("Job executor still stuck after wait (this is expected)")
}

return nil
},
}

executor.WorkUnit = (&workUnitFactory{
workUnit: workUnit,
}).MakeUnit(bundle.jobRow)

executor.Execute(ctx)
_ = riversharedtest.WaitOrTimeout(t, bundle.updateCh)

riversharedtest.WaitOrTimeout(t, informProducerUnstuckReceived)
})

// Checks that even if a work context is cancelled immediately, stuck
// detection still works as expected.
t.Run("StuckDetectionIgnoresParentContextCancellation", func(t *testing.T) {