From 5bd7c2da8346b400d6c4cfee268884f62bd42b41 Mon Sep 17 00:00:00 2001 From: Gabriel Oprisan Date: Fri, 3 Apr 2026 19:31:28 +0300 Subject: [PATCH] fix(graceful-upgrade): fix uninterruptible grace period and stale FD accumulation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Issue 1 — Uninterruptible grace period after SIGTERM/SIGQUIT: After main_loop consumed signal handlers via tokio::select!, a bare thread::sleep(grace_period) left the process unresponsive to further signals for up to 20 minutes. Replace with a 1-second polling loop that checks an AtomicBool set by a signal-watching task spawned on the still-running server_runtime. A second SIGTERM/SIGINT/SIGQUIT during the grace period now causes an early exit. Issue 2 — Stale inherited FDs accumulate across upgrade generations: serialize() dumped the entire Fds map, including ports removed from the current config. These zombie FDs were forwarded to every subsequent process generation, holding ports open indefinitely. Fix with a two-map design in Fds: inherited FDs from deserialize() live exclusively in inherited_pending until a listener claims them via add() or mark_used(). close_unused_inherited() drains inherited_pending, closing stale FDs before send_to_sock() on SIGQUIT. Drop provides a safety net. Also fix the non-NoSteal listen() path in l4.rs which never called mark_used(), causing active listener sockets to be closed by close_unused_inherited() on third-generation upgrades. 
Add integration tests: - server_phase_gracefulupgrade: SIGQUIT → full phase progression - server_phase_early_exit: second SIGTERM interrupts grace period --- pingora-core/src/listeners/l4.rs | 15 +- pingora-core/src/server/mod.rs | 49 ++++- pingora-core/src/server/transfer_fd/mod.rs | 199 +++++++++++++++++- pingora-core/tests/server_phase_early_exit.rs | 104 +++++++++ .../tests/server_phase_gracefulupgrade.rs | 90 ++++++++ 5 files changed, 447 insertions(+), 10 deletions(-) create mode 100644 pingora-core/tests/server_phase_early_exit.rs create mode 100644 pingora-core/tests/server_phase_gracefulupgrade.rs diff --git a/pingora-core/src/listeners/l4.rs b/pingora-core/src/listeners/l4.rs index 990914d2..e7b1e7d4 100644 --- a/pingora-core/src/listeners/l4.rs +++ b/pingora-core/src/listeners/l4.rs @@ -415,7 +415,11 @@ impl ListenerEndpointBuilder { let mut table = fds_table.lock().await; if let Some(fd) = table.get(addr_str) { - from_raw_fd(&listen_addr, *fd)? + let fd = *fd; + // Mark as claimed so close_unused_inherited() does not close this + // actively-used socket on a subsequent graceful upgrade. + table.mark_used(addr_str); + from_raw_fd(&listen_addr, fd)? } else { // not found let listener = bind(&listen_addr).await?; @@ -554,9 +558,18 @@ impl ListenerEndpointBuilder { // old-style "addr" entry. Without the removal, serialize() would send // old_fd twice (as "addr" and "addr#0"), causing a fd leak in the next // process that receives it but never uses the "addr" copy. + // add() moves thread_key out of inherited_pending (no-op here since + // thread_key was never in inherited_pending — it's a new key). + // remove(addr_str) clears the original inherited_pending entry. let mut table = fds_table.lock().await; table.add(thread_key, fd); table.remove(addr_str); + } else { + // FD is reused directly under its existing per-thread key. + // add() is not called, so mark_used() prevents close_unused_inherited() + // from treating this actively-used FD as stale. 
+ let mut table = fds_table.lock().await; + table.mark_used(&thread_key); } from_raw_fd(&listen_addr, fd)? } diff --git a/pingora-core/src/server/mod.rs b/pingora-core/src/server/mod.rs index ef4515c6..f0ab6a1d 100644 --- a/pingora-core/src/server/mod.rs +++ b/pingora-core/src/server/mod.rs @@ -259,7 +259,11 @@ impl Server { .ok(); if let Some(fds) = &self.listen_fds { - let fds = fds.lock().await; + let mut fds = fds.lock().await; + // Close inherited FDs that no listener claimed (ports removed from config). + // This frees ports for other processes and prevents stale FDs from being + // forwarded to the next upgrade generation. + fds.close_unused_inherited(); info!("Trying to send socks"); // XXX: this is blocking IO match fds.send_to_sock(self.configuration.as_ref().upgrade_sock.as_str()) { @@ -553,7 +557,48 @@ impl Server { .grace_period_seconds .unwrap_or(EXIT_TIMEOUT); info!("Graceful shutdown: grace period {}s starts", exit_timeout); - thread::sleep(Duration::from_secs(exit_timeout)); + + // Re-register signal handlers on the still-running server_runtime. + // main_loop's select! consumed the originals; re-registering here allows + // a second SIGTERM/SIGINT/SIGQUIT to interrupt a long grace period early. + let fast_exit = Arc::new(std::sync::atomic::AtomicBool::new(false)); + #[cfg(unix)] + { + let fast_exit = Arc::clone(&fast_exit); + server_runtime.get_handle().spawn(async move { + let sigterm = unix::signal(unix::SignalKind::terminate()); + let sigint = unix::signal(unix::SignalKind::interrupt()); + let sigquit = unix::signal(unix::SignalKind::quit()); + match (sigterm, sigint, sigquit) { + (Ok(mut sigterm), Ok(mut sigint), Ok(mut sigquit)) => { + tokio::select! 
{ + _ = sigterm.recv() => { + info!("SIGTERM received during grace period, exiting early"); + } + _ = sigint.recv() => { + info!("SIGINT received during grace period, exiting early"); + } + _ = sigquit.recv() => { + info!("SIGQUIT received during grace period, exiting early"); + } + } + fast_exit.store(true, std::sync::atomic::Ordering::Release); + } + _ => { + error!( + "Failed to register signal handlers; \ + grace period will not be interruptible by signals" + ); + } + } + }); + } + for _ in 0..exit_timeout { + if fast_exit.load(std::sync::atomic::Ordering::Acquire) { + break; + } + thread::sleep(Duration::from_secs(1)); + } info!("Graceful shutdown: grace period ends"); } diff --git a/pingora-core/src/server/transfer_fd/mod.rs b/pingora-core/src/server/transfer_fd/mod.rs index 5e616b70..264cb341 100644 --- a/pingora-core/src/server/transfer_fd/mod.rs +++ b/pingora-core/src/server/transfer_fd/mod.rs @@ -31,36 +31,80 @@ use std::{thread, time}; /// Container for open file descriptors and their associated bind addresses. pub struct Fds { + /// Actively used file descriptors, keyed by bind address. + /// Populated by `add()` and by listener setup via `mark_used()`. map: HashMap, + /// Inherited FDs not yet claimed by a listener. + /// Populated exclusively by `deserialize()`. When a listener claims an inherited + /// FD via `add()` or `mark_used()`, the entry is moved from here to `map`. + /// Any entries remaining at `close_unused_inherited()` time are stale (the + /// port was removed from the current config) and will be closed. + inherited_pending: HashMap, } impl Fds { pub fn new() -> Self { Fds { map: HashMap::new(), + inherited_pending: HashMap::new(), } } pub fn add(&mut self, bind: String, fd: RawFd) { + // If this key arrived via deserialize(), move it out of inherited_pending + // so close_unused_inherited() doesn't treat it as stale. 
+ self.inherited_pending.remove(&bind); self.map.insert(bind, fd); } pub fn get(&self, bind: &str) -> Option<&RawFd> { - self.map.get(bind) + // Claimed FDs live in map; inherited-but-not-yet-claimed ones live in + // inherited_pending. Check map first so post-claim lookups are fast. + self.map + .get(bind) + .or_else(|| self.inherited_pending.get(bind)) } pub fn remove(&mut self, bind: &str) { + self.inherited_pending.remove(bind); self.map.remove(bind); } + /// Mark a key as actively used. + /// + /// When an inherited FD is reused directly under its existing key (without going + /// through `add()`), call this to move the entry from `inherited_pending` to `map` + /// so that `close_unused_inherited()` does not treat it as stale. + pub fn mark_used(&mut self, key: &str) { + if let Some(fd) = self.inherited_pending.remove(key) { + self.map.insert(key.to_string(), fd); + } + } + pub fn serialize(&self) -> (Vec, Vec) { + // `inherited_pending` must be empty here — call `close_unused_inherited()` before + // `send_to_sock()` to ensure stale inherited FDs are not forwarded. self.map.iter().map(|(key, val)| (key.clone(), val)).unzip() } pub fn deserialize(&mut self, binds: Vec, fds: Vec) { assert_eq!(binds.len(), fds.len()); for (bind, fd) in binds.into_iter().zip(fds) { - self.map.insert(bind, fd); + // Inherited FDs live in inherited_pending until a listener claims them. + // They are NOT inserted into map until claimed via add() or mark_used(). + self.inherited_pending.insert(bind, fd); + } + } + + /// Close and remove all inherited FDs that were never claimed by a listener. + /// + /// Call this before `send_to_sock()` during a graceful upgrade so that stale FDs + /// (ports removed from the current config) are neither kept open nor forwarded to + /// the next process generation. 
+ pub fn close_unused_inherited(&mut self) { + for (key, fd) in self.inherited_pending.drain() { + log::info!("Closing unclaimed inherited fd {} for {}", fd, key); + let _ = nix::unistd::close(fd); } } @@ -82,6 +126,14 @@ impl Fds { } } +impl Drop for Fds { + fn drop(&mut self) { + // Safety net: close any unclaimed inherited FDs remaining in inherited_pending. + // FDs in map are owned by their respective listeners and must NOT be closed here. + self.close_unused_inherited(); + } +} + fn serialize_vec_string(vec_string: &[String]) -> Vec { // Space-separated serialization. Uses dynamic allocation to avoid silent truncation. vec_string.join(" ").into_bytes() @@ -619,18 +671,22 @@ mod tests { #[test] fn test_table_serde() { init_log(); - let mut fds = Fds::new(); + let fd1 = make_pipe_fd(); + let fd2 = make_pipe_fd(); let key1 = "1.1.1.1:80".to_string(); - fds.add(key1.clone(), 128); let key2 = "1.1.1.1:443".to_string(); - fds.add(key2.clone(), 129); + let mut fds = Fds::new(); + fds.add(key1.clone(), fd1); + fds.add(key2.clone(), fd2); let (k, v) = fds.serialize(); let mut fds2 = Fds::new(); fds2.deserialize(k, v); - assert_eq!(128, *fds2.get(&key1).unwrap()); - assert_eq!(129, *fds2.get(&key2).unwrap()); + assert_eq!(fd1, *fds2.get(&key1).unwrap()); + assert_eq!(fd2, *fds2.get(&key2).unwrap()); + // fds2 drops here: close_unused_inherited() closes fd1 and fd2 (inherited, unclaimed). + // fds drops here: its map entries are NOT closed (add()-owned fds, correct). } #[test] @@ -779,4 +835,133 @@ mod tests { elapsed ); } + + // ── inherited_pending / close_unused_inherited tests ─────────────────── + // + // These tests use real pipe fds (via nix::unistd::pipe()) as the "inherited" fds + // so that close_unused_inherited() operates on fds we actually own, not arbitrary + // integers that might alias open fds belonging to other test threads. + // Each test only checks map state — not fd liveness — to avoid races between + // close() and the OS reusing the fd number. 
+ + fn make_pipe_fd() -> RawFd { + let (read_end, write_end) = nix::unistd::pipe().expect("pipe()"); + // Close the read end immediately; we only need a single real fd for tests. + let _ = nix::unistd::close(read_end); + write_end + } + + #[test] + fn test_close_unused_inherited_removes_unclaimed() { + init_log(); + let fd_stale = make_pipe_fd(); + let fd_active = make_pipe_fd(); + let mut fds = Fds::new(); + fds.deserialize( + vec!["0.0.0.0:8080".to_string(), "0.0.0.0:9090".to_string()], + vec![fd_stale, fd_active], + ); + // Claim one via add() so it is moved from inherited_pending to map. + fds.add("0.0.0.0:9090".to_string(), fd_active); + + fds.close_unused_inherited(); // closes fd_stale, keeps fd_active + + assert!( + fds.get("0.0.0.0:8080").is_none(), + "unclaimed key should not be accessible after close_unused_inherited()" + ); + assert_eq!( + *fds.get("0.0.0.0:9090").unwrap(), + fd_active, + "claimed key should remain in map" + ); + // fd_active is still open — close it to avoid a resource leak. 
+ let _ = nix::unistd::close(fd_active); + } + + #[test] + fn test_serialize_after_cleanup_excludes_stale_fds() { + init_log(); + let fd_stale = make_pipe_fd(); + let fd_active = make_pipe_fd(); + let mut fds = Fds::new(); + fds.deserialize( + vec!["0.0.0.0:1111".to_string(), "0.0.0.0:2222".to_string()], + vec![fd_stale, fd_active], + ); + fds.add("0.0.0.0:2222".to_string(), fd_active); + + fds.close_unused_inherited(); // closes fd_stale + + let (keys, raw_fds) = fds.serialize(); + assert_eq!(keys.len(), 1, "only the active FD should remain"); + assert_eq!(keys[0], "0.0.0.0:2222"); + assert_eq!(raw_fds[0], fd_active); + let _ = nix::unistd::close(fd_active); + } + + #[test] + fn test_mark_used_prevents_removal() { + init_log(); + let fd = make_pipe_fd(); + let mut fds = Fds::new(); + fds.deserialize(vec!["0.0.0.0:3333".to_string()], vec![fd]); + + fds.mark_used("0.0.0.0:3333"); + fds.close_unused_inherited(); // should NOT close fd (was marked used) + + assert_eq!( + *fds.get("0.0.0.0:3333").unwrap(), + fd, + "mark_used() key should remain in map after close_unused_inherited()" + ); + let _ = nix::unistd::close(fd); + } + + #[test] + fn test_remove_clears_inherited_pending() { + init_log(); + let fd = make_pipe_fd(); + let mut fds = Fds::new(); + fds.deserialize(vec!["0.0.0.0:4444".to_string()], vec![fd]); + + fds.remove("0.0.0.0:4444"); + // Should be a no-op — no panics, no double-close. + fds.close_unused_inherited(); + + assert!(fds.get("0.0.0.0:4444").is_none()); + // In production, remove() is called to clean up old-style addr keys after the + // fd was already re-registered under a new per-thread key via add(). + // Here we close the fd directly to avoid a resource leak in the test. 
+ let _ = nix::unistd::close(fd); + } + + #[test] + fn test_add_removes_from_inherited_pending() { + init_log(); + let fd = make_pipe_fd(); + let mut fds = Fds::new(); + fds.deserialize(vec!["0.0.0.0:5555".to_string()], vec![fd]); + + fds.add("0.0.0.0:5555".to_string(), fd); + fds.close_unused_inherited(); // should NOT close fd + + assert_eq!( + *fds.get("0.0.0.0:5555").unwrap(), + fd, + "add()-claimed key should not be removed from map" + ); + let _ = nix::unistd::close(fd); + } + + #[test] + fn test_close_unused_inherited_idempotent() { + init_log(); + let fd = make_pipe_fd(); + let mut fds = Fds::new(); + fds.deserialize(vec!["0.0.0.0:6666".to_string()], vec![fd]); + fds.close_unused_inherited(); // closes fd + fds.close_unused_inherited(); // second call must be a no-op, no EBADF abort + assert!(fds.get("0.0.0.0:6666").is_none()); + } } diff --git a/pingora-core/tests/server_phase_early_exit.rs b/pingora-core/tests/server_phase_early_exit.rs new file mode 100644 index 00000000..03bd2b2c --- /dev/null +++ b/pingora-core/tests/server_phase_early_exit.rs @@ -0,0 +1,104 @@ +// Copyright 2026 Cloudflare, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// NOTE: This test sends a shutdown signal to itself, +// so it needs to be in an isolated test to prevent concurrency. 
+ +use pingora_core::server::{configuration::ServerConf, ExecutionPhase, RunArgs, Server}; + +/// Verify that a signal received **during** the grace period causes the server to exit +/// early instead of waiting for the full grace period to elapse. +/// +/// Before this fix, `run()` used a bare `thread::sleep(grace_period)` after consuming +/// the signal handlers in `main_loop`, so no signal could interrupt it. +#[test] +fn test_signal_during_grace_period_exits_early() { + let conf = ServerConf { + // Long grace period — the fix must cause early exit well before this. + grace_period_seconds: Some(30), + graceful_shutdown_timeout_seconds: Some(1), + ..Default::default() + }; + let mut server = Server::new_with_opt_and_conf(None, conf); + + let mut phase = server.watch_execution_phase(); + + let join = std::thread::spawn(move || { + server.bootstrap(); + server.run(RunArgs::default()); + }); + + assert!(matches!( + phase.blocking_recv().unwrap(), + ExecutionPhase::Bootstrap + )); + assert!(matches!( + phase.blocking_recv().unwrap(), + ExecutionPhase::BootstrapComplete, + )); + assert!(matches!( + phase.blocking_recv().unwrap(), + ExecutionPhase::Running, + )); + + // Wait for the main_loop signal handlers to be installed. + std::thread::sleep(std::time::Duration::from_millis(500)); + + // Trigger a graceful shutdown with a 30-second grace period. + unsafe { + libc::raise(libc::SIGTERM); + } + + assert!(matches!( + phase.blocking_recv().unwrap(), + ExecutionPhase::GracefulTerminate, + )); + assert!(matches!( + phase.blocking_recv().unwrap(), + ExecutionPhase::ShutdownStarted, + )); + assert!(matches!( + phase.blocking_recv().unwrap(), + ExecutionPhase::ShutdownGracePeriod, + )); + + // The signal watcher is a task spawned on server_runtime once the grace period begins. + // Wait long enough for that task to have registered its handlers before sending the + // interrupting signal; the 1-second polling loop then observes fast_exit and exits. 
+ std::thread::sleep(std::time::Duration::from_millis(1500)); + + let interrupt_at = std::time::Instant::now(); + unsafe { + libc::raise(libc::SIGTERM); + } + + assert!(matches!( + phase.blocking_recv().unwrap(), + ExecutionPhase::ShutdownRuntimes, + )); + let elapsed = interrupt_at.elapsed(); + + // Must exit well before the remaining 28+ seconds of the grace period. + assert!( + elapsed.as_secs() < 5, + "expected early exit in <5s after second SIGTERM, took {elapsed:?}" + ); + + join.join().unwrap(); + + assert!(matches!( + phase.blocking_recv().unwrap(), + ExecutionPhase::Terminated, + )); +} diff --git a/pingora-core/tests/server_phase_gracefulupgrade.rs b/pingora-core/tests/server_phase_gracefulupgrade.rs new file mode 100644 index 00000000..aeac75ce --- /dev/null +++ b/pingora-core/tests/server_phase_gracefulupgrade.rs @@ -0,0 +1,90 @@ +// Copyright 2026 Cloudflare, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// NOTE: This test sends a shutdown signal to itself, +// so it needs to be in an isolated test to prevent concurrency. + +use pingora_core::server::{configuration::ServerConf, ExecutionPhase, RunArgs, Server}; + +/// Verify that SIGQUIT drives the server through the graceful-upgrade execution +/// phases: FD transfer, close timeout, shutdown start, grace period (kept short via +/// `grace_period_seconds`), runtime shutdown, and finally termination. 
+#[test] +fn test_server_execution_phase_monitor_graceful_upgrade() { + let conf = ServerConf { + // Short grace period so the test finishes quickly. + grace_period_seconds: Some(1), + graceful_shutdown_timeout_seconds: Some(1), + ..Default::default() + }; + let mut server = Server::new_with_opt_and_conf(None, conf); + + let mut phase = server.watch_execution_phase(); + + let join = std::thread::spawn(move || { + server.bootstrap(); + server.run(RunArgs::default()); + }); + + assert!(matches!( + phase.blocking_recv().unwrap(), + ExecutionPhase::Bootstrap + )); + assert!(matches!( + phase.blocking_recv().unwrap(), + ExecutionPhase::BootstrapComplete, + )); + assert!(matches!( + phase.blocking_recv().unwrap(), + ExecutionPhase::Running, + )); + + // Wait for signal handlers to be installed. + std::thread::sleep(std::time::Duration::from_millis(500)); + + unsafe { + libc::raise(libc::SIGQUIT); + } + + // Server tries to forward FDs to a new process (none exists, send_to_sock will fail, + // but the phase progression continues regardless). + assert!(matches!( + phase.blocking_recv().unwrap(), + ExecutionPhase::GracefulUpgradeTransferringFds, + )); + // CLOSE_TIMEOUT (5 s) elapses so the new process can bind before we stop accepting. + assert!(matches!( + phase.blocking_recv().unwrap(), + ExecutionPhase::GracefulUpgradeCloseTimeout, + )); + assert!(matches!( + phase.blocking_recv().unwrap(), + ExecutionPhase::ShutdownStarted, + )); + assert!(matches!( + phase.blocking_recv().unwrap(), + ExecutionPhase::ShutdownGracePeriod, + )); + assert!(matches!( + phase.blocking_recv().unwrap(), + ExecutionPhase::ShutdownRuntimes, + )); + + join.join().unwrap(); + + assert!(matches!( + phase.blocking_recv().unwrap(), + ExecutionPhase::Terminated, + )); +}