[Data] Operator fusion test is flaky #60700

@bveeramani

Description

[2026-02-03T01:27:40Z] __________ test_map_fusion_with_concurrency_arg[False-2-True-2-True] ___________

[2026-02-03T01:27:40Z]
[2026-02-03T01:27:40Z] ray_start_regular_shared_2_cpus = RayContext(dashboard_url='127.0.0.1:8265', python_version='3.10.19', ray_version='3.0.0.dev0', ray_commit='{{RAY_COMMIT_SHA}}')
[2026-02-03T01:27:40Z] up_use_actor = False, up_concurrency = 2, down_use_actor = True
[2026-02-03T01:27:40Z] down_concurrency = 2, should_fuse = True
[2026-02-03T01:27:40Z]
[2026-02-03T01:27:40Z]     @pytest.mark.parametrize(
[2026-02-03T01:27:40Z]         "up_use_actor, up_concurrency, down_use_actor, down_concurrency, should_fuse",
[2026-02-03T01:27:40Z]         [
[2026-02-03T01:27:40Z]             # === Task->Task cases ===
[2026-02-03T01:27:40Z]             # Same concurrency set. Should fuse.
[2026-02-03T01:27:40Z]             (False, 1, False, 1, True),
[2026-02-03T01:27:40Z]             # Different concurrency set. Should not fuse.
[2026-02-03T01:27:40Z]             (False, 1, False, 2, False),
[2026-02-03T01:27:40Z]             # If one op has concurrency set, and the other doesn't, should not fuse.
[2026-02-03T01:27:40Z]             (False, None, False, 1, False),
[2026-02-03T01:27:40Z]             (False, 1, False, None, False),
[2026-02-03T01:27:40Z]             # === Task->Actor cases ===
[2026-02-03T01:27:40Z]             # When Task's concurrency is not set, should fuse.
[2026-02-03T01:27:40Z]             (False, None, True, 2, True),
[2026-02-03T01:27:40Z]             (False, None, True, (1, 2), True),
[2026-02-03T01:27:40Z]             # When max size matches, should fuse.
[2026-02-03T01:27:40Z]             (False, 2, True, 2, True),
[2026-02-03T01:27:40Z]             (False, 2, True, (1, 2), True),
[2026-02-03T01:27:40Z]             # When max size doesn't match, should not fuse.
[2026-02-03T01:27:40Z]             (False, 1, True, 2, False),
[2026-02-03T01:27:40Z]             (False, 1, True, (1, 2), False),
[2026-02-03T01:27:40Z]             # === Actor->Task cases ===
[2026-02-03T01:27:40Z]             # Should not fuse whatever concurrency is set.
[2026-02-03T01:27:40Z]             (True, 2, False, 2, False),
[2026-02-03T01:27:40Z]             # === Actor->Actor cases ===
[2026-02-03T01:27:40Z]             # Should not fuse whatever concurrency is set.
[2026-02-03T01:27:40Z]             (True, 2, True, 2, False),
[2026-02-03T01:27:40Z]         ],
[2026-02-03T01:27:40Z]     )
[2026-02-03T01:27:40Z]     def test_map_fusion_with_concurrency_arg(
[2026-02-03T01:27:40Z]         ray_start_regular_shared_2_cpus,
[2026-02-03T01:27:40Z]         up_use_actor,
[2026-02-03T01:27:40Z]         up_concurrency,
[2026-02-03T01:27:40Z]         down_use_actor,
[2026-02-03T01:27:40Z]         down_concurrency,
[2026-02-03T01:27:40Z]         should_fuse,
[2026-02-03T01:27:40Z]     ):
[2026-02-03T01:27:40Z]         """Test map operator fusion with different concurrency settings."""
[2026-02-03T01:27:40Z]
[2026-02-03T01:27:40Z]         class Map:
[2026-02-03T01:27:40Z]             def __call__(self, row):
[2026-02-03T01:27:40Z]                 return row
[2026-02-03T01:27:40Z]
[2026-02-03T01:27:40Z]         def map(row):
[2026-02-03T01:27:40Z]             return row
[2026-02-03T01:27:40Z]
[2026-02-03T01:27:40Z]         ds = ray.data.range(10, override_num_blocks=2)
[2026-02-03T01:27:40Z]         if not up_use_actor:
[2026-02-03T01:27:40Z]             ds = ds.map(map, num_cpus=0, concurrency=up_concurrency)
[2026-02-03T01:27:40Z]             up_name = "Map(map)"
[2026-02-03T01:27:40Z]         else:
[2026-02-03T01:27:40Z]             ds = ds.map(Map, num_cpus=0, concurrency=up_concurrency)
[2026-02-03T01:27:40Z]             up_name = "Map(Map)"
[2026-02-03T01:27:40Z]
[2026-02-03T01:27:40Z]         if not down_use_actor:
[2026-02-03T01:27:40Z]             ds = ds.map(map, num_cpus=0, concurrency=down_concurrency)
[2026-02-03T01:27:40Z]             down_name = "Map(map)"
[2026-02-03T01:27:40Z]         else:
[2026-02-03T01:27:40Z]             ds = ds.map(Map, num_cpus=0, concurrency=down_concurrency)
[2026-02-03T01:27:40Z]             down_name = "Map(Map)"
[2026-02-03T01:27:40Z]
[2026-02-03T01:27:40Z] >       assert extract_values("id", ds.take_all()) == list(range(10))
[2026-02-03T01:27:40Z] E assert [5, 6, 7, 8, 9, 0, ...] == [0, 1, 2, 3, 4, 5, ...]
[2026-02-03T01:27:40Z] E At index 0 diff: 5 != 0
[2026-02-03T01:27:40Z] E Full diff:
[2026-02-03T01:27:40Z] E - [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
[2026-02-03T01:27:40Z] E + [5, 6, 7, 8, 9, 0, 1, 2, 3, 4]
[2026-02-03T01:27:40Z]
[2026-02-03T01:27:40Z] python/ray/data/tests/test_operator_fusion.py:684: AssertionError

Task

  • For this test and every other test in the module that assumes output ordering, compare results with the rows_same utility function instead of an ordered equality check, so the tests stop flaking

https://buildkite.com/ray-project/postmerge/builds/15775#019c2108-744c-414d-ad5e-84ca35b062dd
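
The failure above shows the same ten rows with the two blocks arriving in swapped order (5-9 before 0-4), so an order-insensitive comparison would pass. The sketch below illustrates that idea in plain Python. It is not the rows_same utility itself, whose signature is not shown in this issue; the helper names `_freeze` and `rows_match_ignoring_order` are hypothetical, and the sketch assumes row values are hashable.

```python
from collections import Counter
from typing import Any, Dict, Iterable, Tuple


def _freeze(row: Dict[str, Any]) -> Tuple[Tuple[str, Any], ...]:
    # Hypothetical helper: turn a row dict into a hashable tuple that
    # does not depend on key insertion order.
    return tuple(sorted(row.items()))


def rows_match_ignoring_order(
    actual: Iterable[Dict[str, Any]], expected: Iterable[Dict[str, Any]]
) -> bool:
    # Hypothetical stand-in for an order-insensitive row comparison.
    # Counter keeps duplicates, so repeated rows must match in count too.
    return Counter(_freeze(r) for r in actual) == Counter(
        _freeze(r) for r in expected
    )


# Rows in the order reported by the failing run, and the expected order.
actual = [{"id": i} for i in [5, 6, 7, 8, 9, 0, 1, 2, 3, 4]]
expected = [{"id": i} for i in range(10)]

# The ordered comparison fails, as in the CI log.
assert actual != expected
# The order-insensitive comparison passes.
assert rows_match_ignoring_order(actual, expected)
```

A multiset comparison is used rather than sorting the `id` values so that the check still catches missing or duplicated rows.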

Labels

  • P1: Issue that should be fixed within a few weeks
  • bug: Something that is supposed to be working; but isn't
  • data: Ray Data-related issues
