From 65ffca626882225f9d304a067d58fb718e698efa Mon Sep 17 00:00:00 2001 From: Rohit Date: Sat, 6 Jun 2026 12:26:42 +0530 Subject: [PATCH 1/2] feat: implement backend enable/disable functionality and config watcher --- Cargo.lock | 194 +++++++++++++++++++++++++++++++++++++++++-- Cargo.toml | 1 + ROADMAP.md | 13 +-- deny.toml | 4 +- src/admin/http.rs | 50 +++++++++++ src/admin/mod.rs | 1 + src/admin/watcher.rs | 37 +++++++++ src/main.rs | 14 ++++ src/state/backend.rs | 8 ++ 9 files changed, 308 insertions(+), 14 deletions(-) create mode 100644 src/admin/watcher.rs diff --git a/Cargo.lock b/Cargo.lock index 3e10361..3e0a413 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -103,7 +103,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys", + "windows-sys 0.61.2", ] [[package]] @@ -133,6 +133,15 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fsevent-sys" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2" +dependencies = [ + "libc", +] + [[package]] name = "futures" version = "0.3.32" @@ -353,6 +362,26 @@ dependencies = [ "serde_core", ] +[[package]] +name = "inotify" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "533e68a5842e734946fe159fb03fc9bbbb254f590dd0d8ad321ae5ff7beca2c1" +dependencies = [ + "bitflags", + "inotify-sys", + "libc", +] + +[[package]] +name = "inotify-sys" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb" +dependencies = [ + "libc", +] + [[package]] name = "itoa" version = "1.0.18" @@ -371,6 +400,26 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "kqueue" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "273c0752728918e0ac4976f2b275b6fefb9ecd400585dec929419f3844cd87b5" +dependencies = [ + "kqueue-sys", + "libc", +] + +[[package]] +name = "kqueue-sys" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07293a4e297ac234359b510362495713f75ea345d5307140414f20c69ffeb087" +dependencies = [ + "bitflags", + "libc", +] + [[package]] name = "laminar" version = "0.1.0" @@ -378,6 +427,7 @@ dependencies = [ "anyhow", "axum", "futures", + "notify", "prometheus", "serde", "serde_json", @@ -454,8 +504,36 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "50b7e5b27aa02a74bac8c3f23f448f8d87ff11f92d3aac1a6ed369ee08cc56c1" dependencies = [ "libc", + "log", "wasi", - "windows-sys", + "windows-sys 0.61.2", +] + +[[package]] +name = "notify" +version = "8.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d3d07927151ff8575b7087f245456e549fea62edf0ec4e565a5ee50c8402bc3" +dependencies = [ + "bitflags", + "fsevent-sys", + "inotify", + "kqueue", + "libc", + "log", + "mio", + "notify-types", + "walkdir", + "windows-sys 0.60.2", +] + +[[package]] +name = "notify-types" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42b8cfee0e339a0337359f3c88165702ac6e600dc01c0cc9579a92d62b08477a" +dependencies = [ + "bitflags", ] [[package]] @@ -464,7 +542,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys", + "windows-sys 0.61.2", ] [[package]] @@ -596,7 +674,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys", + "windows-sys 0.61.2", ] [[package]] @@ -611,6 +689,15 @@ version = "1.0.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9774ba4a74de5f7b1c1451ed6cd5285a32eddb5cccb8cc655a4e50009e06477f" +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "scopeguard" version = "1.2.0" @@ -740,7 +827,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3a766e1110788c36f4fa1c2b71b387a7815aa65f88ce0229841826633d93723e" dependencies = [ "libc", - "windows-sys", + "windows-sys 0.61.2", ] [[package]] @@ -770,7 +857,7 @@ dependencies = [ "getrandom", "once_cell", "rustix", - "windows-sys", + "windows-sys 0.61.2", ] [[package]] @@ -836,7 +923,7 @@ dependencies = [ "signal-hook-registry", "socket2", "tokio-macros", - "windows-sys", + "windows-sys 0.61.2", ] [[package]] @@ -984,6 +1071,16 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" +[[package]] +name = "walkdir" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" +dependencies = [ + "same-file", + "winapi-util", +] + [[package]] name = "wasi" version = "0.11.1+wasi-snapshot-preview1" @@ -1087,12 +1184,30 @@ dependencies = [ "semver", ] +[[package]] +name = "winapi-util" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "windows-link" version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" +[[package]] +name = "windows-sys" +version = "0.60.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2f500e4d28234f72040990ec9d39e3a6b950f9f22d3dba18416c35882612bcb" +dependencies = [ + "windows-targets", +] + [[package]] name = "windows-sys" version = "0.61.2" @@ -1102,6 +1217,71 @@ dependencies = [ "windows-link", ] +[[package]] +name = "windows-targets" +version = "0.53.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4945f9f551b88e0d65f3db0bc25c33b8acea4d9e41163edf90dcd0b19f9069f3" +dependencies = [ + "windows-link", + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_gnullvm", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9d8416fa8b42f5c947f8482c43e7d89e73a173cead56d044f6a56104a6d1b53" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9d782e804c2f632e395708e99a94275910eb9100b2114651e04744e9b125006" + +[[package]] +name = "windows_i686_gnu" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "960e6da069d81e09becb0ca57a65220ddff016ff2d6af6a223cf372a506593a3" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa7359d10048f68ab8b09fa71c3daccfb0e9b559aed648a8f95469c27057180c" + +[[package]] +name = "windows_i686_msvc" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e7ac75179f18232fe9c285163565a57ef8d3c89254a30685b57d83a38d326c2" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c3842cdd74a865a8066ab39c8a7a473c0778a3f29370b5fd6b4b9aa7df4a499" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ffa179e2d07eee8ad8f57493436566c7cc30ac536a3379fdf008f47f6bb7ae1" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" + [[package]] name = "wit-bindgen" version = "0.51.0" diff --git a/Cargo.toml b/Cargo.toml index e5a5c52..ba3f038 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ license = "MIT" anyhow = "1.0.102" axum = "0.8.9" futures = "0.3.32" +notify = "8.2.0" prometheus = "0.14.0" serde = {version ="1.0.228",features = ["derive"]} serde_json = "1.0.150" diff --git a/ROADMAP.md b/ROADMAP.md index 0799373..85768e9 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -152,7 +152,7 @@ - [x] Request counters - [x] Failure counters - [x] Backend health metrics -- [ ] Throughput metrics +- [x] Throughput metrics - [x] Track total requests --- @@ -186,7 +186,8 @@ ## Dynamic Config Reloading -- [x] Watch configuration file (manual reload api semantics implemented) +- [x] Runtime config reload API +- [x] Automatic file watcher reload - [x] Reload backend configuration - [x] Preserve active connections - [x] Runtime backend updates @@ -195,18 +196,18 @@ ## Admin API -- [ ] Add runtime status endpoint +- [x] Add runtime status endpoint - [ ] Add backend health endpoint -- [ ] Add backend enable/disable API +- [x] Add backend enable/disable API - [x] Add metrics endpoint --- ## Phase 3 Deliverable -- [ ] Runtime configurability +- [x] Runtime configurability - [x] Operational observability -- [ ] Graceful backend management +- [x] Graceful backend management --- diff --git a/deny.toml b/deny.toml index d7091a9..9d51584 100644 --- a/deny.toml +++ b/deny.toml @@ -94,7 +94,9 @@ allow = [ "MIT", "Apache-2.0", "Unicode-3.0", - "BSD-3-Clause" + "BSD-3-Clause", + "ISC", + "CC0-1.0" ] # The confidence threshold for detecting a license from license text. # The higher the value, the more closely the license text must be to the diff --git a/src/admin/http.rs b/src/admin/http.rs index 8c07354..9f69f58 100644 --- a/src/admin/http.rs +++ b/src/admin/http.rs @@ -165,6 +165,54 @@ async fn status_handler(State(state): State) -> Json, + State(state): State, +) -> String { + let state = state.read().await; + + for upstream in &state.upstreams { + for backend in &upstream.backends { + if backend.config.id == id { + backend.disable(); + + tracing::info!( + backend_id = %id, + "backend disabled" + ); + + return format!("backend '{id}' disabled"); + } + } + } + + format!("backend '{id}' not found") +} + +async fn enable_backend_handler( + Path(id): Path, + State(state): State, +) -> String { + let state = state.read().await; + + for upstream in &state.upstreams { + for backend in &upstream.backends { + if backend.config.id == id { + backend.enable(); + + tracing::info!( + backend_id = %id, + "backend enabled" + ); + + return format!("backend '{id}' enabled"); + } + } + } + + format!("backend '{id}' not found") +} + pub async fn start_admin_server(address: &str, state: SharedAppState) -> anyhow::Result<()> { let app = Router::new() .route("/metrics", get(metrics_handler)) @@ -172,6 +220,8 @@ pub async fn start_admin_server(address: &str, state: SharedAppState) -> anyhow: .route("/prometheus", get(prometheus_handler)) .route("/reload", post(reload_handler)) .route("/status", get(status_handler)) + .route("/backend/{id}/disable", post(disable_backend_handler)) + .route("/backend/{id}/enable", post(enable_backend_handler)) .with_state(state); let listener = TcpListener::bind(address).await?; axum::serve(listener, app).await?; diff --git a/src/admin/mod.rs b/src/admin/mod.rs index 2b3459e..deefb69 100644 --- a/src/admin/mod.rs +++ b/src/admin/mod.rs @@ -1,2 +1,3 @@ pub mod http; pub mod reload; +pub mod watcher; diff --git a/src/admin/watcher.rs b/src/admin/watcher.rs new file mode 100644 index 0000000..52480bc --- /dev/null +++ b/src/admin/watcher.rs @@ -0,0 +1,37 @@ +use notify::{RecursiveMode, Watcher}; + +use crate::{admin::reload::reload_config, state::app::SharedAppState}; + +pub async fn start_config_watcher(state: SharedAppState, path: String) -> notify::Result<()> { + let (tx, mut rx) = tokio::sync::mpsc::channel(32); + + let mut watcher = notify::recommended_watcher(move |result| { + let _ = tx.blocking_send(result); + })?; + + watcher.watch(path.as_ref(), RecursiveMode::NonRecursive)?; + + while let Some(event) = rx.recv().await { + match event { + Ok(_) => { + tracing::info!("config file changed"); + + if let Err(error) = reload_config(state.clone()).await { + tracing::error!( + error = %error, + "config reload failed" + ); + } + } + + Err(error) => { + tracing::error!( + error = %error, + "watch error" + ); + } + } + } + + Ok(()) +} diff --git a/src/main.rs b/src/main.rs index 99a16b9..9c062de 100644 --- a/src/main.rs +++ b/src/main.rs @@ -44,15 +44,29 @@ async fn main() -> Result<()> { let health_state = shared_state.clone(); let admin_state = shared_state.clone(); + let watcher_state = shared_state.clone(); + let watcher_path = path.clone(); + tokio::spawn(async move { if let Err(error) = admin::http::start_admin_server("127.0.0.1:9090", admin_state).await { tracing::error!("admin server failed: {:?}", error); } }); + tokio::spawn(async move { start_health_checker(health_state, health_interval).await; }); + tokio::spawn(async move { + if let Err(error) = admin::watcher::start_config_watcher(watcher_state, watcher_path).await + { + tracing::error!( + error = %error, + "config watcher failed" + ); + } + }); + let listener_address = format!("{listener_host}:{listener_port}"); start_tcp_proxy(&listener_address, shared_state).await?; diff --git a/src/state/backend.rs b/src/state/backend.rs index 9143ae0..b37103f 100644 --- a/src/state/backend.rs +++ b/src/state/backend.rs @@ -107,4 +107,12 @@ impl BackendState { pub fn is_routable(&self) -> bool { self.healthy.load(Ordering::Relaxed) && !self.draining.load(Ordering::Relaxed) } + + pub fn disable(&self) { + self.healthy.store(false, Ordering::Relaxed); + } + + pub fn enable(&self) { + self.healthy.store(true, Ordering::Relaxed); + } } From e0ac601c1bfec11a6b83b9ce859a5b02355e3d35 Mon Sep 17 00:00:00 2001 From: Rohit Date: Sat, 6 Jun 2026 12:39:25 +0530 Subject: [PATCH 2/2] feat: enhance backend management with health checks, weight updates, and config reload support --- CHANGELOG.md | 58 +++++++++++++++++++++++++++++++++++++++++--- ROADMAP.md | 4 +-- src/admin/http.rs | 57 ++++++++++++++++++++++++++++++++++++++++++- src/admin/watcher.rs | 5 ++++ src/main.rs | 6 ++--- src/state/backend.rs | 4 +++ 6 files changed, 124 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6e1dcdc..7b264f7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -253,27 +253,77 @@ interact during live traffic routing and health monitoring. - Added draining-aware backend selection - Added runtime request and failure metrics - Added Prometheus active connection gauges +- Added runtime config reload support +- Added graceful backend removal semantics +- Added weighted round robin balancing +- Added backend weight configuration support +- Added runtime reload integration tests +- Added draining lifecycle integration tests ### Changed +- Refactored weighted round robin scheduling using precomputed weighted backend pools - Refactored proxy connection lifecycle logging - Improved structured tracing across retries and request flow - Improved backend runtime observability - Improved connection accounting using Prometheus gauges - Improved backend lifecycle management semantics +- Improved runtime reload reconciliation behavior +- Improved weighted scheduling runtime efficiency - Cleaned up proxy retry orchestration flow ### Notes -This phase focused heavily on operational observability and runtime lifecycle management. +This phase focused heavily on operational observability, runtime lifecycle safety, and runtime traffic management behavior. Laminar now supports: - Prometheus-compatible metrics - structured request tracing - graceful shutdown handling -- backend draining -- runtime traffic visibility +- graceful backend draining +- runtime config reloads +- weighted traffic distribution +- runtime-safe backend lifecycle transitions - backend-aware operational telemetry -The runtime now behaves more like an operational load balancing system with live observability and traffic management capabilities. +The runtime now behaves much more like a production-oriented traffic management system with live observability, graceful lifecycle handling, and runtime traffic control semantics. + +--- + +## 2026-06-06 + +### Added + +- Added Prometheus request duration histograms +- Added backend connection latency histograms +- Added throughput metrics for inbound and outbound traffic +- Added runtime status API endpoint +- Added backend enable/disable admin APIs +- Added backend health API endpoint +- Added dynamic backend weight update API +- Added automatic config watcher reload support + +### Changed + +- Standardized Prometheus metric labels using backend IDs +- Improved runtime observability with latency-aware metrics +- Improved weighted round robin runtime efficiency +- Improved operational runtime control APIs +- Improved backend runtime management semantics + +### Notes + +This phase focused on completing Laminar’s runtime control and observability layer. + +Laminar now supports: + +- runtime status introspection APIs +- backend operational control APIs +- dynamic backend weight mutation +- automatic config reload watching +- request latency observability +- backend connection latency tracking +- throughput visibility + +Phase 3 now provides a much more complete operational runtime environment with live observability, runtime mutation support, and production-style backend lifecycle controls. diff --git a/ROADMAP.md b/ROADMAP.md index 85768e9..20e2fac 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -142,7 +142,7 @@ - [x] Weighted Round Robin - [x] Backend weights in config -- [ ] Dynamic weight updates +- [x] Dynamic weight updates --- @@ -197,7 +197,7 @@ ## Admin API - [x] Add runtime status endpoint -- [ ] Add backend health endpoint +- [x] Add backend health endpoint - [x] Add backend enable/disable API - [x] Add metrics endpoint diff --git a/src/admin/http.rs b/src/admin/http.rs index 9f69f58..bcbf5b3 100644 --- a/src/admin/http.rs +++ b/src/admin/http.rs @@ -6,7 +6,7 @@ use axum::{ extract::{Path, State}, routing::{get, post}, }; -use std::sync::atomic::Ordering; +use std::sync::{Arc, atomic::Ordering}; use serde::Serialize; use tokio::net::TcpListener; @@ -58,6 +58,13 @@ struct StatusResponse { upstreams: Vec, } +#[derive(Serialize)] +struct BackendHealthResponse { + id: String, + healthy: bool, + draining: bool, +} + async fn prometheus_handler() -> String { gather_metrics() } @@ -189,6 +196,26 @@ async fn disable_backend_handler( format!("backend '{id}' not found") } +async fn backend_health_handler( + Path(id): Path, + State(state): State, +) -> Json> { + let state = state.read().await; + + for upstream in &state.upstreams { + for backend in &upstream.backends { + if backend.config.id == id { + return Json(Some(BackendHealthResponse { + id: backend.config.id.clone(), + healthy: backend.healthy.load(Ordering::Relaxed), + draining: backend.draining.load(Ordering::Relaxed), + })); + } + } + } + Json(None) +} + async fn enable_backend_handler( Path(id): Path, State(state): State, @@ -213,6 +240,32 @@ async fn enable_backend_handler( format!("backend '{id}' not found") } +async fn update_backend_weight_handler( + Path((id, weight)): Path<(String, usize)>, + State(state): State, +) -> String { + let mut state = state.write().await; + for upstream in &mut state.upstreams { + for backend in &mut upstream.backends { + if backend.config.id == id { + if let Some(inner) = Arc::get_mut(backend) { + inner.set_weight(weight); + upstream.rebuild_weighted_backends(); + tracing::info!( + backend_id = %id, + weight = weight, + "backend weight updated" + ); + return format!("backend '{id}' weight updated to {weight}"); + } + return format!("backend '{id}' currently in use"); + } + } + } + + format!("backend '{id}' not found") +} + pub async fn start_admin_server(address: &str, state: SharedAppState) -> anyhow::Result<()> { let app = Router::new() .route("/metrics", get(metrics_handler)) @@ -222,6 +275,8 @@ pub async fn start_admin_server(address: &str, state: SharedAppState) -> anyhow: .route("/status", get(status_handler)) .route("/backend/{id}/disable", post(disable_backend_handler)) .route("/backend/{id}/enable", post(enable_backend_handler)) + .route("/backend/{id}/health", get(backend_health_handler)) + .route("/backend/{id}/weight/{weight}", post(update_backend_weight_handler)) .with_state(state); let listener = TcpListener::bind(address).await?; axum::serve(listener, app).await?; diff --git a/src/admin/watcher.rs b/src/admin/watcher.rs index 52480bc..2b06563 100644 --- a/src/admin/watcher.rs +++ b/src/admin/watcher.rs @@ -11,6 +11,11 @@ pub async fn start_config_watcher(state: SharedAppState, path: String) -> notify watcher.watch(path.as_ref(), RecursiveMode::NonRecursive)?; + tracing::info!( + path = %path, + "config watcher started" + ); + while let Some(event) = rx.recv().await { match event { Ok(_) => { diff --git a/src/main.rs b/src/main.rs index 9c062de..e6dcf22 100644 --- a/src/main.rs +++ b/src/main.rs @@ -41,12 +41,12 @@ async fn main() -> Result<()> { let shared_state: SharedAppState = Arc::new(RwLock::new(state)); - let health_state = shared_state.clone(); - let admin_state = shared_state.clone(); - let watcher_state = shared_state.clone(); let watcher_path = path.clone(); + let health_state = shared_state.clone(); + let admin_state = shared_state.clone(); + tokio::spawn(async move { if let Err(error) = admin::http::start_admin_server("127.0.0.1:9090", admin_state).await { tracing::error!("admin server failed: {:?}", error); diff --git a/src/state/backend.rs b/src/state/backend.rs index b37103f..b97b22c 100644 --- a/src/state/backend.rs +++ b/src/state/backend.rs @@ -115,4 +115,8 @@ impl BackendState { pub fn enable(&self) { self.healthy.store(true, Ordering::Relaxed); } + + pub fn set_weight(&mut self, weight: usize) { + self.config.weight = weight; + } }