From bb74f4db3419bb43f063ff210f74ee8d96fe2fff Mon Sep 17 00:00:00 2001 From: Jiayu Liu Date: Fri, 22 Oct 2021 21:41:11 +0800 Subject: [PATCH] add values list expression --- ballista/rust/core/proto/ballista.proto | 18 +- .../core/src/serde/logical_plan/from_proto.rs | 25 +++ .../core/src/serde/logical_plan/to_proto.rs | 20 +++ datafusion/src/logical_plan/builder.rs | 43 ++++- datafusion/src/logical_plan/plan.rs | 17 ++ .../src/optimizer/common_subexpr_eliminate.rs | 1 + datafusion/src/optimizer/constant_folding.rs | 1 + .../src/optimizer/projection_push_down.rs | 1 + datafusion/src/optimizer/utils.rs | 4 + datafusion/src/physical_plan/mod.rs | 1 + datafusion/src/physical_plan/planner.rs | 13 +- datafusion/src/physical_plan/values.rs | 159 ++++++++++++++++++ datafusion/src/sql/planner.rs | 29 +++- 13 files changed, 324 insertions(+), 8 deletions(-) create mode 100644 datafusion/src/physical_plan/values.rs diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto index 338c5a66642fc..95b78fcc6d24e 100644 --- a/ballista/rust/core/proto/ballista.proto +++ b/ballista/rust/core/proto/ballista.proto @@ -255,6 +255,7 @@ message LogicalPlanNode { WindowNode window = 13; AnalyzeNode analyze = 14; CrossJoinNode cross_join = 15; + ValuesNode values = 16; } } @@ -316,12 +317,12 @@ message SelectionNode { LogicalExprNode expr = 2; } -message SortNode{ +message SortNode { LogicalPlanNode input = 1; repeated LogicalExprNode expr = 2; } -message RepartitionNode{ +message RepartitionNode { LogicalPlanNode input = 1; oneof partition_method { uint64 round_robin = 2; @@ -334,11 +335,11 @@ message HashRepartition { uint64 partition_count = 2; } -message EmptyRelationNode{ +message EmptyRelationNode { bool produce_one_row = 1; } -message CreateExternalTableNode{ +message CreateExternalTableNode { string name = 1; string location = 2; FileType file_type = 3; @@ -346,7 +347,14 @@ message CreateExternalTableNode{ DfSchema schema = 5; } -enum FileType{ +// a node containing data for defining values list. unlike in SQL where it's two dimensional, here +// the list is flattened, and with the field n_cols it can be parsed and partitioned into rows +message ValuesNode { + uint64 n_cols = 1; + repeated LogicalExprNode values_list = 2; +} + +enum FileType { NdJson = 0; Parquet = 1; CSV = 2; diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs b/ballista/rust/core/src/serde/logical_plan/from_proto.rs index 07eced7840043..7c89c5c052583 100644 --- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs @@ -60,6 +60,31 @@ impl TryInto for &protobuf::LogicalPlanNode { )) })?; match plan { + LogicalPlanType::Values(values) => { + let n_cols = values.n_cols as usize; + let values: Vec> = if values.values_list.is_empty() { + Ok(Vec::new()) + } else if values.values_list.len() % n_cols != 0 { + Err(BallistaError::General(format!( + "Invalid values list length, expect {} to be divisible by {}", + values.values_list.len(), + n_cols + ))) + } else { + values + .values_list + .chunks_exact(n_cols) + .map(|r| { + r.into_iter() + .map(|v| v.try_into()) + .collect::, _>>() + }) + .collect::, _>>() + }?; + LogicalPlanBuilder::values(values)? + .build() + .map_err(|e| e.into()) + } LogicalPlanType::Projection(projection) => { let input: LogicalPlan = convert_box_required!(projection.input)?; let x: Vec = projection diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs index e79e6549620ec..ae25d72d57f95 100644 --- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs @@ -675,6 +675,26 @@ impl TryInto for &LogicalPlan { fn try_into(self) -> Result { use protobuf::logical_plan_node::LogicalPlanType; match self { + LogicalPlan::Values { values, .. } => { + let n_cols = if values.is_empty() { + 0 + } else { + values[0].len() + } as u64; + let values_list = values + .iter() + .flatten() + .map(|v| v.try_into()) + .collect::, _>>()?; + Ok(protobuf::LogicalPlanNode { + logical_plan_type: Some(LogicalPlanType::Values( + protobuf::ValuesNode { + n_cols, + values_list, + }, + )), + }) + } LogicalPlan::TableScan { table_name, source, diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs index 3a1d127356588..660fa60ded05a 100644 --- a/datafusion/src/logical_plan/builder.rs +++ b/datafusion/src/logical_plan/builder.rs @@ -23,7 +23,7 @@ use std::{ }; use arrow::{ - datatypes::{Schema, SchemaRef}, + datatypes::{DataType, Schema, SchemaRef}, record_batch::RecordBatch, }; @@ -111,6 +111,47 @@ impl LogicalPlanBuilder { }) } + /// Create a values list based relation, and the schema is inferred from data + pub fn values(values: Vec>) -> Result { + let mut schema = DFSchema::empty(); + for row in &values { + let first_row = schema.fields().is_empty(); + if !first_row && row.len() != schema.fields().len() { + return Err(DataFusionError::Plan(format!( + "Inconsistent data length across values list: {} != {}", + row.len(), + schema.fields().len() + ))); + } + let fields = row + .iter() + .enumerate() + .map(|(i, expr)| { + let mut nullable = expr.nullable(&schema)?; + let data_type = expr.get_type(&schema)?; + if !first_row { + nullable |= schema.field(i).is_nullable(); + if schema.field(i).data_type() != &data_type { + return Err(DataFusionError::Plan(format!( + "Inconsistent data type across values list at column {}", + i + ))); + } + } + Ok(DFField::new( + None, + &format!("column{}", i + 1), + data_type, + nullable, + )) + }) + .collect::>>()?; + schema = DFSchema::new(fields)?; + } + let schema = DFSchemaRef::new(schema); + Ok(Self::from(LogicalPlan::Values { schema, values })) + } + /// Scan a memory data source pub fn scan_memory( partitions: Vec>, diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs index 7552fc6cacebe..69813dbd84051 100644 --- a/datafusion/src/logical_plan/plan.rs +++ b/datafusion/src/logical_plan/plan.rs @@ -203,6 +203,13 @@ pub enum LogicalPlan { /// Whether the CSV file contains a header has_header: bool, }, + /// Values expression + Values { + /// The table schema + schema: DFSchemaRef, + /// Values + values: Vec>, + }, /// Produces a relation with string representations of /// various parts of the plan Explain { @@ -237,6 +244,7 @@ impl LogicalPlan { pub fn schema(&self) -> &DFSchemaRef { match self { LogicalPlan::EmptyRelation { schema, .. } => schema, + LogicalPlan::Values { schema, .. } => schema, LogicalPlan::TableScan { projected_schema, .. } => projected_schema, @@ -263,6 +271,7 @@ impl LogicalPlan { LogicalPlan::TableScan { projected_schema, .. } => vec![projected_schema], + LogicalPlan::Values { schema, .. } => vec![schema], LogicalPlan::Window { input, schema, .. } | LogicalPlan::Aggregate { input, schema, .. } | LogicalPlan::Projection { input, schema, .. } => { @@ -315,6 +324,9 @@ impl LogicalPlan { pub fn expressions(self: &LogicalPlan) -> Vec { match self { LogicalPlan::Projection { expr, .. } => expr.clone(), + LogicalPlan::Values { values, .. } => { + values.iter().flatten().cloned().collect() + } LogicalPlan::Filter { predicate, .. } => vec![predicate.clone()], LogicalPlan::Repartition { partitioning_scheme, @@ -369,6 +381,7 @@ impl LogicalPlan { // plans without inputs LogicalPlan::TableScan { .. } | LogicalPlan::EmptyRelation { .. } + | LogicalPlan::Values { .. } | LogicalPlan::CreateExternalTable { .. } => vec![], } } @@ -515,6 +528,7 @@ impl LogicalPlan { // plans without inputs LogicalPlan::TableScan { .. } | LogicalPlan::EmptyRelation { .. } + | LogicalPlan::Values { .. } | LogicalPlan::CreateExternalTable { .. } => true, }; if !recurse { @@ -702,6 +716,9 @@ impl LogicalPlan { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match &*self.0 { LogicalPlan::EmptyRelation { .. } => write!(f, "EmptyRelation"), + LogicalPlan::Values { ref values, .. } => { + write!(f, "Values: {} rows", values.len()) + } LogicalPlan::TableScan { ref table_name, ref projection, diff --git a/datafusion/src/optimizer/common_subexpr_eliminate.rs b/datafusion/src/optimizer/common_subexpr_eliminate.rs index 7192471d9a93e..8d87b22183042 100644 --- a/datafusion/src/optimizer/common_subexpr_eliminate.rs +++ b/datafusion/src/optimizer/common_subexpr_eliminate.rs @@ -199,6 +199,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result Ok(LogicalPlan::Values { + schema: schema.clone(), + values: values.to_vec(), + }), LogicalPlan::Filter { .. } => Ok(LogicalPlan::Filter { predicate: expr[0].clone(), input: Arc::new(inputs[0].clone()), diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs index 3accaadce6073..ef53d8602b405 100644 --- a/datafusion/src/physical_plan/mod.rs +++ b/datafusion/src/physical_plan/mod.rs @@ -645,5 +645,6 @@ pub mod udf; #[cfg(feature = "unicode_expressions")] pub mod unicode_expressions; pub mod union; +pub mod values; pub mod window_functions; pub mod windows; diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs index be8c588bfda54..6039072a5dd36 100644 --- a/datafusion/src/physical_plan/planner.rs +++ b/datafusion/src/physical_plan/planner.rs @@ -20,7 +20,7 @@ use super::analyze::AnalyzeExec; use super::{ aggregates, empty::EmptyExec, expressions::binary, functions, - hash_join::PartitionMode, udaf, union::UnionExec, windows, + hash_join::PartitionMode, udaf, union::UnionExec, values::ValuesExec, windows, }; use crate::execution::context::ExecutionContextState; use crate::logical_plan::{ @@ -323,6 +323,17 @@ impl DefaultPhysicalPlanner { let filters = unnormalize_cols(filters.iter().cloned()); source.scan(projection, batch_size, &filters, *limit).await } + LogicalPlan::Values { + values, + schema, + } => { + let exprs = vec![]; + let value_exec = ValuesExec::try_new( + SchemaRef::new(schema.as_ref().to_owned().into()), + &exprs + )?; + Ok(Arc::new(value_exec)) + } LogicalPlan::Window { input, window_expr, .. } => { diff --git a/datafusion/src/physical_plan/values.rs b/datafusion/src/physical_plan/values.rs new file mode 100644 index 0000000000000..9451ef884a3fd --- /dev/null +++ b/datafusion/src/physical_plan/values.rs @@ -0,0 +1,159 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Values execution plan + +use super::{common, SendableRecordBatchStream, Statistics}; +use crate::error::{DataFusionError, Result}; +use crate::physical_plan::{ + memory::MemoryStream, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, + PhysicalExpr, +}; +use arrow::array::NullArray; +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use arrow::record_batch::RecordBatch; +use async_trait::async_trait; +use std::any::Any; +use std::sync::Arc; + +/// Execution plan for values list based relation (produces constant rows) +#[derive(Debug)] +pub struct ValuesExec { + /// The schema + schema: SchemaRef, + /// The data + data: Vec, +} + +impl ValuesExec { + /// create a new values exec + pub fn new(schema: SchemaRef, data: Vec) -> Self { + Self { schema, data } + } + + /// create a new values exec from data as expr + pub fn try_new( + schema: SchemaRef, + data: &[Vec>], + ) -> Result { + let data: Vec = if data.len() == 0 { + vec![] + } else { + println!("{:#?}", data.clone()); + let arr = schema + .fields() + .iter() + .map(|field| unimplemented!()) + .collect::>(); + let batch = RecordBatch::try_new(schema.clone(), arr)?; + vec![batch] + }; + Ok(Self::new(schema, data)) + } + + /// provides the data + fn data(&self) -> Vec { + self.data.clone() + } +} + +#[async_trait] +impl ExecutionPlan for ValuesExec { + /// Return a reference to Any that can be used for downcasting + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn children(&self) -> Vec> { + vec![] + } + + fn required_child_distribution(&self) -> Distribution { + Distribution::UnspecifiedDistribution + } + + /// Get the output partitioning of this plan + fn output_partitioning(&self) -> Partitioning { + Partitioning::UnknownPartitioning(1) + } + + fn with_new_children( + &self, + children: Vec>, + ) -> Result> { + match children.len() { + 0 => Ok(Arc::new(ValuesExec::new( + self.schema.clone(), + self.data.clone(), + ))), + _ => Err(DataFusionError::Internal( + "ValuesExec wrong number of children".to_string(), + )), + } + } + + async fn execute(&self, partition: usize) -> Result { + // GlobalLimitExec has a single output partition + if 0 != partition { + return Err(DataFusionError::Internal(format!( + "ValuesExec invalid partition {} (expected 0)", + partition + ))); + } + + Ok(Box::pin(MemoryStream::try_new( + self.data(), + self.schema.clone(), + None, + )?)) + } + + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default => { + write!(f, "ValuesExec") + } + } + } + + fn statistics(&self) -> Statistics { + let batch = self.data(); + common::compute_record_batch_statistics(&[batch], &self.schema, None) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::physical_plan::common; + use crate::test; + + #[tokio::test] + async fn values_empty_case() -> Result<()> { + let schema = test::aggr_test_schema(); + let empty = ValuesExec::new(schema.clone()); + Ok(()) + } +} diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs index 5c1b50107e25b..84501a9fc586a 100644 --- a/datafusion/src/sql/planner.rs +++ b/datafusion/src/sql/planner.rs @@ -49,7 +49,7 @@ use sqlparser::ast::{ BinaryOperator, DataType as SQLDataType, DateTimeField, Expr as SQLExpr, FunctionArg, Ident, Join, JoinConstraint, JoinOperator, ObjectName, Query, Select, SelectItem, SetExpr, SetOperator, ShowStatementFilter, TableFactor, TableWithJoins, - TrimWhereField, UnaryOperator, Value, + TrimWhereField, UnaryOperator, Value, Values as SQLValues, }; use sqlparser::ast::{ColumnDef as SQLColumnDef, ColumnOption}; use sqlparser::ast::{OrderByExpr, Statement}; @@ -160,6 +160,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { ) -> Result { match set_expr { SetExpr::Select(s) => self.select_to_plan(s.as_ref(), ctes, alias), + SetExpr::Values(v) => self.sql_values_to_plan(v), SetExpr::SetOperation { op, left, @@ -1068,6 +1069,32 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } } + fn sql_values_to_plan(&self, values: &SQLValues) -> Result { + let values = values + .0 + .iter() + .map(|row| { + row.iter() + .map(|v| match v { + SQLExpr::Value(Value::Number(n, _)) => match n.parse::() { + Ok(n) => Ok(lit(n)), + Err(_) => Ok(lit(n.parse::().unwrap())), + }, + SQLExpr::Value(Value::SingleQuotedString(ref s)) => { + Ok(lit(s.clone())) + } + SQLExpr::Value(Value::Boolean(n)) => Ok(lit(*n)), + other => Err(DataFusionError::NotImplemented(format!( + "Unsupported value {:?} in a values list expression", + other + ))), + }) + .collect::>>() + }) + .collect::>>()?; + LogicalPlanBuilder::values(values)?.build() + } + fn sql_expr_to_logical_expr(&self, sql: &SQLExpr, schema: &DFSchema) -> Result { match sql { SQLExpr::Value(Value::Number(n, _)) => match n.parse::() {