fix: Handle predicates on non-nullable columns without stats (#700)
Fixes #698

## What changes are proposed in this pull request?

Updates the `DataSkippingFilter` to treat all columns as nullable for
the purpose of parsing stats, as suggested in
#698 (comment).

This is particularly important for partition columns, which won't have
values present in stats. But stats are also typically only stored for the
first 32 columns, so we shouldn't rely on stats being present for
non-partition fields either.
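
For a concrete illustration (hypothetical file path and values, mirroring the new unit test below), an `add` action for a table partitioned by a non-nullable `id` column records the partition value only under `partitionValues`, so its `stats` JSON has no entry for `id`:

```rust
// Illustrative only: in a real Delta log this action occupies a single JSON line;
// it is wrapped here for readability. The non-nullable partition column `id` appears
// in `partitionValues` but not in `stats`, so parsing the stats against a schema that
// keeps `id` non-nullable would fail; hence the stats schema treats all columns as nullable.
let add_action = r#"{"add":{
    "path":"id=1/part-00000.parquet",
    "partitionValues":{"id":"1"},
    "size":0,"modificationTime":1587968586000,"dataChange":true,
    "stats":"{\"numRecords\":3,\"nullCount\":{\"val\":0},\"minValues\":{\"val\":\"a\"},\"maxValues\":{\"val\":\"c\"}}"}}"#;
```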

## How was this change tested?

I've added new unit tests.

I've also tested building duckdb-delta with this change (cherry-picked
onto 0.6.1) and verified that the code in #698 now works.
adamreeve authored Feb 19, 2025
1 parent 16d2557 commit 2e4f555
Showing 3 changed files with 166 additions and 6 deletions.
33 changes: 30 additions & 3 deletions kernel/src/scan/data_skipping.rs
@@ -75,6 +75,28 @@ impl DataSkippingFilter {
let (predicate, referenced_schema) = physical_predicate?;
debug!("Creating a data skipping filter for {:#?}", predicate);

// Convert all fields into nullable, as stats may not be available for all columns
// (and usually aren't for partition columns).
struct NullableStatsTransform;
impl<'a> SchemaTransform<'a> for NullableStatsTransform {
fn transform_struct_field(
&mut self,
field: &'a StructField,
) -> Option<Cow<'a, StructField>> {
use Cow::*;
let field = match self.transform(&field.data_type)? {
Borrowed(_) if field.is_nullable() => Borrowed(field),
data_type => Owned(StructField {
name: field.name.clone(),
data_type: data_type.into_owned(),
nullable: true,
metadata: field.metadata.clone(),
}),
};
Some(field)
}
}

// Convert a min/max stats schema into a nullcount schema (all leaf fields are LONG)
struct NullCountStatsTransform;
impl<'a> SchemaTransform<'a> for NullCountStatsTransform {
@@ -85,14 +107,19 @@ impl DataSkippingFilter {
Some(Cow::Owned(PrimitiveType::Long))
}
}
-let nullcount_schema = NullCountStatsTransform
+
+let stats_schema = NullableStatsTransform
.transform_struct(&referenced_schema)?
.into_owned();
+
+let nullcount_schema = NullCountStatsTransform
+.transform_struct(&stats_schema)?
+.into_owned();
let stats_schema = Arc::new(StructType::new([
StructField::nullable("numRecords", DataType::LONG),
StructField::nullable("nullCount", nullcount_schema),
-StructField::nullable("minValues", referenced_schema.clone()),
-StructField::nullable("maxValues", referenced_schema),
+StructField::nullable("minValues", stats_schema.clone()),
+StructField::nullable("maxValues", stats_schema),
]));

// Skipping happens in several steps:
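As a rough sketch of the effect (not code from this commit; the function name and the single-column schema are made up for illustration), the stats schema that the filter reads for a predicate on a non-nullable integer partition column `id` now marks every field nullable, so files whose stats omit `id` parse with nulls instead of failing:

```rust
use std::sync::Arc;
use delta_kernel::schema::{DataType, StructField, StructType};

// Hypothetical helper: builds the stats read schema for a predicate that references
// only a non-nullable integer partition column named `id`.
fn example_stats_schema() -> Arc<StructType> {
    // After NullableStatsTransform, `id` is treated as nullable even though the table
    // schema declares it non-nullable.
    let referenced = StructType::new([StructField::nullable("id", DataType::INTEGER)]);

    // Null counts are always LONG (per NullCountStatsTransform).
    let nullcount = StructType::new([StructField::nullable("id", DataType::LONG)]);

    Arc::new(StructType::new([
        StructField::nullable("numRecords", DataType::LONG),
        StructField::nullable("nullCount", nullcount),
        StructField::nullable("minValues", referenced.clone()),
        StructField::nullable("maxValues", referenced),
    ]))
}
```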
127 changes: 126 additions & 1 deletion kernel/tests/read.rs
@@ -15,10 +15,12 @@ use delta_kernel::scan::state::{transform_to_logical, visit_scan_files, DvInfo,
use delta_kernel::scan::Scan;
use delta_kernel::schema::{DataType, Schema};
use delta_kernel::{Engine, FileMeta, Table};
use itertools::Itertools;
use object_store::{memory::InMemory, path::Path, ObjectStore};
use parquet::file::properties::{EnabledStatistics, WriterProperties};
use test_utils::{
actions_to_string, add_commit, generate_batch, generate_simple_batch, into_record_batch,
-record_batch_to_bytes, IntoArray, TestAction, METADATA,
+record_batch_to_bytes, record_batch_to_bytes_with_props, IntoArray, TestAction, METADATA,
};
use url::Url;

@@ -906,6 +908,129 @@ fn with_predicate_and_removes() -> Result<(), Box<dyn std::error::Error>> {
Ok(())
}

#[tokio::test]
async fn predicate_on_non_nullable_partition_column() -> Result<(), Box<dyn std::error::Error>> {
// Test for https://github.com/delta-io/delta-kernel-rs/issues/698
let batch = generate_batch(vec![("val", vec!["a", "b", "c"].into_array())])?;

let storage = Arc::new(InMemory::new());
let actions = [
r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#.to_string(),
r#"{"commitInfo":{"timestamp":1587968586154,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[\"id\"]"},"isBlindAppend":true}}"#.to_string(),
r#"{"metaData":{"id":"5fba94ed-9794-4965-ba6e-6ee3c0d22af9","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}},{\"name\":\"val\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}","partitionColumns":["id"],"configuration":{},"createdTime":1587968585495}}"#.to_string(),
format!(r#"{{"add":{{"path":"id=1/{PARQUET_FILE1}","partitionValues":{{"id":"1"}},"size":0,"modificationTime":1587968586000,"dataChange":true, "stats":"{{\"numRecords\":3,\"nullCount\":{{\"val\":0}},\"minValues\":{{\"val\":\"a\"}},\"maxValues\":{{\"val\":\"c\"}}}}"}}}}"#),
format!(r#"{{"add":{{"path":"id=2/{PARQUET_FILE2}","partitionValues":{{"id":"2"}},"size":0,"modificationTime":1587968586000,"dataChange":true, "stats":"{{\"numRecords\":3,\"nullCount\":{{\"val\":0}},\"minValues\":{{\"val\":\"a\"}},\"maxValues\":{{\"val\":\"c\"}}}}"}}}}"#),
];

add_commit(storage.as_ref(), 0, actions.iter().join("\n")).await?;
storage
.put(
&Path::from("id=1").child(PARQUET_FILE1),
record_batch_to_bytes(&batch).into(),
)
.await?;
storage
.put(
&Path::from("id=2").child(PARQUET_FILE2),
record_batch_to_bytes(&batch).into(),
)
.await?;

let location = Url::parse("memory:///")?;
let table = Table::new(location);

let engine = Arc::new(DefaultEngine::new(
storage.clone(),
Path::from("/"),
Arc::new(TokioBackgroundExecutor::new()),
));
let snapshot = Arc::new(table.snapshot(engine.as_ref(), None)?);

let predicate = Expression::eq(column_expr!("id"), 2);
let scan = snapshot
.scan_builder()
.with_predicate(Arc::new(predicate))
.build()?;

let stream = scan.execute(engine)?;

let mut files_scanned = 0;
for engine_data in stream {
let mut result_batch = into_record_batch(engine_data?.raw_data?);
let _ = result_batch.remove_column(result_batch.schema().index_of("id")?);
assert_eq!(&batch, &result_batch);
files_scanned += 1;
}
// Partition pruning is not yet implemented, so we still read the data for both partitions
assert_eq!(2, files_scanned);
Ok(())
}

#[tokio::test]
async fn predicate_on_non_nullable_column_missing_stats() -> Result<(), Box<dyn std::error::Error>>
{
let batch_1 = generate_batch(vec![("val", vec!["a", "b", "c"].into_array())])?;
let batch_2 = generate_batch(vec![("val", vec!["d", "e", "f"].into_array())])?;

let storage = Arc::new(InMemory::new());
let actions = [
r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#.to_string(),
r#"{"commitInfo":{"timestamp":1587968586154,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isBlindAppend":true}}"#.to_string(),
r#"{"metaData":{"id":"5fba94ed-9794-4965-ba6e-6ee3c0d22af9","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"val\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1587968585495}}"#.to_string(),
// Add one file with stats, one file without
format!(r#"{{"add":{{"path":"{PARQUET_FILE1}","partitionValues":{{}},"size":0,"modificationTime":1587968586000,"dataChange":true, "stats":"{{\"numRecords\":3,\"nullCount\":{{\"val\":0}},\"minValues\":{{\"val\":\"a\"}},\"maxValues\":{{\"val\":\"c\"}}}}"}}}}"#),
format!(r#"{{"add":{{"path":"{PARQUET_FILE2}","partitionValues":{{}},"size":0,"modificationTime":1587968586000,"dataChange":true, "stats":"{{\"numRecords\":3,\"nullCount\":{{}},\"minValues\":{{}},\"maxValues\":{{}}}}"}}}}"#),
];

// Disable writing Parquet statistics so these cannot be used for pruning row groups
let writer_props = WriterProperties::builder()
.set_statistics_enabled(EnabledStatistics::None)
.build();

add_commit(storage.as_ref(), 0, actions.iter().join("\n")).await?;
storage
.put(
&Path::from(PARQUET_FILE1),
record_batch_to_bytes_with_props(&batch_1, writer_props.clone()).into(),
)
.await?;
storage
.put(
&Path::from(PARQUET_FILE2),
record_batch_to_bytes_with_props(&batch_2, writer_props).into(),
)
.await?;

let location = Url::parse("memory:///")?;
let table = Table::new(location);

let engine = Arc::new(DefaultEngine::new(
storage.clone(),
Path::from("/"),
Arc::new(TokioBackgroundExecutor::new()),
));
let snapshot = Arc::new(table.snapshot(engine.as_ref(), None)?);

let predicate = Expression::eq(column_expr!("val"), "g");
let scan = snapshot
.scan_builder()
.with_predicate(Arc::new(predicate))
.build()?;

let stream = scan.execute(engine)?;

let mut files_scanned = 0;
for engine_data in stream {
let result_batch = into_record_batch(engine_data?.raw_data?);
assert_eq!(&batch_2, &result_batch);
files_scanned += 1;
}
// One file is scanned as stats are missing so we don't know the predicate isn't satisfied
assert_eq!(1, files_scanned);

Ok(())
}

#[test]
fn short_dv() -> Result<(), Box<dyn std::error::Error>> {
let expected = vec![
12 changes: 10 additions & 2 deletions test-utils/src/lib.rs
@@ -37,9 +37,17 @@ pub fn actions_to_string(actions: Vec<TestAction>) -> String {
/// convert a RecordBatch into a vector of bytes. We can't use `From` since these are both foreign
/// types
pub fn record_batch_to_bytes(batch: &RecordBatch) -> Vec<u8> {
-let mut data: Vec<u8> = Vec::new();
let props = WriterProperties::builder().build();
-let mut writer = ArrowWriter::try_new(&mut data, batch.schema(), Some(props)).unwrap();
+record_batch_to_bytes_with_props(batch, props)
+}
+
+pub fn record_batch_to_bytes_with_props(
+batch: &RecordBatch,
+writer_properties: WriterProperties,
+) -> Vec<u8> {
+let mut data: Vec<u8> = Vec::new();
+let mut writer =
+ArrowWriter::try_new(&mut data, batch.schema(), Some(writer_properties)).unwrap();
writer.write(batch).expect("Writing batch");
// writer must be closed to write footer
writer.close().unwrap();
