Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/config/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ pub struct LoadBalancerConfig {
pub sticky_sessions: bool,

pub health_check_interval_secs: u64,
pub connect_timeout_secs: u64,
pub idle_timeout_secs: u64,
}

// Static backend server definition loaded from configuration.
Expand Down
26 changes: 20 additions & 6 deletions src/proxy/tcp.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use tokio::{
io::copy_bidirectional,
net::{TcpListener, TcpStream},
time::timeout,
};
use tracing::{error, info};

Expand All @@ -27,9 +28,9 @@ pub async fn start_tcp_proxy(address: &str, state: SharedAppState) -> anyhow::Re
}

async fn handle_connection(mut stream: TcpStream, state: SharedAppState) -> anyhow::Result<()> {
let retry_attempt = {
let (retry_attempt, connect_timeout, idle_timeout) = {
let state = state.read().await;
state.retry_attempts
(state.retry_attempts, state.connect_timeout, state.idle_timeout)
};

// retry attempts mean how many times we should try
Expand Down Expand Up @@ -58,16 +59,29 @@ async fn handle_connection(mut stream: TcpStream, state: SharedAppState) -> anyh

info!("forwarding traffic to {}", backend_address);

match TcpStream::connect(&backend_address).await {
Ok(mut backend_stream) => {
copy_bidirectional(&mut stream, &mut backend_stream).await?;
match timeout(connect_timeout, TcpStream::connect(&backend_address)).await {
Ok(Ok(mut backend_stream)) => {
if timeout(idle_timeout, copy_bidirectional(&mut stream, &mut backend_stream))
.await
.is_err()
{
error!("connection with {} timed out (idle)", backend_address);
}
return Ok(());
}
Err(error) => {
Ok(Err(error)) => {
// this is very important ..
guard.mark_backend_unhealthy();
error!("failed to connect to backend {}: {:?}", backend_address, error);

// retry another
continue;
}
Err(_) => {
// connection attempt timed out
guard.mark_backend_unhealthy();
error!("connection attempt to {} timed out", backend_address);

// retry another
continue;
}
Expand Down
10 changes: 9 additions & 1 deletion src/state/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::config::LoadBalancingAlgorithm;
use crate::{config::types::Config, state::backend::BackendState};
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::time::Duration;
use tokio::sync::RwLock;
// Contains all backend servers belonging to a single logical service.
#[derive(Debug)]
Expand Down Expand Up @@ -40,6 +41,8 @@ impl UpstreamPool {
pub struct AppState {
pub retry_attempts: usize,
pub upstreams: Vec<UpstreamPool>,
pub connect_timeout: Duration,
pub idle_timeout: Duration,
}

pub type SharedAppState = Arc<RwLock<AppState>>;
Expand Down Expand Up @@ -70,7 +73,12 @@ impl AppState {
})
.collect();

Self { upstreams, retry_attempts: config.load_balancer.retry_attempts }
Self {
upstreams,
retry_attempts: config.load_balancer.retry_attempts,
connect_timeout: Duration::from_secs(config.load_balancer.connect_timeout_secs),
idle_timeout: Duration::from_secs(config.load_balancer.idle_timeout_secs),
}
}
}

Expand Down
Loading