-
Notifications
You must be signed in to change notification settings - Fork 7.2k
[Data] Support strict=False mode for StreamingRepartition #60295
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
838513c
08008d1
591b00f
ae95e03
8f7282a
cdf8f9f
def13b2
dc609e1
2c87758
d9d4295
04964bc
a9fbce0
7b825e5
55c79bd
accb54a
89965d0
f748b79
49cc5fc
68d01c4
8a48fdd
c77787c
6a2fec8
111c054
83c5ddb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -278,22 +278,31 @@ def _can_fuse(self, down_op: PhysicalOperator, up_op: PhysicalOperator) -> bool: | |||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| # only allow fusion of MapBatches -> StreamingRepartition | ||||||||||||||||||||||||||||||||||
| if isinstance(down_logical_op, StreamingRepartition): | ||||||||||||||||||||||||||||||||||
| return ( | ||||||||||||||||||||||||||||||||||
| if not ( | ||||||||||||||||||||||||||||||||||
| isinstance(up_logical_op, MapBatches) | ||||||||||||||||||||||||||||||||||
| and up_logical_op.batch_size is not None | ||||||||||||||||||||||||||||||||||
| and down_logical_op.target_num_rows_per_block is not None | ||||||||||||||||||||||||||||||||||
| and down_logical_op.target_num_rows_per_block > 0 | ||||||||||||||||||||||||||||||||||
| # When the batch_size is a multiple of target_num_rows_per_block, fusing would still produce exactly identical sequence of blocks. | ||||||||||||||||||||||||||||||||||
| # See `_fuse_streaming_repartition_operators_in_dag` docstring for details. | ||||||||||||||||||||||||||||||||||
| # TODO: when the StreamingRepartition supports none_strict_mode, we can fuse | ||||||||||||||||||||||||||||||||||
| # `MapBatches -> StreamingRepartition` no matter what the `batch_size` and `target_num_rows` are. | ||||||||||||||||||||||||||||||||||
| # https://anyscale1.atlassian.net/browse/DATA-1731 | ||||||||||||||||||||||||||||||||||
| ): | ||||||||||||||||||||||||||||||||||
| return False | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| # Non-strict mode: can always fuse, no matter what batch_size is. | ||||||||||||||||||||||||||||||||||
| # This allows fusion without cross-task buffering by using default bundler. | ||||||||||||||||||||||||||||||||||
| if not down_logical_op._strict: | ||||||||||||||||||||||||||||||||||
| return True | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| # Strict mode: only fuse when batch_size is a multiple of target_num_rows_per_block. | ||||||||||||||||||||||||||||||||||
| # When batch_size % target == 0, each batch can be perfectly sliced into chunks | ||||||||||||||||||||||||||||||||||
| # without cross-task buffering. See `_fuse_streaming_repartition_operators_in_dag` | ||||||||||||||||||||||||||||||||||
| # docstring for details. | ||||||||||||||||||||||||||||||||||
| return ( | ||||||||||||||||||||||||||||||||||
| up_logical_op.batch_size is not None | ||||||||||||||||||||||||||||||||||
| and up_logical_op.batch_size % down_logical_op.target_num_rows_per_block | ||||||||||||||||||||||||||||||||||
| == 0 | ||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||
| # Other operators cannot fuse with StreamingRepartition. | ||||||||||||||||||||||||||||||||||
| # StreamingRepartition can only fuse in non-strict mode. | ||||||||||||||||||||||||||||||||||
| # In strict mode, it does not support further fusion. | ||||||||||||||||||||||||||||||||||
| if isinstance(up_logical_op, StreamingRepartition): | ||||||||||||||||||||||||||||||||||
| return False | ||||||||||||||||||||||||||||||||||
| return not up_logical_op._strict | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| # Otherwise, ops are compatible for fusion. | ||||||||||||||||||||||||||||||||||
| return True | ||||||||||||||||||||||||||||||||||
|
|
@@ -312,9 +321,30 @@ def _get_fused_streaming_repartition_operator( | |||||||||||||||||||||||||||||||||
| up_logical_op = self._op_map.pop(up_op) | ||||||||||||||||||||||||||||||||||
| assert isinstance(up_logical_op, MapBatches) | ||||||||||||||||||||||||||||||||||
| assert isinstance(down_logical_op, StreamingRepartition) | ||||||||||||||||||||||||||||||||||
| assert up_logical_op.batch_size % down_logical_op.target_num_rows_per_block == 0 | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| batch_size = up_logical_op.batch_size | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| # Choose ref_bundler and fusion behavior based on strict mode | ||||||||||||||||||||||||||||||||||
| if down_logical_op._strict: | ||||||||||||||||||||||||||||||||||
| # Strict mode: use StreamingRepartitionRefBundler for stitching. | ||||||||||||||||||||||||||||||||||
| # Only works when batch_size % target == 0 (verified in _can_fuse). | ||||||||||||||||||||||||||||||||||
| assert batch_size % down_logical_op.target_num_rows_per_block == 0, ( | ||||||||||||||||||||||||||||||||||
| f"Strict mode fusion requires batch_size ({batch_size}) to be " | ||||||||||||||||||||||||||||||||||
| f"a multiple of target_num_rows_per_block " | ||||||||||||||||||||||||||||||||||
| f"({down_logical_op.target_num_rows_per_block})" | ||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||
| ref_bundler = StreamingRepartitionRefBundler(batch_size) | ||||||||||||||||||||||||||||||||||
cursor[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||||||||
| # No further fusion because StreamingRepartitionRefBundler is stateful | ||||||||||||||||||||||||||||||||||
| # and maintains internal buffering state across bundles. | ||||||||||||||||||||||||||||||||||
| supports_fusion = False | ||||||||||||||||||||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will this prevent fusion when
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, but I think it's intended. As the original code (strict mode) hard-coded # For now, we don't want to over-fuse StreamingRepartition with other map operators,
# so the result operator does not support further fusion.
supports_fusion=False,
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We'd not be blocking any subsequent fusion like that. Let's add a test that we're able to fuse multiple ops like this:
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. While the comment is on line 338 (
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The Map > SR > SR case cannot work here because after the first Map > SR fusion, the logical operator becomes ray/python/ray/data/_internal/logical/rules/operator_fusion.py Lines 355 to 369 in f3d444a
The current implementation only allows MapBatches > SR fusion:
To support Map > SR > SR fusion, we will need more changes, which I think is a bit out of scope of this PR.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||||||||||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||||||||||||
| # Non-strict mode: use default pass-through bundler. | ||||||||||||||||||||||||||||||||||
| # Works with any batch_size without cross-task buffering. | ||||||||||||||||||||||||||||||||||
| ref_bundler = None | ||||||||||||||||||||||||||||||||||
| # Can fuse further because the default bundler is stateless | ||||||||||||||||||||||||||||||||||
| # and processes each bundle independently. | ||||||||||||||||||||||||||||||||||
| supports_fusion = True | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| compute = self._fuse_compute_strategy( | ||||||||||||||||||||||||||||||||||
| up_logical_op.compute, down_logical_op.compute | ||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||
|
|
@@ -331,19 +361,23 @@ def _get_fused_streaming_repartition_operator( | |||||||||||||||||||||||||||||||||
| input_op = input_deps[0] | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| assert up_op.data_context is down_op.data_context | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| # In non-strict mode, use min_rows_per_bundle to ensure creating batches with batch_size. | ||||||||||||||||||||||||||||||||||
| # In strict mode, ref_bundler handles bundling, so do not set min_rows_per_bundle. | ||||||||||||||||||||||||||||||||||
| min_rows = None if down_logical_op._strict else batch_size | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| op = MapOperator.create( | ||||||||||||||||||||||||||||||||||
| up_op.get_map_transformer().fuse(down_op.get_map_transformer()), | ||||||||||||||||||||||||||||||||||
| input_op, | ||||||||||||||||||||||||||||||||||
| up_op.data_context, | ||||||||||||||||||||||||||||||||||
| name=name, | ||||||||||||||||||||||||||||||||||
| compute_strategy=compute, | ||||||||||||||||||||||||||||||||||
| ref_bundler=StreamingRepartitionRefBundler(batch_size), | ||||||||||||||||||||||||||||||||||
| min_rows_per_bundle=min_rows, | ||||||||||||||||||||||||||||||||||
| ref_bundler=ref_bundler, | ||||||||||||||||||||||||||||||||||
| map_task_kwargs=map_task_kwargs, | ||||||||||||||||||||||||||||||||||
| ray_remote_args=ray_remote_args, | ||||||||||||||||||||||||||||||||||
| ray_remote_args_fn=ray_remote_args_fn, | ||||||||||||||||||||||||||||||||||
| # For now, we don't want to over-fuse StreamingRepartition with other map operators, | ||||||||||||||||||||||||||||||||||
| # so the result operator does not support further fusion. | ||||||||||||||||||||||||||||||||||
| supports_fusion=False, | ||||||||||||||||||||||||||||||||||
| supports_fusion=supports_fusion, | ||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||
| op.set_logical_operators(*up_op._logical_operators, *down_op._logical_operators) | ||||||||||||||||||||||||||||||||||
| for map_task_kwargs_fn in itertools.chain( | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -195,14 +195,18 @@ def plan_streaming_repartition_op( | |||||||||||||||||||
| ) | ||||||||||||||||||||
| map_transformer = MapTransformer([transform_fn]) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| # Disable fusion for streaming repartition with the downstream op. | ||||||||||||||||||||
| if op._strict: | ||||||||||||||||||||
| ref_bundler = StreamingRepartitionRefBundler(op.target_num_rows_per_block) | ||||||||||||||||||||
| else: | ||||||||||||||||||||
| ref_bundler = None | ||||||||||||||||||||
cursor[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||
|
|
||||||||||||||||||||
| operator = MapOperator.create( | ||||||||||||||||||||
| map_transformer, | ||||||||||||||||||||
| input_physical_dag, | ||||||||||||||||||||
| data_context, | ||||||||||||||||||||
| name=op.name, | ||||||||||||||||||||
| compute_strategy=compute, | ||||||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we need
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated in 89965d0
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems like when we set
Therefore, I think we should keep it as ray/python/ray/data/_internal/execution/operators/map_operator.py Lines 828 to 835 in 68d01c4
|
||||||||||||||||||||
| ref_bundler=StreamingRepartitionRefBundler(op.target_num_rows_per_block), | ||||||||||||||||||||
| ref_bundler=ref_bundler, | ||||||||||||||||||||
| ray_remote_args=op.ray_remote_args, | ||||||||||||||||||||
| ray_remote_args_fn=op.ray_remote_args_fn, | ||||||||||||||||||||
| ) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1632,6 +1632,7 @@ def repartition( | |
| num_blocks: Optional[int] = None, | ||
| target_num_rows_per_block: Optional[int] = None, | ||
| *, | ||
| strict: bool = False, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The new You could add something like this to the strict: If ``True``, `repartition` guarantees that all output blocks, except for the last one, will have `target_num_rows_per_block` rows. If ``False``, `repartition` is more relaxed and may produce blocks smaller than `target_num_rows_per_block` without stitching them. This is only used with `target_num_rows_per_block`. Defaults to ``False``.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated in dc609e1 |
||
| shuffle: bool = False, | ||
| keys: Optional[List[str]] = None, | ||
| sort: bool = False, | ||
|
|
@@ -1681,6 +1682,13 @@ def repartition( | |
| optimal execution, based on the `target_num_rows_per_block`. This is | ||
| the current behavior because of the implementation and may change in | ||
| the future. | ||
| strict: If ``True``, ``repartition`` guarantees that all output blocks, | ||
| except for the last one, will have exactly ``target_num_rows_per_block`` rows. | ||
| If ``False``, ``repartition`` uses best-effort bundling and may produce at most | ||
| one block smaller than ``target_num_rows_per_block`` per input block without | ||
| forcing exact sizes through block splitting. | ||
| This parameter is only used with ``target_num_rows_per_block``. | ||
| Defaults to ``False``. | ||
| shuffle: Whether to perform a distributed shuffle during the | ||
| repartition. When shuffle is enabled, each output block | ||
| contains a subset of data rows from each input block, which | ||
|
|
@@ -1717,6 +1725,13 @@ def repartition( | |
| warnings.warn( | ||
| "`shuffle` is ignored when `target_num_rows_per_block` is set." | ||
| ) | ||
| else: | ||
| if strict: | ||
| # strict is used in row-based repartition only | ||
| warnings.warn( | ||
| "`strict` is ignored when `target_num_rows_per_block` is not set. " | ||
| "Use `target_num_rows_per_block` instead of `num_blocks` to enable `strict` mode." | ||
| ) | ||
|
|
||
| if (num_blocks is None) and (target_num_rows_per_block is None): | ||
| raise ValueError( | ||
|
|
@@ -1738,6 +1753,7 @@ def repartition( | |
| op = StreamingRepartition( | ||
| self._logical_plan.dag, | ||
| target_num_rows_per_block=target_num_rows_per_block, | ||
| strict=strict, | ||
cursor[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| ) | ||
| else: | ||
| op = Repartition( | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.