Add a CardinalityAwareRowConverter #4736

Status: Closed · wants to merge 7 commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
arrow-row/src/lib.rs (91 additions, 0 deletions)

@@ -131,6 +131,7 @@
use std::sync::Arc;

use arrow_array::cast::*;
use arrow_array::*;
use arrow_array::types::*;
use arrow_buffer::ArrowNativeType;
use arrow_data::ArrayDataBuilder;
use arrow_schema::*;
@@ -1445,6 +1446,70 @@ unsafe fn decode_column(
    Ok(array)
}

/// Downcast a `dyn Array` to a `DictionaryArray` with the given key type.
macro_rules! downcast_dict {
    ($array:ident, $key:ident) => {{
        $array
            .as_any()
            .downcast_ref::<DictionaryArray<$key>>()
            .unwrap()
    }};
}

/// Dictionary columns whose values array has at least this many entries are
/// treated as high cardinality, and their dictionary encoding is not
/// preserved in the row format.
const LOW_CARDINALITY_THRESHOLD: usize = 10;

/// A wrapper around [`RowConverter`] that inspects the dictionary columns of
/// the first batch it converts and, for high-cardinality dictionaries,
/// disables dictionary preservation so that values are encoded directly.
/// The decision is made once, on the first batch, and reused for all
/// subsequent batches.
#[derive(Debug)]
pub struct CardinalityAwareRowConverter {
    inner: RowConverter,
    done: bool,
}

impl CardinalityAwareRowConverter {
    pub fn new(fields: Vec<SortField>) -> Result<Self, ArrowError> {
        Ok(Self {
            inner: RowConverter::new(fields)?,
            done: false,
        })
    }

    pub fn size(&self) -> usize {
        self.inner.size()
    }

    pub fn convert_rows(&self, rows: &Rows) -> Result<Vec<ArrayRef>, ArrowError> {
        self.inner.convert_rows(rows)
    }

    pub fn convert_columns(&mut self, columns: &[ArrayRef]) -> Result<Rows, ArrowError> {
        if !self.done {
            for (i, col) in columns.iter().enumerate() {
                if let DataType::Dictionary(k, _) = col.data_type() {
                    let cardinality = match k.as_ref() {
Review comment (Contributor):

I originally thought that we should base the decision of "is this a high-cardinality column" on the "in use" cardinality of the dictionary (i.e. how many distinct key values there are -- as suggested on apache/datafusion#7200 (comment)).

However, I now realize that the number of potential key values (i.e. the length of the values array) may actually be a more robust predictor of "high cardinality", since the other values in the dictionary could be used in subsequent batches.

Do you have any opinion @tustvold?

Review comment (Contributor):

Given the RowConverter blindly generates a mapping for all values, regardless of whether they appear in the keys, I think we should just use the length of the values. While an argument could be made for doing something more sophisticated, that would only really make sense if the dictionary interner itself followed a similar approach.

Review comment (Contributor):

Makes sense. Thank you.
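
As a minimal sketch of the two measures discussed in this thread (not part of this PR; a standalone program against the public arrow-array API):

use std::collections::HashSet;
use std::sync::Arc;

use arrow_array::types::Int32Type;
use arrow_array::{Array, DictionaryArray, Int32Array, StringArray};

fn main() {
    // Five potential values, but only two distinct keys actually in use.
    let values = StringArray::from_iter_values(["a", "b", "c", "d", "e"]);
    let keys = Int32Array::from(vec![0, 0, 1, 1, 0]);
    let dict = DictionaryArray::<Int32Type>::try_new(keys, Arc::new(values)).unwrap();

    // Potential cardinality: the length of the values array (what this PR uses).
    let potential = dict.values().len();

    // "In use" cardinality: the number of distinct keys that appear.
    let in_use = dict.keys().iter().flatten().collect::<HashSet<_>>().len();

    assert_eq!(potential, 5);
    assert_eq!(in_use, 2);
}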

                        DataType::Int8 => downcast_dict!(col, Int8Type).values().len(),
                        DataType::Int16 => downcast_dict!(col, Int16Type).values().len(),
                        DataType::Int32 => downcast_dict!(col, Int32Type).values().len(),
                        DataType::Int64 => downcast_dict!(col, Int64Type).values().len(),
                        DataType::UInt8 => downcast_dict!(col, UInt8Type).values().len(),
                        DataType::UInt16 => downcast_dict!(col, UInt16Type).values().len(),
                        DataType::UInt32 => downcast_dict!(col, UInt32Type).values().len(),
                        DataType::UInt64 => downcast_dict!(col, UInt64Type).values().len(),
                        _ => unreachable!(),
                    };

                    if cardinality >= LOW_CARDINALITY_THRESHOLD {
Review comment (Contributor):

This will effectively switch the encoding mid-stream, I think -- which will mean that the output can't be compared with previously created rows, which is not correct.

I think the decision has to be made based on the first batch, and that decision then used for encoding all rows.

Review comment (Contributor, Author):

@alamb I'm sorry, I didn't quite get it. I thought that was what I was doing: tapping into the first batch of the stream, inspecting it, setting the right codec (whether or not to use the interner), and then letting the conversion run for all the batches (including the first one).
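
(In the code above, the `done` flag pins this decision: it is set after the first call to `convert_columns`, so the codec choice made for the first batch is reused for every subsequent batch.)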

                        let mut sort_field = self.inner.fields[i].clone();
                        sort_field.preserve_dictionaries = false;
                        self.inner.codecs[i] = Codec::new(&sort_field)?;
                    }
                }
            }
        }
        self.done = true;
        self.inner.convert_columns(columns)
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;
@@ -1464,6 +1529,32 @@ mod tests {
    use super::*;

    #[test]
    fn test_card_aware_row_converter() {
        let values = StringArray::from_iter_values(["a", "b", "c"]);
        let keys = Int32Array::from(vec![0, 0, 1, 2, 2, 1, 1, 0, 2]);
        let a: ArrayRef =
            Arc::new(DictionaryArray::<Int32Type>::try_new(keys, Arc::new(values)).unwrap());
        let b: ArrayRef = Arc::new(StringArray::from_iter(vec![
            Some("a"),
            Some("c"),
            Some("e"),
            Some("g"),
            Some("i"),
            Some("k"),
            Some("m"),
            Some("o"),
            Some("q"),
        ]));
        let cols = [a, b];

        let mut converter = CardinalityAwareRowConverter::new(vec![
            SortField::new(DataType::Dictionary(
                Box::new(DataType::Int32),
                Box::new(DataType::Utf8),
            )),
            SortField::new(DataType::Utf8),
        ])
        .unwrap();

        let rows = converter.convert_columns(&cols).unwrap();
        let back = converter.convert_rows(&rows).unwrap();
        // Only 3 distinct dictionary values, below LOW_CARDINALITY_THRESHOLD,
        // so the dictionary encoding should survive the round trip.
        assert_eq!(back[0].data_type(), cols[0].data_type());
        assert_eq!(back[1].data_type(), cols[1].data_type());
    }

    #[test]
    fn test_fixed_width() {
        let cols = [
            Arc::new(Int16Array::from_iter([