Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement bitmap_distinct function using bitmap #1827

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ballista/rust/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ enum AggregateFunction {
CORRELATION=13;
APPROX_PERCENTILE_CONT = 14;
APPROX_MEDIAN=15;
BITMAP_DISTINCT=16;
}

message AggregateExprNode {
Expand Down
4 changes: 4 additions & 0 deletions ballista/rust/core/src/serde/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,9 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
AggregateFunction::ApproxMedian => {
protobuf::AggregateFunction::ApproxMedian
}
AggregateFunction::BitMapCountDistinct=> {
protobuf::AggregateFunction::BitmapDistinct
}
};

let aggregate_expr = protobuf::AggregateExprNode {
Expand Down Expand Up @@ -995,6 +998,7 @@ impl From<&AggregateFunction> for protobuf::AggregateFunction {
AggregateFunction::Correlation => Self::Correlation,
AggregateFunction::ApproxPercentileCont => Self::ApproxPercentileCont,
AggregateFunction::ApproxMedian => Self::ApproxMedian,
AggregateFunction::BitMapCountDistinct => Self::BitmapDistinct,
}
}
}
Expand Down
1 change: 1 addition & 0 deletions ballista/rust/core/src/serde/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ impl From<protobuf::AggregateFunction> for AggregateFunction {
AggregateFunction::ApproxPercentileCont
}
protobuf::AggregateFunction::ApproxMedian => AggregateFunction::ApproxMedian,
protobuf::AggregateFunction::BitmapDistinct => AggregateFunction::BitMapCountDistinct,
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions datafusion-expr/src/aggregate_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ pub enum AggregateFunction {
ApproxPercentileCont,
/// ApproxMedian
ApproxMedian,
/// BitMap count distinct function
BitMapCountDistinct,
}

