diff --git a/crates/core/src/operations/write/async_utils.rs b/crates/core/src/operations/write/async_utils.rs
new file mode 100644
index 0000000000..9fac4b062a
--- /dev/null
+++ b/crates/core/src/operations/write/async_utils.rs
@@ -0,0 +1,88 @@
+//! Async shareable buffer for the async writer
+//!
+
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use tokio::io::AsyncWrite;
+use tokio::sync::RwLock as TokioRwLock;
+
+/// An in-memory buffer that allows for shared ownership and interior mutability.
+/// The underlying buffer is wrapped in an `Arc` and `RwLock`, so cloning the instance
+/// allows multiple owners to have access to the same underlying buffer.
+#[derive(Debug, Default, Clone)]
+pub struct AsyncShareableBuffer {
+    buffer: Arc<TokioRwLock<Vec<u8>>>,
+}
+
+impl AsyncShareableBuffer {
+    /// Consumes this instance and returns the underlying buffer.
+    /// Returns `None` if there are other references to the instance.
+    pub async fn into_inner(self) -> Option<Vec<u8>> {
+        Arc::try_unwrap(self.buffer)
+            .ok()
+            .map(|lock| lock.into_inner())
+    }
+
+    /// Returns a clone of the underlying buffer as a `Vec`.
+    pub async fn to_vec(&self) -> Vec<u8> {
+        let inner = self.buffer.read().await;
+        inner.clone()
+    }
+
+    /// Returns the number of bytes in the underlying buffer.
+    pub async fn len(&self) -> usize {
+        let inner = self.buffer.read().await;
+        inner.len()
+    }
+
+    /// Returns `true` if the underlying buffer is empty.
+    pub async fn is_empty(&self) -> bool {
+        let inner = self.buffer.read().await;
+        inner.is_empty()
+    }
+
+    /// Creates a new instance with the buffer initialized from the provided bytes.
+    pub fn from_bytes(bytes: &[u8]) -> Self {
+        Self {
+            buffer: Arc::new(TokioRwLock::new(bytes.to_vec())),
+        }
+    }
+}
+
+impl AsyncWrite for AsyncShareableBuffer {
+    /// Appends `buf` to the shared buffer and reports every byte as written.
+    ///
+    /// The lock is taken with `try_write` rather than by polling a temporary
+    /// `write()` future: that future would be dropped when this call returns, so
+    /// a `Pending` result from it could leave the task without a wake-up. When
+    /// the lock is busy the waker is signalled immediately and the caller polls
+    /// again.
+    fn poll_write(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        buf: &[u8],
+    ) -> Poll<std::io::Result<usize>> {
+        match self.buffer.try_write() {
+            Ok(mut buffer) => {
+                buffer.extend_from_slice(buf);
+                Poll::Ready(Ok(buf.len()))
+            }
+            Err(_) => {
+                cx.waker().wake_by_ref();
+                Poll::Pending
+            }
+        }
+    }
+
+    /// Always ready: writes go straight into the in-memory buffer.
+    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
+        Poll::Ready(Ok(()))
+    }
+
+    /// Always ready: there is nothing to release on shutdown.
+    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
+        Poll::Ready(Ok(()))
+    }
+}
diff --git a/crates/core/src/operations/write/mod.rs b/crates/core/src/operations/write/mod.rs
index a07053cb47..7277411109 100644
--- a/crates/core/src/operations/write/mod.rs
+++ b/crates/core/src/operations/write/mod.rs
@@ -23,6 +23,7 @@
 //! let table = ops.write(vec![batch]).await?;
 //! ````

+pub(crate) mod async_utils;
 pub mod configs;
 pub(crate) mod execution;
 pub(crate) mod generated_columns;
diff --git a/crates/core/src/operations/write/writer.rs b/crates/core/src/operations/write/writer.rs
index 9b63a6ef8e..9d901670c6 100644
--- a/crates/core/src/operations/write/writer.rs
+++ b/crates/core/src/operations/write/writer.rs
@@ -1,6 +1,7 @@
 //!
Abstractions and implementations for writing data to delta tables

 use std::collections::HashMap;
+use std::sync::OnceLock;

 use arrow_array::RecordBatch;
 use arrow_schema::{ArrowError, SchemaRef as ArrowSchemaRef};
