Skip to content

Commit

Permalink
Add new configuration item listing_table_ignore_subdirectory (#8565)
Browse files Browse the repository at this point in the history
* init

* test

* add config

* rename

* doc

* fix doc

* add sqllogictests & rename

* fmt & fix test

* clippy

* test read partition table

* simplify testing

* simplify testing
  • Loading branch information
Asura7969 authored Dec 22, 2023
1 parent 26a488d commit ba46434
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 10 deletions.
5 changes: 5 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,11 @@ config_namespace! {
/// memory consumption
pub max_buffered_batches_per_output_file: usize, default = 2

/// When scanning file paths, whether to ignore subdirectory files,
/// ignored by default (true), when reading a partitioned table,
/// `listing_table_ignore_subdirectory` is always equal to false, even if set to true
pub listing_table_ignore_subdirectory: bool, default = true

}
}

Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/listing/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,10 +375,10 @@ pub async fn pruned_partition_list<'a>(
store.list(Some(&partition.path)).try_collect().await?
}
};

let files = files.into_iter().filter(move |o| {
let extension_match = o.location.as_ref().ends_with(file_extension);
let glob_match = table_path.contains(&o.location);
// here need to scan subdirectories(`listing_table_ignore_subdirectory` = false)
let glob_match = table_path.contains(&o.location, false);
extension_match && glob_match
});

Expand Down
26 changes: 21 additions & 5 deletions datafusion/core/src/datasource/listing/url.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::fs;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::execution::context::SessionState;
use datafusion_common::{DataFusionError, Result};
use datafusion_optimizer::OptimizerConfig;
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use glob::Pattern;
Expand Down Expand Up @@ -184,14 +185,27 @@ impl ListingTableUrl {
}

