Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -236,3 +236,44 @@ The runtime architecture documentation was also expanded to better explain how:
- `ConnectionGuard`

interact during live traffic routing and health monitoring.

---

## 2026-06-05

### Added

- Added Prometheus metrics exporter
- Added `/prometheus` runtime metrics endpoint
- Added backend-specific Prometheus metrics
- Added structured JSON runtime logging
- Added request correlation IDs using UUIDs
- Added graceful TCP proxy shutdown handling
- Added backend draining support
- Added draining-aware backend selection
- Added runtime request and failure metrics
- Added Prometheus active connection gauges

### Changed

- Refactored proxy connection lifecycle logging
- Improved structured tracing across retries and request flow
- Improved backend runtime observability
- Improved connection accounting using Prometheus gauges
- Improved backend lifecycle management semantics
- Cleaned up proxy retry orchestration flow

### Notes

This phase focused heavily on operational observability and runtime lifecycle management.

Laminar now supports:

- Prometheus-compatible metrics
- structured request tracing
- graceful shutdown handling
- backend draining
- runtime traffic visibility
- backend-aware operational telemetry

The runtime now behaves more like an operational load balancing system with live observability and traffic management capabilities.
66 changes: 64 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ license = "MIT"
anyhow = "1.0.102"
axum = "0.8.9"
futures = "0.3.32"
prometheus = "0.14.0"
serde = {version ="1.0.228",features = ["derive"]}
serde_json = "1.0.150"
serde_yaml = {version = "0.9.34"}
Expand Down
16 changes: 8 additions & 8 deletions ROADMAP.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@
## Metrics

- [x] Active connection metrics
- [ ] Request counters
- [x] Request counters
- [x] Failure counters
- [x] Backend health metrics
- [ ] Throughput metrics
Expand All @@ -159,9 +159,9 @@

## Prometheus Integration

- [ ] Prometheus metrics exporter
- [ ] Metrics endpoint
- [ ] Backend-specific metrics
- [x] Prometheus metrics exporter
- [x] Metrics endpoint
- [x] Backend-specific metrics

---

Expand All @@ -177,9 +177,9 @@

## Graceful Backend Draining

- [ ] Add draining backend state
- [ ] Stop routing new connections
- [ ] Wait for active connections
- [x] Add draining backend state
- [x] Stop routing new connections
- [x] Wait for active connections
- [ ] Graceful backend removal

---
Expand All @@ -205,7 +205,7 @@
## Phase 3 Deliverable

- [ ] Runtime configurability
- [ ] Operational observability
- [x] Operational observability
- [ ] Graceful backend management

---
Expand Down
45 changes: 41 additions & 4 deletions src/admin/http.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use crate::{metrics::registry::gather_metrics, state::app::SharedAppState};
use axum::{
Json, Router,
extract::{Path, State},
routing::{get, post},
};
use std::sync::atomic::Ordering;

use axum::{Json, Router, extract::State, routing::get};

use laminar::state::app::SharedAppState;
use serde::Serialize;
use tokio::net::TcpListener;

Expand All @@ -13,6 +16,7 @@ struct BackendMetrics {
active_connections: usize,
total_requests: usize,
failed_requests: usize,
draining: bool,
}

#[derive(Serialize)]
Expand All @@ -27,6 +31,10 @@ struct MetricsResponse {
upstreams: Vec<UpstreamMetrics>,
}

async fn prometheus_handler() -> String {
gather_metrics()
}

async fn metrics_handler(State(state): State<SharedAppState>) -> Json<MetricsResponse> {
let state = state.read().await;

Expand All @@ -43,6 +51,7 @@ async fn metrics_handler(State(state): State<SharedAppState>) -> Json<MetricsRes
active_connections: backend.active_connections.load(Ordering::Relaxed),
total_requests: backend.total_requests.load(Ordering::Relaxed),
failed_requests: backend.failed_requests.load(Ordering::Relaxed),
draining: backend.draining.load(Ordering::Relaxed),
})
.collect();

Expand All @@ -57,8 +66,36 @@ async fn metrics_handler(State(state): State<SharedAppState>) -> Json<MetricsRes
Json(MetricsResponse { upstreams })
}

async fn drain_backend_handler(
Path(id): Path<String>,
State(state): State<SharedAppState>,
) -> String {
let state = state.read().await;

for upstream in &state.upstreams {
for backend in &upstream.backends {
if backend.config.id == id {
backend.mark_draining();

tracing::info!(
backend_id = %id,
"backend marked as draining"
);

return format!("backend '{id}' marked draining");
}
}
}

format!("backend '{id}' not found")
}

