diff --git a/bench/bench.just b/bench/bench.just
index de2d3f9e..763fe5c0 100644
--- a/bench/bench.just
+++ b/bench/bench.just
@@ -107,7 +107,7 @@ build-tc:
 
 cargo-build-musl:
     echo "Building musl binary (slim)..."
-    cross build --release --target "{{ musl_target }}" -p wallhack-cli --no-default-features --features slim
+    cargo build --release --target "{{ musl_target }}" -p wallhack-cli --no-default-features --features slim
 
 clean:
     rm -rf "{{ staging_dir }}"
diff --git a/bench/vm/init.sh b/bench/vm/init.sh
index 1a03f00d..81c88156 100755
--- a/bench/vm/init.sh
+++ b/bench/vm/init.sh
@@ -150,7 +150,7 @@ _run_exit() {
    echo "WALLHACK_TS: exit_wallhack_start=$(date +%s%3N)"

    # wallhack exit node — connects to entry (retries with backoff)
-    wallhack daemon ${DEBUG:+"--debug"} exit \
+    wallhack daemon ${DEBUG:+"--debug"} --role exit \
        -c "${ENTRY_ETH}:${WH_PORT}${_TSUFFIX}" \
        --name "${PEER_NAME}" \
        2>&1 | tee /tmp/wallhack-exit.log &
@@ -304,7 +304,7 @@ _run_entry() {
    echo "WALLHACK_TS: entry_wallhack_start=$(date +%s%3N)"

    # Start wallhack entry node (listen mode)
-    wallhack daemon ${DEBUG:+"--debug"} entry \
+    wallhack daemon ${DEBUG:+"--debug"} --role entry \
        -l ":${WH_PORT}${_TSUFFIX}" \
        2>&1 | tee /tmp/wallhack-entry.log &

diff --git a/crates/api/src/handlers.rs b/crates/api/src/handlers.rs
index 0cde8583..1377c0bf 100644
--- a/crates/api/src/handlers.rs
+++ b/crates/api/src/handlers.rs
@@ -27,6 +27,28 @@ pub struct StatsResponse {
     pub packets_out: u64,
     pub active_connections: u64,
     pub active_flows: u64,
+    /// Total packets dropped since daemon start.
+    pub packets_dropped: u64,
+    /// Total connections opened since daemon start (monotonically increasing).
+    pub total_connections: u64,
+    /// Total flows opened since daemon start (monotonically increasing).
+    pub total_flows: u64,
+}
+
+impl From<wallhack_wire::management::StatsResponse> for StatsResponse {
+    fn from(s: wallhack_wire::management::StatsResponse) -> Self {
+        Self {
+            bytes_in: s.bytes_in,
+            bytes_out: s.bytes_out,
+            packets_in: s.packets_in,
+            packets_out: s.packets_out,
+            active_connections: s.active_connections,
+            active_flows: s.active_flows,
+            packets_dropped: s.packets_dropped,
+            total_connections: s.total_connections,
+            total_flows: s.total_flows,
+        }
+    }
 }
 
 /// Node info response.
@@ -244,14 +266,10 @@ pub async fn stats(State(state): State) -> Result,
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
 
     match resp.response {
-        Some(management_response::Response::Stats(s)) => Ok(Json(StatsResponse {
-            bytes_in: s.bytes_in,
-            bytes_out: s.bytes_out,
-            packets_in: s.packets_in,
-            packets_out: s.packets_out,
-            active_connections: s.active_connections,
-            active_flows: s.active_flows,
-        })),
+        Some(management_response::Response::Stats(s)) => {
+            let response: StatsResponse = s.into();
+            Ok(Json(response))
+        }
         _ => Err(StatusCode::INTERNAL_SERVER_ERROR),
     }
 }
diff --git a/crates/cli/src/output.rs b/crates/cli/src/output.rs
index 63af274e..ae8d4f4e 100644
--- a/crates/cli/src/output.rs
+++ b/crates/cli/src/output.rs
@@ -128,6 +128,8 @@ pub fn print_response(resp: &ManagementResponse) -> Result<(), CtlError> {
             let _ = writeln!(tw, "connections:\t{}", s.active_connections);
             let _ = writeln!(tw, "flows:\t{}", s.active_flows);
             let _ = writeln!(tw, "dropped:\t{}", s.packets_dropped);
+            let _ = writeln!(tw, "total connections:\t{}", s.total_connections);
+            let _ = writeln!(tw, "total flows:\t{}", s.total_flows);
             let _ = tw.flush();
         }
         Some(management_response::Response::Peers(p)) => {
diff --git a/crates/core/src/control/handler.rs b/crates/core/src/control/handler.rs
index f35de827..dc858254 100644
--- a/crates/core/src/control/handler.rs
+++ b/crates/core/src/control/handler.rs
@@ -6,7 +6,7 @@
 use std::{net::SocketAddr, sync::Arc, time::Instant};
 
 use arc_swap::ArcSwap;
-use tokio::sync::watch;
+use tokio::sync::mpsc;
 use wallhack_wire::{
     control::{
         ControlRequest, ControlResponse, ErrorResponse, PeerInfo, PingResponse, RouteInfo,
@@ -21,6 +21,51 @@ use super::{
     log_buffer::LogBuffer, metrics::SharedMetrics, peers::SharedRegistry,
     routes::SharedRouteTable,
 };
+/// Reply channel for a single command.
+///
+/// Uses a standard-library sync channel so the handler side (which may be
+/// called from a synchronous context) can block waiting for the reply without
+/// needing an async runtime handle.
+type ReplySender<T> = std::sync::mpsc::SyncSender<crate::node_api::Result<T>>;
+
+/// Create a reply channel pair for a node command.
+fn reply_channel<T>() -> (
+    ReplySender<T>,
+    std::sync::mpsc::Receiver<crate::node_api::Result<T>>,
+) {
+    std::sync::mpsc::sync_channel(1)
+}
+
+/// A command sent from the handler/API layer to the mode task via the
+/// control command channel.
+#[derive(Debug)]
+pub enum NodeCommand {
+    /// Set or clear the role hint.
+    Role {
+        /// `Some` to set a hint, `None` to clear (auto).
+        hint: Option<RoleHint>,
+    },
+    /// Connect to a remote peer at the given address.
+    Connect {
+        /// Target address (host, host:port, etc.).
+        addr: String,
+        /// Channel for sending the result back to the caller.
+        reply: ReplySender<crate::node_api::ConnectInfo>,
+    },
+    /// Start listening for incoming peer connections.
+    Listen {
+        /// Address to bind.
+        addr: SocketAddr,
+        /// Channel for sending the result back to the caller.
+        reply: ReplySender<crate::node_api::ListenInfo>,
+    },
+    /// Disconnect from the currently connected peer.
+    Disconnect {
+        /// Channel for sending the result back to the caller.
+        reply: ReplySender<()>,
+    },
+}
+
 /// Mutable runtime state that can change after construction.
 ///
 /// Stored behind `ArcSwap` for wait-free reads (same pattern as `Registry`).
@@ -112,9 +157,12 @@ impl HandlerConfig {
 /// on the current state of metrics and configuration.
 pub struct Handler {
     config: HandlerConfig,
-    /// Sender for hint changes. The mode task watches the receiver and
-    /// re-evaluates when a new hint arrives. `None` means no hint is active.
-    hint_tx: watch::Sender<Option<RoleHint>>,
+    /// Command channel to the mode task. Carries role changes, connect,
+    /// listen, and disconnect commands.
+    command_source: mpsc::Sender<NodeCommand>,
+    /// Receiver side of the command channel. Extracted once by the daemon
+    /// before wrapping Handler in `Arc`.
+    command_sink: std::sync::Mutex<Option<mpsc::Receiver<NodeCommand>>>,
     log_buffer: LogBuffer,
     metrics: SharedMetrics,
     peers: SharedRegistry,
@@ -140,10 +188,11 @@ impl Handler {
         log_buffer: Option<LogBuffer>,
     ) -> Self {
         let state = SharedNodeState::new(config.node_role);
-        let (hint_tx, _) = watch::channel(None);
+        let (command_source, command_sink) = mpsc::channel(8);
         Self {
             config,
-            hint_tx,
+            command_source,
+            command_sink: std::sync::Mutex::new(Some(command_sink)),
             log_buffer: log_buffer.unwrap_or_default(),
             metrics,
             peers,
@@ -164,10 +213,19 @@ impl Handler {
         self.state.clone()
     }
 
-    /// Returns a receiver that fires when the runtime hint changes.
+    /// Extracts the command receiver. Called once by the daemon before
+    /// wrapping Handler in `Arc`.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the mutex is poisoned or if called more than once.
     #[must_use]
-    pub fn hint_rx(&self) -> watch::Receiver<Option<RoleHint>> {
-        self.hint_tx.subscribe()
+    pub fn command_sink(&self) -> mpsc::Receiver<NodeCommand> {
+        self.command_sink
+            .lock()
+            .expect("command_sink mutex poisoned")
+            .take()
+            .expect("command_sink already taken")
     }
 
     /// Handles a control request and returns a response.
@@ -419,25 +477,57 @@ impl crate::node_api::NodeApi for Handler {
         }
     }
 
-    fn connect(&self, _addr: &str) -> crate::node_api::Result<crate::node_api::ConnectInfo> {
-        Err(crate::node_api::NodeApiError::NotSupported(
-            "dynamic connect not yet implemented — specify --connect at startup".into(),
-        ))
+    fn connect(&self, addr: &str) -> crate::node_api::Result<crate::node_api::ConnectInfo> {
+        let (reply_sender, reply_receiver) = reply_channel();
+        self.command_source
+            .try_send(NodeCommand::Connect {
+                addr: addr.to_string(),
+                reply: reply_sender,
+            })
+            .map_err(|_| {
+                crate::node_api::NodeApiError::NotSupported(
+                    "dynamic connect not supported in this mode".into(),
+                )
+            })?;
+        tokio::task::block_in_place(|| reply_receiver.recv()).map_err(|_| {
+            crate::node_api::NodeApiError::Internal("mode task dropped reply".into())
+        })?
     }
 
     fn listen(
         &self,
-        _addr: std::net::SocketAddr,
+        addr: std::net::SocketAddr,
     ) -> crate::node_api::Result<crate::node_api::ListenInfo> {
-        Err(crate::node_api::NodeApiError::NotSupported(
-            "dynamic listen not yet implemented — specify --listen at startup".into(),
-        ))
+        let (reply_sender, reply_receiver) = reply_channel();
+        self.command_source
+            .try_send(NodeCommand::Listen {
+                addr,
+                reply: reply_sender,
+            })
+            .map_err(|_| {
+                crate::node_api::NodeApiError::NotSupported(
+                    "dynamic listen not supported in this mode".into(),
+                )
+            })?;
+        tokio::task::block_in_place(|| reply_receiver.recv()).map_err(|_| {
+            crate::node_api::NodeApiError::Internal("mode task dropped reply".into())
+        })?
     }
 
     fn disconnect(&self) -> crate::node_api::Result<()> {
-        Err(crate::node_api::NodeApiError::NotSupported(
-            "dynamic disconnect not yet implemented".into(),
-        ))
+        let (reply_sender, reply_receiver) = reply_channel();
+        self.command_source
+            .try_send(NodeCommand::Disconnect {
+                reply: reply_sender,
+            })
+            .map_err(|_| {
+                crate::node_api::NodeApiError::NotSupported(
+                    "dynamic disconnect not supported in this mode".into(),
+                )
+            })?;
+        tokio::task::block_in_place(|| reply_receiver.recv()).map_err(|_| {
+            crate::node_api::NodeApiError::Internal("mode task dropped reply".into())
+        })?
} fn add_route( @@ -509,12 +599,16 @@ impl crate::node_api::NodeApi for Handler { } fn hint_set(&self, hint: RoleHint) -> crate::node_api::Result<()> { - self.hint_tx.send_replace(Some(hint)); + let _ = self + .command_source + .try_send(NodeCommand::Role { hint: Some(hint) }); Ok(()) } fn hint_set_auto(&self) -> crate::node_api::Result<()> { - self.hint_tx.send_replace(None); + let _ = self + .command_source + .try_send(NodeCommand::Role { hint: None }); Ok(()) } diff --git a/crates/core/src/control/metrics.rs b/crates/core/src/control/metrics.rs index 70425e26..4386eaf8 100644 --- a/crates/core/src/control/metrics.rs +++ b/crates/core/src/control/metrics.rs @@ -14,6 +14,10 @@ pub struct Metrics { active_connections: AtomicU64, active_flows: AtomicU64, packets_dropped: AtomicU64, + /// Monotonically increasing count of all connections ever opened. + total_connections: AtomicU64, + /// Monotonically increasing count of all flows ever opened. + total_flows: AtomicU64, } impl Metrics { @@ -31,6 +35,8 @@ impl Metrics { active_connections: self.active_connections.load(Ordering::Relaxed), active_flows: self.active_flows.load(Ordering::Relaxed), packets_dropped: self.packets_dropped.load(Ordering::Relaxed), + total_connections: self.total_connections.load(Ordering::Relaxed), + total_flows: self.total_flows.load(Ordering::Relaxed), } } @@ -52,6 +58,7 @@ impl Metrics { pub fn inc_active_connections(&self) { self.active_connections.fetch_add(1, Ordering::Relaxed); + self.total_connections.fetch_add(1, Ordering::Relaxed); } pub fn dec_active_connections(&self) { @@ -64,6 +71,7 @@ impl Metrics { pub fn inc_active_flows(&self) { self.active_flows.fetch_add(1, Ordering::Relaxed); + self.total_flows.fetch_add(1, Ordering::Relaxed); } pub fn dec_active_flows(&self) { @@ -72,3 +80,59 @@ impl Metrics { } pub type SharedMetrics = Arc; + +#[cfg(test)] +mod tests { + use super::Metrics; + + #[test] + fn total_connections_increments_and_does_not_decrement() { + let metrics = Metrics::new(); + + metrics.inc_active_connections(); + assert_eq!(metrics.snapshot().total_connections, 1); + assert_eq!(metrics.snapshot().active_connections, 1); + + metrics.dec_active_connections(); + // active_connections decrements, total_connections must not + assert_eq!(metrics.snapshot().total_connections, 1); + assert_eq!(metrics.snapshot().active_connections, 0); + } + + #[test] + fn total_flows_increments_and_does_not_decrement() { + let metrics = Metrics::new(); + + metrics.inc_active_flows(); + assert_eq!(metrics.snapshot().total_flows, 1); + assert_eq!(metrics.snapshot().active_flows, 1); + + metrics.dec_active_flows(); + // active_flows decrements, total_flows must not + assert_eq!(metrics.snapshot().total_flows, 1); + assert_eq!(metrics.snapshot().active_flows, 0); + } + + #[test] + fn cumulative_counters_survive_connection_churn() { + let metrics = Metrics::new(); + + // Simulate 5 connection open/close cycles + for _ in 0..5 { + metrics.inc_active_connections(); + metrics.dec_active_connections(); + } + + assert_eq!(metrics.snapshot().total_connections, 5); + assert_eq!(metrics.snapshot().active_connections, 0); + + // Simulate 3 flow open/close cycles + for _ in 0..3 { + metrics.inc_active_flows(); + metrics.dec_active_flows(); + } + + assert_eq!(metrics.snapshot().total_flows, 3); + assert_eq!(metrics.snapshot().active_flows, 0); + } +} diff --git a/crates/core/src/ipc.rs b/crates/core/src/ipc.rs index e3a10f85..51425160 100644 --- a/crates/core/src/ipc.rs +++ b/crates/core/src/ipc.rs @@ -304,16 +304,7 @@ fn 
dispatch_request(request: &ManagementRequest, api: &dyn NodeApi) -> Managemen
         }
 
         Some(management_request::Request::Stats(_)) => {
-            let m = api.metrics();
-            management_response::Response::Stats(StatsResponse {
-                bytes_in: m.bytes_in,
-                bytes_out: m.bytes_out,
-                packets_in: m.packets_in,
-                packets_out: m.packets_out,
-                active_connections: m.active_connections,
-                active_flows: m.active_flows,
-                packets_dropped: m.packets_dropped,
-            })
+            management_response::Response::Stats(api.metrics().into())
         }
 
         Some(management_request::Request::Peers(_)) => {
@@ -556,6 +547,22 @@ impl From for management::PeerInfo {
     }
 }
 
+impl From<crate::node_api::Metrics> for StatsResponse {
+    fn from(m: crate::node_api::Metrics) -> Self {
+        Self {
+            bytes_in: m.bytes_in,
+            bytes_out: m.bytes_out,
+            packets_in: m.packets_in,
+            packets_out: m.packets_out,
+            active_connections: m.active_connections,
+            active_flows: m.active_flows,
+            packets_dropped: m.packets_dropped,
+            total_connections: m.total_connections,
+            total_flows: m.total_flows,
+        }
+    }
+}
+
 impl From<crate::node_api::RouteEntry> for management::RouteEntry {
     fn from(r: crate::node_api::RouteEntry) -> Self {
         let elapsed = r.create_time.elapsed();
diff --git a/crates/core/src/node_api.rs b/crates/core/src/node_api.rs
index 7d294728..f5796673 100644
--- a/crates/core/src/node_api.rs
+++ b/crates/core/src/node_api.rs
@@ -78,6 +78,10 @@ pub struct Metrics {
     pub active_connections: u64,
     pub active_flows: u64,
     pub packets_dropped: u64,
+    /// Total connections opened since daemon start (monotonically increasing).
+    pub total_connections: u64,
+    /// Total flows opened since daemon start (monotonically increasing).
+    pub total_flows: u64,
 }
 
 /// Overall node info.
diff --git a/crates/daemon/src/lib.rs b/crates/daemon/src/lib.rs
index 1dd99d41..ff35911a 100644
--- a/crates/daemon/src/lib.rs
+++ b/crates/daemon/src/lib.rs
@@ -164,6 +164,7 @@ pub fn start_node(
         log_buffer,
     );
     let node_state = handler.node_state();
+    let command_sink = handler.command_sink();
     let node_api: Arc<dyn NodeApi> = Arc::new(handler);
 
     let (shutdown_tx, _shutdown_rx) = watch::channel(());
@@ -177,6 +178,7 @@ pub fn start_node(
         route_updates: route_update_rx,
         route_updates_tx: route_update_tx,
         node_state,
+        command_sink,
     };
 
     let task = tokio::spawn(async move { mode::run(&config, resources).await.map_err(Into::into) });
diff --git a/crates/daemon/src/mode/auto.rs b/crates/daemon/src/mode/auto.rs
index 0f7eb83d..697b4649 100644
--- a/crates/daemon/src/mode/auto.rs
+++ b/crates/daemon/src/mode/auto.rs
@@ -19,7 +19,7 @@ use wallhack_core::{
         handler::SharedNodeState,
         metrics::Metrics,
         peers::{ConnectionSide, Registry},
-        routes::SharedRouteTable,
+        routes::{RouteUpdate, SharedRouteTable},
     },
     entry::manager::ConnectionManager,
     exit::{net::SyscallExitAdapter, orchestrator::Orchestrator},
@@ -33,19 +33,47 @@ use crate::{
     NodeError,
     address_spec::{AddressSpec, Protocol},
     config::SecurityParams,
-    daemon_config::{AutoConfig, GlobalConfig, RelayConfig},
+    daemon_config::{AutoConfig, GlobalConfig},
     tun_cap::detect_tun_capable,
 };
 
 /// Reconnect delay for auto-connector sessions.
 const RECONNECT_DELAY: Duration = Duration::from_millis(500);
 
+/// Shared context for all auto-mode internal functions.
+///
+/// Bundles the state that would otherwise be passed as individual arguments
+/// to every function in the auto-mode call tree.
+struct AutoContext {
+    global: GlobalConfig,
+    cfg: AutoConfig,
+    metrics: Arc<Metrics>,
+    peers: Arc<Registry>,
+    routes: SharedRouteTable,
+    route_updates_tx: tokio::sync::broadcast::Sender<RouteUpdate>,
+    node_state: SharedNodeState,
+    tun_capable: bool,
+}
+
+impl AutoContext {
+    fn route_updates(&self) -> tokio::sync::broadcast::Receiver<RouteUpdate> {
+        self.route_updates_tx.subscribe()
+    }
+
+    fn security(&self) -> SecurityParams {
+        SecurityParams {
+            psk: self.global.psk.clone(),
+            accept_fingerprint: self.cfg.accept_fingerprint.clone(),
+        }
+    }
+}
+
 /// Run in auto-negotiation mode.
 ///
 /// # Errors
 ///
 /// Returns error if the connection setup fails non-retryably.
-// REASON: threading metrics, peers, routes, route_updates, route_updates_tx, node_state through mode dispatch
+// REASON: threading metrics, peers, routes, route_updates_tx, command_sink, node_state through mode dispatch
 #[allow(clippy::too_many_arguments)]
 pub(crate) async fn run(
     global: &GlobalConfig,
@@ -53,8 +81,8 @@ pub(crate) async fn run(
     metrics: Arc<Metrics>,
     peers: Arc<Registry>,
     routes: SharedRouteTable,
-    route_updates: tokio::sync::broadcast::Receiver<RouteUpdate>,
-    route_updates_tx: tokio::sync::broadcast::Sender<RouteUpdate>,
+    route_updates_tx: tokio::sync::broadcast::Sender<RouteUpdate>,
+    command_sink: tokio::sync::mpsc::Receiver<wallhack_core::control::handler::NodeCommand>,
     node_state: SharedNodeState,
 ) -> Result<(), NodeError> {
     let tun_capable = detect_tun_capable();
@@ -83,55 +111,46 @@ pub(crate) async fn run(
     tracing::info!("Eligible roles: {}", eligible.join(", "));
 
     // Set initial capabilities; role stays Indeterminate until negotiation.
+    // For the both-connect-and-listen path we start with `listening: false`
+    // and set it to `true` once the listener successfully binds.
     let interactive = std::io::IsTerminal::is_terminal(&std::io::stdin());
+    let initial_listening = cfg.listen.is_some() && cfg.connect.is_none();
     node_state.update_capabilities(Capabilities {
         tun_capable,
-        listening: cfg.listen.is_some(),
+        listening: initial_listening,
         connecting: cfg.connect.is_some(),
         interactive,
     });
 
-    match (&cfg.connect, &cfg.listen) {
+    let ctx = Arc::new(AutoContext {
+        global: global.clone(),
+        cfg: cfg.clone(),
+        metrics,
+        peers,
+        routes,
+        route_updates_tx,
+        node_state,
+        tun_capable,
+    });
+
+    match (&ctx.cfg.connect, &ctx.cfg.listen) {
         (Some(connect), Some(listen)) => {
-            // Both connect and listen → relay role (no negotiation needed).
-            tracing::info!("Both connect and listen addresses provided: running as relay");
-            node_state.update_role(NodeRole::Relay);
-            let relay_cfg = RelayConfig {
-                name: cfg.name.clone(),
-                connect: connect.clone(),
-                listen: listen.clone(),
-                accept_fingerprint: cfg.accept_fingerprint.clone(),
-            };
-            super::relay::run(global, &relay_cfg, metrics, peers, node_state).await
+            // Both connect and listen → start as exit, promote to relay when
+            // the listener has a second peer connected.
+            tracing::info!(
+                "Both connect and listen addresses provided: starting as exit, promoting to relay on listener peer"
+            );
+            run_connect_listen_relay_promotable(Arc::clone(&ctx), connect, listen).await
         }
         (Some(connect), None) => {
-            run_auto_connector(
-                global,
-                cfg,
-                connect,
-                tun_capable,
-                metrics,
-                peers,
-                routes,
-                route_updates,
-                node_state,
-            )
-            .await
+            // Connector-only path: run the connector as a task and poll
+            // command_sink for dynamic listen/disconnect commands.
+ run_auto_connector_with_commands(Arc::clone(&ctx), connect, command_sink).await } (None, Some(listen)) => { - run_auto_listener( - global, - cfg, - listen, - tun_capable, - metrics, - peers, - routes, - route_updates, - route_updates_tx, - node_state, - ) - .await + // Listener-only path: run the listener as a task and poll + // command_sink for dynamic connect/disconnect commands. + run_auto_listener_with_commands(Arc::clone(&ctx), listen, command_sink).await } (None, None) => Err(NodeError::Config( "auto mode requires a connect or listen address".into(), @@ -139,6 +158,181 @@ pub(crate) async fn run( } } +/// Connector-only path with command integration. +/// +/// Spawns the connector as a task, then polls `command_sink` for dynamic +/// commands. `Listen` commands start a listener task alongside the +/// connector. `Disconnect` commands abort the active connector. +// REASON: too_many_lines: symmetric listen/connect/disconnect command arms with distinct spawn logic +#[allow(clippy::too_many_lines)] +async fn run_auto_connector_with_commands( + ctx: Arc, + connect: &AddressSpec, + mut command_sink: tokio::sync::mpsc::Receiver, +) -> Result<(), NodeError> { + use wallhack_core::{control::handler::NodeCommand, node_api::NodeApiError}; + + let connect = connect.clone(); + + let connector_task: tokio::task::JoinHandle> = { + let ctx = Arc::clone(&ctx); + let connect = connect.clone(); + tokio::spawn(async move { run_auto_connector(Arc::clone(&ctx), &connect).await }) + }; + + // Listener task spawned on demand by a `Listen` command. + let mut listener_task: Option>> = None; + + tokio::pin!(connector_task); + + loop { + // Poll the listener task for completion (log errors, clear handle). + if let Some(ref mut lt) = listener_task + && lt.is_finished() + { + if let Ok(Err(e)) = lt.await { + tracing::warn!("Dynamic listener exited with error: {e}"); + } + listener_task = None; + } + + tokio::select! { + result = &mut connector_task => { + if let Some(lt) = listener_task.take() { + lt.abort(); + } + return result?; + } + Some(cmd) = command_sink.recv() => { + match cmd { + NodeCommand::Role { .. } => { + // Role changes not yet handled in connector path. + } + NodeCommand::Connect { reply, .. } => { + let _ = reply.send(Err(NodeApiError::AlreadyConnected)); + } + NodeCommand::Listen { addr, reply } => { + if listener_task.as_ref().is_some_and(|lt| !lt.is_finished()) { + let _ = reply.send(Err(NodeApiError::AlreadyListening)); + continue; + } + let ctx = Arc::clone(&ctx); + let listen_spec = AddressSpec { + addr: addr.to_string(), + protocol: connect.protocol, + }; + let handle = tokio::spawn(async move { + run_auto_listener(Arc::clone(&ctx), &listen_spec).await + }); + let listen_info = wallhack_core::node_api::ListenInfo { + listen_addr: addr, + protocol: format!("{:?}", connect.protocol), + fingerprint: String::new(), + }; + let _ = reply.send(Ok(listen_info)); + listener_task = Some(handle); + } + NodeCommand::Disconnect { reply } => { + connector_task.abort(); + let _ = reply.send(Ok(())); + if let Some(lt) = listener_task.take() { + lt.abort(); + } + return Ok(()); + } + } + } + } + } +} + +/// Listener-only path with command integration. +/// +/// Spawns the listener as a task, then polls `command_sink` for dynamic +/// commands. `Connect` commands spawn a connector task alongside the +/// listener. `Disconnect` commands abort the active connector. 
+// REASON: too_many_lines: symmetric connect/listen/disconnect command arms with distinct spawn logic +#[allow(clippy::too_many_lines)] +async fn run_auto_listener_with_commands( + ctx: Arc, + listen: &AddressSpec, + mut command_sink: tokio::sync::mpsc::Receiver, +) -> Result<(), NodeError> { + use wallhack_core::{control::handler::NodeCommand, node_api::NodeApiError}; + + let listen = listen.clone(); + + let listener_task: tokio::task::JoinHandle> = { + let ctx = Arc::clone(&ctx); + let listen = listen.clone(); + tokio::spawn(async move { run_auto_listener(Arc::clone(&ctx), &listen).await }) + }; + + // Connector task spawned on demand by a `Connect` command. + let mut connector_task: Option>> = None; + + tokio::pin!(listener_task); + + loop { + // Poll the connector task for completion (log errors, clear handle). + if let Some(ref mut ct) = connector_task + && ct.is_finished() + { + if let Ok(Err(e)) = ct.await { + tracing::warn!("Dynamic connector exited with error: {e}"); + } + connector_task = None; + } + + tokio::select! { + result = &mut listener_task => { + if let Some(ct) = connector_task.take() { + ct.abort(); + } + return result?; + } + Some(cmd) = command_sink.recv() => { + match cmd { + NodeCommand::Role { .. } => { + // Role changes not yet handled in listener path. + } + NodeCommand::Listen { reply, .. } => { + let _ = reply.send(Err(NodeApiError::AlreadyListening)); + } + NodeCommand::Connect { addr, reply } => { + if connector_task.as_ref().is_some_and(|ct| !ct.is_finished()) { + let _ = reply.send(Err(NodeApiError::AlreadyConnected)); + continue; + } + let ctx = Arc::clone(&ctx); + let connect_spec = AddressSpec { + addr: addr.clone(), + protocol: listen.protocol, + }; + let handle = tokio::spawn(async move { + run_auto_connector(Arc::clone(&ctx), &connect_spec).await + }); + let connect_info = wallhack_core::node_api::ConnectInfo { + peer_addr: addr, + protocol: format!("{:?}", listen.protocol), + }; + let _ = reply.send(Ok(connect_info)); + connector_task = Some(handle); + } + NodeCommand::Disconnect { reply } => { + if let Some(ct) = connector_task.take() { + ct.abort(); + let _ = reply.send(Ok(())); + } else { + let _ = reply.send(Err(NodeApiError::NotConnected)); + } + } + } + } + } + } +} + /// Build a local `Handshake` for capability advertisement. /// /// Always populates `routes` with locally-routable CIDRs so that a peer @@ -206,24 +400,14 @@ fn install_advertised_routes( // ============================================================================ /// Auto connector: connect to a peer, negotiate role, run the session. 
-// REASON: threading transport, metrics, peers, routes, route_updates through protocol-specific quic/ws arms -#[allow(clippy::too_many_lines, clippy::too_many_arguments)] -async fn run_auto_connector( - global: &GlobalConfig, - cfg: &AutoConfig, - spec: &AddressSpec, - tun_capable: bool, - metrics: Arc, - peers: Arc, - routes: SharedRouteTable, - route_updates: tokio::sync::broadcast::Receiver, - node_state: SharedNodeState, -) -> Result<(), NodeError> { +// REASON: too_many_lines: symmetric quic/ws dispatch arms, each with factory and session closures +#[allow(clippy::too_many_lines)] +async fn run_auto_connector(ctx: Arc, spec: &AddressSpec) -> Result<(), NodeError> { let local_hs = build_local_handshake( - cfg, - &global.version, + &ctx.cfg, + &ctx.global.version, Capabilities { - tun_capable, + tun_capable: ctx.tun_capable, listening: false, connecting: true, interactive: std::io::IsTerminal::is_terminal(&std::io::stdin()), @@ -232,30 +416,25 @@ async fn run_auto_connector( tracing::info!("Auto connector: connecting to {}...", spec.addr); let endpoint = - crate::transport::resolve_endpoint(&spec.addr, global.dns_server.as_deref()).await?; + crate::transport::resolve_endpoint(&spec.addr, ctx.global.dns_server.as_deref()).await?; let peer_addr = endpoint.to_string(); - let security = SecurityParams { - psk: global.psk.clone(), - accept_fingerprint: cfg.accept_fingerprint.clone(), - }; - - // route_updates is a Receiver. We need to pass fresh receivers to the loop. + let security = ctx.security(); match spec.protocol { Protocol::Udp => { #[cfg(feature = "quic")] { let client_config = crate::config::build_quic_client_config( - global, + &ctx.global, endpoint, - Some(cfg.name.clone()), + Some(ctx.cfg.name.clone()), &security, Some(local_hs.clone()), ); - let route_updates = route_updates.resubscribe(); // Clone peers for the factory closure; the session closure moves the original. - let peers_for_factory = Arc::clone(&peers); + let peers_for_factory = Arc::clone(&ctx.peers); + let ctx_session = Arc::clone(&ctx); crate::transport::connect_loop( || { let client_config = client_config.clone(); @@ -271,23 +450,15 @@ async fn run_auto_connector( move |connect_result| { // erase() is sync — runs before async move captures anything generic let connect_result = connect_result.erase(); - let metrics = Arc::clone(&metrics); - let peers = Arc::clone(&peers); + let ctx = Arc::clone(&ctx_session); let peer_addr = peer_addr.clone(); let local_hs = local_hs.clone(); - let node_state = node_state.clone(); - let routes = Arc::clone(&routes); - let route_updates = route_updates.resubscribe(); async move { run_auto_connect_session_dispatch( connect_result, &local_hs, &peer_addr, - metrics, - peers, - node_state, - Some(routes), - Some(route_updates), + Arc::clone(&ctx), ) .await } @@ -303,15 +474,15 @@ async fn run_auto_connector( #[cfg(feature = "websocket")] { let client_config = crate::config::build_ws_client_config( - global, + &ctx.global, endpoint, - Some(cfg.name.clone()), + Some(ctx.cfg.name.clone()), &security, Some(local_hs.clone()), ); - let route_updates = route_updates.resubscribe(); // Clone peers for the factory closure; the session closure moves the original. 
- let peers_for_factory = Arc::clone(&peers); + let peers_for_factory = Arc::clone(&ctx.peers); + let ctx_session = Arc::clone(&ctx); crate::transport::connect_loop( || { let client_config = client_config.clone(); @@ -326,23 +497,15 @@ async fn run_auto_connector( move |connect_result| { // erase() is sync — runs before async move captures anything generic let connect_result = connect_result.erase(); - let metrics = Arc::clone(&metrics); - let peers = Arc::clone(&peers); + let ctx = Arc::clone(&ctx_session); let peer_addr = peer_addr.clone(); let local_hs = local_hs.clone(); - let node_state = node_state.clone(); - let routes = Arc::clone(&routes); - let route_updates = route_updates.resubscribe(); async move { run_auto_connect_session_dispatch( connect_result, &local_hs, &peer_addr, - metrics, - peers, - node_state, - Some(routes), - Some(route_updates), + Arc::clone(&ctx), ) .await } @@ -358,19 +521,13 @@ async fn run_auto_connector( } /// Non-generic auto-connector dispatch: negotiates role and runs the session. -// REASON: symmetric entry/exit/relay/indeterminate negotiation arms, each with distinct session logic -#[allow(clippy::too_many_arguments, clippy::too_many_lines)] +// REASON: too_many_lines: symmetric entry/exit/relay/indeterminate negotiation arms, each with distinct session logic +#[allow(clippy::too_many_lines)] async fn run_auto_connect_session_dispatch( connect_result: wallhack_core::client::client::ErasedConnectResult, local_hs: &Handshake, peer_addr: &str, - metrics: Arc, - peers: Arc, - node_state: SharedNodeState, - routes: Option, - route_updates: Option< - tokio::sync::broadcast::Receiver, - >, + ctx: Arc, ) -> Result<(), NodeError> { let wallhack_core::client::client::ErasedConnectResult { peer_handshake_rx, @@ -411,7 +568,7 @@ async fn run_auto_connect_session_dispatch( NegotiationResult::Resolved { role, .. } => *role, NegotiationResult::Indeterminate { .. } => NodeRole::Indeterminate, }; - node_state.update_role(negotiated_role); + ctx.node_state.update_role(negotiated_role); let peer_role = super::peer_role_from_capabilities(peer_hs.capabilities.unwrap_or_default()); tracing::info!( "Role resolved: peer={} addr={peer_addr} local_role={negotiated_role} peer_role={peer_role}", @@ -427,16 +584,14 @@ async fn run_auto_connect_session_dispatch( // applies routes from the table when it creates the TUN, so they // must be in the table before we call it. 
let peer_name = peer_hs.name.as_str(); - let routes = routes.as_ref().map(Arc::clone); + let routes = Some(Arc::clone(&ctx.routes)); let tun_name = if peer_name.is_empty() { None } else { Some(super::entry::peer_name_to_iface(peer_name)) }; - if let Some(ref r) = routes - && !peer_name.is_empty() - { - install_advertised_routes(r, peer_name, &peer_hs.routes); + if !peer_name.is_empty() { + install_advertised_routes(&ctx.routes, peer_name, &peer_hs.routes); } drop(tasks); @@ -446,12 +601,12 @@ async fn run_auto_connect_session_dispatch( instructions_rx, responses_rx, control_tx, - &metrics, + &ctx.metrics, peer_addr, Some(peer_name), - Some(Arc::clone(&peers)), + Some(Arc::clone(&ctx.peers)), routes.clone(), - route_updates, + Some(ctx.route_updates()), ) .await; @@ -501,7 +656,7 @@ async fn run_auto_connect_session_dispatch( } drop(tasks); let heartbeat = - super::spawn_heartbeat(control_tx, peer_name.clone(), Arc::clone(&peers)); + super::spawn_heartbeat(control_tx, peer_name.clone(), Arc::clone(&ctx.peers)); run_auto_exit_session_inner( transport, instructions_rx, @@ -511,8 +666,7 @@ async fn run_auto_connect_session_dispatch( peer_caps, &peer_name, peer_addr, - &metrics, - &peers, + &ctx, ) .await } @@ -537,16 +691,17 @@ async fn run_auto_connect_session_dispatch( peer_hs.name.clone() }; let peer_caps = peer_hs.capabilities.unwrap_or_default(); - peers.register( + ctx.peers.register( name.clone(), peer_addr.to_string(), NodeRole::Indeterminate, peer_caps, wallhack_core::control::peers::ConnectionSide::Connect, ); - let _heartbeat = super::spawn_heartbeat(control_tx, name.clone(), Arc::clone(&peers)); + let _heartbeat = + super::spawn_heartbeat(control_tx, name.clone(), Arc::clone(&ctx.peers)); hold_until_disconnect(tasks).await; - peers.unregister(&name); + ctx.peers.unregister(&name); tracing::info!("Peer disconnected: {name}"); Ok(()) } @@ -566,7 +721,7 @@ async fn hold_until_disconnect(mut tasks: wallhack_core::client::client::Connect } /// Non-generic exit session handler for the auto-connector path. -// REASON: threading transport, instructions, responses, heartbeat, role, caps, peer info, metrics, peers +// REASON: threading transport, instructions, responses, heartbeat, peer role/caps/name/addr, ctx #[allow(clippy::too_many_arguments)] async fn run_auto_exit_session_inner( transport: Arc, @@ -577,10 +732,9 @@ async fn run_auto_exit_session_inner( peer_caps: Capabilities, peer_name: &str, peer_addr: &str, - metrics: &Arc, - peers: &Arc, + ctx: &Arc, ) -> Result<(), NodeError> { - peers.register( + ctx.peers.register( peer_name.to_string(), peer_addr.to_string(), peer_role, @@ -593,7 +747,7 @@ async fn run_auto_exit_session_inner( std::time::Duration::from_mins(1), std::time::Duration::from_mins(5), ); - let orchestrator = Orchestrator::new(Arc::new(adapter), Arc::clone(metrics)); + let orchestrator = Orchestrator::new(Arc::new(adapter), Arc::clone(&ctx.metrics)); let stream_fut = super::exit::run_stream_listener(transport); let drive_fut = orchestrator.drive(responses_tx, instructions_rx); @@ -610,7 +764,7 @@ async fn run_auto_exit_session_inner( } } - peers.unregister(peer_name); + ctx.peers.unregister(peer_name); tracing::info!("Peer disconnected: {peer_name}"); Ok(()) } @@ -620,25 +774,12 @@ async fn run_auto_exit_session_inner( // ============================================================================ /// Auto listener: accept connections, negotiate role, dispatch. 
-// REASON: threading metrics, peers, routes, route_updates, route_updates_tx, node_state through listener -#[allow(clippy::too_many_arguments)] -async fn run_auto_listener( - global: &GlobalConfig, - cfg: &AutoConfig, - spec: &AddressSpec, - tun_capable: bool, - metrics: Arc, - peers: Arc, - routes: SharedRouteTable, - route_updates: tokio::sync::broadcast::Receiver, - route_updates_tx: tokio::sync::broadcast::Sender, - node_state: SharedNodeState, -) -> Result<(), NodeError> { +async fn run_auto_listener(ctx: Arc, spec: &AddressSpec) -> Result<(), NodeError> { let local_hs = build_local_handshake( - cfg, - &global.version, + &ctx.cfg, + &ctx.global.version, Capabilities { - tun_capable, + tun_capable: ctx.tun_capable, listening: true, connecting: false, interactive: std::io::IsTerminal::is_terminal(&std::io::stdin()), @@ -650,18 +791,16 @@ async fn run_auto_listener( handler_config: wallhack_core::control::handler::HandlerConfig::new( NodeRole::Indeterminate, "wallhack".to_string(), - global.version.clone(), + ctx.global.version.clone(), ), - metrics: Some(Arc::clone(&metrics)), - peers: Some(Arc::clone(&peers)), - routes: Some(Arc::clone(&routes)), - route_updates: Some(route_updates_tx), + metrics: Some(Arc::clone(&ctx.metrics)), + peers: Some(Arc::clone(&ctx.peers)), + routes: Some(Arc::clone(&ctx.routes)), + route_updates: Some(ctx.route_updates_tx.clone()), local_handshake: Some(local_hs.clone()), }; let server_config = - crate::config::build_server_config(&global.tls, addr, global.psk.clone(), None); - - // route_updates is a Receiver. + crate::config::build_server_config(&ctx.global.tls, addr, ctx.global.psk.clone(), None); match spec.protocol { Protocol::Udp => { @@ -671,20 +810,9 @@ async fn run_auto_listener( wallhack_core::server::quic::QuicServer::try_new(server_config, server_options) .map_err(|e| NodeError::Transport(Box::new(e)))?; let bound = server.local_addr()?; - node_state.set_listen_addr(bound); - let route_updates = route_updates.resubscribe(); - let routes = Arc::clone(&routes); - run_auto_accept_loop( - server, - local_hs, - global.psk.clone(), - metrics, - peers, - routes, - route_updates, - node_state, - ) - .await + ctx.node_state.set_listen_addr(bound); + run_auto_accept_loop(server, local_hs, ctx.global.psk.clone(), Arc::clone(&ctx)) + .await } #[cfg(not(feature = "quic"))] Err(NodeError::TransportUnavailable("quic")) @@ -697,20 +825,9 @@ async fn run_auto_listener( server_options, )?; let bound = server.local_addr()?; - node_state.set_listen_addr(bound); - let route_updates = route_updates.resubscribe(); - let routes = Arc::clone(&routes); - run_auto_accept_loop( - server, - local_hs, - global.psk.clone(), - metrics, - peers, - routes, - route_updates, - node_state, - ) - .await + ctx.node_state.set_listen_addr(bound); + run_auto_accept_loop(server, local_hs, ctx.global.psk.clone(), Arc::clone(&ctx)) + .await } #[cfg(not(feature = "websocket"))] Err(NodeError::TransportUnavailable("websocket")) @@ -719,17 +836,11 @@ async fn run_auto_listener( } /// Accept loop for auto-negotiation listener. 
-// REASON: threading local_hs, psk, metrics, peers, routes, route_updates, node_state through generic accept loop -#[allow(clippy::too_many_arguments)] async fn run_auto_accept_loop( mut server: S, local_hs: Handshake, server_psk: Option>, - metrics: Arc, - peers: Arc, - routes: SharedRouteTable, - route_updates: tokio::sync::broadcast::Receiver, - node_state: SharedNodeState, + ctx: Arc, ) -> Result<(), NodeError> where S::Error: std::error::Error + Send + Sync + 'static, @@ -786,11 +897,7 @@ where ) = accept_result.into_channels(); let local_hs = local_hs.clone(); - let metrics = Arc::clone(&metrics); - let peers = Arc::clone(&peers); - let routes = Arc::clone(&routes); - let route_updates = route_updates.resubscribe(); - let node_state = node_state.clone(); + let ctx = Arc::clone(&ctx); tokio::spawn(async move { if let Err(e) = run_auto_accept_session_inner( @@ -802,12 +909,8 @@ where control_tx, peer_hs, local_hs, - metrics, - peers, - Some(routes), - Some(route_updates), + ctx, peer_addr, - node_state, ) .await { @@ -828,11 +931,634 @@ where Ok(()) } +// ============================================================================ +// Connect+Listen relay-promotable path +// ============================================================================ + +/// Shared relay bridge state published by the connector and consumed by +/// the listener's accept loop. +/// +/// Created once per connector session (when the connector negotiates as relay) +/// and shared with the listener task. When the connector disconnects, the +/// watch channel is reset to `None`. +struct RelayBridge { + /// Sender for forwarding exit-peer responses to the source (entry) peer. + source_resp_tx: tokio::sync::mpsc::Sender, + /// Registration channel for the instruction fan-out task. + fanout_register_tx: tokio::sync::mpsc::UnboundedSender< + tokio::sync::mpsc::Sender, + >, + /// Watch channel for publishing the latest accepted peer transport (for bidi bridging). + peer_transport_tx: + tokio::sync::watch::Sender>>, + /// The connector's transport (for bidi bridging from source to peer). + source_transport: Arc, + /// Shutdown signal — dropped when the connector session ends. + shutdown_rx: tokio::sync::watch::Receiver<()>, +} + +/// Run the connect+listen relay-promotable auto mode. +/// +/// Starts both a connector (initially as exit) and a listener simultaneously. +/// When the listener is confirmed running, capabilities are updated to +/// include `listening: true`. On the next connector reconnect, the updated +/// capabilities cause the connector to negotiate as relay, enabling bridging. +/// +/// # Errors +/// +/// Returns error if a non-retryable connection error occurs. +async fn run_connect_listen_relay_promotable( + ctx: Arc, + connect_spec: &AddressSpec, + listen_spec: &AddressSpec, +) -> Result<(), NodeError> { + // Watch channel: None = no active connector session, Some = connector is + // running as relay and the bridge is ready for the listener to use. + let (bridge_tx, bridge_rx) = tokio::sync::watch::channel::>>(None); + + let addr: std::net::SocketAddr = listen_spec.addr.parse::()?.into(); + + // Build server options for the relay listener side. + // Accept-side handshake declares Fixed(Entry) so accepted peers resolve to + // Exit via complement (matching relay.rs semantics). 
+ let server_options = { + use wallhack_core::control::handler::HandlerConfig; + ServerOptions { + handler_config: HandlerConfig::new( + NodeRole::Relay, + "wallhack".to_string(), + ctx.global.version.clone(), + ), + metrics: None, + peers: None, + routes: None, + route_updates: None, + local_handshake: Some(wallhack_wire::data::Handshake { + capabilities: Some(wallhack_wire::data::Capabilities { + tun_capable: false, + listening: true, + connecting: true, + interactive: std::io::IsTerminal::is_terminal(&std::io::stdin()), + }), + name: ctx.cfg.name.clone(), + version: ctx.global.version.clone(), + psk_proof: Vec::new(), + routes: Vec::new(), + hint: Some(wallhack_wire::data::RoleHint { + level: wallhack_wire::data::HintLevel::Fixed.into(), + target: wallhack_wire::data::NodeRole::RoleEntry.into(), + }), + }), + } + }; + + let server_config = + crate::config::build_server_config(&ctx.global.tls, addr, ctx.global.psk.clone(), None); + + let security = ctx.security(); + + let connect_endpoint = + crate::transport::resolve_endpoint(&connect_spec.addr, ctx.global.dns_server.as_deref()) + .await?; + + // Shared flag: set to `true` by the listener task once it has successfully + // bound to its port. The connector reads this to determine whether to + // advertise `listening: true` in its next handshake. + let listener_ready = Arc::new(std::sync::atomic::AtomicBool::new(false)); + + // Spawn the listener task. It runs independently and bridges accepted + // peers to the connector session when a bridge is available. + let listener_task: tokio::task::JoinHandle> = { + let ctx = Arc::clone(&ctx); + let bridge_rx = bridge_rx.clone(); + let server_options = server_options.clone(); + let listen_spec_owned = listen_spec.clone(); + let listener_ready_flag = Arc::clone(&listener_ready); + + tokio::spawn(async move { + run_relay_promotable_listener( + listen_spec_owned, + server_config, + server_options, + ctx, + bridge_rx, + listener_ready_flag, + ) + .await + }) + }; + + // Connector loop: connect, negotiate, run as exit or relay. + // Capabilities start with `listening: false`; once the listener is up + // (signaled by `listener_ready`) we advertise `listening: true`. + let connector_result = run_relay_promotable_connector( + Arc::clone(&ctx), + connect_spec, + connect_endpoint, + security, + bridge_tx, + listener_ready, + ) + .await; + + // If the connector exits, abort the listener. + listener_task.abort(); + connector_result +} + +/// The connector half of the relay-promotable auto mode. +/// +/// Connects with `listening: false` initially. After the listener is running +/// (first relay-capable connect), advertises `listening: true` so the peer +/// negotiates us as relay on subsequent reconnects. +// REASON: too_many_lines: symmetric quic/ws dispatch arms, each with factory+session closures and TOCTOU-safe hs sharing +#[allow(clippy::too_many_lines)] +async fn run_relay_promotable_connector( + ctx: Arc, + connect_spec: &AddressSpec, + endpoint: std::net::SocketAddr, + security: SecurityParams, + bridge_tx: tokio::sync::watch::Sender>>, + listener_ready: Arc, +) -> Result<(), NodeError> { + let bridge_tx = Arc::new(bridge_tx); + + match connect_spec.protocol { + Protocol::Udp => { + #[cfg(feature = "quic")] + { + // Build a fresh client config before each attempt. Re-read + // listener_ready each attempt so that once the listener is up + // we advertise `listening: true` and negotiate as relay. 
+ // + // The factory and session closures share `last_local_hs` so + // that the handshake used for TLS (built in factory) is + // identical to the one used for negotiation (read in session), + // eliminating the TOCTOU window between the two reads. + let last_local_hs: Arc>> = + Arc::new(std::sync::Mutex::new(None)); + let last_local_hs_factory = Arc::clone(&last_local_hs); + let last_local_hs_session = Arc::clone(&last_local_hs); + let listener_ready_factory = Arc::clone(&listener_ready); + let bridge_tx_arc = Arc::clone(&bridge_tx); + let peers_factory = Arc::clone(&ctx.peers); + let ctx_session = Arc::clone(&ctx); + + crate::transport::connect_loop( + || { + let is_listening = + listener_ready_factory.load(std::sync::atomic::Ordering::Acquire); + let current_caps = Capabilities { + tun_capable: ctx.tun_capable, + listening: is_listening, + connecting: true, + interactive: std::io::IsTerminal::is_terminal(&std::io::stdin()), + }; + let local_hs = + build_local_handshake(&ctx.cfg, &ctx.global.version, current_caps); + // Store for session closure — same value, no second read. + *last_local_hs_factory + .lock() + .expect("last_local_hs poisoned") = Some(local_hs.clone()); + let client_config = crate::config::build_quic_client_config( + &ctx.global, + endpoint, + Some(ctx.cfg.name.clone()), + &security, + Some(local_hs), + ); + let peers = Arc::clone(&peers_factory); + async move { + use wallhack_core::client::client::Client; + let mut client = + wallhack_core::client::quic::QuicClient::try_new(client_config)?; + client.peer_registry = Some(peers); + client.connect(NodeRole::Indeterminate).await + } + }, + move |connect_result| { + let connect_result = connect_result.erase(); + let ctx = Arc::clone(&ctx_session); + let bridge_tx = Arc::clone(&bridge_tx_arc); + // Use the handshake the factory already built — no second + // read of listener_ready, so there is no TOCTOU window. + let local_hs = last_local_hs_session + .lock() + .expect("last_local_hs poisoned") + .take() + .expect("factory always stores local_hs before session runs"); + async move { + run_relay_promotable_connector_session( + connect_result, + &local_hs, + ctx, + bridge_tx, + ) + .await + } + }, + RECONNECT_DELAY, + ) + .await + } + #[cfg(not(feature = "quic"))] + { + let _ = (bridge_tx, ctx); + Err(NodeError::TransportUnavailable("quic")) + } + } + Protocol::Tcp => { + #[cfg(feature = "websocket")] + { + // See QUIC arm above for the `last_local_hs` pattern rationale. + let last_local_hs: Arc>> = + Arc::new(std::sync::Mutex::new(None)); + let last_local_hs_factory = Arc::clone(&last_local_hs); + let last_local_hs_session = Arc::clone(&last_local_hs); + let listener_ready_factory = Arc::clone(&listener_ready); + let bridge_tx_arc = Arc::clone(&bridge_tx); + let peers_factory = Arc::clone(&ctx.peers); + let ctx_session = Arc::clone(&ctx); + + crate::transport::connect_loop( + || { + let is_listening = + listener_ready_factory.load(std::sync::atomic::Ordering::Acquire); + let current_caps = Capabilities { + tun_capable: ctx.tun_capable, + listening: is_listening, + connecting: true, + interactive: std::io::IsTerminal::is_terminal(&std::io::stdin()), + }; + let local_hs = + build_local_handshake(&ctx.cfg, &ctx.global.version, current_caps); + // Store for session closure — same value, no second read. 
+ *last_local_hs_factory + .lock() + .expect("last_local_hs poisoned") = Some(local_hs.clone()); + let client_config = crate::config::build_ws_client_config( + &ctx.global, + endpoint, + Some(ctx.cfg.name.clone()), + &security, + Some(local_hs), + ); + let peers = Arc::clone(&peers_factory); + async move { + let mut client = + wallhack_core::client::ws::WsClient::new(client_config)?; + client.peer_registry = Some(peers); + client.connect(NodeRole::Indeterminate).await + } + }, + move |connect_result| { + let connect_result = connect_result.erase(); + let ctx = Arc::clone(&ctx_session); + let bridge_tx = Arc::clone(&bridge_tx_arc); + // Use the handshake the factory already built — no second + // read of listener_ready, so there is no TOCTOU window. + let local_hs = last_local_hs_session + .lock() + .expect("last_local_hs poisoned") + .take() + .expect("factory always stores local_hs before session runs"); + async move { + run_relay_promotable_connector_session( + connect_result, + &local_hs, + ctx, + bridge_tx, + ) + .await + } + }, + RECONNECT_DELAY, + ) + .await + } + #[cfg(not(feature = "websocket"))] + { + let _ = (bridge_tx, ctx); + Err(NodeError::TransportUnavailable("websocket")) + } + } + } +} + +/// A single connector session for the relay-promotable path. +/// +/// Negotiates the session role. If exit → runs exit session. If relay → +/// creates the relay bridge, publishes it to the bridge watch channel, and +/// runs the relay session. On exit (disconnect), publishes `None` to the +/// watch channel. +// REASON: too_many_lines: exit and relay arms each have distinct session setup +#[allow(clippy::too_many_lines)] +async fn run_relay_promotable_connector_session( + connect_result: wallhack_core::client::client::ErasedConnectResult, + local_hs: &Handshake, + ctx: Arc, + bridge_tx: Arc>>>, +) -> Result<(), NodeError> { + use wallhack_core::transport::protocol::run_send_responses; + + let wallhack_core::client::client::ErasedConnectResult { + peer_handshake_rx, + transport, + channels, + tasks, + control_tx, + peer_addr, + } = connect_result; + + let DataChannels { + instructions_tx: _instructions_tx, + instructions_rx, + responses_tx, + responses_rx, + } = channels; + + let Some(rx) = peer_handshake_rx else { + tracing::warn!("No peer handshake receiver in relay-promotable ConnectResult"); + return Ok(()); + }; + let peer_hs = match tokio::time::timeout(Duration::from_secs(30), rx).await { + Ok(Ok(hs)) => hs, + Ok(Err(_)) => { + tracing::warn!("Peer handshake channel closed before delivery"); + return Ok(()); + } + Err(_) => { + tracing::warn!("Timed out waiting for peer handshake"); + return Ok(()); + } + }; + + let result = negotiate(local_hs, &peer_hs); + let negotiated_role = match &result { + NegotiationResult::Resolved { role, .. } => *role, + NegotiationResult::Indeterminate { .. } => NodeRole::Indeterminate, + }; + ctx.node_state.update_role(negotiated_role); + + let peer_name = if peer_hs.name.is_empty() { + "unknown".to_string() + } else { + peer_hs.name.clone() + }; + let peer_caps = peer_hs.capabilities.unwrap_or_default(); + let peer_role = super::peer_role_from_capabilities(peer_caps); + + tracing::info!( + "Role resolved: peer={peer_name} local_role={negotiated_role} peer_role={peer_role}", + ); + + match negotiated_role { + NodeRole::Exit => { + // Still operating as exit — listener not yet running or capabilities + // not yet updated. Run as a normal exit session. 
+ tracing::debug!( + "Relay-promotable connector: running as exit (listening not yet advertised)" + ); + + { + let transport = Arc::clone(&transport); + tokio::spawn(async move { + match transport.open_uni_erased().await { + Ok(mut send) => { + if let Err(e) = run_send_responses(&mut send, responses_rx).await { + tracing::debug!("Relay-promotable exit send-responses: {e}"); + } + } + Err(e) => { + tracing::debug!( + "Relay-promotable exit failed to open send stream: {e}" + ); + } + } + }); + } + + drop(tasks); + let heartbeat = + super::spawn_heartbeat(control_tx, peer_name.clone(), Arc::clone(&ctx.peers)); + + run_auto_exit_session_inner( + transport, + instructions_rx, + responses_tx, + heartbeat, + peer_role, + peer_caps, + &peer_name, + &peer_addr, + &ctx, + ) + .await + } + NodeRole::Relay => { + // Now operating as relay. Create the bridge and publish it so the + // listener accept loop can start bridging peers. + tracing::info!("Relay-promotable connector: promoting to relay"); + + ctx.peers.register( + peer_name.clone(), + peer_addr.clone(), + peer_role, + peer_caps, + ConnectionSide::Connect, + ); + + let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(()); + + // Outgoing: open uni stream to source, send exit-peer responses. + { + let transport = Arc::clone(&transport); + tokio::spawn(async move { + match transport.open_uni_erased().await { + Ok(mut send) => { + if let Err(e) = run_send_responses(&mut send, responses_rx).await { + tracing::debug!("Relay-promotable relay send-responses: {e}"); + } + } + Err(e) => tracing::debug!( + "Relay-promotable relay failed to open send stream: {e}" + ), + } + }); + } + + // Fan-out task for instructions from source to exit peers. + let fanout_register_tx = crate::transport::spawn_fanout_task(instructions_rx); + + // Watch channel for the latest peer transport (bidi bridging). + let (peer_transport_tx, peer_transport_rx) = tokio::sync::watch::channel::< + Option>, + >(None); + + // Source→peer bidi bridge. + super::relay::spawn_source_to_peer_bidi_bridge( + Arc::clone(&transport), + peer_transport_rx, + shutdown_rx.clone(), + ); + + // Forward accepted peer events to the source peer as PeerAnnouncements. + super::relay::spawn_peer_announcement_forwarder( + &ctx.peers, + control_tx.clone(), + peer_name.clone(), + ); + + let bridge = Arc::new(RelayBridge { + source_resp_tx: responses_tx, + fanout_register_tx, + peer_transport_tx, + source_transport: Arc::clone(&transport), + shutdown_rx: shutdown_rx.clone(), + }); + // Publish bridge — listener can start using it. + let _ = bridge_tx.send(Some(Arc::clone(&bridge))); + + let _heartbeat = + super::spawn_heartbeat(control_tx, peer_name.clone(), Arc::clone(&ctx.peers)); + + let mut task_set = tasks; + task_set.wait_for_disconnect().await; + tracing::warn!("Lost connection to {peer_name}"); + + // Clear bridge so listener knows the connector is gone. + let _ = bridge_tx.send(None); + drop(shutdown_tx); + ctx.peers.unregister(&peer_name); + + Ok(()) + } + _ => { + tracing::warn!( + "Relay-promotable connector: unexpected role {negotiated_role}; holding" + ); + let _keep_alive = control_tx; + hold_until_disconnect(tasks).await; + Ok(()) + } + } +} + +/// Listener half of the relay-promotable auto mode. +/// +/// Runs independently, accepting peers and bridging them through the current +/// relay bridge (if one is available). If no bridge is available when a peer +/// connects, the peer is rejected with a short hold. 
+// REASON: too_many_lines: quic and websocket branches each with server setup, bind, and accept loop +#[allow(clippy::too_many_lines)] +async fn run_relay_promotable_listener( + listen_spec: AddressSpec, + server_config: wallhack_core::server::config::ServerConfig, + server_options: ServerOptions, + ctx: Arc, + bridge_rx: tokio::sync::watch::Receiver>>, + listener_ready: Arc, +) -> Result<(), NodeError> { + match listen_spec.protocol { + Protocol::Udp => { + #[cfg(feature = "quic")] + { + let server = + wallhack_core::server::quic::QuicServer::try_new(server_config, server_options) + .map_err(|e| NodeError::Transport(Box::new(e)))?; + let bound = server.local_addr()?; + ctx.node_state.set_listen_addr(bound); + // Signal the connector that the listener is up. The connector + // will advertise `listening: true` on its next reconnect and + // negotiate as relay. + listener_ready.store(true, std::sync::atomic::Ordering::Release); + tracing::info!("Relay-promotable listener: listening on {bound} (QUIC)"); + run_relay_promotable_accept_loop(server, Arc::clone(&ctx.peers), bridge_rx).await + } + #[cfg(not(feature = "quic"))] + { + let _ = (ctx, bridge_rx, listener_ready); + Err(NodeError::TransportUnavailable("quic")) + } + } + Protocol::Tcp => { + #[cfg(feature = "websocket")] + { + let server = wallhack_core::server::ws::WebSocketServer::try_new( + server_config, + server_options, + )?; + let bound = server.local_addr()?; + ctx.node_state.set_listen_addr(bound); + listener_ready.store(true, std::sync::atomic::Ordering::Release); + tracing::info!("Relay-promotable listener: listening on {bound} (WebSocket)"); + run_relay_promotable_accept_loop(server, Arc::clone(&ctx.peers), bridge_rx).await + } + #[cfg(not(feature = "websocket"))] + { + let _ = (ctx, bridge_rx, listener_ready); + Err(NodeError::TransportUnavailable("websocket")) + } + } + } +} + +/// Accept loop for the relay-promotable listener. +/// +/// For each accepted peer: if a relay bridge is available, bridge the peer +/// through the connector session. Otherwise drop the peer with a log message. +async fn run_relay_promotable_accept_loop( + mut server: S, + peers: Arc, + bridge_rx: tokio::sync::watch::Receiver>>, +) -> Result<(), NodeError> +where + S::Error: std::error::Error + Send + Sync + 'static, + S::Transport: Send + Sync + 'static, + ::SendStream: 'static, + ::RecvStream: 'static, + ::BiStream: Send + 'static, +{ + loop { + match server.accept(NodeRole::Relay).await { + Ok(Some(accept_result)) => { + let erased = accept_result.erase(); + let bridge = bridge_rx.borrow().clone(); + if let Some(bridge) = bridge { + super::relay::handle_relay_connection( + erased, + bridge.source_resp_tx.clone(), + &bridge.fanout_register_tx, + &bridge.peer_transport_tx, + &bridge.source_transport, + &peers, + &bridge.shutdown_rx, + ); + } else { + tracing::warn!( + "Relay-promotable listener: no connector session active, dropping peer {}", + erased.peer_addr + ); + } + } + Ok(None) => { + tracing::info!("Relay-promotable listener: server closed"); + break; + } + Err(e) => { + tracing::warn!("Relay-promotable listener: accept error: {e}"); + } + } + } + + Ok(()) +} + /// Non-generic inner implementation for accepted auto-listener sessions. /// /// All generic extraction (transport, channels, handshake) happens in the /// caller before spawning, so this function is monomorphized only once. 
-// REASON: symmetric entry/exit/relay/indeterminate negotiation arms, each with distinct session setup +// REASON: threading transport, channels, control, peer_hs, local_hs, peer_addr through negotiation arms #[allow(clippy::too_many_arguments, clippy::too_many_lines)] async fn run_auto_accept_session_inner( transport: Arc, @@ -843,14 +1569,8 @@ async fn run_auto_accept_session_inner( control_tx: tokio::sync::mpsc::Sender, peer_hs: Option, local_hs: Handshake, - metrics: Arc, - peers: Arc, - routes: Option, - route_updates: Option< - tokio::sync::broadcast::Receiver, - >, + ctx: Arc, peer_addr: String, - node_state: SharedNodeState, ) -> Result<(), NodeError> { let Some(peer_hs) = peer_hs else { tracing::warn!("No peer handshake from {peer_addr}; cannot negotiate"); @@ -863,7 +1583,7 @@ async fn run_auto_accept_session_inner( NegotiationResult::Resolved { role, .. } => *role, NegotiationResult::Indeterminate { .. } => NodeRole::Indeterminate, }; - node_state.update_role(negotiated_role); + ctx.node_state.update_role(negotiated_role); let peer_role = super::peer_role_from_capabilities(peer_hs.capabilities.unwrap_or_default()); tracing::info!( "Role resolved: peer={} addr={peer_addr} local_role={negotiated_role} peer_role={peer_role}", @@ -897,28 +1617,24 @@ async fn run_auto_accept_session_inner( // Install routes advertised by the exit peer before applying them // to the TUN so the apply block below picks them up in one pass. - if let Some(ref r) = routes - && !peer_hs.name.is_empty() - { - install_advertised_routes(r, &peer_hs.name, &peer_hs.routes); + if !peer_hs.name.is_empty() { + install_advertised_routes(&ctx.routes, &peer_hs.name, &peer_hs.routes); } // Apply all routes (user-configured and newly-advertised) to the TUN. - // REASON: outer guard is an option, inner guard is a separate semantic check on peer identity + // REASON: outer guard is a non-empty name check; inner guard is a separate route match #[allow(clippy::collapsible_if)] - if let Some(r) = &routes { - if !peer_hs.name.is_empty() { - for entry in r.list() { - if entry.peer == peer_hs.name { - let _ = - crate::netlink::add_os_route(&entry.cidr.to_string(), &tun_name); - } + if !peer_hs.name.is_empty() { + for entry in ctx.routes.list() { + if entry.peer == peer_hs.name { + let _ = crate::netlink::add_os_route(&entry.cidr.to_string(), &tun_name); } } } // Spawn route update listener - if let Some(mut updates) = route_updates { + { + let mut updates = ctx.route_updates(); let tun_name = tun_name.clone(); let peer = if peer_hs.name.is_empty() { None @@ -933,7 +1649,7 @@ async fn run_auto_accept_session_inner( ); loop { match updates.recv().await { - Ok(wallhack_core::control::routes::RouteUpdate::Add(entry)) => { + Ok(RouteUpdate::Add(entry)) => { // REASON: peer match is a route filter; OS call error is a separate concern #[allow(clippy::collapsible_if)] if Some(entry.peer.as_str()) == peer.as_deref() { @@ -945,7 +1661,7 @@ async fn run_auto_accept_session_inner( } } } - Ok(wallhack_core::control::routes::RouteUpdate::Remove(entry)) => { + Ok(RouteUpdate::Remove(entry)) => { // REASON: peer match is a route filter; OS call error is a separate concern #[allow(clippy::collapsible_if)] if Some(entry.peer.as_str()) == peer.as_deref() { @@ -975,7 +1691,7 @@ async fn run_auto_accept_session_inner( let (manager, _) = ConnectionManager::new( actor, Arc::clone(&transport), - Arc::clone(&metrics), + Arc::clone(&ctx.metrics), instructions_tx, responses_rx, ); @@ -992,7 +1708,7 @@ async fn run_auto_accept_session_inner( } else { 
NodeRole::Exit }; - peers.register( + ctx.peers.register( peer_name.clone(), peer_addr.clone(), peer_role, @@ -1001,7 +1717,7 @@ async fn run_auto_accept_session_inner( ); let _heartbeat = - super::spawn_heartbeat(control_tx, peer_name.clone(), Arc::clone(&peers)); + super::spawn_heartbeat(control_tx, peer_name.clone(), Arc::clone(&ctx.peers)); let handle = tokio::spawn(async move { manager.run().await }); match handle.await { @@ -1009,23 +1725,21 @@ async fn run_auto_accept_session_inner( Ok(Err(e)) => tracing::warn!("Auto entry session error {peer_name}: {e}"), Err(e) => tracing::warn!("Auto entry session task failed {peer_name}: {e}"), } - peers.unregister(&peer_name); + ctx.peers.unregister(&peer_name); // Best-effort TUN cleanup after disconnect. crate::netlink::delete_tun(&tun_name); // Remove auto-managed routes and their OS entries now that the // session has ended. - if let Some(ref r) = routes { - let removed = r.remove_auto_by_peer(&peer_name); - if !removed.is_empty() { - tracing::info!( - "Removing {} auto route(s) for disconnected exit {peer_name}", - removed.len() - ); - for entry in &removed { - let _ = crate::netlink::remove_os_route(&entry.cidr.to_string(), &tun_name); - } + let removed = ctx.routes.remove_auto_by_peer(&peer_name); + if !removed.is_empty() { + tracing::info!( + "Removing {} auto route(s) for disconnected exit {peer_name}", + removed.len() + ); + for entry in &removed { + let _ = crate::netlink::remove_os_route(&entry.cidr.to_string(), &tun_name); } } } @@ -1076,7 +1790,7 @@ async fn run_auto_accept_session_inner( }; let peer_caps = peer_hs.capabilities.unwrap_or_default(); let peer_role = super::peer_role_from_capabilities(peer_caps); - peers.register( + ctx.peers.register( peer_name.clone(), peer_addr.clone(), peer_role, @@ -1085,14 +1799,14 @@ async fn run_auto_accept_session_inner( ); let _heartbeat = - super::spawn_heartbeat(control_tx, peer_name.clone(), Arc::clone(&peers)); + super::spawn_heartbeat(control_tx, peer_name.clone(), Arc::clone(&ctx.peers)); let adapter = SyscallExitAdapter::new(); let _reaper = adapter.start_reaper( std::time::Duration::from_mins(1), std::time::Duration::from_mins(5), ); - let orchestrator = Orchestrator::new(Arc::new(adapter), Arc::clone(&metrics)); + let orchestrator = Orchestrator::new(Arc::new(adapter), Arc::clone(&ctx.metrics)); let stream_fut = super::exit::run_stream_listener(Arc::clone(&transport)); let drive_fut = orchestrator.drive(responses_tx, instructions_rx); @@ -1108,7 +1822,7 @@ async fn run_auto_accept_session_inner( } } - peers.unregister(&peer_name); + ctx.peers.unregister(&peer_name); tracing::info!("Peer disconnected: {peer_name}"); } NegotiationResult::Resolved { @@ -1130,20 +1844,21 @@ async fn run_auto_accept_session_inner( peer_hs.name.clone() }; let peer_caps = peer_hs.capabilities.unwrap_or_default(); - peers.register( + ctx.peers.register( name.clone(), peer_addr.clone(), NodeRole::Indeterminate, peer_caps, wallhack_core::control::peers::ConnectionSide::Accept, ); - let _heartbeat = super::spawn_heartbeat(control_tx, name.clone(), Arc::clone(&peers)); + let _heartbeat = + super::spawn_heartbeat(control_tx, name.clone(), Arc::clone(&ctx.peers)); // Hold transport alive; wait for the peer to disconnect // by draining the instructions channel (closes when transport dies). 
let _keep_transport = transport; let mut rx = instructions_rx; while rx.recv().await.is_some() {} - peers.unregister(&name); + ctx.peers.unregister(&name); tracing::info!("Peer disconnected: {name}"); } } diff --git a/crates/daemon/src/mode/mod.rs b/crates/daemon/src/mode/mod.rs index 1cca80f9..8d77c4ef 100644 --- a/crates/daemon/src/mode/mod.rs +++ b/crates/daemon/src/mode/mod.rs @@ -30,6 +30,9 @@ pub(crate) struct NodeResources { pub route_updates_tx: tokio::sync::broadcast::Sender, pub node_state: SharedNodeState, + /// Receiver for commands (role, connect, listen, disconnect) from the + /// control API. Only auto mode consumes this. + pub command_sink: tokio::sync::mpsc::Receiver, } /// Derive a peer's role from its advertised capabilities. @@ -159,8 +162,8 @@ pub(crate) async fn run(config: &DaemonConfig, resources: NodeResources) -> Resu resources.metrics, resources.peers, resources.routes, - resources.route_updates, resources.route_updates_tx, + resources.command_sink, resources.node_state, ) .await diff --git a/crates/daemon/src/mode/relay.rs b/crates/daemon/src/mode/relay.rs index 34a839fa..073aea42 100644 --- a/crates/daemon/src/mode/relay.rs +++ b/crates/daemon/src/mode/relay.rs @@ -341,102 +341,17 @@ async fn run_relay_loop_inner( Option>, >(None); - // Source→peer bidi bridge: single accept loop on the source transport. - // When a bidi stream arrives from the source, opens a matching bidi to - // the current peer and splices them together. - { - let transport = Arc::clone(&transport); - let mut shutdown_bidi = shutdown_rx.clone(); - tokio::spawn(async move { - loop { - tokio::select! { - result = transport.accept_bi_erased() => { - match result { - Ok(Some(source_stream)) => { - let current_peer = peer_transport_rx.borrow().clone(); - let Some(peer) = current_peer else { - tracing::debug!("bidi bridge: no peer connected, dropping stream"); - continue; - }; - tokio::spawn(async move { - match peer.open_bi_erased().await { - Ok(peer_stream) => { - if let Err(e) = wallhack_core::transport::splice_bi( - source_stream, - peer_stream, - ).await { - tracing::debug!("bidi bridge (source→peer) ended: {e}"); - } - } - Err(e) => tracing::debug!("bidi bridge: failed to open peer stream: {e}"), - } - }); - } - Ok(None) => break, - Err(e) => { - tracing::debug!("bidi bridge: source accept_bi error: {e}"); - } - } - } - _ = shutdown_bidi.changed() => break, - } - } - }); - } + // Source→peer bidi bridge: accept bidi streams from the source transport + // and splice each one to the current peer transport. + spawn_source_to_peer_bidi_bridge( + Arc::clone(&transport), + peer_transport_rx, + shutdown_rx.clone(), + ); // Forward accepted peer events to the source peer as PeerAnnouncements. // The source (entry) registers these peers for topology visibility. 
- { - let mut peer_events = peers.subscribe(); - let peer_name = peer_name.clone(); - let source_control_tx = source_control_tx.clone(); - tokio::spawn(async move { - use wallhack_core::control::peers::PeerEvent; - use wallhack_wire::control::{ - ControlMessage, PeerAnnouncement, control_message, peer_announcement, - }; - - loop { - match peer_events.recv().await { - Ok(PeerEvent::Connected { name, addr, role }) if name != peer_name => { - let announcement = PeerAnnouncement { - event: peer_announcement::Event::Connected.into(), - name, - addr, - role: wallhack_wire::data::NodeRole::from(role).into(), - routes: Vec::new(), - }; - let msg = ControlMessage { - message: Some(control_message::Message::PeerAnnouncement(announcement)), - }; - if source_control_tx.send(msg).await.is_err() { - break; - } - } - Ok(PeerEvent::Disconnected { name }) if name != peer_name => { - let announcement = PeerAnnouncement { - event: peer_announcement::Event::Disconnected.into(), - name, - addr: String::new(), - role: 0, - routes: Vec::new(), - }; - let msg = ControlMessage { - message: Some(control_message::Message::PeerAnnouncement(announcement)), - }; - if source_control_tx.send(msg).await.is_err() { - break; - } - } - Ok(_) => {} // skip source peer events - Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { - tracing::debug!("Peer announcement forwarder lagged {n} events"); - } - Err(tokio::sync::broadcast::error::RecvError::Closed) => break, - } - } - }); - } + spawn_peer_announcement_forwarder(&peers, source_control_tx.clone(), peer_name.clone()); let listener_fut = run_listener( global, @@ -659,9 +574,116 @@ where Ok(()) } +/// Spawn the source→peer bidi bridge task. +/// +/// Accepts bidi streams from `source` and splices each one to the current +/// peer transport (looked up from `peer_transport_rx`). Exits when the source +/// closes or `shutdown_rx` fires. +pub(crate) fn spawn_source_to_peer_bidi_bridge( + source: Arc, + peer_transport_rx: tokio::sync::watch::Receiver< + Option>, + >, + mut shutdown_rx: tokio::sync::watch::Receiver<()>, +) -> tokio::task::JoinHandle<()> { + tokio::spawn(async move { + loop { + tokio::select! { + result = source.accept_bi_erased() => { + match result { + Ok(Some(source_stream)) => { + let current_peer = peer_transport_rx.borrow().clone(); + let Some(peer) = current_peer else { + tracing::debug!("bidi bridge: no peer connected, dropping stream"); + continue; + }; + tokio::spawn(async move { + match peer.open_bi_erased().await { + Ok(peer_stream) => { + if let Err(e) = wallhack_core::transport::splice_bi( + source_stream, + peer_stream, + ).await { + tracing::debug!("bidi bridge (source→peer) ended: {e}"); + } + } + Err(e) => tracing::debug!("bidi bridge: failed to open peer stream: {e}"), + } + }); + } + Ok(None) => break, + Err(e) => { + tracing::debug!("bidi bridge: source accept_bi error: {e}"); + } + } + } + _ = shutdown_rx.changed() => break, + } + } + }) +} + +/// Spawn the peer announcement forwarder task. +/// +/// Subscribes to peer events and forwards `PeerAnnouncement` control messages +/// to `source_control_tx`, skipping events for `exclude_name`. 
+pub(crate) fn spawn_peer_announcement_forwarder( + peers: &Arc, + source_control_tx: tokio::sync::mpsc::Sender, + exclude_name: String, +) -> tokio::task::JoinHandle<()> { + let mut peer_events = peers.subscribe(); + tokio::spawn(async move { + use wallhack_core::control::peers::PeerEvent; + use wallhack_wire::control::{ + ControlMessage, PeerAnnouncement, control_message, peer_announcement, + }; + + loop { + match peer_events.recv().await { + Ok(PeerEvent::Connected { name, addr, role }) if name != exclude_name => { + let announcement = PeerAnnouncement { + event: peer_announcement::Event::Connected.into(), + name, + addr, + role: wallhack_wire::data::NodeRole::from(role).into(), + routes: Vec::new(), + }; + let msg = ControlMessage { + message: Some(control_message::Message::PeerAnnouncement(announcement)), + }; + if source_control_tx.send(msg).await.is_err() { + break; + } + } + Ok(PeerEvent::Disconnected { name }) if name != exclude_name => { + let announcement = PeerAnnouncement { + event: peer_announcement::Event::Disconnected.into(), + name, + addr: String::new(), + role: 0, + routes: Vec::new(), + }; + let msg = ControlMessage { + message: Some(control_message::Message::PeerAnnouncement(announcement)), + }; + if source_control_tx.send(msg).await.is_err() { + break; + } + } + Ok(_) => {} // skip excluded peer events + Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { + tracing::debug!("Peer announcement forwarder lagged {n} events"); + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + } + } + }) +} + /// Non-generic handler for erased relay connection results. #[allow(clippy::too_many_lines)] // REASON: symmetric uni/bidi stream setup and heartbeat per accepted peer -fn handle_relay_connection( +pub(crate) fn handle_relay_connection( erased: wallhack_core::server::server::ErasedAcceptResult, source_resp_tx: tokio::sync::mpsc::Sender, fanout_register_tx: &tokio::sync::mpsc::UnboundedSender< diff --git a/crates/mcp/src/convert.rs b/crates/mcp/src/convert.rs index 331b224f..dd042dd5 100644 --- a/crates/mcp/src/convert.rs +++ b/crates/mcp/src/convert.rs @@ -23,17 +23,7 @@ pub fn format_response(resp: &ManagementResponse) -> Result { let _ = writeln!(out, "uptime: {}", format_uptime(s.uptime_ms)); Ok(out) } - Some(management_response::Response::Stats(s)) => Ok(format!( - "bytes in: {}\nbytes out: {}\npackets in: {}\npackets out: {}\n\ - connections: {}\nflows: {}\ndropped: {}", - s.bytes_in, - s.bytes_out, - s.packets_in, - s.packets_out, - s.active_connections, - s.active_flows, - s.packets_dropped, - )), + Some(management_response::Response::Stats(s)) => Ok(s.to_string()), Some(management_response::Response::Peers(p)) => { if p.peers.is_empty() { return Ok("No connected peers.".to_string()); diff --git a/crates/wire/proto/management.proto b/crates/wire/proto/management.proto index 460c52b8..8b1b4f3e 100644 --- a/crates/wire/proto/management.proto +++ b/crates/wire/proto/management.proto @@ -161,6 +161,8 @@ message StatsResponse { uint64 active_connections = 5; uint64 active_flows = 6; uint64 packets_dropped = 7; + uint64 total_connections = 8; // monotonically increasing; survives session close + uint64 total_flows = 9; // monotonically increasing; survives session close } message PeerInfo { diff --git a/crates/wire/src/management.rs b/crates/wire/src/management.rs index 95860c38..1eeb72ff 100644 --- a/crates/wire/src/management.rs +++ b/crates/wire/src/management.rs @@ -23,6 +23,25 @@ impl fmt::Display for NodeRole { } } +impl fmt::Display for 
StatsResponse { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "bytes in: {}\nbytes out: {}\npackets in: {}\npackets out: {}\n\ + connections: {}\nflows: {}\ndropped: {}\ntotal connections: {}\ntotal flows: {}", + self.bytes_in, + self.bytes_out, + self.packets_in, + self.packets_out, + self.active_connections, + self.active_flows, + self.packets_dropped, + self.total_connections, + self.total_flows, + ) + } +} + impl fmt::Display for PeerStatus { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { diff --git a/website/src/content/config.ts b/website/src/content.config.ts similarity index 73% rename from website/src/content/config.ts rename to website/src/content.config.ts index 7b81491e..a41ae9da 100644 --- a/website/src/content/config.ts +++ b/website/src/content.config.ts @@ -1,7 +1,8 @@ +import { glob } from "astro/loaders"; import { defineCollection, z } from "astro:content"; const docs = defineCollection({ - type: "content", + loader: glob({ pattern: "**/*.{md,mdx,mdoc}", base: "./src/content/docs" }), schema: z.object({ title: z.string(), description: z.string(), diff --git a/website/src/data/openapi.json b/website/src/data/openapi.json index 8c01418f..a5b57905 100644 --- a/website/src/data/openapi.json +++ b/website/src/data/openapi.json @@ -77,7 +77,10 @@ "packets_in", "packets_out", "active_connections", - "active_flows" + "active_flows", + "packets_dropped", + "total_connections", + "total_flows" ], "properties": { "bytes_in": { @@ -103,6 +106,18 @@ "active_flows": { "type": "integer", "description": "Number of active L4 network flows being tracked." + }, + "packets_dropped": { + "type": "integer", + "description": "Total packets dropped since daemon start." + }, + "total_connections": { + "type": "integer", + "description": "Total peer connections opened since daemon start (monotonically increasing)." + }, + "total_flows": { + "type": "integer", + "description": "Total L4 flows opened since daemon start (monotonically increasing)." 
} } }, diff --git a/website/src/pages/[slug].astro b/website/src/pages/[slug].astro index ec82ef8c..bff69bc3 100644 --- a/website/src/pages/[slug].astro +++ b/website/src/pages/[slug].astro @@ -1,13 +1,13 @@ --- -import { getCollection } from 'astro:content'; +import { getCollection, render } from 'astro:content'; import DocsLayout from '../layouts/DocsLayout.astro'; export async function getStaticPaths() { const docs = await getCollection('docs'); return docs - .filter((doc) => doc.slug !== 'index') + .filter((doc) => doc.id !== 'index') .map((doc) => ({ - params: { slug: doc.slug }, + params: { slug: doc.id }, props: { doc }, })); } @@ -17,7 +17,7 @@ interface Props { } const { doc } = Astro.props; -const { Content } = await doc.render(); +const { Content } = await render(doc); --- diff --git a/website/src/pages/index.astro b/website/src/pages/index.astro index 93a8a5bf..bce95f04 100644 --- a/website/src/pages/index.astro +++ b/website/src/pages/index.astro @@ -1,5 +1,5 @@ --- -import { getEntry } from 'astro:content'; +import { getEntry, render } from 'astro:content'; import DocsLayout from '../layouts/DocsLayout.astro'; const entry = await getEntry('docs', 'index'); @@ -7,7 +7,7 @@ if (!entry) { throw new Error("docs/index.mdoc is missing — cannot render home page"); } -const { Content } = await entry.render(); +const { Content } = await render(entry); --- diff --git a/website/src/pages/og/[slug].png.ts b/website/src/pages/og/[slug].png.ts index 729f26e0..5f758089 100644 --- a/website/src/pages/og/[slug].png.ts +++ b/website/src/pages/og/[slug].png.ts @@ -26,7 +26,7 @@ export async function getStaticPaths() { const docs = await getCollection("docs"); return [ ...docs.map((doc) => ({ - params: { slug: doc.slug }, + params: { slug: doc.id }, props: { title: doc.data.title, description: doc.data.description }, })), { diff --git a/website/website.just b/website/website.just index 570056cf..9eeafefd 100644 --- a/website/website.just +++ b/website/website.just @@ -1,5 +1,3 @@ -set working-directory := "website" - # Install website dependencies (pnpm, frozen lockfile) deps: pnpm install --frozen-lockfile --silent
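The new `Display` impl on the wire-level `StatsResponse` is what `crates/mcp` now relies on through `s.to_string()`, so a small unit test can pin the rendered field order, including the two added lifetime counters. A minimal sketch, assuming the prost-generated `StatsResponse` in `wallhack_wire::management` implements `Default` (prost derives it for messages); the test name and the counter values are illustrative only.

use wallhack_wire::management::StatsResponse;

#[test]
fn stats_display_includes_lifetime_counters() {
    // Populate only the new cumulative counters; everything else stays at zero.
    let stats = StatsResponse {
        total_connections: 3,
        total_flows: 42,
        ..Default::default()
    };
    let rendered = stats.to_string();
    // The Display impl prints one "label: value" pair per line.
    assert!(rendered.contains("total connections: 3"));
    assert!(rendered.contains("total flows: 42"));
}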
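On the HTTP side, the extended schema in `openapi.json` means management clients should pick up the three new fields when deserializing the stats response. A minimal client sketch under stated assumptions: the daemon's API listens on 127.0.0.1:8080 and serves this object from a `/stats` route (both placeholders, not taken from this patch), and the client crate pulls in `reqwest` (with the `json` feature), `serde` (with `derive`), and `tokio`.

use serde::Deserialize;

/// Mirror of the StatsResponse object described in openapi.json.
#[derive(Debug, Deserialize)]
struct Stats {
    bytes_in: u64,
    bytes_out: u64,
    packets_in: u64,
    packets_out: u64,
    active_connections: u64,
    active_flows: u64,
    packets_dropped: u64,
    total_connections: u64,
    total_flows: u64,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder address and path; substitute the real management endpoint.
    let stats: Stats = reqwest::get("http://127.0.0.1:8080/stats")
        .await?
        .json()
        .await?;

    // active_* counters reflect the current session, while total_* are
    // monotonic, so deltas between two polls give connection/flow churn.
    println!(
        "active: {} conns, {} flows | lifetime: {} conns, {} flows | dropped: {} pkts",
        stats.active_connections,
        stats.active_flows,
        stats.total_connections,
        stats.total_flows,
        stats.packets_dropped,
    );
    Ok(())
}

Because both totals are monotonically increasing, a metrics scraper can export them directly as counters rather than gauges.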