From 94b492ae10ee086b1f756c7b7f03eaeaee180db2 Mon Sep 17 00:00:00 2001 From: Parvat Raj Singh Date: Tue, 19 May 2026 23:08:34 +0530 Subject: [PATCH] repair: update serve_repair and xdp_sender --- core/src/repair/mod.rs | 1 + core/src/repair/repair_service.rs | 62 +++++++++++++++++++-------- core/src/repair/xdp_sender.rs | 33 ++++++++++++++ core/src/tvu.rs | 4 ++ core/src/validator.rs | 25 +++++++---- core/src/window_service.rs | 3 ++ validator/src/admin_rpc_service.rs | 1 + validator/src/commands/run/execute.rs | 11 ++++- 8 files changed, 112 insertions(+), 28 deletions(-) create mode 100644 core/src/repair/xdp_sender.rs diff --git a/core/src/repair/mod.rs b/core/src/repair/mod.rs index f9456930651..43175df58c7 100644 --- a/core/src/repair/mod.rs +++ b/core/src/repair/mod.rs @@ -16,3 +16,4 @@ pub mod result; pub mod serve_repair; pub mod serve_repair_service; pub(crate) mod standard_repair_handler; +pub(crate) mod xdp_sender; diff --git a/core/src/repair/repair_service.rs b/core/src/repair/repair_service.rs index e83885cee68..34853c8e506 100644 --- a/core/src/repair/repair_service.rs +++ b/core/src/repair/repair_service.rs @@ -16,8 +16,10 @@ use { REPAIR_PEERS_CACHE_CAPACITY, RepairPeers, RepairProtocol, RepairRequestHeader, ServeRepair, ShredRepairType, }, + xdp_sender::RepairXdpSender, }, }, + bytes::Bytes, crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender}, lazy_lru::LruCache, rand::prelude::IndexedRandom as _, @@ -621,6 +623,7 @@ impl RepairService { repair_info: RepairInfo, outstanding_requests: Arc>, repair_service_channels: RepairServiceChannels, + xdp_sender: Option, ) -> Self { let t_repair = { let blockstore = blockstore.clone(); @@ -636,6 +639,7 @@ impl RepairService { repair_service_channels.repair_channels, repair_info, &outstanding_requests, + xdp_sender.as_ref(), ) }) .unwrap() @@ -800,6 +804,7 @@ impl RepairService { repair_info: &RepairInfo, outstanding_requests: &RwLock, repair_socket: &UdpSocket, + xdp_sender: Option<&RepairXdpSender>, repair_metrics: &mut RepairMetrics, ) { let mut build_repairs_batch_elapsed = Measure::start("build_repairs_batch_elapsed"); @@ -826,15 +831,23 @@ impl RepairService { let mut batch_send_repairs_elapsed = Measure::start("batch_send_repairs_elapsed"); if !batch.is_empty() { let num_pkts = batch.len(); - let batch = batch.iter().map(|(bytes, addr)| (bytes, addr)); - match batch_send(repair_socket, batch) { - Ok(()) => (), - Err(SendPktsError::IoError(err, num_failed)) => { - error!( - "{} batch_send failed to send {num_failed}/{num_pkts} packets first error \ - {err:?}", - repair_info.cluster_info.id() - ); + if let Some(xdp) = xdp_sender { + for (i, (bytes, addr)) in batch.into_iter().enumerate() { + if let Err(e) = xdp.try_send(i, addr, Bytes::from(bytes)) { + warn!("repair xdp send failed: {e:?}"); + } + } + } else { + let batch = batch.iter().map(|(bytes, addr)| (bytes, addr)); + match batch_send(repair_socket, batch) { + Ok(()) => (), + Err(SendPktsError::IoError(err, num_failed)) => { + error!( + "{} batch_send failed to send {num_failed}/{num_pkts} packets first \ + error {err:?}", + repair_info.cluster_info.id() + ); + } } } } @@ -851,6 +864,7 @@ impl RepairService { repair_tracker: &mut RepairTracker, outstanding_requests: &RwLock, repair_socket: &UdpSocket, + xdp_sender: Option<&RepairXdpSender>, ) { let RepairChannels { verified_voter_slots_receiver, @@ -904,6 +918,7 @@ impl RepairService { repair_info, outstanding_requests, repair_socket, + xdp_sender, repair_metrics, ); } @@ -915,6 +930,7 @@ impl RepairService { repair_channels: RepairChannels, repair_info: RepairInfo, outstanding_requests: &RwLock, + xdp_sender: Option<&RepairXdpSender>, ) { let (sharable_banks, migration_status) = { let bank_forks_r = repair_info.bank_forks.read().unwrap(); @@ -951,6 +967,7 @@ impl RepairService { &mut repair_tracker, outstanding_requests, repair_socket, + xdp_sender, ); repair_tracker.repair_metrics.maybe_report(); sleep(Duration::from_millis(REPAIR_MS)); @@ -1091,6 +1108,7 @@ impl RepairService { slot: u64, shred_index: u64, repair_socket: &UdpSocket, + xdp_sender: Option<&RepairXdpSender>, outstanding_repair_requests: Arc>, ) { let mut repair_peers = vec![]; @@ -1123,6 +1141,7 @@ impl RepairService { slot, shred_index, repair_socket, + xdp_sender, outstanding_repair_requests.clone(), ); } @@ -1135,6 +1154,7 @@ impl RepairService { slot: u64, shred_index: u64, repair_socket: &UdpSocket, + xdp_sender: Option<&RepairXdpSender>, outstanding_repair_requests: Arc>, ) { // Setup repair request @@ -1156,16 +1176,21 @@ impl RepairService { let packet_buf = ServeRepair::repair_proto_to_bytes(&request_proto, &identity_keypair).unwrap(); - // Prepare packet batch to send - let reqs = [(&packet_buf, address)]; - - // Send packet batch - match batch_send(repair_socket, reqs) { - Ok(()) => { - debug!("successfully sent repair request to {pubkey} / {address}!"); + if let Some(xdp) = xdp_sender { + if let Err(e) = xdp.try_send(0, address, Bytes::from(packet_buf)) { + warn!("repair xdp send to {pubkey} ({address}) failed: {e:?}"); + } else { + debug!("successfully sent repair request via XDP to {pubkey} / {address}!"); } - Err(SendPktsError::IoError(err, _num_failed)) => { - error!("batch_send failed to send packet - error = {err:?}"); + } else { + let reqs = [(&packet_buf, address)]; + match batch_send(repair_socket, reqs) { + Ok(()) => { + debug!("successfully sent repair request to {pubkey} / {address}!"); + } + Err(SendPktsError::IoError(err, _num_failed)) => { + error!("batch_send failed to send packet - error = {err:?}"); + } } } } @@ -1433,6 +1458,7 @@ mod test { slot, shred_index, &sender, + None, outstanding_repair_requests, ); diff --git a/core/src/repair/xdp_sender.rs b/core/src/repair/xdp_sender.rs new file mode 100644 index 00000000000..a5787719203 --- /dev/null +++ b/core/src/repair/xdp_sender.rs @@ -0,0 +1,33 @@ +use { + agave_xdp::transmitter as tx, bytes::Bytes, crossbeam_channel::TrySendError, + std::net::SocketAddrV4, +}; + +/// Convenience wrapper around [`tx::XdpSender`] for the repair path. +/// +/// Like turbine, repair always sends from a fixed source address, so we store +/// it once and attach it to every packet automatically. +#[derive(Clone)] +pub struct RepairXdpSender { + sender: tx::XdpSender, + src_addr: SocketAddrV4, +} + +impl RepairXdpSender { + pub fn new(sender: tx::XdpSender, src_addr: SocketAddrV4) -> Self { + Self { sender, src_addr } + } + + #[inline] + pub fn try_send( + &self, + sender_index: usize, + addr: impl Into, + payload: Bytes, + ) -> Result<(), TrySendError> { + self.sender.try_send( + sender_index, + tx::BytesTxPacket::new(self.src_addr, addr, None, payload), + ) + } +} diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 1dcc8fb67de..9885953ee85 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -21,6 +21,7 @@ use { repair::{ block_id_repair_service::BlockIdRepairChannels, repair_service::{OutstandingShredRepairs, RepairInfo, RepairServiceChannels}, + xdp_sender::RepairXdpSender, }, replay_stage::{ReplayReceivers, ReplaySenders, ReplayStage, ReplayStageConfig}, shred_fetch_stage::{SHRED_FETCH_CHANNEL_SIZE, ShredFetchStage}, @@ -145,6 +146,7 @@ pub struct TvuConfig { pub shred_sigverify_threads: NonZeroUsize, pub bls_sigverify_threads: NonZeroUsize, pub turbine_xdp_sender: Option, + pub repair_xdp_sender: Option, } impl Default for TvuConfig { @@ -160,6 +162,7 @@ impl Default for TvuConfig { shred_sigverify_threads: NonZeroUsize::new(1).expect("1 is non-zero"), bls_sigverify_threads: NonZeroUsize::new(1).expect("1 is non-zero"), turbine_xdp_sender: None, + repair_xdp_sender: None, } } } @@ -476,6 +479,7 @@ impl Tvu { leader_schedule_cache.clone(), tvu_config.shred_version, outstanding_repair_requests, + tvu_config.repair_xdp_sender, ) }; diff --git a/core/src/validator.rs b/core/src/validator.rs index 5eabe12a8b6..8e6f9b93c8d 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -17,7 +17,10 @@ use { }, forwarding_stage::ForwardingClientConfig, repair::{ - self, repair_handler::RepairHandlerType, serve_repair_service::ServeRepairService, + self, + repair_handler::RepairHandlerType, + serve_repair_service::ServeRepairService, + xdp_sender::RepairXdpSender, }, resource_limits::{ResourceLimitError, adjust_nofile_limit}, sample_performance_service::SamplePerformanceService, @@ -687,7 +690,7 @@ impl Validator { socket_addr_space: SocketAddrSpace, tpu_config: ValidatorTpuConfig, admin_rpc_service_post_init: Arc>>, - xdp_builder_with_src_addr: Option<(TransmitterBuilder, SocketAddrV4)>, + xdp_builder_with_src_addr: Option<(TransmitterBuilder, SocketAddrV4, SocketAddrV4)>, ) -> Result { let exit = Arc::new(AtomicBool::new(false)); Self::new_with_exit( @@ -722,7 +725,7 @@ impl Validator { socket_addr_space: SocketAddrSpace, tpu_config: ValidatorTpuConfig, admin_rpc_service_post_init: Arc>>, - xdp_builder_with_src_addr: Option<(TransmitterBuilder, SocketAddrV4)>, + xdp_builder_with_src_addr: Option<(TransmitterBuilder, SocketAddrV4, SocketAddrV4)>, exit: Arc, ) -> Result { #[cfg(debug_assertions)] @@ -1571,16 +1574,21 @@ impl Validator { // This channel backing up indicates a serious problem in votor let (votor_event_sender, votor_event_receiver) = bounded(1000); - let (xdp_transmitter, turbine_xdp_sender, quic_xdp_sender) = - if let Some((xdp_transmit_builder, src_addr)) = xdp_builder_with_src_addr { + let (xdp_transmitter, turbine_xdp_sender, quic_xdp_sender, repair_xdp_sender) = + if let Some((xdp_transmit_builder, turbine_src_addr, repair_src_addr)) = + xdp_builder_with_src_addr + { let (transmitter, sender) = xdp_transmit_builder.build(); + // Use protocol-specific source ports so turbine and repair replies route correctly. + let repair_sender = RepairXdpSender::new(sender.clone(), repair_src_addr); ( Some(transmitter), - Some(XdpSender::new(sender.clone(), src_addr)), - Some((sender, *src_addr.ip())), + Some(XdpSender::new(sender.clone(), turbine_src_addr)), + Some((sender, *turbine_src_addr.ip())), + Some(repair_sender), ) } else { - (None, None, None) + (None, None, None, None) }; // disable all2all tests if not allowed for a given cluster type @@ -1641,6 +1649,7 @@ impl Validator { shred_sigverify_threads: config.tvu_shred_sigverify_threads, bls_sigverify_threads: config.tvu_bls_sigverify_threads, turbine_xdp_sender: turbine_xdp_sender.clone(), + repair_xdp_sender, }, &max_slots, block_metadata_notifier, diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 63bfba32004..82957906c1f 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -10,6 +10,7 @@ use { repair_service::{ OutstandingShredRepairs, RepairInfo, RepairService, RepairServiceChannels, }, + xdp_sender::RepairXdpSender, }, result::{Error, Result}, }, @@ -276,6 +277,7 @@ impl WindowService { leader_schedule_cache: Arc, shred_version: u16, outstanding_repair_requests: Arc>, + repair_xdp_sender: Option, ) -> WindowService { let cluster_info = repair_info.cluster_info.clone(); let bank_forks = repair_info.bank_forks.clone(); @@ -297,6 +299,7 @@ impl WindowService { repair_info.clone(), outstanding_repair_requests.clone(), repair_service_channels, + repair_xdp_sender, ); let block_id_repair_service = BlockIdRepairService::new( diff --git a/validator/src/admin_rpc_service.rs b/validator/src/admin_rpc_service.rs index 7833618c1f8..ea0ac532f8f 100644 --- a/validator/src/admin_rpc_service.rs +++ b/validator/src/admin_rpc_service.rs @@ -645,6 +645,7 @@ impl AdminRpc for AdminRpcImpl { slot, shred_index, &post_init.repair_socket, + None, post_init.outstanding_repair_requests.clone(), ); Ok(()) diff --git a/validator/src/commands/run/execute.rs b/validator/src/commands/run/execute.rs index 4506aea6110..1ce5b9cedb7 100644 --- a/validator/src/commands/run/execute.rs +++ b/validator/src/commands/run/execute.rs @@ -376,7 +376,13 @@ pub fn execute( std::net::SocketAddrV4, }; - let src_port = node.sockets.retransmit_sockets[0] + let turbine_src_port = node.sockets.retransmit_sockets[0] + .local_addr() + .expect("failed to get local address") + .port(); + let repair_src_port = node + .sockets + .repair .local_addr() .expect("failed to get local address") .port(); @@ -398,7 +404,8 @@ pub fn execute( ( TransmitterBuilder::new(xdp_config, exit.clone()) .expect("failed to create xdp transmitter"), - SocketAddrV4::new(src_ip, src_port), + SocketAddrV4::new(src_ip, turbine_src_port), + SocketAddrV4::new(src_ip, repair_src_port), ) });