Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/orchestrator/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ require (
github.com/dustin/go-humanize v1.0.1
github.com/e2b-dev/infra/packages/clickhouse v0.0.0
github.com/e2b-dev/infra/packages/shared v0.0.0
github.com/e2b-dev/ublk-go v0.1.3
github.com/edsrzf/mmap-go v1.2.1-0.20241212181136-fad1cd13edbd
github.com/firecracker-microvm/firecracker-go-sdk v1.0.0
github.com/getkin/kin-openapi v0.137.0
Expand Down
2 changes: 2 additions & 0 deletions packages/orchestrator/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 9 additions & 1 deletion packages/orchestrator/pkg/factories/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
blockmetrics "github.com/e2b-dev/infra/packages/orchestrator/pkg/sandbox/block/metrics"
"github.com/e2b-dev/infra/packages/orchestrator/pkg/sandbox/cgroup"
"github.com/e2b-dev/infra/packages/orchestrator/pkg/sandbox/nbd"
"github.com/e2b-dev/infra/packages/orchestrator/pkg/sandbox/ublk"
"github.com/e2b-dev/infra/packages/orchestrator/pkg/sandbox/network"
"github.com/e2b-dev/infra/packages/orchestrator/pkg/sandbox/template"
"github.com/e2b-dev/infra/packages/orchestrator/pkg/sandbox/template/peerclient"
Expand Down Expand Up @@ -530,6 +531,13 @@ func run(config cfg.Config, opts Options) (success bool) {
})
closers = append(closers, closer{"device pool", devicePool.Close})

// ublk device pool
ublkPool, err := ublk.NewDevicePool(0)
if err != nil {
logger.L().Fatal(ctx, "failed to create ublk pool", zap.Error(err))
}
closers = append(closers, closer{"ublk pool", ublkPool.Shutdown})

