use arrow IPC Stream format for spill files
davidhewitt committed Feb 25, 2025
1 parent aadb0b6 commit 7ac9fdf
Showing 4 changed files with 95 additions and 46 deletions.
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/common.rs
@@ -180,7 +180,7 @@ pub fn compute_record_batch_statistics(
}
}

/// Write in Arrow IPC format.
/// Write in Arrow IPC File format.
pub struct IPCWriter {
/// Path
pub path: PathBuf,
6 changes: 3 additions & 3 deletions datafusion/physical-plan/src/joins/sort_merge_join.rs
@@ -59,7 +59,7 @@ use arrow::compute::{
};
use arrow::datatypes::{DataType, SchemaRef, TimeUnit};
use arrow::error::ArrowError;
use arrow::ipc::reader::FileReader;
use arrow::ipc::reader::StreamReader;
use datafusion_common::{
exec_err, internal_err, not_impl_err, plan_err, DataFusionError, HashSet, JoinSide,
JoinType, Result,
@@ -1394,7 +1394,7 @@ impl SortMergeJoinStream {

if let Some(batch) = buffered_batch.batch {
spill_record_batches(
vec![batch],
&[batch],
spill_file.path().into(),
Arc::clone(&self.buffered_schema),
)?;
@@ -2270,7 +2270,7 @@ fn fetch_right_columns_from_batch_by_idxs(
Vec::with_capacity(buffered_indices.len());

let file = BufReader::new(File::open(spill_file.path())?);
let reader = FileReader::try_new(file, None)?;
let reader = StreamReader::try_new(file, None)?;

for batch in reader {
batch?.columns().iter().for_each(|column| {
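Note on the reader change above: the join's buffered-side spill files are now read back with arrow's StreamReader instead of FileReader. A minimal sketch of reading such a spill file back, using only the arrow crate; the helper name and error handling are illustrative and not part of this commit:

    use std::fs::File;
    use std::io::BufReader;
    use std::path::Path;

    use arrow::array::ArrayRef;
    use arrow::ipc::reader::StreamReader;

    // Illustrative helper (not part of this commit): read every batch back from
    // a spill file written in IPC Stream format and return its columns per batch.
    fn read_spilled_columns(path: &Path) -> Result<Vec<Vec<ArrayRef>>, Box<dyn std::error::Error>> {
        let file = BufReader::new(File::open(path)?);
        let reader = StreamReader::try_new(file, None)?;
        let mut columns_per_batch = Vec::new();
        for batch in reader {
            // Each item is a Result<RecordBatch, ArrowError>; propagate errors.
            columns_per_batch.push(batch?.columns().to_vec());
        }
        Ok(columns_per_batch)
    }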
56 changes: 28 additions & 28 deletions datafusion/physical-plan/src/sorts/sort.rs
@@ -24,7 +24,7 @@ use std::fmt;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

use crate::common::{spawn_buffered, IPCWriter};
use crate::common::spawn_buffered;
use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType};
use crate::expressions::PhysicalSortExpr;
use crate::limit::LimitStream;
@@ -35,6 +35,7 @@ use crate::projection::{make_with_child, update_expr, ProjectionExec};
use crate::sorts::streaming_merge::StreamingMergeBuilder;
use crate::spill::{
get_record_batch_memory_size, read_spill_as_stream, spill_record_batches,
IPCStreamWriter,
};
use crate::stream::RecordBatchStreamAdapter;
use crate::topk::TopK;
@@ -402,7 +403,7 @@ impl ExternalSorter {
let spill_file = self.runtime.disk_manager.create_tmp_file("Sorting")?;
let batches = std::mem::take(&mut self.in_mem_batches);
let (spilled_rows, spilled_bytes) = spill_record_batches(
batches,
&batches,
spill_file.path().into(),
Arc::clone(&self.schema),
)?;
@@ -439,36 +440,35 @@ impl ExternalSorter {
// `self.in_mem_batches` is already taken away by the sort_stream, now it is empty.
// We'll gradually collect the sorted stream into self.in_mem_batches, or directly
// write sorted batches to disk when the memory is insufficient.
let mut spill_writer: Option<IPCWriter> = None;
let mut spill_writer: Option<IPCStreamWriter> = None;
while let Some(batch) = sorted_stream.next().await {
let batch = batch?;
match &mut spill_writer {
None => {
let sorted_size = get_reserved_byte_for_record_batch(&batch);
if self.reservation.try_grow(sorted_size).is_err() {
// Directly write in_mem_batches as well as all the remaining batches in
// sorted_stream to disk. Further batches fetched from `sorted_stream` will
// be handled by the `Some(writer)` matching arm.
let spill_file =
self.runtime.disk_manager.create_tmp_file("Sorting")?;
let mut writer = IPCWriter::new(spill_file.path(), &self.schema)?;
// Flush everything in memory to the spill file
for batch in self.in_mem_batches.drain(..) {
writer.write(&batch)?;
}
// as well as the newly sorted batch
writer.write(&batch)?;
spill_writer = Some(writer);
self.reservation.free();
self.spills.push(spill_file);
} else {
self.in_mem_batches.push(batch);
self.in_mem_batches_sorted = true;
}
}
Some(writer) => {

// If we've started spilling, just keep spilling
if let Some(spill_writer) = &mut spill_writer {
spill_writer.write(&batch)?;
continue;
}

let sorted_size = get_reserved_byte_for_record_batch(&batch);
if self.reservation.try_grow(sorted_size).is_err() {
// Directly write in_mem_batches as well as all the remaining batches in
// sorted_stream to disk. Further batches fetched from `sorted_stream` will
// be handled by the `Some(writer)` matching arm.
let spill_file = self.runtime.disk_manager.create_tmp_file("Sorting")?;
let mut writer = IPCStreamWriter::new(spill_file.path(), &self.schema)?;
// Flush everything in memory to the spill file
for batch in self.in_mem_batches.drain(..) {
writer.write(&batch)?;
}
// as well as the newly sorted batch
writer.write(&batch)?;
spill_writer = Some(writer);
self.reservation.free();
self.spills.push(spill_file);
} else {
self.in_mem_batches.push(batch);
self.in_mem_batches_sorted = true;
}
}

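The hunk above restructures the sort spill path so that, once the memory reservation cannot grow, batches are written incrementally through an IPCStreamWriter instead of being buffered. A minimal sketch of that write-as-you-go pattern using arrow's StreamWriter directly; the function name and signature are illustrative, not DataFusion's API:

    use std::fs::File;
    use std::path::Path;

    use arrow::datatypes::Schema;
    use arrow::ipc::writer::StreamWriter;
    use arrow::record_batch::RecordBatch;

    // Illustrative sketch (not DataFusion's API): open a single stream writer and
    // append each batch as it arrives. The IPC Stream format allows appending
    // batches one at a time without random access or knowing the total up front.
    fn spill_incrementally(
        path: &Path,
        schema: &Schema,
        batches: impl IntoIterator<Item = RecordBatch>,
    ) -> Result<(), Box<dyn std::error::Error>> {
        let mut writer = StreamWriter::try_new(File::create(path)?, schema)?;
        for batch in batches {
            writer.write(&batch)?;
        }
        // finish() writes the end-of-stream marker.
        writer.finish()?;
        Ok(())
    }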
77 changes: 63 additions & 14 deletions datafusion/physical-plan/src/spill.rs
@@ -23,8 +23,8 @@ use std::path::{Path, PathBuf};
use std::ptr::NonNull;

use arrow::array::ArrayData;
use arrow::datatypes::SchemaRef;
use arrow::ipc::reader::FileReader;
use arrow::datatypes::{Schema, SchemaRef};
use arrow::ipc::{reader::StreamReader, writer::StreamWriter};
use arrow::record_batch::RecordBatch;
use log::debug;
use tokio::sync::mpsc::Sender;
@@ -34,7 +34,6 @@ use datafusion_execution::disk_manager::RefCountedTempFile;
use datafusion_execution::memory_pool::human_readable_size;
use datafusion_execution::SendableRecordBatchStream;

use crate::common::IPCWriter;
use crate::stream::RecordBatchReceiverStream;

/// Read spilled batches from the disk
@@ -59,13 +58,13 @@ pub(crate) fn read_spill_as_stream(
///
/// Returns total number of the rows spilled to disk.
pub(crate) fn spill_record_batches(
batches: Vec<RecordBatch>,
batches: &[RecordBatch],
path: PathBuf,
schema: SchemaRef,
) -> Result<(usize, usize)> {
let mut writer = IPCWriter::new(path.as_ref(), schema.as_ref())?;
let mut writer = IPCStreamWriter::new(path.as_ref(), schema.as_ref())?;
for batch in batches {
writer.write(&batch)?;
writer.write(batch)?;
}
writer.finish()?;
debug!(
@@ -79,7 +78,7 @@

fn read_spill(sender: Sender<Result<RecordBatch>>, path: &Path) -> Result<()> {
let file = BufReader::new(File::open(path)?);
let reader = FileReader::try_new(file, None)?;
let reader = StreamReader::try_new(file, None)?;
for batch in reader {
sender
.blocking_send(batch.map_err(Into::into))
@@ -98,7 +97,7 @@ pub fn spill_record_batch_by_size(
) -> Result<()> {
let mut offset = 0;
let total_rows = batch.num_rows();
let mut writer = IPCWriter::new(&path, schema.as_ref())?;
let mut writer = IPCStreamWriter::new(&path, schema.as_ref())?;

while offset < total_rows {
let length = std::cmp::min(total_rows - offset, batch_size_rows);
@@ -130,7 +129,7 @@ pub fn spill_record_batch_by_size(
/// {xxxxxxxxxxxxxxxxxxx} <--- buffer
/// ^ ^ ^ ^
/// | | | |
/// col1->{ } | |
/// col1->{ } | |
/// col2--------->{ }
///
/// In the above case, `get_record_batch_memory_size` will return the size of
@@ -179,6 +178,51 @@ fn count_array_data_memory_size(
}
}

/// Write in Arrow IPC Stream format to a file.
///
/// Stream format is used for spill files because it supports dictionary replacement, and the
/// random access provided by the IPC File format is not needed (the File format also does not
/// support dictionary replacement).
pub(crate) struct IPCStreamWriter {
/// Inner writer
pub writer: StreamWriter<File>,
/// Batches written
pub num_batches: usize,
/// Rows written
pub num_rows: usize,
/// Bytes written
pub num_bytes: usize,
}

impl IPCStreamWriter {
/// Create new writer
pub fn new(path: &Path, schema: &Schema) -> Result<Self> {
let file = File::create(path).map_err(|e| {
exec_datafusion_err!("Failed to create partition file at {path:?}: {e:?}")
})?;
Ok(Self {
num_batches: 0,
num_rows: 0,
num_bytes: 0,
writer: StreamWriter::try_new(file, schema)?,
})
}

/// Write one single batch
pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
self.writer.write(batch)?;
self.num_batches += 1;
self.num_rows += batch.num_rows();
let num_bytes: usize = batch.get_array_memory_size();
self.num_bytes += num_bytes;
Ok(())
}

/// Finish the writer
pub fn finish(&mut self) -> Result<()> {
self.writer.finish().map_err(Into::into)
}
}

#[cfg(test)]
mod tests {
use super::*;
@@ -190,6 +234,7 @@ mod tests {
use datafusion_common::Result;
use datafusion_execution::disk_manager::DiskManagerConfig;
use datafusion_execution::DiskManager;
use itertools::Itertools;
use std::fs::File;
use std::io::BufReader;
use std::sync::Arc;
@@ -214,18 +259,20 @@
let schema = batch1.schema();
let num_rows = batch1.num_rows() + batch2.num_rows();
let (spilled_rows, _) = spill_record_batches(
vec![batch1, batch2],
&[batch1, batch2],
spill_file.path().into(),
Arc::clone(&schema),
)?;
assert_eq!(spilled_rows, num_rows);

let file = BufReader::new(File::open(spill_file.path())?);
let reader = FileReader::try_new(file, None)?;
let reader = StreamReader::try_new(file, None)?;

assert_eq!(reader.num_batches(), 2);
assert_eq!(reader.schema(), schema);

let batches = reader.collect_vec();
assert!(batches.len() == 2);

Ok(())
}

@@ -249,11 +296,13 @@
)?;

let file = BufReader::new(File::open(spill_file.path())?);
let reader = FileReader::try_new(file, None)?;
let reader = StreamReader::try_new(file, None)?;

assert_eq!(reader.num_batches(), 4);
assert_eq!(reader.schema(), schema);

let batches = reader.collect_vec();
assert!(batches.len() == 4);

Ok(())
}

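The new IPCStreamWriter wraps arrow's StreamWriter, and the tests above now count batches by collecting them because StreamReader, unlike FileReader, does not expose num_batches(). A minimal, self-contained round trip illustrating that difference; the schema, file path, and assertions are illustrative, not taken from this commit:

    use std::fs::File;
    use std::io::BufReader;
    use std::sync::Arc;

    use arrow::array::Int32Array;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::ipc::{reader::StreamReader, writer::StreamWriter};
    use arrow::record_batch::RecordBatch;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Illustrative single-column batch written twice to a temp spill file.
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
        )?;

        let path = std::env::temp_dir().join("spill_sketch.arrow");
        let mut writer = StreamWriter::try_new(File::create(&path)?, &schema)?;
        writer.write(&batch)?;
        writer.write(&batch)?;
        writer.finish()?;

        // StreamReader has no num_batches(); iterate (or collect) to count.
        let reader = StreamReader::try_new(BufReader::new(File::open(&path)?), None)?;
        let batches: Vec<RecordBatch> = reader.collect::<Result<_, _>>()?;
        assert_eq!(batches.len(), 2);
        Ok(())
    }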
