Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
188 changes: 163 additions & 25 deletions datafusion/common/benches/with_hashes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@
use ahash::RandomState;
use arrow::array::{
Array, ArrayRef, ArrowPrimitiveType, DictionaryArray, GenericStringArray,
NullBufferBuilder, OffsetSizeTrait, PrimitiveArray, StringViewArray, make_array,
NullBufferBuilder, OffsetSizeTrait, PrimitiveArray, RunArray, StringArray,
StringViewArray, StructArray, make_array,
};
use arrow::buffer::NullBuffer;
use arrow::datatypes::{ArrowDictionaryKeyType, Int32Type, Int64Type};
use arrow::datatypes::{
ArrowDictionaryKeyType, DataType, Field, Fields, Int32Type, Int64Type,
};
use criterion::{Bencher, Criterion, criterion_group, criterion_main};
use datafusion_common::hash_utils::with_hashes;
use rand::Rng;
Expand All @@ -37,6 +40,7 @@ const BATCH_SIZE: usize = 8192;
/// One benchmark input: a labelled array plus a flag saying whether the
/// null-containing variants of the benchmark should also be run for it.
struct BenchData {
/// Label used as the prefix of every benchmark id generated for this case.
name: &'static str,
/// The array that is hashed by the benchmark.
array: ArrayRef,
/// When `true`, the "nulls" benchmark variants are also registered for
/// this case, using a copy of `array` with nulls added.
supports_nulls: bool,
}

fn criterion_benchmark(c: &mut Criterion) {
Expand All @@ -47,50 +51,75 @@ fn criterion_benchmark(c: &mut Criterion) {
BenchData {
name: "int64",
array: primitive_array::<Int64Type>(BATCH_SIZE),
supports_nulls: true,
},
BenchData {
name: "utf8",
array: pool.string_array::<i32>(BATCH_SIZE),
supports_nulls: true,
},
BenchData {
name: "large_utf8",
array: pool.string_array::<i64>(BATCH_SIZE),
supports_nulls: true,
},
BenchData {
name: "utf8_view",
array: pool.string_view_array(BATCH_SIZE),
supports_nulls: true,
},
BenchData {
name: "utf8_view (small)",
array: small_pool.string_view_array(BATCH_SIZE),
supports_nulls: true,
},
BenchData {
name: "dictionary_utf8_int32",
array: pool.dictionary_array::<Int32Type>(BATCH_SIZE),
supports_nulls: true,
},
BenchData {
name: "struct_array",
array: create_struct_array(BATCH_SIZE),
supports_nulls: true,
},
BenchData {
name: "run_array_int32",
array: create_run_array::<Int32Type>(BATCH_SIZE),
supports_nulls: true,
},
];

for BenchData { name, array } in cases {
// with_hash has different code paths for single vs multiple arrays and nulls vs no nulls
let nullable_array = add_nulls(&array);
for BenchData {
name,
array,
supports_nulls,
} in cases
{
c.bench_function(&format!("{name}: single, no nulls"), |b| {
do_hash_test(b, std::slice::from_ref(&array));
});
c.bench_function(&format!("{name}: single, nulls"), |b| {
do_hash_test(b, std::slice::from_ref(&nullable_array));
});
c.bench_function(&format!("{name}: multiple, no nulls"), |b| {
let arrays = vec![array.clone(), array.clone(), array.clone()];
do_hash_test(b, &arrays);
});
c.bench_function(&format!("{name}: multiple, nulls"), |b| {
let arrays = vec![
nullable_array.clone(),
nullable_array.clone(),
nullable_array.clone(),
];
do_hash_test(b, &arrays);
});

// Union arrays can't have null bitmasks
if supports_nulls {
let nullable_array = add_nulls(&array);

c.bench_function(&format!("{name}: single, nulls"), |b| {
do_hash_test(b, std::slice::from_ref(&nullable_array));
});
c.bench_function(&format!("{name}: multiple, nulls"), |b| {
let arrays = vec![
nullable_array.clone(),
nullable_array.clone(),
nullable_array.clone(),
];
do_hash_test(b, &arrays);
});
}
}
}

Expand Down Expand Up @@ -122,16 +151,51 @@ where
builder.finish().expect("should be nulls in buffer")
}

