diff --git a/crates/fluss/src/client/table/partition_getter.rs b/crates/fluss/src/client/table/partition_getter.rs index 4529d86..887c0a4 100644 --- a/crates/fluss/src/client/table/partition_getter.rs +++ b/crates/fluss/src/client/table/partition_getter.rs @@ -17,40 +17,191 @@ use crate::error::Error::IllegalArgument; use crate::error::Result; -use crate::metadata::{DataType, RowType}; +use crate::metadata::{DataType, ResolvedPartitionSpec, RowType}; +use crate::row::InternalRow; use crate::row::field_getter::FieldGetter; +use crate::util::partition; +use std::sync::Arc; +/// A getter to get partition name from a row. #[allow(dead_code)] -pub struct PartitionGetter<'a> { - partitions: Vec<(&'a String, &'a DataType, FieldGetter)>, +pub struct PartitionGetter { + partition_keys: Arc<[String]>, + partitions: Vec<(DataType, FieldGetter)>, } #[allow(dead_code)] -impl<'a> PartitionGetter<'a> { - pub fn new(row_type: &'a RowType, partition_keys: &'a Vec) -> Result { +impl PartitionGetter { + pub fn new(row_type: &RowType, partition_keys: Arc<[String]>) -> Result { let mut partitions = Vec::with_capacity(partition_keys.len()); - for partition_key in partition_keys { + for partition_key in partition_keys.iter() { if let Some(partition_col_index) = row_type.get_field_index(partition_key.as_str()) { - let data_type = &row_type + let data_type = row_type .fields() .get(partition_col_index) .unwrap() - .data_type; - let field_getter = FieldGetter::create(data_type, partition_col_index); + .data_type + .clone(); + let field_getter = FieldGetter::create(&data_type, partition_col_index); - partitions.push((partition_key, data_type, field_getter)); + partitions.push((data_type, field_getter)); } else { return Err(IllegalArgument { message: format!( - "The partition column {partition_key} is not in the row {row_type}." + "The partition column {} is not in the row {}.", + partition_key, row_type ), }); }; } - Ok(Self { partitions }) + Ok(Self { + partition_keys, + partitions, + }) } - // TODO Implement get partition + pub fn get_partition(&self, row: &dyn InternalRow) -> Result { + self.get_partition_spec(row) + .map(|ps| ps.get_partition_name()) + } + + pub fn get_partition_spec(&self, row: &dyn InternalRow) -> Result { + let mut partition_values = Vec::with_capacity(self.partitions.len()); + + for (data_type, field_getter) in &self.partitions { + let value = field_getter.get_field(row); + if value.is_null() { + return Err(IllegalArgument { + message: "Partition value shouldn't be null.".to_string(), + }); + } + partition_values.push(partition::convert_value_of_type(&value, data_type)?); + } + + ResolvedPartitionSpec::new(Arc::clone(&self.partition_keys), partition_values) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::metadata::{DataField, IntType, StringType}; + use crate::row::{Datum, GenericRow}; + + #[test] + fn test_partition_getter_single_key() { + let row_type = RowType::new(vec![ + DataField::new("id".to_string(), DataType::Int(IntType::new()), None), + DataField::new( + "region".to_string(), + DataType::String(StringType::new()), + None, + ), + ]); + + let getter = PartitionGetter::new(&row_type, Arc::from(["region".to_string()])) + .expect("should succeed"); + + let row = GenericRow::from_data(vec![Datum::Int32(42), Datum::from("US")]); + let partition_name = getter.get_partition(&row).expect("should succeed"); + assert_eq!(partition_name, "US"); + } + + #[test] + fn test_partition_getter_multiple_keys() { + let row_type = RowType::new(vec![ + DataField::new("id".to_string(), DataType::Int(IntType::new()), None), + DataField::new( + "date".to_string(), + DataType::String(StringType::new()), + None, + ), + DataField::new( + "region".to_string(), + DataType::String(StringType::new()), + None, + ), + ]); + + let getter = PartitionGetter::new( + &row_type, + Arc::from(["date".to_string(), "region".to_string()]), + ) + .expect("should succeed"); + + let row = GenericRow::from_data(vec![ + Datum::Int32(42), + Datum::from("2024-01-15"), + Datum::from("US"), + ]); + let partition_name = getter.get_partition(&row).expect("should succeed"); + assert_eq!(partition_name, "2024-01-15$US"); + } + + #[test] + fn test_partition_getter_invalid_column() { + let row_type = RowType::new(vec![DataField::new( + "id".to_string(), + DataType::Int(IntType::new()), + None, + )]); + + let result = PartitionGetter::new(&row_type, Arc::from(["nonexistent".to_string()])); + assert!(result.is_err()); + } + + #[test] + fn test_partition_getter_null_value() { + let row_type = RowType::new(vec![ + DataField::new("id".to_string(), DataType::Int(IntType::new()), None), + DataField::new( + "region".to_string(), + DataType::String(StringType::new()), + None, + ), + ]); + + let getter = PartitionGetter::new(&row_type, Arc::from(["region".to_string()])) + .expect("should succeed"); + + let row = GenericRow::from_data(vec![Datum::Int32(42), Datum::Null]); + let result = getter.get_partition(&row); + assert!(result.is_err()); + } + + #[test] + fn test_get_partition_spec() { + let row_type = RowType::new(vec![ + DataField::new("id".to_string(), DataType::Int(IntType::new()), None), + DataField::new( + "date".to_string(), + DataType::String(StringType::new()), + None, + ), + DataField::new( + "region".to_string(), + DataType::String(StringType::new()), + None, + ), + ]); + + let getter = PartitionGetter::new( + &row_type, + Arc::from(["date".to_string(), "region".to_string()]), + ) + .expect("should succeed"); + + let row = GenericRow::from_data(vec![ + Datum::Int32(42), + Datum::from("2024-01-15"), + Datum::from("US"), + ]); + let spec = getter.get_partition_spec(&row).expect("should succeed"); + + assert_eq!(spec.get_partition_keys(), &["date", "region"]); + assert_eq!(spec.get_partition_values(), &["2024-01-15", "US"]); + assert_eq!(spec.get_partition_name(), "2024-01-15$US"); + } } diff --git a/crates/fluss/src/client/table/remote_log.rs b/crates/fluss/src/client/table/remote_log.rs index c39056d..52ac813 100644 --- a/crates/fluss/src/client/table/remote_log.rs +++ b/crates/fluss/src/client/table/remote_log.rs @@ -469,7 +469,7 @@ async fn spawn_download_task( result_sender: request.result_sender, } } - Err(e) if request.result_sender.is_closed() => { + Err(_e) if request.result_sender.is_closed() => { // Receiver dropped (cancelled) - release permit, don't re-queue drop(permit); DownloadResult::Cancelled diff --git a/crates/fluss/src/error.rs b/crates/fluss/src/error.rs index 368d8ab..68426d7 100644 --- a/crates/fluss/src/error.rs +++ b/crates/fluss/src/error.rs @@ -93,6 +93,12 @@ pub enum Error { )] IllegalArgument { message: String }, + #[snafu( + visibility(pub(crate)), + display("Fluss hitting invalid partition error {}.", message) + )] + InvalidPartition { message: String }, + #[snafu( visibility(pub(crate)), display("Fluss hitting IO not supported error {}.", message) diff --git a/crates/fluss/src/metadata/partition.rs b/crates/fluss/src/metadata/partition.rs index 1ecc0dc..59133cc 100644 --- a/crates/fluss/src/metadata/partition.rs +++ b/crates/fluss/src/metadata/partition.rs @@ -15,11 +15,12 @@ // specific language governing permissions and limitations // under the License. -use crate::PartitionId; use crate::error::{Error, Result}; use crate::proto::{PbKeyValue, PbPartitionInfo, PbPartitionSpec}; +use crate::{PartitionId, TableId}; use std::collections::HashMap; use std::fmt::{Display, Formatter}; +use std::sync::Arc; /// Represents a partition spec in fluss. Partition columns and values are NOT of strict order, and /// they need to be re-arranged to the correct order by comparing with a list of strictly ordered @@ -72,20 +73,21 @@ impl Display for PartitionSpec { /// partition keys. #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct ResolvedPartitionSpec { - partition_keys: Vec, + partition_keys: Arc<[String]>, partition_values: Vec, } pub const PARTITION_SPEC_SEPARATOR: &str = "$"; impl ResolvedPartitionSpec { - pub fn new(partition_keys: Vec, partition_values: Vec) -> Result { + pub fn new(partition_keys: Arc<[String]>, partition_values: Vec) -> Result { if partition_keys.len() != partition_values.len() { return Err(Error::IllegalArgument { message: "The number of partition keys and partition values should be the same." .to_string(), }); } + Ok(Self { partition_keys, partition_values, @@ -93,7 +95,7 @@ impl ResolvedPartitionSpec { } pub fn from_partition_spec( - partition_keys: Vec, + partition_keys: Arc<[String]>, partition_spec: &PartitionSpec, ) -> Self { let partition_values = @@ -104,14 +106,7 @@ impl ResolvedPartitionSpec { } } - pub fn from_partition_value(partition_key: String, partition_value: String) -> Self { - Self { - partition_keys: vec![partition_key], - partition_values: vec![partition_value], - } - } - - pub fn from_partition_name(partition_keys: Vec, partition_name: &str) -> Self { + pub fn from_partition_name(partition_keys: Arc<[String]>, partition_name: &str) -> Self { let partition_values: Vec = partition_name .split(PARTITION_SPEC_SEPARATOR) .map(|s| s.to_string()) @@ -141,7 +136,7 @@ impl ResolvedPartitionSpec { } Ok(Self { - partition_keys: keys, + partition_keys: Arc::from(keys), partition_values: values, }) } @@ -238,6 +233,7 @@ impl ResolvedPartitionSpec { .iter() .map(|kv| kv.value.clone()) .collect(); + Self { partition_keys, partition_values, @@ -245,7 +241,7 @@ impl ResolvedPartitionSpec { } fn get_reordered_partition_values( - partition_keys: &[String], + partition_keys: &Arc<[String]>, partition_spec: &PartitionSpec, ) -> Vec { let partition_spec_map = partition_spec.get_spec_map(); @@ -312,7 +308,7 @@ impl PartitionInfo { } impl Display for PartitionInfo { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { write!( f, "Partition{{name='{}', id={}}}", @@ -325,12 +321,12 @@ impl Display for PartitionInfo { /// A class to identify a table partition, containing the table id and the partition id. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub struct TablePartition { - table_id: i64, + table_id: TableId, partition_id: PartitionId, } impl TablePartition { - pub fn new(table_id: i64, partition_id: PartitionId) -> Self { + pub fn new(table_id: TableId, partition_id: PartitionId) -> Self { Self { table_id, partition_id, @@ -363,7 +359,7 @@ mod tests { #[test] fn test_resolved_partition_spec_name() { let spec = ResolvedPartitionSpec::new( - vec!["date".to_string(), "region".to_string()], + Arc::from(["date".to_string(), "region".to_string()]), vec!["2024-01-15".to_string(), "US".to_string()], ) .unwrap(); @@ -378,7 +374,7 @@ mod tests { #[test] fn test_resolved_partition_spec_from_partition_name() { let spec = ResolvedPartitionSpec::from_partition_name( - vec!["date".to_string(), "region".to_string()], + Arc::from(["date".to_string(), "region".to_string()]), "2024-01-15$US", ); @@ -398,7 +394,7 @@ mod tests { #[test] fn test_resolved_partition_spec_mismatched_lengths() { let result = ResolvedPartitionSpec::new( - vec!["date".to_string(), "region".to_string()], + Arc::from(["date".to_string(), "region".to_string()]), vec!["2024-01-15".to_string()], ); @@ -407,9 +403,11 @@ mod tests { #[test] fn test_partition_info() { - let spec = - ResolvedPartitionSpec::new(vec!["date".to_string()], vec!["2024-01-15".to_string()]) - .unwrap(); + let spec = ResolvedPartitionSpec::new( + Arc::from(["date".to_string()]), + vec!["2024-01-15".to_string()], + ) + .unwrap(); let info = PartitionInfo::new(42, spec); assert_eq!(info.get_partition_id(), 42); @@ -440,9 +438,11 @@ mod tests { #[test] fn test_partition_info_pb_roundtrip() { - let spec = - ResolvedPartitionSpec::new(vec!["date".to_string()], vec!["2024-01-15".to_string()]) - .unwrap(); + let spec = ResolvedPartitionSpec::new( + Arc::from(["date".to_string()]), + vec!["2024-01-15".to_string()], + ) + .unwrap(); let info = PartitionInfo::new(42, spec); let pb = info.to_pb(); @@ -455,14 +455,16 @@ mod tests { #[test] fn test_contains() { let full_spec = ResolvedPartitionSpec::new( - vec!["date".to_string(), "region".to_string()], + Arc::from(["date".to_string(), "region".to_string()]), vec!["2024-01-15".to_string(), "US".to_string()], ) .unwrap(); - let partial_spec = - ResolvedPartitionSpec::new(vec!["date".to_string()], vec!["2024-01-15".to_string()]) - .unwrap(); + let partial_spec = ResolvedPartitionSpec::new( + Arc::from(["date".to_string()]), + vec!["2024-01-15".to_string()], + ) + .unwrap(); assert!(full_spec.contains(&partial_spec).unwrap()); } diff --git a/crates/fluss/src/metadata/table.rs b/crates/fluss/src/metadata/table.rs index c4a9195..9b43166 100644 --- a/crates/fluss/src/metadata/table.rs +++ b/crates/fluss/src/metadata/table.rs @@ -679,6 +679,10 @@ impl Display for TablePath { } } +const MAX_NAME_LENGTH: usize = 200; + +const INTERNAL_NAME_PREFIX: &str = "__"; + impl TablePath { pub fn new(db: String, tbl: String) -> Self { TablePath { @@ -696,6 +700,52 @@ impl TablePath { pub fn table(&self) -> &str { &self.table } + + pub fn detect_invalid_name(identifier: &str) -> Option { + if identifier.is_empty() { + return Some("the empty string is not allowed".to_string()); + } + if identifier == "." { + return Some("'.' is not allowed".to_string()); + } + if identifier == ".." { + return Some("'..' is not allowed".to_string()); + } + if identifier.len() > MAX_NAME_LENGTH { + return Some(format!( + "the length of '{}' is longer than the max allowed length {}", + identifier, MAX_NAME_LENGTH + )); + } + if Self::contains_invalid_pattern(identifier) { + return Some(format!( + "'{}' contains one or more characters other than ASCII alphanumerics, '_' and '-'", + identifier + )); + } + None + } + + pub fn validate_prefix(identifier: &str) -> Option { + if identifier.starts_with(INTERNAL_NAME_PREFIX) { + return Some(format!( + "'{}' is not allowed as prefix, since it is reserved for internal databases/internal tables/internal partitions in Fluss server", + INTERNAL_NAME_PREFIX + )); + } + None + } + + // Valid characters for Fluss table names are the ASCII alphanumerics, '_' and '-'. + fn contains_invalid_pattern(identifier: &str) -> bool { + for c in identifier.chars() { + let valid_char = c.is_ascii_alphanumeric() || c == '_' || c == '-'; + if !valid_char { + return true; + } + } + false + } } /// A database name, table name and partition name combo. It's used to represent the physical path of @@ -1107,3 +1157,63 @@ impl LakeSnapshot { &self.table_buckets_offset } } + +/// Tests for [`TablePath`]. +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_validate() { + // assert valid name + let path = TablePath::new("db_2-abc3".to_string(), "table-1_abc_2".to_string()); + assert!(TablePath::detect_invalid_name(path.database()).is_none()); + assert!(TablePath::detect_invalid_name(path.table()).is_none()); + assert_eq!(path.to_string(), "db_2-abc3.table-1_abc_2"); + + // assert invalid name prefix + assert!( + TablePath::validate_prefix("__table-1") + .unwrap() + .contains("'__' is not allowed as prefix") + ); + + // check max length + let long_name = "a".repeat(200); + assert!(TablePath::detect_invalid_name(&long_name).is_none()); + + // assert invalid names + assert_invalid_name("*abc", "'*abc' contains one or more characters other than"); + assert_invalid_name( + "table.abc", + "'table.abc' contains one or more characters other than", + ); + assert_invalid_name("", "the empty string is not allowed"); + assert_invalid_name(" ", "' ' contains one or more characters other than"); + assert_invalid_name(".", "'.' is not allowed"); + assert_invalid_name("..", "'..' is not allowed"); + let invalid_long_name = "a".repeat(201); + assert_invalid_name( + &invalid_long_name, + &format!( + "the length of '{}' is longer than the max allowed length {}", + invalid_long_name, MAX_NAME_LENGTH + ), + ); + } + + fn assert_invalid_name(name: &str, expected_message: &str) { + let result = TablePath::detect_invalid_name(name); + assert!( + result.is_some(), + "Expected '{}' to be invalid, but it was valid", + name + ); + assert!( + result.as_ref().unwrap().contains(expected_message), + "Expected message containing '{}', but got '{}'", + expected_message, + result.unwrap() + ); + } +} diff --git a/crates/fluss/src/row/datum.rs b/crates/fluss/src/row/datum.rs index 7b3850f..b42cfec 100644 --- a/crates/fluss/src/row/datum.rs +++ b/crates/fluss/src/row/datum.rs @@ -801,10 +801,7 @@ impl TimestampNtz { } } - pub fn from_millis_nanos( - millisecond: i64, - nano_of_millisecond: i32, - ) -> crate::error::Result { + pub fn from_millis_nanos(millisecond: i64, nano_of_millisecond: i32) -> Result { if !(0..=MAX_NANO_OF_MILLISECOND).contains(&nano_of_millisecond) { return Err(crate::error::Error::IllegalArgument { message: format!( @@ -849,10 +846,7 @@ impl TimestampLtz { } } - pub fn from_millis_nanos( - epoch_millisecond: i64, - nano_of_millisecond: i32, - ) -> crate::error::Result { + pub fn from_millis_nanos(epoch_millisecond: i64, nano_of_millisecond: i32) -> Result { if !(0..=MAX_NANO_OF_MILLISECOND).contains(&nano_of_millisecond) { return Err(crate::error::Error::IllegalArgument { message: format!( diff --git a/crates/fluss/src/util/mod.rs b/crates/fluss/src/util/mod.rs index 3760487..b987fe2 100644 --- a/crates/fluss/src/util/mod.rs +++ b/crates/fluss/src/util/mod.rs @@ -16,6 +16,7 @@ // under the License. pub mod murmur_hash; +pub mod partition; pub mod varint; use crate::TableId; diff --git a/crates/fluss/src/util/partition.rs b/crates/fluss/src/util/partition.rs new file mode 100644 index 0000000..036cac4 --- /dev/null +++ b/crates/fluss/src/util/partition.rs @@ -0,0 +1,532 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/// Utils for partition. +use crate::error::Error::IllegalArgument; +use crate::error::Result; +use crate::metadata::DataType; +use crate::row::{Date, Datum, Time, TimestampLtz, TimestampNtz}; +use jiff::ToSpan; +use std::fmt::Write; + +fn hex_string(bytes: &[u8]) -> String { + let mut hex = String::with_capacity(bytes.len() * 2); + for &b in bytes { + write!(hex, "{:02x}", b).unwrap(); + } + hex +} + +fn reformat_float(value: f32) -> String { + if value.is_nan() { + "NaN".to_string() + } else if value.is_infinite() { + if value > 0.0 { + "Inf".to_string() + } else { + "-Inf".to_string() + } + } else { + value.to_string().replace('.', "_") + } +} + +fn reformat_double(value: f64) -> String { + if value.is_nan() { + "NaN".to_string() + } else if value.is_infinite() { + if value > 0.0 { + "Inf".to_string() + } else { + "-Inf".to_string() + } + } else { + value.to_string().replace('.', "_") + } +} + +const UNIX_EPOCH_DATE: jiff::civil::Date = jiff::civil::date(1970, 1, 1); + +fn day_to_string(days: i32) -> String { + let date = UNIX_EPOCH_DATE + days.days(); + format!("{:04}-{:02}-{:02}", date.year(), date.month(), date.day()) +} + +fn date_to_string(date: Date) -> String { + day_to_string(date.get_inner()) +} + +const MILLIS_PER_SECOND: i64 = 1_000; +const MILLIS_PER_MINUTE: i64 = 60 * MILLIS_PER_SECOND; +const MILLIS_PER_HOUR: i64 = 60 * MILLIS_PER_MINUTE; + +fn milli_to_string(milli: i32) -> String { + let hour = milli.div_euclid(MILLIS_PER_HOUR as i32); + let min = milli + .rem_euclid(MILLIS_PER_HOUR as i32) + .div_euclid(MILLIS_PER_MINUTE as i32); + let sec = milli + .rem_euclid(MILLIS_PER_MINUTE as i32) + .div_euclid(MILLIS_PER_SECOND as i32); + let ms = milli.rem_euclid(MILLIS_PER_SECOND as i32); + + format!("{:02}-{:02}-{:02}_{:03}", hour, min, sec, ms) +} + +fn time_to_string(time: Time) -> String { + milli_to_string(time.get_inner()) +} + +trait Timestamp { + fn get_milli(&self) -> i64; + fn get_nano_of_milli(&self) -> i32; +} + +impl Timestamp for TimestampNtz { + fn get_milli(&self) -> i64 { + self.get_millisecond() + } + + fn get_nano_of_milli(&self) -> i32 { + self.get_nano_of_millisecond() + } +} + +impl Timestamp for TimestampLtz { + fn get_milli(&self) -> i64 { + self.get_epoch_millisecond() + } + + fn get_nano_of_milli(&self) -> i32 { + self.get_nano_of_millisecond() + } +} + +/// This formats date time while adhering to java side behaviour +/// +fn timestamp_to_string(ts: T) -> String { + let millis = ts.get_milli(); + let nanos = ts.get_nano_of_milli(); + + let millis_of_second = millis.rem_euclid(MILLIS_PER_SECOND); + let total_secs = millis.div_euclid(MILLIS_PER_SECOND); + + let epoch = jiff::Timestamp::UNIX_EPOCH; + let ts_jiff = epoch + jiff::Span::new().seconds(total_secs); + let dt = ts_jiff.to_zoned(jiff::tz::TimeZone::UTC).datetime(); + + if nanos > 0 { + format!( + "{:04}-{:02}-{:02}-{:02}-{:02}-{:02}_{:03}{:06}", + dt.year(), + dt.month(), + dt.day(), + dt.hour(), + dt.minute(), + dt.second(), + millis_of_second, + nanos + ) + } else if millis_of_second > 0 { + format!( + "{:04}-{:02}-{:02}-{:02}-{:02}-{:02}_{:03}", + dt.year(), + dt.month(), + dt.day(), + dt.hour(), + dt.minute(), + dt.second(), + millis_of_second + ) + } else { + format!( + "{:04}-{:02}-{:02}-{:02}-{:02}-{:02}_", + dt.year(), + dt.month(), + dt.day(), + dt.hour(), + dt.minute(), + dt.second(), + ) + } +} + +/// Converts a Datum value to its string representation for partition naming. +pub fn convert_value_of_type(value: &Datum, data_type: &DataType) -> Result { + match (value, data_type) { + (Datum::String(s), DataType::Char(_) | DataType::String(_)) => Ok(s.to_string()), + (Datum::Bool(b), DataType::Boolean(_)) => Ok(b.to_string()), + (Datum::Blob(bytes), DataType::Binary(_) | DataType::Bytes(_)) => Ok(hex_string(bytes)), + (Datum::Int8(v), DataType::TinyInt(_)) => Ok(v.to_string()), + (Datum::Int16(v), DataType::SmallInt(_)) => Ok(v.to_string()), + (Datum::Int32(v), DataType::Int(_)) => Ok(v.to_string()), + (Datum::Int64(v), DataType::BigInt(_)) => Ok(v.to_string()), + (Datum::Date(d), DataType::Date(_)) => Ok(date_to_string(*d)), + (Datum::Time(t), DataType::Time(_)) => Ok(time_to_string(*t)), + (Datum::Float32(f), DataType::Float(_)) => Ok(reformat_float(f.into_inner())), + (Datum::Float64(f), DataType::Double(_)) => Ok(reformat_double(f.into_inner())), + (Datum::TimestampLtz(ts), DataType::TimestampLTz(_)) => Ok(timestamp_to_string(*ts)), + (Datum::TimestampNtz(ts), DataType::Timestamp(_)) => Ok(timestamp_to_string(*ts)), + _ => Err(IllegalArgument { + message: format!( + "Unsupported conversion to partition key from data type: {data_type:?}, value: {value:?}" + ), + }), + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::metadata::{ + BigIntType, BinaryType, BooleanType, BytesType, CharType, DateType, DoubleType, FloatType, + IntType, SmallIntType, StringType, TimeType, TimestampLTzType, TimestampType, TinyIntType, + }; + use crate::row::{Date, Time, TimestampLtz, TimestampNtz}; + use std::borrow::Cow; + + use crate::metadata::TablePath; + + #[test] + fn test_string() { + let datum = Datum::String(Cow::Borrowed("Fluss")); + + let to_string_result = convert_value_of_type(&datum, &DataType::String(StringType::new())) + .expect("datum conversion to partition string failed"); + assert_eq!(to_string_result, "Fluss"); + let detect_invalid = TablePath::detect_invalid_name(&to_string_result); + assert!(detect_invalid.is_none()); + } + + #[test] + fn test_char() { + let datum = Datum::String(Cow::Borrowed("F")); + + let to_string_result = convert_value_of_type(&datum, &DataType::Char(CharType::new(1))) + .expect("datum conversion to partition string failed"); + assert_eq!(to_string_result, "F"); + let detect_invalid = TablePath::detect_invalid_name(&to_string_result); + assert!(detect_invalid.is_none()); + } + + #[test] + fn test_boolean() { + let datum = Datum::Bool(true); + + let to_string_result = + convert_value_of_type(&datum, &DataType::Boolean(BooleanType::new())) + .expect("datum conversion to partition string failed"); + assert_eq!(to_string_result, "true"); + let detect_invalid = TablePath::detect_invalid_name(&to_string_result); + assert!(detect_invalid.is_none()); + } + + #[test] + fn test_byte() { + let datum = Datum::Blob(Cow::Borrowed(&[0x10, 0x20, 0x30, 0x40, 0x50, 0xFF])); + + let to_string_result = convert_value_of_type(&datum, &DataType::Bytes(BytesType::new())) + .expect("datum conversion to partition string failed"); + assert_eq!(to_string_result, "1020304050ff"); + let detect_invalid = TablePath::detect_invalid_name(&to_string_result); + assert!(detect_invalid.is_none()); + } + + #[test] + fn test_binary() { + let datum = Datum::Blob(Cow::Borrowed(&[0x10, 0x20, 0x30, 0x40, 0x50, 0xFF])); + + let to_string_result = convert_value_of_type(&datum, &DataType::Binary(BinaryType::new(6))) + .expect("datum conversion to partition string failed"); + assert_eq!(to_string_result, "1020304050ff"); + let detect_invalid = TablePath::detect_invalid_name(&to_string_result); + assert!(detect_invalid.is_none()); + } + + #[test] + fn test_tiny_int() { + let datum = Datum::Int8(100); + + let to_string_result = + convert_value_of_type(&datum, &DataType::TinyInt(TinyIntType::new())) + .expect("datum conversion to partition string failed"); + assert_eq!(to_string_result, "100"); + let detect_invalid = TablePath::detect_invalid_name(&to_string_result); + assert!(detect_invalid.is_none()); + } + + #[test] + fn test_small_int() { + let datum = Datum::Int16(-32760); + + let to_string_result = + convert_value_of_type(&datum, &DataType::SmallInt(SmallIntType::new())) + .expect("datum conversion to partition string failed"); + assert_eq!(to_string_result, "-32760"); + let detect_invalid = TablePath::detect_invalid_name(&to_string_result); + assert!(detect_invalid.is_none()); + } + + #[test] + fn test_int() { + let datum = Datum::Int32(299000); + + let to_string_result = convert_value_of_type(&datum, &DataType::Int(IntType::new())) + .expect("datum conversion to partition string failed"); + assert_eq!(to_string_result, "299000"); + let detect_invalid = TablePath::detect_invalid_name(&to_string_result); + assert!(detect_invalid.is_none()); + } + + #[test] + fn test_big_int() { + let datum = Datum::Int64(1748662955428); + + let to_string_result = convert_value_of_type(&datum, &DataType::BigInt(BigIntType::new())) + .expect("datum conversion to partition string failed"); + assert_eq!(to_string_result, "1748662955428"); + let detect_invalid = TablePath::detect_invalid_name(&to_string_result); + assert!(detect_invalid.is_none()); + } + + #[test] + fn test_date() { + let datum = Datum::Date(Date::new(20235)); + + let to_string_result = convert_value_of_type(&datum, &DataType::Date(DateType::new())) + .expect("datum conversion to partition string failed"); + assert_eq!(to_string_result, "2025-05-27"); + let detect_invalid = TablePath::detect_invalid_name(&to_string_result); + assert!(detect_invalid.is_none()); + } + + #[test] + fn test_time() { + let datum = Datum::Time(Time::new(5402199)); + + let to_string_result = + convert_value_of_type(&datum, &DataType::Time(TimeType::new(3).unwrap())) + .expect("datum conversion to partition string failed"); + assert_eq!(to_string_result, "01-30-02_199"); + let detect_invalid = TablePath::detect_invalid_name(&to_string_result); + assert!(detect_invalid.is_none()); + } + + #[test] + fn test_float() { + let datum = Datum::Float32(5.73.into()); + + let to_string_result = convert_value_of_type(&datum, &DataType::Float(FloatType::new())) + .expect("datum conversion to partition string failed"); + assert_eq!(to_string_result, "5_73"); + let detect_invalid = TablePath::detect_invalid_name(&to_string_result); + assert!(detect_invalid.is_none()); + + let datum = Datum::Float32(f32::NAN.into()); + assert_eq!( + convert_value_of_type(&datum, &DataType::Float(FloatType::new())) + .expect("datum conversion to partition string failed"), + "NaN" + ); + + let datum = Datum::Float32(f32::INFINITY.into()); + assert_eq!( + convert_value_of_type(&datum, &DataType::Float(FloatType::new())) + .expect("datum conversion to partition string failed"), + "Inf" + ); + + let datum = Datum::Float32(f32::NEG_INFINITY.into()); + assert_eq!( + convert_value_of_type(&datum, &DataType::Float(FloatType::new())) + .expect("datum conversion to partition string failed"), + "-Inf" + ); + } + + #[test] + fn test_double() { + let datum = Datum::Float64(5.73737.into()); + + let to_string_result = convert_value_of_type(&datum, &DataType::Double(DoubleType::new())) + .expect("datum conversion to partition string failed"); + assert_eq!(to_string_result, "5_73737"); + let detect_invalid = TablePath::detect_invalid_name(&to_string_result); + assert!(detect_invalid.is_none()); + + let datum = Datum::Float64(f64::NAN.into()); + assert_eq!( + convert_value_of_type(&datum, &DataType::Double(DoubleType::new())) + .expect("datum conversion to partition string failed"), + "NaN" + ); + + let datum = Datum::Float64(f64::INFINITY.into()); + assert_eq!( + convert_value_of_type(&datum, &DataType::Double(DoubleType::new())) + .expect("datum conversion to partition string failed"), + "Inf" + ); + + let datum = Datum::Float64(f64::NEG_INFINITY.into()); + assert_eq!( + convert_value_of_type(&datum, &DataType::Double(DoubleType::new())) + .expect("datum conversion to partition string failed"), + "-Inf" + ); + } + + #[test] + fn test_timestamp_ntz() { + let datum = Datum::TimestampNtz( + TimestampNtz::from_millis_nanos(1748662955428, 99988) + .expect("TimestampNtz init failed"), + ); + + let to_string_result = + convert_value_of_type(&datum, &DataType::Timestamp(TimestampType::new(9).unwrap())) + .expect("datum conversion to partition string failed"); + assert_eq!(to_string_result, "2025-05-31-03-42-35_428099988"); + let detect_invalid = TablePath::detect_invalid_name(&to_string_result); + assert!(detect_invalid.is_none()); + + // Zero nanos of millis + let datum = Datum::TimestampNtz( + TimestampNtz::from_millis_nanos(1748662955428, 0).expect("TimestampNtz init failed"), + ); + + let to_string_result = + convert_value_of_type(&datum, &DataType::Timestamp(TimestampType::new(9).unwrap())) + .expect("datum conversion to partition string failed"); + assert_eq!(to_string_result, "2025-05-31-03-42-35_428"); + let detect_invalid = TablePath::detect_invalid_name(&to_string_result); + assert!(detect_invalid.is_none()); + + // Zero millis + let datum = Datum::TimestampNtz( + TimestampNtz::from_millis_nanos(1748662955000, 99988) + .expect("TimestampNtz init failed"), + ); + + let to_string_result = + convert_value_of_type(&datum, &DataType::Timestamp(TimestampType::new(9).unwrap())) + .expect("datum conversion to partition string failed"); + assert_eq!(to_string_result, "2025-05-31-03-42-35_000099988"); + let detect_invalid = TablePath::detect_invalid_name(&to_string_result); + assert!(detect_invalid.is_none()); + + // Zero millis and zero nanos + let datum = Datum::TimestampNtz( + TimestampNtz::from_millis_nanos(1748662955000, 0).expect("TimestampNtz init failed"), + ); + + let to_string_result = + convert_value_of_type(&datum, &DataType::Timestamp(TimestampType::new(9).unwrap())) + .expect("datum conversion to partition string failed"); + assert_eq!(to_string_result, "2025-05-31-03-42-35_"); + let detect_invalid = TablePath::detect_invalid_name(&to_string_result); + assert!(detect_invalid.is_none()); + + // Negative millis + let datum = Datum::TimestampNtz( + TimestampNtz::from_millis_nanos(-1748662955428, 99988) + .expect("TimestampNtz init failed"), + ); + + let to_string_result = + convert_value_of_type(&datum, &DataType::Timestamp(TimestampType::new(9).unwrap())) + .expect("datum conversion to partition string failed"); + assert_eq!(to_string_result, "1914-08-03-20-17-24_572099988"); + let detect_invalid = TablePath::detect_invalid_name(&to_string_result); + assert!(detect_invalid.is_none()); + } + + #[test] + fn test_timestamp_ltz() { + let datum = Datum::TimestampLtz( + TimestampLtz::from_millis_nanos(1748662955428, 99988) + .expect("TimestampLtz init failed"), + ); + + let to_string_result = convert_value_of_type( + &datum, + &DataType::TimestampLTz(TimestampLTzType::new(9).unwrap()), + ) + .expect("datum conversion to partition string failed"); + assert_eq!(to_string_result, "2025-05-31-03-42-35_428099988"); + let detect_invalid = TablePath::detect_invalid_name(&to_string_result); + assert!(detect_invalid.is_none()); + + // Zero nanos of millis + let datum = Datum::TimestampLtz( + TimestampLtz::from_millis_nanos(1748662955428, 0).expect("TimestampLtz init failed"), + ); + + let to_string_result = convert_value_of_type( + &datum, + &DataType::TimestampLTz(TimestampLTzType::new(9).unwrap()), + ) + .expect("datum conversion to partition string failed"); + assert_eq!(to_string_result, "2025-05-31-03-42-35_428"); + let detect_invalid = TablePath::detect_invalid_name(&to_string_result); + assert!(detect_invalid.is_none()); + + // Zero millis + let datum = Datum::TimestampLtz( + TimestampLtz::from_millis_nanos(1748662955000, 99988) + .expect("TimestampLtz init failed"), + ); + + let to_string_result = convert_value_of_type( + &datum, + &DataType::TimestampLTz(TimestampLTzType::new(9).unwrap()), + ) + .expect("datum conversion to partition string failed"); + assert_eq!(to_string_result, "2025-05-31-03-42-35_000099988"); + let detect_invalid = TablePath::detect_invalid_name(&to_string_result); + assert!(detect_invalid.is_none()); + + // Zero millis and zero nanos + let datum = Datum::TimestampLtz( + TimestampLtz::from_millis_nanos(1748662955000, 0).expect("TimestampLtz init failed"), + ); + + let to_string_result = convert_value_of_type( + &datum, + &DataType::TimestampLTz(TimestampLTzType::new(9).unwrap()), + ) + .expect("datum conversion to partition string failed"); + assert_eq!(to_string_result, "2025-05-31-03-42-35_"); + let detect_invalid = TablePath::detect_invalid_name(&to_string_result); + assert!(detect_invalid.is_none()); + + // Negative millis + let datum = Datum::TimestampLtz( + TimestampLtz::from_millis_nanos(-1748662955428, 99988) + .expect("TimestampLtz init failed"), + ); + + let to_string_result = convert_value_of_type( + &datum, + &DataType::TimestampLTz(TimestampLTzType::new(9).unwrap()), + ) + .expect("datum conversion to partition string failed"); + assert_eq!(to_string_result, "1914-08-03-20-17-24_572099988"); + let detect_invalid = TablePath::detect_invalid_name(&to_string_result); + assert!(detect_invalid.is_none()); + } +}