Skip to content

Commit

Permalink
add values list expression
Browse files Browse the repository at this point in the history
  • Loading branch information
jimexist committed Oct 22, 2021
1 parent e6657f0 commit bb74f4d
Show file tree
Hide file tree
Showing 13 changed files with 324 additions and 8 deletions.
18 changes: 13 additions & 5 deletions ballista/rust/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ message LogicalPlanNode {
WindowNode window = 13;
AnalyzeNode analyze = 14;
CrossJoinNode cross_join = 15;
ValuesNode values = 16;
}
}

Expand Down Expand Up @@ -316,12 +317,12 @@ message SelectionNode {
LogicalExprNode expr = 2;
}

message SortNode{
message SortNode {
LogicalPlanNode input = 1;
repeated LogicalExprNode expr = 2;
}

message RepartitionNode{
message RepartitionNode {
LogicalPlanNode input = 1;
oneof partition_method {
uint64 round_robin = 2;
Expand All @@ -334,19 +335,26 @@ message HashRepartition {
uint64 partition_count = 2;
}

message EmptyRelationNode{
message EmptyRelationNode {
bool produce_one_row = 1;
}

message CreateExternalTableNode{
message CreateExternalTableNode {
string name = 1;
string location = 2;
FileType file_type = 3;
bool has_header = 4;
DfSchema schema = 5;
}

enum FileType{
// a node containing data for defining values list. unlike in SQL where it's two dimensional, here
// the list is flattened, and with the field n_cols it can be parsed and partitioned into rows
message ValuesNode {
uint64 n_cols = 1;
repeated LogicalExprNode values_list = 2;
}

enum FileType {
NdJson = 0;
Parquet = 1;
CSV = 2;
Expand Down
25 changes: 25 additions & 0 deletions ballista/rust/core/src/serde/logical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,31 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
))
})?;
match plan {
LogicalPlanType::Values(values) => {
let n_cols = values.n_cols as usize;
let values: Vec<Vec<Expr>> = if values.values_list.is_empty() {
Ok(Vec::new())
} else if values.values_list.len() % n_cols != 0 {
Err(BallistaError::General(format!(
"Invalid values list length, expect {} to be divisible by {}",
values.values_list.len(),
n_cols
)))
} else {
values
.values_list
.chunks_exact(n_cols)
.map(|r| {
r.into_iter()
.map(|v| v.try_into())
.collect::<Result<Vec<_>, _>>()
})
.collect::<Result<Vec<_>, _>>()
}?;
LogicalPlanBuilder::values(values)?
.build()
.map_err(|e| e.into())
}
LogicalPlanType::Projection(projection) => {
let input: LogicalPlan = convert_box_required!(projection.input)?;
let x: Vec<Expr> = projection
Expand Down
20 changes: 20 additions & 0 deletions ballista/rust/core/src/serde/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,26 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
fn try_into(self) -> Result<protobuf::LogicalPlanNode, Self::Error> {
use protobuf::logical_plan_node::LogicalPlanType;
match self {
LogicalPlan::Values { values, .. } => {
let n_cols = if values.is_empty() {
0
} else {
values[0].len()
} as u64;
let values_list = values
.iter()
.flatten()
.map(|v| v.try_into())
.collect::<Result<Vec<_>, _>>()?;
Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::Values(
protobuf::ValuesNode {
n_cols,
values_list,
},
)),
})
}
LogicalPlan::TableScan {
table_name,
source,
Expand Down
43 changes: 42 additions & 1 deletion datafusion/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::{
};

use arrow::{
datatypes::{Schema, SchemaRef},
datatypes::{DataType, Schema, SchemaRef},
record_batch::RecordBatch,
};

Expand Down Expand Up @@ -111,6 +111,47 @@ impl LogicalPlanBuilder {
})
}

/// Create a values list based relation, and the schema is inferred from data
pub fn values(values: Vec<Vec<Expr>>) -> Result<Self> {
let mut schema = DFSchema::empty();
for row in &values {
let first_row = schema.fields().is_empty();
if !first_row && row.len() != schema.fields().len() {
return Err(DataFusionError::Plan(format!(
"Inconsistent data length across values list: {} != {}",
row.len(),
schema.fields().len()
)));
}
let fields = row
.iter()
.enumerate()
.map(|(i, expr)| {
let mut nullable = expr.nullable(&schema)?;
let data_type = expr.get_type(&schema)?;
if !first_row {
nullable |= schema.field(i).is_nullable();
if schema.field(i).data_type() != &data_type {
return Err(DataFusionError::Plan(format!(
"Inconsistent data type across values list at column {}",
i
)));
}
}
Ok(DFField::new(
None,
&format!("column{}", i + 1),
data_type,
nullable,
))
})
.collect::<Result<Vec<_>>>()?;
schema = DFSchema::new(fields)?;
}
let schema = DFSchemaRef::new(schema);
Ok(Self::from(LogicalPlan::Values { schema, values }))
}

/// Scan a memory data source
pub fn scan_memory(
partitions: Vec<Vec<RecordBatch>>,
Expand Down
17 changes: 17 additions & 0 deletions datafusion/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,13 @@ pub enum LogicalPlan {
/// Whether the CSV file contains a header
has_header: bool,
},
/// Values expression
Values {
/// The table schema
schema: DFSchemaRef,
/// Values
values: Vec<Vec<Expr>>,
},
/// Produces a relation with string representations of
/// various parts of the plan
Explain {
Expand Down Expand Up @@ -237,6 +244,7 @@ impl LogicalPlan {
pub fn schema(&self) -> &DFSchemaRef {
match self {
LogicalPlan::EmptyRelation { schema, .. } => schema,
LogicalPlan::Values { schema, .. } => schema,
LogicalPlan::TableScan {
projected_schema, ..
} => projected_schema,
Expand All @@ -263,6 +271,7 @@ impl LogicalPlan {
LogicalPlan::TableScan {
projected_schema, ..
} => vec![projected_schema],
LogicalPlan::Values { schema, .. } => vec![schema],
LogicalPlan::Window { input, schema, .. }
| LogicalPlan::Aggregate { input, schema, .. }
| LogicalPlan::Projection { input, schema, .. } => {
Expand Down Expand Up @@ -315,6 +324,9 @@ impl LogicalPlan {
pub fn expressions(self: &LogicalPlan) -> Vec<Expr> {
match self {
LogicalPlan::Projection { expr, .. } => expr.clone(),
LogicalPlan::Values { values, .. } => {
values.iter().flatten().cloned().collect()
}
LogicalPlan::Filter { predicate, .. } => vec![predicate.clone()],
LogicalPlan::Repartition {
partitioning_scheme,
Expand Down Expand Up @@ -369,6 +381,7 @@ impl LogicalPlan {
// plans without inputs
LogicalPlan::TableScan { .. }
| LogicalPlan::EmptyRelation { .. }
| LogicalPlan::Values { .. }
| LogicalPlan::CreateExternalTable { .. } => vec![],
}
}
Expand Down Expand Up @@ -515,6 +528,7 @@ impl LogicalPlan {
// plans without inputs
LogicalPlan::TableScan { .. }
| LogicalPlan::EmptyRelation { .. }
| LogicalPlan::Values { .. }
| LogicalPlan::CreateExternalTable { .. } => true,
};
if !recurse {
Expand Down Expand Up @@ -702,6 +716,9 @@ impl LogicalPlan {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match &*self.0 {
LogicalPlan::EmptyRelation { .. } => write!(f, "EmptyRelation"),
LogicalPlan::Values { ref values, .. } => {
write!(f, "Values: {} rows", values.len())
}
LogicalPlan::TableScan {
ref table_name,
ref projection,
Expand Down
1 change: 1 addition & 0 deletions datafusion/src/optimizer/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
| LogicalPlan::Repartition { .. }
| LogicalPlan::Union { .. }
| LogicalPlan::TableScan { .. }
| LogicalPlan::Values { .. }
| LogicalPlan::EmptyRelation { .. }
| LogicalPlan::Limit { .. }
| LogicalPlan::CreateExternalTable { .. }
Expand Down
1 change: 1 addition & 0 deletions datafusion/src/optimizer/constant_folding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ impl OptimizerRule for ConstantFolding {
| LogicalPlan::Aggregate { .. }
| LogicalPlan::Repartition { .. }
| LogicalPlan::CreateExternalTable { .. }
| LogicalPlan::Values { .. }
| LogicalPlan::Extension { .. }
| LogicalPlan::Sort { .. }
| LogicalPlan::Explain { .. }
Expand Down
1 change: 1 addition & 0 deletions datafusion/src/optimizer/projection_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,7 @@ fn optimize_plan(
| LogicalPlan::Filter { .. }
| LogicalPlan::Repartition { .. }
| LogicalPlan::EmptyRelation { .. }
| LogicalPlan::Values { .. }
| LogicalPlan::Sort { .. }
| LogicalPlan::CreateExternalTable { .. }
| LogicalPlan::CrossJoin { .. }
Expand Down
4 changes: 4 additions & 0 deletions datafusion/src/optimizer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ pub fn from_plan(
schema: schema.clone(),
alias: alias.clone(),
}),
LogicalPlan::Values { schema, values } => Ok(LogicalPlan::Values {
schema: schema.clone(),
values: values.to_vec(),
}),
LogicalPlan::Filter { .. } => Ok(LogicalPlan::Filter {
predicate: expr[0].clone(),
input: Arc::new(inputs[0].clone()),
Expand Down
1 change: 1 addition & 0 deletions datafusion/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -645,5 +645,6 @@ pub mod udf;
#[cfg(feature = "unicode_expressions")]
pub mod unicode_expressions;
pub mod union;
pub mod values;
pub mod window_functions;
pub mod windows;
13 changes: 12 additions & 1 deletion datafusion/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use super::analyze::AnalyzeExec;
use super::{
aggregates, empty::EmptyExec, expressions::binary, functions,
hash_join::PartitionMode, udaf, union::UnionExec, windows,
hash_join::PartitionMode, udaf, union::UnionExec, values::ValuesExec, windows,
};
use crate::execution::context::ExecutionContextState;
use crate::logical_plan::{
Expand Down Expand Up @@ -323,6 +323,17 @@ impl DefaultPhysicalPlanner {
let filters = unnormalize_cols(filters.iter().cloned());
source.scan(projection, batch_size, &filters, *limit).await
}
LogicalPlan::Values {
values,
schema,
} => {
let exprs = vec![];
let value_exec = ValuesExec::try_new(
SchemaRef::new(schema.as_ref().to_owned().into()),
&exprs
)?;
Ok(Arc::new(value_exec))
}
LogicalPlan::Window {
input, window_expr, ..
} => {
Expand Down
Loading

0 comments on commit bb74f4d

Please sign in to comment.