From d3d6a75390a1cebb95429d11a66f507bdf83b7f3 Mon Sep 17 00:00:00 2001 From: Cyprien Huet Date: Fri, 16 Jan 2026 11:52:43 +0400 Subject: [PATCH 1/7] feat(spark): implement from_utc_timestamp function --- .../function/datetime/from_utc_timestamp.rs | 130 ++++++++++++++++++ datafusion/spark/src/function/datetime/mod.rs | 11 ++ .../spark/datetime/from_utc_timestamp.slt | 68 +++++++++ 3 files changed, 209 insertions(+) create mode 100644 datafusion/spark/src/function/datetime/from_utc_timestamp.rs create mode 100644 datafusion/sqllogictest/test_files/spark/datetime/from_utc_timestamp.slt diff --git a/datafusion/spark/src/function/datetime/from_utc_timestamp.rs b/datafusion/spark/src/function/datetime/from_utc_timestamp.rs new file mode 100644 index 000000000000..7c8de9c61dc9 --- /dev/null +++ b/datafusion/spark/src/function/datetime/from_utc_timestamp.rs @@ -0,0 +1,130 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+use std::any::Any;
+use std::sync::Arc;
+
+use arrow::datatypes::{DataType, Field, FieldRef, TimeUnit};
+use datafusion_common::types::{NativeType, logical_string};
+use datafusion_common::utils::take_function_args;
+use datafusion_common::{Result, ScalarValue, internal_err};
+use datafusion_expr::expr::ScalarFunction;
+use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyContext};
+use datafusion_expr::{
+    Coercion, ColumnarValue, Expr, ExprSchemable, ReturnFieldArgs, ScalarFunctionArgs,
+    ScalarUDFImpl, Signature, TypeSignatureClass, Volatility, lit,
+};
+use datafusion_functions::datetime::to_local_time;
+
+/// <https://spark.apache.org/docs/latest/api/sql/index.html#from_utc_timestamp>
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct SparkFromUtcTimestamp {
+    signature: Signature,
+}
+
+impl SparkFromUtcTimestamp {
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::coercible(
+                vec![
+                    Coercion::new_implicit(
+                        TypeSignatureClass::Timestamp,
+                        vec![TypeSignatureClass::Native(logical_string())],
+                        NativeType::Timestamp(TimeUnit::Microsecond, None),
+                    ),
+                    Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
+                ],
+                Volatility::Immutable,
+            ),
+        }
+    }
+}
+
+impl ScalarUDFImpl for SparkFromUtcTimestamp {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "from_utc_timestamp"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+        internal_err!("return_field_from_args should be used instead")
+    }
+
+    fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
+        let nullable = args.arg_fields.iter().any(|f| f.is_nullable());
+
+        Ok(Arc::new(Field::new(
+            self.name(),
+            args.arg_fields[0].data_type().clone(),
+            nullable,
+        )))
+    }
+
+    fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
+        internal_err!("`from_utc_timestamp` should be simplified away before execution")
+    }
+
+    fn simplify(
+        &self,
+        args: Vec<Expr>,
+        info: &SimplifyContext,
+    ) -> Result<ExprSimplifyResult> {
+        let [timestamp, timezone] =
take_function_args("from_utc_timestamp", args)?;
+
+        match timezone.as_literal() {
+            Some(ScalarValue::Utf8(timezone_opt))
+            | Some(ScalarValue::LargeUtf8(timezone_opt))
+            | Some(ScalarValue::Utf8View(timezone_opt)) => match timezone_opt {
+                Some(timezone) => {
+                    let strip_timezone = Expr::ScalarFunction(ScalarFunction::new_udf(
+                        to_local_time(),
+                        vec![timestamp],
+                    ));
+
+                    let cast_to_utc = strip_timezone.cast_to(
+                        &DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
+                        info.schema(),
+                    )?;
+
+                    let cast_to_timezone = cast_to_utc.cast_to(
+                        &DataType::Timestamp(
+                            TimeUnit::Microsecond,
+                            Some(Arc::from(timezone.to_string())),
+                        ),
+                        info.schema(),
+                    )?;
+
+                    Ok(ExprSimplifyResult::Simplified(cast_to_timezone))
+                }
+                None => {
+                    let timestamp_type = info.get_data_type(&timestamp)?;
+                    Ok(ExprSimplifyResult::Simplified(lit(
+                        ScalarValue::Null.cast_to(&timestamp_type)?
+                    )))
+                }
+            },
+            _ => Ok(ExprSimplifyResult::Original(vec![timestamp, timezone])),
+        }
+    }
+}
diff --git a/datafusion/spark/src/function/datetime/mod.rs b/datafusion/spark/src/function/datetime/mod.rs
index 7d6c9e9493bf..dac16ebe5947 100644
--- a/datafusion/spark/src/function/datetime/mod.rs
+++ b/datafusion/spark/src/function/datetime/mod.rs
@@ -22,6 +22,7 @@ pub mod date_part;
 pub mod date_sub;
 pub mod date_trunc;
 pub mod extract;
+pub mod from_utc_timestamp;
 pub mod last_day;
 pub mod make_dt_interval;
 pub mod make_interval;
@@ -39,6 +40,10 @@ make_udf_function!(date_diff::SparkDateDiff, date_diff);
 make_udf_function!(date_part::SparkDatePart, date_part);
 make_udf_function!(date_sub::SparkDateSub, date_sub);
 make_udf_function!(date_trunc::SparkDateTrunc, date_trunc);
+make_udf_function!(
+    from_utc_timestamp::SparkFromUtcTimestamp,
+    from_utc_timestamp
+);
 make_udf_function!(extract::SparkHour, hour);
 make_udf_function!(extract::SparkMinute, minute);
 make_udf_function!(extract::SparkSecond, second);
