Skip to content

Commit

Permalink
fix: Quadratic allocations when loading nested Parquet column metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
coastalwhite committed Feb 3, 2025
1 parent 2df0404 commit e8a0d8e
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 9 deletions.
40 changes: 38 additions & 2 deletions crates/polars-parquet/src/parquet/metadata/column_descriptor.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
use std::ops::Deref;
use std::sync::Arc;

use polars_utils::pl_str::PlSmallStr;
#[cfg(feature = "serde_types")]
use serde::{Deserialize, Serialize};
Expand All @@ -19,6 +22,39 @@ pub struct Descriptor {
pub max_rep_level: i16,
}

#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde_types", derive(Deserialize, Serialize))]
pub enum BaseType {
Owned(ParquetType),
Arc(Arc<ParquetType>),
}

impl BaseType {
pub fn into_arc(self) -> Self {
match self {
BaseType::Owned(t) => Self::Arc(Arc::new(t)),
BaseType::Arc(t) => Self::Arc(t),
}
}
}

impl PartialEq for BaseType {
fn eq(&self, other: &Self) -> bool {
self.deref() == other.deref()
}
}

impl Deref for BaseType {
type Target = ParquetType;

fn deref(&self) -> &Self::Target {
match self {
BaseType::Owned(i) => i,
BaseType::Arc(i) => i.as_ref(),
}
}
}

/// A descriptor for leaf-level primitive columns.
/// This encapsulates information such as definition and repetition levels and is used to
/// re-assemble nested data.
Expand All @@ -32,15 +68,15 @@ pub struct ColumnDescriptor {
pub path_in_schema: Vec<PlSmallStr>,

/// The [`ParquetType`] this descriptor is a leaf of
pub base_type: ParquetType,
pub base_type: BaseType,
}

impl ColumnDescriptor {
/// Creates new descriptor for leaf-level column.
pub fn new(
descriptor: Descriptor,
path_in_schema: Vec<PlSmallStr>,
base_type: ParquetType,
base_type: BaseType,
) -> Self {
Self {
descriptor,
Expand Down
8 changes: 6 additions & 2 deletions crates/polars-parquet/src/parquet/metadata/row_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ impl InitColumnLookup for ColumnLookup {
/// Metadata for a row group.
#[derive(Debug, Clone, Default)]
pub struct RowGroupMetadata {
columns: Arc<[ColumnChunkMetadata]>,
// Moving of `ColumnChunkMetadata` is very expensive they are rather big. So, we arc the vec
// instead of having an arc slice. This way we don't to move the vec values into an arc when
// collecting.
columns: Arc<Vec<ColumnChunkMetadata>>,
column_lookup: PlHashMap<PlSmallStr, UnitVec<usize>>,
num_rows: usize,
total_byte_size: usize,
Expand Down Expand Up @@ -135,7 +138,8 @@ impl RowGroupMetadata {

Ok(column)
})
.collect::<ParquetResult<Arc<[_]>>>()?;
.collect::<ParquetResult<Vec<_>>>()?;
let columns = Arc::new(columns);

Ok(RowGroupMetadata {
columns,
Expand Down
11 changes: 6 additions & 5 deletions crates/polars-parquet/src/parquet/metadata/schema_descriptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use polars_utils::pl_str::PlSmallStr;
#[cfg(feature = "serde_types")]
use serde::{Deserialize, Serialize};

use super::column_descriptor::{ColumnDescriptor, Descriptor};
use super::column_descriptor::{BaseType, ColumnDescriptor, Descriptor};
use crate::parquet::error::{ParquetError, ParquetResult};
use crate::parquet::schema::io_message::from_message;
use crate::parquet::schema::types::{FieldInfo, ParquetType};
Expand All @@ -29,7 +29,7 @@ impl SchemaDescriptor {
let mut leaves = vec![];
for f in &fields {
let mut path = vec![];
build_tree(f, f, 0, 0, &mut leaves, &mut path);
build_tree(f, BaseType::Owned(f.clone()), 0, 0, &mut leaves, &mut path);
}

Self {
Expand Down Expand Up @@ -99,7 +99,7 @@ impl SchemaDescriptor {

fn build_tree<'a>(
tp: &'a ParquetType,
base_tp: &ParquetType,
base_tp: BaseType,
mut max_rep_level: i16,
mut max_def_level: i16,
leaves: &mut Vec<ColumnDescriptor>,
Expand Down Expand Up @@ -127,14 +127,15 @@ fn build_tree<'a>(
max_rep_level,
},
path_in_schema,
base_tp.clone(),
base_tp,
));
},
ParquetType::GroupType { ref fields, .. } => {
let base_tp = base_tp.into_arc();
for f in fields {
build_tree(
f,
base_tp,
base_tp.clone(),
max_rep_level,
max_def_level,
leaves,
Expand Down

0 comments on commit e8a0d8e

Please sign in to comment.