Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 6 additions & 21 deletions packages/shared/pkg/storage/compress_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ func (m *memPartUploader) Assemble() []byte {
type frame struct {
uncompressedSize int
compressed []byte
input *[]byte
}

type part struct {
Expand All @@ -85,11 +84,13 @@ func newPart(index int, parentCtx context.Context, workers int, inputPool *sync.
return p, ctx
}

func (p *part) addFrame(ctx context.Context, uncompressedData []byte, pool *sync.Pool) {
frameInPart := &frame{uncompressedSize: len(uncompressedData)}
func (p *part) addFrame(ctx context.Context, bufPtr *[]byte, n int, pool *sync.Pool) {
frameInPart := &frame{uncompressedSize: n}
p.frames = append(p.frames, frameInPart)
uncompressedData := (*bufPtr)[:n]

p.compress.Go(func() error {
defer p.inputPool.Put(bufPtr)
if err := ctx.Err(); err != nil {
return err
}
Expand All @@ -106,15 +107,6 @@ func (p *part) addFrame(ctx context.Context, uncompressedData []byte, pool *sync
})
}

func (p *part) releaseInputBuffers() {
for _, f := range p.frames {
if f.input != nil {
p.inputPool.Put(f.input)
f.input = nil
}
}
}

func compressStream(ctx context.Context, in io.Reader, cfg CompressConfig, uploader partUploader, maxUploadConcurrency int, sink FrameSink) (*FrameTable, [32]byte, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand Down Expand Up @@ -146,9 +138,7 @@ func compressStream(ctx context.Context, in io.Reader, cfg CompressConfig, uploa
var cOffset int64
var loopErr error
for p := range q {
err := p.compress.Wait()
p.releaseInputBuffers()
if err != nil {
if err := p.compress.Wait(); err != nil {
loopErr = fmt.Errorf("compress frames: %w", err)
cancel()

Expand Down Expand Up @@ -226,8 +216,7 @@ func readLoop(ctx context.Context, in io.Reader, cfg CompressConfig, hasher io.W

if n > 0 {
hasher.Write(buf[:n])
p.addFrame(compressCtx, buf[:n], compressors)
p.frames[len(p.frames)-1].input = bufPtr
p.addFrame(compressCtx, bufPtr, n, compressors)
} else {
inputPool.Put(bufPtr)
}
Expand All @@ -237,8 +226,6 @@ func readLoop(ctx context.Context, in io.Reader, cfg CompressConfig, hasher io.W
select {
case q <- p:
case <-ctx.Done():
p.releaseInputBuffers()

return ctx.Err()
}
}
Expand All @@ -250,8 +237,6 @@ func readLoop(ctx context.Context, in io.Reader, cfg CompressConfig, hasher io.W
select {
case q <- p:
case <-ctx.Done():
p.releaseInputBuffers()

return ctx.Err()
}

Expand Down
Loading
Loading