Skip to content

Commit 63bb3ef

Browse files
committed
fix cases
1 parent 619dd77 commit 63bb3ef

File tree

3 files changed

+77
-5
lines changed

3 files changed

+77
-5
lines changed

paimon-python/pypaimon/read/reader/format_vortex_reader.py

Lines changed: 72 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323

2424
from pypaimon.common.file_io import FileIO
2525
from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
26+
from pypaimon.schema.data_types import DataField, PyarrowFieldParser
27+
from pypaimon.table.special_fields import SpecialFields
2628

2729

2830
class FormatVortexReader(RecordBatchReader):
@@ -31,7 +33,7 @@ class FormatVortexReader(RecordBatchReader):
3133
and filters it based on the provided predicate and projection.
3234
"""
3335

34-
def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str],
36+
def __init__(self, file_io: FileIO, file_path: str, read_fields: List[DataField],
3537
push_down_predicate: Any, batch_size: int = 1024,
3638
row_indices: Optional[Any] = None):
3739
import vortex
@@ -46,7 +48,15 @@ def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str],
4648
else:
4749
vortex_file = vortex.open(file_path_for_vortex)
4850

49-
columns_for_vortex = read_fields if read_fields else None
51+
self.read_fields = read_fields
52+
self._read_field_names = [f.name for f in read_fields]
53+
54+
# Identify which fields exist in the file and which are missing
55+
file_schema_names = set(vortex_file.dtype.to_arrow_schema().names)
56+
self.existing_fields = [f.name for f in read_fields if f.name in file_schema_names]
57+
self.missing_fields = [f.name for f in read_fields if f.name not in file_schema_names]
58+
59+
columns_for_vortex = self.existing_fields if self.existing_fields else None
5060

5161
# Try to convert Arrow predicate to Vortex expr for native push-down
5262
vortex_expr = None
@@ -65,9 +75,68 @@ def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str],
6575
self.record_batch_reader = vortex_file.scan(
6676
columns_for_vortex, expr=vortex_expr, indices=indices, batch_size=batch_size).to_arrow()
6777

78+
self._output_schema = (
79+
PyarrowFieldParser.from_paimon_schema(read_fields) if read_fields else None
80+
)
81+
82+
@staticmethod
83+
def _cast_view_types(batch: RecordBatch) -> RecordBatch:
84+
"""Cast string_view/binary_view columns to string/binary for PyArrow compatibility."""
85+
columns = []
86+
fields = []
87+
changed = False
88+
for i in range(batch.num_columns):
89+
col = batch.column(i)
90+
field = batch.schema.field(i)
91+
if pa.types.is_large_string(col.type) or col.type == pa.string_view():
92+
col = col.cast(pa.utf8())
93+
field = field.with_type(pa.utf8())
94+
changed = True
95+
elif pa.types.is_large_binary(col.type) or col.type == pa.binary_view():
96+
col = col.cast(pa.binary())
97+
field = field.with_type(pa.binary())
98+
changed = True
99+
columns.append(col)
100+
fields.append(field)
101+
if changed:
102+
return pa.RecordBatch.from_arrays(columns, schema=pa.schema(fields))
103+
return batch
104+
68105
def read_arrow_batch(self) -> Optional[RecordBatch]:
69106
try:
70-
return next(self.record_batch_reader)
107+
batch = self._cast_view_types(next(self.record_batch_reader))
108+
109+
if not self.missing_fields:
110+
return batch
111+
112+
def _type_for_missing(name: str) -> pa.DataType:
113+
if self._output_schema is not None:
114+
idx = self._output_schema.get_field_index(name)
115+
if idx >= 0:
116+
return self._output_schema.field(idx).type
117+
return pa.null()
118+
119+
missing_columns = [
120+
pa.nulls(batch.num_rows, type=_type_for_missing(name))
121+
for name in self.missing_fields
122+
]
123+
124+
# Reconstruct the batch with all fields in the correct order
125+
all_columns = []
126+
out_fields = []
127+
for field_name in self._read_field_names:
128+
if field_name in self.existing_fields:
129+
column_idx = self.existing_fields.index(field_name)
130+
all_columns.append(batch.column(column_idx))
131+
out_fields.append(batch.schema.field(column_idx))
132+
else:
133+
column_idx = self.missing_fields.index(field_name)
134+
col_type = _type_for_missing(field_name)
135+
all_columns.append(missing_columns[column_idx])
136+
nullable = not SpecialFields.is_system_field(field_name)
137+
out_fields.append(pa.field(field_name, col_type, nullable=nullable))
138+
return pa.RecordBatch.from_arrays(all_columns, schema=pa.schema(out_fields))
139+
71140
except StopIteration:
72141
return None
73142

paimon-python/pypaimon/read/split_read.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,9 @@ def file_reader_supplier(self, file: DataFileMeta, for_merge_read: bool,
166166
format_reader = FormatLanceReader(self.table.file_io, file_path, read_file_fields,
167167
read_arrow_predicate, batch_size=batch_size)
168168
elif file_format == CoreOptions.FILE_FORMAT_VORTEX:
169-
format_reader = FormatVortexReader(self.table.file_io, file_path, read_file_fields,
169+
name_to_field = {f.name: f for f in self.read_fields}
170+
ordered_read_fields = [name_to_field[n] for n in read_file_fields if n in name_to_field]
171+
format_reader = FormatVortexReader(self.table.file_io, file_path, ordered_read_fields,
170172
read_arrow_predicate, batch_size=batch_size,
171173
row_indices=row_indices)
172174
elif file_format == CoreOptions.FILE_FORMAT_PARQUET or file_format == CoreOptions.FILE_FORMAT_ORC:

paimon-python/pypaimon/tests/data_evolution_test.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1536,7 +1536,8 @@ def test_vortex_row_id_filter(self):
15361536
self.assertEqual(full.num_rows, 10)
15371537

15381538
# Filter by _ROW_ID using predicate — triggers row_ranges pushdown in Vortex
1539-
pb = rb.new_predicate_builder()
1539+
rb_with_rowid = table.new_read_builder().with_projection(['f0', 'f1', '_ROW_ID'])
1540+
pb = rb_with_rowid.new_predicate_builder()
15401541
rb_filtered = table.new_read_builder().with_filter(pb.equal('_ROW_ID', 3))
15411542
filtered = rb_filtered.new_read().to_arrow(rb_filtered.new_scan().plan().splits())
15421543
self.assertEqual(filtered.num_rows, 1)

0 commit comments

Comments
 (0)