Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 123 additions & 81 deletions packages/orchestrator/pkg/sandbox/build/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ import (

"github.com/google/uuid"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"

"github.com/e2b-dev/infra/packages/orchestrator/pkg/sandbox/block"
blockmetrics "github.com/e2b-dev/infra/packages/orchestrator/pkg/sandbox/block/metrics"
"github.com/e2b-dev/infra/packages/shared/pkg/featureflags"
"github.com/e2b-dev/infra/packages/shared/pkg/logger"
"github.com/e2b-dev/infra/packages/shared/pkg/storage"
"github.com/e2b-dev/infra/packages/shared/pkg/storage/header"
Expand Down Expand Up @@ -54,115 +56,155 @@ func (b *File) SwapHeader(h *header.Header) {
b.header.Store(h)
}

func (b *File) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) {
// Cache some resolved Diffs per BuildId for the duration of one ReadAt to avoid
// hitting the DiffStore TTL cache (and its mutex) on every iteration.
// ReadAt fills p from the mapped build segments, optionally in parallel.
//
// It returns len(p) and a nil error when the whole buffer was filled. When the
// mappings run out first it returns the number of bytes covered so far together
// with io.EOF, matching io.ReaderAt semantics. Any other failure returns 0 and
// the error.
//
// Cache eviction (block.CacheClosedError) or a peer transition re-resolves the
// plan and retries. The retry loop has no iteration bound, so it gives up with
// ctx.Err() as soon as ctx is done instead of re-planning for a caller that
// has already gone away.
func (b *File) ReadAt(ctx context.Context, p []byte, off int64) (int, error) {
	maxParallel := b.store.flags.IntFlag(ctx, featureflags.MaxParallelBuildReadSegments)

	for {
		segments, n, err := b.planRead(ctx, p, off)
		if err == nil {
			err = b.readSegments(ctx, p, segments, maxParallel)
		}
		if err == nil {
			// Fewer bytes than requested means the mappings ran out: report the
			// bytes filled so far with io.EOF, matching io.ReaderAt semantics.
			if n < len(p) {
				return n, io.EOF
			}

			return n, nil
		}

		// A Diff can be evicted and closed between planning and reading. Re-plan
		// the whole read; reads are idempotent, so re-filling already-written
		// regions is safe and getBuild re-resolves the closed Diff.
		var closed *block.CacheClosedError
		retry := errors.As(err, &closed)
		if !retry {
			// A peer transition swaps the header to the finalized one; retry against it.
			var swapErr error
			retry, swapErr = b.retryOnTransition(ctx, err)
			if !retry && swapErr != nil {
				return 0, swapErr
			}
		}
		if !retry {
			return 0, err
		}

		// Only the retry path is guarded: non-retryable errors above are still
		// returned unchanged, so callers see the same error identity as before.
		if ctxErr := ctx.Err(); ctxErr != nil {
			return 0, ctxErr
		}
	}
}

// readSegment describes one planned copy from a build Diff into the caller's
// buffer. planRead produces these and readSegment (the method) executes them.
type readSegment struct {
// dstOff is the index in the destination buffer where the data is written.
dstOff int
// srcOff is the offset passed to diff.ReadAt as the read position.
srcOff int64
// length is the number of bytes to read; a shorter read is treated as an error.
length int64
// diff is the resolved build the bytes are read from.
diff Diff
// ft is the build's frame table, passed through to diff.ReadAt.
ft *storage.FrameTable
}

// readSegments executes every planned segment read into p.
//
// When maxParallel is 1 or lower, or there is at most one segment, the
// segments are read one after another and the first error stops the loop.
// Otherwise the reads run in an errgroup limited to maxParallel concurrent
// goroutines and the group's first error is returned.
func (b *File) readSegments(ctx context.Context, p []byte, segments []readSegment, maxParallel int) error {
	// Serial path: parallelism is disabled or there is nothing to fan out.
	if maxParallel <= 1 || len(segments) <= 1 {
		for i := range segments {
			if err := b.readSegment(ctx, p, segments[i]); err != nil {
				return err
			}
		}

		return nil
	}

	group, groupCtx := errgroup.WithContext(ctx)
	group.SetLimit(maxParallel)
	for i := range segments {
		// Copy per iteration so each goroutine captures its own segment.
		seg := segments[i]
		group.Go(func() error {
			return b.readSegment(groupCtx, p, seg)
		})
	}

	return group.Wait()
}

// readSegment performs a single planned read: it fills
// p[s.dstOff : s.dstOff+s.length] from s.diff starting at s.srcOff.
//
// Errors from the Diff are returned unchanged. A read that reports fewer or
// more bytes than s.length yields io.ErrUnexpectedEOF.
func (b *File) readSegment(ctx context.Context, p []byte, s readSegment) error {
	dst := p[s.dstOff : s.dstOff+int(s.length)]

	got, err := s.diff.ReadAt(ctx, dst, s.srcOff, s.ft)
	switch {
	case err != nil:
		return err
	case int64(got) != s.length:
		return io.ErrUnexpectedEOF
	}

	return nil
}

// planRead resolves the segments covering p, zero-filling uuid.Nil regions.
// A returned byte count below len(p) means the mappings ran out (EOF).
//
// NOTE(review): this span comes from a rendered diff whose +/- markers were
// lost, so removed and added lines appear side by side (for example the two
// consecutive "failed to get mapping" returns with different arity). Confirm
// against the real file before relying on the exact statement order here.
func (b *File) planRead(ctx context.Context, p []byte, off int64) ([]readSegment, int, error) {
// Per-read Diff cache: avoids the DiffStore TTL cache mutex on every mapping.
const buildCacheSize = 16
var (
underlyingIDs [buildCacheSize]uuid.UUID
underlyingDiffs [buildCacheSize]Diff
cacheIDs = underlyingIDs[:0]
cacheDiffs = underlyingDiffs[:0]
segments []readSegment
)

var n int
for n < len(p) {
h := b.Header()

// Find out what build and range we need to read, from mappings.
mappedToBuild, err := h.GetShiftedMapping(ctx, off+int64(n))
if err != nil {
return 0, fmt.Errorf("failed to get mapping: %w", err)
return nil, 0, fmt.Errorf("failed to get mapping: %w", err)
}

remainingReadLength := int64(len(p)) - int64(n)
readLength := min(int64(mappedToBuild.Length), remainingReadLength)

readLength := min(int64(mappedToBuild.Length), int64(len(p)-n))
// A zero-length mapping means off+n is past the last mapping (EOF); stop
// and let the caller surface io.EOF for the bytes covered so far.
if readLength <= 0 {
logger.L().Error(ctx, fmt.Sprintf(
"(%d bytes left to read, off %d) reading %d bytes from %+v/%+v: [%d:] -> [%d:%d] <> %d (mapped length: %d, remaining read length: %d)\n>>> EOF\n",
len(p)-n,
off,
readLength,
mappedToBuild.BuildId,
b.fileType,
mappedToBuild.Offset,
n,
int64(n)+readLength,
n,
mappedToBuild.Length,
remainingReadLength,
))

return n, io.EOF
return segments, n, nil
}

// uuid.Nil marks an unmapped/empty region; zero-fill it in place.
if mappedToBuild.BuildId == uuid.Nil {
clear(p[n : int64(n)+readLength])
clear(p[n : n+int(readLength)])
n += int(readLength)

continue
}

size := b.buildFileSize(h, mappedToBuild.BuildId)
ft := h.GetBuildFrameData(mappedToBuild.BuildId)

// Find the build in the caches.
var mappedBuild Diff
hitIdx := -1
for i, id := range cacheIDs {
if id == mappedToBuild.BuildId {
mappedBuild = cacheDiffs[i]
hitIdx = i

break
}
}
if mappedBuild == nil {
mappedBuild, err = b.getBuild(ctx, mappedToBuild.BuildId, size, ft.CompressionType())
if err != nil {
return 0, fmt.Errorf("failed to get build: %w", err)
}
if len(cacheIDs) < cap(cacheIDs) {
cacheIDs = append(cacheIDs, mappedToBuild.BuildId)
cacheDiffs = append(cacheDiffs, mappedBuild)
hitIdx = len(cacheIDs) - 1
}
}

// Read from that build, and handle the various retries.
buildN, err := mappedBuild.ReadAt(ctx,
p[n:int64(n)+readLength],
int64(mappedToBuild.Offset),
ft,
)
diff, err := b.cachedBuild(ctx, h, mappedToBuild.BuildId, &cacheIDs, &cacheDiffs)
if err != nil {
// Cache may have evicted+closed the Diff between resolve and ReadAt;
// drop the stale entry and re-resolve on the next iteration.
var closed *block.CacheClosedError
if errors.As(err, &closed) {
if hitIdx >= 0 {
last := len(cacheIDs) - 1
cacheIDs[hitIdx] = cacheIDs[last]
cacheDiffs[hitIdx] = cacheDiffs[last]
cacheIDs = cacheIDs[:last]
cacheDiffs = cacheDiffs[:last]
}
return nil, 0, err
}
// Record the read as a segment and advance the plan by its length.
segments = append(segments, readSegment{
dstOff: n,
srcOff: int64(mappedToBuild.Offset),
length: readLength,
diff: diff,
ft: h.GetBuildFrameData(mappedToBuild.BuildId),
})
n += int(readLength)
}

continue
}
if retry, swapErr := b.retryOnTransition(ctx, err); retry {
continue
} else if swapErr != nil {
return 0, swapErr
}
return segments, n, nil
}

return 0, fmt.Errorf("failed to read from source: %w", err)
// cachedBuild returns the Diff for buildID. It first scans the caller-owned
// parallel slices ids/diffs; on a miss it resolves the Diff through b.getBuild
// using the build's file size and compression type taken from h, and appends
// the result to the slices only while they have spare capacity.
//
// NOTE(review): this span comes from a rendered diff whose +/- markers were
// lost; the stray "n += buildN" and "return n, nil" lines look like removed
// lines from the previous ReadAt body. Confirm against the real file.
func (b *File) cachedBuild(ctx context.Context, h *header.Header, buildID uuid.UUID, ids *[]uuid.UUID, diffs *[]Diff) (Diff, error) {
for i, id := range *ids {
if id == buildID {
return (*diffs)[i], nil
}
}

n += buildN
// CompressionType is nil-safe (nil frame table -> CompressionNone).
ct := h.GetBuildFrameData(buildID).CompressionType()
diff, err := b.getBuild(ctx, buildID, b.buildFileSize(h, buildID), ct)
if err != nil {
return nil, fmt.Errorf("failed to get build: %w", err)
}
if len(*ids) < cap(*ids) {
*ids = append(*ids, buildID)
*diffs = append(*diffs, diff)
}

return n, nil
return diff, nil
}

// Slice returns [off, off+length). Zero-copy when the range fits in a
Expand Down
4 changes: 4 additions & 0 deletions packages/shared/pkg/featureflags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,10 @@ var (
MaxConcurrentSnapshotBuildQueries = NewIntFlag("max-concurrent-snapshot-build-queries", 0)

MinChunkerReadSizeKB = NewIntFlag("min-chunker-read-size-kb", 16)

// MaxParallelBuildReadSegments limits concurrent backing reads within one fragmented build read.
// 1 or lower keeps the existing serial path.
MaxParallelBuildReadSegments = NewIntFlag("max-parallel-build-read-segments", 1)
)

// ReclaimConfigFlag holds per-step caps in milliseconds for the pre-pause
Expand Down
Loading