Skip to content

Commit 00aafbf

Browse files
committed
[GLUTEN-3884][CH][Parquet] support page pruning on first column, CH part
1 parent b8ba0ae commit 00aafbf

File tree

4 files changed

+149
-23
lines changed

4 files changed

+149
-23
lines changed

.gitmodules

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
url = https://github.com/ClickHouse/boost
4646
[submodule "contrib/arrow"]
4747
path = contrib/arrow
48-
url = https://github.com/ClickHouse/arrow
48+
url = https://github.com/binmahone/arrow
4949
[submodule "contrib/thrift"]
5050
path = contrib/thrift
5151
url = https://github.com/apache/thrift

src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp

Lines changed: 142 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
#include <parquet/arrow/reader.h>
1515
#include <parquet/arrow/schema.h>
1616
#include <parquet/file_reader.h>
17+
#include <parquet/page_index.h>
1718
#include <parquet/statistics.h>
1819
#include "ArrowBufferedStreams.h"
1920
#include "ArrowColumnToCHColumn.h"
@@ -235,7 +236,10 @@ static std::optional<Field> decodePlainParquetValueSlow(const std::string & data
235236
/// Range of values for each column, based on statistics in the Parquet metadata.
236237
/// This is lower/upper bounds, not necessarily exact min and max, e.g. the min/max can be just
237238
/// missing in the metadata.
238-
static std::vector<Range> getHyperrectangleForRowGroup(const parquet::FileMetaData & file, int row_group_idx, const Block & header, const FormatSettings & format_settings)
239+
static std::vector<Range> getHyperrectangleFromStatistics(
240+
const Block & header,
241+
const FormatSettings & format_settings,
242+
const std::unordered_map<std::string, std::shared_ptr<parquet::Statistics>> & name_to_statistics)
239243
{
240244
auto column_name_for_lookup = [&](std::string column_name) -> std::string
241245
{
@@ -244,23 +248,6 @@ static std::vector<Range> getHyperrectangleForRowGroup(const parquet::FileMetaDa
244248
return column_name;
245249
};
246250

247-
std::unique_ptr<parquet::RowGroupMetaData> row_group = file.RowGroup(row_group_idx);
248-
249-
std::unordered_map<std::string, std::shared_ptr<parquet::Statistics>> name_to_statistics;
250-
for (int i = 0; i < row_group->num_columns(); ++i)
251-
{
252-
auto c = row_group->ColumnChunk(i);
253-
auto s = c->statistics();
254-
if (!s)
255-
continue;
256-
257-
auto path = c->path_in_schema()->ToDotVector();
258-
if (path.size() != 1)
259-
continue; // compound types not supported
260-
261-
name_to_statistics.emplace(column_name_for_lookup(path[0]), s);
262-
}
263-
264251
/// +-----+
265252
/// / /|
266253
/// +-----+ |
@@ -364,6 +351,81 @@ static std::vector<Range> getHyperrectangleForRowGroup(const parquet::FileMetaDa
364351
return hyperrectangle;
365352
}
366353

354+
355+
static std::vector<Range> getHyperrectangleForRowGroup(
356+
const parquet::FileMetaData & file, int row_group_idx, const Block & header, const FormatSettings & format_settings)
357+
{
358+
auto column_name_for_lookup = [&](std::string column_name) -> std::string
359+
{
360+
if (format_settings.parquet.case_insensitive_column_matching)
361+
boost::to_lower(column_name);
362+
return column_name;
363+
};
364+
365+
std::unique_ptr<parquet::RowGroupMetaData> row_group = file.RowGroup(row_group_idx);
366+
367+
std::unordered_map<std::string, std::shared_ptr<parquet::Statistics>> name_to_statistics;
368+
for (int i = 0; i < row_group->num_columns(); ++i)
369+
{
370+
auto c = row_group->ColumnChunk(i);
371+
auto s = c->statistics();
372+
if (!s)
373+
continue;
374+
375+
auto path = c->path_in_schema()->ToDotVector();
376+
if (path.size() != 1)
377+
continue; // compound types not supported
378+
379+
name_to_statistics.emplace(column_name_for_lookup(path[0]), s);
380+
}
381+
382+
return getHyperrectangleFromStatistics(header, format_settings, name_to_statistics);
383+
}
384+
385+
static std::vector<Range> getHyperrectangleForPage(
386+
const bool is_null_wanted,
387+
const bool is_null_page,
388+
const std::string & min_value,
389+
const std::string & max_value,
390+
const Block & header,
391+
const FormatSettings & format_settings,
392+
const parquet::ColumnDescriptor * descr)
393+
{
394+
auto column_name_for_lookup = [&](std::string column_name) -> std::string
395+
{
396+
if (format_settings.parquet.case_insensitive_column_matching)
397+
boost::to_lower(column_name);
398+
return column_name;
399+
};
400+
401+
if (is_null_page)
402+
{
403+
std::vector ret(header.columns(), Range::createWholeUniverse());
404+
ret.at(0) = Range(Null::Value::NegativeInfinity, true, Null::Value::NegativeInfinity, true);
405+
return ret;
406+
}
407+
408+
std::shared_ptr<parquet::Statistics> stats;
409+
/// Page index does not contain enough statistics. E.g. we don't know whether a page contains NULL or not.
410+
/// So we have to create a fake one.
411+
if (is_null_wanted)
412+
{
413+
// if null is wanted, we have to assume that the page contains null
414+
stats = parquet::Statistics::Make(descr, min_value, max_value, 1, 1, 1, true, true, true);
415+
}
416+
else
417+
{
418+
// if null is not wanted, we can assume this page does not contain null
419+
// so that getHyperrectangleFromStatistics will return a much narrower range
420+
stats = parquet::Statistics::Make(descr, min_value, max_value, 1, 0, 1, true, true, true);
421+
}
422+
std::unordered_map<std::string, std::shared_ptr<parquet::Statistics>> name_to_statistics;
423+
name_to_statistics.emplace(column_name_for_lookup(descr->name()), stats);
424+
425+
return getHyperrectangleFromStatistics(header, format_settings, name_to_statistics);
426+
}
427+
428+
367429
ParquetBlockInputFormat::ParquetBlockInputFormat(
368430
ReadBuffer & buf,
369431
const Block & header_,
@@ -388,6 +450,59 @@ ParquetBlockInputFormat::~ParquetBlockInputFormat()
388450
pool->wait();
389451
}
390452

453+
454+
// Apply page index of first column if first column is sorted. We only consider first column because:
455+
// 1. when first column is sorted, it is likely that other columns are not sorted and lack selectivity
456+
// 2. it's more complex to calcute page index for multiple columns with KeyCondition, which assumes column
457+
// rows are aligned among pages, but unfortunatedly it's not.
458+
void ParquetBlockInputFormat::applyRowRangesFromPageIndex(const std::unique_ptr<parquet::ParquetFileReader> & parquet_reader, int row_group)
459+
{
460+
if (metadata->RowGroup(row_group)->ColumnChunk(0)->path_in_schema()->ToDotVector().size() != 1)
461+
return; // compound types not supported
462+
463+
const auto pg_idx_reader = parquet_reader->GetPageIndexReader()->RowGroup(row_group);
464+
if (pg_idx_reader == nullptr)
465+
return;
466+
467+
const auto first_col_idx = pg_idx_reader->GetColumnIndex(0);
468+
const auto first_col_offsets = pg_idx_reader->GetOffsetIndex(0);
469+
if (first_col_idx != nullptr
470+
&& (first_col_idx->boundary_order() == parquet::BoundaryOrder::Ascending
471+
|| first_col_idx->boundary_order() == parquet::BoundaryOrder::Descending))
472+
{
473+
auto row_ranges = std::make_shared<parquet::RowRanges>();
474+
auto null_pages = first_col_idx->null_pages();
475+
const auto min_values = first_col_idx->encoded_min_values();
476+
const auto max_values = first_col_idx->encoded_max_values();
477+
478+
std::vector<Range> probe_range(getPort().getHeader().columns(), Range::createWholeUniverse());
479+
probe_range.at(0) = Range(NEGATIVE_INFINITY, true, NEGATIVE_INFINITY, true);
480+
// probe_range limits the range of first column to be NULL only.
481+
// If the probe result is true, it means where condition contains "first_col is NULL AND ...",
482+
// it also means rows with first_col being null is wanted by where condition.
483+
const bool null_wanted = key_condition->checkInHyperrectangle(probe_range, getPort().getHeader().getDataTypes()).can_be_true;
484+
485+
for (size_t i = 0; i < null_pages.size(); ++i)
486+
{
487+
auto page_hyperrectangle = getHyperrectangleForPage(
488+
null_wanted,
489+
null_pages.at(i),
490+
min_values.at(i),
491+
max_values.at(i),
492+
getPort().getHeader(),
493+
format_settings,
494+
metadata->schema()->Column(0));
495+
if (key_condition->checkInHyperrectangle(page_hyperrectangle, getPort().getHeader().getDataTypes()).can_be_true)
496+
{
497+
auto to = (i == null_pages.size() - 1) ? metadata->RowGroup(row_group)->num_rows() - 1
498+
: first_col_offsets->page_locations().at(i + 1).first_row_index - 1;
499+
row_ranges->add(parquet::Range(first_col_offsets->page_locations().at(i).first_row_index, to), false);
500+
}
501+
}
502+
row_group_batches.back().row_ranges_map.insert(std::make_pair(row_group, row_ranges));
503+
}
504+
}
505+
391506
void ParquetBlockInputFormat::initializeIfNeeded()
392507
{
393508
if (std::exchange(is_initialized, true))
@@ -401,7 +516,8 @@ void ParquetBlockInputFormat::initializeIfNeeded()
401516
if (is_stopped)
402517
return;
403518

404-
metadata = parquet::ReadMetaData(arrow_file);
519+
const auto parquet_reader = parquet::ParquetFileReader::Open(arrow_file);
520+
metadata = parquet_reader->metadata();
405521

406522
std::shared_ptr<arrow::Schema> schema;
407523
THROW_ARROW_NOT_OK(parquet::arrow::FromParquetSchema(metadata->schema(), &schema));
@@ -433,6 +549,8 @@ void ParquetBlockInputFormat::initializeIfNeeded()
433549
row_group_batches.back().row_groups_idxs.push_back(row_group);
434550
row_group_batches.back().total_rows += metadata->RowGroup(row_group)->num_rows();
435551
row_group_batches.back().total_bytes_compressed += metadata->RowGroup(row_group)->total_compressed_size();
552+
553+
applyRowRangesFromPageIndex(parquet_reader, row_group);
436554
}
437555
}
438556

@@ -486,8 +604,11 @@ void ParquetBlockInputFormat::initializeRowGroupBatchReader(size_t row_group_bat
486604
// TODO: Pass custom memory_pool() to enable memory accounting with non-jemalloc allocators.
487605
THROW_ARROW_NOT_OK(builder.Build(&row_group_batch.file_reader));
488606

489-
THROW_ARROW_NOT_OK(
490-
row_group_batch.file_reader->GetRecordBatchReader(row_group_batch.row_groups_idxs, column_indices, &row_group_batch.record_batch_reader));
607+
THROW_ARROW_NOT_OK(row_group_batch.file_reader->GetRecordBatchReader(
608+
row_group_batch.row_groups_idxs,
609+
column_indices,
610+
std::make_shared<std::map<int, parquet::RowRangesPtr>>(std::move(row_group_batch.row_ranges_map)),
611+
&row_group_batch.record_batch_reader));
491612

492613
row_group_batch.arrow_column_to_ch_column = std::make_unique<ArrowColumnToCHColumn>(
493614
getPort().getHeader(),

src/Processors/Formats/Impl/ParquetBlockInputFormat.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@
77
#include <Formats/FormatSettings.h>
88
#include <Storages/MergeTree/KeyCondition.h>
99

10+
#include <parquet/column_reader.h>
11+
#include <parquet/file_reader.h>
12+
1013
namespace parquet { class FileMetaData; }
1114
namespace parquet::arrow { class FileReader; }
1215
namespace arrow { class Buffer; class RecordBatchReader;}
@@ -72,6 +75,7 @@ class ParquetBlockInputFormat : public IInputFormat
7275
is_stopped = 1;
7376
}
7477

78+
void applyRowRangesFromPageIndex(const std::unique_ptr<parquet::ParquetFileReader> & parquet_reader, int row_group);
7579
void initializeIfNeeded();
7680
void initializeRowGroupBatchReader(size_t row_group_batch_idx);
7781

@@ -208,6 +212,7 @@ class ParquetBlockInputFormat : public IInputFormat
208212
size_t total_bytes_compressed = 0;
209213

210214
std::vector<int> row_groups_idxs;
215+
std::map<int, parquet::RowRangesPtr> row_ranges_map;
211216

212217
// These are only used by the decoding thread, so don't require locking the mutex.
213218
std::unique_ptr<parquet::arrow::FileReader> file_reader;

0 commit comments

Comments
 (0)