Skip to content

Commit 21b5fac

Browse files
authored
fix: small fixes for parquet reader including RowRanges, PreBuffer, ColumnIndexFilter and code cleanup (#330)
1 parent 9bf3b2f commit 21b5fac

7 files changed

Lines changed: 65 additions & 55 deletions

src/paimon/format/parquet/column_index_filter.cpp

Lines changed: 11 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,11 @@ Result<RowRanges> ColumnIndexFilter::VisitLeafPredicate(
9696
const auto& literals = leaf_predicate->Literals();
9797
FieldType field_type = leaf_predicate->GetFieldType();
9898

99+
if (function_type != Function::Type::IS_NULL && function_type != Function::Type::IS_NOT_NULL &&
100+
literals.empty()) {
101+
return Status::Invalid(
102+
fmt::format("predicate on column '{}' requires at least one literal", field_name));
103+
}
99104
std::vector<int32_t> matching_pages;
100105

101106
switch (function_type) {
@@ -106,37 +111,22 @@ Result<RowRanges> ColumnIndexFilter::VisitLeafPredicate(
106111
matching_pages = FilterPagesByIsNotNull(column_index_ptr);
107112
break;
108113
case Function::Type::EQUAL:
109-
if (!literals.empty()) {
110-
matching_pages = FilterPagesByEqual(column_index_ptr, literals[0], field_type);
111-
}
114+
matching_pages = FilterPagesByEqual(column_index_ptr, literals[0], field_type);
112115
break;
113116
case Function::Type::NOT_EQUAL:
114-
if (!literals.empty()) {
115-
matching_pages = FilterPagesByNotEqual(column_index_ptr, literals[0], field_type);
116-
}
117+
matching_pages = FilterPagesByNotEqual(column_index_ptr, literals[0], field_type);
117118
break;
118119
case Function::Type::LESS_THAN:
119-
if (!literals.empty()) {
120-
matching_pages = FilterPagesByLessThan(column_index_ptr, literals[0], field_type);
121-
}
120+
matching_pages = FilterPagesByLessThan(column_index_ptr, literals[0], field_type);
122121
break;
123122
case Function::Type::LESS_OR_EQUAL:
124-
if (!literals.empty()) {
125-
matching_pages =
126-
FilterPagesByLessOrEqual(column_index_ptr, literals[0], field_type);
127-
}
123+
matching_pages = FilterPagesByLessOrEqual(column_index_ptr, literals[0], field_type);
128124
break;
129125
case Function::Type::GREATER_THAN:
130-
if (!literals.empty()) {
131-
matching_pages =
132-
FilterPagesByGreaterThan(column_index_ptr, literals[0], field_type);
133-
}
126+
matching_pages = FilterPagesByGreaterThan(column_index_ptr, literals[0], field_type);
134127
break;
135128
case Function::Type::GREATER_OR_EQUAL:
136-
if (!literals.empty()) {
137-
matching_pages =
138-
FilterPagesByGreaterOrEqual(column_index_ptr, literals[0], field_type);
139-
}
129+
matching_pages = FilterPagesByGreaterOrEqual(column_index_ptr, literals[0], field_type);
140130
break;
141131
case Function::Type::IN:
142132
matching_pages = FilterPagesByIn(column_index_ptr, literals, field_type);

src/paimon/format/parquet/column_index_filter_test.cpp

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@
2626
#include "arrow/c/abi.h"
2727
#include "arrow/c/bridge.h"
2828
#include "gtest/gtest.h"
29+
#include "paimon/common/predicate/equal.h"
30+
#include "paimon/common/predicate/in.h"
31+
#include "paimon/common/predicate/leaf_predicate_impl.h"
2932
#include "paimon/common/utils/arrow/arrow_input_stream_adapter.h"
3033
#include "paimon/common/utils/arrow/mem_utils.h"
3134
#include "paimon/defs.h"
@@ -480,4 +483,23 @@ TEST_F(ColumnIndexFilterTest, NullPredicateReturnsAllRows) {
480483
EXPECT_EQ(row_group_row_count_, ranges.RowCount());
481484
}
482485

486+
/// Predicates other than IsNull/IsNotNull are not allowed without a literal.
487+
/// PredicateBuilder (public API) does not support constructing them without
488+
/// a literal, so the filter should return an error for this invalid input.
489+
TEST_F(ColumnIndexFilterTest, EmptyLiteralsReturnsError) {
490+
auto pred = std::make_shared<paimon::LeafPredicateImpl>(paimon::Equal::Instance(), 0, "val",
491+
FieldType::INT, std::vector<Literal>());
492+
auto result = Filter(pred);
493+
EXPECT_FALSE(result.ok());
494+
}
495+
496+
/// Empty literals for IN predicate — same rule applies: non-IS_NULL/IS_NOT_NULL
497+
/// predicates without literals are invalid and should return an error.
498+
TEST_F(ColumnIndexFilterTest, EmptyLiteralsInReturnsError) {
499+
auto pred = std::make_shared<paimon::LeafPredicateImpl>(paimon::In::Instance(), 0, "val",
500+
FieldType::INT, std::vector<Literal>());
501+
auto result = Filter(pred);
502+
EXPECT_FALSE(result.ok());
503+
}
504+
483505
} // namespace paimon::parquet::test

src/paimon/format/parquet/file_reader_wrapper.cpp

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,23 +26,13 @@
2626
#include "fmt/format.h"
2727
#include "paimon/format/parquet/column_index_filter.h"
2828
#include "paimon/format/parquet/page_filtered_row_group_reader.h"
29+
#include "paimon/format/parquet/parquet_format_defs.h"
2930
#include "paimon/macros.h"
3031
#include "parquet/arrow/reader.h"
3132
#include "parquet/file_reader.h"
3233
#include "parquet/metadata.h"
3334
#include "parquet/page_index.h"
3435

35-
// Convert any std::exception thrown by underlying Parquet/Arrow APIs into a
36-
// Status. Used as the trailing catch clauses of a try block in every public
37-
// method that calls into the parquet C++ API, so the read layer never throws.
38-
#define PAIMON_PARQUET_CATCH_AND_RETURN_STATUS(context) \
39-
catch (const std::exception& e) { \
40-
return Status::Invalid(fmt::format("{}: {}", (context), e.what())); \
41-
} \
42-
catch (...) { \
43-
return Status::UnknownError((context), ": unknown error"); \
44-
}
45-
4636
namespace paimon::parquet {
4737

4838
namespace {

src/paimon/format/parquet/page_filtered_row_group_reader.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,8 @@ Result<std::unique_ptr<arrow::RecordBatchReader>> PageFilteredRowGroupReader::Re
249249
// Pre-buffering failed, fall back to row-group level PreBuffer
250250
::arrow::io::IOContext io_ctx(pool);
251251
parquet_reader->PreBuffer(rg_vec, col_vec, io_ctx, cache_options);
252+
PAIMON_RETURN_NOT_OK_FROM_ARROW(
253+
parquet_reader->WhenBuffered(rg_vec, col_vec).status());
252254
}
253255
} else {
254256
PAIMON_RETURN_NOT_OK_FROM_ARROW(parquet_reader->WhenBuffered(rg_vec, col_vec).status());

src/paimon/format/parquet/parquet_file_batch_reader.cpp

Lines changed: 12 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -48,17 +48,6 @@
4848
#include "parquet/arrow/reader.h"
4949
#include "parquet/properties.h"
5050

51-
// Convert any std::exception thrown by underlying Parquet/Arrow APIs into a
52-
// Status. Used as the trailing catch clauses of a try block in every public
53-
// method that calls into the parquet C++ API, so the read layer never throws.
54-
#define PAIMON_PARQUET_CATCH_AND_RETURN_STATUS(context) \
55-
catch (const std::exception& e) { \
56-
return Status::Invalid(fmt::format("{}: {}", (context), e.what())); \
57-
} \
58-
catch (...) { \
59-
return Status::UnknownError((context), ": unknown error"); \
60-
}
61-
6251
namespace arrow {
6352
class MemoryPool;
6453
} // namespace arrow
@@ -159,18 +148,6 @@ Status ParquetFileBatchReader::SetReadSchema(
159148
}
160149
}
161150

162-
// Build column name to index map for page-level filtering.
163-
// For leaf columns, indices[0] is the correct leaf column index in Parquet.
164-
// For nested types (struct/list/map), FlattenSchema produces multiple leaf indices,
165-
// but predicate pushdown only targets leaf columns with simple types, so indices[0]
166-
// is always the correct single leaf index for predicate evaluation.
167-
std::map<std::string, int32_t> column_name_to_index;
168-
for (const auto& [name, indices] : field_index_map) {
169-
if (!indices.empty()) {
170-
column_name_to_index[name] = indices[0];
171-
}
172-
}
173-
174151
std::vector<int32_t> row_groups = arrow::internal::Iota(reader_->GetNumberOfRowGroups());
175152
if (predicate) {
176153
PAIMON_ASSIGN_OR_RAISE(row_groups,
@@ -188,6 +165,18 @@ Status ParquetFileBatchReader::SetReadSchema(
188165
OptionsUtils::GetValueFromMap<bool>(options_, PARQUET_READ_ENABLE_PAGE_INDEX_FILTER,
189166
DEFAULT_PARQUET_READ_ENABLE_PAGE_INDEX_FILTER));
190167
if (enable_page_index_filter) {
168+
// Build column name to index map for page-level filtering.
169+
// For leaf columns, indices[0] is the correct leaf column index in Parquet.
170+
// For nested types (struct/list/map), FlattenSchema produces multiple leaf indices,
171+
// but predicate pushdown only targets leaf columns with simple types, so indices[0]
172+
// is always the correct single leaf index for predicate evaluation.
173+
std::map<std::string, int32_t> column_name_to_index;
174+
for (const auto& [name, indices] : field_index_map) {
175+
if (!indices.empty()) {
176+
column_name_to_index[name] = indices[0];
177+
}
178+
}
179+
191180
PAIMON_ASSIGN_OR_RAISE(
192181
auto page_filter_result,
193182
FilterRowGroupsByPageIndex(predicate, column_name_to_index, row_groups));

src/paimon/format/parquet/parquet_format_defs.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,22 @@
1919
#include <cstdint>
2020
#include <limits>
2121

22+
#include "fmt/format.h"
23+
#include "paimon/status.h"
24+
2225
namespace paimon::parquet {
2326

27+
// Convert any std::exception thrown by underlying Parquet/Arrow APIs into a
28+
// Status. Used as the trailing catch clauses of a try block in every public
29+
// method that calls into the parquet C++ API, so the read layer never throws.
30+
#define PAIMON_PARQUET_CATCH_AND_RETURN_STATUS(context) \
31+
catch (const std::exception& e) { \
32+
return Status::Invalid(fmt::format("{}: {}", (context), e.what())); \
33+
} \
34+
catch (...) { \
35+
return Status::UnknownError(fmt::format("{}: unknown error", (context))); \
36+
}
37+
2438
// write
2539
static inline const char PARQUET_BLOCK_SIZE[] = "parquet.block.size";
2640
static inline const char PARQUET_PAGE_SIZE[] = "parquet.page.size";

src/paimon/format/parquet/row_ranges.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ class RowRanges {
4343
/// Creates a RowRanges from a list of ranges.
4444
explicit RowRanges(const std::vector<Range>& ranges) : ranges_(ranges) {}
4545

46+
/// Creates a RowRanges from a list of ranges, taking ownership of the vector.
47+
explicit RowRanges(std::vector<Range>&& ranges) : ranges_(std::move(ranges)) {}
48+
4649
/// Creates a RowRanges with a single range [0, row_count - 1].
4750
static RowRanges CreateSingle(int64_t row_count) {
4851
if (row_count <= 0) {

0 commit comments

Comments
 (0)