Skip to content

Commit 75c0826

Browse files
authored
Merge pull request ClickHouse#97231 from ClickHouse/fix_collection_for_selective_filters_
Fix AutoPR statistics collection for highly selective prewhere
2 parents afc4143 + 05941a8 commit 75c0826

11 files changed

Lines changed: 184 additions & 12 deletions

src/Processors/QueryPlan/Optimizations/RuntimeDataflowStatistics.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,8 @@ void RuntimeDataflowStatisticsCacheUpdater::recordInputColumns(
219219
const ColumnsWithTypeAndName & input_columns,
220220
const NamesAndTypesList & part_columns,
221221
const ColumnSizeByName & column_sizes,
222-
size_t read_bytes)
222+
size_t read_bytes,
223+
std::optional<bool> & should_continue_sampling)
223224
{
224225
Stopwatch watch;
225226

@@ -252,8 +253,11 @@ void RuntimeDataflowStatisticsCacheUpdater::recordInputColumns(
252253
}
253254
else
254255
{
256+
if (!should_continue_sampling.has_value())
257+
should_continue_sampling = shouldSampleBlock(statistics, input_columns[0].column->size());
258+
255259
// We don't have individual column size info, likely because it is a compact part. Let's try to estimate it.
256-
if (shouldSampleBlock(statistics, input_columns[0].column->size()))
260+
if (*should_continue_sampling)
257261
{
258262
for (const auto & column : input_columns)
259263
{

src/Processors/QueryPlan/Optimizations/RuntimeDataflowStatistics.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include <cstddef>
1313
#include <memory>
1414
#include <mutex>
15+
#include <optional>
1516

1617
namespace DB
1718
{
@@ -93,11 +94,16 @@ class RuntimeDataflowStatisticsCacheUpdater
9394

9495
void recordAggregationKeySizes(const Aggregator & aggregator, const Block & block);
9596

97+
/// Updates should_continue_sampling to true if the current read block is chosen for sampling.
98+
/// It is needed because in general we read each block in multiple steps because of prewhere.
99+
/// If the first part of the block was chosen for sampling, we want to record statistics for the whole block in later steps,
100+
/// so should_continue_sampling remains true for subsequent calls for the same logical block.
96101
void recordInputColumns(
97102
const ColumnsWithTypeAndName & input_columns,
98103
const NamesAndTypesList & part_columns,
99104
const ColumnSizeByName & column_sizes,
100-
size_t read_bytes);
105+
size_t read_bytes,
106+
std::optional<bool> & should_continue_sampling);
101107

102108
void markUnsupportedCase() { unsupported_case.store(true, std::memory_order_relaxed); }
103109

src/Storages/MergeTree/IMergeTreeReader.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,11 @@ class IMergeTreeReader : private boost::noncopyable
7171
ALWAYS_INLINE const NamesAndTypesList & getColumns() const { return data_part_info_for_read->isWidePart() ? converted_requested_columns : original_requested_columns; }
7272
size_t numColumnsInResult() const { return getColumns().size(); }
7373

74+
/// Returns column names and types as they are stored on disk (may differ from requested types
75+
/// when there are pending type-changing mutations). Used to build correct `ColumnsWithTypeAndName`
76+
/// before `performRequiredConversions` is applied.
77+
const NamesAndTypes & getColumnsToRead() const { return columns_to_read; }
78+
7479
size_t getFirstMarkToRead() const { return all_mark_ranges.front().begin; }
7580

7681
MergeTreeDataPartInfoForReaderPtr data_part_info_for_read;

src/Storages/MergeTree/MergeTreeReadTask.cpp

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,17 @@ MergeTreeReadTask::MergeTreeReadTask(
8282
, size_predictor(std::move(size_predictor_))
8383
, updater(std::move(updater_))
8484
{
85+
if (updater)
86+
{
87+
dataflow_cache_update_cb
88+
= [&](const ColumnsWithTypeAndName & columns, size_t read_bytes, std::optional<bool> & should_continue_sampling) -> void
89+
{
90+
chassert(updater);
91+
const auto & part_columns = info->data_part->getColumns();
92+
const auto & column_sizes = info->data_part->getColumnSizes();
93+
updater->recordInputColumns(columns, part_columns, column_sizes, read_bytes, should_continue_sampling);
94+
};
95+
}
8596
}
8697

8798
/// Returns pointer to the index if all columns in the read step belongs to the read step for that index.
@@ -356,7 +367,7 @@ MergeTreeReadTask::BlockAndProgress MergeTreeReadTask::read()
356367
UInt64 recommended_rows = estimateNumRows();
357368
UInt64 rows_to_read = std::max(static_cast<UInt64>(1), std::min(block_size_params.max_block_size_rows, recommended_rows));
358369

359-
auto read_result = readers_chain.read(rows_to_read, mark_ranges, patches_mark_ranges);
370+
auto read_result = readers_chain.read(rows_to_read, mark_ranges, patches_mark_ranges, dataflow_cache_update_cb);
360371

361372
/// All rows were filtered. Repeat.
362373
if (read_result.num_rows == 0)
@@ -399,10 +410,6 @@ MergeTreeReadTask::BlockAndProgress MergeTreeReadTask::read()
399410
block = sample_block.cloneWithColumns(read_result.columns);
400411
}
401412

402-
if (updater)
403-
updater->recordInputColumns(
404-
block.getColumnsWithTypeAndName(), info->data_part->getColumns(), info->data_part->getColumnSizes(), num_read_bytes);
405-
406413
BlockAndProgress res = {
407414
.block = std::move(block),
408415
.read_mark_ranges = read_result.read_mark_ranges,

src/Storages/MergeTree/MergeTreeReadTask.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,9 @@ struct MergeTreeReadTask : private boost::noncopyable
212212
const ReadStepsPerformanceCounters & read_steps_performance_counters);
213213

214214
private:
215+
using DataflowCacheUpdateCallback
216+
= std::function<void(const ColumnsWithTypeAndName & columns, size_t read_bytes, std::optional<bool> & should_continue_sampling)>;
217+
215218
UInt64 estimateNumRows() const;
216219

217220
/// Shared information required for reading.
@@ -239,6 +242,7 @@ struct MergeTreeReadTask : private boost::noncopyable
239242
MergeTreeBlockSizePredictorPtr size_predictor;
240243

241244
RuntimeDataflowStatisticsCacheUpdaterPtr updater;
245+
DataflowCacheUpdateCallback dataflow_cache_update_cb;
242246
};
243247

244248
using MergeTreeReadTaskPtr = std::unique_ptr<MergeTreeReadTask>;

src/Storages/MergeTree/MergeTreeReadersChain.cpp

Lines changed: 49 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
#include <Storages/MergeTree/MergeTreeReadersChain.h>
21
#include <Storages/MergeTree/IMergeTreeReader.h>
3-
#include <Common/logger_useful.h>
2+
#include <Storages/MergeTree/MergeTreeReadersChain.h>
43
#include <Storages/MergeTree/PatchParts/PatchPartsUtils.h>
4+
#include <Common/logger_useful.h>
55

66
namespace DB
77
{
@@ -56,7 +56,33 @@ static std::optional<UInt64> getMaxPatchVersionForStep(const MergeTreeRangeReade
5656
return prewhere_info ? prewhere_info->mutation_version : std::nullopt;
5757
}
5858

59-
MergeTreeReadersChain::ReadResult MergeTreeReadersChain::read(size_t max_rows, MarkRanges & ranges, std::vector<MarkRanges> & patch_ranges)
59+
/// Builds `ColumnsWithTypeAndName` using the on-disk column descriptions (from `IMergeTreeReader::getColumnsToRead`).
60+
/// This is important when columns have not yet been converted, i.e. their types with differ those contained in `getReadSampleBlock`.
61+
static ColumnsWithTypeAndName toColumnsWithTypeAndName(const Columns & columns, const NamesAndTypes & on_disk_columns)
62+
{
63+
if (columns.size() != on_disk_columns.size())
64+
throw Exception(
65+
ErrorCodes::LOGICAL_ERROR,
66+
"Number of columns doesn't match number of on-disk columns, columns size: {}, on_disk_columns size: {}",
67+
columns.size(),
68+
on_disk_columns.size());
69+
70+
ColumnsWithTypeAndName res;
71+
res.reserve(columns.size());
72+
for (size_t i = 0; i < columns.size(); ++i)
73+
{
74+
/// Columns might be null, e.g. not yet filled by `fillMissingColumns`
75+
if (columns[i])
76+
res.emplace_back(columns[i], on_disk_columns[i].type, on_disk_columns[i].name);
77+
}
78+
return res;
79+
}
80+
81+
MergeTreeReadersChain::ReadResult MergeTreeReadersChain::read(
82+
size_t max_rows,
83+
MarkRanges & ranges,
84+
std::vector<MarkRanges> & patch_ranges,
85+
const DataflowCacheUpdateCallback & dataflow_cache_update_cb)
6086
{
6187
if (max_rows == 0)
6288
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected at least 1 row to read, got 0.");
@@ -79,10 +105,18 @@ MergeTreeReadersChain::ReadResult MergeTreeReadersChain::read(size_t max_rows, M
79105
throw;
80106
}
81107

108+
std::optional<bool> should_continue_sampling;
82109
if (read_result.num_rows != 0)
83110
{
84111
first_reader.getReader()->fillVirtualColumns(read_result.columns, read_result.num_rows);
85112
readPatches(first_reader.getReadSampleBlock(), patch_ranges, read_result);
113+
114+
if (dataflow_cache_update_cb)
115+
dataflow_cache_update_cb(
116+
toColumnsWithTypeAndName(read_result.columns, first_reader.getReader()->getColumnsToRead()),
117+
read_result.num_bytes_read,
118+
should_continue_sampling);
119+
86120
executeActionsBeforePrewhere(read_result, read_result.columns, first_reader, {}, read_result.num_rows);
87121

88122
executePrewhereActions(first_reader, read_result, {}, range_readers.size() == 1);
@@ -91,6 +125,7 @@ MergeTreeReadersChain::ReadResult MergeTreeReadersChain::read(size_t max_rows, M
91125

92126
for (size_t i = 1; i < range_readers.size(); ++i)
93127
{
128+
const size_t num_bytes_read_so_far = read_result.num_bytes_read;
94129
size_t num_read_rows = 0;
95130
auto columns = range_readers[i].continueReadingChain(read_result, num_read_rows);
96131

@@ -109,6 +144,17 @@ MergeTreeReadersChain::ReadResult MergeTreeReadersChain::read(size_t max_rows, M
109144
if (num_read_rows == 0)
110145
num_read_rows = read_result.num_rows;
111146

147+
if (dataflow_cache_update_cb)
148+
{
149+
chassert(read_result.num_bytes_read >= num_bytes_read_so_far);
150+
// It is important that we call `recordInputColumns` here even if `should_continue_sampling`
151+
// is already set to false, because we still need to update the total bytes seen.
152+
dataflow_cache_update_cb(
153+
toColumnsWithTypeAndName(columns, range_readers[i].getReader()->getColumnsToRead()),
154+
read_result.num_bytes_read - num_bytes_read_so_far,
155+
should_continue_sampling);
156+
}
157+
112158
executeActionsBeforePrewhere(read_result, columns, range_readers[i], previous_header, num_read_rows);
113159
read_result.columns.insert(read_result.columns.end(), columns.begin(), columns.end());
114160
}

src/Storages/MergeTree/MergeTreeReadersChain.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
#include <Storages/MergeTree/MergeTreeRangeReader.h>
33
#include <Storages/MergeTree/PatchParts/MergeTreePatchReader.h>
44

5+
#include <functional>
6+
57
namespace DB
68
{
79

@@ -30,13 +32,18 @@ using ColumnsForPatches = std::vector<ColumnsForPatch>;
3032

3133
class MergeTreeReadersChain
3234
{
35+
using DataflowCacheUpdateCallback
36+
= std::function<void(const ColumnsWithTypeAndName & columns, size_t read_bytes, std::optional<bool> & should_continue_sampling)>;
37+
3338
public:
3439
MergeTreeReadersChain() = default;
3540
MergeTreeReadersChain(RangeReaders range_readers_, MergeTreePatchReaders patch_readers_);
3641
bool isInitialized() const { return is_initialized; }
3742

3843
using ReadResult = MergeTreeRangeReader::ReadResult;
39-
ReadResult read(size_t max_rows, MarkRanges & ranges, std::vector<MarkRanges> & patch_ranges);
44+
45+
ReadResult
46+
read(size_t max_rows, MarkRanges & ranges, std::vector<MarkRanges> & patch_ranges, const DataflowCacheUpdateCallback & update_cb = {});
4047

4148
size_t numReadRowsInCurrentGranule() const;
4249
size_t numPendingRowsInCurrentGranule() const;

tests/queries/0_stateless/03801_autopr_input_bytes_estimation_query_with_subqueries.reference

Whitespace-only changes.
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
-- Tags: stateful
2+
3+
-- To avoid too slow test execution
4+
set remote_filesystem_read_method='threadpool', allow_prefetched_read_pool_for_remote_filesystem=1, filesystem_prefetch_step_marks=0, filesystem_prefetch_step_bytes='100Mi';
5+
6+
SET enable_parallel_replicas=0, automatic_parallel_replicas_mode=2, parallel_replicas_local_plan=1, parallel_replicas_index_analysis_only_on_coordinator=1,
7+
parallel_replicas_for_non_replicated_merge_tree=1, max_parallel_replicas=3, cluster_for_parallel_replicas='parallel_replicas';
8+
9+
-- External aggregation is not supported as of now
10+
SET max_bytes_before_external_group_by=0, max_bytes_ratio_before_external_group_by=0;
11+
12+
SET use_query_condition_cache=0;
13+
14+
create table t(a UInt64) engine=MergeTree order by a;
15+
insert into t select number from numbers_mt(1e6);
16+
17+
SELECT a % 10000 FROM t FORMAT Null SETTINGS log_comment='03801_autopr_input_bytes_estimation_query_with_subqueries_query_0';
18+
19+
-- `CounterID` is part of the PK
20+
SELECT EventTime, CounterID, URL, Referer FROM test.hits WHERE CounterID IN (SELECT a % 10000 FROM t) FORMAT Null SETTINGS log_comment='03801_autopr_input_bytes_estimation_query_with_subqueries_query_1';
21+
-- `WatchID` is not
22+
SELECT EventTime, CounterID, URL, Referer FROM test.hits WHERE WatchID IN (SELECT a % 10000 FROM t) FORMAT Null SETTINGS log_comment='03801_autopr_input_bytes_estimation_query_with_subqueries_query_2';
23+
24+
SET enable_parallel_replicas=0, automatic_parallel_replicas_mode=0;
25+
26+
SYSTEM FLUSH LOGS query_log;
27+
28+
-- Just checking that the estimation is not too far off
29+
--
30+
-- We subtract the compressed bytes of the subquery because it cannot be executed with parallel replicas in the current infrastructure,
31+
-- so the "parallelizable" part of the query is only the main query itself, thus AutoPR heuristic should use only its estimation.
32+
WITH (
33+
SELECT
34+
ProfileEvents['ReadCompressedBytes']
35+
FROM system.query_log
36+
WHERE (event_date >= yesterday()) AND (event_time >= NOW() - INTERVAL '15 MINUTES') AND (current_database = currentDatabase()) AND (log_comment = '03801_autopr_input_bytes_estimation_query_with_subqueries_query_0') AND (type = 'QueryFinish')
37+
ORDER BY event_time_microseconds
38+
) AS compressed_bytes_subquery
39+
SELECT format('{} {} {}', log_comment, compressed_bytes, statistics_input_bytes)
40+
FROM (
41+
SELECT
42+
log_comment,
43+
ProfileEvents['ReadCompressedBytes'] - compressed_bytes_subquery AS compressed_bytes,
44+
ProfileEvents['RuntimeDataflowStatisticsInputBytes']::Int64 statistics_input_bytes
45+
FROM system.query_log
46+
WHERE (event_date >= yesterday()) AND (event_time >= NOW() - INTERVAL '15 MINUTES') AND (current_database = currentDatabase()) AND (match(log_comment, '03801_autopr_input_bytes_estimation_query_with_subqueries_query_[12]')) AND (type = 'QueryFinish')
47+
ORDER BY event_time_microseconds
48+
)
49+
WHERE greatest(compressed_bytes, statistics_input_bytes) / least(compressed_bytes, statistics_input_bytes) > 2;

tests/queries/0_stateless/03927_autopr_input_bytes_estimation_prewhere_filter.reference

Whitespace-only changes.

0 commit comments

Comments
 (0)