From 5b4ac537806885c03a591e69303bbcfa72ceced2 Mon Sep 17 00:00:00 2001 From: Tomas Valenta Date: Fri, 29 May 2026 14:27:07 -0700 Subject: [PATCH 1/7] perf(storage): reduce multipart upload copying --- .../shared/pkg/storage/compress_upload.go | 37 +++++++++++--- packages/shared/pkg/storage/gcp_multipart.go | 50 +++++++++++-------- .../shared/pkg/storage/gcp_multipart_test.go | 13 +---- 3 files changed, 61 insertions(+), 39 deletions(-) diff --git a/packages/shared/pkg/storage/compress_upload.go b/packages/shared/pkg/storage/compress_upload.go index c4022c8fcb..1186f7084a 100644 --- a/packages/shared/pkg/storage/compress_upload.go +++ b/packages/shared/pkg/storage/compress_upload.go @@ -65,6 +65,7 @@ func (m *memPartUploader) Assemble() []byte { type frame struct { uncompressedSize int compressed []byte + input []byte } type part struct { @@ -72,10 +73,11 @@ type part struct { frames []*frame compressedSize atomic.Int64 compress *errgroup.Group + inputPool *sync.Pool } -func newPart(index int, parentCtx context.Context, workers int) (*part, context.Context) { - p := &part{index: index} +func newPart(index int, parentCtx context.Context, workers int, inputPool *sync.Pool) (*part, context.Context) { + p := &part{index: index, inputPool: inputPool} var ctx context.Context p.compress, ctx = errgroup.WithContext(parentCtx) p.compress.SetLimit(workers) @@ -84,7 +86,7 @@ func newPart(index int, parentCtx context.Context, workers int) (*part, context. } func (p *part) addFrame(ctx context.Context, uncompressedData []byte, pool *sync.Pool) { - frameInPart := &frame{uncompressedSize: len(uncompressedData)} + frameInPart := &frame{uncompressedSize: len(uncompressedData), input: uncompressedData[:cap(uncompressedData)]} p.frames = append(p.frames, frameInPart) p.compress.Go(func() error { @@ -104,6 +106,15 @@ func (p *part) addFrame(ctx context.Context, uncompressedData []byte, pool *sync }) } +func (p *part) releaseInputBuffers() { + for _, f := range p.frames { + if f.input != nil { + p.inputPool.Put(f.input) + f.input = nil + } + } +} + func compressStream(ctx context.Context, in io.Reader, cfg CompressConfig, uploader partUploader, maxUploadConcurrency int, sink FrameSink) (*FrameTable, [32]byte, error) { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -135,7 +146,9 @@ func compressStream(ctx context.Context, in io.Reader, cfg CompressConfig, uploa var cOffset int64 var loopErr error for p := range q { - if err := p.compress.Wait(); err != nil { + err := p.compress.Wait() + p.releaseInputBuffers() + if err != nil { loopErr = fmt.Errorf("compress frames: %w", err) cancel() @@ -186,24 +199,32 @@ func readLoop(ctx context.Context, in io.Reader, cfg CompressConfig, hasher io.W frameSize := cfg.FrameSize() minPartSize := cfg.MinPartSize() workers := max(cfg.FrameEncodeWorkers, 1) - p, compressCtx := newPart(1, ctx, workers) + inputPool := &sync.Pool{ + New: func() any { + return make([]byte, frameSize) + }, + } + p, compressCtx := newPart(1, ctx, workers, inputPool) for { if err := ctx.Err(); err != nil { return err } - buf := make([]byte, frameSize) + buf := inputPool.Get().([]byte) n, err := io.ReadFull(in, buf) eof := errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) if err != nil && !eof { + inputPool.Put(buf) return fmt.Errorf("read frame: %w", err) } if n > 0 { hasher.Write(buf[:n]) p.addFrame(compressCtx, buf[:n], compressors) + } else { + inputPool.Put(buf) } if eof { @@ -211,6 +232,7 @@ func readLoop(ctx context.Context, in io.Reader, cfg CompressConfig, hasher io.W select { case q <- p: case <-ctx.Done(): + p.releaseInputBuffers() return ctx.Err() } } @@ -222,10 +244,11 @@ func readLoop(ctx context.Context, in io.Reader, cfg CompressConfig, hasher io.W select { case q <- p: case <-ctx.Done(): + p.releaseInputBuffers() return ctx.Err() } - p, compressCtx = newPart(p.index+1, ctx, workers) + p, compressCtx = newPart(p.index+1, ctx, workers, inputPool) } } } diff --git a/packages/shared/pkg/storage/gcp_multipart.go b/packages/shared/pkg/storage/gcp_multipart.go index 18d407e22d..0a22c2123b 100644 --- a/packages/shared/pkg/storage/gcp_multipart.go +++ b/packages/shared/pkg/storage/gcp_multipart.go @@ -4,8 +4,6 @@ import ( "bytes" "cmp" "context" - "crypto/md5" - "encoding/base64" "encoding/xml" "fmt" "hash" @@ -254,11 +252,6 @@ func (m *MultipartUploader) initiateUpload(ctx context.Context) (string, error) } func (m *MultipartUploader) uploadPart(ctx context.Context, uploadID string, partNumber int, data []byte) (string, error) { - // Calculate MD5 for data integrity - hasher := md5.New() - hasher.Write(data) - md5Sum := base64.StdEncoding.EncodeToString(hasher.Sum(nil)) - url := fmt.Sprintf("%s/%s?partNumber=%d&uploadId=%s", m.baseURL, m.objectName, partNumber, uploadID) @@ -269,7 +262,6 @@ func (m *MultipartUploader) uploadPart(ctx context.Context, uploadID string, par req.Header.Set("Authorization", "Bearer "+m.token) req.Header.Set("Content-Length", fmt.Sprintf("%d", len(data))) - req.Header.Set("Content-MD5", md5Sum) resp, err := m.client.Do(req) if err != nil { @@ -291,29 +283,48 @@ func (m *MultipartUploader) uploadPart(ctx context.Context, uploadID string, par return etag, nil } +type multiSliceReader struct { + slices [][]byte + idx int + off int +} + +func (r *multiSliceReader) Read(p []byte) (int, error) { + var n int + for len(p) > 0 && r.idx < len(r.slices) { + current := r.slices[r.idx] + if r.off >= len(current) { + r.idx++ + r.off = 0 + continue + } + + copied := copy(p, current[r.off:]) + n += copied + r.off += copied + p = p[copied:] + } + + if n > 0 { + return n, nil + } + + return 0, io.EOF +} + // uploadPartSlices uploads a part from multiple byte slices without concatenating them. -// It computes MD5 by hashing each slice and uses a ReaderFunc for retryable reads. func (m *MultipartUploader) uploadPartSlices(ctx context.Context, uploadID string, partNumber int, slices [][]byte) (string, error) { - // Compute MD5 and total length without copying - hasher := md5.New() totalLen := 0 for _, s := range slices { - hasher.Write(s) totalLen += len(s) } - md5Sum := base64.StdEncoding.EncodeToString(hasher.Sum(nil)) url := fmt.Sprintf("%s/%s?partNumber=%d&uploadId=%s", m.baseURL, m.objectName, partNumber, uploadID) // Use a ReaderFunc so the retryable client can replay the body on retries bodyFn := func() (io.Reader, error) { - readers := make([]io.Reader, len(slices)) - for i, s := range slices { - readers[i] = bytes.NewReader(s) - } - - return io.MultiReader(readers...), nil + return &multiSliceReader{slices: slices}, nil } req, err := retryablehttp.NewRequestWithContext(ctx, "PUT", url, retryablehttp.ReaderFunc(bodyFn)) @@ -323,7 +334,6 @@ func (m *MultipartUploader) uploadPartSlices(ctx context.Context, uploadID strin req.Header.Set("Authorization", "Bearer "+m.token) req.Header.Set("Content-Length", fmt.Sprintf("%d", totalLen)) - req.Header.Set("Content-MD5", md5Sum) resp, err := m.client.Do(req) if err != nil { diff --git a/packages/shared/pkg/storage/gcp_multipart_test.go b/packages/shared/pkg/storage/gcp_multipart_test.go index c30bb8c117..86dadee6ad 100644 --- a/packages/shared/pkg/storage/gcp_multipart_test.go +++ b/packages/shared/pkg/storage/gcp_multipart_test.go @@ -1,9 +1,7 @@ package storage import ( - "crypto/md5" "crypto/sha256" - "encoding/base64" "encoding/xml" "fmt" "io" @@ -121,20 +119,11 @@ func TestMultipartUploader_UploadPartSlices_Success(t *testing.T) { expectedETag := `"slice-etag"` slices := [][]byte{[]byte("hello "), []byte("world"), []byte("!")} - // Compute expected MD5 over all slices. - h := md5.New() - for _, s := range slices { - h.Write(s) - } - expectedMD5 := base64.StdEncoding.EncodeToString(h.Sum(nil)) - handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { assert.Equal(t, "PUT", r.Method) assert.Contains(t, r.URL.RawQuery, "partNumber=3") assert.Contains(t, r.URL.RawQuery, "uploadId=test-upload-id") - - // Verify MD5 matches the expected hash of all slices. - assert.Equal(t, expectedMD5, r.Header.Get("Content-MD5")) + assert.Empty(t, r.Header.Get("Content-MD5")) // Verify body is the concatenation of all slices. body, err := io.ReadAll(r.Body) From 0e50460c8876e1a0e365ed8d66f1a3b4fdfce39b Mon Sep 17 00:00:00 2001 From: Tomas Valenta Date: Fri, 29 May 2026 14:42:32 -0700 Subject: [PATCH 2/7] fix(storage): satisfy upload lint checks --- packages/shared/pkg/storage/compress_upload.go | 18 ++++++++++++------ packages/shared/pkg/storage/gcp_multipart.go | 1 + 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/packages/shared/pkg/storage/compress_upload.go b/packages/shared/pkg/storage/compress_upload.go index 1186f7084a..6b7ebbef26 100644 --- a/packages/shared/pkg/storage/compress_upload.go +++ b/packages/shared/pkg/storage/compress_upload.go @@ -65,7 +65,7 @@ func (m *memPartUploader) Assemble() []byte { type frame struct { uncompressedSize int compressed []byte - input []byte + input *[]byte } type part struct { @@ -86,7 +86,7 @@ func newPart(index int, parentCtx context.Context, workers int, inputPool *sync. } func (p *part) addFrame(ctx context.Context, uncompressedData []byte, pool *sync.Pool) { - frameInPart := &frame{uncompressedSize: len(uncompressedData), input: uncompressedData[:cap(uncompressedData)]} + frameInPart := &frame{uncompressedSize: len(uncompressedData)} p.frames = append(p.frames, frameInPart) p.compress.Go(func() error { @@ -201,7 +201,8 @@ func readLoop(ctx context.Context, in io.Reader, cfg CompressConfig, hasher io.W workers := max(cfg.FrameEncodeWorkers, 1) inputPool := &sync.Pool{ New: func() any { - return make([]byte, frameSize) + buf := make([]byte, frameSize) + return &buf }, } p, compressCtx := newPart(1, ctx, workers, inputPool) @@ -211,20 +212,23 @@ func readLoop(ctx context.Context, in io.Reader, cfg CompressConfig, hasher io.W return err } - buf := inputPool.Get().([]byte) + bufPtr := inputPool.Get().(*[]byte) + buf := (*bufPtr)[:frameSize] n, err := io.ReadFull(in, buf) eof := errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) if err != nil && !eof { - inputPool.Put(buf) + inputPool.Put(bufPtr) + return fmt.Errorf("read frame: %w", err) } if n > 0 { hasher.Write(buf[:n]) p.addFrame(compressCtx, buf[:n], compressors) + p.frames[len(p.frames)-1].input = bufPtr } else { - inputPool.Put(buf) + inputPool.Put(bufPtr) } if eof { @@ -233,6 +237,7 @@ func readLoop(ctx context.Context, in io.Reader, cfg CompressConfig, hasher io.W case q <- p: case <-ctx.Done(): p.releaseInputBuffers() + return ctx.Err() } } @@ -245,6 +250,7 @@ func readLoop(ctx context.Context, in io.Reader, cfg CompressConfig, hasher io.W case q <- p: case <-ctx.Done(): p.releaseInputBuffers() + return ctx.Err() } diff --git a/packages/shared/pkg/storage/gcp_multipart.go b/packages/shared/pkg/storage/gcp_multipart.go index 0a22c2123b..4ca46566c3 100644 --- a/packages/shared/pkg/storage/gcp_multipart.go +++ b/packages/shared/pkg/storage/gcp_multipart.go @@ -296,6 +296,7 @@ func (r *multiSliceReader) Read(p []byte) (int, error) { if r.off >= len(current) { r.idx++ r.off = 0 + continue } From ef2d92fa12ab5b731846faf28fcbf00cf4925493 Mon Sep 17 00:00:00 2001 From: ValentaTomas Date: Fri, 29 May 2026 14:48:55 -0700 Subject: [PATCH 3/7] fix(storage): satisfy multipart reader and lint contracts --- packages/shared/pkg/storage/compress_upload.go | 1 + packages/shared/pkg/storage/gcp_multipart.go | 8 ++++++++ 2 files changed, 9 insertions(+) diff --git a/packages/shared/pkg/storage/compress_upload.go b/packages/shared/pkg/storage/compress_upload.go index 6b7ebbef26..121f106a72 100644 --- a/packages/shared/pkg/storage/compress_upload.go +++ b/packages/shared/pkg/storage/compress_upload.go @@ -202,6 +202,7 @@ func readLoop(ctx context.Context, in io.Reader, cfg CompressConfig, hasher io.W inputPool := &sync.Pool{ New: func() any { buf := make([]byte, frameSize) + return &buf }, } diff --git a/packages/shared/pkg/storage/gcp_multipart.go b/packages/shared/pkg/storage/gcp_multipart.go index 4ca46566c3..c59fa8009a 100644 --- a/packages/shared/pkg/storage/gcp_multipart.go +++ b/packages/shared/pkg/storage/gcp_multipart.go @@ -290,6 +290,14 @@ type multiSliceReader struct { } func (r *multiSliceReader) Read(p []byte) (int, error) { + if len(p) == 0 { + if r.idx >= len(r.slices) { + return 0, io.EOF + } + + return 0, nil + } + var n int for len(p) > 0 && r.idx < len(r.slices) { current := r.slices[r.idx] From 616cf63082ed24cb97da8bd8db6b5d451893783e Mon Sep 17 00:00:00 2001 From: Lev <1187448+levb@users.noreply.github.com> Date: Sat, 30 May 2026 19:23:59 -0700 Subject: [PATCH 4/7] perf(storage): release frame input buffers per goroutine (#2869) Move buffer ownership into the compression goroutine: the goroutine that reads `bufPtr` also returns it to `inputPool` via defer when it exits. Drops `frame.input`, `part.releaseInputBuffers`, and the explicit release calls in the read/upload loops. The previous checkpoint design held every part's input buffers until p.compress.Wait() returned in the upload loop, so buffers stayed pinned across the GCS upload latency. The pool kept missing and calling New, leaving allocations on par with or worse than no pool at all. Now, per-goroutine release caps the pool's working set at workers+1 regardless of stream length, so allocations and peak heap actually shrink. Adds compress_upload_pool_demo_test.go (delete before merge) that mirrors the pre-PR-2863 and PR #2863 designs verbatim and runs them side-by-side with the real compressStream from this branch on 256 MiB and 1 GiB inputs with a simulated 500 ms per-part upload latency. ``` go test -run TestPoolLifecycleDemo -v -timeout=10m -count=1 ./packages/shared/pkg/storage/ 2>&1 === RUN TestPoolLifecycleDemo === RUN TestPoolLifecycleDemo/256MiB compress_upload_pool_demo_test.go:473: input=256 MiB, frame=2048 KiB, part=50 MiB, workers=4, upload_delay=500ms compress_upload_pool_demo_test.go:524: main (no pool) : total_alloc= 668.0 MiB mallocs= 2037 heap_inuse_after= 837.9 MiB compress_upload_pool_demo_test.go:524: tomas (PR #2863, checkpoint) : total_alloc= 689.5 MiB mallocs= 2281 heap_inuse_after= 953.7 MiB compress_upload_pool_demo_test.go:524: this branch (per-goroutine) : total_alloc= 370.2 MiB mallocs= 1430 heap_inuse_after= 638.6 MiB compress_upload_pool_demo_test.go:527: --- compress_upload_pool_demo_test.go:528: tomas vs main: total_alloc +21.6 MiB mallocs +244 heap_inuse +115.9 MiB compress_upload_pool_demo_test.go:532: branch vs main: total_alloc -297.8 MiB mallocs -607 heap_inuse -199.2 MiB compress_upload_pool_demo_test.go:536: branch vs tomas:total_alloc -319.4 MiB mallocs -851 heap_inuse -315.1 MiB === RUN TestPoolLifecycleDemo/1024MiB compress_upload_pool_demo_test.go:473: input=1024 MiB, frame=2048 KiB, part=50 MiB, workers=4, upload_delay=500ms compress_upload_pool_demo_test.go:524: main (no pool) : total_alloc= 2341.9 MiB mallocs= 4794 heap_inuse_after= 2986.0 MiB compress_upload_pool_demo_test.go:524: tomas (PR #2863, checkpoint) : total_alloc= 2417.5 MiB mallocs= 5895 heap_inuse_after= 3462.5 MiB compress_upload_pool_demo_test.go:524: this branch (per-goroutine) : total_alloc= 1334.3 MiB mallocs= 4367 heap_inuse_after= 2396.1 MiB compress_upload_pool_demo_test.go:527: --- compress_upload_pool_demo_test.go:528: tomas vs main: total_alloc +75.6 MiB mallocs +1101 heap_inuse +476.5 MiB compress_upload_pool_demo_test.go:532: branch vs main: total_alloc -1007.7 MiB mallocs -427 heap_inuse -589.9 MiB compress_upload_pool_demo_test.go:536: branch vs tomas:total_alloc -1083.3 MiB mallocs -1528 heap_inuse -1066.4 MiB --- PASS: TestPoolLifecycleDemo (6.48s) --- PASS: TestPoolLifecycleDemo/256MiB (2.20s) --- PASS: TestPoolLifecycleDemo/1024MiB (4.29s) PASS ok github.com/e2b-dev/infra/packages/shared/pkg/storage 6.956s ``` --- .../shared/pkg/storage/compress_upload.go | 27 +- .../storage/compress_upload_pool_demo_test.go | 542 ++++++++++++++++++ 2 files changed, 548 insertions(+), 21 deletions(-) create mode 100644 packages/shared/pkg/storage/compress_upload_pool_demo_test.go diff --git a/packages/shared/pkg/storage/compress_upload.go b/packages/shared/pkg/storage/compress_upload.go index 121f106a72..5e96293dd7 100644 --- a/packages/shared/pkg/storage/compress_upload.go +++ b/packages/shared/pkg/storage/compress_upload.go @@ -65,7 +65,6 @@ func (m *memPartUploader) Assemble() []byte { type frame struct { uncompressedSize int compressed []byte - input *[]byte } type part struct { @@ -85,11 +84,13 @@ func newPart(index int, parentCtx context.Context, workers int, inputPool *sync. return p, ctx } -func (p *part) addFrame(ctx context.Context, uncompressedData []byte, pool *sync.Pool) { - frameInPart := &frame{uncompressedSize: len(uncompressedData)} +func (p *part) addFrame(ctx context.Context, bufPtr *[]byte, n int, pool *sync.Pool) { + frameInPart := &frame{uncompressedSize: n} p.frames = append(p.frames, frameInPart) + uncompressedData := (*bufPtr)[:n] p.compress.Go(func() error { + defer p.inputPool.Put(bufPtr) if err := ctx.Err(); err != nil { return err } @@ -106,15 +107,6 @@ func (p *part) addFrame(ctx context.Context, uncompressedData []byte, pool *sync }) } -func (p *part) releaseInputBuffers() { - for _, f := range p.frames { - if f.input != nil { - p.inputPool.Put(f.input) - f.input = nil - } - } -} - func compressStream(ctx context.Context, in io.Reader, cfg CompressConfig, uploader partUploader, maxUploadConcurrency int, sink FrameSink) (*FrameTable, [32]byte, error) { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -146,9 +138,7 @@ func compressStream(ctx context.Context, in io.Reader, cfg CompressConfig, uploa var cOffset int64 var loopErr error for p := range q { - err := p.compress.Wait() - p.releaseInputBuffers() - if err != nil { + if err := p.compress.Wait(); err != nil { loopErr = fmt.Errorf("compress frames: %w", err) cancel() @@ -226,8 +216,7 @@ func readLoop(ctx context.Context, in io.Reader, cfg CompressConfig, hasher io.W if n > 0 { hasher.Write(buf[:n]) - p.addFrame(compressCtx, buf[:n], compressors) - p.frames[len(p.frames)-1].input = bufPtr + p.addFrame(compressCtx, bufPtr, n, compressors) } else { inputPool.Put(bufPtr) } @@ -237,8 +226,6 @@ func readLoop(ctx context.Context, in io.Reader, cfg CompressConfig, hasher io.W select { case q <- p: case <-ctx.Done(): - p.releaseInputBuffers() - return ctx.Err() } } @@ -250,8 +237,6 @@ func readLoop(ctx context.Context, in io.Reader, cfg CompressConfig, hasher io.W select { case q <- p: case <-ctx.Done(): - p.releaseInputBuffers() - return ctx.Err() } diff --git a/packages/shared/pkg/storage/compress_upload_pool_demo_test.go b/packages/shared/pkg/storage/compress_upload_pool_demo_test.go new file mode 100644 index 0000000000..a0581cca8e --- /dev/null +++ b/packages/shared/pkg/storage/compress_upload_pool_demo_test.go @@ -0,0 +1,542 @@ +// Demo test: runs the production compressStream from this branch +// (per-goroutine buffer release) side-by-side with a verbatim mirror of +// the PR #2863 design (frame.input + releaseInputBuffers checkpoint). +// +// Both sides see the same input, same CompressConfig, same uploader, same +// real zstd compressor pool. We report runtime memstats deltas so the +// buffer-lifecycle difference shows up as a concrete byte count. +// +// Illustration file — delete before merging the PR. +package storage + +import ( + "bytes" + "context" + "crypto/sha256" + "errors" + "fmt" + "io" + "math/rand" + "runtime" + "sync" + "sync/atomic" + "testing" + "time" + + "golang.org/x/sync/errgroup" +) + +// ============================================================================ +// Verbatim mirror of compress_upload.go from PR #2863 (commit ef2d92fa1). +// Renamed with `old` prefix to coexist with the production code in this branch. +// Only behavioral changes: none. Only syntactic: identifier renames. +// ============================================================================ + +type oldFrame struct { + uncompressedSize int + compressed []byte + input *[]byte +} + +type oldPart struct { + index int + frames []*oldFrame + compressedSize atomic.Int64 + compress *errgroup.Group + inputPool *sync.Pool +} + +func oldNewPart(index int, parentCtx context.Context, workers int, inputPool *sync.Pool) (*oldPart, context.Context) { + p := &oldPart{index: index, inputPool: inputPool} + var ctx context.Context + p.compress, ctx = errgroup.WithContext(parentCtx) + p.compress.SetLimit(workers) + + return p, ctx +} + +func (p *oldPart) addFrame(ctx context.Context, uncompressedData []byte, pool *sync.Pool) { + frameInPart := &oldFrame{uncompressedSize: len(uncompressedData)} + p.frames = append(p.frames, frameInPart) + + p.compress.Go(func() error { + if err := ctx.Err(); err != nil { + return err + } + c := pool.Get().(compressor) + out, err := c.compress(uncompressedData) + pool.Put(c) + if err != nil { + return err + } + frameInPart.compressed = out + p.compressedSize.Add(int64(len(out))) + + return nil + }) +} + +func (p *oldPart) releaseInputBuffers() { + for _, f := range p.frames { + if f.input != nil { + p.inputPool.Put(f.input) + f.input = nil + } + } +} + +func oldCompressStream(ctx context.Context, in io.Reader, cfg CompressConfig, uploader partUploader, maxUploadConcurrency int, sink FrameSink) (*FrameTable, [32]byte, error) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + if err := uploader.Start(ctx); err != nil { + return nil, [32]byte{}, fmt.Errorf("start upload: %w", err) + } + defer uploader.Close() + + if maxUploadConcurrency < 1 { + maxUploadConcurrency = 1 + } + work, workCtx := errgroup.WithContext(ctx) + work.SetLimit(maxUploadConcurrency + 1) + + q := make(chan *oldPart, 2) + hasher := sha256.New() + work.Go(func() error { + defer close(q) + + return oldReadLoop(workCtx, in, cfg, hasher, q) + }) + + var frameSizes []FrameSize + var cOffset int64 + var loopErr error + for p := range q { + err := p.compress.Wait() + p.releaseInputBuffers() + if err != nil { + loopErr = fmt.Errorf("compress frames: %w", err) + cancel() + + break + } + + var compressed [][]byte + for _, f := range p.frames { + frameSizes = append(frameSizes, FrameSize{U: int32(f.uncompressedSize), C: int32(len(f.compressed))}) + compressed = append(compressed, f.compressed) + if sink != nil { + sink(ctx, cOffset, f.compressed) + } + cOffset += int64(len(f.compressed)) + } + + pi := p.index + work.Go(func() error { + return uploader.UploadPart(workCtx, pi, compressed...) + }) + } + + for range q { //nolint:revive // intentional drain + } + workErr := work.Wait() + + if err := errors.Join(loopErr, workErr); err != nil { + return nil, [32]byte{}, err + } + + if err := uploader.Complete(ctx); err != nil { + return nil, [32]byte{}, fmt.Errorf("complete upload: %w", err) + } + + ft := NewFrameTable(cfg.CompressionType(), frameSizes) + + return ft, sum256(hasher), nil +} + +func oldReadLoop(ctx context.Context, in io.Reader, cfg CompressConfig, hasher io.Writer, q chan<- *oldPart) error { + compressors, err := newCompressorPool(cfg) + if err != nil { + return err + } + + frameSize := cfg.FrameSize() + minPartSize := cfg.MinPartSize() + workers := max(cfg.FrameEncodeWorkers, 1) + inputPool := &sync.Pool{ + New: func() any { + buf := make([]byte, frameSize) + + return &buf + }, + } + p, compressCtx := oldNewPart(1, ctx, workers, inputPool) + + for { + if err := ctx.Err(); err != nil { + return err + } + + bufPtr := inputPool.Get().(*[]byte) + buf := (*bufPtr)[:frameSize] + n, err := io.ReadFull(in, buf) + + eof := errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) + if err != nil && !eof { + inputPool.Put(bufPtr) + + return fmt.Errorf("read frame: %w", err) + } + + if n > 0 { + hasher.Write(buf[:n]) + p.addFrame(compressCtx, buf[:n], compressors) + p.frames[len(p.frames)-1].input = bufPtr + } else { + inputPool.Put(bufPtr) + } + + if eof { + if len(p.frames) > 0 { + select { + case q <- p: + case <-ctx.Done(): + p.releaseInputBuffers() + + return ctx.Err() + } + } + + return nil + } + + if p.compressedSize.Load() >= minPartSize { + select { + case q <- p: + case <-ctx.Done(): + p.releaseInputBuffers() + + return ctx.Err() + } + p, compressCtx = oldNewPart(p.index+1, ctx, workers, inputPool) + } + } +} + +// ============================================================================ +// MAIN form — pre-PR #2863 design. No input pool at all: each frame allocates +// a fresh `make([]byte, frameSize)` per iteration and relies on GC. +// Mirrors compress_upload.go from immediately before commit 5b4ac5378. +// ============================================================================ + +type mainFrame struct { + uncompressedSize int + compressed []byte +} + +type mainPart struct { + index int + frames []*mainFrame + compressedSize atomic.Int64 + compress *errgroup.Group +} + +func mainNewPart(index int, parentCtx context.Context, workers int) (*mainPart, context.Context) { + p := &mainPart{index: index} + var ctx context.Context + p.compress, ctx = errgroup.WithContext(parentCtx) + p.compress.SetLimit(workers) + + return p, ctx +} + +func (p *mainPart) addFrame(ctx context.Context, uncompressedData []byte, pool *sync.Pool) { + frameInPart := &mainFrame{uncompressedSize: len(uncompressedData)} + p.frames = append(p.frames, frameInPart) + p.compress.Go(func() error { + if err := ctx.Err(); err != nil { + return err + } + c := pool.Get().(compressor) + out, err := c.compress(uncompressedData) + pool.Put(c) + if err != nil { + return err + } + frameInPart.compressed = out + p.compressedSize.Add(int64(len(out))) + + return nil + }) +} + +func mainCompressStream(ctx context.Context, in io.Reader, cfg CompressConfig, uploader partUploader, maxUploadConcurrency int, sink FrameSink) (*FrameTable, [32]byte, error) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + if err := uploader.Start(ctx); err != nil { + return nil, [32]byte{}, fmt.Errorf("start upload: %w", err) + } + defer uploader.Close() + if maxUploadConcurrency < 1 { + maxUploadConcurrency = 1 + } + work, workCtx := errgroup.WithContext(ctx) + work.SetLimit(maxUploadConcurrency + 1) + q := make(chan *mainPart, 2) + hasher := sha256.New() + work.Go(func() error { + defer close(q) + + return mainReadLoop(workCtx, in, cfg, hasher, q) + }) + var frameSizes []FrameSize + var cOffset int64 + var loopErr error + for p := range q { + if err := p.compress.Wait(); err != nil { + loopErr = fmt.Errorf("compress frames: %w", err) + cancel() + + break + } + var compressed [][]byte + for _, f := range p.frames { + frameSizes = append(frameSizes, FrameSize{U: int32(f.uncompressedSize), C: int32(len(f.compressed))}) + compressed = append(compressed, f.compressed) + if sink != nil { + sink(ctx, cOffset, f.compressed) + } + cOffset += int64(len(f.compressed)) + } + pi := p.index + work.Go(func() error { + return uploader.UploadPart(workCtx, pi, compressed...) + }) + } + for range q { //nolint:revive // intentional drain + } + workErr := work.Wait() + if err := errors.Join(loopErr, workErr); err != nil { + return nil, [32]byte{}, err + } + if err := uploader.Complete(ctx); err != nil { + return nil, [32]byte{}, fmt.Errorf("complete upload: %w", err) + } + ft := NewFrameTable(cfg.CompressionType(), frameSizes) + + return ft, sum256(hasher), nil +} + +func mainReadLoop(ctx context.Context, in io.Reader, cfg CompressConfig, hasher io.Writer, q chan<- *mainPart) error { + compressors, err := newCompressorPool(cfg) + if err != nil { + return err + } + frameSize := cfg.FrameSize() + minPartSize := cfg.MinPartSize() + workers := max(cfg.FrameEncodeWorkers, 1) + p, compressCtx := mainNewPart(1, ctx, workers) + for { + if err := ctx.Err(); err != nil { + return err + } + buf := make([]byte, frameSize) // fresh allocation every frame, no pool + n, err := io.ReadFull(in, buf) + eof := errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) + if err != nil && !eof { + return fmt.Errorf("read frame: %w", err) + } + if n > 0 { + hasher.Write(buf[:n]) + p.addFrame(compressCtx, buf[:n], compressors) + } + if eof { + if len(p.frames) > 0 { + select { + case q <- p: + case <-ctx.Done(): + return ctx.Err() + } + } + + return nil + } + if p.compressedSize.Load() >= minPartSize { + select { + case q <- p: + case <-ctx.Done(): + return ctx.Err() + } + p, compressCtx = mainNewPart(p.index+1, ctx, workers) + } + } +} + +// ============================================================================ +// Shared harness. +// ============================================================================ + +// demoBuildInput produces deterministic, mildly-compressible data so zstd +// runs with realistic timing instead of degenerate fast-path on zeros. +func demoBuildInput(bytesTotal int) []byte { + out := make([]byte, bytesTotal) + r := rand.New(rand.NewSource(0xCAFEBABE)) + const blockSz = 4096 + for i := 0; i < bytesTotal; i += blockSz { + end := min(i+blockSz, bytesTotal) + // repeat a small random block several times to give zstd something to find + seed := make([]byte, 64) + r.Read(seed) + for j := i; j < end; j++ { + out[j] = seed[(j-i)%len(seed)] + } + } + + return out +} + +// slowUploader wraps memPartUploader and adds a fixed per-part upload delay +// to simulate GCS multipart upload latency. A 50 MiB part to GCS typically +// takes 300-800 ms in-region; we use 500 ms as a representative figure. +type slowUploader struct { + inner memPartUploader + partDelay time.Duration +} + +func (s *slowUploader) Start(ctx context.Context) error { return s.inner.Start(ctx) } +func (s *slowUploader) UploadPart(ctx context.Context, partIndex int, data ...[]byte) error { + select { + case <-time.After(s.partDelay): + case <-ctx.Done(): + return ctx.Err() + } + + return s.inner.UploadPart(ctx, partIndex, data...) +} +func (s *slowUploader) Complete(ctx context.Context) error { return s.inner.Complete(ctx) } +func (s *slowUploader) Close() error { return s.inner.Close() } +func (s *slowUploader) Assemble() []byte { return s.inner.Assemble() } + +func demoCfg() CompressConfig { + return CompressConfig{ + Enabled: true, + Type: "zstd", + Level: 1, // fastest + FrameSizeKB: 2048, + MinPartSizeMB: 50, + FrameEncodeWorkers: 4, + EncoderConcurrency: 0, + } +} + +type demoStats struct { + totalAllocBytes uint64 + mallocs uint64 + heapInUseAfter uint64 +} + +func demoMeasure(b func()) demoStats { + // Explicit GCs isolate this variant's allocation count from previous + // variants' residue. Two consecutive GCs let the runtime clear any + // pending finalizer-held memory. + runtime.GC() //nolint:revive // intentional for measurement isolation + runtime.GC() //nolint:revive // intentional for measurement isolation + var before, after runtime.MemStats + runtime.ReadMemStats(&before) + b() + runtime.ReadMemStats(&after) + + return demoStats{ + totalAllocBytes: after.TotalAlloc - before.TotalAlloc, + mallocs: after.Mallocs - before.Mallocs, + heapInUseAfter: after.HeapInuse, + } +} + +// TestPoolLifecycleDemo runs three designs side-by-side and reports the memory +// cost. Single run, real zstd compression, simulated GCS upload latency. +// +// - main: pre-PR-2863, no pool (fresh `make([]byte, frameSize)` per frame). +// - tomas: PR #2863, pool + frame.input + releaseInputBuffers checkpoint. +// - this branch: pool + per-goroutine defer release. +// +// Cannot run in parallel: variants share process-wide runtime.MemStats. +// +//nolint:paralleltest // measurement requires serial execution +func TestPoolLifecycleDemo(t *testing.T) { + cfg := demoCfg() + const partUploadDelay = 500 * time.Millisecond // ~realistic GCS multipart part latency + + for _, sz := range []int{256 << 20, 1 << 30} { + //nolint:paralleltest // measurement requires serial execution + t.Run(fmt.Sprintf("%dMiB", sz>>20), func(t *testing.T) { + t.Logf("input=%d MiB, frame=%d KiB, part=%d MiB, workers=%d, upload_delay=%v", + sz>>20, cfg.FrameSizeKB, cfg.MinPartSizeMB, cfg.FrameEncodeWorkers, partUploadDelay) + + input := demoBuildInput(sz) + + type variantResult struct { + name string + stats demoStats + ft *FrameTable + hash [32]byte + dst []byte + } + + runVariant := func(name string, fn func(io.Reader, partUploader) (*FrameTable, [32]byte, error)) variantResult { + u := &slowUploader{partDelay: partUploadDelay} + var ft *FrameTable + var hash [32]byte + st := demoMeasure(func() { + var err error + ft, hash, err = fn(bytes.NewReader(input), u) + if err != nil { + t.Fatalf("%s: %v", name, err) + } + }) + + return variantResult{name: name, stats: st, ft: ft, hash: hash, dst: u.Assemble()} + } + + mainR := runVariant("main (no pool) ", func(r io.Reader, u partUploader) (*FrameTable, [32]byte, error) { + return mainCompressStream(t.Context(), r, cfg, u, 4, nil) + }) + tomasR := runVariant("tomas (PR #2863, checkpoint) ", func(r io.Reader, u partUploader) (*FrameTable, [32]byte, error) { + return oldCompressStream(t.Context(), r, cfg, u, 4, nil) + }) + branchR := runVariant("this branch (per-goroutine) ", func(r io.Reader, u partUploader) (*FrameTable, [32]byte, error) { + return compressStream(t.Context(), r, cfg, u, 4, nil) + }) + + if mainR.hash != tomasR.hash || tomasR.hash != branchR.hash { + t.Errorf("hash mismatch: main=%x tomas=%x branch=%x", mainR.hash, tomasR.hash, branchR.hash) + } + if mainR.ft.NumFrames() != tomasR.ft.NumFrames() || tomasR.ft.NumFrames() != branchR.ft.NumFrames() { + t.Errorf("frame count mismatch: main=%d tomas=%d branch=%d", + mainR.ft.NumFrames(), tomasR.ft.NumFrames(), branchR.ft.NumFrames()) + } + if !bytes.Equal(mainR.dst, tomasR.dst) || !bytes.Equal(tomasR.dst, branchR.dst) { + t.Errorf("uploaded payload mismatch across variants") + } + + mib := func(b uint64) float64 { return float64(b) / (1 << 20) } + for _, v := range []variantResult{mainR, tomasR, branchR} { + t.Logf("%s: total_alloc=%7.1f MiB mallocs=%6d heap_inuse_after=%7.1f MiB", + v.name, mib(v.stats.totalAllocBytes), v.stats.mallocs, mib(v.stats.heapInUseAfter)) + } + t.Logf("---") + t.Logf("tomas vs main: total_alloc %+7.1f MiB mallocs %+5d heap_inuse %+7.1f MiB", + mib(tomasR.stats.totalAllocBytes)-mib(mainR.stats.totalAllocBytes), + int64(tomasR.stats.mallocs)-int64(mainR.stats.mallocs), + mib(tomasR.stats.heapInUseAfter)-mib(mainR.stats.heapInUseAfter)) + t.Logf("branch vs main: total_alloc %+7.1f MiB mallocs %+5d heap_inuse %+7.1f MiB", + mib(branchR.stats.totalAllocBytes)-mib(mainR.stats.totalAllocBytes), + int64(branchR.stats.mallocs)-int64(mainR.stats.mallocs), + mib(branchR.stats.heapInUseAfter)-mib(mainR.stats.heapInUseAfter)) + t.Logf("branch vs tomas:total_alloc %+7.1f MiB mallocs %+5d heap_inuse %+7.1f MiB", + mib(branchR.stats.totalAllocBytes)-mib(tomasR.stats.totalAllocBytes), + int64(branchR.stats.mallocs)-int64(tomasR.stats.mallocs), + mib(branchR.stats.heapInUseAfter)-mib(tomasR.stats.heapInUseAfter)) + }) + } +} From d61ad6d3e8ead36c887686093c13f8e3e642c51c Mon Sep 17 00:00:00 2001 From: ValentaTomas Date: Sat, 30 May 2026 19:34:41 -0700 Subject: [PATCH 5/7] perf(storage): share frame input buffer pool across uploads --- .../shared/pkg/storage/compress_upload.go | 45 ++++++++++++------- 1 file changed, 28 insertions(+), 17 deletions(-) diff --git a/packages/shared/pkg/storage/compress_upload.go b/packages/shared/pkg/storage/compress_upload.go index 5e96293dd7..ab779df40e 100644 --- a/packages/shared/pkg/storage/compress_upload.go +++ b/packages/shared/pkg/storage/compress_upload.go @@ -62,6 +62,25 @@ func (m *memPartUploader) Assemble() []byte { return buf.Bytes() } +// inputBufPool is shared across all uploads so frame-sized buffers (almost +// always DefaultCompressFrameSize) are reused between streams instead of being +// reallocated per call. The size guard keeps it correct for any frame size. +var inputBufPool sync.Pool + +func getInputBuf(size int) *[]byte { + if v := inputBufPool.Get(); v != nil { + bufPtr := v.(*[]byte) + if cap(*bufPtr) >= size { + *bufPtr = (*bufPtr)[:size] + + return bufPtr + } + } + buf := make([]byte, size) + + return &buf +} + type frame struct { uncompressedSize int compressed []byte @@ -72,11 +91,10 @@ type part struct { frames []*frame compressedSize atomic.Int64 compress *errgroup.Group - inputPool *sync.Pool } -func newPart(index int, parentCtx context.Context, workers int, inputPool *sync.Pool) (*part, context.Context) { - p := &part{index: index, inputPool: inputPool} +func newPart(index int, parentCtx context.Context, workers int) (*part, context.Context) { + p := &part{index: index} var ctx context.Context p.compress, ctx = errgroup.WithContext(parentCtx) p.compress.SetLimit(workers) @@ -90,7 +108,7 @@ func (p *part) addFrame(ctx context.Context, bufPtr *[]byte, n int, pool *sync.P uncompressedData := (*bufPtr)[:n] p.compress.Go(func() error { - defer p.inputPool.Put(bufPtr) + defer inputBufPool.Put(bufPtr) if err := ctx.Err(); err != nil { return err } @@ -189,27 +207,20 @@ func readLoop(ctx context.Context, in io.Reader, cfg CompressConfig, hasher io.W frameSize := cfg.FrameSize() minPartSize := cfg.MinPartSize() workers := max(cfg.FrameEncodeWorkers, 1) - inputPool := &sync.Pool{ - New: func() any { - buf := make([]byte, frameSize) - - return &buf - }, - } - p, compressCtx := newPart(1, ctx, workers, inputPool) + p, compressCtx := newPart(1, ctx, workers) for { if err := ctx.Err(); err != nil { return err } - bufPtr := inputPool.Get().(*[]byte) - buf := (*bufPtr)[:frameSize] + bufPtr := getInputBuf(frameSize) + buf := *bufPtr n, err := io.ReadFull(in, buf) eof := errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) if err != nil && !eof { - inputPool.Put(bufPtr) + inputBufPool.Put(bufPtr) return fmt.Errorf("read frame: %w", err) } @@ -218,7 +229,7 @@ func readLoop(ctx context.Context, in io.Reader, cfg CompressConfig, hasher io.W hasher.Write(buf[:n]) p.addFrame(compressCtx, bufPtr, n, compressors) } else { - inputPool.Put(bufPtr) + inputBufPool.Put(bufPtr) } if eof { @@ -240,7 +251,7 @@ func readLoop(ctx context.Context, in io.Reader, cfg CompressConfig, hasher io.W return ctx.Err() } - p, compressCtx = newPart(p.index+1, ctx, workers, inputPool) + p, compressCtx = newPart(p.index+1, ctx, workers) } } } From f2cf61349c9726819a5abbc9fc2df604a8c3bf71 Mon Sep 17 00:00:00 2001 From: ValentaTomas Date: Sat, 30 May 2026 19:35:29 -0700 Subject: [PATCH 6/7] test(storage): remove temporary pool lifecycle demo --- .../storage/compress_upload_pool_demo_test.go | 542 ------------------ 1 file changed, 542 deletions(-) delete mode 100644 packages/shared/pkg/storage/compress_upload_pool_demo_test.go diff --git a/packages/shared/pkg/storage/compress_upload_pool_demo_test.go b/packages/shared/pkg/storage/compress_upload_pool_demo_test.go deleted file mode 100644 index a0581cca8e..0000000000 --- a/packages/shared/pkg/storage/compress_upload_pool_demo_test.go +++ /dev/null @@ -1,542 +0,0 @@ -// Demo test: runs the production compressStream from this branch -// (per-goroutine buffer release) side-by-side with a verbatim mirror of -// the PR #2863 design (frame.input + releaseInputBuffers checkpoint). -// -// Both sides see the same input, same CompressConfig, same uploader, same -// real zstd compressor pool. We report runtime memstats deltas so the -// buffer-lifecycle difference shows up as a concrete byte count. -// -// Illustration file — delete before merging the PR. -package storage - -import ( - "bytes" - "context" - "crypto/sha256" - "errors" - "fmt" - "io" - "math/rand" - "runtime" - "sync" - "sync/atomic" - "testing" - "time" - - "golang.org/x/sync/errgroup" -) - -// ============================================================================ -// Verbatim mirror of compress_upload.go from PR #2863 (commit ef2d92fa1). -// Renamed with `old` prefix to coexist with the production code in this branch. -// Only behavioral changes: none. Only syntactic: identifier renames. -// ============================================================================ - -type oldFrame struct { - uncompressedSize int - compressed []byte - input *[]byte -} - -type oldPart struct { - index int - frames []*oldFrame - compressedSize atomic.Int64 - compress *errgroup.Group - inputPool *sync.Pool -} - -func oldNewPart(index int, parentCtx context.Context, workers int, inputPool *sync.Pool) (*oldPart, context.Context) { - p := &oldPart{index: index, inputPool: inputPool} - var ctx context.Context - p.compress, ctx = errgroup.WithContext(parentCtx) - p.compress.SetLimit(workers) - - return p, ctx -} - -func (p *oldPart) addFrame(ctx context.Context, uncompressedData []byte, pool *sync.Pool) { - frameInPart := &oldFrame{uncompressedSize: len(uncompressedData)} - p.frames = append(p.frames, frameInPart) - - p.compress.Go(func() error { - if err := ctx.Err(); err != nil { - return err - } - c := pool.Get().(compressor) - out, err := c.compress(uncompressedData) - pool.Put(c) - if err != nil { - return err - } - frameInPart.compressed = out - p.compressedSize.Add(int64(len(out))) - - return nil - }) -} - -func (p *oldPart) releaseInputBuffers() { - for _, f := range p.frames { - if f.input != nil { - p.inputPool.Put(f.input) - f.input = nil - } - } -} - -func oldCompressStream(ctx context.Context, in io.Reader, cfg CompressConfig, uploader partUploader, maxUploadConcurrency int, sink FrameSink) (*FrameTable, [32]byte, error) { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - if err := uploader.Start(ctx); err != nil { - return nil, [32]byte{}, fmt.Errorf("start upload: %w", err) - } - defer uploader.Close() - - if maxUploadConcurrency < 1 { - maxUploadConcurrency = 1 - } - work, workCtx := errgroup.WithContext(ctx) - work.SetLimit(maxUploadConcurrency + 1) - - q := make(chan *oldPart, 2) - hasher := sha256.New() - work.Go(func() error { - defer close(q) - - return oldReadLoop(workCtx, in, cfg, hasher, q) - }) - - var frameSizes []FrameSize - var cOffset int64 - var loopErr error - for p := range q { - err := p.compress.Wait() - p.releaseInputBuffers() - if err != nil { - loopErr = fmt.Errorf("compress frames: %w", err) - cancel() - - break - } - - var compressed [][]byte - for _, f := range p.frames { - frameSizes = append(frameSizes, FrameSize{U: int32(f.uncompressedSize), C: int32(len(f.compressed))}) - compressed = append(compressed, f.compressed) - if sink != nil { - sink(ctx, cOffset, f.compressed) - } - cOffset += int64(len(f.compressed)) - } - - pi := p.index - work.Go(func() error { - return uploader.UploadPart(workCtx, pi, compressed...) - }) - } - - for range q { //nolint:revive // intentional drain - } - workErr := work.Wait() - - if err := errors.Join(loopErr, workErr); err != nil { - return nil, [32]byte{}, err - } - - if err := uploader.Complete(ctx); err != nil { - return nil, [32]byte{}, fmt.Errorf("complete upload: %w", err) - } - - ft := NewFrameTable(cfg.CompressionType(), frameSizes) - - return ft, sum256(hasher), nil -} - -func oldReadLoop(ctx context.Context, in io.Reader, cfg CompressConfig, hasher io.Writer, q chan<- *oldPart) error { - compressors, err := newCompressorPool(cfg) - if err != nil { - return err - } - - frameSize := cfg.FrameSize() - minPartSize := cfg.MinPartSize() - workers := max(cfg.FrameEncodeWorkers, 1) - inputPool := &sync.Pool{ - New: func() any { - buf := make([]byte, frameSize) - - return &buf - }, - } - p, compressCtx := oldNewPart(1, ctx, workers, inputPool) - - for { - if err := ctx.Err(); err != nil { - return err - } - - bufPtr := inputPool.Get().(*[]byte) - buf := (*bufPtr)[:frameSize] - n, err := io.ReadFull(in, buf) - - eof := errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) - if err != nil && !eof { - inputPool.Put(bufPtr) - - return fmt.Errorf("read frame: %w", err) - } - - if n > 0 { - hasher.Write(buf[:n]) - p.addFrame(compressCtx, buf[:n], compressors) - p.frames[len(p.frames)-1].input = bufPtr - } else { - inputPool.Put(bufPtr) - } - - if eof { - if len(p.frames) > 0 { - select { - case q <- p: - case <-ctx.Done(): - p.releaseInputBuffers() - - return ctx.Err() - } - } - - return nil - } - - if p.compressedSize.Load() >= minPartSize { - select { - case q <- p: - case <-ctx.Done(): - p.releaseInputBuffers() - - return ctx.Err() - } - p, compressCtx = oldNewPart(p.index+1, ctx, workers, inputPool) - } - } -} - -// ============================================================================ -// MAIN form — pre-PR #2863 design. No input pool at all: each frame allocates -// a fresh `make([]byte, frameSize)` per iteration and relies on GC. -// Mirrors compress_upload.go from immediately before commit 5b4ac5378. -// ============================================================================ - -type mainFrame struct { - uncompressedSize int - compressed []byte -} - -type mainPart struct { - index int - frames []*mainFrame - compressedSize atomic.Int64 - compress *errgroup.Group -} - -func mainNewPart(index int, parentCtx context.Context, workers int) (*mainPart, context.Context) { - p := &mainPart{index: index} - var ctx context.Context - p.compress, ctx = errgroup.WithContext(parentCtx) - p.compress.SetLimit(workers) - - return p, ctx -} - -func (p *mainPart) addFrame(ctx context.Context, uncompressedData []byte, pool *sync.Pool) { - frameInPart := &mainFrame{uncompressedSize: len(uncompressedData)} - p.frames = append(p.frames, frameInPart) - p.compress.Go(func() error { - if err := ctx.Err(); err != nil { - return err - } - c := pool.Get().(compressor) - out, err := c.compress(uncompressedData) - pool.Put(c) - if err != nil { - return err - } - frameInPart.compressed = out - p.compressedSize.Add(int64(len(out))) - - return nil - }) -} - -func mainCompressStream(ctx context.Context, in io.Reader, cfg CompressConfig, uploader partUploader, maxUploadConcurrency int, sink FrameSink) (*FrameTable, [32]byte, error) { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - if err := uploader.Start(ctx); err != nil { - return nil, [32]byte{}, fmt.Errorf("start upload: %w", err) - } - defer uploader.Close() - if maxUploadConcurrency < 1 { - maxUploadConcurrency = 1 - } - work, workCtx := errgroup.WithContext(ctx) - work.SetLimit(maxUploadConcurrency + 1) - q := make(chan *mainPart, 2) - hasher := sha256.New() - work.Go(func() error { - defer close(q) - - return mainReadLoop(workCtx, in, cfg, hasher, q) - }) - var frameSizes []FrameSize - var cOffset int64 - var loopErr error - for p := range q { - if err := p.compress.Wait(); err != nil { - loopErr = fmt.Errorf("compress frames: %w", err) - cancel() - - break - } - var compressed [][]byte - for _, f := range p.frames { - frameSizes = append(frameSizes, FrameSize{U: int32(f.uncompressedSize), C: int32(len(f.compressed))}) - compressed = append(compressed, f.compressed) - if sink != nil { - sink(ctx, cOffset, f.compressed) - } - cOffset += int64(len(f.compressed)) - } - pi := p.index - work.Go(func() error { - return uploader.UploadPart(workCtx, pi, compressed...) - }) - } - for range q { //nolint:revive // intentional drain - } - workErr := work.Wait() - if err := errors.Join(loopErr, workErr); err != nil { - return nil, [32]byte{}, err - } - if err := uploader.Complete(ctx); err != nil { - return nil, [32]byte{}, fmt.Errorf("complete upload: %w", err) - } - ft := NewFrameTable(cfg.CompressionType(), frameSizes) - - return ft, sum256(hasher), nil -} - -func mainReadLoop(ctx context.Context, in io.Reader, cfg CompressConfig, hasher io.Writer, q chan<- *mainPart) error { - compressors, err := newCompressorPool(cfg) - if err != nil { - return err - } - frameSize := cfg.FrameSize() - minPartSize := cfg.MinPartSize() - workers := max(cfg.FrameEncodeWorkers, 1) - p, compressCtx := mainNewPart(1, ctx, workers) - for { - if err := ctx.Err(); err != nil { - return err - } - buf := make([]byte, frameSize) // fresh allocation every frame, no pool - n, err := io.ReadFull(in, buf) - eof := errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) - if err != nil && !eof { - return fmt.Errorf("read frame: %w", err) - } - if n > 0 { - hasher.Write(buf[:n]) - p.addFrame(compressCtx, buf[:n], compressors) - } - if eof { - if len(p.frames) > 0 { - select { - case q <- p: - case <-ctx.Done(): - return ctx.Err() - } - } - - return nil - } - if p.compressedSize.Load() >= minPartSize { - select { - case q <- p: - case <-ctx.Done(): - return ctx.Err() - } - p, compressCtx = mainNewPart(p.index+1, ctx, workers) - } - } -} - -// ============================================================================ -// Shared harness. -// ============================================================================ - -// demoBuildInput produces deterministic, mildly-compressible data so zstd -// runs with realistic timing instead of degenerate fast-path on zeros. -func demoBuildInput(bytesTotal int) []byte { - out := make([]byte, bytesTotal) - r := rand.New(rand.NewSource(0xCAFEBABE)) - const blockSz = 4096 - for i := 0; i < bytesTotal; i += blockSz { - end := min(i+blockSz, bytesTotal) - // repeat a small random block several times to give zstd something to find - seed := make([]byte, 64) - r.Read(seed) - for j := i; j < end; j++ { - out[j] = seed[(j-i)%len(seed)] - } - } - - return out -} - -// slowUploader wraps memPartUploader and adds a fixed per-part upload delay -// to simulate GCS multipart upload latency. A 50 MiB part to GCS typically -// takes 300-800 ms in-region; we use 500 ms as a representative figure. -type slowUploader struct { - inner memPartUploader - partDelay time.Duration -} - -func (s *slowUploader) Start(ctx context.Context) error { return s.inner.Start(ctx) } -func (s *slowUploader) UploadPart(ctx context.Context, partIndex int, data ...[]byte) error { - select { - case <-time.After(s.partDelay): - case <-ctx.Done(): - return ctx.Err() - } - - return s.inner.UploadPart(ctx, partIndex, data...) -} -func (s *slowUploader) Complete(ctx context.Context) error { return s.inner.Complete(ctx) } -func (s *slowUploader) Close() error { return s.inner.Close() } -func (s *slowUploader) Assemble() []byte { return s.inner.Assemble() } - -func demoCfg() CompressConfig { - return CompressConfig{ - Enabled: true, - Type: "zstd", - Level: 1, // fastest - FrameSizeKB: 2048, - MinPartSizeMB: 50, - FrameEncodeWorkers: 4, - EncoderConcurrency: 0, - } -} - -type demoStats struct { - totalAllocBytes uint64 - mallocs uint64 - heapInUseAfter uint64 -} - -func demoMeasure(b func()) demoStats { - // Explicit GCs isolate this variant's allocation count from previous - // variants' residue. Two consecutive GCs let the runtime clear any - // pending finalizer-held memory. - runtime.GC() //nolint:revive // intentional for measurement isolation - runtime.GC() //nolint:revive // intentional for measurement isolation - var before, after runtime.MemStats - runtime.ReadMemStats(&before) - b() - runtime.ReadMemStats(&after) - - return demoStats{ - totalAllocBytes: after.TotalAlloc - before.TotalAlloc, - mallocs: after.Mallocs - before.Mallocs, - heapInUseAfter: after.HeapInuse, - } -} - -// TestPoolLifecycleDemo runs three designs side-by-side and reports the memory -// cost. Single run, real zstd compression, simulated GCS upload latency. -// -// - main: pre-PR-2863, no pool (fresh `make([]byte, frameSize)` per frame). -// - tomas: PR #2863, pool + frame.input + releaseInputBuffers checkpoint. -// - this branch: pool + per-goroutine defer release. -// -// Cannot run in parallel: variants share process-wide runtime.MemStats. -// -//nolint:paralleltest // measurement requires serial execution -func TestPoolLifecycleDemo(t *testing.T) { - cfg := demoCfg() - const partUploadDelay = 500 * time.Millisecond // ~realistic GCS multipart part latency - - for _, sz := range []int{256 << 20, 1 << 30} { - //nolint:paralleltest // measurement requires serial execution - t.Run(fmt.Sprintf("%dMiB", sz>>20), func(t *testing.T) { - t.Logf("input=%d MiB, frame=%d KiB, part=%d MiB, workers=%d, upload_delay=%v", - sz>>20, cfg.FrameSizeKB, cfg.MinPartSizeMB, cfg.FrameEncodeWorkers, partUploadDelay) - - input := demoBuildInput(sz) - - type variantResult struct { - name string - stats demoStats - ft *FrameTable - hash [32]byte - dst []byte - } - - runVariant := func(name string, fn func(io.Reader, partUploader) (*FrameTable, [32]byte, error)) variantResult { - u := &slowUploader{partDelay: partUploadDelay} - var ft *FrameTable - var hash [32]byte - st := demoMeasure(func() { - var err error - ft, hash, err = fn(bytes.NewReader(input), u) - if err != nil { - t.Fatalf("%s: %v", name, err) - } - }) - - return variantResult{name: name, stats: st, ft: ft, hash: hash, dst: u.Assemble()} - } - - mainR := runVariant("main (no pool) ", func(r io.Reader, u partUploader) (*FrameTable, [32]byte, error) { - return mainCompressStream(t.Context(), r, cfg, u, 4, nil) - }) - tomasR := runVariant("tomas (PR #2863, checkpoint) ", func(r io.Reader, u partUploader) (*FrameTable, [32]byte, error) { - return oldCompressStream(t.Context(), r, cfg, u, 4, nil) - }) - branchR := runVariant("this branch (per-goroutine) ", func(r io.Reader, u partUploader) (*FrameTable, [32]byte, error) { - return compressStream(t.Context(), r, cfg, u, 4, nil) - }) - - if mainR.hash != tomasR.hash || tomasR.hash != branchR.hash { - t.Errorf("hash mismatch: main=%x tomas=%x branch=%x", mainR.hash, tomasR.hash, branchR.hash) - } - if mainR.ft.NumFrames() != tomasR.ft.NumFrames() || tomasR.ft.NumFrames() != branchR.ft.NumFrames() { - t.Errorf("frame count mismatch: main=%d tomas=%d branch=%d", - mainR.ft.NumFrames(), tomasR.ft.NumFrames(), branchR.ft.NumFrames()) - } - if !bytes.Equal(mainR.dst, tomasR.dst) || !bytes.Equal(tomasR.dst, branchR.dst) { - t.Errorf("uploaded payload mismatch across variants") - } - - mib := func(b uint64) float64 { return float64(b) / (1 << 20) } - for _, v := range []variantResult{mainR, tomasR, branchR} { - t.Logf("%s: total_alloc=%7.1f MiB mallocs=%6d heap_inuse_after=%7.1f MiB", - v.name, mib(v.stats.totalAllocBytes), v.stats.mallocs, mib(v.stats.heapInUseAfter)) - } - t.Logf("---") - t.Logf("tomas vs main: total_alloc %+7.1f MiB mallocs %+5d heap_inuse %+7.1f MiB", - mib(tomasR.stats.totalAllocBytes)-mib(mainR.stats.totalAllocBytes), - int64(tomasR.stats.mallocs)-int64(mainR.stats.mallocs), - mib(tomasR.stats.heapInUseAfter)-mib(mainR.stats.heapInUseAfter)) - t.Logf("branch vs main: total_alloc %+7.1f MiB mallocs %+5d heap_inuse %+7.1f MiB", - mib(branchR.stats.totalAllocBytes)-mib(mainR.stats.totalAllocBytes), - int64(branchR.stats.mallocs)-int64(mainR.stats.mallocs), - mib(branchR.stats.heapInUseAfter)-mib(mainR.stats.heapInUseAfter)) - t.Logf("branch vs tomas:total_alloc %+7.1f MiB mallocs %+5d heap_inuse %+7.1f MiB", - mib(branchR.stats.totalAllocBytes)-mib(tomasR.stats.totalAllocBytes), - int64(branchR.stats.mallocs)-int64(tomasR.stats.mallocs), - mib(branchR.stats.heapInUseAfter)-mib(tomasR.stats.heapInUseAfter)) - }) - } -} From 13a90438eb51204eb835fc35ce064d20b98ff4c0 Mon Sep 17 00:00:00 2001 From: Tomas Valenta Date: Sat, 30 May 2026 20:05:20 -0700 Subject: [PATCH 7/7] fix(storage): keep multipart part checksums Restore Content-MD5 on GCS multipart uploads while keeping the pooled multi-slice reader path. --- packages/shared/pkg/storage/gcp_multipart.go | 9 +++++++++ packages/shared/pkg/storage/gcp_multipart_test.go | 11 ++++++++--- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/packages/shared/pkg/storage/gcp_multipart.go b/packages/shared/pkg/storage/gcp_multipart.go index c59fa8009a..d9945be96a 100644 --- a/packages/shared/pkg/storage/gcp_multipart.go +++ b/packages/shared/pkg/storage/gcp_multipart.go @@ -4,6 +4,8 @@ import ( "bytes" "cmp" "context" + "crypto/md5" + "encoding/base64" "encoding/xml" "fmt" "hash" @@ -262,6 +264,8 @@ func (m *MultipartUploader) uploadPart(ctx context.Context, uploadID string, par req.Header.Set("Authorization", "Bearer "+m.token) req.Header.Set("Content-Length", fmt.Sprintf("%d", len(data))) + sum := md5.Sum(data) //nolint:gosec // GCS multipart uses Content-MD5 for transport integrity. + req.Header.Set("Content-MD5", base64.StdEncoding.EncodeToString(sum[:])) resp, err := m.client.Do(req) if err != nil { @@ -343,6 +347,11 @@ func (m *MultipartUploader) uploadPartSlices(ctx context.Context, uploadID strin req.Header.Set("Authorization", "Bearer "+m.token) req.Header.Set("Content-Length", fmt.Sprintf("%d", totalLen)) + h := md5.New() //nolint:gosec // GCS multipart uses Content-MD5 for transport integrity. + for _, s := range slices { + _, _ = h.Write(s) + } + req.Header.Set("Content-MD5", base64.StdEncoding.EncodeToString(h.Sum(nil))) resp, err := m.client.Do(req) if err != nil { diff --git a/packages/shared/pkg/storage/gcp_multipart_test.go b/packages/shared/pkg/storage/gcp_multipart_test.go index 86dadee6ad..6479b45a0b 100644 --- a/packages/shared/pkg/storage/gcp_multipart_test.go +++ b/packages/shared/pkg/storage/gcp_multipart_test.go @@ -1,7 +1,9 @@ package storage import ( + "crypto/md5" "crypto/sha256" + "encoding/base64" "encoding/xml" "fmt" "io" @@ -102,6 +104,8 @@ func TestMultipartUploader_UploadPart_Success(t *testing.T) { body, err := io.ReadAll(r.Body) assert.NoError(t, err) assert.Equal(t, testData, body) + sum := md5.Sum(testData) //nolint:gosec // verifying GCS Content-MD5 header. + assert.Equal(t, base64.StdEncoding.EncodeToString(sum[:]), r.Header.Get("Content-MD5")) w.Header().Set("ETag", expectedETag) w.WriteHeader(http.StatusOK) @@ -123,12 +127,13 @@ func TestMultipartUploader_UploadPartSlices_Success(t *testing.T) { assert.Equal(t, "PUT", r.Method) assert.Contains(t, r.URL.RawQuery, "partNumber=3") assert.Contains(t, r.URL.RawQuery, "uploadId=test-upload-id") - assert.Empty(t, r.Header.Get("Content-MD5")) - // Verify body is the concatenation of all slices. body, err := io.ReadAll(r.Body) assert.NoError(t, err) - assert.Equal(t, []byte("hello world!"), body) + expected := []byte("hello world!") + assert.Equal(t, expected, body) + sum := md5.Sum(expected) //nolint:gosec // verifying GCS Content-MD5 header. + assert.Equal(t, base64.StdEncoding.EncodeToString(sum[:]), r.Header.Get("Content-MD5")) w.Header().Set("ETag", expectedETag) w.WriteHeader(http.StatusOK)