From 40acf7cb411f878bc76b7dc415085ab3be55e964 Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Sat, 24 Jan 2026 19:05:47 +0000 Subject: [PATCH 1/9] Partition getter and utils --- .../src/client/table/partition_getter.rs | 172 ++++- crates/fluss/src/error.rs | 6 + crates/fluss/src/metadata/table.rs | 110 +++ crates/fluss/src/util/mod.rs | 1 + crates/fluss/src/util/partition.rs | 643 ++++++++++++++++++ 5 files changed, 919 insertions(+), 13 deletions(-) create mode 100644 crates/fluss/src/util/partition.rs diff --git a/crates/fluss/src/client/table/partition_getter.rs b/crates/fluss/src/client/table/partition_getter.rs index 4529d868..55b1ec7c 100644 --- a/crates/fluss/src/client/table/partition_getter.rs +++ b/crates/fluss/src/client/table/partition_getter.rs @@ -17,40 +17,186 @@ use crate::error::Error::IllegalArgument; use crate::error::Result; -use crate::metadata::{DataType, RowType}; +use crate::metadata::{DataType, ResolvedPartitionSpec, RowType}; +use crate::row::InternalRow; use crate::row::field_getter::FieldGetter; +use crate::util::partition; +/// A getter to get partition name from a row. #[allow(dead_code)] -pub struct PartitionGetter<'a> { - partitions: Vec<(&'a String, &'a DataType, FieldGetter)>, +pub struct PartitionGetter { + partition_keys: Vec, + partitions: Vec<(DataType, FieldGetter)>, } #[allow(dead_code)] -impl<'a> PartitionGetter<'a> { - pub fn new(row_type: &'a RowType, partition_keys: &'a Vec) -> Result { +impl PartitionGetter { + pub fn new(row_type: &RowType, partition_keys: Vec) -> Result { let mut partitions = Vec::with_capacity(partition_keys.len()); - for partition_key in partition_keys { + for partition_key in &partition_keys { if let Some(partition_col_index) = row_type.get_field_index(partition_key.as_str()) { - let data_type = &row_type + let data_type = row_type .fields() .get(partition_col_index) .unwrap() - .data_type; - let field_getter = FieldGetter::create(data_type, partition_col_index); + .data_type + .clone(); + let field_getter = FieldGetter::create(&data_type, partition_col_index); - partitions.push((partition_key, data_type, field_getter)); + partitions.push((data_type, field_getter)); } else { return Err(IllegalArgument { message: format!( - "The partition column {partition_key} is not in the row {row_type}." + "The partition column {} is not in the row {}.", + partition_key, row_type ), }); }; } - Ok(Self { partitions }) + Ok(Self { + partition_keys, + partitions, + }) } - // TODO Implement get partition + pub fn get_partition(&self, row: &dyn InternalRow) -> Result { + self.get_partition_spec(row) + .map(|ps| ps.get_partition_name()) + } + + pub fn get_partition_spec(&self, row: &dyn InternalRow) -> Result { + let mut partition_values = Vec::with_capacity(self.partitions.len()); + + for (data_type, field_getter) in &self.partitions { + let value = field_getter.get_field(row); + if value.is_null() { + return Err(IllegalArgument { + message: "Partition value shouldn't be null.".to_string(), + }); + } + partition_values.push(partition::convert_value_to_string(&value, data_type)); + } + + ResolvedPartitionSpec::new(self.partition_keys.clone(), partition_values) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::metadata::{DataField, IntType, StringType}; + use crate::row::{Datum, GenericRow}; + + #[test] + fn test_partition_getter_single_key() { + let row_type = RowType::new(vec![ + DataField::new("id".to_string(), DataType::Int(IntType::new()), None), + DataField::new( + "region".to_string(), + DataType::String(StringType::new()), + None, + ), + ]); + + let getter = + PartitionGetter::new(&row_type, vec!["region".to_string()]).expect("should succeed"); + + let row = GenericRow::from_data(vec![Datum::Int32(42), Datum::from("US")]); + let partition_name = getter.get_partition(&row).expect("should succeed"); + assert_eq!(partition_name, "US"); + } + + #[test] + fn test_partition_getter_multiple_keys() { + let row_type = RowType::new(vec![ + DataField::new("id".to_string(), DataType::Int(IntType::new()), None), + DataField::new( + "date".to_string(), + DataType::String(StringType::new()), + None, + ), + DataField::new( + "region".to_string(), + DataType::String(StringType::new()), + None, + ), + ]); + + let getter = + PartitionGetter::new(&row_type, vec!["date".to_string(), "region".to_string()]) + .expect("should succeed"); + + let row = GenericRow::from_data(vec![ + Datum::Int32(42), + Datum::from("2024-01-15"), + Datum::from("US"), + ]); + let partition_name = getter.get_partition(&row).expect("should succeed"); + assert_eq!(partition_name, "2024-01-15$US"); + } + + #[test] + fn test_partition_getter_invalid_column() { + let row_type = RowType::new(vec![DataField::new( + "id".to_string(), + DataType::Int(IntType::new()), + None, + )]); + + let result = PartitionGetter::new(&row_type, vec!["nonexistent".to_string()]); + assert!(result.is_err()); + } + + #[test] + fn test_partition_getter_null_value() { + let row_type = RowType::new(vec![ + DataField::new("id".to_string(), DataType::Int(IntType::new()), None), + DataField::new( + "region".to_string(), + DataType::String(StringType::new()), + None, + ), + ]); + + let getter = + PartitionGetter::new(&row_type, vec!["region".to_string()]).expect("should succeed"); + + let row = GenericRow::from_data(vec![Datum::Int32(42), Datum::Null]); + let result = getter.get_partition(&row); + assert!(result.is_err()); + } + + #[test] + fn test_get_partition_spec() { + let row_type = RowType::new(vec![ + DataField::new("id".to_string(), DataType::Int(IntType::new()), None), + DataField::new( + "date".to_string(), + DataType::String(StringType::new()), + None, + ), + DataField::new( + "region".to_string(), + DataType::String(StringType::new()), + None, + ), + ]); + + let getter = + PartitionGetter::new(&row_type, vec!["date".to_string(), "region".to_string()]) + .expect("should succeed"); + + let row = GenericRow::from_data(vec![ + Datum::Int32(42), + Datum::from("2024-01-15"), + Datum::from("US"), + ]); + let spec = getter.get_partition_spec(&row).expect("should succeed"); + + assert_eq!(spec.get_partition_keys(), &["date", "region"]); + assert_eq!(spec.get_partition_values(), &["2024-01-15", "US"]); + assert_eq!(spec.get_partition_name(), "2024-01-15$US"); + } } diff --git a/crates/fluss/src/error.rs b/crates/fluss/src/error.rs index 368d8abc..68426d7c 100644 --- a/crates/fluss/src/error.rs +++ b/crates/fluss/src/error.rs @@ -93,6 +93,12 @@ pub enum Error { )] IllegalArgument { message: String }, + #[snafu( + visibility(pub(crate)), + display("Fluss hitting invalid partition error {}.", message) + )] + InvalidPartition { message: String }, + #[snafu( visibility(pub(crate)), display("Fluss hitting IO not supported error {}.", message) diff --git a/crates/fluss/src/metadata/table.rs b/crates/fluss/src/metadata/table.rs index c4a91954..9b43166c 100644 --- a/crates/fluss/src/metadata/table.rs +++ b/crates/fluss/src/metadata/table.rs @@ -679,6 +679,10 @@ impl Display for TablePath { } } +const MAX_NAME_LENGTH: usize = 200; + +const INTERNAL_NAME_PREFIX: &str = "__"; + impl TablePath { pub fn new(db: String, tbl: String) -> Self { TablePath { @@ -696,6 +700,52 @@ impl TablePath { pub fn table(&self) -> &str { &self.table } + + pub fn detect_invalid_name(identifier: &str) -> Option { + if identifier.is_empty() { + return Some("the empty string is not allowed".to_string()); + } + if identifier == "." { + return Some("'.' is not allowed".to_string()); + } + if identifier == ".." { + return Some("'..' is not allowed".to_string()); + } + if identifier.len() > MAX_NAME_LENGTH { + return Some(format!( + "the length of '{}' is longer than the max allowed length {}", + identifier, MAX_NAME_LENGTH + )); + } + if Self::contains_invalid_pattern(identifier) { + return Some(format!( + "'{}' contains one or more characters other than ASCII alphanumerics, '_' and '-'", + identifier + )); + } + None + } + + pub fn validate_prefix(identifier: &str) -> Option { + if identifier.starts_with(INTERNAL_NAME_PREFIX) { + return Some(format!( + "'{}' is not allowed as prefix, since it is reserved for internal databases/internal tables/internal partitions in Fluss server", + INTERNAL_NAME_PREFIX + )); + } + None + } + + // Valid characters for Fluss table names are the ASCII alphanumerics, '_' and '-'. + fn contains_invalid_pattern(identifier: &str) -> bool { + for c in identifier.chars() { + let valid_char = c.is_ascii_alphanumeric() || c == '_' || c == '-'; + if !valid_char { + return true; + } + } + false + } } /// A database name, table name and partition name combo. It's used to represent the physical path of @@ -1107,3 +1157,63 @@ impl LakeSnapshot { &self.table_buckets_offset } } + +/// Tests for [`TablePath`]. +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_validate() { + // assert valid name + let path = TablePath::new("db_2-abc3".to_string(), "table-1_abc_2".to_string()); + assert!(TablePath::detect_invalid_name(path.database()).is_none()); + assert!(TablePath::detect_invalid_name(path.table()).is_none()); + assert_eq!(path.to_string(), "db_2-abc3.table-1_abc_2"); + + // assert invalid name prefix + assert!( + TablePath::validate_prefix("__table-1") + .unwrap() + .contains("'__' is not allowed as prefix") + ); + + // check max length + let long_name = "a".repeat(200); + assert!(TablePath::detect_invalid_name(&long_name).is_none()); + + // assert invalid names + assert_invalid_name("*abc", "'*abc' contains one or more characters other than"); + assert_invalid_name( + "table.abc", + "'table.abc' contains one or more characters other than", + ); + assert_invalid_name("", "the empty string is not allowed"); + assert_invalid_name(" ", "' ' contains one or more characters other than"); + assert_invalid_name(".", "'.' is not allowed"); + assert_invalid_name("..", "'..' is not allowed"); + let invalid_long_name = "a".repeat(201); + assert_invalid_name( + &invalid_long_name, + &format!( + "the length of '{}' is longer than the max allowed length {}", + invalid_long_name, MAX_NAME_LENGTH + ), + ); + } + + fn assert_invalid_name(name: &str, expected_message: &str) { + let result = TablePath::detect_invalid_name(name); + assert!( + result.is_some(), + "Expected '{}' to be invalid, but it was valid", + name + ); + assert!( + result.as_ref().unwrap().contains(expected_message), + "Expected message containing '{}', but got '{}'", + expected_message, + result.unwrap() + ); + } +} diff --git a/crates/fluss/src/util/mod.rs b/crates/fluss/src/util/mod.rs index 3760487d..b987fe25 100644 --- a/crates/fluss/src/util/mod.rs +++ b/crates/fluss/src/util/mod.rs @@ -16,6 +16,7 @@ // under the License. pub mod murmur_hash; +pub mod partition; pub mod varint; use crate::TableId; diff --git a/crates/fluss/src/util/partition.rs b/crates/fluss/src/util/partition.rs new file mode 100644 index 00000000..fcf4f9fd --- /dev/null +++ b/crates/fluss/src/util/partition.rs @@ -0,0 +1,643 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Utils for partition. + +#![allow(dead_code)] + +use crate::error::{Error, Result}; +use crate::metadata::{DataType, PartitionSpec, ResolvedPartitionSpec, TablePath}; +use crate::row::{Date, Datum, Time, TimestampLtz, TimestampNtz}; +use jiff::ToSpan; +use jiff::Zoned; +use jiff::civil::DateTime; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum AutoPartitionTimeUnit { + Year, + Quarter, + Month, + Day, + Hour, +} + +pub fn validate_partition_spec( + table_path: &TablePath, + partition_keys: &[String], + partition_spec: &PartitionSpec, + is_create: bool, +) -> Result<()> { + let partition_spec_map = partition_spec.get_spec_map(); + if partition_keys.len() != partition_spec_map.len() { + return Err(Error::InvalidPartition { + message: format!( + "PartitionSpec size is not equal to partition keys size for partitioned table {}.", + table_path + ), + }); + } + + let mut reordered_partition_values: Vec<&str> = Vec::with_capacity(partition_keys.len()); + for partition_key in partition_keys { + if let Some(value) = partition_spec_map.get(partition_key) { + reordered_partition_values.push(value); + } else { + return Err(Error::InvalidPartition { + message: format!( + "PartitionSpec {} does not contain partition key '{}' for partitioned table {}.", + partition_spec, partition_key, table_path + ), + }); + } + } + + validate_partition_values(&reordered_partition_values, is_create) +} + +fn validate_partition_values(partition_values: &[&str], is_create: bool) -> Result<()> { + for value in partition_values { + let invalid_name_error = TablePath::detect_invalid_name(value); + let prefix_error = if is_create { + TablePath::validate_prefix(value) + } else { + None + }; + + if invalid_name_error.is_some() || prefix_error.is_some() { + let error_msg = invalid_name_error.unwrap_or_else(|| prefix_error.unwrap()); + return Err(Error::InvalidPartition { + message: format!("The partition value {} is invalid: {}", value, error_msg), + }); + } + } + Ok(()) +} + +/// Generate [`ResolvedPartitionSpec`] for auto partition in server. When we auto creating a +/// partition, we need to first generate a [`ResolvedPartitionSpec`]. +/// +/// The value is the formatted time with the specified time unit. +pub fn generate_auto_partition( + partition_keys: Vec, + current: &Zoned, + offset: i32, + time_unit: AutoPartitionTimeUnit, +) -> ResolvedPartitionSpec { + let auto_partition_field_spec = generate_auto_partition_time(current, offset, time_unit); + ResolvedPartitionSpec::from_partition_name(partition_keys, &auto_partition_field_spec) +} + +pub fn generate_auto_partition_time( + current: &Zoned, + offset: i32, + time_unit: AutoPartitionTimeUnit, +) -> String { + match time_unit { + AutoPartitionTimeUnit::Year => { + let adjusted = current + .checked_add(jiff::Span::new().years(offset)) + .expect("year overflow"); + format!("{}", adjusted.year()) + } + AutoPartitionTimeUnit::Quarter => { + let adjusted = current + .checked_add(jiff::Span::new().months(offset * 3)) + .expect("quarter overflow"); + let quarter = (adjusted.month() as i32 - 1) / 3 + 1; + format!("{}{}", adjusted.year(), quarter) + } + AutoPartitionTimeUnit::Month => { + let adjusted = current + .checked_add(jiff::Span::new().months(offset)) + .expect("month overflow"); + format!("{}{:02}", adjusted.year(), adjusted.month()) + } + AutoPartitionTimeUnit::Day => { + let adjusted = current + .checked_add(jiff::Span::new().days(offset)) + .expect("day overflow"); + format!( + "{}{:02}{:02}", + adjusted.year(), + adjusted.month(), + adjusted.day() + ) + } + AutoPartitionTimeUnit::Hour => { + let adjusted = current + .checked_add(jiff::Span::new().hours(offset)) + .expect("hour overflow"); + format!( + "{}{:02}{:02}{:02}", + adjusted.year(), + adjusted.month(), + adjusted.day(), + adjusted.hour() + ) + } + } +} + +fn hex_string(bytes: &[u8]) -> String { + let mut hex = String::with_capacity(bytes.len() * 2); + for &b in bytes { + let h = format!("{:x}", b); + if h.len() == 1 { + hex.push('0'); + } + hex.push_str(&h); + } + hex +} + +fn reformat_float(value: f32) -> String { + if value.is_nan() { + "NaN".to_string() + } else if value.is_infinite() { + if value > 0.0 { + "Inf".to_string() + } else { + "-Inf".to_string() + } + } else { + value.to_string().replace('.', "_") + } +} + +fn reformat_double(value: f64) -> String { + if value.is_nan() { + "NaN".to_string() + } else if value.is_infinite() { + if value > 0.0 { + "Inf".to_string() + } else { + "-Inf".to_string() + } + } else { + value.to_string().replace('.', "_") + } +} + +const UNIX_EPOCH_DATE: jiff::civil::Date = jiff::civil::date(1970, 1, 1); + +fn day_to_string(days: i32) -> String { + let date = UNIX_EPOCH_DATE + days.days(); + format!("{:04}-{:02}-{:02}", date.year(), date.month(), date.day()) +} + +fn date_to_string(date: Date) -> String { + day_to_string(date.get_inner()) +} + +const NANOS_PER_MILLIS: i64 = 1_000_000; +const MILLIS_PER_SECOND: i64 = 1_000; +const MILLIS_PER_MINUTE: i64 = 60 * MILLIS_PER_SECOND; +const MILLIS_PER_HOUR: i64 = 60 * MILLIS_PER_MINUTE; + +fn milli_to_string(milli: i32) -> String { + let hour = milli.div_euclid(MILLIS_PER_HOUR as i32); + let min = milli + .rem_euclid(MILLIS_PER_HOUR as i32) + .div_euclid(MILLIS_PER_MINUTE as i32); + let sec = milli + .rem_euclid(MILLIS_PER_MINUTE as i32) + .div_euclid(MILLIS_PER_SECOND as i32); + let ms = milli.rem_euclid(MILLIS_PER_SECOND as i32); + + format!("{:02}-{:02}-{:02}_{:03}", hour, min, sec, ms) +} + +fn time_to_string(time: Time) -> String { + milli_to_string(time.get_inner()) +} + +/// Always add nanoseconds whether TimestampNtz and TimestampLtz are compact or not. +fn timestamp_ntz_to_string(ts: TimestampNtz) -> String { + let millis = ts.get_millisecond(); + let nano_of_milli = ts.get_nano_of_millisecond(); + + let total_nanos = (millis % MILLIS_PER_SECOND) * NANOS_PER_MILLIS + (nano_of_milli as i64); + let total_secs = millis / MILLIS_PER_SECOND; + + let epoch = jiff::Timestamp::UNIX_EPOCH; + let ts_jiff = epoch + jiff::Span::new().seconds(total_secs); + let dt = ts_jiff.to_zoned(jiff::tz::TimeZone::UTC).datetime(); + + format_date_time(total_nanos, dt) +} + +fn timestamp_ltz_to_string(ts: TimestampLtz) -> String { + let millis = ts.get_epoch_millisecond(); + let nano_of_milli = ts.get_nano_of_millisecond(); + + let total_nanos = (millis % MILLIS_PER_SECOND) * NANOS_PER_MILLIS + (nano_of_milli as i64); + let total_secs = millis / MILLIS_PER_SECOND; + + let epoch = jiff::Timestamp::UNIX_EPOCH; + let ts_jiff = epoch + jiff::Span::new().seconds(total_secs); + let dt = ts_jiff.to_zoned(jiff::tz::TimeZone::UTC).datetime(); + + format_date_time(total_nanos, dt) +} + +fn format_date_time(total_nanos: i64, dt: DateTime) -> String { + if total_nanos > 0 { + format!( + "{:04}-{:02}-{:02}-{:02}-{:02}-{:02}_{}", + dt.year(), + dt.month(), + dt.day(), + dt.hour(), + dt.minute(), + dt.second(), + total_nanos + ) + } else { + format!( + "{:04}-{:02}-{:02}-{:02}-{:02}-{:02}", + dt.year(), + dt.month(), + dt.day(), + dt.hour(), + dt.minute(), + dt.second(), + ) + } +} + +/// Converts a Datum value to its string representation for partition naming. +pub fn convert_value_to_string(value: &Datum, data_type: &DataType) -> String { + match (value, data_type) { + (Datum::String(s), DataType::Char(_) | DataType::String(_)) => s.to_string(), + (Datum::Bool(b), DataType::Boolean(_)) => b.to_string(), + (Datum::Blob(bytes), DataType::Binary(_) | DataType::Bytes(_)) => hex_string(bytes), + (Datum::Int8(v), DataType::TinyInt(_)) => v.to_string(), + (Datum::Int16(v), DataType::SmallInt(_)) => v.to_string(), + (Datum::Int32(v), DataType::Int(_)) => v.to_string(), + (Datum::Int64(v), DataType::BigInt(_)) => v.to_string(), + (Datum::Date(d), DataType::Date(_)) => date_to_string(*d), + (Datum::Time(t), DataType::Time(_)) => time_to_string(*t), + (Datum::Float32(f), DataType::Float(_)) => reformat_float(f.into_inner()), + (Datum::Float64(f), DataType::Double(_)) => reformat_double(f.into_inner()), + (Datum::TimestampLtz(ts), DataType::TimestampLTz(_)) => timestamp_ltz_to_string(*ts), + (Datum::TimestampNtz(ts), DataType::Timestamp(_)) => timestamp_ntz_to_string(*ts), + _ => panic!( + "Unsupported data type for partition key: {:?}, value: {:?}", + data_type, value + ), + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::metadata::{ + BigIntType, BinaryType, BooleanType, BytesType, CharType, DateType, DoubleType, FloatType, + IntType, SmallIntType, StringType, TimeType, TimestampLTzType, TimestampType, TinyIntType, + }; + use crate::row::{Date, Time, TimestampLtz, TimestampNtz}; + use std::borrow::Cow; + + use crate::metadata::TablePath; + + #[test] + fn test_validate_partition_values() { + // Test invalid character '$' + let result = validate_partition_values(&["$1", "2"], true); + assert!(result.is_err()); + let err_msg = result.unwrap_err().to_string(); + assert!(err_msg.contains("The partition value $1 is invalid")); + assert!(err_msg.contains( + "'$1' contains one or more characters other than ASCII alphanumerics, '_' and '-'" + )); + + // Test invalid character '?' + let result = validate_partition_values(&["?1", "2"], false); + assert!(result.is_err()); + let err_msg = result.unwrap_err().to_string(); + assert!(err_msg.contains("The partition value ?1 is invalid")); + assert!(err_msg.contains( + "'?1' contains one or more characters other than ASCII alphanumerics, '_' and '-'" + )); + + // Test reserved prefix '__' with is_create=true + let result = validate_partition_values(&["__p1", "2"], true); + assert!(result.is_err()); + let err_msg = result.unwrap_err().to_string(); + assert!(err_msg.contains("The partition value __p1 is invalid")); + assert!(err_msg.contains("'__' is not allowed as prefix, since it is reserved for internal databases/internal tables/internal partitions in Fluss server")); + + // Test reserved prefix '__' with is_create=false (should pass) + let result = validate_partition_values(&["__p1", "2"], false); + assert!(result.is_ok()); + + // Test validate_partition_spec with mismatched size + let table_path = TablePath::new("test_db".to_string(), "test_table".to_string()); + let partition_keys = vec!["b".to_string()]; + let partition_spec = PartitionSpec::new(std::collections::HashMap::new()); + let result = validate_partition_spec(&table_path, &partition_keys, &partition_spec, true); + assert!(result.is_err()); + let err_msg = result.unwrap_err().to_string(); + assert!(err_msg.contains("PartitionSpec size is not equal to partition keys size for partitioned table test_db.test_table")); + } + + #[test] + fn test_generate_auto_partition_name() { + use jiff::civil::date; + use jiff::tz::TimeZone; + + // LocalDateTime of 2024-11-11 11:11 with UTC-8 timezone + let tz = TimeZone::get("Etc/GMT+8").expect("timezone"); + let zoned = date(2024, 11, 11) + .at(11, 11, 0, 0) + .to_zoned(tz) + .expect("Zoned datetime creation failed"); + + // for year + test_generate_auto_partition_name_for( + &zoned, + AutoPartitionTimeUnit::Year, + &[-1, 0, 1, 2, 3], + &["2023", "2024", "2025", "2026", "2027"], + ); + + // for quarter + test_generate_auto_partition_name_for( + &zoned, + AutoPartitionTimeUnit::Quarter, + &[-1, 0, 1, 2, 3], + &["20243", "20244", "20251", "20252", "20253"], + ); + + // for month + test_generate_auto_partition_name_for( + &zoned, + AutoPartitionTimeUnit::Month, + &[-1, 0, 1, 2, 3], + &["202410", "202411", "202412", "202501", "202502"], + ); + + // for day + test_generate_auto_partition_name_for( + &zoned, + AutoPartitionTimeUnit::Day, + &[-1, 0, 1, 2, 3, 20], + &[ + "20241110", "20241111", "20241112", "20241113", "20241114", "20241201", + ], + ); + + // for hour + test_generate_auto_partition_name_for( + &zoned, + AutoPartitionTimeUnit::Hour, + &[-2, -1, 0, 1, 2, 3, 13], + &[ + "2024111109", + "2024111110", + "2024111111", + "2024111112", + "2024111113", + "2024111114", + "2024111200", + ], + ); + } + + fn test_generate_auto_partition_name_for( + zoned: &Zoned, + time_unit: AutoPartitionTimeUnit, + offsets: &[i32], + expected: &[&str], + ) { + for (i, offset) in offsets.iter().enumerate() { + let resolved_partition_spec = + generate_auto_partition(vec!["dt".to_string()], zoned, *offset, time_unit); + assert_eq!( + resolved_partition_spec.get_partition_name(), + expected[i], + "{:?} offset {} failed", + time_unit, + offset + ); + } + } + + #[test] + fn test_string() { + let datum = Datum::String(Cow::Borrowed("Fluss")); + + let to_string_result = + convert_value_to_string(&datum, &DataType::String(StringType::new())); + assert_eq!(to_string_result, "Fluss"); + let detect_invalid = TablePath::detect_invalid_name(&to_string_result); + assert!(detect_invalid.is_none()); + } + + #[test] + fn test_char() { + let datum = Datum::String(Cow::Borrowed("F")); + + let to_string_result = convert_value_to_string(&datum, &DataType::Char(CharType::new(1))); + assert_eq!(to_string_result, "F"); + let detect_invalid = TablePath::detect_invalid_name(&to_string_result); + assert!(detect_invalid.is_none()); + } + + #[test] + fn test_boolean() { + let datum = Datum::Bool(true); + + let to_string_result = + convert_value_to_string(&datum, &DataType::Boolean(BooleanType::new())); + assert_eq!(to_string_result, "true"); + let detect_invalid = TablePath::detect_invalid_name(&to_string_result); + assert!(detect_invalid.is_none()); + } + + #[test] + fn test_byte() { + let datum = Datum::Blob(Cow::Borrowed(&[0x10, 0x20, 0x30, 0x40, 0x50, 0xFF])); + + let to_string_result = convert_value_to_string(&datum, &DataType::Bytes(BytesType::new())); + assert_eq!(to_string_result, "1020304050ff"); + let detect_invalid = TablePath::detect_invalid_name(&to_string_result); + assert!(detect_invalid.is_none()); + } + + #[test] + fn test_binary() { + let datum = Datum::Blob(Cow::Borrowed(&[0x10, 0x20, 0x30, 0x40, 0x50, 0xFF])); + + let to_string_result = + convert_value_to_string(&datum, &DataType::Binary(BinaryType::new(6))); + assert_eq!(to_string_result, "1020304050ff"); + let detect_invalid = TablePath::detect_invalid_name(&to_string_result); + assert!(detect_invalid.is_none()); + } + + #[test] + fn test_tiny_int() { + let datum = Datum::Int8(100); + + let to_string_result = + convert_value_to_string(&datum, &DataType::TinyInt(TinyIntType::new())); + assert_eq!(to_string_result, "100"); + let detect_invalid = TablePath::detect_invalid_name(&to_string_result); + assert!(detect_invalid.is_none()); + } + + #[test] + fn test_small_int() { + let datum = Datum::Int16(-32760); + + let to_string_result = + convert_value_to_string(&datum, &DataType::SmallInt(SmallIntType::new())); + assert_eq!(to_string_result, "-32760"); + let detect_invalid = TablePath::detect_invalid_name(&to_string_result); + assert!(detect_invalid.is_none()); + } + + #[test] + fn test_int() { + let datum = Datum::Int32(299000); + + let to_string_result = convert_value_to_string(&datum, &DataType::Int(IntType::new())); + assert_eq!(to_string_result, "299000"); + let detect_invalid = TablePath::detect_invalid_name(&to_string_result); + assert!(detect_invalid.is_none()); + } + + #[test] + fn test_big_int() { + let datum = Datum::Int64(1748662955428); + + let to_string_result = + convert_value_to_string(&datum, &DataType::BigInt(BigIntType::new())); + assert_eq!(to_string_result, "1748662955428"); + let detect_invalid = TablePath::detect_invalid_name(&to_string_result); + assert!(detect_invalid.is_none()); + } + + #[test] + fn test_date() { + let datum = Datum::Date(Date::new(20235)); + + let to_string_result = convert_value_to_string(&datum, &DataType::Date(DateType::new())); + assert_eq!(to_string_result, "2025-05-27"); + let detect_invalid = TablePath::detect_invalid_name(&to_string_result); + assert!(detect_invalid.is_none()); + } + + #[test] + fn test_time() { + let datum = Datum::Time(Time::new(5402199)); + + let to_string_result = + convert_value_to_string(&datum, &DataType::Time(TimeType::new(3).unwrap())); + assert_eq!(to_string_result, "01-30-02_199"); + let detect_invalid = TablePath::detect_invalid_name(&to_string_result); + assert!(detect_invalid.is_none()); + } + + #[test] + fn test_float() { + let datum = Datum::Float32(5.73.into()); + + let to_string_result = convert_value_to_string(&datum, &DataType::Float(FloatType::new())); + assert_eq!(to_string_result, "5_73"); + let detect_invalid = TablePath::detect_invalid_name(&to_string_result); + assert!(detect_invalid.is_none()); + + let datum = Datum::Float32(f32::NAN.into()); + assert_eq!( + convert_value_to_string(&datum, &DataType::Float(FloatType::new())), + "NaN" + ); + + let datum = Datum::Float32(f32::INFINITY.into()); + assert_eq!( + convert_value_to_string(&datum, &DataType::Float(FloatType::new())), + "Inf" + ); + + let datum = Datum::Float32(f32::NEG_INFINITY.into()); + assert_eq!( + convert_value_to_string(&datum, &DataType::Float(FloatType::new())), + "-Inf" + ); + } + + #[test] + fn test_double() { + let datum = Datum::Float64(5.73737.into()); + + let to_string_result = + convert_value_to_string(&datum, &DataType::Double(DoubleType::new())); + assert_eq!(to_string_result, "5_73737"); + let detect_invalid = TablePath::detect_invalid_name(&to_string_result); + assert!(detect_invalid.is_none()); + + let datum = Datum::Float64(f64::NAN.into()); + assert_eq!( + convert_value_to_string(&datum, &DataType::Double(DoubleType::new())), + "NaN" + ); + + let datum = Datum::Float64(f64::INFINITY.into()); + assert_eq!( + convert_value_to_string(&datum, &DataType::Double(DoubleType::new())), + "Inf" + ); + + let datum = Datum::Float64(f64::NEG_INFINITY.into()); + assert_eq!( + convert_value_to_string(&datum, &DataType::Double(DoubleType::new())), + "-Inf" + ); + } + + #[test] + fn test_timestamp_ntz() { + let datum = Datum::TimestampNtz( + TimestampNtz::from_millis_nanos(1748662955428, 99988) + .expect("TimestampNtz init failed"), + ); + + let to_string_result = + convert_value_to_string(&datum, &DataType::Timestamp(TimestampType::new(9).unwrap())); + assert_eq!(to_string_result, "2025-05-31-03-42-35_428099988"); + let detect_invalid = TablePath::detect_invalid_name(&to_string_result); + assert!(detect_invalid.is_none()); + } + + #[test] + fn test_timestamp_ltz() { + let datum = Datum::TimestampLtz( + TimestampLtz::from_millis_nanos(1748662955428, 99988) + .expect("TimestampLtz init failed"), + ); + + let to_string_result = convert_value_to_string( + &datum, + &DataType::TimestampLTz(TimestampLTzType::new(9).unwrap()), + ); + assert_eq!(to_string_result, "2025-05-31-03-42-35_428099988"); + let detect_invalid = TablePath::detect_invalid_name(&to_string_result); + assert!(detect_invalid.is_none()); + } +} From f8d8e565341499923312391af82937c1263bb170 Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Sun, 25 Jan 2026 14:59:07 +0000 Subject: [PATCH 2/9] Address copilot comments --- crates/fluss/src/client/admin.rs | 7 +- .../src/client/table/partition_getter.rs | 6 +- crates/fluss/src/metadata/partition.rs | 84 +++++---- .../src/rpc/message/list_partition_infos.rs | 2 +- crates/fluss/src/util/partition.rs | 159 +++++++++++------- 5 files changed, 147 insertions(+), 111 deletions(-) diff --git a/crates/fluss/src/client/admin.rs b/crates/fluss/src/client/admin.rs index bffe0f51..448e03cc 100644 --- a/crates/fluss/src/client/admin.rs +++ b/crates/fluss/src/client/admin.rs @@ -140,7 +140,10 @@ impl FlussAdmin { } /// List all partitions in the given table. - pub async fn list_partition_infos(&self, table_path: &TablePath) -> Result> { + pub async fn list_partition_infos( + &self, + table_path: &TablePath, + ) -> Result>> { self.list_partition_infos_with_spec(table_path, None).await } @@ -149,7 +152,7 @@ impl FlussAdmin { &self, table_path: &TablePath, partial_partition_spec: Option<&PartitionSpec>, - ) -> Result> { + ) -> Result>> { let response = self .admin_gateway .request(ListPartitionInfosRequest::new( diff --git a/crates/fluss/src/client/table/partition_getter.rs b/crates/fluss/src/client/table/partition_getter.rs index 55b1ec7c..6b576f1e 100644 --- a/crates/fluss/src/client/table/partition_getter.rs +++ b/crates/fluss/src/client/table/partition_getter.rs @@ -66,7 +66,7 @@ impl PartitionGetter { .map(|ps| ps.get_partition_name()) } - pub fn get_partition_spec(&self, row: &dyn InternalRow) -> Result { + pub fn get_partition_spec(&self, row: &dyn InternalRow) -> Result> { let mut partition_values = Vec::with_capacity(self.partitions.len()); for (data_type, field_getter) in &self.partitions { @@ -76,10 +76,10 @@ impl PartitionGetter { message: "Partition value shouldn't be null.".to_string(), }); } - partition_values.push(partition::convert_value_to_string(&value, data_type)); + partition_values.push(partition::convert_value_to_string(&value, data_type)?); } - ResolvedPartitionSpec::new(self.partition_keys.clone(), partition_values) + ResolvedPartitionSpec::new(self.partition_keys.as_slice(), partition_values) } } diff --git a/crates/fluss/src/metadata/partition.rs b/crates/fluss/src/metadata/partition.rs index 1ecc0dcd..8d9d3f40 100644 --- a/crates/fluss/src/metadata/partition.rs +++ b/crates/fluss/src/metadata/partition.rs @@ -15,9 +15,10 @@ // specific language governing permissions and limitations // under the License. -use crate::PartitionId; use crate::error::{Error, Result}; use crate::proto::{PbKeyValue, PbPartitionInfo, PbPartitionSpec}; +use crate::{PartitionId, TableId}; +use std::borrow::Cow; use std::collections::HashMap; use std::fmt::{Display, Formatter}; @@ -71,53 +72,46 @@ impl Display for PartitionSpec { /// spec is re-arranged into the correct order by comparing it with a list of strictly ordered /// partition keys. #[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct ResolvedPartitionSpec { - partition_keys: Vec, +pub struct ResolvedPartitionSpec<'a> { + partition_keys: Cow<'a, [String]>, partition_values: Vec, } pub const PARTITION_SPEC_SEPARATOR: &str = "$"; -impl ResolvedPartitionSpec { - pub fn new(partition_keys: Vec, partition_values: Vec) -> Result { +impl<'a> ResolvedPartitionSpec<'a> { + pub fn new(partition_keys: &'a [String], partition_values: Vec) -> Result { if partition_keys.len() != partition_values.len() { return Err(Error::IllegalArgument { message: "The number of partition keys and partition values should be the same." .to_string(), }); } + Ok(Self { - partition_keys, + partition_keys: Cow::Borrowed(partition_keys), partition_values, }) } pub fn from_partition_spec( - partition_keys: Vec, + partition_keys: &'a [String], partition_spec: &PartitionSpec, ) -> Self { - let partition_values = - Self::get_reordered_partition_values(&partition_keys, partition_spec); + let partition_values = Self::get_reordered_partition_values(partition_keys, partition_spec); Self { - partition_keys, + partition_keys: Cow::Borrowed(partition_keys), partition_values, } } - pub fn from_partition_value(partition_key: String, partition_value: String) -> Self { - Self { - partition_keys: vec![partition_key], - partition_values: vec![partition_value], - } - } - - pub fn from_partition_name(partition_keys: Vec, partition_name: &str) -> Self { + pub fn from_partition_name(partition_keys: &'a [String], partition_name: &str) -> Self { let partition_values: Vec = partition_name .split(PARTITION_SPEC_SEPARATOR) .map(|s| s.to_string()) .collect(); Self { - partition_keys, + partition_keys: Cow::Borrowed(partition_keys), partition_values, } } @@ -141,7 +135,7 @@ impl ResolvedPartitionSpec { } Ok(Self { - partition_keys: keys, + partition_keys: Cow::Owned(keys), partition_values: values, }) } @@ -238,8 +232,9 @@ impl ResolvedPartitionSpec { .iter() .map(|kv| kv.value.clone()) .collect(); + Self { - partition_keys, + partition_keys: Cow::Owned(partition_keys), partition_values, } } @@ -256,7 +251,7 @@ impl ResolvedPartitionSpec { } } -impl Display for ResolvedPartitionSpec { +impl<'a> Display for ResolvedPartitionSpec<'a> { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(f, "{}", self.get_partition_qualified_name()) } @@ -265,13 +260,13 @@ impl Display for ResolvedPartitionSpec { /// Information of a partition metadata, includes the partition's name and the partition id that /// represents the unique identifier of the partition. #[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct PartitionInfo { +pub struct PartitionInfo<'a> { partition_id: PartitionId, - partition_spec: ResolvedPartitionSpec, + partition_spec: ResolvedPartitionSpec<'a>, } -impl PartitionInfo { - pub fn new(partition_id: PartitionId, partition_spec: ResolvedPartitionSpec) -> Self { +impl<'a> PartitionInfo<'a> { + pub fn new(partition_id: PartitionId, partition_spec: ResolvedPartitionSpec<'a>) -> Self { Self { partition_id, partition_spec, @@ -288,7 +283,7 @@ impl PartitionInfo { self.partition_spec.get_partition_name() } - pub fn get_resolved_partition_spec(&self) -> &ResolvedPartitionSpec { + pub fn get_resolved_partition_spec(&self) -> &ResolvedPartitionSpec<'_> { &self.partition_spec } @@ -311,7 +306,7 @@ impl PartitionInfo { } } -impl Display for PartitionInfo { +impl<'a> Display for PartitionInfo<'a> { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( f, @@ -325,12 +320,12 @@ impl Display for PartitionInfo { /// A class to identify a table partition, containing the table id and the partition id. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub struct TablePartition { - table_id: i64, + table_id: TableId, partition_id: PartitionId, } impl TablePartition { - pub fn new(table_id: i64, partition_id: PartitionId) -> Self { + pub fn new(table_id: TableId, partition_id: PartitionId) -> Self { Self { table_id, partition_id, @@ -362,8 +357,9 @@ mod tests { #[test] fn test_resolved_partition_spec_name() { + let partition_keys = vec!["date".to_string(), "region".to_string()]; let spec = ResolvedPartitionSpec::new( - vec!["date".to_string(), "region".to_string()], + partition_keys.as_slice(), vec!["2024-01-15".to_string(), "US".to_string()], ) .unwrap(); @@ -377,10 +373,9 @@ mod tests { #[test] fn test_resolved_partition_spec_from_partition_name() { - let spec = ResolvedPartitionSpec::from_partition_name( - vec!["date".to_string(), "region".to_string()], - "2024-01-15$US", - ); + let partition_keys = vec!["date".to_string(), "region".to_string()]; + let spec = + ResolvedPartitionSpec::from_partition_name(partition_keys.as_slice(), "2024-01-15$US"); assert_eq!(spec.get_partition_values(), &["2024-01-15", "US"]); } @@ -397,18 +392,18 @@ mod tests { #[test] fn test_resolved_partition_spec_mismatched_lengths() { - let result = ResolvedPartitionSpec::new( - vec!["date".to_string(), "region".to_string()], - vec!["2024-01-15".to_string()], - ); + let partition_keys = vec!["date".to_string(), "region".to_string()]; + let result = + ResolvedPartitionSpec::new(partition_keys.as_slice(), vec!["2024-01-15".to_string()]); assert!(result.is_err()); } #[test] fn test_partition_info() { + let partition_keys = vec!["date".to_string()]; let spec = - ResolvedPartitionSpec::new(vec!["date".to_string()], vec!["2024-01-15".to_string()]) + ResolvedPartitionSpec::new(partition_keys.as_slice(), vec!["2024-01-15".to_string()]) .unwrap(); let info = PartitionInfo::new(42, spec); @@ -440,8 +435,9 @@ mod tests { #[test] fn test_partition_info_pb_roundtrip() { + let partition_keys = vec!["date".to_string()]; let spec = - ResolvedPartitionSpec::new(vec!["date".to_string()], vec!["2024-01-15".to_string()]) + ResolvedPartitionSpec::new(partition_keys.as_slice(), vec!["2024-01-15".to_string()]) .unwrap(); let info = PartitionInfo::new(42, spec); @@ -454,14 +450,16 @@ mod tests { #[test] fn test_contains() { + let partition_keys = vec!["date".to_string(), "region".to_string()]; let full_spec = ResolvedPartitionSpec::new( - vec!["date".to_string(), "region".to_string()], + partition_keys.as_slice(), vec!["2024-01-15".to_string(), "US".to_string()], ) .unwrap(); + let partition_keys = vec!["date".to_string()]; let partial_spec = - ResolvedPartitionSpec::new(vec!["date".to_string()], vec!["2024-01-15".to_string()]) + ResolvedPartitionSpec::new(partition_keys.as_slice(), vec!["2024-01-15".to_string()]) .unwrap(); assert!(full_spec.contains(&partial_spec).unwrap()); diff --git a/crates/fluss/src/rpc/message/list_partition_infos.rs b/crates/fluss/src/rpc/message/list_partition_infos.rs index ab693671..86ce5c8e 100644 --- a/crates/fluss/src/rpc/message/list_partition_infos.rs +++ b/crates/fluss/src/rpc/message/list_partition_infos.rs @@ -54,7 +54,7 @@ impl_write_version_type!(ListPartitionInfosRequest); impl_read_version_type!(ListPartitionInfosResponse); impl ListPartitionInfosResponse { - pub fn get_partitions_info(&self) -> Vec { + pub fn get_partitions_info(&self) -> Vec> { self.partitions_info .iter() .map(PartitionInfo::from_pb) diff --git a/crates/fluss/src/util/partition.rs b/crates/fluss/src/util/partition.rs index fcf4f9fd..e98085a6 100644 --- a/crates/fluss/src/util/partition.rs +++ b/crates/fluss/src/util/partition.rs @@ -19,6 +19,7 @@ #![allow(dead_code)] +use crate::error::Error::IllegalArgument; use crate::error::{Error, Result}; use crate::metadata::{DataType, PartitionSpec, ResolvedPartitionSpec, TablePath}; use crate::row::{Date, Datum, Time, TimestampLtz, TimestampNtz}; @@ -91,63 +92,76 @@ fn validate_partition_values(partition_values: &[&str], is_create: bool) -> Resu /// partition, we need to first generate a [`ResolvedPartitionSpec`]. /// /// The value is the formatted time with the specified time unit. -pub fn generate_auto_partition( - partition_keys: Vec, +pub fn generate_auto_partition<'a>( + partition_keys: &'a [String], current: &Zoned, offset: i32, time_unit: AutoPartitionTimeUnit, -) -> ResolvedPartitionSpec { - let auto_partition_field_spec = generate_auto_partition_time(current, offset, time_unit); - ResolvedPartitionSpec::from_partition_name(partition_keys, &auto_partition_field_spec) +) -> Result> { + let auto_partition_field_spec = generate_auto_partition_time(current, offset, time_unit)?; + Ok(ResolvedPartitionSpec::from_partition_name( + partition_keys, + auto_partition_field_spec.as_str(), + )) } pub fn generate_auto_partition_time( current: &Zoned, offset: i32, time_unit: AutoPartitionTimeUnit, -) -> String { +) -> Result { match time_unit { AutoPartitionTimeUnit::Year => { let adjusted = current .checked_add(jiff::Span::new().years(offset)) - .expect("year overflow"); - format!("{}", adjusted.year()) + .map_err(|_| IllegalArgument { + message: "Year offset would cause overflow".to_string(), + })?; + Ok(format!("{}", adjusted.year())) } AutoPartitionTimeUnit::Quarter => { let adjusted = current .checked_add(jiff::Span::new().months(offset * 3)) - .expect("quarter overflow"); + .map_err(|_| IllegalArgument { + message: "Quarter offset would cause overflow".to_string(), + })?; let quarter = (adjusted.month() as i32 - 1) / 3 + 1; - format!("{}{}", adjusted.year(), quarter) + Ok(format!("{}{}", adjusted.year(), quarter)) } AutoPartitionTimeUnit::Month => { let adjusted = current .checked_add(jiff::Span::new().months(offset)) - .expect("month overflow"); - format!("{}{:02}", adjusted.year(), adjusted.month()) + .map_err(|_| IllegalArgument { + message: "Month offset would cause overflow".to_string(), + })?; + Ok(format!("{}{:02}", adjusted.year(), adjusted.month())) } AutoPartitionTimeUnit::Day => { let adjusted = current .checked_add(jiff::Span::new().days(offset)) - .expect("day overflow"); - format!( + .map_err(|_| IllegalArgument { + message: "Day offset would cause overflow".to_string(), + })?; + Ok(format!( "{}{:02}{:02}", adjusted.year(), adjusted.month(), adjusted.day() - ) + )) } AutoPartitionTimeUnit::Hour => { let adjusted = current .checked_add(jiff::Span::new().hours(offset)) - .expect("hour overflow"); - format!( + .map_err(|_| IllegalArgument { + message: "Hour offset would cause overflow".to_string(), + })?; + Ok(format!( "{}{:02}{:02}{:02}", adjusted.year(), adjusted.month(), adjusted.day(), adjusted.hour() - ) + )) } } } @@ -155,10 +169,7 @@ pub fn generate_auto_partition_time( fn hex_string(bytes: &[u8]) -> String { let mut hex = String::with_capacity(bytes.len() * 2); for &b in bytes { - let h = format!("{:x}", b); - if h.len() == 1 { - hex.push('0'); - } + let h = format!("{:02x}", b); hex.push_str(&h); } hex @@ -280,25 +291,26 @@ fn format_date_time(total_nanos: i64, dt: DateTime) -> String { } /// Converts a Datum value to its string representation for partition naming. -pub fn convert_value_to_string(value: &Datum, data_type: &DataType) -> String { +pub fn convert_value_to_string(value: &Datum, data_type: &DataType) -> Result { match (value, data_type) { - (Datum::String(s), DataType::Char(_) | DataType::String(_)) => s.to_string(), - (Datum::Bool(b), DataType::Boolean(_)) => b.to_string(), - (Datum::Blob(bytes), DataType::Binary(_) | DataType::Bytes(_)) => hex_string(bytes), - (Datum::Int8(v), DataType::TinyInt(_)) => v.to_string(), - (Datum::Int16(v), DataType::SmallInt(_)) => v.to_string(), - (Datum::Int32(v), DataType::Int(_)) => v.to_string(), - (Datum::Int64(v), DataType::BigInt(_)) => v.to_string(), - (Datum::Date(d), DataType::Date(_)) => date_to_string(*d), - (Datum::Time(t), DataType::Time(_)) => time_to_string(*t), - (Datum::Float32(f), DataType::Float(_)) => reformat_float(f.into_inner()), - (Datum::Float64(f), DataType::Double(_)) => reformat_double(f.into_inner()), - (Datum::TimestampLtz(ts), DataType::TimestampLTz(_)) => timestamp_ltz_to_string(*ts), - (Datum::TimestampNtz(ts), DataType::Timestamp(_)) => timestamp_ntz_to_string(*ts), - _ => panic!( - "Unsupported data type for partition key: {:?}, value: {:?}", - data_type, value - ), + (Datum::String(s), DataType::Char(_) | DataType::String(_)) => Ok(s.to_string()), + (Datum::Bool(b), DataType::Boolean(_)) => Ok(b.to_string()), + (Datum::Blob(bytes), DataType::Binary(_) | DataType::Bytes(_)) => Ok(hex_string(bytes)), + (Datum::Int8(v), DataType::TinyInt(_)) => Ok(v.to_string()), + (Datum::Int16(v), DataType::SmallInt(_)) => Ok(v.to_string()), + (Datum::Int32(v), DataType::Int(_)) => Ok(v.to_string()), + (Datum::Int64(v), DataType::BigInt(_)) => Ok(v.to_string()), + (Datum::Date(d), DataType::Date(_)) => Ok(date_to_string(*d)), + (Datum::Time(t), DataType::Time(_)) => Ok(time_to_string(*t)), + (Datum::Float32(f), DataType::Float(_)) => Ok(reformat_float(f.into_inner())), + (Datum::Float64(f), DataType::Double(_)) => Ok(reformat_double(f.into_inner())), + (Datum::TimestampLtz(ts), DataType::TimestampLTz(_)) => Ok(timestamp_ltz_to_string(*ts)), + (Datum::TimestampNtz(ts), DataType::Timestamp(_)) => Ok(timestamp_ntz_to_string(*ts)), + _ => Err(IllegalArgument { + message: format!( + "Unsupported conversion to partition key from data type: {data_type:?}, value: {value:?}" + ), + }), } } @@ -425,8 +437,10 @@ mod tests { expected: &[&str], ) { for (i, offset) in offsets.iter().enumerate() { + let partition_keys = vec!["dt".to_string()]; let resolved_partition_spec = - generate_auto_partition(vec!["dt".to_string()], zoned, *offset, time_unit); + generate_auto_partition(partition_keys.as_slice(), zoned, *offset, time_unit) + .expect("partition generation failed"); assert_eq!( resolved_partition_spec.get_partition_name(), expected[i], @@ -442,7 +456,8 @@ mod tests { let datum = Datum::String(Cow::Borrowed("Fluss")); let to_string_result = - convert_value_to_string(&datum, &DataType::String(StringType::new())); + convert_value_to_string(&datum, &DataType::String(StringType::new())) + .expect("datum conversion to partition string failed"); assert_eq!(to_string_result, "Fluss"); let detect_invalid = TablePath::detect_invalid_name(&to_string_result); assert!(detect_invalid.is_none()); @@ -452,7 +467,8 @@ mod tests { fn test_char() { let datum = Datum::String(Cow::Borrowed("F")); - let to_string_result = convert_value_to_string(&datum, &DataType::Char(CharType::new(1))); + let to_string_result = convert_value_to_string(&datum, &DataType::Char(CharType::new(1))) + .expect("datum conversion to partition string failed"); assert_eq!(to_string_result, "F"); let detect_invalid = TablePath::detect_invalid_name(&to_string_result); assert!(detect_invalid.is_none()); @@ -463,7 +479,8 @@ mod tests { let datum = Datum::Bool(true); let to_string_result = - convert_value_to_string(&datum, &DataType::Boolean(BooleanType::new())); + convert_value_to_string(&datum, &DataType::Boolean(BooleanType::new())) + .expect("datum conversion to partition string failed"); assert_eq!(to_string_result, "true"); let detect_invalid = TablePath::detect_invalid_name(&to_string_result); assert!(detect_invalid.is_none()); @@ -473,7 +490,8 @@ mod tests { fn test_byte() { let datum = Datum::Blob(Cow::Borrowed(&[0x10, 0x20, 0x30, 0x40, 0x50, 0xFF])); - let to_string_result = convert_value_to_string(&datum, &DataType::Bytes(BytesType::new())); + let to_string_result = convert_value_to_string(&datum, &DataType::Bytes(BytesType::new())) + .expect("datum conversion to partition string failed"); assert_eq!(to_string_result, "1020304050ff"); let detect_invalid = TablePath::detect_invalid_name(&to_string_result); assert!(detect_invalid.is_none()); @@ -484,7 +502,8 @@ mod tests { let datum = Datum::Blob(Cow::Borrowed(&[0x10, 0x20, 0x30, 0x40, 0x50, 0xFF])); let to_string_result = - convert_value_to_string(&datum, &DataType::Binary(BinaryType::new(6))); + convert_value_to_string(&datum, &DataType::Binary(BinaryType::new(6))) + .expect("datum conversion to partition string failed"); assert_eq!(to_string_result, "1020304050ff"); let detect_invalid = TablePath::detect_invalid_name(&to_string_result); assert!(detect_invalid.is_none()); @@ -495,7 +514,8 @@ mod tests { let datum = Datum::Int8(100); let to_string_result = - convert_value_to_string(&datum, &DataType::TinyInt(TinyIntType::new())); + convert_value_to_string(&datum, &DataType::TinyInt(TinyIntType::new())) + .expect("datum conversion to partition string failed"); assert_eq!(to_string_result, "100"); let detect_invalid = TablePath::detect_invalid_name(&to_string_result); assert!(detect_invalid.is_none()); @@ -506,7 +526,8 @@ mod tests { let datum = Datum::Int16(-32760); let to_string_result = - convert_value_to_string(&datum, &DataType::SmallInt(SmallIntType::new())); + convert_value_to_string(&datum, &DataType::SmallInt(SmallIntType::new())) + .expect("datum conversion to partition string failed"); assert_eq!(to_string_result, "-32760"); let detect_invalid = TablePath::detect_invalid_name(&to_string_result); assert!(detect_invalid.is_none()); @@ -516,7 +537,8 @@ mod tests { fn test_int() { let datum = Datum::Int32(299000); - let to_string_result = convert_value_to_string(&datum, &DataType::Int(IntType::new())); + let to_string_result = convert_value_to_string(&datum, &DataType::Int(IntType::new())) + .expect("datum conversion to partition string failed"); assert_eq!(to_string_result, "299000"); let detect_invalid = TablePath::detect_invalid_name(&to_string_result); assert!(detect_invalid.is_none()); @@ -527,7 +549,8 @@ mod tests { let datum = Datum::Int64(1748662955428); let to_string_result = - convert_value_to_string(&datum, &DataType::BigInt(BigIntType::new())); + convert_value_to_string(&datum, &DataType::BigInt(BigIntType::new())) + .expect("datum conversion to partition string failed"); assert_eq!(to_string_result, "1748662955428"); let detect_invalid = TablePath::detect_invalid_name(&to_string_result); assert!(detect_invalid.is_none()); @@ -537,7 +560,8 @@ mod tests { fn test_date() { let datum = Datum::Date(Date::new(20235)); - let to_string_result = convert_value_to_string(&datum, &DataType::Date(DateType::new())); + let to_string_result = convert_value_to_string(&datum, &DataType::Date(DateType::new())) + .expect("datum conversion to partition string failed"); assert_eq!(to_string_result, "2025-05-27"); let detect_invalid = TablePath::detect_invalid_name(&to_string_result); assert!(detect_invalid.is_none()); @@ -548,7 +572,8 @@ mod tests { let datum = Datum::Time(Time::new(5402199)); let to_string_result = - convert_value_to_string(&datum, &DataType::Time(TimeType::new(3).unwrap())); + convert_value_to_string(&datum, &DataType::Time(TimeType::new(3).unwrap())) + .expect("datum conversion to partition string failed"); assert_eq!(to_string_result, "01-30-02_199"); let detect_invalid = TablePath::detect_invalid_name(&to_string_result); assert!(detect_invalid.is_none()); @@ -558,26 +583,30 @@ mod tests { fn test_float() { let datum = Datum::Float32(5.73.into()); - let to_string_result = convert_value_to_string(&datum, &DataType::Float(FloatType::new())); + let to_string_result = convert_value_to_string(&datum, &DataType::Float(FloatType::new())) + .expect("datum conversion to partition string failed"); assert_eq!(to_string_result, "5_73"); let detect_invalid = TablePath::detect_invalid_name(&to_string_result); assert!(detect_invalid.is_none()); let datum = Datum::Float32(f32::NAN.into()); assert_eq!( - convert_value_to_string(&datum, &DataType::Float(FloatType::new())), + convert_value_to_string(&datum, &DataType::Float(FloatType::new())) + .expect("datum conversion to partition string failed"), "NaN" ); let datum = Datum::Float32(f32::INFINITY.into()); assert_eq!( - convert_value_to_string(&datum, &DataType::Float(FloatType::new())), + convert_value_to_string(&datum, &DataType::Float(FloatType::new())) + .expect("datum conversion to partition string failed"), "Inf" ); let datum = Datum::Float32(f32::NEG_INFINITY.into()); assert_eq!( - convert_value_to_string(&datum, &DataType::Float(FloatType::new())), + convert_value_to_string(&datum, &DataType::Float(FloatType::new())) + .expect("datum conversion to partition string failed"), "-Inf" ); } @@ -587,26 +616,30 @@ mod tests { let datum = Datum::Float64(5.73737.into()); let to_string_result = - convert_value_to_string(&datum, &DataType::Double(DoubleType::new())); + convert_value_to_string(&datum, &DataType::Double(DoubleType::new())) + .expect("datum conversion to partition string failed"); assert_eq!(to_string_result, "5_73737"); let detect_invalid = TablePath::detect_invalid_name(&to_string_result); assert!(detect_invalid.is_none()); let datum = Datum::Float64(f64::NAN.into()); assert_eq!( - convert_value_to_string(&datum, &DataType::Double(DoubleType::new())), + convert_value_to_string(&datum, &DataType::Double(DoubleType::new())) + .expect("datum conversion to partition string failed"), "NaN" ); let datum = Datum::Float64(f64::INFINITY.into()); assert_eq!( - convert_value_to_string(&datum, &DataType::Double(DoubleType::new())), + convert_value_to_string(&datum, &DataType::Double(DoubleType::new())) + .expect("datum conversion to partition string failed"), "Inf" ); let datum = Datum::Float64(f64::NEG_INFINITY.into()); assert_eq!( - convert_value_to_string(&datum, &DataType::Double(DoubleType::new())), + convert_value_to_string(&datum, &DataType::Double(DoubleType::new())) + .expect("datum conversion to partition string failed"), "-Inf" ); } @@ -619,7 +652,8 @@ mod tests { ); let to_string_result = - convert_value_to_string(&datum, &DataType::Timestamp(TimestampType::new(9).unwrap())); + convert_value_to_string(&datum, &DataType::Timestamp(TimestampType::new(9).unwrap())) + .expect("datum conversion to partition string failed"); assert_eq!(to_string_result, "2025-05-31-03-42-35_428099988"); let detect_invalid = TablePath::detect_invalid_name(&to_string_result); assert!(detect_invalid.is_none()); @@ -635,7 +669,8 @@ mod tests { let to_string_result = convert_value_to_string( &datum, &DataType::TimestampLTz(TimestampLTzType::new(9).unwrap()), - ); + ) + .expect("datum conversion to partition string failed"); assert_eq!(to_string_result, "2025-05-31-03-42-35_428099988"); let detect_invalid = TablePath::detect_invalid_name(&to_string_result); assert!(detect_invalid.is_none()); From 4d1ff936eb68a0d620967a8cfe958d4f6ad6c3c0 Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Sun, 25 Jan 2026 14:59:21 +0000 Subject: [PATCH 3/9] Squelch warning on unused variable --- crates/fluss/src/client/table/remote_log.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/fluss/src/client/table/remote_log.rs b/crates/fluss/src/client/table/remote_log.rs index c39056db..52ac813e 100644 --- a/crates/fluss/src/client/table/remote_log.rs +++ b/crates/fluss/src/client/table/remote_log.rs @@ -469,7 +469,7 @@ async fn spawn_download_task( result_sender: request.result_sender, } } - Err(e) if request.result_sender.is_closed() => { + Err(_e) if request.result_sender.is_closed() => { // Receiver dropped (cancelled) - release permit, don't re-queue drop(permit); DownloadResult::Cancelled From cfc9995c36265605d4990d53e5f0e4257910fa53 Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Mon, 26 Jan 2026 10:01:43 +0000 Subject: [PATCH 4/9] Add more test cases ensuring parity with Java --- crates/fluss/src/row/datum.rs | 5 +- crates/fluss/src/util/partition.rs | 149 ++++++++++++++++++++++++----- 2 files changed, 125 insertions(+), 29 deletions(-) diff --git a/crates/fluss/src/row/datum.rs b/crates/fluss/src/row/datum.rs index 7b3850f8..312b40fe 100644 --- a/crates/fluss/src/row/datum.rs +++ b/crates/fluss/src/row/datum.rs @@ -801,10 +801,7 @@ impl TimestampNtz { } } - pub fn from_millis_nanos( - millisecond: i64, - nano_of_millisecond: i32, - ) -> crate::error::Result { + pub fn from_millis_nanos(millisecond: i64, nano_of_millisecond: i32) -> Result { if !(0..=MAX_NANO_OF_MILLISECOND).contains(&nano_of_millisecond) { return Err(crate::error::Error::IllegalArgument { message: format!( diff --git a/crates/fluss/src/util/partition.rs b/crates/fluss/src/util/partition.rs index e98085a6..bffbffa0 100644 --- a/crates/fluss/src/util/partition.rs +++ b/crates/fluss/src/util/partition.rs @@ -25,7 +25,6 @@ use crate::metadata::{DataType, PartitionSpec, ResolvedPartitionSpec, TablePath} use crate::row::{Date, Datum, Time, TimestampLtz, TimestampNtz}; use jiff::ToSpan; use jiff::Zoned; -use jiff::civil::DateTime; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum AutoPartitionTimeUnit { @@ -236,50 +235,70 @@ fn time_to_string(time: Time) -> String { milli_to_string(time.get_inner()) } -/// Always add nanoseconds whether TimestampNtz and TimestampLtz are compact or not. -fn timestamp_ntz_to_string(ts: TimestampNtz) -> String { - let millis = ts.get_millisecond(); - let nano_of_milli = ts.get_nano_of_millisecond(); +trait Timestamp { + fn get_milli(&self) -> i64; + fn get_nano_of_milli(&self) -> i32; +} - let total_nanos = (millis % MILLIS_PER_SECOND) * NANOS_PER_MILLIS + (nano_of_milli as i64); - let total_secs = millis / MILLIS_PER_SECOND; +impl Timestamp for TimestampNtz { + fn get_milli(&self) -> i64 { + self.get_millisecond() + } - let epoch = jiff::Timestamp::UNIX_EPOCH; - let ts_jiff = epoch + jiff::Span::new().seconds(total_secs); - let dt = ts_jiff.to_zoned(jiff::tz::TimeZone::UTC).datetime(); + fn get_nano_of_milli(&self) -> i32 { + self.get_nano_of_millisecond() + } +} - format_date_time(total_nanos, dt) +impl Timestamp for TimestampLtz { + fn get_milli(&self) -> i64 { + self.get_epoch_millisecond() + } + + fn get_nano_of_milli(&self) -> i32 { + self.get_nano_of_millisecond() + } } -fn timestamp_ltz_to_string(ts: TimestampLtz) -> String { - let millis = ts.get_epoch_millisecond(); - let nano_of_milli = ts.get_nano_of_millisecond(); +/// This formats date time while adhering to java side behaviour +/// +fn timestamp_to_string(ts: T) -> String { + let millis = ts.get_milli(); + let nanos = ts.get_nano_of_milli(); - let total_nanos = (millis % MILLIS_PER_SECOND) * NANOS_PER_MILLIS + (nano_of_milli as i64); + let millis_of_second = millis % MILLIS_PER_SECOND; let total_secs = millis / MILLIS_PER_SECOND; let epoch = jiff::Timestamp::UNIX_EPOCH; let ts_jiff = epoch + jiff::Span::new().seconds(total_secs); let dt = ts_jiff.to_zoned(jiff::tz::TimeZone::UTC).datetime(); - format_date_time(total_nanos, dt) -} - -fn format_date_time(total_nanos: i64, dt: DateTime) -> String { - if total_nanos > 0 { + if nanos > 0 { + format!( + "{:04}-{:02}-{:02}-{:02}-{:02}-{:02}_{:03}{:06}", + dt.year(), + dt.month(), + dt.day(), + dt.hour(), + dt.minute(), + dt.second(), + millis_of_second, + nanos + ) + } else if millis_of_second > 0 { format!( - "{:04}-{:02}-{:02}-{:02}-{:02}-{:02}_{}", + "{:04}-{:02}-{:02}-{:02}-{:02}-{:02}_{:03}", dt.year(), dt.month(), dt.day(), dt.hour(), dt.minute(), dt.second(), - total_nanos + millis_of_second ) } else { format!( - "{:04}-{:02}-{:02}-{:02}-{:02}-{:02}", + "{:04}-{:02}-{:02}-{:02}-{:02}-{:02}_", dt.year(), dt.month(), dt.day(), @@ -304,8 +323,8 @@ pub fn convert_value_to_string(value: &Datum, data_type: &DataType) -> Result Ok(time_to_string(*t)), (Datum::Float32(f), DataType::Float(_)) => Ok(reformat_float(f.into_inner())), (Datum::Float64(f), DataType::Double(_)) => Ok(reformat_double(f.into_inner())), - (Datum::TimestampLtz(ts), DataType::TimestampLTz(_)) => Ok(timestamp_ltz_to_string(*ts)), - (Datum::TimestampNtz(ts), DataType::Timestamp(_)) => Ok(timestamp_ntz_to_string(*ts)), + (Datum::TimestampLtz(ts), DataType::TimestampLTz(_)) => Ok(timestamp_to_string(*ts)), + (Datum::TimestampNtz(ts), DataType::Timestamp(_)) => Ok(timestamp_to_string(*ts)), _ => Err(IllegalArgument { message: format!( "Unsupported conversion to partition key from data type: {data_type:?}, value: {value:?}" @@ -657,6 +676,43 @@ mod tests { assert_eq!(to_string_result, "2025-05-31-03-42-35_428099988"); let detect_invalid = TablePath::detect_invalid_name(&to_string_result); assert!(detect_invalid.is_none()); + + // Zero nanos of millis + let datum = Datum::TimestampNtz( + TimestampNtz::from_millis_nanos(1748662955428, 0).expect("TimestampNtz init failed"), + ); + + let to_string_result = + convert_value_to_string(&datum, &DataType::Timestamp(TimestampType::new(9).unwrap())) + .expect("datum conversion to partition string failed"); + assert_eq!(to_string_result, "2025-05-31-03-42-35_428"); + let detect_invalid = TablePath::detect_invalid_name(&to_string_result); + assert!(detect_invalid.is_none()); + + // Zero millis + let datum = Datum::TimestampNtz( + TimestampNtz::from_millis_nanos(1748662955000, 99988) + .expect("TimestampNtz init failed"), + ); + + let to_string_result = + convert_value_to_string(&datum, &DataType::Timestamp(TimestampType::new(9).unwrap())) + .expect("datum conversion to partition string failed"); + assert_eq!(to_string_result, "2025-05-31-03-42-35_000099988"); + let detect_invalid = TablePath::detect_invalid_name(&to_string_result); + assert!(detect_invalid.is_none()); + + // Zero millis and zero nanos + let datum = Datum::TimestampNtz( + TimestampNtz::from_millis_nanos(1748662955000, 0).expect("TimestampNtz init failed"), + ); + + let to_string_result = + convert_value_to_string(&datum, &DataType::Timestamp(TimestampType::new(9).unwrap())) + .expect("datum conversion to partition string failed"); + assert_eq!(to_string_result, "2025-05-31-03-42-35_"); + let detect_invalid = TablePath::detect_invalid_name(&to_string_result); + assert!(detect_invalid.is_none()); } #[test] @@ -674,5 +730,48 @@ mod tests { assert_eq!(to_string_result, "2025-05-31-03-42-35_428099988"); let detect_invalid = TablePath::detect_invalid_name(&to_string_result); assert!(detect_invalid.is_none()); + + // Zero nanos of millis + let datum = Datum::TimestampLtz( + TimestampLtz::from_millis_nanos(1748662955428, 0).expect("TimestampLtz init failed"), + ); + + let to_string_result = convert_value_to_string( + &datum, + &DataType::TimestampLTz(TimestampLTzType::new(9).unwrap()), + ) + .expect("datum conversion to partition string failed"); + assert_eq!(to_string_result, "2025-05-31-03-42-35_428"); + let detect_invalid = TablePath::detect_invalid_name(&to_string_result); + assert!(detect_invalid.is_none()); + + // Zero millis + let datum = Datum::TimestampLtz( + TimestampLtz::from_millis_nanos(1748662955000, 99988) + .expect("TimestampLtz init failed"), + ); + + let to_string_result = convert_value_to_string( + &datum, + &DataType::TimestampLTz(TimestampLTzType::new(9).unwrap()), + ) + .expect("datum conversion to partition string failed"); + assert_eq!(to_string_result, "2025-05-31-03-42-35_000099988"); + let detect_invalid = TablePath::detect_invalid_name(&to_string_result); + assert!(detect_invalid.is_none()); + + // Zero millis and zero nanos + let datum = Datum::TimestampLtz( + TimestampLtz::from_millis_nanos(1748662955000, 0).expect("TimestampLtz init failed"), + ); + + let to_string_result = convert_value_to_string( + &datum, + &DataType::TimestampLTz(TimestampLTzType::new(9).unwrap()), + ) + .expect("datum conversion to partition string failed"); + assert_eq!(to_string_result, "2025-05-31-03-42-35_"); + let detect_invalid = TablePath::detect_invalid_name(&to_string_result); + assert!(detect_invalid.is_none()); } } From fc5a9c6961c2725e646bd55df2ae94cdd0fd1944 Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Mon, 26 Jan 2026 10:12:32 +0000 Subject: [PATCH 5/9] Add more test cases ensuring parity with Java --- crates/fluss/src/row/datum.rs | 5 +---- crates/fluss/src/util/partition.rs | 4 ++-- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/crates/fluss/src/row/datum.rs b/crates/fluss/src/row/datum.rs index 312b40fe..b42cfec4 100644 --- a/crates/fluss/src/row/datum.rs +++ b/crates/fluss/src/row/datum.rs @@ -846,10 +846,7 @@ impl TimestampLtz { } } - pub fn from_millis_nanos( - epoch_millisecond: i64, - nano_of_millisecond: i32, - ) -> crate::error::Result { + pub fn from_millis_nanos(epoch_millisecond: i64, nano_of_millisecond: i32) -> Result { if !(0..=MAX_NANO_OF_MILLISECOND).contains(&nano_of_millisecond) { return Err(crate::error::Error::IllegalArgument { message: format!( diff --git a/crates/fluss/src/util/partition.rs b/crates/fluss/src/util/partition.rs index bffbffa0..e556b787 100644 --- a/crates/fluss/src/util/partition.rs +++ b/crates/fluss/src/util/partition.rs @@ -266,8 +266,8 @@ fn timestamp_to_string(ts: T) -> String { let millis = ts.get_milli(); let nanos = ts.get_nano_of_milli(); - let millis_of_second = millis % MILLIS_PER_SECOND; - let total_secs = millis / MILLIS_PER_SECOND; + let millis_of_second = millis.rem_euclid(MILLIS_PER_SECOND); + let total_secs = millis.div_euclid(MILLIS_PER_SECOND); let epoch = jiff::Timestamp::UNIX_EPOCH; let ts_jiff = epoch + jiff::Span::new().seconds(total_secs); From 806f647f12ca6a2dfa854cbca0f9c463b0f8a443 Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Mon, 26 Jan 2026 10:16:18 +0000 Subject: [PATCH 6/9] Add test case for negative millis --- crates/fluss/src/util/partition.rs | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/crates/fluss/src/util/partition.rs b/crates/fluss/src/util/partition.rs index e556b787..4fdff40a 100644 --- a/crates/fluss/src/util/partition.rs +++ b/crates/fluss/src/util/partition.rs @@ -713,6 +713,19 @@ mod tests { assert_eq!(to_string_result, "2025-05-31-03-42-35_"); let detect_invalid = TablePath::detect_invalid_name(&to_string_result); assert!(detect_invalid.is_none()); + + // Negative millis + let datum = Datum::TimestampNtz( + TimestampNtz::from_millis_nanos(-1748662955428, 99988) + .expect("TimestampNtz init failed"), + ); + + let to_string_result = + convert_value_to_string(&datum, &DataType::Timestamp(TimestampType::new(9).unwrap())) + .expect("datum conversion to partition string failed"); + assert_eq!(to_string_result, "1914-08-03-20-17-24_572099988"); + let detect_invalid = TablePath::detect_invalid_name(&to_string_result); + assert!(detect_invalid.is_none()); } #[test] @@ -773,5 +786,20 @@ mod tests { assert_eq!(to_string_result, "2025-05-31-03-42-35_"); let detect_invalid = TablePath::detect_invalid_name(&to_string_result); assert!(detect_invalid.is_none()); + + // Negative millis + let datum = Datum::TimestampLtz( + TimestampLtz::from_millis_nanos(-1748662955428, 99988) + .expect("TimestampLtz init failed"), + ); + + let to_string_result = convert_value_to_string( + &datum, + &DataType::TimestampLTz(TimestampLTzType::new(9).unwrap()), + ) + .expect("datum conversion to partition string failed"); + assert_eq!(to_string_result, "1914-08-03-20-17-24_572099988"); + let detect_invalid = TablePath::detect_invalid_name(&to_string_result); + assert!(detect_invalid.is_none()); } } From d8e2c0700ab61aaba40f3062fd31b1a9303761b7 Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Mon, 26 Jan 2026 14:12:43 +0000 Subject: [PATCH 7/9] Removed partition util methods that are not used on client side --- .../src/client/table/partition_getter.rs | 2 +- crates/fluss/src/util/partition.rs | 347 ++---------------- 2 files changed, 38 insertions(+), 311 deletions(-) diff --git a/crates/fluss/src/client/table/partition_getter.rs b/crates/fluss/src/client/table/partition_getter.rs index 6b576f1e..f44153b3 100644 --- a/crates/fluss/src/client/table/partition_getter.rs +++ b/crates/fluss/src/client/table/partition_getter.rs @@ -76,7 +76,7 @@ impl PartitionGetter { message: "Partition value shouldn't be null.".to_string(), }); } - partition_values.push(partition::convert_value_to_string(&value, data_type)?); + partition_values.push(partition::convert_value_of_type(&value, data_type)?); } ResolvedPartitionSpec::new(self.partition_keys.as_slice(), partition_values) diff --git a/crates/fluss/src/util/partition.rs b/crates/fluss/src/util/partition.rs index 4fdff40a..051bb3ee 100644 --- a/crates/fluss/src/util/partition.rs +++ b/crates/fluss/src/util/partition.rs @@ -15,155 +15,12 @@ // specific language governing permissions and limitations // under the License. -//! Utils for partition. - -#![allow(dead_code)] - +/// Utils for partition. use crate::error::Error::IllegalArgument; -use crate::error::{Error, Result}; -use crate::metadata::{DataType, PartitionSpec, ResolvedPartitionSpec, TablePath}; +use crate::error::Result; +use crate::metadata::DataType; use crate::row::{Date, Datum, Time, TimestampLtz, TimestampNtz}; use jiff::ToSpan; -use jiff::Zoned; - -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum AutoPartitionTimeUnit { - Year, - Quarter, - Month, - Day, - Hour, -} - -pub fn validate_partition_spec( - table_path: &TablePath, - partition_keys: &[String], - partition_spec: &PartitionSpec, - is_create: bool, -) -> Result<()> { - let partition_spec_map = partition_spec.get_spec_map(); - if partition_keys.len() != partition_spec_map.len() { - return Err(Error::InvalidPartition { - message: format!( - "PartitionSpec size is not equal to partition keys size for partitioned table {}.", - table_path - ), - }); - } - - let mut reordered_partition_values: Vec<&str> = Vec::with_capacity(partition_keys.len()); - for partition_key in partition_keys { - if let Some(value) = partition_spec_map.get(partition_key) { - reordered_partition_values.push(value); - } else { - return Err(Error::InvalidPartition { - message: format!( - "PartitionSpec {} does not contain partition key '{}' for partitioned table {}.", - partition_spec, partition_key, table_path - ), - }); - } - } - - validate_partition_values(&reordered_partition_values, is_create) -} - -fn validate_partition_values(partition_values: &[&str], is_create: bool) -> Result<()> { - for value in partition_values { - let invalid_name_error = TablePath::detect_invalid_name(value); - let prefix_error = if is_create { - TablePath::validate_prefix(value) - } else { - None - }; - - if invalid_name_error.is_some() || prefix_error.is_some() { - let error_msg = invalid_name_error.unwrap_or_else(|| prefix_error.unwrap()); - return Err(Error::InvalidPartition { - message: format!("The partition value {} is invalid: {}", value, error_msg), - }); - } - } - Ok(()) -} - -/// Generate [`ResolvedPartitionSpec`] for auto partition in server. When we auto creating a -/// partition, we need to first generate a [`ResolvedPartitionSpec`]. -/// -/// The value is the formatted time with the specified time unit. -pub fn generate_auto_partition<'a>( - partition_keys: &'a [String], - current: &Zoned, - offset: i32, - time_unit: AutoPartitionTimeUnit, -) -> Result> { - let auto_partition_field_spec = generate_auto_partition_time(current, offset, time_unit)?; - Ok(ResolvedPartitionSpec::from_partition_name( - partition_keys, - auto_partition_field_spec.as_str(), - )) -} - -pub fn generate_auto_partition_time( - current: &Zoned, - offset: i32, - time_unit: AutoPartitionTimeUnit, -) -> Result { - match time_unit { - AutoPartitionTimeUnit::Year => { - let adjusted = current - .checked_add(jiff::Span::new().years(offset)) - .map_err(|_| IllegalArgument { - message: "Year offset would cause overflow".to_string(), - })?; - Ok(format!("{}", adjusted.year())) - } - AutoPartitionTimeUnit::Quarter => { - let adjusted = current - .checked_add(jiff::Span::new().months(offset * 3)) - .map_err(|_| IllegalArgument { - message: "Quarter offset would cause overflow".to_string(), - })?; - let quarter = (adjusted.month() as i32 - 1) / 3 + 1; - Ok(format!("{}{}", adjusted.year(), quarter)) - } - AutoPartitionTimeUnit::Month => { - let adjusted = current - .checked_add(jiff::Span::new().months(offset)) - .map_err(|_| IllegalArgument { - message: "Month offset would cause overflow".to_string(), - })?; - Ok(format!("{}{:02}", adjusted.year(), adjusted.month())) - } - AutoPartitionTimeUnit::Day => { - let adjusted = current - .checked_add(jiff::Span::new().days(offset)) - .map_err(|_| IllegalArgument { - message: "Day offset would cause overflow".to_string(), - })?; - Ok(format!( - "{}{:02}{:02}", - adjusted.year(), - adjusted.month(), - adjusted.day() - )) - } - AutoPartitionTimeUnit::Hour => { - let adjusted = current - .checked_add(jiff::Span::new().hours(offset)) - .map_err(|_| IllegalArgument { - message: "Hour offset would cause overflow".to_string(), - })?; - Ok(format!( - "{}{:02}{:02}{:02}", - adjusted.year(), - adjusted.month(), - adjusted.day(), - adjusted.hour() - )) - } - } -} fn hex_string(bytes: &[u8]) -> String { let mut hex = String::with_capacity(bytes.len() * 2); @@ -213,7 +70,6 @@ fn date_to_string(date: Date) -> String { day_to_string(date.get_inner()) } -const NANOS_PER_MILLIS: i64 = 1_000_000; const MILLIS_PER_SECOND: i64 = 1_000; const MILLIS_PER_MINUTE: i64 = 60 * MILLIS_PER_SECOND; const MILLIS_PER_HOUR: i64 = 60 * MILLIS_PER_MINUTE; @@ -310,7 +166,7 @@ fn timestamp_to_string(ts: T) -> String { } /// Converts a Datum value to its string representation for partition naming. -pub fn convert_value_to_string(value: &Datum, data_type: &DataType) -> Result { +pub fn convert_value_of_type(value: &Datum, data_type: &DataType) -> Result { match (value, data_type) { (Datum::String(s), DataType::Char(_) | DataType::String(_)) => Ok(s.to_string()), (Datum::Bool(b), DataType::Boolean(_)) => Ok(b.to_string()), @@ -345,138 +201,12 @@ mod tests { use crate::metadata::TablePath; - #[test] - fn test_validate_partition_values() { - // Test invalid character '$' - let result = validate_partition_values(&["$1", "2"], true); - assert!(result.is_err()); - let err_msg = result.unwrap_err().to_string(); - assert!(err_msg.contains("The partition value $1 is invalid")); - assert!(err_msg.contains( - "'$1' contains one or more characters other than ASCII alphanumerics, '_' and '-'" - )); - - // Test invalid character '?' - let result = validate_partition_values(&["?1", "2"], false); - assert!(result.is_err()); - let err_msg = result.unwrap_err().to_string(); - assert!(err_msg.contains("The partition value ?1 is invalid")); - assert!(err_msg.contains( - "'?1' contains one or more characters other than ASCII alphanumerics, '_' and '-'" - )); - - // Test reserved prefix '__' with is_create=true - let result = validate_partition_values(&["__p1", "2"], true); - assert!(result.is_err()); - let err_msg = result.unwrap_err().to_string(); - assert!(err_msg.contains("The partition value __p1 is invalid")); - assert!(err_msg.contains("'__' is not allowed as prefix, since it is reserved for internal databases/internal tables/internal partitions in Fluss server")); - - // Test reserved prefix '__' with is_create=false (should pass) - let result = validate_partition_values(&["__p1", "2"], false); - assert!(result.is_ok()); - - // Test validate_partition_spec with mismatched size - let table_path = TablePath::new("test_db".to_string(), "test_table".to_string()); - let partition_keys = vec!["b".to_string()]; - let partition_spec = PartitionSpec::new(std::collections::HashMap::new()); - let result = validate_partition_spec(&table_path, &partition_keys, &partition_spec, true); - assert!(result.is_err()); - let err_msg = result.unwrap_err().to_string(); - assert!(err_msg.contains("PartitionSpec size is not equal to partition keys size for partitioned table test_db.test_table")); - } - - #[test] - fn test_generate_auto_partition_name() { - use jiff::civil::date; - use jiff::tz::TimeZone; - - // LocalDateTime of 2024-11-11 11:11 with UTC-8 timezone - let tz = TimeZone::get("Etc/GMT+8").expect("timezone"); - let zoned = date(2024, 11, 11) - .at(11, 11, 0, 0) - .to_zoned(tz) - .expect("Zoned datetime creation failed"); - - // for year - test_generate_auto_partition_name_for( - &zoned, - AutoPartitionTimeUnit::Year, - &[-1, 0, 1, 2, 3], - &["2023", "2024", "2025", "2026", "2027"], - ); - - // for quarter - test_generate_auto_partition_name_for( - &zoned, - AutoPartitionTimeUnit::Quarter, - &[-1, 0, 1, 2, 3], - &["20243", "20244", "20251", "20252", "20253"], - ); - - // for month - test_generate_auto_partition_name_for( - &zoned, - AutoPartitionTimeUnit::Month, - &[-1, 0, 1, 2, 3], - &["202410", "202411", "202412", "202501", "202502"], - ); - - // for day - test_generate_auto_partition_name_for( - &zoned, - AutoPartitionTimeUnit::Day, - &[-1, 0, 1, 2, 3, 20], - &[ - "20241110", "20241111", "20241112", "20241113", "20241114", "20241201", - ], - ); - - // for hour - test_generate_auto_partition_name_for( - &zoned, - AutoPartitionTimeUnit::Hour, - &[-2, -1, 0, 1, 2, 3, 13], - &[ - "2024111109", - "2024111110", - "2024111111", - "2024111112", - "2024111113", - "2024111114", - "2024111200", - ], - ); - } - - fn test_generate_auto_partition_name_for( - zoned: &Zoned, - time_unit: AutoPartitionTimeUnit, - offsets: &[i32], - expected: &[&str], - ) { - for (i, offset) in offsets.iter().enumerate() { - let partition_keys = vec!["dt".to_string()]; - let resolved_partition_spec = - generate_auto_partition(partition_keys.as_slice(), zoned, *offset, time_unit) - .expect("partition generation failed"); - assert_eq!( - resolved_partition_spec.get_partition_name(), - expected[i], - "{:?} offset {} failed", - time_unit, - offset - ); - } - } - #[test] fn test_string() { let datum = Datum::String(Cow::Borrowed("Fluss")); - let to_string_result = - convert_value_to_string(&datum, &DataType::String(StringType::new())) - .expect("datum conversion to partition string failed"); + let to_string_result = convert_value_of_type(&datum, &DataType::String(StringType::new())) + .expect("datum conversion to partition string failed"); assert_eq!(to_string_result, "Fluss"); let detect_invalid = TablePath::detect_invalid_name(&to_string_result); assert!(detect_invalid.is_none()); @@ -486,7 +216,7 @@ mod tests { fn test_char() { let datum = Datum::String(Cow::Borrowed("F")); - let to_string_result = convert_value_to_string(&datum, &DataType::Char(CharType::new(1))) + let to_string_result = convert_value_of_type(&datum, &DataType::Char(CharType::new(1))) .expect("datum conversion to partition string failed"); assert_eq!(to_string_result, "F"); let detect_invalid = TablePath::detect_invalid_name(&to_string_result); @@ -498,7 +228,7 @@ mod tests { let datum = Datum::Bool(true); let to_string_result = - convert_value_to_string(&datum, &DataType::Boolean(BooleanType::new())) + convert_value_of_type(&datum, &DataType::Boolean(BooleanType::new())) .expect("datum conversion to partition string failed"); assert_eq!(to_string_result, "true"); let detect_invalid = TablePath::detect_invalid_name(&to_string_result); @@ -509,7 +239,7 @@ mod tests { fn test_byte() { let datum = Datum::Blob(Cow::Borrowed(&[0x10, 0x20, 0x30, 0x40, 0x50, 0xFF])); - let to_string_result = convert_value_to_string(&datum, &DataType::Bytes(BytesType::new())) + let to_string_result = convert_value_of_type(&datum, &DataType::Bytes(BytesType::new())) .expect("datum conversion to partition string failed"); assert_eq!(to_string_result, "1020304050ff"); let detect_invalid = TablePath::detect_invalid_name(&to_string_result); @@ -520,9 +250,8 @@ mod tests { fn test_binary() { let datum = Datum::Blob(Cow::Borrowed(&[0x10, 0x20, 0x30, 0x40, 0x50, 0xFF])); - let to_string_result = - convert_value_to_string(&datum, &DataType::Binary(BinaryType::new(6))) - .expect("datum conversion to partition string failed"); + let to_string_result = convert_value_of_type(&datum, &DataType::Binary(BinaryType::new(6))) + .expect("datum conversion to partition string failed"); assert_eq!(to_string_result, "1020304050ff"); let detect_invalid = TablePath::detect_invalid_name(&to_string_result); assert!(detect_invalid.is_none()); @@ -533,7 +262,7 @@ mod tests { let datum = Datum::Int8(100); let to_string_result = - convert_value_to_string(&datum, &DataType::TinyInt(TinyIntType::new())) + convert_value_of_type(&datum, &DataType::TinyInt(TinyIntType::new())) .expect("datum conversion to partition string failed"); assert_eq!(to_string_result, "100"); let detect_invalid = TablePath::detect_invalid_name(&to_string_result); @@ -545,7 +274,7 @@ mod tests { let datum = Datum::Int16(-32760); let to_string_result = - convert_value_to_string(&datum, &DataType::SmallInt(SmallIntType::new())) + convert_value_of_type(&datum, &DataType::SmallInt(SmallIntType::new())) .expect("datum conversion to partition string failed"); assert_eq!(to_string_result, "-32760"); let detect_invalid = TablePath::detect_invalid_name(&to_string_result); @@ -556,7 +285,7 @@ mod tests { fn test_int() { let datum = Datum::Int32(299000); - let to_string_result = convert_value_to_string(&datum, &DataType::Int(IntType::new())) + let to_string_result = convert_value_of_type(&datum, &DataType::Int(IntType::new())) .expect("datum conversion to partition string failed"); assert_eq!(to_string_result, "299000"); let detect_invalid = TablePath::detect_invalid_name(&to_string_result); @@ -567,9 +296,8 @@ mod tests { fn test_big_int() { let datum = Datum::Int64(1748662955428); - let to_string_result = - convert_value_to_string(&datum, &DataType::BigInt(BigIntType::new())) - .expect("datum conversion to partition string failed"); + let to_string_result = convert_value_of_type(&datum, &DataType::BigInt(BigIntType::new())) + .expect("datum conversion to partition string failed"); assert_eq!(to_string_result, "1748662955428"); let detect_invalid = TablePath::detect_invalid_name(&to_string_result); assert!(detect_invalid.is_none()); @@ -579,7 +307,7 @@ mod tests { fn test_date() { let datum = Datum::Date(Date::new(20235)); - let to_string_result = convert_value_to_string(&datum, &DataType::Date(DateType::new())) + let to_string_result = convert_value_of_type(&datum, &DataType::Date(DateType::new())) .expect("datum conversion to partition string failed"); assert_eq!(to_string_result, "2025-05-27"); let detect_invalid = TablePath::detect_invalid_name(&to_string_result); @@ -591,7 +319,7 @@ mod tests { let datum = Datum::Time(Time::new(5402199)); let to_string_result = - convert_value_to_string(&datum, &DataType::Time(TimeType::new(3).unwrap())) + convert_value_of_type(&datum, &DataType::Time(TimeType::new(3).unwrap())) .expect("datum conversion to partition string failed"); assert_eq!(to_string_result, "01-30-02_199"); let detect_invalid = TablePath::detect_invalid_name(&to_string_result); @@ -602,7 +330,7 @@ mod tests { fn test_float() { let datum = Datum::Float32(5.73.into()); - let to_string_result = convert_value_to_string(&datum, &DataType::Float(FloatType::new())) + let to_string_result = convert_value_of_type(&datum, &DataType::Float(FloatType::new())) .expect("datum conversion to partition string failed"); assert_eq!(to_string_result, "5_73"); let detect_invalid = TablePath::detect_invalid_name(&to_string_result); @@ -610,21 +338,21 @@ mod tests { let datum = Datum::Float32(f32::NAN.into()); assert_eq!( - convert_value_to_string(&datum, &DataType::Float(FloatType::new())) + convert_value_of_type(&datum, &DataType::Float(FloatType::new())) .expect("datum conversion to partition string failed"), "NaN" ); let datum = Datum::Float32(f32::INFINITY.into()); assert_eq!( - convert_value_to_string(&datum, &DataType::Float(FloatType::new())) + convert_value_of_type(&datum, &DataType::Float(FloatType::new())) .expect("datum conversion to partition string failed"), "Inf" ); let datum = Datum::Float32(f32::NEG_INFINITY.into()); assert_eq!( - convert_value_to_string(&datum, &DataType::Float(FloatType::new())) + convert_value_of_type(&datum, &DataType::Float(FloatType::new())) .expect("datum conversion to partition string failed"), "-Inf" ); @@ -634,30 +362,29 @@ mod tests { fn test_double() { let datum = Datum::Float64(5.73737.into()); - let to_string_result = - convert_value_to_string(&datum, &DataType::Double(DoubleType::new())) - .expect("datum conversion to partition string failed"); + let to_string_result = convert_value_of_type(&datum, &DataType::Double(DoubleType::new())) + .expect("datum conversion to partition string failed"); assert_eq!(to_string_result, "5_73737"); let detect_invalid = TablePath::detect_invalid_name(&to_string_result); assert!(detect_invalid.is_none()); let datum = Datum::Float64(f64::NAN.into()); assert_eq!( - convert_value_to_string(&datum, &DataType::Double(DoubleType::new())) + convert_value_of_type(&datum, &DataType::Double(DoubleType::new())) .expect("datum conversion to partition string failed"), "NaN" ); let datum = Datum::Float64(f64::INFINITY.into()); assert_eq!( - convert_value_to_string(&datum, &DataType::Double(DoubleType::new())) + convert_value_of_type(&datum, &DataType::Double(DoubleType::new())) .expect("datum conversion to partition string failed"), "Inf" ); let datum = Datum::Float64(f64::NEG_INFINITY.into()); assert_eq!( - convert_value_to_string(&datum, &DataType::Double(DoubleType::new())) + convert_value_of_type(&datum, &DataType::Double(DoubleType::new())) .expect("datum conversion to partition string failed"), "-Inf" ); @@ -671,7 +398,7 @@ mod tests { ); let to_string_result = - convert_value_to_string(&datum, &DataType::Timestamp(TimestampType::new(9).unwrap())) + convert_value_of_type(&datum, &DataType::Timestamp(TimestampType::new(9).unwrap())) .expect("datum conversion to partition string failed"); assert_eq!(to_string_result, "2025-05-31-03-42-35_428099988"); let detect_invalid = TablePath::detect_invalid_name(&to_string_result); @@ -683,7 +410,7 @@ mod tests { ); let to_string_result = - convert_value_to_string(&datum, &DataType::Timestamp(TimestampType::new(9).unwrap())) + convert_value_of_type(&datum, &DataType::Timestamp(TimestampType::new(9).unwrap())) .expect("datum conversion to partition string failed"); assert_eq!(to_string_result, "2025-05-31-03-42-35_428"); let detect_invalid = TablePath::detect_invalid_name(&to_string_result); @@ -696,7 +423,7 @@ mod tests { ); let to_string_result = - convert_value_to_string(&datum, &DataType::Timestamp(TimestampType::new(9).unwrap())) + convert_value_of_type(&datum, &DataType::Timestamp(TimestampType::new(9).unwrap())) .expect("datum conversion to partition string failed"); assert_eq!(to_string_result, "2025-05-31-03-42-35_000099988"); let detect_invalid = TablePath::detect_invalid_name(&to_string_result); @@ -708,7 +435,7 @@ mod tests { ); let to_string_result = - convert_value_to_string(&datum, &DataType::Timestamp(TimestampType::new(9).unwrap())) + convert_value_of_type(&datum, &DataType::Timestamp(TimestampType::new(9).unwrap())) .expect("datum conversion to partition string failed"); assert_eq!(to_string_result, "2025-05-31-03-42-35_"); let detect_invalid = TablePath::detect_invalid_name(&to_string_result); @@ -721,7 +448,7 @@ mod tests { ); let to_string_result = - convert_value_to_string(&datum, &DataType::Timestamp(TimestampType::new(9).unwrap())) + convert_value_of_type(&datum, &DataType::Timestamp(TimestampType::new(9).unwrap())) .expect("datum conversion to partition string failed"); assert_eq!(to_string_result, "1914-08-03-20-17-24_572099988"); let detect_invalid = TablePath::detect_invalid_name(&to_string_result); @@ -735,7 +462,7 @@ mod tests { .expect("TimestampLtz init failed"), ); - let to_string_result = convert_value_to_string( + let to_string_result = convert_value_of_type( &datum, &DataType::TimestampLTz(TimestampLTzType::new(9).unwrap()), ) @@ -749,7 +476,7 @@ mod tests { TimestampLtz::from_millis_nanos(1748662955428, 0).expect("TimestampLtz init failed"), ); - let to_string_result = convert_value_to_string( + let to_string_result = convert_value_of_type( &datum, &DataType::TimestampLTz(TimestampLTzType::new(9).unwrap()), ) @@ -764,7 +491,7 @@ mod tests { .expect("TimestampLtz init failed"), ); - let to_string_result = convert_value_to_string( + let to_string_result = convert_value_of_type( &datum, &DataType::TimestampLTz(TimestampLTzType::new(9).unwrap()), ) @@ -778,7 +505,7 @@ mod tests { TimestampLtz::from_millis_nanos(1748662955000, 0).expect("TimestampLtz init failed"), ); - let to_string_result = convert_value_to_string( + let to_string_result = convert_value_of_type( &datum, &DataType::TimestampLTz(TimestampLTzType::new(9).unwrap()), ) @@ -793,7 +520,7 @@ mod tests { .expect("TimestampLtz init failed"), ); - let to_string_result = convert_value_to_string( + let to_string_result = convert_value_of_type( &datum, &DataType::TimestampLTz(TimestampLTzType::new(9).unwrap()), ) From fd6120e293ac7362a296dc6f00f4c5feca389eae Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Mon, 26 Jan 2026 14:28:08 +0000 Subject: [PATCH 8/9] Use clone to simplify ParitionInfo / ResolvedPartitionSpec ownership and lifetimes. --- crates/fluss/src/client/admin.rs | 7 +- .../src/client/table/partition_getter.rs | 4 +- crates/fluss/src/metadata/partition.rs | 71 +++++++++---------- .../src/rpc/message/list_partition_infos.rs | 2 +- 4 files changed, 39 insertions(+), 45 deletions(-) diff --git a/crates/fluss/src/client/admin.rs b/crates/fluss/src/client/admin.rs index 448e03cc..bffe0f51 100644 --- a/crates/fluss/src/client/admin.rs +++ b/crates/fluss/src/client/admin.rs @@ -140,10 +140,7 @@ impl FlussAdmin { } /// List all partitions in the given table. - pub async fn list_partition_infos( - &self, - table_path: &TablePath, - ) -> Result>> { + pub async fn list_partition_infos(&self, table_path: &TablePath) -> Result> { self.list_partition_infos_with_spec(table_path, None).await } @@ -152,7 +149,7 @@ impl FlussAdmin { &self, table_path: &TablePath, partial_partition_spec: Option<&PartitionSpec>, - ) -> Result>> { + ) -> Result> { let response = self .admin_gateway .request(ListPartitionInfosRequest::new( diff --git a/crates/fluss/src/client/table/partition_getter.rs b/crates/fluss/src/client/table/partition_getter.rs index f44153b3..f355d8d2 100644 --- a/crates/fluss/src/client/table/partition_getter.rs +++ b/crates/fluss/src/client/table/partition_getter.rs @@ -66,7 +66,7 @@ impl PartitionGetter { .map(|ps| ps.get_partition_name()) } - pub fn get_partition_spec(&self, row: &dyn InternalRow) -> Result> { + pub fn get_partition_spec(&self, row: &dyn InternalRow) -> Result { let mut partition_values = Vec::with_capacity(self.partitions.len()); for (data_type, field_getter) in &self.partitions { @@ -79,7 +79,7 @@ impl PartitionGetter { partition_values.push(partition::convert_value_of_type(&value, data_type)?); } - ResolvedPartitionSpec::new(self.partition_keys.as_slice(), partition_values) + ResolvedPartitionSpec::new(self.partition_keys.clone(), partition_values) } } diff --git a/crates/fluss/src/metadata/partition.rs b/crates/fluss/src/metadata/partition.rs index 8d9d3f40..b33a9442 100644 --- a/crates/fluss/src/metadata/partition.rs +++ b/crates/fluss/src/metadata/partition.rs @@ -18,7 +18,6 @@ use crate::error::{Error, Result}; use crate::proto::{PbKeyValue, PbPartitionInfo, PbPartitionSpec}; use crate::{PartitionId, TableId}; -use std::borrow::Cow; use std::collections::HashMap; use std::fmt::{Display, Formatter}; @@ -72,15 +71,15 @@ impl Display for PartitionSpec { /// spec is re-arranged into the correct order by comparing it with a list of strictly ordered /// partition keys. #[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct ResolvedPartitionSpec<'a> { - partition_keys: Cow<'a, [String]>, +pub struct ResolvedPartitionSpec { + partition_keys: Vec, partition_values: Vec, } pub const PARTITION_SPEC_SEPARATOR: &str = "$"; -impl<'a> ResolvedPartitionSpec<'a> { - pub fn new(partition_keys: &'a [String], partition_values: Vec) -> Result { +impl ResolvedPartitionSpec { + pub fn new(partition_keys: Vec, partition_values: Vec) -> Result { if partition_keys.len() != partition_values.len() { return Err(Error::IllegalArgument { message: "The number of partition keys and partition values should be the same." @@ -89,29 +88,30 @@ impl<'a> ResolvedPartitionSpec<'a> { } Ok(Self { - partition_keys: Cow::Borrowed(partition_keys), + partition_keys, partition_values, }) } pub fn from_partition_spec( - partition_keys: &'a [String], + partition_keys: Vec, partition_spec: &PartitionSpec, ) -> Self { - let partition_values = Self::get_reordered_partition_values(partition_keys, partition_spec); + let partition_values = + Self::get_reordered_partition_values(partition_keys.as_slice(), partition_spec); Self { - partition_keys: Cow::Borrowed(partition_keys), + partition_keys, partition_values, } } - pub fn from_partition_name(partition_keys: &'a [String], partition_name: &str) -> Self { + pub fn from_partition_name(partition_keys: Vec, partition_name: &str) -> Self { let partition_values: Vec = partition_name .split(PARTITION_SPEC_SEPARATOR) .map(|s| s.to_string()) .collect(); Self { - partition_keys: Cow::Borrowed(partition_keys), + partition_keys, partition_values, } } @@ -135,7 +135,7 @@ impl<'a> ResolvedPartitionSpec<'a> { } Ok(Self { - partition_keys: Cow::Owned(keys), + partition_keys: keys, partition_values: values, }) } @@ -234,7 +234,7 @@ impl<'a> ResolvedPartitionSpec<'a> { .collect(); Self { - partition_keys: Cow::Owned(partition_keys), + partition_keys, partition_values, } } @@ -251,7 +251,7 @@ impl<'a> ResolvedPartitionSpec<'a> { } } -impl<'a> Display for ResolvedPartitionSpec<'a> { +impl Display for ResolvedPartitionSpec { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(f, "{}", self.get_partition_qualified_name()) } @@ -260,13 +260,13 @@ impl<'a> Display for ResolvedPartitionSpec<'a> { /// Information of a partition metadata, includes the partition's name and the partition id that /// represents the unique identifier of the partition. #[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct PartitionInfo<'a> { +pub struct PartitionInfo { partition_id: PartitionId, - partition_spec: ResolvedPartitionSpec<'a>, + partition_spec: ResolvedPartitionSpec, } -impl<'a> PartitionInfo<'a> { - pub fn new(partition_id: PartitionId, partition_spec: ResolvedPartitionSpec<'a>) -> Self { +impl PartitionInfo { + pub fn new(partition_id: PartitionId, partition_spec: ResolvedPartitionSpec) -> Self { Self { partition_id, partition_spec, @@ -283,7 +283,7 @@ impl<'a> PartitionInfo<'a> { self.partition_spec.get_partition_name() } - pub fn get_resolved_partition_spec(&self) -> &ResolvedPartitionSpec<'_> { + pub fn get_resolved_partition_spec(&self) -> &ResolvedPartitionSpec { &self.partition_spec } @@ -306,8 +306,8 @@ impl<'a> PartitionInfo<'a> { } } -impl<'a> Display for PartitionInfo<'a> { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { +impl Display for PartitionInfo { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { write!( f, "Partition{{name='{}', id={}}}", @@ -357,9 +357,8 @@ mod tests { #[test] fn test_resolved_partition_spec_name() { - let partition_keys = vec!["date".to_string(), "region".to_string()]; let spec = ResolvedPartitionSpec::new( - partition_keys.as_slice(), + vec!["date".to_string(), "region".to_string()], vec!["2024-01-15".to_string(), "US".to_string()], ) .unwrap(); @@ -373,9 +372,10 @@ mod tests { #[test] fn test_resolved_partition_spec_from_partition_name() { - let partition_keys = vec!["date".to_string(), "region".to_string()]; - let spec = - ResolvedPartitionSpec::from_partition_name(partition_keys.as_slice(), "2024-01-15$US"); + let spec = ResolvedPartitionSpec::from_partition_name( + vec!["date".to_string(), "region".to_string()], + "2024-01-15$US", + ); assert_eq!(spec.get_partition_values(), &["2024-01-15", "US"]); } @@ -392,18 +392,18 @@ mod tests { #[test] fn test_resolved_partition_spec_mismatched_lengths() { - let partition_keys = vec!["date".to_string(), "region".to_string()]; - let result = - ResolvedPartitionSpec::new(partition_keys.as_slice(), vec!["2024-01-15".to_string()]); + let result = ResolvedPartitionSpec::new( + vec!["date".to_string(), "region".to_string()], + vec!["2024-01-15".to_string()], + ); assert!(result.is_err()); } #[test] fn test_partition_info() { - let partition_keys = vec!["date".to_string()]; let spec = - ResolvedPartitionSpec::new(partition_keys.as_slice(), vec!["2024-01-15".to_string()]) + ResolvedPartitionSpec::new(vec!["date".to_string()], vec!["2024-01-15".to_string()]) .unwrap(); let info = PartitionInfo::new(42, spec); @@ -435,9 +435,8 @@ mod tests { #[test] fn test_partition_info_pb_roundtrip() { - let partition_keys = vec!["date".to_string()]; let spec = - ResolvedPartitionSpec::new(partition_keys.as_slice(), vec!["2024-01-15".to_string()]) + ResolvedPartitionSpec::new(vec!["date".to_string()], vec!["2024-01-15".to_string()]) .unwrap(); let info = PartitionInfo::new(42, spec); @@ -450,16 +449,14 @@ mod tests { #[test] fn test_contains() { - let partition_keys = vec!["date".to_string(), "region".to_string()]; let full_spec = ResolvedPartitionSpec::new( - partition_keys.as_slice(), + vec!["date".to_string(), "region".to_string()], vec!["2024-01-15".to_string(), "US".to_string()], ) .unwrap(); - let partition_keys = vec!["date".to_string()]; let partial_spec = - ResolvedPartitionSpec::new(partition_keys.as_slice(), vec!["2024-01-15".to_string()]) + ResolvedPartitionSpec::new(vec!["date".to_string()], vec!["2024-01-15".to_string()]) .unwrap(); assert!(full_spec.contains(&partial_spec).unwrap()); diff --git a/crates/fluss/src/rpc/message/list_partition_infos.rs b/crates/fluss/src/rpc/message/list_partition_infos.rs index 86ce5c8e..ab693671 100644 --- a/crates/fluss/src/rpc/message/list_partition_infos.rs +++ b/crates/fluss/src/rpc/message/list_partition_infos.rs @@ -54,7 +54,7 @@ impl_write_version_type!(ListPartitionInfosRequest); impl_read_version_type!(ListPartitionInfosResponse); impl ListPartitionInfosResponse { - pub fn get_partitions_info(&self) -> Vec> { + pub fn get_partitions_info(&self) -> Vec { self.partitions_info .iter() .map(PartitionInfo::from_pb) From e3c13989dfd4ea6b99e24d4bad96fbe14b213a22 Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Tue, 27 Jan 2026 09:39:16 +0000 Subject: [PATCH 9/9] Address comments --- .../src/client/table/partition_getter.rs | 35 ++++++++------ crates/fluss/src/metadata/partition.rs | 47 +++++++++++-------- crates/fluss/src/util/partition.rs | 4 +- 3 files changed, 49 insertions(+), 37 deletions(-) diff --git a/crates/fluss/src/client/table/partition_getter.rs b/crates/fluss/src/client/table/partition_getter.rs index f355d8d2..887c0a4f 100644 --- a/crates/fluss/src/client/table/partition_getter.rs +++ b/crates/fluss/src/client/table/partition_getter.rs @@ -21,20 +21,21 @@ use crate::metadata::{DataType, ResolvedPartitionSpec, RowType}; use crate::row::InternalRow; use crate::row::field_getter::FieldGetter; use crate::util::partition; +use std::sync::Arc; /// A getter to get partition name from a row. #[allow(dead_code)] pub struct PartitionGetter { - partition_keys: Vec, + partition_keys: Arc<[String]>, partitions: Vec<(DataType, FieldGetter)>, } #[allow(dead_code)] impl PartitionGetter { - pub fn new(row_type: &RowType, partition_keys: Vec) -> Result { + pub fn new(row_type: &RowType, partition_keys: Arc<[String]>) -> Result { let mut partitions = Vec::with_capacity(partition_keys.len()); - for partition_key in &partition_keys { + for partition_key in partition_keys.iter() { if let Some(partition_col_index) = row_type.get_field_index(partition_key.as_str()) { let data_type = row_type .fields() @@ -79,7 +80,7 @@ impl PartitionGetter { partition_values.push(partition::convert_value_of_type(&value, data_type)?); } - ResolvedPartitionSpec::new(self.partition_keys.clone(), partition_values) + ResolvedPartitionSpec::new(Arc::clone(&self.partition_keys), partition_values) } } @@ -100,8 +101,8 @@ mod tests { ), ]); - let getter = - PartitionGetter::new(&row_type, vec!["region".to_string()]).expect("should succeed"); + let getter = PartitionGetter::new(&row_type, Arc::from(["region".to_string()])) + .expect("should succeed"); let row = GenericRow::from_data(vec![Datum::Int32(42), Datum::from("US")]); let partition_name = getter.get_partition(&row).expect("should succeed"); @@ -124,9 +125,11 @@ mod tests { ), ]); - let getter = - PartitionGetter::new(&row_type, vec!["date".to_string(), "region".to_string()]) - .expect("should succeed"); + let getter = PartitionGetter::new( + &row_type, + Arc::from(["date".to_string(), "region".to_string()]), + ) + .expect("should succeed"); let row = GenericRow::from_data(vec![ Datum::Int32(42), @@ -145,7 +148,7 @@ mod tests { None, )]); - let result = PartitionGetter::new(&row_type, vec!["nonexistent".to_string()]); + let result = PartitionGetter::new(&row_type, Arc::from(["nonexistent".to_string()])); assert!(result.is_err()); } @@ -160,8 +163,8 @@ mod tests { ), ]); - let getter = - PartitionGetter::new(&row_type, vec!["region".to_string()]).expect("should succeed"); + let getter = PartitionGetter::new(&row_type, Arc::from(["region".to_string()])) + .expect("should succeed"); let row = GenericRow::from_data(vec![Datum::Int32(42), Datum::Null]); let result = getter.get_partition(&row); @@ -184,9 +187,11 @@ mod tests { ), ]); - let getter = - PartitionGetter::new(&row_type, vec!["date".to_string(), "region".to_string()]) - .expect("should succeed"); + let getter = PartitionGetter::new( + &row_type, + Arc::from(["date".to_string(), "region".to_string()]), + ) + .expect("should succeed"); let row = GenericRow::from_data(vec![ Datum::Int32(42), diff --git a/crates/fluss/src/metadata/partition.rs b/crates/fluss/src/metadata/partition.rs index b33a9442..59133cc2 100644 --- a/crates/fluss/src/metadata/partition.rs +++ b/crates/fluss/src/metadata/partition.rs @@ -20,6 +20,7 @@ use crate::proto::{PbKeyValue, PbPartitionInfo, PbPartitionSpec}; use crate::{PartitionId, TableId}; use std::collections::HashMap; use std::fmt::{Display, Formatter}; +use std::sync::Arc; /// Represents a partition spec in fluss. Partition columns and values are NOT of strict order, and /// they need to be re-arranged to the correct order by comparing with a list of strictly ordered @@ -72,14 +73,14 @@ impl Display for PartitionSpec { /// partition keys. #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct ResolvedPartitionSpec { - partition_keys: Vec, + partition_keys: Arc<[String]>, partition_values: Vec, } pub const PARTITION_SPEC_SEPARATOR: &str = "$"; impl ResolvedPartitionSpec { - pub fn new(partition_keys: Vec, partition_values: Vec) -> Result { + pub fn new(partition_keys: Arc<[String]>, partition_values: Vec) -> Result { if partition_keys.len() != partition_values.len() { return Err(Error::IllegalArgument { message: "The number of partition keys and partition values should be the same." @@ -94,18 +95,18 @@ impl ResolvedPartitionSpec { } pub fn from_partition_spec( - partition_keys: Vec, + partition_keys: Arc<[String]>, partition_spec: &PartitionSpec, ) -> Self { let partition_values = - Self::get_reordered_partition_values(partition_keys.as_slice(), partition_spec); + Self::get_reordered_partition_values(&partition_keys, partition_spec); Self { partition_keys, partition_values, } } - pub fn from_partition_name(partition_keys: Vec, partition_name: &str) -> Self { + pub fn from_partition_name(partition_keys: Arc<[String]>, partition_name: &str) -> Self { let partition_values: Vec = partition_name .split(PARTITION_SPEC_SEPARATOR) .map(|s| s.to_string()) @@ -135,7 +136,7 @@ impl ResolvedPartitionSpec { } Ok(Self { - partition_keys: keys, + partition_keys: Arc::from(keys), partition_values: values, }) } @@ -240,7 +241,7 @@ impl ResolvedPartitionSpec { } fn get_reordered_partition_values( - partition_keys: &[String], + partition_keys: &Arc<[String]>, partition_spec: &PartitionSpec, ) -> Vec { let partition_spec_map = partition_spec.get_spec_map(); @@ -358,7 +359,7 @@ mod tests { #[test] fn test_resolved_partition_spec_name() { let spec = ResolvedPartitionSpec::new( - vec!["date".to_string(), "region".to_string()], + Arc::from(["date".to_string(), "region".to_string()]), vec!["2024-01-15".to_string(), "US".to_string()], ) .unwrap(); @@ -373,7 +374,7 @@ mod tests { #[test] fn test_resolved_partition_spec_from_partition_name() { let spec = ResolvedPartitionSpec::from_partition_name( - vec!["date".to_string(), "region".to_string()], + Arc::from(["date".to_string(), "region".to_string()]), "2024-01-15$US", ); @@ -393,7 +394,7 @@ mod tests { #[test] fn test_resolved_partition_spec_mismatched_lengths() { let result = ResolvedPartitionSpec::new( - vec!["date".to_string(), "region".to_string()], + Arc::from(["date".to_string(), "region".to_string()]), vec!["2024-01-15".to_string()], ); @@ -402,9 +403,11 @@ mod tests { #[test] fn test_partition_info() { - let spec = - ResolvedPartitionSpec::new(vec!["date".to_string()], vec!["2024-01-15".to_string()]) - .unwrap(); + let spec = ResolvedPartitionSpec::new( + Arc::from(["date".to_string()]), + vec!["2024-01-15".to_string()], + ) + .unwrap(); let info = PartitionInfo::new(42, spec); assert_eq!(info.get_partition_id(), 42); @@ -435,9 +438,11 @@ mod tests { #[test] fn test_partition_info_pb_roundtrip() { - let spec = - ResolvedPartitionSpec::new(vec!["date".to_string()], vec!["2024-01-15".to_string()]) - .unwrap(); + let spec = ResolvedPartitionSpec::new( + Arc::from(["date".to_string()]), + vec!["2024-01-15".to_string()], + ) + .unwrap(); let info = PartitionInfo::new(42, spec); let pb = info.to_pb(); @@ -450,14 +455,16 @@ mod tests { #[test] fn test_contains() { let full_spec = ResolvedPartitionSpec::new( - vec!["date".to_string(), "region".to_string()], + Arc::from(["date".to_string(), "region".to_string()]), vec!["2024-01-15".to_string(), "US".to_string()], ) .unwrap(); - let partial_spec = - ResolvedPartitionSpec::new(vec!["date".to_string()], vec!["2024-01-15".to_string()]) - .unwrap(); + let partial_spec = ResolvedPartitionSpec::new( + Arc::from(["date".to_string()]), + vec!["2024-01-15".to_string()], + ) + .unwrap(); assert!(full_spec.contains(&partial_spec).unwrap()); } diff --git a/crates/fluss/src/util/partition.rs b/crates/fluss/src/util/partition.rs index 051bb3ee..036cac46 100644 --- a/crates/fluss/src/util/partition.rs +++ b/crates/fluss/src/util/partition.rs @@ -21,12 +21,12 @@ use crate::error::Result; use crate::metadata::DataType; use crate::row::{Date, Datum, Time, TimestampLtz, TimestampNtz}; use jiff::ToSpan; +use std::fmt::Write; fn hex_string(bytes: &[u8]) -> String { let mut hex = String::with_capacity(bytes.len() * 2); for &b in bytes { - let h = format!("{:02x}", b); - hex.push_str(&h); + write!(hex, "{:02x}", b).unwrap(); } hex }