refactor: async writer + multi-part #3255

Status: Open. Wants to merge 3 commits into base branch `main`.
1 change: 0 additions & 1 deletion crates/core/src/operations/mod.rs
@@ -66,7 +66,6 @@ pub mod set_tbl_properties;
pub mod update;
#[cfg(feature = "datafusion")]
pub mod write;
-pub mod writer;

#[async_trait]
pub trait CustomExecuteHandler: Send + Sync {
2 changes: 1 addition & 1 deletion crates/core/src/operations/optimize.rs
@@ -43,7 +43,7 @@ use tracing::*;
use uuid::Uuid;

use super::transaction::PROTOCOL;
-use super::writer::{PartitionWriter, PartitionWriterConfig};
+use super::write::writer::{PartitionWriter, PartitionWriterConfig};
use super::{CustomExecuteHandler, Operation};
use crate::errors::{DeltaResult, DeltaTableError};
use crate::kernel::{scalars::ScalarExt, Action, PartitionsExt, Remove};
82 changes: 82 additions & 0 deletions crates/core/src/operations/write/async_utils.rs
@@ -0,0 +1,82 @@
//! Async Shareable Buffer for async writer
//!
use std::sync::Arc;

use futures::TryFuture;

use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::AsyncWrite;
use tokio::sync::RwLock as TokioRwLock;

/// An in-memory buffer that allows for shared ownership and interior mutability.
/// The underlying buffer is wrapped in an `Arc` and `RwLock`, so cloning the instance
/// allows multiple owners to have access to the same underlying buffer.
#[derive(Debug, Default, Clone)]
pub struct AsyncShareableBuffer {
    buffer: Arc<TokioRwLock<Vec<u8>>>,
}

impl AsyncShareableBuffer {
    /// Consumes this instance and returns the underlying buffer.
    /// Returns `None` if there are other references to the instance.
    pub async fn into_inner(self) -> Option<Vec<u8>> {
        Arc::try_unwrap(self.buffer)
            .ok()
            .map(|lock| lock.into_inner())
    }

    /// Returns a clone of the underlying buffer as a `Vec`.
    pub async fn to_vec(&self) -> Vec<u8> {
        let inner = self.buffer.read().await;
        inner.clone()
    }

    /// Returns the number of bytes in the underlying buffer.
    pub async fn len(&self) -> usize {
        let inner = self.buffer.read().await;
        inner.len()
    }

    /// Returns `true` if the underlying buffer is empty.
    pub async fn is_empty(&self) -> bool {
        let inner = self.buffer.read().await;
        inner.is_empty()
    }

    /// Creates a new instance with the buffer initialized from the provided bytes.
    pub fn from_bytes(bytes: &[u8]) -> Self {
        Self {
            buffer: Arc::new(TokioRwLock::new(bytes.to_vec())),
        }
    }
}

impl AsyncWrite for AsyncShareableBuffer {
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<std::io::Result<usize>> {
        let this = self.clone();
        let buf = buf.to_vec();

        let fut = async move {
            let mut buffer = this.buffer.write().await;
            buffer.extend_from_slice(&buf);
            Ok(buf.len())
        };

        tokio::pin!(fut);
        fut.try_poll(cx)
    }

    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        Poll::Ready(Ok(()))
    }

    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        Poll::Ready(Ok(()))
    }
}
2 changes: 1 addition & 1 deletion crates/core/src/operations/write/execution.rs
@@ -13,14 +13,14 @@ use parquet::file::properties::WriterProperties;
use tracing::log::*;
use uuid::Uuid;

+use super::writer::{DeltaWriter, WriterConfig};
use crate::delta_datafusion::expr::fmt_expr_to_sql;
use crate::delta_datafusion::{find_files, DeltaScanConfigBuilder, DeltaTableProvider};
use crate::delta_datafusion::{DataFusionMixins, DeltaDataChecker};
use crate::errors::DeltaResult;
use crate::kernel::{Action, Add, AddCDCFile, Remove, StructType, StructTypeExt};
use crate::logstore::LogStoreRef;
use crate::operations::cdc::should_write_cdc;
-use crate::operations::writer::{DeltaWriter, WriterConfig};
use crate::storage::ObjectStoreRef;
use crate::table::state::DeltaTableState;
use crate::table::Constraint as DeltaConstraint;
2 changes: 2 additions & 0 deletions crates/core/src/operations/write/mod.rs
@@ -23,11 +23,13 @@
//! let table = ops.write(vec![batch]).await?;
//! ````

