Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
562 changes: 561 additions & 1 deletion Cargo.lock

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@ license = "MIT"

[dependencies]
anyhow = "1.0.102"
axum = "0.8.9"
futures = "0.3.32"
serde = {version ="1.0.228",features = ["derive"]}
serde_json = "1.0.150"
serde_yaml = {version = "0.9.34"}
thiserror = "2.0.18"
tokio = {version = "1.52.3", features = ["full","net"]}
tracing = "0.1.44"
tracing-subscriber = "0.3.23"
tracing-subscriber = {version="0.3.23", features = ["json"]}
uuid = {version="1.23.2",features=["v4"]}



14 changes: 7 additions & 7 deletions ROADMAP.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@
- [x] Track active connections per backend
- [x] Increment on connect
- [x] Decrement on disconnect
- [ ] Track total requests

---

Expand Down Expand Up @@ -149,11 +148,12 @@

## Metrics

- [ ] Active connection metrics
- [x] Active connection metrics
- [ ] Request counters
- [ ] Failure counters
- [ ] Backend health metrics
- [x] Failure counters
- [x] Backend health metrics
- [ ] Throughput metrics
- [x] Track total requests

---

Expand All @@ -167,8 +167,8 @@

## Logging Improvements

- [ ] Structured JSON logs
- [ ] Request correlation IDs
- [x] Structured JSON logs
- [x] Request correlation IDs
- [x] Retry logging
- [x] Timeout logging
- [x] Backend transition logging
Expand Down Expand Up @@ -198,7 +198,7 @@
- [ ] Add runtime status endpoint
- [ ] Add backend health endpoint
- [ ] Add backend enable/disable API
- [ ] Add metrics endpoint
- [x] Add metrics endpoint

---

Expand Down
1 change: 1 addition & 0 deletions deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ allow = [
"MIT",
"Apache-2.0",
"Unicode-3.0",
"BSD-3-Clause"
]
# The confidence threshold for detecting a license from license text.
# The higher the value, the more closely the license text must be to the
Expand Down
65 changes: 65 additions & 0 deletions src/admin/http.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use std::sync::atomic::Ordering;

use axum::{Json, Router, extract::State, routing::get};

use laminar::state::app::SharedAppState;
use serde::Serialize;
use tokio::net::TcpListener;

/// Point-in-time snapshot of one backend, serialized as part of the `/metrics` response.
#[derive(Serialize)]
struct BackendMetrics {
/// Backend identifier, copied from the backend's configuration.
id: String,
/// Value of the backend's `healthy` flag at snapshot time.
healthy: bool,
/// Value of the backend's `active_connections` counter at snapshot time.
active_connections: usize,
/// Value of the backend's `total_requests` counter at snapshot time.
total_requests: usize,
/// Value of the backend's `failed_requests` counter at snapshot time.
failed_requests: usize,
}

/// Metrics for one upstream and all of its backends, serialized as part of the `/metrics` response.
#[derive(Serialize)]
struct UpstreamMetrics {
/// Upstream identifier.
id: String,
/// `Debug` rendering of the upstream's load-balancing algorithm.
algorithm: String,
/// One entry per backend belonging to this upstream.
backends: Vec<BackendMetrics>,
}

/// Top-level JSON body returned by the `/metrics` route.
#[derive(Serialize)]
struct MetricsResponse {
/// One entry per upstream held in the application state.
upstreams: Vec<UpstreamMetrics>,
}

/// Handles `GET /metrics` by snapshotting every upstream and its backends into JSON.
///
/// The shared application state stays read-locked while the snapshot is built, and
/// each backend flag/counter is read with a relaxed atomic load.
async fn metrics_handler(State(state): State<SharedAppState>) -> Json<MetricsResponse> {
    let app_state = state.read().await;

    let mut upstreams = Vec::new();
    for upstream in app_state.upstreams.iter() {
        let mut backends = Vec::new();
        for backend in upstream.backends.iter() {
            // Read the values in the same order the response fields are declared.
            let id = backend.config.id.clone();
            let healthy = backend.healthy.load(Ordering::Relaxed);
            let active_connections = backend.active_connections.load(Ordering::Relaxed);
            let total_requests = backend.total_requests.load(Ordering::Relaxed);
            let failed_requests = backend.failed_requests.load(Ordering::Relaxed);

            backends.push(BackendMetrics {
                id,
                healthy,
                active_connections,
                total_requests,
                failed_requests,
            });
        }

        upstreams.push(UpstreamMetrics {
            id: upstream.id.clone(),
            algorithm: format!("{:?}", upstream.algorithm),
            backends,
        });
    }

    Json(MetricsResponse { upstreams })
}

