Skip to content

Commit

Permalink
feat: add fragment take / fixed-size-binary support to v2 format (#2354)
Browse files Browse the repository at this point in the history
This also enables a lot of tests to run on both the v1 and the v2 format
  • Loading branch information
westonpace authored May 21, 2024
1 parent 7acac5f commit 6bfe5d0
Show file tree
Hide file tree
Showing 12 changed files with 642 additions and 173 deletions.
46 changes: 46 additions & 0 deletions rust/lance-datagen/src/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,47 @@ impl<T: ArrowPrimitiveType + Send + Sync> ArrayGenerator for RandomBytesGenerato
}
}

/// Generator of random `FixedSizeBinary` arrays.
///
/// This is pretty much the same thing as `RandomBytesGenerator`, but we can't use that
/// because it is generic over an `ArrowPrimitiveType` and there is no
/// `ArrowPrimitiveType` for FixedSizeBinary.
pub struct RandomFixedSizeBinaryGenerator {
/// Cached `DataType::FixedSizeBinary(size)`, handed out by reference from `data_type()`.
data_type: DataType,
/// Width of each generated value in bytes (kept as `i32` to match `DataType::FixedSizeBinary`).
size: i32,
}

impl RandomFixedSizeBinaryGenerator {
    /// Build a generator whose values are each `size` bytes wide.
    ///
    /// The matching `DataType::FixedSizeBinary(size)` is computed once here and stored
    /// alongside the width.
    fn new(size: i32) -> Self {
        let data_type = DataType::FixedSizeBinary(size);
        Self { data_type, size }
    }
}

impl ArrayGenerator for RandomFixedSizeBinaryGenerator {
fn generate(
&mut self,
length: RowCount,
rng: &mut rand_xoshiro::Xoshiro256PlusPlus,
) -> Result<Arc<dyn arrow_array::Array>, ArrowError> {
let num_bytes = length.0 * self.size as u64;
let mut bytes = vec![0; num_bytes as usize];
rng.fill_bytes(&mut bytes);
Ok(Arc::new(FixedSizeBinaryArray::new(
self.size,
Buffer::from(bytes),
None,
)))
}

fn data_type(&self) -> &DataType {
&self.data_type
}

fn element_size_bytes(&self) -> Option<ByteCount> {
Some(ByteCount::from(self.size as u64))
}
}

