From dea4a1b6a6320b0c317396e59524279f924971e7 Mon Sep 17 00:00:00 2001 From: Yang Jiang Date: Thu, 18 Apr 2024 01:35:18 +0800 Subject: [PATCH] [minor] make parquet prune tests more readable (#10112) * [minor] make parquet prune tests more readable * typo --- datafusion/core/tests/parquet/mod.rs | 26 +++++---- datafusion/core/tests/parquet/page_pruning.rs | 56 +++++++++++++++++-- .../core/tests/parquet/row_group_pruning.rs | 13 +++-- 3 files changed, 73 insertions(+), 22 deletions(-) diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs index f90d0e8afb4c..d92a56d7fa04 100644 --- a/datafusion/core/tests/parquet/mod.rs +++ b/datafusion/core/tests/parquet/mod.rs @@ -81,8 +81,10 @@ enum Scenario { } enum Unit { - RowGroup, - Page, + // pass max row per row_group in parquet writer + RowGroup(usize), + // pass max row per page in parquet writer + Page(usize), } /// Test fixture that has an execution context that has an external @@ -185,13 +187,13 @@ impl ContextWithParquet { mut config: SessionConfig, ) -> Self { let file = match unit { - Unit::RowGroup => { + Unit::RowGroup(row_per_group) => { config = config.with_parquet_bloom_filter_pruning(true); - make_test_file_rg(scenario).await + make_test_file_rg(scenario, row_per_group).await } - Unit::Page => { + Unit::Page(row_per_page) => { config = config.with_parquet_page_index_pruning(true); - make_test_file_page(scenario).await + make_test_file_page(scenario, row_per_page).await } }; let parquet_path = file.path().to_string_lossy(); @@ -880,7 +882,7 @@ fn create_data_batch(scenario: Scenario) -> Vec { } /// Create a test parquet file with various data types -async fn make_test_file_rg(scenario: Scenario) -> NamedTempFile { +async fn make_test_file_rg(scenario: Scenario, row_per_group: usize) -> NamedTempFile { let mut output_file = tempfile::Builder::new() .prefix("parquet_pruning") .suffix(".parquet") @@ -888,7 +890,7 @@ async fn make_test_file_rg(scenario: Scenario) -> NamedTempFile { .expect("tempfile creation"); let props = WriterProperties::builder() - .set_max_row_group_size(5) + .set_max_row_group_size(row_per_group) .set_bloom_filter_enabled(true) .build(); @@ -906,17 +908,17 @@ async fn make_test_file_rg(scenario: Scenario) -> NamedTempFile { output_file } -async fn make_test_file_page(scenario: Scenario) -> NamedTempFile { +async fn make_test_file_page(scenario: Scenario, row_per_page: usize) -> NamedTempFile { let mut output_file = tempfile::Builder::new() .prefix("parquet_page_pruning") .suffix(".parquet") .tempfile() .expect("tempfile creation"); - // set row count to 5, should get same result as rowGroup + // set row count to row_per_page, should get same result as rowGroup let props = WriterProperties::builder() - .set_data_page_row_count_limit(5) - .set_write_batch_size(5) + .set_data_page_row_count_limit(row_per_page) + .set_write_batch_size(row_per_page) .build(); let batches = create_data_batch(scenario); diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs index 1615a1c5766a..ccaa65b7ee5f 100644 --- a/datafusion/core/tests/parquet/page_pruning.rs +++ b/datafusion/core/tests/parquet/page_pruning.rs @@ -241,9 +241,10 @@ async fn test_prune( expected_errors: Option, expected_row_pages_pruned: Option, expected_results: usize, + row_per_page: usize, ) { let output: crate::parquet::TestOutput = - ContextWithParquet::new(case_data_type, Page) + ContextWithParquet::new(case_data_type, Page(row_per_page)) .await .query(sql) .await; @@ -272,6 +273,7 @@ async fn prune_timestamps_nanos() { Some(0), Some(5), 10, + 5, ) .await; } @@ -289,6 +291,7 @@ async fn prune_timestamps_micros() { Some(0), Some(5), 10, + 5, ) .await; } @@ -306,6 +309,7 @@ async fn prune_timestamps_millis() { Some(0), Some(5), 10, + 5, ) .await; } @@ -324,6 +328,7 @@ async fn prune_timestamps_seconds() { Some(0), Some(5), 10, + 5, ) .await; } @@ -341,6 +346,7 @@ async fn prune_date32() { Some(0), Some(15), 1, + 5, ) .await; } @@ -359,7 +365,7 @@ async fn prune_date64() { .and_time(chrono::NaiveTime::from_hms_opt(0, 0, 0).unwrap()); let date = ScalarValue::Date64(Some(date.and_utc().timestamp_millis())); - let output = ContextWithParquet::new(Scenario::Dates, Page) + let output = ContextWithParquet::new(Scenario::Dates, Page(5)) .await .query_with_expr(col("date64").lt(lit(date))) .await; @@ -387,6 +393,7 @@ macro_rules! int_tests { Some(0), Some(5), 11, + 5, ) .await; // result of sql "SELECT * FROM t where i < 1" is same as @@ -397,6 +404,7 @@ macro_rules! int_tests { Some(0), Some(5), 11, + 5, ) .await; } @@ -409,6 +417,7 @@ macro_rules! int_tests { Some(0), Some(15), 1, + 5, ) .await; @@ -418,6 +427,7 @@ macro_rules! int_tests { Some(0), Some(15), 1, + 5, ) .await; } @@ -430,6 +440,7 @@ macro_rules! int_tests { Some(0), Some(15), 1, + 5 ) .await; } @@ -441,6 +452,7 @@ macro_rules! int_tests { Some(0), Some(15), 1, + 5 ) .await; } @@ -453,6 +465,7 @@ macro_rules! int_tests { Some(0), Some(0), 3, + 5 ) .await; } @@ -465,6 +478,7 @@ macro_rules! int_tests { Some(0), Some(0), 2, + 5 ) .await; } @@ -477,6 +491,7 @@ macro_rules! int_tests { Some(0), Some(0), 9, + 5 ) .await; } @@ -490,6 +505,7 @@ macro_rules! int_tests { Some(0), Some(15), 1, + 5 ) .await; } @@ -503,6 +519,7 @@ macro_rules! int_tests { Some(0), Some(0), 19, + 5 ) .await; } @@ -531,6 +548,7 @@ macro_rules! uint_tests { Some(0), Some(5), 11, + 5 ) .await; } @@ -543,6 +561,7 @@ macro_rules! uint_tests { Some(0), Some(15), 1, + 5 ) .await; } @@ -555,6 +574,7 @@ macro_rules! uint_tests { Some(0), Some(15), 1, + 5 ) .await; } @@ -567,6 +587,7 @@ macro_rules! uint_tests { Some(0), Some(15), 1, + 5 ) .await; } @@ -579,6 +600,7 @@ macro_rules! uint_tests { Some(0), Some(0), 2, + 5 ) .await; } @@ -591,6 +613,7 @@ macro_rules! uint_tests { Some(0), Some(0), 2, + 5 ) .await; } @@ -604,6 +627,7 @@ macro_rules! uint_tests { Some(0), Some(15), 1, + 5 ) .await; } @@ -617,6 +641,7 @@ macro_rules! uint_tests { Some(0), Some(0), 19, + 5 ) .await; } @@ -642,6 +667,7 @@ async fn prune_f64_lt() { Some(0), Some(5), 11, + 5, ) .await; test_prune( @@ -650,6 +676,7 @@ async fn prune_f64_lt() { Some(0), Some(5), 11, + 5, ) .await; } @@ -664,6 +691,7 @@ async fn prune_f64_scalar_fun_and_gt() { Some(0), Some(10), 1, + 5, ) .await; } @@ -677,6 +705,7 @@ async fn prune_f64_scalar_fun() { Some(0), Some(0), 1, + 5, ) .await; } @@ -690,6 +719,7 @@ async fn prune_f64_complex_expr() { Some(0), Some(0), 9, + 5, ) .await; } @@ -703,6 +733,7 @@ async fn prune_f64_complex_expr_subtract() { Some(0), Some(0), 9, + 5, ) .await; } @@ -718,6 +749,7 @@ async fn prune_decimal_lt() { Some(0), Some(5), 6, + 5, ) .await; // compare with the casted decimal value @@ -727,6 +759,7 @@ async fn prune_decimal_lt() { Some(0), Some(5), 8, + 5, ) .await; @@ -737,6 +770,7 @@ async fn prune_decimal_lt() { Some(0), Some(5), 6, + 5, ) .await; // compare with the casted decimal value @@ -746,6 +780,7 @@ async fn prune_decimal_lt() { Some(0), Some(5), 8, + 5, ) .await; } @@ -761,6 +796,7 @@ async fn prune_decimal_eq() { Some(0), Some(5), 2, + 5, ) .await; test_prune( @@ -769,6 +805,7 @@ async fn prune_decimal_eq() { Some(0), Some(5), 2, + 5, ) .await; @@ -779,6 +816,7 @@ async fn prune_decimal_eq() { Some(0), Some(5), 2, + 5, ) .await; test_prune( @@ -787,6 +825,7 @@ async fn prune_decimal_eq() { Some(0), Some(5), 2, + 5, ) .await; test_prune( @@ -795,6 +834,7 @@ async fn prune_decimal_eq() { Some(0), Some(10), 2, + 5, ) .await; } @@ -810,6 +850,7 @@ async fn prune_decimal_in_list() { Some(0), Some(5), 5, + 5, ) .await; test_prune( @@ -818,6 +859,7 @@ async fn prune_decimal_in_list() { Some(0), Some(5), 6, + 5, ) .await; @@ -828,6 +870,7 @@ async fn prune_decimal_in_list() { Some(0), Some(5), 5, + 5, ) .await; test_prune( @@ -836,17 +879,18 @@ async fn prune_decimal_in_list() { Some(0), Some(5), 6, + 5, ) .await; } #[tokio::test] async fn without_pushdown_filter() { - let mut context = ContextWithParquet::new(Scenario::Timestamps, Page).await; + let mut context = ContextWithParquet::new(Scenario::Timestamps, Page(5)).await; let output1 = context.query("SELECT * FROM t").await; - let mut context = ContextWithParquet::new(Scenario::Timestamps, Page).await; + let mut context = ContextWithParquet::new(Scenario::Timestamps, Page(5)).await; let output2 = context .query("SELECT * FROM t where nanos < to_timestamp('2023-01-02 01:01:11Z')") @@ -887,6 +931,7 @@ async fn test_pages_with_null_values() { // (row_group1, page2), (row_group4, page2) Some(10), 22, + 5, ) .await; @@ -897,6 +942,7 @@ async fn test_pages_with_null_values() { // expect prune (row_group1, page2) and (row_group4, page2) = 10 rows Some(10), 29, + 5, ) .await; @@ -907,6 +953,7 @@ async fn test_pages_with_null_values() { // expect prune (row_group1, page1), (row_group2, page1+2), (row_group3, page1), (row_group3, page1) = 25 rows Some(25), 11, + 5, ) .await; @@ -918,6 +965,7 @@ async fn test_pages_with_null_values() { // (row_group1, page1+2), (row_group2, page1), (row_group3, page1) (row_group4, page1+2) = 30 rows Some(30), 7, + 5, ) .await; } diff --git a/datafusion/core/tests/parquet/row_group_pruning.rs b/datafusion/core/tests/parquet/row_group_pruning.rs index b3f1fec1753b..d6de2b6f8ef0 100644 --- a/datafusion/core/tests/parquet/row_group_pruning.rs +++ b/datafusion/core/tests/parquet/row_group_pruning.rs @@ -100,7 +100,7 @@ impl RowGroupPruningTest { // Execute the test with the current configuration async fn test_row_group_prune(self) { - let output = ContextWithParquet::new(self.scenario, RowGroup) + let output = ContextWithParquet::new(self.scenario, RowGroup(5)) .await .query(&self.query) .await; @@ -231,7 +231,7 @@ async fn prune_date64() { .and_time(chrono::NaiveTime::from_hms_opt(0, 0, 0).unwrap()); let date = ScalarValue::Date64(Some(date.and_utc().timestamp_millis())); - let output = ContextWithParquet::new(Scenario::Dates, RowGroup) + let output = ContextWithParquet::new(Scenario::Dates, RowGroup(5)) .await .query_with_expr(col("date64").lt(lit(date))) // .query( @@ -267,10 +267,11 @@ async fn prune_disabled() { let expected_rows = 10; let config = SessionConfig::new().with_parquet_pruning(false); - let output = ContextWithParquet::with_config(Scenario::Timestamps, RowGroup, config) - .await - .query(query) - .await; + let output = + ContextWithParquet::with_config(Scenario::Timestamps, RowGroup(5), config) + .await + .query(query) + .await; println!("{}", output.description()); // This should not prune any