From 2e4f555bb685b796c70172182b2577ec9ae79d59 Mon Sep 17 00:00:00 2001
From: Adam Reeve
Date: Thu, 20 Feb 2025 05:40:15 +1300
Subject: [PATCH] fix: Handle predicates on non-nullable columns without stats
 (#700)

Fixes #698

## What changes are proposed in this pull request?

Updates the `DataSkippingFilter` to treat all columns as nullable for the purpose of
parsing stats, as suggested in
https://github.com/delta-io/delta-kernel-rs/issues/698#issuecomment-2658229733.

This is particularly important for partition columns, which won't have values present
in stats. Stats are also usually only stored for the first 32 columns, so we shouldn't
rely on stats being present for non-partition fields either.
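
Conceptually, the stats schema is now derived from the referenced read schema with every
field forced to nullable, so a stats object that omits a column (or is entirely empty)
still parses. A minimal, self-contained sketch of the idea (the `Field` type and
`as_nullable_stats_schema` function below are simplified stand-ins for illustration, not
the kernel's actual `StructField`/`SchemaTransform` API):

```rust
// Hypothetical, simplified schema type for illustration only (not the kernel's API).
#[derive(Clone, Debug)]
struct Field {
    name: String,
    nullable: bool,
    children: Vec<Field>, // empty for leaf (primitive) fields
}

/// Copy a schema, marking every field at every nesting level as nullable, because a
/// file's stats object may omit any column (partition columns, or columns beyond the
/// number for which stats are collected).
fn as_nullable_stats_schema(fields: &[Field]) -> Vec<Field> {
    fields
        .iter()
        .map(|f| Field {
            name: f.name.clone(),
            nullable: true,
            children: as_nullable_stats_schema(&f.children),
        })
        .collect()
}

fn main() {
    // A table schema where both columns are declared non-nullable, as in the #698 repro.
    let table_schema = vec![
        Field { name: "id".into(), nullable: false, children: vec![] },
        Field { name: "val".into(), nullable: false, children: vec![] },
    ];
    let stats_schema = as_nullable_stats_schema(&table_schema);
    assert!(stats_schema.iter().all(|f| f.nullable));
}
```

The actual change below does this with a `SchemaTransform` (`NullableStatsTransform`) so
nested struct fields are handled uniformly, and then derives the null-count schema from
the relaxed stats schema.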

## How was this change tested?

I've added a new unit test. I've also tested building duckdb-delta with this change
(cherry-picked onto 0.6.1) and verified that the code in #698 now works.
---
 kernel/src/scan/data_skipping.rs |  33 +++++++-
 kernel/tests/read.rs             | 127 ++++++++++++++++++++++++++++++-
 test-utils/src/lib.rs            |  12 ++-
 3 files changed, 166 insertions(+), 6 deletions(-)

diff --git a/kernel/src/scan/data_skipping.rs b/kernel/src/scan/data_skipping.rs
index 11181863d..057574744 100644
--- a/kernel/src/scan/data_skipping.rs
+++ b/kernel/src/scan/data_skipping.rs
@@ -75,6 +75,28 @@ impl DataSkippingFilter {
         let (predicate, referenced_schema) = physical_predicate?;
         debug!("Creating a data skipping filter for {:#?}", predicate);
 
+        // Convert all fields into nullable, as stats may not be available for all columns
+        // (and usually aren't for partition columns).
+        struct NullableStatsTransform;
+        impl<'a> SchemaTransform<'a> for NullableStatsTransform {
+            fn transform_struct_field(
+                &mut self,
+                field: &'a StructField,
+            ) -> Option<Cow<'a, StructField>> {
+                use Cow::*;
+                let field = match self.transform(&field.data_type)? {
+                    Borrowed(_) if field.is_nullable() => Borrowed(field),
+                    data_type => Owned(StructField {
+                        name: field.name.clone(),
+                        data_type: data_type.into_owned(),
+                        nullable: true,
+                        metadata: field.metadata.clone(),
+                    }),
+                };
+                Some(field)
+            }
+        }
+
         // Convert a min/max stats schema into a nullcount schema (all leaf fields are LONG)
         struct NullCountStatsTransform;
         impl<'a> SchemaTransform<'a> for NullCountStatsTransform {
@@ -85,14 +107,19 @@ impl DataSkippingFilter {
                 Some(Cow::Owned(PrimitiveType::Long))
             }
         }
-        let nullcount_schema = NullCountStatsTransform
+
+        let stats_schema = NullableStatsTransform
             .transform_struct(&referenced_schema)?
             .into_owned();
+
+        let nullcount_schema = NullCountStatsTransform
+            .transform_struct(&stats_schema)?
+            .into_owned();
         let stats_schema = Arc::new(StructType::new([
             StructField::nullable("numRecords", DataType::LONG),
             StructField::nullable("nullCount", nullcount_schema),
-            StructField::nullable("minValues", referenced_schema.clone()),
-            StructField::nullable("maxValues", referenced_schema),
+            StructField::nullable("minValues", stats_schema.clone()),
+            StructField::nullable("maxValues", stats_schema),
         ]));
 
         // Skipping happens in several steps:
diff --git a/kernel/tests/read.rs b/kernel/tests/read.rs
index 9d5d24314..1ce9b9017 100644
--- a/kernel/tests/read.rs
+++ b/kernel/tests/read.rs
@@ -15,10 +15,12 @@ use delta_kernel::scan::state::{transform_to_logical, visit_scan_files, DvInfo,
 use delta_kernel::scan::Scan;
 use delta_kernel::schema::{DataType, Schema};
 use delta_kernel::{Engine, FileMeta, Table};
+use itertools::Itertools;
 use object_store::{memory::InMemory, path::Path, ObjectStore};
+use parquet::file::properties::{EnabledStatistics, WriterProperties};
 use test_utils::{
     actions_to_string, add_commit, generate_batch, generate_simple_batch, into_record_batch,
-    record_batch_to_bytes, IntoArray, TestAction, METADATA,
+    record_batch_to_bytes, record_batch_to_bytes_with_props, IntoArray, TestAction, METADATA,
 };
 use url::Url;
 
@@ -906,6 +908,129 @@ fn with_predicate_and_removes() -> Result<(), Box<dyn std::error::Error>> {
     Ok(())
 }
 
+#[tokio::test]
+async fn predicate_on_non_nullable_partition_column() -> Result<(), Box<dyn std::error::Error>> {
+    // Test for https://github.com/delta-io/delta-kernel-rs/issues/698
+    let batch = generate_batch(vec![("val", vec!["a", "b", "c"].into_array())])?;
+
+    let storage = Arc::new(InMemory::new());
+    let actions = [
+        r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#.to_string(),
+        r#"{"commitInfo":{"timestamp":1587968586154,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[\"id\"]"},"isBlindAppend":true}}"#.to_string(),
+        r#"{"metaData":{"id":"5fba94ed-9794-4965-ba6e-6ee3c0d22af9","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}},{\"name\":\"val\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}","partitionColumns":["id"],"configuration":{},"createdTime":1587968585495}}"#.to_string(),
+        format!(r#"{{"add":{{"path":"id=1/{PARQUET_FILE1}","partitionValues":{{"id":"1"}},"size":0,"modificationTime":1587968586000,"dataChange":true, "stats":"{{\"numRecords\":3,\"nullCount\":{{\"val\":0}},\"minValues\":{{\"val\":\"a\"}},\"maxValues\":{{\"val\":\"c\"}}}}"}}}}"#),
+        format!(r#"{{"add":{{"path":"id=2/{PARQUET_FILE2}","partitionValues":{{"id":"2"}},"size":0,"modificationTime":1587968586000,"dataChange":true, "stats":"{{\"numRecords\":3,\"nullCount\":{{\"val\":0}},\"minValues\":{{\"val\":\"a\"}},\"maxValues\":{{\"val\":\"c\"}}}}"}}}}"#),
+    ];
+
+    add_commit(storage.as_ref(), 0, actions.iter().join("\n")).await?;
+    storage
+        .put(
+            &Path::from("id=1").child(PARQUET_FILE1),
+            record_batch_to_bytes(&batch).into(),
+        )
+        .await?;
+    storage
+        .put(
+            &Path::from("id=2").child(PARQUET_FILE2),
+            record_batch_to_bytes(&batch).into(),
+        )
+        .await?;
+
+    let location = Url::parse("memory:///")?;
+    let table = Table::new(location);
+
+    let engine = Arc::new(DefaultEngine::new(
+        storage.clone(),
+        Path::from("/"),
+        Arc::new(TokioBackgroundExecutor::new()),
+    ));
+    let snapshot = Arc::new(table.snapshot(engine.as_ref(), None)?);
+
+    let predicate = Expression::eq(column_expr!("id"), 2);
+    let scan = snapshot
+        .scan_builder()
+        .with_predicate(Arc::new(predicate))
+        .build()?;
+
+    let stream = scan.execute(engine)?;
+
+    let mut files_scanned = 0;
+    for engine_data in stream {
+        let mut result_batch = into_record_batch(engine_data?.raw_data?);
+        let _ = result_batch.remove_column(result_batch.schema().index_of("id")?);
+        assert_eq!(&batch, &result_batch);
+        files_scanned += 1;
+    }
+    // Partition pruning is not yet implemented, so we still read the data for both partitions
+    assert_eq!(2, files_scanned);
+    Ok(())
+}
+
+#[tokio::test]
+async fn predicate_on_non_nullable_column_missing_stats() -> Result<(), Box<dyn std::error::Error>>
+{
+    let batch_1 = generate_batch(vec![("val", vec!["a", "b", "c"].into_array())])?;
+    let batch_2 = generate_batch(vec![("val", vec!["d", "e", "f"].into_array())])?;
+
+    let storage = Arc::new(InMemory::new());
+    let actions = [
+        r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#.to_string(),
+        r#"{"commitInfo":{"timestamp":1587968586154,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isBlindAppend":true}}"#.to_string(),
+        r#"{"metaData":{"id":"5fba94ed-9794-4965-ba6e-6ee3c0d22af9","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"val\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1587968585495}}"#.to_string(),
+        // Add one file with stats, one file without
+        format!(r#"{{"add":{{"path":"{PARQUET_FILE1}","partitionValues":{{}},"size":0,"modificationTime":1587968586000,"dataChange":true, "stats":"{{\"numRecords\":3,\"nullCount\":{{\"val\":0}},\"minValues\":{{\"val\":\"a\"}},\"maxValues\":{{\"val\":\"c\"}}}}"}}}}"#),
+        format!(r#"{{"add":{{"path":"{PARQUET_FILE2}","partitionValues":{{}},"size":0,"modificationTime":1587968586000,"dataChange":true, "stats":"{{\"numRecords\":3,\"nullCount\":{{}},\"minValues\":{{}},\"maxValues\":{{}}}}"}}}}"#),
+    ];
+
+    // Disable writing Parquet statistics so these cannot be used for pruning row groups
+    let writer_props = WriterProperties::builder()
+        .set_statistics_enabled(EnabledStatistics::None)
+        .build();
+
+    add_commit(storage.as_ref(), 0, actions.iter().join("\n")).await?;
+    storage
+        .put(
+            &Path::from(PARQUET_FILE1),
+            record_batch_to_bytes_with_props(&batch_1, writer_props.clone()).into(),
+        )
+        .await?;
+    storage
+        .put(
+            &Path::from(PARQUET_FILE2),
+            record_batch_to_bytes_with_props(&batch_2, writer_props).into(),
+        )
+        .await?;
+
+    let location = Url::parse("memory:///")?;
+    let table = Table::new(location);
+
+    let engine = Arc::new(DefaultEngine::new(
+        storage.clone(),
+        Path::from("/"),
+        Arc::new(TokioBackgroundExecutor::new()),
+    ));
+    let snapshot = Arc::new(table.snapshot(engine.as_ref(), None)?);
+
+    let predicate = Expression::eq(column_expr!("val"), "g");
+    let scan = snapshot
+        .scan_builder()
+        .with_predicate(Arc::new(predicate))
+        .build()?;
+
+    let stream = scan.execute(engine)?;
+
+    let mut files_scanned = 0;
+    for engine_data in stream {
+        let result_batch = into_record_batch(engine_data?.raw_data?);
+        assert_eq!(&batch_2, &result_batch);
+        files_scanned += 1;
+    }
+    // One file is scanned as stats are missing, so we don't know the predicate isn't satisfied
+    assert_eq!(1, files_scanned);
+
+    Ok(())
+}
+
 #[test]
 fn short_dv() -> Result<(), Box<dyn std::error::Error>> {
     let expected = vec![
diff --git a/test-utils/src/lib.rs b/test-utils/src/lib.rs
index 2605bea56..0aeee887d 100644
--- a/test-utils/src/lib.rs
+++ b/test-utils/src/lib.rs
@@ -37,9 +37,17 @@ pub fn actions_to_string(actions: Vec<TestAction>) -> String {
 /// convert a RecordBatch into a vector of bytes. We can't use `From` since these are both foreign
 /// types
 pub fn record_batch_to_bytes(batch: &RecordBatch) -> Vec<u8> {
-    let mut data: Vec<u8> = Vec::new();
     let props = WriterProperties::builder().build();
-    let mut writer = ArrowWriter::try_new(&mut data, batch.schema(), Some(props)).unwrap();
+    record_batch_to_bytes_with_props(batch, props)
+}
+
+pub fn record_batch_to_bytes_with_props(
+    batch: &RecordBatch,
+    writer_properties: WriterProperties,
+) -> Vec<u8> {
+    let mut data: Vec<u8> = Vec::new();
+    let mut writer =
+        ArrowWriter::try_new(&mut data, batch.schema(), Some(writer_properties)).unwrap();
     writer.write(batch).expect("Writing batch");
     // writer must be closed to write footer
     writer.close().unwrap();