Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

some dependency removals and setup for refactor of FileScanConfig #14543

Merged
merged 7 commits into from
Feb 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions datafusion-examples/examples/custom_file_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@ use arrow::{
array::{AsArray, RecordBatch, StringArray, UInt8Array},
datatypes::{DataType, Field, Schema, SchemaRef, UInt64Type},
};
use datafusion::common::{GetExt, Statistics};
use datafusion::datasource::data_source::FileSource;
use datafusion::execution::session_state::SessionStateBuilder;
use datafusion::physical_expr::LexRequirement;
use datafusion::physical_expr::PhysicalExpr;
use datafusion::{
catalog::Session,
common::{GetExt, Statistics},
};
use datafusion::{
datasource::{
file_format::{
Expand All @@ -36,7 +39,6 @@ use datafusion::{
MemTable,
},
error::Result,
execution::context::SessionState,
physical_plan::ExecutionPlan,
prelude::SessionContext,
};
Expand Down Expand Up @@ -84,7 +86,7 @@ impl FileFormat for TSVFileFormat {

async fn infer_schema(
&self,
state: &SessionState,
state: &dyn Session,
store: &Arc<dyn ObjectStore>,
objects: &[ObjectMeta],
) -> Result<SchemaRef> {
Expand All @@ -95,7 +97,7 @@ impl FileFormat for TSVFileFormat {

async fn infer_stats(
&self,
state: &SessionState,
state: &dyn Session,
store: &Arc<dyn ObjectStore>,
table_schema: SchemaRef,
object: &ObjectMeta,
Expand All @@ -107,7 +109,7 @@ impl FileFormat for TSVFileFormat {

async fn create_physical_plan(
&self,
state: &SessionState,
state: &dyn Session,
conf: FileScanConfig,
filters: Option<&Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Expand All @@ -119,7 +121,7 @@ impl FileFormat for TSVFileFormat {
async fn create_writer_physical_plan(
&self,
input: Arc<dyn ExecutionPlan>,
state: &SessionState,
state: &dyn Session,
conf: FileSinkConfig,
order_requirements: Option<LexRequirement>,
) -> Result<Arc<dyn ExecutionPlan>> {
Expand Down Expand Up @@ -153,7 +155,7 @@ impl TSVFileFactory {
impl FileFormatFactory for TSVFileFactory {
fn create(
&self,
state: &SessionState,
state: &dyn Session,
format_options: &std::collections::HashMap<String, String>,
) -> Result<Arc<dyn FileFormat>> {
let mut new_options = format_options.clone();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

//! Logic for managing groups of [`PartitionedFile`]s in DataFusion

use crate::datasource::listing::{FileRange, PartitionedFile};
use crate::{FileRange, PartitionedFile};
use itertools::Itertools;
use std::cmp::min;
use std::collections::BinaryHeap;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/catalog-listing/src/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
//! A table that uses the `ObjectStore` listing capability
//! to get the list of files to process.

pub mod file_groups;
pub mod helpers;
pub mod url;

use chrono::TimeZone;
use datafusion_common::Result;
use datafusion_common::{ScalarValue, Statistics};
Expand Down
5 changes: 5 additions & 0 deletions datafusion/core/src/datasource/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,9 @@ pub trait FileSource: Send + Sync {
fn fmt_extra(&self, _t: DisplayFormatType, _f: &mut Formatter) -> fmt::Result {
Ok(())
}
/// Return true if the file format supports repartition
///
/// If this returns true, the DataSourceExec may repartition the data
/// by breaking up the input files into multiple smaller groups.
fn supports_repartition(&self, config: &FileScanConfig) -> bool;
}
12 changes: 6 additions & 6 deletions datafusion/core/src/datasource/file_format/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@ use crate::datasource::physical_plan::{
ArrowSource, FileGroupDisplay, FileScanConfig, FileSink, FileSinkConfig,
};
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};

use arrow::ipc::convert::fb_to_schema;
use arrow::ipc::reader::FileReader;
use arrow::ipc::writer::IpcWriteOptions;
use arrow::ipc::{root_as_message, CompressionType};
use arrow_schema::{ArrowError, Schema, SchemaRef};
use datafusion_catalog::Session;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{
not_impl_err, DataFusionError, GetExt, Statistics, DEFAULT_ARROW_EXTENSION,
Expand Down Expand Up @@ -84,7 +84,7 @@ impl ArrowFormatFactory {
impl FileFormatFactory for ArrowFormatFactory {
fn create(
&self,
_state: &SessionState,
_state: &dyn Session,
_format_options: &HashMap<String, String>,
) -> Result<Arc<dyn FileFormat>> {
Ok(Arc::new(ArrowFormat))
Expand Down Expand Up @@ -135,7 +135,7 @@ impl FileFormat for ArrowFormat {

async fn infer_schema(
&self,
_state: &SessionState,
_state: &dyn Session,
store: &Arc<dyn ObjectStore>,
objects: &[ObjectMeta],
) -> Result<SchemaRef> {
Expand All @@ -159,7 +159,7 @@ impl FileFormat for ArrowFormat {

async fn infer_stats(
&self,
_state: &SessionState,
_state: &dyn Session,
_store: &Arc<dyn ObjectStore>,
table_schema: SchemaRef,
_object: &ObjectMeta,
Expand All @@ -169,7 +169,7 @@ impl FileFormat for ArrowFormat {

async fn create_physical_plan(
&self,
_state: &SessionState,
_state: &dyn Session,
mut conf: FileScanConfig,
_filters: Option<&Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Expand All @@ -180,7 +180,7 @@ impl FileFormat for ArrowFormat {
async fn create_writer_physical_plan(
&self,
input: Arc<dyn ExecutionPlan>,
_state: &SessionState,
_state: &dyn Session,
conf: FileSinkConfig,
order_requirements: Option<LexRequirement>,
) -> Result<Arc<dyn ExecutionPlan>> {
Expand Down
12 changes: 6 additions & 6 deletions datafusion/core/src/datasource/file_format/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ use super::FileFormatFactory;
use crate::datasource::avro_to_arrow::read_avro_schema_from_reader;
use crate::datasource::physical_plan::{AvroSource, FileScanConfig};
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::Statistics;

use crate::datasource::data_source::FileSource;
use arrow::datatypes::Schema;
use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use datafusion_catalog::Session;
use datafusion_common::internal_err;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::GetExt;
Expand All @@ -57,7 +57,7 @@ impl AvroFormatFactory {
impl FileFormatFactory for AvroFormatFactory {
fn create(
&self,
_state: &SessionState,
_state: &dyn Session,
_format_options: &HashMap<String, String>,
) -> Result<Arc<dyn FileFormat>> {
Ok(Arc::new(AvroFormat))
Expand Down Expand Up @@ -112,7 +112,7 @@ impl FileFormat for AvroFormat {

async fn infer_schema(
&self,
_state: &SessionState,
_state: &dyn Session,
store: &Arc<dyn ObjectStore>,
objects: &[ObjectMeta],
) -> Result<SchemaRef> {
Expand All @@ -137,7 +137,7 @@ impl FileFormat for AvroFormat {

async fn infer_stats(
&self,
_state: &SessionState,
_state: &dyn Session,
_store: &Arc<dyn ObjectStore>,
table_schema: SchemaRef,
_object: &ObjectMeta,
Expand All @@ -147,7 +147,7 @@ impl FileFormat for AvroFormat {

async fn create_physical_plan(
&self,
_state: &SessionState,
_state: &dyn Session,
mut conf: FileScanConfig,
_filters: Option<&Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Expand Down Expand Up @@ -505,7 +505,7 @@ mod tests {
}

async fn get_exec(
state: &SessionState,
state: &dyn Session,
file_name: &str,
projection: Option<Vec<usize>>,
limit: Option<usize>,
Expand Down
16 changes: 9 additions & 7 deletions datafusion/core/src/datasource/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use arrow::array::RecordBatch;
use arrow::csv::WriterBuilder;
use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
use arrow_schema::ArrowError;
use datafusion_catalog::Session;
use datafusion_common::config::{ConfigField, ConfigFileType, CsvOptions};
use datafusion_common::file_options::csv_writer::CsvWriterOptions;
use datafusion_common::{
Expand Down Expand Up @@ -95,9 +96,10 @@ impl Debug for CsvFormatFactory {
impl FileFormatFactory for CsvFormatFactory {
fn create(
&self,
state: &SessionState,
state: &dyn Session,
format_options: &HashMap<String, String>,
) -> Result<Arc<dyn FileFormat>> {
let state = state.as_any().downcast_ref::<SessionState>().unwrap();
let csv_options = match &self.options {
None => {
let mut table_options = state.default_table_options();
Expand Down Expand Up @@ -365,7 +367,7 @@ impl FileFormat for CsvFormat {

async fn infer_schema(
&self,
state: &SessionState,
state: &dyn Session,
store: &Arc<dyn ObjectStore>,
objects: &[ObjectMeta],
) -> Result<SchemaRef> {
Expand Down Expand Up @@ -400,7 +402,7 @@ impl FileFormat for CsvFormat {

async fn infer_stats(
&self,
_state: &SessionState,
_state: &dyn Session,
_store: &Arc<dyn ObjectStore>,
table_schema: SchemaRef,
_object: &ObjectMeta,
Expand All @@ -410,7 +412,7 @@ impl FileFormat for CsvFormat {

async fn create_physical_plan(
&self,
state: &SessionState,
state: &dyn Session,
mut conf: FileScanConfig,
_filters: Option<&Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Expand Down Expand Up @@ -440,7 +442,7 @@ impl FileFormat for CsvFormat {
async fn create_writer_physical_plan(
&self,
input: Arc<dyn ExecutionPlan>,
state: &SessionState,
state: &dyn Session,
conf: FileSinkConfig,
order_requirements: Option<LexRequirement>,
) -> Result<Arc<dyn ExecutionPlan>> {
Expand Down Expand Up @@ -485,7 +487,7 @@ impl CsvFormat {
/// number of lines that were read
async fn infer_schema_from_stream(
&self,
state: &SessionState,
state: &dyn Session,
mut records_to_read: usize,
stream: impl Stream<Item = Result<Bytes>>,
) -> Result<(Schema, usize)> {
Expand Down Expand Up @@ -1147,7 +1149,7 @@ mod tests {
}

async fn get_exec(
state: &SessionState,
state: &dyn Session,
file_name: &str,
projection: Option<Vec<usize>>,
limit: Option<usize>,
Expand Down
16 changes: 9 additions & 7 deletions datafusion/core/src/datasource/file_format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use crate::datasource::physical_plan::{
FileGroupDisplay, FileSink, FileSinkConfig, JsonSource,
};
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::execution::SessionState;
use crate::physical_plan::insert::{DataSink, DataSinkExec};
use crate::physical_plan::{
DisplayAs, DisplayFormatType, SendableRecordBatchStream, Statistics,
Expand All @@ -48,6 +48,7 @@ use arrow::json;
use arrow::json::reader::{infer_json_schema_from_iterator, ValueIter};
use arrow_array::RecordBatch;
use arrow_schema::ArrowError;
use datafusion_catalog::Session;
use datafusion_common::config::{ConfigField, ConfigFileType, JsonOptions};
use datafusion_common::file_options::json_writer::JsonWriterOptions;
use datafusion_common::{not_impl_err, GetExt, DEFAULT_JSON_EXTENSION};
Expand Down Expand Up @@ -87,9 +88,10 @@ impl JsonFormatFactory {
impl FileFormatFactory for JsonFormatFactory {
fn create(
&self,
state: &SessionState,
state: &dyn Session,
format_options: &HashMap<String, String>,
) -> Result<Arc<dyn FileFormat>> {
let state = state.as_any().downcast_ref::<SessionState>().unwrap();
let json_options = match &self.options {
None => {
let mut table_options = state.default_table_options();
Expand Down Expand Up @@ -189,7 +191,7 @@ impl FileFormat for JsonFormat {

async fn infer_schema(
&self,
_state: &SessionState,
_state: &dyn Session,
store: &Arc<dyn ObjectStore>,
objects: &[ObjectMeta],
) -> Result<SchemaRef> {
Expand Down Expand Up @@ -237,7 +239,7 @@ impl FileFormat for JsonFormat {

async fn infer_stats(
&self,
_state: &SessionState,
_state: &dyn Session,
_store: &Arc<dyn ObjectStore>,
table_schema: SchemaRef,
_object: &ObjectMeta,
Expand All @@ -247,7 +249,7 @@ impl FileFormat for JsonFormat {

async fn create_physical_plan(
&self,
_state: &SessionState,
_state: &dyn Session,
mut conf: FileScanConfig,
_filters: Option<&Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Expand All @@ -261,7 +263,7 @@ impl FileFormat for JsonFormat {
async fn create_writer_physical_plan(
&self,
input: Arc<dyn ExecutionPlan>,
_state: &SessionState,
_state: &dyn Session,
conf: FileSinkConfig,
order_requirements: Option<LexRequirement>,
) -> Result<Arc<dyn ExecutionPlan>> {
Expand Down Expand Up @@ -538,7 +540,7 @@ mod tests {
}

async fn get_exec(
state: &SessionState,
state: &dyn Session,
projection: Option<Vec<usize>>,
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
Expand Down
Loading