Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into nl_join_reorder
Browse files Browse the repository at this point in the history
  • Loading branch information
korowa committed Apr 1, 2024
2 parents b04e2a7 + d8d521a commit bdd5905
Show file tree
Hide file tree
Showing 14 changed files with 581 additions and 333 deletions.
5 changes: 5 additions & 0 deletions datafusion/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,11 @@
//! overview of how DataFusion is organized and then link to other
//! sections of the docs with more details -->
//!
//! You can find a formal description of DataFusion's architecture in our
//! [SIGMOD 2024 Paper].
//!
//! [SIGMOD 2024 Paper]: https://github.com/apache/arrow-datafusion/files/14789704/DataFusion_Query_Engine___SIGMOD_2024-FINAL.pdf
//!
//! ## Overview Presentations
//!
//! The following presentations offer high level overviews of the
Expand Down
5 changes: 5 additions & 0 deletions datafusion/functions/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,8 @@ required-features = ["datetime_expressions"]
harness = false
name = "to_char"
required-features = ["datetime_expressions"]

[[bench]]
harness = false
name = "substr_index"
required-features = ["unicode_expressions"]
103 changes: 103 additions & 0 deletions datafusion/functions/benches/substr_index.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

extern crate criterion;

use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array, StringArray};
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use rand::distributions::{Alphanumeric, Uniform};
use rand::prelude::Distribution;
use rand::Rng;

use datafusion_expr::ColumnarValue;
use datafusion_functions::unicode::substr_index;

struct Filter<Dist, Test> {
dist: Dist,
test: Test,
}

impl<T, Dist, Test> Distribution<T> for Filter<Dist, Test>
where
Dist: Distribution<T>,
Test: Fn(&T) -> bool,
{
fn sample<R: Rng + ?Sized>(&self, rng: &mut R) -> T {
loop {
let x = self.dist.sample(rng);
if (self.test)(&x) {
return x;
}
}
}
}

fn data() -> (StringArray, StringArray, Int64Array) {
let dist = Filter {
dist: Uniform::new(-4, 5),
test: |x: &i64| x != &0,
};
let mut rng = rand::thread_rng();
let mut strings: Vec<String> = vec![];
let mut delimiters: Vec<String> = vec![];
let mut counts: Vec<i64> = vec![];

for _ in 0..1000 {
let length = rng.gen_range(20..50);
let text: String = (&mut rng)
.sample_iter(&Alphanumeric)
.take(length)
.map(char::from)
.collect();
let char = rng.gen_range(0..text.len());
let delimiter = &text.chars().nth(char).unwrap();
let count = rng.sample(&dist);

strings.push(text);
delimiters.push(delimiter.to_string());
counts.push(count);
}

(
StringArray::from(strings),
StringArray::from(delimiters),
Int64Array::from(counts),
)
}

fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("substr_index_array_array_1000", |b| {
let (strings, delimiters, counts) = data();
let strings = ColumnarValue::Array(Arc::new(strings) as ArrayRef);
let delimiters = ColumnarValue::Array(Arc::new(delimiters) as ArrayRef);
let counts = ColumnarValue::Array(Arc::new(counts) as ArrayRef);

let args = [strings, delimiters, counts];
b.iter(|| {
black_box(
substr_index()
.invoke(&args)
.expect("substr_index should work on valid values"),
)
})
});
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
107 changes: 106 additions & 1 deletion datafusion/functions/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,112 @@ macro_rules! make_math_unary_udf {
};
}

#[macro_export]
/// Macro to create a binary math UDF.
///
/// A binary math function takes two arguments of types Float32 or Float64,
/// applies a binary floating function to the argument, and returns a value of the same type.
///
/// $UDF: the name of the UDF struct that implements `ScalarUDFImpl`
/// $GNAME: a singleton instance of the UDF
/// $NAME: the name of the function
/// $BINARY_FUNC: the binary function to apply to the argument
/// $MONOTONIC_FUNC: the monotonicity of the function
macro_rules! make_math_binary_udf {
($UDF:ident, $GNAME:ident, $NAME:ident, $BINARY_FUNC:ident, $MONOTONICITY:expr) => {
make_udf_function!($NAME::$UDF, $GNAME, $NAME);

mod $NAME {
use arrow::array::{ArrayRef, Float32Array, Float64Array};
use arrow::datatypes::DataType;
use datafusion_common::{exec_err, DataFusionError, Result};
use datafusion_expr::TypeSignature::*;
use datafusion_expr::{
ColumnarValue, FuncMonotonicity, ScalarUDFImpl, Signature, Volatility,
};
use std::any::Any;
use std::sync::Arc;

#[derive(Debug)]
pub struct $UDF {
signature: Signature,
}

impl $UDF {
pub fn new() -> Self {
use DataType::*;
Self {
signature: Signature::one_of(
vec![
Exact(vec![Float32, Float32]),
Exact(vec![Float64, Float64]),
],
Volatility::Immutable,
),
}
}
}

impl ScalarUDFImpl for $UDF {
fn as_any(&self) -> &dyn Any {
self
}
fn name(&self) -> &str {
stringify!($NAME)
}

fn signature(&self) -> &Signature {
&self.signature
}

fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
let arg_type = &arg_types[0];

match arg_type {
DataType::Float32 => Ok(DataType::Float32),
// For other types (possible values float64/null/int), use Float64
_ => Ok(DataType::Float64),
}
}

fn monotonicity(&self) -> Result<Option<FuncMonotonicity>> {
Ok($MONOTONICITY)
}

fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
let args = ColumnarValue::values_to_arrays(args)?;

let arr: ArrayRef = match args[0].data_type() {
DataType::Float64 => Arc::new(make_function_inputs2!(
&args[0],
&args[1],
"y",
"x",
Float64Array,
{ f64::$BINARY_FUNC }
)),

DataType::Float32 => Arc::new(make_function_inputs2!(
&args[0],
&args[1],
"y",
"x",
Float32Array,
{ f32::$BINARY_FUNC }
)),
other => {
return exec_err!(
"Unsupported data type {other:?} for function {}",
self.name()
)
}
};
Ok(ColumnarValue::Array(arr))
}
}
}
};
}

macro_rules! make_function_inputs2 {
($ARG1: expr, $ARG2: expr, $NAME1:expr, $NAME2: expr, $ARRAY_TYPE:ident, $FUNC: block) => {{
let arg1 = downcast_arg!($ARG1, $NAME1, $ARRAY_TYPE);
Expand Down
140 changes: 0 additions & 140 deletions datafusion/functions/src/math/atan2.rs

This file was deleted.

Loading

0 comments on commit bdd5905

Please sign in to comment.