Cascaded spill merge and re-spill #15610

Open · wants to merge 6 commits into main

Conversation

2010YOUY01
Contributor

Which issue does this PR close?

Rationale for this change

Background for memory-limited sort execution

See figures in https://github.com/apache/datafusion/blob/main/datafusion/physical-plan/src/sorts/sort.rs

Current limitation for reading back spill files

Suppose the memory buffer can only hold 10 batches, but 900 files have been spilled: the current implementation will try to merge all 900 files at once and fail the query.
However, the query could still make progress by merging only 10 files at a time and re-spilling, until 10 or fewer spill files remain in total, and finally reading them back and merging them as the output.

High-level approach of this PR

Added one configuration option for the max spill merge degree (not implemented yet; in the POC it's a hard-coded const MAX_SPILL_MERGE_DEGREE for simplicity)
At the final stage of external sort, given the initial spill files to merge, perform a multi-pass read-merge-respill operation; the number of merged spill files produced by the next pass is determined by the closest power of MAX_SPILL_MERGE_DEGREE

Example:
Initial spill files to merge: 900
max merge degree: 10

pass1: merge 900 files to 100 (closest power of 10) files
pass2: 100 -> 10
pass3: 10 -> final single output

Inside each pass, the number of files to merge in each step is split as evenly as possible, while always staying <= the max merge degree; see the implementation for details.
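The pass planning above can be sketched as follows. This is a minimal illustration; `next_pass_target` and `split_into_groups` are hypothetical names, not the actual functions in this PR:

```rust
/// Largest power of `degree` strictly smaller than `num_files`:
/// the number of files the next pass should produce.
fn next_pass_target(num_files: usize, degree: usize) -> usize {
    assert!(degree >= 2);
    let mut target = 1;
    while target * degree < num_files {
        target *= degree;
    }
    target
}

/// Split `num_files` into `num_groups` merge steps as evenly as possible;
/// every group size stays <= degree because target * degree >= num_files.
fn split_into_groups(num_files: usize, num_groups: usize) -> Vec<usize> {
    let base = num_files / num_groups;
    let rem = num_files % num_groups;
    (0..num_groups)
        .map(|i| if i < rem { base + 1 } else { base })
        .collect()
}

fn main() {
    // 900 files, degree 10: passes produce 100, then 10, then 1 file.
    assert_eq!(next_pass_target(900, 10), 100);
    assert_eq!(next_pass_target(100, 10), 10);
    assert_eq!(next_pass_target(10, 10), 1);

    // Each of the 100 first-pass steps merges exactly 9 files (9 <= 10).
    assert_eq!(split_into_groups(900, 100), vec![9; 100]);
}
```

With 900 files and degree 10, the first pass runs 100 steps of 9-way merges, so every step stays within the merge-degree limit.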

What changes are included in this PR?

Updated sort.rs for multi-pass spill reading. The entry point for the described logic is the function merge_spilled_files_multi_pass()

Are these changes tested?

To be done; I think it means adding a varying max_spill_merge_degree to the sort fuzzer.

Are there any user-facing changes?

No

@github-actions github-actions bot added the core Core DataFusion crate label Apr 7, 2025
@rluvaton
Contributor

rluvaton commented Apr 7, 2025

BTW, row_hash uses the sort-preserving merge stream as well and has a similar problem; I think the solution should live outside the sort exec

}

/// Maximum number of spill files to merge in a single pass
const MAX_SPILL_MERGE_DEGREE: usize = 8;
Contributor

This should be configurable based on the number of available tokio blocking tasks I think

Contributor Author

Yeah, it's only for POC.

@rluvaton
Contributor

rluvaton commented Apr 7, 2025

Also, to have a fully working larger than memory sort, you need to spill in

.try_grow(get_record_batch_memory_size(&batch))?;

In case the memory reservation is failing

@2010YOUY01
Contributor Author

This PR and #15608 both implement multi-level merge for SortExec, for different purposes:

This PR

  • This PR lets memory-limited sort queries run even when the memory budget is very tight (i.e., num-spill-files * batch-size > memory limit)
  • Always re-spill for each merge step

#15608

  • Reduces merge degree for performance (reading spills will stall for a shorter amount of time)
  • Never re-spill

I think we should refine the existing PR to be:

  1. Prioritize stable execution of memory-limited queries over performance.
    • I think the optimizations mentioned below are somewhat complex. We should first resolve the remaining known correctness issues in external sort, strengthen the tests, and then proceed with later optimizations more confidently.
  2. Extensible for future performance optimization

To summarize, I think this PR needs to be restructured to make future optimizations easier to implement. I don’t have a solid idea yet, so I’ll keep thinking and also wait to hear more opinions.

@2010YOUY01
Contributor Author

BTW, row_hash uses the sort-preserving merge stream as well and has a similar problem; I think the solution should live outside the sort exec

I think the spilling-related problem in external aggregation is still larger-than-memory sort: the current aggregation implementation re-implements the sort-spilling logic that is already done in ExternalSorter. So this implementation should be reusable by row_hash (with some modifications)

@2010YOUY01
Contributor Author

Also, to have a fully working larger than memory sort, you need to spill in

.try_grow(get_record_batch_memory_size(&batch))?;

In case the memory reservation is failing

Could you elaborate? I don't get it.

@alamb alamb mentioned this pull request Apr 7, 2025
// Recursively merge spilled files
// ──────────────────────────────────────────────────────────────────────
let spill_files = std::mem::take(&mut self.finished_spill_files);
let spill_files = self.recursively_merge_spill_files(spill_files).await?;
Contributor

Maybe we can avoid recursion here if we don't have to use it?

The maximum number of passes in multi-pass external merge sort is "Total Passes = 1 (initial run) + ⌈log_d (number of runs)⌉" for a d-way merge. We can use this information to convert the recursion into a loop (recursion has performance disadvantages compared to a loop).
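The suggested loop version could look roughly like this sketch. The helper name is hypothetical, and it uses simple ceiling division per pass rather than the PR's closest-power targets, but it illustrates how the ⌈log_d⌉ bound lets a plain loop replace the recursion:

```rust
/// Number of re-spill passes needed before a final d-way merge can run:
/// loop until the remaining file count fits within `degree`.
fn merge_passes_needed(mut num_files: usize, degree: usize) -> usize {
    assert!(degree >= 2 && num_files >= 1);
    let mut passes = 0;
    while num_files > degree {
        // Each pass reduces the file count by up to a factor of `degree`.
        num_files = (num_files + degree - 1) / degree;
        passes += 1;
    }
    passes
}

fn main() {
    assert_eq!(merge_passes_needed(900, 10), 2); // 900 -> 90 -> 9, then final merge
    assert_eq!(merge_passes_needed(10, 10), 0);  // already mergeable at once
    assert_eq!(merge_passes_needed(11, 10), 1);  // 11 -> 2, then final merge
}
```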

}

/// Recursively merges and re-spills files until the number of spill files is ≤ MAX_SPILL_MERGE_DEGREE
async fn recursively_merge_spill_files(
qstommyshu (Contributor) commented Apr 9, 2025

I find this name misleading; it looks like this is not actually a recursive function. Maybe we can change it to something more descriptive?

Contributor Author

Good point, updated. I changed my mind midway through implementing this function.

let sort_exprs: LexOrdering = self.expr.iter().cloned().collect();

// ==== Doing sort-preserving merge on input partially sorted streams ====
let spm_stream = StreamingMergeBuilder::new()
Contributor

I wonder if this StreamingMergeBuilder uses heap under the hood, using heap is a common method to optimize external merge sort performance

Contributor Author

Yes, it's an in-house implementation of a loser-tree heap. If we don't limit the merge degree, this step is the current bottleneck for large sort queries; maybe there is some room to optimize inside 🤔

Contributor

It is already pretty well optimized (not that it couldn't be made better), but there isn't a lot of low-hanging fruit in my opinion

qstommyshu (Contributor) left a comment

Just some minor issues; the algorithm itself looks good to me in general. I can take a closer look at the details if needed (I'm not familiar with this part of the codebase yet, but I'll try my best to provide good review comments).

And some other thoughts:

  1. This is a pretty complicated program; maybe we should write some unit tests to make sure it doesn't break under future modifications?
  2. One idea to improve performance is to dynamically calculate the optimal merge degree based on file size and memory size, or maybe multi-thread the merge phase (not sure if that is feasible)

self.in_mem_sort_stream(self.metrics.baseline.intermediate())?;
debug!("SPM stream is constructed");
Contributor

Clean up debug logs if they are not needed?

alamb (Contributor) left a comment

This is very cool -- thank you @2010YOUY01 and @rluvaton and @qstommyshu

I think in general (can do as a follow on PR) we will need to introduce some parallelism as well to really maximize performance.

Specifically, the merge operation is fundamentally single threaded (the hot loop in the merge). Thus I would expect merging one set of 10 files to likely be CPU bottlenecked

So we probably would need to try and merge multiple sets of 10 files in parallel (to multiple different output files) before we either bottlenecked on CPU or on I/O throughput

What I think would really help make progress in this area is a benchmark. I filed a ticket to track this issue:

@2010YOUY01
Contributor Author

Thank you all for the review!

@qstommyshu I agree with the implementation-level feedback. I will address it in the refactor.

@alamb Regarding parallel merging: I was thinking that if max_spill_merge_degree is configured to 10, then memory is limited so that each partition can only hold 10 batches at a time, so parallel merging is not possible in that case.
However, @rluvaton's PR has inspired me: it's possible that each operator can hold 100 batches at a time under the memory limit, but we might still want to merge them 10 at a time for performance.

I think the next steps are

  1. Contribute benchmarks for external sort.
  2. Refactor this PR to avoid always re-spilling, and also do parallel merging when possible.

@2010YOUY01 2010YOUY01 marked this pull request as draft April 10, 2025 04:11
@2010YOUY01
Contributor Author

And some other thoughts:

  1. This is a pretty complicated program, maybe we should write some unit tests to make sure it doesn't break for future modifications?

I'll try to do most of the testing and cover edge cases in integration tests at https://github.com/apache/datafusion/blob/main/datafusion/core/tests/fuzz_cases/sort_fuzz.rs and https://github.com/apache/datafusion/blob/main/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs, instead of doing extensive UTs.

I think we should promote tests to a higher level (SQL) when possible, because that API is much more stable and easier to manage. If a feature is tested mostly through unit tests, and someone later refactors the component away, those tests are likely to get lost; the refactorer might assume the feature is already covered by integration tests.

I first heard this idea in a talk by the DuckDB developers https://youtu.be/BgC79Zt2fPs?si=WiziGqJ8Dlz6-MMW

@alamb
Contributor

alamb commented Apr 10, 2025

Yes I totally agree when possible SQL (or dataframe) is a better level to test at (and because it is the API that most users care about, not the internal details)

@github-actions github-actions bot added documentation Improvements or additions to documentation sqllogictest SQL Logic Tests (.slt) common Related to common crate execution Related to the execution crate labels Apr 12, 2025
@2010YOUY01
Contributor Author

2010YOUY01 commented Apr 12, 2025

I didn't implement the parallel-merge optimization for now. My major concern is that it requires one extra configuration option, and users would have to learn and correctly set 2 configs for each individual query to enable the most efficient cascaded spill-merge execution (see the intended optimization section below for what those 2 configs are), which is not ideal.
So I'd like to defer the implementation a bit, to think about whether there are simpler approaches (or maybe collect stats internally and auto-tune the related configs).
Also, I think the current implementation is good enough to cover common cases (by a rough estimate, sorting the TPCH-SF1000 lineitem table with 16 GB of memory only requires one round of re-spill).

Here is the optimization I originally had in mind; I'll put it into a separate issue if it makes sense.

Example scenario

For one partition's SortExec, 100 runs are spilled, and we set sort_max_spill_merge_degree to 4

Current Implementation

Each time it merges 4 existing spills into one combined spill file, until there are <= 4 spills total and the final result can be produced.
For each entry, the number of re-spills will be $\lfloor \log_4 100 \rfloor = 3$

Intended optimization

If the memory pool is enough to hold more batches at a time (while sort_max_spill_merge_degree is still limited to 4, in case a too-large merge degree hurts performance in some cases):
One additional config sort_buffer_batch_capacity is introduced, set to 16 in the above example; the execution will then look like this:
Inside each merge step, 16 spill files will be combined and re-spilled. Each entry only needs to be re-spilled $\lfloor \log_{16} 100 \rfloor = 1$ time.
With this approach, we can achieve an optimal re-spill count and also enable parallel merging.
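As a sanity check of the re-spill counts quoted above, here is a small hypothetical helper (not from the PR) that counts how many re-spill rounds a run goes through before the final merge:

```rust
/// Re-spill rounds per entry: smallest r such that degree^(r+1) >= num_runs.
/// Matches the floor(log_degree(num_runs)) figures quoted in the examples.
fn respill_rounds(num_runs: usize, degree: usize) -> usize {
    assert!(degree >= 2 && num_runs >= 1);
    let mut rounds = 0;
    let mut reach = degree; // runs mergeable with `rounds` re-spills
    while reach < num_runs {
        reach *= degree;
        rounds += 1;
    }
    rounds
}

fn main() {
    assert_eq!(respill_rounds(100, 4), 3);  // floor(log_4 100) = 3
    assert_eq!(respill_rounds(100, 16), 1); // floor(log_16 100) = 1
}
```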

@2010YOUY01
Contributor Author

Benchmark results:
(I think there is no significant regression from an extra round of re-spill if it's running on a machine with fast SSDs)

Environment

MacBook Pro with M4 Pro chip (disk bandwidth is around 8000 MB/s)

Sorting 'thin' table

  1. Run datafusion-cli with cargo run --profile release-nonlto -- --mem-pool-type fair -m 100M
  2. Execute explain analyze select * from generate_series(1, 1000000000) as t1(v1) order by v1;

Main: 37s (merge ~170 spill files at once)
PR (with sort_max_spill_merge_degree = 16, and there is one round of re-spill): 43s
PR (with sort_max_spill_merge_degree = 10, two rounds of re-spill): 49s

Sorting 'fat' table

Run sort_tpch benchmark q7

        // Q7: 3 sort keys {(INTEGER, 7), (BIGINT, 10k), (BIGINT, 1.5M)} + 12 all other columns
        r#"
        SELECT l_linenumber, l_suppkey, l_orderkey, 
               l_partkey, l_quantity, l_extendedprice, l_discount, l_tax,
               l_returnflag, l_linestatus, l_shipdate, l_commitdate,
               l_receiptdate, l_shipinstruct, l_shipmode
        FROM lineitem
        ORDER BY l_linenumber, l_suppkey, l_orderkey
        "#,

Benchmark command

 cargo run --profile release-nonlto --bin dfbench -- sort-tpch -p /Users/yongting/Code/datafusion/benchmarks/data/tpch_sf10 -q 7 --memory-limit 1.2G

Notes:

  • target_partitions config is set to 14; later configurations and results depend on this setting.
  • For PR's benchmark runs, sort_max_spill_merge_degree is manually changed to 6, as a result:
    • under 1.2G memory limit, 1 round of re-spill will be triggered
    • under 500M memory limit, 2 rounds of re-spill happens

Result

Main (1.2G):

Q7 iteration 0 took 9374.7 ms and returned 59986052 rows
Q7 iteration 1 took 8117.6 ms and returned 59986052 rows
Q7 iteration 2 took 8549.1 ms and returned 59986052 rows
Q7 avg time: 8680.47 ms

Main (500M):

Fail with OOM

PR (1.2G):

Q7 iteration 0 took 10723.6 ms and returned 59986052 rows
Q7 iteration 1 took 12962.8 ms and returned 59986052 rows
Q7 iteration 2 took 11739.7 ms and returned 59986052 rows
Q7 avg time: 11808.71 ms

PR (500M):

Q7 iteration 0 took 16233.1 ms and returned 59986052 rows
Q7 iteration 1 took 18568.4 ms and returned 59986052 rows
Q7 iteration 2 took 19173.4 ms and returned 59986052 rows
Q7 avg time: 17991.67 ms

@2010YOUY01 2010YOUY01 marked this pull request as ready for review April 12, 2025 11:06
@2010YOUY01
Contributor Author

I have made the following updates:

  • Addressed review comments
  • Introduced a new configuration option for max merge degree
  • Added tests

It's ready for another look.

@2010YOUY01 2010YOUY01 changed the title POC: Cascaded spill merge and re-spill Cascaded spill merge and re-spill Apr 12, 2025
#[case(1)]
#[tokio::test]
async fn test_invalid_sort_max_spill_merge_degree(
#[case] sort_max_spill_merge_degree: usize,
Contributor

this #[case] syntax looks so elegant for writing repetitive tests

@@ -84,6 +84,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
| datafusion.execution.skip_physical_aggregate_schema_check | false | When set to true, skips verifying that the schema produced by planning the input of `LogicalPlan::Aggregate` exactly matches the schema of the input plan. When set to false, if the schema does not match exactly (including nullability and metadata), a planning error will be raised. This is used to workaround bugs in the planner that are now caught by the new schema verification step. |
| datafusion.execution.sort_spill_reservation_bytes | 10485760 | Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). |
| datafusion.execution.sort_in_place_threshold_bytes | 1048576 | When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. |
| datafusion.execution.sort_max_spill_merge_degree | 16 | When doing external sorting, the maximum number of spilled files to read back at once. Files read in the same merge step will be sort-preserving-merged and re-spilled, and the step will be repeated over multiple passes to reduce the number of spilled files, until a final sorted run can be produced. |
Contributor

Great attention to detail, for updating the user guide!

@qstommyshu
Contributor

Intended optimization

If the memory pool is enough to hold more batches at a time (while sort_max_spill_merge_degree is still limited to 4, in case a too-large merge degree hurts performance in some cases), one additional config sort_buffer_batch_capacity is introduced, set to 16 in the above example; the execution will look like: ...

Thanks for the clear explanation, that's a lot of great work, and it looks really cool!

@alamb
Contributor

alamb commented Apr 13, 2025

I plan to re-review this tomorrow

/// preserving-merged and re-spilled, and the step will be repeated to reduce
/// the number of spilled files in multiple passes, until a final sorted run
/// can be produced.
pub sort_max_spill_merge_degree: usize, default = 16
rluvaton (Contributor) commented Apr 13, 2025

I have a concern about this: there can still be a memory issue if the batches from all the streams together exceed the memory limit.

I have an implementation for this that is completely memory safe, and I will try to create a PR for it for inspiration.

The way to decide on the degree is actually to store, for each spill file, the largest amount of memory a single record batch took, and then, when deciding on the degree, simply grow the reservation until you can no longer.

The reason why I'm picky about this is that it is a new configuration that will be hard to deprecate or change
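The reservation-driven degree selection described in this comment might be sketched like so. Everything here is hypothetical illustration (the struct, field, and function names are invented); a plain integer budget stands in for a real memory reservation, with the `break` playing the role of `MemoryReservation::try_grow` returning an error:

```rust
struct SpillFile {
    /// Largest memory footprint of any single batch in this file,
    /// recorded while spilling.
    max_batch_mem: usize,
}

/// How many of `files` can be merged at once under `budget` bytes,
/// reserving each file's worst-case batch size before admitting it.
fn pick_merge_degree(files: &[SpillFile], budget: usize) -> usize {
    let mut reserved = 0usize;
    let mut degree = 0;
    for f in files {
        match reserved.checked_add(f.max_batch_mem) {
            Some(total) if total <= budget => {
                reserved = total;
                degree += 1;
            }
            // Analogous to try_grow() failing: stop growing the merge set.
            _ => break,
        }
    }
    // A 1-way "merge" makes no progress, so require at least 2.
    degree.max(2)
}

fn main() {
    let files: Vec<SpillFile> = (0..10)
        .map(|_| SpillFile { max_batch_mem: 64 * 1024 })
        .collect();
    // A 256 KiB budget admits 4 files whose worst-case batch is 64 KiB.
    assert_eq!(pick_merge_degree(&files, 256 * 1024), 4);
}
```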

Contributor

@2010YOUY01 and @alamb, I hope you look at #15700 before you merge this PR, to see what I mean

Contributor Author

The reason why I'm picky about this is that it is a new configuration that will be hard to deprecate or change

This is a solid point. This option is intended to be set manually, and it has to ensure (max_batch_size * per_partition_merge_degree * partition_count) < total_memory_limit; if it's set correctly for a query, the query should succeed.
The problem is the ever-growing number of configurations in DataFusion, and it seems impossible to set them all correctly. Enabling the parallel-merging optimization would require introducing yet another configuration, which I'm also trying to avoid (though the too-many-configs problem might be a harsh reality we must accept).
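The invariant quoted in this reply can be expressed as a quick sanity check (a hypothetical helper, not part of DataFusion):

```rust
/// True if (max_batch_size * merge_degree * partition_count) fits strictly
/// under the memory limit; overflow is treated as unsafe.
fn config_is_safe(
    max_batch_size: usize,
    merge_degree: usize,
    partition_count: usize,
    memory_limit: usize,
) -> bool {
    max_batch_size
        .checked_mul(merge_degree)
        .and_then(|v| v.checked_mul(partition_count))
        .map(|need| need < memory_limit)
        .unwrap_or(false)
}

fn main() {
    let mib = 1024 * 1024;
    // 8 MiB batches * degree 16 * 14 partitions ~= 1.75 GiB,
    // so a 1.2 GiB limit is not safe; degree 6 (~672 MiB) is.
    assert!(!config_is_safe(8 * mib, 16, 14, 1200 * mib));
    assert!(config_is_safe(8 * mib, 6, 14, 1200 * mib));
}
```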

@rluvaton
Contributor

rluvaton commented Apr 13, 2025

Also, to have a fully working larger than memory sort, you need to spill in

.try_grow(get_record_batch_memory_size(&batch))?;

In case the memory reservation is failing

Could you elaborate? I don't get it.

Maybe the description for #15700 might help

@2010YOUY01
Contributor Author

Also, to have a fully working larger than memory sort, you need to spill in

.try_grow(get_record_batch_memory_size(&batch))?;

In case the memory reservation is failing

Could you elaborate? I don't get it.

Maybe the description for #15700 might help

Thank you for providing an alternative approach.

I described my primary concern in #15700 (comment): I think it is not realistic to determine a batch's memory size after a spilling round trip, due to the implementation complexity. In such cases, if the estimation is off by a factor of 2, the actual memory usage could also increase by a factor of 2, which is not ideal.

@rluvaton
Contributor

Thank you. Can you please take the fuzz test that I created in my PR and add it to yours, making sure it passes? (It will require updating the row_hash.rs file.)

Labels
common Related to common crate core Core DataFusion crate documentation Improvements or additions to documentation execution Related to the execution crate sqllogictest SQL Logic Tests (.slt)

Successfully merging this pull request may close these issues.

A complete solution for stable and safe sort with spill
4 participants