Skip to content

feat: add multi level merge sort that will always fit in memory #15700

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 12 commits into
base: main
Choose a base branch
from

Conversation

rluvaton
Copy link
Contributor

@rluvaton rluvaton commented Apr 13, 2025

Which issue does this PR close?

Rationale for this change

We need merge sort that does not fail with out of memory

What changes are included in this PR?

Implemented multi level merge sort on top of SortPreservingMergeStream that spill intermediate result when not enough memory.

How does it work:

When using the MultiLevelMerge you provide in memory streams and spill files,
each spill file contain the memory size of the record batch with the largest memory consumption.

Why is this important?

SortPreservingMergeStream uses BatchBuilder which grow and shrink memory based on the record batches that it get. however if there is not enough memory it will just fail.

this solution will reserve beforehand for each spill file the worst case scenerio for the record batch size so there will be no way that there is not enough memory mid sorting.

it will also try to reduce buffer size and number of streams to the minimum when there is not enough memory and will only fail if there is not enough memory for holding 2 record batches with no buffering in the stream

It can also be easily adjusted to allow for predefined maximum memory to merge stream

Are these changes tested?

yes added fuzz test for aggregate and sort

Are there any user-facing changes?

not really


Related to #15610

@github-actions github-actions bot added the core Core DataFusion crate label Apr 13, 2025


#[tokio::test]
async fn test_low_cardinality() -> Result<()> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This fails on main on OOM


