Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
5dec3b9
fix: allow OuterRefColumn for non-adjacent outer relation
duongcongtoai May 25, 2025
af21609
fix: accidentally pushdown filter with subquery
duongcongtoai May 25, 2025
3247d31
chore: clippy
duongcongtoai May 25, 2025
3fbb9ba
chore: rm debug details
duongcongtoai May 25, 2025
c6d570c
fix: breaking changes
duongcongtoai May 25, 2025
c4cd3f7
fix: lateral join losing its outer ref columns
duongcongtoai May 25, 2025
3d924af
test: more test case for other decorrelation
duongcongtoai May 25, 2025
df34dc7
doc: better comments
duongcongtoai May 26, 2025
68b423f
fix: test
duongcongtoai Nov 22, 2025
a83e4d2
Remove outer_query_schema from planner
mkleen Jan 21, 2026
73e9387
Add new tests
mkleen Jan 21, 2026
6144277
fixup! Add new tests
mkleen Jan 21, 2026
193be03
fixup! fixup! Add new tests
mkleen Jan 21, 2026
4dbb62d
Fix fmt
mkleen Jan 21, 2026
0e77c16
Remove test from joins.slt
mkleen Jan 21, 2026
75a6ebb
Remove whitespace in CROSS JOIN output
mkleen Jan 23, 2026
ed7b5d9
Return references in planner
mkleen Feb 6, 2026
cc9ee37
Add sql logic tests back
mkleen Feb 6, 2026
818926d
Simplify loop logic
mkleen Feb 7, 2026
8a228bc
Fix fmt
mkleen Feb 7, 2026
20583c5
Remove redundant tests for sql integrations
mkleen Feb 7, 2026
510838a
fixup! Remove redundant tests for sql integrations
mkleen Feb 7, 2026
42170bf
fixup! fixup! Remove redundant tests for sql integrations
mkleen Feb 7, 2026
885c337
fixup! fixup! fixup! Remove redundant tests for sql integrations
mkleen Feb 7, 2026
d094f13
Fix fmt
mkleen Feb 7, 2026
14aac86
Fix clippy
mkleen Feb 7, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions datafusion/expr/src/logical_plan/invariants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,14 +208,16 @@ pub fn check_subquery_expr(
if group_expr.contains(expr) && !aggr_expr.contains(expr) {
// TODO revisit this validation logic
plan_err!(
"Correlated scalar subquery in the GROUP BY clause must also be in the aggregate expressions"
"Correlated scalar subquery in the GROUP BY clause must \
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this change is unrelated (it's in my original PR)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is from fmt.

also be in the aggregate expressions"
)
} else {
Ok(())
}
}
_ => plan_err!(
"Correlated scalar subquery can only be used in Projection, Filter, Aggregate plan nodes"
"Correlated scalar subquery can only be used in Projection, \
Filter, Aggregate plan nodes"
),
}?;
}
Expand Down
55 changes: 25 additions & 30 deletions datafusion/sql/src/expr/identifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,16 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
}

