Skip to content

Commit 1ab2d07

Browse files
committed
[Data] Refactor eval_projection stub column handling
Improve the stub column cleanup logic in eval_projection():
- Replace hardcoded "__stub__" with _BATCH_SIZE_PRESERVING_STUB_COL_NAME constant
- Use block_accessor.select([]) instead of fill + drop pattern for cleaner logic
- Add conditional check to prevent dropping user columns that match stub name
- Reuse BlockAccessor instance to avoid redundant object creation
- Enhance comment to explain Arrow's empty table limitation

Testing:
- Add test for empty block (0 rows) handling
- Add test for stub column name collision with user projection
- Add test for single column projection
- Verify stub column is properly cleaned up in Arrow tables
- Add value assertions in stub column name collision test

Signed-off-by: slfan1989 <slfan1989@apache.org>
1 parent b93fc26 commit 1ab2d07

File tree

2 files changed

+73
-8
lines changed

2 files changed

+73
-8
lines changed

python/ray/data/_internal/planner/plan_expression/expression_evaluator.py

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import pyarrow.compute as pc
1212
import pyarrow.dataset as ds
1313

14+
from ray.data._internal.arrow_block import _BATCH_SIZE_PRESERVING_STUB_COL_NAME
1415
from ray.data._internal.logical.rules.projection_pushdown import (
1516
_extract_input_columns_renaming_mapping,
1617
)
@@ -747,15 +748,19 @@ def eval_projection(projection_exprs: List[Expr], block: Block) -> Block:
747748

748749
names, output_cols = zip(*[(e.name, eval_expr(e, block)) for e in projection_exprs])
749750

750-
# This clumsy workaround is necessary to be able to fill in Pyarrow tables
751-
# that has to be "seeded" from existing table with N rows, and couldn't be
752-
# started from a truly empty table.
753-
#
754-
# TODO fix
755-
new_block = BlockAccessor.for_block(block).fill_column("__stub__", None)
756-
new_block = BlockAccessor.for_block(new_block).drop(input_column_names)
751+
# Build an empty block that preserves row count across block types. Arrow tables
752+
# cannot be created truly empty (0 columns) while retaining row count, so
753+
# BlockAccessor.select([]) injects a stub column internally for Arrow blocks.
754+
new_block = block_accessor.select([])
757755

758756
for name, output_col in zip(names, output_cols):
759757
new_block = BlockAccessor.for_block(new_block).fill_column(name, output_col)
760758

761-
return BlockAccessor.for_block(new_block).drop(["__stub__"])
759+
final_accessor = BlockAccessor.for_block(new_block)
760+
if (
761+
_BATCH_SIZE_PRESERVING_STUB_COL_NAME not in names
762+
and _BATCH_SIZE_PRESERVING_STUB_COL_NAME in final_accessor.column_names()
763+
):
764+
new_block = final_accessor.drop([_BATCH_SIZE_PRESERVING_STUB_COL_NAME])
765+
766+
return new_block

python/ray/data/tests/unit/test_expression_evaluator.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,13 @@
55
import pytest
66
from pkg_resources import parse_version
77

8+
from ray.data._internal.arrow_block import _BATCH_SIZE_PRESERVING_STUB_COL_NAME
89
from ray.data._internal.planner.plan_expression.expression_evaluator import (
910
ExpressionEvaluator,
11+
eval_projection,
1012
)
13+
from ray.data.block import BlockAccessor
14+
from ray.data.expressions import col, lit
1115
from ray.data.tests.conftest import get_pyarrow_version
1216

1317

@@ -350,6 +354,62 @@ def test_filter_bad_expression(sample_data):
350354
pq.read_table(sample_data_path, filters=filters)
351355

352356

@pytest.mark.parametrize(
    "block",
    [
        pa.table({"a": [1, 2], "b": [3, 4]}),
        pd.DataFrame({"a": [1, 2], "b": [3, 4]}),
    ],
)
def test_eval_projection_builds_from_empty_block(block):
    """Check a two-expression projection over Arrow and pandas blocks.

    The output must keep the input row count, expose exactly the projected
    columns in projection order, and carry the evaluated values.
    """
    projection = [
        lit(5).alias("five"),
        (col("a") + lit(1)).alias("a_plus"),
    ]

    result = eval_projection(projection, block)
    accessor = BlockAccessor.for_block(result)

    assert accessor.num_rows() == 2
    assert accessor.column_names() == ["five", "a_plus"]

    frame = accessor.to_pandas()
    assert frame["five"].tolist() == [5, 5]
    assert frame["a_plus"].tolist() == [2, 3]

    # For Arrow outputs, explicitly check that the stub column name is absent.
    if isinstance(result, pa.Table):
        assert _BATCH_SIZE_PRESERVING_STUB_COL_NAME not in accessor.column_names()
379+
380+
def test_eval_projection_empty_block():
    """Check that projecting a 0-row Arrow block yields only the projected column."""
    block = pa.table({"a": pa.array([], type=pa.int64())})
    exprs = [lit(5).alias("five")]

    out = eval_projection(exprs, block)
    acc = BlockAccessor.for_block(out)

    assert acc.num_rows() == 0
    # The row count alone would still pass if the stub column leaked into the
    # output or the projected column were missing, so pin the column names too.
    assert acc.column_names() == ["five"]
388+
389+
def test_eval_projection_with_stub_col_name_in_projection():
    """Check that a projected column named like the stub column is kept."""
    stub_name = _BATCH_SIZE_PRESERVING_STUB_COL_NAME
    source = pa.table({"a": [1, 2]})

    result = eval_projection([lit(999).alias(stub_name)], source)
    accessor = BlockAccessor.for_block(result)

    # The column must still be present, with the user's values intact.
    assert stub_name in accessor.column_names()
    assert accessor.to_pandas()[stub_name].tolist() == [999, 999]
402+
403+
def test_eval_projection_single_column():
    """Check that projecting one of two input columns keeps only that column."""
    two_col_table = pa.table({"a": [1, 2], "b": [3, 4]})

    projected = eval_projection([col("a")], two_col_table)

    remaining = BlockAccessor.for_block(projected).column_names()
    assert remaining == ["a"]
411+
412+
353413
if __name__ == "__main__":
354414
import sys
355415

0 commit comments

Comments
 (0)