SignalR-inspired WebSocket hub abstraction for Axum - broadcast, unicast, groups, JSON/Binary codecs. Load-tested with 10k concurrent connections.
Working with raw WebSockets in Axum requires a lot of boilerplate - managing connections, handling heartbeats, implementing broadcast logic, dealing with serialization. axum-signal takes inspiration from SignalR and provides a clean hub abstraction on top of Axum's WebSocket support.
You define your message types and handler logic. The library handles everything else.
[dependencies]
axum-signal = "0.1.2"
Or directly from GitHub:
[dependencies]
axum-signal = { git = "https://github.com/rdcm/axum-signal" }
Define your messages and implement WsHub:
use axum_signal::{WsHub, MessageContext, JsonCodec};
use serde::{Deserialize, Serialize};
// Message received from clients.
#[derive(Deserialize)]
pub struct HelloMessage {
    pub text: String,
}
// Message sent back to clients.
#[derive(Serialize)]
pub enum HelloReply {
    Ok(String),
    Err(String),
}
pub struct HelloHub;
impl WsHub for HelloHub {
    type Codec = JsonCodec;
    type InMessage = HelloMessage;
    type OutMessage = HelloReply;
    async fn on_message(&self, msg: Self::InMessage, ctx: MessageContext<Self::OutMessage, Self::Codec>) {
        // Send the received text to every connected client.
        ctx.broadcast(HelloReply::Ok(msg.text)).await;
    }
}
Wire it up in your Axum router:
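use axum::{extract::{State, WebSocketUpgrade}, routing::get, Router};
pub fn hello_router() -> Router<AppState> {
    Router::new().route(
        "/ws",
        get(
            |ws: WebSocketUpgrade, State(state): State<AppState>| async move {
                let config = WsHubConfig::default();
                ws.on_upgrade(move |socket| async move {
                    // `HelloHub::new(state)` assumes a hub that stores `AppState`;
                    // for the unit struct above, pass `HelloHub` instead.
                    serve_hub(socket, HelloHub::new(state), &config).await
                })
            },
        ),
    )
}
To pass data from Axum extractors into your hub: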
pub fn hello_router() -> Router<AppState> {
    Router::new().route(
        "/ws",
        get(
            |ws: WebSocketUpgrade,
             State(state): State<AppState>,
             Query(params): Query<WsQueryParams>| async move {
                // Extractors are resolved here, before the upgrade.
                // Pass the extracted data into the hub constructor.
                let config = WsHubConfig::default();
                ws.on_upgrade(move |socket| async move {
                    serve_hub(socket, HelloHub::new(state, params), &config).await
                })
            },
        ),
    )
}
That's it. No connection management, no heartbeat setup, no serialization boilerplate.
Typed messages - define your InMessage and OutMessage as plain Rust structs or enums. Serialization is handled automatically.
Pluggable codecs - switch between JSON and Binary with a single type change:
// JSON over text frames
type Codec = JsonCodec;
// Binary via postcard
type Codec = BinaryCodec;
Broadcast, unicast, groups - flexible targeting:
async fn on_message(&self, msg: Self::InMessage, ctx: MessageContext<Self::OutMessage, Self::Codec>) {
    // to all clients
    ctx.broadcast(MyReply::Ok(msg.text.clone())).await;
    // to sender only
    ctx.unicast(MyReply::Ok(msg.text.clone())).await;
    // to all except specific connections
    ctx.broadcast_except(&["conn-id-1", "conn-id-2"], MyReply::Ok(msg.text.clone())).await;
    // group management
    ctx.add_to_group("room-1").await;
    ctx.remove_from_group("room-1").await;
    // to all in a group
    ctx.broadcast_group("room-1", MyReply::Ok(msg.text.clone())).await;
    // to a group except specific connections
    ctx.broadcast_group_except("room-1", &["conn-id-1"], MyReply::Ok(msg.text.clone())).await;
    // to all in multiple groups (each connection receives at most once)
    ctx.broadcast_groups(&["room-1", "room-2"], MyReply::Ok(msg.text)).await;
}
Broadcast delivery policies - control what happens when a client is too slow to consume messages:
use axum_signal::{BroadcastPolicy, WsHubConfig};
use std::time::Duration;
// Wait indefinitely: guarantees delivery, but one slow client blocks the rest (default)
let config = WsHubConfig {
    policy: BroadcastPolicy::Block,
    ..WsHubConfig::default()
};
// Drop the message after the timeout, keep the connection alive
let config = WsHubConfig {
    policy: BroadcastPolicy::DropMessage {
        timeout: Duration::from_millis(100),
    },
    ..WsHubConfig::default()
};
// Drop the message after the timeout, disconnect after N consecutive drops
let config = WsHubConfig {
    policy: BroadcastPolicy::DropConnection {
        timeout: Duration::from_millis(100),
        max_drops: 5,
    },
    ..WsHubConfig::default()
};
// Disconnect if the rolling-average RTT over the last `rtt_samples` heartbeats exceeds `max_rtt`.
// Until the first pong arrives the connection is treated as healthy.
let config = WsHubConfig {
    policy: BroadcastPolicy::DropOnHighRtt {
        max_rtt: Duration::from_millis(300),
        rtt_samples: 8,
    },
    ..WsHubConfig::default()
};
These policies apply to all multi-client operations: broadcast_except, broadcast_group, broadcast_group_except, broadcast_groups. Plain broadcast uses a tokio::sync::broadcast channel and handles lagged receivers separately.
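The DropConnection and DropOnHighRtt policies come down to two small pieces of per-connection bookkeeping. The sketch below models them with the standard library only; it is not axum-signal's implementation. `Verdict`, `DropTracker`, and `RttWindow` are hypothetical names, and treating the N-th consecutive drop as the disconnect point is an assumption about how `max_drops` is counted.

```rust
use std::collections::VecDeque;
use std::time::Duration;

/// What the sketch decides to do with a connection.
#[derive(Debug, PartialEq, Eq)]
enum Verdict {
    Keep,
    Disconnect,
}

/// Counts consecutive dropped messages; a successful send resets the streak.
struct DropTracker {
    max_drops: u32,
    consecutive: u32,
}

impl DropTracker {
    fn new(max_drops: u32) -> Self {
        Self { max_drops, consecutive: 0 }
    }

    /// Records one delivery attempt (`delivered == false` means the send timed out).
    fn record(&mut self, delivered: bool) -> Verdict {
        if delivered {
            self.consecutive = 0;
            return Verdict::Keep;
        }
        self.consecutive += 1;
        // Assumption: the connection is closed once the streak reaches `max_drops`.
        if self.consecutive >= self.max_drops {
            Verdict::Disconnect
        } else {
            Verdict::Keep
        }
    }
}

/// Rolling average over the most recent RTT samples.
struct RttWindow {
    max_rtt: Duration,
    capacity: usize,
    samples: VecDeque<Duration>,
}

impl RttWindow {
    fn new(max_rtt: Duration, rtt_samples: usize) -> Self {
        let capacity = rtt_samples.max(1); // keep at least one sample
        Self { max_rtt, capacity, samples: VecDeque::with_capacity(capacity) }
    }

    /// Stores one heartbeat round-trip time, evicting the oldest when full.
    fn record(&mut self, rtt: Duration) {
        if self.samples.len() == self.capacity {
            self.samples.pop_front();
        }
        self.samples.push_back(rtt);
    }

    fn verdict(&self) -> Verdict {
        // No pong yet: treat the connection as healthy.
        if self.samples.is_empty() {
            return Verdict::Keep;
        }
        let total: Duration = self.samples.iter().sum();
        let average = total / self.samples.len() as u32;
        if average > self.max_rtt {
            Verdict::Disconnect
        } else {
            Verdict::Keep
        }
    }
}
```

With a two-sample window and a 300 ms limit, samples of 100 ms and 400 ms average 250 ms and the connection is kept; once the window holds 400 ms and 400 ms the average exceeds the limit and the verdict becomes `Disconnect`.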
Override on_message_drop to react to dropped messages, e.g. to increment a metric:
async fn on_message_drop(&self, connection_id: Arc<str>) {
    metrics::counter!("ws.dropped_messages").increment(1);
}
Default lifecycle hooks - override only what you need:
use std::sync::Arc;
impl WsHub for HelloHub {
    type Codec = JsonCodec;
    type InMessage = HelloMessage;
    type OutMessage = HelloReply;
    // optional - default logs connection id
    async fn on_connect(&self, req: ConnectionRequest) {
        tracing::info!("new connection: {}", req.connection_id);
    }
    // optional - default logs connection id
    async fn on_disconnect(&self, req: DisconnectRequest) {
        tracing::info!("disconnected: {}", req.connection_id);
    }
    // optional - default logs a warning
    async fn on_message_drop(&self, connection_id: Arc<str>) {
        metrics::counter!("ws.dropped_messages").increment(1);
    }
    // required - handles each incoming message
    async fn on_message(&self, msg: Self::InMessage, ctx: MessageContext<Self::OutMessage, Self::Codec>) {
        ctx.unicast(HelloReply::Ok(msg.text)).await;
    }
}
Enable the client feature:
[dependencies]
axum-signal = { version = "0.1.2", features = ["client"] }
HubClient mirrors the hub's type parameters: S is the message type sent to the server (InMessage), R is the message type received from the server (OutMessage), and C is the codec. All three must match the server hub.
use axum_signal::{HubClient, JsonCodec};
use serde::{Deserialize, Serialize};
#[derive(Serialize)]
pub struct HelloMessage {
    pub text: String,
}
#[derive(Deserialize, Debug)]
pub enum HelloReply {
    Ok(String),
    Err(String),
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut client = HubClient::builder("ws://localhost:3000/ws")
        .with_in_message::<HelloMessage>()
        .with_out_message::<HelloReply>()
        .with_codec::<JsonCodec>()
        .build();
    client.on_message(|reply: HelloReply| println!("{reply:?}"));
    client.connect().await?;
    client.send(HelloMessage { text: "hello".into() })?;
    // sends a Close frame and waits for the server to echo it back
    client.disconnect().await;
    Ok(())
}
Calling disconnect() performs the WebSocket closing handshake: it sends a Close frame, waits for the writer task to flush it, then waits for the reader task to receive the server's Close response. This avoids the "Connection reset without closing handshake" warning on the server side.
If disconnect() is not called, Drop will still send the Close frame on a best-effort basis, but without waiting for the handshake to complete.
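The difference between the two paths can be pictured as a tiny state machine. This is a toy model of a client-initiated closing handshake, not the library's code; `CloseState`, `CloseEvent`, `step`, and `run` are hypothetical names, and a close started by the server is deliberately left out.

```rust
/// Closing-handshake progress as seen by the client (toy model).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CloseState {
    /// Normal operation, no Close frame exchanged yet.
    Open,
    /// Our Close frame is out; the server's reply has not arrived.
    CloseSent,
    /// Close frames went in both directions: the handshake is complete.
    Closed,
}

#[derive(Debug, Clone, Copy)]
enum CloseEvent {
    /// The writer side wrote and flushed our Close frame.
    LocalCloseFlushed,
    /// The reader side received the server's Close frame.
    PeerCloseReceived,
}

/// Advances the handshake by one event; anything unexpected leaves the state unchanged.
fn step(state: CloseState, event: CloseEvent) -> CloseState {
    match (state, event) {
        (CloseState::Open, CloseEvent::LocalCloseFlushed) => CloseState::CloseSent,
        (CloseState::CloseSent, CloseEvent::PeerCloseReceived) => CloseState::Closed,
        // Server-initiated closes and repeated events are outside this sketch.
        (other, _) => other,
    }
}

/// Folds a sequence of events over the initial `Open` state.
fn run(events: &[CloseEvent]) -> CloseState {
    events.iter().fold(CloseState::Open, |state, event| step(state, *event))
}
```

In this model, waiting for both events corresponds to disconnect() and ends in `Closed`, while the best-effort Drop path stops after `LocalCloseFlushed` and leaves the state at `CloseSent`.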
Tested with k6 on AMD Ryzen 9 (32 cores), 32GB RAM.
ulimit -n 65535 && k6 run benchmarks/10k_connections.js
execution: local
script: benchmarks/10k_connections.js
output: -
scenarios: (100.00%) 1 scenario, 10000 max VUs, 1m30s max duration (incl. graceful stop):
* default: 10000 looping VUs for 1m0s (gracefulStop: 30s)
█ TOTAL RESULTS
checks_total.......: 170000 1888.350122/s
checks_succeeded...: 100.00% 170000 out of 170000
checks_failed......: 0.00% 0 out of 170000
✓ got response
EXECUTION
vus................: 10000 min=10000 max=10000
vus_max............: 10000 min=10000 max=10000
NETWORK
data_received......: 5.4 MB 60 kB/s
data_sent..........: 6.8 MB 75 kB/s
WEBSOCKET
ws_connecting......: avg=156.49ms min=454.96µs med=137.77ms max=401.14ms p(90)=259.56ms p(95)=288.33ms
ws_msgs_received...: 170000 1888.350122/s
ws_msgs_sent.......: 170001 1888.36123/s
ws_sessions........: 10000 111.079419/s
running (1m30.0s), 00000/10000 VUs, 0 complete and 10000 interrupted iterations
default ✓ [======================================] 10000 VUs 1m0s
ulimit -n 65535 && k6 run benchmarks/10k_connections.js
execution: local
script: benchmarks/10k_connections.js
output: -
scenarios: (100.00%) 1 scenario, 10000 max VUs, 1m30s max duration (incl. graceful stop):
* default: 10000 looping VUs for 1m0s (gracefulStop: 30s)
█ TOTAL RESULTS
checks_total.......: 40295409 447473.026684/s
checks_succeeded...: 100.00% 40295409 out of 40295409
checks_failed......: 0.00% 0 out of 40295409
✓ got response
EXECUTION
vus................: 10000 min=10000 max=10000
vus_max............: 10000 min=10000 max=10000
NETWORK
data_received......: 911 MB 10 MB/s
data_sent..........: 6.8 MB 75 kB/s
WEBSOCKET
ws_connecting......: avg=155.42ms min=55.24ms med=143.95ms max=317.78ms p(90)=225.3ms p(95)=230.77ms
ws_msgs_received...: 40300506 447529.627922/s
ws_msgs_sent.......: 170000 1887.8184/s
ws_sessions........: 10000 111.048141/s
running (1m30.1s), 00000/10000 VUs, 0 complete and 10000 interrupted iterations
default ✓ [======================================] 10000 VUs 1m0s
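Two derived figures help when reading the k6 counters above. The helpers below are plain arithmetic on the reported numbers; `implied_duration_secs` and `received_per_sent` are names introduced here for illustration and are not part of k6 or axum-signal.

```rust
/// Wall-clock seconds implied by a k6 counter and its reported per-second rate.
fn implied_duration_secs(total: f64, rate_per_sec: f64) -> f64 {
    total / rate_per_sec
}

/// Average number of messages received per message sent.
fn received_per_sent(received: u64, sent: u64) -> f64 {
    received as f64 / sent as f64
}
```

For the first run, `implied_duration_secs(170000.0, 1888.350122)` comes to roughly 90.03 s, which suggests the per-second rates are averaged over the whole 1m30s run rather than the 1m0s load window. For the second run, `received_per_sent(40300506, 170000)` is about 237 received messages per message sent, against roughly 1 in the first run.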