Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bench/bench.just
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ build-tc:

cargo-build-musl:
echo "Building musl binary (slim)..."
cross build --release --target "{{ musl_target }}" -p wallhack-cli --no-default-features --features slim
cargo build --release --target "{{ musl_target }}" -p wallhack-cli --no-default-features --features slim

clean:
rm -rf "{{ staging_dir }}"
Expand Down
4 changes: 2 additions & 2 deletions bench/vm/init.sh
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ _run_exit() {

echo "WALLHACK_TS: exit_wallhack_start=$(date +%s%3N)"
# wallhack exit node — connects to entry (retries with backoff)
wallhack daemon ${DEBUG:+"--debug"} exit \
wallhack daemon ${DEBUG:+"--debug"} --role exit \
-c "${ENTRY_ETH}:${WH_PORT}${_TSUFFIX}" \
--name "${PEER_NAME}" \
2>&1 | tee /tmp/wallhack-exit.log &
Expand Down Expand Up @@ -304,7 +304,7 @@ _run_entry() {

echo "WALLHACK_TS: entry_wallhack_start=$(date +%s%3N)"
# Start wallhack entry node (listen mode)
wallhack daemon ${DEBUG:+"--debug"} entry \
wallhack daemon ${DEBUG:+"--debug"} --role entry \
-l ":${WH_PORT}${_TSUFFIX}" \
2>&1 | tee /tmp/wallhack-entry.log &

Expand Down
34 changes: 26 additions & 8 deletions crates/api/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,28 @@ pub struct StatsResponse {
pub packets_out: u64,
pub active_connections: u64,
pub active_flows: u64,
/// Total packets dropped since daemon start.
pub packets_dropped: u64,
/// Total connections opened since daemon start (monotonically increasing).
pub total_connections: u64,
/// Total flows opened since daemon start (monotonically increasing).
pub total_flows: u64,
}

impl From<wallhack_wire::management::StatsResponse> for StatsResponse {
fn from(s: wallhack_wire::management::StatsResponse) -> Self {
Self {
bytes_in: s.bytes_in,
bytes_out: s.bytes_out,
packets_in: s.packets_in,
packets_out: s.packets_out,
active_connections: s.active_connections,
active_flows: s.active_flows,
packets_dropped: s.packets_dropped,
total_connections: s.total_connections,
total_flows: s.total_flows,
}
}
}

/// Node info response.
Expand Down Expand Up @@ -244,14 +266,10 @@ pub async fn stats(State(state): State<ApiState>) -> Result<Json<StatsResponse>,
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;

match resp.response {
Some(management_response::Response::Stats(s)) => Ok(Json(StatsResponse {
bytes_in: s.bytes_in,
bytes_out: s.bytes_out,
packets_in: s.packets_in,
packets_out: s.packets_out,
active_connections: s.active_connections,
active_flows: s.active_flows,
})),
Some(management_response::Response::Stats(s)) => {
let response: StatsResponse = s.into();
Ok(Json(response))
}
_ => Err(StatusCode::INTERNAL_SERVER_ERROR),
}
}
Expand Down
2 changes: 2 additions & 0 deletions crates/cli/src/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ pub fn print_response(resp: &ManagementResponse) -> Result<(), CtlError> {
let _ = writeln!(tw, "connections:\t{}", s.active_connections);
let _ = writeln!(tw, "flows:\t{}", s.active_flows);
let _ = writeln!(tw, "dropped:\t{}", s.packets_dropped);
let _ = writeln!(tw, "total connections:\t{}", s.total_connections);
let _ = writeln!(tw, "total flows:\t{}", s.total_flows);
let _ = tw.flush();
}
Some(management_response::Response::Peers(p)) => {
Expand Down
138 changes: 116 additions & 22 deletions crates/core/src/control/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
use std::{net::SocketAddr, sync::Arc, time::Instant};

use arc_swap::ArcSwap;
use tokio::sync::watch;
use tokio::sync::mpsc;
use wallhack_wire::{
control::{
ControlRequest, ControlResponse, ErrorResponse, PeerInfo, PingResponse, RouteInfo,
Expand All @@ -21,6 +21,51 @@ use super::{
log_buffer::LogBuffer, metrics::SharedMetrics, peers::SharedRegistry, routes::SharedRouteTable,
};

/// Reply channel for a single command.
///
/// Uses a standard-library sync channel so the handler side (which may be
/// called from a synchronous context) can block waiting for the reply without
/// needing an async runtime handle.
type ReplySender<T> = std::sync::mpsc::SyncSender<Result<T, crate::node_api::NodeApiError>>;

/// Create a reply channel pair for a node command.
fn reply_channel<T>() -> (
ReplySender<T>,
std::sync::mpsc::Receiver<Result<T, crate::node_api::NodeApiError>>,
) {
std::sync::mpsc::sync_channel(1)
}

/// A command sent from the handler/API layer to the mode task via the
/// control watch channel.
#[derive(Debug)]
pub enum NodeCommand {
/// Set or clear the role hint.
Role {
/// `Some` to set a hint, `None` to clear (auto).
hint: Option<RoleHint>,
},
/// Connect to a remote peer at the given address.
Connect {
/// Target address (host, host:port, etc.).
addr: String,
/// Channel for sending the result back to the caller.
reply: ReplySender<crate::node_api::ConnectInfo>,
},
/// Start listening for incoming peer connections.
Listen {
/// Address to bind.
addr: SocketAddr,
/// Channel for sending the result back to the caller.
reply: ReplySender<crate::node_api::ListenInfo>,
},
/// Disconnect from the currently connected peer.
Disconnect {
/// Channel for sending the result back to the caller.
reply: ReplySender<()>,
},
}

/// Mutable runtime state that can change after construction.
///
/// Stored behind `ArcSwap` for wait-free reads (same pattern as `Registry`).
Expand Down Expand Up @@ -112,9 +157,12 @@ impl HandlerConfig {
/// on the current state of metrics and configuration.
pub struct Handler {
config: HandlerConfig,
/// Sender for hint changes. The mode task watches the receiver and
/// re-evaluates when a new hint arrives. `None` means no hint is active.
hint_tx: watch::Sender<Option<RoleHint>>,
/// Command channel to the mode task. Carries role changes, connect,
/// listen, and disconnect commands.
command_source: mpsc::Sender<NodeCommand>,
/// Receiver side of the command channel. Extracted once by the daemon
/// before wrapping Handler in `Arc<dyn NodeApi>`.
command_sink: std::sync::Mutex<Option<mpsc::Receiver<NodeCommand>>>,
log_buffer: LogBuffer,
metrics: SharedMetrics,
peers: SharedRegistry,
Expand All @@ -140,10 +188,11 @@ impl Handler {
log_buffer: Option<LogBuffer>,
) -> Self {
let state = SharedNodeState::new(config.node_role);
let (hint_tx, _) = watch::channel(None);
let (command_source, command_sink) = mpsc::channel(8);
Self {
config,
hint_tx,
command_source,
command_sink: std::sync::Mutex::new(Some(command_sink)),
log_buffer: log_buffer.unwrap_or_default(),
metrics,
peers,
Expand All @@ -164,10 +213,19 @@ impl Handler {
self.state.clone()
}

/// Returns a receiver that fires when the runtime hint changes.
/// Extracts the command receiver. Called once by the daemon before
/// wrapping Handler in `Arc<dyn NodeApi>`.
///
/// # Panics
///
/// Panics if the mutex is poisoned or if called more than once.
#[must_use]
pub fn hint_rx(&self) -> watch::Receiver<Option<RoleHint>> {
self.hint_tx.subscribe()
pub fn command_sink(&self) -> mpsc::Receiver<NodeCommand> {
self.command_sink
.lock()
.expect("command_sink mutex poisoned")
.take()
.expect("command_sink already taken")
}

/// Handles a control request and returns a response.
Expand Down Expand Up @@ -419,25 +477,57 @@ impl crate::node_api::NodeApi for Handler {
}
}

fn connect(&self, _addr: &str) -> crate::node_api::Result<crate::node_api::ConnectInfo> {
Err(crate::node_api::NodeApiError::NotSupported(
"dynamic connect not yet implemented — specify --connect at startup".into(),
))
fn connect(&self, addr: &str) -> crate::node_api::Result<crate::node_api::ConnectInfo> {
let (reply_sender, reply_receiver) = reply_channel();
self.command_source
.try_send(NodeCommand::Connect {
addr: addr.to_string(),
reply: reply_sender,
})
.map_err(|_| {
crate::node_api::NodeApiError::NotSupported(
"dynamic connect not supported in this mode".into(),
)
})?;
tokio::task::block_in_place(|| reply_receiver.recv()).map_err(|_| {
crate::node_api::NodeApiError::Internal("mode task dropped reply".into())
})?
}

fn listen(
&self,
_addr: std::net::SocketAddr,
addr: std::net::SocketAddr,
) -> crate::node_api::Result<crate::node_api::ListenInfo> {
Err(crate::node_api::NodeApiError::NotSupported(
"dynamic listen not yet implemented — specify --listen at startup".into(),
))
let (reply_sender, reply_receiver) = reply_channel();
self.command_source
.try_send(NodeCommand::Listen {
addr,
reply: reply_sender,
})
.map_err(|_| {
crate::node_api::NodeApiError::NotSupported(
"dynamic listen not supported in this mode".into(),
)
})?;
tokio::task::block_in_place(|| reply_receiver.recv()).map_err(|_| {
crate::node_api::NodeApiError::Internal("mode task dropped reply".into())
})?
}

fn disconnect(&self) -> crate::node_api::Result<()> {
Err(crate::node_api::NodeApiError::NotSupported(
"dynamic disconnect not yet implemented".into(),
))
let (reply_sender, reply_receiver) = reply_channel();
self.command_source
.try_send(NodeCommand::Disconnect {
reply: reply_sender,
})
.map_err(|_| {
crate::node_api::NodeApiError::NotSupported(
"dynamic disconnect not supported in this mode".into(),
)
})?;
tokio::task::block_in_place(|| reply_receiver.recv()).map_err(|_| {
crate::node_api::NodeApiError::Internal("mode task dropped reply".into())
})?
}

fn add_route(
Expand Down Expand Up @@ -509,12 +599,16 @@ impl crate::node_api::NodeApi for Handler {
}

fn hint_set(&self, hint: RoleHint) -> crate::node_api::Result<()> {
self.hint_tx.send_replace(Some(hint));
let _ = self
.command_source
.try_send(NodeCommand::Role { hint: Some(hint) });
Ok(())
}

fn hint_set_auto(&self) -> crate::node_api::Result<()> {
self.hint_tx.send_replace(None);
let _ = self
.command_source
.try_send(NodeCommand::Role { hint: None });
Ok(())
}

Expand Down
64 changes: 64 additions & 0 deletions crates/core/src/control/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ pub struct Metrics {
active_connections: AtomicU64,
active_flows: AtomicU64,
packets_dropped: AtomicU64,
/// Monotonically increasing count of all connections ever opened.
total_connections: AtomicU64,
/// Monotonically increasing count of all flows ever opened.
total_flows: AtomicU64,
}

impl Metrics {
Expand All @@ -31,6 +35,8 @@ impl Metrics {
active_connections: self.active_connections.load(Ordering::Relaxed),
active_flows: self.active_flows.load(Ordering::Relaxed),
packets_dropped: self.packets_dropped.load(Ordering::Relaxed),
total_connections: self.total_connections.load(Ordering::Relaxed),
total_flows: self.total_flows.load(Ordering::Relaxed),
}
}

Expand All @@ -52,6 +58,7 @@ impl Metrics {

pub fn inc_active_connections(&self) {
self.active_connections.fetch_add(1, Ordering::Relaxed);
self.total_connections.fetch_add(1, Ordering::Relaxed);
}

pub fn dec_active_connections(&self) {
Expand All @@ -64,6 +71,7 @@ impl Metrics {

pub fn inc_active_flows(&self) {
self.active_flows.fetch_add(1, Ordering::Relaxed);
self.total_flows.fetch_add(1, Ordering::Relaxed);
}

pub fn dec_active_flows(&self) {
Expand All @@ -72,3 +80,59 @@ impl Metrics {
}

pub type SharedMetrics = Arc<Metrics>;

#[cfg(test)]
mod tests {
use super::Metrics;

#[test]
fn total_connections_increments_and_does_not_decrement() {
let metrics = Metrics::new();

metrics.inc_active_connections();
assert_eq!(metrics.snapshot().total_connections, 1);
assert_eq!(metrics.snapshot().active_connections, 1);

metrics.dec_active_connections();
// active_connections decrements, total_connections must not
assert_eq!(metrics.snapshot().total_connections, 1);
assert_eq!(metrics.snapshot().active_connections, 0);
}

#[test]
fn total_flows_increments_and_does_not_decrement() {
let metrics = Metrics::new();

metrics.inc_active_flows();
assert_eq!(metrics.snapshot().total_flows, 1);
assert_eq!(metrics.snapshot().active_flows, 1);

metrics.dec_active_flows();
// active_flows decrements, total_flows must not
assert_eq!(metrics.snapshot().total_flows, 1);
assert_eq!(metrics.snapshot().active_flows, 0);
}

#[test]
fn cumulative_counters_survive_connection_churn() {
let metrics = Metrics::new();

// Simulate 5 connection open/close cycles
for _ in 0..5 {
metrics.inc_active_connections();
metrics.dec_active_connections();
}

assert_eq!(metrics.snapshot().total_connections, 5);
assert_eq!(metrics.snapshot().active_connections, 0);

// Simulate 3 flow open/close cycles
for _ in 0..3 {
metrics.inc_active_flows();
metrics.dec_active_flows();
}

assert_eq!(metrics.snapshot().total_flows, 3);
assert_eq!(metrics.snapshot().active_flows, 0);
}
}
Loading
Loading