
[Data] Support strict=False mode for StreamingRepartition#60295

Open
machichima wants to merge 24 commits into ray-project:master from
machichima:streamingrepartition-strict-false

Conversation


@machichima machichima commented Jan 19, 2026

Description

Currently, the StreamingRepartition operator is effectively strict=True. We want to relax this to allow a non-strict mode with the following guarantees:

  • Strict mode: guarantees that all output blocks (except possibly the last one) will have exactly target_num_rows rows
  • Non-strict mode: provides a more relaxed guarantee – it can produce at most 1 block with fewer than target_num_rows rows per input block (i.e., it won't do any stitching)

Non-strict will be the default mode and allows StreamingRepartition to be fused into the previous operator
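As an illustration of the two guarantees, here is a minimal pure-Python sketch (`split_non_strict` and `split_strict` are hypothetical helpers, not part of Ray Data), treating blocks as lists of rows:

```python
from typing import Iterable, List


def split_non_strict(block: List[int], target: int) -> List[List[int]]:
    # Non-strict: split each input block independently, so at most one
    # output block per input block is shorter than `target`.
    return [block[i:i + target] for i in range(0, len(block), target)]


def split_strict(blocks: Iterable[List[int]], target: int) -> List[List[int]]:
    # Strict: stitch rows across input blocks so every output block
    # (except possibly the last) has exactly `target` rows.
    rows = [row for block in blocks for row in block]
    return [rows[i:i + target] for i in range(0, len(rows), target)]
```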

Related issues

Closes #60026

Additional information

  • Added strict: bool = False parameter to repartition()
  • Added mode-specific bundler selection in _get_fused_streaming_repartition_operator() and plan_streaming_repartition_op():
    - Strict: uses ref_bundler=StreamingRepartitionRefBundler
    - Non-strict: uses ref_bundler=None (default BlockRefBundler)
  • Added unit tests
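The mode-specific bundler selection described above could be sketched as follows (`select_ref_bundler` is an illustrative placeholder; the real logic lives in `_get_fused_streaming_repartition_operator()` and `plan_streaming_repartition_op()`):

```python
from typing import Optional


class StreamingRepartitionRefBundler:
    # Simplified stand-in for the stateful bundler used in strict mode.
    def __init__(self, target_num_rows_per_block: int):
        assert target_num_rows_per_block > 0, (
            "target_num_rows_per_block must be positive for streaming repartition."
        )
        self.target = target_num_rows_per_block


def select_ref_bundler(
    strict: bool, target_num_rows_per_block: int
) -> Optional[StreamingRepartitionRefBundler]:
    # Strict mode needs the stateful bundler that stitches rows across
    # blocks; non-strict mode returns None to fall back to the default
    # BlockRefBundler, which passes blocks through untouched.
    if strict:
        return StreamingRepartitionRefBundler(target_num_rows_per_block)
    return None
```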

Signed-off-by: machichima <nary12321@gmail.com>
@machichima machichima requested a review from a team as a code owner January 19, 2026 12:00
Contributor

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a strict parameter to StreamingRepartition, allowing for a non-strict mode. In non-strict mode, repartitioning doesn't stitch blocks, which enables more operator fusion opportunities. The changes are well-implemented across the logical planning, fusion rules, and physical planning layers. The default for repartition is now non-strict, which is a good choice for performance. The added tests are comprehensive and cover both the new non-strict behavior and the fusion logic. My main feedback is to add documentation for the new strict parameter in the user-facing Dataset.repartition method to ensure users understand how to use it.

num_blocks: Optional[int] = None,
target_num_rows_per_block: Optional[int] = None,
*,
strict: bool = False,
Contributor


medium

The new strict parameter should be documented in the repartition method's docstring. Explaining the difference between strict=True (the old behavior) and strict=False (the new default) is important for users to understand its impact on block sizes and fusion.

You could add something like this to the Args section:

strict: If ``True``, `repartition` guarantees that all output blocks, except for the last one, will have `target_num_rows_per_block` rows. If ``False``, `repartition` is more relaxed and may produce blocks smaller than `target_num_rows_per_block` without stitching them. This is only used with `target_num_rows_per_block`. Defaults to ``False``.

Contributor Author


Updated in dc609e1


Signed-off-by: machichima <nary12321@gmail.com>
@machichima
Contributor Author

@owenowenisme PTAL. Thank you!

@ray-gardener ray-gardener bot added the data (Ray Data-related issues) and community-contribution (Contributed by the community) labels Jan 19, 2026
Signed-off-by: machichima <nary12321@gmail.com>
@machichima machichima force-pushed the streamingrepartition-strict-false branch from 6cfbfc5 to 04964bc Compare January 19, 2026 21:25

Signed-off-by: machichima <nary12321@gmail.com>
…artition-strict-false

Signed-off-by: machichima <nary12321@gmail.com>
Member

@owenowenisme owenowenisme left a comment


test_operator_fusion is failing, could you please take a look?

input_physical_dag,
data_context,
name=op.name,
compute_strategy=compute,
Member


I think we need min_rows_per_bundle = op.target_num_rows_per_block here if strict=False?

Contributor Author


Updated in 89965d0

Contributor Author

@machichima machichima Jan 26, 2026


Seems like when we set min_rows_per_bundle here, the BlockRefBundler will try to stitch the output:

return list(output_buffer), _merge_ref_bundles(*output_buffer)

Therefore, I think we should keep it as None here to prevent stitching:

if self._min_rows_per_bundle is None:
# Short-circuit if no bundle row target was defined.
assert len(self._bundle_buffer) == 1
bundle = self._bundle_buffer[0]
self._bundle_buffer = []
self._bundle_buffer_size = 0
self._bundle_buffer_size_bytes = 0
return [bundle], bundle
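To illustrate why `min_rows_per_bundle=None` prevents stitching, here is a simplified pure-Python sketch of the bundler contract discussed above (`SketchBundler` is illustrative, not `BlockRefBundler`'s real code):

```python
from typing import List, Optional


class SketchBundler:
    def __init__(self, min_rows_per_bundle: Optional[int]):
        self.min_rows = min_rows_per_bundle
        self.buffer: List[List[int]] = []

    def add(self, bundle: List[int]) -> Optional[List[int]]:
        if self.min_rows is None:
            # No row target: pass each bundle through untouched, so
            # blocks are never stitched together (the short-circuit).
            return bundle
        self.buffer.append(bundle)
        if sum(len(b) for b in self.buffer) >= self.min_rows:
            # Row target set: merge (stitch) all buffered bundles.
            merged = [row for b in self.buffer for row in b]
            self.buffer = []
            return merged
        return None  # Keep buffering until the row target is reached.
```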

Comment on lines 1693 to 1698
strict: If ``True``, ``repartition`` guarantees that all output blocks,
except for the last one, will have exactly ``target_num_rows_per_block`` rows.
If ``False``, ``repartition`` is more relaxed and may produce blocks smaller
than ``target_num_rows_per_block`` without stitching them together.
This parameter is only used with ``target_num_rows_per_block``.
Defaults to ``False``.
Member


Might be better to say that it will only produce at most 1 block that is < target_num_rows_per_block per input block if strict is False.

Contributor Author


Updated in f748b79



@pytest.mark.parametrize("batch_size", [30, 35, 45])
def test_streaming_repartition_fusion_non_strict(
Member


I think fusion test should be in python/ray/data/tests/test_operator_fusion.py

Contributor Author


There are existing fusion and streaming-repartition-related tests in this file; I think we can put this here as it aligns with the existing tests. WDYT?

def test_streaming_repartition_fusion_output_shape(

ref_bundler = StreamingRepartitionRefBundler(batch_size)
# No further fusion because StreamingRepartitionRefBundler is stateful
# and maintains internal buffering state across bundles.
supports_fusion = False
Member


Will this prevent fusion when batch_size == target_num_rows_per_block ?

Contributor Author

@machichima machichima Jan 23, 2026


Yes, but I think it's intended, as the original code (strict mode) hard-coded supports_fusion=False to prevent further fusion:

            # For now, we don't want to over-fuse StreamingRepartition with other map operators,
            # so the result operator does not support further fusion.
            supports_fusion=False,

Comment on lines 425 to 428
strict: If True, guarantees that all output blocks, except for the last one,
will have exactly target_num_rows_per_block rows. If False, is more relaxed
and may produce blocks smaller than target_num_rows_per_block without
stitching them together. Defaults to False.
Member


Ditto with the comment in dataset.py

Contributor Author


Updated in f748b79

Signed-off-by: machichima <nary12321@gmail.com>

@cursor cursor bot left a comment


Cursor Bugbot has reviewed your changes and found 2 potential issues.

Signed-off-by: machichima <nary12321@gmail.com>
Comment on lines 277 to 283
if not (
isinstance(up_logical_op, MapBatches)
and up_logical_op._batch_size is not None
and down_logical_op.target_num_rows_per_block is not None
and down_logical_op.target_num_rows_per_block > 0
# When the batch_size is a multiple of target_num_rows_per_block, fusing would still produce exactly identical sequence of blocks.
# See `_fuse_streaming_repartition_operators_in_dag` docstring for details.
# TODO: when the StreamingRepartition supports none_strict_mode, we can fuse
# `MapBatches -> StreamingRepartition` no matter what the `batch_size` and `target_num_rows` are.
# https://anyscale1.atlassian.net/browse/DATA-1731
and up_logical_op._batch_size
% down_logical_op.target_num_rows_per_block
== 0
):
return False
Contributor


I don't think this logic is correct -- if _batch_size is None we'd still allow to fuse StreamingRepartition

Contributor Author


Hi @alexeykudinkin ,
I was following the original logic here, which also returns False when _batch_size is None:

if isinstance(down_logical_op, StreamingRepartition):
return (
isinstance(up_logical_op, MapBatches)
and up_logical_op._batch_size is not None
and down_logical_op.target_num_rows_per_block is not None
and down_logical_op.target_num_rows_per_block > 0
# When the batch_size is a multiple of target_num_rows_per_block, fusing would still produce exactly identical sequence of blocks.
# See `_fuse_streaming_repartition_operators_in_dag` docstring for details.
# TODO: when the StreamingRepartition supports none_strict_mode, we can fuse
# `MapBatches -> StreamingRepartition` no matter what the `batch_size` and `target_num_rows` are.
# https://anyscale1.atlassian.net/browse/DATA-1731
and up_logical_op._batch_size
% down_logical_op.target_num_rows_per_block
== 0
)

Also, since we use StreamingRepartitionRefBundler(batch_size), the batch_size cannot be None, based on the class definition:

def __init__(self, target_num_rows_per_block: int):
assert (
target_num_rows_per_block > 0
), "target_num_rows_per_block must be positive for streaming repartition."

Therefore, I think we should keep this here?

Contributor


Well, you're relaxing this, right?

There should now be 2 modes:

  • StreamingRepartition(strict=True): batch_size needs to be an exact multiple of target_num_rows_per_block to produce correct results.
  • StreamingRepartition(strict=False): batch_size could be anything (even None)
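The two modes described above could be captured by an eligibility predicate along these lines (a hedged sketch; `can_fuse` and its parameters are illustrative placeholders, not Ray Data's actual API):

```python
from typing import Optional


def can_fuse(
    strict: bool, batch_size: Optional[int], target: Optional[int]
) -> bool:
    # A valid repartition target is required in either mode.
    if target is None or target <= 0:
        return False
    if strict:
        # Strict mode: batch_size must be an exact multiple of the
        # target so fused output blocks stay identical.
        return batch_size is not None and batch_size % target == 0
    # Non-strict mode: any batch_size (even None) is acceptable.
    return True
```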

Contributor Author


Makes sense! Thank you for pointing this out. Updated in c77787c

Member


Suggested change
if not (
isinstance(up_logical_op, MapBatches)
and up_logical_op._batch_size is not None
and down_logical_op.target_num_rows_per_block is not None
and down_logical_op.target_num_rows_per_block > 0
# When the batch_size is a multiple of target_num_rows_per_block, fusing would still produce exactly identical sequence of blocks.
# See `_fuse_streaming_repartition_operators_in_dag` docstring for details.
# TODO: when the StreamingRepartition supports none_strict_mode, we can fuse
# `MapBatches -> StreamingRepartition` no matter what the `batch_size` and `target_num_rows` are.
# https://anyscale1.atlassian.net/browse/DATA-1731
and up_logical_op._batch_size
% down_logical_op.target_num_rows_per_block
== 0
):
return False
if (
not isinstance(up_logical_op, MapBatches)
or not down_logical_op.target_num_rows_per_block
):
return False

Can we simplify the logic here like this? And add a check at the dataset API that raises an error when target_num_rows_per_block is not None but is not positive

(Maybe move this)

assert (
target_num_rows_per_block > 0
), "target_num_rows_per_block must be positive for streaming repartition."

@alexeykudinkin alexeykudinkin added the go (add ONLY when ready to merge, run all tests) label Feb 5, 2026
…artition-strict-false

Signed-off-by: machichima <nary12321@gmail.com>
ref_bundler = StreamingRepartitionRefBundler(batch_size)
# No further fusion because StreamingRepartitionRefBundler is stateful
# and maintains internal buffering state across bundles.
supports_fusion = False
Contributor


We'd not be blocking any subsequent fusion like that

Let's add a test that we're able to fuse multiple ops like this:

  • Map > Map > SR
  • Map > SR > SR

Contributor Author

@machichima machichima Feb 7, 2026


While the comment is on line 338 (supports_fusion=False), I want to make sure: do we want to support fusion for strict mode, or just add tests for non-strict mode? I think it's the latter?

Contributor Author


The Map > SR > SR case cannot work here because after the first Map > SR fusion, the logical operator becomes AbstractUDFMap rather than MapBatches.

logical_op = AbstractUDFMap(
name,
input_op,
up_logical_op.fn,
can_modify_num_rows=up_logical_op.can_modify_num_rows,
fn_args=up_logical_op.fn_args,
fn_kwargs=up_logical_op.fn_kwargs,
fn_constructor_args=up_logical_op.fn_constructor_args,
fn_constructor_kwargs=up_logical_op.fn_constructor_kwargs,
min_rows_per_bundled_input=batch_size,
compute=compute,
ray_remote_args_fn=ray_remote_args_fn,
ray_remote_args=ray_remote_args,
)
self._op_map[op] = logical_op

The current implementation only allows MapBatches > SR fusion:

and isinstance(self._op_map[upstream_ops[0]], MapBatches)

To support Map > SR > SR fusion, we will need more changes, which I think is a bit out of scope of this PR.

Contributor Author


Updated in:

Signed-off-by: machichima <nary12321@gmail.com>

Labels

community-contribution (Contributed by the community), data (Ray Data-related issues), go (add ONLY when ready to merge, run all tests)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Data] Support strict=False mode for StreamingRepartition

3 participants