From b5cc6b9c19b2d784d3907804be701a3d81b97756 Mon Sep 17 00:00:00 2001 From: Jay Zhan Date: Tue, 7 May 2024 10:18:21 +0800 Subject: [PATCH] Minor: remove old `create_physical_expr` to `scalar_function` (#10387) * rm old code Signed-off-by: jayzhan211 * move to scalarfunction Signed-off-by: jayzhan211 * fix import Signed-off-by: jayzhan211 * Update datafusion/physical-expr/src/scalar_function.rs Co-authored-by: Andrew Lamb --------- Signed-off-by: jayzhan211 Co-authored-by: Andrew Lamb --- datafusion/core/src/execution/context/mod.rs | 3 +- datafusion/physical-expr/src/functions.rs | 354 +----------------- datafusion/physical-expr/src/lib.rs | 4 +- datafusion/physical-expr/src/planner.rs | 19 +- .../physical-expr/src/scalar_function.rs | 97 ++++- datafusion/physical-expr/src/udf.rs | 95 ----- 6 files changed, 116 insertions(+), 456 deletions(-) delete mode 100644 datafusion/physical-expr/src/udf.rs diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 35c5c6791023..d84983f08ec6 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -44,6 +44,7 @@ use crate::{ error::{DataFusionError, Result}, execution::{options::ArrowReadOptions, runtime_env::RuntimeEnv, FunctionRegistry}, logical_expr::AggregateUDF, + logical_expr::ScalarUDF, logical_expr::{ CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateFunction, CreateMemoryTable, CreateView, DropCatalogSchema, DropFunction, DropTable, @@ -53,7 +54,7 @@ use crate::{ optimizer::analyzer::{Analyzer, AnalyzerRule}, optimizer::optimizer::{Optimizer, OptimizerConfig, OptimizerRule}, physical_optimizer::optimizer::{PhysicalOptimizer, PhysicalOptimizerRule}, - physical_plan::{udf::ScalarUDF, ExecutionPlan}, + physical_plan::ExecutionPlan, physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner}, variable::{VarProvider, VarType}, }; diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs index 06c4bd1c9531..21cf6d348cd5 100644 --- a/datafusion/physical-expr/src/functions.rs +++ b/datafusion/physical-expr/src/functions.rs @@ -15,7 +15,8 @@ // specific language governing permissions and limitations // under the License. -//! Declaration of built-in (scalar) functions. +//! Deprecated module. Add new feature in scalar_function.rs +//! //! This module contains built-in functions' enumeration and metadata. //! //! Generally, a function has: @@ -30,52 +31,15 @@ //! an argument i32 is passed to a function that supports f64, the //! argument is automatically is coerced to f64. -use std::ops::Neg; use std::sync::Arc; -use arrow::{array::ArrayRef, datatypes::Schema}; +use arrow::array::ArrayRef; use arrow_array::Array; -use datafusion_common::{DFSchema, Result, ScalarValue}; +pub use crate::scalar_function::create_physical_expr; +use datafusion_common::{Result, ScalarValue}; pub use datafusion_expr::FuncMonotonicity; -use datafusion_expr::{ - type_coercion::functions::data_types, ColumnarValue, ScalarFunctionImplementation, -}; -use datafusion_expr::{Expr, ScalarFunctionDefinition, ScalarUDF}; - -use crate::sort_properties::SortProperties; -use crate::{PhysicalExpr, ScalarFunctionExpr}; - -/// Create a physical (function) expression. -/// This function errors when `args`' can't be coerced to a valid argument type of the function. -pub fn create_physical_expr( - fun: &ScalarUDF, - input_phy_exprs: &[Arc], - input_schema: &Schema, - args: &[Expr], - input_dfschema: &DFSchema, -) -> Result> { - let input_expr_types = input_phy_exprs - .iter() - .map(|e| e.data_type(input_schema)) - .collect::>>()?; - - // verify that input data types is consistent with function's `TypeSignature` - data_types(&input_expr_types, fun.signature())?; - - // Since we have arg_types, we don't need args and schema. - let return_type = - fun.return_type_from_exprs(args, input_dfschema, &input_expr_types)?; - - let fun_def = ScalarFunctionDefinition::UDF(Arc::new(fun.clone())); - Ok(Arc::new(ScalarFunctionExpr::new( - fun.name(), - fun_def, - input_phy_exprs.to_vec(), - return_type, - fun.monotonicity()?, - ))) -} +use datafusion_expr::{ColumnarValue, ScalarFunctionImplementation}; #[derive(Debug, Clone, Copy)] pub enum Hint { @@ -164,309 +128,3 @@ where } }) } - -/// Determines a [`ScalarFunctionExpr`]'s monotonicity for the given arguments -/// and the function's behavior depending on its arguments. -pub fn out_ordering( - func: &FuncMonotonicity, - arg_orderings: &[SortProperties], -) -> SortProperties { - func.iter().zip(arg_orderings).fold( - SortProperties::Singleton, - |prev_sort, (item, arg)| { - let current_sort = func_order_in_one_dimension(item, arg); - - match (prev_sort, current_sort) { - (_, SortProperties::Unordered) => SortProperties::Unordered, - (SortProperties::Singleton, SortProperties::Ordered(_)) => current_sort, - (SortProperties::Ordered(prev), SortProperties::Ordered(current)) - if prev.descending != current.descending => - { - SortProperties::Unordered - } - _ => prev_sort, - } - }, - ) -} - -/// This function decides the monotonicity property of a [`ScalarFunctionExpr`] for a single argument (i.e. across a single dimension), given that argument's sort properties. -fn func_order_in_one_dimension( - func_monotonicity: &Option, - arg: &SortProperties, -) -> SortProperties { - if *arg == SortProperties::Singleton { - SortProperties::Singleton - } else { - match func_monotonicity { - None => SortProperties::Unordered, - Some(false) => { - if let SortProperties::Ordered(_) = arg { - arg.neg() - } else { - SortProperties::Unordered - } - } - Some(true) => { - if let SortProperties::Ordered(_) = arg { - *arg - } else { - SortProperties::Unordered - } - } - } - } -} - -#[cfg(test)] -mod tests { - use arrow::{ - array::UInt64Array, - datatypes::{DataType, Field}, - }; - use arrow_schema::DataType::Utf8; - - use datafusion_common::cast::as_uint64_array; - use datafusion_common::DataFusionError; - use datafusion_common::{internal_err, plan_err}; - use datafusion_expr::{Signature, Volatility}; - - use crate::expressions::try_cast; - use crate::utils::tests::TestScalarUDF; - - use super::*; - - #[test] - fn test_empty_arguments_error() -> Result<()> { - let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); - let udf = ScalarUDF::new_from_impl(TestScalarUDF { - signature: Signature::variadic(vec![Utf8], Volatility::Immutable), - }); - let expr = create_physical_expr_with_type_coercion( - &udf, - &[], - &schema, - &[], - &DFSchema::empty(), - ); - - match expr { - Ok(..) => { - return plan_err!( - "ScalarUDF function {udf:?} does not support empty arguments" - ); - } - Err(DataFusionError::Plan(_)) => { - // Continue the loop - } - Err(..) => { - return internal_err!( - "ScalarUDF function {udf:?} didn't got the right error with empty arguments"); - } - } - - Ok(()) - } - - // Helper function just for testing. - // Returns `expressions` coerced to types compatible with - // `signature`, if possible. - pub fn coerce( - expressions: &[Arc], - schema: &Schema, - signature: &Signature, - ) -> Result>> { - if expressions.is_empty() { - return Ok(vec![]); - } - - let current_types = expressions - .iter() - .map(|e| e.data_type(schema)) - .collect::>>()?; - - let new_types = data_types(¤t_types, signature)?; - - expressions - .iter() - .enumerate() - .map(|(i, expr)| try_cast(expr.clone(), schema, new_types[i].clone())) - .collect::>>() - } - - // Helper function just for testing. - // The type coercion will be done in the logical phase, should do the type coercion for the test - fn create_physical_expr_with_type_coercion( - fun: &ScalarUDF, - input_phy_exprs: &[Arc], - input_schema: &Schema, - args: &[Expr], - input_dfschema: &DFSchema, - ) -> Result> { - let type_coerced_phy_exprs = - coerce(input_phy_exprs, input_schema, fun.signature()).unwrap(); - create_physical_expr( - fun, - &type_coerced_phy_exprs, - input_schema, - args, - input_dfschema, - ) - } - - fn dummy_function(args: &[ArrayRef]) -> Result { - let result: UInt64Array = - args.iter().map(|array| Some(array.len() as u64)).collect(); - Ok(Arc::new(result) as ArrayRef) - } - - fn unpack_uint64_array(col: Result) -> Result> { - if let ColumnarValue::Array(array) = col? { - Ok(as_uint64_array(&array)?.values().to_vec()) - } else { - internal_err!("Unexpected scalar created by a test function") - } - } - - #[test] - fn test_make_scalar_function() -> Result<()> { - let adapter_func = make_scalar_function_inner(dummy_function); - - let scalar_arg = ColumnarValue::Scalar(ScalarValue::Int64(Some(1))); - let array_arg = ColumnarValue::Array( - ScalarValue::Int64(Some(1)) - .to_array_of_size(5) - .expect("Failed to convert to array of size"), - ); - let result = unpack_uint64_array(adapter_func(&[array_arg, scalar_arg]))?; - assert_eq!(result, vec![5, 5]); - - Ok(()) - } - - #[test] - fn test_make_scalar_function_with_no_hints() -> Result<()> { - let adapter_func = make_scalar_function_with_hints(dummy_function, vec![]); - - let scalar_arg = ColumnarValue::Scalar(ScalarValue::Int64(Some(1))); - let array_arg = ColumnarValue::Array( - ScalarValue::Int64(Some(1)) - .to_array_of_size(5) - .expect("Failed to convert to array of size"), - ); - let result = unpack_uint64_array(adapter_func(&[array_arg, scalar_arg]))?; - assert_eq!(result, vec![5, 5]); - - Ok(()) - } - - #[test] - fn test_make_scalar_function_with_hints() -> Result<()> { - let adapter_func = make_scalar_function_with_hints( - dummy_function, - vec![Hint::Pad, Hint::AcceptsSingular], - ); - - let scalar_arg = ColumnarValue::Scalar(ScalarValue::Int64(Some(1))); - let array_arg = ColumnarValue::Array( - ScalarValue::Int64(Some(1)) - .to_array_of_size(5) - .expect("Failed to convert to array of size"), - ); - let result = unpack_uint64_array(adapter_func(&[array_arg, scalar_arg]))?; - assert_eq!(result, vec![5, 1]); - - Ok(()) - } - - #[test] - fn test_make_scalar_function_with_hints_on_arrays() -> Result<()> { - let array_arg = ColumnarValue::Array( - ScalarValue::Int64(Some(1)) - .to_array_of_size(5) - .expect("Failed to convert to array of size"), - ); - let adapter_func = make_scalar_function_with_hints( - dummy_function, - vec![Hint::Pad, Hint::AcceptsSingular], - ); - - let result = unpack_uint64_array(adapter_func(&[array_arg.clone(), array_arg]))?; - assert_eq!(result, vec![5, 5]); - - Ok(()) - } - - #[test] - fn test_make_scalar_function_with_mixed_hints() -> Result<()> { - let adapter_func = make_scalar_function_with_hints( - dummy_function, - vec![Hint::Pad, Hint::AcceptsSingular, Hint::Pad], - ); - - let scalar_arg = ColumnarValue::Scalar(ScalarValue::Int64(Some(1))); - let array_arg = ColumnarValue::Array( - ScalarValue::Int64(Some(1)) - .to_array_of_size(5) - .expect("Failed to convert to array of size"), - ); - let result = unpack_uint64_array(adapter_func(&[ - array_arg, - scalar_arg.clone(), - scalar_arg, - ]))?; - assert_eq!(result, vec![5, 1, 5]); - - Ok(()) - } - - #[test] - fn test_make_scalar_function_with_more_arguments_than_hints() -> Result<()> { - let adapter_func = make_scalar_function_with_hints( - dummy_function, - vec![Hint::Pad, Hint::AcceptsSingular, Hint::Pad], - ); - - let scalar_arg = ColumnarValue::Scalar(ScalarValue::Int64(Some(1))); - let array_arg = ColumnarValue::Array( - ScalarValue::Int64(Some(1)) - .to_array_of_size(5) - .expect("Failed to convert to array of size"), - ); - let result = unpack_uint64_array(adapter_func(&[ - array_arg.clone(), - scalar_arg.clone(), - scalar_arg, - array_arg, - ]))?; - assert_eq!(result, vec![5, 1, 5, 5]); - - Ok(()) - } - - #[test] - fn test_make_scalar_function_with_hints_than_arguments() -> Result<()> { - let adapter_func = make_scalar_function_with_hints( - dummy_function, - vec![ - Hint::Pad, - Hint::AcceptsSingular, - Hint::Pad, - Hint::Pad, - Hint::AcceptsSingular, - Hint::Pad, - ], - ); - - let scalar_arg = ColumnarValue::Scalar(ScalarValue::Int64(Some(1))); - let array_arg = ColumnarValue::Array( - ScalarValue::Int64(Some(1)) - .to_array_of_size(5) - .expect("Failed to convert to array of size"), - ); - let result = unpack_uint64_array(adapter_func(&[array_arg, scalar_arg]))?; - assert_eq!(result, vec![5, 1]); - - Ok(()) - } -} diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index e0f19ad133e5..aef5aa7c00e7 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -27,7 +27,9 @@ mod partitioning; mod physical_expr; pub mod planner; mod scalar_function; -pub mod udf; +pub mod udf { + pub use crate::scalar_function::create_physical_expr; +} pub mod utils; pub mod window; diff --git a/datafusion/physical-expr/src/planner.rs b/datafusion/physical-expr/src/planner.rs index 7fff55ed42d5..2621b817b2da 100644 --- a/datafusion/physical-expr/src/planner.rs +++ b/datafusion/physical-expr/src/planner.rs @@ -31,9 +31,10 @@ use datafusion_expr::{ Operator, ScalarFunctionDefinition, TryCast, }; +use crate::scalar_function; use crate::{ expressions::{self, binary, like, Column, Literal}, - udf, PhysicalExpr, + PhysicalExpr, }; /// [PhysicalExpr] evaluate DataFusion expressions such as `A + 1`, or `CAST(c1 @@ -309,13 +310,15 @@ pub fn create_physical_expr( create_physical_exprs(args, input_dfschema, execution_props)?; match func_def { - ScalarFunctionDefinition::UDF(fun) => udf::create_physical_expr( - fun.clone().as_ref(), - &physical_args, - input_schema, - args, - input_dfschema, - ), + ScalarFunctionDefinition::UDF(fun) => { + scalar_function::create_physical_expr( + fun.clone().as_ref(), + &physical_args, + input_schema, + args, + input_dfschema, + ) + } } } Expr::Between(Between { diff --git a/datafusion/physical-expr/src/scalar_function.rs b/datafusion/physical-expr/src/scalar_function.rs index 95a2de1f7156..6b84b81e9fae 100644 --- a/datafusion/physical-expr/src/scalar_function.rs +++ b/datafusion/physical-expr/src/scalar_function.rs @@ -32,17 +32,19 @@ use std::any::Any; use std::fmt::{self, Debug, Formatter}; use std::hash::{Hash, Hasher}; +use std::ops::Neg; use std::sync::Arc; use arrow::datatypes::{DataType, Schema}; use arrow::record_batch::RecordBatch; -use datafusion_common::{internal_err, Result}; +use datafusion_common::{internal_err, DFSchema, Result}; +use datafusion_expr::type_coercion::functions::data_types; use datafusion_expr::{ - expr_vec_fmt, ColumnarValue, FuncMonotonicity, ScalarFunctionDefinition, + expr_vec_fmt, ColumnarValue, Expr, FuncMonotonicity, ScalarFunctionDefinition, + ScalarUDF, }; -use crate::functions::out_ordering; use crate::physical_expr::{down_cast_any_ref, physical_exprs_equal}; use crate::sort_properties::SortProperties; use crate::PhysicalExpr; @@ -208,3 +210,92 @@ impl PartialEq for ScalarFunctionExpr { .unwrap_or(false) } } + +/// Create a physical expression for the UDF. +/// +/// Arguments: +pub fn create_physical_expr( + fun: &ScalarUDF, + input_phy_exprs: &[Arc], + input_schema: &Schema, + args: &[Expr], + input_dfschema: &DFSchema, +) -> Result> { + let input_expr_types = input_phy_exprs + .iter() + .map(|e| e.data_type(input_schema)) + .collect::>>()?; + + // verify that input data types is consistent with function's `TypeSignature` + data_types(&input_expr_types, fun.signature())?; + + // Since we have arg_types, we dont need args and schema. + let return_type = + fun.return_type_from_exprs(args, input_dfschema, &input_expr_types)?; + + let fun_def = ScalarFunctionDefinition::UDF(Arc::new(fun.clone())); + Ok(Arc::new(ScalarFunctionExpr::new( + fun.name(), + fun_def, + input_phy_exprs.to_vec(), + return_type, + fun.monotonicity()?, + ))) +} + +/// Determines a [ScalarFunctionExpr]'s monotonicity for the given arguments +/// and the function's behavior depending on its arguments. +/// +/// [ScalarFunctionExpr]: crate::scalar_function::ScalarFunctionExpr +pub fn out_ordering( + func: &FuncMonotonicity, + arg_orderings: &[SortProperties], +) -> SortProperties { + func.iter().zip(arg_orderings).fold( + SortProperties::Singleton, + |prev_sort, (item, arg)| { + let current_sort = func_order_in_one_dimension(item, arg); + + match (prev_sort, current_sort) { + (_, SortProperties::Unordered) => SortProperties::Unordered, + (SortProperties::Singleton, SortProperties::Ordered(_)) => current_sort, + (SortProperties::Ordered(prev), SortProperties::Ordered(current)) + if prev.descending != current.descending => + { + SortProperties::Unordered + } + _ => prev_sort, + } + }, + ) +} + +/// This function decides the monotonicity property of a [ScalarFunctionExpr] for a single argument (i.e. across a single dimension), given that argument's sort properties. +/// +/// [ScalarFunctionExpr]: crate::scalar_function::ScalarFunctionExpr +fn func_order_in_one_dimension( + func_monotonicity: &Option, + arg: &SortProperties, +) -> SortProperties { + if *arg == SortProperties::Singleton { + SortProperties::Singleton + } else { + match func_monotonicity { + None => SortProperties::Unordered, + Some(false) => { + if let SortProperties::Ordered(_) = arg { + arg.neg() + } else { + SortProperties::Unordered + } + } + Some(true) => { + if let SortProperties::Ordered(_) = arg { + *arg + } else { + SortProperties::Unordered + } + } + } + } +} diff --git a/datafusion/physical-expr/src/udf.rs b/datafusion/physical-expr/src/udf.rs deleted file mode 100644 index aad78b7c2f90..000000000000 --- a/datafusion/physical-expr/src/udf.rs +++ /dev/null @@ -1,95 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! UDF support -use std::sync::Arc; - -use arrow_schema::Schema; - -use datafusion_common::{DFSchema, Result}; -pub use datafusion_expr::ScalarUDF; -use datafusion_expr::{ - type_coercion::functions::data_types, Expr, ScalarFunctionDefinition, -}; - -use crate::{PhysicalExpr, ScalarFunctionExpr}; - -/// Create a physical expression of the UDF. -/// -/// Arguments: -pub fn create_physical_expr( - fun: &ScalarUDF, - input_phy_exprs: &[Arc], - input_schema: &Schema, - args: &[Expr], - input_dfschema: &DFSchema, -) -> Result> { - let input_expr_types = input_phy_exprs - .iter() - .map(|e| e.data_type(input_schema)) - .collect::>>()?; - - // verify that input data types is consistent with function's `TypeSignature` - data_types(&input_expr_types, fun.signature())?; - - // Since we have arg_types, we dont need args and schema. - let return_type = - fun.return_type_from_exprs(args, input_dfschema, &input_expr_types)?; - - let fun_def = ScalarFunctionDefinition::UDF(Arc::new(fun.clone())); - Ok(Arc::new(ScalarFunctionExpr::new( - fun.name(), - fun_def, - input_phy_exprs.to_vec(), - return_type, - fun.monotonicity()?, - ))) -} - -#[cfg(test)] -mod tests { - use arrow_schema::Schema; - - use datafusion_common::{DFSchema, Result}; - use datafusion_expr::ScalarUDF; - - use crate::utils::tests::TestScalarUDF; - use crate::ScalarFunctionExpr; - - use super::create_physical_expr; - - #[test] - fn test_functions() -> Result<()> { - // create and register the udf - let udf = ScalarUDF::from(TestScalarUDF::new()); - - let e = crate::expressions::lit(1.1); - let p_expr = - create_physical_expr(&udf, &[e], &Schema::empty(), &[], &DFSchema::empty())?; - - assert_eq!( - p_expr - .as_any() - .downcast_ref::() - .unwrap() - .monotonicity(), - &Some(vec![Some(true)]) - ); - - Ok(()) - } -}