From ebfb7b1a334a41aebf42f22b7e1b51ab199d88d8 Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Sun, 25 May 2025 16:10:07 +0200 Subject: [PATCH 01/15] fix: allow OuterRefColumn for non-adjacent outer relation --- datafusion/sql/src/expr/identifier.rs | 77 +++++++++++--------- datafusion/sql/src/expr/subquery.rs | 20 ++--- datafusion/sql/src/planner.rs | 29 +++++--- datafusion/sql/src/relation/mod.rs | 19 +++-- datafusion/sql/src/select.rs | 6 +- datafusion/sqllogictest/test_files/debug.slt | 52 +++++++++++++ 6 files changed, 132 insertions(+), 71 deletions(-) create mode 100644 datafusion/sqllogictest/test_files/debug.slt diff --git a/datafusion/sql/src/expr/identifier.rs b/datafusion/sql/src/expr/identifier.rs index 34fbe2edf8dd9..221fd517a2418 100644 --- a/datafusion/sql/src/expr/identifier.rs +++ b/datafusion/sql/src/expr/identifier.rs @@ -76,15 +76,16 @@ impl SqlToRel<'_, S> { } // Check the outer query schema - if let Some(outer) = planner_context.outer_query_schema() - && let Ok((qualifier, field)) = + for outer in planner_context.outer_query_schema() { + if let Ok((qualifier, field)) = outer.qualified_field_with_unqualified_name(normalize_ident.as_str()) - { - // Found an exact match on a qualified name in the outer plan schema, so this is an outer reference column - return Ok(Expr::OuterReferenceColumn( - Arc::clone(field), - Column::from((qualifier, field)), - )); + { + // Found an exact match on a qualified name in the outer plan schema, so this is an outer reference column + return Ok(Expr::OuterReferenceColumn( + Arc::clone(field), + Column::from((qualifier, field)), + )); + } } // Default case @@ -172,36 +173,44 @@ impl SqlToRel<'_, S> { not_impl_err!("compound identifier: {ids:?}") } else { // Check the outer_query_schema and try to find a match - if let Some(outer) = planner_context.outer_query_schema() { - let search_result = search_dfschema(&ids, outer); - match search_result { - // Found matching field with spare identifier(s) for nested field(s) in structure - Some((field, qualifier, nested_names)) - if !nested_names.is_empty() => - { - // TODO: remove when can support nested identifiers for OuterReferenceColumn - not_impl_err!( + let outer_schemas = planner_context.outer_query_schema(); + let mut maybe_result = None; + if outer_schemas.len() > 0 { + for outer in planner_context.outer_query_schema() { + let search_result = search_dfschema(&ids, outer); + let result = match search_result { + // Found matching field with spare identifier(s) for nested field(s) in structure + Some((field, qualifier, nested_names)) + if !nested_names.is_empty() => + { + // TODO: remove when can support nested identifiers for OuterReferenceColumn + not_impl_err!( "Nested identifiers are not yet supported for OuterReferenceColumn {}", Column::from((qualifier, field)) .quoted_flat_name() ) - } - // Found matching field with no spare identifier(s) - Some((field, qualifier, _nested_names)) => { - // Found an exact match on a qualified name in the outer plan schema, so this is an outer reference column - Ok(Expr::OuterReferenceColumn( - Arc::clone(field), - Column::from((qualifier, field)), - )) - } - // Found no matching field, will return a default - None => { - let s = &ids[0..ids.len()]; - // safe unwrap as s can never be empty or exceed the bounds - let (relation, column_name) = - form_identifier(s).unwrap(); - Ok(Expr::Column(Column::new(relation, column_name))) - } + } + // Found matching field with no spare identifier(s) + Some((field, qualifier, _nested_names)) => { + // Found an exact match on a qualified name in the outer plan schema, so this is an outer reference column + Ok(Expr::OuterReferenceColumn( + Arc::clone(field), + Column::from((qualifier, field)), + )) + } + // Found no matching field, will return a default + None => continue, + }; + maybe_result = Some(result); + break; + } + if let Some(result) = maybe_result { + result + } else { + let s = &ids[0..ids.len()]; + // safe unwrap as s can never be empty or exceed the bounds + let (relation, column_name) = form_identifier(s).unwrap(); + Ok(Expr::Column(Column::new(relation, column_name))) } } else { let s = &ids[0..ids.len()]; diff --git a/datafusion/sql/src/expr/subquery.rs b/datafusion/sql/src/expr/subquery.rs index 6837b2671cb1c..007de5c581464 100644 --- a/datafusion/sql/src/expr/subquery.rs +++ b/datafusion/sql/src/expr/subquery.rs @@ -31,11 +31,10 @@ impl SqlToRel<'_, S> { input_schema: &DFSchema, planner_context: &mut PlannerContext, ) -> Result { - let old_outer_query_schema = - planner_context.set_outer_query_schema(Some(input_schema.clone().into())); + planner_context.set_outer_query_schema(input_schema.clone().into()); let sub_plan = self.query_to_plan(subquery, planner_context)?; let outer_ref_columns = sub_plan.all_out_ref_exprs(); - planner_context.set_outer_query_schema(old_outer_query_schema); + planner_context.pop_outer_query_schema(); Ok(Expr::Exists(Exists { subquery: Subquery { subquery: Arc::new(sub_plan), @@ -54,8 +53,7 @@ impl SqlToRel<'_, S> { input_schema: &DFSchema, planner_context: &mut PlannerContext, ) -> Result { - let old_outer_query_schema = - planner_context.set_outer_query_schema(Some(input_schema.clone().into())); + planner_context.set_outer_query_schema(input_schema.clone().into()); let mut spans = Spans::new(); if let SetExpr::Select(select) = &subquery.body.as_ref() { @@ -70,7 +68,7 @@ impl SqlToRel<'_, S> { let sub_plan = self.query_to_plan(subquery, planner_context)?; let outer_ref_columns = sub_plan.all_out_ref_exprs(); - planner_context.set_outer_query_schema(old_outer_query_schema); + planner_context.pop_outer_query_schema(); self.validate_single_column( &sub_plan, @@ -98,8 +96,7 @@ impl SqlToRel<'_, S> { input_schema: &DFSchema, planner_context: &mut PlannerContext, ) -> Result { - let old_outer_query_schema = - planner_context.set_outer_query_schema(Some(input_schema.clone().into())); + planner_context.set_outer_query_schema(input_schema.clone().into()); let mut spans = Spans::new(); if let SetExpr::Select(select) = subquery.body.as_ref() { for item in &select.projection { @@ -112,7 +109,7 @@ impl SqlToRel<'_, S> { } let sub_plan = self.query_to_plan(subquery, planner_context)?; let outer_ref_columns = sub_plan.all_out_ref_exprs(); - planner_context.set_outer_query_schema(old_outer_query_schema); + planner_context.pop_outer_query_schema(); self.validate_single_column( &sub_plan, @@ -172,8 +169,7 @@ impl SqlToRel<'_, S> { input_schema: &DFSchema, planner_context: &mut PlannerContext, ) -> Result { - let old_outer_query_schema = - planner_context.set_outer_query_schema(Some(input_schema.clone().into())); + planner_context.set_outer_query_schema(input_schema.clone().into()); let mut spans = Spans::new(); if let SetExpr::Select(select) = subquery.body.as_ref() { @@ -188,7 +184,7 @@ impl SqlToRel<'_, S> { let sub_plan = self.query_to_plan(subquery, planner_context)?; let outer_ref_columns = sub_plan.all_out_ref_exprs(); - planner_context.set_outer_query_schema(old_outer_query_schema); + planner_context.pop_outer_query_schema(); self.validate_single_column( &sub_plan, diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index 520a2d55ef6a2..0de96c15a2555 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -262,7 +262,7 @@ pub struct PlannerContext { /// Use `Arc` to allow cheap cloning ctes: HashMap>, /// The query schema of the outer query plan, used to resolve the columns in subquery - outer_query_schema: Option, + outer_query_schema: Vec, /// The joined schemas of all FROM clauses planned so far. When planning LATERAL /// FROM clauses, this should become a suffix of the `outer_query_schema`. outer_from_schema: Option, @@ -282,7 +282,7 @@ impl PlannerContext { Self { prepare_param_data_types: Arc::new(vec![]), ctes: HashMap::new(), - outer_query_schema: None, + outer_query_schema: vec![], outer_from_schema: None, create_table_schema: None, } @@ -298,18 +298,27 @@ impl PlannerContext { } // Return a reference to the outer query's schema - pub fn outer_query_schema(&self) -> Option<&DFSchema> { - self.outer_query_schema.as_ref().map(|s| s.as_ref()) + pub fn outer_query_schema(&self) -> Vec<&DFSchema> { + self.outer_query_schema + .iter() + .map(|sc| sc.as_ref()) + .collect() } /// Sets the outer query schema, returning the existing one, if /// any - pub fn set_outer_query_schema( - &mut self, - mut schema: Option, - ) -> Option { - std::mem::swap(&mut self.outer_query_schema, &mut schema); - schema + pub fn set_outer_query_schema(&mut self, schema: DFSchemaRef) { + self.outer_query_schema.push(schema); + } + + pub fn latest_outer_query_schema(&mut self) -> Option { + self.outer_query_schema.last().clone().cloned() + } + + /// Sets the outer query schema, returning the existing one, if + /// any + pub fn pop_outer_query_schema(&mut self) -> Option { + self.outer_query_schema.pop() } pub fn set_table_schema( diff --git a/datafusion/sql/src/relation/mod.rs b/datafusion/sql/src/relation/mod.rs index cef3726c62e40..d11ddb435c702 100644 --- a/datafusion/sql/src/relation/mod.rs +++ b/datafusion/sql/src/relation/mod.rs @@ -27,7 +27,7 @@ use datafusion_expr::builder::subquery_alias; use datafusion_expr::planner::{ PlannedRelation, RelationPlannerContext, RelationPlanning, }; -use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder, expr::Unnest}; +use datafusion_expr::{expr::Unnest, Expr, LogicalPlan, LogicalPlanBuilder}; use datafusion_expr::{Subquery, SubqueryAlias}; use sqlparser::ast::{FunctionArg, FunctionArgExpr, Spanned, TableFactor}; @@ -262,9 +262,8 @@ impl SqlToRel<'_, S> { } => { let tbl_func_ref = self.object_name_to_table_reference(name)?; let schema = planner_context - .outer_query_schema() - .cloned() - .unwrap_or_else(DFSchema::empty); + .latest_outer_query_schema() + .unwrap_or(Arc::new(DFSchema::empty())); let func_args = args .into_iter() .map(|arg| match arg { @@ -310,20 +309,20 @@ impl SqlToRel<'_, S> { let old_from_schema = planner_context .set_outer_from_schema(None) .unwrap_or_else(|| Arc::new(DFSchema::empty())); - let new_query_schema = match planner_context.outer_query_schema() { + let new_query_schema = match planner_context.pop_outer_query_schema() { Some(old_query_schema) => { let mut new_query_schema = old_from_schema.as_ref().clone(); - new_query_schema.merge(old_query_schema); - Some(Arc::new(new_query_schema)) + new_query_schema.merge(old_query_schema.as_ref()); + Arc::new(new_query_schema) } - None => Some(Arc::clone(&old_from_schema)), + None => Arc::clone(&old_from_schema), }; - let old_query_schema = planner_context.set_outer_query_schema(new_query_schema); + planner_context.set_outer_query_schema(new_query_schema); let plan = self.create_relation(subquery, planner_context)?; let outer_ref_columns = plan.all_out_ref_exprs(); - planner_context.set_outer_query_schema(old_query_schema); + planner_context.pop_outer_query_schema(); planner_context.set_outer_from_schema(Some(old_from_schema)); // We can omit the subquery wrapper if there are no columns diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index 1d6ccde6be13a..0803733a048bc 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -637,14 +637,10 @@ impl SqlToRel<'_, S> { match selection { Some(predicate_expr) => { let fallback_schemas = plan.fallback_normalize_schemas(); - let outer_query_schema = planner_context.outer_query_schema().cloned(); - let outer_query_schema_vec = outer_query_schema - .as_ref() - .map(|schema| vec![schema]) - .unwrap_or_else(Vec::new); let filter_expr = self.sql_to_expr(predicate_expr, plan.schema(), planner_context)?; + let outer_query_schema_vec = planner_context.outer_query_schema(); // Check for aggregation functions let aggregate_exprs = diff --git a/datafusion/sqllogictest/test_files/debug.slt b/datafusion/sqllogictest/test_files/debug.slt new file mode 100644 index 0000000000000..48fd16bc0fd97 --- /dev/null +++ b/datafusion/sqllogictest/test_files/debug.slt @@ -0,0 +1,52 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# make sure to a batch size smaller than row number of the table. +statement ok +set datafusion.execution.batch_size = 2; + +statement ok +CREATE TABLE employees ( + employee_id INTEGER, + employee_name VARCHAR, + dept_id INTEGER, + salary DECIMAL +); + +statement ok +CREATE TABLE project_assignments ( + project_id INTEGER, + employee_id INTEGER, + priority INTEGER +); + + + +query TT +explain SELECT e1.employee_name, e1.salary +FROM employees e1 +WHERE e1.salary > ( + SELECT AVG(e2.salary) + FROM employees e2 + WHERE e2.dept_id = e1.dept_id + AND e2.salary > ( + SELECT AVG(e3.salary) + FROM employees e3 + WHERE e3.dept_id = e1.dept_id + ) +); +---- \ No newline at end of file From 7337cd07db88a58b47fc17dbed838cadffafb37c Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Sun, 25 May 2025 17:14:53 +0200 Subject: [PATCH 02/15] fix: accidentally pushdown filter with subquery --- .../expr/src/logical_plan/invariants.rs | 9 ++++-- datafusion/optimizer/src/push_down_filter.rs | 14 ++++++++- .../sqllogictest/test_files/subquery.slt | 29 +++++++++++++++++++ 3 files changed, 48 insertions(+), 4 deletions(-) diff --git a/datafusion/expr/src/logical_plan/invariants.rs b/datafusion/expr/src/logical_plan/invariants.rs index b39b23e30f4e8..3bb76706563a1 100644 --- a/datafusion/expr/src/logical_plan/invariants.rs +++ b/datafusion/expr/src/logical_plan/invariants.rs @@ -214,9 +214,12 @@ pub fn check_subquery_expr( Ok(()) } } - _ => plan_err!( - "Correlated scalar subquery can only be used in Projection, Filter, Aggregate plan nodes" - ), + any => { + println!("here {any}"); + plan_err!( + "Correlated scalar subquery can only be used in Projection, Filter, Aggregate plan nodes123 {any}" + ) + } }?; } check_correlations_in_subquery(inner_plan) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 1ff8bdfeff4c0..ef691db186e55 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -1131,7 +1131,11 @@ impl OptimizerRule for PushDownFilter { let (volatile_filters, non_volatile_filters): (Vec<&Expr>, Vec<&Expr>) = filter_predicates .into_iter() - .partition(|pred| pred.is_volatile()); + // TODO: subquery decorrelation sometimes cannot decorrelated all the expr + // (i.e in the case of recursive subquery) + // this function may accidentally pushdown the subquery expr as well + // until then, we have to exclude these exprs here + .partition(|pred| pred.is_volatile() || has_subquery(*pred)); // Check which non-volatile filters are supported by source let supported_filters = scan @@ -1423,6 +1427,14 @@ fn contain(e: &Expr, check_map: &HashMap) -> bool { is_contain } +fn has_subquery(expr: &Expr) -> bool { + expr.exists(|e| match e { + Expr::InSubquery(_) | Expr::Exists(_) | Expr::ScalarSubquery(_) => Ok(true), + _ => Ok(false), + }) + .unwrap() +} + #[cfg(test)] mod tests { use std::any::Any; diff --git a/datafusion/sqllogictest/test_files/subquery.slt b/datafusion/sqllogictest/test_files/subquery.slt index e73f4ec3e32da..34c2c3b1003ac 100644 --- a/datafusion/sqllogictest/test_files/subquery.slt +++ b/datafusion/sqllogictest/test_files/subquery.slt @@ -1528,3 +1528,32 @@ logical_plan 20)--------SubqueryAlias: set_cmp_s 21)----------Projection: column1 AS v 22)------------Values: (Int64(5)), (Int64(NULL)) +======= +# correlated_recursive_scalar_subquery_with_level_3_subquery_referencing_level1_relation +query TT +explain select c_custkey from customer +where c_acctbal < ( + select sum(o_totalprice) from orders + where o_custkey = c_custkey + and o_totalprice < ( + select sum(l_extendedprice) as price from lineitem where l_orderkey = o_orderkey + and l_extendedprice < c_acctbal + ) +) order by c_custkey; +---- +logical_plan +01)Sort: customer.c_custkey ASC NULLS LAST +02)--Projection: customer.c_custkey +03)----Inner Join: customer.c_custkey = __scalar_sq_1.o_custkey Filter: CAST(customer.c_acctbal AS Decimal128(25, 2)) < __scalar_sq_1.sum(orders.o_totalprice) +04)------TableScan: customer projection=[c_custkey, c_acctbal] +05)------SubqueryAlias: __scalar_sq_1 +06)--------Projection: sum(orders.o_totalprice), orders.o_custkey +07)----------Aggregate: groupBy=[[orders.o_custkey]], aggr=[[sum(orders.o_totalprice)]] +08)------------Projection: orders.o_custkey, orders.o_totalprice +09)--------------Filter: CAST(orders.o_totalprice AS Decimal128(25, 2)) < () +10)----------------Subquery: +11)------------------Projection: sum(lineitem.l_extendedprice) AS price +12)--------------------Aggregate: groupBy=[[]], aggr=[[sum(lineitem.l_extendedprice)]] +13)----------------------Filter: lineitem.l_orderkey = outer_ref(orders.o_orderkey) AND lineitem.l_extendedprice < outer_ref(customer.c_acctbal) +14)------------------------TableScan: lineitem, partial_filters=[lineitem.l_orderkey = outer_ref(orders.o_orderkey), lineitem.l_extendedprice < outer_ref(customer.c_acctbal)] +15)----------------TableScan: orders projection=[o_orderkey, o_custkey, o_totalprice] From 1f5e134ea6541af0e73fbccc0a2162df932d822f Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Sun, 25 May 2025 17:20:20 +0200 Subject: [PATCH 03/15] chore: clippy --- datafusion/optimizer/src/push_down_filter.rs | 2 +- datafusion/sql/src/expr/identifier.rs | 2 +- datafusion/sql/src/planner.rs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index ef691db186e55..5621b76080897 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -1135,7 +1135,7 @@ impl OptimizerRule for PushDownFilter { // (i.e in the case of recursive subquery) // this function may accidentally pushdown the subquery expr as well // until then, we have to exclude these exprs here - .partition(|pred| pred.is_volatile() || has_subquery(*pred)); + .partition(|pred| pred.is_volatile() || has_subquery(pred)); // Check which non-volatile filters are supported by source let supported_filters = scan diff --git a/datafusion/sql/src/expr/identifier.rs b/datafusion/sql/src/expr/identifier.rs index 221fd517a2418..2a8e2a9a77579 100644 --- a/datafusion/sql/src/expr/identifier.rs +++ b/datafusion/sql/src/expr/identifier.rs @@ -175,7 +175,7 @@ impl SqlToRel<'_, S> { // Check the outer_query_schema and try to find a match let outer_schemas = planner_context.outer_query_schema(); let mut maybe_result = None; - if outer_schemas.len() > 0 { + if !outer_schemas.is_empty() { for outer in planner_context.outer_query_schema() { let search_result = search_dfschema(&ids, outer); let result = match search_result { diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index 0de96c15a2555..121ed9655912a 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -312,7 +312,7 @@ impl PlannerContext { } pub fn latest_outer_query_schema(&mut self) -> Option { - self.outer_query_schema.last().clone().cloned() + self.outer_query_schema.last().cloned() } /// Sets the outer query schema, returning the existing one, if From 293f58c717a43887ebb31af7590e1f5cdcf7ea5f Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Sun, 25 May 2025 17:23:33 +0200 Subject: [PATCH 04/15] chore: rm debug details --- .../expr/src/logical_plan/invariants.rs | 7 +-- datafusion/sqllogictest/test_files/debug.slt | 52 ------------------- 2 files changed, 2 insertions(+), 57 deletions(-) delete mode 100644 datafusion/sqllogictest/test_files/debug.slt diff --git a/datafusion/expr/src/logical_plan/invariants.rs b/datafusion/expr/src/logical_plan/invariants.rs index 3bb76706563a1..42a2993fbea37 100644 --- a/datafusion/expr/src/logical_plan/invariants.rs +++ b/datafusion/expr/src/logical_plan/invariants.rs @@ -214,12 +214,9 @@ pub fn check_subquery_expr( Ok(()) } } - any => { - println!("here {any}"); - plan_err!( - "Correlated scalar subquery can only be used in Projection, Filter, Aggregate plan nodes123 {any}" + _ => plan_err!( + "Correlated scalar subquery can only be used in Projection, Filter, Aggregate plan" ) - } }?; } check_correlations_in_subquery(inner_plan) diff --git a/datafusion/sqllogictest/test_files/debug.slt b/datafusion/sqllogictest/test_files/debug.slt deleted file mode 100644 index 48fd16bc0fd97..0000000000000 --- a/datafusion/sqllogictest/test_files/debug.slt +++ /dev/null @@ -1,52 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at - -# http://www.apache.org/licenses/LICENSE-2.0 - -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -# make sure to a batch size smaller than row number of the table. -statement ok -set datafusion.execution.batch_size = 2; - -statement ok -CREATE TABLE employees ( - employee_id INTEGER, - employee_name VARCHAR, - dept_id INTEGER, - salary DECIMAL -); - -statement ok -CREATE TABLE project_assignments ( - project_id INTEGER, - employee_id INTEGER, - priority INTEGER -); - - - -query TT -explain SELECT e1.employee_name, e1.salary -FROM employees e1 -WHERE e1.salary > ( - SELECT AVG(e2.salary) - FROM employees e2 - WHERE e2.dept_id = e1.dept_id - AND e2.salary > ( - SELECT AVG(e3.salary) - FROM employees e3 - WHERE e3.dept_id = e1.dept_id - ) -); ----- \ No newline at end of file From aebbd275dd950b3ae8e430129ba3bf40e854c1c1 Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Sun, 25 May 2025 18:50:33 +0200 Subject: [PATCH 05/15] fix: breaking changes --- .../expr/src/logical_plan/invariants.rs | 8 +++-- datafusion/optimizer/src/push_down_filter.rs | 8 +++-- datafusion/sql/src/expr/identifier.rs | 6 ++-- datafusion/sql/src/planner.rs | 35 ++++++++++++++----- datafusion/sql/src/select.rs | 2 +- 5 files changed, 40 insertions(+), 19 deletions(-) diff --git a/datafusion/expr/src/logical_plan/invariants.rs b/datafusion/expr/src/logical_plan/invariants.rs index 42a2993fbea37..0889afd08fee4 100644 --- a/datafusion/expr/src/logical_plan/invariants.rs +++ b/datafusion/expr/src/logical_plan/invariants.rs @@ -208,15 +208,17 @@ pub fn check_subquery_expr( if group_expr.contains(expr) && !aggr_expr.contains(expr) { // TODO revisit this validation logic plan_err!( - "Correlated scalar subquery in the GROUP BY clause must also be in the aggregate expressions" + "Correlated scalar subquery in the GROUP BY clause must \ + also be in the aggregate expressions" ) } else { Ok(()) } } _ => plan_err!( - "Correlated scalar subquery can only be used in Projection, Filter, Aggregate plan" - ) + "Correlated scalar subquery can only be used in Projection, \ + Filter, Aggregate plan nodes" + ), }?; } check_correlations_in_subquery(inner_plan) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 5621b76080897..d0e2355a1d9f2 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -1135,7 +1135,9 @@ impl OptimizerRule for PushDownFilter { // (i.e in the case of recursive subquery) // this function may accidentally pushdown the subquery expr as well // until then, we have to exclude these exprs here - .partition(|pred| pred.is_volatile() || has_subquery(pred)); + .partition(|pred| { + pred.is_volatile() || has_scalar_subquery(pred) + }); // Check which non-volatile filters are supported by source let supported_filters = scan @@ -1427,9 +1429,9 @@ fn contain(e: &Expr, check_map: &HashMap) -> bool { is_contain } -fn has_subquery(expr: &Expr) -> bool { +fn has_scalar_subquery(expr: &Expr) -> bool { expr.exists(|e| match e { - Expr::InSubquery(_) | Expr::Exists(_) | Expr::ScalarSubquery(_) => Ok(true), + Expr::ScalarSubquery(_) => Ok(true), _ => Ok(false), }) .unwrap() diff --git a/datafusion/sql/src/expr/identifier.rs b/datafusion/sql/src/expr/identifier.rs index 2a8e2a9a77579..547e26c55f4a1 100644 --- a/datafusion/sql/src/expr/identifier.rs +++ b/datafusion/sql/src/expr/identifier.rs @@ -76,7 +76,7 @@ impl SqlToRel<'_, S> { } // Check the outer query schema - for outer in planner_context.outer_query_schema() { + for outer in planner_context.outer_queries_schemas() { if let Ok((qualifier, field)) = outer.qualified_field_with_unqualified_name(normalize_ident.as_str()) { @@ -173,10 +173,10 @@ impl SqlToRel<'_, S> { not_impl_err!("compound identifier: {ids:?}") } else { // Check the outer_query_schema and try to find a match - let outer_schemas = planner_context.outer_query_schema(); + let outer_schemas = planner_context.outer_queries_schemas(); let mut maybe_result = None; if !outer_schemas.is_empty() { - for outer in planner_context.outer_query_schema() { + for outer in planner_context.outer_queries_schemas() { let search_result = search_dfschema(&ids, outer); let result = match search_result { // Found matching field with spare identifier(s) for nested field(s) in structure diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index 121ed9655912a..d769d81f57c72 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -261,8 +261,16 @@ pub struct PlannerContext { /// Map of CTE name to logical plan of the WITH clause. /// Use `Arc` to allow cheap cloning ctes: HashMap>, + + /// The queries schemas of outer query relations, used to resolve the outer referenced + /// columns in subquery (recursive aware) + outer_queries_schemas_stack: Vec, + /// The query schema of the outer query plan, used to resolve the columns in subquery - outer_query_schema: Vec, + /// This field is maintained to support deprecated functions + /// `outer_query_schema` and `set_outer_query_schema` + /// which is only aware of the adjacent outer relation + outer_query_schema: Option, /// The joined schemas of all FROM clauses planned so far. When planning LATERAL /// FROM clauses, this should become a suffix of the `outer_query_schema`. outer_from_schema: Option, @@ -282,7 +290,8 @@ impl PlannerContext { Self { prepare_param_data_types: Arc::new(vec![]), ctes: HashMap::new(), - outer_query_schema: vec![], + outer_query_schema: None, + outer_queries_schemas_stack: vec![], outer_from_schema: None, create_table_schema: None, } @@ -298,8 +307,16 @@ impl PlannerContext { } // Return a reference to the outer query's schema - pub fn outer_query_schema(&self) -> Vec<&DFSchema> { - self.outer_query_schema + // This function is only compatible with + #[deprecated(note = "Use outer_queries_schemas instead")] + pub fn outer_query_schema(&self) -> Option<&DFSchema> { + self.outer_query_schema.as_ref().map(|s| s.as_ref()) + } + + /// Return the stack of outer relations' schemas, the outer most + /// relation are at the first entry + pub fn outer_queries_schemas(&self) -> Vec<&DFSchema> { + self.outer_queries_schemas_stack .iter() .map(|sc| sc.as_ref()) .collect() @@ -308,17 +325,17 @@ impl PlannerContext { /// Sets the outer query schema, returning the existing one, if /// any pub fn set_outer_query_schema(&mut self, schema: DFSchemaRef) { - self.outer_query_schema.push(schema); + self.outer_queries_schemas_stack.push(schema); } + /// The schema of the adjacent outer relation pub fn latest_outer_query_schema(&mut self) -> Option { - self.outer_query_schema.last().cloned() + self.outer_queries_schemas_stack.last().cloned() } - /// Sets the outer query schema, returning the existing one, if - /// any + /// Remove the schema of the adjacent outer relation pub fn pop_outer_query_schema(&mut self) -> Option { - self.outer_query_schema.pop() + self.outer_queries_schemas_stack.pop() } pub fn set_table_schema( diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index 0803733a048bc..624c04f892815 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -640,7 +640,7 @@ impl SqlToRel<'_, S> { let filter_expr = self.sql_to_expr(predicate_expr, plan.schema(), planner_context)?; - let outer_query_schema_vec = planner_context.outer_query_schema(); + let outer_query_schema_vec = planner_context.outer_queries_schemas(); // Check for aggregation functions let aggregate_exprs = From 5f533bb35af4623f44eec97e27af32d5b1763316 Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Sun, 25 May 2025 22:38:51 +0200 Subject: [PATCH 06/15] fix: lateral join losing its outer ref columns --- datafusion/sql/src/expr/identifier.rs | 2 +- datafusion/sql/src/planner.rs | 7 ++----- datafusion/sql/src/relation/mod.rs | 8 ++++++-- datafusion/sql/src/select.rs | 16 +++++++++++++--- 4 files changed, 22 insertions(+), 11 deletions(-) diff --git a/datafusion/sql/src/expr/identifier.rs b/datafusion/sql/src/expr/identifier.rs index 547e26c55f4a1..ace8170241e66 100644 --- a/datafusion/sql/src/expr/identifier.rs +++ b/datafusion/sql/src/expr/identifier.rs @@ -177,7 +177,7 @@ impl SqlToRel<'_, S> { let mut maybe_result = None; if !outer_schemas.is_empty() { for outer in planner_context.outer_queries_schemas() { - let search_result = search_dfschema(&ids, outer); + let search_result = search_dfschema(&ids, &outer); let result = match search_result { // Found matching field with spare identifier(s) for nested field(s) in structure Some((field, qualifier, nested_names)) diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index d769d81f57c72..0368f215e627d 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -315,11 +315,8 @@ impl PlannerContext { /// Return the stack of outer relations' schemas, the outer most /// relation are at the first entry - pub fn outer_queries_schemas(&self) -> Vec<&DFSchema> { - self.outer_queries_schemas_stack - .iter() - .map(|sc| sc.as_ref()) - .collect() + pub fn outer_queries_schemas(&self) -> Vec { + self.outer_queries_schemas_stack.to_vec() } /// Sets the outer query schema, returning the existing one, if diff --git a/datafusion/sql/src/relation/mod.rs b/datafusion/sql/src/relation/mod.rs index d11ddb435c702..470e099dda3b3 100644 --- a/datafusion/sql/src/relation/mod.rs +++ b/datafusion/sql/src/relation/mod.rs @@ -309,8 +309,9 @@ impl SqlToRel<'_, S> { let old_from_schema = planner_context .set_outer_from_schema(None) .unwrap_or_else(|| Arc::new(DFSchema::empty())); - let new_query_schema = match planner_context.pop_outer_query_schema() { - Some(old_query_schema) => { + let outer_query_schema = planner_context.pop_outer_query_schema(); + let new_query_schema = match outer_query_schema { + Some(ref old_query_schema) => { let mut new_query_schema = old_from_schema.as_ref().clone(); new_query_schema.merge(old_query_schema.as_ref()); Arc::new(new_query_schema) @@ -323,6 +324,9 @@ impl SqlToRel<'_, S> { let outer_ref_columns = plan.all_out_ref_exprs(); planner_context.pop_outer_query_schema(); + if let Some(schema) = outer_query_schema { + planner_context.set_outer_query_schema(schema); + } planner_context.set_outer_from_schema(Some(old_from_schema)); // We can omit the subquery wrapper if there are no columns diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index 624c04f892815..4e0946bdd4e4e 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -29,7 +29,7 @@ use crate::utils::{ use datafusion_common::error::DataFusionErrorBuilder; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; -use datafusion_common::{Column, Result, not_impl_err, plan_err}; +use datafusion_common::{DFSchema, Column, Result, not_impl_err, plan_err}; use datafusion_common::{RecursionUnnestOption, UnnestOptions}; use datafusion_expr::expr::{Alias, PlannedReplaceSelectItem, WildcardOptions}; use datafusion_expr::expr_rewriter::{ @@ -638,9 +638,9 @@ impl SqlToRel<'_, S> { Some(predicate_expr) => { let fallback_schemas = plan.fallback_normalize_schemas(); + let outer_query_schema_vec = planner_context.outer_queries_schemas(); let filter_expr = self.sql_to_expr(predicate_expr, plan.schema(), planner_context)?; - let outer_query_schema_vec = planner_context.outer_queries_schemas(); // Check for aggregation functions let aggregate_exprs = @@ -653,9 +653,19 @@ impl SqlToRel<'_, S> { let mut using_columns = HashSet::new(); expr_to_columns(&filter_expr, &mut using_columns)?; + let mut schema_stack: Vec> = + vec![vec![plan.schema()], fallback_schemas]; + for sc in outer_query_schema_vec.iter().rev() { + schema_stack.push(vec![sc.as_ref()]); + } + let filter_expr = normalize_col_with_schemas_and_ambiguity_check( filter_expr, - &[&[plan.schema()], &fallback_schemas, &outer_query_schema_vec], + schema_stack + .iter() + .map(|sc| sc.as_slice()) + .collect::>() + .as_slice(), &[using_columns], )?; From 69de77aa6378b5e5cdc94ad1c7ba5c4902876bfc Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Sun, 25 May 2025 22:56:11 +0200 Subject: [PATCH 07/15] test: more test case for other decorrelation --- datafusion/sqllogictest/test_files/joins.slt | 24 ++++++++ .../sqllogictest/test_files/subquery.slt | 57 ++++++++++++++++++- 2 files changed, 79 insertions(+), 2 deletions(-) diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index ae87fd11d397c..816ab13419762 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -4646,6 +4646,30 @@ logical_plan 08)----------TableScan: j3 projection=[j3_string, j3_id] physical_plan_error This feature is not implemented: Physical plan does not support logical expression OuterReferenceColumn(Field { name: "j1_id", data_type: Int32, nullable: true }, Column { relation: Some(Bare { table: "j1" }), name: "j1_id" }) +# 2 nested lateral join with the deepest join referencing the outer most relation +query TT +explain SELECT * FROM j1 j1_outer, LATERAL ( + SELECT * FROM j1 j1_inner, LATERAL ( + SELECT * FROM j2 WHERE j1_inner.j1_id = j2_id and j1_outer.j1_id=j2_id + ) as j2 +) as j2; +---- +logical_plan +01)Cross Join: +02)--SubqueryAlias: j1_outer +03)----TableScan: j1 projection=[j1_string, j1_id] +04)--SubqueryAlias: j2 +05)----Subquery: +06)------Cross Join: +07)--------SubqueryAlias: j1_inner +08)----------TableScan: j1 projection=[j1_string, j1_id] +09)--------SubqueryAlias: j2 +10)----------Subquery: +11)------------Filter: outer_ref(j1_inner.j1_id) = j2.j2_id AND outer_ref(j1_outer.j1_id) = j2.j2_id +12)--------------TableScan: j2 projection=[j2_string, j2_id] +physical_plan_error This feature is not implemented: Physical plan does not support logical expression OuterReferenceColumn(Int32, Column { relation: Some(Bare { table: "j1_inner" }), name: "j1_id" }) + + query TT explain SELECT * FROM j1, LATERAL (SELECT 1) AS j2; ---- diff --git a/datafusion/sqllogictest/test_files/subquery.slt b/datafusion/sqllogictest/test_files/subquery.slt index 34c2c3b1003ac..8fbea2d48c908 100644 --- a/datafusion/sqllogictest/test_files/subquery.slt +++ b/datafusion/sqllogictest/test_files/subquery.slt @@ -1528,8 +1528,8 @@ logical_plan 20)--------SubqueryAlias: set_cmp_s 21)----------Projection: column1 AS v 22)------------Values: (Int64(5)), (Int64(NULL)) -======= -# correlated_recursive_scalar_subquery_with_level_3_subquery_referencing_level1_relation + +# correlated_recursive_scalar_subquery_with_level_3_scalar_subquery_referencing_level1_relation query TT explain select c_custkey from customer where c_acctbal < ( @@ -1557,3 +1557,56 @@ logical_plan 13)----------------------Filter: lineitem.l_orderkey = outer_ref(orders.o_orderkey) AND lineitem.l_extendedprice < outer_ref(customer.c_acctbal) 14)------------------------TableScan: lineitem, partial_filters=[lineitem.l_orderkey = outer_ref(orders.o_orderkey), lineitem.l_extendedprice < outer_ref(customer.c_acctbal)] 15)----------------TableScan: orders projection=[o_orderkey, o_custkey, o_totalprice] + +# correlated_recursive_scalar_subquery_with_level_3_exists_subquery_referencing_level1_relation +query TT +explain select c_custkey from customer +where c_acctbal < ( + select sum(o_totalprice) from orders + where o_custkey = c_custkey + and exists ( + select * from lineitem where l_orderkey = o_orderkey + and l_extendedprice < c_acctbal + ) +) order by c_custkey; +---- +logical_plan +01)Sort: customer.c_custkey ASC NULLS LAST +02)--Projection: customer.c_custkey +03)----Inner Join: customer.c_custkey = __scalar_sq_2.o_custkey Filter: CAST(customer.c_acctbal AS Decimal128(25, 2)) < __scalar_sq_2.sum(orders.o_totalprice) +04)------TableScan: customer projection=[c_custkey, c_acctbal] +05)------SubqueryAlias: __scalar_sq_2 +06)--------Projection: sum(orders.o_totalprice), orders.o_custkey +07)----------Aggregate: groupBy=[[orders.o_custkey]], aggr=[[sum(orders.o_totalprice)]] +08)------------Projection: orders.o_custkey, orders.o_totalprice +09)--------------LeftSemi Join: orders.o_orderkey = __correlated_sq_1.l_orderkey Filter: __correlated_sq_1.l_extendedprice < customer.c_acctbal +10)----------------TableScan: orders projection=[o_orderkey, o_custkey, o_totalprice] +11)----------------SubqueryAlias: __correlated_sq_1 +12)------------------TableScan: lineitem projection=[l_orderkey, l_extendedprice] + +# correlated_recursive_scalar_subquery_with_level_3_in_subquery_referencing_level1_relation +query TT +explain select c_custkey from customer +where c_acctbal < ( + select sum(o_totalprice) from orders + where o_custkey = c_custkey + and o_totalprice in ( + select l_extendedprice as price from lineitem where l_orderkey = o_orderkey + and l_extendedprice < c_acctbal + ) +) order by c_custkey; +---- +logical_plan +01)Sort: customer.c_custkey ASC NULLS LAST +02)--Projection: customer.c_custkey +03)----Inner Join: customer.c_custkey = __scalar_sq_2.o_custkey Filter: CAST(customer.c_acctbal AS Decimal128(25, 2)) < __scalar_sq_2.sum(orders.o_totalprice) +04)------TableScan: customer projection=[c_custkey, c_acctbal] +05)------SubqueryAlias: __scalar_sq_2 +06)--------Projection: sum(orders.o_totalprice), orders.o_custkey +07)----------Aggregate: groupBy=[[orders.o_custkey]], aggr=[[sum(orders.o_totalprice)]] +08)------------Projection: orders.o_custkey, orders.o_totalprice +09)--------------LeftSemi Join: orders.o_totalprice = __correlated_sq_1.price, orders.o_orderkey = __correlated_sq_1.l_orderkey Filter: __correlated_sq_1.l_extendedprice < customer.c_acctbal +10)----------------TableScan: orders projection=[o_orderkey, o_custkey, o_totalprice] +11)----------------SubqueryAlias: __correlated_sq_1 +12)------------------Projection: lineitem.l_extendedprice AS price, lineitem.l_extendedprice, lineitem.l_orderkey +13)--------------------TableScan: lineitem projection=[l_orderkey, l_extendedprice] \ No newline at end of file From a5a03c8dd7ad806539c06035c7110205e1e7884d Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Mon, 26 May 2025 06:35:10 +0200 Subject: [PATCH 08/15] doc: better comments --- datafusion/sql/src/expr/subquery.rs | 8 ++++---- datafusion/sql/src/planner.rs | 23 ++++++++++++++++++++--- datafusion/sql/src/relation/mod.rs | 4 ++-- 3 files changed, 26 insertions(+), 9 deletions(-) diff --git a/datafusion/sql/src/expr/subquery.rs b/datafusion/sql/src/expr/subquery.rs index 007de5c581464..662c44f6f2620 100644 --- a/datafusion/sql/src/expr/subquery.rs +++ b/datafusion/sql/src/expr/subquery.rs @@ -31,7 +31,7 @@ impl SqlToRel<'_, S> { input_schema: &DFSchema, planner_context: &mut PlannerContext, ) -> Result { - planner_context.set_outer_query_schema(input_schema.clone().into()); + planner_context.append_outer_query_schema(input_schema.clone().into()); let sub_plan = self.query_to_plan(subquery, planner_context)?; let outer_ref_columns = sub_plan.all_out_ref_exprs(); planner_context.pop_outer_query_schema(); @@ -53,7 +53,7 @@ impl SqlToRel<'_, S> { input_schema: &DFSchema, planner_context: &mut PlannerContext, ) -> Result { - planner_context.set_outer_query_schema(input_schema.clone().into()); + planner_context.append_outer_query_schema(Arc::new(input_schema.clone())); let mut spans = Spans::new(); if let SetExpr::Select(select) = &subquery.body.as_ref() { @@ -96,7 +96,7 @@ impl SqlToRel<'_, S> { input_schema: &DFSchema, planner_context: &mut PlannerContext, ) -> Result { - planner_context.set_outer_query_schema(input_schema.clone().into()); + planner_context.append_outer_query_schema(Arc::new(input_schema.clone())); let mut spans = Spans::new(); if let SetExpr::Select(select) = subquery.body.as_ref() { for item in &select.projection { @@ -169,7 +169,7 @@ impl SqlToRel<'_, S> { input_schema: &DFSchema, planner_context: &mut PlannerContext, ) -> Result { - planner_context.set_outer_query_schema(input_schema.clone().into()); + planner_context.append_outer_query_schema(Arc::new(input_schema.clone())); let mut spans = Spans::new(); if let SetExpr::Select(select) = subquery.body.as_ref() { diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index 0368f215e627d..6191c9963560f 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -306,13 +306,30 @@ impl PlannerContext { self } - // Return a reference to the outer query's schema - // This function is only compatible with + /// Return a reference to the outer query's schema + /// This function should not be used together with + /// `outer_queries_schemas`, `append_outer_query_schema` + /// `latest_outer_query_schema` and `pop_outer_query_schema` #[deprecated(note = "Use outer_queries_schemas instead")] pub fn outer_query_schema(&self) -> Option<&DFSchema> { self.outer_query_schema.as_ref().map(|s| s.as_ref()) } + /// Sets the outer query schema, returning the existing one, if + /// any, this function should not be used together with + /// `outer_queries_schemas`, `append_outer_query_schema` + /// `latest_outer_query_schema` and `pop_outer_query_schema` + #[deprecated( + note = "This struct is now aware of a stack of schemas, check pop_outer_query_schema" + )] + pub fn set_outer_query_schema( + &mut self, + mut schema: Option, + ) -> Option { + std::mem::swap(&mut self.outer_query_schema, &mut schema); + schema + } + /// Return the stack of outer relations' schemas, the outer most /// relation are at the first entry pub fn outer_queries_schemas(&self) -> Vec { @@ -321,7 +338,7 @@ impl PlannerContext { /// Sets the outer query schema, returning the existing one, if /// any - pub fn set_outer_query_schema(&mut self, schema: DFSchemaRef) { + pub fn append_outer_query_schema(&mut self, schema: DFSchemaRef) { self.outer_queries_schemas_stack.push(schema); } diff --git a/datafusion/sql/src/relation/mod.rs b/datafusion/sql/src/relation/mod.rs index 470e099dda3b3..3372416d39726 100644 --- a/datafusion/sql/src/relation/mod.rs +++ b/datafusion/sql/src/relation/mod.rs @@ -318,14 +318,14 @@ impl SqlToRel<'_, S> { } None => Arc::clone(&old_from_schema), }; - planner_context.set_outer_query_schema(new_query_schema); + planner_context.append_outer_query_schema(new_query_schema); let plan = self.create_relation(subquery, planner_context)?; let outer_ref_columns = plan.all_out_ref_exprs(); planner_context.pop_outer_query_schema(); if let Some(schema) = outer_query_schema { - planner_context.set_outer_query_schema(schema); + planner_context.append_outer_query_schema(schema); } planner_context.set_outer_from_schema(Some(old_from_schema)); From a66e85dc9324ebb9eb82cc8d926f77cb75dd6d5d Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Sat, 22 Nov 2025 15:24:40 +0100 Subject: [PATCH 09/15] fix: test --- datafusion/sql/src/relation/mod.rs | 6 ++++-- datafusion/sqllogictest/test_files/joins.slt | 3 +-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/datafusion/sql/src/relation/mod.rs b/datafusion/sql/src/relation/mod.rs index 3372416d39726..d978d1737debd 100644 --- a/datafusion/sql/src/relation/mod.rs +++ b/datafusion/sql/src/relation/mod.rs @@ -262,8 +262,10 @@ impl SqlToRel<'_, S> { } => { let tbl_func_ref = self.object_name_to_table_reference(name)?; let schema = planner_context - .latest_outer_query_schema() - .unwrap_or(Arc::new(DFSchema::empty())); + .outer_queries_schemas() + .last() + .cloned() + .unwrap_or_else(|| Arc::new(DFSchema::empty())); let func_args = args .into_iter() .map(|arg| match arg { diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index 816ab13419762..7d2ac3c15710e 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -4667,8 +4667,7 @@ logical_plan 10)----------Subquery: 11)------------Filter: outer_ref(j1_inner.j1_id) = j2.j2_id AND outer_ref(j1_outer.j1_id) = j2.j2_id 12)--------------TableScan: j2 projection=[j2_string, j2_id] -physical_plan_error This feature is not implemented: Physical plan does not support logical expression OuterReferenceColumn(Int32, Column { relation: Some(Bare { table: "j1_inner" }), name: "j1_id" }) - +physical_plan_error This feature is not implemented: Physical plan does not support logical expression OuterReferenceColumn(Field { name: "j1_id", data_type: Int32, nullable: true }, Column { relation: Some(Bare { table: "j1_inner" }), name: "j1_id" }) query TT explain SELECT * FROM j1, LATERAL (SELECT 1) AS j2; From dd584010fef78119e13cbe509f6dc40a0bec9e29 Mon Sep 17 00:00:00 2001 From: Michael Kleen Date: Wed, 21 Jan 2026 13:10:16 +0100 Subject: [PATCH 10/15] Remove outer_query_schema from planner --- datafusion/sql/src/planner.rs | 30 ------------------------------ 1 file changed, 30 deletions(-) diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index 6191c9963560f..6a3a2cc4e5b5a 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -266,11 +266,6 @@ pub struct PlannerContext { /// columns in subquery (recursive aware) outer_queries_schemas_stack: Vec, - /// The query schema of the outer query plan, used to resolve the columns in subquery - /// This field is maintained to support deprecated functions - /// `outer_query_schema` and `set_outer_query_schema` - /// which is only aware of the adjacent outer relation - outer_query_schema: Option, /// The joined schemas of all FROM clauses planned so far. When planning LATERAL /// FROM clauses, this should become a suffix of the `outer_query_schema`. outer_from_schema: Option, @@ -290,7 +285,6 @@ impl PlannerContext { Self { prepare_param_data_types: Arc::new(vec![]), ctes: HashMap::new(), - outer_query_schema: None, outer_queries_schemas_stack: vec![], outer_from_schema: None, create_table_schema: None, @@ -306,30 +300,6 @@ impl PlannerContext { self } - /// Return a reference to the outer query's schema - /// This function should not be used together with - /// `outer_queries_schemas`, `append_outer_query_schema` - /// `latest_outer_query_schema` and `pop_outer_query_schema` - #[deprecated(note = "Use outer_queries_schemas instead")] - pub fn outer_query_schema(&self) -> Option<&DFSchema> { - self.outer_query_schema.as_ref().map(|s| s.as_ref()) - } - - /// Sets the outer query schema, returning the existing one, if - /// any, this function should not be used together with - /// `outer_queries_schemas`, `append_outer_query_schema` - /// `latest_outer_query_schema` and `pop_outer_query_schema` - #[deprecated( - note = "This struct is now aware of a stack of schemas, check pop_outer_query_schema" - )] - pub fn set_outer_query_schema( - &mut self, - mut schema: Option, - ) -> Option { - std::mem::swap(&mut self.outer_query_schema, &mut schema); - schema - } - /// Return the stack of outer relations' schemas, the outer most /// relation are at the first entry pub fn outer_queries_schemas(&self) -> Vec { From 95e6d51b51fdcfe66133669bad9c4cfdc179fa37 Mon Sep 17 00:00:00 2001 From: Michael Kleen Date: Wed, 21 Jan 2026 10:20:50 +0100 Subject: [PATCH 11/15] Add new tests --- datafusion/sql/tests/common/mod.rs | 17 ++- datafusion/sql/tests/sql_integration.rs | 131 ++++++++++++++++++++++++ 2 files changed, 147 insertions(+), 1 deletion(-) diff --git a/datafusion/sql/tests/common/mod.rs b/datafusion/sql/tests/common/mod.rs index 9dc6b895e49ab..0972fdd191b48 100644 --- a/datafusion/sql/tests/common/mod.rs +++ b/datafusion/sql/tests/common/mod.rs @@ -160,13 +160,26 @@ impl ContextProvider for MockContextProvider { Field::new("last_name", DataType::Utf8, false), ])), "orders" => Ok(Schema::new(vec![ - Field::new("order_id", DataType::UInt32, false), + Field::new("o_orderkey", DataType::UInt32, false), + Field::new("o_custkey", DataType::UInt32, false), + Field::new("o_orderstatus", DataType::Utf8, false), Field::new("customer_id", DataType::UInt32, false), + Field::new("o_totalprice", DataType::Decimal32(15, 2), false), Field::new("o_item_id", DataType::Utf8, false), Field::new("qty", DataType::Int32, false), Field::new("price", DataType::Float64, false), Field::new("delivered", DataType::Boolean, false), ])), + "customer" => Ok(Schema::new(vec![ + Field::new("c_custkey", DataType::UInt32, false), + Field::new("c_name", DataType::Utf8, false), + Field::new("c_address", DataType::Utf8, false), + Field::new("c_nationkey", DataType::UInt32, false), + Field::new("c_phone", DataType::Decimal32(15, 2), false), + Field::new("c_acctbal", DataType::Float64, false), + Field::new("c_mktsegment", DataType::Utf8, false), + Field::new("c_comment", DataType::Utf8, false), + ])), "array" => Ok(Schema::new(vec![ Field::new( "left", @@ -186,8 +199,10 @@ impl ContextProvider for MockContextProvider { ), ])), "lineitem" => Ok(Schema::new(vec![ + Field::new("l_orderkey", DataType::UInt32, false), Field::new("l_item_id", DataType::UInt32, false), Field::new("l_description", DataType::Utf8, false), + Field::new("l_extendedprice", DataType::Decimal32(15, 2), false), Field::new("price", DataType::Float64, false), ])), "aggregate_test_100" => Ok(Schema::new(vec![ diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index 491873b4afe02..ca4b760726b14 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -4855,3 +4855,134 @@ fn test_using_join_wildcard_schema() { ] ); } + +#[test] +fn test_2_nested_lateral_join_with_the_deepest_join_referencing_the_outer_most_relation() { + let sql = "SELECT * FROM j1 j1_outer, LATERAL ( + SELECT * FROM j1 j1_inner, LATERAL ( + SELECT * FROM j2 WHERE j1_inner.j1_id = j2_id and j1_outer.j1_id=j2_id + ) as j2 +) as j2"; + + let plan = logical_plan(sql).unwrap(); + assert_snapshot!( + plan, + @r#" +Projection: j1_outer.j1_id, j1_outer.j1_string, j2.j1_id, j2.j1_string, j2.j2_id, j2.j2_string + Cross Join: + SubqueryAlias: j1_outer + TableScan: j1 + SubqueryAlias: j2 + Subquery: + Projection: j1_inner.j1_id, j1_inner.j1_string, j2.j2_id, j2.j2_string + Cross Join: + SubqueryAlias: j1_inner + TableScan: j1 + SubqueryAlias: j2 + Subquery: + Projection: j2.j2_id, j2.j2_string + Filter: outer_ref(j1_inner.j1_id) = j2.j2_id AND outer_ref(j1_outer.j1_id) = j2.j2_id + TableScan: j2 +"# +); +} + +#[test] +fn test_correlated_recursive_scalar_subquery_with_level_3_scalar_subquery_referencing_level1_relation() { + let sql = "select c_custkey from customer + where c_acctbal < ( + select sum(o_totalprice) from orders + where o_custkey = c_custkey + and o_totalprice < ( + select sum(l_extendedprice) as price from lineitem where l_orderkey = o_orderkey + and l_extendedprice < c_acctbal + ) + ) order by c_custkey"; + + let plan = logical_plan(sql).unwrap(); + assert_snapshot!( + plan, + @r#" +Sort: customer.c_custkey ASC NULLS LAST + Projection: customer.c_custkey + Filter: customer.c_acctbal < () + Subquery: + Projection: sum(orders.o_totalprice) + Aggregate: groupBy=[[]], aggr=[[sum(orders.o_totalprice)]] + Filter: orders.o_custkey = outer_ref(customer.c_custkey) AND orders.o_totalprice < () + Subquery: + Projection: sum(lineitem.l_extendedprice) AS price + Aggregate: groupBy=[[]], aggr=[[sum(lineitem.l_extendedprice)]] + Filter: lineitem.l_orderkey = outer_ref(orders.o_orderkey) AND lineitem.l_extendedprice < outer_ref(customer.c_acctbal) + TableScan: lineitem + TableScan: orders + TableScan: customer +"# +); +} + +#[test] +fn correlated_recursive_scalar_subquery_with_level_3_exists_subquery_referencing_level1_relation() { + let sql = "select c_custkey from customer + where c_acctbal < ( + select sum(o_totalprice) from orders + where o_custkey = c_custkey + and exists ( + select * from lineitem where l_orderkey = o_orderkey + and l_extendedprice < c_acctbal + ) + ) order by c_custkey"; + + let plan = logical_plan(sql).unwrap(); + assert_snapshot!( + plan, + @r#" +Sort: customer.c_custkey ASC NULLS LAST + Projection: customer.c_custkey + Filter: customer.c_acctbal < () + Subquery: + Projection: sum(orders.o_totalprice) + Aggregate: groupBy=[[]], aggr=[[sum(orders.o_totalprice)]] + Filter: orders.o_custkey = outer_ref(customer.c_custkey) AND EXISTS () + Subquery: + Projection: lineitem.l_orderkey, lineitem.l_item_id, lineitem.l_description, lineitem.l_extendedprice, lineitem.price + Filter: lineitem.l_orderkey = outer_ref(orders.o_orderkey) AND lineitem.l_extendedprice < outer_ref(customer.c_acctbal) + TableScan: lineitem + TableScan: orders + TableScan: customer +"# +); +} + +#[test] +fn correlated_recursive_scalar_subquery_with_level_3_in_subquery_referencing_level1_relation() { + let sql = "select c_custkey from customer + where c_acctbal < ( + select sum(o_totalprice) from orders + where o_custkey = c_custkey + and o_totalprice in ( + select l_extendedprice as price from lineitem where l_orderkey = o_orderkey + and l_extendedprice < c_acctbal + ) + ) order by c_custkey"; + + let plan = logical_plan(sql).unwrap(); + assert_snapshot!( + plan, + @r#" +Sort: customer.c_custkey ASC NULLS LAST + Projection: customer.c_custkey + Filter: customer.c_acctbal < () + Subquery: + Projection: sum(orders.o_totalprice) + Aggregate: groupBy=[[]], aggr=[[sum(orders.o_totalprice)]] + Filter: orders.o_custkey = outer_ref(customer.c_custkey) AND orders.o_totalprice IN () + Subquery: + Projection: lineitem.l_extendedprice AS price + Filter: lineitem.l_orderkey = outer_ref(orders.o_orderkey) AND lineitem.l_extendedprice < outer_ref(customer.c_acctbal) + TableScan: lineitem + TableScan: orders + TableScan: customer +"# +); +} \ No newline at end of file From 1bad56d87d527b889731e72844a1340d590a0c06 Mon Sep 17 00:00:00 2001 From: Michael Kleen Date: Wed, 21 Jan 2026 13:31:12 +0100 Subject: [PATCH 12/15] fixup! Add new tests --- datafusion/sql/tests/sql_integration.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index ca4b760726b14..230ccce8d2eaf 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -4869,13 +4869,13 @@ fn test_2_nested_lateral_join_with_the_deepest_join_referencing_the_outer_most_r plan, @r#" Projection: j1_outer.j1_id, j1_outer.j1_string, j2.j1_id, j2.j1_string, j2.j2_id, j2.j2_string - Cross Join: + Cross Join: SubqueryAlias: j1_outer TableScan: j1 SubqueryAlias: j2 Subquery: Projection: j1_inner.j1_id, j1_inner.j1_string, j2.j2_id, j2.j2_string - Cross Join: + Cross Join: SubqueryAlias: j1_inner TableScan: j1 SubqueryAlias: j2 From 46861e10f46e557f067de27db5d7665ba6a2ae5e Mon Sep 17 00:00:00 2001 From: Michael Kleen Date: Wed, 21 Jan 2026 14:10:52 +0100 Subject: [PATCH 13/15] fixup! fixup! Add new tests --- datafusion/optimizer/src/push_down_filter.rs | 16 +--- datafusion/sql/tests/common/mod.rs | 1 + datafusion/sql/tests/sql_integration.rs | 19 +++-- .../sqllogictest/test_files/subquery.slt | 82 ------------------- 4 files changed, 14 insertions(+), 104 deletions(-) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index d0e2355a1d9f2..1ff8bdfeff4c0 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -1131,13 +1131,7 @@ impl OptimizerRule for PushDownFilter { let (volatile_filters, non_volatile_filters): (Vec<&Expr>, Vec<&Expr>) = filter_predicates .into_iter() - // TODO: subquery decorrelation sometimes cannot decorrelated all the expr - // (i.e in the case of recursive subquery) - // this function may accidentally pushdown the subquery expr as well - // until then, we have to exclude these exprs here - .partition(|pred| { - pred.is_volatile() || has_scalar_subquery(pred) - }); + .partition(|pred| pred.is_volatile()); // Check which non-volatile filters are supported by source let supported_filters = scan @@ -1429,14 +1423,6 @@ fn contain(e: &Expr, check_map: &HashMap) -> bool { is_contain } -fn has_scalar_subquery(expr: &Expr) -> bool { - expr.exists(|e| match e { - Expr::ScalarSubquery(_) => Ok(true), - _ => Ok(false), - }) - .unwrap() -} - #[cfg(test)] mod tests { use std::any::Any; diff --git a/datafusion/sql/tests/common/mod.rs b/datafusion/sql/tests/common/mod.rs index 0972fdd191b48..686fdf503f3d6 100644 --- a/datafusion/sql/tests/common/mod.rs +++ b/datafusion/sql/tests/common/mod.rs @@ -160,6 +160,7 @@ impl ContextProvider for MockContextProvider { Field::new("last_name", DataType::Utf8, false), ])), "orders" => Ok(Schema::new(vec![ + Field::new("order_id", DataType::UInt32, false), Field::new("o_orderkey", DataType::UInt32, false), Field::new("o_custkey", DataType::UInt32, false), Field::new("o_orderstatus", DataType::Utf8, false), diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index 230ccce8d2eaf..ccf6c0345548b 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -995,15 +995,15 @@ fn select_nested_with_filters() { #[test] fn table_with_column_alias() { - let sql = "SELECT a, b, c - FROM lineitem l (a, b, c)"; + let sql = "SELECT a, b, c, d, e + FROM lineitem l (a, b, c, d, e)"; let plan = logical_plan(sql).unwrap(); assert_snapshot!( plan, @r" - Projection: l.a, l.b, l.c + Projection: l.a, l.b, l.c, l.d, l.e SubqueryAlias: l - Projection: lineitem.l_item_id AS a, lineitem.l_description AS b, lineitem.price AS c + Projection: lineitem.l_orderkey AS a, lineitem.l_item_id AS b, lineitem.l_description AS c, lineitem.l_extendedprice AS d, lineitem.price AS e TableScan: lineitem " ); @@ -1017,7 +1017,7 @@ fn table_with_column_alias_number_cols() { assert_snapshot!( err.strip_backtrace(), - @"Error during planning: Source table contains 3 columns but only 2 names given as column alias" + @"Error during planning: Source table contains 5 columns but only 2 names given as column alias" ); } @@ -1058,7 +1058,7 @@ fn natural_left_join() { plan, @r" Projection: a.l_item_id - Left Join: Using a.l_item_id = b.l_item_id, a.l_description = b.l_description, a.price = b.price + Left Join: Using a.l_orderkey = b.l_orderkey, a.l_item_id = b.l_item_id, a.l_description = b.l_description, a.l_extendedprice = b.l_extendedprice, a.price = b.price SubqueryAlias: a TableScan: lineitem SubqueryAlias: b @@ -1075,7 +1075,7 @@ fn natural_right_join() { plan, @r" Projection: a.l_item_id - Right Join: Using a.l_item_id = b.l_item_id, a.l_description = b.l_description, a.price = b.price + Right Join: Using a.l_orderkey = b.l_orderkey, a.l_item_id = b.l_item_id, a.l_description = b.l_description, a.l_extendedprice = b.l_extendedprice, a.price = b.price SubqueryAlias: a TableScan: lineitem SubqueryAlias: b @@ -4801,11 +4801,16 @@ fn test_using_join_wildcard_schema() { // Only columns from one join side should be present let expected_fields = vec![ "o1.order_id".to_string(), + "o1.o_orderkey".to_string(), + "o1.o_custkey".to_string(), + "o1.o_orderstatus".to_string(), "o1.customer_id".to_string(), + "o1.o_totalprice".to_string(), "o1.o_item_id".to_string(), "o1.qty".to_string(), "o1.price".to_string(), "o1.delivered".to_string(), + ]; assert_eq!(plan.schema().field_names(), expected_fields); diff --git a/datafusion/sqllogictest/test_files/subquery.slt b/datafusion/sqllogictest/test_files/subquery.slt index 8fbea2d48c908..e73f4ec3e32da 100644 --- a/datafusion/sqllogictest/test_files/subquery.slt +++ b/datafusion/sqllogictest/test_files/subquery.slt @@ -1528,85 +1528,3 @@ logical_plan 20)--------SubqueryAlias: set_cmp_s 21)----------Projection: column1 AS v 22)------------Values: (Int64(5)), (Int64(NULL)) - -# correlated_recursive_scalar_subquery_with_level_3_scalar_subquery_referencing_level1_relation -query TT -explain select c_custkey from customer -where c_acctbal < ( - select sum(o_totalprice) from orders - where o_custkey = c_custkey - and o_totalprice < ( - select sum(l_extendedprice) as price from lineitem where l_orderkey = o_orderkey - and l_extendedprice < c_acctbal - ) -) order by c_custkey; ----- -logical_plan -01)Sort: customer.c_custkey ASC NULLS LAST -02)--Projection: customer.c_custkey -03)----Inner Join: customer.c_custkey = __scalar_sq_1.o_custkey Filter: CAST(customer.c_acctbal AS Decimal128(25, 2)) < __scalar_sq_1.sum(orders.o_totalprice) -04)------TableScan: customer projection=[c_custkey, c_acctbal] -05)------SubqueryAlias: __scalar_sq_1 -06)--------Projection: sum(orders.o_totalprice), orders.o_custkey -07)----------Aggregate: groupBy=[[orders.o_custkey]], aggr=[[sum(orders.o_totalprice)]] -08)------------Projection: orders.o_custkey, orders.o_totalprice -09)--------------Filter: CAST(orders.o_totalprice AS Decimal128(25, 2)) < () -10)----------------Subquery: -11)------------------Projection: sum(lineitem.l_extendedprice) AS price -12)--------------------Aggregate: groupBy=[[]], aggr=[[sum(lineitem.l_extendedprice)]] -13)----------------------Filter: lineitem.l_orderkey = outer_ref(orders.o_orderkey) AND lineitem.l_extendedprice < outer_ref(customer.c_acctbal) -14)------------------------TableScan: lineitem, partial_filters=[lineitem.l_orderkey = outer_ref(orders.o_orderkey), lineitem.l_extendedprice < outer_ref(customer.c_acctbal)] -15)----------------TableScan: orders projection=[o_orderkey, o_custkey, o_totalprice] - -# correlated_recursive_scalar_subquery_with_level_3_exists_subquery_referencing_level1_relation -query TT -explain select c_custkey from customer -where c_acctbal < ( - select sum(o_totalprice) from orders - where o_custkey = c_custkey - and exists ( - select * from lineitem where l_orderkey = o_orderkey - and l_extendedprice < c_acctbal - ) -) order by c_custkey; ----- -logical_plan -01)Sort: customer.c_custkey ASC NULLS LAST -02)--Projection: customer.c_custkey -03)----Inner Join: customer.c_custkey = __scalar_sq_2.o_custkey Filter: CAST(customer.c_acctbal AS Decimal128(25, 2)) < __scalar_sq_2.sum(orders.o_totalprice) -04)------TableScan: customer projection=[c_custkey, c_acctbal] -05)------SubqueryAlias: __scalar_sq_2 -06)--------Projection: sum(orders.o_totalprice), orders.o_custkey -07)----------Aggregate: groupBy=[[orders.o_custkey]], aggr=[[sum(orders.o_totalprice)]] -08)------------Projection: orders.o_custkey, orders.o_totalprice -09)--------------LeftSemi Join: orders.o_orderkey = __correlated_sq_1.l_orderkey Filter: __correlated_sq_1.l_extendedprice < customer.c_acctbal -10)----------------TableScan: orders projection=[o_orderkey, o_custkey, o_totalprice] -11)----------------SubqueryAlias: __correlated_sq_1 -12)------------------TableScan: lineitem projection=[l_orderkey, l_extendedprice] - -# correlated_recursive_scalar_subquery_with_level_3_in_subquery_referencing_level1_relation -query TT -explain select c_custkey from customer -where c_acctbal < ( - select sum(o_totalprice) from orders - where o_custkey = c_custkey - and o_totalprice in ( - select l_extendedprice as price from lineitem where l_orderkey = o_orderkey - and l_extendedprice < c_acctbal - ) -) order by c_custkey; ----- -logical_plan -01)Sort: customer.c_custkey ASC NULLS LAST -02)--Projection: customer.c_custkey -03)----Inner Join: customer.c_custkey = __scalar_sq_2.o_custkey Filter: CAST(customer.c_acctbal AS Decimal128(25, 2)) < __scalar_sq_2.sum(orders.o_totalprice) -04)------TableScan: customer projection=[c_custkey, c_acctbal] -05)------SubqueryAlias: __scalar_sq_2 -06)--------Projection: sum(orders.o_totalprice), orders.o_custkey -07)----------Aggregate: groupBy=[[orders.o_custkey]], aggr=[[sum(orders.o_totalprice)]] -08)------------Projection: orders.o_custkey, orders.o_totalprice -09)--------------LeftSemi Join: orders.o_totalprice = __correlated_sq_1.price, orders.o_orderkey = __correlated_sq_1.l_orderkey Filter: __correlated_sq_1.l_extendedprice < customer.c_acctbal -10)----------------TableScan: orders projection=[o_orderkey, o_custkey, o_totalprice] -11)----------------SubqueryAlias: __correlated_sq_1 -12)------------------Projection: lineitem.l_extendedprice AS price, lineitem.l_extendedprice, lineitem.l_orderkey -13)--------------------TableScan: lineitem projection=[l_orderkey, l_extendedprice] \ No newline at end of file From 5762e553c4d4704648613d5e5d3eb6e4d37066ce Mon Sep 17 00:00:00 2001 From: Michael Kleen Date: Wed, 21 Jan 2026 14:19:57 +0100 Subject: [PATCH 14/15] Fix fmt --- datafusion/sql/src/expr/identifier.rs | 8 +++---- datafusion/sql/src/relation/mod.rs | 2 +- datafusion/sql/src/select.rs | 2 +- datafusion/sql/tests/sql_integration.rs | 31 ++++++++++++++----------- 4 files changed, 23 insertions(+), 20 deletions(-) diff --git a/datafusion/sql/src/expr/identifier.rs b/datafusion/sql/src/expr/identifier.rs index ace8170241e66..d6a40ceb51655 100644 --- a/datafusion/sql/src/expr/identifier.rs +++ b/datafusion/sql/src/expr/identifier.rs @@ -185,10 +185,10 @@ impl SqlToRel<'_, S> { { // TODO: remove when can support nested identifiers for OuterReferenceColumn not_impl_err!( - "Nested identifiers are not yet supported for OuterReferenceColumn {}", - Column::from((qualifier, field)) - .quoted_flat_name() - ) + "Nested identifiers are not yet supported for OuterReferenceColumn {}", + Column::from((qualifier, field)) + .quoted_flat_name() + ) } // Found matching field with no spare identifier(s) Some((field, qualifier, _nested_names)) => { diff --git a/datafusion/sql/src/relation/mod.rs b/datafusion/sql/src/relation/mod.rs index d978d1737debd..6558763ca4e42 100644 --- a/datafusion/sql/src/relation/mod.rs +++ b/datafusion/sql/src/relation/mod.rs @@ -27,7 +27,7 @@ use datafusion_expr::builder::subquery_alias; use datafusion_expr::planner::{ PlannedRelation, RelationPlannerContext, RelationPlanning, }; -use datafusion_expr::{expr::Unnest, Expr, LogicalPlan, LogicalPlanBuilder}; +use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder, expr::Unnest}; use datafusion_expr::{Subquery, SubqueryAlias}; use sqlparser::ast::{FunctionArg, FunctionArgExpr, Spanned, TableFactor}; diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index 4e0946bdd4e4e..182bc97ad4d98 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -29,7 +29,7 @@ use crate::utils::{ use datafusion_common::error::DataFusionErrorBuilder; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; -use datafusion_common::{DFSchema, Column, Result, not_impl_err, plan_err}; +use datafusion_common::{Column, DFSchema, Result, not_impl_err, plan_err}; use datafusion_common::{RecursionUnnestOption, UnnestOptions}; use datafusion_expr::expr::{Alias, PlannedReplaceSelectItem, WildcardOptions}; use datafusion_expr::expr_rewriter::{ diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index ccf6c0345548b..61569ea76ed2b 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -4810,7 +4810,6 @@ fn test_using_join_wildcard_schema() { "o1.qty".to_string(), "o1.price".to_string(), "o1.delivered".to_string(), - ]; assert_eq!(plan.schema().field_names(), expected_fields); @@ -4862,7 +4861,8 @@ fn test_using_join_wildcard_schema() { } #[test] -fn test_2_nested_lateral_join_with_the_deepest_join_referencing_the_outer_most_relation() { +fn test_2_nested_lateral_join_with_the_deepest_join_referencing_the_outer_most_relation() +{ let sql = "SELECT * FROM j1 j1_outer, LATERAL ( SELECT * FROM j1 j1_inner, LATERAL ( SELECT * FROM j2 WHERE j1_inner.j1_id = j2_id and j1_outer.j1_id=j2_id @@ -4889,11 +4889,12 @@ Projection: j1_outer.j1_id, j1_outer.j1_string, j2.j1_id, j2.j1_string, j2.j2_id Filter: outer_ref(j1_inner.j1_id) = j2.j2_id AND outer_ref(j1_outer.j1_id) = j2.j2_id TableScan: j2 "# -); + ); } #[test] -fn test_correlated_recursive_scalar_subquery_with_level_3_scalar_subquery_referencing_level1_relation() { +fn test_correlated_recursive_scalar_subquery_with_level_3_scalar_subquery_referencing_level1_relation() + { let sql = "select c_custkey from customer where c_acctbal < ( select sum(o_totalprice) from orders @@ -4923,11 +4924,12 @@ Sort: customer.c_custkey ASC NULLS LAST TableScan: orders TableScan: customer "# -); + ); } #[test] -fn correlated_recursive_scalar_subquery_with_level_3_exists_subquery_referencing_level1_relation() { +fn correlated_recursive_scalar_subquery_with_level_3_exists_subquery_referencing_level1_relation() + { let sql = "select c_custkey from customer where c_acctbal < ( select sum(o_totalprice) from orders @@ -4940,8 +4942,8 @@ fn correlated_recursive_scalar_subquery_with_level_3_exists_subquery_referencing let plan = logical_plan(sql).unwrap(); assert_snapshot!( - plan, - @r#" + plan, + @r#" Sort: customer.c_custkey ASC NULLS LAST Projection: customer.c_custkey Filter: customer.c_acctbal < () @@ -4956,11 +4958,12 @@ Sort: customer.c_custkey ASC NULLS LAST TableScan: orders TableScan: customer "# -); + ); } #[test] -fn correlated_recursive_scalar_subquery_with_level_3_in_subquery_referencing_level1_relation() { +fn correlated_recursive_scalar_subquery_with_level_3_in_subquery_referencing_level1_relation() + { let sql = "select c_custkey from customer where c_acctbal < ( select sum(o_totalprice) from orders @@ -4973,8 +4976,8 @@ fn correlated_recursive_scalar_subquery_with_level_3_in_subquery_referencing_lev let plan = logical_plan(sql).unwrap(); assert_snapshot!( - plan, - @r#" + plan, + @r#" Sort: customer.c_custkey ASC NULLS LAST Projection: customer.c_custkey Filter: customer.c_acctbal < () @@ -4989,5 +4992,5 @@ Sort: customer.c_custkey ASC NULLS LAST TableScan: orders TableScan: customer "# -); -} \ No newline at end of file + ); +} From 99902bca76258ebf787bf8ad1c96ead4621446d9 Mon Sep 17 00:00:00 2001 From: Michael Kleen Date: Wed, 21 Jan 2026 18:08:51 +0100 Subject: [PATCH 15/15] Remove test from joins.slt --- datafusion/sqllogictest/test_files/joins.slt | 23 -------------------- 1 file changed, 23 deletions(-) diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index 7d2ac3c15710e..ae87fd11d397c 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -4646,29 +4646,6 @@ logical_plan 08)----------TableScan: j3 projection=[j3_string, j3_id] physical_plan_error This feature is not implemented: Physical plan does not support logical expression OuterReferenceColumn(Field { name: "j1_id", data_type: Int32, nullable: true }, Column { relation: Some(Bare { table: "j1" }), name: "j1_id" }) -# 2 nested lateral join with the deepest join referencing the outer most relation -query TT -explain SELECT * FROM j1 j1_outer, LATERAL ( - SELECT * FROM j1 j1_inner, LATERAL ( - SELECT * FROM j2 WHERE j1_inner.j1_id = j2_id and j1_outer.j1_id=j2_id - ) as j2 -) as j2; ----- -logical_plan -01)Cross Join: -02)--SubqueryAlias: j1_outer -03)----TableScan: j1 projection=[j1_string, j1_id] -04)--SubqueryAlias: j2 -05)----Subquery: -06)------Cross Join: -07)--------SubqueryAlias: j1_inner -08)----------TableScan: j1 projection=[j1_string, j1_id] -09)--------SubqueryAlias: j2 -10)----------Subquery: -11)------------Filter: outer_ref(j1_inner.j1_id) = j2.j2_id AND outer_ref(j1_outer.j1_id) = j2.j2_id -12)--------------TableScan: j2 projection=[j2_string, j2_id] -physical_plan_error This feature is not implemented: Physical plan does not support logical expression OuterReferenceColumn(Field { name: "j1_id", data_type: Int32, nullable: true }, Column { relation: Some(Bare { table: "j1_inner" }), name: "j1_id" }) - query TT explain SELECT * FROM j1, LATERAL (SELECT 1) AS j2; ----