Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed Mar 24, 2023
1 parent 8effc59 commit 9ba3285
Show file tree
Hide file tree
Showing 32 changed files with 797 additions and 159 deletions.
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/listing/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ impl ExpressionVisitor for ApplicabilityVisitor<'_> {
| Expr::IsNotUnknown(_)
| Expr::Negative(_)
| Expr::Cast { .. }
| Expr::PromotePrecision { .. }
| Expr::TryCast { .. }
| Expr::BinaryExpr { .. }
| Expr::Between { .. }
Expand Down
14 changes: 10 additions & 4 deletions datafusion/core/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ use async_trait::async_trait;
use datafusion_common::{DFSchema, ScalarValue};
use datafusion_expr::expr::{
self, AggregateFunction, Between, BinaryExpr, Cast, GetIndexedField, GroupingSet,
Like, TryCast, WindowFunction,
Like, PromotePrecision, TryCast, WindowFunction,
};
use datafusion_expr::expr_rewriter::unnormalize_cols;
use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
Expand Down Expand Up @@ -111,7 +111,9 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
Expr::Alias(_, name) => Ok(name.clone()),
Expr::ScalarVariable(_, variable_names) => Ok(variable_names.join(".")),
Expr::Literal(value) => Ok(format!("{value:?}")),
Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
Expr::BinaryExpr(BinaryExpr {
left, op, right, ..
}) => {
let left = create_physical_name(left, false)?;
let right = create_physical_name(right, false)?;
Ok(format!("{left} {op} {right}"))
Expand All @@ -134,6 +136,10 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
// CAST does not change the expression name
create_physical_name(expr, false)
}
Expr::PromotePrecision(PromotePrecision { expr }) => {
// PromotePrecision does not change the expression name
create_physical_name(expr, false)
}
Expr::TryCast(TryCast { expr, .. }) => {
// CAST does not change the expression name
create_physical_name(expr, false)
Expand Down Expand Up @@ -1924,7 +1930,7 @@ mod tests {
// verify that the plan correctly casts u8 to i64
// the cast from u8 to i64 for literal will be simplified, and get lit(int64(5))
// the cast here is implicit so has CastOptions with safe=true
let expected = "BinaryExpr { left: Column { name: \"c7\", index: 2 }, op: Lt, right: Literal { value: Int64(5) } }";
let expected = "BinaryExpr { left: Column { name: \"c7\", index: 2 }, op: Lt, right: Literal { value: Int64(5) }, data_type: None }";
assert!(format!("{exec_plan:?}").contains(expected));
Ok(())
}
Expand Down Expand Up @@ -2170,7 +2176,7 @@ mod tests {
let execution_plan = plan(&logical_plan).await?;
// verify that the plan correctly adds cast from Int64(1) to Utf8, and the const will be evaluated.

let expected = "expr: [(BinaryExpr { left: BinaryExpr { left: Column { name: \"c1\", index: 0 }, op: Eq, right: Literal { value: Utf8(\"1\") } }, op: Or, right: BinaryExpr { left: Column { name: \"c1\", index: 0 }, op: Eq, right: Literal { value: Utf8(\"a\") } } }";
let expected = "expr: [(BinaryExpr { left: BinaryExpr { left: Column { name: \"c1\", index: 0 }, op: Eq, right: Literal { value: Utf8(\"1\") }, data_type: None }, op: Or, right: BinaryExpr { left: Column { name: \"c1\", index: 0 }, op: Eq, right: Literal { value: Utf8(\"a\") }, data_type: None }, data_type: None }";

let actual = format!("{execution_plan:?}");
assert!(actual.contains(expected), "{}", actual);
Expand Down
44 changes: 43 additions & 1 deletion datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ pub enum Expr {
/// Casts the expression to a given type and will return a runtime error if the expression cannot be cast.
/// This expression is guaranteed to have a fixed type.
Cast(Cast),
/// Wraps the child expression when promoting the precision of DecimalType to avoid promote multiple times.
PromotePrecision(PromotePrecision),
/// Casts the expression to a given type and will return a null value if the expression cannot be cast.
/// This expression is guaranteed to have a fixed type.
TryCast(TryCast),
Expand Down Expand Up @@ -234,12 +236,33 @@ pub struct BinaryExpr {
pub op: Operator,
/// Right-hand side of the expression
pub right: Box<Expr>,
/// The data type of the expression, if known
pub data_type: Option<DataType>,
}

impl BinaryExpr {
/// Create a new binary expression
pub fn new(left: Box<Expr>, op: Operator, right: Box<Expr>) -> Self {
Self { left, op, right }
Self {
left,
op,
right,
data_type: None,
}
}

pub fn new_with_data_type(
left: Box<Expr>,
op: Operator,
right: Box<Expr>,
data_type: Option<DataType>,
) -> Self {
Self {
left,
op,
right,
data_type,
}
}
}

Expand Down Expand Up @@ -385,6 +408,20 @@ impl Cast {
}
}

/// Cast expression
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct PromotePrecision {
/// The expression being promoted
pub expr: Box<Expr>,
}

impl PromotePrecision {
/// Create a new PromotePrecision expression
pub fn new(expr: Box<Expr>) -> Self {
Self { expr }
}
}

/// TryCast Expression
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct TryCast {
Expand Down Expand Up @@ -569,6 +606,7 @@ impl Expr {
Expr::BinaryExpr { .. } => "BinaryExpr",
Expr::Case { .. } => "Case",
Expr::Cast { .. } => "Cast",
Expr::PromotePrecision { .. } => "PromotePrecision",
Expr::Column(..) => "Column",
Expr::OuterReferenceColumn(_, _) => "Outer",
Expr::Exists { .. } => "Exists",
Expand Down Expand Up @@ -858,6 +896,9 @@ impl fmt::Debug for Expr {
Expr::Cast(Cast { expr, data_type }) => {
write!(f, "CAST({expr:?} AS {data_type:?})")
}
Expr::PromotePrecision(PromotePrecision { expr }) => {
write!(f, "PROMOTE_PRECISION({expr:?})")
}
Expr::TryCast(TryCast { expr, data_type }) => {
write!(f, "TRY_CAST({expr:?} AS {data_type:?})")
}
Expand Down Expand Up @@ -1211,6 +1252,7 @@ fn create_name(e: &Expr) -> Result<String> {
// CAST does not change the expression name
create_name(expr)
}
Expr::PromotePrecision(PromotePrecision { expr }) => create_name(expr),
Expr::TryCast(TryCast { expr, .. }) => {
// CAST does not change the expression name
create_name(expr)
Expand Down
15 changes: 15 additions & 0 deletions datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,21 @@ pub fn binary_expr(left: Expr, op: Operator, right: Expr) -> Expr {
Expr::BinaryExpr(BinaryExpr::new(Box::new(left), op, Box::new(right)))
}

/// Return a new expression `left <op> right`
pub fn binary_expr_with_data_type(
left: Expr,
op: Operator,
right: Expr,
data_type: Option<DataType>,
) -> Expr {
Expr::BinaryExpr(BinaryExpr::new_with_data_type(
Box::new(left),
op,
Box::new(right),
data_type,
))
}

/// Return a new expression with a logical AND
pub fn and(left: Expr, right: Expr) -> Expr {
Expr::BinaryExpr(BinaryExpr::new(
Expand Down
23 changes: 15 additions & 8 deletions datafusion/expr/src/expr_rewriter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
use crate::expr::{
AggregateFunction, Between, BinaryExpr, Case, Cast, GetIndexedField, GroupingSet,
Like, Sort, TryCast, WindowFunction,
Like, PromotePrecision, Sort, TryCast, WindowFunction,
};
use crate::logical_plan::Projection;
use crate::{Expr, ExprSchemable, LogicalPlan, LogicalPlanBuilder};
Expand Down Expand Up @@ -135,13 +135,17 @@ impl ExprRewritable for Expr {
Expr::ScalarSubquery(_) => self.clone(),
Expr::ScalarVariable(ty, names) => Expr::ScalarVariable(ty, names),
Expr::Literal(value) => Expr::Literal(value),
Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
Expr::BinaryExpr(BinaryExpr::new(
rewrite_boxed(left, rewriter)?,
op,
rewrite_boxed(right, rewriter)?,
))
}
Expr::BinaryExpr(BinaryExpr {
left,
op,
right,
data_type,
}) => Expr::BinaryExpr(BinaryExpr::new_with_data_type(
rewrite_boxed(left, rewriter)?,
op,
rewrite_boxed(right, rewriter)?,
data_type,
)),
Expr::Like(Like {
negated,
expr,
Expand Down Expand Up @@ -218,6 +222,9 @@ impl ExprRewritable for Expr {
Expr::Cast(Cast { expr, data_type }) => {
Expr::Cast(Cast::new(rewrite_boxed(expr, rewriter)?, data_type))
}
Expr::PromotePrecision(PromotePrecision { expr }) => Expr::PromotePrecision(
PromotePrecision::new(rewrite_boxed(expr, rewriter)?),
),
Expr::TryCast(TryCast { expr, data_type }) => {
Expr::TryCast(TryCast::new(rewrite_boxed(expr, rewriter)?, data_type))
}
Expand Down
48 changes: 42 additions & 6 deletions datafusion/expr/src/expr_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@

use super::{Between, Expr, Like};
use crate::expr::{
AggregateFunction, BinaryExpr, Cast, GetIndexedField, Sort, TryCast, WindowFunction,
AggregateFunction, BinaryExpr, Cast, GetIndexedField, PromotePrecision, Sort,
TryCast, WindowFunction,
};
use crate::field_util::get_indexed_field;
use crate::type_coercion::binary::binary_operator_data_type;
Expand All @@ -39,6 +40,13 @@ pub trait ExprSchemable {

/// cast to a type with respect to a schema
fn cast_to<S: ExprSchema>(self, cast_to_type: &DataType, schema: &S) -> Result<Expr>;

/// promote to a type with respect to a schema
fn promote_to<S: ExprSchema>(
self,
promote_to_type: &DataType,
schema: &S,
) -> Result<Expr>;
}

impl ExprSchemable for Expr {
Expand Down Expand Up @@ -71,6 +79,7 @@ impl ExprSchemable for Expr {
Expr::Case(case) => case.when_then_expr[0].1.get_type(schema),
Expr::Cast(Cast { data_type, .. })
| Expr::TryCast(TryCast { data_type, .. }) => Ok(data_type.clone()),
Expr::PromotePrecision(PromotePrecision { expr }) => expr.get_type(schema),
Expr::ScalarUDF { fun, args } => {
let data_types = args
.iter()
Expand Down Expand Up @@ -126,11 +135,18 @@ impl ExprSchemable for Expr {
ref left,
ref right,
ref op,
}) => binary_operator_data_type(
&left.get_type(schema)?,
op,
&right.get_type(schema)?,
),
ref data_type,
}) => {
if let Some(dt) = data_type {
Ok(dt.clone())
} else {
binary_operator_data_type(
&left.get_type(schema)?,
op,
&right.get_type(schema)?,
)
}
}
Expr::Like { .. } | Expr::ILike { .. } | Expr::SimilarTo { .. } => {
Ok(DataType::Boolean)
}
Expand Down Expand Up @@ -195,6 +211,9 @@ impl ExprSchemable for Expr {
}
}
Expr::Cast(Cast { expr, .. }) => expr.nullable(input_schema),
Expr::PromotePrecision(PromotePrecision { expr }) => {
expr.nullable(input_schema)
}
Expr::ScalarVariable(_, _)
| Expr::TryCast { .. }
| Expr::ScalarFunction { .. }
Expand Down Expand Up @@ -284,6 +303,23 @@ impl ExprSchemable for Expr {
)))
}
}

/// Wraps this expression in a promote precision to a target [arrow::datatypes::DataType].
///
/// # Errors
///
/// This function errors when it is impossible to cast the
/// expression to the target [arrow::datatypes::DataType].
fn promote_to<S: ExprSchema>(
self,
promote_to_type: &DataType,
schema: &S,
) -> Result<Expr> {
let casted = self.cast_to(promote_to_type, schema)?;
Ok(Expr::PromotePrecision(PromotePrecision::new(Box::new(
casted,
))))
}
}

#[cfg(test)]
Expand Down
3 changes: 2 additions & 1 deletion datafusion/expr/src/expr_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

//! Expression visitor
use crate::expr::{AggregateFunction, Cast, Sort, WindowFunction};
use crate::expr::{AggregateFunction, Cast, PromotePrecision, Sort, WindowFunction};
use crate::{
expr::{BinaryExpr, GroupingSet, TryCast},
Between, Expr, GetIndexedField, Like,
Expand Down Expand Up @@ -116,6 +116,7 @@ impl ExprVisitable for Expr {
| Expr::IsNull(expr)
| Expr::Negative(expr)
| Expr::Cast(Cast { expr, .. })
| Expr::PromotePrecision(PromotePrecision { expr, .. })
| Expr::TryCast(TryCast { expr, .. })
| Expr::Sort(Sort { expr, .. })
| Expr::InSubquery { expr, .. } => expr.accept(visitor),
Expand Down
Loading

0 comments on commit 9ba3285

Please sign in to comment.