@@ -125,6 +130,11 @@ pub mod expr_fn {
         "Extracts a part of the date or time from a date, time, or
timestamp expression.", arg1 arg2 )); + export_functions!(( + from_utc_timestamp, + "Interpret a given timestamp `ts` in UTC timezone and then convert it to timezone `tz`.", + ts tz + )); } pub fn functions() -> Vec> { @@ -135,6 +145,7 @@ pub fn functions() -> Vec> { date_part(), date_sub(), date_trunc(), + from_utc_timestamp(), hour(), last_day(), make_dt_interval(), diff --git a/datafusion/sqllogictest/test_files/spark/datetime/from_utc_timestamp.slt b/datafusion/sqllogictest/test_files/spark/datetime/from_utc_timestamp.slt new file mode 100644 index 000000000000..804630b1732d --- /dev/null +++ b/datafusion/sqllogictest/test_files/spark/datetime/from_utc_timestamp.slt @@ -0,0 +1,68 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +query P +SELECT from_utc_timestamp('2016-08-31'::string, 'Asia/Seoul'::string); +---- +2016-08-31T09:00:00+09:00 + +query P +SELECT from_utc_timestamp('2018-03-13T06:18:23+02:00'::string, 'Asia/Seoul'::string); +---- +2018-03-13T13:18:23+09:00 + +query P +SELECT from_utc_timestamp('2018-03-13T06:18:23+00:00'::string, 'Asia/Seoul'::string); +---- +2018-03-13T15:18:23+09:00 + +query P +SELECT from_utc_timestamp('2018-03-13T06:18:23+02:00'::timestamp, 'Asia/Seoul'::string); +---- +2018-03-13T13:18:23+09:00 + +query P +SELECT from_utc_timestamp('2018-03-13T06:18:23+00:00'::timestamp, 'Asia/Seoul'::string); +---- +2018-03-13T15:18:23+09:00 + +query P +SELECT from_utc_timestamp(NULL::string, 'Asia/Seoul'::string); +---- +NULL + +query P +SELECT from_utc_timestamp('2016-08-31'::string, NULL::string); +---- +NULL + + +query P +SELECT from_utc_timestamp(column1, column2) +FROM VALUES +('2016-08-31'::string, 'Asia/Seoul'::string), +('2018-03-13T06:18:23+02:00'::string, 'Asia/Tokyo'::string), +('2018-03-13T06:18:23+00:00'::string, 'America/New_York'::string), +(NULL::string, 'Asia/Seoul'::string), +('2016-08-31'::string, NULL::string); +---- +2016-08-31T09:00:00+09:00 +2018-03-13T13:18:23+09:00 +2018-03-13T15:18:23+09:00 +NULL +NULL + From a5c759aaecc9ad33d1fbe148b71e58209f0586fb Mon Sep 17 00:00:00 2001 From: Cyprien Huet Date: Sun, 18 Jan 2026 16:38:52 +0400 Subject: [PATCH 2/7] feat: make `adjust_to_local_time` public for broader accessibility --- datafusion/functions/src/datetime/to_local_time.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/functions/src/datetime/to_local_time.rs b/datafusion/functions/src/datetime/to_local_time.rs index 86c949711d01..0500497a15fa 100644 --- a/datafusion/functions/src/datetime/to_local_time.rs +++ b/datafusion/functions/src/datetime/to_local_time.rs @@ -324,7 +324,7 @@ fn to_local_time(time_value: &ColumnarValue) -> Result { /// ``` /// /// See `test_adjust_to_local_time()` for example -fn 
adjust_to_local_time(ts: i64, tz: Tz) -> Result { +pub fn adjust_to_local_time(ts: i64, tz: Tz) -> Result { fn convert_timestamp(ts: i64, converter: F) -> Result> where F: Fn(i64) -> MappedLocalTime>, From 28244eb051de943f68d04f05832de80fe7a2b622 Mon Sep 17 00:00:00 2001 From: Cyprien Huet Date: Sun, 18 Jan 2026 17:01:37 +0400 Subject: [PATCH 3/7] feat(spark): add from_utc_timestamp function --- .../function/datetime/from_utc_timestamp.rs | 183 +++++++++++++----- .../spark/datetime/from_utc_timestamp.slt | 110 +++++++++-- 2 files changed, 231 insertions(+), 62 deletions(-) diff --git a/datafusion/spark/src/function/datetime/from_utc_timestamp.rs b/datafusion/spark/src/function/datetime/from_utc_timestamp.rs index 7c8de9c61dc9..9bdb7a62461d 100644 --- a/datafusion/spark/src/function/datetime/from_utc_timestamp.rs +++ b/datafusion/spark/src/function/datetime/from_utc_timestamp.rs @@ -15,27 +15,62 @@ // specific language governing permissions and limitations // under the License. +//! Spark-compatible `from_utc_timestamp` function. +//! +//! This function interprets a timestamp as UTC and converts it to a specified timezone. +//! Unlike DataFusion's `to_local_time` which strips the timezone from the result, +//! `from_utc_timestamp` preserves the original timezone annotation on the output. +//! +//! # How it works +//! +//! The function reuses [`adjust_to_local_time`] from DataFusion's `to_local_time` module +//! to perform the actual timestamp adjustment. The key insight is that both functions +//! need to add the timezone offset to the underlying UTC timestamp value. +//! +//! For example, given a timestamp `2024-01-15T10:00:00Z` (UTC) and target timezone +//! `America/New_York` (UTC-5 in winter): +//! +//! 1. The input timestamp is stored as a UTC value (e.g., `1705312800` seconds) +//! 2. `adjust_to_local_time` calculates the offset for `America/New_York` (-5 hours) +//! 3. The offset is added to get the local time value (`1705312800 + (-18000)`) +//! 4. 
The result represents `2024-01-15T05:00:00` in the target timezone + use std::any::Any; use std::sync::Arc; -use arrow::datatypes::{DataType, Field, FieldRef, TimeUnit}; +use arrow::array::timezone::Tz; +use arrow::array::{Array, ArrayRef, AsArray, PrimitiveBuilder, StringArrayType}; +use arrow::datatypes::TimeUnit; +use arrow::datatypes::{ + ArrowTimestampType, DataType, Field, FieldRef, TimestampMicrosecondType, + TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, +}; use datafusion_common::types::{NativeType, logical_string}; use datafusion_common::utils::take_function_args; -use datafusion_common::{Result, ScalarValue, internal_err}; -use datafusion_expr::expr::ScalarFunction; -use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyContext}; +use datafusion_common::{Result, exec_datafusion_err, exec_err, internal_err}; use datafusion_expr::{ - Coercion, ColumnarValue, Expr, ExprSchemable, ReturnFieldArgs, ScalarFunctionArgs, - ScalarUDFImpl, Signature, TypeSignatureClass, Volatility, lit, + Coercion, ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, + Signature, TypeSignatureClass, Volatility, }; -use datafusion_functions::datetime::to_local_time; - -/// +use datafusion_functions::datetime::to_local_time::adjust_to_local_time; +use datafusion_functions::utils::make_scalar_function; + +/// Spark `from_utc_timestamp` function. +/// +/// Interprets the given timestamp as UTC and converts it to the given timezone. 
+///
+/// See <https://spark.apache.org/docs/latest/api/sql/index.html#from_utc_timestamp>
 #[derive(Debug, PartialEq, Eq, Hash)]
 pub struct SparkFromUtcTimestamp {
     signature: Signature,
 }
 
+impl Default for SparkFromUtcTimestamp {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
 impl SparkFromUtcTimestamp {
     pub fn new() -> Self {
         Self {
             signature: Signature::coercible(
@@ -81,50 +116,96 @@ impl ScalarUDFImpl for SparkFromUtcTimestamp {
         )))
     }
 
-    fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
-        internal_err!("`from_utc_timestamp` should be simplified away before execution")
+    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
+        make_scalar_function(spark_from_utc_timestamp, vec![])(&args.args)
+    }
+}
+
+fn spark_from_utc_timestamp(args: &[ArrayRef]) -> Result<ArrayRef> {
+    let [timestamp, timezone] = take_function_args("from_utc_timestamp", args)?;
+
+    match (timestamp.data_type(), timezone.data_type()) {
+        (DataType::Timestamp(TimeUnit::Nanosecond, tz_opt), _) => {
+            process_timestamp_with_tz_array::<TimestampNanosecondType>(
+                timestamp,
+                timezone,
+                tz_opt.clone(),
+            )
+        }
+        (DataType::Timestamp(TimeUnit::Microsecond, tz_opt), _) => {
+            process_timestamp_with_tz_array::<TimestampMicrosecondType>(
+                timestamp,
+                timezone,
+                tz_opt.clone(),
+            )
+        }
+        (DataType::Timestamp(TimeUnit::Millisecond, tz_opt), _) => {
+            process_timestamp_with_tz_array::<TimestampMillisecondType>(
+                timestamp,
+                timezone,
+                tz_opt.clone(),
+            )
+        }
+        (DataType::Timestamp(TimeUnit::Second, tz_opt), _) => {
+            process_timestamp_with_tz_array::<TimestampSecondType>(
+                timestamp,
+                timezone,
+                tz_opt.clone(),
+            )
+        }
+        (ts_type, _) => {
+            exec_err!("`from_utc_timestamp`: unsupported argument types: {ts_type}")
+        }
+    }
+}
+
+fn process_timestamp_with_tz_array<T: ArrowTimestampType>(
+    ts_array: &ArrayRef,
+    tz_array: &ArrayRef,
+    tz_opt: Option<Arc<str>>,
+) -> Result<ArrayRef> {
+    match tz_array.data_type() {
+        DataType::Utf8 => {
+            process_arrays::<T, _>(tz_opt, ts_array, tz_array.as_string::<i32>())
+        }
+        DataType::LargeUtf8 => {
+            process_arrays::<T, _>(tz_opt, ts_array, tz_array.as_string::<i64>())
+        }
+        DataType::Utf8View => {
+            process_arrays::<T, _>(tz_opt, ts_array, tz_array.as_string_view())
+        }
+        other => {
+            exec_err!("`from_utc_timestamp`: timezone must be a string type, got {other}")
+        }
     }
+}
 
-    fn simplify(
-        &self,
-        args: Vec<Expr>,
-        info: &SimplifyContext,
-    ) -> Result<ExprSimplifyResult> {
-        let [timestamp, timezone] = take_function_args("from_utc_timestamp", args)?;
-
-        match timezone.as_literal() {
-            Some(ScalarValue::Utf8(timezone_opt))
-            | Some(ScalarValue::LargeUtf8(timezone_opt))
-            | Some(ScalarValue::Utf8View(timezone_opt)) => match timezone_opt {
-                Some(timezone) => {
-                    let strip_timezone = Expr::ScalarFunction(ScalarFunction::new_udf(
-                        to_local_time(),
-                        vec![timestamp],
-                    ));
-
-                    let cast_to_utc = strip_timezone.cast_to(
-                        &DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
-                        info.schema(),
-                    )?;
-
-                    let cast_to_timezone = cast_to_utc.cast_to(
-                        &DataType::Timestamp(
-                            TimeUnit::Microsecond,
-                            Some(Arc::from(timezone.to_string())),
-                        ),
-                        info.schema(),
-                    )?;
-
-                    Ok(ExprSimplifyResult::Simplified(cast_to_timezone))
-                }
-                None => {
-                    let timestamp_type = info.get_data_type(&timestamp)?;
-                    Ok(ExprSimplifyResult::Simplified(lit(
-                        ScalarValue::Null.cast_to(&timestamp_type)?
-                    )))
-                }
-            },
-            _ => Ok(ExprSimplifyResult::Original(vec![timestamp, timezone])),
+fn process_arrays<'a, T: ArrowTimestampType, S>(
+    return_tz_opt: Option<Arc<str>>,
+    ts_array: &ArrayRef,
+    tz_array: &'a S,
+) -> Result<ArrayRef>
+where
+    &'a S: StringArrayType<'a>,
+{
+    let ts_primitive = ts_array.as_primitive::<T>();
+    let mut builder = PrimitiveBuilder::<T>::with_capacity(ts_array.len());
+
+    for (ts_opt, tz_opt) in ts_primitive.iter().zip(tz_array.iter()) {
+        match (ts_opt, tz_opt) {
+            (Some(ts), Some(tz_str)) => {
+                let tz: Tz = tz_str.parse().map_err(|e| {
+                    exec_datafusion_err!(
+                        "`from_utc_timestamp`: invalid timezone '{tz_str}': {e}"
+                    )
+                })?;
+                let val = adjust_to_local_time::<T>(ts, tz)?;
+                builder.append_value(val);
+            }
+            _ => builder.append_null(),
         }
     }
+
+    builder = builder.with_timezone_opt(return_tz_opt);
+    Ok(Arc::new(builder.finish()))
 }
diff --git a/datafusion/sqllogictest/test_files/spark/datetime/from_utc_timestamp.slt b/datafusion/sqllogictest/test_files/spark/datetime/from_utc_timestamp.slt
index 804630b1732d..5a39bda0a651 100644
--- a/datafusion/sqllogictest/test_files/spark/datetime/from_utc_timestamp.slt
+++ b/datafusion/sqllogictest/test_files/spark/datetime/from_utc_timestamp.slt
@@ -15,54 +15,142 @@
 # specific language governing permissions and limitations
 # under the License.
+# String inputs +query P +SELECT from_utc_timestamp('2016-08-31'::string, 'UTC'::string); +---- +2016-08-31T00:00:00 + query P SELECT from_utc_timestamp('2016-08-31'::string, 'Asia/Seoul'::string); ---- -2016-08-31T09:00:00+09:00 +2016-08-31T09:00:00 + +query P +SELECT from_utc_timestamp('2016-08-31'::string, 'America/New_York'::string); +---- +2016-08-30T20:00:00 + +# String inputs with offsets +query P +SELECT from_utc_timestamp('2018-03-13T06:18:23+02:00'::string, 'UTC'::string); +---- +2018-03-13T04:18:23 query P SELECT from_utc_timestamp('2018-03-13T06:18:23+02:00'::string, 'Asia/Seoul'::string); ---- -2018-03-13T13:18:23+09:00 +2018-03-13T13:18:23 query P -SELECT from_utc_timestamp('2018-03-13T06:18:23+00:00'::string, 'Asia/Seoul'::string); +SELECT from_utc_timestamp('2018-03-13T06:18:23+02:00'::string, 'America/New_York'::string); ---- -2018-03-13T15:18:23+09:00 +2018-03-13T00:18:23 + +# Timestamp inputs +query P +SELECT from_utc_timestamp('2018-03-13T06:18:23+02:00'::timestamp, 'UTC'::string); +---- +2018-03-13T04:18:23 query P SELECT from_utc_timestamp('2018-03-13T06:18:23+02:00'::timestamp, 'Asia/Seoul'::string); ---- -2018-03-13T13:18:23+09:00 +2018-03-13T13:18:23 query P -SELECT from_utc_timestamp('2018-03-13T06:18:23+00:00'::timestamp, 'Asia/Seoul'::string); +SELECT from_utc_timestamp('2018-03-13T06:18:23+02:00'::timestamp, 'America/New_York'::string); ---- -2018-03-13T15:18:23+09:00 +2018-03-13T00:18:23 +# Null inputs query P SELECT from_utc_timestamp(NULL::string, 'Asia/Seoul'::string); ---- NULL query P -SELECT from_utc_timestamp('2016-08-31'::string, NULL::string); +SELECT from_utc_timestamp(NULL::timestamp, 'Asia/Seoul'::string); ---- NULL +query P +SELECT from_utc_timestamp('2016-08-31'::string, NULL::string); +---- +NULL query P SELECT from_utc_timestamp(column1, column2) FROM VALUES ('2016-08-31'::string, 'Asia/Seoul'::string), -('2018-03-13T06:18:23+02:00'::string, 'Asia/Tokyo'::string), -('2018-03-13T06:18:23+00:00'::string, 
'America/New_York'::string), +('2018-03-13T06:18:23+02:00'::string, 'Asia/Seoul'::string), +('2016-08-31'::string, 'UTC'::string), +('2018-03-13T06:18:23+02:00'::string, 'UTC'::string), +('2016-08-31'::string, 'America/New_York'::string), +('2018-03-13T06:18:23+02:00'::string, 'America/New_York'::string), (NULL::string, 'Asia/Seoul'::string), ('2016-08-31'::string, NULL::string); ---- +2016-08-31T09:00:00 +2018-03-13T13:18:23 +2016-08-31T00:00:00 +2018-03-13T04:18:23 +2016-08-30T20:00:00 +2018-03-13T00:18:23 +NULL +NULL + +query P +SELECT from_utc_timestamp(column1, column2) +FROM VALUES +('2016-08-31'::timestamp, 'Asia/Seoul'::string), +('2018-03-13T06:18:23+02:00'::timestamp, 'Asia/Seoul'::string), +('2016-08-31'::timestamp, 'UTC'::string), +('2018-03-13T06:18:23+02:00'::timestamp, 'UTC'::string), +('2016-08-31'::timestamp, 'America/New_York'::string), +('2018-03-13T06:18:23+02:00'::timestamp, 'America/New_York'::string), +(NULL::timestamp, 'Asia/Seoul'::string), +('2018-03-13T06:18:23+00:00'::timestamp, NULL::string); +---- +2016-08-31T09:00:00 +2018-03-13T13:18:23 +2016-08-31T00:00:00 +2018-03-13T04:18:23 +2016-08-30T20:00:00 +2018-03-13T00:18:23 +NULL +NULL + +query P +SELECT from_utc_timestamp(arrow_cast(column1, 'Timestamp(Microsecond, Some("Asia/Seoul"))'), column2) +FROM VALUES +('2016-08-31'::timestamp, 'Asia/Seoul'::string), +('2018-03-13T06:18:23+02:00'::timestamp, 'Asia/Seoul'::string), +('2016-08-31'::timestamp, 'UTC'::string), +('2018-03-13T06:18:23+02:00'::timestamp, 'UTC'::string), +('2016-08-31'::timestamp, 'America/New_York'::string), +('2018-03-13T06:18:23+02:00'::timestamp, 'America/New_York'::string), +(NULL::timestamp, 'Asia/Seoul'::string), +('2018-03-13T06:18:23+00:00'::timestamp, NULL::string); +---- 2016-08-31T09:00:00+09:00 2018-03-13T13:18:23+09:00 -2018-03-13T15:18:23+09:00 +2016-08-31T00:00:00+09:00 +2018-03-13T04:18:23+09:00 +2016-08-30T20:00:00+09:00 +2018-03-13T00:18:23+09:00 NULL NULL + +# DST edge cases +query P +SELECT 
from_utc_timestamp('2020-03-31T13:40:00'::timestamp, 'America/New_York'::string); +---- +2020-03-31T09:40:00 + + +query P +SELECT from_utc_timestamp('2020-11-04T14:06:40'::timestamp, 'America/New_York'::string); +---- +2020-11-04T09:06:40 From 5986ff5b1d673c4e857abd1ab259a444b3b798e8 Mon Sep 17 00:00:00 2001 From: Cyprien Huet Date: Mon, 19 Jan 2026 08:54:58 +0400 Subject: [PATCH 4/7] feat(spark): add to_utc_timestamp function --- .../function/datetime/from_utc_timestamp.rs | 4 +- datafusion/spark/src/function/datetime/mod.rs | 8 + .../src/function/datetime/to_utc_timestamp.rs | 229 ++++++++++++++++++ .../spark/datetime/to_utc_timestamp.slt | 150 +++++++++++- 4 files changed, 379 insertions(+), 12 deletions(-) create mode 100644 datafusion/spark/src/function/datetime/to_utc_timestamp.rs diff --git a/datafusion/spark/src/function/datetime/from_utc_timestamp.rs b/datafusion/spark/src/function/datetime/from_utc_timestamp.rs index 9bdb7a62461d..484184eec977 100644 --- a/datafusion/spark/src/function/datetime/from_utc_timestamp.rs +++ b/datafusion/spark/src/function/datetime/from_utc_timestamp.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! Spark-compatible `from_utc_timestamp` function. +//! Apache Spark-compatible `from_utc_timestamp` function. //! //! This function interprets a timestamp as UTC and converts it to a specified timezone. //! Unlike DataFusion's `to_local_time` which strips the timezone from the result, @@ -55,7 +55,7 @@ use datafusion_expr::{ use datafusion_functions::datetime::to_local_time::adjust_to_local_time; use datafusion_functions::utils::make_scalar_function; -/// Spark `from_utc_timestamp` function. +/// Apache Spark `from_utc_timestamp` function. /// /// Interprets the given timestamp as UTC and converts it to the given timezone. 
/// diff --git a/datafusion/spark/src/function/datetime/mod.rs b/datafusion/spark/src/function/datetime/mod.rs index dac16ebe5947..3535df963e20 100644 --- a/datafusion/spark/src/function/datetime/mod.rs +++ b/datafusion/spark/src/function/datetime/mod.rs @@ -28,6 +28,7 @@ pub mod make_dt_interval; pub mod make_interval; pub mod next_day; pub mod time_trunc; +pub mod to_utc_timestamp; pub mod trunc; use datafusion_expr::ScalarUDF; @@ -52,6 +53,7 @@ make_udf_function!(make_dt_interval::SparkMakeDtInterval, make_dt_interval); make_udf_function!(make_interval::SparkMakeInterval, make_interval); make_udf_function!(next_day::SparkNextDay, next_day); make_udf_function!(time_trunc::SparkTimeTrunc, time_trunc); +make_udf_function!(to_utc_timestamp::SparkToUtcTimestamp, to_utc_timestamp); make_udf_function!(trunc::SparkTrunc, trunc); pub mod expr_fn { @@ -135,6 +137,11 @@ pub mod expr_fn { "Interpret a given timestamp `ts` in UTC timezone and then convert it to timezone `tz`.", ts tz )); + export_functions!(( + to_utc_timestamp, + "Interpret a given timestamp `ts` in timezone `tz` and then convert it to UTC timezone.", + ts tz + )); } pub fn functions() -> Vec> { @@ -154,6 +161,7 @@ pub fn functions() -> Vec> { next_day(), second(), time_trunc(), + to_utc_timestamp(), trunc(), ] } diff --git a/datafusion/spark/src/function/datetime/to_utc_timestamp.rs b/datafusion/spark/src/function/datetime/to_utc_timestamp.rs new file mode 100644 index 000000000000..38ebccbef225 --- /dev/null +++ b/datafusion/spark/src/function/datetime/to_utc_timestamp.rs @@ -0,0 +1,229 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::any::Any; +use std::ops::Sub; +use std::sync::Arc; + +use arrow::array::timezone::Tz; +use arrow::array::{Array, ArrayRef, AsArray, PrimitiveBuilder, StringArrayType}; +use arrow::datatypes::TimeUnit; +use arrow::datatypes::{ + ArrowTimestampType, DataType, Field, FieldRef, TimestampMicrosecondType, + TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, +}; +use chrono::{DateTime, Offset, TimeDelta, TimeZone}; +use datafusion_common::types::{NativeType, logical_string}; +use datafusion_common::utils::take_function_args; +use datafusion_common::{ + Result, exec_datafusion_err, exec_err, internal_datafusion_err, internal_err, +}; +use datafusion_expr::{ + Coercion, ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, + Signature, TypeSignatureClass, Volatility, +}; +use datafusion_functions::utils::make_scalar_function; + +/// Apache Spark `to_utc_timestamp` function. +/// +/// Interprets the given timestamp in the provided timezone and then converts it to UTC. 
+///
+/// See <https://spark.apache.org/docs/latest/api/sql/index.html#to_utc_timestamp>
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct SparkToUtcTimestamp {
+    signature: Signature,
+}
+
+impl Default for SparkToUtcTimestamp {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl SparkToUtcTimestamp {
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::coercible(
+                vec![
+                    Coercion::new_implicit(
+                        TypeSignatureClass::Timestamp,
+                        vec![TypeSignatureClass::Native(logical_string())],
+                        NativeType::Timestamp(TimeUnit::Microsecond, None),
+                    ),
+                    Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
+                ],
+                Volatility::Immutable,
+            ),
+        }
+    }
+}
+
+impl ScalarUDFImpl for SparkToUtcTimestamp {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "to_utc_timestamp"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+        internal_err!("return_field_from_args should be used instead")
+    }
+
+    fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
+        let nullable = args.arg_fields.iter().any(|f| f.is_nullable());
+
+        Ok(Arc::new(Field::new(
+            self.name(),
+            args.arg_fields[0].data_type().clone(),
+            nullable,
+        )))
+    }
+
+    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
+        make_scalar_function(to_utc_timestamp, vec![])(&args.args)
+    }
+}
+
+fn to_utc_timestamp(args: &[ArrayRef]) -> Result<ArrayRef> {
+    let [timestamp, timezone] = take_function_args("to_utc_timestamp", args)?;
+
+    match (timestamp.data_type(), timezone.data_type()) {
+        (DataType::Timestamp(TimeUnit::Nanosecond, tz_opt), _) => {
+            process_timestamp_with_tz_array::<TimestampNanosecondType>(
+                timestamp,
+                timezone,
+                tz_opt.clone(),
+            )
+        }
+        (DataType::Timestamp(TimeUnit::Microsecond, tz_opt), _) => {
+            process_timestamp_with_tz_array::<TimestampMicrosecondType>(
+                timestamp,
+                timezone,
+                tz_opt.clone(),
+            )
+        }
+        (DataType::Timestamp(TimeUnit::Millisecond, tz_opt), _) => {
+            process_timestamp_with_tz_array::<TimestampMillisecondType>(
+                timestamp,
+                timezone,
+                tz_opt.clone(),
+            )
+        }
+        (DataType::Timestamp(TimeUnit::Second, tz_opt), _) => {
+            process_timestamp_with_tz_array::<TimestampSecondType>(
+                timestamp,
+                timezone,
+                tz_opt.clone(),
+            )
+        }
+        (ts_type, _) => {
+            exec_err!("`to_utc_timestamp`: unsupported argument types: {ts_type}")
+        }
+    }
+}
+
+fn process_timestamp_with_tz_array<T: ArrowTimestampType>(
+    ts_array: &ArrayRef,
+    tz_array: &ArrayRef,
+    tz_opt: Option<Arc<str>>,
+) -> Result<ArrayRef> {
+    match tz_array.data_type() {
+        DataType::Utf8 => {
+            process_arrays::<T, _>(tz_opt, ts_array, tz_array.as_string::<i32>())
+        }
+        DataType::LargeUtf8 => {
+            process_arrays::<T, _>(tz_opt, ts_array, tz_array.as_string::<i64>())
+        }
+        DataType::Utf8View => {
+            process_arrays::<T, _>(tz_opt, ts_array, tz_array.as_string_view())
+        }
+        other => {
+            exec_err!("`to_utc_timestamp`: timezone must be a string type, got {other}")
+        }
+    }
+}
+
+fn process_arrays<'a, T: ArrowTimestampType, S>(
+    return_tz_opt: Option<Arc<str>>,
+    ts_array: &ArrayRef,
+    tz_array: &'a S,
+) -> Result<ArrayRef>
+where
+    &'a S: StringArrayType<'a>,
+{
+    let ts_primitive = ts_array.as_primitive::<T>();
+    let mut builder = PrimitiveBuilder::<T>::with_capacity(ts_array.len());
+
+    for (ts_opt, tz_opt) in ts_primitive.iter().zip(tz_array.iter()) {
+        match (ts_opt, tz_opt) {
+            (Some(ts), Some(tz_str)) => {
+                let tz: Tz = tz_str.parse().map_err(|e| {
+                    exec_datafusion_err!(
+                        "`to_utc_timestamp`: invalid timezone '{tz_str}': {e}"
+                    )
+                })?;
+                let val = adjust_to_utc_time::<T>(ts, tz)?;
+                builder.append_value(val);
+            }
+            _ => builder.append_null(),
        }
    }
+
+    builder = builder.with_timezone_opt(return_tz_opt);
+    Ok(Arc::new(builder.finish()))
+}
+
+fn adjust_to_utc_time<T: ArrowTimestampType>(ts: i64, tz: Tz) -> Result<i64> {
+    let date_time = match T::UNIT {
+        TimeUnit::Nanosecond => Some(DateTime::from_timestamp_nanos(ts)),
+        TimeUnit::Microsecond => DateTime::from_timestamp_micros(ts),
+        TimeUnit::Millisecond => DateTime::from_timestamp_millis(ts),
+        TimeUnit::Second => DateTime::from_timestamp(ts, 0),
+    }
+    .unwrap()
+    .with_timezone(&tz);
+
+    let offset_seconds: i64 = tz
+        .offset_from_utc_datetime(&date_time.naive_utc())
+        .fix()
+        .local_minus_utc() as i64;
+
+    let adjusted_date_time = date_time.sub(
+        // This should not fail under normal circumstances as the
+        // maximum possible offset is 26 hours (93,600 seconds)
+        TimeDelta::try_seconds(offset_seconds)
+            .ok_or_else(|| internal_datafusion_err!("Offset seconds should be less than i64::MAX / 1_000 or greater than -i64::MAX / 1_000"))?,
+    );
+
+    // convert the naive datetime back to i64
+    match T::UNIT {
+        TimeUnit::Nanosecond => adjusted_date_time.timestamp_nanos_opt().ok_or_else(||
+            internal_datafusion_err!(
+                "Failed to convert DateTime to timestamp in nanosecond. This error may occur if the date is out of range. The supported date ranges are between 1677-09-21T00:12:43.145224192 and 2262-04-11T23:47:16.854775807"
+            )
+        ),
+        TimeUnit::Microsecond => Ok(adjusted_date_time.timestamp_micros()),
+        TimeUnit::Millisecond => Ok(adjusted_date_time.timestamp_millis()),
+        TimeUnit::Second => Ok(adjusted_date_time.timestamp()),
    }
}
diff --git a/datafusion/sqllogictest/test_files/spark/datetime/to_utc_timestamp.slt b/datafusion/sqllogictest/test_files/spark/datetime/to_utc_timestamp.slt
index 24693016be1a..086716e5bcd0 100644
--- a/datafusion/sqllogictest/test_files/spark/datetime/to_utc_timestamp.slt
+++ b/datafusion/sqllogictest/test_files/spark/datetime/to_utc_timestamp.slt
@@ -15,13 +15,143 @@
 # specific language governing permissions and limitations
 # under the License.
 
