diff --git a/src/proxy/tcp.rs b/src/proxy/tcp.rs index 0bd013c..933a455 100644 --- a/src/proxy/tcp.rs +++ b/src/proxy/tcp.rs @@ -1,4 +1,3 @@ -use anyhow::Ok; use tokio::{ io::copy_bidirectional, net::{TcpListener, TcpStream}, @@ -28,26 +27,52 @@ pub async fn start_tcp_proxy(address: &str, state: SharedAppState) -> anyhow::Re } async fn handle_connection(mut stream: TcpStream, state: SharedAppState) -> anyhow::Result<()> { - let guard = { + let retry_attempt = { let state = state.read().await; - let upstream = &state.upstreams[0]; - let backend_arc = match upstream.next_backend() { - Some(backend) => backend.clone(), - None => { - error!("no healthy backend available"); - return Ok(()); - } - }; - ConnectionGuard::new(backend_arc) - // format!("{}:{}", backend.config.host, backend.config.port) + state.retry_attempts }; - let backend_address = guard.address(); - info!("forwarding traffic to {}", backend_address); + // retry attempts mean how many times we should try + // connecting the client to a suitable backend server + // + // if a backend is available: + // route the traffic normally + // + // if connection fails: + // mark that backend unhealthy so future selections skip it + for _ in 0..retry_attempt { + let guard = { + let state = state.read().await; + let upstream = &state.upstreams[0]; + let backend_arc = match upstream.next_backend() { + Some(backend) => backend, + None => { + error!("no healthy backend available"); + return Ok(()); + } + }; + ConnectionGuard::new(backend_arc) + // format!("{}:{}", backend.config.host, backend.config.port) + }; + let backend_address = guard.address(); - let mut backend_stream = TcpStream::connect(&backend_address).await?; + info!("forwarding traffic to {}", backend_address); - copy_bidirectional(&mut stream, &mut backend_stream).await?; + match TcpStream::connect(&backend_address).await { + Ok(mut backend_stream) => { + copy_bidirectional(&mut stream, &mut backend_stream).await?; + return Ok(()); + } + Err(error) => { + // this is very important .. + guard.mark_backend_unhealthy(); + error!("failed to connect to backend {}: {:?}", backend_address, error); + // retry another + continue; + } + } + } + error!("all backend retry attempts failed"); Ok(()) } diff --git a/src/state/app.rs b/src/state/app.rs index 8acd020..7412eac 100644 --- a/src/state/app.rs +++ b/src/state/app.rs @@ -38,6 +38,7 @@ impl UpstreamPool { // - admin APIs #[derive(Debug)] pub struct AppState { + pub retry_attempts: usize, pub upstreams: Vec, } @@ -63,12 +64,13 @@ impl AppState { id: upstream.id, current_index: AtomicUsize::new(0), algorithm: upstream.algorithm, + backends, // all backends belonging to a single upstream type ( single logical service) } }) .collect(); - Self { upstreams } + Self { upstreams, retry_attempts: config.load_balancer.retry_attempts } } } diff --git a/src/state/backend.rs b/src/state/backend.rs index b63c927..34f68ff 100644 --- a/src/state/backend.rs +++ b/src/state/backend.rs @@ -36,10 +36,16 @@ impl ConnectionGuard { Self { backend } } + pub fn backend_id(&self) -> &str { + &self.backend.config.id + } // get the address from the guard pub fn address(&self) -> String { format!("{}:{}", self.backend.config.host, self.backend.config.port) } + pub fn mark_backend_unhealthy(&self) { + self.backend.healthy.store(false, Ordering::Relaxed); + } } impl Drop for ConnectionGuard {