feat: Dynamic streaming hash table min reduction based on CPU cache#61326
feat: Dynamic streaming hash table min reduction based on CPU cache #61326 — HappenLee wants to merge 2 commits into apache:master from
Conversation
- Add get_cache_size() and get_cache_line_size() methods to CpuInfo - Add StreamingHtMinReductionEntry struct and get_streaming_ht_min_reduction() method - Replace static STREAMING_HT_MIN_REDUCTION config with dynamic calculation based on L2/L3 cache size - Update include paths in streaming_aggregation_operator.cpp and distinct_streaming_aggregation_operator.cpp - Change namespace from doris to doris::pipeline in streaming_aggregation_operator.cpp This change enables better adaptation to different hardware environments by dynamically calculating hash table expansion thresholds based on actual CPU cache sizes.
|
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
|
|
run buildall |
There was a problem hiding this comment.
Pull request overview
This PR aims to make streaming aggregation hash table expansion thresholds adapt to the actual hardware by deriving the “min reduction” table from CPU L2/L3 cache sizes (instead of using a fixed constant table).
Changes:
- Add cache size / cache line size accessors to
CpuInfo. - Add a dynamic
get_streaming_ht_min_reduction()table builder based on detected L2/L3 cache sizes. - Update streaming aggregation operators to use the dynamic reduction table (and adjust includes/namespace).
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
be/src/util/cpu_info.h |
Adds cache query helpers and a dynamically-built streaming HT min-reduction table. |
be/src/exec/operator/streaming_aggregation_operator.cpp |
Switches to CpuInfo-based reduction table; modifies includes and namespace. |
be/src/exec/operator/distinct_streaming_aggregation_operator.cpp |
Switches to CpuInfo-based reduction table and updates includes accordingly. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
You can also share your feedback on Copilot code review. Take the survey.
| #include "common/cast_set.h" | ||
| #include "common/compiler_util.h" // IWYU pragma: keep | ||
| #include "exec/operator/operator.h" | ||
| #include "exec/operator/streaming_agg_min_reduction.h" | ||
| #include "exprs/aggregate/aggregate_function_simple_factory.h" | ||
| #include "exprs/vectorized_agg_fn.h" | ||
| #include "exprs/vslot_ref.h" | ||
| #include "pipeline/exec/operator.h" | ||
| #include "util/cpu_info.h" | ||
| #include "vec/aggregate_functions/aggregate_function_simple_factory.h" | ||
| #include "vec/exprs/vectorized_agg_fn.h" | ||
| #include "vec/exprs/vslot_ref.h" |
| namespace doris::pipeline { | ||
|
|
||
| using StreamingHtMinReductionEntry = doris::CpuInfo::StreamingHtMinReductionEntry; | ||
| static const std::vector<StreamingHtMinReductionEntry>& STREAMING_HT_MIN_REDUCTION = | ||
| doris::CpuInfo::get_streaming_ht_min_reduction(); | ||
| static const size_t STREAMING_HT_MIN_REDUCTION_SIZE = STREAMING_HT_MIN_REDUCTION.size(); | ||
|
|
||
| StreamingAggLocalState::StreamingAggLocalState(RuntimeState* state, OperatorXBase* parent) |
| const auto* reduction = _is_single_backend | ||
| ? SINGLE_BE_STREAMING_HT_MIN_REDUCTION | ||
| : STREAMING_HT_MIN_REDUCTION; | ||
|
|
| const auto* reduction = _is_single_backend | ||
| ? SINGLE_BE_STREAMING_HT_MIN_REDUCTION | ||
| : STREAMING_HT_MIN_REDUCTION; | ||
|
|
||
| // Find the appropriate reduction factor in our table for the current hash table sizes. | ||
| int cache_level = 0; | ||
| while (cache_level + 1 < STREAMING_HT_MIN_REDUCTION_SIZE && | ||
| ht_mem >= reduction[cache_level + 1].min_ht_mem) { |
| static long get_cache_size(CacheLevel level) { | ||
| long cache_sizes[NUM_CACHE_LEVELS]; | ||
| long cache_line_sizes[NUM_CACHE_LEVELS]; | ||
| _get_cache_info(cache_sizes, cache_line_sizes); | ||
| return cache_sizes[level]; | ||
| } | ||
|
|
||
| static long get_cache_line_size(CacheLevel level) { | ||
| long cache_sizes[NUM_CACHE_LEVELS]; | ||
| long cache_line_sizes[NUM_CACHE_LEVELS]; | ||
| _get_cache_info(cache_sizes, cache_line_sizes); | ||
| return cache_line_sizes[level]; | ||
| } |
| static std::vector<StreamingHtMinReductionEntry> entries; | ||
| static bool initialized = false; | ||
|
|
||
| if (!initialized) { | ||
| long l2_cache_size = CpuInfo::get_cache_size(CpuInfo::L2_CACHE); | ||
| long l3_cache_size = CpuInfo::get_cache_size(CpuInfo::L3_CACHE); | ||
|
|
||
| entries.push_back({.min_ht_mem = 0, .streaming_ht_min_reduction = 0.0}); | ||
|
|
||
| if (l2_cache_size > 256 * 1024) { | ||
| entries.push_back( | ||
| {.min_ht_mem = l2_cache_size / 4, .streaming_ht_min_reduction = 1.1}); | ||
| } else { | ||
| entries.push_back({.min_ht_mem = 256 * 1024, .streaming_ht_min_reduction = 1.1}); | ||
| } | ||
|
|
||
| if (l3_cache_size > 4 * 1024 * 1024) { | ||
| entries.push_back( | ||
| {.min_ht_mem = l3_cache_size / 2, .streaming_ht_min_reduction = 2.0}); | ||
| } else { | ||
| entries.push_back( | ||
| {.min_ht_mem = 16 * 1024 * 1024, .streaming_ht_min_reduction = 2.0}); | ||
| } | ||
|
|
||
| initialized = true; | ||
| } |
/// One row of the streaming-aggregation "min reduction" table: once the
/// hash table's memory footprint reaches `min_ht_mem` bytes, streaming
/// pre-aggregation is only considered worthwhile if it achieves at least
/// `streaming_ht_min_reduction` (input rows / distinct groups).
/// Default member initializers guarantee a default-constructed entry is
/// zeroed rather than carrying indeterminate values; designated
/// initialization (e.g. {.min_ht_mem = ..., .streaming_ht_min_reduction
/// = ...}) keeps working as before.
struct StreamingHtMinReductionEntry {
    // Hash table memory-usage threshold (bytes) at which this entry applies.
    long min_ht_mem = 0;
    // Minimum reduction factor required to keep expanding the hash table.
    double streaming_ht_min_reduction = 0.0;
};
|
|
||
| static const std::vector<StreamingHtMinReductionEntry>& get_streaming_ht_min_reduction() { | ||
| static std::vector<StreamingHtMinReductionEntry> entries; |
|
run buildall |
TPC-H: Total hot run time: 26872 ms |
TPC-DS: Total hot run time: 167377 ms |
BE UT Coverage Report — Increment line coverage; see the increment coverage report
|
BE Regression && UT Coverage Report — Increment line coverage; see the increment coverage report
|
This change enables better adaptation to different hardware environments by dynamically calculating hash table expansion thresholds based on actual CPU cache sizes.
What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
Problem Summary:
Release note
None
Check List (For Author)
Test
Behavior changed:
Does this need documentation?
Check List (For Reviewer who merge this PR)