diff --git a/README.md b/README.md index e1f96f024523..1bcf79c6ecbc 100644 --- a/README.md +++ b/README.md @@ -159,6 +159,7 @@ DataFusion also includes a simple command-line interactive SQL utility. See the - [x] Common math functions - [x] cast - [x] try_cast +- [x] [`VALUES` lists](https://www.postgresql.org/docs/current/queries-values.html) - Postgres compatible String functions - [x] ascii - [x] bit_length @@ -191,7 +192,7 @@ DataFusion also includes a simple command-line interactive SQL utility. See the - Miscellaneous/Boolean functions - [x] nullif - Approximation functions - - [ ] approx_distinct + - [x] approx_distinct - Common date/time functions - [ ] Basic date functions - [ ] Basic time functions diff --git a/datafusion/src/physical_plan/values.rs b/datafusion/src/physical_plan/values.rs index 6658f672834f..de15d40c68f9 100644 --- a/datafusion/src/physical_plan/values.rs +++ b/datafusion/src/physical_plan/values.rs @@ -24,6 +24,7 @@ use crate::physical_plan::{ Partitioning, PhysicalExpr, }; use crate::scalar::ScalarValue; +use arrow::array::new_null_array; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; use async_trait::async_trait; @@ -48,10 +49,17 @@ impl ValuesExec { if data.is_empty() { return Err(DataFusionError::Plan("Values list cannot be empty".into())); } - // we have this empty batch as a placeholder to satisfy evaluation argument - let batch = RecordBatch::new_empty(schema.clone()); let n_row = data.len(); let n_col = schema.fields().len(); + // we have this single row, null, typed batch as a placeholder to satisfy evaluation argument + let batch = RecordBatch::try_new( + schema.clone(), + schema + .fields() + .iter() + .map(|field| new_null_array(field.data_type(), 1)) + .collect::>(), + )?; let arr = (0..n_col) .map(|j| { (0..n_row) @@ -59,9 +67,15 @@ impl ValuesExec { let r = data[i][j].evaluate(&batch); match r { Ok(ColumnarValue::Scalar(scalar)) => Ok(scalar), - Ok(ColumnarValue::Array(_)) => Err(DataFusionError::Plan( - "Cannot have array values in a values list".into(), - )), + Ok(ColumnarValue::Array(a)) if a.len() == 1 => { + ScalarValue::try_from_array(&a, 0) + } + Ok(ColumnarValue::Array(a)) => { + Err(DataFusionError::Plan(format!( + "Cannot have array values {:?} in a values list", + a + ))) + } Err(err) => Err(err), } }) diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs index a16c40ac61da..73fb6817de54 100644 --- a/datafusion/src/sql/planner.rs +++ b/datafusion/src/sql/planner.rs @@ -1069,17 +1069,92 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } } + fn parse_sql_binary_op( + &self, + left: &SQLExpr, + op: &BinaryOperator, + right: &SQLExpr, + schema: &DFSchema, + ) -> Result { + let operator = match *op { + BinaryOperator::Gt => Ok(Operator::Gt), + BinaryOperator::GtEq => Ok(Operator::GtEq), + BinaryOperator::Lt => Ok(Operator::Lt), + BinaryOperator::LtEq => Ok(Operator::LtEq), + BinaryOperator::Eq => Ok(Operator::Eq), + BinaryOperator::NotEq => Ok(Operator::NotEq), + BinaryOperator::Plus => Ok(Operator::Plus), + BinaryOperator::Minus => Ok(Operator::Minus), + BinaryOperator::Multiply => Ok(Operator::Multiply), + BinaryOperator::Divide => Ok(Operator::Divide), + BinaryOperator::Modulo => Ok(Operator::Modulo), + BinaryOperator::And => Ok(Operator::And), + BinaryOperator::Or => Ok(Operator::Or), + BinaryOperator::Like => Ok(Operator::Like), + BinaryOperator::NotLike => Ok(Operator::NotLike), + BinaryOperator::PGRegexMatch => Ok(Operator::RegexMatch), + BinaryOperator::PGRegexIMatch => Ok(Operator::RegexIMatch), + BinaryOperator::PGRegexNotMatch => Ok(Operator::RegexNotMatch), + BinaryOperator::PGRegexNotIMatch => Ok(Operator::RegexNotIMatch), + _ => Err(DataFusionError::NotImplemented(format!( + "Unsupported SQL binary operator {:?}", + op + ))), + }?; + + Ok(Expr::BinaryExpr { + left: Box::new(self.sql_expr_to_logical_expr(left, schema)?), + op: operator, + right: Box::new(self.sql_expr_to_logical_expr(right, schema)?), + }) + } + + fn parse_sql_unary_op( + &self, + op: &UnaryOperator, + expr: &SQLExpr, + schema: &DFSchema, + ) -> Result { + match op { + UnaryOperator::Not => Ok(Expr::Not(Box::new( + self.sql_expr_to_logical_expr(expr, schema)?, + ))), + UnaryOperator::Plus => Ok(self.sql_expr_to_logical_expr(expr, schema)?), + UnaryOperator::Minus => { + match expr { + // optimization: if it's a number literal, we apply the negative operator + // here directly to calculate the new literal. + SQLExpr::Value(Value::Number(n,_)) => match n.parse::() { + Ok(n) => Ok(lit(-n)), + Err(_) => Ok(lit(-n + .parse::() + .map_err(|_e| { + DataFusionError::Internal(format!( + "negative operator can be only applied to integer and float operands, got: {}", + n)) + })?)), + }, + // not a literal, apply negative operator on expression + _ => Ok(Expr::Negative(Box::new(self.sql_expr_to_logical_expr(expr, schema)?))), + } + } + _ => Err(DataFusionError::NotImplemented(format!( + "Unsupported SQL unary operator {:?}", + op + ))), + } + } + fn sql_values_to_plan(&self, values: &SQLValues) -> Result { + // values should not be based on any other schema + let schema = DFSchema::empty(); let values = values .0 .iter() .map(|row| { row.iter() .map(|v| match v { - SQLExpr::Value(Value::Number(n, _)) => match n.parse::() { - Ok(n) => Ok(lit(n)), - Err(_) => Ok(lit(n.parse::().unwrap())), - }, + SQLExpr::Value(Value::Number(n, _)) => parse_sql_number(n), SQLExpr::Value(Value::SingleQuotedString(ref s)) => { Ok(lit(s.clone())) } @@ -1087,6 +1162,14 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { Ok(Expr::Literal(ScalarValue::Utf8(None))) } SQLExpr::Value(Value::Boolean(n)) => Ok(lit(*n)), + SQLExpr::UnaryOp { ref op, ref expr } => { + self.parse_sql_unary_op(op, expr, &schema) + } + SQLExpr::BinaryOp { + ref left, + ref op, + ref right, + } => self.parse_sql_binary_op(left, op, right, &schema), other => Err(DataFusionError::NotImplemented(format!( "Unsupported value {:?} in a values list expression", other @@ -1100,14 +1183,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { fn sql_expr_to_logical_expr(&self, sql: &SQLExpr, schema: &DFSchema) -> Result { match sql { - SQLExpr::Value(Value::Number(n, _)) => match n.parse::() { - Ok(n) => Ok(lit(n)), - Err(_) => Ok(lit(n.parse::().unwrap())), - }, + SQLExpr::Value(Value::Number(n, _)) => parse_sql_number(n), SQLExpr::Value(Value::SingleQuotedString(ref s)) => Ok(lit(s.clone())), - SQLExpr::Value(Value::Boolean(n)) => Ok(lit(*n)), - SQLExpr::Value(Value::Null) => Ok(Expr::Literal(ScalarValue::Utf8(None))), SQLExpr::Extract { field, expr } => Ok(Expr::ScalarFunction { fun: functions::BuiltinScalarFunction::DatePart, @@ -1244,34 +1322,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { right: Box::new(self.sql_expr_to_logical_expr(right, schema)?), }), - SQLExpr::UnaryOp { ref op, ref expr } => match op { - UnaryOperator::Not => Ok(Expr::Not(Box::new( - self.sql_expr_to_logical_expr(expr, schema)?, - ))), - UnaryOperator::Plus => Ok(self.sql_expr_to_logical_expr(expr, schema)?), - UnaryOperator::Minus => { - match expr.as_ref() { - // optimization: if it's a number literal, we apply the negative operator - // here directly to calculate the new literal. - SQLExpr::Value(Value::Number(n,_)) => match n.parse::() { - Ok(n) => Ok(lit(-n)), - Err(_) => Ok(lit(-n - .parse::() - .map_err(|_e| { - DataFusionError::Internal(format!( - "negative operator can be only applied to integer and float operands, got: {}", - n)) - })?)), - }, - // not a literal, apply negative operator on expression - _ => Ok(Expr::Negative(Box::new(self.sql_expr_to_logical_expr(expr, schema)?))), - } - } - _ => Err(DataFusionError::NotImplemented(format!( - "Unsupported SQL unary operator {:?}", - op - ))), - }, + SQLExpr::UnaryOp { ref op, ref expr } => { + self.parse_sql_unary_op(op, expr, schema) + } SQLExpr::Between { ref expr, @@ -1306,39 +1359,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { ref left, ref op, ref right, - } => { - let operator = match *op { - BinaryOperator::Gt => Ok(Operator::Gt), - BinaryOperator::GtEq => Ok(Operator::GtEq), - BinaryOperator::Lt => Ok(Operator::Lt), - BinaryOperator::LtEq => Ok(Operator::LtEq), - BinaryOperator::Eq => Ok(Operator::Eq), - BinaryOperator::NotEq => Ok(Operator::NotEq), - BinaryOperator::Plus => Ok(Operator::Plus), - BinaryOperator::Minus => Ok(Operator::Minus), - BinaryOperator::Multiply => Ok(Operator::Multiply), - BinaryOperator::Divide => Ok(Operator::Divide), - BinaryOperator::Modulo => Ok(Operator::Modulo), - BinaryOperator::And => Ok(Operator::And), - BinaryOperator::Or => Ok(Operator::Or), - BinaryOperator::Like => Ok(Operator::Like), - BinaryOperator::NotLike => Ok(Operator::NotLike), - BinaryOperator::PGRegexMatch => Ok(Operator::RegexMatch), - BinaryOperator::PGRegexIMatch => Ok(Operator::RegexIMatch), - BinaryOperator::PGRegexNotMatch => Ok(Operator::RegexNotMatch), - BinaryOperator::PGRegexNotIMatch => Ok(Operator::RegexNotIMatch), - _ => Err(DataFusionError::NotImplemented(format!( - "Unsupported SQL binary operator {:?}", - op - ))), - }?; - - Ok(Expr::BinaryExpr { - left: Box::new(self.sql_expr_to_logical_expr(left, schema)?), - op: operator, - right: Box::new(self.sql_expr_to_logical_expr(right, schema)?), - }) - } + } => self.parse_sql_binary_op(left, op, right, schema), SQLExpr::Trim { expr, trim_where } => { let (fun, where_expr) = match trim_where { @@ -3630,3 +3651,10 @@ mod tests { quick_test(sql, expected); } } + +fn parse_sql_number(n: &str) -> Result { + match n.parse::() { + Ok(n) => Ok(lit(n)), + Err(_) => Ok(lit(n.parse::().unwrap())), + } +} diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs index 4d299ec09214..34b6b6c86ace 100644 --- a/datafusion/tests/sql.rs +++ b/datafusion/tests/sql.rs @@ -491,6 +491,30 @@ async fn select_values_list() -> Result<()> { ]; assert_batches_eq!(expected, &actual); } + { + let sql = "VALUES (-1)"; + let actual = execute_to_batches(&mut ctx, sql).await; + let expected = vec![ + "+---------+", + "| column1 |", + "+---------+", + "| -1 |", + "+---------+", + ]; + assert_batches_eq!(expected, &actual); + } + { + let sql = "VALUES (2+1,2-1,2>1)"; + let actual = execute_to_batches(&mut ctx, sql).await; + let expected = vec![ + "+---------+---------+---------+", + "| column1 | column2 | column3 |", + "+---------+---------+---------+", + "| 3 | 1 | true |", + "+---------+---------+---------+", + ]; + assert_batches_eq!(expected, &actual); + } { let sql = "VALUES"; let plan = ctx.create_logical_plan(sql); @@ -647,6 +671,20 @@ async fn select_values_list() -> Result<()> { ]; assert_batches_eq!(expected, &actual); } + { + let sql = "EXPLAIN VALUES (1, 'a', -1, 1.1),(NULL, 'b', -3, 0.5)"; + let actual = execute_to_batches(&mut ctx, sql).await; + let expected = vec![ + "+---------------+-----------------------------------------------------------------------------------------------------------+", + "| plan_type | plan |", + "+---------------+-----------------------------------------------------------------------------------------------------------+", + "| logical_plan | Values: (Int64(1), Utf8(\"a\"), Int64(-1), Float64(1.1)), (Int64(NULL), Utf8(\"b\"), Int64(-3), Float64(0.5)) |", + "| physical_plan | ValuesExec |", + "| | |", + "+---------------+-----------------------------------------------------------------------------------------------------------+", + ]; + assert_batches_eq!(expected, &actual); + } Ok(()) } diff --git a/integration-tests/sqls/values_list.sql b/integration-tests/sqls/values_list.sql new file mode 100644 index 000000000000..b94e59a4ce2b --- /dev/null +++ b/integration-tests/sqls/values_list.sql @@ -0,0 +1,19 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at + +-- http://www.apache.org/licenses/LICENSE-2.0 + +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. + +SELECT * FROM +(VALUES (1,2.0,-3,1+1),(10,20.0,-30,2+2)) +AS tbl(int_col, float_col, negative_col, summation); diff --git a/integration-tests/test_psql_parity.py b/integration-tests/test_psql_parity.py index a85a2c2f4b37..e9776e04f7c7 100644 --- a/integration-tests/test_psql_parity.py +++ b/integration-tests/test_psql_parity.py @@ -77,7 +77,7 @@ def generate_csv_from_psql(fname: str): class TestPsqlParity: def test_tests_count(self): - assert len(test_files) == 15, "tests are missed" + assert len(test_files) == 16, "tests are missed" @pytest.mark.parametrize("fname", test_files) def test_sql_file(self, fname):