From 935ef504a7c7d4261b58959df96c9aaccae205f2 Mon Sep 17 00:00:00 2001 From: luoyuxia Date: Sun, 25 Jan 2026 18:07:25 +0800 Subject: [PATCH 1/4] chore: introduce SecurityTokenManager to update token in the background --- crates/fluss/src/client/credentials.rs | 396 +++++++++++++++----- crates/fluss/src/client/table/remote_log.rs | 44 ++- crates/fluss/src/client/table/scanner.rs | 45 ++- 3 files changed, 350 insertions(+), 135 deletions(-) diff --git a/crates/fluss/src/client/credentials.rs b/crates/fluss/src/client/credentials.rs index c520b441..4d5f108a 100644 --- a/crates/fluss/src/client/credentials.rs +++ b/crates/fluss/src/client/credentials.rs @@ -19,13 +19,28 @@ use crate::client::metadata::Metadata; use crate::error::{Error, Result}; use crate::rpc::RpcClient; use crate::rpc::message::GetSecurityTokenRequest; +use log::{debug, info, warn}; use parking_lot::RwLock; use serde::Deserialize; use std::collections::HashMap; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use tokio::sync::{oneshot, watch}; +use tokio::task::JoinHandle; -const CACHE_TTL: Duration = Duration::from_secs(3600); +/// Default renewal time ratio - refresh at 80% of token lifetime +const DEFAULT_TOKEN_RENEWAL_RATIO: f64 = 0.8; +/// Default retry backoff when token fetch fails +const DEFAULT_RENEWAL_RETRY_BACKOFF: Duration = Duration::from_secs(60); +/// Minimum delay between refreshes +const MIN_RENEWAL_DELAY: Duration = Duration::from_secs(1); +/// Default refresh interval for tokens without expiration (never expires) +const DEFAULT_NON_EXPIRING_REFRESH_INTERVAL: Duration = Duration::from_secs(3600); // 1 hour + +/// Type alias for credentials properties receiver +/// - `None` = not yet fetched, should wait +/// - `Some(HashMap)` = fetched (may be empty if no auth needed) +pub type CredentialsReceiver = watch::Receiver>>; #[derive(Debug, Deserialize)] struct Credentials { @@ -34,48 +49,6 @@ struct Credentials { security_token: Option, } -struct CachedToken { - access_key_id: String, - secret_access_key: String, - security_token: Option, - addition_infos: HashMap, - cached_at: Instant, -} - -impl CachedToken { - fn to_remote_fs_props(&self) -> HashMap { - let mut props = HashMap::new(); - - props.insert("access_key_id".to_string(), self.access_key_id.clone()); - props.insert( - "secret_access_key".to_string(), - self.secret_access_key.clone(), - ); - - if let Some(token) = &self.security_token { - props.insert("security_token".to_string(), token.clone()); - } - - for (key, value) in &self.addition_infos { - if let Some((opendal_key, transform)) = convert_hadoop_key_to_opendal(key) { - let final_value = if transform { - // Invert boolean value (path_style_access -> enable_virtual_host_style) - if value == "true" { - "false".to_string() - } else { - "true".to_string() - } - } else { - value.clone() - }; - props.insert(opendal_key, final_value); - } - } - - props - } -} - /// Returns (opendal_key, needs_inversion) /// needs_inversion is true for path_style_access -> enable_virtual_host_style conversion fn convert_hadoop_key_to_opendal(hadoop_key: &str) -> Option<(String, bool)> { @@ -88,53 +61,234 @@ fn convert_hadoop_key_to_opendal(hadoop_key: &str) -> Option<(String, bool)> { } } -pub struct CredentialsCache { - inner: RwLock>, +/// Build remote filesystem props from credentials and additional info +fn build_remote_fs_props( + credentials: &Credentials, + addition_infos: &HashMap, +) -> HashMap { + let mut props = HashMap::new(); + + props.insert( + "access_key_id".to_string(), + credentials.access_key_id.clone(), + ); + props.insert( + "secret_access_key".to_string(), + credentials.access_key_secret.clone(), + ); + + if let Some(token) = &credentials.security_token { + props.insert("security_token".to_string(), token.clone()); + } + + for (key, value) in addition_infos { + if let Some((opendal_key, transform)) = convert_hadoop_key_to_opendal(key) { + let final_value = if transform { + // Invert boolean value (path_style_access -> enable_virtual_host_style) + if value == "true" { + "false".to_string() + } else { + "true".to_string() + } + } else { + value.clone() + }; + props.insert(opendal_key, final_value); + } + } + + props +} + +/// Manager for security tokens that refreshes tokens in a background task. +/// +/// This follows the pattern from Java's `DefaultSecurityTokenManager`, where +/// a background thread periodically refreshes tokens based on their expiration time. +/// +/// Uses `tokio::sync::watch` channel to broadcast token updates to consumers. +/// Consumers can subscribe by calling `subscribe()` to get a receiver. +/// +/// The channel value is `Option`: +/// - `None` = not yet fetched, consumers should wait +/// - `Some(HashMap)` = fetched (may be empty if no auth needed) +/// +/// # Example +/// ```ignore +/// let manager = SecurityTokenManager::new(rpc_client, metadata); +/// let credentials_rx = manager.subscribe(); +/// manager.start(); +/// +/// // Consumer can get latest credentials via: +/// let props = credentials_rx.borrow().clone(); +/// ``` +pub struct SecurityTokenManager { rpc_client: Arc, metadata: Arc, + token_renewal_ratio: f64, + renewal_retry_backoff: Duration, + /// Watch channel sender for broadcasting token updates + credentials_tx: watch::Sender>>, + /// Watch channel receiver (kept to allow cloning for new subscribers) + credentials_rx: watch::Receiver>>, + /// Handle to the background refresh task + task_handle: RwLock>>, + /// Sender to signal shutdown + shutdown_tx: RwLock>>, } -impl CredentialsCache { +impl SecurityTokenManager { pub fn new(rpc_client: Arc, metadata: Arc) -> Self { + let (credentials_tx, credentials_rx) = watch::channel(None); Self { - inner: RwLock::new(None), rpc_client, metadata, + token_renewal_ratio: DEFAULT_TOKEN_RENEWAL_RATIO, + renewal_retry_backoff: DEFAULT_RENEWAL_RETRY_BACKOFF, + credentials_tx, + credentials_rx, + task_handle: RwLock::new(None), + shutdown_tx: RwLock::new(None), + } + } + + /// Subscribe to credential updates. + /// Returns a receiver that always contains the latest credentials. + /// Consumers can call `receiver.borrow()` to get the current value. + pub fn subscribe(&self) -> CredentialsReceiver { + self.credentials_rx.clone() + } + + /// Start the background token refresh task. + /// This should be called once after creating the manager. + pub fn start(&self) { + if self.task_handle.read().is_some() { + warn!("SecurityTokenManager is already started"); + return; + } + + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + *self.shutdown_tx.write() = Some(shutdown_tx); + + let rpc_client = Arc::clone(&self.rpc_client); + let metadata = Arc::clone(&self.metadata); + let token_renewal_ratio = self.token_renewal_ratio; + let renewal_retry_backoff = self.renewal_retry_backoff; + let credentials_tx = self.credentials_tx.clone(); + + let handle = tokio::spawn(async move { + Self::token_refresh_loop( + rpc_client, + metadata, + token_renewal_ratio, + renewal_retry_backoff, + credentials_tx, + shutdown_rx, + ) + .await; + }); + + *self.task_handle.write() = Some(handle); + info!("SecurityTokenManager started"); + } + + /// Stop the background token refresh task. + pub fn stop(&self) { + if let Some(tx) = self.shutdown_tx.write().take() { + let _ = tx.send(()); + } + if let Some(handle) = self.task_handle.write().take() { + handle.abort(); } + info!("SecurityTokenManager stopped"); } - pub async fn get_or_refresh(&self) -> Result> { - { - let guard = self.inner.read(); - if let Some(cached) = guard.as_ref() { - if cached.cached_at.elapsed() < CACHE_TTL { - return Ok(cached.to_remote_fs_props()); + /// Background task that periodically refreshes tokens. + async fn token_refresh_loop( + rpc_client: Arc, + metadata: Arc, + token_renewal_ratio: f64, + renewal_retry_backoff: Duration, + credentials_tx: watch::Sender>>, + mut shutdown_rx: oneshot::Receiver<()>, + ) { + info!("Starting token refresh loop"); + + loop { + // Fetch token and send to channel + let result = Self::fetch_token(&rpc_client, &metadata).await; + + let next_delay = match result { + Ok((props, expiration_time)) => { + // Send credentials via watch channel (Some indicates fetched) + if let Err(e) = credentials_tx.send(Some(props)) { + log::debug!("No active subscribers for credentials update: {:?}", e); + } + + // Calculate next renewal delay based on expiration time + if let Some(exp_time) = expiration_time { + Self::calculate_renewal_delay(exp_time, token_renewal_ratio) + } else { + // No expiration time - token never expires, use long refresh interval + log::info!( + "Token has no expiration time (never expires), next refresh in {:?}", + DEFAULT_NON_EXPIRING_REFRESH_INTERVAL + ); + DEFAULT_NON_EXPIRING_REFRESH_INTERVAL + } + } + Err(e) => { + log::warn!( + "Failed to obtain security token: {:?}, will retry in {:?}", + e, + renewal_retry_backoff + ); + renewal_retry_backoff + } + }; + + log::debug!("Next token refresh in {:?}", next_delay); + + // Wait for either the delay to elapse or shutdown signal + tokio::select! { + _ = tokio::time::sleep(next_delay) => { + // Continue to next iteration to refresh + } + _ = &mut shutdown_rx => { + log::info!("Token refresh loop received shutdown signal"); + break; } } } - - self.refresh_from_server().await } - async fn refresh_from_server(&self) -> Result> { - let cluster = self.metadata.get_cluster(); - let server_node = cluster - .get_one_available_server() - .expect("no tablet server available"); - let conn = self.rpc_client.get_connection(server_node).await?; + /// Fetch token from server. + /// Returns the props and expiration time if available. + async fn fetch_token( + rpc_client: &Arc, + metadata: &Arc, + ) -> Result<(HashMap, Option)> { + let cluster = metadata.get_cluster(); + let server_node = + cluster + .get_one_available_server() + .ok_or_else(|| Error::UnexpectedError { + message: "No tablet server available for token refresh".to_string(), + source: None, + })?; + let conn = rpc_client.get_connection(server_node).await?; let request = GetSecurityTokenRequest::new(); let response = conn.request(request).await?; - // the token may be empty if the remote filesystem - // doesn't require token to access + // The token may be empty if remote filesystem doesn't require authentication if response.token.is_empty() { - return Ok(HashMap::new()); + log::info!("Empty token received, remote filesystem may not require authentication"); + return Ok((HashMap::new(), response.expiration_time)); } let credentials: Credentials = serde_json::from_slice(&response.token).map_err(|e| Error::JsonSerdeError { - message: format!("Error when parse token from server: {e}"), + message: format!("Error when parsing token from server: {e}"), })?; let mut addition_infos = HashMap::new(); @@ -142,26 +296,50 @@ impl CredentialsCache { addition_infos.insert(kv.key.clone(), kv.value.clone()); } - let cached = CachedToken { - access_key_id: credentials.access_key_id, - secret_access_key: credentials.access_key_secret, - security_token: credentials.security_token, - addition_infos, - cached_at: Instant::now(), - }; + let props = build_remote_fs_props(&credentials, &addition_infos); + log::debug!("Security token fetched successfully"); + + Ok((props, response.expiration_time)) + } + + /// Calculate the delay before next token renewal. + /// Uses the renewal ratio to refresh before actual expiration. + fn calculate_renewal_delay(expiration_time: i64, renewal_ratio: f64) -> Duration { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() as i64; + + let time_until_expiry = expiration_time - now; + if time_until_expiry <= 0 { + // Token already expired, refresh immediately + return MIN_RENEWAL_DELAY; + } + + let delay_ms = (time_until_expiry as f64 * renewal_ratio) as u64; + let delay = Duration::from_millis(delay_ms); - let props = cached.to_remote_fs_props(); - *self.inner.write() = Some(cached); + log::debug!( + "Calculated renewal delay: {:?} (expiration: {}, now: {}, ratio: {})", + delay, + expiration_time, + now, + renewal_ratio + ); + + delay.max(MIN_RENEWAL_DELAY) + } +} - Ok(props) +impl Drop for SecurityTokenManager { + fn drop(&mut self) { + self.stop(); } } #[cfg(test)] mod tests { use super::*; - use crate::client::metadata::Metadata; - use crate::cluster::Cluster; #[test] fn convert_hadoop_key_to_opendal_maps_known_keys() { @@ -177,26 +355,55 @@ mod tests { assert!(convert_hadoop_key_to_opendal("unknown.key").is_none()); } - #[tokio::test] - async fn credentials_cache_returns_cached_props() -> Result<()> { - let cached = CachedToken { + #[test] + fn calculate_renewal_delay_returns_correct_delay() { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() as i64; + + // Token expires in 1 hour + let expiration = now + 3600 * 1000; + let delay = SecurityTokenManager::calculate_renewal_delay(expiration, 0.8); + + // Should be approximately 48 minutes (80% of 1 hour) + let expected_min = Duration::from_secs(2800); // ~46.7 minutes + let expected_max = Duration::from_secs(2900); // ~48.3 minutes + assert!( + delay >= expected_min && delay <= expected_max, + "Expected delay between {:?} and {:?}, got {:?}", + expected_min, + expected_max, + delay + ); + } + + #[test] + fn calculate_renewal_delay_handles_expired_token() { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() as i64; + + // Token already expired + let expiration = now - 1000; + let delay = SecurityTokenManager::calculate_renewal_delay(expiration, 0.8); + + // Should return minimum delay + assert_eq!(delay, MIN_RENEWAL_DELAY); + } + + #[test] + fn build_remote_fs_props_includes_all_fields() { + let credentials = Credentials { access_key_id: "ak".to_string(), - secret_access_key: "sk".to_string(), + access_key_secret: "sk".to_string(), security_token: Some("token".to_string()), - addition_infos: HashMap::from([( - "fs.s3a.path.style.access".to_string(), - "true".to_string(), - )]), - cached_at: Instant::now(), - }; - - let cache = CredentialsCache { - inner: RwLock::new(Some(cached)), - rpc_client: Arc::new(RpcClient::new()), - metadata: Arc::new(Metadata::new_for_test(Arc::new(Cluster::default()))), }; + let addition_infos = + HashMap::from([("fs.s3a.path.style.access".to_string(), "true".to_string())]); - let props = cache.get_or_refresh().await?; + let props = build_remote_fs_props(&credentials, &addition_infos); assert_eq!(props.get("access_key_id"), Some(&"ak".to_string())); assert_eq!(props.get("secret_access_key"), Some(&"sk".to_string())); assert_eq!(props.get("security_token"), Some(&"token".to_string())); @@ -204,6 +411,5 @@ mod tests { props.get("enable_virtual_host_style"), Some(&"false".to_string()) ); - Ok(()) } } diff --git a/crates/fluss/src/client/table/remote_log.rs b/crates/fluss/src/client/table/remote_log.rs index c39056db..0359c5a7 100644 --- a/crates/fluss/src/client/table/remote_log.rs +++ b/crates/fluss/src/client/table/remote_log.rs @@ -14,11 +14,12 @@ // See the License for the specific language governing permissions and // limitations under the License. +use crate::client::credentials::CredentialsReceiver; use crate::error::{Error, Result}; use crate::io::{FileIO, Storage}; use crate::metadata::TableBucket; use crate::proto::{PbRemoteLogFetchInfo, PbRemoteLogSegment}; -use parking_lot::{Mutex, RwLock}; +use parking_lot::Mutex; use std::{ cmp::{Ordering, Reverse, min}, collections::{BinaryHeap, HashMap}, @@ -290,7 +291,7 @@ enum DownloadResult { /// Production implementation of RemoteLogFetcher that downloads from actual storage struct ProductionFetcher { - remote_fs_props: Arc>>, + credentials_rx: CredentialsReceiver, local_log_dir: Arc, } @@ -299,7 +300,7 @@ impl RemoteLogFetcher for ProductionFetcher { &self, request: &RemoteLogDownloadRequest, ) -> Pin> + Send>> { - let remote_fs_props = self.remote_fs_props.clone(); + let mut credentials_rx = self.credentials_rx.clone(); let local_log_dir = self.local_log_dir.clone(); // Clone data needed for async operation to avoid lifetime issues @@ -317,14 +318,29 @@ impl RemoteLogFetcher for ProductionFetcher { remote_log_tablet_dir, segment.segment_id, offset_prefix ); - let remote_fs_props_map = remote_fs_props.read().clone(); + // Get credentials from watch channel, waiting if not yet fetched + // - None = not yet fetched, wait + // - Some(props) = fetched (may be empty if no auth needed) + let remote_fs_props = { + let maybe_props = credentials_rx.borrow().clone(); + match maybe_props { + Some(props) => props, + None => { + // Credentials not yet fetched, wait for first update + log::info!("Waiting for credentials to be available..."); + let _ = credentials_rx.changed().await; + // After change, unwrap or use empty (should be Some now) + credentials_rx.borrow().clone().unwrap_or_default() + } + } + }; // Download file to disk (streaming, no memory spike) let file_path = RemoteLogDownloader::download_file( &remote_log_tablet_dir, &remote_path, &local_file_path, - &remote_fs_props_map, + &remote_fs_props, ) .await?; @@ -726,7 +742,6 @@ impl RemoteLogDownloadFuture { /// won't wait for completion. Pending futures will fail. pub struct RemoteLogDownloader { request_sender: Option>, - remote_fs_props: Option>>>, } impl RemoteLogDownloader { @@ -734,21 +749,17 @@ impl RemoteLogDownloader { local_log_dir: TempDir, max_prefetch_segments: usize, max_concurrent_downloads: usize, + credentials_rx: CredentialsReceiver, ) -> Result { - let remote_fs_props = Arc::new(RwLock::new(HashMap::new())); let fetcher = Arc::new(ProductionFetcher { - remote_fs_props: remote_fs_props.clone(), + credentials_rx, local_log_dir: Arc::new(local_log_dir), }); - let mut downloader = - Self::new_with_fetcher(fetcher, max_prefetch_segments, max_concurrent_downloads)?; - downloader.remote_fs_props = Some(remote_fs_props); - Ok(downloader) + Self::new_with_fetcher(fetcher, max_prefetch_segments, max_concurrent_downloads) } /// Create a RemoteLogDownloader with a custom fetcher (for testing). - /// The remote_fs_props will be None since custom fetchers typically don't need S3 credentials. pub fn new_with_fetcher( fetcher: Arc, max_prefetch_segments: usize, @@ -771,16 +782,9 @@ impl RemoteLogDownloader { Ok(Self { request_sender: Some(request_sender), - remote_fs_props: None, }) } - pub fn set_remote_fs_props(&self, props: HashMap) { - if let Some(ref remote_fs_props) = self.remote_fs_props { - *remote_fs_props.write() = props; - } - } - /// Request to fetch a remote log segment to local. This method is non-blocking. pub fn request_remote_log( &self, diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index dbebe1ac..356ba1cd 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -28,7 +28,7 @@ use tempfile::TempDir; use crate::TableId; use crate::client::connection::FlussConnection; -use crate::client::credentials::CredentialsCache; +use crate::client::credentials::SecurityTokenManager; use crate::client::metadata::Metadata; use crate::client::table::log_fetch_buffer::{ CompletedFetch, DefaultCompletedFetch, FetchErrorAction, FetchErrorContext, FetchErrorLogLevel, @@ -462,9 +462,10 @@ struct LogFetcher { read_context: ReadContext, remote_read_context: ReadContext, remote_log_downloader: Arc, - // todo: consider schedule a background thread to update - // token instead of update in fetch phase - credentials_cache: Arc, + /// Background security token manager for remote filesystem access. + /// Kept alive to run the background refresh task; stopped on drop. + #[allow(dead_code)] + security_token_manager: Arc, log_fetch_buffer: Arc, nodes_with_pending_fetch_requests: Arc>>, } @@ -476,7 +477,6 @@ struct FetchResponseContext { read_context: ReadContext, remote_read_context: ReadContext, remote_log_downloader: Arc, - credentials_cache: Arc, } impl LogFetcher { @@ -497,6 +497,23 @@ impl LogFetcher { let tmp_dir = TempDir::with_prefix("fluss-remote-logs")?; let log_fetch_buffer = Arc::new(LogFetchBuffer::new(read_context.clone())); + // Create security token manager for background token refresh + let security_token_manager = + Arc::new(SecurityTokenManager::new(conns.clone(), metadata.clone())); + + // Subscribe to credentials updates and pass to remote log downloader + let credentials_rx = security_token_manager.subscribe(); + + let remote_log_downloader = Arc::new(RemoteLogDownloader::new( + tmp_dir, + config.scanner_remote_log_prefetch_num, + config.scanner_remote_log_download_threads, + credentials_rx, + )?); + + // Start the background token refresh task + security_token_manager.start(); + Ok(LogFetcher { conns: conns.clone(), metadata: metadata.clone(), @@ -505,12 +522,8 @@ impl LogFetcher { log_scanner_status, read_context, remote_read_context, - remote_log_downloader: Arc::new(RemoteLogDownloader::new( - tmp_dir, - config.scanner_remote_log_prefetch_num, - config.scanner_remote_log_download_threads, - )?), - credentials_cache: Arc::new(CredentialsCache::new(conns.clone(), metadata.clone())), + remote_log_downloader, + security_token_manager, log_fetch_buffer, nodes_with_pending_fetch_requests: Arc::new(Mutex::new(HashSet::new())), }) @@ -670,7 +683,6 @@ impl LogFetcher { let read_context = self.read_context.clone(); let remote_read_context = self.remote_read_context.clone(); let remote_log_downloader = Arc::clone(&self.remote_log_downloader); - let creds_cache = self.credentials_cache.clone(); let nodes_with_pending = self.nodes_with_pending_fetch_requests.clone(); let metadata = self.metadata.clone(); let response_context = FetchResponseContext { @@ -680,7 +692,6 @@ impl LogFetcher { read_context, remote_read_context, remote_log_downloader, - credentials_cache: creds_cache, }; // Spawn async task to handle the fetch request // Note: These tasks are not explicitly tracked or cancelled when LogFetcher is dropped. @@ -755,7 +766,6 @@ impl LogFetcher { read_context, remote_read_context, remote_log_downloader, - credentials_cache, } = context; for pb_fetch_log_resp in fetch_response.tables_resp { @@ -825,10 +835,7 @@ impl LogFetcher { // Check if this is a remote log fetch if let Some(ref remote_log_fetch_info) = fetch_log_for_bucket.remote_log_fetch_info { - // set remote fs props - let remote_fs_props = credentials_cache.get_or_refresh().await.unwrap(); - remote_log_downloader.set_remote_fs_props(remote_fs_props); - + // Remote fs props are already set by the background SecurityTokenManager let remote_fetch_info = RemoteLogFetchInfo::from_proto(remote_log_fetch_info, table_bucket.clone()); @@ -1649,7 +1656,6 @@ mod tests { read_context: fetcher.read_context.clone(), remote_read_context: fetcher.remote_read_context.clone(), remote_log_downloader: fetcher.remote_log_downloader.clone(), - credentials_cache: fetcher.credentials_cache.clone(), }; LogFetcher::handle_fetch_response(response, response_context).await; @@ -1703,7 +1709,6 @@ mod tests { read_context: fetcher.read_context.clone(), remote_read_context: fetcher.remote_read_context.clone(), remote_log_downloader: fetcher.remote_log_downloader.clone(), - credentials_cache: fetcher.credentials_cache.clone(), }; LogFetcher::handle_fetch_response(response, response_context).await; From 8bf34550144fea56df81116d48c7d19daf5afeb0 Mon Sep 17 00:00:00 2001 From: luoyuxia Date: Sun, 25 Jan 2026 21:43:55 +0800 Subject: [PATCH 2/4] address copilot comments --- crates/fluss/src/client/credentials.rs | 42 +++++++++++---------- crates/fluss/src/client/table/remote_log.rs | 26 +++++++++++-- 2 files changed, 45 insertions(+), 23 deletions(-) diff --git a/crates/fluss/src/client/credentials.rs b/crates/fluss/src/client/credentials.rs index 4d5f108a..442b9d63 100644 --- a/crates/fluss/src/client/credentials.rs +++ b/crates/fluss/src/client/credentials.rs @@ -31,9 +31,11 @@ use tokio::task::JoinHandle; /// Default renewal time ratio - refresh at 80% of token lifetime const DEFAULT_TOKEN_RENEWAL_RATIO: f64 = 0.8; /// Default retry backoff when token fetch fails -const DEFAULT_RENEWAL_RETRY_BACKOFF: Duration = Duration::from_secs(60); +const DEFAULT_RENEWAL_RETRY_BACKOFF: Duration = Duration::from_secs(30); /// Minimum delay between refreshes const MIN_RENEWAL_DELAY: Duration = Duration::from_secs(1); +/// Maximum delay between refreshes (24 hours) - prevents overflow and ensures periodic refresh +const MAX_RENEWAL_DELAY: Duration = Duration::from_secs(24 * 60 * 60); /// Default refresh interval for tokens without expiration (never expires) const DEFAULT_NON_EXPIRING_REFRESH_INTERVAL: Duration = Duration::from_secs(3600); // 1 hour @@ -196,9 +198,8 @@ impl SecurityTokenManager { if let Some(tx) = self.shutdown_tx.write().take() { let _ = tx.send(()); } - if let Some(handle) = self.task_handle.write().take() { - handle.abort(); - } + // Take and drop the task handle so the task can finish gracefully + let _ = self.task_handle.write().take(); info!("SecurityTokenManager stopped"); } @@ -221,7 +222,7 @@ impl SecurityTokenManager { Ok((props, expiration_time)) => { // Send credentials via watch channel (Some indicates fetched) if let Err(e) = credentials_tx.send(Some(props)) { - log::debug!("No active subscribers for credentials update: {:?}", e); + debug!("No active subscribers for credentials update: {:?}", e); } // Calculate next renewal delay based on expiration time @@ -229,7 +230,7 @@ impl SecurityTokenManager { Self::calculate_renewal_delay(exp_time, token_renewal_ratio) } else { // No expiration time - token never expires, use long refresh interval - log::info!( + info!( "Token has no expiration time (never expires), next refresh in {:?}", DEFAULT_NON_EXPIRING_REFRESH_INTERVAL ); @@ -237,16 +238,15 @@ impl SecurityTokenManager { } } Err(e) => { - log::warn!( + warn!( "Failed to obtain security token: {:?}, will retry in {:?}", - e, - renewal_retry_backoff + e, renewal_retry_backoff ); renewal_retry_backoff } }; - log::debug!("Next token refresh in {:?}", next_delay); + debug!("Next token refresh in {:?}", next_delay); // Wait for either the delay to elapse or shutdown signal tokio::select! { @@ -254,7 +254,7 @@ impl SecurityTokenManager { // Continue to next iteration to refresh } _ = &mut shutdown_rx => { - log::info!("Token refresh loop received shutdown signal"); + info!("Token refresh loop received shutdown signal"); break; } } @@ -282,7 +282,7 @@ impl SecurityTokenManager { // The token may be empty if remote filesystem doesn't require authentication if response.token.is_empty() { - log::info!("Empty token received, remote filesystem may not require authentication"); + info!("Empty token received, remote filesystem may not require authentication"); return Ok((HashMap::new(), response.expiration_time)); } @@ -297,13 +297,14 @@ impl SecurityTokenManager { } let props = build_remote_fs_props(&credentials, &addition_infos); - log::debug!("Security token fetched successfully"); + debug!("Security token fetched successfully"); Ok((props, response.expiration_time)) } /// Calculate the delay before next token renewal. /// Uses the renewal ratio to refresh before actual expiration. + /// Caps the delay to MAX_RENEWAL_DELAY to prevent overflow and ensure periodic refresh. fn calculate_renewal_delay(expiration_time: i64, renewal_ratio: f64) -> Duration { let now = SystemTime::now() .duration_since(UNIX_EPOCH) @@ -316,18 +317,19 @@ impl SecurityTokenManager { return MIN_RENEWAL_DELAY; } - let delay_ms = (time_until_expiry as f64 * renewal_ratio) as u64; + // Cap time_until_expiry to prevent overflow when casting to f64 and back + let max_delay_ms = MAX_RENEWAL_DELAY.as_millis() as i64; + let capped_time = time_until_expiry.min(max_delay_ms); + + let delay_ms = (capped_time as f64 * renewal_ratio) as u64; let delay = Duration::from_millis(delay_ms); - log::debug!( + debug!( "Calculated renewal delay: {:?} (expiration: {}, now: {}, ratio: {})", - delay, - expiration_time, - now, - renewal_ratio + delay, expiration_time, now, renewal_ratio ); - delay.max(MIN_RENEWAL_DELAY) + delay.clamp(MIN_RENEWAL_DELAY, MAX_RENEWAL_DELAY) } } diff --git a/crates/fluss/src/client/table/remote_log.rs b/crates/fluss/src/client/table/remote_log.rs index 0359c5a7..c0817fff 100644 --- a/crates/fluss/src/client/table/remote_log.rs +++ b/crates/fluss/src/client/table/remote_log.rs @@ -328,9 +328,29 @@ impl RemoteLogFetcher for ProductionFetcher { None => { // Credentials not yet fetched, wait for first update log::info!("Waiting for credentials to be available..."); - let _ = credentials_rx.changed().await; - // After change, unwrap or use empty (should be Some now) - credentials_rx.borrow().clone().unwrap_or_default() + // If the sender side has been dropped (e.g. during shutdown), + // this will return an error. Surface that as a proper error + // instead of silently falling back to empty credentials. + if let Err(e) = credentials_rx.changed().await { + let io_err = io::Error::new( + io::ErrorKind::BrokenPipe, + format!( + "credentials manager shut down before credentials were obtained: {e}" + ), + ); + return Err(io_err.into()); + } + // After a successful change notification, credentials should be set. + // If they are still missing, treat this as an error instead of + // defaulting to an empty map (which could break auth flows). + credentials_rx + .borrow() + .clone() + .ok_or_else(|| Error::UnexpectedError { + message: "credentials not available after watch notification" + .to_string(), + source: None, + })? } } }; From 80b4168d864189f75935a81e7aaf45d1007ba4cb Mon Sep 17 00:00:00 2001 From: luoyuxia Date: Sun, 25 Jan 2026 21:45:00 +0800 Subject: [PATCH 3/4] address copilot comments --- bindings/python/src/table.rs | 20 ++++---- crates/fluss/src/client/credentials.rs | 18 +++---- .../src/client/table/log_fetch_buffer.rs | 2 +- crates/fluss/src/client/table/remote_log.rs | 13 +++-- crates/fluss/src/client/table/upsert.rs | 6 +-- crates/fluss/src/metadata/json_serde.rs | 12 ++--- crates/fluss/src/metadata/partition.rs | 6 +-- crates/fluss/src/metadata/table.rs | 3 +- crates/fluss/src/record/arrow.rs | 32 ++++--------- crates/fluss/src/row/column.rs | 5 +- .../src/row/compacted/compacted_row_reader.rs | 5 +- crates/fluss/src/row/datum.rs | 48 +++++++------------ crates/fluss/src/row/decimal.rs | 15 ++---- 13 files changed, 64 insertions(+), 121 deletions(-) diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs index b56a29db..0ae71864 100644 --- a/bindings/python/src/table.rs +++ b/bindings/python/src/table.rs @@ -259,13 +259,13 @@ impl AppendWriter { // Get the expected Arrow schema from the Fluss table let row_type = self.table_info.get_row_type(); let expected_schema = fcore::record::to_arrow_schema(row_type) - .map_err(|e| FlussError::new_err(format!("Failed to get table schema: {}", e)))?; + .map_err(|e| FlussError::new_err(format!("Failed to get table schema: {e}")))?; // Convert Arrow schema to PyArrow schema let py_schema = expected_schema .as_ref() .to_pyarrow(py) - .map_err(|e| FlussError::new_err(format!("Failed to convert schema: {}", e)))?; + .map_err(|e| FlussError::new_err(format!("Failed to convert schema: {e}")))?; // Import pyarrow module let pyarrow = py.import("pyarrow")?; @@ -570,13 +570,12 @@ fn python_decimal_to_datum( let decimal_str: String = value.str()?.extract()?; let bd = bigdecimal::BigDecimal::from_str(&decimal_str).map_err(|e| { - FlussError::new_err(format!("Failed to parse decimal '{}': {}", decimal_str, e)) + FlussError::new_err(format!("Failed to parse decimal '{decimal_str}': {e}")) })?; let decimal = fcore::row::Decimal::from_big_decimal(bd, precision, scale).map_err(|e| { FlussError::new_err(format!( - "Failed to convert decimal '{}' to DECIMAL({}, {}): {}", - decimal_str, precision, scale, e + "Failed to convert decimal '{decimal_str}' to DECIMAL({precision}, {scale}): {e}" )) })?; @@ -641,10 +640,9 @@ fn python_time_to_datum(value: &Bound) -> PyResult) -> PyResult) -> PyResult { // Send credentials via watch channel (Some indicates fetched) if let Err(e) = credentials_tx.send(Some(props)) { - debug!("No active subscribers for credentials update: {:?}", e); + debug!("No active subscribers for credentials update: {e:?}"); } // Calculate next renewal delay based on expiration time @@ -231,22 +231,20 @@ impl SecurityTokenManager { } else { // No expiration time - token never expires, use long refresh interval info!( - "Token has no expiration time (never expires), next refresh in {:?}", - DEFAULT_NON_EXPIRING_REFRESH_INTERVAL + "Token has no expiration time (never expires), next refresh in {DEFAULT_NON_EXPIRING_REFRESH_INTERVAL:?}" ); DEFAULT_NON_EXPIRING_REFRESH_INTERVAL } } Err(e) => { warn!( - "Failed to obtain security token: {:?}, will retry in {:?}", - e, renewal_retry_backoff + "Failed to obtain security token: {e:?}, will retry in {renewal_retry_backoff:?}" ); renewal_retry_backoff } }; - debug!("Next token refresh in {:?}", next_delay); + debug!("Next token refresh in {next_delay:?}"); // Wait for either the delay to elapse or shutdown signal tokio::select! { @@ -325,8 +323,7 @@ impl SecurityTokenManager { let delay = Duration::from_millis(delay_ms); debug!( - "Calculated renewal delay: {:?} (expiration: {}, now: {}, ratio: {})", - delay, expiration_time, now, renewal_ratio + "Calculated renewal delay: {delay:?} (expiration: {expiration_time}, now: {now}, ratio: {renewal_ratio})" ); delay.clamp(MIN_RENEWAL_DELAY, MAX_RENEWAL_DELAY) @@ -373,10 +370,7 @@ mod tests { let expected_max = Duration::from_secs(2900); // ~48.3 minutes assert!( delay >= expected_min && delay <= expected_max, - "Expected delay between {:?} and {:?}, got {:?}", - expected_min, - expected_max, - delay + "Expected delay between {expected_min:?} and {expected_max:?}, got {delay:?}" ); } diff --git a/crates/fluss/src/client/table/log_fetch_buffer.rs b/crates/fluss/src/client/table/log_fetch_buffer.rs index b529806f..2fd32948 100644 --- a/crates/fluss/src/client/table/log_fetch_buffer.rs +++ b/crates/fluss/src/client/table/log_fetch_buffer.rs @@ -800,7 +800,7 @@ impl PendingFetch for RemotePendingFetch { let pos = self.pos_in_log_segment as usize; if pos >= file_size { return Err(Error::UnexpectedError { - message: format!("Position {} exceeds file size {}", pos, file_size), + message: format!("Position {pos} exceeds file size {file_size}"), source: None, }); } diff --git a/crates/fluss/src/client/table/remote_log.rs b/crates/fluss/src/client/table/remote_log.rs index c0817fff..b4553758 100644 --- a/crates/fluss/src/client/table/remote_log.rs +++ b/crates/fluss/src/client/table/remote_log.rs @@ -527,8 +527,7 @@ async fn spawn_download_task( DownloadResult::FailedPermanently { error: Error::UnexpectedError { message: format!( - "Failed to download remote log segment after {} retries: {}", - retry_count, e + "Failed to download remote log segment after {retry_count} retries: {e}" ), source: Some(Box::new(e)), }, @@ -621,7 +620,7 @@ async fn coordinator_loop( // Cancelled - permit already released, nothing to do } Err(e) => { - log::error!("Download task panicked: {:?}", e); + log::error!("Download task panicked: {e:?}"); // Permit already released via RAII } } @@ -1025,7 +1024,7 @@ mod tests { if should_fail { Err(Error::UnexpectedError { - message: format!("Fake fetch failed for {}", segment_id), + message: format!("Fake fetch failed for {segment_id}"), source: None, }) } else { @@ -1036,7 +1035,7 @@ mod tests { .unwrap() .as_nanos(); let file_path = - temp_dir.join(format!("fake_segment_{}_{}.log", segment_id, timestamp)); + temp_dir.join(format!("fake_segment_{segment_id}_{timestamp}.log")); tokio::fs::write(&file_path, &fake_data).await?; Ok(FetchResult { @@ -1145,7 +1144,7 @@ mod tests { // Request 4 segments with same priority (to isolate concurrency limiting from priority) let segs: Vec<_> = (0..4) - .map(|i| create_segment(&format!("seg{}", i), i * 100, 1000, bucket.clone())) + .map(|i| create_segment(&format!("seg{i}"), i * 100, 1000, bucket.clone())) .collect(); let _futures: Vec<_> = segs @@ -1192,7 +1191,7 @@ mod tests { // Request 4 downloads let segs: Vec<_> = (0..4) - .map(|i| create_segment(&format!("seg{}", i), i * 100, 1000, bucket.clone())) + .map(|i| create_segment(&format!("seg{i}"), i * 100, 1000, bucket.clone())) .collect(); let mut futures: Vec<_> = segs diff --git a/crates/fluss/src/client/table/upsert.rs b/crates/fluss/src/client/table/upsert.rs index a3909e72..984592d0 100644 --- a/crates/fluss/src/client/table/upsert.rs +++ b/crates/fluss/src/client/table/upsert.rs @@ -232,8 +232,7 @@ impl UpsertWriterFactory { None => { return Err(IllegalArgument { message: format!( - "The specified primary key {} is not in row type {}", - primary_key, row_type + "The specified primary key {primary_key} is not in row type {row_type}" ), }); } @@ -250,8 +249,7 @@ impl UpsertWriterFactory { if target_column_set[index] { return Err(IllegalArgument { message: format!( - "Explicitly specifying values for the auto increment column {} is not allowed.", - auto_increment_col_name + "Explicitly specifying values for the auto increment column {auto_increment_col_name} is not allowed." ), }); } diff --git a/crates/fluss/src/metadata/json_serde.rs b/crates/fluss/src/metadata/json_serde.rs index faa5583b..d0d56ef2 100644 --- a/crates/fluss/src/metadata/json_serde.rs +++ b/crates/fluss/src/metadata/json_serde.rs @@ -205,7 +205,7 @@ impl JsonSerde for DataType { DataType::Decimal( crate::metadata::datatype::DecimalType::with_nullable(true, precision, scale) .map_err(|e| Error::JsonSerdeError { - message: format!("Invalid DECIMAL parameters: {}", e), + message: format!("Invalid DECIMAL parameters: {e}"), })?, ) } @@ -218,7 +218,7 @@ impl JsonSerde for DataType { DataType::Time( crate::metadata::datatype::TimeType::with_nullable(true, precision).map_err( |e| Error::JsonSerdeError { - message: format!("Invalid TIME_WITHOUT_TIME_ZONE precision: {}", e), + message: format!("Invalid TIME_WITHOUT_TIME_ZONE precision: {e}"), }, )?, ) @@ -231,10 +231,7 @@ impl JsonSerde for DataType { DataType::Timestamp( crate::metadata::datatype::TimestampType::with_nullable(true, precision) .map_err(|e| Error::JsonSerdeError { - message: format!( - "Invalid TIMESTAMP_WITHOUT_TIME_ZONE precision: {}", - e - ), + message: format!("Invalid TIMESTAMP_WITHOUT_TIME_ZONE precision: {e}"), })?, ) } @@ -247,8 +244,7 @@ impl JsonSerde for DataType { crate::metadata::datatype::TimestampLTzType::with_nullable(true, precision) .map_err(|e| Error::JsonSerdeError { message: format!( - "Invalid TIMESTAMP_WITH_LOCAL_TIME_ZONE precision: {}", - e + "Invalid TIMESTAMP_WITH_LOCAL_TIME_ZONE precision: {e}" ), })?, ) diff --git a/crates/fluss/src/metadata/partition.rs b/crates/fluss/src/metadata/partition.rs index 1ecc0dcd..e40fbf9e 100644 --- a/crates/fluss/src/metadata/partition.rs +++ b/crates/fluss/src/metadata/partition.rs @@ -131,8 +131,7 @@ impl ResolvedPartitionSpec { if parts.len() != 2 { return Err(Error::IllegalArgument { message: format!( - "Invalid partition name format. Expected key=value, got: {}", - pair + "Invalid partition name format. Expected key=value, got: {pair}" ), }); } @@ -199,8 +198,7 @@ impl ResolvedPartitionSpec { None => { return Err(Error::IllegalArgument { message: format!( - "table does not contain partitionKey: {}", - other_partition_key + "table does not contain partitionKey: {other_partition_key}" ), }); } diff --git a/crates/fluss/src/metadata/table.rs b/crates/fluss/src/metadata/table.rs index c4a91954..3b9da7d9 100644 --- a/crates/fluss/src/metadata/table.rs +++ b/crates/fluss/src/metadata/table.rs @@ -227,8 +227,7 @@ impl SchemaBuilder { if !column_names.contains(auto_inc_col) { return Err(IllegalArgument { message: format!( - "Auto increment column '{}' is not found in the schema columns.", - auto_inc_col + "Auto increment column '{auto_inc_col}' is not found in the schema columns." ), }); } diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs index 4bfdc71c..b2bc73b0 100644 --- a/crates/fluss/src/record/arrow.rs +++ b/crates/fluss/src/record/arrow.rs @@ -107,7 +107,7 @@ fn validate_batch_size(batch_size_bytes: i32) -> Result { // Check for negative size (corrupted data) if batch_size_bytes < 0 { return Err(Error::UnexpectedError { - message: format!("Invalid negative batch size: {}", batch_size_bytes), + message: format!("Invalid negative batch size: {batch_size_bytes}"), source: None, }); } @@ -120,8 +120,7 @@ fn validate_batch_size(batch_size_bytes: i32) -> Result { .checked_add(LOG_OVERHEAD) .ok_or_else(|| Error::UnexpectedError { message: format!( - "Batch size {} + LOG_OVERHEAD {} would overflow", - batch_size_u, LOG_OVERHEAD + "Batch size {batch_size_u} + LOG_OVERHEAD {LOG_OVERHEAD} would overflow" ), source: None, })?; @@ -130,8 +129,7 @@ fn validate_batch_size(batch_size_bytes: i32) -> Result { if total_size > MAX_BATCH_SIZE { return Err(Error::UnexpectedError { message: format!( - "Batch size {} exceeds maximum allowed size {}", - total_size, MAX_BATCH_SIZE + "Batch size {total_size} exceeds maximum allowed size {MAX_BATCH_SIZE}" ), source: None, }); @@ -259,8 +257,7 @@ impl RowAppendRecordBatchBuilder { .with_precision_and_scale(*precision, *scale) .map_err(|e| Error::IllegalArgument { message: format!( - "Invalid decimal precision {} or scale {}: {}", - precision, scale, e + "Invalid decimal precision {precision} or scale {scale}: {e}" ), })?; Ok(Box::new(builder)) @@ -273,8 +270,7 @@ impl RowAppendRecordBatchBuilder { } _ => Err(Error::IllegalArgument { message: format!( - "Time32 only supports Second and Millisecond units, got: {:?}", - unit + "Time32 only supports Second and Millisecond units, got: {unit:?}" ), }), }, @@ -285,8 +281,7 @@ impl RowAppendRecordBatchBuilder { arrow_schema::TimeUnit::Nanosecond => Ok(Box::new(Time64NanosecondBuilder::new())), _ => Err(Error::IllegalArgument { message: format!( - "Time64 only supports Microsecond and Nanosecond units, got: {:?}", - unit + "Time64 only supports Microsecond and Nanosecond units, got: {unit:?}" ), }), }, @@ -592,10 +587,7 @@ impl FileSource { // Validate base_offset to prevent underflow in total_size() if base_offset > file_size { return Err(Error::UnexpectedError { - message: format!( - "base_offset ({}) exceeds file_size ({})", - base_offset, file_size - ), + message: format!("base_offset ({base_offset}) exceeds file_size ({file_size})"), source: None, }); } @@ -1044,7 +1036,7 @@ pub fn to_arrow_type(fluss_type: &DataType) -> Result { 7..=9 => ArrowDataType::Time64(arrow_schema::TimeUnit::Nanosecond), invalid => { return Err(Error::IllegalArgument { - message: format!("Invalid precision {} for TimeType (must be 0-9)", invalid), + message: format!("Invalid precision {invalid} for TimeType (must be 0-9)"), }); } }, @@ -1055,10 +1047,7 @@ pub fn to_arrow_type(fluss_type: &DataType) -> Result { 7..=9 => ArrowDataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, None), invalid => { return Err(Error::IllegalArgument { - message: format!( - "Invalid precision {} for TimestampType (must be 0-9)", - invalid - ), + message: format!("Invalid precision {invalid} for TimestampType (must be 0-9)"), }); } }, @@ -1070,8 +1059,7 @@ pub fn to_arrow_type(fluss_type: &DataType) -> Result { invalid => { return Err(Error::IllegalArgument { message: format!( - "Invalid precision {} for TimestampLTzType (must be 0-9)", - invalid + "Invalid precision {invalid} for TimestampLTzType (must be 0-9)" ), }); } diff --git a/crates/fluss/src/row/column.rs b/crates/fluss/src/row/column.rs index 615e0384..46c25b24 100644 --- a/crates/fluss/src/row/column.rs +++ b/crates/fluss/src/row/column.rs @@ -199,10 +199,7 @@ impl InternalRow for ColumnarRow { let field = schema.field(pos); let arrow_scale = match field.data_type() { DataType::Decimal128(_p, s) => *s as i64, - dt => panic!( - "Expected Decimal128 data type at column {}, found: {:?}", - pos, dt - ), + dt => panic!("Expected Decimal128 data type at column {pos}, found: {dt:?}"), }; let i128_val = array.value(self.row_id); diff --git a/crates/fluss/src/row/compacted/compacted_row_reader.rs b/crates/fluss/src/row/compacted/compacted_row_reader.rs index 40470db1..8fe683bc 100644 --- a/crates/fluss/src/row/compacted/compacted_row_reader.rs +++ b/crates/fluss/src/row/compacted/compacted_row_reader.rs @@ -161,10 +161,7 @@ impl<'a> CompactedRowDeserializer<'a> { } } _ => { - panic!( - "Unsupported DataType in CompactedRowDeserializer: {:?}", - dtype - ); + panic!("Unsupported DataType in CompactedRowDeserializer: {dtype:?}"); } }; cursor = next_cursor; diff --git a/crates/fluss/src/row/datum.rs b/crates/fluss/src/row/datum.rs index 7b3850f8..b8083730 100644 --- a/crates/fluss/src/row/datum.rs +++ b/crates/fluss/src/row/datum.rs @@ -407,8 +407,7 @@ fn millis_nanos_to_micros(millis: i64, nanos: i32) -> Result { .checked_mul(MICROS_PER_MILLI) .ok_or_else(|| RowConvertError { message: format!( - "Timestamp milliseconds {} overflows when converting to microseconds", - millis + "Timestamp milliseconds {millis} overflows when converting to microseconds" ), })?; let nanos_micros = (nanos as i64) / MICROS_PER_MILLI; @@ -416,8 +415,7 @@ fn millis_nanos_to_micros(millis: i64, nanos: i32) -> Result { .checked_add(nanos_micros) .ok_or_else(|| RowConvertError { message: format!( - "Timestamp overflow when adding microseconds: {} + {}", - millis_micros, nanos_micros + "Timestamp overflow when adding microseconds: {millis_micros} + {nanos_micros}" ), }) } @@ -429,16 +427,14 @@ fn millis_nanos_to_nanos(millis: i64, nanos: i32) -> Result { .checked_mul(NANOS_PER_MILLI) .ok_or_else(|| RowConvertError { message: format!( - "Timestamp milliseconds {} overflows when converting to nanoseconds", - millis + "Timestamp milliseconds {millis} overflows when converting to nanoseconds" ), })?; millis_nanos .checked_add(nanos as i64) .ok_or_else(|| RowConvertError { message: format!( - "Timestamp overflow when adding nanoseconds: {} + {}", - millis_nanos, nanos + "Timestamp overflow when adding nanoseconds: {millis_nanos} + {nanos}" ), }) } @@ -504,10 +500,7 @@ impl Datum<'_> { arrow_schema::DataType::Decimal128(p, s) => (*p, *s), _ => { return Err(RowConvertError { - message: format!( - "Expected Decimal128 Arrow type, got: {:?}", - data_type - ), + message: format!("Expected Decimal128 Arrow type, got: {data_type:?}"), }); } }; @@ -515,7 +508,7 @@ impl Datum<'_> { // Validate scale is non-negative (Fluss doesn't support negative scales) if s < 0 { return Err(RowConvertError { - message: format!("Negative decimal scale {} is not supported", s), + message: format!("Negative decimal scale {s} is not supported"), }); } @@ -535,8 +528,7 @@ impl Datum<'_> { if actual_precision > target_precision as usize { return Err(RowConvertError { message: format!( - "Decimal precision overflow: value has {} digits but Arrow expects {} (value: {})", - actual_precision, target_precision, rescaled + "Decimal precision overflow: value has {actual_precision} digits but Arrow expects {target_precision} (value: {rescaled})" ), }); } @@ -546,7 +538,7 @@ impl Datum<'_> { Ok(v) => v, Err(_) => { return Err(RowConvertError { - message: format!("Decimal value exceeds i128 range: {}", rescaled), + message: format!("Decimal value exceeds i128 range: {rescaled}"), }); } }; @@ -575,8 +567,7 @@ impl Datum<'_> { if millis % MILLIS_PER_SECOND as i32 != 0 { return Err(RowConvertError { message: format!( - "Time value {} ms has sub-second precision but schema expects seconds only", - millis + "Time value {millis} ms has sub-second precision but schema expects seconds only" ), }); } @@ -602,8 +593,7 @@ impl Datum<'_> { .checked_mul(MICROS_PER_MILLI) .ok_or_else(|| RowConvertError { message: format!( - "Time value {} ms overflows when converting to microseconds", - millis + "Time value {millis} ms overflows when converting to microseconds" ), })?; b.append_value(micros); @@ -618,8 +608,7 @@ impl Datum<'_> { let nanos = (millis as i64).checked_mul(NANOS_PER_MILLI).ok_or_else( || RowConvertError { message: format!( - "Time value {} ms overflows when converting to nanoseconds", - millis + "Time value {millis} ms overflows when converting to nanoseconds" ), }, )?; @@ -630,8 +619,7 @@ impl Datum<'_> { _ => { return Err(RowConvertError { message: format!( - "Expected Time32/Time64 Arrow type, got: {:?}", - data_type + "Expected Time32/Time64 Arrow type, got: {data_type:?}" ), }); } @@ -808,8 +796,7 @@ impl TimestampNtz { if !(0..=MAX_NANO_OF_MILLISECOND).contains(&nano_of_millisecond) { return Err(crate::error::Error::IllegalArgument { message: format!( - "nanoOfMillisecond must be in range [0, {}], got: {}", - MAX_NANO_OF_MILLISECOND, nano_of_millisecond + "nanoOfMillisecond must be in range [0, {MAX_NANO_OF_MILLISECOND}], got: {nano_of_millisecond}" ), }); } @@ -856,8 +843,7 @@ impl TimestampLtz { if !(0..=MAX_NANO_OF_MILLISECOND).contains(&nano_of_millisecond) { return Err(crate::error::Error::IllegalArgument { message: format!( - "nanoOfMillisecond must be in range [0, {}], got: {}", - MAX_NANO_OF_MILLISECOND, nano_of_millisecond + "nanoOfMillisecond must be in range [0, {MAX_NANO_OF_MILLISECOND}], got: {nano_of_millisecond}" ), }); } @@ -1030,10 +1016,8 @@ mod timestamp_tests { #[test] fn test_timestamp_nanos_out_of_range() { // Test that both TimestampNtz and TimestampLtz reject invalid nanos - let expected_msg = format!( - "nanoOfMillisecond must be in range [0, {}]", - MAX_NANO_OF_MILLISECOND - ); + let expected_msg = + format!("nanoOfMillisecond must be in range [0, {MAX_NANO_OF_MILLISECOND}]"); // Too large (1,000,000 is just beyond the valid range) let result_ntz = TimestampNtz::from_millis_nanos(1000, MAX_NANO_OF_MILLISECOND + 1); diff --git a/crates/fluss/src/row/decimal.rs b/crates/fluss/src/row/decimal.rs index b14bde50..fd21b829 100644 --- a/crates/fluss/src/row/decimal.rs +++ b/crates/fluss/src/row/decimal.rs @@ -129,16 +129,14 @@ impl Decimal { // Sanity check that scale matches debug_assert_eq!( exp, scale as i64, - "Scaled decimal exponent ({}) != expected scale ({})", - exp, scale + "Scaled decimal exponent ({exp}) != expected scale ({scale})" ); let actual_precision = Self::compute_precision(&unscaled); if actual_precision > precision as usize { return Err(Error::IllegalArgument { message: format!( - "Decimal precision overflow: value has {} digits but precision is {} (value: {})", - actual_precision, precision, scaled + "Decimal precision overflow: value has {actual_precision} digits but precision is {precision} (value: {scaled})" ), }); } @@ -147,8 +145,7 @@ impl Decimal { let long_val = if precision <= MAX_COMPACT_PRECISION { Some(i64::try_from(&unscaled).map_err(|_| Error::IllegalArgument { message: format!( - "Decimal mantissa exceeds i64 range for compact precision {}: unscaled={} (value={})", - precision, unscaled, scaled + "Decimal mantissa exceeds i64 range for compact precision {precision}: unscaled={unscaled} (value={scaled})" ), })?) } else { @@ -168,8 +165,7 @@ impl Decimal { if precision > MAX_COMPACT_PRECISION { return Err(Error::IllegalArgument { message: format!( - "Precision {} exceeds MAX_COMPACT_PRECISION ({})", - precision, MAX_COMPACT_PRECISION + "Precision {precision} exceeds MAX_COMPACT_PRECISION ({MAX_COMPACT_PRECISION})" ), }); } @@ -178,8 +174,7 @@ impl Decimal { if actual_precision > precision as usize { return Err(Error::IllegalArgument { message: format!( - "Decimal precision overflow: unscaled value has {} digits but precision is {}", - actual_precision, precision + "Decimal precision overflow: unscaled value has {actual_precision} digits but precision is {precision}" ), }); } From 4c0e3da37d38dc1d399c5444942796fd18078393 Mon Sep 17 00:00:00 2001 From: luoyuxia Date: Mon, 26 Jan 2026 19:06:49 +0800 Subject: [PATCH 4/4] minor fix --- crates/fluss/src/client/credentials.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/fluss/src/client/credentials.rs b/crates/fluss/src/client/credentials.rs index 30e44cbf..93a53669 100644 --- a/crates/fluss/src/client/credentials.rs +++ b/crates/fluss/src/client/credentials.rs @@ -34,10 +34,10 @@ const DEFAULT_TOKEN_RENEWAL_RATIO: f64 = 0.8; const DEFAULT_RENEWAL_RETRY_BACKOFF: Duration = Duration::from_secs(30); /// Minimum delay between refreshes const MIN_RENEWAL_DELAY: Duration = Duration::from_secs(1); -/// Maximum delay between refreshes (24 hours) - prevents overflow and ensures periodic refresh -const MAX_RENEWAL_DELAY: Duration = Duration::from_secs(24 * 60 * 60); +/// Maximum delay between refreshes (7 days) - prevents overflow and ensures periodic refresh +const MAX_RENEWAL_DELAY: Duration = Duration::from_secs(7 * 24 * 60 * 60); /// Default refresh interval for tokens without expiration (never expires) -const DEFAULT_NON_EXPIRING_REFRESH_INTERVAL: Duration = Duration::from_secs(3600); // 1 hour +const DEFAULT_NON_EXPIRING_REFRESH_INTERVAL: Duration = Duration::from_secs(7 * 24 * 60 * 60); // 7 day /// Type alias for credentials properties receiver /// - `None` = not yet fetched, should wait