Improve UX: Rename FileScanConfig::new_exec to FileScanConfig::build #14670

Merged: 3 commits, Feb 15, 2025
Changes from 1 commit
2 changes: 1 addition & 1 deletion datafusion-examples/examples/advanced_parquet_index.rs
@@ -504,7 +504,7 @@ impl TableProvider for IndexTableProvider {
.with_file(partitioned_file);

// Finally, put it all together into a DataSourceExec
-        Ok(file_scan_config.new_exec())
+        Ok(file_scan_config.build())
Contributor Author:
This is basically the change -- use `build` rather than `new_exec`, which I think is more consistent with the style of the rest of this crate (and with the Rust builder style in general).
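To illustrate, here is a minimal sketch of the resulting builder-style chain, adapted from the doc example added to `file_scan_config.rs` in this PR (the file name and size are placeholders):

```rust
use std::sync::Arc;
use datafusion::arrow::datatypes::Schema;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::{ArrowSource, FileScanConfig};
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_physical_plan::ExecutionPlan;

fn scan_plan() -> Arc<dyn ExecutionPlan> {
    let file_schema = Arc::new(Schema::empty());
    // Configure the scan with chained with_* calls...
    let config = FileScanConfig::new(
        ObjectStoreUrl::local_filesystem(),
        file_schema,
        Arc::new(ArrowSource::default()),
    )
    .with_file(PartitionedFile::new("file1.parquet", 1234));
    // ...then finish the chain with build(), which consumes the config
    // and returns an Arc<DataSourceExec>.
    config.build()
}
```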

}

/// Tell DataFusion to push filters down to the scan method
2 changes: 1 addition & 1 deletion datafusion-examples/examples/parquet_index.rs
@@ -258,7 +258,7 @@ impl TableProvider for IndexTableProvider {
file_size,
));
}
-        Ok(file_scan_config.new_exec())
+        Ok(file_scan_config.build())
}

/// Tell DataFusion to push filters down to the scan method
5 changes: 2 additions & 3 deletions datafusion/core/src/datasource/file_format/arrow.rs
@@ -171,11 +171,10 @@ impl FileFormat for ArrowFormat {
async fn create_physical_plan(
&self,
_state: &dyn Session,
-        mut conf: FileScanConfig,
+        conf: FileScanConfig,
_filters: Option<&Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn ExecutionPlan>> {
-        conf = conf.with_source(Arc::new(ArrowSource::default()));
-        Ok(conf.new_exec())
+        Ok(conf.with_source(Arc::new(ArrowSource::default())).build())
Contributor Author:
Drive-by cleanup to avoid `mut` and make the code more concise.

}

async fn create_writer_physical_plan(
5 changes: 2 additions & 3 deletions datafusion/core/src/datasource/file_format/avro.rs
@@ -148,11 +148,10 @@ impl FileFormat for AvroFormat {
async fn create_physical_plan(
&self,
_state: &dyn Session,
-        mut conf: FileScanConfig,
+        conf: FileScanConfig,
_filters: Option<&Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn ExecutionPlan>> {
-        conf = conf.with_source(self.file_source());
-        Ok(conf.new_exec())
+        Ok(conf.with_source(self.file_source()).build())
}

fn file_source(&self) -> Arc<dyn FileSource> {
4 changes: 1 addition & 3 deletions datafusion/core/src/datasource/file_format/csv.rs
@@ -434,9 +434,7 @@ impl FileFormat for CsvFormat {
.with_terminator(self.options.terminator)
.with_comment(self.options.comment),
);
-        conf = conf.with_source(source);
-
-        Ok(conf.new_exec())
+        Ok(conf.with_source(source).build())
}

async fn create_writer_physical_plan(
4 changes: 1 addition & 3 deletions datafusion/core/src/datasource/file_format/json.rs
@@ -254,9 +254,7 @@ impl FileFormat for JsonFormat {
) -> Result<Arc<dyn ExecutionPlan>> {
let source = Arc::new(JsonSource::new());
conf.file_compression_type = FileCompressionType::from(self.options.compression);
-        conf = conf.with_source(source);
-
-        Ok(conf.new_exec())
+        Ok(conf.with_source(source).build())
}

async fn create_writer_physical_plan(
5 changes: 2 additions & 3 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -398,7 +398,7 @@ impl FileFormat for ParquetFormat {
async fn create_physical_plan(
&self,
_state: &dyn Session,
-        mut conf: FileScanConfig,
+        conf: FileScanConfig,
filters: Option<&Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn ExecutionPlan>> {
let mut predicate = None;
@@ -424,8 +424,7 @@ impl FileFormat for ParquetFormat {
if let Some(metadata_size_hint) = metadata_size_hint {
source = source.with_metadata_size_hint(metadata_size_hint)
}
-        conf = conf.with_source(Arc::new(source));
-        Ok(conf.new_exec())
+        Ok(conf.with_source(Arc::new(source)).build())
}

async fn create_writer_physical_plan(
6 changes: 3 additions & 3 deletions datafusion/core/src/datasource/physical_plan/avro.rs
@@ -399,7 +399,7 @@ mod tests {
.with_file(meta.into())
.with_projection(Some(vec![0, 1, 2]));

-        let source_exec = conf.new_exec();
+        let source_exec = conf.build();
assert_eq!(
source_exec
.properties()
@@ -472,7 +472,7 @@ mod tests {
.with_file(meta.into())
.with_projection(projection);

-        let source_exec = conf.new_exec();
+        let source_exec = conf.build();
assert_eq!(
source_exec
.properties()
@@ -546,7 +546,7 @@ mod tests {
.with_file(partitioned_file)
.with_table_partition_cols(vec![Field::new("date", DataType::Utf8, false)]);

-        let source_exec = conf.new_exec();
+        let source_exec = conf.build();

assert_eq!(
source_exec
38 changes: 19 additions & 19 deletions datafusion/core/src/datasource/physical_plan/csv.rs
@@ -836,14 +836,14 @@ mod tests {
)?;

let source = Arc::new(CsvSource::new(true, b',', b'"'));
-        let mut config = partitioned_csv_config(file_schema, file_groups, source)
+        let config = partitioned_csv_config(file_schema, file_groups, source)
             .with_file_compression_type(file_compression_type)
-            .with_newlines_in_values(false);
-        config.projection = Some(vec![0, 2, 4]);
-
-        let csv = config.new_exec();
+            .with_newlines_in_values(false)
+            .with_projection(Some(vec![0, 2, 4]));

         assert_eq!(13, config.file_schema.fields().len());
+        let csv = config.build();

assert_eq!(3, csv.schema().fields().len());

let mut stream = csv.execute(0, task_ctx)?;
@@ -901,12 +901,12 @@ mod tests {
)?;

let source = Arc::new(CsvSource::new(true, b',', b'"'));
-        let mut config = partitioned_csv_config(file_schema, file_groups, source)
+        let config = partitioned_csv_config(file_schema, file_groups, source)
             .with_newlines_in_values(false)
-            .with_file_compression_type(file_compression_type.to_owned());
-        config.projection = Some(vec![4, 0, 2]);
-        let csv = config.new_exec();
+            .with_file_compression_type(file_compression_type.to_owned())
+            .with_projection(Some(vec![4, 0, 2]));
         assert_eq!(13, config.file_schema.fields().len());
+        let csv = config.build();
assert_eq!(3, csv.schema().fields().len());

let mut stream = csv.execute(0, task_ctx)?;
@@ -964,12 +964,12 @@ mod tests {
)?;

let source = Arc::new(CsvSource::new(true, b',', b'"'));
-        let mut config = partitioned_csv_config(file_schema, file_groups, source)
+        let config = partitioned_csv_config(file_schema, file_groups, source)
             .with_newlines_in_values(false)
-            .with_file_compression_type(file_compression_type.to_owned());
-        config.limit = Some(5);
-        let csv = config.new_exec();
+            .with_file_compression_type(file_compression_type.to_owned())
+            .with_limit(Some(5));
         assert_eq!(13, config.file_schema.fields().len());
+        let csv = config.build();
assert_eq!(13, csv.schema().fields().len());

let mut it = csv.execute(0, task_ctx)?;
@@ -1024,12 +1024,12 @@ mod tests {
)?;

let source = Arc::new(CsvSource::new(true, b',', b'"'));
-        let mut config = partitioned_csv_config(file_schema, file_groups, source)
+        let config = partitioned_csv_config(file_schema, file_groups, source)
             .with_newlines_in_values(false)
-            .with_file_compression_type(file_compression_type.to_owned());
-        config.limit = Some(5);
-        let csv = config.new_exec();
+            .with_file_compression_type(file_compression_type.to_owned())
+            .with_limit(Some(5));
         assert_eq!(14, config.file_schema.fields().len());
+        let csv = config.build();
assert_eq!(14, csv.schema().fields().len());

// errors due to https://github.com/apache/datafusion/issues/4918
@@ -1089,8 +1089,8 @@ mod tests {
// we don't have `/date=xx/` in the path but that is ok because
// partitions are resolved during scan anyway

-        let csv = config.new_exec();
         assert_eq!(13, config.file_schema.fields().len());
+        let csv = config.build();
assert_eq!(2, csv.schema().fields().len());

let mut it = csv.execute(0, task_ctx)?;
@@ -1179,7 +1179,7 @@ mod tests {
let config = partitioned_csv_config(file_schema, file_groups, source)
.with_newlines_in_values(false)
.with_file_compression_type(file_compression_type.to_owned());
-        let csv = config.new_exec();
+        let csv = config.build();

let it = csv.execute(0, task_ctx).unwrap();
let batches: Vec<_> = it.try_collect().await.unwrap();
26 changes: 16 additions & 10 deletions datafusion/core/src/datasource/physical_plan/file_scan_config.rs
@@ -68,9 +68,11 @@ pub fn wrap_partition_value_in_dict(val: ScalarValue) -> ScalarValue {
ScalarValue::Dictionary(Box::new(DataType::UInt16), Box::new(val))
}

-/// The base configurations to provide when creating a physical plan for
+/// The base configurations for a [`DataSourceExec`], the physical plan for
 /// any given file format.
+///
+/// Use [`Self::build`] to create a [`DataSourceExec`] from a `FileScanConfig`.
///
/// # Example
/// ```
/// # use std::sync::Arc;
@@ -79,6 +81,7 @@ pub fn wrap_partition_value_in_dict(val: ScalarValue) -> ScalarValue {
/// # use datafusion::datasource::physical_plan::FileScanConfig;
/// # use datafusion_execution::object_store::ObjectStoreUrl;
/// # use datafusion::datasource::physical_plan::ArrowSource;
+/// use datafusion_physical_plan::ExecutionPlan;
/// # let file_schema = Arc::new(Schema::empty());
/// // create FileScan config for reading data from file://
/// let object_store_url = ObjectStoreUrl::local_filesystem();
@@ -93,6 +96,8 @@ pub fn wrap_partition_value_in_dict(val: ScalarValue) -> ScalarValue {
/// PartitionedFile::new("file2.parquet", 56),
/// PartitionedFile::new("file3.parquet", 78),
/// ]);
+/// // create an execution plan from the config
Contributor Author:
I also tried to add some docs to make `build` easier to discover.

+/// let plan: Arc<dyn ExecutionPlan> = config.build();
/// ```
#[derive(Clone)]
pub struct FileScanConfig {
@@ -252,19 +257,20 @@ impl DataSource for FileScanConfig {
// If there is any non-column or alias-carrier expression, Projection should not be removed.
// This process can be moved into CsvExec, but it would be an overlap of their responsibility.
Ok(all_alias_free_columns(projection.expr()).then(|| {
-            let mut file_scan = self.clone();
+            let file_scan = self.clone();
Contributor Author:
Previously, calling `new_exec` always cloned the config. Now it is cloned only when necessary.
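To make the copy behavior concrete, here is a simplified side-by-side of the two receivers (both bodies appear verbatim in the diff below; shown out of context):

```rust
// Before: new_exec borrowed the config, so it had to clone unconditionally.
pub fn new_exec(&self) -> Arc<DataSourceExec> {
    Arc::new(DataSourceExec::new(Arc::new(self.clone())))
}

// After: build takes the config by value, so the only copy on this path is
// the explicit self.clone() made above when the projection is rewritten.
pub fn build(self) -> Arc<DataSourceExec> {
    Arc::new(DataSourceExec::new(Arc::new(self)))
}
```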

let source = Arc::clone(&file_scan.source);
let new_projections = new_projections_for_columns(
projection,
&file_scan
.projection
.clone()
.unwrap_or((0..self.file_schema.fields().len()).collect()),
);
-            file_scan.projection = Some(new_projections);
-            // Assign projected statistics to source
-            file_scan = file_scan.with_source(source);
-
-            file_scan.new_exec() as _
+            file_scan
+                // Assign projected statistics to source
+                .with_projection(Some(new_projections))
+                .with_source(source)
+                .build() as _
}))
}
}
@@ -574,9 +580,9 @@ impl FileScanConfig {
}

// TODO: This function should be moved into DataSourceExec once FileScanConfig moved out of datafusion/core
-    /// Returns a new [`DataSourceExec`] from file configurations
-    pub fn new_exec(&self) -> Arc<DataSourceExec> {
-        Arc::new(DataSourceExec::new(Arc::new(self.clone())))
+    /// Returns a new [`DataSourceExec`] to scan the files specified by this config
+    pub fn build(self) -> Arc<DataSourceExec> {
Contributor Author:
Renamed, and changed to take `self` rather than `&self`; callers that reuse the config afterwards clone it first (see the sketch below).

+        Arc::new(DataSourceExec::new(Arc::new(self)))
}
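One caller-side consequence, sketched below with hypothetical variable names: a config used once is simply moved into `build()`, while a config that is still needed afterwards is cloned explicitly, as the parquet tests below do with `base_config.clone().build()`.

```rust
// Used once: move the config into build(); no extra copy is made.
let plan = config.build();

// Reused afterwards: clone explicitly, paying for the copy only where it
// is actually required; the original config stays usable.
let plan2 = reusable_config.clone().build();
let n_fields = reusable_config.file_schema.fields().len();
```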

/// Write the data_type based on file_source
8 changes: 4 additions & 4 deletions datafusion/core/src/datasource/physical_plan/json.rs
@@ -589,7 +589,7 @@ mod tests {
.with_file_groups(file_groups)
.with_limit(Some(3))
.with_file_compression_type(file_compression_type.to_owned());
-        let exec = conf.new_exec();
+        let exec = conf.build();

// TODO: this is not where schema inference should be tested

@@ -660,7 +660,7 @@ mod tests {
.with_file_groups(file_groups)
.with_limit(Some(3))
.with_file_compression_type(file_compression_type.to_owned());
-        let exec = conf.new_exec();
+        let exec = conf.build();

let mut it = exec.execute(0, task_ctx)?;
let batch = it.next().await.unwrap()?;
@@ -700,7 +700,7 @@ mod tests {
.with_file_groups(file_groups)
.with_projection(Some(vec![0, 2]))
.with_file_compression_type(file_compression_type.to_owned());
-        let exec = conf.new_exec();
+        let exec = conf.build();
let inferred_schema = exec.schema();
assert_eq!(inferred_schema.fields().len(), 2);

@@ -745,7 +745,7 @@ mod tests {
.with_file_groups(file_groups)
.with_projection(Some(vec![3, 0, 2]))
.with_file_compression_type(file_compression_type.to_owned());
-        let exec = conf.new_exec();
+        let exec = conf.build();
let inferred_schema = exec.schema();
assert_eq!(inferred_schema.fields().len(), 3);

10 changes: 5 additions & 5 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -708,7 +708,7 @@ mod tests {

let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
-        let parquet_exec = base_config.new_exec();
+        let parquet_exec = base_config.clone().build();
RoundTripResult {
batches: collect(parquet_exec.clone(), task_ctx).await,
parquet_exec,
@@ -1354,7 +1354,7 @@ mod tests {
Arc::new(ParquetSource::default()),
)
.with_file_groups(file_groups)
-            .new_exec();
+            .build();
assert_eq!(
parquet_exec
.properties()
@@ -1468,7 +1468,7 @@ mod tests {
false,
),
])
-            .new_exec();
+            .build();
let partition_count = parquet_exec
.source()
.output_partitioning()
@@ -1531,7 +1531,7 @@ mod tests {
Arc::new(ParquetSource::default()),
)
.with_file(partitioned_file)
-            .new_exec();
+            .build();

let mut results = parquet_exec.execute(0, state.task_ctx())?;
let batch = results.next().await.unwrap();
@@ -2188,7 +2188,7 @@ mod tests {
extensions: None,
metadata_size_hint: None,
})
-            .new_exec();
+            .build();

let res = collect(exec, ctx.task_ctx()).await.unwrap();
assert_eq!(res.len(), 2);
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/schema_adapter.rs
@@ -507,7 +507,7 @@ mod tests {
FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema, source)
.with_file(partitioned_file);

-        let parquet_exec = base_conf.new_exec();
+        let parquet_exec = base_conf.build();

let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
2 changes: 1 addition & 1 deletion datafusion/core/src/test/mod.rs
@@ -93,7 +93,7 @@ pub fn scan_partitioned_csv(
let source = Arc::new(CsvSource::new(true, b'"', b'"'));
let config = partitioned_csv_config(schema, file_groups, source)
.with_file_compression_type(FileCompressionType::UNCOMPRESSED);
-    Ok(config.new_exec())
+    Ok(config.build())
}

/// Returns file groups [`Vec<Vec<PartitionedFile>>`] for scanning `partitions` of `filename`
7 changes: 3 additions & 4 deletions datafusion/core/src/test_util/parquet.rs
@@ -156,7 +156,7 @@ impl TestParquetFile {
) -> Result<Arc<dyn ExecutionPlan>> {
let parquet_options = ctx.copied_table_options().parquet;
let source = Arc::new(ParquetSource::new(parquet_options.clone()));
-        let mut scan_config = FileScanConfig::new(
+        let scan_config = FileScanConfig::new(
self.object_store_url.clone(),
Arc::clone(&self.schema),
source,
@@ -185,13 +185,12 @@ impl TestParquetFile {
Arc::clone(&scan_config.file_schema),
Arc::clone(&physical_filter_expr),
));
-            scan_config = scan_config.with_source(source);
-            let parquet_exec = scan_config.new_exec();
+            let parquet_exec = scan_config.with_source(source).build();

let exec = Arc::new(FilterExec::try_new(physical_filter_expr, parquet_exec)?);
Ok(exec)
} else {
-            Ok(scan_config.new_exec())
+            Ok(scan_config.build())
}
}

Expand Down