feat(orchestrator): add ublk block device provider as alternative to NBD #2834
harmlessii wants to merge 1 commit into
Thank you for your pull request and welcome to our community. We could not parse the GitHub identity of the following contributors: zhaojianyu.harmless.
Code Review
In packages/orchestrator/pkg/sandbox/rootfs/ublk_backend.go, the ReadAt and WriteAt methods violate the io.ReaderAt and io.WriterAt contracts when an error occurs because they return the raw number of bytes read or written on the underlying device, which can exceed the slice length len(p). Additionally, the aligned read case unnecessarily allocates a temporary buffer and copies data instead of reading directly into p.
In packages/orchestrator/pkg/sandbox/rootfs/ublk.go, using context.Background() to create the background context discards all trace spans, logger metadata, and other context values from the request context. Utilizing context.WithoutCancel(ctx) instead preserves these valuable telemetry and metadata values for asynchronous block IO operations while still preventing premature cancellation.
In packages/orchestrator/pkg/sandbox/rootfs/ublk.go, sending to u.finishedOperations directly can cause a deadlock if Close is called more than once, as the channel has a buffer capacity of 1 and will block on subsequent sends. Using a non-blocking select statement ensures that the send operation never blocks.
In packages/orchestrator/pkg/sandbox/ublk/pool.go, calling d.Close() directly on the devices in Shutdown without removing them from p.devices introduces a race condition if another goroutine concurrently calls p.Close(ctx, dev). To ensure thread safety and consistent state, the shutdown loop should call p.Close(ctx, d) instead of closing the device directly.
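The blocking send on u.finishedOperations described above can be reproduced in isolation. The following is a minimal sketch, not the PR's code: signalDone is a hypothetical helper, and the channel capacity of 1 is taken from the review text.

```go
package main

import "fmt"

// signalDone (hypothetical helper) performs a non-blocking send. It reports
// whether the signal was queued and never blocks when the buffer is full.
func signalDone(ch chan struct{}) bool {
	select {
	case ch <- struct{}{}:
		return true
	default:
		return false
	}
}

func main() {
	finished := make(chan struct{}, 1) // capacity 1, as described in the review

	fmt.Println(signalDone(finished)) // true: the buffer had room
	fmt.Println(signalDone(finished)) // false: buffer full, but no deadlock
}
```

A plain `finished <- struct{}{}` in place of the second call would block forever here, because nothing receives from the channel.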
func (b *ublkBackend) ReadAt(p []byte, off int64) (int, error) {
	if len(p) == 0 {
		return 0, nil
	}

	if b.isAligned(off, len(p)) {
		tmp := make([]byte, len(p))
		n, err := b.dev.ReadAt(b.ctx, tmp, off)
		if n > 0 {
			copied := n
			if copied > len(p) {
				copied = len(p)
			}
			copy(p, tmp[:copied])
		}

		if err != nil {
			if n > len(p) {
				n = len(p)
			}

			return n, err
		}

		return len(p), nil
	}

	alignedOff, alignedLen := b.alignedRange(off, len(p))
	tmp := make([]byte, alignedLen)

	n, err := b.dev.ReadAt(b.ctx, tmp, alignedOff)
	if err != nil {
		return n, err
	}

	start := int(off - alignedOff)
	copy(p, tmp[start:start+len(p)])

	return len(p), nil
}
In the unaligned read case, if b.dev.ReadAt returns an error, the method returns n, err where n is the number of bytes read into the temporary aligned buffer tmp. This violates the io.ReaderAt contract because n can be larger than len(p), and since no data is copied to p, the caller is incorrectly informed that n bytes were read into p. Additionally, the aligned read case unnecessarily allocates a temporary buffer and copies data instead of reading directly into p.
func (b *ublkBackend) ReadAt(p []byte, off int64) (int, error) {
	if len(p) == 0 {
		return 0, nil
	}
	if b.isAligned(off, len(p)) {
		return b.dev.ReadAt(b.ctx, p, off)
	}
	alignedOff, alignedLen := b.alignedRange(off, len(p))
	tmp := make([]byte, alignedLen)
	n, err := b.dev.ReadAt(b.ctx, tmp, alignedOff)
	if n > 0 {
		start := int(off - alignedOff)
		if n > start {
			copied := copy(p, tmp[start:min(n, start+len(p))])
			if err != nil {
				return copied, err
			}
			return copied, nil
		}
	}
	if err != nil {
		return 0, err
	}
	return 0, nil
}

func (b *ublkBackend) WriteAt(p []byte, off int64) (int, error) {
	if len(p) == 0 {
		return 0, nil
	}

	alignedOff, alignedLen := b.alignedRange(off, len(p))
	unlock := b.lockRange(alignedOff, alignedLen)
	defer unlock()

	if b.isAligned(off, len(p)) {
		return b.dev.WriteAt(p, off)
	}

	tmp := make([]byte, alignedLen)
	requestEnd := off + int64(len(p))

	for blockOff := alignedOff; blockOff < alignedOff+int64(alignedLen); blockOff += b.blockSize {
		blockStart := blockOff
		blockEnd := blockOff + b.blockSize
		tmpStart := int(blockStart - alignedOff)
		tmpEnd := tmpStart + int(b.blockSize)
		blockBuf := tmp[tmpStart:tmpEnd]

		writeStart := max(blockStart, off)
		writeEnd := min(blockEnd, requestEnd)

		if writeStart > blockStart || writeEnd < blockEnd {
			n, err := b.dev.ReadAt(b.ctx, blockBuf, blockStart)
			if err != nil {
				return n, err
			}
		}

		copyStart := int(writeStart - off)
		copyEnd := int(writeEnd - off)
		blockCopyStart := int(writeStart - blockStart)
		copy(blockBuf[blockCopyStart:blockCopyStart+(copyEnd-copyStart)], p[copyStart:copyEnd])
	}

	n, err := b.dev.WriteAt(tmp, alignedOff)
	if err != nil {
		return n, err
	}

	if n != alignedLen {
		return n, fmt.Errorf("short aligned write: wrote %d want %d", n, alignedLen)
	}

	return len(p), nil
}
If b.dev.ReadAt or b.dev.WriteAt fails during the write operation, the method returns n, err where n is the number of bytes read or written on the underlying device. This violates the io.WriterAt contract because n can be larger than len(p), and it does not accurately represent the number of bytes written from p. If an error occurs, the method should return 0, err or the actual number of bytes of p successfully written.
func (b *ublkBackend) WriteAt(p []byte, off int64) (int, error) {
	if len(p) == 0 {
		return 0, nil
	}
	alignedOff, alignedLen := b.alignedRange(off, len(p))
	unlock := b.lockRange(alignedOff, alignedLen)
	defer unlock()
	if b.isAligned(off, len(p)) {
		return b.dev.WriteAt(p, off)
	}
	tmp := make([]byte, alignedLen)
	requestEnd := off + int64(len(p))
	for blockOff := alignedOff; blockOff < alignedOff+int64(alignedLen); blockOff += b.blockSize {
		blockStart := blockOff
		blockEnd := blockOff + b.blockSize
		tmpStart := int(blockStart - alignedOff)
		tmpEnd := tmpStart + int(b.blockSize)
		blockBuf := tmp[tmpStart:tmpEnd]
		writeStart := max(blockStart, off)
		writeEnd := min(blockEnd, requestEnd)
		if writeStart > blockStart || writeEnd < blockEnd {
			_, err := b.dev.ReadAt(b.ctx, blockBuf, blockStart)
			if err != nil {
				return 0, err
			}
		}
		copyStart := int(writeStart - off)
		copyEnd := int(writeEnd - off)
		blockCopyStart := int(writeStart - blockStart)
		copy(blockBuf[blockCopyStart:blockCopyStart+(copyEnd-copyStart)], p[copyStart:copyEnd])
	}
	n, err := b.dev.WriteAt(tmp, alignedOff)
	if err != nil {
		return 0, err
	}
	if n != alignedLen {
		return 0, fmt.Errorf("short aligned write: wrote %d want %d", n, alignedLen)
	}
	return len(p), nil
}
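The contract at issue can be checked mechanically in a test. The sketch below is one possible way to do that and is not part of the PR: overReportingWriter and checkWriteAt are hypothetical names, and the fake writer only imitates a backend that reports a device-level byte count on failure.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// overReportingWriter is a hypothetical stand-in for a backend that returns
// the block-aligned byte count of the underlying device when it fails.
type overReportingWriter struct{ blockSize int }

func (w overReportingWriter) WriteAt(p []byte, off int64) (int, error) {
	return w.blockSize, errors.New("device error")
}

// checkWriteAt (hypothetical helper) calls w.WriteAt and reports violations
// of the io.WriterAt contract: n must lie in [0, len(p)], and n < len(p)
// requires a non-nil error.
func checkWriteAt(w io.WriterAt, p []byte, off int64) error {
	n, err := w.WriteAt(p, off)
	switch {
	case n < 0 || n > len(p):
		return fmt.Errorf("WriteAt returned n=%d outside [0, %d]", n, len(p))
	case n < len(p) && err == nil:
		return fmt.Errorf("WriteAt returned n=%d < %d with a nil error", n, len(p))
	}
	return nil
}

func main() {
	// A 10-byte unaligned write against a 4096-byte block device.
	err := checkWriteAt(overReportingWriter{blockSize: 4096}, make([]byte, 10), 3)
	fmt.Println(err)
}
```

The same check applies to ReadAt with io.ReaderAt, since both interfaces bound n by len(p).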

	// Use a background context so the ublk backend outlives the CreateSandbox
	// request context. Only cancelled explicitly in Close().
	runCtx, cancel := context.WithCancel(context.Background())
Using context.Background() to create the background context discards all trace spans, logger metadata, and other context values from the request context. Utilizing context.WithoutCancel(ctx) instead preserves these valuable telemetry and metadata values for asynchronous block IO operations while still preventing premature cancellation.
-	runCtx, cancel := context.WithCancel(context.Background())
+	runCtx, cancel := context.WithCancel(context.WithoutCancel(ctx))
	}
	u.cancel()

	u.finishedOperations <- struct{}{}
Sending to u.finishedOperations directly can cause a deadlock if Close is called more than once, as the channel has a buffer capacity of 1 and will block on subsequent sends. Using a non-blocking select statement ensures that the send operation never blocks.
select {
case u.finishedOperations <- struct{}{}:
default:
}

func (p *DevicePool) Shutdown(ctx context.Context) error {
	close(p.closed)
	p.mu.Lock()
	devs := make([]*ublk.Device, 0, len(p.devices))
	for d := range p.devices {
		devs = append(devs, d)
	}
	p.mu.Unlock()

	if len(devs) == 0 {
		return nil
	}
	logger.L().Info(ctx, "shutting down ublk pool", zap.Int("remaining", len(devs)))

	var wg sync.WaitGroup
	for _, d := range devs {
		wg.Add(1)
		go func(d *ublk.Device) {
			defer wg.Done()
			if err := d.Close(); err != nil {
				logger.L().Error(ctx, "ublk shutdown: device close error",
					zap.String("path", d.Path()),
					zap.Error(err),
				)
			}
		}(d)
	}
	wg.Wait()
	logger.L().Info(ctx, "ublk pool shutdown complete")
	return nil
}
In Shutdown, calling d.Close() directly on the devices without removing them from p.devices introduces a race condition if another goroutine concurrently calls p.Close(ctx, dev). To ensure thread safety and consistent state, the shutdown loop should call p.Close(ctx, d) instead of closing the device directly.
func (p *DevicePool) Shutdown(ctx context.Context) error {
	close(p.closed)
	p.mu.Lock()
	devs := make([]*ublk.Device, 0, len(p.devices))
	for d := range p.devices {
		devs = append(devs, d)
	}
	p.mu.Unlock()
	if len(devs) == 0 {
		return nil
	}
	logger.L().Info(ctx, "shutting down ublk pool", zap.Int("remaining", len(devs)))
	var wg sync.WaitGroup
	for _, d := range devs {
		wg.Add(1)
		go func(d *ublk.Device) {
			defer wg.Done()
			if err := p.Close(ctx, d); err != nil {
				logger.L().Error(ctx, "ublk shutdown: device close error",
					zap.String("path", d.Path()),
					zap.Error(err),
				)
			}
		}(d)
	}
	wg.Wait()
	logger.L().Info(ctx, "ublk pool shutdown complete")
	return nil
}
We require contributors to sign our Contributor License Agreement, and we don't have @harmlessii on file. You can sign our CLA at https://e2b.dev/docs/cla . Once you've signed, post a comment here that says '@cla-bot check'
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: f0a72d4be3
	config cfg.BuilderConfig,
	networkPool *network.Pool,
	devicePool *nbd.DevicePool,
	ublkPool *ublk.DevicePool,
Update remaining NewFactory callers
Adding the required ublkPool parameter here leaves several Linux call sites on the old signature. rg "NewFactory\(" packages/orchestrator shows unchanged calls in benchmarks/benchmark_test.go, benchmarks/concurrent_benchmark_test.go, cmd/create-build/main.go, cmd/resume-build/main.go, and cmd/smoketest/smoke_test.go, so those packages no longer compile until they pass a ublk pool or the constructor remains compatible.
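One way to keep a constructor compatible while adding an optional dependency is a variadic option. The sketch below only illustrates that pattern: the signature is heavily simplified, and nbdPool, ublkPool, Option, and WithUblkPool are hypothetical names rather than the orchestrator's API.

```go
package main

import "fmt"

// Hypothetical stand-ins for the real pool types.
type nbdPool struct{}
type ublkPool struct{}

type Factory struct {
	nbd  *nbdPool
	ublk *ublkPool // nil means only NBD is available
}

// Option configures optional Factory dependencies.
type Option func(*Factory)

// WithUblkPool (hypothetical) attaches a ublk pool to the factory.
func WithUblkPool(p *ublkPool) Option {
	return func(f *Factory) { f.ublk = p }
}

// NewFactory keeps its required arguments, so existing callers still
// compile, and takes new dependencies as trailing options.
func NewFactory(nbd *nbdPool, opts ...Option) *Factory {
	f := &Factory{nbd: nbd}
	for _, opt := range opts {
		opt(f)
	}
	return f
}

func main() {
	old := NewFactory(&nbdPool{})                           // unchanged call site
	upd := NewFactory(&nbdPool{}, WithUblkPool(&ublkPool{})) // opts in to ublk
	fmt.Println(old.ublk == nil, upd.ublk == nil)           // true false
}
```

The trade-off is that code selecting the ublk provider has to handle a nil pool, whereas a required parameter forces every caller to decide up front.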
	case <-ctx.Done():
		return nil, fmt.Errorf("timeout waiting for ublk device released")
Close ejected cache before returning on export timeout
When the export context is canceled while waiting for the sandbox to stop, the cache has already been ejected from the overlay, so overlay.Close() will no longer close or unmap it. Unlike the NBD export path, this early return leaks the mmap/cache file for every timed-out ublk diff export, so the cache should be closed before returning on this path.
Summary
Add ublk-based block device provider as an alternative to NBD for sandbox rootfs.
Motivation
Changes
- rootfs/ublk.go: UblkProvider implementing the Provider interface
- rootfs/ublk_backend.go: Minimal adapter binding context to ublk-go callbacks
- rootfs/ublk_test.go: Integration tests matching NBD's path_direct_test.go coverage
- ublk/pool.go: Device pool with OTel metrics and structured logging
Prerequisites
- ublk_drv module loaded (modprobe ublk_drv)
Testing
All 6 integration tests pass (requires root + ublk_drv):