From f2821969474fb718e3fb8eef344dafab9a6f19b7 Mon Sep 17 00:00:00 2001 From: Rohit Date: Fri, 5 Jun 2026 11:24:46 +0530 Subject: [PATCH 1/4] chore: add graceful backend draining --- src/admin/http.rs | 37 +++++++++++++++++++++++++++-- src/algorithms/least_connections.rs | 2 +- src/algorithms/round_robin.rs | 2 +- src/health/tcp.rs | 3 +++ src/state/backend.rs | 11 +++++++++ tests/connect_timeout.rs | 1 + tests/connection_guard.rs | 1 + tests/health_checker.rs | 1 + tests/health_selection.rs | 1 + tests/least_connections.rs | 1 + tests/request_metrics.rs | 1 + tests/retry_stabilization.rs | 2 ++ tests/round_robin.rs | 1 + 13 files changed, 60 insertions(+), 4 deletions(-) diff --git a/src/admin/http.rs b/src/admin/http.rs index f082df7..5363685 100644 --- a/src/admin/http.rs +++ b/src/admin/http.rs @@ -1,6 +1,10 @@ use std::sync::atomic::Ordering; -use axum::{Json, Router, extract::State, routing::get}; +use axum::{ + Json, Router, + extract::{Path, State}, + routing::{get, post}, +}; use laminar::state::app::SharedAppState; use serde::Serialize; @@ -13,6 +17,7 @@ struct BackendMetrics { active_connections: usize, total_requests: usize, failed_requests: usize, + draining: bool, } #[derive(Serialize)] @@ -43,6 +48,7 @@ async fn metrics_handler(State(state): State) -> Json) -> Json, + State(state): State, +) -> String { + let state = state.read().await; + + for upstream in &state.upstreams { + for backend in &upstream.backends { + if backend.config.id == id { + backend.mark_draining(); + + tracing::info!( + backend_id = %id, + "backend marked as draining" + ); + + return format!("backend '{id}' marked draining"); + } + } + } + + format!("backend '{id}' not found") +} + pub async fn start_admin_server(address: &str, state: SharedAppState) -> anyhow::Result<()> { - let app = Router::new().route("/metrics", get(metrics_handler)).with_state(state); + let app = Router::new() + .route("/metrics", get(metrics_handler)) + .route("/backend/{id}/drain", post(drain_backend_handler)) + .with_state(state); let listener = TcpListener::bind(address).await?; axum::serve(listener, app).await?; Ok(()) diff --git a/src/algorithms/least_connections.rs b/src/algorithms/least_connections.rs index fa58f7a..1ecb722 100644 --- a/src/algorithms/least_connections.rs +++ b/src/algorithms/least_connections.rs @@ -8,7 +8,7 @@ pub fn select_backend(backends: &[Arc]) -> Option bool { + self.draining.load(Ordering::Relaxed) + } + pub fn is_healthy(&self) -> bool { self.healthy.load(Ordering::Relaxed) } diff --git a/tests/connect_timeout.rs b/tests/connect_timeout.rs index 852c55f..93d1743 100644 --- a/tests/connect_timeout.rs +++ b/tests/connect_timeout.rs @@ -26,6 +26,7 @@ async fn marks_backend_unhealthy_on_connect_timeout() { failed_health_checks: 0, failed_requests: AtomicUsize::new(0), total_requests: AtomicUsize::new(0), + draining: AtomicBool::new(false), }); let guard = ConnectionGuard::new(backend.clone()); diff --git a/tests/connection_guard.rs b/tests/connection_guard.rs index 1d71504..915e240 100644 --- a/tests/connection_guard.rs +++ b/tests/connection_guard.rs @@ -22,6 +22,7 @@ fn connection_guard_tracks_active_connections() { failed_health_checks: 0, failed_requests: AtomicUsize::new(0), total_requests: AtomicUsize::new(0), + draining: AtomicBool::new(false), }); assert_eq!(backend.active_connections.load(Ordering::Relaxed), 0); diff --git a/tests/health_checker.rs b/tests/health_checker.rs index ff7fbef..fe29479 100644 --- a/tests/health_checker.rs +++ b/tests/health_checker.rs @@ -21,6 +21,7 @@ fn create_backend(port: u16) -> BackendState { failed_health_checks: 0, failed_requests: AtomicUsize::new(0), total_requests: AtomicUsize::new(0), + draining: AtomicBool::new(false), } } diff --git a/tests/health_selection.rs b/tests/health_selection.rs index 671a6e4..f08b830 100644 --- a/tests/health_selection.rs +++ b/tests/health_selection.rs @@ -19,6 +19,7 @@ fn create_backend(id: &str, port: u16, healthy: bool) -> BackendState { failed_health_checks: 0, failed_requests: AtomicUsize::new(0), total_requests: AtomicUsize::new(0), + draining: AtomicBool::new(false), } } diff --git a/tests/least_connections.rs b/tests/least_connections.rs index 7286e64..daf5442 100644 --- a/tests/least_connections.rs +++ b/tests/least_connections.rs @@ -26,6 +26,7 @@ fn create_backend( failed_health_checks: 0, failed_requests: AtomicUsize::new(0), total_requests: AtomicUsize::new(0), + draining: AtomicBool::new(false), }) } diff --git a/tests/request_metrics.rs b/tests/request_metrics.rs index e01b73a..b1c0708 100644 --- a/tests/request_metrics.rs +++ b/tests/request_metrics.rs @@ -18,6 +18,7 @@ fn request_metrics_increment_correctly() { total_requests: AtomicUsize::new(0), failed_requests: AtomicUsize::new(0), failed_health_checks: 0, + draining: AtomicBool::new(false), }); backend.increment_total_requests(); diff --git a/tests/retry_stabilization.rs b/tests/retry_stabilization.rs index ba5e253..1410453 100644 --- a/tests/retry_stabilization.rs +++ b/tests/retry_stabilization.rs @@ -22,6 +22,7 @@ fn unhealthy_backend_is_not_selected_again() { failed_health_checks: 0, failed_requests: AtomicUsize::new(0), total_requests: AtomicUsize::new(0), + draining: AtomicBool::new(false), }); let backend_2 = Arc::new(BackendState { @@ -37,6 +38,7 @@ fn unhealthy_backend_is_not_selected_again() { failed_health_checks: 0, failed_requests: AtomicUsize::new(0), total_requests: AtomicUsize::new(0), + draining: AtomicBool::new(false), }); let backends = vec![backend_1, backend_2.clone()]; diff --git a/tests/round_robin.rs b/tests/round_robin.rs index 9340cfb..c2c350b 100644 --- a/tests/round_robin.rs +++ b/tests/round_robin.rs @@ -19,6 +19,7 @@ fn create_backend(id: &str, port: u16) -> BackendState { failed_health_checks: 0, failed_requests: AtomicUsize::new(0), total_requests: AtomicUsize::new(0), + draining: AtomicBool::new(false), } } From eabe509bb564c458e705673222eb5d8c0e506b41 Mon Sep 17 00:00:00 2001 From: Rohit Date: Fri, 5 Jun 2026 11:55:55 +0530 Subject: [PATCH 2/4] chore: add prometheus metrics --- Cargo.lock | 66 +++++++++++++++++++++++++++++++++++++++-- Cargo.toml | 1 + src/admin/http.rs | 10 +++++-- src/lib.rs | 2 ++ src/main.rs | 7 +++-- src/metrics/mod.rs | 1 + src/metrics/registry.rs | 45 ++++++++++++++++++++++++++++ src/proxy/tcp.rs | 30 ++++++++++++------- src/state/backend.rs | 4 ++- 9 files changed, 148 insertions(+), 18 deletions(-) create mode 100644 src/metrics/mod.rs create mode 100644 src/metrics/registry.rs diff --git a/Cargo.lock b/Cargo.lock index 03a5fb6..8e6d5b6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -106,6 +106,12 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + [[package]] name = "foldhash" version = "0.1.5" @@ -366,10 +372,11 @@ dependencies = [ "anyhow", "axum", "futures", + "prometheus", "serde", "serde_json", "serde_yaml", - "thiserror", + "thiserror 2.0.18", "tokio", "tracing", "tracing-subscriber", @@ -507,6 +514,41 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prometheus" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ca5326d8d0b950a9acd87e6a3f94745394f62e4dae1b1ee22b2bc0c394af43a" +dependencies = [ + "cfg-if", + "fnv", + "lazy_static", + "memchr", + "parking_lot", + "protobuf", + "thiserror 2.0.18", +] + +[[package]] +name = "protobuf" +version = "3.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d65a1d4ddae7d8b5de68153b48f6aa3bba8cb002b243dbdbc55a5afbc98f99f4" +dependencies = [ + "once_cell", + "protobuf-support", + "thiserror 1.0.69", +] + +[[package]] +name = "protobuf-support" +version = "3.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e36c2f31e0a47f9280fb347ef5e461ffcd2c52dd520d8e216b52f93b0b0d7d6" +dependencies = [ + "thiserror 1.0.69", +] + [[package]] name = "quote" version = "1.0.45" @@ -692,13 +734,33 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263" +[[package]] +name = "thiserror" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" +dependencies = [ + "thiserror-impl 1.0.69", +] + [[package]] name = "thiserror" version = "2.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4288b5bcbc7920c07a1149a35cf9590a2aa808e0bc1eafaade0b80947865fbc4" dependencies = [ - "thiserror-impl", + "thiserror-impl 2.0.18", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" +dependencies = [ + "proc-macro2", + "quote", + "syn", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index c0a9206..c6b794f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ license = "MIT" anyhow = "1.0.102" axum = "0.8.9" futures = "0.3.32" +prometheus = "0.14.0" serde = {version ="1.0.228",features = ["derive"]} serde_json = "1.0.150" serde_yaml = {version = "0.9.34"} diff --git a/src/admin/http.rs b/src/admin/http.rs index 5363685..86aff08 100644 --- a/src/admin/http.rs +++ b/src/admin/http.rs @@ -1,12 +1,11 @@ -use std::sync::atomic::Ordering; - +use crate::{metrics::registry::gather_metrics, state::app::SharedAppState}; use axum::{ Json, Router, extract::{Path, State}, routing::{get, post}, }; +use std::sync::atomic::Ordering; -use laminar::state::app::SharedAppState; use serde::Serialize; use tokio::net::TcpListener; @@ -32,6 +31,10 @@ struct MetricsResponse { upstreams: Vec, } +async fn prometheus_handler() -> String { + gather_metrics() +} + async fn metrics_handler(State(state): State) -> Json { let state = state.read().await; @@ -91,6 +94,7 @@ pub async fn start_admin_server(address: &str, state: SharedAppState) -> anyhow: let app = Router::new() .route("/metrics", get(metrics_handler)) .route("/backend/{id}/drain", post(drain_backend_handler)) + .route("/prometheus", get(prometheus_handler)) .with_state(state); let listener = TcpListener::bind(address).await?; axum::serve(listener, app).await?; diff --git a/src/lib.rs b/src/lib.rs index 89058d0..3c06d38 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,8 @@ +pub mod admin; pub mod algorithms; pub mod common; pub mod config; pub mod health; +pub mod metrics; pub mod proxy; pub mod state; diff --git a/src/main.rs b/src/main.rs index de47d9c..59828bc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,18 +1,19 @@ #![warn(clippy::all)] #![warn(clippy::pedantic)] #![allow(dead_code)] -mod admin; + use anyhow::{Result, bail}; use laminar::{ + admin, config::{loader::load_config, validator::validate_config}, health::tcp::start_health_checker, + metrics, proxy::tcp::start_tcp_proxy, state::app::{AppState, SharedAppState}, }; use std::sync::Arc; use tokio::sync::RwLock; use tracing::info; -mod common; #[tokio::main] async fn main() -> Result<()> { tracing_subscriber::fmt().json().with_current_span(true).with_span_list(true).init(); @@ -36,6 +37,8 @@ async fn main() -> Result<()> { bail!("no upstreams configured"); } + metrics::registry::initialize_metrics(); + let shared_state: SharedAppState = Arc::new(RwLock::new(state)); let health_state = shared_state.clone(); diff --git a/src/metrics/mod.rs b/src/metrics/mod.rs new file mode 100644 index 0000000..d108990 --- /dev/null +++ b/src/metrics/mod.rs @@ -0,0 +1 @@ +pub mod registry; diff --git a/src/metrics/registry.rs b/src/metrics/registry.rs new file mode 100644 index 0000000..9e711ae --- /dev/null +++ b/src/metrics/registry.rs @@ -0,0 +1,45 @@ +use prometheus::{Encoder, IntCounterVec, IntGaugeVec, Registry, TextEncoder}; +use std::sync::OnceLock; +pub static REGISTRY: OnceLock = OnceLock::new(); +pub static TOTAL_REQUESTS: OnceLock = OnceLock::new(); +pub static FAILED_REQUESTS: OnceLock = OnceLock::new(); +pub static ACTIVE_CONNECTIONS: OnceLock = OnceLock::new(); + +pub fn initialize_metrics() { + let registry = Registry::new(); + + let total_requests = IntCounterVec::new( + prometheus::Opts::new("laminar_total_requests", "Total successful requests"), + &["backend"], + ) + .unwrap(); + + let failed_requests = IntCounterVec::new( + prometheus::Opts::new("laminar_failed_requests", "Total failed requests"), + &["backend"], + ) + .unwrap(); + + let active_connections = IntGaugeVec::new( + prometheus::Opts::new("laminar_active_connections", "Current active connections"), + &["backend"], + ) + .unwrap(); + + registry.register(Box::new(total_requests.clone())).unwrap(); + registry.register(Box::new(failed_requests.clone())).unwrap(); + registry.register(Box::new(active_connections.clone())).unwrap(); + + REGISTRY.set(registry).unwrap(); + TOTAL_REQUESTS.set(total_requests).unwrap(); + FAILED_REQUESTS.set(failed_requests).unwrap(); + ACTIVE_CONNECTIONS.set(active_connections).unwrap(); +} + +pub fn gather_metrics() -> String { + let encoder = TextEncoder::new(); + let metric_families = REGISTRY.get().unwrap().gather(); + let mut buffer = Vec::new(); + encoder.encode(&metric_families, &mut buffer).unwrap(); + String::from_utf8(buffer).unwrap() +} diff --git a/src/proxy/tcp.rs b/src/proxy/tcp.rs index eca16f4..a26dca9 100644 --- a/src/proxy/tcp.rs +++ b/src/proxy/tcp.rs @@ -1,4 +1,5 @@ use crate::common::shutdown::shutdown_signal; +use crate::metrics::registry::{FAILED_REQUESTS, TOTAL_REQUESTS}; use crate::state::app::SharedAppState; use crate::state::backend::ConnectionGuard; use std::{collections::HashSet, time::Duration}; @@ -27,7 +28,10 @@ pub async fn start_tcp_proxy(address: &str, state: SharedAppState) -> anyhow::Re tokio::spawn(async move { if let Err(error) = handle_connection(client_stream,state).await { - error!("connection handling failed {:?}",error); + error!( + error = %error, + "connection handling failed" + ); } }); } @@ -61,32 +65,33 @@ pub async fn handle_connection(mut stream: TcpStream, state: SharedAppState) -> // if connection fails: // mark that backend unhealthy so future selections skip it for _ in 0..retry_attempt { - let guard = { + let backend_arc = { let state = state.read().await; let upstream = &state.upstreams[0]; - let backend_arc = match upstream.next_backend() { + match upstream.next_backend() { Some(backend) => backend, None => { error!( request_id = %request_id, "no healthy backend available" ); + return Ok(()); } - }; - ConnectionGuard::new(backend_arc) - // format!("{}:{}", backend.config.host, backend.config.port) + } }; - let backend_address = guard.address(); - if attempted_backends.contains(guard.backend_id()) { + if attempted_backends.contains(&backend_arc.config.id) { continue; } + let guard = ConnectionGuard::new(backend_arc); + let backend_address = guard.address(); + info!( request_id = %request_id, backend_id = %guard.backend_id(), backend = %backend_address, - "forwarding traffic" + "proxy connection started" ); match proxy_connection(&mut stream, &backend_address, connect_timeout, idle_timeout).await { @@ -96,10 +101,12 @@ pub async fn handle_connection(mut stream: TcpStream, state: SharedAppState) -> backend_id = %guard.backend_id(), "request completed" ); + TOTAL_REQUESTS.get().unwrap().with_label_values(&[guard.backend_id()]).inc(); guard.backend().increment_total_requests(); return Ok(()); } Err(error) => { + FAILED_REQUESTS.get().unwrap().with_label_values(&[guard.backend_id()]).inc(); guard.backend().increment_failed_requests(); guard.mark_backend_unhealthy(); error!( @@ -115,7 +122,10 @@ pub async fn handle_connection(mut stream: TcpStream, state: SharedAppState) -> } } } - error!("all backend retry attempts failed"); + error!( + request_id = %request_id, + "all backend retry attempts failed" + ); Ok(()) } diff --git a/src/state/backend.rs b/src/state/backend.rs index c378131..4f7dd89 100644 --- a/src/state/backend.rs +++ b/src/state/backend.rs @@ -4,7 +4,7 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; // use std::sync::Arc; // use tokio::sync::RwLock; use crate::config::BackendServerConfig; - +use crate::metrics::registry::ACTIVE_CONNECTIONS; pub struct ConnectionGuard { // We hold an Arc so backend state stays alive // for the lifetime of the connection. @@ -38,6 +38,7 @@ impl ConnectionGuard { pub fn new(backend: Arc) -> Self { // Increment immediately upon creation backend.active_connections.fetch_add(1, Ordering::Relaxed); + ACTIVE_CONNECTIONS.get().unwrap().with_label_values(&[&backend.config.id]).inc(); Self { backend } } @@ -60,6 +61,7 @@ impl ConnectionGuard { impl Drop for ConnectionGuard { fn drop(&mut self) { self.backend.active_connections.fetch_sub(1, Ordering::Relaxed); + ACTIVE_CONNECTIONS.get().unwrap().with_label_values(&[&self.backend.config.id]).dec(); } } From 0c1f1b9e4f597e20ff5072ce78383c987664eabf Mon Sep 17 00:00:00 2001 From: Rohit Date: Fri, 5 Jun 2026 12:02:50 +0530 Subject: [PATCH 3/4] update roadmap and changelog --- CHANGELOG.md | 41 +++++++++++++++++++++++++++++++++++++++++ ROADMAP.md | 16 ++++++++-------- 2 files changed, 49 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 181438c..6e1dcdc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -236,3 +236,44 @@ The runtime architecture documentation was also expanded to better explain how: - `ConnectionGuard` interact during live traffic routing and health monitoring. + +--- + +## 2026-06-05 + +### Added + +- Added Prometheus metrics exporter +- Added `/prometheus` runtime metrics endpoint +- Added backend-specific Prometheus metrics +- Added structured JSON runtime logging +- Added request correlation IDs using UUIDs +- Added graceful TCP proxy shutdown handling +- Added backend draining support +- Added draining-aware backend selection +- Added runtime request and failure metrics +- Added Prometheus active connection gauges + +### Changed + +- Refactored proxy connection lifecycle logging +- Improved structured tracing across retries and request flow +- Improved backend runtime observability +- Improved connection accounting using Prometheus gauges +- Improved backend lifecycle management semantics +- Cleaned up proxy retry orchestration flow + +### Notes + +This phase focused heavily on operational observability and runtime lifecycle management. + +Laminar now supports: + +- Prometheus-compatible metrics +- structured request tracing +- graceful shutdown handling +- backend draining +- runtime traffic visibility +- backend-aware operational telemetry + +The runtime now behaves more like an operational load balancing system with live observability and traffic management capabilities. diff --git a/ROADMAP.md b/ROADMAP.md index 8a4442e..bba7ab2 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -149,7 +149,7 @@ ## Metrics - [x] Active connection metrics -- [ ] Request counters +- [x] Request counters - [x] Failure counters - [x] Backend health metrics - [ ] Throughput metrics @@ -159,9 +159,9 @@ ## Prometheus Integration -- [ ] Prometheus metrics exporter -- [ ] Metrics endpoint -- [ ] Backend-specific metrics +- [x] Prometheus metrics exporter +- [x] Metrics endpoint +- [x] Backend-specific metrics --- @@ -177,9 +177,9 @@ ## Graceful Backend Draining -- [ ] Add draining backend state -- [ ] Stop routing new connections -- [ ] Wait for active connections +- [x] Add draining backend state +- [x] Stop routing new connections +- [x] Wait for active connections - [ ] Graceful backend removal --- @@ -205,7 +205,7 @@ ## Phase 3 Deliverable - [ ] Runtime configurability -- [ ] Operational observability +- [x] Operational observability - [ ] Graceful backend management --- From 765b01aa3df03b2ab737bfebadebcd08f3202225 Mon Sep 17 00:00:00 2001 From: Rohit Date: Fri, 5 Jun 2026 12:13:49 +0530 Subject: [PATCH 4/4] fix: decouple runtime tests from prometheus initialization --- src/algorithms/least_connections.rs | 2 +- src/proxy/tcp.rs | 8 ++++++-- src/state/backend.rs | 8 ++++++-- 3 files changed, 13 insertions(+), 5 deletions(-) diff --git a/src/algorithms/least_connections.rs b/src/algorithms/least_connections.rs index 1ecb722..7221d80 100644 --- a/src/algorithms/least_connections.rs +++ b/src/algorithms/least_connections.rs @@ -8,7 +8,7 @@ pub fn select_backend(backends: &[Arc]) -> Option backend_id = %guard.backend_id(), "request completed" ); - TOTAL_REQUESTS.get().unwrap().with_label_values(&[guard.backend_id()]).inc(); + if let Some(metrics) = TOTAL_REQUESTS.get() { + metrics.with_label_values(&[guard.backend_id()]).inc(); + } guard.backend().increment_total_requests(); return Ok(()); } Err(error) => { - FAILED_REQUESTS.get().unwrap().with_label_values(&[guard.backend_id()]).inc(); + if let Some(metrics) = FAILED_REQUESTS.get() { + metrics.with_label_values(&[guard.backend_id()]).inc(); + } guard.backend().increment_failed_requests(); guard.mark_backend_unhealthy(); error!( diff --git a/src/state/backend.rs b/src/state/backend.rs index 4f7dd89..e58a5e4 100644 --- a/src/state/backend.rs +++ b/src/state/backend.rs @@ -38,7 +38,9 @@ impl ConnectionGuard { pub fn new(backend: Arc) -> Self { // Increment immediately upon creation backend.active_connections.fetch_add(1, Ordering::Relaxed); - ACTIVE_CONNECTIONS.get().unwrap().with_label_values(&[&backend.config.id]).inc(); + if let Some(metrics) = ACTIVE_CONNECTIONS.get() { + metrics.with_label_values(&[&backend.config.id]).inc(); + } Self { backend } } @@ -61,7 +63,9 @@ impl ConnectionGuard { impl Drop for ConnectionGuard { fn drop(&mut self) { self.backend.active_connections.fetch_sub(1, Ordering::Relaxed); - ACTIVE_CONNECTIONS.get().unwrap().with_label_values(&[&self.backend.config.id]).dec(); + if let Some(metrics) = ACTIVE_CONNECTIONS.get() { + metrics.with_label_values(&[&self.backend.config.id]).dec(); + } } }