Skip to content

Commit 3f36b1b

Browse files
authored
[Data] Remove run_by_consumer parameter of ExecutionPlan (ray-project#46052)
This PR removes dead functions and parameters after ray-project#46054. Signed-off-by: Balaji Veeramani <balaji@anyscale.com>
1 parent b0d2cb2 commit 3f36b1b

File tree

7 files changed: +19 −103 lines changed

7 files changed: +19 −103 lines changed

python/ray/data/_internal/execution/legacy_compat.py

Lines changed: 2 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
from ray.data._internal.logical.util import record_operators_usage
1616
from ray.data._internal.plan import ExecutionPlan
1717
from ray.data._internal.stats import DatasetStats
18-
from ray.data.block import Block, BlockMetadata, List
18+
from ray.data.block import Block, BlockMetadata
1919
from ray.types import ObjectRef
2020

2121
# Warn about tasks larger than this.
@@ -25,13 +25,9 @@
2525
def execute_to_legacy_block_iterator(
2626
executor: Executor,
2727
plan: ExecutionPlan,
28-
allow_clear_input_blocks: bool,
29-
dataset_uuid: str,
3028
) -> Iterator[Tuple[ObjectRef[Block], BlockMetadata]]:
3129
"""Same as execute_to_legacy_bundle_iterator but returning blocks and metadata."""
32-
bundle_iter = execute_to_legacy_bundle_iterator(
33-
executor, plan, allow_clear_input_blocks, dataset_uuid
34-
)
30+
bundle_iter = execute_to_legacy_bundle_iterator(executor, plan)
3531
for bundle in bundle_iter:
3632
for block, metadata in bundle.blocks:
3733
yield block, metadata
@@ -40,17 +36,13 @@ def execute_to_legacy_block_iterator(
4036
def execute_to_legacy_bundle_iterator(
4137
executor: Executor,
4238
plan: ExecutionPlan,
43-
allow_clear_input_blocks: bool,
44-
dataset_uuid: str,
4539
dag_rewrite=None,
4640
) -> Iterator[RefBundle]:
4741
"""Execute a plan with the new executor and return a bundle iterator.
4842
4943
Args:
5044
executor: The executor to use.
5145
plan: The legacy plan to execute.
52-
allow_clear_input_blocks: Whether the executor may consider clearing blocks.
53-
dataset_uuid: UUID of the dataset for this execution.
5446
dag_rewrite: Callback that can be used to mutate the DAG prior to execution.
5547
This is currently used as a legacy hack to inject the OutputSplit operator
5648
for `Dataset.streaming_split()`.
@@ -73,7 +65,6 @@ def execute_to_legacy_bundle_iterator(
7365
def execute_to_legacy_block_list(
7466
executor: Executor,
7567
plan: ExecutionPlan,
76-
allow_clear_input_blocks: bool,
7768
dataset_uuid: str,
7869
preserve_order: bool,
7970
) -> BlockList:
@@ -82,7 +73,6 @@ def execute_to_legacy_block_list(
8273
Args:
8374
executor: The executor to use.
8475
plan: The legacy plan to execute.
85-
allow_clear_input_blocks: Whether the executor may consider clearing blocks.
8676
dataset_uuid: UUID of the dataset for this execution.
8777
preserve_order: Whether to preserve order in execution.
8878
@@ -149,23 +139,6 @@ def _bundles_to_block_list(bundles: Iterator[RefBundle]) -> BlockList:
149139
return BlockList(blocks, metadata, owned_by_consumer=owns_blocks)
150140

151141

152-
def _block_list_to_bundles(blocks: BlockList, owns_blocks: bool) -> List[RefBundle]:
153-
output = []
154-
for block, meta in blocks.iter_blocks_with_metadata():
155-
output.append(
156-
RefBundle(
157-
[
158-
(
159-
block,
160-
meta,
161-
)
162-
],
163-
owns_blocks=owns_blocks,
164-
)
165-
)
166-
return output
167-
168-
169142
def _set_stats_uuid_recursive(stats: DatasetStats, dataset_uuid: str) -> None:
170143
if not stats.dataset_uuid:
171144
stats.dataset_uuid = dataset_uuid

python/ray/data/_internal/iterator/stream_split_iterator.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,8 +175,6 @@ def add_split_op(dag):
175175
output_iterator = execute_to_legacy_bundle_iterator(
176176
executor,
177177
dataset._plan,
178-
True,
179-
dataset._plan._dataset_uuid,
180178
dag_rewrite=add_split_op,
181179
)
182180
yield output_iterator

python/ray/data/_internal/plan.py

Lines changed: 3 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -58,16 +58,13 @@ def __init__(
5858
self,
5959
stats: DatasetStats,
6060
*,
61-
run_by_consumer: bool,
6261
data_context: Optional[DataContext] = None,
6362
):
6463
"""Create a plan with no transformation operators.
6564
6665
Args:
6766
stats: Stats for the base blocks.
6867
dataset_uuid: Dataset's UUID.
69-
run_by_consumer: Whether this plan is invoked to run by the consumption
70-
APIs (e.g. .iter_batches()).
7168
"""
7269
self._in_stats = stats
7370
# A computed snapshot of some prefix of operators and their corresponding
@@ -81,7 +78,6 @@ def __init__(
8178
# Set when a Dataset is constructed with this plan
8279
self._dataset_uuid = None
8380

84-
self._run_by_consumer = run_by_consumer
8581
self._dataset_name = None
8682

8783
self._has_started_execution = False
@@ -97,7 +93,6 @@ def __repr__(self) -> str:
9793
return (
9894
f"ExecutionPlan("
9995
f"dataset_uuid={self._dataset_uuid}, "
100-
f"run_by_consumer={self._run_by_consumer}, "
10196
f"snapshot_operator={self._snapshot_operator}"
10297
)
10398

@@ -163,9 +158,7 @@ def generate_logical_plan_string(
163158
count = None
164159
else:
165160
assert len(sources) == 1
166-
plan = ExecutionPlan(
167-
DatasetStats(metadata={}, parent=None), run_by_consumer=False
168-
)
161+
plan = ExecutionPlan(DatasetStats(metadata={}, parent=None))
169162
plan.link_logical_plan(LogicalPlan(sources[0]))
170163
schema = plan.schema()
171164
count = plan.meta_count()
@@ -292,7 +285,6 @@ def copy(self) -> "ExecutionPlan":
292285
"""
293286
plan_copy = ExecutionPlan(
294287
self._in_stats,
295-
run_by_consumer=self._run_by_consumer,
296288
data_context=self._context,
297289
)
298290
if self._snapshot_bundle is not None:
@@ -311,10 +303,7 @@ def deep_copy(self) -> "ExecutionPlan":
311303
Returns:
312304
A deep copy of this execution plan.
313305
"""
314-
plan_copy = ExecutionPlan(
315-
copy.copy(self._in_stats),
316-
run_by_consumer=self._run_by_consumer,
317-
)
306+
plan_copy = ExecutionPlan(copy.copy(self._in_stats))
318307
if self._snapshot_bundle:
319308
# Copy over the existing snapshot.
320309
plan_copy._snapshot_bundle = copy.copy(self._snapshot_bundle)
@@ -397,7 +386,6 @@ def meta_count(self) -> Optional[int]:
397386
@omit_traceback_stdout
398387
def execute_to_iterator(
399388
self,
400-
allow_clear_input_blocks: bool = True,
401389
) -> Tuple[
402390
Iterator[Tuple[ObjectRef[Block], BlockMetadata]],
403391
DatasetStats,
@@ -407,10 +395,6 @@ def execute_to_iterator(
407395
408396
This will use streaming execution to generate outputs.
409397
410-
Args:
411-
allow_clear_input_blocks: Whether we should try to clear the input blocks
412-
for each operator.
413-
414398
Returns:
415399
Tuple of iterator over output blocks and the executor.
416400
"""
@@ -420,7 +404,7 @@ def execute_to_iterator(
420404
ctx = self._context
421405

422406
if self.has_computed_output():
423-
bundle = self.execute(allow_clear_input_blocks)
407+
bundle = self.execute()
424408
return iter(bundle.blocks), self._snapshot_stats, None
425409

426410
from ray.data._internal.execution.legacy_compat import (
@@ -433,8 +417,6 @@ def execute_to_iterator(
433417
block_iter = execute_to_legacy_block_iterator(
434418
executor,
435419
self,
436-
allow_clear_input_blocks=allow_clear_input_blocks,
437-
dataset_uuid=self._dataset_uuid,
438420
)
439421
# Since the generator doesn't run any code until we try to fetch the first
440422
# value, force execution of one bundle before we call get_stats().
@@ -449,14 +431,11 @@ def execute_to_iterator(
449431
@omit_traceback_stdout
450432
def execute(
451433
self,
452-
allow_clear_input_blocks: bool = True,
453434
preserve_order: bool = False,
454435
) -> RefBundle:
455436
"""Execute this plan.
456437
457438
Args:
458-
allow_clear_input_blocks: Whether we should try to clear the input blocks
459-
for each operator.
460439
preserve_order: Whether to preserve order in execution.
461440
462441
Returns:
@@ -515,7 +494,6 @@ def execute(
515494
blocks = execute_to_legacy_block_list(
516495
executor,
517496
self,
518-
allow_clear_input_blocks=allow_clear_input_blocks,
519497
dataset_uuid=self._dataset_uuid,
520498
preserve_order=preserve_order,
521499
)

python/ray/data/dataset.py

Lines changed: 5 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1403,10 +1403,7 @@ def train(self, data_iterator):
14031403
logical_plan = LogicalPlan(InputData(input_data=ref_bundles))
14041404
split_datasets.append(
14051405
MaterializedDataset(
1406-
ExecutionPlan(
1407-
stats,
1408-
run_by_consumer=owned_by_consumer,
1409-
),
1406+
ExecutionPlan(stats),
14101407
logical_plan,
14111408
)
14121409
)
@@ -1524,10 +1521,7 @@ def build_node_id_by_actor(actors: List[Any]) -> Dict[Any, str]:
15241521
logical_plan = LogicalPlan(InputData(input_data=[bundle]))
15251522
split_datasets.append(
15261523
MaterializedDataset(
1527-
ExecutionPlan(
1528-
stats,
1529-
run_by_consumer=owned_by_consumer,
1530-
),
1524+
ExecutionPlan(stats),
15311525
logical_plan,
15321526
)
15331527
)
@@ -1601,10 +1595,7 @@ def split_at_indices(self, indices: List[int]) -> List["MaterializedDataset"]:
16011595

16021596
splits.append(
16031597
MaterializedDataset(
1604-
ExecutionPlan(
1605-
stats,
1606-
run_by_consumer=bundle.owns_blocks,
1607-
),
1598+
ExecutionPlan(stats),
16081599
logical_plan,
16091600
)
16101601
)
@@ -1793,7 +1784,7 @@ def union(self, *other: List["Dataset"]) -> "Dataset":
17931784
)
17941785
stats.time_total_s = time.perf_counter() - start_time
17951786
return Dataset(
1796-
ExecutionPlan(stats, run_by_consumer=False),
1787+
ExecutionPlan(stats),
17971788
logical_plan,
17981789
)
17991790

@@ -4526,10 +4517,7 @@ def materialize(self) -> "MaterializedDataset":
45264517
]
45274518
logical_plan = LogicalPlan(InputData(input_data=ref_bundles))
45284519
output = MaterializedDataset(
4529-
ExecutionPlan(
4530-
copy._plan.stats(),
4531-
run_by_consumer=False,
4532-
),
4520+
ExecutionPlan(copy._plan.stats()),
45334521
logical_plan,
45344522
)
45354523
# Metrics are tagged with `copy`s uuid, update the output uuid with

python/ray/data/iterator.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -875,10 +875,7 @@ def materialize(self) -> "MaterializedDataset":
875875
]
876876
logical_plan = LogicalPlan(InputData(input_data=ref_bundles))
877877
return MaterializedDataset(
878-
ExecutionPlan(
879-
stats,
880-
run_by_consumer=owned_by_consumer,
881-
),
878+
ExecutionPlan(stats),
882879
logical_plan,
883880
)
884881

python/ray/data/read_api.py

Lines changed: 7 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -124,10 +124,7 @@ def from_blocks(blocks: List[Block]):
124124
from_blocks_op = FromBlocks(block_refs, metadata)
125125
logical_plan = LogicalPlan(from_blocks_op)
126126
return MaterializedDataset(
127-
ExecutionPlan(
128-
DatasetStats(metadata={"FromBlocks": metadata}, parent=None),
129-
run_by_consumer=False,
130-
),
127+
ExecutionPlan(DatasetStats(metadata={"FromBlocks": metadata}, parent=None)),
131128
logical_plan,
132129
)
133130

@@ -207,10 +204,7 @@ def from_items(
207204
from_items_op = FromItems(blocks, metadata)
208205
logical_plan = LogicalPlan(from_items_op)
209206
return MaterializedDataset(
210-
ExecutionPlan(
211-
DatasetStats(metadata={"FromItems": metadata}, parent=None),
212-
run_by_consumer=False,
213-
),
207+
ExecutionPlan(DatasetStats(metadata={"FromItems": metadata}, parent=None)),
214208
logical_plan,
215209
)
216210

@@ -441,7 +435,7 @@ def read_datasource(
441435
)
442436
logical_plan = LogicalPlan(read_op)
443437
return Dataset(
444-
plan=ExecutionPlan(stats, run_by_consumer=False),
438+
plan=ExecutionPlan(stats),
445439
logical_plan=logical_plan,
446440
)
447441

@@ -2481,10 +2475,7 @@ def from_pandas_refs(
24812475
metadata = ray.get([get_metadata.remote(df) for df in dfs])
24822476
logical_plan = LogicalPlan(FromPandas(dfs, metadata))
24832477
return MaterializedDataset(
2484-
ExecutionPlan(
2485-
DatasetStats(metadata={"FromPandas": metadata}, parent=None),
2486-
run_by_consumer=False,
2487-
),
2478+
ExecutionPlan(DatasetStats(metadata={"FromPandas": metadata}, parent=None)),
24882479
logical_plan,
24892480
)
24902481

@@ -2495,10 +2486,7 @@ def from_pandas_refs(
24952486
metadata = ray.get(metadata)
24962487
logical_plan = LogicalPlan(FromPandas(blocks, metadata))
24972488
return MaterializedDataset(
2498-
ExecutionPlan(
2499-
DatasetStats(metadata={"FromPandas": metadata}, parent=None),
2500-
run_by_consumer=False,
2501-
),
2489+
ExecutionPlan(DatasetStats(metadata={"FromPandas": metadata}, parent=None)),
25022490
logical_plan,
25032491
)
25042492

@@ -2581,10 +2569,7 @@ def from_numpy_refs(
25812569
logical_plan = LogicalPlan(FromNumpy(blocks, metadata))
25822570

25832571
return MaterializedDataset(
2584-
ExecutionPlan(
2585-
DatasetStats(metadata={"FromNumpy": metadata}, parent=None),
2586-
run_by_consumer=False,
2587-
),
2572+
ExecutionPlan(DatasetStats(metadata={"FromNumpy": metadata}, parent=None)),
25882573
logical_plan,
25892574
)
25902575

@@ -2660,10 +2645,7 @@ def from_arrow_refs(
26602645
logical_plan = LogicalPlan(FromArrow(tables, metadata))
26612646

26622647
return MaterializedDataset(
2663-
ExecutionPlan(
2664-
DatasetStats(metadata={"FromArrow": metadata}, parent=None),
2665-
run_by_consumer=False,
2666-
),
2648+
ExecutionPlan(DatasetStats(metadata={"FromArrow": metadata}, parent=None)),
26672649
logical_plan,
26682650
)
26692651

python/ray/data/tests/test_split.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ def _test_equal_split_balanced(block_sizes, num_splits):
101101
logical_plan = LogicalPlan(InputData(input_data=ref_bundles))
102102
stats = DatasetStats(metadata={"TODO": []}, parent=None)
103103
ds = Dataset(
104-
ExecutionPlan(stats, run_by_consumer=True),
104+
ExecutionPlan(stats),
105105
logical_plan,
106106
)
107107

0 commit comments

Comments (0)