pub async fn start_admin_server(address: &str, state: SharedAppState) -> anyhow::Result<()> {
let app = Router::new().route("/metrics", get(metrics_handler)).with_state(state);
let listener = TcpListener::bind(address).await?;
axum::serve(listener, app).await?;
Ok(())
}
1 change: 1 addition & 0 deletions src/admin/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod http;
48 changes: 1 addition & 47 deletions src/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1,47 +1 @@
// // server state will have: ip and port (we can get that from the sources.yml), whether it is active (up), and the number of connections

// pub mod types;
// use std::{ fs::File, net::SocketAddr };
// use std::result::Result;
// use anyhow::{ self, Ok };
// use futures::future::join_all;
// use tokio::net::TcpStream;

// pub async fn check_servers_health() -> Result<Vec<Server>, anyhow::Error> {
// let servers = parse_yaml()?;

// // try to connect each one and filter out the dead ones
// let futures = servers.into_iter().map(|mut s| async {
// let addr: SocketAddr = s.ip.parse().expect("the address is not valid");
// // let res = TcpStream::connect(addr).await;

// // match res {
// // Ok(_) => {
// // s.can_connect = true;
// // }
// // Err(_) => {
// // s.can_connect = false;
// // }
// // }

// // shortcut
// s.can_connect = TcpStream::connect(addr).await.is_ok();

// s
// });

// // run the futures as the async blocks are lazy
// let res = join_all(futures).await;
// Ok(res)
// }

// pub async fn active_servers() -> Result<Vec<Server>, anyhow::Error> {
// let servers = check_servers_health().await?;

// Ok(
// servers
// .into_iter()
// .filter(|item| item.can_connect)
// .collect()
// )
// }
pub mod shutdown;
6 changes: 6 additions & 0 deletions src/common/shutdown.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
use tracing::info;

/// Completes once the process has been asked to shut down.
///
/// Resolves on Ctrl-C (SIGINT) on every platform. On Unix it also resolves on
/// SIGTERM, which is the signal container runtimes and service managers normally
/// send to request a stop; waiting on Ctrl-C alone would never observe it.
///
/// # Panics
///
/// Panics if a signal listener cannot be installed.
pub async fn shutdown_signal() {
    let ctrl_c = async {
        tokio::signal::ctrl_c().await.expect("failed to listen for shutdown signal");
    };

    #[cfg(unix)]
    let terminate = async {
        tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
            .expect("failed to listen for SIGTERM")
            .recv()
            .await;
    };

    // Platforms without SIGTERM only ever complete through the Ctrl-C branch.
    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    tokio::select! {
        () = ctrl_c => {}
        () = terminate => {}
    }

    info!("shutdown signal received");
}
2 changes: 1 addition & 1 deletion src/config/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub struct UpstreamConfig {
pub servers: Vec<BackendServerConfig>,
}

