Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions packages/orchestrator/pkg/sandbox/block/fetch_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"fmt"
"sync"
"sync/atomic"

"github.com/e2b-dev/infra/packages/shared/pkg/storage"
)

type fetchSession struct {
Expand All @@ -21,12 +23,23 @@ type fetchSession struct {
fetchErr error
done bool // true once terminated (success or error)

// source is the backend that served the fetch; set by runFetch.
source atomic.Int32

// bytesReady is the byte count (from chunkOff) up to which all blocks
// are fully written and marked cached. Atomic so registerAndWait can
// do a lock-free fast-path check: bytesReady only increases.
bytesReady atomic.Int64
}

func (s *fetchSession) setSource(src storage.Source) {
s.source.Store(int32(src))
}

func (s *fetchSession) Source() storage.Source {
return storage.Source(s.source.Load())
}

// contains reports whether the session covers the byte range [off, off+length).
func (s *fetchSession) contains(off, length int64) bool {
return s.chunkOff <= off && s.chunkOff+s.chunkLen >= off+length
Expand Down
14 changes: 13 additions & 1 deletion packages/orchestrator/pkg/sandbox/block/metrics/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,20 @@ const (
orchestratorBlockSlices = "orchestrator.blocks.slices"
orchestratorBlockChunksFetch = "orchestrator.blocks.chunks.fetch"
orchestratorBlockChunksStore = "orchestrator.blocks.chunks.store"
orchestratorChunkSlice = "orchestrator.chunk.slice"
)

type Metrics struct {
// SlicesMetric is used to measure page faulting performance.
SlicesTimerFactory telemetry.TimerFactory

// WriteChunksMetric is used to measure the time taken to download chunks from remote storage
// RemoteReadsTimerFactory is used to measure the time taken to download chunks from remote storage
RemoteReadsTimerFactory telemetry.TimerFactory

// WriteChunksMetric is used to measure performance of writing chunks to disk.
WriteChunksTimerFactory telemetry.TimerFactory

ChunkSliceTimerFactory telemetry.FloatTimerFactory
}

func NewMetrics(meterProvider metric.MeterProvider) (Metrics, error) {
Expand Down Expand Up @@ -58,5 +61,14 @@ func NewMetrics(meterProvider metric.MeterProvider) (Metrics, error) {
return m, fmt.Errorf("failed to get stored chunks metric: %w", err)
}

if m.ChunkSliceTimerFactory, err = telemetry.NewFloatTimerFactory(
blocksMeter, orchestratorChunkSlice,
"Time taken by Chunker to serve a Slice() (source=mmap when served from cache)",
"Bytes returned",
"Slice call count",
); err != nil {
return m, fmt.Errorf("error creating chunk slice timer factory: %w", err)
}

return m, nil
}
92 changes: 78 additions & 14 deletions packages/orchestrator/pkg/sandbox/block/streaming_chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type Chunker struct {
metrics metrics.Metrics
fetchTimeout time.Duration
featureFlags *featureflags.Client
objType storage.SeekableObjectType

size int64

Expand All @@ -49,6 +50,7 @@ func NewChunker(
upstream storage.StreamingReader,
cachePath string,
metrics metrics.Metrics,
objType storage.SeekableObjectType,
) (*Chunker, error) {
cache, err := NewCache(size, blockSize, cachePath, false)
if err != nil {
Expand All @@ -62,6 +64,7 @@ func NewChunker(
metrics: metrics,
featureFlags: ff,
fetchTimeout: defaultFetchTimeout,
objType: objType,
}, nil
}

Expand All @@ -75,54 +78,66 @@ func (c *Chunker) ReadAt(ctx context.Context, b []byte, off int64, ft *storage.F
}

func (c *Chunker) Slice(ctx context.Context, off, length int64, ft *storage.FrameTable) ([]byte, error) {
ct := ft.CompressionType()
attrs := chunkerAttrs
if ft.IsCompressed() {
attrs = chunkerAttrsCompressed
}

sliceStart := time.Now()
timer := c.metrics.SlicesTimerFactory.Begin()

// Fast path: already cached
// Fast path: already cached.
b, err := c.cache.Slice(off, length)
if err == nil {
timer.RecordRaw(ctx, length, attrs.successFromCache)
c.metrics.ChunkSliceTimerFactory.Record(ctx, time.Since(sliceStart), length, storage.OKAttrs(c.objType, storage.SourceMmap, ct))

return b, nil
}

if !errors.As(err, &BytesNotAvailableError{}) {
timer.RecordRaw(ctx, length, attrs.failCacheRead)
c.metrics.ChunkSliceTimerFactory.Record(ctx, time.Since(sliceStart), 0, storage.ErrAttrs(c.objType, storage.SourceMmap, ct, err))

return nil, fmt.Errorf("failed read from cache at offset %d: %w", off, err)
}

// Fetch every chunk the range spans (one fetch session per chunk).
var src storage.Source
end := off + length
for cur := off; cur < end; {
chunkOff, chunkLen, lerr := c.locateChunk(cur, ft)
if lerr != nil {
timer.RecordRaw(ctx, length, attrs.failRemoteFetch)
c.metrics.ChunkSliceTimerFactory.Record(ctx, time.Since(sliceStart), 0, storage.ErrAttrs(c.objType, src, ct, lerr))

return nil, fmt.Errorf("failed to locate chunk for offset %d: %w", cur, lerr)
}
chunkEnd := chunkOff + chunkLen
rangeEnd := min(end, chunkEnd)
if err := c.fetch(ctx, cur, rangeEnd-cur, ft); err != nil {
s, err := c.fetch(ctx, cur, rangeEnd-cur, ft)
if err != nil {
timer.RecordRaw(ctx, length, attrs.failRemoteFetch)
c.metrics.ChunkSliceTimerFactory.Record(ctx, time.Since(sliceStart), 0, storage.ErrAttrs(c.objType, s, ct, err))

return nil, fmt.Errorf("failed to ensure data at %d-%d: %w", cur, rangeEnd, err)
}
src = max(src, s)
cur = chunkEnd
}

// sliceDirect skips isCached — the waiter already confirmed the data is in the mmap.
b, cacheErr := c.cache.sliceDirect(off, length)
if cacheErr != nil {
timer.RecordRaw(ctx, length, attrs.failLocalReadAgain)
c.metrics.ChunkSliceTimerFactory.Record(ctx, time.Since(sliceStart), 0, storage.ErrAttrs(c.objType, src, ct, cacheErr))

return nil, fmt.Errorf("failed to read from cache after ensuring data at %d-%d: %w", off, off+length, cacheErr)
}

timer.RecordRaw(ctx, length, attrs.successFromRemote)
c.metrics.ChunkSliceTimerFactory.Record(ctx, time.Since(sliceStart), length, storage.OKAttrs(c.objType, src, ct))

return b, nil
}
Expand Down Expand Up @@ -161,15 +176,15 @@ func (c *Chunker) getOrCreateSession(ctx context.Context, off, length int64, ft
// fetch ensures the chunk for [off, off+length) is fetched and waits
// for every block the range spans (a span can cross block boundaries
// after dedup; waiting only on the start block leaves the tail unfetched).
func (c *Chunker) fetch(ctx context.Context, off, length int64, ft *storage.FrameTable) error {
func (c *Chunker) fetch(ctx context.Context, off, length int64, ft *storage.FrameTable) (storage.Source, error) {
chunkOff, chunkLen, err := c.locateChunk(off, ft)
if err != nil {
return fmt.Errorf("failed to locate chunk for offset %d: %w", off, err)
return storage.UnknownSource, fmt.Errorf("failed to locate chunk for offset %d: %w", off, err)
}

session, justGotCached := c.getOrCreateSession(ctx, chunkOff, chunkLen, ft)
if justGotCached {
return nil
return storage.SourceMmap, nil
}

blockSize := c.cache.BlockSize()
Expand All @@ -181,18 +196,25 @@ func (c *Chunker) fetch(ctx context.Context, off, length int64, ft *storage.Fram
break // tail belongs to the caller's next chunk fetch.
}
if err := session.registerAndWait(ctx, b); err != nil {
return err
return session.Source(), err
}
}

return nil
return session.Source(), nil
}

// runFetch fetches data from storage into the mmap cache. Runs in a background goroutine.
func (c *Chunker) runFetch(ctx context.Context, s *fetchSession, ft *storage.FrameTable) {
ctx, cancel := context.WithTimeout(ctx, c.fetchTimeout)
defer cancel()

ctx, span := tracer.Start(ctx, "chunk.fetch")
defer span.End()
span.SetAttributes(
attribute.Int64("off", s.chunkOff),
attribute.Int64("len", s.chunkLen),
)

defer c.releaseSession(s)

// Unconditionally terminate the session on exit so registerAndWait
Expand All @@ -216,16 +238,26 @@ func (c *Chunker) runFetch(ctx context.Context, s *fetchSession, ft *storage.Fra
}
defer releaseLock()

ct := ft.CompressionType()

attrs := chunkerAttrs
if ft.IsCompressed() {
attrs = chunkerAttrsCompressed
}
fetchTimer := c.metrics.RemoteReadsTimerFactory.Begin()

readBytes, err := c.progressiveRead(ctx, s, mmapSlice, ft)
fetchStart := time.Now()

stats, src, open, err := c.progressiveRead(ctx, s, mmapSlice, ft)
var readBytes int64
if stats != nil {
readBytes = stats.UncompressedBytes
}
if err != nil {
fetchTimer.RecordRaw(ctx, readBytes, attrs.remoteFailure)

storage.RecordReadFetch(ctx, time.Since(fetchStart), readBytes, storage.ErrAttrs(c.objType, src, ct, err))

s.fail(err)

return
Expand All @@ -236,24 +268,56 @@ func (c *Chunker) runFetch(ctx context.Context, s *fetchSession, ft *storage.Fra
// closing the TOCTOU window in getOrCreateSession.
c.cache.setIsCached(s.chunkOff, s.chunkLen)

fetchWall := time.Since(fetchStart)
fetchTimer.RecordRaw(ctx, readBytes, attrs.remoteSuccess)
storage.RecordReadFetch(ctx, fetchWall, readBytes, storage.OKAttrs(c.objType, src, ct))

// fetch wall / (open + read + decompress); >1 = unaccounted overhead.
if stats != nil {
if work := open + stats.Read + stats.Decompress; work > 0 {
ratio := fetchWall.Seconds() / work.Seconds()
storage.RecordPipelineEfficiency(ctx, ratio, storage.OKAttrs(c.objType, src, ct))
}
}

s.setDone()
}

func (c *Chunker) progressiveRead(ctx context.Context, s *fetchSession, mmapSlice []byte, ft *storage.FrameTable) (totalRead int64, err error) {
reader, err := c.upstream.OpenRangeReader(ctx, s.chunkOff, s.chunkLen, ft)
func (c *Chunker) progressiveRead(ctx context.Context, s *fetchSession, mmapSlice []byte, ft *storage.FrameTable) (stats *storage.ReadStats, source storage.Source, open time.Duration, err error) {
doneInflight := storage.StartInflight(ctx, storage.InflightFetchAttrs(c.objType))
defer doneInflight()

openStart := time.Now()
reader, source, err := c.upstream.OpenRangeReader(ctx, s.chunkOff, s.chunkLen, ft)
open = time.Since(openStart)
s.setSource(source)
if err != nil {
return 0, fmt.Errorf("failed to open range reader at %d: %w", s.chunkOff, err)
return nil, source, open, fmt.Errorf("failed to open range reader at %d: %w", s.chunkOff, err)
}

defer func() {
if closeErr := reader.Close(ctx); closeErr != nil && err == nil {
var closeErr error
stats, closeErr = reader.Close(ctx)
if closeErr != nil && err == nil {
err = closeErr
}
if err != nil {
return
}

ct := ft.CompressionType()
okAttrs := storage.OKAttrs(c.objType, source, ct)

storage.RecordReadOpen(ctx, open, 0, okAttrs)
if stats != nil {
storage.RecordReadRead(ctx, stats.Read, stats.CompressedBytes, okAttrs)
}
}()

blockSize := c.cache.BlockSize()
readBatch := max(blockSize, int64(c.featureFlags.IntFlag(ctx, featureflags.MinChunkerReadSizeKB))*1024)

var totalRead int64
for totalRead < s.chunkLen {
// Read in batches of max(blockSize, minReadBatchSize) to align notification
// granularity with the read size and minimize lock/notify overhead.
Expand All @@ -272,11 +336,11 @@ func (c *Chunker) progressiveRead(ctx context.Context, s *fetchSession, mmapSlic
break // all bytes received; trailing EOF is expected
}

return totalRead, fmt.Errorf("failed reading at offset %d after %d bytes: %w", s.chunkOff, totalRead, readErr)
return stats, source, open, fmt.Errorf("failed reading at offset %d after %d bytes: %w", s.chunkOff, totalRead, readErr)
}
}

return totalRead, nil
return stats, source, open, nil
}

// releaseSession removes s from the active list (swap-delete).
Expand Down
18 changes: 10 additions & 8 deletions packages/orchestrator/pkg/sandbox/block/streaming_chunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type testControl struct {

func newTestChunker(t *testing.T, file storage.Seekable, size int64) *Chunker {
t.Helper()
c, err := NewChunker(&featureflags.Client{}, size, testBlockSize, file, t.TempDir()+"/cache", newTestMetrics(t))
c, err := NewChunker(&featureflags.Client{}, size, testBlockSize, file, t.TempDir()+"/cache", newTestMetrics(t), storage.MemfileObjectType)
require.NoError(t, err)

return c
Expand All @@ -82,7 +82,7 @@ func (s *fakeSeekable) StoreFile(context.Context, string, ...storage.PutOption)
panic("not used")
}

func (s *fakeSeekable) OpenRangeReader(_ context.Context, offsetU int64, length int64, frameTable *storage.FrameTable) (storage.RangeReader, error) {
func (s *fakeSeekable) OpenRangeReader(_ context.Context, offsetU int64, length int64, frameTable *storage.FrameTable) (storage.RangeReader, storage.Source, error) {
s.fetchCount.Add(1)

if s.ctrl != nil {
Expand All @@ -103,14 +103,14 @@ func (s *fakeSeekable) OpenRangeReader(_ context.Context, offsetU int64, length
advance: s.ctrl.advance,
consumed: s.ctrl.consumed,
closed: s.ctrl.closed,
}), nil
}), storage.SourceFS, nil
}

var fetchOff, fetchLen int64
if frameTable.IsCompressed() {
r, err := frameTable.LocateCompressed(offsetU)
if err != nil {
return nil, fmt.Errorf("frame lookup: %w", err)
return nil, storage.UnknownSource, fmt.Errorf("frame lookup: %w", err)
}

fetchOff = r.Offset
Expand All @@ -127,10 +127,12 @@ func (s *fakeSeekable) OpenRangeReader(_ context.Context, offsetU int64, length

r := io.Reader(bytes.NewReader(s.data[fetchOff:end]))
if frameTable.IsCompressed() {
return storage.NewDecompressingReader(storage.NewRangeReader(io.NopCloser(r)), frameTable.CompressionType())
dec, err := storage.NewDecompressingReader(storage.NewRangeReader(io.NopCloser(r)), frameTable.CompressionType())

return dec, storage.SourceFS, err
}

return storage.NewRangeReader(io.NopCloser(r)), nil
return storage.NewRangeReader(io.NopCloser(r)), storage.SourceFS, nil
}

func makeCompressedTestData(tb testing.TB, data []byte) (*storage.FrameTable, *fakeSeekable) {
Expand Down Expand Up @@ -428,13 +430,13 @@ func (s *panicSeekable) StoreFile(context.Context, string, ...storage.PutOption)
panic("not used")
}

func (s *panicSeekable) OpenRangeReader(_ context.Context, off int64, length int64, _ *storage.FrameTable) (storage.RangeReader, error) {
func (s *panicSeekable) OpenRangeReader(_ context.Context, off int64, length int64, _ *storage.FrameTable) (storage.RangeReader, storage.Source, error) {
end := min(off+length, int64(len(s.data)))

return storage.NewRangeReader(&panicReader{
data: s.data[off:end],
panicAfter: int(s.panicAfter - off),
}), nil
}), storage.SourceFS, nil
}

type panicReader struct {
Expand Down
Loading
Loading