Skip to content

Commit 57d399f

Browse files
committed
[Python] Refactor BinaryRow to reuse keys and key fields
1 parent 61179c7 commit 57d399f

19 files changed

+50
-107
lines changed

paimon-python/pypaimon/common/predicate.py

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
from pyarrow import dataset as pyarrow_dataset
2828

2929
from pypaimon.manifest.schema.simple_stats import SimpleStats
30-
from pypaimon.table.row.generic_row import GenericRow
3130
from pypaimon.table.row.internal_row import InternalRow
3231

3332

@@ -74,25 +73,15 @@ def test_by_simple_stats(self, stat: SimpleStats, row_count: int) -> bool:
7473
if self.method == 'or':
7574
return any(p.test_by_simple_stats(stat, row_count) for p in self.literals)
7675

77-
# Get null count using the mapped index
78-
null_count = stat.null_counts[self.index] if stat.null_counts and self.index < len(
79-
stat.null_counts) else 0
76+
null_count = stat.null_counts[self.index]
8077

8178
if self.method == 'isNull':
8279
return null_count is not None and null_count > 0
8380
if self.method == 'isNotNull':
8481
return null_count is None or row_count is None or null_count < row_count
8582

86-
if not isinstance(stat.min_values, GenericRow):
87-
# Parse field values using BinaryRow's direct field access by name
88-
min_value = stat.min_values.get_field(self.index)
89-
max_value = stat.max_values.get_field(self.index)
90-
else:
91-
# TODO transform partition to BinaryRow
92-
min_values = stat.min_values.to_dict()
93-
max_values = stat.max_values.to_dict()
94-
min_value = min_values[self.field]
95-
max_value = max_values[self.field]
83+
min_value = stat.min_values.get_field(self.index)
84+
max_value = stat.max_values.get_field(self.index)
9685

9786
if min_value is None or max_value is None or (null_count is not None and null_count == row_count):
9887
# invalid stats, skip validation

paimon-python/pypaimon/manifest/manifest_file_manager.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,9 @@ def __init__(self, table):
3838
self.table: FileStoreTable = table
3939
self.manifest_path = table.table_path / "manifest"
4040
self.file_io = table.file_io
41-
self.partition_key_fields = self.table.table_schema.get_partition_key_fields()
42-
self.primary_key_fields = self.table.table_schema.get_primary_key_fields()
43-
self.trimmed_primary_key_fields = self.table.table_schema.get_trimmed_primary_key_fields()
41+
self.partition_keys_fields = self.table.partition_keys_fields
42+
self.primary_keys_fields = self.table.primary_keys_fields
43+
self.trimmed_primary_keys_fields = self.table.trimmed_primary_keys_fields
4444

