From 5dec3b9ccce82dd6c4ea07f5b9cdbae2a570b852 Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Sun, 25 May 2025 16:10:07 +0200 Subject: [PATCH 01/26] fix: allow OuterRefColumn for non-adjacent outer relation --- datafusion/sql/src/expr/identifier.rs | 77 +++++++++++--------- datafusion/sql/src/expr/subquery.rs | 20 ++--- datafusion/sql/src/planner.rs | 29 +++++--- datafusion/sql/src/relation/mod.rs | 19 +++-- datafusion/sql/src/select.rs | 6 +- datafusion/sqllogictest/test_files/debug.slt | 52 +++++++++++++ 6 files changed, 132 insertions(+), 71 deletions(-) create mode 100644 datafusion/sqllogictest/test_files/debug.slt diff --git a/datafusion/sql/src/expr/identifier.rs b/datafusion/sql/src/expr/identifier.rs index 34fbe2edf8dd9..221fd517a2418 100644 --- a/datafusion/sql/src/expr/identifier.rs +++ b/datafusion/sql/src/expr/identifier.rs @@ -76,15 +76,16 @@ impl SqlToRel<'_, S> { } // Check the outer query schema - if let Some(outer) = planner_context.outer_query_schema() - && let Ok((qualifier, field)) = + for outer in planner_context.outer_query_schema() { + if let Ok((qualifier, field)) = outer.qualified_field_with_unqualified_name(normalize_ident.as_str()) - { - // Found an exact match on a qualified name in the outer plan schema, so this is an outer reference column - return Ok(Expr::OuterReferenceColumn( - Arc::clone(field), - Column::from((qualifier, field)), - )); + { + // Found an exact match on a qualified name in the outer plan schema, so this is an outer reference column + return Ok(Expr::OuterReferenceColumn( + Arc::clone(field), + Column::from((qualifier, field)), + )); + } } // Default case @@ -172,36 +173,44 @@ impl SqlToRel<'_, S> { not_impl_err!("compound identifier: {ids:?}") } else { // Check the outer_query_schema and try to find a match - if let Some(outer) = planner_context.outer_query_schema() { - let search_result = search_dfschema(&ids, outer); - match search_result { - // Found matching field with spare identifier(s) for nested field(s) in structure - Some((field, qualifier, nested_names)) - if !nested_names.is_empty() => - { - // TODO: remove when can support nested identifiers for OuterReferenceColumn - not_impl_err!( + let outer_schemas = planner_context.outer_query_schema(); + let mut maybe_result = None; + if outer_schemas.len() > 0 { + for outer in planner_context.outer_query_schema() { + let search_result = search_dfschema(&ids, outer); + let result = match search_result { + // Found matching field with spare identifier(s) for nested field(s) in structure + Some((field, qualifier, nested_names)) + if !nested_names.is_empty() => + { + // TODO: remove when can support nested identifiers for OuterReferenceColumn + not_impl_err!( "Nested identifiers are not yet supported for OuterReferenceColumn {}", Column::from((qualifier, field)) .quoted_flat_name() ) - } - // Found matching field with no spare identifier(s) - Some((field, qualifier, _nested_names)) => { - // Found an exact match on a qualified name in the outer plan schema, so this is an outer reference column - Ok(Expr::OuterReferenceColumn( - Arc::clone(field), - Column::from((qualifier, field)), - )) - } - // Found no matching field, will return a default - None => { - let s = &ids[0..ids.len()]; - // safe unwrap as s can never be empty or exceed the bounds - let (relation, column_name) = - form_identifier(s).unwrap(); - Ok(Expr::Column(Column::new(relation, column_name))) - } + } + // Found matching field with no spare identifier(s) + Some((field, qualifier, _nested_names)) => { + // Found an exact match on a qualified name in the outer plan schema, so this is an outer reference column + Ok(Expr::OuterReferenceColumn( + Arc::clone(field), + Column::from((qualifier, field)), + )) + } + // Found no matching field, will return a default + None => continue, + }; + maybe_result = Some(result); + break; + } + if let Some(result) = maybe_result { + result + } else { + let s = &ids[0..ids.len()]; + // safe unwrap as s can never be empty or exceed the bounds + let (relation, column_name) = form_identifier(s).unwrap(); + Ok(Expr::Column(Column::new(relation, column_name))) } } else { let s = &ids[0..ids.len()]; diff --git a/datafusion/sql/src/expr/subquery.rs b/datafusion/sql/src/expr/subquery.rs index 6837b2671cb1c..007de5c581464 100644 --- a/datafusion/sql/src/expr/subquery.rs +++ b/datafusion/sql/src/expr/subquery.rs @@ -31,11 +31,10 @@ impl SqlToRel<'_, S> { input_schema: &DFSchema, planner_context: &mut PlannerContext, ) -> Result { - let old_outer_query_schema = - planner_context.set_outer_query_schema(Some(input_schema.clone().into())); + planner_context.set_outer_query_schema(input_schema.clone().into()); let sub_plan = self.query_to_plan(subquery, planner_context)?; let outer_ref_columns = sub_plan.all_out_ref_exprs(); - planner_context.set_outer_query_schema(old_outer_query_schema); + planner_context.pop_outer_query_schema(); Ok(Expr::Exists(Exists { subquery: Subquery { subquery: Arc::new(sub_plan), @@ -54,8 +53,7 @@ impl SqlToRel<'_, S> { input_schema: &DFSchema, planner_context: &mut PlannerContext, ) -> Result { - let old_outer_query_schema = - planner_context.set_outer_query_schema(Some(input_schema.clone().into())); + planner_context.set_outer_query_schema(input_schema.clone().into()); let mut spans = Spans::new(); if let SetExpr::Select(select) = &subquery.body.as_ref() { @@ -70,7 +68,7 @@ impl SqlToRel<'_, S> { let sub_plan = self.query_to_plan(subquery, planner_context)?; let outer_ref_columns = sub_plan.all_out_ref_exprs(); - planner_context.set_outer_query_schema(old_outer_query_schema); + planner_context.pop_outer_query_schema(); self.validate_single_column( &sub_plan, @@ -98,8 +96,7 @@ impl SqlToRel<'_, S> { input_schema: &DFSchema, planner_context: &mut PlannerContext, ) -> Result { - let old_outer_query_schema = - planner_context.set_outer_query_schema(Some(input_schema.clone().into())); + planner_context.set_outer_query_schema(input_schema.clone().into()); let mut spans = Spans::new(); if let SetExpr::Select(select) = subquery.body.as_ref() { for item in &select.projection { @@ -112,7 +109,7 @@ impl SqlToRel<'_, S> { } let sub_plan = self.query_to_plan(subquery, planner_context)?; let outer_ref_columns = sub_plan.all_out_ref_exprs(); - planner_context.set_outer_query_schema(old_outer_query_schema); + planner_context.pop_outer_query_schema(); self.validate_single_column( &sub_plan, @@ -172,8 +169,7 @@ impl SqlToRel<'_, S> { input_schema: &DFSchema, planner_context: &mut PlannerContext, ) -> Result { - let old_outer_query_schema = - planner_context.set_outer_query_schema(Some(input_schema.clone().into())); + planner_context.set_outer_query_schema(input_schema.clone().into()); let mut spans = Spans::new(); if let SetExpr::Select(select) = subquery.body.as_ref() { @@ -188,7 +184,7 @@ impl SqlToRel<'_, S> { let sub_plan = self.query_to_plan(subquery, planner_context)?; let outer_ref_columns = sub_plan.all_out_ref_exprs(); - planner_context.set_outer_query_schema(old_outer_query_schema); + planner_context.pop_outer_query_schema(); self.validate_single_column( &sub_plan, diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index 520a2d55ef6a2..0de96c15a2555 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -262,7 +262,7 @@ pub struct PlannerContext { /// Use `Arc` to allow cheap cloning ctes: HashMap>, /// The query schema of the outer query plan, used to resolve the columns in subquery - outer_query_schema: Option, + outer_query_schema: Vec, /// The joined schemas of all FROM clauses planned so far. When planning LATERAL /// FROM clauses, this should become a suffix of the `outer_query_schema`. outer_from_schema: Option, @@ -282,7 +282,7 @@ impl PlannerContext { Self { prepare_param_data_types: Arc::new(vec![]), ctes: HashMap::new(), - outer_query_schema: None, + outer_query_schema: vec![], outer_from_schema: None, create_table_schema: None, } @@ -298,18 +298,27 @@ impl PlannerContext { } // Return a reference to the outer query's schema - pub fn outer_query_schema(&self) -> Option<&DFSchema> { - self.outer_query_schema.as_ref().map(|s| s.as_ref()) + pub fn outer_query_schema(&self) -> Vec<&DFSchema> { + self.outer_query_schema + .iter() + .map(|sc| sc.as_ref()) + .collect() } /// Sets the outer query schema, returning the existing one, if /// any - pub fn set_outer_query_schema( - &mut self, - mut schema: Option, - ) -> Option { - std::mem::swap(&mut self.outer_query_schema, &mut schema); - schema + pub fn set_outer_query_schema(&mut self, schema: DFSchemaRef) { + self.outer_query_schema.push(schema); + } + + pub fn latest_outer_query_schema(&mut self) -> Option { + self.outer_query_schema.last().clone().cloned() + } + + /// Sets the outer query schema, returning the existing one, if + /// any + pub fn pop_outer_query_schema(&mut self) -> Option { + self.outer_query_schema.pop() } pub fn set_table_schema( diff --git a/datafusion/sql/src/relation/mod.rs b/datafusion/sql/src/relation/mod.rs index cef3726c62e40..d11ddb435c702 100644 --- a/datafusion/sql/src/relation/mod.rs +++ b/datafusion/sql/src/relation/mod.rs @@ -27,7 +27,7 @@ use datafusion_expr::builder::subquery_alias; use datafusion_expr::planner::{ PlannedRelation, RelationPlannerContext, RelationPlanning, }; -use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder, expr::Unnest}; +use datafusion_expr::{expr::Unnest, Expr, LogicalPlan, LogicalPlanBuilder}; use datafusion_expr::{Subquery, SubqueryAlias}; use sqlparser::ast::{FunctionArg, FunctionArgExpr, Spanned, TableFactor}; @@ -262,9 +262,8 @@ impl SqlToRel<'_, S> { } => { let tbl_func_ref = self.object_name_to_table_reference(name)?; let schema = planner_context - .outer_query_schema() - .cloned() - .unwrap_or_else(DFSchema::empty); + .latest_outer_query_schema() + .unwrap_or(Arc::new(DFSchema::empty())); let func_args = args .into_iter() .map(|arg| match arg { @@ -310,20 +309,20 @@ impl SqlToRel<'_, S> { let old_from_schema = planner_context .set_outer_from_schema(None) .unwrap_or_else(|| Arc::new(DFSchema::empty())); - let new_query_schema = match planner_context.outer_query_schema() { + let new_query_schema = match planner_context.pop_outer_query_schema() { Some(old_query_schema) => { let mut new_query_schema = old_from_schema.as_ref().clone(); - new_query_schema.merge(old_query_schema); - Some(Arc::new(new_query_schema)) + new_query_schema.merge(old_query_schema.as_ref()); + Arc::new(new_query_schema) } - None => Some(Arc::clone(&old_from_schema)), + None => Arc::clone(&old_from_schema), }; - let old_query_schema = planner_context.set_outer_query_schema(new_query_schema); + planner_context.set_outer_query_schema(new_query_schema); let plan = self.create_relation(subquery, planner_context)?; let outer_ref_columns = plan.all_out_ref_exprs(); - planner_context.set_outer_query_schema(old_query_schema); + planner_context.pop_outer_query_schema(); planner_context.set_outer_from_schema(Some(old_from_schema)); // We can omit the subquery wrapper if there are no columns diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index 1d6ccde6be13a..0803733a048bc 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -637,14 +637,10 @@ impl SqlToRel<'_, S> { match selection { Some(predicate_expr) => { let fallback_schemas = plan.fallback_normalize_schemas(); - let outer_query_schema = planner_context.outer_query_schema().cloned(); - let outer_query_schema_vec = outer_query_schema - .as_ref() - .map(|schema| vec![schema]) - .unwrap_or_else(Vec::new); let filter_expr = self.sql_to_expr(predicate_expr, plan.schema(), planner_context)?; + let outer_query_schema_vec = planner_context.outer_query_schema(); // Check for aggregation functions let aggregate_exprs = diff --git a/datafusion/sqllogictest/test_files/debug.slt b/datafusion/sqllogictest/test_files/debug.slt new file mode 100644 index 0000000000000..48fd16bc0fd97 --- /dev/null +++ b/datafusion/sqllogictest/test_files/debug.slt @@ -0,0 +1,52 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# make sure to a batch size smaller than row number of the table. +statement ok +set datafusion.execution.batch_size = 2; + +statement ok +CREATE TABLE employees ( + employee_id INTEGER, + employee_name VARCHAR, + dept_id INTEGER, + salary DECIMAL +); + +statement ok +CREATE TABLE project_assignments ( + project_id INTEGER, + employee_id INTEGER, + priority INTEGER +); + + + +query TT +explain SELECT e1.employee_name, e1.salary +FROM employees e1 +WHERE e1.salary > ( + SELECT AVG(e2.salary) + FROM employees e2 + WHERE e2.dept_id = e1.dept_id + AND e2.salary > ( + SELECT AVG(e3.salary) + FROM employees e3 + WHERE e3.dept_id = e1.dept_id + ) +); +---- \ No newline at end of file From af2160977dd5e0ef50592f92a67f7cfdd7b05ef8 Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Sun, 25 May 2025 17:14:53 +0200 Subject: [PATCH 02/26] fix: accidentally pushdown filter with subquery --- .../expr/src/logical_plan/invariants.rs | 9 ++++-- datafusion/optimizer/src/push_down_filter.rs | 14 ++++++++- .../sqllogictest/test_files/subquery.slt | 29 +++++++++++++++++++ 3 files changed, 48 insertions(+), 4 deletions(-) diff --git a/datafusion/expr/src/logical_plan/invariants.rs b/datafusion/expr/src/logical_plan/invariants.rs index b39b23e30f4e8..3bb76706563a1 100644 --- a/datafusion/expr/src/logical_plan/invariants.rs +++ b/datafusion/expr/src/logical_plan/invariants.rs @@ -214,9 +214,12 @@ pub fn check_subquery_expr( Ok(()) } } - _ => plan_err!( - "Correlated scalar subquery can only be used in Projection, Filter, Aggregate plan nodes" - ), + any => { + println!("here {any}"); + plan_err!( + "Correlated scalar subquery can only be used in Projection, Filter, Aggregate plan nodes123 {any}" + ) + } }?; } check_correlations_in_subquery(inner_plan) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index ecd6a89f2a3e6..4c961645a6547 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -1131,7 +1131,11 @@ impl OptimizerRule for PushDownFilter { let (volatile_filters, non_volatile_filters): (Vec<&Expr>, Vec<&Expr>) = filter_predicates .into_iter() - .partition(|pred| pred.is_volatile()); + // TODO: subquery decorrelation sometimes cannot decorrelated all the expr + // (i.e in the case of recursive subquery) + // this function may accidentally pushdown the subquery expr as well + // until then, we have to exclude these exprs here + .partition(|pred| pred.is_volatile() || has_subquery(*pred)); // Check which non-volatile filters are supported by source let supported_filters = scan @@ -1423,6 +1427,14 @@ fn contain(e: &Expr, check_map: &HashMap) -> bool { is_contain } +fn has_subquery(expr: &Expr) -> bool { + expr.exists(|e| match e { + Expr::InSubquery(_) | Expr::Exists(_) | Expr::ScalarSubquery(_) => Ok(true), + _ => Ok(false), + }) + .unwrap() +} + #[cfg(test)] mod tests { use std::any::Any; diff --git a/datafusion/sqllogictest/test_files/subquery.slt b/datafusion/sqllogictest/test_files/subquery.slt index e73f4ec3e32da..34c2c3b1003ac 100644 --- a/datafusion/sqllogictest/test_files/subquery.slt +++ b/datafusion/sqllogictest/test_files/subquery.slt @@ -1528,3 +1528,32 @@ logical_plan 20)--------SubqueryAlias: set_cmp_s 21)----------Projection: column1 AS v 22)------------Values: (Int64(5)), (Int64(NULL)) +======= +# correlated_recursive_scalar_subquery_with_level_3_subquery_referencing_level1_relation +query TT +explain select c_custkey from customer +where c_acctbal < ( + select sum(o_totalprice) from orders + where o_custkey = c_custkey + and o_totalprice < ( + select sum(l_extendedprice) as price from lineitem where l_orderkey = o_orderkey + and l_extendedprice < c_acctbal + ) +) order by c_custkey; +---- +logical_plan +01)Sort: customer.c_custkey ASC NULLS LAST +02)--Projection: customer.c_custkey +03)----Inner Join: customer.c_custkey = __scalar_sq_1.o_custkey Filter: CAST(customer.c_acctbal AS Decimal128(25, 2)) < __scalar_sq_1.sum(orders.o_totalprice) +04)------TableScan: customer projection=[c_custkey, c_acctbal] +05)------SubqueryAlias: __scalar_sq_1 +06)--------Projection: sum(orders.o_totalprice), orders.o_custkey +07)----------Aggregate: groupBy=[[orders.o_custkey]], aggr=[[sum(orders.o_totalprice)]] +08)------------Projection: orders.o_custkey, orders.o_totalprice +09)--------------Filter: CAST(orders.o_totalprice AS Decimal128(25, 2)) < () +10)----------------Subquery: +11)------------------Projection: sum(lineitem.l_extendedprice) AS price +12)--------------------Aggregate: groupBy=[[]], aggr=[[sum(lineitem.l_extendedprice)]] +13)----------------------Filter: lineitem.l_orderkey = outer_ref(orders.o_orderkey) AND lineitem.l_extendedprice < outer_ref(customer.c_acctbal) +14)------------------------TableScan: lineitem, partial_filters=[lineitem.l_orderkey = outer_ref(orders.o_orderkey), lineitem.l_extendedprice < outer_ref(customer.c_acctbal)] +15)----------------TableScan: orders projection=[o_orderkey, o_custkey, o_totalprice] From 3247d315acd922148f46b31da10dd4b6221b2387 Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Sun, 25 May 2025 17:20:20 +0200 Subject: [PATCH 03/26] chore: clippy --- datafusion/optimizer/src/push_down_filter.rs | 2 +- datafusion/sql/src/expr/identifier.rs | 2 +- datafusion/sql/src/planner.rs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 4c961645a6547..7fbc18187cbfb 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -1135,7 +1135,7 @@ impl OptimizerRule for PushDownFilter { // (i.e in the case of recursive subquery) // this function may accidentally pushdown the subquery expr as well // until then, we have to exclude these exprs here - .partition(|pred| pred.is_volatile() || has_subquery(*pred)); + .partition(|pred| pred.is_volatile() || has_subquery(pred)); // Check which non-volatile filters are supported by source let supported_filters = scan diff --git a/datafusion/sql/src/expr/identifier.rs b/datafusion/sql/src/expr/identifier.rs index 221fd517a2418..2a8e2a9a77579 100644 --- a/datafusion/sql/src/expr/identifier.rs +++ b/datafusion/sql/src/expr/identifier.rs @@ -175,7 +175,7 @@ impl SqlToRel<'_, S> { // Check the outer_query_schema and try to find a match let outer_schemas = planner_context.outer_query_schema(); let mut maybe_result = None; - if outer_schemas.len() > 0 { + if !outer_schemas.is_empty() { for outer in planner_context.outer_query_schema() { let search_result = search_dfschema(&ids, outer); let result = match search_result { diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index 0de96c15a2555..121ed9655912a 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -312,7 +312,7 @@ impl PlannerContext { } pub fn latest_outer_query_schema(&mut self) -> Option { - self.outer_query_schema.last().clone().cloned() + self.outer_query_schema.last().cloned() } /// Sets the outer query schema, returning the existing one, if From 3fbb9bad4d56c0f4a53f86cc5c23525b65bd601d Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Sun, 25 May 2025 17:23:33 +0200 Subject: [PATCH 04/26] chore: rm debug details --- .../expr/src/logical_plan/invariants.rs | 7 +-- datafusion/sqllogictest/test_files/debug.slt | 52 ------------------- 2 files changed, 2 insertions(+), 57 deletions(-) delete mode 100644 datafusion/sqllogictest/test_files/debug.slt diff --git a/datafusion/expr/src/logical_plan/invariants.rs b/datafusion/expr/src/logical_plan/invariants.rs index 3bb76706563a1..42a2993fbea37 100644 --- a/datafusion/expr/src/logical_plan/invariants.rs +++ b/datafusion/expr/src/logical_plan/invariants.rs @@ -214,12 +214,9 @@ pub fn check_subquery_expr( Ok(()) } } - any => { - println!("here {any}"); - plan_err!( - "Correlated scalar subquery can only be used in Projection, Filter, Aggregate plan nodes123 {any}" + _ => plan_err!( + "Correlated scalar subquery can only be used in Projection, Filter, Aggregate plan" ) - } }?; } check_correlations_in_subquery(inner_plan) diff --git a/datafusion/sqllogictest/test_files/debug.slt b/datafusion/sqllogictest/test_files/debug.slt deleted file mode 100644 index 48fd16bc0fd97..0000000000000 --- a/datafusion/sqllogictest/test_files/debug.slt +++ /dev/null @@ -1,52 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at - -# http://www.apache.org/licenses/LICENSE-2.0 - -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -# make sure to a batch size smaller than row number of the table. -statement ok -set datafusion.execution.batch_size = 2; - -statement ok -CREATE TABLE employees ( - employee_id INTEGER, - employee_name VARCHAR, - dept_id INTEGER, - salary DECIMAL -); - -statement ok -CREATE TABLE project_assignments ( - project_id INTEGER, - employee_id INTEGER, - priority INTEGER -); - - - -query TT -explain SELECT e1.employee_name, e1.salary -FROM employees e1 -WHERE e1.salary > ( - SELECT AVG(e2.salary) - FROM employees e2 - WHERE e2.dept_id = e1.dept_id - AND e2.salary > ( - SELECT AVG(e3.salary) - FROM employees e3 - WHERE e3.dept_id = e1.dept_id - ) -); ----- \ No newline at end of file From c6d570ccbe5014211e9d575a08689eeadc4e4c2f Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Sun, 25 May 2025 18:50:33 +0200 Subject: [PATCH 05/26] fix: breaking changes --- .../expr/src/logical_plan/invariants.rs | 8 +++-- datafusion/optimizer/src/push_down_filter.rs | 8 +++-- datafusion/sql/src/expr/identifier.rs | 6 ++-- datafusion/sql/src/planner.rs | 35 ++++++++++++++----- datafusion/sql/src/select.rs | 2 +- 5 files changed, 40 insertions(+), 19 deletions(-) diff --git a/datafusion/expr/src/logical_plan/invariants.rs b/datafusion/expr/src/logical_plan/invariants.rs index 42a2993fbea37..0889afd08fee4 100644 --- a/datafusion/expr/src/logical_plan/invariants.rs +++ b/datafusion/expr/src/logical_plan/invariants.rs @@ -208,15 +208,17 @@ pub fn check_subquery_expr( if group_expr.contains(expr) && !aggr_expr.contains(expr) { // TODO revisit this validation logic plan_err!( - "Correlated scalar subquery in the GROUP BY clause must also be in the aggregate expressions" + "Correlated scalar subquery in the GROUP BY clause must \ + also be in the aggregate expressions" ) } else { Ok(()) } } _ => plan_err!( - "Correlated scalar subquery can only be used in Projection, Filter, Aggregate plan" - ) + "Correlated scalar subquery can only be used in Projection, \ + Filter, Aggregate plan nodes" + ), }?; } check_correlations_in_subquery(inner_plan) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 7fbc18187cbfb..e569754691ee2 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -1135,7 +1135,9 @@ impl OptimizerRule for PushDownFilter { // (i.e in the case of recursive subquery) // this function may accidentally pushdown the subquery expr as well // until then, we have to exclude these exprs here - .partition(|pred| pred.is_volatile() || has_subquery(pred)); + .partition(|pred| { + pred.is_volatile() || has_scalar_subquery(pred) + }); // Check which non-volatile filters are supported by source let supported_filters = scan @@ -1427,9 +1429,9 @@ fn contain(e: &Expr, check_map: &HashMap) -> bool { is_contain } -fn has_subquery(expr: &Expr) -> bool { +fn has_scalar_subquery(expr: &Expr) -> bool { expr.exists(|e| match e { - Expr::InSubquery(_) | Expr::Exists(_) | Expr::ScalarSubquery(_) => Ok(true), + Expr::ScalarSubquery(_) => Ok(true), _ => Ok(false), }) .unwrap() diff --git a/datafusion/sql/src/expr/identifier.rs b/datafusion/sql/src/expr/identifier.rs index 2a8e2a9a77579..547e26c55f4a1 100644 --- a/datafusion/sql/src/expr/identifier.rs +++ b/datafusion/sql/src/expr/identifier.rs @@ -76,7 +76,7 @@ impl SqlToRel<'_, S> { } // Check the outer query schema - for outer in planner_context.outer_query_schema() { + for outer in planner_context.outer_queries_schemas() { if let Ok((qualifier, field)) = outer.qualified_field_with_unqualified_name(normalize_ident.as_str()) { @@ -173,10 +173,10 @@ impl SqlToRel<'_, S> { not_impl_err!("compound identifier: {ids:?}") } else { // Check the outer_query_schema and try to find a match - let outer_schemas = planner_context.outer_query_schema(); + let outer_schemas = planner_context.outer_queries_schemas(); let mut maybe_result = None; if !outer_schemas.is_empty() { - for outer in planner_context.outer_query_schema() { + for outer in planner_context.outer_queries_schemas() { let search_result = search_dfschema(&ids, outer); let result = match search_result { // Found matching field with spare identifier(s) for nested field(s) in structure diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index 121ed9655912a..d769d81f57c72 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -261,8 +261,16 @@ pub struct PlannerContext { /// Map of CTE name to logical plan of the WITH clause. /// Use `Arc` to allow cheap cloning ctes: HashMap>, + + /// The queries schemas of outer query relations, used to resolve the outer referenced + /// columns in subquery (recursive aware) + outer_queries_schemas_stack: Vec, + /// The query schema of the outer query plan, used to resolve the columns in subquery - outer_query_schema: Vec, + /// This field is maintained to support deprecated functions + /// `outer_query_schema` and `set_outer_query_schema` + /// which is only aware of the adjacent outer relation + outer_query_schema: Option, /// The joined schemas of all FROM clauses planned so far. When planning LATERAL /// FROM clauses, this should become a suffix of the `outer_query_schema`. outer_from_schema: Option, @@ -282,7 +290,8 @@ impl PlannerContext { Self { prepare_param_data_types: Arc::new(vec![]), ctes: HashMap::new(), - outer_query_schema: vec![], + outer_query_schema: None, + outer_queries_schemas_stack: vec![], outer_from_schema: None, create_table_schema: None, } @@ -298,8 +307,16 @@ impl PlannerContext { } // Return a reference to the outer query's schema - pub fn outer_query_schema(&self) -> Vec<&DFSchema> { - self.outer_query_schema + // This function is only compatible with + #[deprecated(note = "Use outer_queries_schemas instead")] + pub fn outer_query_schema(&self) -> Option<&DFSchema> { + self.outer_query_schema.as_ref().map(|s| s.as_ref()) + } + + /// Return the stack of outer relations' schemas, the outer most + /// relation are at the first entry + pub fn outer_queries_schemas(&self) -> Vec<&DFSchema> { + self.outer_queries_schemas_stack .iter() .map(|sc| sc.as_ref()) .collect() @@ -308,17 +325,17 @@ impl PlannerContext { /// Sets the outer query schema, returning the existing one, if /// any pub fn set_outer_query_schema(&mut self, schema: DFSchemaRef) { - self.outer_query_schema.push(schema); + self.outer_queries_schemas_stack.push(schema); } + /// The schema of the adjacent outer relation pub fn latest_outer_query_schema(&mut self) -> Option { - self.outer_query_schema.last().cloned() + self.outer_queries_schemas_stack.last().cloned() } - /// Sets the outer query schema, returning the existing one, if - /// any + /// Remove the schema of the adjacent outer relation pub fn pop_outer_query_schema(&mut self) -> Option { - self.outer_query_schema.pop() + self.outer_queries_schemas_stack.pop() } pub fn set_table_schema( diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index 0803733a048bc..624c04f892815 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -640,7 +640,7 @@ impl SqlToRel<'_, S> { let filter_expr = self.sql_to_expr(predicate_expr, plan.schema(), planner_context)?; - let outer_query_schema_vec = planner_context.outer_query_schema(); + let outer_query_schema_vec = planner_context.outer_queries_schemas(); // Check for aggregation functions let aggregate_exprs = From c4cd3f7d13aaf581bf1fa87926061c1af500d974 Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Sun, 25 May 2025 22:38:51 +0200 Subject: [PATCH 06/26] fix: lateral join losing its outer ref columns --- datafusion/sql/src/expr/identifier.rs | 2 +- datafusion/sql/src/planner.rs | 7 ++----- datafusion/sql/src/relation/mod.rs | 8 ++++++-- datafusion/sql/src/select.rs | 16 +++++++++++++--- 4 files changed, 22 insertions(+), 11 deletions(-) diff --git a/datafusion/sql/src/expr/identifier.rs b/datafusion/sql/src/expr/identifier.rs index 547e26c55f4a1..ace8170241e66 100644 --- a/datafusion/sql/src/expr/identifier.rs +++ b/datafusion/sql/src/expr/identifier.rs @@ -177,7 +177,7 @@ impl SqlToRel<'_, S> { let mut maybe_result = None; if !outer_schemas.is_empty() { for outer in planner_context.outer_queries_schemas() { - let search_result = search_dfschema(&ids, outer); + let search_result = search_dfschema(&ids, &outer); let result = match search_result { // Found matching field with spare identifier(s) for nested field(s) in structure Some((field, qualifier, nested_names)) diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index d769d81f57c72..0368f215e627d 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -315,11 +315,8 @@ impl PlannerContext { /// Return the stack of outer relations' schemas, the outer most /// relation are at the first entry - pub fn outer_queries_schemas(&self) -> Vec<&DFSchema> { - self.outer_queries_schemas_stack - .iter() - .map(|sc| sc.as_ref()) - .collect() + pub fn outer_queries_schemas(&self) -> Vec { + self.outer_queries_schemas_stack.to_vec() } /// Sets the outer query schema, returning the existing one, if diff --git a/datafusion/sql/src/relation/mod.rs b/datafusion/sql/src/relation/mod.rs index d11ddb435c702..470e099dda3b3 100644 --- a/datafusion/sql/src/relation/mod.rs +++ b/datafusion/sql/src/relation/mod.rs @@ -309,8 +309,9 @@ impl SqlToRel<'_, S> { let old_from_schema = planner_context .set_outer_from_schema(None) .unwrap_or_else(|| Arc::new(DFSchema::empty())); - let new_query_schema = match planner_context.pop_outer_query_schema() { - Some(old_query_schema) => { + let outer_query_schema = planner_context.pop_outer_query_schema(); + let new_query_schema = match outer_query_schema { + Some(ref old_query_schema) => { let mut new_query_schema = old_from_schema.as_ref().clone(); new_query_schema.merge(old_query_schema.as_ref()); Arc::new(new_query_schema) @@ -323,6 +324,9 @@ impl SqlToRel<'_, S> { let outer_ref_columns = plan.all_out_ref_exprs(); planner_context.pop_outer_query_schema(); + if let Some(schema) = outer_query_schema { + planner_context.set_outer_query_schema(schema); + } planner_context.set_outer_from_schema(Some(old_from_schema)); // We can omit the subquery wrapper if there are no columns diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index 624c04f892815..4e0946bdd4e4e 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -29,7 +29,7 @@ use crate::utils::{ use datafusion_common::error::DataFusionErrorBuilder; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; -use datafusion_common::{Column, Result, not_impl_err, plan_err}; +use datafusion_common::{DFSchema, Column, Result, not_impl_err, plan_err}; use datafusion_common::{RecursionUnnestOption, UnnestOptions}; use datafusion_expr::expr::{Alias, PlannedReplaceSelectItem, WildcardOptions}; use datafusion_expr::expr_rewriter::{ @@ -638,9 +638,9 @@ impl SqlToRel<'_, S> { Some(predicate_expr) => { let fallback_schemas = plan.fallback_normalize_schemas(); + let outer_query_schema_vec = planner_context.outer_queries_schemas(); let filter_expr = self.sql_to_expr(predicate_expr, plan.schema(), planner_context)?; - let outer_query_schema_vec = planner_context.outer_queries_schemas(); // Check for aggregation functions let aggregate_exprs = @@ -653,9 +653,19 @@ impl SqlToRel<'_, S> { let mut using_columns = HashSet::new(); expr_to_columns(&filter_expr, &mut using_columns)?; + let mut schema_stack: Vec> = + vec![vec![plan.schema()], fallback_schemas]; + for sc in outer_query_schema_vec.iter().rev() { + schema_stack.push(vec![sc.as_ref()]); + } + let filter_expr = normalize_col_with_schemas_and_ambiguity_check( filter_expr, - &[&[plan.schema()], &fallback_schemas, &outer_query_schema_vec], + schema_stack + .iter() + .map(|sc| sc.as_slice()) + .collect::>() + .as_slice(), &[using_columns], )?; From 3d924af2a4e2f8f178bcddf5d8d9b30057e319cf Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Sun, 25 May 2025 22:56:11 +0200 Subject: [PATCH 07/26] test: more test case for other decorrelation --- datafusion/sqllogictest/test_files/joins.slt | 24 ++++++++ .../sqllogictest/test_files/subquery.slt | 57 ++++++++++++++++++- 2 files changed, 79 insertions(+), 2 deletions(-) diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index dd7f4710d9dbb..80697fbfcc431 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -4646,6 +4646,30 @@ logical_plan 08)----------TableScan: j3 projection=[j3_string, j3_id] physical_plan_error This feature is not implemented: Physical plan does not support logical expression OuterReferenceColumn(Field { name: "j1_id", data_type: Int32, nullable: true }, Column { relation: Some(Bare { table: "j1" }), name: "j1_id" }) +# 2 nested lateral join with the deepest join referencing the outer most relation +query TT +explain SELECT * FROM j1 j1_outer, LATERAL ( + SELECT * FROM j1 j1_inner, LATERAL ( + SELECT * FROM j2 WHERE j1_inner.j1_id = j2_id and j1_outer.j1_id=j2_id + ) as j2 +) as j2; +---- +logical_plan +01)Cross Join: +02)--SubqueryAlias: j1_outer +03)----TableScan: j1 projection=[j1_string, j1_id] +04)--SubqueryAlias: j2 +05)----Subquery: +06)------Cross Join: +07)--------SubqueryAlias: j1_inner +08)----------TableScan: j1 projection=[j1_string, j1_id] +09)--------SubqueryAlias: j2 +10)----------Subquery: +11)------------Filter: outer_ref(j1_inner.j1_id) = j2.j2_id AND outer_ref(j1_outer.j1_id) = j2.j2_id +12)--------------TableScan: j2 projection=[j2_string, j2_id] +physical_plan_error This feature is not implemented: Physical plan does not support logical expression OuterReferenceColumn(Int32, Column { relation: Some(Bare { table: "j1_inner" }), name: "j1_id" }) + + query TT explain SELECT * FROM j1, LATERAL (SELECT 1) AS j2; ---- diff --git a/datafusion/sqllogictest/test_files/subquery.slt b/datafusion/sqllogictest/test_files/subquery.slt index 34c2c3b1003ac..8fbea2d48c908 100644 --- a/datafusion/sqllogictest/test_files/subquery.slt +++ b/datafusion/sqllogictest/test_files/subquery.slt @@ -1528,8 +1528,8 @@ logical_plan 20)--------SubqueryAlias: set_cmp_s 21)----------Projection: column1 AS v 22)------------Values: (Int64(5)), (Int64(NULL)) -======= -# correlated_recursive_scalar_subquery_with_level_3_subquery_referencing_level1_relation + +# correlated_recursive_scalar_subquery_with_level_3_scalar_subquery_referencing_level1_relation query TT explain select c_custkey from customer where c_acctbal < ( @@ -1557,3 +1557,56 @@ logical_plan 13)----------------------Filter: lineitem.l_orderkey = outer_ref(orders.o_orderkey) AND lineitem.l_extendedprice < outer_ref(customer.c_acctbal) 14)------------------------TableScan: lineitem, partial_filters=[lineitem.l_orderkey = outer_ref(orders.o_orderkey), lineitem.l_extendedprice < outer_ref(customer.c_acctbal)] 15)----------------TableScan: orders projection=[o_orderkey, o_custkey, o_totalprice] + +# correlated_recursive_scalar_subquery_with_level_3_exists_subquery_referencing_level1_relation +query TT +explain select c_custkey from customer +where c_acctbal < ( + select sum(o_totalprice) from orders + where o_custkey = c_custkey + and exists ( + select * from lineitem where l_orderkey = o_orderkey + and l_extendedprice < c_acctbal + ) +) order by c_custkey; +---- +logical_plan +01)Sort: customer.c_custkey ASC NULLS LAST +02)--Projection: customer.c_custkey +03)----Inner Join: customer.c_custkey = __scalar_sq_2.o_custkey Filter: CAST(customer.c_acctbal AS Decimal128(25, 2)) < __scalar_sq_2.sum(orders.o_totalprice) +04)------TableScan: customer projection=[c_custkey, c_acctbal] +05)------SubqueryAlias: __scalar_sq_2 +06)--------Projection: sum(orders.o_totalprice), orders.o_custkey +07)----------Aggregate: groupBy=[[orders.o_custkey]], aggr=[[sum(orders.o_totalprice)]] +08)------------Projection: orders.o_custkey, orders.o_totalprice +09)--------------LeftSemi Join: orders.o_orderkey = __correlated_sq_1.l_orderkey Filter: __correlated_sq_1.l_extendedprice < customer.c_acctbal +10)----------------TableScan: orders projection=[o_orderkey, o_custkey, o_totalprice] +11)----------------SubqueryAlias: __correlated_sq_1 +12)------------------TableScan: lineitem projection=[l_orderkey, l_extendedprice] + +# correlated_recursive_scalar_subquery_with_level_3_in_subquery_referencing_level1_relation +query TT +explain select c_custkey from customer +where c_acctbal < ( + select sum(o_totalprice) from orders + where o_custkey = c_custkey + and o_totalprice in ( + select l_extendedprice as price from lineitem where l_orderkey = o_orderkey + and l_extendedprice < c_acctbal + ) +) order by c_custkey; +---- +logical_plan +01)Sort: customer.c_custkey ASC NULLS LAST +02)--Projection: customer.c_custkey +03)----Inner Join: customer.c_custkey = __scalar_sq_2.o_custkey Filter: CAST(customer.c_acctbal AS Decimal128(25, 2)) < __scalar_sq_2.sum(orders.o_totalprice) +04)------TableScan: customer projection=[c_custkey, c_acctbal] +05)------SubqueryAlias: __scalar_sq_2 +06)--------Projection: sum(orders.o_totalprice), orders.o_custkey +07)----------Aggregate: groupBy=[[orders.o_custkey]], aggr=[[sum(orders.o_totalprice)]] +08)------------Projection: orders.o_custkey, orders.o_totalprice +09)--------------LeftSemi Join: orders.o_totalprice = __correlated_sq_1.price, orders.o_orderkey = __correlated_sq_1.l_orderkey Filter: __correlated_sq_1.l_extendedprice < customer.c_acctbal +10)----------------TableScan: orders projection=[o_orderkey, o_custkey, o_totalprice] +11)----------------SubqueryAlias: __correlated_sq_1 +12)------------------Projection: lineitem.l_extendedprice AS price, lineitem.l_extendedprice, lineitem.l_orderkey +13)--------------------TableScan: lineitem projection=[l_orderkey, l_extendedprice] \ No newline at end of file From df34dc733cd6382fef9dfe30737a461d138c4ed0 Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Mon, 26 May 2025 06:35:10 +0200 Subject: [PATCH 08/26] doc: better comments --- datafusion/sql/src/expr/subquery.rs | 8 ++++---- datafusion/sql/src/planner.rs | 23 ++++++++++++++++++++--- datafusion/sql/src/relation/mod.rs | 4 ++-- 3 files changed, 26 insertions(+), 9 deletions(-) diff --git a/datafusion/sql/src/expr/subquery.rs b/datafusion/sql/src/expr/subquery.rs index 007de5c581464..662c44f6f2620 100644 --- a/datafusion/sql/src/expr/subquery.rs +++ b/datafusion/sql/src/expr/subquery.rs @@ -31,7 +31,7 @@ impl SqlToRel<'_, S> { input_schema: &DFSchema, planner_context: &mut PlannerContext, ) -> Result { - planner_context.set_outer_query_schema(input_schema.clone().into()); + planner_context.append_outer_query_schema(input_schema.clone().into()); let sub_plan = self.query_to_plan(subquery, planner_context)?; let outer_ref_columns = sub_plan.all_out_ref_exprs(); planner_context.pop_outer_query_schema(); @@ -53,7 +53,7 @@ impl SqlToRel<'_, S> { input_schema: &DFSchema, planner_context: &mut PlannerContext, ) -> Result { - planner_context.set_outer_query_schema(input_schema.clone().into()); + planner_context.append_outer_query_schema(Arc::new(input_schema.clone())); let mut spans = Spans::new(); if let SetExpr::Select(select) = &subquery.body.as_ref() { @@ -96,7 +96,7 @@ impl SqlToRel<'_, S> { input_schema: &DFSchema, planner_context: &mut PlannerContext, ) -> Result { - planner_context.set_outer_query_schema(input_schema.clone().into()); + planner_context.append_outer_query_schema(Arc::new(input_schema.clone())); let mut spans = Spans::new(); if let SetExpr::Select(select) = subquery.body.as_ref() { for item in &select.projection { @@ -169,7 +169,7 @@ impl SqlToRel<'_, S> { input_schema: &DFSchema, planner_context: &mut PlannerContext, ) -> Result { - planner_context.set_outer_query_schema(input_schema.clone().into()); + planner_context.append_outer_query_schema(Arc::new(input_schema.clone())); let mut spans = Spans::new(); if let SetExpr::Select(select) = subquery.body.as_ref() { diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index 0368f215e627d..6191c9963560f 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -306,13 +306,30 @@ impl PlannerContext { self } - // Return a reference to the outer query's schema - // This function is only compatible with + /// Return a reference to the outer query's schema + /// This function should not be used together with + /// `outer_queries_schemas`, `append_outer_query_schema` + /// `latest_outer_query_schema` and `pop_outer_query_schema` #[deprecated(note = "Use outer_queries_schemas instead")] pub fn outer_query_schema(&self) -> Option<&DFSchema> { self.outer_query_schema.as_ref().map(|s| s.as_ref()) } + /// Sets the outer query schema, returning the existing one, if + /// any, this function should not be used together with + /// `outer_queries_schemas`, `append_outer_query_schema` + /// `latest_outer_query_schema` and `pop_outer_query_schema` + #[deprecated( + note = "This struct is now aware of a stack of schemas, check pop_outer_query_schema" + )] + pub fn set_outer_query_schema( + &mut self, + mut schema: Option, + ) -> Option { + std::mem::swap(&mut self.outer_query_schema, &mut schema); + schema + } + /// Return the stack of outer relations' schemas, the outer most /// relation are at the first entry pub fn outer_queries_schemas(&self) -> Vec { @@ -321,7 +338,7 @@ impl PlannerContext { /// Sets the outer query schema, returning the existing one, if /// any - pub fn set_outer_query_schema(&mut self, schema: DFSchemaRef) { + pub fn append_outer_query_schema(&mut self, schema: DFSchemaRef) { self.outer_queries_schemas_stack.push(schema); } diff --git a/datafusion/sql/src/relation/mod.rs b/datafusion/sql/src/relation/mod.rs index 470e099dda3b3..3372416d39726 100644 --- a/datafusion/sql/src/relation/mod.rs +++ b/datafusion/sql/src/relation/mod.rs @@ -318,14 +318,14 @@ impl SqlToRel<'_, S> { } None => Arc::clone(&old_from_schema), }; - planner_context.set_outer_query_schema(new_query_schema); + planner_context.append_outer_query_schema(new_query_schema); let plan = self.create_relation(subquery, planner_context)?; let outer_ref_columns = plan.all_out_ref_exprs(); planner_context.pop_outer_query_schema(); if let Some(schema) = outer_query_schema { - planner_context.set_outer_query_schema(schema); + planner_context.append_outer_query_schema(schema); } planner_context.set_outer_from_schema(Some(old_from_schema)); From 68b423f90a76fa1da13089b468f50ca9a267ab6b Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Sat, 22 Nov 2025 15:24:40 +0100 Subject: [PATCH 09/26] fix: test --- datafusion/sql/src/relation/mod.rs | 6 ++++-- datafusion/sqllogictest/test_files/joins.slt | 3 +-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/datafusion/sql/src/relation/mod.rs b/datafusion/sql/src/relation/mod.rs index 3372416d39726..d978d1737debd 100644 --- a/datafusion/sql/src/relation/mod.rs +++ b/datafusion/sql/src/relation/mod.rs @@ -262,8 +262,10 @@ impl SqlToRel<'_, S> { } => { let tbl_func_ref = self.object_name_to_table_reference(name)?; let schema = planner_context - .latest_outer_query_schema() - .unwrap_or(Arc::new(DFSchema::empty())); + .outer_queries_schemas() + .last() + .cloned() + .unwrap_or_else(|| Arc::new(DFSchema::empty())); let func_args = args .into_iter() .map(|arg| match arg { diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index 80697fbfcc431..a0987e6d72227 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -4667,8 +4667,7 @@ logical_plan 10)----------Subquery: 11)------------Filter: outer_ref(j1_inner.j1_id) = j2.j2_id AND outer_ref(j1_outer.j1_id) = j2.j2_id 12)--------------TableScan: j2 projection=[j2_string, j2_id] -physical_plan_error This feature is not implemented: Physical plan does not support logical expression OuterReferenceColumn(Int32, Column { relation: Some(Bare { table: "j1_inner" }), name: "j1_id" }) - +physical_plan_error This feature is not implemented: Physical plan does not support logical expression OuterReferenceColumn(Field { name: "j1_id", data_type: Int32, nullable: true }, Column { relation: Some(Bare { table: "j1_inner" }), name: "j1_id" }) query TT explain SELECT * FROM j1, LATERAL (SELECT 1) AS j2; From a83e4d260bf3b3770203f312dadfd2b1ab5489f2 Mon Sep 17 00:00:00 2001 From: Michael Kleen Date: Wed, 21 Jan 2026 13:10:16 +0100 Subject: [PATCH 10/26] Remove outer_query_schema from planner --- datafusion/sql/src/planner.rs | 30 ------------------------------ 1 file changed, 30 deletions(-) diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index 6191c9963560f..6a3a2cc4e5b5a 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -266,11 +266,6 @@ pub struct PlannerContext { /// columns in subquery (recursive aware) outer_queries_schemas_stack: Vec, - /// The query schema of the outer query plan, used to resolve the columns in subquery - /// This field is maintained to support deprecated functions - /// `outer_query_schema` and `set_outer_query_schema` - /// which is only aware of the adjacent outer relation - outer_query_schema: Option, /// The joined schemas of all FROM clauses planned so far. When planning LATERAL /// FROM clauses, this should become a suffix of the `outer_query_schema`. outer_from_schema: Option, @@ -290,7 +285,6 @@ impl PlannerContext { Self { prepare_param_data_types: Arc::new(vec![]), ctes: HashMap::new(), - outer_query_schema: None, outer_queries_schemas_stack: vec![], outer_from_schema: None, create_table_schema: None, @@ -306,30 +300,6 @@ impl PlannerContext { self } - /// Return a reference to the outer query's schema - /// This function should not be used together with - /// `outer_queries_schemas`, `append_outer_query_schema` - /// `latest_outer_query_schema` and `pop_outer_query_schema` - #[deprecated(note = "Use outer_queries_schemas instead")] - pub fn outer_query_schema(&self) -> Option<&DFSchema> { - self.outer_query_schema.as_ref().map(|s| s.as_ref()) - } - - /// Sets the outer query schema, returning the existing one, if - /// any, this function should not be used together with - /// `outer_queries_schemas`, `append_outer_query_schema` - /// `latest_outer_query_schema` and `pop_outer_query_schema` - #[deprecated( - note = "This struct is now aware of a stack of schemas, check pop_outer_query_schema" - )] - pub fn set_outer_query_schema( - &mut self, - mut schema: Option, - ) -> Option { - std::mem::swap(&mut self.outer_query_schema, &mut schema); - schema - } - /// Return the stack of outer relations' schemas, the outer most /// relation are at the first entry pub fn outer_queries_schemas(&self) -> Vec { From 73e938781f8ea02e08e5a926bbf48a8e72555832 Mon Sep 17 00:00:00 2001 From: Michael Kleen Date: Wed, 21 Jan 2026 10:20:50 +0100 Subject: [PATCH 11/26] Add new tests --- datafusion/sql/tests/common/mod.rs | 17 ++- datafusion/sql/tests/sql_integration.rs | 131 ++++++++++++++++++++++++ 2 files changed, 147 insertions(+), 1 deletion(-) diff --git a/datafusion/sql/tests/common/mod.rs b/datafusion/sql/tests/common/mod.rs index 9dc6b895e49ab..0972fdd191b48 100644 --- a/datafusion/sql/tests/common/mod.rs +++ b/datafusion/sql/tests/common/mod.rs @@ -160,13 +160,26 @@ impl ContextProvider for MockContextProvider { Field::new("last_name", DataType::Utf8, false), ])), "orders" => Ok(Schema::new(vec![ - Field::new("order_id", DataType::UInt32, false), + Field::new("o_orderkey", DataType::UInt32, false), + Field::new("o_custkey", DataType::UInt32, false), + Field::new("o_orderstatus", DataType::Utf8, false), Field::new("customer_id", DataType::UInt32, false), + Field::new("o_totalprice", DataType::Decimal32(15, 2), false), Field::new("o_item_id", DataType::Utf8, false), Field::new("qty", DataType::Int32, false), Field::new("price", DataType::Float64, false), Field::new("delivered", DataType::Boolean, false), ])), + "customer" => Ok(Schema::new(vec![ + Field::new("c_custkey", DataType::UInt32, false), + Field::new("c_name", DataType::Utf8, false), + Field::new("c_address", DataType::Utf8, false), + Field::new("c_nationkey", DataType::UInt32, false), + Field::new("c_phone", DataType::Decimal32(15, 2), false), + Field::new("c_acctbal", DataType::Float64, false), + Field::new("c_mktsegment", DataType::Utf8, false), + Field::new("c_comment", DataType::Utf8, false), + ])), "array" => Ok(Schema::new(vec![ Field::new( "left", @@ -186,8 +199,10 @@ impl ContextProvider for MockContextProvider { ), ])), "lineitem" => Ok(Schema::new(vec![ + Field::new("l_orderkey", DataType::UInt32, false), Field::new("l_item_id", DataType::UInt32, false), Field::new("l_description", DataType::Utf8, false), + Field::new("l_extendedprice", DataType::Decimal32(15, 2), false), Field::new("price", DataType::Float64, false), ])), "aggregate_test_100" => Ok(Schema::new(vec![ diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index aaf0b0ae30fd0..7e8d1feebca44 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -4855,3 +4855,134 @@ fn test_using_join_wildcard_schema() { ] ); } + +#[test] +fn test_2_nested_lateral_join_with_the_deepest_join_referencing_the_outer_most_relation() { + let sql = "SELECT * FROM j1 j1_outer, LATERAL ( + SELECT * FROM j1 j1_inner, LATERAL ( + SELECT * FROM j2 WHERE j1_inner.j1_id = j2_id and j1_outer.j1_id=j2_id + ) as j2 +) as j2"; + + let plan = logical_plan(sql).unwrap(); + assert_snapshot!( + plan, + @r#" +Projection: j1_outer.j1_id, j1_outer.j1_string, j2.j1_id, j2.j1_string, j2.j2_id, j2.j2_string + Cross Join: + SubqueryAlias: j1_outer + TableScan: j1 + SubqueryAlias: j2 + Subquery: + Projection: j1_inner.j1_id, j1_inner.j1_string, j2.j2_id, j2.j2_string + Cross Join: + SubqueryAlias: j1_inner + TableScan: j1 + SubqueryAlias: j2 + Subquery: + Projection: j2.j2_id, j2.j2_string + Filter: outer_ref(j1_inner.j1_id) = j2.j2_id AND outer_ref(j1_outer.j1_id) = j2.j2_id + TableScan: j2 +"# +); +} + +#[test] +fn test_correlated_recursive_scalar_subquery_with_level_3_scalar_subquery_referencing_level1_relation() { + let sql = "select c_custkey from customer + where c_acctbal < ( + select sum(o_totalprice) from orders + where o_custkey = c_custkey + and o_totalprice < ( + select sum(l_extendedprice) as price from lineitem where l_orderkey = o_orderkey + and l_extendedprice < c_acctbal + ) + ) order by c_custkey"; + + let plan = logical_plan(sql).unwrap(); + assert_snapshot!( + plan, + @r#" +Sort: customer.c_custkey ASC NULLS LAST + Projection: customer.c_custkey + Filter: customer.c_acctbal < () + Subquery: + Projection: sum(orders.o_totalprice) + Aggregate: groupBy=[[]], aggr=[[sum(orders.o_totalprice)]] + Filter: orders.o_custkey = outer_ref(customer.c_custkey) AND orders.o_totalprice < () + Subquery: + Projection: sum(lineitem.l_extendedprice) AS price + Aggregate: groupBy=[[]], aggr=[[sum(lineitem.l_extendedprice)]] + Filter: lineitem.l_orderkey = outer_ref(orders.o_orderkey) AND lineitem.l_extendedprice < outer_ref(customer.c_acctbal) + TableScan: lineitem + TableScan: orders + TableScan: customer +"# +); +} + +#[test] +fn correlated_recursive_scalar_subquery_with_level_3_exists_subquery_referencing_level1_relation() { + let sql = "select c_custkey from customer + where c_acctbal < ( + select sum(o_totalprice) from orders + where o_custkey = c_custkey + and exists ( + select * from lineitem where l_orderkey = o_orderkey + and l_extendedprice < c_acctbal + ) + ) order by c_custkey"; + + let plan = logical_plan(sql).unwrap(); + assert_snapshot!( + plan, + @r#" +Sort: customer.c_custkey ASC NULLS LAST + Projection: customer.c_custkey + Filter: customer.c_acctbal < () + Subquery: + Projection: sum(orders.o_totalprice) + Aggregate: groupBy=[[]], aggr=[[sum(orders.o_totalprice)]] + Filter: orders.o_custkey = outer_ref(customer.c_custkey) AND EXISTS () + Subquery: + Projection: lineitem.l_orderkey, lineitem.l_item_id, lineitem.l_description, lineitem.l_extendedprice, lineitem.price + Filter: lineitem.l_orderkey = outer_ref(orders.o_orderkey) AND lineitem.l_extendedprice < outer_ref(customer.c_acctbal) + TableScan: lineitem + TableScan: orders + TableScan: customer +"# +); +} + +#[test] +fn correlated_recursive_scalar_subquery_with_level_3_in_subquery_referencing_level1_relation() { + let sql = "select c_custkey from customer + where c_acctbal < ( + select sum(o_totalprice) from orders + where o_custkey = c_custkey + and o_totalprice in ( + select l_extendedprice as price from lineitem where l_orderkey = o_orderkey + and l_extendedprice < c_acctbal + ) + ) order by c_custkey"; + + let plan = logical_plan(sql).unwrap(); + assert_snapshot!( + plan, + @r#" +Sort: customer.c_custkey ASC NULLS LAST + Projection: customer.c_custkey + Filter: customer.c_acctbal < () + Subquery: + Projection: sum(orders.o_totalprice) + Aggregate: groupBy=[[]], aggr=[[sum(orders.o_totalprice)]] + Filter: orders.o_custkey = outer_ref(customer.c_custkey) AND orders.o_totalprice IN () + Subquery: + Projection: lineitem.l_extendedprice AS price + Filter: lineitem.l_orderkey = outer_ref(orders.o_orderkey) AND lineitem.l_extendedprice < outer_ref(customer.c_acctbal) + TableScan: lineitem + TableScan: orders + TableScan: customer +"# +); +} \ No newline at end of file From 614427780dba2688b25f1e310c7dccdea5ce1071 Mon Sep 17 00:00:00 2001 From: Michael Kleen Date: Wed, 21 Jan 2026 13:31:12 +0100 Subject: [PATCH 12/26] fixup! Add new tests --- datafusion/sql/tests/sql_integration.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index 7e8d1feebca44..5c07e5517c414 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -4869,13 +4869,13 @@ fn test_2_nested_lateral_join_with_the_deepest_join_referencing_the_outer_most_r plan, @r#" Projection: j1_outer.j1_id, j1_outer.j1_string, j2.j1_id, j2.j1_string, j2.j2_id, j2.j2_string - Cross Join: + Cross Join: SubqueryAlias: j1_outer TableScan: j1 SubqueryAlias: j2 Subquery: Projection: j1_inner.j1_id, j1_inner.j1_string, j2.j2_id, j2.j2_string - Cross Join: + Cross Join: SubqueryAlias: j1_inner TableScan: j1 SubqueryAlias: j2 From 193be03b0fcd20451b314fec6c7594a487c0d7d0 Mon Sep 17 00:00:00 2001 From: Michael Kleen Date: Wed, 21 Jan 2026 14:10:52 +0100 Subject: [PATCH 13/26] fixup! fixup! Add new tests --- datafusion/optimizer/src/push_down_filter.rs | 16 +--- datafusion/sql/tests/common/mod.rs | 1 + datafusion/sql/tests/sql_integration.rs | 19 +++-- .../sqllogictest/test_files/subquery.slt | 82 ------------------- 4 files changed, 14 insertions(+), 104 deletions(-) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index e569754691ee2..ecd6a89f2a3e6 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -1131,13 +1131,7 @@ impl OptimizerRule for PushDownFilter { let (volatile_filters, non_volatile_filters): (Vec<&Expr>, Vec<&Expr>) = filter_predicates .into_iter() - // TODO: subquery decorrelation sometimes cannot decorrelated all the expr - // (i.e in the case of recursive subquery) - // this function may accidentally pushdown the subquery expr as well - // until then, we have to exclude these exprs here - .partition(|pred| { - pred.is_volatile() || has_scalar_subquery(pred) - }); + .partition(|pred| pred.is_volatile()); // Check which non-volatile filters are supported by source let supported_filters = scan @@ -1429,14 +1423,6 @@ fn contain(e: &Expr, check_map: &HashMap) -> bool { is_contain } -fn has_scalar_subquery(expr: &Expr) -> bool { - expr.exists(|e| match e { - Expr::ScalarSubquery(_) => Ok(true), - _ => Ok(false), - }) - .unwrap() -} - #[cfg(test)] mod tests { use std::any::Any; diff --git a/datafusion/sql/tests/common/mod.rs b/datafusion/sql/tests/common/mod.rs index 0972fdd191b48..686fdf503f3d6 100644 --- a/datafusion/sql/tests/common/mod.rs +++ b/datafusion/sql/tests/common/mod.rs @@ -160,6 +160,7 @@ impl ContextProvider for MockContextProvider { Field::new("last_name", DataType::Utf8, false), ])), "orders" => Ok(Schema::new(vec![ + Field::new("order_id", DataType::UInt32, false), Field::new("o_orderkey", DataType::UInt32, false), Field::new("o_custkey", DataType::UInt32, false), Field::new("o_orderstatus", DataType::Utf8, false), diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index 5c07e5517c414..5ea6fb42b8c89 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -995,15 +995,15 @@ fn select_nested_with_filters() { #[test] fn table_with_column_alias() { - let sql = "SELECT a, b, c - FROM lineitem l (a, b, c)"; + let sql = "SELECT a, b, c, d, e + FROM lineitem l (a, b, c, d, e)"; let plan = logical_plan(sql).unwrap(); assert_snapshot!( plan, @r" - Projection: l.a, l.b, l.c + Projection: l.a, l.b, l.c, l.d, l.e SubqueryAlias: l - Projection: lineitem.l_item_id AS a, lineitem.l_description AS b, lineitem.price AS c + Projection: lineitem.l_orderkey AS a, lineitem.l_item_id AS b, lineitem.l_description AS c, lineitem.l_extendedprice AS d, lineitem.price AS e TableScan: lineitem " ); @@ -1017,7 +1017,7 @@ fn table_with_column_alias_number_cols() { assert_snapshot!( err.strip_backtrace(), - @"Error during planning: Source table contains 3 columns but only 2 names given as column alias" + @"Error during planning: Source table contains 5 columns but only 2 names given as column alias" ); } @@ -1058,7 +1058,7 @@ fn natural_left_join() { plan, @r" Projection: a.l_item_id - Left Join: Using a.l_item_id = b.l_item_id, a.l_description = b.l_description, a.price = b.price + Left Join: Using a.l_orderkey = b.l_orderkey, a.l_item_id = b.l_item_id, a.l_description = b.l_description, a.l_extendedprice = b.l_extendedprice, a.price = b.price SubqueryAlias: a TableScan: lineitem SubqueryAlias: b @@ -1075,7 +1075,7 @@ fn natural_right_join() { plan, @r" Projection: a.l_item_id - Right Join: Using a.l_item_id = b.l_item_id, a.l_description = b.l_description, a.price = b.price + Right Join: Using a.l_orderkey = b.l_orderkey, a.l_item_id = b.l_item_id, a.l_description = b.l_description, a.l_extendedprice = b.l_extendedprice, a.price = b.price SubqueryAlias: a TableScan: lineitem SubqueryAlias: b @@ -4801,11 +4801,16 @@ fn test_using_join_wildcard_schema() { // Only columns from one join side should be present let expected_fields = vec![ "o1.order_id".to_string(), + "o1.o_orderkey".to_string(), + "o1.o_custkey".to_string(), + "o1.o_orderstatus".to_string(), "o1.customer_id".to_string(), + "o1.o_totalprice".to_string(), "o1.o_item_id".to_string(), "o1.qty".to_string(), "o1.price".to_string(), "o1.delivered".to_string(), + ]; assert_eq!(plan.schema().field_names(), expected_fields); diff --git a/datafusion/sqllogictest/test_files/subquery.slt b/datafusion/sqllogictest/test_files/subquery.slt index 8fbea2d48c908..e73f4ec3e32da 100644 --- a/datafusion/sqllogictest/test_files/subquery.slt +++ b/datafusion/sqllogictest/test_files/subquery.slt @@ -1528,85 +1528,3 @@ logical_plan 20)--------SubqueryAlias: set_cmp_s 21)----------Projection: column1 AS v 22)------------Values: (Int64(5)), (Int64(NULL)) - -# correlated_recursive_scalar_subquery_with_level_3_scalar_subquery_referencing_level1_relation -query TT -explain select c_custkey from customer -where c_acctbal < ( - select sum(o_totalprice) from orders - where o_custkey = c_custkey - and o_totalprice < ( - select sum(l_extendedprice) as price from lineitem where l_orderkey = o_orderkey - and l_extendedprice < c_acctbal - ) -) order by c_custkey; ----- -logical_plan -01)Sort: customer.c_custkey ASC NULLS LAST -02)--Projection: customer.c_custkey -03)----Inner Join: customer.c_custkey = __scalar_sq_1.o_custkey Filter: CAST(customer.c_acctbal AS Decimal128(25, 2)) < __scalar_sq_1.sum(orders.o_totalprice) -04)------TableScan: customer projection=[c_custkey, c_acctbal] -05)------SubqueryAlias: __scalar_sq_1 -06)--------Projection: sum(orders.o_totalprice), orders.o_custkey -07)----------Aggregate: groupBy=[[orders.o_custkey]], aggr=[[sum(orders.o_totalprice)]] -08)------------Projection: orders.o_custkey, orders.o_totalprice -09)--------------Filter: CAST(orders.o_totalprice AS Decimal128(25, 2)) < () -10)----------------Subquery: -11)------------------Projection: sum(lineitem.l_extendedprice) AS price -12)--------------------Aggregate: groupBy=[[]], aggr=[[sum(lineitem.l_extendedprice)]] -13)----------------------Filter: lineitem.l_orderkey = outer_ref(orders.o_orderkey) AND lineitem.l_extendedprice < outer_ref(customer.c_acctbal) -14)------------------------TableScan: lineitem, partial_filters=[lineitem.l_orderkey = outer_ref(orders.o_orderkey), lineitem.l_extendedprice < outer_ref(customer.c_acctbal)] -15)----------------TableScan: orders projection=[o_orderkey, o_custkey, o_totalprice] - -# correlated_recursive_scalar_subquery_with_level_3_exists_subquery_referencing_level1_relation -query TT -explain select c_custkey from customer -where c_acctbal < ( - select sum(o_totalprice) from orders - where o_custkey = c_custkey - and exists ( - select * from lineitem where l_orderkey = o_orderkey - and l_extendedprice < c_acctbal - ) -) order by c_custkey; ----- -logical_plan -01)Sort: customer.c_custkey ASC NULLS LAST -02)--Projection: customer.c_custkey -03)----Inner Join: customer.c_custkey = __scalar_sq_2.o_custkey Filter: CAST(customer.c_acctbal AS Decimal128(25, 2)) < __scalar_sq_2.sum(orders.o_totalprice) -04)------TableScan: customer projection=[c_custkey, c_acctbal] -05)------SubqueryAlias: __scalar_sq_2 -06)--------Projection: sum(orders.o_totalprice), orders.o_custkey -07)----------Aggregate: groupBy=[[orders.o_custkey]], aggr=[[sum(orders.o_totalprice)]] -08)------------Projection: orders.o_custkey, orders.o_totalprice -09)--------------LeftSemi Join: orders.o_orderkey = __correlated_sq_1.l_orderkey Filter: __correlated_sq_1.l_extendedprice < customer.c_acctbal -10)----------------TableScan: orders projection=[o_orderkey, o_custkey, o_totalprice] -11)----------------SubqueryAlias: __correlated_sq_1 -12)------------------TableScan: lineitem projection=[l_orderkey, l_extendedprice] - -# correlated_recursive_scalar_subquery_with_level_3_in_subquery_referencing_level1_relation -query TT -explain select c_custkey from customer -where c_acctbal < ( - select sum(o_totalprice) from orders - where o_custkey = c_custkey - and o_totalprice in ( - select l_extendedprice as price from lineitem where l_orderkey = o_orderkey - and l_extendedprice < c_acctbal - ) -) order by c_custkey; ----- -logical_plan -01)Sort: customer.c_custkey ASC NULLS LAST -02)--Projection: customer.c_custkey -03)----Inner Join: customer.c_custkey = __scalar_sq_2.o_custkey Filter: CAST(customer.c_acctbal AS Decimal128(25, 2)) < __scalar_sq_2.sum(orders.o_totalprice) -04)------TableScan: customer projection=[c_custkey, c_acctbal] -05)------SubqueryAlias: __scalar_sq_2 -06)--------Projection: sum(orders.o_totalprice), orders.o_custkey -07)----------Aggregate: groupBy=[[orders.o_custkey]], aggr=[[sum(orders.o_totalprice)]] -08)------------Projection: orders.o_custkey, orders.o_totalprice -09)--------------LeftSemi Join: orders.o_totalprice = __correlated_sq_1.price, orders.o_orderkey = __correlated_sq_1.l_orderkey Filter: __correlated_sq_1.l_extendedprice < customer.c_acctbal -10)----------------TableScan: orders projection=[o_orderkey, o_custkey, o_totalprice] -11)----------------SubqueryAlias: __correlated_sq_1 -12)------------------Projection: lineitem.l_extendedprice AS price, lineitem.l_extendedprice, lineitem.l_orderkey -13)--------------------TableScan: lineitem projection=[l_orderkey, l_extendedprice] \ No newline at end of file From 4dbb62d1e27c023fa8f161f1d81e043b76fb88c8 Mon Sep 17 00:00:00 2001 From: Michael Kleen Date: Wed, 21 Jan 2026 14:19:57 +0100 Subject: [PATCH 14/26] Fix fmt --- datafusion/sql/src/expr/identifier.rs | 8 +++---- datafusion/sql/src/relation/mod.rs | 2 +- datafusion/sql/src/select.rs | 2 +- datafusion/sql/tests/sql_integration.rs | 31 ++++++++++++++----------- 4 files changed, 23 insertions(+), 20 deletions(-) diff --git a/datafusion/sql/src/expr/identifier.rs b/datafusion/sql/src/expr/identifier.rs index ace8170241e66..d6a40ceb51655 100644 --- a/datafusion/sql/src/expr/identifier.rs +++ b/datafusion/sql/src/expr/identifier.rs @@ -185,10 +185,10 @@ impl SqlToRel<'_, S> { { // TODO: remove when can support nested identifiers for OuterReferenceColumn not_impl_err!( - "Nested identifiers are not yet supported for OuterReferenceColumn {}", - Column::from((qualifier, field)) - .quoted_flat_name() - ) + "Nested identifiers are not yet supported for OuterReferenceColumn {}", + Column::from((qualifier, field)) + .quoted_flat_name() + ) } // Found matching field with no spare identifier(s) Some((field, qualifier, _nested_names)) => { diff --git a/datafusion/sql/src/relation/mod.rs b/datafusion/sql/src/relation/mod.rs index d978d1737debd..6558763ca4e42 100644 --- a/datafusion/sql/src/relation/mod.rs +++ b/datafusion/sql/src/relation/mod.rs @@ -27,7 +27,7 @@ use datafusion_expr::builder::subquery_alias; use datafusion_expr::planner::{ PlannedRelation, RelationPlannerContext, RelationPlanning, }; -use datafusion_expr::{expr::Unnest, Expr, LogicalPlan, LogicalPlanBuilder}; +use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder, expr::Unnest}; use datafusion_expr::{Subquery, SubqueryAlias}; use sqlparser::ast::{FunctionArg, FunctionArgExpr, Spanned, TableFactor}; diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index 4e0946bdd4e4e..182bc97ad4d98 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -29,7 +29,7 @@ use crate::utils::{ use datafusion_common::error::DataFusionErrorBuilder; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; -use datafusion_common::{DFSchema, Column, Result, not_impl_err, plan_err}; +use datafusion_common::{Column, DFSchema, Result, not_impl_err, plan_err}; use datafusion_common::{RecursionUnnestOption, UnnestOptions}; use datafusion_expr::expr::{Alias, PlannedReplaceSelectItem, WildcardOptions}; use datafusion_expr::expr_rewriter::{ diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index 5ea6fb42b8c89..cf16cbdfa8b26 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -4810,7 +4810,6 @@ fn test_using_join_wildcard_schema() { "o1.qty".to_string(), "o1.price".to_string(), "o1.delivered".to_string(), - ]; assert_eq!(plan.schema().field_names(), expected_fields); @@ -4862,7 +4861,8 @@ fn test_using_join_wildcard_schema() { } #[test] -fn test_2_nested_lateral_join_with_the_deepest_join_referencing_the_outer_most_relation() { +fn test_2_nested_lateral_join_with_the_deepest_join_referencing_the_outer_most_relation() +{ let sql = "SELECT * FROM j1 j1_outer, LATERAL ( SELECT * FROM j1 j1_inner, LATERAL ( SELECT * FROM j2 WHERE j1_inner.j1_id = j2_id and j1_outer.j1_id=j2_id @@ -4889,11 +4889,12 @@ Projection: j1_outer.j1_id, j1_outer.j1_string, j2.j1_id, j2.j1_string, j2.j2_id Filter: outer_ref(j1_inner.j1_id) = j2.j2_id AND outer_ref(j1_outer.j1_id) = j2.j2_id TableScan: j2 "# -); + ); } #[test] -fn test_correlated_recursive_scalar_subquery_with_level_3_scalar_subquery_referencing_level1_relation() { +fn test_correlated_recursive_scalar_subquery_with_level_3_scalar_subquery_referencing_level1_relation() + { let sql = "select c_custkey from customer where c_acctbal < ( select sum(o_totalprice) from orders @@ -4923,11 +4924,12 @@ Sort: customer.c_custkey ASC NULLS LAST TableScan: orders TableScan: customer "# -); + ); } #[test] -fn correlated_recursive_scalar_subquery_with_level_3_exists_subquery_referencing_level1_relation() { +fn correlated_recursive_scalar_subquery_with_level_3_exists_subquery_referencing_level1_relation() + { let sql = "select c_custkey from customer where c_acctbal < ( select sum(o_totalprice) from orders @@ -4940,8 +4942,8 @@ fn correlated_recursive_scalar_subquery_with_level_3_exists_subquery_referencing let plan = logical_plan(sql).unwrap(); assert_snapshot!( - plan, - @r#" + plan, + @r#" Sort: customer.c_custkey ASC NULLS LAST Projection: customer.c_custkey Filter: customer.c_acctbal < () @@ -4956,11 +4958,12 @@ Sort: customer.c_custkey ASC NULLS LAST TableScan: orders TableScan: customer "# -); + ); } #[test] -fn correlated_recursive_scalar_subquery_with_level_3_in_subquery_referencing_level1_relation() { +fn correlated_recursive_scalar_subquery_with_level_3_in_subquery_referencing_level1_relation() + { let sql = "select c_custkey from customer where c_acctbal < ( select sum(o_totalprice) from orders @@ -4973,8 +4976,8 @@ fn correlated_recursive_scalar_subquery_with_level_3_in_subquery_referencing_lev let plan = logical_plan(sql).unwrap(); assert_snapshot!( - plan, - @r#" + plan, + @r#" Sort: customer.c_custkey ASC NULLS LAST Projection: customer.c_custkey Filter: customer.c_acctbal < () @@ -4989,5 +4992,5 @@ Sort: customer.c_custkey ASC NULLS LAST TableScan: orders TableScan: customer "# -); -} \ No newline at end of file + ); +} From 0e77c16b02df8f8d064cc1d12087f9cf2e9e5556 Mon Sep 17 00:00:00 2001 From: Michael Kleen Date: Wed, 21 Jan 2026 18:08:51 +0100 Subject: [PATCH 15/26] Remove test from joins.slt --- datafusion/sqllogictest/test_files/joins.slt | 23 -------------------- 1 file changed, 23 deletions(-) diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index a0987e6d72227..dd7f4710d9dbb 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -4646,29 +4646,6 @@ logical_plan 08)----------TableScan: j3 projection=[j3_string, j3_id] physical_plan_error This feature is not implemented: Physical plan does not support logical expression OuterReferenceColumn(Field { name: "j1_id", data_type: Int32, nullable: true }, Column { relation: Some(Bare { table: "j1" }), name: "j1_id" }) -# 2 nested lateral join with the deepest join referencing the outer most relation -query TT -explain SELECT * FROM j1 j1_outer, LATERAL ( - SELECT * FROM j1 j1_inner, LATERAL ( - SELECT * FROM j2 WHERE j1_inner.j1_id = j2_id and j1_outer.j1_id=j2_id - ) as j2 -) as j2; ----- -logical_plan -01)Cross Join: -02)--SubqueryAlias: j1_outer -03)----TableScan: j1 projection=[j1_string, j1_id] -04)--SubqueryAlias: j2 -05)----Subquery: -06)------Cross Join: -07)--------SubqueryAlias: j1_inner -08)----------TableScan: j1 projection=[j1_string, j1_id] -09)--------SubqueryAlias: j2 -10)----------Subquery: -11)------------Filter: outer_ref(j1_inner.j1_id) = j2.j2_id AND outer_ref(j1_outer.j1_id) = j2.j2_id -12)--------------TableScan: j2 projection=[j2_string, j2_id] -physical_plan_error This feature is not implemented: Physical plan does not support logical expression OuterReferenceColumn(Field { name: "j1_id", data_type: Int32, nullable: true }, Column { relation: Some(Bare { table: "j1_inner" }), name: "j1_id" }) - query TT explain SELECT * FROM j1, LATERAL (SELECT 1) AS j2; ---- From 75a6ebb107d85b46980fa02c03f07485dcc1329f Mon Sep 17 00:00:00 2001 From: Michael Kleen Date: Fri, 23 Jan 2026 09:32:51 +0100 Subject: [PATCH 16/26] Remove whitespace in CROSS JOIN output --- datafusion/sql/tests/sql_integration.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index cf16cbdfa8b26..01620fb029fbb 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -4874,13 +4874,13 @@ fn test_2_nested_lateral_join_with_the_deepest_join_referencing_the_outer_most_r plan, @r#" Projection: j1_outer.j1_id, j1_outer.j1_string, j2.j1_id, j2.j1_string, j2.j2_id, j2.j2_string - Cross Join: + Cross Join: SubqueryAlias: j1_outer TableScan: j1 SubqueryAlias: j2 Subquery: Projection: j1_inner.j1_id, j1_inner.j1_string, j2.j2_id, j2.j2_string - Cross Join: + Cross Join: SubqueryAlias: j1_inner TableScan: j1 SubqueryAlias: j2 From ed7b5d948ba7172fd0c64e5c05a51624976652fc Mon Sep 17 00:00:00 2001 From: Michael Kleen Date: Fri, 6 Feb 2026 08:49:11 +0100 Subject: [PATCH 17/26] Return references in planner --- datafusion/sql/src/planner.rs | 8 ++++---- datafusion/sql/src/select.rs | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index 6a3a2cc4e5b5a..3ddae499875ce 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -302,8 +302,8 @@ impl PlannerContext { /// Return the stack of outer relations' schemas, the outer most /// relation are at the first entry - pub fn outer_queries_schemas(&self) -> Vec { - self.outer_queries_schemas_stack.to_vec() + pub fn outer_queries_schemas(&self) -> &[DFSchemaRef] { + &self.outer_queries_schemas_stack } /// Sets the outer query schema, returning the existing one, if @@ -313,8 +313,8 @@ impl PlannerContext { } /// The schema of the adjacent outer relation - pub fn latest_outer_query_schema(&mut self) -> Option { - self.outer_queries_schemas_stack.last().cloned() + pub fn latest_outer_query_schema(&mut self) -> Option<&DFSchemaRef> { + self.outer_queries_schemas_stack.last() } /// Remove the schema of the adjacent outer relation diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index 182bc97ad4d98..8f966a0f5065c 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -638,7 +638,7 @@ impl SqlToRel<'_, S> { Some(predicate_expr) => { let fallback_schemas = plan.fallback_normalize_schemas(); - let outer_query_schema_vec = planner_context.outer_queries_schemas(); + let outer_query_schema_vec = planner_context.outer_queries_schemas().to_vec(); let filter_expr = self.sql_to_expr(predicate_expr, plan.schema(), planner_context)?; From cc9ee37b46c94b63821e33fec7fbad830d5cb249 Mon Sep 17 00:00:00 2001 From: Michael Kleen Date: Fri, 6 Feb 2026 16:13:16 +0100 Subject: [PATCH 18/26] Add sql logic tests back --- datafusion/sqllogictest/test_files/joins.slt | 23 ++++++++ .../sqllogictest/test_files/subquery.slt | 53 +++++++++++++++++++ 2 files changed, 76 insertions(+) diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index dd7f4710d9dbb..a0987e6d72227 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -4646,6 +4646,29 @@ logical_plan 08)----------TableScan: j3 projection=[j3_string, j3_id] physical_plan_error This feature is not implemented: Physical plan does not support logical expression OuterReferenceColumn(Field { name: "j1_id", data_type: Int32, nullable: true }, Column { relation: Some(Bare { table: "j1" }), name: "j1_id" }) +# 2 nested lateral join with the deepest join referencing the outer most relation +query TT +explain SELECT * FROM j1 j1_outer, LATERAL ( + SELECT * FROM j1 j1_inner, LATERAL ( + SELECT * FROM j2 WHERE j1_inner.j1_id = j2_id and j1_outer.j1_id=j2_id + ) as j2 +) as j2; +---- +logical_plan +01)Cross Join: +02)--SubqueryAlias: j1_outer +03)----TableScan: j1 projection=[j1_string, j1_id] +04)--SubqueryAlias: j2 +05)----Subquery: +06)------Cross Join: +07)--------SubqueryAlias: j1_inner +08)----------TableScan: j1 projection=[j1_string, j1_id] +09)--------SubqueryAlias: j2 +10)----------Subquery: +11)------------Filter: outer_ref(j1_inner.j1_id) = j2.j2_id AND outer_ref(j1_outer.j1_id) = j2.j2_id +12)--------------TableScan: j2 projection=[j2_string, j2_id] +physical_plan_error This feature is not implemented: Physical plan does not support logical expression OuterReferenceColumn(Field { name: "j1_id", data_type: Int32, nullable: true }, Column { relation: Some(Bare { table: "j1_inner" }), name: "j1_id" }) + query TT explain SELECT * FROM j1, LATERAL (SELECT 1) AS j2; ---- diff --git a/datafusion/sqllogictest/test_files/subquery.slt b/datafusion/sqllogictest/test_files/subquery.slt index e73f4ec3e32da..67630fe549e68 100644 --- a/datafusion/sqllogictest/test_files/subquery.slt +++ b/datafusion/sqllogictest/test_files/subquery.slt @@ -1528,3 +1528,56 @@ logical_plan 20)--------SubqueryAlias: set_cmp_s 21)----------Projection: column1 AS v 22)------------Values: (Int64(5)), (Int64(NULL)) + +# correlated_recursive_scalar_subquery_with_level_3_exists_subquery_referencing_level1_relation +query TT +explain select c_custkey from customer +where c_acctbal < ( + select sum(o_totalprice) from orders + where o_custkey = c_custkey + and exists ( + select * from lineitem where l_orderkey = o_orderkey + and l_extendedprice < c_acctbal + ) +) order by c_custkey; +---- +logical_plan +01)Sort: customer.c_custkey ASC NULLS LAST +02)--Projection: customer.c_custkey +03)----Inner Join: customer.c_custkey = __scalar_sq_2.o_custkey Filter: CAST(customer.c_acctbal AS Decimal128(25, 2)) < __scalar_sq_2.sum(orders.o_totalprice) +04)------TableScan: customer projection=[c_custkey, c_acctbal] +05)------SubqueryAlias: __scalar_sq_2 +06)--------Projection: sum(orders.o_totalprice), orders.o_custkey +07)----------Aggregate: groupBy=[[orders.o_custkey]], aggr=[[sum(orders.o_totalprice)]] +08)------------Projection: orders.o_custkey, orders.o_totalprice +09)--------------LeftSemi Join: orders.o_orderkey = __correlated_sq_1.l_orderkey Filter: __correlated_sq_1.l_extendedprice < customer.c_acctbal +10)----------------TableScan: orders projection=[o_orderkey, o_custkey, o_totalprice] +11)----------------SubqueryAlias: __correlated_sq_1 +12)------------------TableScan: lineitem projection=[l_orderkey, l_extendedprice] + +# correlated_recursive_scalar_subquery_with_level_3_in_subquery_referencing_level1_relation +query TT +explain select c_custkey from customer +where c_acctbal < ( + select sum(o_totalprice) from orders + where o_custkey = c_custkey + and o_totalprice in ( + select l_extendedprice as price from lineitem where l_orderkey = o_orderkey + and l_extendedprice < c_acctbal + ) +) order by c_custkey; +---- +logical_plan +01)Sort: customer.c_custkey ASC NULLS LAST +02)--Projection: customer.c_custkey +03)----Inner Join: customer.c_custkey = __scalar_sq_2.o_custkey Filter: CAST(customer.c_acctbal AS Decimal128(25, 2)) < __scalar_sq_2.sum(orders.o_totalprice) +04)------TableScan: customer projection=[c_custkey, c_acctbal] +05)------SubqueryAlias: __scalar_sq_2 +06)--------Projection: sum(orders.o_totalprice), orders.o_custkey +07)----------Aggregate: groupBy=[[orders.o_custkey]], aggr=[[sum(orders.o_totalprice)]] +08)------------Projection: orders.o_custkey, orders.o_totalprice +09)--------------LeftSemi Join: orders.o_totalprice = __correlated_sq_1.price, orders.o_orderkey = __correlated_sq_1.l_orderkey Filter: __correlated_sq_1.l_extendedprice < customer.c_acctbal +10)----------------TableScan: orders projection=[o_orderkey, o_custkey, o_totalprice] +11)----------------SubqueryAlias: __correlated_sq_1 +12)------------------Projection: lineitem.l_extendedprice AS price, lineitem.l_extendedprice, lineitem.l_orderkey +13)--------------------TableScan: lineitem projection=[l_orderkey, l_extendedprice] \ No newline at end of file From 818926d8acf578992b0dbd4d7ab3936db5d030f1 Mon Sep 17 00:00:00 2001 From: Michael Kleen Date: Sat, 7 Feb 2026 10:37:47 +0100 Subject: [PATCH 19/26] Simplify loop logic --- datafusion/sql/src/expr/identifier.rs | 19 ++----------------- 1 file changed, 2 insertions(+), 17 deletions(-) diff --git a/datafusion/sql/src/expr/identifier.rs b/datafusion/sql/src/expr/identifier.rs index d6a40ceb51655..1e7a6f3b75776 100644 --- a/datafusion/sql/src/expr/identifier.rs +++ b/datafusion/sql/src/expr/identifier.rs @@ -173,9 +173,6 @@ impl SqlToRel<'_, S> { not_impl_err!("compound identifier: {ids:?}") } else { // Check the outer_query_schema and try to find a match - let outer_schemas = planner_context.outer_queries_schemas(); - let mut maybe_result = None; - if !outer_schemas.is_empty() { for outer in planner_context.outer_queries_schemas() { let search_result = search_dfschema(&ids, &outer); let result = match search_result { @@ -186,8 +183,7 @@ impl SqlToRel<'_, S> { // TODO: remove when can support nested identifiers for OuterReferenceColumn not_impl_err!( "Nested identifiers are not yet supported for OuterReferenceColumn {}", - Column::from((qualifier, field)) - .quoted_flat_name() + Column::from((qualifier, field)).quoted_flat_name() ) } // Found matching field with no spare identifier(s) @@ -201,18 +197,8 @@ impl SqlToRel<'_, S> { // Found no matching field, will return a default None => continue, }; - maybe_result = Some(result); - break; + return result; } - if let Some(result) = maybe_result { - result - } else { - let s = &ids[0..ids.len()]; - // safe unwrap as s can never be empty or exceed the bounds - let (relation, column_name) = form_identifier(s).unwrap(); - Ok(Expr::Column(Column::new(relation, column_name))) - } - } else { let s = &ids[0..ids.len()]; // Safe unwrap as s can never be empty or exceed the bounds let (relation, column_name) = form_identifier(s).unwrap(); @@ -223,7 +209,6 @@ impl SqlToRel<'_, S> { column.spans_mut().add_span(span); } Ok(Expr::Column(column)) - } } } } From 8a228bc38ddbda29f4979b63924d77df3c8b60d5 Mon Sep 17 00:00:00 2001 From: Michael Kleen Date: Sat, 7 Feb 2026 14:09:52 +0100 Subject: [PATCH 20/26] Fix fmt --- datafusion/sql/src/expr/identifier.rs | 73 ++++++++++++++------------- datafusion/sql/src/select.rs | 3 +- 2 files changed, 39 insertions(+), 37 deletions(-) diff --git a/datafusion/sql/src/expr/identifier.rs b/datafusion/sql/src/expr/identifier.rs index 1e7a6f3b75776..511bd4ad47608 100644 --- a/datafusion/sql/src/expr/identifier.rs +++ b/datafusion/sql/src/expr/identifier.rs @@ -173,42 +173,43 @@ impl SqlToRel<'_, S> { not_impl_err!("compound identifier: {ids:?}") } else { // Check the outer_query_schema and try to find a match - for outer in planner_context.outer_queries_schemas() { - let search_result = search_dfschema(&ids, &outer); - let result = match search_result { - // Found matching field with spare identifier(s) for nested field(s) in structure - Some((field, qualifier, nested_names)) - if !nested_names.is_empty() => - { - // TODO: remove when can support nested identifiers for OuterReferenceColumn - not_impl_err!( - "Nested identifiers are not yet supported for OuterReferenceColumn {}", - Column::from((qualifier, field)).quoted_flat_name() - ) - } - // Found matching field with no spare identifier(s) - Some((field, qualifier, _nested_names)) => { - // Found an exact match on a qualified name in the outer plan schema, so this is an outer reference column - Ok(Expr::OuterReferenceColumn( - Arc::clone(field), - Column::from((qualifier, field)), - )) - } - // Found no matching field, will return a default - None => continue, - }; - return result; - } - let s = &ids[0..ids.len()]; - // Safe unwrap as s can never be empty or exceed the bounds - let (relation, column_name) = form_identifier(s).unwrap(); - let mut column = Column::new(relation, column_name); - if self.options.collect_spans - && let Some(span) = ids_span - { - column.spans_mut().add_span(span); - } - Ok(Expr::Column(column)) + for outer in planner_context.outer_queries_schemas() { + let search_result = search_dfschema(&ids, &outer); + let result = match search_result { + // Found matching field with spare identifier(s) for nested field(s) in structure + Some((field, qualifier, nested_names)) + if !nested_names.is_empty() => + { + // TODO: remove this when we have support for nested identifiers for OuterReferenceColumn + not_impl_err!( + "Nested identifiers are not yet supported for OuterReferenceColumn {}", + Column::from((qualifier, field)) + .quoted_flat_name() + ) + } + // Found matching field with no spare identifier(s) + Some((field, qualifier, _nested_names)) => { + // Found an exact match on a qualified name in the outer plan schema, so this is an outer reference column + Ok(Expr::OuterReferenceColumn( + Arc::clone(field), + Column::from((qualifier, field)), + )) + } + // Found no matching field, will return a default + None => continue, + }; + return result; + } + // Safe unwrap as column name can never be empty or exceed the bounds + let (relation, column_name) = + form_identifier(&ids[0..ids.len()]).unwrap(); + let mut column = Column::new(relation, column_name); + if self.options.collect_spans + && let Some(span) = ids_span + { + column.spans_mut().add_span(span); + } + Ok(Expr::Column(column)) } } } diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index 8f966a0f5065c..e64b41fecb997 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -638,7 +638,8 @@ impl SqlToRel<'_, S> { Some(predicate_expr) => { let fallback_schemas = plan.fallback_normalize_schemas(); - let outer_query_schema_vec = planner_context.outer_queries_schemas().to_vec(); + let outer_query_schema_vec = + planner_context.outer_queries_schemas().to_vec(); let filter_expr = self.sql_to_expr(predicate_expr, plan.schema(), planner_context)?; From 20583c5e59a3898904ea528c9a21efb41d07401e Mon Sep 17 00:00:00 2001 From: Michael Kleen Date: Sat, 7 Feb 2026 14:32:15 +0100 Subject: [PATCH 21/26] Remove redundant tests for sql integrations --- datafusion/sql/tests/sql_integration.rs | 70 +------------------------ 1 file changed, 1 insertion(+), 69 deletions(-) diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index 01620fb029fbb..38dc04ee5d741 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -4925,72 +4925,4 @@ Sort: customer.c_custkey ASC NULLS LAST TableScan: customer "# ); -} - -#[test] -fn correlated_recursive_scalar_subquery_with_level_3_exists_subquery_referencing_level1_relation() - { - let sql = "select c_custkey from customer - where c_acctbal < ( - select sum(o_totalprice) from orders - where o_custkey = c_custkey - and exists ( - select * from lineitem where l_orderkey = o_orderkey - and l_extendedprice < c_acctbal - ) - ) order by c_custkey"; - - let plan = logical_plan(sql).unwrap(); - assert_snapshot!( - plan, - @r#" -Sort: customer.c_custkey ASC NULLS LAST - Projection: customer.c_custkey - Filter: customer.c_acctbal < () - Subquery: - Projection: sum(orders.o_totalprice) - Aggregate: groupBy=[[]], aggr=[[sum(orders.o_totalprice)]] - Filter: orders.o_custkey = outer_ref(customer.c_custkey) AND EXISTS () - Subquery: - Projection: lineitem.l_orderkey, lineitem.l_item_id, lineitem.l_description, lineitem.l_extendedprice, lineitem.price - Filter: lineitem.l_orderkey = outer_ref(orders.o_orderkey) AND lineitem.l_extendedprice < outer_ref(customer.c_acctbal) - TableScan: lineitem - TableScan: orders - TableScan: customer -"# - ); -} - -#[test] -fn correlated_recursive_scalar_subquery_with_level_3_in_subquery_referencing_level1_relation() - { - let sql = "select c_custkey from customer - where c_acctbal < ( - select sum(o_totalprice) from orders - where o_custkey = c_custkey - and o_totalprice in ( - select l_extendedprice as price from lineitem where l_orderkey = o_orderkey - and l_extendedprice < c_acctbal - ) - ) order by c_custkey"; - - let plan = logical_plan(sql).unwrap(); - assert_snapshot!( - plan, - @r#" -Sort: customer.c_custkey ASC NULLS LAST - Projection: customer.c_custkey - Filter: customer.c_acctbal < () - Subquery: - Projection: sum(orders.o_totalprice) - Aggregate: groupBy=[[]], aggr=[[sum(orders.o_totalprice)]] - Filter: orders.o_custkey = outer_ref(customer.c_custkey) AND orders.o_totalprice IN () - Subquery: - Projection: lineitem.l_extendedprice AS price - Filter: lineitem.l_orderkey = outer_ref(orders.o_orderkey) AND lineitem.l_extendedprice < outer_ref(customer.c_acctbal) - TableScan: lineitem - TableScan: orders - TableScan: customer -"# - ); -} +} \ No newline at end of file From 510838a8ebfef20d8348364652d1240146fa886a Mon Sep 17 00:00:00 2001 From: Michael Kleen Date: Sat, 7 Feb 2026 14:34:44 +0100 Subject: [PATCH 22/26] fixup! Remove redundant tests for sql integrations --- datafusion/sql/tests/sql_integration.rs | 32 ------------------------- 1 file changed, 32 deletions(-) diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index 38dc04ee5d741..4100dd0bf3585 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -4860,38 +4860,6 @@ fn test_using_join_wildcard_schema() { ); } -#[test] -fn test_2_nested_lateral_join_with_the_deepest_join_referencing_the_outer_most_relation() -{ - let sql = "SELECT * FROM j1 j1_outer, LATERAL ( - SELECT * FROM j1 j1_inner, LATERAL ( - SELECT * FROM j2 WHERE j1_inner.j1_id = j2_id and j1_outer.j1_id=j2_id - ) as j2 -) as j2"; - - let plan = logical_plan(sql).unwrap(); - assert_snapshot!( - plan, - @r#" -Projection: j1_outer.j1_id, j1_outer.j1_string, j2.j1_id, j2.j1_string, j2.j2_id, j2.j2_string - Cross Join: - SubqueryAlias: j1_outer - TableScan: j1 - SubqueryAlias: j2 - Subquery: - Projection: j1_inner.j1_id, j1_inner.j1_string, j2.j2_id, j2.j2_string - Cross Join: - SubqueryAlias: j1_inner - TableScan: j1 - SubqueryAlias: j2 - Subquery: - Projection: j2.j2_id, j2.j2_string - Filter: outer_ref(j1_inner.j1_id) = j2.j2_id AND outer_ref(j1_outer.j1_id) = j2.j2_id - TableScan: j2 -"# - ); -} - #[test] fn test_correlated_recursive_scalar_subquery_with_level_3_scalar_subquery_referencing_level1_relation() { From 42170bff6b5fe9c69159fca8263047383d310a7c Mon Sep 17 00:00:00 2001 From: Michael Kleen Date: Sat, 7 Feb 2026 14:38:35 +0100 Subject: [PATCH 23/26] fixup! fixup! Remove redundant tests for sql integrations --- datafusion/sql/tests/sql_integration.rs | 32 +++++++++++++++++ datafusion/sqllogictest/test_files/joins.slt | 38 ++++---------------- 2 files changed, 39 insertions(+), 31 deletions(-) diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index 4100dd0bf3585..38dc04ee5d741 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -4860,6 +4860,38 @@ fn test_using_join_wildcard_schema() { ); } +#[test] +fn test_2_nested_lateral_join_with_the_deepest_join_referencing_the_outer_most_relation() +{ + let sql = "SELECT * FROM j1 j1_outer, LATERAL ( + SELECT * FROM j1 j1_inner, LATERAL ( + SELECT * FROM j2 WHERE j1_inner.j1_id = j2_id and j1_outer.j1_id=j2_id + ) as j2 +) as j2"; + + let plan = logical_plan(sql).unwrap(); + assert_snapshot!( + plan, + @r#" +Projection: j1_outer.j1_id, j1_outer.j1_string, j2.j1_id, j2.j1_string, j2.j2_id, j2.j2_string + Cross Join: + SubqueryAlias: j1_outer + TableScan: j1 + SubqueryAlias: j2 + Subquery: + Projection: j1_inner.j1_id, j1_inner.j1_string, j2.j2_id, j2.j2_string + Cross Join: + SubqueryAlias: j1_inner + TableScan: j1 + SubqueryAlias: j2 + Subquery: + Projection: j2.j2_id, j2.j2_string + Filter: outer_ref(j1_inner.j1_id) = j2.j2_id AND outer_ref(j1_outer.j1_id) = j2.j2_id + TableScan: j2 +"# + ); +} + #[test] fn test_correlated_recursive_scalar_subquery_with_level_3_scalar_subquery_referencing_level1_relation() { diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index a0987e6d72227..35f750f8bb1c0 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -57,15 +57,15 @@ statement ok CREATE TABLE join_t3(s3 struct) AS VALUES (NULL), - ({id: 1}), - ({id: 2}); + (struct(1)), + (struct(2)); statement ok CREATE TABLE join_t4(s4 struct) AS VALUES (NULL), - ({id: 2}), - ({id: 3}); + (struct(2)), + (struct(3)); # Left semi anti join @@ -4646,29 +4646,6 @@ logical_plan 08)----------TableScan: j3 projection=[j3_string, j3_id] physical_plan_error This feature is not implemented: Physical plan does not support logical expression OuterReferenceColumn(Field { name: "j1_id", data_type: Int32, nullable: true }, Column { relation: Some(Bare { table: "j1" }), name: "j1_id" }) -# 2 nested lateral join with the deepest join referencing the outer most relation -query TT -explain SELECT * FROM j1 j1_outer, LATERAL ( - SELECT * FROM j1 j1_inner, LATERAL ( - SELECT * FROM j2 WHERE j1_inner.j1_id = j2_id and j1_outer.j1_id=j2_id - ) as j2 -) as j2; ----- -logical_plan -01)Cross Join: -02)--SubqueryAlias: j1_outer -03)----TableScan: j1 projection=[j1_string, j1_id] -04)--SubqueryAlias: j2 -05)----Subquery: -06)------Cross Join: -07)--------SubqueryAlias: j1_inner -08)----------TableScan: j1 projection=[j1_string, j1_id] -09)--------SubqueryAlias: j2 -10)----------Subquery: -11)------------Filter: outer_ref(j1_inner.j1_id) = j2.j2_id AND outer_ref(j1_outer.j1_id) = j2.j2_id -12)--------------TableScan: j2 projection=[j2_string, j2_id] -physical_plan_error This feature is not implemented: Physical plan does not support logical expression OuterReferenceColumn(Field { name: "j1_id", data_type: Int32, nullable: true }, Column { relation: Some(Bare { table: "j1_inner" }), name: "j1_id" }) - query TT explain SELECT * FROM j1, LATERAL (SELECT 1) AS j2; ---- @@ -5072,10 +5049,9 @@ WHERE k1 < 0 ---- physical_plan 01)HashJoinExec: mode=CollectLeft, join_type=RightAnti, on=[(k2@0, k1@0)] -02)--FilterExec: k2@0 < 0 -03)----DataSourceExec: partitions=1, partition_sizes=[0] -04)--FilterExec: k1@0 < 0 -05)----DataSourceExec: partitions=1, partition_sizes=[10000] +02)--DataSourceExec: partitions=1, partition_sizes=[0] +03)--FilterExec: k1@0 < 0 +04)----DataSourceExec: partitions=1, partition_sizes=[10000] query II SELECT * From 885c337147a9c64fd1be612f792b6f7da6ab6250 Mon Sep 17 00:00:00 2001 From: Michael Kleen Date: Sat, 7 Feb 2026 14:55:02 +0100 Subject: [PATCH 24/26] fixup! fixup! fixup! Remove redundant tests for sql integrations --- datafusion/sqllogictest/test_files/joins.slt | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index 35f750f8bb1c0..dd7f4710d9dbb 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -57,15 +57,15 @@ statement ok CREATE TABLE join_t3(s3 struct) AS VALUES (NULL), - (struct(1)), - (struct(2)); + ({id: 1}), + ({id: 2}); statement ok CREATE TABLE join_t4(s4 struct) AS VALUES (NULL), - (struct(2)), - (struct(3)); + ({id: 2}), + ({id: 3}); # Left semi anti join @@ -5049,9 +5049,10 @@ WHERE k1 < 0 ---- physical_plan 01)HashJoinExec: mode=CollectLeft, join_type=RightAnti, on=[(k2@0, k1@0)] -02)--DataSourceExec: partitions=1, partition_sizes=[0] -03)--FilterExec: k1@0 < 0 -04)----DataSourceExec: partitions=1, partition_sizes=[10000] +02)--FilterExec: k2@0 < 0 +03)----DataSourceExec: partitions=1, partition_sizes=[0] +04)--FilterExec: k1@0 < 0 +05)----DataSourceExec: partitions=1, partition_sizes=[10000] query II SELECT * From d094f132b2fe8f1bf88059e5d27df8f3001d13a7 Mon Sep 17 00:00:00 2001 From: Michael Kleen Date: Sat, 7 Feb 2026 17:08:27 +0100 Subject: [PATCH 25/26] Fix fmt --- datafusion/sql/tests/sql_integration.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index 38dc04ee5d741..444bdae73ac26 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -4925,4 +4925,4 @@ Sort: customer.c_custkey ASC NULLS LAST TableScan: customer "# ); -} \ No newline at end of file +} From 14aac861b43f9c2f1cf505bd2bcd7949eed17541 Mon Sep 17 00:00:00 2001 From: Michael Kleen Date: Sat, 7 Feb 2026 17:37:20 +0100 Subject: [PATCH 26/26] Fix clippy --- datafusion/sql/src/expr/identifier.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/sql/src/expr/identifier.rs b/datafusion/sql/src/expr/identifier.rs index 511bd4ad47608..b1ed735839c16 100644 --- a/datafusion/sql/src/expr/identifier.rs +++ b/datafusion/sql/src/expr/identifier.rs @@ -174,7 +174,7 @@ impl SqlToRel<'_, S> { } else { // Check the outer_query_schema and try to find a match for outer in planner_context.outer_queries_schemas() { - let search_result = search_dfschema(&ids, &outer); + let search_result = search_dfschema(&ids, outer); let result = match search_result { // Found matching field with spare identifier(s) for nested field(s) in structure Some((field, qualifier, nested_names))