diff --git a/src/health/mod.rs b/src/health/mod.rs new file mode 100644 index 0000000..fcb722b --- /dev/null +++ b/src/health/mod.rs @@ -0,0 +1 @@ +pub mod tcp; diff --git a/src/health/tcp.rs b/src/health/tcp.rs new file mode 100644 index 0000000..9993d7b --- /dev/null +++ b/src/health/tcp.rs @@ -0,0 +1,41 @@ +use std::{sync::atomic::Ordering, time::Duration}; + +use crate::state::{app::SharedAppState, backend::BackendState}; +use anyhow::Result; +use tokio::{net::TcpStream, time::sleep}; +use tracing::info; + +// This will evolve later into: +// - retries +// - thresholds +// - latency tracking +// - richer health states +pub async fn check_backend_status(backend: &BackendState) -> Result<()> { + let backend_address = { format!("{}:{}", backend.config.host, backend.config.port) }; + + match TcpStream::connect(&backend_address).await { + Ok(_) => { + backend.healthy.store(true, Ordering::Relaxed); + info!("backend {} healthy", backend.config.id); + } + Err(_) => { + backend.healthy.store(false, Ordering::Relaxed); + info!("backend {} unreachable", backend.config.id); + } + } + + Ok(()) +} + +pub async fn start_health_checker(state: SharedAppState) { + loop { + let state = state.read().await; + for upstream in &state.upstreams { + for backend in &upstream.backends { + let _ = check_backend_status(backend).await; + } + } + drop(state); + sleep(Duration::from_secs(5)).await; + } +} diff --git a/src/lib.rs b/src/lib.rs index 28f5d37..89058d0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,6 @@ pub mod algorithms; pub mod common; pub mod config; +pub mod health; pub mod proxy; pub mod state; diff --git a/src/main.rs b/src/main.rs index 4508d63..7c3c5bb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,6 +4,7 @@ use anyhow::{Result, bail}; use laminar::{ config::{loader::load_config, validator::validate_config}, + health::tcp::start_health_checker, proxy::tcp::start_tcp_proxy, state::app::{AppState, SharedAppState}, }; @@ -39,6 +40,13 @@ async fn main() -> Result<()> { // for upstream in &state.upstreams { // info!("upstream '{}' initialized with {} backends", upstream.id, upstream.backends.len()); // } + + let health_state = shared_state.clone(); + + tokio::spawn(async move { + start_health_checker(health_state).await; + }); + let listener_address = format!("{listener_host}:{listener_port}"); start_tcp_proxy(&listener_address, shared_state).await?; diff --git a/src/proxy/tcp.rs b/src/proxy/tcp.rs index 39f32cf..1999dec 100644 --- a/src/proxy/tcp.rs +++ b/src/proxy/tcp.rs @@ -30,7 +30,13 @@ async fn handle_connection(mut stream: TcpStream, state: SharedAppState) -> anyh let backend_address = { let state = state.read().await; let upstream = &state.upstreams[0]; - let backend = upstream.next_backend(); + let backend = match upstream.next_backend() { + Some(backend) => backend, + None => { + error!("no healthy backend available"); + return Ok(()); + } + }; format!("{}:{}", backend.config.host, backend.config.port) }; @@ -38,8 +44,6 @@ async fn handle_connection(mut stream: TcpStream, state: SharedAppState) -> anyh let mut backend_stream = TcpStream::connect(&backend_address).await?; - drop(state); - copy_bidirectional(&mut stream, &mut backend_stream).await?; Ok(()) diff --git a/src/state/app.rs b/src/state/app.rs index db63271..b71b9ee 100644 --- a/src/state/app.rs +++ b/src/state/app.rs @@ -12,9 +12,15 @@ pub struct UpstreamPool { impl UpstreamPool { // Very naive round robin. - pub fn next_backend(&self) -> &BackendState { - let index = self.current_index.fetch_add(1, Ordering::Relaxed); - &self.backends[index % self.backends.len()] + pub fn next_backend(&self) -> Option<&BackendState> { + for _ in 0..self.backends.len() { + let index = self.current_index.fetch_add(1, Ordering::Relaxed); + let backend = &self.backends[index % self.backends.len()]; + if backend.healthy.load(Ordering::Relaxed) { + return Some(backend); + } + } + None } } // Central shared runtime state for the entire load balancer. diff --git a/src/state/backend.rs b/src/state/backend.rs index dd20e3f..1a8a128 100644 --- a/src/state/backend.rs +++ b/src/state/backend.rs @@ -1,3 +1,5 @@ +use std::sync::atomic::AtomicBool; + // use std::sync::Arc; // use tokio::sync::RwLock; use crate::config::BackendServerConfig; @@ -13,7 +15,7 @@ pub struct BackendState { // Temporary boolean health indicator. // This will later evolve into a richer health state machine: // Healthy -> Unhealthy -> Recovering - pub healthy: bool, + pub healthy: AtomicBool, // This becomes important for least-connections balancing. pub active_connections: usize, @@ -24,6 +26,11 @@ pub struct BackendState { impl BackendState { pub fn new(config: BackendServerConfig) -> Self { - Self { config, healthy: true, active_connections: 0, failed_health_checks: 0 } + Self { + config, + healthy: AtomicBool::new(true), + active_connections: 0, + failed_health_checks: 0, + } } }