Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import pyarrow.compute as pc
import pyarrow.dataset as ds

from ray.data._internal.arrow_block import _BATCH_SIZE_PRESERVING_STUB_COL_NAME
from ray.data._internal.logical.rules.projection_pushdown import (
_extract_input_columns_renaming_mapping,
)
Expand Down Expand Up @@ -747,15 +748,45 @@ def eval_projection(projection_exprs: List[Expr], block: Block) -> Block:

names, output_cols = zip(*[(e.name, eval_expr(e, block)) for e in projection_exprs])

# This clumsy workaround is necessary to be able to fill in Pyarrow tables
# that have to be "seeded" from an existing table with N rows, and couldn't be
# started from a truly empty table.
#
# TODO fix
new_block = BlockAccessor.for_block(block).fill_column("__stub__", None)
new_block = BlockAccessor.for_block(new_block).drop(input_column_names)
block_type = block_accessor.block_type()
if block_type == BlockType.ARROW:
num_rows = block_accessor.num_rows()
arrays = []
for output_col in output_cols:
if isinstance(output_col, (pa.Array, pa.ChunkedArray)):
arrays.append(output_col)
else:
if isinstance(output_col, pa.Scalar):
column_type = output_col.type
else:
column_type = pa.infer_type([output_col])
array = pa.nulls(num_rows, type=column_type)
arrays.append(pc.fill_null(array, output_col))
return pa.Table.from_arrays(arrays, names=list(names))
elif block_type == BlockType.PANDAS:
num_rows = block_accessor.num_rows()
index = block.index if isinstance(block, pd.DataFrame) else range(num_rows)
data = {}
for name, output_col in zip(names, output_cols):
if isinstance(output_col, (pa.Array, pa.ChunkedArray)):
data[name] = output_col.to_pandas()
else:
data[name] = output_col
return pd.DataFrame(data, index=index)

# Build an empty block that preserves row count across block types. Arrow tables
# cannot be created truly empty (0 columns) while retaining row count, so
# BlockAccessor.select([]) injects a stub column internally for Arrow blocks.
new_block = block_accessor.select([])

for name, output_col in zip(names, output_cols):
new_block = BlockAccessor.for_block(new_block).fill_column(name, output_col)

return BlockAccessor.for_block(new_block).drop(["__stub__"])
final_accessor = BlockAccessor.for_block(new_block)
if (
_BATCH_SIZE_PRESERVING_STUB_COL_NAME not in names
and _BATCH_SIZE_PRESERVING_STUB_COL_NAME in final_accessor.column_names()
):
new_block = final_accessor.drop([_BATCH_SIZE_PRESERVING_STUB_COL_NAME])

return new_block
Comment on lines +777 to +792
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

While this refactoring is a great improvement, creating a new BlockAccessor and a new block in each iteration of the loop can be inefficient, especially for projections with many columns. A more performant approach would be to construct the final block at once from all the computed columns, leveraging type-specific builders like pa.Table.from_arrays for Arrow and pd.DataFrame for Pandas. This avoids creating intermediate blocks and the need to handle the stub column for these known types.

Suggested change
# Build an empty block that preserves row count across block types. Arrow tables
# cannot be created truly empty (0 columns) while retaining row count, so
# BlockAccessor.select([]) injects a stub column internally for Arrow blocks.
new_block = block_accessor.select([])
for name, output_col in zip(names, output_cols):
new_block = BlockAccessor.for_block(new_block).fill_column(name, output_col)
return BlockAccessor.for_block(new_block).drop(["__stub__"])
final_accessor = BlockAccessor.for_block(new_block)
if (
_BATCH_SIZE_PRESERVING_STUB_COL_NAME not in names
and _BATCH_SIZE_PRESERVING_STUB_COL_NAME in final_accessor.column_names()
):
new_block = final_accessor.drop([_BATCH_SIZE_PRESERVING_STUB_COL_NAME])
return new_block
# Efficiently construct the new block based on its type.
block_type = block_accessor.block_type()
if block_type == BlockType.ARROW:
return pa.Table.from_arrays(list(output_cols), names=list(names))
elif block_type == BlockType.PANDAS:
return pd.DataFrame(dict(zip(names, output_cols)))
# Fallback to generic, iterative construction for other block types.
new_block = block_accessor.select([])
for name, output_col in zip(names, output_cols):
new_block = BlockAccessor.for_block(new_block).fill_column(name, output_col)
final_accessor = BlockAccessor.for_block(new_block)
if (
_BATCH_SIZE_PRESERVING_STUB_COL_NAME not in names
and _BATCH_SIZE_PRESERVING_STUB_COL_NAME in final_accessor.column_names()
):
new_block = final_accessor.drop([_BATCH_SIZE_PRESERVING_STUB_COL_NAME])
return new_block

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fallback path is unreachable dead code

