Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion pingora-core/src/listeners/l4.rs
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,11 @@ impl ListenerEndpointBuilder {
let mut table = fds_table.lock().await;

if let Some(fd) = table.get(addr_str) {
from_raw_fd(&listen_addr, *fd)?
let fd = *fd;
// Mark as claimed so close_unused_inherited() does not close this
// actively-used socket on a subsequent graceful upgrade.
table.mark_used(addr_str);
from_raw_fd(&listen_addr, fd)?
} else {
// not found
let listener = bind(&listen_addr).await?;
Expand Down Expand Up @@ -554,9 +558,18 @@ impl ListenerEndpointBuilder {
// old-style "addr" entry. Without the removal, serialize() would send
// old_fd twice (as "addr" and "addr#0"), causing a fd leak in the next
// process that receives it but never uses the "addr" copy.
// add() moves thread_key out of inherited_pending (no-op here since
// thread_key was never in inherited_pending — it's a new key).
// remove(addr_str) clears the original inherited_pending entry.
let mut table = fds_table.lock().await;
table.add(thread_key, fd);
table.remove(addr_str);
} else {
// FD is reused directly under its existing per-thread key.
// add() is not called, so mark_used() prevents close_unused_inherited()
// from treating this actively-used FD as stale.
let mut table = fds_table.lock().await;
table.mark_used(&thread_key);
}
from_raw_fd(&listen_addr, fd)?
}
Expand Down
49 changes: 47 additions & 2 deletions pingora-core/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,11 @@ impl Server {
.ok();

if let Some(fds) = &self.listen_fds {
let fds = fds.lock().await;
let mut fds = fds.lock().await;
// Close inherited FDs that no listener claimed (ports removed from config).
// This frees ports for other processes and prevents stale FDs from being
// forwarded to the next upgrade generation.
fds.close_unused_inherited();
info!("Trying to send socks");
// XXX: this is blocking IO
match fds.send_to_sock(self.configuration.as_ref().upgrade_sock.as_str()) {
Expand Down Expand Up @@ -553,7 +557,48 @@ impl Server {
.grace_period_seconds
.unwrap_or(EXIT_TIMEOUT);
info!("Graceful shutdown: grace period {}s starts", exit_timeout);
thread::sleep(Duration::from_secs(exit_timeout));

// Re-register signal handlers on the still-running server_runtime.
// main_loop's select! consumed the originals; re-registering here allows
// a second SIGTERM/SIGINT/SIGQUIT to interrupt a long grace period early.
let fast_exit = Arc::new(std::sync::atomic::AtomicBool::new(false));
#[cfg(unix)]
{
let fast_exit = Arc::clone(&fast_exit);
server_runtime.get_handle().spawn(async move {
let sigterm = unix::signal(unix::SignalKind::terminate());
let sigint = unix::signal(unix::SignalKind::interrupt());
let sigquit = unix::signal(unix::SignalKind::quit());
match (sigterm, sigint, sigquit) {
(Ok(mut sigterm), Ok(mut sigint), Ok(mut sigquit)) => {
tokio::select! {
_ = sigterm.recv() => {
info!("SIGTERM received during grace period, exiting early");
}
_ = sigint.recv() => {
info!("SIGINT received during grace period, exiting early");
}
_ = sigquit.recv() => {
info!("SIGQUIT received during grace period, exiting early");
}
}
fast_exit.store(true, std::sync::atomic::Ordering::Release);
}
_ => {
error!(
"Failed to register signal handlers; \
grace period will not be interruptible by signals"
);
}
}
});
}
for _ in 0..exit_timeout {
if fast_exit.load(std::sync::atomic::Ordering::Acquire) {
break;
}
thread::sleep(Duration::from_secs(1));
}
info!("Graceful shutdown: grace period ends");
}

Expand Down
199 changes: 192 additions & 7 deletions pingora-core/src/server/transfer_fd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,36 +31,80 @@ use std::{thread, time};

/// Container for open file descriptors and their associated bind addresses.
///
/// Descriptors are kept in two tables so that inherited descriptors which no
/// listener has claimed can be told apart from descriptors in active use.
pub struct Fds {
/// Actively used file descriptors, keyed by bind address.
/// Entries are inserted by `add()` or moved over from `inherited_pending`
/// by `mark_used()`. This is the only table that `serialize()` reads.
map: HashMap<String, RawFd>,
/// Inherited FDs not yet claimed by a listener, keyed by bind address.
/// Filled by `deserialize()`. When a listener claims an inherited FD via
/// `add()` or `mark_used()`, the entry leaves this table and lives in `map`.
/// Whatever is still here when `close_unused_inherited()` runs is treated as
/// stale (e.g. the port was removed from the current config) and is closed.
inherited_pending: HashMap<String, RawFd>,
}

impl Fds {
pub fn new() -> Self {
Fds {
map: HashMap::new(),
inherited_pending: HashMap::new(),
}
}

/// Register `fd` as the actively used descriptor for `bind`.
///
/// A pending inherited entry stored under the same key is forgotten (not
/// closed) so that `close_unused_inherited()` will not treat it as stale.
/// An existing `map` entry under `bind` is overwritten.
pub fn add(&mut self, bind: String, fd: RawFd) {
    // The key may have arrived through deserialize(); it is claimed now.
    let _was_pending = self.inherited_pending.remove(&bind);
    let _replaced = self.map.insert(bind, fd);
}

pub fn get(&self, bind: &str) -> Option<&RawFd> {
self.map.get(bind)
// Claimed FDs live in map; inherited-but-not-yet-claimed ones live in
// inherited_pending. Check map first so post-claim lookups are fast.
self.map
.get(bind)
.or_else(|| self.inherited_pending.get(bind))
}

/// Forget `bind` in both tables. The descriptor itself is not closed.
pub fn remove(&mut self, bind: &str) {
    // Pending table first, then the claimed table — same order as before.
    for table in [&mut self.inherited_pending, &mut self.map] {
        table.remove(bind);
    }
}

/// Promote an inherited descriptor to "in use".
///
/// Use this when an inherited FD keeps serving under its existing key and
/// therefore never passes through `add()`. The entry is moved from
/// `inherited_pending` into `map`, which keeps `close_unused_inherited()`
/// from closing it. Keys that are not pending are left untouched.
pub fn mark_used(&mut self, key: &str) {
    // remove_entry hands back the stored key, so it can be moved as-is.
    if let Some((owned_key, fd)) = self.inherited_pending.remove_entry(key) {
        self.map.insert(owned_key, fd);
    }
}

/// Split the claimed table into parallel vectors of bind keys and descriptors.
///
/// Only `map` is exported. `inherited_pending` is expected to be empty at this
/// point — run `close_unused_inherited()` before `send_to_sock()` so stale
/// inherited FDs are not forwarded.
pub fn serialize(&self) -> (Vec<String>, Vec<RawFd>) {
    let mut binds = Vec::with_capacity(self.map.len());
    let mut raw_fds = Vec::with_capacity(self.map.len());
    for (bind, fd) in &self.map {
        binds.push(bind.clone());
        raw_fds.push(*fd);
    }
    (binds, raw_fds)
}

/// Load descriptors received from a previous process generation.
///
/// Every `(bind, fd)` pair is stored in `inherited_pending` only; it reaches
/// `map` once a listener claims it through `add()` or `mark_used()`.
///
/// # Panics
///
/// Panics if `binds` and `fds` differ in length.
pub fn deserialize(&mut self, binds: Vec<String>, fds: Vec<RawFd>) {
    assert_eq!(binds.len(), fds.len());
    // Inherited FDs stay pending until claimed; they must not go into `map` here.
    self.inherited_pending.extend(binds.into_iter().zip(fds));
}

/// Close and remove all inherited FDs that were never claimed by a listener.
///
/// Call this before `send_to_sock()` during a graceful upgrade so that stale FDs
/// (ports removed from the current config) are neither kept open nor forwarded to
/// the next process generation.
///
/// Closing is best-effort: a failed `close()` is logged and the remaining
/// entries are still processed. The pending table is empty afterwards, so a
/// second call does nothing.
pub fn close_unused_inherited(&mut self) {
    for (key, fd) in self.inherited_pending.drain() {
        log::info!("Closing unclaimed inherited fd {} for {}", fd, key);
        if let Err(e) = nix::unistd::close(fd) {
            // Nothing useful can be done beyond reporting it; keep going.
            log::warn!(
                "Failed to close unclaimed inherited fd {} for {}: {}",
                fd,
                key,
                e
            );
        }
    }
}

Expand All @@ -82,6 +126,14 @@ impl Fds {
}
}

impl Drop for Fds {
fn drop(&mut self) {
// Safety net: close any unclaimed inherited FDs remaining in inherited_pending.
// FDs in map are owned by their respective listeners and must NOT be closed here.
self.close_unused_inherited();
}
}

fn serialize_vec_string(vec_string: &[String]) -> Vec<u8> {
// Space-separated serialization. Uses dynamic allocation to avoid silent truncation.
vec_string.join(" ").into_bytes()
Expand Down Expand Up @@ -619,18 +671,22 @@ mod tests {
#[test]
fn test_table_serde() {
    init_log();
    // Use real pipe fds rather than made-up integers: dropping `fds2` closes
    // every descriptor it still holds as unclaimed, so they must be ours.
    let fd1 = make_pipe_fd();
    let fd2 = make_pipe_fd();
    let key1 = "1.1.1.1:80".to_string();
    let key2 = "1.1.1.1:443".to_string();
    let mut fds = Fds::new();
    fds.add(key1.clone(), fd1);
    fds.add(key2.clone(), fd2);

    // Round-trip through serialize()/deserialize() into a fresh table.
    let (k, v) = fds.serialize();
    let mut fds2 = Fds::new();
    fds2.deserialize(k, v);

    assert_eq!(fd1, *fds2.get(&key1).unwrap());
    assert_eq!(fd2, *fds2.get(&key2).unwrap());
    // fds2 drops here: close_unused_inherited() closes fd1 and fd2 (inherited, unclaimed).
    // fds drops here: its map entries are NOT closed (add()-owned fds, correct).
}

#[test]
Expand Down Expand Up @@ -779,4 +835,133 @@ mod tests {
elapsed
);
}

// ── inherited_pending / close_unused_inherited tests ───────────────────
//
// These tests use real pipe fds (via nix::unistd::pipe()) as the "inherited" fds
// so that close_unused_inherited() operates on fds we actually own, not arbitrary
// integers that might alias open fds belonging to other test threads.
// Each test only checks map state — not fd liveness — to avoid races between
// close() and the OS reusing the fd number.

/// Produce one real, open descriptor for a test to hand to `Fds`.
///
/// A pipe is created and its read end is closed right away; the write end is
/// returned. Panics with "pipe()" if the pipe cannot be created.
fn make_pipe_fd() -> RawFd {
    nix::unistd::pipe()
        .map(|(reader, writer)| {
            // Only a single live fd is needed, so the read side goes away now.
            let _ = nix::unistd::close(reader);
            writer
        })
        .expect("pipe()")
}

#[test]
fn test_close_unused_inherited_removes_unclaimed() {
    const STALE_KEY: &str = "0.0.0.0:8080";
    const ACTIVE_KEY: &str = "0.0.0.0:9090";

    init_log();
    let stale = make_pipe_fd();
    let active = make_pipe_fd();

    let mut table = Fds::new();
    let inherited_keys = vec![STALE_KEY.to_string(), ACTIVE_KEY.to_string()];
    let inherited_fds = vec![stale, active];
    table.deserialize(inherited_keys, inherited_fds);

    // Claiming through add() takes the active entry out of the pending set.
    table.add(ACTIVE_KEY.to_string(), active);

    // Expected effect: the stale fd is closed, the active one survives.
    table.close_unused_inherited();

    let stale_lookup = table.get(STALE_KEY);
    assert!(
        stale_lookup.is_none(),
        "unclaimed key should not be accessible after close_unused_inherited()"
    );
    let active_lookup = *table.get(ACTIVE_KEY).unwrap();
    assert_eq!(active_lookup, active, "claimed key should remain in map");

    // The active fd is still open; release it so the test leaks nothing.
    let _ = nix::unistd::close(active);
}

#[test]
fn test_serialize_after_cleanup_excludes_stale_fds() {
    const STALE_KEY: &str = "0.0.0.0:1111";
    const ACTIVE_KEY: &str = "0.0.0.0:2222";

    init_log();
    let stale = make_pipe_fd();
    let active = make_pipe_fd();

    let mut table = Fds::new();
    table.deserialize(
        vec![STALE_KEY.to_string(), ACTIVE_KEY.to_string()],
        vec![stale, active],
    );
    // Only the second key is claimed by a "listener".
    table.add(ACTIVE_KEY.to_string(), active);

    // Drops (and closes) the stale entry.
    table.close_unused_inherited();

    // What would be forwarded to the next generation must be the active entry only.
    let (keys, raw_fds) = table.serialize();
    assert_eq!(keys.len(), 1, "only the active FD should remain");
    assert_eq!(keys[0], ACTIVE_KEY);
    assert_eq!(raw_fds[0], active);

    let _ = nix::unistd::close(active);
}

#[test]
fn test_mark_used_prevents_removal() {
    const KEY: &str = "0.0.0.0:3333";

    init_log();
    let inherited = make_pipe_fd();
    let mut table = Fds::new();
    table.deserialize(vec![KEY.to_string()], vec![inherited]);

    // Claim the key in place, then run the cleanup that would otherwise close it.
    table.mark_used(KEY);
    table.close_unused_inherited();

    let found = *table.get(KEY).unwrap();
    assert_eq!(
        found, inherited,
        "mark_used() key should remain in map after close_unused_inherited()"
    );

    let _ = nix::unistd::close(inherited);
}

#[test]
fn test_remove_clears_inherited_pending() {
    const KEY: &str = "0.0.0.0:4444";

    init_log();
    let inherited = make_pipe_fd();
    let mut table = Fds::new();
    table.deserialize(vec![KEY.to_string()], vec![inherited]);

    table.remove(KEY);
    // With the entry gone, cleanup has nothing to do: no panic, no double-close.
    table.close_unused_inherited();

    let lookup = table.get(KEY);
    assert!(lookup.is_none());

    // remove() only forgets the key; in production the fd has already been
    // re-registered under a per-thread key via add() by then. In this test
    // nobody else owns it, so close it here to avoid leaking it.
    let _ = nix::unistd::close(inherited);
}

#[test]
fn test_add_removes_from_inherited_pending() {
    const KEY: &str = "0.0.0.0:5555";

    init_log();
    let inherited = make_pipe_fd();
    let mut table = Fds::new();
    table.deserialize(vec![KEY.to_string()], vec![inherited]);

    // Re-adding under the same key claims it; cleanup must then leave it alone.
    table.add(KEY.to_string(), inherited);
    table.close_unused_inherited();

    let found = *table.get(KEY).unwrap();
    assert_eq!(
        found, inherited,
        "add()-claimed key should not be removed from map"
    );

    let _ = nix::unistd::close(inherited);
}

#[test]
fn test_close_unused_inherited_idempotent() {
    const KEY: &str = "0.0.0.0:6666";

    init_log();
    let inherited = make_pipe_fd();
    let mut table = Fds::new();
    table.deserialize(vec![KEY.to_string()], vec![inherited]);

    // First call closes the fd; the repeat must be a no-op (no EBADF abort).
    for _ in 0..2 {
        table.close_unused_inherited();
    }

    assert!(table.get(KEY).is_none());
}
}
Loading
Loading