Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ROADMAP.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@
- [ ] Refactor duplicated runtime logic
- [ ] Separate balancing module
- [ ] Separate health module
- [*] Improve state ownership model
- [ ] Improve state ownership model

---

Expand Down
2 changes: 1 addition & 1 deletion src/config/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use anyhow::{Result, bail};
use std::collections::HashSet;

// The load balancer follows a fail-fast startup philosophy.
// Invalid topology or malformed configuration should prevent startup
// Invalid configuration should prevent startup
// rather than causing runtime instability.

pub fn validate_config(config: &Config) -> Result<()> {
Expand Down
18 changes: 11 additions & 7 deletions src/health/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,19 @@ pub async fn check_backend_status(backend: &BackendState) -> Result<()> {

pub async fn start_health_checker(state: SharedAppState, interval_secs: u64) {
loop {
let state = state.read().await;
for upstream in &state.upstreams {
for backend in &upstream.backends {
let _ = check_backend_status(backend).await;
}
let backends = {
let state = state.read().await;

state
.upstreams
.iter()
.flat_map(|upstream| upstream.backends.clone())
.collect::<Vec<_>>()
};
for backend in backends {
let _ = check_backend_status(&backend).await;
}

// releasing the lock before going to sleep ..
drop(state);
sleep(Duration::from_secs(interval_secs)).await;
}
}
10 changes: 8 additions & 2 deletions src/proxy/tcp.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::state::app::SharedAppState;
use crate::state::backend::ConnectionGuard;
use std::time::Duration;
use std::{collections::HashSet, time::Duration};
use tokio::{
io::copy_bidirectional,
net::{TcpListener, TcpStream},
Expand Down Expand Up @@ -32,6 +32,8 @@ pub async fn handle_connection(mut stream: TcpStream, state: SharedAppState) ->
(state.retry_attempts, state.connect_timeout, state.idle_timeout)
};

let mut attempted_backends = HashSet::new();

// retry attempts mean how many times we should try
// connecting the client to a suitable backend server
//
Expand All @@ -55,6 +57,9 @@ pub async fn handle_connection(mut stream: TcpStream, state: SharedAppState) ->
// format!("{}:{}", backend.config.host, backend.config.port)
};
let backend_address = guard.address();
if attempted_backends.contains(guard.backend_id()) {
continue;
}

info!("forwarding traffic to {}", backend_address);

Expand All @@ -64,7 +69,8 @@ pub async fn handle_connection(mut stream: TcpStream, state: SharedAppState) ->
}
Err(error) => {
guard.mark_backend_unhealthy();
error!("backend {} failed: {:?}", backend_address, error);
error!(backend_id = %guard.backend_id(),backend = %backend_address,attempt = attempted_backends.len() + 1,"backend request failed: {:?}",error);
attempted_backends.insert(guard.backend_id().to_string());
continue;
}
}
Expand Down
Loading