From 77d844ce981c60f0d8c7af91263b9620f7d459fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=93=87=E5=91=9C=E5=93=87=E5=91=9C=E5=91=80=E5=92=A6?= =?UTF-8?q?=E8=80=B6?= Date: Wed, 2 Nov 2022 04:06:05 +0800 Subject: [PATCH] Implement `current_time` scalar function (#4054) * implement `current_time` * edit test case * fix nanosecond after midnight * fix: fmt Co-authored-by: pingao Co-authored-by: Andrew Lamb --- datafusion/core/tests/sql/timestamp.rs | 31 +++++++++++++++++++ datafusion/expr/src/built_in_function.rs | 5 +++ datafusion/expr/src/expr_fn.rs | 8 +++++ datafusion/expr/src/function.rs | 1 + .../physical-expr/src/datetime_expressions.rs | 13 ++++++++ datafusion/physical-expr/src/functions.rs | 6 ++++ datafusion/proto/proto/datafusion.proto | 1 + datafusion/proto/src/from_proto.rs | 1 + datafusion/proto/src/generated/pbjson.rs | 3 ++ datafusion/proto/src/generated/prost.rs | 2 ++ datafusion/proto/src/to_proto.rs | 1 + 11 files changed, 72 insertions(+) diff --git a/datafusion/core/tests/sql/timestamp.rs b/datafusion/core/tests/sql/timestamp.rs index 8909502005381..5192b6cb56772 100644 --- a/datafusion/core/tests/sql/timestamp.rs +++ b/datafusion/core/tests/sql/timestamp.rs @@ -1651,3 +1651,34 @@ async fn test_current_date() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn test_current_time() -> Result<()> { + let ctx = SessionContext::new(); + + let sql = "select current_time() dt"; + let results = execute_to_batches(&ctx, sql).await; + assert_eq!( + results[0] + .schema() + .field_with_name("dt") + .unwrap() + .data_type() + .to_owned(), + DataType::Time64(TimeUnit::Nanosecond) + ); + + let sql = "select case when current_time() = (now()::bigint % 86400000000000)::time then 'OK' else 'FAIL' end result"; + let results = execute_to_batches(&ctx, sql).await; + + let expected = vec![ + "+--------+", + "| result |", + "+--------+", + "| OK |", + "+--------+", + ]; + + assert_batches_eq!(expected, &results); + Ok(()) +} diff --git a/datafusion/expr/src/built_in_function.rs b/datafusion/expr/src/built_in_function.rs index 796f0c90919c1..c8e144718ff7f 100644 --- a/datafusion/expr/src/built_in_function.rs +++ b/datafusion/expr/src/built_in_function.rs @@ -158,6 +158,8 @@ pub enum BuiltinScalarFunction { Now, ///current_date CurrentDate, + /// current_time + CurrentTime, /// translate Translate, /// trim @@ -181,6 +183,7 @@ impl BuiltinScalarFunction { BuiltinScalarFunction::Random | BuiltinScalarFunction::Now | BuiltinScalarFunction::CurrentDate + | BuiltinScalarFunction::CurrentTime ) } /// Returns the [Volatility] of the builtin function. @@ -259,6 +262,7 @@ impl BuiltinScalarFunction { // Stable builtin functions BuiltinScalarFunction::Now => Volatility::Stable, BuiltinScalarFunction::CurrentDate => Volatility::Stable, + BuiltinScalarFunction::CurrentTime => Volatility::Stable, // Volatile builtin functions BuiltinScalarFunction::Random => Volatility::Volatile, @@ -315,6 +319,7 @@ impl FromStr for BuiltinScalarFunction { "concat_ws" => BuiltinScalarFunction::ConcatWithSeparator, "chr" => BuiltinScalarFunction::Chr, "current_date" => BuiltinScalarFunction::CurrentDate, + "current_time" => BuiltinScalarFunction::CurrentTime, "date_part" | "datepart" => BuiltinScalarFunction::DatePart, "date_trunc" | "datetrunc" => BuiltinScalarFunction::DateTrunc, "date_bin" => BuiltinScalarFunction::DateBin, diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs index cfd043a3ae56e..006bcac5e9c0c 100644 --- a/datafusion/expr/src/expr_fn.rs +++ b/datafusion/expr/src/expr_fn.rs @@ -460,6 +460,14 @@ pub fn current_date() -> Expr { } } +/// Returns current UTC time as a [`DataType::Time64`] value +pub fn current_time() -> Expr { + Expr::ScalarFunction { + fun: BuiltinScalarFunction::CurrentTime, + args: vec![], + } +} + /// Create a CASE WHEN statement with literal WHEN expressions for comparison to the base expression. pub fn case(expr: Expr) -> CaseBuilder { CaseBuilder::new(Some(Box::new(expr)), vec![], vec![], None) diff --git a/datafusion/expr/src/function.rs b/datafusion/expr/src/function.rs index 4a47d820942ba..de5ab0e1a36d7 100644 --- a/datafusion/expr/src/function.rs +++ b/datafusion/expr/src/function.rs @@ -222,6 +222,7 @@ pub fn return_type( Some("UTC".to_owned()), )), BuiltinScalarFunction::CurrentDate => Ok(DataType::Date32), + BuiltinScalarFunction::CurrentTime => Ok(DataType::Time64(TimeUnit::Nanosecond)), BuiltinScalarFunction::Translate => { utf8_to_str_type(&input_expr_types[0], "translate") } diff --git a/datafusion/physical-expr/src/datetime_expressions.rs b/datafusion/physical-expr/src/datetime_expressions.rs index 2a0cf3012a109..48f093a2fc75f 100644 --- a/datafusion/physical-expr/src/datetime_expressions.rs +++ b/datafusion/physical-expr/src/datetime_expressions.rs @@ -200,6 +200,19 @@ pub fn make_current_date( move |_arg| Ok(ColumnarValue::Scalar(ScalarValue::Date32(days))) } +/// Create an implementation of `current_time()` that always returns the +/// specified current time. +/// +/// The semantics of `current_time()` require it to return the same value +/// wherever it appears within a single statement. This value is +/// chosen during planning time. +pub fn make_current_time( + now_ts: DateTime, +) -> impl Fn(&[ColumnarValue]) -> Result { + let nano = Some(now_ts.timestamp_nanos() % 86400000000000); + move |_arg| Ok(ColumnarValue::Scalar(ScalarValue::Time64(nano))) +} + fn quarter_month(date: &NaiveDateTime) -> u32 { 1 + 3 * ((date.month() - 1) / 3) } diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs index f7c3cbef9798f..cd1d31544371e 100644 --- a/datafusion/physical-expr/src/functions.rs +++ b/datafusion/physical-expr/src/functions.rs @@ -433,6 +433,12 @@ pub fn create_physical_fun( execution_props.query_execution_start_time, )) } + BuiltinScalarFunction::CurrentTime => { + // bind value for current_time at plan time + Arc::new(datetime_expressions::make_current_time( + execution_props.query_execution_start_time, + )) + } BuiltinScalarFunction::InitCap => Arc::new(|args| match args[0].data_type() { DataType::Utf8 => { make_scalar_function(string_expressions::initcap::)(args) diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 1ff2952df1f48..083f24502354b 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -496,6 +496,7 @@ enum ScalarFunction { DateBin=68; ArrowTypeof=69; CurrentDate=70; + CurrentTime=71; } message ScalarFunctionNode { diff --git a/datafusion/proto/src/from_proto.rs b/datafusion/proto/src/from_proto.rs index cd61bba7a0aab..708fb51ecbcea 100644 --- a/datafusion/proto/src/from_proto.rs +++ b/datafusion/proto/src/from_proto.rs @@ -430,6 +430,7 @@ impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction { ScalarFunction::ToTimestampSeconds => Self::ToTimestampSeconds, ScalarFunction::Now => Self::Now, ScalarFunction::CurrentDate => Self::CurrentDate, + ScalarFunction::CurrentTime => Self::CurrentTime, ScalarFunction::Translate => Self::Translate, ScalarFunction::RegexpMatch => Self::RegexpMatch, ScalarFunction::Coalesce => Self::Coalesce, diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 941cc461f5169..3557dee467e46 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -9621,6 +9621,7 @@ impl serde::Serialize for ScalarFunction { Self::DateBin => "DateBin", Self::ArrowTypeof => "ArrowTypeof", Self::CurrentDate => "CurrentDate", + Self::CurrentTime => "CurrentTime", }; serializer.serialize_str(variant) } @@ -9703,6 +9704,7 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction { "DateBin", "ArrowTypeof", "CurrentDate", + "CurrentTime", ]; struct GeneratedVisitor; @@ -9816,6 +9818,7 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction { "DateBin" => Ok(ScalarFunction::DateBin), "ArrowTypeof" => Ok(ScalarFunction::ArrowTypeof), "CurrentDate" => Ok(ScalarFunction::CurrentDate), + "CurrentTime" => Ok(ScalarFunction::CurrentTime), _ => Err(serde::de::Error::unknown_variant(value, FIELDS)), } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 038195c9465f6..d404f02759e7c 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1225,6 +1225,7 @@ pub enum ScalarFunction { DateBin = 68, ArrowTypeof = 69, CurrentDate = 70, + CurrentTime = 71, } impl ScalarFunction { /// String value of the enum field names used in the ProtoBuf definition. @@ -1304,6 +1305,7 @@ impl ScalarFunction { ScalarFunction::DateBin => "DateBin", ScalarFunction::ArrowTypeof => "ArrowTypeof", ScalarFunction::CurrentDate => "CurrentDate", + ScalarFunction::CurrentTime => "CurrentTime", } } } diff --git a/datafusion/proto/src/to_proto.rs b/datafusion/proto/src/to_proto.rs index a089f0768b179..1aea38f3a1207 100644 --- a/datafusion/proto/src/to_proto.rs +++ b/datafusion/proto/src/to_proto.rs @@ -1179,6 +1179,7 @@ impl TryFrom<&BuiltinScalarFunction> for protobuf::ScalarFunction { BuiltinScalarFunction::ToTimestampSeconds => Self::ToTimestampSeconds, BuiltinScalarFunction::Now => Self::Now, BuiltinScalarFunction::CurrentDate => Self::CurrentDate, + BuiltinScalarFunction::CurrentTime => Self::CurrentTime, BuiltinScalarFunction::Translate => Self::Translate, BuiltinScalarFunction::RegexpMatch => Self::RegexpMatch, BuiltinScalarFunction::Coalesce => Self::Coalesce,