Merge branch 'main' into visit-schema-ffi
zachschuermann authored Feb 19, 2025
2 parents 1b51b4d + 2e4f555 commit 3db0d7b
Showing 3 changed files with 166 additions and 6 deletions.
33 changes: 30 additions & 3 deletions kernel/src/scan/data_skipping.rs
@@ -75,6 +75,28 @@ impl DataSkippingFilter {
let (predicate, referenced_schema) = physical_predicate?;
debug!("Creating a data skipping filter for {:#?}", predicate);

// Convert all fields to nullable, as stats may not be available for all columns
// (and usually aren't for partition columns).
struct NullableStatsTransform;
impl<'a> SchemaTransform<'a> for NullableStatsTransform {
fn transform_struct_field(
&mut self,
field: &'a StructField,
) -> Option<Cow<'a, StructField>> {
use Cow::*;
let field = match self.transform(&field.data_type)? {
Borrowed(_) if field.is_nullable() => Borrowed(field),
data_type => Owned(StructField {
name: field.name.clone(),
data_type: data_type.into_owned(),
nullable: true,
metadata: field.metadata.clone(),
}),
};
Some(field)
}
}

// Convert a min/max stats schema into a nullcount schema (all leaf fields are LONG)
struct NullCountStatsTransform;
impl<'a> SchemaTransform<'a> for NullCountStatsTransform {
@@ -85,14 +107,19 @@ impl DataSkippingFilter {
Some(Cow::Owned(PrimitiveType::Long))
}
}
-        let nullcount_schema = NullCountStatsTransform
+
+        let stats_schema = NullableStatsTransform
             .transform_struct(&referenced_schema)?
             .into_owned();
+
+        let nullcount_schema = NullCountStatsTransform
+            .transform_struct(&stats_schema)?
+            .into_owned();
         let stats_schema = Arc::new(StructType::new([
             StructField::nullable("numRecords", DataType::LONG),
             StructField::nullable("nullCount", nullcount_schema),
-            StructField::nullable("minValues", referenced_schema.clone()),
-            StructField::nullable("maxValues", referenced_schema),
+            StructField::nullable("minValues", stats_schema.clone()),
+            StructField::nullable("maxValues", stats_schema),
]));

// Skipping happens in several steps:
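With this change every field that can appear in the min/max stats is forced to nullable before the full stats schema is assembled. For the tables exercised in the tests below (a non-nullable partition column `id` and a non-nullable data column `val`), the resulting schema has roughly the shape sketched here. This is an illustration only, built with the same schema API the hunk above already uses; the `DataType::INTEGER` and `DataType::STRING` shorthands are assumed to exist alongside the `DataType::LONG` shown above.

use std::sync::Arc;

use delta_kernel::schema::{DataType, StructField, StructType};

fn main() {
    // min/max stats mirror the referenced columns, but every field is
    // nullable because a file's stats may simply omit a column (partition
    // columns usually have no stats at all).
    let min_max = StructType::new([
        StructField::nullable("id", DataType::INTEGER),
        StructField::nullable("val", DataType::STRING),
    ]);
    // nullCount mirrors the same fields, with every leaf widened to LONG.
    let null_count = StructType::new([
        StructField::nullable("id", DataType::LONG),
        StructField::nullable("val", DataType::LONG),
    ]);
    // The assembled stats schema: every level is nullable, so a file whose
    // stats are partial (or entirely absent) still parses.
    let stats_schema = Arc::new(StructType::new([
        StructField::nullable("numRecords", DataType::LONG),
        StructField::nullable("nullCount", null_count),
        StructField::nullable("minValues", min_max.clone()),
        StructField::nullable("maxValues", min_max),
    ]));
    println!("{stats_schema:#?}");
}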
127 changes: 126 additions & 1 deletion kernel/tests/read.rs
@@ -15,10 +15,12 @@ use delta_kernel::scan::state::{transform_to_logical, visit_scan_files, DvInfo,
use delta_kernel::scan::Scan;
use delta_kernel::schema::{DataType, Schema};
use delta_kernel::{Engine, FileMeta, Table};
use itertools::Itertools;
use object_store::{memory::InMemory, path::Path, ObjectStore};
use parquet::file::properties::{EnabledStatistics, WriterProperties};
use test_utils::{
actions_to_string, add_commit, generate_batch, generate_simple_batch, into_record_batch,
-    record_batch_to_bytes, IntoArray, TestAction, METADATA,
+    record_batch_to_bytes, record_batch_to_bytes_with_props, IntoArray, TestAction, METADATA,
};
use url::Url;

@@ -906,6 +908,129 @@ fn with_predicate_and_removes() -> Result<(), Box<dyn std::error::Error>> {
Ok(())
}

#[tokio::test]
async fn predicate_on_non_nullable_partition_column() -> Result<(), Box<dyn std::error::Error>> {
// Test for https://github.com/delta-io/delta-kernel-rs/issues/698
let batch = generate_batch(vec![("val", vec!["a", "b", "c"].into_array())])?;

let storage = Arc::new(InMemory::new());
let actions = [
r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#.to_string(),
r#"{"commitInfo":{"timestamp":1587968586154,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[\"id\"]"},"isBlindAppend":true}}"#.to_string(),
r#"{"metaData":{"id":"5fba94ed-9794-4965-ba6e-6ee3c0d22af9","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}},{\"name\":\"val\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}","partitionColumns":["id"],"configuration":{},"createdTime":1587968585495}}"#.to_string(),
format!(r#"{{"add":{{"path":"id=1/{PARQUET_FILE1}","partitionValues":{{"id":"1"}},"size":0,"modificationTime":1587968586000,"dataChange":true, "stats":"{{\"numRecords\":3,\"nullCount\":{{\"val\":0}},\"minValues\":{{\"val\":\"a\"}},\"maxValues\":{{\"val\":\"c\"}}}}"}}}}"#),
format!(r#"{{"add":{{"path":"id=2/{PARQUET_FILE2}","partitionValues":{{"id":"2"}},"size":0,"modificationTime":1587968586000,"dataChange":true, "stats":"{{\"numRecords\":3,\"nullCount\":{{\"val\":0}},\"minValues\":{{\"val\":\"a\"}},\"maxValues\":{{\"val\":\"c\"}}}}"}}}}"#),
];

add_commit(storage.as_ref(), 0, actions.iter().join("\n")).await?;
storage
.put(
&Path::from("id=1").child(PARQUET_FILE1),
record_batch_to_bytes(&batch).into(),
)
.await?;
storage
.put(
&Path::from("id=2").child(PARQUET_FILE2),
record_batch_to_bytes(&batch).into(),
)
.await?;

let location = Url::parse("memory:///")?;
let table = Table::new(location);

let engine = Arc::new(DefaultEngine::new(
storage.clone(),
Path::from("/"),
Arc::new(TokioBackgroundExecutor::new()),
));
let snapshot = Arc::new(table.snapshot(engine.as_ref(), None)?);

let predicate = Expression::eq(column_expr!("id"), 2);
let scan = snapshot
.scan_builder()
.with_predicate(Arc::new(predicate))
.build()?;

let stream = scan.execute(engine)?;

let mut files_scanned = 0;
for engine_data in stream {
let mut result_batch = into_record_batch(engine_data?.raw_data?);
let _ = result_batch.remove_column(result_batch.schema().index_of("id")?);
assert_eq!(&batch, &result_batch);
files_scanned += 1;
}
// Partition pruning is not yet implemented, so we still read the data for both partitions
assert_eq!(2, files_scanned);
Ok(())
}

#[tokio::test]
async fn predicate_on_non_nullable_column_missing_stats() -> Result<(), Box<dyn std::error::Error>>
{
let batch_1 = generate_batch(vec![("val", vec!["a", "b", "c"].into_array())])?;
let batch_2 = generate_batch(vec![("val", vec!["d", "e", "f"].into_array())])?;

let storage = Arc::new(InMemory::new());
let actions = [
r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#.to_string(),
r#"{"commitInfo":{"timestamp":1587968586154,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isBlindAppend":true}}"#.to_string(),
r#"{"metaData":{"id":"5fba94ed-9794-4965-ba6e-6ee3c0d22af9","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"val\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1587968585495}}"#.to_string(),
// Add one file with stats, one file without
format!(r#"{{"add":{{"path":"{PARQUET_FILE1}","partitionValues":{{}},"size":0,"modificationTime":1587968586000,"dataChange":true, "stats":"{{\"numRecords\":3,\"nullCount\":{{\"val\":0}},\"minValues\":{{\"val\":\"a\"}},\"maxValues\":{{\"val\":\"c\"}}}}"}}}}"#),
format!(r#"{{"add":{{"path":"{PARQUET_FILE2}","partitionValues":{{}},"size":0,"modificationTime":1587968586000,"dataChange":true, "stats":"{{\"numRecords\":3,\"nullCount\":{{}},\"minValues\":{{}},\"maxValues\":{{}}}}"}}}}"#),
];

// Disable writing Parquet statistics so these cannot be used for pruning row groups
let writer_props = WriterProperties::builder()
.set_statistics_enabled(EnabledStatistics::None)
.build();

add_commit(storage.as_ref(), 0, actions.iter().join("\n")).await?;
storage
.put(
&Path::from(PARQUET_FILE1),
record_batch_to_bytes_with_props(&batch_1, writer_props.clone()).into(),
)
.await?;
storage
.put(
&Path::from(PARQUET_FILE2),
record_batch_to_bytes_with_props(&batch_2, writer_props).into(),
)
.await?;

let location = Url::parse("memory:///")?;
let table = Table::new(location);

let engine = Arc::new(DefaultEngine::new(
storage.clone(),
Path::from("/"),
Arc::new(TokioBackgroundExecutor::new()),
));
let snapshot = Arc::new(table.snapshot(engine.as_ref(), None)?);

let predicate = Expression::eq(column_expr!("val"), "g");
let scan = snapshot
.scan_builder()
.with_predicate(Arc::new(predicate))
.build()?;

let stream = scan.execute(engine)?;

let mut files_scanned = 0;
for engine_data in stream {
let result_batch = into_record_batch(engine_data?.raw_data?);
assert_eq!(&batch_2, &result_batch);
files_scanned += 1;
}
// Only the file without stats is scanned: with no stats we cannot prove the predicate is unsatisfied, so that file cannot be skipped
assert_eq!(1, files_scanned);

Ok(())
}

#[test]
fn short_dv() -> Result<(), Box<dyn std::error::Error>> {
let expected = vec![
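The per-file `stats` strings embedded in the add actions above drive the skipping decisions: stats never cover the partition column `id` in the first test, and the second file of the second test records no column stats at all. A standalone sketch (using serde_json purely for illustration; none of this is kernel code) shows why the stats schema has to tolerate missing values:

fn main() {
    // The two `stats` payloads used in the tests above, verbatim.
    let with_stats =
        r#"{"numRecords":3,"nullCount":{"val":0},"minValues":{"val":"a"},"maxValues":{"val":"c"}}"#;
    let without_stats = r#"{"numRecords":3,"nullCount":{},"minValues":{},"maxValues":{}}"#;

    let a: serde_json::Value = serde_json::from_str(with_stats).unwrap();
    let b: serde_json::Value = serde_json::from_str(without_stats).unwrap();

    // With stats: min/max bound `val` to ["a", "c"], so a predicate like
    // `val = "g"` can prove there are no matches and the file is skipped.
    assert_eq!(a["minValues"]["val"], "a");
    assert_eq!(a["maxValues"]["val"], "c");

    // Without stats: `val` is simply absent from minValues/maxValues, which
    // is exactly why every field in the stats schema must be nullable.
    assert!(b["minValues"].get("val").is_none());
}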
12 changes: 10 additions & 2 deletions test-utils/src/lib.rs
@@ -37,9 +37,17 @@ pub fn actions_to_string(actions: Vec<TestAction>) -> String {
/// convert a RecordBatch into a vector of bytes. We can't use `From` since these are both foreign
/// types
pub fn record_batch_to_bytes(batch: &RecordBatch) -> Vec<u8> {
-    let mut data: Vec<u8> = Vec::new();
     let props = WriterProperties::builder().build();
-    let mut writer = ArrowWriter::try_new(&mut data, batch.schema(), Some(props)).unwrap();
+    record_batch_to_bytes_with_props(batch, props)
+}
+
+pub fn record_batch_to_bytes_with_props(
+    batch: &RecordBatch,
+    writer_properties: WriterProperties,
+) -> Vec<u8> {
+    let mut data: Vec<u8> = Vec::new();
+    let mut writer =
+        ArrowWriter::try_new(&mut data, batch.schema(), Some(writer_properties)).unwrap();
writer.write(batch).expect("Writing batch");
// writer must be closed to write footer
writer.close().unwrap();
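The new `record_batch_to_bytes_with_props` helper is what lets the test above write Parquet files whose footers carry no statistics. If more tests need that behaviour, a small wrapper could live next to these helpers; the sketch below is hypothetical and assumes `EnabledStatistics` is also imported from `parquet::file::properties` in this file.

/// Serialize a batch without writing Parquet statistics, so footer stats can
/// never be used to prune row groups (mirrors the usage in kernel/tests/read.rs).
pub fn record_batch_to_bytes_without_stats(batch: &RecordBatch) -> Vec<u8> {
    let props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::None)
        .build();
    record_batch_to_bytes_with_props(batch, props)
}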