for batch in batches_to_spill {
in_progress_file.append_batch(&batch)?;

*max_record_batch_size =
(*max_record_batch_size).max(batch.get_actually_used_size());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's not realistic to correctly know a batch's size after a roundtrip of spilling and reading back, with this get_actually_used_size() implementation. The actual implementation might give us some surprise. The implementation can get even more complex in the future, for example we might implement extra encodings for #14078, and the memory size of a batch after reading back can be harder to estimate.

Copy link
Contributor Author

@rluvaton rluvaton Apr 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unless the actual array content before spill and after spill is different this function will always return the correct result regardless of the spill file format as we calculate the actual array content size.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There might be some type of arrays with complex internal buffer management, a simple example is:
Before spilling an StringView array has 10MB actual content, backed by 3 * 4MB buffer.
After spilling and reading back, the reader implementation decided to use 1 * 16MB buffer instead.
Different allocation policies caused different fragmentation status, and physical memory consumed varies.

Here are some real bugs found recently due to similar reasons (this explains why I'm worried about inconsistent memory size for logically equivalent batches):
#14644
#14823
#13377
Note they're only caused by primitive and string arrays, for more complex types like struct, array, or other nested types, I think it's even more likely to see such inconsistency.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm trying to reproduce that so I can better answer, how do you create that string view array so it will cause what you said?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So after looking at the code I came to the conclusion that this is still the closest there is to accurately estimating memory

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should not estimate, even if it's correct 99% of the time, IMO it's impossible to make sure it's always accurate for nested type's reader implementation. If the estimate is way off for edge cases, the bug would be hard to investigate.
If we want to follow this optimistic approach, the only required memory accounting I think is during buffering batches inside SortExec, and all the remaining memory-tracking code can be deleted to make the implementation much more simpler, the potential problem is unexpected behavior for non-primitive types (e.g. dictionary array's row format size can explode)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I added tests for every type to make sure the memory accounting is correct would you approve?

@rluvaton rluvaton marked this pull request as ready for review April 15, 2025 22:12
@alamb
Copy link
Contributor

alamb commented May 24, 2025

@2010YOUY01 and @ding-young I wonder if you can review this PR again to help @rluvaton get it merged?

Specifically if it needs more tests perhaps you can help identify which are needed

@ding-young
Copy link
Contributor

@alamb Sure! I may not be able to provide a detailed review right away, but I can definitely help by running the tests added in the PR locally and looking into memory accounting for the nested type that have been mentioned.

@2010YOUY01
Copy link
Contributor

@2010YOUY01 and @ding-young I wonder if you can review this PR again to help @rluvaton get it merged?

Specifically if it needs more tests perhaps you can help identify which are needed

I have some concerns about this PR's design direction (see more in #15700 (comment)), and I don't think it can be addressed by more extensive tests.
By the way, this PR serves as an alternative to #15610. It's ready for another review, except that it needs to merge main.

@rluvaton
Copy link
Contributor Author

@2010YOUY01 and @ding-young I wonder if you can review this PR again to help @rluvaton get it merged?

Specifically if it needs more tests perhaps you can help identify which are needed

I have some concerns about this PR's design direction (see more in #15700 (comment)), and I don't think it can be addressed by more extensive tests.

Why is that? You raised some concerns about miscalculating the size of the record batch, adding tests will make sure we are calculating correctly

@adriangb
Copy link
Contributor

adriangb commented Jul 1, 2025

In the interest of this valuable work not being lost, is there any way that #15700 (comment) could be addressed by a method that's not more tests? Could we calculate the actual batch sizes every time we load into memory? Even if possible that opens up questions of what to do if we load a batch and now exceed our memory budget, but maybe it's a path forward?

@ding-young
Copy link
Contributor

Hi @adriangb, thanks for raising this point. I'm currently reviewing both this PR and the other cascading merge sort PR (#15610). I'm not taking sides between the two approaches, but I agree that accurately estimating memory consumption is tricky considering issues discussed above and the fact that now compression is supported in spill files. We may need to think more about whether we can special-case scenarios where the memory size changes after spilling and reloading, or perhaps add some kind of backup logic to handle such situations more gracefully.

@ding-young
Copy link
Contributor

I've rebased this branch on the latest main and tested whether estimated size changes after we load RecordBatch which was compressed with lz4_frame into memory. The result of get_actually_used_size() was identical before and after (arrow-ipc StreamReader will return decoded array). Of course, since buffer allocations and copies happen internally during decoding, actual system memory usage (which DataFusion doesn't track) may temporarily be higher. Anyway, I've only tested for primitive type array + compression so I'll run a few more tests and try to see if I can reproduce any of the problematic cases discussed above.

Hi @adriangb, thanks for raising this point. I'm currently reviewing both this PR and the other cascading merge sort PR (#15610). I'm not taking sides between the two approaches, but I agree that accurately estimating memory consumption is tricky considering issues discussed above and the fact that now compression is supported in spill files. We may need to think more about whether we can special-case scenarios where the memory size changes after spilling and reloading, or perhaps add some kind of backup logic to handle such situations more gracefully.

@2010YOUY01
Copy link
Contributor

#15700 (comment)

I have a idea to fix this concern: adding a max merge degree configuration, if either
a. SPM's estimated memory exceed budget
b. configured max merge degree has reached
do a re-spill.

This approach I think has two advantages:

  1. If batch size bloat happens after spill and read back roundtrip (see feat: add multi level merge sort that will always fit in memory #15700 (comment)), if there is a hard merge degree limit to override the estimation, query can still finish.
  2. Also helpful to tune for speed: even we have enough memory to perform a very wide merge, limiting it to a smaller merge is still likely to run faster.

I (or possibly @ding-young) can handle this patch in a follow-up PR. I think we can move forward with this one—I’ll review it in the next few days.

@rluvaton
Copy link
Contributor Author

rluvaton commented Jul 4, 2025

So should I fix this PR conflicts? It seems like this pr has a chance to be merged

@ding-young
Copy link
Contributor

@rluvaton If you’d like, I can send a PR to your (fork's) branch that resolve merge conflicts since I already have one. Anyway there were only minor diffs to handle when I rebased your branch with main.

@rluvaton
Copy link
Contributor Author

rluvaton commented Jul 4, 2025

I would appreciate it, it would greatly help me

@ding-young
Copy link
Contributor

I would appreciate it, it would greatly help me

@rluvaton I opened a pr on your fork. Would you take a look when you have some time?

@github-actions github-actions bot added the physical-plan Changes to the physical-plan crate label Jul 7, 2025
@rluvaton
Copy link
Contributor Author

rluvaton commented Jul 7, 2025

I would appreciate it, it would greatly help me

@rluvaton I opened a pr on your fork. Would you take a look when you have some time?

I really appriciate the PR but the changes are too large for me to review so I just did it myself:

+95,274 −36,420

Copy link
Contributor

@2010YOUY01 2010YOUY01 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for this work. I left some thoughts, looking forward to your feedbacks.

Ok(())
}

struct RunSortTestWithLimitedMemoryArgs {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This utilities seems to be copied and pasted from aggregate_fuzz.rs, how about refactor it into a common module?

@@ -377,3 +388,335 @@ fn make_staggered_i32_utf8_batches(len: usize) -> Vec<RecordBatch> {

batches
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be great to add a top-level comment for this group of tests, like 'Testing spilling sort queries, with another memory-consuming neighbor (the mock operator with MemoryBehavior)'.
The same for aggregate fuzz tests (perhaps we can structure them into the same new file?)


/// Execution plan that return the stream on the call to `execute`. further calls to `execute` will
/// return an error
pub struct StreamExec {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think OneShotExec or OnceExec is more descriptive.

self.spill_state.is_stream_merging = true;
self.input = StreamingMergeBuilder::new()
.with_streams(streams)
.with_schema(schema)
.with_spill_manager(self.spill_state.spill_manager.clone())
.with_sorted_spill_files(std::mem::take(&mut self.spill_state.spills))
Copy link
Contributor

@2010YOUY01 2010YOUY01 Jul 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest to spill all in-memory batches (in streams) to disk, before this final merging step. Also, let the multi pass merge operator also only handle spill files, and don't have to handle in-mem batches and spills at the same time.

This is just a simplification for now, we can do a optimization to avoid this re-spill step in the future.

The issue is, without special handling, it's possible that in-mem batches will take most of the available memory budget, and leave only a very small memory part for multi-pass spilling to continue. This can cause slow downs or even prevent some cases to finish.

We're already doing this in sort executor, see:

// Sort `in_mem_batches` and spill it first. If there are many
// `in_mem_batches` and the memory limit is almost reached, merging
// them with the spilled files at the same time might cause OOM.
if !self.in_mem_batches.is_empty() {
self.sort_and_spill_in_mem_batches().await?;
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But let's let for example this:
I have 1 spill file and 1 in memory stream, now I will spill even though I don't need to

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but I think this is likely a small slowdown.
Comparing to 90% of the memory is taken by in-memory streams, and we have to use the remaining 10% to re-spill lots of spills, this case can cause huge slowdown.

// (reserving memory for the biggest batch in each stream)
// This is a hack
.with_reservation(
MemoryConsumer::new("merge stream mock memory")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should free the reservation and let the SPM operator use the global memory pool, to validate it can continue with the memory limit.

for spill in &self.sorted_spill_files {
// For memory pools that are not shared this is good, for other this is not
// and there should be some upper limit to memory reservation so we won't starve the system
match reservation.try_grow(spill.max_record_batch_memory * buffer_size) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SPM can use another *2 memory for its internal intermediate data, see

/// Estimate how much memory is needed to sort a `RecordBatch`.
///
/// This is used to pre-reserve memory for the sort/merge. The sort/merge process involves
/// creating sorted copies of sorted columns in record batches for speeding up comparison
/// in sorting and merging. The sorted copies are in either row format or array format.
/// Please refer to cursor.rs and stream.rs for more details. No matter what format the
/// sorted copies are, they will use more memory than the original record batch.
fn get_reserved_byte_for_record_batch(batch: &RecordBatch) -> usize {
// 2x may not be enough for some cases, but it's a good start.
// If 2x is not enough, user can set a larger value for `sort_spill_reservation_bytes`
// to compensate for the extra memory needed.
get_record_batch_memory_size(batch) * 2
}

Here we might need an additional *2 for the estimation.

/// otherwise it will return an error
fn get_sorted_spill_files_to_merge(
&mut self,
buffer_size: usize,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think size is ambiguous because it sounds like 'memory size in bytes', perhaps buffer_len?

@@ -220,12 +220,12 @@ struct ExternalSorter {

/// During external sorting, in-memory intermediate data will be appended to
/// this file incrementally. Once finished, this file will be moved to [`Self::finished_spill_files`].
in_progress_spill_file: Option<InProgressSpillFile>,
in_progress_spill_file: Option<(InProgressSpillFile, usize)>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's update the comment for this newly added usize.

let Some(expressions) = expressions else {
return internal_err!("Sort expressions cannot be empty for streaming merge");
};

if !sorted_spill_files.is_empty() {
Copy link
Contributor

@2010YOUY01 2010YOUY01 Jul 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current code flow is

Sort/Aggregate
-> StreamingMerge
-> MultiLevelMerge (with potential internal re-spills to ensure the final merge can proceed under memory limit)
-> StreamingMerge

Then the first StreamingMerge -> MultiLevelMerge indirection implemented here seems redundant.

How about let sort/aggregate directly use MultiLevelMergeBuilder instead?

use arrow::downcast_primitive_array;
use arrow_schema::DataType;

/// TODO - NEED TO MOVE THIS TO ARROW
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use in-memory size for now, both for simplicity and to stay conservative in our memory estimation?
Once it's implemented on the Arrow side, we can consider switching to it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate physical-plan Changes to the physical-plan crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

A complete solution for stable and safe sort with spill
5 participants