From 8da0bc3b7e9b319ae84460548c36266198465d3d Mon Sep 17 00:00:00 2001 From: Jonathan Chen Date: Mon, 25 Nov 2024 22:42:15 -0500 Subject: [PATCH 1/9] feat: Add GroupColumn `Decimal128Array --- .../src/aggregates/group_values/mod.rs | 5 ++- .../group_values/multi_group_by/mod.rs | 13 ++++++- .../group_values/multi_group_by/primitive.rs | 11 +++++- .../sqllogictest/test_files/group_by.slt | 39 +++++++++++++++++++ 4 files changed, 63 insertions(+), 5 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/mod.rs index 58bc7bb90a88..e4a7eb049e9e 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/mod.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/mod.rs @@ -19,7 +19,7 @@ use arrow::record_batch::RecordBatch; use arrow_array::types::{ - Date32Type, Date64Type, Time32MillisecondType, Time32SecondType, + Date32Type, Date64Type, Decimal128Type, Time32MillisecondType, Time32SecondType, Time64MicrosecondType, Time64NanosecondType, TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, }; @@ -170,6 +170,9 @@ pub(crate) fn new_group_values( TimeUnit::Microsecond => downcast_helper!(TimestampMicrosecondType, d), TimeUnit::Nanosecond => downcast_helper!(TimestampNanosecondType, d), }, + DataType::Decimal128(_, _) => { + downcast_helper!(Decimal128Type, d); + } DataType::Utf8 => { return Ok(Box::new(GroupValuesByes::::new(OutputType::Utf8))); } diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs index 89041eb0f04e..333eb6bbcbe8 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs @@ -31,8 +31,8 @@ use crate::aggregates::group_values::GroupValues; use ahash::RandomState; use arrow::compute::cast; use arrow::datatypes::{ - BinaryViewType, Date32Type, Date64Type, Float32Type, Float64Type, Int16Type, - Int32Type, Int64Type, Int8Type, StringViewType, Time32MillisecondType, + BinaryViewType, Date32Type, Date64Type, Decimal128Type, Float32Type, Float64Type, + Int16Type, Int32Type, Int64Type, Int8Type, StringViewType, Time32MillisecondType, Time32SecondType, Time64MicrosecondType, Time64NanosecondType, TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type, @@ -1008,6 +1008,14 @@ impl GroupValues for GroupValuesColumn { ) } }, + &DataType::Decimal128(_, _) => { + instantiate_primitive! { + v, + nullable, + Decimal128Type, + data_type + } + } &DataType::Utf8 => { let b = ByteGroupValueBuilder::::new(OutputType::Utf8); v.push(Box::new(b) as _) @@ -1214,6 +1222,7 @@ fn supported_type(data_type: &DataType) -> bool { | DataType::UInt64 | DataType::Float32 | DataType::Float64 + | DataType::Decimal128(_, _) | DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs index 4686a78f24b0..78cdf93f60b1 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs @@ -22,6 +22,7 @@ use arrow_array::cast::AsArray; use arrow_array::{Array, ArrayRef, ArrowPrimitiveType, PrimitiveArray}; use arrow_schema::DataType; use datafusion_execution::memory_pool::proxy::VecAllocExt; +use datafusion_physical_expr::aggregate::utils::adjust_output_array; use itertools::izip; use std::iter; use std::sync::Arc; @@ -190,9 +191,15 @@ impl GroupColumn assert!(nulls.is_none(), "unexpected nulls in non nullable input"); } - let arr = PrimitiveArray::::new(ScalarBuffer::from(group_values), nulls); + let arr = PrimitiveArray::::new(ScalarBuffer::from(group_values), nulls) + .with_data_type(data_type.clone()); + let array_ref = Arc::new(arr) as ArrayRef; + + let adjusted_array = adjust_output_array(&data_type, array_ref) + .expect("Failed to adjust array data type"); + // Set timezone information for timestamp - Arc::new(arr.with_data_type(data_type)) + adjusted_array } fn take_n(&mut self, n: usize) -> ArrayRef { diff --git a/datafusion/sqllogictest/test_files/group_by.slt b/datafusion/sqllogictest/test_files/group_by.slt index 4acf519c5de4..df7e21c2da44 100644 --- a/datafusion/sqllogictest/test_files/group_by.slt +++ b/datafusion/sqllogictest/test_files/group_by.slt @@ -5499,3 +5499,42 @@ SELECT GROUP BY ts, text ---- foo 2024-01-01T08:00:00+08:00 + +# Test multi group by int + Decimal128 +statement ok +create table source as values +(1, '123.45'), +(1, '123.45'), +(2, '678.90'), +(2, '1011.12'), +(3, '1314.15'), +(3, '1314.15'), +(2, '1011.12'), +(null, null), +(null, '123.45'), +(null, null), +(null, '123.45'), +(2, '678.90'), +(2, '678.90'), +(1, null) +; + +statement ok +create view t as select column1 as a, arrow_cast(column2, 'Decimal128(10, 2)') as b from source; + +query IRI +select a, b, count(*) from t group by a, b order by a, b; +---- +1 123.45 2 +1 NULL 1 +2 678.9 3 +2 1011.12 2 +3 1314.15 2 +NULL 123.45 2 +NULL NULL 2 + +statement ok +drop view t + +statement ok +drop table source; From dcc488dc11945051f47679735490a8259d4c5c92 Mon Sep 17 00:00:00 2001 From: Jonathan Chen Date: Mon, 25 Nov 2024 23:32:32 -0500 Subject: [PATCH 2/9] fix clippy --- .../aggregates/group_values/multi_group_by/primitive.rs | 6 ++---- .../aggregates/group_values/single_group_by/primitive.rs | 8 +++++++- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs index 78cdf93f60b1..a6908f336315 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs @@ -195,11 +195,9 @@ impl GroupColumn .with_data_type(data_type.clone()); let array_ref = Arc::new(arr) as ArrayRef; - let adjusted_array = adjust_output_array(&data_type, array_ref) - .expect("Failed to adjust array data type"); - // Set timezone information for timestamp - adjusted_array + adjust_output_array(&data_type, array_ref) + .expect("Failed to adjust array data type") } fn take_n(&mut self, n: usize) -> ArrayRef { diff --git a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs index 05214ec10d68..484fa73dde2f 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs @@ -28,6 +28,7 @@ use arrow_schema::DataType; use datafusion_common::Result; use datafusion_execution::memory_pool::proxy::VecAllocExt; use datafusion_expr::EmitTo; +use datafusion_physical_expr::aggregate::utils::adjust_output_array; use half::f16; use hashbrown::raw::RawTable; use std::mem::size_of; @@ -208,7 +209,12 @@ where build_primitive(split, null_group) } }; - Ok(vec![Arc::new(array.with_data_type(self.data_type.clone()))]) + let array_ref = Arc::new(array.with_data_type(self.data_type.clone())) as ArrayRef; + + let adjusted_array = adjust_output_array(&self.data_type, array_ref) + .expect("Failed to adjust array data type"); + + Ok(vec![adjusted_array]) } fn clear_shrink(&mut self, batch: &RecordBatch) { From a707d128a31bb6b05205aa59afd9054e47f4eeaa Mon Sep 17 00:00:00 2001 From: Jonathan Chen Date: Tue, 26 Nov 2024 00:15:21 -0500 Subject: [PATCH 3/9] fix errors --- .../tests/fuzz_cases/aggregation_fuzzer/data_generator.rs | 7 ++++++- .../aggregates/group_values/single_group_by/primitive.rs | 3 ++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/data_generator.rs b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/data_generator.rs index e4c0cb6fe77f..a3a8271ba8d3 100644 --- a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/data_generator.rs +++ b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/data_generator.rs @@ -87,7 +87,12 @@ impl DatasetGeneratorConfig { .iter() .filter_map(|d| { if d.column_type.is_numeric() - && !matches!(d.column_type, DataType::Float32 | DataType::Float64) + && !matches!( + d.column_type, + DataType::Float32 + | DataType::Float64 + | DataType::Decimal128(_, _) + ) { Some(d.name.as_str()) } else { diff --git a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs index 484fa73dde2f..c52f602cd7da 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs @@ -209,7 +209,8 @@ where build_primitive(split, null_group) } }; - let array_ref = Arc::new(array.with_data_type(self.data_type.clone())) as ArrayRef; + let array_ref = + Arc::new(array.with_data_type(self.data_type.clone())) as ArrayRef; let adjusted_array = adjust_output_array(&self.data_type, array_ref) .expect("Failed to adjust array data type"); From ea6f77aff4ae52faef6adf961697bb589820f844 Mon Sep 17 00:00:00 2001 From: Jonathan Chen Date: Tue, 26 Nov 2024 02:01:45 -0500 Subject: [PATCH 4/9] remove .with_data_type --- .../src/aggregates/group_values/multi_group_by/primitive.rs | 3 +-- .../src/aggregates/group_values/single_group_by/primitive.rs | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs index a6908f336315..98c129766043 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs @@ -191,8 +191,7 @@ impl GroupColumn assert!(nulls.is_none(), "unexpected nulls in non nullable input"); } - let arr = PrimitiveArray::::new(ScalarBuffer::from(group_values), nulls) - .with_data_type(data_type.clone()); + let arr = PrimitiveArray::::new(ScalarBuffer::from(group_values), nulls); let array_ref = Arc::new(arr) as ArrayRef; // Set timezone information for timestamp diff --git a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs index c52f602cd7da..105d3f419cc8 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs @@ -209,8 +209,7 @@ where build_primitive(split, null_group) } }; - let array_ref = - Arc::new(array.with_data_type(self.data_type.clone())) as ArrayRef; + let array_ref = Arc::new(array) as ArrayRef; let adjusted_array = adjust_output_array(&self.data_type, array_ref) .expect("Failed to adjust array data type"); From 2101112d47542a4aa766ef1fafb92b65294ed15c Mon Sep 17 00:00:00 2001 From: Jonathan Chen Date: Tue, 26 Nov 2024 03:22:20 -0500 Subject: [PATCH 5/9] fix --- .../src/aggregates/group_values/multi_group_by/primitive.rs | 3 ++- .../src/aggregates/group_values/single_group_by/primitive.rs | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs index 98c129766043..a6908f336315 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs @@ -191,7 +191,8 @@ impl GroupColumn assert!(nulls.is_none(), "unexpected nulls in non nullable input"); } - let arr = PrimitiveArray::::new(ScalarBuffer::from(group_values), nulls); + let arr = PrimitiveArray::::new(ScalarBuffer::from(group_values), nulls) + .with_data_type(data_type.clone()); let array_ref = Arc::new(arr) as ArrayRef; // Set timezone information for timestamp diff --git a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs index 105d3f419cc8..484fa73dde2f 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs @@ -209,7 +209,7 @@ where build_primitive(split, null_group) } }; - let array_ref = Arc::new(array) as ArrayRef; + let array_ref = Arc::new(array.with_data_type(self.data_type.clone())) as ArrayRef; let adjusted_array = adjust_output_array(&self.data_type, array_ref) .expect("Failed to adjust array data type"); From 403cfa7d6b3c8f2c460992ccf418df02f33af43b Mon Sep 17 00:00:00 2001 From: Jonathan Chen Date: Tue, 26 Nov 2024 03:39:45 -0500 Subject: [PATCH 6/9] fmt --- .../src/aggregates/group_values/single_group_by/primitive.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs index 484fa73dde2f..c52f602cd7da 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs @@ -209,7 +209,8 @@ where build_primitive(split, null_group) } }; - let array_ref = Arc::new(array.with_data_type(self.data_type.clone())) as ArrayRef; + let array_ref = + Arc::new(array.with_data_type(self.data_type.clone())) as ArrayRef; let adjusted_array = adjust_output_array(&self.data_type, array_ref) .expect("Failed to adjust array data type"); From afd6e2c9ea0f9c5af2398445d7df1611e398e53c Mon Sep 17 00:00:00 2001 From: Jonathan Chen Date: Wed, 27 Nov 2024 17:19:22 -0500 Subject: [PATCH 7/9] remove adjust_output_array --- .../aggregates/group_values/multi_group_by/primitive.rs | 8 ++------ .../aggregates/group_values/single_group_by/primitive.rs | 8 +------- 2 files changed, 3 insertions(+), 13 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs index a6908f336315..f655b7f5c1b4 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs @@ -191,13 +191,9 @@ impl GroupColumn assert!(nulls.is_none(), "unexpected nulls in non nullable input"); } - let arr = PrimitiveArray::::new(ScalarBuffer::from(group_values), nulls) - .with_data_type(data_type.clone()); - let array_ref = Arc::new(arr) as ArrayRef; - + let arr = PrimitiveArray::::new(ScalarBuffer::from(group_values), nulls); // Set timezone information for timestamp - adjust_output_array(&data_type, array_ref) - .expect("Failed to adjust array data type") + Arc::new(arr.with_data_type(data_type)) } fn take_n(&mut self, n: usize) -> ArrayRef { diff --git a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs index c52f602cd7da..85cd2e79b936 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs @@ -28,7 +28,6 @@ use arrow_schema::DataType; use datafusion_common::Result; use datafusion_execution::memory_pool::proxy::VecAllocExt; use datafusion_expr::EmitTo; -use datafusion_physical_expr::aggregate::utils::adjust_output_array; use half::f16; use hashbrown::raw::RawTable; use std::mem::size_of; @@ -209,13 +208,8 @@ where build_primitive(split, null_group) } }; - let array_ref = - Arc::new(array.with_data_type(self.data_type.clone())) as ArrayRef; - let adjusted_array = adjust_output_array(&self.data_type, array_ref) - .expect("Failed to adjust array data type"); - - Ok(vec![adjusted_array]) + Ok(vec![Arc::new(array.with_data_type(self.data_type.clone()))]) } fn clear_shrink(&mut self, batch: &RecordBatch) { From 079942ca00652d09f77dc1465f5d400ead62ac67 Mon Sep 17 00:00:00 2001 From: Jonathan Chen Date: Wed, 27 Nov 2024 17:20:51 -0500 Subject: [PATCH 8/9] fix --- .../src/aggregates/group_values/multi_group_by/primitive.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs index f655b7f5c1b4..4686a78f24b0 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs @@ -22,7 +22,6 @@ use arrow_array::cast::AsArray; use arrow_array::{Array, ArrayRef, ArrowPrimitiveType, PrimitiveArray}; use arrow_schema::DataType; use datafusion_execution::memory_pool::proxy::VecAllocExt; -use datafusion_physical_expr::aggregate::utils::adjust_output_array; use itertools::izip; use std::iter; use std::sync::Arc; From 76ec4f937002d0d06531cafe2a8cb69b9a7e7cfb Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 2 Dec 2024 14:05:47 -0500 Subject: [PATCH 9/9] Add missing data type --- .../tests/fuzz_cases/aggregation_fuzzer/data_generator.rs | 7 +------ .../aggregates/group_values/multi_group_by/primitive.rs | 8 ++++---- 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/data_generator.rs b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/data_generator.rs index a3a8271ba8d3..e4c0cb6fe77f 100644 --- a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/data_generator.rs +++ b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/data_generator.rs @@ -87,12 +87,7 @@ impl DatasetGeneratorConfig { .iter() .filter_map(|d| { if d.column_type.is_numeric() - && !matches!( - d.column_type, - DataType::Float32 - | DataType::Float64 - | DataType::Decimal128(_, _) - ) + && !matches!(d.column_type, DataType::Float32 | DataType::Float64) { Some(d.name.as_str()) } else { diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs index 4686a78f24b0..4ceeb634bad2 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs @@ -200,10 +200,10 @@ impl GroupColumn let first_n_nulls = if NULLABLE { self.nulls.take_n(n) } else { None }; - Arc::new(PrimitiveArray::::new( - ScalarBuffer::from(first_n), - first_n_nulls, - )) + Arc::new( + PrimitiveArray::::new(ScalarBuffer::from(first_n), first_n_nulls) + .with_data_type(self.data_type.clone()), + ) } }