@@ -9,11 +10,13 @@ use delta_kernel::expressions::Scalar;
 use futures::{StreamExt, TryStreamExt};
 use indexmap::IndexMap;
 use object_store::{path::Path, ObjectStore};
-use parquet::arrow::ArrowWriter;
+use parquet::arrow::AsyncArrowWriter;
 use parquet::basic::Compression;
 use parquet::file::properties::WriterProperties;
+use tokio::task::JoinSet;
 use tracing::debug;

+use super::async_utils::AsyncShareableBuffer;
 use crate::crate_version;
 use crate::errors::{DeltaResult, DeltaTableError};
 use crate::kernel::{Add, PartitionsExt};
@@ -22,12 +25,41 @@ use crate::writer::record_batch::{divide_by_partition_values, PartitionResult};
 use crate::writer::stats::create_add;
 use crate::writer::utils::{
     arrow_schema_without_partitions, next_data_path, record_batch_without_partitions,
-    ShareableBuffer,
 };

 // TODO databricks often suggests a file size of 100mb, should we set this default?
 const DEFAULT_TARGET_FILE_SIZE: usize = 104_857_600;
 const DEFAULT_WRITE_BATCH_SIZE: usize = 1024;
+const DEFAULT_UPLOAD_PART_SIZE: usize = 1024 * 1024 * 5;
+const MAX_UPLOAD_PART_SIZE: usize = 1024 * 1024 * 1024 * 5;
+
+/// Returns the part size, in bytes, used for multipart uploads.
+///
+/// The value comes from the `DELTARS_UPLOAD_PART_SIZE` environment variable and is
+/// computed once, then cached. Unset or unparsable values yield the 5MB default;
+/// values below 5MB are replaced by 5MB and values above 5GB are capped at 5GB.
+fn upload_part_size() -> usize {
+    static UPLOAD_SIZE: OnceLock<usize> = OnceLock::new();
+    *UPLOAD_SIZE.get_or_init(|| {
+        std::env::var("DELTARS_UPLOAD_PART_SIZE")
+            .ok()
+            .and_then(|s| s.parse::<usize>().ok())
+            .map(|size| {
+                if size < DEFAULT_UPLOAD_PART_SIZE {
+                    // Minimum part size in GCS and S3
+                    debug!("DELTARS_UPLOAD_PART_SIZE must be at least 5MB, therefore falling back on default of 5MB.");
+                    DEFAULT_UPLOAD_PART_SIZE
+                } else if size > MAX_UPLOAD_PART_SIZE {
+                    // Maximum part size in GCS and S3
+                    debug!("DELTARS_UPLOAD_PART_SIZE must not be higher than 5GB, therefore capping it at 5GB.");
+                    MAX_UPLOAD_PART_SIZE
+                } else {
+                    size
+                }
+            })
+            .unwrap_or(DEFAULT_UPLOAD_PART_SIZE)
+    })
+}

 #[derive(thiserror::Error, Debug)]
 enum WriteError {
@@ -122,7 +154,6 @@ impl WriterConfig {
     }
 }

