Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions datafusion/expr/src/logical_plan/invariants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,14 +208,16 @@ pub fn check_subquery_expr(
if group_expr.contains(expr) && !aggr_expr.contains(expr) {
// TODO revisit this validation logic
plan_err!(
"Correlated scalar subquery in the GROUP BY clause must also be in the aggregate expressions"
"Correlated scalar subquery in the GROUP BY clause must \
also be in the aggregate expressions"
)
} else {
Ok(())
}
}
_ => plan_err!(
"Correlated scalar subquery can only be used in Projection, Filter, Aggregate plan nodes"
"Correlated scalar subquery can only be used in Projection, \
Filter, Aggregate plan nodes"
),
}?;
}
Expand Down
85 changes: 47 additions & 38 deletions datafusion/sql/src/expr/identifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,16 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
}

// Check the outer query schema
if let Some(outer) = planner_context.outer_query_schema()
&& let Ok((qualifier, field)) =
for outer in planner_context.outer_queries_schemas() {
if let Ok((qualifier, field)) =
outer.qualified_field_with_unqualified_name(normalize_ident.as_str())
{
// Found an exact match on a qualified name in the outer plan schema, so this is an outer reference column
return Ok(Expr::OuterReferenceColumn(
Arc::clone(field),
Column::from((qualifier, field)),
));
{
// Found an exact match on a qualified name in the outer plan schema, so this is an outer reference column
return Ok(Expr::OuterReferenceColumn(
Arc::clone(field),
Column::from((qualifier, field)),
));
}
}

// Default case
Expand Down Expand Up @@ -172,36 +173,44 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
not_impl_err!("compound identifier: {ids:?}")
} else {
// Check the outer_query_schema and try to find a match
if let Some(outer) = planner_context.outer_query_schema() {
let search_result = search_dfschema(&ids, outer);
match search_result {
// Found matching field with spare identifier(s) for nested field(s) in structure
Some((field, qualifier, nested_names))
if !nested_names.is_empty() =>
{
// TODO: remove when can support nested identifiers for OuterReferenceColumn
not_impl_err!(
"Nested identifiers are not yet supported for OuterReferenceColumn {}",
Column::from((qualifier, field))
.quoted_flat_name()
)
}
// Found matching field with no spare identifier(s)
Some((field, qualifier, _nested_names)) => {
// Found an exact match on a qualified name in the outer plan schema, so this is an outer reference column
Ok(Expr::OuterReferenceColumn(
Arc::clone(field),
Column::from((qualifier, field)),
))
}
// Found no matching field, will return a default
None => {
let s = &ids[0..ids.len()];
// safe unwrap as s can never be empty or exceed the bounds
let (relation, column_name) =
form_identifier(s).unwrap();
Ok(Expr::Column(Column::new(relation, column_name)))
}
let outer_schemas = planner_context.outer_queries_schemas();
let mut maybe_result = None;
if !outer_schemas.is_empty() {
for outer in planner_context.outer_queries_schemas() {
let search_result = search_dfschema(&ids, &outer);
let result = match search_result {
// Found matching field with spare identifier(s) for nested field(s) in structure
Some((field, qualifier, nested_names))
if !nested_names.is_empty() =>
{
// TODO: remove when can support nested identifiers for OuterReferenceColumn
not_impl_err!(
"Nested identifiers are not yet supported for OuterReferenceColumn {}",
Column::from((qualifier, field))
.quoted_flat_name()
)
}
// Found matching field with no spare identifier(s)
Some((field, qualifier, _nested_names)) => {
// Found an exact match on a qualified name in the outer plan schema, so this is an outer reference column
Ok(Expr::OuterReferenceColumn(
Arc::clone(field),
Column::from((qualifier, field)),
))
}
// Found no matching field, will return a default
None => continue,
};
maybe_result = Some(result);
break;
}
if let Some(result) = maybe_result {
result
} else {
let s = &ids[0..ids.len()];
// safe unwrap as s can never be empty or exceed the bounds
let (relation, column_name) = form_identifier(s).unwrap();
Ok(Expr::Column(Column::new(relation, column_name)))
}
} else {
let s = &ids[0..ids.len()];
Expand Down
20 changes: 8 additions & 12 deletions datafusion/sql/src/expr/subquery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,10 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
input_schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
let old_outer_query_schema =
planner_context.set_outer_query_schema(Some(input_schema.clone().into()));
planner_context.append_outer_query_schema(input_schema.clone().into());
let sub_plan = self.query_to_plan(subquery, planner_context)?;
let outer_ref_columns = sub_plan.all_out_ref_exprs();
planner_context.set_outer_query_schema(old_outer_query_schema);
planner_context.pop_outer_query_schema();
Ok(Expr::Exists(Exists {
subquery: Subquery {
subquery: Arc::new(sub_plan),
Expand All @@ -54,8 +53,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
input_schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
let old_outer_query_schema =
planner_context.set_outer_query_schema(Some(input_schema.clone().into()));
planner_context.append_outer_query_schema(Arc::new(input_schema.clone()));

let mut spans = Spans::new();
if let SetExpr::Select(select) = &subquery.body.as_ref() {
Expand All @@ -70,7 +68,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {

let sub_plan = self.query_to_plan(subquery, planner_context)?;
let outer_ref_columns = sub_plan.all_out_ref_exprs();
planner_context.set_outer_query_schema(old_outer_query_schema);
planner_context.pop_outer_query_schema();

self.validate_single_column(
&sub_plan,
Expand Down Expand Up @@ -98,8 +96,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
input_schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
let old_outer_query_schema =
planner_context.set_outer_query_schema(Some(input_schema.clone().into()));
planner_context.append_outer_query_schema(Arc::new(input_schema.clone()));
let mut spans = Spans::new();
if let SetExpr::Select(select) = subquery.body.as_ref() {
for item in &select.projection {
Expand All @@ -112,7 +109,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
}
let sub_plan = self.query_to_plan(subquery, planner_context)?;
let outer_ref_columns = sub_plan.all_out_ref_exprs();
planner_context.set_outer_query_schema(old_outer_query_schema);
planner_context.pop_outer_query_schema();

self.validate_single_column(
&sub_plan,
Expand Down Expand Up @@ -172,8 +169,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
input_schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
let old_outer_query_schema =
planner_context.set_outer_query_schema(Some(input_schema.clone().into()));
planner_context.append_outer_query_schema(Arc::new(input_schema.clone()));

let mut spans = Spans::new();
if let SetExpr::Select(select) = subquery.body.as_ref() {
Expand All @@ -188,7 +184,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {

let sub_plan = self.query_to_plan(subquery, planner_context)?;
let outer_ref_columns = sub_plan.all_out_ref_exprs();
planner_context.set_outer_query_schema(old_outer_query_schema);
planner_context.pop_outer_query_schema();

self.validate_single_column(
&sub_plan,
Expand Down
34 changes: 22 additions & 12 deletions datafusion/sql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,11 @@ pub struct PlannerContext {
/// Map of CTE name to logical plan of the WITH clause.
/// Use `Arc<LogicalPlan>` to allow cheap cloning
ctes: HashMap<String, Arc<LogicalPlan>>,
/// The query schema of the outer query plan, used to resolve the columns in subquery
outer_query_schema: Option<DFSchemaRef>,

/// The schemas of the enclosing (outer) queries, used to resolve outer-referenced
/// columns in a subquery. Kept as a stack so that nested subqueries are supported.
outer_queries_schemas_stack: Vec<DFSchemaRef>,

/// The joined schemas of all FROM clauses planned so far. When planning LATERAL
/// FROM clauses, this should become a suffix of the innermost outer query schema
/// (the last entry of `outer_queries_schemas_stack`).
outer_from_schema: Option<DFSchemaRef>,
Expand All @@ -282,7 +285,7 @@ impl PlannerContext {
Self {
prepare_param_data_types: Arc::new(vec![]),
ctes: HashMap::new(),
outer_query_schema: None,
outer_queries_schemas_stack: vec![],
outer_from_schema: None,
create_table_schema: None,
}
Expand All @@ -297,19 +300,26 @@ impl PlannerContext {
self
}

// Return a reference to the outer query's schema
pub fn outer_query_schema(&self) -> Option<&DFSchema> {
self.outer_query_schema.as_ref().map(|s| s.as_ref())
/// Returns an owned copy of the outer-query schema stack.
///
/// Entries are ordered from the outermost relation (first entry) to the
/// innermost one (last entry). Only the `DFSchemaRef` handles are cloned;
/// the stack held by `self` is not modified.
pub fn outer_queries_schemas(&self) -> Vec<DFSchemaRef> {
    self.outer_queries_schemas_stack.clone()
}

/// Sets the outer query schema, returning the existing one, if
/// any
pub fn set_outer_query_schema(
&mut self,
mut schema: Option<DFSchemaRef>,
) -> Option<DFSchemaRef> {
std::mem::swap(&mut self.outer_query_schema, &mut schema);
schema
/// Pushes `schema` onto the end of the outer-query schema stack, making it
/// the innermost (most recently entered) outer relation.
pub fn append_outer_query_schema(&mut self, schema: DFSchemaRef) {
self.outer_queries_schemas_stack.push(schema);
}

/// Returns the schema of the adjacent (innermost) outer relation, i.e. the
/// entry most recently added with `append_outer_query_schema`, or `None`
/// when no outer query schema is registered.
///
/// The stack itself is left untouched, so a shared borrow of `self` is
/// sufficient; callers holding a `&mut PlannerContext` can still call this.
pub fn latest_outer_query_schema(&self) -> Option<DFSchemaRef> {
    self.outer_queries_schemas_stack.last().cloned()
}

/// Removes the schema of the adjacent (innermost) outer relation from the
/// stack and returns it, or returns `None` if the stack is empty.
pub fn pop_outer_query_schema(&mut self) -> Option<DFSchemaRef> {
self.outer_queries_schemas_stack.pop()
}

pub fn set_table_schema(
Expand Down
23 changes: 14 additions & 9 deletions datafusion/sql/src/relation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,9 +262,10 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
} => {
let tbl_func_ref = self.object_name_to_table_reference(name)?;
let schema = planner_context
.outer_query_schema()
.outer_queries_schemas()
.last()
.cloned()
.unwrap_or_else(DFSchema::empty);
.unwrap_or_else(|| Arc::new(DFSchema::empty()));
let func_args = args
.into_iter()
.map(|arg| match arg {
Expand Down Expand Up @@ -310,20 +311,24 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
let old_from_schema = planner_context
.set_outer_from_schema(None)
.unwrap_or_else(|| Arc::new(DFSchema::empty()));
let new_query_schema = match planner_context.outer_query_schema() {
Some(old_query_schema) => {
let outer_query_schema = planner_context.pop_outer_query_schema();
let new_query_schema = match outer_query_schema {
Some(ref old_query_schema) => {
let mut new_query_schema = old_from_schema.as_ref().clone();
new_query_schema.merge(old_query_schema);
Some(Arc::new(new_query_schema))
new_query_schema.merge(old_query_schema.as_ref());
Arc::new(new_query_schema)
}
None => Some(Arc::clone(&old_from_schema)),
None => Arc::clone(&old_from_schema),
};
let old_query_schema = planner_context.set_outer_query_schema(new_query_schema);
planner_context.append_outer_query_schema(new_query_schema);

let plan = self.create_relation(subquery, planner_context)?;
let outer_ref_columns = plan.all_out_ref_exprs();

planner_context.set_outer_query_schema(old_query_schema);
planner_context.pop_outer_query_schema();
if let Some(schema) = outer_query_schema {
planner_context.append_outer_query_schema(schema);
}
planner_context.set_outer_from_schema(Some(old_from_schema));

// We can omit the subquery wrapper if there are no columns
Expand Down
20 changes: 13 additions & 7 deletions datafusion/sql/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::utils::{

use datafusion_common::error::DataFusionErrorBuilder;
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_common::{Column, Result, not_impl_err, plan_err};
use datafusion_common::{Column, DFSchema, Result, not_impl_err, plan_err};
use datafusion_common::{RecursionUnnestOption, UnnestOptions};
use datafusion_expr::expr::{Alias, PlannedReplaceSelectItem, WildcardOptions};
use datafusion_expr::expr_rewriter::{
Expand Down Expand Up @@ -637,12 +637,8 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
match selection {
Some(predicate_expr) => {
let fallback_schemas = plan.fallback_normalize_schemas();
let outer_query_schema = planner_context.outer_query_schema().cloned();
let outer_query_schema_vec = outer_query_schema
.as_ref()
.map(|schema| vec![schema])
.unwrap_or_else(Vec::new);

let outer_query_schema_vec = planner_context.outer_queries_schemas();
let filter_expr =
self.sql_to_expr(predicate_expr, plan.schema(), planner_context)?;

Expand All @@ -657,9 +653,19 @@ impl<S: ContextProvider> SqlToRel<'_, S> {

let mut using_columns = HashSet::new();
expr_to_columns(&filter_expr, &mut using_columns)?;
let mut schema_stack: Vec<Vec<&DFSchema>> =
vec![vec![plan.schema()], fallback_schemas];
for sc in outer_query_schema_vec.iter().rev() {
schema_stack.push(vec![sc.as_ref()]);
}

let filter_expr = normalize_col_with_schemas_and_ambiguity_check(
filter_expr,
&[&[plan.schema()], &fallback_schemas, &outer_query_schema_vec],
schema_stack
.iter()
.map(|sc| sc.as_slice())
.collect::<Vec<&[&DFSchema]>>()
.as_slice(),
&[using_columns],
)?;

Expand Down
16 changes: 16 additions & 0 deletions datafusion/sql/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,26 @@ impl ContextProvider for MockContextProvider {
])),
"orders" => Ok(Schema::new(vec![
Field::new("order_id", DataType::UInt32, false),
Field::new("o_orderkey", DataType::UInt32, false),
Copy link
Contributor Author

@mkleen mkleen Jan 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These extensions were necessary to get the tests to work.

Field::new("o_custkey", DataType::UInt32, false),
Field::new("o_orderstatus", DataType::Utf8, false),
Field::new("customer_id", DataType::UInt32, false),
Field::new("o_totalprice", DataType::Decimal32(15, 2), false),
Field::new("o_item_id", DataType::Utf8, false),
Field::new("qty", DataType::Int32, false),
Field::new("price", DataType::Float64, false),
Field::new("delivered", DataType::Boolean, false),
])),
"customer" => Ok(Schema::new(vec![
Field::new("c_custkey", DataType::UInt32, false),
Field::new("c_name", DataType::Utf8, false),
Field::new("c_address", DataType::Utf8, false),
Field::new("c_nationkey", DataType::UInt32, false),
Field::new("c_phone", DataType::Decimal32(15, 2), false),
Field::new("c_acctbal", DataType::Float64, false),
Field::new("c_mktsegment", DataType::Utf8, false),
Field::new("c_comment", DataType::Utf8, false),
])),
"array" => Ok(Schema::new(vec![
Field::new(
"left",
Expand All @@ -186,8 +200,10 @@ impl ContextProvider for MockContextProvider {
),
])),
"lineitem" => Ok(Schema::new(vec![
Field::new("l_orderkey", DataType::UInt32, false),
Field::new("l_item_id", DataType::UInt32, false),
Field::new("l_description", DataType::Utf8, false),
Field::new("l_extendedprice", DataType::Decimal32(15, 2), false),
Field::new("price", DataType::Float64, false),
])),
"aggregate_test_100" => Ok(Schema::new(vec![
Expand Down
Loading