From ee6e40a7a8ec11907299a82c5fd2c00581314474 Mon Sep 17 00:00:00 2001 From: luoyuxia Date: Sun, 25 Jan 2026 17:03:40 +0800 Subject: [PATCH 1/2] chore: introduce new with capacity to align generic row with java side --- bindings/cpp/src/types.rs | 2 +- bindings/python/src/table.rs | 20 ++--- crates/examples/src/example_kv_table.rs | 13 ++- crates/examples/src/example_table.rs | 4 +- .../src/client/table/log_fetch_buffer.rs | 4 +- crates/fluss/src/client/table/remote_log.rs | 15 ++-- crates/fluss/src/client/table/upsert.rs | 6 +- crates/fluss/src/metadata/json_serde.rs | 12 +-- crates/fluss/src/metadata/partition.rs | 6 +- crates/fluss/src/metadata/table.rs | 3 +- crates/fluss/src/record/arrow.rs | 36 +++----- crates/fluss/src/row/column.rs | 5 +- .../src/row/compacted/compacted_row_reader.rs | 7 +- crates/fluss/src/row/datum.rs | 48 ++++------- crates/fluss/src/row/decimal.rs | 15 ++-- crates/fluss/src/row/mod.rs | 86 ++++++++++++++----- crates/fluss/tests/integration/kv_table.rs | 34 ++++---- .../tests/integration/table_remote_scan.rs | 2 +- 18 files changed, 152 insertions(+), 166 deletions(-) diff --git a/bindings/cpp/src/types.rs b/bindings/cpp/src/types.rs index fef73cea..726e3d12 100644 --- a/bindings/cpp/src/types.rs +++ b/bindings/cpp/src/types.rs @@ -209,7 +209,7 @@ pub fn empty_table_info() -> ffi::FfiTableInfo { pub fn ffi_row_to_core(row: &ffi::FfiGenericRow) -> fcore::row::GenericRow<'_> { use fcore::row::Datum; - let mut generic_row = fcore::row::GenericRow::new(); + let mut generic_row = fcore::row::GenericRow::new(row.fields.len()); for (idx, field) in row.fields.iter().enumerate() { let datum = match field.datum_type { diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs index b56a29db..0ae71864 100644 --- a/bindings/python/src/table.rs +++ b/bindings/python/src/table.rs @@ -259,13 +259,13 @@ impl AppendWriter { // Get the expected Arrow schema from the Fluss table let row_type = self.table_info.get_row_type(); let expected_schema = fcore::record::to_arrow_schema(row_type) - .map_err(|e| FlussError::new_err(format!("Failed to get table schema: {}", e)))?; + .map_err(|e| FlussError::new_err(format!("Failed to get table schema: {e}")))?; // Convert Arrow schema to PyArrow schema let py_schema = expected_schema .as_ref() .to_pyarrow(py) - .map_err(|e| FlussError::new_err(format!("Failed to convert schema: {}", e)))?; + .map_err(|e| FlussError::new_err(format!("Failed to convert schema: {e}")))?; // Import pyarrow module let pyarrow = py.import("pyarrow")?; @@ -570,13 +570,12 @@ fn python_decimal_to_datum( let decimal_str: String = value.str()?.extract()?; let bd = bigdecimal::BigDecimal::from_str(&decimal_str).map_err(|e| { - FlussError::new_err(format!("Failed to parse decimal '{}': {}", decimal_str, e)) + FlussError::new_err(format!("Failed to parse decimal '{decimal_str}': {e}")) })?; let decimal = fcore::row::Decimal::from_big_decimal(bd, precision, scale).map_err(|e| { FlussError::new_err(format!( - "Failed to convert decimal '{}' to DECIMAL({}, {}): {}", - decimal_str, precision, scale, e + "Failed to convert decimal '{decimal_str}' to DECIMAL({precision}, {scale}): {e}" )) })?; @@ -641,10 +640,9 @@ fn python_time_to_datum(value: &Bound) -> PyResult) -> PyResult) -> PyResult Result<()> { println!("\n=== Upserting ==="); for (id, name, age) in [(1, "Verso", 32i64), (2, "Noco", 25), (3, "Esquie", 35)] { - let mut row = GenericRow::new(); + let mut row = GenericRow::new(3); row.set_field(0, id); row.set_field(1, name); row.set_field(2, age); @@ -80,7 
+80,7 @@ pub async fn main() -> Result<()> { } println!("\n=== Updating ==="); - let mut row = GenericRow::new(); + let mut row = GenericRow::new(3); row.set_field(0, 1); row.set_field(1, "Verso"); row.set_field(2, 33i64); @@ -96,12 +96,11 @@ pub async fn main() -> Result<()> { ); println!("\n=== Deleting ==="); - let mut row = GenericRow::new(); + // For delete, only primary key field needs to be set; other fields can remain null + let mut row = GenericRow::new(3); row.set_field(0, 2); - row.set_field(1, ""); - row.set_field(2, 0i64); upsert_writer.delete(&row).await?; - println!("Deleted: {row:?}"); + println!("Deleted row with id=2"); let result = lookuper.lookup(&make_key(2)).await?; if result.get_single_row()?.is_none() { @@ -112,7 +111,7 @@ pub async fn main() -> Result<()> { } fn make_key(id: i32) -> GenericRow<'static> { - let mut row = GenericRow::new(); + let mut row = GenericRow::new(1); row.set_field(0, id); row } diff --git a/crates/examples/src/example_table.rs b/crates/examples/src/example_table.rs index 7333056f..ca6b9428 100644 --- a/crates/examples/src/example_table.rs +++ b/crates/examples/src/example_table.rs @@ -56,7 +56,7 @@ pub async fn main() -> Result<()> { print!("Get created table:\n {table_info}\n"); // write row - let mut row = GenericRow::new(); + let mut row = GenericRow::new(3); row.set_field(0, 22222); row.set_field(1, "t2t"); row.set_field(2, 123_456_789_123i64); @@ -64,7 +64,7 @@ pub async fn main() -> Result<()> { let table = conn.get_table(&table_path).await?; let append_writer = table.new_append()?.create_writer(); let f1 = append_writer.append(row); - row = GenericRow::new(); + row = GenericRow::new(3); row.set_field(0, 233333); row.set_field(1, "tt44"); row.set_field(2, 987_654_321_987i64); diff --git a/crates/fluss/src/client/table/log_fetch_buffer.rs b/crates/fluss/src/client/table/log_fetch_buffer.rs index b529806f..7ece34b4 100644 --- a/crates/fluss/src/client/table/log_fetch_buffer.rs +++ b/crates/fluss/src/client/table/log_fetch_buffer.rs @@ -800,7 +800,7 @@ impl PendingFetch for RemotePendingFetch { let pos = self.pos_in_log_segment as usize; if pos >= file_size { return Err(Error::UnexpectedError { - message: format!("Position {} exceeds file size {}", pos, file_size), + message: format!("Position {pos} exceeds file size {file_size}"), source: None, }); } @@ -911,7 +911,7 @@ mod tests { }, )?; - let mut row = GenericRow::new(); + let mut row = GenericRow::new(2); row.set_field(0, 1_i32); row.set_field(1, "alice"); let record = WriteRecord::for_append(table_path, 1, row); diff --git a/crates/fluss/src/client/table/remote_log.rs b/crates/fluss/src/client/table/remote_log.rs index c39056db..a2e19d49 100644 --- a/crates/fluss/src/client/table/remote_log.rs +++ b/crates/fluss/src/client/table/remote_log.rs @@ -469,7 +469,7 @@ async fn spawn_download_task( result_sender: request.result_sender, } } - Err(e) if request.result_sender.is_closed() => { + Err(_e) if request.result_sender.is_closed() => { // Receiver dropped (cancelled) - release permit, don't re-queue drop(permit); DownloadResult::Cancelled @@ -491,8 +491,7 @@ async fn spawn_download_task( DownloadResult::FailedPermanently { error: Error::UnexpectedError { message: format!( - "Failed to download remote log segment after {} retries: {}", - retry_count, e + "Failed to download remote log segment after {retry_count} retries: {e}" ), source: Some(Box::new(e)), }, @@ -585,7 +584,7 @@ async fn coordinator_loop( // Cancelled - permit already released, nothing to do } Err(e) => { - 
log::error!("Download task panicked: {:?}", e); + log::error!("Download task panicked: {e:?}"); // Permit already released via RAII } } @@ -1001,7 +1000,7 @@ mod tests { if should_fail { Err(Error::UnexpectedError { - message: format!("Fake fetch failed for {}", segment_id), + message: format!("Fake fetch failed for {segment_id}"), source: None, }) } else { @@ -1012,7 +1011,7 @@ mod tests { .unwrap() .as_nanos(); let file_path = - temp_dir.join(format!("fake_segment_{}_{}.log", segment_id, timestamp)); + temp_dir.join(format!("fake_segment_{segment_id}_{timestamp}.log")); tokio::fs::write(&file_path, &fake_data).await?; Ok(FetchResult { @@ -1121,7 +1120,7 @@ mod tests { // Request 4 segments with same priority (to isolate concurrency limiting from priority) let segs: Vec<_> = (0..4) - .map(|i| create_segment(&format!("seg{}", i), i * 100, 1000, bucket.clone())) + .map(|i| create_segment(&format!("seg{i}"), i * 100, 1000, bucket.clone())) .collect(); let _futures: Vec<_> = segs @@ -1168,7 +1167,7 @@ mod tests { // Request 4 downloads let segs: Vec<_> = (0..4) - .map(|i| create_segment(&format!("seg{}", i), i * 100, 1000, bucket.clone())) + .map(|i| create_segment(&format!("seg{i}"), i * 100, 1000, bucket.clone())) .collect(); let mut futures: Vec<_> = segs diff --git a/crates/fluss/src/client/table/upsert.rs b/crates/fluss/src/client/table/upsert.rs index a3909e72..984592d0 100644 --- a/crates/fluss/src/client/table/upsert.rs +++ b/crates/fluss/src/client/table/upsert.rs @@ -232,8 +232,7 @@ impl UpsertWriterFactory { None => { return Err(IllegalArgument { message: format!( - "The specified primary key {} is not in row type {}", - primary_key, row_type + "The specified primary key {primary_key} is not in row type {row_type}" ), }); } @@ -250,8 +249,7 @@ impl UpsertWriterFactory { if target_column_set[index] { return Err(IllegalArgument { message: format!( - "Explicitly specifying values for the auto increment column {} is not allowed.", - auto_increment_col_name + "Explicitly specifying values for the auto increment column {auto_increment_col_name} is not allowed." 
), }); } diff --git a/crates/fluss/src/metadata/json_serde.rs b/crates/fluss/src/metadata/json_serde.rs index faa5583b..d0d56ef2 100644 --- a/crates/fluss/src/metadata/json_serde.rs +++ b/crates/fluss/src/metadata/json_serde.rs @@ -205,7 +205,7 @@ impl JsonSerde for DataType { DataType::Decimal( crate::metadata::datatype::DecimalType::with_nullable(true, precision, scale) .map_err(|e| Error::JsonSerdeError { - message: format!("Invalid DECIMAL parameters: {}", e), + message: format!("Invalid DECIMAL parameters: {e}"), })?, ) } @@ -218,7 +218,7 @@ impl JsonSerde for DataType { DataType::Time( crate::metadata::datatype::TimeType::with_nullable(true, precision).map_err( |e| Error::JsonSerdeError { - message: format!("Invalid TIME_WITHOUT_TIME_ZONE precision: {}", e), + message: format!("Invalid TIME_WITHOUT_TIME_ZONE precision: {e}"), }, )?, ) @@ -231,10 +231,7 @@ impl JsonSerde for DataType { DataType::Timestamp( crate::metadata::datatype::TimestampType::with_nullable(true, precision) .map_err(|e| Error::JsonSerdeError { - message: format!( - "Invalid TIMESTAMP_WITHOUT_TIME_ZONE precision: {}", - e - ), + message: format!("Invalid TIMESTAMP_WITHOUT_TIME_ZONE precision: {e}"), })?, ) } @@ -247,8 +244,7 @@ impl JsonSerde for DataType { crate::metadata::datatype::TimestampLTzType::with_nullable(true, precision) .map_err(|e| Error::JsonSerdeError { message: format!( - "Invalid TIMESTAMP_WITH_LOCAL_TIME_ZONE precision: {}", - e + "Invalid TIMESTAMP_WITH_LOCAL_TIME_ZONE precision: {e}" ), })?, ) diff --git a/crates/fluss/src/metadata/partition.rs b/crates/fluss/src/metadata/partition.rs index 1ecc0dcd..e40fbf9e 100644 --- a/crates/fluss/src/metadata/partition.rs +++ b/crates/fluss/src/metadata/partition.rs @@ -131,8 +131,7 @@ impl ResolvedPartitionSpec { if parts.len() != 2 { return Err(Error::IllegalArgument { message: format!( - "Invalid partition name format. Expected key=value, got: {}", - pair + "Invalid partition name format. Expected key=value, got: {pair}" ), }); } @@ -199,8 +198,7 @@ impl ResolvedPartitionSpec { None => { return Err(Error::IllegalArgument { message: format!( - "table does not contain partitionKey: {}", - other_partition_key + "table does not contain partitionKey: {other_partition_key}" ), }); } diff --git a/crates/fluss/src/metadata/table.rs b/crates/fluss/src/metadata/table.rs index c4a91954..3b9da7d9 100644 --- a/crates/fluss/src/metadata/table.rs +++ b/crates/fluss/src/metadata/table.rs @@ -227,8 +227,7 @@ impl SchemaBuilder { if !column_names.contains(auto_inc_col) { return Err(IllegalArgument { message: format!( - "Auto increment column '{}' is not found in the schema columns.", - auto_inc_col + "Auto increment column '{auto_inc_col}' is not found in the schema columns." 
), }); } diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs index 4bfdc71c..63df6de6 100644 --- a/crates/fluss/src/record/arrow.rs +++ b/crates/fluss/src/record/arrow.rs @@ -107,7 +107,7 @@ fn validate_batch_size(batch_size_bytes: i32) -> Result { // Check for negative size (corrupted data) if batch_size_bytes < 0 { return Err(Error::UnexpectedError { - message: format!("Invalid negative batch size: {}", batch_size_bytes), + message: format!("Invalid negative batch size: {batch_size_bytes}"), source: None, }); } @@ -120,8 +120,7 @@ fn validate_batch_size(batch_size_bytes: i32) -> Result { .checked_add(LOG_OVERHEAD) .ok_or_else(|| Error::UnexpectedError { message: format!( - "Batch size {} + LOG_OVERHEAD {} would overflow", - batch_size_u, LOG_OVERHEAD + "Batch size {batch_size_u} + LOG_OVERHEAD {LOG_OVERHEAD} would overflow" ), source: None, })?; @@ -130,8 +129,7 @@ fn validate_batch_size(batch_size_bytes: i32) -> Result { if total_size > MAX_BATCH_SIZE { return Err(Error::UnexpectedError { message: format!( - "Batch size {} exceeds maximum allowed size {}", - total_size, MAX_BATCH_SIZE + "Batch size {total_size} exceeds maximum allowed size {MAX_BATCH_SIZE}" ), source: None, }); @@ -259,8 +257,7 @@ impl RowAppendRecordBatchBuilder { .with_precision_and_scale(*precision, *scale) .map_err(|e| Error::IllegalArgument { message: format!( - "Invalid decimal precision {} or scale {}: {}", - precision, scale, e + "Invalid decimal precision {precision} or scale {scale}: {e}" ), })?; Ok(Box::new(builder)) @@ -273,8 +270,7 @@ impl RowAppendRecordBatchBuilder { } _ => Err(Error::IllegalArgument { message: format!( - "Time32 only supports Second and Millisecond units, got: {:?}", - unit + "Time32 only supports Second and Millisecond units, got: {unit:?}" ), }), }, @@ -285,8 +281,7 @@ impl RowAppendRecordBatchBuilder { arrow_schema::TimeUnit::Nanosecond => Ok(Box::new(Time64NanosecondBuilder::new())), _ => Err(Error::IllegalArgument { message: format!( - "Time64 only supports Microsecond and Nanosecond units, got: {:?}", - unit + "Time64 only supports Microsecond and Nanosecond units, got: {unit:?}" ), }), }, @@ -592,10 +587,7 @@ impl FileSource { // Validate base_offset to prevent underflow in total_size() if base_offset > file_size { return Err(Error::UnexpectedError { - message: format!( - "base_offset ({}) exceeds file_size ({})", - base_offset, file_size - ), + message: format!("base_offset ({base_offset}) exceeds file_size ({file_size})"), source: None, }); } @@ -1044,7 +1036,7 @@ pub fn to_arrow_type(fluss_type: &DataType) -> Result { 7..=9 => ArrowDataType::Time64(arrow_schema::TimeUnit::Nanosecond), invalid => { return Err(Error::IllegalArgument { - message: format!("Invalid precision {} for TimeType (must be 0-9)", invalid), + message: format!("Invalid precision {invalid} for TimeType (must be 0-9)"), }); } }, @@ -1055,10 +1047,7 @@ pub fn to_arrow_type(fluss_type: &DataType) -> Result { 7..=9 => ArrowDataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, None), invalid => { return Err(Error::IllegalArgument { - message: format!( - "Invalid precision {} for TimestampType (must be 0-9)", - invalid - ), + message: format!("Invalid precision {invalid} for TimestampType (must be 0-9)"), }); } }, @@ -1070,8 +1059,7 @@ pub fn to_arrow_type(fluss_type: &DataType) -> Result { invalid => { return Err(Error::IllegalArgument { message: format!( - "Invalid precision {} for TimestampLTzType (must be 0-9)", - invalid + "Invalid precision {invalid} for TimestampLTzType (must 
be 0-9)" ), }); } @@ -1939,13 +1927,13 @@ mod tests { }, )?; - let mut row = GenericRow::new(); + let mut row = GenericRow::new(2); row.set_field(0, 1_i32); row.set_field(1, "alice"); let record = WriteRecord::for_append(table_path.clone(), 1, row); builder.append(&record)?; - let mut row2 = GenericRow::new(); + let mut row2 = GenericRow::new(2); row2.set_field(0, 2_i32); row2.set_field(1, "bob"); let record2 = WriteRecord::for_append(table_path, 2, row2); diff --git a/crates/fluss/src/row/column.rs b/crates/fluss/src/row/column.rs index 615e0384..46c25b24 100644 --- a/crates/fluss/src/row/column.rs +++ b/crates/fluss/src/row/column.rs @@ -199,10 +199,7 @@ impl InternalRow for ColumnarRow { let field = schema.field(pos); let arrow_scale = match field.data_type() { DataType::Decimal128(_p, s) => *s as i64, - dt => panic!( - "Expected Decimal128 data type at column {}, found: {:?}", - pos, dt - ), + dt => panic!("Expected Decimal128 data type at column {pos}, found: {dt:?}"), }; let i128_val = array.value(self.row_id); diff --git a/crates/fluss/src/row/compacted/compacted_row_reader.rs b/crates/fluss/src/row/compacted/compacted_row_reader.rs index 40470db1..00e53aa1 100644 --- a/crates/fluss/src/row/compacted/compacted_row_reader.rs +++ b/crates/fluss/src/row/compacted/compacted_row_reader.rs @@ -50,7 +50,7 @@ impl<'a> CompactedRowDeserializer<'a> { } pub fn deserialize(&self, reader: &CompactedRowReader<'a>) -> GenericRow<'a> { - let mut row = GenericRow::new(); + let mut row = GenericRow::new(self.row_type.fields().len()); let mut cursor = reader.initial_position(); for (col_pos, data_field) in self.row_type.fields().iter().enumerate() { let dtype = &data_field.data_type; @@ -161,10 +161,7 @@ impl<'a> CompactedRowDeserializer<'a> { } } _ => { - panic!( - "Unsupported DataType in CompactedRowDeserializer: {:?}", - dtype - ); + panic!("Unsupported DataType in CompactedRowDeserializer: {dtype:?}"); } }; cursor = next_cursor; diff --git a/crates/fluss/src/row/datum.rs b/crates/fluss/src/row/datum.rs index 7b3850f8..b8083730 100644 --- a/crates/fluss/src/row/datum.rs +++ b/crates/fluss/src/row/datum.rs @@ -407,8 +407,7 @@ fn millis_nanos_to_micros(millis: i64, nanos: i32) -> Result { .checked_mul(MICROS_PER_MILLI) .ok_or_else(|| RowConvertError { message: format!( - "Timestamp milliseconds {} overflows when converting to microseconds", - millis + "Timestamp milliseconds {millis} overflows when converting to microseconds" ), })?; let nanos_micros = (nanos as i64) / MICROS_PER_MILLI; @@ -416,8 +415,7 @@ fn millis_nanos_to_micros(millis: i64, nanos: i32) -> Result { .checked_add(nanos_micros) .ok_or_else(|| RowConvertError { message: format!( - "Timestamp overflow when adding microseconds: {} + {}", - millis_micros, nanos_micros + "Timestamp overflow when adding microseconds: {millis_micros} + {nanos_micros}" ), }) } @@ -429,16 +427,14 @@ fn millis_nanos_to_nanos(millis: i64, nanos: i32) -> Result { .checked_mul(NANOS_PER_MILLI) .ok_or_else(|| RowConvertError { message: format!( - "Timestamp milliseconds {} overflows when converting to nanoseconds", - millis + "Timestamp milliseconds {millis} overflows when converting to nanoseconds" ), })?; millis_nanos .checked_add(nanos as i64) .ok_or_else(|| RowConvertError { message: format!( - "Timestamp overflow when adding nanoseconds: {} + {}", - millis_nanos, nanos + "Timestamp overflow when adding nanoseconds: {millis_nanos} + {nanos}" ), }) } @@ -504,10 +500,7 @@ impl Datum<'_> { arrow_schema::DataType::Decimal128(p, s) => (*p, *s), _ => { return 
Err(RowConvertError { - message: format!( - "Expected Decimal128 Arrow type, got: {:?}", - data_type - ), + message: format!("Expected Decimal128 Arrow type, got: {data_type:?}"), }); } }; @@ -515,7 +508,7 @@ impl Datum<'_> { // Validate scale is non-negative (Fluss doesn't support negative scales) if s < 0 { return Err(RowConvertError { - message: format!("Negative decimal scale {} is not supported", s), + message: format!("Negative decimal scale {s} is not supported"), }); } @@ -535,8 +528,7 @@ impl Datum<'_> { if actual_precision > target_precision as usize { return Err(RowConvertError { message: format!( - "Decimal precision overflow: value has {} digits but Arrow expects {} (value: {})", - actual_precision, target_precision, rescaled + "Decimal precision overflow: value has {actual_precision} digits but Arrow expects {target_precision} (value: {rescaled})" ), }); } @@ -546,7 +538,7 @@ impl Datum<'_> { Ok(v) => v, Err(_) => { return Err(RowConvertError { - message: format!("Decimal value exceeds i128 range: {}", rescaled), + message: format!("Decimal value exceeds i128 range: {rescaled}"), }); } }; @@ -575,8 +567,7 @@ impl Datum<'_> { if millis % MILLIS_PER_SECOND as i32 != 0 { return Err(RowConvertError { message: format!( - "Time value {} ms has sub-second precision but schema expects seconds only", - millis + "Time value {millis} ms has sub-second precision but schema expects seconds only" ), }); } @@ -602,8 +593,7 @@ impl Datum<'_> { .checked_mul(MICROS_PER_MILLI) .ok_or_else(|| RowConvertError { message: format!( - "Time value {} ms overflows when converting to microseconds", - millis + "Time value {millis} ms overflows when converting to microseconds" ), })?; b.append_value(micros); @@ -618,8 +608,7 @@ impl Datum<'_> { let nanos = (millis as i64).checked_mul(NANOS_PER_MILLI).ok_or_else( || RowConvertError { message: format!( - "Time value {} ms overflows when converting to nanoseconds", - millis + "Time value {millis} ms overflows when converting to nanoseconds" ), }, )?; @@ -630,8 +619,7 @@ impl Datum<'_> { _ => { return Err(RowConvertError { message: format!( - "Expected Time32/Time64 Arrow type, got: {:?}", - data_type + "Expected Time32/Time64 Arrow type, got: {data_type:?}" ), }); } @@ -808,8 +796,7 @@ impl TimestampNtz { if !(0..=MAX_NANO_OF_MILLISECOND).contains(&nano_of_millisecond) { return Err(crate::error::Error::IllegalArgument { message: format!( - "nanoOfMillisecond must be in range [0, {}], got: {}", - MAX_NANO_OF_MILLISECOND, nano_of_millisecond + "nanoOfMillisecond must be in range [0, {MAX_NANO_OF_MILLISECOND}], got: {nano_of_millisecond}" ), }); } @@ -856,8 +843,7 @@ impl TimestampLtz { if !(0..=MAX_NANO_OF_MILLISECOND).contains(&nano_of_millisecond) { return Err(crate::error::Error::IllegalArgument { message: format!( - "nanoOfMillisecond must be in range [0, {}], got: {}", - MAX_NANO_OF_MILLISECOND, nano_of_millisecond + "nanoOfMillisecond must be in range [0, {MAX_NANO_OF_MILLISECOND}], got: {nano_of_millisecond}" ), }); } @@ -1030,10 +1016,8 @@ mod timestamp_tests { #[test] fn test_timestamp_nanos_out_of_range() { // Test that both TimestampNtz and TimestampLtz reject invalid nanos - let expected_msg = format!( - "nanoOfMillisecond must be in range [0, {}]", - MAX_NANO_OF_MILLISECOND - ); + let expected_msg = + format!("nanoOfMillisecond must be in range [0, {MAX_NANO_OF_MILLISECOND}]"); // Too large (1,000,000 is just beyond the valid range) let result_ntz = TimestampNtz::from_millis_nanos(1000, MAX_NANO_OF_MILLISECOND + 1); diff --git 
a/crates/fluss/src/row/decimal.rs b/crates/fluss/src/row/decimal.rs index b14bde50..fd21b829 100644 --- a/crates/fluss/src/row/decimal.rs +++ b/crates/fluss/src/row/decimal.rs @@ -129,16 +129,14 @@ impl Decimal { // Sanity check that scale matches debug_assert_eq!( exp, scale as i64, - "Scaled decimal exponent ({}) != expected scale ({})", - exp, scale + "Scaled decimal exponent ({exp}) != expected scale ({scale})" ); let actual_precision = Self::compute_precision(&unscaled); if actual_precision > precision as usize { return Err(Error::IllegalArgument { message: format!( - "Decimal precision overflow: value has {} digits but precision is {} (value: {})", - actual_precision, precision, scaled + "Decimal precision overflow: value has {actual_precision} digits but precision is {precision} (value: {scaled})" ), }); } @@ -147,8 +145,7 @@ impl Decimal { let long_val = if precision <= MAX_COMPACT_PRECISION { Some(i64::try_from(&unscaled).map_err(|_| Error::IllegalArgument { message: format!( - "Decimal mantissa exceeds i64 range for compact precision {}: unscaled={} (value={})", - precision, unscaled, scaled + "Decimal mantissa exceeds i64 range for compact precision {precision}: unscaled={unscaled} (value={scaled})" ), })?) } else { @@ -168,8 +165,7 @@ impl Decimal { if precision > MAX_COMPACT_PRECISION { return Err(Error::IllegalArgument { message: format!( - "Precision {} exceeds MAX_COMPACT_PRECISION ({})", - precision, MAX_COMPACT_PRECISION + "Precision {precision} exceeds MAX_COMPACT_PRECISION ({MAX_COMPACT_PRECISION})" ), }); } @@ -178,8 +174,7 @@ impl Decimal { if actual_precision > precision as usize { return Err(Error::IllegalArgument { message: format!( - "Decimal precision overflow: unscaled value has {} digits but precision is {}", - actual_precision, precision + "Decimal precision overflow: unscaled value has {actual_precision} digits but precision is {precision}" ), }); } diff --git a/crates/fluss/src/row/mod.rs b/crates/fluss/src/row/mod.rs index 81a42540..85d50b0d 100644 --- a/crates/fluss/src/row/mod.rs +++ b/crates/fluss/src/row/mod.rs @@ -159,6 +159,23 @@ impl<'a> InternalRow for GenericRow<'a> { self.values.get(_pos).unwrap().try_into().unwrap() } + fn get_float(&self, pos: usize) -> f32 { + self.values.get(pos).unwrap().try_into().unwrap() + } + + fn get_double(&self, pos: usize) -> f64 { + self.values.get(pos).unwrap().try_into().unwrap() + } + + fn get_char(&self, pos: usize, _length: usize) -> &str { + // don't check length, following java client + self.get_string(pos) + } + + fn get_string(&self, pos: usize) -> &str { + self.values.get(pos).unwrap().try_into().unwrap() + } + fn get_decimal(&self, pos: usize, _precision: usize, _scale: usize) -> Decimal { match self.values.get(pos).unwrap() { Datum::Decimal(d) => d.clone(), @@ -196,23 +213,6 @@ impl<'a> InternalRow for GenericRow<'a> { } } - fn get_float(&self, pos: usize) -> f32 { - self.values.get(pos).unwrap().try_into().unwrap() - } - - fn get_double(&self, pos: usize) -> f64 { - self.values.get(pos).unwrap().try_into().unwrap() - } - - fn get_char(&self, pos: usize, _length: usize) -> &str { - // don't check length, following java client - self.get_string(pos) - } - - fn get_string(&self, pos: usize) -> &str { - self.values.get(pos).unwrap().try_into().unwrap() - } - fn get_binary(&self, pos: usize, _length: usize) -> &[u8] { self.values.get(pos).unwrap().as_blob() } @@ -224,7 +224,7 @@ impl<'a> InternalRow for GenericRow<'a> { impl<'a> Default for GenericRow<'a> { fn default() -> Self { - Self::new() + Self::new(0) 
} } @@ -234,12 +234,33 @@ impl<'a> GenericRow<'a> { values: data.into_iter().map(Into::into).collect(), } } - pub fn new() -> GenericRow<'a> { - GenericRow { values: vec![] } + + /// Creates a GenericRow with the specified number of fields, all initialized to null. + /// + /// This is useful when you need to create a row with a specific field count + /// but only want to set some fields (e.g., for KV delete operations where + /// only primary key fields need to be set). + /// + /// # Example + /// ``` + /// use fluss::row::GenericRow; + /// + /// let mut row = GenericRow::new(3); + /// row.set_field(0, 42); // Only set the primary key + /// // Fields 1 and 2 remain null + /// ``` + pub fn new(field_count: usize) -> GenericRow<'a> { + GenericRow { + values: vec![Datum::Null; field_count], + } } + /// Sets the field at the given position to the specified value. + /// + /// # Panics + /// Panics if `pos` is out of bounds (>= field count). pub fn set_field(&mut self, pos: usize, value: impl Into>) { - self.values.insert(pos, value.into()); + self.values[pos] = value.into(); } } @@ -249,11 +270,32 @@ mod tests { #[test] fn is_null_at_checks_datum_nullity() { - let mut row = GenericRow::new(); + let mut row = GenericRow::new(2); row.set_field(0, Datum::Null); row.set_field(1, 42_i32); assert!(row.is_null_at(0)); assert!(!row.is_null_at(1)); } + + #[test] + fn new_initializes_nulls() { + let row = GenericRow::new(3); + assert_eq!(row.get_field_count(), 3); + assert!(row.is_null_at(0)); + assert!(row.is_null_at(1)); + assert!(row.is_null_at(2)); + } + + #[test] + fn partial_row_for_delete() { + // Simulates delete scenario: only primary key (field 0) is set + let mut row = GenericRow::new(3); + row.set_field(0, 123_i32); + // Fields 1 and 2 remain null + assert_eq!(row.get_field_count(), 3); + assert_eq!(row.get_int(0), 123); + assert!(row.is_null_at(1)); + assert!(row.is_null_at(2)); + } } diff --git a/crates/fluss/tests/integration/kv_table.rs b/crates/fluss/tests/integration/kv_table.rs index 3f46f9f6..a4f29617 100644 --- a/crates/fluss/tests/integration/kv_table.rs +++ b/crates/fluss/tests/integration/kv_table.rs @@ -53,10 +53,8 @@ mod kv_table_test { } fn make_key(id: i32) -> GenericRow<'static> { - let mut row = GenericRow::new(); + let mut row = GenericRow::new(3); row.set_field(0, id); - row.set_field(1, ""); - row.set_field(2, 0i64); row } @@ -98,7 +96,7 @@ mod kv_table_test { // Upsert rows for (id, name, age) in &test_data { - let mut row = GenericRow::new(); + let mut row = GenericRow::new(3); row.set_field(0, *id); row.set_field(1, *name); row.set_field(2, *age); @@ -132,7 +130,7 @@ mod kv_table_test { } // Update the record with new age - let mut updated_row = GenericRow::new(); + let mut updated_row = GenericRow::new(3); updated_row.set_field(0, 1); updated_row.set_field(1, "Verso"); updated_row.set_field(2, 33i64); @@ -162,10 +160,8 @@ mod kv_table_test { ); // Delete record with id=1 - let mut delete_row = GenericRow::new(); + let mut delete_row = GenericRow::new(3); delete_row.set_field(0, 1); - delete_row.set_field(1, ""); - delete_row.set_field(2, 0i64); upsert_writer .delete(&delete_row) .await @@ -262,7 +258,7 @@ mod kv_table_test { ]; for (region, user_id, score) in &test_data { - let mut row = GenericRow::new(); + let mut row = GenericRow::new(3); row.set_field(0, *region); row.set_field(1, *user_id); row.set_field(2, *score); @@ -277,7 +273,7 @@ mod kv_table_test { .expect("Failed to create lookuper"); // Lookup (US, 1) - should return score 100 - let mut key = 
GenericRow::new(); + let mut key = GenericRow::new(3); key.set_field(0, "US"); key.set_field(1, 1); let result = lookuper.lookup(&key).await.expect("Failed to lookup"); @@ -288,7 +284,7 @@ mod kv_table_test { assert_eq!(row.get_long(2), 100, "Score for (US, 1) should be 100"); // Lookup (EU, 2) - should return score 250 - let mut key = GenericRow::new(); + let mut key = GenericRow::new(3); key.set_field(0, "EU"); key.set_field(1, 2); let result = lookuper.lookup(&key).await.expect("Failed to lookup"); @@ -299,7 +295,7 @@ mod kv_table_test { assert_eq!(row.get_long(2), 250, "Score for (EU, 2) should be 250"); // Update (US, 1) score - let mut update_row = GenericRow::new(); + let mut update_row = GenericRow::new(3); update_row.set_field(0, "US"); update_row.set_field(1, 1); update_row.set_field(2, 500i64); @@ -309,7 +305,7 @@ mod kv_table_test { .expect("Failed to update"); // Verify update - let mut key = GenericRow::new(); + let mut key = GenericRow::new(3); key.set_field(0, "US"); key.set_field(1, 1); let result = lookuper.lookup(&key).await.expect("Failed to lookup"); @@ -367,7 +363,7 @@ mod kv_table_test { .create_writer() .expect("Failed to create writer"); - let mut row = GenericRow::new(); + let mut row = GenericRow::new(4); row.set_field(0, 1); row.set_field(1, "Verso"); row.set_field(2, 32i64); @@ -407,7 +403,7 @@ mod kv_table_test { .expect("Failed to create UpsertWriter with partial write"); // Update only the score column - let mut partial_row = GenericRow::new(); + let mut partial_row = GenericRow::new(4); partial_row.set_field(0, 1); partial_row.set_field(1, Datum::Null); // not in partial update column partial_row.set_field(2, Datum::Null); // not in partial update column @@ -522,7 +518,7 @@ mod kv_table_test { let col_binary: &[u8] = b"fixed binary data!!!"; // Upsert a row with all datatypes - let mut row = GenericRow::new(); + let mut row = GenericRow::new(17); row.set_field(0, pk_int); row.set_field(1, col_boolean); row.set_field(2, col_tinyint); @@ -553,7 +549,7 @@ mod kv_table_test { .create_lookuper() .expect("Failed to create lookuper"); - let mut key = GenericRow::new(); + let mut key = GenericRow::new(17); key.set_field(0, pk_int); let result = lookuper.lookup(&key).await.expect("Failed to lookup"); @@ -625,7 +621,7 @@ mod kv_table_test { // Test with null values for nullable columns let pk_int_2 = 2i32; - let mut row_with_nulls = GenericRow::new(); + let mut row_with_nulls = GenericRow::new(17); row_with_nulls.set_field(0, pk_int_2); row_with_nulls.set_field(1, Datum::Null); // col_boolean row_with_nulls.set_field(2, Datum::Null); // col_tinyint @@ -650,7 +646,7 @@ mod kv_table_test { .expect("Failed to upsert row with nulls"); // Lookup row with nulls - let mut key2 = GenericRow::new(); + let mut key2 = GenericRow::new(17); key2.set_field(0, pk_int_2); let result = lookuper.lookup(&key2).await.expect("Failed to lookup"); diff --git a/crates/fluss/tests/integration/table_remote_scan.rs b/crates/fluss/tests/integration/table_remote_scan.rs index e28a8362..c83da0f2 100644 --- a/crates/fluss/tests/integration/table_remote_scan.rs +++ b/crates/fluss/tests/integration/table_remote_scan.rs @@ -146,7 +146,7 @@ mod table_remote_scan_test { // append 20 rows, there must be some tiered to remote let record_count = 20; for i in 0..record_count { - let mut row = GenericRow::new(); + let mut row = GenericRow::new(2); row.set_field(0, i as i32); let v = format!("v{}", i); row.set_field(1, v.as_str()); From 6875a5436b8b7fdd2af29c3efa84d515d24428ac Mon Sep 17 00:00:00 2001 From: 
luoyuxia
Date: Sun, 25 Jan 2026 21:29:33 +0800
Subject: [PATCH 2/2] address comments

---
 crates/fluss/src/row/mod.rs | 6 ------
 1 file changed, 6 deletions(-)

diff --git a/crates/fluss/src/row/mod.rs b/crates/fluss/src/row/mod.rs
index 85d50b0d..f7c8bec5 100644
--- a/crates/fluss/src/row/mod.rs
+++ b/crates/fluss/src/row/mod.rs
@@ -222,12 +222,6 @@ impl<'a> InternalRow for GenericRow<'a> {
     }
 }
 
-impl<'a> Default for GenericRow<'a> {
-    fn default() -> Self {
-        Self::new(0)
-    }
-}
-
 impl<'a> GenericRow<'a> {
     pub fn from_data(data: Vec<Option<Datum<'a>>>) -> GenericRow<'a> {
         GenericRow {
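
Usage note (editor's sketch, not part of the patch series): the snippet below exercises the new `GenericRow::new(field_count)` constructor and the changed `set_field` semantics from PATCH 1/2. It assumes only the public API visible in the hunks above (`fluss::row::GenericRow`, the `Into<Datum>` impls for integers and `&str`, and the accessors `get_field_count`, `get_long`, `is_null_at`); the `main` wrapper is illustrative.

    use fluss::row::GenericRow;

    fn main() {
        // Pre-size the row: every field starts as Datum::Null, matching the
        // Java client's `new GenericRow(arity)` constructor this patch aligns with.
        let mut row = GenericRow::new(3);
        row.set_field(0, 1_i32);   // INT primary key
        row.set_field(1, "Verso"); // STRING column
        row.set_field(2, 32_i64);  // BIGINT column
        assert_eq!(row.get_field_count(), 3);

        // set_field now assigns in place; before this patch it called
        // Vec::insert, which shifted later elements, so repeated or
        // out-of-order writes to the same position grew the row.
        row.set_field(2, 33_i64);
        assert_eq!(row.get_field_count(), 3);
        assert_eq!(row.get_long(2), 33);

        // Partial row for a KV delete: only the primary key is populated and
        // the remaining fields stay null (no more dummy "" / 0 placeholders).
        let mut key = GenericRow::new(3);
        key.set_field(0, 1_i32);
        assert!(key.is_null_at(1));
        assert!(key.is_null_at(2));
    }

The behavioral point the patch fixes: with the old empty-`Vec` constructor plus `Vec::insert`, positions were only stable if callers set every field exactly once in ascending order, which is why the delete paths previously had to fill non-key columns with dummy values. Null pre-fill plus indexed assignment makes field positions stable and lets delete and lookup keys set only the key columns.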