Skip to content

Commit 59c8de2

Browse files
committed
[python] Make FileStoreWrite.max_seq_numbers lazied
1 parent fd413d7 commit 59c8de2

File tree

1 file changed

+25
-26
lines changed

1 file changed

+25
-26
lines changed

paimon-python/pypaimon/write/file_store_write.py

Lines changed: 25 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from pypaimon.write.writer.data_blob_writer import DataBlobWriter
2525
from pypaimon.write.writer.data_writer import DataWriter
2626
from pypaimon.write.writer.key_value_data_writer import KeyValueDataWriter
27+
from pypaimon.table.bucket_mode import BucketMode
2728

2829

2930
class FileStoreWrite:
@@ -34,7 +35,7 @@ def __init__(self, table):
3435

3536
self.table: FileStoreTable = table
3637
self.data_writers: Dict[Tuple, DataWriter] = {}
37-
self.max_seq_numbers = self._seq_number_stats() # TODO: build this on-demand instead of on all
38+
self.max_seq_numbers: dict = {}
3839
self.write_cols = None
3940

4041
def write(self, partition: Tuple, bucket: int, data: pa.RecordBatch):
@@ -45,27 +46,29 @@ def write(self, partition: Tuple, bucket: int, data: pa.RecordBatch):
4546
writer.write(data)
4647

4748
def _create_data_writer(self, partition: Tuple, bucket: int) -> DataWriter:
49+
def max_seq_number():
50+
return self._seq_number_stats(partition).get(bucket, 1)
4851
# Check if table has blob columns
4952
if self._has_blob_columns():
5053
return DataBlobWriter(
5154
table=self.table,
5255
partition=partition,
5356
bucket=bucket,
54-
max_seq_number=self.max_seq_numbers.get((partition, bucket), 1),
57+
max_seq_number=0,
5558
)
5659
elif self.table.is_primary_key_table:
5760
return KeyValueDataWriter(
5861
table=self.table,
5962
partition=partition,
6063
bucket=bucket,
61-
max_seq_number=self.max_seq_numbers.get((partition, bucket), 1),
62-
)
64+
max_seq_number=max_seq_number())
6365
else:
66+
seq_number = 0 if self.table.bucket_mode() == BucketMode.BUCKET_UNAWARE else max_seq_number()
6467
return AppendOnlyDataWriter(
6568
table=self.table,
6669
partition=partition,
6770
bucket=bucket,
68-
max_seq_number=self.max_seq_numbers.get((partition, bucket), 1),
71+
max_seq_number=seq_number,
6972
write_cols=self.write_cols
7073
)
7174

@@ -99,32 +102,28 @@ def close(self):
99102
writer.close()
100103
self.data_writers.clear()
101104

102-
def _seq_number_stats(self) -> dict:
103-
from pypaimon.manifest.manifest_file_manager import ManifestFileManager
104-
from pypaimon.manifest.manifest_list_manager import ManifestListManager
105-
from pypaimon.snapshot.snapshot_manager import SnapshotManager
106-
107-
snapshot_manager = SnapshotManager(self.table)
108-
manifest_list_manager = ManifestListManager(self.table)
109-
manifest_file_manager = ManifestFileManager(self.table)
105+
def _seq_number_stats(self, partition: Tuple) -> Dict[int, int]:
106+
buckets = self.max_seq_numbers.get(partition)
107+
if buckets is None:
108+
buckets = self._load_seq_number_stats(partition)
109+
self.max_seq_numbers[partition] = buckets
110+
return buckets
110111

111-
latest_snapshot = snapshot_manager.get_latest_snapshot()
112-
if not latest_snapshot:
113-
return {}
114-
manifest_files = manifest_list_manager.read_all(latest_snapshot)
112+
def _load_seq_number_stats(self, partition: Tuple) -> dict:
113+
read_builder = self.table.new_read_builder()
114+
predicate_builder = read_builder.new_predicate_builder()
115+
sub_predicates = []
116+
for key, value in zip(self.table.partition_keys, partition):
117+
sub_predicates.append(predicate_builder.equal(key, value))
118+
partition_filter = predicate_builder.and_predicates(sub_predicates)
115119

116-
file_entries = []
117-
for manifest_file in manifest_files:
118-
manifest_entries = manifest_file_manager.read(manifest_file.file_name)
119-
for entry in manifest_entries:
120-
if entry.kind == 0:
121-
file_entries.append(entry)
120+
scan = read_builder.with_filter(partition_filter).new_scan()
121+
file_entries = scan.plan_files()
122122

123123
max_seq_numbers = {}
124124
for entry in file_entries:
125-
partition_key = (tuple(entry.partition.values), entry.bucket)
126125
current_seq_num = entry.file.max_sequence_number
127-
existing_max = max_seq_numbers.get(partition_key, -1)
126+
existing_max = max_seq_numbers.get(entry.bucket, -1)
128127
if current_seq_num > existing_max:
129-
max_seq_numbers[partition_key] = current_seq_num
128+
max_seq_numbers[entry.bucket] = current_seq_num
130129
return max_seq_numbers

0 commit comments

Comments
 (0)