
Fix: External sort failing on StringView due to shared buffers #14823

Merged: 6 commits into apache:main on Feb 26, 2025

Conversation

2010YOUY01 (Contributor):

Which issue does this PR close?

Follow-up to #14644: this PR fixes an unsolved failing case for external sort, found in #12136 (comment).

Rationale for this change

A recap of the major steps of external sort (there is detailed documentation in datafusion/physical-plan/src/sorts/sort.rs); a toy sketch follows the list:

  1. Inside each partition, input batches are buffered until the memory limit is reached.
  2. When the memory limit is hit, all buffered batches are sorted one by one and merged into one large sorted run (though physically it's chunked into many small batches).
  3. The sorted run is spilled to disk. After all input has been read, the spilled batches are read back and merged into the final result. Since only one batch per spilled sorted run is needed at a time to produce the final result, memory overhead is reduced in this step.
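To make the flow concrete, here is a toy, self-contained model of those three steps (my own illustration; DataFusion's actual implementation spills to disk and operates on Arrow RecordBatches):

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Toy external sort: buffer until a "memory limit", sort and spill each
/// full buffer as one sorted run, then stream-merge the runs.
fn external_sort(input: Vec<Vec<i32>>, max_buffered: usize) -> Vec<i32> {
    let mut runs: Vec<Vec<i32>> = Vec::new();
    let mut buffered: Vec<i32> = Vec::new();

    for batch in input {
        buffered.extend(batch); // step 1: buffer incoming batches
        if buffered.len() >= max_buffered {
            buffered.sort_unstable(); // step 2: sort on memory pressure
            runs.push(std::mem::take(&mut buffered)); // step 3: "spill"
        }
    }
    if !buffered.is_empty() {
        buffered.sort_unstable();
        runs.push(std::mem::take(&mut buffered));
    }

    // Final merge: only the current head of each sorted run is needed at any
    // moment, which is why memory pressure stays low in this phase.
    let mut heap = BinaryHeap::new();
    for (run_idx, run) in runs.iter().enumerate() {
        if let Some(&v) = run.first() {
            heap.push(Reverse((v, run_idx, 0usize)));
        }
    }
    let mut output = Vec::new();
    while let Some(Reverse((v, run_idx, pos))) = heap.pop() {
        output.push(v);
        if let Some(&next) = runs[run_idx].get(pos + 1) {
            heap.push(Reverse((next, run_idx, pos + 1)));
        }
    }
    output
}
```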

The problem: if step 2 is performed on batches with StringViewArray columns, sorting only reorders the views (a string prefix, plus a pointer to a range in a payload buffer for long elements); the underlying buffers are not moved.
When reading back spilled batches from a single sorted run, they must be read one batch at a time, otherwise memory pressure is not reduced. As a result, the spill writer has to write all referenced buffers for every batch, so many buffers are written multiple times. The size of the spilled data can explode, and some memory-limited sort queries fail for this reason.
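Here is a minimal arrow-rs sketch of that buffer sharing (my own illustration, not code from this PR):

```rust
use arrow::array::{Array, StringViewArray, UInt32Array};
use arrow::compute::take;

fn main() {
    // Strings longer than 12 bytes keep only a prefix inline in the view;
    // the payload lives in data buffers shared by the whole array.
    let array = StringViewArray::from_iter_values([
        "a string that is much longer than twelve bytes",
        "another long string living in the same payload buffer",
    ]);

    // `take` (like sorting) only permutes the views...
    let indices = UInt32Array::from(vec![1u32, 0]);
    let permuted = take(&array, &indices, None).unwrap();
    let permuted = permuted.as_any().downcast_ref::<StringViewArray>().unwrap();

    // ...so the output still references all of the original payload buffers.
    // Every small batch sliced from a sorted run drags those buffers along,
    // which is what makes the spill files explode.
    assert_eq!(array.data_buffers().len(), permuted.data_buffers().len());
}
```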

Reproducer

Under benchmarks/, run

cargo run --profile release-nonlto --bin dfbench -- sort-tpch  -p '/Users/yongting/Code/datafusion/benchmarks/data/tpch_sf10' -o '/tmp/main.json' -q 10 --iterations 1 --partitions 4 --memory-limit 5000M --debug

Main branch result (20GB spilled)

SortExec: expr=[l_orderkey@0 ASC NULLS LAST, l_suppkey@1 ASC NULLS LAST, l_linenumber@2 ASC NULLS LAST, l_comment@3 ASC NULLS LAST], preserve_partitioning=[true], metrics=[output_rows=59986052, elapsed_compute=12.965460651s, spill_count=24, spilled_bytes=19472334146, spilled_rows=51202072]

Q10 iteration 0 took 12839.6 ms and returned 59986052 rows
Q10 avg time: 12839.60 ms

PR result (12GB spilled)

SortExec: expr=[l_orderkey@0 ASC NULLS LAST, l_suppkey@1 ASC NULLS LAST, l_linenumber@2 ASC NULLS LAST, l_comment@3 ASC NULLS LAST], preserve_partitioning=[true], metrics=[output_rows=59986052, elapsed_compute=15.363745389s, spill_count=48, spilled_bytes=12600523712, spilled_rows=55216152]

Q10 iteration 0 took 9424.3 ms and returned 59986052 rows
Q10 avg time: 9424.33 ms

What changes are included in this PR?

As described above, sorting a StringViewArray only permutes its views, so this PR reorganizes each such array into sequential order (compacting it with gc()) before it is spilled; a conceptual sketch follows.
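Conceptually, the change looks something like this sketch (an illustration of the approach, not the exact PR code; the helper name is hypothetical):

```rust
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, StringViewArray};
use arrow::record_batch::RecordBatch;

/// Before a batch is spilled, rewrite every StringViewArray column so its
/// views point into a freshly compacted buffer instead of the buffers
/// shared across the whole sorted run.
fn compact_string_views(batch: RecordBatch) -> RecordBatch {
    let columns: Vec<ArrayRef> = batch
        .columns()
        .iter()
        .map(|array| {
            if let Some(sv) = array.as_any().downcast_ref::<StringViewArray>() {
                // gc() copies only the referenced bytes into a new buffer,
                // so the spilled batch no longer references shared buffers.
                Arc::new(sv.gc()) as ArrayRef
            } else {
                Arc::clone(array)
            }
        })
        .collect();
    // The schema is unchanged, so this cannot fail.
    RecordBatch::try_new(batch.schema(), columns).expect("schema unchanged")
}
```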

Are these changes tested?

Added one unit regression test. This test fails without the change.

Are there any user-facing changes?

github-actions bot added the physical-expr (Physical Expressions) and core (Core DataFusion crate) labels on Feb 22, 2025
- writer.write(&batch)?;
- spill_writer = Some(writer);
+ self.in_mem_batches.push(batch);
+ self.spill().await?;
2010YOUY01 (Contributor, Author):

This refactor is not related to the PR; I just did it along the way.

Contributor:

Do we need to keep the original logic:

  1. self.spill().await?;

  2. and then write the remaining batch to disk?

Because self.in_mem_batches.push(batch) may cause an OOM?

2010YOUY01 (Contributor, Author):

in_mem_batches is a Vec<RecordBatch>, so it doesn't have any internal mechanism to update the reservation. Also, the new batch is already in memory, so there is no difference between spilling it together with the rest or separately. I added a comment for this.
Besides, I think manually keeping the buffered batches in sync with the reservation is quite tricky; it makes the implementation hard to reason about and can cause bugs. Hopefully we can find an RAII-style way to improve it in the future (a possible shape is sketched below).
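One possible RAII shape (a hypothetical sketch, not part of this PR): tie the buffer and its reservation together so they cannot drift apart:

```rust
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_execution::memory_pool::MemoryReservation;

/// Buffered batches whose memory reservation is kept in sync automatically.
struct ReservedBatches {
    batches: Vec<RecordBatch>,
    reservation: MemoryReservation,
}

impl ReservedBatches {
    fn new(reservation: MemoryReservation) -> Self {
        Self { batches: Vec::new(), reservation }
    }

    /// Growing the reservation and buffering the batch happen together,
    /// so callers cannot forget one of the two.
    fn push(&mut self, batch: RecordBatch) -> Result<()> {
        self.reservation.try_grow(batch.get_array_memory_size())?;
        self.batches.push(batch);
        Ok(())
    }
}

impl Drop for ReservedBatches {
    fn drop(&mut self) {
        // Release the reservation when the buffered batches go away.
        self.reservation.free();
    }
}
```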

Contributor:

Thank you @2010YOUY01 for explaining, got it; I was wrong, we do not have an internal mechanism to update the reservation. An RAII-style improvement in the future is a great idea, maybe we can file a ticket for it!


@@ -1425,7 +1478,7 @@ mod tests {
     // Processing 840 KB of data using 400 KB of memory requires at least 2 spills
     // It will spill roughly 18000 rows and 800 KBytes.
     // We leave a little wiggle room for the actual numbers.
-    assert!((2..=10).contains(&spill_count));
+    assert!((12..=18).contains(&spill_count));
2010YOUY01 (Contributor, Author):

This is caused by the above refactor: the old implementation forgot to update the statistics, so we missed several counts.

Contributor:

Should we also update the comments?

2010YOUY01 (Contributor, Author):

Updated in 92ca3b0.

if let Some(string_view_array) =
    array.as_any().downcast_ref::<StringViewArray>()
{
    // Compact the views into a fresh buffer before spilling
    let new_array = string_view_array.gc();
Contributor:

Will string_view_array.gc() affect performance when it is called many times?

Contributor:

Updated: if I understand correctly, it shouldn't affect performance too much, because we only retain the buffer data that is actually used?

before gc:

sorted_batch1 -> buffer1
              -> buffer2
sorted_batch2 -> buffer1
              -> buffer2

after gc:

sorted_batch1 -> new buffer (used data of buffer1, used data of buffer2)
sorted_batch2 -> new buffer (used data of buffer1, used data of buffer2)
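A runnable sketch of this effect (my own illustration): gc() on a small slice copies only the referenced bytes, while the un-compacted slice keeps the full shared buffers alive:

```rust
use arrow::array::{Array, StringViewArray};

fn main() {
    let long = "x".repeat(64); // > 12 bytes, so stored in a payload buffer
    let array = StringViewArray::from_iter_values((0..1000).map(|_| long.as_str()));

    // A 10-row slice still references the payload buffers of all 1000 rows...
    let sliced = array.slice(0, 10);
    // ...while gc() copies just the 10 referenced strings into a new buffer.
    let compacted = sliced.gc();

    println!(
        "before gc: {} bytes, after gc: {} bytes",
        sliced.get_buffer_memory_size(),
        compacted.get_buffer_memory_size()
    );
}
```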

2010YOUY01 (Contributor, Author) commented Feb 24, 2025:

Good point, there is some inefficiency here; I filed apache/arrow-rs#7184. Once that's done on the arrow side, we can remove the copies here to speed this up.
IMO it won't cause a regression for DataFusion: this is only done when a spill is triggered, and if we don't copy here it can cause larger inefficiency or fail some memory-limited sort queries.

tustvold (Contributor):

FWIW I think this compaction logic probably makes sense as part of the IPCWriter, as opposed to a workaround here - apache/arrow-rs#7185

alamb (Contributor) left a comment:

Looks good to me -- thank you @2010YOUY01 and @zhuqi-lucas

I agree this would be better in the arrow IPC writer, so leaving a link to that ticket in the code comments would be good, I think.


2010YOUY01 (Contributor, Author):

Thank you all for the feedback. I have addressed the review comments (and also added a small further simplification to the refactor).

alamb (Contributor) commented Feb 26, 2025:

🚀

alamb merged commit 99c811a into apache:main on Feb 26, 2025
24 checks passed