Skip to content

Commit f9e1eb2

Browse files
committed
[python] Let Python write file without value stats by default
1 parent 7f1c329 commit f9e1eb2

File tree

3 files changed

+16
-48
lines changed

3 files changed

+16
-48
lines changed

paimon-python/pypaimon/manifest/schema/simple_stats.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ def empty_stats(cls):
3737
if cls._empty_stats is None:
3838
min_values = GenericRow([], [])
3939
max_values = GenericRow([], [])
40-
cls._empty_stats = cls(min_values, max_values, None)
40+
cls._empty_stats = cls(min_values, max_values, [])
4141
return cls._empty_stats
4242

4343

paimon-python/pypaimon/write/writer/data_blob_writer.py

Lines changed: 8 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -196,13 +196,15 @@ def _split_data(self, data: pa.RecordBatch) -> Tuple[pa.RecordBatch, pa.RecordBa
196196

197197
return normal_data, blob_data
198198

199-
def _process_normal_data(self, data: pa.RecordBatch) -> pa.Table:
199+
@staticmethod
200+
def _process_normal_data(data: pa.RecordBatch) -> pa.Table:
200201
"""Process normal data (similar to base DataWriter)."""
201202
if data is None or data.num_rows == 0:
202203
return pa.Table.from_batches([])
203204
return pa.Table.from_batches([data])
204205

205-
def _merge_normal_data(self, existing_data: pa.Table, new_data: pa.Table) -> pa.Table:
206+
@staticmethod
207+
def _merge_normal_data(existing_data: pa.Table, new_data: pa.Table) -> pa.Table:
206208
return pa.concat_tables([existing_data, new_data])
207209

208210
def _should_roll_normal(self) -> bool:
@@ -243,7 +245,7 @@ def _close_current_writers(self):
243245
logger.info(f"Closed both writers - normal: {normal_meta.file_name}, "
244246
f"added {len(blob_metas)} blob file metadata after normal metadata")
245247

246-
def _write_normal_data_to_file(self, data: pa.Table) -> DataFileMeta:
248+
def _write_normal_data_to_file(self, data: pa.Table) -> Optional[DataFileMeta]:
247249
if data.num_rows == 0:
248250
return None
249251

@@ -270,37 +272,15 @@ def _write_normal_data_to_file(self, data: pa.Table) -> DataFileMeta:
270272

271273
def _create_data_file_meta(self, file_name: str, file_path: str, data: pa.Table,
272274
external_path: Optional[str] = None) -> DataFileMeta:
273-
# Column stats (only for normal columns)
274-
column_stats = {
275-
field.name: self._get_column_stats(data, field.name)
276-
for field in self.table.table_schema.fields
277-
if field.name != self.blob_column_name
278-
}
279-
280-
# Get normal fields only
281-
normal_fields = [field for field in self.table.table_schema.fields
282-
if field.name != self.blob_column_name]
283-
284-
min_value_stats = [column_stats[field.name]['min_values'] for field in normal_fields]
285-
max_value_stats = [column_stats[field.name]['max_values'] for field in normal_fields]
286-
value_null_counts = [column_stats[field.name]['null_counts'] for field in normal_fields]
287-
288275
self.sequence_generator.start = self.sequence_generator.current
289-
290276
return DataFileMeta.create(
291277
file_name=file_name,
292278
file_size=self.file_io.get_file_size(file_path),
293279
row_count=data.num_rows,
294280
min_key=GenericRow([], []),
295281
max_key=GenericRow([], []),
296-
key_stats=SimpleStats(
297-
GenericRow([], []),
298-
GenericRow([], []),
299-
[]),
300-
value_stats=SimpleStats(
301-
GenericRow(min_value_stats, normal_fields),
302-
GenericRow(max_value_stats, normal_fields),
303-
value_null_counts),
282+
key_stats=SimpleStats.empty_stats(),
283+
value_stats=SimpleStats.empty_stats(),
304284
min_sequence_number=-1,
305285
max_sequence_number=-1,
306286
schema_id=self.table.table_schema.id,
@@ -309,7 +289,7 @@ def _create_data_file_meta(self, file_name: str, file_path: str, data: pa.Table,
309289
creation_time=Timestamp.now(),
310290
delete_row_count=0,
311291
file_source=0,
312-
value_stats_cols=self.normal_column_names,
292+
value_stats_cols=[],
313293
external_path=external_path,
314294
file_path=file_path,
315295
write_cols=self.write_cols)

paimon-python/pypaimon/write/writer/data_writer.py

Lines changed: 7 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
from pypaimon.data.timestamp import Timestamp
2727
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
2828
from pypaimon.manifest.schema.simple_stats import SimpleStats
29-
from pypaimon.schema.data_types import PyarrowFieldParser
3029
from pypaimon.table.bucket_mode import BucketMode
3130
from pypaimon.table.row.generic_row import GenericRow
3231

@@ -190,21 +189,14 @@ def _write_data_to_file(self, data: pa.Table):
190189
min_key = [col.to_pylist()[0] for col in min_key_row_batch.columns]
191190
max_key = [col.to_pylist()[0] for col in max_key_row_batch.columns]
192191

193-
# key stats & value stats
194-
data_fields = self.table.fields if self.table.is_primary_key_table \
195-
else PyarrowFieldParser.to_paimon_schema(data.schema)
196-
column_stats = {
192+
key_column_stats = {
197193
field.name: self._get_column_stats(data, field.name)
198-
for field in data_fields
194+
for field in self.table.trimmed_primary_keys
199195
}
200-
all_fields = data_fields
201-
min_value_stats = [column_stats[field.name]['min_values'] for field in all_fields]
202-
max_value_stats = [column_stats[field.name]['max_values'] for field in all_fields]
203-
value_null_counts = [column_stats[field.name]['null_counts'] for field in all_fields]
204196
key_fields = self.trimmed_primary_keys_fields
205-
min_key_stats = [column_stats[field.name]['min_values'] for field in key_fields]
206-
max_key_stats = [column_stats[field.name]['max_values'] for field in key_fields]
207-
key_null_counts = [column_stats[field.name]['null_counts'] for field in key_fields]
197+
min_key_stats = [key_column_stats[field.name]['min_values'] for field in key_fields]
198+
max_key_stats = [key_column_stats[field.name]['max_values'] for field in key_fields]
199+
key_null_counts = [key_column_stats[field.name]['null_counts'] for field in key_fields]
208200
if not all(count == 0 for count in key_null_counts):
209201
raise RuntimeError("Primary key should not be null")
210202

@@ -222,11 +214,7 @@ def _write_data_to_file(self, data: pa.Table):
222214
GenericRow(max_key_stats, self.trimmed_primary_keys_fields),
223215
key_null_counts,
224216
),
225-
value_stats=SimpleStats(
226-
GenericRow(min_value_stats, data_fields),
227-
GenericRow(max_value_stats, data_fields),
228-
value_null_counts,
229-
),
217+
value_stats=SimpleStats.empty_stats(),
230218
min_sequence_number=min_seq,
231219
max_sequence_number=max_seq,
232220
schema_id=self.table.table_schema.id,
@@ -235,7 +223,7 @@ def _write_data_to_file(self, data: pa.Table):
235223
creation_time=Timestamp.now(),
236224
delete_row_count=0,
237225
file_source=0,
238-
value_stats_cols=None, # None means all columns in the data have statistics
226+
value_stats_cols=[],
239227
external_path=external_path_str, # Set external path if using external paths
240228
first_row_id=None,
241229
write_cols=self.write_cols,

0 commit comments

Comments (0)