29 changes: 24 additions & 5 deletions native/core/src/execution/operators/parquet_writer.rs
@@ -197,6 +197,9 @@ pub struct ParquetWriterExec {
job_id: Option<String>,
/// Task attempt ID for this specific task
task_attempt_id: Option<i32>,
/// Complete staging file path from FileCommitProtocol.newTaskTempFile()
/// When set, the writer writes directly to this path for proper 2PC support
staging_file_path: Option<String>,
/// Compression codec
compression: CompressionCodec,
/// Partition ID (from Spark TaskContext)
@@ -220,6 +223,7 @@ impl ParquetWriterExec {
work_dir: String,
job_id: Option<String>,
task_attempt_id: Option<i32>,
staging_file_path: Option<String>,
compression: CompressionCodec,
partition_id: i32,
column_names: Vec<String>,
@@ -241,6 +245,7 @@ impl ParquetWriterExec {
work_dir,
job_id,
task_attempt_id,
staging_file_path,
compression,
partition_id,
column_names,
@@ -432,6 +437,7 @@ impl ExecutionPlan for ParquetWriterExec {
self.work_dir.clone(),
self.job_id.clone(),
self.task_attempt_id,
self.staging_file_path.clone(),
self.compression.clone(),
self.partition_id,
self.column_names.clone(),
@@ -458,7 +464,9 @@
let runtime_env = context.runtime_env();
let input = self.input.execute(partition, context)?;
let input_schema = self.input.schema();
let output_path = self.output_path.clone();
let work_dir = self.work_dir.clone();
let staging_file_path = self.staging_file_path.clone();
let task_attempt_id = self.task_attempt_id;
let compression = self.compression_to_parquet()?;
let column_names = self.column_names.clone();
@@ -474,15 +482,25 @@
.collect();
let output_schema = Arc::new(arrow::datatypes::Schema::new(fields));

// Generate part file name for this partition
// If using FileCommitProtocol (work_dir is set), include task_attempt_id in the filename
let part_file = if let Some(attempt_id) = task_attempt_id {
// Determine output file path:
// 1. If staging_file_path is set (proper 2PC), use it directly
// 2. If work_dir is set, use work_dir-based path construction
// 3. Otherwise use output_path directly
let base_dir = if !work_dir.is_empty() {
work_dir
} else {
output_path
};

let part_file = if let Some(ref staging_path) = staging_file_path {
staging_path.clone()
} else if let Some(attempt_id) = task_attempt_id {
format!(
"{}/part-{:05}-{:05}.parquet",
work_dir, self.partition_id, attempt_id
base_dir, self.partition_id, attempt_id
)
} else {
format!("{}/part-{:05}.parquet", work_dir, self.partition_id)
format!("{}/part-{:05}.parquet", base_dir, self.partition_id)
};

// Configure writer properties
@@ -812,6 +830,7 @@ mod tests {
work_dir,
None, // job_id
Some(123), // task_attempt_id
None, // staging_file_path
CompressionCodec::None,
0, // partition_id
column_names,
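Taken together, the changes above give the native writer a simple precedence for deciding where a task writes its file: an explicit staging_file_path from the committer wins, then a work_dir-based part-file name, then output_path. A standalone sketch of that precedence (illustrative names only, not the exact production function):

```rust
/// Minimal sketch of the output-path precedence introduced in this change:
/// 1. staging_file_path (full path chosen by FileCommitProtocol) wins,
/// 2. otherwise build a part-file name under work_dir,
/// 3. otherwise build it under output_path.
fn resolve_part_file(
    staging_file_path: Option<&str>,
    work_dir: &str,
    output_path: &str,
    partition_id: i32,
    task_attempt_id: Option<i32>,
) -> String {
    if let Some(staging) = staging_file_path {
        // Proper 2PC: the committer already chose the complete file path.
        return staging.to_string();
    }
    // Fall back to constructing a part-file name ourselves.
    let base_dir = if !work_dir.is_empty() { work_dir } else { output_path };
    match task_attempt_id {
        Some(attempt) => format!("{base_dir}/part-{partition_id:05}-{attempt:05}.parquet"),
        None => format!("{base_dir}/part-{partition_id:05}.parquet"),
    }
}
```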
7 changes: 2 additions & 5 deletions native/core/src/execution/planner.rs
@@ -1257,13 +1257,10 @@ impl PhysicalPlanner {
let parquet_writer = Arc::new(ParquetWriterExec::try_new(
Arc::clone(&child.native_plan),
writer.output_path.clone(),
writer
.work_dir
.as_ref()
.expect("work_dir is provided")
.clone(),
writer.work_dir.clone().unwrap_or_default(),
writer.job_id.clone(),
writer.task_attempt_id,
writer.staging_file_path.clone(),
codec,
self.partition,
writer.column_names.clone(),
5 changes: 4 additions & 1 deletion native/proto/src/proto/operator.proto
@@ -239,12 +239,15 @@ message ParquetWriter {
CompressionCodec compression = 2;
repeated string column_names = 4;
// Working directory for temporary files (used by FileCommitProtocol)
// If not set, files are written directly to output_path
// DEPRECATED: Use staging_file_path instead for proper 2PC support
optional string work_dir = 5;
// Job ID for tracking this write operation
optional string job_id = 6;
// Task attempt ID for this specific task
optional int32 task_attempt_id = 7;
// Complete staging file path from FileCommitProtocol.newTaskTempFile()
// When set, the native writer writes directly to this path for proper 2PC support
optional string staging_file_path = 8;
// Options for configuring object stores such as AWS S3, GCS, etc. The key-value pairs are taken
// from Hadoop configuration for compatibility with Hadoop FileSystem implementations of object
// stores.
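For illustration, a message carrying the new field might look like the struct literal below, written against the Rust bindings prost generates from this proto. The module path, the example values, and the Hadoop-style staging directory layout are assumptions for the sketch, not values taken from this PR.

```rust
// Hypothetical populated writer config (prost-generated struct assumed to be
// `spark_operator::ParquetWriter`); fields not shown in this diff are left at
// their defaults.
let writer = spark_operator::ParquetWriter {
    // Legacy fallback, kept for compatibility but superseded by staging_file_path.
    work_dir: None,
    job_id: Some("3f9c0d2e-example-uuid".to_string()),
    task_attempt_id: Some(123),
    // Complete per-task path handed out by FileCommitProtocol.newTaskTempFile().
    staging_file_path: Some(
        "/warehouse/t/_temporary/0/_temporary/attempt_x_0001_m_000000_123/part-00000.parquet"
            .to_string(),
    ),
    ..Default::default()
};
```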
@@ -24,7 +24,6 @@ import java.util.Locale

import scala.jdk.CollectionConverters._

import org.apache.spark.SparkException
import org.apache.spark.sql.comet.{CometNativeExec, CometNativeWriteExec}
import org.apache.spark.sql.execution.command.DataWritingCommandExec
import org.apache.spark.sql.execution.datasources.{InsertIntoHadoopFsRelationCommand, WriteFilesExec}
@@ -179,29 +178,13 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec]
other
}

// Create FileCommitProtocol for atomic writes
val jobId = java.util.UUID.randomUUID().toString
val committer =
try {
// Use Spark's SQLHadoopMapReduceCommitProtocol
val committerClass =
classOf[org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol]
val constructor =
committerClass.getConstructor(classOf[String], classOf[String], classOf[Boolean])
Some(
constructor
.newInstance(
jobId,
outputPath,
java.lang.Boolean.FALSE // dynamicPartitionOverwrite = false for now
)
.asInstanceOf[org.apache.spark.internal.io.FileCommitProtocol])
} catch {
case e: Exception =>
throw new SparkException(s"Could not instantiate FileCommitProtocol: ${e.getMessage}")
}

CometNativeWriteExec(nativeOp, childPlan, outputPath, committer, jobId)
// Note: We don't create our own FileCommitProtocol here because:
// 1. InsertIntoHadoopFsRelationCommand creates and manages its own committer
// 2. That committer is passed to FileFormatWriter which handles the commit flow
// 3. Our CometNativeWriteExec child is used only for writing data, not for the commit protocol
// The native writer writes directly to the output path, relying on Spark's
// existing commit protocol for atomicity.
CometNativeWriteExec(nativeOp, childPlan, outputPath)
}

private def parseCompressionCodec(cmd: InsertIntoHadoopFsRelationCommand) = {
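The note above settles the division of labor: the native side only materializes data at whatever path it is given, and atomicity comes entirely from Spark's commit protocol on the JVM side. A minimal sketch of that task-level write using the arrow/parquet crates (a simplified illustration assuming a local filesystem path; the function name, error handling, and codec choice are assumptions, not Comet's actual writer):

```rust
use std::error::Error;
use std::fs::File;
use std::sync::Arc;

use arrow::datatypes::Schema;
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;

/// Write one task's batches to the staging path chosen by the committer.
/// No rename or commit happens here; Spark's FileCommitProtocol promotes the
/// file during commitTask/commitJob.
fn write_task_output(
    staging_file_path: &str,
    schema: Arc<Schema>,
    batches: &[RecordBatch],
) -> Result<(), Box<dyn Error>> {
    let file = File::create(staging_file_path)?;
    let props = WriterProperties::builder()
        .set_compression(Compression::SNAPPY)
        .build();
    let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
    for batch in batches {
        writer.write(batch)?;
    }
    // close() flushes the remaining row group and the footer; the file stays
    // in the staging location until the JVM side commits it.
    writer.close()?;
    Ok(())
}
```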