[Data] Refactor eval_projection stub column handling#60814
slfan1989 wants to merge 3 commits into ray-project:master from
Conversation
Improve the stub column cleanup logic in `eval_projection()`:

- Replace hardcoded `"__stub__"` with the `_BATCH_SIZE_PRESERVING_STUB_COL_NAME` constant
- Use `block_accessor.select([])` instead of the fill + drop pattern for cleaner logic
- Add a conditional check to prevent dropping user columns that match the stub name
- Reuse the `BlockAccessor` instance to avoid redundant object creation
- Enhance the comment to explain Arrow's empty table limitation

Testing:

- Add test for empty block (0 rows) handling
- Add test for stub column name collision with user projection
- Add test for single column projection
- Verify stub column is properly cleaned up in Arrow tables
- Add value assertions in stub column name collision test

Signed-off-by: slfan1989 <slfan1989@apache.org>
Code Review
This pull request is a solid refactoring of the eval_projection function. It successfully removes the hardcoded "__stub__" string, fixes a potential bug where a user's column could be accidentally dropped, and improves code clarity by replacing the old workaround with block_accessor.select([]). The new tests are comprehensive and cover important edge cases. I have one suggestion to further improve performance by avoiding iterative block creation.
```diff
  # Build an empty block that preserves row count across block types. Arrow tables
  # cannot be created truly empty (0 columns) while retaining row count, so
  # BlockAccessor.select([]) injects a stub column internally for Arrow blocks.
  new_block = block_accessor.select([])

  for name, output_col in zip(names, output_cols):
      new_block = BlockAccessor.for_block(new_block).fill_column(name, output_col)

- return BlockAccessor.for_block(new_block).drop(["__stub__"])
+ final_accessor = BlockAccessor.for_block(new_block)
+ if (
+     _BATCH_SIZE_PRESERVING_STUB_COL_NAME not in names
+     and _BATCH_SIZE_PRESERVING_STUB_COL_NAME in final_accessor.column_names()
+ ):
+     new_block = final_accessor.drop([_BATCH_SIZE_PRESERVING_STUB_COL_NAME])
+
+ return new_block
```
While this refactoring is a great improvement, creating a new BlockAccessor and a new block in each iteration of the loop can be inefficient, especially for projections with many columns. A more performant approach would be to construct the final block at once from all the computed columns, leveraging type-specific builders like pa.Table.from_arrays for Arrow and pd.DataFrame for Pandas. This avoids creating intermediate blocks and the need to handle the stub column for these known types.
```diff
-# Build an empty block that preserves row count across block types. Arrow tables
-# cannot be created truly empty (0 columns) while retaining row count, so
-# BlockAccessor.select([]) injects a stub column internally for Arrow blocks.
-new_block = block_accessor.select([])
-for name, output_col in zip(names, output_cols):
-    new_block = BlockAccessor.for_block(new_block).fill_column(name, output_col)
-final_accessor = BlockAccessor.for_block(new_block)
-if (
-    _BATCH_SIZE_PRESERVING_STUB_COL_NAME not in names
-    and _BATCH_SIZE_PRESERVING_STUB_COL_NAME in final_accessor.column_names()
-):
-    new_block = final_accessor.drop([_BATCH_SIZE_PRESERVING_STUB_COL_NAME])
-return new_block
+# Efficiently construct the new block based on its type.
+block_type = block_accessor.block_type()
+if block_type == BlockType.ARROW:
+    return pa.Table.from_arrays(list(output_cols), names=list(names))
+elif block_type == BlockType.PANDAS:
+    return pd.DataFrame(dict(zip(names, output_cols)))
+# Fallback to generic, iterative construction for other block types.
+new_block = block_accessor.select([])
+for name, output_col in zip(names, output_cols):
+    new_block = BlockAccessor.for_block(new_block).fill_column(name, output_col)
+final_accessor = BlockAccessor.for_block(new_block)
+if (
+    _BATCH_SIZE_PRESERVING_STUB_COL_NAME not in names
+    and _BATCH_SIZE_PRESERVING_STUB_COL_NAME in final_accessor.column_names()
+):
+    new_block = final_accessor.drop([_BATCH_SIZE_PRESERVING_STUB_COL_NAME])
+return new_block
```
python/ray/data/_internal/planner/plan_expression/expression_evaluator.py
```python
):
    new_block = final_accessor.drop([_BATCH_SIZE_PRESERVING_STUB_COL_NAME])

return new_block
```
Fallback path is unreachable dead code
Low Severity
The entire fallback code path (from the comment through the return new_block) is unreachable dead code. BlockType is an enum with only ARROW and PANDAS variants, and both are handled by early returns on lines 765 and 775. Additionally, NativeExpressionEvaluator.__init__ raises TypeError for any block type other than Arrow or Pandas, so expression evaluation on line 749 would fail before this code is reached even if a new BlockType were added. This means the block_accessor.select([]) call, the fill_column loop, and the conditional stub cleanup logic are never executed, adding maintenance burden and misleading readers into thinking this path handles real cases.
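The unreachable-fallback argument can be illustrated with a small self-contained sketch. `BlockType` below is a stand-in enum with the two variants the review describes, not Ray's actual class, and `build_block` is a simplified stand-in for the suggested dispatch:

```python
from enum import Enum


class BlockType(Enum):
    # Stand-in: the review notes the real enum has only these two variants.
    ARROW = "arrow"
    PANDAS = "pandas"


def build_block(block_type: BlockType) -> str:
    if block_type == BlockType.ARROW:
        return "arrow path"
    elif block_type == BlockType.PANDAS:
        return "pandas path"
    # Dead code: every enum variant is already handled by an early return above.
    return "fallback path"


# Exhaustively checking all variants shows the fallback never executes.
assert {build_block(bt) for bt in BlockType} == {"arrow path", "pandas path"}
```

An exhaustive `if/elif` over a closed enum leaves any trailing branch unreachable, which is exactly the maintenance concern the review raises.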


Description
Refactors the `eval_projection()` function in the expression evaluator to improve stub column handling. This PR addresses three key issues:

- Replaces the hardcoded `"__stub__"` string with the proper constant `_BATCH_SIZE_PRESERVING_STUB_COL_NAME`
- Creates empty blocks with `block_accessor.select([])` instead of the fill + drop pattern
- Adds a conditional check so user columns that share the stub name are not dropped

The refactor also includes a performance optimization by reusing BlockAccessor instances and enhanced documentation explaining why Arrow tables require a stub column.
Related issues
Additional information
What changed
Code improvements:
- Use `block_accessor.select([])` to create empty blocks instead of manual `fill_column("__stub__")` + `drop()`
- Reuse `final_accessor` to avoid creating multiple BlockAccessor instances

Test coverage
Added comprehensive tests covering edge cases:
- `test_eval_projection_empty_block()` - handles 0-row blocks
- `test_eval_projection_with_stub_col_name_in_projection()` - ensures user columns with stub name are preserved (with value verification)
- `test_eval_projection_single_column()` - basic projection case
- `test_eval_projection_builds_from_empty_block()` - verifies behavior across Arrow and Pandas blocks