diff --git a/packages/orchestrator/pkg/sandbox/block/cache.go b/packages/orchestrator/pkg/sandbox/block/cache.go index 5563c7e30e..d3d030a5c5 100644 --- a/packages/orchestrator/pkg/sandbox/block/cache.go +++ b/packages/orchestrator/pkg/sandbox/block/cache.go @@ -3,7 +3,6 @@ package block import ( - "bytes" "context" "errors" "fmt" @@ -16,9 +15,7 @@ import ( "syscall" "time" - "github.com/RoaringBitmap/roaring/v2" "github.com/edsrzf/mmap-go" - "github.com/google/uuid" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.uber.org/zap" @@ -206,250 +203,6 @@ func (c *Cache) ExportToDiff(ctx context.Context, out *os.File) (*header.DiffMet return diffMetadata, nil } -type dedupPlan struct { - pageDirty *roaring.Bitmap - pageEmpty *roaring.Bitmap - exportedSize int64 -} - -// dedupCompare classifies each dirty page against base into pageDirty or -// pageEmpty. Per-page IsCached so a single uncached neighbour can't poison -// cached pages of the same block when the parent header is page-granular. 
-func dedupCompare( - ctx context.Context, - src func(absOff int64) ([]byte, error), - base ReadonlyDevice, - dirty *roaring.Bitmap, - blockSize int64, - bestEffort bool, -) (*dedupPlan, error) { - pageDirty := roaring.New() - pageEmpty := roaring.New() - var exportedSize int64 - - baseHeader := base.Header() - peeker, _ := base.(CachePeeker) - - for r := range BitsetRanges(dirty, blockSize) { - exportedSize += r.Size - - for chunkOff := int64(0); chunkOff < r.Size; chunkOff += blockSize { - if err := ctx.Err(); err != nil { - return nil, err - } - - absOff := r.Start + chunkOff - srcBuf, err := src(absOff) - if err != nil { - return nil, err - } - - for i := int64(0); i < blockSize; i += header.PageSize { - srcPage := srcBuf[i : i+header.PageSize] - pageIdx := uint32((absOff + i) / header.PageSize) - pageOff := absOff + i - - if header.IsZero(srcPage) { - pageEmpty.Add(pageIdx) - - continue - } - - skip := false - if baseHeader != nil { - if m, err := baseHeader.GetShiftedMapping(ctx, pageOff); err == nil { - if m.BuildId == uuid.Nil && int64(m.Length) >= header.PageSize { - skip = true - } - } - } - if !skip && bestEffort && peeker != nil && !peeker.IsCached(ctx, pageOff, header.PageSize) { - skip = true - } - if skip { - pageDirty.Add(pageIdx) - - continue - } - - basePage, sErr := base.Slice(ctx, pageOff, header.PageSize) - if sErr != nil { - return nil, fmt.Errorf("slice base at %d: %w", pageOff, sErr) - } - if bytes.Equal(srcPage, basePage) { - continue - } - - pageDirty.Add(pageIdx) - } - } - } - - return &dedupPlan{pageDirty: pageDirty, pageEmpty: pageEmpty, exportedSize: exportedSize}, nil -} - -// dedupDrain writes pageDirty pages from src to outPath packed at PageSize. 
-func dedupDrain( - ctx context.Context, - src func(absOff int64) ([]byte, error), - pageDirty *roaring.Bitmap, - blockSize int64, - outPath string, - directIO bool, -) (*Cache, error) { - openFlags := os.O_RDWR | os.O_CREATE - if directIO { - openFlags |= unix.O_DIRECT - } - f, err := os.OpenFile(outPath, openFlags, 0o644) - if err != nil { - return nil, fmt.Errorf("open dedup cache: %w", err) - } - if want := int64(pageDirty.GetCardinality()) * header.PageSize; directIO && want > 0 { - if fErr := unix.Fallocate(int(f.Fd()), 0, 0, want); fErr != nil { - logger.L().Warn(ctx, "fallocate dedup cache; proceeding without preallocation", zap.Error(fErr)) - } - } - - fileOff, err := drainDirtyPages(ctx, int(f.Fd()), src, pageDirty, blockSize) - if err != nil { - return nil, errors.Join(err, f.Close(), os.Remove(outPath)) - } - - if directIO { - if err := f.Truncate(fileOff); err != nil { - return nil, errors.Join(fmt.Errorf("truncate dedup cache: %w", err), f.Close(), os.Remove(outPath)) - } - } - if err := f.Close(); err != nil { - return nil, errors.Join(err, os.Remove(outPath)) - } - - cache, err := NewCache(fileOff, header.PageSize, outPath, false) - if err != nil { - return nil, errors.Join(err, os.Remove(outPath)) - } - cache.setIsCached(0, fileOff) - - return cache, nil -} - -func recordDedupAttrs(ctx context.Context, totalPages, uniquePages, emptyPages int64, compareDur, writeDur time.Duration) { - dedupedPages := totalPages - uniquePages - emptyPages - ratio := 0.0 - if totalPages > 0 { - ratio = float64(dedupedPages) / float64(totalPages) - } - telemetry.SetAttributes(ctx, - attribute.Int64("dedup.total_pages", totalPages), - attribute.Int64("dedup.deduped_pages", dedupedPages), - attribute.Int64("dedup.unique_pages", uniquePages), - attribute.Int64("dedup.empty_pages", emptyPages), - attribute.Float64("dedup.ratio", ratio), - attribute.Int64("dedup.compare_ms", compareDur.Milliseconds()), - attribute.Int64("dedup.write_ms", writeDur.Milliseconds()), - ) -} - 
-// drainDirtyPages packs pageDirty pages from src into fd. Mirrors -// Cache.copyProcessMemory: coalesce contiguous pages into ranges, carve at -// source-block boundaries, pre-split over MAX_RW_COUNT, then drainIovs. -func drainDirtyPages(ctx context.Context, fd int, src func(absOff int64) ([]byte, error), pageDirty *roaring.Bitmap, blockSize int64) (int64, error) { - var ranges []Range - for r := range BitsetRanges(pageDirty, header.PageSize) { - for off := r.Start; off < r.End(); { - blockOff := (off / blockSize) * blockSize - chunkEnd := min(r.End(), blockOff+blockSize) - ranges = append(ranges, Range{Start: off, Size: chunkEnd - off}) - off = chunkEnd - } - } - ranges = splitOversizedRanges(ranges, getAlignedMaxRwCount(header.PageSize)) - - if err := drainIovs(ranges, func(r Range) int64 { return r.Size }, header.PageSize, - func(destOff int64, batch []Range, _ int64) error { - if err := ctx.Err(); err != nil { - return err - } - iovs := make([][]byte, len(batch)) - for i, r := range batch { - blockOff := (r.Start / blockSize) * blockSize - buf, srcErr := src(blockOff) - if srcErr != nil { - return fmt.Errorf("slice src at %d: %w", blockOff, srcErr) - } - iovs[i] = buf[r.Start-blockOff : r.Start-blockOff+r.Size] - } - if err := pwritevAll(fd, destOff, iovs); err != nil { - return fmt.Errorf("pwritev dedup pages: %w", err) - } - - return nil - }); err != nil { - return 0, err - } - - return GetSize(ranges), nil -} - -// Dedup writes pages from c that differ from base, packed at PageSize, to -// outPath. bestEffort skips uncached blocks; directIO uses O_DIRECT. -func (c *Cache) Dedup( - ctx context.Context, - base ReadonlyDevice, - dirty *roaring.Bitmap, - blockSize int64, - outPath string, - bestEffort bool, - directIO bool, -) (*Cache, *header.DiffMetadata, error) { - ctx, span := tracer.Start(ctx, "dedup-pages") - defer span.End() - - // c is packed in BitsetRanges order; map abs offset → packed offset. 
- packed := make(map[int64]int64, dirty.GetCardinality()) - var cum int64 - for r := range BitsetRanges(dirty, blockSize) { - for chunkOff := int64(0); chunkOff < r.Size; chunkOff += blockSize { - packed[r.Start+chunkOff] = cum - cum += blockSize - } - } - src := func(absOff int64) ([]byte, error) { - idx, ok := packed[absOff] - if !ok { - return nil, fmt.Errorf("dedup src: %d not packed", absOff) - } - - return c.Slice(idx, blockSize) - } - - compareStart := time.Now() - plan, err := dedupCompare(ctx, src, base, dirty, blockSize, bestEffort) - if err != nil { - return nil, nil, err - } - compareDur := time.Since(compareStart) - - writeStart := time.Now() - cache, err := dedupDrain(ctx, src, plan.pageDirty, blockSize, outPath, directIO) - if err != nil { - return nil, nil, err - } - recordDedupAttrs(ctx, - plan.exportedSize/header.PageSize, - int64(plan.pageDirty.GetCardinality()), - int64(plan.pageEmpty.GetCardinality()), - compareDur, time.Since(writeStart), - ) - - return cache, &header.DiffMetadata{ - Dirty: plan.pageDirty, - Empty: plan.pageEmpty, - BlockSize: header.PageSize, - }, nil -} - func (c *Cache) ReadAt(b []byte, off int64) (int, error) { c.mu.RLock() defer c.mu.RUnlock() diff --git a/packages/orchestrator/pkg/sandbox/block/cache_test.go b/packages/orchestrator/pkg/sandbox/block/cache_test.go index e72e5a35b7..97da903fa7 100644 --- a/packages/orchestrator/pkg/sandbox/block/cache_test.go +++ b/packages/orchestrator/pkg/sandbox/block/cache_test.go @@ -786,7 +786,7 @@ func runDedup(t *testing.T, srcMem, baseMem []byte, dirty *roaring.Bitmap, block t.Helper() src := buildPackedSrcCache(t, srcMem, dirty, blockSize) - cache, meta, err := src.Dedup(t.Context(), &fakeOriginalDevice{data: baseMem}, dirty, blockSize, t.TempDir()+"/dedup", false, false) + cache, meta, err := src.Dedup(t.Context(), &fakeOriginalDevice{data: baseMem}, dirty, blockSize, t.TempDir()+"/dedup", false, false, DedupBudget{}) require.NoError(t, err) t.Cleanup(func() { _ = cache.Close() 
}) @@ -908,7 +908,7 @@ func TestCacheDedup_ContextCancellation(t *testing.T) { ctx, cancel := context.WithCancel(t.Context()) cancel() - _, _, err = src.Dedup(ctx, &fakeOriginalDevice{data: data}, dirty, blockSize, t.TempDir()+"/dedup", false, false) + _, _, err = src.Dedup(ctx, &fakeOriginalDevice{data: data}, dirty, blockSize, t.TempDir()+"/dedup", false, false, DedupBudget{}) require.ErrorIs(t, err, context.Canceled) } @@ -932,6 +932,7 @@ func TestCacheDedup_OriginalMemfileReadError(t *testing.T) { t.TempDir()+"/dedup", false, false, + DedupBudget{}, ) require.ErrorIs(t, err, sentinel) } @@ -965,7 +966,7 @@ func TestCacheDedup_EmptyParentMappingSkipsBaseReadAt(t *testing.T) { junk := bytes.Repeat([]byte{0xFF}, int(size)) base := &fakeOriginalDevice{data: junk, hdr: hdr} - cache, meta, err := src.Dedup(t.Context(), base, dirty, blockSize, t.TempDir()+"/dedup", false, false) + cache, meta, err := src.Dedup(t.Context(), base, dirty, blockSize, t.TempDir()+"/dedup", false, false, DedupBudget{}) require.NoError(t, err) t.Cleanup(func() { _ = cache.Close() }) @@ -1058,7 +1059,7 @@ func TestCacheDedup_BestEffortUncachedSkipsBaseReadAt(t *testing.T) { src := buildPackedSrcCache(t, srcData, dirty, blockSize) base := &peekingOriginalDevice{fakeOriginalDevice: fakeOriginalDevice{data: baseData}, cached: false} - cache, meta, err := src.Dedup(t.Context(), base, dirty, blockSize, t.TempDir()+"/dedup", true, false) + cache, meta, err := src.Dedup(t.Context(), base, dirty, blockSize, t.TempDir()+"/dedup", true, false, DedupBudget{}) require.NoError(t, err) t.Cleanup(func() { _ = cache.Close() }) @@ -1090,7 +1091,7 @@ func TestCacheDedup_BestEffortCachedMatchesNormalPath(t *testing.T) { src := buildPackedSrcCache(t, srcData, dirty, blockSize) base := &peekingOriginalDevice{fakeOriginalDevice: fakeOriginalDevice{data: baseData}, cached: true} - cache, meta, err := src.Dedup(t.Context(), base, dirty, blockSize, t.TempDir()+"/dedup", true, false) + cache, meta, err := 
src.Dedup(t.Context(), base, dirty, blockSize, t.TempDir()+"/dedup", true, false, DedupBudget{}) require.NoError(t, err) t.Cleanup(func() { _ = cache.Close() }) @@ -1098,6 +1099,40 @@ func TestCacheDedup_BestEffortCachedMatchesNormalPath(t *testing.T) { require.EqualValues(t, 0, meta.Empty.GetCardinality()) } +func TestCacheDedup_FetchRunBudgetPromotesSmallParentRun(t *testing.T) { + t.Parallel() + + pageSize := int64(header.PageSize) + blockSize := 4 * pageSize + size := blockSize + + baseData := make([]byte, size) + _, err := rand.Read(baseData) + require.NoError(t, err) + srcData := make([]byte, size) + copy(srcData, baseData) + srcData[0] ^= 0xFF + srcData[2*pageSize] ^= 0xFF + clear(srcData[3*pageSize : 4*pageSize]) + + dirty := fullDirty(size, blockSize) + src := buildPackedSrcCache(t, srcData, dirty, blockSize) + + cache, meta, err := src.Dedup(t.Context(), &fakeOriginalDevice{data: baseData}, dirty, blockSize, t.TempDir()+"/dedup", false, false, DedupBudget{MaxFetchWindowsPerBlock: 1, MaxPromotedParentPagesPerBlock: 1, FetchRunWindowPages: 4}) + require.NoError(t, err) + t.Cleanup(func() { _ = cache.Close() }) + + require.EqualValues(t, 3, meta.Dirty.GetCardinality()) + require.EqualValues(t, 1, meta.Empty.GetCardinality()) + + for _, i := range []int64{0, 1, 2} { + got := make([]byte, pageSize) + _, err := cache.ReadAt(got, i*pageSize) + require.NoError(t, err) + require.Equal(t, srcData[i*pageSize:(i+1)*pageSize], got, "promoted page %d", i) + } +} + type perPagePeeker struct { fakeOriginalDevice @@ -1132,7 +1167,7 @@ func TestCacheDedup_BestEffortPerPageCacheCheck(t *testing.T) { cachedPages: map[uint32]bool{0: true, 1: false, 2: true, 3: false}, } - cache, meta, err := src.Dedup(t.Context(), base, dirty, blockSize, t.TempDir()+"/dedup", true, false) + cache, meta, err := src.Dedup(t.Context(), base, dirty, blockSize, t.TempDir()+"/dedup", true, false, DedupBudget{}) require.NoError(t, err) t.Cleanup(func() { _ = cache.Close() }) @@ -1140,3 +1175,39 @@ 
func TestCacheDedup_BestEffortPerPageCacheCheck(t *testing.T) { require.True(t, meta.Dirty.Contains(1)) require.True(t, meta.Dirty.Contains(3)) } + +// With no promotion budget, parent pages that match the base stay deduped even +// when the block exceeds MaxFetchWindowsPerBlock: the compaction can't spend +// any promotions, so it must leave the parents out of the diff rather than +// over-promote them into Dirty. +func TestCacheDedup_BudgetExhaustionKeepsParentsDeduped(t *testing.T) { + t.Parallel() + + pageSize := int64(header.PageSize) + blockSize := 4 * pageSize + size := blockSize + + baseData := make([]byte, size) + _, err := rand.Read(baseData) + require.NoError(t, err) + srcData := make([]byte, size) + copy(srcData, baseData) + // Pages 0 and 2 differ (current); pages 1 and 3 match base (parent). + srcData[0] ^= 0xFF + srcData[2*pageSize] ^= 0xFF + + dirty := fullDirty(size, blockSize) + src := buildPackedSrcCache(t, srcData, dirty, blockSize) + + // MaxFetchWindowsPerBlock unsatisfiable, but no promotions allowed. + cache, meta, err := src.Dedup(t.Context(), &fakeOriginalDevice{data: baseData}, dirty, blockSize, t.TempDir()+"/dedup", false, false, DedupBudget{MaxFetchWindowsPerBlock: 0, MaxPromotedParentPagesPerBlock: 0, FetchRunWindowPages: 4}) + require.NoError(t, err) + t.Cleanup(func() { _ = cache.Close() }) + + // Only the two genuinely differing pages are stored; matching parents are + // deduped away (not promoted), and nothing is empty. 
+ require.EqualValues(t, 2, meta.Dirty.GetCardinality()) + require.True(t, meta.Dirty.Contains(0)) + require.True(t, meta.Dirty.Contains(2)) + require.EqualValues(t, 0, meta.Empty.GetCardinality()) +} diff --git a/packages/orchestrator/pkg/sandbox/block/dedup.go b/packages/orchestrator/pkg/sandbox/block/dedup.go new file mode 100644 index 0000000000..db8125a601 --- /dev/null +++ b/packages/orchestrator/pkg/sandbox/block/dedup.go @@ -0,0 +1,566 @@ +//go:build linux + +package block + +import ( + "bytes" + "context" + "errors" + "fmt" + "iter" + "maps" + "os" + "slices" + "time" + + "github.com/RoaringBitmap/roaring/v2" + "github.com/google/uuid" + "go.opentelemetry.io/otel/attribute" + "go.uber.org/zap" + "golang.org/x/sys/unix" + + "github.com/e2b-dev/infra/packages/shared/pkg/logger" + "github.com/e2b-dev/infra/packages/shared/pkg/storage" + "github.com/e2b-dev/infra/packages/shared/pkg/storage/header" + "github.com/e2b-dev/infra/packages/shared/pkg/telemetry" +) + +type dedupPlan struct { + pageDirty *roaring.Bitmap + pageEmpty *roaring.Bitmap + exportedSize int64 + promotedBlocks int64 + promotedPages int64 +} + +type DedupBudget struct { + MaxFetchWindowsPerBlock int + MaxPromotedParentPagesPerBlock int + FetchRunWindowPages int +} + +type dedupPageKind byte + +const ( + dedupPageEmpty dedupPageKind = iota + dedupPageParent + dedupPageCurrent +) + +type fetchSource byte + +const ( + currentFetchSource fetchSource = iota + 1 + parentFetchSource +) + +const ( + defaultDedupFetchWindowPages = storage.DefaultCompressFrameSize / header.PageSize +) + +type dedupFetchKey struct { + sourceType fetchSource + buildID uuid.UUID + window int +} + +type dedupPageInfo struct { + kind dedupPageKind + key dedupFetchKey +} + +// blockReader returns the source bytes for the block at absOff. +type blockReader func(absOff int64) ([]byte, error) + +// dedupConfig holds the immutable inputs of a dedup comparison. 
+// Its methods never modify it; every value that changes while a comparison
+// runs is kept in dedupAccum instead.
+type dedupConfig struct {
+	src        blockReader
+	base       ReadonlyDevice
+	baseHeader *header.Header
+	peeker     CachePeeker
+
+	blockSize  int64
+	bestEffort bool
+	budget     DedupBudget
+}
+
+// dedupAccum carries the running totals of a dedup comparison. compareBlock
+// receives a pointer to it and updates it in place, one block at a time.
+type dedupAccum struct {
+	pageDirty          *roaring.Bitmap
+	pageEmpty          *roaring.Bitmap
+	exportedSize       int64
+	promotedBlocks     int64
+	promotedPages      int64
+	currentStoredPages int64
+}
+
+// dedupCompare walks every dirty block and sorts its pages, one page at a
+// time, into pageDirty or pageEmpty by comparing them against base. The
+// cache check is made per page so that one uncached neighbour cannot poison
+// the cached pages of the same block when the parent header is page-granular.
+//
+// A non-positive budget.FetchRunWindowPages is replaced by
+// defaultDedupFetchWindowPages. The first error returned while comparing a
+// block aborts the walk and is returned unchanged, with a nil plan.
+func dedupCompare(
+	ctx context.Context,
+	src blockReader,
+	base ReadonlyDevice,
+	dirty *roaring.Bitmap,
+	blockSize int64,
+	bestEffort bool,
+	budget DedupBudget,
+) (*dedupPlan, error) {
+	if budget.FetchRunWindowPages <= 0 {
+		budget.FetchRunWindowPages = defaultDedupFetchWindowPages
+	}
+
+	// peeker stays nil when base does not implement CachePeeker.
+	peeker, _ := base.(CachePeeker)
+
+	cfg := dedupConfig{
+		src:        src,
+		base:       base,
+		baseHeader: base.Header(),
+		peeker:     peeker,
+		blockSize:  blockSize,
+		bestEffort: bestEffort,
+		budget:     budget,
+	}
+	acc := dedupAccum{
+		pageDirty: roaring.New(),
+		pageEmpty: roaring.New(),
+	}
+
+	for r := range BitsetRanges(dirty, blockSize) {
+		acc.exportedSize += r.Size
+
+		// Visit the range one block at a time, by absolute offset.
+		rangeEnd := r.Start + r.Size
+		for blockOff := r.Start; blockOff < rangeEnd; blockOff += blockSize {
+			if err := cfg.compareBlock(ctx, blockOff, &acc); err != nil {
+				return nil, err
+			}
+		}
+	}
+
+	plan := &dedupPlan{
+		pageDirty:      acc.pageDirty,
+		pageEmpty:      acc.pageEmpty,
+		exportedSize:   acc.exportedSize,
+		promotedBlocks: acc.promotedBlocks,
+		promotedPages:  acc.promotedPages,
+	}
+
+	return plan, nil
+}
+
+// compareBlock classifies one block and accumulates its results into acc,
+// mutating it in place.
+func (c dedupConfig) compareBlock(ctx context.Context, absOff int64, acc *dedupAccum) error {
+	// Cancellation is checked once per block.
+	if err := ctx.Err(); err != nil {
+		return err
+	}
+
+	srcBuf, err := c.src(absOff)
+	if err != nil {
+		return err
+	}
+
+	pagesPerBlock := int(c.blockSize / header.PageSize)
+	blockPages := make([]dedupPageInfo, pagesPerBlock)
+
+	// First pass: classify every page of the block on its own.
+	for page := range pagesPerBlock {
+		pageStart := int64(page) * header.PageSize
+		srcPage := srcBuf[pageStart : pageStart+header.PageSize]
+
+		info, err := c.classifyPage(ctx, srcPage, absOff+pageStart)
+		if err != nil {
+			return err
+		}
+
+		blockPages[page] = info
+	}
+
+	// Second pass: within the configured budget, turn some parent pages into
+	// current pages. promoteBlockPages rewrites blockPages in place.
+	promoted := c.promoteBlockPages(blockPages, acc.currentStoredPages)
+	if promoted > 0 {
+		acc.promotedBlocks++
+		acc.promotedPages += int64(promoted)
+	}
+
+	// Finally record the (possibly promoted) classification in the bitmaps and
+	// advance the count of current pages stored so far.
+	acc.currentStoredPages += recordBlockPages(absOff, blockPages, acc.pageDirty, acc.pageEmpty)
+
+	return nil
+}
+
+// classifyPage decides how the source page at absolute offset pageOff is
+// represented in the diff. The checks run in this order:
+//
+//   - an all-zero page is dedupPageEmpty (the zero dedupPageInfo);
+//   - a page whose base mapping has a nil build ID and spans at least one
+//     page is dedupPageCurrent, without reading base;
+//   - in best-effort mode, a page that base reports as not cached is
+//     dedupPageCurrent, without reading base;
+//   - a page whose bytes differ from base is dedupPageCurrent;
+//   - a page equal to base is dedupPageParent, tagged with the fetch key
+//     (build ID and window) it would be read from.
+//
+// A nil baseHeader, or a failed mapping lookup, means "no mapping": the page
+// is then compared against base directly and its window is derived from
+// pageOff. The nil check mirrors the guard the pre-refactor dedupCompare had
+// around GetShiftedMapping.
+//
+// budget.FetchRunWindowPages is used as a divisor for parent pages and must be
+// positive; dedupCompare replaces non-positive values with the default before
+// building the config.
+//
+// An error is returned only when reading the page from base fails.
+func (c dedupConfig) classifyPage(ctx context.Context, srcPage []byte, pageOff int64) (dedupPageInfo, error) {
+	if header.IsZero(srcPage) {
+		return dedupPageInfo{}, nil
+	}
+
+	var (
+		hasMapping    bool
+		mappedBuildID uuid.UUID
+		mappedOffset  uint64
+	)
+	// Only consult the header when there is one; calling through a nil
+	// *header.Header is what the original guard protected against.
+	if c.baseHeader != nil {
+		if mapped, err := c.baseHeader.GetShiftedMapping(ctx, pageOff); err == nil {
+			if mapped.BuildId == uuid.Nil && int64(mapped.Length) >= header.PageSize {
+				return dedupPageInfo{kind: dedupPageCurrent}, nil
+			}
+
+			hasMapping = true
+			mappedBuildID = mapped.BuildId
+			mappedOffset = mapped.Offset
+		}
+	}
+
+	if c.skipUncachedPage(ctx, pageOff) {
+		return dedupPageInfo{kind: dedupPageCurrent}, nil
+	}
+
+	basePage, err := c.base.Slice(ctx, pageOff, header.PageSize)
+	if err != nil {
+		return dedupPageInfo{}, fmt.Errorf("slice base at %d: %w", pageOff, err)
+	}
+
+	if !bytes.Equal(srcPage, basePage) {
+		return dedupPageInfo{kind: dedupPageCurrent}, nil
+	}
+
+	// The page matches base. Record which window it would be fetched from:
+	// the mapped offset when a mapping exists, the page's own offset
+	// otherwise.
+	windowBytes := c.budget.FetchRunWindowPages * header.PageSize
+	key := dedupFetchKey{sourceType: parentFetchSource}
+	if hasMapping {
+		key.buildID = mappedBuildID
+		key.window = int(mappedOffset / uint64(windowBytes))
+	} else {
+		key.window = int(pageOff / int64(windowBytes))
+	}
+
+	return dedupPageInfo{kind: dedupPageParent, key: key}, nil
+}
+
+// skipUncachedPage reports whether the base read for the page at pageOff
+// should be skipped: only in best-effort mode, only when base exposes a
+// CachePeeker, and only when that peeker says the page is not cached.
+func (c dedupConfig) skipUncachedPage(ctx context.Context, pageOff int64) bool {
+	return c.bestEffort && c.peeker != nil && !c.peeker.IsCached(ctx, pageOff, header.PageSize)
+}
+
+// promoteBlockPages runs the fetch-window compaction over one block's pages
+// using the configured budget and returns how many parent pages it promoted to
+// current. blockPages is modified in place. currentStoredPages is the number
+// of current pages recorded before this block.
+func (c dedupConfig) promoteBlockPages(blockPages []dedupPageInfo, currentStoredPages int64) int {
+	w := fetchWindower{
+		windowPages:  c.budget.FetchRunWindowPages,
+		currentStart: currentStoredPages,
+	}
+
+	return w.compact(blockPages, c.budget.MaxFetchWindowsPerBlock, c.budget.MaxPromotedParentPagesPerBlock)
+}
+
+// recordBlockPages writes this block's classified pages into the diff bitmaps
+// and returns how many current pages were stored. Empty pages go to pageEmpty,
+// current pages to pageDirty; parent pages are added to neither bitmap.
+func recordBlockPages(absOff int64, blockPages []dedupPageInfo, pageDirty, pageEmpty *roaring.Bitmap) int64 {
+	var storedPages int64
+	for page, info := range blockPages {
+		pageIdx := uint32(absOff/header.PageSize) + uint32(page)
+		switch info.kind {
+		case dedupPageEmpty:
+			pageEmpty.Add(pageIdx)
+		case dedupPageCurrent:
+			pageDirty.Add(pageIdx)
+			storedPages++
+		}
+	}
+
+	return storedPages
+}
+
+// fetchWindower groups pages into fetch-run windows. windowPages and
+// currentStart are invariant for the lifetime of a compact pass, so they live
+// on the receiver instead of being threaded through every call.
+type fetchWindower struct {
+	windowPages  int
+	currentStart int64
+}
+
+// compact promotes parent pages to current until the block fits within
+// maxWindows fetch windows or the promotion budget is exhausted.
+func (w fetchWindower) compact(pages []dedupPageInfo, maxWindows, maxPromoted int) int {
+	// A non-positive limit on either side disables promotion entirely.
+	if maxWindows <= 0 || maxPromoted <= 0 {
+		return 0
+	}
+
+	promoted := 0
+	for promoted < maxPromoted {
+		if w.count(pages) <= maxWindows {
+			break
+		}
+
+		chosen := w.bestParentRun(pages, maxPromoted-promoted)
+		if len(chosen) == 0 {
+			// Nothing left whose promotion would remove a window.
+			break
+		}
+
+		for _, idx := range chosen {
+			pages[idx] = dedupPageInfo{kind: dedupPageCurrent}
+		}
+		promoted += len(chosen)
+	}
+
+	return promoted
+}
+
+// count returns the number of distinct fetch windows the pages touch. Each
+// parent page contributes its own key. Current pages are numbered
+// consecutively from currentStart and contribute one key per windowPages-sized
+// group of those numbers. Empty pages contribute nothing.
+func (w fetchWindower) count(pages []dedupPageInfo) int {
+	seen := make(map[dedupFetchKey]struct{})
+	nextCurrent := w.currentStart
+	for i := range pages {
+		switch pages[i].kind {
+		case dedupPageParent:
+			seen[pages[i].key] = struct{}{}
+		case dedupPageCurrent:
+			key := dedupFetchKey{
+				sourceType: currentFetchSource,
+				window:     int(nextCurrent) / w.windowPages,
+			}
+			seen[key] = struct{}{}
+			nextCurrent++
+		}
+	}
+
+	return len(seen)
+}
+
+// bestParentRun picks the parent page indices to promote next: the candidate
+// that removes the most fetch windows per promoted page. Contiguous
+// parent/empty runs are tried first; grouping by fetch key is the fallback
+// when no run helps.
+func (w fetchWindower) bestParentRun(pages []dedupPageInfo, budget int) []int {
+	baseline := w.count(pages)
+
+	best := w.bestByRatio(pages, budget, baseline, parentRuns(pages))
+	if best == nil {
+		best = w.bestByRatio(pages, budget, baseline, parentKeyGroups(pages))
+	}
+
+	return best
+}
+
+// bestByRatio returns, among candidates, the page set with the highest
+// windows-removed per page-promoted ratio, or nil when no candidate removes a
+// window. before is the window count prior to any promotion. On equal ratios
+// the earlier candidate is kept.
+func (w fetchWindower) bestByRatio(pages []dedupPageInfo, budget, before int, candidates iter.Seq[[]int]) []int {
+	var (
+		winner        []int
+		winnerBenefit int
+		winnerCost    int
+	)
+	for cand := range candidates {
+		// A candidate larger than the budget is cut down to an affordable
+		// prefix; the benefit test below drops prefixes that remove nothing.
+		if len(cand) > budget {
+			cand = cand[:budget]
+		}
+		if len(cand) == 0 {
+			continue
+		}
+
+		gain := before - w.countAfter(pages, cand)
+		if gain <= 0 {
+			continue
+		}
+
+		// Compare gain/len(cand) with winnerBenefit/winnerCost by
+		// cross-multiplying, which avoids any division.
+		if winner == nil || len(cand)*winnerBenefit < winnerCost*gain {
+			winner, winnerBenefit, winnerCost = cand, gain, len(cand)
+		}
+	}
+
+	return winner
+}
+
+// parentRuns yields, for each maximal stretch of parent and empty pages, the
+// indices of the parent pages inside it. Only a parent page opens a run; empty
+// pages are skipped without ending it; a current page or the end of the slice
+// closes it.
+func parentRuns(pages []dedupPageInfo) iter.Seq[[]int] {
+	return func(yield func([]int) bool) {
+		var run []int
+		for i := range pages {
+			kind := pages[i].kind
+			if kind == dedupPageParent {
+				run = append(run, i)
+
+				continue
+			}
+			if kind == dedupPageEmpty {
+				// Empties neither start nor end a run.
+				continue
+			}
+
+			// Any other kind (dedupPageCurrent) closes the run in progress.
+			if len(run) > 0 && !yield(run) {
+				return
+			}
+			run = nil
+		}
+
+		if len(run) > 0 {
+			yield(run)
+		}
+	}
+}
+
+// parentKeyGroups yields the parent page indices bucketed by fetch key, so
+// that non-adjacent parents sharing a fetch window can be promoted as one
+// candidate. Buckets are sorted by their first page index, which makes the
+// order independent of map iteration.
+func parentKeyGroups(pages []dedupPageInfo) iter.Seq[[]int] {
+	byKey := make(map[dedupFetchKey][]int)
+	for i := range pages {
+		if pages[i].kind != dedupPageParent {
+			continue
+		}
+		k := pages[i].key
+		byKey[k] = append(byKey[k], i)
+	}
+
+	groups := slices.Collect(maps.Values(byKey))
+	slices.SortFunc(groups, func(a, b []int) int {
+		return a[0] - b[0]
+	})
+
+	return slices.Values(groups)
+}
+
+// countAfter counts fetch windows as if the given parent indices were promoted
+// to current, without mutating pages.
+func (w fetchWindower) countAfter(pages []dedupPageInfo, promote []int) int {
+	// Work on a copy so the caller's slice keeps its classification.
+	candidate := slices.Clone(pages)
+	for _, i := range promote {
+		candidate[i].kind = dedupPageCurrent
+		candidate[i].key = dedupFetchKey{}
+	}
+
+	return w.count(candidate)
+}
+
+// dedupDrain writes pageDirty pages from src to outPath packed at PageSize.
+//
+// The file is opened read-write and created if missing, with O_DIRECT added
+// when directIO is set. On success the file is closed and reopened through
+// NewCache, and the returned cache has the written range marked via
+// setIsCached. On any failure after the file was opened, outPath is removed
+// and the close/remove errors are joined onto the primary error.
+func dedupDrain(
+	ctx context.Context,
+	src blockReader,
+	pageDirty *roaring.Bitmap,
+	blockSize int64,
+	outPath string,
+	directIO bool,
+) (*Cache, error) {
+	openFlags := os.O_RDWR | os.O_CREATE
+	if directIO {
+		openFlags |= unix.O_DIRECT
+	}
+	f, err := os.OpenFile(outPath, openFlags, 0o644)
+	if err != nil {
+		return nil, fmt.Errorf("open dedup cache: %w", err)
+	}
+	// Preallocation is attempted only for direct I/O and is best-effort: a
+	// failure is logged and the write proceeds without it.
+	if want := int64(pageDirty.GetCardinality()) * header.PageSize; directIO && want > 0 {
+		if fErr := unix.Fallocate(int(f.Fd()), 0, 0, want); fErr != nil {
+			logger.L().Warn(ctx, "fallocate dedup cache; proceeding without preallocation", zap.Error(fErr))
+		}
+	}
+
+	fileOff, err := drainDirtyPages(ctx, int(f.Fd()), src, pageDirty, blockSize)
+	if err != nil {
+		return nil, errors.Join(err, f.Close(), os.Remove(outPath))
+	}
+
+	// NOTE(review): presumably trims the file to the bytes actually written,
+	// in case the preallocated size differs; confirm intent.
+	if directIO {
+		if err := f.Truncate(fileOff); err != nil {
+			return nil, errors.Join(fmt.Errorf("truncate dedup cache: %w", err), f.Close(), os.Remove(outPath))
+		}
+	}
+	if err := f.Close(); err != nil {
+		return nil, errors.Join(err, os.Remove(outPath))
+	}
+
+	cache, err := NewCache(fileOff, header.PageSize, outPath, false)
+	if err != nil {
+		return nil, errors.Join(err, os.Remove(outPath))
+	}
+	cache.setIsCached(0, fileOff)
+
+	return cache, nil
+}
+
+// recordDedupAttrs attaches the dedup page counters and timings to the
+// telemetry of ctx. Deduped pages are derived as total minus unique minus
+// empty; the ratio is deduped/total and is reported as 0 when totalPages is
+// not positive.
+func recordDedupAttrs(ctx context.Context, totalPages, uniquePages, emptyPages, promotedBlocks, promotedPages int64, compareDur, writeDur time.Duration) {
+	dedupedPages := totalPages - uniquePages - emptyPages
+	ratio := 0.0
+	if totalPages > 0 {
+		ratio = float64(dedupedPages) / float64(totalPages)
+	}
+	telemetry.SetAttributes(ctx,
+		attribute.Int64("dedup.total_pages", totalPages),
+		attribute.Int64("dedup.deduped_pages", dedupedPages),
+		attribute.Int64("dedup.unique_pages", uniquePages),
+		attribute.Int64("dedup.empty_pages", emptyPages),
+		attribute.Int64("dedup.promoted_blocks", promotedBlocks),
+		attribute.Int64("dedup.promoted_pages", promotedPages),
+		attribute.Float64("dedup.ratio", ratio),
+		attribute.Int64("dedup.compare_ms", compareDur.Milliseconds()),
+		attribute.Int64("dedup.write_ms", writeDur.Milliseconds()),
+	)
+}
+
+// drainDirtyPages packs pageDirty pages from src into fd. Mirrors
+// Cache.copyProcessMemory: coalesce contiguous pages into ranges, carve at
+// source-block boundaries, pre-split over MAX_RW_COUNT, then drainIovs.
+//
+// It returns GetSize of the final range list (presumably the total number of
+// bytes covered; confirm against GetSize), or 0 and the error when the drain
+// fails.
+func drainDirtyPages(ctx context.Context, fd int, src blockReader, pageDirty *roaring.Bitmap, blockSize int64) (int64, error) {
+	var ranges []Range
+	for r := range BitsetRanges(pageDirty, header.PageSize) {
+		// Cut each page run at every multiple of blockSize so that a single
+		// range never spans two source blocks.
+		for off := r.Start; off < r.End(); {
+			blockOff := (off / blockSize) * blockSize
+			chunkEnd := min(r.End(), blockOff+blockSize)
+			ranges = append(ranges, Range{Start: off, Size: chunkEnd - off})
+			off = chunkEnd
+		}
+	}
+	ranges = splitOversizedRanges(ranges, getAlignedMaxRwCount(header.PageSize))
+
+	if err := drainIovs(ranges, func(r Range) int64 { return r.Size }, header.PageSize,
+		func(destOff int64, batch []Range, _ int64) error {
+			// Cancellation is checked once per batch.
+			if err := ctx.Err(); err != nil {
+				return err
+			}
+			iovs := make([][]byte, len(batch))
+			for i, r := range batch {
+				// Fetch the source block containing the range, then slice
+				// the range out of it by its offset within that block.
+				blockOff := (r.Start / blockSize) * blockSize
+				buf, srcErr := src(blockOff)
+				if srcErr != nil {
+					return fmt.Errorf("slice src at %d: %w", blockOff, srcErr)
+				}
+				iovs[i] = buf[r.Start-blockOff : r.Start-blockOff+r.Size]
+			}
+			if err := pwritevAll(fd, destOff, iovs); err != nil {
+				return fmt.Errorf("pwritev dedup pages: %w", err)
+			}
+
+			return nil
+		}); err != nil {
+		return 0, err
+	}
+
+	return GetSize(ranges), nil
+}
+
+// Dedup writes pages from c that differ from base, packed at PageSize, to
+// outPath.
bestEffort skips uncached blocks; directIO uses O_DIRECT. +func (c *Cache) Dedup( + ctx context.Context, + base ReadonlyDevice, + dirty *roaring.Bitmap, + blockSize int64, + outPath string, + bestEffort bool, + directIO bool, + budget DedupBudget, +) (*Cache, *header.DiffMetadata, error) { + ctx, span := tracer.Start(ctx, "dedup-pages") + defer span.End() + + // c is packed in BitsetRanges order; map abs offset → packed offset. + packed := make(map[int64]int64, dirty.GetCardinality()) + var cum int64 + for r := range BitsetRanges(dirty, blockSize) { + for chunkOff := int64(0); chunkOff < r.Size; chunkOff += blockSize { + packed[r.Start+chunkOff] = cum + cum += blockSize + } + } + src := func(absOff int64) ([]byte, error) { + idx, ok := packed[absOff] + if !ok { + return nil, fmt.Errorf("dedup src: %d not packed", absOff) + } + + return c.Slice(idx, blockSize) + } + + compareStart := time.Now() + plan, err := dedupCompare(ctx, src, base, dirty, blockSize, bestEffort, budget) + if err != nil { + return nil, nil, err + } + compareDur := time.Since(compareStart) + + writeStart := time.Now() + cache, err := dedupDrain(ctx, src, plan.pageDirty, blockSize, outPath, directIO) + if err != nil { + return nil, nil, err + } + recordDedupAttrs(ctx, + plan.exportedSize/header.PageSize, + int64(plan.pageDirty.GetCardinality()), + int64(plan.pageEmpty.GetCardinality()), + plan.promotedBlocks, + plan.promotedPages, + compareDur, time.Since(writeStart), + ) + + return cache, &header.DiffMetadata{ + Dirty: plan.pageDirty, + Empty: plan.pageEmpty, + BlockSize: header.PageSize, + }, nil +} diff --git a/packages/orchestrator/pkg/sandbox/block/dedup_test.go b/packages/orchestrator/pkg/sandbox/block/dedup_test.go new file mode 100644 index 0000000000..fe75b33e57 --- /dev/null +++ b/packages/orchestrator/pkg/sandbox/block/dedup_test.go @@ -0,0 +1,606 @@ +//go:build linux + +package block + +import ( + "bytes" + "slices" + "testing" + + "github.com/RoaringBitmap/roaring/v2" + 
"github.com/google/uuid" + "github.com/stretchr/testify/require" + + "github.com/e2b-dev/infra/packages/shared/pkg/storage/header" +) + +func parentPage() dedupPageInfo { return dedupPageInfo{kind: dedupPageParent} } +func currentPage() dedupPageInfo { return dedupPageInfo{kind: dedupPageCurrent} } +func emptyPage() dedupPageInfo { return dedupPageInfo{kind: dedupPageEmpty} } + +func collectRuns(seq func(func([]int) bool)) [][]int { + var runs [][]int + for r := range seq { + runs = append(runs, slices.Clone(r)) + } + + return runs +} + +func TestParentRuns(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + pages []dedupPageInfo + want [][]int + }{ + { + name: "empty input", + pages: nil, + want: nil, + }, + { + name: "no parents", + pages: []dedupPageInfo{currentPage(), emptyPage(), currentPage()}, + want: nil, + }, + { + name: "single parent", + pages: []dedupPageInfo{parentPage()}, + want: [][]int{{0}}, + }, + { + name: "contiguous parents are one run", + pages: []dedupPageInfo{parentPage(), parentPage(), parentPage()}, + want: [][]int{{0, 1, 2}}, + }, + { + name: "current page splits runs", + pages: []dedupPageInfo{parentPage(), currentPage(), parentPage()}, + want: [][]int{{0}, {2}}, + }, + { + // Empty pages are virtual (never stored, no fetch window) so they + // keep surrounding parents in the same run. 
+ name: "empty page bridges parents into one run", + pages: []dedupPageInfo{parentPage(), emptyPage(), parentPage()}, + want: [][]int{{0, 2}}, + }, + { + name: "leading empty does not start a run", + pages: []dedupPageInfo{emptyPage(), parentPage()}, + want: [][]int{{1}}, + }, + { + name: "trailing empty is dropped from run", + pages: []dedupPageInfo{parentPage(), emptyPage()}, + want: [][]int{{0}}, + }, + { + name: "mixed runs separated by current", + pages: []dedupPageInfo{ + parentPage(), emptyPage(), parentPage(), // run {0,2} + currentPage(), + parentPage(), // run {4} + }, + want: [][]int{{0, 2}, {4}}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + got := collectRuns(parentRuns(tt.pages)) + require.Equal(t, tt.want, got) + }) + } +} + +func TestParentRuns_EarlyStopOnYieldFalse(t *testing.T) { + t.Parallel() + + pages := []dedupPageInfo{ + parentPage(), currentPage(), parentPage(), currentPage(), parentPage(), + } + + var seen [][]int + for r := range parentRuns(pages) { + seen = append(seen, slices.Clone(r)) + + break // consumer stops after the first run + } + + require.Equal(t, [][]int{{0}}, seen) +} + +// compareBlock accumulates a block's results into the accumulator in place. +func TestCompareBlock_AccumulatesInPlace(t *testing.T) { + t.Parallel() + + pageSize := int64(header.PageSize) + blockSize := 4 * pageSize + + // An all-zero source block makes every page classify as empty via the + // header.IsZero fast path, so no base device read is needed. 
+ srcBlock := make([]byte, blockSize) + + cfg := dedupConfig{ + src: func(int64) ([]byte, error) { return srcBlock, nil }, + blockSize: blockSize, + budget: DedupBudget{FetchRunWindowPages: 4}, + } + + acc := dedupAccum{ + pageDirty: roaring.New(), + pageEmpty: roaring.New(), + exportedSize: 7, // pre-existing value compareBlock must not touch + } + + err := cfg.compareBlock(t.Context(), 0, &acc) + require.NoError(t, err) + + // The four all-zero pages land in pageEmpty; nothing is stored as current. + require.EqualValues(t, 4, acc.pageEmpty.GetCardinality()) + require.EqualValues(t, 0, acc.pageDirty.GetCardinality()) + require.EqualValues(t, 0, acc.currentStoredPages, "all-empty block stores no current pages") + require.EqualValues(t, 0, acc.promotedBlocks) + require.EqualValues(t, 7, acc.exportedSize, "compareBlock does not touch exportedSize") +} + +// compareBlock accumulates across multiple calls into the same accumulator. +func TestCompareBlock_AccumulatesAcrossCalls(t *testing.T) { + t.Parallel() + + pageSize := int64(header.PageSize) + blockSize := 4 * pageSize + srcBlock := make([]byte, blockSize) + + cfg := dedupConfig{ + src: func(int64) ([]byte, error) { return srcBlock, nil }, + blockSize: blockSize, + budget: DedupBudget{FetchRunWindowPages: 4}, + } + + acc := dedupAccum{pageDirty: roaring.New(), pageEmpty: roaring.New()} + + require.NoError(t, cfg.compareBlock(t.Context(), 0, &acc)) + require.NoError(t, cfg.compareBlock(t.Context(), blockSize, &acc)) + + // Two all-zero blocks => 8 distinct empty page indices accumulated. 
+ require.EqualValues(t, 8, acc.pageEmpty.GetCardinality()) +} + +func TestParentKeyGroups(t *testing.T) { + t.Parallel() + + keyA := dedupFetchKey{sourceType: parentFetchSource, window: 1} + keyB := dedupFetchKey{sourceType: parentFetchSource, window: 2} + + pages := []dedupPageInfo{ + {kind: dedupPageParent, key: keyA}, + {kind: dedupPageCurrent}, + {kind: dedupPageParent, key: keyB}, + {kind: dedupPageParent, key: keyA}, + {kind: dedupPageEmpty}, + } + + // Groups are emitted ordered by their first page index, so the order is + // stable across runs despite the underlying map. keyA's first index (0) + // precedes keyB's (2). + got := slices.Collect(parentKeyGroups(pages)) + require.Equal(t, [][]int{{0, 3}, {2}}, got) +} + +// parentKeyPage returns a parent page whose fetch window is derived from +// byteOff, so count() groups same-window parents together. +func parentKeyPage(buildID uuid.UUID, byteOff int64, windowPages int) dedupPageInfo { + return dedupPageInfo{ + kind: dedupPageParent, + key: dedupFetchKey{ + sourceType: parentFetchSource, + buildID: buildID, + window: int(byteOff / (int64(windowPages) * header.PageSize)), + }, + } +} + +func TestFetchWindowerCount(t *testing.T) { + t.Parallel() + + windowPages := 4 + build := uuid.New() + w := fetchWindower{windowPages: windowPages, currentStart: 0} + + t.Run("empty pages contribute no windows", func(t *testing.T) { + t.Parallel() + + require.Equal(t, 0, w.count([]dedupPageInfo{emptyPage(), emptyPage()})) + }) + + t.Run("parents in same window collapse to one", func(t *testing.T) { + t.Parallel() + + pages := []dedupPageInfo{ + parentKeyPage(build, 0, windowPages), + parentKeyPage(build, header.PageSize, windowPages), + } + require.Equal(t, 1, w.count(pages)) + }) + + t.Run("parents in different windows count separately", func(t *testing.T) { + t.Parallel() + + pages := []dedupPageInfo{ + parentKeyPage(build, 0, windowPages), + parentKeyPage(build, int64(windowPages)*header.PageSize, windowPages), + } + 
require.Equal(t, 2, w.count(pages)) + }) +} + +func TestFetchWindowerBestParentRun(t *testing.T) { + t.Parallel() + + windowPages := 4 + build := uuid.New() + + t.Run("no parents yields nothing", func(t *testing.T) { + t.Parallel() + + w := fetchWindower{windowPages: windowPages, currentStart: 0} + pages := []dedupPageInfo{currentPage(), currentPage()} + require.Nil(t, w.bestParentRun(pages, 4)) + }) + + t.Run("budget too small to remove a window yields nothing", func(t *testing.T) { + t.Parallel() + + w := fetchWindower{windowPages: windowPages, currentStart: 0} + // Two parents in distinct windows; promoting one of them costs 1 but the + // other still needs its window, and the promoted page joins a current + // window, so a single promotion does not reduce the total. + pages := []dedupPageInfo{ + parentKeyPage(build, 0, windowPages), + parentKeyPage(build, int64(windowPages)*header.PageSize, windowPages), + } + require.Nil(t, w.bestParentRun(pages, 0)) + }) + + t.Run("falls back to key grouping for non-adjacent parents", func(t *testing.T) { + t.Parallel() + + w := fetchWindower{windowPages: windowPages, currentStart: 0} + // Two parents sharing one window but separated by a current page: no + // contiguous run covers both, so the key-group fallback selects them. 
+ sharedOff0 := int64(0) + sharedOff1 := int64(header.PageSize) + pages := []dedupPageInfo{ + parentKeyPage(build, sharedOff0, windowPages), + currentPage(), + parentKeyPage(build, sharedOff1, windowPages), + } + + got := w.bestParentRun(pages, 4) + require.Equal(t, []int{0, 2}, got) + }) +} + +func TestRecordBlockPages(t *testing.T) { + t.Parallel() + + pageSize := int64(header.PageSize) + + tests := []struct { + name string + absOff int64 + pages []dedupPageInfo + wantDirty []uint32 + wantEmpty []uint32 + wantStored int64 + }{ + { + name: "empty input writes nothing", + absOff: 0, + pages: nil, + wantDirty: nil, + wantEmpty: nil, + wantStored: 0, + }, + { + name: "mixed kinds route to the right bitmap", + absOff: 0, + pages: []dedupPageInfo{ + emptyPage(), // page 0 -> empty + currentPage(), // page 1 -> dirty + parentPage(), // page 2 -> neither (deduped) + currentPage(), // page 3 -> dirty + }, + wantDirty: []uint32{1, 3}, + wantEmpty: []uint32{0}, + wantStored: 2, + }, + { + name: "absOff offsets the page index", + absOff: 4 * pageSize, // base page index 4 + pages: []dedupPageInfo{ + currentPage(), // -> index 4 + emptyPage(), // -> index 5 + }, + wantDirty: []uint32{4}, + wantEmpty: []uint32{5}, + wantStored: 1, + }, + { + name: "all parents store nothing", + absOff: 0, + pages: []dedupPageInfo{parentPage(), parentPage()}, + // parents go into neither bitmap and are not counted. 
+ wantStored: 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + dirty := roaring.New() + empty := roaring.New() + + stored := recordBlockPages(tt.absOff, tt.pages, dirty, empty) + + require.Equal(t, tt.wantStored, stored) + require.ElementsMatch(t, tt.wantDirty, dirty.ToArray()) + require.ElementsMatch(t, tt.wantEmpty, empty.ToArray()) + }) + } +} + +func TestFetchWindowerCompact(t *testing.T) { + t.Parallel() + + windowPages := 4 + build := uuid.New() + + t.Run("non-positive maxWindows promotes nothing", func(t *testing.T) { + t.Parallel() + + w := fetchWindower{windowPages: windowPages, currentStart: 0} + pages := []dedupPageInfo{parentKeyPage(build, 0, windowPages)} + require.Equal(t, 0, w.compact(pages, 0, 4)) + require.Equal(t, dedupPageParent, pages[0].kind, "page must stay parent") + }) + + t.Run("non-positive maxPromoted promotes nothing", func(t *testing.T) { + t.Parallel() + + w := fetchWindower{windowPages: windowPages, currentStart: 0} + pages := []dedupPageInfo{parentKeyPage(build, 0, windowPages)} + require.Equal(t, 0, w.compact(pages, 1, 0)) + require.Equal(t, dedupPageParent, pages[0].kind) + }) + + t.Run("already within window budget promotes nothing", func(t *testing.T) { + t.Parallel() + + w := fetchWindower{windowPages: windowPages, currentStart: 0} + // A single parent window already satisfies maxWindows=1. + pages := []dedupPageInfo{ + parentKeyPage(build, 0, windowPages), + parentKeyPage(build, header.PageSize, windowPages), + } + require.Equal(t, 0, w.compact(pages, 1, 4)) + require.Equal(t, dedupPageParent, pages[0].kind) + require.Equal(t, dedupPageParent, pages[1].kind) + }) + + t.Run("promotes a parent run to meet the window budget", func(t *testing.T) { + t.Parallel() + + w := fetchWindower{windowPages: windowPages, currentStart: 0} + // Two current pages sharing one current window plus one parent in its + // own window: two windows total, over maxWindows=1.
Promoting the lone parent removes its + // parent window by folding it into a current window. + pages := []dedupPageInfo{ + currentPage(), + parentKeyPage(build, 0, windowPages), + currentPage(), + } + + promoted := w.compact(pages, 1, 4) + require.Positive(t, promoted) + require.Equal(t, dedupPageCurrent, pages[1].kind, "parent must be promoted") + require.Equal(t, dedupFetchKey{}, pages[1].key, "promoted key is cleared") + }) +} + +func TestFetchWindowerCountAfter(t *testing.T) { + t.Parallel() + + windowPages := 4 + build := uuid.New() + w := fetchWindower{windowPages: windowPages, currentStart: 0} + + // One parent window plus one current window => two windows. + pages := []dedupPageInfo{ + parentKeyPage(build, 0, windowPages), + currentPage(), + } + require.Equal(t, 2, w.count(pages)) + + // Promoting the parent folds it into the current window: one window left. + require.Equal(t, 1, w.countAfter(pages, []int{0})) + + // countAfter must not mutate the input slice. + require.Equal(t, dedupPageParent, pages[0].kind, "input page kind unchanged") + require.Equal(t, + dedupFetchKey{sourceType: parentFetchSource, buildID: build, window: 0}, + pages[0].key, + "input page key unchanged", + ) +} + +func TestFetchWindowerBestByRatio(t *testing.T) { + t.Parallel() + + windowPages := 4 + build := uuid.New() + w := fetchWindower{windowPages: windowPages, currentStart: 0} + + // current, parent (window 0), current: the parent is its own fetch window, + // so promoting it into the surrounding current window removes one window. 
+ pages := []dedupPageInfo{ + currentPage(), + parentKeyPage(build, 0, windowPages), + currentPage(), + } + before := w.count(pages) + require.Equal(t, 2, before) + + t.Run("candidate over budget is skipped", func(t *testing.T) { + t.Parallel() + + got := w.bestByRatio(pages, 0, before, slices.Values([][]int{{1}})) + require.Nil(t, got, "cost 1 exceeds budget 0") + }) + + t.Run("beneficial candidate within budget is selected", func(t *testing.T) { + t.Parallel() + + got := w.bestByRatio(pages, windowPages, before, slices.Values([][]int{{1}})) + require.Equal(t, []int{1}, got) + }) + + t.Run("zero-benefit candidate is skipped", func(t *testing.T) { + t.Parallel() + + // Index 0 is already current; promoting it removes no fetch window. + got := w.bestByRatio(pages, windowPages, before, slices.Values([][]int{{0}})) + require.Nil(t, got) + }) + + t.Run("over-budget run is clamped to an affordable, beneficial prefix", func(t *testing.T) { + t.Parallel() + + // Three parent pages in three distinct single-page windows: the whole + // run costs 3 but budget is 2. Clamping to the first two folds them into + // one current window, leaving one parent window (3 -> 2 windows). + run := []dedupPageInfo{ + parentKeyPage(build, 0, windowPages), + parentKeyPage(build, int64(windowPages)*header.PageSize, windowPages), + parentKeyPage(build, 2*int64(windowPages)*header.PageSize, windowPages), + } + got := w.bestByRatio(run, 2, w.count(run), slices.Values([][]int{{0, 1, 2}})) + require.Equal(t, []int{0, 1}, got) + }) +} + +// classifyPage maps a source page to empty/parent/current based on the parent +// header mapping, the cache-peek (best-effort) check, and the base bytes. 
+func TestClassifyPage(t *testing.T) { + t.Parallel() + + const windowPages = 4 + pageSize := int64(header.PageSize) + build := uuid.New() + + zeroPage := make([]byte, header.PageSize) + nonZero := bytes.Repeat([]byte{0xAB}, header.PageSize) + budget := DedupBudget{FetchRunWindowPages: windowPages} + + t.Run("zero source page is empty without reading base", func(t *testing.T) { + t.Parallel() + + base := &fakeOriginalDevice{data: make([]byte, pageSize)} + cfg := dedupConfig{base: base, baseHeader: base.Header(), blockSize: pageSize, budget: budget} + + info, err := cfg.classifyPage(t.Context(), zeroPage, 0) + require.NoError(t, err) + require.Equal(t, dedupPageEmpty, info.kind) + require.Zero(t, base.reads, "zero fast path must not read base") + }) + + t.Run("nil-build parent hole is current without reading base", func(t *testing.T) { + t.Parallel() + + hdr, err := header.NewHeader( + header.NewTemplateMetadata(uuid.Nil, uint64(pageSize), uint64(pageSize)), + []header.BuildMap{{Offset: 0, Length: uint64(pageSize), BuildId: uuid.Nil}}, + ) + require.NoError(t, err) + base := &fakeOriginalDevice{data: bytes.Repeat([]byte{0xFF}, int(pageSize)), hdr: hdr} + cfg := dedupConfig{base: base, baseHeader: hdr, blockSize: pageSize, budget: budget} + + info, err := cfg.classifyPage(t.Context(), nonZero, 0) + require.NoError(t, err) + require.Equal(t, dedupPageCurrent, info.kind) + require.Zero(t, base.reads, "nil-build hole must not read base") + }) + + t.Run("page matching mapped parent is keyed by build and storage window", func(t *testing.T) { + t.Parallel() + + hdr, err := header.NewHeader( + header.NewTemplateMetadata(build, uint64(pageSize), uint64(pageSize)), + []header.BuildMap{{Offset: 0, Length: uint64(pageSize), BuildId: build}}, + ) + require.NoError(t, err) + base := &fakeOriginalDevice{data: nonZero, hdr: hdr} + cfg := dedupConfig{base: base, baseHeader: hdr, blockSize: pageSize, budget: budget} + + info, err := cfg.classifyPage(t.Context(), nonZero, 0) + 
require.NoError(t, err) + require.Equal(t, dedupPageParent, info.kind) + require.Equal(t, + dedupFetchKey{sourceType: parentFetchSource, buildID: build, window: 0}, + info.key, + ) + require.Equal(t, 1, base.reads, "matching path reads base once") + }) + + t.Run("page matching unmapped parent is keyed by offset window", func(t *testing.T) { + t.Parallel() + + pageOff := int64(8) * pageSize + data := make([]byte, 9*pageSize) + copy(data[pageOff:], nonZero) + base := &fakeOriginalDevice{data: data} // nil header -> no mapping + cfg := dedupConfig{base: base, baseHeader: nil, blockSize: pageSize, budget: budget} + + info, err := cfg.classifyPage(t.Context(), nonZero, pageOff) + require.NoError(t, err) + require.Equal(t, dedupPageParent, info.kind) + require.Equal(t, + dedupFetchKey{ + sourceType: parentFetchSource, + window: int(pageOff / (int64(windowPages) * pageSize)), + }, + info.key, + ) + }) + + t.Run("page differing from base is current", func(t *testing.T) { + t.Parallel() + + base := &fakeOriginalDevice{data: bytes.Repeat([]byte{0x11}, int(pageSize))} + cfg := dedupConfig{base: base, baseHeader: nil, blockSize: pageSize, budget: budget} + + info, err := cfg.classifyPage(t.Context(), nonZero, 0) + require.NoError(t, err) + require.Equal(t, dedupPageCurrent, info.kind) + require.Equal(t, 1, base.reads, "diff path reads base once") + }) + + t.Run("best-effort uncached page is current without reading base", func(t *testing.T) { + t.Parallel() + + base := &peekingOriginalDevice{fakeOriginalDevice: fakeOriginalDevice{data: nonZero}, cached: false} + cfg := dedupConfig{base: base, baseHeader: nil, peeker: base, blockSize: pageSize, bestEffort: true, budget: budget} + + info, err := cfg.classifyPage(t.Context(), nonZero, 0) + require.NoError(t, err) + require.Equal(t, dedupPageCurrent, info.kind) + require.Zero(t, base.reads, "uncached best-effort page must not read base") + }) +} diff --git a/packages/orchestrator/pkg/sandbox/block/memfd.go 
b/packages/orchestrator/pkg/sandbox/block/memfd.go index dce7b371da..ebef339b47 100644 --- a/packages/orchestrator/pkg/sandbox/block/memfd.go +++ b/packages/orchestrator/pkg/sandbox/block/memfd.go @@ -246,6 +246,7 @@ func NewCacheFromMemfdDeduped( dirty *roaring.Bitmap, bestEffort bool, directIO bool, + budget DedupBudget, inputEmpty *roaring.Bitmap, metaOut *utils.SetOnce[*header.DiffMetadata], ) (*DedupedMemfdCache, error) { @@ -258,7 +259,7 @@ func NewCacheFromMemfdDeduped( cancel: cancel, done: utils.NewSetOnce[*Cache](), } - go d.runDedup(drainCtx, base, blockSize, memfd, dirty, bestEffort, directIO, inputEmpty, metaOut) + go d.runDedup(drainCtx, base, blockSize, memfd, dirty, bestEffort, directIO, budget, inputEmpty, metaOut) return d, nil } @@ -270,6 +271,7 @@ func (d *DedupedMemfdCache) runDedup( memfd *Memfd, dirty *roaring.Bitmap, bestEffort, directIO bool, + budget DedupBudget, inputEmpty *roaring.Bitmap, metaOut *utils.SetOnce[*header.DiffMetadata], ) { @@ -279,7 +281,7 @@ func (d *DedupedMemfdCache) runDedup( src := func(absOff int64) ([]byte, error) { return memfd.Slice(absOff, blockSize) } compareStart := time.Now() - plan, err := dedupCompare(ctx, src, base, dirty, blockSize, bestEffort) + plan, err := dedupCompare(ctx, src, base, dirty, blockSize, bestEffort, budget) compareDur := time.Since(compareStart) if err != nil { logSetOnceErr(ctx, "dedup metaOut", metaOut.SetError(err)) @@ -308,6 +310,8 @@ func (d *DedupedMemfdCache) runDedup( plan.exportedSize/header.PageSize, int64(plan.pageDirty.GetCardinality()), int64(plan.pageEmpty.GetCardinality()), + plan.promotedBlocks, + plan.promotedPages, compareDur, writeDur, ) logSetOnceErr(ctx, "dedup done", d.done.SetResult(cache, err)) diff --git a/packages/orchestrator/pkg/sandbox/block/memfd_test.go b/packages/orchestrator/pkg/sandbox/block/memfd_test.go index d11a9c2d5b..d7e4a66278 100644 --- a/packages/orchestrator/pkg/sandbox/block/memfd_test.go +++ 
b/packages/orchestrator/pkg/sandbox/block/memfd_test.go @@ -227,7 +227,7 @@ func TestNewCacheFromMemfdDeduped_DetachesCompareAndDrain(t *testing.T) { metaOut := utils.NewSetOnce[*header.DiffMetadata]() cache, err := NewCacheFromMemfdDeduped( ctx, &fakeOriginalDevice{data: baseData}, pageSize, cachePath, memfd, dirty, false, false, - nil, metaOut, + DedupBudget{}, nil, metaOut, ) require.NoError(t, err) t.Cleanup(func() { _ = cache.Close() }) diff --git a/packages/orchestrator/pkg/sandbox/fc/memory.go b/packages/orchestrator/pkg/sandbox/fc/memory.go index 6b2b25df1d..58a63488f3 100644 --- a/packages/orchestrator/pkg/sandbox/fc/memory.go +++ b/packages/orchestrator/pkg/sandbox/fc/memory.go @@ -80,6 +80,7 @@ func (p *Process) ExportMemory( originalMemfile block.ReadonlyDevice, dedupBestEffort bool, dedupDirectIO bool, + dedupBudget block.DedupBudget, inputEmpty *roaring.Bitmap, metaOut *utils.SetOnce[*header.DiffMetadata], ) (_ block.DiffSource, e error) { @@ -98,7 +99,7 @@ func (p *Process) ExportMemory( if memfd != nil { if originalMemfile != nil { return block.NewCacheFromMemfdDeduped(ctx, originalMemfile, blockSize, cachePath, memfd, include, - dedupBestEffort, dedupDirectIO, inputEmpty, metaOut) + dedupBestEffort, dedupDirectIO, dedupBudget, inputEmpty, metaOut) } var ( src block.DiffSource @@ -131,7 +132,7 @@ func (p *Process) ExportMemory( return cache, nil } // .dedup suffix avoids clobbering the source mmap during truncate. 
- dedupCache, meta, err := cache.Dedup(ctx, originalMemfile, include, blockSize, cachePath+".dedup", dedupBestEffort, dedupDirectIO) + dedupCache, meta, err := cache.Dedup(ctx, originalMemfile, include, blockSize, cachePath+".dedup", dedupBestEffort, dedupDirectIO, dedupBudget) if err != nil { return nil, fmt.Errorf("dedup memfile diff: %w", errors.Join(err, cache.Close())) } diff --git a/packages/orchestrator/pkg/sandbox/sandbox.go b/packages/orchestrator/pkg/sandbox/sandbox.go index f57fa26bd4..302c8dc138 100644 --- a/packages/orchestrator/pkg/sandbox/sandbox.go +++ b/packages/orchestrator/pkg/sandbox/sandbox.go @@ -1146,11 +1146,17 @@ func (s *Sandbox) Pause( // Start POSTPROCESSING var dedupBase block.ReadonlyDevice var dedupBestEffort, dedupDirectIO bool + var dedupBudget block.DedupBudget dedupCfg := s.featureFlags.JSONFlag(ctx, featureflags.MemfileDiffDedupFlag, sandboxLDContext(s.Runtime, s.Config)).AsValueMap() if dedupCfg.Get("enabled").BoolValue() { dedupBase = originalMemfile dedupBestEffort = dedupCfg.Get("bestEffort").BoolValue() dedupDirectIO = dedupCfg.Get("directIO").BoolValue() + dedupBudget = block.DedupBudget{ + MaxFetchWindowsPerBlock: dedupCfg.Get("maxFetchWindowsPerBlock").IntValue(), + MaxPromotedParentPagesPerBlock: dedupCfg.Get("maxPromotedParentPagesPerBlock").IntValue(), + FetchRunWindowPages: dedupCfg.Get("fetchRunWindowPages").IntValue(), + } } memfileDiff, memfileDiffHeader, err := pauseProcessMemory( ctx, @@ -1164,6 +1170,7 @@ func (s *Sandbox) Pause( dedupBase, dedupBestEffort, dedupDirectIO, + dedupBudget, ) if err != nil { return nil, fmt.Errorf("error while post processing: %w", err) @@ -1237,6 +1244,7 @@ func pauseProcessMemory( originalMemfile block.ReadonlyDevice, dedupBestEffort bool, dedupDirectIO bool, + dedupBudget block.DedupBudget, ) (d build.Diff, h *DiffHeader, e error) { ctx, span := tracer.Start(ctx, "process-memory") defer span.End() @@ -1246,7 +1254,7 @@ func pauseProcessMemory( // ExportMemory owns memfd and 
closes it on all paths. cache, err := fc.ExportMemory( ctx, diffMetadata.Dirty, memfileDiffPath, diffMetadata.BlockSize, memfd, bgCopy, - originalMemfile, dedupBestEffort, dedupDirectIO, diffMetadata.Empty, metaOut, + originalMemfile, dedupBestEffort, dedupDirectIO, dedupBudget, diffMetadata.Empty, metaOut, ) if err != nil { return nil, nil, fmt.Errorf("failed to export memory: %w", err) diff --git a/packages/shared/pkg/featureflags/flags.go b/packages/shared/pkg/featureflags/flags.go index 068308e811..95c9986226 100644 --- a/packages/shared/pkg/featureflags/flags.go +++ b/packages/shared/pkg/featureflags/flags.go @@ -140,12 +140,15 @@ var ( MemfdBackgroundCopyFlag = NewBoolFlag("memfd-background-copy", false) // MemfileDiffDedupFlag enables 4 KiB-page dedup of the memfile diff - // against the base memfile. bestEffort skips uncached blocks; - // directIO opens the dedup output with O_DIRECT. + // against the base memfile. bestEffort skips uncached blocks; directIO + // opens the dedup output with O_DIRECT; the remaining keys populate block.DedupBudget. MemfileDiffDedupFlag = NewJSONFlag("memfile-diff-dedup", ldvalue.FromJSONMarshal(map[string]any{ - "enabled": false, - "bestEffort": false, - "directIO": false, + "enabled": false, + "bestEffort": false, + "directIO": false, + "maxFetchWindowsPerBlock": 0, + "maxPromotedParentPagesPerBlock": 0, + "fetchRunWindowPages": 0, })) // PeerToPeerChunkTransferFlag enables peer-to-peer chunk routing. diff --git a/packages/shared/pkg/storage/header/header.go b/packages/shared/pkg/storage/header/header.go index b1852e2ff9..c3dd74c0d5 100644 --- a/packages/shared/pkg/storage/header/header.go +++ b/packages/shared/pkg/storage/header/header.go @@ -150,6 +150,10 @@ func (t *Header) IsNormalizeFixApplied() bool { // The read path uses this to find which build owns the data, then calls // GetBuildFrameData to get the FrameTable for C-space lookup.
func (t *Header) GetShiftedMapping(ctx context.Context, offset int64) (BuildMap, error) { + if t == nil { + return BuildMap{}, errors.New("nil header") + } + mapping, shift, err := t.getMapping(ctx, offset) if err != nil { return BuildMap{}, err