diff --git a/CHANGELOG.md b/CHANGELOG.md index 68e3e39..cfa0fe2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -137,3 +137,29 @@ This creates a clean foundation for future improvements such as: - latency-aware health scoring - parallelized health probes - advanced balancing strategies + +--- + +## 2026-05-16 + +### Added + +- Added pluggable balancing algorithm structure +- Added least-connections load balancing +- Added configurable upstream balancing selection through YAML +- Added dedicated algorithm modules for: + - round robin + - least connections +- Added tests for least-connections backend selection + +### Changed + +- Refactored backend selection logic out of `UpstreamPool` +- Reorganized balancing logic into isolated algorithm modules + +### Notes + +Laminar can now route traffic using runtime-aware balancing strategies instead of fixed request rotation. + +The current implementation intentionally keeps algorithm dispatch simple using enum matching and naive selection logic. +More advanced balancing abstractions and optimizations will evolve later as additional strategies are introduced. diff --git a/ROADMAP.md b/ROADMAP.md index d8e103c..b8d285d 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -4,68 +4,68 @@ ## Project Foundation -* [*] Initialize repository structure -* [*] Setup formatting/linting -* [ ] Setup CI pipeline -* [*] Setup logging framework -* [*] Define configuration format -* [*] Create runtime AppState -* [*] Define backend configuration model -* [ ] Setup graceful shutdown handling +- [x] Initialize repository structure +- [x] Setup formatting/linting +- [x] Setup CI pipeline +- [x] Setup logging framework +- [x] Define configuration format +- [x] Create runtime AppState +- [x] Define backend configuration model +- [ ] Setup graceful shutdown handling --- ## TCP Listener -* [*] Create listening socket -* [*] Bind address/port -* [ ] Configure socket options - * [ ] SO_REUSEADDR - * [ ] TCP_NODELAY - * [ ] Keepalive -* [*] Implement accept loop -* [*] Handle concurrent client connections -* [*] Handle client disconnects -* [ ] Implement connection cleanup +- [x] Create listening socket +- [x] Bind address/port +- [ ] Configure socket options + - [ ] SO_REUSEADDR + - [ ] TCP_NODELAY + - [ ] Keepalive +- [x] Implement accept loop +- [x] Handle concurrent client connections +- [x] Handle client disconnects +- [ ] Implement connection cleanup --- ## TCP Proxying -* [*] Connect to backend server -* [*] Forward client → backend traffic -* [*] Forward backend → client traffic -* [*] Support bidirectional forwarding -* [*] Handle disconnect propagation -* [ ] Add connection timeout handling -* [ ] Add idle timeout handling +- [x] Connect to backend server +- [x] Forward client → backend traffic +- [x] Forward backend → client traffic +- [x] Support bidirectional forwarding +- [x] Handle disconnect propagation +- [ ] Add connection timeout handling +- [ ] Add idle timeout handling --- ## Backend Pool -* [*] Implement backend registry -* [*] Track backend runtime state -* [ ] Add backend availability tracking -* [*] Implement backend selection interface +- [x] Implement backend registry +- [x] Track backend runtime state +- [ ] Add backend availability tracking +- [x] Implement backend selection interface --- ## Round Robin Balancing -* [*] Implement Round Robin -* [ ] Skip unavailable backends -* [ ] Handle empty backend pool safely +- [x] Implement Round Robin +- [x] Skip unavailable backends +- [x] Handle empty backend pool safely --- ## Phase 1 Deliverable -* [*] Functional TCP load balancer -* [*] Concurrent connection support -* [*] Multiple backend support -* [*] Basic balancing -* [*] Structured logs +- [x] Functional TCP load balancer +- [x] Concurrent connection support +- [x] Multiple backend support +- [x] Basic balancing +- [x] Structured logs --- @@ -73,72 +73,72 @@ ## Backend Health Checks -* [ ] Implement active TCP health probes -* [ ] Add healthy/unhealthy backend state -* [ ] Skip unhealthy backends -* [ ] Add health transition logging +- [x] Implement active TCP health probes +- [x] Add healthy/unhealthy backend state +- [x] Skip unhealthy backends +- [x] Add health transition logging --- ## Periodic Health Monitoring -* [ ] Create background health task -* [ ] Add configurable health intervals -* [ ] Add backend recovery detection +- [x] Create background health task +- [x] Add configurable health intervals +- [ ] Add backend recovery detection --- ## Connection Tracking -* [*] Track active connections per backend -* [*] Increment on connect -* [*] Decrement on disconnect -* [ ] Track total requests +- [x] Track active connections per backend +- [x] Increment on connect +- [x] Decrement on disconnect +- [ ] Track total requests --- ## Least Connections Balancing -* [ ] Implement Least Connections algorithm -* [ ] Ignore unhealthy backends -* [ ] Add algorithm abstraction layer +- [x] Implement Least Connections algorithm +- [x] Ignore unhealthy backends +- [x] Add algorithm abstraction layer --- ## Retry Logic -* [ ] Retry failed backend connections -* [ ] Retry next available backend -* [ ] Add retry limits -* [ ] Add retry logging +- [ ] Retry failed backend connections +- [ ] Retry next available backend +- [ ] Add retry limits +- [ ] Add retry logging --- ## Timeout Management -* [ ] Backend connect timeout -* [ ] Idle connection timeout -* [ ] Read timeout -* [ ] Write timeout +- [ ] Backend connect timeout +- [ ] Idle connection timeout +- [ ] Read timeout +- [ ] Write timeout --- ## Runtime State Refactor -* [ ] Reduce lock scope sizes -* [ ] Refactor duplicated runtime logic -* [ ] Separate balancing module -* [ ] Separate health module -* [*] Improve state ownership model +- [ ] Reduce lock scope sizes +- [ ] Refactor duplicated runtime logic +- [ ] Separate balancing module +- [ ] Separate health module +- [*] Improve state ownership model --- ## Phase 2 Deliverable -* [ ] Health-aware balancing -* [ ] Retry support -* [ ] Connection metrics -* [ ] Runtime stability improvements +- [ ] Health-aware balancing +- [ ] Retry support +- [ ] Connection metrics +- [ ] Runtime stability improvements --- @@ -146,72 +146,72 @@ ## Weighted Balancing -* [ ] Weighted Round Robin -* [ ] Backend weights in config -* [ ] Dynamic weight updates +- [ ] Weighted Round Robin +- [ ] Backend weights in config +- [ ] Dynamic weight updates --- ## Metrics -* [ ] Active connection metrics -* [ ] Request counters -* [ ] Failure counters -* [ ] Backend health metrics -* [ ] Throughput metrics +- [ ] Active connection metrics +- [ ] Request counters +- [ ] Failure counters +- [ ] Backend health metrics +- [ ] Throughput metrics --- ## Prometheus Integration -* [ ] Prometheus metrics exporter -* [ ] Metrics endpoint -* [ ] Backend-specific metrics +- [ ] Prometheus metrics exporter +- [ ] Metrics endpoint +- [ ] Backend-specific metrics --- ## Logging Improvements -* [ ] Structured JSON logs -* [ ] Request correlation IDs -* [ ] Retry logging -* [ ] Timeout logging -* [ ] Backend transition logging +- [ ] Structured JSON logs +- [ ] Request correlation IDs +- [ ] Retry logging +- [ ] Timeout logging +- [ ] Backend transition logging --- ## Graceful Backend Draining -* [ ] Add draining backend state -* [ ] Stop routing new connections -* [ ] Wait for active connections -* [ ] Graceful backend removal +- [ ] Add draining backend state +- [ ] Stop routing new connections +- [ ] Wait for active connections +- [ ] Graceful backend removal --- ## Dynamic Config Reloading -* [ ] Watch configuration file -* [ ] Reload backend configuration -* [ ] Preserve active connections -* [ ] Runtime backend updates +- [ ] Watch configuration file +- [ ] Reload backend configuration +- [ ] Preserve active connections +- [ ] Runtime backend updates --- ## Admin API -* [ ] Add runtime status endpoint -* [ ] Add backend health endpoint -* [ ] Add backend enable/disable API -* [ ] Add metrics endpoint +- [ ] Add runtime status endpoint +- [ ] Add backend health endpoint +- [ ] Add backend enable/disable API +- [ ] Add metrics endpoint --- ## Phase 3 Deliverable -* [ ] Runtime configurability -* [ ] Operational observability -* [ ] Graceful backend management +- [ ] Runtime configurability +- [ ] Operational observability +- [ ] Graceful backend management --- @@ -219,101 +219,101 @@ ## Event-Driven Runtime -* [ ] Integrate epoll -* [ ] Add edge-triggered events -* [ ] Implement event batching -* [ ] Add worker thread model +- [ ] Integrate epoll +- [ ] Add edge-triggered events +- [ ] Implement event batching +- [ ] Add worker thread model --- ## Memory & Buffer Optimization -* [ ] Buffer pooling -* [ ] Reduce allocations -* [ ] Optimize stream forwarding -* [ ] Reduce lock contention +- [ ] Buffer pooling +- [ ] Reduce allocations +- [ ] Optimize stream forwarding +- [ ] Reduce lock contention --- ## Advanced Algorithms -* [ ] IP Hash -* [ ] Consistent Hashing -* [ ] Power of Two Choices -* [ ] Least Response Time +- [ ] IP Hash +- [ ] Consistent Hashing +- [ ] Power of Two Choices +- [ ] Least Response Time --- ## TLS Features -* [ ] TLS passthrough -* [ ] SNI inspection -* [ ] Certificate reload support +- [ ] TLS passthrough +- [ ] SNI inspection +- [ ] Certificate reload support --- ## UDP Support -* [ ] UDP listener -* [ ] Datagram forwarding -* [ ] Stateless balancing -* [ ] UDP health checks +- [ ] UDP listener +- [ ] Datagram forwarding +- [ ] Stateless balancing +- [ ] UDP health checks --- ## Advanced Observability -* [ ] Grafana dashboards -* [ ] OpenTelemetry support -* [ ] Distributed tracing +- [ ] Grafana dashboards +- [ ] OpenTelemetry support +- [ ] Distributed tracing --- ## Benchmarking -* [ ] wrk benchmarks -* [ ] iperf benchmarks -* [ ] Concurrent connection stress testing -* [ ] Latency measurements -* [ ] Throughput measurements +- [ ] wrk benchmarks +- [ ] iperf benchmarks +- [ ] Concurrent connection stress testing +- [ ] Latency measurements +- [ ] Throughput measurements --- ## Chaos Testing -* [ ] Backend crash simulation -* [ ] Packet loss simulation -* [ ] High latency simulation -* [ ] High churn testing +- [ ] Backend crash simulation +- [ ] Packet loss simulation +- [ ] High latency simulation +- [ ] High churn testing --- ## Deployment -* [ ] Docker image -* [ ] Static binary builds -* [ ] systemd service -* [ ] Kubernetes manifests -* [ ] CI/CD pipeline +- [ ] Docker image +- [ ] Static binary builds +- [ ] systemd service +- [ ] Kubernetes manifests +- [ ] CI/CD pipeline --- ## Phase 4 Deliverable -* [ ] Production-grade runtime -* [ ] High concurrency support -* [ ] Operational tooling -* [ ] Performance optimization +- [ ] Production-grade runtime +- [ ] High concurrency support +- [ ] Operational tooling +- [ ] Performance optimization --- # Future Exploration -* [ ] io_uring -* [ ] eBPF observability -* [ ] XDP packet filtering -* [ ] DPDK experimentation -* [ ] QUIC exploration -* [ ] Plugin system -* [ ] WASM/Lua extensions -* [ ] Distributed load balancing +- [ ] io_uring +- [ ] eBPF observability +- [ ] XDP packet filtering +- [ ] DPDK experimentation +- [ ] QUIC exploration +- [ ] Plugin system +- [ ] WASM/Lua extensions +- [ ] Distributed load balancing diff --git a/src/algorithms/least_connections.rs b/src/algorithms/least_connections.rs new file mode 100644 index 0000000..e5a1482 --- /dev/null +++ b/src/algorithms/least_connections.rs @@ -0,0 +1,33 @@ +use std::sync::{Arc, atomic::Ordering}; + +use crate::state::backend::BackendState; + +pub fn select_backend(backends: &[Arc]) -> Option> { + // prev and curr approach + + let mut selected: Option> = None; + + for backend in backends { + //check if this backend is healthy + if !backend.healthy.load(Ordering::Relaxed) { + continue; + } + + match &selected { + Some(prev) => { + let prev_connections = prev.active_connections.load(Ordering::Relaxed); + + let backend_connections = backend.active_connections.load(Ordering::Relaxed); + + if prev_connections > backend_connections { + selected = Some(backend.clone()); + } + } + None => { + selected = Some(backend.clone()); + } + } + } + + selected +} diff --git a/src/algorithms/mod.rs b/src/algorithms/mod.rs index 8b13789..754c3a9 100644 --- a/src/algorithms/mod.rs +++ b/src/algorithms/mod.rs @@ -1 +1,2 @@ - +pub mod least_connections; +pub mod round_robin; diff --git a/src/algorithms/round_robin.rs b/src/algorithms/round_robin.rs new file mode 100644 index 0000000..349e131 --- /dev/null +++ b/src/algorithms/round_robin.rs @@ -0,0 +1,20 @@ +use std::sync::{ + Arc, + atomic::{AtomicUsize, Ordering}, +}; + +use crate::state::backend::BackendState; + +pub fn select_backend( + backends: &[Arc], + current_index: &AtomicUsize, +) -> Option> { + for _ in 0..backends.len() { + let index = current_index.fetch_add(1, Ordering::Relaxed); + let backend = &backends[index % backends.len()]; + if backend.healthy.load(Ordering::Relaxed) { + return Some(backend.clone()); + } + } + None +} diff --git a/src/state/app.rs b/src/state/app.rs index 1216c90..8acd020 100644 --- a/src/state/app.rs +++ b/src/state/app.rs @@ -1,26 +1,32 @@ +use crate::algorithms::{least_connections, round_robin}; +use crate::config::LoadBalancingAlgorithm; use crate::{config::types::Config, state::backend::BackendState}; use std::sync::Arc; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::atomic::AtomicUsize; use tokio::sync::RwLock; // Contains all backend servers belonging to a single logical service. #[derive(Debug)] pub struct UpstreamPool { pub id: String, pub current_index: AtomicUsize, + pub algorithm: LoadBalancingAlgorithm, pub backends: Vec>, } impl UpstreamPool { // Very naive round robin. pub fn next_backend(&self) -> Option> { - for _ in 0..self.backends.len() { - let index = self.current_index.fetch_add(1, Ordering::Relaxed); - let backend = &self.backends[index % self.backends.len()]; - if backend.healthy.load(Ordering::Relaxed) { - return Some(backend.clone()); + match &self.algorithm { + LoadBalancingAlgorithm::RoundRobin => { + round_robin::select_backend(&self.backends, &self.current_index) + } + LoadBalancingAlgorithm::LeastConnections => { + least_connections::select_backend(&self.backends) + } + _ => { + unimplemented!("algorithm not implemented yet") } } - None } } // Central shared runtime state for the entire load balancer. @@ -56,6 +62,7 @@ impl AppState { UpstreamPool { id: upstream.id, current_index: AtomicUsize::new(0), + algorithm: upstream.algorithm, backends, // all backends belonging to a single upstream type ( single logical service) } }) diff --git a/tests/health_selection.rs b/tests/health_selection.rs index a7ed67d..74f16aa 100644 --- a/tests/health_selection.rs +++ b/tests/health_selection.rs @@ -16,7 +16,7 @@ fn create_backend(id: &str, port: u16, healthy: bool) -> BackendState { healthy: AtomicBool::new(healthy), - active_connections: 0.into(), + active_connections: (0).into(), failed_health_checks: 0, } @@ -26,9 +26,8 @@ fn create_backend(id: &str, port: u16, healthy: bool) -> BackendState { fn unhealthy_backend_is_skipped() { let upstream = UpstreamPool { id: "main".to_string(), - current_index: (0).into(), - + algorithm: laminar::config::LoadBalancingAlgorithm::LeastConnections, backends: vec![ create_backend("dead", 9001, false).into(), create_backend("healthy", 9002, true).into(), @@ -44,9 +43,8 @@ fn unhealthy_backend_is_skipped() { fn returns_none_when_all_backends_dead() { let upstream = UpstreamPool { id: "main".to_string(), - current_index: (0).into(), - + algorithm: laminar::config::LoadBalancingAlgorithm::LeastConnections, backends: vec![ create_backend("dead-1", 9001, false).into(), create_backend("dead-2", 9002, false).into(), diff --git a/tests/least_connections.rs b/tests/least_connections.rs new file mode 100644 index 0000000..eef6df0 --- /dev/null +++ b/tests/least_connections.rs @@ -0,0 +1,41 @@ +use std::sync::{ + Arc, + atomic::{AtomicBool, AtomicUsize}, +}; + +use laminar::{ + algorithms::least_connections, config::types::BackendServerConfig, state::backend::BackendState, +}; + +fn create_backend( + id: &str, + port: u16, + healthy: bool, + active_connections: usize, +) -> Arc { + Arc::new(BackendState { + config: BackendServerConfig { + id: id.to_string(), + host: "127.0.0.1".to_string(), + port, + weight: 1, + }, + + healthy: AtomicBool::new(healthy), + active_connections: AtomicUsize::new(active_connections), + failed_health_checks: 0, + }) +} + +#[test] +fn selects_backend_with_fewer_connections() { + let backend_1 = create_backend("server-1", 9001, true, 10); + + let backend_2 = create_backend("server-2", 9002, true, 2); + + let backends = vec![backend_1, backend_2]; + + let selected = least_connections::select_backend(&backends).unwrap(); + + assert_eq!(selected.config.id, "server-2"); +} diff --git a/tests/round_robin.rs b/tests/round_robin.rs index a14fe07..75af3cf 100644 --- a/tests/round_robin.rs +++ b/tests/round_robin.rs @@ -16,7 +16,7 @@ fn create_backend(id: &str, port: u16) -> BackendState { healthy: AtomicBool::new(true), - active_connections: 0.into(), + active_connections: (0).into(), failed_health_checks: 0, } @@ -26,9 +26,8 @@ fn create_backend(id: &str, port: u16) -> BackendState { fn round_robin_rotates_backends() { let upstream = UpstreamPool { id: "main".to_string(), - current_index: (0).into(), - + algorithm: laminar::config::LoadBalancingAlgorithm::RoundRobin, backends: vec![ create_backend("server-1", 9001).into(), create_backend("server-2", 9002).into(),