-
Notifications
You must be signed in to change notification settings - Fork 135
Have stuck detection account for worker-level timeouts as well as client-level #1126
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -177,59 +177,6 @@ func (e *JobExecutor) execute(ctx context.Context) (res *jobExecutorResult) { | |
| metadataUpdates := make(map[string]any) | ||
| ctx = context.WithValue(ctx, ContextKeyMetadataUpdates, metadataUpdates) | ||
|
|
||
| // Watches for jobs that may have become stuck. i.e. They've run longer than | ||
| // their job timeout (plus a small margin) and don't appear to be responding | ||
| // to context cancellation (unfortunately, quite an easy error to make in | ||
| // Go). | ||
| // | ||
| // Currently we don't do anything if we notice a job is stuck. Knowing about | ||
| // stuck jobs is just used for informational purposes in the producer in | ||
| // generating periodic stats. | ||
| if e.ClientJobTimeout > 0 { | ||
| // We add a WithoutCancel here so that this inner goroutine becomes | ||
| // immune to all context cancellations _except_ the one where it's | ||
| // cancelled because we leave JobExecutor.execute. | ||
| // | ||
| // This shadows the context outside the e.ClientJobTimeout > 0 check. | ||
| ctx, cancel := context.WithCancel(context.WithoutCancel(ctx)) | ||
| defer cancel() | ||
|
|
||
| go func() { | ||
| const stuckThresholdDefault = 5 * time.Second | ||
|
|
||
| select { | ||
| case <-ctx.Done(): | ||
| // context cancelled as we leave JobExecutor.execute | ||
|
|
||
| case <-time.After(e.ClientJobTimeout + cmp.Or(e.StuckThresholdOverride, stuckThresholdDefault)): | ||
| e.ProducerCallbacks.Stuck() | ||
|
|
||
| e.Logger.WarnContext(ctx, e.Name+": Job appears to be stuck", | ||
| slog.Int64("job_id", e.JobRow.ID), | ||
| slog.String("kind", e.JobRow.Kind), | ||
| slog.Duration("timeout", e.ClientJobTimeout), | ||
| ) | ||
|
|
||
| // context cancelled as we leave JobExecutor.execute | ||
| <-ctx.Done() | ||
|
|
||
| // In case the executor ever becomes unstuck, inform the | ||
| // producer. However, if we got all the way here there's a good | ||
| // chance this will never happen (the worker is really stuck and | ||
| // will never return). | ||
| defer e.ProducerCallbacks.Unstuck() | ||
|
|
||
| defer func() { | ||
| e.Logger.InfoContext(ctx, e.Name+": Job became unstuck", | ||
| slog.Duration("duration", time.Since(e.start)), | ||
| slog.Int64("job_id", e.JobRow.ID), | ||
| slog.String("kind", e.JobRow.Kind), | ||
| ) | ||
| }() | ||
| } | ||
| }() | ||
| } | ||
|
|
||
| defer func() { | ||
| if recovery := recover(); recovery != nil { | ||
| e.Logger.ErrorContext(ctx, e.Name+": panic recovery; possible bug with Worker", | ||
|
|
@@ -278,8 +225,15 @@ func (e *JobExecutor) execute(ctx context.Context) (res *jobExecutorResult) { | |
| } | ||
|
|
||
| jobTimeout := cmp.Or(e.WorkUnit.Timeout(), e.ClientJobTimeout) | ||
| ctx, cancel := execution.MaybeApplyTimeout(ctx, jobTimeout) | ||
| defer cancel() | ||
|
|
||
| if jobTimeout > 0 { | ||
| var timeoutCancel context.CancelFunc | ||
| ctx, timeoutCancel = context.WithTimeout(ctx, jobTimeout) | ||
| defer timeoutCancel() | ||
|
|
||
| watchStuckCancel := e.watchStuck(ctx) | ||
| defer watchStuckCancel() | ||
|
Comment on lines
+234
to
+235
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IMO this is so much more readable than the large inline block before. I don't feel like I lose the context and flow of the surrounding code, while the fairly complex stuck job watching logic is nicely contained in a helper. 👏
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah +1. Should have done this from the beginning. |
||
| } | ||
|
|
||
| err := e.WorkUnit.Work(ctx) | ||
|
|
||
|
|
@@ -305,6 +259,59 @@ func (e *JobExecutor) execute(ctx context.Context) (res *jobExecutorResult) { | |
| return &jobExecutorResult{Err: executeFunc(ctx), MetadataUpdates: metadataUpdates} | ||
| } | ||
|
|
||
| // Watches for jobs that may have become stuck. i.e. They've run longer than | ||
| // their job timeout (plus a small margin) and don't appear to be responding to | ||
| // context cancellation (unfortunately, quite an easy error to make in Go). | ||
| // | ||
| // Currently we don't do anything if we notice a job is stuck. Knowing about | ||
| // stuck jobs is just used for informational purposes in the producer in | ||
| // generating periodic stats. | ||
| func (e *JobExecutor) watchStuck(ctx context.Context) context.CancelFunc { | ||
| // We add a WithoutCancel here so that this inner goroutine becomes | ||
| // immune to all context cancellations _except_ the one where it's | ||
| // cancelled because we leave JobExecutor.execute. | ||
| // | ||
| // This shadows the context outside the e.ClientJobTimeout > 0 check. | ||
| ctx, cancel := context.WithCancel(context.WithoutCancel(ctx)) | ||
|
|
||
| go func() { | ||
| const stuckThresholdDefault = 5 * time.Second | ||
|
|
||
| select { | ||
| case <-ctx.Done(): | ||
| // context cancelled as we leave JobExecutor.execute | ||
|
|
||
| case <-time.After(e.ClientJobTimeout + cmp.Or(e.StuckThresholdOverride, stuckThresholdDefault)): | ||
| e.ProducerCallbacks.Stuck() | ||
|
|
||
| e.Logger.WarnContext(ctx, e.Name+": Job appears to be stuck", | ||
| slog.Int64("job_id", e.JobRow.ID), | ||
| slog.String("kind", e.JobRow.Kind), | ||
| slog.Duration("timeout", e.ClientJobTimeout), | ||
| ) | ||
|
|
||
| // context cancelled as we leave JobExecutor.execute | ||
| <-ctx.Done() | ||
|
|
||
| // In case the executor ever becomes unstuck, inform the | ||
| // producer. However, if we got all the way here there's a good | ||
| // chance this will never happen (the worker is really stuck and | ||
| // will never return). | ||
| defer e.ProducerCallbacks.Unstuck() | ||
|
|
||
| defer func() { | ||
| e.Logger.InfoContext(ctx, e.Name+": Job became unstuck", | ||
| slog.Duration("duration", time.Since(e.start)), | ||
| slog.Int64("job_id", e.JobRow.ID), | ||
| slog.String("kind", e.JobRow.Kind), | ||
| ) | ||
| }() | ||
| } | ||
| }() | ||
|
|
||
| return cancel | ||
| } | ||
|
|
||
| func (e *JobExecutor) invokeErrorHandler(ctx context.Context, res *jobExecutorResult) bool { | ||
| invokeAndHandlePanic := func(funcName string, errorHandler func() *ErrorHandlerResult) *ErrorHandlerResult { | ||
| defer func() { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was doing very little (it's the same as a
if jobTimeout > 0 {check) and I needed the conditional for something else, so I just pulled it intoexecute.