From d8fc5074a6d14438cdbeb921884bf370cf46cf79 Mon Sep 17 00:00:00 2001 From: Nipunn Koorapati Date: Fri, 29 May 2026 16:11:50 -0700 Subject: [PATCH 1/2] fix(pool): prevent num_idle underflow that wedges maintenance in a CPU spin MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes #3645 - A rare 100% CPU and a hang on shutdown. I made this fix w/ the assistance of an LLM, but I was looking quite carefully to make sure the code changes make sense. I opted to repro via a stress test - which shows the underflow if you run the test without the fix. Leaving an LLM generated description down below in case you find it helpful. --------------------------------------------------------------------- `PoolInner::release` made a returned connection acquirable (push to the idle queue + release its semaphore permit) *before* incrementing `num_idle`. A concurrent `pop_idle` (via `try_acquire`) could pop that connection and run its `num_idle.fetch_sub(1)` in the window before the increment landed; if `num_idle` was 0 at that moment the `usize` wrapped to `usize::MAX`. The background maintenance task builds its sweep range from `for _ in 0..pool.num_idle()`, and once the idle queue drains the loop body is fully synchronous (`try_acquire`/`release` never `.await`). So a single bad read of `usize::MAX` makes the task spin ~10^19 iterations without yielding, pinning a worker thread forever and never observing the pool close. This is wide open on shutdown, where dropping every `PoolConnection` spawns a `release`, racing the maintenance tick. A `MultiDbPool` (one pool per database) wedges several tasks at once, pegging every worker thread — the reported 100%-CPU-on-shutdown symptom. Fixes, in `sqlx-core/src/pool/inner.rs`: - release: increment `num_idle` *before* publishing the connection. With this order each completed `push` is preceded by its `fetch_add`, so `#dec <= #pop <= #push <= #inc` and `num_idle = #inc - #dec >= 0` always. Transient over-counting is harmless (`pop_idle` finds an empty queue and returns the permit without decrementing). The permit is still released after the push, preserving the woken-waiter-sees-the-connection invariant. - pop_idle: saturating decrement, so a stray `fetch_sub` can never wrap even if a future change reintroduces an ordering bug. Adds a multi-threaded stress test that drives the real `try_acquire` and `release` paths on a small, heavily oversubscribed pool (using the in-crate `Any` database with a stub backend, so no DB or runtime is needed) and asserts the sampled `num_idle` never exceeds `max_connections`. It catches the wrapped `usize::MAX` reliably on the original code (5/5 debug, 3/3 release runs) and passes with the fix. Co-Authored-By: Claude Opus 4.8 (1M context) --- sqlx-core/src/pool/inner.rs | 195 +++++++++++++++++++++++++++++++++++- 1 file changed, 192 insertions(+), 3 deletions(-) diff --git a/sqlx-core/src/pool/inner.rs b/sqlx-core/src/pool/inner.rs index b698dc9df0..f19bf953ba 100644 --- a/sqlx-core/src/pool/inner.rs +++ b/sqlx-core/src/pool/inner.rs @@ -194,7 +194,14 @@ impl PoolInner { permit: AsyncSemaphoreReleaser<'a>, ) -> Result>, AsyncSemaphoreReleaser<'a>> { if let Some(idle) = self.idle_conns.pop() { - self.num_idle.fetch_sub(1, Ordering::AcqRel); + // Saturating: never underflow even if a concurrent `release` hasn't yet published + // its increment. An underflow would wrap `num_idle` to `usize::MAX` and wedge the + // maintenance task in a non-yielding spin (see `release` for the full invariant). + let _ = self.num_idle.fetch_update( + Ordering::AcqRel, + Ordering::Acquire, + |n| Some(n.saturating_sub(1)), + ); Ok(Floating::from_idle(idle, (*self).clone(), permit)) } else { Err(permit) @@ -206,6 +213,14 @@ impl PoolInner { let Floating { inner: idle, guard } = floating.into_idle(); + // Bump the idle counter *before* the connection becomes acquirable, so a concurrent + // `pop_idle` can never observe a popped connection without a matching increment. + // (Otherwise `num_idle.fetch_sub` can underflow a `usize` to `usize::MAX`, which makes + // the maintenance task's `for _ in 0..num_idle()` loop spin ~forever, pegging a CPU.) + // Over-counting transiently (incremented, not yet pushed) is harmless: `pop_idle` + // simply finds an empty queue and returns the permit without decrementing. + self.num_idle.fetch_add(1, Ordering::AcqRel); + if self.idle_conns.push(idle).is_err() { panic!("BUG: connection queue overflow in release()"); } @@ -213,8 +228,6 @@ impl PoolInner { // NOTE: we need to make sure we drop the permit *after* we push to the idle queue // don't decrease the size guard.release_permit(); - - self.num_idle.fetch_add(1, Ordering::AcqRel); } /// Try to atomically increment the pool size for a new connection. @@ -621,3 +634,179 @@ impl Drop for DecrementSizeGuard { } } } + +// Uses the in-crate `Any` database with a stub backend so we can drive `PoolInner` internals +// directly. (We can't use a real driver here: the only `Database` impls outside this crate +// would be a different `sqlx-core` instance via the dev-dependency cycle.) +#[cfg(all(test, feature = "any"))] +mod underflow_tests { + use super::*; + use crate::any::{ + Any, AnyArguments, AnyConnectOptions, AnyConnection, AnyConnectionBackend, AnyQueryResult, + AnyRow, AnyStatement, AnyTypeInfo, + }; + use crate::pool::Pool; + use crate::sql_str::SqlStr; + use either::Either; + use futures_core::future::BoxFuture; + use futures_core::stream::BoxStream; + use std::str::FromStr; + + /// A backend that constructs but never executes anything. The pool's `release`/`pop_idle` + /// paths only move the opaque connection around — they never call any of these methods. + #[derive(Debug)] + struct StubBackend; + + impl AnyConnectionBackend for StubBackend { + fn name(&self) -> &str { + "stub" + } + fn close(self: Box) -> BoxFuture<'static, crate::Result<()>> { + unimplemented!() + } + fn close_hard(self: Box) -> BoxFuture<'static, crate::Result<()>> { + unimplemented!() + } + fn ping(&mut self) -> BoxFuture<'_, crate::Result<()>> { + unimplemented!() + } + fn begin(&mut self, _statement: Option) -> BoxFuture<'_, crate::Result<()>> { + unimplemented!() + } + fn commit(&mut self) -> BoxFuture<'_, crate::Result<()>> { + unimplemented!() + } + fn rollback(&mut self) -> BoxFuture<'_, crate::Result<()>> { + unimplemented!() + } + fn start_rollback(&mut self) { + unimplemented!() + } + fn shrink_buffers(&mut self) { + unimplemented!() + } + fn flush(&mut self) -> BoxFuture<'_, crate::Result<()>> { + unimplemented!() + } + fn should_flush(&self) -> bool { + unimplemented!() + } + fn fetch_many( + &mut self, + _query: SqlStr, + _persistent: bool, + _arguments: Option, + ) -> BoxStream<'_, crate::Result>> { + unimplemented!() + } + fn fetch_optional( + &mut self, + _query: SqlStr, + _persistent: bool, + _arguments: Option, + ) -> BoxFuture<'_, crate::Result>> { + unimplemented!() + } + fn prepare_with<'c, 'q: 'c>( + &'c mut self, + _sql: SqlStr, + _parameters: &[AnyTypeInfo], + ) -> BoxFuture<'c, crate::Result> { + unimplemented!() + } + #[cfg(feature = "offline")] + fn describe( + &mut self, + _sql: SqlStr, + ) -> BoxFuture<'_, crate::Result>> { + unimplemented!() + } + } + + fn make_live(inner: &Arc>) -> Floating> { + // Mint a live connection like `acquire` would: take a size slot + a permit, wrap a + // stub connection. The pool's `release`/`try_acquire` never touch the connection + // itself, so a stub backend that panics on use is fine. + inner.size.fetch_add(1, Ordering::AcqRel); + let permit = inner + .semaphore + .try_acquire(1) + .expect("a permit should be available"); + let guard = DecrementSizeGuard::from_permit(Arc::clone(inner), permit); + let conn = AnyConnection { + backend: Box::new(StubBackend), + }; + Floating::new_live(conn, guard) + } + + // Stress test for the `num_idle` underflow race, exercised entirely through the real + // `try_acquire` and `release` paths. + // + // The bug: `release` published a returned connection (push to the idle queue + release its + // semaphore permit) *before* incrementing `num_idle`. A concurrent `try_acquire` could pop + // that connection and run its `pop_idle` decrement while `num_idle` was still 0, wrapping + // the `usize` to `usize::MAX`. Downstream, the maintenance task's `for _ in 0..num_idle()` + // sweep would then spin ~forever on a synchronous body, pinning a worker thread (the + // reported 100%-CPU-on-shutdown symptom). + // + // Many threads hammer acquire+release on a small, heavily oversubscribed pool, so the idle + // count is constantly driven to 0 right as another thread is publishing a connection — + // exactly the interleaving that underflows. Every thread samples `num_idle()` after each + // op and records the largest value ever seen; an underflow shows up as a value far above + // `max_connections`. This requires real OS threads: the race window in `release` has no + // `.await`, so it cannot interleave on a single-threaded executor. + #[test] + fn release_try_acquire_stress_never_underflows_num_idle() { + use std::sync::atomic::AtomicUsize; + use std::thread; + + const MAX_CONNS: u32 = 3; + const WORKERS: usize = 12; + const ITERS: usize = 200_000; + + let opts = AnyConnectOptions::from_str("stub::memory:").expect("parse url"); + let pool: Pool = PoolOptions::new() + .max_connections(MAX_CONNS) + .min_connections(0) + .connect_lazy_with(opts); + let inner = Arc::clone(&pool.0); + + // Pre-fill the pool with `MAX_CONNS` idle connections via the real `release` path. + for _ in 0..MAX_CONNS { + inner.release(make_live(&inner)); + } + assert_eq!(inner.num_idle(), MAX_CONNS as usize); + + // Sticky high-water mark of `num_idle` observed across all threads. A wrapped + // (underflowed) read is `usize::MAX`, far above `MAX_CONNS`. + let max_seen = Arc::new(AtomicUsize::new(0)); + + let mut handles = Vec::new(); + for _ in 0..WORKERS { + let inner = Arc::clone(&inner); + let max_seen = Arc::clone(&max_seen); + handles.push(thread::spawn(move || { + for _ in 0..ITERS { + // Real acquire/release cycle, racing the other threads. + if let Some(idle) = inner.try_acquire() { + inner.release(idle.into_live()); + } + max_seen.fetch_max(inner.num_idle(), Ordering::Relaxed); + } + })); + } + for h in handles { + h.join().expect("worker thread panicked"); + } + + let observed = max_seen.load(Ordering::Relaxed); + assert!( + observed <= MAX_CONNS as usize, + "num_idle exceeded max_connections (saw {observed}, max {MAX_CONNS}); \ + this indicates a `num_idle` underflow" + ); + + // Drain so the stub connections' size slots are released before the pool drops. + while inner.try_acquire().is_some() {} + } +} From c714af63e54bf1bb124cf592fec413c596de2118 Mon Sep 17 00:00:00 2001 From: Nipunn Koorapati Date: Fri, 29 May 2026 18:45:27 -0700 Subject: [PATCH 2/2] Formatting --- sqlx-core/src/pool/inner.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sqlx-core/src/pool/inner.rs b/sqlx-core/src/pool/inner.rs index f19bf953ba..e47663d236 100644 --- a/sqlx-core/src/pool/inner.rs +++ b/sqlx-core/src/pool/inner.rs @@ -197,11 +197,11 @@ impl PoolInner { // Saturating: never underflow even if a concurrent `release` hasn't yet published // its increment. An underflow would wrap `num_idle` to `usize::MAX` and wedge the // maintenance task in a non-yielding spin (see `release` for the full invariant). - let _ = self.num_idle.fetch_update( - Ordering::AcqRel, - Ordering::Acquire, - |n| Some(n.saturating_sub(1)), - ); + let _ = self + .num_idle + .fetch_update(Ordering::AcqRel, Ordering::Acquire, |n| { + Some(n.saturating_sub(1)) + }); Ok(Floating::from_idle(idle, (*self).clone(), permit)) } else { Err(permit)