Move FileSinkConfig out of Core (#14585)
Co-authored-by: Andrew Lamb <[email protected]>
logan-keede and alamb authored Feb 11, 2025
1 parent c0e78d2 commit 0af3169
Showing 9 changed files with 150 additions and 127 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock


4 changes: 3 additions & 1 deletion datafusion/catalog-listing/Cargo.toml
@@ -40,11 +40,13 @@ async-compression = { version = "0.4.0", features = [
"zstd",
"tokio",
], optional = true }
async-trait = { workspace = true }
bytes = { workspace = true }
bzip2 = { version = "0.5.0", optional = true }
chrono = { workspace = true }
datafusion-catalog = { workspace = true }
datafusion-common = { workspace = true, features = ["object_store"] }
datafusion-common-runtime = { workspace = true }
datafusion-execution = { workspace = true }
datafusion-expr = { workspace = true }
datafusion-physical-expr = { workspace = true }
@@ -56,14 +58,14 @@ glob = "0.3.0"
itertools = { workspace = true }
log = { workspace = true }
object_store = { workspace = true }
rand = { workspace = true }
tokio = { workspace = true }
tokio-util = { version = "0.7.4", features = ["io"], optional = true }
url = { workspace = true }
xz2 = { version = "0.1", optional = true, features = ["static"] }
zstd = { version = "0.13", optional = true, default-features = false }

[dev-dependencies]
async-trait = { workspace = true }
tempfile = { workspace = true }

[lints]
114 changes: 114 additions & 0 deletions datafusion/catalog-listing/src/file_sink_config.rs
@@ -0,0 +1,114 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::write::demux::{start_demuxer_task, DemuxedStreamReceiver};
use crate::{ListingTableUrl, PartitionedFile};
use arrow::datatypes::{DataType, SchemaRef};
use async_trait::async_trait;
use datafusion_common::Result;
use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::dml::InsertOp;
use datafusion_physical_plan::insert::DataSink;
use object_store::ObjectStore;
use std::sync::Arc;

/// General behaviors for file sinks that perform `DataSink` operations
#[async_trait]
pub trait FileSink: DataSink {
/// Retrieves the file sink configuration.
fn config(&self) -> &FileSinkConfig;

/// Spawns writer tasks and joins them to perform file writing operations.
/// This is a critical part of the `FileSink` trait, since it is the very last step of `write_all`.
///
/// This function handles the process of writing data to files by:
/// 1. Spawning tasks for writing data to individual files.
/// 2. Coordinating the tasks using a demuxer to distribute data among files.
/// 3. Collecting results using `tokio::join`, ensuring that all tasks complete successfully.
///
/// # Parameters
/// - `context`: The execution context (`TaskContext`) that provides resources
/// like memory management and runtime environment.
/// - `demux_task`: A spawned task that handles demuxing, responsible for splitting
/// an input [`SendableRecordBatchStream`] into dynamically determined partitions.
/// See `start_demuxer_task()`
/// - `file_stream_rx`: A receiver that yields streams of record batches and their
/// corresponding file paths for writing. See `start_demuxer_task()`
/// - `object_store`: A handle to the object store where the files are written.
///
/// # Returns
/// - `Result<u64>`: Returns the total number of rows written across all files.
async fn spawn_writer_tasks_and_join(
&self,
context: &Arc<TaskContext>,
demux_task: SpawnedTask<Result<()>>,
file_stream_rx: DemuxedStreamReceiver,
object_store: Arc<dyn ObjectStore>,
) -> Result<u64>;

/// File sink implementation of the [`DataSink::write_all`] method.
async fn write_all(
&self,
data: SendableRecordBatchStream,
context: &Arc<TaskContext>,
) -> Result<u64> {
let config = self.config();
let object_store = context
.runtime_env()
.object_store(&config.object_store_url)?;
let (demux_task, file_stream_rx) = start_demuxer_task(config, data, context);
self.spawn_writer_tasks_and_join(
context,
demux_task,
file_stream_rx,
object_store,
)
.await
}
}

/// The base configurations to provide when creating a physical plan for
/// writing to any given file format.
pub struct FileSinkConfig {
/// Object store URL, used to get an ObjectStore instance
pub object_store_url: ObjectStoreUrl,
/// A vector of [`PartitionedFile`] structs, each representing a file partition
pub file_groups: Vec<PartitionedFile>,
/// Vector of partition paths
pub table_paths: Vec<ListingTableUrl>,
/// The schema of the output file
pub output_schema: SchemaRef,
/// A vector of column names and their corresponding data types,
/// representing the partitioning columns for the file
pub table_partition_cols: Vec<(String, DataType)>,
/// Controls how new data should be written to the file, determining whether
/// to append to, overwrite, or replace records in existing files.
pub insert_op: InsertOp,
/// Controls whether partition columns are kept for the file
pub keep_partition_by_columns: bool,
/// File extension without a dot (.)
pub file_extension: String,
}

impl FileSinkConfig {
/// Get output schema
pub fn output_schema(&self) -> &SchemaRef {
&self.output_schema
}
}
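As context for the new struct, here is a minimal sketch of building a FileSinkConfig by hand after this move. The module path datafusion_catalog_listing::file_sink_config, the file:///tmp/output/ table path, and the column names are illustrative assumptions, not part of this commit; in practice the planner assembles this config when executing an INSERT.

use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_catalog_listing::file_sink_config::FileSinkConfig;
use datafusion_catalog_listing::ListingTableUrl;
use datafusion_common::Result;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_expr::dml::InsertOp;

// Hypothetical helper: a config that appends JSON files under
// file:///tmp/output/, hive-partitioned by "region".
fn example_sink_config() -> Result<FileSinkConfig> {
    let output_schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("region", DataType::Utf8, false),
    ]));
    Ok(FileSinkConfig {
        object_store_url: ObjectStoreUrl::local_filesystem(),
        // No existing files need to be tracked for a plain append.
        file_groups: vec![],
        table_paths: vec![ListingTableUrl::parse("file:///tmp/output/")?],
        output_schema,
        table_partition_cols: vec![("region".to_string(), DataType::Utf8)],
        insert_op: InsertOp::Append,
        // Partition values are encoded in directory names, so the column
        // itself is dropped from the written files.
        keep_partition_by_columns: false,
        file_extension: "json".to_string(),
    })
}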
2 changes: 2 additions & 0 deletions datafusion/catalog-listing/src/mod.rs
@@ -20,8 +20,10 @@
pub mod file_compression_type;
pub mod file_groups;
pub mod file_sink_config;
pub mod helpers;
pub mod url;
pub mod write;
use chrono::TimeZone;
use datafusion_common::Result;
use datafusion_common::{ScalarValue, Statistics};
@@ -22,10 +22,10 @@ use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::Arc;

use crate::datasource::listing::ListingTableUrl;
use crate::datasource::physical_plan::FileSinkConfig;
use crate::error::Result;
use crate::physical_plan::SendableRecordBatchStream;
use crate::url::ListingTableUrl;
use crate::write::FileSinkConfig;
use datafusion_common::error::Result;
use datafusion_physical_plan::SendableRecordBatchStream;

use arrow::array::{
builder::UInt64Builder, cast::AsArray, downcast_dictionary_array, RecordBatch,
@@ -21,9 +21,9 @@
use std::io::Write;
use std::sync::Arc;

use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::physical_plan::FileSinkConfig;
use crate::error::Result;
use crate::file_compression_type::FileCompressionType;
use crate::file_sink_config::FileSinkConfig;
use datafusion_common::error::Result;

use arrow::array::RecordBatch;
use arrow::datatypes::Schema;
@@ -33,18 +33,18 @@ use object_store::path::Path;
use object_store::ObjectStore;
use tokio::io::AsyncWrite;

pub(crate) mod demux;
pub(crate) mod orchestration;
pub mod demux;
pub mod orchestration;

/// A buffer with interior mutability shared by the SerializedFileWriter and
/// ObjectStore writer
#[derive(Clone)]
pub(crate) struct SharedBuffer {
pub struct SharedBuffer {
/// The inner buffer for reading and writing
///
/// The lock is used to obtain interior mutability, so there is no need to
/// worry about lock contention.
pub(crate) buffer: Arc<futures::lock::Mutex<Vec<u8>>>,
pub buffer: Arc<futures::lock::Mutex<Vec<u8>>>,
}

impl SharedBuffer {
@@ -79,7 +79,7 @@ pub trait BatchSerializer: Sync + Send {
/// with the specified compression.
/// We drop the `AbortableWrite` struct and the writer will not try to clean up on failure.
/// Users can configure automatic cleanup with their cloud provider.
pub(crate) async fn create_writer(
pub async fn create_writer(
file_compression_type: FileCompressionType,
location: &Path,
object_store: Arc<dyn ObjectStore>,
@@ -91,7 +91,7 @@ pub(crate) async fn create_writer(
/// Converts table schema to writer schema, which may differ in the case
/// of hive style partitioning where some columns are removed from the
/// underlying files.
pub(crate) fn get_writer_schema(config: &FileSinkConfig) -> Arc<Schema> {
pub fn get_writer_schema(config: &FileSinkConfig) -> Arc<Schema> {
if !config.table_partition_cols.is_empty() && !config.keep_partition_by_columns {
let schema = config.output_schema();
let partition_names: Vec<_> =
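Continuing the hedged sketch above, a short illustration of what the now-public get_writer_schema does when keep_partition_by_columns is false: the hive-partition column is stripped from the schema actually written to the files. The module paths are assumptions inferred from this diff.

use datafusion_catalog_listing::file_sink_config::FileSinkConfig;
use datafusion_catalog_listing::write::get_writer_schema;

// Assumes a config like the earlier sketch: output schema [id, region],
// partitioned by "region", keep_partition_by_columns = false.
fn writer_schema_demo(config: &FileSinkConfig) {
    let writer_schema = get_writer_schema(config);
    // "region" ends up only in the directory names, not in the files.
    assert!(writer_schema.field_with_name("region").is_err());
    assert!(writer_schema.field_with_name("id").is_ok());
}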
@@ -23,8 +23,8 @@ use std::sync::Arc;

use super::demux::DemuxedStreamReceiver;
use super::{create_writer, BatchSerializer};
use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::error::Result;
use crate::file_compression_type::FileCompressionType;
use datafusion_common::error::Result;

use arrow::array::RecordBatch;
use datafusion_common::{internal_datafusion_err, internal_err, DataFusionError};
@@ -237,7 +237,7 @@ pub(crate) async fn stateless_serialize_and_write_files(
/// Orchestrates multipart put of a dynamic number of output files from a single input stream
/// for any statelessly serialized file type. That is, any file type for which each [RecordBatch]
/// can be serialized independently of all other [RecordBatch]s.
pub(crate) async fn spawn_writer_tasks_and_join(
pub async fn spawn_writer_tasks_and_join(
context: &Arc<TaskContext>,
serializer: Arc<dyn BatchSerializer>,
compression: FileCompressionType,
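To show why this helper is being made public, here is a hedged sketch of how a format-specific FileSink::spawn_writer_tasks_and_join might delegate to it. The free function name, the serializer argument, and the parameter order beyond what is visible in this hunk are assumptions modeled on the trait method above, not something this commit defines.

use std::sync::Arc;

use datafusion_catalog_listing::file_compression_type::FileCompressionType;
use datafusion_catalog_listing::write::demux::DemuxedStreamReceiver;
use datafusion_catalog_listing::write::{orchestration, BatchSerializer};
use datafusion_common::Result;
use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::TaskContext;
use object_store::ObjectStore;

// Hypothetical delegation: serialize every demuxed partition with the
// format's BatchSerializer and write uncompressed output.
async fn my_spawn_writer_tasks_and_join(
    serializer: Arc<dyn BatchSerializer>,
    context: &Arc<TaskContext>,
    demux_task: SpawnedTask<Result<()>>,
    file_stream_rx: DemuxedStreamReceiver,
    object_store: Arc<dyn ObjectStore>,
) -> Result<u64> {
    orchestration::spawn_writer_tasks_and_join(
        context,
        serializer,
        FileCompressionType::UNCOMPRESSED,
        object_store,
        demux_task,
        file_stream_rx,
    )
    .await
}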
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/file_format/mod.rs
@@ -28,8 +28,8 @@ pub mod json;
pub mod options;
#[cfg(feature = "parquet")]
pub mod parquet;
pub mod write;
pub use datafusion_catalog_listing::file_compression_type;
pub use datafusion_catalog_listing::write;

use std::any::Any;
use std::collections::{HashMap, VecDeque};
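A brief note on what the swapped re-export preserves, with a hedged example; the downstream paths shown are assumptions. Code that imported the write module through the core datafusion crate keeps compiling, and items made pub in this commit become reachable the same way.

// Pre-existing public path, now backed by datafusion-catalog-listing:
use datafusion::datasource::file_format::write::BatchSerializer;
// Newly reachable because SharedBuffer is made pub in this commit:
use datafusion::datasource::file_format::write::SharedBuffer;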