Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/devices/src/virtio/net/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,12 @@ pub trait NetBackend {
fn has_unfinished_write(&self) -> bool;
fn try_finish_write(&mut self, hdr_len: usize, buf: &[u8]) -> Result<(), WriteError>;
fn raw_socket_fd(&self) -> RawFd;

/// Delay in microseconds before retrying after NothingWritten.
/// Returns 0 if no delay-based retry is needed (e.g. on Linux where
/// EAGAIN + EPOLLET handles retries via writable events).
///
/// Backends that need a timer-driven retry override this with a non-zero
/// value; a result of 0 tells the caller not to arm any retry timer.
// NOTE(review): the only call site visible in this change is compiled for
// macOS only, which is presumably why `dead_code` is allowed here for other
// targets - confirm before removing the attribute.
#[allow(dead_code)]
fn write_retry_delay_us(&self) -> u64 {
0
}
}
36 changes: 28 additions & 8 deletions src/devices/src/virtio/net/unixgram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const SOCKET_RCVBUF: usize = DEFAULT_SOCKET_BUF_SIZE;

/// Network backend that exchanges frames with the userspace network proxy
/// over the socket held in `fd`.
pub struct Unixgram {
/// Socket used by `write_frame` to send frames and exposed to callers
/// through `raw_socket_fd`.
fd: OwnedFd,
/// Number of consecutive `write_frame` attempts that failed with ENOBUFS.
/// Reset to 0 by the next successful send; used only for logging.
retries: u64,
}

impl Unixgram {
Expand Down Expand Up @@ -61,7 +62,7 @@ impl Unixgram {
};
}

Self { fd }
Self { fd, retries: 0 }
}

/// Create the backend opening a connection to the userspace network proxy.
Expand Down Expand Up @@ -128,13 +129,27 @@ impl NetBackend for Unixgram {

/// Try to write a frame to the proxy.
///
/// Sends `buf[hdr_len..]` (the frame without its leading header) as a single
/// message on the backend socket.
///
/// # Errors
///
/// * `WriteError::NothingWritten` when the send fails with ENOBUFS; the frame
///   was not sent and the caller is expected to retry it later.
/// * `WriteError::Internal` for every other send error.
fn write_frame(&mut self, hdr_len: usize, buf: &mut [u8]) -> Result<(), WriteError> {
    // The frame must be sent exactly once: a second, unconditional `send`
    // here would duplicate it on the wire and turn ENOBUFS into a hard error
    // before the retry path below could see it.
    let ret = match send(self.fd.as_raw_fd(), &buf[hdr_len..], MsgFlags::empty()) {
        Ok(ret) => ret,
        // macOS returns ENOBUFS when the kernel socket buffer is full,
        // rather than blocking or returning EAGAIN on non-blocking sockets.
        Err(nix::Error::ENOBUFS) => {
            // Log only the first failure of a streak to avoid flooding the
            // log while the caller keeps retrying.
            if self.retries == 0 {
                info!("write_frame: ENOBUFS");
            }
            self.retries += 1;
            return Err(WriteError::NothingWritten);
        }
        Err(e) => return Err(WriteError::Internal(e)),
    };

    // A successful send ends the ENOBUFS streak; report how long it lasted.
    if self.retries > 0 {
        info!(
            "write_frame: ENOBUFS resolved after {} retries",
            self.retries
        );
        self.retries = 0;
    }
    debug!("Written eth frame to proxy: {ret} bytes");
    Ok(())
}

Expand All @@ -150,4 +165,9 @@ impl NetBackend for Unixgram {
/// Returns the raw descriptor of the backend socket without giving up
/// ownership of it.
fn raw_socket_fd(&self) -> RawFd {
    let socket = &self.fd;
    socket.as_raw_fd()
}

/// Retry delay used on macOS, where a full socket buffer makes `write_frame`
/// fail with ENOBUFS and report `NothingWritten`.
#[cfg(target_os = "macos")]
fn write_retry_delay_us(&self) -> u64 {
    // Microseconds to wait before the deferred frame is attempted again.
    const ENOBUFS_RETRY_DELAY_US: u64 = 50;
    ENOBUFS_RETRY_DELAY_US
}
}
37 changes: 33 additions & 4 deletions src/devices/src/virtio/net/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use super::backend::{NetBackend, ReadError, WriteError};
use super::device::{FrontendError, RxError, TxError, VirtioNetBackend};
use super::VNET_HDR_LEN;

#[cfg(target_os = "macos")]
use std::os::fd::RawFd;
use std::os::fd::{AsRawFd, FromRawFd, OwnedFd};
use std::thread;
use std::{cmp, result};
Expand All @@ -31,6 +33,7 @@ pub struct NetWorker {
tx_iovec: Vec<(GuestAddress, usize)>,
tx_frame_buf: [u8; MAX_BUFFER_SIZE],
tx_frame_len: usize,
tx_has_deferred_frame: bool,
}

impl NetWorker {
Expand Down Expand Up @@ -82,6 +85,7 @@ impl NetWorker {
tx_frame_buf: [0u8; MAX_BUFFER_SIZE],
tx_frame_len: 0,
tx_iovec: Vec::with_capacity(QUEUE_SIZE as usize),
tx_has_deferred_frame: false,
})
}

