diff --git a/src/devices/src/virtio/net/backend.rs b/src/devices/src/virtio/net/backend.rs index b73b910b9..dab01c943 100644 --- a/src/devices/src/virtio/net/backend.rs +++ b/src/devices/src/virtio/net/backend.rs @@ -43,4 +43,12 @@ pub trait NetBackend { fn has_unfinished_write(&self) -> bool; fn try_finish_write(&mut self, hdr_len: usize, buf: &[u8]) -> Result<(), WriteError>; fn raw_socket_fd(&self) -> RawFd; + + /// Delay in microseconds before retrying after NothingWritten. + /// Returns 0 if no delay-based retry is needed (e.g. on Linux where + /// EAGAIN + EPOLLET handles retries via writable events). + #[allow(dead_code)] + fn write_retry_delay_us(&self) -> u64 { + 0 + } } diff --git a/src/devices/src/virtio/net/unixgram.rs b/src/devices/src/virtio/net/unixgram.rs index 1ceb0722e..600b6b41b 100644 --- a/src/devices/src/virtio/net/unixgram.rs +++ b/src/devices/src/virtio/net/unixgram.rs @@ -28,6 +28,7 @@ const SOCKET_RCVBUF: usize = DEFAULT_SOCKET_BUF_SIZE; pub struct Unixgram { fd: OwnedFd, + retries: u64, } impl Unixgram { @@ -61,7 +62,7 @@ impl Unixgram { }; } - Self { fd } + Self { fd, retries: 0 } } /// Create the backend opening a connection to the userspace network proxy. @@ -128,13 +129,27 @@ impl NetBackend for Unixgram { /// Try to write a frame to the proxy. fn write_frame(&mut self, hdr_len: usize, buf: &mut [u8]) -> Result<(), WriteError> { - let ret = send(self.fd.as_raw_fd(), &buf[hdr_len..], MsgFlags::empty()) - .map_err(WriteError::Internal)?; - debug!( - "Written frame size={}, written={}", - buf.len() - hdr_len, - ret - ); + let ret = match send(self.fd.as_raw_fd(), &buf[hdr_len..], MsgFlags::empty()) { + Ok(ret) => ret, + // macOS returns ENOBUFS when the kernel socket buffer is full, + // rather than blocking or returning EAGAIN on non-blocking sockets. + Err(nix::Error::ENOBUFS) => { + if self.retries == 0 { + info!("write_frame: ENOBUFS"); + } + self.retries += 1; + return Err(WriteError::NothingWritten); + } + Err(e) => return Err(WriteError::Internal(e)), + }; + if self.retries > 0 { + info!( + "write_frame: ENOBUFS resolved after {} retries", + self.retries + ); + self.retries = 0; + } + debug!("Written eth frame to proxy: {ret} bytes"); Ok(()) } @@ -150,4 +165,9 @@ impl NetBackend for Unixgram { fn raw_socket_fd(&self) -> RawFd { self.fd.as_raw_fd() } + + #[cfg(target_os = "macos")] + fn write_retry_delay_us(&self) -> u64 { + 50 + } } diff --git a/src/devices/src/virtio/net/worker.rs b/src/devices/src/virtio/net/worker.rs index 05341df3c..133e23b46 100644 --- a/src/devices/src/virtio/net/worker.rs +++ b/src/devices/src/virtio/net/worker.rs @@ -10,6 +10,8 @@ use super::backend::{NetBackend, ReadError, WriteError}; use super::device::{FrontendError, RxError, TxError, VirtioNetBackend}; use super::VNET_HDR_LEN; +#[cfg(target_os = "macos")] +use std::os::fd::RawFd; use std::os::fd::{AsRawFd, FromRawFd, OwnedFd}; use std::thread; use std::{cmp, result}; @@ -31,6 +33,7 @@ pub struct NetWorker { tx_iovec: Vec<(GuestAddress, usize)>, tx_frame_buf: [u8; MAX_BUFFER_SIZE], tx_frame_len: usize, + tx_has_deferred_frame: bool, } impl NetWorker { @@ -82,6 +85,7 @@ impl NetWorker { tx_frame_buf: [0u8; MAX_BUFFER_SIZE], tx_frame_len: 0, tx_iovec: Vec::with_capacity(QUEUE_SIZE as usize), + tx_has_deferred_frame: false, }) } @@ -93,6 +97,9 @@ impl NetWorker { } fn work(mut self) { + #[cfg(target_os = "macos")] + const TX_TIMER_FD: RawFd = -2; + let virtq_rx_ev_fd = self.rx_q.event.as_raw_fd(); let virtq_tx_ev_fd = self.tx_q.event.as_raw_fd(); let backend_socket = self.backend.raw_socket_fd(); @@ -148,6 +155,10 @@ impl NetWorker { } } } + #[cfg(target_os = "macos")] + _ if event_set.is_empty() && source == TX_TIMER_FD => { + self.process_tx_loop(); + } _ => { log::warn!( "Received unknown event: {event_set:?} from fd: {source:?}" @@ -155,6 +166,16 @@ impl NetWorker { } } } + + // Arm the retry timer after processing all events, so it + // reflects the final state of tx_has_deferred_frame. + #[cfg(target_os = "macos")] + if self.tx_has_deferred_frame { + let delay = self.backend.write_retry_delay_us(); + if delay > 0 { + epoll.add_oneshot_timer(delay, TX_TIMER_FD as u64); + } + } } Err(e) => { debug!("vsock: failed to consume muxer epoll event: {e}"); @@ -259,11 +280,17 @@ impl NetWorker { loop { self.tx_q.queue.disable_notification(&self.mem).unwrap(); - if let Err(e) = self.process_tx() { - log::error!("Failed to process rx: {e:?} (triggered by backend socket readable)"); + self.tx_has_deferred_frame = match self.process_tx() { + Err(TxError::Backend(WriteError::NothingWritten)) => true, + Err(e) => { + log::error!("Failed to process tx: {e:?}"); + false + } + _ => false, }; - if !self.tx_q.queue.enable_notification(&self.mem).unwrap() { + let has_new_entries = self.tx_q.queue.enable_notification(&self.mem).unwrap(); + if self.tx_has_deferred_frame || !has_new_entries { break; } } @@ -283,6 +310,7 @@ impl NetWorker { } let mut raise_irq = false; + let mut result = Ok(()); while let Some(head) = tx_queue.pop(&self.mem) { let head_index = head.index; @@ -332,6 +360,7 @@ impl NetWorker { } Err(WriteError::NothingWritten) => { tx_queue.undo_pop(); + result = Err(TxError::Backend(WriteError::NothingWritten)); break; } Err(WriteError::PartialWrite) => { @@ -365,7 +394,7 @@ impl NetWorker { .map_err(TxError::DeviceError)?; } - Ok(()) + result } // Copies a single frame from `self.rx_frame_buf` into the guest. diff --git a/src/utils/src/macos/epoll.rs b/src/utils/src/macos/epoll.rs index af3f86a57..2e15c5bae 100644 --- a/src/utils/src/macos/epoll.rs +++ b/src/utils/src/macos/epoll.rs @@ -13,11 +13,23 @@ use bitflags::bitflags; use log::debug; fn event_name(filter: i16, flags: u16) -> &'static str { - match (filter, flags & libc::EV_EOF != 0) { - (libc::EVFILT_READ, false) => "READ", - (libc::EVFILT_READ, true) => "READ+EOF", - (libc::EVFILT_WRITE, false) => "WRITE", - (libc::EVFILT_WRITE, true) => "WRITE+EOF", + let eof = flags & libc::EV_EOF != 0; + match filter { + libc::EVFILT_READ => { + if eof { + "READ+EOF" + } else { + "READ" + } + } + libc::EVFILT_WRITE => { + if eof { + "WRITE+EOF" + } else { + "WRITE" + } + } + libc::EVFILT_TIMER => "TIMER", _ => "UNKNOWN", } } @@ -295,6 +307,9 @@ impl Epoll { if kevs[i].0.flags & libc::EV_EOF != 0 { events[i].events |= EventSet::HANG_UP.bits(); } + } else if kevs[i].0.filter == libc::EVFILT_TIMER { + // No epoll equivalent; caller identifies timer by udata. + events[i].events = EventSet::empty().bits(); } events[i].u64 = kevs[i].udata(); @@ -308,6 +323,22 @@ impl Epoll { Ok(nevents) } + + /// Register a one-shot timer that fires after `delay_us` microseconds. + /// The resulting event will have `data` set to `udata`. + pub fn add_oneshot_timer(&self, delay_us: u64, udata: u64) { + let kev = libc::kevent { + ident: 0, + filter: libc::EVFILT_TIMER, + flags: libc::EV_ADD | libc::EV_ONESHOT, + fflags: libc::NOTE_USECONDS, + data: delay_us as isize, + udata: udata as *mut libc::c_void, + }; + unsafe { + libc::kevent(self.queue, &kev, 1, ptr::null_mut(), 0, ptr::null()); + } + } } impl AsRawFd for Epoll {