impl fmt::Display for AggregateFunction {
Expand Down Expand Up @@ -87,6 +89,7 @@ impl FromStr for AggregateFunction {
"corr" => AggregateFunction::Correlation,
"approx_percentile_cont" => AggregateFunction::ApproxPercentileCont,
"approx_median" => AggregateFunction::ApproxMedian,
"bitmap_distinct" => AggregateFunction::BitMapCountDistinct,
_ => {
return Err(DataFusionError::Plan(format!(
"There is no built-in function named {}",
Expand Down
9 changes: 9 additions & 0 deletions datafusion-expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,15 @@ pub fn approx_percentile_cont(expr: Expr, percentile: Expr) -> Expr {
}
}

/// Returns the precise number of distinct input values using bitmap.
pub fn bitmap_count_distinct(expr: Expr) -> Expr {
Expr::AggregateFunction {
fun: aggregate_function::AggregateFunction::BitMapCountDistinct,
distinct: false,
args: vec![expr],
}
}

// TODO(kszucs): this seems buggy, unary_scalar_expr! is used for many
// varying arity functions
/// Create an convenience function representing a unary scalar function
Expand Down
1 change: 1 addition & 0 deletions datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ num-traits = { version = "0.2", optional = true }
pyo3 = { version = "0.15", optional = true }
tempfile = "3"
parking_lot = "0.12"
roaring = "0.8.1"

[dev-dependencies]
criterion = "0.3"
Expand Down
16 changes: 12 additions & 4 deletions datafusion/src/physical_plan/aggregates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ pub fn return_type(

match fun {
// TODO If the datafusion is compatible with PostgreSQL, the returned data type should be INT64.
AggregateFunction::Count | AggregateFunction::ApproxDistinct => {
Ok(DataType::UInt64)
}
AggregateFunction::Count
| AggregateFunction::ApproxDistinct
| AggregateFunction::BitMapCountDistinct => Ok(DataType::UInt64),
AggregateFunction::Max | AggregateFunction::Min => {
// For min and max agg function, the returned type is same as input type.
// The coerced_data_types is same with input_types.
Expand Down Expand Up @@ -281,6 +281,13 @@ pub fn create_aggregate_expr(
"MEDIAN(DISTINCT) aggregations are not available".to_string(),
));
}
(AggregateFunction::BitMapCountDistinct, _) => {
Arc::new(expressions::BitMapDistinct::new(
coerced_phy_exprs[0].clone(),
name,
coerced_exprs_types[0].clone(),
))
}
})
}

Expand Down Expand Up @@ -314,7 +321,8 @@ pub(super) fn signature(fun: &AggregateFunction) -> Signature {
match fun {
AggregateFunction::Count
| AggregateFunction::ApproxDistinct
| AggregateFunction::ArrayAgg => Signature::any(1, Volatility::Immutable),
| AggregateFunction::ArrayAgg
| AggregateFunction::BitMapCountDistinct=> Signature::any(1, Volatility::Immutable),
AggregateFunction::Min | AggregateFunction::Max => {
let valid = STRINGS
.iter()
Expand Down
11 changes: 10 additions & 1 deletion datafusion/src/physical_plan/coercion_rule/aggregate_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::physical_plan::aggregates::AggregateFunction;
use crate::physical_plan::expressions::{
is_avg_support_arg_type, is_correlation_support_arg_type,
is_covariance_support_arg_type, is_stddev_support_arg_type, is_sum_support_arg_type,
is_variance_support_arg_type, try_cast,
is_variance_support_arg_type, is_bitmap_count_distinct_supported_arg_type, try_cast,
};
use crate::physical_plan::functions::{Signature, TypeSignature};
use crate::physical_plan::PhysicalExpr;
Expand Down Expand Up @@ -163,6 +163,15 @@ pub(crate) fn coerce_types(
}
Ok(input_types.to_vec())
}
AggregateFunction::BitMapCountDistinct => {
if !is_bitmap_count_distinct_supported_arg_type(&input_types[0]) {
return Err(DataFusionError::Plan(format!(
"The function {:?} does not support inputs of type {:?}.",
agg_fun, input_types[0]
)));
}
Ok(input_types.to_vec())
}
}
}

Expand Down
217 changes: 217 additions & 0 deletions datafusion/src/physical_plan/expressions/bitmap_distinct.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Defines physical expressions that can evaluated at runtime during query execution

use std::any::Any;

use std::fmt::Debug;
use std::ops::BitOrAssign;
use std::sync::Arc;

use arrow::array::{
Array, ArrayRef, BinaryArray, Int16Array, Int32Array, Int8Array, UInt16Array,
UInt32Array, UInt8Array,
};
use arrow::datatypes::{DataType, Field};
use log::info;
use roaring::RoaringBitmap;

use crate::error::{DataFusionError, Result};
use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
use crate::scalar::ScalarValue;

use super::format_state_name;

/// APPROX_DISTINCT aggregate expression
#[derive(Debug)]
pub struct BitMapDistinct {
name: String,
input_data_type: DataType,
expr: Arc<dyn PhysicalExpr>,
}

impl BitMapDistinct {
/// Create a new ApproxDistinct aggregate function.
pub fn new(
expr: Arc<dyn PhysicalExpr>,
name: impl Into<String>,
input_data_type: DataType,
) -> Self {
Self {
name: name.into(),
input_data_type,
expr,
}
}
}

impl AggregateExpr for BitMapDistinct {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
self
}

/// the field of the final result of this aggregation.
fn field(&self) -> Result<Field> {
Ok(Field::new(&self.name, DataType::UInt64, false))
}

fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
let accumulator: Box<dyn Accumulator> = match &self.input_data_type {
DataType::UInt8
| DataType::UInt16
| DataType::UInt32
| DataType::Int8
| DataType::Int16
| DataType::Int32 => Box::new(BitmapDistinctCountAccumulator::try_new()),
//DataType::UInt64 => Box::new(),
//DataType::Int64 => Box::new(),
other => {
return Err(DataFusionError::NotImplemented(format!(
"Support for 'bitmap_distinct' for data type {} is not implemented",
other
)))
}
};
Ok(accumulator)
}

fn state_fields(&self) -> Result<Vec<Field>> {
Ok(vec![Field::new(
&format_state_name(&self.name, "bitmap_registers"),
DataType::Binary,
false,
)])
}

fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
vec![self.expr.clone()]
}

fn name(&self) -> &str {
&self.name
}
}