// Check the outer query schema
if let Some(outer) = planner_context.outer_query_schema()
&& let Ok((qualifier, field)) =
for outer in planner_context.outer_queries_schemas() {
if let Ok((qualifier, field)) =
outer.qualified_field_with_unqualified_name(normalize_ident.as_str())
{
// Found an exact match on a qualified name in the outer plan schema, so this is an outer reference column
return Ok(Expr::OuterReferenceColumn(
Arc::clone(field),
Column::from((qualifier, field)),
));
{
// Found an exact match on a qualified name in the outer plan schema, so this is an outer reference column
return Ok(Expr::OuterReferenceColumn(
Arc::clone(field),
Column::from((qualifier, field)),
));
}
}

// Default case
Expand Down Expand Up @@ -172,14 +173,14 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
not_impl_err!("compound identifier: {ids:?}")
} else {
// Check the outer_query_schema and try to find a match
if let Some(outer) = planner_context.outer_query_schema() {
for outer in planner_context.outer_queries_schemas() {
let search_result = search_dfschema(&ids, outer);
match search_result {
let result = match search_result {
// Found matching field with spare identifier(s) for nested field(s) in structure
Some((field, qualifier, nested_names))
if !nested_names.is_empty() =>
{
// TODO: remove when can support nested identifiers for OuterReferenceColumn
// TODO: remove this when we have support for nested identifiers for OuterReferenceColumn
not_impl_err!(
"Nested identifiers are not yet supported for OuterReferenceColumn {}",
Column::from((qualifier, field))
Expand All @@ -195,26 +196,20 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
))
}
// Found no matching field, will return a default
None => {
let s = &ids[0..ids.len()];
// safe unwrap as s can never be empty or exceed the bounds
let (relation, column_name) =
form_identifier(s).unwrap();
Ok(Expr::Column(Column::new(relation, column_name)))
}
}
} else {
let s = &ids[0..ids.len()];
// Safe unwrap as s can never be empty or exceed the bounds
let (relation, column_name) = form_identifier(s).unwrap();
let mut column = Column::new(relation, column_name);
if self.options.collect_spans
&& let Some(span) = ids_span
{
column.spans_mut().add_span(span);
}
Ok(Expr::Column(column))
None => continue,
};
return result;
}
// Safe unwrap as column name can never be empty or exceed the bounds
let (relation, column_name) =
form_identifier(&ids[0..ids.len()]).unwrap();
let mut column = Column::new(relation, column_name);
if self.options.collect_spans
&& let Some(span) = ids_span
{
column.spans_mut().add_span(span);
}
Ok(Expr::Column(column))
}
}
}
Expand Down
20 changes: 8 additions & 12 deletions datafusion/sql/src/expr/subquery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,10 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
input_schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
let old_outer_query_schema =
planner_context.set_outer_query_schema(Some(input_schema.clone().into()));
planner_context.append_outer_query_schema(input_schema.clone().into());
let sub_plan = self.query_to_plan(subquery, planner_context)?;
let outer_ref_columns = sub_plan.all_out_ref_exprs();
planner_context.set_outer_query_schema(old_outer_query_schema);
planner_context.pop_outer_query_schema();
Ok(Expr::Exists(Exists {
subquery: Subquery {
subquery: Arc::new(sub_plan),
Expand All @@ -54,8 +53,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
input_schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
let old_outer_query_schema =
planner_context.set_outer_query_schema(Some(input_schema.clone().into()));
planner_context.append_outer_query_schema(Arc::new(input_schema.clone()));

let mut spans = Spans::new();
if let SetExpr::Select(select) = &subquery.body.as_ref() {
Expand All @@ -70,7 +68,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {

let sub_plan = self.query_to_plan(subquery, planner_context)?;
let outer_ref_columns = sub_plan.all_out_ref_exprs();
planner_context.set_outer_query_schema(old_outer_query_schema);
planner_context.pop_outer_query_schema();

self.validate_single_column(
&sub_plan,
Expand Down Expand Up @@ -98,8 +96,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
input_schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
let old_outer_query_schema =
planner_context.set_outer_query_schema(Some(input_schema.clone().into()));
planner_context.append_outer_query_schema(Arc::new(input_schema.clone()));
let mut spans = Spans::new();
if let SetExpr::Select(select) = subquery.body.as_ref() {
for item in &select.projection {
Expand All @@ -112,7 +109,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
}
let sub_plan = self.query_to_plan(subquery, planner_context)?;
let outer_ref_columns = sub_plan.all_out_ref_exprs();
planner_context.set_outer_query_schema(old_outer_query_schema);
planner_context.pop_outer_query_schema();

self.validate_single_column(
&sub_plan,
Expand Down Expand Up @@ -172,8 +169,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
input_schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
let old_outer_query_schema =
planner_context.set_outer_query_schema(Some(input_schema.clone().into()));
planner_context.append_outer_query_schema(Arc::new(input_schema.clone()));

let mut spans = Spans::new();
if let SetExpr::Select(select) = subquery.body.as_ref() {
Expand All @@ -188,7 +184,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {

let sub_plan = self.query_to_plan(subquery, planner_context)?;
let outer_ref_columns = sub_plan.all_out_ref_exprs();
planner_context.set_outer_query_schema(old_outer_query_schema);
planner_context.pop_outer_query_schema();

self.validate_single_column(
&sub_plan,
Expand Down
34 changes: 22 additions & 12 deletions datafusion/sql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,11 @@ pub struct PlannerContext {
/// Map of CTE name to logical plan of the WITH clause.
/// Use `Arc<LogicalPlan>` to allow cheap cloning
ctes: HashMap<String, Arc<LogicalPlan>>,
/// The query schema of the outer query plan, used to resolve the columns in subquery
outer_query_schema: Option<DFSchemaRef>,

/// The queries schemas of outer query relations, used to resolve the outer referenced
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perfect

/// columns in subquery (recursive aware)
outer_queries_schemas_stack: Vec<DFSchemaRef>,

/// The joined schemas of all FROM clauses planned so far. When planning LATERAL
/// FROM clauses, this should become a suffix of the `outer_query_schema`.
outer_from_schema: Option<DFSchemaRef>,
Expand All @@ -282,7 +285,7 @@ impl PlannerContext {
Self {
prepare_param_data_types: Arc::new(vec![]),
ctes: HashMap::new(),
outer_query_schema: None,
outer_queries_schemas_stack: vec![],
outer_from_schema: None,
create_table_schema: None,
}
Expand All @@ -297,19 +300,26 @@ impl PlannerContext {
self
}

// Return a reference to the outer query's schema
pub fn outer_query_schema(&self) -> Option<&DFSchema> {
self.outer_query_schema.as_ref().map(|s| s.as_ref())
/// Return the stack of outer relations' schemas, the outer most
/// relation are at the first entry
pub fn outer_queries_schemas(&self) -> &[DFSchemaRef] {
&self.outer_queries_schemas_stack
}

/// Sets the outer query schema, returning the existing one, if
/// any
pub fn set_outer_query_schema(
&mut self,
mut schema: Option<DFSchemaRef>,
) -> Option<DFSchemaRef> {
std::mem::swap(&mut self.outer_query_schema, &mut schema);
schema
pub fn append_outer_query_schema(&mut self, schema: DFSchemaRef) {
self.outer_queries_schemas_stack.push(schema);
}

/// The schema of the adjacent outer relation
pub fn latest_outer_query_schema(&mut self) -> Option<&DFSchemaRef> {
self.outer_queries_schemas_stack.last()
}

/// Remove the schema of the adjacent outer relation
pub fn pop_outer_query_schema(&mut self) -> Option<DFSchemaRef> {
self.outer_queries_schemas_stack.pop()
}

pub fn set_table_schema(
Expand Down
23 changes: 14 additions & 9 deletions datafusion/sql/src/relation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,9 +262,10 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
} => {
let tbl_func_ref = self.object_name_to_table_reference(name)?;
let schema = planner_context
.outer_query_schema()
.outer_queries_schemas()
.last()
.cloned()
.unwrap_or_else(DFSchema::empty);
.unwrap_or_else(|| Arc::new(DFSchema::empty()));
let func_args = args
.into_iter()
.map(|arg| match arg {
Expand Down Expand Up @@ -310,20 +311,24 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
let old_from_schema = planner_context
.set_outer_from_schema(None)
.unwrap_or_else(|| Arc::new(DFSchema::empty()));
let new_query_schema = match planner_context.outer_query_schema() {
Some(old_query_schema) => {
let outer_query_schema = planner_context.pop_outer_query_schema();
let new_query_schema = match outer_query_schema {
Some(ref old_query_schema) => {
let mut new_query_schema = old_from_schema.as_ref().clone();
new_query_schema.merge(old_query_schema);
Some(Arc::new(new_query_schema))
new_query_schema.merge(old_query_schema.as_ref());
Arc::new(new_query_schema)
}
None => Some(Arc::clone(&old_from_schema)),
None => Arc::clone(&old_from_schema),
};
let old_query_schema = planner_context.set_outer_query_schema(new_query_schema);
planner_context.append_outer_query_schema(new_query_schema);

let plan = self.create_relation(subquery, planner_context)?;
let outer_ref_columns = plan.all_out_ref_exprs();

planner_context.set_outer_query_schema(old_query_schema);
planner_context.pop_outer_query_schema();
if let Some(schema) = outer_query_schema {
planner_context.append_outer_query_schema(schema);
}
planner_context.set_outer_from_schema(Some(old_from_schema));

// We can omit the subquery wrapper if there are no columns
Expand Down
21 changes: 14 additions & 7 deletions datafusion/sql/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::utils::{

use datafusion_common::error::DataFusionErrorBuilder;
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_common::{Column, Result, not_impl_err, plan_err};
use datafusion_common::{Column, DFSchema, Result, not_impl_err, plan_err};
use datafusion_common::{RecursionUnnestOption, UnnestOptions};
use datafusion_expr::expr::{Alias, PlannedReplaceSelectItem, WildcardOptions};
use datafusion_expr::expr_rewriter::{
Expand Down Expand Up @@ -637,12 +637,9 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
match selection {
Some(predicate_expr) => {
let fallback_schemas = plan.fallback_normalize_schemas();
let outer_query_schema = planner_context.outer_query_schema().cloned();
let outer_query_schema_vec = outer_query_schema
.as_ref()
.map(|schema| vec![schema])
.unwrap_or_else(Vec::new);

let outer_query_schema_vec =
planner_context.outer_queries_schemas().to_vec();
let filter_expr =
self.sql_to_expr(predicate_expr, plan.schema(), planner_context)?;

Expand All @@ -657,9 +654,19 @@ impl<S: ContextProvider> SqlToRel<'_, S> {

let mut using_columns = HashSet::new();
expr_to_columns(&filter_expr, &mut using_columns)?;
let mut schema_stack: Vec<Vec<&DFSchema>> =
vec![vec![plan.schema()], fallback_schemas];
for sc in outer_query_schema_vec.iter().rev() {
schema_stack.push(vec![sc.as_ref()]);
}

let filter_expr = normalize_col_with_schemas_and_ambiguity_check(
filter_expr,
&[&[plan.schema()], &fallback_schemas, &outer_query_schema_vec],
schema_stack
.iter()
.map(|sc| sc.as_slice())
.collect::<Vec<&[&DFSchema]>>()
.as_slice(),
&[using_columns],
)?;

Expand Down
16 changes: 16 additions & 0 deletions datafusion/sql/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,26 @@ impl ContextProvider for MockContextProvider {
])),
"orders" => Ok(Schema::new(vec![
Field::new("order_id", DataType::UInt32, false),
Field::new("o_orderkey", DataType::UInt32, false),
Copy link
Contributor Author

@mkleen mkleen Jan 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These extensions were necessary to get the tests to work.

Field::new("o_custkey", DataType::UInt32, false),
Field::new("o_orderstatus", DataType::Utf8, false),
Field::new("customer_id", DataType::UInt32, false),
Field::new("o_totalprice", DataType::Decimal32(15, 2), false),
Field::new("o_item_id", DataType::Utf8, false),
Field::new("qty", DataType::Int32, false),
Field::new("price", DataType::Float64, false),
Field::new("delivered", DataType::Boolean, false),
])),
"customer" => Ok(Schema::new(vec![
Field::new("c_custkey", DataType::UInt32, false),
Field::new("c_name", DataType::Utf8, false),
Field::new("c_address", DataType::Utf8, false),
Field::new("c_nationkey", DataType::UInt32, false),
Field::new("c_phone", DataType::Decimal32(15, 2), false),
Field::new("c_acctbal", DataType::Float64, false),
Field::new("c_mktsegment", DataType::Utf8, false),
Field::new("c_comment", DataType::Utf8, false),
])),
"array" => Ok(Schema::new(vec![
Field::new(
"left",
Expand All @@ -186,8 +200,10 @@ impl ContextProvider for MockContextProvider {
),
])),
"lineitem" => Ok(Schema::new(vec![
Field::new("l_orderkey", DataType::UInt32, false),
Field::new("l_item_id", DataType::UInt32, false),
Field::new("l_description", DataType::Utf8, false),
Field::new("l_extendedprice", DataType::Decimal32(15, 2), false),
Field::new("price", DataType::Float64, false),
])),
"aggregate_test_100" => Ok(Schema::new(vec![
Expand Down
Loading