From e8a22cd029d833af022b4a671f718cc99ca09dab Mon Sep 17 00:00:00 2001 From: Rohit Date: Sat, 16 May 2026 11:59:41 +0530 Subject: [PATCH 1/2] feat: add backend retry failover handling --- src/proxy/tcp.rs | 58 ++++++++++++++++++++++++++++++++------------ src/state/app.rs | 4 ++- src/state/backend.rs | 6 +++++ 3 files changed, 51 insertions(+), 17 deletions(-) diff --git a/src/proxy/tcp.rs b/src/proxy/tcp.rs index 0bd013c..17183c8 100644 --- a/src/proxy/tcp.rs +++ b/src/proxy/tcp.rs @@ -1,4 +1,4 @@ -use anyhow::Ok; + use tokio::{ io::copy_bidirectional, net::{TcpListener, TcpStream}, @@ -28,26 +28,52 @@ pub async fn start_tcp_proxy(address: &str, state: SharedAppState) -> anyhow::Re } async fn handle_connection(mut stream: TcpStream, state: SharedAppState) -> anyhow::Result<()> { - let guard = { + let retry_attempt = { let state = state.read().await; - let upstream = &state.upstreams[0]; - let backend_arc = match upstream.next_backend() { - Some(backend) => backend.clone(), - None => { - error!("no healthy backend available"); - return Ok(()); - } - }; - ConnectionGuard::new(backend_arc) - // format!("{}:{}", backend.config.host, backend.config.port) + state.retry_attempts }; - let backend_address = guard.address(); - info!("forwarding traffic to {}", backend_address); + // retry attempts mean how many times we should try + // connecting the client to a suitable backend server + // + // if a backend is available: + // route the traffic normally + // + // if connection fails: + // mark that backend unhealthy so future selections skip it + for _ in 0..retry_attempt { + let guard = { + let state = state.read().await; + let upstream = &state.upstreams[0]; + let backend_arc = match upstream.next_backend() { + Some(backend) => backend, + None => { + error!("no healthy backend available"); + return Ok(()); + } + }; + ConnectionGuard::new(backend_arc) + // format!("{}:{}", backend.config.host, backend.config.port) + }; + let backend_address = guard.address(); - let mut backend_stream = TcpStream::connect(&backend_address).await?; + info!("forwarding traffic to {}", backend_address); - copy_bidirectional(&mut stream, &mut backend_stream).await?; + match TcpStream::connect(&backend_address).await { + Ok(mut backend_stream) => { + copy_bidirectional(&mut stream, &mut backend_stream).await?; + return Ok(()); + } + Err(error) => { + // this is very important .. + guard.mark_backend_unhealthy(); + error!("failed to connect to backend {}: {:?}", backend_address, error); + // retry another + continue; + } + } + } + error!("all backend retry attempts failed"); Ok(()) } diff --git a/src/state/app.rs b/src/state/app.rs index 8acd020..7412eac 100644 --- a/src/state/app.rs +++ b/src/state/app.rs @@ -38,6 +38,7 @@ impl UpstreamPool { // - admin APIs #[derive(Debug)] pub struct AppState { + pub retry_attempts: usize, pub upstreams: Vec, } @@ -63,12 +64,13 @@ impl AppState { id: upstream.id, current_index: AtomicUsize::new(0), algorithm: upstream.algorithm, + backends, // all backends belonging to a single upstream type ( single logical service) } }) .collect(); - Self { upstreams } + Self { upstreams, retry_attempts: config.load_balancer.retry_attempts } } } diff --git a/src/state/backend.rs b/src/state/backend.rs index b63c927..34f68ff 100644 --- a/src/state/backend.rs +++ b/src/state/backend.rs @@ -36,10 +36,16 @@ impl ConnectionGuard { Self { backend } } + pub fn backend_id(&self) -> &str { + &self.backend.config.id + } // get the address from the guard pub fn address(&self) -> String { format!("{}:{}", self.backend.config.host, self.backend.config.port) } + pub fn mark_backend_unhealthy(&self) { + self.backend.healthy.store(false, Ordering::Relaxed); + } } impl Drop for ConnectionGuard { From 0aa7c5431fc9512276e25b04f14dd11dd5839da7 Mon Sep 17 00:00:00 2001 From: Rohit Date: Sat, 16 May 2026 12:02:43 +0530 Subject: [PATCH 2/2] fix: cargo fmt diff issue --- src/proxy/tcp.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/proxy/tcp.rs b/src/proxy/tcp.rs index 17183c8..933a455 100644 --- a/src/proxy/tcp.rs +++ b/src/proxy/tcp.rs @@ -1,4 +1,3 @@ - use tokio::{ io::copy_bidirectional, net::{TcpListener, TcpStream},