
Commit

Merge branch 'main' into visit-schema-ffi
zachschuermann authored Feb 19, 2025
2 parents 38a7d33 + 16d2557 commit 1b51b4d
Showing 4 changed files with 187 additions and 21 deletions.
14 changes: 13 additions & 1 deletion kernel/src/engine/arrow_data.rs
@@ -12,7 +12,13 @@ use tracing::debug;

use std::collections::{HashMap, HashSet};

/// ArrowEngineData holds an Arrow RecordBatch, implements `EngineData` so the kernel can extract from it.
pub use crate::engine::arrow_utils::fix_nested_null_masks;

/// ArrowEngineData holds an Arrow `RecordBatch` and implements `EngineData` so the kernel can extract from it.
///
/// WARNING: Row visitors require that all leaf columns of the record batch have correctly computed
/// NULL masks. The arrow parquet reader is known to produce incomplete NULL masks, for
/// example. When in doubt, call [`fix_nested_null_masks`] first.
pub struct ArrowEngineData {
data: RecordBatch,
}
@@ -43,6 +49,12 @@ impl From<RecordBatch> for ArrowEngineData {
}
}

impl From<StructArray> for ArrowEngineData {
fn from(value: StructArray) -> Self {
ArrowEngineData::new(value.into())
}
}

impl From<ArrowEngineData> for RecordBatch {
fn from(value: ArrowEngineData) -> Self {
value.data
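For illustration, a minimal sketch of how an engine might honor the WARNING above before handing data to row visitors. The helper name `into_engine_data` is hypothetical and not part of this change; it simply chains `fix_nested_null_masks` with the `From<StructArray> for ArrowEngineData` impl added in this file:

    use arrow_array::{RecordBatch, StructArray};

    use crate::engine::arrow_data::ArrowEngineData;
    use crate::engine::arrow_utils::fix_nested_null_masks;

    /// Hypothetical helper: wrap a freshly read `RecordBatch` so that row visitors
    /// can trust every leaf column's NULL mask.
    fn into_engine_data(batch: RecordBatch) -> ArrowEngineData {
        // Union each parent's null mask into its nested children first...
        let fixed: StructArray = fix_nested_null_masks(batch.into());
        // ...then convert via the new `From<StructArray> for ArrowEngineData` impl.
        fixed.into()
    }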
170 changes: 168 additions & 2 deletions kernel/src/engine/arrow_utils.rs
@@ -13,9 +13,10 @@ use crate::{
};

use arrow_array::{
cast::AsArray, new_null_array, Array as ArrowArray, GenericListArray, OffsetSizeTrait,
RecordBatch, StringArray, StructArray,
cast::AsArray, make_array, new_null_array, Array as ArrowArray, GenericListArray,
OffsetSizeTrait, RecordBatch, StringArray, StructArray,
};
use arrow_buffer::NullBuffer;
use arrow_json::{LineDelimitedWriter, ReaderBuilder};
use arrow_schema::{
DataType as ArrowDataType, Field as ArrowField, FieldRef as ArrowFieldRef, Fields,
@@ -62,6 +63,21 @@ pub(crate) fn make_arrow_error(s: impl Into<String>) -> Error {
Error::Arrow(arrow_schema::ArrowError::InvalidArgumentError(s.into())).with_backtrace()
}

/// Applies post-processing to data read from parquet files. This includes `reorder_struct_array` to
/// ensure schema compatibility, as well as `fix_nested_null_masks` to ensure that leaf columns have
/// accurate null masks that row visitors rely on for correctness.
pub(crate) fn fixup_parquet_read<T>(
batch: RecordBatch,
requested_ordering: &[ReorderIndex],
) -> DeltaResult<T>
where
StructArray: Into<T>,
{
let data = reorder_struct_array(batch.into(), requested_ordering)?;
let data = fix_nested_null_masks(data);
Ok(data.into())
}

/*
* The code below implements proper pruning of columns when reading parquet, reordering of columns to
* match the specified schema, and insertion of null columns if the requested schema includes a
@@ -609,6 +625,53 @@ fn reorder_list<O: OffsetSizeTrait>(
}
}

/// Use this function to recursively compute properly unioned null masks for all nested
/// columns of a record batch, making it safe to project out and consume nested columns.
///
/// Arrow does not guarantee that the null masks associated with nested columns are accurate --
/// instead, the reader must consult the union of the logical null masks of the column and all its
/// ancestors. The parquet reader stopped doing this automatically as of arrow-53.3, for example.
pub fn fix_nested_null_masks(batch: StructArray) -> StructArray {
compute_nested_null_masks(batch, None)
}

/// Splits a StructArray into its parts, unions in the parent null mask, and uses the result to
/// recursively update the children as well before putting everything back together.
fn compute_nested_null_masks(sa: StructArray, parent_nulls: Option<&NullBuffer>) -> StructArray {
let (fields, columns, nulls) = sa.into_parts();
let nulls = NullBuffer::union(parent_nulls, nulls.as_ref());
let columns = columns
.into_iter()
.map(|column| match column.as_struct_opt() {
Some(sa) => Arc::new(compute_nested_null_masks(sa.clone(), nulls.as_ref())) as _,
None => {
let data = column.to_data();
let nulls = NullBuffer::union(nulls.as_ref(), data.nulls());
let builder = data.into_builder().nulls(nulls);
// Use an unchecked build to avoid paying a redundant O(k) validation cost for a
// `RecordBatch` with k leaf columns.
//
// SAFETY: The builder was constructed from an `ArrayData` we extracted from the
// column. The change we make is the null buffer, via `NullBuffer::union` with input
// null buffers that were _also_ extracted from the column and its parent. A union
// can only _grow_ the set of NULL rows, so data validity is preserved. Even if the
// `parent_nulls` somehow had a length mismatch --- which it never should, having
// also been extracted from our grandparent --- the mismatch would have already
// caused `NullBuffer::union` to panic.
let data = unsafe { builder.build_unchecked() };
make_array(data)
}
})
.collect();

// Use an unchecked constructor to avoid paying a redundant O(n*k) null buffer validation cost
// for a `RecordBatch` with n rows and k leaf columns.
//
// SAFETY: We are simply reassembling the input `StructArray` we previously broke apart, with
// updated null buffers. See above for details about null buffer safety.
unsafe { StructArray::new_unchecked(fields, columns, nulls) }
}

/// Arrow lacks the functionality to json-parse a string column into a struct column -- even though the
/// JSON file reader does exactly the same thing. This function is a hack to work around that gap.
pub(crate) fn parse_json(
@@ -1432,4 +1495,107 @@ mod tests {
);
Ok(())
}

#[test]
fn test_arrow_broken_nested_null_masks() {
use crate::engine::arrow_utils::fix_nested_null_masks;
use arrow::datatypes::{DataType, Field, Fields, Schema};
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

// Parse some JSON into a nested schema
let schema = Arc::new(Schema::new(vec![Field::new(
"outer",
DataType::Struct(Fields::from(vec![
Field::new(
"inner_nullable",
DataType::Struct(Fields::from(vec![
Field::new("leaf_non_null", DataType::Int32, false),
Field::new("leaf_nullable", DataType::Int32, true),
])),
true,
),
Field::new(
"inner_non_null",
DataType::Struct(Fields::from(vec![
Field::new("leaf_non_null", DataType::Int32, false),
Field::new("leaf_nullable", DataType::Int32, true),
])),
false,
),
])),
true,
)]));
let json_string = r#"
{ }
{ "outer" : { "inner_non_null" : { "leaf_non_null" : 1 } } }
{ "outer" : { "inner_non_null" : { "leaf_non_null" : 2, "leaf_nullable" : 3 } } }
{ "outer" : { "inner_non_null" : { "leaf_non_null" : 4 }, "inner_nullable" : { "leaf_non_null" : 5 } } }
{ "outer" : { "inner_non_null" : { "leaf_non_null" : 6 }, "inner_nullable" : { "leaf_non_null" : 7, "leaf_nullable": 8 } } }
"#;
let batch1 = arrow::json::ReaderBuilder::new(schema.clone())
.build(json_string.as_bytes())
.unwrap()
.next()
.unwrap()
.unwrap();
println!("Batch 1: {batch1:?}");

macro_rules! assert_nulls {
( $column: expr, $nulls: expr ) => {
assert_eq!($column.nulls().unwrap(), &NullBuffer::from(&$nulls[..]));
};
}

// If any of these tests ever fail, it means the arrow JSON reader started producing
// incomplete nested NULL masks. If that happens, we need to update all JSON reads to call
// `fix_nested_null_masks`.
let outer_1 = batch1.column(0).as_struct();
assert_nulls!(outer_1, [false, true, true, true, true]);
let inner_nullable_1 = outer_1.column(0).as_struct();
assert_nulls!(inner_nullable_1, [false, false, false, true, true]);
let nullable_leaf_non_null_1 = inner_nullable_1.column(0);
assert_nulls!(nullable_leaf_non_null_1, [false, false, false, true, true]);
let nullable_leaf_nullable_1 = inner_nullable_1.column(1);
assert_nulls!(nullable_leaf_nullable_1, [false, false, false, false, true]);
let inner_non_null_1 = outer_1.column(1).as_struct();
assert_nulls!(inner_non_null_1, [false, true, true, true, true]);
let non_null_leaf_non_null_1 = inner_non_null_1.column(0);
assert_nulls!(non_null_leaf_non_null_1, [false, true, true, true, true]);
let non_null_leaf_nullable_1 = inner_non_null_1.column(1);
assert_nulls!(non_null_leaf_nullable_1, [false, false, true, false, false]);

// Write the batch to a parquet file and read it back
let mut buffer = vec![];
let mut writer =
parquet::arrow::ArrowWriter::try_new(&mut buffer, schema.clone(), None).unwrap();
writer.write(&batch1).unwrap();
writer.close().unwrap(); // writer must be closed to write footer
let batch2 = ParquetRecordBatchReaderBuilder::try_new(bytes::Bytes::from(buffer))
.unwrap()
.build()
.unwrap()
.next()
.unwrap()
.unwrap();
println!("Batch 2 before: {batch2:?}");

// Starting from arrow-53.3, the parquet reader started returning broken nested NULL masks.
let batch2 = RecordBatch::from(fix_nested_null_masks(batch2.into()));

// Verify the data survived the round trip
let outer_2 = batch2.column(0).as_struct();
assert_eq!(outer_2, outer_1);
let inner_nullable_2 = outer_2.column(0).as_struct();
assert_eq!(inner_nullable_2, inner_nullable_1);
let nullable_leaf_non_null_2 = inner_nullable_2.column(0);
assert_eq!(nullable_leaf_non_null_2, nullable_leaf_non_null_1);
let nullable_leaf_nullable_2 = inner_nullable_2.column(1);
assert_eq!(nullable_leaf_nullable_2, nullable_leaf_nullable_1);
let inner_non_null_2 = outer_2.column(1).as_struct();
assert_eq!(inner_non_null_2, inner_non_null_1);
let non_null_leaf_non_null_2 = inner_non_null_2.column(0);
assert_eq!(non_null_leaf_non_null_2, non_null_leaf_non_null_1);
let non_null_leaf_nullable_2 = inner_non_null_2.column(1);
assert_eq!(non_null_leaf_nullable_2, non_null_leaf_nullable_1);
}
}
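The fix above ultimately rests on `NullBuffer::union`, which merges NULL sets level by level. A standalone sketch of that semantics, assuming only the `arrow_buffer` crate as a dependency:

    use arrow_buffer::NullBuffer;

    #[test]
    fn null_mask_union_semantics() {
        // Validity as reported at the parent (struct) level and at one leaf column.
        let parent = NullBuffer::from(vec![true, false, true, true]);
        let leaf = NullBuffer::from(vec![true, true, false, true]);

        // A row stays valid only if it is valid at every level; this is the rule
        // `compute_nested_null_masks` pushes down to each leaf column.
        let merged = NullBuffer::union(Some(&parent), Some(&leaf)).unwrap();
        assert_eq!(merged, NullBuffer::from(vec![true, false, false, true]));
    }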
16 changes: 3 additions & 13 deletions kernel/src/engine/default/parquet.rs
@@ -18,7 +18,7 @@ use uuid::Uuid;

use super::file_stream::{FileOpenFuture, FileOpener, FileStream};
use crate::engine::arrow_data::ArrowEngineData;
use crate::engine::arrow_utils::{generate_mask, get_requested_indices, reorder_struct_array};
use crate::engine::arrow_utils::{fixup_parquet_read, generate_mask, get_requested_indices};
use crate::engine::default::executor::TaskExecutor;
use crate::engine::parquet_row_group_skipping::ParquetRowGroupSkipping;
use crate::schema::SchemaRef;
@@ -281,12 +281,7 @@ impl FileOpener for ParquetOpener {

let stream = builder.with_batch_size(batch_size).build()?;

let stream = stream.map(move |rbr| {
// re-order each batch if needed
rbr.map_err(Error::Parquet).and_then(|rb| {
reorder_struct_array(rb.into(), &requested_ordering).map(Into::into)
})
});
let stream = stream.map(move |rbr| fixup_parquet_read(rbr?, &requested_ordering));
Ok(stream.boxed())
}))
}
@@ -355,12 +350,7 @@ impl FileOpener for PresignedUrlOpener {
let reader = builder.with_batch_size(batch_size).build()?;

let stream = futures::stream::iter(reader);
let stream = stream.map(move |rbr| {
// re-order each batch if needed
rbr.map_err(Error::Arrow).and_then(|rb| {
reorder_struct_array(rb.into(), &requested_ordering).map(Into::into)
})
});
let stream = stream.map(move |rbr| fixup_parquet_read(rbr?, &requested_ordering));
Ok(stream.boxed())
}))
}
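Both call sites above rely on the generic signature of `fixup_parquet_read` (any `T` with `StructArray: Into<T>`), so the target type alone drives the conversion. A sketch with two hypothetical callers; the import paths and visibility of `ReorderIndex` are assumed here:

    use arrow_array::RecordBatch;

    use crate::engine::arrow_data::ArrowEngineData;
    use crate::engine::arrow_utils::{fixup_parquet_read, ReorderIndex};
    use crate::DeltaResult;

    // T = RecordBatch, via arrow's own `From<StructArray> for RecordBatch`.
    fn as_record_batch(rb: RecordBatch, order: &[ReorderIndex]) -> DeltaResult<RecordBatch> {
        fixup_parquet_read(rb, order)
    }

    // T = ArrowEngineData, via the `From<StructArray>` impl added in arrow_data.rs.
    fn as_engine_data(rb: RecordBatch, order: &[ReorderIndex]) -> DeltaResult<ArrowEngineData> {
        fixup_parquet_read(rb, order)
    }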
8 changes: 3 additions & 5 deletions kernel/src/engine/sync/parquet.rs
@@ -5,7 +5,7 @@ use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ParquetRecordBatchReader

use super::read_files;
use crate::engine::arrow_data::ArrowEngineData;
use crate::engine::arrow_utils::{generate_mask, get_requested_indices, reorder_struct_array};
use crate::engine::arrow_utils::{fixup_parquet_read, generate_mask, get_requested_indices};
use crate::engine::parquet_row_group_skipping::ParquetRowGroupSkipping;
use crate::schema::SchemaRef;
use crate::{DeltaResult, ExpressionRef, FileDataReadResultIterator, FileMeta, ParquetHandler};
@@ -28,10 +28,8 @@ fn try_create_from_parquet(
if let Some(predicate) = predicate {
builder = builder.with_row_group_filter(predicate.as_ref());
}
Ok(builder.build()?.map(move |data| {
let reordered = reorder_struct_array(data?.into(), &requested_ordering)?;
Ok(ArrowEngineData::new(reordered.into()))
}))
let stream = builder.build()?;
Ok(stream.map(move |rbr| fixup_parquet_read(rbr?, &requested_ordering)))
}

impl ParquetHandler for SyncParquetHandler {