#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, Clone)]
#[serde(rename_all = "snake_case")]
pub enum LoadBalancingAlgorithm {
RoundRobin,
Expand Down
17 changes: 11 additions & 6 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#![warn(clippy::all)]
#![warn(clippy::pedantic)]

#![allow(dead_code)]
mod admin;
use anyhow::{Result, bail};
use laminar::{
config::{loader::load_config, validator::validate_config},
Expand All @@ -11,10 +12,10 @@ use laminar::{
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::info;

mod common;
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt::init();
tracing_subscriber::fmt().json().with_current_span(true).with_span_list(true).init();

let path = std::env::args().nth(1).unwrap_or_else(|| "laminar_config.yaml".to_string());

Expand All @@ -36,11 +37,15 @@ async fn main() -> Result<()> {
}

let shared_state: SharedAppState = Arc::new(RwLock::new(state));
// for upstream in &state.upstreams {
// info!("upstream '{}' initialized with {} backends", upstream.id, upstream.backends.len());
// }

let health_state = shared_state.clone();
let admin_state = shared_state.clone();

tokio::spawn(async move {
if let Err(error) = admin::http::start_admin_server("127.0.0.1:9090", admin_state).await {
tracing::error!("admin server failed: {:?}", error);
}
});
tokio::spawn(async move {
start_health_checker(health_state, health_interval).await;
});
Expand Down
60 changes: 50 additions & 10 deletions src/proxy/tcp.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::common::shutdown::shutdown_signal;
use crate::state::app::SharedAppState;
use crate::state::backend::ConnectionGuard;
use std::{collections::HashSet, time::Duration};
Expand All @@ -7,26 +8,43 @@ use tokio::{
time::timeout,
};
use tracing::{error, info};
use uuid::Uuid;

pub async fn start_tcp_proxy(address: &str, state: SharedAppState) -> anyhow::Result<()> {
let listener = TcpListener::bind(address).await?;

info!("tcp proxy listening on {}", address);

loop {
let (client_stream, client_address) = listener.accept().await?;
tokio::select! {
result = listener.accept() => {
let (client_stream, client_address) = result?;
info!(
client = %client_address,
"new client connected"
);
let state = state.clone();
tokio::spawn(async move {
if let Err(error) = handle_connection(client_stream,state).await
{
error!("connection handling failed {:?}",error);
}
});
}

info!("new client connected {}", client_address);
let state = state.clone();
tokio::spawn(async move {
if let Err(error) = handle_connection(client_stream, state).await {
error!("connection handling failed {:?}", error)
_ = shutdown_signal() => {
info!("tcp proxy shutting down");
break;
}
});
}
}
info!("tcp proxy stopped accepting new connections");

Ok(())
}

pub async fn handle_connection(mut stream: TcpStream, state: SharedAppState) -> anyhow::Result<()> {
let request_id = Uuid::new_v4();
let (retry_attempt, connect_timeout, idle_timeout) = {
let state = state.read().await;
(state.retry_attempts, state.connect_timeout, state.idle_timeout)
Expand All @@ -49,7 +67,10 @@ pub async fn handle_connection(mut stream: TcpStream, state: SharedAppState) ->
let backend_arc = match upstream.next_backend() {
Some(backend) => backend,
None => {
error!("no healthy backend available");
error!(
request_id = %request_id,
"no healthy backend available"
);
return Ok(());
}
};
Expand All @@ -61,15 +82,34 @@ pub async fn handle_connection(mut stream: TcpStream, state: SharedAppState) ->
continue;
}

info!("forwarding traffic to {}", backend_address);
info!(
request_id = %request_id,
backend_id = %guard.backend_id(),
backend = %backend_address,
"forwarding traffic"
);

match proxy_connection(&mut stream, &backend_address, connect_timeout, idle_timeout).await {
Ok(_) => {
info!(
request_id = %request_id,
backend_id = %guard.backend_id(),
"request completed"
);
guard.backend().increment_total_requests();
return Ok(());
}
Err(error) => {
guard.backend().increment_failed_requests();
guard.mark_backend_unhealthy();
error!(backend_id = %guard.backend_id(),backend = %backend_address,attempt = attempted_backends.len() + 1,"backend request failed: {:?}",error);
error!(
request_id = %request_id,
backend_id = %guard.backend_id(),
backend = %backend_address,
attempt = attempted_backends.len() + 1,
error = %error,
"backend request failed"
);
attempted_backends.insert(guard.backend_id().to_string());
continue;
}
Expand Down
17 changes: 17 additions & 0 deletions src/state/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ pub struct BackendState {
// This becomes important for least-connections balancing.
pub active_connections: AtomicUsize,
pub failed_health_checks: usize,

pub total_requests: AtomicUsize,
pub failed_requests: AtomicUsize,
}

impl ConnectionGuard {
Expand All @@ -43,6 +46,10 @@ impl ConnectionGuard {
/// Formats the backend's configured host and port as `host:port`.
pub fn address(&self) -> String {
    let config = &self.backend.config;
    format!("{}:{}", config.host, config.port)
}

/// Returns a shared reference to the backend state held by this guard.
pub fn backend(&self) -> &BackendState {
&self.backend
}
/// Sets the backend's `healthy` flag to `false` using a relaxed atomic store.
pub fn mark_backend_unhealthy(&self) {
self.backend.healthy.store(false, Ordering::Relaxed);
}
Expand All @@ -63,9 +70,19 @@ impl BackendState {
healthy: AtomicBool::new(true),
active_connections: AtomicUsize::new(0),
failed_health_checks: 0,
total_requests: AtomicUsize::new(0),
failed_requests: AtomicUsize::new(0),
}
}

/// Atomically adds one to `total_requests` (relaxed ordering).
pub fn increment_total_requests(&self) {
self.total_requests.fetch_add(1, Ordering::Relaxed);
}

/// Atomically adds one to `failed_requests` (relaxed ordering).
pub fn increment_failed_requests(&self) {
self.failed_requests.fetch_add(1, Ordering::Relaxed);
}

/// Returns the current value of the `healthy` flag (relaxed atomic load).
pub fn is_healthy(&self) -> bool {
self.healthy.load(Ordering::Relaxed)
}
Expand Down
2 changes: 2 additions & 0 deletions tests/connect_timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ async fn marks_backend_unhealthy_on_connect_timeout() {
healthy: AtomicBool::new(true),
active_connections: AtomicUsize::new(0),
failed_health_checks: 0,
failed_requests: AtomicUsize::new(0),
total_requests: AtomicUsize::new(0),
});

let guard = ConnectionGuard::new(backend.clone());
Expand Down
Loading
Loading