Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/algorithms/least_connections.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::sync::{Arc, atomic::Ordering};

use crate::state::backend::BackendState;
use std::sync::{Arc, atomic::Ordering};

pub fn select_backend(backends: &[Arc<BackendState>]) -> Option<Arc<BackendState>> {
// prev and curr approach
Expand Down
3 changes: 2 additions & 1 deletion src/config/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ load_balancer:
retry_attempts: 2
sticky_sessions: false
health_check_interval_secs: 5

connect_timeout_secs: 3
idle_timeout_secs: 30
upstreams:
- id: "main"
algorithm: "round_robin"
Expand Down
1 change: 0 additions & 1 deletion src/config/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ pub struct ServerConfig {
pub struct LoadBalancerConfig {
pub retry_attempts: usize,
pub sticky_sessions: bool,

pub health_check_interval_secs: u64,
pub connect_timeout_secs: u64,
pub idle_timeout_secs: u64,
Expand Down
8 changes: 8 additions & 0 deletions src/config/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ use std::collections::HashSet;
// rather than causing runtime instability.

pub fn validate_config(config: &Config) -> Result<()> {
if config.load_balancer.connect_timeout_secs == 0 {
bail!("connect_timeout_secs must be greater than 0");
}

if config.load_balancer.idle_timeout_secs == 0 {
bail!("idle_timeout_secs must be greater than 0");
}

let mut upstream_ids = HashSet::new();
for upstream in &config.upstreams {
if upstream.servers.is_empty() {
Expand Down
54 changes: 29 additions & 25 deletions src/proxy/tcp.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use crate::state::app::SharedAppState;
use crate::state::backend::ConnectionGuard;
use std::time::Duration;
use tokio::{
io::copy_bidirectional,
net::{TcpListener, TcpStream},
time::timeout,
};
use tracing::{error, info};

use crate::state::app::SharedAppState;
use crate::state::backend::ConnectionGuard;

pub async fn start_tcp_proxy(address: &str, state: SharedAppState) -> anyhow::Result<()> {
let listener = TcpListener::bind(address).await?;

Expand All @@ -18,7 +18,6 @@ pub async fn start_tcp_proxy(address: &str, state: SharedAppState) -> anyhow::Re

info!("new client connected {}", client_address);
let state = state.clone();

tokio::spawn(async move {
if let Err(error) = handle_connection(client_stream, state).await {
error!("connection handling failed {:?}", error)
Expand Down Expand Up @@ -59,34 +58,39 @@ async fn handle_connection(mut stream: TcpStream, state: SharedAppState) -> anyh

info!("forwarding traffic to {}", backend_address);

match timeout(connect_timeout, TcpStream::connect(&backend_address)).await {
Ok(Ok(mut backend_stream)) => {
if timeout(idle_timeout, copy_bidirectional(&mut stream, &mut backend_stream))
.await
.is_err()
{
error!("connection with {} timed out (idle)", backend_address);
}
match proxy_connection(&mut stream, &backend_address, connect_timeout, idle_timeout).await {
Ok(_) => {
return Ok(());
}
Ok(Err(error)) => {
// this is very important ..
guard.mark_backend_unhealthy();
error!("failed to connect to backend {}: {:?}", backend_address, error);

// retry another
continue;
}
Err(_) => {
// connection attempt timed out
Err(error) => {
guard.mark_backend_unhealthy();
error!("connection attempt to {} timed out", backend_address);

// retry another
error!("backend {} failed: {:?}", backend_address, error);
continue;
}
}
}
error!("all backend retry attempts failed");
Ok(())
}

async fn proxy_connection(
client_stream: &mut TcpStream,
backend_address: &str,
connect_timeout: Duration,
idle_timeout: Duration,
) -> anyhow::Result<()> {
let mut backend_stream =
timeout(connect_timeout, TcpStream::connect(backend_address)).await??;

match timeout(idle_timeout, copy_bidirectional(client_stream, &mut backend_stream)).await {
Ok(Ok(_)) => Ok(()),

Ok(Err(error)) => {
anyhow::bail!("proxy IO error: {error}");
}

Err(_) => {
anyhow::bail!("connection idle timeout");
}
}
}
36 changes: 36 additions & 0 deletions tests/connect_timeout.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use laminar::{
config::types::BackendServerConfig,
state::backend::{BackendState, ConnectionGuard},
};
use std::{
sync::{
Arc,
atomic::{AtomicBool, AtomicUsize, Ordering},
},
time::Duration,
};
use tokio::{net::TcpStream, time::timeout};

#[tokio::test]
async fn marks_backend_unhealthy_on_connect_timeout() {
let backend = Arc::new(BackendState {
config: BackendServerConfig {
id: "dead-backend".into(),
host: "10.255.255.1".into(),
port: 1234,
weight: 1,
},

healthy: AtomicBool::new(true),
active_connections: AtomicUsize::new(0),
failed_health_checks: 0,
});

let guard = ConnectionGuard::new(backend.clone());
let address = guard.address();
let result = timeout(Duration::from_millis(100), TcpStream::connect(address)).await;
assert!(result.is_err());

guard.mark_backend_unhealthy();
assert!(!backend.healthy.load(Ordering::Relaxed));
}
Loading