From 7eebbb30e911dc425943c9ab147da09c202509e8 Mon Sep 17 00:00:00 2001
From: Brandur
Date: Sun, 18 Jan 2026 09:32:48 -0800
Subject: [PATCH] Have stuck detection account for worker-level timeouts as well as client-level

As reported in #1125, the stuck job detection mechanism currently only
considers client-level timeouts, without properly accounting for the
possibility of worker-level overrides. Here, make sure to account for
worker-level timeouts as well by hoisting the job timeout calculation further
up in the executor logic.

Fixes #1125.
---
 CHANGELOG.md                              |   4 +
 internal/execution/execution.go           |  14 ---
 internal/jobexecutor/job_executor.go      | 117 ++++++++++++----
 internal/jobexecutor/job_executor_test.go |  50 ++++++++-
 4 files changed, 115 insertions(+), 70 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4e7fd401..46238aa9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Fixed
+
+- Stuck job detection now accounts for worker-level timeouts as well as client-level timeouts. [PR #1125](https://github.com/riverqueue/river/pull/1125).
+
 ## [0.30.0] - 2026-01-11
 
 ### Fixed
diff --git a/internal/execution/execution.go b/internal/execution/execution.go
index 63608cb1..f166337d 100644
--- a/internal/execution/execution.go
+++ b/internal/execution/execution.go
@@ -2,7 +2,6 @@ package execution
 
 import (
 	"context"
-	"time"
 
 	"github.com/riverqueue/river/rivertype"
 )
@@ -13,19 +12,6 @@ type ContextKeyInsideTestWorker struct{}
 
 type Func func(ctx context.Context) error
 
-// MaybeApplyTimeout returns a context that will be cancelled after the given
-// timeout. If the timeout is <= 0, the context will not be timed out, but it
-// will still have a cancel function returned. In either case the cancel
-// function should be called after execution (in a defer).
-func MaybeApplyTimeout(ctx context.Context, timeout time.Duration) (context.Context, context.CancelFunc) {
-	// No timeout if a -1 was specified.
-	if timeout > 0 {
-		return context.WithTimeout(ctx, timeout)
-	}
-
-	return context.WithCancel(ctx)
-}
-
 // MiddlewareChain chains together the given middleware functions, returning a
 // single function that applies them all in reverse order.
 func MiddlewareChain(globalMiddleware []rivertype.Middleware, workerMiddleware []rivertype.WorkerMiddleware, doInner Func, jobRow *rivertype.JobRow) Func {
diff --git a/internal/jobexecutor/job_executor.go b/internal/jobexecutor/job_executor.go
index 4d386dd3..69e655ce 100644
--- a/internal/jobexecutor/job_executor.go
+++ b/internal/jobexecutor/job_executor.go
@@ -177,59 +177,6 @@ func (e *JobExecutor) execute(ctx context.Context) (res *jobExecutorResult) {
 	metadataUpdates := make(map[string]any)
 	ctx = context.WithValue(ctx, ContextKeyMetadataUpdates, metadataUpdates)
 
-	// Watches for jobs that may have become stuck. i.e. They've run longer than
-	// their job timeout (plus a small margin) and don't appear to be responding
-	// to context cancellation (unfortunately, quite an easy error to make in
-	// Go).
-	//
-	// Currently we don't do anything if we notice a job is stuck. Knowing about
-	// stuck jobs is just used for informational purposes in the producer in
-	// generating periodic stats.
-	if e.ClientJobTimeout > 0 {
-		// We add a WithoutCancel here so that this inner goroutine becomes
-		// immune to all context cancellations _except_ the one where it's
-		// cancelled because we leave JobExecutor.execute.
-		//
-		// This shadows the context outside the e.ClientJobTimeout > 0 check.
-		ctx, cancel := context.WithCancel(context.WithoutCancel(ctx))
-		defer cancel()
-
-		go func() {
-			const stuckThresholdDefault = 5 * time.Second
-
-			select {
-			case <-ctx.Done():
-				// context cancelled as we leave JobExecutor.execute
-
-			case <-time.After(e.ClientJobTimeout + cmp.Or(e.StuckThresholdOverride, stuckThresholdDefault)):
-				e.ProducerCallbacks.Stuck()
-
-				e.Logger.WarnContext(ctx, e.Name+": Job appears to be stuck",
-					slog.Int64("job_id", e.JobRow.ID),
-					slog.String("kind", e.JobRow.Kind),
-					slog.Duration("timeout", e.ClientJobTimeout),
-				)
-
-				// context cancelled as we leave JobExecutor.execute
-				<-ctx.Done()
-
-				// In case the executor ever becomes unstuck, inform the
-				// producer. However, if we got all the way here there's a good
-				// chance this will never happen (the worker is really stuck and
-				// will never return).
-				defer e.ProducerCallbacks.Unstuck()
-
-				defer func() {
-					e.Logger.InfoContext(ctx, e.Name+": Job became unstuck",
-						slog.Duration("duration", time.Since(e.start)),
-						slog.Int64("job_id", e.JobRow.ID),
-						slog.String("kind", e.JobRow.Kind),
-					)
-				}()
-			}
-		}()
-	}
-
 	defer func() {
 		if recovery := recover(); recovery != nil {
 			e.Logger.ErrorContext(ctx, e.Name+": panic recovery; possible bug with Worker",
@@ -278,8 +225,15 @@ func (e *JobExecutor) execute(ctx context.Context) (res *jobExecutorResult) {
 		}
 
 		jobTimeout := cmp.Or(e.WorkUnit.Timeout(), e.ClientJobTimeout)
-		ctx, cancel := execution.MaybeApplyTimeout(ctx, jobTimeout)
-		defer cancel()
+
+		if jobTimeout > 0 {
+			var timeoutCancel context.CancelFunc
+			ctx, timeoutCancel = context.WithTimeout(ctx, jobTimeout)
+			defer timeoutCancel()
+
+			watchStuckCancel := e.watchStuck(ctx, jobTimeout)
+			defer watchStuckCancel()
+		}
 
 		err := e.WorkUnit.Work(ctx)
 
@@ -305,6 +259,59 @@ func (e *JobExecutor) execute(ctx context.Context) (res *jobExecutorResult) {
 	return &jobExecutorResult{Err: executeFunc(ctx), MetadataUpdates: metadataUpdates}
 }
 
+// Watches for jobs that may have become stuck, i.e. they've run longer than
+// their job timeout (plus a small margin) and don't appear to be responding to
+// context cancellation (unfortunately, quite an easy error to make in Go).
+//
+// Currently we don't do anything if we notice a job is stuck. Knowing about
+// stuck jobs is just used for informational purposes in the producer in
+// generating periodic stats.
+func (e *JobExecutor) watchStuck(ctx context.Context, jobTimeout time.Duration) context.CancelFunc {
+	// We add a WithoutCancel here so that this inner goroutine becomes
+	// immune to all context cancellations _except_ the one where it's
+	// cancelled because we leave JobExecutor.execute.
+	//
+	// This shadows the context passed in by the caller.
+	ctx, cancel := context.WithCancel(context.WithoutCancel(ctx))
+
+	go func() {
+		const stuckThresholdDefault = 5 * time.Second
+
+		select {
+		case <-ctx.Done():
+			// context cancelled as we leave JobExecutor.execute
+
+		case <-time.After(jobTimeout + cmp.Or(e.StuckThresholdOverride, stuckThresholdDefault)):
+			e.ProducerCallbacks.Stuck()
+
+			e.Logger.WarnContext(ctx, e.Name+": Job appears to be stuck",
+				slog.Int64("job_id", e.JobRow.ID),
+				slog.String("kind", e.JobRow.Kind),
+				slog.Duration("timeout", jobTimeout),
+			)
+
+			// context cancelled as we leave JobExecutor.execute
+			<-ctx.Done()
+
+			// In case the executor ever becomes unstuck, inform the
+			// producer. However, if we got all the way here there's a good
+			// chance this will never happen (the worker is really stuck and
+			// will never return).
+			defer e.ProducerCallbacks.Unstuck()
+
+			defer func() {
+				e.Logger.InfoContext(ctx, e.Name+": Job became unstuck",
+					slog.Duration("duration", time.Since(e.start)),
+					slog.Int64("job_id", e.JobRow.ID),
+					slog.String("kind", e.JobRow.Kind),
+				)
+			}()
+		}
+	}()
+
+	return cancel
+}
+
 func (e *JobExecutor) invokeErrorHandler(ctx context.Context, res *jobExecutorResult) bool {
 	invokeAndHandlePanic := func(funcName string, errorHandler func() *ErrorHandlerResult) *ErrorHandlerResult {
 		defer func() {
diff --git a/internal/jobexecutor/job_executor_test.go b/internal/jobexecutor/job_executor_test.go
index 86e36d45..cbe0a45e 100644
--- a/internal/jobexecutor/job_executor_test.go
+++ b/internal/jobexecutor/job_executor_test.go
@@ -709,7 +709,7 @@ func TestJobExecutor_Execute(t *testing.T) {
 		executor.StuckThresholdOverride = 1 * time.Nanosecond // must be greater than 0 to take effect
 	}
 
-	t.Run("StuckDetectionActivates", func(t *testing.T) {
+	t.Run("StuckDetectionActivatesForClientTimeout", func(t *testing.T) {
 		t.Parallel()
 
 		executor, bundle := setup(t)
@@ -748,6 +748,54 @@ func TestJobExecutor_Execute(t *testing.T) {
 		riversharedtest.WaitOrTimeout(t, informProducerUnstuckReceived)
 	})
 
+	t.Run("StuckDetectionActivatesForWorkerTimeout", func(t *testing.T) {
+		t.Parallel()
+
+		executor, bundle := setup(t)
+
+		// Does not use configureStuckDetection to avoid ClientJobTimeout being
+		// set. Instead, customizableWorkUnit.timeout is set below.
+		executor.StuckThresholdOverride = 1 * time.Nanosecond // must be greater than 0 to take effect
+
+		var (
+			informProducerStuckReceived   = make(chan struct{})
+			informProducerUnstuckReceived = make(chan struct{})
+		)
+		executor.ProducerCallbacks.Stuck = func() {
+			t.Log("Job executor reported stuck")
+			close(informProducerStuckReceived)
+		}
+		executor.ProducerCallbacks.Unstuck = func() {
+			t.Log("Job executor reported unstuck (after being stuck)")
+			close(informProducerUnstuckReceived)
+		}
+
+		workUnit := &customizableWorkUnit{
+			timeout: 5 * time.Millisecond,
+			work: func() error {
+				riversharedtest.WaitOrTimeout(t, informProducerStuckReceived)
+
+				select {
+				case <-informProducerUnstuckReceived:
+					require.FailNow(t, "Executor should not have reported unstuck immediately")
+				case <-time.After(10 * time.Millisecond):
+					t.Log("Job executor still stuck after wait (this is expected)")
+				}
+
+				return nil
+			},
+		}
+
+		executor.WorkUnit = (&workUnitFactory{
+			workUnit: workUnit,
+		}).MakeUnit(bundle.jobRow)
+
+		executor.Execute(ctx)
+		_ = riversharedtest.WaitOrTimeout(t, bundle.updateCh)
+
+		riversharedtest.WaitOrTimeout(t, informProducerUnstuckReceived)
+	})
+
 	// Checks that even if a work context is cancelled immediately, stuck
 	// detection still works as expected.
 	t.Run("StuckDetectionIgnoresParentContextCancellation", func(t *testing.T) {