From 7e473cb95c3a7b6bf3f6ba14349cefc8395bb9c8 Mon Sep 17 00:00:00 2001 From: pie-314 Date: Fri, 15 May 2026 20:20:51 +0530 Subject: [PATCH] added RAII based system tracking --- CHANGELOG.md | 4 ++++ ROADMAP.md | 8 ++++---- src/proxy/tcp.rs | 11 +++++++---- src/state/app.rs | 9 +++++---- src/state/backend.rs | 32 +++++++++++++++++++++++++++++--- tests/health_checker.rs | 2 +- tests/health_selection.rs | 11 +++++++---- tests/round_robin.rs | 7 +++++-- 8 files changed, 62 insertions(+), 22 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 154ed23..68e3e39 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -88,6 +88,8 @@ The internal architecture now more closely resembles real-world proxy and load b - Added automatic unhealthy backend skipping during selection - Added configurable health check intervals through YAML config - Added graceful handling when no healthy backends are available +- Added RAII-based active connection tracking using `ConnectionGuard` +- Added thread-safe connection metrics using `AtomicUsize` - Added integration tests for: - round robin balancing - unhealthy backend filtering @@ -104,6 +106,8 @@ The internal architecture now more closely resembles real-world proxy and load b - backend selection - backend connection - traffic forwarding +- Refactored state ownership to use `Arc` for shared runtime access +- Moved connection cleanup logic to `Drop` implementation for guaranteed decrementing ### Notes diff --git a/ROADMAP.md b/ROADMAP.md index 68891a7..d8e103c 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -90,9 +90,9 @@ ## Connection Tracking -* [ ] Track active connections per backend -* [ ] Increment on connect -* [ ] Decrement on disconnect +* [*] Track active connections per backend +* [*] Increment on connect +* [*] Decrement on disconnect * [ ] Track total requests --- @@ -129,7 +129,7 @@ * [ ] Refactor duplicated runtime logic * [ ] Separate balancing module * [ ] Separate health module -* [ ] Improve state ownership model +* [*] Improve state ownership model --- diff --git a/src/proxy/tcp.rs b/src/proxy/tcp.rs index 1999dec..0bd013c 100644 --- a/src/proxy/tcp.rs +++ b/src/proxy/tcp.rs @@ -6,6 +6,7 @@ use tokio::{ use tracing::{error, info}; use crate::state::app::SharedAppState; +use crate::state::backend::ConnectionGuard; pub async fn start_tcp_proxy(address: &str, state: SharedAppState) -> anyhow::Result<()> { let listener = TcpListener::bind(address).await?; @@ -27,18 +28,20 @@ pub async fn start_tcp_proxy(address: &str, state: SharedAppState) -> anyhow::Re } async fn handle_connection(mut stream: TcpStream, state: SharedAppState) -> anyhow::Result<()> { - let backend_address = { + let guard = { let state = state.read().await; let upstream = &state.upstreams[0]; - let backend = match upstream.next_backend() { - Some(backend) => backend, + let backend_arc = match upstream.next_backend() { + Some(backend) => backend.clone(), None => { error!("no healthy backend available"); return Ok(()); } }; - format!("{}:{}", backend.config.host, backend.config.port) + ConnectionGuard::new(backend_arc) + // format!("{}:{}", backend.config.host, backend.config.port) }; + let backend_address = guard.address(); info!("forwarding traffic to {}", backend_address); diff --git a/src/state/app.rs b/src/state/app.rs index b71b9ee..1216c90 100644 --- a/src/state/app.rs +++ b/src/state/app.rs @@ -7,17 +7,17 @@ use tokio::sync::RwLock; pub struct UpstreamPool { pub id: String, pub current_index: AtomicUsize, - pub backends: Vec, + pub backends: Vec>, } impl UpstreamPool { // Very naive round robin. - pub fn next_backend(&self) -> Option<&BackendState> { + pub fn next_backend(&self) -> Option> { for _ in 0..self.backends.len() { let index = self.current_index.fetch_add(1, Ordering::Relaxed); let backend = &self.backends[index % self.backends.len()]; if backend.healthy.load(Ordering::Relaxed) { - return Some(backend); + return Some(backend.clone()); } } None @@ -50,7 +50,8 @@ impl AppState { // upstream has id, algorithm and servers // BackendState.config is of same type as a server - let backends = upstream.servers.into_iter().map(BackendState::new).collect(); + let backends = + upstream.servers.into_iter().map(|s| Arc::new(BackendState::new(s))).collect(); UpstreamPool { id: upstream.id, diff --git a/src/state/backend.rs b/src/state/backend.rs index 1a8a128..b63c927 100644 --- a/src/state/backend.rs +++ b/src/state/backend.rs @@ -1,9 +1,16 @@ -use std::sync::atomic::AtomicBool; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; // use std::sync::Arc; // use tokio::sync::RwLock; use crate::config::BackendServerConfig; +pub struct ConnectionGuard { + // We hold an Arc so backend state stays alive + // for the lifetime of the connection. + backend: Arc, +} + // Mutable runtime representation of a backend server. // - immutable backend configuration // - mutable runtime health/connection state @@ -18,10 +25,29 @@ pub struct BackendState { pub healthy: AtomicBool, // This becomes important for least-connections balancing. - pub active_connections: usize, + pub active_connections: AtomicUsize, pub failed_health_checks: usize, } +impl ConnectionGuard { + pub fn new(backend: Arc) -> Self { + // Increment immediately upon creation + backend.active_connections.fetch_add(1, Ordering::Relaxed); + Self { backend } + } + + // get the address from the guard + pub fn address(&self) -> String { + format!("{}:{}", self.backend.config.host, self.backend.config.port) + } +} + +impl Drop for ConnectionGuard { + fn drop(&mut self) { + self.backend.active_connections.fetch_sub(1, Ordering::Relaxed); + } +} + // pub type SharedBackendState = Arc>; impl BackendState { @@ -29,7 +55,7 @@ impl BackendState { Self { config, healthy: AtomicBool::new(true), - active_connections: 0, + active_connections: AtomicUsize::new(0), failed_health_checks: 0, } } diff --git a/tests/health_checker.rs b/tests/health_checker.rs index a086a37..8b27675 100644 --- a/tests/health_checker.rs +++ b/tests/health_checker.rs @@ -18,7 +18,7 @@ fn create_backend(port: u16) -> BackendState { healthy: AtomicBool::new(false), - active_connections: 0, + active_connections: 0.into(), failed_health_checks: 0, } diff --git a/tests/health_selection.rs b/tests/health_selection.rs index c2065c3..a7ed67d 100644 --- a/tests/health_selection.rs +++ b/tests/health_selection.rs @@ -16,7 +16,7 @@ fn create_backend(id: &str, port: u16, healthy: bool) -> BackendState { healthy: AtomicBool::new(healthy), - active_connections: 0, + active_connections: 0.into(), failed_health_checks: 0, } @@ -29,7 +29,10 @@ fn unhealthy_backend_is_skipped() { current_index: (0).into(), - backends: vec![create_backend("dead", 9001, false), create_backend("healthy", 9002, true)], + backends: vec![ + create_backend("dead", 9001, false).into(), + create_backend("healthy", 9002, true).into(), + ], }; let backend = upstream.next_backend().unwrap(); @@ -45,8 +48,8 @@ fn returns_none_when_all_backends_dead() { current_index: (0).into(), backends: vec![ - create_backend("dead-1", 9001, false), - create_backend("dead-2", 9002, false), + create_backend("dead-1", 9001, false).into(), + create_backend("dead-2", 9002, false).into(), ], }; diff --git a/tests/round_robin.rs b/tests/round_robin.rs index 153e1e8..a14fe07 100644 --- a/tests/round_robin.rs +++ b/tests/round_robin.rs @@ -16,7 +16,7 @@ fn create_backend(id: &str, port: u16) -> BackendState { healthy: AtomicBool::new(true), - active_connections: 0, + active_connections: 0.into(), failed_health_checks: 0, } @@ -29,7 +29,10 @@ fn round_robin_rotates_backends() { current_index: (0).into(), - backends: vec![create_backend("server-1", 9001), create_backend("server-2", 9002)], + backends: vec![ + create_backend("server-1", 9001).into(), + create_backend("server-2", 9002).into(), + ], }; let first = upstream.next_backend().unwrap();