diff --git a/CHANGELOG.md b/CHANGELOG.md index 181438c..6e1dcdc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -236,3 +236,44 @@ The runtime architecture documentation was also expanded to better explain how: - `ConnectionGuard` interact during live traffic routing and health monitoring. + +--- + +## 2026-06-05 + +### Added + +- Added Prometheus metrics exporter +- Added `/prometheus` runtime metrics endpoint +- Added backend-specific Prometheus metrics +- Added structured JSON runtime logging +- Added request correlation IDs using UUIDs +- Added graceful TCP proxy shutdown handling +- Added backend draining support +- Added draining-aware backend selection +- Added runtime request and failure metrics +- Added Prometheus active connection gauges + +### Changed + +- Refactored proxy connection lifecycle logging +- Improved structured tracing across retries and request flow +- Improved backend runtime observability +- Improved connection accounting using Prometheus gauges +- Improved backend lifecycle management semantics +- Cleaned up proxy retry orchestration flow + +### Notes + +This phase focused heavily on operational observability and runtime lifecycle management. + +Laminar now supports: + +- Prometheus-compatible metrics +- structured request tracing +- graceful shutdown handling +- backend draining +- runtime traffic visibility +- backend-aware operational telemetry + +The runtime now behaves more like an operational load balancing system with live observability and traffic management capabilities. diff --git a/Cargo.lock b/Cargo.lock index 03a5fb6..8e6d5b6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -106,6 +106,12 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + [[package]] name = "foldhash" version = "0.1.5" @@ -366,10 +372,11 @@ dependencies = [ "anyhow", "axum", "futures", + "prometheus", "serde", "serde_json", "serde_yaml", - "thiserror", + "thiserror 2.0.18", "tokio", "tracing", "tracing-subscriber", @@ -507,6 +514,41 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prometheus" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ca5326d8d0b950a9acd87e6a3f94745394f62e4dae1b1ee22b2bc0c394af43a" +dependencies = [ + "cfg-if", + "fnv", + "lazy_static", + "memchr", + "parking_lot", + "protobuf", + "thiserror 2.0.18", +] + +[[package]] +name = "protobuf" +version = "3.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d65a1d4ddae7d8b5de68153b48f6aa3bba8cb002b243dbdbc55a5afbc98f99f4" +dependencies = [ + "once_cell", + "protobuf-support", + "thiserror 1.0.69", +] + +[[package]] +name = "protobuf-support" +version = "3.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e36c2f31e0a47f9280fb347ef5e461ffcd2c52dd520d8e216b52f93b0b0d7d6" +dependencies = [ + "thiserror 1.0.69", +] + [[package]] name = "quote" version = "1.0.45" @@ -692,13 +734,33 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263" +[[package]] +name = "thiserror" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" +dependencies = [ + "thiserror-impl 1.0.69", +] + [[package]] name = "thiserror" version = "2.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4288b5bcbc7920c07a1149a35cf9590a2aa808e0bc1eafaade0b80947865fbc4" dependencies = [ - "thiserror-impl", + "thiserror-impl 2.0.18", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" +dependencies = [ + "proc-macro2", + "quote", + "syn", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index c0a9206..c6b794f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ license = "MIT" anyhow = "1.0.102" axum = "0.8.9" futures = "0.3.32" +prometheus = "0.14.0" serde = {version ="1.0.228",features = ["derive"]} serde_json = "1.0.150" serde_yaml = {version = "0.9.34"} diff --git a/ROADMAP.md b/ROADMAP.md index 8a4442e..bba7ab2 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -149,7 +149,7 @@ ## Metrics - [x] Active connection metrics -- [ ] Request counters +- [x] Request counters - [x] Failure counters - [x] Backend health metrics - [ ] Throughput metrics @@ -159,9 +159,9 @@ ## Prometheus Integration -- [ ] Prometheus metrics exporter -- [ ] Metrics endpoint -- [ ] Backend-specific metrics +- [x] Prometheus metrics exporter +- [x] Metrics endpoint +- [x] Backend-specific metrics --- @@ -177,9 +177,9 @@ ## Graceful Backend Draining -- [ ] Add draining backend state -- [ ] Stop routing new connections -- [ ] Wait for active connections +- [x] Add draining backend state +- [x] Stop routing new connections +- [x] Wait for active connections - [ ] Graceful backend removal --- @@ -205,7 +205,7 @@ ## Phase 3 Deliverable - [ ] Runtime configurability -- [ ] Operational observability +- [x] Operational observability - [ ] Graceful backend management --- diff --git a/src/admin/http.rs b/src/admin/http.rs index f082df7..86aff08 100644 --- a/src/admin/http.rs +++ b/src/admin/http.rs @@ -1,8 +1,11 @@ +use crate::{metrics::registry::gather_metrics, state::app::SharedAppState}; +use axum::{ + Json, Router, + extract::{Path, State}, + routing::{get, post}, +}; use std::sync::atomic::Ordering; -use axum::{Json, Router, extract::State, routing::get}; - -use laminar::state::app::SharedAppState; use serde::Serialize; use tokio::net::TcpListener; @@ -13,6 +16,7 @@ struct BackendMetrics { active_connections: usize, total_requests: usize, failed_requests: usize, + draining: bool, } #[derive(Serialize)] @@ -27,6 +31,10 @@ struct MetricsResponse { upstreams: Vec, } +async fn prometheus_handler() -> String { + gather_metrics() +} + async fn metrics_handler(State(state): State) -> Json { let state = state.read().await; @@ -43,6 +51,7 @@ async fn metrics_handler(State(state): State) -> Json) -> Json, + State(state): State, +) -> String { + let state = state.read().await; + + for upstream in &state.upstreams { + for backend in &upstream.backends { + if backend.config.id == id { + backend.mark_draining(); + + tracing::info!( + backend_id = %id, + "backend marked as draining" + ); + + return format!("backend '{id}' marked draining"); + } + } + } + + format!("backend '{id}' not found") +} + pub async fn start_admin_server(address: &str, state: SharedAppState) -> anyhow::Result<()> { - let app = Router::new().route("/metrics", get(metrics_handler)).with_state(state); + let app = Router::new() + .route("/metrics", get(metrics_handler)) + .route("/backend/{id}/drain", post(drain_backend_handler)) + .route("/prometheus", get(prometheus_handler)) + .with_state(state); let listener = TcpListener::bind(address).await?; axum::serve(listener, app).await?; Ok(()) diff --git a/src/algorithms/least_connections.rs b/src/algorithms/least_connections.rs index fa58f7a..7221d80 100644 --- a/src/algorithms/least_connections.rs +++ b/src/algorithms/least_connections.rs @@ -8,7 +8,7 @@ pub fn select_backend(backends: &[Arc]) -> Option Result<()> { tracing_subscriber::fmt().json().with_current_span(true).with_span_list(true).init(); @@ -36,6 +37,8 @@ async fn main() -> Result<()> { bail!("no upstreams configured"); } + metrics::registry::initialize_metrics(); + let shared_state: SharedAppState = Arc::new(RwLock::new(state)); let health_state = shared_state.clone(); diff --git a/src/metrics/mod.rs b/src/metrics/mod.rs new file mode 100644 index 0000000..d108990 --- /dev/null +++ b/src/metrics/mod.rs @@ -0,0 +1 @@ +pub mod registry; diff --git a/src/metrics/registry.rs b/src/metrics/registry.rs new file mode 100644 index 0000000..9e711ae --- /dev/null +++ b/src/metrics/registry.rs @@ -0,0 +1,45 @@ +use prometheus::{Encoder, IntCounterVec, IntGaugeVec, Registry, TextEncoder}; +use std::sync::OnceLock; +pub static REGISTRY: OnceLock = OnceLock::new(); +pub static TOTAL_REQUESTS: OnceLock = OnceLock::new(); +pub static FAILED_REQUESTS: OnceLock = OnceLock::new(); +pub static ACTIVE_CONNECTIONS: OnceLock = OnceLock::new(); + +pub fn initialize_metrics() { + let registry = Registry::new(); + + let total_requests = IntCounterVec::new( + prometheus::Opts::new("laminar_total_requests", "Total successful requests"), + &["backend"], + ) + .unwrap(); + + let failed_requests = IntCounterVec::new( + prometheus::Opts::new("laminar_failed_requests", "Total failed requests"), + &["backend"], + ) + .unwrap(); + + let active_connections = IntGaugeVec::new( + prometheus::Opts::new("laminar_active_connections", "Current active connections"), + &["backend"], + ) + .unwrap(); + + registry.register(Box::new(total_requests.clone())).unwrap(); + registry.register(Box::new(failed_requests.clone())).unwrap(); + registry.register(Box::new(active_connections.clone())).unwrap(); + + REGISTRY.set(registry).unwrap(); + TOTAL_REQUESTS.set(total_requests).unwrap(); + FAILED_REQUESTS.set(failed_requests).unwrap(); + ACTIVE_CONNECTIONS.set(active_connections).unwrap(); +} + +pub fn gather_metrics() -> String { + let encoder = TextEncoder::new(); + let metric_families = REGISTRY.get().unwrap().gather(); + let mut buffer = Vec::new(); + encoder.encode(&metric_families, &mut buffer).unwrap(); + String::from_utf8(buffer).unwrap() +} diff --git a/src/proxy/tcp.rs b/src/proxy/tcp.rs index eca16f4..785cb24 100644 --- a/src/proxy/tcp.rs +++ b/src/proxy/tcp.rs @@ -1,4 +1,5 @@ use crate::common::shutdown::shutdown_signal; +use crate::metrics::registry::{FAILED_REQUESTS, TOTAL_REQUESTS}; use crate::state::app::SharedAppState; use crate::state::backend::ConnectionGuard; use std::{collections::HashSet, time::Duration}; @@ -27,7 +28,10 @@ pub async fn start_tcp_proxy(address: &str, state: SharedAppState) -> anyhow::Re tokio::spawn(async move { if let Err(error) = handle_connection(client_stream,state).await { - error!("connection handling failed {:?}",error); + error!( + error = %error, + "connection handling failed" + ); } }); } @@ -61,32 +65,33 @@ pub async fn handle_connection(mut stream: TcpStream, state: SharedAppState) -> // if connection fails: // mark that backend unhealthy so future selections skip it for _ in 0..retry_attempt { - let guard = { + let backend_arc = { let state = state.read().await; let upstream = &state.upstreams[0]; - let backend_arc = match upstream.next_backend() { + match upstream.next_backend() { Some(backend) => backend, None => { error!( request_id = %request_id, "no healthy backend available" ); + return Ok(()); } - }; - ConnectionGuard::new(backend_arc) - // format!("{}:{}", backend.config.host, backend.config.port) + } }; - let backend_address = guard.address(); - if attempted_backends.contains(guard.backend_id()) { + if attempted_backends.contains(&backend_arc.config.id) { continue; } + let guard = ConnectionGuard::new(backend_arc); + let backend_address = guard.address(); + info!( request_id = %request_id, backend_id = %guard.backend_id(), backend = %backend_address, - "forwarding traffic" + "proxy connection started" ); match proxy_connection(&mut stream, &backend_address, connect_timeout, idle_timeout).await { @@ -96,10 +101,16 @@ pub async fn handle_connection(mut stream: TcpStream, state: SharedAppState) -> backend_id = %guard.backend_id(), "request completed" ); + if let Some(metrics) = TOTAL_REQUESTS.get() { + metrics.with_label_values(&[guard.backend_id()]).inc(); + } guard.backend().increment_total_requests(); return Ok(()); } Err(error) => { + if let Some(metrics) = FAILED_REQUESTS.get() { + metrics.with_label_values(&[guard.backend_id()]).inc(); + } guard.backend().increment_failed_requests(); guard.mark_backend_unhealthy(); error!( @@ -115,7 +126,10 @@ pub async fn handle_connection(mut stream: TcpStream, state: SharedAppState) -> } } } - error!("all backend retry attempts failed"); + error!( + request_id = %request_id, + "all backend retry attempts failed" + ); Ok(()) } diff --git a/src/state/backend.rs b/src/state/backend.rs index a06c846..e58a5e4 100644 --- a/src/state/backend.rs +++ b/src/state/backend.rs @@ -4,7 +4,7 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; // use std::sync::Arc; // use tokio::sync::RwLock; use crate::config::BackendServerConfig; - +use crate::metrics::registry::ACTIVE_CONNECTIONS; pub struct ConnectionGuard { // We hold an Arc so backend state stays alive // for the lifetime of the connection. @@ -30,12 +30,17 @@ pub struct BackendState { pub total_requests: AtomicUsize, pub failed_requests: AtomicUsize, + + pub draining: AtomicBool, } impl ConnectionGuard { pub fn new(backend: Arc) -> Self { // Increment immediately upon creation backend.active_connections.fetch_add(1, Ordering::Relaxed); + if let Some(metrics) = ACTIVE_CONNECTIONS.get() { + metrics.with_label_values(&[&backend.config.id]).inc(); + } Self { backend } } @@ -58,6 +63,9 @@ impl ConnectionGuard { impl Drop for ConnectionGuard { fn drop(&mut self) { self.backend.active_connections.fetch_sub(1, Ordering::Relaxed); + if let Some(metrics) = ACTIVE_CONNECTIONS.get() { + metrics.with_label_values(&[&self.backend.config.id]).dec(); + } } } @@ -68,6 +76,7 @@ impl BackendState { Self { config, healthy: AtomicBool::new(true), + draining: AtomicBool::new(false), active_connections: AtomicUsize::new(0), failed_health_checks: 0, total_requests: AtomicUsize::new(0), @@ -83,6 +92,14 @@ impl BackendState { self.failed_requests.fetch_add(1, Ordering::Relaxed); } + pub fn mark_draining(&self) { + self.draining.store(true, Ordering::Relaxed); + } + + pub fn is_draining(&self) -> bool { + self.draining.load(Ordering::Relaxed) + } + pub fn is_healthy(&self) -> bool { self.healthy.load(Ordering::Relaxed) } diff --git a/tests/connect_timeout.rs b/tests/connect_timeout.rs index 852c55f..93d1743 100644 --- a/tests/connect_timeout.rs +++ b/tests/connect_timeout.rs @@ -26,6 +26,7 @@ async fn marks_backend_unhealthy_on_connect_timeout() { failed_health_checks: 0, failed_requests: AtomicUsize::new(0), total_requests: AtomicUsize::new(0), + draining: AtomicBool::new(false), }); let guard = ConnectionGuard::new(backend.clone()); diff --git a/tests/connection_guard.rs b/tests/connection_guard.rs index 1d71504..915e240 100644 --- a/tests/connection_guard.rs +++ b/tests/connection_guard.rs @@ -22,6 +22,7 @@ fn connection_guard_tracks_active_connections() { failed_health_checks: 0, failed_requests: AtomicUsize::new(0), total_requests: AtomicUsize::new(0), + draining: AtomicBool::new(false), }); assert_eq!(backend.active_connections.load(Ordering::Relaxed), 0); diff --git a/tests/health_checker.rs b/tests/health_checker.rs index ff7fbef..fe29479 100644 --- a/tests/health_checker.rs +++ b/tests/health_checker.rs @@ -21,6 +21,7 @@ fn create_backend(port: u16) -> BackendState { failed_health_checks: 0, failed_requests: AtomicUsize::new(0), total_requests: AtomicUsize::new(0), + draining: AtomicBool::new(false), } } diff --git a/tests/health_selection.rs b/tests/health_selection.rs index 671a6e4..f08b830 100644 --- a/tests/health_selection.rs +++ b/tests/health_selection.rs @@ -19,6 +19,7 @@ fn create_backend(id: &str, port: u16, healthy: bool) -> BackendState { failed_health_checks: 0, failed_requests: AtomicUsize::new(0), total_requests: AtomicUsize::new(0), + draining: AtomicBool::new(false), } } diff --git a/tests/least_connections.rs b/tests/least_connections.rs index 7286e64..daf5442 100644 --- a/tests/least_connections.rs +++ b/tests/least_connections.rs @@ -26,6 +26,7 @@ fn create_backend( failed_health_checks: 0, failed_requests: AtomicUsize::new(0), total_requests: AtomicUsize::new(0), + draining: AtomicBool::new(false), }) } diff --git a/tests/request_metrics.rs b/tests/request_metrics.rs index e01b73a..b1c0708 100644 --- a/tests/request_metrics.rs +++ b/tests/request_metrics.rs @@ -18,6 +18,7 @@ fn request_metrics_increment_correctly() { total_requests: AtomicUsize::new(0), failed_requests: AtomicUsize::new(0), failed_health_checks: 0, + draining: AtomicBool::new(false), }); backend.increment_total_requests(); diff --git a/tests/retry_stabilization.rs b/tests/retry_stabilization.rs index ba5e253..1410453 100644 --- a/tests/retry_stabilization.rs +++ b/tests/retry_stabilization.rs @@ -22,6 +22,7 @@ fn unhealthy_backend_is_not_selected_again() { failed_health_checks: 0, failed_requests: AtomicUsize::new(0), total_requests: AtomicUsize::new(0), + draining: AtomicBool::new(false), }); let backend_2 = Arc::new(BackendState { @@ -37,6 +38,7 @@ fn unhealthy_backend_is_not_selected_again() { failed_health_checks: 0, failed_requests: AtomicUsize::new(0), total_requests: AtomicUsize::new(0), + draining: AtomicBool::new(false), }); let backends = vec![backend_1, backend_2.clone()]; diff --git a/tests/round_robin.rs b/tests/round_robin.rs index 9340cfb..c2c350b 100644 --- a/tests/round_robin.rs +++ b/tests/round_robin.rs @@ -19,6 +19,7 @@ fn create_backend(id: &str, port: u16) -> BackendState { failed_health_checks: 0, failed_requests: AtomicUsize::new(0), total_requests: AtomicUsize::new(0), + draining: AtomicBool::new(false), } }