Optimize TopK with threshold filter ~1.4x speedup #15697

Open
wants to merge 20 commits into
base: main

@Dandandan Dandandan commented Apr 13, 2025

Which issue does this PR close?

Rationale for this change

This optimizes our TopK by filtering early based on the threshold values, which avoids the comparatively slow conversion to Row values for rows that cannot enter the top K.
Pushing the filter down to the scan yields larger gains when it is possible, but that requires that pushdown is supported and enabled, that the source has statistics that allow pruning (or that filter pushdown is enabled), and that the TopK happens directly after a scan.

┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃     main ┃ improve_topk ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ Q1           │  24.28ms │      16.45ms │ +1.48x faster │
│ Q2           │  26.17ms │      20.65ms │ +1.27x faster │
│ Q3           │  79.67ms │      54.37ms │ +1.47x faster │
│ Q4           │  27.44ms │      21.16ms │ +1.30x faster │
│ Q5           │  17.38ms │      13.30ms │ +1.31x faster │
│ Q6           │  30.91ms │      28.07ms │ +1.10x faster │
│ Q7           │  74.48ms │      73.20ms │     no change │
│ Q8           │  76.44ms │      45.69ms │ +1.67x faster │
│ Q9           │  88.62ms │      58.62ms │ +1.51x faster │
│ Q10          │ 128.54ms │     100.68ms │ +1.28x faster │
│ Q11          │  72.47ms │      57.48ms │ +1.26x faster │
└──────────────┴──────────┴──────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━┓
┃ Benchmark Summary           ┃          ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━┩
│ Total Time (main)           │ 646.41ms │
│ Total Time (improve_topk)   │ 489.66ms │
│ Average Time (main)         │  58.76ms │
│ Average Time (improve_topk) │  44.51ms │
│ Queries Faster              │       10 │
│ Queries Slower              │        0 │
│ Queries with No Change      │        1 │
└─────────────────────────────┴──────────┘
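
For reference, the aggregate speedup implied by the summary above can be recomputed from the totals. This is only a small arithmetic sketch: the `speedup` helper is hypothetical, and the numbers are copied from the two tables above.

```rust
/// Hypothetical helper: ratio of baseline time to new time.
fn speedup(baseline_ms: f64, new_ms: f64) -> f64 {
    baseline_ms / new_ms
}

fn main() {
    // Totals from the summary table (main vs. improve_topk).
    let overall = speedup(646.41, 489.66);
    // Q8 is the query with the largest per-query change in the table.
    let q8 = speedup(76.44, 45.69);
    println!("overall: {:.2}x, Q8: {:.2}x", overall, q8);
}
```

By total time the change works out to roughly 1.3x on this run, with individual queries ranging from no change (Q7) to about 1.7x (Q8).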

Some ClickBench queries also seem to be improved:

--------------------
Benchmark clickbench_partitioned.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃       main ┃ improve_topk ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0     │     1.32ms │       1.23ms │ +1.07x faster │
│ QQuery 1     │    24.07ms │      23.99ms │     no change │
│ QQuery 2     │    66.63ms │      64.99ms │     no change │
│ QQuery 3     │    54.47ms │      52.82ms │     no change │
│ QQuery 4     │   571.78ms │     480.98ms │ +1.19x faster │
│ QQuery 5     │   634.56ms │     621.52ms │     no change │
│ QQuery 6     │     1.05ms │       1.07ms │     no change │
│ QQuery 7     │    27.49ms │      28.62ms │     no change │
│ QQuery 8     │   593.15ms │     599.61ms │     no change │
│ QQuery 9     │   875.49ms │     859.69ms │     no change │
│ QQuery 10    │   193.04ms │     192.22ms │     no change │
│ QQuery 11    │   220.40ms │     212.42ms │     no change │
│ QQuery 12    │   764.13ms │     785.23ms │     no change │
│ QQuery 13    │  1006.88ms │    1041.78ms │     no change │
│ QQuery 14    │   673.52ms │     681.64ms │     no change │
│ QQuery 15    │   713.13ms │     706.34ms │     no change │
│ QQuery 16    │  1447.87ms │    1385.00ms │     no change │
│ QQuery 17    │  1325.88ms │    1285.75ms │     no change │
│ QQuery 18    │  2599.18ms │    2571.18ms │     no change │
│ QQuery 19    │    48.23ms │      49.82ms │     no change │
│ QQuery 20    │   950.18ms │     956.24ms │     no change │
│ QQuery 21    │  1135.24ms │    1164.32ms │     no change │
│ QQuery 22    │  1838.18ms │    1858.80ms │     no change │
│ QQuery 23    │  6291.70ms │    6117.67ms │     no change │
│ QQuery 24    │   387.35ms │     370.48ms │     no change │
│ QQuery 25    │   358.61ms │     300.32ms │ +1.19x faster │
│ QQuery 26    │   435.01ms │     366.38ms │ +1.19x faster │
│ QQuery 27    │  1456.11ms │    1410.06ms │     no change │
│ QQuery 28    │ 11483.18ms │   11470.65ms │     no change │
│ QQuery 29    │   433.16ms │     441.81ms │     no change │
│ QQuery 30    │   590.06ms │     602.67ms │     no change │
│ QQuery 31    │   595.45ms │     611.93ms │     no change │
│ QQuery 32    │  2735.39ms │    2203.37ms │ +1.24x faster │
│ QQuery 33    │  2928.70ms │    2918.67ms │     no change │
│ QQuery 34    │  3165.39ms │    3146.05ms │     no change │
│ QQuery 35    │   912.50ms │     911.56ms │     no change │
│ QQuery 36    │    83.08ms │      84.13ms │     no change │
│ QQuery 37    │    38.31ms │      38.61ms │     no change │
│ QQuery 38    │    83.66ms │      82.82ms │     no change │
│ QQuery 39    │   138.21ms │     139.02ms │     no change │
│ QQuery 40    │    28.63ms │      28.20ms │     no change │
│ QQuery 41    │    27.61ms │      28.06ms │     no change │
│ QQuery 42    │    23.49ms │      22.86ms │     no change │
└──────────────┴────────────┴──────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary           ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (main)           │ 47961.50ms │
│ Total Time (improve_topk)   │ 46920.59ms │
│ Average Time (main)         │  1115.38ms │
│ Average Time (improve_topk) │  1091.18ms │
│ Queries Faster              │          5 │
│ Queries Slower              │          0 │
│ Queries with No Change      │         38 │
└─────────────────────────────┴────────────┘

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

// If the heap doesn't have k elements yet, we can't create thresholds
match self.heap.max() {
Some(max_row) => {
// Get the batch that contains the max row
Contributor Author

Took a bit of code from @adriangb

Contributor Author

Some of it can probably be combined once the dynamic filter for TopK is ready.

@Dandandan Dandandan changed the title Optimize TopK with filter Optimize TopK with filter Apr 13, 2025
@Dandandan Dandandan changed the title Optimize TopK with filter Optimize TopK with filter ~1.4x Apr 13, 2025
@Dandandan Dandandan changed the title Optimize TopK with filter ~1.4x Optimize TopK with filter ~1.4x faster Apr 13, 2025
@adriangb adriangb left a comment

If I understand correctly, the idea is to basically do the same thing we're going to do for the dynamic filters, but do the filtering inside of TopK to avoid some extra work. Is that correct? If so, it sounds like a great idea, and we're going to be able to reuse a lot of the code.

Dandandan commented Apr 13, 2025

If I understand correctly, the idea is to basically do the same thing we're going to do for the dynamic filters, but do the filtering inside of TopK to avoid some extra work. Is that correct? If so, it sounds like a great idea, and we're going to be able to reuse a lot of the code.

Yeah, that's totally correct! The gains won't be as impressive as with a dynamic filter that can be pushed down to a scan, but it still avoids work in TopK by not having to convert the sort keys of filtered-out rows to row format.
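
A minimal sketch of that idea, using only the standard library. Everything here is a hypothetical stand-in rather than the PR's code: `TopKSketch`, a single ascending `i64` sort column, and a `rows_converted` counter that marks where the expensive row-format conversion would happen.

```rust
use std::collections::BinaryHeap;

/// Hypothetical stand-in for TopK over one ascending i64 sort column.
struct TopKSketch {
    k: usize,
    /// Max-heap, so `peek()` is the worst of the rows currently kept.
    heap: BinaryHeap<i64>,
    /// Counts rows that reached the (simulated) expensive row-format path.
    rows_converted: usize,
}

impl TopKSketch {
    fn new(k: usize) -> Self {
        TopKSketch { k, heap: BinaryHeap::new(), rows_converted: 0 }
    }

    /// The threshold only exists once the heap holds k rows.
    fn threshold(&self) -> Option<i64> {
        if self.k > 0 && self.heap.len() == self.k {
            self.heap.peek().copied()
        } else {
            None
        }
    }

    fn insert_batch(&mut self, batch: &[i64]) {
        // Read the threshold once per batch, before touching any row.
        let threshold = self.threshold();
        for &value in batch {
            // Cheap check on the raw value: a row that is not strictly
            // better than the threshold cannot change the result.
            if let Some(t) = threshold {
                if value >= t {
                    continue;
                }
            }
            // Stand-in for the costly conversion plus heap insertion.
            self.rows_converted += 1;
            self.heap.push(value);
            if self.heap.len() > self.k {
                self.heap.pop();
            }
        }
    }

    fn into_sorted(self) -> Vec<i64> {
        self.heap.into_sorted_vec()
    }
}

fn main() {
    let mut topk = TopKSketch::new(3);
    topk.insert_batch(&[50, 10, 40, 30, 20]); // no threshold yet: all rows converted
    topk.insert_batch(&[90, 80, 5, 70, 60]); // threshold is 30: only `5` is converted
    println!("rows converted: {}", topk.rows_converted);
    println!("top 3: {:?}", topk.into_sorted());
}
```

In this sketch the second batch skips four of its five rows before any conversion work, which is the kind of saving described above.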

adriangb commented Apr 13, 2025

Nice! We can even wire it up with the filter pushdown so that if an operator under us "absorbs" the filter (eg it got pushed down to the scan) we skip doing this internally.

But 1.4x faster is a great reason to merge this and re-use the code later.

@Dandandan
Contributor Author

Nice! We can even wire it up with the filter pushdown so that if an operator under us "absorbs" the filter (eg it got pushed down to the scan) we skip doing this internally.

Yeah, that would be useful to avoid filtering twice, and it's the way to go 👍

@adriangb
Contributor

@Dandandan will be happy to review once CI is passing 😄

Dandandan commented Apr 13, 2025

@adriangb FYI CI is passing, it's ready for review.
I had to make some changes to the filter that is applied so that it respects lexicographic ordering (which made Q7 lose the speedup), but from the benchmarks I can see it still looks like a big improvement. I filed #15698 to support multiple columns and to use BinaryExpr to enable some further optimizations.
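
One way to read the lexicographic-ordering constraint, sketched under assumptions: when the sort has several columns but only the first one is filtered, rows that tie with the threshold on the first column have to be kept, because a later column may still place them ahead of the threshold row. The helper `may_enter_top_k` is hypothetical and assumes ascending `i64` keys; it is not taken from the PR's diff.

```rust
/// Hypothetical helper: decide from the first sort column alone whether a row
/// may still enter the top K (ascending order assumed).
fn may_enter_top_k(first_col: i64, threshold_first_col: i64, num_sort_cols: usize) -> bool {
    if num_sort_cols > 1 {
        // Ties on the first column can be broken by later columns, so keep them.
        first_col <= threshold_first_col
    } else {
        // Single sort column: a tie cannot improve the result.
        first_col < threshold_first_col
    }
}

fn main() {
    // Threshold row and a candidate that ties on the first column.
    let threshold = (5_i64, 9_i64);
    let candidate = (5_i64, 1_i64);

    // Rust tuples compare lexicographically, so the candidate is better.
    assert!(candidate < threshold);
    // A strict `<` on the first column alone would have dropped it.
    assert!(may_enter_top_k(candidate.0, threshold.0, 2));
    assert!(!may_enter_top_k(candidate.0, threshold.0, 1));
    println!("tie on the first column is kept for multi-column sorts");
}
```

Keeping ties makes the filter less selective, which may be part of why a query with many equal first-column values sees less benefit.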

@Dandandan Dandandan changed the title Optimize TopK with filter ~1.4x faster Optimize TopK with filter ~1.4x speedup Apr 13, 2025
@Dandandan Dandandan changed the title Optimize TopK with filter ~1.4x speedup Optimize TopK with threshold filter ~1.4x speedup Apr 13, 2025
@adriangb
Contributor

I'll take a look tomorrow! Why do we have to use only the first column? Is it just to break up the change into smaller units? We had multi-column support working in the now closed PR that added it.

@Dandandan
Contributor Author

I'll take a look tomorrow! Why do we have to use only the first column? Is it just to break up the change into smaller units? We had multi-column support working in the now closed PR that added it.

Thanks!
Time was up yesterday.
I see your PR also handled it.

I think it is not super hard to add support for all columns, but I want to benchmark the change well too. Since the first column(s) filter out most of the rows, the gains from adding more filters become smaller, and with many rows it might be faster to filter on only the first few sort columns instead of on all of them.
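
The diminishing-returns argument can be illustrated with a small synthetic example. The data, the threshold, and the `count_survivors` helper are all made up for illustration; real distributions will behave differently.

```rust
/// Hypothetical helper: returns how many rows survive a filter on the first
/// column only, and how many survive a full lexicographic comparison.
fn count_survivors(rows: &[(u32, u32)], threshold: (u32, u32)) -> (usize, usize) {
    // First column only: ties must be kept, hence `<=`.
    let first_only = rows.iter().filter(|r| r.0 <= threshold.0).count();
    // All sort columns: strict lexicographic comparison of the whole key.
    let all_cols = rows.iter().filter(|&&r| r < threshold).count();
    (first_only, all_cols)
}

fn main() {
    // 1000 synthetic rows: a in 0..100, b in 0..10.
    let mut rows = Vec::new();
    for a in 0..100u32 {
        for b in 0..10u32 {
            rows.push((a, b));
        }
    }
    let (first_only, all_cols) = count_survivors(&rows, (10, 5));
    println!("first column only: {} of {} rows survive", first_only, rows.len());
    println!("all columns:       {} of {} rows survive", all_cols, rows.len());
}
```

On this data the first column removes 890 of 1000 rows, while also checking the second column removes only 5 more, so the extra comparison work buys little.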

}
let filter_predicate = FilterBuilder::new(&filter);
let filter_predicate = if sort_keys.len() > 1 {
filter_predicate.optimize().build()
Contributor

Could you add some comments to explain this optimize()? The original doc is not super clear I think.
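
As far as I understand the arrow documentation, `FilterBuilder::optimize` precomputes a representation of the boolean mask that is cheaper to apply repeatedly, and it notes limited benefit when only a single array is filtered. That would explain calling it only when there is more than one sort key. The sketch below shows the general idea with the standard library only; `PrecomputedFilter` is a hypothetical stand-in and not arrow's implementation.

```rust
/// Hypothetical, simplified stand-in for an "optimized" filter: the boolean
/// mask is scanned once and the selected positions are stored for reuse.
struct PrecomputedFilter {
    indices: Vec<usize>,
}

impl PrecomputedFilter {
    /// One pass over the mask, paid once regardless of the number of columns.
    fn from_mask(mask: &[bool]) -> Self {
        let indices = mask
            .iter()
            .enumerate()
            .filter_map(|(i, &keep)| if keep { Some(i) } else { None })
            .collect();
        PrecomputedFilter { indices }
    }

    /// Applies the stored selection to a column.
    /// Assumes the column is at least as long as the mask (panics otherwise).
    fn apply<T: Clone>(&self, column: &[T]) -> Vec<T> {
        self.indices.iter().map(|&i| column[i].clone()).collect()
    }
}

fn main() {
    let filter = PrecomputedFilter::from_mask(&[true, false, true, false]);
    // The same precomputed selection is reused for every sort-key column.
    let col_a = filter.apply(&[10, 20, 30, 40]);
    let col_b = filter.apply(&["a", "b", "c", "d"]);
    println!("{:?} {:?}", col_a, col_b);
}
```

The precomputation costs extra memory and an extra pass, so it only pays off when the same mask is applied to several arrays.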

@@ -212,6 +212,10 @@ main() {
# same data as for tpch
data_tpch "1"
;;
sort_tpch_limit)
Contributor

I'm not sure what the best name would be, but I feel it would be useful for discoverability to have topk in it. tpch_topk? sort_tpch_topk?

Contributor

Can we please also add a description of this benchmark in https://github.com/apache/datafusion/tree/main/benchmarks#benchmarks ?

@adriangb adriangb left a comment

I think this would be nicer (and tie in better with future work 😉) if we essentially followed the structure of #15301 but do the filtering in TopK or SortExec:

  1. Keep track of `thresholds: Arc<RwLock<Vec<Option<ScalarValue>>>>` and `filter: Option<Arc>` on `TopK`.
  2. For each batch, pass it through the existing filter, if any, and exit early if no rows remain.
  3. If we updated our heap, propagate the update to thresholds and filter.
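
The three steps above could be sketched roughly as follows. This is a simplified, standard-library-only illustration: `Scalar` stands in for `ScalarValue`, the filter is folded into a comparison against the stored threshold, and `TopKWithThresholds` with its single ascending sort column and `k > 0` are assumptions, not the structure of #15301.

```rust
use std::collections::BinaryHeap;
use std::sync::{Arc, RwLock};

/// Stand-in for ScalarValue (assumption: one ascending i64 sort column).
type Scalar = i64;

struct TopKWithThresholds {
    k: usize, // assumed to be greater than zero
    heap: BinaryHeap<Scalar>,
    /// Step 1: shared thresholds, one slot per sort column, None until the heap is full.
    thresholds: Arc<RwLock<Vec<Option<Scalar>>>>,
}

impl TopKWithThresholds {
    fn new(k: usize) -> Self {
        TopKWithThresholds {
            k,
            heap: BinaryHeap::new(),
            thresholds: Arc::new(RwLock::new(vec![None])),
        }
    }

    /// Returns the number of rows that survived the filter.
    fn insert_batch(&mut self, batch: &[Scalar]) -> usize {
        // Step 2: pass the batch through the existing filter, if any.
        let threshold = self.thresholds.read().unwrap()[0];
        let survivors: Vec<Scalar> = batch
            .iter()
            .copied()
            .filter(|v| threshold.map_or(true, |t| *v < t))
            .collect();
        if survivors.is_empty() {
            return 0; // exit early: nothing can enter the heap
        }

        let mut updated = false;
        for &v in &survivors {
            if self.heap.len() < self.k {
                self.heap.push(v);
                updated = true;
            } else if v < *self.heap.peek().unwrap() {
                self.heap.pop();
                self.heap.push(v);
                updated = true;
            }
        }

        // Step 3: propagate heap updates to the shared thresholds.
        if updated && self.heap.len() == self.k {
            self.thresholds.write().unwrap()[0] = self.heap.peek().copied();
        }
        survivors.len()
    }
}

fn main() {
    let mut topk = TopKWithThresholds::new(2);
    // Another operator could hold a clone of this handle and read the thresholds.
    let shared = Arc::clone(&topk.thresholds);
    topk.insert_batch(&[9, 3, 7]);
    println!("threshold after first batch: {:?}", shared.read().unwrap()[0]);
    let survived = topk.insert_batch(&[8, 10, 7]);
    println!("rows surviving second batch: {}", survived);
}
```

Keeping the thresholds behind an `Arc<RwLock<..>>` is what would let a scan or another operator observe the same values later, which seems to be the tie-in with future pushdown work mentioned here.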

@alamb alamb left a comment

Thanks @Dandandan and @2010YOUY01 and @adriangb and @geoffreyclaude !

One thing I was wondering about for this PR is how much will it help once we implement actual topk filter pushdown into the scan (aka #15037)

I am thinking that the topk filter pushdown will already filter out rows that are known not to be in the topK

Specifically, once we implement topk filter pushdown the rows should already be filtered and so checking again in the TopK itself won't add any benefit, will it?

@adriangb
Contributor

Thanks @Dandandan and @2010YOUY01 and @adriangb and @geoffreyclaude !

One thing I was wondering about for this PR is how much will it help once we implement actual topk filter pushdown into the scan (aka #15037)

I am thinking that the topk filter pushdown will already filter out rows that are known not to be in the topK

Specifically, once we implement topk filter pushdown the rows should already be filtered and so checking again in the TopK itself won't add any benefit, will it?

Yes that's right for Parquet, but not all data sources support filter pushdown, so there's still benefit for those. But yeah, I'm hoping we can structure this in a way that we get an immediate win that justifies the change but also introduces all of the code necessary for filter push down later on.

Dandandan commented Apr 15, 2025

I am thinking that the topk filter pushdown will already filter out rows that are known not to be in the topK

Yes, in those cases it might not add anything, but it is still useful in the following cases:

  • Filter pushdown not enabled (this is not yet default)
  • Stats not enabled, or not distributed in a way that allows effective pruning
  • Non Parquet sources
  • TopK on a plan that doesn't allow pushing down the filter to a source (i.e. most plans involving aggregate, joins, ...)

Successfully merging this pull request may close these issues.

Optimize TopK with filter
6 participants