From ee74e2acd508396b868fd5a3e84567a5eb1b1bb1 Mon Sep 17 00:00:00 2001 From: Cyprien Huet Date: Tue, 20 Jan 2026 16:30:44 +0400 Subject: [PATCH 1/8] feat: add rand_distr dependency to Cargo.toml files --- Cargo.lock | 1 + Cargo.toml | 1 + datafusion/core/Cargo.toml | 2 +- datafusion/spark/Cargo.toml | 1 + 4 files changed, 4 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 0e9337b50e6f2..23973b5db5463 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2577,6 +2577,7 @@ dependencies = [ "log", "percent-encoding", "rand 0.9.2", + "rand_distr", "sha1", "url", ] diff --git a/Cargo.toml b/Cargo.toml index e2bbf2ea9885f..5c5e7e35ea45a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -177,6 +177,7 @@ pbjson-types = "0.8" # Should match arrow-flight's version of prost. prost = "0.14.1" rand = "0.9" +rand_distr = "0.5.1" recursive = "0.1.1" regex = "1.12" rstest = "0.26.1" diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 5c7e944e59f7b..e4b949538b024 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -176,7 +176,7 @@ glob = { workspace = true } insta = { workspace = true } paste = { workspace = true } rand = { workspace = true, features = ["small_rng"] } -rand_distr = "0.5" +rand_distr = { workspace = true } recursive = { workspace = true } regex = { workspace = true } rstest = { workspace = true } diff --git a/datafusion/spark/Cargo.toml b/datafusion/spark/Cargo.toml index ad2620a532f24..b5d9fd8bba7a2 100644 --- a/datafusion/spark/Cargo.toml +++ b/datafusion/spark/Cargo.toml @@ -53,6 +53,7 @@ datafusion-functions-nested = { workspace = true } log = { workspace = true } percent-encoding = "2.3.2" rand = { workspace = true } +rand_distr = { workspace = true } sha1 = "0.10" url = { workspace = true } From 6631390f44bb1cddc5df341e571263b15bb31b4d Mon Sep 17 00:00:00 2001 From: Cyprien Huet Date: Tue, 20 Jan 2026 16:33:00 +0400 Subject: [PATCH 2/8] feat(spark): Add random functions --- 
datafusion/spark/src/function/math/mod.rs | 22 ++ datafusion/spark/src/function/math/random.rs | 325 +++++++++++++++++++ 2 files changed, 347 insertions(+) create mode 100644 datafusion/spark/src/function/math/random.rs diff --git a/datafusion/spark/src/function/math/mod.rs b/datafusion/spark/src/function/math/mod.rs index 1422eb250d939..492aea540939a 100644 --- a/datafusion/spark/src/function/math/mod.rs +++ b/datafusion/spark/src/function/math/mod.rs @@ -20,6 +20,7 @@ pub mod expm1; pub mod factorial; pub mod hex; pub mod modulus; +pub mod random; pub mod rint; pub mod trigonometry; pub mod width_bucket; @@ -34,6 +35,9 @@ make_udf_function!(factorial::SparkFactorial, factorial); make_udf_function!(hex::SparkHex, hex); make_udf_function!(modulus::SparkMod, modulus); make_udf_function!(modulus::SparkPmod, pmod); +make_udf_function!(random::SparkRandom, random); +make_udf_function!(random::SparkRandN, randn); +make_udf_function!(random::SparkRandStr, randstr); make_udf_function!(rint::SparkRint, rint); make_udf_function!(width_bucket::SparkWidthBucket, width_bucket); make_udf_function!(trigonometry::SparkCsc, csc); @@ -60,6 +64,21 @@ pub mod expr_fn { export_functions!((width_bucket, "Returns the bucket number into which the value of this expression would fall after being evaluated.", arg1 arg2 arg3 arg4)); export_functions!((csc, "Returns the cosecant of expr.", arg1)); export_functions!((sec, "Returns the secant of expr.", arg1)); + export_functions!(( + random, + "Returns a random float value sampled from a uniform distribution in [0, 1).", + opt_seed + )); + export_functions!(( + randn, + "Returns a random float value sampled from the standard normal distribution.", + opt_seed + )); + export_functions!(( + randstr, + "Returns a random string of the specified length.", + length opt_seed + )); } pub fn functions() -> Vec<Arc<ScalarUDF>> { @@ -74,5 +93,8 @@ pub fn functions() -> Vec<Arc<ScalarUDF>> { width_bucket(), csc(), sec(), + random(), + randn(), + randstr(), ] } diff --git 
a/datafusion/spark/src/function/math/random.rs b/datafusion/spark/src/function/math/random.rs new file mode 100644 index 0000000000000..1ea4854708a8e --- /dev/null +++ b/datafusion/spark/src/function/math/random.rs @@ -0,0 +1,325 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::any::Any; +use std::sync::Arc; + +use arrow::array::{Float64Array, StringArray}; +use arrow::datatypes::{DataType, Field, FieldRef}; +use datafusion_common::types::{NativeType, logical_int32, logical_int64}; +use datafusion_common::utils::take_function_args; +use datafusion_common::{Result, ScalarValue, exec_err, internal_err, plan_err}; +use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyContext}; +use datafusion_expr::{ + Coercion, ColumnarValue, Expr, ReturnFieldArgs, ScalarFunctionArgs, TypeSignature, + TypeSignatureClass, +}; +use datafusion_expr::{ScalarUDFImpl, Signature, Volatility}; +use datafusion_functions::expr_fn::random; +use rand::rngs::SmallRng; +use rand::{Rng, RngCore, SeedableRng, rng}; +use rand_distr::{Alphanumeric, StandardNormal, Uniform}; + +/// +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct SparkRandom { + signature: Signature, + aliases: Vec, +} + +impl Default for SparkRandom { + fn default() -> Self { + SparkRandom::new() + } +} + +impl SparkRandom { + pub fn new() -> Self { + Self { + signature: Signature::one_of( + vec![ + TypeSignature::Nullary, + TypeSignature::Coercible(vec![Coercion::new_implicit( + TypeSignatureClass::Native(logical_int64()), + vec![TypeSignatureClass::Integer], + NativeType::Int64, + )]), + ], + Volatility::Volatile, + ), + aliases: vec!["rand".to_string()], + } + } +} + +impl ScalarUDFImpl for SparkRandom { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "random" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn aliases(&self) -> &[String] { + &self.aliases + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + internal_err!("return_field_from_args should be used instead") + } + + fn return_field_from_args(&self, _args: ReturnFieldArgs) -> Result { + Ok(Arc::new(Field::new(self.name(), DataType::Float64, false))) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + let [seed] = 
take_function_args(self.name(), args.args)?; + + let seed = match seed { + ColumnarValue::Scalar(ScalarValue::Int64(Some(val))) => val as u64, + ColumnarValue::Scalar(ScalarValue::Int64(None)) => 0, + _ => { + return exec_err!( + "`{}` function expects an Int64 or Int32 seed argument", + self.name() + ); + } + }; + + let mut rng = SmallRng::seed_from_u64(seed); + let uniform = + Uniform::new(0.0, 1.0).expect("Failed to create uniform distribution"); + + let array: Float64Array = (0..args.number_rows) + .map(|_| Some(rng.sample(uniform))) + .collect(); + + Ok(ColumnarValue::Array(Arc::new(array))) + } + + fn simplify( + &self, + args: Vec, + _info: &SimplifyContext, + ) -> Result { + // if no seed is provided, we can simplify to Datafusion built-in random() + match args.len() { + 0 => Ok(ExprSimplifyResult::Simplified(random())), + 1 => Ok(ExprSimplifyResult::Original(args)), + _ => plan_err!("`{}` function expects 0 or 1 argument(s)", self.name()), + } + } +} + +/// +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct SparkRandN { + signature: Signature, +} + +impl Default for SparkRandN { + fn default() -> Self { + SparkRandN::new() + } +} + +impl SparkRandN { + pub fn new() -> Self { + Self { + signature: Signature::one_of( + vec![ + TypeSignature::Nullary, + TypeSignature::Coercible(vec![Coercion::new_implicit( + TypeSignatureClass::Native(logical_int64()), + vec![TypeSignatureClass::Integer], + NativeType::Int64, + )]), + ], + Volatility::Volatile, + ), + } + } +} + +impl ScalarUDFImpl for SparkRandN { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "randn" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + internal_err!("return_field_from_args should be used instead") + } + + fn return_field_from_args(&self, _args: ReturnFieldArgs) -> Result { + Ok(Arc::new(Field::new(self.name(), DataType::Float64, false))) + } + + fn invoke_with_args(&self, args: 
ScalarFunctionArgs) -> Result { + let seed = match args.args.len() { + // Apache Spark uses a random seed when none is provided + 0 => rng().next_u64(), + 1 => match args.args[0] { + ColumnarValue::Scalar(ScalarValue::Int64(Some(val))) => val as u64, + // Apache Spark uses a seed of 0 when NULL is provided + ColumnarValue::Scalar(ScalarValue::Int64(None)) => 0, + _ => { + return exec_err!( + "`{}` function expects an Int64 seed argument", + self.name() + ); + } + }, + _ => { + return exec_err!( + "`{}` function expects 0 or 1 argument(s)", + self.name() + ); + } + }; + + let mut rng = SmallRng::seed_from_u64(seed); + let array: Float64Array = (0..args.number_rows) + .map(|_| Some(rng.sample(StandardNormal))) + .collect(); + Ok(ColumnarValue::Array(Arc::new(array))) + } +} + +/// +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct SparkRandStr { + signature: Signature, +} + +impl Default for SparkRandStr { + fn default() -> Self { + SparkRandStr::new() + } +} + +impl SparkRandStr { + pub fn new() -> Self { + Self { + signature: Signature::one_of( + vec![ + TypeSignature::Coercible(vec![Coercion::new_implicit( + TypeSignatureClass::Native(logical_int32()), + vec![TypeSignatureClass::Integer], + NativeType::Int32, + )]), + TypeSignature::Coercible(vec![ + Coercion::new_implicit( + TypeSignatureClass::Native(logical_int32()), + vec![TypeSignatureClass::Integer], + NativeType::Int32, + ), + Coercion::new_implicit( + TypeSignatureClass::Native(logical_int64()), + vec![TypeSignatureClass::Integer], + NativeType::Int64, + ), + ]), + ], + Volatility::Volatile, + ), + } + } +} + +impl ScalarUDFImpl for SparkRandStr { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "randstr" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + internal_err!("return_field_from_args should be used instead") + } + + fn return_field_from_args(&self, _args: ReturnFieldArgs) -> Result { + 
Ok(Arc::new(Field::new(self.name(), DataType::Utf8, false))) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + let length = match args.args[0] { + ColumnarValue::Scalar(ScalarValue::Int32(Some(val))) if val > 0 => { + val as usize + } + ColumnarValue::Scalar(ScalarValue::Int32(None)) => 10, + _ => { + return exec_err!( + "`{}` function expects a positive Int32 length argument", + self.name() + ); + } + }; + + let seed = match args.args.len() { + // Apache Spark uses a random seed when none is provided + 1 => rng().next_u64(), + 2 => match args.args[1] { + ColumnarValue::Scalar(ScalarValue::Int64(Some(val))) => val as u64, + // Apache Spark uses a seed of 0 when NULL is provided + ColumnarValue::Scalar(ScalarValue::Int64(None)) => 0, + _ => { + return exec_err!( + "`{}` function expects an Int64 seed argument", + self.name() + ); + } + }, + _ => { + return exec_err!( + "`{}` function expects 1 or 2 argument(s)", + self.name() + ); + } + }; + + let mut rng = SmallRng::seed_from_u64(seed); + let values: StringArray = (0..args.number_rows) + .map(|_| { + let s: String = (0..length) + .map(|_| rng.sample(Alphanumeric) as char) + .collect(); + Some(s) + }) + .collect(); + + Ok(ColumnarValue::Array(Arc::new(values))) + } +} From 51efae4d50a246e947a70b113554e8051a36be9b Mon Sep 17 00:00:00 2001 From: Cyprien Huet Date: Tue, 20 Jan 2026 16:33:12 +0400 Subject: [PATCH 3/8] test(spark): test random functions --- .../test_files/spark/math/rand.slt | 37 ------ .../test_files/spark/math/randn.slt | 37 ------ .../test_files/spark/math/random.slt | 105 +++++++++++++++--- 3 files changed, 91 insertions(+), 88 deletions(-) delete mode 100644 datafusion/sqllogictest/test_files/spark/math/rand.slt delete mode 100644 datafusion/sqllogictest/test_files/spark/math/randn.slt diff --git a/datafusion/sqllogictest/test_files/spark/math/rand.slt b/datafusion/sqllogictest/test_files/spark/math/rand.slt deleted file mode 100644 index 53b4c6f822218..0000000000000 --- 
a/datafusion/sqllogictest/test_files/spark/math/rand.slt +++ /dev/null @@ -1,37 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at - -# http://www.apache.org/licenses/LICENSE-2.0 - -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -# This file was originally created by a porting script from: -# https://github.com/lakehq/sail/tree/43b6ed8221de5c4c4adbedbb267ae1351158b43c/crates/sail-spark-connect/tests/gold_data/function -# This file is part of the implementation of the datafusion-spark function library. 
-# For more information, please see: -# https://github.com/apache/datafusion/issues/15914 - -## Original Query: SELECT rand(); -## PySpark 3.5.5 Result: {'rand()': 0.949892358232337, 'typeof(rand())': 'double'} -#query -#SELECT rand(); - -## Original Query: SELECT rand(0); -## PySpark 3.5.5 Result: {'rand(0)': 0.7604953758285915, 'typeof(rand(0))': 'double', 'typeof(0)': 'int'} -#query -#SELECT rand(0::int); - -## Original Query: SELECT rand(null); -## PySpark 3.5.5 Result: {'rand(NULL)': 0.7604953758285915, 'typeof(rand(NULL))': 'double', 'typeof(NULL)': 'void'} -#query -#SELECT rand(NULL::void); diff --git a/datafusion/sqllogictest/test_files/spark/math/randn.slt b/datafusion/sqllogictest/test_files/spark/math/randn.slt deleted file mode 100644 index daf81babd02c4..0000000000000 --- a/datafusion/sqllogictest/test_files/spark/math/randn.slt +++ /dev/null @@ -1,37 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at - -# http://www.apache.org/licenses/LICENSE-2.0 - -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -# This file was originally created by a porting script from: -# https://github.com/lakehq/sail/tree/43b6ed8221de5c4c4adbedbb267ae1351158b43c/crates/sail-spark-connect/tests/gold_data/function -# This file is part of the implementation of the datafusion-spark function library. 
-# For more information, please see: -# https://github.com/apache/datafusion/issues/15914 - -## Original Query: SELECT randn(); -## PySpark 3.5.5 Result: {'randn()': 1.498983714060803, 'typeof(randn())': 'double'} -#query -#SELECT randn(); - -## Original Query: SELECT randn(0); -## PySpark 3.5.5 Result: {'randn(0)': 1.6034991609278433, 'typeof(randn(0))': 'double', 'typeof(0)': 'int'} -#query -#SELECT randn(0::int); - -## Original Query: SELECT randn(null); -## PySpark 3.5.5 Result: {'randn(NULL)': 1.6034991609278433, 'typeof(randn(NULL))': 'double', 'typeof(NULL)': 'void'} -#query -#SELECT randn(NULL::void); diff --git a/datafusion/sqllogictest/test_files/spark/math/random.slt b/datafusion/sqllogictest/test_files/spark/math/random.slt index 280a81b8888c0..21d1e0ff6e018 100644 --- a/datafusion/sqllogictest/test_files/spark/math/random.slt +++ b/datafusion/sqllogictest/test_files/spark/math/random.slt @@ -21,17 +21,94 @@ # For more information, please see: # https://github.com/apache/datafusion/issues/15914 -## Original Query: SELECT random(); -## PySpark 3.5.5 Result: {'rand()': 0.7460731389309176, 'typeof(rand())': 'double'} -#query -#SELECT random(); - -## Original Query: SELECT random(0); -## PySpark 3.5.5 Result: {'rand(0)': 0.7604953758285915, 'typeof(rand(0))': 'double', 'typeof(0)': 'int'} -#query -#SELECT random(0::int); - -## Original Query: SELECT random(null); -## PySpark 3.5.5 Result: {'rand(NULL)': 0.7604953758285915, 'typeof(rand(NULL))': 'double', 'typeof(NULL)': 'void'} -#query -#SELECT random(NULL::void); +query R +SELECT random(42::integer); +---- +0.81430514512291 + +query R +SELECT random(0::integer); +---- +0.324575268031407 + +query R +SELECT random(NULL::integer); +---- +0.324575268031407 + +query BBB +SELECT + random() BETWEEN 0.0 AND 1.0, + random() = random(), + random(1) = random(1); +---- +true false true + +query R +SELECT rand(42::integer); +---- +0.81430514512291 + +query R +SELECT rand(0::integer); +---- +0.324575268031407 + +query R 
+SELECT rand(NULL::integer); +---- +0.324575268031407 + +query BBB +SELECT + rand() BETWEEN 0.0 AND 1.0, + rand() = rand(), + rand(1) = rand(1); +---- +true false true + + +query R +SELECT randn(42::integer); +---- +0.834397546843796 + +query R +SELECT randn(0::integer); +---- +-0.292803298604708 + +query R +SELECT randn(NULL::integer); +---- +-0.292803298604708 + + +query BB +SELECT + randn() = randn(), + randn(1) = randn(1); +---- +false true + +query T +SELECT randstr(3::integer, 42::integer); +---- +0Us + +query T +SELECT randstr(3::integer, 0::integer); +---- +UYX + +query T +SELECT randstr(3::integer, NULL::integer); +---- +UYX + +query BB +SELECT + randstr(10) = randstr(10), + randstr(10, 2) = randstr(10, 2); +---- +false true From c833133f120849e27d09ec7f12bc5ae5b14548fc Mon Sep 17 00:00:00 2001 From: Cyprien Huet Date: Tue, 20 Jan 2026 17:39:09 +0400 Subject: [PATCH 4/8] feat(spark): add uniform function --- datafusion/spark/src/function/math/mod.rs | 7 + datafusion/spark/src/function/math/random.rs | 134 +++++++++++++++++- .../test_files/spark/math/random.slt | 54 ++++++- 3 files changed, 183 insertions(+), 12 deletions(-) diff --git a/datafusion/spark/src/function/math/mod.rs b/datafusion/spark/src/function/math/mod.rs index 492aea540939a..9c3cf0a518cca 100644 --- a/datafusion/spark/src/function/math/mod.rs +++ b/datafusion/spark/src/function/math/mod.rs @@ -39,6 +39,7 @@ make_udf_function!(random::SparkRandom, random); make_udf_function!(random::SparkRandN, randn); make_udf_function!(random::SparkRandStr, randstr); make_udf_function!(rint::SparkRint, rint); +make_udf_function!(random::SparkUniform, uniform); make_udf_function!(width_bucket::SparkWidthBucket, width_bucket); make_udf_function!(trigonometry::SparkCsc, csc); make_udf_function!(trigonometry::SparkSec, sec); @@ -79,6 +80,11 @@ pub mod expr_fn { "Returns a random string of the specified length.", length opt_seed )); + export_functions!(( + uniform, + "Returns a random float value sampled from 
a uniform distribution in [`min`, `max`).", + min max opt_seed + )); } pub fn functions() -> Vec<Arc<ScalarUDF>> { @@ -96,5 +102,6 @@ pub fn functions() -> Vec<Arc<ScalarUDF>> { random(), randn(), randstr(), + uniform(), ] } diff --git a/datafusion/spark/src/function/math/random.rs b/datafusion/spark/src/function/math/random.rs index 1ea4854708a8e..6672247208f3d 100644 --- a/datafusion/spark/src/function/math/random.rs +++ b/datafusion/spark/src/function/math/random.rs @@ -16,20 +16,24 @@ // under the License. use std::any::Any; +use std::ops::{Add, Mul, Sub}; use std::sync::Arc; use arrow::array::{Float64Array, StringArray}; use arrow::datatypes::{DataType, Field, FieldRef}; use datafusion_common::types::{NativeType, logical_int32, logical_int64}; use datafusion_common::utils::take_function_args; -use datafusion_common::{Result, ScalarValue, exec_err, internal_err, plan_err}; +use datafusion_common::{ + Result, ScalarValue, exec_err, internal_datafusion_err, internal_err, plan_err, +}; +use datafusion_expr::binary::binary_numeric_coercion; +use datafusion_expr::expr::ScalarFunction; use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyContext}; use datafusion_expr::{ - Coercion, ColumnarValue, Expr, ReturnFieldArgs, ScalarFunctionArgs, TypeSignature, - TypeSignatureClass, + Coercion, ColumnarValue, Expr, ExprSchemable, ReturnFieldArgs, ScalarFunctionArgs, + TypeSignature, TypeSignatureClass, }; use datafusion_expr::{ScalarUDFImpl, Signature, Volatility}; -use datafusion_functions::expr_fn::random; use rand::rngs::SmallRng; use rand::{Rng, RngCore, SeedableRng, rng}; use rand_distr::{Alphanumeric, StandardNormal, Uniform}; @@ -123,7 +127,9 @@ impl ScalarUDFImpl for SparkRandom { ) -> Result<ExprSimplifyResult> { // if no seed is provided, we can simplify to Datafusion built-in random() match args.len() { - 0 => Ok(ExprSimplifyResult::Simplified(random())), + 0 => Ok(ExprSimplifyResult::Simplified( + datafusion_functions::expr_fn::random(), + )), 1 => Ok(ExprSimplifyResult::Original(args)), _ => 
plan_err!("`{}` function expects 0 or 1 argument(s)", self.name()), } @@ -323,3 +329,121 @@ impl ScalarUDFImpl for SparkRandStr { Ok(ColumnarValue::Array(Arc::new(values))) } } + +/// +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct SparkUniform { + signature: Signature, +} + +impl Default for SparkUniform { + fn default() -> Self { + SparkUniform::new() + } +} + +impl SparkUniform { + pub fn new() -> Self { + Self { + signature: Signature::one_of( + vec![ + TypeSignature::Coercible(vec![ + Coercion::new_exact(TypeSignatureClass::Numeric), + Coercion::new_exact(TypeSignatureClass::Numeric), + ]), + TypeSignature::Coercible(vec![ + Coercion::new_exact(TypeSignatureClass::Numeric), + Coercion::new_exact(TypeSignatureClass::Numeric), + Coercion::new_implicit( + TypeSignatureClass::Native(logical_int64()), + vec![TypeSignatureClass::Integer], + NativeType::Int64, + ), + ]), + ], + Volatility::Volatile, + ), + } + } + + fn get_return_type(&self, min: &DataType, max: &DataType) -> Result { + let return_type = binary_numeric_coercion(min, max).ok_or_else(|| { + internal_datafusion_err!("Incompatible types for {} function", self.name()) + })?; + Ok(return_type) + } +} + +impl ScalarUDFImpl for SparkUniform { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "uniform" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + internal_err!("return_field_from_args should be used instead") + } + + fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result { + let nullable = args.arg_fields.iter().any(|f| f.is_nullable()); + if args.arg_fields.iter().any(|f| f.data_type().is_null()) { + return Ok(Arc::new(Field::new(self.name(), DataType::Null, nullable))); + } + + let return_type = self.get_return_type( + args.arg_fields[0].data_type(), + args.arg_fields[1].data_type(), + )?; + Ok(Arc::new(Field::new(self.name(), return_type, nullable))) + } + + fn invoke_with_args(&self, 
_args: ScalarFunctionArgs) -> Result { + internal_err!("`invoke_with_args` is not implemented for {}", self.name()) + } + + fn simplify( + &self, + args: Vec, + info: &SimplifyContext, + ) -> Result { + let rand_expr = match args.len() { + 2 => Expr::ScalarFunction(ScalarFunction::new_udf( + Arc::new(SparkRandom::new().into()), + vec![], + )), + 3 => Expr::ScalarFunction(ScalarFunction::new_udf( + Arc::new(SparkRandom::new().into()), + vec![args[2].clone()], + )), + _ => { + return plan_err!( + "`{}` function expects 2 or 3 argument(s)", + self.name() + ); + } + }; + + let min = args[0].clone(); + let max = args[1].clone(); + let (_, min_field) = min.to_field(info.schema())?; + let (_, max_field) = max.to_field(info.schema())?; + let return_type = + self.get_return_type(min_field.data_type(), max_field.data_type())?; + + let min = min.cast_to(&DataType::Float64, info.schema())?; + let max = max.cast_to(&DataType::Float64, info.schema())?; + + Ok(ExprSimplifyResult::Simplified( + min.clone() + .add((max.sub(min)).mul(rand_expr)) + .cast_to(&return_type, info.schema())?, + )) + } +} diff --git a/datafusion/sqllogictest/test_files/spark/math/random.slt b/datafusion/sqllogictest/test_files/spark/math/random.slt index 21d1e0ff6e018..6f45386527418 100644 --- a/datafusion/sqllogictest/test_files/spark/math/random.slt +++ b/datafusion/sqllogictest/test_files/spark/math/random.slt @@ -15,12 +15,6 @@ # specific language governing permissions and limitations # under the License. -# This file was originally created by a porting script from: -# https://github.com/lakehq/sail/tree/43b6ed8221de5c4c4adbedbb267ae1351158b43c/crates/sail-spark-connect/tests/gold_data/function -# This file is part of the implementation of the datafusion-spark function library. 
-# For more information, please see: -# https://github.com/apache/datafusion/issues/15914 - query R SELECT random(42::integer); ---- @@ -44,6 +38,7 @@ SELECT ---- true false true + query R SELECT rand(42::integer); ---- @@ -83,7 +78,6 @@ SELECT randn(NULL::integer); ---- -0.292803298604708 - query BB SELECT randn() = randn(), @@ -91,6 +85,7 @@ SELECT ---- false true + query T SELECT randstr(3::integer, 42::integer); ---- @@ -112,3 +107,48 @@ SELECT randstr(10, 2) = randstr(10, 2); ---- false true + + +query I +SELECT uniform(3::integer, 42::integer, 42::integer); +---- +34 + +query I +SELECT uniform(0::integer, 34::integer, 12::integer); +---- +19 + +query I +SELECT uniform(3::integer, 0::integer, 12::integer); +---- +1 + +query R +SELECT uniform(3::float, 42::integer, 42::integer); +---- +34.7579 + +query R +SELECT uniform(3::double, 42::integer, 42::integer); +---- +34.75790065979348 + +query I +SELECT uniform(3::integer, NULL::integer); +---- +NULL + +query I +SELECT uniform(NULL::integer, 3::integer); +---- +NULL + +query BBBB +SELECT + uniform(0, 1) BETWEEN 0 AND 1, + uniform(0.0, 1.0) BETWEEN 0.0 AND 1.0, + uniform(0.0, 1.0) = uniform(0.0, 1.0), + uniform(10, 2, 23) = uniform(10, 2, 23); +---- +true true false true From fc6f9a63dd60b965337b0b7af4a19da8dd86663f Mon Sep 17 00:00:00 2001 From: Cyprien Huet Date: Tue, 20 Jan 2026 17:47:15 +0400 Subject: [PATCH 5/8] fix(spark): update error messages for random functions to specify argument types --- datafusion/spark/src/function/math/random.rs | 3 +-- datafusion/sqllogictest/test_files/spark/math/random.slt | 6 ++++++ 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/datafusion/spark/src/function/math/random.rs b/datafusion/spark/src/function/math/random.rs index 6672247208f3d..8b78eab7af7d2 100644 --- a/datafusion/spark/src/function/math/random.rs +++ b/datafusion/spark/src/function/math/random.rs @@ -103,7 +103,7 @@ impl ScalarUDFImpl for SparkRandom { ColumnarValue::Scalar(ScalarValue::Int64(None)) 
=> 0, _ => { return exec_err!( - "`{}` function expects an Int64 or Int32 seed argument", + "`{}` function expects an Int64 seed argument", self.name() ); } @@ -285,7 +285,6 @@ impl ScalarUDFImpl for SparkRandStr { ColumnarValue::Scalar(ScalarValue::Int32(Some(val))) if val > 0 => { val as usize } - ColumnarValue::Scalar(ScalarValue::Int32(None)) => 10, _ => { return exec_err!( "`{}` function expects a positive Int32 length argument", diff --git a/datafusion/sqllogictest/test_files/spark/math/random.slt b/datafusion/sqllogictest/test_files/spark/math/random.slt index 6f45386527418..1ccace29ce319 100644 --- a/datafusion/sqllogictest/test_files/spark/math/random.slt +++ b/datafusion/sqllogictest/test_files/spark/math/random.slt @@ -101,6 +101,12 @@ SELECT randstr(3::integer, NULL::integer); ---- UYX +query error Execution error: `randstr` function expects a positive Int32 length argument +SELECT randstr(NULL::integer, 23::integer); + +query error Execution error: `randstr` function expects a positive Int32 length argument +SELECT randstr(-2::integer, 23::integer); + query BB SELECT randstr(10) = randstr(10), From 7536adea817c871cbf6fc185ab5d9325613b3148 Mon Sep 17 00:00:00 2001 From: Cyprien Huet Date: Tue, 20 Jan 2026 17:52:53 +0400 Subject: [PATCH 6/8] fix(spark): update SparkRandom instantiation to use crate::function::math::random --- datafusion/spark/src/function/math/random.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/spark/src/function/math/random.rs b/datafusion/spark/src/function/math/random.rs index 8b78eab7af7d2..eabc3dda7430c 100644 --- a/datafusion/spark/src/function/math/random.rs +++ b/datafusion/spark/src/function/math/random.rs @@ -414,11 +414,11 @@ impl ScalarUDFImpl for SparkUniform { ) -> Result { let rand_expr = match args.len() { 2 => Expr::ScalarFunction(ScalarFunction::new_udf( - Arc::new(SparkRandom::new().into()), + crate::function::math::random(), vec![], )), 3 => 
Expr::ScalarFunction(ScalarFunction::new_udf( - Arc::new(SparkRandom::new().into()), + crate::function::math::random(), vec![args[2].clone()], )), _ => { From a28defaae99f1e139155a67723e44976d7db0f43 Mon Sep 17 00:00:00 2001 From: Cyprien Huet Date: Wed, 21 Jan 2026 14:07:27 +0400 Subject: [PATCH 7/8] fix --- datafusion/spark/src/function/math/random.rs | 8 ++++---- datafusion/sqllogictest/test_files/spark/math/random.slt | 5 +++++ 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/datafusion/spark/src/function/math/random.rs b/datafusion/spark/src/function/math/random.rs index eabc3dda7430c..3bf67768e1cce 100644 --- a/datafusion/spark/src/function/math/random.rs +++ b/datafusion/spark/src/function/math/random.rs @@ -103,7 +103,7 @@ impl ScalarUDFImpl for SparkRandom { ColumnarValue::Scalar(ScalarValue::Int64(None)) => 0, _ => { return exec_err!( - "`{}` function expects an Int64 seed argument", + "`{}` function expects a constant Int64 seed argument", self.name() ); } @@ -197,7 +197,7 @@ impl ScalarUDFImpl for SparkRandN { ColumnarValue::Scalar(ScalarValue::Int64(None)) => 0, _ => { return exec_err!( - "`{}` function expects an Int64 seed argument", + "`{}` function expects a constant Int64 seed argument", self.name() ); } @@ -282,12 +282,12 @@ impl ScalarUDFImpl for SparkRandStr { fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> { let length = match args.args[0] { - ColumnarValue::Scalar(ScalarValue::Int32(Some(val))) if val > 0 => { + ColumnarValue::Scalar(ScalarValue::Int32(Some(val))) if val >= 0 => { val as usize } _ => { return exec_err!( - "`{}` function expects a positive Int32 length argument", + "`{}` function expects a constant positive Int32 length argument", self.name() ); } diff --git a/datafusion/sqllogictest/test_files/spark/math/random.slt b/datafusion/sqllogictest/test_files/spark/math/random.slt index 1ccace29ce319..57a81ffb729b9 100644 --- a/datafusion/sqllogictest/test_files/spark/math/random.slt +++ 
b/datafusion/sqllogictest/test_files/spark/math/random.slt @@ -101,6 +101,11 @@ SELECT randstr(3::integer, NULL::integer); ---- UYX +query T +SELECT randstr(0::integer); +---- +(empty) + query error Execution error: `randstr` function expects a positive Int32 length argument SELECT randstr(NULL::integer, 23::integer); From b5870f339d84a40128a0d47dc2704e99ab881f88 Mon Sep 17 00:00:00 2001 From: Cyprien Huet Date: Wed, 21 Jan 2026 14:20:51 +0400 Subject: [PATCH 8/8] fix(spark): update error messages to specify constant arguments for random functions --- datafusion/spark/src/function/math/random.rs | 6 ++++- .../test_files/spark/math/random.slt | 27 +++++++++++++++++-- 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/datafusion/spark/src/function/math/random.rs b/datafusion/spark/src/function/math/random.rs index 3bf67768e1cce..385ee0f23f66d 100644 --- a/datafusion/spark/src/function/math/random.rs +++ b/datafusion/spark/src/function/math/random.rs @@ -302,7 +302,7 @@ impl ScalarUDFImpl for SparkRandStr { ColumnarValue::Scalar(ScalarValue::Int64(None)) => 0, _ => { return exec_err!( - "`{}` function expects an Int64 seed argument", + "`{}` function expects a constant Int64 seed argument", self.name() ); } @@ -429,6 +429,10 @@ impl ScalarUDFImpl for SparkUniform { } }; + if args.iter().any(|e| e.as_literal().is_none()) { + return plan_err!("arguments of `{}` should be literals", self.name()); + } + let min = args[0].clone(); let max = args[1].clone(); let (_, min_field) = min.to_field(info.schema())?; diff --git a/datafusion/sqllogictest/test_files/spark/math/random.slt b/datafusion/sqllogictest/test_files/spark/math/random.slt index 57a81ffb729b9..1f319e1f94837 100644 --- a/datafusion/sqllogictest/test_files/spark/math/random.slt +++ b/datafusion/sqllogictest/test_files/spark/math/random.slt @@ -38,6 +38,10 @@ SELECT ---- true false true +query error `random` function expects a constant Int64 seed argument +SELECT random(column1) +FROM VALUES +(1); query 
R SELECT rand(42::integer); @@ -85,6 +89,11 @@ SELECT ---- false true +query error `randn` function expects a constant Int64 seed argument +SELECT randn(column1) +FROM VALUES +(1); + query T SELECT randstr(3::integer, 42::integer); @@ -106,10 +115,10 @@ SELECT randstr(0::integer); ---- (empty) -query error Execution error: `randstr` function expects a positive Int32 length argument +query error Execution error: `randstr` function expects a constant positive Int32 length argument SELECT randstr(NULL::integer, 23::integer); -query error Execution error: `randstr` function expects a positive Int32 length argument +query error Execution error: `randstr` function expects a constant positive Int32 length argument SELECT randstr(-2::integer, 23::integer); query BB @@ -119,6 +128,15 @@ SELECT ---- false true +query error `randstr` function expects a constant positive Int32 length argument +SELECT randstr(column1, 12::integer) +FROM VALUES +(1); + +query error `randstr` function expects a constant Int64 seed argument +SELECT randstr(12::integer, column1) +FROM VALUES +(1); query I SELECT uniform(3::integer, 42::integer, 42::integer); @@ -163,3 +181,8 @@ SELECT uniform(10, 2, 23) = uniform(10, 2, 23); ---- true true false true + +query error Error during planning: arguments of `uniform` should be literals +SELECT uniform(column1, column2) +FROM VALUES +(1, 10);