Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions python/ray/_private/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,24 @@ def get_strategy(strategy):

stats = placement_group_info.stats
assert placement_group_info is not None

scheduling_options = []
for strategy_proto in placement_group_info.scheduling_options:
bundles_list = []
label_selectors_list = []
for bundle in strategy_proto.bundles:
# Extract unit resources
bundle_dict = message_to_dict(bundle)
bundles_list.append(bundle_dict.get("unitResources", {}))
# Extract label selector from the bundle
label_selectors_list.append(message_to_dict(bundle.label_selector))

strategy_dict = {
"bundles": bundles_list,
"bundle_label_selector": label_selectors_list,
}
scheduling_options.append(strategy_dict)

return {
"placement_group_id": binary_to_hex(
placement_group_info.placement_group_id
Expand Down Expand Up @@ -373,6 +391,7 @@ def get_strategy(strategy):
stats.scheduling_state
].name,
},
"scheduling_options": scheduling_options,
}

def _nanoseconds_to_microseconds(self, time_in_nanoseconds):
Expand Down
60 changes: 58 additions & 2 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ from ray.includes.common cimport (
CLabelNotIn,
CLabelSelector,
CNodeResources,
CPlacementGroupSchedulingOption,
CRayFunction,
CWorkerType,
CJobConfig,
Expand Down Expand Up @@ -661,6 +662,54 @@ cdef int prepare_fallback_strategy(

return 0

cdef int prepare_bundle_label_selector(
        list selector_list,
        c_vector[CLabelSelector] *out_vector) except -1:
    """Convert a list of Python label-selector dicts into a vector of
    CLabelSelector, one entry per bundle.

    Args:
        selector_list: list of per-bundle label-selector dicts; may be
            None or empty, in which case out_vector is left untouched.
        out_vector: destination C++ vector; converted selectors are
            appended (push_back copies the value, so reusing the local
            CLabelSelector across iterations is safe).

    Returns 0 on success; the `except -1` clause lets Python exceptions
    raised by prepare_label_selector propagate to the caller.
    """
    cdef CLabelSelector c_label_selector

    if selector_list:
        # Reserve up front to avoid repeated reallocations while appending.
        out_vector.reserve(len(selector_list))
        for selector_dict in selector_list:
            # Fresh default-constructed selector for each bundle.
            c_label_selector = CLabelSelector()
            prepare_label_selector(selector_dict, &c_label_selector)
            out_vector.push_back(c_label_selector)

    return 0

cdef int prepare_placement_group_fallback_strategy(
        list fallback_strategy,
        c_vector[CPlacementGroupSchedulingOption] *fallback_strategy_vector) except -1:
    """Convert a Python fallback-strategy list into a vector of
    CPlacementGroupSchedulingOption.

    Each element of `fallback_strategy` is a dict with optional keys:
      - "bundles": list of resource dicts (resource name -> quantity)
      - "bundle_label_selector": list of per-bundle label-selector dicts

    Args:
        fallback_strategy: ordered list of fallback options; None means
            no fallback and leaves the output vector untouched.
        fallback_strategy_vector: destination C++ vector; one converted
            option is appended per input dict, preserving order (order is
            the fallback priority).

    Returns 0 on success; `except -1` propagates Python exceptions from
    the nested conversion helpers.
    """
    cdef:
        CPlacementGroupSchedulingOption c_option
        unordered_map[c_string, double] c_bundle_map

    if fallback_strategy is None:
        return 0

    fallback_strategy_vector.reserve(len(fallback_strategy))

    for option_dict in fallback_strategy:
        # Default-construct per iteration so no state leaks between options.
        c_option = CPlacementGroupSchedulingOption()

        # Convert bundles field to C and prepare unit resources.
        bundles_list = option_dict.get("bundles")
        if bundles_list:
            c_option.bundles.reserve(len(bundles_list))
            for bundle in bundles_list:
                # One reusable map, cleared before each bundle; push_back
                # copies it into the vector.
                c_bundle_map.clear()
                prepare_resources(bundle, &c_bundle_map)
                c_option.bundles.push_back(c_bundle_map)

        # Convert bundle_label_selector field to C and prepare label selectors.
        selector_list = option_dict.get("bundle_label_selector", [])
        prepare_bundle_label_selector(selector_list, &c_option.bundle_label_selector)

        fallback_strategy_vector.push_back(c_option)

    return 0

cdef int prepare_resources(
dict resource_dict,
unordered_map[c_string, double] *resource_map) except -1:
Expand Down Expand Up @@ -3695,11 +3744,14 @@ cdef class CoreWorker:
c_string strategy,
c_bool is_detached,
soft_target_node_id,
c_vector[unordered_map[c_string, c_string]] bundle_label_selector):
list bundle_label_selector,
list fallback_strategy=None):
cdef:
CPlacementGroupID c_placement_group_id
CPlacementStrategy c_strategy
CNodeID c_soft_target_node_id = CNodeID.Nil()
c_vector[CLabelSelector] c_bundle_label_selector
c_vector[CPlacementGroupSchedulingOption] c_fallback_strategy

