From 57682652df046624c6a139f67df050c77fc71d6a Mon Sep 17 00:00:00 2001 From: Albert Skalt Date: Mon, 19 Jan 2026 11:47:50 +0300 Subject: [PATCH] Wrap immutable plan parts into Arc - Closes https://github.com/apache/datafusion/issues/19852 Improve performance of query planning and plan state re-set by making node clone cheap. - Store projection as `Option>` instead of `Option>` in `FilterExec`, `HashJoinExec`, `NestedLoopJoinExec`. - Store exprs as `Arc<[ProjectionExpr]>` instead of Vec in `ProjectionExprs`. - Store arced aggregation, filter, group by expressions within `AggregateExec`. --- .../custom_data_source/custom_datasource.rs | 3 +- datafusion/catalog-listing/src/table.rs | 2 +- datafusion/common/src/stats.rs | 16 +- datafusion/common/src/utils/mod.rs | 6 +- datafusion/core/src/datasource/empty.rs | 3 +- datafusion/core/src/physical_planner.rs | 2 +- .../core/tests/custom_sources_cases/mod.rs | 2 +- .../physical_optimizer/join_selection.rs | 2 +- datafusion/datasource/src/memory.rs | 2 +- datafusion/physical-expr/src/projection.rs | 188 ++++++++++++++++-- .../src/enforce_sorting/sort_pushdown.rs | 2 +- .../src/projection_pushdown.rs | 2 +- .../physical-plan/src/aggregates/mod.rs | 43 ++-- .../src/aggregates/no_grouping.rs | 6 +- .../physical-plan/src/aggregates/row_hash.rs | 12 +- .../src/aggregates/topk_stream.rs | 4 +- datafusion/physical-plan/src/common.rs | 2 +- datafusion/physical-plan/src/filter.rs | 51 +++-- .../physical-plan/src/joins/hash_join/exec.rs | 26 +-- .../src/joins/nested_loop_join.rs | 30 +-- datafusion/physical-plan/src/joins/utils.rs | 4 +- datafusion/physical-plan/src/projection.rs | 18 +- datafusion/physical-plan/src/test.rs | 2 +- datafusion/proto/src/physical_plan/mod.rs | 2 +- .../custom-table-providers.md | 4 +- docs/source/library-user-guide/upgrading.md | 13 ++ 26 files changed, 307 insertions(+), 140 deletions(-) diff --git a/datafusion-examples/examples/custom_data_source/custom_datasource.rs b/datafusion-examples/examples/custom_data_source/custom_datasource.rs index b276ae32cf247..5b2438354710e 100644 --- a/datafusion-examples/examples/custom_data_source/custom_datasource.rs +++ b/datafusion-examples/examples/custom_data_source/custom_datasource.rs @@ -202,7 +202,8 @@ impl CustomExec { schema: SchemaRef, db: CustomDataSource, ) -> Self { - let projected_schema = project_schema(&schema, projections).unwrap(); + let projected_schema = + project_schema(&schema, projections.map(|v| v.as_ref())).unwrap(); let cache = Self::compute_properties(projected_schema.clone()); Self { db, diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs index 38456944075fc..e081280825135 100644 --- a/datafusion/catalog-listing/src/table.rs +++ b/datafusion/catalog-listing/src/table.rs @@ -522,7 +522,7 @@ impl TableProvider for ListingTable { // if no files need to be read, return an `EmptyExec` if partitioned_file_lists.is_empty() { - let projected_schema = project_schema(&self.schema(), projection.as_ref())?; + let projected_schema = project_schema(&self.schema(), projection.as_deref())?; return Ok(ScanResult::new(Arc::new(EmptyExec::new(projected_schema)))); } diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs index ba13ef392d912..8511712c4837b 100644 --- a/datafusion/common/src/stats.rs +++ b/datafusion/common/src/stats.rs @@ -391,8 +391,8 @@ impl Statistics { /// For example, if we had statistics for columns `{"a", "b", "c"}`, /// projecting to `vec![2, 1]` would return statistics for columns `{"c", /// "b"}`. - pub fn project(mut self, projection: Option<&Vec>) -> Self { - let Some(projection) = projection else { + pub fn project(mut self, projection: Option<&[usize]>) -> Self { + let Some(projection) = projection.map(AsRef::as_ref) else { return self; }; @@ -410,7 +410,7 @@ impl Statistics { .map(Slot::Present) .collect(); - for idx in projection { + for idx in projection.iter() { let next_idx = self.column_statistics.len(); let slot = std::mem::replace( columns.get_mut(*idx).expect("projection out of bounds"), @@ -1066,29 +1066,29 @@ mod tests { #[test] fn test_project_none() { - let projection = None; - let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref()); + let projection: Option<&[usize]> = None; + let stats = make_stats(vec![10, 20, 30]).project(projection); assert_eq!(stats, make_stats(vec![10, 20, 30])); } #[test] fn test_project_empty() { let projection = Some(vec![]); - let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref()); + let stats = make_stats(vec![10, 20, 30]).project(projection.as_deref()); assert_eq!(stats, make_stats(vec![])); } #[test] fn test_project_swap() { let projection = Some(vec![2, 1]); - let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref()); + let stats = make_stats(vec![10, 20, 30]).project(projection.as_deref()); assert_eq!(stats, make_stats(vec![30, 20])); } #[test] fn test_project_repeated() { let projection = Some(vec![1, 2, 1, 1, 0, 2]); - let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref()); + let stats = make_stats(vec![10, 20, 30]).project(projection.as_deref()); assert_eq!(stats, make_stats(vec![20, 30, 20, 20, 10, 30])); } diff --git a/datafusion/common/src/utils/mod.rs b/datafusion/common/src/utils/mod.rs index 03310a7bde193..c22709af94814 100644 --- a/datafusion/common/src/utils/mod.rs +++ b/datafusion/common/src/utils/mod.rs @@ -59,7 +59,7 @@ use std::thread::available_parallelism; /// /// // Pick columns 'c' and 'b' /// let projection = Some(vec![2, 1]); -/// let projected_schema = project_schema(&schema, projection.as_ref()).unwrap(); +/// let projected_schema = project_schema(&schema, projection.as_deref()).unwrap(); /// /// let expected_schema = SchemaRef::new(Schema::new(vec![ /// Field::new("c", DataType::Utf8, true), @@ -70,9 +70,9 @@ use std::thread::available_parallelism; /// ``` pub fn project_schema( schema: &SchemaRef, - projection: Option<&Vec>, + projection: Option<&[usize]>, ) -> Result { - let schema = match projection { + let schema = match projection.map(AsRef::as_ref) { Some(columns) => Arc::new(schema.project(columns)?), None => Arc::clone(schema), }; diff --git a/datafusion/core/src/datasource/empty.rs b/datafusion/core/src/datasource/empty.rs index 5aeca92b1626d..882abd921a155 100644 --- a/datafusion/core/src/datasource/empty.rs +++ b/datafusion/core/src/datasource/empty.rs @@ -77,7 +77,8 @@ impl TableProvider for EmptyTable { _limit: Option, ) -> Result> { // even though there is no data, projections apply - let projected_schema = project_schema(&self.schema, projection)?; + let projected_schema = + project_schema(&self.schema, projection.map(AsRef::as_ref))?; Ok(Arc::new( EmptyExec::new(projected_schema).with_partitions(self.partitions), )) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index e7035910deb07..c7fe8fef37c56 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -984,7 +984,7 @@ impl DefaultPhysicalPlanner { // project the output columns excluding the async functions // The async functions are always appended to the end of the schema. .apply_projection(Some( - (0..input.schema().fields().len()).collect(), + (0..input.schema().fields().len()).collect::>(), ))? .with_batch_size(session_state.config().batch_size()) .build()? diff --git a/datafusion/core/tests/custom_sources_cases/mod.rs b/datafusion/core/tests/custom_sources_cases/mod.rs index 8453615c2886b..17d5ff6469be1 100644 --- a/datafusion/core/tests/custom_sources_cases/mod.rs +++ b/datafusion/core/tests/custom_sources_cases/mod.rs @@ -86,7 +86,7 @@ impl CustomExecutionPlan { fn new(projection: Option>) -> Self { let schema = TEST_CUSTOM_SCHEMA_REF!(); let schema = - project_schema(&schema, projection.as_ref()).expect("projected schema"); + project_schema(&schema, projection.as_deref()).expect("projected schema"); let cache = Self::compute_properties(schema); Self { projection, cache } } diff --git a/datafusion/core/tests/physical_optimizer/join_selection.rs b/datafusion/core/tests/physical_optimizer/join_selection.rs index 9234a95591baa..9219ae43120ae 100644 --- a/datafusion/core/tests/physical_optimizer/join_selection.rs +++ b/datafusion/core/tests/physical_optimizer/join_selection.rs @@ -762,7 +762,7 @@ async fn test_hash_join_swap_on_joins_with_projections( "ProjectionExec won't be added above if HashJoinExec contains embedded projection", ); - assert_eq!(swapped_join.projection, Some(vec![0_usize])); + assert_eq!(swapped_join.projection.as_ref().unwrap(), [0_usize]); assert_eq!(swapped.schema().fields.len(), 1); assert_eq!(swapped.schema().fields[0].name(), "small_col"); Ok(()) diff --git a/datafusion/datasource/src/memory.rs b/datafusion/datasource/src/memory.rs index 1d12bb3200309..3fc388cd3c4ad 100644 --- a/datafusion/datasource/src/memory.rs +++ b/datafusion/datasource/src/memory.rs @@ -262,7 +262,7 @@ impl MemorySourceConfig { schema: SchemaRef, projection: Option>, ) -> Result { - let projected_schema = project_schema(&schema, projection.as_ref())?; + let projected_schema = project_schema(&schema, projection.as_deref())?; Ok(Self { partitions: partitions.to_vec(), schema, diff --git a/datafusion/physical-expr/src/projection.rs b/datafusion/physical-expr/src/projection.rs index 540fd620c92ce..7039308bfe298 100644 --- a/datafusion/physical-expr/src/projection.rs +++ b/datafusion/physical-expr/src/projection.rs @@ -29,7 +29,8 @@ use arrow::datatypes::{Field, Schema, SchemaRef}; use datafusion_common::stats::{ColumnStatistics, Precision}; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{ - Result, ScalarValue, assert_or_internal_err, internal_datafusion_err, plan_err, + Result, ScalarValue, Statistics, assert_or_internal_err, internal_datafusion_err, + plan_err, project_schema, }; use datafusion_physical_expr_common::metrics::ExecutionPlanMetricsSet; @@ -125,7 +126,8 @@ impl From for (Arc, String) { /// indices. #[derive(Debug, Clone, PartialEq, Eq)] pub struct ProjectionExprs { - exprs: Vec, + /// [`Arc`] used for a cheap clone, which improves physical plan optimization performance. + exprs: Arc<[ProjectionExpr]>, } impl std::fmt::Display for ProjectionExprs { @@ -137,14 +139,16 @@ impl std::fmt::Display for ProjectionExprs { impl From> for ProjectionExprs { fn from(value: Vec) -> Self { - Self { exprs: value } + Self { + exprs: value.into(), + } } } impl From<&[ProjectionExpr]> for ProjectionExprs { fn from(value: &[ProjectionExpr]) -> Self { Self { - exprs: value.to_vec(), + exprs: value.iter().cloned().collect(), } } } @@ -152,7 +156,7 @@ impl From<&[ProjectionExpr]> for ProjectionExprs { impl FromIterator for ProjectionExprs { fn from_iter>(exprs: T) -> Self { Self { - exprs: exprs.into_iter().collect::>(), + exprs: exprs.into_iter().collect(), } } } @@ -164,12 +168,17 @@ impl AsRef<[ProjectionExpr]> for ProjectionExprs { } impl ProjectionExprs { - pub fn new(exprs: I) -> Self - where - I: IntoIterator, - { + /// Make a new [`ProjectionExprs`] from expressions iterator. + pub fn new(exprs: impl IntoIterator) -> Self { + Self { + exprs: exprs.into_iter().collect(), + } + } + + /// Make a new [`ProjectionExprs`] from expressions. + pub fn from_expressions(exprs: impl Into>) -> Self { Self { - exprs: exprs.into_iter().collect::>(), + exprs: exprs.into(), } } @@ -285,13 +294,14 @@ impl ProjectionExprs { { let exprs = self .exprs - .into_iter() + .iter() + .cloned() .map(|mut proj| { proj.expr = f(proj.expr)?; Ok(proj) }) - .collect::>>()?; - Ok(Self::new(exprs)) + .collect::>>()?; + Ok(Self::from_expressions(exprs)) } /// Apply another projection on top of this projection, returning the combined projection. @@ -361,7 +371,7 @@ impl ProjectionExprs { /// applied on top of this projection. pub fn try_merge(&self, other: &ProjectionExprs) -> Result { let mut new_exprs = Vec::with_capacity(other.exprs.len()); - for proj_expr in &other.exprs { + for proj_expr in other.exprs.iter() { let new_expr = update_expr(&proj_expr.expr, &self.exprs, true)? .ok_or_else(|| { internal_datafusion_err!( @@ -602,12 +612,12 @@ impl ProjectionExprs { /// ``` pub fn project_statistics( &self, - mut stats: datafusion_common::Statistics, + mut stats: Statistics, output_schema: &Schema, - ) -> Result { + ) -> Result { let mut column_statistics = vec![]; - for proj_expr in &self.exprs { + for proj_expr in self.exprs.iter() { let expr = &proj_expr.expr; let col_stats = if let Some(col) = expr.as_any().downcast_ref::() { std::mem::take(&mut stats.column_statistics[col.index()]) @@ -754,12 +764,146 @@ impl Projector { } } -impl IntoIterator for ProjectionExprs { - type Item = ProjectionExpr; - type IntoIter = std::vec::IntoIter; +/// Describes an option immutable reference counted shared projection. +/// +/// This structure represents projecting a set of columns by index. +/// It uses an [`Arc`] internally to make it cheap to clone. +#[derive(Debug, Clone, Default, PartialEq, Eq)] +pub struct OptionProjectionRef { + inner: Option>, +} - fn into_iter(self) -> Self::IntoIter { - self.exprs.into_iter() +impl OptionProjectionRef { + /// Make a new [`OptionProjectionRef`]. + pub fn new(inner: Option>>) -> Self { + Self { + inner: inner.map(Into::into), + } + } + + /// Project inner. + pub fn as_inner(&self) -> &Option> { + &self.inner + } + + /// Consume self and return inner. + pub fn into_inner(self) -> Option> { + self.inner + } + + /// Represent this projection as option slice. + pub fn as_ref(&self) -> Option<&[usize]> { + self.inner.as_deref() + } + + /// Check if the projection is set. + pub fn is_some(&self) -> bool { + self.inner.is_some() + } + + /// Check if the projection is not set. + pub fn is_none(&self) -> bool { + self.inner.is_none() + } + + /// Apply passed `projection` to inner one. + /// + /// If inner projection is [`None`] then there are no changes. + /// Otherwise, if passed `projection` is not [`None`] then it is remapped + /// according to the stored one. Otherwise, there are no changes. + /// + /// # Example + /// + /// If stored projection is [0, 2] and we call `apply_projection([0, 2, 3])`, + /// then the resulting projection will be [0, 3]. + /// + /// # Error + /// + /// Returns an internal error if existing projection contains index that is + /// greater than len of the passed `projection`. + /// + pub fn apply_projection<'a>( + self, + projection: impl Into>, + ) -> Result { + let projection = projection.into(); + let Some(existing_projection) = self.inner else { + return Ok(self); + }; + let Some(new_projection) = projection else { + return Ok(Self { + inner: Some(existing_projection), + }); + }; + Ok(Self::new(Some( + existing_projection + .iter() + .map(|i| { + let idx = *i; + assert_or_internal_err!( + idx < new_projection.len(), + "unable to apply projection: index {} is greater than new projection len {}", + idx, + new_projection.len(), + ); + Ok(new_projection[*i]) + }) + .collect::>>()?, + ))) + } + + /// Applies an optional projection to a [`SchemaRef`], returning the + /// projected schema. + pub fn project_schema(&self, schema: &SchemaRef) -> Result { + project_schema(schema, self.inner.as_deref()) + } + + /// Applies an optional projection to a [`Statistics`], returning the + /// projected stats. + pub fn project_statistics(&self, stats: Statistics) -> Statistics { + stats.project(self.inner.as_deref()) + } +} + +impl<'a> From<&'a OptionProjectionRef> for Option<&'a [usize]> { + fn from(value: &'a OptionProjectionRef) -> Self { + value.inner.as_deref() + } +} + +impl From> for OptionProjectionRef { + fn from(value: Vec) -> Self { + Self::new(Some(value)) + } +} + +impl From>> for OptionProjectionRef { + fn from(value: Option>) -> Self { + Self::new(value) + } +} + +impl FromIterator for OptionProjectionRef { + fn from_iter>(iter: T) -> Self { + Self::new(Some(iter.into_iter().collect::>())) + } +} + +impl PartialEq> for OptionProjectionRef +where + T: AsRef<[usize]>, +{ + fn eq(&self, other: &Option) -> bool { + self.as_ref() == other.as_ref().map(AsRef::as_ref) + } +} + +impl PartialEq> for &OptionProjectionRef +where + T: AsRef<[usize]>, +{ + fn eq(&self, other: &Option) -> bool { + self.as_ref() == other.as_ref().map(AsRef::as_ref) } } diff --git a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs index 698fdea8e766e..2dc61ba2453fb 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs @@ -723,7 +723,7 @@ fn handle_hash_join( .collect(); let column_indices = build_join_column_index(plan); - let projected_indices: Vec<_> = if let Some(projection) = &plan.projection { + let projected_indices: Vec<_> = if let Some(projection) = plan.projection.as_ref() { projection.iter().map(|&i| &column_indices[i]).collect() } else { column_indices.iter().collect() diff --git a/datafusion/physical-optimizer/src/projection_pushdown.rs b/datafusion/physical-optimizer/src/projection_pushdown.rs index 281d61aecf538..a4d652a7d9af8 100644 --- a/datafusion/physical-optimizer/src/projection_pushdown.rs +++ b/datafusion/physical-optimizer/src/projection_pushdown.rs @@ -135,7 +135,7 @@ fn try_push_down_join_filter( ); let new_lhs_length = lhs_rewrite.data.0.schema().fields.len(); - let projections = match projections { + let projections = match projections.as_ref() { None => match join.join_type() { JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => { // Build projections that ignore the newly projected columns. diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index d645f5c55d434..b183d7fce0d4c 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -544,11 +544,14 @@ pub struct AggregateExec { /// Aggregation mode (full, partial) mode: AggregateMode, /// Group by expressions - group_by: PhysicalGroupBy, + /// [`Arc`] used for a cheap clone, which improves physical plan optimization performance. + group_by: Arc, /// Aggregate expressions - aggr_expr: Vec>, + /// The same reason to [`Arc`] it as for [`Self::group_by`]. + aggr_expr: Arc<[Arc]>, /// FILTER (WHERE clause) expression for each aggregate expression - filter_expr: Vec>>, + /// The same reason to [`Arc`] it as for [`Self::group_by`]. + filter_expr: Arc<[Option>]>, /// Configuration for limit-based optimizations limit_options: Option, /// Input plan, could be a partial aggregate or the input to the aggregate @@ -582,18 +585,18 @@ impl AggregateExec { /// Rewrites aggregate exec with new aggregate expressions. pub fn with_new_aggr_exprs( &self, - aggr_expr: Vec>, + aggr_expr: impl Into]>>, ) -> Self { Self { - aggr_expr, + aggr_expr: aggr_expr.into(), // clone the rest of the fields required_input_ordering: self.required_input_ordering.clone(), metrics: ExecutionPlanMetricsSet::new(), input_order_mode: self.input_order_mode.clone(), cache: self.cache.clone(), mode: self.mode, - group_by: self.group_by.clone(), - filter_expr: self.filter_expr.clone(), + group_by: Arc::clone(&self.group_by), + filter_expr: Arc::clone(&self.filter_expr), limit_options: self.limit_options, input: Arc::clone(&self.input), schema: Arc::clone(&self.schema), @@ -612,9 +615,9 @@ impl AggregateExec { input_order_mode: self.input_order_mode.clone(), cache: self.cache.clone(), mode: self.mode, - group_by: self.group_by.clone(), - aggr_expr: self.aggr_expr.clone(), - filter_expr: self.filter_expr.clone(), + group_by: Arc::clone(&self.group_by), + aggr_expr: Arc::clone(&self.aggr_expr), + filter_expr: Arc::clone(&self.filter_expr), input: Arc::clone(&self.input), schema: Arc::clone(&self.schema), input_schema: Arc::clone(&self.input_schema), @@ -629,12 +632,13 @@ impl AggregateExec { /// Create a new hash aggregate execution plan pub fn try_new( mode: AggregateMode, - group_by: PhysicalGroupBy, + group_by: impl Into>, aggr_expr: Vec>, filter_expr: Vec>>, input: Arc, input_schema: SchemaRef, ) -> Result { + let group_by = group_by.into(); let schema = create_schema(&input.schema(), &group_by, &aggr_expr, mode)?; let schema = Arc::new(schema); @@ -659,13 +663,16 @@ impl AggregateExec { /// the schema in such cases. fn try_new_with_schema( mode: AggregateMode, - group_by: PhysicalGroupBy, + group_by: impl Into>, mut aggr_expr: Vec>, - filter_expr: Vec>>, + filter_expr: impl Into>]>>, input: Arc, input_schema: SchemaRef, schema: SchemaRef, ) -> Result { + let group_by = group_by.into(); + let filter_expr = filter_expr.into(); + // Make sure arguments are consistent in size assert_eq_or_internal_err!( aggr_expr.len(), @@ -732,13 +739,13 @@ impl AggregateExec { &group_expr_mapping, &mode, &input_order_mode, - aggr_expr.as_slice(), + aggr_expr.as_ref(), )?; let mut exec = AggregateExec { mode, group_by, - aggr_expr, + aggr_expr: aggr_expr.into(), filter_expr, input, schema, @@ -1287,9 +1294,9 @@ impl ExecutionPlan for AggregateExec { ) -> Result> { let mut me = AggregateExec::try_new_with_schema( self.mode, - self.group_by.clone(), - self.aggr_expr.clone(), - self.filter_expr.clone(), + Arc::clone(&self.group_by), + self.aggr_expr.to_vec(), + Arc::clone(&self.filter_expr), Arc::clone(&children[0]), Arc::clone(&self.input_schema), Arc::clone(&self.schema), diff --git a/datafusion/physical-plan/src/aggregates/no_grouping.rs b/datafusion/physical-plan/src/aggregates/no_grouping.rs index a55d70ca6fb27..01ddd8517adab 100644 --- a/datafusion/physical-plan/src/aggregates/no_grouping.rs +++ b/datafusion/physical-plan/src/aggregates/no_grouping.rs @@ -61,7 +61,7 @@ struct AggregateStreamInner { mode: AggregateMode, input: SendableRecordBatchStream, aggregate_expressions: Vec>>, - filter_expressions: Vec>>, + filter_expressions: Arc<[Option>]>, // ==== Runtime States/Buffers ==== accumulators: Vec, @@ -276,7 +276,7 @@ impl AggregateStream { partition: usize, ) -> Result { let agg_schema = Arc::clone(&agg.schema); - let agg_filter_expr = agg.filter_expr.clone(); + let agg_filter_expr = Arc::clone(&agg.filter_expr); let baseline_metrics = BaselineMetrics::new(&agg.metrics, partition); let input = agg.input.execute(partition, Arc::clone(context))?; @@ -287,7 +287,7 @@ impl AggregateStream { | AggregateMode::Single | AggregateMode::SinglePartitioned => agg_filter_expr, AggregateMode::Final | AggregateMode::FinalPartitioned => { - vec![None; agg.aggr_expr.len()] + vec![None; agg.aggr_expr.len()].into() } }; let accumulators = create_accumulators(&agg.aggr_expr)?; diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 49ce125e739b3..e5488754a1c7a 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -377,10 +377,10 @@ pub(crate) struct GroupedHashAggregateStream { /// /// For example, for an aggregate like `SUM(x) FILTER (WHERE x >= 100)`, /// the filter expression is `x > 100`. - filter_expressions: Vec>>, + filter_expressions: Arc<[Option>]>, /// GROUP BY expressions - group_by: PhysicalGroupBy, + group_by: Arc, /// max rows in output RecordBatches batch_size: usize, @@ -465,8 +465,8 @@ impl GroupedHashAggregateStream { ) -> Result { debug!("Creating GroupedHashAggregateStream"); let agg_schema = Arc::clone(&agg.schema); - let agg_group_by = agg.group_by.clone(); - let agg_filter_expr = agg.filter_expr.clone(); + let agg_group_by = Arc::clone(&agg.group_by); + let agg_filter_expr = Arc::clone(&agg.filter_expr); let batch_size = context.session_config().batch_size(); let input = agg.input.execute(partition, Arc::clone(context))?; @@ -475,7 +475,7 @@ impl GroupedHashAggregateStream { let timer = baseline_metrics.elapsed_compute().timer(); - let aggregate_exprs = agg.aggr_expr.clone(); + let aggregate_exprs = Arc::clone(&agg.aggr_expr); // arguments for each aggregate, one vec of expressions per // aggregate @@ -496,7 +496,7 @@ impl GroupedHashAggregateStream { | AggregateMode::Single | AggregateMode::SinglePartitioned => agg_filter_expr, AggregateMode::Final | AggregateMode::FinalPartitioned => { - vec![None; agg.aggr_expr.len()] + vec![None; agg.aggr_expr.len()].into() } }; diff --git a/datafusion/physical-plan/src/aggregates/topk_stream.rs b/datafusion/physical-plan/src/aggregates/topk_stream.rs index 72c5d0c86745d..4aa566ccfcd0a 100644 --- a/datafusion/physical-plan/src/aggregates/topk_stream.rs +++ b/datafusion/physical-plan/src/aggregates/topk_stream.rs @@ -50,7 +50,7 @@ pub struct GroupedTopKAggregateStream { baseline_metrics: BaselineMetrics, group_by_metrics: GroupByMetrics, aggregate_arguments: Vec>>, - group_by: PhysicalGroupBy, + group_by: Arc, priority_map: PriorityMap, } @@ -62,7 +62,7 @@ impl GroupedTopKAggregateStream { limit: usize, ) -> Result { let agg_schema = Arc::clone(&aggr.schema); - let group_by = aggr.group_by.clone(); + let group_by = Arc::clone(&aggr.group_by); let input = aggr.input.execute(partition, Arc::clone(context))?; let baseline_metrics = BaselineMetrics::new(&aggr.metrics, partition); let group_by_metrics = GroupByMetrics::new(&aggr.metrics, partition); diff --git a/datafusion/physical-plan/src/common.rs b/datafusion/physical-plan/src/common.rs index 32dc60b56ad48..590f6f09e8b9e 100644 --- a/datafusion/physical-plan/src/common.rs +++ b/datafusion/physical-plan/src/common.rs @@ -181,7 +181,7 @@ pub fn compute_record_batch_statistics( /// Checks if the given projection is valid for the given schema. pub fn can_project( schema: &arrow::datatypes::SchemaRef, - projection: Option<&Vec>, + projection: Option<&[usize]>, ) -> Result<()> { match projection { Some(columns) => { diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 50fae84b85d0d..ea5b14f872fec 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -20,6 +20,7 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll, ready}; +use datafusion_physical_expr::projection::OptionProjectionRef; use itertools::Itertools; use super::{ @@ -85,7 +86,7 @@ pub struct FilterExec { /// Properties equivalence properties, partitioning, etc. cache: PlanProperties, /// The projection indices of the columns in the output schema of join - projection: Option>, + projection: OptionProjectionRef, /// Target batch size for output batches batch_size: usize, /// Number of rows to fetch @@ -96,7 +97,7 @@ pub struct FilterExec { pub struct FilterExecBuilder { predicate: Arc, input: Arc, - projection: Option>, + projection: OptionProjectionRef, default_selectivity: u8, batch_size: usize, fetch: Option, @@ -108,7 +109,7 @@ impl FilterExecBuilder { Self { predicate, input, - projection: None, + projection: None.into(), default_selectivity: FILTER_EXEC_DEFAULT_SELECTIVITY, batch_size: FILTER_EXEC_DEFAULT_BATCH_SIZE, fetch: None, @@ -136,18 +137,14 @@ impl FilterExecBuilder { /// /// If no projection is currently set, the new projection is used directly. /// If `None` is passed, the projection is cleared. - pub fn apply_projection(mut self, projection: Option>) -> Result { + pub fn apply_projection( + mut self, + projection: impl Into, + ) -> Result { // Check if the projection is valid against current output schema + let projection = projection.into(); can_project(&self.input.schema(), projection.as_ref())?; - self.projection = match projection { - Some(new_proj) => match &self.projection { - Some(existing_proj) => { - Some(new_proj.iter().map(|i| existing_proj[*i]).collect()) - } - None => Some(new_proj), - }, - None => None, - }; + self.projection = projection.apply_projection(&self.projection)?; Ok(self) } @@ -189,9 +186,7 @@ impl FilterExecBuilder { } // Validate projection if provided - if let Some(ref proj) = self.projection { - can_project(&self.input.schema(), Some(proj))?; - } + can_project(&self.input.schema(), self.projection.as_ref())?; // Compute properties once with all parameters let cache = FilterExec::compute_properties( @@ -302,8 +297,8 @@ impl FilterExec { } /// Projection - pub fn projection(&self) -> Option<&Vec> { - self.projection.as_ref() + pub fn projection(&self) -> &OptionProjectionRef { + &self.projection } /// Calculates `Statistics` for `FilterExec`, by applying selectivity (either default, or estimated) to input statistics. @@ -380,7 +375,7 @@ impl FilterExec { input: &Arc, predicate: &Arc, default_selectivity: u8, - projection: Option<&Vec>, + projection: Option<&[usize]>, ) -> Result { // Combine the equal predicates with the input equivalence properties // to construct the equivalence properties: @@ -554,7 +549,7 @@ impl ExecutionPlan for FilterExec { self.predicate(), self.default_selectivity, )?; - Ok(stats.project(self.projection.as_ref())) + Ok(self.projection.project_statistics(stats)) } fn cardinality_effect(&self) -> CardinalityEffect { @@ -648,7 +643,7 @@ impl ExecutionPlan for FilterExec { let new_predicate = conjunction(unhandled_filters); let updated_node = if new_predicate.eq(&lit(true)) { // FilterExec is no longer needed, but we may need to leave a projection in place - match self.projection() { + match self.projection().as_ref() { Some(projection_indices) => { let filter_child_schema = filter_input.schema(); let proj_exprs = projection_indices @@ -686,7 +681,7 @@ impl ExecutionPlan for FilterExec { self.default_selectivity, self.projection.as_ref(), )?, - projection: None, + projection: self.projection.clone(), batch_size: self.batch_size, fetch: self.fetch, }; @@ -796,7 +791,7 @@ struct FilterExecStream { /// Runtime metrics recording metrics: FilterExecMetrics, /// The projection indices of the columns in the input schema - projection: Option>, + projection: OptionProjectionRef, /// Batch coalescer to combine small batches batch_coalescer: LimitedBatchCoalescer, } @@ -887,8 +882,8 @@ impl Stream for FilterExecStream { .evaluate(&batch) .and_then(|v| v.into_array(batch.num_rows())) .and_then(|array| { - Ok(match self.projection { - Some(ref projection) => { + Ok(match self.projection.as_ref() { + Some(projection) => { let projected_batch = batch.project(projection)?; (array, projected_batch) }, @@ -1739,7 +1734,7 @@ mod tests { let filter = FilterExecBuilder::new(predicate, input).build()?; // Verify no projection is set - assert_eq!(filter.projection(), None); + assert!(filter.projection().is_none()); // Verify schema contains all columns let output_schema = filter.schema(); @@ -1953,7 +1948,7 @@ mod tests { .build()?; // Verify composed projection is [0, 3] - assert_eq!(filter.projection(), Some(&vec![0, 3])); + assert_eq!(filter.projection(), Some(&[0, 3])); // Verify schema contains only columns a and d let output_schema = filter.schema(); @@ -1987,7 +1982,7 @@ mod tests { .build()?; // Projection should be cleared - assert_eq!(filter.projection(), None); + assert_eq!(filter.projection(), None::<&[usize]>); // Schema should have all columns let output_schema = filter.schema(); diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 131b07461ebe5..96db8841082c4 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -81,6 +81,7 @@ use datafusion_physical_expr::equivalence::{ ProjectionMapping, join_equivalence_properties, }; use datafusion_physical_expr::expressions::{DynamicFilterPhysicalExpr, lit}; +use datafusion_physical_expr::projection::OptionProjectionRef; use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef}; use ahash::RandomState; @@ -466,7 +467,7 @@ pub struct HashJoinExec { /// Execution metrics metrics: ExecutionPlanMetricsSet, /// The projection indices of the columns in the output schema of join - pub projection: Option>, + pub projection: OptionProjectionRef, /// Information of index and left / right placement of columns column_indices: Vec, /// The equality null-handling behavior of the join algorithm. @@ -530,7 +531,7 @@ impl HashJoinExec { on: JoinOn, filter: Option, join_type: &JoinType, - projection: Option>, + projection: impl Into, partition_mode: PartitionMode, null_equality: NullEquality, null_aware: bool, @@ -566,6 +567,7 @@ impl HashJoinExec { let join_schema = Arc::new(join_schema); // check if the projection is valid + let projection = projection.into(); can_project(&join_schema, projection.as_ref())?; let cache = Self::compute_properties( @@ -686,16 +688,14 @@ impl HashJoinExec { } /// Return new instance of [HashJoinExec] with the given projection. - pub fn with_projection(&self, projection: Option>) -> Result { + pub fn with_projection( + &self, + projection: impl Into, + ) -> Result { + let projection = projection.into(); // check if the projection is valid can_project(&self.schema(), projection.as_ref())?; - let projection = match projection { - Some(projection) => match &self.projection { - Some(p) => Some(projection.iter().map(|i| p[*i]).collect()), - None => Some(projection), - }, - None => None, - }; + let projection = projection.apply_projection(&self.projection)?; Self::try_new( Arc::clone(&self.left), Arc::clone(&self.right), @@ -717,7 +717,7 @@ impl HashJoinExec { join_type: JoinType, on: JoinOnRef, mode: PartitionMode, - projection: Option<&Vec>, + projection: Option<&[usize]>, ) -> Result { // Calculate equivalence properties: let mut eq_properties = join_equivalence_properties( @@ -1181,7 +1181,7 @@ impl ExecutionPlan for HashJoinExec { let right_stream = self.right.execute(partition, context)?; // update column indices to reflect the projection - let column_indices_after_projection = match &self.projection { + let column_indices_after_projection = match self.projection.as_ref() { Some(projection) => projection .iter() .map(|i| self.column_indices[*i].clone()) @@ -1240,7 +1240,7 @@ impl ExecutionPlan for HashJoinExec { &self.join_schema, )?; // Project statistics if there is a projection - Ok(stats.project(self.projection.as_ref())) + Ok(self.projection.project_statistics(stats)) } /// Tries to push `projection` down through `hash_join`. If possible, performs the diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index b57f9132253bf..cd4dbcbadc588 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -71,6 +71,7 @@ use datafusion_physical_expr::equivalence::{ ProjectionMapping, join_equivalence_properties, }; +use datafusion_physical_expr::projection::OptionProjectionRef; use futures::{Stream, StreamExt, TryStreamExt}; use log::debug; use parking_lot::Mutex; @@ -192,7 +193,7 @@ pub struct NestedLoopJoinExec { /// Information of index and left / right placement of columns column_indices: Vec, /// Projection to apply to the output of the join - projection: Option>, + projection: OptionProjectionRef, /// Execution metrics metrics: ExecutionPlanMetricsSet, @@ -207,8 +208,9 @@ impl NestedLoopJoinExec { right: Arc, filter: Option, join_type: &JoinType, - projection: Option>, + projection: impl Into, ) -> Result { + let projection = projection.into(); let left_schema = left.schema(); let right_schema = right.schema(); check_join_is_valid(&left_schema, &right_schema, &[])?; @@ -257,8 +259,8 @@ impl NestedLoopJoinExec { &self.join_type } - pub fn projection(&self) -> Option<&Vec> { - self.projection.as_ref() + pub fn projection(&self) -> &OptionProjectionRef { + &self.projection } /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. @@ -267,7 +269,7 @@ impl NestedLoopJoinExec { right: &Arc, schema: &SchemaRef, join_type: JoinType, - projection: Option<&Vec>, + projection: Option<&[usize]>, ) -> Result { // Calculate equivalence properties: let mut eq_properties = join_equivalence_properties( @@ -333,16 +335,14 @@ impl NestedLoopJoinExec { self.projection.is_some() } - pub fn with_projection(&self, projection: Option>) -> Result { + pub fn with_projection( + &self, + projection: impl Into, + ) -> Result { + let projection = projection.into(); // check if the projection is valid can_project(&self.schema(), projection.as_ref())?; - let projection = match projection { - Some(projection) => match &self.projection { - Some(p) => Some(projection.iter().map(|i| p[*i]).collect()), - None => Some(projection), - }, - None => None, - }; + let projection = projection.apply_projection(&self.projection)?; Self::try_new( Arc::clone(&self.left), Arc::clone(&self.right), @@ -521,7 +521,7 @@ impl ExecutionPlan for NestedLoopJoinExec { let probe_side_data = self.right.execute(partition, context)?; // update column indices to reflect the projection - let column_indices_after_projection = match &self.projection { + let column_indices_after_projection = match self.projection.as_ref() { Some(projection) => projection .iter() .map(|i| self.column_indices[*i].clone()) @@ -577,7 +577,7 @@ impl ExecutionPlan for NestedLoopJoinExec { &self.join_schema, )?; - Ok(stats.project(self.projection.as_ref())) + Ok(self.projection.project_statistics(stats)) } /// Tries to push `projection` down through `nested_loop_join`. If possible, performs the diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index a9243fe04e28d..e709703e07d45 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -1674,7 +1674,7 @@ fn swap_reverting_projection( pub fn swap_join_projection( left_schema_len: usize, right_schema_len: usize, - projection: Option<&Vec>, + projection: Option<&[usize]>, join_type: &JoinType, ) -> Option> { match join_type { @@ -1685,7 +1685,7 @@ pub fn swap_join_projection( | JoinType::RightAnti | JoinType::RightSemi | JoinType::LeftMark - | JoinType::RightMark => projection.cloned(), + | JoinType::RightMark => projection.map(|p| p.to_vec()), _ => projection.map(|p| { p.iter() .map(|i| { diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index 8d4c775f87348..b10e54ce7f4ed 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -137,13 +137,19 @@ impl ProjectionExec { E: Into, { let input_schema = input.schema(); - // convert argument to Vec - let expr_vec = expr.into_iter().map(Into::into).collect::>(); - let projection = ProjectionExprs::new(expr_vec); + let expr_arc = expr.into_iter().map(Into::into).collect::>(); + let projection = ProjectionExprs::from_expressions(expr_arc); let projector = projection.make_projector(&input_schema)?; + Self::try_from_projector(projector, input) + } + fn try_from_projector( + projector: Projector, + input: Arc, + ) -> Result { // Construct a map from the input expressions to the output expression of the Projection - let projection_mapping = projection.projection_mapping(&input_schema)?; + let projection_mapping = + projector.projection().projection_mapping(&input.schema())?; let cache = Self::compute_properties( &input, &projection_mapping, @@ -301,8 +307,8 @@ impl ExecutionPlan for ProjectionExec { self: Arc, mut children: Vec>, ) -> Result> { - ProjectionExec::try_new( - self.projector.projection().clone(), + ProjectionExec::try_from_projector( + self.projector.clone(), children.swap_remove(0), ) .map(|p| Arc::new(p) as _) diff --git a/datafusion/physical-plan/src/test.rs b/datafusion/physical-plan/src/test.rs index c6d0940c35480..9b129f8020b75 100644 --- a/datafusion/physical-plan/src/test.rs +++ b/datafusion/physical-plan/src/test.rs @@ -235,7 +235,7 @@ impl TestMemoryExec { schema: SchemaRef, projection: Option>, ) -> Result { - let projected_schema = project_schema(&schema, projection.as_ref())?; + let projected_schema = project_schema(&schema, projection.as_deref())?; Ok(Self { partitions: partitions.to_vec(), schema, diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 54892597b5a34..5fb7e28c22c19 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -2957,7 +2957,7 @@ impl protobuf::PhysicalPlanNode { right: Some(Box::new(right)), join_type: join_type.into(), filter, - projection: exec.projection().map_or_else(Vec::new, |v| { + projection: exec.projection().as_ref().map_or_else(Vec::new, |v| { v.iter().map(|x| *x as u32).collect::>() }), }, diff --git a/docs/source/library-user-guide/custom-table-providers.md b/docs/source/library-user-guide/custom-table-providers.md index 8e1dee9e843ac..983e7ab0edd0f 100644 --- a/docs/source/library-user-guide/custom-table-providers.md +++ b/docs/source/library-user-guide/custom-table-providers.md @@ -291,7 +291,7 @@ impl CustomExec { schema: SchemaRef, db: CustomDataSource, ) -> Self { - let projected_schema = project_schema(&schema, projections).unwrap(); + let projected_schema = project_schema(&schema, projections.map(|v| v.as_ref())).unwrap(); Self { db, projected_schema, @@ -483,7 +483,7 @@ This will allow you to use the custom table provider in DataFusion. For example, # schema: SchemaRef, # db: CustomDataSource, # ) -> Self { -# let projected_schema = project_schema(&schema, projections).unwrap(); +# let projected_schema = project_schema(&schema, projections.map(|v| v.as_ref())).unwrap(); # Self { # db, # projected_schema, diff --git a/docs/source/library-user-guide/upgrading.md b/docs/source/library-user-guide/upgrading.md index 916ff4a82b2ef..b9a657e2c4094 100644 --- a/docs/source/library-user-guide/upgrading.md +++ b/docs/source/library-user-guide/upgrading.md @@ -23,6 +23,19 @@ **Note:** DataFusion `53.0.0` has not been released yet. The information provided in this section pertains to features and changes that have already been merged to the main branch and are awaiting release in this version. +### Schema, statistics project fn take an option slice instead of Vec ref + +`project_schema` and `Statistics::project` now take `Option<&[usize]>` instead of `Option<&Vec>`. + +To convert `Option<&Vec>` into `Option<&[usize]>` you can use `map(|v| v.as_ref())` call, +for example: + +```diff +- let projected_schema = project_schema(&schema, projections)?; ++ let projected_schema = ++ project_schema(&schema, projections.map(|v| v.as_ref()))?; +``` + ### `SimplifyInfo` trait removed, `SimplifyContext` now uses builder-style API The `SimplifyInfo` trait has been removed and replaced with the concrete `SimplifyContext` struct. This simplifies the expression simplification API and removes the need for trait objects.