From e2a2861601a28936b1f3e4224d74736817d51da0 Mon Sep 17 00:00:00 2001 From: Cyprien Huet Date: Sat, 17 Jan 2026 13:01:49 +0400 Subject: [PATCH 01/10] feat: Add with_spark_features to SessionStateBuilder --- Cargo.lock | 2 +- datafusion/core/Cargo.toml | 2 + .../core/src/execution/session_state.rs | 37 +++++++++++++++++++ datafusion/core/src/lib.rs | 7 ++++ datafusion/spark/src/lib.rs | 28 ++++++++++++++ datafusion/sqllogictest/Cargo.toml | 3 +- datafusion/sqllogictest/src/test_context.rs | 15 ++------ 7 files changed, 80 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0e9337b50e6f2..8d3c5b1bea4f2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1749,6 +1749,7 @@ dependencies = [ "datafusion-physical-optimizer", "datafusion-physical-plan", "datafusion-session", + "datafusion-spark", "datafusion-sql", "doc-comment", "env_logger", @@ -2618,7 +2619,6 @@ dependencies = [ "chrono", "clap", "datafusion", - "datafusion-spark", "datafusion-substrait", "env_logger", "futures", diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 5c7e944e59f7b..0b8fbc05861ea 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -97,6 +97,7 @@ serde = [ # statements in `arrow-schema` crate "arrow-schema/serde", ] +spark = ["datafusion-spark"] sql = [ "datafusion-common/sql", "datafusion-functions-nested?/sql", @@ -142,6 +143,7 @@ datafusion-physical-expr-common = { workspace = true } datafusion-physical-optimizer = { workspace = true } datafusion-physical-plan = { workspace = true } datafusion-session = { workspace = true } +datafusion-spark = { workspace = true, optional = true } datafusion-sql = { workspace = true, optional = true } flate2 = { workspace = true, optional = true } futures = { workspace = true } diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index 7cdbc77ae90c3..94dfab1ae7462 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -30,6 +30,8 @@ use crate::datasource::provider_as_source; use crate::execution::SessionStateDefaults; use crate::execution::context::{EmptySerializerRegistry, FunctionFactory, QueryPlanner}; use crate::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner}; +#[cfg(feature = "spark")] +use crate::spark; use arrow_schema::{DataType, FieldRef}; use datafusion_catalog::MemoryCatalogProviderList; use datafusion_catalog::information_schema::{ @@ -1139,6 +1141,41 @@ impl SessionStateBuilder { self } + /// Adds all expr_planners, scalar, aggregate, window and table functions + /// compatible with Apache Spark. + /// + /// Note overwrites any previously registered items with the same name. + #[cfg(feature = "spark")] + pub fn with_spark_features(mut self) -> Self { + self.expr_planners + .get_or_insert_with(Vec::new) + // planners are evaluated in order of insertion. 
Push Apache Spark function planner to the front + // to take precedence over others + .insert(0, Arc::new(spark::planner::SparkFunctionPlanner)); + + self.scalar_functions + .get_or_insert_with(Vec::new) + .extend(spark::all_default_scalar_functions()); + + self.aggregate_functions + .get_or_insert_with(Vec::new) + .extend(spark::all_default_aggregate_functions()); + + self.window_functions + .get_or_insert_with(Vec::new) + .extend(spark::all_default_window_functions()); + + self.table_functions + .get_or_insert_with(HashMap::new) + .extend( + spark::all_default_table_functions() + .into_iter() + .map(|f| (f.name().to_string(), f)), + ); + + self + } + /// Returns a new [`SessionStateBuilder`] with default features. /// /// This is equivalent to calling [`Self::new()`] followed by [`Self::with_default_features()`]. diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs index e83934a8e281d..9e813e4732654 100644 --- a/datafusion/core/src/lib.rs +++ b/datafusion/core/src/lib.rs @@ -726,6 +726,7 @@ //! * [datafusion_physical_expr]: [`PhysicalExpr`] and related expressions //! * [datafusion_physical_plan]: [`ExecutionPlan`] and related expressions //! * [datafusion_physical_optimizer]: [`ExecutionPlan`] and related expressions +//! * [datafusion_spark]: Apache Spark compatible functions //! * [datafusion_sql]: SQL planner ([`SqlToRel`]) //! //! [`SchemaProvider`]: datafusion_catalog::SchemaProvider @@ -874,6 +875,12 @@ pub mod functions_nested { pub use datafusion_functions_nested::*; } +/// re-export of [`datafusion_spark`] crate, if "spark" feature is enabled +pub mod spark { + #[cfg(feature = "spark")] + pub use datafusion_spark::*; +} + /// re-export of [`datafusion_functions_aggregate`] crate pub mod functions_aggregate { pub use datafusion_functions_aggregate::*; diff --git a/datafusion/spark/src/lib.rs b/datafusion/spark/src/lib.rs index f67367734cf93..3b28a66716043 100644 --- a/datafusion/spark/src/lib.rs +++ b/datafusion/spark/src/lib.rs @@ -93,6 +93,34 @@ //! ``` //! //![`Expr`]: datafusion_expr::Expr +//! +//! //! # Example: enabling Apache Spark features with SessionStateBuilder +//! +//! The recommended way to enable Apache Spark compatibility is to use the +//! [`with_spark_features`] method on [`SessionStateBuilder`]. This registers all +//! Apache Spark functions (scalar, aggregate, window, and table) as well as the Apache Spark +//! expression planner. +//! +//! Note: This requires the `spark` feature to be enabled in the `datafusion` crate +//! +//! ``` +//! use datafusion::execution::session_state::SessionStateBuilder; +//! use datafusion::prelude::SessionContext; +//! +//! // Create a SessionState with Apache Spark features enabled +//! // note: the order matters here, `with_spark_features` should be +//! // called after `with_default_features` to overwrite any existing functions +//! let state = SessionStateBuilder::new() +//! .with_default_features() +//! .with_spark_features() +//! .build(); +//! +//! // Create a SessionContext using this state +//! let ctx = SessionContext::new_with_state(state); +//! ``` +//! +//! [`with_spark_features`]: https://docs.rs/datafusion/latest/datafusion/execution/session_state/struct.SessionStateBuilder.html#method.with_spark_features +//! 
[`SessionStateBuilder`]: https://docs.rs/datafusion/latest/datafusion/execution/session_state/struct.SessionStateBuilder.html pub mod function; pub mod planner; diff --git a/datafusion/sqllogictest/Cargo.toml b/datafusion/sqllogictest/Cargo.toml index 13ae6e6a57e01..4c8860ab40270 100644 --- a/datafusion/sqllogictest/Cargo.toml +++ b/datafusion/sqllogictest/Cargo.toml @@ -46,8 +46,7 @@ bigdecimal = { workspace = true } bytes = { workspace = true, optional = true } chrono = { workspace = true, optional = true } clap = { version = "4.5.53", features = ["derive", "env"] } -datafusion = { workspace = true, default-features = true, features = ["avro"] } -datafusion-spark = { workspace = true, default-features = true } +datafusion = { workspace = true, default-features = true, features = ["avro", "spark"] } datafusion-substrait = { workspace = true, default-features = true } futures = { workspace = true } half = { workspace = true, default-features = true } diff --git a/datafusion/sqllogictest/src/test_context.rs b/datafusion/sqllogictest/src/test_context.rs index 19ec3e7613942..06c63320edaaf 100644 --- a/datafusion/sqllogictest/src/test_context.rs +++ b/datafusion/sqllogictest/src/test_context.rs @@ -84,21 +84,14 @@ impl TestContext { let mut state_builder = SessionStateBuilder::new() .with_config(config) - .with_runtime_env(runtime); + .with_runtime_env(runtime) + .with_default_features(); if is_spark_path(relative_path) { - state_builder = state_builder.with_expr_planners(vec![Arc::new( - datafusion_spark::planner::SparkFunctionPlanner, - )]); + state_builder = state_builder.with_spark_features(); } - let mut state = state_builder.with_default_features().build(); - - if is_spark_path(relative_path) { - info!("Registering Spark functions"); - datafusion_spark::register_all(&mut state) - .expect("Can not register Spark functions"); - } + let state = state_builder.build(); let mut test_ctx = TestContext::new(SessionContext::new_with_state(state)); From c45f8fa2797f347e96bf021ebe6ca1172132432e Mon Sep 17 00:00:00 2001 From: Cyprien Huet Date: Sat, 17 Jan 2026 13:22:01 +0400 Subject: [PATCH 02/10] test: Add unit test for SessionState with Spark features --- .../core/src/execution/session_state.rs | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index 94dfab1ae7462..85c909f594059 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -2542,4 +2542,25 @@ mod tests { self.state.window_functions().keys().cloned().collect() } } + + #[test] + #[cfg(feature = "spark")] + fn test_session_state_with_spark_features() { + let state = SessionStateBuilder::new().with_spark_features().build(); + + assert!( + state.scalar_functions().contains_key("sha2"), + "Apache Spark scalar function 'sha2' should be registered" + ); + + assert!( + state.aggregate_functions().contains_key("try_sum"), + "Apache Spark aggregate function 'try_sum' should be registered" + ); + + assert!( + !state.expr_planners().is_empty(), + "Apache Spark expr planners should be registered" + ); + } } From be3cc4afc70dab31b28c7b656a91ead5f3a19097 Mon Sep 17 00:00:00 2001 From: Cyprien Huet Date: Sat, 17 Jan 2026 13:22:09 +0400 Subject: [PATCH 03/10] feat: Add Spark feature checks to CI workflow --- .github/workflows/rust.yml | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 
6235679d5a05d..898627a9b4778 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -217,6 +217,8 @@ jobs: run: cargo check --profile ci --no-default-features -p datafusion --features=serde - name: Check datafusion (sql) run: cargo check --profile ci --no-default-features -p datafusion --features=sql + - name: Check datafusion (spark) + run: cargo check --profile ci --no-default-features -p datafusion --features=spark - name: Check datafusion (string_expressions) run: cargo check --profile ci --no-default-features -p datafusion --features=string_expressions - name: Check datafusion (unicode_expressions) @@ -299,7 +301,7 @@ jobs: --lib \ --tests \ --bins \ - --features serde,avro,json,backtrace,integration-tests,parquet_encryption + --features serde,avro,json,backtrace,integration-tests,parquet_encryption,spark - name: Verify Working Directory Clean run: git diff --exit-code # Check no temporary directories created during test. @@ -557,7 +559,7 @@ jobs: uses: ./.github/actions/setup-macos-aarch64-builder - name: Run tests (excluding doctests) shell: bash - run: cargo test --profile ci --exclude datafusion-cli --workspace --lib --tests --bins --features avro,json,backtrace,integration-tests + run: cargo test --profile ci --exclude datafusion-cli --workspace --lib --tests --bins --features avro,json,backtrace,integration-tests,spark vendor: name: Verify Vendored Code @@ -778,4 +780,4 @@ jobs: run: cargo msrv --output-format json --log-target stdout verify - name: Check datafusion-proto working-directory: datafusion/proto - run: cargo msrv --output-format json --log-target stdout verify \ No newline at end of file + run: cargo msrv --output-format json --log-target stdout verify From 168747a7423928bde46418fe2a6745c879ad84d4 Mon Sep 17 00:00:00 2001 From: Cyprien Huet Date: Sat, 17 Jan 2026 19:26:34 +0400 Subject: [PATCH 04/10] feat: Refactor Spark feature integration and update dependencies --- .github/workflows/rust.yml | 8 +- Cargo.lock | 3 +- datafusion/core/Cargo.toml | 2 - .../core/src/execution/session_state.rs | 58 --------- datafusion/core/src/lib.rs | 7 -- datafusion/spark/Cargo.toml | 8 ++ datafusion/spark/src/lib.rs | 28 +++-- datafusion/spark/src/session_state.rs | 116 ++++++++++++++++++ datafusion/sqllogictest/Cargo.toml | 3 +- datafusion/sqllogictest/src/test_context.rs | 1 + 10 files changed, 150 insertions(+), 84 deletions(-) create mode 100644 datafusion/spark/src/session_state.rs diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 898627a9b4778..94a946d869fcf 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -217,8 +217,8 @@ jobs: run: cargo check --profile ci --no-default-features -p datafusion --features=serde - name: Check datafusion (sql) run: cargo check --profile ci --no-default-features -p datafusion --features=sql - - name: Check datafusion (spark) - run: cargo check --profile ci --no-default-features -p datafusion --features=spark + - name: Check datafusion-spark (core) + run: cargo check --profile ci -p datafusion-spark --features=core - name: Check datafusion (string_expressions) run: cargo check --profile ci --no-default-features -p datafusion --features=string_expressions - name: Check datafusion (unicode_expressions) @@ -301,7 +301,7 @@ jobs: --lib \ --tests \ --bins \ - --features serde,avro,json,backtrace,integration-tests,parquet_encryption,spark + --features serde,avro,json,backtrace,integration-tests,parquet_encryption - name: Verify Working Directory Clean run: git diff --exit-code # 
Check no temporary directories created during test. @@ -559,7 +559,7 @@ jobs: uses: ./.github/actions/setup-macos-aarch64-builder - name: Run tests (excluding doctests) shell: bash - run: cargo test --profile ci --exclude datafusion-cli --workspace --lib --tests --bins --features avro,json,backtrace,integration-tests,spark + run: cargo test --profile ci --exclude datafusion-cli --workspace --lib --tests --bins --features avro,json,backtrace,integration-tests vendor: name: Verify Vendored Code diff --git a/Cargo.lock b/Cargo.lock index 8d3c5b1bea4f2..8d9a4634231e0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1749,7 +1749,6 @@ dependencies = [ "datafusion-physical-optimizer", "datafusion-physical-plan", "datafusion-session", - "datafusion-spark", "datafusion-sql", "doc-comment", "env_logger", @@ -2568,6 +2567,7 @@ dependencies = [ "chrono", "crc32fast", "criterion", + "datafusion", "datafusion-catalog", "datafusion-common", "datafusion-execution", @@ -2619,6 +2619,7 @@ dependencies = [ "chrono", "clap", "datafusion", + "datafusion-spark", "datafusion-substrait", "env_logger", "futures", diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 0b8fbc05861ea..5c7e944e59f7b 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -97,7 +97,6 @@ serde = [ # statements in `arrow-schema` crate "arrow-schema/serde", ] -spark = ["datafusion-spark"] sql = [ "datafusion-common/sql", "datafusion-functions-nested?/sql", @@ -143,7 +142,6 @@ datafusion-physical-expr-common = { workspace = true } datafusion-physical-optimizer = { workspace = true } datafusion-physical-plan = { workspace = true } datafusion-session = { workspace = true } -datafusion-spark = { workspace = true, optional = true } datafusion-sql = { workspace = true, optional = true } flate2 = { workspace = true, optional = true } futures = { workspace = true } diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index 85c909f594059..7cdbc77ae90c3 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -30,8 +30,6 @@ use crate::datasource::provider_as_source; use crate::execution::SessionStateDefaults; use crate::execution::context::{EmptySerializerRegistry, FunctionFactory, QueryPlanner}; use crate::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner}; -#[cfg(feature = "spark")] -use crate::spark; use arrow_schema::{DataType, FieldRef}; use datafusion_catalog::MemoryCatalogProviderList; use datafusion_catalog::information_schema::{ @@ -1141,41 +1139,6 @@ impl SessionStateBuilder { self } - /// Adds all expr_planners, scalar, aggregate, window and table functions - /// compatible with Apache Spark. - /// - /// Note overwrites any previously registered items with the same name. - #[cfg(feature = "spark")] - pub fn with_spark_features(mut self) -> Self { - self.expr_planners - .get_or_insert_with(Vec::new) - // planners are evaluated in order of insertion. 
Push Apache Spark function planner to the front - // to take precedence over others - .insert(0, Arc::new(spark::planner::SparkFunctionPlanner)); - - self.scalar_functions - .get_or_insert_with(Vec::new) - .extend(spark::all_default_scalar_functions()); - - self.aggregate_functions - .get_or_insert_with(Vec::new) - .extend(spark::all_default_aggregate_functions()); - - self.window_functions - .get_or_insert_with(Vec::new) - .extend(spark::all_default_window_functions()); - - self.table_functions - .get_or_insert_with(HashMap::new) - .extend( - spark::all_default_table_functions() - .into_iter() - .map(|f| (f.name().to_string(), f)), - ); - - self - } - /// Returns a new [`SessionStateBuilder`] with default features. /// /// This is equivalent to calling [`Self::new()`] followed by [`Self::with_default_features()`]. @@ -2542,25 +2505,4 @@ mod tests { self.state.window_functions().keys().cloned().collect() } } - - #[test] - #[cfg(feature = "spark")] - fn test_session_state_with_spark_features() { - let state = SessionStateBuilder::new().with_spark_features().build(); - - assert!( - state.scalar_functions().contains_key("sha2"), - "Apache Spark scalar function 'sha2' should be registered" - ); - - assert!( - state.aggregate_functions().contains_key("try_sum"), - "Apache Spark aggregate function 'try_sum' should be registered" - ); - - assert!( - !state.expr_planners().is_empty(), - "Apache Spark expr planners should be registered" - ); - } } diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs index 9e813e4732654..e83934a8e281d 100644 --- a/datafusion/core/src/lib.rs +++ b/datafusion/core/src/lib.rs @@ -726,7 +726,6 @@ //! * [datafusion_physical_expr]: [`PhysicalExpr`] and related expressions //! * [datafusion_physical_plan]: [`ExecutionPlan`] and related expressions //! * [datafusion_physical_optimizer]: [`ExecutionPlan`] and related expressions -//! * [datafusion_spark]: Apache Spark compatible functions //! * [datafusion_sql]: SQL planner ([`SqlToRel`]) //! //! [`SchemaProvider`]: datafusion_catalog::SchemaProvider @@ -875,12 +874,6 @@ pub mod functions_nested { pub use datafusion_functions_nested::*; } -/// re-export of [`datafusion_spark`] crate, if "spark" feature is enabled -pub mod spark { - #[cfg(feature = "spark")] - pub use datafusion_spark::*; -} - /// re-export of [`datafusion_functions_aggregate`] crate pub mod functions_aggregate { pub use datafusion_functions_aggregate::*; diff --git a/datafusion/spark/Cargo.toml b/datafusion/spark/Cargo.toml index ad2620a532f24..dcb586ee809c2 100644 --- a/datafusion/spark/Cargo.toml +++ b/datafusion/spark/Cargo.toml @@ -29,6 +29,10 @@ edition = { workspace = true } [package.metadata.docs.rs] all-features = true +[features] +default = [] +core = ["datafusion"] + # Note: add additional linter rules in lib.rs. 
# Rust does not support workspace + new linter rules in subcrates yet # https://github.com/rust-lang/cargo/issues/13157 @@ -43,6 +47,8 @@ arrow = { workspace = true } bigdecimal = { workspace = true } chrono = { workspace = true } crc32fast = "1.4" +# Optional dependency for SessionStateBuilderSpark extension trait +datafusion = { workspace = true, optional = true, default-features = false } datafusion-catalog = { workspace = true } datafusion-common = { workspace = true } datafusion-execution = { workspace = true } @@ -59,6 +65,8 @@ url = { workspace = true } [dev-dependencies] arrow = { workspace = true, features = ["test_utils"] } criterion = { workspace = true } +# for SessionStateBuilderSpark tests +datafusion = { workspace = true, default-features = false } [[bench]] harness = false diff --git a/datafusion/spark/src/lib.rs b/datafusion/spark/src/lib.rs index 3b28a66716043..f9c130cc92e76 100644 --- a/datafusion/spark/src/lib.rs +++ b/datafusion/spark/src/lib.rs @@ -94,18 +94,22 @@ //! //![`Expr`]: datafusion_expr::Expr //! -//! //! # Example: enabling Apache Spark features with SessionStateBuilder +//! # Example: enabling Apache Spark features with SessionStateBuilder //! //! The recommended way to enable Apache Spark compatibility is to use the -//! [`with_spark_features`] method on [`SessionStateBuilder`]. This registers all +//! [`SessionStateBuilderSpark`] extension trait. This registers all //! Apache Spark functions (scalar, aggregate, window, and table) as well as the Apache Spark //! expression planner. //! -//! Note: This requires the `spark` feature to be enabled in the `datafusion` crate -//! +//! Enable the `core` feature in your `Cargo.toml`: +//! ```toml +//! datafusion-spark = { version = "X", features = ["core"] } //! ``` -//! use datafusion::execution::session_state::SessionStateBuilder; -//! use datafusion::prelude::SessionContext; +//! +//! Then use the extension trait: +//! ```ignore +//! use datafusion_core::execution::SessionStateBuilder; +//! use datafusion_spark::SessionStateBuilderSpark; //! //! // Create a SessionState with Apache Spark features enabled //! // note: the order matters here, `with_spark_features` should be @@ -114,17 +118,19 @@ //! .with_default_features() //! .with_spark_features() //! .build(); -//! -//! // Create a SessionContext using this state -//! let ctx = SessionContext::new_with_state(state); //! ``` //! -//! [`with_spark_features`]: https://docs.rs/datafusion/latest/datafusion/execution/session_state/struct.SessionStateBuilder.html#method.with_spark_features -//! [`SessionStateBuilder`]: https://docs.rs/datafusion/latest/datafusion/execution/session_state/struct.SessionStateBuilder.html +//! [`SessionStateBuilderSpark`]: crate::SessionStateBuilderSpark pub mod function; pub mod planner; +#[cfg(feature = "core")] +mod session_state; + +#[cfg(feature = "core")] +pub use session_state::SessionStateBuilderSpark; + use datafusion_catalog::TableFunction; use datafusion_common::Result; use datafusion_execution::FunctionRegistry; diff --git a/datafusion/spark/src/session_state.rs b/datafusion/spark/src/session_state.rs new file mode 100644 index 0000000000000..e735dfb04870c --- /dev/null +++ b/datafusion/spark/src/session_state.rs @@ -0,0 +1,116 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Extension trait for adding Apache Spark features to [`SessionStateBuilder`]. +//! +//! This module provides the [`SessionStateBuilderSpark`] trait which adds +//! Apache Spark compatibility features to DataFusion's `SessionStateBuilder`. +//! +//! # Example +//! +//! ```rust +//! use datafusion_core::execution::session_state::SessionStateBuilder; +//! use datafusion_spark::SessionStateBuilderSpark; +//! +//! // Create a SessionState with Apache Spark features enabled +//! // note: the order matters here, `with_spark_features` should be +//! // called after `with_default_features` to overwrite any existing functions +//! let state = SessionStateBuilder::new() +//! .with_default_features() +//! .with_spark_features() +//! .build(); +//! ``` + +use std::collections::HashMap; +use std::sync::Arc; + +use datafusion::execution::session_state::SessionStateBuilder; + +use crate::planner::SparkFunctionPlanner; +use crate::{ + all_default_aggregate_functions, all_default_scalar_functions, + all_default_table_functions, all_default_window_functions, +}; + +/// Extension trait for adding Apache Spark features to [`SessionStateBuilder`]. +/// +/// This trait provides a convenient way to register all Apache Spark-compatible +/// functions and planners with a DataFusion session. +pub trait SessionStateBuilderSpark { + /// Adds all expr_planners, scalar, aggregate, window and table functions + /// compatible with Apache Spark. + /// + /// Note: This overwrites any previously registered items with the same name. + fn with_spark_features(self) -> Self; +} + +impl SessionStateBuilderSpark for SessionStateBuilder { + fn with_spark_features(mut self) -> Self { + self.expr_planners() + .get_or_insert_with(Vec::new) + // planners are evaluated in order of insertion. 
Push Apache Spark function planner to the front + // to take precedence over others + .insert(0, Arc::new(SparkFunctionPlanner)); + + self.scalar_functions() + .get_or_insert_with(Vec::new) + .extend(all_default_scalar_functions()); + + self.aggregate_functions() + .get_or_insert_with(Vec::new) + .extend(all_default_aggregate_functions()); + + self.window_functions() + .get_or_insert_with(Vec::new) + .extend(all_default_window_functions()); + + self.table_functions() + .get_or_insert_with(HashMap::new) + .extend( + all_default_table_functions() + .into_iter() + .map(|f| (f.name().to_string(), f)), + ); + + self + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_session_state_with_spark_features() { + let state = SessionStateBuilder::new().with_spark_features().build(); + + assert!( + state.scalar_functions().contains_key("sha2"), + "Apache Spark scalar function 'sha2' should be registered" + ); + + assert!( + state.aggregate_functions().contains_key("try_sum"), + "Apache Spark aggregate function 'try_sum' should be registered" + ); + + assert!( + !state.expr_planners().is_empty(), + "Apache Spark expr planners should be registered" + ); + } +} diff --git a/datafusion/sqllogictest/Cargo.toml b/datafusion/sqllogictest/Cargo.toml index 4c8860ab40270..ca5c126b91d1a 100644 --- a/datafusion/sqllogictest/Cargo.toml +++ b/datafusion/sqllogictest/Cargo.toml @@ -46,7 +46,8 @@ bigdecimal = { workspace = true } bytes = { workspace = true, optional = true } chrono = { workspace = true, optional = true } clap = { version = "4.5.53", features = ["derive", "env"] } -datafusion = { workspace = true, default-features = true, features = ["avro", "spark"] } +datafusion = { workspace = true, default-features = true, features = ["avro"] } +datafusion-spark = { workspace = true, features = ["core"] } datafusion-substrait = { workspace = true, default-features = true } futures = { workspace = true } half = { workspace = true, default-features = true } diff --git a/datafusion/sqllogictest/src/test_context.rs b/datafusion/sqllogictest/src/test_context.rs index 06c63320edaaf..8bd0cabcb05b0 100644 --- a/datafusion/sqllogictest/src/test_context.rs +++ b/datafusion/sqllogictest/src/test_context.rs @@ -46,6 +46,7 @@ use datafusion::{ datasource::{MemTable, TableProvider, TableType}, prelude::{CsvReadOptions, SessionContext}, }; +use datafusion_spark::SessionStateBuilderSpark; use crate::is_spark_path; use async_trait::async_trait; From a07eab4dade2c10ab983a1a191a8d3f633cb3966 Mon Sep 17 00:00:00 2001 From: Cyprien Huet Date: Sat, 17 Jan 2026 19:27:17 +0400 Subject: [PATCH 05/10] feat: Remove datafusion-spark core feature check from CI workflow --- .github/workflows/rust.yml | 2 -- 1 file changed, 2 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 94a946d869fcf..4c66f9bc4d2ca 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -217,8 +217,6 @@ jobs: run: cargo check --profile ci --no-default-features -p datafusion --features=serde - name: Check datafusion (sql) run: cargo check --profile ci --no-default-features -p datafusion --features=sql - - name: Check datafusion-spark (core) - run: cargo check --profile ci -p datafusion-spark --features=core - name: Check datafusion (string_expressions) run: cargo check --profile ci --no-default-features -p datafusion --features=string_expressions - name: Check datafusion (unicode_expressions) From 6a9625466e1b69bd98285f9a94d97cb8db36173c Mon Sep 17 00:00:00 2001 From: Cyprien Huet Date: Sat, 17 
Jan 2026 19:28:06 +0400 Subject: [PATCH 06/10] f --- .github/workflows/rust.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 4c66f9bc4d2ca..6235679d5a05d 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -778,4 +778,4 @@ jobs: run: cargo msrv --output-format json --log-target stdout verify - name: Check datafusion-proto working-directory: datafusion/proto - run: cargo msrv --output-format json --log-target stdout verify + run: cargo msrv --output-format json --log-target stdout verify \ No newline at end of file From 5e44faf524a87ca5867b720a4895ba96751a1264 Mon Sep 17 00:00:00 2001 From: Cyprien Huet Date: Sat, 17 Jan 2026 19:29:30 +0400 Subject: [PATCH 07/10] refactor: Remove unnecessary documentation for SessionStateBuilderSpark --- datafusion/spark/src/session_state.rs | 20 -------------------- 1 file changed, 20 deletions(-) diff --git a/datafusion/spark/src/session_state.rs b/datafusion/spark/src/session_state.rs index e735dfb04870c..8e5ebd48134b8 100644 --- a/datafusion/spark/src/session_state.rs +++ b/datafusion/spark/src/session_state.rs @@ -15,26 +15,6 @@ // specific language governing permissions and limitations // under the License. -//! Extension trait for adding Apache Spark features to [`SessionStateBuilder`]. -//! -//! This module provides the [`SessionStateBuilderSpark`] trait which adds -//! Apache Spark compatibility features to DataFusion's `SessionStateBuilder`. -//! -//! # Example -//! -//! ```rust -//! use datafusion_core::execution::session_state::SessionStateBuilder; -//! use datafusion_spark::SessionStateBuilderSpark; -//! -//! // Create a SessionState with Apache Spark features enabled -//! // note: the order matters here, `with_spark_features` should be -//! // called after `with_default_features` to overwrite any existing functions -//! let state = SessionStateBuilder::new() -//! .with_default_features() -//! .with_spark_features() -//! .build(); -//! ``` - use std::collections::HashMap; use std::sync::Arc; From 078a1bf53e36c810cbe545c2c5b153cba9cc3a27 Mon Sep 17 00:00:00 2001 From: Cyprien Huet Date: Sat, 17 Jan 2026 19:38:54 +0400 Subject: [PATCH 08/10] fix: Update documentation for SessionStateBuilderSpark usage example --- datafusion/spark/src/lib.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/datafusion/spark/src/lib.rs b/datafusion/spark/src/lib.rs index f9c130cc92e76..270f56d2e3245 100644 --- a/datafusion/spark/src/lib.rs +++ b/datafusion/spark/src/lib.rs @@ -97,7 +97,7 @@ //! # Example: enabling Apache Spark features with SessionStateBuilder //! //! The recommended way to enable Apache Spark compatibility is to use the -//! [`SessionStateBuilderSpark`] extension trait. This registers all +//! `SessionStateBuilderSpark` extension trait. This registers all //! Apache Spark functions (scalar, aggregate, window, and table) as well as the Apache Spark //! expression planner. //! @@ -119,8 +119,6 @@ //! .with_spark_features() //! .build(); //! ``` -//! -//! 
[`SessionStateBuilderSpark`]: crate::SessionStateBuilderSpark pub mod function; pub mod planner; From 90faba892bbd781dfc2717c5cd63bd1d9e4938cc Mon Sep 17 00:00:00 2001 From: Cyprien Huet Date: Mon, 19 Jan 2026 12:01:20 +0400 Subject: [PATCH 09/10] fix: Correct code block syntax in documentation for SessionStateBuilderSpark --- datafusion/spark/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/spark/src/lib.rs b/datafusion/spark/src/lib.rs index 270f56d2e3245..0421c43dacdd3 100644 --- a/datafusion/spark/src/lib.rs +++ b/datafusion/spark/src/lib.rs @@ -107,7 +107,7 @@ //! ``` //! //! Then use the extension trait: -//! ```ignore +//! ```rust //! use datafusion_core::execution::SessionStateBuilder; //! use datafusion_spark::SessionStateBuilderSpark; //! From 5f89e47aeb8596253f0ef37c121158eba0f5cffe Mon Sep 17 00:00:00 2001 From: Cyprien Huet Date: Mon, 19 Jan 2026 12:53:19 +0400 Subject: [PATCH 10/10] fix: Update documentation for SessionStateBuilderSpark example and usage --- datafusion/spark/src/lib.rs | 15 ++------------- datafusion/spark/src/session_state.rs | 17 ++++++++++++++++- 2 files changed, 18 insertions(+), 14 deletions(-) diff --git a/datafusion/spark/src/lib.rs b/datafusion/spark/src/lib.rs index 0421c43dacdd3..ff32a9bb1fc69 100644 --- a/datafusion/spark/src/lib.rs +++ b/datafusion/spark/src/lib.rs @@ -106,19 +106,8 @@ //! datafusion-spark = { version = "X", features = ["core"] } //! ``` //! -//! Then use the extension trait: -//! ```rust -//! use datafusion_core::execution::SessionStateBuilder; -//! use datafusion_spark::SessionStateBuilderSpark; -//! -//! // Create a SessionState with Apache Spark features enabled -//! // note: the order matters here, `with_spark_features` should be -//! // called after `with_default_features` to overwrite any existing functions -//! let state = SessionStateBuilder::new() -//! .with_default_features() -//! .with_spark_features() -//! .build(); -//! ``` +//! Then use the extension trait - see [`SessionStateBuilderSpark::with_spark_features`] +//! for an example. pub mod function; pub mod planner; diff --git a/datafusion/spark/src/session_state.rs b/datafusion/spark/src/session_state.rs index 8e5ebd48134b8..e39de3a5888ea 100644 --- a/datafusion/spark/src/session_state.rs +++ b/datafusion/spark/src/session_state.rs @@ -18,7 +18,7 @@ use std::collections::HashMap; use std::sync::Arc; -use datafusion::execution::session_state::SessionStateBuilder; +use datafusion::execution::SessionStateBuilder; use crate::planner::SparkFunctionPlanner; use crate::{ @@ -30,6 +30,21 @@ use crate::{ /// /// This trait provides a convenient way to register all Apache Spark-compatible /// functions and planners with a DataFusion session. +/// +/// # Example +/// +/// ```rust +/// use datafusion::execution::SessionStateBuilder; +/// use datafusion_spark::SessionStateBuilderSpark; +/// +/// // Create a SessionState with Apache Spark features enabled +/// // note: the order matters here, `with_spark_features` should be +/// // called after `with_default_features` to overwrite any existing functions +/// let state = SessionStateBuilder::new() +/// .with_default_features() +/// .with_spark_features() +/// .build(); +/// ``` pub trait SessionStateBuilderSpark { /// Adds all expr_planners, scalar, aggregate, window and table functions /// compatible with Apache Spark.
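---

For reference, a minimal end-to-end sketch of the consumer-side usage this series lands on: the `SessionStateBuilderSpark` extension trait gated behind `datafusion-spark`'s `core` feature (patches 04-10). The crate versions and the Tokio runtime setup are assumptions, and `sha2` stands in for any of the registered Spark functions; the unit test in patch 04 asserts that `sha2` and `try_sum` are present after `with_spark_features`.

```rust
// Sketch of a downstream consumer of this series, under assumed
// dependencies (versions are placeholders):
//
//   [dependencies]
//   datafusion = "X"
//   datafusion-spark = { version = "X", features = ["core"] }
//   tokio = { version = "1", features = ["rt-multi-thread", "macros"] }

use datafusion::error::Result;
use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::SessionContext;
use datafusion_spark::SessionStateBuilderSpark;

#[tokio::main]
async fn main() -> Result<()> {
    // Order matters: `with_spark_features` is called after
    // `with_default_features` so the Spark variants overwrite any
    // default functions registered under the same name, and the
    // Spark expression planner is pushed to the front of the
    // planner list.
    let state = SessionStateBuilder::new()
        .with_default_features()
        .with_spark_features()
        .build();
    let ctx = SessionContext::new_with_state(state);

    // `sha2` is one of the Spark scalar functions the series' unit
    // test asserts is registered; the literal arguments here are
    // illustrative only.
    let df = ctx.sql("SELECT sha2('datafusion', 256)").await?;
    df.show().await?;
    Ok(())
}
```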