-#[derive(Debug)]
 /// A parquet writer implementation tailored to the needs of writing data to a delta table.
 pub struct DeltaWriter {
     /// An object store pointing at Delta table root
@@ -286,13 +317,12 @@ impl PartitionWriterConfig {
 /// This writer takes in table data as RecordBatches and writes it out to partitioned parquet files.
 /// It buffers data in memory until it reaches a certain size, then writes it out to optimize file sizes.
 /// When you complete writing you get back a list of Add actions that can be used to update the Delta table commit log.
-#[derive(Debug)]
 pub struct PartitionWriter {
     object_store: ObjectStoreRef,
     writer_id: uuid::Uuid,
     config: PartitionWriterConfig,
-    buffer: ShareableBuffer,
-    arrow_writer: ArrowWriter<ShareableBuffer>,
+    buffer: AsyncShareableBuffer,
+    arrow_writer: AsyncArrowWriter<AsyncShareableBuffer>,
     part_counter: usize,
     files_written: Vec<Add>,
     /// Num index cols to collect stats for
@@ -309,8 +333,8 @@ impl PartitionWriter {
         num_indexed_cols: i32,
         stats_columns: Option<Vec<String>>,
     ) -> DeltaResult<Self> {
-        let buffer = ShareableBuffer::default();
-        let arrow_writer = ArrowWriter::try_new(
+        let buffer = AsyncShareableBuffer::default();
+        let arrow_writer = AsyncArrowWriter::try_new(
             buffer.clone(),
             config.file_schema.clone(),
             Some(config.writer_properties.clone()),
@@ -340,9 +364,11 @@ impl PartitionWriter {
         )
     }

-    fn reset_writer(&mut self) -> DeltaResult<(ArrowWriter<ShareableBuffer>, ShareableBuffer)> {
-        let new_buffer = ShareableBuffer::default();
-        let arrow_writer = ArrowWriter::try_new(
+    fn reset_writer(
+        &mut self,
+    ) -> DeltaResult<(AsyncArrowWriter<AsyncShareableBuffer>, AsyncShareableBuffer)> {
+        let new_buffer = AsyncShareableBuffer::default();
+        let arrow_writer = AsyncArrowWriter::try_new(
             new_buffer.clone(),
             self.config.file_schema.clone(),
             Some(self.config.writer_properties.clone()),
@@ -353,20 +379,21 @@ impl PartitionWriter {
         ))
     }

-    fn write_batch(&mut self, batch: &RecordBatch) -> DeltaResult<()> {
-        Ok(self.arrow_writer.write(batch)?)
+    /// Writes one record batch through the async arrow writer.
+    async fn write_batch(&mut self, batch: &RecordBatch) -> DeltaResult<()> {
+        Ok(self.arrow_writer.write(batch).await?)
     }

     async fn flush_arrow_writer(&mut self) -> DeltaResult<()> {
         // replace counter / buffers and close the current writer
         let (writer, buffer) = self.reset_writer()?;
-        let metadata = writer.close()?;
+        let metadata = writer.close().await?;
         // don't write empty file
         if metadata.num_rows == 0 {
             return Ok(());
         }

-        let buffer = match buffer.into_inner() {
+        let mut buffer = match buffer.into_inner().await {
             Some(buffer) => Bytes::from(buffer),
             None => return Ok(()), // Nothing to write
         };
@@ -376,7 +403,36 @@ impl PartitionWriter {
         let file_size = buffer.len() as i64;

         // write file to object store
-        self.object_store.put(&path, buffer.into()).await?;
+        let mut multi_part_upload = self.object_store.put_multipart(&path).await?;
+        let part_size = upload_part_size();
+        let mut tasks = JoinSet::new();
+        let max_concurrent_tasks = 10; // TODO: make configurable
+
+        while buffer.len() > part_size {
+            let part = buffer.split_to(part_size);
+            let upload_future = multi_part_upload.put_part(part.into());
+
+            // wait until one spot frees up before spawning new task; a failed
+            // part must surface here instead of being silently discarded
+            if tasks.len() >= max_concurrent_tasks {
+                if let Some(result) = tasks.join_next().await {
+                    result.map_err(|e| DeltaTableError::generic(e.to_string()))??;
+                }
+            }
+            tasks.spawn(upload_future);
+        }
+
+        if !buffer.is_empty() {
+            let upload_future = multi_part_upload.put_part(buffer.into());
+            tasks.spawn(upload_future);
+        }
+
+        // wait for all remaining tasks to complete
+        while let Some(result) = tasks.join_next().await {
+            result.map_err(|e| DeltaTableError::generic(e.to_string()))??;
+        }
+
+        multi_part_upload.complete().await?;

         self.files_written.push(
             create_add(
@@ -412,9 +468,9 @@ impl PartitionWriter {
         let max_offset = batch.num_rows();
         for offset in (0..max_offset).step_by(self.config.write_batch_size) {
             let length = usize::min(self.config.write_batch_size, max_offset - offset);
-            self.write_batch(&batch.slice(offset, length))?;
+            self.write_batch(&batch.slice(offset, length)).await?;
             // flush currently buffered data to disk once we meet or exceed the target file size.
-            let estimated_size = self.buffer.len() + self.arrow_writer.in_progress_size();
+            let estimated_size = self.buffer.len().await + self.arrow_writer.in_progress_size();
             if estimated_size >= self.config.target_file_size {
                 debug!(
                     "Writing file with estimated size {:?} to disk.",