Skip to content
177 changes: 164 additions & 13 deletions crates/fluss/src/client/table/partition_getter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,40 +17,191 @@

use crate::error::Error::IllegalArgument;
use crate::error::Result;
use crate::metadata::{DataType, RowType};
use crate::metadata::{DataType, ResolvedPartitionSpec, RowType};
use crate::row::InternalRow;
use crate::row::field_getter::FieldGetter;
use crate::util::partition;
use std::sync::Arc;

/// A getter to get partition name from a row.
#[allow(dead_code)]
pub struct PartitionGetter<'a> {
partitions: Vec<(&'a String, &'a DataType, FieldGetter)>,
pub struct PartitionGetter {
partition_keys: Arc<[String]>,
partitions: Vec<(DataType, FieldGetter)>,
}

#[allow(dead_code)]
impl<'a> PartitionGetter<'a> {
pub fn new(row_type: &'a RowType, partition_keys: &'a Vec<String>) -> Result<Self> {
impl PartitionGetter {
pub fn new(row_type: &RowType, partition_keys: Arc<[String]>) -> Result<Self> {
let mut partitions = Vec::with_capacity(partition_keys.len());

for partition_key in partition_keys {
for partition_key in partition_keys.iter() {
if let Some(partition_col_index) = row_type.get_field_index(partition_key.as_str()) {
let data_type = &row_type
let data_type = row_type
.fields()
.get(partition_col_index)
.unwrap()
.data_type;
let field_getter = FieldGetter::create(data_type, partition_col_index);
.data_type
.clone();
let field_getter = FieldGetter::create(&data_type, partition_col_index);

partitions.push((partition_key, data_type, field_getter));
partitions.push((data_type, field_getter));
} else {
return Err(IllegalArgument {
message: format!(
"The partition column {partition_key} is not in the row {row_type}."
"The partition column {} is not in the row {}.",
partition_key, row_type
),
});
};
}

Ok(Self { partitions })
Ok(Self {
partition_keys,
partitions,
})
}

// TODO Implement get partition
pub fn get_partition(&self, row: &dyn InternalRow) -> Result<String> {
self.get_partition_spec(row)
.map(|ps| ps.get_partition_name())
}

pub fn get_partition_spec(&self, row: &dyn InternalRow) -> Result<ResolvedPartitionSpec> {
let mut partition_values = Vec::with_capacity(self.partitions.len());

for (data_type, field_getter) in &self.partitions {
let value = field_getter.get_field(row);
if value.is_null() {
return Err(IllegalArgument {
message: "Partition value shouldn't be null.".to_string(),
});
}
partition_values.push(partition::convert_value_of_type(&value, data_type)?);
}

ResolvedPartitionSpec::new(Arc::clone(&self.partition_keys), partition_values)
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::metadata::{DataField, IntType, StringType};
use crate::row::{Datum, GenericRow};

#[test]
fn test_partition_getter_single_key() {
let row_type = RowType::new(vec![
DataField::new("id".to_string(), DataType::Int(IntType::new()), None),
DataField::new(
"region".to_string(),
DataType::String(StringType::new()),
None,
),
]);

let getter = PartitionGetter::new(&row_type, Arc::from(["region".to_string()]))
.expect("should succeed");

let row = GenericRow::from_data(vec![Datum::Int32(42), Datum::from("US")]);
let partition_name = getter.get_partition(&row).expect("should succeed");
assert_eq!(partition_name, "US");
}

#[test]
fn test_partition_getter_multiple_keys() {
let row_type = RowType::new(vec![
DataField::new("id".to_string(), DataType::Int(IntType::new()), None),
DataField::new(
"date".to_string(),
DataType::String(StringType::new()),
None,
),
DataField::new(
"region".to_string(),
DataType::String(StringType::new()),
None,
),
]);

let getter = PartitionGetter::new(
&row_type,
Arc::from(["date".to_string(), "region".to_string()]),
)
.expect("should succeed");

let row = GenericRow::from_data(vec![
Datum::Int32(42),
Datum::from("2024-01-15"),
Datum::from("US"),
]);
let partition_name = getter.get_partition(&row).expect("should succeed");
assert_eq!(partition_name, "2024-01-15$US");
}

#[test]
fn test_partition_getter_invalid_column() {
let row_type = RowType::new(vec![DataField::new(
"id".to_string(),
DataType::Int(IntType::new()),
None,
)]);

let result = PartitionGetter::new(&row_type, Arc::from(["nonexistent".to_string()]));
assert!(result.is_err());
}

#[test]
fn test_partition_getter_null_value() {
let row_type = RowType::new(vec![
DataField::new("id".to_string(), DataType::Int(IntType::new()), None),
DataField::new(
"region".to_string(),
DataType::String(StringType::new()),
None,
),
]);

let getter = PartitionGetter::new(&row_type, Arc::from(["region".to_string()]))
.expect("should succeed");

let row = GenericRow::from_data(vec![Datum::Int32(42), Datum::Null]);
let result = getter.get_partition(&row);
assert!(result.is_err());
}

#[test]
fn test_get_partition_spec() {
let row_type = RowType::new(vec![
DataField::new("id".to_string(), DataType::Int(IntType::new()), None),
DataField::new(
"date".to_string(),
DataType::String(StringType::new()),
None,
),
DataField::new(
"region".to_string(),
DataType::String(StringType::new()),
None,
),
]);

let getter = PartitionGetter::new(
&row_type,
Arc::from(["date".to_string(), "region".to_string()]),
)
.expect("should succeed");

let row = GenericRow::from_data(vec![
Datum::Int32(42),
Datum::from("2024-01-15"),
Datum::from("US"),
]);
let spec = getter.get_partition_spec(&row).expect("should succeed");

assert_eq!(spec.get_partition_keys(), &["date", "region"]);
assert_eq!(spec.get_partition_values(), &["2024-01-15", "US"]);
assert_eq!(spec.get_partition_name(), "2024-01-15$US");
}
}
2 changes: 1 addition & 1 deletion crates/fluss/src/client/table/remote_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ async fn spawn_download_task(
result_sender: request.result_sender,
}
}
Err(e) if request.result_sender.is_closed() => {
Err(_e) if request.result_sender.is_closed() => {
// Receiver dropped (cancelled) - release permit, don't re-queue
drop(permit);
DownloadResult::Cancelled
Expand Down
6 changes: 6 additions & 0 deletions crates/fluss/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ pub enum Error {
)]
IllegalArgument { message: String },

#[snafu(
visibility(pub(crate)),
display("Fluss hitting invalid partition error {}.", message)
)]
InvalidPartition { message: String },

#[snafu(
visibility(pub(crate)),
display("Fluss hitting IO not supported error {}.", message)
Expand Down
Loading
Loading