// network pool
slotStorage, err := newStorage(ctx, nodeID, config.NetworkConfig, egressSetup.Proxy)
if err != nil {
Expand All @@ -544,7 +552,7 @@ func run(config cfg.Config, opts Options) (success bool) {
closers = append(closers, closer{"network pool", networkPool.Close})

// sandbox factory
sandboxFactory := sandbox.NewFactory(config.BuilderConfig, networkPool, devicePool, featureFlags, hostStatsDelivery, cgroupManager, egressSetup.Proxy, sandboxes)
sandboxFactory := sandbox.NewFactory(config.BuilderConfig, networkPool, devicePool, ublkPool, featureFlags, hostStatsDelivery, cgroupManager, egressSetup.Proxy, sandboxes)

// isolated filesystems cache (for nfs proxy)
builder := chrooted.NewBuilder(config)
Expand Down
180 changes: 180 additions & 0 deletions packages/orchestrator/pkg/sandbox/rootfs/ublk.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
package rootfs

import (
"context"
"errors"
"fmt"
"go.uber.org/zap"
"os"

"github.com/e2b-dev/ublk-go/ublk"
"golang.org/x/sys/unix"

"github.com/e2b-dev/infra/packages/orchestrator/pkg/sandbox/block"
ublkpool "github.com/e2b-dev/infra/packages/orchestrator/pkg/sandbox/ublk"
"github.com/e2b-dev/infra/packages/shared/pkg/logger"
"github.com/e2b-dev/infra/packages/shared/pkg/storage/header"
"github.com/e2b-dev/infra/packages/shared/pkg/telemetry"
"github.com/e2b-dev/infra/packages/shared/pkg/utils"
)

// UblkProvider exposes a copy-on-write rootfs overlay through a ublk block
// device obtained from a shared device pool.
type UblkProvider struct {
	// ctx is the provider-lifetime context created in NewUblkProvider; it is
	// derived from context.Background() and cancelled only via cancel.
	ctx context.Context
	// cancel cancels ctx; Close calls it once the device has been handed back.
	cancel context.CancelFunc
	// overlay combines the read-only rootfs with the local write cache.
	overlay *block.Overlay
	// backend is the ublk backend built on top of overlay.
	backend *ublkBackend
	// dev is the ublk device created by Start; it stays nil until Start succeeds.
	dev *ublk.Device
	// pool creates ublk devices (Start) and closes them (Close).
	pool *ublkpool.DevicePool

	// ready is set by Start to either the device path or the startup error.
	ready *utils.SetOnce[string]
	// finishedOperations is signalled by Close and awaited by ExportDiff.
	finishedOperations chan struct{}
	// blockSize is the block size reported by the underlying rootfs device.
	blockSize int64
}

func NewUblkProvider(
ctx context.Context,
rootfs block.ReadonlyDevice,
cachePath string,
pool *ublkpool.DevicePool,
) (Provider, error) {
size, err := rootfs.Size(ctx)
if err != nil {
return nil, fmt.Errorf("error getting device size: %w", err)
}

blockSize := rootfs.BlockSize()

cache, err := block.NewCache(size, blockSize, cachePath, false)
if err != nil {
return nil, fmt.Errorf("error creating cache: %w", err)
}

overlay := block.NewOverlay(rootfs, cache)

// Use a background context so the ublk backend outlives the CreateSandbox
// request context. Only cancelled explicitly in Close().
runCtx, cancel := context.WithCancel(context.Background())
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Using context.Background() to create the background context discards all trace spans, logger metadata, and other context values from the request context. Utilizing context.WithoutCancel(ctx) instead preserves these valuable telemetry and metadata values for asynchronous block IO operations while still preventing premature cancellation.

Suggested change
runCtx, cancel := context.WithCancel(context.Background())
runCtx, cancel := context.WithCancel(context.WithoutCancel(ctx))

return &UblkProvider{
ctx: runCtx,
cancel: cancel,
overlay: overlay,
backend: newUblkBackend(runCtx, overlay),
pool: pool,
ready: utils.NewSetOnce[string](),
finishedOperations: make(chan struct{}, 1),
blockSize: blockSize,
}, nil
}

// Start asks the pool for a ublk device sized after the overlay and publishes
// the outcome through u.ready: the device path on success, the error otherwise.
//
// The returned value is whatever the ready.SetError / ready.SetValue call
// returns.
func (u *UblkProvider) Start(ctx context.Context) error {
	overlaySize, sizeErr := u.overlay.Size(ctx)
	if sizeErr != nil {
		return u.ready.SetError(sizeErr)
	}

	telemetry.ReportEvent(ctx, "creating ublk device")

	device, newErr := u.pool.New(ctx, u.backend, uint64(overlaySize))
	if newErr != nil {
		wrapped := fmt.Errorf("ublk.New: %w", newErr)

		return u.ready.SetError(wrapped)
	}

	// Remember the device so Close can hand it back to the pool.
	u.dev = device

	telemetry.ReportEvent(ctx, "ublk device created")

	devicePath := device.Path()

	return u.ready.SetValue(devicePath)
}

// Path waits on the ready signal and returns the result stored by Start:
// the ublk device path, or the error Start recorded.
func (u *UblkProvider) Path() (string, error) {
	return u.ready.Wait()
}

func (u *UblkProvider) Close(ctx context.Context) error {
ctx, span := tracer.Start(ctx, "ublk-close")
defer span.End()

var errs []error

err := u.sync(ctx)
if err != nil {
errs = append(errs, fmt.Errorf("ublk flush: %w", err))
}

if u.dev != nil {
err = u.pool.Close(ctx, u.dev)
if err != nil {
errs = append(errs, fmt.Errorf("ublk close: %w", err))
}
}
u.cancel()

u.finishedOperations <- struct{}{}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Sending to u.finishedOperations directly can cause a deadlock if Close is called more than once, as the channel has a buffer capacity of 1 and will block on subsequent sends. Using a non-blocking select statement ensures that the send operation never blocks.

	select {
	case u.finishedOperations <- struct{}{}:
	default:
	}


err = u.overlay.Close()
if err != nil {
errs = append(errs, fmt.Errorf("overlay close: %w", err))
}
logger.L().Info(ctx, "ublk overlay device released")
return errors.Join(errs...)
}

func (u *UblkProvider) ExportDiff(
ctx context.Context, out *os.File,
closeSandbox func(context.Context) error,
) (*header.DiffMetadata, error) {
ctx, span := tracer.Start(ctx, "ublk-export")
defer span.End()

cache, err := u.overlay.EjectCache()
if err != nil {
return nil, fmt.Errorf("eject cache: %w", err)
}

go func() {
err := closeSandbox(ctx)
if err != nil {
logger.L().Error(ctx, "stop sandbox on cow export", zap.Error(err))
}
}()

select {
case <-u.finishedOperations:
case <-ctx.Done():
return nil, fmt.Errorf("timeout waiting for ublk device released")
Comment on lines +138 to +139
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Close ejected cache before returning on export timeout

When the export context is canceled while waiting for the sandbox to stop, the cache has already been ejected from the overlay, so overlay.Close() will no longer close or unmap it; unlike the NBD export path, this early return leaks the mmap/cache file for every timed-out ublk diff export, so close cache before returning on this path.

Useful? React with 👍 / 👎.

}
telemetry.ReportEvent(ctx, "sandbox stopped")

m, err := cache.ExportToDiff(ctx, out)
if err != nil {
return nil, fmt.Errorf("error exporting cache: %w", err)
}
telemetry.ReportEvent(ctx, "cache exported")

err = cache.Close()
if err != nil {
return nil, fmt.Errorf("error closing cache: %w", err)
}
return m, nil
}

// sync opens the ublk device node, issues a BLKFLSBUF ioctl on it, and then
// delegates to flush for the same path.
//
// It waits for the device path via Path, so it returns an error if the
// provider failed to start.
func (u *UblkProvider) sync(ctx context.Context) error {
	ctx, span := tracer.Start(ctx, "ublk-sync")
	defer span.End()

	devPath, pathErr := u.Path()
	if pathErr != nil {
		return fmt.Errorf("failed to get cow path: %w", pathErr)
	}

	devFile, openErr := os.Open(devPath)
	if openErr != nil {
		return fmt.Errorf("open %s: %w", devPath, openErr)
	}
	defer func() {
		// A failed close is only logged; it does not change the result.
		if closeErr := devFile.Close(); closeErr != nil {
			logger.L().Error(ctx, "failed to close ublk file", zap.Error(closeErr))
		}
	}()

	ioctlErr := unix.IoctlSetInt(int(devFile.Fd()), unix.BLKFLSBUF, 0)
	if ioctlErr != nil {
		return fmt.Errorf("ioctl BLKFLSBUF: %w", ioctlErr)
	}

	return flush(ctx, devPath)
}
Loading