From b1d5e7c00ba46bc9db686d66067852e5113fd502 Mon Sep 17 00:00:00 2001 From: Georges Haidar Date: Wed, 20 May 2026 13:48:30 +0100 Subject: [PATCH 1/4] fix: correct OutboxGCWorkflow timeout and tighten schedule config MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The workflow was timing out on every run. After GCOutboxProcessedRows returned 0 eligible rows it called workflow.Sleep(1h), but the schedule set WorkflowRunTimeout to 15 minutes — guaranteeing the workflow was always killed before the sleep fired. Changes: - Remove the in-workflow sleep entirely. The workflow now returns nil once a partial batch confirms no further rows remain; the Temporal schedule handles re-triggering. - Tighten timeouts to match actual workload: schedule interval 6h→5min, activity StartToCloseTimeout 10min→1min, WorkflowRunTimeout 15min→2min. At current volume (~40K rows steady-state, ~4 rows/min arriving) each run deletes ~20 rows in a single activity call that completes in milliseconds. - Make AddOutboxGCSchedule upsert: on ErrScheduleAlreadyRunning it now calls handle.Update to push the new spec and action to the existing schedule, so config changes take effect on deploy without manual intervention via the Temporal UI. DB impact: the more frequent schedule means terminal rows are cleaned up within ~5 minutes of crossing the 7-day retention threshold instead of up to 6 hours late. Each run issues at most one batched DELETE of ≤100 rows, well within autovacuum's ability to reclaim dead tuples at this volume. Co-Authored-By: Claude Sonnet 4.6 --- server/internal/background/outbox_gc.go | 52 +++++++++++++++---------- 1 file changed, 32 insertions(+), 20 deletions(-) diff --git a/server/internal/background/outbox_gc.go b/server/internal/background/outbox_gc.go index ed5c30bcbf..43aab34d99 100644 --- a/server/internal/background/outbox_gc.go +++ b/server/internal/background/outbox_gc.go @@ -17,15 +17,14 @@ import ( const ( outboxGCScheduleID = "v1:outbox-gc-schedule" outboxGCWorkflowID = outboxGCScheduleID + "/scheduled" - outboxGCInterval = 6 * time.Hour + outboxGCInterval = 5 * time.Minute outboxGCRetentionPeriod = 7 * 24 * time.Hour outboxGCBatchSize int32 = 100 - outboxGCSleepInterval = time.Hour ) func OutboxGCWorkflow(ctx workflow.Context) error { ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ - StartToCloseTimeout: 10 * time.Minute, + StartToCloseTimeout: time.Minute, RetryPolicy: &temporal.RetryPolicy{ MaximumAttempts: 3, InitialInterval: 5 * time.Second, @@ -50,32 +49,45 @@ func OutboxGCWorkflow(ctx workflow.Context) error { workflow.GetLogger(ctx).Info("outbox gc batch completed", "rows_deleted", rows) - if rows >= int64(outboxGCBatchSize) { - continue // batch was full — more rows likely remain - } - - if err := workflow.Sleep(ctx, outboxGCSleepInterval); err != nil { - return err + if rows < int64(outboxGCBatchSize) { + return nil // all rows processed — schedule will re-run at next interval } } } func AddOutboxGCSchedule(ctx context.Context, temporalEnv *tenv.Environment) error { - _, err := temporalEnv.Client().ScheduleClient().Create(ctx, client.ScheduleOptions{ + sc := temporalEnv.Client().ScheduleClient() + + spec := client.ScheduleSpec{ + Intervals: []client.ScheduleIntervalSpec{{Every: outboxGCInterval}}, + } + action := &client.ScheduleWorkflowAction{ + ID: outboxGCWorkflowID, + Workflow: OutboxGCWorkflow, + TaskQueue: string(temporalEnv.Queue()), + WorkflowRunTimeout: 2 * time.Minute, + } + + _, err := sc.Create(ctx, client.ScheduleOptions{ ID: outboxGCScheduleID, Overlap: enums.SCHEDULE_OVERLAP_POLICY_SKIP, - Spec: client.ScheduleSpec{ - Intervals: []client.ScheduleIntervalSpec{{Every: outboxGCInterval}}, - }, - Action: &client.ScheduleWorkflowAction{ - ID: outboxGCWorkflowID, - Workflow: OutboxGCWorkflow, - TaskQueue: string(temporalEnv.Queue()), - WorkflowRunTimeout: 15 * time.Minute, - }, + Spec: spec, + Action: action, }) - if err != nil && !errors.Is(err, temporal.ErrScheduleAlreadyRunning) { + switch { + case errors.Is(err, temporal.ErrScheduleAlreadyRunning): + if err := sc.GetHandle(ctx, outboxGCScheduleID).Update(ctx, client.ScheduleUpdateOptions{ + DoUpdate: func(input client.ScheduleUpdateInput) (*client.ScheduleUpdate, error) { + input.Description.Schedule.Spec = &spec + input.Description.Schedule.Action = action + return &client.ScheduleUpdate{Schedule: &input.Description.Schedule}, nil + }, + }); err != nil { + return fmt.Errorf("update existing outbox gc schedule: %w", err) + } + case err != nil: return fmt.Errorf("create outbox gc schedule: %w", err) } + return nil } From 38db72af640319c621fe3444459116b3e0f75f11 Mon Sep 17 00:00:00 2001 From: Georges Haidar Date: Wed, 20 May 2026 14:10:14 +0100 Subject: [PATCH 2/4] fix lint --- server/internal/background/outbox_gc.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/server/internal/background/outbox_gc.go b/server/internal/background/outbox_gc.go index 43aab34d99..ccef636296 100644 --- a/server/internal/background/outbox_gc.go +++ b/server/internal/background/outbox_gc.go @@ -80,7 +80,10 @@ func AddOutboxGCSchedule(ctx context.Context, temporalEnv *tenv.Environment) err DoUpdate: func(input client.ScheduleUpdateInput) (*client.ScheduleUpdate, error) { input.Description.Schedule.Spec = &spec input.Description.Schedule.Action = action - return &client.ScheduleUpdate{Schedule: &input.Description.Schedule}, nil + return &client.ScheduleUpdate{ + Schedule: &input.Description.Schedule, + TypedSearchAttributes: nil, + }, nil }, }); err != nil { return fmt.Errorf("update existing outbox gc schedule: %w", err) From 3380426f7fb1e9ff154e08caf57c66a578a5a784 Mon Sep 17 00:00:00 2001 From: Georges Haidar Date: Wed, 20 May 2026 14:19:22 +0100 Subject: [PATCH 3/4] tune timings --- server/internal/background/outbox_gc.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/internal/background/outbox_gc.go b/server/internal/background/outbox_gc.go index ccef636296..9174ae2d37 100644 --- a/server/internal/background/outbox_gc.go +++ b/server/internal/background/outbox_gc.go @@ -17,7 +17,7 @@ import ( const ( outboxGCScheduleID = "v1:outbox-gc-schedule" outboxGCWorkflowID = outboxGCScheduleID + "/scheduled" - outboxGCInterval = 5 * time.Minute + outboxGCInterval = 10 * time.Minute outboxGCRetentionPeriod = 7 * 24 * time.Hour outboxGCBatchSize int32 = 100 ) @@ -65,7 +65,7 @@ func AddOutboxGCSchedule(ctx context.Context, temporalEnv *tenv.Environment) err ID: outboxGCWorkflowID, Workflow: OutboxGCWorkflow, TaskQueue: string(temporalEnv.Queue()), - WorkflowRunTimeout: 2 * time.Minute, + WorkflowRunTimeout: 5 * time.Minute, } _, err := sc.Create(ctx, client.ScheduleOptions{ From b6a4854e92075340bc374292586111753b87b00e Mon Sep 17 00:00:00 2001 From: Georges Haidar Date: Wed, 20 May 2026 15:32:10 +0100 Subject: [PATCH 4/4] update tests --- server/internal/background/outbox_gc_test.go | 54 ++++++++++++++------ 1 file changed, 37 insertions(+), 17 deletions(-) diff --git a/server/internal/background/outbox_gc_test.go b/server/internal/background/outbox_gc_test.go index 7ae1f93dd8..479367a3dc 100644 --- a/server/internal/background/outbox_gc_test.go +++ b/server/internal/background/outbox_gc_test.go @@ -22,13 +22,10 @@ func TestOutboxGCWorkflow_PartialBatch(t *testing.T) { env.RegisterActivityWithOptions( func(_ context.Context, cutoff time.Time, batchSize int32) (int64, error) { gcCallCount++ - if gcCallCount == 1 { - gotCutoff = cutoff - gotBatchSize = batchSize - // Partial batch — fewer rows than limit, workflow should sleep then loop again. - return int64(outboxGCBatchSize) - 1, nil - } - return 0, errStop + gotCutoff = cutoff + gotBatchSize = batchSize + // Partial batch — fewer rows than limit, workflow should return nil immediately. + return int64(outboxGCBatchSize) - 1, nil }, activity.RegisterOptions{Name: "GCOutboxProcessedRows"}, ) @@ -36,13 +33,13 @@ func TestOutboxGCWorkflow_PartialBatch(t *testing.T) { env.ExecuteWorkflow(OutboxGCWorkflow) require.True(t, env.IsWorkflowCompleted()) - require.Error(t, env.GetWorkflowError()) // stopped by sentinel - require.Equal(t, 2, gcCallCount) // first partial batch, second errStop after sleep + require.NoError(t, env.GetWorkflowError()) + require.Equal(t, 1, gcCallCount) require.Equal(t, outboxGCBatchSize, gotBatchSize) require.WithinDuration(t, time.Now().Add(-outboxGCRetentionPeriod), gotCutoff, time.Second) } -func TestOutboxGCWorkflow_FullBatchSkipsSleep(t *testing.T) { +func TestOutboxGCWorkflow_FullBatchContinues(t *testing.T) { t.Parallel() var suite testsuite.WorkflowTestSuite @@ -54,14 +51,37 @@ func TestOutboxGCWorkflow_FullBatchSkipsSleep(t *testing.T) { gcCallCount++ switch gcCallCount { case 1: - // Full batch — workflow must NOT sleep before next poll. + // Full batch — workflow must immediately re-poll without sleeping. return int64(outboxGCBatchSize), nil - case 2: - // Partial batch — workflow sleeps, then loops. - return 0, nil default: - return 0, errStop + // Partial batch — workflow returns nil. + return 0, nil + } + }, + activity.RegisterOptions{Name: "GCOutboxProcessedRows"}, + ) + + env.ExecuteWorkflow(OutboxGCWorkflow) + + require.True(t, env.IsWorkflowCompleted()) + require.NoError(t, env.GetWorkflowError()) + require.Equal(t, 2, gcCallCount) // full batch → immediate re-poll → partial batch → return nil +} + +func TestOutboxGCWorkflow_MultipleFullBatches(t *testing.T) { + t.Parallel() + + var suite testsuite.WorkflowTestSuite + env := suite.NewTestWorkflowEnvironment() + + gcCallCount := 0 + env.RegisterActivityWithOptions( + func(_ context.Context, _ time.Time, _ int32) (int64, error) { + gcCallCount++ + if gcCallCount < 4 { + return int64(outboxGCBatchSize), nil } + return 0, nil }, activity.RegisterOptions{Name: "GCOutboxProcessedRows"}, ) @@ -69,8 +89,8 @@ func TestOutboxGCWorkflow_FullBatchSkipsSleep(t *testing.T) { env.ExecuteWorkflow(OutboxGCWorkflow) require.True(t, env.IsWorkflowCompleted()) - require.Error(t, env.GetWorkflowError()) // stopped by sentinel - require.Equal(t, 3, gcCallCount) // full batch → immediate re-poll → partial batch → sleep → errStop + require.NoError(t, env.GetWorkflowError()) + require.Equal(t, 4, gcCallCount) // 3 full batches → immediate re-poll each time → partial → return nil } func TestOutboxGCWorkflow_ActivityError(t *testing.T) {