diff --git a/datafusion-examples/examples/data_io/parquet_encrypted.rs b/datafusion-examples/examples/data_io/parquet_encrypted.rs
index d3cc6a121f8ea..26361e9b52be0 100644
--- a/datafusion-examples/examples/data_io/parquet_encrypted.rs
+++ b/datafusion-examples/examples/data_io/parquet_encrypted.rs
@@ -55,7 +55,7 @@ pub async fn parquet_encrypted() -> datafusion::common::Result<()> {
     // Create a temporary file location for the encrypted parquet file
     let tmp_source = TempDir::new()?;
-    let tempfile = tmp_source.path().join("cars_encrypted");
+    let tempfile = tmp_source.path().join("cars_encrypted.parquet");
 
     // Write encrypted parquet
     let mut options = TableParquetOptions::default();
diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs
index 38456944075fc..a01e4c5a1f72e 100644
--- a/datafusion/catalog-listing/src/table.rs
+++ b/datafusion/catalog-listing/src/table.rs
@@ -674,6 +674,7 @@ impl TableProvider for ListingTable {
             insert_op,
             keep_partition_by_columns,
             file_extension: self.options().format.get_ext(),
+            single_file_output: None, // Use extension heuristic for table inserts
         };
 
         // For writes, we only use user-specified ordering (no file groups to derive from)
diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs
index 1e9f72501e4cc..96c57049fd35d 100644
--- a/datafusion/core/src/dataframe/mod.rs
+++ b/datafusion/core/src/dataframe/mod.rs
@@ -2048,11 +2048,17 @@ impl DataFrame {
                 .build()?
         };
 
+        // Build copy options, including single_file_output if explicitly set
+        let mut copy_options: HashMap<String, String> = HashMap::new();
+        if options.single_file_output {
+            copy_options.insert("single_file_output".to_string(), "true".to_string());
+        }
+
         let plan = LogicalPlanBuilder::copy_to(
             plan,
             path.into(),
             file_type,
-            HashMap::new(),
+            copy_options,
             options.partition_by,
         )?
         .build()?;
@@ -2116,11 +2122,17 @@ impl DataFrame {
                 .build()?
         };
 
+        // Build copy options, including single_file_output if explicitly set
+        let mut copy_options: HashMap<String, String> = HashMap::new();
+        if options.single_file_output {
+            copy_options.insert("single_file_output".to_string(), "true".to_string());
+        }
+
         let plan = LogicalPlanBuilder::copy_to(
             plan,
             path.into(),
             file_type,
-            Default::default(),
+            copy_options,
             options.partition_by,
         )?
         .build()?;
diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs
index 6edf628e2d6d6..da2bddb623476 100644
--- a/datafusion/core/src/dataframe/parquet.rs
+++ b/datafusion/core/src/dataframe/parquet.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::collections::HashMap;
 use std::sync::Arc;
 
 use crate::datasource::file_format::{
@@ -84,11 +85,17 @@ impl DataFrame {
                 .build()?
         };
 
+        // Build copy options, including single_file_output if explicitly set
+        let mut copy_options = HashMap::<String, String>::new();
+        if options.single_file_output {
+            copy_options.insert("single_file_output".to_string(), "true".to_string());
+        }
+
         let plan = LogicalPlanBuilder::copy_to(
             plan,
             path.into(),
             file_type,
-            Default::default(),
+            copy_options,
             options.partition_by,
         )?
         .build()?;
@@ -324,4 +331,55 @@ mod tests {
 
         Ok(())
     }
+
+    /// Test that single_file_output works for paths WITHOUT file extensions.
+    /// This verifies the fix for the regression where extension heuristics
+    /// ignored the explicit with_single_file_output(true) setting.
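+    ///
+    /// Before this fix, the extension heuristic treated an extension-less path
+    /// as a directory target and wrote generated file names beneath it.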
+    #[tokio::test]
+    async fn test_single_file_output_without_extension() -> Result<()> {
+        use arrow::array::Int32Array;
+        use arrow::datatypes::{DataType, Field, Schema};
+        use arrow::record_batch::RecordBatch;
+
+        let ctx = SessionContext::new();
+        let tmp_dir = TempDir::new()?;
+
+        // Path WITHOUT .parquet extension - this is the key scenario
+        let output_path = tmp_dir.path().join("data_no_ext");
+        let output_path_str = output_path.to_str().unwrap();
+
+        let df = ctx.read_batch(RecordBatch::try_new(
+            Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])),
+            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
+        )?)?;
+
+        // Explicitly request single file output
+        df.write_parquet(
+            output_path_str,
+            DataFrameWriteOptions::new().with_single_file_output(true),
+            None,
+        )
+        .await?;
+
+        // Verify: output should be a FILE, not a directory
+        assert!(
+            output_path.is_file(),
+            "Expected single file at {:?}, but got is_file={}, is_dir={}",
+            output_path,
+            output_path.is_file(),
+            output_path.is_dir()
+        );
+
+        // Verify the file is readable as parquet
+        let file = std::fs::File::open(&output_path)?;
+        let reader = parquet::file::reader::SerializedFileReader::new(file)?;
+        let metadata = reader.metadata();
+        assert_eq!(metadata.num_row_groups(), 1);
+        assert_eq!(metadata.file_metadata().num_rows(), 3);
+
+        Ok(())
+    }
 }
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 47ce519f01289..dd63440ff3359 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -1547,6 +1547,7 @@ mod tests {
             insert_op: InsertOp::Overwrite,
             keep_partition_by_columns: false,
             file_extension: "parquet".into(),
+            single_file_output: None,
         };
         let parquet_sink = Arc::new(ParquetSink::new(
             file_sink_config,
@@ -1638,6 +1639,7 @@ mod tests {
             insert_op: InsertOp::Overwrite,
             keep_partition_by_columns: false,
             file_extension: "parquet".into(),
+            single_file_output: None,
         };
         let parquet_sink = Arc::new(ParquetSink::new(
             file_sink_config,
@@ -1728,6 +1730,7 @@ mod tests {
             insert_op: InsertOp::Overwrite,
             keep_partition_by_columns: false,
             file_extension: "parquet".into(),
+            single_file_output: None,
         };
         let parquet_sink = Arc::new(ParquetSink::new(
             file_sink_config,
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 94c8fd510a382..c062166312920 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -549,8 +549,30 @@
             }
         };
 
+        // Parse single_file_output option if explicitly set
+        let single_file_output = match source_option_tuples
+            .get("single_file_output")
+            .map(|v| v.trim())
+        {
+            None => None,
+            Some("true") => Some(true),
+            Some("false") => Some(false),
+            Some(value) => {
+                return Err(DataFusionError::Configuration(format!(
+                    "provided value for 'single_file_output' was not recognized: \"{value}\""
+                )));
+            }
+        };
+
+        // Filter out sink-related options that are not format options
+        let format_options: HashMap<String, String> = source_option_tuples
+            .iter()
+            .filter(|(k, _)| k.as_str() != "single_file_output")
+            .map(|(k, v)| (k.clone(), v.clone()))
+            .collect();
+
         let sink_format = file_type_to_format(file_type)?
-            .create(session_state, source_option_tuples)?;
+            .create(session_state, &format_options)?;
 
         // Determine extension based on format extension and compression
         let file_extension = match sink_format.compression_type() {
@@ -571,6 +593,7 @@
             insert_op: InsertOp::Append,
             keep_partition_by_columns,
             file_extension,
+            single_file_output,
         };
 
         let ordering = input_exec.properties().output_ordering().cloned();
diff --git a/datafusion/datasource/src/file_sink_config.rs b/datafusion/datasource/src/file_sink_config.rs
index 643831a1199f8..8c5bc560780f6 100644
--- a/datafusion/datasource/src/file_sink_config.rs
+++ b/datafusion/datasource/src/file_sink_config.rs
@@ -112,6 +112,11 @@ pub struct FileSinkConfig {
     pub keep_partition_by_columns: bool,
     /// File extension without a dot(.)
     pub file_extension: String,
+    /// Override for single file output behavior.
+    /// - `None`: use extension heuristic (path with extension = single file)
+    /// - `Some(true)`: force single file output at exact path
+    /// - `Some(false)`: force directory output with generated filenames
+    pub single_file_output: Option<bool>,
 }
 
 impl FileSinkConfig {
diff --git a/datafusion/datasource/src/write/demux.rs b/datafusion/datasource/src/write/demux.rs
index bec5b8b0bff0e..921c1f3b41b55 100644
--- a/datafusion/datasource/src/write/demux.rs
+++ b/datafusion/datasource/src/write/demux.rs
@@ -106,8 +106,11 @@ pub(crate) fn start_demuxer_task(
     let file_extension = config.file_extension.clone();
     let base_output_path = config.table_paths[0].clone();
     let task = if config.table_partition_cols.is_empty() {
-        let single_file_output = !base_output_path.is_collection()
-            && base_output_path.file_extension().is_some();
+        // Use explicit single_file_output if set, otherwise fall back to extension heuristic
+        let single_file_output = config.single_file_output.unwrap_or_else(|| {
+            !base_output_path.is_collection()
+                && base_output_path.file_extension().is_some()
+        });
         SpawnedTask::spawn(async move {
             row_count_demuxer(
                 tx,
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index 2b5e2368c1fa1..626b520188062 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -771,6 +771,12 @@ message FileSinkConfig {
  bool keep_partition_by_columns = 9;
  InsertOp insert_op = 10;
  string file_extension = 11;
+  // Optional override for single file output behavior.
+  // When not set, uses extension heuristic (path with extension = single file).
+  // When set to true, forces single file output at exact path.
+  // When set to false, forces directory output with generated filenames.
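+  // Not set in plans serialized by older DataFusion versions; readers treat
+  // the absent field as None.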
+  optional bool single_file_output = 12;
 }
 
 enum InsertOp {
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index 842dc7f6326dd..1939218cdb548 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -6382,6 +6382,9 @@ impl serde::Serialize for FileSinkConfig {
         if !self.file_extension.is_empty() {
             len += 1;
         }
+        if self.single_file_output.is_some() {
+            len += 1;
+        }
         let mut struct_ser = serializer.serialize_struct("datafusion.FileSinkConfig", len)?;
         if !self.object_store_url.is_empty() {
             struct_ser.serialize_field("objectStoreUrl", &self.object_store_url)?;
@@ -6409,6 +6412,9 @@ impl serde::Serialize for FileSinkConfig {
         if !self.file_extension.is_empty() {
             struct_ser.serialize_field("fileExtension", &self.file_extension)?;
         }
+        if let Some(v) = self.single_file_output.as_ref() {
+            struct_ser.serialize_field("singleFileOutput", v)?;
+        }
         struct_ser.end()
     }
 }
@@ -6435,6 +6441,8 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
            "insertOp",
            "file_extension",
            "fileExtension",
+           "single_file_output",
+           "singleFileOutput",
        ];

        #[allow(clippy::enum_variant_names)]
@@ -6447,6 +6455,7 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
            KeepPartitionByColumns,
            InsertOp,
            FileExtension,
+           SingleFileOutput,
        }
        impl<'de> serde::Deserialize<'de> for GeneratedField {
            fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
@@ -6476,6 +6485,7 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
                        "keepPartitionByColumns" | "keep_partition_by_columns" => Ok(GeneratedField::KeepPartitionByColumns),
                        "insertOp" | "insert_op" => Ok(GeneratedField::InsertOp),
                        "fileExtension" | "file_extension" => Ok(GeneratedField::FileExtension),
+                        "singleFileOutput" | "single_file_output" => Ok(GeneratedField::SingleFileOutput),
                        _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
                    }
                }
@@ -6503,6 +6513,7 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
                let mut keep_partition_by_columns__ = None;
                let mut insert_op__ = None;
                let mut file_extension__ = None;
+                let mut single_file_output__ = None;
                while let Some(k) = map_.next_key()? {
                    match k {
                        GeneratedField::ObjectStoreUrl => {
@@ -6553,6 +6564,12 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
                            }
                            file_extension__ = Some(map_.next_value()?);
                        }
+                        GeneratedField::SingleFileOutput => {
+                            if single_file_output__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("singleFileOutput"));
+                            }
+                            single_file_output__ = map_.next_value()?;
+                        }
                    }
                }
                Ok(FileSinkConfig {
@@ -6564,6 +6581,7 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
                    keep_partition_by_columns: keep_partition_by_columns__.unwrap_or_default(),
                    insert_op: insert_op__.unwrap_or_default(),
                    file_extension: file_extension__.unwrap_or_default(),
+                    single_file_output: single_file_output__,
                })
            }
        }
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index 3a7b35509eaa1..d53c7327941b7 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1186,6 +1186,12 @@ pub struct FileSinkConfig {
     pub insert_op: i32,
     #[prost(string, tag = "11")]
     pub file_extension: ::prost::alloc::string::String,
+    /// Optional override for single file output behavior.
+    /// When not set, uses extension heuristic (path with extension = single file).
+    /// When set to true, forces single file output at exact path.
+    /// When set to false, forces directory output with generated filenames.
+    #[prost(bool, optional, tag = "12")]
+    pub single_file_output: ::core::option::Option<bool>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct JsonSink {
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
index 3cfc796700dae..d12e145f113c6 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -737,6 +737,8 @@ impl TryFrom<&protobuf::FileSinkConfig> for FileSinkConfig {
            insert_op,
            keep_partition_by_columns: conf.keep_partition_by_columns,
            file_extension: conf.file_extension.clone(),
+            // Read from proto; None if not present (backward compatible with old plans)
+            single_file_output: conf.single_file_output,
        })
    }
}
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs
index 9558effb8a2a6..26fcf32e6eefd 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -704,6 +704,7 @@ impl TryFrom<&FileSinkConfig> for protobuf::FileSinkConfig {
            keep_partition_by_columns: conf.keep_partition_by_columns,
            insert_op: conf.insert_op as i32,
            file_extension: conf.file_extension.to_string(),
+            single_file_output: conf.single_file_output,
        })
    }
}
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index b54b7030fc52a..33f7ec6b881e8 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -1475,6 +1475,7 @@ fn roundtrip_json_sink() -> Result<()> {
        insert_op: InsertOp::Overwrite,
        keep_partition_by_columns: true,
        file_extension: "json".into(),
+        single_file_output: None,
    };
    let data_sink = Arc::new(JsonSink::new(
        file_sink_config,
@@ -1513,6 +1514,7 @@ fn roundtrip_csv_sink() -> Result<()> {
        insert_op: InsertOp::Overwrite,
        keep_partition_by_columns: true,
        file_extension: "csv".into(),
+        single_file_output: None,
    };
    let data_sink = Arc::new(CsvSink::new(
        file_sink_config,
@@ -1570,6 +1572,7 @@ fn roundtrip_parquet_sink() -> Result<()> {
        insert_op: InsertOp::Overwrite,
        keep_partition_by_columns: true,
        file_extension: "parquet".into(),
+        single_file_output: None,
    };
    let data_sink = Arc::new(ParquetSink::new(
        file_sink_config,
diff --git a/docs/source/library-user-guide/upgrading.md b/docs/source/library-user-guide/upgrading.md
index 916ff4a82b2ef..90424e434694d 100644
--- a/docs/source/library-user-guide/upgrading.md
+++ b/docs/source/library-user-guide/upgrading.md
@@ -21,7 +21,48 @@
 ## DataFusion `53.0.0`
 
-**Note:** DataFusion `53.0.0` has not been released yet. The information provided
-in this section pertains to features and changes that have already been merged
-to the main branch and are awaiting release in this version.
+**Note:** DataFusion `53.0.0` has not been released yet. The information provided
+in this section pertains to features and changes that have already been merged
+to the main branch and are awaiting release in this version. See [#19692] for
+more details.
+
+[#19692]: https://github.com/apache/datafusion/issues/19692
+
+### `FileSinkConfig` adds `single_file_output`
+
+`FileSinkConfig` now includes a `single_file_output: Option<bool>` field to override the
+single-file vs directory output behavior. Any code constructing `FileSinkConfig` via struct
+literals must initialize this field.
+
+**Before:**
+
+```rust,ignore
+FileSinkConfig {
+    // ...
+    file_extension: "parquet".into(),
+}
+```
+
+**After:**
+
+```rust,ignore
+FileSinkConfig {
+    // ...
+    file_extension: "parquet".into(),
+    single_file_output: None,
+}
+```
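+
+End users can opt into single-file output explicitly via
+`DataFrameWriteOptions::with_single_file_output`, even when the target path
+has no extension (a minimal sketch; the path is illustrative):
+
+```rust,ignore
+df.write_parquet(
+    "/tmp/data_no_ext",
+    DataFrameWriteOptions::new().with_single_file_output(true),
+    None,
+)
+.await?;
+```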
 
 ### `SimplifyInfo` trait removed, `SimplifyContext` now uses builder-style API