refactor: Move FileSinkConfig out of Core #14585

Merged (2 commits) on Feb 11, 2025
6 changes: 4 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

4 changes: 3 additions & 1 deletion datafusion/catalog-listing/Cargo.toml
@@ -41,11 +41,13 @@ async-compression = { version = "0.4.0", features = [
"zstd",
"tokio",
], optional = true }
async-trait = { workspace = true }
bytes = { workspace = true }
bzip2 = { version = "0.5.0", optional = true }
chrono = { workspace = true }
datafusion-catalog = { workspace = true }
datafusion-common = { workspace = true, features = ["object_store"] }
datafusion-common-runtime = { workspace = true }
datafusion-execution = { workspace = true }
datafusion-expr = { workspace = true }
datafusion-physical-expr = { workspace = true }
@@ -57,14 +59,14 @@ glob = "0.3.0"
itertools = { workspace = true }
log = { workspace = true }
object_store = { workspace = true }
rand = { workspace = true }
tokio = { workspace = true }
tokio-util = { version = "0.7.4", features = ["io"], optional = true }
url = { workspace = true }
xz2 = { version = "0.1", optional = true, features = ["static"] }
zstd = { version = "0.13", optional = true, default-features = false }

[dev-dependencies]
async-trait = { workspace = true }
tempfile = { workspace = true }

[lints]
114 changes: 114 additions & 0 deletions datafusion/catalog-listing/src/file_sink_config.rs
@@ -0,0 +1,114 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::write::demux::{start_demuxer_task, DemuxedStreamReceiver};
use crate::{ListingTableUrl, PartitionedFile};
use arrow_schema::{DataType, SchemaRef};
use async_trait::async_trait;
use datafusion_common::Result;
use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::dml::InsertOp;
use datafusion_physical_plan::insert::DataSink;
use object_store::ObjectStore;
use std::sync::Arc;

/// General behaviors for files that do `DataSink` operations
#[async_trait]
pub trait FileSink: DataSink {
/// Retrieves the file sink configuration.
fn config(&self) -> &FileSinkConfig;

/// Spawns writer tasks and joins them to perform file writing operations.
/// This is a critical part of the `FileSink` trait, since it is the final step of `write_all`.
///
/// This function handles the process of writing data to files by:
/// 1. Spawning tasks for writing data to individual files.
/// 2. Coordinating the tasks using a demuxer to distribute data among files.
/// 3. Collecting results using `tokio::join`, ensuring that all tasks complete successfully.
///
/// # Parameters
/// - `context`: The execution context (`TaskContext`) that provides resources
/// like memory management and runtime environment.
/// - `demux_task`: A spawned task that handles demuxing, responsible for splitting
/// an input [`SendableRecordBatchStream`] into dynamically determined partitions.
/// See `start_demuxer_task()`
/// - `file_stream_rx`: A receiver that yields streams of record batches and their
/// corresponding file paths for writing. See `start_demuxer_task()`
/// - `object_store`: A handle to the object store where the files are written.
///
/// # Returns
/// - `Result<u64>`: Returns the total number of rows written across all files.
async fn spawn_writer_tasks_and_join(
&self,
context: &Arc<TaskContext>,
demux_task: SpawnedTask<Result<()>>,
file_stream_rx: DemuxedStreamReceiver,
object_store: Arc<dyn ObjectStore>,
) -> Result<u64>;

/// File sink implementation of the [`DataSink::write_all`] method.
async fn write_all(
&self,
data: SendableRecordBatchStream,
context: &Arc<TaskContext>,
) -> Result<u64> {
let config = self.config();
let object_store = context
.runtime_env()
.object_store(&config.object_store_url)?;
let (demux_task, file_stream_rx) = start_demuxer_task(config, data, context);
self.spawn_writer_tasks_and_join(
context,
demux_task,
file_stream_rx,
object_store,
)
.await
}
}

/// The base configurations to provide when creating a physical plan for
/// writing to any given file format.
pub struct FileSinkConfig {
/// Object store URL, used to get an ObjectStore instance
pub object_store_url: ObjectStoreUrl,
/// A vector of [`PartitionedFile`] structs, each representing a file partition
pub file_groups: Vec<PartitionedFile>,
/// Vector of partition paths
pub table_paths: Vec<ListingTableUrl>,
/// The schema of the output file
pub output_schema: SchemaRef,
/// A vector of column names and their corresponding data types,
/// representing the partitioning columns for the file
pub table_partition_cols: Vec<(String, DataType)>,
/// Controls how new data should be written to the file, determining whether
/// to append to, overwrite, or replace records in existing files.
pub insert_op: InsertOp,
/// Controls whether partition columns are kept for the file
pub keep_partition_by_columns: bool,
/// File extension without a dot (.)
pub file_extension: String,
}

impl FileSinkConfig {
/// Get output schema
pub fn output_schema(&self) -> &SchemaRef {
&self.output_schema
}
}
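
For orientation (not part of this diff): after the move, a format-specific sink stores a `FileSinkConfig` and implements only `config()` and `spawn_writer_tasks_and_join()`, inheriting the default `write_all` flow above. A minimal sketch of building the config with placeholder values, assuming the crate-root re-export of `ListingTableUrl` used by the new module plus `arrow_schema::{Field, Schema}`:

use arrow_schema::{DataType, Field, Schema};
use datafusion_catalog_listing::file_sink_config::FileSinkConfig;
use datafusion_catalog_listing::ListingTableUrl;
use datafusion_common::Result;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_expr::dml::InsertOp;
use std::sync::Arc;

// Hypothetical example; paths and values are placeholders, not from the PR.
fn example_config() -> Result<FileSinkConfig> {
    Ok(FileSinkConfig {
        // Resolved to an ObjectStore through the TaskContext's runtime env
        object_store_url: ObjectStoreUrl::local_filesystem(),
        // No pre-existing file partitions for a fresh write
        file_groups: vec![],
        table_paths: vec![ListingTableUrl::parse("file:///tmp/out/")?],
        output_schema: Arc::new(Schema::new(vec![Field::new(
            "a",
            DataType::Int64,
            false,
        )])),
        // Non-empty only for hive-style partitioned writes
        table_partition_cols: vec![],
        insert_op: InsertOp::Append,
        keep_partition_by_columns: false,
        file_extension: "csv".to_string(),
    })
}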
2 changes: 2 additions & 0 deletions datafusion/catalog-listing/src/mod.rs
@@ -20,8 +20,10 @@

pub mod file_compression_type;
pub mod file_groups;
pub mod file_sink_config;
pub mod helpers;
pub mod url;
pub mod write;
use chrono::TimeZone;
use datafusion_common::Result;
use datafusion_common::{ScalarValue, Statistics};
@@ -22,10 +22,10 @@ use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::Arc;

use crate::datasource::listing::ListingTableUrl;
use crate::datasource::physical_plan::FileSinkConfig;
use crate::error::Result;
use crate::physical_plan::SendableRecordBatchStream;
use crate::url::ListingTableUrl;
use crate::write::FileSinkConfig;
use datafusion_common::error::Result;
use datafusion_physical_plan::SendableRecordBatchStream;

use arrow::array::{
builder::UInt64Builder, cast::AsArray, downcast_dictionary_array, RecordBatch,
@@ -21,9 +21,9 @@
use std::io::Write;
use std::sync::Arc;

use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::physical_plan::FileSinkConfig;
use crate::error::Result;
use crate::file_compression_type::FileCompressionType;
use crate::file_sink_config::FileSinkConfig;
use datafusion_common::error::Result;

use arrow::array::RecordBatch;
use arrow_schema::Schema;
@@ -33,18 +33,18 @@ use object_store::path::Path;
use object_store::ObjectStore;
use tokio::io::AsyncWrite;

pub(crate) mod demux;
pub(crate) mod orchestration;
pub mod demux;
pub mod orchestration;

/// A buffer with interior mutability shared by the SerializedFileWriter and
/// ObjectStore writer
#[derive(Clone)]
pub(crate) struct SharedBuffer {
pub struct SharedBuffer {
/// The inner buffer for reading and writing
///
/// The lock is used to obtain internal mutability, so no worry about the
/// lock contention.
pub(crate) buffer: Arc<futures::lock::Mutex<Vec<u8>>>,
pub buffer: Arc<futures::lock::Mutex<Vec<u8>>>,
}

impl SharedBuffer {
@@ -79,7 +79,7 @@ pub trait BatchSerializer: Sync + Send {
/// with the specified compression.
/// We drop the `AbortableWrite` struct and the writer will not try to cleanup on failure.
/// Users can configure automatic cleanup with their cloud provider.
pub(crate) async fn create_writer(
pub async fn create_writer(
file_compression_type: FileCompressionType,
location: &Path,
object_store: Arc<dyn ObjectStore>,
@@ -91,7 +91,7 @@ pub(crate) async fn create_writer(
/// Converts table schema to writer schema, which may differ in the case
/// of hive style partitioning where some columns are removed from the
/// underlying files.
pub(crate) fn get_writer_schema(config: &FileSinkConfig) -> Arc<Schema> {
pub fn get_writer_schema(config: &FileSinkConfig) -> Arc<Schema> {
if !config.table_partition_cols.is_empty() && !config.keep_partition_by_columns {
let schema = config.output_schema();
let partition_names: Vec<_> =
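
Sketch (not part of the diff) of what `get_writer_schema` does when partition columns are not kept: the hive-style partition columns are projected out of the output schema so that data files do not duplicate values already encoded in the directory path. Roughly, using only `arrow_schema` types:

use arrow_schema::{Field, Schema};
use std::sync::Arc;

// Illustrative only: drop hive-style partition columns from the file schema.
fn drop_partition_cols(schema: &Schema, partition_names: &[String]) -> Arc<Schema> {
    let fields: Vec<Field> = schema
        .fields()
        .iter()
        .filter(|f| !partition_names.contains(f.name()))
        .map(|f| f.as_ref().clone())
        .collect();
    Arc::new(Schema::new(fields))
}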
@@ -23,8 +23,8 @@ use std::sync::Arc;

use super::demux::DemuxedStreamReceiver;
use super::{create_writer, BatchSerializer};
use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::error::Result;
use crate::file_compression_type::FileCompressionType;
use datafusion_common::error::Result;

use arrow::array::RecordBatch;
use datafusion_common::{internal_datafusion_err, internal_err, DataFusionError};
@@ -237,7 +237,7 @@ pub(crate) async fn stateless_serialize_and_write_files(
/// Orchestrates multipart put of a dynamic number of output files from a single input stream
/// for any statelessly serialized file type. That is, any file type for which each [RecordBatch]
/// can be serialized independently of all other [RecordBatch]s.
pub(crate) async fn spawn_writer_tasks_and_join(
pub async fn spawn_writer_tasks_and_join(
context: &Arc<TaskContext>,
serializer: Arc<dyn BatchSerializer>,
compression: FileCompressionType,
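
For context (not in this diff): the orchestration path above works for any format whose batches can be serialized independently of one another. A sketch of such a stateless serializer, assuming `BatchSerializer` (made public above) exposes a `serialize(&self, batch, initial) -> Result<Bytes>` method; check the actual trait definition in `write/mod.rs` before relying on this shape:

use arrow::array::RecordBatch;
use arrow::csv::WriterBuilder;
use bytes::Bytes;
use datafusion_common::Result;

// Illustrative stateless serializer: each batch becomes standalone CSV bytes.
struct CsvBatchSerializer;

impl CsvBatchSerializer {
    // Mirrors the assumed `BatchSerializer::serialize` shape.
    fn serialize(&self, batch: RecordBatch, initial: bool) -> Result<Bytes> {
        let mut buf = Vec::new();
        {
            // Emit the CSV header only for the first batch written to a file.
            let mut writer = WriterBuilder::new().with_header(initial).build(&mut buf);
            writer.write(&batch)?;
        }
        Ok(Bytes::from(buf))
    }
}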
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/file_format/mod.rs
@@ -28,8 +28,8 @@ pub mod json;
pub mod options;
#[cfg(feature = "parquet")]
pub mod parquet;
pub mod write;
pub use datafusion_catalog_listing::file_compression_type;
pub use datafusion_catalog_listing::write;

use std::any::Any;
use std::collections::{HashMap, VecDeque};
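
The re-exports above keep the old core-facing paths working, so downstream code that imported the write helpers through `datafusion::datasource::file_format` should keep compiling. Illustrative imports, assuming the re-exported modules expose these items (`SharedBuffer` is made public in this PR):

// Still valid after the move, now resolved through the re-exports:
use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
use datafusion::datasource::file_format::write::SharedBuffer;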