2 changes: 1 addition & 1 deletion bindings/cpp/src/types.rs
@@ -209,7 +209,7 @@ pub fn empty_table_info() -> ffi::FfiTableInfo {
 pub fn ffi_row_to_core(row: &ffi::FfiGenericRow) -> fcore::row::GenericRow<'_> {
     use fcore::row::Datum;
 
-    let mut generic_row = fcore::row::GenericRow::new();
+    let mut generic_row = fcore::row::GenericRow::new(row.fields.len());
 
     for (idx, field) in row.fields.iter().enumerate() {
         let datum = match field.datum_type {
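Note on the change above: `GenericRow::new` now takes the field count, as in `GenericRow::new(row.fields.len())`. A minimal standalone sketch of the new call shape, with the crate aliased as `fcore` as in this diff; whether the argument preallocates capacity or fixes the row width is an assumption here, only the argument itself is taken from the PR:

    use fcore::row::GenericRow;

    // Build a three-column row; the constructor argument matches the
    // schema width instead of starting from an empty row.
    fn build_row() -> GenericRow<'static> {
        let mut row = GenericRow::new(3);
        row.set_field(0, 42);          // e.g. an INT column
        row.set_field(1, "alice");     // e.g. a STRING column
        row.set_field(2, 1_000_i64);   // e.g. a BIGINT column
        row
    }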
20 changes: 9 additions & 11 deletions bindings/python/src/table.rs
@@ -259,13 +259,13 @@ impl AppendWriter {
         // Get the expected Arrow schema from the Fluss table
         let row_type = self.table_info.get_row_type();
         let expected_schema = fcore::record::to_arrow_schema(row_type)
-            .map_err(|e| FlussError::new_err(format!("Failed to get table schema: {}", e)))?;
+            .map_err(|e| FlussError::new_err(format!("Failed to get table schema: {e}")))?;
 
         // Convert Arrow schema to PyArrow schema
         let py_schema = expected_schema
             .as_ref()
             .to_pyarrow(py)
-            .map_err(|e| FlussError::new_err(format!("Failed to convert schema: {}", e)))?;
+            .map_err(|e| FlussError::new_err(format!("Failed to convert schema: {e}")))?;
 
         // Import pyarrow module
         let pyarrow = py.import("pyarrow")?;
@@ -570,13 +570,12 @@ fn python_decimal_to_datum(
 
     let decimal_str: String = value.str()?.extract()?;
     let bd = bigdecimal::BigDecimal::from_str(&decimal_str).map_err(|e| {
-        FlussError::new_err(format!("Failed to parse decimal '{}': {}", decimal_str, e))
+        FlussError::new_err(format!("Failed to parse decimal '{decimal_str}': {e}"))
     })?;
 
     let decimal = fcore::row::Decimal::from_big_decimal(bd, precision, scale).map_err(|e| {
         FlussError::new_err(format!(
-            "Failed to convert decimal '{}' to DECIMAL({}, {}): {}",
-            decimal_str, precision, scale, e
+            "Failed to convert decimal '{decimal_str}' to DECIMAL({precision}, {scale}): {e}"
         ))
     })?;
 
@@ -641,10 +640,9 @@ fn python_time_to_datum(value: &Bound<PyAny>) -> PyResult<fcore::row::Datum<'sta
     if microsecond % MICROS_PER_MILLI as i32 != 0 {
         return Err(FlussError::new_err(format!(
             "TIME values with sub-millisecond precision are not supported. \
-             Got time with {} microseconds (not divisible by 1000). \
+             Got time with {microsecond} microseconds (not divisible by 1000). \
              Fluss stores TIME as milliseconds since midnight. \
-             Please round to milliseconds before insertion.",
-            microsecond
+             Please round to milliseconds before insertion."
         )));
     }

@@ -663,7 +661,7 @@ fn python_datetime_to_timestamp_ntz(value: &Bound<PyAny>) -> PyResult<fcore::row
     let (epoch_millis, nano_of_milli) = extract_datetime_components_ntz(value)?;
 
     let ts = fcore::row::TimestampNtz::from_millis_nanos(epoch_millis, nano_of_milli)
-        .map_err(|e| FlussError::new_err(format!("Failed to create TimestampNtz: {}", e)))?;
+        .map_err(|e| FlussError::new_err(format!("Failed to create TimestampNtz: {e}")))?;
 
     Ok(fcore::row::Datum::TimestampNtz(ts))
 }
@@ -675,7 +673,7 @@ fn python_datetime_to_timestamp_ltz(value: &Bound<PyAny>) -> PyResult<fcore::row
     let (epoch_millis, nano_of_milli) = extract_datetime_components_ltz(value)?;
 
     let ts = fcore::row::TimestampLtz::from_millis_nanos(epoch_millis, nano_of_milli)
-        .map_err(|e| FlussError::new_err(format!("Failed to create TimestampLtz: {}", e)))?;
+        .map_err(|e| FlussError::new_err(format!("Failed to create TimestampLtz: {e}")))?;
 
     Ok(fcore::row::Datum::TimestampLtz(ts))
 }
@@ -803,7 +801,7 @@ fn datetime_to_epoch_millis_as_utc(
 
     let timestamp = jiff::tz::Offset::UTC
         .to_timestamp(civil_dt)
-        .map_err(|e| FlussError::new_err(format!("Invalid datetime: {}", e)))?;
+        .map_err(|e| FlussError::new_err(format!("Invalid datetime: {e}")))?;
 
     let millis = timestamp.as_millisecond();
     let nano_of_milli = (timestamp.subsec_nanosecond() % NANOS_PER_MILLI as i32) as i32;
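Most of this file's churn, like the rest of the PR, is the same mechanical migration: positional `format!` arguments become inlined captures (`format!("{}", e)` becomes `format!("{e}")`), supported since Rust 1.58. Only bare identifiers can be captured; format specs still apply, but expressions still need explicit arguments. A standalone illustration:

    fn demo() {
        let retry_count = 3;
        let e = "connection reset";

        // Before: positional arguments.
        let old = format!("failed after {} retries: {}", retry_count, e);
        // After: inlined captures - identical output.
        let new = format!("failed after {retry_count} retries: {e}");
        assert_eq!(old, new);

        // Format specs combine with captures, e.g. Debug formatting:
        let dbg = format!("last error: {e:?}");
        assert_eq!(dbg, "last error: \"connection reset\"");

        // But expressions cannot be inlined; this still needs an argument:
        let lens = format!("len = {}", e.len());
        assert_eq!(lens, "len = 16");
    }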
13 changes: 6 additions & 7 deletions crates/examples/src/example_kv_table.rs
@@ -58,7 +58,7 @@ pub async fn main() -> Result<()> {
 
     println!("\n=== Upserting ===");
     for (id, name, age) in [(1, "Verso", 32i64), (2, "Noco", 25), (3, "Esquie", 35)] {
-        let mut row = GenericRow::new();
+        let mut row = GenericRow::new(3);
         row.set_field(0, id);
         row.set_field(1, name);
         row.set_field(2, age);
@@ -80,7 +80,7 @@ pub async fn main() -> Result<()> {
     }
 
     println!("\n=== Updating ===");
-    let mut row = GenericRow::new();
+    let mut row = GenericRow::new(3);
     row.set_field(0, 1);
     row.set_field(1, "Verso");
     row.set_field(2, 33i64);
@@ -96,12 +96,11 @@
     );
 
     println!("\n=== Deleting ===");
-    let mut row = GenericRow::new();
+    // For delete, only primary key field needs to be set; other fields can remain null
+    let mut row = GenericRow::new(3);
     row.set_field(0, 2);
-    row.set_field(1, "");
-    row.set_field(2, 0i64);
     upsert_writer.delete(&row).await?;
-    println!("Deleted: {row:?}");
+    println!("Deleted row with id=2");
 
     let result = lookuper.lookup(&make_key(2)).await?;
     if result.get_single_row()?.is_none() {
@@ -112,7 +111,7 @@
 }
 
 fn make_key(id: i32) -> GenericRow<'static> {
-    let mut row = GenericRow::new();
+    let mut row = GenericRow::new(1);
     row.set_field(0, id);
     row
 }
4 changes: 2 additions & 2 deletions crates/examples/src/example_table.rs
@@ -56,15 +56,15 @@ pub async fn main() -> Result<()> {
     print!("Get created table:\n {table_info}\n");
 
     // write row
-    let mut row = GenericRow::new();
+    let mut row = GenericRow::new(3);
     row.set_field(0, 22222);
     row.set_field(1, "t2t");
     row.set_field(2, 123_456_789_123i64);
 
     let table = conn.get_table(&table_path).await?;
     let append_writer = table.new_append()?.create_writer();
     let f1 = append_writer.append(row);
-    row = GenericRow::new();
+    row = GenericRow::new(3);
     row.set_field(0, 233333);
     row.set_field(1, "tt44");
     row.set_field(2, 987_654_321_987i64);
4 changes: 2 additions & 2 deletions crates/fluss/src/client/table/log_fetch_buffer.rs
@@ -800,7 +800,7 @@ impl PendingFetch for RemotePendingFetch {
         let pos = self.pos_in_log_segment as usize;
         if pos >= file_size {
             return Err(Error::UnexpectedError {
-                message: format!("Position {} exceeds file size {}", pos, file_size),
+                message: format!("Position {pos} exceeds file size {file_size}"),
                 source: None,
             });
         }
@@ -911,7 +911,7 @@ mod tests {
             },
         )?;
 
-        let mut row = GenericRow::new();
+        let mut row = GenericRow::new(2);
         row.set_field(0, 1_i32);
         row.set_field(1, "alice");
         let record = WriteRecord::for_append(table_path, 1, row);
15 changes: 7 additions & 8 deletions crates/fluss/src/client/table/remote_log.rs
@@ -469,7 +469,7 @@ async fn spawn_download_task(
                 result_sender: request.result_sender,
             }
         }
-        Err(e) if request.result_sender.is_closed() => {
+        Err(_e) if request.result_sender.is_closed() => {
             // Receiver dropped (cancelled) - release permit, don't re-queue
             drop(permit);
             DownloadResult::Cancelled
@@ -491,8 +491,7 @@
             DownloadResult::FailedPermanently {
                 error: Error::UnexpectedError {
                     message: format!(
-                        "Failed to download remote log segment after {} retries: {}",
-                        retry_count, e
+                        "Failed to download remote log segment after {retry_count} retries: {e}"
                     ),
                     source: Some(Box::new(e)),
                 },
@@ -585,7 +584,7 @@ async fn coordinator_loop(
             // Cancelled - permit already released, nothing to do
         }
         Err(e) => {
-            log::error!("Download task panicked: {:?}", e);
+            log::error!("Download task panicked: {e:?}");
             // Permit already released via RAII
         }
     }
@@ -1001,7 +1000,7 @@ mod tests {
 
             if should_fail {
                 Err(Error::UnexpectedError {
-                    message: format!("Fake fetch failed for {}", segment_id),
+                    message: format!("Fake fetch failed for {segment_id}"),
                     source: None,
                 })
             } else {
@@ -1012,7 +1011,7 @@
                     .unwrap()
                     .as_nanos();
                 let file_path =
-                    temp_dir.join(format!("fake_segment_{}_{}.log", segment_id, timestamp));
+                    temp_dir.join(format!("fake_segment_{segment_id}_{timestamp}.log"));
                 tokio::fs::write(&file_path, &fake_data).await?;
 
                 Ok(FetchResult {
@@ -1121,7 +1120,7 @@ mod tests {
 
         // Request 4 segments with same priority (to isolate concurrency limiting from priority)
         let segs: Vec<_> = (0..4)
-            .map(|i| create_segment(&format!("seg{}", i), i * 100, 1000, bucket.clone()))
+            .map(|i| create_segment(&format!("seg{i}"), i * 100, 1000, bucket.clone()))
            .collect();
 
         let _futures: Vec<_> = segs
@@ -1168,7 +1167,7 @@ mod tests {
 
         // Request 4 downloads
         let segs: Vec<_> = (0..4)
-            .map(|i| create_segment(&format!("seg{}", i), i * 100, 1000, bucket.clone()))
+            .map(|i| create_segment(&format!("seg{i}"), i * 100, 1000, bucket.clone()))
             .collect();
 
         let mut futures: Vec<_> = segs
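On the `Err(_e) if request.result_sender.is_closed()` guard above: a dropped receiver means the caller no longer wants the segment, so the task gives up instead of retrying or re-queuing. A minimal sketch of that pattern with a plain tokio oneshot channel (illustrative names and types, not this crate's actual API):

    use tokio::sync::oneshot;

    enum Outcome {
        Delivered,
        Cancelled,
    }

    fn finish_download(
        result: Result<Vec<u8>, std::io::Error>,
        tx: oneshot::Sender<Result<Vec<u8>, std::io::Error>>,
    ) -> Outcome {
        // Receiver dropped: the download is moot, so don't retry or
        // re-queue - just let the permit/resources go.
        if result.is_err() && tx.is_closed() {
            return Outcome::Cancelled;
        }
        // Otherwise hand the result over; send() only errors if the
        // receiver vanished between the check and the send.
        match tx.send(result) {
            Ok(()) => Outcome::Delivered,
            Err(_) => Outcome::Cancelled,
        }
    }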
6 changes: 2 additions & 4 deletions crates/fluss/src/client/table/upsert.rs
@@ -232,8 +232,7 @@ impl UpsertWriterFactory {
             None => {
                 return Err(IllegalArgument {
                     message: format!(
-                        "The specified primary key {} is not in row type {}",
-                        primary_key, row_type
+                        "The specified primary key {primary_key} is not in row type {row_type}"
                     ),
                 });
             }
@@ -250,8 +249,7 @@
         if target_column_set[index] {
             return Err(IllegalArgument {
                 message: format!(
-                    "Explicitly specifying values for the auto increment column {} is not allowed.",
-                    auto_increment_col_name
+                    "Explicitly specifying values for the auto increment column {auto_increment_col_name} is not allowed."
                 ),
             });
         }
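For context on `target_column_set[index]` above: the factory appears to flag each user-specified target column in a boolean set and reject writes that explicitly name the auto-increment column. A simplified, self-contained sketch of that validation (hypothetical helper, not the actual UpsertWriterFactory code):

    /// Reject target columns that collide with an auto-increment column.
    /// `auto_increment_idx` is the schema index of the auto-increment column.
    fn check_target_columns(
        num_columns: usize,
        target_columns: &[usize],
        auto_increment_idx: usize,
    ) -> Result<(), String> {
        let mut target_column_set = vec![false; num_columns];
        for &idx in target_columns {
            target_column_set[idx] = true;
        }
        if target_column_set[auto_increment_idx] {
            return Err(
                "Explicitly specifying values for the auto increment column is not allowed."
                    .to_string(),
            );
        }
        Ok(())
    }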
12 changes: 4 additions & 8 deletions crates/fluss/src/metadata/json_serde.rs
@@ -205,7 +205,7 @@ impl JsonSerde for DataType {
                 DataType::Decimal(
                     crate::metadata::datatype::DecimalType::with_nullable(true, precision, scale)
                         .map_err(|e| Error::JsonSerdeError {
-                            message: format!("Invalid DECIMAL parameters: {}", e),
+                            message: format!("Invalid DECIMAL parameters: {e}"),
                         })?,
                 )
             }
@@ -218,7 +218,7 @@
                 DataType::Time(
                     crate::metadata::datatype::TimeType::with_nullable(true, precision).map_err(
                         |e| Error::JsonSerdeError {
-                            message: format!("Invalid TIME_WITHOUT_TIME_ZONE precision: {}", e),
+                            message: format!("Invalid TIME_WITHOUT_TIME_ZONE precision: {e}"),
                         },
                     )?,
                 )
@@ -231,10 +231,7 @@
                 DataType::Timestamp(
                     crate::metadata::datatype::TimestampType::with_nullable(true, precision)
                         .map_err(|e| Error::JsonSerdeError {
-                            message: format!(
-                                "Invalid TIMESTAMP_WITHOUT_TIME_ZONE precision: {}",
-                                e
-                            ),
+                            message: format!("Invalid TIMESTAMP_WITHOUT_TIME_ZONE precision: {e}"),
                         })?,
                 )
             }
@@ -247,8 +244,7 @@
                     crate::metadata::datatype::TimestampLTzType::with_nullable(true, precision)
                         .map_err(|e| Error::JsonSerdeError {
                             message: format!(
-                                "Invalid TIMESTAMP_WITH_LOCAL_TIME_ZONE precision: {}",
-                                e
+                                "Invalid TIMESTAMP_WITH_LOCAL_TIME_ZONE precision: {e}"
                             ),
                         })?,
                 )
6 changes: 2 additions & 4 deletions crates/fluss/src/metadata/partition.rs
@@ -131,8 +131,7 @@ impl ResolvedPartitionSpec {
             if parts.len() != 2 {
                 return Err(Error::IllegalArgument {
                     message: format!(
-                        "Invalid partition name format. Expected key=value, got: {}",
-                        pair
+                        "Invalid partition name format. Expected key=value, got: {pair}"
                     ),
                 });
             }
@@ -199,8 +198,7 @@ impl ResolvedPartitionSpec {
                 None => {
                     return Err(Error::IllegalArgument {
                         message: format!(
-                            "table does not contain partitionKey: {}",
-                            other_partition_key
+                            "table does not contain partitionKey: {other_partition_key}"
                        ),
                     });
                 }
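The first hunk above validates partition names of the form `key=value`. A standalone sketch of that parse step (a hypothetical helper, not the crate's API; the `/` separator between multiple key=value pairs is an assumption):

    /// Parse a partition name like "dt=2024-01-01/region=eu" into (key, value) pairs.
    fn parse_partition_name(name: &str) -> Result<Vec<(&str, &str)>, String> {
        name.split('/')
            .map(|pair| {
                let parts: Vec<&str> = pair.split('=').collect();
                if parts.len() != 2 {
                    // Mirrors the error raised in the hunk above.
                    return Err(format!(
                        "Invalid partition name format. Expected key=value, got: {pair}"
                    ));
                }
                Ok((parts[0], parts[1]))
            })
            .collect()
    }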
3 changes: 1 addition & 2 deletions crates/fluss/src/metadata/table.rs
@@ -227,8 +227,7 @@ impl SchemaBuilder {
         if !column_names.contains(auto_inc_col) {
             return Err(IllegalArgument {
                 message: format!(
-                    "Auto increment column '{}' is not found in the schema columns.",
-                    auto_inc_col
+                    "Auto increment column '{auto_inc_col}' is not found in the schema columns."
                 ),
             });
         }