|
25 | 25 | from pypaimon.common.predicate import Predicate |
26 | 26 | from pypaimon.deletionvectors import ApplyDeletionVectorReader |
27 | 27 | from pypaimon.deletionvectors.deletion_vector import DeletionVector |
| 28 | +from pypaimon.globalindex import Range |
28 | 29 | from pypaimon.manifest.schema.data_file_meta import DataFileMeta |
29 | 30 | from pypaimon.read.interval_partition import IntervalPartition, SortedRun |
30 | 31 | from pypaimon.read.partition_info import PartitionInfo |
|
44 | 45 | from pypaimon.read.reader.format_lance_reader import FormatLanceReader |
45 | 46 | from pypaimon.read.reader.format_pyarrow_reader import FormatPyArrowReader |
46 | 47 | from pypaimon.read.reader.iface.record_batch_reader import (RecordBatchReader, |
47 | | - RowPositionReader) |
| 48 | + RowPositionReader, EmptyRecordBatchReader) |
48 | 49 | from pypaimon.read.reader.iface.record_reader import RecordReader |
49 | 50 | from pypaimon.read.reader.key_value_unwrap_reader import \ |
50 | 51 | KeyValueUnwrapRecordReader |
@@ -592,14 +593,19 @@ def _create_union_reader(self, need_merge_files: List[DataFileMeta]) -> RecordRe |
592 | 593 |
|
593 | 594 | def _create_file_reader(self, file: DataFileMeta, read_fields: [str]) -> Optional[RecordReader]: |
594 | 595 | """Create a file reader for a single file.""" |
595 | | - record_reader = self.file_reader_supplier( |
596 | | - file=file, |
597 | | - for_merge_read=False, |
598 | | - read_fields=read_fields, |
599 | | - row_tracking_enabled=True) |
| 596 | + def create_record_reader(): |
| 597 | + return self.file_reader_supplier( |
| 598 | + file=file, |
| 599 | + for_merge_read=False, |
| 600 | + read_fields=read_fields, |
| 601 | + row_tracking_enabled=True) |
600 | 602 | if self.row_ranges is None: |
601 | | - return record_reader |
602 | | - return RowIdFilterRecordBatchReader(record_reader, file.first_row_id, self.row_ranges) |
| 603 | + return create_record_reader() |
| 604 | + file_range = Range(file.first_row_id, file.first_row_id + file.row_count - 1) |
| 605 | + row_ranges = Range.and_(self.row_ranges, [file_range]) |
| 606 | + if len(row_ranges) == 0: |
| 607 | + return EmptyRecordBatchReader() |
| 608 | + return RowIdFilterRecordBatchReader(create_record_reader(), file.first_row_id, row_ranges) |
603 | 609 |
|
604 | 610 | def _split_field_bunches(self, need_merge_files: List[DataFileMeta]) -> List[FieldBunch]: |
605 | 611 | """Split files into field bunches.""" |
|
0 commit comments