Skip to content

Commit

Permalink
some dependency removals and setup for refactor of FileScanConfig (#…
Browse files Browse the repository at this point in the history
…14543)

* some dependency removals and setup for refactor

* fix: CI for linux build

* move FileGroupsPartitioner

* remove old

* Fix supports_repartition

* fix

---------

Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
logan-keede and alamb authored Feb 7, 2025
1 parent d5f19f3 commit 7ccc6d7
Show file tree
Hide file tree
Showing 18 changed files with 97 additions and 73 deletions.
16 changes: 9 additions & 7 deletions datafusion-examples/examples/custom_file_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@ use arrow::{
array::{AsArray, RecordBatch, StringArray, UInt8Array},
datatypes::{DataType, Field, Schema, SchemaRef, UInt64Type},
};
use datafusion::common::{GetExt, Statistics};
use datafusion::datasource::data_source::FileSource;
use datafusion::execution::session_state::SessionStateBuilder;
use datafusion::physical_expr::LexRequirement;
use datafusion::physical_expr::PhysicalExpr;
use datafusion::{
catalog::Session,
common::{GetExt, Statistics},
};
use datafusion::{
datasource::{
file_format::{
Expand All @@ -36,7 +39,6 @@ use datafusion::{
MemTable,
},
error::Result,
execution::context::SessionState,
physical_plan::ExecutionPlan,
prelude::SessionContext,
};
Expand Down Expand Up @@ -84,7 +86,7 @@ impl FileFormat for TSVFileFormat {

async fn infer_schema(
&self,
state: &SessionState,
state: &dyn Session,
store: &Arc<dyn ObjectStore>,
objects: &[ObjectMeta],
) -> Result<SchemaRef> {
Expand All @@ -95,7 +97,7 @@ impl FileFormat for TSVFileFormat {

async fn infer_stats(
&self,
state: &SessionState,
state: &dyn Session,
store: &Arc<dyn ObjectStore>,
table_schema: SchemaRef,
object: &ObjectMeta,
Expand All @@ -107,7 +109,7 @@ impl FileFormat for TSVFileFormat {

async fn create_physical_plan(
&self,
state: &SessionState,
state: &dyn Session,
conf: FileScanConfig,
filters: Option<&Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Expand All @@ -119,7 +121,7 @@ impl FileFormat for TSVFileFormat {
async fn create_writer_physical_plan(
&self,
input: Arc<dyn ExecutionPlan>,
state: &SessionState,
state: &dyn Session,
conf: FileSinkConfig,
order_requirements: Option<LexRequirement>,
) -> Result<Arc<dyn ExecutionPlan>> {
Expand Down Expand Up @@ -153,7 +155,7 @@ impl TSVFileFactory {
impl FileFormatFactory for TSVFileFactory {
fn create(
&self,
state: &SessionState,
state: &dyn Session,
format_options: &std::collections::HashMap<String, String>,
) -> Result<Arc<dyn FileFormat>> {
let mut new_options = format_options.clone();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

//! Logic for managing groups of [`PartitionedFile`]s in DataFusion
use crate::datasource::listing::{FileRange, PartitionedFile};
use crate::{FileRange, PartitionedFile};
use itertools::Itertools;
use std::cmp::min;
use std::collections::BinaryHeap;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/catalog-listing/src/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
//! A table that uses the `ObjectStore` listing capability
//! to get the list of files to process.
pub mod file_groups;
pub mod helpers;
pub mod url;

use chrono::TimeZone;
use datafusion_common::Result;
use datafusion_common::{ScalarValue, Statistics};
Expand Down
5 changes: 5 additions & 0 deletions datafusion/core/src/datasource/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,9 @@ pub trait FileSource: Send + Sync {
fn fmt_extra(&self, _t: DisplayFormatType, _f: &mut Formatter) -> fmt::Result {
Ok(())
}
/// Return true if the file format supports repartition
///
/// If this returns true, the DataSourceExec may repartition the data
/// by breaking up the input files into multiple smaller groups.
fn supports_repartition(&self, config: &FileScanConfig) -> bool;
}
12 changes: 6 additions & 6 deletions datafusion/core/src/datasource/file_format/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@ use crate::datasource::physical_plan::{
ArrowSource, FileGroupDisplay, FileScanConfig, FileSink, FileSinkConfig,
};
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};

use arrow::ipc::convert::fb_to_schema;
use arrow::ipc::reader::FileReader;
use arrow::ipc::writer::IpcWriteOptions;
use arrow::ipc::{root_as_message, CompressionType};
use arrow_schema::{ArrowError, Schema, SchemaRef};
use datafusion_catalog::Session;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{
not_impl_err, DataFusionError, GetExt, Statistics, DEFAULT_ARROW_EXTENSION,
Expand Down Expand Up @@ -84,7 +84,7 @@ impl ArrowFormatFactory {
impl FileFormatFactory for ArrowFormatFactory {
fn create(
&self,
_state: &SessionState,
_state: &dyn Session,
_format_options: &HashMap<String, String>,
) -> Result<Arc<dyn FileFormat>> {
Ok(Arc::new(ArrowFormat))
Expand Down Expand Up @@ -135,7 +135,7 @@ impl FileFormat for ArrowFormat {

async fn infer_schema(
&self,
_state: &SessionState,
_state: &dyn Session,
store: &Arc<dyn ObjectStore>,
objects: &[ObjectMeta],
) -> Result<SchemaRef> {
Expand All @@ -159,7 +159,7 @@ impl FileFormat for ArrowFormat {

async fn infer_stats(
&self,
_state: &SessionState,
_state: &dyn Session,
_store: &Arc<dyn ObjectStore>,
table_schema: SchemaRef,
_object: &ObjectMeta,
Expand All @@ -169,7 +169,7 @@ impl FileFormat for ArrowFormat {

async fn create_physical_plan(
&self,
_state: &SessionState,
_state: &dyn Session,
mut conf: FileScanConfig,
_filters: Option<&Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Expand All @@ -180,7 +180,7 @@ impl FileFormat for ArrowFormat {
async fn create_writer_physical_plan(
&self,
input: Arc<dyn ExecutionPlan>,
_state: &SessionState,
_state: &dyn Session,
conf: FileSinkConfig,
order_requirements: Option<LexRequirement>,
) -> Result<Arc<dyn ExecutionPlan>> {
Expand Down
12 changes: 6 additions & 6 deletions datafusion/core/src/datasource/file_format/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ use super::FileFormatFactory;
use crate::datasource::avro_to_arrow::read_avro_schema_from_reader;
use crate::datasource::physical_plan::{AvroSource, FileScanConfig};
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::Statistics;

use crate::datasource::data_source::FileSource;
use arrow::datatypes::Schema;
use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use datafusion_catalog::Session;
use datafusion_common::internal_err;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::GetExt;
Expand All @@ -57,7 +57,7 @@ impl AvroFormatFactory {
impl FileFormatFactory for AvroFormatFactory {
fn create(
&self,
_state: &SessionState,
_state: &dyn Session,
_format_options: &HashMap<String, String>,
) -> Result<Arc<dyn FileFormat>> {
Ok(Arc::new(AvroFormat))
Expand Down Expand Up @@ -112,7 +112,7 @@ impl FileFormat for AvroFormat {

async fn infer_schema(
&self,
_state: &SessionState,
_state: &dyn Session,
store: &Arc<dyn ObjectStore>,
objects: &[ObjectMeta],
) -> Result<SchemaRef> {
Expand All @@ -137,7 +137,7 @@ impl FileFormat for AvroFormat {

async fn infer_stats(
&self,
_state: &SessionState,
_state: &dyn Session,
_store: &Arc<dyn ObjectStore>,
table_schema: SchemaRef,
_object: &ObjectMeta,
Expand All @@ -147,7 +147,7 @@ impl FileFormat for AvroFormat {

async fn create_physical_plan(
&self,
_state: &SessionState,
_state: &dyn Session,
mut conf: FileScanConfig,
_filters: Option<&Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Expand Down Expand Up @@ -505,7 +505,7 @@ mod tests {
}

async fn get_exec(
state: &SessionState,
state: &dyn Session,
file_name: &str,
projection: Option<Vec<usize>>,
limit: Option<usize>,
Expand Down
16 changes: 9 additions & 7 deletions datafusion/core/src/datasource/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use arrow::array::RecordBatch;
use arrow::csv::WriterBuilder;
use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
use arrow_schema::ArrowError;
use datafusion_catalog::Session;
use datafusion_common::config::{ConfigField, ConfigFileType, CsvOptions};
use datafusion_common::file_options::csv_writer::CsvWriterOptions;
use datafusion_common::{
Expand Down Expand Up @@ -95,9 +96,10 @@ impl Debug for CsvFormatFactory {
impl FileFormatFactory for CsvFormatFactory {
fn create(
&self,
state: &SessionState,
state: &dyn Session,
format_options: &HashMap<String, String>,
) -> Result<Arc<dyn FileFormat>> {
let state = state.as_any().downcast_ref::<SessionState>().unwrap();
let csv_options = match &self.options {
None => {
let mut table_options = state.default_table_options();
Expand Down Expand Up @@ -365,7 +367,7 @@ impl FileFormat for CsvFormat {

async fn infer_schema(
&self,
state: &SessionState,
state: &dyn Session,
store: &Arc<dyn ObjectStore>,
objects: &[ObjectMeta],
) -> Result<SchemaRef> {
Expand Down Expand Up @@ -400,7 +402,7 @@ impl FileFormat for CsvFormat {

async fn infer_stats(
&self,
_state: &SessionState,
_state: &dyn Session,
_store: &Arc<dyn ObjectStore>,
table_schema: SchemaRef,
_object: &ObjectMeta,
Expand All @@ -410,7 +412,7 @@ impl FileFormat for CsvFormat {

async fn create_physical_plan(
&self,
state: &SessionState,
state: &dyn Session,
mut conf: FileScanConfig,
_filters: Option<&Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Expand Down Expand Up @@ -440,7 +442,7 @@ impl FileFormat for CsvFormat {
async fn create_writer_physical_plan(
&self,
input: Arc<dyn ExecutionPlan>,
state: &SessionState,
state: &dyn Session,
conf: FileSinkConfig,
order_requirements: Option<LexRequirement>,
) -> Result<Arc<dyn ExecutionPlan>> {
Expand Down Expand Up @@ -485,7 +487,7 @@ impl CsvFormat {
/// number of lines that were read
async fn infer_schema_from_stream(
&self,
state: &SessionState,
state: &dyn Session,
mut records_to_read: usize,
stream: impl Stream<Item = Result<Bytes>>,
) -> Result<(Schema, usize)> {
Expand Down Expand Up @@ -1147,7 +1149,7 @@ mod tests {
}

async fn get_exec(
state: &SessionState,
state: &dyn Session,
file_name: &str,
projection: Option<Vec<usize>>,
limit: Option<usize>,
Expand Down
16 changes: 9 additions & 7 deletions datafusion/core/src/datasource/file_format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use crate::datasource::physical_plan::{
FileGroupDisplay, FileSink, FileSinkConfig, JsonSource,
};
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::execution::SessionState;
use crate::physical_plan::insert::{DataSink, DataSinkExec};
use crate::physical_plan::{
DisplayAs, DisplayFormatType, SendableRecordBatchStream, Statistics,
Expand All @@ -48,6 +48,7 @@ use arrow::json;
use arrow::json::reader::{infer_json_schema_from_iterator, ValueIter};
use arrow_array::RecordBatch;
use arrow_schema::ArrowError;
use datafusion_catalog::Session;
use datafusion_common::config::{ConfigField, ConfigFileType, JsonOptions};
use datafusion_common::file_options::json_writer::JsonWriterOptions;
use datafusion_common::{not_impl_err, GetExt, DEFAULT_JSON_EXTENSION};
Expand Down Expand Up @@ -87,9 +88,10 @@ impl JsonFormatFactory {
impl FileFormatFactory for JsonFormatFactory {
fn create(
&self,
state: &SessionState,
state: &dyn Session,
format_options: &HashMap<String, String>,
) -> Result<Arc<dyn FileFormat>> {
let state = state.as_any().downcast_ref::<SessionState>().unwrap();
let json_options = match &self.options {
None => {
let mut table_options = state.default_table_options();
Expand Down Expand Up @@ -189,7 +191,7 @@ impl FileFormat for JsonFormat {

async fn infer_schema(
&self,
_state: &SessionState,
_state: &dyn Session,
store: &Arc<dyn ObjectStore>,
objects: &[ObjectMeta],
) -> Result<SchemaRef> {
Expand Down Expand Up @@ -237,7 +239,7 @@ impl FileFormat for JsonFormat {

async fn infer_stats(
&self,
_state: &SessionState,
_state: &dyn Session,
_store: &Arc<dyn ObjectStore>,
table_schema: SchemaRef,
_object: &ObjectMeta,
Expand All @@ -247,7 +249,7 @@ impl FileFormat for JsonFormat {

async fn create_physical_plan(
&self,
_state: &SessionState,
_state: &dyn Session,
mut conf: FileScanConfig,
_filters: Option<&Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Expand All @@ -261,7 +263,7 @@ impl FileFormat for JsonFormat {
async fn create_writer_physical_plan(
&self,
input: Arc<dyn ExecutionPlan>,
_state: &SessionState,
_state: &dyn Session,
conf: FileSinkConfig,
order_requirements: Option<LexRequirement>,
) -> Result<Arc<dyn ExecutionPlan>> {
Expand Down Expand Up @@ -538,7 +540,7 @@ mod tests {
}

async fn get_exec(
state: &SessionState,
state: &dyn Session,
projection: Option<Vec<usize>>,
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
Expand Down
Loading

0 comments on commit 7ccc6d7

Please sign in to comment.