From 3cb77d317fec88af243ba26ad6cd415866c17294 Mon Sep 17 00:00:00 2001 From: Gabriel Musat Mestre Date: Sun, 1 Feb 2026 12:10:47 +0100 Subject: [PATCH] Add optional hash join buffering --- benchmarks/src/imdb/run.rs | 8 ++ benchmarks/src/tpcds/run.rs | 6 + benchmarks/src/tpch/run.rs | 8 ++ datafusion/common/src/config.rs | 15 +++ .../src/hash_join_buffering.rs | 103 ++++++++++++++++++ datafusion/physical-optimizer/src/lib.rs | 1 + .../physical-optimizer/src/optimizer.rs | 5 + .../test_files/datetime/arith_date_time.slt | 1 - .../datetime/arith_timestamp_duration.slt | 2 +- .../sqllogictest/test_files/explain.slt | 4 + .../test_files/information_schema.slt | 2 + datafusion/sqllogictest/test_files/limit.slt | 2 +- .../test_files/limit_single_row_batches.slt | 2 +- .../test_files/spark/collection/size.slt | 1 - .../test_files/spark/datetime/time_trunc.slt | 1 - .../test_files/spark/datetime/trunc.slt | 1 - datafusion/sqllogictest/test_files/struct.slt | 2 +- .../sqllogictest/test_files/truncate.slt | 2 +- docs/source/user-guide/configs.md | 1 + 19 files changed, 158 insertions(+), 9 deletions(-) create mode 100644 datafusion/physical-optimizer/src/hash_join_buffering.rs diff --git a/benchmarks/src/imdb/run.rs b/benchmarks/src/imdb/run.rs index 9ddea67148efd..29ca5249aa5b3 100644 --- a/benchmarks/src/imdb/run.rs +++ b/benchmarks/src/imdb/run.rs @@ -92,6 +92,10 @@ pub struct RunOpt { /// True by default. #[arg(short = 'j', long = "prefer_hash_join", default_value = "true")] prefer_hash_join: BoolDefaultTrue, + + /// How many bytes to buffer on the probe side of hash joins. + #[arg(long, default_value = "0")] + hash_join_buffering_capacity: usize, } fn map_query_id_to_str(query_id: usize) -> &'static str { @@ -306,6 +310,8 @@ impl RunOpt { .config()? .with_collect_statistics(!self.disable_statistics); config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join; + config.options_mut().execution.hash_join_buffering_capacity = + self.hash_join_buffering_capacity; let rt_builder = self.common.runtime_env_builder()?; let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?); @@ -527,6 +533,7 @@ mod tests { output_path: None, disable_statistics: false, prefer_hash_join: true, + hash_join_buffering_capacity: 0, }; opt.register_tables(&ctx).await?; let queries = get_query_sql(map_query_id_to_str(query))?; @@ -563,6 +570,7 @@ mod tests { output_path: None, disable_statistics: false, prefer_hash_join: true, + hash_join_buffering_capacity: 0, }; opt.register_tables(&ctx).await?; let queries = get_query_sql(map_query_id_to_str(query))?; diff --git a/benchmarks/src/tpcds/run.rs b/benchmarks/src/tpcds/run.rs index 586ee754d2114..8e24b121b2f93 100644 --- a/benchmarks/src/tpcds/run.rs +++ b/benchmarks/src/tpcds/run.rs @@ -144,6 +144,10 @@ pub struct RunOpt { /// The tables should have been created with the `--sort` option for this to have any effect. #[arg(short = 't', long = "sorted")] sorted: bool, + + /// How many bytes to buffer on the probe side of hash joins. 
+    #[arg(long, default_value = "0")]
+    hash_join_buffering_capacity: usize,
 }
 
 impl RunOpt {
@@ -162,6 +166,8 @@ impl RunOpt {
         config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join;
         config.options_mut().optimizer.enable_piecewise_merge_join =
             self.enable_piecewise_merge_join;
+        config.options_mut().execution.hash_join_buffering_capacity =
+            self.hash_join_buffering_capacity;
         let rt_builder = self.common.runtime_env_builder()?;
         let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?);
         // register tables
diff --git a/benchmarks/src/tpch/run.rs b/benchmarks/src/tpch/run.rs
index 9706296feae61..392e02f8478b7 100644
--- a/benchmarks/src/tpch/run.rs
+++ b/benchmarks/src/tpch/run.rs
@@ -105,6 +105,10 @@ pub struct RunOpt {
     /// The tables should have been created with the `--sort` option for this to have any effect.
     #[arg(short = 't', long = "sorted")]
     sorted: bool,
+
+    /// How many bytes to buffer on the probe side of hash joins.
+    #[arg(long, default_value = "0")]
+    hash_join_buffering_capacity: usize,
 }
 
 impl RunOpt {
@@ -123,6 +127,8 @@ impl RunOpt {
         config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join;
         config.options_mut().optimizer.enable_piecewise_merge_join =
             self.enable_piecewise_merge_join;
+        config.options_mut().execution.hash_join_buffering_capacity =
+            self.hash_join_buffering_capacity;
         let rt_builder = self.common.runtime_env_builder()?;
         let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?);
         // register tables
@@ -392,6 +398,7 @@ mod tests {
             prefer_hash_join: true,
             enable_piecewise_merge_join: false,
             sorted: false,
+            hash_join_buffering_capacity: 0,
         };
         opt.register_tables(&ctx).await?;
         let queries = get_query_sql(query)?;
@@ -430,6 +437,7 @@ mod tests {
             prefer_hash_join: true,
             enable_piecewise_merge_join: false,
             sorted: false,
+            hash_join_buffering_capacity: 0,
         };
         opt.register_tables(&ctx).await?;
         let queries = get_query_sql(query)?;
diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index a18861aa3a695..9be10f7aec162 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -669,6 +669,21 @@ config_namespace! {
     /// # Default
     /// `false` — ANSI SQL mode is disabled by default.
     pub enable_ansi_mode: bool, default = false
+
+    /// How many bytes to buffer in the probe side of hash joins while the build side is
+    /// concurrently being built.
+    ///
+    /// Without this, hash joins will wait until the full materialization of the build side
+    /// before polling the probe side. Buffering is useful when the query is not completely
+    /// CPU bound, as it allows some work to happen concurrently and reduces query latency.
+    ///
+    /// Note that when hash join buffering is enabled, the probe side starts polling data
+    /// eagerly, leaving no time for the producer side of dynamic filters to emit a
+    /// meaningful predicate. Queries that rely on dynamic filters might see performance
+    /// degradation.
+    ///
+    /// Disabled by default; set to a value greater than 0 to enable it.
+    pub hash_join_buffering_capacity: usize, default = 0
 }
 }
diff --git a/datafusion/physical-optimizer/src/hash_join_buffering.rs b/datafusion/physical-optimizer/src/hash_join_buffering.rs
new file mode 100644
index 0000000000000..3c29b46c0fa64
--- /dev/null
+++ b/datafusion/physical-optimizer/src/hash_join_buffering.rs
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::PhysicalOptimizerRule;
+use datafusion_common::JoinSide;
+use datafusion_common::config::ConfigOptions;
+use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
+use datafusion_physical_plan::ExecutionPlan;
+use datafusion_physical_plan::buffer::BufferExec;
+use datafusion_physical_plan::joins::HashJoinExec;
+use std::sync::Arc;
+
+/// Looks for all the [HashJoinExec]s in the plan and places a [BufferExec] node with the
+/// configured capacity on the probe side:
+///
+/// ```text
+/// ┌───────────────────┐
+/// │   HashJoinExec    │
+/// └─────▲────────▲────┘
+///   ┌───┘        └──────────┐
+///   │                       │
+/// ┌────────────────┐   ┌─────────────────┐
+/// │   Build side   │   │   BufferExec    │
+/// └────────────────┘   └────────▲────────┘
+///                               │
+///                      ┌────────┴────────┐
+///                      │   Probe side    │
+///                      └─────────────────┘
+/// ```
+///
+/// This allows the probe side to be polled eagerly, even before the build side has
+/// completely finished.
+#[derive(Debug, Default)]
+pub struct HashJoinBuffering {}
+
+impl HashJoinBuffering {
+    pub fn new() -> Self {
+        Self::default()
+    }
+}
+
+impl PhysicalOptimizerRule for HashJoinBuffering {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        config: &ConfigOptions,
+    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
+        let capacity = config.execution.hash_join_buffering_capacity;
+        if capacity == 0 {
+            return Ok(plan);
+        }
+
+        plan.transform_down(|plan| {
+            let Some(node) = plan.as_any().downcast_ref::<HashJoinExec>() else {
+                return Ok(Transformed::no(plan));
+            };
+            let plan = Arc::clone(&plan);
+            Ok(Transformed::yes(
+                if HashJoinExec::probe_side() == JoinSide::Left {
+                    // Do not stack BufferExec nodes together.
+                    if node.left.as_any().downcast_ref::<BufferExec>().is_some() {
+                        return Ok(Transformed::no(plan));
+                    }
+                    plan.with_new_children(vec![
+                        Arc::new(BufferExec::new(Arc::clone(&node.left), capacity)),
+                        Arc::clone(&node.right),
+                    ])?
+                } else {
+                    // Do not stack BufferExec nodes together.
+                    if node.right.as_any().downcast_ref::<BufferExec>().is_some() {
+                        return Ok(Transformed::no(plan));
+                    }
+                    plan.with_new_children(vec![
+                        Arc::clone(&node.left),
+                        Arc::new(BufferExec::new(Arc::clone(&node.right), capacity)),
+                    ])?
+                },
+            ))
+        })
+        .data()
+    }
+
+    fn name(&self) -> &str {
+        "HashJoinBuffering"
+    }
+
+    fn schema_check(&self) -> bool {
+        true
+    }
+}
diff --git a/datafusion/physical-optimizer/src/lib.rs b/datafusion/physical-optimizer/src/lib.rs
index 3a0d79ae2d234..98331a94e31c6 100644
--- a/datafusion/physical-optimizer/src/lib.rs
+++ b/datafusion/physical-optimizer/src/lib.rs
@@ -39,6 +39,7 @@ pub mod optimizer;
 pub mod output_requirements;
 pub mod projection_pushdown;
 pub use datafusion_pruning as pruning;
+pub mod hash_join_buffering;
 pub mod pushdown_sort;
 pub mod sanity_checker;
 pub mod topk_aggregation;
diff --git a/datafusion/physical-optimizer/src/optimizer.rs b/datafusion/physical-optimizer/src/optimizer.rs
index ff71c9ec64385..2cd9c6dd3568b 100644
--- a/datafusion/physical-optimizer/src/optimizer.rs
+++ b/datafusion/physical-optimizer/src/optimizer.rs
@@ -35,6 +35,7 @@ use crate::sanity_checker::SanityCheckPlan;
 use crate::topk_aggregation::TopKAggregation;
 use crate::update_aggr_exprs::OptimizeAggregateOrder;
 
+use crate::hash_join_buffering::HashJoinBuffering;
 use crate::limit_pushdown_past_window::LimitPushPastWindows;
 use crate::pushdown_sort::PushdownSort;
 use datafusion_common::Result;
@@ -131,6 +132,10 @@ impl PhysicalOptimizer {
             // This can possibly be combined with [LimitPushdown]
             // It needs to come after [EnforceSorting]
             Arc::new(LimitPushPastWindows::new()),
+            // The HashJoinBuffering rule adds a BufferExec node with the configured
+            // capacity on the probe side of hash joins, so that the probe side gets
+            // eagerly polled before the build side is completely finished.
+            Arc::new(HashJoinBuffering::new()),
             // The LimitPushdown rule tries to push limits down as far as possible,
             // replacing operators with fetching variants, or adding limits
             // past operators that support limit pushdown.
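The `BufferExec` operator is what actually hides the build-side latency: it eagerly drains its input into a bounded buffer even while downstream is not yet pulling. Below is a toy sketch of that idea, assuming a tokio runtime. It is not the real `datafusion_physical_plan::buffer::BufferExec`, which buffers `RecordBatch`es and budgets its capacity in bytes rather than by item count:

```rust
use tokio::sync::mpsc;

/// Toy stand-in for BufferExec: keep polling `input` into a bounded channel
/// so the producer makes progress even while the consumer (a hash join still
/// materializing its build side) is not reading yet.
fn buffer<T: Send + 'static>(
    mut input: mpsc::Receiver<T>,
    capacity: usize, // must be > 0, mirroring the rule's `capacity == 0` guard
) -> mpsc::Receiver<T> {
    let (tx, rx) = mpsc::channel(capacity);
    tokio::spawn(async move {
        while let Some(item) = input.recv().await {
            // Backpressure: blocks once `capacity` items are queued,
            // which is what bounds the extra memory used by buffering.
            if tx.send(item).await.is_err() {
                break; // consumer dropped the receiver; stop polling
            }
        }
    });
    rx
}
```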
diff --git a/datafusion/sqllogictest/test_files/datetime/arith_date_time.slt b/datafusion/sqllogictest/test_files/datetime/arith_date_time.slt index bc796a51ff5a4..8e85c8f90580e 100644 --- a/datafusion/sqllogictest/test_files/datetime/arith_date_time.slt +++ b/datafusion/sqllogictest/test_files/datetime/arith_date_time.slt @@ -113,4 +113,3 @@ SELECT '2001-09-28'::date / '03:00'::time query error Invalid timestamp arithmetic operation SELECT '2001-09-28'::date % '03:00'::time - diff --git a/datafusion/sqllogictest/test_files/datetime/arith_timestamp_duration.slt b/datafusion/sqllogictest/test_files/datetime/arith_timestamp_duration.slt index 10381346f8359..aeeebe73db701 100644 --- a/datafusion/sqllogictest/test_files/datetime/arith_timestamp_duration.slt +++ b/datafusion/sqllogictest/test_files/datetime/arith_timestamp_duration.slt @@ -144,4 +144,4 @@ query error Invalid timestamp arithmetic operation SELECT '2001-09-28T01:00:00'::timestamp % arrow_cast(12345, 'Duration(Second)'); query error Invalid timestamp arithmetic operation -SELECT '2001-09-28T01:00:00'::timestamp / arrow_cast(12345, 'Duration(Second)'); \ No newline at end of file +SELECT '2001-09-28T01:00:00'::timestamp / arrow_cast(12345, 'Duration(Second)'); diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt index 6f615ec391c9e..6fbdce0615f75 100644 --- a/datafusion/sqllogictest/test_files/explain.slt +++ b/datafusion/sqllogictest/test_files/explain.slt @@ -239,6 +239,7 @@ physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after OutputRequirements DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], file_type=csv, has_header=true physical_plan after LimitAggregation SAME TEXT AS ABOVE physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE +physical_plan after HashJoinBuffering SAME TEXT AS ABOVE physical_plan after LimitPushdown SAME TEXT AS ABOVE physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after PushdownSort SAME TEXT AS ABOVE @@ -319,6 +320,7 @@ physical_plan after OutputRequirements 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]: ScanBytes=Exact(32)),(Col[1]: ScanBytes=Inexact(24)),(Col[2]: ScanBytes=Exact(32)),(Col[3]: ScanBytes=Exact(32)),(Col[4]: ScanBytes=Exact(32)),(Col[5]: ScanBytes=Exact(64)),(Col[6]: ScanBytes=Exact(32)),(Col[7]: ScanBytes=Exact(64)),(Col[8]: ScanBytes=Inexact(88)),(Col[9]: ScanBytes=Inexact(49)),(Col[10]: ScanBytes=Exact(64))]] physical_plan after LimitAggregation SAME TEXT AS ABOVE physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE +physical_plan after HashJoinBuffering SAME TEXT AS ABOVE physical_plan after LimitPushdown DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]: ScanBytes=Exact(32)),(Col[1]: ScanBytes=Inexact(24)),(Col[2]: ScanBytes=Exact(32)),(Col[3]: ScanBytes=Exact(32)),(Col[4]: ScanBytes=Exact(32)),(Col[5]: ScanBytes=Exact(64)),(Col[6]: 
ScanBytes=Exact(32)),(Col[7]: ScanBytes=Exact(64)),(Col[8]: ScanBytes=Inexact(88)),(Col[9]: ScanBytes=Inexact(49)),(Col[10]: ScanBytes=Exact(64))]] physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after PushdownSort SAME TEXT AS ABOVE @@ -363,6 +365,7 @@ physical_plan after OutputRequirements 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet physical_plan after LimitAggregation SAME TEXT AS ABOVE physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE +physical_plan after HashJoinBuffering SAME TEXT AS ABOVE physical_plan after LimitPushdown DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after PushdownSort SAME TEXT AS ABOVE @@ -600,6 +603,7 @@ physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after OutputRequirements DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], file_type=csv, has_header=true physical_plan after LimitAggregation SAME TEXT AS ABOVE physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE +physical_plan after HashJoinBuffering SAME TEXT AS ABOVE physical_plan after LimitPushdown SAME TEXT AS ABOVE physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after PushdownSort SAME TEXT AS ABOVE diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index e31cdbe0aad23..45a5c9c66e3af 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -220,6 +220,7 @@ datafusion.execution.collect_statistics true datafusion.execution.enable_ansi_mode false datafusion.execution.enable_recursive_ctes true datafusion.execution.enforce_batch_size_in_joins false +datafusion.execution.hash_join_buffering_capacity 0 datafusion.execution.keep_partition_by_columns false datafusion.execution.listing_table_factory_infer_partitions true datafusion.execution.listing_table_ignore_subdirectory true @@ -357,6 +358,7 @@ datafusion.execution.collect_statistics true Should DataFusion collect statistic datafusion.execution.enable_ansi_mode false Whether to enable ANSI SQL mode. The flag is experimental and relevant only for DataFusion Spark built-in functions When `enable_ansi_mode` is set to `true`, the query engine follows ANSI SQL semantics for expressions, casting, and error handling. This means: - **Strict type coercion rules:** implicit casts between incompatible types are disallowed. - **Standard SQL arithmetic behavior:** operations such as division by zero, numeric overflow, or invalid casts raise runtime errors rather than returning `NULL` or adjusted values. - **Consistent ANSI behavior** for string concatenation, comparisons, and `NULL` handling. When `enable_ansi_mode` is `false` (the default), the engine uses a more permissive, non-ANSI mode designed for user convenience and backward compatibility. 
In this mode: - Implicit casts between types are allowed (e.g., string to integer when possible). - Arithmetic operations are more lenient — for example, `abs()` on the minimum representable integer value returns the input value instead of raising overflow. - Division by zero or invalid casts may return `NULL` instead of failing. # Default `false` — ANSI SQL mode is disabled by default.
 datafusion.execution.enable_recursive_ctes true Should DataFusion support recursive CTEs
 datafusion.execution.enforce_batch_size_in_joins false Should DataFusion enforce batch size in joins or not. By default, DataFusion will not enforce batch size in joins. Enforcing batch size in joins can reduce memory usage when joining large tables with a highly-selective join filter, but is also slightly slower.
+datafusion.execution.hash_join_buffering_capacity 0 How many bytes to buffer in the probe side of hash joins while the build side is concurrently being built. Without this, hash joins will wait until the full materialization of the build side before polling the probe side. Buffering is useful when the query is not completely CPU bound, as it allows some work to happen concurrently and reduces query latency. Note that when hash join buffering is enabled, the probe side starts polling data eagerly, leaving no time for the producer side of dynamic filters to emit a meaningful predicate. Queries that rely on dynamic filters might see performance degradation. Disabled by default; set to a value greater than 0 to enable it.
 datafusion.execution.keep_partition_by_columns false Should DataFusion keep the columns used for partition_by in the output RecordBatches
 datafusion.execution.listing_table_factory_infer_partitions true Should a `ListingTable` created through the `ListingTableFactory` infer table partitions from Hive compliant directories. Defaults to true (partition columns are inferred and will be represented in the table schema).
 datafusion.execution.listing_table_ignore_subdirectory true Should sub directories be ignored when scanning directories for data files. Defaults to true (ignores subdirectories), consistent with Hive. Note that this setting does not affect reading partitioned tables (e.g. `/table/year=2021/month=01/data.parquet`).
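Because `hash_join_buffering_capacity` is a plain execution option, it can also be toggled per session at runtime rather than only through the benchmark flags. A minimal sketch, assuming this patch is applied; the 1 MiB value is arbitrary, and `with_information_schema(true)` is only there so the `SHOW` statement works:

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let config = SessionConfig::new().with_information_schema(true);
    let ctx = SessionContext::new_with_config(config);

    // Buffer up to ~1 MiB of probe-side data while the build side runs.
    ctx.sql("SET datafusion.execution.hash_join_buffering_capacity = 1048576")
        .await?;

    // The new option shows up alongside the other execution settings.
    ctx.sql("SHOW datafusion.execution.hash_join_buffering_capacity")
        .await?
        .show()
        .await?;
    Ok(())
}
```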
diff --git a/datafusion/sqllogictest/test_files/limit.slt b/datafusion/sqllogictest/test_files/limit.slt index 429181a2d385b..ec8363f51acfa 100644 --- a/datafusion/sqllogictest/test_files/limit.slt +++ b/datafusion/sqllogictest/test_files/limit.slt @@ -871,4 +871,4 @@ DROP TABLE test_limit_with_partitions; # Tear down src_table table: statement ok -DROP TABLE src_table; \ No newline at end of file +DROP TABLE src_table; diff --git a/datafusion/sqllogictest/test_files/limit_single_row_batches.slt b/datafusion/sqllogictest/test_files/limit_single_row_batches.slt index fbdb0140e047a..9f626816e2146 100644 --- a/datafusion/sqllogictest/test_files/limit_single_row_batches.slt +++ b/datafusion/sqllogictest/test_files/limit_single_row_batches.slt @@ -19,4 +19,4 @@ SELECT COUNT(*) FROM (SELECT i FROM filter_limit WHERE i <> 0 LIMIT 1); 1 statement ok -DROP TABLE filter_limit; \ No newline at end of file +DROP TABLE filter_limit; diff --git a/datafusion/sqllogictest/test_files/spark/collection/size.slt b/datafusion/sqllogictest/test_files/spark/collection/size.slt index dabcfd069bce8..106760eebfe42 100644 --- a/datafusion/sqllogictest/test_files/spark/collection/size.slt +++ b/datafusion/sqllogictest/test_files/spark/collection/size.slt @@ -129,4 +129,3 @@ SELECT size(column1) FROM VALUES (map(['a'], [1])), (map(['a','b'], [1,2])), (NU 1 2 -1 - diff --git a/datafusion/sqllogictest/test_files/spark/datetime/time_trunc.slt b/datafusion/sqllogictest/test_files/spark/datetime/time_trunc.slt index f00c40f0a9371..35ffa483bb068 100644 --- a/datafusion/sqllogictest/test_files/spark/datetime/time_trunc.slt +++ b/datafusion/sqllogictest/test_files/spark/datetime/time_trunc.slt @@ -71,4 +71,3 @@ NULL # incorrect format query error DataFusion error: Optimizer rule 'simplify_expressions' failed\ncaused by\nError during planning: The format argument of `TIME_TRUNC` must be one of: hour, minute, second, millisecond, microsecond SELECT time_trunc('test', '09:32:05.123456'::time); - diff --git a/datafusion/sqllogictest/test_files/spark/datetime/trunc.slt b/datafusion/sqllogictest/test_files/spark/datetime/trunc.slt index f6bf6b5751ed2..aa26d7bd0ef06 100644 --- a/datafusion/sqllogictest/test_files/spark/datetime/trunc.slt +++ b/datafusion/sqllogictest/test_files/spark/datetime/trunc.slt @@ -90,4 +90,3 @@ SELECT trunc('2009-02-12'::date, NULL::string); # incorrect format query error DataFusion error: Optimizer rule 'simplify_expressions' failed\ncaused by\nError during planning: The format argument of `TRUNC` must be one of: year, yy, yyyy, month, mm, mon, day, week, quarter. 
SELECT trunc('2009-02-12'::date, 'test'::string); - diff --git a/datafusion/sqllogictest/test_files/struct.slt b/datafusion/sqllogictest/test_files/struct.slt index e20815a58c765..53a1bb4ec6751 100644 --- a/datafusion/sqllogictest/test_files/struct.slt +++ b/datafusion/sqllogictest/test_files/struct.slt @@ -1666,4 +1666,4 @@ order by id; 3 2 150 statement ok -drop table t_agg_window; \ No newline at end of file +drop table t_agg_window; diff --git a/datafusion/sqllogictest/test_files/truncate.slt b/datafusion/sqllogictest/test_files/truncate.slt index 5a5d47760d1f9..ad3ccbb1a7cf4 100644 --- a/datafusion/sqllogictest/test_files/truncate.slt +++ b/datafusion/sqllogictest/test_files/truncate.slt @@ -82,4 +82,4 @@ logical_plan physical_plan_error 01)TRUNCATE operation on table 't1' 02)caused by -03)This feature is not implemented: TRUNCATE not supported for Base table \ No newline at end of file +03)This feature is not implemented: TRUNCATE not supported for Base table diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index aaba453b3541f..31b85b09a4833 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -133,6 +133,7 @@ The following configuration settings are available: | datafusion.execution.enforce_batch_size_in_joins | false | Should DataFusion enforce batch size in joins or not. By default, DataFusion will not enforce batch size in joins. Enforcing batch size in joins can reduce memory usage when joining large tables with a highly-selective join filter, but is also slightly slower. | | datafusion.execution.objectstore_writer_buffer_size | 10485760 | Size (bytes) of data buffer DataFusion uses when writing output files. This affects the size of the data chunks that are uploaded to remote object stores (e.g. AWS S3). If very large (>= 100 GiB) output files are being written, it may be necessary to increase this size to avoid errors from the remote end point. | | datafusion.execution.enable_ansi_mode | false | Whether to enable ANSI SQL mode. The flag is experimental and relevant only for DataFusion Spark built-in functions When `enable_ansi_mode` is set to `true`, the query engine follows ANSI SQL semantics for expressions, casting, and error handling. This means: - **Strict type coercion rules:** implicit casts between incompatible types are disallowed. - **Standard SQL arithmetic behavior:** operations such as division by zero, numeric overflow, or invalid casts raise runtime errors rather than returning `NULL` or adjusted values. - **Consistent ANSI behavior** for string concatenation, comparisons, and `NULL` handling. When `enable_ansi_mode` is `false` (the default), the engine uses a more permissive, non-ANSI mode designed for user convenience and backward compatibility. In this mode: - Implicit casts between types are allowed (e.g., string to integer when possible). - Arithmetic operations are more lenient — for example, `abs()` on the minimum representable integer value returns the input value instead of raising overflow. - Division by zero or invalid casts may return `NULL` instead of failing. # Default `false` — ANSI SQL mode is disabled by default. | +| datafusion.execution.hash_join_buffering_capacity | 0 | How many bytes to buffer in the probe side of hash joins while the build side is concurrently being built. Without this, hash joins will wait until the full materialization of the build side before polling the probe side. 
Buffering is useful when the query is not completely CPU bound, as it allows some work to happen concurrently and reduces query latency. Note that when hash join buffering is enabled, the probe side starts polling data eagerly, leaving no time for the producer side of dynamic filters to emit a meaningful predicate. Queries that rely on dynamic filters might see performance degradation. Disabled by default; set to a value greater than 0 to enable it. |
 | datafusion.optimizer.enable_distinct_aggregation_soft_limit | true | When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. |
 | datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores |
 | datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible |
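For programmatic use the capacity goes on the `SessionConfig` before planning; with a non-zero value, `EXPLAIN` output should show a `BufferExec` wrapping the probe side of the `HashJoinExec`. A sketch assuming this patch is applied; the table names are invented for the example:

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let mut config = SessionConfig::new();
    // 8 MiB of probe-side buffering; leaving it at 0 keeps the rule disabled.
    config.options_mut().execution.hash_join_buffering_capacity = 8 * 1024 * 1024;
    let ctx = SessionContext::new_with_config(config);

    ctx.sql("CREATE TABLE build_t(id INT) AS VALUES (1), (2)").await?;
    ctx.sql("CREATE TABLE probe_t(id INT) AS VALUES (1), (3)").await?;

    // The physical plan is expected to contain a BufferExec on the probe side.
    ctx.sql("EXPLAIN SELECT * FROM build_t JOIN probe_t USING (id)")
        .await?
        .show()
        .await?;
    Ok(())
}
```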