Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions crates/fluss/src/client/table/partition_getter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ impl PartitionGetter {
} else {
return Err(IllegalArgument {
message: format!(
"The partition column {} is not in the row {}.",
partition_key, row_type
"The partition column {partition_key} is not in the row {row_type}."
),
});
};
Expand Down
188 changes: 177 additions & 11 deletions crates/fluss/src/metadata/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -712,14 +712,12 @@ impl TablePath {
}
if identifier.len() > MAX_NAME_LENGTH {
return Some(format!(
"the length of '{}' is longer than the max allowed length {}",
identifier, MAX_NAME_LENGTH
"the length of '{identifier}' is longer than the max allowed length {MAX_NAME_LENGTH}"
));
}
if Self::contains_invalid_pattern(identifier) {
return Some(format!(
"'{}' contains one or more characters other than ASCII alphanumerics, '_' and '-'",
identifier
"'{identifier}' contains one or more characters other than ASCII alphanumerics, '_' and '-'"
));
}
None
Expand All @@ -728,8 +726,7 @@ impl TablePath {
pub fn validate_prefix(identifier: &str) -> Option<String> {
if identifier.starts_with(INTERNAL_NAME_PREFIX) {
return Some(format!(
"'{}' is not allowed as prefix, since it is reserved for internal databases/internal tables/internal partitions in Fluss server",
INTERNAL_NAME_PREFIX
"'{INTERNAL_NAME_PREFIX}' is not allowed as prefix, since it is reserved for internal databases/internal tables/internal partitions in Fluss server"
));
}
None
Expand Down Expand Up @@ -834,6 +831,75 @@ impl TableInfo {
}
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AutoPartitionStrategy {
auto_partition_enabled: bool,
auto_partition_key: Option<String>,
auto_partition_time_unit: String,
auto_partition_num_precreate: i32,
auto_partition_num_retention: i32,
auto_partition_timezone: String,
}

impl AutoPartitionStrategy {
pub fn from(properties: &HashMap<String, String>) -> Self {
Self {
auto_partition_enabled: properties
.get("table.auto-partition.enabled")
.and_then(|s| s.parse().ok())
.unwrap_or(false),
auto_partition_key: properties
.get("table.auto-partition.key")
.map(|s| s.to_string()),
auto_partition_time_unit: properties
.get("table.auto-partition.time-unit")
.map(|s| s.to_string())
.unwrap_or_else(|| "DAY".to_string()),
auto_partition_num_precreate: properties
.get("table.auto-partition.num-precreate")
.and_then(|s| s.parse().ok())
.unwrap_or(2),
auto_partition_num_retention: properties
.get("table.auto-partition.num-retention")
.and_then(|s| s.parse().ok())
.unwrap_or(7),
auto_partition_timezone: properties
.get("table.auto-partition.time-zone")
.map(|s| s.to_string())
.unwrap_or_else(|| {
jiff::tz::TimeZone::system()
.iana_name()
.unwrap_or("UTC")
.to_string()
}),
}
}

pub fn is_auto_partition_enabled(&self) -> bool {
self.auto_partition_enabled
}

pub fn key(&self) -> Option<&str> {
self.auto_partition_key.as_deref()
}

pub fn time_unit(&self) -> &str {
&self.auto_partition_time_unit
}

pub fn num_precreate(&self) -> i32 {
self.auto_partition_num_precreate
}

pub fn num_retention(&self) -> i32 {
self.auto_partition_num_retention
}

pub fn timezone(&self) -> &str {
&self.auto_partition_timezone
}
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TableConfig {
pub properties: HashMap<String, String>,
Expand Down Expand Up @@ -866,6 +932,10 @@ impl TableConfig {
.unwrap_or(DEFAULT_KV_FORMAT);
kv_format.parse().map_err(Into::into)
}

pub fn get_auto_partition_strategy(&self) -> AutoPartitionStrategy {
AutoPartitionStrategy::from(&self.properties)
}
}

impl TableInfo {
Expand Down Expand Up @@ -1003,7 +1073,11 @@ impl TableInfo {
}

pub fn is_auto_partitioned(&self) -> bool {
self.is_partitioned() && todo!()
self.is_partitioned()
&& self
.table_config
.get_auto_partition_strategy()
.is_auto_partition_enabled()
}

pub fn get_partition_keys(&self) -> &[String] {
Expand Down Expand Up @@ -1161,6 +1235,7 @@ impl LakeSnapshot {
#[cfg(test)]
mod tests {
use super::*;
use crate::metadata::DataTypes;

#[test]
fn test_validate() {
Expand Down Expand Up @@ -1195,8 +1270,7 @@ mod tests {
assert_invalid_name(
&invalid_long_name,
&format!(
"the length of '{}' is longer than the max allowed length {}",
invalid_long_name, MAX_NAME_LENGTH
"the length of '{invalid_long_name}' is longer than the max allowed length {MAX_NAME_LENGTH}"
),
);
}
Expand All @@ -1205,8 +1279,7 @@ mod tests {
let result = TablePath::detect_invalid_name(name);
assert!(
result.is_some(),
"Expected '{}' to be invalid, but it was valid",
name
"Expected '{name}' to be invalid, but it was valid"
);
assert!(
result.as_ref().unwrap().contains(expected_message),
Expand All @@ -1215,4 +1288,97 @@ mod tests {
result.unwrap()
);
}

#[test]
fn test_is_auto_partitioned() {
let schema = Schema::builder()
.column("id", DataTypes::int())
.column("name", DataTypes::string())
.primary_key(vec!["id".to_string()])
.build()
.unwrap();

let table_path = TablePath::new("db".to_string(), "tbl".to_string());

// 1. Not partitioned, auto partition disabled
let mut properties = HashMap::new();
let table_info = TableInfo::new(
table_path.clone(),
1,
1,
schema.clone(),
vec!["id".to_string()],
vec![], // No partition keys
1,
properties.clone(),
HashMap::new(),
None,
0,
0,
);
assert!(!table_info.is_auto_partitioned());

// 2. Not partitioned, auto partition enabled
properties.insert(
"table.auto-partition.enabled".to_string(),
"true".to_string(),
);
let table_info = TableInfo::new(
table_path.clone(),
1,
1,
schema.clone(),
vec!["id".to_string()],
vec![], // No partition keys
1,
properties.clone(),
HashMap::new(),
None,
0,
0,
);
assert!(!table_info.is_auto_partitioned());

// 3. Partitioned, auto partition disabled
properties.insert(
"table.auto-partition.enabled".to_string(),
"false".to_string(),
);
let table_info = TableInfo::new(
table_path.clone(),
1,
1,
schema.clone(),
vec!["id".to_string()],
vec!["name".to_string()], // Partition keys
1,
properties.clone(),
HashMap::new(),
None,
0,
0,
);
assert!(!table_info.is_auto_partitioned());

// 4. Partitioned, auto partition enabled
properties.insert(
"table.auto-partition.enabled".to_string(),
"true".to_string(),
);
let table_info = TableInfo::new(
table_path.clone(),
1,
1,
schema.clone(),
vec!["id".to_string()],
vec!["name".to_string()], // Partition keys
1,
properties.clone(),
HashMap::new(),
None,
0,
0,
);
assert!(table_info.is_auto_partitioned());
}
}
4 changes: 2 additions & 2 deletions crates/fluss/src/util/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use std::fmt::Write;
fn hex_string(bytes: &[u8]) -> String {
let mut hex = String::with_capacity(bytes.len() * 2);
for &b in bytes {
write!(hex, "{:02x}", b).unwrap();
write!(hex, "{b:02x}").unwrap();
}
hex
}
Expand Down Expand Up @@ -84,7 +84,7 @@ fn milli_to_string(milli: i32) -> String {
.div_euclid(MILLIS_PER_SECOND as i32);
let ms = milli.rem_euclid(MILLIS_PER_SECOND as i32);

format!("{:02}-{:02}-{:02}_{:03}", hour, min, sec, ms)
format!("{hour:02}-{min:02}-{sec:02}_{ms:03}")
}

fn time_to_string(time: Time) -> String {
Expand Down
Loading