Also test missing stats for non-partition column
adamreeve committed Feb 18, 2025
1 parent e083aee commit 44c7588
Showing 1 changed file with 33 additions and 26 deletions: kernel/tests/read.rs
@@ -908,24 +908,25 @@ fn with_predicate_and_removes() -> Result<(), Box<dyn std::error::Error>> {
}

#[tokio::test]
-async fn predicate_on_non_nullable_partition_col() -> Result<(), Box<dyn std::error::Error>> {
+async fn predicate_on_non_nullable_columns() -> Result<(), Box<dyn std::error::Error>> {
// Test for https://github.com/delta-io/delta-kernel-rs/issues/698
let batch_1 = generate_batch(vec![
("id", vec![1, 1, 1].into_array()),
("val", vec!["a", "b", "c"].into_array()),
])?;
let batch_2 = generate_batch(vec![
("id", vec![2, 2, 2].into_array()),
("val", vec!["d", "e", "f"].into_array()),
("val", vec!["d", "d", "d"].into_array()),
])?;

let storage = Arc::new(InMemory::new());
let actions = [
r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#.to_string(),
r#"{"commitInfo":{"timestamp":1587968586154,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[\"id\"]"},"isBlindAppend":true}}"#.to_string(),
r#"{"metaData":{"id":"5fba94ed-9794-4965-ba6e-6ee3c0d22af9","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}},{\"name\":\"val\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["id"],"configuration":{},"createdTime":1587968585495}}"#.to_string(),
r#"{"metaData":{"id":"5fba94ed-9794-4965-ba6e-6ee3c0d22af9","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}},{\"name\":\"val\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}","partitionColumns":["id"],"configuration":{},"createdTime":1587968585495}}"#.to_string(),
format!(r#"{{"add":{{"path":"{PARQUET_FILE1}","partitionValues":{{"id":"1"}},"size":262,"modificationTime":1587968586000,"dataChange":true, "stats":"{{\"numRecords\":3,\"nullCount\":{{\"val\":0}},\"minValues\":{{\"val\":\"a\"}},\"maxValues\":{{\"val\":\"c\"}}}}"}}}}"#),
format!(r#"{{"add":{{"path":"{PARQUET_FILE2}","partitionValues":{{"id":"2"}},"size":262,"modificationTime":1587968586000,"dataChange":true, "stats":"{{\"numRecords\":3,\"nullCount\":{{\"val\":0}},\"minValues\":{{\"val\":\"d\"}},\"maxValues\":{{\"val\":\"f\"}}}}"}}}}"#),
+// Stats are optional even for non-partition fields, test with no stats for val field in second file:
+format!(r#"{{"add":{{"path":"{PARQUET_FILE2}","partitionValues":{{"id":"2"}},"size":262,"modificationTime":1587968586000,"dataChange":true, "stats":"{{\"numRecords\":3,\"nullCount\":{{}},\"minValues\":{{}},\"maxValues\":{{}}}}"}}}}"#),
];

add_commit(storage.as_ref(), 0, actions.iter().join("\n")).await?;
@@ -943,32 +944,38 @@ async fn predicate_on_non_nullable_partition_col() -> Result<(), Box<dyn std::error::Error>> {
.await?;

let location = Url::parse("memory:///")?;
-let engine = Arc::new(DefaultEngine::new(
-storage.clone(),
-Path::from("/"),
-Arc::new(TokioBackgroundExecutor::new()),
-));

let table = Table::new(location);
-let snapshot = Arc::new(table.snapshot(engine.as_ref(), None)?);

-let predicate = Expression::eq(column_expr!("id"), 2);
-let scan = snapshot
-.scan_builder()
-.with_predicate(Arc::new(predicate))
-.build()?;

-let stream = scan.execute(engine)?;

-let mut files_scanned = 0;
-for batch in stream {
-let result_batch = into_record_batch(batch?.raw_data?);
-// Convert schema so that nullability matches the Delta table schema
-let expected = batch_2.clone().with_schema(result_batch.schema())?;
-assert_eq!(expected, result_batch);
-files_scanned += 1;
+for predicate in [
+Expression::eq(column_expr!("id"), 2),
+Expression::eq(column_expr!("val"), "d"),
+]
+.into_iter()
+{
+let engine = Arc::new(DefaultEngine::new(
+storage.clone(),
+Path::from("/"),
+Arc::new(TokioBackgroundExecutor::new()),
+));
+let snapshot = Arc::new(table.snapshot(engine.as_ref(), None)?);
+let scan = snapshot
+.scan_builder()
+.with_predicate(Arc::new(predicate))
+.build()?;
+
+let stream = scan.execute(engine)?;
+
+let mut files_scanned = 0;
+for batch in stream {
+let result_batch = into_record_batch(batch?.raw_data?);
+// Convert schema so that nullability matches the Delta table schema
+let expected = batch_2.clone().with_schema(result_batch.schema())?;
+assert_eq!(expected, result_batch);
+files_scanned += 1;
+}
+assert_eq!(1, files_scanned);
+}
-assert_eq!(1, files_scanned);
Ok(())
}
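
The comment on the second add action above points at the behavior under test: min/max stats are optional for non-partition columns, and a file with no recorded stats can never be pruned by a data-skipping predicate, so it must always be scanned, while the first file (stats "a".."c") can be skipped for the val == "d" predicate. A minimal sketch of that pruning rule follows; it is illustrative only, and the helper name and signature are invented here, not the delta-kernel-rs API.

// A file may be skipped only when its recorded stats prove the predicate
// `val == wanted` cannot match; missing stats mean "unknown", so keep the file.
fn can_skip_file(min_val: Option<&str>, max_val: Option<&str>, wanted: &str) -> bool {
    match (min_val, max_val) {
        (Some(min), Some(max)) => wanted < min || wanted > max,
        _ => false, // no stats for this column: cannot prune
    }
}

fn main() {
    // First add action: minValues "a", maxValues "c" -> "d" is out of range, skip.
    assert!(can_skip_file(Some("a"), Some("c"), "d"));
    // Second add action: empty stats -> file must be read, hence files_scanned == 1.
    assert!(!can_skip_file(None, None, "d"));
}

This mirrors the assertions in the test loop: for each predicate exactly one file survives pruning (by partition value for id == 2, by stats for val == "d"), and its contents equal batch_2.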
