refactor: async writer + multipart
Signed-off-by: Ion Koutsouris <[email protected]>
ion-elgreco committed Feb 23, 2025
1 parent a5eb11a commit 2353a21
Showing 3 changed files with 153 additions and 18 deletions.
82 changes: 82 additions & 0 deletions crates/core/src/operations/write/async_utils.rs
@@ -0,0 +1,82 @@
//! Async Shareable Buffer for the async writer
//!
use std::sync::Arc;

use futures::TryFuture;

use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::AsyncWrite;
use tokio::sync::RwLock as TokioRwLock;

/// An in-memory buffer that allows for shared ownership and interior mutability.
/// The underlying buffer is wrapped in an `Arc` and `RwLock`, so cloning the instance
/// allows multiple owners to have access to the same underlying buffer.
#[derive(Debug, Default, Clone)]
pub struct AsyncShareableBuffer {
    buffer: Arc<TokioRwLock<Vec<u8>>>,
}

impl AsyncShareableBuffer {
    /// Consumes this instance and returns the underlying buffer.
    /// Returns `None` if there are other references to the instance.
    pub async fn into_inner(self) -> Option<Vec<u8>> {
        Arc::try_unwrap(self.buffer)
            .ok()
            .map(|lock| lock.into_inner())
    }

    /// Returns a clone of the underlying buffer as a `Vec`.
    pub async fn to_vec(&self) -> Vec<u8> {
        let inner = self.buffer.read().await;
        inner.clone()
    }

    /// Returns the number of bytes in the underlying buffer.
    pub async fn len(&self) -> usize {
        let inner = self.buffer.read().await;
        inner.len()
    }

    /// Returns `true` if the underlying buffer is empty.
    pub async fn is_empty(&self) -> bool {
        let inner = self.buffer.read().await;
        inner.is_empty()
    }

    /// Creates a new instance with the buffer initialized from the provided bytes.
    pub fn from_bytes(bytes: &[u8]) -> Self {
        Self {
            buffer: Arc::new(TokioRwLock::new(bytes.to_vec())),
        }
    }
}

impl AsyncWrite for AsyncShareableBuffer {
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<std::io::Result<usize>> {
        let this = self.clone();
        let buf = buf.to_vec();

        let fut = async move {
            let mut buffer = this.buffer.write().await;
            buffer.extend_from_slice(&buf);
            Ok(buf.len())
        };

        tokio::pin!(fut);
        fut.try_poll(cx)
    }

    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        Poll::Ready(Ok(()))
    }

    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        Poll::Ready(Ok(()))
    }
}
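
A minimal usage sketch of the new buffer, written as a hypothetical unit test that would live inside async_utils.rs (the module is pub(crate), so the type is not reachable from outside the crate); the test name and the #[tokio::test] attribute are illustrative, not part of this commit:

#[cfg(test)]
mod tests {
    use super::AsyncShareableBuffer;
    use tokio::io::AsyncWriteExt;

    #[tokio::test]
    async fn clones_share_the_same_bytes() {
        let mut writer = AsyncShareableBuffer::from_bytes(b"hello ");
        let reader = writer.clone();

        // The AsyncWrite impl appends to the shared Vec<u8> behind the Arc<RwLock<_>>.
        writer.write_all(b"world").await.unwrap();

        assert_eq!(reader.len().await, 11);
        assert_eq!(reader.to_vec().await, b"hello world".to_vec());

        // into_inner only yields the buffer once no other clone is alive.
        drop(reader);
        assert_eq!(writer.into_inner().await, Some(b"hello world".to_vec()));
    }
}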
1 change: 1 addition & 0 deletions crates/core/src/operations/write/mod.rs
@@ -23,6 +23,7 @@
//! let table = ops.write(vec![batch]).await?;
//! ````
pub(crate) mod async_utils;
pub mod configs;
pub(crate) mod execution;
pub(crate) mod generated_columns;
88 changes: 70 additions & 18 deletions crates/core/src/operations/write/writer.rs
@@ -1,6 +1,7 @@
//! Abstractions and implementations for writing data to delta tables
use std::collections::HashMap;
use std::sync::OnceLock;

use arrow_array::RecordBatch;
use arrow_schema::{ArrowError, SchemaRef as ArrowSchemaRef};
@@ -9,11 +10,13 @@ use delta_kernel::expressions::Scalar;
use futures::{StreamExt, TryStreamExt};
use indexmap::IndexMap;
use object_store::{path::Path, ObjectStore};
use parquet::arrow::ArrowWriter;
use parquet::arrow::AsyncArrowWriter;
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;
use tokio::task::JoinSet;
use tracing::debug;

use super::async_utils::AsyncShareableBuffer;
use crate::crate_version;
use crate::errors::{DeltaResult, DeltaTableError};
use crate::kernel::{Add, PartitionsExt};
@@ -22,12 +25,35 @@ use crate::writer::record_batch::{divide_by_partition_values, PartitionResult};
use crate::writer::stats::create_add;
use crate::writer::utils::{
arrow_schema_without_partitions, next_data_path, record_batch_without_partitions,
ShareableBuffer,
};

// TODO databricks often suggests a file size of 100mb, should we set this default?
const DEFAULT_TARGET_FILE_SIZE: usize = 104_857_600;
const DEFAULT_WRITE_BATCH_SIZE: usize = 1024;
const DEFAULT_UPLOAD_PART_SIZE: usize = 1024 * 1024 * 5;

fn upload_part_size() -> usize {
    static UPLOAD_SIZE: OnceLock<usize> = OnceLock::new();
    *UPLOAD_SIZE.get_or_init(|| {
        std::env::var("DELTARS_UPLOAD_PART_SIZE")
            .ok()
            .and_then(|s| s.parse::<usize>().ok())
            .map(|size| {
                if size < DEFAULT_UPLOAD_PART_SIZE {
                    // Minimum part size in GCS and S3
                    debug!("DELTARS_UPLOAD_PART_SIZE must be at least 5MB, falling back to the default of 5MB.");
                    DEFAULT_UPLOAD_PART_SIZE
                } else if size > 1024 * 1024 * 1024 * 5 {
                    // Maximum part size in GCS and S3
                    debug!("DELTARS_UPLOAD_PART_SIZE must not exceed 5GB, capping it at 5GB.");
                    1024 * 1024 * 1024 * 5
                } else {
                    size
                }
            })
            .unwrap_or(DEFAULT_UPLOAD_PART_SIZE)
    })
}
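
The part size is read once (via OnceLock) from the DELTARS_UPLOAD_PART_SIZE environment variable and clamped to the 5 MB–5 GB range that S3 and GCS accept for multipart parts; unset or unparsable values fall back to the 5 MB default. A self-contained illustration of the numeric clamping, using a hypothetical helper that only mirrors the branches above (not part of this commit):

fn clamp_part_size(requested: usize) -> usize {
    const MIN: usize = 1024 * 1024 * 5; // DEFAULT_UPLOAD_PART_SIZE, the S3/GCS minimum
    const MAX: usize = 1024 * 1024 * 1024 * 5; // the S3/GCS maximum part size
    requested.clamp(MIN, MAX)
}

fn main() {
    assert_eq!(clamp_part_size(1024 * 1024), 1024 * 1024 * 5); // below the minimum: default is used
    assert_eq!(clamp_part_size(1024 * 1024 * 64), 1024 * 1024 * 64); // in range: used as-is
    assert_eq!(clamp_part_size(1024 * 1024 * 1024 * 6), 1024 * 1024 * 1024 * 5); // above the maximum: capped
}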

#[derive(thiserror::Error, Debug)]
enum WriteError {
@@ -122,7 +148,6 @@ impl WriterConfig {
}
}

#[derive(Debug)]
/// A parquet writer implementation tailored to the needs of writing data to a delta table.
pub struct DeltaWriter {
/// An object store pointing at Delta table root
@@ -286,13 +311,12 @@ impl PartitionWriterConfig {
/// This writer takes in table data as RecordBatches and writes it out to partitioned parquet files.
/// It buffers data in memory until it reaches a certain size, then writes it out to optimize file sizes.
/// When you complete writing you get back a list of Add actions that can be used to update the Delta table commit log.
#[derive(Debug)]
pub struct PartitionWriter {
object_store: ObjectStoreRef,
writer_id: uuid::Uuid,
config: PartitionWriterConfig,
buffer: ShareableBuffer,
arrow_writer: ArrowWriter<ShareableBuffer>,
buffer: AsyncShareableBuffer,
arrow_writer: AsyncArrowWriter<AsyncShareableBuffer>,
part_counter: usize,
files_written: Vec<Add>,
/// Num index cols to collect stats for
@@ -309,8 +333,8 @@ impl PartitionWriter {
num_indexed_cols: i32,
stats_columns: Option<Vec<String>>,
) -> DeltaResult<Self> {
let buffer = ShareableBuffer::default();
let arrow_writer = ArrowWriter::try_new(
let buffer = AsyncShareableBuffer::default();
let arrow_writer = AsyncArrowWriter::try_new(
buffer.clone(),
config.file_schema.clone(),
Some(config.writer_properties.clone()),
@@ -340,9 +364,11 @@ impl PartitionWriter {
)
}

fn reset_writer(&mut self) -> DeltaResult<(ArrowWriter<ShareableBuffer>, ShareableBuffer)> {
let new_buffer = ShareableBuffer::default();
let arrow_writer = ArrowWriter::try_new(
fn reset_writer(
&mut self,
) -> DeltaResult<(AsyncArrowWriter<AsyncShareableBuffer>, AsyncShareableBuffer)> {
let new_buffer = AsyncShareableBuffer::default();
let arrow_writer = AsyncArrowWriter::try_new(
new_buffer.clone(),
self.config.file_schema.clone(),
Some(self.config.writer_properties.clone()),
@@ -353,20 +379,20 @@ impl PartitionWriter {
))
}

fn write_batch(&mut self, batch: &RecordBatch) -> DeltaResult<()> {
Ok(self.arrow_writer.write(batch)?)
async fn write_batch(&mut self, batch: &RecordBatch) -> DeltaResult<()> {
Ok(self.arrow_writer.write(batch).await?)
}

async fn flush_arrow_writer(&mut self) -> DeltaResult<()> {
// replace counter / buffers and close the current writer
let (writer, buffer) = self.reset_writer()?;
let metadata = writer.close()?;
let metadata = writer.close().await?;
// don't write empty file
if metadata.num_rows == 0 {
return Ok(());
}

let buffer = match buffer.into_inner() {
let mut buffer = match buffer.into_inner().await {
Some(buffer) => Bytes::from(buffer),
None => return Ok(()), // Nothing to write
};
@@ -376,7 +402,33 @@ impl PartitionWriter {
let file_size = buffer.len() as i64;

// write file to object store
self.object_store.put(&path, buffer.into()).await?;
let mut multi_part_upload = self.object_store.put_multipart(&path).await?;
let part_size = upload_part_size();
let mut tasks = JoinSet::new();
let max_concurrent_tasks = 10; // TODO: make configurable

while buffer.len() > part_size {
let part = buffer.split_to(part_size);
let upload_future = multi_part_upload.put_part(part.into());

// wait until one spot frees up before spawning new task
if tasks.len() >= max_concurrent_tasks {
tasks.join_next().await;
}
tasks.spawn(upload_future);
}

if !buffer.is_empty() {
let upload_future = multi_part_upload.put_part(buffer.into());
tasks.spawn(upload_future);
}

// wait for all remaining tasks to complete
while let Some(result) = tasks.join_next().await {
result.map_err(|e| DeltaTableError::generic(e.to_string()))??;
}

multi_part_upload.complete().await?;

self.files_written.push(
create_add(
@@ -412,9 +464,9 @@ impl PartitionWriter {
let max_offset = batch.num_rows();
for offset in (0..max_offset).step_by(self.config.write_batch_size) {
let length = usize::min(self.config.write_batch_size, max_offset - offset);
self.write_batch(&batch.slice(offset, length))?;
self.write_batch(&batch.slice(offset, length)).await?;
// flush currently buffered data to disk once we meet or exceed the target file size.
let estimated_size = self.buffer.len() + self.arrow_writer.in_progress_size();
let estimated_size = self.buffer.len().await + self.arrow_writer.in_progress_size();
if estimated_size >= self.config.target_file_size {
debug!(
"Writing file with estimated size {:?} to disk.",
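
For readers skimming the diff above: flush_arrow_writer now closes the Arrow writer, takes the serialized bytes, and streams them to the object store as a multipart upload, keeping at most ten part uploads in flight through a JoinSet before calling complete(). A self-contained sketch of that pattern against an in-memory store follows; the function name, part size, and concurrency limit are illustrative values, not the writer's API:

use bytes::Bytes;
use object_store::{memory::InMemory, path::Path, ObjectStore};
use tokio::task::JoinSet;

async fn upload_in_parts(
    store: &dyn ObjectStore,
    path: &Path,
    mut buffer: Bytes,
) -> object_store::Result<()> {
    let part_size = 1024 * 1024 * 5; // the 5 MB service minimum, as in upload_part_size()
    let max_in_flight = 10;

    let mut upload = store.put_multipart(path).await?;
    let mut tasks: JoinSet<object_store::Result<()>> = JoinSet::new();

    while buffer.len() > part_size {
        let part = buffer.split_to(part_size);
        // Keep at most `max_in_flight` part uploads running concurrently.
        if tasks.len() >= max_in_flight {
            tasks.join_next().await;
        }
        tasks.spawn(upload.put_part(part.into()));
    }
    if !buffer.is_empty() {
        tasks.spawn(upload.put_part(buffer.into()));
    }
    // Drain the remaining uploads before finalizing the multipart object.
    while let Some(result) = tasks.join_next().await {
        result.expect("upload task panicked")?;
    }
    upload.complete().await?;
    Ok(())
}

#[tokio::main]
async fn main() -> object_store::Result<()> {
    let store = InMemory::new();
    // 12 MB of zeros: two full 5 MB parts plus a smaller tail part.
    let data = Bytes::from(vec![0u8; 1024 * 1024 * 12]);
    upload_in_parts(&store, &Path::from("tmp/example.parquet"), data).await?;
    Ok(())
}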
