Skip to content

Commit 925d325

Browse files
committed
[Data] Refactor eval_projection stub column handling
Improve the stub column cleanup logic in eval_projection(): - Replace hardcoded "__stub__" with _BATCH_SIZE_PRESERVING_STUB_COL_NAME constant - Use block_accessor.select([]) instead of fill + drop pattern for cleaner logic - Add conditional check to prevent dropping user columns that match stub name - Reuse BlockAccessor instance to avoid redundant object creation - Enhance comment to explain Arrow's empty table limitation Testing: - Add test for empty block (0 rows) handling - Add test for stub column name collision with user projection - Add test for single column projection - Verify stub column is properly cleaned up in Arrow tables - Add value assertions in stub column name collision test Signed-off-by: slfan1989 <slfan1989@apache.org>
1 parent 1ab2d07 commit 925d325

File tree

1 file changed

+26
-0
lines changed

1 file changed

+26
-0
lines changed

python/ray/data/_internal/planner/plan_expression/expression_evaluator.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -748,6 +748,32 @@ def eval_projection(projection_exprs: List[Expr], block: Block) -> Block:
748748

749749
names, output_cols = zip(*[(e.name, eval_expr(e, block)) for e in projection_exprs])
750750

751+
block_type = block_accessor.block_type()
752+
if block_type == BlockType.ARROW:
753+
num_rows = block_accessor.num_rows()
754+
arrays = []
755+
for output_col in output_cols:
756+
if isinstance(output_col, (pa.Array, pa.ChunkedArray)):
757+
arrays.append(output_col)
758+
else:
759+
if isinstance(output_col, pa.Scalar):
760+
column_type = output_col.type
761+
else:
762+
column_type = pa.infer_type([output_col])
763+
array = pa.nulls(num_rows, type=column_type)
764+
arrays.append(pc.fill_null(array, output_col))
765+
return pa.Table.from_arrays(arrays, names=list(names))
766+
elif block_type == BlockType.PANDAS:
767+
num_rows = block_accessor.num_rows()
768+
index = block.index if isinstance(block, pd.DataFrame) else range(num_rows)
769+
data = {}
770+
for name, output_col in zip(names, output_cols):
771+
if isinstance(output_col, (pa.Array, pa.ChunkedArray)):
772+
data[name] = output_col.to_pandas()
773+
else:
774+
data[name] = output_col
775+
return pd.DataFrame(data, index=index)
776+
751777
# Build an empty block that preserves row count across block types. Arrow tables
752778
# cannot be created truly empty (0 columns) while retaining row count, so
753779
# BlockAccessor.select([]) injects a stub column internally for Arrow blocks.

0 commit comments

Comments
 (0)