From bdf1eb772fe7d4d8b9d5abbac397b7d4bae5fc2f Mon Sep 17 00:00:00 2001 From: Rohit Date: Fri, 15 May 2026 17:34:01 +0530 Subject: [PATCH 1/2] feat: implement health check functionality and improve backend state management --- src/health/mod.rs | 1 + src/health/tcp.rs | 41 +++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 1 + src/main.rs | 19 +++++++++++++++---- src/proxy/tcp.rs | 17 +++++++++-------- src/state/app.rs | 19 ++++++++++++------- src/state/backend.rs | 11 +++++++++-- 7 files changed, 88 insertions(+), 21 deletions(-) create mode 100644 src/health/mod.rs create mode 100644 src/health/tcp.rs diff --git a/src/health/mod.rs b/src/health/mod.rs new file mode 100644 index 0000000..fcb722b --- /dev/null +++ b/src/health/mod.rs @@ -0,0 +1 @@ +pub mod tcp; diff --git a/src/health/tcp.rs b/src/health/tcp.rs new file mode 100644 index 0000000..fcb2d2e --- /dev/null +++ b/src/health/tcp.rs @@ -0,0 +1,41 @@ +use std::{ sync::atomic::Ordering, time::Duration }; + +use crate::state::{ app::SharedAppState, backend::BackendState }; +use anyhow::{ Result }; +use tokio::{ net::TcpStream, time::sleep }; +use tracing::info; + +// This will evolve later into: +// - retries +// - thresholds +// - latency tracking +// - richer health states +pub async fn check_backend_status(backend: &BackendState) -> Result<()> { + let backend_address = { format!("{}:{}", backend.config.host, backend.config.port) }; + + match TcpStream::connect(&backend_address).await { + Ok(_) => { + backend.healthy.store(true, Ordering::Relaxed); + info!("backend {} healthy", backend.config.id); + } + Err(_) => { + backend.healthy.store(false, Ordering::Relaxed); + info!("backend {} unreachable", backend.config.id); + } + } + + Ok(()) +} + +pub async fn start_health_checker(state: SharedAppState) { + loop { + let state = state.read().await; + for upstream in &state.upstreams { + for backend in &upstream.backends { + let _ = check_backend_status(backend).await; + } + } + drop(state); + sleep(Duration::from_secs(5)).await; + } +} diff --git a/src/lib.rs b/src/lib.rs index 28f5d37..26ad83e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,3 +3,4 @@ pub mod common; pub mod config; pub mod proxy; pub mod state; +pub mod health; diff --git a/src/main.rs b/src/main.rs index 4508d63..369bb86 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,11 +1,12 @@ #![warn(clippy::all)] #![warn(clippy::pedantic)] -use anyhow::{Result, bail}; +use anyhow::{ Result, bail }; use laminar::{ - config::{loader::load_config, validator::validate_config}, + config::{ loader::load_config, validator::validate_config }, + health::tcp::start_health_checker, proxy::tcp::start_tcp_proxy, - state::app::{AppState, SharedAppState}, + state::app::{ AppState, SharedAppState }, }; use std::sync::Arc; use tokio::sync::RwLock; @@ -15,7 +16,10 @@ use tracing::info; async fn main() -> Result<()> { tracing_subscriber::fmt::init(); - let path = std::env::args().nth(1).unwrap_or_else(|| "laminar_config.yaml".to_string()); + let path = std::env + ::args() + .nth(1) + .unwrap_or_else(|| "laminar_config.yaml".to_string()); info!("loading config from {}", path); let config = load_config(&path)?; @@ -39,6 +43,13 @@ async fn main() -> Result<()> { // for upstream in &state.upstreams { // info!("upstream '{}' initialized with {} backends", upstream.id, upstream.backends.len()); // } + + let health_state = shared_state.clone(); + + tokio::spawn(async move { + start_health_checker(health_state).await; + }); + let listener_address = format!("{listener_host}:{listener_port}"); start_tcp_proxy(&listener_address, shared_state).await?; diff --git a/src/proxy/tcp.rs b/src/proxy/tcp.rs index 39f32cf..a3412e8 100644 --- a/src/proxy/tcp.rs +++ b/src/proxy/tcp.rs @@ -1,9 +1,6 @@ use anyhow::Ok; -use tokio::{ - io::copy_bidirectional, - net::{TcpListener, TcpStream}, -}; -use tracing::{error, info}; +use tokio::{ io::copy_bidirectional, net::{ TcpListener, TcpStream } }; +use tracing::{ error, info }; use crate::state::app::SharedAppState; @@ -30,7 +27,13 @@ async fn handle_connection(mut stream: TcpStream, state: SharedAppState) -> anyh let backend_address = { let state = state.read().await; let upstream = &state.upstreams[0]; - let backend = upstream.next_backend(); + let backend = match upstream.next_backend() { + Some(backend) => backend, + None => { + error!("no healthy backend available"); + return Ok(()); + } + }; format!("{}:{}", backend.config.host, backend.config.port) }; @@ -38,8 +41,6 @@ async fn handle_connection(mut stream: TcpStream, state: SharedAppState) -> anyh let mut backend_stream = TcpStream::connect(&backend_address).await?; - drop(state); - copy_bidirectional(&mut stream, &mut backend_stream).await?; Ok(()) diff --git a/src/state/app.rs b/src/state/app.rs index db63271..797544f 100644 --- a/src/state/app.rs +++ b/src/state/app.rs @@ -1,6 +1,6 @@ -use crate::{config::types::Config, state::backend::BackendState}; +use crate::{ config::types::Config, state::backend::BackendState }; use std::sync::Arc; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::atomic::{ AtomicUsize, Ordering }; use tokio::sync::RwLock; // Contains all backend servers belonging to a single logical service. #[derive(Debug)] @@ -12,9 +12,15 @@ pub struct UpstreamPool { impl UpstreamPool { // Very naive round robin. - pub fn next_backend(&self) -> &BackendState { - let index = self.current_index.fetch_add(1, Ordering::Relaxed); - &self.backends[index % self.backends.len()] + pub fn next_backend(&self) -> Option<&BackendState> { + for _ in 0..self.backends.len() { + let index = self.current_index.fetch_add(1, Ordering::Relaxed); + let backend = &self.backends[index % self.backends.len()]; + if backend.healthy.load(Ordering::Relaxed) { + return Some(backend); + } + } + return None; } } // Central shared runtime state for the entire load balancer. @@ -37,8 +43,7 @@ impl AppState { // each upstream has an id, algorithm and servers( yes group of servers) // each server has id, host, port, weight - let upstreams = config - .upstreams + let upstreams = config.upstreams .into_iter() .map(|upstream| { // upstream has id, algorithm and servers diff --git a/src/state/backend.rs b/src/state/backend.rs index dd20e3f..1a8a128 100644 --- a/src/state/backend.rs +++ b/src/state/backend.rs @@ -1,3 +1,5 @@ +use std::sync::atomic::AtomicBool; + // use std::sync::Arc; // use tokio::sync::RwLock; use crate::config::BackendServerConfig; @@ -13,7 +15,7 @@ pub struct BackendState { // Temporary boolean health indicator. // This will later evolve into a richer health state machine: // Healthy -> Unhealthy -> Recovering - pub healthy: bool, + pub healthy: AtomicBool, // This becomes important for least-connections balancing. pub active_connections: usize, @@ -24,6 +26,11 @@ pub struct BackendState { impl BackendState { pub fn new(config: BackendServerConfig) -> Self { - Self { config, healthy: true, active_connections: 0, failed_health_checks: 0 } + Self { + config, + healthy: AtomicBool::new(true), + active_connections: 0, + failed_health_checks: 0, + } } } From 7347c1c1aa6dedb0d75f78ea87e492911435258d Mon Sep 17 00:00:00 2001 From: Rohit Date: Fri, 15 May 2026 17:35:12 +0530 Subject: [PATCH 2/2] cargo fmt and clippy --- src/health/tcp.rs | 8 ++++---- src/lib.rs | 2 +- src/main.rs | 11 ++++------- src/proxy/tcp.rs | 7 +++++-- src/state/app.rs | 9 +++++---- 5 files changed, 19 insertions(+), 18 deletions(-) diff --git a/src/health/tcp.rs b/src/health/tcp.rs index fcb2d2e..9993d7b 100644 --- a/src/health/tcp.rs +++ b/src/health/tcp.rs @@ -1,8 +1,8 @@ -use std::{ sync::atomic::Ordering, time::Duration }; +use std::{sync::atomic::Ordering, time::Duration}; -use crate::state::{ app::SharedAppState, backend::BackendState }; -use anyhow::{ Result }; -use tokio::{ net::TcpStream, time::sleep }; +use crate::state::{app::SharedAppState, backend::BackendState}; +use anyhow::Result; +use tokio::{net::TcpStream, time::sleep}; use tracing::info; // This will evolve later into: diff --git a/src/lib.rs b/src/lib.rs index 26ad83e..89058d0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,6 @@ pub mod algorithms; pub mod common; pub mod config; +pub mod health; pub mod proxy; pub mod state; -pub mod health; diff --git a/src/main.rs b/src/main.rs index 369bb86..7c3c5bb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,12 +1,12 @@ #![warn(clippy::all)] #![warn(clippy::pedantic)] -use anyhow::{ Result, bail }; +use anyhow::{Result, bail}; use laminar::{ - config::{ loader::load_config, validator::validate_config }, + config::{loader::load_config, validator::validate_config}, health::tcp::start_health_checker, proxy::tcp::start_tcp_proxy, - state::app::{ AppState, SharedAppState }, + state::app::{AppState, SharedAppState}, }; use std::sync::Arc; use tokio::sync::RwLock; @@ -16,10 +16,7 @@ use tracing::info; async fn main() -> Result<()> { tracing_subscriber::fmt::init(); - let path = std::env - ::args() - .nth(1) - .unwrap_or_else(|| "laminar_config.yaml".to_string()); + let path = std::env::args().nth(1).unwrap_or_else(|| "laminar_config.yaml".to_string()); info!("loading config from {}", path); let config = load_config(&path)?; diff --git a/src/proxy/tcp.rs b/src/proxy/tcp.rs index a3412e8..1999dec 100644 --- a/src/proxy/tcp.rs +++ b/src/proxy/tcp.rs @@ -1,6 +1,9 @@ use anyhow::Ok; -use tokio::{ io::copy_bidirectional, net::{ TcpListener, TcpStream } }; -use tracing::{ error, info }; +use tokio::{ + io::copy_bidirectional, + net::{TcpListener, TcpStream}, +}; +use tracing::{error, info}; use crate::state::app::SharedAppState; diff --git a/src/state/app.rs b/src/state/app.rs index 797544f..b71b9ee 100644 --- a/src/state/app.rs +++ b/src/state/app.rs @@ -1,6 +1,6 @@ -use crate::{ config::types::Config, state::backend::BackendState }; +use crate::{config::types::Config, state::backend::BackendState}; use std::sync::Arc; -use std::sync::atomic::{ AtomicUsize, Ordering }; +use std::sync::atomic::{AtomicUsize, Ordering}; use tokio::sync::RwLock; // Contains all backend servers belonging to a single logical service. #[derive(Debug)] @@ -20,7 +20,7 @@ impl UpstreamPool { return Some(backend); } } - return None; + None } } // Central shared runtime state for the entire load balancer. @@ -43,7 +43,8 @@ impl AppState { // each upstream has an id, algorithm and servers( yes group of servers) // each server has id, host, port, weight - let upstreams = config.upstreams + let upstreams = config + .upstreams .into_iter() .map(|upstream| { // upstream has id, algorithm and servers