|
32 | 32 | from pypaimon.read.split import Split |
33 | 33 | from pypaimon.snapshot.snapshot_manager import SnapshotManager |
34 | 34 | from pypaimon.table.bucket_mode import BucketMode |
35 | | -from pypaimon.table.row.generic_row import GenericRow |
36 | | -from pypaimon.manifest.schema.simple_stats import SimpleStats |
37 | 35 |
|
38 | 36 |
|
39 | 37 | class FullStartingScanner(StartingScanner): |
@@ -77,28 +75,26 @@ def scan(self) -> Plan: |
77 | 75 | splits = self._apply_push_down_limit(splits) |
78 | 76 | return Plan(splits) |
79 | 77 |
|
80 | | - def _read_manifest_files(self) -> List[ManifestFileMeta]: |
| 78 | + def plan_files(self) -> List[ManifestEntry]: |
81 | 79 | latest_snapshot = self.snapshot_manager.get_latest_snapshot() |
82 | 80 | if not latest_snapshot: |
83 | 81 | return [] |
84 | 82 | manifest_files = self.manifest_list_manager.read_all(latest_snapshot) |
85 | | - return self.filter_manifest_files(manifest_files) |
| 83 | + return self.read_manifest_entries(manifest_files) |
86 | 84 |
|
87 | | - def filter_manifest_files(self, files: List[ManifestFileMeta]) -> List[ManifestFileMeta]: |
| 85 | + def read_manifest_entries(self, manifest_files: List[ManifestFileMeta]) -> List[ManifestEntry]: |
88 | 86 | def filter_manifest_file(file: ManifestFileMeta) -> bool: |
89 | 87 | if not self.partition_key_predicate: |
90 | 88 | return True |
91 | 89 | return self.partition_key_predicate.test_by_simple_stats( |
92 | 90 | file.partition_stats, |
93 | 91 | file.num_added_files + file.num_deleted_files) |
94 | 92 |
|
95 | | - return [file for file in files if filter_manifest_file(file)] |
96 | | - |
97 | | - def plan_files(self) -> List[ManifestEntry]: |
98 | | - manifest_files = self._read_manifest_files() |
99 | 93 | deleted_entries = set() |
100 | 94 | added_entries = [] |
101 | 95 | for manifest_file in manifest_files: |
| 96 | + if not filter_manifest_file(manifest_file): |
| 97 | + continue |
102 | 98 | manifest_entries = self.manifest_file_manager.read( |
103 | 99 | manifest_file.file_name, |
104 | 100 | lambda row: self._filter_manifest_entry(row)) |
|
0 commit comments