if strategy == b"PACK":
c_strategy = PLACEMENT_STRATEGY_PACK
Expand All @@ -3716,6 +3768,9 @@ cdef class CoreWorker:
if soft_target_node_id is not None:
c_soft_target_node_id = CNodeID.FromHex(soft_target_node_id)

prepare_bundle_label_selector(bundle_label_selector, &c_bundle_label_selector)
prepare_placement_group_fallback_strategy(fallback_strategy, &c_fallback_strategy)

with nogil:
check_status(
CCoreWorkerProcess.GetCoreWorker().
Expand All @@ -3726,7 +3781,8 @@ cdef class CoreWorker:
bundles,
is_detached,
c_soft_target_node_id,
bundle_label_selector),
c_bundle_label_selector,
c_fallback_strategy),
&c_placement_group_id))

return PlacementGroupID(c_placement_group_id.Binary())
Expand Down
8 changes: 7 additions & 1 deletion python/ray/includes/common.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,11 @@ cdef extern from "ray/core_worker/common.h" nogil:
CLabelSelector label_selector,
c_vector[CFallbackOption] fallback_strategy)

cdef cppclass CPlacementGroupSchedulingOption "ray::core::PlacementGroupSchedulingOption":
CPlacementGroupSchedulingOption()
c_vector[unordered_map[c_string, double]] bundles
c_vector[CLabelSelector] bundle_label_selector

cdef cppclass CPlacementGroupCreationOptions \
"ray::core::PlacementGroupCreationOptions":
CPlacementGroupCreationOptions()
Expand All @@ -431,7 +436,8 @@ cdef extern from "ray/core_worker/common.h" nogil:
const c_vector[unordered_map[c_string, double]] &bundles,
c_bool is_detached,
CNodeID soft_target_node_id,
const c_vector[unordered_map[c_string, c_string]] &bundle_label_selector,
const c_vector[CLabelSelector] &bundle_label_selector,
const c_vector[CPlacementGroupSchedulingOption] &fallback_strategy,
)