pub struct RandomBinaryGenerator {
bytes_per_element: ByteCount,
scale_to_utf8: bool,
Expand Down Expand Up @@ -1416,6 +1457,10 @@ pub mod array {
Box::new(RandomBytesGenerator::<T>::new(data_type))
}

/// Create a generator of random fixed-size binary values
///
/// Each generated value is `size` bytes wide.
pub fn rand_fsb(size: i32) -> Box<dyn ArrayGenerator> {
    let generator = RandomFixedSizeBinaryGenerator::new(size);
    Box::new(generator)
}

/// Create a generator of randomly sampled date32 values
///
/// Instead of sampling the entire range, all values will be drawn from the last year as this
Expand Down Expand Up @@ -1610,6 +1655,7 @@ pub mod array {
rand_type(child.data_type()),
Dimension::from(*dimension as u32),
),
DataType::FixedSizeBinary(size) => rand_fsb(*size),
DataType::List(child) => rand_list(child.data_type()),
DataType::Duration(unit) => match unit {
TimeUnit::Second => rand::<DurationSecondType>(),
Expand Down
2 changes: 1 addition & 1 deletion rust/lance-encoding/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ impl DecodeBatchScheduler {
} else {
match data_type {
// DataType::is_primitive doesn't consider these primitive but we do
DataType::Boolean | DataType::Null => true,
DataType::Boolean | DataType::Null | DataType::FixedSizeBinary(_) => true,
DataType::FixedSizeList(inner, _) => Self::is_primitive(inner.data_type()),
_ => false,
}
Expand Down
1 change: 1 addition & 0 deletions rust/lance-encoding/src/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ impl BatchEncoder {
| DataType::UInt32
| DataType::UInt64
| DataType::UInt8
| DataType::FixedSizeBinary(_)
| DataType::FixedSizeList(_, _) => {
let my_col_idx = *col_idx;
*col_idx += 1;
Expand Down
13 changes: 12 additions & 1 deletion rust/lance-encoding/src/encodings/logical/primitive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use arrow_array::{
TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
TimestampSecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
},
ArrayRef, BooleanArray, FixedSizeListArray, PrimitiveArray,
ArrayRef, BooleanArray, FixedSizeBinaryArray, FixedSizeListArray, PrimitiveArray,
};
use arrow_buffer::{BooleanBuffer, Buffer, NullBuffer, ScalarBuffer};
use arrow_schema::{DataType, IntervalUnit, TimeUnit};
Expand Down Expand Up @@ -356,6 +356,17 @@ impl PrimitiveFieldDecodeTask {
DataType::UInt8 => Ok(Self::new_primitive_array::<UInt8Type>(
buffers, num_rows, data_type,
)),
DataType::FixedSizeBinary(dimension) => {
let mut buffers_iter = buffers.into_iter();
let fsb_validity = buffers_iter.next().unwrap();
let fsb_nulls = Self::bytes_to_validity(fsb_validity, num_rows);

let fsb_values = buffers_iter.next().unwrap();
let fsb_values = Buffer::from_bytes(fsb_values.freeze().into());
Ok(Arc::new(FixedSizeBinaryArray::new(
*dimension, fsb_values, fsb_nulls,
)))
}
DataType::FixedSizeList(items, dimension) => {
let mut buffers_iter = buffers.into_iter();
let fsl_validity = buffers_iter.next().unwrap();
Expand Down
11 changes: 6 additions & 5 deletions rust/lance-encoding/src/encodings/physical/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,17 +130,17 @@ pub struct ValueEncoder {

impl ValueEncoder {
pub fn try_new(data_type: &DataType) -> Result<Self> {
if data_type.is_primitive() {
if *data_type == DataType::Boolean {
Ok(Self {
buffer_encoder: Box::<FlatBufferEncoder>::default(),
buffer_encoder: Box::<BitmapBufferEncoder>::default(),
})
} else if *data_type == DataType::Boolean {
} else if data_type.is_fixed_stride() {
Ok(Self {
buffer_encoder: Box::<BitmapBufferEncoder>::default(),
buffer_encoder: Box::<FlatBufferEncoder>::default(),
})
} else {
Err(Error::invalid_input(
format!("Cannot use value encoded to encode {}", data_type),
format!("Cannot use ValueEncoder to encode {}", data_type),
location!(),
))
}
Expand Down Expand Up @@ -189,6 +189,7 @@ pub(crate) mod tests {
use crate::testing::check_round_trip_encoding_random;

const PRIMITIVE_TYPES: &[DataType] = &[
DataType::FixedSizeBinary(2),
DataType::Date32,
DataType::Date64,
DataType::Int8,
Expand Down
22 changes: 22 additions & 0 deletions rust/lance-io/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,28 @@ pub enum ReadBatchParams {
Indices(UInt32Array),
}

impl std::fmt::Display for ReadBatchParams {
    /// Render the parameters as the variant name followed by its bounds, or, for
    /// `Indices`, by the comma-separated list of indices (no trailing comma).
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match self {
            Self::Range(range) => write!(f, "Range({}..{})", range.start, range.end),
            Self::RangeFull => f.write_str("RangeFull"),
            Self::RangeTo(range) => write!(f, "RangeTo({})", range.end),
            Self::RangeFrom(range) => write!(f, "RangeFrom({})", range.start),
            Self::Indices(indices) => {
                // An empty index list yields an empty string, i.e. "Indices()".
                let rendered = indices
                    .values()
                    .iter()
                    .map(|index| index.to_string())
                    .collect::<Vec<_>>()
                    .join(",");
                write!(f, "Indices({})", rendered)
            }
        }
    }
}

impl Default for ReadBatchParams {
fn default() -> Self {
// Default of ReadBatchParams is reading the full batch.
Expand Down
Loading

0 comments on commit 6bfe5d0

Please sign in to comment.