This repository was archived by the owner on Jan 12, 2026. It is now read-only.
Start designing shuffling algorithm #26
Open
Description
When a stage emits its output, we want to start using it to shuffle data to downstream stages.
ray_beam_runner/ray_beam_runner/portability/execution.py
Lines 94 to 108 in 86bfcdd
```python
for output in worker_handler.data_conn.input_elements(
    process_bundle_id,
    expect_reads,
    abort_callback=lambda:
    (result_future.is_done() and bool(result_future.get().error))):
  if isinstance(output, beam_fn_api_pb2.Elements.Timers) and not dry_run:
    output_buffers[expected_outputs[(output.transform_id, output.timer_family_id)]].append(output.data)
  if isinstance(output, beam_fn_api_pb2.Elements.Data) and not dry_run:
    output_buffers[expected_outputs[output.transform_id]].append(output.data)
for pcoll, buffer in output_buffers.items():
  objrefs = [ray.put(buffer)]
  runner_context.pcollection_buffers.put.remote(pcoll, objrefs)
  output_buffers[pcoll] = objrefs
```
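In the excerpt, each PCollection's whole buffer becomes a single object via `ray.put(buffer)`. A shuffle would instead split that buffer into one piece per downstream partition (the map side). Below is a minimal sketch of that step, assuming the buffered data has already been decoded into `(encoded_key, encoded_value)` byte pairs. In the excerpt, `output.data` is still raw bytes, so that decoding is an assumption. The names `KV`, `stable_partition`, and `partition_output` are hypothetical and not part of the runner.

```python
import zlib
from typing import Iterable, List, Tuple

# Hypothetical element layout: (encoded key, encoded value).
KV = Tuple[bytes, bytes]


def stable_partition(key: bytes, num_partitions: int) -> int:
    """Map an encoded key to a partition index in [0, num_partitions).

    Uses zlib.crc32 instead of the built-in hash(): hash() of str/bytes is
    salted per process, so different Ray workers would disagree on placement.
    """
    return zlib.crc32(key) % num_partitions


def partition_output(elements: Iterable[KV], num_partitions: int) -> List[List[KV]]:
    """Split one stage's output into one bucket per downstream partition."""
    buckets: List[List[KV]] = [[] for _ in range(num_partitions)]
    for key, value in elements:
        # All elements sharing a key land in the same bucket.
        buckets[stable_partition(key, num_partitions)].append((key, value))
    return buckets
```

In a Ray-based runner, each bucket could then be stored with its own `ray.put` call rather than one call for the whole buffer, so that a downstream task fetches only the partition it needs.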
Example shuffle implementation in Ray Datasets 1.13: ray-project/ray#23758
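For the reduce side, here is a sketch of the generic two-phase (map, then reduce) all-to-all pattern. It is not necessarily the algorithm used in the linked Ray Datasets PR. It assumes `M` upstream map tasks have each already produced `N` buckets, and that reducer `p` gathers bucket `p` from every map task and groups values by key, similar to a GroupByKey. The names `reduce_partition` and `shuffle` are hypothetical. This naive pattern creates `M × N` intermediate objects, which is one reason more elaborate shuffle schemes exist.

```python
from collections import defaultdict
from typing import Dict, List, Sequence, Tuple

# Hypothetical element layout: (encoded key, encoded value).
KV = Tuple[bytes, bytes]


def reduce_partition(
    map_outputs: Sequence[Sequence[List[KV]]], partition: int
) -> Dict[bytes, List[bytes]]:
    """Gather bucket `partition` from every map task and group values by key.

    map_outputs[m][p] is the bucket that map task m produced for partition p.
    Values keep the order of the map tasks, then the order within each bucket.
    """
    grouped: Dict[bytes, List[bytes]] = defaultdict(list)
    for buckets in map_outputs:
        for key, value in buckets[partition]:
            grouped[key].append(value)
    return dict(grouped)


def shuffle(
    map_outputs: Sequence[Sequence[List[KV]]], num_partitions: int
) -> List[Dict[bytes, List[bytes]]]:
    """Run every reducer sequentially; a Ray runner could make each one a task."""
    return [reduce_partition(map_outputs, p) for p in range(num_partitions)]
```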