cdef cppclass CObjectLocation "ray::core::ObjectLocation":
Expand Down
1 change: 1 addition & 0 deletions python/ray/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ py_test_module_list(
"test_placement_group_3.py",
"test_placement_group_4.py",
"test_placement_group_5.py",
"test_placement_group_fallback.py",
"test_scheduling.py",
"test_scheduling_2.py",
"test_token_auth_integration.py",
Expand Down
233 changes: 233 additions & 0 deletions python/ray/tests/test_placement_group_fallback.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
import pytest

import ray
from ray._private.test_utils import placement_group_assert_no_leak
from ray.util.placement_group import (
placement_group,
placement_group_table,
remove_placement_group,
)


def test_placement_group_fallback_resources(ray_start_cluster):
    """Test fallback based on resource bundles."""
    cluster = ray_start_cluster
    cluster.add_node(num_cpus=4)
    ray.init(address=cluster.address)

    # The primary request (8 CPUs) cannot fit on the 4-CPU node, so the
    # placement group must be satisfied by the feasible fallback option.
    pg = placement_group(
        name="resource_fallback_pg",
        bundles=[{"CPU": 8}],
        strategy="PACK",
        fallback_strategy=[{"bundles": [{"CPU": 4}]}],
    )
    ray.get(pg.ready(), timeout=10)

    @ray.remote(num_cpus=1)
    def check_capacity():
        return "ok"

    # All CPUs are now held by the placement group, so a plain 1-CPU task
    # cannot be scheduled and the get() times out.
    with pytest.raises(ray.exceptions.GetTimeoutError):
        ray.get(check_capacity.remote(), timeout=2)

    remove_placement_group(pg)
    placement_group_assert_no_leak([pg])


def test_placement_group_fallback_strategy_labels(ray_start_cluster):
    """
    Test that fallback strategy is used when primary bundles are not feasible
    due to label constraints.
    """
    cluster = ray_start_cluster
    cluster.add_node(num_cpus=2, labels={})  # Node without any labels.
    cluster.add_node(num_cpus=2, labels={"region": "us-west1"})
    ray.init(address=cluster.address)

    pg = placement_group(
        name="fallback_pg",
        bundles=[{"CPU": 2}],
        bundle_label_selector=[{"region": "us-east1"}],  # No node matches.
        strategy="PACK",
        fallback_strategy=[
            {"bundles": [{"CPU": 2}], "bundle_label_selector": [{"region": "us-west1"}]}
        ],  # Satisfiable by the us-west1 node.
    )

    # The fallback option is feasible, so the group becomes ready.
    ray.get(pg.ready(), timeout=10)

    # The bundle must have landed on the us-west1 node.
    bundle_node_id = placement_group_table(pg)["bundles_to_node_id"][0]
    matches = [n for n in ray.nodes() if n["NodeID"] == bundle_node_id]
    assert matches, "Scheduled node not found in cluster state"
    assert matches[0]["Labels"]["region"] == "us-west1"

    remove_placement_group(pg)
    placement_group_assert_no_leak([pg])


def test_placement_group_fallback_priority(ray_start_cluster):
    """Test that the first feasible fallback option is chosen from multiple feasible fallbacks."""
    cluster = ray_start_cluster
    cluster.add_node(num_cpus=10)
    ray.init(address=cluster.address)

    pg = placement_group(
        name="priority_pg",
        bundles=[{"CPU": 20}],  # Cannot fit on the 10-CPU node.
        strategy="PACK",
        fallback_strategy=[
            {"bundles": [{"CPU": 11}]},  # Still infeasible -> skipped.
            {"bundles": [{"CPU": 5}]},  # First feasible option -> chosen.
            {"bundles": [{"CPU": 1}]},  # Feasible, but lower priority.
        ],
    )
    ray.get(pg.ready(), timeout=10)

    # With 5 of 10 CPUs reserved, a 6-CPU task cannot run. This proves the
    # 5-CPU fallback (not the 1-CPU one) was selected.
    @ray.remote(num_cpus=6)
    def heavy_task():
        return "ok"

    with pytest.raises(ray.exceptions.GetTimeoutError):
        ray.get(heavy_task.remote(), timeout=2)

    remove_placement_group(pg)
    placement_group_assert_no_leak([pg])


def test_placement_group_fallback_bundle_shapes(ray_start_cluster):
    """Test fallback works even when changing the number of bundles."""
    cluster = ray_start_cluster
    for _ in range(2):
        cluster.add_node(num_cpus=1)
    ray.init(address=cluster.address)

    pg = placement_group(
        name="reshape_pg",
        bundles=[{"CPU": 2}],  # No single node can host a 2-CPU bundle.
        strategy="SPREAD",
        # Fallback reshapes the request into two 1-CPU bundles instead.
        fallback_strategy=[{"bundles": [{"CPU": 1}, {"CPU": 1}]}],
    )
    ray.get(pg.ready(), timeout=10)

    # The table reflects the fallback's bundle count, not the original's.
    assert len(placement_group_table(pg)["bundles"]) == 2

    remove_placement_group(pg)
    placement_group_assert_no_leak([pg])


def test_multiple_placement_groups_and_fallbacks(ray_start_cluster):
    """
    Test that multiple placement groups with fallback strategies correctly subtract
    from available resources in the cluster.
    """
    cluster = ray_start_cluster
    cluster.add_node(num_cpus=10)
    ray.init(address=cluster.address)

    # Each group's primary request is infeasible; each falls back to 3 CPUs.
    pgs = [
        placement_group(
            name=f"pg_{i}",
            bundles=[{"CPU": 100}],
            strategy="PACK",
            fallback_strategy=[{"bundles": [{"CPU": 3}]}],
        )
        for i in range(3)
    ]
    for pg in pgs:
        ray.get(pg.ready(), timeout=10)

    # 9 of 10 CPUs are reserved, so a 1-CPU task still fits...
    @ray.remote(num_cpus=1)
    def small_task():
        return "ok"

    assert ray.get(small_task.remote(), timeout=5) == "ok"

    # ...but a 2-CPU task exceeds the remaining capacity and times out.
    @ray.remote(num_cpus=2)
    def large_task():
        return "fail"

    with pytest.raises(ray.exceptions.GetTimeoutError):
        ray.get(large_task.remote(), timeout=2)

    for pg in pgs:
        remove_placement_group(pg)
    placement_group_assert_no_leak(pgs)


def test_placement_group_fallback_validation(ray_start_cluster):
    """
    Verifies that PG validates resource shape with both primary and fallback bundles.
    """
    cluster = ray_start_cluster
    cluster.add_node(num_cpus=4, num_gpus=0)
    ray.init(address=cluster.address)

    pg = placement_group(
        name="validation_pg",
        bundles=[{"GPU": 1}],  # Primary option offers no CPU at all.
        strategy="PACK",
        fallback_strategy=[{"bundles": [{"CPU": 1}]}],
    )

    # The task needs CPU, which only the fallback bundles provide.
    # Client-side validation should consult the fallback strategy and
    # accept the task instead of raising immediately.
    @ray.remote(num_cpus=1)
    def run_on_cpu():
        return "success"

    try:
        ref = run_on_cpu.options(placement_group=pg).remote()
        assert ray.get(ref) == "success"
    except ValueError as e:
        pytest.fail(f"Validation failed for fallback-compatible task: {e}")

    # Once ready, bundle_specs reflects the chosen fallback bundles.
    ray.get(pg.ready())
    assert pg.bundle_specs[0].get("CPU") == 1
    assert pg.bundle_specs[0].get("GPU") is None

    remove_placement_group(pg)
    placement_group_assert_no_leak([pg])


if __name__ == "__main__":
    import sys

    # Run this module's tests verbosely and propagate pytest's exit code.
    exit_code = pytest.main(["-v", __file__])
    sys.exit(exit_code)
Loading