diff --git a/server/internal/background/outbox_gc.go b/server/internal/background/outbox_gc.go index ed5c30bcbf..9174ae2d37 100644 --- a/server/internal/background/outbox_gc.go +++ b/server/internal/background/outbox_gc.go @@ -17,15 +17,14 @@ import ( const ( outboxGCScheduleID = "v1:outbox-gc-schedule" outboxGCWorkflowID = outboxGCScheduleID + "/scheduled" - outboxGCInterval = 6 * time.Hour + outboxGCInterval = 10 * time.Minute outboxGCRetentionPeriod = 7 * 24 * time.Hour outboxGCBatchSize int32 = 100 - outboxGCSleepInterval = time.Hour ) func OutboxGCWorkflow(ctx workflow.Context) error { ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ - StartToCloseTimeout: 10 * time.Minute, + StartToCloseTimeout: time.Minute, RetryPolicy: &temporal.RetryPolicy{ MaximumAttempts: 3, InitialInterval: 5 * time.Second, @@ -50,32 +49,48 @@ func OutboxGCWorkflow(ctx workflow.Context) error { workflow.GetLogger(ctx).Info("outbox gc batch completed", "rows_deleted", rows) - if rows >= int64(outboxGCBatchSize) { - continue // batch was full — more rows likely remain - } - - if err := workflow.Sleep(ctx, outboxGCSleepInterval); err != nil { - return err + if rows < int64(outboxGCBatchSize) { + return nil // all rows processed — schedule will re-run at next interval } } } func AddOutboxGCSchedule(ctx context.Context, temporalEnv *tenv.Environment) error { - _, err := temporalEnv.Client().ScheduleClient().Create(ctx, client.ScheduleOptions{ + sc := temporalEnv.Client().ScheduleClient() + + spec := client.ScheduleSpec{ + Intervals: []client.ScheduleIntervalSpec{{Every: outboxGCInterval}}, + } + action := &client.ScheduleWorkflowAction{ + ID: outboxGCWorkflowID, + Workflow: OutboxGCWorkflow, + TaskQueue: string(temporalEnv.Queue()), + WorkflowRunTimeout: 5 * time.Minute, + } + + _, err := sc.Create(ctx, client.ScheduleOptions{ ID: outboxGCScheduleID, Overlap: enums.SCHEDULE_OVERLAP_POLICY_SKIP, - Spec: client.ScheduleSpec{ - Intervals: []client.ScheduleIntervalSpec{{Every: outboxGCInterval}}, - }, - Action: &client.ScheduleWorkflowAction{ - ID: outboxGCWorkflowID, - Workflow: OutboxGCWorkflow, - TaskQueue: string(temporalEnv.Queue()), - WorkflowRunTimeout: 15 * time.Minute, - }, + Spec: spec, + Action: action, }) - if err != nil && !errors.Is(err, temporal.ErrScheduleAlreadyRunning) { + switch { + case errors.Is(err, temporal.ErrScheduleAlreadyRunning): + if err := sc.GetHandle(ctx, outboxGCScheduleID).Update(ctx, client.ScheduleUpdateOptions{ + DoUpdate: func(input client.ScheduleUpdateInput) (*client.ScheduleUpdate, error) { + input.Description.Schedule.Spec = &spec + input.Description.Schedule.Action = action + return &client.ScheduleUpdate{ + Schedule: &input.Description.Schedule, + TypedSearchAttributes: nil, + }, nil + }, + }); err != nil { + return fmt.Errorf("update existing outbox gc schedule: %w", err) + } + case err != nil: return fmt.Errorf("create outbox gc schedule: %w", err) } + return nil } diff --git a/server/internal/background/outbox_gc_test.go b/server/internal/background/outbox_gc_test.go index 7ae1f93dd8..479367a3dc 100644 --- a/server/internal/background/outbox_gc_test.go +++ b/server/internal/background/outbox_gc_test.go @@ -22,13 +22,10 @@ func TestOutboxGCWorkflow_PartialBatch(t *testing.T) { env.RegisterActivityWithOptions( func(_ context.Context, cutoff time.Time, batchSize int32) (int64, error) { gcCallCount++ - if gcCallCount == 1 { - gotCutoff = cutoff - gotBatchSize = batchSize - // Partial batch — fewer rows than limit, workflow should sleep then loop again. - return int64(outboxGCBatchSize) - 1, nil - } - return 0, errStop + gotCutoff = cutoff + gotBatchSize = batchSize + // Partial batch — fewer rows than limit, workflow should return nil immediately. + return int64(outboxGCBatchSize) - 1, nil }, activity.RegisterOptions{Name: "GCOutboxProcessedRows"}, ) @@ -36,13 +33,13 @@ func TestOutboxGCWorkflow_PartialBatch(t *testing.T) { env.ExecuteWorkflow(OutboxGCWorkflow) require.True(t, env.IsWorkflowCompleted()) - require.Error(t, env.GetWorkflowError()) // stopped by sentinel - require.Equal(t, 2, gcCallCount) // first partial batch, second errStop after sleep + require.NoError(t, env.GetWorkflowError()) + require.Equal(t, 1, gcCallCount) require.Equal(t, outboxGCBatchSize, gotBatchSize) require.WithinDuration(t, time.Now().Add(-outboxGCRetentionPeriod), gotCutoff, time.Second) } -func TestOutboxGCWorkflow_FullBatchSkipsSleep(t *testing.T) { +func TestOutboxGCWorkflow_FullBatchContinues(t *testing.T) { t.Parallel() var suite testsuite.WorkflowTestSuite @@ -54,14 +51,37 @@ func TestOutboxGCWorkflow_FullBatchSkipsSleep(t *testing.T) { gcCallCount++ switch gcCallCount { case 1: - // Full batch — workflow must NOT sleep before next poll. + // Full batch — workflow must immediately re-poll without sleeping. return int64(outboxGCBatchSize), nil - case 2: - // Partial batch — workflow sleeps, then loops. - return 0, nil default: - return 0, errStop + // Partial batch — workflow returns nil. + return 0, nil + } + }, + activity.RegisterOptions{Name: "GCOutboxProcessedRows"}, + ) + + env.ExecuteWorkflow(OutboxGCWorkflow) + + require.True(t, env.IsWorkflowCompleted()) + require.NoError(t, env.GetWorkflowError()) + require.Equal(t, 2, gcCallCount) // full batch → immediate re-poll → partial batch → return nil +} + +func TestOutboxGCWorkflow_MultipleFullBatches(t *testing.T) { + t.Parallel() + + var suite testsuite.WorkflowTestSuite + env := suite.NewTestWorkflowEnvironment() + + gcCallCount := 0 + env.RegisterActivityWithOptions( + func(_ context.Context, _ time.Time, _ int32) (int64, error) { + gcCallCount++ + if gcCallCount < 4 { + return int64(outboxGCBatchSize), nil } + return 0, nil }, activity.RegisterOptions{Name: "GCOutboxProcessedRows"}, ) @@ -69,8 +89,8 @@ func TestOutboxGCWorkflow_FullBatchSkipsSleep(t *testing.T) { env.ExecuteWorkflow(OutboxGCWorkflow) require.True(t, env.IsWorkflowCompleted()) - require.Error(t, env.GetWorkflowError()) // stopped by sentinel - require.Equal(t, 3, gcCallCount) // full batch → immediate re-poll → partial batch → sleep → errStop + require.NoError(t, env.GetWorkflowError()) + require.Equal(t, 4, gcCallCount) // 3 full batches → immediate re-poll each time → partial → return nil } func TestOutboxGCWorkflow_ActivityError(t *testing.T) {