diff --git a/datafusion-cli/src/functions.rs b/datafusion-cli/src/functions.rs index f8d9ed238be4..5390fa9f2271 100644 --- a/datafusion-cli/src/functions.rs +++ b/datafusion-cli/src/functions.rs @@ -297,7 +297,7 @@ pub struct ParquetMetadataFunc {} impl TableFunctionImpl for ParquetMetadataFunc { fn call(&self, exprs: &[Expr]) -> Result> { - let filename = match exprs.get(0) { + let filename = match exprs.first() { Some(Expr::Literal(ScalarValue::Utf8(Some(s)))) => s, // single quote: parquet_metadata('x.parquet') Some(Expr::Column(Column { name, .. })) => name, // double quote: parquet_metadata("x.parquet") _ => { diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md index 057cdd475273..1296c74ea277 100644 --- a/datafusion-examples/README.md +++ b/datafusion-examples/README.md @@ -59,8 +59,9 @@ cargo run --example csv_sql - [`query-aws-s3.rs`](examples/external_dependency/query-aws-s3.rs): Configure `object_store` and run a query against files stored in AWS S3 - [`query-http-csv.rs`](examples/query-http-csv.rs): Configure `object_store` and run a query against files vi HTTP - [`rewrite_expr.rs`](examples/rewrite_expr.rs): Define and invoke a custom Query Optimizer pass +- [`simple_udf.rs`](examples/simple_udf.rs): Define and invoke a User Defined Scalar Function (UDF) +- [`advanced_udf.rs`](examples/advanced_udf.rs): Define and invoke a more complicated User Defined Scalar Function (UDF) - [`simple_udaf.rs`](examples/simple_udaf.rs): Define and invoke a User Defined Aggregate Function (UDAF) -- [`simple_udf.rs`](examples/simple_udf.rs): Define and invoke a User Defined (scalar) Function (UDF) - [`simple_udfw.rs`](examples/simple_udwf.rs): Define and invoke a User Defined Window Function (UDWF) ## Distributed diff --git a/datafusion-examples/examples/advanced_udf.rs b/datafusion-examples/examples/advanced_udf.rs new file mode 100644 index 000000000000..6ebf88a0b671 --- /dev/null +++ b/datafusion-examples/examples/advanced_udf.rs @@ -0,0 +1,243 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use datafusion::{ + arrow::{ + array::{ArrayRef, Float32Array, Float64Array}, + datatypes::DataType, + record_batch::RecordBatch, + }, + logical_expr::Volatility, +}; +use std::any::Any; + +use arrow::array::{new_null_array, Array, AsArray}; +use arrow::compute; +use arrow::datatypes::Float64Type; +use datafusion::error::Result; +use datafusion::prelude::*; +use datafusion_common::{internal_err, ScalarValue}; +use datafusion_expr::{ColumnarValue, ScalarUDF, ScalarUDFImpl, Signature}; +use std::sync::Arc; + +/// This example shows how to use the full ScalarUDFImpl API to implement a user +/// defined function. As in the `simple_udf.rs` example, this struct implements +/// a function that takes two arguments and returns the first argument raised to +/// the power of the second argument `a^b`. +/// +/// To do so, we must implement the `ScalarUDFImpl` trait. +struct PowUdf { + signature: Signature, + aliases: Vec, +} + +impl PowUdf { + /// Create a new instance of the `PowUdf` struct + fn new() -> Self { + Self { + signature: Signature::exact( + // this function will always take two arguments of type f64 + vec![DataType::Float64, DataType::Float64], + // this function is deterministic and will always return the same + // result for the same input + Volatility::Immutable, + ), + // we will also add an alias of "my_pow" + aliases: vec!["my_pow".to_string()], + } + } +} + +impl ScalarUDFImpl for PowUdf { + /// We implement as_any so that we can downcast the ScalarUDFImpl trait object + fn as_any(&self) -> &dyn Any { + self + } + + /// Return the name of this function + fn name(&self) -> &str { + "pow" + } + + /// Return the "signature" of this function -- namely what types of arguments it will take + fn signature(&self) -> &Signature { + &self.signature + } + + /// What is the type of value that will be returned by this function? In + /// this case it will always be a constant value, but it could also be a + /// function of the input types. + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::Float64) + } + + /// This is the function that actually calculates the results. + /// + /// This is the same way that functions built into DataFusion are invoked, + /// which permits important special cases when one or both of the arguments + /// are single values (constants). For example `pow(a, 2)` + /// + /// However, it also means the implementation is more complex than when + /// using `create_udf`. + fn invoke(&self, args: &[ColumnarValue]) -> Result { + // DataFusion has arranged for the correct inputs to be passed to this + // function, but we check again to make sure + assert_eq!(args.len(), 2); + let (base, exp) = (&args[0], &args[1]); + assert_eq!(base.data_type(), DataType::Float64); + assert_eq!(exp.data_type(), DataType::Float64); + + match (base, exp) { + // For demonstration purposes we also implement the scalar / scalar + // case here, but it is not typically required for high performance. + // + // For performance it is most important to optimize cases where at + // least one argument is an array. If all arguments are constants, + // the DataFusion expression simplification logic will often invoke + // this path once during planning, and simply use the result during + // execution. + ( + ColumnarValue::Scalar(ScalarValue::Float64(base)), + ColumnarValue::Scalar(ScalarValue::Float64(exp)), + ) => { + // compute the output. Note DataFusion treats `None` as NULL. + let res = match (base, exp) { + (Some(base), Some(exp)) => Some(base.powf(*exp)), + // one or both arguments were NULL + _ => None, + }; + Ok(ColumnarValue::Scalar(ScalarValue::from(res))) + } + // special case if the exponent is a constant + ( + ColumnarValue::Array(base_array), + ColumnarValue::Scalar(ScalarValue::Float64(exp)), + ) => { + let result_array = match exp { + // a ^ null = null + None => new_null_array(base_array.data_type(), base_array.len()), + // a ^ exp + Some(exp) => { + // DataFusion has ensured both arguments are Float64: + let base_array = base_array.as_primitive::(); + // calculate the result for every row. The `unary` + // kernel creates very fast "vectorized" code and + // handles things like null values for us. + let res: Float64Array = + compute::unary(base_array, |base| base.powf(*exp)); + Arc::new(res) + } + }; + Ok(ColumnarValue::Array(result_array)) + } + + // special case if the base is a constant (note this code is quite + // similar to the previous case, so we omit comments) + ( + ColumnarValue::Scalar(ScalarValue::Float64(base)), + ColumnarValue::Array(exp_array), + ) => { + let res = match base { + None => new_null_array(exp_array.data_type(), exp_array.len()), + Some(base) => { + let exp_array = exp_array.as_primitive::(); + let res: Float64Array = + compute::unary(exp_array, |exp| base.powf(exp)); + Arc::new(res) + } + }; + Ok(ColumnarValue::Array(res)) + } + // Both arguments are arrays so we have to perform the calculation for every row + (ColumnarValue::Array(base_array), ColumnarValue::Array(exp_array)) => { + let res: Float64Array = compute::binary( + base_array.as_primitive::(), + exp_array.as_primitive::(), + |base, exp| base.powf(exp), + )?; + Ok(ColumnarValue::Array(Arc::new(res))) + } + // if the types were not float, it is a bug in DataFusion + _ => { + use datafusion_common::DataFusionError; + internal_err!("Invalid argument types to pow function") + } + } + } + + /// We will also add an alias of "my_pow" + fn aliases(&self) -> &[String] { + &self.aliases + } +} + +/// In this example we register `PowUdf` as a user defined function +/// and invoke it via the DataFrame API and SQL +#[tokio::main] +async fn main() -> Result<()> { + let ctx = create_context()?; + + // create the UDF + let pow = ScalarUDF::from(PowUdf::new()); + + // register the UDF with the context so it can be invoked by name and from SQL + ctx.register_udf(pow.clone()); + + // get a DataFrame from the context for scanning the "t" table + let df = ctx.table("t").await?; + + // Call pow(a, 10) using the DataFrame API + let df = df.select(vec![pow.call(vec![col("a"), lit(10i32)])])?; + + // note that the second argument is passed as an i32, not f64. DataFusion + // automatically coerces the types to match the UDF's defined signature. + + // print the results + df.show().await?; + + // You can also invoke both pow(2, 10) and its alias my_pow(a, b) using SQL + let sql_df = ctx.sql("SELECT pow(2, 10), my_pow(a, b) FROM t").await?; + sql_df.show().await?; + + Ok(()) +} + +/// create local execution context with an in-memory table: +/// +/// ```text +/// +-----+-----+ +/// | a | b | +/// +-----+-----+ +/// | 2.1 | 1.0 | +/// | 3.1 | 2.0 | +/// | 4.1 | 3.0 | +/// | 5.1 | 4.0 | +/// +-----+-----+ +/// ``` +fn create_context() -> Result { + // define data. + let a: ArrayRef = Arc::new(Float32Array::from(vec![2.1, 3.1, 4.1, 5.1])); + let b: ArrayRef = Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0])); + let batch = RecordBatch::try_from_iter(vec![("a", a), ("b", b)])?; + + // declare a new context. In Spark API, this corresponds to a new SparkSession + let ctx = SessionContext::new(); + + // declare a table in memory. In Spark API, this corresponds to createDataFrame(...). + ctx.register_batch("t", batch)?; + Ok(ctx) +} diff --git a/datafusion-examples/examples/simple_udf.rs b/datafusion-examples/examples/simple_udf.rs index 591991786515..39e1e13ce39a 100644 --- a/datafusion-examples/examples/simple_udf.rs +++ b/datafusion-examples/examples/simple_udf.rs @@ -140,5 +140,11 @@ async fn main() -> Result<()> { // print the results df.show().await?; + // Given that `pow` is registered in the context, we can also use it in SQL: + let sql_df = ctx.sql("SELECT pow(a, b) FROM t").await?; + + // print the results + sql_df.show().await?; + Ok(()) } diff --git a/datafusion/common/src/error.rs b/datafusion/common/src/error.rs index 515acc6d1c47..e58faaa15096 100644 --- a/datafusion/common/src/error.rs +++ b/datafusion/common/src/error.rs @@ -558,7 +558,6 @@ macro_rules! arrow_err { // To avoid compiler error when using macro in the same crate: // macros from the current crate cannot be referred to by absolute paths -pub use exec_err as _exec_err; pub use internal_datafusion_err as _internal_datafusion_err; pub use internal_err as _internal_err; pub use not_impl_err as _not_impl_err; diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index 0cf34a70fcc7..7a0af3ff0809 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -19,21 +19,9 @@ use std::any::Any; use std::collections::HashSet; -use std::fmt; -use std::fmt::Debug; +use std::fmt::{self, Debug}; use std::sync::Arc; -use arrow_array::RecordBatch; -use datafusion_common::{exec_err, not_impl_err, DataFusionError, FileType}; -use datafusion_execution::TaskContext; -use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement}; - -use bytes::{Buf, Bytes}; -use datafusion_physical_plan::metrics::MetricsSet; -use futures::stream::BoxStream; -use futures::{pin_mut, Stream, StreamExt, TryStreamExt}; -use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore}; - use super::write::orchestration::stateless_multipart_put; use super::{FileFormat, DEFAULT_SCHEMA_INFER_MAX_RECORD}; use crate::datasource::file_format::file_compression_type::FileCompressionType; @@ -47,11 +35,20 @@ use crate::physical_plan::insert::{DataSink, FileSinkExec}; use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics}; use crate::physical_plan::{ExecutionPlan, SendableRecordBatchStream}; +use arrow::array::RecordBatch; use arrow::csv::WriterBuilder; use arrow::datatypes::{DataType, Field, Fields, Schema}; use arrow::{self, datatypes::SchemaRef}; +use datafusion_common::{exec_err, not_impl_err, DataFusionError, FileType}; +use datafusion_execution::TaskContext; +use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement}; +use datafusion_physical_plan::metrics::MetricsSet; use async_trait::async_trait; +use bytes::{Buf, Bytes}; +use futures::stream::BoxStream; +use futures::{pin_mut, Stream, StreamExt, TryStreamExt}; +use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore}; /// Character Separated Value `FileFormat` implementation. #[derive(Debug)] @@ -400,8 +397,6 @@ impl Default for CsvSerializer { pub struct CsvSerializer { // CSV writer builder builder: WriterBuilder, - // Inner buffer for avoiding reallocation - buffer: Vec, // Flag to indicate whether there will be a header header: bool, } @@ -412,7 +407,6 @@ impl CsvSerializer { Self { builder: WriterBuilder::new(), header: true, - buffer: Vec::with_capacity(4096), } } @@ -431,21 +425,14 @@ impl CsvSerializer { #[async_trait] impl BatchSerializer for CsvSerializer { - async fn serialize(&mut self, batch: RecordBatch) -> Result { + async fn serialize(&self, batch: RecordBatch, initial: bool) -> Result { + let mut buffer = Vec::with_capacity(4096); let builder = self.builder.clone(); - let mut writer = builder.with_header(self.header).build(&mut self.buffer); + let header = self.header && initial; + let mut writer = builder.with_header(header).build(&mut buffer); writer.write(&batch)?; drop(writer); - self.header = false; - Ok(Bytes::from(self.buffer.drain(..).collect::>())) - } - - fn duplicate(&mut self) -> Result> { - let new_self = CsvSerializer::new() - .with_builder(self.builder.clone()) - .with_header(self.header); - self.header = false; - Ok(Box::new(new_self)) + Ok(Bytes::from(buffer)) } } @@ -495,13 +482,11 @@ impl CsvSink { let builder_clone = builder.clone(); let options_clone = writer_options.clone(); let get_serializer = move || { - let inner_clone = builder_clone.clone(); - let serializer: Box = Box::new( + Arc::new( CsvSerializer::new() - .with_builder(inner_clone) + .with_builder(builder_clone.clone()) .with_header(options_clone.writer_options.header()), - ); - serializer + ) as _ }; stateless_multipart_put( @@ -548,15 +533,15 @@ mod tests { use crate::physical_plan::collect; use crate::prelude::{CsvReadOptions, SessionConfig, SessionContext}; use crate::test_util::arrow_test_data; + use arrow::compute::concat_batches; - use bytes::Bytes; - use chrono::DateTime; use datafusion_common::cast::as_string_array; - use datafusion_common::internal_err; use datafusion_common::stats::Precision; - use datafusion_common::FileType; - use datafusion_common::GetExt; + use datafusion_common::{internal_err, FileType, GetExt}; use datafusion_expr::{col, lit}; + + use bytes::Bytes; + use chrono::DateTime; use futures::StreamExt; use object_store::local::LocalFileSystem; use object_store::path::Path; @@ -843,8 +828,8 @@ mod tests { .collect() .await?; let batch = concat_batches(&batches[0].schema(), &batches)?; - let mut serializer = CsvSerializer::new(); - let bytes = serializer.serialize(batch).await?; + let serializer = CsvSerializer::new(); + let bytes = serializer.serialize(batch, true).await?; assert_eq!( "c2,c3\n2,1\n5,-40\n1,29\n1,-85\n5,-82\n4,-111\n3,104\n3,13\n1,38\n4,-38\n", String::from_utf8(bytes.into()).unwrap() @@ -867,8 +852,8 @@ mod tests { .collect() .await?; let batch = concat_batches(&batches[0].schema(), &batches)?; - let mut serializer = CsvSerializer::new().with_header(false); - let bytes = serializer.serialize(batch).await?; + let serializer = CsvSerializer::new().with_header(false); + let bytes = serializer.serialize(batch, true).await?; assert_eq!( "2,1\n5,-40\n1,29\n1,-85\n5,-82\n4,-111\n3,104\n3,13\n1,38\n4,-38\n", String::from_utf8(bytes.into()).unwrap() diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs index fcb1d5f8e527..3d437bc5fe68 100644 --- a/datafusion/core/src/datasource/file_format/json.rs +++ b/datafusion/core/src/datasource/file_format/json.rs @@ -23,40 +23,34 @@ use std::fmt::Debug; use std::io::BufReader; use std::sync::Arc; -use super::{FileFormat, FileScanConfig}; -use arrow::datatypes::Schema; -use arrow::datatypes::SchemaRef; -use arrow::json; -use arrow::json::reader::infer_json_schema_from_iterator; -use arrow::json::reader::ValueIter; -use arrow_array::RecordBatch; -use async_trait::async_trait; -use bytes::Buf; - -use bytes::Bytes; -use datafusion_physical_expr::PhysicalExpr; -use datafusion_physical_expr::PhysicalSortRequirement; -use datafusion_physical_plan::ExecutionPlan; -use object_store::{GetResultPayload, ObjectMeta, ObjectStore}; - -use crate::datasource::physical_plan::FileGroupDisplay; -use crate::physical_plan::insert::DataSink; -use crate::physical_plan::insert::FileSinkExec; -use crate::physical_plan::SendableRecordBatchStream; -use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics}; - use super::write::orchestration::stateless_multipart_put; - +use super::{FileFormat, FileScanConfig}; use crate::datasource::file_format::file_compression_type::FileCompressionType; use crate::datasource::file_format::write::BatchSerializer; use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD; +use crate::datasource::physical_plan::FileGroupDisplay; use crate::datasource::physical_plan::{FileSinkConfig, NdJsonExec}; use crate::error::Result; use crate::execution::context::SessionState; +use crate::physical_plan::insert::{DataSink, FileSinkExec}; +use crate::physical_plan::{ + DisplayAs, DisplayFormatType, SendableRecordBatchStream, Statistics, +}; +use arrow::datatypes::Schema; +use arrow::datatypes::SchemaRef; +use arrow::json; +use arrow::json::reader::{infer_json_schema_from_iterator, ValueIter}; +use arrow_array::RecordBatch; use datafusion_common::{not_impl_err, DataFusionError, FileType}; use datafusion_execution::TaskContext; +use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement}; use datafusion_physical_plan::metrics::MetricsSet; +use datafusion_physical_plan::ExecutionPlan; + +use async_trait::async_trait; +use bytes::{Buf, Bytes}; +use object_store::{GetResultPayload, ObjectMeta, ObjectStore}; /// New line delimited JSON `FileFormat` implementation. #[derive(Debug)] @@ -201,31 +195,22 @@ impl Default for JsonSerializer { } /// Define a struct for serializing Json records to a stream -pub struct JsonSerializer { - // Inner buffer for avoiding reallocation - buffer: Vec, -} +pub struct JsonSerializer {} impl JsonSerializer { /// Constructor for the JsonSerializer object pub fn new() -> Self { - Self { - buffer: Vec::with_capacity(4096), - } + Self {} } } #[async_trait] impl BatchSerializer for JsonSerializer { - async fn serialize(&mut self, batch: RecordBatch) -> Result { - let mut writer = json::LineDelimitedWriter::new(&mut self.buffer); + async fn serialize(&self, batch: RecordBatch, _initial: bool) -> Result { + let mut buffer = Vec::with_capacity(4096); + let mut writer = json::LineDelimitedWriter::new(&mut buffer); writer.write(&batch)?; - //drop(writer); - Ok(Bytes::from(self.buffer.drain(..).collect::>())) - } - - fn duplicate(&mut self) -> Result> { - Ok(Box::new(JsonSerializer::new())) + Ok(Bytes::from(buffer)) } } @@ -272,10 +257,7 @@ impl JsonSink { let writer_options = self.config.file_type_writer_options.try_into_json()?; let compression = &writer_options.compression; - let get_serializer = move || { - let serializer: Box = Box::new(JsonSerializer::new()); - serializer - }; + let get_serializer = move || Arc::new(JsonSerializer::new()) as _; stateless_multipart_put( data, @@ -312,16 +294,17 @@ impl DataSink for JsonSink { #[cfg(test)] mod tests { use super::super::test_util::scan_format; - use datafusion_common::cast::as_int64_array; - use datafusion_common::stats::Precision; - use futures::StreamExt; - use object_store::local::LocalFileSystem; - use super::*; use crate::physical_plan::collect; use crate::prelude::{SessionConfig, SessionContext}; use crate::test::object_store::local_unpartitioned_file; + use datafusion_common::cast::as_int64_array; + use datafusion_common::stats::Precision; + + use futures::StreamExt; + use object_store::local::LocalFileSystem; + #[tokio::test] async fn read_small_batches() -> Result<()> { let config = SessionConfig::new().with_batch_size(2); diff --git a/datafusion/core/src/datasource/file_format/write/mod.rs b/datafusion/core/src/datasource/file_format/write/mod.rs index 68fe81ce91fa..c481f2accf19 100644 --- a/datafusion/core/src/datasource/file_format/write/mod.rs +++ b/datafusion/core/src/datasource/file_format/write/mod.rs @@ -24,20 +24,16 @@ use std::sync::Arc; use std::task::{Context, Poll}; use crate::datasource::file_format::file_compression_type::FileCompressionType; - use crate::error::Result; use arrow_array::RecordBatch; - use datafusion_common::DataFusionError; use async_trait::async_trait; use bytes::Bytes; - use futures::future::BoxFuture; use object_store::path::Path; use object_store::{MultipartId, ObjectStore}; - use tokio::io::AsyncWrite; pub(crate) mod demux; @@ -149,15 +145,11 @@ impl AsyncWrite for AbortableWrite { /// A trait that defines the methods required for a RecordBatch serializer. #[async_trait] -pub trait BatchSerializer: Unpin + Send { +pub trait BatchSerializer: Sync + Send { /// Asynchronously serializes a `RecordBatch` and returns the serialized bytes. - async fn serialize(&mut self, batch: RecordBatch) -> Result; - /// Duplicates self to support serializing multiple batches in parallel on multiple cores - fn duplicate(&mut self) -> Result> { - Err(DataFusionError::NotImplemented( - "Parallel serialization is not implemented for this file type".into(), - )) - } + /// Parameter `initial` signals whether the given batch is the first batch. + /// This distinction is important for certain serializers (like CSV). + async fn serialize(&self, batch: RecordBatch, initial: bool) -> Result; } /// Returns an [`AbortableWrite`] which writes to the given object store location diff --git a/datafusion/core/src/datasource/file_format/write/orchestration.rs b/datafusion/core/src/datasource/file_format/write/orchestration.rs index 120e27ecf669..9b820a15b280 100644 --- a/datafusion/core/src/datasource/file_format/write/orchestration.rs +++ b/datafusion/core/src/datasource/file_format/write/orchestration.rs @@ -21,28 +21,25 @@ use std::sync::Arc; +use super::demux::start_demuxer_task; +use super::{create_writer, AbortableWrite, BatchSerializer}; use crate::datasource::file_format::file_compression_type::FileCompressionType; use crate::datasource::physical_plan::FileSinkConfig; use crate::error::Result; use crate::physical_plan::SendableRecordBatchStream; use arrow_array::RecordBatch; - -use datafusion_common::DataFusionError; - -use bytes::Bytes; +use datafusion_common::{internal_datafusion_err, internal_err, DataFusionError}; use datafusion_execution::TaskContext; +use bytes::Bytes; use tokio::io::{AsyncWrite, AsyncWriteExt}; use tokio::sync::mpsc::{self, Receiver}; use tokio::task::{JoinHandle, JoinSet}; use tokio::try_join; -use super::demux::start_demuxer_task; -use super::{create_writer, AbortableWrite, BatchSerializer}; - type WriterType = AbortableWrite>; -type SerializerType = Box; +type SerializerType = Arc; /// Serializes a single data stream in parallel and writes to an ObjectStore /// concurrently. Data order is preserved. In the event of an error, @@ -50,33 +47,28 @@ type SerializerType = Box; /// so that the caller may handle aborting failed writes. pub(crate) async fn serialize_rb_stream_to_object_store( mut data_rx: Receiver, - mut serializer: Box, + serializer: Arc, mut writer: AbortableWrite>, ) -> std::result::Result<(WriterType, u64), (WriterType, DataFusionError)> { let (tx, mut rx) = mpsc::channel::>>(100); - let serialize_task = tokio::spawn(async move { + // Some serializers (like CSV) handle the first batch differently than + // subsequent batches, so we track that here. + let mut initial = true; while let Some(batch) = data_rx.recv().await { - match serializer.duplicate() { - Ok(mut serializer_clone) => { - let handle = tokio::spawn(async move { - let num_rows = batch.num_rows(); - let bytes = serializer_clone.serialize(batch).await?; - Ok((num_rows, bytes)) - }); - tx.send(handle).await.map_err(|_| { - DataFusionError::Internal( - "Unknown error writing to object store".into(), - ) - })?; - } - Err(_) => { - return Err(DataFusionError::Internal( - "Unknown error writing to object store".into(), - )) - } + let serializer_clone = serializer.clone(); + let handle = tokio::spawn(async move { + let num_rows = batch.num_rows(); + let bytes = serializer_clone.serialize(batch, initial).await?; + Ok((num_rows, bytes)) + }); + if initial { + initial = false; } + tx.send(handle).await.map_err(|_| { + internal_datafusion_err!("Unknown error writing to object store") + })?; } Ok(()) }); @@ -120,7 +112,7 @@ pub(crate) async fn serialize_rb_stream_to_object_store( Err(_) => { return Err(( writer, - DataFusionError::Internal("Unknown error writing to object store".into()), + internal_datafusion_err!("Unknown error writing to object store"), )) } }; @@ -171,9 +163,9 @@ pub(crate) async fn stateless_serialize_and_write_files( // this thread, so we cannot clean it up (hence any_abort_errors is true) any_errors = true; any_abort_errors = true; - triggering_error = Some(DataFusionError::Internal(format!( + triggering_error = Some(internal_datafusion_err!( "Unexpected join error while serializing file {e}" - ))); + )); } } } @@ -190,24 +182,24 @@ pub(crate) async fn stateless_serialize_and_write_files( false => { writer.shutdown() .await - .map_err(|_| DataFusionError::Internal("Error encountered while finalizing writes! Partial results may have been written to ObjectStore!".into()))?; + .map_err(|_| internal_datafusion_err!("Error encountered while finalizing writes! Partial results may have been written to ObjectStore!"))?; } } } if any_errors { match any_abort_errors{ - true => return Err(DataFusionError::Internal("Error encountered during writing to ObjectStore and failed to abort all writers. Partial result may have been written.".into())), + true => return internal_err!("Error encountered during writing to ObjectStore and failed to abort all writers. Partial result may have been written."), false => match triggering_error { Some(e) => return Err(e), - None => return Err(DataFusionError::Internal("Unknown Error encountered during writing to ObjectStore. All writers succesfully aborted.".into())) + None => return internal_err!("Unknown Error encountered during writing to ObjectStore. All writers succesfully aborted.") } } } tx.send(row_count).map_err(|_| { - DataFusionError::Internal( - "Error encountered while sending row count back to file sink!".into(), + internal_datafusion_err!( + "Error encountered while sending row count back to file sink!" ) })?; Ok(()) @@ -220,7 +212,7 @@ pub(crate) async fn stateless_multipart_put( data: SendableRecordBatchStream, context: &Arc, file_extension: String, - get_serializer: Box Box + Send>, + get_serializer: Box Arc + Send>, config: &FileSinkConfig, compression: FileCompressionType, ) -> Result { @@ -264,8 +256,8 @@ pub(crate) async fn stateless_multipart_put( .send((rb_stream, serializer, writer)) .await .map_err(|_| { - DataFusionError::Internal( - "Writer receive file bundle channel closed unexpectedly!".into(), + internal_datafusion_err!( + "Writer receive file bundle channel closed unexpectedly!" ) })?; } @@ -288,9 +280,7 @@ pub(crate) async fn stateless_multipart_put( } let total_count = rx_row_cnt.await.map_err(|_| { - DataFusionError::Internal( - "Did not receieve row count from write coordinater".into(), - ) + internal_datafusion_err!("Did not receieve row count from write coordinater") })?; Ok(total_count) diff --git a/datafusion/core/src/datasource/physical_plan/file_stream.rs b/datafusion/core/src/datasource/physical_plan/file_stream.rs index 99fb088b66f4..bb4c8313642c 100644 --- a/datafusion/core/src/datasource/physical_plan/file_stream.rs +++ b/datafusion/core/src/datasource/physical_plan/file_stream.rs @@ -518,10 +518,8 @@ impl RecordBatchStream for FileStream { #[cfg(test)] mod tests { - use arrow_schema::Schema; - use datafusion_common::internal_err; - use datafusion_common::DataFusionError; - use datafusion_common::Statistics; + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::Arc; use super::*; use crate::datasource::file_format::write::BatchSerializer; @@ -534,8 +532,8 @@ mod tests { test::{make_partition, object_store::register_test_store}, }; - use std::sync::atomic::{AtomicUsize, Ordering}; - use std::sync::Arc; + use arrow_schema::Schema; + use datafusion_common::{internal_err, DataFusionError, Statistics}; use async_trait::async_trait; use bytes::Bytes; @@ -993,7 +991,7 @@ mod tests { #[async_trait] impl BatchSerializer for TestSerializer { - async fn serialize(&mut self, _batch: RecordBatch) -> Result { + async fn serialize(&self, _batch: RecordBatch, _initial: bool) -> Result { Ok(self.bytes.clone()) } } diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs index 7e1312dad23e..d237a3e8607e 100644 --- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs @@ -990,6 +990,10 @@ fn update_join_on( proj_right_exprs: &[(Column, String)], hash_join_on: &[(Column, Column)], ) -> Option> { + // TODO: Clippy wants the "map" call removed, but doing so generates + // a compilation error. Remove the clippy directive once this + // issue is fixed. + #[allow(clippy::map_identity)] let (left_idx, right_idx): (Vec<_>, Vec<_>) = hash_join_on .iter() .map(|(left, right)| (left, right)) diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index b46e9ec8f69d..0ec19bcadbf6 100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -1724,13 +1724,13 @@ mod test { use crate::expr::Cast; use crate::expr_fn::col; use crate::{ - case, lit, BuiltinScalarFunction, ColumnarValue, Expr, ReturnTypeFunction, - ScalarFunctionDefinition, ScalarFunctionImplementation, ScalarUDF, Signature, - Volatility, + case, lit, BuiltinScalarFunction, ColumnarValue, Expr, ScalarFunctionDefinition, + ScalarUDF, ScalarUDFImpl, Signature, Volatility, }; use arrow::datatypes::DataType; use datafusion_common::Column; use datafusion_common::{Result, ScalarValue}; + use std::any::Any; use std::sync::Arc; #[test] @@ -1848,24 +1848,41 @@ mod test { ); // UDF - let return_type: ReturnTypeFunction = - Arc::new(move |_| Ok(Arc::new(DataType::Utf8))); - let fun: ScalarFunctionImplementation = - Arc::new(move |_| Ok(ColumnarValue::Scalar(ScalarValue::new_utf8("a")))); - let udf = Arc::new(ScalarUDF::new( - "TestScalarUDF", - &Signature::uniform(1, vec![DataType::Float32], Volatility::Stable), - &return_type, - &fun, - )); + struct TestScalarUDF { + signature: Signature, + } + impl ScalarUDFImpl for TestScalarUDF { + fn as_any(&self) -> &dyn Any { + self + } + fn name(&self) -> &str { + "TestScalarUDF" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::Utf8) + } + + fn invoke(&self, _args: &[ColumnarValue]) -> Result { + Ok(ColumnarValue::Scalar(ScalarValue::from("a"))) + } + } + let udf = Arc::new(ScalarUDF::from(TestScalarUDF { + signature: Signature::uniform(1, vec![DataType::Float32], Volatility::Stable), + })); assert!(!ScalarFunctionDefinition::UDF(udf).is_volatile().unwrap()); - let udf = Arc::new(ScalarUDF::new( - "TestScalarUDF", - &Signature::uniform(1, vec![DataType::Float32], Volatility::Volatile), - &return_type, - &fun, - )); + let udf = Arc::new(ScalarUDF::from(TestScalarUDF { + signature: Signature::uniform( + 1, + vec![DataType::Float32], + Volatility::Volatile, + ), + })); assert!(ScalarFunctionDefinition::UDF(udf).is_volatile().unwrap()); // Unresolved function diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs index cedf1d845137..eed41d97ccba 100644 --- a/datafusion/expr/src/expr_fn.rs +++ b/datafusion/expr/src/expr_fn.rs @@ -22,15 +22,16 @@ use crate::expr::{ Placeholder, ScalarFunction, TryCast, }; use crate::function::PartitionEvaluatorFactory; -use crate::WindowUDF; use crate::{ aggregate_function, built_in_function, conditional_expressions::CaseBuilder, logical_plan::Subquery, AccumulatorFactoryFunction, AggregateUDF, BuiltinScalarFunction, Expr, LogicalPlan, Operator, ReturnTypeFunction, ScalarFunctionImplementation, ScalarUDF, Signature, StateTypeFunction, Volatility, }; +use crate::{ColumnarValue, ScalarUDFImpl, WindowUDF}; use arrow::datatypes::DataType; use datafusion_common::{Column, Result}; +use std::any::Any; use std::ops::Not; use std::sync::Arc; @@ -944,11 +945,18 @@ pub fn when(when: Expr, then: Expr) -> CaseBuilder { CaseBuilder::new(None, vec![when], vec![then], None) } -/// Creates a new UDF with a specific signature and specific return type. -/// This is a helper function to create a new UDF. -/// The function `create_udf` returns a subset of all possible `ScalarFunction`: -/// * the UDF has a fixed return type -/// * the UDF has a fixed signature (e.g. [f64, f64]) +/// Convenience method to create a new user defined scalar function (UDF) with a +/// specific signature and specific return type. +/// +/// Note this function does not expose all available features of [`ScalarUDF`], +/// such as +/// +/// * computing return types based on input types +/// * multiple [`Signature`]s +/// * aliases +/// +/// See [`ScalarUDF`] for details and examples on how to use the full +/// functionality. pub fn create_udf( name: &str, input_types: Vec, @@ -956,13 +964,66 @@ pub fn create_udf( volatility: Volatility, fun: ScalarFunctionImplementation, ) -> ScalarUDF { - let return_type: ReturnTypeFunction = Arc::new(move |_| Ok(return_type.clone())); - ScalarUDF::new( + let return_type = Arc::try_unwrap(return_type).unwrap_or_else(|t| t.as_ref().clone()); + ScalarUDF::from(SimpleScalarUDF::new( name, - &Signature::exact(input_types, volatility), - &return_type, - &fun, - ) + input_types, + return_type, + volatility, + fun, + )) +} + +/// Implements [`ScalarUDFImpl`] for functions that have a single signature and +/// return type. +pub struct SimpleScalarUDF { + name: String, + signature: Signature, + return_type: DataType, + fun: ScalarFunctionImplementation, +} + +impl SimpleScalarUDF { + /// Create a new `SimpleScalarUDF` from a name, input types, return type and + /// implementation. Implementing [`ScalarUDFImpl`] allows more flexibility + pub fn new( + name: impl Into, + input_types: Vec, + return_type: DataType, + volatility: Volatility, + fun: ScalarFunctionImplementation, + ) -> Self { + let name = name.into(); + let signature = Signature::exact(input_types, volatility); + Self { + name, + signature, + return_type, + fun, + } + } +} + +impl ScalarUDFImpl for SimpleScalarUDF { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + &self.name + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(self.return_type.clone()) + } + + fn invoke(&self, args: &[ColumnarValue]) -> Result { + (self.fun)(args) + } } /// Creates a new UDAF with a specific signature, state type and return type. diff --git a/datafusion/expr/src/lib.rs b/datafusion/expr/src/lib.rs index 48532e13dcd7..bf8e9e2954f4 100644 --- a/datafusion/expr/src/lib.rs +++ b/datafusion/expr/src/lib.rs @@ -80,7 +80,7 @@ pub use signature::{ }; pub use table_source::{TableProviderFilterPushDown, TableSource, TableType}; pub use udaf::AggregateUDF; -pub use udf::ScalarUDF; +pub use udf::{ScalarUDF, ScalarUDFImpl}; pub use udwf::WindowUDF; pub use window_frame::{WindowFrame, WindowFrameBound, WindowFrameUnits}; pub use window_function::{BuiltInWindowFunction, WindowFunction}; diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs index 3a18ca2d25e8..2ec80a4a9ea1 100644 --- a/datafusion/expr/src/udf.rs +++ b/datafusion/expr/src/udf.rs @@ -17,9 +17,12 @@ //! [`ScalarUDF`]: Scalar User Defined Functions -use crate::{Expr, ReturnTypeFunction, ScalarFunctionImplementation, Signature}; +use crate::{ + ColumnarValue, Expr, ReturnTypeFunction, ScalarFunctionImplementation, Signature, +}; use arrow::datatypes::DataType; use datafusion_common::Result; +use std::any::Any; use std::fmt; use std::fmt::Debug; use std::fmt::Formatter; @@ -27,11 +30,19 @@ use std::sync::Arc; /// Logical representation of a Scalar User Defined Function. /// -/// A scalar function produces a single row output for each row of input. +/// A scalar function produces a single row output for each row of input. This +/// struct contains the information DataFusion needs to plan and invoke +/// functions you supply such name, type signature, return type, and actual +/// implementation. /// -/// This struct contains the information DataFusion needs to plan and invoke -/// functions such name, type signature, return type, and actual implementation. /// +/// 1. For simple (less performant) use cases, use [`create_udf`] and [`simple_udf.rs`]. +/// +/// 2. For advanced use cases, use [`ScalarUDFImpl`] and [`advanced_udf.rs`]. +/// +/// [`create_udf`]: crate::expr_fn::create_udf +/// [`simple_udf.rs`]: https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/simple_udf.rs +/// [`advanced_udf.rs`]: https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/advanced_udf.rs #[derive(Clone)] pub struct ScalarUDF { /// The name of the function @@ -79,7 +90,11 @@ impl std::hash::Hash for ScalarUDF { } impl ScalarUDF { - /// Create a new ScalarUDF + /// Create a new ScalarUDF from low level details. + /// + /// See [`ScalarUDFImpl`] for a more convenient way to create a + /// `ScalarUDF` using trait objects + #[deprecated(since = "34.0.0", note = "please implement ScalarUDFImpl instead")] pub fn new( name: &str, signature: &Signature, @@ -95,6 +110,34 @@ impl ScalarUDF { } } + /// Create a new `ScalarUDF` from a `[ScalarUDFImpl]` trait object + /// + /// Note this is the same as using the `From` impl (`ScalarUDF::from`) + pub fn new_from_impl(fun: F) -> ScalarUDF + where + F: ScalarUDFImpl + Send + Sync + 'static, + { + // TODO change the internal implementation to use the trait object + let arc_fun = Arc::new(fun); + let captured_self = arc_fun.clone(); + let return_type: ReturnTypeFunction = Arc::new(move |arg_types| { + let return_type = captured_self.return_type(arg_types)?; + Ok(Arc::new(return_type)) + }); + + let captured_self = arc_fun.clone(); + let func: ScalarFunctionImplementation = + Arc::new(move |args| captured_self.invoke(args)); + + Self { + name: arc_fun.name().to_string(), + signature: arc_fun.signature().clone(), + return_type: return_type.clone(), + fun: func, + aliases: arc_fun.aliases().to_vec(), + } + } + /// Adds additional names that can be used to invoke this function, in addition to `name` pub fn with_aliases( mut self, @@ -105,7 +148,9 @@ impl ScalarUDF { self } - /// creates a logical expression with a call of the UDF + /// Returns a [`Expr`] logical expression to call this UDF with specified + /// arguments. + /// /// This utility allows using the UDF without requiring access to the registry. pub fn call(&self, args: Vec) -> Expr { Expr::ScalarFunction(crate::expr::ScalarFunction::new_udf( @@ -124,22 +169,126 @@ impl ScalarUDF { &self.aliases } - /// Returns this function's signature (what input types are accepted) + /// Returns this function's [`Signature`] (what input types are accepted) pub fn signature(&self) -> &Signature { &self.signature } - /// Return the type of the function given its input types + /// The datatype this function returns given the input argument input types pub fn return_type(&self, args: &[DataType]) -> Result { // Old API returns an Arc of the datatype for some reason let res = (self.return_type)(args)?; Ok(res.as_ref().clone()) } - /// Return the actual implementation + /// Return an [`Arc`] to the function implementation pub fn fun(&self) -> ScalarFunctionImplementation { self.fun.clone() } +} - // TODO maybe add an invoke() method that runs the actual function? +impl From for ScalarUDF +where + F: ScalarUDFImpl + Send + Sync + 'static, +{ + fn from(fun: F) -> Self { + Self::new_from_impl(fun) + } +} + +/// Trait for implementing [`ScalarUDF`]. +/// +/// This trait exposes the full API for implementing user defined functions and +/// can be used to implement any function. +/// +/// See [`advanced_udf.rs`] for a full example with complete implementation and +/// [`ScalarUDF`] for other available options. +/// +/// +/// [`advanced_udf.rs`]: https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/advanced_udf.rs +/// # Basic Example +/// ``` +/// # use std::any::Any; +/// # use arrow::datatypes::DataType; +/// # use datafusion_common::{DataFusionError, plan_err, Result}; +/// # use datafusion_expr::{col, ColumnarValue, Signature, Volatility}; +/// # use datafusion_expr::{ScalarUDFImpl, ScalarUDF}; +/// struct AddOne { +/// signature: Signature +/// }; +/// +/// impl AddOne { +/// fn new() -> Self { +/// Self { +/// signature: Signature::uniform(1, vec![DataType::Int32], Volatility::Immutable) +/// } +/// } +/// } +/// +/// /// Implement the ScalarUDFImpl trait for AddOne +/// impl ScalarUDFImpl for AddOne { +/// fn as_any(&self) -> &dyn Any { self } +/// fn name(&self) -> &str { "add_one" } +/// fn signature(&self) -> &Signature { &self.signature } +/// fn return_type(&self, args: &[DataType]) -> Result { +/// if !matches!(args.get(0), Some(&DataType::Int32)) { +/// return plan_err!("add_one only accepts Int32 arguments"); +/// } +/// Ok(DataType::Int32) +/// } +/// // The actual implementation would add one to the argument +/// fn invoke(&self, args: &[ColumnarValue]) -> Result { unimplemented!() } +/// } +/// +/// // Create a new ScalarUDF from the implementation +/// let add_one = ScalarUDF::from(AddOne::new()); +/// +/// // Call the function `add_one(col)` +/// let expr = add_one.call(vec![col("a")]); +/// ``` +pub trait ScalarUDFImpl { + /// Returns this object as an [`Any`] trait object + fn as_any(&self) -> &dyn Any; + + /// Returns this function's name + fn name(&self) -> &str; + + /// Returns the function's [`Signature`] for information about what input + /// types are accepted and the function's Volatility. + fn signature(&self) -> &Signature; + + /// What [`DataType`] will be returned by this function, given the types of + /// the arguments + fn return_type(&self, arg_types: &[DataType]) -> Result; + + /// Invoke the function on `args`, returning the appropriate result + /// + /// The function will be invoked passed with the slice of [`ColumnarValue`] + /// (either scalar or array). + /// + /// # Zero Argument Functions + /// If the function has zero parameters (e.g. `now()`) it will be passed a + /// single element slice which is a a null array to indicate the batch's row + /// count (so the function can know the resulting array size). + /// + /// # Performance + /// + /// For the best performance, the implementations of `invoke` should handle + /// the common case when one or more of their arguments are constant values + /// (aka [`ColumnarValue::Scalar`]). Calling [`ColumnarValue::into_array`] + /// and treating all arguments as arrays will work, but will be slower. + fn invoke(&self, args: &[ColumnarValue]) -> Result; + + /// Returns any aliases (alternate names) for this function. + /// + /// Aliases can be used to invoke the same function using different names. + /// For example in some databases `now()` and `current_timestamp()` are + /// aliases for the same function. This behavior can be obtained by + /// returning `current_timestamp` as an alias for the `now` function. + /// + /// Note: `aliases` should only include names other than [`Self::name`]. + /// Defaults to `[]` (no aliases) + fn aliases(&self) -> &[String] { + &[] + } } diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index c5e1180b9f97..b6298f5b552f 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -738,7 +738,8 @@ fn coerce_case_expression(case: Case, schema: &DFSchemaRef) -> Result { #[cfg(test)] mod test { - use std::sync::Arc; + use std::any::Any; + use std::sync::{Arc, OnceLock}; use arrow::array::{FixedSizeListArray, Int32Array}; use arrow::datatypes::{DataType, TimeUnit}; @@ -750,13 +751,13 @@ mod test { use datafusion_expr::{ cast, col, concat, concat_ws, create_udaf, is_true, AccumulatorFactoryFunction, AggregateFunction, AggregateUDF, BinaryExpr, BuiltinScalarFunction, Case, - ColumnarValue, ExprSchemable, Filter, Operator, StateTypeFunction, Subquery, + ColumnarValue, ExprSchemable, Filter, Operator, ScalarUDFImpl, StateTypeFunction, + Subquery, }; use datafusion_expr::{ lit, logical_plan::{EmptyRelation, Projection}, - Expr, LogicalPlan, ReturnTypeFunction, ScalarFunctionImplementation, ScalarUDF, - Signature, Volatility, + Expr, LogicalPlan, ReturnTypeFunction, ScalarUDF, Signature, Volatility, }; use datafusion_physical_expr::expressions::AvgAccumulator; @@ -808,22 +809,36 @@ mod test { assert_analyzed_plan_eq(Arc::new(TypeCoercion::new()), &plan, expected) } + static TEST_SIGNATURE: OnceLock = OnceLock::new(); + + struct TestScalarUDF {} + impl ScalarUDFImpl for TestScalarUDF { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "TestScalarUDF" + } + fn signature(&self) -> &Signature { + TEST_SIGNATURE.get_or_init(|| { + Signature::uniform(1, vec![DataType::Float32], Volatility::Stable) + }) + } + fn return_type(&self, _args: &[DataType]) -> Result { + Ok(DataType::Utf8) + } + + fn invoke(&self, _args: &[ColumnarValue]) -> Result { + Ok(ColumnarValue::Scalar(ScalarValue::from("a"))) + } + } + #[test] fn scalar_udf() -> Result<()> { let empty = empty(); - let return_type: ReturnTypeFunction = - Arc::new(move |_| Ok(Arc::new(DataType::Utf8))); - let fun: ScalarFunctionImplementation = - Arc::new(move |_| Ok(ColumnarValue::Scalar(ScalarValue::new_utf8("a")))); - let udf = Expr::ScalarFunction(expr::ScalarFunction::new_udf( - Arc::new(ScalarUDF::new( - "TestScalarUDF", - &Signature::uniform(1, vec![DataType::Float32], Volatility::Stable), - &return_type, - &fun, - )), - vec![lit(123_i32)], - )); + + let udf = ScalarUDF::from(TestScalarUDF {}).call(vec![lit(123_i32)]); let plan = LogicalPlan::Projection(Projection::try_new(vec![udf], empty)?); let expected = "Projection: TestScalarUDF(CAST(Int32(123) AS Float32))\n EmptyRelation"; @@ -833,24 +848,13 @@ mod test { #[test] fn scalar_udf_invalid_input() -> Result<()> { let empty = empty(); - let return_type: ReturnTypeFunction = - Arc::new(move |_| Ok(Arc::new(DataType::Utf8))); - let fun: ScalarFunctionImplementation = Arc::new(move |_| unimplemented!()); - let udf = Expr::ScalarFunction(expr::ScalarFunction::new_udf( - Arc::new(ScalarUDF::new( - "TestScalarUDF", - &Signature::uniform(1, vec![DataType::Int32], Volatility::Stable), - &return_type, - &fun, - )), - vec![lit("Apple")], - )); + let udf = ScalarUDF::from(TestScalarUDF {}).call(vec![lit("Apple")]); let plan = LogicalPlan::Projection(Projection::try_new(vec![udf], empty)?); let err = assert_analyzed_plan_eq(Arc::new(TypeCoercion::new()), &plan, "") .err() .unwrap(); assert_eq!( - "type_coercion\ncaused by\nError during planning: Coercion from [Utf8] to the signature Uniform(1, [Int32]) failed.", + "type_coercion\ncaused by\nError during planning: Coercion from [Utf8] to the signature Uniform(1, [Float32]) failed.", err.strip_backtrace() ); Ok(()) diff --git a/datafusion/optimizer/src/simplify_expressions/guarantees.rs b/datafusion/optimizer/src/simplify_expressions/guarantees.rs index 860dc326b9b0..aa7bb4f78a93 100644 --- a/datafusion/optimizer/src/simplify_expressions/guarantees.rs +++ b/datafusion/optimizer/src/simplify_expressions/guarantees.rs @@ -47,6 +47,10 @@ impl<'a> GuaranteeRewriter<'a> { guarantees: impl IntoIterator, ) -> Self { Self { + // TODO: Clippy wants the "map" call removed, but doing so generates + // a compilation error. Remove the clippy directive once this + // issue is fixed. + #[allow(clippy::map_identity)] guarantees: guarantees.into_iter().map(|(k, v)| (k, v)).collect(), } } diff --git a/datafusion/physical-expr/src/aggregate/first_last.rs b/datafusion/physical-expr/src/aggregate/first_last.rs index c009881d8918..c7032e601cf8 100644 --- a/datafusion/physical-expr/src/aggregate/first_last.rs +++ b/datafusion/physical-expr/src/aggregate/first_last.rs @@ -20,7 +20,7 @@ use std::any::Any; use std::sync::Arc; -use crate::aggregate::utils::{down_cast_any_ref, ordering_fields}; +use crate::aggregate::utils::{down_cast_any_ref, get_sort_options, ordering_fields}; use crate::expressions::format_state_name; use crate::{ reverse_order_bys, AggregateExpr, LexOrdering, PhysicalExpr, PhysicalSortExpr, @@ -29,9 +29,10 @@ use crate::{ use arrow::array::{Array, ArrayRef, AsArray, BooleanArray}; use arrow::compute::{self, lexsort_to_indices, SortColumn}; use arrow::datatypes::{DataType, Field}; -use arrow_schema::SortOptions; use datafusion_common::utils::{compare_rows, get_arrayref_at_indices, get_row_at_idx}; -use datafusion_common::{arrow_datafusion_err, DataFusionError, Result, ScalarValue}; +use datafusion_common::{ + arrow_datafusion_err, internal_err, DataFusionError, Result, ScalarValue, +}; use datafusion_expr::Accumulator; /// FIRST_VALUE aggregate expression @@ -211,10 +212,45 @@ impl FirstValueAccumulator { } // Updates state with the values in the given row. - fn update_with_new_row(&mut self, row: &[ScalarValue]) { - self.first = row[0].clone(); - self.orderings = row[1..].to_vec(); - self.is_set = true; + fn update_with_new_row(&mut self, row: &[ScalarValue]) -> Result<()> { + let [value, orderings @ ..] = row else { + return internal_err!("Empty row in FIRST_VALUE"); + }; + // Update when there is no entry in the state, or we have an "earlier" + // entry according to sort requirements. + if !self.is_set + || compare_rows( + &self.orderings, + orderings, + &get_sort_options(&self.ordering_req), + )? + .is_gt() + { + self.first = value.clone(); + self.orderings = orderings.to_vec(); + self.is_set = true; + } + Ok(()) + } + + fn get_first_idx(&self, values: &[ArrayRef]) -> Result> { + let [value, ordering_values @ ..] = values else { + return internal_err!("Empty row in FIRST_VALUE"); + }; + if self.ordering_req.is_empty() { + // Get first entry according to receive order (0th index) + return Ok((!value.is_empty()).then_some(0)); + } + let sort_columns = ordering_values + .iter() + .zip(self.ordering_req.iter()) + .map(|(values, req)| SortColumn { + values: values.clone(), + options: Some(req.options), + }) + .collect::>(); + let indices = lexsort_to_indices(&sort_columns, Some(1))?; + Ok((!indices.is_empty()).then_some(indices.value(0) as _)) } } @@ -227,11 +263,9 @@ impl Accumulator for FirstValueAccumulator { } fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - // If we have seen first value, we shouldn't update it - if !values[0].is_empty() && !self.is_set { - let row = get_row_at_idx(values, 0)?; - // Update with first value in the array. - self.update_with_new_row(&row); + if let Some(first_idx) = self.get_first_idx(values)? { + let row = get_row_at_idx(values, first_idx)?; + self.update_with_new_row(&row)?; } Ok(()) } @@ -265,7 +299,7 @@ impl Accumulator for FirstValueAccumulator { // Update with first value in the state. Note that we should exclude the // is_set flag from the state. Otherwise, we will end up with a state // containing two is_set flags. - self.update_with_new_row(&first_row[0..is_set_idx]); + self.update_with_new_row(&first_row[0..is_set_idx])?; } } Ok(()) @@ -459,10 +493,50 @@ impl LastValueAccumulator { } // Updates state with the values in the given row. - fn update_with_new_row(&mut self, row: &[ScalarValue]) { - self.last = row[0].clone(); - self.orderings = row[1..].to_vec(); - self.is_set = true; + fn update_with_new_row(&mut self, row: &[ScalarValue]) -> Result<()> { + let [value, orderings @ ..] = row else { + return internal_err!("Empty row in LAST_VALUE"); + }; + // Update when there is no entry in the state, or we have a "later" + // entry (either according to sort requirements or the order of execution). + if !self.is_set + || self.orderings.is_empty() + || compare_rows( + &self.orderings, + orderings, + &get_sort_options(&self.ordering_req), + )? + .is_lt() + { + self.last = value.clone(); + self.orderings = orderings.to_vec(); + self.is_set = true; + } + Ok(()) + } + + fn get_last_idx(&self, values: &[ArrayRef]) -> Result> { + let [value, ordering_values @ ..] = values else { + return internal_err!("Empty row in LAST_VALUE"); + }; + if self.ordering_req.is_empty() { + // Get last entry according to the order of data: + return Ok((!value.is_empty()).then_some(value.len() - 1)); + } + let sort_columns = ordering_values + .iter() + .zip(self.ordering_req.iter()) + .map(|(values, req)| { + // Take the reverse ordering requirement. This enables us to + // use "fetch = 1" to get the last value. + SortColumn { + values: values.clone(), + options: Some(!req.options), + } + }) + .collect::>(); + let indices = lexsort_to_indices(&sort_columns, Some(1))?; + Ok((!indices.is_empty()).then_some(indices.value(0) as _)) } } @@ -475,10 +549,9 @@ impl Accumulator for LastValueAccumulator { } fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - if !values[0].is_empty() { - let row = get_row_at_idx(values, values[0].len() - 1)?; - // Update with last value in the array. - self.update_with_new_row(&row); + if let Some(last_idx) = self.get_last_idx(values)? { + let row = get_row_at_idx(values, last_idx)?; + self.update_with_new_row(&row)?; } Ok(()) } @@ -515,7 +588,7 @@ impl Accumulator for LastValueAccumulator { // Update with last value in the state. Note that we should exclude the // is_set flag from the state. Otherwise, we will end up with a state // containing two is_set flags. - self.update_with_new_row(&last_row[0..is_set_idx]); + self.update_with_new_row(&last_row[0..is_set_idx])?; } } Ok(()) @@ -559,26 +632,18 @@ fn convert_to_sort_cols( .collect::>() } -/// Selects the sort option attribute from all the given `PhysicalSortExpr`s. -fn get_sort_options(ordering_req: &[PhysicalSortExpr]) -> Vec { - ordering_req - .iter() - .map(|item| item.options) - .collect::>() -} - #[cfg(test)] mod tests { + use std::sync::Arc; + use crate::aggregate::first_last::{FirstValueAccumulator, LastValueAccumulator}; + use arrow::compute::concat; use arrow_array::{ArrayRef, Int64Array}; use arrow_schema::DataType; use datafusion_common::{Result, ScalarValue}; use datafusion_expr::Accumulator; - use arrow::compute::concat; - use std::sync::Arc; - #[test] fn test_first_last_value_value() -> Result<()> { let mut first_accumulator = diff --git a/datafusion/physical-expr/src/aggregate/mod.rs b/datafusion/physical-expr/src/aggregate/mod.rs index 329bb1e6415e..5bd1fca385b1 100644 --- a/datafusion/physical-expr/src/aggregate/mod.rs +++ b/datafusion/physical-expr/src/aggregate/mod.rs @@ -15,16 +15,20 @@ // specific language governing permissions and limitations // under the License. -use crate::expressions::{FirstValue, LastValue, OrderSensitiveArrayAgg}; -use crate::{PhysicalExpr, PhysicalSortExpr}; -use arrow::datatypes::Field; -use datafusion_common::{not_impl_err, DataFusionError, Result}; -use datafusion_expr::Accumulator; use std::any::Any; use std::fmt::Debug; use std::sync::Arc; use self::groups_accumulator::GroupsAccumulator; +use crate::expressions::OrderSensitiveArrayAgg; +use crate::{PhysicalExpr, PhysicalSortExpr}; + +use arrow::datatypes::Field; +use datafusion_common::{not_impl_err, DataFusionError, Result}; +use datafusion_expr::Accumulator; + +mod hyperloglog; +mod tdigest; pub(crate) mod approx_distinct; pub(crate) mod approx_median; @@ -46,19 +50,18 @@ pub(crate) mod median; pub(crate) mod string_agg; #[macro_use] pub(crate) mod min_max; -pub mod build_in; pub(crate) mod groups_accumulator; -mod hyperloglog; -pub mod moving_min_max; pub(crate) mod regr; pub(crate) mod stats; pub(crate) mod stddev; pub(crate) mod sum; pub(crate) mod sum_distinct; -mod tdigest; -pub mod utils; pub(crate) mod variance; +pub mod build_in; +pub mod moving_min_max; +pub mod utils; + /// An aggregate expression that: /// * knows its resulting field /// * knows how to create its accumulator @@ -134,10 +137,7 @@ pub trait AggregateExpr: Send + Sync + Debug + PartialEq { /// Checks whether the given aggregate expression is order-sensitive. /// For instance, a `SUM` aggregation doesn't depend on the order of its inputs. -/// However, a `FirstValue` depends on the input ordering (if the order changes, -/// the first value in the list would change). +/// However, an `ARRAY_AGG` with `ORDER BY` depends on the input ordering. pub fn is_order_sensitive(aggr_expr: &Arc) -> bool { - aggr_expr.as_any().is::() - || aggr_expr.as_any().is::() - || aggr_expr.as_any().is::() + aggr_expr.as_any().is::() } diff --git a/datafusion/physical-expr/src/aggregate/utils.rs b/datafusion/physical-expr/src/aggregate/utils.rs index e5421ef5ab7e..9777158da133 100644 --- a/datafusion/physical-expr/src/aggregate/utils.rs +++ b/datafusion/physical-expr/src/aggregate/utils.rs @@ -17,20 +17,21 @@ //! Utilities used in aggregates +use std::any::Any; +use std::sync::Arc; + use crate::{AggregateExpr, PhysicalSortExpr}; -use arrow::array::ArrayRef; + +use arrow::array::{ArrayRef, ArrowNativeTypeOp}; use arrow_array::cast::AsArray; use arrow_array::types::{ Decimal128Type, DecimalType, TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, }; -use arrow_array::ArrowNativeTypeOp; use arrow_buffer::ArrowNativeType; -use arrow_schema::{DataType, Field}; +use arrow_schema::{DataType, Field, SortOptions}; use datafusion_common::{exec_err, DataFusionError, Result}; use datafusion_expr::Accumulator; -use std::any::Any; -use std::sync::Arc; /// Convert scalar values from an accumulator into arrays. pub fn get_accum_scalar_values_as_arrays( @@ -40,7 +41,7 @@ pub fn get_accum_scalar_values_as_arrays( .state()? .iter() .map(|s| s.to_array_of_size(1)) - .collect::>>() + .collect() } /// Computes averages for `Decimal128`/`Decimal256` values, checking for overflow @@ -205,3 +206,8 @@ pub(crate) fn ordering_fields( }) .collect() } + +/// Selects the sort option attribute from all the given `PhysicalSortExpr`s. +pub fn get_sort_options(ordering_req: &[PhysicalSortExpr]) -> Vec { + ordering_req.iter().map(|item| item.options).collect() +} diff --git a/datafusion/physical-expr/src/array_expressions.rs b/datafusion/physical-expr/src/array_expressions.rs index 274d1db4eb0d..250250630eff 100644 --- a/datafusion/physical-expr/src/array_expressions.rs +++ b/datafusion/physical-expr/src/array_expressions.rs @@ -743,22 +743,78 @@ where )?)) } -/// array_pop_back SQL function -pub fn array_pop_back(args: &[ArrayRef]) -> Result { - if args.len() != 1 { - return exec_err!("array_pop_back needs one argument"); - } +fn general_pop_front_list( + array: &GenericListArray, +) -> Result +where + i64: TryInto, +{ + let from_array = Int64Array::from(vec![2; array.len()]); + let to_array = Int64Array::from( + array + .iter() + .map(|arr| arr.map_or(0, |arr| arr.len() as i64)) + .collect::>(), + ); + general_array_slice::(array, &from_array, &to_array) +} - let list_array = as_list_array(&args[0])?; - let from_array = Int64Array::from(vec![1; list_array.len()]); +fn general_pop_back_list( + array: &GenericListArray, +) -> Result +where + i64: TryInto, +{ + let from_array = Int64Array::from(vec![1; array.len()]); let to_array = Int64Array::from( - list_array + array .iter() .map(|arr| arr.map_or(0, |arr| arr.len() as i64 - 1)) .collect::>(), ); - let args = vec![args[0].clone(), Arc::new(from_array), Arc::new(to_array)]; - array_slice(args.as_slice()) + general_array_slice::(array, &from_array, &to_array) +} + +/// array_pop_front SQL function +pub fn array_pop_front(args: &[ArrayRef]) -> Result { + let array_data_type = args[0].data_type(); + match array_data_type { + DataType::List(_) => { + let array = as_list_array(&args[0])?; + general_pop_front_list::(array) + } + DataType::LargeList(_) => { + let array = as_large_list_array(&args[0])?; + general_pop_front_list::(array) + } + _ => exec_err!( + "array_pop_front does not support type: {:?}", + array_data_type + ), + } +} + +/// array_pop_back SQL function +pub fn array_pop_back(args: &[ArrayRef]) -> Result { + if args.len() != 1 { + return exec_err!("array_pop_back needs one argument"); + } + + let array_data_type = args[0].data_type(); + match array_data_type { + DataType::List(_) => { + let array = as_list_array(&args[0])?; + general_pop_back_list::(array) + } + DataType::LargeList(_) => { + let array = as_large_list_array(&args[0])?; + general_pop_back_list::(array) + } + _ => exec_err!( + "array_pop_back does not support type: {:?}", + array_data_type + ), + } } /// Appends or prepends elements to a ListArray. @@ -882,20 +938,6 @@ pub fn gen_range(args: &[ArrayRef]) -> Result { Ok(arr) } -/// array_pop_front SQL function -pub fn array_pop_front(args: &[ArrayRef]) -> Result { - let list_array = as_list_array(&args[0])?; - let from_array = Int64Array::from(vec![2; list_array.len()]); - let to_array = Int64Array::from( - list_array - .iter() - .map(|arr| arr.map_or(0, |arr| arr.len() as i64)) - .collect::>(), - ); - let args = vec![args[0].clone(), Arc::new(from_array), Arc::new(to_array)]; - array_slice(args.as_slice()) -} - /// Array_append SQL function pub fn array_append(args: &[ArrayRef]) -> Result { if args.len() != 2 { @@ -2453,7 +2495,7 @@ pub fn general_array_distinct( let last_offset: OffsetSize = offsets.last().copied().unwrap(); offsets.push(last_offset + OffsetSize::usize_as(rows.len())); let arrays = converter.convert_rows(rows)?; - let array = match arrays.get(0) { + let array = match arrays.first() { Some(array) => array.clone(), None => { return internal_err!("array_distinct: failed to get array from rows") diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index f779322456ca..f5bb4fe59b5d 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -27,7 +27,7 @@ use crate::aggregates::{ }; use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet}; -use crate::windows::{get_ordered_partition_by_indices, get_window_mode}; +use crate::windows::get_ordered_partition_by_indices; use crate::{ DisplayFormatType, Distribution, ExecutionPlan, InputOrderMode, Partitioning, SendableRecordBatchStream, Statistics, @@ -45,11 +45,11 @@ use datafusion_physical_expr::{ aggregate::is_order_sensitive, equivalence::{collapse_lex_req, ProjectionMapping}, expressions::{Column, Max, Min, UnKnownColumn}, - physical_exprs_contains, reverse_order_bys, AggregateExpr, EquivalenceProperties, - LexOrdering, LexRequirement, PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement, + physical_exprs_contains, AggregateExpr, EquivalenceProperties, LexOrdering, + LexRequirement, PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement, }; -use itertools::{izip, Itertools}; +use itertools::Itertools; mod group_values; mod no_grouping; @@ -277,159 +277,6 @@ pub struct AggregateExec { output_ordering: Option, } -/// This function returns the ordering requirement of the first non-reversible -/// order-sensitive aggregate function such as ARRAY_AGG. This requirement serves -/// as the initial requirement while calculating the finest requirement among all -/// aggregate functions. If this function returns `None`, it means there is no -/// hard ordering requirement for the aggregate functions (in terms of direction). -/// Then, we can generate two alternative requirements with opposite directions. -fn get_init_req( - aggr_expr: &[Arc], - order_by_expr: &[Option], -) -> Option { - for (aggr_expr, fn_reqs) in aggr_expr.iter().zip(order_by_expr.iter()) { - // If the aggregation function is a non-reversible order-sensitive function - // and there is a hard requirement, choose first such requirement: - if is_order_sensitive(aggr_expr) - && aggr_expr.reverse_expr().is_none() - && fn_reqs.is_some() - { - return fn_reqs.clone(); - } - } - None -} - -/// This function gets the finest ordering requirement among all the aggregation -/// functions. If requirements are conflicting, (i.e. we can not compute the -/// aggregations in a single [`AggregateExec`]), the function returns an error. -fn get_finest_requirement( - aggr_expr: &mut [Arc], - order_by_expr: &mut [Option], - eq_properties: &EquivalenceProperties, -) -> Result> { - // First, we check if all the requirements are satisfied by the existing - // ordering. If so, we return `None` to indicate this. - let mut all_satisfied = true; - for (aggr_expr, fn_req) in aggr_expr.iter_mut().zip(order_by_expr.iter_mut()) { - if eq_properties.ordering_satisfy(fn_req.as_deref().unwrap_or(&[])) { - continue; - } - if let Some(reverse) = aggr_expr.reverse_expr() { - let reverse_req = fn_req.as_ref().map(|item| reverse_order_bys(item)); - if eq_properties.ordering_satisfy(reverse_req.as_deref().unwrap_or(&[])) { - // We need to update `aggr_expr` with its reverse since only its - // reverse requirement is compatible with the existing requirements: - *aggr_expr = reverse; - *fn_req = reverse_req; - continue; - } - } - // Requirement is not satisfied: - all_satisfied = false; - } - if all_satisfied { - // All of the requirements are already satisfied. - return Ok(None); - } - let mut finest_req = get_init_req(aggr_expr, order_by_expr); - for (aggr_expr, fn_req) in aggr_expr.iter_mut().zip(order_by_expr.iter_mut()) { - let Some(fn_req) = fn_req else { - continue; - }; - - if let Some(finest_req) = &mut finest_req { - if let Some(finer) = eq_properties.get_finer_ordering(finest_req, fn_req) { - *finest_req = finer; - continue; - } - // If an aggregate function is reversible, analyze whether its reverse - // direction is compatible with existing requirements: - if let Some(reverse) = aggr_expr.reverse_expr() { - let fn_req_reverse = reverse_order_bys(fn_req); - if let Some(finer) = - eq_properties.get_finer_ordering(finest_req, &fn_req_reverse) - { - // We need to update `aggr_expr` with its reverse, since only its - // reverse requirement is compatible with existing requirements: - *aggr_expr = reverse; - *finest_req = finer; - *fn_req = fn_req_reverse; - continue; - } - } - // If neither of the requirements satisfy the other, this means - // requirements are conflicting. Currently, we do not support - // conflicting requirements. - return not_impl_err!( - "Conflicting ordering requirements in aggregate functions is not supported" - ); - } else { - finest_req = Some(fn_req.clone()); - } - } - Ok(finest_req) -} - -/// Calculates search_mode for the aggregation -fn get_aggregate_search_mode( - group_by: &PhysicalGroupBy, - input: &Arc, - aggr_expr: &mut [Arc], - order_by_expr: &mut [Option], - ordering_req: &mut Vec, -) -> InputOrderMode { - let groupby_exprs = group_by - .expr - .iter() - .map(|(item, _)| item.clone()) - .collect::>(); - let mut input_order_mode = InputOrderMode::Linear; - if !group_by.is_single() || groupby_exprs.is_empty() { - return input_order_mode; - } - - if let Some((should_reverse, mode)) = - get_window_mode(&groupby_exprs, ordering_req, input) - { - let all_reversible = aggr_expr - .iter() - .all(|expr| !is_order_sensitive(expr) || expr.reverse_expr().is_some()); - if should_reverse && all_reversible { - izip!(aggr_expr.iter_mut(), order_by_expr.iter_mut()).for_each( - |(aggr, order_by)| { - if let Some(reverse) = aggr.reverse_expr() { - *aggr = reverse; - } else { - unreachable!(); - } - *order_by = order_by.as_ref().map(|ob| reverse_order_bys(ob)); - }, - ); - *ordering_req = reverse_order_bys(ordering_req); - } - input_order_mode = mode; - } - input_order_mode -} - -/// Check whether group by expression contains all of the expression inside `requirement` -// As an example Group By (c,b,a) contains all of the expressions in the `requirement`: (a ASC, b DESC) -fn group_by_contains_all_requirements( - group_by: &PhysicalGroupBy, - requirement: &LexOrdering, -) -> bool { - let physical_exprs = group_by.input_exprs(); - // When we have multiple groups (grouping set) - // since group by may be calculated on the subset of the group_by.expr() - // it is not guaranteed to have all of the requirements among group by expressions. - // Hence do the analysis: whether group by contains all requirements in the single group case. - group_by.is_single() - && requirement - .iter() - .all(|req| physical_exprs_contains(&physical_exprs, &req.expr)) -} - impl AggregateExec { /// Create a new hash aggregate execution plan pub fn try_new( @@ -477,50 +324,14 @@ impl AggregateExec { fn try_new_with_schema( mode: AggregateMode, group_by: PhysicalGroupBy, - mut aggr_expr: Vec>, + aggr_expr: Vec>, filter_expr: Vec>>, input: Arc, input_schema: SchemaRef, schema: SchemaRef, original_schema: SchemaRef, ) -> Result { - // Reset ordering requirement to `None` if aggregator is not order-sensitive - let mut order_by_expr = aggr_expr - .iter() - .map(|aggr_expr| { - let fn_reqs = aggr_expr.order_bys().map(|ordering| ordering.to_vec()); - // If - // - aggregation function is order-sensitive and - // - aggregation is performing a "first stage" calculation, and - // - at least one of the aggregate function requirement is not inside group by expression - // keep the ordering requirement as is; otherwise ignore the ordering requirement. - // In non-first stage modes, we accumulate data (using `merge_batch`) - // from different partitions (i.e. merge partial results). During - // this merge, we consider the ordering of each partial result. - // Hence, we do not need to use the ordering requirement in such - // modes as long as partial results are generated with the - // correct ordering. - fn_reqs.filter(|req| { - is_order_sensitive(aggr_expr) - && mode.is_first_stage() - && !group_by_contains_all_requirements(&group_by, req) - }) - }) - .collect::>(); - let requirement = get_finest_requirement( - &mut aggr_expr, - &mut order_by_expr, - &input.equivalence_properties(), - )?; - let mut ordering_req = requirement.unwrap_or(vec![]); - let input_order_mode = get_aggregate_search_mode( - &group_by, - &input, - &mut aggr_expr, - &mut order_by_expr, - &mut ordering_req, - ); - + let input_eq_properties = input.equivalence_properties(); // Get GROUP BY expressions: let groupby_exprs = group_by.input_exprs(); // If existing ordering satisfies a prefix of the GROUP BY expressions, @@ -528,17 +339,31 @@ impl AggregateExec { // work more efficiently. let indices = get_ordered_partition_by_indices(&groupby_exprs, &input); let mut new_requirement = indices - .into_iter() - .map(|idx| PhysicalSortRequirement { + .iter() + .map(|&idx| PhysicalSortRequirement { expr: groupby_exprs[idx].clone(), options: None, }) .collect::>(); - // Postfix ordering requirement of the aggregation to the requirement. - let req = PhysicalSortRequirement::from_sort_exprs(&ordering_req); + + let req = get_aggregate_exprs_requirement( + &aggr_expr, + &group_by, + &input_eq_properties, + &mode, + )?; new_requirement.extend(req); new_requirement = collapse_lex_req(new_requirement); + let input_order_mode = + if indices.len() == groupby_exprs.len() && !indices.is_empty() { + InputOrderMode::Sorted + } else if !indices.is_empty() { + InputOrderMode::PartiallySorted(indices) + } else { + InputOrderMode::Linear + }; + // construct a map from the input expression to the output expression of the Aggregation group by let projection_mapping = ProjectionMapping::try_new(&group_by.expr, &input.schema())?; @@ -546,9 +371,8 @@ impl AggregateExec { let required_input_ordering = (!new_requirement.is_empty()).then_some(new_requirement); - let aggregate_eqs = input - .equivalence_properties() - .project(&projection_mapping, schema.clone()); + let aggregate_eqs = + input_eq_properties.project(&projection_mapping, schema.clone()); let output_ordering = aggregate_eqs.oeq_class().output_ordering(); Ok(AggregateExec { @@ -998,6 +822,121 @@ fn group_schema(schema: &Schema, group_count: usize) -> SchemaRef { Arc::new(Schema::new(group_fields)) } +/// Determines the lexical ordering requirement for an aggregate expression. +/// +/// # Parameters +/// +/// - `aggr_expr`: A reference to an `Arc` representing the +/// aggregate expression. +/// - `group_by`: A reference to a `PhysicalGroupBy` instance representing the +/// physical GROUP BY expression. +/// - `agg_mode`: A reference to an `AggregateMode` instance representing the +/// mode of aggregation. +/// +/// # Returns +/// +/// A `LexOrdering` instance indicating the lexical ordering requirement for +/// the aggregate expression. +fn get_aggregate_expr_req( + aggr_expr: &Arc, + group_by: &PhysicalGroupBy, + agg_mode: &AggregateMode, +) -> LexOrdering { + // If the aggregation function is not order sensitive, or the aggregation + // is performing a "second stage" calculation, or all aggregate function + // requirements are inside the GROUP BY expression, then ignore the ordering + // requirement. + if !is_order_sensitive(aggr_expr) || !agg_mode.is_first_stage() { + return vec![]; + } + + let mut req = aggr_expr.order_bys().unwrap_or_default().to_vec(); + + // In non-first stage modes, we accumulate data (using `merge_batch`) from + // different partitions (i.e. merge partial results). During this merge, we + // consider the ordering of each partial result. Hence, we do not need to + // use the ordering requirement in such modes as long as partial results are + // generated with the correct ordering. + if group_by.is_single() { + // Remove all orderings that occur in the group by. These requirements + // will definitely be satisfied -- Each group by expression will have + // distinct values per group, hence all requirements are satisfied. + let physical_exprs = group_by.input_exprs(); + req.retain(|sort_expr| { + !physical_exprs_contains(&physical_exprs, &sort_expr.expr) + }); + } + req +} + +/// Computes the finer ordering for between given existing ordering requirement +/// of aggregate expression. +/// +/// # Parameters +/// +/// * `existing_req` - The existing lexical ordering that needs refinement. +/// * `aggr_expr` - A reference to an aggregate expression trait object. +/// * `group_by` - Information about the physical grouping (e.g group by expression). +/// * `eq_properties` - Equivalence properties relevant to the computation. +/// * `agg_mode` - The mode of aggregation (e.g., Partial, Final, etc.). +/// +/// # Returns +/// +/// An `Option` representing the computed finer lexical ordering, +/// or `None` if there is no finer ordering; e.g. the existing requirement and +/// the aggregator requirement is incompatible. +fn finer_ordering( + existing_req: &LexOrdering, + aggr_expr: &Arc, + group_by: &PhysicalGroupBy, + eq_properties: &EquivalenceProperties, + agg_mode: &AggregateMode, +) -> Option { + let aggr_req = get_aggregate_expr_req(aggr_expr, group_by, agg_mode); + eq_properties.get_finer_ordering(existing_req, &aggr_req) +} + +/// Get the common requirement that satisfies all the aggregate expressions. +/// +/// # Parameters +/// +/// - `aggr_exprs`: A slice of `Arc` containing all the +/// aggregate expressions. +/// - `group_by`: A reference to a `PhysicalGroupBy` instance representing the +/// physical GROUP BY expression. +/// - `eq_properties`: A reference to an `EquivalenceProperties` instance +/// representing equivalence properties for ordering. +/// - `agg_mode`: A reference to an `AggregateMode` instance representing the +/// mode of aggregation. +/// +/// # Returns +/// +/// A `LexRequirement` instance, which is the requirement that satisfies all the +/// aggregate requirements. Returns an error in case of conflicting requirements. +fn get_aggregate_exprs_requirement( + aggr_exprs: &[Arc], + group_by: &PhysicalGroupBy, + eq_properties: &EquivalenceProperties, + agg_mode: &AggregateMode, +) -> Result { + let mut requirement = vec![]; + for aggr_expr in aggr_exprs.iter() { + if let Some(finer_ordering) = + finer_ordering(&requirement, aggr_expr, group_by, eq_properties, agg_mode) + { + requirement = finer_ordering; + } else { + // If neither of the requirements satisfy the other, this means + // requirements are conflicting. Currently, we do not support + // conflicting requirements. + return not_impl_err!( + "Conflicting ordering requirements in aggregate functions is not supported" + ); + } + } + Ok(PhysicalSortRequirement::from_sort_exprs(&requirement)) +} + /// returns physical expressions for arguments to evaluate against a batch /// The expressions are different depending on `mode`: /// * Partial: AggregateExpr::expressions @@ -1013,33 +952,27 @@ fn aggregate_expressions( | AggregateMode::SinglePartitioned => Ok(aggr_expr .iter() .map(|agg| { - let mut result = agg.expressions().clone(); - // In partial mode, append ordering requirements to expressions' results. - // Ordering requirements are used by subsequent executors to satisfy the required - // ordering for `AggregateMode::FinalPartitioned`/`AggregateMode::Final` modes. - if matches!(mode, AggregateMode::Partial) { - if let Some(ordering_req) = agg.order_bys() { - let ordering_exprs = ordering_req - .iter() - .map(|item| item.expr.clone()) - .collect::>(); - result.extend(ordering_exprs); - } + let mut result = agg.expressions(); + // Append ordering requirements to expressions' results. This + // way order sensitive aggregators can satisfy requirement + // themselves. + if let Some(ordering_req) = agg.order_bys() { + result.extend(ordering_req.iter().map(|item| item.expr.clone())); } result }) .collect()), - // in this mode, we build the merge expressions of the aggregation + // In this mode, we build the merge expressions of the aggregation. AggregateMode::Final | AggregateMode::FinalPartitioned => { let mut col_idx_base = col_idx_base; - Ok(aggr_expr + aggr_expr .iter() .map(|agg| { let exprs = merge_expressions(col_idx_base, agg)?; col_idx_base += exprs.len(); Ok(exprs) }) - .collect::>>()?) + .collect() } } } @@ -1052,14 +985,13 @@ fn merge_expressions( index_base: usize, expr: &Arc, ) -> Result>> { - Ok(expr - .state_fields()? - .iter() - .enumerate() - .map(|(idx, f)| { - Arc::new(Column::new(f.name(), index_base + idx)) as Arc - }) - .collect::>()) + expr.state_fields().map(|fields| { + fields + .iter() + .enumerate() + .map(|(idx, f)| Arc::new(Column::new(f.name(), index_base + idx)) as _) + .collect() + }) } pub(crate) type AccumulatorItem = Box; @@ -1070,7 +1002,7 @@ fn create_accumulators( aggr_expr .iter() .map(|expr| expr.create_accumulator()) - .collect::>>() + .collect() } /// returns a vector of ArrayRefs, where each entry corresponds to either the @@ -1081,8 +1013,8 @@ fn finalize_aggregation( ) -> Result> { match mode { AggregateMode::Partial => { - // build the vector of states - let a = accumulators + // Build the vector of states + accumulators .iter() .map(|accumulator| { accumulator.state().and_then(|e| { @@ -1091,18 +1023,18 @@ fn finalize_aggregation( .collect::>>() }) }) - .collect::>>()?; - Ok(a.iter().flatten().cloned().collect::>()) + .flatten_ok() + .collect() } AggregateMode::Final | AggregateMode::FinalPartitioned | AggregateMode::Single | AggregateMode::SinglePartitioned => { - // merge the state to the final value + // Merge the state to the final value accumulators .iter() .map(|accumulator| accumulator.evaluate().and_then(|v| v.to_array())) - .collect::>>() + .collect() } } } @@ -1125,9 +1057,7 @@ pub(crate) fn evaluate_many( expr: &[Vec>], batch: &RecordBatch, ) -> Result>> { - expr.iter() - .map(|expr| evaluate(expr, batch)) - .collect::>>() + expr.iter().map(|expr| evaluate(expr, batch)).collect() } fn evaluate_optional( @@ -1143,7 +1073,7 @@ fn evaluate_optional( }) .transpose() }) - .collect::>>() + .collect() } /// Evaluate a group by expression against a `RecordBatch` @@ -1204,9 +1134,7 @@ mod tests { use std::task::{Context, Poll}; use super::*; - use crate::aggregates::{ - get_finest_requirement, AggregateExec, AggregateMode, PhysicalGroupBy, - }; + use crate::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}; use crate::coalesce_batches::CoalesceBatchesExec; use crate::coalesce_partitions::CoalescePartitionsExec; use crate::common; @@ -1228,15 +1156,16 @@ mod tests { Result, ScalarValue, }; use datafusion_execution::config::SessionConfig; + use datafusion_execution::memory_pool::FairSpillPool; use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use datafusion_physical_expr::expressions::{ - lit, ApproxDistinct, Count, FirstValue, LastValue, Median, + lit, ApproxDistinct, Count, FirstValue, LastValue, Median, OrderSensitiveArrayAgg, }; use datafusion_physical_expr::{ - AggregateExpr, EquivalenceProperties, PhysicalExpr, PhysicalSortExpr, + reverse_order_bys, AggregateExpr, EquivalenceProperties, PhysicalExpr, + PhysicalSortExpr, }; - use datafusion_execution::memory_pool::FairSpillPool; use futures::{FutureExt, Stream}; // Generate a schema which consists of 5 columns (a, b, c, d, e) @@ -2093,11 +2022,6 @@ mod tests { descending: false, nulls_first: false, }; - // This is the reverse requirement of options1 - let options2 = SortOptions { - descending: true, - nulls_first: true, - }; let col_a = &col("a", &test_schema)?; let col_b = &col("b", &test_schema)?; let col_c = &col("c", &test_schema)?; @@ -2106,7 +2030,7 @@ mod tests { eq_properties.add_equal_conditions(col_a, col_b); // Aggregate requirements are // [None], [a ASC], [a ASC, b ASC, c ASC], [a ASC, b ASC] respectively - let mut order_by_exprs = vec![ + let order_by_exprs = vec![ None, Some(vec![PhysicalSortExpr { expr: col_a.clone(), @@ -2136,14 +2060,8 @@ mod tests { options: options1, }, ]), - // Since aggregate expression is reversible (FirstValue), we should be able to resolve below - // contradictory requirement by reversing it. - Some(vec![PhysicalSortExpr { - expr: col_b.clone(), - options: options2, - }]), ]; - let common_requirement = Some(vec![ + let common_requirement = vec![ PhysicalSortExpr { expr: col_a.clone(), options: options1, @@ -2152,17 +2070,28 @@ mod tests { expr: col_c.clone(), options: options1, }, - ]); - let aggr_expr = Arc::new(FirstValue::new( - col_a.clone(), - "first1", - DataType::Int32, - vec![], - vec![], - )) as _; - let mut aggr_exprs = vec![aggr_expr; order_by_exprs.len()]; - let res = - get_finest_requirement(&mut aggr_exprs, &mut order_by_exprs, &eq_properties)?; + ]; + let aggr_exprs = order_by_exprs + .into_iter() + .map(|order_by_expr| { + Arc::new(OrderSensitiveArrayAgg::new( + col_a.clone(), + "array_agg", + DataType::Int32, + false, + vec![], + order_by_expr.unwrap_or_default(), + )) as _ + }) + .collect::>(); + let group_by = PhysicalGroupBy::new_single(vec![]); + let res = get_aggregate_exprs_requirement( + &aggr_exprs, + &group_by, + &eq_properties, + &AggregateMode::Partial, + )?; + let res = PhysicalSortRequirement::to_sort_exprs(res); assert_eq!(res, common_requirement); Ok(()) } diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index b4662910f7bd..8eca4cc79c99 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -1764,8 +1764,8 @@ pub(crate) fn writer_properties_to_proto( pub(crate) fn writer_properties_from_proto( props: &protobuf::WriterProperties, ) -> Result { - let writer_version = WriterVersion::from_str(&props.writer_version) - .map_err(|e| proto_error(e.to_string()))?; + let writer_version = + WriterVersion::from_str(&props.writer_version).map_err(proto_error)?; Ok(WriterProperties::builder() .set_created_by(props.created_by.clone()) .set_writer_version(writer_version) diff --git a/datafusion/sqllogictest/src/engines/datafusion_engine/mod.rs b/datafusion/sqllogictest/src/engines/datafusion_engine/mod.rs index 663bbdd5a3c7..8e2bbbfe4f69 100644 --- a/datafusion/sqllogictest/src/engines/datafusion_engine/mod.rs +++ b/datafusion/sqllogictest/src/engines/datafusion_engine/mod.rs @@ -21,5 +21,4 @@ mod normalize; mod runner; pub use error::*; -pub use normalize::*; pub use runner::*; diff --git a/datafusion/sqllogictest/test_files/array.slt b/datafusion/sqllogictest/test_files/array.slt index 4c4adbabfda5..b8d89edb49b1 100644 --- a/datafusion/sqllogictest/test_files/array.slt +++ b/datafusion/sqllogictest/test_files/array.slt @@ -994,18 +994,33 @@ select array_pop_back(make_array(1, 2, 3, 4, 5)), array_pop_back(make_array('h', ---- [1, 2, 3, 4] [h, e, l, l] +query ?? +select array_pop_back(arrow_cast(make_array(1, 2, 3, 4, 5), 'LargeList(Int64)')), array_pop_back(arrow_cast(make_array('h', 'e', 'l', 'l', 'o'), 'LargeList(Utf8)')); +---- +[1, 2, 3, 4] [h, e, l, l] + # array_pop_back scalar function #2 (after array_pop_back, array is empty) query ? select array_pop_back(make_array(1)); ---- [] +query ? +select array_pop_back(arrow_cast(make_array(1), 'LargeList(Int64)')); +---- +[] + # array_pop_back scalar function #3 (array_pop_back the empty array) query ? select array_pop_back(array_pop_back(make_array(1))); ---- [] +query ? +select array_pop_back(array_pop_back(arrow_cast(make_array(1), 'LargeList(Int64)'))); +---- +[] + # array_pop_back scalar function #4 (array_pop_back the arrays which have NULL) query ?? select array_pop_back(make_array(1, 2, 3, 4, NULL)), array_pop_back(make_array(NULL, 'e', 'l', NULL, 'o')); @@ -1018,24 +1033,44 @@ select array_pop_back(make_array(make_array(1, 2, 3), make_array(2, 9, 1), make_ ---- [[1, 2, 3], [2, 9, 1], [7, 8, 9], [1, 2, 3], [1, 7, 4]] +query ? +select array_pop_back(arrow_cast(make_array(make_array(1, 2, 3), make_array(2, 9, 1), make_array(7, 8, 9), make_array(1, 2, 3), make_array(1, 7, 4), make_array(4, 5, 6)), 'LargeList(List(Int64))')); +---- +[[1, 2, 3], [2, 9, 1], [7, 8, 9], [1, 2, 3], [1, 7, 4]] + # array_pop_back scalar function #6 (array_pop_back the nested arrays with NULL) query ? select array_pop_back(make_array(make_array(1, 2, 3), make_array(2, 9, 1), make_array(7, 8, 9), make_array(1, 2, 3), make_array(1, 7, 4), NULL)); ---- [[1, 2, 3], [2, 9, 1], [7, 8, 9], [1, 2, 3], [1, 7, 4]] +query ? +select array_pop_back(arrow_cast(make_array(make_array(1, 2, 3), make_array(2, 9, 1), make_array(7, 8, 9), make_array(1, 2, 3), make_array(1, 7, 4), NULL), 'LargeList(List(Int64))')); +---- +[[1, 2, 3], [2, 9, 1], [7, 8, 9], [1, 2, 3], [1, 7, 4]] + # array_pop_back scalar function #7 (array_pop_back the nested arrays with NULL) query ? select array_pop_back(make_array(make_array(1, 2, 3), make_array(2, 9, 1), make_array(7, 8, 9), NULL, make_array(1, 7, 4))); ---- [[1, 2, 3], [2, 9, 1], [7, 8, 9], ] +query ? +select array_pop_back(arrow_cast(make_array(make_array(1, 2, 3), make_array(2, 9, 1), make_array(7, 8, 9), NULL, make_array(1, 7, 4)), 'LargeList(List(Int64))')); +---- +[[1, 2, 3], [2, 9, 1], [7, 8, 9], ] + # array_pop_back scalar function #8 (after array_pop_back, nested array is empty) query ? select array_pop_back(make_array(make_array(1, 2, 3))); ---- [] +query ? +select array_pop_back(arrow_cast(make_array(make_array(1, 2, 3)), 'LargeList(List(Int64))')); +---- +[] + # array_pop_back with columns query ? select array_pop_back(column1) from arrayspop; @@ -1047,6 +1082,16 @@ select array_pop_back(column1) from arrayspop; [] [, 10, 11] +query ? +select array_pop_back(arrow_cast(column1, 'LargeList(Int64)')) from arrayspop; +---- +[1, 2] +[3, 4, 5] +[6, 7, 8, ] +[, ] +[] +[, 10, 11] + ## array_pop_front (aliases: `list_pop_front`) # array_pop_front scalar function #1 @@ -1055,36 +1100,66 @@ select array_pop_front(make_array(1, 2, 3, 4, 5)), array_pop_front(make_array('h ---- [2, 3, 4, 5] [e, l, l, o] +query ?? +select array_pop_front(arrow_cast(make_array(1, 2, 3, 4, 5), 'LargeList(Int64)')), array_pop_front(arrow_cast(make_array('h', 'e', 'l', 'l', 'o'), 'LargeList(Utf8)')); +---- +[2, 3, 4, 5] [e, l, l, o] + # array_pop_front scalar function #2 (after array_pop_front, array is empty) query ? select array_pop_front(make_array(1)); ---- [] +query ? +select array_pop_front(arrow_cast(make_array(1), 'LargeList(Int64)')); +---- +[] + # array_pop_front scalar function #3 (array_pop_front the empty array) query ? select array_pop_front(array_pop_front(make_array(1))); ---- [] +query ? +select array_pop_front(array_pop_front(arrow_cast(make_array(1), 'LargeList(Int64)'))); +---- +[] + # array_pop_front scalar function #5 (array_pop_front the nested arrays) query ? select array_pop_front(make_array(make_array(1, 2, 3), make_array(2, 9, 1), make_array(7, 8, 9), make_array(1, 2, 3), make_array(1, 7, 4), make_array(4, 5, 6))); ---- [[2, 9, 1], [7, 8, 9], [1, 2, 3], [1, 7, 4], [4, 5, 6]] +query ? +select array_pop_front(arrow_cast(make_array(make_array(1, 2, 3), make_array(2, 9, 1), make_array(7, 8, 9), make_array(1, 2, 3), make_array(1, 7, 4), make_array(4, 5, 6)), 'LargeList(List(Int64))')); +---- +[[2, 9, 1], [7, 8, 9], [1, 2, 3], [1, 7, 4], [4, 5, 6]] + # array_pop_front scalar function #6 (array_pop_front the nested arrays with NULL) query ? select array_pop_front(make_array(NULL, make_array(1, 2, 3), make_array(2, 9, 1), make_array(7, 8, 9), make_array(1, 2, 3), make_array(1, 7, 4))); ---- [[1, 2, 3], [2, 9, 1], [7, 8, 9], [1, 2, 3], [1, 7, 4]] +query ? +select array_pop_front(arrow_cast(make_array(NULL, make_array(1, 2, 3), make_array(2, 9, 1), make_array(7, 8, 9), make_array(1, 2, 3), make_array(1, 7, 4)), 'LargeList(List(Int64))')); +---- +[[1, 2, 3], [2, 9, 1], [7, 8, 9], [1, 2, 3], [1, 7, 4]] + # array_pop_front scalar function #8 (after array_pop_front, nested array is empty) query ? select array_pop_front(make_array(make_array(1, 2, 3))); ---- [] +query ? +select array_pop_front(arrow_cast(make_array(make_array(1, 2, 3)), 'LargeList(List(Int64))')); +---- +[] + ## array_slice (aliases: list_slice) # array_slice scalar function #1 (with positive indexes) diff --git a/datafusion/sqllogictest/test_files/distinct_on.slt b/datafusion/sqllogictest/test_files/distinct_on.slt index 9a7117b69b99..3f609e254839 100644 --- a/datafusion/sqllogictest/test_files/distinct_on.slt +++ b/datafusion/sqllogictest/test_files/distinct_on.slt @@ -78,7 +78,7 @@ c 4 query I SELECT DISTINCT ON (c1) c2 FROM aggregate_test_100 ORDER BY c1, c3; ---- -5 +4 4 2 1 @@ -100,10 +100,9 @@ ProjectionExec: expr=[FIRST_VALUE(aggregate_test_100.c3) ORDER BY [aggregate_tes ------AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[FIRST_VALUE(aggregate_test_100.c3), FIRST_VALUE(aggregate_test_100.c2)] --------CoalesceBatchesExec: target_batch_size=8192 ----------RepartitionExec: partitioning=Hash([c1@0], 4), input_partitions=4 -------------AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[FIRST_VALUE(aggregate_test_100.c3), FIRST_VALUE(aggregate_test_100.c2)], ordering_mode=Sorted ---------------SortExec: expr=[c1@0 ASC NULLS LAST,c3@2 ASC NULLS LAST] -----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3], has_header=true +------------AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[FIRST_VALUE(aggregate_test_100.c3), FIRST_VALUE(aggregate_test_100.c2)] +--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +----------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3], has_header=true # ON expressions are not a sub-set of the ORDER BY expressions query error SELECT DISTINCT ON expressions must match initial ORDER BY expressions diff --git a/datafusion/sqllogictest/test_files/groupby.slt b/datafusion/sqllogictest/test_files/groupby.slt index f1b6a57287b5..bbf21e135fe4 100644 --- a/datafusion/sqllogictest/test_files/groupby.slt +++ b/datafusion/sqllogictest/test_files/groupby.slt @@ -2019,17 +2019,16 @@ SortPreservingMergeExec: [col0@0 ASC NULLS LAST] ------AggregateExec: mode=FinalPartitioned, gby=[col0@0 as col0, col1@1 as col1, col2@2 as col2], aggr=[LAST_VALUE(r.col1)] --------CoalesceBatchesExec: target_batch_size=8192 ----------RepartitionExec: partitioning=Hash([col0@0, col1@1, col2@2], 4), input_partitions=4 -------------AggregateExec: mode=Partial, gby=[col0@0 as col0, col1@1 as col1, col2@2 as col2], aggr=[LAST_VALUE(r.col1)], ordering_mode=PartiallySorted([0]) ---------------SortExec: expr=[col0@3 ASC NULLS LAST] -----------------ProjectionExec: expr=[col0@2 as col0, col1@3 as col1, col2@4 as col2, col0@0 as col0, col1@1 as col1] -------------------CoalesceBatchesExec: target_batch_size=8192 ---------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(col0@0, col0@0)] -----------------------CoalesceBatchesExec: target_batch_size=8192 -------------------------RepartitionExec: partitioning=Hash([col0@0], 4), input_partitions=1 ---------------------------MemoryExec: partitions=1, partition_sizes=[3] -----------------------CoalesceBatchesExec: target_batch_size=8192 -------------------------RepartitionExec: partitioning=Hash([col0@0], 4), input_partitions=1 ---------------------------MemoryExec: partitions=1, partition_sizes=[3] +------------AggregateExec: mode=Partial, gby=[col0@0 as col0, col1@1 as col1, col2@2 as col2], aggr=[LAST_VALUE(r.col1)] +--------------ProjectionExec: expr=[col0@2 as col0, col1@3 as col1, col2@4 as col2, col0@0 as col0, col1@1 as col1] +----------------CoalesceBatchesExec: target_batch_size=8192 +------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(col0@0, col0@0)] +--------------------CoalesceBatchesExec: target_batch_size=8192 +----------------------RepartitionExec: partitioning=Hash([col0@0], 4), input_partitions=1 +------------------------MemoryExec: partitions=1, partition_sizes=[3] +--------------------CoalesceBatchesExec: target_batch_size=8192 +----------------------RepartitionExec: partitioning=Hash([col0@0], 4), input_partitions=1 +------------------------MemoryExec: partitions=1, partition_sizes=[3] # Columns in the table are a,b,c,d. Source is CsvExec which is ordered by # a,b,c column. Column a has cardinality 2, column b has cardinality 4. @@ -2209,7 +2208,7 @@ ProjectionExec: expr=[a@0 as a, b@1 as b, LAST_VALUE(annotated_data_infinite2.c) ----StreamingTableExec: partition_sizes=1, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST] query III -SELECT a, b, LAST_VALUE(c ORDER BY a DESC) as last_c +SELECT a, b, LAST_VALUE(c ORDER BY a DESC, c ASC) as last_c FROM annotated_data_infinite2 GROUP BY a, b ---- @@ -2509,7 +2508,7 @@ Projection: sales_global.country, ARRAY_AGG(sales_global.amount) ORDER BY [sales ----TableScan: sales_global projection=[country, amount] physical_plan ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@1 as amounts, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@2 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@3 as fv2] ---AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), LAST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)] +--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)] ----SortExec: expr=[amount@1 DESC] ------MemoryExec: partitions=1, partition_sizes=[1] @@ -2540,7 +2539,7 @@ Projection: sales_global.country, ARRAY_AGG(sales_global.amount) ORDER BY [sales ----TableScan: sales_global projection=[country, amount] physical_plan ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@1 as amounts, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@2 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@3 as fv2] ---AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount)] +--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)] ----SortExec: expr=[amount@1 ASC NULLS LAST] ------MemoryExec: partitions=1, partition_sizes=[1] @@ -2572,7 +2571,7 @@ Projection: sales_global.country, FIRST_VALUE(sales_global.amount) ORDER BY [sal ----TableScan: sales_global projection=[country, amount] physical_plan ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@2 as fv2, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@3 as amounts] ---AggregateExec: mode=Single, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount), ARRAY_AGG(sales_global.amount)] +--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount), ARRAY_AGG(sales_global.amount)] ----SortExec: expr=[amount@1 ASC NULLS LAST] ------MemoryExec: partitions=1, partition_sizes=[1] @@ -2637,9 +2636,8 @@ Projection: sales_global.country, FIRST_VALUE(sales_global.amount) ORDER BY [sal ------TableScan: sales_global projection=[country, ts, amount] physical_plan ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@2 as lv1, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@3 as sum1] ---AggregateExec: mode=Single, gby=[country@0 as country], aggr=[LAST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount), SUM(sales_global.amount)] -----SortExec: expr=[ts@1 ASC NULLS LAST] -------MemoryExec: partitions=1, partition_sizes=[1] +--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount), SUM(sales_global.amount)] +----MemoryExec: partitions=1, partition_sizes=[1] query TRRR rowsort SELECT country, FIRST_VALUE(amount ORDER BY ts DESC) as fv1, @@ -2672,8 +2670,7 @@ Projection: sales_global.country, FIRST_VALUE(sales_global.amount) ORDER BY [sal physical_plan ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@2 as lv1, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@3 as sum1] --AggregateExec: mode=Single, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount), SUM(sales_global.amount)] -----SortExec: expr=[ts@1 DESC] -------MemoryExec: partitions=1, partition_sizes=[1] +----MemoryExec: partitions=1, partition_sizes=[1] query TRRR rowsort SELECT country, FIRST_VALUE(amount ORDER BY ts DESC) as fv1, @@ -2709,12 +2706,11 @@ physical_plan SortExec: expr=[sn@2 ASC NULLS LAST] --ProjectionExec: expr=[zip_code@1 as zip_code, country@2 as country, sn@0 as sn, ts@3 as ts, currency@4 as currency, LAST_VALUE(e.amount) ORDER BY [e.sn ASC NULLS LAST]@5 as last_rate] ----AggregateExec: mode=Single, gby=[sn@2 as sn, zip_code@0 as zip_code, country@1 as country, ts@3 as ts, currency@4 as currency], aggr=[LAST_VALUE(e.amount)] -------SortExec: expr=[sn@5 ASC NULLS LAST] ---------ProjectionExec: expr=[zip_code@4 as zip_code, country@5 as country, sn@6 as sn, ts@7 as ts, currency@8 as currency, sn@0 as sn, amount@3 as amount] -----------CoalesceBatchesExec: target_batch_size=8192 -------------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(currency@2, currency@4)], filter=ts@0 >= ts@1 ---------------MemoryExec: partitions=1, partition_sizes=[1] ---------------MemoryExec: partitions=1, partition_sizes=[1] +------ProjectionExec: expr=[zip_code@4 as zip_code, country@5 as country, sn@6 as sn, ts@7 as ts, currency@8 as currency, sn@0 as sn, amount@3 as amount] +--------CoalesceBatchesExec: target_batch_size=8192 +----------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(currency@2, currency@4)], filter=ts@0 >= ts@1 +------------MemoryExec: partitions=1, partition_sizes=[1] +------------MemoryExec: partitions=1, partition_sizes=[1] query ITIPTR rowsort SELECT s.zip_code, s.country, s.sn, s.ts, s.currency, LAST_VALUE(e.amount ORDER BY e.sn) AS last_rate @@ -2759,8 +2755,7 @@ SortPreservingMergeExec: [country@0 ASC NULLS LAST] ----------RepartitionExec: partitioning=Hash([country@0], 8), input_partitions=8 ------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 --------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)] -----------------SortExec: expr=[ts@1 ASC NULLS LAST] -------------------MemoryExec: partitions=1, partition_sizes=[1] +----------------MemoryExec: partitions=1, partition_sizes=[1] query TRR SELECT country, FIRST_VALUE(amount ORDER BY ts ASC) AS fv1, @@ -2791,13 +2786,12 @@ physical_plan SortPreservingMergeExec: [country@0 ASC NULLS LAST] --SortExec: expr=[country@0 ASC NULLS LAST] ----ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@2 as fv2] -------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount)] +------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)] --------CoalesceBatchesExec: target_batch_size=8192 ----------RepartitionExec: partitioning=Hash([country@0], 8), input_partitions=8 ------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 ---------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount)] -----------------SortExec: expr=[ts@1 ASC NULLS LAST] -------------------MemoryExec: partitions=1, partition_sizes=[1] +--------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)] +----------------MemoryExec: partitions=1, partition_sizes=[1] query TRR SELECT country, FIRST_VALUE(amount ORDER BY ts ASC) AS fv1, @@ -2831,16 +2825,15 @@ ProjectionExec: expr=[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts --AggregateExec: mode=Final, gby=[], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)] ----CoalescePartitionsExec ------AggregateExec: mode=Partial, gby=[], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)] ---------SortExec: expr=[ts@0 ASC NULLS LAST] -----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 -------------MemoryExec: partitions=1, partition_sizes=[1] +--------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 +----------MemoryExec: partitions=1, partition_sizes=[1] query RR SELECT FIRST_VALUE(amount ORDER BY ts ASC) AS fv1, LAST_VALUE(amount ORDER BY ts ASC) AS fv2 FROM sales_global ---- -30 80 +30 100 # Conversion in between FIRST_VALUE and LAST_VALUE to resolve # contradictory requirements should work in multi partitions. @@ -2855,12 +2848,11 @@ Projection: FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS ----TableScan: sales_global projection=[ts, amount] physical_plan ProjectionExec: expr=[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@0 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@1 as fv2] ---AggregateExec: mode=Final, gby=[], aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount)] +--AggregateExec: mode=Final, gby=[], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)] ----CoalescePartitionsExec -------AggregateExec: mode=Partial, gby=[], aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount)] ---------SortExec: expr=[ts@0 ASC NULLS LAST] -----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 -------------MemoryExec: partitions=1, partition_sizes=[1] +------AggregateExec: mode=Partial, gby=[], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)] +--------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 +----------MemoryExec: partitions=1, partition_sizes=[1] query RR SELECT FIRST_VALUE(amount ORDER BY ts ASC) AS fv1, @@ -2993,10 +2985,10 @@ physical_plan SortPreservingMergeExec: [country@0 ASC NULLS LAST] --SortExec: expr=[country@0 ASC NULLS LAST] ----ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@1 as amounts, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@2 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@3 as fv2] -------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), LAST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)] +------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)] --------CoalesceBatchesExec: target_batch_size=4 ----------RepartitionExec: partitioning=Hash([country@0], 8), input_partitions=8 -------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), LAST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)] +------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)] --------------SortExec: expr=[amount@1 DESC] ----------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 ------------------MemoryExec: partitions=1, partition_sizes=[1] @@ -3639,10 +3631,10 @@ Projection: FIRST_VALUE(multiple_ordered_table.a) ORDER BY [multiple_ordered_tab ----TableScan: multiple_ordered_table projection=[a, c, d] physical_plan ProjectionExec: expr=[FIRST_VALUE(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.a ASC NULLS LAST]@1 as first_a, LAST_VALUE(multiple_ordered_table.c) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST]@2 as last_c] ---AggregateExec: mode=FinalPartitioned, gby=[d@0 as d], aggr=[FIRST_VALUE(multiple_ordered_table.a), FIRST_VALUE(multiple_ordered_table.c)] +--AggregateExec: mode=FinalPartitioned, gby=[d@0 as d], aggr=[FIRST_VALUE(multiple_ordered_table.a), LAST_VALUE(multiple_ordered_table.c)] ----CoalesceBatchesExec: target_batch_size=2 ------RepartitionExec: partitioning=Hash([d@0], 8), input_partitions=8 ---------AggregateExec: mode=Partial, gby=[d@2 as d], aggr=[FIRST_VALUE(multiple_ordered_table.a), FIRST_VALUE(multiple_ordered_table.c)] +--------AggregateExec: mode=Partial, gby=[d@2 as d], aggr=[FIRST_VALUE(multiple_ordered_table.a), LAST_VALUE(multiple_ordered_table.c)] ----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 ------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c, d], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], has_header=true diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index 9a349f600091..a7146a5a91c4 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -3454,7 +3454,7 @@ SortPreservingMergeExec: [a@0 ASC] ------AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[LAST_VALUE(r.b)] --------CoalesceBatchesExec: target_batch_size=2 ----------RepartitionExec: partitioning=Hash([a@0, b@1, c@2], 2), input_partitions=2 -------------AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[LAST_VALUE(r.b)], ordering_mode=PartiallySorted([0]) +------------AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[LAST_VALUE(r.b)] --------------CoalesceBatchesExec: target_batch_size=2 ----------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0)] ------------------CoalesceBatchesExec: target_batch_size=2 @@ -3462,7 +3462,7 @@ SortPreservingMergeExec: [a@0 ASC] ----------------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 ------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], output_ordering=[a@0 ASC, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true ------------------CoalesceBatchesExec: target_batch_size=2 ---------------------RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2, preserve_order=true, sort_exprs=a@0 ASC,b@1 ASC NULLS LAST +--------------------RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2 ----------------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 ------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], output_ordering=[a@0 ASC, b@1 ASC NULLS LAST], has_header=true diff --git a/datafusion/sqllogictest/test_files/timestamps.slt b/datafusion/sqllogictest/test_files/timestamps.slt index 2b3b4bf2e45b..c84e46c965fa 100644 --- a/datafusion/sqllogictest/test_files/timestamps.slt +++ b/datafusion/sqllogictest/test_files/timestamps.slt @@ -1730,14 +1730,11 @@ SELECT TIMESTAMPTZ '2022-01-01 01:10:00 AEST' query P rowsort SELECT TIMESTAMPTZ '2022-01-01 01:10:00 Australia/Sydney' as ts_geo UNION ALL -SELECT TIMESTAMPTZ '2022-01-01 01:10:00 Antarctica/Vostok' as ts_geo - UNION ALL SELECT TIMESTAMPTZ '2022-01-01 01:10:00 Africa/Johannesburg' as ts_geo UNION ALL SELECT TIMESTAMPTZ '2022-01-01 01:10:00 America/Los_Angeles' as ts_geo ---- 2021-12-31T14:10:00Z -2021-12-31T19:10:00Z 2021-12-31T23:10:00Z 2022-01-01T09:10:00Z diff --git a/docs/source/library-user-guide/adding-udfs.md b/docs/source/library-user-guide/adding-udfs.md index 11cf52eb3fcf..c51e4de3236c 100644 --- a/docs/source/library-user-guide/adding-udfs.md +++ b/docs/source/library-user-guide/adding-udfs.md @@ -76,7 +76,9 @@ The challenge however is that DataFusion doesn't know about this function. We ne ### Registering a Scalar UDF -To register a Scalar UDF, you need to wrap the function implementation in a `ScalarUDF` struct and then register it with the `SessionContext`. DataFusion provides the `create_udf` and `make_scalar_function` helper functions to make this easier. +To register a Scalar UDF, you need to wrap the function implementation in a [`ScalarUDF`] struct and then register it with the `SessionContext`. +DataFusion provides the [`create_udf`] and helper functions to make this easier. +There is a lower level API with more functionality but is more complex, that is documented in [`advanced_udf.rs`]. ```rust use datafusion::logical_expr::{Volatility, create_udf}; @@ -93,6 +95,11 @@ let udf = create_udf( ); ``` +[`scalarudf`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/struct.ScalarUDF.html +[`create_udf`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/fn.create_udf.html +[`make_scalar_function`]: https://docs.rs/datafusion/latest/datafusion/physical_expr/functions/fn.make_scalar_function.html +[`advanced_udf.rs`]: https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/advanced_udf.rs + A few things to note: - The first argument is the name of the function. This is the name that will be used in SQL queries. diff --git a/docs/source/user-guide/introduction.md b/docs/source/user-guide/introduction.md index 6c1e54c2b701..b737c3bab266 100644 --- a/docs/source/user-guide/introduction.md +++ b/docs/source/user-guide/introduction.md @@ -75,7 +75,7 @@ latency). Here are some example systems built using DataFusion: -- Specialized Analytical Database systems such as [CeresDB] and more general Apache Spark like system such a [Ballista]. +- Specialized Analytical Database systems such as [HoraeDB] and more general Apache Spark like system such a [Ballista]. - New query language engines such as [prql-query] and accelerators such as [VegaFusion] - Research platform for new Database Systems, such as [Flock] - SQL support to another library, such as [dask sql] @@ -96,7 +96,6 @@ Here are some active projects using DataFusion: - [Arroyo](https://github.com/ArroyoSystems/arroyo) Distributed stream processing engine in Rust - [Ballista](https://github.com/apache/arrow-ballista) Distributed SQL Query Engine -- [CeresDB](https://github.com/CeresDB/ceresdb) Distributed Time-Series Database - [CnosDB](https://github.com/cnosdb/cnosdb) Open Source Distributed Time Series Database - [Cube Store](https://github.com/cube-js/cube.js/tree/master/rust) - [Dask SQL](https://github.com/dask-contrib/dask-sql) Distributed SQL query engine in Python @@ -104,6 +103,7 @@ Here are some active projects using DataFusion: - [delta-rs](https://github.com/delta-io/delta-rs) Native Rust implementation of Delta Lake - [GreptimeDB](https://github.com/GreptimeTeam/greptimedb) Open Source & Cloud Native Distributed Time Series Database - [GlareDB](https://github.com/GlareDB/glaredb) Fast SQL database for querying and analyzing distributed data. +- [HoraeDB](https://github.com/apache/incubator-horaedb) Distributed Time-Series Database - [InfluxDB IOx](https://github.com/influxdata/influxdb_iox) Time Series Database - [Kamu](https://github.com/kamu-data/kamu-cli/) Planet-scale streaming data pipeline - [LakeSoul](https://github.com/lakesoul-io/LakeSoul) Open source LakeHouse framework with native IO in Rust. @@ -128,7 +128,6 @@ Here are some less active projects that used DataFusion: [ballista]: https://github.com/apache/arrow-ballista [blaze]: https://github.com/blaze-init/blaze -[ceresdb]: https://github.com/CeresDB/ceresdb [cloudfuse buzz]: https://github.com/cloudfuse-io/buzz-rust [cnosdb]: https://github.com/cnosdb/cnosdb [cube store]: https://github.com/cube-js/cube.js/tree/master/rust @@ -138,6 +137,7 @@ Here are some less active projects that used DataFusion: [flock]: https://github.com/flock-lab/flock [kamu]: https://github.com/kamu-data/kamu-cli [greptime db]: https://github.com/GreptimeTeam/greptimedb +[horaedb]: https://github.com/apache/incubator-horaedb [influxdb iox]: https://github.com/influxdata/influxdb_iox [parseable]: https://github.com/parseablehq/parseable [prql-query]: https://github.com/prql/prql-query