Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@ tracing = "0.1.44"
tracing-subscriber = {version="0.3.23", features = ["json"]}
uuid = {version="1.23.2",features=["v4"]}



[dev-dependencies]
tempfile = "3.27.0"
16 changes: 8 additions & 8 deletions ROADMAP.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
- [x] Define configuration format
- [x] Create runtime AppState
- [x] Define backend configuration model
- [ ] Setup graceful shutdown handling
- [x] Setup graceful shutdown handling

---

Expand Down Expand Up @@ -140,8 +140,8 @@

## Weighted Balancing

- [ ] Weighted Round Robin
- [ ] Backend weights in config
- [x] Weighted Round Robin
- [x] Backend weights in config
- [ ] Dynamic weight updates

---
Expand Down Expand Up @@ -180,16 +180,16 @@
- [x] Add draining backend state
- [x] Stop routing new connections
- [x] Wait for active connections
- [ ] Graceful backend removal
- [x] Graceful backend removal

---

## Dynamic Config Reloading

- [ ] Watch configuration file
- [ ] Reload backend configuration
- [ ] Preserve active connections
- [ ] Runtime backend updates
- [x] Watch configuration file (manual reload api semantics implemented)
- [x] Reload backend configuration
- [x] Preserve active connections
- [x] Runtime backend updates

---

