From f7a0b35aa4c5f15789eb09c24d0cb412a60c0a50 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Thu, 28 May 2026 00:27:11 -0700 Subject: [PATCH 1/3] feat(storage): observability for the read path MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds an orchestrator.read.* metric family with consistent attributes (file_type/source/codec/outcome) covering each stage of a read, plus per-layer chunker and build-file timers, so dashboards can attribute latency end-to-end from sandbox-visible read to backend fetch. Does not remove any of the prior metrics, this will be done separately after the dashboards are updated. Metrics: - orchestrator.file.read_at build.File.ReadAt — per-fault unit, aggregates all underlying mappings into one record - orchestrator.chunk.slice Chunker.Slice — per per-mapping unit, source=mmap on cache hit else the backend that served - orchestrator.read.open OpenRangeReader (open / TTFB) - orchestrator.read.read source-read wall, compressed bytes - orchestrator.read.decompress decompress CPU + uncompressed bytes - orchestrator.read.fetch total fetch wall + bytes delivered - orchestrator.read.writeback NFS cache writeback wall + bytes - orchestrator.read.pipeline.efficiency fetch / (open+read+decompress) - orchestrator.read.cache NFS hit/miss/writeback events - orchestrator.read.inflight concurrent fetches gauge Spans: - chunk.fetch runFetch goroutine span --- .../pkg/sandbox/block/fetch_session.go | 13 + .../pkg/sandbox/block/metrics/main.go | 14 +- .../pkg/sandbox/block/streaming_chunk.go | 92 +++++- .../pkg/sandbox/block/streaming_chunk_test.go | 18 +- .../orchestrator/pkg/sandbox/build/build.go | 38 ++- .../pkg/sandbox/build/storage_diff.go | 2 +- .../sandbox/template/peerclient/seekable.go | 14 +- .../template/peerclient/seekable_test.go | 22 +- .../sandbox/template/peerclient/storage.go | 4 +- .../shared/pkg/storage/compress_decode.go | 66 ++++- packages/shared/pkg/storage/io_wrappers.go | 101 ++++--- packages/shared/pkg/storage/mock_seekable.go | 24 +- packages/shared/pkg/storage/read_attrs.go | 26 ++ .../pkg/storage/read_attrs_precomputed.go | 121 ++++++++ .../shared/pkg/storage/read_attrs_test.go | 57 ++++ packages/shared/pkg/storage/read_metrics.go | 151 ++++++++++ packages/shared/pkg/storage/storage.go | 27 +- packages/shared/pkg/storage/storage_aws.go | 10 +- packages/shared/pkg/storage/storage_cache.go | 1 + .../storage/storage_cache_compressed_test.go | 22 +- .../pkg/storage/storage_cache_seekable.go | 41 ++- .../storage_cache_seekable_compressed.go | 36 ++- .../storage/storage_cache_seekable_test.go | 275 +++++++----------- packages/shared/pkg/storage/storage_fs.go | 27 +- packages/shared/pkg/storage/storage_google.go | 22 +- packages/shared/pkg/telemetry/meters.go | 53 +++- .../sandbox_rapid_pause_resume_test.go | 2 +- 27 files changed, 943 insertions(+), 336 deletions(-) create mode 100644 packages/shared/pkg/storage/read_attrs.go create mode 100644 packages/shared/pkg/storage/read_attrs_precomputed.go create mode 100644 packages/shared/pkg/storage/read_attrs_test.go create mode 100644 packages/shared/pkg/storage/read_metrics.go diff --git a/packages/orchestrator/pkg/sandbox/block/fetch_session.go b/packages/orchestrator/pkg/sandbox/block/fetch_session.go index 8b013afb01..9f80484261 100644 --- a/packages/orchestrator/pkg/sandbox/block/fetch_session.go +++ b/packages/orchestrator/pkg/sandbox/block/fetch_session.go @@ -7,6 +7,8 @@ import ( "fmt" "sync" "sync/atomic" + + "github.com/e2b-dev/infra/packages/shared/pkg/storage" ) type fetchSession struct { @@ -21,12 +23,23 @@ type fetchSession struct { fetchErr error done bool // true once terminated (success or error) + // source is the backend that served the fetch; set by runFetch. + source atomic.Int32 + // bytesReady is the byte count (from chunkOff) up to which all blocks // are fully written and marked cached. Atomic so registerAndWait can // do a lock-free fast-path check: bytesReady only increases. bytesReady atomic.Int64 } +func (s *fetchSession) setSource(src storage.Source) { + s.source.Store(int32(src)) +} + +func (s *fetchSession) Source() storage.Source { + return storage.Source(s.source.Load()) +} + // contains reports whether the session covers the byte range [off, off+length). func (s *fetchSession) contains(off, length int64) bool { return s.chunkOff <= off && s.chunkOff+s.chunkLen >= off+length diff --git a/packages/orchestrator/pkg/sandbox/block/metrics/main.go b/packages/orchestrator/pkg/sandbox/block/metrics/main.go index f1d67440da..25da8b0b71 100644 --- a/packages/orchestrator/pkg/sandbox/block/metrics/main.go +++ b/packages/orchestrator/pkg/sandbox/block/metrics/main.go @@ -12,17 +12,20 @@ const ( orchestratorBlockSlices = "orchestrator.blocks.slices" orchestratorBlockChunksFetch = "orchestrator.blocks.chunks.fetch" orchestratorBlockChunksStore = "orchestrator.blocks.chunks.store" + orchestratorChunkSlice = "orchestrator.chunk.slice" ) type Metrics struct { // SlicesMetric is used to measure page faulting performance. SlicesTimerFactory telemetry.TimerFactory - // WriteChunksMetric is used to measure the time taken to download chunks from remote storage + // RemoteReadsTimerFactory is used to measure the time taken to download chunks from remote storage RemoteReadsTimerFactory telemetry.TimerFactory // WriteChunksMetric is used to measure performance of writing chunks to disk. WriteChunksTimerFactory telemetry.TimerFactory + + ChunkSliceTimerFactory telemetry.FloatTimerFactory } func NewMetrics(meterProvider metric.MeterProvider) (Metrics, error) { @@ -58,5 +61,14 @@ func NewMetrics(meterProvider metric.MeterProvider) (Metrics, error) { return m, fmt.Errorf("failed to get stored chunks metric: %w", err) } + if m.ChunkSliceTimerFactory, err = telemetry.NewFloatTimerFactory( + blocksMeter, orchestratorChunkSlice, + "Time taken by Chunker to serve a Slice() (source=mmap when served from cache)", + "Bytes returned", + "Slice call count", + ); err != nil { + return m, fmt.Errorf("error creating chunk slice timer factory: %w", err) + } + return m, nil } diff --git a/packages/orchestrator/pkg/sandbox/block/streaming_chunk.go b/packages/orchestrator/pkg/sandbox/block/streaming_chunk.go index a3041f9256..37adb51dc0 100644 --- a/packages/orchestrator/pkg/sandbox/block/streaming_chunk.go +++ b/packages/orchestrator/pkg/sandbox/block/streaming_chunk.go @@ -31,6 +31,7 @@ type Chunker struct { metrics metrics.Metrics fetchTimeout time.Duration featureFlags *featureflags.Client + objType storage.SeekableObjectType size int64 @@ -49,6 +50,7 @@ func NewChunker( upstream storage.StreamingReader, cachePath string, metrics metrics.Metrics, + objType storage.SeekableObjectType, ) (*Chunker, error) { cache, err := NewCache(size, blockSize, cachePath, false) if err != nil { @@ -62,6 +64,7 @@ func NewChunker( metrics: metrics, featureFlags: ff, fetchTimeout: defaultFetchTimeout, + objType: objType, }, nil } @@ -75,42 +78,52 @@ func (c *Chunker) ReadAt(ctx context.Context, b []byte, off int64, ft *storage.F } func (c *Chunker) Slice(ctx context.Context, off, length int64, ft *storage.FrameTable) ([]byte, error) { + ct := ft.CompressionType() attrs := chunkerAttrs if ft.IsCompressed() { attrs = chunkerAttrsCompressed } + + sliceStart := time.Now() timer := c.metrics.SlicesTimerFactory.Begin() - // Fast path: already cached + // Fast path: already cached. b, err := c.cache.Slice(off, length) if err == nil { timer.RecordRaw(ctx, length, attrs.successFromCache) + c.metrics.ChunkSliceTimerFactory.Record(ctx, time.Since(sliceStart), length, storage.OKAttrs(c.objType, storage.SourceMmap, ct)) return b, nil } if !errors.As(err, &BytesNotAvailableError{}) { timer.RecordRaw(ctx, length, attrs.failCacheRead) + c.metrics.ChunkSliceTimerFactory.Record(ctx, time.Since(sliceStart), 0, storage.ErrAttrs(c.objType, storage.SourceMmap, ct, err)) return nil, fmt.Errorf("failed read from cache at offset %d: %w", off, err) } // Fetch every chunk the range spans (one fetch session per chunk). + var src storage.Source end := off + length for cur := off; cur < end; { chunkOff, chunkLen, lerr := c.locateChunk(cur, ft) if lerr != nil { timer.RecordRaw(ctx, length, attrs.failRemoteFetch) + c.metrics.ChunkSliceTimerFactory.Record(ctx, time.Since(sliceStart), 0, storage.ErrAttrs(c.objType, src, ct, lerr)) return nil, fmt.Errorf("failed to locate chunk for offset %d: %w", cur, lerr) } chunkEnd := chunkOff + chunkLen rangeEnd := min(end, chunkEnd) - if err := c.fetch(ctx, cur, rangeEnd-cur, ft); err != nil { + s, err := c.fetch(ctx, cur, rangeEnd-cur, ft) + if err != nil { timer.RecordRaw(ctx, length, attrs.failRemoteFetch) + c.metrics.ChunkSliceTimerFactory.Record(ctx, time.Since(sliceStart), 0, storage.ErrAttrs(c.objType, s, ct, err)) return nil, fmt.Errorf("failed to ensure data at %d-%d: %w", cur, rangeEnd, err) } + src = max(src, s) cur = chunkEnd } @@ -118,11 +131,13 @@ func (c *Chunker) Slice(ctx context.Context, off, length int64, ft *storage.Fram b, cacheErr := c.cache.sliceDirect(off, length) if cacheErr != nil { timer.RecordRaw(ctx, length, attrs.failLocalReadAgain) + c.metrics.ChunkSliceTimerFactory.Record(ctx, time.Since(sliceStart), 0, storage.ErrAttrs(c.objType, src, ct, cacheErr)) return nil, fmt.Errorf("failed to read from cache after ensuring data at %d-%d: %w", off, off+length, cacheErr) } timer.RecordRaw(ctx, length, attrs.successFromRemote) + c.metrics.ChunkSliceTimerFactory.Record(ctx, time.Since(sliceStart), length, storage.OKAttrs(c.objType, src, ct)) return b, nil } @@ -161,15 +176,15 @@ func (c *Chunker) getOrCreateSession(ctx context.Context, off, length int64, ft // fetch ensures the chunk for [off, off+length) is fetched and waits // for every block the range spans (a span can cross block boundaries // after dedup; waiting only on the start block leaves the tail unfetched). -func (c *Chunker) fetch(ctx context.Context, off, length int64, ft *storage.FrameTable) error { +func (c *Chunker) fetch(ctx context.Context, off, length int64, ft *storage.FrameTable) (storage.Source, error) { chunkOff, chunkLen, err := c.locateChunk(off, ft) if err != nil { - return fmt.Errorf("failed to locate chunk for offset %d: %w", off, err) + return storage.UnknownSource, fmt.Errorf("failed to locate chunk for offset %d: %w", off, err) } session, justGotCached := c.getOrCreateSession(ctx, chunkOff, chunkLen, ft) if justGotCached { - return nil + return storage.SourceMmap, nil } blockSize := c.cache.BlockSize() @@ -181,11 +196,11 @@ func (c *Chunker) fetch(ctx context.Context, off, length int64, ft *storage.Fram break // tail belongs to the caller's next chunk fetch. } if err := session.registerAndWait(ctx, b); err != nil { - return err + return session.Source(), err } } - return nil + return session.Source(), nil } // runFetch fetches data from storage into the mmap cache. Runs in a background goroutine. @@ -193,6 +208,13 @@ func (c *Chunker) runFetch(ctx context.Context, s *fetchSession, ft *storage.Fra ctx, cancel := context.WithTimeout(ctx, c.fetchTimeout) defer cancel() + ctx, span := tracer.Start(ctx, "chunk.fetch") + defer span.End() + span.SetAttributes( + attribute.Int64("off", s.chunkOff), + attribute.Int64("len", s.chunkLen), + ) + defer c.releaseSession(s) // Unconditionally terminate the session on exit so registerAndWait @@ -216,16 +238,26 @@ func (c *Chunker) runFetch(ctx context.Context, s *fetchSession, ft *storage.Fra } defer releaseLock() + ct := ft.CompressionType() + attrs := chunkerAttrs if ft.IsCompressed() { attrs = chunkerAttrsCompressed } fetchTimer := c.metrics.RemoteReadsTimerFactory.Begin() - readBytes, err := c.progressiveRead(ctx, s, mmapSlice, ft) + fetchStart := time.Now() + + stats, src, open, err := c.progressiveRead(ctx, s, mmapSlice, ft) + var readBytes int64 + if stats != nil { + readBytes = stats.UncompressedBytes + } if err != nil { fetchTimer.RecordRaw(ctx, readBytes, attrs.remoteFailure) + storage.RecordReadFetch(ctx, time.Since(fetchStart), readBytes, storage.ErrAttrs(c.objType, src, ct, err)) + s.fail(err) return @@ -236,24 +268,56 @@ func (c *Chunker) runFetch(ctx context.Context, s *fetchSession, ft *storage.Fra // closing the TOCTOU window in getOrCreateSession. c.cache.setIsCached(s.chunkOff, s.chunkLen) + fetchWall := time.Since(fetchStart) fetchTimer.RecordRaw(ctx, readBytes, attrs.remoteSuccess) + storage.RecordReadFetch(ctx, fetchWall, readBytes, storage.OKAttrs(c.objType, src, ct)) + + // fetch wall / (open + read + decompress); >1 = unaccounted overhead. + if stats != nil { + if work := open + stats.Read + stats.Decompress; work > 0 { + ratio := fetchWall.Seconds() / work.Seconds() + storage.RecordPipelineEfficiency(ctx, ratio, storage.OKAttrs(c.objType, src, ct)) + } + } + s.setDone() } -func (c *Chunker) progressiveRead(ctx context.Context, s *fetchSession, mmapSlice []byte, ft *storage.FrameTable) (totalRead int64, err error) { - reader, err := c.upstream.OpenRangeReader(ctx, s.chunkOff, s.chunkLen, ft) +func (c *Chunker) progressiveRead(ctx context.Context, s *fetchSession, mmapSlice []byte, ft *storage.FrameTable) (stats *storage.ReadStats, source storage.Source, open time.Duration, err error) { + doneInflight := storage.StartInflight(ctx, storage.InflightFetchAttrs(c.objType)) + defer doneInflight() + + openStart := time.Now() + reader, source, err := c.upstream.OpenRangeReader(ctx, s.chunkOff, s.chunkLen, ft) + open = time.Since(openStart) + s.setSource(source) if err != nil { - return 0, fmt.Errorf("failed to open range reader at %d: %w", s.chunkOff, err) + return nil, source, open, fmt.Errorf("failed to open range reader at %d: %w", s.chunkOff, err) } + defer func() { - if closeErr := reader.Close(ctx); closeErr != nil && err == nil { + var closeErr error + stats, closeErr = reader.Close(ctx) + if closeErr != nil && err == nil { err = closeErr } + if err != nil { + return + } + + ct := ft.CompressionType() + okAttrs := storage.OKAttrs(c.objType, source, ct) + + storage.RecordReadOpen(ctx, open, 0, okAttrs) + if stats != nil { + storage.RecordReadRead(ctx, stats.Read, stats.CompressedBytes, okAttrs) + } }() blockSize := c.cache.BlockSize() readBatch := max(blockSize, int64(c.featureFlags.IntFlag(ctx, featureflags.MinChunkerReadSizeKB))*1024) + var totalRead int64 for totalRead < s.chunkLen { // Read in batches of max(blockSize, minReadBatchSize) to align notification // granularity with the read size and minimize lock/notify overhead. @@ -272,11 +336,11 @@ func (c *Chunker) progressiveRead(ctx context.Context, s *fetchSession, mmapSlic break // all bytes received; trailing EOF is expected } - return totalRead, fmt.Errorf("failed reading at offset %d after %d bytes: %w", s.chunkOff, totalRead, readErr) + return stats, source, open, fmt.Errorf("failed reading at offset %d after %d bytes: %w", s.chunkOff, totalRead, readErr) } } - return totalRead, nil + return stats, source, open, nil } // releaseSession removes s from the active list (swap-delete). diff --git a/packages/orchestrator/pkg/sandbox/block/streaming_chunk_test.go b/packages/orchestrator/pkg/sandbox/block/streaming_chunk_test.go index ae47c70d65..7cc5e494dc 100644 --- a/packages/orchestrator/pkg/sandbox/block/streaming_chunk_test.go +++ b/packages/orchestrator/pkg/sandbox/block/streaming_chunk_test.go @@ -68,7 +68,7 @@ type testControl struct { func newTestChunker(t *testing.T, file storage.Seekable, size int64) *Chunker { t.Helper() - c, err := NewChunker(&featureflags.Client{}, size, testBlockSize, file, t.TempDir()+"/cache", newTestMetrics(t)) + c, err := NewChunker(&featureflags.Client{}, size, testBlockSize, file, t.TempDir()+"/cache", newTestMetrics(t), storage.MemfileObjectType) require.NoError(t, err) return c @@ -82,7 +82,7 @@ func (s *fakeSeekable) StoreFile(context.Context, string, ...storage.PutOption) panic("not used") } -func (s *fakeSeekable) OpenRangeReader(_ context.Context, offsetU int64, length int64, frameTable *storage.FrameTable) (storage.RangeReader, error) { +func (s *fakeSeekable) OpenRangeReader(_ context.Context, offsetU int64, length int64, frameTable *storage.FrameTable) (storage.RangeReader, storage.Source, error) { s.fetchCount.Add(1) if s.ctrl != nil { @@ -103,14 +103,14 @@ func (s *fakeSeekable) OpenRangeReader(_ context.Context, offsetU int64, length advance: s.ctrl.advance, consumed: s.ctrl.consumed, closed: s.ctrl.closed, - }), nil + }), storage.SourceFS, nil } var fetchOff, fetchLen int64 if frameTable.IsCompressed() { r, err := frameTable.LocateCompressed(offsetU) if err != nil { - return nil, fmt.Errorf("frame lookup: %w", err) + return nil, storage.UnknownSource, fmt.Errorf("frame lookup: %w", err) } fetchOff = r.Offset @@ -127,10 +127,12 @@ func (s *fakeSeekable) OpenRangeReader(_ context.Context, offsetU int64, length r := io.Reader(bytes.NewReader(s.data[fetchOff:end])) if frameTable.IsCompressed() { - return storage.NewDecompressingReader(storage.NewRangeReader(io.NopCloser(r)), frameTable.CompressionType()) + dec, err := storage.NewDecompressingReader(storage.NewRangeReader(io.NopCloser(r)), frameTable.CompressionType()) + + return dec, storage.SourceFS, err } - return storage.NewRangeReader(io.NopCloser(r)), nil + return storage.NewRangeReader(io.NopCloser(r)), storage.SourceFS, nil } func makeCompressedTestData(tb testing.TB, data []byte) (*storage.FrameTable, *fakeSeekable) { @@ -428,13 +430,13 @@ func (s *panicSeekable) StoreFile(context.Context, string, ...storage.PutOption) panic("not used") } -func (s *panicSeekable) OpenRangeReader(_ context.Context, off int64, length int64, _ *storage.FrameTable) (storage.RangeReader, error) { +func (s *panicSeekable) OpenRangeReader(_ context.Context, off int64, length int64, _ *storage.FrameTable) (storage.RangeReader, storage.Source, error) { end := min(off+length, int64(len(s.data))) return storage.NewRangeReader(&panicReader{ data: s.data[off:end], panicAfter: int(s.panicAfter - off), - }), nil + }), storage.SourceFS, nil } type panicReader struct { diff --git a/packages/orchestrator/pkg/sandbox/build/build.go b/packages/orchestrator/pkg/sandbox/build/build.go index 196436bb79..83973d6e5c 100644 --- a/packages/orchestrator/pkg/sandbox/build/build.go +++ b/packages/orchestrator/pkg/sandbox/build/build.go @@ -11,6 +11,8 @@ import ( "time" "github.com/google/uuid" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" "go.uber.org/zap" "github.com/e2b-dev/infra/packages/orchestrator/pkg/sandbox/block" @@ -18,14 +20,22 @@ import ( "github.com/e2b-dev/infra/packages/shared/pkg/logger" "github.com/e2b-dev/infra/packages/shared/pkg/storage" "github.com/e2b-dev/infra/packages/shared/pkg/storage/header" + "github.com/e2b-dev/infra/packages/shared/pkg/telemetry" + "github.com/e2b-dev/infra/packages/shared/pkg/utils" ) +var fileReadAtTimer = utils.Must(telemetry.NewFloatTimerFactory(meter, "orchestrator.file.read_at", + "Time to serve a build.File ReadAt across all source builds", + "Bytes read", "ReadAt call count")) + type File struct { header atomic.Pointer[header.Header] store *DiffStore fileType DiffType persistence storage.StorageProvider metrics blockmetrics.Metrics + + okAttrs metric.MeasurementOption } func NewFile( @@ -40,12 +50,23 @@ func NewFile( fileType: fileType, persistence: persistence, metrics: metrics, + okAttrs: metric.WithAttributes( + attribute.String(storage.AttrFileType, string(fileType)), + attribute.String(storage.AttrOutcome, storage.OutcomeOK), + ), } f.header.Store(header) return f } +func (b *File) errAttrs(err error) metric.MeasurementOption { + return metric.WithAttributes( + attribute.String(storage.AttrFileType, string(b.fileType)), + attribute.String(storage.AttrOutcome, storage.Outcome(err)), + ) +} + func (b *File) Header() *header.Header { return b.header.Load() } @@ -55,6 +76,21 @@ func (b *File) SwapHeader(h *header.Header) { } func (b *File) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) { + start := time.Now() + n, err = b.readAt(ctx, p, off) + if err != nil { + fileReadAtTimer.Record(ctx, time.Since(start), int64(n), b.errAttrs(err)) + + return n, err + } + fileReadAtTimer.Record(ctx, time.Since(start), int64(n), b.okAttrs) + + return n, nil +} + +// readAt is the uninstrumented worker; Slice's compose path reuses it without +// double-counting file.read_at. +func (b *File) readAt(ctx context.Context, p []byte, off int64) (n int, err error) { for n < len(p) { h := b.Header() @@ -148,7 +184,7 @@ func (b *File) Slice(ctx context.Context, off, length int64) ([]byte, error) { } } out := make([]byte, length) - if _, err := b.ReadAt(ctx, out, off); err != nil { + if _, err := b.readAt(ctx, out, off); err != nil { return nil, fmt.Errorf("failed to read at: %w", err) } diff --git a/packages/orchestrator/pkg/sandbox/build/storage_diff.go b/packages/orchestrator/pkg/sandbox/build/storage_diff.go index 1f4b519953..034c4d0c1f 100644 --- a/packages/orchestrator/pkg/sandbox/build/storage_diff.go +++ b/packages/orchestrator/pkg/sandbox/build/storage_diff.go @@ -96,7 +96,7 @@ func (b *StorageDiff) Init(ctx context.Context) error { } } - c, err := block.NewChunker(b.featureFlags, size, b.blockSize, obj, b.cachePath, b.metrics) + c, err := block.NewChunker(b.featureFlags, size, b.blockSize, obj, b.cachePath, b.metrics, b.storageObjectType) if err != nil { return fmt.Errorf("failed to create chunker: %w", err) } diff --git a/packages/orchestrator/pkg/sandbox/template/peerclient/seekable.go b/packages/orchestrator/pkg/sandbox/template/peerclient/seekable.go index 540850205d..99a1ce0970 100644 --- a/packages/orchestrator/pkg/sandbox/template/peerclient/seekable.go +++ b/packages/orchestrator/pkg/sandbox/template/peerclient/seekable.go @@ -102,7 +102,7 @@ func (s *peerSeekable) Size(ctx context.Context) (int64, error) { return base.Size(ctx) } -func (s *peerSeekable) OpenRangeReader(ctx context.Context, off int64, length int64, frameTable *storage.FrameTable) (storage.RangeReader, error) { +func (s *peerSeekable) OpenRangeReader(ctx context.Context, off int64, length int64, frameTable *storage.FrameTable) (storage.RangeReader, storage.Source, error) { res, err := tryPeer(ctx, &s.peerHandle, "peer-seekable-open-range-reader", attrOpRangeReader, func(ctx context.Context) (peerAttempt[storage.RangeReader], error) { streamCtx, cancel := context.WithCancel(ctx) @@ -126,31 +126,31 @@ func (s *peerSeekable) OpenRangeReader(ctx context.Context, off int64, length in }, nil }) if res.hit { - return res.value, err + return res.value, storage.SourcePeer, err } if s.uploaded != nil && s.uploaded.Load() { now := time.Now() if s.transitionAt.CompareAndSwap(nil, &now) { - return nil, &storage.PeerTransitionedError{} + return nil, storage.SourcePeer, &storage.PeerTransitionedError{} } } base, err := s.getBase(ctx, frameTable.CompressionType()) if err != nil { - return nil, err + return nil, storage.SourcePeer, err } - rc, err := base.OpenRangeReader(ctx, off, length, frameTable) + rc, src, err := base.OpenRangeReader(ctx, off, length, frameTable) // GCS can briefly 404 a just-finalized object; within the retry window // re-emit so build.File reloads the header and retries with backoff. if errors.Is(err, storage.ErrObjectNotExist) { if at := s.transitionAt.Load(); at != nil && time.Since(*at) < postTransitionRetryWindow { - return nil, &storage.PeerTransitionedError{RetryAfter: postTransitionRetryDelay} + return nil, src, &storage.PeerTransitionedError{RetryAfter: postTransitionRetryDelay} } } - return rc, err + return rc, src, err } func (s *peerSeekable) StoreFile(context.Context, string, ...storage.PutOption) (*storage.FrameTable, [32]byte, error) { diff --git a/packages/orchestrator/pkg/sandbox/template/peerclient/seekable_test.go b/packages/orchestrator/pkg/sandbox/template/peerclient/seekable_test.go index e90187d971..450de7f09c 100644 --- a/packages/orchestrator/pkg/sandbox/template/peerclient/seekable_test.go +++ b/packages/orchestrator/pkg/sandbox/template/peerclient/seekable_test.go @@ -72,7 +72,7 @@ func TestPeerSeekable_OpenRangeReader_PeerSucceeds(t *testing.T) { })).Return(stream, nil) s := &peerSeekable{peerHandle: peerHandle{client: client, buildID: "build-1", name: storage.MemfileName, uploaded: &atomic.Bool{}}} - rc, err := s.OpenRangeReader(t.Context(), 10, int64(len(data)), nil) + rc, _, err := s.OpenRangeReader(t.Context(), 10, int64(len(data)), nil) require.NoError(t, err) defer rc.Close(t.Context()) @@ -89,7 +89,7 @@ func TestPeerSeekable_OpenRangeReader_PeerError_FallsBackToBase(t *testing.T) { client.EXPECT().ReadAtBuildSeekable(mock.Anything, mock.Anything).Return(nil, errors.New("peer unavailable")) baseSeekable := storage.NewMockSeekable(t) - baseSeekable.EXPECT().OpenRangeReader(mock.Anything, int64(0), int64(len(baseData)), (*storage.FrameTable)(nil)).Return(storage.NewRangeReader(io.NopCloser(bytes.NewReader(baseData))), nil) + baseSeekable.EXPECT().OpenRangeReader(mock.Anything, int64(0), int64(len(baseData)), (*storage.FrameTable)(nil)).Return(storage.NewRangeReader(io.NopCloser(bytes.NewReader(baseData))), storage.SourceFS, nil) base := storage.NewMockStorageProvider(t) base.EXPECT().OpenSeekable(mock.Anything, "build-1/memfile", storage.MemfileObjectType).Return(baseSeekable, nil) @@ -104,7 +104,7 @@ func TestPeerSeekable_OpenRangeReader_PeerError_FallsBackToBase(t *testing.T) { basePersistence: base, objType: storage.MemfileObjectType, } - rc, err := s.OpenRangeReader(t.Context(), 0, int64(len(baseData)), nil) + rc, _, err := s.OpenRangeReader(t.Context(), 0, int64(len(baseData)), nil) require.NoError(t, err) defer rc.Close(t.Context()) @@ -136,7 +136,7 @@ func TestPeerSeekable_OpenRangeReader_Uploaded_ReturnsPeerTransitionedError(t *t objType: storage.MemfileObjectType, } - _, err := s.OpenRangeReader(t.Context(), 0, 100, nil) + _, _, err := s.OpenRangeReader(t.Context(), 0, 100, nil) require.Error(t, err) var transErr *storage.PeerTransitionedError @@ -187,7 +187,7 @@ func TestPeerStorageProvider_FullTransitionFlow(t *testing.T) { postBaseSeekable := storage.NewMockSeekable(t) postBaseSeekable.EXPECT(). OpenRangeReader(mock.Anything, int64(0), int64(len(postBaseBytes)), mock.Anything). - Return(storage.NewRangeReader(io.NopCloser(bytes.NewReader(postBaseBytes))), nil).Once() + Return(storage.NewRangeReader(io.NopCloser(bytes.NewReader(postBaseBytes))), storage.SourceFS, nil).Once() base := storage.NewMockStorageProvider(t) base.EXPECT(). @@ -199,28 +199,30 @@ func TestPeerStorageProvider_FullTransitionFlow(t *testing.T) { require.NoError(t, err) // 1. Pre-transition read via peer. ft={ct=None} (V3 header). - rc, err := seekable.OpenRangeReader(t.Context(), 0, int64(len(prePeerBytes)), + rc, _, err := seekable.OpenRangeReader(t.Context(), 0, int64(len(prePeerBytes)), storage.NewFrameTable(storage.CompressionNone, nil)) require.NoError(t, err) got, err := io.ReadAll(rc) require.NoError(t, err) - require.NoError(t, rc.Close(t.Context())) + _, err = rc.Close(t.Context()) + require.NoError(t, err) assert.Equal(t, prePeerBytes, got) require.True(t, uploaded.Load(), "uploaded flag should be set after peer EOF with UseStorage") // 2. First post-transition call: retriable error, no peer/base contact. - _, err = seekable.OpenRangeReader(t.Context(), 0, 1, + _, _, err = seekable.OpenRangeReader(t.Context(), 0, 1, storage.NewFrameTable(storage.CompressionNone, nil)) var transErr *storage.PeerTransitionedError require.ErrorAs(t, err, &transErr) // 3. Caller reloads V4 header and retries with ct=Zstd. This must hit the // compressed path on base. - rc, err = seekable.OpenRangeReader(t.Context(), 0, int64(len(postBaseBytes)), + rc, _, err = seekable.OpenRangeReader(t.Context(), 0, int64(len(postBaseBytes)), storage.NewFrameTable(storage.CompressionZstd, nil)) require.NoError(t, err) got, err = io.ReadAll(rc) require.NoError(t, err) - require.NoError(t, rc.Close(t.Context())) + _, err = rc.Close(t.Context()) + require.NoError(t, err) assert.Equal(t, postBaseBytes, got) } diff --git a/packages/orchestrator/pkg/sandbox/template/peerclient/storage.go b/packages/orchestrator/pkg/sandbox/template/peerclient/storage.go index c6313149af..7c299ed8f9 100644 --- a/packages/orchestrator/pkg/sandbox/template/peerclient/storage.go +++ b/packages/orchestrator/pkg/sandbox/template/peerclient/storage.go @@ -304,8 +304,8 @@ func (r *peerStreamReader) Read(p []byte) (int, error) { } } -func (r *peerStreamReader) Close(context.Context) error { +func (r *peerStreamReader) Close(context.Context) (*storage.ReadStats, error) { r.cancel() - return nil + return nil, nil } diff --git a/packages/shared/pkg/storage/compress_decode.go b/packages/shared/pkg/storage/compress_decode.go index a0a706bab3..6f38eb7568 100644 --- a/packages/shared/pkg/storage/compress_decode.go +++ b/packages/shared/pkg/storage/compress_decode.go @@ -2,9 +2,11 @@ package storage import ( "context" + "errors" "fmt" "io" "sync" + "time" "github.com/klauspost/compress/zstd" lz4 "github.com/pierrec/lz4/v4" @@ -56,25 +58,42 @@ func putZstdDecoder(dec *zstd.Decoder) { zstdDecoderPool.Put(dec) } -// decompressReader decompresses inner on Read; Close releases the codec back -// to its pool and closes inner. +// decompressReader decompresses on Read, metering raw pulls vs decoded output +// separately so source-read wall and decompression CPU are split. On Close it +// drains the decoder to EOF (so any wrapper below — e.g. captureReader — +// observes the full encoded stream) and emits the orchestrator.read.decompress +// record (decompress time + uncompressed bytes, keyed by file_type/source/codec). type decompressReader struct { - inner RangeReader - dec io.Reader + inner RangeReader // retained to call Close; + meteredIn *meteredReader + meteredOut *meteredReader releaseCodec func() + ct CompressionType + source Source + objType SeekableObjectType + readErr error } +// NewDecompressingReader wraps inner so Read returns decompressed bytes; +// metric attribution falls back to defaults (callers that care provide it via +// newDecompressReader directly). func NewDecompressingReader(inner RangeReader, ct CompressionType) (RangeReader, error) { + return newDecompressReader(inner, ct, UnknownSource, UnknownSeekableObjectType) +} + +func newDecompressReader(inner RangeReader, ct CompressionType, src Source, ot SeekableObjectType) (*decompressReader, error) { + compressed := &meteredReader{inner: inner} + var dec io.Reader var releaseCodec func() switch ct { case CompressionLZ4: - d := getLZ4Decoder(inner) + d := getLZ4Decoder(compressed) dec, releaseCodec = d, func() { putLZ4Decoder(d) } case CompressionZstd: - d, err := getZstdDecoder(inner) + d, err := getZstdDecoder(compressed) if err != nil { return nil, fmt.Errorf("failed to create zstd decoder: %w", err) } @@ -86,17 +105,44 @@ func NewDecompressingReader(inner RangeReader, ct CompressionType) (RangeReader, return &decompressReader{ inner: inner, - dec: dec, + meteredIn: compressed, + meteredOut: &meteredReader{inner: dec}, releaseCodec: releaseCodec, + ct: ct, + source: src, + objType: ot, }, nil } func (r *decompressReader) Read(p []byte) (int, error) { - return r.dec.Read(p) + n, err := r.meteredOut.Read(p) + if err != nil && !errors.Is(err, io.EOF) { + r.readErr = err + } + + return n, err } -func (r *decompressReader) Close(ctx context.Context) error { +func (r *decompressReader) Close(ctx context.Context) (*ReadStats, error) { + // Drain to EOF so any wrapper below observes the full encoded stream. With + // LZ4 BlockChecksum=true / Checksum=false the 4-byte EndMark is otherwise + // left unread. + if r.readErr == nil { + _, _ = io.Copy(io.Discard, r.meteredOut) + } + r.releaseCodec() - return r.inner.Close(ctx) + stats := &ReadStats{ + CompressedBytes: r.meteredIn.bytes, + UncompressedBytes: r.meteredOut.bytes, + Read: time.Duration(r.meteredIn.nanos), + Decompress: max(0, time.Duration(r.meteredOut.nanos)-time.Duration(r.meteredIn.nanos)), + } + + recordDecompressStep(ctx, r, stats, r.readErr) + + _, innerErr := r.inner.Close(ctx) + + return stats, innerErr } diff --git a/packages/shared/pkg/storage/io_wrappers.go b/packages/shared/pkg/storage/io_wrappers.go index 1e79763b89..850d81ab03 100644 --- a/packages/shared/pkg/storage/io_wrappers.go +++ b/packages/shared/pkg/storage/io_wrappers.go @@ -6,6 +6,7 @@ import ( "errors" "io" "os" + "time" "go.opentelemetry.io/otel/trace" @@ -14,6 +15,7 @@ import ( var ( _ io.Reader = (*offsetReader)(nil) + _ io.Reader = (*meteredReader)(nil) _ RangeReader = (*sectionReader)(nil) _ RangeReader = (*observableReader)(nil) _ RangeReader = (*rangeReader)(nil) @@ -39,15 +41,15 @@ func newOffsetReader(reader io.ReaderAt, offset int64) *offsetReader { } // rangeReader adapts an io.ReadCloser into a RangeReader by ignoring the -// Close context. +// Close context. It does not meter, so Close returns nil stats. type rangeReader struct { io.ReadCloser } func NewRangeReader(rc io.ReadCloser) RangeReader { return &rangeReader{ReadCloser: rc} } -func (p *rangeReader) Close(context.Context) error { - return p.ReadCloser.Close() +func (p *rangeReader) Close(context.Context) (*ReadStats, error) { + return nil, p.ReadCloser.Close() } type sectionReader struct { @@ -63,29 +65,41 @@ func newSectionReader(f *os.File, off, length int64) *sectionReader { } } -func (r *sectionReader) Close(context.Context) error { - return r.file.Close() +func (r *sectionReader) Close(context.Context) (*ReadStats, error) { + return nil, r.file.Close() +} + +// meteredReader records cumulative time and bytes spent pulling from inner so +// a decoder built on top can separate source-read wall from decompression CPU. +// Single-goroutine: Read is sequential, stats are read after EOF in Close. +type meteredReader struct { + inner io.Reader + nanos int64 + bytes int64 +} + +func (m *meteredReader) Read(p []byte) (int, error) { + t0 := time.Now() + n, err := m.inner.Read(p) + m.nanos += int64(time.Since(t0)) + m.bytes += int64(n) + + return n, err } // captureReader tees every read byte into a buffer and hands the captured -// bytes to onClose on Close. Used by the cache writeback paths. -// -// drainOnClose=true reads inner to EOF on Close even if the caller above hasn't -// consumed everything. Needed when capturing under a decoder that stops short -// of EOF on its source (e.g. lz4.Reader skips the 4-byte EndMark). +// bytes to onClose on Close. Used by the compressed cache writeback path. type captureReader struct { - inner RangeReader - buf *bytes.Buffer - onClose func(ctx context.Context, captured []byte) - drainOnClose bool + inner RangeReader + buf *bytes.Buffer + onClose func(ctx context.Context, captured []byte) } -func newCaptureReader(inner RangeReader, capHint int, drainOnClose bool, onClose func(context.Context, []byte)) *captureReader { +func newCaptureReader(inner RangeReader, capHint int, onClose func(context.Context, []byte)) *captureReader { return &captureReader{ - inner: inner, - buf: bytes.NewBuffer(make([]byte, 0, capHint)), - onClose: onClose, - drainOnClose: drainOnClose, + inner: inner, + buf: bytes.NewBuffer(make([]byte, 0, capHint)), + onClose: onClose, } } @@ -98,34 +112,47 @@ func (r *captureReader) Read(p []byte) (int, error) { return n, err } -func (r *captureReader) Close(ctx context.Context) error { - if r.drainOnClose { - _, _ = io.Copy(io.Discard, r) - } - err := r.inner.Close(ctx) +func (r *captureReader) Close(ctx context.Context) (*ReadStats, error) { + stats, err := r.inner.Close(ctx) r.onClose(ctx, r.buf.Bytes()) - return err + return stats, err } -// observableReader layers OTEL observability (legacy per-backend timer + span) -// onto an inner RangeReader, all applied on Close. timer and span are optional; -// pass nil if unused. +// observableReader layers OTEL observability onto an inner RangeReader, all +// applied on Close. The with* builder methods are optional and chainable. type observableReader struct { inner RangeReader + timer *telemetry.Stopwatch span trace.Span bytes int64 readErr error + + read time.Duration } -func newObservableReader(inner RangeReader, timer *telemetry.Stopwatch, span trace.Span) *observableReader { - return &observableReader{inner: inner, timer: timer, span: span} +func newObservableReader(inner RangeReader) *observableReader { + return &observableReader{inner: inner} +} + +func (r *observableReader) withTimer(t *telemetry.Stopwatch) *observableReader { + r.timer = t + + return r +} + +func (r *observableReader) withSpan(s trace.Span) *observableReader { + r.span = s + + return r } func (r *observableReader) Read(p []byte) (int, error) { + t0 := time.Now() n, err := r.inner.Read(p) + r.read += time.Since(t0) r.bytes += int64(n) if err != nil && !errors.Is(err, io.EOF) { @@ -135,8 +162,16 @@ func (r *observableReader) Read(p []byte) (int, error) { return n, err } -func (r *observableReader) Close(ctx context.Context) error { - closeErr := r.inner.Close(ctx) +func (r *observableReader) Close(ctx context.Context) (*ReadStats, error) { + stats, closeErr := r.inner.Close(ctx) + + if stats == nil { + stats = &ReadStats{ + CompressedBytes: r.bytes, + UncompressedBytes: r.bytes, + Read: r.read, + } + } if r.timer != nil { if r.readErr != nil || closeErr != nil { @@ -156,5 +191,5 @@ func (r *observableReader) Close(ctx context.Context) error { r.span.End() } - return closeErr + return stats, closeErr } diff --git a/packages/shared/pkg/storage/mock_seekable.go b/packages/shared/pkg/storage/mock_seekable.go index e33c5d8141..3f766623f0 100644 --- a/packages/shared/pkg/storage/mock_seekable.go +++ b/packages/shared/pkg/storage/mock_seekable.go @@ -38,7 +38,7 @@ func (_m *MockSeekable) EXPECT() *MockSeekable_Expecter { } // OpenRangeReader provides a mock function for the type MockSeekable -func (_mock *MockSeekable) OpenRangeReader(ctx context.Context, offsetU int64, length int64, frameTable *FrameTable) (RangeReader, error) { +func (_mock *MockSeekable) OpenRangeReader(ctx context.Context, offsetU int64, length int64, frameTable *FrameTable) (RangeReader, Source, error) { ret := _mock.Called(ctx, offsetU, length, frameTable) if len(ret) == 0 { @@ -46,8 +46,9 @@ func (_mock *MockSeekable) OpenRangeReader(ctx context.Context, offsetU int64, l } var r0 RangeReader - var r1 error - if returnFunc, ok := ret.Get(0).(func(context.Context, int64, int64, *FrameTable) (RangeReader, error)); ok { + var r1 Source + var r2 error + if returnFunc, ok := ret.Get(0).(func(context.Context, int64, int64, *FrameTable) (RangeReader, Source, error)); ok { return returnFunc(ctx, offsetU, length, frameTable) } if returnFunc, ok := ret.Get(0).(func(context.Context, int64, int64, *FrameTable) RangeReader); ok { @@ -57,12 +58,17 @@ func (_mock *MockSeekable) OpenRangeReader(ctx context.Context, offsetU int64, l r0 = ret.Get(0).(RangeReader) } } - if returnFunc, ok := ret.Get(1).(func(context.Context, int64, int64, *FrameTable) error); ok { + if returnFunc, ok := ret.Get(1).(func(context.Context, int64, int64, *FrameTable) Source); ok { r1 = returnFunc(ctx, offsetU, length, frameTable) } else { - r1 = ret.Error(1) + r1 = ret.Get(1).(Source) } - return r0, r1 + if returnFunc, ok := ret.Get(2).(func(context.Context, int64, int64, *FrameTable) error); ok { + r2 = returnFunc(ctx, offsetU, length, frameTable) + } else { + r2 = ret.Error(2) + } + return r0, r1, r2 } // MockSeekable_OpenRangeReader_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'OpenRangeReader' @@ -107,12 +113,12 @@ func (_c *MockSeekable_OpenRangeReader_Call) Run(run func(ctx context.Context, o return _c } -func (_c *MockSeekable_OpenRangeReader_Call) Return(rangeReader RangeReader, err error) *MockSeekable_OpenRangeReader_Call { - _c.Call.Return(rangeReader, err) +func (_c *MockSeekable_OpenRangeReader_Call) Return(rangeReader RangeReader, s Source, err error) *MockSeekable_OpenRangeReader_Call { + _c.Call.Return(rangeReader, s, err) return _c } -func (_c *MockSeekable_OpenRangeReader_Call) RunAndReturn(run func(ctx context.Context, offsetU int64, length int64, frameTable *FrameTable) (RangeReader, error)) *MockSeekable_OpenRangeReader_Call { +func (_c *MockSeekable_OpenRangeReader_Call) RunAndReturn(run func(ctx context.Context, offsetU int64, length int64, frameTable *FrameTable) (RangeReader, Source, error)) *MockSeekable_OpenRangeReader_Call { _c.Call.Return(run) return _c } diff --git a/packages/shared/pkg/storage/read_attrs.go b/packages/shared/pkg/storage/read_attrs.go new file mode 100644 index 0000000000..2c1559c12d --- /dev/null +++ b/packages/shared/pkg/storage/read_attrs.go @@ -0,0 +1,26 @@ +package storage + +// Closed-enum attribute vocabulary for the orchestrator.read.* / +// orchestrator.chunk.* metric families. + +const ( + AttrSource = "source" + AttrCodec = "codec" + AttrOutcome = "outcome" + AttrEvent = "event" + AttrFileType = "file_type" +) + +const ( + OutcomeOK = "ok" + OutcomeErrCanceled = "err_canceled" + OutcomeErrIO = "err_io" + OutcomeErrTimeout = "err_timeout" +) + +const ( + CacheEventHit = "hit" + CacheEventMiss = "miss" + CacheEventWritebackOK = "writeback_ok" + CacheEventWritebackErr = "writeback_err" +) diff --git a/packages/shared/pkg/storage/read_attrs_precomputed.go b/packages/shared/pkg/storage/read_attrs_precomputed.go new file mode 100644 index 0000000000..7ad7f64d3d --- /dev/null +++ b/packages/shared/pkg/storage/read_attrs_precomputed.go @@ -0,0 +1,121 @@ +package storage + +import ( + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +// Precomputed attribute sets for hot-path read.*/chunk.* emissions; cold error +// paths build attrs inline via ErrAttrs. + +// Source identifies the backend that served a read. The zero value +// (UnknownSource) is the default for pre-resolution failures and any state +// before the backend is known. +type Source int8 + +const ( + // Order is latency-ascending and load-bearing: a multi-chunk Slice records + // the slowest source it touched via max() over per-fetch sources. Unknown + // at 0 is lighter than every real source so it gets replaced on first hit. + UnknownSource Source = iota + SourceMmap + SourceFS + SourcePeer + SourceNFS + SourceGCS + SourceAWS + numSources +) + +func (s Source) String() string { return sourceStrings[s] } + +var sourceStrings = [numSources]string{ + UnknownSource: "unknown", + SourceMmap: "mmap", + SourceFS: "fs", + SourcePeer: "peer", + SourceNFS: "nfs", + SourceGCS: "gcs", + SourceAWS: "aws", +} + +const numCodecs = 3 // CompressionNone, Zstd, LZ4 + +var ( + tableOK [numSeekableObjectTypes][numSources][numCodecs]metric.MeasurementOption + + tableCacheHit [numSeekableObjectTypes][numSources][numCodecs]metric.MeasurementOption + tableCacheMiss [numSeekableObjectTypes][numSources][numCodecs]metric.MeasurementOption + tableCacheWritebackOK [numSeekableObjectTypes][numSources][numCodecs]metric.MeasurementOption + tableCacheWritebackErr [numSeekableObjectTypes][numSources][numCodecs]metric.MeasurementOption + + // keyed by file_type only: inflight is incremented before the source is + // known (the OpenRangeReader call itself dominates GCS latency). + tableInflightFetch [numSeekableObjectTypes]metric.MeasurementOption +) + +func init() { + set := func(kvs ...attribute.KeyValue) metric.MeasurementOption { + return metric.WithAttributeSet(attribute.NewSet(kvs...)) + } + + for ot := range numSeekableObjectTypes { + ftAttr := attribute.String(AttrFileType, ot.String()) + + tableInflightFetch[ot] = set(ftAttr) + + for s := range numSources { + srcAttr := attribute.String(AttrSource, sourceStrings[s]) + + for ct := range CompressionType(numCodecs) { + codecAttr := attribute.String(AttrCodec, ct.String()) + outcomeOK := attribute.String(AttrOutcome, OutcomeOK) + + tableOK[ot][s][ct] = set( + ftAttr, srcAttr, codecAttr, outcomeOK, + ) + + tableCacheHit[ot][s][ct] = set( + ftAttr, attribute.String(AttrEvent, CacheEventHit), + srcAttr, codecAttr, + ) + tableCacheMiss[ot][s][ct] = set( + ftAttr, attribute.String(AttrEvent, CacheEventMiss), + srcAttr, codecAttr, + ) + tableCacheWritebackOK[ot][s][ct] = set( + ftAttr, attribute.String(AttrEvent, CacheEventWritebackOK), + srcAttr, codecAttr, + ) + tableCacheWritebackErr[ot][s][ct] = set( + ftAttr, attribute.String(AttrEvent, CacheEventWritebackErr), + srcAttr, codecAttr, + ) + } + } + } +} + +func OKAttrs(o SeekableObjectType, s Source, c CompressionType) metric.MeasurementOption { + return tableOK[o][s][c] +} + +func CacheHitAttrs(o SeekableObjectType, s Source, c CompressionType) metric.MeasurementOption { + return tableCacheHit[o][s][c] +} + +func CacheMissAttrs(o SeekableObjectType, s Source, c CompressionType) metric.MeasurementOption { + return tableCacheMiss[o][s][c] +} + +func CacheWritebackOKAttrs(o SeekableObjectType, s Source, c CompressionType) metric.MeasurementOption { + return tableCacheWritebackOK[o][s][c] +} + +func CacheWritebackErrAttrs(o SeekableObjectType, s Source, c CompressionType) metric.MeasurementOption { + return tableCacheWritebackErr[o][s][c] +} + +func InflightFetchAttrs(o SeekableObjectType) metric.MeasurementOption { + return tableInflightFetch[o] +} diff --git a/packages/shared/pkg/storage/read_attrs_test.go b/packages/shared/pkg/storage/read_attrs_test.go new file mode 100644 index 0000000000..7d1b549f5f --- /dev/null +++ b/packages/shared/pkg/storage/read_attrs_test.go @@ -0,0 +1,57 @@ +package storage + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestSourceStringsPopulated(t *testing.T) { + t.Parallel() + + for s := range numSources { + require.NotEmptyf(t, s.String(), "source %d has empty string label", s) + } +} + +func TestSeekableObjectTypeStrings(t *testing.T) { + t.Parallel() + + require.Equal(t, "memfile", MemfileObjectType.String()) + require.Equal(t, "rootfs", RootFSObjectType.String()) + require.Equal(t, "unknown", UnknownSeekableObjectType.String()) + for o := range numSeekableObjectTypes { + require.NotEmptyf(t, o.String(), "object type %d has empty file_type label", o) + } +} + +func TestOutcomeMapping(t *testing.T) { + t.Parallel() + + require.Equal(t, OutcomeOK, Outcome(nil)) + require.Equal(t, OutcomeErrCanceled, Outcome(context.Canceled)) + require.Equal(t, OutcomeErrTimeout, Outcome(context.DeadlineExceeded)) + require.Equal(t, OutcomeErrIO, Outcome(errors.New("boom"))) +} + +// TestPrecomputedAttrsPopulated guards the invariant that every emission site +// finds a non-nil precomputed attribute set — no enum combination is missed by +// the init() loops. +func TestPrecomputedAttrsPopulated(t *testing.T) { + t.Parallel() + + for o := range numSeekableObjectTypes { + require.NotNil(t, InflightFetchAttrs(o)) + for s := range numSources { + for c := range CompressionType(numCodecs) { + require.NotNil(t, OKAttrs(o, s, c)) + require.NotNil(t, CacheHitAttrs(o, s, c)) + require.NotNil(t, CacheMissAttrs(o, s, c)) + require.NotNil(t, CacheWritebackOKAttrs(o, s, c)) + require.NotNil(t, CacheWritebackErrAttrs(o, s, c)) + } + } + } +} diff --git a/packages/shared/pkg/storage/read_metrics.go b/packages/shared/pkg/storage/read_metrics.go new file mode 100644 index 0000000000..ae88776442 --- /dev/null +++ b/packages/shared/pkg/storage/read_metrics.go @@ -0,0 +1,151 @@ +package storage + +import ( + "context" + "errors" + "time" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + + "github.com/e2b-dev/infra/packages/shared/pkg/telemetry" + "github.com/e2b-dev/infra/packages/shared/pkg/utils" +) + +// Instruments for the `orchestrator.read.*` family — one metric per stage +// (open/read/decompress/fetch/writeback) keyed by file_type/source/codec/outcome. + +func mustFloatHist(name, desc, unit string) metric.Float64Histogram { + return utils.Must(meter.Float64Histogram(name, + metric.WithDescription(desc), + metric.WithUnit(unit), + )) +} + +var ( + readOpen = utils.Must(telemetry.NewFloatTimerFactory(meter, + "orchestrator.read.open", + "OpenRangeReader (open / TTFB) wall", + "Bytes (always 0 — open transfers no payload)", + "Number of opens", + )) + readRead = utils.Must(telemetry.NewFloatTimerFactory(meter, + "orchestrator.read.read", + "Raw source-read wall (decompression excluded)", + "Compressed/stored bytes read from the source", + "Number of source reads", + )) + readDecompress = utils.Must(telemetry.NewFloatTimerFactory(meter, + "orchestrator.read.decompress", + "Decompression CPU wall (decoder read time minus source transfer)", + "Uncompressed bytes produced", + "Number of decompress records", + )) + readFetch = utils.Must(telemetry.NewFloatTimerFactory(meter, + "orchestrator.read.fetch", + "Total fetch wall — should ≈ open + read + decompress; any excess is overhead (see read.pipeline.efficiency)", + "Bytes delivered to the app", + "Number of fetches", + )) + readWriteback = utils.Must(telemetry.NewFloatTimerFactory(meter, + "orchestrator.read.writeback", + "Async NFS cache writeback wall", + "Bytes written to NFS", + "Number of writebacks", + )) + + // fetch / (open + read + decompress); 1.0 = fully explained, >1 = overhead. + readPipelineEfficiency = mustFloatHist( + "orchestrator.read.pipeline.efficiency", + "fetch / (open + read + decompress) — 1.0 = fetch wall fully explained by work, >1 = overhead", "1", + ) + + readCache = utils.Must(meter.Int64Counter( + "orchestrator.read.cache", + metric.WithDescription("NFS read-cache events (hit / miss / writeback). The mmap tier is orchestrator.chunk.cache."), + metric.WithUnit("1"), + )) + + readInflight = utils.Must(meter.Int64UpDownCounter( + "orchestrator.read.inflight", + metric.WithDescription("In-flight read-path fetches (cache miss → backend), by file_type"), + metric.WithUnit("1"), + )) +) + +func RecordReadOpen(ctx context.Context, dur time.Duration, bytes int64, attrs metric.MeasurementOption) { + readOpen.Record(ctx, dur, bytes, attrs) +} + +func RecordReadRead(ctx context.Context, dur time.Duration, bytes int64, attrs metric.MeasurementOption) { + readRead.Record(ctx, dur, bytes, attrs) +} + +func RecordReadFetch(ctx context.Context, dur time.Duration, bytes int64, attrs metric.MeasurementOption) { + readFetch.Record(ctx, dur, bytes, attrs) +} + +func RecordReadDecompress(ctx context.Context, dur time.Duration, bytes int64, attrs metric.MeasurementOption) { + readDecompress.Record(ctx, dur, bytes, attrs) +} + +func RecordPipelineEfficiency(ctx context.Context, ratio float64, attrs metric.MeasurementOption) { + readPipelineEfficiency.Record(ctx, ratio, attrs) +} + +// StartInflight increments the read.inflight gauge and returns a func that +// decrements it; defer the returned func so the +1/-1 can't drift apart. +func StartInflight(ctx context.Context, attrs metric.MeasurementOption) func() { + readInflight.Add(ctx, 1, attrs) + + return func() { readInflight.Add(ctx, -1, attrs) } +} + +// Outcome maps a read-path error to the closed read.* outcome enum. +func Outcome(err error) string { + switch { + case err == nil: + return OutcomeOK + case errors.Is(err, context.Canceled): + return OutcomeErrCanceled + case errors.Is(err, context.DeadlineExceeded): + return OutcomeErrTimeout + default: + return OutcomeErrIO + } +} + +// ErrAttrs builds the error-path attribute set for read.* records. Hot OK +// paths use the precomputed OKAttrs. +func ErrAttrs(o SeekableObjectType, s Source, c CompressionType, err error) metric.MeasurementOption { + return metric.WithAttributes( + attribute.String(AttrFileType, o.String()), + attribute.String(AttrSource, s.String()), + attribute.String(AttrCodec, c.String()), + attribute.String(AttrOutcome, Outcome(err)), + ) +} + +func recordDecompressStep(ctx context.Context, r *decompressReader, stats *ReadStats, readErr error) { + if readErr == nil { + readDecompress.Record(ctx, stats.Decompress, stats.UncompressedBytes, OKAttrs(r.objType, r.source, r.ct)) + + return + } + + readDecompress.Record(ctx, stats.Decompress, stats.UncompressedBytes, ErrAttrs(r.objType, r.source, r.ct, readErr)) +} + +// recordWriteback emits the read.writeback timer and its read.cache event. +// src is the originating fetch source (kept for cross-correlation); writebacks +// always target NFS. +func recordWriteback(ctx context.Context, dur time.Duration, bytes int64, ot SeekableObjectType, src Source, ct CompressionType, err error) { + if err == nil { + readWriteback.Record(ctx, dur, bytes, OKAttrs(ot, src, ct)) + readCache.Add(ctx, 1, CacheWritebackOKAttrs(ot, SourceNFS, ct)) + + return + } + readWriteback.Record(ctx, dur, bytes, ErrAttrs(ot, src, ct, err)) + readCache.Add(ctx, 1, CacheWritebackErrAttrs(ot, SourceNFS, ct)) +} diff --git a/packages/shared/pkg/storage/storage.go b/packages/shared/pkg/storage/storage.go index 2ec1607463..ace68d4891 100644 --- a/packages/shared/pkg/storage/storage.go +++ b/packages/shared/pkg/storage/storage.go @@ -68,8 +68,20 @@ const ( UnknownSeekableObjectType SeekableObjectType = iota MemfileObjectType RootFSObjectType + numSeekableObjectTypes ) +func (t SeekableObjectType) String() string { + switch t { + case MemfileObjectType: + return "memfile" + case RootFSObjectType: + return "rootfs" + default: + return "unknown" + } +} + type ObjectType int const ( @@ -143,14 +155,25 @@ type SeekableReader interface { Size(ctx context.Context) (int64, error) } +// ReadStats is what a RangeReader did over its lifetime; returned from Close. +type ReadStats struct { + CompressedBytes int64 + UncompressedBytes int64 + Read time.Duration // source I/O wall, excluding open and decompression + Decompress time.Duration +} + type RangeReader interface { io.Reader - Close(ctx context.Context) error + // Close returns stats from the reader's lifetime, or nil if the reader did + // not meter (e.g. a pure adapter). Callers should treat nil as "no stats". + Close(ctx context.Context) (*ReadStats, error) } // StreamingReader supports progressive reads via a streaming range reader. +// OpenRangeReader returns the Source that served the bytes. type StreamingReader interface { - OpenRangeReader(ctx context.Context, offsetU int64, length int64, frameTable *FrameTable) (RangeReader, error) + OpenRangeReader(ctx context.Context, offsetU int64, length int64, frameTable *FrameTable) (RangeReader, Source, error) } type SeekableWriter interface { diff --git a/packages/shared/pkg/storage/storage_aws.go b/packages/shared/pkg/storage/storage_aws.go index 8429d33e61..a18bc9e71d 100644 --- a/packages/shared/pkg/storage/storage_aws.go +++ b/packages/shared/pkg/storage/storage_aws.go @@ -233,9 +233,9 @@ func (o *awsObject) Put(ctx context.Context, data []byte, opts ...PutOption) err return nil } -func (o *awsObject) OpenRangeReader(ctx context.Context, off, length int64, frameTable *FrameTable) (RangeReader, error) { +func (o *awsObject) OpenRangeReader(ctx context.Context, off, length int64, frameTable *FrameTable) (RangeReader, Source, error) { if frameTable.IsCompressed() { - return nil, errors.New("compressed reads are not supported on AWS") + return nil, SourceAWS, errors.New("compressed reads are not supported on AWS") } readRange := aws.String(fmt.Sprintf("bytes=%d-%d", off, off+length-1)) @@ -247,13 +247,13 @@ func (o *awsObject) OpenRangeReader(ctx context.Context, off, length int64, fram if err != nil { var nsk *types.NoSuchKey if errors.As(err, &nsk) { - return nil, ErrObjectNotExist + return nil, SourceAWS, ErrObjectNotExist } - return nil, fmt.Errorf("failed to create S3 range reader for %q: %w", o.path, err) + return nil, SourceAWS, fmt.Errorf("failed to create S3 range reader for %q: %w", o.path, err) } - return NewRangeReader(resp.Body), nil + return NewRangeReader(resp.Body), SourceAWS, nil } func (o *awsObject) Size(ctx context.Context) (int64, error) { diff --git a/packages/shared/pkg/storage/storage_cache.go b/packages/shared/pkg/storage/storage_cache.go index 7d5838758c..e53a7110c7 100644 --- a/packages/shared/pkg/storage/storage_cache.go +++ b/packages/shared/pkg/storage/storage_cache.go @@ -122,6 +122,7 @@ func (c cache) OpenSeekable(ctx context.Context, path string, objectType Seekabl inner: innerObject, flags: c.flags, tracer: c.tracer, + objType: objectType, }, nil } diff --git a/packages/shared/pkg/storage/storage_cache_compressed_test.go b/packages/shared/pkg/storage/storage_cache_compressed_test.go index 5de0ebf55b..6683eb4d45 100644 --- a/packages/shared/pkg/storage/storage_cache_compressed_test.go +++ b/packages/shared/pkg/storage/storage_cache_compressed_test.go @@ -69,9 +69,9 @@ func TestDecompressingCacheReader(t *testing.T) { c := newTestCache(t) framePath := makeFrameFilename(c.path, Range{Offset: 0, Length: len(compressed)}) - capturing := newCaptureReader(bytesRangeReader(compressed), len(compressed), true, - c.compressedFrameWriteback(framePath, 0, len(compressed), trace.SpanContext{})) - rc, err := NewDecompressingReader(capturing, CompressionLZ4) + capturing := newCaptureReader(bytesRangeReader(compressed), len(compressed), + c.compressedFrameWriteback(framePath, 0, len(compressed), SourceFS, CompressionLZ4, trace.SpanContext{})) + rc, err := newDecompressReader(capturing, CompressionLZ4, SourceFS, c.objType) require.NoError(t, err) got, err := io.ReadAll(rc) @@ -102,9 +102,9 @@ func TestDecompressingCacheReader(t *testing.T) { compressedProd := lz4CompressProd(t, original) framePath := makeFrameFilename(c.path, Range{Offset: 0, Length: len(compressedProd)}) - capturing := newCaptureReader(bytesRangeReader(compressedProd), len(compressedProd), true, - c.compressedFrameWriteback(framePath, 0, len(compressedProd), trace.SpanContext{})) - rc, err := NewDecompressingReader(capturing, CompressionLZ4) + capturing := newCaptureReader(bytesRangeReader(compressedProd), len(compressedProd), + c.compressedFrameWriteback(framePath, 0, len(compressedProd), SourceFS, CompressionLZ4, trace.SpanContext{})) + rc, err := newDecompressReader(capturing, CompressionLZ4, SourceFS, c.objType) require.NoError(t, err) out := make([]byte, len(original)) @@ -113,7 +113,7 @@ func TestDecompressingCacheReader(t *testing.T) { require.Equal(t, len(original), n) require.Equal(t, original, out) - closeErr := rc.Close(t.Context()) + _, closeErr := rc.Close(t.Context()) require.NoError(t, closeErr, "writeback failure must not surface as a read error") c.wg.Wait() @@ -127,16 +127,16 @@ func TestDecompressingCacheReader(t *testing.T) { c := newTestCache(t) framePath := makeFrameFilename(c.path, Range{Offset: 0, Length: len(compressed)}) - capturing := newCaptureReader(bytesRangeReader(compressed), len(compressed)+100, true, - c.compressedFrameWriteback(framePath, 0, len(compressed)+100, trace.SpanContext{})) // wrong expected size - rc, err := NewDecompressingReader(capturing, CompressionLZ4) + capturing := newCaptureReader(bytesRangeReader(compressed), len(compressed)+100, + c.compressedFrameWriteback(framePath, 0, len(compressed)+100, SourceFS, CompressionLZ4, trace.SpanContext{})) // wrong expected size + rc, err := newDecompressReader(capturing, CompressionLZ4, SourceFS, c.objType) require.NoError(t, err) got, err := io.ReadAll(rc) require.NoError(t, err) require.Equal(t, original, got, "decompressed data should be correct regardless") - closeErr := rc.Close(t.Context()) + _, closeErr := rc.Close(t.Context()) require.NoError(t, closeErr, "writeback failure must not surface as a read error") c.wg.Wait() diff --git a/packages/shared/pkg/storage/storage_cache_seekable.go b/packages/shared/pkg/storage/storage_cache_seekable.go index 5b542120bd..baf153ed4a 100644 --- a/packages/shared/pkg/storage/storage_cache_seekable.go +++ b/packages/shared/pkg/storage/storage_cache_seekable.go @@ -9,6 +9,7 @@ import ( "path/filepath" "strconv" "sync" + "time" "github.com/google/uuid" "github.com/launchdarkly/go-sdk-common/v3/ldcontext" @@ -64,6 +65,7 @@ type cachedSeekable struct { inner Seekable flags featureFlagsClient tracer trace.Tracer + objType SeekableObjectType wg sync.WaitGroup } @@ -73,7 +75,7 @@ var ( _ StreamingReader = (*cachedSeekable)(nil) ) -func (c *cachedSeekable) OpenRangeReader(ctx context.Context, off int64, length int64, frameTable *FrameTable) (RangeReader, error) { +func (c *cachedSeekable) OpenRangeReader(ctx context.Context, off int64, length int64, frameTable *FrameTable) (RangeReader, Source, error) { compressed := frameTable.IsCompressed() ctx, span := c.tracer.Start(ctx, "read", trace.WithAttributes( @@ -83,24 +85,28 @@ func (c *cachedSeekable) OpenRangeReader(ctx context.Context, off int64, length )) var rc RangeReader + var source Source var err error - if compressed { - rc, err = c.openReaderCompressed(ctx, off, frameTable) - } else if err = c.validateReadParams(length, off); err == nil { - rc, err = c.openReaderUncompressed(ctx, off, length) + switch { + case compressed: + rc, source, err = c.openReaderCompressed(ctx, off, frameTable) + default: + if err = c.validateReadParams(length, off); err == nil { + rc, source, err = c.openReaderUncompressed(ctx, off, length) + } } if err != nil { recordError(span, err) span.End() - return nil, err + return nil, source, err } - return newObservableReader(rc, nil, span), nil + return newObservableReader(rc).withSpan(span), source, nil } -func (c *cachedSeekable) openReaderUncompressed(ctx context.Context, off, length int64) (RangeReader, error) { +func (c *cachedSeekable) openReaderUncompressed(ctx context.Context, off, length int64) (RangeReader, Source, error) { timer := cacheSlabReadTimerFactory.Begin( attribute.String(nfsCacheOperationAttr, nfsCacheOperationAttrReadAt), attribute.Bool("compressed", false), @@ -112,8 +118,9 @@ func (c *cachedSeekable) openReaderUncompressed(ctx context.Context, off, length if err == nil { recordCacheRead(ctx, true, length, cacheTypeSeekable, cacheOpOpenRangeReader) timer.Success(ctx, length) + readCache.Add(ctx, 1, CacheHitAttrs(c.objType, SourceNFS, CompressionNone)) - return newSectionReader(fp, 0, length), nil + return newSectionReader(fp, 0, length), SourceNFS, nil } if !os.IsNotExist(err) { @@ -121,27 +128,28 @@ func (c *cachedSeekable) openReaderUncompressed(ctx context.Context, off, length } timer.Failure(ctx, 0) + readCache.Add(ctx, 1, CacheMissAttrs(c.objType, SourceNFS, CompressionNone)) - rc, err := c.inner.OpenRangeReader(ctx, off, length, nil) + rc, innerSource, err := c.inner.OpenRangeReader(ctx, off, length, nil) if err != nil { - return nil, fmt.Errorf("failed to open inner range reader: %w", err) + return nil, innerSource, fmt.Errorf("failed to open inner range reader: %w", err) } recordCacheRead(ctx, false, length, cacheTypeSeekable, cacheOpOpenRangeReader) if !skipCacheWriteback(ctx) { - rc = newCaptureReader(rc, int(length), false, - c.uncompressedChunkWriteback(chunkPath, off, length, trace.SpanContextFromContext(ctx))) + rc = newCaptureReader(rc, int(length), + c.uncompressedChunkWriteback(chunkPath, off, length, innerSource, trace.SpanContextFromContext(ctx))) } - return rc, nil + return rc, innerSource, nil } // uncompressedChunkWriteback returns a captureReader callback that persists // the captured chunk to the NFS cache in a detached goroutine. Best-effort: // a short capture (e.g. upstream truncation) is dropped silently — a streaming // reader always ends in EOF, so byte count is the only reliable signal. -func (c *cachedSeekable) uncompressedChunkWriteback(chunkPath string, off, expectedLen int64, fetchSpan trace.SpanContext) func(context.Context, []byte) { +func (c *cachedSeekable) uncompressedChunkWriteback(chunkPath string, off, expectedLen int64, src Source, fetchSpan trace.SpanContext) func(context.Context, []byte) { return func(ctx context.Context, captured []byte) { if !isCompleteRead(len(captured), int(expectedLen), nil) { return @@ -151,7 +159,10 @@ func (c *cachedSeekable) uncompressedChunkWriteback(chunkPath string, off, expec ctx, span := c.tracer.Start(ctx, "write range reader chunk back to cache", writebackSpanOpts(fetchSpan)...) defer span.End() + start := time.Now() err := c.writeToCache(ctx, off, chunkPath, captured) + recordWriteback(ctx, time.Since(start), int64(len(captured)), c.objType, src, CompressionNone, err) + if err != nil { recordError(span, err) recordCacheWriteError(ctx, cacheTypeSeekable, cacheOpOpenRangeReader, err) diff --git a/packages/shared/pkg/storage/storage_cache_seekable_compressed.go b/packages/shared/pkg/storage/storage_cache_seekable_compressed.go index 98a1fe4cf0..e86aead867 100644 --- a/packages/shared/pkg/storage/storage_cache_seekable_compressed.go +++ b/packages/shared/pkg/storage/storage_cache_seekable_compressed.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "os" + "time" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" @@ -18,10 +19,10 @@ var compressedCacheReadAttrs = []attribute.KeyValue{ // openReaderCompressed handles the compressed cache path for OpenRangeReader. // NFS stores compressed frames (.frm); on hit we decompress, on miss we fetch // raw compressed bytes and tee them to NFS on Close. -func (c *cachedSeekable) openReaderCompressed(ctx context.Context, offsetU int64, frameTable *FrameTable) (RangeReader, error) { +func (c *cachedSeekable) openReaderCompressed(ctx context.Context, offsetU int64, frameTable *FrameTable) (RangeReader, Source, error) { r, err := frameTable.LocateCompressed(offsetU) if err != nil { - return nil, fmt.Errorf("frame lookup for offset %d: %w", offsetU, err) + return nil, UnknownSource, fmt.Errorf("frame lookup for offset %d: %w", offsetU, err) } path := makeFrameFilename(c.path, r) @@ -37,14 +38,15 @@ func (c *cachedSeekable) openReaderCompressed(ctx context.Context, offsetU int64 recordCacheRead(ctx, true, int64(r.Length), cacheTypeSeekable, cacheOpOpenRangeReader) timer.Success(ctx, int64(r.Length)) - dec, err := NewDecompressingReader(NewRangeReader(f), ct) + dec, err := newDecompressReader(NewRangeReader(f), ct, SourceNFS, c.objType) if err != nil { f.Close() - return nil, fmt.Errorf("decompress cached frame: %w", err) + return nil, SourceNFS, fmt.Errorf("decompress cached frame: %w", err) } + readCache.Add(ctx, 1, CacheHitAttrs(c.objType, SourceNFS, ct)) - return dec, nil + return dec, SourceNFS, nil case statErr == nil: // Confirmed size mismatch: drop the file so the miss path rewrites it. f.Close() @@ -61,36 +63,37 @@ func (c *cachedSeekable) openReaderCompressed(ctx context.Context, offsetU int64 } timer.Failure(ctx, 0) + readCache.Add(ctx, 1, CacheMissAttrs(c.objType, SourceNFS, ct)) // Cache miss: fetch raw compressed bytes via OpenRangeReader(nil frameTable). - raw, err := c.inner.OpenRangeReader(ctx, r.Offset, int64(r.Length), nil) + raw, innerSource, err := c.inner.OpenRangeReader(ctx, r.Offset, int64(r.Length), nil) if err != nil { - return nil, fmt.Errorf("raw fetch at C=%d: %w", r.Offset, err) + return nil, innerSource, fmt.Errorf("raw fetch at C=%d: %w", r.Offset, err) } recordCacheRead(ctx, false, int64(r.Length), cacheTypeSeekable, cacheOpOpenRangeReader) - in := raw + src := raw if !skipCacheWriteback(ctx) { - in = newCaptureReader(raw, r.Length, true, - c.compressedFrameWriteback(path, offsetU, r.Length, trace.SpanContextFromContext(ctx))) + src = newCaptureReader(raw, r.Length, + c.compressedFrameWriteback(path, offsetU, r.Length, innerSource, ct, trace.SpanContextFromContext(ctx))) } - dec, err := NewDecompressingReader(in, ct) + dec, err := newDecompressReader(src, ct, innerSource, c.objType) if err != nil { - in.Close(ctx) + src.Close(ctx) - return nil, fmt.Errorf("create decompressor: %w", err) + return nil, innerSource, fmt.Errorf("create decompressor: %w", err) } - return dec, nil + return dec, innerSource, nil } // compressedFrameWriteback returns a captureReader callback that // persists the captured frame to the NFS cache in a detached goroutine. // Best-effort: a short capture is logged and skipped — the caller already // got valid decompressed bytes. -func (c *cachedSeekable) compressedFrameWriteback(framePath string, offset int64, expectedSize int, fetchSpan trace.SpanContext) func(context.Context, []byte) { +func (c *cachedSeekable) compressedFrameWriteback(framePath string, offset int64, expectedSize int, src Source, codec CompressionType, fetchSpan trace.SpanContext) func(context.Context, []byte) { return func(ctx context.Context, frame []byte) { if !isCompleteRead(len(frame), expectedSize, nil) { recordCacheWriteError(ctx, cacheTypeSeekable, cacheOpOpenRangeReader, @@ -103,7 +106,10 @@ func (c *cachedSeekable) compressedFrameWriteback(framePath string, offset int64 ctx, span := c.tracer.Start(ctx, "write compressed frame back to cache", writebackSpanOpts(fetchSpan)...) defer span.End() + start := time.Now() err := c.writeToCache(ctx, offset, framePath, frame) + recordWriteback(ctx, time.Since(start), int64(len(frame)), c.objType, src, codec, err) + if err != nil { recordError(span, err) recordCacheWriteError(ctx, cacheTypeSeekable, cacheOpOpenRangeReader, err) diff --git a/packages/shared/pkg/storage/storage_cache_seekable_test.go b/packages/shared/pkg/storage/storage_cache_seekable_test.go index 50c584bd88..783666ca62 100644 --- a/packages/shared/pkg/storage/storage_cache_seekable_test.go +++ b/packages/shared/pkg/storage/storage_cache_seekable_test.go @@ -15,28 +15,49 @@ import ( "go.opentelemetry.io/otel/trace" ) -// mustClose closes a RangeReader and asserts no error. -func mustClose(t *testing.T, rc RangeReader) { +// testCache returns a cachedSeekable with a fresh temp dir, the given chunk +// size, and a MockSeekable for inner. Cast c.inner to *MockSeekable to attach +// EXPECTs; for cache-only tests, leave it untouched (mockery only flags +// unexpected calls, not unused mocks). +func testCache(t *testing.T, chunkSize int64) cachedSeekable { t.Helper() - require.NoError(t, rc.Close(t.Context())) + + return cachedSeekable{ + path: t.TempDir(), + chunkSize: chunkSize, + inner: NewMockSeekable(t), + tracer: noopTracer, + } } -// bytesRangeReader wraps an in-memory byte slice as a RangeReader for tests. -func bytesRangeReader(b []byte) RangeReader { - return NewRangeReader(io.NopCloser(bytes.NewReader(b))) +// testCacheMock is testCache plus the inner mock already cast, for tests that +// attach EXPECTs. The returned cachedSeekable shares c.inner with the mock, so +// callers should not reassign c.inner before setting expectations. +func testCacheMock(t *testing.T, chunkSize int64) (c cachedSeekable, m *MockSeekable) { + t.Helper() + + m = NewMockSeekable(t) + c = cachedSeekable{ + path: t.TempDir(), + chunkSize: chunkSize, + inner: m, + tracer: noopTracer, + } + + return } // testReadAt emulates the removed cachedSeekable.ReadAt via OpenRangeReader. // This preserves the base test structure after ReadAt was removed from the Seekable interface. func testReadAt(ctx context.Context, c *cachedSeekable, buff []byte, off int64) (int, error) { - rc, err := c.OpenRangeReader(ctx, off, int64(len(buff)), nil) + rc, _, err := c.OpenRangeReader(ctx, off, int64(len(buff)), nil) if err != nil { return 0, err } n, err := io.ReadFull(rc, buff) - closeErr := rc.Close(ctx) + _, closeErr := rc.Close(ctx) if errors.Is(err, io.ErrUnexpectedEOF) { err = io.EOF } @@ -48,6 +69,18 @@ func testReadAt(ctx context.Context, c *cachedSeekable, buff []byte, off int64) return n, err } +// mustClose closes a RangeReader and asserts no error, discarding the stats. +func mustClose(t *testing.T, rc RangeReader) { + t.Helper() + _, err := rc.Close(t.Context()) + require.NoError(t, err) +} + +// bytesRangeReader wraps an in-memory byte slice as a RangeReader for tests. +func bytesRangeReader(b []byte) RangeReader { + return NewRangeReader(io.NopCloser(bytes.NewReader(b))) +} + func TestCachedFileObjectProvider_MakeChunkFilename(t *testing.T) { t.Parallel() @@ -64,10 +97,8 @@ func TestCachedFileObjectProvider_Size(t *testing.T) { const expectedSize int64 = 1024 - inner := NewMockSeekable(t) - inner.EXPECT().Size(mock.Anything).Return(expectedSize, nil) - - c := cachedSeekable{path: t.TempDir(), inner: inner, tracer: noopTracer} + c, inner := testCacheMock(t, 10) + inner.EXPECT().Size(mock.Anything).Return(expectedSize, nil).Once() // first call will write to cache size, err := c.Size(t.Context()) @@ -92,38 +123,33 @@ func TestCachedFileObjectProvider_WriteFromFileSystem(t *testing.T) { t.Run("can be cached successfully", func(t *testing.T) { t.Parallel() - tempDir := t.TempDir() - cacheDir := filepath.Join(tempDir, "cache") - tempFilename := filepath.Join(tempDir, "temp.bin") + tempFilename := filepath.Join(t.TempDir(), "temp.bin") data := []byte("hello world") - err := os.MkdirAll(cacheDir, os.ModePerm) - require.NoError(t, err) - - err = os.WriteFile(tempFilename, data, 0o644) - require.NoError(t, err) + require.NoError(t, os.WriteFile(tempFilename, data, 0o644)) - inner := NewMockSeekable(t) + c, inner := testCacheMock(t, 1024) inner.EXPECT(). StoreFile(mock.Anything, mock.Anything). - Return(nil, [32]byte{}, nil) + Return(nil, [32]byte{}, nil). + Once() featureFlags := NewMockFeatureFlagsClient(t) featureFlags.EXPECT().BoolFlag(mock.Anything, mock.Anything).Return(true) featureFlags.EXPECT().IntFlag(mock.Anything, mock.Anything).Return(10) - - c := cachedSeekable{path: cacheDir, inner: inner, chunkSize: 1024, flags: featureFlags, tracer: noopTracer} + c.flags = featureFlags // write temp file - _, _, err = c.StoreFile(t.Context(), tempFilename) + _, _, err := c.StoreFile(t.Context(), tempFilename) require.NoError(t, err) // file is written asynchronously, wait for it to finish c.wg.Wait() + // Remaining reads should come from cache only. c.inner = nil + c.flags = nil - // size should be cached size, err := c.Size(t.Context()) require.NoError(t, err) assert.Equal(t, int64(len(data)), size) @@ -143,18 +169,12 @@ func TestCachedFileObjectProvider_WriteTo(t *testing.T) { t.Run("read from cache when the file exists", func(t *testing.T) { t.Parallel() - tempDir := t.TempDir() - - tempPath := filepath.Join(tempDir, "a", "b", "c") - c := cachedSeekable{path: tempPath, chunkSize: 3, tracer: noopTracer} + c := testCache(t, 3) // create cache file cacheFilename := c.makeChunkFilename(0) - dirName := filepath.Dir(cacheFilename) - err := os.MkdirAll(dirName, 0o755) - require.NoError(t, err) - err = os.WriteFile(cacheFilename, []byte{1, 2, 3}, 0o600) - require.NoError(t, err) + require.NoError(t, os.MkdirAll(filepath.Dir(cacheFilename), 0o755)) + require.NoError(t, os.WriteFile(cacheFilename, []byte{1, 2, 3}, 0o600)) buffer := make([]byte, 3) read, err := testReadAt(t.Context(), &c, buffer, 0) @@ -166,9 +186,7 @@ func TestCachedFileObjectProvider_WriteTo(t *testing.T) { t.Run("short cache file returns EOF via ReadAt", func(t *testing.T) { t.Parallel() - tempDir := t.TempDir() - - c := cachedSeekable{path: tempDir, chunkSize: 10, tracer: noopTracer} + c := testCache(t, 10) // Plant a 3-byte cache file (valid last chunk). chunkPath := c.makeChunkFilename(0) @@ -189,23 +207,15 @@ func TestCachedFileObjectProvider_WriteTo(t *testing.T) { t.Parallel() fakeData := []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} - inner := NewMockSeekable(t) + c, inner := testCacheMock(t, 3) inner.EXPECT(). OpenRangeReader(mock.Anything, mock.Anything, mock.Anything, (*FrameTable)(nil)). - RunAndReturn(func(_ context.Context, off int64, length int64, _ *FrameTable) (RangeReader, error) { + RunAndReturn(func(_ context.Context, off int64, length int64, _ *FrameTable) (RangeReader, Source, error) { end := min(int(off)+int(length), len(fakeData)) - return NewRangeReader(io.NopCloser(bytes.NewReader(fakeData[off:end]))), nil - }) - - tempDir := t.TempDir() - c := cachedSeekable{ - path: tempDir, - chunkSize: 3, - inner: inner, - tracer: noopTracer, - } + return bytesRangeReader(fakeData[off:end]), SourceFS, nil + }).Once() // first read goes to source buffer := make([]byte, 3) @@ -219,6 +229,7 @@ func TestCachedFileObjectProvider_WriteTo(t *testing.T) { // second read pulls from cache c.inner = nil // prevent remote reads, force cache read + buffer = make([]byte, 3) read, err = testReadAt(t.Context(), &c, buffer, 3) require.NoError(t, err) @@ -324,18 +335,10 @@ func TestCachedSeekableObjectProvider_ReadAt(t *testing.T) { t.Run("zero byte read with EOF is not cached", func(t *testing.T) { t.Parallel() - tempDir := t.TempDir() - inner := NewMockSeekable(t) + c, inner := testCacheMock(t, 10) inner.EXPECT(). OpenRangeReader(mock.Anything, mock.Anything, mock.Anything, (*FrameTable)(nil)). - Return(NewRangeReader(io.NopCloser(bytes.NewReader(nil))), nil) - - c := cachedSeekable{ - path: tempDir, - chunkSize: 10, - inner: inner, - tracer: noopTracer, - } + Return(bytesRangeReader(nil), SourceFS, nil) buff := make([]byte, 10) count, err := testReadAt(t.Context(), &c, buff, 0) @@ -352,19 +355,12 @@ func TestCachedSeekableObjectProvider_ReadAt(t *testing.T) { t.Run("full read without EOF is cached", func(t *testing.T) { t.Parallel() - tempDir := t.TempDir() data := []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} - inner := NewMockSeekable(t) + + c, inner := testCacheMock(t, 10) inner.EXPECT(). OpenRangeReader(mock.Anything, mock.Anything, mock.Anything, (*FrameTable)(nil)). - Return(NewRangeReader(io.NopCloser(bytes.NewReader(data))), nil) - - c := cachedSeekable{ - path: tempDir, - chunkSize: 10, - inner: inner, - tracer: noopTracer, - } + Return(bytesRangeReader(data), SourceFS, nil) buff := make([]byte, 10) count, err := testReadAt(t.Context(), &c, buff, 0) @@ -415,18 +411,10 @@ func TestCachedSeekable_ReadAt_PreservesEOF(t *testing.T) { t.Run("EOF from inner is returned to caller unchanged", func(t *testing.T) { t.Parallel() - tempDir := t.TempDir() - inner := NewMockSeekable(t) + c, inner := testCacheMock(t, 10) inner.EXPECT(). OpenRangeReader(mock.Anything, mock.Anything, mock.Anything, (*FrameTable)(nil)). - Return(NewRangeReader(io.NopCloser(bytes.NewReader([]byte{1, 2, 3}))), nil) - - c := cachedSeekable{ - path: tempDir, - chunkSize: 10, - inner: inner, - tracer: noopTracer, - } + Return(bytesRangeReader([]byte{1, 2, 3}), SourceFS, nil) buff := make([]byte, 10) n, err := testReadAt(t.Context(), &c, buff, 0) @@ -439,18 +427,10 @@ func TestCachedSeekable_ReadAt_PreservesEOF(t *testing.T) { t.Run("nil error from inner is returned to caller unchanged", func(t *testing.T) { t.Parallel() - tempDir := t.TempDir() - inner := NewMockSeekable(t) + c, inner := testCacheMock(t, 10) inner.EXPECT(). OpenRangeReader(mock.Anything, mock.Anything, mock.Anything, (*FrameTable)(nil)). - Return(NewRangeReader(io.NopCloser(bytes.NewReader([]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}))), nil) - - c := cachedSeekable{ - path: tempDir, - chunkSize: 10, - inner: inner, - tracer: noopTracer, - } + Return(bytesRangeReader([]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}), SourceFS, nil) buff := make([]byte, 10) n, err := testReadAt(t.Context(), &c, buff, 0) @@ -464,22 +444,15 @@ func TestCachedSeekable_ReadAt_PreservesEOF(t *testing.T) { func TestCachedSeekable_ReadAt_SkipCacheWriteback(t *testing.T) { t.Parallel() - tempDir := t.TempDir() data := []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} - inner := NewMockSeekable(t) + + c, inner := testCacheMock(t, 10) inner.EXPECT(). OpenRangeReader(mock.Anything, mock.Anything, mock.Anything, (*FrameTable)(nil)). - RunAndReturn(func(_ context.Context, _ int64, _ int64, _ *FrameTable) (RangeReader, error) { - return NewRangeReader(io.NopCloser(bytes.NewReader(data))), nil + RunAndReturn(func(_ context.Context, _ int64, _ int64, _ *FrameTable) (RangeReader, Source, error) { + return bytesRangeReader(data), SourceFS, nil }) - c := cachedSeekable{ - path: tempDir, - chunkSize: 10, - inner: inner, - tracer: noopTracer, - } - ctx := WithSkipCacheWriteback(t.Context()) buff := make([]byte, 10) n, err := testReadAt(ctx, &c, buff, 0) @@ -499,74 +472,59 @@ func TestCachedSeekable_OpenRangeReader(t *testing.T) { t.Run("cache miss then full read populates cache for next call", func(t *testing.T) { t.Parallel() - tempDir := t.TempDir() data := []byte("hello") - inner := NewMockSeekable(t) + c, inner := testCacheMock(t, 10) inner.EXPECT(). OpenRangeReader(mock.Anything, int64(0), int64(len(data)), (*FrameTable)(nil)). - Return(NewRangeReader(io.NopCloser(bytes.NewReader(data))), nil). + Return(bytesRangeReader(data), SourceFS, nil). Once() - c := cachedSeekable{ - path: tempDir, - chunkSize: 10, - inner: inner, - tracer: noopTracer, - } - // First call: cache miss, reads from inner. - rc, err := c.OpenRangeReader(t.Context(), 0, int64(len(data)), nil) + rc, _, err := c.OpenRangeReader(t.Context(), 0, int64(len(data)), nil) require.NoError(t, err) got, err := io.ReadAll(rc) require.NoError(t, err) assert.Equal(t, data, got) - require.NoError(t, rc.Close(t.Context())) + mustClose(t, rc) c.wg.Wait() // Second call: should serve from NFS cache, inner not called again. c.inner = nil - rc2, err := c.OpenRangeReader(t.Context(), 0, int64(len(data)), nil) + + rc2, _, err := c.OpenRangeReader(t.Context(), 0, int64(len(data)), nil) require.NoError(t, err) got2, err := io.ReadAll(rc2) require.NoError(t, err) assert.Equal(t, data, got2) - require.NoError(t, rc2.Close(t.Context())) + mustClose(t, rc2) }) t.Run("skip cache writeback returns inner directly", func(t *testing.T) { t.Parallel() - tempDir := t.TempDir() data := []byte("hello") - inner := NewMockSeekable(t) + c, inner := testCacheMock(t, 10) inner.EXPECT(). OpenRangeReader(mock.Anything, int64(0), int64(len(data)), (*FrameTable)(nil)). - RunAndReturn(func(_ context.Context, _ int64, _ int64, _ *FrameTable) (RangeReader, error) { - return NewRangeReader(io.NopCloser(bytes.NewReader(data))), nil + RunAndReturn(func(_ context.Context, _ int64, _ int64, _ *FrameTable) (RangeReader, Source, error) { + return bytesRangeReader(data), SourceFS, nil }). Times(2) - c := cachedSeekable{ - path: tempDir, - chunkSize: 10, - inner: inner, - tracer: noopTracer, - } - ctx := WithSkipCacheWriteback(t.Context()) - rc, err := c.OpenRangeReader(ctx, 0, int64(len(data)), nil) + rc, _, err := c.OpenRangeReader(ctx, 0, int64(len(data)), nil) require.NoError(t, err) got, err := io.ReadAll(rc) require.NoError(t, err) assert.Equal(t, data, got) - require.NoError(t, rc.Close(ctx)) + mustClose(t, rc) c.wg.Wait() @@ -575,39 +533,30 @@ func TestCachedSeekable_OpenRangeReader(t *testing.T) { _, err = os.Stat(chunkPath) assert.True(t, os.IsNotExist(err), "skip writeback should not populate cache") - rc2, err := c.OpenRangeReader(ctx, 0, int64(len(data)), nil) + rc2, _, err := c.OpenRangeReader(ctx, 0, int64(len(data)), nil) require.NoError(t, err) got2, err := io.ReadAll(rc2) require.NoError(t, err) assert.Equal(t, data, got2) - require.NoError(t, rc2.Close(ctx)) + mustClose(t, rc2) }) t.Run("truncated inner read does not populate cache", func(t *testing.T) { t.Parallel() - tempDir := t.TempDir() - - inner := NewMockSeekable(t) + c, inner := testCacheMock(t, 10) inner.EXPECT(). OpenRangeReader(mock.Anything, int64(0), int64(5), (*FrameTable)(nil)). - Return(NewRangeReader(io.NopCloser(bytes.NewReader([]byte{0xAA, 0xBB}))), nil) - - c := cachedSeekable{ - path: tempDir, - chunkSize: 10, - inner: inner, - tracer: noopTracer, - } + Return(bytesRangeReader([]byte{0xAA, 0xBB}), SourceFS, nil) - rc, err := c.OpenRangeReader(t.Context(), 0, 5, nil) + rc, _, err := c.OpenRangeReader(t.Context(), 0, 5, nil) require.NoError(t, err) got, err := io.ReadAll(rc) require.NoError(t, err) assert.Equal(t, []byte{0xAA, 0xBB}, got) - require.NoError(t, rc.Close(t.Context())) + mustClose(t, rc) c.wg.Wait() @@ -620,31 +569,21 @@ func TestCachedSeekable_OpenRangeReader(t *testing.T) { func TestCacheWriteThroughReader(t *testing.T) { t.Parallel() - newTestCache := func(t *testing.T) cachedSeekable { - t.Helper() - - return cachedSeekable{ - path: t.TempDir(), - chunkSize: 10, - tracer: noopTracer, - } - } - t.Run("complete read is cached", func(t *testing.T) { t.Parallel() - c := newTestCache(t) + c := testCache(t, 10) data := []byte("hello") - inner := NewRangeReader(io.NopCloser(bytes.NewReader(data))) + inner := bytesRangeReader(data) - r := newCaptureReader(inner, len(data), false, - c.uncompressedChunkWriteback(c.makeChunkFilename(0), 0, int64(len(data)), trace.SpanContext{})) + r := newCaptureReader(inner, len(data), + c.uncompressedChunkWriteback(c.makeChunkFilename(0), 0, int64(len(data)), SourceFS, trace.SpanContext{})) got, err := io.ReadAll(r) require.NoError(t, err) assert.Equal(t, data, got) - require.NoError(t, r.Close(t.Context())) + mustClose(t, r) c.wg.Wait() cached, err := os.ReadFile(c.makeChunkFilename(0)) @@ -655,20 +594,20 @@ func TestCacheWriteThroughReader(t *testing.T) { t.Run("truncated upstream fully consumed is not cached", func(t *testing.T) { t.Parallel() - c := newTestCache(t) + c := testCache(t, 10) // Inner has only 2 bytes but expectedLen is 5. The reader is // fully consumed (EOF is reached), yet the total doesn't match // the expected length so it must not be cached. - inner := NewRangeReader(io.NopCloser(bytes.NewReader([]byte{0xAA, 0xBB}))) + inner := bytesRangeReader([]byte{0xAA, 0xBB}) - r := newCaptureReader(inner, 5, false, - c.uncompressedChunkWriteback(c.makeChunkFilename(0), 0, 5, trace.SpanContext{})) + r := newCaptureReader(inner, 5, + c.uncompressedChunkWriteback(c.makeChunkFilename(0), 0, 5, SourceFS, trace.SpanContext{})) got, err := io.ReadAll(r) require.NoError(t, err) assert.Equal(t, []byte{0xAA, 0xBB}, got) - require.NoError(t, r.Close(t.Context())) + mustClose(t, r) c.wg.Wait() _, err = os.Stat(c.makeChunkFilename(0)) @@ -678,12 +617,12 @@ func TestCacheWriteThroughReader(t *testing.T) { t.Run("partially consumed reader closed early is not cached", func(t *testing.T) { t.Parallel() - c := newTestCache(t) + c := testCache(t, 10) data := []byte("hello") - inner := NewRangeReader(io.NopCloser(bytes.NewReader(data))) + inner := bytesRangeReader(data) - r := newCaptureReader(inner, len(data), false, - c.uncompressedChunkWriteback(c.makeChunkFilename(0), 0, int64(len(data)), trace.SpanContext{})) + r := newCaptureReader(inner, len(data), + c.uncompressedChunkWriteback(c.makeChunkFilename(0), 0, int64(len(data)), SourceFS, trace.SpanContext{})) // Read only 2 of 5 bytes, then close without reaching EOF. buf := make([]byte, 2) @@ -691,7 +630,7 @@ func TestCacheWriteThroughReader(t *testing.T) { require.NoError(t, err) assert.Equal(t, 2, n) - require.NoError(t, r.Close(t.Context())) + mustClose(t, r) c.wg.Wait() _, err = os.Stat(c.makeChunkFilename(0)) diff --git a/packages/shared/pkg/storage/storage_fs.go b/packages/shared/pkg/storage/storage_fs.go index e76d8feb34..783b8dcf1b 100644 --- a/packages/shared/pkg/storage/storage_fs.go +++ b/packages/shared/pkg/storage/storage_fs.go @@ -30,7 +30,8 @@ type fsStorage struct { var _ StorageProvider = (*fsStorage)(nil) type fsObject struct { - path string + path string + objType SeekableObjectType } var ( @@ -71,14 +72,15 @@ func (s *fsStorage) UploadSignedURL(_ context.Context, path string, ttl time.Dur return u, nil } -func (s *fsStorage) OpenSeekable(_ context.Context, path string, _ SeekableObjectType) (Seekable, error) { +func (s *fsStorage) OpenSeekable(_ context.Context, path string, objectType SeekableObjectType) (Seekable, error) { dir := filepath.Dir(s.getPath(path)) if err := os.MkdirAll(dir, 0o755); err != nil { return nil, err } return &fsObject{ - path: s.getPath(path), + path: s.getPath(path), + objType: objectType, }, nil } @@ -301,27 +303,32 @@ func (u *fsPartUploader) Complete(_ context.Context) error { return os.WriteFile(u.fullPath, u.Assemble(), 0o644) } -func (o *fsObject) OpenRangeReader(ctx context.Context, offsetU int64, length int64, frameTable *FrameTable) (RangeReader, error) { +func (o *fsObject) OpenRangeReader(ctx context.Context, offsetU int64, length int64, frameTable *FrameTable) (RangeReader, Source, error) { if frameTable.IsCompressed() { r, err := frameTable.LocateCompressed(offsetU) if err != nil { - return nil, fmt.Errorf("get frame for offset %d, FS:%s: %w", offsetU, o.path, err) + return nil, SourceFS, fmt.Errorf("get frame for offset %d, FS:%s: %w", offsetU, o.path, err) } raw, err := o.openRangeReader(ctx, r.Offset, int64(r.Length)) if err != nil { - return nil, err + return nil, SourceFS, err } - dec, err := NewDecompressingReader(raw, frameTable.CompressionType()) + dec, err := newDecompressReader(raw, frameTable.CompressionType(), SourceFS, o.objType) if err != nil { raw.Close(ctx) - return nil, err + return nil, SourceFS, err } - return dec, nil + return dec, SourceFS, nil + } + + raw, err := o.openRangeReader(ctx, offsetU, length) + if err != nil { + return nil, SourceFS, err } - return o.openRangeReader(ctx, offsetU, length) + return newObservableReader(raw), SourceFS, nil } diff --git a/packages/shared/pkg/storage/storage_google.go b/packages/shared/pkg/storage/storage_google.go index 68fab203d8..051a6df8a0 100644 --- a/packages/shared/pkg/storage/storage_google.go +++ b/packages/shared/pkg/storage/storage_google.go @@ -85,6 +85,7 @@ type gcpObject struct { storage *gcpStorage path string handle *storage.ObjectHandle + objType SeekableObjectType limiter *limit.Limiter } @@ -167,7 +168,7 @@ func (s *gcpStorage) UploadSignedURL(_ context.Context, path string, ttl time.Du return url, nil } -func (s *gcpStorage) OpenSeekable(_ context.Context, path string, _ SeekableObjectType) (Seekable, error) { +func (s *gcpStorage) OpenSeekable(_ context.Context, path string, objectType SeekableObjectType) (Seekable, error) { handle := s.bucket.Object(path).Retryer( storage.WithMaxAttempts(googleMaxAttempts), storage.WithPolicy(storage.RetryAlways), @@ -184,6 +185,7 @@ func (s *gcpStorage) OpenSeekable(_ context.Context, path string, _ SeekableObje storage: s, path: path, handle: handle, + objType: objectType, limiter: s.limiter, }, nil @@ -592,7 +594,7 @@ func parseServiceAccountBase64(serviceAccount string) (*gcpServiceToken, error) return &sa, nil } -func (o *gcpObject) OpenRangeReader(ctx context.Context, offsetU int64, length int64, frameTable *FrameTable) (RangeReader, error) { +func (o *gcpObject) OpenRangeReader(ctx context.Context, offsetU int64, length int64, frameTable *FrameTable) (RangeReader, Source, error) { timer := googleReadTimerFactory.Begin(attribute.String(gcsOperationAttr, gcsOperationAttrReadAt)) if !frameTable.IsCompressed() { @@ -600,35 +602,37 @@ func (o *gcpObject) OpenRangeReader(ctx context.Context, offsetU int64, length i if err != nil { timer.Failure(ctx, 0) - return nil, err + return nil, SourceGCS, err } - return newObservableReader(NewRangeReader(rc), timer, nil), nil + return newObservableReader(NewRangeReader(rc)). + withTimer(timer), SourceGCS, nil } r, err := frameTable.LocateCompressed(offsetU) if err != nil { timer.Failure(ctx, 0) - return nil, fmt.Errorf("get frame for offset %d, GCS:%s: %w", offsetU, o.path, err) + return nil, SourceGCS, fmt.Errorf("get frame for offset %d, GCS:%s: %w", offsetU, o.path, err) } raw, err := o.openRangeReader(ctx, r.Offset, int64(r.Length)) if err != nil { timer.Failure(ctx, 0) - return nil, err + return nil, SourceGCS, err } - dec, err := NewDecompressingReader(NewRangeReader(raw), frameTable.CompressionType()) + dec, err := newDecompressReader(NewRangeReader(raw), frameTable.CompressionType(), SourceGCS, o.objType) if err != nil { raw.Close() timer.Failure(ctx, 0) - return nil, err + return nil, SourceGCS, err } - return newObservableReader(dec, timer, nil), nil + return newObservableReader(dec). + withTimer(timer), SourceGCS, nil } func isResourceExhausted(err error) bool { diff --git a/packages/shared/pkg/telemetry/meters.go b/packages/shared/pkg/telemetry/meters.go index 718e9feb7b..3e441bf5a2 100644 --- a/packages/shared/pkg/telemetry/meters.go +++ b/packages/shared/pkg/telemetry/meters.go @@ -488,6 +488,51 @@ func NewTimerFactory( return TimerFactory{duration, bytes, count}, nil } +// FloatTimerFactory records duration as fractional milliseconds so +// sub-millisecond operations aren't truncated to 0. Callers supply the +// duration via Record. +type FloatTimerFactory struct { + duration metric.Float64Histogram + bytes metric.Int64Counter + count metric.Int64Counter +} + +func NewFloatTimerFactory( + meter metric.Meter, + metricName, durationDescription, bytesDescription, counterDescription string, +) (FloatTimerFactory, error) { + duration, err := meter.Float64Histogram(metricName, + metric.WithDescription(durationDescription), + metric.WithUnit("ms"), + ) + if err != nil { + return FloatTimerFactory{}, fmt.Errorf("failed to create duration histogram: %w", err) + } + + bytes, err := meter.Int64Counter(metricName, + metric.WithDescription(bytesDescription), + metric.WithUnit("By"), + ) + if err != nil { + return FloatTimerFactory{}, fmt.Errorf("failed to create bytes counter: %w", err) + } + + count, err := meter.Int64Counter(metricName, + metric.WithDescription(counterDescription), + ) + if err != nil { + return FloatTimerFactory{}, fmt.Errorf("failed to create count counter: %w", err) + } + + return FloatTimerFactory{duration, bytes, count}, nil +} + +func (f *FloatTimerFactory) Record(ctx context.Context, dur time.Duration, total int64, attrs metric.MeasurementOption) { + f.duration.Record(ctx, float64(dur)/float64(time.Millisecond), attrs) + f.bytes.Add(ctx, total, attrs) + f.count.Add(ctx, 1, attrs) +} + func (f *TimerFactory) Begin(kv ...attribute.KeyValue) *Stopwatch { return &Stopwatch{ histogram: f.duration, @@ -542,9 +587,9 @@ func PrecomputeAttrs(kv ...attribute.KeyValue) metric.MeasurementOption { // RecordRaw records an operation using a precomputed attribute option, it does // not include any previous attributes passed at Begin(). Zero-allocation // alternative to Success/Failure for hot paths. -func (t Stopwatch) RecordRaw(ctx context.Context, total int64, precomputedAttrs metric.MeasurementOption) { +func (t Stopwatch) RecordRaw(ctx context.Context, total int64, allAttrs metric.MeasurementOption) { amount := time.Since(t.start).Milliseconds() - t.histogram.Record(ctx, amount, precomputedAttrs) - t.sum.Add(ctx, total, precomputedAttrs) - t.count.Add(ctx, 1, precomputedAttrs) + t.histogram.Record(ctx, amount, allAttrs) + t.sum.Add(ctx, total, allAttrs) + t.count.Add(ctx, 1, allAttrs) } diff --git a/tests/integration/internal/tests/api/sandboxes/sandbox_rapid_pause_resume_test.go b/tests/integration/internal/tests/api/sandboxes/sandbox_rapid_pause_resume_test.go index 6da9cb9b90..5ea7965310 100644 --- a/tests/integration/internal/tests/api/sandboxes/sandbox_rapid_pause_resume_test.go +++ b/tests/integration/internal/tests/api/sandboxes/sandbox_rapid_pause_resume_test.go @@ -147,7 +147,7 @@ func verifyChecksum(t *testing.T, ctx context.Context, persistence storage.Stora obj, err := persistence.OpenSeekable(ctx, dataPath, objType) require.NoErrorf(t, err, "%s/%s: open data file %s", node.name, fileName, dataPath) - rc, err := obj.OpenRangeReader(ctx, 0, bd.Size, bd.FrameData) + rc, _, err := obj.OpenRangeReader(ctx, 0, bd.Size, bd.FrameData) require.NoErrorf(t, err, "%s/%s: open range reader", node.name, fileName) defer rc.Close(ctx) From a8b2cff410e3483aaac19013093c4ec629a6ab17 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Thu, 28 May 2026 14:33:28 +0000 Subject: [PATCH 2/3] chore: auto-commit generated changes --- packages/shared/pkg/storage/mock_seekable.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/shared/pkg/storage/mock_seekable.go b/packages/shared/pkg/storage/mock_seekable.go index 3f766623f0..6f2bfe4e78 100644 --- a/packages/shared/pkg/storage/mock_seekable.go +++ b/packages/shared/pkg/storage/mock_seekable.go @@ -113,8 +113,8 @@ func (_c *MockSeekable_OpenRangeReader_Call) Run(run func(ctx context.Context, o return _c } -func (_c *MockSeekable_OpenRangeReader_Call) Return(rangeReader RangeReader, s Source, err error) *MockSeekable_OpenRangeReader_Call { - _c.Call.Return(rangeReader, s, err) +func (_c *MockSeekable_OpenRangeReader_Call) Return(rangeReader RangeReader, source Source, err error) *MockSeekable_OpenRangeReader_Call { + _c.Call.Return(rangeReader, source, err) return _c } From 8b1aab03a03cdb63b9fb4940ef98708dc6339bb5 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Thu, 28 May 2026 15:32:22 -0700 Subject: [PATCH 3/3] =?UTF-8?q?fix(storage):=20PR=20review=20=E2=80=94=20w?= =?UTF-8?q?rap=20S3/peer=20readers,=20guard=20codec=20lookups?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../sandbox/template/peerclient/storage.go | 21 +++++++++++++++- .../pkg/storage/read_attrs_precomputed.go | 24 +++++++++++++++++++ packages/shared/pkg/storage/storage_aws.go | 2 +- 3 files changed, 45 insertions(+), 2 deletions(-) diff --git a/packages/orchestrator/pkg/sandbox/template/peerclient/storage.go b/packages/orchestrator/pkg/sandbox/template/peerclient/storage.go index 7c299ed8f9..c1817ec1b4 100644 --- a/packages/orchestrator/pkg/sandbox/template/peerclient/storage.go +++ b/packages/orchestrator/pkg/sandbox/template/peerclient/storage.go @@ -264,11 +264,17 @@ var _ storage.RangeReader = (*peerStreamReader)(nil) // peerStreamReader wraps a gRPC streaming recv function as a storage.RangeReader. // cancel is called on Close to signal the server to terminate the stream. +// bytes/read accumulate the source-read wall and byte count so Close can +// return non-nil ReadStats — Chunker.runFetch otherwise records this fetch as +// 0 bytes and skips orchestrator.read.read for peer-served reads. type peerStreamReader struct { recv func() ([]byte, error) current *bytes.Reader done bool cancel context.CancelFunc + + bytes int64 + read time.Duration } func newPeerStreamReader(recv func() ([]byte, error), cancel context.CancelFunc) *peerStreamReader { @@ -279,6 +285,15 @@ func newPeerStreamReader(recv func() ([]byte, error), cancel context.CancelFunc) } func (r *peerStreamReader) Read(p []byte) (int, error) { + t0 := time.Now() + n, err := r.read1(p) + r.read += time.Since(t0) + r.bytes += int64(n) + + return n, err +} + +func (r *peerStreamReader) read1(p []byte) (int, error) { for { if r.current != nil && r.current.Len() > 0 { return r.current.Read(p) @@ -307,5 +322,9 @@ func (r *peerStreamReader) Read(p []byte) (int, error) { func (r *peerStreamReader) Close(context.Context) (*storage.ReadStats, error) { r.cancel() - return nil, nil + return &storage.ReadStats{ + CompressedBytes: r.bytes, + UncompressedBytes: r.bytes, + Read: r.read, + }, nil } diff --git a/packages/shared/pkg/storage/read_attrs_precomputed.go b/packages/shared/pkg/storage/read_attrs_precomputed.go index 7ad7f64d3d..08944bd0c8 100644 --- a/packages/shared/pkg/storage/read_attrs_precomputed.go +++ b/packages/shared/pkg/storage/read_attrs_precomputed.go @@ -96,23 +96,47 @@ func init() { } } +func safeAttrIdx(o SeekableObjectType, s Source, c CompressionType) (SeekableObjectType, Source, CompressionType) { + if uint(o) >= uint(numSeekableObjectTypes) { + o = UnknownSeekableObjectType + } + if uint(s) >= uint(numSources) { + s = UnknownSource + } + if uint(c) >= numCodecs { + c = CompressionNone + } + + return o, s, c +} + func OKAttrs(o SeekableObjectType, s Source, c CompressionType) metric.MeasurementOption { + o, s, c = safeAttrIdx(o, s, c) + return tableOK[o][s][c] } func CacheHitAttrs(o SeekableObjectType, s Source, c CompressionType) metric.MeasurementOption { + o, s, c = safeAttrIdx(o, s, c) + return tableCacheHit[o][s][c] } func CacheMissAttrs(o SeekableObjectType, s Source, c CompressionType) metric.MeasurementOption { + o, s, c = safeAttrIdx(o, s, c) + return tableCacheMiss[o][s][c] } func CacheWritebackOKAttrs(o SeekableObjectType, s Source, c CompressionType) metric.MeasurementOption { + o, s, c = safeAttrIdx(o, s, c) + return tableCacheWritebackOK[o][s][c] } func CacheWritebackErrAttrs(o SeekableObjectType, s Source, c CompressionType) metric.MeasurementOption { + o, s, c = safeAttrIdx(o, s, c) + return tableCacheWritebackErr[o][s][c] } diff --git a/packages/shared/pkg/storage/storage_aws.go b/packages/shared/pkg/storage/storage_aws.go index a18bc9e71d..739817fd41 100644 --- a/packages/shared/pkg/storage/storage_aws.go +++ b/packages/shared/pkg/storage/storage_aws.go @@ -253,7 +253,7 @@ func (o *awsObject) OpenRangeReader(ctx context.Context, off, length int64, fram return nil, SourceAWS, fmt.Errorf("failed to create S3 range reader for %q: %w", o.path, err) } - return NewRangeReader(resp.Body), SourceAWS, nil + return newObservableReader(NewRangeReader(resp.Body)), SourceAWS, nil } func (o *awsObject) Size(ctx context.Context) (int64, error) {