Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Kontinuation committed Feb 16, 2025
1 parent b652cee commit babe5cd
Show file tree
Hide file tree
Showing 2 changed files with 204 additions and 22 deletions.
210 changes: 193 additions & 17 deletions datafusion/core/tests/fuzz_cases/sort_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use std::sync::Arc;

use arrow::{
array::{ArrayRef, Int32Array},
array::{as_string_array, ArrayRef, Int32Array, StringArray},
compute::SortOptions,
record_batch::RecordBatch,
};
Expand All @@ -29,6 +29,7 @@ use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::{collect, ExecutionPlan};
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_common::cast::as_int32_array;
use datafusion_execution::memory_pool::GreedyMemoryPool;
use datafusion_physical_expr::expressions::col;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
Expand All @@ -42,12 +43,17 @@ const KB: usize = 1 << 10;
#[cfg_attr(tarpaulin, ignore)]
async fn test_sort_10k_mem() {
for (batch_size, should_spill) in [(5, false), (20000, true), (500000, true)] {
SortTest::new()
let (input, collected) = SortTest::new()
.with_int32_batches(batch_size)
.with_sort_columns(vec!["x"])
.with_pool_size(10 * KB)
.with_should_spill(should_spill)
.run()
.await;

let expected = partitions_to_sorted_vec(&input);
let actual = batches_to_vec(&collected);
assert_eq!(expected, actual, "failure in @ batch_size {batch_size:?}");
}
}

Expand All @@ -57,29 +63,119 @@ async fn test_sort_100k_mem() {
for (batch_size, should_spill) in
[(5, false), (10000, false), (20000, true), (1000000, true)]
{
SortTest::new()
let (input, collected) = SortTest::new()
.with_int32_batches(batch_size)
.with_sort_columns(vec!["x"])
.with_pool_size(100 * KB)
.with_should_spill(should_spill)
.run()
.await;

let expected = partitions_to_sorted_vec(&input);
let actual = batches_to_vec(&collected);
assert_eq!(expected, actual, "failure in @ batch_size {batch_size:?}");
}
}

#[tokio::test]
#[cfg_attr(tarpaulin, ignore)]
async fn test_sort_strings_100k_mem() {
for (batch_size, should_spill) in
[(5, false), (1000, false), (10000, true), (20000, true)]
{
let (input, collected) = SortTest::new()
.with_utf8_batches(batch_size)
.with_sort_columns(vec!["x"])
.with_pool_size(100 * KB)
.with_should_spill(should_spill)
.run()
.await;

let mut input = input
.iter()
.flat_map(|p| p.iter())
.flat_map(|b| {
let array = b.column(0);
as_string_array(array)
.iter()
.map(|s| s.unwrap().to_string())
})
.collect::<Vec<String>>();
input.sort_unstable();
let actual = collected
.iter()
.flat_map(|b| {
let array = b.column(0);
as_string_array(array)
.iter()
.map(|s| s.unwrap().to_string())
})
.collect::<Vec<String>>();
assert_eq!(input, actual);
}
}

/// Sorts randomly generated (`Int32`, `Utf8`) batches by columns "x" then "y"
/// with a 100 KB memory pool and checks that the collected rows equal the
/// input rows sorted as `(i32, String)` tuples.
#[tokio::test]
#[cfg_attr(tarpaulin, ignore)]
async fn test_sort_multi_columns_100k_mem() {
    /// Convert a two-column batch into one `(x, y)` tuple per row.
    fn record_batch_to_vec(b: &RecordBatch) -> Vec<(i32, String)> {
        let ints = as_int32_array(b.column(0)).expect("column 0 should be Int32");
        let strings = as_string_array(b.column(1));
        (0..b.num_rows())
            .map(|row| (ints.value(row), strings.value(row).to_string()))
            .collect()
    }

    for (batch_size, should_spill) in
        [(5, false), (1000, false), (10000, true), (20000, true)]
    {
        let (input, collected) = SortTest::new()
            .with_int32_utf8_batches(batch_size)
            .with_sort_columns(vec!["x", "y"])
            .with_pool_size(100 * KB)
            .with_should_spill(should_spill)
            .run()
            .await;

        // Expected result: every input row, across all partitions, in tuple order.
        let mut expected: Vec<(i32, String)> = input
            .iter()
            .flatten()
            .flat_map(record_batch_to_vec)
            .collect();
        expected.sort_unstable();

        let actual: Vec<(i32, String)> =
            collected.iter().flat_map(record_batch_to_vec).collect();

        // Same failure message as the int32 tests so a failing size is identifiable.
        assert_eq!(expected, actual, "failure in @ batch_size {batch_size:?}");
    }
}