+pub(crate) mod async_utils;
pub mod configs;
pub(crate) mod execution;
pub(crate) mod generated_columns;
pub(crate) mod metrics;
pub(crate) mod schema_evolution;
+pub mod writer;

use arrow_schema::Schema;
pub use configs::WriterStatsConfig;
crates/core/src/operations/writer.rs renamed to crates/core/src/operations/write/writer.rs
@@ -1,6 +1,7 @@
//! Abstractions and implementations for writing data to delta tables
use std::collections::HashMap;
+use std::sync::OnceLock;

use arrow_array::RecordBatch;
use arrow_schema::{ArrowError, SchemaRef as ArrowSchemaRef};
@@ -9,11 +10,13 @@ use delta_kernel::expressions::Scalar;
use futures::{StreamExt, TryStreamExt};
use indexmap::IndexMap;
use object_store::{path::Path, ObjectStore};
-use parquet::arrow::ArrowWriter;
+use parquet::arrow::AsyncArrowWriter;
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;
+use tokio::task::JoinSet;
use tracing::debug;

+use super::async_utils::AsyncShareableBuffer;
use crate::crate_version;
use crate::errors::{DeltaResult, DeltaTableError};
use crate::kernel::{Add, PartitionsExt};
@@ -22,12 +25,35 @@ use crate::writer::record_batch::{divide_by_partition_values, PartitionResult};
use crate::writer::stats::create_add;
use crate::writer::utils::{
arrow_schema_without_partitions, next_data_path, record_batch_without_partitions,
-    ShareableBuffer,
};

// TODO databricks often suggests a file size of 100mb, should we set this default?
const DEFAULT_TARGET_FILE_SIZE: usize = 104_857_600;
const DEFAULT_WRITE_BATCH_SIZE: usize = 1024;
+const DEFAULT_UPLOAD_PART_SIZE: usize = 1024 * 1024 * 5;

+fn upload_part_size() -> usize {
+    static UPLOAD_SIZE: OnceLock<usize> = OnceLock::new();
+    *UPLOAD_SIZE.get_or_init(|| {
+        std::env::var("DELTARS_UPLOAD_PART_SIZE")
+            .ok()
+            .and_then(|s| s.parse::<usize>().ok())
+            .map(|size| {
+                if size < DEFAULT_UPLOAD_PART_SIZE {
+                    // Minimum part size in GCS and S3
+                    debug!("DELTARS_UPLOAD_PART_SIZE must be at least 5MB, therefore falling back on default of 5MB.");
+                    DEFAULT_UPLOAD_PART_SIZE
+                } else if size > 1024 * 1024 * 1024 * 5 {
+                    // Maximum part size in GCS and S3
+                    debug!("DELTARS_UPLOAD_PART_SIZE must not be higher than 5GB, therefore capping it at 5GB.");
+                    1024 * 1024 * 1024 * 5
+                } else {
+                    size
+                }
+            })
+            .unwrap_or(DEFAULT_UPLOAD_PART_SIZE)
+    })
+}

#[derive(thiserror::Error, Debug)]
enum WriteError {
@@ -122,7 +148,6 @@ impl WriterConfig {
}
}

