From ab870580248d8d6ab885f9aa56296b4ec6682f50 Mon Sep 17 00:00:00 2001 From: Kumar Ujjawal Date: Wed, 21 Jan 2026 23:12:09 +0530 Subject: [PATCH 1/4] fix: respect DataFrameWriteOptions::with_single_file_output for paths without extensions --- datafusion/catalog-listing/src/table.rs | 1 + datafusion/core/src/dataframe/mod.rs | 16 +++++- datafusion/core/src/dataframe/parquet.rs | 57 ++++++++++++++++++- .../src/datasource/file_format/parquet.rs | 3 + datafusion/core/src/physical_planner.rs | 25 +++++++- datafusion/datasource/src/file_sink_config.rs | 5 ++ datafusion/datasource/src/write/demux.rs | 7 ++- .../proto/src/physical_plan/from_proto.rs | 2 + .../tests/cases/roundtrip_physical_plan.rs | 3 + 9 files changed, 113 insertions(+), 6 deletions(-) diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs index 38456944075fc..a01e4c5a1f72e 100644 --- a/datafusion/catalog-listing/src/table.rs +++ b/datafusion/catalog-listing/src/table.rs @@ -674,6 +674,7 @@ impl TableProvider for ListingTable { insert_op, keep_partition_by_columns, file_extension: self.options().format.get_ext(), + single_file_output: None, // Use extension heuristic for table inserts }; // For writes, we only use user-specified ordering (no file groups to derive from) diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 1e9f72501e4cc..96c57049fd35d 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -2048,11 +2048,17 @@ impl DataFrame { .build()? }; + // Build copy options, including single_file_output if explicitly set + let mut copy_options: HashMap = HashMap::new(); + if options.single_file_output { + copy_options.insert("single_file_output".to_string(), "true".to_string()); + } + let plan = LogicalPlanBuilder::copy_to( plan, path.into(), file_type, - HashMap::new(), + copy_options, options.partition_by, )? .build()?; @@ -2116,11 +2122,17 @@ impl DataFrame { .build()? 
}; + // Build copy options, including single_file_output if explicitly set + let mut copy_options: HashMap = HashMap::new(); + if options.single_file_output { + copy_options.insert("single_file_output".to_string(), "true".to_string()); + } + let plan = LogicalPlanBuilder::copy_to( plan, path.into(), file_type, - Default::default(), + copy_options, options.partition_by, )? .build()?; diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs index 6edf628e2d6d6..da2bddb623476 100644 --- a/datafusion/core/src/dataframe/parquet.rs +++ b/datafusion/core/src/dataframe/parquet.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use std::collections::HashMap; use std::sync::Arc; use crate::datasource::file_format::{ @@ -84,11 +85,17 @@ impl DataFrame { .build()? }; + // Build copy options, including single_file_output if explicitly set + let mut copy_options = HashMap::::new(); + if options.single_file_output { + copy_options.insert("single_file_output".to_string(), "true".to_string()); + } + let plan = LogicalPlanBuilder::copy_to( plan, path.into(), file_type, - Default::default(), + copy_options, options.partition_by, )? .build()?; @@ -324,4 +331,52 @@ mod tests { Ok(()) } + + /// Test that single_file_output works for paths WITHOUT file extensions. + /// This verifies the fix for the regression where extension heuristics + /// ignored the explicit with_single_file_output(true) setting. 
+ #[tokio::test] + async fn test_single_file_output_without_extension() -> Result<()> { + use arrow::array::Int32Array; + use arrow::datatypes::{DataType, Field, Schema}; + use arrow::record_batch::RecordBatch; + + let ctx = SessionContext::new(); + let tmp_dir = TempDir::new()?; + + // Path WITHOUT .parquet extension - this is the key scenario + let output_path = tmp_dir.path().join("data_no_ext"); + let output_path_str = output_path.to_str().unwrap(); + + let df = ctx.read_batch(RecordBatch::try_new( + Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])), + vec![Arc::new(Int32Array::from(vec![1, 2, 3]))], + )?)?; + + // Explicitly request single file output + df.write_parquet( + output_path_str, + DataFrameWriteOptions::new().with_single_file_output(true), + None, + ) + .await?; + + // Verify: output should be a FILE, not a directory + assert!( + output_path.is_file(), + "Expected single file at {:?}, but got is_file={}, is_dir={}", + output_path, + output_path.is_file(), + output_path.is_dir() + ); + + // Verify the file is readable as parquet + let file = std::fs::File::open(&output_path)?; + let reader = parquet::file::reader::SerializedFileReader::new(file)?; + let metadata = reader.metadata(); + assert_eq!(metadata.num_row_groups(), 1); + assert_eq!(metadata.file_metadata().num_rows(), 3); + + Ok(()) + } } diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 47ce519f01289..dd63440ff3359 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -1547,6 +1547,7 @@ mod tests { insert_op: InsertOp::Overwrite, keep_partition_by_columns: false, file_extension: "parquet".into(), + single_file_output: None, }; let parquet_sink = Arc::new(ParquetSink::new( file_sink_config, @@ -1638,6 +1639,7 @@ mod tests { insert_op: InsertOp::Overwrite, keep_partition_by_columns: false, file_extension: "parquet".into(), 
+ single_file_output: None, }; let parquet_sink = Arc::new(ParquetSink::new( file_sink_config, @@ -1728,6 +1730,7 @@ mod tests { insert_op: InsertOp::Overwrite, keep_partition_by_columns: false, file_extension: "parquet".into(), + single_file_output: None, }; let parquet_sink = Arc::new(ParquetSink::new( file_sink_config, diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 94c8fd510a382..c062166312920 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -549,8 +549,30 @@ impl DefaultPhysicalPlanner { } }; + // Parse single_file_output option if explicitly set + let single_file_output = match source_option_tuples + .get("single_file_output") + .map(|v| v.trim()) + { + None => None, + Some("true") => Some(true), + Some("false") => Some(false), + Some(value) => { + return Err(DataFusionError::Configuration(format!( + "provided value for 'single_file_output' was not recognized: \"{value}\"" + ))); + } + }; + + // Filter out sink-related options that are not format options + let format_options: HashMap = source_option_tuples + .iter() + .filter(|(k, _)| k.as_str() != "single_file_output") + .map(|(k, v)| (k.clone(), v.clone())) + .collect(); + let sink_format = file_type_to_format(file_type)? 
- .create(session_state, source_option_tuples)?; + .create(session_state, &format_options)?; // Determine extension based on format extension and compression let file_extension = match sink_format.compression_type() { @@ -571,6 +593,7 @@ impl DefaultPhysicalPlanner { insert_op: InsertOp::Append, keep_partition_by_columns, file_extension, + single_file_output, }; let ordering = input_exec.properties().output_ordering().cloned(); diff --git a/datafusion/datasource/src/file_sink_config.rs b/datafusion/datasource/src/file_sink_config.rs index 643831a1199f8..8c5bc560780f6 100644 --- a/datafusion/datasource/src/file_sink_config.rs +++ b/datafusion/datasource/src/file_sink_config.rs @@ -112,6 +112,11 @@ pub struct FileSinkConfig { pub keep_partition_by_columns: bool, /// File extension without a dot(.) pub file_extension: String, + /// Override for single file output behavior. + /// - `None`: use extension heuristic (path with extension = single file) + /// - `Some(true)`: force single file output at exact path + /// - `Some(false)`: force directory output with generated filenames + pub single_file_output: Option<bool>, } impl FileSinkConfig { diff --git a/datafusion/datasource/src/write/demux.rs b/datafusion/datasource/src/write/demux.rs index bec5b8b0bff0e..921c1f3b41b55 100644 --- a/datafusion/datasource/src/write/demux.rs +++ b/datafusion/datasource/src/write/demux.rs @@ -106,8 +106,11 @@ pub(crate) fn start_demuxer_task( let file_extension = config.file_extension.clone(); let base_output_path = config.table_paths[0].clone(); let task = if config.table_partition_cols.is_empty() { - let single_file_output = !base_output_path.is_collection() - && base_output_path.file_extension().is_some(); + // Use explicit single_file_output if set, otherwise fall back to extension heuristic + let single_file_output = config.single_file_output.unwrap_or_else(|| { + !base_output_path.is_collection() + && base_output_path.file_extension().is_some() + }); SpawnedTask::spawn(async move {
row_count_demuxer( tx, diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 3cfc796700dae..12b683cb15244 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -737,6 +737,8 @@ impl TryFrom<&protobuf::FileSinkConfig> for FileSinkConfig { insert_op, keep_partition_by_columns: conf.keep_partition_by_columns, file_extension: conf.file_extension.clone(), + // For deserialized plans, use extension heuristic (backward compatible) + single_file_output: None, }) } } diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index b54b7030fc52a..33f7ec6b881e8 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -1475,6 +1475,7 @@ fn roundtrip_json_sink() -> Result<()> { insert_op: InsertOp::Overwrite, keep_partition_by_columns: true, file_extension: "json".into(), + single_file_output: None, }; let data_sink = Arc::new(JsonSink::new( file_sink_config, @@ -1513,6 +1514,7 @@ fn roundtrip_csv_sink() -> Result<()> { insert_op: InsertOp::Overwrite, keep_partition_by_columns: true, file_extension: "csv".into(), + single_file_output: None, }; let data_sink = Arc::new(CsvSink::new( file_sink_config, @@ -1570,6 +1572,7 @@ fn roundtrip_parquet_sink() -> Result<()> { insert_op: InsertOp::Overwrite, keep_partition_by_columns: true, file_extension: "parquet".into(), + single_file_output: None, }; let data_sink = Arc::new(ParquetSink::new( file_sink_config, From d4e67d320f26be4ad9b9e11bc51d6c1d8facd8da Mon Sep 17 00:00:00 2001 From: Kumar Ujjawal Date: Thu, 22 Jan 2026 12:12:48 +0530 Subject: [PATCH 2/4] fix parquet file load --- datafusion-examples/examples/data_io/parquet_encrypted.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion-examples/examples/data_io/parquet_encrypted.rs 
b/datafusion-examples/examples/data_io/parquet_encrypted.rs index d3cc6a121f8ea..26361e9b52be0 100644 --- a/datafusion-examples/examples/data_io/parquet_encrypted.rs +++ b/datafusion-examples/examples/data_io/parquet_encrypted.rs @@ -55,7 +55,7 @@ pub async fn parquet_encrypted() -> datafusion::common::Result<()> { // Create a temporary file location for the encrypted parquet file let tmp_source = TempDir::new()?; - let tempfile = tmp_source.path().join("cars_encrypted"); + let tempfile = tmp_source.path().join("cars_encrypted.parquet"); // Write encrypted parquet let mut options = TableParquetOptions::default(); From e53f51f9fd6c37446f7957656556212aaaab02ca Mon Sep 17 00:00:00 2001 From: Kumar Ujjawal Date: Fri, 23 Jan 2026 09:26:31 +0530 Subject: [PATCH 3/4] use proto serialization --- datafusion/proto/proto/datafusion.proto | 5 +++++ datafusion/proto/src/generated/pbjson.rs | 18 ++++++++++++++++++ datafusion/proto/src/generated/prost.rs | 6 ++++++ .../proto/src/physical_plan/from_proto.rs | 4 ++-- datafusion/proto/src/physical_plan/to_proto.rs | 1 + 5 files changed, 32 insertions(+), 2 deletions(-) diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 2b5e2368c1fa1..626b520188062 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -771,6 +771,11 @@ message FileSinkConfig { bool keep_partition_by_columns = 9; InsertOp insert_op = 10; string file_extension = 11; + // Optional override for single file output behavior. + // When not set, uses extension heuristic (path with extension = single file). + // When set to true, forces single file output at exact path. + // When set to false, forces directory output with generated filenames. 
+ optional bool single_file_output = 12; } enum InsertOp { diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 842dc7f6326dd..1939218cdb548 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -6382,6 +6382,9 @@ impl serde::Serialize for FileSinkConfig { if !self.file_extension.is_empty() { len += 1; } + if self.single_file_output.is_some() { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion.FileSinkConfig", len)?; if !self.object_store_url.is_empty() { struct_ser.serialize_field("objectStoreUrl", &self.object_store_url)?; @@ -6409,6 +6412,9 @@ impl serde::Serialize for FileSinkConfig { if !self.file_extension.is_empty() { struct_ser.serialize_field("fileExtension", &self.file_extension)?; } + if let Some(v) = self.single_file_output.as_ref() { + struct_ser.serialize_field("singleFileOutput", v)?; + } struct_ser.end() } } @@ -6435,6 +6441,8 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig { "insertOp", "file_extension", "fileExtension", + "single_file_output", + "singleFileOutput", ]; #[allow(clippy::enum_variant_names)] @@ -6447,6 +6455,7 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig { KeepPartitionByColumns, InsertOp, FileExtension, + SingleFileOutput, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -6476,6 +6485,7 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig { "keepPartitionByColumns" | "keep_partition_by_columns" => Ok(GeneratedField::KeepPartitionByColumns), "insertOp" | "insert_op" => Ok(GeneratedField::InsertOp), "fileExtension" | "file_extension" => Ok(GeneratedField::FileExtension), + "singleFileOutput" | "single_file_output" => Ok(GeneratedField::SingleFileOutput), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -6503,6 +6513,7 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig { let mut 
keep_partition_by_columns__ = None; let mut insert_op__ = None; let mut file_extension__ = None; + let mut single_file_output__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::ObjectStoreUrl => { @@ -6553,6 +6564,12 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig { } file_extension__ = Some(map_.next_value()?); } + GeneratedField::SingleFileOutput => { + if single_file_output__.is_some() { + return Err(serde::de::Error::duplicate_field("singleFileOutput")); + } + single_file_output__ = map_.next_value()?; + } } } Ok(FileSinkConfig { @@ -6564,6 +6581,7 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig { keep_partition_by_columns: keep_partition_by_columns__.unwrap_or_default(), insert_op: insert_op__.unwrap_or_default(), file_extension: file_extension__.unwrap_or_default(), + single_file_output: single_file_output__, }) } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 3a7b35509eaa1..d53c7327941b7 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1186,6 +1186,12 @@ pub struct FileSinkConfig { pub insert_op: i32, #[prost(string, tag = "11")] pub file_extension: ::prost::alloc::string::String, + /// Optional override for single file output behavior. + /// When not set, uses extension heuristic (path with extension = single file). + /// When set to true, forces single file output at exact path. + /// When set to false, forces directory output with generated filenames. 
+ #[prost(bool, optional, tag = "12")] + pub single_file_output: ::core::option::Option, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct JsonSink { diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 12b683cb15244..d12e145f113c6 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -737,8 +737,8 @@ impl TryFrom<&protobuf::FileSinkConfig> for FileSinkConfig { insert_op, keep_partition_by_columns: conf.keep_partition_by_columns, file_extension: conf.file_extension.clone(), - // For deserialized plans, use extension heuristic (backward compatible) - single_file_output: None, + // Read from proto; None if not present (backward compatible with old plans) + single_file_output: conf.single_file_output, }) } } diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 9558effb8a2a6..26fcf32e6eefd 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -704,6 +704,7 @@ impl TryFrom<&FileSinkConfig> for protobuf::FileSinkConfig { keep_partition_by_columns: conf.keep_partition_by_columns, insert_op: conf.insert_op as i32, file_extension: conf.file_extension.to_string(), + single_file_output: conf.single_file_output, }) } } From bb712558dc52e0f25005997550f1b851fe950685 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 23 Jan 2026 13:56:52 -0500 Subject: [PATCH 4/4] Add upgrade guide note --- docs/source/library-user-guide/upgrading.md | 32 ++++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/docs/source/library-user-guide/upgrading.md b/docs/source/library-user-guide/upgrading.md index 916ff4a82b2ef..90424e434694d 100644 --- a/docs/source/library-user-guide/upgrading.md +++ b/docs/source/library-user-guide/upgrading.md @@ -21,7 +21,37 @@ ## DataFusion `53.0.0` -**Note:** DataFusion 
`53.0.0` has not been released yet. The information provided +in this section pertains to features and changes that have already been merged +to the main branch and are awaiting release in this version. See [#19692] for +more details. + +[#19692]: https://github.com/apache/datafusion/issues/19692 + +### `FileSinkConfig` adds `single_file_output` + +`FileSinkConfig` now includes a `single_file_output: Option<bool>` field to override the +single-file vs directory output behavior. Any code constructing `FileSinkConfig` via struct +literals must initialize this field. + +**Before:** + +```rust,ignore +FileSinkConfig { + // ... + file_extension: "parquet".into(), +} +``` + +**After:** + +```rust,ignore +FileSinkConfig { + // ... + file_extension: "parquet".into(), + single_file_output: None, +} +``` ### `SimplifyInfo` trait removed, `SimplifyContext` now uses builder-style API