/// Returns a new array that is the same as `array`, but with nulls.
///
/// Handles the special case of `RunArray`, where nulls must live in the
/// values array because a run-end-encoded array can't have top-level nulls.
///
/// # Panics
/// Panics if `array` is run-end encoded with run ends other than `Int32`, or
/// if the rebuilt array data fails validation.
fn add_nulls(array: &ArrayRef) -> ArrayRef {
    match array.data_type() {
        arrow::datatypes::DataType::RunEndEncoded(_, _) => {
            // Only Int32 run ends are produced by this benchmark, so that is
            // the only run-end type handled here.
            let run_array = array
                .as_any()
                .downcast_ref::<RunArray<Int32Type>>()
                .expect("Expected RunArray with Int32 run ends");

            // Rebuild the run ends as a standalone primitive array so they can
            // be paired with the new, null-containing values.
            let run_ends_array = PrimitiveArray::<Int32Type>::new(
                run_array.run_ends().inner().clone(),
                None,
            );
            let values_with_nulls = with_null_mask(run_array.values());

            Arc::new(
                RunArray::try_new(&run_ends_array, values_with_nulls.as_ref())
                    .expect("Failed to create RunArray with null values"),
            )
        }
        _ => with_null_mask(array),
    }
}

/// Rebuilds `array` with a top-level null mask from `create_null_mask`,
/// leaving its buffers and children as they were.
fn with_null_mask(array: &ArrayRef) -> ArrayRef {
    let array_data = array
        .to_data()
        .into_builder()
        .nulls(Some(create_null_mask(array.len())))
        .build()
        .expect("array data with an added null mask should be valid");
    make_array(array_data)
}

pub fn make_rng() -> StdRng {
Expand Down Expand Up @@ -205,5 +269,79 @@ where
Arc::new(array)
}

/// Builds a `BooleanArray` holding `array_len` random, non-null booleans.
fn boolean_array(array_len: usize) -> ArrayRef {
    let mut rng = make_rng();
    // Every element is wrapped in `Some`, so the array contains no nulls.
    let flags: arrow::array::BooleanArray =
        std::iter::repeat_with(|| Some(rng.random::<bool>()))
            .take(array_len)
            .collect();
    Arc::new(flags)
}

/// Builds a `StringArray` of `array_len` random alphanumeric strings, each
/// between 5 and 19 bytes long.
fn string_array(array_len: usize) -> ArrayRef {
    let mut rng = make_rng();
    let strings: Vec<String> = (0..array_len)
        .map(|_| {
            let len = rng.random_range(5..20);
            // Sample through `&mut rng` rather than a clone: sampling from a
            // clone leaves the real generator where it was, so consecutive
            // strings would be drawn from overlapping stretches of the same
            // random stream.
            let bytes: Vec<u8> = (&mut rng)
                .sample_iter(&Alphanumeric)
                .take(len)
                .collect();
            String::from_utf8(bytes).expect("alphanumeric samples are valid UTF-8")
        })
        .collect();
    Arc::new(StringArray::from(strings))
}

/// Create a StructArray with multiple columns
fn create_struct_array(array_len: usize) -> ArrayRef {
let bool_array = boolean_array(array_len);
let int32_array = primitive_array::<Int32Type>(array_len);
let int64_array = primitive_array::<Int64Type>(array_len);
let str_array = string_array(array_len);

let fields = Fields::from(vec![
Field::new("bool_col", DataType::Boolean, false),
Field::new("int32_col", DataType::Int32, false),
Field::new("int64_col", DataType::Int64, false),
Field::new("string_col", DataType::Utf8, false),
]);

Arc::new(StructArray::new(
fields,
vec![bool_array, int32_array, int64_array, str_array],
None,
))
}

/// Create a RunArray to test run array hashing.
///
/// The array covers `array_len` logical rows, split into runs of 1 to 50
/// rows. Each run holds one random, non-null value of type `T`. Run ends are
/// always stored as `Int32`, whatever `T` is.
///
/// # Panics
/// Panics if a run end does not fit in `i32`, or if the `RunArray` cannot be
/// constructed from the generated run ends and values.
fn create_run_array<T>(array_len: usize) -> ArrayRef
where
    T: ArrowPrimitiveType,
    StandardUniform: Distribution<T::Native>,
{
    let mut rng = make_rng();

    // Create runs of varying lengths
    let mut run_ends: Vec<i32> = Vec::new();
    let mut values: Vec<Option<T::Native>> = Vec::new();
    let mut current_end = 0usize;

    while current_end < array_len {
        // Random run length between 1 and 50, clamped so the final run stops
        // exactly at `array_len`.
        let run_length = rng.random_range(1..=50).min(array_len - current_end);
        current_end += run_length;
        // Checked conversion: an `as` cast would silently wrap for lengths
        // beyond `i32::MAX` and produce invalid run ends.
        let run_end =
            i32::try_from(current_end).expect("run end should fit in an i32");
        run_ends.push(run_end);
        values.push(Some(rng.random::<T::Native>()));
    }

    let run_ends_array = PrimitiveArray::<Int32Type>::from(run_ends);
    let values_array: PrimitiveArray<T> = values.into_iter().collect();

    Arc::new(
        RunArray::try_new(&run_ends_array, &values_array)
            .expect("Failed to create RunArray"),
    )
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);