Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ The internal architecture now more closely resembles real-world proxy and load b
- Added automatic unhealthy backend skipping during selection
- Added configurable health check intervals through YAML config
- Added graceful handling when no healthy backends are available
- Added RAII-based active connection tracking using `ConnectionGuard`
- Added thread-safe connection metrics using `AtomicUsize`
- Added integration tests for:
- round robin balancing
- unhealthy backend filtering
Expand All @@ -104,6 +106,8 @@ The internal architecture now more closely resembles real-world proxy and load b
- backend selection
- backend connection
- traffic forwarding
- Refactored state ownership to use `Arc<BackendState>` for shared runtime access
- Moved connection cleanup logic to `Drop` implementation for guaranteed decrementing

### Notes

Expand Down
8 changes: 4 additions & 4 deletions ROADMAP.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,9 @@

## Connection Tracking

* [ ] Track active connections per backend
* [ ] Increment on connect
* [ ] Decrement on disconnect
* [*] Track active connections per backend
* [*] Increment on connect
* [*] Decrement on disconnect
* [ ] Track total requests

---
Expand Down Expand Up @@ -129,7 +129,7 @@
* [ ] Refactor duplicated runtime logic
* [ ] Separate balancing module
* [ ] Separate health module
* [ ] Improve state ownership model
* [*] Improve state ownership model

---

Expand Down
11 changes: 7 additions & 4 deletions src/proxy/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use tokio::{
use tracing::{error, info};

use crate::state::app::SharedAppState;
use crate::state::backend::ConnectionGuard;

pub async fn start_tcp_proxy(address: &str, state: SharedAppState) -> anyhow::Result<()> {
let listener = TcpListener::bind(address).await?;
Expand All @@ -27,18 +28,20 @@ pub async fn start_tcp_proxy(address: &str, state: SharedAppState) -> anyhow::Re
}

async fn handle_connection(mut stream: TcpStream, state: SharedAppState) -> anyhow::Result<()> {
let backend_address = {
let guard = {
let state = state.read().await;
let upstream = &state.upstreams[0];
let backend = match upstream.next_backend() {
Some(backend) => backend,
let backend_arc = match upstream.next_backend() {
Some(backend) => backend.clone(),
None => {
error!("no healthy backend available");
return Ok(());
}
};
format!("{}:{}", backend.config.host, backend.config.port)
ConnectionGuard::new(backend_arc)
// format!("{}:{}", backend.config.host, backend.config.port)
};
let backend_address = guard.address();

info!("forwarding traffic to {}", backend_address);

Expand Down
9 changes: 5 additions & 4 deletions src/state/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,17 @@ use tokio::sync::RwLock;
pub struct UpstreamPool {
pub id: String,
pub current_index: AtomicUsize,
pub backends: Vec<BackendState>,
pub backends: Vec<Arc<BackendState>>,
}

impl UpstreamPool {
// Very naive round robin.
pub fn next_backend(&self) -> Option<&BackendState> {
pub fn next_backend(&self) -> Option<Arc<BackendState>> {
for _ in 0..self.backends.len() {
let index = self.current_index.fetch_add(1, Ordering::Relaxed);
let backend = &self.backends[index % self.backends.len()];
if backend.healthy.load(Ordering::Relaxed) {
return Some(backend);
return Some(backend.clone());
}
}
None
Expand Down Expand Up @@ -50,7 +50,8 @@ impl AppState {
// upstream has id, algorithm and servers
// BackendState.config is of same type as a server

let backends = upstream.servers.into_iter().map(BackendState::new).collect();
let backends =
upstream.servers.into_iter().map(|s| Arc::new(BackendState::new(s))).collect();

UpstreamPool {
id: upstream.id,
Expand Down
32 changes: 29 additions & 3 deletions src/state/backend.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

// use std::sync::Arc;
// use tokio::sync::RwLock;
use crate::config::BackendServerConfig;

pub struct ConnectionGuard {
// We hold an Arc so backend state stays alive
// for the lifetime of the connection.
backend: Arc<BackendState>,
}

// Mutable runtime representation of a backend server.
// - immutable backend configuration
// - mutable runtime health/connection state
Expand All @@ -18,18 +25,37 @@ pub struct BackendState {
pub healthy: AtomicBool,

// This becomes important for least-connections balancing.
pub active_connections: usize,
pub active_connections: AtomicUsize,
pub failed_health_checks: usize,
}

impl ConnectionGuard {
pub fn new(backend: Arc<BackendState>) -> Self {
// Increment immediately upon creation
backend.active_connections.fetch_add(1, Ordering::Relaxed);
Self { backend }
}

// get the address from the guard
pub fn address(&self) -> String {
format!("{}:{}", self.backend.config.host, self.backend.config.port)
}
}

impl Drop for ConnectionGuard {
fn drop(&mut self) {
self.backend.active_connections.fetch_sub(1, Ordering::Relaxed);
}
}

// pub type SharedBackendState = Arc<RwLock<BackendState>>;

impl BackendState {
pub fn new(config: BackendServerConfig) -> Self {
Self {
config,
healthy: AtomicBool::new(true),
active_connections: 0,
active_connections: AtomicUsize::new(0),
failed_health_checks: 0,
}
}
Expand Down
2 changes: 1 addition & 1 deletion tests/health_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ fn create_backend(port: u16) -> BackendState {

healthy: AtomicBool::new(false),

active_connections: 0,
active_connections: 0.into(),

failed_health_checks: 0,
}
Expand Down
11 changes: 7 additions & 4 deletions tests/health_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ fn create_backend(id: &str, port: u16, healthy: bool) -> BackendState {

healthy: AtomicBool::new(healthy),

active_connections: 0,
active_connections: 0.into(),

failed_health_checks: 0,
}
Expand All @@ -29,7 +29,10 @@ fn unhealthy_backend_is_skipped() {

current_index: (0).into(),

backends: vec![create_backend("dead", 9001, false), create_backend("healthy", 9002, true)],
backends: vec![
create_backend("dead", 9001, false).into(),
create_backend("healthy", 9002, true).into(),
],
};

let backend = upstream.next_backend().unwrap();
Expand All @@ -45,8 +48,8 @@ fn returns_none_when_all_backends_dead() {
current_index: (0).into(),

backends: vec![
create_backend("dead-1", 9001, false),
create_backend("dead-2", 9002, false),
create_backend("dead-1", 9001, false).into(),
create_backend("dead-2", 9002, false).into(),
],
};

Expand Down
7 changes: 5 additions & 2 deletions tests/round_robin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ fn create_backend(id: &str, port: u16) -> BackendState {

healthy: AtomicBool::new(true),

active_connections: 0,
active_connections: 0.into(),

failed_health_checks: 0,
}
Expand All @@ -29,7 +29,10 @@ fn round_robin_rotates_backends() {

current_index: (0).into(),

backends: vec![create_backend("server-1", 9001), create_backend("server-2", 9002)],
backends: vec![
create_backend("server-1", 9001).into(),
create_backend("server-2", 9002).into(),
],
};

let first = upstream.next_backend().unwrap();
Expand Down
Loading