Skip to content

Commit

Permalink
to_timestamp functions should preserve timezone (#11038)
Browse files Browse the repository at this point in the history
* to_timestamp functions should preserve timezone

* add tests for to_timestamp timezone preservation
  • Loading branch information
maxburke authored Jun 24, 2024
1 parent 459afbb commit e266018
Showing 1 changed file with 150 additions and 10 deletions.
160 changes: 150 additions & 10 deletions datafusion/functions/src/datetime/to_timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,13 @@ impl ScalarUDFImpl for ToTimestampFunc {
&self.signature
}

fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
Ok(Timestamp(Nanosecond, None))
fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
match &arg_types[0] {
DataType::Timestamp(_, Some(tz)) => {
Ok(Timestamp(Nanosecond, Some(tz.clone())))
}
_ => Ok(Timestamp(Nanosecond, None)),
}
}

fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
Expand All @@ -167,6 +172,9 @@ impl ScalarUDFImpl for ToTimestampFunc {
DataType::Null | DataType::Float64 | Timestamp(_, None) => {
args[0].cast_to(&Timestamp(Nanosecond, None), None)
}
DataType::Timestamp(_, Some(tz)) => {
args[0].cast_to(&Timestamp(Nanosecond, Some(tz)), None)
}
DataType::Utf8 => {
to_timestamp_impl::<TimestampNanosecondType>(args, "to_timestamp")
}
Expand All @@ -193,8 +201,11 @@ impl ScalarUDFImpl for ToTimestampSecondsFunc {
&self.signature
}

fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
Ok(Timestamp(Second, None))
fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
match &arg_types[0] {
DataType::Timestamp(_, Some(tz)) => Ok(Timestamp(Second, Some(tz.clone()))),
_ => Ok(Timestamp(Second, None)),
}
}

fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
Expand All @@ -214,6 +225,9 @@ impl ScalarUDFImpl for ToTimestampSecondsFunc {
DataType::Null | DataType::Int32 | DataType::Int64 | Timestamp(_, None) => {
args[0].cast_to(&Timestamp(Second, None), None)
}
DataType::Timestamp(_, Some(tz)) => {
args[0].cast_to(&Timestamp(Second, Some(tz)), None)
}
DataType::Utf8 => {
to_timestamp_impl::<TimestampSecondType>(args, "to_timestamp_seconds")
}
Expand All @@ -240,8 +254,13 @@ impl ScalarUDFImpl for ToTimestampMillisFunc {
&self.signature
}

fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
Ok(Timestamp(Millisecond, None))
fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
match &arg_types[0] {
DataType::Timestamp(_, Some(tz)) => {
Ok(Timestamp(Millisecond, Some(tz.clone())))
}
_ => Ok(Timestamp(Millisecond, None)),
}
}

fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
Expand All @@ -261,6 +280,9 @@ impl ScalarUDFImpl for ToTimestampMillisFunc {
DataType::Null | DataType::Int32 | DataType::Int64 | Timestamp(_, None) => {
args[0].cast_to(&Timestamp(Millisecond, None), None)
}
DataType::Timestamp(_, Some(tz)) => {
args[0].cast_to(&Timestamp(Millisecond, Some(tz)), None)
}
DataType::Utf8 => {
to_timestamp_impl::<TimestampMillisecondType>(args, "to_timestamp_millis")
}
Expand All @@ -287,8 +309,13 @@ impl ScalarUDFImpl for ToTimestampMicrosFunc {
&self.signature
}

fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
Ok(Timestamp(Microsecond, None))
fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
match &arg_types[0] {
DataType::Timestamp(_, Some(tz)) => {
Ok(Timestamp(Microsecond, Some(tz.clone())))
}
_ => Ok(Timestamp(Microsecond, None)),
}
}

fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
Expand All @@ -308,6 +335,9 @@ impl ScalarUDFImpl for ToTimestampMicrosFunc {
DataType::Null | DataType::Int32 | DataType::Int64 | Timestamp(_, None) => {
args[0].cast_to(&Timestamp(Microsecond, None), None)
}
DataType::Timestamp(_, Some(tz)) => {
args[0].cast_to(&Timestamp(Microsecond, Some(tz)), None)
}
DataType::Utf8 => {
to_timestamp_impl::<TimestampMicrosecondType>(args, "to_timestamp_micros")
}
Expand All @@ -334,8 +364,13 @@ impl ScalarUDFImpl for ToTimestampNanosFunc {
&self.signature
}

fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
Ok(Timestamp(Nanosecond, None))
fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
match &arg_types[0] {
DataType::Timestamp(_, Some(tz)) => {
Ok(Timestamp(Nanosecond, Some(tz.clone())))
}
_ => Ok(Timestamp(Nanosecond, None)),
}
}

fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
Expand All @@ -355,6 +390,9 @@ impl ScalarUDFImpl for ToTimestampNanosFunc {
DataType::Null | DataType::Int32 | DataType::Int64 | Timestamp(_, None) => {
args[0].cast_to(&Timestamp(Nanosecond, None), None)
}
DataType::Timestamp(_, Some(tz)) => {
args[0].cast_to(&Timestamp(Nanosecond, Some(tz)), None)
}
DataType::Utf8 => {
to_timestamp_impl::<TimestampNanosecondType>(args, "to_timestamp_nanos")
}
Expand Down Expand Up @@ -740,6 +778,103 @@ mod tests {
}
}

#[test]
fn test_tz() {
let udfs: Vec<Box<dyn ScalarUDFImpl>> = vec![
Box::new(ToTimestampFunc::new()),
Box::new(ToTimestampSecondsFunc::new()),
Box::new(ToTimestampMillisFunc::new()),
Box::new(ToTimestampNanosFunc::new()),
Box::new(ToTimestampSecondsFunc::new()),
];

let mut nanos_builder = TimestampNanosecondArray::builder(2);
let mut millis_builder = TimestampMillisecondArray::builder(2);
let mut micros_builder = TimestampMicrosecondArray::builder(2);
let mut sec_builder = TimestampSecondArray::builder(2);

nanos_builder.append_value(1599572549190850000);
millis_builder.append_value(1599572549190);
micros_builder.append_value(1599572549190850);
sec_builder.append_value(1599572549);

let nanos_timestamps =
Arc::new(nanos_builder.finish().with_timezone("UTC")) as ArrayRef;
let millis_timestamps =
Arc::new(millis_builder.finish().with_timezone("UTC")) as ArrayRef;
let micros_timestamps =
Arc::new(micros_builder.finish().with_timezone("UTC")) as ArrayRef;
let sec_timestamps =
Arc::new(sec_builder.finish().with_timezone("UTC")) as ArrayRef;

let arrays = &[
ColumnarValue::Array(nanos_timestamps.clone()),
ColumnarValue::Array(millis_timestamps.clone()),
ColumnarValue::Array(micros_timestamps.clone()),
ColumnarValue::Array(sec_timestamps.clone()),
];

for udf in &udfs {
for array in arrays {
let rt = udf.return_type(&[array.data_type()]).unwrap();
assert!(matches!(rt, DataType::Timestamp(_, Some(_))));

let res = udf
.invoke(&[array.clone()])
.expect("that to_timestamp parsed values without error");
let array = match res {
ColumnarValue::Array(res) => res,
_ => panic!("Expected a columnar array"),
};
let ty = array.data_type();
assert!(matches!(ty, DataType::Timestamp(_, Some(_))));
}
}

let mut nanos_builder = TimestampNanosecondArray::builder(2);
let mut millis_builder = TimestampMillisecondArray::builder(2);
let mut micros_builder = TimestampMicrosecondArray::builder(2);
let mut sec_builder = TimestampSecondArray::builder(2);
let mut i64_builder = Int64Array::builder(2);

nanos_builder.append_value(1599572549190850000);
millis_builder.append_value(1599572549190);
micros_builder.append_value(1599572549190850);
sec_builder.append_value(1599572549);
i64_builder.append_value(1599572549);

let nanos_timestamps = Arc::new(nanos_builder.finish()) as ArrayRef;
let millis_timestamps = Arc::new(millis_builder.finish()) as ArrayRef;
let micros_timestamps = Arc::new(micros_builder.finish()) as ArrayRef;
let sec_timestamps = Arc::new(sec_builder.finish()) as ArrayRef;
let i64_timestamps = Arc::new(i64_builder.finish()) as ArrayRef;

let arrays = &[
ColumnarValue::Array(nanos_timestamps.clone()),
ColumnarValue::Array(millis_timestamps.clone()),
ColumnarValue::Array(micros_timestamps.clone()),
ColumnarValue::Array(sec_timestamps.clone()),
ColumnarValue::Array(i64_timestamps.clone()),
];

for udf in &udfs {
for array in arrays {
let rt = udf.return_type(&[array.data_type()]).unwrap();
assert!(matches!(rt, DataType::Timestamp(_, None)));

let res = udf
.invoke(&[array.clone()])
.expect("that to_timestamp parsed values without error");
let array = match res {
ColumnarValue::Array(res) => res,
_ => panic!("Expected a columnar array"),
};
let ty = array.data_type();
assert!(matches!(ty, DataType::Timestamp(_, None)));
}
}
}

#[test]
fn test_to_timestamp_arg_validation() {
let mut date_string_builder = StringBuilder::with_capacity(2, 1024);
Expand Down Expand Up @@ -811,6 +946,11 @@ mod tests {
.expect("that to_timestamp with format args parsed values without error");
if let ColumnarValue::Array(parsed_array) = parsed_timestamps {
assert_eq!(parsed_array.len(), 1);
assert!(matches!(
parsed_array.data_type(),
DataType::Timestamp(_, None)
));

match time_unit {
Nanosecond => {
assert_eq!(nanos_expected_timestamps, parsed_array.as_ref())
Expand Down

0 comments on commit e266018

Please sign in to comment.