@@ -135,8 +135,7 @@ class FileReaderWrapperTest : public ::testing::Test {
135135 PAIMON_RETURN_NOT_OK_FROM_ARROW (file_reader_builder.memory_pool (arrow_pool_.get ())
136136 ->properties (arrow_reader_props)
137137 ->Build (&file_reader));
138- return FileReaderWrapper::Create (std::move (file_reader), ::arrow::default_memory_pool (),
139- wrapper_batch_size);
138+ return FileReaderWrapper::Create (std::move (file_reader), wrapper_batch_size, arrow_pool_);
140139 }
141140
142141 void PrepareParquetFile (const std::string& file_path, int32_t row_count,
@@ -196,8 +195,9 @@ TEST_F(FileReaderWrapperTest, EmptyFile) {
196195}
197196
198197TEST_F (FileReaderWrapperTest, NullFileReader) {  // Create is handed a nullptr reader and is expected to fail with the message below.
199- ASSERT_NOK_WITH_MSG (FileReaderWrapper::Create (nullptr , ::arrow::default_memory_pool (),
200- /* batch_size=*/ 0 ),
198+ ASSERT_NOK_WITH_MSG (FileReaderWrapper::Create (nullptr ,
199+ /* batch_size=*/ 0 ,
200+ /* pool=*/ arrow_pool_),
201201 " file reader wrapper create failed. file reader is nullptr" );
202202}
203203
@@ -261,11 +261,11 @@ TEST_F(FileReaderWrapperTest, PageFilteredZeroBatchSizeDoesNotHang) {
261261 // contiguous ranges keep the test honest about RowRanges semantics; the actual
262262 // numbers don't matter as long as their total falls inside the row group.
263263 RowRanges rr ({RowRanges::Range (0 , 49 ), RowRanges::Range (100 , 149 )});
264- reader_wrapper->SetRowGroupRowRanges ({{0 , rr}});
265264
266265 std::vector<int32_t > all_columns = {0 , 1 , 2 };
267- ASSERT_OK (reader_wrapper->PrepareForReading ({0 }, all_columns));
268-
266+ ASSERT_OK (reader_wrapper->PrepareForReading (
267+ {TargetRowGroup (/* rg_index=*/ 0 , /* is_partially_matched=*/ true , /* ranges=*/ rr)},
268+ all_columns));
269269 int64_t total = 0 ;
270270 int64_t batch_count = 0 ;
271271 while (true ) {
@@ -295,10 +295,14 @@ TEST_F(FileReaderWrapperTest, SeekBackToConsumedPageFilteredRowGroup) {
295295 std::map<int32_t , RowRanges> row_ranges_map;
296296 row_ranges_map[0 ] = RowRanges (RowRanges::Range (10 , 49 ));
297297 row_ranges_map[1 ] = RowRanges (RowRanges::Range (100 , 149 ));
298- reader_wrapper->SetRowGroupRowRanges (row_ranges_map);
299298
300299 std::vector<int32_t > all_columns = {0 , 1 , 2 };
301- ASSERT_OK (reader_wrapper->PrepareForReading ({0 , 1 }, all_columns));
300+ ASSERT_OK (reader_wrapper->PrepareForReading (
301+ {TargetRowGroup (/* rg_index=*/ 0 , /* is_partially_matched=*/ true ,
302+ /* ranges=*/ row_ranges_map[0 ]),
303+ TargetRowGroup (/* rg_index=*/ 1 , /* is_partially_matched=*/ true ,
304+ /* ranges=*/ row_ranges_map[1 ])},
305+ all_columns));
302306
303307 auto count_all_rows = [&](int64_t * out_total) {
304308 int64_t total = 0 ;
@@ -348,8 +352,9 @@ TEST_F(FileReaderWrapperTest, PageFilteredRespectsBatchSize) {
348352 for (int64_t batch_size : {int64_t {1 }, int64_t {2 }, int64_t {3 }, int64_t {5 }, int64_t {10 }}) {
349353 SCOPED_TRACE (" batch_size=" + std::to_string (batch_size));
350354 ASSERT_OK_AND_ASSIGN (auto reader_wrapper, PrepareReaderWrapper (file_path, batch_size));
351- reader_wrapper->SetRowGroupRowRanges ({{0 , rr}});
352- ASSERT_OK (reader_wrapper->PrepareForReading ({0 }, {0 , 1 , 2 }));
355+ ASSERT_OK (reader_wrapper->PrepareForReading (
356+ {TargetRowGroup (/* rg_index=*/ 0 , /* is_partially_matched=*/ true , /* ranges=*/ rr)},
357+ {0 , 1 , 2 }));
353358
354359 int64_t total = 0 ;
355360 int64_t batch_count = 0 ;
@@ -380,45 +385,93 @@ TEST_F(FileReaderWrapperTest, GetRowGroupRanges) {
380385 ASSERT_TRUE (ranges.empty ());
381386}
382387
383- TEST_F (FileReaderWrapperTest, ReadRangesToRowGroupIds ) {
388+ TEST_F (FileReaderWrapperTest, ApplyReadRanges ) {  // Prepares a subset of row groups lazily, applies read ranges, and checks the total rows returned by Next().
384389 std::string file_path = PathUtil::JoinPath (dir_->Str (), " test.parquet" );
385390 PrepareParquetFile (file_path, /* row_count=*/ 5500 );
386391 ASSERT_OK_AND_ASSIGN (auto reader_wrapper, PrepareReaderWrapper (file_path));
387- std::set<int32_t > expected_row_group_ids = {0 , 3 , 5 };
392+
393+ // Prepare with a subset of row groups: {0, 1, 2, 4, 5}
394+ std::vector<TargetRowGroup> initial_targets = {
395+ TargetRowGroup (/* rg_index=*/ 0 , /* is_partially_matched=*/ false ,
396+ /* ranges=*/ RowRanges ()),
397+ TargetRowGroup (/* rg_index=*/ 1 , /* is_partially_matched=*/ false ,
398+ /* ranges=*/ RowRanges ()),
399+ TargetRowGroup (/* rg_index=*/ 2 , /* is_partially_matched=*/ false ,
400+ /* ranges=*/ RowRanges ()),
401+ TargetRowGroup (/* rg_index=*/ 4 , /* is_partially_matched=*/ false ,
402+ /* ranges=*/ RowRanges ()),
403+ TargetRowGroup (/* rg_index=*/ 5 , /* is_partially_matched=*/ false ,
404+ /* ranges=*/ RowRanges ())};
405+ std::vector<int32_t > all_columns = {0 , 1 , 2 };
406+ ASSERT_OK (reader_wrapper->PrepareForReadingLazy (initial_targets, all_columns));
407+
408+ // Apply read ranges that match RG 0, 3, 5. Only 0 and 5 are in initial targets.
388409 std::vector<std::pair<uint64_t , uint64_t >> read_ranges = {
389410 {0 , 1000 }, {3000 , 4000 }, {5000 , 5500 }};
390- ASSERT_OK_AND_ASSIGN (auto row_group_ids, reader_wrapper->ReadRangesToRowGroupIds (read_ranges));
391- ASSERT_EQ (expected_row_group_ids, row_group_ids);
392- std::vector<std::pair<uint64_t , uint64_t >> invalid_ranges = {
393- {0 , 1000 }, {3000 , 4000 }, {5000 , 5600 }};
394- ASSERT_NOK_WITH_MSG (reader_wrapper->ReadRangesToRowGroupIds (invalid_ranges),
395- " not match with row group range bound" );
396- ASSERT_OK_AND_ASSIGN (row_group_ids, reader_wrapper->ReadRangesToRowGroupIds ({}));
397- ASSERT_TRUE (row_group_ids.empty ());
411+ ASSERT_OK (reader_wrapper->ApplyReadRanges (read_ranges));
412+
413+ // Verify: reading should only produce rows from RG 0 (1000 rows) and RG 5 (500 rows).
414+ int64_t total_rows = 0 ;
415+ while (true ) {
416+ ASSERT_OK_AND_ASSIGN (auto batch, reader_wrapper->Next ());
417+ if (!batch) {
418+ break ;
419+ }
420+ total_rows += batch->num_rows ();
421+ }
422+ ASSERT_EQ (1500 , total_rows);
423+
424+ // Applying empty read ranges should result in no data.
425+ ASSERT_OK (reader_wrapper->PrepareForReadingLazy (initial_targets, all_columns));
426+ ASSERT_OK (reader_wrapper->ApplyReadRanges ({}));
427+ ASSERT_OK_AND_ASSIGN (auto batch, reader_wrapper->Next ());
428+ ASSERT_FALSE (batch);
398429}
399430
400- TEST_F (FileReaderWrapperTest, FilterRowGroupsByReadRanges ) {
431+ TEST_F (FileReaderWrapperTest, ApplyReadRangesWiderSecondCall ) {  // Calls ApplyReadRanges twice (narrow, then wider) and checks that the wider selection is what gets read.
401432 std::string file_path = PathUtil::JoinPath (dir_->Str (), " test.parquet" );
402433 PrepareParquetFile (file_path, /* row_count=*/ 5500 );
403434 ASSERT_OK_AND_ASSIGN (auto reader_wrapper, PrepareReaderWrapper (file_path));
404- std::set<int32_t > expected_row_group_ids = {0 , 5 };
405- std::vector<std::pair<uint64_t , uint64_t >> read_ranges = {
406- {0 , 1000 }, {3000 , 4000 }, {5000 , 5500 }};
407- ASSERT_OK_AND_ASSIGN (auto row_group_ids,
408- reader_wrapper->FilterRowGroupsByReadRanges (read_ranges, {0 , 1 , 2 , 4 , 5 }));
409- ASSERT_EQ (expected_row_group_ids, row_group_ids);
410435
411- ASSERT_OK_AND_ASSIGN (row_group_ids,
412- reader_wrapper->FilterRowGroupsByReadRanges (read_ranges, {}));
413- ASSERT_TRUE (row_group_ids.empty ());
436+ // Prepare with row groups: {0, 1, 2, 4, 5}
437+ std::vector<TargetRowGroup> initial_targets = {
438+ TargetRowGroup (/* rg_index=*/ 0 , /* is_partially_matched=*/ false ,
439+ /* ranges=*/ RowRanges ()),
440+ TargetRowGroup (/* rg_index=*/ 1 , /* is_partially_matched=*/ false ,
441+ /* ranges=*/ RowRanges ()),
442+ TargetRowGroup (/* rg_index=*/ 2 , /* is_partially_matched=*/ false ,
443+ /* ranges=*/ RowRanges ()),
444+ TargetRowGroup (/* rg_index=*/ 4 , /* is_partially_matched=*/ false ,
445+ /* ranges=*/ RowRanges ()),
446+ TargetRowGroup (/* rg_index=*/ 5 , /* is_partially_matched=*/ false ,
447+ /* ranges=*/ RowRanges ())};
448+ std::vector<int32_t > all_columns = {0 , 1 , 2 };
449+ ASSERT_OK (reader_wrapper->PrepareForReadingLazy (initial_targets, all_columns));
450+
451+ // First ApplyReadRanges: narrow to RG 0 only.
452+ ASSERT_OK (reader_wrapper->ApplyReadRanges ({{0 , 1000 }}));
453+
454+ // Second ApplyReadRanges: widen to RG 0, 1, 2. The previously excluded RG 1 and 2 should be restored.
455+ ASSERT_OK (reader_wrapper->ApplyReadRanges ({{0 , 1000 }, {1000 , 2000 }, {2000 , 3000 }}));
456+
457+ // Verify: reading should produce rows from RG 0 + 1 + 2 = 3000 rows.
458+ int64_t total_rows = 0 ;
459+ while (true ) {
460+ ASSERT_OK_AND_ASSIGN (auto batch, reader_wrapper->Next ());
461+ if (!batch) break ;
462+ total_rows += batch->num_rows ();
463+ }
464+ ASSERT_EQ (3000 , total_rows);
414465}
415466
416467TEST_F (FileReaderWrapperTest, PrepareForReading) {
417468 std::string file_path = PathUtil::JoinPath (dir_->Str (), " test.parquet" );
418469 PrepareParquetFile (file_path, /* row_count=*/ 5500 );
419470 ASSERT_OK_AND_ASSIGN (auto reader_wrapper, PrepareReaderWrapper (file_path));
420- ASSERT_OK (reader_wrapper->PrepareForReading (/* row_group_indices=*/ {1 },
421- /* column_indices=*/ {0 }));
471+ ASSERT_OK (reader_wrapper->PrepareForReading (
472+ /* target_row_groups=*/ {TargetRowGroup (/* rg_index=*/ 1 , /* is_partially_matched=*/ false ,
473+ /* ranges=*/ RowRanges ())},
474+ /* column_indices=*/ {0 }));
422475 // seek before actual read range
423476 ASSERT_OK (reader_wrapper->SeekToRow (0 ));
424477 ASSERT_EQ (1000 , reader_wrapper->GetNextRowToRead ());
@@ -438,8 +491,12 @@ TEST_F(FileReaderWrapperTest, PrepareForReading) {
438491 ASSERT_FALSE (record_batch);
439492
440493 // empty column indices
441- ASSERT_OK (reader_wrapper->PrepareForReading (/* row_group_indices=*/ {0 , 1 },
442- /* column_indices=*/ {}));
494+ ASSERT_OK (reader_wrapper->PrepareForReading (
495+ /* target_row_groups=*/ {TargetRowGroup (/* rg_index=*/ 0 , /* is_partially_matched=*/ false ,
496+ /* ranges=*/ RowRanges ()),
497+ TargetRowGroup (/* rg_index=*/ 1 , /* is_partially_matched=*/ false ,
498+ /* ranges=*/ RowRanges ())},
499+ /* column_indices=*/ {}));
443500 ASSERT_EQ (0 , reader_wrapper->GetNextRowToRead ());
444501 ASSERT_EQ (std::numeric_limits<uint64_t >::max (),
445502 reader_wrapper->GetPreviousBatchFirstRowNumber ().value ());
@@ -448,8 +505,9 @@ TEST_F(FileReaderWrapperTest, PrepareForReading) {
448505 ASSERT_EQ (0 , record_batch->num_columns ());
449506
450507 // empty target row groups
451- ASSERT_OK (reader_wrapper->PrepareForReading (/* row_group_indices=*/ {},
452- /* column_indices=*/ {0 }));
508+ ASSERT_OK (reader_wrapper->PrepareForReading (
509+ /* target_row_groups=*/ {},
510+ /* column_indices=*/ {0 }));
453511 ASSERT_EQ (5500 , reader_wrapper->GetNextRowToRead ());
454512 ASSERT_EQ (std::numeric_limits<uint64_t >::max (),
455513 reader_wrapper->GetPreviousBatchFirstRowNumber ().value ());
0 commit comments