Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/health/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod tcp;
41 changes: 41 additions & 0 deletions src/health/tcp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use std::{sync::atomic::Ordering, time::Duration};

use crate::state::{app::SharedAppState, backend::BackendState};
use anyhow::Result;
use tokio::{net::TcpStream, time::sleep};
use tracing::info;

// This will evolve later into:
// - retries
// - thresholds
// - latency tracking
// - richer health states
pub async fn check_backend_status(backend: &BackendState) -> Result<()> {
let backend_address = { format!("{}:{}", backend.config.host, backend.config.port) };

match TcpStream::connect(&backend_address).await {
Ok(_) => {
backend.healthy.store(true, Ordering::Relaxed);
info!("backend {} healthy", backend.config.id);
}
Err(_) => {
backend.healthy.store(false, Ordering::Relaxed);
info!("backend {} unreachable", backend.config.id);
}
}

Ok(())
}

pub async fn start_health_checker(state: SharedAppState) {
loop {
let state = state.read().await;
for upstream in &state.upstreams {
for backend in &upstream.backends {
let _ = check_backend_status(backend).await;
}
}
drop(state);
sleep(Duration::from_secs(5)).await;
}
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod algorithms;
pub mod common;
pub mod config;
pub mod health;
pub mod proxy;
pub mod state;
8 changes: 8 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use anyhow::{Result, bail};
use laminar::{
config::{loader::load_config, validator::validate_config},
health::tcp::start_health_checker,
proxy::tcp::start_tcp_proxy,
state::app::{AppState, SharedAppState},
};
Expand Down Expand Up @@ -39,6 +40,13 @@ async fn main() -> Result<()> {
// for upstream in &state.upstreams {
// info!("upstream '{}' initialized with {} backends", upstream.id, upstream.backends.len());
// }

let health_state = shared_state.clone();

tokio::spawn(async move {
start_health_checker(health_state).await;
});

let listener_address = format!("{listener_host}:{listener_port}");

start_tcp_proxy(&listener_address, shared_state).await?;
Expand Down
10 changes: 7 additions & 3 deletions src/proxy/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,20 @@ async fn handle_connection(mut stream: TcpStream, state: SharedAppState) -> anyh
let backend_address = {
let state = state.read().await;
let upstream = &state.upstreams[0];
let backend = upstream.next_backend();
let backend = match upstream.next_backend() {
Some(backend) => backend,
None => {
error!("no healthy backend available");
return Ok(());
}
};
format!("{}:{}", backend.config.host, backend.config.port)
};

info!("forwarding traffic to {}", backend_address);

let mut backend_stream = TcpStream::connect(&backend_address).await?;

drop(state);

copy_bidirectional(&mut stream, &mut backend_stream).await?;

Ok(())
Expand Down
12 changes: 9 additions & 3 deletions src/state/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,15 @@ pub struct UpstreamPool {

impl UpstreamPool {
// Very naive round robin.
pub fn next_backend(&self) -> &BackendState {
let index = self.current_index.fetch_add(1, Ordering::Relaxed);
&self.backends[index % self.backends.len()]
pub fn next_backend(&self) -> Option<&BackendState> {
for _ in 0..self.backends.len() {
let index = self.current_index.fetch_add(1, Ordering::Relaxed);
let backend = &self.backends[index % self.backends.len()];
if backend.healthy.load(Ordering::Relaxed) {
return Some(backend);
}
}
None
}
}
// Central shared runtime state for the entire load balancer.
Expand Down
11 changes: 9 additions & 2 deletions src/state/backend.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::atomic::AtomicBool;

// use std::sync::Arc;
// use tokio::sync::RwLock;
use crate::config::BackendServerConfig;
Expand All @@ -13,7 +15,7 @@ pub struct BackendState {
// Temporary boolean health indicator.
// This will later evolve into a richer health state machine:
// Healthy -> Unhealthy -> Recovering
pub healthy: bool,
pub healthy: AtomicBool,

// This becomes important for least-connections balancing.
pub active_connections: usize,
Expand All @@ -24,6 +26,11 @@ pub struct BackendState {

impl BackendState {
pub fn new(config: BackendServerConfig) -> Self {
Self { config, healthy: true, active_connections: 0, failed_health_checks: 0 }
Self {
config,
healthy: AtomicBool::new(true),
active_connections: 0,
failed_health_checks: 0,
}
}
}
Loading