Expand All @@ -93,6 +97,9 @@ impl NetWorker {
}

fn work(mut self) {
#[cfg(target_os = "macos")]
const TX_TIMER_FD: RawFd = -2;

let virtq_rx_ev_fd = self.rx_q.event.as_raw_fd();
let virtq_tx_ev_fd = self.tx_q.event.as_raw_fd();
let backend_socket = self.backend.raw_socket_fd();
Expand Down Expand Up @@ -148,13 +155,27 @@ impl NetWorker {
}
}
}
#[cfg(target_os = "macos")]
_ if event_set.is_empty() && source == TX_TIMER_FD => {
self.process_tx_loop();
}
_ => {
log::warn!(
"Received unknown event: {event_set:?} from fd: {source:?}"
);
}
}
}

// Arm the retry timer after processing all events, so it
// reflects the final state of tx_has_deferred_frame.
#[cfg(target_os = "macos")]
if self.tx_has_deferred_frame {
let delay = self.backend.write_retry_delay_us();
if delay > 0 {
epoll.add_oneshot_timer(delay, TX_TIMER_FD as u64);
}
}
}
Err(e) => {
debug!("vsock: failed to consume muxer epoll event: {e}");
Expand Down Expand Up @@ -259,11 +280,17 @@ impl NetWorker {
loop {
self.tx_q.queue.disable_notification(&self.mem).unwrap();

if let Err(e) = self.process_tx() {
log::error!("Failed to process rx: {e:?} (triggered by backend socket readable)");
self.tx_has_deferred_frame = match self.process_tx() {
Err(TxError::Backend(WriteError::NothingWritten)) => true,
Err(e) => {
log::error!("Failed to process tx: {e:?}");
false
}
_ => false,
};

if !self.tx_q.queue.enable_notification(&self.mem).unwrap() {
let has_new_entries = self.tx_q.queue.enable_notification(&self.mem).unwrap();
if self.tx_has_deferred_frame || !has_new_entries {
break;
}
}
Expand All @@ -283,6 +310,7 @@ impl NetWorker {
}

let mut raise_irq = false;
let mut result = Ok(());

while let Some(head) = tx_queue.pop(&self.mem) {
let head_index = head.index;
Expand Down Expand Up @@ -332,6 +360,7 @@ impl NetWorker {
}
Err(WriteError::NothingWritten) => {
tx_queue.undo_pop();
result = Err(TxError::Backend(WriteError::NothingWritten));
break;
}
Err(WriteError::PartialWrite) => {
Expand Down Expand Up @@ -365,7 +394,7 @@ impl NetWorker {
.map_err(TxError::DeviceError)?;
}

Ok(())
result
}

// Copies a single frame from `self.rx_frame_buf` into the guest.
Expand Down
41 changes: 36 additions & 5 deletions src/utils/src/macos/epoll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,23 @@ use bitflags::bitflags;
use log::debug;

fn event_name(filter: i16, flags: u16) -> &'static str {
match (filter, flags & libc::EV_EOF != 0) {
(libc::EVFILT_READ, false) => "READ",
(libc::EVFILT_READ, true) => "READ+EOF",
(libc::EVFILT_WRITE, false) => "WRITE",
(libc::EVFILT_WRITE, true) => "WRITE+EOF",
let eof = flags & libc::EV_EOF != 0;
match filter {
libc::EVFILT_READ => {
if eof {
"READ+EOF"
} else {
"READ"
}
}
libc::EVFILT_WRITE => {
if eof {
"WRITE+EOF"
} else {
"WRITE"
}
}
libc::EVFILT_TIMER => "TIMER",
_ => "UNKNOWN",
}
}
Expand Down Expand Up @@ -295,6 +307,9 @@ impl Epoll {
if kevs[i].0.flags & libc::EV_EOF != 0 {
events[i].events |= EventSet::HANG_UP.bits();
}
} else if kevs[i].0.filter == libc::EVFILT_TIMER {
// No epoll equivalent; caller identifies timer by udata.
events[i].events = EventSet::empty().bits();
}
events[i].u64 = kevs[i].udata();

Expand All @@ -308,6 +323,22 @@ impl Epoll {

Ok(nevents)
}

/// Register a one-shot timer that fires after `delay_us` microseconds.
/// The resulting event will have `data` set to `udata`.
pub fn add_oneshot_timer(&self, delay_us: u64, udata: u64) {
let kev = libc::kevent {
ident: 0,
filter: libc::EVFILT_TIMER,
flags: libc::EV_ADD | libc::EV_ONESHOT,
fflags: libc::NOTE_USECONDS,
data: delay_us as isize,
udata: udata as *mut libc::c_void,
};
unsafe {
libc::kevent(self.queue, &kev, 1, ptr::null_mut(), 0, ptr::null());
}
}
}

impl AsRawFd for Epoll {
Expand Down
Loading