35 changes: 30 additions & 5 deletions datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -21,7 +21,6 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use std::sync::{Arc, OnceLock};
 use std::{any::Any, vec};
 
-use crate::ExecutionPlanProperties;
 use crate::execution_plan::{EmissionType, boundedness_from_children};
 use crate::filter_pushdown::{
     ChildPushdownResult, FilterDescription, FilterPushdownPhase,
@@ -60,6 +59,7 @@ use crate::{
     },
     metrics::{ExecutionPlanMetricsSet, MetricsSet},
 };
+use crate::{ExecutionPlanProperties, parallel_concat};
 
 use arrow::array::{ArrayRef, BooleanBufferBuilder};
 use arrow::compute::concat_batches;
@@ -98,7 +98,7 @@ pub(crate) const HASH_JOIN_SEED: SeededRandomState =
 const ARRAY_MAP_CREATED_COUNT_METRIC_NAME: &str = "array_map_created_count";
 
 #[expect(clippy::too_many_arguments)]
-fn try_create_array_map(
+async fn try_create_array_map(
     bounds: &Option<PartitionBounds>,
     schema: &SchemaRef,
     batches: &[RecordBatch],
@@ -107,6 +107,7 @@ fn try_create_array_map(
     perfect_hash_join_small_build_threshold: usize,
     perfect_hash_join_min_key_density: f64,
     null_equality: NullEquality,
+    parallel_concat: bool,
 ) -> Result<Option<(ArrayMap, RecordBatch, Vec<ArrayRef>)>> {
     if on_left.len() != 1 {
         return Ok(None);
@@ -172,7 +173,15 @@ fn try_create_array_map(
     let mem_size = ArrayMap::estimate_memory_size(min_val, max_val, num_row);
     reservation.try_grow(mem_size)?;
 
-    let batch = concat_batches(schema, batches)?;
+    let batch = if parallel_concat {
+        crate::parallel_concat::parallel_concat_batches(
+            schema,
+            &batches.iter().collect::<Vec<_>>(),
+        )
+        .await?
+    } else {
+        concat_batches(schema, batches.iter())?
+    };
     let left_values = evaluate_expressions_to_arrays(on_left, &batch)?;
 
     let array_map = ArrayMap::try_new(&left_values[0], min_val, max_val)?;
@@ -1116,6 +1125,7 @@ impl ExecutionPlan for HashJoinExec {
                     Arc::clone(context.session_config().options()),
                     self.null_equality,
                     array_map_created_count,
+                    true,
                 ))
             })?,
             PartitionMode::Partitioned => {
@@ -1137,6 +1147,7 @@ impl ExecutionPlan for HashJoinExec {
                     Arc::clone(context.session_config().options()),
                     self.null_equality,
                     array_map_created_count,
+                    false,
                 ))
             }
             PartitionMode::Auto => {
@@ -1555,6 +1566,7 @@ async fn collect_left_input(
     config: Arc<ConfigOptions>,
     null_equality: NullEquality,
     array_map_created_count: Count,
+    parallel_concat: bool,
 ) -> Result<JoinLeftData> {
     let schema = left_stream.schema();
 
@@ -1625,7 +1637,10 @@ async fn collect_left_input(
         config.execution.perfect_hash_join_small_build_threshold,
         config.execution.perfect_hash_join_min_key_density,
         null_equality,
-    )? {
+        parallel_concat,
+    )
+    .await?
+    {
         array_map_created_count.add(1);
         metrics.build_mem_used.add(array_map.size());
 
@@ -1676,7 +1691,17 @@ async fn collect_left_input(
     }
 
     // Merge all batches into a single batch, so we can directly index into the arrays
-    let batch = concat_batches(&schema, batches_iter.clone())?;
+    let batch = if parallel_concat {
+        let batches_to_concat: Vec<&RecordBatch> = batches_iter.clone().collect();
+        let batch = crate::parallel_concat::parallel_concat_batches(
+            &schema,
+            &batches_to_concat,
+        )
+        .await?;
+        batch
+    } else {
+        concat_batches(&schema, batches_iter)?
+    };
 
     let left_values = evaluate_expressions_to_arrays(&on_left, &batch)?;
 
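For orientation, the dispatch above boils down to the standalone sketch below. This is my own illustration, not code from the PR, and `concat_build_side` is a hypothetical name: `CollectLeft` passes `true` (a single task builds the entire build side, so splitting the concat across columns can use otherwise idle cores), while `Partitioned` passes `false` (one build per partition presumably already saturates the runtime, so extra spawning would only add overhead).

```rust
// Sketch only (assumed helper name, mirrors the diff above): choose the
// column-parallel concat when one task owns the whole build side, otherwise
// fall back to arrow's sequential concat_batches.
use arrow::compute::concat_batches;
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_physical_plan::parallel_concat::parallel_concat_batches;

async fn concat_build_side(
    schema: &SchemaRef,
    batches: &[RecordBatch],
    parallel: bool, // true for CollectLeft, false for Partitioned
) -> Result<RecordBatch> {
    if parallel {
        // parallel_concat_batches takes a slice of references
        let refs: Vec<&RecordBatch> = batches.iter().collect();
        parallel_concat_batches(schema, &refs).await
    } else {
        // ArrowError converts into DataFusionError via `?`
        Ok(concat_batches(schema, batches)?)
    }
}
```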
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/lib.rs
@@ -80,6 +80,7 @@ pub mod joins;
 pub mod limit;
 pub mod memory;
 pub mod metrics;
+pub mod parallel_concat;
 pub mod placeholder_row;
 pub mod projection;
 pub mod recursive_query;
65 changes: 65 additions & 0 deletions datafusion/physical-plan/src/parallel_concat.rs
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines a parallel version of `concat_batches`.
+
+use std::sync::Arc;
+
+use arrow::array::{Array, ArrayRef};
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use datafusion_common::Result;
+use futures::future::try_join_all;
+
+/// Concatenates `RecordBatch`es by concatenating each column in parallel.
+pub async fn parallel_concat_batches<'a>(
+    schema: &SchemaRef,
+    batches: &[&'a RecordBatch],
+) -> Result<RecordBatch> {
+    if batches.is_empty() {
+        return Ok(RecordBatch::new_empty(Arc::clone(schema)));
+    }
+
+    let num_columns = schema.fields().len();
+    let mut tasks = Vec::with_capacity(num_columns);
+
+    for i in 0..num_columns {
+        let column_arrays: Vec<ArrayRef> = batches
+            .iter()
+            .map(|batch| batch.column(i).clone())
+            .collect();
+
+        let task = tokio::spawn(async move {
+            let arrays_to_concat: Vec<&dyn Array> =
+                column_arrays.iter().map(|a| a.as_ref()).collect();
+            arrow::compute::concat(&arrays_to_concat)
+        });
+        tasks.push(task);
+    }
+
+    let task_outputs = try_join_all(tasks).await.map_err(|e| {
+        datafusion_common::DataFusionError::Execution(format!(
+            "Tokio join error during parallel concatenation: {e}"
+        ))
+    })?;
+
+    let columns = task_outputs
+        .into_iter()
+        .collect::<std::result::Result<Vec<_>, arrow::error::ArrowError>>()?;
+
+    RecordBatch::try_new(Arc::clone(schema), columns).map_err(Into::into)
+}