1616# limitations under the License.
1717################################################################################
1818
19- from typing import List , Optional , Any
19+ from typing import List , Optional , Any , Set
2020
2121import pyarrow as pa
2222from pyarrow import RecordBatch
@@ -35,7 +35,8 @@ class FormatVortexReader(RecordBatchReader):
3535
3636 def __init__ (self , file_io : FileIO , file_path : str , read_fields : List [DataField ],
3737 push_down_predicate : Any , batch_size : int = 1024 ,
38- row_indices : Optional [Any ] = None ):
38+ row_indices : Optional [Any ] = None ,
39+ predicate_fields : Optional [Set [str ]] = None ):
3940 import vortex
4041
4142 from pypaimon .read .reader .vortex_utils import to_vortex_specified
@@ -79,23 +80,29 @@ def __init__(self, file_io: FileIO, file_path: str, read_fields: List[DataField]
7980 PyarrowFieldParser .from_paimon_schema (read_fields ) if read_fields else None
8081 )
8182
83+ # Collect predicate-referenced fields for targeted view type casting
84+ self ._cast_fields = predicate_fields if predicate_fields and vortex_expr is not None else set ()
85+
8286 @staticmethod
83- def _cast_view_types (batch : RecordBatch ) -> RecordBatch :
84- """Cast string_view/binary_view columns to string/binary for PyArrow compatibility."""
87+ def _cast_view_types (batch : RecordBatch , target_fields : Set [str ]) -> RecordBatch :
88+ """Cast string_view/binary_view columns to string/binary, only for target fields."""
89+ if not target_fields :
90+ return batch
8591 columns = []
8692 fields = []
8793 changed = False
8894 for i in range (batch .num_columns ):
8995 col = batch .column (i )
9096 field = batch .schema .field (i )
91- if pa .types .is_large_string (col .type ) or col .type == pa .string_view ():
92- col = col .cast (pa .utf8 ())
93- field = field .with_type (pa .utf8 ())
94- changed = True
95- elif pa .types .is_large_binary (col .type ) or col .type == pa .binary_view ():
96- col = col .cast (pa .binary ())
97- field = field .with_type (pa .binary ())
98- changed = True
97+ if field .name in target_fields :
98+ if col .type == pa .string_view ():
99+ col = col .cast (pa .utf8 ())
100+ field = field .with_type (pa .utf8 ())
101+ changed = True
102+ elif col .type == pa .binary_view ():
103+ col = col .cast (pa .binary ())
104+ field = field .with_type (pa .binary ())
105+ changed = True
99106 columns .append (col )
100107 fields .append (field )
101108 if changed :
@@ -104,7 +111,8 @@ def _cast_view_types(batch: RecordBatch) -> RecordBatch:
104111
105112 def read_arrow_batch (self ) -> Optional [RecordBatch ]:
106113 try :
107- batch = self ._cast_view_types (next (self .record_batch_reader ))
114+ batch = next (self .record_batch_reader )
115+ batch = self ._cast_view_types (batch , self ._cast_fields )
108116
109117 if not self .missing_fields :
110118 return batch
0 commit comments