Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 35 additions & 20 deletions server/internal/background/outbox_gc.go
Comment thread
disintegrator marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,14 @@ import (
const (
outboxGCScheduleID = "v1:outbox-gc-schedule"
outboxGCWorkflowID = outboxGCScheduleID + "/scheduled"
outboxGCInterval = 6 * time.Hour
outboxGCInterval = 10 * time.Minute
outboxGCRetentionPeriod = 7 * 24 * time.Hour
outboxGCBatchSize int32 = 100
outboxGCSleepInterval = time.Hour
)

func OutboxGCWorkflow(ctx workflow.Context) error {
ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 10 * time.Minute,
StartToCloseTimeout: time.Minute,
RetryPolicy: &temporal.RetryPolicy{
MaximumAttempts: 3,
Comment thread
disintegrator marked this conversation as resolved.
InitialInterval: 5 * time.Second,
Expand All @@ -50,32 +49,48 @@ func OutboxGCWorkflow(ctx workflow.Context) error {

workflow.GetLogger(ctx).Info("outbox gc batch completed", "rows_deleted", rows)

if rows >= int64(outboxGCBatchSize) {
continue // batch was full — more rows likely remain
}

if err := workflow.Sleep(ctx, outboxGCSleepInterval); err != nil {
return err
if rows < int64(outboxGCBatchSize) {
return nil // all rows processed — schedule will re-run at next interval
}
}
}

func AddOutboxGCSchedule(ctx context.Context, temporalEnv *tenv.Environment) error {
_, err := temporalEnv.Client().ScheduleClient().Create(ctx, client.ScheduleOptions{
sc := temporalEnv.Client().ScheduleClient()

spec := client.ScheduleSpec{
Intervals: []client.ScheduleIntervalSpec{{Every: outboxGCInterval}},
}
action := &client.ScheduleWorkflowAction{
ID: outboxGCWorkflowID,
Workflow: OutboxGCWorkflow,
TaskQueue: string(temporalEnv.Queue()),
WorkflowRunTimeout: 5 * time.Minute,
}

_, err := sc.Create(ctx, client.ScheduleOptions{
ID: outboxGCScheduleID,
Overlap: enums.SCHEDULE_OVERLAP_POLICY_SKIP,
Spec: client.ScheduleSpec{
Intervals: []client.ScheduleIntervalSpec{{Every: outboxGCInterval}},
},
Action: &client.ScheduleWorkflowAction{
ID: outboxGCWorkflowID,
Workflow: OutboxGCWorkflow,
TaskQueue: string(temporalEnv.Queue()),
WorkflowRunTimeout: 15 * time.Minute,
},
Spec: spec,
Action: action,
})
if err != nil && !errors.Is(err, temporal.ErrScheduleAlreadyRunning) {
switch {
case errors.Is(err, temporal.ErrScheduleAlreadyRunning):
if err := sc.GetHandle(ctx, outboxGCScheduleID).Update(ctx, client.ScheduleUpdateOptions{
DoUpdate: func(input client.ScheduleUpdateInput) (*client.ScheduleUpdate, error) {
input.Description.Schedule.Spec = &spec
input.Description.Schedule.Action = action
return &client.ScheduleUpdate{
Schedule: &input.Description.Schedule,
TypedSearchAttributes: nil,
}, nil
},
}); err != nil {
return fmt.Errorf("update existing outbox gc schedule: %w", err)
}
case err != nil:
return fmt.Errorf("create outbox gc schedule: %w", err)
}

return nil
}
54 changes: 37 additions & 17 deletions server/internal/background/outbox_gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,24 @@ func TestOutboxGCWorkflow_PartialBatch(t *testing.T) {
env.RegisterActivityWithOptions(
func(_ context.Context, cutoff time.Time, batchSize int32) (int64, error) {
gcCallCount++
if gcCallCount == 1 {
gotCutoff = cutoff
gotBatchSize = batchSize
// Partial batch — fewer rows than limit, workflow should sleep then loop again.
return int64(outboxGCBatchSize) - 1, nil
}
return 0, errStop
gotCutoff = cutoff
gotBatchSize = batchSize
// Partial batch — fewer rows than limit, workflow should return nil immediately.
return int64(outboxGCBatchSize) - 1, nil
},
activity.RegisterOptions{Name: "GCOutboxProcessedRows"},
)

env.ExecuteWorkflow(OutboxGCWorkflow)

require.True(t, env.IsWorkflowCompleted())
require.Error(t, env.GetWorkflowError()) // stopped by sentinel
require.Equal(t, 2, gcCallCount) // first partial batch, second errStop after sleep
require.NoError(t, env.GetWorkflowError())
require.Equal(t, 1, gcCallCount)
require.Equal(t, outboxGCBatchSize, gotBatchSize)
require.WithinDuration(t, time.Now().Add(-outboxGCRetentionPeriod), gotCutoff, time.Second)
}

func TestOutboxGCWorkflow_FullBatchSkipsSleep(t *testing.T) {
func TestOutboxGCWorkflow_FullBatchContinues(t *testing.T) {
t.Parallel()

var suite testsuite.WorkflowTestSuite
Expand All @@ -54,23 +51,46 @@ func TestOutboxGCWorkflow_FullBatchSkipsSleep(t *testing.T) {
gcCallCount++
switch gcCallCount {
case 1:
// Full batch — workflow must NOT sleep before next poll.
// Full batch — workflow must immediately re-poll without sleeping.
return int64(outboxGCBatchSize), nil
case 2:
// Partial batch — workflow sleeps, then loops.
return 0, nil
default:
return 0, errStop
// Partial batch — workflow returns nil.
return 0, nil
}
},
activity.RegisterOptions{Name: "GCOutboxProcessedRows"},
)

env.ExecuteWorkflow(OutboxGCWorkflow)

require.True(t, env.IsWorkflowCompleted())
require.NoError(t, env.GetWorkflowError())
require.Equal(t, 2, gcCallCount) // full batch → immediate re-poll → partial batch → return nil
}

func TestOutboxGCWorkflow_MultipleFullBatches(t *testing.T) {
t.Parallel()

var suite testsuite.WorkflowTestSuite
env := suite.NewTestWorkflowEnvironment()

gcCallCount := 0
env.RegisterActivityWithOptions(
func(_ context.Context, _ time.Time, _ int32) (int64, error) {
gcCallCount++
if gcCallCount < 4 {
return int64(outboxGCBatchSize), nil
}
return 0, nil
},
activity.RegisterOptions{Name: "GCOutboxProcessedRows"},
)

env.ExecuteWorkflow(OutboxGCWorkflow)

require.True(t, env.IsWorkflowCompleted())
require.Error(t, env.GetWorkflowError()) // stopped by sentinel
require.Equal(t, 3, gcCallCount) // full batch → immediate re-poll → partial batch → sleeperrStop
require.NoError(t, env.GetWorkflowError())
require.Equal(t, 4, gcCallCount) // 3 full batches → immediate re-poll each time → partialreturn nil
}

func TestOutboxGCWorkflow_ActivityError(t *testing.T) {
Expand Down
Loading