41 changes: 41 additions & 0 deletions datafusion/physical-optimizer/src/limit_pushdown.rs
@@ -17,6 +17,47 @@

//! [`LimitPushdown`] pushes `LIMIT` down through `ExecutionPlan`s to reduce
//! data transfer as much as possible.
//!
//! # Plan Limit Absorption
Contributor: Maybe we can mention that, with the use of `LimitedBatchCoalescer`, this also allows passing the limit down into the operator and emitting output immediately once the limit is reached.

Contributor (author): Good point, updated!

//! In addition to pushing down [`LimitExec`] in the plan, some operators can
//! "absorb" a limit and stop early during execution.
//!
//! ## Background: vectorized volcano execution model
//! DataFusion uses a batched volcano model. For most operators, output is
//! produced in batches of `datafusion.execution.batch_size` (default 8192), so
//! the batch sizes typically look like:
//! ```text
//! 8192, 8192, ..., 8192, 100 (the final batch may be partial)
//! ```
//!
//! ## Example
//! For a join with an expensive, selective predicate:
//! ```text
//! LimitExec(fetch=10)
//! -- NestedLoopJoinExec(on=expr_expensive_and_selective)
//! --- DataSourceExec()
//! --- DataSourceExec()
//! ```
//!
//! Under this model, `NestedLoopJoinExec` would keep working until it can emit
Contributor: Don't we want to give an example with an embedded limit?

Contributor (author): Updated, thanks.

//! a full batch (8192 rows), even though the query only needs 10. If the limit
//! cannot be pushed below the join, we can still embed it inside the join so it
//! stops once the limit is satisfied. The transformed plan looks like:
//!
//! ```text
//! NestedLoopJoinExec(on=expr_expensive_and_selective, fetch=10)
//! --- DataSourceExec()
//! --- DataSourceExec()
//! ```
//!
//! ## Implementation
//! This optimizer rule pushes `fetch` requirements into operators that can
//! absorb them via [`ExecutionPlan::with_fetch`].
//!
//! To support early termination in operators, [`LimitedBatchCoalescer`](https://docs.rs/datafusion/latest/datafusion/physical_plan/coalesce/struct.LimitedBatchCoalescer.html)
//! can help manage the output buffer.
//!
//! Reference implementation in Hash Join: <https://github.com/apache/datafusion/pull/20228>

use std::fmt::Debug;
use std::sync::Arc;
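
To make the absorption pattern concrete, below is a minimal standalone sketch. `FetchableJoinExec` and `LimitedOutput` are hypothetical names invented for illustration (they are not part of DataFusion or this PR); the real mechanism is `ExecutionPlan::with_fetch` plus the operator's output stream, with `LimitedBatchCoalescer` playing the role of the output buffer shown here.

```rust
// A standalone illustration of the "limit absorption" pattern described in the
// module docs above. `FetchableJoinExec` and `LimitedOutput` are hypothetical
// names for this sketch only; in DataFusion the operator would expose the
// limit via `ExecutionPlan::with_fetch` and manage its output buffer with
// `LimitedBatchCoalescer` (or similar).
use arrow::record_batch::RecordBatch;

/// A hypothetical join operator that can absorb a `fetch` limit.
#[derive(Clone)]
struct FetchableJoinExec {
    /// Stop producing output once this many rows have been emitted, if set.
    fetch: Option<usize>,
    // ... join inputs, predicate, and other state omitted ...
}

impl FetchableJoinExec {
    /// Mirrors the shape of `ExecutionPlan::with_fetch`: return a copy of the
    /// operator with the limit embedded, or `None` if it cannot absorb one.
    fn with_fetch(&self, limit: Option<usize>) -> Option<Self> {
        Some(Self {
            fetch: limit,
            ..self.clone()
        })
    }
}

/// Tracks emitted rows and truncates the batch that crosses the limit, so the
/// operator can stop early instead of filling another full 8192-row batch.
struct LimitedOutput {
    fetch: Option<usize>,
    emitted: usize,
}

impl LimitedOutput {
    fn new(fetch: Option<usize>) -> Self {
        Self { fetch, emitted: 0 }
    }

    /// Returns the (possibly truncated) batch to emit, plus a flag that tells
    /// the operator whether the limit has been satisfied.
    fn push(&mut self, batch: RecordBatch) -> (RecordBatch, bool) {
        match self.fetch {
            Some(fetch) => {
                let remaining = fetch.saturating_sub(self.emitted);
                let take = remaining.min(batch.num_rows());
                self.emitted += take;
                // `slice` is zero-copy: keep only the rows still needed.
                (batch.slice(0, take), self.emitted >= fetch)
            }
            None => {
                self.emitted += batch.num_rows();
                (batch, false)
            }
        }
    }
}
```

Once the flag is `true`, the operator can stop pulling from its inputs instead of continuing to produce full batches.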
4 changes: 4 additions & 0 deletions datafusion/physical-plan/src/execution_plan.rs
@@ -513,6 +513,10 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {

/// Returns a fetching variant of this `ExecutionPlan` node, if it supports
/// fetch limits. Returns `None` otherwise.
///
/// See physical optimizer rule [`limit_pushdown`] for details.
///
/// [`limit_pushdown`]: https://docs.rs/datafusion/latest/datafusion/physical_optimizer/limit_pushdown/index.html
fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
None
}
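
On the caller side, a pushdown rule that holds a `fetch` value simply offers it to the node and keeps the original plan if the node declines. A minimal sketch, assuming the `datafusion::physical_plan::ExecutionPlan` import path; `absorb_fetch` is a hypothetical helper, not the actual rule (the real `LimitPushdown` rule also tracks skip offsets and recurses into children):

```rust
use std::sync::Arc;

use datafusion::physical_plan::ExecutionPlan;

/// Try to embed `fetch` into `plan`; fall back to the unchanged plan if the
/// operator does not support limit absorption.
fn absorb_fetch(plan: Arc<dyn ExecutionPlan>, fetch: usize) -> Arc<dyn ExecutionPlan> {
    match plan.with_fetch(Some(fetch)) {
        // The operator returned a fetching variant of itself.
        Some(with_embedded_limit) => with_embedded_limit,
        // The operator cannot absorb a limit; keep the original node.
        None => plan,
    }
}
```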