From ae38f66f328e0a9a2fa28124431c520c769584b7 Mon Sep 17 00:00:00 2001 From: Cyprien Huet Date: Tue, 20 Jan 2026 11:57:02 +0400 Subject: [PATCH 1/4] feat(spark): add unix date and timestamp functions --- datafusion/spark/src/function/datetime/mod.rs | 29 +++ .../spark/src/function/datetime/unix.rs | 188 ++++++++++++++++++ .../test_files/spark/datetime/unix.slt | 134 +++++++++++++ 3 files changed, 351 insertions(+) create mode 100644 datafusion/spark/src/function/datetime/unix.rs create mode 100644 datafusion/sqllogictest/test_files/spark/datetime/unix.slt diff --git a/datafusion/spark/src/function/datetime/mod.rs b/datafusion/spark/src/function/datetime/mod.rs index 3535df963e209..315735f1a50fe 100644 --- a/datafusion/spark/src/function/datetime/mod.rs +++ b/datafusion/spark/src/function/datetime/mod.rs @@ -30,6 +30,7 @@ pub mod next_day; pub mod time_trunc; pub mod to_utc_timestamp; pub mod trunc; +pub mod unix; use datafusion_expr::ScalarUDF; use datafusion_functions::make_udf_function; @@ -55,6 +56,10 @@ make_udf_function!(next_day::SparkNextDay, next_day); make_udf_function!(time_trunc::SparkTimeTrunc, time_trunc); make_udf_function!(to_utc_timestamp::SparkToUtcTimestamp, to_utc_timestamp); make_udf_function!(trunc::SparkTrunc, trunc); +make_udf_function!(unix::SparkUnixDate, unix_date); +make_udf_function!(unix::SparkUnixMicros, unix_micros); +make_udf_function!(unix::SparkUnixMillis, unix_millis); +make_udf_function!(unix::SparkUnixSeconds, unix_seconds); pub mod expr_fn { use datafusion_functions::export_functions; @@ -142,6 +147,26 @@ pub mod expr_fn { "Interpret a given timestamp `ts` in timezone `tz` and then convert it to UTC timezone.", ts tz )); + export_functions!(( + unix_date, + "Returns the number of days since epoch (1970-01-01) for the given date `dt`.", + dt + )); + export_functions!(( + unix_micros, + "Returns the number of microseconds since epoch (1970-01-01 00:00:00 UTC) for the given timestamp `ts`.", + ts + )); + export_functions!(( + unix_millis, + "Returns the number of milliseconds since epoch (1970-01-01 00:00:00 UTC) for the given timestamp `ts`.", + ts + )); + export_functions!(( + unix_seconds, + "Returns the number of seconds since epoch (1970-01-01 00:00:00 UTC) for the given timestamp `ts`.", + ts + )); } pub fn functions() -> Vec> { @@ -163,5 +188,9 @@ pub fn functions() -> Vec> { time_trunc(), to_utc_timestamp(), trunc(), + unix_date(), + unix_micros(), + unix_millis(), + unix_seconds(), ] } diff --git a/datafusion/spark/src/function/datetime/unix.rs b/datafusion/spark/src/function/datetime/unix.rs new file mode 100644 index 0000000000000..ad85896430336 --- /dev/null +++ b/datafusion/spark/src/function/datetime/unix.rs @@ -0,0 +1,188 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::any::Any; +use std::sync::Arc; + +use arrow::datatypes::{DataType, Field, FieldRef, TimeUnit}; +use datafusion_common::types::logical_date; +use datafusion_common::utils::take_function_args; +use datafusion_common::{Result, internal_err}; +use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyContext}; +use datafusion_expr::{ + Coercion, ColumnarValue, Expr, ExprSchemable, ReturnFieldArgs, ScalarFunctionArgs, + ScalarUDFImpl, Signature, TypeSignatureClass, Volatility, +}; + +/// Returns the number of days since epoch (1970-01-01) for the given date. +/// +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct SparkUnixDate { + signature: Signature, +} + +impl Default for SparkUnixDate { + fn default() -> Self { + Self::new() + } +} + +impl SparkUnixDate { + pub fn new() -> Self { + Self { + signature: Signature::coercible( + vec![Coercion::new_exact(TypeSignatureClass::Native( + logical_date(), + ))], + Volatility::Immutable, + ), + } + } +} + +impl ScalarUDFImpl for SparkUnixDate { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "unix_date" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + internal_err!("return_field_from_args should be used instead") + } + + fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result { + let nullable = args.arg_fields[0].is_nullable(); + Ok(Arc::new(Field::new(self.name(), DataType::Int32, nullable))) + } + + fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result { + internal_err!("invoke_with_args should not be called on SparkUnixDate") + } + + fn simplify( + &self, + args: Vec, + info: &SimplifyContext, + ) -> Result { + let [date] = take_function_args(self.name(), args)?; + Ok(ExprSimplifyResult::Simplified( + date.cast_to(&DataType::Date32, info.schema())? + .cast_to(&DataType::Int32, info.schema())?, + )) + } +} + +/// Returns the number of microseconds since epoch (1970-01-01 00:00:00 UTC) for the given timestamp. +/// +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct SparkUnixMicros { + signature: Signature, +} + +/// Returns the number of milliseconds since epoch (1970-01-01 00:00:00 UTC) for the given timestamp. +/// +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct SparkUnixMillis { + signature: Signature, +} + +/// Returns the number of seconds since epoch (1970-01-01 00:00:00 UTC) for the given timestamp. +/// +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct SparkUnixSeconds { + signature: Signature, +} + +macro_rules! define_unix_timestamp_udf { + ($func:ty, $func_name:literal, $time_unit:expr) => { + impl Default for $func { + fn default() -> Self { + Self::new() + } + } + + impl $func { + pub fn new() -> Self { + Self { + signature: Signature::coercible( + vec![Coercion::new_exact(TypeSignatureClass::Timestamp)], + Volatility::Immutable, + ), + } + } + } + + impl ScalarUDFImpl for $func { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + $func_name + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + internal_err!("return_field_from_args should be used instead") + } + + fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result { + let nullable = args.arg_fields[0].is_nullable(); + Ok(Arc::new(Field::new(self.name(), DataType::Int64, nullable))) + } + + fn invoke_with_args( + &self, + _args: ScalarFunctionArgs, + ) -> Result { + internal_err!( + "invoke_with_args should not be called on `{}`", + self.name() + ) + } + + fn simplify( + &self, + args: Vec, + info: &SimplifyContext, + ) -> Result { + let [date] = take_function_args(self.name(), args)?; + Ok(ExprSimplifyResult::Simplified( + date.cast_to( + &DataType::Timestamp($time_unit, Some("UTC".into())), + info.schema(), + )? + .cast_to(&DataType::Int64, info.schema())?, + )) + } + } + }; +} + +define_unix_timestamp_udf!(SparkUnixMicros, "unix_micros", TimeUnit::Microsecond); +define_unix_timestamp_udf!(SparkUnixMillis, "unix_millis", TimeUnit::Millisecond); +define_unix_timestamp_udf!(SparkUnixSeconds, "unix_seconds", TimeUnit::Second); diff --git a/datafusion/sqllogictest/test_files/spark/datetime/unix.slt b/datafusion/sqllogictest/test_files/spark/datetime/unix.slt new file mode 100644 index 0000000000000..f6e057bb8d664 --- /dev/null +++ b/datafusion/sqllogictest/test_files/spark/datetime/unix.slt @@ -0,0 +1,134 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Unix Date tests + +query I +SELECT unix_date('1970-01-02'::date); +---- +1 + +query I +SELECT unix_date('1900-01-02'::date); +---- +-25566 + + +query I +SELECT unix_date(arrow_cast('1970-01-02', 'Date64')); +---- +1 + +query I +SELECT unix_date(NULL::date); +---- +NULL + +query error Expect TypeSignatureClass::Native\(LogicalType\(Native\(Date\), Date\)\) but received NativeType::String, DataType: Utf8View +SELECT unix_date('1970-01-02'::string); + +# Unix Micro Tests + +query I +SELECT unix_micros('1970-01-01 00:00:01Z'::timestamp); +---- +1000000 + +query I +SELECT unix_micros('1900-01-01 00:00:01Z'::timestamp); +---- +-2208988799000000 + +query I +SELECT unix_micros(arrow_cast('1970-01-01 00:00:01+02:00', 'Timestamp(Microsecond, None)')); +---- +-7199000000 + +query I +SELECT unix_micros(arrow_cast('1970-01-01 00:00:01Z', 'Timestamp(Second, None)')); +---- +1000000 + +query I +SELECT unix_micros(NULL::timestamp); +---- +NULL + +query error Expect TypeSignatureClass::Timestamp but received NativeType::String, DataType: Utf8View +SELECT unix_micros('1970-01-01 00:00:01Z'::string); + + +# Unix Millis Tests + +query I +SELECT unix_millis('1970-01-01 00:00:01Z'::timestamp); +---- +1000 + +query I +SELECT unix_millis('1900-01-01 00:00:01Z'::timestamp); +---- +-2208988799000 + +query I +SELECT unix_millis(arrow_cast('1970-01-01 00:00:01+02:00', 'Timestamp(Microsecond, None)')); +---- +-7199000 + +query I +SELECT unix_millis(arrow_cast('1970-01-01 00:00:01Z', 'Timestamp(Second, None)')); +---- +1000 + +query I +SELECT unix_millis(NULL::timestamp); +---- +NULL + +query error Expect TypeSignatureClass::Timestamp but received NativeType::String, DataType: Utf8View +SELECT unix_millis('1970-01-01 00:00:01Z'::string); + + +# Unix Seconds Tests + +query I +SELECT unix_seconds('1970-01-01 00:00:01Z'::timestamp); +---- +1 + +query I +SELECT unix_seconds('1900-01-01 00:00:01Z'::timestamp); +---- +-2208988799 + +query I +SELECT unix_seconds(arrow_cast('1970-01-01 00:00:01+02:00', 'Timestamp(Microsecond, None)')); +---- +-7199 + +query I +SELECT unix_seconds(arrow_cast('1970-01-01 00:00:01Z', 'Timestamp(Second, None)')); +---- +1 + +query I +SELECT unix_seconds(NULL::timestamp); +---- +NULL + +query error Expect TypeSignatureClass::Timestamp but received NativeType::String, DataType: Utf8View +SELECT unix_seconds('1970-01-01 00:00:01Z'::string); From 7488bd8bdcbb296c4fab89988b185969fb440432 Mon Sep 17 00:00:00 2001 From: Cyprien Huet Date: Tue, 20 Jan 2026 12:00:46 +0400 Subject: [PATCH 2/4] fix(spark): correct argument name in unix timestamp UDF --- datafusion/spark/src/function/datetime/unix.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/spark/src/function/datetime/unix.rs b/datafusion/spark/src/function/datetime/unix.rs index ad85896430336..6f668c45937ac 100644 --- a/datafusion/spark/src/function/datetime/unix.rs +++ b/datafusion/spark/src/function/datetime/unix.rs @@ -170,9 +170,9 @@ macro_rules! define_unix_timestamp_udf { args: Vec, info: &SimplifyContext, ) -> Result { - let [date] = take_function_args(self.name(), args)?; + let [ts] = take_function_args(self.name(), args)?; Ok(ExprSimplifyResult::Simplified( - date.cast_to( + ts.cast_to( &DataType::Timestamp($time_unit, Some("UTC".into())), info.schema(), )? From 9026f44ac084fd749df672a6884f68e47d5521a4 Mon Sep 17 00:00:00 2001 From: Cyprien Huet Date: Sun, 25 Jan 2026 08:10:07 +0400 Subject: [PATCH 3/4] feat(spark): refactor unix timestamp functions into a single struct --- datafusion/spark/src/function/datetime/mod.rs | 18 ++- .../spark/src/function/datetime/unix.rs | 152 ++++++++---------- 2 files changed, 84 insertions(+), 86 deletions(-) diff --git a/datafusion/spark/src/function/datetime/mod.rs b/datafusion/spark/src/function/datetime/mod.rs index 315735f1a50fe..3133ed7337f25 100644 --- a/datafusion/spark/src/function/datetime/mod.rs +++ b/datafusion/spark/src/function/datetime/mod.rs @@ -57,9 +57,21 @@ make_udf_function!(time_trunc::SparkTimeTrunc, time_trunc); make_udf_function!(to_utc_timestamp::SparkToUtcTimestamp, to_utc_timestamp); make_udf_function!(trunc::SparkTrunc, trunc); make_udf_function!(unix::SparkUnixDate, unix_date); -make_udf_function!(unix::SparkUnixMicros, unix_micros); -make_udf_function!(unix::SparkUnixMillis, unix_millis); -make_udf_function!(unix::SparkUnixSeconds, unix_seconds); +make_udf_function!( + unix::SparkUnixTimestamp, + unix_micros, + unix::SparkUnixTimestamp::microseconds +); +make_udf_function!( + unix::SparkUnixTimestamp, + unix_millis, + unix::SparkUnixTimestamp::milliseconds +); +make_udf_function!( + unix::SparkUnixTimestamp, + unix_seconds, + unix::SparkUnixTimestamp::seconds +); pub mod expr_fn { use datafusion_functions::export_functions; diff --git a/datafusion/spark/src/function/datetime/unix.rs b/datafusion/spark/src/function/datetime/unix.rs index 6f668c45937ac..4254b2ed85d58 100644 --- a/datafusion/spark/src/function/datetime/unix.rs +++ b/datafusion/spark/src/function/datetime/unix.rs @@ -93,96 +93,82 @@ impl ScalarUDFImpl for SparkUnixDate { } } -/// Returns the number of microseconds since epoch (1970-01-01 00:00:00 UTC) for the given timestamp. -/// #[derive(Debug, PartialEq, Eq, Hash)] -pub struct SparkUnixMicros { +pub struct SparkUnixTimestamp { + time_unit: TimeUnit, signature: Signature, + name: &'static str, } -/// Returns the number of milliseconds since epoch (1970-01-01 00:00:00 UTC) for the given timestamp. -/// -#[derive(Debug, PartialEq, Eq, Hash)] -pub struct SparkUnixMillis { - signature: Signature, -} +impl SparkUnixTimestamp { + pub fn new(name: &'static str, time_unit: TimeUnit) -> Self { + Self { + signature: Signature::coercible( + vec![Coercion::new_exact(TypeSignatureClass::Timestamp)], + Volatility::Immutable, + ), + time_unit, + name, + } + } -/// Returns the number of seconds since epoch (1970-01-01 00:00:00 UTC) for the given timestamp. -/// -#[derive(Debug, PartialEq, Eq, Hash)] -pub struct SparkUnixSeconds { - signature: Signature, + /// Returns the number of microseconds since epoch (1970-01-01 00:00:00 UTC) for the given timestamp. + /// + pub fn microseconds() -> Self { + Self::new("unix_micros", TimeUnit::Microsecond) + } + + /// Returns the number of milliseconds since epoch (1970-01-01 00:00:00 UTC) for the given timestamp. + /// + pub fn milliseconds() -> Self { + Self::new("unix_millis", TimeUnit::Millisecond) + } + + /// Returns the number of seconds since epoch (1970-01-01 00:00:00 UTC) for the given timestamp. + /// + pub fn seconds() -> Self { + Self::new("unix_seconds", TimeUnit::Second) + } } -macro_rules! define_unix_timestamp_udf { - ($func:ty, $func_name:literal, $time_unit:expr) => { - impl Default for $func { - fn default() -> Self { - Self::new() - } - } +impl ScalarUDFImpl for SparkUnixTimestamp { + fn as_any(&self) -> &dyn Any { + self + } - impl $func { - pub fn new() -> Self { - Self { - signature: Signature::coercible( - vec![Coercion::new_exact(TypeSignatureClass::Timestamp)], - Volatility::Immutable, - ), - } - } - } + fn name(&self) -> &str { + self.name + } - impl ScalarUDFImpl for $func { - fn as_any(&self) -> &dyn Any { - self - } - - fn name(&self) -> &str { - $func_name - } - - fn signature(&self) -> &Signature { - &self.signature - } - - fn return_type(&self, _arg_types: &[DataType]) -> Result { - internal_err!("return_field_from_args should be used instead") - } - - fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result { - let nullable = args.arg_fields[0].is_nullable(); - Ok(Arc::new(Field::new(self.name(), DataType::Int64, nullable))) - } - - fn invoke_with_args( - &self, - _args: ScalarFunctionArgs, - ) -> Result { - internal_err!( - "invoke_with_args should not be called on `{}`", - self.name() - ) - } - - fn simplify( - &self, - args: Vec, - info: &SimplifyContext, - ) -> Result { - let [ts] = take_function_args(self.name(), args)?; - Ok(ExprSimplifyResult::Simplified( - ts.cast_to( - &DataType::Timestamp($time_unit, Some("UTC".into())), - info.schema(), - )? - .cast_to(&DataType::Int64, info.schema())?, - )) - } - } - }; -} + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + internal_err!("return_field_from_args should be used instead") + } + + fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result { + let nullable = args.arg_fields[0].is_nullable(); + Ok(Arc::new(Field::new(self.name(), DataType::Int64, nullable))) + } -define_unix_timestamp_udf!(SparkUnixMicros, "unix_micros", TimeUnit::Microsecond); -define_unix_timestamp_udf!(SparkUnixMillis, "unix_millis", TimeUnit::Millisecond); -define_unix_timestamp_udf!(SparkUnixSeconds, "unix_seconds", TimeUnit::Second); + fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result { + internal_err!("invoke_with_args should not be called on `{}`", self.name()) + } + + fn simplify( + &self, + args: Vec, + info: &SimplifyContext, + ) -> Result { + let [ts] = take_function_args(self.name(), args)?; + Ok(ExprSimplifyResult::Simplified( + ts.cast_to( + &DataType::Timestamp(self.time_unit, Some("UTC".into())), + info.schema(), + )? + .cast_to(&DataType::Int64, info.schema())?, + )) + } +} From 978514ae87f8d26cb57b09ee6e4e67f21e52cff2 Mon Sep 17 00:00:00 2001 From: Cyprien Huet Date: Tue, 27 Jan 2026 15:38:43 +0400 Subject: [PATCH 4/4] fix(spark): update error messages for unix date and timestamp functions to clarify type requirements --- .../sqllogictest/test_files/spark/datetime/unix.slt | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/datafusion/sqllogictest/test_files/spark/datetime/unix.slt b/datafusion/sqllogictest/test_files/spark/datetime/unix.slt index f6e057bb8d664..d7441f487d037 100644 --- a/datafusion/sqllogictest/test_files/spark/datetime/unix.slt +++ b/datafusion/sqllogictest/test_files/spark/datetime/unix.slt @@ -38,7 +38,7 @@ SELECT unix_date(NULL::date); ---- NULL -query error Expect TypeSignatureClass::Native\(LogicalType\(Native\(Date\), Date\)\) but received NativeType::String, DataType: Utf8View +query error Function 'unix_date' requires TypeSignatureClass::Native\(LogicalType\(Native\(Date\), Date\)\), but received String \(DataType: Utf8View\) SELECT unix_date('1970-01-02'::string); # Unix Micro Tests @@ -68,7 +68,7 @@ SELECT unix_micros(NULL::timestamp); ---- NULL -query error Expect TypeSignatureClass::Timestamp but received NativeType::String, DataType: Utf8View +query error Function 'unix_micros' requires TypeSignatureClass::Timestamp, but received String \(DataType: Utf8View\) SELECT unix_micros('1970-01-01 00:00:01Z'::string); @@ -99,7 +99,7 @@ SELECT unix_millis(NULL::timestamp); ---- NULL -query error Expect TypeSignatureClass::Timestamp but received NativeType::String, DataType: Utf8View +query error Function 'unix_millis' requires TypeSignatureClass::Timestamp, but received String \(DataType: Utf8View\) SELECT unix_millis('1970-01-01 00:00:01Z'::string); @@ -130,5 +130,5 @@ SELECT unix_seconds(NULL::timestamp); ---- NULL -query error Expect TypeSignatureClass::Timestamp but received NativeType::String, DataType: Utf8View +query error Function 'unix_seconds' requires TypeSignatureClass::Timestamp, but received String \(DataType: Utf8View\) SELECT unix_seconds('1970-01-01 00:00:01Z'::string);