Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 192 additions & 3 deletions sqlx-core/src/pool/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,14 @@ impl<DB: Database> PoolInner<DB> {
permit: AsyncSemaphoreReleaser<'a>,
) -> Result<Floating<DB, Idle<DB>>, AsyncSemaphoreReleaser<'a>> {
if let Some(idle) = self.idle_conns.pop() {
self.num_idle.fetch_sub(1, Ordering::AcqRel);
// Saturating: never underflow even if a concurrent `release` hasn't yet published
// its increment. An underflow would wrap `num_idle` to `usize::MAX` and wedge the
// maintenance task in a non-yielding spin (see `release` for the full invariant).
let _ = self
.num_idle
.fetch_update(Ordering::AcqRel, Ordering::Acquire, |n| {
Some(n.saturating_sub(1))
});
Ok(Floating::from_idle(idle, (*self).clone(), permit))
} else {
Err(permit)
Expand All @@ -206,15 +213,21 @@ impl<DB: Database> PoolInner<DB> {

let Floating { inner: idle, guard } = floating.into_idle();

// Bump the idle counter *before* the connection becomes acquirable, so a concurrent
// `pop_idle` can never observe a popped connection without a matching increment.
// (Otherwise `num_idle.fetch_sub` can underflow a `usize` to `usize::MAX`, which makes
// the maintenance task's `for _ in 0..num_idle()` loop spin ~forever, pegging a CPU.)
// Over-counting transiently (incremented, not yet pushed) is harmless: `pop_idle`
// simply finds an empty queue and returns the permit without decrementing.
self.num_idle.fetch_add(1, Ordering::AcqRel);

if self.idle_conns.push(idle).is_err() {
panic!("BUG: connection queue overflow in release()");
}

// NOTE: we need to make sure we drop the permit *after* we push to the idle queue
// don't decrease the size
guard.release_permit();

self.num_idle.fetch_add(1, Ordering::AcqRel);
}

/// Try to atomically increment the pool size for a new connection.
Expand Down Expand Up @@ -621,3 +634,179 @@ impl<DB: Database> Drop for DecrementSizeGuard<DB> {
}
}
}

// Uses the in-crate `Any` database with a stub backend so we can drive `PoolInner` internals
// directly. (We can't use a real driver here: the only `Database` impls outside this crate
// would be a different `sqlx-core` instance via the dev-dependency cycle.)
#[cfg(all(test, feature = "any"))]
mod underflow_tests {
use super::*;
use crate::any::{
Any, AnyArguments, AnyConnectOptions, AnyConnection, AnyConnectionBackend, AnyQueryResult,
AnyRow, AnyStatement, AnyTypeInfo,
};
use crate::pool::Pool;
use crate::sql_str::SqlStr;
use either::Either;
use futures_core::future::BoxFuture;
use futures_core::stream::BoxStream;
use std::str::FromStr;

/// Test-only stand-in for a database driver: it can be constructed but never executes
/// anything. The pool's `release`/`pop_idle` paths only move the opaque connection around —
/// they never call any of the backend methods, so every operation other than `name` is left
/// as `unimplemented!()` and panics if it is ever reached.
#[derive(Debug)]
struct StubBackend;

impl AnyConnectionBackend for StubBackend {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd welcome advice on how to write the test here — this is a bit gnarly, but it worked.

// The only method with a real body: returns the fixed backend name `"stub"`.
fn name(&self) -> &str {
"stub"
}
// Every method below is a deliberate stub. The stress test only moves the connection between
// the pool's idle queue and its callers, so reaching any of these bodies means the test (or
// the pool path under test) started using the connection itself; `unimplemented!()` makes
// that fail loudly with a panic instead of silently succeeding.
fn close(self: Box<Self>) -> BoxFuture<'static, crate::Result<()>> {
unimplemented!()
}
fn close_hard(self: Box<Self>) -> BoxFuture<'static, crate::Result<()>> {
unimplemented!()
}
fn ping(&mut self) -> BoxFuture<'_, crate::Result<()>> {
unimplemented!()
}
fn begin(&mut self, _statement: Option<SqlStr>) -> BoxFuture<'_, crate::Result<()>> {
unimplemented!()
}
fn commit(&mut self) -> BoxFuture<'_, crate::Result<()>> {
unimplemented!()
}
fn rollback(&mut self) -> BoxFuture<'_, crate::Result<()>> {
unimplemented!()
}
fn start_rollback(&mut self) {
unimplemented!()
}
fn shrink_buffers(&mut self) {
unimplemented!()
}
fn flush(&mut self) -> BoxFuture<'_, crate::Result<()>> {
unimplemented!()
}
fn should_flush(&self) -> bool {
unimplemented!()
}
fn fetch_many(
&mut self,
_query: SqlStr,
_persistent: bool,
_arguments: Option<AnyArguments>,
) -> BoxStream<'_, crate::Result<Either<AnyQueryResult, AnyRow>>> {
unimplemented!()
}
fn fetch_optional(
&mut self,
_query: SqlStr,
_persistent: bool,
_arguments: Option<AnyArguments>,
) -> BoxFuture<'_, crate::Result<Option<AnyRow>>> {
unimplemented!()
}
fn prepare_with<'c, 'q: 'c>(
&'c mut self,
_sql: SqlStr,
_parameters: &[AnyTypeInfo],
) -> BoxFuture<'c, crate::Result<AnyStatement>> {
unimplemented!()
}
// Only compiled when the `offline` feature is enabled; stubbed like the rest.
#[cfg(feature = "offline")]
fn describe(
&mut self,
_sql: SqlStr,
) -> BoxFuture<'_, crate::Result<crate::describe::Describe<Any>>> {
unimplemented!()
}
}

