Skip to content

Commit 26b3be4

Browse files
authored
Merge branch 'main' into feat/manifest-cache
2 parents 53d3a6f + 200a487 commit 26b3be4

10 files changed

Lines changed: 571 additions & 537 deletions

src/paimon/core/operation/file_store_scan.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -169,8 +169,9 @@ Result<std::shared_ptr<FileStoreScan::RawPlan>> FileStoreScan::CreatePlan() cons
169169
metrics_->SetCounter(ScanMetrics::LAST_SCANNED_SNAPSHOT_ID,
170170
snapshot.has_value() ? snapshot.value().Id() : int64_t{0});
171171
metrics_->SetCounter(ScanMetrics::LAST_SCANNED_MANIFESTS, filtered_manifest_file_metas.size());
172-
metrics_->SetCounter(ScanMetrics::LAST_SCAN_SKIPPED_TABLE_FILES,
173-
all_data_files - manifest_entries.size());
172+
metrics_->SetCounter(
173+
ScanMetrics::LAST_SCAN_SKIPPED_TABLE_FILES,
174+
std::max(int64_t{0}, all_data_files - static_cast<int64_t>(manifest_entries.size())));
174175
metrics_->SetCounter(ScanMetrics::LAST_SCAN_RESULTED_TABLE_FILES, manifest_entries.size());
175176
return std::make_shared<FileStoreScan::RawPlan>(scan_mode_, snapshot,
176177
std::move(manifest_entries));
@@ -219,7 +220,7 @@ Status FileStoreScan::ReadFileEntries(const std::vector<ManifestFileMeta>& manif
219220
std::vector<ManifestEntry>* manifest_entries) const {
220221
std::vector<std::future<Result<std::vector<ManifestEntry>>>> futures;
221222
for (const auto& meta : manifest_metas) {
222-
auto read_meta_task = [this, &meta]() -> Result<std::vector<ManifestEntry>> {
223+
auto read_meta_task = [this, meta]() -> Result<std::vector<ManifestEntry>> {
223224
std::vector<ManifestEntry> tmp_entries;
224225
PAIMON_RETURN_NOT_OK(ReadManifestFileMeta(meta, &tmp_entries));
225226
return tmp_entries;

src/paimon/format/parquet/file_reader_wrapper.cpp

Lines changed: 247 additions & 283 deletions
Large diffs are not rendered by default.

src/paimon/format/parquet/file_reader_wrapper.h

Lines changed: 35 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,8 @@ class FileReaderWrapper {
5555
~FileReaderWrapper();
5656

5757
static Result<std::unique_ptr<FileReaderWrapper>> Create(
58-
std::unique_ptr<::parquet::arrow::FileReader>&& reader, ::arrow::MemoryPool* pool,
59-
int64_t batch_size);
58+
std::unique_ptr<::parquet::arrow::FileReader>&& reader, int64_t batch_size,
59+
std::shared_ptr<arrow::MemoryPool> pool);
6060

6161
/// Seek to the specified row number.
6262
/// @param row_number The row to seek to (must be at a row group boundary).
@@ -87,7 +87,7 @@ class FileReaderWrapper {
8787
}
8888

8989
/// Get the underlying Parquet file reader.
90-
::parquet::arrow::FileReader* GetFileReader() const {
90+
::parquet::arrow::FileReader* GetFileReader() {
9191
return file_reader_.get();
9292
}
9393

@@ -109,24 +109,18 @@ class FileReaderWrapper {
109109

110110
/// Prepare for lazy reading of the specified row groups and columns.
111111
/// Actual reader initialization is deferred until the first Next() call.
112-
Status PrepareForReadingLazy(const std::set<int32_t>& row_group_indices,
112+
Status PrepareForReadingLazy(const std::vector<TargetRowGroup>& target_row_groups,
113113
const std::vector<int32_t>& column_indices);
114114

115115
/// Prepare for immediate reading of the specified row groups and columns.
116116
/// Initializes the reader and starts pre-buffering I/O.
117-
Status PrepareForReading(const std::set<int32_t>& row_group_indices,
117+
Status PrepareForReading(const std::vector<TargetRowGroup>& target_row_groups,
118118
const std::vector<int32_t>& column_indices);
119119

120-
/// Filter row groups by read ranges, returning only those that overlap.
121-
Result<std::set<int32_t>> FilterRowGroupsByReadRanges(
122-
const std::vector<std::pair<uint64_t, uint64_t>>& read_ranges,
123-
const std::vector<int32_t>& src_row_groups) const;
124-
125-
/// Set per-row-group RowRanges for page-level filtering.
126-
/// Only partially matched row groups should have entries.
127-
void SetRowGroupRowRanges(const std::map<int32_t, RowRanges>& ranges) {
128-
row_group_row_ranges_ = ranges;
129-
}
120+
/// Apply read ranges to the current target_row_groups_, keeping only those
121+
/// whose row-group range is equal to one of the given read ranges.
122+
/// Resets reader state so that the next Next() call will re-initialize.
123+
Status ApplyReadRanges(const std::vector<std::pair<uint64_t, uint64_t>>& read_ranges);
130124

131125
/// Get the page index reader for the file.
132126
/// Returns nullptr if page index is not available.
@@ -144,21 +138,38 @@ class FileReaderWrapper {
144138
private:
145139
FileReaderWrapper(std::unique_ptr<::parquet::arrow::FileReader>&& file_reader,
146140
const std::vector<std::pair<uint64_t, uint64_t>>& all_row_group_ranges,
147-
uint64_t num_rows, ::arrow::MemoryPool* pool, int64_t batch_size);
141+
uint64_t num_rows, int64_t batch_size,
142+
std::shared_ptr<::arrow::MemoryPool> pool);
143+
144+
/// Wait for all pending PreBuffer operations to complete.
145+
void WaitForPendingPreBuffer();
146+
147+
/// Advance current_row_group_idx_ to the next row group and update next_row_to_read_.
148+
void AdvanceToNextRowGroup();
149+
150+
/// Read next batch from a page-filtered row group. Returns nullptr when the RG is exhausted.
151+
Result<std::shared_ptr<arrow::RecordBatch>> NextPageFiltered();
148152

149-
Result<std::set<int32_t>> ReadRangesToRowGroupIds(
150-
const std::vector<std::pair<uint64_t, uint64_t>>& read_ranges) const;
151-
Result<int32_t> GetRowGroupId(std::pair<uint64_t, uint64_t> target_range) const;
153+
/// Read next batch from the fully-matched batch_reader_. Returns nullptr when exhausted.
154+
Result<std::shared_ptr<arrow::RecordBatch>> NextFullyMatched();
155+
156+
/// Build page_filtered_read_schema_ from the given column indices. No-op if already built.
157+
Status BuildPageFilteredSchema(const std::vector<int32_t>& column_indices);
158+
159+
/// Collect all byte ranges that need pre-buffering (page-filtered + fully-matched).
160+
std::vector<::arrow::io::ReadRange> CollectPreBufferRanges(
161+
const std::vector<int32_t>& column_indices);
162+
163+
/// Dispatch a single PreBufferRanges call with merged ranges.
164+
void DispatchPreBuffer(std::vector<::arrow::io::ReadRange> ranges);
152165

153166
std::unique_ptr<::parquet::arrow::FileReader> file_reader_;
154167
std::unique_ptr<arrow::RecordBatchReader> batch_reader_;
155168

156169
std::vector<std::pair<uint64_t, uint64_t>> all_row_group_ranges_;
157-
std::set<int32_t> target_row_group_indices_;
158-
std::vector<std::pair<uint64_t, uint64_t>> target_row_groups_;
159170
std::vector<int32_t> target_column_indices_;
160171

161-
::arrow::MemoryPool* pool_;
172+
std::shared_ptr<::arrow::MemoryPool> pool_;
162173
int64_t batch_size_; // 0 means no limit
163174

164175
const uint64_t num_rows_;
@@ -175,13 +186,8 @@ class FileReaderWrapper {
175186
RowRanges current_filtered_row_ranges_; // RowRanges for the active page-filtered RG
176187
uint64_t current_filtered_rg_start_ = 0; // Absolute row-group start row number
177188

178-
// Page-level filtering state. Externally injected via SetRowGroupRowRanges and
179-
// looked up by row group index when entering a page-filtered RG.
180-
std::map<int32_t, RowRanges> row_group_row_ranges_;
181-
182-
// Set of target_row_groups_ positional indices that use page-filtered reading.
183-
// Built in PrepareForReading from row_group_row_ranges_.
184-
std::set<uint64_t> page_filtered_indices_;
189+
// Target row groups with row ranges for none page-level filtering and page-level filtering
190+
std::vector<TargetRowGroup> target_row_groups_;
185191

186192
// Arrow schema covering target_column_indices_, used when constructing the per-RG
187193
// page-filtered reader. Cached in PrepareForReading because it's identical across
@@ -190,9 +196,6 @@ class FileReaderWrapper {
190196

191197
// Track pre-buffered ranges so we can wait on destruction
192198
std::vector<::arrow::io::ReadRange> prebuffered_ranges_;
193-
194-
/// Wait for all pending PreBuffer operations to complete.
195-
void WaitForPendingPreBuffer();
196199
};
197200

198201
} // namespace paimon::parquet

src/paimon/format/parquet/file_reader_wrapper_test.cpp

Lines changed: 95 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -135,8 +135,7 @@ class FileReaderWrapperTest : public ::testing::Test {
135135
PAIMON_RETURN_NOT_OK_FROM_ARROW(file_reader_builder.memory_pool(arrow_pool_.get())
136136
->properties(arrow_reader_props)
137137
->Build(&file_reader));
138-
return FileReaderWrapper::Create(std::move(file_reader), ::arrow::default_memory_pool(),
139-
wrapper_batch_size);
138+
return FileReaderWrapper::Create(std::move(file_reader), wrapper_batch_size, arrow_pool_);
140139
}
141140

142141
void PrepareParquetFile(const std::string& file_path, int32_t row_count,
@@ -196,8 +195,9 @@ TEST_F(FileReaderWrapperTest, EmptyFile) {
196195
}
197196

198197
TEST_F(FileReaderWrapperTest, NullFileReader) {
199-
ASSERT_NOK_WITH_MSG(FileReaderWrapper::Create(nullptr, ::arrow::default_memory_pool(),
200-
/*batch_size=*/0),
198+
ASSERT_NOK_WITH_MSG(FileReaderWrapper::Create(nullptr,
199+
/*batch_size=*/0,
200+
/*pool=*/arrow_pool_),
201201
"file reader wrapper create failed. file reader is nullptr");
202202
}
203203

@@ -261,11 +261,11 @@ TEST_F(FileReaderWrapperTest, PageFilteredZeroBatchSizeDoesNotHang) {
261261
// contiguous ranges keep the test honest about RowRanges semantics; the actual
262262
// numbers don't matter as long as their total falls inside the row group.
263263
RowRanges rr({RowRanges::Range(0, 49), RowRanges::Range(100, 149)});
264-
reader_wrapper->SetRowGroupRowRanges({{0, rr}});
265264

266265
std::vector<int32_t> all_columns = {0, 1, 2};
267-
ASSERT_OK(reader_wrapper->PrepareForReading({0}, all_columns));
268-
266+
ASSERT_OK(reader_wrapper->PrepareForReading(
267+
{TargetRowGroup(/*rg_index=*/0, /*is_partially_matched=*/true, /*ranges=*/rr)},
268+
all_columns));
269269
int64_t total = 0;
270270
int64_t batch_count = 0;
271271
while (true) {
@@ -295,10 +295,14 @@ TEST_F(FileReaderWrapperTest, SeekBackToConsumedPageFilteredRowGroup) {
295295
std::map<int32_t, RowRanges> row_ranges_map;
296296
row_ranges_map[0] = RowRanges(RowRanges::Range(10, 49));
297297
row_ranges_map[1] = RowRanges(RowRanges::Range(100, 149));
298-
reader_wrapper->SetRowGroupRowRanges(row_ranges_map);
299298

300299
std::vector<int32_t> all_columns = {0, 1, 2};
301-
ASSERT_OK(reader_wrapper->PrepareForReading({0, 1}, all_columns));
300+
ASSERT_OK(reader_wrapper->PrepareForReading(
301+
{TargetRowGroup(/*rg_index=*/0, /*is_partially_matched=*/true,
302+
/*ranges=*/row_ranges_map[0]),
303+
TargetRowGroup(/*rg_index=*/1, /*is_partially_matched=*/true,
304+
/*ranges=*/row_ranges_map[1])},
305+
all_columns));
302306

303307
auto count_all_rows = [&](int64_t* out_total) {
304308
int64_t total = 0;
@@ -348,8 +352,9 @@ TEST_F(FileReaderWrapperTest, PageFilteredRespectsBatchSize) {
348352
for (int64_t batch_size : {int64_t{1}, int64_t{2}, int64_t{3}, int64_t{5}, int64_t{10}}) {
349353
SCOPED_TRACE("batch_size=" + std::to_string(batch_size));
350354
ASSERT_OK_AND_ASSIGN(auto reader_wrapper, PrepareReaderWrapper(file_path, batch_size));
351-
reader_wrapper->SetRowGroupRowRanges({{0, rr}});
352-
ASSERT_OK(reader_wrapper->PrepareForReading({0}, {0, 1, 2}));
355+
ASSERT_OK(reader_wrapper->PrepareForReading(
356+
{TargetRowGroup(/*rg_index=*/0, /*is_partially_matched=*/true, /*ranges=*/rr)},
357+
{0, 1, 2}));
353358

354359
int64_t total = 0;
355360
int64_t batch_count = 0;
@@ -380,45 +385,93 @@ TEST_F(FileReaderWrapperTest, GetRowGroupRanges) {
380385
ASSERT_TRUE(ranges.empty());
381386
}
382387

383-
TEST_F(FileReaderWrapperTest, ReadRangesToRowGroupIds) {
388+
TEST_F(FileReaderWrapperTest, ApplyReadRanges) {
384389
std::string file_path = PathUtil::JoinPath(dir_->Str(), "test.parquet");
385390
PrepareParquetFile(file_path, /*row_count=*/5500);
386391
ASSERT_OK_AND_ASSIGN(auto reader_wrapper, PrepareReaderWrapper(file_path));
387-
std::set<int32_t> expected_row_group_ids = {0, 3, 5};
392+
393+
// Prepare with a subset of row groups: {0, 1, 2, 4, 5}
394+
std::vector<TargetRowGroup> initial_targets = {
395+
TargetRowGroup(/*rg_index=*/0, /*is_partially_matched=*/false,
396+
/*ranges=*/RowRanges()),
397+
TargetRowGroup(/*rg_index=*/1, /*is_partially_matched=*/false,
398+
/*ranges=*/RowRanges()),
399+
TargetRowGroup(/*rg_index=*/2, /*is_partially_matched=*/false,
400+
/*ranges=*/RowRanges()),
401+
TargetRowGroup(/*rg_index=*/4, /*is_partially_matched=*/false,
402+
/*ranges=*/RowRanges()),
403+
TargetRowGroup(/*rg_index=*/5, /*is_partially_matched=*/false,
404+
/*ranges=*/RowRanges())};
405+
std::vector<int32_t> all_columns = {0, 1, 2};
406+
ASSERT_OK(reader_wrapper->PrepareForReadingLazy(initial_targets, all_columns));
407+
408+
// Apply read ranges that match RG 0, 3, 5. Only 0 and 5 are in initial targets.
388409
std::vector<std::pair<uint64_t, uint64_t>> read_ranges = {
389410
{0, 1000}, {3000, 4000}, {5000, 5500}};
390-
ASSERT_OK_AND_ASSIGN(auto row_group_ids, reader_wrapper->ReadRangesToRowGroupIds(read_ranges));
391-
ASSERT_EQ(expected_row_group_ids, row_group_ids);
392-
std::vector<std::pair<uint64_t, uint64_t>> invalid_ranges = {
393-
{0, 1000}, {3000, 4000}, {5000, 5600}};
394-
ASSERT_NOK_WITH_MSG(reader_wrapper->ReadRangesToRowGroupIds(invalid_ranges),
395-
"not match with row group range bound");
396-
ASSERT_OK_AND_ASSIGN(row_group_ids, reader_wrapper->ReadRangesToRowGroupIds({}));
397-
ASSERT_TRUE(row_group_ids.empty());
411+
ASSERT_OK(reader_wrapper->ApplyReadRanges(read_ranges));
412+
413+
// Verify: reading should only produce rows from RG 0 (1000 rows) and RG 5 (500 rows).
414+
int64_t total_rows = 0;
415+
while (true) {
416+
ASSERT_OK_AND_ASSIGN(auto batch, reader_wrapper->Next());
417+
if (!batch) {
418+
break;
419+
}
420+
total_rows += batch->num_rows();
421+
}
422+
ASSERT_EQ(1500, total_rows);
423+
424+
// Apply empty read ranges should result in no data.
425+
ASSERT_OK(reader_wrapper->PrepareForReadingLazy(initial_targets, all_columns));
426+
ASSERT_OK(reader_wrapper->ApplyReadRanges({}));
427+
ASSERT_OK_AND_ASSIGN(auto batch, reader_wrapper->Next());
428+
ASSERT_FALSE(batch);
398429
}
399430

400-
TEST_F(FileReaderWrapperTest, FilterRowGroupsByReadRanges) {
431+
TEST_F(FileReaderWrapperTest, ApplyReadRangesWiderSecondCall) {
401432
std::string file_path = PathUtil::JoinPath(dir_->Str(), "test.parquet");
402433
PrepareParquetFile(file_path, /*row_count=*/5500);
403434
ASSERT_OK_AND_ASSIGN(auto reader_wrapper, PrepareReaderWrapper(file_path));
404-
std::set<int32_t> expected_row_group_ids = {0, 5};
405-
std::vector<std::pair<uint64_t, uint64_t>> read_ranges = {
406-
{0, 1000}, {3000, 4000}, {5000, 5500}};
407-
ASSERT_OK_AND_ASSIGN(auto row_group_ids,
408-
reader_wrapper->FilterRowGroupsByReadRanges(read_ranges, {0, 1, 2, 4, 5}));
409-
ASSERT_EQ(expected_row_group_ids, row_group_ids);
410435

411-
ASSERT_OK_AND_ASSIGN(row_group_ids,
412-
reader_wrapper->FilterRowGroupsByReadRanges(read_ranges, {}));
413-
ASSERT_TRUE(row_group_ids.empty());
436+
// Prepare with row groups: {0, 1, 2, 4, 5}
437+
std::vector<TargetRowGroup> initial_targets = {
438+
TargetRowGroup(/*rg_index=*/0, /*is_partially_matched=*/false,
439+
/*ranges=*/RowRanges()),
440+
TargetRowGroup(/*rg_index=*/1, /*is_partially_matched=*/false,
441+
/*ranges=*/RowRanges()),
442+
TargetRowGroup(/*rg_index=*/2, /*is_partially_matched=*/false,
443+
/*ranges=*/RowRanges()),
444+
TargetRowGroup(/*rg_index=*/4, /*is_partially_matched=*/false,
445+
/*ranges=*/RowRanges()),
446+
TargetRowGroup(/*rg_index=*/5, /*is_partially_matched=*/false,
447+
/*ranges=*/RowRanges())};
448+
std::vector<int32_t> all_columns = {0, 1, 2};
449+
ASSERT_OK(reader_wrapper->PrepareForReadingLazy(initial_targets, all_columns));
450+
451+
// First ApplyReadRanges: narrow to RG 0 only.
452+
ASSERT_OK(reader_wrapper->ApplyReadRanges({{0, 1000}}));
453+
454+
// Second ApplyReadRanges: widen to RG 0, 1, 2. Previously excluded RG 1, 2 should restore.
455+
ASSERT_OK(reader_wrapper->ApplyReadRanges({{0, 1000}, {1000, 2000}, {2000, 3000}}));
456+
457+
// Verify: reading should produce rows from RG 0 + 1 + 2 = 3000 rows.
458+
int64_t total_rows = 0;
459+
while (true) {
460+
ASSERT_OK_AND_ASSIGN(auto batch, reader_wrapper->Next());
461+
if (!batch) break;
462+
total_rows += batch->num_rows();
463+
}
464+
ASSERT_EQ(3000, total_rows);
414465
}
415466

416467
TEST_F(FileReaderWrapperTest, PrepareForReading) {
417468
std::string file_path = PathUtil::JoinPath(dir_->Str(), "test.parquet");
418469
PrepareParquetFile(file_path, /*row_count=*/5500);
419470
ASSERT_OK_AND_ASSIGN(auto reader_wrapper, PrepareReaderWrapper(file_path));
420-
ASSERT_OK(reader_wrapper->PrepareForReading(/*row_group_indices=*/{1},
421-
/*column_indices=*/{0}));
471+
ASSERT_OK(reader_wrapper->PrepareForReading(
472+
/*target_row_groups=*/{TargetRowGroup(/*rg_index=*/1, /*is_partially_matched=*/false,
473+
/*ranges=*/RowRanges())},
474+
/*column_indices=*/{0}));
422475
// seek before actual read range
423476
ASSERT_OK(reader_wrapper->SeekToRow(0));
424477
ASSERT_EQ(1000, reader_wrapper->GetNextRowToRead());
@@ -438,8 +491,12 @@ TEST_F(FileReaderWrapperTest, PrepareForReading) {
438491
ASSERT_FALSE(record_batch);
439492

440493
// empty column indices
441-
ASSERT_OK(reader_wrapper->PrepareForReading(/*row_group_indices=*/{0, 1},
442-
/*column_indices=*/{}));
494+
ASSERT_OK(reader_wrapper->PrepareForReading(
495+
/*target_row_groups=*/{TargetRowGroup(/*rg_index=*/0, /*is_partially_matched=*/false,
496+
/*ranges=*/RowRanges()),
497+
TargetRowGroup(/*rg_index=*/1, /*is_partially_matched=*/false,
498+
/*ranges=*/RowRanges())},
499+
/*column_indices=*/{}));
443500
ASSERT_EQ(0, reader_wrapper->GetNextRowToRead());
444501
ASSERT_EQ(std::numeric_limits<uint64_t>::max(),
445502
reader_wrapper->GetPreviousBatchFirstRowNumber().value());
@@ -448,8 +505,9 @@ TEST_F(FileReaderWrapperTest, PrepareForReading) {
448505
ASSERT_EQ(0, record_batch->num_columns());
449506

450507
// empty row group indices
451-
ASSERT_OK(reader_wrapper->PrepareForReading(/*row_group_indices=*/{},
452-
/*column_indices=*/{0}));
508+
ASSERT_OK(reader_wrapper->PrepareForReading(
509+
/*target_row_groups=*/{},
510+
/*column_indices=*/{0}));
453511
ASSERT_EQ(5500, reader_wrapper->GetNextRowToRead());
454512
ASSERT_EQ(std::numeric_limits<uint64_t>::max(),
455513
reader_wrapper->GetPreviousBatchFirstRowNumber().value());

0 commit comments

Comments
 (0)