@@ -254,3 +254,77 @@ def load_snapshot(self, identifier: Identifier):
254254 NotImplementedError: FileSystemCatalog does not support version management
255255 """
256256 raise NotImplementedError ("Filesystem catalog does not support load_snapshot" )
257+
258+ def list_partitions_paged (
259+ self ,
260+ identifier : Union [str , Identifier ],
261+ max_results : Optional [int ] = None ,
262+ page_token : Optional [str ] = None ,
263+ partition_name_pattern : Optional [str ] = None ,
264+ ):
265+ from pypaimon .api .api_response import Partition , PagedList
266+ from pypaimon .manifest .manifest_list_manager import ManifestListManager
267+ from pypaimon .manifest .manifest_file_manager import ManifestFileManager
268+
269+ if not isinstance (identifier , Identifier ):
270+ identifier = Identifier .from_string (identifier )
271+
272+ table = self .get_table (identifier )
273+ snapshot = table .snapshot_manager ().get_latest_snapshot ()
274+ if snapshot is None :
275+ return PagedList (elements = [])
276+
277+ # Read all manifest entries (ADD - DELETE merged)
278+ manifest_list_manager = ManifestListManager (table )
279+ manifest_file_manager = ManifestFileManager (table )
280+ manifest_files = manifest_list_manager .read_all (snapshot )
281+ entries = manifest_file_manager .read_entries_parallel (manifest_files , drop_stats = True )
282+
283+ # Group entries by partition spec
284+ partition_map = {} # spec_key -> aggregated stats
285+ for entry in entries :
286+ spec = {field .name : str (v ) for field , v in
287+ zip (entry .partition .fields , entry .partition .values )}
288+ spec_key = tuple (sorted (spec .items ()))
289+
290+ if spec_key not in partition_map :
291+ partition_map [spec_key ] = {
292+ 'spec' : spec ,
293+ 'record_count' : 0 ,
294+ 'file_size_in_bytes' : 0 ,
295+ 'file_count' : 0 ,
296+ 'last_file_creation_time' : 0 ,
297+ 'buckets' : set (),
298+ }
299+ stats = partition_map [spec_key ]
300+ stats ['record_count' ] += entry .file .row_count
301+ stats ['file_size_in_bytes' ] += entry .file .file_size
302+ stats ['file_count' ] += 1
303+ if entry .file .creation_time is not None :
304+ ct = entry .file .creation_time .get_millisecond ()
305+ if ct > stats ['last_file_creation_time' ]:
306+ stats ['last_file_creation_time' ] = ct
307+ stats ['buckets' ].add (entry .bucket )
308+
309+ # Convert to Partition objects
310+ partitions = []
311+ for stats in partition_map .values ():
312+ partitions .append (Partition (
313+ spec = stats ['spec' ],
314+ record_count = stats ['record_count' ],
315+ file_size_in_bytes = stats ['file_size_in_bytes' ],
316+ file_count = stats ['file_count' ],
317+ last_file_creation_time = stats ['last_file_creation_time' ],
318+ total_buckets = len (stats ['buckets' ]),
319+ ))
320+
321+ # Apply pattern filter
322+ if partition_name_pattern :
323+ import re
324+ regex = re .compile (partition_name_pattern .replace ('*' , '.*' ))
325+ partitions = [
326+ p for p in partitions
327+ if regex .fullmatch (',' .join (f'{ k } ={ v } ' for k , v in p .spec .items ()))
328+ ]
329+
330+ return PagedList (elements = partitions )
0 commit comments