From 46a41a233a0952911465a5c06803d3dfd7c5a408 Mon Sep 17 00:00:00 2001 From: Arghyadip Chakraborty Date: Fri, 16 Jan 2026 11:03:31 +0530 Subject: [PATCH 1/6] feat(rust): utils-rs, monitor, stop flag, error enums, exit code rust library: - added utils-rs for shared utils - added CpuRing for better cpu cores handling - added Monitor for post connection uds comm - added stop flag support in stats tui rust examples: - added error enums and exit code - moved from run flag to stop flag --- Cargo.toml | 1 + examples/arpresolver-rs/Cargo.toml | 3 +- examples/arpresolver-rs/src/cli.rs | 22 +--- examples/arpresolver-rs/src/error.rs | 12 ++ examples/arpresolver-rs/src/main.rs | 139 ++++++++------------- examples/firewall-rs/Cargo.toml | 3 +- examples/firewall-rs/src/cli.rs | 26 +--- examples/firewall-rs/src/error.rs | 13 ++ examples/firewall-rs/src/main.rs | 148 ++++++++-------------- examples/helloworld-rs/src/main.rs | 17 +-- examples/ip4ping-rs/Cargo.toml | 3 +- examples/ip4ping-rs/src/cli.rs | 22 +--- examples/ip4ping-rs/src/error.rs | 12 ++ examples/ip4ping-rs/src/main.rs | 138 ++++++++------------- examples/l2fwd-rs/Cargo.toml | 3 +- examples/l2fwd-rs/src/cli.rs | 22 +--- examples/l2fwd-rs/src/error.rs | 12 ++ examples/l2fwd-rs/src/main.rs | 138 ++++++++------------- examples/maglev-rs/Cargo.toml | 3 +- examples/maglev-rs/src/cli.rs | 26 +--- examples/maglev-rs/src/error.rs | 23 ++++ examples/maglev-rs/src/main.rs | 179 +++++++++++---------------- examples/simplefwd-rs/Cargo.toml | 3 +- examples/simplefwd-rs/src/cli.rs | 22 +--- examples/simplefwd-rs/src/error.rs | 12 ++ examples/simplefwd-rs/src/main.rs | 138 ++++++++------------- lib/flash-rs/src/client.rs | 35 ++---- lib/flash-rs/src/config/socket.rs | 11 +- lib/flash-rs/src/lib.rs | 6 +- lib/flash-rs/src/monitor.rs | 62 ++++++++++ lib/flash-rs/src/tui/dashboard.rs | 38 ++++-- lib/flash-rs/src/uds/client.rs | 12 +- lib/flash-rs/src/uds/conn.rs | 11 +- lib/flash-rs/src/uds/mod.rs | 4 +- lib/flash-rs/src/xsk/socket.rs | 7 ++ lib/utils-rs/Cargo.toml | 8 ++ lib/utils-rs/src/cpu.rs | 114 +++++++++++++++++ lib/utils-rs/src/error.rs | 14 +++ lib/utils-rs/src/lib.rs | 4 + 39 files changed, 732 insertions(+), 734 deletions(-) create mode 100644 examples/arpresolver-rs/src/error.rs create mode 100644 examples/firewall-rs/src/error.rs create mode 100644 examples/ip4ping-rs/src/error.rs create mode 100644 examples/l2fwd-rs/src/error.rs create mode 100644 examples/maglev-rs/src/error.rs create mode 100644 examples/simplefwd-rs/src/error.rs create mode 100644 lib/flash-rs/src/monitor.rs create mode 100644 lib/utils-rs/Cargo.toml create mode 100644 lib/utils-rs/src/cpu.rs create mode 100644 lib/utils-rs/src/error.rs create mode 100644 lib/utils-rs/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index 1035a6f..f6af436 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,7 @@ [workspace] members = [ "lib/flash-rs", + "lib/utils-rs", "examples/arpresolver-rs", "examples/firewall-rs", "examples/helloworld-rs", diff --git a/examples/arpresolver-rs/Cargo.toml b/examples/arpresolver-rs/Cargo.toml index fd6fd84..77f1b76 100644 --- a/examples/arpresolver-rs/Cargo.toml +++ b/examples/arpresolver-rs/Cargo.toml @@ -5,12 +5,13 @@ edition = "2024" [dependencies] clap = { version = "4.5.35", features = ["derive"] } -core_affinity = "0.8.3" ctrlc = { version = "3.4.5", optional = true } flash = { path = "../../lib/flash-rs", features = ["clap"] } macaddr = "1.0.1" +thiserror = "2.0.17" tracing = { version = "0.1.41", optional = true } tracing-subscriber = { version = "0.3.19", optional = true } +utils = { path = "../../lib/utils-rs" } [features] default = ["dep:ctrlc"] diff --git a/examples/arpresolver-rs/src/cli.rs b/examples/arpresolver-rs/src/cli.rs index ff946a2..eb32532 100644 --- a/examples/arpresolver-rs/src/cli.rs +++ b/examples/arpresolver-rs/src/cli.rs @@ -1,9 +1,9 @@ -#[cfg(feature = "stats")] use std::str::FromStr as _; use clap::Parser; use flash::FlashConfig; use macaddr::MacAddr6; +use utils::CpuRing; #[cfg(feature = "stats")] use flash::tui::GridLayout; @@ -13,21 +13,8 @@ pub struct Cli { #[command(flatten)] pub flash_config: FlashConfig, - #[arg( - short = 'c', - long, - default_value_t = 0, - help = "Starting CPU core index for socket threads" - )] - pub cpu_start: usize, - - #[arg( - short = 'e', - long, - default_value_t = 0, - help = "Ending CPU core index for socket threads (inclusive)" - )] - pub cpu_end: usize, + #[arg(short = 'c', long, value_parser = CpuRing::from_str, help = "CPU core range for socket threads")] + pub cpu_range: CpuRing, #[arg(short = 'M', long, help = "NF MAC address")] pub nf_mac: MacAddr6, @@ -46,10 +33,9 @@ pub struct StatsConfig { #[arg( short = 's', long = "stats-cpu", - default_value_t = 1, help = "CPU core index for stats thread" )] - pub cpu: usize, + pub cpu: CpuRing, #[arg(short = 'F', long, default_value_t = 1, help = "Tui frames per second")] pub fps: u64, diff --git a/examples/arpresolver-rs/src/error.rs b/examples/arpresolver-rs/src/error.rs new file mode 100644 index 0000000..d42c20a --- /dev/null +++ b/examples/arpresolver-rs/src/error.rs @@ -0,0 +1,12 @@ +#[derive(Debug, thiserror::Error)] +#[error("app error: {0}")] +pub enum AppError { + Flash(#[from] flash::FlashError), + + #[cfg(feature = "stats")] + Tui(#[from] flash::tui::TuiError), + + #[cfg(not(feature = "stats"))] + #[error("app error: error setting Ctrl-C handler: {0}")] + Ctrl(#[from] ctrlc::Error), +} diff --git a/examples/arpresolver-rs/src/main.rs b/examples/arpresolver-rs/src/main.rs index fa8821a..53ad412 100644 --- a/examples/arpresolver-rs/src/main.rs +++ b/examples/arpresolver-rs/src/main.rs @@ -1,13 +1,14 @@ mod cli; +mod error; mod nf; use std::{ net::Ipv4Addr, + process::ExitCode, sync::{ Arc, atomic::{AtomicBool, Ordering}, }, - thread, }; use clap::Parser; @@ -17,16 +18,16 @@ use macaddr::MacAddr6; #[cfg(feature = "stats")] use flash::tui::StatsDashboard; -use crate::cli::Cli; +use crate::{cli::Cli, error::AppError}; fn socket_thread( mut socket: Socket, nf_mac: MacAddr6, nf_ip: Ipv4Addr, mac_addr: Option, - run: &Arc, + stop: &Arc, ) { - while run.load(Ordering::SeqCst) { + while !stop.load(Ordering::Relaxed) { if !socket.poll().is_ok_and(|val| val) { continue; } @@ -60,108 +61,76 @@ fn socket_thread( } } -fn main() { - #[cfg(feature = "tracing")] - tracing_subscriber::fmt::init(); - - let cli = Cli::parse(); - - let (sockets, route) = match flash::connect(&cli.flash_config) { - Ok(t) => t, - Err(err) => { - eprintln!("{err}"); - return; - } - }; - - if sockets.is_empty() { - eprintln!("no sockets received"); - return; - } - - #[cfg(feature = "tracing")] - tracing::debug!("Sockets: {:?}", sockets); +fn run(mut cli: Cli) -> Result<(), AppError> { + let (sockets, mut monitor) = flash::connect(&cli.flash_config)?; + let nf_ip_addr = monitor.get_nf_ip_addr()?; + let stop = Arc::new(AtomicBool::new(true)); #[cfg(feature = "stats")] - let mut tui = match StatsDashboard::new( + let mut tui = StatsDashboard::new( sockets.iter().map(Socket::stats), cli.stats.fps, cli.stats.layout, - ) { - Ok(t) => t, - Err(err) => { - eprintln!("error creating tui: {err}"); - return; - } - }; - - let cores = core_affinity::get_core_ids() - .unwrap_or_default() - .into_iter() - .filter(|core_id| core_id.id >= cli.cpu_start && core_id.id <= cli.cpu_end) - .collect::>(); - - if cores.is_empty() { - eprintln!("no cores found in range {}-{}", cli.cpu_start, cli.cpu_end); - return; - } - - #[cfg(feature = "tracing")] - tracing::debug!("Cores: {:?}", cores); + Some(stop.clone()), + )?; #[cfg(feature = "stats")] - let Some(stats_core) = core_affinity::get_core_ids() - .unwrap_or_default() - .into_iter() - .find(|core_id| core_id.id == cli.stats.cpu) - else { - eprintln!("no core found for stats thread {}", cli.stats.cpu); - return; - }; - - let run = Arc::new(AtomicBool::new(true)); + let stats_thread = cli.stats.cpu.spawn(move || { + if let Err(err) = tui.run() { + eprintln!("error dumping stats: {err}"); + } + }); #[cfg(not(feature = "stats"))] - if let Err(err) = { - let r = run.clone(); + { + let stop = stop.clone(); ctrlc::set_handler(move || { - r.store(false, Ordering::SeqCst); + stop.store(true, Ordering::Release); }) - } { - eprintln!("error setting Ctrl-C handler: {err}"); - return; - } + }?; - let handles = sockets - .into_iter() - .zip(cores.into_iter().cycle()) - .map(|(socket, core_id)| { - let r = run.clone(); - thread::spawn(move || { - core_affinity::set_for_current(core_id); - socket_thread(socket, cli.nf_mac, route.ip_addr, cli.mac_addr, &r); - }) + let _ = { + let stop = stop.clone(); + monitor.spawn_disconnect_handler(move || { + stop.store(true, Ordering::Release); }) - .collect::>(); + }; + + let socket_threads = cli + .cpu_range + .spawn_multiple(sockets.into_iter().map(|socket| { + let stop = stop.clone(); + move || socket_thread(socket, cli.nf_mac, nf_ip_addr, cli.mac_addr, &stop) + })); #[cfg(feature = "stats")] - if let Err(err) = thread::spawn(move || { - core_affinity::set_for_current(stats_core); - if let Err(err) = tui.run() { - eprintln!("error dumping stats: {err}"); - } - }) - .join() - { + if let Err(err) = stats_thread.join() { eprintln!("error in stats thread: {err:?}"); } #[cfg(feature = "stats")] - run.store(false, Ordering::SeqCst); + stop.store(true, Ordering::Release); - for handle in handles { + for handle in socket_threads { if let Err(err) = handle.join() { - eprintln!("error in thread: {err:?}"); + eprintln!("error in socket thread: {err:?}"); + } + } + + Ok(()) +} + +fn main() -> ExitCode { + #[cfg(feature = "tracing")] + tracing_subscriber::fmt::init(); + + let cli = Cli::parse(); + + match run(cli) { + Ok(()) => ExitCode::SUCCESS, + Err(err) => { + eprintln!("{err}"); + ExitCode::FAILURE } } } diff --git a/examples/firewall-rs/Cargo.toml b/examples/firewall-rs/Cargo.toml index db462fb..c3b7ef1 100644 --- a/examples/firewall-rs/Cargo.toml +++ b/examples/firewall-rs/Cargo.toml @@ -5,14 +5,15 @@ edition = "2024" [dependencies] clap = { version = "4.5.35", features = ["derive"] } -core_affinity = "0.8.3" csv = "1.3.1" ctrlc = { version = "3.4.5", optional = true } flash = { path = "../../lib/flash-rs", features = ["clap"] } macaddr = "1.0.1" serde = { version = "1.0.219", features = ["derive"] } +thiserror = "2.0.17" tracing = { version = "0.1.41", optional = true } tracing-subscriber = { version = "0.3.19", optional = true } +utils = { path = "../../lib/utils-rs" } [features] default = ["dep:ctrlc"] diff --git a/examples/firewall-rs/src/cli.rs b/examples/firewall-rs/src/cli.rs index 3c3448a..ac9a5da 100644 --- a/examples/firewall-rs/src/cli.rs +++ b/examples/firewall-rs/src/cli.rs @@ -1,11 +1,9 @@ -use std::path::PathBuf; - -#[cfg(feature = "stats")] -use std::str::FromStr as _; +use std::{path::PathBuf, str::FromStr as _}; use clap::Parser; use flash::FlashConfig; use macaddr::MacAddr6; +use utils::CpuRing; #[cfg(feature = "stats")] use flash::tui::GridLayout; @@ -15,21 +13,8 @@ pub struct Cli { #[command(flatten)] pub flash_config: FlashConfig, - #[arg( - short = 'c', - long, - default_value_t = 0, - help = "Starting CPU core index for socket threads" - )] - pub cpu_start: usize, - - #[arg( - short = 'e', - long, - default_value_t = 0, - help = "Ending CPU core index for socket threads (inclusive)" - )] - pub cpu_end: usize, + #[arg(short = 'c', long, value_parser = CpuRing::from_str, help = "CPU core range for socket threads")] + pub cpu_range: CpuRing, #[arg(short, long, help = "Path to denylist csv file")] pub denylist: PathBuf, @@ -48,10 +33,9 @@ pub struct StatsConfig { #[arg( short = 's', long = "stats-cpu", - default_value_t = 1, help = "CPU core index for stats thread" )] - pub cpu: usize, + pub cpu: CpuRing, #[arg(short = 'F', long, default_value_t = 1, help = "Tui frames per second")] pub fps: u64, diff --git a/examples/firewall-rs/src/error.rs b/examples/firewall-rs/src/error.rs new file mode 100644 index 0000000..120410b --- /dev/null +++ b/examples/firewall-rs/src/error.rs @@ -0,0 +1,13 @@ +#[derive(Debug, thiserror::Error)] +#[error("app error: {0}")] +pub enum AppError { + Flash(#[from] flash::FlashError), + Csv(#[from] csv::Error), + + #[cfg(feature = "stats")] + Tui(#[from] flash::tui::TuiError), + + #[cfg(not(feature = "stats"))] + #[error("app error: error setting Ctrl-C handler: {0}")] + Ctrl(#[from] ctrlc::Error), +} diff --git a/examples/firewall-rs/src/main.rs b/examples/firewall-rs/src/main.rs index c042e5a..f97b41e 100644 --- a/examples/firewall-rs/src/main.rs +++ b/examples/firewall-rs/src/main.rs @@ -1,12 +1,13 @@ mod cli; +mod error; mod nf; use std::{ + process::ExitCode, sync::{ Arc, atomic::{AtomicBool, Ordering}, }, - thread, }; use clap::Parser; @@ -16,15 +17,15 @@ use macaddr::MacAddr6; #[cfg(feature = "stats")] use flash::tui::StatsDashboard; -use crate::{cli::Cli, nf::Firewall}; +use crate::{cli::Cli, error::AppError, nf::Firewall}; fn socket_thread( mut socket: Socket, firewall: &Arc, mac_addr: Option, - run: &Arc, + stop: &Arc, ) { - while run.load(Ordering::SeqCst) { + while !stop.load(Ordering::Relaxed) { if !socket.poll().is_ok_and(|val| val) { continue; } @@ -55,118 +56,79 @@ fn socket_thread( } } -fn main() { - #[cfg(feature = "tracing")] - tracing_subscriber::fmt::init(); - - let cli = Cli::parse(); - - let sockets = match flash::connect(&cli.flash_config) { - Ok((sockets, _)) => sockets, - Err(err) => { - eprintln!("{err}"); - return; - } - }; - - if sockets.is_empty() { - eprintln!("no sockets received"); - return; - } - - #[cfg(feature = "tracing")] - tracing::debug!("Sockets: {:?}", sockets); +fn run(mut cli: Cli) -> Result<(), AppError> { + let (sockets, monitor) = flash::connect(&cli.flash_config)?; + let stop = Arc::new(AtomicBool::new(true)); #[cfg(feature = "stats")] - let mut tui = match StatsDashboard::new( + let mut tui = StatsDashboard::new( sockets.iter().map(Socket::stats), cli.stats.fps, cli.stats.layout, - ) { - Ok(t) => t, - Err(err) => { - eprintln!("error creating tui: {err}"); - return; - } - }; - - let firewall = match Firewall::new(cli.denylist) { - Ok(firewall) => Arc::new(firewall), - Err(err) => { - eprintln!("error loading firewall rules: {err}"); - return; - } - }; + Some(stop.clone()), + )?; - let cores = core_affinity::get_core_ids() - .unwrap_or_default() - .into_iter() - .filter(|core_id| core_id.id >= cli.cpu_start && core_id.id <= cli.cpu_end) - .collect::>(); - - if cores.is_empty() { - eprintln!("no cores found in range {}-{}", cli.cpu_start, cli.cpu_end); - return; - } - - #[cfg(feature = "tracing")] - tracing::debug!("Cores: {:?}", cores); + let firewall = Arc::new(Firewall::new(cli.denylist)?); #[cfg(feature = "stats")] - let Some(stats_core) = core_affinity::get_core_ids() - .unwrap_or_default() - .into_iter() - .find(|core_id| core_id.id == cli.stats.cpu) - else { - eprintln!("no core found for stats thread {}", cli.stats.cpu); - return; - }; - - let run = Arc::new(AtomicBool::new(true)); + let stats_thread = cli.stats.cpu.spawn(move || { + if let Err(err) = tui.run() { + eprintln!("error dumping stats: {err}"); + } + }); #[cfg(not(feature = "stats"))] - if let Err(err) = { - let r = run.clone(); + { + let stop = stop.clone(); ctrlc::set_handler(move || { - r.store(false, Ordering::SeqCst); + stop.store(true, Ordering::Release); }) - } { - eprintln!("error setting Ctrl-C handler: {err}"); - return; - } + }?; - let handles = sockets - .into_iter() - .zip(cores.into_iter().cycle()) - .map(|(socket, core_id)| { - let r = run.clone(); + let _ = { + let stop = stop.clone(); + monitor.spawn_disconnect_handler(move || { + stop.store(true, Ordering::Release); + }) + }; + + let socket_threads = cli + .cpu_range + .spawn_multiple(sockets.into_iter().map(|socket| { + let stop = stop.clone(); let firewall = firewall.clone(); - thread::spawn(move || { - core_affinity::set_for_current(core_id); - socket_thread(socket, &firewall, cli.mac_addr, &r); - }) - }) - .collect::>(); + move || socket_thread(socket, &firewall, cli.mac_addr, &stop) + })); #[cfg(feature = "stats")] - if let Err(err) = thread::spawn(move || { - core_affinity::set_for_current(stats_core); - if let Err(err) = tui.run() { - eprintln!("error dumping stats: {err}"); - } - }) - .join() - { + if let Err(err) = stats_thread.join() { eprintln!("error in stats thread: {err:?}"); } #[cfg(feature = "stats")] - run.store(false, Ordering::SeqCst); + stop.store(true, Ordering::Release); - for handle in handles { + for handle in socket_threads { if let Err(err) = handle.join() { - eprintln!("error in thread: {err:?}"); + eprintln!("error in socket thread: {err:?}"); + } + } + + Ok(()) +} + +fn main() -> ExitCode { + #[cfg(feature = "tracing")] + tracing_subscriber::fmt::init(); + + let cli = Cli::parse(); + + match run(cli) { + Ok(()) => ExitCode::SUCCESS, + Err(err) => { + eprintln!("{err}"); + ExitCode::FAILURE } } } diff --git a/examples/helloworld-rs/src/main.rs b/examples/helloworld-rs/src/main.rs index 3855497..5c482cb 100644 --- a/examples/helloworld-rs/src/main.rs +++ b/examples/helloworld-rs/src/main.rs @@ -7,20 +7,7 @@ fn main() { let flash_config = FlashConfig::parse(); - let sockets = match flash::connect(&flash_config) { - Ok((sockets, _)) => sockets, - Err(err) => { - eprintln!("{err}"); - return; - } - }; - - if sockets.is_empty() { - eprintln!("no sockets received"); - #[cfg(feature = "tracing")] - return; + if let Err(err) = flash::connect(&flash_config) { + eprintln!("{err}"); } - - #[cfg(feature = "tracing")] - tracing::info!("Sockets: {sockets:?}"); } diff --git a/examples/ip4ping-rs/Cargo.toml b/examples/ip4ping-rs/Cargo.toml index 0fd97e9..37475eb 100644 --- a/examples/ip4ping-rs/Cargo.toml +++ b/examples/ip4ping-rs/Cargo.toml @@ -5,12 +5,13 @@ edition = "2024" [dependencies] clap = { version = "4.5.35", features = ["derive"] } -core_affinity = "0.8.3" ctrlc = { version = "3.4.5", optional = true } flash = { path = "../../lib/flash-rs", features = ["clap"] } macaddr = "1.0.1" +thiserror = "2.0.17" tracing = { version = "0.1.41", optional = true } tracing-subscriber = { version = "0.3.19", optional = true } +utils = { path = "../../lib/utils-rs" } [features] default = ["dep:ctrlc"] diff --git a/examples/ip4ping-rs/src/cli.rs b/examples/ip4ping-rs/src/cli.rs index cfc783e..31391d6 100644 --- a/examples/ip4ping-rs/src/cli.rs +++ b/examples/ip4ping-rs/src/cli.rs @@ -1,9 +1,9 @@ -#[cfg(feature = "stats")] use std::str::FromStr as _; use clap::Parser; use flash::FlashConfig; use macaddr::MacAddr6; +use utils::CpuRing; #[cfg(feature = "stats")] use flash::tui::GridLayout; @@ -13,21 +13,8 @@ pub struct Cli { #[command(flatten)] pub flash_config: FlashConfig, - #[arg( - short = 'c', - long, - default_value_t = 0, - help = "Starting CPU core index for socket threads" - )] - pub cpu_start: usize, - - #[arg( - short = 'e', - long, - default_value_t = 0, - help = "Ending CPU core index for socket threads (inclusive)" - )] - pub cpu_end: usize, + #[arg(short = 'c', long, value_parser = CpuRing::from_str, help = "CPU core range for socket threads")] + pub cpu_range: CpuRing, #[arg(short = 'm', long, help = "Dest MAC address")] pub mac_addr: Option, @@ -43,10 +30,9 @@ pub struct StatsConfig { #[arg( short = 's', long = "stats-cpu", - default_value_t = 1, help = "CPU core index for stats thread" )] - pub cpu: usize, + pub cpu: CpuRing, #[arg(short = 'F', long, default_value_t = 1, help = "Tui frames per second")] pub fps: u64, diff --git a/examples/ip4ping-rs/src/error.rs b/examples/ip4ping-rs/src/error.rs new file mode 100644 index 0000000..d42c20a --- /dev/null +++ b/examples/ip4ping-rs/src/error.rs @@ -0,0 +1,12 @@ +#[derive(Debug, thiserror::Error)] +#[error("app error: {0}")] +pub enum AppError { + Flash(#[from] flash::FlashError), + + #[cfg(feature = "stats")] + Tui(#[from] flash::tui::TuiError), + + #[cfg(not(feature = "stats"))] + #[error("app error: error setting Ctrl-C handler: {0}")] + Ctrl(#[from] ctrlc::Error), +} diff --git a/examples/ip4ping-rs/src/main.rs b/examples/ip4ping-rs/src/main.rs index 0784743..ec9c35a 100644 --- a/examples/ip4ping-rs/src/main.rs +++ b/examples/ip4ping-rs/src/main.rs @@ -1,12 +1,13 @@ mod cli; +mod error; mod nf; use std::{ + process::ExitCode, sync::{ Arc, atomic::{AtomicBool, Ordering}, }, - thread, }; use clap::Parser; @@ -16,10 +17,10 @@ use macaddr::MacAddr6; #[cfg(feature = "stats")] use flash::tui::StatsDashboard; -use crate::cli::Cli; +use crate::{cli::Cli, error::AppError}; -fn socket_thread(mut socket: Socket, mac_addr: Option, run: &Arc) { - while run.load(Ordering::SeqCst) { +fn socket_thread(mut socket: Socket, mac_addr: Option, stop: &Arc) { + while !stop.load(Ordering::Relaxed) { if !socket.poll().is_ok_and(|val| val) { continue; } @@ -53,108 +54,75 @@ fn socket_thread(mut socket: Socket, mac_addr: Option, run: &Arc sockets, - Err(err) => { - eprintln!("{err}"); - return; - } - }; - - if sockets.is_empty() { - eprintln!("no sockets received"); - return; - } - - #[cfg(feature = "tracing")] - tracing::debug!("Sockets: {:?}", sockets); +fn run(mut cli: Cli) -> Result<(), AppError> { + let (sockets, monitor) = flash::connect(&cli.flash_config)?; + let stop = Arc::new(AtomicBool::new(true)); #[cfg(feature = "stats")] - let mut tui = match StatsDashboard::new( + let mut tui = StatsDashboard::new( sockets.iter().map(Socket::stats), cli.stats.fps, cli.stats.layout, - ) { - Ok(t) => t, - Err(err) => { - eprintln!("error creating tui: {err}"); - return; - } - }; - - let cores = core_affinity::get_core_ids() - .unwrap_or_default() - .into_iter() - .filter(|core_id| core_id.id >= cli.cpu_start && core_id.id <= cli.cpu_end) - .collect::>(); - - if cores.is_empty() { - eprintln!("no cores found in range {}-{}", cli.cpu_start, cli.cpu_end); - return; - } - - #[cfg(feature = "tracing")] - tracing::debug!("Cores: {:?}", cores); + Some(stop.clone()), + )?; #[cfg(feature = "stats")] - let Some(stats_core) = core_affinity::get_core_ids() - .unwrap_or_default() - .into_iter() - .find(|core_id| core_id.id == cli.stats.cpu) - else { - eprintln!("no core found for stats thread {}", cli.stats.cpu); - return; - }; - - let run = Arc::new(AtomicBool::new(true)); + let stats_thread = cli.stats.cpu.spawn(move || { + if let Err(err) = tui.run() { + eprintln!("error dumping stats: {err}"); + } + }); #[cfg(not(feature = "stats"))] - if let Err(err) = { - let r = run.clone(); + { + let stop = stop.clone(); ctrlc::set_handler(move || { - r.store(false, Ordering::SeqCst); + stop.store(true, Ordering::Release); }) - } { - eprintln!("error setting Ctrl-C handler: {err}"); - return; - } + }?; - let handles = sockets - .into_iter() - .zip(cores.into_iter().cycle()) - .map(|(socket, core_id)| { - let r = run.clone(); - thread::spawn(move || { - core_affinity::set_for_current(core_id); - socket_thread(socket, cli.mac_addr, &r); - }) + let _ = { + let stop = stop.clone(); + monitor.spawn_disconnect_handler(move || { + stop.store(true, Ordering::Release); }) - .collect::>(); + }; + + let socket_threads = cli + .cpu_range + .spawn_multiple(sockets.into_iter().map(|socket| { + let stop = stop.clone(); + move || socket_thread(socket, cli.mac_addr, &stop) + })); #[cfg(feature = "stats")] - if let Err(err) = thread::spawn(move || { - core_affinity::set_for_current(stats_core); - if let Err(err) = tui.run() { - eprintln!("error dumping stats: {err}"); - } - }) - .join() - { + if let Err(err) = stats_thread.join() { eprintln!("error in stats thread: {err:?}"); } #[cfg(feature = "stats")] - run.store(false, Ordering::SeqCst); + stop.store(true, Ordering::Release); - for handle in handles { + for handle in socket_threads { if let Err(err) = handle.join() { - eprintln!("error in thread: {err:?}"); + eprintln!("error in socket thread: {err:?}"); + } + } + + Ok(()) +} + +fn main() -> ExitCode { + #[cfg(feature = "tracing")] + tracing_subscriber::fmt::init(); + + let cli = Cli::parse(); + + match run(cli) { + Ok(()) => ExitCode::SUCCESS, + Err(err) => { + eprintln!("{err}"); + ExitCode::FAILURE } } } diff --git a/examples/l2fwd-rs/Cargo.toml b/examples/l2fwd-rs/Cargo.toml index 8412719..5c30211 100644 --- a/examples/l2fwd-rs/Cargo.toml +++ b/examples/l2fwd-rs/Cargo.toml @@ -5,12 +5,13 @@ edition = "2024" [dependencies] clap = { version = "4.5.35", features = ["derive"] } -core_affinity = "0.8.3" ctrlc = { version = "3.4.5", optional = true } flash = { path = "../../lib/flash-rs", features = ["clap"] } macaddr = "1.0.1" +thiserror = "2.0.17" tracing = { version = "0.1.41", optional = true } tracing-subscriber = { version = "0.3.19", optional = true } +utils = { path = "../../lib/utils-rs" } [features] default = ["dep:ctrlc"] diff --git a/examples/l2fwd-rs/src/cli.rs b/examples/l2fwd-rs/src/cli.rs index e5de218..53aa2ad 100644 --- a/examples/l2fwd-rs/src/cli.rs +++ b/examples/l2fwd-rs/src/cli.rs @@ -1,9 +1,9 @@ -#[cfg(feature = "stats")] use std::str::FromStr as _; use clap::Parser; use flash::FlashConfig; use macaddr::MacAddr6; +use utils::CpuRing; #[cfg(feature = "stats")] use flash::tui::GridLayout; @@ -13,21 +13,8 @@ pub struct Cli { #[command(flatten)] pub flash_config: FlashConfig, - #[arg( - short = 'c', - long, - default_value_t = 0, - help = "Starting CPU core index for socket threads" - )] - pub cpu_start: usize, - - #[arg( - short = 'e', - long, - default_value_t = 0, - help = "Ending CPU core index for socket threads (inclusive)" - )] - pub cpu_end: usize, + #[arg(short = 'c', long, value_parser = CpuRing::from_str, help = "CPU core range for socket threads")] + pub cpu_range: CpuRing, #[cfg(feature = "stats")] #[command(flatten)] @@ -43,10 +30,9 @@ pub struct StatsConfig { #[arg( short = 's', long = "stats-cpu", - default_value_t = 1, help = "CPU core index for stats thread" )] - pub cpu: usize, + pub cpu: CpuRing, #[arg(short = 'F', long, default_value_t = 1, help = "Tui frames per second")] pub fps: u64, diff --git a/examples/l2fwd-rs/src/error.rs b/examples/l2fwd-rs/src/error.rs new file mode 100644 index 0000000..d42c20a --- /dev/null +++ b/examples/l2fwd-rs/src/error.rs @@ -0,0 +1,12 @@ +#[derive(Debug, thiserror::Error)] +#[error("app error: {0}")] +pub enum AppError { + Flash(#[from] flash::FlashError), + + #[cfg(feature = "stats")] + Tui(#[from] flash::tui::TuiError), + + #[cfg(not(feature = "stats"))] + #[error("app error: error setting Ctrl-C handler: {0}")] + Ctrl(#[from] ctrlc::Error), +} diff --git a/examples/l2fwd-rs/src/main.rs b/examples/l2fwd-rs/src/main.rs index 9a1a138..100d435 100644 --- a/examples/l2fwd-rs/src/main.rs +++ b/examples/l2fwd-rs/src/main.rs @@ -1,11 +1,12 @@ mod cli; +mod error; use std::{ + process::ExitCode, sync::{ Arc, atomic::{AtomicBool, Ordering}, }, - thread, }; use clap::Parser; @@ -15,7 +16,7 @@ use macaddr::MacAddr6; #[cfg(feature = "stats")] use flash::tui::StatsDashboard; -use crate::cli::Cli; +use crate::{cli::Cli, error::AppError}; #[forbid(clippy::indexing_slicing)] #[inline] @@ -32,8 +33,8 @@ fn mac_swap(pkt: &mut [u8; 14], mac_addr: Option) { } } -fn socket_thread(mut socket: Socket, mac_addr: Option, run: &Arc) { - while run.load(Ordering::SeqCst) { +fn socket_thread(mut socket: Socket, mac_addr: Option, stop: &Arc) { + while !stop.load(Ordering::Relaxed) { if !socket.poll().is_ok_and(|val| val) { continue; } @@ -60,108 +61,75 @@ fn socket_thread(mut socket: Socket, mac_addr: Option, run: &Arc sockets, - Err(err) => { - eprintln!("{err}"); - return; - } - }; - - if sockets.is_empty() { - eprintln!("no sockets received"); - return; - } - - #[cfg(feature = "tracing")] - tracing::debug!("Sockets: {:?}", sockets); +fn run(mut cli: Cli) -> Result<(), AppError> { + let (sockets, monitor) = flash::connect(&cli.flash_config)?; + let stop = Arc::new(AtomicBool::new(true)); #[cfg(feature = "stats")] - let mut tui = match StatsDashboard::new( + let mut tui = StatsDashboard::new( sockets.iter().map(Socket::stats), cli.stats.fps, cli.stats.layout, - ) { - Ok(t) => t, - Err(err) => { - eprintln!("error creating tui: {err}"); - return; - } - }; - - let cores = core_affinity::get_core_ids() - .unwrap_or_default() - .into_iter() - .filter(|core_id| core_id.id >= cli.cpu_start && core_id.id <= cli.cpu_end) - .collect::>(); - - if cores.is_empty() { - eprintln!("no cores found in range {}-{}", cli.cpu_start, cli.cpu_end); - return; - } - - #[cfg(feature = "tracing")] - tracing::debug!("Cores: {:?}", cores); + Some(stop.clone()), + )?; #[cfg(feature = "stats")] - let Some(stats_core) = core_affinity::get_core_ids() - .unwrap_or_default() - .into_iter() - .find(|core_id| core_id.id == cli.stats.cpu) - else { - eprintln!("no core found for stats thread {}", cli.stats.cpu); - return; - }; - - let run = Arc::new(AtomicBool::new(true)); + let stats_thread = cli.stats.cpu.spawn(move || { + if let Err(err) = tui.run() { + eprintln!("error dumping stats: {err}"); + } + }); #[cfg(not(feature = "stats"))] - if let Err(err) = { - let r = run.clone(); + { + let stop = stop.clone(); ctrlc::set_handler(move || { - r.store(false, Ordering::SeqCst); + stop.store(true, Ordering::Release); }) - } { - eprintln!("error setting Ctrl-C handler: {err}"); - return; - } + }?; - let handles = sockets - .into_iter() - .zip(cores.into_iter().cycle()) - .map(|(socket, core_id)| { - let r = run.clone(); - thread::spawn(move || { - core_affinity::set_for_current(core_id); - socket_thread(socket, cli.mac_addr, &r); - }) + let _ = { + let stop = stop.clone(); + monitor.spawn_disconnect_handler(move || { + stop.store(true, Ordering::Release); }) - .collect::>(); + }; + + let socket_threads = cli + .cpu_range + .spawn_multiple(sockets.into_iter().map(|socket| { + let stop = stop.clone(); + move || socket_thread(socket, cli.mac_addr, &stop) + })); #[cfg(feature = "stats")] - if let Err(err) = thread::spawn(move || { - core_affinity::set_for_current(stats_core); - if let Err(err) = tui.run() { - eprintln!("error dumping stats: {err}"); - } - }) - .join() - { + if let Err(err) = stats_thread.join() { eprintln!("error in stats thread: {err:?}"); } #[cfg(feature = "stats")] - run.store(false, Ordering::SeqCst); + stop.store(true, Ordering::Release); - for handle in handles { + for handle in socket_threads { if let Err(err) = handle.join() { - eprintln!("error in thread: {err:?}"); + eprintln!("error in socket thread: {err:?}"); + } + } + + Ok(()) +} + +fn main() -> ExitCode { + #[cfg(feature = "tracing")] + tracing_subscriber::fmt::init(); + + let cli = Cli::parse(); + + match run(cli) { + Ok(()) => ExitCode::SUCCESS, + Err(err) => { + eprintln!("{err}"); + ExitCode::FAILURE } } } diff --git a/examples/maglev-rs/Cargo.toml b/examples/maglev-rs/Cargo.toml index 0934e0b..d2c0519 100644 --- a/examples/maglev-rs/Cargo.toml +++ b/examples/maglev-rs/Cargo.toml @@ -5,14 +5,15 @@ edition = "2024" [dependencies] clap = { version = "4.5.35", features = ["derive"] } -core_affinity = "0.8.3" ctrlc = { version = "3.4.5", optional = true } flash = { path = "../../lib/flash-rs", features = ["clap"] } fnv = "1.0.7" macaddr = "1.0.1" +thiserror = "2.0.17" tracing = { version = "0.1.41", optional = true } tracing-subscriber = { version = "0.3.19", optional = true } twox-hash = "2.1.0" +utils = { path = "../../lib/utils-rs" } [features] default = ["dep:ctrlc"] diff --git a/examples/maglev-rs/src/cli.rs b/examples/maglev-rs/src/cli.rs index 5f77a23..64bf3d0 100644 --- a/examples/maglev-rs/src/cli.rs +++ b/examples/maglev-rs/src/cli.rs @@ -1,11 +1,9 @@ -use std::net::Ipv4Addr; - -#[cfg(feature = "stats")] -use std::str::FromStr as _; +use std::{net::Ipv4Addr, str::FromStr as _}; use clap::Parser; use flash::FlashConfig; use macaddr::MacAddr6; +use utils::CpuRing; #[cfg(feature = "stats")] use flash::tui::GridLayout; @@ -15,21 +13,8 @@ pub struct Cli { #[command(flatten)] pub flash_config: FlashConfig, - #[arg( - short = 'c', - long, - default_value_t = 0, - help = "Starting CPU core index for socket threads" - )] - pub cpu_start: usize, - - #[arg( - short = 'e', - long, - default_value_t = 0, - help = "Ending CPU core index for socket threads (inclusive)" - )] - pub cpu_end: usize, + #[arg(short = 'c', long, value_parser = CpuRing::from_str, help = "CPU core range for socket threads")] + pub cpu_range: CpuRing, #[cfg(feature = "stats")] #[command(flatten)] @@ -48,10 +33,9 @@ pub struct StatsConfig { #[arg( short = 's', long = "stats-cpu", - default_value_t = 1, help = "CPU core index for stats thread" )] - pub cpu: usize, + pub cpu: CpuRing, #[arg(short = 'F', long, default_value_t = 1, help = "Tui frames per second")] pub fps: u64, diff --git a/examples/maglev-rs/src/error.rs b/examples/maglev-rs/src/error.rs new file mode 100644 index 0000000..083eaa9 --- /dev/null +++ b/examples/maglev-rs/src/error.rs @@ -0,0 +1,23 @@ +#[derive(Debug, thiserror::Error)] +#[error("app error: {0}")] +pub enum AppError { + Flash(#[from] flash::FlashError), + + #[cfg(feature = "stats")] + Tui(#[from] flash::tui::TuiError), + + #[cfg(not(feature = "stats"))] + #[error("app error: error setting Ctrl-C handler: {0}")] + Ctrl(#[from] ctrlc::Error), + + #[error("app error: empty route and no fallback IP")] + EmptyRoute, + + #[error( + "app error: no of next NF MACs ({mac_count}) does not match no of next NFs ({ip_count})" + )] + MacIpMismatch { + mac_count: usize, + ip_count: usize, + }, +} diff --git a/examples/maglev-rs/src/main.rs b/examples/maglev-rs/src/main.rs index 1ae88f3..b7b4014 100644 --- a/examples/maglev-rs/src/main.rs +++ b/examples/maglev-rs/src/main.rs @@ -1,15 +1,16 @@ mod cli; +mod error; mod maglev; mod nf; use std::{ hash::BuildHasher, net::Ipv4Addr, + process::ExitCode, sync::{ Arc, atomic::{AtomicBool, Ordering}, }, - thread, }; use clap::Parser; @@ -20,7 +21,7 @@ use macaddr::MacAddr6; #[cfg(feature = "stats")] use flash::tui::StatsDashboard; -use crate::{cli::Cli, maglev::Maglev}; +use crate::{cli::Cli, error::AppError, maglev::Maglev}; const MAGLEV_TABLE_SIZE: usize = 65537; @@ -29,9 +30,9 @@ fn socket_thread( maglev: &Arc>, next_ip: &Arc>, next_mac: &Arc>, - run: &Arc, + stop: &Arc, ) { - while run.load(Ordering::SeqCst) { + while !stop.load(Ordering::Relaxed) { if !socket.poll().is_ok_and(|val| val) { continue; } @@ -63,137 +64,103 @@ fn socket_thread( } } -#[allow(clippy::too_many_lines)] -fn main() { - #[cfg(feature = "tracing")] - tracing_subscriber::fmt::init(); - - let cli = Cli::parse(); +fn run(mut cli: Cli) -> Result<(), AppError> { + let (sockets, mut monitor) = flash::connect(&cli.flash_config)?; - let (sockets, route) = match flash::connect(&cli.flash_config) { - Ok(t) => t, - Err(err) => { - eprintln!("{err}"); - return; - } - }; - - if sockets.is_empty() { - eprintln!("no sockets received"); - return; - } - - #[cfg(feature = "tracing")] - tracing::debug!("Sockets: {:?}", sockets); - - #[cfg(feature = "stats")] - let mut tui = match StatsDashboard::new( - sockets.iter().map(Socket::stats), - cli.stats.fps, - cli.stats.layout, - ) { - Ok(t) => t, - Err(err) => { - eprintln!("error creating tui: {err}"); - return; - } - }; - - let next_ip = if route.next.is_empty() { + let mut next_ip_addr = monitor.get_next_ip_addr()?; + if next_ip_addr.is_empty() { if let Some(fb_ip) = cli.fallback_ip { - vec![fb_ip] + next_ip_addr.push(fb_ip); } else { - eprintln!("empty route and no fallback IP configured"); - return; + return Err(AppError::EmptyRoute); } - } else { - route.next - }; + } - if cli.next_mac.len() > 1 && cli.next_mac.len() != next_ip.len() { - eprintln!( - "number of next NF MACs ({}) does not match number of next NFs ({})", - cli.next_mac.len(), - next_ip.len() - ); - return; + if cli.next_mac.len() > 1 && cli.next_mac.len() != next_ip_addr.len() { + return Err(AppError::MacIpMismatch { + mac_count: cli.next_mac.len(), + ip_count: next_ip_addr.len(), + }); } - let maglev = Arc::new(Maglev::::new(&next_ip, MAGLEV_TABLE_SIZE)); - let next_ip = Arc::new(next_ip); + let maglev = Arc::new(Maglev::::new( + &next_ip_addr, + MAGLEV_TABLE_SIZE, + )); + let next_ip = Arc::new(next_ip_addr); let next_mac = Arc::new(cli.next_mac); - let cores = core_affinity::get_core_ids() - .unwrap_or_default() - .into_iter() - .filter(|core_id| core_id.id >= cli.cpu_start && core_id.id <= cli.cpu_end) - .collect::>(); - - if cores.is_empty() { - eprintln!("no cores found in range {}-{}", cli.cpu_start, cli.cpu_end); - return; - } - - #[cfg(feature = "tracing")] - tracing::debug!("Cores: {:?}", cores); + let stop = Arc::new(AtomicBool::new(true)); #[cfg(feature = "stats")] - let Some(stats_core) = core_affinity::get_core_ids() - .unwrap_or_default() - .into_iter() - .find(|core_id| core_id.id == cli.stats.cpu) - else { - eprintln!("no core found for stats thread {}", cli.stats.cpu); - return; - }; + let mut tui = StatsDashboard::new( + sockets.iter().map(Socket::stats), + cli.stats.fps, + cli.stats.layout, + Some(stop.clone()), + )?; - let run = Arc::new(AtomicBool::new(true)); + #[cfg(feature = "stats")] + let stats_thread = cli.stats.cpu.spawn(move || { + if let Err(err) = tui.run() { + eprintln!("error dumping stats: {err}"); + } + }); #[cfg(not(feature = "stats"))] - if let Err(err) = { - let r = run.clone(); + { + let stop = stop.clone(); ctrlc::set_handler(move || { - r.store(false, Ordering::SeqCst); + stop.store(true, Ordering::Release); }) - } { - eprintln!("error setting Ctrl-C handler: {err}"); - return; - } + }?; + + let _ = { + let stop = stop.clone(); + monitor.spawn_disconnect_handler(move || { + stop.store(true, Ordering::Release); + }) + }; - let handles = sockets - .into_iter() - .zip(cores.into_iter().cycle()) - .map(|(socket, core_id)| { - let r = run.clone(); + let socket_threads = cli + .cpu_range + .spawn_multiple(sockets.into_iter().map(|socket| { + let stop = stop.clone(); let maglev = maglev.clone(); let next_ip = next_ip.clone(); let next_mac = next_mac.clone(); - thread::spawn(move || { - core_affinity::set_for_current(core_id); - socket_thread(socket, &maglev, &next_ip, &next_mac, &r); - }) - }) - .collect::>(); + move || socket_thread(socket, &maglev, &next_ip, &next_mac, &stop) + })); #[cfg(feature = "stats")] - if let Err(err) = thread::spawn(move || { - core_affinity::set_for_current(stats_core); - if let Err(err) = tui.run() { - eprintln!("error dumping stats: {err}"); - } - }) - .join() - { + if let Err(err) = stats_thread.join() { eprintln!("error in stats thread: {err:?}"); } #[cfg(feature = "stats")] - run.store(false, Ordering::SeqCst); + stop.store(true, Ordering::Release); - for handle in handles { + for handle in socket_threads { if let Err(err) = handle.join() { - eprintln!("error in thread: {err:?}"); + eprintln!("error in socket thread: {err:?}"); + } + } + + Ok(()) +} + +fn main() -> ExitCode { + #[cfg(feature = "tracing")] + tracing_subscriber::fmt::init(); + + let cli = Cli::parse(); + + match run(cli) { + Ok(()) => ExitCode::SUCCESS, + Err(err) => { + eprintln!("{err}"); + ExitCode::FAILURE } } } diff --git a/examples/simplefwd-rs/Cargo.toml b/examples/simplefwd-rs/Cargo.toml index a3146a9..5a52ff7 100644 --- a/examples/simplefwd-rs/Cargo.toml +++ b/examples/simplefwd-rs/Cargo.toml @@ -5,12 +5,13 @@ edition = "2024" [dependencies] clap = { version = "4.5.35", features = ["derive"] } -core_affinity = "0.8.3" ctrlc = { version = "3.4.5", optional = true } flash = { path = "../../lib/flash-rs", features = ["clap"] } macaddr = "1.0.1" +thiserror = "2.0.17" tracing = { version = "0.1.41", optional = true } tracing-subscriber = { version = "0.3.19", optional = true } +utils = { path = "../../lib/utils-rs" } [features] default = ["dep:ctrlc"] diff --git a/examples/simplefwd-rs/src/cli.rs b/examples/simplefwd-rs/src/cli.rs index e5de218..53aa2ad 100644 --- a/examples/simplefwd-rs/src/cli.rs +++ b/examples/simplefwd-rs/src/cli.rs @@ -1,9 +1,9 @@ -#[cfg(feature = "stats")] use std::str::FromStr as _; use clap::Parser; use flash::FlashConfig; use macaddr::MacAddr6; +use utils::CpuRing; #[cfg(feature = "stats")] use flash::tui::GridLayout; @@ -13,21 +13,8 @@ pub struct Cli { #[command(flatten)] pub flash_config: FlashConfig, - #[arg( - short = 'c', - long, - default_value_t = 0, - help = "Starting CPU core index for socket threads" - )] - pub cpu_start: usize, - - #[arg( - short = 'e', - long, - default_value_t = 0, - help = "Ending CPU core index for socket threads (inclusive)" - )] - pub cpu_end: usize, + #[arg(short = 'c', long, value_parser = CpuRing::from_str, help = "CPU core range for socket threads")] + pub cpu_range: CpuRing, #[cfg(feature = "stats")] #[command(flatten)] @@ -43,10 +30,9 @@ pub struct StatsConfig { #[arg( short = 's', long = "stats-cpu", - default_value_t = 1, help = "CPU core index for stats thread" )] - pub cpu: usize, + pub cpu: CpuRing, #[arg(short = 'F', long, default_value_t = 1, help = "Tui frames per second")] pub fps: u64, diff --git a/examples/simplefwd-rs/src/error.rs b/examples/simplefwd-rs/src/error.rs new file mode 100644 index 0000000..d42c20a --- /dev/null +++ b/examples/simplefwd-rs/src/error.rs @@ -0,0 +1,12 @@ +#[derive(Debug, thiserror::Error)] +#[error("app error: {0}")] +pub enum AppError { + Flash(#[from] flash::FlashError), + + #[cfg(feature = "stats")] + Tui(#[from] flash::tui::TuiError), + + #[cfg(not(feature = "stats"))] + #[error("app error: error setting Ctrl-C handler: {0}")] + Ctrl(#[from] ctrlc::Error), +} diff --git a/examples/simplefwd-rs/src/main.rs b/examples/simplefwd-rs/src/main.rs index 9ea3956..a1ef700 100644 --- a/examples/simplefwd-rs/src/main.rs +++ b/examples/simplefwd-rs/src/main.rs @@ -1,11 +1,12 @@ mod cli; +mod error; use std::{ + process::ExitCode, sync::{ Arc, atomic::{AtomicBool, Ordering}, }, - thread, }; use clap::Parser; @@ -15,10 +16,10 @@ use macaddr::MacAddr6; #[cfg(feature = "stats")] use flash::tui::StatsDashboard; -use crate::cli::Cli; +use crate::{cli::Cli, error::AppError}; -fn socket_thread(mut socket: Socket, mac_addr: Option, run: &Arc) { - while run.load(Ordering::SeqCst) { +fn socket_thread(mut socket: Socket, mac_addr: Option, stop: &Arc) { + while !stop.load(Ordering::Relaxed) { if !socket.poll().is_ok_and(|val| val) { continue; } @@ -50,108 +51,75 @@ fn socket_thread(mut socket: Socket, mac_addr: Option, run: &Arc sockets, - Err(err) => { - eprintln!("{err}"); - return; - } - }; - - if sockets.is_empty() { - eprintln!("no sockets received"); - return; - } - - #[cfg(feature = "tracing")] - tracing::info!("Sockets: {sockets:?}"); +fn run(mut cli: Cli) -> Result<(), AppError> { + let (sockets, monitor) = flash::connect(&cli.flash_config)?; + let stop = Arc::new(AtomicBool::new(true)); #[cfg(feature = "stats")] - let mut tui = match StatsDashboard::new( + let mut tui = StatsDashboard::new( sockets.iter().map(Socket::stats), cli.stats.fps, cli.stats.layout, - ) { - Ok(t) => t, - Err(err) => { - eprintln!("error creating tui: {err}"); - return; - } - }; - - let cores = core_affinity::get_core_ids() - .unwrap_or_default() - .into_iter() - .filter(|core_id| core_id.id >= cli.cpu_start && core_id.id <= cli.cpu_end) - .collect::>(); - - if cores.is_empty() { - eprintln!("no cores found in range {}-{}", cli.cpu_start, cli.cpu_end); - return; - } - - #[cfg(feature = "tracing")] - tracing::debug!("Cores: {:?}", cores); + Some(stop.clone()), + )?; #[cfg(feature = "stats")] - let Some(stats_core) = core_affinity::get_core_ids() - .unwrap_or_default() - .into_iter() - .find(|core_id| core_id.id == cli.stats.cpu) - else { - eprintln!("no core found for stats thread {}", cli.stats.cpu); - return; - }; - - let run = Arc::new(AtomicBool::new(true)); + let stats_thread = cli.stats.cpu.spawn(move || { + if let Err(err) = tui.run() { + eprintln!("error dumping stats: {err}"); + } + }); #[cfg(not(feature = "stats"))] - if let Err(err) = { - let r = run.clone(); + { + let stop = stop.clone(); ctrlc::set_handler(move || { - r.store(false, Ordering::SeqCst); + stop.store(true, Ordering::Release); }) - } { - eprintln!("error setting Ctrl-C handler: {err}"); - return; - } + }?; - let handles = sockets - .into_iter() - .zip(cores.into_iter().cycle()) - .map(|(socket, core_id)| { - let r = run.clone(); - thread::spawn(move || { - core_affinity::set_for_current(core_id); - socket_thread(socket, cli.mac_addr, &r); - }) + let _ = { + let stop = stop.clone(); + monitor.spawn_disconnect_handler(move || { + stop.store(true, Ordering::Release); }) - .collect::>(); + }; + + let socket_threads = cli + .cpu_range + .spawn_multiple(sockets.into_iter().map(|socket| { + let stop = stop.clone(); + move || socket_thread(socket, cli.mac_addr, &stop) + })); #[cfg(feature = "stats")] - if let Err(err) = thread::spawn(move || { - core_affinity::set_for_current(stats_core); - if let Err(err) = tui.run() { - eprintln!("error dumping stats: {err}"); - } - }) - .join() - { + if let Err(err) = stats_thread.join() { eprintln!("error in stats thread: {err:?}"); } #[cfg(feature = "stats")] - run.store(false, Ordering::SeqCst); + stop.store(true, Ordering::Release); - for handle in handles { + for handle in socket_threads { if let Err(err) = handle.join() { - eprintln!("error in thread: {err:?}"); + eprintln!("error in socket thread: {err:?}"); + } + } + + Ok(()) +} + +fn main() -> ExitCode { + #[cfg(feature = "tracing")] + tracing_subscriber::fmt::init(); + + let cli = Cli::parse(); + + match run(cli) { + Ok(()) => ExitCode::SUCCESS, + Err(err) => { + eprintln!("{err}"); + ExitCode::FAILURE } } } diff --git a/lib/flash-rs/src/client.rs b/lib/flash-rs/src/client.rs index 183cd23..c6863a7 100644 --- a/lib/flash-rs/src/client.rs +++ b/lib/flash-rs/src/client.rs @@ -1,10 +1,11 @@ -use std::{net::Ipv4Addr, str::FromStr, sync::Arc}; +use std::sync::Arc; use crate::{ config::{BindFlags, FlashConfig, Mode, PollConfig, SocketConfig, XskConfig}, error::FlashResult, fd::SocketFd, mem::{PollOutStatus, Umem}, + monitor::Monitor, uds::UdsClient, xsk::Socket, }; @@ -12,14 +13,8 @@ use crate::{ #[cfg(feature = "stats")] use crate::{config::XdpFlags, stats::Stats}; -#[derive(Debug)] -pub struct Route { - pub ip_addr: Ipv4Addr, - pub next: Vec, -} - #[allow(clippy::missing_errors_doc, clippy::too_many_lines)] -pub fn connect(config: &FlashConfig) -> FlashResult<(Vec, Route)> { +pub fn connect(config: &FlashConfig) -> FlashResult<(Vec, Monitor)> { let mut uds_client = UdsClient::new()?; let (umem_fd, total_sockets, umem_size, umem_scale) = @@ -91,17 +86,6 @@ pub fn connect(config: &FlashConfig) -> FlashResult<(Vec, Route)> { #[cfg(all(feature = "stats", feature = "tracing"))] tracing::debug!("Ifname: {ifname}"); - let route = Route { - ip_addr: Ipv4Addr::from_str(&uds_client.get_ip_addr()?)?, - next: uds_client - .get_dst_ip_addr()? - .iter() - .map(|y| Ipv4Addr::from_str(y)) - .collect::, _>>()?, - }; - - uds_client.set_nonblocking()?; - let xsk_config = XskConfig::new(bind_flags, mode); let next_size = uds_client.get_route_info()?; @@ -120,13 +104,9 @@ pub fn connect(config: &FlashConfig) -> FlashResult<(Vec, Route)> { let prev_nf = uds_client.get_prev_nf()?; let pollout_status = PollOutStatus::new(pollout_fd, pollout_size, config.nf_id, prev_nf)?; + let socket_config = Arc::new(SocketConfig::new(xsk_config, poll_config, pollout_status)); - let socket_config = Arc::new(SocketConfig::new( - xsk_config, - poll_config, - pollout_status, - uds_client, - )); + let uds_client = Arc::new(uds_client); let sockets = socket_info .into_iter() @@ -146,9 +126,10 @@ pub fn connect(config: &FlashConfig) -> FlashResult<(Vec, Route)> { #[cfg(feature = "stats")] Stats::new(fd, ifname.clone(), ifqueue, xdp_flags.clone()), socket_config.clone(), + uds_client.clone(), ) }) - .collect::, _>>()?; + .collect::>()?; - Ok((sockets, route)) + Ok((sockets, Monitor::new(uds_client))) } diff --git a/lib/flash-rs/src/config/socket.rs b/lib/flash-rs/src/config/socket.rs index e65889f..c0bf004 100644 --- a/lib/flash-rs/src/config/socket.rs +++ b/lib/flash-rs/src/config/socket.rs @@ -1,4 +1,4 @@ -use crate::{mem::PollOutStatus, uds::UdsClient}; +use crate::mem::PollOutStatus; use super::{poll::PollConfig, xsk::XskConfig}; @@ -7,21 +7,14 @@ pub(crate) struct SocketConfig { pub(crate) xsk: XskConfig, pub(crate) poll: PollConfig, pub(crate) pollout_status: PollOutStatus, - _uds_client: UdsClient, } impl SocketConfig { - pub(crate) fn new( - xsk: XskConfig, - poll: PollConfig, - pollout_status: PollOutStatus, - uds_client: UdsClient, - ) -> Self { + pub(crate) fn new(xsk: XskConfig, poll: PollConfig, pollout_status: PollOutStatus) -> Self { Self { xsk, poll, pollout_status, - _uds_client: uds_client, } } } diff --git a/lib/flash-rs/src/lib.rs b/lib/flash-rs/src/lib.rs index 4ac2bda..da89740 100644 --- a/lib/flash-rs/src/lib.rs +++ b/lib/flash-rs/src/lib.rs @@ -3,6 +3,7 @@ mod config; mod error; mod fd; mod mem; +mod monitor; mod uds; mod util; mod xsk; @@ -14,8 +15,5 @@ pub mod stats; pub mod tui; pub use crate::{ - client::{Route, connect}, - config::FlashConfig, - error::FlashError, - xsk::Socket, + client::connect, config::FlashConfig, error::FlashError, monitor::Monitor, xsk::Socket, }; diff --git a/lib/flash-rs/src/monitor.rs b/lib/flash-rs/src/monitor.rs new file mode 100644 index 0000000..c6a0a70 --- /dev/null +++ b/lib/flash-rs/src/monitor.rs @@ -0,0 +1,62 @@ +use std::{ + net::Ipv4Addr, + str::FromStr as _, + sync::Arc, + thread::{self, JoinHandle}, +}; + +use libc::{POLLERR, POLLHUP, POLLRDHUP, pollfd}; + +use crate::{error::FlashResult, uds::UdsClient}; + +pub struct Monitor { + uds_client: Arc, +} + +impl Monitor { + pub(crate) fn new(uds_client: Arc) -> Self { + Self { uds_client } + } + + #[allow(clippy::mut_from_ref)] + fn get_mut_client(&self) -> &mut UdsClient { + unsafe { &mut *Arc::as_ptr(&self.uds_client).cast_mut() } + } + + #[allow(clippy::missing_errors_doc)] + pub fn get_nf_ip_addr(&mut self) -> FlashResult { + Ok(Ipv4Addr::from_str(&self.get_mut_client().get_ip_addr()?)?) + } + + #[allow(clippy::missing_errors_doc)] + pub fn get_next_ip_addr(&mut self) -> FlashResult> { + Ok(self + .get_mut_client() + .get_dst_ip_addr()? + .iter() + .map(|y| Ipv4Addr::from_str(y)) + .collect::, _>>()?) + } + + pub fn spawn_disconnect_handler(&self, handler: F) -> JoinHandle<()> + where + F: FnOnce() + Send + 'static, + { + let fd = self.uds_client.get_conn_raw_fd(); + thread::spawn(move || unsafe { + let events = POLLHUP | POLLERR | POLLRDHUP; + let mut pollfd = pollfd { + fd, + events, + revents: 0, + }; + + loop { + if libc::poll(&raw mut pollfd, 1, -1) > 0 && (pollfd.revents & events) != 0 { + handler(); + break; + } + } + }) + } +} diff --git a/lib/flash-rs/src/tui/dashboard.rs b/lib/flash-rs/src/tui/dashboard.rs index 163bccf..7c3a8bc 100644 --- a/lib/flash-rs/src/tui/dashboard.rs +++ b/lib/flash-rs/src/tui/dashboard.rs @@ -1,6 +1,9 @@ use std::{ io, - sync::Arc, + sync::{ + Arc, + atomic::{AtomicBool, Ordering}, + }, time::{Duration, Instant}, }; @@ -8,7 +11,7 @@ use ratatui::{ Terminal, crossterm::{ ExecutableCommand, - event::{self, Event, KeyCode, KeyEvent, KeyEventKind}, + event::{self, Event, KeyCode, KeyEvent, KeyEventKind, KeyModifiers}, terminal::{self, EnterAlternateScreen, LeaveAlternateScreen}, }, prelude::CrosstermBackend, @@ -29,6 +32,7 @@ pub struct StatsDashboard { terminal: Terminal>, panels: Vec, layout_cache: LayoutCache, + stop: Option>, } impl StatsDashboard { @@ -37,6 +41,7 @@ impl StatsDashboard { stats: impl Iterator>, fps: u64, layout: GridLayout, + stop: Option>, ) -> TuiResult { let panels = stats .enumerate() @@ -56,6 +61,7 @@ impl StatsDashboard { terminal, panels, layout_cache: LayoutCache::new(layout, num_panels), + stop, }) } @@ -80,33 +86,47 @@ impl StatsDashboard { fn poll_until_next_frame(&mut self) -> io::Result { let next_frame_time = self.last_frame_time + self.frame_interval; - loop { + let flag = loop { + if let Some(stop) = &self.stop + && stop.load(Ordering::Relaxed) + { + break true; + } + let now = Instant::now(); if now >= next_frame_time { - break; + break false; } if event::poll(next_frame_time - now)? { match event::read()? { Event::Key(KeyEvent { code, + modifiers, kind: KeyEventKind::Press, .. }) => match code { - KeyCode::Char('q' | 'Q') | KeyCode::Esc => return Ok(true), - KeyCode::Char('r' | 'R') => return Ok(false), + KeyCode::Char('q' | 'Q') | KeyCode::Esc => break true, + KeyCode::Char('c' | 'C') if modifiers.contains(KeyModifiers::CONTROL) => { + break true; + } + KeyCode::Char('r' | 'R') => break false, _ => {} }, Event::Resize(width, height) => { self.resize_panels((width, height)); - return Ok(false); + break false; } _ => {} } } - } + }; - Ok(false) + Ok(flag + || self + .stop + .as_ref() + .is_some_and(|r| r.load(Ordering::Relaxed))) } #[allow(clippy::missing_errors_doc)] diff --git a/lib/flash-rs/src/uds/client.rs b/lib/flash-rs/src/uds/client.rs index b0cb728..7558019 100644 --- a/lib/flash-rs/src/uds/client.rs +++ b/lib/flash-rs/src/uds/client.rs @@ -1,3 +1,5 @@ +use std::os::fd::{AsRawFd as _, RawFd}; + use crate::util; use super::{ @@ -14,7 +16,7 @@ use super::{ const FLASH_UNIX_SOCKET_PATH: &str = "/tmp/flash/uds.sock"; #[derive(Debug)] -pub(crate) struct UdsClient { +pub struct UdsClient { conn: UdsConn, } @@ -26,6 +28,10 @@ impl UdsClient { }) } + pub(crate) fn get_conn_raw_fd(&self) -> RawFd { + self.conn.as_raw_fd() + } + #[allow( clippy::cast_possible_truncation, clippy::cast_possible_wrap, @@ -201,10 +207,6 @@ impl UdsClient { Ok(prev_nf_ids) } - - pub(crate) fn set_nonblocking(&mut self) -> UdsResult<()> { - Ok(self.conn.set_nonblocking(true)?) - } } impl Drop for UdsClient { diff --git a/lib/flash-rs/src/uds/conn.rs b/lib/flash-rs/src/uds/conn.rs index 888e745..7b74149 100644 --- a/lib/flash-rs/src/uds/conn.rs +++ b/lib/flash-rs/src/uds/conn.rs @@ -1,6 +1,9 @@ use std::{ io::{self, Read as _, Write as _}, - os::unix::net::UnixStream, + os::{ + fd::{AsRawFd, RawFd}, + unix::net::UnixStream, + }, path::Path, }; @@ -60,9 +63,11 @@ impl UdsConn { .trim_end_matches('\0') .to_string()) } +} +impl AsRawFd for UdsConn { #[inline] - pub(super) fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> { - self.0.set_nonblocking(nonblocking) + fn as_raw_fd(&self) -> RawFd { + self.0.as_raw_fd() } } diff --git a/lib/flash-rs/src/uds/mod.rs b/lib/flash-rs/src/uds/mod.rs index 05915dc..7cb4170 100644 --- a/lib/flash-rs/src/uds/mod.rs +++ b/lib/flash-rs/src/uds/mod.rs @@ -3,6 +3,4 @@ mod conn; mod def; mod error; -pub(crate) use client::UdsClient; - -pub use error::UdsError; +pub use {client::UdsClient, error::UdsError}; diff --git a/lib/flash-rs/src/xsk/socket.rs b/lib/flash-rs/src/xsk/socket.rs index 0034321..c66c2aa 100644 --- a/lib/flash-rs/src/xsk/socket.rs +++ b/lib/flash-rs/src/xsk/socket.rs @@ -6,6 +6,7 @@ use crate::{ config::{BindFlags, Mode, PollMode, SocketConfig}, fd::SocketFd, mem::{CompRing, Cons as _, Desc, FillRing, Prod as _, RxRing, TxRing, Umem}, + uds::UdsClient, }; #[cfg(feature = "pool")] @@ -31,13 +32,16 @@ pub struct Socket { outstanding_tx: u32, clock: Clock, idle_timestamp: Option, + config: Arc, + _uds_guard: Arc, #[cfg(feature = "stats")] stats: Arc, } impl Socket { + #[allow(clippy::too_many_arguments)] pub(crate) fn new( fd: SocketFd, umem: Umem, @@ -46,6 +50,7 @@ impl Socket { umem_offset: u64, #[cfg(feature = "stats")] stats: Stats, config: Arc, + uds_guard: Arc, ) -> SocketResult { let off = fd.xdp_mmap_offsets()?; @@ -73,7 +78,9 @@ impl Socket { outstanding_tx: 0, clock: Clock::new(), idle_timestamp: None, + config, + _uds_guard: uds_guard, #[cfg(feature = "stats")] stats: Arc::new(stats), diff --git a/lib/utils-rs/Cargo.toml b/lib/utils-rs/Cargo.toml new file mode 100644 index 0000000..c67fdf0 --- /dev/null +++ b/lib/utils-rs/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "utils" +version = "0.1.0" +edition = "2024" + +[dependencies] +core_affinity = "0.8.3" +thiserror = "2.0.17" diff --git a/lib/utils-rs/src/cpu.rs b/lib/utils-rs/src/cpu.rs new file mode 100644 index 0000000..d19b941 --- /dev/null +++ b/lib/utils-rs/src/cpu.rs @@ -0,0 +1,114 @@ +use std::{ + str::FromStr, + thread::{self, JoinHandle}, +}; + +use core_affinity::CoreId; + +use crate::error::{UtilError, UtilResult}; + +#[derive(Clone, Debug)] +pub struct CpuRing { + cores: Vec, + curr_idx: usize, +} + +impl CpuRing { + #[allow(clippy::missing_errors_doc)] + pub fn new() -> UtilResult { + Ok(Self { + cores: core_affinity::get_core_ids().ok_or(UtilError::NoCpuCores)?, + curr_idx: 0, + }) + } + + pub fn reset(&mut self) { + self.curr_idx = 0; + } + + pub fn set_affinity_next(&mut self) -> impl FnOnce() -> bool { + let core_id = self.cores[self.curr_idx]; + self.curr_idx = (self.curr_idx + 1) % self.cores.len(); + + move || core_affinity::set_for_current(core_id) + } + + #[allow(clippy::missing_errors_doc)] + pub fn spawn(&mut self, f: F) -> JoinHandle<()> + where + F: FnOnce() + Send + 'static, + { + if self.cores.is_empty() { + thread::spawn(f) + } else { + let core_id = self.cores[self.curr_idx]; + self.curr_idx = (self.curr_idx + 1) % self.cores.len(); + + thread::spawn(move || { + core_affinity::set_for_current(core_id); + f(); + }) + } + } + + #[allow(clippy::missing_errors_doc)] + pub fn spawn_multiple(&mut self, funcs: impl IntoIterator) -> Vec> + where + F: FnOnce() + Send + 'static, + { + if self.cores.is_empty() { + funcs.into_iter().map(|f| thread::spawn(f)).collect() + } else { + funcs + .into_iter() + .map(|f| { + let core_id = self.cores[self.curr_idx]; + self.curr_idx = (self.curr_idx + 1) % self.cores.len(); + + thread::spawn(move || { + core_affinity::set_for_current(core_id); + f(); + }) + }) + .collect() + } + } +} + +impl FromStr for CpuRing { + type Err = UtilError; + + fn from_str(s: &str) -> Result { + let available_cores = match core_affinity::get_core_ids() { + Some(cores) if !cores.is_empty() => cores, + _ => return Err(UtilError::NoCpuCores), + }; + + let mut cores = Vec::new(); + for part in s.split(',') { + if let Some((start, end)) = part.split_once('-') + && let Ok(start) = start.trim().parse::() + && let Ok(end) = end.trim().parse::() + && start <= end + { + for core in start..=end { + if let Some(core_id) = available_cores.iter().find(|c| c.id == core) { + cores.push(*core_id); + } else { + return Err(UtilError::CpuCoreNotFound(core)); + } + } + } else if let Ok(core) = part.trim().parse::() { + if let Some(core_id) = available_cores.iter().find(|c| c.id == core) { + cores.push(*core_id); + } else { + return Err(UtilError::CpuCoreNotFound(core)); + } + } else { + return Err(UtilError::InvalidCpuCoreRange(part.to_string())); + } + } + + Ok(Self { cores, curr_idx: 0 }) + } +} diff --git a/lib/utils-rs/src/error.rs b/lib/utils-rs/src/error.rs new file mode 100644 index 0000000..d60f8f5 --- /dev/null +++ b/lib/utils-rs/src/error.rs @@ -0,0 +1,14 @@ +pub(crate) type UtilResult = Result; + +#[derive(Debug, thiserror::Error)] +#[error("util error: {0}")] +pub enum UtilError { + #[error("util error: no CPU cores found")] + NoCpuCores, + + #[error("util error: invalid CPU core/range {0}")] + InvalidCpuCoreRange(String), + + #[error("util error: CPU core {0} not found")] + CpuCoreNotFound(usize), +} diff --git a/lib/utils-rs/src/lib.rs b/lib/utils-rs/src/lib.rs new file mode 100644 index 0000000..1bbd508 --- /dev/null +++ b/lib/utils-rs/src/lib.rs @@ -0,0 +1,4 @@ +mod cpu; +mod error; + +pub use {cpu::CpuRing, error::UtilError}; From b2524b6938d3024e4b25ccd8deee293659a02c17 Mon Sep 17 00:00:00 2001 From: Arghyadip Chakraborty Date: Fri, 16 Jan 2026 11:12:12 +0530 Subject: [PATCH 2/6] fix(rust): socket bugs --- lib/flash-rs/src/xsk/socket.rs | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/lib/flash-rs/src/xsk/socket.rs b/lib/flash-rs/src/xsk/socket.rs index c66c2aa..b23f3c2 100644 --- a/lib/flash-rs/src/xsk/socket.rs +++ b/lib/flash-rs/src/xsk/socket.rs @@ -273,14 +273,12 @@ impl Socket { } else { self.complete_tx_rx(); - if self.shared.xsk_config.mode.contains(Mode::FLASH_BUSY_POLL) - || self.tx.needs_wakeup() - { + if self.config.xsk.mode.contains(Mode::FLASH_BUSY_POLL) || self.tx.needs_wakeup() { #[cfg(feature = "stats")] unsafe { (*self.stats.app.get()).tx_wakeup_sendtos += 1; } - let _ = self.fd.kick(); + self.fd.kick_tx(); } } } @@ -327,7 +325,7 @@ impl Socket { } #[cfg(feature = "tracing")] - if rcvd > self.shared.xsk_config.batch_size { + if rcvd > self.config.xsk.batch_size { tracing::warn!("xsk: received more descriptors than batch size"); } @@ -363,14 +361,12 @@ impl Socket { } else { self.complete_tx_rx(); - if self.shared.xsk_config.mode.contains(Mode::FLASH_BUSY_POLL) - || self.tx.needs_wakeup() - { + if self.config.xsk.mode.contains(Mode::FLASH_BUSY_POLL) || self.tx.needs_wakeup() { #[cfg(feature = "stats")] unsafe { (*self.stats.app.get()).tx_wakeup_sendtos += 1; } - let _ = self.fd.kick(); + self.fd.kick_tx(); } } } From 12349fe5686fd4130ec078094e67ff3bb85808e9 Mon Sep 17 00:00:00 2001 From: Arghyadip Chakraborty Date: Fri, 16 Jan 2026 22:13:04 +0530 Subject: [PATCH 3/6] fix(rust): uds client closing error logging --- lib/flash-rs/src/uds/client.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/lib/flash-rs/src/uds/client.rs b/lib/flash-rs/src/uds/client.rs index 7558019..101bd40 100644 --- a/lib/flash-rs/src/uds/client.rs +++ b/lib/flash-rs/src/uds/client.rs @@ -211,14 +211,11 @@ impl UdsClient { impl Drop for UdsClient { fn drop(&mut self) { - #[cfg(feature = "tracing")] if let Err(err) = self.conn.write_all(&FLASH_CLOSE_CONN) { + #[cfg(feature = "tracing")] tracing::error!("error closing flash connection: {err}"); - } else { - tracing::debug!("Sent FLASH_CLOSE_CONN: {FLASH_CLOSE_CONN:?}"); - } - if let Err(err) = self.conn.write_all(&FLASH_CLOSE_CONN) { + #[cfg(not(feature = "tracing"))] eprintln!("error closing flash connection: {err}"); } } From 9f6e2c9b5b1ef94627f4393241bbe917a67d212f Mon Sep 17 00:00:00 2001 From: Arghyadip Chakraborty Date: Sat, 17 Jan 2026 21:44:08 +0530 Subject: [PATCH 4/6] feat(rust): CpuRange - default and all - renamed CpuRing to CpuRange - added default and all CpuRange - fixed formatting --- examples/arpresolver-rs/src/cli.rs | 22 ++++++++++---- examples/firewall-rs/src/cli.rs | 22 ++++++++++---- examples/ip4ping-rs/src/cli.rs | 22 ++++++++++---- examples/l2fwd-rs/src/cli.rs | 22 ++++++++++---- examples/maglev-rs/src/cli.rs | 22 ++++++++++---- examples/simplefwd-rs/src/cli.rs | 22 ++++++++++---- lib/flash-rs/src/tui/layout_str.rs | 4 +-- lib/utils-rs/src/cpu.rs | 47 +++++++++++++++++++++++++----- lib/utils-rs/src/lib.rs | 2 +- 9 files changed, 145 insertions(+), 40 deletions(-) diff --git a/examples/arpresolver-rs/src/cli.rs b/examples/arpresolver-rs/src/cli.rs index eb32532..949ee48 100644 --- a/examples/arpresolver-rs/src/cli.rs +++ b/examples/arpresolver-rs/src/cli.rs @@ -3,7 +3,7 @@ use std::str::FromStr as _; use clap::Parser; use flash::FlashConfig; use macaddr::MacAddr6; -use utils::CpuRing; +use utils::CpuRange; #[cfg(feature = "stats")] use flash::tui::GridLayout; @@ -13,8 +13,14 @@ pub struct Cli { #[command(flatten)] pub flash_config: FlashConfig, - #[arg(short = 'c', long, value_parser = CpuRing::from_str, help = "CPU core range for socket threads")] - pub cpu_range: CpuRing, + #[arg( + short = 'c', + long, + default_value_t = CpuRange::default(), + value_parser = CpuRange::from_str, + help = "CPU core range for socket threads" + )] + pub cpu_range: CpuRange, #[arg(short = 'M', long, help = "NF MAC address")] pub nf_mac: MacAddr6, @@ -35,11 +41,17 @@ pub struct StatsConfig { long = "stats-cpu", help = "CPU core index for stats thread" )] - pub cpu: CpuRing, + pub cpu: CpuRange, #[arg(short = 'F', long, default_value_t = 1, help = "Tui frames per second")] pub fps: u64, - #[arg(short = 'l', long, default_value_t = GridLayout::default(), value_parser = GridLayout::from_str, help = "Tui layout")] + #[arg( + short = 'l', + long, + default_value_t = GridLayout::default(), + value_parser = GridLayout::from_str, + help = "Tui layout" + )] pub layout: GridLayout, } diff --git a/examples/firewall-rs/src/cli.rs b/examples/firewall-rs/src/cli.rs index ac9a5da..7815e56 100644 --- a/examples/firewall-rs/src/cli.rs +++ b/examples/firewall-rs/src/cli.rs @@ -3,7 +3,7 @@ use std::{path::PathBuf, str::FromStr as _}; use clap::Parser; use flash::FlashConfig; use macaddr::MacAddr6; -use utils::CpuRing; +use utils::CpuRange; #[cfg(feature = "stats")] use flash::tui::GridLayout; @@ -13,8 +13,14 @@ pub struct Cli { #[command(flatten)] pub flash_config: FlashConfig, - #[arg(short = 'c', long, value_parser = CpuRing::from_str, help = "CPU core range for socket threads")] - pub cpu_range: CpuRing, + #[arg( + short = 'c', + long, + default_value_t = CpuRange::default(), + value_parser = CpuRange::from_str, + help = "CPU core range for socket threads" + )] + pub cpu_range: CpuRange, #[arg(short, long, help = "Path to denylist csv file")] pub denylist: PathBuf, @@ -35,11 +41,17 @@ pub struct StatsConfig { long = "stats-cpu", help = "CPU core index for stats thread" )] - pub cpu: CpuRing, + pub cpu: CpuRange, #[arg(short = 'F', long, default_value_t = 1, help = "Tui frames per second")] pub fps: u64, - #[arg(short = 'l', long, default_value_t = GridLayout::default(), value_parser = GridLayout::from_str, help = "Tui layout")] + #[arg( + short = 'l', + long, + default_value_t = GridLayout::default(), + value_parser = GridLayout::from_str, + help = "Tui layout" + )] pub layout: GridLayout, } diff --git a/examples/ip4ping-rs/src/cli.rs b/examples/ip4ping-rs/src/cli.rs index 31391d6..6feae1c 100644 --- a/examples/ip4ping-rs/src/cli.rs +++ b/examples/ip4ping-rs/src/cli.rs @@ -3,7 +3,7 @@ use std::str::FromStr as _; use clap::Parser; use flash::FlashConfig; use macaddr::MacAddr6; -use utils::CpuRing; +use utils::CpuRange; #[cfg(feature = "stats")] use flash::tui::GridLayout; @@ -13,8 +13,14 @@ pub struct Cli { #[command(flatten)] pub flash_config: FlashConfig, - #[arg(short = 'c', long, value_parser = CpuRing::from_str, help = "CPU core range for socket threads")] - pub cpu_range: CpuRing, + #[arg( + short = 'c', + long, + default_value_t = CpuRange::default(), + value_parser = CpuRange::from_str, + help = "CPU core range for socket threads" + )] + pub cpu_range: CpuRange, #[arg(short = 'm', long, help = "Dest MAC address")] pub mac_addr: Option, @@ -32,11 +38,17 @@ pub struct StatsConfig { long = "stats-cpu", help = "CPU core index for stats thread" )] - pub cpu: CpuRing, + pub cpu: CpuRange, #[arg(short = 'F', long, default_value_t = 1, help = "Tui frames per second")] pub fps: u64, - #[arg(short = 'l', long, default_value_t = GridLayout::default(), value_parser = GridLayout::from_str, help = "Tui layout")] + #[arg( + short = 'l', + long, + default_value_t = GridLayout::default(), + value_parser = GridLayout::from_str, + help = "Tui layout" + )] pub layout: GridLayout, } diff --git a/examples/l2fwd-rs/src/cli.rs b/examples/l2fwd-rs/src/cli.rs index 53aa2ad..f3fa940 100644 --- a/examples/l2fwd-rs/src/cli.rs +++ b/examples/l2fwd-rs/src/cli.rs @@ -3,7 +3,7 @@ use std::str::FromStr as _; use clap::Parser; use flash::FlashConfig; use macaddr::MacAddr6; -use utils::CpuRing; +use utils::CpuRange; #[cfg(feature = "stats")] use flash::tui::GridLayout; @@ -13,8 +13,14 @@ pub struct Cli { #[command(flatten)] pub flash_config: FlashConfig, - #[arg(short = 'c', long, value_parser = CpuRing::from_str, help = "CPU core range for socket threads")] - pub cpu_range: CpuRing, + #[arg( + short = 'c', + long, + default_value_t = CpuRange::default(), + value_parser = CpuRange::from_str, + help = "CPU core range for socket threads" + )] + pub cpu_range: CpuRange, #[cfg(feature = "stats")] #[command(flatten)] @@ -32,11 +38,17 @@ pub struct StatsConfig { long = "stats-cpu", help = "CPU core index for stats thread" )] - pub cpu: CpuRing, + pub cpu: CpuRange, #[arg(short = 'F', long, default_value_t = 1, help = "Tui frames per second")] pub fps: u64, - #[arg(short = 'l', long, default_value_t = GridLayout::default(), value_parser = GridLayout::from_str, help = "Tui layout")] + #[arg( + short = 'l', + long, + default_value_t = GridLayout::default(), + value_parser = GridLayout::from_str, + help = "Tui layout" + )] pub layout: GridLayout, } diff --git a/examples/maglev-rs/src/cli.rs b/examples/maglev-rs/src/cli.rs index 64bf3d0..a2cad23 100644 --- a/examples/maglev-rs/src/cli.rs +++ b/examples/maglev-rs/src/cli.rs @@ -3,7 +3,7 @@ use std::{net::Ipv4Addr, str::FromStr as _}; use clap::Parser; use flash::FlashConfig; use macaddr::MacAddr6; -use utils::CpuRing; +use utils::CpuRange; #[cfg(feature = "stats")] use flash::tui::GridLayout; @@ -13,8 +13,14 @@ pub struct Cli { #[command(flatten)] pub flash_config: FlashConfig, - #[arg(short = 'c', long, value_parser = CpuRing::from_str, help = "CPU core range for socket threads")] - pub cpu_range: CpuRing, + #[arg( + short = 'c', + long, + default_value_t = CpuRange::default(), + value_parser = CpuRange::from_str, + help = "CPU core range for socket threads" + )] + pub cpu_range: CpuRange, #[cfg(feature = "stats")] #[command(flatten)] @@ -35,11 +41,17 @@ pub struct StatsConfig { long = "stats-cpu", help = "CPU core index for stats thread" )] - pub cpu: CpuRing, + pub cpu: CpuRange, #[arg(short = 'F', long, default_value_t = 1, help = "Tui frames per second")] pub fps: u64, - #[arg(short = 'l', long, default_value_t = GridLayout::default(), value_parser = GridLayout::from_str, help = "Tui layout")] + #[arg( + short = 'l', + long, + default_value_t = GridLayout::default(), + value_parser = GridLayout::from_str, + help = "Tui layout" + )] pub layout: GridLayout, } diff --git a/examples/simplefwd-rs/src/cli.rs b/examples/simplefwd-rs/src/cli.rs index 53aa2ad..f3fa940 100644 --- a/examples/simplefwd-rs/src/cli.rs +++ b/examples/simplefwd-rs/src/cli.rs @@ -3,7 +3,7 @@ use std::str::FromStr as _; use clap::Parser; use flash::FlashConfig; use macaddr::MacAddr6; -use utils::CpuRing; +use utils::CpuRange; #[cfg(feature = "stats")] use flash::tui::GridLayout; @@ -13,8 +13,14 @@ pub struct Cli { #[command(flatten)] pub flash_config: FlashConfig, - #[arg(short = 'c', long, value_parser = CpuRing::from_str, help = "CPU core range for socket threads")] - pub cpu_range: CpuRing, + #[arg( + short = 'c', + long, + default_value_t = CpuRange::default(), + value_parser = CpuRange::from_str, + help = "CPU core range for socket threads" + )] + pub cpu_range: CpuRange, #[cfg(feature = "stats")] #[command(flatten)] @@ -32,11 +38,17 @@ pub struct StatsConfig { long = "stats-cpu", help = "CPU core index for stats thread" )] - pub cpu: CpuRing, + pub cpu: CpuRange, #[arg(short = 'F', long, default_value_t = 1, help = "Tui frames per second")] pub fps: u64, - #[arg(short = 'l', long, default_value_t = GridLayout::default(), value_parser = GridLayout::from_str, help = "Tui layout")] + #[arg( + short = 'l', + long, + default_value_t = GridLayout::default(), + value_parser = GridLayout::from_str, + help = "Tui layout" + )] pub layout: GridLayout, } diff --git a/lib/flash-rs/src/tui/layout_str.rs b/lib/flash-rs/src/tui/layout_str.rs index 00dc968..2977e70 100644 --- a/lib/flash-rs/src/tui/layout_str.rs +++ b/lib/flash-rs/src/tui/layout_str.rs @@ -1,4 +1,4 @@ -use std::{fmt, str::FromStr}; +use std::{fmt, str}; use super::layout::GridLayout; @@ -14,7 +14,7 @@ impl GridLayoutParseError { } } -impl FromStr for GridLayout { +impl str::FromStr for GridLayout { type Err = GridLayoutParseError; fn from_str(s: &str) -> Result { diff --git a/lib/utils-rs/src/cpu.rs b/lib/utils-rs/src/cpu.rs index d19b941..e57a9d8 100644 --- a/lib/utils-rs/src/cpu.rs +++ b/lib/utils-rs/src/cpu.rs @@ -1,5 +1,5 @@ use std::{ - str::FromStr, + fmt, str, thread::{self, JoinHandle}, }; @@ -7,13 +7,13 @@ use core_affinity::CoreId; use crate::error::{UtilError, UtilResult}; -#[derive(Clone, Debug)] -pub struct CpuRing { +#[derive(Clone, Debug, Default)] +pub struct CpuRange { cores: Vec, curr_idx: usize, } -impl CpuRing { +impl CpuRange { #[allow(clippy::missing_errors_doc)] pub fn new() -> UtilResult { Ok(Self { @@ -22,6 +22,14 @@ impl CpuRing { }) } + #[allow(clippy::missing_errors_doc)] + pub fn all() -> UtilResult { + match core_affinity::get_core_ids() { + Some(cores) if !cores.is_empty() => Ok(Self { cores, curr_idx: 0 }), + _ => Err(UtilError::NoCpuCores), + } + } + pub fn reset(&mut self) { self.curr_idx = 0; } @@ -33,7 +41,6 @@ impl CpuRing { move || core_affinity::set_for_current(core_id) } - #[allow(clippy::missing_errors_doc)] pub fn spawn(&mut self, f: F) -> JoinHandle<()> where F: FnOnce() + Send + 'static, @@ -51,7 +58,6 @@ impl CpuRing { } } - #[allow(clippy::missing_errors_doc)] pub fn spawn_multiple(&mut self, funcs: impl IntoIterator) -> Vec> where F: FnOnce() + Send + 'static, @@ -75,7 +81,7 @@ impl CpuRing { } } -impl FromStr for CpuRing { +impl str::FromStr for CpuRange { type Err = UtilError; fn from_str(s: &str) -> Result { @@ -112,3 +118,30 @@ impl FromStr for CpuRing { Ok(Self { cores, curr_idx: 0 }) } } + +impl fmt::Display for CpuRange { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut parts = Vec::new(); + let mut i = 0; + + while i < self.cores.len() { + let start = self.cores[i].id; + let mut end = start; + + while i + 1 < self.cores.len() && self.cores[i + 1].id == end + 1 { + end = self.cores[i + 1].id; + i += 1; + } + + if start == end { + parts.push(format!("{start}")); + } else { + parts.push(format!("{start}-{end}")); + } + + i += 1; + } + + write!(f, "{}", parts.join(",")) + } +} diff --git a/lib/utils-rs/src/lib.rs b/lib/utils-rs/src/lib.rs index 1bbd508..f45652c 100644 --- a/lib/utils-rs/src/lib.rs +++ b/lib/utils-rs/src/lib.rs @@ -1,4 +1,4 @@ mod cpu; mod error; -pub use {cpu::CpuRing, error::UtilError}; +pub use {cpu::CpuRange, error::UtilError}; From bd6be6cd4efbe39d1d148047da8a871efc88802c Mon Sep 17 00:00:00 2001 From: Arghyadip Chakraborty Date: Sat, 17 Jan 2026 23:21:59 +0530 Subject: [PATCH 5/6] fix(rust): cli parsing - added default stats threads - updated CpuRange, empty cores - removed implicit clap value_parser --- examples/arpresolver-rs/src/cli.rs | 5 +- examples/firewall-rs/src/cli.rs | 5 +- examples/ip4ping-rs/src/cli.rs | 5 +- examples/l2fwd-rs/src/cli.rs | 5 +- examples/maglev-rs/src/cli.rs | 5 +- examples/simplefwd-rs/src/cli.rs | 5 +- lib/utils-rs/src/cpu.rs | 112 +++++++++++++++++++---------- 7 files changed, 82 insertions(+), 60 deletions(-) diff --git a/examples/arpresolver-rs/src/cli.rs b/examples/arpresolver-rs/src/cli.rs index 949ee48..b3cb38b 100644 --- a/examples/arpresolver-rs/src/cli.rs +++ b/examples/arpresolver-rs/src/cli.rs @@ -1,5 +1,3 @@ -use std::str::FromStr as _; - use clap::Parser; use flash::FlashConfig; use macaddr::MacAddr6; @@ -17,7 +15,6 @@ pub struct Cli { short = 'c', long, default_value_t = CpuRange::default(), - value_parser = CpuRange::from_str, help = "CPU core range for socket threads" )] pub cpu_range: CpuRange, @@ -39,6 +36,7 @@ pub struct StatsConfig { #[arg( short = 's', long = "stats-cpu", + default_value_t = CpuRange::default(), help = "CPU core index for stats thread" )] pub cpu: CpuRange, @@ -50,7 +48,6 @@ pub struct StatsConfig { short = 'l', long, default_value_t = GridLayout::default(), - value_parser = GridLayout::from_str, help = "Tui layout" )] pub layout: GridLayout, diff --git a/examples/firewall-rs/src/cli.rs b/examples/firewall-rs/src/cli.rs index 7815e56..fbdb1fc 100644 --- a/examples/firewall-rs/src/cli.rs +++ b/examples/firewall-rs/src/cli.rs @@ -1,4 +1,4 @@ -use std::{path::PathBuf, str::FromStr as _}; +use std::path::PathBuf; use clap::Parser; use flash::FlashConfig; @@ -17,7 +17,6 @@ pub struct Cli { short = 'c', long, default_value_t = CpuRange::default(), - value_parser = CpuRange::from_str, help = "CPU core range for socket threads" )] pub cpu_range: CpuRange, @@ -39,6 +38,7 @@ pub struct StatsConfig { #[arg( short = 's', long = "stats-cpu", + default_value_t = CpuRange::default(), help = "CPU core index for stats thread" )] pub cpu: CpuRange, @@ -50,7 +50,6 @@ pub struct StatsConfig { short = 'l', long, default_value_t = GridLayout::default(), - value_parser = GridLayout::from_str, help = "Tui layout" )] pub layout: GridLayout, diff --git a/examples/ip4ping-rs/src/cli.rs b/examples/ip4ping-rs/src/cli.rs index 6feae1c..931335d 100644 --- a/examples/ip4ping-rs/src/cli.rs +++ b/examples/ip4ping-rs/src/cli.rs @@ -1,5 +1,3 @@ -use std::str::FromStr as _; - use clap::Parser; use flash::FlashConfig; use macaddr::MacAddr6; @@ -17,7 +15,6 @@ pub struct Cli { short = 'c', long, default_value_t = CpuRange::default(), - value_parser = CpuRange::from_str, help = "CPU core range for socket threads" )] pub cpu_range: CpuRange, @@ -36,6 +33,7 @@ pub struct StatsConfig { #[arg( short = 's', long = "stats-cpu", + default_value_t = CpuRange::default(), help = "CPU core index for stats thread" )] pub cpu: CpuRange, @@ -47,7 +45,6 @@ pub struct StatsConfig { short = 'l', long, default_value_t = GridLayout::default(), - value_parser = GridLayout::from_str, help = "Tui layout" )] pub layout: GridLayout, diff --git a/examples/l2fwd-rs/src/cli.rs b/examples/l2fwd-rs/src/cli.rs index f3fa940..581183a 100644 --- a/examples/l2fwd-rs/src/cli.rs +++ b/examples/l2fwd-rs/src/cli.rs @@ -1,5 +1,3 @@ -use std::str::FromStr as _; - use clap::Parser; use flash::FlashConfig; use macaddr::MacAddr6; @@ -17,7 +15,6 @@ pub struct Cli { short = 'c', long, default_value_t = CpuRange::default(), - value_parser = CpuRange::from_str, help = "CPU core range for socket threads" )] pub cpu_range: CpuRange, @@ -36,6 +33,7 @@ pub struct StatsConfig { #[arg( short = 's', long = "stats-cpu", + default_value_t = CpuRange::default(), help = "CPU core index for stats thread" )] pub cpu: CpuRange, @@ -47,7 +45,6 @@ pub struct StatsConfig { short = 'l', long, default_value_t = GridLayout::default(), - value_parser = GridLayout::from_str, help = "Tui layout" )] pub layout: GridLayout, diff --git a/examples/maglev-rs/src/cli.rs b/examples/maglev-rs/src/cli.rs index a2cad23..32357d4 100644 --- a/examples/maglev-rs/src/cli.rs +++ b/examples/maglev-rs/src/cli.rs @@ -1,4 +1,4 @@ -use std::{net::Ipv4Addr, str::FromStr as _}; +use std::net::Ipv4Addr; use clap::Parser; use flash::FlashConfig; @@ -17,7 +17,6 @@ pub struct Cli { short = 'c', long, default_value_t = CpuRange::default(), - value_parser = CpuRange::from_str, help = "CPU core range for socket threads" )] pub cpu_range: CpuRange, @@ -39,6 +38,7 @@ pub struct StatsConfig { #[arg( short = 's', long = "stats-cpu", + default_value_t = CpuRange::default(), help = "CPU core index for stats thread" )] pub cpu: CpuRange, @@ -50,7 +50,6 @@ pub struct StatsConfig { short = 'l', long, default_value_t = GridLayout::default(), - value_parser = GridLayout::from_str, help = "Tui layout" )] pub layout: GridLayout, diff --git a/examples/simplefwd-rs/src/cli.rs b/examples/simplefwd-rs/src/cli.rs index f3fa940..581183a 100644 --- a/examples/simplefwd-rs/src/cli.rs +++ b/examples/simplefwd-rs/src/cli.rs @@ -1,5 +1,3 @@ -use std::str::FromStr as _; - use clap::Parser; use flash::FlashConfig; use macaddr::MacAddr6; @@ -17,7 +15,6 @@ pub struct Cli { short = 'c', long, default_value_t = CpuRange::default(), - value_parser = CpuRange::from_str, help = "CPU core range for socket threads" )] pub cpu_range: CpuRange, @@ -36,6 +33,7 @@ pub struct StatsConfig { #[arg( short = 's', long = "stats-cpu", + default_value_t = CpuRange::default(), help = "CPU core index for stats thread" )] pub cpu: CpuRange, @@ -47,7 +45,6 @@ pub struct StatsConfig { short = 'l', long, default_value_t = GridLayout::default(), - value_parser = GridLayout::from_str, help = "Tui layout" )] pub layout: GridLayout, diff --git a/lib/utils-rs/src/cpu.rs b/lib/utils-rs/src/cpu.rs index e57a9d8..47f828a 100644 --- a/lib/utils-rs/src/cpu.rs +++ b/lib/utils-rs/src/cpu.rs @@ -13,21 +13,44 @@ pub struct CpuRange { curr_idx: usize, } +fn get_available_cores() -> UtilResult> { + match core_affinity::get_core_ids() { + Some(cores) if !cores.is_empty() => Ok(cores), + _ => Err(UtilError::NoCpuCores), + } +} + impl CpuRange { #[allow(clippy::missing_errors_doc)] - pub fn new() -> UtilResult { - Ok(Self { - cores: core_affinity::get_core_ids().ok_or(UtilError::NoCpuCores)?, - curr_idx: 0, - }) + pub fn new(cores: impl IntoIterator) -> UtilResult { + let available_cores = get_available_cores()?; + let cores = cores + .into_iter() + .map(|core| { + available_cores + .iter() + .find(|c| c.id == core) + .copied() + .ok_or(UtilError::CpuCoreNotFound(core)) + }) + .collect::, _>>()?; + + Ok(cores.into()) } #[allow(clippy::missing_errors_doc)] pub fn all() -> UtilResult { - match core_affinity::get_core_ids() { - Some(cores) if !cores.is_empty() => Ok(Self { cores, curr_idx: 0 }), - _ => Err(UtilError::NoCpuCores), - } + Ok(get_available_cores()?.into()) + } + + #[allow(clippy::must_use_candidate)] + pub fn len(&self) -> usize { + self.cores.len() + } + + #[allow(clippy::must_use_candidate)] + pub fn is_empty(&self) -> bool { + self.cores.is_empty() } pub fn reset(&mut self) { @@ -46,38 +69,45 @@ impl CpuRange { F: FnOnce() + Send + 'static, { if self.cores.is_empty() { - thread::spawn(f) - } else { - let core_id = self.cores[self.curr_idx]; - self.curr_idx = (self.curr_idx + 1) % self.cores.len(); - - thread::spawn(move || { - core_affinity::set_for_current(core_id); - f(); - }) + return thread::spawn(f); } + + let core_id = self.cores[self.curr_idx]; + self.curr_idx = (self.curr_idx + 1) % self.cores.len(); + + thread::spawn(move || { + core_affinity::set_for_current(core_id); + f(); + }) } - pub fn spawn_multiple(&mut self, funcs: impl IntoIterator) -> Vec> + pub fn spawn_multiple(&mut self, funcs: I) -> Vec> where F: FnOnce() + Send + 'static, + I: IntoIterator, { if self.cores.is_empty() { - funcs.into_iter().map(|f| thread::spawn(f)).collect() - } else { - funcs - .into_iter() - .map(|f| { - let core_id = self.cores[self.curr_idx]; - self.curr_idx = (self.curr_idx + 1) % self.cores.len(); - - thread::spawn(move || { - core_affinity::set_for_current(core_id); - f(); - }) - }) - .collect() + return funcs.into_iter().map(|f| thread::spawn(f)).collect(); } + + funcs + .into_iter() + .map(|f| { + let core_id = self.cores[self.curr_idx]; + self.curr_idx = (self.curr_idx + 1) % self.cores.len(); + + thread::spawn(move || { + core_affinity::set_for_current(core_id); + f(); + }) + }) + .collect() + } +} + +impl From> for CpuRange { + fn from(cores: Vec) -> Self { + Self { cores, curr_idx: 0 } } } @@ -85,12 +115,14 @@ impl str::FromStr for CpuRange { type Err = UtilError; fn from_str(s: &str) -> Result { - let available_cores = match core_affinity::get_core_ids() { - Some(cores) if !cores.is_empty() => cores, - _ => return Err(UtilError::NoCpuCores), - }; + let s = s.trim(); + if s.is_empty() { + return Ok(CpuRange::default()); + } + let available_cores = get_available_cores()?; let mut cores = Vec::new(); + for part in s.split(',') { if let Some((start, end)) = part.split_once('-') && let Ok(start) = start.trim().parse::() @@ -115,12 +147,16 @@ impl str::FromStr for CpuRange { } } - Ok(Self { cores, curr_idx: 0 }) + Ok(cores.into()) } } impl fmt::Display for CpuRange { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + if self.cores.is_empty() { + return write!(f, "none"); + } + let mut parts = Vec::new(); let mut i = 0; From 8595bfadf9a2fafe6fca96d5818eccc6c1d3d2c4 Mon Sep 17 00:00:00 2001 From: Arghyadip Chakraborty Date: Sun, 18 Jan 2026 00:29:28 +0530 Subject: [PATCH 6/6] fix(rust): default cpu range --- examples/arpresolver-rs/src/cli.rs | 12 +++--------- examples/arpresolver-rs/src/main.rs | 5 +++-- examples/firewall-rs/src/cli.rs | 12 +++--------- examples/firewall-rs/src/main.rs | 5 +++-- examples/ip4ping-rs/src/cli.rs | 12 +++--------- examples/ip4ping-rs/src/main.rs | 5 +++-- examples/l2fwd-rs/src/cli.rs | 12 +++--------- examples/l2fwd-rs/src/main.rs | 5 +++-- examples/maglev-rs/src/cli.rs | 12 +++--------- examples/maglev-rs/src/main.rs | 5 +++-- examples/simplefwd-rs/src/cli.rs | 12 +++--------- examples/simplefwd-rs/src/main.rs | 5 +++-- lib/utils-rs/src/cpu.rs | 4 ---- 13 files changed, 36 insertions(+), 70 deletions(-) diff --git a/examples/arpresolver-rs/src/cli.rs b/examples/arpresolver-rs/src/cli.rs index b3cb38b..7b1729b 100644 --- a/examples/arpresolver-rs/src/cli.rs +++ b/examples/arpresolver-rs/src/cli.rs @@ -11,13 +11,8 @@ pub struct Cli { #[command(flatten)] pub flash_config: FlashConfig, - #[arg( - short = 'c', - long, - default_value_t = CpuRange::default(), - help = "CPU core range for socket threads" - )] - pub cpu_range: CpuRange, + #[arg(short = 'c', long, help = "CPU core range for socket threads")] + pub cpu_range: Option, #[arg(short = 'M', long, help = "NF MAC address")] pub nf_mac: MacAddr6, @@ -36,10 +31,9 @@ pub struct StatsConfig { #[arg( short = 's', long = "stats-cpu", - default_value_t = CpuRange::default(), help = "CPU core index for stats thread" )] - pub cpu: CpuRange, + pub cpu: Option, #[arg(short = 'F', long, default_value_t = 1, help = "Tui frames per second")] pub fps: u64, diff --git a/examples/arpresolver-rs/src/main.rs b/examples/arpresolver-rs/src/main.rs index 53ad412..25a41de 100644 --- a/examples/arpresolver-rs/src/main.rs +++ b/examples/arpresolver-rs/src/main.rs @@ -61,7 +61,7 @@ fn socket_thread( } } -fn run(mut cli: Cli) -> Result<(), AppError> { +fn run(cli: Cli) -> Result<(), AppError> { let (sockets, mut monitor) = flash::connect(&cli.flash_config)?; let nf_ip_addr = monitor.get_nf_ip_addr()?; let stop = Arc::new(AtomicBool::new(true)); @@ -75,7 +75,7 @@ fn run(mut cli: Cli) -> Result<(), AppError> { )?; #[cfg(feature = "stats")] - let stats_thread = cli.stats.cpu.spawn(move || { + let stats_thread = cli.stats.cpu.unwrap_or_default().spawn(move || { if let Err(err) = tui.run() { eprintln!("error dumping stats: {err}"); } @@ -98,6 +98,7 @@ fn run(mut cli: Cli) -> Result<(), AppError> { let socket_threads = cli .cpu_range + .unwrap_or_default() .spawn_multiple(sockets.into_iter().map(|socket| { let stop = stop.clone(); move || socket_thread(socket, cli.nf_mac, nf_ip_addr, cli.mac_addr, &stop) diff --git a/examples/firewall-rs/src/cli.rs b/examples/firewall-rs/src/cli.rs index fbdb1fc..bb73aae 100644 --- a/examples/firewall-rs/src/cli.rs +++ b/examples/firewall-rs/src/cli.rs @@ -13,13 +13,8 @@ pub struct Cli { #[command(flatten)] pub flash_config: FlashConfig, - #[arg( - short = 'c', - long, - default_value_t = CpuRange::default(), - help = "CPU core range for socket threads" - )] - pub cpu_range: CpuRange, + #[arg(short = 'c', long, help = "CPU core range for socket threads")] + pub cpu_range: Option, #[arg(short, long, help = "Path to denylist csv file")] pub denylist: PathBuf, @@ -38,10 +33,9 @@ pub struct StatsConfig { #[arg( short = 's', long = "stats-cpu", - default_value_t = CpuRange::default(), help = "CPU core index for stats thread" )] - pub cpu: CpuRange, + pub cpu: Option, #[arg(short = 'F', long, default_value_t = 1, help = "Tui frames per second")] pub fps: u64, diff --git a/examples/firewall-rs/src/main.rs b/examples/firewall-rs/src/main.rs index f97b41e..c368295 100644 --- a/examples/firewall-rs/src/main.rs +++ b/examples/firewall-rs/src/main.rs @@ -56,7 +56,7 @@ fn socket_thread( } } -fn run(mut cli: Cli) -> Result<(), AppError> { +fn run(cli: Cli) -> Result<(), AppError> { let (sockets, monitor) = flash::connect(&cli.flash_config)?; let stop = Arc::new(AtomicBool::new(true)); @@ -71,7 +71,7 @@ fn run(mut cli: Cli) -> Result<(), AppError> { let firewall = Arc::new(Firewall::new(cli.denylist)?); #[cfg(feature = "stats")] - let stats_thread = cli.stats.cpu.spawn(move || { + let stats_thread = cli.stats.cpu.unwrap_or_default().spawn(move || { if let Err(err) = tui.run() { eprintln!("error dumping stats: {err}"); } @@ -94,6 +94,7 @@ fn run(mut cli: Cli) -> Result<(), AppError> { let socket_threads = cli .cpu_range + .unwrap_or_default() .spawn_multiple(sockets.into_iter().map(|socket| { let stop = stop.clone(); let firewall = firewall.clone(); diff --git a/examples/ip4ping-rs/src/cli.rs b/examples/ip4ping-rs/src/cli.rs index 931335d..5249f19 100644 --- a/examples/ip4ping-rs/src/cli.rs +++ b/examples/ip4ping-rs/src/cli.rs @@ -11,13 +11,8 @@ pub struct Cli { #[command(flatten)] pub flash_config: FlashConfig, - #[arg( - short = 'c', - long, - default_value_t = CpuRange::default(), - help = "CPU core range for socket threads" - )] - pub cpu_range: CpuRange, + #[arg(short = 'c', long, help = "CPU core range for socket threads")] + pub cpu_range: Option, #[arg(short = 'm', long, help = "Dest MAC address")] pub mac_addr: Option, @@ -33,10 +28,9 @@ pub struct StatsConfig { #[arg( short = 's', long = "stats-cpu", - default_value_t = CpuRange::default(), help = "CPU core index for stats thread" )] - pub cpu: CpuRange, + pub cpu: Option, #[arg(short = 'F', long, default_value_t = 1, help = "Tui frames per second")] pub fps: u64, diff --git a/examples/ip4ping-rs/src/main.rs b/examples/ip4ping-rs/src/main.rs index ec9c35a..cec4569 100644 --- a/examples/ip4ping-rs/src/main.rs +++ b/examples/ip4ping-rs/src/main.rs @@ -54,7 +54,7 @@ fn socket_thread(mut socket: Socket, mac_addr: Option, stop: &Arc Result<(), AppError> { +fn run(cli: Cli) -> Result<(), AppError> { let (sockets, monitor) = flash::connect(&cli.flash_config)?; let stop = Arc::new(AtomicBool::new(true)); @@ -67,7 +67,7 @@ fn run(mut cli: Cli) -> Result<(), AppError> { )?; #[cfg(feature = "stats")] - let stats_thread = cli.stats.cpu.spawn(move || { + let stats_thread = cli.stats.cpu.unwrap_or_default().spawn(move || { if let Err(err) = tui.run() { eprintln!("error dumping stats: {err}"); } @@ -90,6 +90,7 @@ fn run(mut cli: Cli) -> Result<(), AppError> { let socket_threads = cli .cpu_range + .unwrap_or_default() .spawn_multiple(sockets.into_iter().map(|socket| { let stop = stop.clone(); move || socket_thread(socket, cli.mac_addr, &stop) diff --git a/examples/l2fwd-rs/src/cli.rs b/examples/l2fwd-rs/src/cli.rs index 581183a..9c8b2c4 100644 --- a/examples/l2fwd-rs/src/cli.rs +++ b/examples/l2fwd-rs/src/cli.rs @@ -11,13 +11,8 @@ pub struct Cli { #[command(flatten)] pub flash_config: FlashConfig, - #[arg( - short = 'c', - long, - default_value_t = CpuRange::default(), - help = "CPU core range for socket threads" - )] - pub cpu_range: CpuRange, + #[arg(short = 'c', long, help = "CPU core range for socket threads")] + pub cpu_range: Option, #[cfg(feature = "stats")] #[command(flatten)] @@ -33,10 +28,9 @@ pub struct StatsConfig { #[arg( short = 's', long = "stats-cpu", - default_value_t = CpuRange::default(), help = "CPU core index for stats thread" )] - pub cpu: CpuRange, + pub cpu: Option, #[arg(short = 'F', long, default_value_t = 1, help = "Tui frames per second")] pub fps: u64, diff --git a/examples/l2fwd-rs/src/main.rs b/examples/l2fwd-rs/src/main.rs index 100d435..911a08b 100644 --- a/examples/l2fwd-rs/src/main.rs +++ b/examples/l2fwd-rs/src/main.rs @@ -61,7 +61,7 @@ fn socket_thread(mut socket: Socket, mac_addr: Option, stop: &Arc Result<(), AppError> { +fn run(cli: Cli) -> Result<(), AppError> { let (sockets, monitor) = flash::connect(&cli.flash_config)?; let stop = Arc::new(AtomicBool::new(true)); @@ -74,7 +74,7 @@ fn run(mut cli: Cli) -> Result<(), AppError> { )?; #[cfg(feature = "stats")] - let stats_thread = cli.stats.cpu.spawn(move || { + let stats_thread = cli.stats.cpu.unwrap_or_default().spawn(move || { if let Err(err) = tui.run() { eprintln!("error dumping stats: {err}"); } @@ -97,6 +97,7 @@ fn run(mut cli: Cli) -> Result<(), AppError> { let socket_threads = cli .cpu_range + .unwrap_or_default() .spawn_multiple(sockets.into_iter().map(|socket| { let stop = stop.clone(); move || socket_thread(socket, cli.mac_addr, &stop) diff --git a/examples/maglev-rs/src/cli.rs b/examples/maglev-rs/src/cli.rs index 32357d4..c5c66bc 100644 --- a/examples/maglev-rs/src/cli.rs +++ b/examples/maglev-rs/src/cli.rs @@ -13,13 +13,8 @@ pub struct Cli { #[command(flatten)] pub flash_config: FlashConfig, - #[arg( - short = 'c', - long, - default_value_t = CpuRange::default(), - help = "CPU core range for socket threads" - )] - pub cpu_range: CpuRange, + #[arg(short = 'c', long, help = "CPU core range for socket threads")] + pub cpu_range: Option, #[cfg(feature = "stats")] #[command(flatten)] @@ -38,10 +33,9 @@ pub struct StatsConfig { #[arg( short = 's', long = "stats-cpu", - default_value_t = CpuRange::default(), help = "CPU core index for stats thread" )] - pub cpu: CpuRange, + pub cpu: Option, #[arg(short = 'F', long, default_value_t = 1, help = "Tui frames per second")] pub fps: u64, diff --git a/examples/maglev-rs/src/main.rs b/examples/maglev-rs/src/main.rs index b7b4014..c8e1e28 100644 --- a/examples/maglev-rs/src/main.rs +++ b/examples/maglev-rs/src/main.rs @@ -64,7 +64,7 @@ fn socket_thread( } } -fn run(mut cli: Cli) -> Result<(), AppError> { +fn run(cli: Cli) -> Result<(), AppError> { let (sockets, mut monitor) = flash::connect(&cli.flash_config)?; let mut next_ip_addr = monitor.get_next_ip_addr()?; @@ -101,7 +101,7 @@ fn run(mut cli: Cli) -> Result<(), AppError> { )?; #[cfg(feature = "stats")] - let stats_thread = cli.stats.cpu.spawn(move || { + let stats_thread = cli.stats.cpu.unwrap_or_default().spawn(move || { if let Err(err) = tui.run() { eprintln!("error dumping stats: {err}"); } @@ -124,6 +124,7 @@ fn run(mut cli: Cli) -> Result<(), AppError> { let socket_threads = cli .cpu_range + .unwrap_or_default() .spawn_multiple(sockets.into_iter().map(|socket| { let stop = stop.clone(); let maglev = maglev.clone(); diff --git a/examples/simplefwd-rs/src/cli.rs b/examples/simplefwd-rs/src/cli.rs index 581183a..9c8b2c4 100644 --- a/examples/simplefwd-rs/src/cli.rs +++ b/examples/simplefwd-rs/src/cli.rs @@ -11,13 +11,8 @@ pub struct Cli { #[command(flatten)] pub flash_config: FlashConfig, - #[arg( - short = 'c', - long, - default_value_t = CpuRange::default(), - help = "CPU core range for socket threads" - )] - pub cpu_range: CpuRange, + #[arg(short = 'c', long, help = "CPU core range for socket threads")] + pub cpu_range: Option, #[cfg(feature = "stats")] #[command(flatten)] @@ -33,10 +28,9 @@ pub struct StatsConfig { #[arg( short = 's', long = "stats-cpu", - default_value_t = CpuRange::default(), help = "CPU core index for stats thread" )] - pub cpu: CpuRange, + pub cpu: Option, #[arg(short = 'F', long, default_value_t = 1, help = "Tui frames per second")] pub fps: u64, diff --git a/examples/simplefwd-rs/src/main.rs b/examples/simplefwd-rs/src/main.rs index a1ef700..e79bbcd 100644 --- a/examples/simplefwd-rs/src/main.rs +++ b/examples/simplefwd-rs/src/main.rs @@ -51,7 +51,7 @@ fn socket_thread(mut socket: Socket, mac_addr: Option, stop: &Arc Result<(), AppError> { +fn run(cli: Cli) -> Result<(), AppError> { let (sockets, monitor) = flash::connect(&cli.flash_config)?; let stop = Arc::new(AtomicBool::new(true)); @@ -64,7 +64,7 @@ fn run(mut cli: Cli) -> Result<(), AppError> { )?; #[cfg(feature = "stats")] - let stats_thread = cli.stats.cpu.spawn(move || { + let stats_thread = cli.stats.cpu.unwrap_or_default().spawn(move || { if let Err(err) = tui.run() { eprintln!("error dumping stats: {err}"); } @@ -87,6 +87,7 @@ fn run(mut cli: Cli) -> Result<(), AppError> { let socket_threads = cli .cpu_range + .unwrap_or_default() .spawn_multiple(sockets.into_iter().map(|socket| { let stop = stop.clone(); move || socket_thread(socket, cli.mac_addr, &stop) diff --git a/lib/utils-rs/src/cpu.rs b/lib/utils-rs/src/cpu.rs index 47f828a..ec14f7c 100644 --- a/lib/utils-rs/src/cpu.rs +++ b/lib/utils-rs/src/cpu.rs @@ -153,10 +153,6 @@ impl str::FromStr for CpuRange { impl fmt::Display for CpuRange { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - if self.cores.is_empty() { - return write!(f, "none"); - } - let mut parts = Vec::new(); let mut i = 0;