#[tokio::test]
async fn test_sort_unlimited_mem() {
for (batch_size, should_spill) in [(5, false), (20000, false), (1000000, false)] {
SortTest::new()
let (input, collected) = SortTest::new()
.with_int32_batches(batch_size)
.with_sort_columns(vec!["x"])
.with_pool_size(usize::MAX)
.with_should_spill(should_spill)
.run()
.await;

let expected = partitions_to_sorted_vec(&input);
let actual = batches_to_vec(&collected);
assert_eq!(expected, actual, "failure in @ batch_size {batch_size:?}");
}
}

#[derive(Debug, Default)]
struct SortTest {
input: Vec<Vec<RecordBatch>>,
/// The names of the columns to sort by
sort_columns: Vec<String>,
/// GreedyMemoryPool size, if specified
pool_size: Option<usize>,
/// If true, expect the sort to spill
Expand All @@ -91,12 +187,29 @@ impl SortTest {
Default::default()
}

/// Set the names of the columns to sort by; `run` builds one sort
/// expression per name, in the order given here
fn with_sort_columns(mut self, sort_columns: Vec<&str>) -> Self {
    let owned_names: Vec<String> =
        sort_columns.into_iter().map(str::to_owned).collect();
    self.sort_columns = owned_names;
    self
}

/// Create batches of int32 values of rows
fn with_int32_batches(mut self, rows: usize) -> Self {
self.input = vec![make_staggered_i32_batches(rows)];
self
}

/// Create batches of utf8 values of rows
fn with_utf8_batches(mut self, rows: usize) -> Self {
self.input = vec![make_staggered_utf8_batches(rows)];
self
}

/// Create batches of int32 and utf8 values of rows
fn with_int32_utf8_batches(mut self, rows: usize) -> Self {
self.input = vec![make_staggered_i32_utf8_batches(rows)];
self
}

/// specify that this test should use a memory pool of the specified size
fn with_pool_size(mut self, pool_size: usize) -> Self {
self.pool_size = Some(pool_size);
Expand All @@ -110,7 +223,7 @@ impl SortTest {

/// Sort the input using SortExec, check the spill and memory pool
/// expectations, and return the input together with the sorted output
async fn run(&self) {
async fn run(&self) -> (Vec<Vec<RecordBatch>>, Vec<RecordBatch>) {
let input = self.input.clone();
let first_batch = input
.iter()
Expand All @@ -119,16 +232,21 @@ impl SortTest {
.expect("at least one batch");
let schema = first_batch.schema();

let sort = LexOrdering::new(vec![PhysicalSortExpr {
expr: col("x", &schema).unwrap(),
options: SortOptions {
descending: false,
nulls_first: true,
},
}]);
let sort_ordering = LexOrdering::new(
self.sort_columns
.iter()
.map(|c| PhysicalSortExpr {
expr: col(c, &schema).unwrap(),
options: SortOptions {
descending: false,
nulls_first: true,
},
})
.collect(),
);

let exec = MemorySourceConfig::try_new_exec(&input, schema, None).unwrap();
let sort = Arc::new(SortExec::new(sort, exec));
let sort = Arc::new(SortExec::new(sort_ordering, exec));

let session_config = SessionConfig::new();
let session_ctx = if let Some(pool_size) = self.pool_size {
Expand All @@ -153,9 +271,6 @@ impl SortTest {
let task_ctx = session_ctx.task_ctx();
let collected = collect(sort.clone(), task_ctx).await.unwrap();

let expected = partitions_to_sorted_vec(&input);
let actual = batches_to_vec(&collected);

if self.should_spill {
assert_ne!(
sort.metrics().unwrap().spill_count().unwrap(),
Expand All @@ -175,7 +290,8 @@ impl SortTest {
0,
"The sort should have returned all memory used back to the memory pool"
);
assert_eq!(expected, actual, "failure in @ pool_size {self:?}");

(input, collected)
}
}

Expand Down Expand Up @@ -203,3 +319,63 @@ fn make_staggered_i32_batches(len: usize) -> Vec<RecordBatch> {
}
batches
}

/// Build record batches with a single `Utf8` field named 'x' holding
/// `len` random strings in total.
///
/// Each batch has a random size between 0 and `min(remaining, 1024)` rows
/// inclusive, so empty batches are possible.
fn make_staggered_utf8_batches(len: usize) -> Vec<RecordBatch> {
    const MAX_BATCH: usize = 1024;
    let mut rng = rand::thread_rng();

    let mut batches = Vec::new();
    let mut rows_left = len;
    while rows_left > 0 {
        let batch_rows = rng.gen_range(0..=rows_left.min(MAX_BATCH));
        rows_left -= batch_rows;

        let x_values: ArrayRef = Arc::new(StringArray::from_iter_values(
            (0..batch_rows).map(|_| format!("test_string_{}", rng.gen::<u32>())),
        ));
        let batch = RecordBatch::try_from_iter(vec![("x", x_values)]).unwrap();
        batches.push(batch);
    }
    batches
}

/// Build record batches with an `Int32` field named 'x' and a `Utf8` field
/// named 'y', both filled with random content, `len` rows in total.
///
/// Each batch has a random size between 0 and `min(remaining, 1024)` rows
/// inclusive, so empty batches are possible.
fn make_staggered_i32_utf8_batches(len: usize) -> Vec<RecordBatch> {
    const MAX_BATCH: usize = 1024;
    let mut rng = rand::thread_rng();

    let mut batches = Vec::new();
    let mut rows_left = len;
    while rows_left > 0 {
        let batch_rows = rng.gen_range(0..=rows_left.min(MAX_BATCH));
        rows_left -= batch_rows;

        // Generate the 'x' values before the 'y' values, one column at a time.
        let x_values: ArrayRef = Arc::new(Int32Array::from_iter_values(
            (0..batch_rows).map(|_| rng.gen()),
        ));
        let y_values: ArrayRef = Arc::new(StringArray::from_iter_values(
            (0..batch_rows).map(|_| format!("test_string_{}", rng.gen::<u32>())),
        ));

        let batch =
            RecordBatch::try_from_iter(vec![("x", x_values), ("y", y_values)]).unwrap();
        batches.push(batch);
    }

    batches
}
16 changes: 11 additions & 5 deletions datafusion/physical-plan/src/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,8 @@ struct ExternalSorter {
// ========================================================================
/// Potentially unsorted in memory buffer
in_mem_batches: Vec<RecordBatch>,
/// Whether the batches in `Self::in_mem_batches` are already sorted
in_mem_batches_sorted: bool,

/// If data has previously been spilled, the locations of the
/// spill files (in Arrow IPC format)
Expand Down Expand Up @@ -277,6 +279,7 @@ impl ExternalSorter {
Self {
schema,
in_mem_batches: vec![],
in_mem_batches_sorted: false,
spills: vec![],
expr: expr.into(),
metrics,
Expand Down Expand Up @@ -309,6 +312,7 @@ impl ExternalSorter {
}

self.in_mem_batches.push(input);
self.in_mem_batches_sorted = false;
Ok(())
}

Expand Down Expand Up @@ -423,7 +427,8 @@ impl ExternalSorter {
async fn sort_or_spill_in_mem_batches(&mut self) -> Result<()> {
// Release the memory reserved for merge back to the pool so
// there is some left when `in_mem_sort_stream` requests an
// allocation.
// allocation. At the end of this function, memory will be
// reserved again for the next spill.
self.merge_reservation.free();

let before = self.reservation.size();
Expand Down Expand Up @@ -458,6 +463,7 @@ impl ExternalSorter {
self.spills.push(spill_file);
} else {
self.in_mem_batches.push(batch);
self.in_mem_batches_sorted = true;
}
}
Some(writer) => {
Expand Down Expand Up @@ -662,10 +668,10 @@ impl ExternalSorter {
/// Estimate how much memory is needed to sort a `RecordBatch`.
///
/// This is used to pre-reserve memory for the sort/merge. The sort/merge process involves
/// creating sorted copies of sorted columns in record batches, the sorted copies could be
/// in either row format or array format. Please refer to cursor.rs and stream.rs for more
/// details. No matter what format the sorted copies are, they will use more memory than
/// the original record batch.
/// creating sorted copies of sorted columns in record batches for speeding up comparison
/// in sorting and merging. The sorted copies are in either row format or array format.
/// Please refer to cursor.rs and stream.rs for more details. No matter what format the
/// sorted copies are, they will use more memory than the original record batch.
fn get_reserved_byte_for_record_batch(batch: &RecordBatch) -> usize {
// 2x may not be enough for some cases, but it's a good start.
// If 2x is not enough, user can set a larger value for `sort_spill_reservation_bytes`
Expand Down

0 comments on commit babe5cd

Please sign in to comment.