Improve UX: Rename `FileScanConfig::new_exec` to `FileScanConfig::build` (#14670)

* Rename `FileScanConfig::new_exec` to `FileScanConfig::build`

* Update docs

* fix
alamb authored Feb 15, 2025
1 parent 9681c3b commit a2e6c90
Showing 28 changed files with 100 additions and 96 deletions.
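For readers migrating their own code, here is a minimal before/after sketch of the rename, adapted from the `FileScanConfig` doc example updated in this commit (the schema, file name, and size are illustrative):

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::{ArrowSource, FileScanConfig};
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_physical_plan::ExecutionPlan;

fn main() {
    let file_schema = Arc::new(Schema::new(vec![
        Field::new("c1", DataType::Int32, false),
        Field::new("c2", DataType::Int32, false),
    ]));
    let config = FileScanConfig::new(
        ObjectStoreUrl::local_filesystem(),
        file_schema,
        Arc::new(ArrowSource::default()),
    )
    .with_file(PartitionedFile::new("file1.arrow", 1234))
    .with_limit(Some(1000));

    // Before this commit: let plan = config.new_exec();
    // After it, the same thing is spelled `build`:
    let plan: Arc<dyn ExecutionPlan> = config.build();
    assert_eq!(2, plan.schema().fields().len());
}
```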
2 changes: 1 addition & 1 deletion datafusion-examples/examples/advanced_parquet_index.rs
@@ -504,7 +504,7 @@ impl TableProvider for IndexTableProvider {
.with_file(partitioned_file);

// Finally, put it all together into a DataSourceExec
- Ok(file_scan_config.new_exec())
+ Ok(file_scan_config.build())
}

/// Tell DataFusion to push filters down to the scan method
2 changes: 1 addition & 1 deletion datafusion-examples/examples/parquet_index.rs
@@ -258,7 +258,7 @@ impl TableProvider for IndexTableProvider {
file_size,
));
}
- Ok(file_scan_config.new_exec())
+ Ok(file_scan_config.build())
}

/// Tell DataFusion to push filters down to the scan method
5 changes: 2 additions & 3 deletions datafusion/core/src/datasource/file_format/arrow.rs
@@ -171,11 +171,10 @@ impl FileFormat for ArrowFormat {
async fn create_physical_plan(
&self,
_state: &dyn Session,
- mut conf: FileScanConfig,
+ conf: FileScanConfig,
_filters: Option<&Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn ExecutionPlan>> {
- conf = conf.with_source(Arc::new(ArrowSource::default()));
- Ok(conf.new_exec())
+ Ok(conf.with_source(Arc::new(ArrowSource::default())).build())
}

async fn create_writer_physical_plan(
5 changes: 2 additions & 3 deletions datafusion/core/src/datasource/file_format/avro.rs
@@ -148,11 +148,10 @@ impl FileFormat for AvroFormat {
async fn create_physical_plan(
&self,
_state: &dyn Session,
- mut conf: FileScanConfig,
+ conf: FileScanConfig,
_filters: Option<&Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn ExecutionPlan>> {
- conf = conf.with_source(self.file_source());
- Ok(conf.new_exec())
+ Ok(conf.with_source(self.file_source()).build())
}

fn file_source(&self) -> Arc<dyn FileSource> {
4 changes: 1 addition & 3 deletions datafusion/core/src/datasource/file_format/csv.rs
@@ -434,9 +434,7 @@ impl FileFormat for CsvFormat {
.with_terminator(self.options.terminator)
.with_comment(self.options.comment),
);
- conf = conf.with_source(source);
-
- Ok(conf.new_exec())
+ Ok(conf.with_source(source).build())
}

async fn create_writer_physical_plan(
4 changes: 1 addition & 3 deletions datafusion/core/src/datasource/file_format/json.rs
@@ -254,9 +254,7 @@ impl FileFormat for JsonFormat {
) -> Result<Arc<dyn ExecutionPlan>> {
let source = Arc::new(JsonSource::new());
conf.file_compression_type = FileCompressionType::from(self.options.compression);
- conf = conf.with_source(source);
-
- Ok(conf.new_exec())
+ Ok(conf.with_source(source).build())
}

async fn create_writer_physical_plan(
5 changes: 2 additions & 3 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -398,7 +398,7 @@ impl FileFormat for ParquetFormat {
async fn create_physical_plan(
&self,
_state: &dyn Session,
- mut conf: FileScanConfig,
+ conf: FileScanConfig,
filters: Option<&Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn ExecutionPlan>> {
let mut predicate = None;
@@ -424,8 +424,7 @@
if let Some(metadata_size_hint) = metadata_size_hint {
source = source.with_metadata_size_hint(metadata_size_hint)
}
- conf = conf.with_source(Arc::new(source));
- Ok(conf.new_exec())
+ Ok(conf.with_source(Arc::new(source)).build())
}

async fn create_writer_physical_plan(
6 changes: 3 additions & 3 deletions datafusion/core/src/datasource/physical_plan/avro.rs
@@ -399,7 +399,7 @@ mod tests {
.with_file(meta.into())
.with_projection(Some(vec![0, 1, 2]));

- let source_exec = conf.new_exec();
+ let source_exec = conf.build();
assert_eq!(
source_exec
.properties()
@@ -472,7 +472,7 @@
.with_file(meta.into())
.with_projection(projection);

- let source_exec = conf.new_exec();
+ let source_exec = conf.build();
assert_eq!(
source_exec
.properties()
@@ -546,7 +546,7 @@
.with_file(partitioned_file)
.with_table_partition_cols(vec![Field::new("date", DataType::Utf8, false)]);

- let source_exec = conf.new_exec();
+ let source_exec = conf.build();

assert_eq!(
source_exec
40 changes: 20 additions & 20 deletions datafusion/core/src/datasource/physical_plan/csv.rs
@@ -425,7 +425,7 @@ impl ExecutionPlan for CsvExec {
/// let file_scan_config = FileScanConfig::new(object_store_url, file_schema, source)
/// .with_file(PartitionedFile::new("file1.csv", 100*1024*1024))
/// .with_newlines_in_values(true); // The file contains newlines in values;
- /// let exec = file_scan_config.new_exec();
+ /// let exec = file_scan_config.build();
/// ```
#[derive(Debug, Clone, Default)]
pub struct CsvSource {
@@ -836,14 +836,14 @@ mod tests {
)?;

let source = Arc::new(CsvSource::new(true, b',', b'"'));
- let mut config = partitioned_csv_config(file_schema, file_groups, source)
+ let config = partitioned_csv_config(file_schema, file_groups, source)
.with_file_compression_type(file_compression_type)
- .with_newlines_in_values(false);
- config.projection = Some(vec![0, 2, 4]);
-
- let csv = config.new_exec();
+ .with_newlines_in_values(false)
+ .with_projection(Some(vec![0, 2, 4]));
+
assert_eq!(13, config.file_schema.fields().len());
+ let csv = config.build();

assert_eq!(3, csv.schema().fields().len());

let mut stream = csv.execute(0, task_ctx)?;
@@ -901,12 +901,12 @@
)?;

let source = Arc::new(CsvSource::new(true, b',', b'"'));
- let mut config = partitioned_csv_config(file_schema, file_groups, source)
+ let config = partitioned_csv_config(file_schema, file_groups, source)
.with_newlines_in_values(false)
- .with_file_compression_type(file_compression_type.to_owned());
- config.projection = Some(vec![4, 0, 2]);
- let csv = config.new_exec();
+ .with_file_compression_type(file_compression_type.to_owned())
+ .with_projection(Some(vec![4, 0, 2]));
assert_eq!(13, config.file_schema.fields().len());
+ let csv = config.build();
assert_eq!(3, csv.schema().fields().len());

let mut stream = csv.execute(0, task_ctx)?;
@@ -964,12 +964,12 @@
)?;

let source = Arc::new(CsvSource::new(true, b',', b'"'));
- let mut config = partitioned_csv_config(file_schema, file_groups, source)
+ let config = partitioned_csv_config(file_schema, file_groups, source)
.with_newlines_in_values(false)
- .with_file_compression_type(file_compression_type.to_owned());
- config.limit = Some(5);
- let csv = config.new_exec();
+ .with_file_compression_type(file_compression_type.to_owned())
+ .with_limit(Some(5));
assert_eq!(13, config.file_schema.fields().len());
+ let csv = config.build();
assert_eq!(13, csv.schema().fields().len());

let mut it = csv.execute(0, task_ctx)?;
@@ -1024,12 +1024,12 @@
)?;

let source = Arc::new(CsvSource::new(true, b',', b'"'));
- let mut config = partitioned_csv_config(file_schema, file_groups, source)
+ let config = partitioned_csv_config(file_schema, file_groups, source)
.with_newlines_in_values(false)
- .with_file_compression_type(file_compression_type.to_owned());
- config.limit = Some(5);
- let csv = config.new_exec();
+ .with_file_compression_type(file_compression_type.to_owned())
+ .with_limit(Some(5));
assert_eq!(14, config.file_schema.fields().len());
+ let csv = config.build();
assert_eq!(14, csv.schema().fields().len());

// errors due to https://github.com/apache/datafusion/issues/4918
@@ -1089,8 +1089,8 @@
// we don't have `/date=xx/` in the path but that is ok because
// partitions are resolved during scan anyway

- let csv = config.new_exec();
assert_eq!(13, config.file_schema.fields().len());
+ let csv = config.build();
assert_eq!(2, csv.schema().fields().len());

let mut it = csv.execute(0, task_ctx)?;
@@ -1179,7 +1179,7 @@
let config = partitioned_csv_config(file_schema, file_groups, source)
.with_newlines_in_values(false)
.with_file_compression_type(file_compression_type.to_owned());
- let csv = config.new_exec();
+ let csv = config.build();

let it = csv.execute(0, task_ctx).unwrap();
let batches: Vec<_> = it.try_collect().await.unwrap();
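Beyond the rename, the test updates above also switch from mutating config fields directly (`config.projection = ...`, `config.limit = ...`) to the builder-style setters, which chain naturally now that `build` consumes the config. A minimal sketch of the new style (the schema and column indices are illustrative):

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::physical_plan::{ArrowSource, FileScanConfig};
use datafusion_execution::object_store::ObjectStoreUrl;

fn main() {
    let file_schema = Arc::new(Schema::new(vec![
        Field::new("c1", DataType::Int32, false),
        Field::new("c2", DataType::Int32, false),
        Field::new("c3", DataType::Int32, false),
    ]));
    // Builder-style setters replace `config.projection = ...` and
    // `config.limit = ...`:
    let config = FileScanConfig::new(
        ObjectStoreUrl::local_filesystem(),
        file_schema,
        Arc::new(ArrowSource::default()),
    )
    .with_projection(Some(vec![0, 2])) // keep columns c1 and c3
    .with_limit(Some(5)); // read at most 5 rows

    let plan = config.build();
    assert_eq!(2, plan.schema().fields().len()); // projected schema
}
```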
40 changes: 26 additions & 14 deletions datafusion/core/src/datasource/physical_plan/file_scan_config.rs
@@ -68,21 +68,30 @@ pub fn wrap_partition_value_in_dict(val: ScalarValue) -> ScalarValue {
ScalarValue::Dictionary(Box::new(DataType::UInt16), Box::new(val))
}

- /// The base configurations to provide when creating a physical plan for
+ /// The base configurations for a [`DataSourceExec`], the physical plan for
/// any given file format.
///
+ /// Use [`Self::build`] to create a [`DataSourceExec`] from a `FileScanConfig`.
+ ///
/// # Example
/// ```
/// # use std::sync::Arc;
- /// # use arrow::datatypes::Schema;
+ /// # use arrow::datatypes::{Field, Fields, DataType, Schema};
/// # use datafusion::datasource::listing::PartitionedFile;
/// # use datafusion::datasource::physical_plan::FileScanConfig;
/// # use datafusion_execution::object_store::ObjectStoreUrl;
/// # use datafusion::datasource::physical_plan::ArrowSource;
- /// # let file_schema = Arc::new(Schema::empty());
- /// // create FileScan config for reading data from file://
+ /// # use datafusion_physical_plan::ExecutionPlan;
+ /// # let file_schema = Arc::new(Schema::new(vec![
+ /// #     Field::new("c1", DataType::Int32, false),
+ /// #     Field::new("c2", DataType::Int32, false),
+ /// #     Field::new("c3", DataType::Int32, false),
+ /// #     Field::new("c4", DataType::Int32, false),
+ /// # ]));
+ /// // create FileScan config for reading arrow files from file://
/// let object_store_url = ObjectStoreUrl::local_filesystem();
- /// let config = FileScanConfig::new(object_store_url, file_schema, Arc::new(ArrowSource::default()))
+ /// let file_source = Arc::new(ArrowSource::default());
+ /// let config = FileScanConfig::new(object_store_url, file_schema, file_source)
/// .with_limit(Some(1000)) // read only the first 1000 records
/// .with_projection(Some(vec![2, 3])) // project columns 2 and 3
/// // Read /tmp/file1.parquet with known size of 1234 bytes in a single group
@@ -93,6 +102,8 @@ pub fn wrap_partition_value_in_dict(val: ScalarValue) -> ScalarValue {
/// PartitionedFile::new("file2.parquet", 56),
/// PartitionedFile::new("file3.parquet", 78),
/// ]);
+ /// // create an execution plan from the config
+ /// let plan: Arc<dyn ExecutionPlan> = config.build();
/// ```
#[derive(Clone)]
pub struct FileScanConfig {
@@ -252,19 +263,20 @@ impl DataSource for FileScanConfig {
// If there is any non-column or alias-carrier expression, Projection should not be removed.
// This process can be moved into CsvExec, but it would be an overlap of their responsibility.
Ok(all_alias_free_columns(projection.expr()).then(|| {
- let mut file_scan = self.clone();
+ let file_scan = self.clone();
let source = Arc::clone(&file_scan.source);
let new_projections = new_projections_for_columns(
projection,
&file_scan
.projection
.clone()
.unwrap_or((0..self.file_schema.fields().len()).collect()),
);
- file_scan.projection = Some(new_projections);
- // Assign projected statistics to source
- file_scan = file_scan.with_source(source);
-
- file_scan.new_exec() as _
+ file_scan
+     // Assign projected statistics to source
+     .with_projection(Some(new_projections))
+     .with_source(source)
+     .build() as _
}))
}
}
@@ -574,9 +586,9 @@ impl FileScanConfig {
}

// TODO: This function should be moved into DataSourceExec once FileScanConfig moved out of datafusion/core
- /// Returns a new [`DataSourceExec`] from file configurations
- pub fn new_exec(&self) -> Arc<DataSourceExec> {
-     Arc::new(DataSourceExec::new(Arc::new(self.clone())))
+ /// Returns a new [`DataSourceExec`] to scan the files specified by this config
+ pub fn build(self) -> Arc<DataSourceExec> {
+     Arc::new(DataSourceExec::new(Arc::new(self)))
}

/// Write the data_type based on file_source
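Note the signature change here: `new_exec` took `&self` and cloned internally, while `build` takes `self` by value and moves the config into the plan. Call sites that still need the config after building must now clone explicitly, as the parquet tests further below do with `base_config.clone().build()`. A minimal sketch, under the same assumptions as the doc example above:

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::physical_plan::{ArrowSource, FileScanConfig};
use datafusion_execution::object_store::ObjectStoreUrl;

fn main() {
    let file_schema = Arc::new(Schema::new(vec![Field::new(
        "c1",
        DataType::Int32,
        false,
    )]));
    let config = FileScanConfig::new(
        ObjectStoreUrl::local_filesystem(),
        file_schema,
        Arc::new(ArrowSource::default()),
    );

    // `FileScanConfig` derives `Clone` (see above), so clone before `build`
    // when the config is still needed afterwards.
    let plan = config.clone().build();

    assert_eq!(1, config.file_schema.fields().len()); // config still usable
    assert_eq!(1, plan.schema().fields().len());
}
```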
8 changes: 4 additions & 4 deletions datafusion/core/src/datasource/physical_plan/json.rs
@@ -589,7 +589,7 @@ mod tests {
.with_file_groups(file_groups)
.with_limit(Some(3))
.with_file_compression_type(file_compression_type.to_owned());
- let exec = conf.new_exec();
+ let exec = conf.build();

// TODO: this is not where schema inference should be tested

@@ -660,7 +660,7 @@
.with_file_groups(file_groups)
.with_limit(Some(3))
.with_file_compression_type(file_compression_type.to_owned());
- let exec = conf.new_exec();
+ let exec = conf.build();

let mut it = exec.execute(0, task_ctx)?;
let batch = it.next().await.unwrap()?;
@@ -700,7 +700,7 @@
.with_file_groups(file_groups)
.with_projection(Some(vec![0, 2]))
.with_file_compression_type(file_compression_type.to_owned());
- let exec = conf.new_exec();
+ let exec = conf.build();
let inferred_schema = exec.schema();
assert_eq!(inferred_schema.fields().len(), 2);

@@ -745,7 +745,7 @@
.with_file_groups(file_groups)
.with_projection(Some(vec![3, 0, 2]))
.with_file_compression_type(file_compression_type.to_owned());
- let exec = conf.new_exec();
+ let exec = conf.build();
let inferred_schema = exec.schema();
assert_eq!(inferred_schema.fields().len(), 3);

10 changes: 5 additions & 5 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -708,7 +708,7 @@ mod tests {

let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
- let parquet_exec = base_config.new_exec();
+ let parquet_exec = base_config.clone().build();
RoundTripResult {
batches: collect(parquet_exec.clone(), task_ctx).await,
parquet_exec,
@@ -1354,7 +1354,7 @@
Arc::new(ParquetSource::default()),
)
.with_file_groups(file_groups)
- .new_exec();
+ .build();
assert_eq!(
parquet_exec
.properties()
@@ -1468,7 +1468,7 @@
false,
),
])
- .new_exec();
+ .build();
let partition_count = parquet_exec
.source()
.output_partitioning()
@@ -1531,7 +1531,7 @@
Arc::new(ParquetSource::default()),
)
.with_file(partitioned_file)
- .new_exec();
+ .build();

let mut results = parquet_exec.execute(0, state.task_ctx())?;
let batch = results.next().await.unwrap();
@@ -2188,7 +2188,7 @@
extensions: None,
metadata_size_hint: None,
})
- .new_exec();
+ .build();

let res = collect(exec, ctx.task_ctx()).await.unwrap();
assert_eq!(res.len(), 2);
(Diff truncated: the remaining 16 of the 28 changed files are not shown above.)
