Skip to content

Commit e485a1c

Browse files
committed
[python] Add totalRecordCount and deltaRecordCount in Snapshot
1 parent e2e471e commit e485a1c

File tree

2 files changed

+54
-0
lines changed

2 files changed

+54
-0
lines changed

paimon-python/pypaimon/tests/writer_test.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,3 +73,22 @@ def test_writer(self):
7373
self.assertTrue(os.path.exists(self.warehouse + "/test_db.db/test_table/bucket-0"))
7474
self.assertEqual(len(glob.glob(self.warehouse + "/test_db.db/test_table/manifest/*.avro")), 2)
7575
self.assertEqual(len(glob.glob(self.warehouse + "/test_db.db/test_table/bucket-0/*.parquet")), 1)
76+
77+
with open(self.warehouse + '/test_db.db/test_table/snapshot/snapshot-1', 'r', encoding='utf-8') as file:
78+
content = ''.join(file.readlines())
79+
self.assertTrue(content.__contains__('\"totalRecordCount\": 3'))
80+
self.assertTrue(content.__contains__('\"deltaRecordCount\": 3'))
81+
82+
write_builder = table.new_batch_write_builder()
83+
table_write = write_builder.new_write()
84+
table_commit = write_builder.new_commit()
85+
table_write.write_arrow(expect)
86+
commit_messages = table_write.prepare_commit()
87+
table_commit.commit(commit_messages)
88+
table_write.close()
89+
table_commit.close()
90+
91+
with open(self.warehouse + '/test_db.db/test_table/snapshot/snapshot-2', 'r', encoding='utf-8') as file:
92+
content = ''.join(file.readlines())
93+
self.assertTrue(content.__contains__('\"totalRecordCount\": 6'))
94+
self.assertTrue(content.__contains__('\"deltaRecordCount\": 3'))

paimon-python/pypaimon/write/file_store_commit.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,17 @@ def commit(self, commit_messages: List[CommitMessage], commit_identifier: int):
5858
new_manifest_files = self.manifest_file_manager.write(commit_messages)
5959
if not new_manifest_files:
6060
return
61+
6162
latest_snapshot = self.snapshot_manager.get_latest_snapshot()
63+
6264
existing_manifest_files = []
65+
record_count_add = self._generate_record_count_add(commit_messages)
66+
total_record_count = record_count_add
67+
6368
if latest_snapshot:
6469
existing_manifest_files = self.manifest_list_manager.read_all_manifest_files(latest_snapshot)
70+
total_record_count += latest_snapshot.total_record_count
71+
6572
new_manifest_files.extend(existing_manifest_files)
6673
manifest_list = self.manifest_list_manager.write(new_manifest_files)
6774

@@ -72,6 +79,8 @@ def commit(self, commit_messages: List[CommitMessage], commit_identifier: int):
7279
schema_id=0,
7380
base_manifest_list=manifest_list,
7481
delta_manifest_list=manifest_list,
82+
total_record_count=total_record_count,
83+
delta_record_count=record_count_add,
7584
commit_user=self.commit_user,
7685
commit_identifier=commit_identifier,
7786
commit_kind="APPEND",
@@ -100,13 +109,17 @@ def overwrite(self, partition, commit_messages: List[CommitMessage], commit_iden
100109
# In overwrite mode, we don't merge with existing manifests
101110
manifest_list = self.manifest_list_manager.write(new_manifest_files)
102111

112+
record_count_add = self._generate_record_count_add(commit_messages)
113+
103114
new_snapshot_id = self._generate_snapshot_id()
104115
snapshot_data = Snapshot(
105116
version=3,
106117
id=new_snapshot_id,
107118
schema_id=0,
108119
base_manifest_list=manifest_list,
109120
delta_manifest_list=manifest_list,
121+
total_record_count=record_count_add,
122+
delta_record_count=record_count_add,
110123
commit_user=self.commit_user,
111124
commit_identifier=commit_identifier,
112125
commit_kind="OVERWRITE",
@@ -234,3 +247,25 @@ def _generate_partition_statistics(self, commit_messages: List[CommitMessage]) -
234247
)
235248
for stats in partition_stats.values()
236249
]
250+
251+
def _generate_record_count_add(self, commit_messages: List[CommitMessage]) -> int:
252+
"""
253+
Generate record count add from commit messages.
254+
255+
This method follows the Java implementation pattern from
256+
org.apache.paimon.manifest.ManifestEntry.recordCountAdd().
257+
258+
Args:
259+
commit_messages: List of commit messages to analyze
260+
261+
Returns:
262+
Count of add record
263+
"""
264+
record_count = 0
265+
266+
for message in commit_messages:
267+
new_files = message.new_files()
268+
for file_meta in new_files:
269+
record_count += file_meta.row_count
270+
271+
return record_count

0 commit comments

Comments
 (0)