Skip to content

Commit

Permalink
perf: improve to_field performance (apache#9722)
Browse files Browse the repository at this point in the history
* perf: improve to_field performance

* finish

* remove duplicate code

* retrigger ci

---------

Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
haohuaijin and alamb authored Mar 22, 2024
1 parent 5f0cb49 commit 2b69acc
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 20 deletions.
12 changes: 12 additions & 0 deletions datafusion/common/src/dfschema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,9 @@ pub trait ExprSchema: std::fmt::Debug {

/// Returns the column's optional metadata as a map of string keys to
/// string values, borrowed from the schema.
fn metadata(&self, col: &Column) -> Result<&HashMap<String, String>>;

/// Returns the column's data type and nullability as a
/// `(data type, nullable)` pair.
fn data_type_and_nullable(&self, col: &Column) -> Result<(&DataType, bool)>;
}

// Implement `ExprSchema` for `Arc<DFSchema>`
Expand All @@ -755,6 +758,10 @@ impl<P: AsRef<DFSchema> + std::fmt::Debug> ExprSchema for P {
fn metadata(&self, col: &Column) -> Result<&HashMap<String, String>> {
ExprSchema::metadata(self.as_ref(), col)
}

fn data_type_and_nullable(&self, col: &Column) -> Result<(&DataType, bool)> {
self.as_ref().data_type_and_nullable(col)
}
}

impl ExprSchema for DFSchema {
Expand All @@ -769,6 +776,11 @@ impl ExprSchema for DFSchema {
/// Resolves `col` to its field and returns that field's metadata.
fn metadata(&self, col: &Column) -> Result<&HashMap<String, String>> {
    let field = self.field_from_column(col)?;
    Ok(field.metadata())
}

/// Resolves `col` to its field once and reads both the data type and
/// the nullability from that single field.
fn data_type_and_nullable(&self, col: &Column) -> Result<(&DataType, bool)> {
    self.field_from_column(col)
        .map(|resolved| (resolved.data_type(), resolved.is_nullable()))
}
}

/// DFField wraps an Arrow field and adds an optional qualifier
Expand Down
106 changes: 86 additions & 20 deletions datafusion/expr/src/expr_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ pub trait ExprSchemable {

/// Casts this expression to `cast_to_type` with respect to `schema`.
///
/// Takes `self` by value and returns the resulting [`Expr`]; an `Err`
/// is returned when the implementation cannot produce the cast.
fn cast_to(self, cast_to_type: &DataType, schema: &dyn ExprSchema) -> Result<Expr>;

/// Given a schema, returns the data type and nullability of this
/// expression as a `(data type, nullable)` pair.
///
/// Returns an `Err` when either value cannot be determined.
fn data_type_and_nullable(&self, schema: &dyn ExprSchema)
-> Result<(DataType, bool)>;
}

impl ExprSchemable for Expr {
Expand Down Expand Up @@ -370,32 +374,90 @@ impl ExprSchemable for Expr {
}
}

/// Computes the data type and the nullability of this expression in a
/// single pass over the expression, resolving columns through the given
/// [ExprSchema].
///
/// Note: [`DFSchema`] implements [ExprSchema].
///
/// [`DFSchema`]: datafusion_common::DFSchema
///
/// # Errors
///
/// Returns an error when the data type or the nullability of the
/// expression cannot be computed.
fn data_type_and_nullable(
    &self,
    schema: &dyn ExprSchema,
) -> Result<(DataType, bool)> {
    match self {
        Expr::Alias(Alias { expr, name, .. }) => {
            if let Expr::Placeholder(Placeholder { data_type, .. }) = &**expr {
                match data_type {
                    // Typed placeholder: use its declared type.
                    Some(dt) => Ok((dt.clone(), expr.nullable(schema)?)),
                    // Untyped placeholder: look up the column named by the alias.
                    None => {
                        let (dt, nullable) =
                            schema.data_type_and_nullable(&Column::from_name(name))?;
                        Ok((dt.clone(), nullable))
                    }
                }
            } else {
                expr.data_type_and_nullable(schema)
            }
        }
        // Wrappers that report whatever their inner expression reports.
        Expr::Negative(expr) | Expr::Sort(Sort { expr, .. }) => {
            expr.data_type_and_nullable(schema)
        }
        Expr::Column(c) => {
            let (dt, nullable) = schema.data_type_and_nullable(c)?;
            Ok((dt.clone(), nullable))
        }
        // Both variants carry their type and are reported as nullable.
        Expr::OuterReferenceColumn(ty, _) | Expr::ScalarVariable(ty, _) => {
            Ok((ty.clone(), true))
        }
        Expr::Literal(scalar) => Ok((scalar.data_type(), scalar.is_null())),
        // Predicates reported as non-nullable booleans.
        Expr::IsNull(_)
        | Expr::IsNotNull(_)
        | Expr::IsTrue(_)
        | Expr::IsFalse(_)
        | Expr::IsUnknown(_)
        | Expr::IsNotTrue(_)
        | Expr::IsNotFalse(_)
        | Expr::IsNotUnknown(_)
        | Expr::Exists { .. } => Ok((DataType::Boolean, false)),
        Expr::ScalarSubquery(subquery) => {
            // Type and nullability come from the first field of the subquery's schema.
            let subquery_schema = subquery.subquery.schema();
            let first_field = subquery_schema.field(0);
            Ok((first_field.data_type().clone(), first_field.is_nullable()))
        }
        Expr::BinaryExpr(BinaryExpr { left, right, op }) => {
            let (lhs_type, lhs_nullable) = left.data_type_and_nullable(schema)?;
            let (rhs_type, rhs_nullable) = right.data_type_and_nullable(schema)?;
            let result_type = get_result_type(&lhs_type, op, &rhs_type)?;
            Ok((result_type, lhs_nullable || rhs_nullable))
        }
        // Everything else falls back to the two separate lookups.
        _ => {
            let data_type = self.get_type(schema)?;
            let nullable = self.nullable(schema)?;
            Ok((data_type, nullable))
        }
    }
}

/// Returns a [arrow::datatypes::Field] compatible with this expression.
///
/// So for example, a projected expression `col(c1) + col(c2)` is
/// placed in an output field **named** col("c1 + c2")
///
/// The field's qualifier and name depend on the expression:
/// * `Expr::Column` uses the column's own relation and name,
/// * `Expr::Alias` uses the alias's relation and name,
/// * anything else becomes an unqualified field named after
///   `display_name()`.
///
/// The data type and nullability are obtained with a single
/// `data_type_and_nullable` call, and the expression's metadata is
/// attached to the resulting field.
///
/// # Errors
///
/// Propagates any error from `data_type_and_nullable`, `display_name`
/// or `metadata`.
fn to_field(&self, input_schema: &dyn ExprSchema) -> Result<DFField> {
    // One combined lookup shared by every variant.
    let (data_type, nullable) = self.data_type_and_nullable(input_schema)?;

    let field = match self {
        Expr::Column(c) => {
            DFField::new(c.relation.clone(), &c.name, data_type, nullable)
        }
        Expr::Alias(Alias { relation, name, .. }) => {
            DFField::new(relation.clone(), name, data_type, nullable)
        }
        _ => DFField::new_unqualified(&self.display_name()?, data_type, nullable),
    };

    // Metadata is resolved last, after the field has been built.
    Ok(field.with_metadata(self.metadata(input_schema)?))
}

Expand Down Expand Up @@ -704,5 +766,9 @@ mod tests {
/// Returns the mock's stored metadata map; the column argument is ignored.
fn metadata(&self, _col: &Column) -> Result<&HashMap<String, String>> {
    let stored = &self.metadata;
    Ok(stored)
}

/// Combines the mock's separate `data_type` and `nullable` lookups into
/// one `(data type, nullable)` pair.
fn data_type_and_nullable(&self, col: &Column) -> Result<(&DataType, bool)> {
    let data_type = self.data_type(col)?;
    let nullable = self.nullable(col)?;
    Ok((data_type, nullable))
}
}
}

0 comments on commit 2b69acc

Please sign in to comment.