From fdd2e48c9ddfafc93471e9a7a0d0618651fa3906 Mon Sep 17 00:00:00 2001
From: Yijun Zhao
Date: Sun, 31 Mar 2024 23:07:31 +0800
Subject: [PATCH] read to view type using offset_buffer

---
 arrow-array/src/array/byte_view_array.rs      |   6 +-
 parquet/src/arrow/array_reader/builder.rs     |   2 +-
 parquet/src/arrow/array_reader/byte_array.rs  | 144 +++-
 .../src/arrow/array_reader/byte_view_array.rs | 638 ------------------
 parquet/src/arrow/array_reader/mod.rs         |   1 -
 parquet/src/arrow/buffer/mod.rs               |   1 -
 parquet/src/arrow/buffer/offset_buffer.rs     |  99 ++-
 parquet/src/arrow/buffer/view_buffer.rs       | 119 ----
 8 files changed, 233 insertions(+), 777 deletions(-)
 delete mode 100644 parquet/src/arrow/array_reader/byte_view_array.rs
 delete mode 100644 parquet/src/arrow/buffer/view_buffer.rs

diff --git a/arrow-array/src/array/byte_view_array.rs b/arrow-array/src/array/byte_view_array.rs
index 10c3e6379d05..980dd2a48979 100644
--- a/arrow-array/src/array/byte_view_array.rs
+++ b/arrow-array/src/array/byte_view_array.rs
@@ -403,7 +403,7 @@ pub type BinaryViewArray = GenericByteViewArray<BinaryViewType>;
 impl BinaryViewArray {
     /// Convert the [`BinaryViewArray`] to [`StringViewArray`]
     /// If items not utf8 data, validate will fail and error returned.
-    pub fn to_stringview(&self) -> Result<StringViewArray, ArrowError> {
+    pub fn to_stringview(self) -> Result<StringViewArray, ArrowError> {
         StringViewType::validate(self.views(), self.data_buffers())?;
         unsafe { Ok(self.to_stringview_unchecked()) }
     }
@@ -411,8 +411,8 @@ impl BinaryViewArray {
     /// Convert the [`BinaryViewArray`] to [`StringViewArray`]
     /// # Safety
     /// Caller is responsible for ensuring that items in array are utf8 data.
-    pub unsafe fn to_stringview_unchecked(&self) -> StringViewArray {
-        StringViewArray::new_unchecked(self.views.clone(), self.buffers.clone(), self.nulls.clone())
+    pub unsafe fn to_stringview_unchecked(self) -> StringViewArray {
+        StringViewArray::new_unchecked(self.views, self.buffers, self.nulls)
     }
 }
 
diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs
index 945f62526a7e..958594c93232 100644
--- a/parquet/src/arrow/array_reader/builder.rs
+++ b/parquet/src/arrow/array_reader/builder.rs
@@ -19,7 +19,7 @@ use std::sync::Arc;
 
 use arrow_schema::{DataType, Fields, SchemaBuilder};
 
-use crate::arrow::array_reader::byte_view_array::make_byte_view_array_reader;
+use crate::arrow::array_reader::byte_array::make_byte_view_array_reader;
 use crate::arrow::array_reader::empty_array::make_empty_array_reader;
 use crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_reader;
 use crate::arrow::array_reader::{
diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs
index 19086878c151..d0aa6f7b1ebe 100644
--- a/parquet/src/arrow/array_reader/byte_array.rs
+++ b/parquet/src/arrow/array_reader/byte_array.rs
@@ -74,6 +74,36 @@ pub fn make_byte_array_reader(
     }
 }
 
+/// Returns an [`ArrayReader`] that decodes the provided byte array column to view types.
+pub fn make_byte_view_array_reader(
+    pages: Box<dyn PageIterator>,
+    column_desc: ColumnDescPtr,
+    arrow_type: Option<ArrowType>,
+) -> Result<Box<dyn ArrayReader>> {
+    // Check if Arrow type is specified, else create it from Parquet type
+    let data_type = match arrow_type {
+        Some(t) => t,
+        None => match parquet_to_arrow_field(column_desc.as_ref())?.data_type() {
+            ArrowType::Utf8 | ArrowType::Utf8View => ArrowType::Utf8View,
+            _ => ArrowType::BinaryView,
+        },
+    };
+
+    match data_type {
+        ArrowType::BinaryView | ArrowType::Utf8View => {
+            let reader = GenericRecordReader::new(column_desc);
+            Ok(Box::new(ByteArrayReader::<i32>::new(
+                pages, data_type, reader,
+            )))
+        }
+
+        _ => Err(general_err!(
+            "invalid data type for byte array reader read to view type - {}",
+            data_type
+        )),
+    }
+}
+
 /// An [`ArrayReader`] for variable length byte arrays
 struct ByteArrayReader<I: OffsetSizeTrait> {
     data_type: ArrowType,
@@ -588,7 +618,7 @@ mod tests {
     use super::*;
     use crate::arrow::array_reader::test_util::{byte_array_all_encodings, utf8_column};
     use crate::arrow::record_reader::buffer::ValuesBuffer;
-    use arrow_array::{Array, StringArray};
+    use arrow_array::{Array, StringArray, StringViewArray};
     use arrow_buffer::Buffer;
 
     #[test]
@@ -646,6 +676,64 @@
         }
     }
 
+    #[test]
+    fn test_byte_array_string_view_decoder() {
+        let (pages, encoded_dictionary) =
+            byte_array_all_encodings(vec!["hello", "world", "large payload over 12 bytes", "b"]);
+
+        let column_desc = utf8_column();
+        let mut decoder = ByteArrayColumnValueDecoder::new(&column_desc);
+
+        decoder
+            .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false)
+            .unwrap();
+
+        for (encoding, page) in pages {
+            let mut output = OffsetBuffer::<i32>::default();
+            decoder.set_data(encoding, page, 4, Some(4)).unwrap();
+
+            assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);
+
+            assert_eq!(output.values.as_slice(), "hello".as_bytes());
+            assert_eq!(output.offsets.as_slice(), &[0, 5]);
+
+            assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);
+            assert_eq!(output.values.as_slice(), "helloworld".as_bytes());
+            assert_eq!(output.offsets.as_slice(), &[0, 5, 10]);
+
+            assert_eq!(decoder.read(&mut output, 2).unwrap(), 2);
+            assert_eq!(
+                output.values.as_slice(),
+                "helloworldlarge payload over 12 bytesb".as_bytes()
+            );
+            assert_eq!(output.offsets.as_slice(), &[0, 5, 10, 37, 38]);
+
+            assert_eq!(decoder.read(&mut output, 4).unwrap(), 0);
+
+            let valid = [false, false, true, true, false, true, true, false, false];
+            let valid_buffer = Buffer::from_iter(valid.iter().cloned());
+
+            output.pad_nulls(0, 4, valid.len(), valid_buffer.as_slice());
+            let array = output.into_array(Some(valid_buffer), ArrowType::Utf8View);
+            let strings = array.as_any().downcast_ref::<StringViewArray>().unwrap();
+
+            assert_eq!(
+                strings.iter().collect::<Vec<_>>(),
+                vec![
+                    None,
+                    None,
+                    Some("hello"),
+                    Some("world"),
+                    None,
+                    Some("large payload over 12 bytes"),
+                    Some("b"),
+                    None,
+                    None,
+                ]
+            );
+        }
+    }
+
     #[test]
     fn test_byte_array_decoder_skip() {
         let (pages, encoded_dictionary) =
@@ -690,6 +778,60 @@
         }
     }
 
+    #[test]
+    fn test_byte_array_string_view_decoder_skip() {
+        let (pages, encoded_dictionary) =
+            byte_array_all_encodings(vec!["hello", "world", "a", "large payload over 12 bytes"]);
+
+        let column_desc = utf8_column();
+        let mut decoder = ByteArrayColumnValueDecoder::new(&column_desc);
+
+        decoder
+            .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false)
+            .unwrap();
+
+        for (encoding, page) in pages {
+            let mut output = OffsetBuffer::<i32>::default();
+            decoder.set_data(encoding, page, 4, Some(4)).unwrap();
+
+            assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);
+
+            assert_eq!(output.values.as_slice(), "hello".as_bytes());
+            assert_eq!(output.offsets.as_slice(), &[0, 5]);
+
+            assert_eq!(decoder.skip_values(1).unwrap(), 1);
+            assert_eq!(decoder.skip_values(1).unwrap(), 1);
+
+            assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);
+            assert_eq!(
+                output.values.as_slice(),
+                "hellolarge payload over 12 bytes".as_bytes()
+            );
+            assert_eq!(output.offsets.as_slice(), &[0, 5, 32]);
+
+            assert_eq!(decoder.read(&mut output, 4).unwrap(), 0);
+
+            let valid = [false, false, true, true, false, false];
+            let valid_buffer = Buffer::from_iter(valid.iter().cloned());
+
+            output.pad_nulls(0, 2, valid.len(), valid_buffer.as_slice());
+            let array = output.into_array(Some(valid_buffer), ArrowType::Utf8View);
+            let strings = array.as_any().downcast_ref::<StringViewArray>().unwrap();
+
+            assert_eq!(
+                strings.iter().collect::<Vec<_>>(),
+                vec![
+                    None,
+                    None,
+                    Some("hello"),
+                    Some("large payload over 12 bytes"),
+                    None,
+                    None,
+                ]
+            );
+        }
+    }
+
     #[test]
     fn test_byte_array_decoder_nulls() {
         let (pages, encoded_dictionary) = byte_array_all_encodings(Vec::<&str>::new());
diff --git a/parquet/src/arrow/array_reader/byte_view_array.rs b/parquet/src/arrow/array_reader/byte_view_array.rs
deleted file mode 100644
index 972fbdb20552..000000000000
--- a/parquet/src/arrow/array_reader/byte_view_array.rs
+++ /dev/null
@@ -1,638 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
- -use crate::arrow::array_reader::{read_records, skip_records, ArrayReader}; -use crate::arrow::buffer::view_buffer::ViewBuffer; -use crate::arrow::decoder::{DeltaByteArrayDecoder, DictIndexDecoder}; -use crate::arrow::record_reader::GenericRecordReader; -use crate::arrow::schema::parquet_to_arrow_field; -use crate::basic::{ConvertedType, Encoding}; -use crate::column::page::PageIterator; -use crate::column::reader::decoder::ColumnValueDecoder; -use crate::data_type::Int32Type; -use crate::encodings::decoding::{Decoder, DeltaBitPackDecoder}; -use crate::errors::{ParquetError, Result}; -use crate::schema::types::ColumnDescPtr; -use arrow_array::ArrayRef; -use arrow_schema::DataType as ArrowType; -use bytes::Bytes; -use std::any::Any; - -pub fn make_byte_view_array_reader( - pages: Box, - column_desc: ColumnDescPtr, - arrow_type: Option, -) -> Result> { - // Check if Arrow type is specified, else create it from Parquet type - let data_type = match arrow_type { - Some(t) => t, - None => match parquet_to_arrow_field(column_desc.as_ref())?.data_type() { - ArrowType::Utf8 | ArrowType::Utf8View => ArrowType::Utf8View, - _ => ArrowType::BinaryView, - }, - }; - - match data_type { - ArrowType::Utf8View | ArrowType::BinaryView => { - let reader = GenericRecordReader::new(column_desc); - Ok(Box::new(ByteViewArrayReader::new(pages, data_type, reader))) - } - _ => Err(general_err!( - "invalid data type for byte array reader - {}", - data_type - )), - } -} - -/// An [`ArrayReader`] for variable length byte arrays -struct ByteViewArrayReader { - data_type: ArrowType, - pages: Box, - def_levels_buffer: Option>, - rep_levels_buffer: Option>, - record_reader: GenericRecordReader, -} - -impl ByteViewArrayReader { - fn new( - pages: Box, - data_type: ArrowType, - record_reader: GenericRecordReader, - ) -> Self { - Self { - data_type, - pages, - def_levels_buffer: None, - rep_levels_buffer: None, - record_reader, - } - } -} - -impl ArrayReader for ByteViewArrayReader { - fn as_any(&self) -> &dyn Any { - self - } - - fn get_data_type(&self) -> &ArrowType { - &self.data_type - } - - fn read_records(&mut self, batch_size: usize) -> Result { - read_records(&mut self.record_reader, self.pages.as_mut(), batch_size) - } - - fn consume_batch(&mut self) -> Result { - let buffer = self.record_reader.consume_record_data(); - let null_buffer = self.record_reader.consume_bitmap_buffer(); - self.def_levels_buffer = self.record_reader.consume_def_levels(); - self.rep_levels_buffer = self.record_reader.consume_rep_levels(); - self.record_reader.reset(); - - let array: ArrayRef = buffer.into_array(null_buffer, self.data_type.clone()); - - Ok(array) - } - - fn skip_records(&mut self, num_records: usize) -> Result { - skip_records(&mut self.record_reader, self.pages.as_mut(), num_records) - } - - fn get_def_levels(&self) -> Option<&[i16]> { - self.def_levels_buffer.as_deref() - } - - fn get_rep_levels(&self) -> Option<&[i16]> { - self.rep_levels_buffer.as_deref() - } -} - -/// A [`ColumnValueDecoder`] for variable length byte arrays -struct ByteViewArrayColumnValueDecoder { - dict: Option, - decoder: Option, - validate_utf8: bool, -} - -impl ColumnValueDecoder for ByteViewArrayColumnValueDecoder { - type Buffer = ViewBuffer; - - fn new(desc: &ColumnDescPtr) -> Self { - let validate_utf8 = desc.converted_type() == ConvertedType::UTF8; - Self { - dict: None, - decoder: None, - validate_utf8, - } - } - - fn set_dict( - &mut self, - buf: Bytes, - num_values: u32, - encoding: Encoding, - _is_sorted: bool, - ) -> Result<()> { - if 
!matches!( - encoding, - Encoding::PLAIN | Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY - ) { - return Err(nyi_err!( - "Invalid/Unsupported encoding type for dictionary: {}", - encoding - )); - } - - let mut buffer = ViewBuffer::default(); - let mut decoder = ByteViewArrayDecoderPlain::new( - buf, - num_values as usize, - Some(num_values as usize), - self.validate_utf8, - ); - decoder.read(&mut buffer, usize::MAX)?; - self.dict = Some(buffer); - Ok(()) - } - - fn set_data( - &mut self, - encoding: Encoding, - data: Bytes, - num_levels: usize, - num_values: Option, - ) -> Result<()> { - self.decoder = Some(ByteViewArrayDecoder::new( - encoding, - data, - num_levels, - num_values, - self.validate_utf8, - )?); - Ok(()) - } - - fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> Result { - let decoder = self - .decoder - .as_mut() - .ok_or_else(|| general_err!("no decoder set"))?; - - decoder.read(out, num_values, self.dict.as_ref()) - } - - fn skip_values(&mut self, num_values: usize) -> Result { - let decoder = self - .decoder - .as_mut() - .ok_or_else(|| general_err!("no decoder set"))?; - - decoder.skip(num_values, self.dict.as_ref()) - } -} - -/// A generic decoder from uncompressed parquet value data to [`ViewBuffer`] -pub enum ByteViewArrayDecoder { - Plain(ByteViewArrayDecoderPlain), - Dictionary(ByteViewArrayDecoderDictionary), - DeltaLength(ByteViewArrayDecoderDeltaLength), - DeltaByteArray(ByteViewArrayDecoderDelta), -} - -impl ByteViewArrayDecoder { - pub fn new( - encoding: Encoding, - data: Bytes, - num_levels: usize, - num_values: Option, - validate_utf8: bool, - ) -> Result { - let decoder = match encoding { - Encoding::PLAIN => ByteViewArrayDecoder::Plain(ByteViewArrayDecoderPlain::new( - data, - num_levels, - num_values, - validate_utf8, - )), - Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => { - ByteViewArrayDecoder::Dictionary(ByteViewArrayDecoderDictionary::new( - data, num_levels, num_values, - )) - } - Encoding::DELTA_LENGTH_BYTE_ARRAY => ByteViewArrayDecoder::DeltaLength( - ByteViewArrayDecoderDeltaLength::new(data, validate_utf8)?, - ), - Encoding::DELTA_BYTE_ARRAY => ByteViewArrayDecoder::DeltaByteArray( - ByteViewArrayDecoderDelta::new(data, validate_utf8)?, - ), - _ => { - return Err(general_err!( - "unsupported encoding for byte array: {}", - encoding - )) - } - }; - - Ok(decoder) - } - - /// Read up to `len` values to `out` with the optional dictionary - pub fn read( - &mut self, - out: &mut ViewBuffer, - len: usize, - dict: Option<&ViewBuffer>, - ) -> Result { - match self { - ByteViewArrayDecoder::Plain(d) => d.read(out, len), - ByteViewArrayDecoder::Dictionary(d) => { - let dict = - dict.ok_or_else(|| general_err!("missing dictionary page for column"))?; - - d.read(out, dict, len) - } - ByteViewArrayDecoder::DeltaLength(d) => d.read(out, len), - ByteViewArrayDecoder::DeltaByteArray(d) => d.read(out, len), - } - } - - /// Skip `len` values - pub fn skip(&mut self, len: usize, dict: Option<&ViewBuffer>) -> Result { - match self { - ByteViewArrayDecoder::Plain(d) => d.skip(len), - ByteViewArrayDecoder::Dictionary(d) => { - let dict = - dict.ok_or_else(|| general_err!("missing dictionary page for column"))?; - - d.skip(dict, len) - } - ByteViewArrayDecoder::DeltaLength(d) => d.skip(len), - ByteViewArrayDecoder::DeltaByteArray(d) => d.skip(len), - } - } -} - -/// Decoder from [`Encoding::PLAIN`] data to [`ViewBuffer`] -pub struct ByteViewArrayDecoderPlain { - buf: Bytes, - offset: usize, - validate_utf8: bool, - - /// This is a maximum 
as the null count is not always known, e.g. value data from - /// a v1 data page - max_remaining_values: usize, -} - -impl ByteViewArrayDecoderPlain { - pub fn new( - buf: Bytes, - num_levels: usize, - num_values: Option, - validate_utf8: bool, - ) -> Self { - Self { - buf, - validate_utf8, - offset: 0, - max_remaining_values: num_values.unwrap_or(num_levels), - } - } - - pub fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result { - let to_read = len.min(self.max_remaining_values); - - let remaining_bytes = self.buf.len() - self.offset; - if remaining_bytes == 0 { - return Ok(0); - } - - let mut read = 0; - - let buf = self.buf.as_ref(); - while self.offset < self.buf.len() && read != to_read { - if self.offset + 4 > buf.len() { - return Err(ParquetError::EOF("eof decoding byte view array".into())); - } - let len_bytes: [u8; 4] = buf[self.offset..self.offset + 4].try_into().unwrap(); - let len = u32::from_le_bytes(len_bytes); - - let start_offset = self.offset + 4; - let end_offset = start_offset + len as usize; - if end_offset > buf.len() { - return Err(ParquetError::EOF("eof decoding byte view array".into())); - } - - output.try_push(&buf[start_offset..end_offset], self.validate_utf8)?; - - self.offset = end_offset; - read += 1; - } - self.max_remaining_values -= to_read; - - Ok(to_read) - } - - pub fn skip(&mut self, to_skip: usize) -> Result { - let to_skip = to_skip.min(self.max_remaining_values); - let mut skip = 0; - let buf = self.buf.as_ref(); - - while self.offset < self.buf.len() && skip != to_skip { - if self.offset + 4 > buf.len() { - return Err(ParquetError::EOF("eof decoding byte array".into())); - } - let len_bytes: [u8; 4] = buf[self.offset..self.offset + 4].try_into().unwrap(); - let len = u32::from_le_bytes(len_bytes) as usize; - skip += 1; - self.offset = self.offset + 4 + len; - } - self.max_remaining_values -= skip; - Ok(skip) - } -} - -/// Decoder from [`Encoding::DELTA_LENGTH_BYTE_ARRAY`] data to [`ViewBuffer`] -pub struct ByteViewArrayDecoderDeltaLength { - lengths: Vec, - data: Bytes, - length_offset: usize, - data_offset: usize, - validate_utf8: bool, -} - -impl ByteViewArrayDecoderDeltaLength { - fn new(data: Bytes, validate_utf8: bool) -> Result { - let mut len_decoder = DeltaBitPackDecoder::::new(); - len_decoder.set_data(data.clone(), 0)?; - let values = len_decoder.values_left(); - - let mut lengths = vec![0; values]; - len_decoder.get(&mut lengths)?; - - Ok(Self { - lengths, - data, - validate_utf8, - length_offset: 0, - data_offset: len_decoder.get_offset(), - }) - } - - fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result { - let to_read = len.min(self.lengths.len() - self.length_offset); - - let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_read]; - - let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum(); - - if self.data_offset + total_bytes > self.data.len() { - return Err(ParquetError::EOF( - "Insufficient delta length byte array bytes".to_string(), - )); - } - - let mut start_offset = self.data_offset; - for length in src_lengths { - let end_offset = start_offset + *length as usize; - output.try_push( - &self.data.as_ref()[start_offset..end_offset], - self.validate_utf8, - )?; - start_offset = end_offset; - } - - self.data_offset = start_offset; - self.length_offset += to_read; - - Ok(to_read) - } - - fn skip(&mut self, to_skip: usize) -> Result { - let remain_values = self.lengths.len() - self.length_offset; - let to_skip = remain_values.min(to_skip); - - let src_lengths = 
&self.lengths[self.length_offset..self.length_offset + to_skip]; - let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum(); - - self.data_offset += total_bytes; - self.length_offset += to_skip; - Ok(to_skip) - } -} - -/// Decoder from [`Encoding::DELTA_BYTE_ARRAY`] to [`ViewBuffer`] -pub struct ByteViewArrayDecoderDelta { - decoder: DeltaByteArrayDecoder, - validate_utf8: bool, -} - -impl ByteViewArrayDecoderDelta { - fn new(data: Bytes, validate_utf8: bool) -> Result { - Ok(Self { - decoder: DeltaByteArrayDecoder::new(data)?, - validate_utf8, - }) - } - - fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result { - let read = self - .decoder - .read(len, |bytes| output.try_push(bytes, self.validate_utf8))?; - - Ok(read) - } - - fn skip(&mut self, to_skip: usize) -> Result { - self.decoder.skip(to_skip) - } -} - -/// Decoder from [`Encoding::RLE_DICTIONARY`] to [`ViewBuffer`] -pub struct ByteViewArrayDecoderDictionary { - decoder: DictIndexDecoder, -} - -impl ByteViewArrayDecoderDictionary { - fn new(data: Bytes, num_levels: usize, num_values: Option) -> Self { - Self { - decoder: DictIndexDecoder::new(data, num_levels, num_values), - } - } - - fn read(&mut self, output: &mut ViewBuffer, dict: &ViewBuffer, len: usize) -> Result { - // All data must be NULL - if dict.is_empty() { - return Ok(0); - } - - self.decoder.read(len, |keys| { - output.extend_from_dictionary(keys, &dict.values) - }) - } - - fn skip(&mut self, dict: &ViewBuffer, to_skip: usize) -> Result { - // All data must be NULL - if dict.is_empty() { - return Ok(0); - } - - self.decoder.skip(to_skip) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::arrow::array_reader::test_util::{byte_array_all_encodings, utf8_column}; - use crate::arrow::record_reader::buffer::ValuesBuffer; - use arrow_array::{Array, StringViewArray}; - use arrow_buffer::Buffer; - - #[test] - fn test_byte_array_decoder() { - let (pages, encoded_dictionary) = - byte_array_all_encodings(vec!["hello", "world", "a", "large payload over 12 bytes"]); - - let column_desc = utf8_column(); - let mut decoder = ByteViewArrayColumnValueDecoder::new(&column_desc); - - decoder - .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false) - .unwrap(); - - for (encoding, page) in pages { - let mut output = ViewBuffer::default(); - decoder.set_data(encoding, page, 4, Some(4)).unwrap(); - - assert_eq!(decoder.read(&mut output, 1).unwrap(), 1); - - assert_eq!( - output.values.first().unwrap().as_ref().unwrap(), - "hello".as_bytes() - ); - - assert_eq!(decoder.read(&mut output, 1).unwrap(), 1); - - assert_eq!(decoder.read(&mut output, 2).unwrap(), 2); - - assert_eq!(decoder.read(&mut output, 4).unwrap(), 0); - - let valid = [false, false, true, true, false, true, true, false, false]; - let valid_buffer = Buffer::from_iter(valid.iter().cloned()); - - output.pad_nulls(0, 4, valid.len(), valid_buffer.as_slice()); - - let array = output.into_array(Some(valid_buffer), ArrowType::Utf8View); - let strings = array.as_any().downcast_ref::().unwrap(); - - assert_eq!( - strings.iter().collect::>(), - vec![ - None, - None, - Some("hello"), - Some("world"), - None, - Some("a"), - Some("large payload over 12 bytes"), - None, - None, - ] - ); - } - } - - #[test] - fn test_byte_array_decoder_skip() { - let (pages, encoded_dictionary) = - byte_array_all_encodings(vec!["hello", "world", "a", "large payload over 12 bytes"]); - - let column_desc = utf8_column(); - let mut decoder = ByteViewArrayColumnValueDecoder::new(&column_desc); - - decoder - 
.set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false) - .unwrap(); - - for (encoding, page) in pages { - let mut output = ViewBuffer::default(); - decoder.set_data(encoding, page, 4, Some(4)).unwrap(); - - assert_eq!(decoder.read(&mut output, 1).unwrap(), 1); - - assert_eq!( - output.values.first().unwrap().as_ref().unwrap(), - "hello".as_bytes() - ); - - assert_eq!(decoder.skip_values(1).unwrap(), 1); - assert_eq!(decoder.skip_values(1).unwrap(), 1); - - assert_eq!(decoder.read(&mut output, 1).unwrap(), 1); - assert_eq!( - output.values.get(1).unwrap().as_ref().unwrap(), - "large payload over 12 bytes".as_bytes() - ); - - assert_eq!(decoder.read(&mut output, 4).unwrap(), 0); - - let valid = [false, false, true, true, false, false]; - let valid_buffer = Buffer::from_iter(valid.iter().cloned()); - - output.pad_nulls(0, 2, valid.len(), valid_buffer.as_slice()); - let array = output.into_array(Some(valid_buffer), ArrowType::Utf8View); - let strings = array.as_any().downcast_ref::().unwrap(); - - assert_eq!( - strings.iter().collect::>(), - vec![ - None, - None, - Some("hello"), - Some("large payload over 12 bytes"), - None, - None, - ] - ); - } - } - - #[test] - fn test_byte_array_decoder_nulls() { - let (pages, encoded_dictionary) = byte_array_all_encodings(Vec::<&str>::new()); - - let column_desc = utf8_column(); - let mut decoder = ByteViewArrayColumnValueDecoder::new(&column_desc); - - decoder - .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false) - .unwrap(); - - // test nulls read - for (encoding, page) in pages.clone() { - let mut output = ViewBuffer::default(); - decoder.set_data(encoding, page, 4, None).unwrap(); - assert_eq!(decoder.read(&mut output, 1024).unwrap(), 0); - } - - // test nulls skip - for (encoding, page) in pages { - decoder.set_data(encoding, page, 4, None).unwrap(); - assert_eq!(decoder.skip_values(1024).unwrap(), 0); - } - } -} diff --git a/parquet/src/arrow/array_reader/mod.rs b/parquet/src/arrow/array_reader/mod.rs index 8a65aa96a296..4ae0f5669e87 100644 --- a/parquet/src/arrow/array_reader/mod.rs +++ b/parquet/src/arrow/array_reader/mod.rs @@ -41,7 +41,6 @@ mod null_array; mod primitive_array; mod struct_array; -mod byte_view_array; #[cfg(test)] mod test_util; diff --git a/parquet/src/arrow/buffer/mod.rs b/parquet/src/arrow/buffer/mod.rs index e7566d9a505f..cbc795d94f57 100644 --- a/parquet/src/arrow/buffer/mod.rs +++ b/parquet/src/arrow/buffer/mod.rs @@ -20,4 +20,3 @@ pub mod bit_util; pub mod dictionary_buffer; pub mod offset_buffer; -pub mod view_buffer; diff --git a/parquet/src/arrow/buffer/offset_buffer.rs b/parquet/src/arrow/buffer/offset_buffer.rs index ce9eb1142a5b..ab6347d021a1 100644 --- a/parquet/src/arrow/buffer/offset_buffer.rs +++ b/parquet/src/arrow/buffer/offset_buffer.rs @@ -18,10 +18,13 @@ use crate::arrow::buffer::bit_util::iter_set_bits_rev; use crate::arrow::record_reader::buffer::ValuesBuffer; use crate::errors::{ParquetError, Result}; +use arrow_array::builder::GenericByteViewBuilder; +use arrow_array::types::BinaryViewType; use arrow_array::{make_array, ArrayRef, OffsetSizeTrait}; use arrow_buffer::{ArrowNativeType, Buffer}; use arrow_data::ArrayDataBuilder; use arrow_schema::DataType as ArrowType; +use std::sync::Arc; /// A buffer of variable-sized byte arrays that can be converted into /// a corresponding [`ArrayRef`] @@ -125,18 +128,50 @@ impl OffsetBuffer { /// Converts this into an [`ArrayRef`] with the provided `data_type` and `null_buffer` pub fn into_array(self, null_buffer: Option, data_type: ArrowType) 
-> ArrayRef {
-        let array_data_builder = ArrayDataBuilder::new(data_type)
-            .len(self.len())
-            .add_buffer(Buffer::from_vec(self.offsets))
-            .add_buffer(Buffer::from_vec(self.values))
-            .null_bit_buffer(null_buffer);
-
-        let data = match cfg!(debug_assertions) {
-            true => array_data_builder.build().unwrap(),
-            false => unsafe { array_data_builder.build_unchecked() },
-        };
-
-        make_array(data)
+        match data_type {
+            ArrowType::Utf8View => {
+                let mut builder = self.build_generic_byte_view();
+                Arc::new(builder.finish().to_stringview().unwrap())
+            }
+            ArrowType::BinaryView => {
+                let mut builder = self.build_generic_byte_view();
+                Arc::new(builder.finish())
+            }
+            _ => {
+                let array_data_builder = ArrayDataBuilder::new(data_type)
+                    .len(self.len())
+                    .add_buffer(Buffer::from_vec(self.offsets))
+                    .add_buffer(Buffer::from_vec(self.values))
+                    .null_bit_buffer(null_buffer);
+
+                let data = match cfg!(debug_assertions) {
+                    true => array_data_builder.build().unwrap(),
+                    false => unsafe { array_data_builder.build_unchecked() },
+                };
+
+                make_array(data)
+            }
+        }
+    }
+
+    fn build_generic_byte_view(&self) -> GenericByteViewBuilder<BinaryViewType> {
+        let mut builder = GenericByteViewBuilder::<BinaryViewType>::with_capacity(self.len());
+        for i in self.offsets.windows(2) {
+            let start = i[0];
+            let end = i[1];
+            let b = unsafe {
+                std::slice::from_raw_parts(
+                    self.values.as_ptr().offset(start.to_isize().unwrap()),
+                    (end - start).to_usize().unwrap(),
+                )
+            };
+            if b.is_empty() {
+                builder.append_null();
+            } else {
+                builder.append_value(b);
+            }
+        }
+        builder
     }
 }
 
@@ -193,7 +228,7 @@ impl<I: OffsetSizeTrait> ValuesBuffer for OffsetBuffer<I> {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use arrow_array::{Array, LargeStringArray, StringArray};
+    use arrow_array::{Array, LargeStringArray, StringArray, StringViewArray};
 
     #[test]
     fn test_offset_buffer_empty() {
@@ -244,6 +279,44 @@
         );
     }
 
+    #[test]
+    fn test_string_view() {
+        let mut buffer = OffsetBuffer::<i32>::default();
+        for v in [
+            "hello",
+            "world",
+            "large payload over 12 bytes",
+            "a",
+            "b",
+            "c",
+        ] {
+            buffer.try_push(v.as_bytes(), false).unwrap()
+        }
+        let split = std::mem::take(&mut buffer);
+
+        let array = split.into_array(None, ArrowType::Utf8View);
+        let strings = array.as_any().downcast_ref::<StringViewArray>().unwrap();
+        assert_eq!(
+            strings.iter().map(|x| x.unwrap()).collect::<Vec<_>>(),
+            vec![
+                "hello",
+                "world",
+                "large payload over 12 bytes",
+                "a",
+                "b",
+                "c"
+            ]
+        );
+
+        buffer.try_push("test".as_bytes(), false).unwrap();
+        let array = buffer.into_array(None, ArrowType::Utf8View);
+        let strings = array.as_any().downcast_ref::<StringViewArray>().unwrap();
+        assert_eq!(
+            strings.iter().map(|x| x.unwrap()).collect::<Vec<_>>(),
+            vec!["test"]
+        );
+    }
+
     #[test]
     fn test_offset_buffer_pad_nulls() {
         let mut buffer = OffsetBuffer::<i32>::default();
diff --git a/parquet/src/arrow/buffer/view_buffer.rs b/parquet/src/arrow/buffer/view_buffer.rs
deleted file mode 100644
index 45707182ce04..000000000000
--- a/parquet/src/arrow/buffer/view_buffer.rs
+++ /dev/null
@@ -1,119 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use crate::arrow::buffer::bit_util::iter_set_bits_rev; -use crate::arrow::record_reader::buffer::ValuesBuffer; -use crate::errors::{ParquetError, Result}; -use arrow_array::builder::GenericByteViewBuilder; -use arrow_array::types::BinaryViewType; -use arrow_array::ArrayRef; -use arrow_buffer::{ArrowNativeType, Buffer}; -use arrow_schema::DataType as ArrowType; -use std::sync::Arc; - -/// A buffer of variable-sized byte arrays that can be converted into -/// a corresponding [`ArrayRef`] -#[derive(Debug, Default)] -pub struct ViewBuffer { - pub values: Vec>>, -} - -impl ViewBuffer { - /// Returns the number of byte arrays in this buffer - pub fn len(&self) -> usize { - self.values.len() - } - - pub fn is_empty(&self) -> bool { - self.len() == 0 - } - - pub fn try_push(&mut self, data: &[u8], validate_utf8: bool) -> Result<()> { - if validate_utf8 { - if let Some(&b) = data.first() { - // A valid code-point iff it does not start with 0b10xxxxxx - // Bit-magic taken from `std::str::is_char_boundary` - if (b as i8) < -0x40 { - return Err(ParquetError::General( - "encountered non UTF-8 data".to_string(), - )); - } - } - } - self.values.push(Some(data.to_vec())); - Ok(()) - } - - /// Extends this buffer with a list of keys - pub fn extend_from_dictionary( - &mut self, - keys: &[K], - dict: &[Option>], - ) -> Result<()> { - for key in keys { - let index = key.as_usize(); - if index + 1 > dict.len() { - return Err(general_err!( - "dictionary key beyond bounds of dictionary: 0..{}", - dict.len() - )); - } - - let value = dict.get(index).unwrap(); - - // Dictionary values are verified when decoding dictionary page - self.try_push(value.as_ref().unwrap(), false)?; - } - Ok(()) - } - - /// Converts this into an [`ArrayRef`] with the provided `data_type` and `null_buffer` - pub fn into_array(self, _null_buffer: Option, data_type: ArrowType) -> ArrayRef { - let mut builder = - GenericByteViewBuilder::::with_capacity(self.values.len()); - self.values - .into_iter() - .for_each(|v| builder.append_option(v)); - - match data_type { - ArrowType::BinaryView => Arc::new(builder.finish()), - ArrowType::Utf8View => Arc::new(builder.finish().to_stringview().unwrap()), - _ => unreachable!(), - } - } -} - -impl ValuesBuffer for ViewBuffer { - fn pad_nulls( - &mut self, - read_offset: usize, - values_read: usize, - levels_read: usize, - valid_mask: &[u8], - ) { - self.values.resize(read_offset + levels_read, None); - - let values_range = read_offset..read_offset + values_read; - for (value_pos, level_pos) in values_range.rev().zip(iter_set_bits_rev(valid_mask)) { - debug_assert!(level_pos >= value_pos); - if level_pos <= value_pos { - break; - } - self.values[level_pos] = self.values[value_pos].take(); - } - } -}
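
Reviewer note (illustration only, not part of the patch): the new Utf8View/BinaryView branch of OffsetBuffer::into_array walks the accumulated (offsets, values) pair and appends one view per slot, mapping zero-length slots (what pad_nulls leaves behind for nulls) to null views; with this draft a genuinely empty string is therefore indistinguishable from a padded null. The sketch below reproduces that walk using the safe public arrow-array StringViewBuilder API instead of the patch's raw-pointer slicing; the values/offsets literals are made up for the example.

use arrow_array::builder::StringViewBuilder;
use arrow_array::Array;

fn main() {
    // Concatenated value bytes plus end offsets, mirroring OffsetBuffer<i32>.
    // The zero-length slot (offsets 10..10) stands in for a padded null.
    let values: &[u8] = b"helloworldlarge payload over 12 bytes";
    let offsets: &[i32] = &[0, 5, 10, 10, 37];

    let mut builder = StringViewBuilder::with_capacity(offsets.len() - 1);
    for w in offsets.windows(2) {
        let (start, end) = (w[0] as usize, w[1] as usize);
        let slice = &values[start..end];
        if slice.is_empty() {
            // Mirrors the draft's empty-slice-to-null mapping in build_generic_byte_view
            builder.append_null();
        } else {
            builder.append_value(std::str::from_utf8(slice).expect("valid utf8"));
        }
    }

    let array = builder.finish();
    assert_eq!(array.len(), 4);
    assert!(array.is_null(2));
    assert_eq!(array.value(0), "hello");
    // Values longer than 12 bytes are stored out of line in a view data buffer.
    assert_eq!(array.value(3), "large payload over 12 bytes");
}

Built as a standalone binary against an arrow-array release that ships the view builders, the assertions above are expected to pass; behavior should match the patch's unsafe slicing for in-bounds offsets.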