diff --git a/Cargo.toml b/Cargo.toml index 1035a6f..f6af436 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,7 @@ [workspace] members = [ "lib/flash-rs", + "lib/utils-rs", "examples/arpresolver-rs", "examples/firewall-rs", "examples/helloworld-rs", diff --git a/examples/arpresolver-rs/Cargo.toml b/examples/arpresolver-rs/Cargo.toml index fd6fd84..77f1b76 100644 --- a/examples/arpresolver-rs/Cargo.toml +++ b/examples/arpresolver-rs/Cargo.toml @@ -5,12 +5,13 @@ edition = "2024" [dependencies] clap = { version = "4.5.35", features = ["derive"] } -core_affinity = "0.8.3" ctrlc = { version = "3.4.5", optional = true } flash = { path = "../../lib/flash-rs", features = ["clap"] } macaddr = "1.0.1" +thiserror = "2.0.17" tracing = { version = "0.1.41", optional = true } tracing-subscriber = { version = "0.3.19", optional = true } +utils = { path = "../../lib/utils-rs" } [features] default = ["dep:ctrlc"] diff --git a/examples/arpresolver-rs/src/cli.rs b/examples/arpresolver-rs/src/cli.rs index ff946a2..7b1729b 100644 --- a/examples/arpresolver-rs/src/cli.rs +++ b/examples/arpresolver-rs/src/cli.rs @@ -1,9 +1,7 @@ -#[cfg(feature = "stats")] -use std::str::FromStr as _; - use clap::Parser; use flash::FlashConfig; use macaddr::MacAddr6; +use utils::CpuRange; #[cfg(feature = "stats")] use flash::tui::GridLayout; @@ -13,21 +11,8 @@ pub struct Cli { #[command(flatten)] pub flash_config: FlashConfig, - #[arg( - short = 'c', - long, - default_value_t = 0, - help = "Starting CPU core index for socket threads" - )] - pub cpu_start: usize, - - #[arg( - short = 'e', - long, - default_value_t = 0, - help = "Ending CPU core index for socket threads (inclusive)" - )] - pub cpu_end: usize, + #[arg(short = 'c', long, help = "CPU core range for socket threads")] + pub cpu_range: Option, #[arg(short = 'M', long, help = "NF MAC address")] pub nf_mac: MacAddr6, @@ -46,14 +31,18 @@ pub struct StatsConfig { #[arg( short = 's', long = "stats-cpu", - default_value_t = 1, help = "CPU core index for stats thread" )] - pub cpu: usize, + pub cpu: Option, #[arg(short = 'F', long, default_value_t = 1, help = "Tui frames per second")] pub fps: u64, - #[arg(short = 'l', long, default_value_t = GridLayout::default(), value_parser = GridLayout::from_str, help = "Tui layout")] + #[arg( + short = 'l', + long, + default_value_t = GridLayout::default(), + help = "Tui layout" + )] pub layout: GridLayout, } diff --git a/examples/arpresolver-rs/src/error.rs b/examples/arpresolver-rs/src/error.rs new file mode 100644 index 0000000..d42c20a --- /dev/null +++ b/examples/arpresolver-rs/src/error.rs @@ -0,0 +1,12 @@ +#[derive(Debug, thiserror::Error)] +#[error("app error: {0}")] +pub enum AppError { + Flash(#[from] flash::FlashError), + + #[cfg(feature = "stats")] + Tui(#[from] flash::tui::TuiError), + + #[cfg(not(feature = "stats"))] + #[error("app error: error setting Ctrl-C handler: {0}")] + Ctrl(#[from] ctrlc::Error), +} diff --git a/examples/arpresolver-rs/src/main.rs b/examples/arpresolver-rs/src/main.rs index fa8821a..25a41de 100644 --- a/examples/arpresolver-rs/src/main.rs +++ b/examples/arpresolver-rs/src/main.rs @@ -1,13 +1,14 @@ mod cli; +mod error; mod nf; use std::{ net::Ipv4Addr, + process::ExitCode, sync::{ Arc, atomic::{AtomicBool, Ordering}, }, - thread, }; use clap::Parser; @@ -17,16 +18,16 @@ use macaddr::MacAddr6; #[cfg(feature = "stats")] use flash::tui::StatsDashboard; -use crate::cli::Cli; +use crate::{cli::Cli, error::AppError}; fn socket_thread( mut socket: Socket, nf_mac: MacAddr6, nf_ip: Ipv4Addr, mac_addr: Option, - run: &Arc, + stop: &Arc, ) { - while run.load(Ordering::SeqCst) { + while !stop.load(Ordering::Relaxed) { if !socket.poll().is_ok_and(|val| val) { continue; } @@ -60,108 +61,77 @@ fn socket_thread( } } -fn main() { - #[cfg(feature = "tracing")] - tracing_subscriber::fmt::init(); - - let cli = Cli::parse(); - - let (sockets, route) = match flash::connect(&cli.flash_config) { - Ok(t) => t, - Err(err) => { - eprintln!("{err}"); - return; - } - }; - - if sockets.is_empty() { - eprintln!("no sockets received"); - return; - } - - #[cfg(feature = "tracing")] - tracing::debug!("Sockets: {:?}", sockets); +fn run(cli: Cli) -> Result<(), AppError> { + let (sockets, mut monitor) = flash::connect(&cli.flash_config)?; + let nf_ip_addr = monitor.get_nf_ip_addr()?; + let stop = Arc::new(AtomicBool::new(true)); #[cfg(feature = "stats")] - let mut tui = match StatsDashboard::new( + let mut tui = StatsDashboard::new( sockets.iter().map(Socket::stats), cli.stats.fps, cli.stats.layout, - ) { - Ok(t) => t, - Err(err) => { - eprintln!("error creating tui: {err}"); - return; - } - }; - - let cores = core_affinity::get_core_ids() - .unwrap_or_default() - .into_iter() - .filter(|core_id| core_id.id >= cli.cpu_start && core_id.id <= cli.cpu_end) - .collect::>(); - - if cores.is_empty() { - eprintln!("no cores found in range {}-{}", cli.cpu_start, cli.cpu_end); - return; - } - - #[cfg(feature = "tracing")] - tracing::debug!("Cores: {:?}", cores); + Some(stop.clone()), + )?; #[cfg(feature = "stats")] - let Some(stats_core) = core_affinity::get_core_ids() - .unwrap_or_default() - .into_iter() - .find(|core_id| core_id.id == cli.stats.cpu) - else { - eprintln!("no core found for stats thread {}", cli.stats.cpu); - return; - }; - - let run = Arc::new(AtomicBool::new(true)); + let stats_thread = cli.stats.cpu.unwrap_or_default().spawn(move || { + if let Err(err) = tui.run() { + eprintln!("error dumping stats: {err}"); + } + }); #[cfg(not(feature = "stats"))] - if let Err(err) = { - let r = run.clone(); + { + let stop = stop.clone(); ctrlc::set_handler(move || { - r.store(false, Ordering::SeqCst); + stop.store(true, Ordering::Release); }) - } { - eprintln!("error setting Ctrl-C handler: {err}"); - return; - } + }?; - let handles = sockets - .into_iter() - .zip(cores.into_iter().cycle()) - .map(|(socket, core_id)| { - let r = run.clone(); - thread::spawn(move || { - core_affinity::set_for_current(core_id); - socket_thread(socket, cli.nf_mac, route.ip_addr, cli.mac_addr, &r); - }) + let _ = { + let stop = stop.clone(); + monitor.spawn_disconnect_handler(move || { + stop.store(true, Ordering::Release); }) - .collect::>(); + }; + + let socket_threads = cli + .cpu_range + .unwrap_or_default() + .spawn_multiple(sockets.into_iter().map(|socket| { + let stop = stop.clone(); + move || socket_thread(socket, cli.nf_mac, nf_ip_addr, cli.mac_addr, &stop) + })); #[cfg(feature = "stats")] - if let Err(err) = thread::spawn(move || { - core_affinity::set_for_current(stats_core); - if let Err(err) = tui.run() { - eprintln!("error dumping stats: {err}"); - } - }) - .join() - { + if let Err(err) = stats_thread.join() { eprintln!("error in stats thread: {err:?}"); } #[cfg(feature = "stats")] - run.store(false, Ordering::SeqCst); + stop.store(true, Ordering::Release); - for handle in handles { + for handle in socket_threads { if let Err(err) = handle.join() { - eprintln!("error in thread: {err:?}"); + eprintln!("error in socket thread: {err:?}"); + } + } + + Ok(()) +} + +fn main() -> ExitCode { + #[cfg(feature = "tracing")] + tracing_subscriber::fmt::init(); + + let cli = Cli::parse(); + + match run(cli) { + Ok(()) => ExitCode::SUCCESS, + Err(err) => { + eprintln!("{err}"); + ExitCode::FAILURE } } } diff --git a/examples/firewall-rs/Cargo.toml b/examples/firewall-rs/Cargo.toml index db462fb..c3b7ef1 100644 --- a/examples/firewall-rs/Cargo.toml +++ b/examples/firewall-rs/Cargo.toml @@ -5,14 +5,15 @@ edition = "2024" [dependencies] clap = { version = "4.5.35", features = ["derive"] } -core_affinity = "0.8.3" csv = "1.3.1" ctrlc = { version = "3.4.5", optional = true } flash = { path = "../../lib/flash-rs", features = ["clap"] } macaddr = "1.0.1" serde = { version = "1.0.219", features = ["derive"] } +thiserror = "2.0.17" tracing = { version = "0.1.41", optional = true } tracing-subscriber = { version = "0.3.19", optional = true } +utils = { path = "../../lib/utils-rs" } [features] default = ["dep:ctrlc"] diff --git a/examples/firewall-rs/src/cli.rs b/examples/firewall-rs/src/cli.rs index 3c3448a..bb73aae 100644 --- a/examples/firewall-rs/src/cli.rs +++ b/examples/firewall-rs/src/cli.rs @@ -1,11 +1,9 @@ use std::path::PathBuf; -#[cfg(feature = "stats")] -use std::str::FromStr as _; - use clap::Parser; use flash::FlashConfig; use macaddr::MacAddr6; +use utils::CpuRange; #[cfg(feature = "stats")] use flash::tui::GridLayout; @@ -15,21 +13,8 @@ pub struct Cli { #[command(flatten)] pub flash_config: FlashConfig, - #[arg( - short = 'c', - long, - default_value_t = 0, - help = "Starting CPU core index for socket threads" - )] - pub cpu_start: usize, - - #[arg( - short = 'e', - long, - default_value_t = 0, - help = "Ending CPU core index for socket threads (inclusive)" - )] - pub cpu_end: usize, + #[arg(short = 'c', long, help = "CPU core range for socket threads")] + pub cpu_range: Option, #[arg(short, long, help = "Path to denylist csv file")] pub denylist: PathBuf, @@ -48,14 +33,18 @@ pub struct StatsConfig { #[arg( short = 's', long = "stats-cpu", - default_value_t = 1, help = "CPU core index for stats thread" )] - pub cpu: usize, + pub cpu: Option, #[arg(short = 'F', long, default_value_t = 1, help = "Tui frames per second")] pub fps: u64, - #[arg(short = 'l', long, default_value_t = GridLayout::default(), value_parser = GridLayout::from_str, help = "Tui layout")] + #[arg( + short = 'l', + long, + default_value_t = GridLayout::default(), + help = "Tui layout" + )] pub layout: GridLayout, } diff --git a/examples/firewall-rs/src/error.rs b/examples/firewall-rs/src/error.rs new file mode 100644 index 0000000..120410b --- /dev/null +++ b/examples/firewall-rs/src/error.rs @@ -0,0 +1,13 @@ +#[derive(Debug, thiserror::Error)] +#[error("app error: {0}")] +pub enum AppError { + Flash(#[from] flash::FlashError), + Csv(#[from] csv::Error), + + #[cfg(feature = "stats")] + Tui(#[from] flash::tui::TuiError), + + #[cfg(not(feature = "stats"))] + #[error("app error: error setting Ctrl-C handler: {0}")] + Ctrl(#[from] ctrlc::Error), +} diff --git a/examples/firewall-rs/src/main.rs b/examples/firewall-rs/src/main.rs index c042e5a..c368295 100644 --- a/examples/firewall-rs/src/main.rs +++ b/examples/firewall-rs/src/main.rs @@ -1,12 +1,13 @@ mod cli; +mod error; mod nf; use std::{ + process::ExitCode, sync::{ Arc, atomic::{AtomicBool, Ordering}, }, - thread, }; use clap::Parser; @@ -16,15 +17,15 @@ use macaddr::MacAddr6; #[cfg(feature = "stats")] use flash::tui::StatsDashboard; -use crate::{cli::Cli, nf::Firewall}; +use crate::{cli::Cli, error::AppError, nf::Firewall}; fn socket_thread( mut socket: Socket, firewall: &Arc, mac_addr: Option, - run: &Arc, + stop: &Arc, ) { - while run.load(Ordering::SeqCst) { + while !stop.load(Ordering::Relaxed) { if !socket.poll().is_ok_and(|val| val) { continue; } @@ -55,118 +56,80 @@ fn socket_thread( } } -fn main() { - #[cfg(feature = "tracing")] - tracing_subscriber::fmt::init(); - - let cli = Cli::parse(); - - let sockets = match flash::connect(&cli.flash_config) { - Ok((sockets, _)) => sockets, - Err(err) => { - eprintln!("{err}"); - return; - } - }; - - if sockets.is_empty() { - eprintln!("no sockets received"); - return; - } - - #[cfg(feature = "tracing")] - tracing::debug!("Sockets: {:?}", sockets); +fn run(cli: Cli) -> Result<(), AppError> { + let (sockets, monitor) = flash::connect(&cli.flash_config)?; + let stop = Arc::new(AtomicBool::new(true)); #[cfg(feature = "stats")] - let mut tui = match StatsDashboard::new( + let mut tui = StatsDashboard::new( sockets.iter().map(Socket::stats), cli.stats.fps, cli.stats.layout, - ) { - Ok(t) => t, - Err(err) => { - eprintln!("error creating tui: {err}"); - return; - } - }; + Some(stop.clone()), + )?; - let firewall = match Firewall::new(cli.denylist) { - Ok(firewall) => Arc::new(firewall), - Err(err) => { - eprintln!("error loading firewall rules: {err}"); - return; - } - }; - - let cores = core_affinity::get_core_ids() - .unwrap_or_default() - .into_iter() - .filter(|core_id| core_id.id >= cli.cpu_start && core_id.id <= cli.cpu_end) - .collect::>(); - - if cores.is_empty() { - eprintln!("no cores found in range {}-{}", cli.cpu_start, cli.cpu_end); - return; - } - - #[cfg(feature = "tracing")] - tracing::debug!("Cores: {:?}", cores); + let firewall = Arc::new(Firewall::new(cli.denylist)?); #[cfg(feature = "stats")] - let Some(stats_core) = core_affinity::get_core_ids() - .unwrap_or_default() - .into_iter() - .find(|core_id| core_id.id == cli.stats.cpu) - else { - eprintln!("no core found for stats thread {}", cli.stats.cpu); - return; - }; - - let run = Arc::new(AtomicBool::new(true)); + let stats_thread = cli.stats.cpu.unwrap_or_default().spawn(move || { + if let Err(err) = tui.run() { + eprintln!("error dumping stats: {err}"); + } + }); #[cfg(not(feature = "stats"))] - if let Err(err) = { - let r = run.clone(); + { + let stop = stop.clone(); ctrlc::set_handler(move || { - r.store(false, Ordering::SeqCst); + stop.store(true, Ordering::Release); }) - } { - eprintln!("error setting Ctrl-C handler: {err}"); - return; - } + }?; + + let _ = { + let stop = stop.clone(); + monitor.spawn_disconnect_handler(move || { + stop.store(true, Ordering::Release); + }) + }; - let handles = sockets - .into_iter() - .zip(cores.into_iter().cycle()) - .map(|(socket, core_id)| { - let r = run.clone(); + let socket_threads = cli + .cpu_range + .unwrap_or_default() + .spawn_multiple(sockets.into_iter().map(|socket| { + let stop = stop.clone(); let firewall = firewall.clone(); - thread::spawn(move || { - core_affinity::set_for_current(core_id); - socket_thread(socket, &firewall, cli.mac_addr, &r); - }) - }) - .collect::>(); + move || socket_thread(socket, &firewall, cli.mac_addr, &stop) + })); #[cfg(feature = "stats")] - if let Err(err) = thread::spawn(move || { - core_affinity::set_for_current(stats_core); - if let Err(err) = tui.run() { - eprintln!("error dumping stats: {err}"); - } - }) - .join() - { + if let Err(err) = stats_thread.join() { eprintln!("error in stats thread: {err:?}"); } #[cfg(feature = "stats")] - run.store(false, Ordering::SeqCst); + stop.store(true, Ordering::Release); - for handle in handles { + for handle in socket_threads { if let Err(err) = handle.join() { - eprintln!("error in thread: {err:?}"); + eprintln!("error in socket thread: {err:?}"); + } + } + + Ok(()) +} + +fn main() -> ExitCode { + #[cfg(feature = "tracing")] + tracing_subscriber::fmt::init(); + + let cli = Cli::parse(); + + match run(cli) { + Ok(()) => ExitCode::SUCCESS, + Err(err) => { + eprintln!("{err}"); + ExitCode::FAILURE } } } diff --git a/examples/helloworld-rs/src/main.rs b/examples/helloworld-rs/src/main.rs index 3855497..5c482cb 100644 --- a/examples/helloworld-rs/src/main.rs +++ b/examples/helloworld-rs/src/main.rs @@ -7,20 +7,7 @@ fn main() { let flash_config = FlashConfig::parse(); - let sockets = match flash::connect(&flash_config) { - Ok((sockets, _)) => sockets, - Err(err) => { - eprintln!("{err}"); - return; - } - }; - - if sockets.is_empty() { - eprintln!("no sockets received"); - #[cfg(feature = "tracing")] - return; + if let Err(err) = flash::connect(&flash_config) { + eprintln!("{err}"); } - - #[cfg(feature = "tracing")] - tracing::info!("Sockets: {sockets:?}"); } diff --git a/examples/ip4ping-rs/Cargo.toml b/examples/ip4ping-rs/Cargo.toml index 0fd97e9..37475eb 100644 --- a/examples/ip4ping-rs/Cargo.toml +++ b/examples/ip4ping-rs/Cargo.toml @@ -5,12 +5,13 @@ edition = "2024" [dependencies] clap = { version = "4.5.35", features = ["derive"] } -core_affinity = "0.8.3" ctrlc = { version = "3.4.5", optional = true } flash = { path = "../../lib/flash-rs", features = ["clap"] } macaddr = "1.0.1" +thiserror = "2.0.17" tracing = { version = "0.1.41", optional = true } tracing-subscriber = { version = "0.3.19", optional = true } +utils = { path = "../../lib/utils-rs" } [features] default = ["dep:ctrlc"] diff --git a/examples/ip4ping-rs/src/cli.rs b/examples/ip4ping-rs/src/cli.rs index cfc783e..5249f19 100644 --- a/examples/ip4ping-rs/src/cli.rs +++ b/examples/ip4ping-rs/src/cli.rs @@ -1,9 +1,7 @@ -#[cfg(feature = "stats")] -use std::str::FromStr as _; - use clap::Parser; use flash::FlashConfig; use macaddr::MacAddr6; +use utils::CpuRange; #[cfg(feature = "stats")] use flash::tui::GridLayout; @@ -13,21 +11,8 @@ pub struct Cli { #[command(flatten)] pub flash_config: FlashConfig, - #[arg( - short = 'c', - long, - default_value_t = 0, - help = "Starting CPU core index for socket threads" - )] - pub cpu_start: usize, - - #[arg( - short = 'e', - long, - default_value_t = 0, - help = "Ending CPU core index for socket threads (inclusive)" - )] - pub cpu_end: usize, + #[arg(short = 'c', long, help = "CPU core range for socket threads")] + pub cpu_range: Option, #[arg(short = 'm', long, help = "Dest MAC address")] pub mac_addr: Option, @@ -43,14 +28,18 @@ pub struct StatsConfig { #[arg( short = 's', long = "stats-cpu", - default_value_t = 1, help = "CPU core index for stats thread" )] - pub cpu: usize, + pub cpu: Option, #[arg(short = 'F', long, default_value_t = 1, help = "Tui frames per second")] pub fps: u64, - #[arg(short = 'l', long, default_value_t = GridLayout::default(), value_parser = GridLayout::from_str, help = "Tui layout")] + #[arg( + short = 'l', + long, + default_value_t = GridLayout::default(), + help = "Tui layout" + )] pub layout: GridLayout, } diff --git a/examples/ip4ping-rs/src/error.rs b/examples/ip4ping-rs/src/error.rs new file mode 100644 index 0000000..d42c20a --- /dev/null +++ b/examples/ip4ping-rs/src/error.rs @@ -0,0 +1,12 @@ +#[derive(Debug, thiserror::Error)] +#[error("app error: {0}")] +pub enum AppError { + Flash(#[from] flash::FlashError), + + #[cfg(feature = "stats")] + Tui(#[from] flash::tui::TuiError), + + #[cfg(not(feature = "stats"))] + #[error("app error: error setting Ctrl-C handler: {0}")] + Ctrl(#[from] ctrlc::Error), +} diff --git a/examples/ip4ping-rs/src/main.rs b/examples/ip4ping-rs/src/main.rs index 0784743..cec4569 100644 --- a/examples/ip4ping-rs/src/main.rs +++ b/examples/ip4ping-rs/src/main.rs @@ -1,12 +1,13 @@ mod cli; +mod error; mod nf; use std::{ + process::ExitCode, sync::{ Arc, atomic::{AtomicBool, Ordering}, }, - thread, }; use clap::Parser; @@ -16,10 +17,10 @@ use macaddr::MacAddr6; #[cfg(feature = "stats")] use flash::tui::StatsDashboard; -use crate::cli::Cli; +use crate::{cli::Cli, error::AppError}; -fn socket_thread(mut socket: Socket, mac_addr: Option, run: &Arc) { - while run.load(Ordering::SeqCst) { +fn socket_thread(mut socket: Socket, mac_addr: Option, stop: &Arc) { + while !stop.load(Ordering::Relaxed) { if !socket.poll().is_ok_and(|val| val) { continue; } @@ -53,108 +54,76 @@ fn socket_thread(mut socket: Socket, mac_addr: Option, run: &Arc sockets, - Err(err) => { - eprintln!("{err}"); - return; - } - }; - - if sockets.is_empty() { - eprintln!("no sockets received"); - return; - } - - #[cfg(feature = "tracing")] - tracing::debug!("Sockets: {:?}", sockets); +fn run(cli: Cli) -> Result<(), AppError> { + let (sockets, monitor) = flash::connect(&cli.flash_config)?; + let stop = Arc::new(AtomicBool::new(true)); #[cfg(feature = "stats")] - let mut tui = match StatsDashboard::new( + let mut tui = StatsDashboard::new( sockets.iter().map(Socket::stats), cli.stats.fps, cli.stats.layout, - ) { - Ok(t) => t, - Err(err) => { - eprintln!("error creating tui: {err}"); - return; - } - }; - - let cores = core_affinity::get_core_ids() - .unwrap_or_default() - .into_iter() - .filter(|core_id| core_id.id >= cli.cpu_start && core_id.id <= cli.cpu_end) - .collect::>(); - - if cores.is_empty() { - eprintln!("no cores found in range {}-{}", cli.cpu_start, cli.cpu_end); - return; - } - - #[cfg(feature = "tracing")] - tracing::debug!("Cores: {:?}", cores); + Some(stop.clone()), + )?; #[cfg(feature = "stats")] - let Some(stats_core) = core_affinity::get_core_ids() - .unwrap_or_default() - .into_iter() - .find(|core_id| core_id.id == cli.stats.cpu) - else { - eprintln!("no core found for stats thread {}", cli.stats.cpu); - return; - }; - - let run = Arc::new(AtomicBool::new(true)); + let stats_thread = cli.stats.cpu.unwrap_or_default().spawn(move || { + if let Err(err) = tui.run() { + eprintln!("error dumping stats: {err}"); + } + }); #[cfg(not(feature = "stats"))] - if let Err(err) = { - let r = run.clone(); + { + let stop = stop.clone(); ctrlc::set_handler(move || { - r.store(false, Ordering::SeqCst); + stop.store(true, Ordering::Release); }) - } { - eprintln!("error setting Ctrl-C handler: {err}"); - return; - } + }?; - let handles = sockets - .into_iter() - .zip(cores.into_iter().cycle()) - .map(|(socket, core_id)| { - let r = run.clone(); - thread::spawn(move || { - core_affinity::set_for_current(core_id); - socket_thread(socket, cli.mac_addr, &r); - }) + let _ = { + let stop = stop.clone(); + monitor.spawn_disconnect_handler(move || { + stop.store(true, Ordering::Release); }) - .collect::>(); + }; + + let socket_threads = cli + .cpu_range + .unwrap_or_default() + .spawn_multiple(sockets.into_iter().map(|socket| { + let stop = stop.clone(); + move || socket_thread(socket, cli.mac_addr, &stop) + })); #[cfg(feature = "stats")] - if let Err(err) = thread::spawn(move || { - core_affinity::set_for_current(stats_core); - if let Err(err) = tui.run() { - eprintln!("error dumping stats: {err}"); - } - }) - .join() - { + if let Err(err) = stats_thread.join() { eprintln!("error in stats thread: {err:?}"); } #[cfg(feature = "stats")] - run.store(false, Ordering::SeqCst); + stop.store(true, Ordering::Release); - for handle in handles { + for handle in socket_threads { if let Err(err) = handle.join() { - eprintln!("error in thread: {err:?}"); + eprintln!("error in socket thread: {err:?}"); + } + } + + Ok(()) +} + +fn main() -> ExitCode { + #[cfg(feature = "tracing")] + tracing_subscriber::fmt::init(); + + let cli = Cli::parse(); + + match run(cli) { + Ok(()) => ExitCode::SUCCESS, + Err(err) => { + eprintln!("{err}"); + ExitCode::FAILURE } } } diff --git a/examples/l2fwd-rs/Cargo.toml b/examples/l2fwd-rs/Cargo.toml index 8412719..5c30211 100644 --- a/examples/l2fwd-rs/Cargo.toml +++ b/examples/l2fwd-rs/Cargo.toml @@ -5,12 +5,13 @@ edition = "2024" [dependencies] clap = { version = "4.5.35", features = ["derive"] } -core_affinity = "0.8.3" ctrlc = { version = "3.4.5", optional = true } flash = { path = "../../lib/flash-rs", features = ["clap"] } macaddr = "1.0.1" +thiserror = "2.0.17" tracing = { version = "0.1.41", optional = true } tracing-subscriber = { version = "0.3.19", optional = true } +utils = { path = "../../lib/utils-rs" } [features] default = ["dep:ctrlc"] diff --git a/examples/l2fwd-rs/src/cli.rs b/examples/l2fwd-rs/src/cli.rs index e5de218..9c8b2c4 100644 --- a/examples/l2fwd-rs/src/cli.rs +++ b/examples/l2fwd-rs/src/cli.rs @@ -1,9 +1,7 @@ -#[cfg(feature = "stats")] -use std::str::FromStr as _; - use clap::Parser; use flash::FlashConfig; use macaddr::MacAddr6; +use utils::CpuRange; #[cfg(feature = "stats")] use flash::tui::GridLayout; @@ -13,21 +11,8 @@ pub struct Cli { #[command(flatten)] pub flash_config: FlashConfig, - #[arg( - short = 'c', - long, - default_value_t = 0, - help = "Starting CPU core index for socket threads" - )] - pub cpu_start: usize, - - #[arg( - short = 'e', - long, - default_value_t = 0, - help = "Ending CPU core index for socket threads (inclusive)" - )] - pub cpu_end: usize, + #[arg(short = 'c', long, help = "CPU core range for socket threads")] + pub cpu_range: Option, #[cfg(feature = "stats")] #[command(flatten)] @@ -43,14 +28,18 @@ pub struct StatsConfig { #[arg( short = 's', long = "stats-cpu", - default_value_t = 1, help = "CPU core index for stats thread" )] - pub cpu: usize, + pub cpu: Option, #[arg(short = 'F', long, default_value_t = 1, help = "Tui frames per second")] pub fps: u64, - #[arg(short = 'l', long, default_value_t = GridLayout::default(), value_parser = GridLayout::from_str, help = "Tui layout")] + #[arg( + short = 'l', + long, + default_value_t = GridLayout::default(), + help = "Tui layout" + )] pub layout: GridLayout, } diff --git a/examples/l2fwd-rs/src/error.rs b/examples/l2fwd-rs/src/error.rs new file mode 100644 index 0000000..d42c20a --- /dev/null +++ b/examples/l2fwd-rs/src/error.rs @@ -0,0 +1,12 @@ +#[derive(Debug, thiserror::Error)] +#[error("app error: {0}")] +pub enum AppError { + Flash(#[from] flash::FlashError), + + #[cfg(feature = "stats")] + Tui(#[from] flash::tui::TuiError), + + #[cfg(not(feature = "stats"))] + #[error("app error: error setting Ctrl-C handler: {0}")] + Ctrl(#[from] ctrlc::Error), +} diff --git a/examples/l2fwd-rs/src/main.rs b/examples/l2fwd-rs/src/main.rs index 9a1a138..911a08b 100644 --- a/examples/l2fwd-rs/src/main.rs +++ b/examples/l2fwd-rs/src/main.rs @@ -1,11 +1,12 @@ mod cli; +mod error; use std::{ + process::ExitCode, sync::{ Arc, atomic::{AtomicBool, Ordering}, }, - thread, }; use clap::Parser; @@ -15,7 +16,7 @@ use macaddr::MacAddr6; #[cfg(feature = "stats")] use flash::tui::StatsDashboard; -use crate::cli::Cli; +use crate::{cli::Cli, error::AppError}; #[forbid(clippy::indexing_slicing)] #[inline] @@ -32,8 +33,8 @@ fn mac_swap(pkt: &mut [u8; 14], mac_addr: Option) { } } -fn socket_thread(mut socket: Socket, mac_addr: Option, run: &Arc) { - while run.load(Ordering::SeqCst) { +fn socket_thread(mut socket: Socket, mac_addr: Option, stop: &Arc) { + while !stop.load(Ordering::Relaxed) { if !socket.poll().is_ok_and(|val| val) { continue; } @@ -60,108 +61,76 @@ fn socket_thread(mut socket: Socket, mac_addr: Option, run: &Arc sockets, - Err(err) => { - eprintln!("{err}"); - return; - } - }; - - if sockets.is_empty() { - eprintln!("no sockets received"); - return; - } - - #[cfg(feature = "tracing")] - tracing::debug!("Sockets: {:?}", sockets); +fn run(cli: Cli) -> Result<(), AppError> { + let (sockets, monitor) = flash::connect(&cli.flash_config)?; + let stop = Arc::new(AtomicBool::new(true)); #[cfg(feature = "stats")] - let mut tui = match StatsDashboard::new( + let mut tui = StatsDashboard::new( sockets.iter().map(Socket::stats), cli.stats.fps, cli.stats.layout, - ) { - Ok(t) => t, - Err(err) => { - eprintln!("error creating tui: {err}"); - return; - } - }; - - let cores = core_affinity::get_core_ids() - .unwrap_or_default() - .into_iter() - .filter(|core_id| core_id.id >= cli.cpu_start && core_id.id <= cli.cpu_end) - .collect::>(); - - if cores.is_empty() { - eprintln!("no cores found in range {}-{}", cli.cpu_start, cli.cpu_end); - return; - } - - #[cfg(feature = "tracing")] - tracing::debug!("Cores: {:?}", cores); + Some(stop.clone()), + )?; #[cfg(feature = "stats")] - let Some(stats_core) = core_affinity::get_core_ids() - .unwrap_or_default() - .into_iter() - .find(|core_id| core_id.id == cli.stats.cpu) - else { - eprintln!("no core found for stats thread {}", cli.stats.cpu); - return; - }; - - let run = Arc::new(AtomicBool::new(true)); + let stats_thread = cli.stats.cpu.unwrap_or_default().spawn(move || { + if let Err(err) = tui.run() { + eprintln!("error dumping stats: {err}"); + } + }); #[cfg(not(feature = "stats"))] - if let Err(err) = { - let r = run.clone(); + { + let stop = stop.clone(); ctrlc::set_handler(move || { - r.store(false, Ordering::SeqCst); + stop.store(true, Ordering::Release); }) - } { - eprintln!("error setting Ctrl-C handler: {err}"); - return; - } + }?; - let handles = sockets - .into_iter() - .zip(cores.into_iter().cycle()) - .map(|(socket, core_id)| { - let r = run.clone(); - thread::spawn(move || { - core_affinity::set_for_current(core_id); - socket_thread(socket, cli.mac_addr, &r); - }) + let _ = { + let stop = stop.clone(); + monitor.spawn_disconnect_handler(move || { + stop.store(true, Ordering::Release); }) - .collect::>(); + }; + + let socket_threads = cli + .cpu_range + .unwrap_or_default() + .spawn_multiple(sockets.into_iter().map(|socket| { + let stop = stop.clone(); + move || socket_thread(socket, cli.mac_addr, &stop) + })); #[cfg(feature = "stats")] - if let Err(err) = thread::spawn(move || { - core_affinity::set_for_current(stats_core); - if let Err(err) = tui.run() { - eprintln!("error dumping stats: {err}"); - } - }) - .join() - { + if let Err(err) = stats_thread.join() { eprintln!("error in stats thread: {err:?}"); } #[cfg(feature = "stats")] - run.store(false, Ordering::SeqCst); + stop.store(true, Ordering::Release); - for handle in handles { + for handle in socket_threads { if let Err(err) = handle.join() { - eprintln!("error in thread: {err:?}"); + eprintln!("error in socket thread: {err:?}"); + } + } + + Ok(()) +} + +fn main() -> ExitCode { + #[cfg(feature = "tracing")] + tracing_subscriber::fmt::init(); + + let cli = Cli::parse(); + + match run(cli) { + Ok(()) => ExitCode::SUCCESS, + Err(err) => { + eprintln!("{err}"); + ExitCode::FAILURE } } } diff --git a/examples/maglev-rs/Cargo.toml b/examples/maglev-rs/Cargo.toml index 0934e0b..d2c0519 100644 --- a/examples/maglev-rs/Cargo.toml +++ b/examples/maglev-rs/Cargo.toml @@ -5,14 +5,15 @@ edition = "2024" [dependencies] clap = { version = "4.5.35", features = ["derive"] } -core_affinity = "0.8.3" ctrlc = { version = "3.4.5", optional = true } flash = { path = "../../lib/flash-rs", features = ["clap"] } fnv = "1.0.7" macaddr = "1.0.1" +thiserror = "2.0.17" tracing = { version = "0.1.41", optional = true } tracing-subscriber = { version = "0.3.19", optional = true } twox-hash = "2.1.0" +utils = { path = "../../lib/utils-rs" } [features] default = ["dep:ctrlc"] diff --git a/examples/maglev-rs/src/cli.rs b/examples/maglev-rs/src/cli.rs index 5f77a23..c5c66bc 100644 --- a/examples/maglev-rs/src/cli.rs +++ b/examples/maglev-rs/src/cli.rs @@ -1,11 +1,9 @@ use std::net::Ipv4Addr; -#[cfg(feature = "stats")] -use std::str::FromStr as _; - use clap::Parser; use flash::FlashConfig; use macaddr::MacAddr6; +use utils::CpuRange; #[cfg(feature = "stats")] use flash::tui::GridLayout; @@ -15,21 +13,8 @@ pub struct Cli { #[command(flatten)] pub flash_config: FlashConfig, - #[arg( - short = 'c', - long, - default_value_t = 0, - help = "Starting CPU core index for socket threads" - )] - pub cpu_start: usize, - - #[arg( - short = 'e', - long, - default_value_t = 0, - help = "Ending CPU core index for socket threads (inclusive)" - )] - pub cpu_end: usize, + #[arg(short = 'c', long, help = "CPU core range for socket threads")] + pub cpu_range: Option, #[cfg(feature = "stats")] #[command(flatten)] @@ -48,14 +33,18 @@ pub struct StatsConfig { #[arg( short = 's', long = "stats-cpu", - default_value_t = 1, help = "CPU core index for stats thread" )] - pub cpu: usize, + pub cpu: Option, #[arg(short = 'F', long, default_value_t = 1, help = "Tui frames per second")] pub fps: u64, - #[arg(short = 'l', long, default_value_t = GridLayout::default(), value_parser = GridLayout::from_str, help = "Tui layout")] + #[arg( + short = 'l', + long, + default_value_t = GridLayout::default(), + help = "Tui layout" + )] pub layout: GridLayout, } diff --git a/examples/maglev-rs/src/error.rs b/examples/maglev-rs/src/error.rs new file mode 100644 index 0000000..083eaa9 --- /dev/null +++ b/examples/maglev-rs/src/error.rs @@ -0,0 +1,23 @@ +#[derive(Debug, thiserror::Error)] +#[error("app error: {0}")] +pub enum AppError { + Flash(#[from] flash::FlashError), + + #[cfg(feature = "stats")] + Tui(#[from] flash::tui::TuiError), + + #[cfg(not(feature = "stats"))] + #[error("app error: error setting Ctrl-C handler: {0}")] + Ctrl(#[from] ctrlc::Error), + + #[error("app error: empty route and no fallback IP")] + EmptyRoute, + + #[error( + "app error: no of next NF MACs ({mac_count}) does not match no of next NFs ({ip_count})" + )] + MacIpMismatch { + mac_count: usize, + ip_count: usize, + }, +} diff --git a/examples/maglev-rs/src/main.rs b/examples/maglev-rs/src/main.rs index 1ae88f3..c8e1e28 100644 --- a/examples/maglev-rs/src/main.rs +++ b/examples/maglev-rs/src/main.rs @@ -1,15 +1,16 @@ mod cli; +mod error; mod maglev; mod nf; use std::{ hash::BuildHasher, net::Ipv4Addr, + process::ExitCode, sync::{ Arc, atomic::{AtomicBool, Ordering}, }, - thread, }; use clap::Parser; @@ -20,7 +21,7 @@ use macaddr::MacAddr6; #[cfg(feature = "stats")] use flash::tui::StatsDashboard; -use crate::{cli::Cli, maglev::Maglev}; +use crate::{cli::Cli, error::AppError, maglev::Maglev}; const MAGLEV_TABLE_SIZE: usize = 65537; @@ -29,9 +30,9 @@ fn socket_thread( maglev: &Arc>, next_ip: &Arc>, next_mac: &Arc>, - run: &Arc, + stop: &Arc, ) { - while run.load(Ordering::SeqCst) { + while !stop.load(Ordering::Relaxed) { if !socket.poll().is_ok_and(|val| val) { continue; } @@ -63,137 +64,104 @@ fn socket_thread( } } -#[allow(clippy::too_many_lines)] -fn main() { - #[cfg(feature = "tracing")] - tracing_subscriber::fmt::init(); - - let cli = Cli::parse(); - - let (sockets, route) = match flash::connect(&cli.flash_config) { - Ok(t) => t, - Err(err) => { - eprintln!("{err}"); - return; - } - }; - - if sockets.is_empty() { - eprintln!("no sockets received"); - return; - } - - #[cfg(feature = "tracing")] - tracing::debug!("Sockets: {:?}", sockets); - - #[cfg(feature = "stats")] - let mut tui = match StatsDashboard::new( - sockets.iter().map(Socket::stats), - cli.stats.fps, - cli.stats.layout, - ) { - Ok(t) => t, - Err(err) => { - eprintln!("error creating tui: {err}"); - return; - } - }; +fn run(cli: Cli) -> Result<(), AppError> { + let (sockets, mut monitor) = flash::connect(&cli.flash_config)?; - let next_ip = if route.next.is_empty() { + let mut next_ip_addr = monitor.get_next_ip_addr()?; + if next_ip_addr.is_empty() { if let Some(fb_ip) = cli.fallback_ip { - vec![fb_ip] + next_ip_addr.push(fb_ip); } else { - eprintln!("empty route and no fallback IP configured"); - return; + return Err(AppError::EmptyRoute); } - } else { - route.next - }; + } - if cli.next_mac.len() > 1 && cli.next_mac.len() != next_ip.len() { - eprintln!( - "number of next NF MACs ({}) does not match number of next NFs ({})", - cli.next_mac.len(), - next_ip.len() - ); - return; + if cli.next_mac.len() > 1 && cli.next_mac.len() != next_ip_addr.len() { + return Err(AppError::MacIpMismatch { + mac_count: cli.next_mac.len(), + ip_count: next_ip_addr.len(), + }); } - let maglev = Arc::new(Maglev::::new(&next_ip, MAGLEV_TABLE_SIZE)); - let next_ip = Arc::new(next_ip); + let maglev = Arc::new(Maglev::::new( + &next_ip_addr, + MAGLEV_TABLE_SIZE, + )); + let next_ip = Arc::new(next_ip_addr); let next_mac = Arc::new(cli.next_mac); - let cores = core_affinity::get_core_ids() - .unwrap_or_default() - .into_iter() - .filter(|core_id| core_id.id >= cli.cpu_start && core_id.id <= cli.cpu_end) - .collect::>(); - - if cores.is_empty() { - eprintln!("no cores found in range {}-{}", cli.cpu_start, cli.cpu_end); - return; - } - - #[cfg(feature = "tracing")] - tracing::debug!("Cores: {:?}", cores); + let stop = Arc::new(AtomicBool::new(true)); #[cfg(feature = "stats")] - let Some(stats_core) = core_affinity::get_core_ids() - .unwrap_or_default() - .into_iter() - .find(|core_id| core_id.id == cli.stats.cpu) - else { - eprintln!("no core found for stats thread {}", cli.stats.cpu); - return; - }; + let mut tui = StatsDashboard::new( + sockets.iter().map(Socket::stats), + cli.stats.fps, + cli.stats.layout, + Some(stop.clone()), + )?; - let run = Arc::new(AtomicBool::new(true)); + #[cfg(feature = "stats")] + let stats_thread = cli.stats.cpu.unwrap_or_default().spawn(move || { + if let Err(err) = tui.run() { + eprintln!("error dumping stats: {err}"); + } + }); #[cfg(not(feature = "stats"))] - if let Err(err) = { - let r = run.clone(); + { + let stop = stop.clone(); ctrlc::set_handler(move || { - r.store(false, Ordering::SeqCst); + stop.store(true, Ordering::Release); }) - } { - eprintln!("error setting Ctrl-C handler: {err}"); - return; - } + }?; + + let _ = { + let stop = stop.clone(); + monitor.spawn_disconnect_handler(move || { + stop.store(true, Ordering::Release); + }) + }; - let handles = sockets - .into_iter() - .zip(cores.into_iter().cycle()) - .map(|(socket, core_id)| { - let r = run.clone(); + let socket_threads = cli + .cpu_range + .unwrap_or_default() + .spawn_multiple(sockets.into_iter().map(|socket| { + let stop = stop.clone(); let maglev = maglev.clone(); let next_ip = next_ip.clone(); let next_mac = next_mac.clone(); - thread::spawn(move || { - core_affinity::set_for_current(core_id); - socket_thread(socket, &maglev, &next_ip, &next_mac, &r); - }) - }) - .collect::>(); + move || socket_thread(socket, &maglev, &next_ip, &next_mac, &stop) + })); #[cfg(feature = "stats")] - if let Err(err) = thread::spawn(move || { - core_affinity::set_for_current(stats_core); - if let Err(err) = tui.run() { - eprintln!("error dumping stats: {err}"); - } - }) - .join() - { + if let Err(err) = stats_thread.join() { eprintln!("error in stats thread: {err:?}"); } #[cfg(feature = "stats")] - run.store(false, Ordering::SeqCst); + stop.store(true, Ordering::Release); - for handle in handles { + for handle in socket_threads { if let Err(err) = handle.join() { - eprintln!("error in thread: {err:?}"); + eprintln!("error in socket thread: {err:?}"); + } + } + + Ok(()) +} + +fn main() -> ExitCode { + #[cfg(feature = "tracing")] + tracing_subscriber::fmt::init(); + + let cli = Cli::parse(); + + match run(cli) { + Ok(()) => ExitCode::SUCCESS, + Err(err) => { + eprintln!("{err}"); + ExitCode::FAILURE } } } diff --git a/examples/simplefwd-rs/Cargo.toml b/examples/simplefwd-rs/Cargo.toml index a3146a9..5a52ff7 100644 --- a/examples/simplefwd-rs/Cargo.toml +++ b/examples/simplefwd-rs/Cargo.toml @@ -5,12 +5,13 @@ edition = "2024" [dependencies] clap = { version = "4.5.35", features = ["derive"] } -core_affinity = "0.8.3" ctrlc = { version = "3.4.5", optional = true } flash = { path = "../../lib/flash-rs", features = ["clap"] } macaddr = "1.0.1" +thiserror = "2.0.17" tracing = { version = "0.1.41", optional = true } tracing-subscriber = { version = "0.3.19", optional = true } +utils = { path = "../../lib/utils-rs" } [features] default = ["dep:ctrlc"] diff --git a/examples/simplefwd-rs/src/cli.rs b/examples/simplefwd-rs/src/cli.rs index e5de218..9c8b2c4 100644 --- a/examples/simplefwd-rs/src/cli.rs +++ b/examples/simplefwd-rs/src/cli.rs @@ -1,9 +1,7 @@ -#[cfg(feature = "stats")] -use std::str::FromStr as _; - use clap::Parser; use flash::FlashConfig; use macaddr::MacAddr6; +use utils::CpuRange; #[cfg(feature = "stats")] use flash::tui::GridLayout; @@ -13,21 +11,8 @@ pub struct Cli { #[command(flatten)] pub flash_config: FlashConfig, - #[arg( - short = 'c', - long, - default_value_t = 0, - help = "Starting CPU core index for socket threads" - )] - pub cpu_start: usize, - - #[arg( - short = 'e', - long, - default_value_t = 0, - help = "Ending CPU core index for socket threads (inclusive)" - )] - pub cpu_end: usize, + #[arg(short = 'c', long, help = "CPU core range for socket threads")] + pub cpu_range: Option, #[cfg(feature = "stats")] #[command(flatten)] @@ -43,14 +28,18 @@ pub struct StatsConfig { #[arg( short = 's', long = "stats-cpu", - default_value_t = 1, help = "CPU core index for stats thread" )] - pub cpu: usize, + pub cpu: Option, #[arg(short = 'F', long, default_value_t = 1, help = "Tui frames per second")] pub fps: u64, - #[arg(short = 'l', long, default_value_t = GridLayout::default(), value_parser = GridLayout::from_str, help = "Tui layout")] + #[arg( + short = 'l', + long, + default_value_t = GridLayout::default(), + help = "Tui layout" + )] pub layout: GridLayout, } diff --git a/examples/simplefwd-rs/src/error.rs b/examples/simplefwd-rs/src/error.rs new file mode 100644 index 0000000..d42c20a --- /dev/null +++ b/examples/simplefwd-rs/src/error.rs @@ -0,0 +1,12 @@ +#[derive(Debug, thiserror::Error)] +#[error("app error: {0}")] +pub enum AppError { + Flash(#[from] flash::FlashError), + + #[cfg(feature = "stats")] + Tui(#[from] flash::tui::TuiError), + + #[cfg(not(feature = "stats"))] + #[error("app error: error setting Ctrl-C handler: {0}")] + Ctrl(#[from] ctrlc::Error), +} diff --git a/examples/simplefwd-rs/src/main.rs b/examples/simplefwd-rs/src/main.rs index 9ea3956..e79bbcd 100644 --- a/examples/simplefwd-rs/src/main.rs +++ b/examples/simplefwd-rs/src/main.rs @@ -1,11 +1,12 @@ mod cli; +mod error; use std::{ + process::ExitCode, sync::{ Arc, atomic::{AtomicBool, Ordering}, }, - thread, }; use clap::Parser; @@ -15,10 +16,10 @@ use macaddr::MacAddr6; #[cfg(feature = "stats")] use flash::tui::StatsDashboard; -use crate::cli::Cli; +use crate::{cli::Cli, error::AppError}; -fn socket_thread(mut socket: Socket, mac_addr: Option, run: &Arc) { - while run.load(Ordering::SeqCst) { +fn socket_thread(mut socket: Socket, mac_addr: Option, stop: &Arc) { + while !stop.load(Ordering::Relaxed) { if !socket.poll().is_ok_and(|val| val) { continue; } @@ -50,108 +51,76 @@ fn socket_thread(mut socket: Socket, mac_addr: Option, run: &Arc sockets, - Err(err) => { - eprintln!("{err}"); - return; - } - }; - - if sockets.is_empty() { - eprintln!("no sockets received"); - return; - } - - #[cfg(feature = "tracing")] - tracing::info!("Sockets: {sockets:?}"); +fn run(cli: Cli) -> Result<(), AppError> { + let (sockets, monitor) = flash::connect(&cli.flash_config)?; + let stop = Arc::new(AtomicBool::new(true)); #[cfg(feature = "stats")] - let mut tui = match StatsDashboard::new( + let mut tui = StatsDashboard::new( sockets.iter().map(Socket::stats), cli.stats.fps, cli.stats.layout, - ) { - Ok(t) => t, - Err(err) => { - eprintln!("error creating tui: {err}"); - return; - } - }; - - let cores = core_affinity::get_core_ids() - .unwrap_or_default() - .into_iter() - .filter(|core_id| core_id.id >= cli.cpu_start && core_id.id <= cli.cpu_end) - .collect::>(); - - if cores.is_empty() { - eprintln!("no cores found in range {}-{}", cli.cpu_start, cli.cpu_end); - return; - } - - #[cfg(feature = "tracing")] - tracing::debug!("Cores: {:?}", cores); + Some(stop.clone()), + )?; #[cfg(feature = "stats")] - let Some(stats_core) = core_affinity::get_core_ids() - .unwrap_or_default() - .into_iter() - .find(|core_id| core_id.id == cli.stats.cpu) - else { - eprintln!("no core found for stats thread {}", cli.stats.cpu); - return; - }; - - let run = Arc::new(AtomicBool::new(true)); + let stats_thread = cli.stats.cpu.unwrap_or_default().spawn(move || { + if let Err(err) = tui.run() { + eprintln!("error dumping stats: {err}"); + } + }); #[cfg(not(feature = "stats"))] - if let Err(err) = { - let r = run.clone(); + { + let stop = stop.clone(); ctrlc::set_handler(move || { - r.store(false, Ordering::SeqCst); + stop.store(true, Ordering::Release); }) - } { - eprintln!("error setting Ctrl-C handler: {err}"); - return; - } + }?; - let handles = sockets - .into_iter() - .zip(cores.into_iter().cycle()) - .map(|(socket, core_id)| { - let r = run.clone(); - thread::spawn(move || { - core_affinity::set_for_current(core_id); - socket_thread(socket, cli.mac_addr, &r); - }) + let _ = { + let stop = stop.clone(); + monitor.spawn_disconnect_handler(move || { + stop.store(true, Ordering::Release); }) - .collect::>(); + }; + + let socket_threads = cli + .cpu_range + .unwrap_or_default() + .spawn_multiple(sockets.into_iter().map(|socket| { + let stop = stop.clone(); + move || socket_thread(socket, cli.mac_addr, &stop) + })); #[cfg(feature = "stats")] - if let Err(err) = thread::spawn(move || { - core_affinity::set_for_current(stats_core); - if let Err(err) = tui.run() { - eprintln!("error dumping stats: {err}"); - } - }) - .join() - { + if let Err(err) = stats_thread.join() { eprintln!("error in stats thread: {err:?}"); } #[cfg(feature = "stats")] - run.store(false, Ordering::SeqCst); + stop.store(true, Ordering::Release); - for handle in handles { + for handle in socket_threads { if let Err(err) = handle.join() { - eprintln!("error in thread: {err:?}"); + eprintln!("error in socket thread: {err:?}"); + } + } + + Ok(()) +} + +fn main() -> ExitCode { + #[cfg(feature = "tracing")] + tracing_subscriber::fmt::init(); + + let cli = Cli::parse(); + + match run(cli) { + Ok(()) => ExitCode::SUCCESS, + Err(err) => { + eprintln!("{err}"); + ExitCode::FAILURE } } } diff --git a/lib/flash-rs/src/client.rs b/lib/flash-rs/src/client.rs index 183cd23..c6863a7 100644 --- a/lib/flash-rs/src/client.rs +++ b/lib/flash-rs/src/client.rs @@ -1,10 +1,11 @@ -use std::{net::Ipv4Addr, str::FromStr, sync::Arc}; +use std::sync::Arc; use crate::{ config::{BindFlags, FlashConfig, Mode, PollConfig, SocketConfig, XskConfig}, error::FlashResult, fd::SocketFd, mem::{PollOutStatus, Umem}, + monitor::Monitor, uds::UdsClient, xsk::Socket, }; @@ -12,14 +13,8 @@ use crate::{ #[cfg(feature = "stats")] use crate::{config::XdpFlags, stats::Stats}; -#[derive(Debug)] -pub struct Route { - pub ip_addr: Ipv4Addr, - pub next: Vec, -} - #[allow(clippy::missing_errors_doc, clippy::too_many_lines)] -pub fn connect(config: &FlashConfig) -> FlashResult<(Vec, Route)> { +pub fn connect(config: &FlashConfig) -> FlashResult<(Vec, Monitor)> { let mut uds_client = UdsClient::new()?; let (umem_fd, total_sockets, umem_size, umem_scale) = @@ -91,17 +86,6 @@ pub fn connect(config: &FlashConfig) -> FlashResult<(Vec, Route)> { #[cfg(all(feature = "stats", feature = "tracing"))] tracing::debug!("Ifname: {ifname}"); - let route = Route { - ip_addr: Ipv4Addr::from_str(&uds_client.get_ip_addr()?)?, - next: uds_client - .get_dst_ip_addr()? - .iter() - .map(|y| Ipv4Addr::from_str(y)) - .collect::, _>>()?, - }; - - uds_client.set_nonblocking()?; - let xsk_config = XskConfig::new(bind_flags, mode); let next_size = uds_client.get_route_info()?; @@ -120,13 +104,9 @@ pub fn connect(config: &FlashConfig) -> FlashResult<(Vec, Route)> { let prev_nf = uds_client.get_prev_nf()?; let pollout_status = PollOutStatus::new(pollout_fd, pollout_size, config.nf_id, prev_nf)?; + let socket_config = Arc::new(SocketConfig::new(xsk_config, poll_config, pollout_status)); - let socket_config = Arc::new(SocketConfig::new( - xsk_config, - poll_config, - pollout_status, - uds_client, - )); + let uds_client = Arc::new(uds_client); let sockets = socket_info .into_iter() @@ -146,9 +126,10 @@ pub fn connect(config: &FlashConfig) -> FlashResult<(Vec, Route)> { #[cfg(feature = "stats")] Stats::new(fd, ifname.clone(), ifqueue, xdp_flags.clone()), socket_config.clone(), + uds_client.clone(), ) }) - .collect::, _>>()?; + .collect::>()?; - Ok((sockets, route)) + Ok((sockets, Monitor::new(uds_client))) } diff --git a/lib/flash-rs/src/config/socket.rs b/lib/flash-rs/src/config/socket.rs index e65889f..c0bf004 100644 --- a/lib/flash-rs/src/config/socket.rs +++ b/lib/flash-rs/src/config/socket.rs @@ -1,4 +1,4 @@ -use crate::{mem::PollOutStatus, uds::UdsClient}; +use crate::mem::PollOutStatus; use super::{poll::PollConfig, xsk::XskConfig}; @@ -7,21 +7,14 @@ pub(crate) struct SocketConfig { pub(crate) xsk: XskConfig, pub(crate) poll: PollConfig, pub(crate) pollout_status: PollOutStatus, - _uds_client: UdsClient, } impl SocketConfig { - pub(crate) fn new( - xsk: XskConfig, - poll: PollConfig, - pollout_status: PollOutStatus, - uds_client: UdsClient, - ) -> Self { + pub(crate) fn new(xsk: XskConfig, poll: PollConfig, pollout_status: PollOutStatus) -> Self { Self { xsk, poll, pollout_status, - _uds_client: uds_client, } } } diff --git a/lib/flash-rs/src/lib.rs b/lib/flash-rs/src/lib.rs index 4ac2bda..da89740 100644 --- a/lib/flash-rs/src/lib.rs +++ b/lib/flash-rs/src/lib.rs @@ -3,6 +3,7 @@ mod config; mod error; mod fd; mod mem; +mod monitor; mod uds; mod util; mod xsk; @@ -14,8 +15,5 @@ pub mod stats; pub mod tui; pub use crate::{ - client::{Route, connect}, - config::FlashConfig, - error::FlashError, - xsk::Socket, + client::connect, config::FlashConfig, error::FlashError, monitor::Monitor, xsk::Socket, }; diff --git a/lib/flash-rs/src/monitor.rs b/lib/flash-rs/src/monitor.rs new file mode 100644 index 0000000..c6a0a70 --- /dev/null +++ b/lib/flash-rs/src/monitor.rs @@ -0,0 +1,62 @@ +use std::{ + net::Ipv4Addr, + str::FromStr as _, + sync::Arc, + thread::{self, JoinHandle}, +}; + +use libc::{POLLERR, POLLHUP, POLLRDHUP, pollfd}; + +use crate::{error::FlashResult, uds::UdsClient}; + +pub struct Monitor { + uds_client: Arc, +} + +impl Monitor { + pub(crate) fn new(uds_client: Arc) -> Self { + Self { uds_client } + } + + #[allow(clippy::mut_from_ref)] + fn get_mut_client(&self) -> &mut UdsClient { + unsafe { &mut *Arc::as_ptr(&self.uds_client).cast_mut() } + } + + #[allow(clippy::missing_errors_doc)] + pub fn get_nf_ip_addr(&mut self) -> FlashResult { + Ok(Ipv4Addr::from_str(&self.get_mut_client().get_ip_addr()?)?) + } + + #[allow(clippy::missing_errors_doc)] + pub fn get_next_ip_addr(&mut self) -> FlashResult> { + Ok(self + .get_mut_client() + .get_dst_ip_addr()? + .iter() + .map(|y| Ipv4Addr::from_str(y)) + .collect::, _>>()?) + } + + pub fn spawn_disconnect_handler(&self, handler: F) -> JoinHandle<()> + where + F: FnOnce() + Send + 'static, + { + let fd = self.uds_client.get_conn_raw_fd(); + thread::spawn(move || unsafe { + let events = POLLHUP | POLLERR | POLLRDHUP; + let mut pollfd = pollfd { + fd, + events, + revents: 0, + }; + + loop { + if libc::poll(&raw mut pollfd, 1, -1) > 0 && (pollfd.revents & events) != 0 { + handler(); + break; + } + } + }) + } +} diff --git a/lib/flash-rs/src/tui/dashboard.rs b/lib/flash-rs/src/tui/dashboard.rs index 163bccf..7c3a8bc 100644 --- a/lib/flash-rs/src/tui/dashboard.rs +++ b/lib/flash-rs/src/tui/dashboard.rs @@ -1,6 +1,9 @@ use std::{ io, - sync::Arc, + sync::{ + Arc, + atomic::{AtomicBool, Ordering}, + }, time::{Duration, Instant}, }; @@ -8,7 +11,7 @@ use ratatui::{ Terminal, crossterm::{ ExecutableCommand, - event::{self, Event, KeyCode, KeyEvent, KeyEventKind}, + event::{self, Event, KeyCode, KeyEvent, KeyEventKind, KeyModifiers}, terminal::{self, EnterAlternateScreen, LeaveAlternateScreen}, }, prelude::CrosstermBackend, @@ -29,6 +32,7 @@ pub struct StatsDashboard { terminal: Terminal>, panels: Vec, layout_cache: LayoutCache, + stop: Option>, } impl StatsDashboard { @@ -37,6 +41,7 @@ impl StatsDashboard { stats: impl Iterator>, fps: u64, layout: GridLayout, + stop: Option>, ) -> TuiResult { let panels = stats .enumerate() @@ -56,6 +61,7 @@ impl StatsDashboard { terminal, panels, layout_cache: LayoutCache::new(layout, num_panels), + stop, }) } @@ -80,33 +86,47 @@ impl StatsDashboard { fn poll_until_next_frame(&mut self) -> io::Result { let next_frame_time = self.last_frame_time + self.frame_interval; - loop { + let flag = loop { + if let Some(stop) = &self.stop + && stop.load(Ordering::Relaxed) + { + break true; + } + let now = Instant::now(); if now >= next_frame_time { - break; + break false; } if event::poll(next_frame_time - now)? { match event::read()? { Event::Key(KeyEvent { code, + modifiers, kind: KeyEventKind::Press, .. }) => match code { - KeyCode::Char('q' | 'Q') | KeyCode::Esc => return Ok(true), - KeyCode::Char('r' | 'R') => return Ok(false), + KeyCode::Char('q' | 'Q') | KeyCode::Esc => break true, + KeyCode::Char('c' | 'C') if modifiers.contains(KeyModifiers::CONTROL) => { + break true; + } + KeyCode::Char('r' | 'R') => break false, _ => {} }, Event::Resize(width, height) => { self.resize_panels((width, height)); - return Ok(false); + break false; } _ => {} } } - } + }; - Ok(false) + Ok(flag + || self + .stop + .as_ref() + .is_some_and(|r| r.load(Ordering::Relaxed))) } #[allow(clippy::missing_errors_doc)] diff --git a/lib/flash-rs/src/tui/layout_str.rs b/lib/flash-rs/src/tui/layout_str.rs index 00dc968..2977e70 100644 --- a/lib/flash-rs/src/tui/layout_str.rs +++ b/lib/flash-rs/src/tui/layout_str.rs @@ -1,4 +1,4 @@ -use std::{fmt, str::FromStr}; +use std::{fmt, str}; use super::layout::GridLayout; @@ -14,7 +14,7 @@ impl GridLayoutParseError { } } -impl FromStr for GridLayout { +impl str::FromStr for GridLayout { type Err = GridLayoutParseError; fn from_str(s: &str) -> Result { diff --git a/lib/flash-rs/src/uds/client.rs b/lib/flash-rs/src/uds/client.rs index b0cb728..101bd40 100644 --- a/lib/flash-rs/src/uds/client.rs +++ b/lib/flash-rs/src/uds/client.rs @@ -1,3 +1,5 @@ +use std::os::fd::{AsRawFd as _, RawFd}; + use crate::util; use super::{ @@ -14,7 +16,7 @@ use super::{ const FLASH_UNIX_SOCKET_PATH: &str = "/tmp/flash/uds.sock"; #[derive(Debug)] -pub(crate) struct UdsClient { +pub struct UdsClient { conn: UdsConn, } @@ -26,6 +28,10 @@ impl UdsClient { }) } + pub(crate) fn get_conn_raw_fd(&self) -> RawFd { + self.conn.as_raw_fd() + } + #[allow( clippy::cast_possible_truncation, clippy::cast_possible_wrap, @@ -201,22 +207,15 @@ impl UdsClient { Ok(prev_nf_ids) } - - pub(crate) fn set_nonblocking(&mut self) -> UdsResult<()> { - Ok(self.conn.set_nonblocking(true)?) - } } impl Drop for UdsClient { fn drop(&mut self) { - #[cfg(feature = "tracing")] if let Err(err) = self.conn.write_all(&FLASH_CLOSE_CONN) { + #[cfg(feature = "tracing")] tracing::error!("error closing flash connection: {err}"); - } else { - tracing::debug!("Sent FLASH_CLOSE_CONN: {FLASH_CLOSE_CONN:?}"); - } - if let Err(err) = self.conn.write_all(&FLASH_CLOSE_CONN) { + #[cfg(not(feature = "tracing"))] eprintln!("error closing flash connection: {err}"); } } diff --git a/lib/flash-rs/src/uds/conn.rs b/lib/flash-rs/src/uds/conn.rs index 888e745..7b74149 100644 --- a/lib/flash-rs/src/uds/conn.rs +++ b/lib/flash-rs/src/uds/conn.rs @@ -1,6 +1,9 @@ use std::{ io::{self, Read as _, Write as _}, - os::unix::net::UnixStream, + os::{ + fd::{AsRawFd, RawFd}, + unix::net::UnixStream, + }, path::Path, }; @@ -60,9 +63,11 @@ impl UdsConn { .trim_end_matches('\0') .to_string()) } +} +impl AsRawFd for UdsConn { #[inline] - pub(super) fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> { - self.0.set_nonblocking(nonblocking) + fn as_raw_fd(&self) -> RawFd { + self.0.as_raw_fd() } } diff --git a/lib/flash-rs/src/uds/mod.rs b/lib/flash-rs/src/uds/mod.rs index 05915dc..7cb4170 100644 --- a/lib/flash-rs/src/uds/mod.rs +++ b/lib/flash-rs/src/uds/mod.rs @@ -3,6 +3,4 @@ mod conn; mod def; mod error; -pub(crate) use client::UdsClient; - -pub use error::UdsError; +pub use {client::UdsClient, error::UdsError}; diff --git a/lib/flash-rs/src/xsk/socket.rs b/lib/flash-rs/src/xsk/socket.rs index 0034321..b23f3c2 100644 --- a/lib/flash-rs/src/xsk/socket.rs +++ b/lib/flash-rs/src/xsk/socket.rs @@ -6,6 +6,7 @@ use crate::{ config::{BindFlags, Mode, PollMode, SocketConfig}, fd::SocketFd, mem::{CompRing, Cons as _, Desc, FillRing, Prod as _, RxRing, TxRing, Umem}, + uds::UdsClient, }; #[cfg(feature = "pool")] @@ -31,13 +32,16 @@ pub struct Socket { outstanding_tx: u32, clock: Clock, idle_timestamp: Option, + config: Arc, + _uds_guard: Arc, #[cfg(feature = "stats")] stats: Arc, } impl Socket { + #[allow(clippy::too_many_arguments)] pub(crate) fn new( fd: SocketFd, umem: Umem, @@ -46,6 +50,7 @@ impl Socket { umem_offset: u64, #[cfg(feature = "stats")] stats: Stats, config: Arc, + uds_guard: Arc, ) -> SocketResult { let off = fd.xdp_mmap_offsets()?; @@ -73,7 +78,9 @@ impl Socket { outstanding_tx: 0, clock: Clock::new(), idle_timestamp: None, + config, + _uds_guard: uds_guard, #[cfg(feature = "stats")] stats: Arc::new(stats), @@ -266,14 +273,12 @@ impl Socket { } else { self.complete_tx_rx(); - if self.shared.xsk_config.mode.contains(Mode::FLASH_BUSY_POLL) - || self.tx.needs_wakeup() - { + if self.config.xsk.mode.contains(Mode::FLASH_BUSY_POLL) || self.tx.needs_wakeup() { #[cfg(feature = "stats")] unsafe { (*self.stats.app.get()).tx_wakeup_sendtos += 1; } - let _ = self.fd.kick(); + self.fd.kick_tx(); } } } @@ -320,7 +325,7 @@ impl Socket { } #[cfg(feature = "tracing")] - if rcvd > self.shared.xsk_config.batch_size { + if rcvd > self.config.xsk.batch_size { tracing::warn!("xsk: received more descriptors than batch size"); } @@ -356,14 +361,12 @@ impl Socket { } else { self.complete_tx_rx(); - if self.shared.xsk_config.mode.contains(Mode::FLASH_BUSY_POLL) - || self.tx.needs_wakeup() - { + if self.config.xsk.mode.contains(Mode::FLASH_BUSY_POLL) || self.tx.needs_wakeup() { #[cfg(feature = "stats")] unsafe { (*self.stats.app.get()).tx_wakeup_sendtos += 1; } - let _ = self.fd.kick(); + self.fd.kick_tx(); } } } diff --git a/lib/utils-rs/Cargo.toml b/lib/utils-rs/Cargo.toml new file mode 100644 index 0000000..c67fdf0 --- /dev/null +++ b/lib/utils-rs/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "utils" +version = "0.1.0" +edition = "2024" + +[dependencies] +core_affinity = "0.8.3" +thiserror = "2.0.17" diff --git a/lib/utils-rs/src/cpu.rs b/lib/utils-rs/src/cpu.rs new file mode 100644 index 0000000..ec14f7c --- /dev/null +++ b/lib/utils-rs/src/cpu.rs @@ -0,0 +1,179 @@ +use std::{ + fmt, str, + thread::{self, JoinHandle}, +}; + +use core_affinity::CoreId; + +use crate::error::{UtilError, UtilResult}; + +#[derive(Clone, Debug, Default)] +pub struct CpuRange { + cores: Vec, + curr_idx: usize, +} + +fn get_available_cores() -> UtilResult> { + match core_affinity::get_core_ids() { + Some(cores) if !cores.is_empty() => Ok(cores), + _ => Err(UtilError::NoCpuCores), + } +} + +impl CpuRange { + #[allow(clippy::missing_errors_doc)] + pub fn new(cores: impl IntoIterator) -> UtilResult { + let available_cores = get_available_cores()?; + let cores = cores + .into_iter() + .map(|core| { + available_cores + .iter() + .find(|c| c.id == core) + .copied() + .ok_or(UtilError::CpuCoreNotFound(core)) + }) + .collect::, _>>()?; + + Ok(cores.into()) + } + + #[allow(clippy::missing_errors_doc)] + pub fn all() -> UtilResult { + Ok(get_available_cores()?.into()) + } + + #[allow(clippy::must_use_candidate)] + pub fn len(&self) -> usize { + self.cores.len() + } + + #[allow(clippy::must_use_candidate)] + pub fn is_empty(&self) -> bool { + self.cores.is_empty() + } + + pub fn reset(&mut self) { + self.curr_idx = 0; + } + + pub fn set_affinity_next(&mut self) -> impl FnOnce() -> bool { + let core_id = self.cores[self.curr_idx]; + self.curr_idx = (self.curr_idx + 1) % self.cores.len(); + + move || core_affinity::set_for_current(core_id) + } + + pub fn spawn(&mut self, f: F) -> JoinHandle<()> + where + F: FnOnce() + Send + 'static, + { + if self.cores.is_empty() { + return thread::spawn(f); + } + + let core_id = self.cores[self.curr_idx]; + self.curr_idx = (self.curr_idx + 1) % self.cores.len(); + + thread::spawn(move || { + core_affinity::set_for_current(core_id); + f(); + }) + } + + pub fn spawn_multiple(&mut self, funcs: I) -> Vec> + where + F: FnOnce() + Send + 'static, + I: IntoIterator, + { + if self.cores.is_empty() { + return funcs.into_iter().map(|f| thread::spawn(f)).collect(); + } + + funcs + .into_iter() + .map(|f| { + let core_id = self.cores[self.curr_idx]; + self.curr_idx = (self.curr_idx + 1) % self.cores.len(); + + thread::spawn(move || { + core_affinity::set_for_current(core_id); + f(); + }) + }) + .collect() + } +} + +impl From> for CpuRange { + fn from(cores: Vec) -> Self { + Self { cores, curr_idx: 0 } + } +} + +impl str::FromStr for CpuRange { + type Err = UtilError; + + fn from_str(s: &str) -> Result { + let s = s.trim(); + if s.is_empty() { + return Ok(CpuRange::default()); + } + + let available_cores = get_available_cores()?; + let mut cores = Vec::new(); + + for part in s.split(',') { + if let Some((start, end)) = part.split_once('-') + && let Ok(start) = start.trim().parse::() + && let Ok(end) = end.trim().parse::() + && start <= end + { + for core in start..=end { + if let Some(core_id) = available_cores.iter().find(|c| c.id == core) { + cores.push(*core_id); + } else { + return Err(UtilError::CpuCoreNotFound(core)); + } + } + } else if let Ok(core) = part.trim().parse::() { + if let Some(core_id) = available_cores.iter().find(|c| c.id == core) { + cores.push(*core_id); + } else { + return Err(UtilError::CpuCoreNotFound(core)); + } + } else { + return Err(UtilError::InvalidCpuCoreRange(part.to_string())); + } + } + + Ok(cores.into()) + } +} + +impl fmt::Display for CpuRange { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut parts = Vec::new(); + let mut i = 0; + + while i < self.cores.len() { + let start = self.cores[i].id; + let mut end = start; + + while i + 1 < self.cores.len() && self.cores[i + 1].id == end + 1 { + end = self.cores[i + 1].id; + i += 1; + } + + if start == end { + parts.push(format!("{start}")); + } else { + parts.push(format!("{start}-{end}")); + } + + i += 1; + } + + write!(f, "{}", parts.join(",")) + } +} diff --git a/lib/utils-rs/src/error.rs b/lib/utils-rs/src/error.rs new file mode 100644 index 0000000..d60f8f5 --- /dev/null +++ b/lib/utils-rs/src/error.rs @@ -0,0 +1,14 @@ +pub(crate) type UtilResult = Result; + +#[derive(Debug, thiserror::Error)] +#[error("util error: {0}")] +pub enum UtilError { + #[error("util error: no CPU cores found")] + NoCpuCores, + + #[error("util error: invalid CPU core/range {0}")] + InvalidCpuCoreRange(String), + + #[error("util error: CPU core {0} not found")] + CpuCoreNotFound(usize), +} diff --git a/lib/utils-rs/src/lib.rs b/lib/utils-rs/src/lib.rs new file mode 100644 index 0000000..f45652c --- /dev/null +++ b/lib/utils-rs/src/lib.rs @@ -0,0 +1,4 @@ +mod cpu; +mod error; + +pub use {cpu::CpuRange, error::UtilError};