diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 131b07461ebe5..d581ee86f9097 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -21,7 +21,6 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{Arc, OnceLock}; use std::{any::Any, vec}; -use crate::ExecutionPlanProperties; use crate::execution_plan::{EmissionType, boundedness_from_children}; use crate::filter_pushdown::{ ChildPushdownResult, FilterDescription, FilterPushdownPhase, @@ -60,6 +59,7 @@ use crate::{ }, metrics::{ExecutionPlanMetricsSet, MetricsSet}, }; +use crate::{ExecutionPlanProperties, parallel_concat}; use arrow::array::{ArrayRef, BooleanBufferBuilder}; use arrow::compute::concat_batches; @@ -98,7 +98,7 @@ pub(crate) const HASH_JOIN_SEED: SeededRandomState = const ARRAY_MAP_CREATED_COUNT_METRIC_NAME: &str = "array_map_created_count"; #[expect(clippy::too_many_arguments)] -fn try_create_array_map( +async fn try_create_array_map( bounds: &Option, schema: &SchemaRef, batches: &[RecordBatch], @@ -107,6 +107,7 @@ fn try_create_array_map( perfect_hash_join_small_build_threshold: usize, perfect_hash_join_min_key_density: f64, null_equality: NullEquality, + parallel_concat: bool, ) -> Result)>> { if on_left.len() != 1 { return Ok(None); @@ -172,7 +173,15 @@ fn try_create_array_map( let mem_size = ArrayMap::estimate_memory_size(min_val, max_val, num_row); reservation.try_grow(mem_size)?; - let batch = concat_batches(schema, batches)?; + let batch = if parallel_concat { + crate::parallel_concat::parallel_concat_batches( + schema, + &batches.iter().collect::>(), + ) + .await? + } else { + concat_batches(schema, batches.iter())? + }; let left_values = evaluate_expressions_to_arrays(on_left, &batch)?; let array_map = ArrayMap::try_new(&left_values[0], min_val, max_val)?; @@ -1116,6 +1125,7 @@ impl ExecutionPlan for HashJoinExec { Arc::clone(context.session_config().options()), self.null_equality, array_map_created_count, + true, )) })?, PartitionMode::Partitioned => { @@ -1137,6 +1147,7 @@ impl ExecutionPlan for HashJoinExec { Arc::clone(context.session_config().options()), self.null_equality, array_map_created_count, + false, )) } PartitionMode::Auto => { @@ -1555,6 +1566,7 @@ async fn collect_left_input( config: Arc, null_equality: NullEquality, array_map_created_count: Count, + parallel_concat: bool, ) -> Result { let schema = left_stream.schema(); @@ -1625,7 +1637,10 @@ async fn collect_left_input( config.execution.perfect_hash_join_small_build_threshold, config.execution.perfect_hash_join_min_key_density, null_equality, - )? { + parallel_concat, + ) + .await? + { array_map_created_count.add(1); metrics.build_mem_used.add(array_map.size()); @@ -1676,7 +1691,17 @@ async fn collect_left_input( } // Merge all batches into a single batch, so we can directly index into the arrays - let batch = concat_batches(&schema, batches_iter.clone())?; + let batch = if parallel_concat { + let batches_to_concat: Vec<&RecordBatch> = batches_iter.clone().collect(); + let batch = crate::parallel_concat::parallel_concat_batches( + &schema, + &batches_to_concat, + ) + .await?; + batch + } else { + concat_batches(&schema, batches_iter)? + }; let left_values = evaluate_expressions_to_arrays(&on_left, &batch)?; diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index ec8e154caec91..55f948bf2b3ef 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -80,6 +80,7 @@ pub mod joins; pub mod limit; pub mod memory; pub mod metrics; +pub mod parallel_concat; pub mod placeholder_row; pub mod projection; pub mod recursive_query; diff --git a/datafusion/physical-plan/src/parallel_concat.rs b/datafusion/physical-plan/src/parallel_concat.rs new file mode 100644 index 0000000000000..c5f8a49ddc03a --- /dev/null +++ b/datafusion/physical-plan/src/parallel_concat.rs @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Defines a parallel version of `concat_batches`. + +use std::sync::Arc; + +use arrow::array::{Array, ArrayRef}; +use arrow::datatypes::SchemaRef; +use arrow::record_batch::RecordBatch; +use datafusion_common::Result; +use futures::future::try_join_all; + +/// Concatenates `RecordBatch`es by concatenating each column in parallel. +pub async fn parallel_concat_batches<'a>( + schema: &SchemaRef, + batches: &[&'a RecordBatch], +) -> Result { + if batches.is_empty() { + return Ok(RecordBatch::new_empty(Arc::clone(schema))); + } + + let num_columns = schema.fields().len(); + let mut tasks = Vec::with_capacity(num_columns); + + for i in 0..num_columns { + let column_arrays: Vec = batches + .iter() + .map(|batch| batch.column(i).clone()) + .collect(); + + let task = tokio::spawn(async move { + let arrays_to_concat: Vec<&dyn Array> = + column_arrays.iter().map(|a| a.as_ref()).collect(); + arrow::compute::concat(&arrays_to_concat) + }); + tasks.push(task); + } + + let task_outputs = try_join_all(tasks).await.map_err(|e| { + datafusion_common::DataFusionError::Execution(format!( + "Tokio join error during parallel concatenation: {e}" + )) + })?; + + let columns = task_outputs + .into_iter() + .collect::, arrow::error::ArrowError>>()?; + + RecordBatch::try_new(Arc::clone(schema), columns).map_err(Into::into) +}