4545
def read(self, manifest_file_name: str, manifest_entry_filter=None, drop_stats=True) -> List[ManifestEntry]:
4646
manifest_file_path = self.manifest_path / manifest_file_name
@@ -55,8 +55,8 @@ def read(self, manifest_file_name: str, manifest_entry_filter=None, drop_stats=T
5555
file_dict = dict(record['_FILE'])
5656
key_dict = dict(file_dict['_KEY_STATS'])
5757
key_stats = SimpleStats(
58-
min_values=BinaryRow(key_dict['_MIN_VALUES'], self.trimmed_primary_key_fields),
59-
max_values=BinaryRow(key_dict['_MAX_VALUES'], self.trimmed_primary_key_fields),
58+
min_values=BinaryRow(key_dict['_MIN_VALUES'], self.trimmed_primary_keys_fields),
59+
max_values=BinaryRow(key_dict['_MAX_VALUES'], self.trimmed_primary_keys_fields),
6060
null_counts=key_dict['_NULL_COUNTS'],
6161
)
6262

@@ -80,8 +80,8 @@ def read(self, manifest_file_name: str, manifest_entry_filter=None, drop_stats=T
8080
file_name=file_dict['_FILE_NAME'],
8181
file_size=file_dict['_FILE_SIZE'],
8282
row_count=file_dict['_ROW_COUNT'],
83-
min_key=GenericRowDeserializer.from_bytes(file_dict['_MIN_KEY'], self.trimmed_primary_key_fields),
84-
max_key=GenericRowDeserializer.from_bytes(file_dict['_MAX_KEY'], self.trimmed_primary_key_fields),
83+
min_key=GenericRowDeserializer.from_bytes(file_dict['_MIN_KEY'], self.trimmed_primary_keys_fields),
84+
max_key=GenericRowDeserializer.from_bytes(file_dict['_MAX_KEY'], self.trimmed_primary_keys_fields),
8585
key_stats=key_stats,
8686
value_stats=value_stats,
8787
min_sequence_number=file_dict['_MIN_SEQUENCE_NUMBER'],
@@ -100,7 +100,7 @@ def read(self, manifest_file_name: str, manifest_entry_filter=None, drop_stats=T
100100
)
101101
entry = ManifestEntry(
102102
kind=record['_KIND'],
103-
partition=GenericRowDeserializer.from_bytes(record['_PARTITION'], self.partition_key_fields),
103+
partition=GenericRowDeserializer.from_bytes(record['_PARTITION'], self.partition_keys_fields),
104104
bucket=record['_BUCKET'],
105105
total_buckets=record['_TOTAL_BUCKETS'],
106106
file=file_meta

paimon-python/pypaimon/manifest/manifest_list_manager.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@
2525
MANIFEST_FILE_META_SCHEMA, ManifestFileMeta)
2626
from pypaimon.manifest.schema.simple_stats import SimpleStats
2727
from pypaimon.snapshot.snapshot import Snapshot
28-
from pypaimon.table.row.generic_row import (GenericRowDeserializer,
29-
GenericRowSerializer)
28+
from pypaimon.table.row.binary_row import BinaryRow
29+
from pypaimon.table.row.generic_row import GenericRowSerializer
3030

3131

3232
class ManifestListManager:
@@ -61,13 +61,13 @@ def read(self, manifest_list_name: str) -> List[ManifestFileMeta]:
6161
for record in reader:
6262
stats_dict = dict(record['_PARTITION_STATS'])
6363
partition_stats = SimpleStats(
64-
min_values=GenericRowDeserializer.from_bytes(
64+
min_values=BinaryRow(
6565
stats_dict['_MIN_VALUES'],
66-
self.table.table_schema.get_partition_key_fields()
66+
self.table.partition_keys_fields
6767
),
68-
max_values=GenericRowDeserializer.from_bytes(
68+
max_values=BinaryRow(
6969
stats_dict['_MAX_VALUES'],
70-
self.table.table_schema.get_partition_key_fields()
70+
self.table.partition_keys_fields
7171
),
7272
null_counts=stats_dict['_NULL_COUNTS'],
7373
)

paimon-python/pypaimon/manifest/schema/simple_stats.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
################################################################################
1818

1919
from dataclasses import dataclass
20-
from typing import List, Optional
20+
from typing import List
2121
from typing import ClassVar
2222

2323
from pypaimon.table.row.generic_row import GenericRow
@@ -28,7 +28,8 @@
2828
class SimpleStats:
2929
min_values: InternalRow
3030
max_values: InternalRow
31-
null_counts: Optional[List[int]]
31+
# TODO convert null counts to InternalArray
32+
null_counts: List[int]
3233

3334
_empty_stats: ClassVar[object] = None
3435

paimon-python/pypaimon/manifest/simple_stats_evolution.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@ def evolution(self, stats: SimpleStats, row_count: Optional[int],
5959
null_counts = self._project_array(null_counts, dense_index_mapping)
6060

6161
if self.index_mapping is not None:
62-
# TODO support schema evolution
6362
min_values = self._project_row(min_values, self.index_mapping)
6463
max_values = self._project_row(max_values, self.index_mapping)
6564

paimon-python/pypaimon/manifest/simple_stats_evolutions.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ def get_or_create(self, data_schema_id: int) -> SimpleStatsEvolution:
4040
if self.table_schema_id == data_schema_id:
4141
evolution = SimpleStatsEvolution(self.schema_fields(data_schema_id), None, None)
4242
else:
43-
# TODO support schema evolution
4443
if self.table_fields is None:
4544
self.table_fields = self.table_data_fields
4645

paimon-python/pypaimon/read/scanner/full_starting_scanner.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ def __init__(self, table, predicate: Optional[Predicate], limit: Optional[int]):
4848
self.manifest_file_manager = ManifestFileManager(table)
4949

5050
self.primary_key_predicate = trim_and_transform_predicate(
51-
self.predicate, self.table.field_names, self.table.table_schema.get_trimmed_primary_keys())
51+
self.predicate, self.table.field_names, self.table.trimmed_primary_keys)
5252

5353
self.partition_key_predicate = trim_and_transform_predicate(
5454
self.predicate, self.table.field_names, self.table.partition_keys)

paimon-python/pypaimon/read/split_read.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ def __init__(self, table, predicate: Optional[Predicate], read_type: List[DataFi
6464
self.split = split
6565
self.value_arity = len(read_type)
6666

67-
self.trimmed_primary_key = self.table.table_schema.get_trimmed_primary_keys()
67+
self.trimmed_primary_key = self.table.trimmed_primary_keys
6868
self.read_fields = read_type
6969
if isinstance(self, MergeFileSplitRead):
7070
self.read_fields = self._create_key_value_fields(read_type)

paimon-python/pypaimon/schema/table_schema.py

Lines changed: 0 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -145,31 +145,3 @@ def copy(self, new_options: Optional[Dict[str, str]] = None) -> "TableSchema":
145145
comment=self.comment,
146146
time_millis=self.time_millis
147147
)
148-
149-
def get_primary_key_fields(self) -> List[DataField]:
150-
if not self.primary_keys:
151-
return []
152-
field_map = {field.name: field for field in self.fields}
153-
return [field_map[name] for name in self.primary_keys if name in field_map]
154-
155-
def get_partition_key_fields(self) -> List[DataField]:
156-
if not self.partition_keys:
157-
return []
158-
field_map = {field.name: field for field in self.fields}
159-
return [field_map[name] for name in self.partition_keys if name in field_map]
160-
161-
def get_trimmed_primary_key_fields(self) -> List[DataField]:
162-
if not self.primary_keys or not self.partition_keys:
163-
return self.get_primary_key_fields()
164-
adjusted = [pk for pk in self.primary_keys if pk not in self.partition_keys]
165-
# Validate that filtered list is not empty
166-
if not adjusted:
167-
raise ValueError(
168-
f"Primary key constraint {self.primary_keys} "
169-
f"should not be same with partition fields {self.partition_keys}, "
170-
"this will result in only one record in a partition")
171-
field_map = {field.name: field for field in self.fields}
172-
return [field_map[name] for name in adjusted if name in field_map]
173-
174-
def get_trimmed_primary_keys(self) -> List[str]:
175-
return [field.name for field in self.get_trimmed_primary_key_fields()]

paimon-python/pypaimon/table/file_store_table.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,12 @@ def __init__(self, file_io: FileIO, identifier: Identifier, table_path: Path,
4949
self.field_names = [field.name for field in table_schema.fields]
5050
self.field_dict = {field.name: field for field in self.fields}
5151
self.primary_keys = table_schema.primary_keys
52+
self.primary_keys_fields = [self.field_dict[name] for name in self.primary_keys]
5253
self.partition_keys = table_schema.partition_keys
54+
self.partition_keys_fields = [self.field_dict[name] for name in self.partition_keys]
55+
self.trimmed_primary_keys = [pk for pk in self.primary_keys if pk not in self.partition_keys]
56+
self.trimmed_primary_keys_fields = [self.field_dict[name] for name in self.trimmed_primary_keys]
57+
5358
self.options = table_schema.options
5459
self.cross_partition_update = self.table_schema.cross_partition_update()
5560
self.is_primary_key_table = bool(self.primary_keys)

0 commit comments

Comments
 (0)