diff --git a/packages/orchestrator/pkg/sandbox/build/build.go b/packages/orchestrator/pkg/sandbox/build/build.go index 605a103b21..c4814e9558 100644 --- a/packages/orchestrator/pkg/sandbox/build/build.go +++ b/packages/orchestrator/pkg/sandbox/build/build.go @@ -12,9 +12,11 @@ import ( "github.com/google/uuid" "go.uber.org/zap" + "golang.org/x/sync/errgroup" "github.com/e2b-dev/infra/packages/orchestrator/pkg/sandbox/block" blockmetrics "github.com/e2b-dev/infra/packages/orchestrator/pkg/sandbox/block/metrics" + "github.com/e2b-dev/infra/packages/shared/pkg/featureflags" "github.com/e2b-dev/infra/packages/shared/pkg/logger" "github.com/e2b-dev/infra/packages/shared/pkg/storage" "github.com/e2b-dev/infra/packages/shared/pkg/storage/header" @@ -54,115 +56,155 @@ func (b *File) SwapHeader(h *header.Header) { b.header.Store(h) } -func (b *File) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) { - // Cache some resolved Diffs per BuildId for the duration of one ReadAt to avoid - // hitting the DiffStore TTL cache (and its mutex) on every iteration. +// ReadAt fills p from the mapped build segments, optionally in parallel. +// Cache eviction or a peer transition re-resolves and retries. +func (b *File) ReadAt(ctx context.Context, p []byte, off int64) (int, error) { + maxParallel := b.store.flags.IntFlag(ctx, featureflags.MaxParallelBuildReadSegments) + + for { + segments, n, err := b.planRead(ctx, p, off) + if err == nil { + err = b.readSegments(ctx, p, segments, maxParallel) + } + if err == nil { + // Fewer bytes than requested means the mappings ran out: report the + // bytes filled so far with io.EOF, matching io.ReaderAt semantics. + if n < len(p) { + return n, io.EOF + } + + return n, nil + } + + // A Diff can be evicted and closed between planning and reading. Re-plan + // the whole read; reads are idempotent, so re-filling already-written + // regions is safe and getBuild re-resolves the closed Diff. + var closed *block.CacheClosedError + if errors.As(err, &closed) { + continue + } + // A peer transition swaps the header to the finalized one; retry against it. + if retry, swapErr := b.retryOnTransition(ctx, err); retry { + continue + } else if swapErr != nil { + return 0, swapErr + } + + return 0, err + } +} + +type readSegment struct { + dstOff int + srcOff int64 + length int64 + diff Diff + ft *storage.FrameTable +} + +func (b *File) readSegments(ctx context.Context, p []byte, segments []readSegment, maxParallel int) error { + if maxParallel > 1 && len(segments) > 1 { + g, gctx := errgroup.WithContext(ctx) + g.SetLimit(maxParallel) + for _, s := range segments { + seg := s + g.Go(func() error { return b.readSegment(gctx, p, seg) }) + } + + return g.Wait() + } + + for _, s := range segments { + if err := b.readSegment(ctx, p, s); err != nil { + return err + } + } + + return nil +} + +func (b *File) readSegment(ctx context.Context, p []byte, s readSegment) error { + n, err := s.diff.ReadAt(ctx, p[s.dstOff:s.dstOff+int(s.length)], s.srcOff, s.ft) + if err != nil { + return err + } + if int64(n) != s.length { + return io.ErrUnexpectedEOF + } + + return nil +} + +// planRead resolves the segments covering p, zero-filling uuid.Nil regions. +// A returned byte count below len(p) means the mappings ran out (EOF). +func (b *File) planRead(ctx context.Context, p []byte, off int64) ([]readSegment, int, error) { + // Per-read Diff cache: avoids the DiffStore TTL cache mutex on every mapping. const buildCacheSize = 16 var ( underlyingIDs [buildCacheSize]uuid.UUID underlyingDiffs [buildCacheSize]Diff cacheIDs = underlyingIDs[:0] cacheDiffs = underlyingDiffs[:0] + segments []readSegment ) + var n int for n < len(p) { h := b.Header() - - // Find out what build and ranghe we need to read, from mappings. mappedToBuild, err := h.GetShiftedMapping(ctx, off+int64(n)) if err != nil { - return 0, fmt.Errorf("failed to get mapping: %w", err) + return nil, 0, fmt.Errorf("failed to get mapping: %w", err) } - - remainingReadLength := int64(len(p)) - int64(n) - readLength := min(int64(mappedToBuild.Length), remainingReadLength) - + readLength := min(int64(mappedToBuild.Length), int64(len(p)-n)) + // A zero-length mapping means off+n is past the last mapping (EOF); stop + // and let the caller surface io.EOF for the bytes covered so far. if readLength <= 0 { - logger.L().Error(ctx, fmt.Sprintf( - "(%d bytes left to read, off %d) reading %d bytes from %+v/%+v: [%d:] -> [%d:%d] <> %d (mapped length: %d, remaining read length: %d)\n>>> EOF\n", - len(p)-n, - off, - readLength, - mappedToBuild.BuildId, - b.fileType, - mappedToBuild.Offset, - n, - int64(n)+readLength, - n, - mappedToBuild.Length, - remainingReadLength, - )) - - return n, io.EOF + return segments, n, nil } - + // uuid.Nil marks an unmapped/empty region; zero-fill it in place. if mappedToBuild.BuildId == uuid.Nil { - clear(p[n : int64(n)+readLength]) + clear(p[n : n+int(readLength)]) n += int(readLength) continue } - size := b.buildFileSize(h, mappedToBuild.BuildId) - ft := h.GetBuildFrameData(mappedToBuild.BuildId) - - // Find the build in the caches. - var mappedBuild Diff - hitIdx := -1 - for i, id := range cacheIDs { - if id == mappedToBuild.BuildId { - mappedBuild = cacheDiffs[i] - hitIdx = i - - break - } - } - if mappedBuild == nil { - mappedBuild, err = b.getBuild(ctx, mappedToBuild.BuildId, size, ft.CompressionType()) - if err != nil { - return 0, fmt.Errorf("failed to get build: %w", err) - } - if len(cacheIDs) < cap(cacheIDs) { - cacheIDs = append(cacheIDs, mappedToBuild.BuildId) - cacheDiffs = append(cacheDiffs, mappedBuild) - hitIdx = len(cacheIDs) - 1 - } - } - - // Read from that build, and handle the various retries. - buildN, err := mappedBuild.ReadAt(ctx, - p[n:int64(n)+readLength], - int64(mappedToBuild.Offset), - ft, - ) + diff, err := b.cachedBuild(ctx, h, mappedToBuild.BuildId, &cacheIDs, &cacheDiffs) if err != nil { - // Cache may have evicted+closed the Diff between resolve and ReadAt; - // drop the stale entry and re-resolve on the next iteration. - var closed *block.CacheClosedError - if errors.As(err, &closed) { - if hitIdx >= 0 { - last := len(cacheIDs) - 1 - cacheIDs[hitIdx] = cacheIDs[last] - cacheDiffs[hitIdx] = cacheDiffs[last] - cacheIDs = cacheIDs[:last] - cacheDiffs = cacheDiffs[:last] - } + return nil, 0, err + } + segments = append(segments, readSegment{ + dstOff: n, + srcOff: int64(mappedToBuild.Offset), + length: readLength, + diff: diff, + ft: h.GetBuildFrameData(mappedToBuild.BuildId), + }) + n += int(readLength) + } - continue - } - if retry, swapErr := b.retryOnTransition(ctx, err); retry { - continue - } else if swapErr != nil { - return 0, swapErr - } + return segments, n, nil +} - return 0, fmt.Errorf("failed to read from source: %w", err) +func (b *File) cachedBuild(ctx context.Context, h *header.Header, buildID uuid.UUID, ids *[]uuid.UUID, diffs *[]Diff) (Diff, error) { + for i, id := range *ids { + if id == buildID { + return (*diffs)[i], nil } + } - n += buildN + // CompressionType is nil-safe (nil frame table -> CompressionNone). + ct := h.GetBuildFrameData(buildID).CompressionType() + diff, err := b.getBuild(ctx, buildID, b.buildFileSize(h, buildID), ct) + if err != nil { + return nil, fmt.Errorf("failed to get build: %w", err) + } + if len(*ids) < cap(*ids) { + *ids = append(*ids, buildID) + *diffs = append(*diffs, diff) } - return n, nil + return diff, nil } // Slice returns [off, off+length). Zero-copy when the range fits in a diff --git a/packages/shared/pkg/featureflags/flags.go b/packages/shared/pkg/featureflags/flags.go index 28b6f8e370..1d5b27bacc 100644 --- a/packages/shared/pkg/featureflags/flags.go +++ b/packages/shared/pkg/featureflags/flags.go @@ -268,6 +268,10 @@ var ( MaxConcurrentSnapshotBuildQueries = NewIntFlag("max-concurrent-snapshot-build-queries", 0) MinChunkerReadSizeKB = NewIntFlag("min-chunker-read-size-kb", 16) + + // MaxParallelBuildReadSegments limits concurrent backing reads within one fragmented build read. + // 1 or lower keeps the existing serial path. + MaxParallelBuildReadSegments = NewIntFlag("max-parallel-build-read-segments", 1) ) // ReclaimConfigFlag holds per-step caps in milliseconds for the pre-pause