Improve UX: Rename `FileScanConfig::new_exec` to `FileScanConfig::build` #14670
```diff
@@ -171,11 +171,10 @@ impl FileFormat for ArrowFormat {
     async fn create_physical_plan(
         &self,
         _state: &dyn Session,
-        mut conf: FileScanConfig,
+        conf: FileScanConfig,
         _filters: Option<&Arc<dyn PhysicalExpr>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        conf = conf.with_source(Arc::new(ArrowSource::default()));
-        Ok(conf.new_exec())
+        Ok(conf.with_source(Arc::new(ArrowSource::default())).build())
     }

     async fn create_writer_physical_plan(
```

Reviewer comment: Drive-by cleanup to avoid `mut` and make it more concise.
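The cleanup above works because `FileScanConfig`'s setters are consuming builder methods: they take `self` and return `Self`, so calls can be chained instead of rebinding a `mut` variable. A minimal, self-contained sketch of that idiom using a toy `Config` type (not DataFusion's API):

```rust
// Toy consuming builder: each method takes `self` by value and returns the
// updated value, so callers can chain calls without a `mut` binding.
struct Config {
    source: Option<String>,
}

impl Config {
    fn new() -> Self {
        Self { source: None }
    }

    fn with_source(mut self, source: &str) -> Self {
        self.source = Some(source.to_string());
        self
    }

    // Final consuming step, analogous to `FileScanConfig::build`.
    fn build(self) -> String {
        format!("plan over {:?}", self.source)
    }
}

fn main() {
    // Old style from the diff: mutable rebinding before the final call.
    let mut config = Config::new();
    config = config.with_source("file:///tmp/data.arrow");
    let plan_a = config.build();

    // New style: one chained expression, no `mut` needed.
    let plan_b = Config::new()
        .with_source("file:///tmp/data.arrow")
        .build();

    assert_eq!(plan_a, plan_b);
}
```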
```diff
@@ -68,21 +68,30 @@ pub fn wrap_partition_value_in_dict(val: ScalarValue) -> ScalarValue {
     ScalarValue::Dictionary(Box::new(DataType::UInt16), Box::new(val))
 }

-/// The base configurations to provide when creating a physical plan for
+/// The base configurations for a [`DataSourceExec`], a physical plan for
 /// any given file format.
 ///
+/// Use [`Self::build`] to create a [`DataSourceExec`] from a [`FileScanConfig`].
+///
 /// # Example
 /// ```
 /// # use std::sync::Arc;
-/// # use arrow::datatypes::Schema;
+/// # use arrow::datatypes::{Field, Fields, DataType, Schema};
 /// # use datafusion::datasource::listing::PartitionedFile;
 /// # use datafusion::datasource::physical_plan::FileScanConfig;
 /// # use datafusion_execution::object_store::ObjectStoreUrl;
 /// # use datafusion::datasource::physical_plan::ArrowSource;
-/// # let file_schema = Arc::new(Schema::empty());
-/// // create FileScan config for reading data from file://
+/// # use datafusion_physical_plan::ExecutionPlan;
+/// # let file_schema = Arc::new(Schema::new(vec![
+/// #   Field::new("c1", DataType::Int32, false),
+/// #   Field::new("c2", DataType::Int32, false),
+/// #   Field::new("c3", DataType::Int32, false),
+/// #   Field::new("c4", DataType::Int32, false),
+/// # ]));
+/// // create FileScan config for reading arrow files from file://
 /// let object_store_url = ObjectStoreUrl::local_filesystem();
-/// let config = FileScanConfig::new(object_store_url, file_schema, Arc::new(ArrowSource::default()))
+/// let file_source = Arc::new(ArrowSource::default());
+/// let config = FileScanConfig::new(object_store_url, file_schema, file_source)
 ///   .with_limit(Some(1000))            // read only the first 1000 records
 ///   .with_projection(Some(vec![2, 3])) // project columns 2 and 3
 ///   // Read /tmp/file1.parquet with known size of 1234 bytes in a single group
```
```diff
@@ -93,6 +102,8 @@ pub fn wrap_partition_value_in_dict(val: ScalarValue) -> ScalarValue {
 ///     PartitionedFile::new("file2.parquet", 56),
 ///     PartitionedFile::new("file3.parquet", 78),
 ///   ]);
+/// // create an execution plan from the config
+/// let plan: Arc<dyn ExecutionPlan> = config.build();
 /// ```
 #[derive(Clone)]
 pub struct FileScanConfig {
```

Reviewer comment: I also tried to add some docs that could make …
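For readers skimming the two hunks above, here is a condensed, assembled version of what the updated doc example does. It uses only calls that appear in the diff (the portion elided between the hunks is omitted), and the columns, limit, and projection values are illustrative:

```rust
// Condensed sketch of the new documentation example, written as ordinary code
// rather than a doc comment.
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::physical_plan::{ArrowSource, FileScanConfig};
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_physical_plan::ExecutionPlan;

fn arrow_scan_plan() -> Arc<dyn ExecutionPlan> {
    // Schema of the files to be scanned.
    let file_schema = Arc::new(Schema::new(vec![
        Field::new("c1", DataType::Int32, false),
        Field::new("c2", DataType::Int32, false),
        Field::new("c3", DataType::Int32, false),
        Field::new("c4", DataType::Int32, false),
    ]));

    // Read Arrow IPC files via the local filesystem object store.
    let object_store_url = ObjectStoreUrl::local_filesystem();
    let file_source = Arc::new(ArrowSource::default());

    FileScanConfig::new(object_store_url, file_schema, file_source)
        .with_limit(Some(1000))            // read only the first 1000 records
        .with_projection(Some(vec![2, 3])) // project columns 2 and 3
        .build()                           // `build` replaces `new_exec` in this PR
}
```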
```diff
@@ -252,19 +263,20 @@ impl DataSource for FileScanConfig {
         // If there is any non-column or alias-carrier expression, Projection should not be removed.
         // This process can be moved into CsvExec, but it would be an overlap of their responsibility.
         Ok(all_alias_free_columns(projection.expr()).then(|| {
-            let mut file_scan = self.clone();
+            let file_scan = self.clone();
             let source = Arc::clone(&file_scan.source);
             let new_projections = new_projections_for_columns(
                 projection,
                 &file_scan
                     .projection
                     .clone()
                     .unwrap_or((0..self.file_schema.fields().len()).collect()),
             );
-            file_scan.projection = Some(new_projections);
-            // Assign projected statistics to source
-            file_scan = file_scan.with_source(source);
-
-            file_scan.new_exec() as _
+            file_scan
+                // Assign projected statistics to source
+                .with_projection(Some(new_projections))
+                .with_source(source)
+                .build() as _
         }))
     }
 }
```

Reviewer comment: Previously calling …
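One detail of the rewritten closure that may not be obvious: `bool::then` runs the closure only when the condition holds and yields an `Option`, while the trailing `as _` coerces the concrete `Arc<DataSourceExec>` into the trait object the surrounding signature expects. A small self-contained sketch of the same pattern with a toy trait (not DataFusion types):

```rust
use std::sync::Arc;

trait Plan {
    fn name(&self) -> &'static str;
}

struct Scan;

impl Plan for Scan {
    fn name(&self) -> &'static str {
        "scan"
    }
}

// Mirrors `Ok(all_alias_free_columns(..).then(|| { .. .build() as _ }))`:
// the closure runs only when the predicate is true, and `as _` coerces the
// concrete `Arc<Scan>` to the `Arc<dyn Plan>` demanded by the return type.
fn maybe_plan(all_columns_alias_free: bool) -> Option<Arc<dyn Plan>> {
    all_columns_alias_free.then(|| Arc::new(Scan) as _)
}

fn main() {
    assert!(maybe_plan(false).is_none());
    let plan = maybe_plan(true).expect("predicate holds");
    println!("{}", plan.name());
}
```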
```diff
@@ -574,9 +586,9 @@ impl FileScanConfig {
     }

-    // TODO: This function should be moved into DataSourceExec once FileScanConfig moved out of datafusion/core
-    /// Returns a new [`DataSourceExec`] from file configurations
-    pub fn new_exec(&self) -> Arc<DataSourceExec> {
-        Arc::new(DataSourceExec::new(Arc::new(self.clone())))
+    /// Returns a new [`DataSourceExec`] to scan the files specified by this config
+    pub fn build(self) -> Arc<DataSourceExec> {
+        Arc::new(DataSourceExec::new(Arc::new(self)))
     }

     /// Write the data_type based on file_source
```

Reviewer comment: Renamed and changed to take `self` rather than `&self`.
Reviewer comment: This is basically the change -- use `build` rather than `new_exec`, which I think is more consistent with the style of code in the rest of this crate (and with the Rust builder style in general).
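Because `build` takes `self` by value where `new_exec` took `&self`, downstream callers that create more than one plan from the same config now need an explicit `clone`. A migration sketch (the caller code is hypothetical, not from this PR; `FileScanConfig` is `Clone` per the `#[derive(Clone)]` visible above):

```rust
// Hypothetical caller illustrating the `&self` -> `self` migration: `new_exec`
// cloned the config internally, while `build` consumes it, so a caller that
// wants two plans clones explicitly.
use std::sync::Arc;

use datafusion::datasource::physical_plan::FileScanConfig;
use datafusion_physical_plan::ExecutionPlan;

fn build_two_plans(config: FileScanConfig) -> (Arc<dyn ExecutionPlan>, Arc<dyn ExecutionPlan>) {
    // Before this PR:
    //   let a = config.new_exec();
    //   let b = config.new_exec();
    // After this PR:
    let a: Arc<dyn ExecutionPlan> = config.clone().build();
    let b: Arc<dyn ExecutionPlan> = config.build();
    (a, b)
}
```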