-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Wrap immutable plan parts into Arc #19893
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -125,7 +125,7 @@ impl From<ProjectionExpr> for (Arc<dyn PhysicalExpr>, String) { | |
| /// indices. | ||
| #[derive(Debug, Clone, PartialEq, Eq)] | ||
| pub struct ProjectionExprs { | ||
| exprs: Vec<ProjectionExpr>, | ||
| exprs: Arc<[ProjectionExpr]>, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think it is worth adding a comment here explaining the rationale for using `Arc<[...]>`, namely that it makes this structure inexpensive to copy, as happens during physical planning. |
||
| } | ||
|
|
||
| impl std::fmt::Display for ProjectionExprs { | ||
|
|
@@ -137,22 +137,24 @@ impl std::fmt::Display for ProjectionExprs { | |
|
|
||
| impl From<Vec<ProjectionExpr>> for ProjectionExprs { | ||
| fn from(value: Vec<ProjectionExpr>) -> Self { | ||
| Self { exprs: value } | ||
| Self { | ||
| exprs: value.into(), | ||
| } | ||
| } | ||
| } | ||
|
|
||
| impl From<&[ProjectionExpr]> for ProjectionExprs { | ||
| fn from(value: &[ProjectionExpr]) -> Self { | ||
| Self { | ||
| exprs: value.to_vec(), | ||
| exprs: value.iter().cloned().collect(), | ||
| } | ||
| } | ||
| } | ||
|
|
||
| impl FromIterator<ProjectionExpr> for ProjectionExprs { | ||
| fn from_iter<T: IntoIterator<Item = ProjectionExpr>>(exprs: T) -> Self { | ||
| Self { | ||
| exprs: exprs.into_iter().collect::<Vec<_>>(), | ||
| exprs: exprs.into_iter().collect(), | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -164,12 +166,17 @@ impl AsRef<[ProjectionExpr]> for ProjectionExprs { | |
| } | ||
|
|
||
| impl ProjectionExprs { | ||
| pub fn new<I>(exprs: I) -> Self | ||
| where | ||
| I: IntoIterator<Item = ProjectionExpr>, | ||
| { | ||
| /// Make a new [`ProjectionExprs`] from expressions iterator. | ||
| pub fn new(exprs: impl IntoIterator<Item = ProjectionExpr>) -> Self { | ||
| Self { | ||
| exprs: exprs.into_iter().collect(), | ||
| } | ||
| } | ||
|
|
||
| /// Make a new [`ProjectionExprs`] from expressions. | ||
| pub fn from_expressions(exprs: impl Into<Arc<[ProjectionExpr]>>) -> Self { | ||
| Self { | ||
| exprs: exprs.into_iter().collect::<Vec<_>>(), | ||
| exprs: exprs.into(), | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -285,13 +292,14 @@ impl ProjectionExprs { | |
| { | ||
| let exprs = self | ||
| .exprs | ||
| .into_iter() | ||
| .iter() | ||
| .cloned() | ||
| .map(|mut proj| { | ||
| proj.expr = f(proj.expr)?; | ||
| Ok(proj) | ||
| }) | ||
| .collect::<Result<Vec<_>>>()?; | ||
| Ok(Self::new(exprs)) | ||
| .collect::<Result<Arc<_>>>()?; | ||
| Ok(Self::from_expressions(exprs)) | ||
| } | ||
|
|
||
| /// Apply another projection on top of this projection, returning the combined projection. | ||
|
|
@@ -361,7 +369,7 @@ impl ProjectionExprs { | |
| /// applied on top of this projection. | ||
| pub fn try_merge(&self, other: &ProjectionExprs) -> Result<ProjectionExprs> { | ||
| let mut new_exprs = Vec::with_capacity(other.exprs.len()); | ||
| for proj_expr in &other.exprs { | ||
| for proj_expr in other.exprs.iter() { | ||
| let new_expr = update_expr(&proj_expr.expr, &self.exprs, true)? | ||
| .ok_or_else(|| { | ||
| internal_datafusion_err!( | ||
|
|
@@ -607,7 +615,7 @@ impl ProjectionExprs { | |
| ) -> Result<datafusion_common::Statistics> { | ||
| let mut column_statistics = vec![]; | ||
|
|
||
| for proj_expr in &self.exprs { | ||
| for proj_expr in self.exprs.iter() { | ||
| let expr = &proj_expr.expr; | ||
| let col_stats = if let Some(col) = expr.as_any().downcast_ref::<Column>() { | ||
| std::mem::take(&mut stats.column_statistics[col.index()]) | ||
|
|
@@ -754,15 +762,6 @@ impl Projector { | |
| } | ||
| } | ||
|
|
||
| impl IntoIterator for ProjectionExprs { | ||
| type Item = ProjectionExpr; | ||
| type IntoIter = std::vec::IntoIter<ProjectionExpr>; | ||
|
|
||
| fn into_iter(self) -> Self::IntoIter { | ||
| self.exprs.into_iter() | ||
| } | ||
| } | ||
|
|
||
| /// The function operates in two modes: | ||
| /// | ||
| /// 1) When `sync_with_child` is `true`: | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -544,11 +544,11 @@ pub struct AggregateExec { | |
| /// Aggregation mode (full, partial) | ||
| mode: AggregateMode, | ||
| /// Group by expressions | ||
| group_by: PhysicalGroupBy, | ||
| group_by: Arc<PhysicalGroupBy>, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Likewise here, I think it would be good to note that these fields are wrapped in `Arc` to make cloning and plan rewriting faster. |
||
| /// Aggregate expressions | ||
| aggr_expr: Vec<Arc<AggregateFunctionExpr>>, | ||
| aggr_expr: Arc<[Arc<AggregateFunctionExpr>]>, | ||
| /// FILTER (WHERE clause) expression for each aggregate expression | ||
| filter_expr: Vec<Option<Arc<dyn PhysicalExpr>>>, | ||
| filter_expr: Arc<[Option<Arc<dyn PhysicalExpr>>]>, | ||
| /// Configuration for limit-based optimizations | ||
| limit_options: Option<LimitOptions>, | ||
| /// Input plan, could be a partial aggregate or the input to the aggregate | ||
|
|
@@ -582,18 +582,18 @@ impl AggregateExec { | |
| /// Rewrites aggregate exec with new aggregate expressions. | ||
| pub fn with_new_aggr_exprs( | ||
| &self, | ||
| aggr_expr: Vec<Arc<AggregateFunctionExpr>>, | ||
| aggr_expr: impl Into<Arc<[Arc<AggregateFunctionExpr>]>>, | ||
| ) -> Self { | ||
| Self { | ||
| aggr_expr, | ||
| aggr_expr: aggr_expr.into(), | ||
| // clone the rest of the fields | ||
| required_input_ordering: self.required_input_ordering.clone(), | ||
| metrics: ExecutionPlanMetricsSet::new(), | ||
| input_order_mode: self.input_order_mode.clone(), | ||
| cache: self.cache.clone(), | ||
| mode: self.mode, | ||
| group_by: self.group_by.clone(), | ||
| filter_expr: self.filter_expr.clone(), | ||
| group_by: Arc::clone(&self.group_by), | ||
| filter_expr: Arc::clone(&self.filter_expr), | ||
| limit_options: self.limit_options, | ||
| input: Arc::clone(&self.input), | ||
| schema: Arc::clone(&self.schema), | ||
|
|
@@ -612,9 +612,9 @@ impl AggregateExec { | |
| input_order_mode: self.input_order_mode.clone(), | ||
| cache: self.cache.clone(), | ||
| mode: self.mode, | ||
| group_by: self.group_by.clone(), | ||
| aggr_expr: self.aggr_expr.clone(), | ||
| filter_expr: self.filter_expr.clone(), | ||
| group_by: Arc::clone(&self.group_by), | ||
| aggr_expr: Arc::clone(&self.aggr_expr), | ||
| filter_expr: Arc::clone(&self.filter_expr), | ||
| input: Arc::clone(&self.input), | ||
| schema: Arc::clone(&self.schema), | ||
| input_schema: Arc::clone(&self.input_schema), | ||
|
|
@@ -629,12 +629,13 @@ impl AggregateExec { | |
| /// Create a new hash aggregate execution plan | ||
| pub fn try_new( | ||
| mode: AggregateMode, | ||
| group_by: PhysicalGroupBy, | ||
| group_by: impl Into<Arc<PhysicalGroupBy>>, | ||
| aggr_expr: Vec<Arc<AggregateFunctionExpr>>, | ||
| filter_expr: Vec<Option<Arc<dyn PhysicalExpr>>>, | ||
| input: Arc<dyn ExecutionPlan>, | ||
| input_schema: SchemaRef, | ||
| ) -> Result<Self> { | ||
| let group_by = group_by.into(); | ||
| let schema = create_schema(&input.schema(), &group_by, &aggr_expr, mode)?; | ||
|
|
||
| let schema = Arc::new(schema); | ||
|
|
@@ -659,13 +660,16 @@ impl AggregateExec { | |
| /// the schema in such cases. | ||
| fn try_new_with_schema( | ||
| mode: AggregateMode, | ||
| group_by: PhysicalGroupBy, | ||
| group_by: impl Into<Arc<PhysicalGroupBy>>, | ||
| mut aggr_expr: Vec<Arc<AggregateFunctionExpr>>, | ||
| filter_expr: Vec<Option<Arc<dyn PhysicalExpr>>>, | ||
| filter_expr: impl Into<Arc<[Option<Arc<dyn PhysicalExpr>>]>>, | ||
| input: Arc<dyn ExecutionPlan>, | ||
| input_schema: SchemaRef, | ||
| schema: SchemaRef, | ||
| ) -> Result<Self> { | ||
| let group_by = group_by.into(); | ||
| let filter_expr = filter_expr.into(); | ||
|
|
||
| // Make sure arguments are consistent in size | ||
| assert_eq_or_internal_err!( | ||
| aggr_expr.len(), | ||
|
|
@@ -732,13 +736,13 @@ impl AggregateExec { | |
| &group_expr_mapping, | ||
| &mode, | ||
| &input_order_mode, | ||
| aggr_expr.as_slice(), | ||
| aggr_expr.as_ref(), | ||
| )?; | ||
|
|
||
| let mut exec = AggregateExec { | ||
| mode, | ||
| group_by, | ||
| aggr_expr, | ||
| aggr_expr: aggr_expr.into(), | ||
| filter_expr, | ||
| input, | ||
| schema, | ||
|
|
@@ -1287,9 +1291,9 @@ impl ExecutionPlan for AggregateExec { | |
| ) -> Result<Arc<dyn ExecutionPlan>> { | ||
| let mut me = AggregateExec::try_new_with_schema( | ||
| self.mode, | ||
| self.group_by.clone(), | ||
| self.aggr_expr.clone(), | ||
| self.filter_expr.clone(), | ||
| Arc::clone(&self.group_by), | ||
| self.aggr_expr.to_vec(), | ||
| Arc::clone(&self.filter_expr), | ||
| Arc::clone(&children[0]), | ||
| Arc::clone(&self.input_schema), | ||
| Arc::clone(&self.schema), | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Per @crepererum's suggestion, I tried out what it would look like and came up with
It does seem to be reasonable
The original signature is
`Option<&Vec<..>>`, I think to align with `TableProvider::scan` (which also shouldn't have an owned `Vec`, but that I think is a historical accident).