Skip to content

Commit 0b10f5d

Browse files
committed
[python] Python commit snapshot should not contain None fields
1 parent f99ee47 commit 0b10f5d

File tree

4 files changed

+50
-9
lines changed

4 files changed

+50
-9
lines changed

paimon-python/pypaimon/common/json_util.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,11 @@ def json_field(json_name: str, **kwargs):
2727
return field(metadata={"json_name": json_name}, **kwargs)
2828

2929

30+
def optional_json_field(json_name: str, json_include: str):
31+
"""Create a field with custom JSON name"""
32+
return field(metadata={"json_name": json_name, "json_include": json_include}, default=None)
33+
34+
3035
class JSON:
3136

3237
@staticmethod
@@ -55,6 +60,11 @@ def __to_dict(obj: Any) -> Dict[str, Any]:
5560
# Get custom JSON name from metadata
5661
json_name = field_info.metadata.get("json_name", field_info.name)
5762

63+
# Json include
64+
if field_value is None:
65+
if field_info.metadata.get("json_include", None) == "non_null":
66+
continue
67+
5868
# Handle nested objects
5969
if hasattr(field_value, "to_dict"):
6070
result[json_name] = field_value.to_dict()

paimon-python/pypaimon/snapshot/snapshot.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from dataclasses import dataclass
2020
from typing import Optional
2121

22-
from pypaimon.common.json_util import json_field
22+
from pypaimon.common.json_util import json_field, optional_json_field
2323

2424
BATCH_COMMIT_IDENTIFIER = 0x7fffffffffffffff
2525

@@ -39,9 +39,9 @@ class Snapshot:
3939
commit_kind: str = json_field("commitKind")
4040
time_millis: int = json_field("timeMillis")
4141
# Optional fields with defaults
42-
changelog_manifest_list: Optional[str] = json_field("changelogManifestList", default=None)
43-
index_manifest: Optional[str] = json_field("indexManifest", default=None)
44-
changelog_record_count: Optional[int] = json_field("changelogRecordCount", default=None)
45-
watermark: Optional[int] = json_field("watermark", default=None)
46-
statistics: Optional[str] = json_field("statistics", default=None)
47-
next_row_id: Optional[int] = json_field("nextRowId", default=None)
42+
changelog_manifest_list: Optional[str] = optional_json_field("changelogManifestList", "non_null")
43+
index_manifest: Optional[str] = optional_json_field("indexManifest", "non_null")
44+
changelog_record_count: Optional[int] = optional_json_field("changelogRecordCount", "non_null")
45+
watermark: Optional[int] = optional_json_field("watermark", "non_null")
46+
statistics: Optional[str] = optional_json_field("statistics", "non_null")
47+
next_row_id: Optional[int] = optional_json_field("nextRowId", "non_null")

paimon-python/pypaimon/snapshot/snapshot_manager.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,12 @@ def __init__(self, table):
3535
self.latest_file = f"{self.snapshot_dir}/LATEST"
3636

3737
def get_latest_snapshot(self) -> Optional[Snapshot]:
38+
snapshot_json = self.get_latest_snapshot_json()
39+
if snapshot_json is None:
40+
return None
41+
return JSON.from_json(snapshot_json, Snapshot)
42+
43+
def get_latest_snapshot_json(self) -> Optional[str]:
3844
if not self.file_io.exists(self.latest_file):
3945
return None
4046

@@ -45,8 +51,7 @@ def get_latest_snapshot(self) -> Optional[Snapshot]:
4551
if not self.file_io.exists(snapshot_file):
4652
return None
4753

48-
snapshot_content = self.file_io.read_file_utf8(snapshot_file)
49-
return JSON.from_json(snapshot_content, Snapshot)
54+
return self.file_io.read_file_utf8(snapshot_file)
5055

5156
def read_latest_file(self, max_retries: int = 5):
5257
"""

paimon-python/pypaimon/tests/write/table_write_test.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,32 @@ def setUpClass(cls):
5353
def tearDownClass(cls):
5454
shutil.rmtree(cls.tempdir, ignore_errors=True)
5555

56+
def test_write_snapshot(self):
57+
schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
58+
self.catalog.create_table('default.test_write_snapshot', schema, False)
59+
table = self.catalog.get_table('default.test_write_snapshot')
60+
write_builder = table.new_batch_write_builder()
61+
62+
# write
63+
table_write = write_builder.new_write()
64+
table_commit = write_builder.new_commit()
65+
table_write.write_arrow(self.expected)
66+
table_commit.commit(table_write.prepare_commit())
67+
table_write.close()
68+
table_commit.close()
69+
70+
# read
71+
read_builder = table.new_read_builder()
72+
table_read = read_builder.new_read()
73+
splits = read_builder.new_scan().plan().splits()
74+
actual = table_read.to_arrow(splits).sort_by('user_id')
75+
self.assertEqual(self.expected, actual)
76+
77+
# snapshot
78+
snapshot_json: str = table.snapshot_manager().get_latest_snapshot_json()
79+
self.assertEquals(True, snapshot_json.__contains__("baseManifestList"))
80+
self.assertEquals(False, snapshot_json.__contains__("nextRowId"))
81+
5682
def test_multi_prepare_commit_ao(self):
5783
schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
5884
self.catalog.create_table('default.test_append_only_parquet', schema, False)

0 commit comments

Comments
 (0)