pub async fn start_admin_server(address: &str, state: SharedAppState) -> anyhow::Result<()> {
let app = Router::new().route("/metrics", get(metrics_handler)).with_state(state);
let app = Router::new()
.route("/metrics", get(metrics_handler))
.route("/backend/{id}/drain", post(drain_backend_handler))
.route("/prometheus", get(prometheus_handler))
.with_state(state);
let listener = TcpListener::bind(address).await?;
axum::serve(listener, app).await?;
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion src/algorithms/least_connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub fn select_backend(backends: &[Arc<BackendState>]) -> Option<Arc<BackendState

for backend in backends {
//check if this backend is healthy
if !backend.healthy.load(Ordering::Relaxed) {
if !backend.healthy.load(Ordering::Relaxed) || backend.draining.load(Ordering::Relaxed) {
continue;
}

Expand Down
2 changes: 1 addition & 1 deletion src/algorithms/round_robin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub fn select_backend(
for _ in 0..backends.len() {
let index = current_index.fetch_add(1, Ordering::Relaxed);
let backend = &backends[index % backends.len()];
if backend.healthy.load(Ordering::Relaxed) {
if backend.healthy.load(Ordering::Relaxed) && !backend.draining.load(Ordering::Relaxed) {
return Some(backend.clone());
}
}
Expand Down
3 changes: 3 additions & 0 deletions src/health/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ pub async fn start_health_checker(state: SharedAppState, interval_secs: u64) {
};
for backend in backends {
let _ = check_backend_status(&backend).await;
if backend.is_draining() && backend.active_connections.load(Ordering::Relaxed) == 0 {
info!(backend_id =%backend.config.id,"backend safe to remove");
}
}

sleep(Duration::from_secs(interval_secs)).await;
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
pub mod admin;
pub mod algorithms;
pub mod common;
pub mod config;
pub mod health;
pub mod metrics;
pub mod proxy;
pub mod state;
7 changes: 5 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
#![warn(clippy::all)]
#![warn(clippy::pedantic)]
#![allow(dead_code)]
mod admin;

use anyhow::{Result, bail};
use laminar::{
admin,
config::{loader::load_config, validator::validate_config},
health::tcp::start_health_checker,
metrics,
proxy::tcp::start_tcp_proxy,
state::app::{AppState, SharedAppState},
};
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::info;
mod common;
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt().json().with_current_span(true).with_span_list(true).init();
Expand All @@ -36,6 +37,8 @@ async fn main() -> Result<()> {
bail!("no upstreams configured");
}

metrics::registry::initialize_metrics();

let shared_state: SharedAppState = Arc::new(RwLock::new(state));

let health_state = shared_state.clone();
Expand Down
1 change: 1 addition & 0 deletions src/metrics/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod registry;
45 changes: 45 additions & 0 deletions src/metrics/registry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use prometheus::{Encoder, IntCounterVec, IntGaugeVec, Registry, TextEncoder};
use std::sync::OnceLock;
pub static REGISTRY: OnceLock<Registry> = OnceLock::new();
pub static TOTAL_REQUESTS: OnceLock<IntCounterVec> = OnceLock::new();
pub static FAILED_REQUESTS: OnceLock<IntCounterVec> = OnceLock::new();
pub static ACTIVE_CONNECTIONS: OnceLock<IntGaugeVec> = OnceLock::new();

pub fn initialize_metrics() {
let registry = Registry::new();

let total_requests = IntCounterVec::new(
prometheus::Opts::new("laminar_total_requests", "Total successful requests"),
&["backend"],
)
.unwrap();

let failed_requests = IntCounterVec::new(
prometheus::Opts::new("laminar_failed_requests", "Total failed requests"),
&["backend"],
)
.unwrap();

let active_connections = IntGaugeVec::new(
prometheus::Opts::new("laminar_active_connections", "Current active connections"),
&["backend"],
)
.unwrap();

registry.register(Box::new(total_requests.clone())).unwrap();
registry.register(Box::new(failed_requests.clone())).unwrap();
registry.register(Box::new(active_connections.clone())).unwrap();

REGISTRY.set(registry).unwrap();
TOTAL_REQUESTS.set(total_requests).unwrap();
FAILED_REQUESTS.set(failed_requests).unwrap();
ACTIVE_CONNECTIONS.set(active_connections).unwrap();
}

pub fn gather_metrics() -> String {
let encoder = TextEncoder::new();
let metric_families = REGISTRY.get().unwrap().gather();
let mut buffer = Vec::new();
encoder.encode(&metric_families, &mut buffer).unwrap();
String::from_utf8(buffer).unwrap()
}
Loading
Loading