From ce17c97593160e3cf3bfc43090c0f0840f28cdc0 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Wed, 4 Sep 2024 09:53:31 -0400 Subject: [PATCH 01/12] update --- parquet-testing | 2 +- testing | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/parquet-testing b/parquet-testing index 550368ca77b9..4439a223a315 160000 --- a/parquet-testing +++ b/parquet-testing @@ -1 +1 @@ -Subproject commit 550368ca77b97231efead39251a96bd6f8f08c6e +Subproject commit 4439a223a315cf874746d3b5da25e6a6b2a2b16e diff --git a/testing b/testing index 735ae7128d57..e270341fb5f3 160000 --- a/testing +++ b/testing @@ -1 +1 @@ -Subproject commit 735ae7128d571398dd798d7ff004adebeb342883 +Subproject commit e270341fb5f3ff785410e6286cc42898e9d6a99c From a06742f42e52e3c975f9c2b5a8e345e596e16d62 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Sun, 22 Dec 2024 07:51:44 -0600 Subject: [PATCH 02/12] update --- parquet-testing | 2 +- testing | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/parquet-testing b/parquet-testing index 4439a223a315..1ba34478f535 160000 --- a/parquet-testing +++ b/parquet-testing @@ -1 +1 @@ -Subproject commit 4439a223a315cf874746d3b5da25e6a6b2a2b16e +Subproject commit 1ba34478f535c89382263c42c675a9af4f57f2dd diff --git a/testing b/testing index e270341fb5f3..735ae7128d57 160000 --- a/testing +++ b/testing @@ -1 +1 @@ -Subproject commit e270341fb5f3ff785410e6286cc42898e9d6a99c +Subproject commit 735ae7128d571398dd798d7ff004adebeb342883 From 84f0f9c29eeea89bdf3772f6dc42f1e912a93e81 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Sat, 28 Dec 2024 12:57:00 -0600 Subject: [PATCH 03/12] update --- arrow-buffer/src/buffer/boolean.rs | 45 ++++ parquet/src/arrow/array_reader/builder.rs | 6 + .../src/arrow/array_reader/primitive_array.rs | 13 +- .../src/arrow/array_reader/struct_array.rs | 2 +- parquet/src/arrow/arrow_reader/filter.rs | 2 +- parquet/src/arrow/arrow_reader/selection.rs | 6 + .../src/arrow/async_reader/arrow_reader.rs | 244 ++++++++++++++++++ parquet/src/arrow/async_reader/mod.rs | 1 + parquet/src/arrow/mod.rs | 26 ++ 9 files changed, 339 insertions(+), 6 deletions(-) create mode 100644 parquet/src/arrow/async_reader/arrow_reader.rs diff --git a/arrow-buffer/src/buffer/boolean.rs b/arrow-buffer/src/buffer/boolean.rs index 49a75b468dbe..534f5aecd81f 100644 --- a/arrow-buffer/src/buffer/boolean.rs +++ b/arrow-buffer/src/buffer/boolean.rs @@ -205,6 +205,51 @@ impl BooleanBuffer { pub fn set_slices(&self) -> BitSliceIterator<'_> { BitSliceIterator::new(self.values(), self.offset, self.len) } + + /// Combines this [`BooleanBuffer`] with another using logical AND on the selected bits. + /// + /// Unlike intersection, the `other` [`BooleanBuffer`] must have exactly as many **set bits** as `self`, + /// i.e., self.count_set_bits() == other.len(). + /// + /// This method will keep only the bits in `self` that are also set in `other` + /// at the positions corresponding to `self`'s set bits. + /// For example: + /// self: NNYYYNNYYNYN + /// other: YNY NY N + /// result: NNYNYNNNYNNN + pub fn and_then(&self, other: &Self) -> Self { + // Ensure that 'other' has exactly as many set bits as 'self' + debug_assert_eq!( + self.count_set_bits(), + other.len(), + "The 'other' selection must have exactly as many set bits as 'self'." 
+ ); + + if self.len() == other.len() { + // fast path if the two bool masks are the same length + // this happens when self selects all rows + debug_assert_eq!(self.count_set_bits(), self.len()); + return other.clone(); + } + + let mut buffer = MutableBuffer::from_len_zeroed(self.values().len()); + buffer.copy_from_slice(self.values()); + let mut builder = BooleanBufferBuilder::new_from_buffer(buffer, self.len()); + + // Create iterators for 'self' and 'other' bits + let mut other_bits = other.iter(); + + for bit_idx in self.set_indices() { + let predicate = other_bits + .next() + .expect("Mismatch in set bits between self and other"); + if !predicate { + builder.set_bit(bit_idx, false); + } + } + + builder.finish() + } } impl Not for &BooleanBuffer { diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs index 945f62526a7e..23f77a9ab96f 100644 --- a/parquet/src/arrow/array_reader/builder.rs +++ b/parquet/src/arrow/array_reader/builder.rs @@ -245,6 +245,7 @@ fn build_primitive_reader( page_iterator, column_desc, arrow_type, + col_idx, )?) as _, PhysicalType::INT32 => { if let Some(DataType::Null) = arrow_type { @@ -257,6 +258,7 @@ fn build_primitive_reader( page_iterator, column_desc, arrow_type, + col_idx, )?) as _ } } @@ -264,21 +266,25 @@ fn build_primitive_reader( page_iterator, column_desc, arrow_type, + col_idx, )?) as _, PhysicalType::INT96 => Box::new(PrimitiveArrayReader::::new( page_iterator, column_desc, arrow_type, + col_idx, )?) as _, PhysicalType::FLOAT => Box::new(PrimitiveArrayReader::::new( page_iterator, column_desc, arrow_type, + col_idx, )?) as _, PhysicalType::DOUBLE => Box::new(PrimitiveArrayReader::::new( page_iterator, column_desc, arrow_type, + col_idx, )?) as _, PhysicalType::BYTE_ARRAY => match arrow_type { Some(DataType::Dictionary(_, _)) => { diff --git a/parquet/src/arrow/array_reader/primitive_array.rs b/parquet/src/arrow/array_reader/primitive_array.rs index 010e9c2eed3f..a4274d51383c 100644 --- a/parquet/src/arrow/array_reader/primitive_array.rs +++ b/parquet/src/arrow/array_reader/primitive_array.rs @@ -80,6 +80,7 @@ where def_levels_buffer: Option>, rep_levels_buffer: Option>, record_reader: RecordReader, + column_idx: usize, } impl PrimitiveArrayReader @@ -93,6 +94,7 @@ where pages: Box, column_desc: ColumnDescPtr, arrow_type: Option, + column_idx: usize, ) -> Result { // Check if Arrow type is specified, else create it from Parquet type let data_type = match arrow_type { @@ -110,6 +112,7 @@ where def_levels_buffer: None, rep_levels_buffer: None, record_reader, + column_idx, }) } } @@ -371,6 +374,7 @@ mod tests { Box::::default(), schema.column(0), None, + 0, ) .unwrap(); @@ -414,7 +418,7 @@ mod tests { let page_iterator = InMemoryPageIterator::new(page_lists); let mut array_reader = - PrimitiveArrayReader::::new(Box::new(page_iterator), column_desc, None) + PrimitiveArrayReader::::new(Box::new(page_iterator), column_desc, None, 0) .unwrap(); // Read first 50 values, which are all from the first column chunk @@ -484,6 +488,7 @@ mod tests { Box::new(page_iterator), column_desc.clone(), None, + 0, ) .expect("Unable to get array reader"); @@ -620,7 +625,7 @@ mod tests { let page_iterator = InMemoryPageIterator::new(page_lists); let mut array_reader = - PrimitiveArrayReader::::new(Box::new(page_iterator), column_desc, None) + PrimitiveArrayReader::::new(Box::new(page_iterator), column_desc, None, 0) .unwrap(); let mut accu_len: usize = 0; @@ -696,7 +701,7 @@ mod tests { let page_iterator = 
InMemoryPageIterator::new(page_lists); let mut array_reader = - PrimitiveArrayReader::::new(Box::new(page_iterator), column_desc, None) + PrimitiveArrayReader::::new(Box::new(page_iterator), column_desc, None, 0) .unwrap(); // read data from the reader @@ -755,7 +760,7 @@ mod tests { let page_iterator = InMemoryPageIterator::new(page_lists); let mut array_reader = - PrimitiveArrayReader::::new(Box::new(page_iterator), column_desc, None) + PrimitiveArrayReader::::new(Box::new(page_iterator), column_desc, None, 0) .unwrap(); // read data from the reader diff --git a/parquet/src/arrow/array_reader/struct_array.rs b/parquet/src/arrow/array_reader/struct_array.rs index fb2f2f8928b9..e048fbae66fa 100644 --- a/parquet/src/arrow/array_reader/struct_array.rs +++ b/parquet/src/arrow/array_reader/struct_array.rs @@ -25,7 +25,7 @@ use std::sync::Arc; /// Implementation of struct array reader. pub struct StructArrayReader { - children: Vec>, + pub children: Vec>, data_type: ArrowType, struct_def_level: i16, struct_rep_level: i16, diff --git a/parquet/src/arrow/arrow_reader/filter.rs b/parquet/src/arrow/arrow_reader/filter.rs index 2e22f7e01cf0..931e13e252f5 100644 --- a/parquet/src/arrow/arrow_reader/filter.rs +++ b/parquet/src/arrow/arrow_reader/filter.rs @@ -110,7 +110,7 @@ where /// [`RowSelection`]: crate::arrow::arrow_reader::RowSelection pub struct RowFilter { /// A list of [`ArrowPredicate`] - pub(crate) predicates: Vec>, + pub predicates: Vec>, } impl RowFilter { diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs index 378d2253f19a..92d7eab1e58c 100644 --- a/parquet/src/arrow/arrow_reader/selection.rs +++ b/parquet/src/arrow/arrow_reader/selection.rs @@ -366,6 +366,12 @@ impl RowSelection { self } + /// Returns the internal selectors of this [`RowSelection`], testing only + #[cfg(test)] + pub(crate) fn selectors(&self) -> &[RowSelector] { + &self.selectors + } + /// Applies an offset to this [`RowSelection`], skipping the first `offset` selected rows pub(crate) fn offset(mut self, offset: usize) -> Self { if offset == 0 { diff --git a/parquet/src/arrow/async_reader/arrow_reader.rs b/parquet/src/arrow/async_reader/arrow_reader.rs new file mode 100644 index 000000000000..a2497fc9adcc --- /dev/null +++ b/parquet/src/arrow/async_reader/arrow_reader.rs @@ -0,0 +1,244 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
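+//
+// Note on the `and_then` composition used by the row-filter logic in this
+// file (and mirrored by the new `BooleanBuffer::and_then` above): `other`
+// carries exactly one bit per *set* bit of `self`, and a set bit survives
+// only if its corresponding `other` bit is also set. A minimal model of
+// these semantics, assuming plain bool slices rather than the real
+// bit-packed buffers:
+//
+//     fn and_then(this: &[bool], other: &[bool]) -> Vec<bool> {
+//         assert_eq!(this.iter().filter(|&&b| b).count(), other.len());
+//         let mut next = other.iter().copied();
+//         // `&&` short-circuits, so one `other` bit is consumed per set bit
+//         this.iter().map(|&b| b && next.next().unwrap()).collect()
+//     }
+//
+// Applied to the doc example, NNYYYNNYYNYN combined with YNYNYN yields
+// NNYNYNNNYNNN.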
+ +use std::{collections::VecDeque, sync::Arc}; + +use arrow_array::{cast::AsArray, Array, RecordBatch, RecordBatchReader, StructArray}; +use arrow_schema::{ArrowError, SchemaRef}; +use arrow_select::filter::prep_null_mask_filter; + +use crate::arrow::{ + array_reader::{build_array_reader, ArrayReader, RowGroups, StructArrayReader}, + arrow_reader::{ArrowPredicate, RowFilter, RowSelection, RowSelector}, +}; +use crate::errors::ParquetError; + +use super::ParquetField; + +pub struct FilteredParquetRecordBatchReader { + batch_size: usize, + array_reader: StructArrayReader, + predicate_readers: Vec>, + schema: SchemaRef, + selection: VecDeque, + row_filter: Option, +} + +fn read_selection( + reader: &mut dyn ArrayReader, + selection: &RowSelection, +) -> Result { + for selector in selection.iter() { + if selector.skip { + let skipped = reader.skip_records(selector.row_count)?; + debug_assert_eq!(skipped, selector.row_count, "failed to skip rows"); + } else { + let read_records = reader.read_records(selector.row_count)?; + debug_assert_eq!(read_records, selector.row_count, "failed to read rows"); + } + } + let array = reader.consume_batch()?; + let struct_array = array + .as_struct_opt() + .ok_or_else(|| general_err!("Struct array reader should return struct array"))?; + Ok(struct_array.clone()) +} + +/// Take the next selection from the selection queue, and return the selection +/// whose selected row count is to_select or less (if input selection is exhausted). +fn take_next_selection( + selection: &mut VecDeque, + to_select: usize, +) -> Option { + let mut current_selected = 0; + let mut rt = Vec::new(); + while let Some(front) = selection.pop_front() { + if front.skip { + rt.push(front); + continue; + } + + if current_selected + front.row_count <= to_select { + rt.push(front); + current_selected += front.row_count; + } else { + let select = to_select - current_selected; + let remaining = front.row_count - select; + rt.push(RowSelector::select(select)); + selection.push_front(RowSelector::select(remaining)); + + return Some(rt.into()); + } + } + if !rt.is_empty() { + return Some(rt.into()); + } + None +} + +fn build_array_reader_for_filters( + filters: &RowFilter, + fields: &Option>, + row_group: &dyn RowGroups, +) -> Result>, ArrowError> { + let mut array_readers = Vec::new(); + for predicate in filters.predicates.iter() { + let predicate_projection = predicate.projection(); + let array_reader = build_array_reader(fields.as_deref(), predicate_projection, row_group)?; + array_readers.push(array_reader); + } + Ok(array_readers) +} + +impl FilteredParquetRecordBatchReader { + fn new(batch_size: usize, array_reader: StructArrayReader, selection: RowSelection) -> Self { + todo!() + } + + fn build_predicate_filter( + &mut self, + selection: &RowSelection, + ) -> Result { + match &mut self.row_filter { + None => Ok(selection.clone()), + Some(filter) => { + debug_assert_eq!( + self.predicate_readers.len(), + filter.predicates.len(), + "predicate readers and predicates should have the same length" + ); + let mut selection = selection.clone(); + + for (predicate, reader) in filter + .predicates + .iter_mut() + .zip(self.predicate_readers.iter_mut()) + { + let array = read_selection(reader.as_mut(), &selection)?; + let batch = RecordBatch::from(array); + let input_rows = batch.num_rows(); + let predicate_filter = predicate.evaluate(batch)?; + if predicate_filter.len() != input_rows { + return Err(ArrowError::ParquetError(format!( + "ArrowPredicate predicate returned {} rows, expected {input_rows}", + 
predicate_filter.len() + ))); + } + let predicate_filter = match predicate_filter.null_count() { + 0 => predicate_filter, + _ => prep_null_mask_filter(&predicate_filter), + }; + let raw = RowSelection::from_filters(&[predicate_filter]); + selection = selection.and_then(&raw); + } + Ok(selection) + } + } + } +} + +impl Iterator for FilteredParquetRecordBatchReader { + type Item = Result; + + fn next(&mut self) -> Option { + let selection = take_next_selection(&mut self.selection, self.batch_size)?; + let filtered_selection = match self.build_predicate_filter(&selection) { + Ok(selection) => selection, + Err(e) => return Some(Err(e)), + }; + + let rt = read_selection(&mut self.array_reader, &filtered_selection); + match rt { + Ok(array) => Some(Ok(RecordBatch::from(array))), + Err(e) => Some(Err(e.into())), + } + } +} + +impl RecordBatchReader for FilteredParquetRecordBatchReader { + fn schema(&self) -> SchemaRef { + self.schema.clone() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_take_next_selection_exact_match() { + let mut queue = VecDeque::from(vec![ + RowSelector::skip(5), + RowSelector::select(3), + RowSelector::skip(2), + RowSelector::select(7), + ]); + + // Request exactly 10 rows (5 skip + 3 select + 2 skip) + let selection = take_next_selection(&mut queue, 3).unwrap(); + assert_eq!( + selection, + vec![ + RowSelector::skip(5), + RowSelector::select(3), + RowSelector::skip(2) + ] + .into() + ); + + // Check remaining queue + assert_eq!(queue.len(), 1); + assert_eq!(queue[0].row_count, 7); + assert_eq!(queue[0].skip, false); + } + + #[test] + fn test_take_next_selection_split_required() { + let mut queue = VecDeque::from(vec![RowSelector::select(10), RowSelector::select(10)]); + + // Request 15 rows, which should split the first selector + let selection = take_next_selection(&mut queue, 15).unwrap(); + + assert_eq!( + selection, + vec![RowSelector::select(10), RowSelector::select(5)].into() + ); + + // Check remaining queue - should have 5 rows from split and original 10 + assert_eq!(queue.len(), 1); + assert_eq!(queue[0].skip, false); + assert_eq!(queue[0].row_count, 5); + } + + #[test] + fn test_take_next_selection_empty_queue() { + let mut queue = VecDeque::new(); + + // Should return None for empty queue + let selection = take_next_selection(&mut queue, 10); + assert!(selection.is_none()); + + // Test with queue that becomes empty + queue.push_back(RowSelector::select(5)); + let selection = take_next_selection(&mut queue, 10).unwrap(); + assert_eq!(selection, vec![RowSelector::select(5)].into()); + + // Queue should now be empty + let selection = take_next_selection(&mut queue, 10); + assert!(selection.is_none()); + } +} diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 8b315cc9f784..38dc382b8b3e 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -110,6 +110,7 @@ use crate::file::reader::{ChunkReader, Length, SerializedPageReader}; use crate::file::FOOTER_SIZE; use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash}; +mod arrow_reader; mod metadata; pub use metadata::*; diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs index 2d09cd19203f..9f593ab60f7f 100644 --- a/parquet/src/arrow/mod.rs +++ b/parquet/src/arrow/mod.rs @@ -210,6 +210,32 @@ impl ProjectionMask { pub fn leaf_included(&self, leaf_idx: usize) -> bool { self.mask.as_ref().map(|m| m[leaf_idx]).unwrap_or(true) } + + /// Union two projection masks + pub fn 
union(&mut self, other: &Self) { + match (self.mask.as_ref(), other.mask.as_ref()) { + (None, _) | (_, None) => self.mask = None, + (Some(a), Some(b)) => { + assert_eq!(a.len(), b.len()); + let mask = a.iter().zip(b.iter()).map(|(&a, &b)| a || b).collect(); + self.mask = Some(mask); + } + } + } + + /// Returns true if the mask contains the other mask + pub fn contains(&self, other: &Self) -> bool { + match (self.mask.as_ref(), other.mask.as_ref()) { + (None, _) => true, + (Some(a), Some(b)) => { + assert_eq!(a.len(), b.len()); + a.iter() + .zip(b.iter()) + .all(|(&a, &b)| if b { a } else { true }) + } + (Some(_), None) => false, + } + } } /// Lookups up the parquet column by name From dcb99ee1260c3703f122611539ab639961809ac8 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Sun, 29 Dec 2024 15:46:03 -0600 Subject: [PATCH 04/12] update --- Cargo.toml | 2 +- .../src/arrow/array_reader/primitive_array.rs | 41 ++-- parquet/src/arrow/arrow_reader/selection.rs | 10 +- .../src/arrow/async_reader/arrow_reader.rs | 195 +++++++++++++++--- parquet/src/arrow/async_reader/mod.rs | 91 ++++---- parquet/src/file/serialized_reader.rs | 64 +++++- testing | 2 +- 7 files changed, 310 insertions(+), 95 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f210ae210012..db6a39541f27 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -62,7 +62,7 @@ exclude = [ ] [workspace.package] -version = "53.2.0" +version = "53.3.0" homepage = "https://github.com/apache/arrow-rs" repository = "https://github.com/apache/arrow-rs" authors = ["Apache Arrow "] diff --git a/parquet/src/arrow/array_reader/primitive_array.rs b/parquet/src/arrow/array_reader/primitive_array.rs index a4274d51383c..a68311ebae23 100644 --- a/parquet/src/arrow/array_reader/primitive_array.rs +++ b/parquet/src/arrow/array_reader/primitive_array.rs @@ -80,6 +80,7 @@ where def_levels_buffer: Option>, rep_levels_buffer: Option>, record_reader: RecordReader, + #[allow(unused)] column_idx: usize, } @@ -417,9 +418,13 @@ mod tests { ); let page_iterator = InMemoryPageIterator::new(page_lists); - let mut array_reader = - PrimitiveArrayReader::::new(Box::new(page_iterator), column_desc, None, 0) - .unwrap(); + let mut array_reader = PrimitiveArrayReader::::new( + Box::new(page_iterator), + column_desc, + None, + 0, + ) + .unwrap(); // Read first 50 values, which are all from the first column chunk let array = array_reader.next_batch(50).unwrap(); @@ -624,9 +629,13 @@ mod tests { let page_iterator = InMemoryPageIterator::new(page_lists); - let mut array_reader = - PrimitiveArrayReader::::new(Box::new(page_iterator), column_desc, None, 0) - .unwrap(); + let mut array_reader = PrimitiveArrayReader::::new( + Box::new(page_iterator), + column_desc, + None, + 0, + ) + .unwrap(); let mut accu_len: usize = 0; @@ -700,9 +709,13 @@ mod tests { ); let page_iterator = InMemoryPageIterator::new(page_lists); - let mut array_reader = - PrimitiveArrayReader::::new(Box::new(page_iterator), column_desc, None, 0) - .unwrap(); + let mut array_reader = PrimitiveArrayReader::::new( + Box::new(page_iterator), + column_desc, + None, + 0, + ) + .unwrap(); // read data from the reader // the data type is decimal(8,2) @@ -759,9 +772,13 @@ mod tests { ); let page_iterator = InMemoryPageIterator::new(page_lists); - let mut array_reader = - PrimitiveArrayReader::::new(Box::new(page_iterator), column_desc, None, 0) - .unwrap(); + let mut array_reader = PrimitiveArrayReader::::new( + Box::new(page_iterator), + column_desc, + None, + 0, + ) + .unwrap(); // read data from the reader // the data type 
is decimal(18,4) diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs index 92d7eab1e58c..f83724a3841b 100644 --- a/parquet/src/arrow/arrow_reader/selection.rs +++ b/parquet/src/arrow/arrow_reader/selection.rs @@ -366,12 +366,6 @@ impl RowSelection { self } - /// Returns the internal selectors of this [`RowSelection`], testing only - #[cfg(test)] - pub(crate) fn selectors(&self) -> &[RowSelector] { - &self.selectors - } - /// Applies an offset to this [`RowSelection`], skipping the first `offset` selected rows pub(crate) fn offset(mut self, offset: usize) -> Self { if offset == 0 { @@ -447,6 +441,10 @@ impl RowSelection { pub fn skipped_row_count(&self) -> usize { self.iter().filter(|s| s.skip).map(|s| s.row_count).sum() } + + pub(crate) fn extend(&mut self, other: Self) { + self.selectors.extend(other.selectors); + } } impl From> for RowSelection { diff --git a/parquet/src/arrow/async_reader/arrow_reader.rs b/parquet/src/arrow/async_reader/arrow_reader.rs index a2497fc9adcc..04cc115ce39c 100644 --- a/parquet/src/arrow/async_reader/arrow_reader.rs +++ b/parquet/src/arrow/async_reader/arrow_reader.rs @@ -15,23 +15,27 @@ // specific language governing permissions and limitations // under the License. +use std::collections::HashMap; +use std::sync::RwLock; use std::{collections::VecDeque, sync::Arc}; use arrow_array::{cast::AsArray, Array, RecordBatch, RecordBatchReader, StructArray}; -use arrow_schema::{ArrowError, SchemaRef}; +use arrow_schema::{ArrowError, DataType, Schema, SchemaRef}; use arrow_select::filter::prep_null_mask_filter; -use crate::arrow::{ - array_reader::{build_array_reader, ArrayReader, RowGroups, StructArrayReader}, - arrow_reader::{ArrowPredicate, RowFilter, RowSelection, RowSelector}, -}; +use crate::column::page::{Page, PageMetadata, PageReader}; use crate::errors::ParquetError; - -use super::ParquetField; +use crate::{ + arrow::{ + array_reader::ArrayReader, + arrow_reader::{RowFilter, RowSelection, RowSelector}, + }, + file::reader::{ChunkReader, SerializedPageReader}, +}; pub struct FilteredParquetRecordBatchReader { batch_size: usize, - array_reader: StructArrayReader, + array_reader: Box, predicate_readers: Vec>, schema: SchemaRef, selection: VecDeque, @@ -90,38 +94,47 @@ fn take_next_selection( None } -fn build_array_reader_for_filters( - filters: &RowFilter, - fields: &Option>, - row_group: &dyn RowGroups, -) -> Result>, ArrowError> { - let mut array_readers = Vec::new(); - for predicate in filters.predicates.iter() { - let predicate_projection = predicate.projection(); - let array_reader = build_array_reader(fields.as_deref(), predicate_projection, row_group)?; - array_readers.push(array_reader); - } - Ok(array_readers) -} - impl FilteredParquetRecordBatchReader { - fn new(batch_size: usize, array_reader: StructArrayReader, selection: RowSelection) -> Self { - todo!() + pub(crate) fn new( + batch_size: usize, + array_reader: Box, + selection: RowSelection, + filter_readers: Vec>, + row_filter: Option, + ) -> Self { + let schema = match array_reader.get_data_type() { + DataType::Struct(ref fields) => Schema::new(fields.clone()), + _ => unreachable!("Struct array reader's data type is not struct!"), + }; + + Self { + batch_size, + array_reader, + predicate_readers: filter_readers, + schema: Arc::new(schema), + selection: selection.into(), + row_filter, + } + } + + pub(crate) fn take_filter(&mut self) -> Option { + self.row_filter.take() } + #[inline(never)] + /// Take a selection, and return the new selection 
where the rows are filtered by the predicate. fn build_predicate_filter( &mut self, - selection: &RowSelection, + mut selection: RowSelection, ) -> Result { match &mut self.row_filter { - None => Ok(selection.clone()), + None => Ok(selection), Some(filter) => { debug_assert_eq!( self.predicate_readers.len(), filter.predicates.len(), "predicate readers and predicates should have the same length" ); - let mut selection = selection.clone(); for (predicate, reader) in filter .predicates @@ -155,13 +168,36 @@ impl Iterator for FilteredParquetRecordBatchReader { type Item = Result; fn next(&mut self) -> Option { - let selection = take_next_selection(&mut self.selection, self.batch_size)?; - let filtered_selection = match self.build_predicate_filter(&selection) { - Ok(selection) => selection, - Err(e) => return Some(Err(e)), - }; + // With filter pushdown, it's very hard to predict the number of rows to return -- depends on the selectivity of the filter. + // We can do one of the following: + // 1. Add a coalescing step to coalesce the resulting batches. + // 2. Ask parquet reader to collect more rows before returning. - let rt = read_selection(&mut self.array_reader, &filtered_selection); + // Approach 1 has the drawback of extra overhead of coalesce batch, which can be painful to be efficient. + // Code below implements approach 2, where we keep consuming the selection until we select at least 3/4 of the batch size. + // It boils down to leveraging array_reader's ability to collect large batches natively, + // rather than concatenating multiple small batches. + + let mut selection = RowSelection::default(); + let mut selected = 0; + while let Some(cur_selection) = + take_next_selection(&mut self.selection, self.batch_size - selected) + { + let filtered_selection = match self.build_predicate_filter(cur_selection) { + Ok(selection) => selection, + Err(e) => return Some(Err(e)), + }; + selected += filtered_selection.row_count(); + selection.extend(filtered_selection); + if selected >= (self.batch_size / 4 * 3) { + break; + } + } + if !selection.selects_any() { + return None; + } + + let rt = read_selection(&mut *self.array_reader, &selection); match rt { Ok(array) => Some(Ok(RecordBatch::from(array))), Err(e) => Some(Err(e.into())), @@ -175,6 +211,99 @@ impl RecordBatchReader for FilteredParquetRecordBatchReader { } } +struct PageCacheInner { + queue: VecDeque, + pages: HashMap, +} + +/// A simple FIFO cache for pages. 
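+///
+/// Offsets are evicted in insertion order once `CAPACITY` pages are held.
+/// For example, with a hypothetical capacity of 2 (the constant below is
+/// 16), inserting pages at offsets 0, 10 and 20 leaves only offsets 10 and
+/// 20 cached, because offset 0 is the oldest entry in the queue:
+///
+/// ```text
+/// insert_page(0, a)  -> cached offsets {0}
+/// insert_page(10, b) -> cached offsets {0, 10}
+/// insert_page(20, c) -> evicts 0, cached offsets {10, 20}
+/// ```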
+pub(crate) struct PageCache { + inner: RwLock, +} + +impl PageCache { + const CAPACITY: usize = 16; + + pub(crate) fn new() -> Self { + Self { + inner: RwLock::new(PageCacheInner { + queue: VecDeque::with_capacity(Self::CAPACITY), + pages: HashMap::with_capacity(Self::CAPACITY), + }), + } + } + + pub(crate) fn get_page(&self, offset: usize) -> Option { + let read_lock = self.inner.read().unwrap(); + read_lock.pages.get(&offset).cloned() + } + + pub(crate) fn insert_page(&self, offset: usize, page: Page) { + let mut write_lock = self.inner.write().unwrap(); + if write_lock.pages.len() >= Self::CAPACITY { + let oldest_offset = write_lock.queue.pop_front().unwrap(); + write_lock.pages.remove(&oldest_offset).unwrap(); + } + write_lock.pages.insert(offset, page); + write_lock.queue.push_back(offset); + } +} + +pub(crate) struct CachedPageReader { + inner: SerializedPageReader, + cache: Arc, +} + +impl CachedPageReader { + pub(crate) fn new(inner: SerializedPageReader, cache: Arc) -> Self { + Self { inner, cache } + } +} + +impl Iterator for CachedPageReader { + type Item = Result; + + fn next(&mut self) -> Option { + self.get_next_page().transpose() + } +} + +impl PageReader for CachedPageReader { + fn get_next_page(&mut self) -> Result, ParquetError> { + // self.inner.get_next_page() + let next_page_offset = self.inner.peek_next_page_offset()?; + + let Some(offset) = next_page_offset else { + return Ok(None); + }; + + let page = self.cache.get_page(offset); + if let Some(page) = page { + self.inner.skip_next_page()?; + Ok(Some(page)) + } else { + let inner_page = self.inner.get_next_page()?; + let Some(inner_page) = inner_page else { + return Ok(None); + }; + self.cache.insert_page(offset, inner_page.clone()); + Ok(Some(inner_page)) + } + } + + fn peek_next_page(&mut self) -> Result, ParquetError> { + self.inner.peek_next_page() + } + + fn skip_next_page(&mut self) -> Result<(), ParquetError> { + self.inner.skip_next_page() + } + + fn at_record_boundary(&mut self) -> Result { + self.inner.at_record_boundary() + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 38dc382b8b3e..8ea825dc4b72 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -83,6 +83,7 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; +use arrow_reader::{CachedPageReader, FilteredParquetRecordBatchReader, PageCache}; use bytes::{Buf, Bytes}; use futures::future::{BoxFuture, FutureExt}; use futures::ready; @@ -94,8 +95,7 @@ use arrow_schema::{DataType, Fields, Schema, SchemaRef}; use crate::arrow::array_reader::{build_array_reader, RowGroups}; use crate::arrow::arrow_reader::{ - apply_range, evaluate_predicate, selects_any, ArrowReaderBuilder, ArrowReaderMetadata, - ArrowReaderOptions, ParquetRecordBatchReader, RowFilter, RowSelection, + ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, RowFilter, RowSelection, }; use crate::arrow::ProjectionMask; @@ -121,6 +121,8 @@ use crate::arrow::schema::ParquetField; #[cfg(feature = "object_store")] pub use store::*; +use super::arrow_reader::RowSelector; + /// The asynchronous interface used by [`ParquetRecordBatchStream`] to read parquet files /// /// Notes: @@ -469,7 +471,7 @@ impl ParquetRecordBatchStreamBuilder { } } -type ReadResult = Result<(ReaderFactory, Option)>; +type ReadResult = Result<(ReaderFactory, Option)>; /// [`ReaderFactory`] is used by [`ParquetRecordBatchStream`] to create /// 
[`ParquetRecordBatchReader`] @@ -497,18 +499,16 @@ where async fn read_row_group( mut self, row_group_idx: usize, - mut selection: Option, + selection: Option, projection: ProjectionMask, batch_size: usize, ) -> ReadResult { - // TODO: calling build_array multiple times is wasteful - let meta = self.metadata.row_group(row_group_idx); let offset_index = self .metadata .offset_index() // filter out empty offset indexes (old versions specified Some(vec![]) when no present) - .filter(|index| !index.is_empty()) + .filter(|index| index.first().map(|v| !v.is_empty()).unwrap_or(false)) .map(|x| x[row_group_idx].as_slice()); let mut row_group = InMemoryRowGroup { @@ -517,48 +517,47 @@ where row_count: meta.num_rows() as usize, column_chunks: vec![None; meta.columns().len()], offset_index, + cache: Arc::new(PageCache::new()), }; + let mut selection = + selection.unwrap_or_else(|| vec![RowSelector::select(row_group.row_count)].into()); + + let mut filter_readers = Vec::new(); if let Some(filter) = self.filter.as_mut() { for predicate in filter.predicates.iter_mut() { - if !selects_any(selection.as_ref()) { + if !selection.selects_any() { return Ok((self, None)); } let predicate_projection = predicate.projection(); row_group - .fetch(&mut self.input, predicate_projection, selection.as_ref()) + .fetch(&mut self.input, predicate_projection, Some(&selection)) .await?; let array_reader = build_array_reader(self.fields.as_deref(), predicate_projection, &row_group)?; - - selection = Some(evaluate_predicate( - batch_size, - array_reader, - selection, - predicate.as_mut(), - )?); + filter_readers.push(array_reader); } } // Compute the number of rows in the selection before applying limit and offset - let rows_before = selection - .as_ref() - .map(|s| s.row_count()) - .unwrap_or(row_group.row_count); + let rows_before = selection.row_count(); if rows_before == 0 { return Ok((self, None)); } - selection = apply_range(selection, row_group.row_count, self.offset, self.limit); + if let Some(offset) = self.offset { + selection = selection.offset(offset); + } + + if let Some(limit) = self.limit { + selection = selection.limit(limit); + } // Compute the number of rows in the selection after applying limit and offset - let rows_after = selection - .as_ref() - .map(|s| s.row_count()) - .unwrap_or(row_group.row_count); + let rows_after = selection.row_count(); // Update offset if necessary if let Some(offset) = &mut self.offset { @@ -576,13 +575,16 @@ where } row_group - .fetch(&mut self.input, &projection, selection.as_ref()) + .fetch(&mut self.input, &projection, Some(&selection)) .await?; - let reader = ParquetRecordBatchReader::new( + let array_reader = build_array_reader(self.fields.as_deref(), &projection, &row_group)?; + let reader = FilteredParquetRecordBatchReader::new( batch_size, - build_array_reader(self.fields.as_deref(), &projection, &row_group)?, + array_reader, selection, + filter_readers, + self.filter.take(), ); Ok((self, Some(reader))) @@ -593,7 +595,7 @@ enum StreamState { /// At the start of a new row group, or the end of the parquet stream Init, /// Decoding a batch - Decoding(ParquetRecordBatchReader), + Decoding(FilteredParquetRecordBatchReader), /// Reading data from input Reading(BoxFuture<'static, ReadResult>), /// Error @@ -671,7 +673,12 @@ where self.state = StreamState::Error; return Poll::Ready(Some(Err(ParquetError::ArrowError(e.to_string())))); } - None => self.state = StreamState::Init, + None => { + // this is ugly, but works for now. 
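+                    // The row filter was moved into the
+                    // FilteredParquetRecordBatchReader so that its predicate
+                    // readers could be driven row group by row group; once
+                    // this row group is exhausted, hand the filter back to
+                    // the ReaderFactory so the next row group can reuse it.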
+ let filter = batch_reader.take_filter(); + self.reader.as_mut().unwrap().filter = filter; + self.state = StreamState::Init + } }, StreamState::Init => { let row_group_idx = match self.row_groups.pop_front() { @@ -723,6 +730,7 @@ struct InMemoryRowGroup<'a> { offset_index: Option<&'a [OffsetIndexMetaData]>, column_chunks: Vec>>, row_count: usize, + cache: Arc, } impl<'a> InMemoryRowGroup<'a> { @@ -834,12 +842,23 @@ impl RowGroups for InMemoryRowGroup<'_> { // filter out empty offset indexes (old versions specified Some(vec![]) when no present) .filter(|index| !index.is_empty()) .map(|index| index[i].page_locations.clone()); - let page_reader: Box = Box::new(SerializedPageReader::new( - data.clone(), - self.metadata.column(i), - self.row_count, - page_locations, - )?); + + // let page_reader: Box = Box::new(SerializedPageReader::new( + // data.clone(), + // self.metadata.column(i), + // self.row_count, + // page_locations, + // )?); + + let page_reader: Box = Box::new(CachedPageReader::new( + SerializedPageReader::new( + data.clone(), + self.metadata.column(i), + self.row_count, + page_locations, + )?, + self.cache.clone(), + )); Ok(Box::new(ColumnChunkIterator { reader: Some(Ok(page_reader)), diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index 3262d1fba704..237cb9eaf47c 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -371,7 +371,7 @@ fn read_page_header_len(input: &mut T) -> Result<(usize, PageHeader)> { /// Decodes a [`Page`] from the provided `buffer` pub(crate) fn decode_page( page_header: PageHeader, - buffer: Bytes, + buffer: Vec, physical_type: Type, decompressor: Option<&mut Box>, ) -> Result { @@ -406,8 +406,8 @@ pub(crate) fn decode_page( Some(decompressor) if can_decompress => { let uncompressed_size = page_header.uncompressed_page_size as usize; let mut decompressed = Vec::with_capacity(uncompressed_size); - let compressed = &buffer.as_ref()[offset..]; - decompressed.extend_from_slice(&buffer.as_ref()[..offset]); + let compressed = &buffer[offset..]; + decompressed.extend_from_slice(&buffer[..offset]); decompressor.decompress( compressed, &mut decompressed, @@ -422,10 +422,11 @@ pub(crate) fn decode_page( )); } - Bytes::from(decompressed) + decompressed } _ => buffer, }; + let buffer = Bytes::from(buffer); let result = match page_header.type_ { PageType::DICTIONARY_PAGE => { @@ -568,6 +569,57 @@ impl SerializedPageReader { physical_type: meta.column_type(), }) } + + pub(crate) fn peek_next_page_offset(&mut self) -> Result> { + match &mut self.state { + SerializedPageReaderState::Values { + offset, + remaining_bytes, + next_page_header, + } => { + loop { + if *remaining_bytes == 0 { + return Ok(None); + } + return if let Some(header) = next_page_header.as_ref() { + if let Ok(_page_meta) = PageMetadata::try_from(&**header) { + Ok(Some(*offset)) + } else { + // For unknown page type (e.g., INDEX_PAGE), skip and read next. + *next_page_header = None; + continue; + } + } else { + let mut read = self.reader.get_read(*offset as u64)?; + let (header_len, header) = read_page_header_len(&mut read)?; + *offset += header_len; + *remaining_bytes -= header_len; + let page_meta = if let Ok(_page_meta) = PageMetadata::try_from(&header) { + Ok(Some(*offset)) + } else { + // For unknown page type (e.g., INDEX_PAGE), skip and read next. + continue; + }; + *next_page_header = Some(Box::new(header)); + page_meta + }; + } + } + SerializedPageReaderState::Pages { + page_locations, + dictionary_page, + .. 
+ } => { + if let Some(page) = dictionary_page { + Ok(Some(page.offset as usize)) + } else if let Some(page) = page_locations.front() { + Ok(Some(page.offset as usize)) + } else { + Ok(None) + } + } + } + } } impl Iterator for SerializedPageReader { @@ -621,7 +673,7 @@ impl PageReader for SerializedPageReader { decode_page( header, - Bytes::from(buffer), + buffer, self.physical_type, self.decompressor.as_mut(), )? @@ -650,7 +702,7 @@ impl PageReader for SerializedPageReader { let bytes = buffer.slice(offset..); decode_page( header, - bytes, + bytes.to_vec(), self.physical_type, self.decompressor.as_mut(), )? diff --git a/testing b/testing index 735ae7128d57..4d209492d514 160000 --- a/testing +++ b/testing @@ -1 +1 @@ -Subproject commit 735ae7128d571398dd798d7ff004adebeb342883 +Subproject commit 4d209492d514c2d3cb2d392681b9aa00e6d8da1c From 72ff78ff79a0e67c1d9547f440552b65e17527dc Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Mon, 30 Dec 2024 09:58:04 -0600 Subject: [PATCH 05/12] poc reader --- parquet/src/arrow/arrow_reader/selection.rs | 4 - .../src/arrow/async_reader/arrow_reader.rs | 86 ++++++++++++------- parquet/src/arrow/async_reader/mod.rs | 30 ++++--- 3 files changed, 71 insertions(+), 49 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs index f83724a3841b..378d2253f19a 100644 --- a/parquet/src/arrow/arrow_reader/selection.rs +++ b/parquet/src/arrow/arrow_reader/selection.rs @@ -441,10 +441,6 @@ impl RowSelection { pub fn skipped_row_count(&self) -> usize { self.iter().filter(|s| s.skip).map(|s| s.row_count).sum() } - - pub(crate) fn extend(&mut self, other: Self) { - self.selectors.extend(other.selectors); - } } impl From> for RowSelection { diff --git a/parquet/src/arrow/async_reader/arrow_reader.rs b/parquet/src/arrow/async_reader/arrow_reader.rs index 04cc115ce39c..f2f681cc7d37 100644 --- a/parquet/src/arrow/async_reader/arrow_reader.rs +++ b/parquet/src/arrow/async_reader/arrow_reader.rs @@ -19,7 +19,8 @@ use std::collections::HashMap; use std::sync::RwLock; use std::{collections::VecDeque, sync::Arc}; -use arrow_array::{cast::AsArray, Array, RecordBatch, RecordBatchReader, StructArray}; +use arrow_array::ArrayRef; +use arrow_array::{cast::AsArray, Array, RecordBatch, RecordBatchReader}; use arrow_schema::{ArrowError, DataType, Schema, SchemaRef}; use arrow_select::filter::prep_null_mask_filter; @@ -45,7 +46,7 @@ pub struct FilteredParquetRecordBatchReader { fn read_selection( reader: &mut dyn ArrayReader, selection: &RowSelection, -) -> Result { +) -> Result { for selector in selection.iter() { if selector.skip { let skipped = reader.skip_records(selector.row_count)?; @@ -55,11 +56,7 @@ fn read_selection( debug_assert_eq!(read_records, selector.row_count, "failed to read rows"); } } - let array = reader.consume_batch()?; - let struct_array = array - .as_struct_opt() - .ok_or_else(|| general_err!("Struct array reader should return struct array"))?; - Ok(struct_array.clone()) + reader.consume_batch() } /// Take the next selection from the selection queue, and return the selection @@ -142,7 +139,9 @@ impl FilteredParquetRecordBatchReader { .zip(self.predicate_readers.iter_mut()) { let array = read_selection(reader.as_mut(), &selection)?; - let batch = RecordBatch::from(array); + let batch = RecordBatch::from(array.as_struct_opt().ok_or_else(|| { + general_err!("Struct array reader should return struct array") + })?); let input_rows = batch.num_rows(); let predicate_filter = predicate.evaluate(batch)?; if 
predicate_filter.len() != input_rows { @@ -178,7 +177,6 @@ impl Iterator for FilteredParquetRecordBatchReader { // It boils down to leveraging array_reader's ability to collect large batches natively, // rather than concatenating multiple small batches. - let mut selection = RowSelection::default(); let mut selected = 0; while let Some(cur_selection) = take_next_selection(&mut self.selection, self.batch_size - selected) @@ -187,21 +185,29 @@ impl Iterator for FilteredParquetRecordBatchReader { Ok(selection) => selection, Err(e) => return Some(Err(e)), }; + + for selector in filtered_selection.iter() { + if selector.skip { + self.array_reader.skip_records(selector.row_count).ok()?; + } else { + self.array_reader.read_records(selector.row_count).ok()?; + } + } selected += filtered_selection.row_count(); - selection.extend(filtered_selection); if selected >= (self.batch_size / 4 * 3) { break; } } - if !selection.selects_any() { + if selected == 0 { return None; } - let rt = read_selection(&mut *self.array_reader, &selection); - match rt { - Ok(array) => Some(Ok(RecordBatch::from(array))), - Err(e) => Some(Err(e.into())), - } + let array = self.array_reader.consume_batch().ok()?; + let struct_array = array + .as_struct_opt() + .ok_or_else(|| general_err!("Struct array reader should return struct array")) + .ok()?; + Some(Ok(RecordBatch::from(struct_array.clone()))) } } @@ -212,11 +218,11 @@ impl RecordBatchReader for FilteredParquetRecordBatchReader { } struct PageCacheInner { - queue: VecDeque, - pages: HashMap, + pages: HashMap, // col_id -> (offset, page) } -/// A simple FIFO cache for pages. +/// A simple cache for decompressed pages. +/// We cache only one page per column pub(crate) struct PageCache { inner: RwLock, } @@ -227,36 +233,49 @@ impl PageCache { pub(crate) fn new() -> Self { Self { inner: RwLock::new(PageCacheInner { - queue: VecDeque::with_capacity(Self::CAPACITY), pages: HashMap::with_capacity(Self::CAPACITY), }), } } - pub(crate) fn get_page(&self, offset: usize) -> Option { + pub(crate) fn get_page(&self, col_id: usize, offset: usize) -> Option { let read_lock = self.inner.read().unwrap(); - read_lock.pages.get(&offset).cloned() + read_lock + .pages + .get(&col_id) + .and_then(|(cached_offset, page)| { + if *cached_offset == offset { + Some(page) + } else { + None + } + }) + .cloned() } - pub(crate) fn insert_page(&self, offset: usize, page: Page) { + pub(crate) fn insert_page(&self, col_id: usize, offset: usize, page: Page) { let mut write_lock = self.inner.write().unwrap(); - if write_lock.pages.len() >= Self::CAPACITY { - let oldest_offset = write_lock.queue.pop_front().unwrap(); - write_lock.pages.remove(&oldest_offset).unwrap(); - } - write_lock.pages.insert(offset, page); - write_lock.queue.push_back(offset); + write_lock.pages.insert(col_id, (offset, page)); } } pub(crate) struct CachedPageReader { inner: SerializedPageReader, cache: Arc, + col_id: usize, } impl CachedPageReader { - pub(crate) fn new(inner: SerializedPageReader, cache: Arc) -> Self { - Self { inner, cache } + pub(crate) fn new( + inner: SerializedPageReader, + cache: Arc, + col_id: usize, + ) -> Self { + Self { + inner, + cache, + col_id, + } } } @@ -277,7 +296,7 @@ impl PageReader for CachedPageReader { return Ok(None); }; - let page = self.cache.get_page(offset); + let page = self.cache.get_page(self.col_id, offset); if let Some(page) = page { self.inner.skip_next_page()?; Ok(Some(page)) @@ -286,7 +305,8 @@ impl PageReader for CachedPageReader { let Some(inner_page) = inner_page else { return 
Ok(None); }; - self.cache.insert_page(offset, inner_page.clone()); + self.cache + .insert_page(self.col_id, offset, inner_page.clone()); Ok(Some(inner_page)) } } diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 8ea825dc4b72..e048077533b0 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -843,22 +843,28 @@ impl RowGroups for InMemoryRowGroup<'_> { .filter(|index| !index.is_empty()) .map(|index| index[i].page_locations.clone()); - // let page_reader: Box = Box::new(SerializedPageReader::new( - // data.clone(), - // self.metadata.column(i), - // self.row_count, - // page_locations, - // )?); - - let page_reader: Box = Box::new(CachedPageReader::new( - SerializedPageReader::new( + let page_reader: Box = if std::env::var("CACHE_PAGES") + .map(|v| v == "1") + .unwrap_or(false) + { + Box::new(CachedPageReader::new( + SerializedPageReader::new( + data.clone(), + self.metadata.column(i), + self.row_count, + page_locations, + )?, + self.cache.clone(), + i, + )) + } else { + Box::new(SerializedPageReader::new( data.clone(), self.metadata.column(i), self.row_count, page_locations, - )?, - self.cache.clone(), - )); + )?) + }; Ok(Box::new(ColumnChunkIterator { reader: Some(Ok(page_reader)), From 66cbf946ddac283512e5daa60cdaf1f6e7cef0da Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Tue, 31 Dec 2024 09:37:59 -0600 Subject: [PATCH 06/12] update --- parquet/Cargo.toml | 1 + .../src/arrow/array_reader/byte_view_array.rs | 6 +- .../src/arrow/async_reader/arrow_reader.rs | 58 ++++++++++++++----- parquet/src/arrow/buffer/view_buffer.rs | 7 +++ 4 files changed, 56 insertions(+), 16 deletions(-) diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index 4064baba0947..f78d776addab 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -69,6 +69,7 @@ paste = { version = "1.0" } half = { version = "2.1", default-features = false, features = ["num-traits"] } sysinfo = { version = "0.32.0", optional = true, default-features = false, features = ["system"] } crc32fast = { version = "1.4.2", optional = true, default-features = false } +simdutf8 = "0.1.5" [dev-dependencies] base64 = { version = "0.22", default-features = false, features = ["std"] } diff --git a/parquet/src/arrow/array_reader/byte_view_array.rs b/parquet/src/arrow/array_reader/byte_view_array.rs index 5845e2c08cec..2fc4d04e0707 100644 --- a/parquet/src/arrow/array_reader/byte_view_array.rs +++ b/parquet/src/arrow/array_reader/byte_view_array.rs @@ -161,7 +161,7 @@ impl ColumnValueDecoder for ByteViewArrayColumnValueDecoder { )); } - let mut buffer = ViewBuffer::default(); + let mut buffer = ViewBuffer::with_capacity(num_values as usize, 1); let mut decoder = ByteViewArrayDecoderPlain::new( buf, num_values as usize, @@ -458,6 +458,8 @@ impl ByteViewArrayDecoderDictionary { } } + output.views.reserve(len); + // Calculate the offset of the dictionary buffers in the output buffers // For example if the 2nd buffer in the dictionary is the 5th buffer in the output buffers, // then the base_buffer_idx is 5 - 2 = 3 @@ -679,7 +681,7 @@ impl ByteViewArrayDecoderDelta { /// Check that `val` is a valid UTF-8 sequence pub fn check_valid_utf8(val: &[u8]) -> Result<()> { - match std::str::from_utf8(val) { + match simdutf8::basic::from_utf8(val) { Ok(_) => Ok(()), Err(e) => Err(general_err!("encountered non UTF-8 data: {}", e)), } diff --git a/parquet/src/arrow/async_reader/arrow_reader.rs b/parquet/src/arrow/async_reader/arrow_reader.rs index 
f2f681cc7d37..91e651b77e29 100644 --- a/parquet/src/arrow/async_reader/arrow_reader.rs +++ b/parquet/src/arrow/async_reader/arrow_reader.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use std::collections::hash_map::Entry; use std::collections::HashMap; use std::sync::RwLock; use std::{collections::VecDeque, sync::Arc}; @@ -24,6 +25,7 @@ use arrow_array::{cast::AsArray, Array, RecordBatch, RecordBatchReader}; use arrow_schema::{ArrowError, DataType, Schema, SchemaRef}; use arrow_select::filter::prep_null_mask_filter; +use crate::basic::PageType; use crate::column::page::{Page, PageMetadata, PageReader}; use crate::errors::ParquetError; use crate::{ @@ -217,12 +219,17 @@ impl RecordBatchReader for FilteredParquetRecordBatchReader { } } +struct CachedPage { + dict: Option<(usize, Page)>, + data: Option<(usize, Page)>, +} + struct PageCacheInner { - pages: HashMap, // col_id -> (offset, page) + pages: HashMap, // col_id -> CachedPage } /// A simple cache for decompressed pages. -/// We cache only one page per column +/// We cache only one dictionary page and one data page per column pub(crate) struct PageCache { inner: RwLock, } @@ -240,22 +247,45 @@ impl PageCache { pub(crate) fn get_page(&self, col_id: usize, offset: usize) -> Option { let read_lock = self.inner.read().unwrap(); - read_lock - .pages - .get(&col_id) - .and_then(|(cached_offset, page)| { - if *cached_offset == offset { - Some(page) - } else { - None - } - }) - .cloned() + read_lock.pages.get(&col_id).and_then(|pages| { + pages + .dict + .iter() + .chain(pages.data.iter()) + .find(|(page_offset, _)| *page_offset == offset) + .map(|(_, page)| page.clone()) + }) } pub(crate) fn insert_page(&self, col_id: usize, offset: usize, page: Page) { let mut write_lock = self.inner.write().unwrap(); - write_lock.pages.insert(col_id, (offset, page)); + + let is_dict = page.page_type() == PageType::DICTIONARY_PAGE; + + let cached_pages = write_lock.pages.entry(col_id); + match cached_pages { + Entry::Occupied(mut entry) => { + if is_dict { + entry.get_mut().dict = Some((offset, page)); + } else { + entry.get_mut().data = Some((offset, page)); + } + } + Entry::Vacant(entry) => { + let cached_page = if is_dict { + CachedPage { + dict: Some((offset, page)), + data: None, + } + } else { + CachedPage { + dict: None, + data: Some((offset, page)), + } + }; + entry.insert(cached_page); + } + } } } diff --git a/parquet/src/arrow/buffer/view_buffer.rs b/parquet/src/arrow/buffer/view_buffer.rs index 2256f4877d68..3d0e4b926770 100644 --- a/parquet/src/arrow/buffer/view_buffer.rs +++ b/parquet/src/arrow/buffer/view_buffer.rs @@ -33,6 +33,13 @@ pub struct ViewBuffer { } impl ViewBuffer { + pub fn with_capacity(view_capacity: usize, buffer_capacity: usize) -> Self { + Self { + views: Vec::with_capacity(view_capacity), + buffers: Vec::with_capacity(buffer_capacity), + } + } + pub fn is_empty(&self) -> bool { self.views.is_empty() } From 98844f4fd6352750b7a93ddae1d6a36edab2db6b Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Tue, 31 Dec 2024 13:04:35 -0600 Subject: [PATCH 07/12] avoid recreating new buffers --- .../src/arrow/array_reader/byte_view_array.rs | 75 ++++++++++++++----- 1 file changed, 58 insertions(+), 17 deletions(-) diff --git a/parquet/src/arrow/array_reader/byte_view_array.rs b/parquet/src/arrow/array_reader/byte_view_array.rs index 2fc4d04e0707..7bc7f5620418 100644 --- a/parquet/src/arrow/array_reader/byte_view_array.rs +++ b/parquet/src/arrow/array_reader/byte_view_array.rs @@ -33,6 
+33,9 @@ use arrow_data::ByteView; use arrow_schema::DataType as ArrowType; use bytes::Bytes; use std::any::Any; +use std::collections::hash_map::Entry; +use std::collections::HashMap; +use std::sync::{Arc, LazyLock, Mutex}; /// Returns an [`ArrayReader`] that decodes the provided byte array column to view types. pub fn make_byte_view_array_reader( @@ -127,11 +130,14 @@ impl ArrayReader for ByteViewArrayReader { /// A [`ColumnValueDecoder`] for variable length byte arrays struct ByteViewArrayColumnValueDecoder { - dict: Option, + dict: Option>, decoder: Option, validate_utf8: bool, } +pub(crate) static DICT_CACHE: LazyLock>>> = + LazyLock::new(|| Mutex::new(HashMap::new())); + impl ColumnValueDecoder for ByteViewArrayColumnValueDecoder { type Buffer = ViewBuffer; @@ -144,6 +150,7 @@ impl ColumnValueDecoder for ByteViewArrayColumnValueDecoder { } } + #[inline(never)] fn set_dict( &mut self, buf: Bytes, @@ -161,18 +168,35 @@ impl ColumnValueDecoder for ByteViewArrayColumnValueDecoder { )); } - let mut buffer = ViewBuffer::with_capacity(num_values as usize, 1); - let mut decoder = ByteViewArrayDecoderPlain::new( - buf, - num_values as usize, - Some(num_values as usize), - self.validate_utf8, - ); - decoder.read(&mut buffer, usize::MAX)?; - self.dict = Some(buffer); + let buf_id = buf.as_ptr() as usize; + + let mut cache = DICT_CACHE.lock().unwrap(); + + match cache.entry(buf_id) { + Entry::Vacant(v) => { + let mut buffer = ViewBuffer::with_capacity(num_values as usize, 1); + let mut decoder = ByteViewArrayDecoderPlain::new( + buf, + num_values as usize, + Some(num_values as usize), + self.validate_utf8, + ); + decoder.read(&mut buffer, usize::MAX)?; + + let dict = Arc::new(buffer); + v.insert(dict.clone()); + self.dict = Some(dict); + } + Entry::Occupied(e) => { + // Remove and take ownership of the existing dictionary + self.dict = Some(e.remove()); + // self.dict = Some(e.get().clone()); + } + } Ok(()) } + #[inline(never)] fn set_data( &mut self, encoding: Encoding, @@ -190,22 +214,24 @@ impl ColumnValueDecoder for ByteViewArrayColumnValueDecoder { Ok(()) } + #[inline(never)] fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> Result { let decoder = self .decoder .as_mut() .ok_or_else(|| general_err!("no decoder set"))?; - decoder.read(out, num_values, self.dict.as_ref()) + decoder.read(out, num_values, self.dict.as_ref().map(|b| b.as_ref())) } + #[inline(never)] fn skip_values(&mut self, num_values: usize) -> Result { let decoder = self .decoder .as_mut() .ok_or_else(|| general_err!("no decoder set"))?; - decoder.skip(num_values, self.dict.as_ref()) + decoder.skip(num_values, self.dict.as_ref().map(|b| b.as_ref())) } } @@ -255,6 +281,7 @@ impl ByteViewArrayDecoder { } /// Read up to `len` values to `out` with the optional dictionary + #[inline(never)] pub fn read( &mut self, out: &mut ViewBuffer, @@ -290,7 +317,7 @@ impl ByteViewArrayDecoder { /// Decoder from [`Encoding::PLAIN`] data to [`ViewBuffer`] pub struct ByteViewArrayDecoderPlain { - buf: Bytes, + buf: Buffer, offset: usize, validate_utf8: bool, @@ -307,6 +334,9 @@ impl ByteViewArrayDecoderPlain { num_values: Option, validate_utf8: bool, ) -> Self { + // Here we convert `bytes::Bytes` into `arrow_buffer::Bytes`, which is zero copy + // Then we convert `arrow_buffer::Bytes` into `arrow_buffer:Buffer`, which is also zero copy + let buf = arrow_buffer::Buffer::from_bytes(buf.clone().into()); Self { buf, offset: 0, @@ -315,11 +345,21 @@ impl ByteViewArrayDecoderPlain { } } + #[inline(never)] pub fn read(&mut self, output: 
&mut ViewBuffer, len: usize) -> Result { - // Here we convert `bytes::Bytes` into `arrow_buffer::Bytes`, which is zero copy - // Then we convert `arrow_buffer::Bytes` into `arrow_buffer:Buffer`, which is also zero copy - let buf = arrow_buffer::Buffer::from_bytes(self.buf.clone().into()); - let block_id = output.append_block(buf); + let need_to_create_new_buffer = { + if let Some(last_buffer) = output.buffers.last() { + last_buffer.ptr_eq(&self.buf) + } else { + true + } + }; + + let block_id = if need_to_create_new_buffer { + output.append_block(self.buf.clone()) + } else { + output.buffers.len() as u32 - 1 + }; let to_read = len.min(self.max_remaining_values); @@ -433,6 +473,7 @@ impl ByteViewArrayDecoderDictionary { /// Assumptions / Optimization /// This function checks if dict.buffers() are the last buffers in `output`, and if so /// reuses the dictionary page buffers directly without copying data + #[inline(never)] fn read(&mut self, output: &mut ViewBuffer, dict: &ViewBuffer, len: usize) -> Result { if dict.is_empty() || len == 0 { return Ok(0); From bf740faf7591d5a785d32ccf4ba5d9ee3391fca8 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Tue, 31 Dec 2024 14:02:34 -0600 Subject: [PATCH 08/12] update --- .../src/arrow/array_reader/byte_view_array.rs | 72 +++++++++++-------- 1 file changed, 42 insertions(+), 30 deletions(-) diff --git a/parquet/src/arrow/array_reader/byte_view_array.rs b/parquet/src/arrow/array_reader/byte_view_array.rs index 7bc7f5620418..1f83de45a071 100644 --- a/parquet/src/arrow/array_reader/byte_view_array.rs +++ b/parquet/src/arrow/array_reader/byte_view_array.rs @@ -33,9 +33,9 @@ use arrow_data::ByteView; use arrow_schema::DataType as ArrowType; use bytes::Bytes; use std::any::Any; -use std::collections::hash_map::Entry; -use std::collections::HashMap; -use std::sync::{Arc, LazyLock, Mutex}; +// use std::collections::hash_map::Entry; +// use std::collections::HashMap; +use std::sync::Arc; /// Returns an [`ArrayReader`] that decodes the provided byte array column to view types. 
From 8296bde311d18ff5a8abc236a95b07d2a0debb7e Mon Sep 17 00:00:00 2001
From: Xiangpeng Hao
Date: Tue, 31 Dec 2024 14:26:56 -0600
Subject: [PATCH 09/12] bug fix

---
 .../src/arrow/array_reader/byte_view_array.rs | 52 +++----------------
 1 file changed, 7 insertions(+), 45 deletions(-)

diff --git a/parquet/src/arrow/array_reader/byte_view_array.rs b/parquet/src/arrow/array_reader/byte_view_array.rs
index 1f83de45a071..b99dcb3c9cc2 100644
--- a/parquet/src/arrow/array_reader/byte_view_array.rs
+++ b/parquet/src/arrow/array_reader/byte_view_array.rs
@@ -33,9 +33,6 @@ use arrow_data::ByteView;
 use arrow_schema::DataType as ArrowType;
 use bytes::Bytes;
 use std::any::Any;
-// use std::collections::hash_map::Entry;
-// use std::collections::HashMap;
-use std::sync::Arc;
 
 /// Returns an [`ArrayReader`] that decodes the provided byte array column to view types.
 pub fn make_byte_view_array_reader(
@@ -130,14 +127,11 @@ impl ArrayReader for ByteViewArrayReader {
 
 /// A [`ColumnValueDecoder`] for variable length byte arrays
 struct ByteViewArrayColumnValueDecoder {
-    dict: Option<Arc<ViewBuffer>>,
+    dict: Option<ViewBuffer>,
     decoder: Option<ByteViewArrayDecoder>,
     validate_utf8: bool,
 }
 
-// pub(crate) static DICT_CACHE: LazyLock<Mutex<HashMap<usize, Arc<ViewBuffer>>>> =
-//     LazyLock::new(|| Mutex::new(HashMap::new()));
-
 impl ColumnValueDecoder for ByteViewArrayColumnValueDecoder {
     type Buffer = ViewBuffer;
 
@@ -150,7 +144,6 @@ impl ColumnValueDecoder for ByteViewArrayColumnValueDecoder {
         }
     }
 
-    #[inline(never)]
     fn set_dict(
         &mut self,
         buf: Bytes,
@@ -177,38 +170,11 @@ impl ColumnValueDecoder for ByteViewArrayColumnValueDecoder {
         );
         decoder.read(&mut buffer, usize::MAX)?;
 
-        let dict = Arc::new(buffer);
-        self.dict = Some(dict);
-
-        // let buf_id = buf.as_ptr() as usize;
-
-        // let mut cache = DICT_CACHE.lock().unwrap();
-
-        // match cache.entry(buf_id) {
-        //     Entry::Vacant(v) => {
-        //         let mut buffer = ViewBuffer::with_capacity(num_values as usize, 1);
-        //         let mut decoder = ByteViewArrayDecoderPlain::new(
-        //             buf,
-        //             num_values as usize,
-        //             Some(num_values as usize),
-        //             self.validate_utf8,
-        //         );
-        //         decoder.read(&mut buffer, usize::MAX)?;
-
-        //         let dict = Arc::new(buffer);
-        //         v.insert(dict.clone());
-        //         self.dict = Some(dict);
-        //     }
-        //     Entry::Occupied(e) => {
-        //         // Remove and take ownership of the existing dictionary
-        //         self.dict = Some(e.remove());
-        //         // self.dict = Some(e.get().clone());
-        //     }
-        // }
+        self.dict = Some(buffer);
+
         Ok(())
     }
 
-    #[inline(never)]
     fn set_data(
         &mut self,
         encoding: Encoding,
@@ -226,24 +192,22 @@ impl ColumnValueDecoder for ByteViewArrayColumnValueDecoder {
         Ok(())
     }
 
-    #[inline(never)]
     fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> Result<usize> {
         let decoder = self
             .decoder
             .as_mut()
             .ok_or_else(|| general_err!("no decoder set"))?;
 
-        decoder.read(out, num_values, self.dict.as_ref().map(|b| b.as_ref()))
+        decoder.read(out, num_values, self.dict.as_ref())
     }
 
-    #[inline(never)]
     fn skip_values(&mut self, num_values: usize) -> Result<usize> {
         let decoder = self
             .decoder
             .as_mut()
             .ok_or_else(|| general_err!("no decoder set"))?;
 
-        decoder.skip(num_values, self.dict.as_ref().map(|b| b.as_ref()))
+        decoder.skip(num_values, self.dict.as_ref())
     }
 }
 
@@ -293,7 +257,6 @@ impl ByteViewArrayDecoder {
     }
 
     /// Read up to `len` values to `out` with the optional dictionary
-    #[inline(never)]
     pub fn read(
         &mut self,
         out: &mut ViewBuffer,
@@ -357,11 +320,11 @@ impl ByteViewArrayDecoderPlain {
         }
     }
 
-    #[inline(never)]
     pub fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
+        // let block_id = output.append_block(self.buf.clone());
         let need_to_create_new_buffer = {
             if let Some(last_buffer) = output.buffers.last() {
-                last_buffer.ptr_eq(&self.buf)
+                !last_buffer.ptr_eq(&self.buf)
             } else {
                 true
             }
@@ -485,7 +448,6 @@ impl ByteViewArrayDecoderDictionary {
     /// Assumptions / Optimization
     /// This function checks if dict.buffers() are the last buffers in `output`, and if so
     /// reuses the dictionary page buffers directly without copying data
-    #[inline(never)]
     fn read(&mut self, output: &mut ViewBuffer, dict: &ViewBuffer, len: usize) -> Result<usize> {
         if dict.is_empty() || len == 0 {
             return Ok(0);
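[Note on PATCH 09] Besides reverting the cache experiment, this patch fixes an inverted predicate from PATCH 07: `need_to_create_new_buffer` was true exactly when the last block already *was* this decoder's buffer, so every read appended a duplicate block. The intended invariant is worth restating: append a new block only when the most recent block in `output` is not already this buffer; otherwise reuse its index. A standalone sketch of that check, with a hypothetical `block_for` helper standing in for the real `ViewBuffer` bookkeeping:

```rust
use arrow_buffer::Buffer;

/// Append `buf` to `blocks` unless it is already the most recent block,
/// returning the index of the block that now backs the views.
fn block_for(blocks: &mut Vec<Buffer>, buf: &Buffer) -> u32 {
    // `ptr_eq` compares the backing allocation, not the bytes, so this is O(1)
    let reuse_last = blocks.last().is_some_and(|last| last.ptr_eq(buf));
    if !reuse_last {
        blocks.push(buf.clone()); // cheap: Buffer clones share the allocation
    }
    (blocks.len() - 1) as u32
}
```

This matters for PLAIN-encoded view columns because every batch read from the same page would otherwise append another clone of the same multi-megabyte page buffer to the output array.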
From c43318e5158c614b5af1270522ec4709091f6e26 Mon Sep 17 00:00:00 2001
From: Xiangpeng Hao
Date: Tue, 31 Dec 2024 22:04:18 -0600
Subject: [PATCH 10/12] selective cache

---
 parquet/src/arrow/async_reader/mod.rs | 35 +++++++++++++++++++++------
 parquet/src/arrow/mod.rs              | 12 +++++++++
 2 files changed, 40 insertions(+), 7 deletions(-)

diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index e048077533b0..a3e6671270d7 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -511,6 +511,22 @@ where
             .filter(|index| index.first().map(|v| !v.is_empty()).unwrap_or(false))
             .map(|x| x[row_group_idx].as_slice());
 
+        let mut predicate_projection: Option<ProjectionMask> = None;
+        if let Some(filter) = self.filter.as_mut() {
+            for predicate in filter.predicates.iter_mut() {
+                let p_projection = predicate.projection();
+                if let Some(ref mut p) = predicate_projection {
+                    p.union(&p_projection);
+                } else {
+                    predicate_projection = Some(p_projection.clone());
+                }
+            }
+        }
+        let projection_to_cache = predicate_projection.map(|mut p| {
+            p.intersect(&projection);
+            p
+        });
+
         let mut row_group = InMemoryRowGroup {
             metadata: meta,
             // schema: meta.schema_descr_ptr(),
@@ -518,6 +534,7 @@ where
             column_chunks: vec![None; meta.columns().len()],
             offset_index,
             cache: Arc::new(PageCache::new()),
+            projection_to_cache,
         };
 
         let mut selection =
@@ -530,13 +547,13 @@ where
                     return Ok((self, None));
                 }
 
-                let predicate_projection = predicate.projection();
+                let p_projection = predicate.projection();
                 row_group
-                    .fetch(&mut self.input, predicate_projection, Some(&selection))
+                    .fetch(&mut self.input, p_projection, Some(&selection))
                     .await?;
 
                 let array_reader =
-                    build_array_reader(self.fields.as_deref(), predicate_projection, &row_group)?;
+                    build_array_reader(self.fields.as_deref(), p_projection, &row_group)?;
 
                 filter_readers.push(array_reader);
             }
         }
@@ -731,6 +748,7 @@ struct InMemoryRowGroup<'a> {
     column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
     row_count: usize,
     cache: Arc<PageCache>,
+    projection_to_cache: Option<ProjectionMask>,
 }
 
 impl<'a> InMemoryRowGroup<'a> {
@@ -843,10 +861,13 @@ impl RowGroups for InMemoryRowGroup<'_> {
                     .filter(|index| !index.is_empty())
                     .map(|index| index[i].page_locations.clone());
 
-                let page_reader: Box<dyn PageReader> = if std::env::var("CACHE_PAGES")
-                    .map(|v| v == "1")
-                    .unwrap_or(false)
-                {
+                let cached_reader = if let Some(projection_to_cache) = &self.projection_to_cache {
+                    projection_to_cache.leaf_included(i)
+                } else {
+                    false
+                };
+
+                let page_reader: Box<dyn PageReader> = if cached_reader {
                     Box::new(CachedPageReader::new(
                         SerializedPageReader::new(
                             data.clone(),
diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs
index 9f593ab60f7f..69801319ca54 100644
--- a/parquet/src/arrow/mod.rs
+++ b/parquet/src/arrow/mod.rs
@@ -223,6 +223,18 @@ impl ProjectionMask {
         }
     }
 
+    /// Intersect two projection masks
+    pub fn intersect(&mut self, other: &Self) {
+        match (self.mask.as_ref(), other.mask.as_ref()) {
+            (None, _) => self.mask = other.mask.clone(),
+            (_, None) => {}
+            (Some(a), Some(b)) => {
+                let mask = a.iter().zip(b.iter()).map(|(&a, &b)| a && b).collect();
+                self.mask = Some(mask);
+            }
+        }
+    }
+
     /// Returns true if the mask contains the other mask
     pub fn contains(&self, other: &Self) -> bool {
         match (self.mask.as_ref(), other.mask.as_ref()) {
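[Note on PATCH 10] The environment-variable switch is replaced with a derived mask: a page is worth caching only if some filter predicate reads its column *and* the final output projection reads it again, i.e. the union of all predicate projections intersected with the output projection. A sketch of that derivation, assuming the `union` helper the patch calls and the `intersect` helper it adds:

```rust
use parquet::arrow::ProjectionMask;

/// Columns worth caching: read by at least one filter predicate AND by the
/// final output, mirroring how `projection_to_cache` is derived in the patch.
fn projection_to_cache(
    predicate_projections: &[ProjectionMask],
    output: &ProjectionMask,
) -> Option<ProjectionMask> {
    let mut iter = predicate_projections.iter();
    let mut acc = iter.next()?.clone();
    for p in iter {
        acc.union(p); // any column any predicate needs
    }
    acc.intersect(output); // ...but only if the output will decode it again
    Some(acc)
}
```

Columns used only by a predicate are decoded once and can be dropped; columns in both sets would otherwise have their pages decompressed twice, which is exactly what the `CachedPageReader` avoids.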
From c4c2c97539b00b9c732d1aaf29e6e6140c431bff Mon Sep 17 00:00:00 2001
From: Xiangpeng Hao
Date: Tue, 31 Dec 2024 22:27:14 -0600
Subject: [PATCH 11/12] clean up changes

---
 parquet/src/arrow/array_reader/builder.rs         |  6 ------
 parquet/src/arrow/array_reader/primitive_array.rs | 10 ----------
 2 files changed, 16 deletions(-)

diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs
index 23f77a9ab96f..945f62526a7e 100644
--- a/parquet/src/arrow/array_reader/builder.rs
+++ b/parquet/src/arrow/array_reader/builder.rs
@@ -245,7 +245,6 @@ fn build_primitive_reader(
             page_iterator,
             column_desc,
             arrow_type,
-            col_idx,
         )?) as _,
         PhysicalType::INT32 => {
             if let Some(DataType::Null) = arrow_type {
@@ -258,7 +257,6 @@ fn build_primitive_reader(
                     page_iterator,
                     column_desc,
                     arrow_type,
-                    col_idx,
                 )?) as _
             }
         }
@@ -266,25 +264,21 @@ fn build_primitive_reader(
             page_iterator,
             column_desc,
             arrow_type,
-            col_idx,
         )?) as _,
         PhysicalType::INT96 => Box::new(PrimitiveArrayReader::<Int96Type>::new(
             page_iterator,
             column_desc,
             arrow_type,
-            col_idx,
         )?) as _,
         PhysicalType::FLOAT => Box::new(PrimitiveArrayReader::<FloatType>::new(
             page_iterator,
             column_desc,
             arrow_type,
-            col_idx,
         )?) as _,
         PhysicalType::DOUBLE => Box::new(PrimitiveArrayReader::<DoubleType>::new(
             page_iterator,
             column_desc,
             arrow_type,
-            col_idx,
         )?) as _,
         PhysicalType::BYTE_ARRAY => match arrow_type {
             Some(DataType::Dictionary(_, _)) => {
diff --git a/parquet/src/arrow/array_reader/primitive_array.rs b/parquet/src/arrow/array_reader/primitive_array.rs
index a68311ebae23..f9abe846dddc 100644
--- a/parquet/src/arrow/array_reader/primitive_array.rs
+++ b/parquet/src/arrow/array_reader/primitive_array.rs
@@ -80,8 +80,6 @@ where
     def_levels_buffer: Option<Vec<i16>>,
     rep_levels_buffer: Option<Vec<i16>>,
     record_reader: RecordReader<T>,
-    #[allow(unused)]
-    column_idx: usize,
 }
 
 impl<T> PrimitiveArrayReader<T>
where
@@ -95,7 +93,6 @@ where
         pages: Box<dyn PageIterator>,
         column_desc: ColumnDescPtr,
         arrow_type: Option<ArrowType>,
-        column_idx: usize,
     ) -> Result<Self> {
         // Check if Arrow type is specified, else create it from Parquet type
         let data_type = match arrow_type {
@@ -113,7 +110,6 @@ where
             def_levels_buffer: None,
             rep_levels_buffer: None,
             record_reader,
-            column_idx,
         })
     }
 }
@@ -375,7 +371,6 @@ mod tests {
             Box::<EmptyPageIterator>::default(),
             schema.column(0),
             None,
-            0,
         )
         .unwrap();
 
@@ -422,7 +417,6 @@ mod tests {
             Box::new(page_iterator),
             column_desc,
             None,
-            0,
         )
         .unwrap();
 
@@ -493,7 +487,6 @@ mod tests {
             Box::new(page_iterator),
             column_desc.clone(),
             None,
-            0,
         )
         .expect("Unable to get array reader");
 
@@ -633,7 +626,6 @@ mod tests {
             Box::new(page_iterator),
             column_desc,
             None,
-            0,
         )
         .unwrap();
 
@@ -713,7 +705,6 @@ mod tests {
             Box::new(page_iterator),
             column_desc,
             None,
-            0,
         )
         .unwrap();
 
@@ -776,7 +767,6 @@ mod tests {
             Box::new(page_iterator),
             column_desc,
             None,
-            0,
         )
         .unwrap();
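[Note on PATCH 12] The final patch below changes `decode_page` to take `Bytes` instead of `Vec<u8>`, so the uncompressed path can hand over a sliced page buffer without `to_vec()`. The difference, in a small sketch with hypothetical helper names:

```rust
use bytes::Bytes;

// `Bytes` is a reference-counted view, so producing the page body is O(1):
fn page_body_zero_copy(buffer: &Bytes, offset: usize) -> Bytes {
    buffer.slice(offset..) // bumps a refcount; no byte is moved
}

// whereas the pre-patch `Vec<u8>` parameter forced a copy of the payload:
fn page_body_copy(buffer: &Bytes, offset: usize) -> Vec<u8> {
    buffer.slice(offset..).to_vec() // allocates and memcpys the remainder
}
```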
From 44bc31f3e2de6011a7e6245eccd4d8ee6f9fcf39 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Wed, 1 Jan 2025 10:13:00 -0500
Subject: [PATCH 12/12] Avoid a copy

---
 parquet/src/file/serialized_reader.rs | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index 237cb9eaf47c..1eda8f9c3bc4 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -371,7 +371,7 @@ fn read_page_header_len<T: Read>(input: &mut T) -> Result<(usize, PageHeader)> {
 /// Decodes a [`Page`] from the provided `buffer`
 pub(crate) fn decode_page(
     page_header: PageHeader,
-    buffer: Vec<u8>,
+    buffer: Bytes,
     physical_type: Type,
     decompressor: Option<&mut Box<dyn Codec>>,
 ) -> Result<Page> {
@@ -422,11 +422,10 @@ pub(crate) fn decode_page(
                 ));
             }
 
-            decompressed
+            Bytes::from(decompressed)
         }
         _ => buffer,
     };
-    let buffer = Bytes::from(buffer);
 
     let result = match page_header.type_ {
         PageType::DICTIONARY_PAGE => {
@@ -673,7 +672,7 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
 
                     decode_page(
                         header,
-                        buffer,
+                        Bytes::from(buffer),
                         self.physical_type,
                         self.decompressor.as_mut(),
                    )?
@@ -702,7 +701,7 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
                     let bytes = buffer.slice(offset..);
                     decode_page(
                         header,
-                        bytes.to_vec(),
+                        bytes,
                         self.physical_type,
                         self.decompressor.as_mut(),
                     )?
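With both call sites passing `Bytes`, the only allocation left on this path should be the decompression output itself: `Bytes::from(decompressed)` takes ownership of the freshly decompressed `Vec<u8>` without copying it, and the uncompressed branch now returns a slice of the original page buffer for free.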