From 8e9bd63222f171cb80b1c213ba9eb44c46fbe3ea Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Tue, 10 Feb 2026 21:12:53 +0800 Subject: [PATCH 1/2] add doc for plan limit absorption --- .../physical-optimizer/src/limit_pushdown.rs | 26 +++++++++++++++++++ .../physical-plan/src/execution_plan.rs | 4 +++ 2 files changed, 30 insertions(+) diff --git a/datafusion/physical-optimizer/src/limit_pushdown.rs b/datafusion/physical-optimizer/src/limit_pushdown.rs index e7bede494da99..eefd9054a86e9 100644 --- a/datafusion/physical-optimizer/src/limit_pushdown.rs +++ b/datafusion/physical-optimizer/src/limit_pushdown.rs @@ -17,6 +17,32 @@ //! [`LimitPushdown`] pushes `LIMIT` down through `ExecutionPlan`s to reduce //! data transfer as much as possible. +//! +//! # Plan Limit Absorption +//! In addition to pushing down [`LimitExec`] in the plan, some operators can +//! "absorb" a limit and stop early during execution. This optimizer rule also includes +//! limit absorption transformation through [`ExecutionPlan::with_fetch`]. +//! +//! ## Background: vectorized volcano execution model +//! DataFusion uses a batched volcano model. For most operators, output is +//! produced in batches of `datafusion.execution.batch_size` (default 8192), so +//! the batch sizes typically look like: +//! ```text +//! 8192, 8192, ..., 8192, 100 (the final batch may be partial) +//! ``` +//! +//! ## Example +//! For a join with an expensive, selective predicate: +//! ```text +//! LimitExec(fetch=10) +//! -- NestedLoopJoinExec(on=expr_expensive_and_selective) +//! --- DataSourceExec() +//! --- DataSourceExec() +//! ``` +//! Under this model, `NestedLoopJoinExec` would keep working until it can emit +//! a full batch (8192 rows), even though the query only needs 10. If the limit +//! cannot be pushed below the join, we can still embed it inside the join so it +//! stops once the limit is satisfied. use std::fmt::Debug; use std::sync::Arc; diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 43cce0e5ea421..26c0314052a86 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -513,6 +513,10 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// Returns a fetching variant of this `ExecutionPlan` node, if it supports /// fetch limits. Returns `None` otherwise. + /// + /// See physical optimizer rule [`limit_pushdown`] for details. + /// + /// [`limit_pushdown`]: https://docs.rs/datafusion/latest/datafusion/physical_optimizer/limit_pushdown/index.html fn with_fetch(&self, _limit: Option) -> Option> { None } From 98a5aebd94cbe0f038a821e28734483f4c670302 Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Wed, 11 Feb 2026 12:46:20 +0800 Subject: [PATCH 2/2] review --- .../physical-optimizer/src/limit_pushdown.rs | 21 ++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-optimizer/src/limit_pushdown.rs b/datafusion/physical-optimizer/src/limit_pushdown.rs index eefd9054a86e9..b556037699404 100644 --- a/datafusion/physical-optimizer/src/limit_pushdown.rs +++ b/datafusion/physical-optimizer/src/limit_pushdown.rs @@ -20,8 +20,7 @@ //! //! # Plan Limit Absorption //! In addition to pushing down [`LimitExec`] in the plan, some operators can -//! "absorb" a limit and stop early during execution. This optimizer rule also includes -//! limit absorption transformation through [`ExecutionPlan::with_fetch`]. +//! "absorb" a limit and stop early during execution. //! //! ## Background: vectorized volcano execution model //! DataFusion uses a batched volcano model. For most operators, output is @@ -39,10 +38,26 @@ //! --- DataSourceExec() //! --- DataSourceExec() //! ``` +//! //! Under this model, `NestedLoopJoinExec` would keep working until it can emit //! a full batch (8192 rows), even though the query only needs 10. If the limit //! cannot be pushed below the join, we can still embed it inside the join so it -//! stops once the limit is satisfied. +//! stops once the limit is satisfied. The transformed plan looks like: +//! +//! ```text +//! NestedLoopJoinExec(on=expr_expensive_and_selective, fetch=10) +//! --- DataSourceExec() +//! --- DataSourceExec() +//! ``` +//! +//! ## Implementation +//! The current optimizer rule optionally pushes `fetch` requirements into +//! operators via [`ExecutionPlan::with_fetch`]. +//! +//! To support early termination in operators, [`LimitedBatchCoalescer`](https://docs.rs/datafusion/latest/datafusion/physical_plan/coalesce/struct.LimitedBatchCoalescer.html) +//! can help manage the output buffer. +//! +//! Reference implementation in Hash Join: use std::fmt::Debug; use std::sync::Arc;