Introduce unified DataSourceExec for provided datasources, remove ParquetExec, CsvExec, etc #14224

Merged (86 commits) on Feb 6, 2025
Commits (changes below are shown from 51 of the 86 commits)
5b1714c
unify ParquetExec, AvroExec, ArrowExec, NDJsonExec, MemoryExec into o…
mertak-synnada Jan 7, 2025
d2fe022
Merge branch 'refs/heads/apache_main' into chore/single-source-exec
mertak-synnada Jan 7, 2025
70505b3
fix license headers
mertak-synnada Jan 7, 2025
00db759
fix compile errors on documents
mertak-synnada Jan 7, 2025
cea3ecd
separate non-parquet code
mertak-synnada Jan 7, 2025
2331577
format code
mertak-synnada Jan 8, 2025
15b812f
fix typo
mertak-synnada Jan 8, 2025
f020bd2
fix imports
mertak-synnada Jan 8, 2025
a7a5bd8
fix clippy
mertak-synnada Jan 8, 2025
94a306f
Merge branch 'refs/heads/apache_main' into chore/single-source-exec
mertak-synnada Jan 8, 2025
d69b012
add comment to the example
mertak-synnada Jan 8, 2025
f14a6d8
fix cargo docs
mertak-synnada Jan 8, 2025
8040147
change MemoryExec with MemorySourceConfig
mertak-synnada Jan 8, 2025
f540df0
Merge branch 'refs/heads/apache_main' into chore/single-source-exec
mertak-synnada Jan 10, 2025
ddb221d
merge fixes
mertak-synnada Jan 10, 2025
069d28c
change MemoryExec to DataSourceExec
mertak-synnada Jan 10, 2025
78dfce8
Merge branch 'refs/heads/apache_main' into chore/single-source-exec
mertak-synnada Jan 15, 2025
cb6a5ff
fix merge conflicts
mertak-synnada Jan 15, 2025
4acb2fc
apply some syntactic sugars
mertak-synnada Jan 15, 2025
ff68caa
fix imports and comment line
mertak-synnada Jan 15, 2025
7a62190
simplify some lines
mertak-synnada Jan 15, 2025
fd37183
rename source_config as file_source
mertak-synnada Jan 15, 2025
cf13028
format code
mertak-synnada Jan 15, 2025
201d8a0
format code
mertak-synnada Jan 15, 2025
7edc80f
make memory metrics default behavior
mertak-synnada Jan 15, 2025
fe25de3
remove unnecessary cfg check
mertak-synnada Jan 16, 2025
2452825
format code
mertak-synnada Jan 16, 2025
e084359
Merge branch 'refs/heads/apache_main' into chore/single-source-exec
mertak-synnada Jan 16, 2025
a4d5da3
remove ParquetExec strings
mertak-synnada Jan 16, 2025
a6c018e
fix documents and imports
mertak-synnada Jan 16, 2025
104c428
fix imports
mertak-synnada Jan 16, 2025
38134d3
Merge branch 'refs/heads/apache_main' into chore/single-source-exec
mertak-synnada Jan 17, 2025
4af421e
add constraints and fix tests
mertak-synnada Jan 20, 2025
91110cc
delete redundant file
mertak-synnada Jan 20, 2025
6c76b3f
make metrics and statistics a part of File type specific configurations
mertak-synnada Jan 20, 2025
12f0ac8
format code
mertak-synnada Jan 20, 2025
f528bf5
Merge branch 'refs/heads/apache_main' into chore/single-source-exec
mertak-synnada Jan 20, 2025
887922d
fix tests
mertak-synnada Jan 20, 2025
91fb10e
format code
mertak-synnada Jan 20, 2025
cb8c2ae
split repartitioning into DataSourceExec and FileSourceConfig parts
mertak-synnada Jan 21, 2025
64ccad7
move properties into DataSourceExec and split eq_properties and outpu…
mertak-synnada Jan 21, 2025
aa047d7
clone source with Arc
mertak-synnada Jan 21, 2025
93888a7
return file type as enum and do not downcast if not necessary
mertak-synnada Jan 21, 2025
44f21a2
Merge branch 'refs/heads/apache_main' into chore/single-source-exec
mertak-synnada Jan 21, 2025
aabcd04
format code
mertak-synnada Jan 21, 2025
18da494
re-add deprecated plans in order to support backward compatibility
mertak-synnada Jan 21, 2025
82b5257
reduce diff
mertak-synnada Jan 21, 2025
6ec3aee
fix doc
mertak-synnada Jan 21, 2025
aeb1954
Merge branch 'refs/heads/apache_main' into chore/single-source-exec
mertak-synnada Jan 23, 2025
f42c245
merge fixes
mertak-synnada Jan 23, 2025
3b8b4e9
remove unnecessary files
mertak-synnada Jan 23, 2025
de2c8e4
rename config structs to source
mertak-synnada Jan 24, 2025
7112f2f
Merge branch 'refs/heads/apache_main' into chore/single-source-exec
mertak-synnada Jan 24, 2025
23b272f
remove empty files
mertak-synnada Jan 24, 2025
0fe7f00
removed FileSourceConfig
mertak-synnada Jan 24, 2025
1eb5343
fix base_config formatting
mertak-synnada Jan 24, 2025
b03ed3d
format code
mertak-synnada Jan 24, 2025
7acb09a
fix repartition logic
mertak-synnada Jan 24, 2025
72440dd
Merge branch 'refs/heads/apache_main' into chore/single-source-exec
mertak-synnada Feb 3, 2025
165f0ce
fix merge conflicts
mertak-synnada Feb 3, 2025
184f293
fix csv projection error
mertak-synnada Feb 3, 2025
ae0e6a7
clippy fix
mertak-synnada Feb 3, 2025
e43a9fa
use new() on initialization
mertak-synnada Feb 3, 2025
105c488
Merge branch 'refs/heads/apache_main' into chore/single-source-exec
mertak-synnada Feb 4, 2025
d9d3d64
use DataSourceExec on deprecated file operators as well
mertak-synnada Feb 4, 2025
7fe6a28
move ParquetSource into source.rs
mertak-synnada Feb 4, 2025
a6cbab1
use ParquetSource only if parquet feature is enabled
mertak-synnada Feb 4, 2025
dd4c7ac
Merge branch 'refs/heads/apache_main' into chore/single-source-exec
mertak-synnada Feb 4, 2025
f2d31a0
fix slt tests
mertak-synnada Feb 4, 2025
4dd7b73
add with_fetch API to MemorySourceConfig and re-add deprecated Memory…
mertak-synnada Feb 5, 2025
25badc4
Merge branch 'refs/heads/apache_main' into chore/single-source-exec
mertak-synnada Feb 5, 2025
e8b6232
fix merge conflicts
mertak-synnada Feb 5, 2025
962494a
format code
mertak-synnada Feb 5, 2025
da67917
change FileType enum into a dyn Trait so that it can be extensible
mertak-synnada Feb 6, 2025
6957121
remove metadata_size_hint from required ParquetSource parameters
mertak-synnada Feb 6, 2025
d39e441
remove FileType trait and split with_predicate logic for ParquetSource
mertak-synnada Feb 6, 2025
4c32ca2
remove predicate from initialization of ParquetSource
mertak-synnada Feb 6, 2025
91071bd
remove unnecessary imports
mertak-synnada Feb 6, 2025
e2d5a8d
deprecate ParquetExecBuilder and add doc hints
mertak-synnada Feb 6, 2025
81179ae
Merge branch 'refs/heads/apache_main' into chore/single-source-exec
mertak-synnada Feb 6, 2025
9a32a0a
fix slt
mertak-synnada Feb 6, 2025
72059ee
fix clippy
mertak-synnada Feb 6, 2025
3e10283
fix fmt
mertak-synnada Feb 6, 2025
a952c41
return reference of the Arc in source()
mertak-synnada Feb 6, 2025
9734ddf
re-add deprecated exec files
mertak-synnada Feb 6, 2025
f797aaa
fix doc error
mertak-synnada Feb 6, 2025
20 changes: 11 additions & 9 deletions datafusion-cli/src/functions.rs
@@ -16,30 +16,32 @@
// under the License.

//! Functions that are query-able and searchable via the `\h` command

use std::fmt;
use std::fs::File;
use std::str::FromStr;
use std::sync::Arc;

use arrow::array::{Int64Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::pretty_format_batches;
use async_trait::async_trait;

use datafusion::catalog::Session;
use datafusion::common::{plan_err, Column};
use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::logical_expr::Expr;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::memory::MemorySourceConfig;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::scalar::ScalarValue;
use datafusion_catalog::TableFunctionImpl;

use async_trait::async_trait;
use parquet::basic::ConvertedType;
use parquet::data_type::{ByteArray, FixedLenByteArray};
use parquet::file::reader::FileReader;
use parquet::file::serialized_reader::SerializedFileReader;
use parquet::file::statistics::Statistics;
use std::fmt;
use std::fs::File;
use std::str::FromStr;
use std::sync::Arc;

#[derive(Debug)]
pub enum Function {
@@ -241,11 +243,11 @@ impl TableProvider for ParquetMetadataTable {
_filters: &[Expr],
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(MemoryExec::try_new(
Ok(MemorySourceConfig::try_new_exec(
&[vec![self.batch.clone()]],
TableProvider::schema(self),
projection.cloned(),
)?))
)?)
}
}

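The hunk above swaps the now-deprecated `MemoryExec` for `MemorySourceConfig::try_new_exec`, which returns a ready-made `DataSourceExec` plan. A minimal sketch of the new shape of such a scan method follows; paths and signatures are copied from this hunk, while the `InMemoryTable` type is hypothetical and only stands in for `ParquetMetadataTable`:

```rust
use std::sync::Arc;

use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::physical_plan::memory::MemorySourceConfig;
use datafusion::physical_plan::ExecutionPlan;

/// Hypothetical provider holding a single in-memory batch, mirroring
/// `ParquetMetadataTable` from the diff above.
struct InMemoryTable {
    schema: SchemaRef,
    batch: RecordBatch,
}

impl InMemoryTable {
    /// What `TableProvider::scan` now returns: a `DataSourceExec` built by
    /// `MemorySourceConfig::try_new_exec` instead of `Arc::new(MemoryExec::try_new(..)?)`.
    fn scan_plan(&self, projection: Option<&Vec<usize>>) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(MemorySourceConfig::try_new_exec(
            &[vec![self.batch.clone()]],
            Arc::clone(&self.schema),
            projection.cloned(),
        )?)
    }
}
```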
102 changes: 55 additions & 47 deletions datafusion-examples/examples/advanced_parquet_index.rs
@@ -15,17 +15,22 @@
// specific language governing permissions and limitations
// under the License.

use std::any::Any;
use std::collections::{HashMap, HashSet};
use std::fs::File;
use std::ops::Range;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
use arrow_schema::SchemaRef;
use async_trait::async_trait;
use bytes::Bytes;
use datafusion::catalog::Session;
use datafusion::datasource::data_source::FileSourceConfig;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::parquet::{
ParquetAccessPlan, ParquetExecBuilder,
};
use datafusion::datasource::physical_plan::parquet::ParquetAccessPlan;
use datafusion::datasource::physical_plan::{
parquet::ParquetFileReaderFactory, FileMeta, FileScanConfig,
parquet::ParquetFileReaderFactory, FileMeta, FileScanConfig, ParquetConfig,
};
use datafusion::datasource::TableProvider;
use datafusion::execution::object_store::ObjectStoreUrl;
@@ -42,22 +47,19 @@ use datafusion::physical_optimizer::pruning::PruningPredicate;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::*;
use datafusion_common::config::TableParquetOptions;
use datafusion_common::{
internal_datafusion_err, DFSchema, DataFusionError, Result, ScalarValue,
};
use datafusion_expr::utils::conjunction;
use datafusion_expr::{TableProviderFilterPushDown, TableType};
use datafusion_physical_expr::utils::{Guarantee, LiteralGuarantee};

use async_trait::async_trait;
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::FutureExt;
use object_store::ObjectStore;
use std::any::Any;
use std::collections::{HashMap, HashSet};
use std::fs::File;
use std::ops::Range;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tempfile::TempDir;
use url::Url;

@@ -82,8 +84,8 @@ use url::Url;
/// Specifically, this example illustrates how to:
/// 1. Use [`ParquetFileReaderFactory`] to avoid re-reading parquet metadata on each query
/// 2. Use [`PruningPredicate`] for predicate analysis
/// 3. Pass a row group selection to [`ParquetExec`]
/// 4. Pass a row selection (within a row group) to [`ParquetExec`]
/// 3. Pass a row group selection to [`ParquetConfig`]
/// 4. Pass a row selection (within a row group) to [`ParquetConfig`]
///
/// Note this is a *VERY* low level example for people who want to build their
/// own custom indexes (e.g. for low latency queries). Most users should use
@@ -93,38 +95,38 @@ use url::Url;
///
/// # Diagram
///
/// This diagram shows how the `ParquetExec` is configured to do only a single
/// This diagram shows how the `DataSourceExec` with `ParquetConfig` is configured to do only a single
/// (range) read from a parquet file, for the data that is needed. It does
/// not read the file footer or any of the row groups that are not needed.
///
/// ```text
/// ┌───────────────────────┐ The TableProvider configures the
/// │ ┌───────────────────┐ │ ParquetExec:
/// │ ┌───────────────────┐ │ DataSourceExec:
/// │ │ │ │
/// │ └───────────────────┘ │
/// │ ┌───────────────────┐ │
/// Row │ │ │ │ 1. To read only specific Row
/// Groups │ └───────────────────┘ │ Groups (the ParquetExec tries
/// Groups │ └───────────────────┘ │ Groups (the DataSourceExec tries
/// │ ┌───────────────────┐ │ to reduce this further based
/// │ │ │ │ on metadata)
/// │ └───────────────────┘ │ ┌────────────────────┐
/// │ ┌───────────────────┐ │ │ │
/// │ │ │◀┼ ─ ─ ┐ │ ParquetExec
/// │ └───────────────────┘ │ │ (Parquet Reader) │
/// │ ... │ └ ─ ─ ─ ─│ │
/// │ ┌───────────────────┐ │ │ ╔═══════════════╗ │
/// │ │ │ │ │ ║ParquetMetadata║ │
/// │ └───────────────────┘ │ │ ╚═══════════════╝ │
/// │ ╔═══════════════════╗ │ └────────────────────┘
/// │ └───────────────────┘ │ ┌──────────────────────
/// │ ┌───────────────────┐ │ │
/// │ │ │◀┼ ─ ─ ┐ │ DataSourceExec
/// │ └───────────────────┘ │ │ (Parquet Reader)
/// │ ... │ └ ─ ─ ─ ─│
/// │ ┌───────────────────┐ │ │ ╔═══════════════╗
/// │ │ │ │ │ ║ParquetMetadata║
/// │ └───────────────────┘ │ │ ╚═══════════════╝
/// │ ╔═══════════════════╗ │ └──────────────────────
/// │ ║ Thrift metadata ║ │
/// │ ╚═══════════════════╝ │ 1. With cached ParquetMetadata, so
/// └───────────────────────┘ the ParquetExec does not re-read /
/// └───────────────────────┘ the ParquetConfig does not re-read /
/// Parquet File decode the thrift footer
///
/// ```
///
/// Within a Row Group, Column Chunks store data in DataPages. This example also
/// shows how to configure the ParquetExec to read a `RowSelection` (row ranges)
/// shows how to configure the ParquetConfig to read a `RowSelection` (row ranges)
/// which will skip unneeded data pages. This requires that the Parquet file has
/// a [Page Index].
///
@@ -134,15 +136,15 @@ use url::Url;
/// │ │ Data Page is not fetched or decoded.
/// │ ┌───────────────────┐ │ Note this requires a PageIndex
/// │ │ ┌──────────┐ │ │
/// Row │ │ │DataPage 0│ │ │ ┌────────────────────┐
/// Groups │ │ └──────────┘ │ │ │ │
/// │ │ ┌──────────┐ │ │ │ ParquetExec
/// │ │ ... │DataPage 1│ ◀┼ ┼ ─ ─ ─ │ (Parquet Reader) │
/// │ │ └──────────┘ │ │ └ ─ ─ ─ ─ ─│ │
/// │ │ ┌──────────┐ │ │ │ ╔═══════════════╗ │
/// │ │ │DataPage 2│ │ │ If only rows │ ║ParquetMetadata║ │
/// │ │ └──────────┘ │ │ from DataPage 1 │ ╚═══════════════╝ │
/// │ └───────────────────┘ │ are selected, └────────────────────┘
/// Row │ │ │DataPage 0│ │ │ ┌──────────────────────
/// Groups │ │ └──────────┘ │ │ │
/// │ │ ┌──────────┐ │ │ │ DataSourceExec
/// │ │ ... │DataPage 1│ ◀┼ ┼ ─ ─ ─ │ (Parquet Reader)
/// │ │ └──────────┘ │ │ └ ─ ─ ─ ─ ─│
/// │ │ ┌──────────┐ │ │ │ ╔═══════════════╗
/// │ │ │DataPage 2│ │ │ If only rows │ ║ParquetMetadata║
/// │ │ └──────────┘ │ │ from DataPage 1 │ ╚═══════════════╝
/// │ └───────────────────┘ │ are selected, └──────────────────────
/// │ │ only DataPage 1
/// │ ... │ is fetched and
/// │ │ decoded
@@ -210,7 +212,7 @@ async fn main() -> Result<()> {
// pages that must be decoded
//
// Note: in order to prune pages, the Page Index must be loaded and the
// ParquetExec will load it on demand if not present. To avoid a second IO
// DataSourceExec will load it on demand if not present. To avoid a second IO
// during query, this example loaded the Page Index preemptively by setting
// `ArrowReader::with_page_index` in `IndexedFile::try_new`
provider.set_use_row_selection(true);
@@ -477,7 +479,7 @@ impl TableProvider for IndexTableProvider {

let partitioned_file = indexed_file
.partitioned_file()
// provide the starting access plan to the ParquetExec by
// provide the starting access plan to the DataSourceExec by
// storing it as "extensions" on PartitionedFile
.with_extensions(Arc::new(access_plan) as _);

@@ -494,14 +496,20 @@
CachedParquetFileReaderFactory::new(Arc::clone(&self.object_store))
.with_file(indexed_file);

// Finally, put it all together into a ParquetExec
Ok(ParquetExecBuilder::new(file_scan_config)
// provide the predicate so the ParquetExec can try and prune
// row groups internally
.with_predicate(predicate)
let source_config = Arc::new(
ParquetConfig::new(
Arc::clone(&file_scan_config.file_schema),
// provide the predicate so the DataSourceExec can try and prune
// row groups internally
Some(predicate),
None,
TableParquetOptions::default(),
)
// provide the factory to create parquet reader without re-reading metadata
.with_parquet_file_reader_factory(Arc::new(reader_factory))
.build_arc())
.with_parquet_file_reader_factory(Arc::new(reader_factory)),
);
// Finally, put it all together into a DataSourceExec
Ok(FileSourceConfig::new_exec(file_scan_config, source_config))
}

/// Tell DataFusion to push filters down to the scan method
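Condensing the hunk above into one piece: instead of `ParquetExecBuilder`, the provider now builds a `ParquetConfig` (the parquet-specific file source) and pairs it with the `FileScanConfig` through `FileSourceConfig::new_exec`, which yields the unified `DataSourceExec`. Below is a minimal sketch of that assembly using only the constructors that appear in this diff; the `file_scan_config`, `predicate`, and `reader_factory` arguments are assumed to be built exactly as in the example above. Note that later commits in this PR rename the config structs to `*Source` (for example `ParquetSource`) and remove `FileSourceConfig`, so this reflects the API at the reviewed snapshot only.

```rust
use std::sync::Arc;

use datafusion::datasource::data_source::FileSourceConfig;
use datafusion::datasource::physical_plan::parquet::ParquetFileReaderFactory;
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetConfig};
use datafusion::error::Result;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::config::TableParquetOptions;
use datafusion_physical_expr::PhysicalExpr;

// Sketch only: the three arguments are assumed to be constructed as in the
// advanced_parquet_index example above.
fn build_parquet_scan(
    file_scan_config: FileScanConfig,
    predicate: Arc<dyn PhysicalExpr>,
    reader_factory: Arc<dyn ParquetFileReaderFactory>,
) -> Result<Arc<dyn ExecutionPlan>> {
    let source_config = Arc::new(
        ParquetConfig::new(
            Arc::clone(&file_scan_config.file_schema),
            // the predicate lets the scan prune row groups (and, with a page
            // index, individual pages) internally
            Some(predicate),
            None,
            TableParquetOptions::default(),
        )
        // reuse cached parquet metadata instead of re-reading the footer per query
        .with_parquet_file_reader_factory(reader_factory),
    );
    // A single DataSourceExec now plays the role ParquetExec used to.
    Ok(FileSourceConfig::new_exec(file_scan_config, source_config))
}
```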
43 changes: 22 additions & 21 deletions datafusion-examples/examples/csv_json_opener.rs
@@ -18,18 +18,20 @@
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};
use datafusion::datasource::data_source::FileSource;
use datafusion::{
assert_batches_eq,
datasource::{
file_format::file_compression_type::FileCompressionType,
listing::PartitionedFile,
object_store::ObjectStoreUrl,
physical_plan::{CsvConfig, CsvOpener, FileScanConfig, FileStream, JsonOpener},
physical_plan::{CsvConfig, FileScanConfig, FileStream, JsonOpener},
},
error::Result,
physical_plan::metrics::ExecutionPlanMetricsSet,
test_util::aggr_test_schema,
};

use futures::StreamExt;
use object_store::{local::LocalFileSystem, memory::InMemory, ObjectStore};

@@ -48,29 +50,24 @@ async fn csv_opener() -> Result<()> {
let object_store = Arc::new(LocalFileSystem::new());
let schema = aggr_test_schema();

let config = CsvConfig::new(
8192,
schema.clone(),
Some(vec![12, 0]),
true,
b',',
b'"',
None,
object_store,
Some(b'#'),
);

let opener = CsvOpener::new(Arc::new(config), FileCompressionType::UNCOMPRESSED);

let testdata = datafusion::test_util::arrow_test_data();
let path = format!("{testdata}/csv/aggregate_test_100.csv");

let path = std::path::Path::new(&path).canonicalize()?;

let scan_config = FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema)
.with_projection(Some(vec![12, 0]))
.with_limit(Some(5))
.with_file(PartitionedFile::new(path.display().to_string(), 10));
let scan_config =
FileScanConfig::new(ObjectStoreUrl::local_filesystem(), Arc::clone(&schema))
.with_projection(Some(vec![12, 0]))
.with_limit(Some(5))
.with_file(PartitionedFile::new(path.display().to_string(), 10));

let config = CsvConfig::new(true, b',', b'"')
.with_comment(Some(b'#'))
.with_schema(schema)
.with_batch_size(8192)
.with_projection(&scan_config);

let opener = config.create_file_opener(Ok(object_store), &scan_config, 0)?;

let mut result = vec![];
let mut stream =
@@ -125,8 +122,12 @@ async fn json_opener() -> Result<()> {
.with_limit(Some(5))
.with_file(PartitionedFile::new(path.to_string(), 10));

let mut stream =
FileStream::new(&scan_config, 0, opener, &ExecutionPlanMetricsSet::new())?;
let mut stream = FileStream::new(
&scan_config,
0,
Arc::new(opener),
&ExecutionPlanMetricsSet::new(),
)?;
let mut result = vec![];
while let Some(batch) = stream.next().await.transpose()? {
result.push(batch);
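Since the diff view above interleaves removed and added lines, here is the "after" shape of the CSV path assembled in one piece as a sketch: per-scan concerns (files, projection, limit) stay on `FileScanConfig`, CSV-specific options move onto `CsvConfig` through builder-style `with_*` calls, and the opener comes from the `FileSource` trait's `create_file_opener` instead of being constructed by hand. Paths and call signatures are copied from the added lines above; the standalone function and its `csv_path` argument are illustrative.

```rust
use std::sync::Arc;

use datafusion::datasource::data_source::FileSource;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{CsvConfig, FileScanConfig, FileStream};
use datafusion::error::Result;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::test_util::aggr_test_schema;
use futures::StreamExt;
use object_store::local::LocalFileSystem;

// Sketch of the new wiring, assuming a local CSV file at `csv_path` with the
// aggregate_test_100 schema used by the example above.
async fn csv_scan(csv_path: String) -> Result<()> {
    let object_store = Arc::new(LocalFileSystem::new());
    let schema = aggr_test_schema();

    // Per-scan options (files, projection, limit) stay on FileScanConfig ...
    let scan_config =
        FileScanConfig::new(ObjectStoreUrl::local_filesystem(), Arc::clone(&schema))
            .with_projection(Some(vec![12, 0]))
            .with_limit(Some(5))
            .with_file(PartitionedFile::new(csv_path, 10));

    // ... while CSV-specific knobs move onto the CsvConfig file source.
    let config = CsvConfig::new(true, b',', b'"')
        .with_comment(Some(b'#'))
        .with_schema(schema)
        .with_batch_size(8192)
        .with_projection(&scan_config);

    // The opener is produced by the FileSource trait rather than built by hand.
    let opener = config.create_file_opener(Ok(object_store), &scan_config, 0)?;

    let mut stream = FileStream::new(
        &scan_config,
        0,
        Arc::new(opener),
        &ExecutionPlanMetricsSet::new(),
    )?;
    while let Some(batch) = stream.next().await.transpose()? {
        println!("read a batch of {} rows", batch.num_rows());
    }
    Ok(())
}
```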
25 changes: 16 additions & 9 deletions datafusion-examples/examples/parquet_exec_visitor.rs
@@ -17,11 +17,12 @@

use std::sync::Arc;

use datafusion::datasource::data_source::FileSourceConfig;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{ListingOptions, PartitionedFile};
use datafusion::datasource::physical_plan::ParquetExec;
use datafusion::execution::context::SessionContext;
use datafusion::physical_plan::metrics::MetricValue;
use datafusion::physical_plan::source::DataSourceExec;
use datafusion::physical_plan::{
execute_stream, visit_execution_plan, ExecutionPlan, ExecutionPlanVisitor,
};
@@ -95,15 +96,21 @@ impl ExecutionPlanVisitor for ParquetExecVisitor {
/// or `post_visit` (visit each node after its children/inputs)
fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
// If needed match on a specific `ExecutionPlan` node type
let maybe_parquet_exec = plan.as_any().downcast_ref::<ParquetExec>();
if let Some(parquet_exec) = maybe_parquet_exec {
self.file_groups = Some(parquet_exec.base_config().file_groups.clone());
if let Some(data_source) = plan.as_any().downcast_ref::<DataSourceExec>() {
let source = data_source.source();
if let Some(file_config) = source.as_any().downcast_ref::<FileSourceConfig>()
{
if file_config.file_source().file_type().is_parquet() {
self.file_groups =
Some(file_config.base_config().file_groups.clone());

let metrics = match parquet_exec.metrics() {
None => return Ok(true),
Some(metrics) => metrics,
};
self.bytes_scanned = metrics.sum_by_name("bytes_scanned");
let metrics = match data_source.metrics() {
None => return Ok(true),
Some(metrics) => metrics,
};
self.bytes_scanned = metrics.sum_by_name("bytes_scanned");
}
}
}
Ok(true)
}
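With the format-specific operators gone, plan inspection keys off `DataSourceExec` and then downcasts its `source()` for file-level details, as the hunk above shows. Below is a minimal sketch of a visitor driven by the `visit_execution_plan` helper that the example imports; the counting visitor is hypothetical and deliberately simpler than `ParquetExecVisitor`, and the exact `visit_execution_plan` signature is assumed from its use in this example.

```rust
use datafusion::error::{DataFusionError, Result};
use datafusion::physical_plan::source::DataSourceExec;
use datafusion::physical_plan::{
    visit_execution_plan, ExecutionPlan, ExecutionPlanVisitor,
};

/// Hypothetical visitor that just counts DataSourceExec nodes in a plan.
#[derive(Default)]
struct DataSourceCounter {
    count: usize,
}

impl ExecutionPlanVisitor for DataSourceCounter {
    type Error = DataFusionError;

    fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
        // Every provided source (parquet, csv, json, memory, ...) now appears as a
        // DataSourceExec; format-specific details require a further downcast of
        // `source()`, as in the diff above.
        if plan.as_any().downcast_ref::<DataSourceExec>().is_some() {
            self.count += 1;
        }
        Ok(true) // keep walking the children
    }
}

/// Walk a physical plan and report how many DataSourceExec nodes it contains.
fn count_data_sources(plan: &dyn ExecutionPlan) -> Result<usize> {
    let mut counter = DataSourceCounter::default();
    visit_execution_plan(plan, &mut counter)?;
    Ok(counter.count)
}
```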