-#[derive(Debug)]
/// A parquet writer implementation tailored to the needs of writing data to a delta table.
pub struct DeltaWriter {
/// An object store pointing at Delta table root
@@ -286,13 +311,12 @@ impl PartitionWriterConfig {
/// This writer takes in table data as RecordBatches and writes it out to partitioned parquet files.
/// It buffers data in memory until it reaches a certain size, then writes it out to optimize file sizes.
/// When you complete writing you get back a list of Add actions that can be used to update the Delta table commit log.
-#[derive(Debug)]
pub struct PartitionWriter {
object_store: ObjectStoreRef,
writer_id: uuid::Uuid,
config: PartitionWriterConfig,
-    buffer: ShareableBuffer,
-    arrow_writer: ArrowWriter<ShareableBuffer>,
+    buffer: AsyncShareableBuffer,
+    arrow_writer: AsyncArrowWriter<AsyncShareableBuffer>,
part_counter: usize,
files_written: Vec<Add>,
/// Num index cols to collect stats for
@@ -309,8 +333,8 @@ impl PartitionWriter {
num_indexed_cols: i32,
stats_columns: Option<Vec<String>>,
) -> DeltaResult<Self> {
-        let buffer = ShareableBuffer::default();
-        let arrow_writer = ArrowWriter::try_new(
+        let buffer = AsyncShareableBuffer::default();
+        let arrow_writer = AsyncArrowWriter::try_new(
buffer.clone(),
config.file_schema.clone(),
Some(config.writer_properties.clone()),
@@ -340,9 +364,11 @@
)
}

-    fn reset_writer(&mut self) -> DeltaResult<(ArrowWriter<ShareableBuffer>, ShareableBuffer)> {
-        let new_buffer = ShareableBuffer::default();
-        let arrow_writer = ArrowWriter::try_new(
+    fn reset_writer(
+        &mut self,
+    ) -> DeltaResult<(AsyncArrowWriter<AsyncShareableBuffer>, AsyncShareableBuffer)> {
+        let new_buffer = AsyncShareableBuffer::default();
+        let arrow_writer = AsyncArrowWriter::try_new(
new_buffer.clone(),
self.config.file_schema.clone(),
Some(self.config.writer_properties.clone()),
@@ -353,20 +379,20 @@
))
}

-    fn write_batch(&mut self, batch: &RecordBatch) -> DeltaResult<()> {
-        Ok(self.arrow_writer.write(batch)?)
+    async fn write_batch(&mut self, batch: &RecordBatch) -> DeltaResult<()> {
+        Ok(self.arrow_writer.write(batch).await?)
}

async fn flush_arrow_writer(&mut self) -> DeltaResult<()> {
// replace counter / buffers and close the current writer
let (writer, buffer) = self.reset_writer()?;
-        let metadata = writer.close()?;
+        let metadata = writer.close().await?;
// don't write empty file
if metadata.num_rows == 0 {
return Ok(());
}

-        let buffer = match buffer.into_inner() {
+        let mut buffer = match buffer.into_inner().await {
Some(buffer) => Bytes::from(buffer),
None => return Ok(()), // Nothing to write
};
@@ -376,7 +402,33 @@
let file_size = buffer.len() as i64;

// write file to object store
-        self.object_store.put(&path, buffer.into()).await?;
+        let mut multi_part_upload = self.object_store.put_multipart(&path).await?;
+        let part_size = upload_part_size();
+        let mut tasks = JoinSet::new();
+        let max_concurrent_tasks = 10; // TODO: make configurable
[Review comment from a Member, on lines -379 to +408]
This is really such a major behavior change. I am not terribly familiar with the maturity level of multipart uploads in object_store I don't think this is necessarily a bad change, but I am doubtful of this addressing the originally linked issue.

As best as I can tell the buffers are still going to fill up memory until the flush, and then the flush is going to fan out to have parallel uploads

```mermaid
flowchart LR
    write --> buffer_batch;
    write --> buffer_batch;
    write --> buffer_batch;
    buffer_batch --> flush;
    flush --> p1;
    flush --> p2;
    flush --> p3;
    flush --> p4;
    p1 --> close;
    p2 --> close;
    p3 --> close;
    p4 --> close;
```

[Reply from @ion-elgreco (Collaborator, Author), Feb 25, 2025]
Good point, in its current form we indeed still buffer longer until the flush, but the buffering and writing has more parallelism now, so it should be faster.

We could do two things here, by the way:

  • release this as is for people to experiment with (maybe after 1.0)
  • iterate on this to also flush once the minimum part size is available, and create some benchmark tests


+        while buffer.len() > part_size {
+            let part = buffer.split_to(part_size);
+            let upload_future = multi_part_upload.put_part(part.into());
+
+            // wait until one spot frees up before spawning new task
+            if tasks.len() >= max_concurrent_tasks {
+                tasks.join_next().await;
+            }
+            tasks.spawn(upload_future);
+        }
+
+        if !buffer.is_empty() {
+            let upload_future = multi_part_upload.put_part(buffer.into());
+            tasks.spawn(upload_future);
+        }
+
+        // wait for all remaining tasks to complete
+        while let Some(result) = tasks.join_next().await {
+            result.map_err(|e| DeltaTableError::generic(e.to_string()))??;
+        }
+
+        multi_part_upload.complete().await?;

self.files_written.push(
create_add(
@@ -412,9 +464,9 @@
let max_offset = batch.num_rows();
for offset in (0..max_offset).step_by(self.config.write_batch_size) {
let length = usize::min(self.config.write_batch_size, max_offset - offset);
-            self.write_batch(&batch.slice(offset, length))?;
+            self.write_batch(&batch.slice(offset, length)).await?;
// flush currently buffered data to disk once we meet or exceed the target file size.
-            let estimated_size = self.buffer.len() + self.arrow_writer.in_progress_size();
+            let estimated_size = self.buffer.len().await + self.arrow_writer.in_progress_size();
if estimated_size >= self.config.target_file_size {
debug!(
"Writing file with estimated size {:?} to disk.",