diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto index 338c5a66642f..95b78fcc6d24 100644 --- a/ballista/rust/core/proto/ballista.proto +++ b/ballista/rust/core/proto/ballista.proto @@ -255,6 +255,7 @@ message LogicalPlanNode { WindowNode window = 13; AnalyzeNode analyze = 14; CrossJoinNode cross_join = 15; + ValuesNode values = 16; } } @@ -316,12 +317,12 @@ message SelectionNode { LogicalExprNode expr = 2; } -message SortNode{ +message SortNode { LogicalPlanNode input = 1; repeated LogicalExprNode expr = 2; } -message RepartitionNode{ +message RepartitionNode { LogicalPlanNode input = 1; oneof partition_method { uint64 round_robin = 2; @@ -334,11 +335,11 @@ message HashRepartition { uint64 partition_count = 2; } -message EmptyRelationNode{ +message EmptyRelationNode { bool produce_one_row = 1; } -message CreateExternalTableNode{ +message CreateExternalTableNode { string name = 1; string location = 2; FileType file_type = 3; @@ -346,7 +347,14 @@ message CreateExternalTableNode{ DfSchema schema = 5; } -enum FileType{ +// a node containing data for defining values list. unlike in SQL where it's two dimensional, here +// the list is flattened, and with the field n_cols it can be parsed and partitioned into rows +message ValuesNode { + uint64 n_cols = 1; + repeated LogicalExprNode values_list = 2; +} + +enum FileType { NdJson = 0; Parquet = 1; CSV = 2; diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs b/ballista/rust/core/src/serde/logical_plan/from_proto.rs index 07eced784004..26231c5e25c7 100644 --- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs @@ -60,6 +60,31 @@ impl TryInto for &protobuf::LogicalPlanNode { )) })?; match plan { + LogicalPlanType::Values(values) => { + let n_cols = values.n_cols as usize; + let values: Vec> = if values.values_list.is_empty() { + Ok(Vec::new()) + } else if values.values_list.len() % n_cols != 0 { + Err(BallistaError::General(format!( + "Invalid values list length, expect {} to be divisible by {}", + values.values_list.len(), + n_cols + ))) + } else { + values + .values_list + .chunks_exact(n_cols) + .map(|r| { + r.iter() + .map(|v| v.try_into()) + .collect::, _>>() + }) + .collect::, _>>() + }?; + LogicalPlanBuilder::values(values)? + .build() + .map_err(|e| e.into()) + } LogicalPlanType::Projection(projection) => { let input: LogicalPlan = convert_box_required!(projection.input)?; let x: Vec = projection diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs index e79e6549620e..ae25d72d57f9 100644 --- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs @@ -675,6 +675,26 @@ impl TryInto for &LogicalPlan { fn try_into(self) -> Result { use protobuf::logical_plan_node::LogicalPlanType; match self { + LogicalPlan::Values { values, .. } => { + let n_cols = if values.is_empty() { + 0 + } else { + values[0].len() + } as u64; + let values_list = values + .iter() + .flatten() + .map(|v| v.try_into()) + .collect::, _>>()?; + Ok(protobuf::LogicalPlanNode { + logical_plan_type: Some(LogicalPlanType::Values( + protobuf::ValuesNode { + n_cols, + values_list, + }, + )), + }) + } LogicalPlan::TableScan { table_name, source, diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs index 3a1d12735658..3c6c444bf76f 100644 --- a/datafusion/src/logical_plan/builder.rs +++ b/datafusion/src/logical_plan/builder.rs @@ -17,16 +17,6 @@ //! This module provides a builder for creating LogicalPlans -use std::{ - collections::{HashMap, HashSet}, - sync::Arc, -}; - -use arrow::{ - datatypes::{Schema, SchemaRef}, - record_batch::RecordBatch, -}; - use crate::datasource::{ empty::EmptyTable, file_format::parquet::{ParquetFormat, DEFAULT_PARQUET_EXTENSION}, @@ -37,6 +27,16 @@ use crate::datasource::{ use crate::error::{DataFusionError, Result}; use crate::logical_plan::plan::ToStringifiedPlan; use crate::prelude::*; +use crate::scalar::ScalarValue; +use arrow::{ + datatypes::{DataType, Schema, SchemaRef}, + record_batch::RecordBatch, +}; +use std::convert::TryFrom; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; use super::dfschema::ToDFSchema; use super::{exprlist_to_fields, Expr, JoinConstraint, JoinType, LogicalPlan, PlanType}; @@ -111,6 +111,80 @@ impl LogicalPlanBuilder { }) } + /// Create a values list based relation, and the schema is inferred from data, consuming + /// `value`. See the [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html) + /// documentation for more details. + /// + /// By default, it assigns the names column1, column2, etc. to the columns of a VALUES table. + /// The column names are not specified by the SQL standard and different database systems do it differently, + /// so it's usually better to override the default names with a table alias list. + pub fn values(mut values: Vec>) -> Result { + if values.is_empty() { + return Err(DataFusionError::Plan("Values list cannot be empty".into())); + } + let n_cols = values[0].len(); + if n_cols == 0 { + return Err(DataFusionError::Plan( + "Values list cannot be zero length".into(), + )); + } + let empty_schema = DFSchema::empty(); + let mut field_types: Vec> = Vec::with_capacity(n_cols); + for _ in 0..n_cols { + field_types.push(None); + } + // hold all the null holes so that we can correct their data types later + let mut nulls: Vec<(usize, usize)> = Vec::new(); + for (i, row) in values.iter().enumerate() { + if row.len() != n_cols { + return Err(DataFusionError::Plan(format!( + "Inconsistent data length across values list: got {} values in row {} but expected {}", + row.len(), + i, + n_cols + ))); + } + field_types = row + .iter() + .enumerate() + .map(|(j, expr)| { + if let Expr::Literal(ScalarValue::Utf8(None)) = expr { + nulls.push((i, j)); + Ok(field_types[j].clone()) + } else { + let data_type = expr.get_type(&empty_schema)?; + if let Some(prev_data_type) = &field_types[j] { + if prev_data_type != &data_type { + let err = format!("Inconsistent data type across values list at row {} column {}", i, j); + return Err(DataFusionError::Plan(err)); + } + } + Ok(Some(data_type)) + } + }) + .collect::>>>()?; + } + let fields = field_types + .iter() + .enumerate() + .map(|(j, data_type)| { + // naming is following convention https://www.postgresql.org/docs/current/queries-values.html + let name = &format!("column{}", j + 1); + DFField::new( + None, + name, + data_type.clone().unwrap_or(DataType::Utf8), + true, + ) + }) + .collect::>(); + for (i, j) in nulls { + values[i][j] = Expr::Literal(ScalarValue::try_from(fields[j].data_type())?); + } + let schema = DFSchemaRef::new(DFSchema::new(fields)?); + Ok(Self::from(LogicalPlan::Values { schema, values })) + } + /// Scan a memory data source pub fn scan_memory( partitions: Vec>, diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs index 7552fc6caceb..13921d5934b3 100644 --- a/datafusion/src/logical_plan/plan.rs +++ b/datafusion/src/logical_plan/plan.rs @@ -203,6 +203,15 @@ pub enum LogicalPlan { /// Whether the CSV file contains a header has_header: bool, }, + /// Values expression. See + /// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html) + /// documentation for more details. + Values { + /// The table schema + schema: DFSchemaRef, + /// Values + values: Vec>, + }, /// Produces a relation with string representations of /// various parts of the plan Explain { @@ -237,6 +246,7 @@ impl LogicalPlan { pub fn schema(&self) -> &DFSchemaRef { match self { LogicalPlan::EmptyRelation { schema, .. } => schema, + LogicalPlan::Values { schema, .. } => schema, LogicalPlan::TableScan { projected_schema, .. } => projected_schema, @@ -263,6 +273,7 @@ impl LogicalPlan { LogicalPlan::TableScan { projected_schema, .. } => vec![projected_schema], + LogicalPlan::Values { schema, .. } => vec![schema], LogicalPlan::Window { input, schema, .. } | LogicalPlan::Aggregate { input, schema, .. } | LogicalPlan::Projection { input, schema, .. } => { @@ -315,6 +326,9 @@ impl LogicalPlan { pub fn expressions(self: &LogicalPlan) -> Vec { match self { LogicalPlan::Projection { expr, .. } => expr.clone(), + LogicalPlan::Values { values, .. } => { + values.iter().flatten().cloned().collect() + } LogicalPlan::Filter { predicate, .. } => vec![predicate.clone()], LogicalPlan::Repartition { partitioning_scheme, @@ -369,6 +383,7 @@ impl LogicalPlan { // plans without inputs LogicalPlan::TableScan { .. } | LogicalPlan::EmptyRelation { .. } + | LogicalPlan::Values { .. } | LogicalPlan::CreateExternalTable { .. } => vec![], } } @@ -515,6 +530,7 @@ impl LogicalPlan { // plans without inputs LogicalPlan::TableScan { .. } | LogicalPlan::EmptyRelation { .. } + | LogicalPlan::Values { .. } | LogicalPlan::CreateExternalTable { .. } => true, }; if !recurse { @@ -702,6 +718,25 @@ impl LogicalPlan { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match &*self.0 { LogicalPlan::EmptyRelation { .. } => write!(f, "EmptyRelation"), + LogicalPlan::Values { ref values, .. } => { + let str_values: Vec<_> = values + .iter() + // limit to only 5 values to avoid horrible display + .take(5) + .map(|row| { + let item = row + .iter() + .map(|expr| expr.to_string()) + .collect::>() + .join(", "); + format!("({})", item) + }) + .collect(); + + let elipse = if values.len() > 5 { "..." } else { "" }; + write!(f, "Values: {}{}", str_values.join(", "), elipse) + } + LogicalPlan::TableScan { ref table_name, ref projection, diff --git a/datafusion/src/optimizer/common_subexpr_eliminate.rs b/datafusion/src/optimizer/common_subexpr_eliminate.rs index 7192471d9a93..8d87b2218304 100644 --- a/datafusion/src/optimizer/common_subexpr_eliminate.rs +++ b/datafusion/src/optimizer/common_subexpr_eliminate.rs @@ -199,6 +199,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result Ok(LogicalPlan::Values { + schema: schema.clone(), + values: expr + .chunks_exact(schema.fields().len()) + .map(|s| s.to_vec()) + .collect::>(), + }), LogicalPlan::Filter { .. } => Ok(LogicalPlan::Filter { predicate: expr[0].clone(), input: Arc::new(inputs[0].clone()), diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs index 3accaadce607..ef53d8602b40 100644 --- a/datafusion/src/physical_plan/mod.rs +++ b/datafusion/src/physical_plan/mod.rs @@ -645,5 +645,6 @@ pub mod udf; #[cfg(feature = "unicode_expressions")] pub mod unicode_expressions; pub mod union; +pub mod values; pub mod window_functions; pub mod windows; diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs index be8c588bfda5..8cfb907350b5 100644 --- a/datafusion/src/physical_plan/planner.rs +++ b/datafusion/src/physical_plan/planner.rs @@ -20,7 +20,7 @@ use super::analyze::AnalyzeExec; use super::{ aggregates, empty::EmptyExec, expressions::binary, functions, - hash_join::PartitionMode, udaf, union::UnionExec, windows, + hash_join::PartitionMode, udaf, union::UnionExec, values::ValuesExec, windows, }; use crate::execution::context::ExecutionContextState; use crate::logical_plan::{ @@ -323,6 +323,30 @@ impl DefaultPhysicalPlanner { let filters = unnormalize_cols(filters.iter().cloned()); source.scan(projection, batch_size, &filters, *limit).await } + LogicalPlan::Values { + values, + schema, + } => { + let exec_schema = schema.as_ref().to_owned().into(); + let exprs = values.iter() + .map(|row| { + row.iter().map(|expr|{ + self.create_physical_expr( + expr, + schema, + &exec_schema, + ctx_state, + ) + }) + .collect::>>>() + }) + .collect::>>()?; + let value_exec = ValuesExec::try_new( + SchemaRef::new(exec_schema), + exprs + )?; + Ok(Arc::new(value_exec)) + } LogicalPlan::Window { input, window_expr, .. } => { diff --git a/datafusion/src/physical_plan/values.rs b/datafusion/src/physical_plan/values.rs new file mode 100644 index 000000000000..6658f672834f --- /dev/null +++ b/datafusion/src/physical_plan/values.rs @@ -0,0 +1,168 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Values execution plan + +use super::{common, SendableRecordBatchStream, Statistics}; +use crate::error::{DataFusionError, Result}; +use crate::physical_plan::{ + memory::MemoryStream, ColumnarValue, DisplayFormatType, Distribution, ExecutionPlan, + Partitioning, PhysicalExpr, +}; +use crate::scalar::ScalarValue; +use arrow::datatypes::SchemaRef; +use arrow::record_batch::RecordBatch; +use async_trait::async_trait; +use std::any::Any; +use std::sync::Arc; + +/// Execution plan for values list based relation (produces constant rows) +#[derive(Debug)] +pub struct ValuesExec { + /// The schema + schema: SchemaRef, + /// The data + data: Vec, +} + +impl ValuesExec { + /// create a new values exec from data as expr + pub fn try_new( + schema: SchemaRef, + data: Vec>>, + ) -> Result { + if data.is_empty() { + return Err(DataFusionError::Plan("Values list cannot be empty".into())); + } + // we have this empty batch as a placeholder to satisfy evaluation argument + let batch = RecordBatch::new_empty(schema.clone()); + let n_row = data.len(); + let n_col = schema.fields().len(); + let arr = (0..n_col) + .map(|j| { + (0..n_row) + .map(|i| { + let r = data[i][j].evaluate(&batch); + match r { + Ok(ColumnarValue::Scalar(scalar)) => Ok(scalar), + Ok(ColumnarValue::Array(_)) => Err(DataFusionError::Plan( + "Cannot have array values in a values list".into(), + )), + Err(err) => Err(err), + } + }) + .collect::>>() + .and_then(ScalarValue::iter_to_array) + }) + .collect::>>()?; + let batch = RecordBatch::try_new(schema.clone(), arr)?; + let data: Vec = vec![batch]; + Ok(Self { schema, data }) + } + + /// provides the data + fn data(&self) -> Vec { + self.data.clone() + } +} + +#[async_trait] +impl ExecutionPlan for ValuesExec { + /// Return a reference to Any that can be used for downcasting + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn children(&self) -> Vec> { + vec![] + } + + fn required_child_distribution(&self) -> Distribution { + Distribution::UnspecifiedDistribution + } + + /// Get the output partitioning of this plan + fn output_partitioning(&self) -> Partitioning { + Partitioning::UnknownPartitioning(1) + } + + fn with_new_children( + &self, + children: Vec>, + ) -> Result> { + match children.len() { + 0 => Ok(Arc::new(ValuesExec { + schema: self.schema.clone(), + data: self.data.clone(), + })), + _ => Err(DataFusionError::Internal( + "ValuesExec wrong number of children".to_string(), + )), + } + } + + async fn execute(&self, partition: usize) -> Result { + // GlobalLimitExec has a single output partition + if 0 != partition { + return Err(DataFusionError::Internal(format!( + "ValuesExec invalid partition {} (expected 0)", + partition + ))); + } + + Ok(Box::pin(MemoryStream::try_new( + self.data(), + self.schema.clone(), + None, + )?)) + } + + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default => { + write!(f, "ValuesExec") + } + } + } + + fn statistics(&self) -> Statistics { + let batch = self.data(); + common::compute_record_batch_statistics(&[batch], &self.schema, None) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::test; + + #[tokio::test] + async fn values_empty_case() -> Result<()> { + let schema = test::aggr_test_schema(); + let empty = ValuesExec::try_new(schema, vec![]); + assert!(!empty.is_ok()); + Ok(()) + } +} diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs index 5c1b50107e25..a16c40ac61da 100644 --- a/datafusion/src/sql/planner.rs +++ b/datafusion/src/sql/planner.rs @@ -49,7 +49,7 @@ use sqlparser::ast::{ BinaryOperator, DataType as SQLDataType, DateTimeField, Expr as SQLExpr, FunctionArg, Ident, Join, JoinConstraint, JoinOperator, ObjectName, Query, Select, SelectItem, SetExpr, SetOperator, ShowStatementFilter, TableFactor, TableWithJoins, - TrimWhereField, UnaryOperator, Value, + TrimWhereField, UnaryOperator, Value, Values as SQLValues, }; use sqlparser::ast::{ColumnDef as SQLColumnDef, ColumnOption}; use sqlparser::ast::{OrderByExpr, Statement}; @@ -160,6 +160,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { ) -> Result { match set_expr { SetExpr::Select(s) => self.select_to_plan(s.as_ref(), ctes, alias), + SetExpr::Values(v) => self.sql_values_to_plan(v), SetExpr::SetOperation { op, left, @@ -1068,6 +1069,35 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } } + fn sql_values_to_plan(&self, values: &SQLValues) -> Result { + let values = values + .0 + .iter() + .map(|row| { + row.iter() + .map(|v| match v { + SQLExpr::Value(Value::Number(n, _)) => match n.parse::() { + Ok(n) => Ok(lit(n)), + Err(_) => Ok(lit(n.parse::().unwrap())), + }, + SQLExpr::Value(Value::SingleQuotedString(ref s)) => { + Ok(lit(s.clone())) + } + SQLExpr::Value(Value::Null) => { + Ok(Expr::Literal(ScalarValue::Utf8(None))) + } + SQLExpr::Value(Value::Boolean(n)) => Ok(lit(*n)), + other => Err(DataFusionError::NotImplemented(format!( + "Unsupported value {:?} in a values list expression", + other + ))), + }) + .collect::>>() + }) + .collect::>>()?; + LogicalPlanBuilder::values(values)?.build() + } + fn sql_expr_to_logical_expr(&self, sql: &SQLExpr, schema: &DFSchema) -> Result { match sql { SQLExpr::Value(Value::Number(n, _)) => match n.parse::() { diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs index 67270c5cfb04..4d299ec09214 100644 --- a/datafusion/tests/sql.rs +++ b/datafusion/tests/sql.rs @@ -476,6 +476,180 @@ async fn csv_query_group_by_float32() -> Result<()> { Ok(()) } +#[tokio::test] +async fn select_values_list() -> Result<()> { + let mut ctx = ExecutionContext::new(); + { + let sql = "VALUES (1)"; + let actual = execute_to_batches(&mut ctx, sql).await; + let expected = vec![ + "+---------+", + "| column1 |", + "+---------+", + "| 1 |", + "+---------+", + ]; + assert_batches_eq!(expected, &actual); + } + { + let sql = "VALUES"; + let plan = ctx.create_logical_plan(sql); + assert!(plan.is_err()); + } + { + let sql = "VALUES ()"; + let plan = ctx.create_logical_plan(sql); + assert!(plan.is_err()); + } + { + let sql = "VALUES (1),(2)"; + let actual = execute_to_batches(&mut ctx, sql).await; + let expected = vec![ + "+---------+", + "| column1 |", + "+---------+", + "| 1 |", + "| 2 |", + "+---------+", + ]; + assert_batches_eq!(expected, &actual); + } + { + let sql = "VALUES (1),()"; + let plan = ctx.create_logical_plan(sql); + assert!(plan.is_err()); + } + { + let sql = "VALUES (1,'a'),(2,'b')"; + let actual = execute_to_batches(&mut ctx, sql).await; + let expected = vec![ + "+---------+---------+", + "| column1 | column2 |", + "+---------+---------+", + "| 1 | a |", + "| 2 | b |", + "+---------+---------+", + ]; + assert_batches_eq!(expected, &actual); + } + { + let sql = "VALUES (1),(1,2)"; + let plan = ctx.create_logical_plan(sql); + assert!(plan.is_err()); + } + { + let sql = "VALUES (1),('2')"; + let plan = ctx.create_logical_plan(sql); + assert!(plan.is_err()); + } + { + let sql = "VALUES (1),(2.0)"; + let plan = ctx.create_logical_plan(sql); + assert!(plan.is_err()); + } + { + let sql = "VALUES (1,2), (1,'2')"; + let plan = ctx.create_logical_plan(sql); + assert!(plan.is_err()); + } + { + let sql = "VALUES (1,'a'),(NULL,'b'),(3,'c')"; + let actual = execute_to_batches(&mut ctx, sql).await; + let expected = vec![ + "+---------+---------+", + "| column1 | column2 |", + "+---------+---------+", + "| 1 | a |", + "| | b |", + "| 3 | c |", + "+---------+---------+", + ]; + assert_batches_eq!(expected, &actual); + } + { + let sql = "VALUES (NULL,'a'),(NULL,'b'),(3,'c')"; + let actual = execute_to_batches(&mut ctx, sql).await; + let expected = vec![ + "+---------+---------+", + "| column1 | column2 |", + "+---------+---------+", + "| | a |", + "| | b |", + "| 3 | c |", + "+---------+---------+", + ]; + assert_batches_eq!(expected, &actual); + } + { + let sql = "VALUES (NULL,'a'),(NULL,'b'),(NULL,'c')"; + let actual = execute_to_batches(&mut ctx, sql).await; + let expected = vec![ + "+---------+---------+", + "| column1 | column2 |", + "+---------+---------+", + "| | a |", + "| | b |", + "| | c |", + "+---------+---------+", + ]; + assert_batches_eq!(expected, &actual); + } + { + let sql = "VALUES (1,'a'),(2,NULL),(3,'c')"; + let actual = execute_to_batches(&mut ctx, sql).await; + let expected = vec![ + "+---------+---------+", + "| column1 | column2 |", + "+---------+---------+", + "| 1 | a |", + "| 2 | |", + "| 3 | c |", + "+---------+---------+", + ]; + assert_batches_eq!(expected, &actual); + } + { + let sql = "VALUES (1,NULL),(2,NULL),(3,'c')"; + let actual = execute_to_batches(&mut ctx, sql).await; + let expected = vec![ + "+---------+---------+", + "| column1 | column2 |", + "+---------+---------+", + "| 1 | |", + "| 2 | |", + "| 3 | c |", + "+---------+---------+", + ]; + assert_batches_eq!(expected, &actual); + } + { + let sql = "VALUES (1,2,3,4,5,6,7,8,9,10,11,12,13,NULL,'F',3.5)"; + let actual = execute_to_batches(&mut ctx, sql).await; + let expected = vec![ + "+---------+---------+---------+---------+---------+---------+---------+---------+---------+----------+----------+----------+----------+----------+----------+----------+", + "| column1 | column2 | column3 | column4 | column5 | column6 | column7 | column8 | column9 | column10 | column11 | column12 | column13 | column14 | column15 | column16 |", + "+---------+---------+---------+---------+---------+---------+---------+---------+---------+----------+----------+----------+----------+----------+----------+----------+", + "| 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | 11 | 12 | 13 | | F | 3.5 |", + "+---------+---------+---------+---------+---------+---------+---------+---------+---------+----------+----------+----------+----------+----------+----------+----------+", + ]; + assert_batches_eq!(expected, &actual); + } + { + let sql = "SELECT * FROM (VALUES (1,'a'),(2,NULL)) AS t(c1, c2)"; + let actual = execute_to_batches(&mut ctx, sql).await; + let expected = vec![ + "+----+----+", + "| c1 | c2 |", + "+----+----+", + "| 1 | a |", + "| 2 | |", + "+----+----+", + ]; + assert_batches_eq!(expected, &actual); + } + Ok(()) +} + #[tokio::test] async fn select_all() -> Result<()> { let mut ctx = ExecutionContext::new(); @@ -598,7 +772,7 @@ async fn select_distinct_simple_4() { async fn select_distinct_from() { let mut ctx = ExecutionContext::new(); - let sql = "select + let sql = "select 1 IS DISTINCT FROM CAST(NULL as INT) as a, 1 IS DISTINCT FROM 1 as b, 1 IS NOT DISTINCT FROM CAST(NULL as INT) as c, @@ -621,7 +795,7 @@ async fn select_distinct_from() { async fn select_distinct_from_utf8() { let mut ctx = ExecutionContext::new(); - let sql = "select + let sql = "select 'x' IS DISTINCT FROM NULL as a, 'x' IS DISTINCT FROM 'x' as b, 'x' IS NOT DISTINCT FROM NULL as c,