Split test into two and fix partition column setup
adamreeve committed Feb 19, 2025
1 parent 44c7588 commit 873d0b5
Showing 2 changed files with 104 additions and 44 deletions.
136 changes: 94 additions & 42 deletions kernel/tests/read.rs
@@ -17,9 +17,10 @@ use delta_kernel::schema::{DataType, Schema};
use delta_kernel::{Engine, FileMeta, Table};
use itertools::Itertools;
use object_store::{memory::InMemory, path::Path, ObjectStore};
use parquet::file::properties::{EnabledStatistics, WriterProperties};
use test_utils::{
actions_to_string, add_commit, generate_batch, generate_simple_batch, into_record_batch,
record_batch_to_bytes, IntoArray, TestAction, METADATA,
record_batch_to_bytes, record_batch_to_bytes_with_props, IntoArray, TestAction, METADATA,
};
use url::Url;

@@ -908,74 +909,125 @@ fn with_predicate_and_removes() -> Result<(), Box<dyn std::error::Error>> {
}

#[tokio::test]
async fn predicate_on_non_nullable_columns() -> Result<(), Box<dyn std::error::Error>> {
async fn predicate_on_non_nullable_partition_column() -> Result<(), Box<dyn std::error::Error>> {
// Test for https://github.com/delta-io/delta-kernel-rs/issues/698
let batch_1 = generate_batch(vec![
("id", vec![1, 1, 1].into_array()),
("val", vec!["a", "b", "c"].into_array()),
])?;
let batch_2 = generate_batch(vec![
("id", vec![2, 2, 2].into_array()),
("val", vec!["d", "d", "d"].into_array()),
])?;
let batch = generate_batch(vec![("val", vec!["a", "b", "c"].into_array())])?;

let storage = Arc::new(InMemory::new());
let actions = [
r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#.to_string(),
r#"{"commitInfo":{"timestamp":1587968586154,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[\"id\"]"},"isBlindAppend":true}}"#.to_string(),
r#"{"metaData":{"id":"5fba94ed-9794-4965-ba6e-6ee3c0d22af9","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}},{\"name\":\"val\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}","partitionColumns":["id"],"configuration":{},"createdTime":1587968585495}}"#.to_string(),
format!(r#"{{"add":{{"path":"{PARQUET_FILE1}","partitionValues":{{"id":"1"}},"size":262,"modificationTime":1587968586000,"dataChange":true, "stats":"{{\"numRecords\":3,\"nullCount\":{{\"val\":0}},\"minValues\":{{\"val\":\"a\"}},\"maxValues\":{{\"val\":\"c\"}}}}"}}}}"#),
// Stats are optional even for non-partition fields, test with no stats for val field in second file:
format!(r#"{{"add":{{"path":"{PARQUET_FILE2}","partitionValues":{{"id":"2"}},"size":262,"modificationTime":1587968586000,"dataChange":true, "stats":"{{\"numRecords\":3,\"nullCount\":{{}},\"minValues\":{{}},\"maxValues\":{{}}}}"}}}}"#),
format!(r#"{{"add":{{"path":"id=1/{PARQUET_FILE1}","partitionValues":{{"id":"1"}},"size":0,"modificationTime":1587968586000,"dataChange":true, "stats":"{{\"numRecords\":3,\"nullCount\":{{\"val\":0}},\"minValues\":{{\"val\":\"a\"}},\"maxValues\":{{\"val\":\"c\"}}}}"}}}}"#),
format!(r#"{{"add":{{"path":"id=2/{PARQUET_FILE2}","partitionValues":{{"id":"2"}},"size":0,"modificationTime":1587968586000,"dataChange":true, "stats":"{{\"numRecords\":3,\"nullCount\":{{\"val\":0}},\"minValues\":{{\"val\":\"a\"}},\"maxValues\":{{\"val\":\"c\"}}}}"}}}}"#),
];

add_commit(storage.as_ref(), 0, actions.iter().join("\n")).await?;
storage
.put(
&Path::from("id=1").child(PARQUET_FILE1),
record_batch_to_bytes(&batch).into(),
)
.await?;
storage
.put(
&Path::from("id=2").child(PARQUET_FILE2),
record_batch_to_bytes(&batch).into(),
)
.await?;

let location = Url::parse("memory:///")?;
let table = Table::new(location);

let engine = Arc::new(DefaultEngine::new(
storage.clone(),
Path::from("/"),
Arc::new(TokioBackgroundExecutor::new()),
));
let snapshot = Arc::new(table.snapshot(engine.as_ref(), None)?);

let predicate = Expression::eq(column_expr!("id"), 2);
let scan = snapshot
.scan_builder()
.with_predicate(Arc::new(predicate))
.build()?;

let stream = scan.execute(engine)?;

let mut files_scanned = 0;
for engine_data in stream {
let mut result_batch = into_record_batch(engine_data?.raw_data?);
let _ = result_batch.remove_column(result_batch.schema().index_of("id")?);
assert_eq!(&batch, &result_batch);
files_scanned += 1;
}
// Partition pruning is not yet implemented, so we still read the data for both partitions
assert_eq!(2, files_scanned);
Ok(())
}

#[tokio::test]
async fn predicate_on_non_nullable_column_missing_stats() -> Result<(), Box<dyn std::error::Error>>
{
let batch_1 = generate_batch(vec![("val", vec!["a", "b", "c"].into_array())])?;
let batch_2 = generate_batch(vec![("val", vec!["d", "e", "f"].into_array())])?;

let storage = Arc::new(InMemory::new());
let actions = [
r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#.to_string(),
r#"{"commitInfo":{"timestamp":1587968586154,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isBlindAppend":true}}"#.to_string(),
r#"{"metaData":{"id":"5fba94ed-9794-4965-ba6e-6ee3c0d22af9","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"val\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1587968585495}}"#.to_string(),
// Add one file with stats, one file without
format!(r#"{{"add":{{"path":"{PARQUET_FILE1}","partitionValues":{{}},"size":0,"modificationTime":1587968586000,"dataChange":true, "stats":"{{\"numRecords\":3,\"nullCount\":{{\"val\":0}},\"minValues\":{{\"val\":\"a\"}},\"maxValues\":{{\"val\":\"c\"}}}}"}}}}"#),
format!(r#"{{"add":{{"path":"{PARQUET_FILE2}","partitionValues":{{}},"size":0,"modificationTime":1587968586000,"dataChange":true, "stats":"{{\"numRecords\":3,\"nullCount\":{{}},\"minValues\":{{}},\"maxValues\":{{}}}}"}}}}"#),
];

// Disable writing Parquet statistics so these cannot be used for pruning row groups
let writer_props = WriterProperties::builder()
.set_statistics_enabled(EnabledStatistics::None)
.build();

add_commit(storage.as_ref(), 0, actions.iter().join("\n")).await?;
storage
.put(
&Path::from(PARQUET_FILE1),
record_batch_to_bytes(&batch_1).into(),
record_batch_to_bytes_with_props(&batch_1, writer_props.clone()).into(),
)
.await?;
storage
.put(
&Path::from(PARQUET_FILE2),
record_batch_to_bytes(&batch_2).into(),
record_batch_to_bytes_with_props(&batch_2, writer_props.clone()).into(),
)
.await?;

let location = Url::parse("memory:///")?;

let table = Table::new(location);

for predicate in [
Expression::eq(column_expr!("id"), 2),
Expression::eq(column_expr!("val"), "d"),
]
.into_iter()
{
let engine = Arc::new(DefaultEngine::new(
storage.clone(),
Path::from("/"),
Arc::new(TokioBackgroundExecutor::new()),
));
let snapshot = Arc::new(table.snapshot(engine.as_ref(), None)?);
let scan = snapshot
.scan_builder()
.with_predicate(Arc::new(predicate))
.build()?;
let engine = Arc::new(DefaultEngine::new(
storage.clone(),
Path::from("/"),
Arc::new(TokioBackgroundExecutor::new()),
));
let snapshot = Arc::new(table.snapshot(engine.as_ref(), None)?);

let stream = scan.execute(engine)?;
let predicate = Expression::eq(column_expr!("val"), "g");
let scan = snapshot
.scan_builder()
.with_predicate(Arc::new(predicate))
.build()?;

let mut files_scanned = 0;
for batch in stream {
let result_batch = into_record_batch(batch?.raw_data?);
// Convert schema so that nullability matches the Delta table schema
let expected = batch_2.clone().with_schema(result_batch.schema())?;
assert_eq!(expected, result_batch);
files_scanned += 1;
}
assert_eq!(1, files_scanned);
let stream = scan.execute(engine)?;

let mut files_scanned = 0;
for engine_data in stream {
let result_batch = into_record_batch(engine_data?.raw_data?);
assert_eq!(&batch_2, &result_batch);
files_scanned += 1;
}
// One file is scanned as stats are missing so we don't know the predicate isn't satisfied
assert_eq!(1, files_scanned);

Ok(())
}
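For readers following the second test, a minimal standalone sketch (a hypothetical helper, not delta-kernel-rs's actual skipping API) of the stats-based file-skipping reasoning behind its final assertion: the predicate val = "g" falls outside FILE1's recorded min/max, while FILE2 carries no stats and therefore cannot be skipped.

fn can_skip_file(min: Option<&str>, max: Option<&str>, value: &str) -> bool {
    match (min, max) {
        // Stats present: skip only when the value lies outside [min, max]
        (Some(min), Some(max)) => value < min || value > max,
        // Stats missing: we cannot prove the value is absent, so the file must be read
        _ => false,
    }
}

fn main() {
    // PARQUET_FILE1 reports min "a", max "c"; PARQUET_FILE2 reports no stats
    assert!(can_skip_file(Some("a"), Some("c"), "g")); // skipped
    assert!(!can_skip_file(None, None, "g")); // must be scanned
    // Hence the test expects files_scanned == 1
}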

12 changes: 10 additions & 2 deletions test-utils/src/lib.rs
@@ -37,9 +37,17 @@ pub fn actions_to_string(actions: Vec<TestAction>) -> String {
/// convert a RecordBatch into a vector of bytes. We can't use `From` since these are both foreign
/// types
pub fn record_batch_to_bytes(batch: &RecordBatch) -> Vec<u8> {
let mut data: Vec<u8> = Vec::new();
let props = WriterProperties::builder().build();
let mut writer = ArrowWriter::try_new(&mut data, batch.schema(), Some(props)).unwrap();
record_batch_to_bytes_with_props(batch, props)
}

pub fn record_batch_to_bytes_with_props(
batch: &RecordBatch,
writer_properties: WriterProperties,
) -> Vec<u8> {
let mut data: Vec<u8> = Vec::new();
let mut writer =
ArrowWriter::try_new(&mut data, batch.schema(), Some(writer_properties)).unwrap();
writer.write(batch).expect("Writing batch");
// writer must be closed to write footer
writer.close().unwrap();
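The new record_batch_to_bytes_with_props helper lets a test choose its own Parquet writer configuration, while record_batch_to_bytes keeps its old behaviour by delegating with default properties. A minimal usage sketch (the encode_without_stats wrapper is hypothetical; the calls mirror the ones added in kernel/tests/read.rs above):

use parquet::file::properties::{EnabledStatistics, WriterProperties};
use test_utils::{generate_batch, record_batch_to_bytes_with_props, IntoArray};

fn encode_without_stats() -> Result<Vec<u8>, Box<dyn std::error::Error>> {
    let batch = generate_batch(vec![("val", vec!["a", "b", "c"].into_array())])?;
    // Disable Parquet statistics so readers cannot prune row groups from this file
    let props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::None)
        .build();
    Ok(record_batch_to_bytes_with_props(&batch, props))
}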
