
Commit 1228933

[data] Fix errors with concatenation with mixed pyarrow native and extension types (#57566)
## Why are these changes needed?

Cherry-pick #56811. Original description:

If an execution needed to concatenate native pyarrow types and pyarrow extension types, it would fail with errors like the following:

```
⚠️ Dataset dataset_5_0 execution failed: : 0.00 row [00:00, ? row/s]
- Repartition 1: 0.00 row [00:00, ? row/s]
*- Split Repartition: : 0.00 row [00:00, ? row/s]
2025-09-22 17:21:34,068 ERROR exceptions.py:73 -- Exception occurred in Ray Data or Ray Core internal code. If you continue to see this error, please open an issue on the Ray project GitHub page with the full stack trace below: https://github.com/ray-project/ray/issues/new/choose
2025-09-22 17:21:34,068 ERROR exceptions.py:81 -- Full stack trace:
Traceback (most recent call last):
  File "/Users/mowen/code/ray/python/ray/data/exceptions.py", line 49, in handle_trace
    return fn(*args, **kwargs)
  File "/Users/mowen/code/ray/python/ray/data/_internal/plan.py", line 533, in execute
    blocks = execute_to_legacy_block_list(
  File "/Users/mowen/code/ray/python/ray/data/_internal/execution/legacy_compat.py", line 127, in execute_to_legacy_block_list
    block_list = _bundles_to_block_list(bundles)
  File "/Users/mowen/code/ray/python/ray/data/_internal/execution/legacy_compat.py", line 175, in _bundles_to_block_list
    bundle_list = list(bundles)
  File "/Users/mowen/code/ray/python/ray/data/_internal/execution/interfaces/executor.py", line 34, in __next__
    return self.get_next()
  File "/Users/mowen/code/ray/python/ray/data/_internal/execution/streaming_executor.py", line 680, in get_next
    bundle = state.get_output_blocking(output_split_idx)
  File "/Users/mowen/code/ray/python/ray/data/_internal/execution/streaming_executor_state.py", line 373, in get_output_blocking
    raise self._exception
  File "/Users/mowen/code/ray/python/ray/data/_internal/execution/streaming_executor.py", line 331, in run
    continue_sched = self._scheduling_loop_step(self._topology)
  File "/Users/mowen/code/ray/python/ray/data/_internal/execution/streaming_executor.py", line 475, in _scheduling_loop_step
    update_operator_states(topology)
  File "/Users/mowen/code/ray/python/ray/data/_internal/execution/streaming_executor_state.py", line 586, in update_operator_states
    op.all_inputs_done()
  File "/Users/mowen/code/ray/python/ray/data/_internal/execution/operators/base_physical_operator.py", line 122, in all_inputs_done
    self._output_buffer, self._stats = self._bulk_fn(self._input_buffer, ctx)
  File "/Users/mowen/code/ray/python/ray/data/_internal/planner/repartition.py", line 84, in split_repartition_fn
    return scheduler.execute(refs, num_outputs, ctx)
  File "/Users/mowen/code/ray/python/ray/data/_internal/planner/exchange/split_repartition_task_scheduler.py", line 106, in execute
    ] = reduce_bar.fetch_until_complete(list(reduce_metadata_schema))
  File "/Users/mowen/code/ray/python/ray/data/_internal/progress_bar.py", line 166, in fetch_until_complete
    for ref, result in zip(done, ray.get(done)):
  File "/Users/mowen/code/ray/python/ray/_private/auto_init_hook.py", line 22, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/Users/mowen/code/ray/python/ray/_private/client_mode_hook.py", line 104, in wrapper
    return func(*args, **kwargs)
  File "/Users/mowen/code/ray/python/ray/_private/worker.py", line 2952, in get
    values, debugger_breakpoint = worker.get_objects(
  File "/Users/mowen/code/ray/python/ray/_private/worker.py", line 1025, in get_objects
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(RuntimeError): ray::reduce() (pid=7442, ip=127.0.0.1)
  File "/Users/mowen/code/ray/python/ray/data/_internal/planner/exchange/shuffle_task_spec.py", line 128, in reduce
    new_block = builder.build()
  File "/Users/mowen/code/ray/python/ray/data/_internal/delegating_block_builder.py", line 68, in build
    return self._builder.build()
  File "/Users/mowen/code/ray/python/ray/data/_internal/table_block.py", line 144, in build
    return self._concat_tables(tables)
  File "/Users/mowen/code/ray/python/ray/data/_internal/arrow_block.py", line 161, in _concat_tables
    return transform_pyarrow.concat(tables, promote_types=True)
  File "/Users/mowen/code/ray/python/ray/data/_internal/arrow_ops/transform_pyarrow.py", line 706, in concat
    col = _concatenate_chunked_arrays(col_chunked_arrays)
  File "/Users/mowen/code/ray/python/ray/data/_internal/arrow_ops/transform_pyarrow.py", line 397, in _concatenate_chunked_arrays
    raise RuntimeError(f"Types mismatch: {type_} != {arr.type}")
RuntimeError: Types mismatch: uint64 != double
2025-09-22 17:21:34,069 ERROR worker.py:429 -- Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): ray::reduce() (pid=7442, ip=127.0.0.1)
  File "/Users/mowen/code/ray/python/ray/data/_internal/planner/exchange/shuffle_task_spec.py", line 128, in reduce
    new_block = builder.build()
  File "/Users/mowen/code/ray/python/ray/data/_internal/delegating_block_builder.py", line 68, in build
    return self._builder.build()
  File "/Users/mowen/code/ray/python/ray/data/_internal/table_block.py", line 144, in build
    return self._concat_tables(tables)
  File "/Users/mowen/code/ray/python/ray/data/_internal/arrow_block.py", line 161, in _concat_tables
    return transform_pyarrow.concat(tables, promote_types=True)
  File "/Users/mowen/code/ray/python/ray/data/_internal/arrow_ops/transform_pyarrow.py", line 706, in concat
    col = _concatenate_chunked_arrays(col_chunked_arrays)
  File "/Users/mowen/code/ray/python/ray/data/_internal/arrow_ops/transform_pyarrow.py", line 397, in _concatenate_chunked_arrays
    raise RuntimeError(f"Types mismatch: {type_} != {arr.type}")
RuntimeError: Types mismatch: uint64 != double
```

This PR adds a test that replicates the failure and fixes the underlying issue by concatenating extension-typed and native-typed columns separately before rejoining them.

## Related issue number

<!-- For example: "Closes #1234" -->

## Checks

- [ ] I've signed off every commit (by using the -s flag, i.e., `git commit -s`) in this PR.
- [ ] I've run pre-commit jobs to lint the changes in this PR. ([pre-commit setup](https://docs.ray.io/en/latest/ray-contribute/getting-involved.html#lint-and-formatting))
- [ ] I've included any doc changes needed for https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.
- [ ] I've made sure the tests are passing. Note that there might be a few flaky tests; see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
  - [ ] Unit tests
  - [ ] Release tests
  - [ ] This PR is not tested :(
1 parent 2ece9ff · commit 1228933

File tree

2 files changed: +192 −88 lines changed


python/ray/data/_internal/arrow_ops/transform_pyarrow.py

Lines changed: 136 additions & 88 deletions
```diff
@@ -584,6 +584,106 @@ def shuffle(block: "pyarrow.Table", seed: Optional[int] = None) -> "pyarrow.Table":
     return take_table(block, indices)
 
 
+def _concat_cols_with_null_list(
+    col_chunked_arrays: List["pyarrow.ChunkedArray"],
+) -> "pyarrow.ChunkedArray":
+    import pyarrow as pa
+
+    # For each opaque list column, iterate through all schemas until
+    # we find a valid value_type that can be used to override the
+    # column types in the following for-loop.
+    scalar_type = None
+    for arr in col_chunked_arrays:
+        if not pa.types.is_list(arr.type) or not pa.types.is_null(arr.type.value_type):
+            scalar_type = arr.type
+            break
+
+    if scalar_type is not None:
+        for c_idx in range(len(col_chunked_arrays)):
+            c = col_chunked_arrays[c_idx]
+            if pa.types.is_list(c.type) and pa.types.is_null(c.type.value_type):
+                if pa.types.is_list(scalar_type):
+                    # If we are dealing with a list input,
+                    # cast the array to the scalar_type found above.
+                    col_chunked_arrays[c_idx] = c.cast(scalar_type)
+                else:
+                    # If we are dealing with a single value, construct
+                    # a new array with null values filled.
+                    col_chunked_arrays[c_idx] = pa.chunked_array(
+                        [pa.nulls(c.length(), type=scalar_type)]
+                    )
+
+    return _concatenate_chunked_arrays(col_chunked_arrays)
+
+
+def _concat_cols_with_extension_tensor_types(
+    col_chunked_arrays: List["pyarrow.ChunkedArray"],
+) -> "pyarrow.ChunkedArray":
+    import pyarrow as pa
+
+    # For our tensor extension types, manually construct a chunked array
+    # containing chunks from all blocks. This is to handle
+    # homogeneous-shaped block columns having different shapes across
+    # blocks: if tensor element shapes differ across blocks, a
+    # variable-shaped tensor array will be returned.
+    combined_chunks = list(
+        itertools.chain(*[chunked.iterchunks() for chunked in col_chunked_arrays])
+    )
+
+    return pa.chunked_array(unify_tensor_arrays(combined_chunks))
+
+
+def _concat_cols_with_extension_object_types(
+    col_chunked_arrays: List["pyarrow.ChunkedArray"],
+) -> "pyarrow.ChunkedArray":
+    import pyarrow as pa
+
+    from ray.data.extensions import ArrowPythonObjectArray, ArrowPythonObjectType
+
+    chunks_to_concat = []
+    # Cast everything to objects if concatenated with an object column.
+    for ca in col_chunked_arrays:
+        for chunk in ca.chunks:
+            if isinstance(ca.type, ArrowPythonObjectType):
+                chunks_to_concat.append(chunk)
+            else:
+                chunks_to_concat.append(
+                    ArrowPythonObjectArray.from_objects(chunk.to_pylist())
+                )
+    return pa.chunked_array(chunks_to_concat)
+
+
+def _concat_cols_with_native_pyarrow_types(
+    col_names: List[str], blocks: List["pyarrow.Table"], promote_types: bool = False
+) -> Dict[str, "pyarrow.ChunkedArray"]:
+    if not col_names:
+        return {}
+
+    # For columns with native pyarrow types, use the built-in pyarrow.concat_tables.
+    import pyarrow as pa
+
+    # When concatenating tables we allow type promotions to occur, since
+    # no schema enforcement is currently performed, therefore allowing schemas
+    # to vary between blocks.
+    #
+    # NOTE: Type promotions aren't available in Arrow < 14.0.
+    subset_blocks = []
+    for block in blocks:
+        cols_to_select = [
+            col_name for col_name in col_names if col_name in block.schema.names
+        ]
+        subset_blocks.append(block.select(cols_to_select))
+    if get_pyarrow_version() < parse_version("14.0.0"):
+        table = pa.concat_tables(subset_blocks, promote=True)
+    else:
+        arrow_promote_types_mode = "permissive" if promote_types else "default"
+        table = pa.concat_tables(
+            subset_blocks, promote_options=arrow_promote_types_mode
+        )
+    return {col_name: table.column(col_name) for col_name in table.schema.names}
+
+
 def concat(
     blocks: List["pyarrow.Table"], *, promote_types: bool = False
 ) -> "pyarrow.Table":
@@ -594,7 +694,6 @@ def concat(
 
     from ray.air.util.tensor_extensions.arrow import ArrowConversionError
     from ray.data.extensions import (
-        ArrowPythonObjectArray,
         ArrowPythonObjectType,
         get_arrow_extension_tensor_types,
     )
@@ -624,104 +723,53 @@
     # Handle alignment of struct type columns.
     blocks = _align_struct_fields(blocks, schema)
 
-    # Rollup columns with opaque (null-typed) lists, to process in following for-loop.
+    # Identify columns with null lists
     cols_with_null_list = set()
     for b in blocks:
         for col_name in b.schema.names:
             col_type = b.schema.field(col_name).type
             if pa.types.is_list(col_type) and pa.types.is_null(col_type.value_type):
                 cols_with_null_list.add(col_name)
 
-    if (
-        any(isinstance(type_, pa.ExtensionType) for type_ in schema.types)
-        or cols_with_null_list
-    ):
-        # Custom handling for extension array columns.
-        cols = []
-        for col_name in schema.names:
-            col_chunked_arrays = []
-            for block in blocks:
-                col_chunked_arrays.append(block.column(col_name))
-
-            if isinstance(schema.field(col_name).type, tensor_types):
-                # For our tensor extension types, manually construct a chunked array
-                # containing chunks from all blocks. This is to handle
-                # homogeneous-shaped block columns having different shapes across
-                # blocks: if tensor element shapes differ across blocks, a
-                # variable-shaped tensor array will be returned.
-                combined_chunks = list(
-                    itertools.chain(
-                        *[chunked.iterchunks() for chunked in col_chunked_arrays]
-                    )
-                )
+    # Concatenate the columns according to their type
+    concatenated_cols = {}
+    native_pyarrow_cols = []
+    for col_name in schema.names:
+        col_type = schema.field(col_name).type
 
-                col = pa.chunked_array(unify_tensor_arrays(combined_chunks))
-            elif isinstance(schema.field(col_name).type, ArrowPythonObjectType):
-                chunks_to_concat = []
-                # Cast everything to objects if concatenated with an object column
-                for ca in col_chunked_arrays:
-                    for chunk in ca.chunks:
-                        if isinstance(ca.type, ArrowPythonObjectType):
-                            chunks_to_concat.append(chunk)
-                        else:
-                            chunks_to_concat.append(
-                                ArrowPythonObjectArray.from_objects(chunk.to_pylist())
-                            )
-                col = pa.chunked_array(chunks_to_concat)
+        col_chunked_arrays = []
+        for block in blocks:
+            if col_name in block.schema.names:
+                col_chunked_arrays.append(block.column(col_name))
             else:
-                if col_name in cols_with_null_list:
-                    # For each opaque list column, iterate through all schemas until
-                    # we find a valid value_type that can be used to override the
-                    # column types in the following for-loop.
-                    scalar_type = None
-                    for arr in col_chunked_arrays:
-                        if not pa.types.is_list(arr.type) or not pa.types.is_null(
-                            arr.type.value_type
-                        ):
-                            scalar_type = arr.type
-                            break
-
-                    if scalar_type is not None:
-                        for c_idx in range(len(col_chunked_arrays)):
-                            c = col_chunked_arrays[c_idx]
-                            if pa.types.is_list(c.type) and pa.types.is_null(
-                                c.type.value_type
-                            ):
-                                if pa.types.is_list(scalar_type):
-                                    # If we are dealing with a list input,
-                                    # cast the array to the scalar_type found above.
-                                    col_chunked_arrays[c_idx] = c.cast(scalar_type)
-                                else:
-                                    # If we are dealing with a single value, construct
-                                    # a new array with null values filled.
-                                    col_chunked_arrays[c_idx] = pa.chunked_array(
-                                        [pa.nulls(c.length(), type=scalar_type)]
-                                    )
-
-                    col = _concatenate_chunked_arrays(col_chunked_arrays)
-            cols.append(col)
-
-        # Build the concatenated table.
-        table = pyarrow.Table.from_arrays(cols, schema=schema)
-        # Validate table schema (this is a cheap check by default).
-        table.validate()
-    else:
-        # No extension array columns, so use built-in pyarrow.concat_tables.
-
-        # When concatenating tables we allow type promotions to occur, since
-        # no schema enforcement is currently performed, therefore allowing schemas
-        # to vary b/w blocks
-        #
-        # NOTE: Type promotions aren't available in Arrow < 14.0
-        if get_pyarrow_version() < parse_version("14.0.0"):
-            table = pyarrow.concat_tables(blocks, promote=True)
-        else:
-            arrow_promote_types_mode = "permissive" if promote_types else "default"
-            table = pyarrow.concat_tables(
-                blocks, promote_options=arrow_promote_types_mode
+                col_chunked_arrays.append(pa.nulls(block.num_rows, type=col_type))
+
+        if col_name in cols_with_null_list:
+            concatenated_cols[col_name] = _concat_cols_with_null_list(
+                col_chunked_arrays
+            )
+        elif isinstance(col_type, tensor_types):
+            concatenated_cols[col_name] = _concat_cols_with_extension_tensor_types(
+                col_chunked_arrays
+            )
+        elif isinstance(col_type, ArrowPythonObjectType):
+            concatenated_cols[col_name] = _concat_cols_with_extension_object_types(
+                col_chunked_arrays
             )
+        else:
+            # Add to the list of native pyarrow columns; these will be
+            # concatenated after the loop using pyarrow.concat_tables.
+            native_pyarrow_cols.append(col_name)
 
-    return table
+    concatenated_cols.update(
+        _concat_cols_with_native_pyarrow_types(
+            native_pyarrow_cols, blocks, promote_types
        )
+    )
+
+    # Ensure that the columns are in the same order as the schema, then
+    # reconstruct the table.
+    return pyarrow.Table.from_arrays(
+        [concatenated_cols[col_name] for col_name in schema.names], schema=schema
+    )
 
 
 def concat_and_sort(
```
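To make the new per-column flow concrete, here is a simplified sketch with a toy schema (an illustration, not the committed code): blocks that lack a column contribute typed nulls, mirroring `pa.nulls(block.num_rows, type=col_type)` in the diff, and the table is rebuilt in schema order at the end. The committed version additionally routes native-typed columns through `pyarrow.concat_tables` (via `_concat_cols_with_native_pyarrow_types`) so type promotion still applies.

```python
# Simplified illustration of the per-column pass in the rewritten concat()
# (toy data and schema; not the committed implementation).
import pyarrow as pa

schema = pa.schema([("a", pa.int64()), ("b", pa.float64())])
blocks = [pa.table({"a": [1, 2]}), pa.table({"b": [3.0, 4.0]})]

columns = {}
for name in schema.names:
    col_type = schema.field(name).type
    chunks = []
    for block in blocks:
        if name in block.schema.names:
            # Reuse the block's chunks for this column.
            chunks.extend(block.column(name).chunks)
        else:
            # Blocks missing the column contribute typed nulls.
            chunks.append(pa.nulls(block.num_rows, type=col_type))
    columns[name] = pa.chunked_array(chunks)

# Rebuild in schema order, as the diff's final pyarrow.Table.from_arrays does.
table = pa.Table.from_arrays([columns[n] for n in schema.names], schema=schema)
assert table.num_rows == 4
```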

python/ray/data/tests/test_transform_pyarrow.py

Lines changed: 56 additions & 0 deletions
```diff
@@ -2847,6 +2847,62 @@ def unify_schemas_nested_struct_tensors_schemas():
     return {"with_tensor": schema1, "without_tensor": schema2, "expected": expected}
 
 
+@pytest.mark.parametrize("use_arrow_tensor_v2", [True, False])
+@pytest.mark.skipif(
+    get_pyarrow_version() < MIN_PYARROW_VERSION_TYPE_PROMOTION,
+    reason="Requires Arrow version of at least 14.0.0",
+)
+def test_concat_with_mixed_tensor_types_and_native_pyarrow_types(
+    use_arrow_tensor_v2, restore_data_context
+):
+    DataContext.get_current().use_arrow_tensor_v2 = use_arrow_tensor_v2
+
+    num_rows = 1024
+
+    # Block A: "int" is uint64; "tensor" is the Ray tensor extension type.
+    t_uint = pa.table(
+        {
+            "int": pa.array(np.zeros(num_rows // 2, dtype=np.uint64), type=pa.uint64()),
+            "tensor": ArrowTensorArray.from_numpy(
+                np.zeros((num_rows // 2, 3, 3), dtype=np.float32)
+            ),
+        }
+    )
+
+    # Block B: "int" is float64 with NaNs; "tensor" is the same extension type.
+    f = np.ones(num_rows // 2, dtype=np.float64)
+    f[::8] = np.nan
+    t_float = pa.table(
+        {
+            "int": pa.array(f, type=pa.float64()),
+            "tensor": ArrowTensorArray.from_numpy(
+                np.zeros((num_rows // 2, 3, 3), dtype=np.float32)
+            ),
+        }
+    )
+
+    # Two input blocks with different Arrow dtypes for "int".
+    ds = ray.data.from_arrow([t_uint, t_float])
+
+    # Force a concat across blocks.
+    ds = ds.repartition(1)
+
+    # This should not raise: RuntimeError: Types mismatch: double != uint64.
+    ds.materialize()
+
+    # Ensure that the result is correct: determine the expected tensor type
+    # based on the current DataContext setting.
+    if use_arrow_tensor_v2:
+        expected_tensor_type = ArrowTensorTypeV2((3, 3), pa.float32())
+    else:
+        expected_tensor_type = ArrowTensorType((3, 3), pa.float32())
+
+    assert ds.schema().base_schema == pa.schema(
+        [("int", pa.float64()), ("tensor", expected_tensor_type)]
+    )
+    assert ds.count() == num_rows
+
+
 @pytest.fixture
 def object_with_tensor_fails_blocks():
     """Blocks that should fail when concatenating objects with tensors."""
```