/// Returns `true` if `path` matches this [`ListingTableUrl`]
pub fn contains(&self, path: &Path) -> bool {
pub fn contains(&self, path: &Path, ignore_subdirectory: bool) -> bool {
match self.strip_prefix(path) {
Some(mut segments) => match &self.glob {
Some(glob) => {
let stripped = segments.join("/");
glob.matches(&stripped)
if ignore_subdirectory {
segments
.next()
.map_or(false, |file_name| glob.matches(file_name))
} else {
let stripped = segments.join("/");
glob.matches(&stripped)
}
}
None => {
if ignore_subdirectory {
let has_subdirectory = segments.collect::<Vec<_>>().len() > 1;
!has_subdirectory
} else {
true
}
}
None => true,
},
None => false,
}
Expand Down Expand Up @@ -223,6 +237,8 @@ impl ListingTableUrl {
store: &'a dyn ObjectStore,
file_extension: &'a str,
) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
let exec_options = &ctx.options().execution;
let ignore_subdirectory = exec_options.listing_table_ignore_subdirectory;
// If the prefix is a file, use a head request, otherwise list
let list = match self.is_collection() {
true => match ctx.runtime_env().cache_manager.get_list_files_cache() {
Expand All @@ -246,7 +262,7 @@ impl ListingTableUrl {
.try_filter(move |meta| {
let path = &meta.location;
let extension_match = path.as_ref().ends_with(file_extension);
let glob_match = self.contains(path);
let glob_match = self.contains(path, ignore_subdirectory);
futures::future::ready(extension_match && glob_match)
})
.map_err(DataFusionError::ObjectStore)
Expand Down
9 changes: 7 additions & 2 deletions datafusion/core/src/execution/context/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ mod tests {
use crate::dataframe::DataFrameWriteOptions;
use crate::parquet::basic::Compression;
use crate::test_util::parquet_test_data;
use datafusion_execution::config::SessionConfig;
use tempfile::tempdir;

use super::*;
Expand All @@ -103,8 +104,12 @@ mod tests {

#[tokio::test]
async fn read_with_glob_path_issue_2465() -> Result<()> {
let ctx = SessionContext::new();

let config =
SessionConfig::from_string_hash_map(std::collections::HashMap::from([(
"datafusion.execution.listing_table_ignore_subdirectory".to_owned(),
"false".to_owned(),
)]))?;
let ctx = SessionContext::new_with_config(config);
let df = ctx
.read_parquet(
// it was reported that when a path contains // (two consecutive separator) no files were found
Expand Down
2 changes: 2 additions & 0 deletions datafusion/sqllogictest/test_files/information_schema.slt
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ datafusion.execution.aggregate.scalar_update_factor 10
datafusion.execution.batch_size 8192
datafusion.execution.coalesce_batches true
datafusion.execution.collect_statistics false
datafusion.execution.listing_table_ignore_subdirectory true
datafusion.execution.max_buffered_batches_per_output_file 2
datafusion.execution.meta_fetch_concurrency 32
datafusion.execution.minimum_parallel_output_files 4
Expand Down Expand Up @@ -224,6 +225,7 @@ datafusion.execution.aggregate.scalar_update_factor 10 Specifies the threshold f
datafusion.execution.batch_size 8192 Default batch size while creating new batches, it's especially useful for buffer-in-memory batches since creating tiny batches would result in too much metadata memory consumption
datafusion.execution.coalesce_batches true When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting
datafusion.execution.collect_statistics false Should DataFusion collect statistics after listing files
datafusion.execution.listing_table_ignore_subdirectory true When scanning file paths, whether to ignore subdirectory files, ignored by default (true), when reading a partitioned table, `listing_table_ignore_subdirectory` is always equal to false, even if set to true
datafusion.execution.max_buffered_batches_per_output_file 2 This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption
datafusion.execution.meta_fetch_concurrency 32 Number of files to read in parallel when inferring schema and statistics
datafusion.execution.minimum_parallel_output_files 4 Guarantees a minimum level of output files running in parallel. RecordBatches will be distributed in round robin fashion to each parallel writer. Each writer is closed and a new file opened once soft_max_rows_per_output_file is reached.
Expand Down
38 changes: 37 additions & 1 deletion datafusion/sqllogictest/test_files/parquet.slt
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,39 @@ LIMIT 10;
0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))

# Test config listing_table_ignore_subdirectory:

query ITID
COPY (SELECT * FROM src_table WHERE int_col > 6 LIMIT 3)
TO 'test_files/scratch/parquet/test_table/subdir/3.parquet'
(FORMAT PARQUET, SINGLE_FILE_OUTPUT true);
----
3

statement ok
CREATE EXTERNAL TABLE listing_table
STORED AS PARQUET
WITH HEADER ROW
LOCATION 'test_files/scratch/parquet/test_table/*.parquet';

statement ok
set datafusion.execution.listing_table_ignore_subdirectory = true;

# scan file: 0.parquet 1.parquet 2.parquet
query I
select count(*) from listing_table;
----
9

statement ok
set datafusion.execution.listing_table_ignore_subdirectory = false;

# scan file: 0.parquet 1.parquet 2.parquet 3.parquet
query I
select count(*) from listing_table;
----
12

# Clean up
statement ok
DROP TABLE timestamp_with_tz;
Expand Down Expand Up @@ -303,7 +336,6 @@ NULL
statement ok
DROP TABLE single_nan;


statement ok
CREATE EXTERNAL TABLE list_columns
STORED AS PARQUET
Expand All @@ -319,3 +351,7 @@ SELECT int64_list, utf8_list FROM list_columns

statement ok
DROP TABLE list_columns;

# Clean up
statement ok
DROP TABLE listing_table;
1 change: 1 addition & 0 deletions docs/source/user-guide/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
| datafusion.execution.minimum_parallel_output_files | 4 | Guarantees a minimum level of output files running in parallel. RecordBatches will be distributed in round robin fashion to each parallel writer. Each writer is closed and a new file opened once soft_max_rows_per_output_file is reached. |
| datafusion.execution.soft_max_rows_per_output_file | 50000000 | Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max |
| datafusion.execution.max_buffered_batches_per_output_file | 2 | This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption |
| datafusion.execution.listing_table_ignore_subdirectory | true | When scanning file paths, whether to ignore subdirectory files, ignored by default (true), when reading a partitioned table, `listing_table_ignore_subdirectory` is always equal to false, even if set to true |
| datafusion.optimizer.enable_distinct_aggregation_soft_limit | true | When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. |
| datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores |
| datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible |
Expand Down

0 comments on commit ba46434

Please sign in to comment.