feat(orchestrator): add ublk block device provider as alternative to NBD #2834

Open
harmlessii wants to merge 1 commit into e2b-dev:main from harmlessii:feat/ublk

@harmlessii

Summary

Add a ublk-based block device provider as an alternative to NBD for the sandbox rootfs.

Motivation

  • On high-core-count machines (hundreds of cores), NBD device connection takes hundreds of milliseconds. In contrast, ublk device creation takes only ~5ms on the same machine.
  • ublk eliminates the user↔kernel context switches that NBD requires per IO — NBD routes every IO through a userspace daemon via sockets (2 context switches + 2 copies per IO), while ublk uses zero-copy io_uring queues.
  • No need to pre-allocate a static device pool (kernel assigns device IDs dynamically).

Changes

  • rootfs/ublk.go — UblkProvider implementing the Provider interface
  • rootfs/ublk_backend.go — Minimal adapter binding context to ublk-go callbacks
  • rootfs/ublk_test.go — Integration tests matching NBD's path_direct_test.go coverage
  • ublk/pool.go — Device pool with OTel metrics and structured logging

Prerequisites

  • Linux kernel 6.0+ with ublk_drv module loaded (modprobe ublk_drv)
  • Dependency: github.com/e2b-dev/ublk-go
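
A quick startup check for the kernel prerequisite could look like the sketch below. It assumes the module exposes its control device at /dev/ublk-control (the usual path for ublk_drv, but treat it as an assumption here), and checkUblkAvailable is a hypothetical helper, not part of this PR or of ublk-go.

```go
package main

import (
	"errors"
	"fmt"
	"io/fs"
	"os"
)

// ublkControlPath is assumed to be where ublk_drv exposes its control device.
const ublkControlPath = "/dev/ublk-control"

// checkUblkAvailable returns nil when path exists, and a hint about loading
// the module when it does not. Other stat errors are returned unchanged.
func checkUblkAvailable(path string) error {
	_, err := os.Stat(path)
	if errors.Is(err, fs.ErrNotExist) {
		return fmt.Errorf("%s not found: is ublk_drv loaded (modprobe ublk_drv)?", path)
	}
	return err
}

func main() {
	if err := checkUblkAvailable(ublkControlPath); err != nil {
		fmt.Println("ublk unavailable:", err)
		return
	}
	fmt.Println("ublk control device present")
}
```

Failing early with a hint is friendlier than letting the first device creation fail deep inside sandbox startup.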

Testing

All 6 integration tests pass (requires root + ublk_drv):

  • Write, WriteAtOffset, Direct4MB, Direct32MB, LargeWrite (1GB dd), LargeRead (1GB dd)

@cla-bot

cla-bot Bot commented May 28, 2026

Thank you for your pull request and welcome to our community. We could not parse the GitHub identity of the following contributors: zhaojianyu.harmless.
This is most likely caused by a git client misconfiguration; please make sure to:

  1. Check whether your git client is configured with an email to sign commits: git config --list | grep email
  2. If not, set it up using: git config --global user.email email@example.com
  3. Make sure that the git commit email is configured in your GitHub account settings; see https://github.com/settings/emails

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

In packages/orchestrator/pkg/sandbox/rootfs/ublk_backend.go, the ReadAt and WriteAt methods violate the io.ReaderAt and io.WriterAt contracts when an error occurs because they return the raw number of bytes read or written on the underlying device, which can exceed the slice length len(p). Additionally, the aligned read case unnecessarily allocates a temporary buffer and copies data instead of reading directly into p.

In packages/orchestrator/pkg/sandbox/rootfs/ublk.go, using context.Background() to create the background context discards all trace spans, logger metadata, and other context values from the request context. Utilizing context.WithoutCancel(ctx) instead preserves these valuable telemetry and metadata values for asynchronous block IO operations while still preventing premature cancellation.

In packages/orchestrator/pkg/sandbox/rootfs/ublk.go, sending to u.finishedOperations directly can cause a deadlock if Close is called more than once, as the channel has a buffer capacity of 1 and will block on subsequent sends. Using a non-blocking select statement ensures that the send operation never blocks.

In packages/orchestrator/pkg/sandbox/ublk/pool.go, calling d.Close() directly on the devices in Shutdown without removing them from p.devices introduces a race condition if another goroutine concurrently calls p.Close(ctx, dev). To ensure thread safety and consistent state, the shutdown loop should call p.Close(ctx, d) instead of closing the device directly.

Comment on lines +39 to +78
func (b *ublkBackend) ReadAt(p []byte, off int64) (int, error) {
	if len(p) == 0 {
		return 0, nil
	}

	if b.isAligned(off, len(p)) {
		tmp := make([]byte, len(p))
		n, err := b.dev.ReadAt(b.ctx, tmp, off)
		if n > 0 {
			copied := n
			if copied > len(p) {
				copied = len(p)
			}
			copy(p, tmp[:copied])
		}

		if err != nil {
			if n > len(p) {
				n = len(p)
			}

			return n, err
		}

		return len(p), nil
	}

	alignedOff, alignedLen := b.alignedRange(off, len(p))
	tmp := make([]byte, alignedLen)

	n, err := b.dev.ReadAt(b.ctx, tmp, alignedOff)
	if err != nil {
		return n, err
	}

	start := int(off - alignedOff)
	copy(p, tmp[start:start+len(p)])

	return len(p), nil
}

critical

In the unaligned read case, if b.dev.ReadAt returns an error, the method returns n, err where n is the number of bytes read into the temporary aligned buffer tmp. This violates the io.ReaderAt contract because n can be larger than len(p), and since no data is copied to p, the caller is incorrectly informed that n bytes were read into p. Additionally, the aligned read case unnecessarily allocates a temporary buffer and copies data instead of reading directly into p.

func (b *ublkBackend) ReadAt(p []byte, off int64) (int, error) {
	if len(p) == 0 {
		return 0, nil
	}

	if b.isAligned(off, len(p)) {
		return b.dev.ReadAt(b.ctx, p, off)
	}

	alignedOff, alignedLen := b.alignedRange(off, len(p))
	tmp := make([]byte, alignedLen)

	n, err := b.dev.ReadAt(b.ctx, tmp, alignedOff)
	if n > 0 {
		start := int(off - alignedOff)
		if n > start {
			copied := copy(p, tmp[start:min(n, start+len(p))])
			if err != nil {
				return copied, err
			}
			return copied, nil
		}
	}
	if err != nil {
		return 0, err
	}
	return 0, nil
}
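
To make the io.ReaderAt point concrete, here is a self-contained sketch of an unaligned read over a block-aligned device. The alignedReader type and demoReader helper are hypothetical stand-ins for the PR's ublkBackend (whose device also takes a context), and the sketch assumes off is non-negative. It goes one step further than the suggestion above: when fewer than len(p) bytes land in p it always returns a non-nil error, which the io.ReaderAt contract also requires.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// alignedReader is a hypothetical wrapper that only issues block-aligned
// reads to dev but accepts arbitrary offsets and lengths from callers.
type alignedReader struct {
	dev       io.ReaderAt
	blockSize int64
}

func (r *alignedReader) ReadAt(p []byte, off int64) (int, error) {
	if len(p) == 0 {
		return 0, nil
	}
	// Widen the request to block boundaries.
	alignedOff := off - off%r.blockSize
	end := off + int64(len(p))
	alignedEnd := (end + r.blockSize - 1) / r.blockSize * r.blockSize
	tmp := make([]byte, alignedEnd-alignedOff)

	n, err := r.dev.ReadAt(tmp, alignedOff)

	// Translate the device-level count into bytes that actually landed in p.
	start := int(off - alignedOff)
	copied := 0
	if n > start {
		copied = copy(p, tmp[start:n])
	}
	if copied == len(p) {
		// The caller's range was fully served; an error that only affects
		// the padding after it is not reported.
		return copied, nil
	}
	if err == nil {
		err = io.ErrUnexpectedEOF // short read must carry an error
	}
	return copied, err
}

// demoReader wraps a 10-byte in-memory device with a 4-byte block size.
func demoReader() *alignedReader {
	return &alignedReader{dev: bytes.NewReader([]byte("0123456789")), blockSize: 4}
}

func main() {
	r := demoReader()
	p := make([]byte, 3)
	n, err := r.ReadAt(p, 5) // served from the block at offset 4
	fmt.Printf("%d %q %v\n", n, p[:n], err)

	q := make([]byte, 4)
	n, err = r.ReadAt(q, 7) // runs past the end of the device
	fmt.Printf("%d %q %v\n", n, q[:n], err)
}
```

The returned count is always measured against p, never against the temporary aligned buffer, so it can never exceed len(p).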

Comment on lines +80 to +129
func (b *ublkBackend) WriteAt(p []byte, off int64) (int, error) {
	if len(p) == 0 {
		return 0, nil
	}

	alignedOff, alignedLen := b.alignedRange(off, len(p))
	unlock := b.lockRange(alignedOff, alignedLen)
	defer unlock()

	if b.isAligned(off, len(p)) {
		return b.dev.WriteAt(p, off)
	}

	tmp := make([]byte, alignedLen)
	requestEnd := off + int64(len(p))

	for blockOff := alignedOff; blockOff < alignedOff+int64(alignedLen); blockOff += b.blockSize {
		blockStart := blockOff
		blockEnd := blockOff + b.blockSize
		tmpStart := int(blockStart - alignedOff)
		tmpEnd := tmpStart + int(b.blockSize)
		blockBuf := tmp[tmpStart:tmpEnd]

		writeStart := max(blockStart, off)
		writeEnd := min(blockEnd, requestEnd)

		if writeStart > blockStart || writeEnd < blockEnd {
			n, err := b.dev.ReadAt(b.ctx, blockBuf, blockStart)
			if err != nil {
				return n, err
			}
		}

		copyStart := int(writeStart - off)
		copyEnd := int(writeEnd - off)
		blockCopyStart := int(writeStart - blockStart)
		copy(blockBuf[blockCopyStart:blockCopyStart+(copyEnd-copyStart)], p[copyStart:copyEnd])
	}

	n, err := b.dev.WriteAt(tmp, alignedOff)
	if err != nil {
		return n, err
	}

	if n != alignedLen {
		return n, fmt.Errorf("short aligned write: wrote %d want %d", n, alignedLen)
	}

	return len(p), nil
}

critical

If b.dev.ReadAt or b.dev.WriteAt fails during the write operation, the method returns n, err where n is the number of bytes read or written on the underlying device. This violates the io.WriterAt contract because n can be larger than len(p), and it does not accurately represent the number of bytes written from p. If an error occurs, the method should return 0, err or the actual number of bytes of p successfully written.

func (b *ublkBackend) WriteAt(p []byte, off int64) (int, error) {
	if len(p) == 0 {
		return 0, nil
	}

	alignedOff, alignedLen := b.alignedRange(off, len(p))
	unlock := b.lockRange(alignedOff, alignedLen)
	defer unlock()

	if b.isAligned(off, len(p)) {
		return b.dev.WriteAt(p, off)
	}

	tmp := make([]byte, alignedLen)
	requestEnd := off + int64(len(p))

	for blockOff := alignedOff; blockOff < alignedOff+int64(alignedLen); blockOff += b.blockSize {
		blockStart := blockOff
		blockEnd := blockOff + b.blockSize
		tmpStart := int(blockStart - alignedOff)
		tmpEnd := tmpStart + int(b.blockSize)
		blockBuf := tmp[tmpStart:tmpEnd]

		writeStart := max(blockStart, off)
		writeEnd := min(blockEnd, requestEnd)

		if writeStart > blockStart || writeEnd < blockEnd {
			_, err := b.dev.ReadAt(b.ctx, blockBuf, blockStart)
			if err != nil {
				return 0, err
			}
		}

		copyStart := int(writeStart - off)
		copyEnd := int(writeEnd - off)
		blockCopyStart := int(writeStart - blockStart)
		copy(blockBuf[blockCopyStart:blockCopyStart+(copyEnd-copyStart)], p[copyStart:copyEnd])
	}

	n, err := b.dev.WriteAt(tmp, alignedOff)
	if err != nil {
		return 0, err
	}

	if n != alignedLen {
		return 0, fmt.Errorf("short aligned write: wrote %d want %d", n, alignedLen)
	}

	return len(p), nil
}
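
The diff does not show alignedRange or isAligned, so the following is only a guess at their shape, written as free functions with an explicit blockSize parameter (an assumption; the PR's versions are methods on ublkBackend). It illustrates why the device-level byte count can exceed len(p): a small unaligned write widens to one or more whole blocks.

```go
package main

import "fmt"

// alignedRange returns the smallest block-aligned window
// [alignedOff, alignedOff+alignedLen) that covers [off, off+length).
// It assumes off >= 0 and blockSize > 0.
func alignedRange(off int64, length int, blockSize int64) (int64, int) {
	alignedOff := off - off%blockSize
	end := off + int64(length)
	alignedEnd := (end + blockSize - 1) / blockSize * blockSize
	return alignedOff, int(alignedEnd - alignedOff)
}

// isAligned reports whether both the offset and the length are multiples
// of blockSize, in which case no read-modify-write is needed.
func isAligned(off int64, length int, blockSize int64) bool {
	return off%blockSize == 0 && int64(length)%blockSize == 0
}

func main() {
	// A 100-byte write inside the second 4 KiB block touches one block.
	off, n := alignedRange(5000, 100, 4096)
	fmt.Println(off, n, isAligned(5000, 100, 4096))

	// A 200-byte write straddling a block boundary touches two blocks.
	off, n = alignedRange(4000, 200, 4096)
	fmt.Println(off, n, isAligned(4000, 200, 4096))
}
```

In the second case a 200-byte request becomes an 8192-byte device write, so returning the device's n to the caller would report far more bytes than p contains.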


// Use a background context so the ublk backend outlives the CreateSandbox
// request context. Only cancelled explicitly in Close().
runCtx, cancel := context.WithCancel(context.Background())

high

Using context.Background() to create the background context discards all trace spans, logger metadata, and other context values from the request context. Utilizing context.WithoutCancel(ctx) instead preserves these valuable telemetry and metadata values for asynchronous block IO operations while still preventing premature cancellation.

Suggested change
- runCtx, cancel := context.WithCancel(context.Background())
+ runCtx, cancel := context.WithCancel(context.WithoutCancel(ctx))
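
The behavior of the suggested line can be checked in isolation. The sketch below uses only the standard library (context.WithoutCancel needs Go 1.21 or later); the ctxKey type, the "sandbox" key, and the detach and demo helpers are illustrative names, not taken from the PR.

```go
package main

import (
	"context"
	"fmt"
)

type ctxKey string

// detach returns a context that keeps the values of ctx but ignores its
// cancellation and deadline; it stops only when the returned cancel is called.
func detach(ctx context.Context) (context.Context, context.CancelFunc) {
	return context.WithCancel(context.WithoutCancel(ctx))
}

// demo cancels the request context first and then the detached one,
// reporting what the detached context observed at each step.
func demo() (valueKept, survivesParent, stopsOnOwnCancel bool) {
	parent := context.WithValue(context.Background(), ctxKey("sandbox"), "sbx-1")
	reqCtx, cancelReq := context.WithCancel(parent)
	runCtx, cancelRun := detach(reqCtx)

	cancelReq() // the request finishes
	valueKept = runCtx.Value(ctxKey("sandbox")) == "sbx-1"
	survivesParent = runCtx.Err() == nil

	cancelRun() // explicit shutdown, as in Close()
	stopsOnOwnCancel = runCtx.Err() == context.Canceled
	return
}

func main() {
	fmt.Println(demo())
}
```

One consequence worth noting: WithoutCancel also drops any deadline on the request context, so the detached context lives until its own cancel function runs.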

}
u.cancel()

u.finishedOperations <- struct{}{}

high

Sending to u.finishedOperations directly can cause a deadlock if Close is called more than once, as the channel has a buffer capacity of 1 and will block on subsequent sends. Using a non-blocking select statement ensures that the send operation never blocks.

	select {
	case u.finishedOperations <- struct{}{}:
	default:
	}
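
A minimal reproduction of the double-Close case, assuming (as the review states) a channel with a buffer capacity of 1. The provider type and newProvider constructor are simplified stand-ins for the PR's UblkProvider.

```go
package main

import "fmt"

// provider is a hypothetical, stripped-down stand-in for UblkProvider.
type provider struct {
	finishedOperations chan struct{}
}

func newProvider() *provider {
	return &provider{finishedOperations: make(chan struct{}, 1)}
}

// Close signals completion without ever blocking: if the buffer already
// holds a signal, the default branch is taken and the send is skipped.
func (u *provider) Close() {
	select {
	case u.finishedOperations <- struct{}{}:
	default:
	}
}

func main() {
	u := newProvider()
	u.Close()
	u.Close() // a plain send would block here until something receives
	fmt.Println("pending signals:", len(u.finishedOperations))
}
```

If Close also needs to run its other teardown steps exactly once, wrapping the whole body in a sync.Once is a common alternative; the non-blocking send only protects the channel operation.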

Comment on lines +121 to +151
func (p *DevicePool) Shutdown(ctx context.Context) error {
	close(p.closed)
	p.mu.Lock()
	devs := make([]*ublk.Device, 0, len(p.devices))
	for d := range p.devices {
		devs = append(devs, d)
	}
	p.mu.Unlock()

	if len(devs) == 0 {
		return nil
	}
	logger.L().Info(ctx, "shutting down ublk pool", zap.Int("remaining", len(devs)))

	var wg sync.WaitGroup
	for _, d := range devs {
		wg.Add(1)
		go func(d *ublk.Device) {
			defer wg.Done()
			if err := d.Close(); err != nil {
				logger.L().Error(ctx, "ublk shutdown: device close error",
					zap.String("path", d.Path()),
					zap.Error(err),
				)
			}
		}(d)
	}
	wg.Wait()
	logger.L().Info(ctx, "ublk pool shutdown complete")
	return nil
}

high

In Shutdown, calling d.Close() directly on the devices without removing them from p.devices introduces a race condition if another goroutine concurrently calls p.Close(ctx, dev). To ensure thread safety and consistent state, the shutdown loop should call p.Close(ctx, d) instead of closing the device directly.

func (p *DevicePool) Shutdown(ctx context.Context) error {
	close(p.closed)
	p.mu.Lock()
	devs := make([]*ublk.Device, 0, len(p.devices))
	for d := range p.devices {
		devs = append(devs, d)
	}
	p.mu.Unlock()

	if len(devs) == 0 {
		return nil
	}
	logger.L().Info(ctx, "shutting down ublk pool", zap.Int("remaining", len(devs)))

	var wg sync.WaitGroup
	for _, d := range devs {
		wg.Add(1)
		go func(d *ublk.Device) {
			defer wg.Done()
			if err := p.Close(ctx, d); err != nil {
				logger.L().Error(ctx, "ublk shutdown: device close error",
					zap.String("path", d.Path()),
					zap.Error(err),
				)
			}
		}(d)
	}
	wg.Wait()
	logger.L().Info(ctx, "ublk pool shutdown complete")
	return nil
}
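
Routing Shutdown through the pool's Close only helps if Close itself decides ownership under the lock. The PR's DevicePool.Close is not shown in this thread, so the sketch below is an assumption about how that could work: whichever caller removes the device from the map is the only one allowed to close it. The pool and device types here are simplified and hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// device counts how many times Close ran so the demo can detect double closes.
type device struct{ closes int }

func (d *device) Close() error { d.closes++; return nil }

type pool struct {
	mu      sync.Mutex
	devices map[*device]struct{}
}

// Close removes d from the set under the lock and closes it only if this
// call was the one that removed it, so concurrent callers cannot double-close.
func (p *pool) Close(d *device) error {
	p.mu.Lock()
	_, owned := p.devices[d]
	delete(p.devices, d)
	p.mu.Unlock()
	if !owned {
		return nil
	}
	return d.Close()
}

// Shutdown snapshots the remaining devices and closes each through Close.
func (p *pool) Shutdown() {
	p.mu.Lock()
	devs := make([]*device, 0, len(p.devices))
	for d := range p.devices {
		devs = append(devs, d)
	}
	p.mu.Unlock()

	var wg sync.WaitGroup
	for _, d := range devs {
		wg.Add(1)
		go func(d *device) {
			defer wg.Done()
			_ = p.Close(d)
		}(d)
	}
	wg.Wait()
}

// demoShutdown races an external Close against Shutdown on the same device.
func demoShutdown() int {
	d := &device{}
	p := &pool{devices: map[*device]struct{}{d: {}}}
	done := make(chan struct{})
	go func() {
		defer close(done)
		_ = p.Close(d)
	}()
	p.Shutdown()
	<-done
	return d.closes
}

func main() {
	fmt.Println("close calls:", demoShutdown())
}
```

Whichever goroutine wins the map removal performs the single close; the loser sees the device is gone and returns without touching it.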

@cla-bot

cla-bot Bot commented May 30, 2026

We require contributors to sign our Contributor License Agreement, and we don't have @harmlessii on file. You can sign our CLA at https://e2b.dev/docs/cla. Once you've signed, post a comment here that says '@cla-bot check'.

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: f0a72d4be3

config cfg.BuilderConfig,
networkPool *network.Pool,
devicePool *nbd.DevicePool,
ublkPool *ublk.DevicePool,

P1: Update remaining NewFactory callers

Adding the required ublkPool parameter here leaves several Linux call sites on the old signature; rg "NewFactory\(" packages/orchestrator shows unchanged calls in benchmarks/benchmark_test.go, benchmarks/concurrent_benchmark_test.go, cmd/create-build/main.go, cmd/resume-build/main.go, and cmd/smoketest/smoke_test.go, so those packages no longer compile until they pass a ublk pool or the constructor remains compatible.

Comment on lines +138 to +139
case <-ctx.Done():
	return nil, fmt.Errorf("timeout waiting for ublk device released")

P2: Close ejected cache before returning on export timeout

When the export context is canceled while waiting for the sandbox to stop, the cache has already been ejected from the overlay, so overlay.Close() will no longer close or unmap it. Unlike the NBD export path, this early return leaks the mmap/cache file for every timed-out ublk diff export. Close the cache before returning on this path.
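
One way the timeout branch could release the cache is sketched below. The waitReleased function, the released channel, and the fakeCache type are hypothetical; the PR's export function has a different signature and returns a value alongside the error. The sketch also wraps ctx.Err() so callers can tell cancellation from a deadline, which the quoted fmt.Errorf call does not do.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
)

// fakeCache records whether Close was called; it stands in for the
// mmap-backed cache that was ejected from the overlay.
type fakeCache struct{ closed bool }

func (c *fakeCache) Close() error { c.closed = true; return nil }

// waitReleased blocks until the device is released or ctx ends. On the
// ctx path it closes the cache itself, because after ejection nothing
// else owns it. errors.Join drops a nil Close error automatically.
func waitReleased(ctx context.Context, released <-chan struct{}, cache io.Closer) error {
	select {
	case <-released:
		return nil
	case <-ctx.Done():
		return errors.Join(
			fmt.Errorf("waiting for ublk device release: %w", ctx.Err()),
			cache.Close(),
		)
	}
}

// demoTimeout simulates an export whose context ends before release.
func demoTimeout() (cacheClosed, sawCancel bool) {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	cache := &fakeCache{}
	err := waitReleased(ctx, make(chan struct{}), cache)
	return cache.closed, errors.Is(err, context.Canceled)
}

func main() {
	fmt.Println(demoTimeout())
}
```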