#[derive(Debug)]
struct BitmapDistinctCountAccumulator {
bitmap: roaring::bitmap::RoaringBitmap,
}

impl BitmapDistinctCountAccumulator {
fn try_new() -> Self {
Self {
bitmap: RoaringBitmap::new(),
}
}
}

impl Accumulator for BitmapDistinctCountAccumulator {
//state() can be used by physical nodes to aggregate states together and send them over the network/threads, to combine values.
fn state(&self) -> Result<Vec<ScalarValue>> {
let mut bytes = vec![];
self.bitmap.serialize_into(&mut bytes).unwrap();
Ok(vec![ScalarValue::Binary(Some(bytes))])
}

fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
//implement this in arrow-rs with simd
let value = &values[0];
if value.is_empty() {
info!("BitmapDistinctCountAccumulator update_batch in empty batch");
return Ok(());
}
match value.data_type() {
DataType::Int8 => {
let array = value.as_any().downcast_ref::<Int8Array>().unwrap();
for i in 0..array.len() {
self.bitmap.insert(array.value(i) as u32);
}
}
DataType::Int16 => {
let array = value.as_any().downcast_ref::<Int16Array>().unwrap();
for i in 0..array.len() {
self.bitmap.insert(array.value(i) as u32);
}
}
DataType::Int32 => {
let array = value.as_any().downcast_ref::<Int32Array>().unwrap();
for i in 0..array.len() {
self.bitmap.insert(array.value(i) as u32);
}
}
DataType::UInt8 => {
let array = value.as_any().downcast_ref::<UInt8Array>().unwrap();
for i in 0..array.len() {
self.bitmap.insert(array.value(i) as u32);
}
}
DataType::UInt16 => {
let array = value.as_any().downcast_ref::<UInt16Array>().unwrap();
for i in 0..array.len() {
self.bitmap.insert(array.value(i) as u32);
}
}
DataType::UInt32 => {
let array = value.as_any().downcast_ref::<UInt32Array>().unwrap();
for i in 0..array.len() {
self.bitmap.insert(array.value(i));
}
}
e => {
return Err(DataFusionError::Internal(format!(
"BITMAP_COUNT_DISTINCT is not expected to receive the type {:?}",
e
)));
}
}
Ok(())
}

fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
assert_eq!(1, states.len(), "expect only 1 element in the states");
let binary_array = states[0].as_any().downcast_ref::<BinaryArray>().unwrap();

for b in binary_array.iter() {
let v = b.ok_or_else(|| {
DataFusionError::Internal(
"Impossibly got empty binary array from states".into(),
)
})?;
let bitmap = RoaringBitmap::deserialize_from(&v.to_vec()[..]).unwrap();
self.bitmap.bitor_assign(bitmap);
}
Ok(())
}

fn evaluate(&self) -> Result<ScalarValue> {
Ok(ScalarValue::from(self.bitmap.len()))
}
}

pub(crate) fn is_bitmap_count_distinct_supported_arg_type(arg_type: &DataType) -> bool {
matches!(
arg_type,
DataType::UInt8
| DataType::UInt16
| DataType::UInt32
| DataType::Int8
| DataType::Int16
| DataType::Int32
)
}
4 changes: 4 additions & 0 deletions datafusion/src/physical_plan/expressions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ mod approx_distinct;
mod approx_percentile_cont;
mod array_agg;
mod average;
mod bitmap_distinct;
#[macro_use]
mod binary;
mod case;
Expand Down Expand Up @@ -112,6 +113,9 @@ pub use try_cast::{try_cast, TryCastExpr};
pub(crate) use variance::{
is_variance_support_arg_type, variance_return_type, Variance, VariancePop,
};
pub(crate) use bitmap_distinct::{
is_bitmap_count_distinct_supported_arg_type, BitMapDistinct
};

/// returns the name of the state
pub fn format_state_name(name: &str, state_name: &str) -> String {
Expand Down