Use arrow IPC Stream format for spill files #14868
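In short, this PR switches spill files from the Arrow IPC File format to the Arrow IPC Stream format: the Stream format supports dictionary replacement between batches, while the File format allows only a single dictionary per field, and the random access the File format provides is never needed for spills, which are read back sequentially. A minimal round-trip sketch of the new on-disk format, independent of DataFusion (the path and schema here are invented for illustration):

use std::fs::File;
use std::io::BufReader;
use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::ipc::reader::StreamReader;
use arrow::ipc::writer::StreamWriter;
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;

    // Write: the Stream format appends self-describing messages and needs no
    // footer, which is what allows dictionaries to be replaced mid-stream.
    let file = File::create("/tmp/spill.arrow")?; // hypothetical spill path
    let mut writer = StreamWriter::try_new(file, &schema)?;
    writer.write(&batch)?;
    writer.finish()?;

    // Read back sequentially, the only access pattern spilling needs.
    let reader = StreamReader::try_new(BufReader::new(File::open("/tmp/spill.arrow")?), None)?;
    for batch in reader {
        println!("read {} rows", batch?.num_rows());
    }
    Ok(())
}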

Open · wants to merge 2 commits into main
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/common.rs
@@ -180,7 +180,7 @@ pub fn compute_record_batch_statistics(
}
}

-/// Write in Arrow IPC format.
+/// Write in Arrow IPC File format.
pub struct IPCWriter {
/// Path
pub path: PathBuf,
6 changes: 3 additions & 3 deletions datafusion/physical-plan/src/joins/sort_merge_join.rs
@@ -59,7 +59,7 @@ use arrow::compute::{
};
use arrow::datatypes::{DataType, SchemaRef, TimeUnit};
use arrow::error::ArrowError;
-use arrow::ipc::reader::FileReader;
+use arrow::ipc::reader::StreamReader;
use datafusion_common::{
exec_err, internal_err, not_impl_err, plan_err, DataFusionError, HashSet, JoinSide,
JoinType, Result,
@@ -1394,7 +1394,7 @@ impl SortMergeJoinStream

if let Some(batch) = buffered_batch.batch {
spill_record_batches(
-vec![batch],
+&[batch],
spill_file.path().into(),
Arc::clone(&self.buffered_schema),
)?;
@@ -2270,7 +2270,7 @@ fn fetch_right_columns_from_batch_by_idxs(
Vec::with_capacity(buffered_indices.len());

let file = BufReader::new(File::open(spill_file.path())?);
-let reader = FileReader::try_new(file, None)?;
+let reader = StreamReader::try_new(file, None)?;

for batch in reader {
batch?.columns().iter().for_each(|column| {
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/sorts/sort.rs
@@ -407,7 +407,7 @@ impl ExternalSorter {
let spill_file = self.runtime.disk_manager.create_tmp_file("Sorting")?;
let batches = std::mem::take(&mut self.in_mem_batches);
let (spilled_rows, spilled_bytes) = spill_record_batches(
-batches,
+&batches,
spill_file.path().into(),
Arc::clone(&self.schema),
)?;
143 changes: 129 additions & 14 deletions datafusion/physical-plan/src/spill.rs
@@ -23,8 +23,8 @@ use std::path::{Path, PathBuf};
use std::ptr::NonNull;

use arrow::array::ArrayData;
-use arrow::datatypes::SchemaRef;
-use arrow::ipc::reader::FileReader;
+use arrow::datatypes::{Schema, SchemaRef};
+use arrow::ipc::{reader::StreamReader, writer::StreamWriter};
use arrow::record_batch::RecordBatch;
use log::debug;
use tokio::sync::mpsc::Sender;
@@ -34,7 +34,6 @@ use datafusion_execution::disk_manager::RefCountedTempFile;
use datafusion_execution::memory_pool::human_readable_size;
use datafusion_execution::SendableRecordBatchStream;

-use crate::common::IPCWriter;
use crate::stream::RecordBatchReceiverStream;

/// Read spilled batches from the disk
@@ -59,13 +58,13 @@ pub(crate) fn read_spill_as_stream(
///
/// Returns total number of the rows spilled to disk.
pub(crate) fn spill_record_batches(
-batches: Vec<RecordBatch>,
+batches: &[RecordBatch],
path: PathBuf,
schema: SchemaRef,
) -> Result<(usize, usize)> {
-let mut writer = IPCWriter::new(path.as_ref(), schema.as_ref())?;
+let mut writer = IPCStreamWriter::new(path.as_ref(), schema.as_ref())?;
for batch in batches {
-writer.write(&batch)?;
+writer.write(batch)?;
}
writer.finish()?;
debug!(
@@ -79,7 +78,7 @@

fn read_spill(sender: Sender<Result<RecordBatch>>, path: &Path) -> Result<()> {
let file = BufReader::new(File::open(path)?);
-let reader = FileReader::try_new(file, None)?;
+let reader = StreamReader::try_new(file, None)?;
for batch in reader {
sender
.blocking_send(batch.map_err(Into::into))
@@ -98,7 +97,7 @@
) -> Result<()> {
let mut offset = 0;
let total_rows = batch.num_rows();
-let mut writer = IPCWriter::new(&path, schema.as_ref())?;
+let mut writer = IPCStreamWriter::new(&path, schema.as_ref())?;

while offset < total_rows {
let length = std::cmp::min(total_rows - offset, batch_size_rows);
@@ -130,7 +129,7 @@
/// {xxxxxxxxxxxxxxxxxxx} <--- buffer
/// ^ ^ ^ ^
/// | | | |
col1->{ } | |
/// col2--------->{ }
///
/// In the above case, `get_record_batch_memory_size` will return the size of
@@ -179,17 +178,64 @@ fn count_array_data_memory_size(
}
}

/// Write in Arrow IPC Stream format to a file.
///
/// The Stream format is used for spill files because it supports dictionary
/// replacement, which the IPC File format does not; the File format's random
/// access is also unnecessary here, since spills are only read back sequentially.
struct IPCStreamWriter {
/// Inner writer
pub writer: StreamWriter<File>,
/// Batches written
pub num_batches: usize,
/// Rows written
pub num_rows: usize,
/// Bytes written
pub num_bytes: usize,
}

impl IPCStreamWriter {
/// Create new writer
pub fn new(path: &Path, schema: &Schema) -> Result<Self> {
let file = File::create(path).map_err(|e| {
exec_datafusion_err!("Failed to create partition file at {path:?}: {e:?}")
})?;
Ok(Self {
num_batches: 0,
num_rows: 0,
num_bytes: 0,
writer: StreamWriter::try_new(file, schema)?,
})
}

/// Write one single batch
pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
self.writer.write(batch)?;
self.num_batches += 1;
self.num_rows += batch.num_rows();
let num_bytes: usize = batch.get_array_memory_size();
self.num_bytes += num_bytes;
Ok(())
}

/// Finish the writer
pub fn finish(&mut self) -> Result<()> {
self.writer.finish().map_err(Into::into)
}
}
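
For review context, a sketch of how the new writer is driven (my illustration, not code from this PR; the function name and path are hypothetical, and it assumes this module's imports):

fn write_one_spill(schema: SchemaRef, batch: RecordBatch) -> Result<()> {
    // Same shape as spill_record_batches above: create, write N times, finish.
    let mut writer = IPCStreamWriter::new(Path::new("/tmp/spill.arrow"), schema.as_ref())?;
    writer.write(&batch)?; // dictionaries may differ between successive writes
    writer.finish()?; // writes the end-of-stream marker, not a File-format footer
    debug!(
        "spilled {} batches, {} rows, {} bytes",
        writer.num_batches, writer.num_rows, writer.num_bytes
    );
    Ok(())
}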

#[cfg(test)]
mod tests {
use super::*;
use crate::spill::{spill_record_batch_by_size, spill_record_batches};
use crate::test::build_table_i32;
use arrow::array::{Float64Array, Int32Array, ListArray};
use arrow::compute::cast;
use arrow::datatypes::{DataType, Field, Int32Type, Schema};
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_execution::disk_manager::DiskManagerConfig;
use datafusion_execution::DiskManager;
use itertools::Itertools;
use std::fs::File;
use std::io::BufReader;
use std::sync::Arc;
@@ -214,18 +260,85 @@ mod tests {
let schema = batch1.schema();
let num_rows = batch1.num_rows() + batch2.num_rows();
let (spilled_rows, _) = spill_record_batches(
-vec![batch1, batch2],
+&[batch1, batch2],
spill_file.path().into(),
Arc::clone(&schema),
)?;
assert_eq!(spilled_rows, num_rows);

let file = BufReader::new(File::open(spill_file.path())?);
-let reader = FileReader::try_new(file, None)?;
+let reader = StreamReader::try_new(file, None)?;

-assert_eq!(reader.num_batches(), 2);
assert_eq!(reader.schema(), schema);

+let batches = reader.collect_vec();
+assert!(batches.len() == 2);

Ok(())
}

#[test]
fn test_batch_spill_and_read_dictionary_arrays() -> Result<()> {
Author comment: I confirmed that this test fails on main with the error in #4658.
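
For reference, the failure mode this test guards against can be reproduced outside DataFusion. A hedged sketch, not code from the PR: with arrow's default write options, FileWriter is expected to reject a dictionary whose values change between batches, while StreamWriter simply re-emits the dictionary (the exact error text varies by arrow version):

use std::sync::Arc;

use arrow::array::DictionaryArray;
use arrow::datatypes::{DataType, Field, Int32Type, Schema};
use arrow::error::ArrowError;
use arrow::ipc::writer::{FileWriter, StreamWriter};
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), ArrowError> {
    let dict_type =
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    let schema = Arc::new(Schema::new(vec![Field::new("c", dict_type, true)]));
    let make_batch = |values: Vec<&str>| -> Result<RecordBatch, ArrowError> {
        let array: DictionaryArray<Int32Type> = values.into_iter().collect();
        RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)])
    };
    // Two batches with *different* dictionary values for the same field.
    let (b1, b2) = (make_batch(vec!["a", "b"])?, make_batch(vec!["x", "y"])?);

    // File format: the footer can only point at one dictionary per field, so
    // the second write should fail with a dictionary-replacement error.
    let mut file_writer = FileWriter::try_new(Vec::new(), &schema)?;
    file_writer.write(&b1)?;
    assert!(file_writer.write(&b2).is_err());

    // Stream format: replacement dictionaries are legal mid-stream.
    let mut stream_writer = StreamWriter::try_new(Vec::new(), &schema)?;
    stream_writer.write(&b1)?;
    stream_writer.write(&b2)?;
    stream_writer.finish()?;
    Ok(())
}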

// See https://github.com/apache/datafusion/issues/4658

let batch1 = build_table_i32(
("a2", &vec![0, 1, 2]),
("b2", &vec![3, 4, 5]),
("c2", &vec![4, 5, 6]),
);

let batch2 = build_table_i32(
("a2", &vec![10, 11, 12]),
("b2", &vec![13, 14, 15]),
("c2", &vec![14, 15, 16]),
);

// Dictionary encode the arrays
let dict_type =
DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Int32));
let dict_schema = Arc::new(Schema::new(vec![
Field::new("a2", dict_type.clone(), true),
Field::new("b2", dict_type.clone(), true),
Field::new("c2", dict_type.clone(), true),
]));

let batch1 = RecordBatch::try_new(
dict_schema.clone(),
batch1
.columns()
.iter()
.map(|array| cast(array, &dict_type))
.collect::<Result<_, _>>()?,
)?;

let batch2 = RecordBatch::try_new(
dict_schema.clone(),
batch2
.columns()
.iter()
.map(|array| cast(array, &dict_type))
.collect::<Result<_, _>>()?,
)?;

let disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?;

let spill_file = disk_manager.create_tmp_file("Test Spill")?;
let num_rows = batch1.num_rows() + batch2.num_rows();
let (spilled_rows, _) = spill_record_batches(
&[batch1, batch2],
spill_file.path().into(),
dict_schema.clone(),
)?;
assert_eq!(spilled_rows, num_rows);

let file = BufReader::new(File::open(spill_file.path())?);
let reader = StreamReader::try_new(file, None)?;

assert_eq!(reader.schema(), dict_schema);

let batches = reader.collect_vec();
assert!(batches.len() == 2);

Ok(())
}

@@ -249,11 +362,13 @@ mod tests {
)?;

let file = BufReader::new(File::open(spill_file.path())?);
-let reader = FileReader::try_new(file, None)?;
+let reader = StreamReader::try_new(file, None)?;

-assert_eq!(reader.num_batches(), 4);
assert_eq!(reader.schema(), schema);

+let batches = reader.collect_vec();
+assert!(batches.len() == 4);

Ok(())
}