Expand Down
81 changes: 79 additions & 2 deletions src/admin/http.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use crate::{metrics::registry::gather_metrics, state::app::SharedAppState};
use crate::{
admin::reload::reload_config, metrics::registry::gather_metrics, state::app::SharedAppState,
};
use axum::{
Json, Router,
extract::{Path, State},
Expand Down Expand Up @@ -31,6 +33,31 @@ struct MetricsResponse {
upstreams: Vec<UpstreamMetrics>,
}

#[derive(Serialize)]
struct BackendStatus {
id: String,
healthy: bool,
draining: bool,
weight: usize,
active_connections: usize,
total_requests: usize,
failed_requests: usize,
}

#[derive(Serialize)]
struct UpstreamStatus {
id: String,
algorithm: String,
backend_count: usize,
weighted_backend_count: usize,
backends: Vec<BackendStatus>,
}

#[derive(Serialize)]
struct StatusResponse {
upstreams: Vec<UpstreamStatus>,
}

async fn prometheus_handler() -> String {
gather_metrics()
}
Expand Down Expand Up @@ -76,7 +103,6 @@ async fn drain_backend_handler(
for backend in &upstream.backends {
if backend.config.id == id {
backend.mark_draining();

tracing::info!(
backend_id = %id,
"backend marked as draining"
Expand All @@ -90,11 +116,62 @@ async fn drain_backend_handler(
format!("backend '{id}' not found")
}

async fn reload_handler(State(state): State<SharedAppState>) -> String {
match reload_config(state).await {
Ok(_) => "config reloaded".into(),

Err(error) => {
tracing::error!(
error = %error,
"config reload failed"
);

format!("reload failed: {error}")
}
}
}

async fn status_handler(State(state): State<SharedAppState>) -> Json<StatusResponse> {
let state = state.read().await;

let upstreams = state
.upstreams
.iter()
.map(|upstream| {
let backends = upstream
.backends
.iter()
.map(|backend| BackendStatus {
id: backend.config.id.clone(),
healthy: backend.healthy.load(Ordering::Relaxed),
draining: backend.draining.load(Ordering::Relaxed),
weight: backend.config.weight,
active_connections: backend.active_connections.load(Ordering::Relaxed),
total_requests: backend.total_requests.load(Ordering::Relaxed),
failed_requests: backend.failed_requests.load(Ordering::Relaxed),
})
.collect();

UpstreamStatus {
id: upstream.id.clone(),
algorithm: format!("{:?}", upstream.algorithm),
backend_count: upstream.backends.len(),
weighted_backend_count: upstream.weighted_backends.len(),
backends,
}
})
.collect();

Json(StatusResponse { upstreams })
}

pub async fn start_admin_server(address: &str, state: SharedAppState) -> anyhow::Result<()> {
let app = Router::new()
.route("/metrics", get(metrics_handler))
.route("/backend/{id}/drain", post(drain_backend_handler))
.route("/prometheus", get(prometheus_handler))
.route("/reload", post(reload_handler))
.route("/status", get(status_handler))
.with_state(state);
let listener = TcpListener::bind(address).await?;
axum::serve(listener, app).await?;
Expand Down
1 change: 1 addition & 0 deletions src/admin/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod http;
pub mod reload;
89 changes: 89 additions & 0 deletions src/admin/reload.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
use std::sync::{Arc, atomic::AtomicUsize};

use anyhow::Result;

use crate::{
config::{loader::load_config, validator::validate_config},
state::{
app::{SharedAppState, UpstreamPool},
backend::BackendState,
},
};

pub async fn reload_config(state: SharedAppState) -> Result<()> {
let config_path = {
let state = state.read().await;
state.config_path.clone()
};

let config = load_config(&config_path)?;
validate_config(&config)?;
let mut state = state.write().await;

for new_upstream in config.upstreams {
let existing_upstream = state.upstreams.iter_mut().find(|u| u.id == new_upstream.id);
match existing_upstream {
Some(upstream) => {
for server in &new_upstream.servers {
let exists = upstream.backends.iter().any(|b| b.config.id == server.id);

if !exists {
tracing::info!(
backend_id = %server.id,
"adding new backend"
);
upstream.backends.push(Arc::new(BackendState::new(server.clone())));
}
}

for backend in &upstream.backends {
let still_exists =
new_upstream.servers.iter().any(|s| s.id == backend.config.id);

if !still_exists {
backend.mark_draining();
tracing::info!(
backend_id =
%backend.config.id,
"backend marked draining during reload"
);
}
}
upstream.rebuild_weighted_backends();
}

None => {
tracing::info!(
upstream_id = %new_upstream.id,
"adding new upstream"
);

let backends = new_upstream
.servers
.into_iter()
.map(|server| Arc::new(BackendState::new(server)))
.collect();

let mut upstream_pool = UpstreamPool {
id: new_upstream.id,

current_index: AtomicUsize::new(0),

algorithm: new_upstream.algorithm,

backends,

weighted_backends: Vec::new(),
};

upstream_pool.rebuild_weighted_backends();

state.upstreams.push(upstream_pool);
}
}
}

tracing::info!("runtime config reloaded");

Ok(())
}
3 changes: 2 additions & 1 deletion src/algorithms/least_connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ pub fn select_backend(backends: &[Arc<BackendState>]) -> Option<Arc<BackendState

for backend in backends {
//check if this backend is healthy
if !backend.healthy.load(Ordering::Relaxed) || backend.draining.load(Ordering::Relaxed) {
// yes it is NOT
if !backend.is_routable() {
continue;
}

Expand Down
1 change: 1 addition & 0 deletions src/algorithms/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub mod least_connections;
pub mod round_robin;
pub mod weighted_round_robin;
2 changes: 1 addition & 1 deletion src/algorithms/round_robin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub fn select_backend(
for _ in 0..backends.len() {
let index = current_index.fetch_add(1, Ordering::Relaxed);
let backend = &backends[index % backends.len()];
if backend.healthy.load(Ordering::Relaxed) && !backend.draining.load(Ordering::Relaxed) {
if backend.is_routable() {
return Some(backend.clone());
}
}
Expand Down
25 changes: 25 additions & 0 deletions src/algorithms/weighted_round_robin.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use std::sync::{
Arc,
atomic::{AtomicUsize, Ordering},
};

use crate::state::backend::BackendState;

pub fn select_backend(
weighted_backends: &[Arc<BackendState>],
current_index: &AtomicUsize,
) -> Option<Arc<BackendState>> {
let routable = weighted_backends
.iter()
.filter(|backend| backend.is_routable())
.cloned()
.collect::<Vec<_>>();

if routable.is_empty() {
return None;
}

let index = current_index.fetch_add(1, Ordering::Relaxed);

Some(routable[index % routable.len()].clone())
}
2 changes: 1 addition & 1 deletion src/config/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub struct LoadBalancerConfig {
// Static backend server definition loaded from configuration.
// This only contains immutable backend metadata.
// Live runtime information is tracked separately in "BackendState".
#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, Clone)]
pub struct BackendServerConfig {
pub id: String,
pub host: String,
Expand Down
Loading
Loading