/// Builds a live (checked-out) connection the way `acquire` would, without a real driver.
///
/// Reserves one slot in the pool's `size` counter, takes one semaphore permit, ties both to a
/// `DecrementSizeGuard`, and wraps a `StubBackend`-backed `AnyConnection` in a `Floating`.
///
/// # Panics
///
/// Panics if the pool's semaphore has no permit available.
fn make_live(inner: &Arc<PoolInner<Any>>) -> Floating<Any, Live<Any>> {
    // Reserve the size slot first, then the permit, then hand both to the guard.
    inner.size.fetch_add(1, Ordering::AcqRel);
    let slot_permit = inner
        .semaphore
        .try_acquire(1)
        .expect("a permit should be available");
    let size_guard = DecrementSizeGuard::from_permit(Arc::clone(inner), slot_permit);

    // `release`/`try_acquire` never touch the connection itself, so a backend that panics
    // on use is sufficient here.
    Floating::new_live(
        AnyConnection {
            backend: Box::new(StubBackend),
        },
        size_guard,
    )
}

// Regression stress test for the `num_idle` underflow race. Everything goes through the real
// `try_acquire` and `release` paths.
//
// Background: `release` used to publish a returned connection (push onto the idle queue and
// release its semaphore permit) *before* incrementing `num_idle`. A concurrent `try_acquire`
// could then pop that connection and perform the `pop_idle` decrement while `num_idle` was
// still 0, wrapping the `usize` to `usize::MAX`. The maintenance task's
// `for _ in 0..num_idle()` sweep would subsequently spin ~forever on a synchronous body and
// pin a worker thread (the reported 100%-CPU-on-shutdown symptom).
//
// Approach: many threads hammer acquire+release on a small, heavily oversubscribed pool, so
// the idle count keeps hitting 0 just as another thread is publishing a connection — the
// exact interleaving that underflows. After every operation each thread samples `num_idle()`
// and folds it into a shared maximum; an underflow appears as a value far above
// `max_connections`. Real OS threads are required: the race window in `release` contains no
// `.await`, so it cannot interleave on a single-threaded executor.
#[test]
fn release_try_acquire_stress_never_underflows_num_idle() {
    use std::sync::atomic::AtomicUsize;
    use std::thread;

    const MAX_CONNS: u32 = 3;
    const WORKERS: usize = 12;
    const ITERS: usize = 200_000;

    let connect_opts = AnyConnectOptions::from_str("stub::memory:").expect("parse url");
    let pool: Pool<Any> = PoolOptions::new()
        .max_connections(MAX_CONNS)
        .min_connections(0)
        .connect_lazy_with(connect_opts);
    let inner = Arc::clone(&pool.0);

    // Seed the pool with `MAX_CONNS` idle connections, using the real `release` path.
    (0..MAX_CONNS).for_each(|_| {
        inner.release(make_live(&inner));
    });
    assert_eq!(inner.num_idle(), MAX_CONNS as usize);

    // Largest `num_idle` value any thread has ever observed. It only grows, so a single
    // wrapped (underflowed) read of `usize::MAX` is retained until the final assertion.
    let high_water = Arc::new(AtomicUsize::new(0));

    // Spawn every worker up front, then join them all.
    let workers: Vec<_> = (0..WORKERS)
        .map(|_| {
            let inner = Arc::clone(&inner);
            let high_water = Arc::clone(&high_water);
            thread::spawn(move || {
                for _ in 0..ITERS {
                    // One real acquire/release cycle, racing against the other workers.
                    if let Some(idle) = inner.try_acquire() {
                        inner.release(idle.into_live());
                    }
                    high_water.fetch_max(inner.num_idle(), Ordering::Relaxed);
                }
            })
        })
        .collect();
    for worker in workers {
        worker.join().expect("worker thread panicked");
    }

    let observed = high_water.load(Ordering::Relaxed);
    assert!(
        observed <= MAX_CONNS as usize,
        "num_idle exceeded max_connections (saw {observed}, max {MAX_CONNS}); \
         this indicates a `num_idle` underflow"
    );

    // Drain the idle queue so the stub connections' size slots are released before the pool
    // is dropped.
    while let Some(idle) = inner.try_acquire() {
        drop(idle);
    }
}
}
Loading