-# This file was originally created by a porting script from:
-# https://github.com/lakehq/sail/tree/43b6ed8221de5c4c4adbedbb267ae1351158b43c/crates/sail-spark-connect/tests/gold_data/function
-# This file is part of the implementation of the datafusion-spark function library.
-# For more information, please see: -# https://github.com/apache/datafusion/issues/15914 - -## Original Query: SELECT to_utc_timestamp('2016-08-31', 'Asia/Seoul'); -## PySpark 3.5.5 Result: {'to_utc_timestamp(2016-08-31, Asia/Seoul)': datetime.datetime(2016, 8, 30, 15, 0), 'typeof(to_utc_timestamp(2016-08-31, Asia/Seoul))': 'timestamp', 'typeof(2016-08-31)': 'string', 'typeof(Asia/Seoul)': 'string'} -#query -#SELECT to_utc_timestamp('2016-08-31'::string, 'Asia/Seoul'::string); + +# String inputs +query P +SELECT to_utc_timestamp('2016-08-31'::string, 'UTC'::string); +---- +2016-08-31T00:00:00 + +query P +SELECT to_utc_timestamp('2016-08-31'::string, 'Asia/Seoul'::string); +---- +2016-08-30T15:00:00 + +query P +SELECT to_utc_timestamp('2016-08-31'::string, 'America/New_York'::string); +---- +2016-08-31T04:00:00 + +# String inputs with offsets +query P +SELECT to_utc_timestamp('2018-03-13T06:18:23+02:00'::string, 'UTC'::string); +---- +2018-03-13T04:18:23 + +query P +SELECT to_utc_timestamp('2018-03-13T06:18:23+02:00'::string, 'Asia/Seoul'::string); +---- +2018-03-12T19:18:23 + +query P +SELECT to_utc_timestamp('2018-03-13T06:18:23+02:00'::string, 'America/New_York'::string); +---- +2018-03-13T08:18:23 + +# Timestamp inputs +query P +SELECT to_utc_timestamp('2018-03-13T06:18:23+02:00'::timestamp, 'UTC'::string); +---- +2018-03-13T04:18:23 + +query P +SELECT to_utc_timestamp('2018-03-13T06:18:23+02:00'::timestamp, 'Asia/Seoul'::string); +---- +2018-03-12T19:18:23 + +query P +SELECT to_utc_timestamp('2018-03-13T06:18:23+02:00'::timestamp, 'America/New_York'::string); +---- +2018-03-13T08:18:23 + +# Null inputs +query P +SELECT to_utc_timestamp(NULL::string, 'Asia/Seoul'::string); +---- +NULL + +query P +SELECT to_utc_timestamp(NULL::timestamp, 'Asia/Seoul'::string); +---- +NULL + +query P +SELECT to_utc_timestamp('2016-08-31'::string, NULL::string); +---- +NULL + +query P +SELECT to_utc_timestamp(column1, column2) +FROM VALUES +('2016-08-31'::string, 
'Asia/Seoul'::string), +('2018-03-13T06:18:23+02:00'::string, 'Asia/Seoul'::string), +('2016-08-31'::string, 'UTC'::string), +('2018-03-13T06:18:23+02:00'::string, 'UTC'::string), +('2016-08-31'::string, 'America/New_York'::string), +('2018-03-13T06:18:23+02:00'::string, 'America/New_York'::string), +(NULL::string, 'Asia/Seoul'::string), +('2016-08-31'::string, NULL::string); +---- +2016-08-30T15:00:00 +2018-03-12T19:18:23 +2016-08-31T00:00:00 +2018-03-13T04:18:23 +2016-08-31T04:00:00 +2018-03-13T08:18:23 +NULL +NULL + +query P +SELECT to_utc_timestamp(column1, column2) +FROM VALUES +('2016-08-31'::timestamp, 'Asia/Seoul'::string), +('2018-03-13T06:18:23+02:00'::timestamp, 'Asia/Seoul'::string), +('2016-08-31'::timestamp, 'UTC'::string), +('2018-03-13T06:18:23+02:00'::timestamp, 'UTC'::string), +('2016-08-31'::timestamp, 'America/New_York'::string), +('2018-03-13T06:18:23+02:00'::timestamp, 'America/New_York'::string), +(NULL::timestamp, 'Asia/Seoul'::string), +('2018-03-13T06:18:23+00:00'::timestamp, NULL::string); +---- +2016-08-30T15:00:00 +2018-03-12T19:18:23 +2016-08-31T00:00:00 +2018-03-13T04:18:23 +2016-08-31T04:00:00 +2018-03-13T08:18:23 +NULL +NULL + +query P +SELECT to_utc_timestamp(arrow_cast(column1, 'Timestamp(Microsecond, Some("Asia/Seoul"))'), column2) +FROM VALUES +('2016-08-31'::timestamp, 'Asia/Seoul'::string), +('2018-03-13T06:18:23+02:00'::timestamp, 'Asia/Seoul'::string), +('2016-08-31'::timestamp, 'UTC'::string), +('2018-03-13T06:18:23+02:00'::timestamp, 'UTC'::string), +('2016-08-31'::timestamp, 'America/New_York'::string), +('2018-03-13T06:18:23+02:00'::timestamp, 'America/New_York'::string), +(NULL::timestamp, 'Asia/Seoul'::string), +('2018-03-13T06:18:23+00:00'::timestamp, NULL::string); +---- +2016-08-30T15:00:00+09:00 +2018-03-12T19:18:23+09:00 +2016-08-31T00:00:00+09:00 +2018-03-13T04:18:23+09:00 +2016-08-31T04:00:00+09:00 +2018-03-13T08:18:23+09:00 +NULL +NULL + + +# DST edge cases +query P +SELECT 
to_utc_timestamp('2020-03-31T13:40:00'::timestamp, 'America/New_York'::string); +---- +2020-03-31T17:40:00 + + +query P +SELECT to_utc_timestamp('2020-11-04T14:06:40'::timestamp, 'America/New_York'::string); +---- +2020-11-04T19:06:40 From 1870d102c7a41de87ce5351fb4aa7e51cdc58bb2 Mon Sep 17 00:00:00 2001 From: Cyprien Huet Date: Mon, 19 Jan 2026 09:40:23 +0400 Subject: [PATCH 5/7] docs: update documentation for `from_utc_timestamp` and `to_utc_timestamp` functions to clarify timestamp handling --- .../function/datetime/from_utc_timestamp.rs | 24 ++++--------------- .../src/function/datetime/to_utc_timestamp.rs | 4 ++++ 2 files changed, 8 insertions(+), 20 deletions(-) diff --git a/datafusion/spark/src/function/datetime/from_utc_timestamp.rs b/datafusion/spark/src/function/datetime/from_utc_timestamp.rs index 484184eec977..0c1b1828e5c2 100644 --- a/datafusion/spark/src/function/datetime/from_utc_timestamp.rs +++ b/datafusion/spark/src/function/datetime/from_utc_timestamp.rs @@ -15,26 +15,6 @@ // specific language governing permissions and limitations // under the License. -//! Apache Spark-compatible `from_utc_timestamp` function. -//! -//! This function interprets a timestamp as UTC and converts it to a specified timezone. -//! Unlike DataFusion's `to_local_time` which strips the timezone from the result, -//! `from_utc_timestamp` preserves the original timezone annotation on the output. -//! -//! # How it works -//! -//! The function reuses [`adjust_to_local_time`] from DataFusion's `to_local_time` module -//! to perform the actual timestamp adjustment. The key insight is that both functions -//! need to add the timezone offset to the underlying UTC timestamp value. -//! -//! For example, given a timestamp `2024-01-15T10:00:00Z` (UTC) and target timezone -//! `America/New_York` (UTC-5 in winter): -//! -//! 1. The input timestamp is stored as a UTC value (e.g., `1705312800` seconds) -//! 2. 
`adjust_to_local_time` calculates the offset for `America/New_York` (-5 hours) -//! 3. The offset is added to get the local time value (`1705312800 + (-18000)`) -//! 4. The result represents `2024-01-15T05:00:00` in the target timezone - use std::any::Any; use std::sync::Arc; @@ -59,6 +39,10 @@ use datafusion_functions::utils::make_scalar_function; /// /// Interprets the given timestamp as UTC and converts it to the given timezone. /// +/// Timestamp in Apache Spark represents number of microseconds from the Unix epoch, which is not +/// timezone-agnostic. So in Apache Spark this function just shifts the timestamp value from UTC timezone to +/// the given timezone. +/// /// See #[derive(Debug, PartialEq, Eq, Hash)] pub struct SparkFromUtcTimestamp { diff --git a/datafusion/spark/src/function/datetime/to_utc_timestamp.rs b/datafusion/spark/src/function/datetime/to_utc_timestamp.rs index 38ebccbef225..41832c632ea2 100644 --- a/datafusion/spark/src/function/datetime/to_utc_timestamp.rs +++ b/datafusion/spark/src/function/datetime/to_utc_timestamp.rs @@ -42,6 +42,10 @@ use datafusion_functions::utils::make_scalar_function; /// /// Interprets the given timestamp in the provided timezone and then converts it to UTC. /// +/// Timestamp in Apache Spark represents number of microseconds from the Unix epoch, which is not +/// timezone-agnostic. So in Apache Spark this function just shifts the timestamp value from the given +/// timezone to UTC timezone.
+/// /// See #[derive(Debug, PartialEq, Eq, Hash)] pub struct SparkToUtcTimestamp { From 28bcea63f2bee4dd892e8f6dc74138904bc09a9e Mon Sep 17 00:00:00 2001 From: Cyprien Huet Date: Mon, 19 Jan 2026 10:01:13 +0400 Subject: [PATCH 6/7] fix(spark): optimize timestamp adjustment in to_utc_timestamp function --- .../src/function/datetime/to_utc_timestamp.rs | 40 ++++++++----------- 1 file changed, 16 insertions(+), 24 deletions(-) diff --git a/datafusion/spark/src/function/datetime/to_utc_timestamp.rs b/datafusion/spark/src/function/datetime/to_utc_timestamp.rs index 41832c632ea2..32422b9d18b5 100644 --- a/datafusion/spark/src/function/datetime/to_utc_timestamp.rs +++ b/datafusion/spark/src/function/datetime/to_utc_timestamp.rs @@ -16,7 +16,6 @@ // under the License. use std::any::Any; -use std::ops::Sub; use std::sync::Arc; use arrow::array::timezone::Tz; @@ -26,7 +25,7 @@ use arrow::datatypes::{ ArrowTimestampType, DataType, Field, FieldRef, TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, }; -use chrono::{DateTime, Offset, TimeDelta, TimeZone}; +use chrono::{DateTime, Offset, TimeZone}; use datafusion_common::types::{NativeType, logical_string}; use datafusion_common::utils::take_function_args; use datafusion_common::{ @@ -198,36 +197,29 @@ where } fn adjust_to_utc_time(ts: i64, tz: Tz) -> Result { - let date_time = match T::UNIT { + let dt = match T::UNIT { TimeUnit::Nanosecond => Some(DateTime::from_timestamp_nanos(ts)), TimeUnit::Microsecond => DateTime::from_timestamp_micros(ts), TimeUnit::Millisecond => DateTime::from_timestamp_millis(ts), TimeUnit::Second => DateTime::from_timestamp(ts, 0), } - .unwrap() - .with_timezone(&tz); + .ok_or_else(|| internal_datafusion_err!("Invalid timestamp"))?; + let naive_dt = dt.naive_utc(); - let offset_seconds: i64 = tz - .offset_from_utc_datetime(&date_time.naive_utc()) + let offset_seconds = tz + .offset_from_utc_datetime(&naive_dt) .fix() .local_minus_utc() as i64; - let 
adjusted_date_time = date_time.sub( - // This should not fail under normal circumstances as the - // maximum possible offset is 26 hours (93,600 seconds) - TimeDelta::try_seconds(offset_seconds) - .ok_or_else(|| internal_datafusion_err!("Offset seconds should be less than i64::MAX / 1_000 or greater than -i64::MAX / 1_000"))?, - ); - - // convert the naive datetime back to i64 - match T::UNIT { - TimeUnit::Nanosecond => adjusted_date_time.timestamp_nanos_opt().ok_or_else(|| - internal_datafusion_err!( - "Failed to convert DateTime to timestamp in nanosecond. This error may occur if the date is out of range. The supported date ranges are between 1677-09-21T00:12:43.145224192 and 2262-04-11T23:47:16.854775807" - ) - ), - TimeUnit::Microsecond => Ok(adjusted_date_time.timestamp_micros()), - TimeUnit::Millisecond => Ok(adjusted_date_time.timestamp_millis()), - TimeUnit::Second => Ok(adjusted_date_time.timestamp()), + let offset_in_unit = match T::UNIT { + TimeUnit::Nanosecond => offset_seconds.checked_mul(1_000_000_000), + TimeUnit::Microsecond => offset_seconds.checked_mul(1_000_000), + TimeUnit::Millisecond => offset_seconds.checked_mul(1_000), + TimeUnit::Second => Some(offset_seconds), } + .ok_or_else(|| internal_datafusion_err!("Offset overflow"))?; + + ts.checked_sub(offset_in_unit).ok_or_else(|| { + internal_datafusion_err!("Timestamp overflow during timezone adjustment") + }) } From 64c138ba59fee2d0942de2bb64e5092d25d72ad0 Mon Sep 17 00:00:00 2001 From: Cyprien Huet Date: Sun, 25 Jan 2026 08:17:24 +0400 Subject: [PATCH 7/7] fix(spark): simplify match statements in from_utc_timestamp and to_utc_timestamp functions --- .../src/function/datetime/from_utc_timestamp.rs | 12 ++++++------ .../spark/src/function/datetime/to_utc_timestamp.rs | 12 ++++++------ 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/datafusion/spark/src/function/datetime/from_utc_timestamp.rs b/datafusion/spark/src/function/datetime/from_utc_timestamp.rs index 
0c1b1828e5c2..77cc66da5f37 100644 --- a/datafusion/spark/src/function/datetime/from_utc_timestamp.rs +++ b/datafusion/spark/src/function/datetime/from_utc_timestamp.rs @@ -108,36 +108,36 @@ impl ScalarUDFImpl for SparkFromUtcTimestamp { fn spark_from_utc_timestamp(args: &[ArrayRef]) -> Result { let [timestamp, timezone] = take_function_args("from_utc_timestamp", args)?; - match (timestamp.data_type(), timezone.data_type()) { - (DataType::Timestamp(TimeUnit::Nanosecond, tz_opt), _) => { + match timestamp.data_type() { + DataType::Timestamp(TimeUnit::Nanosecond, tz_opt) => { process_timestamp_with_tz_array::( timestamp, timezone, tz_opt.clone(), ) } - (DataType::Timestamp(TimeUnit::Microsecond, tz_opt), _) => { + DataType::Timestamp(TimeUnit::Microsecond, tz_opt) => { process_timestamp_with_tz_array::( timestamp, timezone, tz_opt.clone(), ) } - (DataType::Timestamp(TimeUnit::Millisecond, tz_opt), _) => { + DataType::Timestamp(TimeUnit::Millisecond, tz_opt) => { process_timestamp_with_tz_array::( timestamp, timezone, tz_opt.clone(), ) } - (DataType::Timestamp(TimeUnit::Second, tz_opt), _) => { + DataType::Timestamp(TimeUnit::Second, tz_opt) => { process_timestamp_with_tz_array::( timestamp, timezone, tz_opt.clone(), ) } - (ts_type, _) => { + ts_type => { exec_err!("`from_utc_timestamp`: unsupported argument types: {ts_type}") } } diff --git a/datafusion/spark/src/function/datetime/to_utc_timestamp.rs b/datafusion/spark/src/function/datetime/to_utc_timestamp.rs index 32422b9d18b5..0e8c267a390e 100644 --- a/datafusion/spark/src/function/datetime/to_utc_timestamp.rs +++ b/datafusion/spark/src/function/datetime/to_utc_timestamp.rs @@ -110,36 +110,36 @@ impl ScalarUDFImpl for SparkToUtcTimestamp { fn to_utc_timestamp(args: &[ArrayRef]) -> Result { let [timestamp, timezone] = take_function_args("to_utc_timestamp", args)?; - match (timestamp.data_type(), timezone.data_type()) { - (DataType::Timestamp(TimeUnit::Nanosecond, tz_opt), _) => { + match timestamp.data_type() { + 
DataType::Timestamp(TimeUnit::Nanosecond, tz_opt) => { process_timestamp_with_tz_array::( timestamp, timezone, tz_opt.clone(), ) } - (DataType::Timestamp(TimeUnit::Microsecond, tz_opt), _) => { + DataType::Timestamp(TimeUnit::Microsecond, tz_opt) => { process_timestamp_with_tz_array::( timestamp, timezone, tz_opt.clone(), ) } - (DataType::Timestamp(TimeUnit::Millisecond, tz_opt), _) => { + DataType::Timestamp(TimeUnit::Millisecond, tz_opt) => { process_timestamp_with_tz_array::( timestamp, timezone, tz_opt.clone(), ) } - (DataType::Timestamp(TimeUnit::Second, tz_opt), _) => { + DataType::Timestamp(TimeUnit::Second, tz_opt) => { process_timestamp_with_tz_array::( timestamp, timezone, tz_opt.clone(), ) } - (ts_type, _) => { + ts_type => { exec_err!("`to_utc_timestamp`: unsupported argument types: {ts_type}") } }