diff --git a/Cargo.toml b/Cargo.toml index 8cb0ec8..5f64c45 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,9 +43,9 @@ tokio-seqpacket = "0.7" [dev-dependencies] env_logger = "0.11" tokio = { version = "1.37.0", features = [ - "io-util", - "macros", - "rt-multi-thread" + "io-util", + "macros", + "rt-multi-thread", ] } waitgroup = "0.1" diff --git a/src/lib.rs b/src/lib.rs index 8c05072..43a5399 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -27,7 +27,7 @@ use std::{ use socket2::Socket; #[cfg(target_os = "linux")] #[cfg(feature = "tokio_ecdysis")] -pub use tokio_seqpacket::UnixSeqpacketListener; +pub use {crate::seqpacket::UnixSeqpacketListenerStream, tokio_seqpacket::UnixSeqpacketListener}; use executioner::{upgrade, UpgradeFinished}; use inheriter::{init_child, InheritError}; diff --git a/src/listener.rs b/src/listener.rs index 805f965..547d691 100644 --- a/src/listener.rs +++ b/src/listener.rs @@ -1,13 +1,13 @@ use std::{ io, net::{TcpListener, UdpSocket}, - os::{ - fd::{FromRawFd, RawFd}, - unix::{io::AsRawFd, net::UnixListener}, - }, + os::unix::{io::AsRawFd, net::UnixListener}, path::PathBuf, }; +#[cfg(feature = "systemd_sockets")] +use std::os::fd::{FromRawFd, RawFd}; + use crate::registry::{ListenerInfo, SockInfo}; pub trait Listener: AsRawFd { diff --git a/src/seqpacket.rs b/src/seqpacket.rs index c946f24..ad3ce48 100644 --- a/src/seqpacket.rs +++ b/src/seqpacket.rs @@ -4,7 +4,11 @@ use std::{ task::{Context, Poll}, }; +#[cfg(feature = "systemd_sockets")] +use std::os::fd::RawFd; + use futures::Stream; + use tokio_seqpacket::{UnixSeqpacket, UnixSeqpacketListener}; use crate::{ @@ -27,7 +31,7 @@ impl Listener for UnixSeqpacketListener { #[cfg(feature = "systemd_sockets")] impl TryFromRawFd for UnixSeqpacketListener { - unsafe fn try_from_raw_fd(fd: std::os::unix::prelude::RawFd) -> io::Result + unsafe fn try_from_raw_fd(fd: RawFd) -> io::Result where Self: Sized, { @@ -50,3 +54,18 @@ impl Stream for UnixSeqpacketListenerStream { self.get_mut().0.poll_accept(cx).map(Some) } } + +/// Set or unset nonblocking flag on socket file descriptor. +/// +/// UnixSeqpacketListener does not provide a set_nonblocking method, so we +/// implement one here. +#[cfg(feature = "systemd_sockets")] +pub fn set_nonblocking(fd: RawFd, nonblocking: bool) -> io::Result<()> { + use nix::fcntl::{fcntl, FcntlArg, OFlag}; + + let mut oflags = OFlag::from_bits(fcntl(fd, FcntlArg::F_GETFL)?) + .ok_or(io::Error::other("fcntl F_GETFL returned invalid flags"))?; + oflags.set(OFlag::O_NONBLOCK, nonblocking); + fcntl(fd, FcntlArg::F_SETFL(oflags))?; + Ok(()) +} diff --git a/src/tokio_ecdysis/mod.rs b/src/tokio_ecdysis/mod.rs index fefd973..b2af842 100644 --- a/src/tokio_ecdysis/mod.rs +++ b/src/tokio_ecdysis/mod.rs @@ -612,9 +612,7 @@ impl TokioEcdysis { let listener: UnixSeqpacketListener = self .systemd_sock_of_proto(name, SockInfo::UnixSeqpacket(Some(path.as_ref().into()))) .await?; - // tokio-seqpacket sets O_NONBLOCK for us when we start accepting connections, - // so no need to set the nonblocking flag here. - // (see tokio_seqpacket::sys::accept) + crate::seqpacket::set_nonblocking(listener.as_raw_fd(), true)?; let stream = UnixSeqpacketListenerStream::new(listener); Ok(self .supervisor