Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add values list expression #1165

Merged
merged 2 commits into from
Oct 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions ballista/rust/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ message LogicalPlanNode {
WindowNode window = 13;
AnalyzeNode analyze = 14;
CrossJoinNode cross_join = 15;
ValuesNode values = 16;
}
}

Expand Down Expand Up @@ -316,12 +317,12 @@ message SelectionNode {
LogicalExprNode expr = 2;
}

message SortNode{
message SortNode {
LogicalPlanNode input = 1;
repeated LogicalExprNode expr = 2;
}

message RepartitionNode{
message RepartitionNode {
LogicalPlanNode input = 1;
oneof partition_method {
uint64 round_robin = 2;
Expand All @@ -334,19 +335,26 @@ message HashRepartition {
uint64 partition_count = 2;
}

message EmptyRelationNode{
message EmptyRelationNode {
bool produce_one_row = 1;
}

message CreateExternalTableNode{
message CreateExternalTableNode {
string name = 1;
string location = 2;
FileType file_type = 3;
bool has_header = 4;
DfSchema schema = 5;
}

enum FileType{
// A node containing the data for a VALUES list. Unlike in SQL, where a VALUES
// list is two dimensional, the expressions here are stored as a single flat,
// row-major list; together with n_cols the list can be partitioned back into
// rows (the deserializer requires values_list.len() to be divisible by n_cols).
message ValuesNode {
// Number of columns per row, used to chunk values_list back into rows.
uint64 n_cols = 1;
// Row-major flattened list of value expressions.
repeated LogicalExprNode values_list = 2;
}

enum FileType {
NdJson = 0;
Parquet = 1;
CSV = 2;
Expand Down
25 changes: 25 additions & 0 deletions ballista/rust/core/src/serde/logical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,31 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
))
})?;
match plan {
LogicalPlanType::Values(values) => {
let n_cols = values.n_cols as usize;
let values: Vec<Vec<Expr>> = if values.values_list.is_empty() {
Ok(Vec::new())
} else if values.values_list.len() % n_cols != 0 {
Err(BallistaError::General(format!(
"Invalid values list length, expect {} to be divisible by {}",
values.values_list.len(),
n_cols
)))
} else {
values
.values_list
.chunks_exact(n_cols)
.map(|r| {
r.iter()
.map(|v| v.try_into())
.collect::<Result<Vec<_>, _>>()
})
.collect::<Result<Vec<_>, _>>()
}?;
LogicalPlanBuilder::values(values)?
.build()
.map_err(|e| e.into())
}
LogicalPlanType::Projection(projection) => {
let input: LogicalPlan = convert_box_required!(projection.input)?;
let x: Vec<Expr> = projection
Expand Down
20 changes: 20 additions & 0 deletions ballista/rust/core/src/serde/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,26 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
fn try_into(self) -> Result<protobuf::LogicalPlanNode, Self::Error> {
use protobuf::logical_plan_node::LogicalPlanType;
match self {
LogicalPlan::Values { values, .. } => {
let n_cols = if values.is_empty() {
0
} else {
values[0].len()
} as u64;
let values_list = values
.iter()
.flatten()
.map(|v| v.try_into())
.collect::<Result<Vec<_>, _>>()?;
Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::Values(
protobuf::ValuesNode {
n_cols,
values_list,
},
)),
})
}
LogicalPlan::TableScan {
table_name,
source,
Expand Down
94 changes: 84 additions & 10 deletions datafusion/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,6 @@

//! This module provides a builder for creating LogicalPlans

use std::{
collections::{HashMap, HashSet},
sync::Arc,
};

use arrow::{
datatypes::{Schema, SchemaRef},
record_batch::RecordBatch,
};

use crate::datasource::{
empty::EmptyTable,
file_format::parquet::{ParquetFormat, DEFAULT_PARQUET_EXTENSION},
Expand All @@ -37,6 +27,16 @@ use crate::datasource::{
use crate::error::{DataFusionError, Result};
use crate::logical_plan::plan::ToStringifiedPlan;
use crate::prelude::*;
use crate::scalar::ScalarValue;
use arrow::{
datatypes::{DataType, Schema, SchemaRef},
record_batch::RecordBatch,
};
use std::convert::TryFrom;
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};

use super::dfschema::ToDFSchema;
use super::{exprlist_to_fields, Expr, JoinConstraint, JoinType, LogicalPlan, PlanType};
Expand Down Expand Up @@ -111,6 +111,80 @@ impl LogicalPlanBuilder {
})
}

/// Create a values list based relation, and the schema is inferred from data, consuming
/// `values`. See the [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
/// documentation for more details.
///
/// By default, it assigns the names column1, column2, etc. to the columns of a VALUES table.
/// The column names are not specified by the SQL standard and different database systems do it differently,
/// so it's usually better to override the default names with a table alias list.
///
/// Errors if `values` is empty, if the first row has zero columns, if any row
/// has a different length than the first, or if a column mixes two different
/// non-null data types.
pub fn values(mut values: Vec<Vec<Expr>>) -> Result<Self> {
if values.is_empty() {
return Err(DataFusionError::Plan("Values list cannot be empty".into()));
}
// the first row fixes the expected number of columns for every row
let n_cols = values[0].len();
if n_cols == 0 {
return Err(DataFusionError::Plan(
"Values list cannot be zero length".into(),
));
}
// literals don't need a schema for type resolution, so an empty one suffices
let empty_schema = DFSchema::empty();
// per-column inferred type; None until a non-null value is seen for that column
let mut field_types: Vec<Option<DataType>> = Vec::with_capacity(n_cols);
for _ in 0..n_cols {
field_types.push(None);
}
// hold all the null holes so that we can correct their data types later
let mut nulls: Vec<(usize, usize)> = Vec::new();
for (i, row) in values.iter().enumerate() {
if row.len() != n_cols {
return Err(DataFusionError::Plan(format!(
"Inconsistent data length across values list: got {} values in row {} but expected {}",
row.len(),
i,
n_cols
)));
}
// fold this row's types into field_types, erroring on any conflict
field_types = row
.iter()
.enumerate()
.map(|(j, expr)| {
// a NULL literal (parsed as an untyped Utf8 None) doesn't constrain
// the column type; remember its position to retype it later
if let Expr::Literal(ScalarValue::Utf8(None)) = expr {
nulls.push((i, j));
Ok(field_types[j].clone())
} else {
let data_type = expr.get_type(&empty_schema)?;
// once a column's type is known, later rows must match it exactly
if let Some(prev_data_type) = &field_types[j] {
if prev_data_type != &data_type {
let err = format!("Inconsistent data type across values list at row {} column {}", i, j);
return Err(DataFusionError::Plan(err));
}
}
Ok(Some(data_type))
}
})
.collect::<Result<Vec<Option<DataType>>>>()?;
}
let fields = field_types
.iter()
.enumerate()
.map(|(j, data_type)| {
// naming is following convention https://www.postgresql.org/docs/current/queries-values.html
let name = &format!("column{}", j + 1);
// a column containing only nulls falls back to Utf8; all columns are nullable
DFField::new(
None,
name,
data_type.clone().unwrap_or(DataType::Utf8),
true,
)
})
.collect::<Vec<_>>();
// rewrite each remembered NULL as a typed null matching its column's final type
for (i, j) in nulls {
values[i][j] = Expr::Literal(ScalarValue::try_from(fields[j].data_type())?);
}
let schema = DFSchemaRef::new(DFSchema::new(fields)?);
Ok(Self::from(LogicalPlan::Values { schema, values }))
}

/// Scan a memory data source
pub fn scan_memory(
partitions: Vec<Vec<RecordBatch>>,
Expand Down
35 changes: 35 additions & 0 deletions datafusion/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,15 @@ pub enum LogicalPlan {
/// Whether the CSV file contains a header
has_header: bool,
},
/// Values expression. See
/// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
/// documentation for more details.
Values {
/// The table schema
schema: DFSchemaRef,
/// Values
values: Vec<Vec<Expr>>,
},
/// Produces a relation with string representations of
/// various parts of the plan
Explain {
Expand Down Expand Up @@ -237,6 +246,7 @@ impl LogicalPlan {
pub fn schema(&self) -> &DFSchemaRef {
match self {
LogicalPlan::EmptyRelation { schema, .. } => schema,
LogicalPlan::Values { schema, .. } => schema,
LogicalPlan::TableScan {
projected_schema, ..
} => projected_schema,
Expand All @@ -263,6 +273,7 @@ impl LogicalPlan {
LogicalPlan::TableScan {
projected_schema, ..
} => vec![projected_schema],
LogicalPlan::Values { schema, .. } => vec![schema],
LogicalPlan::Window { input, schema, .. }
| LogicalPlan::Aggregate { input, schema, .. }
| LogicalPlan::Projection { input, schema, .. } => {
Expand Down Expand Up @@ -315,6 +326,9 @@ impl LogicalPlan {
pub fn expressions(self: &LogicalPlan) -> Vec<Expr> {
match self {
LogicalPlan::Projection { expr, .. } => expr.clone(),
LogicalPlan::Values { values, .. } => {
values.iter().flatten().cloned().collect()
}
LogicalPlan::Filter { predicate, .. } => vec![predicate.clone()],
LogicalPlan::Repartition {
partitioning_scheme,
Expand Down Expand Up @@ -369,6 +383,7 @@ impl LogicalPlan {
// plans without inputs
LogicalPlan::TableScan { .. }
| LogicalPlan::EmptyRelation { .. }
| LogicalPlan::Values { .. }
| LogicalPlan::CreateExternalTable { .. } => vec![],
}
}
Expand Down Expand Up @@ -515,6 +530,7 @@ impl LogicalPlan {
// plans without inputs
LogicalPlan::TableScan { .. }
| LogicalPlan::EmptyRelation { .. }
| LogicalPlan::Values { .. }
| LogicalPlan::CreateExternalTable { .. } => true,
};
if !recurse {
Expand Down Expand Up @@ -702,6 +718,25 @@ impl LogicalPlan {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match &*self.0 {
LogicalPlan::EmptyRelation { .. } => write!(f, "EmptyRelation"),
LogicalPlan::Values { ref values, .. } => {
let str_values: Vec<_> = values
.iter()
// limit to only 5 values to avoid horrible display
.take(5)
.map(|row| {
let item = row
.iter()
.map(|expr| expr.to_string())
.collect::<Vec<_>>()
.join(", ");
format!("({})", item)
})
.collect();

let elipse = if values.len() > 5 { "..." } else { "" };
write!(f, "Values: {}{}", str_values.join(", "), elipse)
}

LogicalPlan::TableScan {
ref table_name,
ref projection,
Expand Down
1 change: 1 addition & 0 deletions datafusion/src/optimizer/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
| LogicalPlan::Repartition { .. }
| LogicalPlan::Union { .. }
| LogicalPlan::TableScan { .. }
| LogicalPlan::Values { .. }
| LogicalPlan::EmptyRelation { .. }
| LogicalPlan::Limit { .. }
| LogicalPlan::CreateExternalTable { .. }
Expand Down
1 change: 1 addition & 0 deletions datafusion/src/optimizer/constant_folding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ impl OptimizerRule for ConstantFolding {
| LogicalPlan::Aggregate { .. }
| LogicalPlan::Repartition { .. }
| LogicalPlan::CreateExternalTable { .. }
| LogicalPlan::Values { .. }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect it is not likely to matter, but constant folding could be applied to the Exprs in values. As written, this code will not apply constant folding to those expressions.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

address in #1170

| LogicalPlan::Extension { .. }
| LogicalPlan::Sort { .. }
| LogicalPlan::Explain { .. }
Expand Down
1 change: 1 addition & 0 deletions datafusion/src/optimizer/projection_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,7 @@ fn optimize_plan(
| LogicalPlan::Filter { .. }
| LogicalPlan::Repartition { .. }
| LogicalPlan::EmptyRelation { .. }
| LogicalPlan::Values { .. }
| LogicalPlan::Sort { .. }
| LogicalPlan::CreateExternalTable { .. }
| LogicalPlan::CrossJoin { .. }
Expand Down
7 changes: 7 additions & 0 deletions datafusion/src/optimizer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,13 @@ pub fn from_plan(
schema: schema.clone(),
alias: alias.clone(),
}),
LogicalPlan::Values { schema, .. } => Ok(LogicalPlan::Values {
schema: schema.clone(),
values: expr
.chunks_exact(schema.fields().len())
.map(|s| s.to_vec())
.collect::<Vec<_>>(),
}),
LogicalPlan::Filter { .. } => Ok(LogicalPlan::Filter {
predicate: expr[0].clone(),
input: Arc::new(inputs[0].clone()),
Expand Down
1 change: 1 addition & 0 deletions datafusion/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -645,5 +645,6 @@ pub mod udf;
#[cfg(feature = "unicode_expressions")]
pub mod unicode_expressions;
pub mod union;
pub mod values;
pub mod window_functions;
pub mod windows;
26 changes: 25 additions & 1 deletion datafusion/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use super::analyze::AnalyzeExec;
use super::{
aggregates, empty::EmptyExec, expressions::binary, functions,
hash_join::PartitionMode, udaf, union::UnionExec, windows,
hash_join::PartitionMode, udaf, union::UnionExec, values::ValuesExec, windows,
};
use crate::execution::context::ExecutionContextState;
use crate::logical_plan::{
Expand Down Expand Up @@ -323,6 +323,30 @@ impl DefaultPhysicalPlanner {
let filters = unnormalize_cols(filters.iter().cloned());
source.scan(projection, batch_size, &filters, *limit).await
}
LogicalPlan::Values {
values,
schema,
} => {
let exec_schema = schema.as_ref().to_owned().into();
let exprs = values.iter()
.map(|row| {
row.iter().map(|expr|{
self.create_physical_expr(
expr,
schema,
&exec_schema,
ctx_state,
)
})
.collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()
})
.collect::<Result<Vec<_>>>()?;
let value_exec = ValuesExec::try_new(
SchemaRef::new(exec_schema),
exprs
)?;
Ok(Arc::new(value_exec))
}
LogicalPlan::Window {
input, window_expr, ..
} => {
Expand Down
Loading