Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 41 additions & 16 deletions src/proxy/tcp.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use anyhow::Ok;
use tokio::{
io::copy_bidirectional,
net::{TcpListener, TcpStream},
Expand Down Expand Up @@ -28,26 +27,52 @@ pub async fn start_tcp_proxy(address: &str, state: SharedAppState) -> anyhow::Re
}

async fn handle_connection(mut stream: TcpStream, state: SharedAppState) -> anyhow::Result<()> {
let guard = {
let retry_attempt = {
let state = state.read().await;
let upstream = &state.upstreams[0];
let backend_arc = match upstream.next_backend() {
Some(backend) => backend.clone(),
None => {
error!("no healthy backend available");
return Ok(());
}
};
ConnectionGuard::new(backend_arc)
// format!("{}:{}", backend.config.host, backend.config.port)
state.retry_attempts
};
let backend_address = guard.address();

info!("forwarding traffic to {}", backend_address);
// retry attempts mean how many times we should try
// connecting the client to a suitable backend server
//
// if a backend is available:
// route the traffic normally
//
// if connection fails:
// mark that backend unhealthy so future selections skip it
for _ in 0..retry_attempt {
let guard = {
let state = state.read().await;
let upstream = &state.upstreams[0];
let backend_arc = match upstream.next_backend() {
Some(backend) => backend,
None => {
error!("no healthy backend available");
return Ok(());
}
};
ConnectionGuard::new(backend_arc)
// format!("{}:{}", backend.config.host, backend.config.port)
};
let backend_address = guard.address();

let mut backend_stream = TcpStream::connect(&backend_address).await?;
info!("forwarding traffic to {}", backend_address);

copy_bidirectional(&mut stream, &mut backend_stream).await?;
match TcpStream::connect(&backend_address).await {
Ok(mut backend_stream) => {
copy_bidirectional(&mut stream, &mut backend_stream).await?;
return Ok(());
}
Err(error) => {
// this is very important ..
guard.mark_backend_unhealthy();
error!("failed to connect to backend {}: {:?}", backend_address, error);

// retry another
continue;
}
}
}
error!("all backend retry attempts failed");
Ok(())
}
4 changes: 3 additions & 1 deletion src/state/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ impl UpstreamPool {
// - admin APIs
#[derive(Debug)]
pub struct AppState {
pub retry_attempts: usize,
pub upstreams: Vec<UpstreamPool>,
}

Expand All @@ -63,12 +64,13 @@ impl AppState {
id: upstream.id,
current_index: AtomicUsize::new(0),
algorithm: upstream.algorithm,

backends, // all backends belonging to a single upstream type ( single logical service)
}
})
.collect();

Self { upstreams }
Self { upstreams, retry_attempts: config.load_balancer.retry_attempts }
}
}

Expand Down
6 changes: 6 additions & 0 deletions src/state/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,16 @@ impl ConnectionGuard {
Self { backend }
}

pub fn backend_id(&self) -> &str {
&self.backend.config.id
}
// get the address from the guard
pub fn address(&self) -> String {
format!("{}:{}", self.backend.config.host, self.backend.config.port)
}
pub fn mark_backend_unhealthy(&self) {
self.backend.healthy.store(false, Ordering::Relaxed);
}
}

impl Drop for ConnectionGuard {
Expand Down
Loading