Skip to content

Commit bc38f5e

Browse files
committed
fix
1 parent 5938626 commit bc38f5e

File tree

4 files changed

+191
-17
lines changed

4 files changed

+191
-17
lines changed

docs/content/pypaimon/cli.md

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -339,8 +339,6 @@ dt=2024-01-02,region=eu 300 524288 5 1704153
339339
dt=2024-01-03,region=us 200 262144 3 1704240000000 1704326400000 admin
340340
```
341341

342-
**Note:** Both filesystem and REST catalogs support listing partitions.
343-
344342
### Table Rename
345343

346344
Rename a table in the catalog. Both source and target must be specified in `database.table` format.

paimon-python/pypaimon/catalog/filesystem_catalog.py

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -245,14 +245,6 @@ def commit_snapshot(
245245
raise NotImplementedError("This catalog does not support commit catalog")
246246

247247
def load_snapshot(self, identifier: Identifier):
248-
"""Load the snapshot of table identified by the given Identifier.
249-
250-
Args:
251-
identifier: Path of the table
252-
253-
Raises:
254-
NotImplementedError: FileSystemCatalog does not support version management
255-
"""
256248
raise NotImplementedError("Filesystem catalog does not support load_snapshot")
257249

258250
def list_partitions_paged(
@@ -318,13 +310,36 @@ def list_partitions_paged(
318310
total_buckets=len(stats['buckets']),
319311
))
320312

321-
# Apply pattern filter
313+
# Apply pattern filter with proper regex escaping
322314
if partition_name_pattern:
323315
import re
324-
regex = re.compile(partition_name_pattern.replace('*', '.*'))
316+
# Escape all regex metacharacters (re.escape also escapes '*'), then turn each escaped '\*' into '.*'
317+
escaped_pattern = re.escape(partition_name_pattern).replace(r'\*', '.*')
318+
regex = re.compile(escaped_pattern)
325319
partitions = [
326320
p for p in partitions
327321
if regex.fullmatch(','.join(f'{k}={v}' for k, v in p.spec.items()))
328322
]
329323

330-
return PagedList(elements=partitions)
324+
# Sort partitions by their spec string, built from the spec's key=value pairs joined in key order
325+
partitions.sort(key=lambda p: ','.join(f'{k}={v}' for k, v in sorted(p.spec.items())))
326+
327+
# Apply pagination
328+
start_index = 0
329+
if page_token is not None:
330+
try:
331+
start_index = int(page_token)
332+
except ValueError:
333+
# Non-integer token: fall back to the first page (negative integers are not rejected here)
334+
start_index = 0
335+
336+
end_index = len(partitions)
337+
if max_results is not None and max_results > 0:
338+
end_index = min(start_index + max_results, len(partitions))
339+
340+
result_partitions = partitions[start_index:end_index]
341+
next_page_token = None
342+
if max_results is not None and end_index < len(partitions):
343+
next_page_token = str(end_index)
344+
345+
return PagedList(elements=result_partitions, next_page_token=next_page_token)

paimon-python/pypaimon/tests/filesystem_catalog_test.py

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import unittest
2020
from unittest.mock import MagicMock
2121

22+
import pyarrow as pa
23+
2224
from pypaimon import CatalogFactory, Schema
2325
from pypaimon.catalog.catalog_exception import (DatabaseAlreadyExistException,
2426
DatabaseNotExistException,
@@ -276,3 +278,162 @@ def test_get_database_propagates_exists_error(self):
276278

277279
# Restore original method
278280
filesystem_catalog.file_io.exists = original_exists
281+
282+
def _create_partitioned_table_with_data(self, catalog, identifier, partitions_data):
283+
"""Helper to create a partitioned table and write data for each partition.
284+
285+
Args:
286+
catalog: The catalog instance.
287+
identifier: Table identifier string (e.g. 'test_db.tbl').
288+
partitions_data: List of dicts, each with a 'dt' partition value and a 'rows' row count,
289+
e.g. [{'dt': '2024-01-01', 'rows': 2}, {'dt': '2024-01-02', 'rows': 3}]
290+
"""
291+
pa_schema = pa.schema([
292+
('dt', pa.string()),
293+
('col1', pa.int32()),
294+
])
295+
schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=['dt'])
296+
catalog.create_table(identifier, schema, True)
297+
table = catalog.get_table(identifier)
298+
299+
for part in partitions_data:
300+
write_builder = table.new_batch_write_builder()
301+
table_write = write_builder.new_write()
302+
table_commit = write_builder.new_commit()
303+
data = pa.Table.from_pydict({
304+
'dt': [part['dt']] * part['rows'],
305+
'col1': list(range(part['rows'])),
306+
}, schema=pa_schema)
307+
table_write.write_arrow(data)
308+
table_commit.commit(table_write.prepare_commit())
309+
table_write.close()
310+
table_commit.close()
311+
312+
def test_list_partitions_paged(self):
313+
"""Test list_partitions_paged with real data from manifest files."""
314+
catalog = CatalogFactory.create({"warehouse": self.warehouse})
315+
catalog.create_database("test_db", False)
316+
317+
identifier = "test_db.part_tbl"
318+
self._create_partitioned_table_with_data(catalog, identifier, [
319+
{'dt': '2024-01-03', 'rows': 3},
320+
{'dt': '2024-01-01', 'rows': 2},
321+
{'dt': '2024-01-02', 'rows': 5},
322+
])
323+
324+
# List all partitions
325+
result = catalog.list_partitions_paged(identifier)
326+
self.assertEqual(len(result.elements), 3)
327+
self.assertIsNone(result.next_page_token)
328+
329+
# Verify partitions are sorted by spec
330+
specs = [p.spec['dt'] for p in result.elements]
331+
self.assertEqual(specs, sorted(specs))
332+
333+
# Verify aggregated statistics
334+
part_map = {p.spec['dt']: p for p in result.elements}
335+
self.assertEqual(part_map['2024-01-01'].record_count, 2)
336+
self.assertEqual(part_map['2024-01-02'].record_count, 5)
337+
self.assertEqual(part_map['2024-01-03'].record_count, 3)
338+
for p in result.elements:
339+
self.assertGreater(p.file_size_in_bytes, 0)
340+
self.assertGreater(p.file_count, 0)
341+
self.assertGreater(p.last_file_creation_time, 0)
342+
343+
def test_list_partitions_paged_pagination(self):
344+
"""Test list_partitions_paged pagination with max_results and page_token."""
345+
catalog = CatalogFactory.create({"warehouse": self.warehouse})
346+
catalog.create_database("test_db", False)
347+
348+
identifier = "test_db.paged_tbl"
349+
self._create_partitioned_table_with_data(catalog, identifier, [
350+
{'dt': '2024-01-01', 'rows': 1},
351+
{'dt': '2024-01-02', 'rows': 1},
352+
{'dt': '2024-01-03', 'rows': 1},
353+
])
354+
355+
# First page: max_results=2
356+
page1 = catalog.list_partitions_paged(identifier, max_results=2)
357+
self.assertEqual(len(page1.elements), 2)
358+
self.assertIsNotNone(page1.next_page_token)
359+
360+
# Second page: use next_page_token
361+
page2 = catalog.list_partitions_paged(
362+
identifier, max_results=2, page_token=page1.next_page_token
363+
)
364+
self.assertEqual(len(page2.elements), 1)
365+
self.assertIsNone(page2.next_page_token)
366+
367+
# All specs across pages should cover all 3 partitions
368+
all_specs = [p.spec['dt'] for p in page1.elements + page2.elements]
369+
self.assertEqual(sorted(all_specs), ['2024-01-01', '2024-01-02', '2024-01-03'])
370+
371+
# max_results larger than total returns all
372+
result = catalog.list_partitions_paged(identifier, max_results=100)
373+
self.assertEqual(len(result.elements), 3)
374+
self.assertIsNone(result.next_page_token)
375+
376+
def test_list_partitions_paged_pattern(self):
377+
"""Test list_partitions_paged with partition_name_pattern filter."""
378+
catalog = CatalogFactory.create({"warehouse": self.warehouse})
379+
catalog.create_database("test_db", False)
380+
381+
identifier = "test_db.pattern_tbl"
382+
self._create_partitioned_table_with_data(catalog, identifier, [
383+
{'dt': '2024-01-01', 'rows': 1},
384+
{'dt': '2024-02-01', 'rows': 1},
385+
{'dt': '2024-02-15', 'rows': 1},
386+
])
387+
388+
# Exact match
389+
result = catalog.list_partitions_paged(
390+
identifier, partition_name_pattern='dt=2024-01-01'
391+
)
392+
self.assertEqual(len(result.elements), 1)
393+
self.assertEqual(result.elements[0].spec['dt'], '2024-01-01')
394+
395+
# Wildcard match
396+
result = catalog.list_partitions_paged(
397+
identifier, partition_name_pattern='dt=2024-02*'
398+
)
399+
self.assertEqual(len(result.elements), 2)
400+
specs = sorted(p.spec['dt'] for p in result.elements)
401+
self.assertEqual(specs, ['2024-02-01', '2024-02-15'])
402+
403+
# No match
404+
result = catalog.list_partitions_paged(
405+
identifier, partition_name_pattern='dt=2025*'
406+
)
407+
self.assertEqual(len(result.elements), 0)
408+
409+
def test_list_partitions_paged_empty(self):
410+
"""Test list_partitions_paged on a table with no data."""
411+
catalog = CatalogFactory.create({"warehouse": self.warehouse})
412+
catalog.create_database("test_db", False)
413+
414+
pa_schema = pa.schema([('dt', pa.string()), ('val', pa.int32())])
415+
schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=['dt'])
416+
catalog.create_table('test_db.empty_tbl', schema, False)
417+
418+
result = catalog.list_partitions_paged('test_db.empty_tbl')
419+
self.assertEqual(len(result.elements), 0)
420+
self.assertIsNone(result.next_page_token)
421+
422+
def test_list_partitions_paged_invalid_token(self):
423+
"""Test that list_partitions_paged falls back to the first page when page_token is invalid."""
424+
catalog = CatalogFactory.create({"warehouse": self.warehouse})
425+
catalog.create_database("test_db", False)
426+
427+
identifier = "test_db.token_tbl"
428+
self._create_partitioned_table_with_data(catalog, identifier, [
429+
{'dt': '2024-01-01', 'rows': 1},
430+
{'dt': '2024-01-02', 'rows': 1},
431+
])
432+
433+
# Invalid page_token should fall back to start
434+
result = catalog.list_partitions_paged(
435+
identifier, max_results=1, page_token='invalid'
436+
)
437+
self.assertEqual(len(result.elements), 1)
438+
self.assertEqual(result.elements[0].spec['dt'], '2024-01-01')
439+
self.assertIsNotNone(result.next_page_token)

paimon-python/pypaimon/tests/rest/rest_server.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -471,8 +471,8 @@ def _handle_table_resource(self, method: str, path_parts: List[str],
471471
return self._mock_response(ErrorResponse(None, None, "Not Found", 404), 404)
472472
return self._mock_response(ErrorResponse(None, None, "Not Found", 404), 404)
473473

474-
def _table_partitions_handle(self, method: str, identifier: Identifier,
475-
parameters: Dict[str, str]) -> Tuple[str, int]:
474+
def _table_partitions_handle(
475+
self, method: str, identifier: Identifier, parameters: Dict[str, str]) -> Tuple[str, int]:
476476
"""Handle table partitions listing"""
477477
if method != "GET":
478478
return self._mock_response(ErrorResponse(None, None, "Method Not Allowed", 405), 405)
@@ -1111,8 +1111,8 @@ def _generate_final_list_tables_response(self, parameters: Dict[str, str],
11111111

11121112
return self._mock_response(response, 200)
11131113

1114-
def _generate_final_list_partitions_response(self, parameters: Dict[str, str],
1115-
partitions: List[Partition]) -> Tuple[str, int]:
1114+
def _generate_final_list_partitions_response(
1115+
self, parameters: Dict[str, str], partitions: List[Partition]) -> Tuple[str, int]:
11161116
"""Generate final list partitions response"""
11171117
if partitions:
11181118
max_results = self._get_max_results(parameters)

0 commit comments

Comments
 (0)