Low Severity

The entire fallback code path (from the comment through the return new_block) is unreachable dead code. BlockType is an enum with only ARROW and PANDAS variants, and both are handled by early returns on lines 765 and 775. Additionally, NativeExpressionEvaluator.__init__ raises TypeError for any block type other than Arrow or Pandas, so expression evaluation on line 749 would fail before this code is reached even if a new BlockType were added. This means the block_accessor.select([]) call, the fill_column loop, and the conditional stub cleanup logic are never executed, adding maintenance burden and misleading readers into thinking this path handles real cases.

Fix in Cursor Fix in Web

60 changes: 60 additions & 0 deletions python/ray/data/tests/unit/test_expression_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,13 @@
import pytest
from pkg_resources import parse_version

from ray.data._internal.arrow_block import _BATCH_SIZE_PRESERVING_STUB_COL_NAME
from ray.data._internal.planner.plan_expression.expression_evaluator import (
ExpressionEvaluator,
eval_projection,
)
from ray.data.block import BlockAccessor
from ray.data.expressions import col, lit
from ray.data.tests.conftest import get_pyarrow_version


Expand Down Expand Up @@ -350,6 +354,62 @@ def test_filter_bad_expression(sample_data):
pq.read_table(sample_data_path, filters=filters)


@pytest.mark.parametrize(
    "block",
    [
        pa.table({"a": [1, 2], "b": [3, 4]}),
        pd.DataFrame({"a": [1, 2], "b": [3, 4]}),
    ],
)
def test_eval_projection_builds_from_empty_block(block):
    """Projection over Arrow and Pandas blocks yields exactly the projected columns."""
    projection = [lit(5).alias("five"), (col("a") + lit(1)).alias("a_plus")]

    result = eval_projection(projection, block)
    result_accessor = BlockAccessor.for_block(result)

    # Row count is preserved and only the projected columns remain, in order.
    assert result_accessor.num_rows() == 2
    assert result_accessor.column_names() == ["five", "a_plus"]

    as_pandas = result_accessor.to_pandas()
    assert list(as_pandas["five"]) == [5, 5]
    assert list(as_pandas["a_plus"]) == [2, 3]

    # Arrow outputs must not leak the internal batch-size-preserving stub column.
    if isinstance(result, pa.Table):
        assert (
            _BATCH_SIZE_PRESERVING_STUB_COL_NAME
            not in result_accessor.column_names()
        )


def test_eval_projection_empty_block():
    """A zero-row input block must produce a zero-row output block."""
    empty_block = pa.table({"a": pa.array([], type=pa.int64())})

    result = eval_projection([lit(5).alias("five")], empty_block)

    assert BlockAccessor.for_block(result).num_rows() == 0


def test_eval_projection_with_stub_col_name_in_projection():
    """A user column that happens to share the stub column's name is preserved."""
    block = pa.table({"a": [1, 2]})
    projection = [lit(999).alias(_BATCH_SIZE_PRESERVING_STUB_COL_NAME)]

    result = eval_projection(projection, block)

    accessor = BlockAccessor.for_block(result)
    assert _BATCH_SIZE_PRESERVING_STUB_COL_NAME in accessor.column_names()
    stub_values = accessor.to_pandas()[_BATCH_SIZE_PRESERVING_STUB_COL_NAME]
    assert list(stub_values) == [999, 999]


def test_eval_projection_single_column():
    """Projecting a single existing column keeps only that column."""
    block = pa.table({"a": [1, 2], "b": [3, 4]})

    result = eval_projection([col("a")], block)

    assert BlockAccessor.for_block(result).column_names() == ["a"]


if __name__ == "__main__":
import sys

Expand Down