diff --git a/Cargo.toml b/Cargo.toml index d13bf7e..1873d54 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,7 @@ base64 = { version = "0.22", optional = true } bytes = "1.7.1" futures-channel = { version = "0.3", optional = true } futures-core = { version = "0.3" } -futures-util = { version = "0.3.16", default-features = false, optional = true } +futures-util = { version = "0.3.16", default-features = false, features = ["alloc"], optional = true } http = "1.0" http-body = "1.0.0" hyper = "1.8.0" @@ -43,6 +43,7 @@ futures-util = { version = "0.3.16", default-features = false, features = ["allo http-body-util = "0.1.0" tokio = { version = "1", features = ["macros", "test-util", "signal"] } tokio-test = "0.4" +tokio-util = { version = "0.7", features = ["rt"] } tower-test = "0.4" pretty_env_logger = "0.5" diff --git a/src/client/legacy/connect/dns.rs b/src/client/legacy/connect/dns.rs index abeb2cc..4e242fe 100644 --- a/src/client/legacy/connect/dns.rs +++ b/src/client/legacy/connect/dns.rs @@ -240,8 +240,96 @@ impl SocketAddrs { pub(super) fn len(&self) -> usize { self.iter.as_slice().len() } + + /// Create an interleaved address iterator per RFC 8305 (Happy Eyeballs v2) Section 4. + /// + /// Takes `first_family_count` addresses from the preferred family, + /// then interleaves remaining addresses: one fallback, one preferred, repeat. + /// + /// Input: `[v6_1, v6_2, v4_1, v4_2]` (IPv6 preferred) + /// Output: `[v6_1, v4_1, v6_2, v4_2]` (with `first_family_count=1`) + pub(super) fn interleave_by_family(self, first_family_count: usize) -> InterleavedAddrs { + InterleavedAddrs::new(self, first_family_count) + } +} + +/// Iterator over addresses interleaved by family per RFC 8305 (Happy Eyeballs v2). +pub(super) struct InterleavedAddrs { + inner: vec::IntoIter, + total: usize, +} + +impl InterleavedAddrs { + fn new(addrs: SocketAddrs, first_family_count: usize) -> Self { + let addrs: Vec<_> = addrs.iter.collect(); + let total = addrs.len(); + + if addrs.is_empty() { + return InterleavedAddrs { + inner: Vec::new().into_iter(), + total: 0, + }; + } + + // Determine preferred family from first address + let prefer_ipv6 = addrs[0].is_ipv6(); + + let (mut preferred, fallback): (Vec<_>, Vec<_>) = if prefer_ipv6 { + addrs.into_iter().partition(|a| a.is_ipv6()) + } else { + addrs.into_iter().partition(|a| a.is_ipv4()) + }; + + let mut result = Vec::with_capacity(total); + + // Take first_family_count from preferred + let take_count = first_family_count.min(preferred.len()); + result.extend(preferred.drain(..take_count)); + + // Interleave remaining: fallback, preferred, fallback, preferred... + let mut pref_iter = preferred.into_iter(); + let mut fall_iter = fallback.into_iter(); + + loop { + match (fall_iter.next(), pref_iter.next()) { + (Some(f), Some(p)) => { + result.push(f); + result.push(p); + } + (Some(f), None) => result.push(f), + (None, Some(p)) => result.push(p), + (None, None) => break, + } + } + + InterleavedAddrs { + inner: result.into_iter(), + total, + } + } + + /// Total number of addresses (original count before any iteration). + pub(super) fn total(&self) -> usize { + self.total + } +} + +impl Iterator for InterleavedAddrs { + type Item = SocketAddr; + + #[inline] + fn next(&mut self) -> Option { + self.inner.next() + } + + #[inline] + fn size_hint(&self) -> (usize, Option) { + self.inner.size_hint() + } } +impl ExactSizeIterator for InterleavedAddrs {} + impl Iterator for SocketAddrs { type Item = SocketAddr; #[inline] @@ -357,4 +445,101 @@ mod tests { assert_eq!(name.as_str(), DOMAIN); assert_eq!(name.to_string(), DOMAIN); } + + // === RFC 8305 Address Interleaving Tests === + + #[test] + fn test_interleave_by_family_basic() { + // IPv6 preferred (first in list), interleave with IPv4 + let v6_1: SocketAddr = "[2001:db8::1]:80".parse().unwrap(); + let v6_2: SocketAddr = "[2001:db8::2]:80".parse().unwrap(); + let v4_1: SocketAddr = "192.0.2.1:80".parse().unwrap(); + let v4_2: SocketAddr = "192.0.2.2:80".parse().unwrap(); + + let addrs = SocketAddrs::new(vec![v6_1, v6_2, v4_1, v4_2]); + let result: Vec<_> = addrs.interleave_by_family(1).collect(); + + // RFC 8305: first_family_count=1 means v6, v4, v6, v4... + assert_eq!(result, vec![v6_1, v4_1, v6_2, v4_2]); + } + + #[test] + fn test_interleave_by_family_empty() { + let addrs = SocketAddrs::new(vec![]); + let result: Vec<_> = addrs.interleave_by_family(1).collect(); + assert!(result.is_empty()); + } + + #[test] + fn test_interleave_by_family_single_family() { + // All IPv4 - no interleaving needed + let v4_1: SocketAddr = "192.0.2.1:80".parse().unwrap(); + let v4_2: SocketAddr = "192.0.2.2:80".parse().unwrap(); + let v4_3: SocketAddr = "192.0.2.3:80".parse().unwrap(); + + let addrs = SocketAddrs::new(vec![v4_1, v4_2, v4_3]); + let result: Vec<_> = addrs.interleave_by_family(1).collect(); + + assert_eq!(result, vec![v4_1, v4_2, v4_3]); + } + + #[test] + fn test_interleave_by_family_count_2() { + // first_family_count=2: take 2 from preferred, then interleave + let v6_1: SocketAddr = "[2001:db8::1]:80".parse().unwrap(); + let v6_2: SocketAddr = "[2001:db8::2]:80".parse().unwrap(); + let v6_3: SocketAddr = "[2001:db8::3]:80".parse().unwrap(); + let v4_1: SocketAddr = "192.0.2.1:80".parse().unwrap(); + let v4_2: SocketAddr = "192.0.2.2:80".parse().unwrap(); + + let addrs = SocketAddrs::new(vec![v6_1, v6_2, v6_3, v4_1, v4_2]); + let result: Vec<_> = addrs.interleave_by_family(2).collect(); + + // First 2 v6, then interleave: v4, v6, v4 + assert_eq!(result, vec![v6_1, v6_2, v4_1, v6_3, v4_2]); + } + + #[test] + fn test_interleave_by_family_count_0() { + // first_family_count=0: immediate interleave, fallback first + let v6_1: SocketAddr = "[2001:db8::1]:80".parse().unwrap(); + let v6_2: SocketAddr = "[2001:db8::2]:80".parse().unwrap(); + let v4_1: SocketAddr = "192.0.2.1:80".parse().unwrap(); + let v4_2: SocketAddr = "192.0.2.2:80".parse().unwrap(); + + let addrs = SocketAddrs::new(vec![v6_1, v6_2, v4_1, v4_2]); + let result: Vec<_> = addrs.interleave_by_family(0).collect(); + + // Fallback first: v4, v6, v4, v6 + assert_eq!(result, vec![v4_1, v6_1, v4_2, v6_2]); + } + + #[test] + fn test_interleave_by_family_count_exceeds() { + // first_family_count exceeds available preferred addresses + let v6_1: SocketAddr = "[2001:db8::1]:80".parse().unwrap(); + let v4_1: SocketAddr = "192.0.2.1:80".parse().unwrap(); + let v4_2: SocketAddr = "192.0.2.2:80".parse().unwrap(); + + let addrs = SocketAddrs::new(vec![v6_1, v4_1, v4_2]); + let result: Vec<_> = addrs.interleave_by_family(5).collect(); + + // Only 1 v6, take it, then all v4s + assert_eq!(result, vec![v6_1, v4_1, v4_2]); + } + + #[test] + fn test_interleave_by_family_ipv4_preferred() { + // IPv4 first in list means IPv4 is preferred + let v4_1: SocketAddr = "192.0.2.1:80".parse().unwrap(); + let v4_2: SocketAddr = "192.0.2.2:80".parse().unwrap(); + let v6_1: SocketAddr = "[2001:db8::1]:80".parse().unwrap(); + let v6_2: SocketAddr = "[2001:db8::2]:80".parse().unwrap(); + + let addrs = SocketAddrs::new(vec![v4_1, v4_2, v6_1, v6_2]); + let result: Vec<_> = addrs.interleave_by_family(1).collect(); + + // v4 preferred: v4, v6, v4, v6 + assert_eq!(result, vec![v4_1, v6_1, v4_2, v6_2]); + } } diff --git a/src/client/legacy/connect/http.rs b/src/client/legacy/connect/http.rs index 0236cc8..f187ca1 100644 --- a/src/client/legacy/connect/http.rs +++ b/src/client/legacy/connect/http.rs @@ -11,6 +11,7 @@ use std::time::Duration; use futures_core::ready; use futures_util::future::Either; +use futures_util::stream::{FuturesUnordered, StreamExt as _}; use http::uri::{Scheme, Uri}; use pin_project_lite::pin_project; use socket2::TcpKeepalive; @@ -91,6 +92,12 @@ struct Config { interface: Option, #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))] tcp_user_timeout: Option, + /// RFC 8305: Delay between staggered connection attempts. + /// When Some(_), enables RFC 8305 mode. When None (default), uses RFC 6555 mode. + connection_attempt_delay: Option, + /// RFC 8305: Number of preferred family addresses before interleaving. + /// Clamped to 0..=8 to prevent fallback family starvation. + first_address_family_count: u8, } #[derive(Default, Debug, Clone, Copy)] @@ -252,6 +259,8 @@ impl HttpConnector { interface: None, #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))] tcp_user_timeout: None, + connection_attempt_delay: None, + first_address_family_count: 1, }), resolver, } @@ -361,11 +370,53 @@ impl HttpConnector { /// Default is 300 milliseconds. /// /// [RFC 6555]: https://tools.ietf.org/html/rfc6555 + #[deprecated( + since = "0.1.20", + note = "Use set_connection_attempt_delay for Happy Eyeballs v2 (RFC 8305) support" + )] #[inline] pub fn set_happy_eyeballs_timeout(&mut self, dur: Option) { self.config_mut().happy_eyeballs_timeout = dur; } + /// Set the delay between connection attempts for [RFC 8305 (Happy Eyeballs v2)][RFC 8305]. + /// + /// When set to `Some(duration)`, enables RFC 8305 mode: + /// - Addresses are interleaved by family (IPv6, IPv4, IPv6, IPv4, ...) + /// - New connection attempts start every `duration` + /// - Multiple connections race in parallel + /// + /// When `None` (default), uses RFC 6555 behavior with `happy_eyeballs_timeout`. + /// + /// Recommended value is `Some(Duration::from_millis(250))` per RFC 8305. + /// Values below 10ms are clamped to 10ms per RFC 8305 Section 5. + /// + /// [RFC 8305]: https://tools.ietf.org/html/rfc8305 + #[inline] + pub fn set_connection_attempt_delay(&mut self, dur: Option) { + self.config_mut().connection_attempt_delay = dur.map(|d| d.max(Duration::from_millis(10))); + } + + /// Set the number of addresses from the preferred family to try before interleaving. + /// + /// This controls how many addresses from the preferred family (determined by + /// the first resolved address) are attempted before interleaving with the + /// fallback family. + /// + /// - `1` (default): `[v6_1, v4_1, v6_2, v4_2, ...]` + /// - `2`: `[v6_1, v6_2, v4_1, v6_3, v4_2, ...]` + /// - `0`: Immediately interleave starting with fallback family + /// + /// Values above 8 are clamped to 8 to prevent fallback family starvation. + /// + /// Only takes effect when RFC 8305 mode is enabled via [`set_connection_attempt_delay`]. + /// + /// [`set_connection_attempt_delay`]: Self::set_connection_attempt_delay + #[inline] + pub fn set_first_address_family_count(&mut self, count: u8) { + self.config_mut().first_address_family_count = count.min(8); + } + /// Set that all socket have `SO_REUSEADDR` set to the supplied value `reuse_address`. /// /// Default is `false`. @@ -703,26 +754,38 @@ impl StdError for ConnectError { } } -struct ConnectingTcp<'a> { - preferred: ConnectingTcpRemote, - fallback: Option, - config: &'a Config, +/// Connection state machine that supports both RFC 6555 and RFC 8305 modes. +enum ConnectingTcp<'a> { + /// RFC 6555 mode: preferred/fallback with single delay + V1 { + preferred: ConnectingTcpRemote, + fallback: Option, + config: &'a Config, + }, + /// RFC 8305 mode: interleaved addresses with staggered attempts + V2(ConnectingTcpV2<'a>), } impl<'a> ConnectingTcp<'a> { fn new(remote_addrs: dns::SocketAddrs, config: &'a Config) -> Self { + // Use RFC 8305 mode if connection_attempt_delay is set + if config.connection_attempt_delay.is_some() { + return ConnectingTcp::V2(ConnectingTcpV2::new(remote_addrs, config)); + } + + // Otherwise use RFC 6555 mode (unchanged behavior) if let Some(fallback_timeout) = config.happy_eyeballs_timeout { let (preferred_addrs, fallback_addrs) = remote_addrs .split_by_preference(config.local_address_ipv4, config.local_address_ipv6); if fallback_addrs.is_empty() { - return ConnectingTcp { + return ConnectingTcp::V1 { preferred: ConnectingTcpRemote::new(preferred_addrs, config.connect_timeout), fallback: None, config, }; } - ConnectingTcp { + ConnectingTcp::V1 { preferred: ConnectingTcpRemote::new(preferred_addrs, config.connect_timeout), fallback: Some(ConnectingTcpFallback { delay: tokio::time::sleep(fallback_timeout), @@ -731,7 +794,7 @@ impl<'a> ConnectingTcp<'a> { config, } } else { - ConnectingTcp { + ConnectingTcp::V1 { preferred: ConnectingTcpRemote::new(remote_addrs, config.connect_timeout), fallback: None, config, @@ -740,6 +803,150 @@ impl<'a> ConnectingTcp<'a> { } } +/// RFC 8305 (Happy Eyeballs v2) staggered connection state machine. +struct ConnectingTcpV2<'a> { + addresses: dns::InterleavedAddrs, + total_addrs: usize, + config: &'a Config, +} + +type BoxTcpConnecting = Pin> + Send>>; + +impl<'a> ConnectingTcpV2<'a> { + fn new(remote_addrs: dns::SocketAddrs, config: &'a Config) -> Self { + let addresses = + remote_addrs.interleave_by_family(config.first_address_family_count as usize); + let total_addrs = addresses.total(); + + Self { + addresses, + total_addrs, + config, + } + } + + async fn connect(mut self) -> Result { + if self.total_addrs == 0 { + return Err(ConnectError::new( + "tcp connect error", + std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "no addresses to connect to", + ), + )); + } + + let connect_timeout = self + .config + .connect_timeout + .and_then(|t| t.checked_div(self.total_addrs as u32)); + + // RFC 8305 recommends 250ms Connection Attempt Delay + let stagger_delay = self + .config + .connection_attempt_delay + .unwrap_or(Duration::from_millis(250)); + + let mut first_error = None; + let mut active = FuturesUnordered::new(); + + // Start first connection + active.push(self.start_next(connect_timeout)?); + + loop { + // Handle empty active list (all connections failed) + if active.is_empty() { + if self.addresses.len() == 0 { + return Err(first_error.unwrap_or_else(|| { + ConnectError::new( + "tcp connect error", + std::io::Error::new( + std::io::ErrorKind::NotConnected, + "Network unreachable", + ), + ) + })); + } + match self.start_next(connect_timeout) { + Ok(conn) => active.push(conn), + Err(e) => { + if first_error.is_none() { + first_error = Some(e); + } + continue; + } + } + } + + // If there are more addresses to try, race connections against stagger timer + if self.addresses.len() > 0 { + let stagger_timer = tokio::time::sleep(stagger_delay); + futures_util::pin_mut!(stagger_timer); + + match futures_util::future::select(active.next(), stagger_timer).await { + Either::Left((Some(result), _timer)) => match result { + Ok(stream) => return Ok(stream), + Err(e) => { + trace!("connection error: {:?}", e); + if first_error.is_none() { + first_error = Some(e); + } + continue; + } + }, + Either::Left((None, _timer)) => continue, + Either::Right(((), _next)) => { + match self.start_next(connect_timeout) { + Ok(conn) => active.push(conn), + Err(e) => { + trace!("connection start error: {:?}", e); + if first_error.is_none() { + first_error = Some(e); + } + } + } + continue; + } + } + } else { + // No more addresses to try, just wait for any connection to complete + match active.next().await { + Some(Ok(stream)) => return Ok(stream), + Some(Err(e)) => { + trace!("connection error: {:?}", e); + if first_error.is_none() { + first_error = Some(e); + } + continue; + } + None => continue, + } + } + } + } + + fn start_next( + &mut self, + connect_timeout: Option, + ) -> Result { + let addr = self.addresses.next().ok_or_else(|| { + ConnectError::new( + "tcp connect error", + std::io::Error::new(std::io::ErrorKind::NotConnected, "No more addresses"), + ) + })?; + + let attempted = self.total_addrs - self.addresses.len(); + debug!( + "Happy Eyeballs v2: starting connection {}/{} to {}", + attempted, self.total_addrs, addr + ); + + let fut = connect(&addr, self.config, connect_timeout)?; + Ok(Box::pin(fut)) + } +} + struct ConnectingTcpFallback { delay: Sleep, remote: ConnectingTcpRemote, @@ -950,40 +1157,47 @@ fn connect( } impl ConnectingTcp<'_> { - async fn connect(mut self) -> Result { - match self.fallback { - None => self.preferred.connect(self.config).await, - Some(mut fallback) => { - let preferred_fut = self.preferred.connect(self.config); - futures_util::pin_mut!(preferred_fut); - - let fallback_fut = fallback.remote.connect(self.config); - futures_util::pin_mut!(fallback_fut); - - let fallback_delay = fallback.delay; - futures_util::pin_mut!(fallback_delay); - - let (result, future) = - match futures_util::future::select(preferred_fut, fallback_delay).await { - Either::Left((result, _fallback_delay)) => { - (result, Either::Right(fallback_fut)) - } - Either::Right(((), preferred_fut)) => { - // Delay is done, start polling both the preferred and the fallback - futures_util::future::select(preferred_fut, fallback_fut) - .await - .factor_first() - } - }; - - if result.is_err() { - // Fallback to the remaining future (could be preferred or fallback) - // if we get an error - future.await - } else { - result + async fn connect(self) -> Result { + match self { + ConnectingTcp::V1 { + mut preferred, + fallback, + config, + } => match fallback { + None => preferred.connect(config).await, + Some(mut fallback) => { + let preferred_fut = preferred.connect(config); + futures_util::pin_mut!(preferred_fut); + + let fallback_fut = fallback.remote.connect(config); + futures_util::pin_mut!(fallback_fut); + + let fallback_delay = fallback.delay; + futures_util::pin_mut!(fallback_delay); + + let (result, future) = + match futures_util::future::select(preferred_fut, fallback_delay).await { + Either::Left((result, _fallback_delay)) => { + (result, Either::Right(fallback_fut)) + } + Either::Right(((), preferred_fut)) => { + // Delay is done, start polling both the preferred and the fallback + futures_util::future::select(preferred_fut, fallback_fut) + .await + .factor_first() + } + }; + + if result.is_err() { + // Fallback to the remaining future (could be preferred or fallback) + // if we get an error + future.await + } else { + result + } } - } + }, + ConnectingTcp::V2(v2) => v2.connect().await, } } } @@ -1007,7 +1221,7 @@ mod tests { use crate::client::legacy::connect::http::TcpKeepaliveConfig; use super::super::sealed::{Connect, ConnectSvc}; - use super::{Config, ConnectError, HttpConnector}; + use super::{dns, Config, ConnectError, ConnectingTcp, HttpConnector}; use super::set_port; @@ -1314,6 +1528,8 @@ mod tests { target_os = "linux" ))] tcp_user_timeout: None, + connection_attempt_delay: None, + first_address_family_count: 1, }; let connecting_tcp = ConnectingTcp::new(dns::SocketAddrs::new(addrs), &cfg); let start = Instant::now(); @@ -1447,4 +1663,212 @@ mod tests { set_port(&mut addr, 443, false); assert_eq!(addr.port(), 443); } + + #[test] + fn test_connection_attempt_delay_default() { + let connector = HttpConnector::new(); + let connecting_tcp = ConnectingTcp::new(dns::SocketAddrs::new(vec![]), &connector.config); + // Default should be Happy Eyeballs v1 for backward compatibility + assert!(matches!(connecting_tcp, ConnectingTcp::V1 { .. })); + } + + #[test] + fn test_set_connection_attempt_delay() { + let mut connector = HttpConnector::new(); + connector.set_connection_attempt_delay(Some(Duration::from_millis(250))); + let connecting_tcp = ConnectingTcp::new(dns::SocketAddrs::new(vec![]), &connector.config); + // Setting connection_attempt_delay should use Happy Eyeballs v2 + assert!(matches!(connecting_tcp, ConnectingTcp::V2 { .. })); + } + + #[test] + fn test_connection_attempt_delay_clamped_to_10ms() { + let mut connector = HttpConnector::new(); + + // Setting 0ms should clamp to 10ms per RFC 8305 + connector.set_connection_attempt_delay(Some(Duration::from_millis(0))); + assert_eq!( + connector.config.connection_attempt_delay, + Some(Duration::from_millis(10)) + ); + + // Setting 5ms should clamp to 10ms + connector.set_connection_attempt_delay(Some(Duration::from_millis(5))); + assert_eq!( + connector.config.connection_attempt_delay, + Some(Duration::from_millis(10)) + ); + + // Setting 100ms should stay 100ms + connector.set_connection_attempt_delay(Some(Duration::from_millis(100))); + assert_eq!( + connector.config.connection_attempt_delay, + Some(Duration::from_millis(100)) + ); + + // Setting None should stay None + connector.set_connection_attempt_delay(None); + assert!(connector.config.connection_attempt_delay.is_none()); + } + + #[test] + fn test_first_address_family_count_clamped() { + let mut connector = HttpConnector::new(); + + // Setting 100 should clamp to 8 + connector.set_first_address_family_count(100); + assert_eq!(connector.config.first_address_family_count, 8); + + // Setting 0 should stay 0 + connector.set_first_address_family_count(0); + assert_eq!(connector.config.first_address_family_count, 0); + } + + #[test] + fn test_deprecated_happy_eyeballs_timeout_still_works() { + let mut connector = HttpConnector::new(); + #[allow(deprecated)] + connector.set_happy_eyeballs_timeout(Some(Duration::from_millis(500))); + + // Should still set the value (deprecated but functional) + assert_eq!( + connector.config.happy_eyeballs_timeout, + Some(Duration::from_millis(500)) + ); + } + + /// Returns true if Happy Eyeballs v2 tests can run on this system. + /// Checks: IPv4 localhost bindable, IPv6 available, RFC 6666 prefix routable. + fn can_run_happy_eyeballs_v2_test() -> bool { + use std::net::{SocketAddr, TcpListener, TcpStream}; + + // Check IPv4 localhost + if TcpListener::bind("127.0.0.1:0").is_err() { + eprintln!("IPv4 localhost not bindable"); + return false; + } + + // Check IPv6 localhost + if TcpListener::bind("[::1]:0").is_err() { + eprintln!("IPv6 localhost not bindable"); + return false; + } + + // Check RFC 6666 discard prefix is routable (connection should timeout, not "no route") + let addr: SocketAddr = "[100::1]:80".parse().unwrap(); + match TcpStream::connect_timeout(&addr, Duration::from_millis(100)) { + Err(e) if e.kind() == io::ErrorKind::TimedOut => true, // Good - routable but blackholed + Err(e) if e.kind() == io::ErrorKind::NetworkUnreachable => { + // No route + eprintln!("No route to RFC 6666 discard prefix"); + false + } + _ => true, // Other errors (connection refused, etc.) - assume routable + } + } + + #[cfg_attr(miri, ignore)] + #[tokio::test] + async fn test_happy_eyeballs_v2_staggered_fallback() { + use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; + + if !can_run_happy_eyeballs_v2_test() { + eprintln!( + "Skipping test_happy_eyeballs_v2_staggered_fallback: network requirements not met" + ); + return; + } + + let server = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let port = server.local_addr().unwrap().port(); + + let _handle = tokio_util::task::AbortOnDropHandle::new(tokio::spawn(async move { + let _ = server.accept().await; + })); + + let addrs = vec![ + // Blackhole IPv6 address (will timeout) + SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0x100, 0, 0, 0, 0, 0, 0, 1)), port), + // IPv4 localhost (will succeed) + SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port), + ]; + + let mut http_connector = HttpConnector::new(); + http_connector.set_connection_attempt_delay(Some(Duration::from_millis(250))); + + let connecting = + super::ConnectingTcp::new(dns::SocketAddrs::new(addrs), &http_connector.config); + assert!(matches!(connecting, super::ConnectingTcp::V2 { .. })); + let result = connecting.connect().await; + dbg!(&result); + assert_eq!( + result.unwrap().peer_addr().unwrap(), + SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port) + ); + } + + #[cfg_attr(miri, ignore)] + #[tokio::test] + async fn test_happy_eyeballs_v2_immediate_success() { + use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; + + if !can_run_happy_eyeballs_v2_test() { + eprintln!( + "Skipping test_happy_eyeballs_v2_immediate_success: network requirements not met" + ); + return; + } + + let server = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let port = server.local_addr().unwrap().port(); + let _handle = tokio_util::task::AbortOnDropHandle::new(tokio::spawn(async move { + let _ = server.accept().await; + })); + + // IPv4 first (will succeed immediately) - no stagger delay needed + let addrs = vec![ + SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port), + SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0x100, 0, 0, 0, 0, 0, 0, 1)), port), + ]; + + let mut http_connector = HttpConnector::new(); + http_connector.set_connection_attempt_delay(Some(Duration::from_millis(250))); + + let connecting = + super::ConnectingTcp::new(dns::SocketAddrs::new(addrs), &http_connector.config); + assert!(matches!(connecting, super::ConnectingTcp::V2 { .. })); + let result = connecting.connect().await; + assert_eq!( + result.unwrap().peer_addr().unwrap(), + SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port) + ); + } + + #[cfg_attr(miri, ignore)] + #[tokio::test] + async fn test_happy_eyeballs_v2_timeout() { + use std::net::{IpAddr, Ipv6Addr, SocketAddr}; + + if !can_run_happy_eyeballs_v2_test() { + eprintln!("Skipping test_happy_eyeballs_v2_timeout: network requirements not met"); + return; + } + + // All blackhole IPv6 addresses (will timeout) + let addrs = vec![ + SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0x100, 0, 0, 0, 0, 0, 0, 1)), 4242), + SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0x100, 0, 0, 0, 0, 0, 0, 2)), 4242), + SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0x100, 0, 0, 0, 0, 0, 0, 3)), 4242), + ]; + + let mut http_connector = HttpConnector::new(); + http_connector.set_connection_attempt_delay(Some(Duration::from_millis(250))); + http_connector.set_connect_timeout(Some(Duration::from_secs(2))); + + let connecting = + super::ConnectingTcp::new(dns::SocketAddrs::new(addrs), &http_connector.config); + assert!(matches!(connecting, super::ConnectingTcp::V2 { .. })); + let result = connecting.connect().await; + assert_eq!(result.unwrap_err().msg, "tcp connect error"); + } }