|
11 | 11 | PhysicalOperator, |
12 | 12 | RefBundle, |
13 | 13 | ) |
14 | | -from ray.data._internal.lazy_block_list import LazyBlockList |
15 | | -from ray.data._internal.logical.interfaces.logical_plan import LogicalPlan |
16 | | -from ray.data._internal.logical.operators.read_operator import Read |
17 | 14 | from ray.data._internal.logical.optimizers import get_execution_plan |
18 | | -from ray.data._internal.logical.rules.set_read_parallelism import ( |
19 | | - compute_additional_split_factor, |
20 | | -) |
21 | 15 | from ray.data._internal.logical.util import record_operators_usage |
22 | 16 | from ray.data._internal.plan import ExecutionPlan |
23 | | -from ray.data._internal.planner.plan_read_op import ( |
24 | | - apply_output_blocks_handling_to_read_task, |
25 | | -) |
26 | 17 | from ray.data._internal.stats import DatasetStats |
27 | 18 | from ray.data.block import Block, BlockMetadata, List |
28 | | -from ray.data.context import DataContext |
29 | 19 | from ray.types import ObjectRef |
30 | 20 |
|
31 | 21 | # Warn about tasks larger than this. |
@@ -111,55 +101,6 @@ def execute_to_legacy_block_list( |
111 | 101 | return block_list |
112 | 102 |
|
113 | 103 |
|
114 | | -def get_legacy_lazy_block_list_read_only( |
115 | | - plan: ExecutionPlan, |
116 | | -) -> LazyBlockList: |
117 | | - """For a read-only plan, construct a LazyBlockList with ReadTasks from the |
118 | | - input Datasource or Reader. Note that the plan and the underlying ReadTasks |
119 | | - are not executed, only their known metadata is fetched. |
120 | | -
|
121 | | - Args: |
122 | | - plan: The legacy plan to execute. |
123 | | -
|
124 | | - Returns: |
125 | | - The output as a legacy LazyBlockList. |
126 | | - """ |
127 | | - assert plan.is_read_only(), "This function only supports read-only plans." |
128 | | - assert isinstance(plan._logical_plan, LogicalPlan) |
129 | | - read_logical_op = plan._logical_plan.dag |
130 | | - assert isinstance(read_logical_op, Read) |
131 | | - |
132 | | - # In the full dataset execution, the logic in ApplyAdditionalSplitToOutputBlocks |
133 | | - # is normally executed as part of the MapOperator created in the |
134 | | - # LogicalPlan -> PhysicalPlan plan translation. In this case, since we |
135 | | - # get the ReadTasks directly from the Datasource or Reader, |
136 | | - # we need to manually apply this logic in order to update the ReadTasks. |
137 | | - ctx = DataContext.get_current() |
138 | | - (parallelism, _, estimated_num_blocks, k,) = compute_additional_split_factor( |
139 | | - read_logical_op._datasource_or_legacy_reader, |
140 | | - read_logical_op._parallelism, |
141 | | - read_logical_op._mem_size, |
142 | | - ctx.target_max_block_size, |
143 | | - cur_additional_split_factor=None, |
144 | | - ) |
145 | | - read_tasks = read_logical_op._datasource_or_legacy_reader.get_read_tasks( |
146 | | - parallelism |
147 | | - ) |
148 | | - for read_task in read_tasks: |
149 | | - apply_output_blocks_handling_to_read_task(read_task, k) |
150 | | - |
151 | | - block_list = LazyBlockList( |
152 | | - read_tasks, |
153 | | - read_logical_op.name, |
154 | | - ray_remote_args=read_logical_op._ray_remote_args, |
155 | | - owned_by_consumer=False, |
156 | | - ) |
157 | | - # Update the estimated number of blocks after applying optimizations |
158 | | - # and fetching metadata (e.g. SetReadParallelismRule). |
159 | | - block_list._estimated_num_blocks = estimated_num_blocks |
160 | | - return block_list |
161 | | - |
162 | | - |
163 | 104 | def _get_execution_dag( |
164 | 105 | executor: Executor, |
165 | 106 | plan: ExecutionPlan, |
|
0 commit comments