From ac7b7a97cfeed53ba95b462a1f38582ba487981b Mon Sep 17 00:00:00 2001 From: Tomas Valenta Date: Fri, 29 May 2026 14:24:12 -0700 Subject: [PATCH 01/13] perf(orchestrator): add memfile dedup density threshold --- .../orchestrator/pkg/sandbox/block/cache.go | 51 +++++++++++++++---- .../pkg/sandbox/block/cache_test.go | 45 +++++++++++++--- .../orchestrator/pkg/sandbox/block/memfd.go | 8 ++- .../pkg/sandbox/block/memfd_test.go | 2 +- .../orchestrator/pkg/sandbox/fc/memory.go | 5 +- packages/orchestrator/pkg/sandbox/sandbox.go | 6 ++- packages/shared/pkg/featureflags/flags.go | 12 +++-- 7 files changed, 103 insertions(+), 26 deletions(-) diff --git a/packages/orchestrator/pkg/sandbox/block/cache.go b/packages/orchestrator/pkg/sandbox/block/cache.go index 5563c7e30e..0bd1bd753d 100644 --- a/packages/orchestrator/pkg/sandbox/block/cache.go +++ b/packages/orchestrator/pkg/sandbox/block/cache.go @@ -207,9 +207,11 @@ func (c *Cache) ExportToDiff(ctx context.Context, out *os.File) (*header.DiffMet } type dedupPlan struct { - pageDirty *roaring.Bitmap - pageEmpty *roaring.Bitmap - exportedSize int64 + pageDirty *roaring.Bitmap + pageEmpty *roaring.Bitmap + exportedSize int64 + promotedBlocks int64 + promotedPages int64 } // dedupCompare classifies each dirty page against base into pageDirty or @@ -222,10 +224,13 @@ func dedupCompare( dirty *roaring.Bitmap, blockSize int64, bestEffort bool, + minChangedPagesPerBlock int, ) (*dedupPlan, error) { pageDirty := roaring.New() pageEmpty := roaring.New() var exportedSize int64 + var promotedBlocks int64 + var promotedPages int64 baseHeader := base.Header() peeker, _ := base.(CachePeeker) @@ -244,13 +249,16 @@ func dedupCompare( return nil, err } + blockDirty := roaring.New() + blockEmpty := roaring.New() + for i := int64(0); i < blockSize; i += header.PageSize { srcPage := srcBuf[i : i+header.PageSize] pageIdx := uint32((absOff + i) / header.PageSize) pageOff := absOff + i if header.IsZero(srcPage) { - pageEmpty.Add(pageIdx) + blockEmpty.Add(pageIdx) continue } @@ -267,7 +275,7 @@ func dedupCompare( skip = true } if skip { - pageDirty.Add(pageIdx) + blockDirty.Add(pageIdx) continue } @@ -280,12 +288,32 @@ func dedupCompare( continue } - pageDirty.Add(pageIdx) + blockDirty.Add(pageIdx) + } + + changedPages := int(blockDirty.GetCardinality()) + if minChangedPagesPerBlock > 0 && changedPages > 0 && changedPages <= minChangedPagesPerBlock { + startPage := uint64(absOff / header.PageSize) + endPage := startPage + uint64(blockSize/header.PageSize) + pageDirty.AddRange(startPage, endPage) + promotedBlocks++ + promotedPages += int64(endPage - startPage) + + continue } + + pageDirty.Or(blockDirty) + pageEmpty.Or(blockEmpty) } } - return &dedupPlan{pageDirty: pageDirty, pageEmpty: pageEmpty, exportedSize: exportedSize}, nil + return &dedupPlan{ + pageDirty: pageDirty, + pageEmpty: pageEmpty, + exportedSize: exportedSize, + promotedBlocks: promotedBlocks, + promotedPages: promotedPages, + }, nil } // dedupDrain writes pageDirty pages from src to outPath packed at PageSize. @@ -334,7 +362,7 @@ func dedupDrain( return cache, nil } -func recordDedupAttrs(ctx context.Context, totalPages, uniquePages, emptyPages int64, compareDur, writeDur time.Duration) { +func recordDedupAttrs(ctx context.Context, totalPages, uniquePages, emptyPages, promotedBlocks, promotedPages int64, compareDur, writeDur time.Duration) { dedupedPages := totalPages - uniquePages - emptyPages ratio := 0.0 if totalPages > 0 { @@ -345,6 +373,8 @@ func recordDedupAttrs(ctx context.Context, totalPages, uniquePages, emptyPages i attribute.Int64("dedup.deduped_pages", dedupedPages), attribute.Int64("dedup.unique_pages", uniquePages), attribute.Int64("dedup.empty_pages", emptyPages), + attribute.Int64("dedup.promoted_blocks", promotedBlocks), + attribute.Int64("dedup.promoted_pages", promotedPages), attribute.Float64("dedup.ratio", ratio), attribute.Int64("dedup.compare_ms", compareDur.Milliseconds()), attribute.Int64("dedup.write_ms", writeDur.Milliseconds()), @@ -402,6 +432,7 @@ func (c *Cache) Dedup( outPath string, bestEffort bool, directIO bool, + minChangedPagesPerBlock int, ) (*Cache, *header.DiffMetadata, error) { ctx, span := tracer.Start(ctx, "dedup-pages") defer span.End() @@ -425,7 +456,7 @@ func (c *Cache) Dedup( } compareStart := time.Now() - plan, err := dedupCompare(ctx, src, base, dirty, blockSize, bestEffort) + plan, err := dedupCompare(ctx, src, base, dirty, blockSize, bestEffort, minChangedPagesPerBlock) if err != nil { return nil, nil, err } @@ -440,6 +471,8 @@ func (c *Cache) Dedup( plan.exportedSize/header.PageSize, int64(plan.pageDirty.GetCardinality()), int64(plan.pageEmpty.GetCardinality()), + plan.promotedBlocks, + plan.promotedPages, compareDur, time.Since(writeStart), ) diff --git a/packages/orchestrator/pkg/sandbox/block/cache_test.go b/packages/orchestrator/pkg/sandbox/block/cache_test.go index e72e5a35b7..62b631368d 100644 --- a/packages/orchestrator/pkg/sandbox/block/cache_test.go +++ b/packages/orchestrator/pkg/sandbox/block/cache_test.go @@ -786,7 +786,7 @@ func runDedup(t *testing.T, srcMem, baseMem []byte, dirty *roaring.Bitmap, block t.Helper() src := buildPackedSrcCache(t, srcMem, dirty, blockSize) - cache, meta, err := src.Dedup(t.Context(), &fakeOriginalDevice{data: baseMem}, dirty, blockSize, t.TempDir()+"/dedup", false, false) + cache, meta, err := src.Dedup(t.Context(), &fakeOriginalDevice{data: baseMem}, dirty, blockSize, t.TempDir()+"/dedup", false, false, 0) require.NoError(t, err) t.Cleanup(func() { _ = cache.Close() }) @@ -908,7 +908,7 @@ func TestCacheDedup_ContextCancellation(t *testing.T) { ctx, cancel := context.WithCancel(t.Context()) cancel() - _, _, err = src.Dedup(ctx, &fakeOriginalDevice{data: data}, dirty, blockSize, t.TempDir()+"/dedup", false, false) + _, _, err = src.Dedup(ctx, &fakeOriginalDevice{data: data}, dirty, blockSize, t.TempDir()+"/dedup", false, false, 0) require.ErrorIs(t, err, context.Canceled) } @@ -932,6 +932,7 @@ func TestCacheDedup_OriginalMemfileReadError(t *testing.T) { t.TempDir()+"/dedup", false, false, + 0, ) require.ErrorIs(t, err, sentinel) } @@ -965,7 +966,7 @@ func TestCacheDedup_EmptyParentMappingSkipsBaseReadAt(t *testing.T) { junk := bytes.Repeat([]byte{0xFF}, int(size)) base := &fakeOriginalDevice{data: junk, hdr: hdr} - cache, meta, err := src.Dedup(t.Context(), base, dirty, blockSize, t.TempDir()+"/dedup", false, false) + cache, meta, err := src.Dedup(t.Context(), base, dirty, blockSize, t.TempDir()+"/dedup", false, false, 0) require.NoError(t, err) t.Cleanup(func() { _ = cache.Close() }) @@ -1058,7 +1059,7 @@ func TestCacheDedup_BestEffortUncachedSkipsBaseReadAt(t *testing.T) { src := buildPackedSrcCache(t, srcData, dirty, blockSize) base := &peekingOriginalDevice{fakeOriginalDevice: fakeOriginalDevice{data: baseData}, cached: false} - cache, meta, err := src.Dedup(t.Context(), base, dirty, blockSize, t.TempDir()+"/dedup", true, false) + cache, meta, err := src.Dedup(t.Context(), base, dirty, blockSize, t.TempDir()+"/dedup", true, false, 0) require.NoError(t, err) t.Cleanup(func() { _ = cache.Close() }) @@ -1090,7 +1091,7 @@ func TestCacheDedup_BestEffortCachedMatchesNormalPath(t *testing.T) { src := buildPackedSrcCache(t, srcData, dirty, blockSize) base := &peekingOriginalDevice{fakeOriginalDevice: fakeOriginalDevice{data: baseData}, cached: true} - cache, meta, err := src.Dedup(t.Context(), base, dirty, blockSize, t.TempDir()+"/dedup", true, false) + cache, meta, err := src.Dedup(t.Context(), base, dirty, blockSize, t.TempDir()+"/dedup", true, false, 0) require.NoError(t, err) t.Cleanup(func() { _ = cache.Close() }) @@ -1098,6 +1099,38 @@ func TestCacheDedup_BestEffortCachedMatchesNormalPath(t *testing.T) { require.EqualValues(t, 0, meta.Empty.GetCardinality()) } +func TestCacheDedup_MinChangedPagesPromotesSparseBlock(t *testing.T) { + t.Parallel() + + pageSize := int64(header.PageSize) + blockSize := 4 * pageSize + size := blockSize + + baseData := make([]byte, size) + _, err := rand.Read(baseData) + require.NoError(t, err) + srcData := make([]byte, size) + copy(srcData, baseData) + srcData[pageSize] ^= 0xFF + + dirty := fullDirty(size, blockSize) + src := buildPackedSrcCache(t, srcData, dirty, blockSize) + + cache, meta, err := src.Dedup(t.Context(), &fakeOriginalDevice{data: baseData}, dirty, blockSize, t.TempDir()+"/dedup", false, false, 1) + require.NoError(t, err) + t.Cleanup(func() { _ = cache.Close() }) + + require.EqualValues(t, blockSize/pageSize, meta.Dirty.GetCardinality()) + require.Zero(t, meta.Empty.GetCardinality()) + + for i := range blockSize / pageSize { + got := make([]byte, pageSize) + _, err := cache.ReadAt(got, i*pageSize) + require.NoError(t, err) + require.Equal(t, srcData[i*pageSize:(i+1)*pageSize], got, "promoted page %d", i) + } +} + type perPagePeeker struct { fakeOriginalDevice @@ -1132,7 +1165,7 @@ func TestCacheDedup_BestEffortPerPageCacheCheck(t *testing.T) { cachedPages: map[uint32]bool{0: true, 1: false, 2: true, 3: false}, } - cache, meta, err := src.Dedup(t.Context(), base, dirty, blockSize, t.TempDir()+"/dedup", true, false) + cache, meta, err := src.Dedup(t.Context(), base, dirty, blockSize, t.TempDir()+"/dedup", true, false, 0) require.NoError(t, err) t.Cleanup(func() { _ = cache.Close() }) diff --git a/packages/orchestrator/pkg/sandbox/block/memfd.go b/packages/orchestrator/pkg/sandbox/block/memfd.go index dce7b371da..4c562697a0 100644 --- a/packages/orchestrator/pkg/sandbox/block/memfd.go +++ b/packages/orchestrator/pkg/sandbox/block/memfd.go @@ -246,6 +246,7 @@ func NewCacheFromMemfdDeduped( dirty *roaring.Bitmap, bestEffort bool, directIO bool, + minChangedPagesPerBlock int, inputEmpty *roaring.Bitmap, metaOut *utils.SetOnce[*header.DiffMetadata], ) (*DedupedMemfdCache, error) { @@ -258,7 +259,7 @@ func NewCacheFromMemfdDeduped( cancel: cancel, done: utils.NewSetOnce[*Cache](), } - go d.runDedup(drainCtx, base, blockSize, memfd, dirty, bestEffort, directIO, inputEmpty, metaOut) + go d.runDedup(drainCtx, base, blockSize, memfd, dirty, bestEffort, directIO, minChangedPagesPerBlock, inputEmpty, metaOut) return d, nil } @@ -270,6 +271,7 @@ func (d *DedupedMemfdCache) runDedup( memfd *Memfd, dirty *roaring.Bitmap, bestEffort, directIO bool, + minChangedPagesPerBlock int, inputEmpty *roaring.Bitmap, metaOut *utils.SetOnce[*header.DiffMetadata], ) { @@ -279,7 +281,7 @@ func (d *DedupedMemfdCache) runDedup( src := func(absOff int64) ([]byte, error) { return memfd.Slice(absOff, blockSize) } compareStart := time.Now() - plan, err := dedupCompare(ctx, src, base, dirty, blockSize, bestEffort) + plan, err := dedupCompare(ctx, src, base, dirty, blockSize, bestEffort, minChangedPagesPerBlock) compareDur := time.Since(compareStart) if err != nil { logSetOnceErr(ctx, "dedup metaOut", metaOut.SetError(err)) @@ -308,6 +310,8 @@ func (d *DedupedMemfdCache) runDedup( plan.exportedSize/header.PageSize, int64(plan.pageDirty.GetCardinality()), int64(plan.pageEmpty.GetCardinality()), + plan.promotedBlocks, + plan.promotedPages, compareDur, writeDur, ) logSetOnceErr(ctx, "dedup done", d.done.SetResult(cache, err)) diff --git a/packages/orchestrator/pkg/sandbox/block/memfd_test.go b/packages/orchestrator/pkg/sandbox/block/memfd_test.go index d11a9c2d5b..3e8317b75e 100644 --- a/packages/orchestrator/pkg/sandbox/block/memfd_test.go +++ b/packages/orchestrator/pkg/sandbox/block/memfd_test.go @@ -227,7 +227,7 @@ func TestNewCacheFromMemfdDeduped_DetachesCompareAndDrain(t *testing.T) { metaOut := utils.NewSetOnce[*header.DiffMetadata]() cache, err := NewCacheFromMemfdDeduped( ctx, &fakeOriginalDevice{data: baseData}, pageSize, cachePath, memfd, dirty, false, false, - nil, metaOut, + 0, nil, metaOut, ) require.NoError(t, err) t.Cleanup(func() { _ = cache.Close() }) diff --git a/packages/orchestrator/pkg/sandbox/fc/memory.go b/packages/orchestrator/pkg/sandbox/fc/memory.go index 6b2b25df1d..6c41d5bd00 100644 --- a/packages/orchestrator/pkg/sandbox/fc/memory.go +++ b/packages/orchestrator/pkg/sandbox/fc/memory.go @@ -80,6 +80,7 @@ func (p *Process) ExportMemory( originalMemfile block.ReadonlyDevice, dedupBestEffort bool, dedupDirectIO bool, + dedupMinChangedPagesPerBlock int, inputEmpty *roaring.Bitmap, metaOut *utils.SetOnce[*header.DiffMetadata], ) (_ block.DiffSource, e error) { @@ -98,7 +99,7 @@ func (p *Process) ExportMemory( if memfd != nil { if originalMemfile != nil { return block.NewCacheFromMemfdDeduped(ctx, originalMemfile, blockSize, cachePath, memfd, include, - dedupBestEffort, dedupDirectIO, inputEmpty, metaOut) + dedupBestEffort, dedupDirectIO, dedupMinChangedPagesPerBlock, inputEmpty, metaOut) } var ( src block.DiffSource @@ -131,7 +132,7 @@ func (p *Process) ExportMemory( return cache, nil } // .dedup suffix avoids clobbering the source mmap during truncate. - dedupCache, meta, err := cache.Dedup(ctx, originalMemfile, include, blockSize, cachePath+".dedup", dedupBestEffort, dedupDirectIO) + dedupCache, meta, err := cache.Dedup(ctx, originalMemfile, include, blockSize, cachePath+".dedup", dedupBestEffort, dedupDirectIO, dedupMinChangedPagesPerBlock) if err != nil { return nil, fmt.Errorf("dedup memfile diff: %w", errors.Join(err, cache.Close())) } diff --git a/packages/orchestrator/pkg/sandbox/sandbox.go b/packages/orchestrator/pkg/sandbox/sandbox.go index f57fa26bd4..ad43c52b08 100644 --- a/packages/orchestrator/pkg/sandbox/sandbox.go +++ b/packages/orchestrator/pkg/sandbox/sandbox.go @@ -1146,11 +1146,13 @@ func (s *Sandbox) Pause( // Start POSTPROCESSING var dedupBase block.ReadonlyDevice var dedupBestEffort, dedupDirectIO bool + var dedupMinChangedPagesPerBlock int dedupCfg := s.featureFlags.JSONFlag(ctx, featureflags.MemfileDiffDedupFlag, sandboxLDContext(s.Runtime, s.Config)).AsValueMap() if dedupCfg.Get("enabled").BoolValue() { dedupBase = originalMemfile dedupBestEffort = dedupCfg.Get("bestEffort").BoolValue() dedupDirectIO = dedupCfg.Get("directIO").BoolValue() + dedupMinChangedPagesPerBlock = dedupCfg.Get("minChangedPagesPerBlock").IntValue() } memfileDiff, memfileDiffHeader, err := pauseProcessMemory( ctx, @@ -1164,6 +1166,7 @@ func (s *Sandbox) Pause( dedupBase, dedupBestEffort, dedupDirectIO, + dedupMinChangedPagesPerBlock, ) if err != nil { return nil, fmt.Errorf("error while post processing: %w", err) @@ -1237,6 +1240,7 @@ func pauseProcessMemory( originalMemfile block.ReadonlyDevice, dedupBestEffort bool, dedupDirectIO bool, + dedupMinChangedPagesPerBlock int, ) (d build.Diff, h *DiffHeader, e error) { ctx, span := tracer.Start(ctx, "process-memory") defer span.End() @@ -1246,7 +1250,7 @@ func pauseProcessMemory( // ExportMemory owns memfd and closes it on all paths. cache, err := fc.ExportMemory( ctx, diffMetadata.Dirty, memfileDiffPath, diffMetadata.BlockSize, memfd, bgCopy, - originalMemfile, dedupBestEffort, dedupDirectIO, diffMetadata.Empty, metaOut, + originalMemfile, dedupBestEffort, dedupDirectIO, dedupMinChangedPagesPerBlock, diffMetadata.Empty, metaOut, ) if err != nil { return nil, nil, fmt.Errorf("failed to export memory: %w", err) diff --git a/packages/shared/pkg/featureflags/flags.go b/packages/shared/pkg/featureflags/flags.go index 068308e811..bf11c81676 100644 --- a/packages/shared/pkg/featureflags/flags.go +++ b/packages/shared/pkg/featureflags/flags.go @@ -140,12 +140,14 @@ var ( MemfdBackgroundCopyFlag = NewBoolFlag("memfd-background-copy", false) // MemfileDiffDedupFlag enables 4 KiB-page dedup of the memfile diff - // against the base memfile. bestEffort skips uncached blocks; - // directIO opens the dedup output with O_DIRECT. + // against the base memfile. bestEffort skips uncached blocks; directIO + // opens the dedup output with O_DIRECT; minChangedPagesPerBlock stores + // sparsely changed dirty blocks whole instead of emitting tiny mappings. MemfileDiffDedupFlag = NewJSONFlag("memfile-diff-dedup", ldvalue.FromJSONMarshal(map[string]any{ - "enabled": false, - "bestEffort": false, - "directIO": false, + "enabled": false, + "bestEffort": false, + "directIO": false, + "minChangedPagesPerBlock": 0, })) // PeerToPeerChunkTransferFlag enables peer-to-peer chunk routing. From 45404d10abc8f20e91b6d6d62076babf8504e1ab Mon Sep 17 00:00:00 2001 From: ValentaTomas Date: Fri, 29 May 2026 16:08:00 -0700 Subject: [PATCH 02/13] perf(orch): compact dedup fetch runs within memory blocks --- .../orchestrator/pkg/sandbox/block/cache.go | 186 +++++++++++++++--- .../pkg/sandbox/block/cache_test.go | 28 +-- .../orchestrator/pkg/sandbox/block/memfd.go | 12 +- .../pkg/sandbox/block/memfd_test.go | 2 +- .../orchestrator/pkg/sandbox/fc/memory.go | 8 +- packages/orchestrator/pkg/sandbox/sandbox.go | 16 +- packages/shared/pkg/featureflags/flags.go | 13 +- 7 files changed, 207 insertions(+), 58 deletions(-) diff --git a/packages/orchestrator/pkg/sandbox/block/cache.go b/packages/orchestrator/pkg/sandbox/block/cache.go index 0bd1bd753d..f55f0e97be 100644 --- a/packages/orchestrator/pkg/sandbox/block/cache.go +++ b/packages/orchestrator/pkg/sandbox/block/cache.go @@ -11,6 +11,7 @@ import ( "math" "math/rand" "os" + "slices" "sync" "sync/atomic" "syscall" @@ -214,6 +215,32 @@ type dedupPlan struct { promotedPages int64 } +const ( + dedupPageEmpty byte = iota + dedupPageParent + dedupPageCurrent +) + +const ( + currentFetchSource byte = iota + 1 + parentFetchSource +) + +const ( + defaultDedupFetchWindowPages = 2 << 20 / header.PageSize +) + +type dedupFetchKey struct { + source byte + build uuid.UUID + window uint64 +} + +type dedupPageInfo struct { + kind byte + key dedupFetchKey +} + // dedupCompare classifies each dirty page against base into pageDirty or // pageEmpty. Per-page IsCached so a single uncached neighbour can't poison // cached pages of the same block when the parent header is page-granular. @@ -224,13 +251,20 @@ func dedupCompare( dirty *roaring.Bitmap, blockSize int64, bestEffort bool, - minChangedPagesPerBlock int, + maxFetchWindowsPerBlock int, + maxPromotedParentPagesPerBlock int, + fetchWindowPages int, ) (*dedupPlan, error) { pageDirty := roaring.New() pageEmpty := roaring.New() var exportedSize int64 var promotedBlocks int64 var promotedPages int64 + var currentStoredPages int64 + + if fetchWindowPages <= 0 { + fetchWindowPages = defaultDedupFetchWindowPages + } baseHeader := base.Header() peeker, _ := base.(CachePeeker) @@ -249,33 +283,33 @@ func dedupCompare( return nil, err } - blockDirty := roaring.New() - blockEmpty := roaring.New() + pagesPerBlock := int(blockSize / header.PageSize) + blockPages := make([]dedupPageInfo, pagesPerBlock) - for i := int64(0); i < blockSize; i += header.PageSize { + for page := range pagesPerBlock { + i := int64(page) * header.PageSize srcPage := srcBuf[i : i+header.PageSize] - pageIdx := uint32((absOff + i) / header.PageSize) pageOff := absOff + i if header.IsZero(srcPage) { - blockEmpty.Add(pageIdx) - continue } - skip := false + var mapped header.BuildMap + hasMapping := false if baseHeader != nil { if m, err := baseHeader.GetShiftedMapping(ctx, pageOff); err == nil { + mapped = m + hasMapping = true if m.BuildId == uuid.Nil && int64(m.Length) >= header.PageSize { - skip = true + blockPages[page].kind = dedupPageCurrent + + continue } } } - if !skip && bestEffort && peeker != nil && !peeker.IsCached(ctx, pageOff, header.PageSize) { - skip = true - } - if skip { - blockDirty.Add(pageIdx) + if bestEffort && peeker != nil && !peeker.IsCached(ctx, pageOff, header.PageSize) { + blockPages[page].kind = dedupPageCurrent continue } @@ -285,25 +319,35 @@ func dedupCompare( return nil, fmt.Errorf("slice base at %d: %w", pageOff, sErr) } if bytes.Equal(srcPage, basePage) { + blockPages[page].kind = dedupPageParent + if hasMapping { + blockPages[page].key = dedupFetchKey{source: parentFetchSource, build: mapped.BuildId, window: mapped.Offset / uint64(fetchWindowPages*header.PageSize)} + } else { + blockPages[page].key = dedupFetchKey{source: parentFetchSource, window: uint64(pageOff) / uint64(fetchWindowPages*header.PageSize)} + } + continue } - blockDirty.Add(pageIdx) + blockPages[page].kind = dedupPageCurrent } - changedPages := int(blockDirty.GetCardinality()) - if minChangedPagesPerBlock > 0 && changedPages > 0 && changedPages <= minChangedPagesPerBlock { - startPage := uint64(absOff / header.PageSize) - endPage := startPage + uint64(blockSize/header.PageSize) - pageDirty.AddRange(startPage, endPage) + promoted := compactDedupFetchWindows(blockPages, maxFetchWindowsPerBlock, maxPromotedParentPagesPerBlock, fetchWindowPages, currentStoredPages) + if promoted > 0 { promotedBlocks++ - promotedPages += int64(endPage - startPage) - - continue + promotedPages += int64(promoted) } - pageDirty.Or(blockDirty) - pageEmpty.Or(blockEmpty) + for page, info := range blockPages { + pageIdx := uint32(absOff/header.PageSize) + uint32(page) + switch info.kind { + case dedupPageEmpty: + pageEmpty.Add(pageIdx) + case dedupPageCurrent: + pageDirty.Add(pageIdx) + currentStoredPages++ + } + } } } @@ -316,6 +360,92 @@ func dedupCompare( }, nil } +func compactDedupFetchWindows(pages []dedupPageInfo, maxWindows, maxPromoted, windowPages int, currentStart int64) int { + if maxWindows <= 0 || maxPromoted <= 0 { + return 0 + } + + var promoted int + for promoted < maxPromoted && countFetchWindows(pages, windowPages, currentStart) > maxWindows { + start, end := bestParentRunToPromote(pages, maxPromoted-promoted, windowPages, currentStart) + if start < 0 { + break + } + for i := start; i < end; i++ { + if pages[i].kind == dedupPageParent { + pages[i].kind = dedupPageCurrent + pages[i].key = dedupFetchKey{} + promoted++ + } + } + } + + return promoted +} + +func countFetchWindows(pages []dedupPageInfo, windowPages int, currentStart int64) int { + keys := make(map[dedupFetchKey]struct{}) + var currentOrdinal int64 + for _, p := range pages { + switch p.kind { + case dedupPageParent: + keys[p.key] = struct{}{} + case dedupPageCurrent: + keys[dedupFetchKey{source: currentFetchSource, window: uint64(currentStart+currentOrdinal) / uint64(windowPages)}] = struct{}{} + currentOrdinal++ + } + } + + return len(keys) +} + +func bestParentRunToPromote(pages []dedupPageInfo, budget, windowPages int, currentStart int64) (int, int) { + before := countFetchWindows(pages, windowPages, currentStart) + bestStart, bestEnd, bestBenefit, bestCost := -1, -1, 0, 0 + for i := 0; i < len(pages); { + for i < len(pages) && pages[i].kind != dedupPageParent { + i++ + } + start := i + for i < len(pages) && (pages[i].kind == dedupPageParent || pages[i].kind == dedupPageEmpty) { + i++ + } + end := i + cost := countKind(pages[start:end], dedupPageParent) + if cost == 0 || cost > budget { + continue + } + + candidate := slices.Clone(pages) + for j := start; j < end; j++ { + if candidate[j].kind == dedupPageParent { + candidate[j].kind = dedupPageCurrent + candidate[j].key = dedupFetchKey{} + } + } + benefit := before - countFetchWindows(candidate, windowPages, currentStart) + if benefit <= 0 { + continue + } + if bestStart < 0 || cost*bestBenefit < bestCost*benefit { + bestStart, bestEnd, bestBenefit, bestCost = start, end, benefit, cost + } + } + + return bestStart, bestEnd +} + +func countKind(pages []dedupPageInfo, kind byte) int { + var n int + for _, p := range pages { + if p.kind == kind { + n++ + } + } + + return n +} + // dedupDrain writes pageDirty pages from src to outPath packed at PageSize. func dedupDrain( ctx context.Context, @@ -432,7 +562,9 @@ func (c *Cache) Dedup( outPath string, bestEffort bool, directIO bool, - minChangedPagesPerBlock int, + maxFetchWindowsPerBlock int, + maxPromotedParentPagesPerBlock int, + fetchRunWindowPages int, ) (*Cache, *header.DiffMetadata, error) { ctx, span := tracer.Start(ctx, "dedup-pages") defer span.End() @@ -456,7 +588,7 @@ func (c *Cache) Dedup( } compareStart := time.Now() - plan, err := dedupCompare(ctx, src, base, dirty, blockSize, bestEffort, minChangedPagesPerBlock) + plan, err := dedupCompare(ctx, src, base, dirty, blockSize, bestEffort, maxFetchWindowsPerBlock, maxPromotedParentPagesPerBlock, fetchRunWindowPages) if err != nil { return nil, nil, err } diff --git a/packages/orchestrator/pkg/sandbox/block/cache_test.go b/packages/orchestrator/pkg/sandbox/block/cache_test.go index 62b631368d..a9387d29d7 100644 --- a/packages/orchestrator/pkg/sandbox/block/cache_test.go +++ b/packages/orchestrator/pkg/sandbox/block/cache_test.go @@ -786,7 +786,7 @@ func runDedup(t *testing.T, srcMem, baseMem []byte, dirty *roaring.Bitmap, block t.Helper() src := buildPackedSrcCache(t, srcMem, dirty, blockSize) - cache, meta, err := src.Dedup(t.Context(), &fakeOriginalDevice{data: baseMem}, dirty, blockSize, t.TempDir()+"/dedup", false, false, 0) + cache, meta, err := src.Dedup(t.Context(), &fakeOriginalDevice{data: baseMem}, dirty, blockSize, t.TempDir()+"/dedup", false, false, 0, 0, 0) require.NoError(t, err) t.Cleanup(func() { _ = cache.Close() }) @@ -908,7 +908,7 @@ func TestCacheDedup_ContextCancellation(t *testing.T) { ctx, cancel := context.WithCancel(t.Context()) cancel() - _, _, err = src.Dedup(ctx, &fakeOriginalDevice{data: data}, dirty, blockSize, t.TempDir()+"/dedup", false, false, 0) + _, _, err = src.Dedup(ctx, &fakeOriginalDevice{data: data}, dirty, blockSize, t.TempDir()+"/dedup", false, false, 0, 0, 0) require.ErrorIs(t, err, context.Canceled) } @@ -933,6 +933,8 @@ func TestCacheDedup_OriginalMemfileReadError(t *testing.T) { false, false, 0, + 0, + 0, ) require.ErrorIs(t, err, sentinel) } @@ -966,7 +968,7 @@ func TestCacheDedup_EmptyParentMappingSkipsBaseReadAt(t *testing.T) { junk := bytes.Repeat([]byte{0xFF}, int(size)) base := &fakeOriginalDevice{data: junk, hdr: hdr} - cache, meta, err := src.Dedup(t.Context(), base, dirty, blockSize, t.TempDir()+"/dedup", false, false, 0) + cache, meta, err := src.Dedup(t.Context(), base, dirty, blockSize, t.TempDir()+"/dedup", false, false, 0, 0, 0) require.NoError(t, err) t.Cleanup(func() { _ = cache.Close() }) @@ -1059,7 +1061,7 @@ func TestCacheDedup_BestEffortUncachedSkipsBaseReadAt(t *testing.T) { src := buildPackedSrcCache(t, srcData, dirty, blockSize) base := &peekingOriginalDevice{fakeOriginalDevice: fakeOriginalDevice{data: baseData}, cached: false} - cache, meta, err := src.Dedup(t.Context(), base, dirty, blockSize, t.TempDir()+"/dedup", true, false, 0) + cache, meta, err := src.Dedup(t.Context(), base, dirty, blockSize, t.TempDir()+"/dedup", true, false, 0, 0, 0) require.NoError(t, err) t.Cleanup(func() { _ = cache.Close() }) @@ -1091,7 +1093,7 @@ func TestCacheDedup_BestEffortCachedMatchesNormalPath(t *testing.T) { src := buildPackedSrcCache(t, srcData, dirty, blockSize) base := &peekingOriginalDevice{fakeOriginalDevice: fakeOriginalDevice{data: baseData}, cached: true} - cache, meta, err := src.Dedup(t.Context(), base, dirty, blockSize, t.TempDir()+"/dedup", true, false, 0) + cache, meta, err := src.Dedup(t.Context(), base, dirty, blockSize, t.TempDir()+"/dedup", true, false, 0, 0, 0) require.NoError(t, err) t.Cleanup(func() { _ = cache.Close() }) @@ -1099,7 +1101,7 @@ func TestCacheDedup_BestEffortCachedMatchesNormalPath(t *testing.T) { require.EqualValues(t, 0, meta.Empty.GetCardinality()) } -func TestCacheDedup_MinChangedPagesPromotesSparseBlock(t *testing.T) { +func TestCacheDedup_FetchRunBudgetPromotesSmallParentRun(t *testing.T) { t.Parallel() pageSize := int64(header.PageSize) @@ -1111,19 +1113,21 @@ func TestCacheDedup_MinChangedPagesPromotesSparseBlock(t *testing.T) { require.NoError(t, err) srcData := make([]byte, size) copy(srcData, baseData) - srcData[pageSize] ^= 0xFF + srcData[0] ^= 0xFF + srcData[2*pageSize] ^= 0xFF + clear(srcData[3*pageSize : 4*pageSize]) dirty := fullDirty(size, blockSize) src := buildPackedSrcCache(t, srcData, dirty, blockSize) - cache, meta, err := src.Dedup(t.Context(), &fakeOriginalDevice{data: baseData}, dirty, blockSize, t.TempDir()+"/dedup", false, false, 1) + cache, meta, err := src.Dedup(t.Context(), &fakeOriginalDevice{data: baseData}, dirty, blockSize, t.TempDir()+"/dedup", false, false, 1, 1, 4) require.NoError(t, err) t.Cleanup(func() { _ = cache.Close() }) - require.EqualValues(t, blockSize/pageSize, meta.Dirty.GetCardinality()) - require.Zero(t, meta.Empty.GetCardinality()) + require.EqualValues(t, 3, meta.Dirty.GetCardinality()) + require.EqualValues(t, 1, meta.Empty.GetCardinality()) - for i := range blockSize / pageSize { + for _, i := range []int64{0, 1, 2} { got := make([]byte, pageSize) _, err := cache.ReadAt(got, i*pageSize) require.NoError(t, err) @@ -1165,7 +1169,7 @@ func TestCacheDedup_BestEffortPerPageCacheCheck(t *testing.T) { cachedPages: map[uint32]bool{0: true, 1: false, 2: true, 3: false}, } - cache, meta, err := src.Dedup(t.Context(), base, dirty, blockSize, t.TempDir()+"/dedup", true, false, 0) + cache, meta, err := src.Dedup(t.Context(), base, dirty, blockSize, t.TempDir()+"/dedup", true, false, 0, 0, 0) require.NoError(t, err) t.Cleanup(func() { _ = cache.Close() }) diff --git a/packages/orchestrator/pkg/sandbox/block/memfd.go b/packages/orchestrator/pkg/sandbox/block/memfd.go index 4c562697a0..ba6be49463 100644 --- a/packages/orchestrator/pkg/sandbox/block/memfd.go +++ b/packages/orchestrator/pkg/sandbox/block/memfd.go @@ -246,7 +246,9 @@ func NewCacheFromMemfdDeduped( dirty *roaring.Bitmap, bestEffort bool, directIO bool, - minChangedPagesPerBlock int, + maxFetchWindowsPerBlock int, + maxPromotedParentPagesPerBlock int, + fetchRunWindowPages int, inputEmpty *roaring.Bitmap, metaOut *utils.SetOnce[*header.DiffMetadata], ) (*DedupedMemfdCache, error) { @@ -259,7 +261,7 @@ func NewCacheFromMemfdDeduped( cancel: cancel, done: utils.NewSetOnce[*Cache](), } - go d.runDedup(drainCtx, base, blockSize, memfd, dirty, bestEffort, directIO, minChangedPagesPerBlock, inputEmpty, metaOut) + go d.runDedup(drainCtx, base, blockSize, memfd, dirty, bestEffort, directIO, maxFetchWindowsPerBlock, maxPromotedParentPagesPerBlock, fetchRunWindowPages, inputEmpty, metaOut) return d, nil } @@ -271,7 +273,9 @@ func (d *DedupedMemfdCache) runDedup( memfd *Memfd, dirty *roaring.Bitmap, bestEffort, directIO bool, - minChangedPagesPerBlock int, + maxFetchWindowsPerBlock int, + maxPromotedParentPagesPerBlock int, + fetchRunWindowPages int, inputEmpty *roaring.Bitmap, metaOut *utils.SetOnce[*header.DiffMetadata], ) { @@ -281,7 +285,7 @@ func (d *DedupedMemfdCache) runDedup( src := func(absOff int64) ([]byte, error) { return memfd.Slice(absOff, blockSize) } compareStart := time.Now() - plan, err := dedupCompare(ctx, src, base, dirty, blockSize, bestEffort, minChangedPagesPerBlock) + plan, err := dedupCompare(ctx, src, base, dirty, blockSize, bestEffort, maxFetchWindowsPerBlock, maxPromotedParentPagesPerBlock, fetchRunWindowPages) compareDur := time.Since(compareStart) if err != nil { logSetOnceErr(ctx, "dedup metaOut", metaOut.SetError(err)) diff --git a/packages/orchestrator/pkg/sandbox/block/memfd_test.go b/packages/orchestrator/pkg/sandbox/block/memfd_test.go index 3e8317b75e..de8f9343c0 100644 --- a/packages/orchestrator/pkg/sandbox/block/memfd_test.go +++ b/packages/orchestrator/pkg/sandbox/block/memfd_test.go @@ -227,7 +227,7 @@ func TestNewCacheFromMemfdDeduped_DetachesCompareAndDrain(t *testing.T) { metaOut := utils.NewSetOnce[*header.DiffMetadata]() cache, err := NewCacheFromMemfdDeduped( ctx, &fakeOriginalDevice{data: baseData}, pageSize, cachePath, memfd, dirty, false, false, - 0, nil, metaOut, + 0, 0, 0, nil, metaOut, ) require.NoError(t, err) t.Cleanup(func() { _ = cache.Close() }) diff --git a/packages/orchestrator/pkg/sandbox/fc/memory.go b/packages/orchestrator/pkg/sandbox/fc/memory.go index 6c41d5bd00..b8f0a86abc 100644 --- a/packages/orchestrator/pkg/sandbox/fc/memory.go +++ b/packages/orchestrator/pkg/sandbox/fc/memory.go @@ -80,7 +80,9 @@ func (p *Process) ExportMemory( originalMemfile block.ReadonlyDevice, dedupBestEffort bool, dedupDirectIO bool, - dedupMinChangedPagesPerBlock int, + dedupMaxFetchWindowsPerBlock int, + dedupMaxPromotedParentPagesPerBlock int, + dedupFetchRunWindowPages int, inputEmpty *roaring.Bitmap, metaOut *utils.SetOnce[*header.DiffMetadata], ) (_ block.DiffSource, e error) { @@ -99,7 +101,7 @@ func (p *Process) ExportMemory( if memfd != nil { if originalMemfile != nil { return block.NewCacheFromMemfdDeduped(ctx, originalMemfile, blockSize, cachePath, memfd, include, - dedupBestEffort, dedupDirectIO, dedupMinChangedPagesPerBlock, inputEmpty, metaOut) + dedupBestEffort, dedupDirectIO, dedupMaxFetchWindowsPerBlock, dedupMaxPromotedParentPagesPerBlock, dedupFetchRunWindowPages, inputEmpty, metaOut) } var ( src block.DiffSource @@ -132,7 +134,7 @@ func (p *Process) ExportMemory( return cache, nil } // .dedup suffix avoids clobbering the source mmap during truncate. - dedupCache, meta, err := cache.Dedup(ctx, originalMemfile, include, blockSize, cachePath+".dedup", dedupBestEffort, dedupDirectIO, dedupMinChangedPagesPerBlock) + dedupCache, meta, err := cache.Dedup(ctx, originalMemfile, include, blockSize, cachePath+".dedup", dedupBestEffort, dedupDirectIO, dedupMaxFetchWindowsPerBlock, dedupMaxPromotedParentPagesPerBlock, dedupFetchRunWindowPages) if err != nil { return nil, fmt.Errorf("dedup memfile diff: %w", errors.Join(err, cache.Close())) } diff --git a/packages/orchestrator/pkg/sandbox/sandbox.go b/packages/orchestrator/pkg/sandbox/sandbox.go index ad43c52b08..edfc8b532c 100644 --- a/packages/orchestrator/pkg/sandbox/sandbox.go +++ b/packages/orchestrator/pkg/sandbox/sandbox.go @@ -1146,13 +1146,15 @@ func (s *Sandbox) Pause( // Start POSTPROCESSING var dedupBase block.ReadonlyDevice var dedupBestEffort, dedupDirectIO bool - var dedupMinChangedPagesPerBlock int + var dedupMaxFetchWindowsPerBlock, dedupMaxPromotedParentPagesPerBlock, dedupFetchRunWindowPages int dedupCfg := s.featureFlags.JSONFlag(ctx, featureflags.MemfileDiffDedupFlag, sandboxLDContext(s.Runtime, s.Config)).AsValueMap() if dedupCfg.Get("enabled").BoolValue() { dedupBase = originalMemfile dedupBestEffort = dedupCfg.Get("bestEffort").BoolValue() dedupDirectIO = dedupCfg.Get("directIO").BoolValue() - dedupMinChangedPagesPerBlock = dedupCfg.Get("minChangedPagesPerBlock").IntValue() + dedupMaxFetchWindowsPerBlock = dedupCfg.Get("maxFetchWindowsPerBlock").IntValue() + dedupMaxPromotedParentPagesPerBlock = dedupCfg.Get("maxPromotedParentPagesPerBlock").IntValue() + dedupFetchRunWindowPages = dedupCfg.Get("fetchRunWindowPages").IntValue() } memfileDiff, memfileDiffHeader, err := pauseProcessMemory( ctx, @@ -1166,7 +1168,9 @@ func (s *Sandbox) Pause( dedupBase, dedupBestEffort, dedupDirectIO, - dedupMinChangedPagesPerBlock, + dedupMaxFetchWindowsPerBlock, + dedupMaxPromotedParentPagesPerBlock, + dedupFetchRunWindowPages, ) if err != nil { return nil, fmt.Errorf("error while post processing: %w", err) @@ -1240,7 +1244,9 @@ func pauseProcessMemory( originalMemfile block.ReadonlyDevice, dedupBestEffort bool, dedupDirectIO bool, - dedupMinChangedPagesPerBlock int, + dedupMaxFetchWindowsPerBlock int, + dedupMaxPromotedParentPagesPerBlock int, + dedupFetchRunWindowPages int, ) (d build.Diff, h *DiffHeader, e error) { ctx, span := tracer.Start(ctx, "process-memory") defer span.End() @@ -1250,7 +1256,7 @@ func pauseProcessMemory( // ExportMemory owns memfd and closes it on all paths. cache, err := fc.ExportMemory( ctx, diffMetadata.Dirty, memfileDiffPath, diffMetadata.BlockSize, memfd, bgCopy, - originalMemfile, dedupBestEffort, dedupDirectIO, dedupMinChangedPagesPerBlock, diffMetadata.Empty, metaOut, + originalMemfile, dedupBestEffort, dedupDirectIO, dedupMaxFetchWindowsPerBlock, dedupMaxPromotedParentPagesPerBlock, dedupFetchRunWindowPages, diffMetadata.Empty, metaOut, ) if err != nil { return nil, nil, fmt.Errorf("failed to export memory: %w", err) diff --git a/packages/shared/pkg/featureflags/flags.go b/packages/shared/pkg/featureflags/flags.go index bf11c81676..95c9986226 100644 --- a/packages/shared/pkg/featureflags/flags.go +++ b/packages/shared/pkg/featureflags/flags.go @@ -141,13 +141,14 @@ var ( // MemfileDiffDedupFlag enables 4 KiB-page dedup of the memfile diff // against the base memfile. bestEffort skips uncached blocks; directIO - // opens the dedup output with O_DIRECT; minChangedPagesPerBlock stores - // sparsely changed dirty blocks whole instead of emitting tiny mappings. + // opens the dedup output with O_DIRECT. MemfileDiffDedupFlag = NewJSONFlag("memfile-diff-dedup", ldvalue.FromJSONMarshal(map[string]any{ - "enabled": false, - "bestEffort": false, - "directIO": false, - "minChangedPagesPerBlock": 0, + "enabled": false, + "bestEffort": false, + "directIO": false, + "maxFetchWindowsPerBlock": 0, + "maxPromotedParentPagesPerBlock": 0, + "fetchRunWindowPages": 0, })) // PeerToPeerChunkTransferFlag enables peer-to-peer chunk routing. From e4f45c379848d7532ad96e556e48c87d755a424b Mon Sep 17 00:00:00 2001 From: ValentaTomas Date: Sat, 30 May 2026 13:34:44 -0700 Subject: [PATCH 03/13] fix(orch): clarify dedup fetch window constants --- .../orchestrator/pkg/sandbox/block/cache.go | 79 +++++++++++++++---- .../pkg/sandbox/block/cache_test.go | 18 ++--- .../orchestrator/pkg/sandbox/block/memfd.go | 12 +-- .../pkg/sandbox/block/memfd_test.go | 2 +- .../orchestrator/pkg/sandbox/fc/memory.go | 8 +- packages/orchestrator/pkg/sandbox/sandbox.go | 20 +++-- 6 files changed, 89 insertions(+), 50 deletions(-) diff --git a/packages/orchestrator/pkg/sandbox/block/cache.go b/packages/orchestrator/pkg/sandbox/block/cache.go index f55f0e97be..3d4ab03487 100644 --- a/packages/orchestrator/pkg/sandbox/block/cache.go +++ b/packages/orchestrator/pkg/sandbox/block/cache.go @@ -26,6 +26,7 @@ import ( "golang.org/x/sys/unix" "github.com/e2b-dev/infra/packages/shared/pkg/logger" + "github.com/e2b-dev/infra/packages/shared/pkg/storage" "github.com/e2b-dev/infra/packages/shared/pkg/storage/header" "github.com/e2b-dev/infra/packages/shared/pkg/telemetry" ) @@ -215,6 +216,12 @@ type dedupPlan struct { promotedPages int64 } +type DedupBudget struct { + MaxFetchWindowsPerBlock int + MaxPromotedParentPagesPerBlock int + FetchRunWindowPages int +} + const ( dedupPageEmpty byte = iota dedupPageParent @@ -227,13 +234,13 @@ const ( ) const ( - defaultDedupFetchWindowPages = 2 << 20 / header.PageSize + defaultDedupFetchWindowPages = storage.DefaultCompressFrameSize / header.PageSize ) type dedupFetchKey struct { source byte build uuid.UUID - window uint64 + window int } type dedupPageInfo struct { @@ -251,9 +258,7 @@ func dedupCompare( dirty *roaring.Bitmap, blockSize int64, bestEffort bool, - maxFetchWindowsPerBlock int, - maxPromotedParentPagesPerBlock int, - fetchWindowPages int, + budget DedupBudget, ) (*dedupPlan, error) { pageDirty := roaring.New() pageEmpty := roaring.New() @@ -262,8 +267,8 @@ func dedupCompare( var promotedPages int64 var currentStoredPages int64 - if fetchWindowPages <= 0 { - fetchWindowPages = defaultDedupFetchWindowPages + if budget.FetchRunWindowPages <= 0 { + budget.FetchRunWindowPages = defaultDedupFetchWindowPages } baseHeader := base.Header() @@ -320,11 +325,14 @@ func dedupCompare( } if bytes.Equal(srcPage, basePage) { blockPages[page].kind = dedupPageParent + key := dedupFetchKey{source: parentFetchSource} if hasMapping { - blockPages[page].key = dedupFetchKey{source: parentFetchSource, build: mapped.BuildId, window: mapped.Offset / uint64(fetchWindowPages*header.PageSize)} + key.build = mapped.BuildId + key.window = int(mapped.Offset / uint64(budget.FetchRunWindowPages*header.PageSize)) } else { - blockPages[page].key = dedupFetchKey{source: parentFetchSource, window: uint64(pageOff) / uint64(fetchWindowPages*header.PageSize)} + key.window = int(pageOff / int64(budget.FetchRunWindowPages*header.PageSize)) } + blockPages[page].key = key continue } @@ -332,7 +340,7 @@ func dedupCompare( blockPages[page].kind = dedupPageCurrent } - promoted := compactDedupFetchWindows(blockPages, maxFetchWindowsPerBlock, maxPromotedParentPagesPerBlock, fetchWindowPages, currentStoredPages) + promoted := compactDedupFetchWindows(blockPages, budget.MaxFetchWindowsPerBlock, budget.MaxPromotedParentPagesPerBlock, budget.FetchRunWindowPages, currentStoredPages) if promoted > 0 { promotedBlocks++ promotedPages += int64(promoted) @@ -391,7 +399,7 @@ func countFetchWindows(pages []dedupPageInfo, windowPages int, currentStart int6 case dedupPageParent: keys[p.key] = struct{}{} case dedupPageCurrent: - keys[dedupFetchKey{source: currentFetchSource, window: uint64(currentStart+currentOrdinal) / uint64(windowPages)}] = struct{}{} + keys[dedupFetchKey{source: currentFetchSource, window: int(currentStart+currentOrdinal) / windowPages}] = struct{}{} currentOrdinal++ } } @@ -431,6 +439,49 @@ func bestParentRunToPromote(pages []dedupPageInfo, budget, windowPages int, curr bestStart, bestEnd, bestBenefit, bestCost = start, end, benefit, cost } } + if bestStart < 0 { + return bestParentKeyToPromote(pages, budget, windowPages, currentStart, before) + } + + return bestStart, bestEnd +} + +func bestParentKeyToPromote(pages []dedupPageInfo, budget, windowPages int, currentStart int64, before int) (int, int) { + type span struct{ start, end, cost int } + spans := make(map[dedupFetchKey]span) + for i, p := range pages { + if p.kind != dedupPageParent { + continue + } + s := spans[p.key] + if s.cost == 0 { + s.start = i + } + s.end = i + 1 + s.cost++ + spans[p.key] = s + } + + bestStart, bestEnd, bestBenefit, bestCost := -1, -1, 0, 0 + for key, s := range spans { + if s.cost > budget { + continue + } + candidate := slices.Clone(pages) + for i := s.start; i < s.end; i++ { + if candidate[i].kind == dedupPageParent && candidate[i].key == key { + candidate[i].kind = dedupPageCurrent + candidate[i].key = dedupFetchKey{} + } + } + benefit := before - countFetchWindows(candidate, windowPages, currentStart) + if benefit <= 0 { + continue + } + if bestStart < 0 || s.cost*bestBenefit < bestCost*benefit { + bestStart, bestEnd, bestBenefit, bestCost = s.start, s.end, benefit, s.cost + } + } return bestStart, bestEnd } @@ -562,9 +613,7 @@ func (c *Cache) Dedup( outPath string, bestEffort bool, directIO bool, - maxFetchWindowsPerBlock int, - maxPromotedParentPagesPerBlock int, - fetchRunWindowPages int, + budget DedupBudget, ) (*Cache, *header.DiffMetadata, error) { ctx, span := tracer.Start(ctx, "dedup-pages") defer span.End() @@ -588,7 +637,7 @@ func (c *Cache) Dedup( } compareStart := time.Now() - plan, err := dedupCompare(ctx, src, base, dirty, blockSize, bestEffort, maxFetchWindowsPerBlock, maxPromotedParentPagesPerBlock, fetchRunWindowPages) + plan, err := dedupCompare(ctx, src, base, dirty, blockSize, bestEffort, budget) if err != nil { return nil, nil, err } diff --git a/packages/orchestrator/pkg/sandbox/block/cache_test.go b/packages/orchestrator/pkg/sandbox/block/cache_test.go index a9387d29d7..fe46f50fe4 100644 --- a/packages/orchestrator/pkg/sandbox/block/cache_test.go +++ b/packages/orchestrator/pkg/sandbox/block/cache_test.go @@ -786,7 +786,7 @@ func runDedup(t *testing.T, srcMem, baseMem []byte, dirty *roaring.Bitmap, block t.Helper() src := buildPackedSrcCache(t, srcMem, dirty, blockSize) - cache, meta, err := src.Dedup(t.Context(), &fakeOriginalDevice{data: baseMem}, dirty, blockSize, t.TempDir()+"/dedup", false, false, 0, 0, 0) + cache, meta, err := src.Dedup(t.Context(), &fakeOriginalDevice{data: baseMem}, dirty, blockSize, t.TempDir()+"/dedup", false, false, DedupBudget{}) require.NoError(t, err) t.Cleanup(func() { _ = cache.Close() }) @@ -908,7 +908,7 @@ func TestCacheDedup_ContextCancellation(t *testing.T) { ctx, cancel := context.WithCancel(t.Context()) cancel() - _, _, err = src.Dedup(ctx, &fakeOriginalDevice{data: data}, dirty, blockSize, t.TempDir()+"/dedup", false, false, 0, 0, 0) + _, _, err = src.Dedup(ctx, &fakeOriginalDevice{data: data}, dirty, blockSize, t.TempDir()+"/dedup", false, false, DedupBudget{}) require.ErrorIs(t, err, context.Canceled) } @@ -932,9 +932,7 @@ func TestCacheDedup_OriginalMemfileReadError(t *testing.T) { t.TempDir()+"/dedup", false, false, - 0, - 0, - 0, + DedupBudget{}, ) require.ErrorIs(t, err, sentinel) } @@ -968,7 +966,7 @@ func TestCacheDedup_EmptyParentMappingSkipsBaseReadAt(t *testing.T) { junk := bytes.Repeat([]byte{0xFF}, int(size)) base := &fakeOriginalDevice{data: junk, hdr: hdr} - cache, meta, err := src.Dedup(t.Context(), base, dirty, blockSize, t.TempDir()+"/dedup", false, false, 0, 0, 0) + cache, meta, err := src.Dedup(t.Context(), base, dirty, blockSize, t.TempDir()+"/dedup", false, false, DedupBudget{}) require.NoError(t, err) t.Cleanup(func() { _ = cache.Close() }) @@ -1061,7 +1059,7 @@ func TestCacheDedup_BestEffortUncachedSkipsBaseReadAt(t *testing.T) { src := buildPackedSrcCache(t, srcData, dirty, blockSize) base := &peekingOriginalDevice{fakeOriginalDevice: fakeOriginalDevice{data: baseData}, cached: false} - cache, meta, err := src.Dedup(t.Context(), base, dirty, blockSize, t.TempDir()+"/dedup", true, false, 0, 0, 0) + cache, meta, err := src.Dedup(t.Context(), base, dirty, blockSize, t.TempDir()+"/dedup", true, false, DedupBudget{}) require.NoError(t, err) t.Cleanup(func() { _ = cache.Close() }) @@ -1093,7 +1091,7 @@ func TestCacheDedup_BestEffortCachedMatchesNormalPath(t *testing.T) { src := buildPackedSrcCache(t, srcData, dirty, blockSize) base := &peekingOriginalDevice{fakeOriginalDevice: fakeOriginalDevice{data: baseData}, cached: true} - cache, meta, err := src.Dedup(t.Context(), base, dirty, blockSize, t.TempDir()+"/dedup", true, false, 0, 0, 0) + cache, meta, err := src.Dedup(t.Context(), base, dirty, blockSize, t.TempDir()+"/dedup", true, false, DedupBudget{}) require.NoError(t, err) t.Cleanup(func() { _ = cache.Close() }) @@ -1120,7 +1118,7 @@ func TestCacheDedup_FetchRunBudgetPromotesSmallParentRun(t *testing.T) { dirty := fullDirty(size, blockSize) src := buildPackedSrcCache(t, srcData, dirty, blockSize) - cache, meta, err := src.Dedup(t.Context(), &fakeOriginalDevice{data: baseData}, dirty, blockSize, t.TempDir()+"/dedup", false, false, 1, 1, 4) + cache, meta, err := src.Dedup(t.Context(), &fakeOriginalDevice{data: baseData}, dirty, blockSize, t.TempDir()+"/dedup", false, false, DedupBudget{MaxFetchWindowsPerBlock: 1, MaxPromotedParentPagesPerBlock: 1, FetchRunWindowPages: 4}) require.NoError(t, err) t.Cleanup(func() { _ = cache.Close() }) @@ -1169,7 +1167,7 @@ func TestCacheDedup_BestEffortPerPageCacheCheck(t *testing.T) { cachedPages: map[uint32]bool{0: true, 1: false, 2: true, 3: false}, } - cache, meta, err := src.Dedup(t.Context(), base, dirty, blockSize, t.TempDir()+"/dedup", true, false, 0, 0, 0) + cache, meta, err := src.Dedup(t.Context(), base, dirty, blockSize, t.TempDir()+"/dedup", true, false, DedupBudget{}) require.NoError(t, err) t.Cleanup(func() { _ = cache.Close() }) diff --git a/packages/orchestrator/pkg/sandbox/block/memfd.go b/packages/orchestrator/pkg/sandbox/block/memfd.go index ba6be49463..ebef339b47 100644 --- a/packages/orchestrator/pkg/sandbox/block/memfd.go +++ b/packages/orchestrator/pkg/sandbox/block/memfd.go @@ -246,9 +246,7 @@ func NewCacheFromMemfdDeduped( dirty *roaring.Bitmap, bestEffort bool, directIO bool, - maxFetchWindowsPerBlock int, - maxPromotedParentPagesPerBlock int, - fetchRunWindowPages int, + budget DedupBudget, inputEmpty *roaring.Bitmap, metaOut *utils.SetOnce[*header.DiffMetadata], ) (*DedupedMemfdCache, error) { @@ -261,7 +259,7 @@ func NewCacheFromMemfdDeduped( cancel: cancel, done: utils.NewSetOnce[*Cache](), } - go d.runDedup(drainCtx, base, blockSize, memfd, dirty, bestEffort, directIO, maxFetchWindowsPerBlock, maxPromotedParentPagesPerBlock, fetchRunWindowPages, inputEmpty, metaOut) + go d.runDedup(drainCtx, base, blockSize, memfd, dirty, bestEffort, directIO, budget, inputEmpty, metaOut) return d, nil } @@ -273,9 +271,7 @@ func (d *DedupedMemfdCache) runDedup( memfd *Memfd, dirty *roaring.Bitmap, bestEffort, directIO bool, - maxFetchWindowsPerBlock int, - maxPromotedParentPagesPerBlock int, - fetchRunWindowPages int, + budget DedupBudget, inputEmpty *roaring.Bitmap, metaOut *utils.SetOnce[*header.DiffMetadata], ) { @@ -285,7 +281,7 @@ func (d *DedupedMemfdCache) runDedup( src := func(absOff int64) ([]byte, error) { return memfd.Slice(absOff, blockSize) } compareStart := time.Now() - plan, err := dedupCompare(ctx, src, base, dirty, blockSize, bestEffort, maxFetchWindowsPerBlock, maxPromotedParentPagesPerBlock, fetchRunWindowPages) + plan, err := dedupCompare(ctx, src, base, dirty, blockSize, bestEffort, budget) compareDur := time.Since(compareStart) if err != nil { logSetOnceErr(ctx, "dedup metaOut", metaOut.SetError(err)) diff --git a/packages/orchestrator/pkg/sandbox/block/memfd_test.go b/packages/orchestrator/pkg/sandbox/block/memfd_test.go index de8f9343c0..d7e4a66278 100644 --- a/packages/orchestrator/pkg/sandbox/block/memfd_test.go +++ b/packages/orchestrator/pkg/sandbox/block/memfd_test.go @@ -227,7 +227,7 @@ func TestNewCacheFromMemfdDeduped_DetachesCompareAndDrain(t *testing.T) { metaOut := utils.NewSetOnce[*header.DiffMetadata]() cache, err := NewCacheFromMemfdDeduped( ctx, &fakeOriginalDevice{data: baseData}, pageSize, cachePath, memfd, dirty, false, false, - 0, 0, 0, nil, metaOut, + DedupBudget{}, nil, metaOut, ) require.NoError(t, err) t.Cleanup(func() { _ = cache.Close() }) diff --git a/packages/orchestrator/pkg/sandbox/fc/memory.go b/packages/orchestrator/pkg/sandbox/fc/memory.go index b8f0a86abc..58a63488f3 100644 --- a/packages/orchestrator/pkg/sandbox/fc/memory.go +++ b/packages/orchestrator/pkg/sandbox/fc/memory.go @@ -80,9 +80,7 @@ func (p *Process) ExportMemory( originalMemfile block.ReadonlyDevice, dedupBestEffort bool, dedupDirectIO bool, - dedupMaxFetchWindowsPerBlock int, - dedupMaxPromotedParentPagesPerBlock int, - dedupFetchRunWindowPages int, + dedupBudget block.DedupBudget, inputEmpty *roaring.Bitmap, metaOut *utils.SetOnce[*header.DiffMetadata], ) (_ block.DiffSource, e error) { @@ -101,7 +99,7 @@ func (p *Process) ExportMemory( if memfd != nil { if originalMemfile != nil { return block.NewCacheFromMemfdDeduped(ctx, originalMemfile, blockSize, cachePath, memfd, include, - dedupBestEffort, dedupDirectIO, dedupMaxFetchWindowsPerBlock, dedupMaxPromotedParentPagesPerBlock, dedupFetchRunWindowPages, inputEmpty, metaOut) + dedupBestEffort, dedupDirectIO, dedupBudget, inputEmpty, metaOut) } var ( src block.DiffSource @@ -134,7 +132,7 @@ func (p *Process) ExportMemory( return cache, nil } // .dedup suffix avoids clobbering the source mmap during truncate. - dedupCache, meta, err := cache.Dedup(ctx, originalMemfile, include, blockSize, cachePath+".dedup", dedupBestEffort, dedupDirectIO, dedupMaxFetchWindowsPerBlock, dedupMaxPromotedParentPagesPerBlock, dedupFetchRunWindowPages) + dedupCache, meta, err := cache.Dedup(ctx, originalMemfile, include, blockSize, cachePath+".dedup", dedupBestEffort, dedupDirectIO, dedupBudget) if err != nil { return nil, fmt.Errorf("dedup memfile diff: %w", errors.Join(err, cache.Close())) } diff --git a/packages/orchestrator/pkg/sandbox/sandbox.go b/packages/orchestrator/pkg/sandbox/sandbox.go index edfc8b532c..302c8dc138 100644 --- a/packages/orchestrator/pkg/sandbox/sandbox.go +++ b/packages/orchestrator/pkg/sandbox/sandbox.go @@ -1146,15 +1146,17 @@ func (s *Sandbox) Pause( // Start POSTPROCESSING var dedupBase block.ReadonlyDevice var dedupBestEffort, dedupDirectIO bool - var dedupMaxFetchWindowsPerBlock, dedupMaxPromotedParentPagesPerBlock, dedupFetchRunWindowPages int + var dedupBudget block.DedupBudget dedupCfg := s.featureFlags.JSONFlag(ctx, featureflags.MemfileDiffDedupFlag, sandboxLDContext(s.Runtime, s.Config)).AsValueMap() if dedupCfg.Get("enabled").BoolValue() { dedupBase = originalMemfile dedupBestEffort = dedupCfg.Get("bestEffort").BoolValue() dedupDirectIO = dedupCfg.Get("directIO").BoolValue() - dedupMaxFetchWindowsPerBlock = dedupCfg.Get("maxFetchWindowsPerBlock").IntValue() - dedupMaxPromotedParentPagesPerBlock = dedupCfg.Get("maxPromotedParentPagesPerBlock").IntValue() - dedupFetchRunWindowPages = dedupCfg.Get("fetchRunWindowPages").IntValue() + dedupBudget = block.DedupBudget{ + MaxFetchWindowsPerBlock: dedupCfg.Get("maxFetchWindowsPerBlock").IntValue(), + MaxPromotedParentPagesPerBlock: dedupCfg.Get("maxPromotedParentPagesPerBlock").IntValue(), + FetchRunWindowPages: dedupCfg.Get("fetchRunWindowPages").IntValue(), + } } memfileDiff, memfileDiffHeader, err := pauseProcessMemory( ctx, @@ -1168,9 +1170,7 @@ func (s *Sandbox) Pause( dedupBase, dedupBestEffort, dedupDirectIO, - dedupMaxFetchWindowsPerBlock, - dedupMaxPromotedParentPagesPerBlock, - dedupFetchRunWindowPages, + dedupBudget, ) if err != nil { return nil, fmt.Errorf("error while post processing: %w", err) @@ -1244,9 +1244,7 @@ func pauseProcessMemory( originalMemfile block.ReadonlyDevice, dedupBestEffort bool, dedupDirectIO bool, - dedupMaxFetchWindowsPerBlock int, - dedupMaxPromotedParentPagesPerBlock int, - dedupFetchRunWindowPages int, + dedupBudget block.DedupBudget, ) (d build.Diff, h *DiffHeader, e error) { ctx, span := tracer.Start(ctx, "process-memory") defer span.End() @@ -1256,7 +1254,7 @@ func pauseProcessMemory( // ExportMemory owns memfd and closes it on all paths. cache, err := fc.ExportMemory( ctx, diffMetadata.Dirty, memfileDiffPath, diffMetadata.BlockSize, memfd, bgCopy, - originalMemfile, dedupBestEffort, dedupDirectIO, dedupMaxFetchWindowsPerBlock, dedupMaxPromotedParentPagesPerBlock, dedupFetchRunWindowPages, diffMetadata.Empty, metaOut, + originalMemfile, dedupBestEffort, dedupDirectIO, dedupBudget, diffMetadata.Empty, metaOut, ) if err != nil { return nil, nil, fmt.Errorf("failed to export memory: %w", err) From 7eed0aa892dfdd061864f058c669127ec2280066 Mon Sep 17 00:00:00 2001 From: ValentaTomas Date: Sat, 30 May 2026 18:31:33 -0700 Subject: [PATCH 04/13] fix(orch): promote only matching parent pages in dedup fallback --- .../orchestrator/pkg/sandbox/block/cache.go | 103 ++++++++---------- 1 file changed, 45 insertions(+), 58 deletions(-) diff --git a/packages/orchestrator/pkg/sandbox/block/cache.go b/packages/orchestrator/pkg/sandbox/block/cache.go index 3d4ab03487..0dce386426 100644 --- a/packages/orchestrator/pkg/sandbox/block/cache.go +++ b/packages/orchestrator/pkg/sandbox/block/cache.go @@ -375,16 +375,14 @@ func compactDedupFetchWindows(pages []dedupPageInfo, maxWindows, maxPromoted, wi var promoted int for promoted < maxPromoted && countFetchWindows(pages, windowPages, currentStart) > maxWindows { - start, end := bestParentRunToPromote(pages, maxPromoted-promoted, windowPages, currentStart) - if start < 0 { + idxs := bestParentRunToPromote(pages, maxPromoted-promoted, windowPages, currentStart) + if len(idxs) == 0 { break } - for i := start; i < end; i++ { - if pages[i].kind == dedupPageParent { - pages[i].kind = dedupPageCurrent - pages[i].key = dedupFetchKey{} - promoted++ - } + for _, i := range idxs { + pages[i].kind = dedupPageCurrent + pages[i].key = dedupFetchKey{} + promoted++ } } @@ -407,9 +405,13 @@ func countFetchWindows(pages []dedupPageInfo, windowPages int, currentStart int6 return len(keys) } -func bestParentRunToPromote(pages []dedupPageInfo, budget, windowPages int, currentStart int64) (int, int) { +// bestParentRunToPromote returns the parent page indices whose promotion to +// current best reduces fetch windows per promoted page, scanning contiguous +// parent/empty runs first and falling back to per-key promotion. +func bestParentRunToPromote(pages []dedupPageInfo, budget, windowPages int, currentStart int64) []int { before := countFetchWindows(pages, windowPages, currentStart) - bestStart, bestEnd, bestBenefit, bestCost := -1, -1, 0, 0 + var best []int + bestBenefit, bestCost := 0, 0 for i := 0; i < len(pages); { for i < len(pages) && pages[i].kind != dedupPageParent { i++ @@ -418,83 +420,68 @@ func bestParentRunToPromote(pages []dedupPageInfo, budget, windowPages int, curr for i < len(pages) && (pages[i].kind == dedupPageParent || pages[i].kind == dedupPageEmpty) { i++ } - end := i - cost := countKind(pages[start:end], dedupPageParent) + var idxs []int + for j := start; j < i; j++ { + if pages[j].kind == dedupPageParent { + idxs = append(idxs, j) + } + } + cost := len(idxs) if cost == 0 || cost > budget { continue } - - candidate := slices.Clone(pages) - for j := start; j < end; j++ { - if candidate[j].kind == dedupPageParent { - candidate[j].kind = dedupPageCurrent - candidate[j].key = dedupFetchKey{} - } - } - benefit := before - countFetchWindows(candidate, windowPages, currentStart) + benefit := before - countFetchWindowsAfter(pages, idxs, windowPages, currentStart) if benefit <= 0 { continue } - if bestStart < 0 || cost*bestBenefit < bestCost*benefit { - bestStart, bestEnd, bestBenefit, bestCost = start, end, benefit, cost + if best == nil || cost*bestBenefit < bestCost*benefit { + best, bestBenefit, bestCost = idxs, benefit, cost } } - if bestStart < 0 { + if best == nil { return bestParentKeyToPromote(pages, budget, windowPages, currentStart, before) } - return bestStart, bestEnd + return best } -func bestParentKeyToPromote(pages []dedupPageInfo, budget, windowPages int, currentStart int64, before int) (int, int) { - type span struct{ start, end, cost int } - spans := make(map[dedupFetchKey]span) +func bestParentKeyToPromote(pages []dedupPageInfo, budget, windowPages int, currentStart int64, before int) []int { + idxByKey := make(map[dedupFetchKey][]int) for i, p := range pages { - if p.kind != dedupPageParent { - continue - } - s := spans[p.key] - if s.cost == 0 { - s.start = i + if p.kind == dedupPageParent { + idxByKey[p.key] = append(idxByKey[p.key], i) } - s.end = i + 1 - s.cost++ - spans[p.key] = s } - bestStart, bestEnd, bestBenefit, bestCost := -1, -1, 0, 0 - for key, s := range spans { - if s.cost > budget { + var best []int + bestBenefit, bestCost := 0, 0 + for _, idxs := range idxByKey { + cost := len(idxs) + if cost > budget { continue } - candidate := slices.Clone(pages) - for i := s.start; i < s.end; i++ { - if candidate[i].kind == dedupPageParent && candidate[i].key == key { - candidate[i].kind = dedupPageCurrent - candidate[i].key = dedupFetchKey{} - } - } - benefit := before - countFetchWindows(candidate, windowPages, currentStart) + benefit := before - countFetchWindowsAfter(pages, idxs, windowPages, currentStart) if benefit <= 0 { continue } - if bestStart < 0 || s.cost*bestBenefit < bestCost*benefit { - bestStart, bestEnd, bestBenefit, bestCost = s.start, s.end, benefit, s.cost + if best == nil || cost*bestBenefit < bestCost*benefit { + best, bestBenefit, bestCost = idxs, benefit, cost } } - return bestStart, bestEnd + return best } -func countKind(pages []dedupPageInfo, kind byte) int { - var n int - for _, p := range pages { - if p.kind == kind { - n++ - } +// countFetchWindowsAfter counts fetch windows as if the given parent indices +// were promoted to current, without mutating pages. +func countFetchWindowsAfter(pages []dedupPageInfo, promote []int, windowPages int, currentStart int64) int { + candidate := slices.Clone(pages) + for _, i := range promote { + candidate[i].kind = dedupPageCurrent + candidate[i].key = dedupFetchKey{} } - return n + return countFetchWindows(candidate, windowPages, currentStart) } // dedupDrain writes pageDirty pages from src to outPath packed at PageSize. From c825a691b2900981c1625c504c60087ad6287ed7 Mon Sep 17 00:00:00 2001 From: Jakub Dobry Date: Sun, 31 May 2026 00:52:45 -0700 Subject: [PATCH 05/13] refactor(orch): name dedup page kind and fetch source types Give the dedup page-kind and fetch-source enums named byte types instead of bare byte, and rename dedupFetchKey fields source/build to sourceType/buildID for clarity. No behavior change. --- .../orchestrator/pkg/sandbox/block/cache.go | 22 +++++++++++-------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/packages/orchestrator/pkg/sandbox/block/cache.go b/packages/orchestrator/pkg/sandbox/block/cache.go index 0dce386426..add26d6ef1 100644 --- a/packages/orchestrator/pkg/sandbox/block/cache.go +++ b/packages/orchestrator/pkg/sandbox/block/cache.go @@ -222,14 +222,18 @@ type DedupBudget struct { FetchRunWindowPages int } +type dedupPageKind byte + const ( - dedupPageEmpty byte = iota + dedupPageEmpty dedupPageKind = iota dedupPageParent dedupPageCurrent ) +type fetchSource byte + const ( - currentFetchSource byte = iota + 1 + currentFetchSource fetchSource = iota + 1 parentFetchSource ) @@ -238,13 +242,13 @@ const ( ) type dedupFetchKey struct { - source byte - build uuid.UUID - window int + sourceType fetchSource + buildID uuid.UUID + window int } type dedupPageInfo struct { - kind byte + kind dedupPageKind key dedupFetchKey } @@ -325,9 +329,9 @@ func dedupCompare( } if bytes.Equal(srcPage, basePage) { blockPages[page].kind = dedupPageParent - key := dedupFetchKey{source: parentFetchSource} + key := dedupFetchKey{sourceType: parentFetchSource} if hasMapping { - key.build = mapped.BuildId + key.buildID = mapped.BuildId key.window = int(mapped.Offset / uint64(budget.FetchRunWindowPages*header.PageSize)) } else { key.window = int(pageOff / int64(budget.FetchRunWindowPages*header.PageSize)) @@ -397,7 +401,7 @@ func countFetchWindows(pages []dedupPageInfo, windowPages int, currentStart int6 case dedupPageParent: keys[p.key] = struct{}{} case dedupPageCurrent: - keys[dedupFetchKey{source: currentFetchSource, window: int(currentStart+currentOrdinal) / windowPages}] = struct{}{} + keys[dedupFetchKey{sourceType: currentFetchSource, window: int(currentStart+currentOrdinal) / windowPages}] = struct{}{} currentOrdinal++ } } From e7058690b9903709254915710f4a34b762e7d555 Mon Sep 17 00:00:00 2001 From: Jakub Dobry Date: Sun, 31 May 2026 00:54:15 -0700 Subject: [PATCH 06/13] feat(header): make GetShiftedMapping nil-receiver safe Return an error instead of panicking when called on a nil *Header, so callers can treat a missing header as 'no mapping' without a separate nil check. --- packages/shared/pkg/storage/header/header.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/packages/shared/pkg/storage/header/header.go b/packages/shared/pkg/storage/header/header.go index b1852e2ff9..c3dd74c0d5 100644 --- a/packages/shared/pkg/storage/header/header.go +++ b/packages/shared/pkg/storage/header/header.go @@ -150,6 +150,10 @@ func (t *Header) IsNormalizeFixApplied() bool { // The read path uses this to find which build owns the data, then calls // GetBuildFrameData to get the FrameTable for C-space lookup. func (t *Header) GetShiftedMapping(ctx context.Context, offset int64) (BuildMap, error) { + if t == nil { + return BuildMap{}, errors.New("nil header") + } + mapping, shift, err := t.getMapping(ctx, offset) if err != nil { return BuildMap{}, err From d1cb55157c48dcf534d2a75e00d6361553885db0 Mon Sep 17 00:00:00 2001 From: Jakub Dobry Date: Sun, 31 May 2026 00:54:22 -0700 Subject: [PATCH 07/13] refactor(orch): split dedupCompare into focused helpers Break the monolithic dedupCompare (cyclomatic complexity 22) into a read-only dedupConfig with per-block compareBlock and per-page classifyPage helpers, plus an in-place dedupAccum accumulator. Extract the fetch-window promotion heuristic onto a fetchWindower receiver, collapsing the duplicated benefit/cost argmax into bestByRatio and the run/key candidate scanning into the parentRuns and parentKeyGroups iterators. parentKeyGroups now yields groups ordered by first page index so promotion selection is deterministic regardless of map order. classifyPage drops its baseHeader nil check now that GetShiftedMapping is nil-safe. No behavior change. --- .../orchestrator/pkg/sandbox/block/cache.go | 356 +++++++++++------- 1 file changed, 223 insertions(+), 133 deletions(-) diff --git a/packages/orchestrator/pkg/sandbox/block/cache.go b/packages/orchestrator/pkg/sandbox/block/cache.go index add26d6ef1..a8c6ef63f2 100644 --- a/packages/orchestrator/pkg/sandbox/block/cache.go +++ b/packages/orchestrator/pkg/sandbox/block/cache.go @@ -8,6 +8,8 @@ import ( "errors" "fmt" "io" + "iter" + "maps" "math" "math/rand" "os" @@ -252,134 +254,207 @@ type dedupPageInfo struct { key dedupFetchKey } +// blockReader returns the source bytes for the block at absOff. +type blockReader func(absOff int64) ([]byte, error) + +// dedupConfig holds the immutable inputs of a dedup comparison. Its methods +// only read it; all mutable accumulation lives in dedupAccum. +type dedupConfig struct { + src blockReader + base ReadonlyDevice + baseHeader *header.Header + peeker CachePeeker + + blockSize int64 + bestEffort bool + budget DedupBudget +} + +// dedupAccum is the mutable running state of a dedup comparison. compareBlock +// accumulates each block's results into it in place. +type dedupAccum struct { + pageDirty *roaring.Bitmap + pageEmpty *roaring.Bitmap + exportedSize int64 + promotedBlocks int64 + promotedPages int64 + currentStoredPages int64 +} + // dedupCompare classifies each dirty page against base into pageDirty or // pageEmpty. Per-page IsCached so a single uncached neighbour can't poison // cached pages of the same block when the parent header is page-granular. func dedupCompare( ctx context.Context, - src func(absOff int64) ([]byte, error), + src blockReader, base ReadonlyDevice, dirty *roaring.Bitmap, blockSize int64, bestEffort bool, budget DedupBudget, ) (*dedupPlan, error) { - pageDirty := roaring.New() - pageEmpty := roaring.New() - var exportedSize int64 - var promotedBlocks int64 - var promotedPages int64 - var currentStoredPages int64 - if budget.FetchRunWindowPages <= 0 { budget.FetchRunWindowPages = defaultDedupFetchWindowPages } - baseHeader := base.Header() peeker, _ := base.(CachePeeker) + cfg := dedupConfig{ + src: src, + base: base, + baseHeader: base.Header(), + peeker: peeker, + blockSize: blockSize, + bestEffort: bestEffort, + budget: budget, + } + acc := dedupAccum{ + pageDirty: roaring.New(), + pageEmpty: roaring.New(), + } for r := range BitsetRanges(dirty, blockSize) { - exportedSize += r.Size + acc.exportedSize += r.Size for chunkOff := int64(0); chunkOff < r.Size; chunkOff += blockSize { - if err := ctx.Err(); err != nil { + if err := cfg.compareBlock(ctx, r.Start+chunkOff, &acc); err != nil { return nil, err } + } + } - absOff := r.Start + chunkOff - srcBuf, err := src(absOff) - if err != nil { - return nil, err - } + return &dedupPlan{ + pageDirty: acc.pageDirty, + pageEmpty: acc.pageEmpty, + exportedSize: acc.exportedSize, + promotedBlocks: acc.promotedBlocks, + promotedPages: acc.promotedPages, + }, nil +} - pagesPerBlock := int(blockSize / header.PageSize) - blockPages := make([]dedupPageInfo, pagesPerBlock) +// compareBlock classifies one block and accumulates its results into acc, +// mutating it in place. +func (c dedupConfig) compareBlock(ctx context.Context, absOff int64, acc *dedupAccum) error { + if err := ctx.Err(); err != nil { + return err + } - for page := range pagesPerBlock { - i := int64(page) * header.PageSize - srcPage := srcBuf[i : i+header.PageSize] - pageOff := absOff + i + srcBuf, err := c.src(absOff) + if err != nil { + return err + } - if header.IsZero(srcPage) { - continue - } + pagesPerBlock := int(c.blockSize / header.PageSize) + blockPages := make([]dedupPageInfo, pagesPerBlock) - var mapped header.BuildMap - hasMapping := false - if baseHeader != nil { - if m, err := baseHeader.GetShiftedMapping(ctx, pageOff); err == nil { - mapped = m - hasMapping = true - if m.BuildId == uuid.Nil && int64(m.Length) >= header.PageSize { - blockPages[page].kind = dedupPageCurrent - - continue - } - } - } - if bestEffort && peeker != nil && !peeker.IsCached(ctx, pageOff, header.PageSize) { - blockPages[page].kind = dedupPageCurrent + for page := range pagesPerBlock { + pageStart := int64(page) * header.PageSize + srcPage := srcBuf[pageStart : pageStart+header.PageSize] - continue - } + info, err := c.classifyPage(ctx, srcPage, absOff+pageStart) + if err != nil { + return err + } - basePage, sErr := base.Slice(ctx, pageOff, header.PageSize) - if sErr != nil { - return nil, fmt.Errorf("slice base at %d: %w", pageOff, sErr) - } - if bytes.Equal(srcPage, basePage) { - blockPages[page].kind = dedupPageParent - key := dedupFetchKey{sourceType: parentFetchSource} - if hasMapping { - key.buildID = mapped.BuildId - key.window = int(mapped.Offset / uint64(budget.FetchRunWindowPages*header.PageSize)) - } else { - key.window = int(pageOff / int64(budget.FetchRunWindowPages*header.PageSize)) - } - blockPages[page].key = key + blockPages[page] = info + } - continue - } + promoted := c.promoteBlockPages(blockPages, acc.currentStoredPages) + if promoted > 0 { + acc.promotedBlocks++ + acc.promotedPages += int64(promoted) + } - blockPages[page].kind = dedupPageCurrent - } + acc.currentStoredPages += recordBlockPages(absOff, blockPages, acc.pageDirty, acc.pageEmpty) - promoted := compactDedupFetchWindows(blockPages, budget.MaxFetchWindowsPerBlock, budget.MaxPromotedParentPagesPerBlock, budget.FetchRunWindowPages, currentStoredPages) - if promoted > 0 { - promotedBlocks++ - promotedPages += int64(promoted) - } + return nil +} - for page, info := range blockPages { - pageIdx := uint32(absOff/header.PageSize) + uint32(page) - switch info.kind { - case dedupPageEmpty: - pageEmpty.Add(pageIdx) - case dedupPageCurrent: - pageDirty.Add(pageIdx) - currentStoredPages++ - } - } +func (c dedupConfig) classifyPage(ctx context.Context, srcPage []byte, pageOff int64) (dedupPageInfo, error) { + if header.IsZero(srcPage) { + return dedupPageInfo{}, nil + } + + mapped, err := c.baseHeader.GetShiftedMapping(ctx, pageOff) + hasMapping := err == nil + + if hasMapping && mapped.BuildId == uuid.Nil && int64(mapped.Length) >= header.PageSize { + return dedupPageInfo{kind: dedupPageCurrent}, nil + } + + if c.skipUncachedPage(ctx, pageOff) { + return dedupPageInfo{kind: dedupPageCurrent}, nil + } + + basePage, err := c.base.Slice(ctx, pageOff, header.PageSize) + if err != nil { + return dedupPageInfo{}, fmt.Errorf("slice base at %d: %w", pageOff, err) + } + + if !bytes.Equal(srcPage, basePage) { + return dedupPageInfo{kind: dedupPageCurrent}, nil + } + + windowBytes := c.budget.FetchRunWindowPages * header.PageSize + key := dedupFetchKey{sourceType: parentFetchSource} + if hasMapping { + key.buildID = mapped.BuildId + key.window = int(mapped.Offset / uint64(windowBytes)) + } else { + key.window = int(pageOff / int64(windowBytes)) + } + + return dedupPageInfo{kind: dedupPageParent, key: key}, nil +} + +func (c dedupConfig) skipUncachedPage(ctx context.Context, pageOff int64) bool { + return c.bestEffort && c.peeker != nil && !c.peeker.IsCached(ctx, pageOff, header.PageSize) +} + +func (c dedupConfig) promoteBlockPages(blockPages []dedupPageInfo, currentStoredPages int64) int { + w := fetchWindower{ + windowPages: c.budget.FetchRunWindowPages, + currentStart: currentStoredPages, + } + + return w.compact(blockPages, c.budget.MaxFetchWindowsPerBlock, c.budget.MaxPromotedParentPagesPerBlock) +} + +// recordBlockPages writes this block's classified pages into the diff bitmaps +// and returns how many current pages were stored. +func recordBlockPages(absOff int64, blockPages []dedupPageInfo, pageDirty, pageEmpty *roaring.Bitmap) int64 { + var storedPages int64 + for page, info := range blockPages { + pageIdx := uint32(absOff/header.PageSize) + uint32(page) + switch info.kind { + case dedupPageEmpty: + pageEmpty.Add(pageIdx) + case dedupPageCurrent: + pageDirty.Add(pageIdx) + storedPages++ } } - return &dedupPlan{ - pageDirty: pageDirty, - pageEmpty: pageEmpty, - exportedSize: exportedSize, - promotedBlocks: promotedBlocks, - promotedPages: promotedPages, - }, nil + return storedPages } -func compactDedupFetchWindows(pages []dedupPageInfo, maxWindows, maxPromoted, windowPages int, currentStart int64) int { +// fetchWindower groups pages into fetch-run windows. windowPages and +// currentStart are invariant for the lifetime of a compact pass, so they live +// on the receiver instead of being threaded through every call. +type fetchWindower struct { + windowPages int + currentStart int64 +} + +// compact promotes parent pages to current until the block fits within +// maxWindows fetch windows or the promotion budget is exhausted. +func (w fetchWindower) compact(pages []dedupPageInfo, maxWindows, maxPromoted int) int { if maxWindows <= 0 || maxPromoted <= 0 { return 0 } var promoted int - for promoted < maxPromoted && countFetchWindows(pages, windowPages, currentStart) > maxWindows { - idxs := bestParentRunToPromote(pages, maxPromoted-promoted, windowPages, currentStart) + for promoted < maxPromoted && w.count(pages) > maxWindows { + idxs := w.bestParentRun(pages, maxPromoted-promoted) if len(idxs) == 0 { break } @@ -393,7 +468,7 @@ func compactDedupFetchWindows(pages []dedupPageInfo, maxWindows, maxPromoted, wi return promoted } -func countFetchWindows(pages []dedupPageInfo, windowPages int, currentStart int64) int { +func (w fetchWindower) count(pages []dedupPageInfo) int { keys := make(map[dedupFetchKey]struct{}) var currentOrdinal int64 for _, p := range pages { @@ -401,7 +476,10 @@ func countFetchWindows(pages []dedupPageInfo, windowPages int, currentStart int6 case dedupPageParent: keys[p.key] = struct{}{} case dedupPageCurrent: - keys[dedupFetchKey{sourceType: currentFetchSource, window: int(currentStart+currentOrdinal) / windowPages}] = struct{}{} + keys[dedupFetchKey{ + sourceType: currentFetchSource, + window: int(w.currentStart+currentOrdinal) / w.windowPages, + }] = struct{}{} currentOrdinal++ } } @@ -409,32 +487,29 @@ func countFetchWindows(pages []dedupPageInfo, windowPages int, currentStart int6 return len(keys) } -// bestParentRunToPromote returns the parent page indices whose promotion to -// current best reduces fetch windows per promoted page, scanning contiguous -// parent/empty runs first and falling back to per-key promotion. -func bestParentRunToPromote(pages []dedupPageInfo, budget, windowPages int, currentStart int64) []int { - before := countFetchWindows(pages, windowPages, currentStart) +// bestParentRun returns the parent page indices whose promotion to current best +// reduces fetch windows per promoted page, scanning contiguous parent/empty +// runs first and falling back to per-key promotion. +func (w fetchWindower) bestParentRun(pages []dedupPageInfo, budget int) []int { + before := w.count(pages) + if best := w.bestByRatio(pages, budget, before, parentRuns(pages)); best != nil { + return best + } + + return w.bestByRatio(pages, budget, before, parentKeyGroups(pages)) +} + +// bestByRatio picks the candidate page set whose promotion removes the most +// fetch windows per promoted page (highest benefit/cost), within budget. +func (w fetchWindower) bestByRatio(pages []dedupPageInfo, budget, before int, candidates iter.Seq[[]int]) []int { var best []int bestBenefit, bestCost := 0, 0 - for i := 0; i < len(pages); { - for i < len(pages) && pages[i].kind != dedupPageParent { - i++ - } - start := i - for i < len(pages) && (pages[i].kind == dedupPageParent || pages[i].kind == dedupPageEmpty) { - i++ - } - var idxs []int - for j := start; j < i; j++ { - if pages[j].kind == dedupPageParent { - idxs = append(idxs, j) - } - } + for idxs := range candidates { cost := len(idxs) if cost == 0 || cost > budget { continue } - benefit := before - countFetchWindowsAfter(pages, idxs, windowPages, currentStart) + benefit := before - w.countAfter(pages, idxs) if benefit <= 0 { continue } @@ -442,14 +517,40 @@ func bestParentRunToPromote(pages []dedupPageInfo, budget, windowPages int, curr best, bestBenefit, bestCost = idxs, benefit, cost } } - if best == nil { - return bestParentKeyToPromote(pages, budget, windowPages, currentStart, before) - } return best } -func bestParentKeyToPromote(pages []dedupPageInfo, budget, windowPages int, currentStart int64, before int) []int { +// parentRuns yields the parent page indices of each maximal run of parent/empty +// pages. A run starts at a parent page and extends across adjacent parent and +// empty pages; current pages and the slice ends bound it. +func parentRuns(pages []dedupPageInfo) iter.Seq[[]int] { + return func(yield func([]int) bool) { + var run []int + for i, p := range pages { + switch p.kind { + case dedupPageParent: + run = append(run, i) + case dedupPageEmpty: + // Empties extend an in-progress run but never start one. + default: // dedupPageCurrent + if len(run) > 0 && !yield(run) { + return + } + run = nil + } + } + if len(run) > 0 { + yield(run) + } + } +} + +// parentKeyGroups yields the parent page indices grouped by fetch key, so a set +// of non-adjacent parents sharing one fetch window can be promoted together. +// Groups are ordered by their first page index so the selection is +// deterministic regardless of map iteration order. +func parentKeyGroups(pages []dedupPageInfo) iter.Seq[[]int] { idxByKey := make(map[dedupFetchKey][]int) for i, p := range pages { if p.kind == dedupPageParent { @@ -457,41 +558,30 @@ func bestParentKeyToPromote(pages []dedupPageInfo, budget, windowPages int, curr } } - var best []int - bestBenefit, bestCost := 0, 0 - for _, idxs := range idxByKey { - cost := len(idxs) - if cost > budget { - continue - } - benefit := before - countFetchWindowsAfter(pages, idxs, windowPages, currentStart) - if benefit <= 0 { - continue - } - if best == nil || cost*bestBenefit < bestCost*benefit { - best, bestBenefit, bestCost = idxs, benefit, cost - } - } + groups := slices.Collect(maps.Values(idxByKey)) + slices.SortFunc(groups, func(a, b []int) int { + return a[0] - b[0] + }) - return best + return slices.Values(groups) } -// countFetchWindowsAfter counts fetch windows as if the given parent indices -// were promoted to current, without mutating pages. -func countFetchWindowsAfter(pages []dedupPageInfo, promote []int, windowPages int, currentStart int64) int { +// countAfter counts fetch windows as if the given parent indices were promoted +// to current, without mutating pages. +func (w fetchWindower) countAfter(pages []dedupPageInfo, promote []int) int { candidate := slices.Clone(pages) for _, i := range promote { candidate[i].kind = dedupPageCurrent candidate[i].key = dedupFetchKey{} } - return countFetchWindows(candidate, windowPages, currentStart) + return w.count(candidate) } // dedupDrain writes pageDirty pages from src to outPath packed at PageSize. func dedupDrain( ctx context.Context, - src func(absOff int64) ([]byte, error), + src blockReader, pageDirty *roaring.Bitmap, blockSize int64, outPath string, @@ -556,7 +646,7 @@ func recordDedupAttrs(ctx context.Context, totalPages, uniquePages, emptyPages, // drainDirtyPages packs pageDirty pages from src into fd. Mirrors // Cache.copyProcessMemory: coalesce contiguous pages into ranges, carve at // source-block boundaries, pre-split over MAX_RW_COUNT, then drainIovs. -func drainDirtyPages(ctx context.Context, fd int, src func(absOff int64) ([]byte, error), pageDirty *roaring.Bitmap, blockSize int64) (int64, error) { +func drainDirtyPages(ctx context.Context, fd int, src blockReader, pageDirty *roaring.Bitmap, blockSize int64) (int64, error) { var ranges []Range for r := range BitsetRanges(pageDirty, header.PageSize) { for off := r.Start; off < r.End(); { From a2bec87676eedca777e05303aa4be0c937e6122c Mon Sep 17 00:00:00 2001 From: Jakub Dobry Date: Sun, 31 May 2026 00:54:27 -0700 Subject: [PATCH 08/13] test(orch): cover dedup page classification helpers Add unit tests for the extracted dedup helpers: parentRuns segmentation (empties bridge runs, current pages and ends bound them), parentKeyGroups deterministic ordering, fetchWindower count/bestParentRun including the key-group fallback, and compareBlock in-place accumulation. --- .../pkg/sandbox/block/dedup_test.go | 287 ++++++++++++++++++ 1 file changed, 287 insertions(+) create mode 100644 packages/orchestrator/pkg/sandbox/block/dedup_test.go diff --git a/packages/orchestrator/pkg/sandbox/block/dedup_test.go b/packages/orchestrator/pkg/sandbox/block/dedup_test.go new file mode 100644 index 0000000000..a97c85735d --- /dev/null +++ b/packages/orchestrator/pkg/sandbox/block/dedup_test.go @@ -0,0 +1,287 @@ +//go:build linux + +package block + +import ( + "slices" + "testing" + + "github.com/RoaringBitmap/roaring/v2" + "github.com/google/uuid" + "github.com/stretchr/testify/require" + + "github.com/e2b-dev/infra/packages/shared/pkg/storage/header" +) + +func parentPage() dedupPageInfo { return dedupPageInfo{kind: dedupPageParent} } +func currentPage() dedupPageInfo { return dedupPageInfo{kind: dedupPageCurrent} } +func emptyPage() dedupPageInfo { return dedupPageInfo{kind: dedupPageEmpty} } + +func collectRuns(seq func(func([]int) bool)) [][]int { + var runs [][]int + for r := range seq { + runs = append(runs, slices.Clone(r)) + } + + return runs +} + +func TestParentRuns(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + pages []dedupPageInfo + want [][]int + }{ + { + name: "empty input", + pages: nil, + want: nil, + }, + { + name: "no parents", + pages: []dedupPageInfo{currentPage(), emptyPage(), currentPage()}, + want: nil, + }, + { + name: "single parent", + pages: []dedupPageInfo{parentPage()}, + want: [][]int{{0}}, + }, + { + name: "contiguous parents are one run", + pages: []dedupPageInfo{parentPage(), parentPage(), parentPage()}, + want: [][]int{{0, 1, 2}}, + }, + { + name: "current page splits runs", + pages: []dedupPageInfo{parentPage(), currentPage(), parentPage()}, + want: [][]int{{0}, {2}}, + }, + { + // Empty pages are virtual (never stored, no fetch window) so they + // keep surrounding parents in the same run. + name: "empty page bridges parents into one run", + pages: []dedupPageInfo{parentPage(), emptyPage(), parentPage()}, + want: [][]int{{0, 2}}, + }, + { + name: "leading empty does not start a run", + pages: []dedupPageInfo{emptyPage(), parentPage()}, + want: [][]int{{1}}, + }, + { + name: "trailing empty is dropped from run", + pages: []dedupPageInfo{parentPage(), emptyPage()}, + want: [][]int{{0}}, + }, + { + name: "mixed runs separated by current", + pages: []dedupPageInfo{ + parentPage(), emptyPage(), parentPage(), // run {0,2} + currentPage(), + parentPage(), // run {4} + }, + want: [][]int{{0, 2}, {4}}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + got := collectRuns(parentRuns(tt.pages)) + require.Equal(t, tt.want, got) + }) + } +} + +func TestParentRuns_EarlyStopOnYieldFalse(t *testing.T) { + t.Parallel() + + pages := []dedupPageInfo{ + parentPage(), currentPage(), parentPage(), currentPage(), parentPage(), + } + + var seen [][]int + for r := range parentRuns(pages) { + seen = append(seen, slices.Clone(r)) + + break // consumer stops after the first run + } + + require.Equal(t, [][]int{{0}}, seen) +} + +// compareBlock accumulates a block's results into the accumulator in place. +func TestCompareBlock_AccumulatesInPlace(t *testing.T) { + t.Parallel() + + pageSize := int64(header.PageSize) + blockSize := 4 * pageSize + + // An all-zero source block makes every page classify as empty via the + // header.IsZero fast path, so no base device read is needed. + srcBlock := make([]byte, blockSize) + + cfg := dedupConfig{ + src: func(int64) ([]byte, error) { return srcBlock, nil }, + blockSize: blockSize, + budget: DedupBudget{FetchRunWindowPages: 4}, + } + + acc := dedupAccum{ + pageDirty: roaring.New(), + pageEmpty: roaring.New(), + exportedSize: 7, // pre-existing value compareBlock must not touch + } + + err := cfg.compareBlock(t.Context(), 0, &acc) + require.NoError(t, err) + + // The four all-zero pages land in pageEmpty; nothing is stored as current. + require.EqualValues(t, 4, acc.pageEmpty.GetCardinality()) + require.EqualValues(t, 0, acc.pageDirty.GetCardinality()) + require.EqualValues(t, 0, acc.currentStoredPages, "all-empty block stores no current pages") + require.EqualValues(t, 0, acc.promotedBlocks) + require.EqualValues(t, 7, acc.exportedSize, "compareBlock does not touch exportedSize") +} + +// compareBlock accumulates across multiple calls into the same accumulator. +func TestCompareBlock_AccumulatesAcrossCalls(t *testing.T) { + t.Parallel() + + pageSize := int64(header.PageSize) + blockSize := 4 * pageSize + srcBlock := make([]byte, blockSize) + + cfg := dedupConfig{ + src: func(int64) ([]byte, error) { return srcBlock, nil }, + blockSize: blockSize, + budget: DedupBudget{FetchRunWindowPages: 4}, + } + + acc := dedupAccum{pageDirty: roaring.New(), pageEmpty: roaring.New()} + + require.NoError(t, cfg.compareBlock(t.Context(), 0, &acc)) + require.NoError(t, cfg.compareBlock(t.Context(), blockSize, &acc)) + + // Two all-zero blocks => 8 distinct empty page indices accumulated. + require.EqualValues(t, 8, acc.pageEmpty.GetCardinality()) +} + +func TestParentKeyGroups(t *testing.T) { + t.Parallel() + + keyA := dedupFetchKey{sourceType: parentFetchSource, window: 1} + keyB := dedupFetchKey{sourceType: parentFetchSource, window: 2} + + pages := []dedupPageInfo{ + {kind: dedupPageParent, key: keyA}, + {kind: dedupPageCurrent}, + {kind: dedupPageParent, key: keyB}, + {kind: dedupPageParent, key: keyA}, + {kind: dedupPageEmpty}, + } + + // Groups are emitted ordered by their first page index, so the order is + // stable across runs despite the underlying map. keyA's first index (0) + // precedes keyB's (2). + got := slices.Collect(parentKeyGroups(pages)) + require.Equal(t, [][]int{{0, 3}, {2}}, got) +} + +// parentKeyPage returns a parent page whose fetch window is derived from +// byteOff, so count() groups same-window parents together. +func parentKeyPage(buildID uuid.UUID, byteOff int64, windowPages int) dedupPageInfo { + return dedupPageInfo{ + kind: dedupPageParent, + key: dedupFetchKey{ + sourceType: parentFetchSource, + buildID: buildID, + window: int(byteOff / (int64(windowPages) * header.PageSize)), + }, + } +} + +func TestFetchWindowerCount(t *testing.T) { + t.Parallel() + + windowPages := 4 + build := uuid.New() + w := fetchWindower{windowPages: windowPages, currentStart: 0} + + t.Run("empty pages contribute no windows", func(t *testing.T) { + t.Parallel() + + require.Equal(t, 0, w.count([]dedupPageInfo{emptyPage(), emptyPage()})) + }) + + t.Run("parents in same window collapse to one", func(t *testing.T) { + t.Parallel() + + pages := []dedupPageInfo{ + parentKeyPage(build, 0, windowPages), + parentKeyPage(build, header.PageSize, windowPages), + } + require.Equal(t, 1, w.count(pages)) + }) + + t.Run("parents in different windows count separately", func(t *testing.T) { + t.Parallel() + + pages := []dedupPageInfo{ + parentKeyPage(build, 0, windowPages), + parentKeyPage(build, int64(windowPages)*header.PageSize, windowPages), + } + require.Equal(t, 2, w.count(pages)) + }) +} + +func TestFetchWindowerBestParentRun(t *testing.T) { + t.Parallel() + + windowPages := 4 + build := uuid.New() + + t.Run("no parents yields nothing", func(t *testing.T) { + t.Parallel() + + w := fetchWindower{windowPages: windowPages, currentStart: 0} + pages := []dedupPageInfo{currentPage(), currentPage()} + require.Nil(t, w.bestParentRun(pages, 4)) + }) + + t.Run("budget too small to remove a window yields nothing", func(t *testing.T) { + t.Parallel() + + w := fetchWindower{windowPages: windowPages, currentStart: 0} + // Two parents in distinct windows; promoting one of them costs 1 but the + // other still needs its window, and the promoted page joins a current + // window, so a single promotion does not reduce the total. + pages := []dedupPageInfo{ + parentKeyPage(build, 0, windowPages), + parentKeyPage(build, int64(windowPages)*header.PageSize, windowPages), + } + require.Nil(t, w.bestParentRun(pages, 0)) + }) + + t.Run("falls back to key grouping for non-adjacent parents", func(t *testing.T) { + t.Parallel() + + w := fetchWindower{windowPages: windowPages, currentStart: 0} + // Two parents sharing one window but separated by a current page: no + // contiguous run covers both, so the key-group fallback selects them. + sharedOff0 := int64(0) + sharedOff1 := int64(header.PageSize) + pages := []dedupPageInfo{ + parentKeyPage(build, sharedOff0, windowPages), + currentPage(), + parentKeyPage(build, sharedOff1, windowPages), + } + + got := w.bestParentRun(pages, 4) + require.Equal(t, []int{0, 2}, got) + }) +} From 97ef21a700a9851763a668208dc89496dd3fb70d Mon Sep 17 00:00:00 2001 From: Jakub Dobry Date: Sun, 31 May 2026 00:58:56 -0700 Subject: [PATCH 09/13] test(orch): cover dedup budget exhaustion Verify that when the promotion budget is zero, parent pages matching the base stay deduped even if the block exceeds MaxFetchWindowsPerBlock, rather than being over-promoted into the diff. --- .../pkg/sandbox/block/cache_test.go | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/packages/orchestrator/pkg/sandbox/block/cache_test.go b/packages/orchestrator/pkg/sandbox/block/cache_test.go index fe46f50fe4..97da903fa7 100644 --- a/packages/orchestrator/pkg/sandbox/block/cache_test.go +++ b/packages/orchestrator/pkg/sandbox/block/cache_test.go @@ -1175,3 +1175,39 @@ func TestCacheDedup_BestEffortPerPageCacheCheck(t *testing.T) { require.True(t, meta.Dirty.Contains(1)) require.True(t, meta.Dirty.Contains(3)) } + +// With no promotion budget, parent pages that match the base stay deduped even +// when the block exceeds MaxFetchWindowsPerBlock: the compaction can't spend +// any promotions, so it must leave the parents out of the diff rather than +// over-promote them into Dirty. +func TestCacheDedup_BudgetExhaustionKeepsParentsDeduped(t *testing.T) { + t.Parallel() + + pageSize := int64(header.PageSize) + blockSize := 4 * pageSize + size := blockSize + + baseData := make([]byte, size) + _, err := rand.Read(baseData) + require.NoError(t, err) + srcData := make([]byte, size) + copy(srcData, baseData) + // Pages 0 and 2 differ (current); pages 1 and 3 match base (parent). + srcData[0] ^= 0xFF + srcData[2*pageSize] ^= 0xFF + + dirty := fullDirty(size, blockSize) + src := buildPackedSrcCache(t, srcData, dirty, blockSize) + + // MaxFetchWindowsPerBlock unsatisfiable, but no promotions allowed. + cache, meta, err := src.Dedup(t.Context(), &fakeOriginalDevice{data: baseData}, dirty, blockSize, t.TempDir()+"/dedup", false, false, DedupBudget{MaxFetchWindowsPerBlock: 0, MaxPromotedParentPagesPerBlock: 0, FetchRunWindowPages: 4}) + require.NoError(t, err) + t.Cleanup(func() { _ = cache.Close() }) + + // Only the two genuinely differing pages are stored; matching parents are + // deduped away (not promoted), and nothing is empty. + require.EqualValues(t, 2, meta.Dirty.GetCardinality()) + require.True(t, meta.Dirty.Contains(0)) + require.True(t, meta.Dirty.Contains(2)) + require.EqualValues(t, 0, meta.Empty.GetCardinality()) +} From c694a9a71695d2a5f34c7f73bd20bffe97b47112 Mon Sep 17 00:00:00 2001 From: Jakub Dobry Date: Sun, 31 May 2026 01:01:32 -0700 Subject: [PATCH 10/13] test(orch): cover recordBlockPages and compact helpers Add direct unit tests for recordBlockPages (bitmap routing, absOff page index offset, stored-page count) and fetchWindower.compact (budget guards, no-op when within window budget, parent-run promotion). --- .../pkg/sandbox/block/dedup_test.go | 128 ++++++++++++++++++ 1 file changed, 128 insertions(+) diff --git a/packages/orchestrator/pkg/sandbox/block/dedup_test.go b/packages/orchestrator/pkg/sandbox/block/dedup_test.go index a97c85735d..e2129c12eb 100644 --- a/packages/orchestrator/pkg/sandbox/block/dedup_test.go +++ b/packages/orchestrator/pkg/sandbox/block/dedup_test.go @@ -285,3 +285,131 @@ func TestFetchWindowerBestParentRun(t *testing.T) { require.Equal(t, []int{0, 2}, got) }) } + +func TestRecordBlockPages(t *testing.T) { + t.Parallel() + + pageSize := int64(header.PageSize) + + tests := []struct { + name string + absOff int64 + pages []dedupPageInfo + wantDirty []uint32 + wantEmpty []uint32 + wantStored int64 + }{ + { + name: "empty input writes nothing", + absOff: 0, + pages: nil, + wantDirty: nil, + wantEmpty: nil, + wantStored: 0, + }, + { + name: "mixed kinds route to the right bitmap", + absOff: 0, + pages: []dedupPageInfo{ + emptyPage(), // page 0 -> empty + currentPage(), // page 1 -> dirty + parentPage(), // page 2 -> neither (deduped) + currentPage(), // page 3 -> dirty + }, + wantDirty: []uint32{1, 3}, + wantEmpty: []uint32{0}, + wantStored: 2, + }, + { + name: "absOff offsets the page index", + absOff: 4 * pageSize, // base page index 4 + pages: []dedupPageInfo{ + currentPage(), // -> index 4 + emptyPage(), // -> index 5 + }, + wantDirty: []uint32{4}, + wantEmpty: []uint32{5}, + wantStored: 1, + }, + { + name: "all parents store nothing", + absOff: 0, + pages: []dedupPageInfo{parentPage(), parentPage()}, + // parents go into neither bitmap and are not counted. + wantStored: 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + dirty := roaring.New() + empty := roaring.New() + + stored := recordBlockPages(tt.absOff, tt.pages, dirty, empty) + + require.Equal(t, tt.wantStored, stored) + require.ElementsMatch(t, tt.wantDirty, dirty.ToArray()) + require.ElementsMatch(t, tt.wantEmpty, empty.ToArray()) + }) + } +} + +func TestFetchWindowerCompact(t *testing.T) { + t.Parallel() + + windowPages := 4 + build := uuid.New() + + t.Run("non-positive maxWindows promotes nothing", func(t *testing.T) { + t.Parallel() + + w := fetchWindower{windowPages: windowPages, currentStart: 0} + pages := []dedupPageInfo{parentKeyPage(build, 0, windowPages)} + require.Equal(t, 0, w.compact(pages, 0, 4)) + require.Equal(t, dedupPageParent, pages[0].kind, "page must stay parent") + }) + + t.Run("non-positive maxPromoted promotes nothing", func(t *testing.T) { + t.Parallel() + + w := fetchWindower{windowPages: windowPages, currentStart: 0} + pages := []dedupPageInfo{parentKeyPage(build, 0, windowPages)} + require.Equal(t, 0, w.compact(pages, 1, 0)) + require.Equal(t, dedupPageParent, pages[0].kind) + }) + + t.Run("already within window budget promotes nothing", func(t *testing.T) { + t.Parallel() + + w := fetchWindower{windowPages: windowPages, currentStart: 0} + // A single parent window already satisfies maxWindows=1. + pages := []dedupPageInfo{ + parentKeyPage(build, 0, windowPages), + parentKeyPage(build, header.PageSize, windowPages), + } + require.Equal(t, 0, w.compact(pages, 1, 4)) + require.Equal(t, dedupPageParent, pages[0].kind) + require.Equal(t, dedupPageParent, pages[1].kind) + }) + + t.Run("promotes a parent run to meet the window budget", func(t *testing.T) { + t.Parallel() + + w := fetchWindower{windowPages: windowPages, currentStart: 0} + // Two current pages in distinct windows plus one parent: two windows + // total, over maxWindows=1. Promoting the lone parent removes its + // parent window by folding it into a current window. + pages := []dedupPageInfo{ + currentPage(), + parentKeyPage(build, 0, windowPages), + currentPage(), + } + + promoted := w.compact(pages, 1, 4) + require.Positive(t, promoted) + require.Equal(t, dedupPageCurrent, pages[1].kind, "parent must be promoted") + require.Equal(t, dedupFetchKey{}, pages[1].key, "promoted key is cleared") + }) +} From 9d5eb2969499ae6ebfcf6ac967a68b37992a7595 Mon Sep 17 00:00:00 2001 From: ValentaTomas Date: Sun, 31 May 2026 19:51:35 -0700 Subject: [PATCH 11/13] test(orch): cover dedup page classification and window ratio Add direct unit tests for classifyPage (empty/parent/current routing, mapping vs offset fetch-window keying, best-effort uncached skip) and the fetchWindower countAfter/bestByRatio budget and benefit gating. --- .../pkg/sandbox/block/dedup_test.go | 176 ++++++++++++++++++ 1 file changed, 176 insertions(+) diff --git a/packages/orchestrator/pkg/sandbox/block/dedup_test.go b/packages/orchestrator/pkg/sandbox/block/dedup_test.go index e2129c12eb..7e3b1d0067 100644 --- a/packages/orchestrator/pkg/sandbox/block/dedup_test.go +++ b/packages/orchestrator/pkg/sandbox/block/dedup_test.go @@ -3,6 +3,7 @@ package block import ( + "bytes" "slices" "testing" @@ -413,3 +414,178 @@ func TestFetchWindowerCompact(t *testing.T) { require.Equal(t, dedupFetchKey{}, pages[1].key, "promoted key is cleared") }) } + +func TestFetchWindowerCountAfter(t *testing.T) { + t.Parallel() + + windowPages := 4 + build := uuid.New() + w := fetchWindower{windowPages: windowPages, currentStart: 0} + + // One parent window plus one current window => two windows. + pages := []dedupPageInfo{ + parentKeyPage(build, 0, windowPages), + currentPage(), + } + require.Equal(t, 2, w.count(pages)) + + // Promoting the parent folds it into the current window: one window left. + require.Equal(t, 1, w.countAfter(pages, []int{0})) + + // countAfter must not mutate the input slice. + require.Equal(t, dedupPageParent, pages[0].kind, "input page kind unchanged") + require.Equal(t, + dedupFetchKey{sourceType: parentFetchSource, buildID: build, window: 0}, + pages[0].key, + "input page key unchanged", + ) +} + +func TestFetchWindowerBestByRatio(t *testing.T) { + t.Parallel() + + windowPages := 4 + build := uuid.New() + w := fetchWindower{windowPages: windowPages, currentStart: 0} + + // current, parent (window 0), current: the parent is its own fetch window, + // so promoting it into the surrounding current window removes one window. + pages := []dedupPageInfo{ + currentPage(), + parentKeyPage(build, 0, windowPages), + currentPage(), + } + before := w.count(pages) + require.Equal(t, 2, before) + + t.Run("candidate over budget is skipped", func(t *testing.T) { + t.Parallel() + + got := w.bestByRatio(pages, 0, before, slices.Values([][]int{{1}})) + require.Nil(t, got, "cost 1 exceeds budget 0") + }) + + t.Run("beneficial candidate within budget is selected", func(t *testing.T) { + t.Parallel() + + got := w.bestByRatio(pages, windowPages, before, slices.Values([][]int{{1}})) + require.Equal(t, []int{1}, got) + }) + + t.Run("zero-benefit candidate is skipped", func(t *testing.T) { + t.Parallel() + + // Index 0 is already current; promoting it removes no fetch window. + got := w.bestByRatio(pages, windowPages, before, slices.Values([][]int{{0}})) + require.Nil(t, got) + }) +} + +// classifyPage maps a source page to empty/parent/current based on the parent +// header mapping, the cache-peek (best-effort) check, and the base bytes. +func TestClassifyPage(t *testing.T) { + t.Parallel() + + const windowPages = 4 + pageSize := int64(header.PageSize) + build := uuid.New() + + zeroPage := make([]byte, header.PageSize) + nonZero := bytes.Repeat([]byte{0xAB}, header.PageSize) + budget := DedupBudget{FetchRunWindowPages: windowPages} + + t.Run("zero source page is empty without reading base", func(t *testing.T) { + t.Parallel() + + base := &fakeOriginalDevice{data: make([]byte, pageSize)} + cfg := dedupConfig{base: base, baseHeader: base.Header(), blockSize: pageSize, budget: budget} + + info, err := cfg.classifyPage(t.Context(), zeroPage, 0) + require.NoError(t, err) + require.Equal(t, dedupPageEmpty, info.kind) + require.Zero(t, base.reads, "zero fast path must not read base") + }) + + t.Run("nil-build parent hole is current without reading base", func(t *testing.T) { + t.Parallel() + + hdr, err := header.NewHeader( + header.NewTemplateMetadata(uuid.Nil, uint64(pageSize), uint64(pageSize)), + []header.BuildMap{{Offset: 0, Length: uint64(pageSize), BuildId: uuid.Nil}}, + ) + require.NoError(t, err) + base := &fakeOriginalDevice{data: bytes.Repeat([]byte{0xFF}, int(pageSize)), hdr: hdr} + cfg := dedupConfig{base: base, baseHeader: hdr, blockSize: pageSize, budget: budget} + + info, err := cfg.classifyPage(t.Context(), nonZero, 0) + require.NoError(t, err) + require.Equal(t, dedupPageCurrent, info.kind) + require.Zero(t, base.reads, "nil-build hole must not read base") + }) + + t.Run("page matching mapped parent is keyed by build and storage window", func(t *testing.T) { + t.Parallel() + + hdr, err := header.NewHeader( + header.NewTemplateMetadata(build, uint64(pageSize), uint64(pageSize)), + []header.BuildMap{{Offset: 0, Length: uint64(pageSize), BuildId: build}}, + ) + require.NoError(t, err) + base := &fakeOriginalDevice{data: nonZero, hdr: hdr} + cfg := dedupConfig{base: base, baseHeader: hdr, blockSize: pageSize, budget: budget} + + info, err := cfg.classifyPage(t.Context(), nonZero, 0) + require.NoError(t, err) + require.Equal(t, dedupPageParent, info.kind) + require.Equal(t, + dedupFetchKey{sourceType: parentFetchSource, buildID: build, window: 0}, + info.key, + ) + require.Equal(t, 1, base.reads, "matching path reads base once") + }) + + t.Run("page matching unmapped parent is keyed by offset window", func(t *testing.T) { + t.Parallel() + + pageOff := int64(8) * pageSize + data := make([]byte, 9*pageSize) + copy(data[pageOff:], nonZero) + base := &fakeOriginalDevice{data: data} // nil header -> no mapping + cfg := dedupConfig{base: base, baseHeader: nil, blockSize: pageSize, budget: budget} + + info, err := cfg.classifyPage(t.Context(), nonZero, pageOff) + require.NoError(t, err) + require.Equal(t, dedupPageParent, info.kind) + require.Equal(t, + dedupFetchKey{ + sourceType: parentFetchSource, + window: int(pageOff / (int64(windowPages) * pageSize)), + }, + info.key, + ) + }) + + t.Run("page differing from base is current", func(t *testing.T) { + t.Parallel() + + base := &fakeOriginalDevice{data: bytes.Repeat([]byte{0x11}, int(pageSize))} + cfg := dedupConfig{base: base, baseHeader: nil, blockSize: pageSize, budget: budget} + + info, err := cfg.classifyPage(t.Context(), nonZero, 0) + require.NoError(t, err) + require.Equal(t, dedupPageCurrent, info.kind) + require.Equal(t, 1, base.reads, "diff path reads base once") + }) + + t.Run("best-effort uncached page is current without reading base", func(t *testing.T) { + t.Parallel() + + base := &peekingOriginalDevice{fakeOriginalDevice: fakeOriginalDevice{data: nonZero}, cached: false} + cfg := dedupConfig{base: base, baseHeader: nil, peeker: base, blockSize: pageSize, bestEffort: true, budget: budget} + + info, err := cfg.classifyPage(t.Context(), nonZero, 0) + require.NoError(t, err) + require.Equal(t, dedupPageCurrent, info.kind) + require.Zero(t, base.reads, "uncached best-effort page must not read base") + }) +} From c4b6e2c02b093fa9a601f3b85691bf6002979635 Mon Sep 17 00:00:00 2001 From: Tomas Valenta Date: Sun, 31 May 2026 20:23:29 -0700 Subject: [PATCH 12/13] consider affordable prefix of over-budget parent runs --- packages/orchestrator/pkg/sandbox/block/cache.go | 7 ++++++- .../orchestrator/pkg/sandbox/block/dedup_test.go | 15 +++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/packages/orchestrator/pkg/sandbox/block/cache.go b/packages/orchestrator/pkg/sandbox/block/cache.go index a8c6ef63f2..6edca2c86d 100644 --- a/packages/orchestrator/pkg/sandbox/block/cache.go +++ b/packages/orchestrator/pkg/sandbox/block/cache.go @@ -505,8 +505,13 @@ func (w fetchWindower) bestByRatio(pages []dedupPageInfo, budget, before int, ca var best []int bestBenefit, bestCost := 0, 0 for idxs := range candidates { + // An over-budget candidate can still help via an affordable prefix; the + // benefit check below discards prefixes that don't remove a window. + if len(idxs) > budget { + idxs = idxs[:budget] + } cost := len(idxs) - if cost == 0 || cost > budget { + if cost == 0 { continue } benefit := before - w.countAfter(pages, idxs) diff --git a/packages/orchestrator/pkg/sandbox/block/dedup_test.go b/packages/orchestrator/pkg/sandbox/block/dedup_test.go index 7e3b1d0067..fe75b33e57 100644 --- a/packages/orchestrator/pkg/sandbox/block/dedup_test.go +++ b/packages/orchestrator/pkg/sandbox/block/dedup_test.go @@ -479,6 +479,21 @@ func TestFetchWindowerBestByRatio(t *testing.T) { got := w.bestByRatio(pages, windowPages, before, slices.Values([][]int{{0}})) require.Nil(t, got) }) + + t.Run("over-budget run is clamped to an affordable, beneficial prefix", func(t *testing.T) { + t.Parallel() + + // Three parent pages in three distinct single-page windows: the whole + // run costs 3 but budget is 2. Clamping to the first two folds them into + // one current window, leaving one parent window (3 -> 2 windows). + run := []dedupPageInfo{ + parentKeyPage(build, 0, windowPages), + parentKeyPage(build, int64(windowPages)*header.PageSize, windowPages), + parentKeyPage(build, 2*int64(windowPages)*header.PageSize, windowPages), + } + got := w.bestByRatio(run, 2, w.count(run), slices.Values([][]int{{0, 1, 2}})) + require.Equal(t, []int{0, 1}, got) + }) } // classifyPage maps a source page to empty/parent/current based on the parent From 146c0071b1053fa543cc3a8ff5e1069afd1c412a Mon Sep 17 00:00:00 2001 From: ValentaTomas Date: Sun, 31 May 2026 22:53:37 -0700 Subject: [PATCH 13/13] refactor(orch): move memfile dedup into its own file Extract the dedup classification, fetch-window promotion, and drain logic out of the generic block cache into dedup.go (same package, no behavior change), leaving Cache.Dedup as the hook so the strategy is contained and easier to swap or compare later. --- .../orchestrator/pkg/sandbox/block/cache.go | 547 ----------------- .../orchestrator/pkg/sandbox/block/dedup.go | 566 ++++++++++++++++++ 2 files changed, 566 insertions(+), 547 deletions(-) create mode 100644 packages/orchestrator/pkg/sandbox/block/dedup.go diff --git a/packages/orchestrator/pkg/sandbox/block/cache.go b/packages/orchestrator/pkg/sandbox/block/cache.go index 6edca2c86d..d3d030a5c5 100644 --- a/packages/orchestrator/pkg/sandbox/block/cache.go +++ b/packages/orchestrator/pkg/sandbox/block/cache.go @@ -3,32 +3,25 @@ package block import ( - "bytes" "context" "errors" "fmt" "io" - "iter" - "maps" "math" "math/rand" "os" - "slices" "sync" "sync/atomic" "syscall" "time" - "github.com/RoaringBitmap/roaring/v2" "github.com/edsrzf/mmap-go" - "github.com/google/uuid" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.uber.org/zap" "golang.org/x/sys/unix" "github.com/e2b-dev/infra/packages/shared/pkg/logger" - "github.com/e2b-dev/infra/packages/shared/pkg/storage" "github.com/e2b-dev/infra/packages/shared/pkg/storage/header" "github.com/e2b-dev/infra/packages/shared/pkg/telemetry" ) @@ -210,546 +203,6 @@ func (c *Cache) ExportToDiff(ctx context.Context, out *os.File) (*header.DiffMet return diffMetadata, nil } -type dedupPlan struct { - pageDirty *roaring.Bitmap - pageEmpty *roaring.Bitmap - exportedSize int64 - promotedBlocks int64 - promotedPages int64 -} - -type DedupBudget struct { - MaxFetchWindowsPerBlock int - MaxPromotedParentPagesPerBlock int - FetchRunWindowPages int -} - -type dedupPageKind byte - -const ( - dedupPageEmpty dedupPageKind = iota - dedupPageParent - dedupPageCurrent -) - -type fetchSource byte - -const ( - currentFetchSource fetchSource = iota + 1 - parentFetchSource -) - -const ( - defaultDedupFetchWindowPages = storage.DefaultCompressFrameSize / header.PageSize -) - -type dedupFetchKey struct { - sourceType fetchSource - buildID uuid.UUID - window int -} - -type dedupPageInfo struct { - kind dedupPageKind - key dedupFetchKey -} - -// blockReader returns the source bytes for the block at absOff. -type blockReader func(absOff int64) ([]byte, error) - -// dedupConfig holds the immutable inputs of a dedup comparison. Its methods -// only read it; all mutable accumulation lives in dedupAccum. -type dedupConfig struct { - src blockReader - base ReadonlyDevice - baseHeader *header.Header - peeker CachePeeker - - blockSize int64 - bestEffort bool - budget DedupBudget -} - -// dedupAccum is the mutable running state of a dedup comparison. compareBlock -// accumulates each block's results into it in place. -type dedupAccum struct { - pageDirty *roaring.Bitmap - pageEmpty *roaring.Bitmap - exportedSize int64 - promotedBlocks int64 - promotedPages int64 - currentStoredPages int64 -} - -// dedupCompare classifies each dirty page against base into pageDirty or -// pageEmpty. Per-page IsCached so a single uncached neighbour can't poison -// cached pages of the same block when the parent header is page-granular. -func dedupCompare( - ctx context.Context, - src blockReader, - base ReadonlyDevice, - dirty *roaring.Bitmap, - blockSize int64, - bestEffort bool, - budget DedupBudget, -) (*dedupPlan, error) { - if budget.FetchRunWindowPages <= 0 { - budget.FetchRunWindowPages = defaultDedupFetchWindowPages - } - - peeker, _ := base.(CachePeeker) - cfg := dedupConfig{ - src: src, - base: base, - baseHeader: base.Header(), - peeker: peeker, - blockSize: blockSize, - bestEffort: bestEffort, - budget: budget, - } - acc := dedupAccum{ - pageDirty: roaring.New(), - pageEmpty: roaring.New(), - } - - for r := range BitsetRanges(dirty, blockSize) { - acc.exportedSize += r.Size - - for chunkOff := int64(0); chunkOff < r.Size; chunkOff += blockSize { - if err := cfg.compareBlock(ctx, r.Start+chunkOff, &acc); err != nil { - return nil, err - } - } - } - - return &dedupPlan{ - pageDirty: acc.pageDirty, - pageEmpty: acc.pageEmpty, - exportedSize: acc.exportedSize, - promotedBlocks: acc.promotedBlocks, - promotedPages: acc.promotedPages, - }, nil -} - -// compareBlock classifies one block and accumulates its results into acc, -// mutating it in place. -func (c dedupConfig) compareBlock(ctx context.Context, absOff int64, acc *dedupAccum) error { - if err := ctx.Err(); err != nil { - return err - } - - srcBuf, err := c.src(absOff) - if err != nil { - return err - } - - pagesPerBlock := int(c.blockSize / header.PageSize) - blockPages := make([]dedupPageInfo, pagesPerBlock) - - for page := range pagesPerBlock { - pageStart := int64(page) * header.PageSize - srcPage := srcBuf[pageStart : pageStart+header.PageSize] - - info, err := c.classifyPage(ctx, srcPage, absOff+pageStart) - if err != nil { - return err - } - - blockPages[page] = info - } - - promoted := c.promoteBlockPages(blockPages, acc.currentStoredPages) - if promoted > 0 { - acc.promotedBlocks++ - acc.promotedPages += int64(promoted) - } - - acc.currentStoredPages += recordBlockPages(absOff, blockPages, acc.pageDirty, acc.pageEmpty) - - return nil -} - -func (c dedupConfig) classifyPage(ctx context.Context, srcPage []byte, pageOff int64) (dedupPageInfo, error) { - if header.IsZero(srcPage) { - return dedupPageInfo{}, nil - } - - mapped, err := c.baseHeader.GetShiftedMapping(ctx, pageOff) - hasMapping := err == nil - - if hasMapping && mapped.BuildId == uuid.Nil && int64(mapped.Length) >= header.PageSize { - return dedupPageInfo{kind: dedupPageCurrent}, nil - } - - if c.skipUncachedPage(ctx, pageOff) { - return dedupPageInfo{kind: dedupPageCurrent}, nil - } - - basePage, err := c.base.Slice(ctx, pageOff, header.PageSize) - if err != nil { - return dedupPageInfo{}, fmt.Errorf("slice base at %d: %w", pageOff, err) - } - - if !bytes.Equal(srcPage, basePage) { - return dedupPageInfo{kind: dedupPageCurrent}, nil - } - - windowBytes := c.budget.FetchRunWindowPages * header.PageSize - key := dedupFetchKey{sourceType: parentFetchSource} - if hasMapping { - key.buildID = mapped.BuildId - key.window = int(mapped.Offset / uint64(windowBytes)) - } else { - key.window = int(pageOff / int64(windowBytes)) - } - - return dedupPageInfo{kind: dedupPageParent, key: key}, nil -} - -func (c dedupConfig) skipUncachedPage(ctx context.Context, pageOff int64) bool { - return c.bestEffort && c.peeker != nil && !c.peeker.IsCached(ctx, pageOff, header.PageSize) -} - -func (c dedupConfig) promoteBlockPages(blockPages []dedupPageInfo, currentStoredPages int64) int { - w := fetchWindower{ - windowPages: c.budget.FetchRunWindowPages, - currentStart: currentStoredPages, - } - - return w.compact(blockPages, c.budget.MaxFetchWindowsPerBlock, c.budget.MaxPromotedParentPagesPerBlock) -} - -// recordBlockPages writes this block's classified pages into the diff bitmaps -// and returns how many current pages were stored. -func recordBlockPages(absOff int64, blockPages []dedupPageInfo, pageDirty, pageEmpty *roaring.Bitmap) int64 { - var storedPages int64 - for page, info := range blockPages { - pageIdx := uint32(absOff/header.PageSize) + uint32(page) - switch info.kind { - case dedupPageEmpty: - pageEmpty.Add(pageIdx) - case dedupPageCurrent: - pageDirty.Add(pageIdx) - storedPages++ - } - } - - return storedPages -} - -// fetchWindower groups pages into fetch-run windows. windowPages and -// currentStart are invariant for the lifetime of a compact pass, so they live -// on the receiver instead of being threaded through every call. -type fetchWindower struct { - windowPages int - currentStart int64 -} - -// compact promotes parent pages to current until the block fits within -// maxWindows fetch windows or the promotion budget is exhausted. -func (w fetchWindower) compact(pages []dedupPageInfo, maxWindows, maxPromoted int) int { - if maxWindows <= 0 || maxPromoted <= 0 { - return 0 - } - - var promoted int - for promoted < maxPromoted && w.count(pages) > maxWindows { - idxs := w.bestParentRun(pages, maxPromoted-promoted) - if len(idxs) == 0 { - break - } - for _, i := range idxs { - pages[i].kind = dedupPageCurrent - pages[i].key = dedupFetchKey{} - promoted++ - } - } - - return promoted -} - -func (w fetchWindower) count(pages []dedupPageInfo) int { - keys := make(map[dedupFetchKey]struct{}) - var currentOrdinal int64 - for _, p := range pages { - switch p.kind { - case dedupPageParent: - keys[p.key] = struct{}{} - case dedupPageCurrent: - keys[dedupFetchKey{ - sourceType: currentFetchSource, - window: int(w.currentStart+currentOrdinal) / w.windowPages, - }] = struct{}{} - currentOrdinal++ - } - } - - return len(keys) -} - -// bestParentRun returns the parent page indices whose promotion to current best -// reduces fetch windows per promoted page, scanning contiguous parent/empty -// runs first and falling back to per-key promotion. -func (w fetchWindower) bestParentRun(pages []dedupPageInfo, budget int) []int { - before := w.count(pages) - if best := w.bestByRatio(pages, budget, before, parentRuns(pages)); best != nil { - return best - } - - return w.bestByRatio(pages, budget, before, parentKeyGroups(pages)) -} - -// bestByRatio picks the candidate page set whose promotion removes the most -// fetch windows per promoted page (highest benefit/cost), within budget. -func (w fetchWindower) bestByRatio(pages []dedupPageInfo, budget, before int, candidates iter.Seq[[]int]) []int { - var best []int - bestBenefit, bestCost := 0, 0 - for idxs := range candidates { - // An over-budget candidate can still help via an affordable prefix; the - // benefit check below discards prefixes that don't remove a window. - if len(idxs) > budget { - idxs = idxs[:budget] - } - cost := len(idxs) - if cost == 0 { - continue - } - benefit := before - w.countAfter(pages, idxs) - if benefit <= 0 { - continue - } - if best == nil || cost*bestBenefit < bestCost*benefit { - best, bestBenefit, bestCost = idxs, benefit, cost - } - } - - return best -} - -// parentRuns yields the parent page indices of each maximal run of parent/empty -// pages. A run starts at a parent page and extends across adjacent parent and -// empty pages; current pages and the slice ends bound it. -func parentRuns(pages []dedupPageInfo) iter.Seq[[]int] { - return func(yield func([]int) bool) { - var run []int - for i, p := range pages { - switch p.kind { - case dedupPageParent: - run = append(run, i) - case dedupPageEmpty: - // Empties extend an in-progress run but never start one. - default: // dedupPageCurrent - if len(run) > 0 && !yield(run) { - return - } - run = nil - } - } - if len(run) > 0 { - yield(run) - } - } -} - -// parentKeyGroups yields the parent page indices grouped by fetch key, so a set -// of non-adjacent parents sharing one fetch window can be promoted together. -// Groups are ordered by their first page index so the selection is -// deterministic regardless of map iteration order. -func parentKeyGroups(pages []dedupPageInfo) iter.Seq[[]int] { - idxByKey := make(map[dedupFetchKey][]int) - for i, p := range pages { - if p.kind == dedupPageParent { - idxByKey[p.key] = append(idxByKey[p.key], i) - } - } - - groups := slices.Collect(maps.Values(idxByKey)) - slices.SortFunc(groups, func(a, b []int) int { - return a[0] - b[0] - }) - - return slices.Values(groups) -} - -// countAfter counts fetch windows as if the given parent indices were promoted -// to current, without mutating pages. -func (w fetchWindower) countAfter(pages []dedupPageInfo, promote []int) int { - candidate := slices.Clone(pages) - for _, i := range promote { - candidate[i].kind = dedupPageCurrent - candidate[i].key = dedupFetchKey{} - } - - return w.count(candidate) -} - -// dedupDrain writes pageDirty pages from src to outPath packed at PageSize. -func dedupDrain( - ctx context.Context, - src blockReader, - pageDirty *roaring.Bitmap, - blockSize int64, - outPath string, - directIO bool, -) (*Cache, error) { - openFlags := os.O_RDWR | os.O_CREATE - if directIO { - openFlags |= unix.O_DIRECT - } - f, err := os.OpenFile(outPath, openFlags, 0o644) - if err != nil { - return nil, fmt.Errorf("open dedup cache: %w", err) - } - if want := int64(pageDirty.GetCardinality()) * header.PageSize; directIO && want > 0 { - if fErr := unix.Fallocate(int(f.Fd()), 0, 0, want); fErr != nil { - logger.L().Warn(ctx, "fallocate dedup cache; proceeding without preallocation", zap.Error(fErr)) - } - } - - fileOff, err := drainDirtyPages(ctx, int(f.Fd()), src, pageDirty, blockSize) - if err != nil { - return nil, errors.Join(err, f.Close(), os.Remove(outPath)) - } - - if directIO { - if err := f.Truncate(fileOff); err != nil { - return nil, errors.Join(fmt.Errorf("truncate dedup cache: %w", err), f.Close(), os.Remove(outPath)) - } - } - if err := f.Close(); err != nil { - return nil, errors.Join(err, os.Remove(outPath)) - } - - cache, err := NewCache(fileOff, header.PageSize, outPath, false) - if err != nil { - return nil, errors.Join(err, os.Remove(outPath)) - } - cache.setIsCached(0, fileOff) - - return cache, nil -} - -func recordDedupAttrs(ctx context.Context, totalPages, uniquePages, emptyPages, promotedBlocks, promotedPages int64, compareDur, writeDur time.Duration) { - dedupedPages := totalPages - uniquePages - emptyPages - ratio := 0.0 - if totalPages > 0 { - ratio = float64(dedupedPages) / float64(totalPages) - } - telemetry.SetAttributes(ctx, - attribute.Int64("dedup.total_pages", totalPages), - attribute.Int64("dedup.deduped_pages", dedupedPages), - attribute.Int64("dedup.unique_pages", uniquePages), - attribute.Int64("dedup.empty_pages", emptyPages), - attribute.Int64("dedup.promoted_blocks", promotedBlocks), - attribute.Int64("dedup.promoted_pages", promotedPages), - attribute.Float64("dedup.ratio", ratio), - attribute.Int64("dedup.compare_ms", compareDur.Milliseconds()), - attribute.Int64("dedup.write_ms", writeDur.Milliseconds()), - ) -} - -// drainDirtyPages packs pageDirty pages from src into fd. Mirrors -// Cache.copyProcessMemory: coalesce contiguous pages into ranges, carve at -// source-block boundaries, pre-split over MAX_RW_COUNT, then drainIovs. -func drainDirtyPages(ctx context.Context, fd int, src blockReader, pageDirty *roaring.Bitmap, blockSize int64) (int64, error) { - var ranges []Range - for r := range BitsetRanges(pageDirty, header.PageSize) { - for off := r.Start; off < r.End(); { - blockOff := (off / blockSize) * blockSize - chunkEnd := min(r.End(), blockOff+blockSize) - ranges = append(ranges, Range{Start: off, Size: chunkEnd - off}) - off = chunkEnd - } - } - ranges = splitOversizedRanges(ranges, getAlignedMaxRwCount(header.PageSize)) - - if err := drainIovs(ranges, func(r Range) int64 { return r.Size }, header.PageSize, - func(destOff int64, batch []Range, _ int64) error { - if err := ctx.Err(); err != nil { - return err - } - iovs := make([][]byte, len(batch)) - for i, r := range batch { - blockOff := (r.Start / blockSize) * blockSize - buf, srcErr := src(blockOff) - if srcErr != nil { - return fmt.Errorf("slice src at %d: %w", blockOff, srcErr) - } - iovs[i] = buf[r.Start-blockOff : r.Start-blockOff+r.Size] - } - if err := pwritevAll(fd, destOff, iovs); err != nil { - return fmt.Errorf("pwritev dedup pages: %w", err) - } - - return nil - }); err != nil { - return 0, err - } - - return GetSize(ranges), nil -} - -// Dedup writes pages from c that differ from base, packed at PageSize, to -// outPath. bestEffort skips uncached blocks; directIO uses O_DIRECT. -func (c *Cache) Dedup( - ctx context.Context, - base ReadonlyDevice, - dirty *roaring.Bitmap, - blockSize int64, - outPath string, - bestEffort bool, - directIO bool, - budget DedupBudget, -) (*Cache, *header.DiffMetadata, error) { - ctx, span := tracer.Start(ctx, "dedup-pages") - defer span.End() - - // c is packed in BitsetRanges order; map abs offset → packed offset. - packed := make(map[int64]int64, dirty.GetCardinality()) - var cum int64 - for r := range BitsetRanges(dirty, blockSize) { - for chunkOff := int64(0); chunkOff < r.Size; chunkOff += blockSize { - packed[r.Start+chunkOff] = cum - cum += blockSize - } - } - src := func(absOff int64) ([]byte, error) { - idx, ok := packed[absOff] - if !ok { - return nil, fmt.Errorf("dedup src: %d not packed", absOff) - } - - return c.Slice(idx, blockSize) - } - - compareStart := time.Now() - plan, err := dedupCompare(ctx, src, base, dirty, blockSize, bestEffort, budget) - if err != nil { - return nil, nil, err - } - compareDur := time.Since(compareStart) - - writeStart := time.Now() - cache, err := dedupDrain(ctx, src, plan.pageDirty, blockSize, outPath, directIO) - if err != nil { - return nil, nil, err - } - recordDedupAttrs(ctx, - plan.exportedSize/header.PageSize, - int64(plan.pageDirty.GetCardinality()), - int64(plan.pageEmpty.GetCardinality()), - plan.promotedBlocks, - plan.promotedPages, - compareDur, time.Since(writeStart), - ) - - return cache, &header.DiffMetadata{ - Dirty: plan.pageDirty, - Empty: plan.pageEmpty, - BlockSize: header.PageSize, - }, nil -} - func (c *Cache) ReadAt(b []byte, off int64) (int, error) { c.mu.RLock() defer c.mu.RUnlock() diff --git a/packages/orchestrator/pkg/sandbox/block/dedup.go b/packages/orchestrator/pkg/sandbox/block/dedup.go new file mode 100644 index 0000000000..db8125a601 --- /dev/null +++ b/packages/orchestrator/pkg/sandbox/block/dedup.go @@ -0,0 +1,566 @@ +//go:build linux + +package block + +import ( + "bytes" + "context" + "errors" + "fmt" + "iter" + "maps" + "os" + "slices" + "time" + + "github.com/RoaringBitmap/roaring/v2" + "github.com/google/uuid" + "go.opentelemetry.io/otel/attribute" + "go.uber.org/zap" + "golang.org/x/sys/unix" + + "github.com/e2b-dev/infra/packages/shared/pkg/logger" + "github.com/e2b-dev/infra/packages/shared/pkg/storage" + "github.com/e2b-dev/infra/packages/shared/pkg/storage/header" + "github.com/e2b-dev/infra/packages/shared/pkg/telemetry" +) + +type dedupPlan struct { + pageDirty *roaring.Bitmap + pageEmpty *roaring.Bitmap + exportedSize int64 + promotedBlocks int64 + promotedPages int64 +} + +type DedupBudget struct { + MaxFetchWindowsPerBlock int + MaxPromotedParentPagesPerBlock int + FetchRunWindowPages int +} + +type dedupPageKind byte + +const ( + dedupPageEmpty dedupPageKind = iota + dedupPageParent + dedupPageCurrent +) + +type fetchSource byte + +const ( + currentFetchSource fetchSource = iota + 1 + parentFetchSource +) + +const ( + defaultDedupFetchWindowPages = storage.DefaultCompressFrameSize / header.PageSize +) + +type dedupFetchKey struct { + sourceType fetchSource + buildID uuid.UUID + window int +} + +type dedupPageInfo struct { + kind dedupPageKind + key dedupFetchKey +} + +// blockReader returns the source bytes for the block at absOff. +type blockReader func(absOff int64) ([]byte, error) + +// dedupConfig holds the immutable inputs of a dedup comparison. Its methods +// only read it; all mutable accumulation lives in dedupAccum. +type dedupConfig struct { + src blockReader + base ReadonlyDevice + baseHeader *header.Header + peeker CachePeeker + + blockSize int64 + bestEffort bool + budget DedupBudget +} + +// dedupAccum is the mutable running state of a dedup comparison. compareBlock +// accumulates each block's results into it in place. +type dedupAccum struct { + pageDirty *roaring.Bitmap + pageEmpty *roaring.Bitmap + exportedSize int64 + promotedBlocks int64 + promotedPages int64 + currentStoredPages int64 +} + +// dedupCompare classifies each dirty page against base into pageDirty or +// pageEmpty. Per-page IsCached so a single uncached neighbour can't poison +// cached pages of the same block when the parent header is page-granular. +func dedupCompare( + ctx context.Context, + src blockReader, + base ReadonlyDevice, + dirty *roaring.Bitmap, + blockSize int64, + bestEffort bool, + budget DedupBudget, +) (*dedupPlan, error) { + if budget.FetchRunWindowPages <= 0 { + budget.FetchRunWindowPages = defaultDedupFetchWindowPages + } + + peeker, _ := base.(CachePeeker) + cfg := dedupConfig{ + src: src, + base: base, + baseHeader: base.Header(), + peeker: peeker, + blockSize: blockSize, + bestEffort: bestEffort, + budget: budget, + } + acc := dedupAccum{ + pageDirty: roaring.New(), + pageEmpty: roaring.New(), + } + + for r := range BitsetRanges(dirty, blockSize) { + acc.exportedSize += r.Size + + for chunkOff := int64(0); chunkOff < r.Size; chunkOff += blockSize { + if err := cfg.compareBlock(ctx, r.Start+chunkOff, &acc); err != nil { + return nil, err + } + } + } + + return &dedupPlan{ + pageDirty: acc.pageDirty, + pageEmpty: acc.pageEmpty, + exportedSize: acc.exportedSize, + promotedBlocks: acc.promotedBlocks, + promotedPages: acc.promotedPages, + }, nil +} + +// compareBlock classifies one block and accumulates its results into acc, +// mutating it in place. +func (c dedupConfig) compareBlock(ctx context.Context, absOff int64, acc *dedupAccum) error { + if err := ctx.Err(); err != nil { + return err + } + + srcBuf, err := c.src(absOff) + if err != nil { + return err + } + + pagesPerBlock := int(c.blockSize / header.PageSize) + blockPages := make([]dedupPageInfo, pagesPerBlock) + + for page := range pagesPerBlock { + pageStart := int64(page) * header.PageSize + srcPage := srcBuf[pageStart : pageStart+header.PageSize] + + info, err := c.classifyPage(ctx, srcPage, absOff+pageStart) + if err != nil { + return err + } + + blockPages[page] = info + } + + promoted := c.promoteBlockPages(blockPages, acc.currentStoredPages) + if promoted > 0 { + acc.promotedBlocks++ + acc.promotedPages += int64(promoted) + } + + acc.currentStoredPages += recordBlockPages(absOff, blockPages, acc.pageDirty, acc.pageEmpty) + + return nil +} + +func (c dedupConfig) classifyPage(ctx context.Context, srcPage []byte, pageOff int64) (dedupPageInfo, error) { + if header.IsZero(srcPage) { + return dedupPageInfo{}, nil + } + + mapped, err := c.baseHeader.GetShiftedMapping(ctx, pageOff) + hasMapping := err == nil + + if hasMapping && mapped.BuildId == uuid.Nil && int64(mapped.Length) >= header.PageSize { + return dedupPageInfo{kind: dedupPageCurrent}, nil + } + + if c.skipUncachedPage(ctx, pageOff) { + return dedupPageInfo{kind: dedupPageCurrent}, nil + } + + basePage, err := c.base.Slice(ctx, pageOff, header.PageSize) + if err != nil { + return dedupPageInfo{}, fmt.Errorf("slice base at %d: %w", pageOff, err) + } + + if !bytes.Equal(srcPage, basePage) { + return dedupPageInfo{kind: dedupPageCurrent}, nil + } + + windowBytes := c.budget.FetchRunWindowPages * header.PageSize + key := dedupFetchKey{sourceType: parentFetchSource} + if hasMapping { + key.buildID = mapped.BuildId + key.window = int(mapped.Offset / uint64(windowBytes)) + } else { + key.window = int(pageOff / int64(windowBytes)) + } + + return dedupPageInfo{kind: dedupPageParent, key: key}, nil +} + +func (c dedupConfig) skipUncachedPage(ctx context.Context, pageOff int64) bool { + return c.bestEffort && c.peeker != nil && !c.peeker.IsCached(ctx, pageOff, header.PageSize) +} + +func (c dedupConfig) promoteBlockPages(blockPages []dedupPageInfo, currentStoredPages int64) int { + w := fetchWindower{ + windowPages: c.budget.FetchRunWindowPages, + currentStart: currentStoredPages, + } + + return w.compact(blockPages, c.budget.MaxFetchWindowsPerBlock, c.budget.MaxPromotedParentPagesPerBlock) +} + +// recordBlockPages writes this block's classified pages into the diff bitmaps +// and returns how many current pages were stored. +func recordBlockPages(absOff int64, blockPages []dedupPageInfo, pageDirty, pageEmpty *roaring.Bitmap) int64 { + var storedPages int64 + for page, info := range blockPages { + pageIdx := uint32(absOff/header.PageSize) + uint32(page) + switch info.kind { + case dedupPageEmpty: + pageEmpty.Add(pageIdx) + case dedupPageCurrent: + pageDirty.Add(pageIdx) + storedPages++ + } + } + + return storedPages +} + +// fetchWindower groups pages into fetch-run windows. windowPages and +// currentStart are invariant for the lifetime of a compact pass, so they live +// on the receiver instead of being threaded through every call. +type fetchWindower struct { + windowPages int + currentStart int64 +} + +// compact promotes parent pages to current until the block fits within +// maxWindows fetch windows or the promotion budget is exhausted. +func (w fetchWindower) compact(pages []dedupPageInfo, maxWindows, maxPromoted int) int { + if maxWindows <= 0 || maxPromoted <= 0 { + return 0 + } + + var promoted int + for promoted < maxPromoted && w.count(pages) > maxWindows { + idxs := w.bestParentRun(pages, maxPromoted-promoted) + if len(idxs) == 0 { + break + } + for _, i := range idxs { + pages[i].kind = dedupPageCurrent + pages[i].key = dedupFetchKey{} + promoted++ + } + } + + return promoted +} + +func (w fetchWindower) count(pages []dedupPageInfo) int { + keys := make(map[dedupFetchKey]struct{}) + var currentOrdinal int64 + for _, p := range pages { + switch p.kind { + case dedupPageParent: + keys[p.key] = struct{}{} + case dedupPageCurrent: + keys[dedupFetchKey{ + sourceType: currentFetchSource, + window: int(w.currentStart+currentOrdinal) / w.windowPages, + }] = struct{}{} + currentOrdinal++ + } + } + + return len(keys) +} + +// bestParentRun returns the parent page indices whose promotion to current best +// reduces fetch windows per promoted page, scanning contiguous parent/empty +// runs first and falling back to per-key promotion. +func (w fetchWindower) bestParentRun(pages []dedupPageInfo, budget int) []int { + before := w.count(pages) + if best := w.bestByRatio(pages, budget, before, parentRuns(pages)); best != nil { + return best + } + + return w.bestByRatio(pages, budget, before, parentKeyGroups(pages)) +} + +// bestByRatio picks the candidate page set whose promotion removes the most +// fetch windows per promoted page (highest benefit/cost), within budget. +func (w fetchWindower) bestByRatio(pages []dedupPageInfo, budget, before int, candidates iter.Seq[[]int]) []int { + var best []int + bestBenefit, bestCost := 0, 0 + for idxs := range candidates { + // An over-budget candidate can still help via an affordable prefix; the + // benefit check below discards prefixes that don't remove a window. + if len(idxs) > budget { + idxs = idxs[:budget] + } + cost := len(idxs) + if cost == 0 { + continue + } + benefit := before - w.countAfter(pages, idxs) + if benefit <= 0 { + continue + } + if best == nil || cost*bestBenefit < bestCost*benefit { + best, bestBenefit, bestCost = idxs, benefit, cost + } + } + + return best +} + +// parentRuns yields the parent page indices of each maximal run of parent/empty +// pages. A run starts at a parent page and extends across adjacent parent and +// empty pages; current pages and the slice ends bound it. +func parentRuns(pages []dedupPageInfo) iter.Seq[[]int] { + return func(yield func([]int) bool) { + var run []int + for i, p := range pages { + switch p.kind { + case dedupPageParent: + run = append(run, i) + case dedupPageEmpty: + // Empties extend an in-progress run but never start one. + default: // dedupPageCurrent + if len(run) > 0 && !yield(run) { + return + } + run = nil + } + } + if len(run) > 0 { + yield(run) + } + } +} + +// parentKeyGroups yields the parent page indices grouped by fetch key, so a set +// of non-adjacent parents sharing one fetch window can be promoted together. +// Groups are ordered by their first page index so the selection is +// deterministic regardless of map iteration order. +func parentKeyGroups(pages []dedupPageInfo) iter.Seq[[]int] { + idxByKey := make(map[dedupFetchKey][]int) + for i, p := range pages { + if p.kind == dedupPageParent { + idxByKey[p.key] = append(idxByKey[p.key], i) + } + } + + groups := slices.Collect(maps.Values(idxByKey)) + slices.SortFunc(groups, func(a, b []int) int { + return a[0] - b[0] + }) + + return slices.Values(groups) +} + +// countAfter counts fetch windows as if the given parent indices were promoted +// to current, without mutating pages. +func (w fetchWindower) countAfter(pages []dedupPageInfo, promote []int) int { + candidate := slices.Clone(pages) + for _, i := range promote { + candidate[i].kind = dedupPageCurrent + candidate[i].key = dedupFetchKey{} + } + + return w.count(candidate) +} + +// dedupDrain writes pageDirty pages from src to outPath packed at PageSize. +func dedupDrain( + ctx context.Context, + src blockReader, + pageDirty *roaring.Bitmap, + blockSize int64, + outPath string, + directIO bool, +) (*Cache, error) { + openFlags := os.O_RDWR | os.O_CREATE + if directIO { + openFlags |= unix.O_DIRECT + } + f, err := os.OpenFile(outPath, openFlags, 0o644) + if err != nil { + return nil, fmt.Errorf("open dedup cache: %w", err) + } + if want := int64(pageDirty.GetCardinality()) * header.PageSize; directIO && want > 0 { + if fErr := unix.Fallocate(int(f.Fd()), 0, 0, want); fErr != nil { + logger.L().Warn(ctx, "fallocate dedup cache; proceeding without preallocation", zap.Error(fErr)) + } + } + + fileOff, err := drainDirtyPages(ctx, int(f.Fd()), src, pageDirty, blockSize) + if err != nil { + return nil, errors.Join(err, f.Close(), os.Remove(outPath)) + } + + if directIO { + if err := f.Truncate(fileOff); err != nil { + return nil, errors.Join(fmt.Errorf("truncate dedup cache: %w", err), f.Close(), os.Remove(outPath)) + } + } + if err := f.Close(); err != nil { + return nil, errors.Join(err, os.Remove(outPath)) + } + + cache, err := NewCache(fileOff, header.PageSize, outPath, false) + if err != nil { + return nil, errors.Join(err, os.Remove(outPath)) + } + cache.setIsCached(0, fileOff) + + return cache, nil +} + +func recordDedupAttrs(ctx context.Context, totalPages, uniquePages, emptyPages, promotedBlocks, promotedPages int64, compareDur, writeDur time.Duration) { + dedupedPages := totalPages - uniquePages - emptyPages + ratio := 0.0 + if totalPages > 0 { + ratio = float64(dedupedPages) / float64(totalPages) + } + telemetry.SetAttributes(ctx, + attribute.Int64("dedup.total_pages", totalPages), + attribute.Int64("dedup.deduped_pages", dedupedPages), + attribute.Int64("dedup.unique_pages", uniquePages), + attribute.Int64("dedup.empty_pages", emptyPages), + attribute.Int64("dedup.promoted_blocks", promotedBlocks), + attribute.Int64("dedup.promoted_pages", promotedPages), + attribute.Float64("dedup.ratio", ratio), + attribute.Int64("dedup.compare_ms", compareDur.Milliseconds()), + attribute.Int64("dedup.write_ms", writeDur.Milliseconds()), + ) +} + +// drainDirtyPages packs pageDirty pages from src into fd. Mirrors +// Cache.copyProcessMemory: coalesce contiguous pages into ranges, carve at +// source-block boundaries, pre-split over MAX_RW_COUNT, then drainIovs. +func drainDirtyPages(ctx context.Context, fd int, src blockReader, pageDirty *roaring.Bitmap, blockSize int64) (int64, error) { + var ranges []Range + for r := range BitsetRanges(pageDirty, header.PageSize) { + for off := r.Start; off < r.End(); { + blockOff := (off / blockSize) * blockSize + chunkEnd := min(r.End(), blockOff+blockSize) + ranges = append(ranges, Range{Start: off, Size: chunkEnd - off}) + off = chunkEnd + } + } + ranges = splitOversizedRanges(ranges, getAlignedMaxRwCount(header.PageSize)) + + if err := drainIovs(ranges, func(r Range) int64 { return r.Size }, header.PageSize, + func(destOff int64, batch []Range, _ int64) error { + if err := ctx.Err(); err != nil { + return err + } + iovs := make([][]byte, len(batch)) + for i, r := range batch { + blockOff := (r.Start / blockSize) * blockSize + buf, srcErr := src(blockOff) + if srcErr != nil { + return fmt.Errorf("slice src at %d: %w", blockOff, srcErr) + } + iovs[i] = buf[r.Start-blockOff : r.Start-blockOff+r.Size] + } + if err := pwritevAll(fd, destOff, iovs); err != nil { + return fmt.Errorf("pwritev dedup pages: %w", err) + } + + return nil + }); err != nil { + return 0, err + } + + return GetSize(ranges), nil +} + +// Dedup writes pages from c that differ from base, packed at PageSize, to +// outPath. bestEffort skips uncached blocks; directIO uses O_DIRECT. +func (c *Cache) Dedup( + ctx context.Context, + base ReadonlyDevice, + dirty *roaring.Bitmap, + blockSize int64, + outPath string, + bestEffort bool, + directIO bool, + budget DedupBudget, +) (*Cache, *header.DiffMetadata, error) { + ctx, span := tracer.Start(ctx, "dedup-pages") + defer span.End() + + // c is packed in BitsetRanges order; map abs offset → packed offset. + packed := make(map[int64]int64, dirty.GetCardinality()) + var cum int64 + for r := range BitsetRanges(dirty, blockSize) { + for chunkOff := int64(0); chunkOff < r.Size; chunkOff += blockSize { + packed[r.Start+chunkOff] = cum + cum += blockSize + } + } + src := func(absOff int64) ([]byte, error) { + idx, ok := packed[absOff] + if !ok { + return nil, fmt.Errorf("dedup src: %d not packed", absOff) + } + + return c.Slice(idx, blockSize) + } + + compareStart := time.Now() + plan, err := dedupCompare(ctx, src, base, dirty, blockSize, bestEffort, budget) + if err != nil { + return nil, nil, err + } + compareDur := time.Since(compareStart) + + writeStart := time.Now() + cache, err := dedupDrain(ctx, src, plan.pageDirty, blockSize, outPath, directIO) + if err != nil { + return nil, nil, err + } + recordDedupAttrs(ctx, + plan.exportedSize/header.PageSize, + int64(plan.pageDirty.GetCardinality()), + int64(plan.pageEmpty.GetCardinality()), + plan.promotedBlocks, + plan.promotedPages, + compareDur, time.Since(writeStart), + ) + + return cache, &header.DiffMetadata{ + Dirty: plan.pageDirty, + Empty: plan.pageEmpty, + BlockSize: header.PageSize, + }, nil +}