From 629f7e2a59deacd5c0f929d7641de88132ac8b51 Mon Sep 17 00:00:00 2001 From: ryanaoleary Date: Tue, 3 Feb 2026 23:53:53 +0000 Subject: [PATCH 1/9] Fix placement group bundle semantics and fallback logic Signed-off-by: ryanaoleary --- .../tests/test_placement_group_fallback.py | 233 +++++++++++++++++ src/ray/gcs/gcs_placement_group.cc | 25 +- src/ray/gcs/gcs_placement_group.h | 16 +- src/ray/gcs/gcs_placement_group_scheduler.cc | 80 +++++- src/ray/gcs/gcs_placement_group_scheduler.h | 7 + .../gcs_placement_group_scheduler_test.cc | 241 ++++++++++++++++++ src/ray/protobuf/gcs.proto | 14 +- 7 files changed, 599 insertions(+), 17 deletions(-) create mode 100644 python/ray/tests/test_placement_group_fallback.py diff --git a/python/ray/tests/test_placement_group_fallback.py b/python/ray/tests/test_placement_group_fallback.py new file mode 100644 index 000000000000..952e79a14a7c --- /dev/null +++ b/python/ray/tests/test_placement_group_fallback.py @@ -0,0 +1,233 @@ +import pytest + +import ray +from ray._private.test_utils import placement_group_assert_no_leak +from ray.util.placement_group import ( + placement_group, + placement_group_table, + remove_placement_group, +) + + +def test_placement_group_fallback_resources(ray_start_cluster): + """Test fallback based on resource bundles.""" + cluster = ray_start_cluster + cluster.add_node(num_cpus=4) + ray.init(address=cluster.address) + + # Feasible fallback strategy with bundles requesting <= available CPU. + fallback_strategy = [{"bundles": [{"CPU": 4}]}] + + pg = placement_group( + name="resource_fallback_pg", + bundles=[{"CPU": 8}], # Infeasible initial bundle request. + strategy="PACK", + fallback_strategy=fallback_strategy, + ) + # Placement group is scheduled using fallback. + ray.get(pg.ready(), timeout=10) + + # Example task to try to schedule to used node. + @ray.remote(num_cpus=1) + def check_capacity(): + return "ok" + + # Example task times out because all CPU used by placement group. + with pytest.raises(ray.exceptions.GetTimeoutError): + ray.get(check_capacity.remote(), timeout=2) + + remove_placement_group(pg) + placement_group_assert_no_leak([pg]) + + +def test_placement_group_fallback_strategy_labels(ray_start_cluster): + """ + Test that fallback strategy is used when primary bundles are not feasible + due to label constraints. 
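    The cluster has one unlabelled node and one node labelled region=us-west1;
    the primary bundle requires region=us-east1, so only the fallback option
    can be placed.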
+ """ + cluster = ray_start_cluster + cluster.add_node(num_cpus=2, labels={}) # Unlabelled node + cluster.add_node(num_cpus=2, labels={"region": "us-west1"}) + ray.init(address=cluster.address) + + fallback_strategy = [ + {"bundles": [{"CPU": 2}], "bundle_label_selector": [{"region": "us-west1"}]} + ] + + pg = placement_group( + name="fallback_pg", + bundles=[{"CPU": 2}], + bundle_label_selector=[{"region": "us-east1"}], # Infeasible label + strategy="PACK", + fallback_strategy=fallback_strategy, # Feasible fallback + ) + + # Succeeds due to fallback option + ray.get(pg.ready(), timeout=10) + + # Verify it was scheduled on the correct node + table = placement_group_table(pg) + bundle_node_id = table["bundles_to_node_id"][0] + + found = False + for node in ray.nodes(): + if node["NodeID"] == bundle_node_id: + assert node["Labels"]["region"] == "us-west1" + found = True + break + assert found, "Scheduled node not found in cluster state" + + remove_placement_group(pg) + placement_group_assert_no_leak([pg]) + + +def test_placement_group_fallback_priority(ray_start_cluster): + """Test that the first feasible fallback option is chosen from multiple feasible fallbacks.""" + cluster = ray_start_cluster + # Node has 10 CPUs + cluster.add_node(num_cpus=10) + ray.init(address=cluster.address) + + fallback_strategy = [ + {"bundles": [{"CPU": 10}]}, # Infeasible + {"bundles": [{"CPU": 5}]}, # Feasible + {"bundles": [{"CPU": 1}]}, # Feasible + ] + + pg = placement_group( + name="priority_pg", + bundles=[{"CPU": 20}], # Infeasible main bundles. + strategy="PACK", + fallback_strategy=fallback_strategy, + ) + + ray.get(pg.ready(), timeout=10) + + # Verify we consumed 5 CPUs, not 1. + @ray.remote(num_cpus=6) + def heavy_task(): + return "ok" + + with pytest.raises(ray.exceptions.GetTimeoutError): + ray.get(heavy_task.remote(), timeout=2) + + remove_placement_group(pg) + placement_group_assert_no_leak([pg]) + + +def test_placement_group_fallback_bundle_shapes(ray_start_cluster): + """Test fallback works even when changing the number of bundles.""" + cluster = ray_start_cluster + cluster.add_node(num_cpus=1) + cluster.add_node(num_cpus=1) + ray.init(address=cluster.address) + + # Feasible fallback specifies 2 bundles with 1 CPU each (rather than 1 bundle + # with 2 CPU). + fallback_strategy = [{"bundles": [{"CPU": 1}, {"CPU": 1}]}] + + pg = placement_group( + name="reshape_pg", + bundles=[{"CPU": 2}], # Infeasible 2 CPU bundle on any node. + strategy="SPREAD", + fallback_strategy=fallback_strategy, + ) + + ray.get(pg.ready(), timeout=10) + + table = placement_group_table(pg) + assert len(table["bundles"]) == 2 + + remove_placement_group(pg) + placement_group_assert_no_leak([pg]) + + +def test_multiple_placement_groups_and_fallbacks(ray_start_cluster): + """ + Test that multiple placement groups with fallback strategies correctly subtract + from available resources in the cluster. + """ + cluster = ray_start_cluster + cluster.add_node(num_cpus=10) + ray.init(address=cluster.address) + + # Define a fallback strategy that uses 3 CPUs. + fallback_strategy = [{"bundles": [{"CPU": 3}]}] + + pgs = [] + for i in range(3): + pg = placement_group( + name=f"pg_{i}", + bundles=[{"CPU": 100}], # Infeasible + strategy="PACK", + fallback_strategy=fallback_strategy, + ) + pgs.append(pg) + + # Create 3 PGs that should all use the fallback strategy. + for pg in pgs: + ray.get(pg.ready(), timeout=10) + + # Verify we can still schedule a task utilizing the last CPU (10 total - 9 used by PGs). 
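    # Each PG fell back to a single {"CPU": 3} bundle, so the three groups
    # reserve 3 * 3 = 9 CPUs and exactly one CPU remains free.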
+ @ray.remote(num_cpus=1) + def small_task(): + return "ok" + + assert ray.get(small_task.remote(), timeout=5) == "ok" + + # Validate PGs with fallback correctly subtract from the available cluster resources to where + # a task requesting more CPU than is available times out. + @ray.remote(num_cpus=2) + def large_task(): + return "fail" + + with pytest.raises(ray.exceptions.GetTimeoutError): + ray.get(large_task.remote(), timeout=2) + + for pg in pgs: + remove_placement_group(pg) + placement_group_assert_no_leak(pgs) + + +def test_placement_group_fallback_validation(ray_start_cluster): + """ + Verifies that PG validates resource shape with both primary and fallback bundles. + """ + cluster = ray_start_cluster + cluster.add_node(num_cpus=4, num_gpus=0) + ray.init(address=cluster.address) + + pg = placement_group( + name="validation_pg", + bundles=[{"GPU": 1}], + strategy="PACK", + fallback_strategy=[{"bundles": [{"CPU": 1}]}], + ) + + # Task requires CPU, primary option has only GPU. + # The client-side validation logic should check the fallback strategy + # and allow this task to proceed. + @ray.remote(num_cpus=1) + def run_on_cpu(): + return "success" + + try: + # If client-side validation fails, this raises ValueError immediately. + ref = run_on_cpu.options(placement_group=pg).remote() + assert ray.get(ref) == "success" + except ValueError as e: + pytest.fail(f"Validation failed for fallback-compatible task: {e}") + + # Verify bundle_specs contains active bundles. + ray.get(pg.ready()) + assert pg.bundle_specs[0].get("CPU") == 1 + assert pg.bundle_specs[0].get("GPU") is None + + remove_placement_group(pg) + placement_group_assert_no_leak([pg]) + + +if __name__ == "__main__": + import sys + + sys.exit(pytest.main(["-v", __file__])) diff --git a/src/ray/gcs/gcs_placement_group.cc b/src/ray/gcs/gcs_placement_group.cc index 72e12efa0fee..f28b48c39830 100644 --- a/src/ray/gcs/gcs_placement_group.cc +++ b/src/ray/gcs/gcs_placement_group.cc @@ -60,8 +60,15 @@ std::vector> &GcsPlacementGroup::GetB const { // Fill the cache if it wasn't. if (cached_bundle_specs_.empty()) { - const auto &bundles = placement_group_table_data_.bundles(); - for (const auto &bundle : bundles) { + // If no active bundles selected, return the highest priority scheduling strategy. + const auto &source_bundles = + (placement_group_table_data_.bundles().empty() && + placement_group_table_data_.scheduling_strategy_size() > 0) + ? placement_group_table_data_.scheduling_strategy(0).bundles() + : placement_group_table_data_.bundles(); + + cached_bundle_specs_.reserve(source_bundles.size()); + for (const auto &bundle : source_bundles) { cached_bundle_specs_.push_back(std::make_shared(bundle)); } } @@ -144,5 +151,19 @@ rpc::PlacementGroupStats *GcsPlacementGroup::GetMutableStats() { return placement_group_table_data_.mutable_stats(); } +const google::protobuf::RepeatedPtrField + &GcsPlacementGroup::GetSchedulingStrategy() const { + return placement_group_table_data_.scheduling_strategy(); +} + +void GcsPlacementGroup::UpdateActiveBundles( + const rpc::PlacementGroupSchedulingOption &selected_option) { + // Invalidate the cache because we are changing the bundles. + cached_bundle_specs_.clear(); + + // Replace the current bundles with the bundles from the selected strategy. 
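+  // The next call to GetBundles() repopulates cached_bundle_specs_ from the
+  // bundles copied below.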
+ placement_group_table_data_.mutable_bundles()->CopyFrom(selected_option.bundles()); +} + } // namespace gcs } // namespace ray diff --git a/src/ray/gcs/gcs_placement_group.h b/src/ray/gcs/gcs_placement_group.h index ab4ce0c73de0..01c04e6adb13 100644 --- a/src/ray/gcs/gcs_placement_group.h +++ b/src/ray/gcs/gcs_placement_group.h @@ -62,8 +62,6 @@ class GcsPlacementGroup { placement_group_spec.placement_group_id()); placement_group_table_data_.set_name(placement_group_spec.name()); placement_group_table_data_.set_state(rpc::PlacementGroupTableData::PENDING); - placement_group_table_data_.mutable_bundles()->CopyFrom( - placement_group_spec.bundles()); placement_group_table_data_.set_strategy(placement_group_spec.strategy()); placement_group_table_data_.set_creator_job_id(placement_group_spec.creator_job_id()); placement_group_table_data_.set_creator_actor_id( @@ -78,6 +76,15 @@ class GcsPlacementGroup { placement_group_table_data_.set_ray_namespace(ray_namespace); placement_group_table_data_.set_placement_group_creation_timestamp_ms( current_sys_time_ms()); + + // Construct scheduling strategy list. Index 0 contains the primary request. + auto *primary_option = placement_group_table_data_.add_scheduling_strategy(); + primary_option->mutable_bundles()->CopyFrom(placement_group_spec.bundles()); + + // Index 1..N: fallback strategies. + placement_group_table_data_.mutable_scheduling_strategy()->MergeFrom( + placement_group_spec.fallback_options()); + SetupStates(); } @@ -156,6 +163,11 @@ class GcsPlacementGroup { rpc::PlacementGroupStats *GetMutableStats(); + const google::protobuf::RepeatedPtrField + &GetSchedulingStrategy() const; + + void UpdateActiveBundles(const rpc::PlacementGroupSchedulingOption &selected_option); + private: // XXX. FRIEND_TEST(GcsPlacementGroupManagerTest, TestPlacementGroupBundleCache); diff --git a/src/ray/gcs/gcs_placement_group_scheduler.cc b/src/ray/gcs/gcs_placement_group_scheduler.cc index b3d733b470c5..9f74cbacdb21 100644 --- a/src/ray/gcs/gcs_placement_group_scheduler.cc +++ b/src/ray/gcs/gcs_placement_group_scheduler.cc @@ -38,6 +38,24 @@ GcsPlacementGroupScheduler::GcsPlacementGroupScheduler( cluster_resource_scheduler_(cluster_resource_scheduler), raylet_client_pool_(raylet_client_pool) {} +raylet_scheduling_policy::SchedulingResult GcsPlacementGroupScheduler::TrySchedule( + const std::shared_ptr &placement_group, + const std::vector> &bundles, + const rpc::PlacementStrategy strategy) { + std::vector resource_request_list; + resource_request_list.reserve(bundles.size()); + for (const auto &bundle : bundles) { + resource_request_list.emplace_back(&bundle->GetRequiredResources()); + } + + auto scheduling_options = + CreateSchedulingOptions(placement_group->GetPlacementGroupID(), + strategy, + placement_group->GetSoftTargetNodeID()); + + return cluster_resource_scheduler_.Schedule(resource_request_list, scheduling_options); +} + void GcsPlacementGroupScheduler::ScheduleUnplacedBundles( const SchedulePgRequest &request) { const auto &placement_group = request.placement_group; @@ -54,25 +72,58 @@ void GcsPlacementGroupScheduler::ScheduleUnplacedBundles( return; } - const auto &bundles = placement_group->GetUnplacedBundles(); - const auto &strategy = placement_group->GetStrategy(); - + auto bundles = placement_group->GetUnplacedBundles(); RAY_LOG(DEBUG) << "Scheduling placement group " << placement_group->GetName() << ", id: " << placement_group->GetPlacementGroupID() << ", bundles size = " << bundles.size(); - std::vector resource_request_list; - 
resource_request_list.reserve(bundles.size()); - for (const auto &bundle : bundles) { - resource_request_list.emplace_back(&bundle->GetRequiredResources()); + const auto &scheduling_strategies = placement_group->GetSchedulingStrategy(); + bool is_scheduling_all_bundles = + (bundles.size() == placement_group->GetBundles().size()); + + if (is_scheduling_all_bundles && !scheduling_strategies.empty()) { + RAY_LOG(INFO) << "Scheduling whole Placement Group " + << placement_group->GetPlacementGroupID() << " using primary strategy."; + + const auto &primary_option = scheduling_strategies.Get(0); + placement_group->UpdateActiveBundles(primary_option); + bundles = placement_group->GetUnplacedBundles(); } - auto scheduling_options = - CreateSchedulingOptions(placement_group->GetPlacementGroupID(), - strategy, - placement_group->GetSoftTargetNodeID()); auto scheduling_result = - cluster_resource_scheduler_.Schedule(resource_request_list, scheduling_options); + TrySchedule(placement_group, bundles, placement_group->GetStrategy()); + + const rpc::PlacementGroupSchedulingOption *applied_fallback_option = nullptr; + std::vector> fallback_bundles; + + if (!scheduling_result.status.IsSuccess() && scheduling_strategies.size() > 1 && + is_scheduling_all_bundles) { + RAY_LOG(DEBUG) << "Primary scheduling failed for PG " + << placement_group->GetPlacementGroupID() << ". Attempting " + << (scheduling_strategies.size() - 1) << " fallback options."; + + for (int i = 1; i < scheduling_strategies.size(); i++) { + const auto &option = scheduling_strategies.Get(i); + + fallback_bundles.clear(); + for (const auto &bundle_proto : option.bundles()) { + fallback_bundles.push_back( + std::make_shared(bundle_proto)); + } + + auto fallback_result = + TrySchedule(placement_group, fallback_bundles, placement_group->GetStrategy()); + + if (fallback_result.status.IsSuccess()) { + RAY_LOG(INFO) << "Placement Group " << placement_group->GetPlacementGroupID() + << " primary scheduling failed, but fallback strategy succeeded."; + scheduling_result = fallback_result; + bundles = fallback_bundles; + applied_fallback_option = &option; + break; + } + } + } auto result_status = scheduling_result.status; const auto &selected_nodes = scheduling_result.selected_nodes; @@ -97,6 +148,11 @@ void GcsPlacementGroupScheduler::ScheduleUnplacedBundles( RAY_CHECK(bundles.size() == selected_nodes.size()); + // If a fallback option was selected, commit it to the placement group state. + if (applied_fallback_option) { + placement_group->UpdateActiveBundles(*applied_fallback_option); + } + // Covert to a map of bundle to node. ScheduleMap bundle_to_node; for (size_t i = 0; i < selected_nodes.size(); ++i) { diff --git a/src/ray/gcs/gcs_placement_group_scheduler.h b/src/ray/gcs/gcs_placement_group_scheduler.h index 9f509cc01561..a3337c527043 100644 --- a/src/ray/gcs/gcs_placement_group_scheduler.h +++ b/src/ray/gcs/gcs_placement_group_scheduler.h @@ -476,6 +476,13 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface { /// wildcard resource. bool IsPlacementGroupWildcardResource(const std::string &resource_name); + // Helper to attempt scheduling for a specific set of bundles. + // Returns the scheduling result (success/failure and selected nodes). + raylet_scheduling_policy::SchedulingResult TrySchedule( + const std::shared_ptr &placement_group, + const std::vector> &bundles, + const rpc::PlacementStrategy strategy); + instrumented_io_context &io_context_; /// A timer that ticks every cancel resource failure milliseconds. 
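From the Python API, the effect of the scheduler change above is visible through the
placement group's active bundles. A minimal illustrative sketch (it assumes a local,
CPU-only cluster; the resource amounts are made up and not part of this patch):

    import ray
    from ray.util.placement_group import placement_group, placement_group_table

    ray.init()

    # The primary request asks for a GPU; on a CPU-only cluster only the
    # CPU fallback option can be placed.
    pg = placement_group(
        bundles=[{"GPU": 1}],
        strategy="PACK",
        fallback_strategy=[{"bundles": [{"CPU": 1}]}],
    )
    ray.get(pg.ready())

    # bundle_specs and the GCS table reflect the bundles that were actually
    # committed (the fallback bundles here), not the original request.
    print(pg.bundle_specs)
    print(placement_group_table(pg)["bundles"])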
diff --git a/src/ray/gcs/tests/gcs_placement_group_scheduler_test.cc b/src/ray/gcs/tests/gcs_placement_group_scheduler_test.cc index bad08cb3e3e6..b54708fd84b1 100644 --- a/src/ray/gcs/tests/gcs_placement_group_scheduler_test.cc +++ b/src/ray/gcs/tests/gcs_placement_group_scheduler_test.cc @@ -1257,6 +1257,10 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestInitialize) { auto create_placement_group_request = GenCreatePlacementGroupRequest(); auto placement_group = std::make_shared(create_placement_group_request, "", counter_); + + // Populate 'bundles' field for test so it can be mutated. + placement_group->UpdateActiveBundles(placement_group->GetSchedulingStrategy().Get(0)); + placement_group->GetMutableBundle(0)->set_node_id(node0->node_id()); placement_group->GetMutableBundle(1)->set_node_id(node1->node_id()); @@ -1426,5 +1430,242 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestBundlesRemovedWhenNodeDead) { ASSERT_EQ(scheduler_->waiting_removed_bundles_.size(), 0); } +TEST_F(GcsPlacementGroupSchedulerTest, TestFallbackStrategyResources) { + auto node = GenNodeInfo(); + AddNode(node, 10); + ASSERT_EQ(1, gcs_node_manager_->GetAllAliveNodes().size()); + + // Create a placement group resource request that is infeasible. + auto request = GenCreatePlacementGroupRequest("", rpc::PlacementStrategy::PACK, 1, 20); + + // Create a fallback strategy with feasible bundles. + auto *fallback_option = request.mutable_placement_group_spec()->add_fallback_options(); + auto *bundle = fallback_option->add_bundles(); + bundle->mutable_unit_resources()->insert({"CPU", 5.0}); + bundle->mutable_bundle_id()->set_bundle_index(0); + bundle->mutable_bundle_id()->set_placement_group_id( + request.placement_group_spec().placement_group_id()); + + auto placement_group = std::make_shared(request, "", counter_); + + // Validate that scheduling is successful using the fallback strategy. + scheduler_->ScheduleUnplacedBundles( + SchedulePgRequest{placement_group, + [this](std::shared_ptr pg, bool) { + absl::MutexLock lock(&placement_group_requests_mutex_); + failure_placement_groups_.emplace_back(std::move(pg)); + }, + [this](std::shared_ptr pg) { + absl::MutexLock lock(&placement_group_requests_mutex_); + success_placement_groups_.emplace_back(std::move(pg)); + }}); + + ASSERT_EQ(1, raylet_clients_[0]->num_lease_requested); + ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources()); + WaitPendingDone(raylet_clients_[0]->commit_callbacks, 1); + ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources()); + WaitPlacementGroupPendingDone(0, GcsPlacementGroupStatus::FAILURE); + WaitPlacementGroupPendingDone(1, GcsPlacementGroupStatus::SUCCESS); + CheckEqWithPlacementGroupFront(placement_group, GcsPlacementGroupStatus::SUCCESS); + + auto final_bundles = placement_group->GetBundles(); + ASSERT_EQ(final_bundles.size(), 1); + auto cpu_amount = final_bundles[0] + ->GetRequiredResources() + .Get(scheduling::ResourceID("CPU")) + .Double(); + ASSERT_EQ(cpu_amount, 5.0); +} + +TEST_F(GcsPlacementGroupSchedulerTest, TestFallbackStrategyLabels) { + auto node = GenNodeInfo(); + (*node->mutable_resources_total())["CPU"] = 10; + (*node->mutable_labels())["cpu-family"] = "intel"; + AddNode(node); + + // Create a resource request with an infeasible label selector. 
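+  // The only node is labelled cpu-family=intel, so the primary bundle's
+  // cpu-family=amd selector cannot be satisfied.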
+ auto request = GenCreatePlacementGroupRequest("", rpc::PlacementStrategy::PACK, 1, 1); + auto *primary_bundle = request.mutable_placement_group_spec()->mutable_bundles(0); + (*primary_bundle->mutable_label_selector())["cpu-family"] = "amd"; + + // Create a fallback strategy with identical bundle resources but feasible label. + auto *fallback_option = request.mutable_placement_group_spec()->add_fallback_options(); + auto *fallback_bundle = fallback_option->add_bundles(); + fallback_bundle->mutable_unit_resources()->insert({"CPU", 1.0}); + fallback_bundle->mutable_bundle_id()->set_bundle_index(0); + fallback_bundle->mutable_bundle_id()->set_placement_group_id( + request.placement_group_spec().placement_group_id()); + (*fallback_bundle->mutable_label_selector())["cpu-family"] = "intel"; + + auto placement_group = std::make_shared(request, "", counter_); + + // Validate scheduling is successful using fallback strategy with feasible labels. + scheduler_->ScheduleUnplacedBundles( + SchedulePgRequest{placement_group, + [this](std::shared_ptr pg, bool) { + absl::MutexLock lock(&placement_group_requests_mutex_); + failure_placement_groups_.emplace_back(std::move(pg)); + }, + [this](std::shared_ptr pg) { + absl::MutexLock lock(&placement_group_requests_mutex_); + success_placement_groups_.emplace_back(std::move(pg)); + }}); + + ASSERT_EQ(1, raylet_clients_[0]->num_lease_requested); + ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources()); + WaitPendingDone(raylet_clients_[0]->commit_callbacks, 1); + ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources()); + WaitPlacementGroupPendingDone(0, GcsPlacementGroupStatus::FAILURE); + WaitPlacementGroupPendingDone(1, GcsPlacementGroupStatus::SUCCESS); + CheckEqWithPlacementGroupFront(placement_group, GcsPlacementGroupStatus::SUCCESS); + + auto final_bundles = placement_group->GetBundles(); + ASSERT_EQ(final_bundles.size(), 1); + + const auto &label_selector = + final_bundles[0]->GetRequiredResources().GetLabelSelector(); + + // Compare actual label selector added to bundle to the expected one. + auto actual_map = label_selector.ToStringMap(); + + google::protobuf::Map expected_map; + expected_map["cpu-family"] = "intel"; + + ASSERT_EQ(actual_map.size(), expected_map.size()); + for (const auto &[key, val] : expected_map) { + ASSERT_TRUE(actual_map.contains(key)); + ASSERT_EQ(actual_map.at(key), val); + } +} + +TEST_F(GcsPlacementGroupSchedulerTest, TestFallbackStrategyInfeasible) { + auto node = GenNodeInfo(); + (*node->mutable_resources_total())["CPU"] = 10; + (*node->mutable_labels())["cpu-family"] = "intel"; + AddNode(node); + + // Create a resource request with an infeasible label selector. + auto request = GenCreatePlacementGroupRequest("", rpc::PlacementStrategy::PACK, 1, 1); + auto *primary_bundle = request.mutable_placement_group_spec()->mutable_bundles(0); + (*primary_bundle->mutable_label_selector())["cpu-family"] = "amd"; + + // Create a fallback strategy with an infeasible fallback. 
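+  // cpu-family=arm does not match the intel node either, so neither the
+  // primary bundles nor the fallback option can be placed.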
+ auto *fallback_option = request.mutable_placement_group_spec()->add_fallback_options(); + auto *fallback_bundle = fallback_option->add_bundles(); + fallback_bundle->mutable_unit_resources()->insert({"CPU", 1.0}); + fallback_bundle->mutable_bundle_id()->set_bundle_index(0); + fallback_bundle->mutable_bundle_id()->set_placement_group_id( + request.placement_group_spec().placement_group_id()); + (*fallback_bundle->mutable_label_selector())["cpu-family"] = "arm"; + + auto placement_group = std::make_shared(request, "", counter_); + + // Validate the placement group remains unschedulable. + scheduler_->ScheduleUnplacedBundles( + SchedulePgRequest{placement_group, + [this](std::shared_ptr pg, bool is_feasible) { + absl::MutexLock lock(&placement_group_requests_mutex_); + failure_placement_groups_.emplace_back(std::move(pg)); + // It should be feasible (valid request), just not currently + // schedulable. + ASSERT_TRUE(is_feasible); + }, + [this](std::shared_ptr pg) { + absl::MutexLock lock(&placement_group_requests_mutex_); + success_placement_groups_.emplace_back(std::move(pg)); + }}); + + ASSERT_EQ(0, raylet_clients_[0]->num_lease_requested); + + WaitPlacementGroupPendingDone(1, GcsPlacementGroupStatus::FAILURE); + WaitPlacementGroupPendingDone(0, GcsPlacementGroupStatus::SUCCESS); + CheckEqWithPlacementGroupFront(placement_group, GcsPlacementGroupStatus::FAILURE); +} + +TEST_F(GcsPlacementGroupSchedulerTest, TestPGResetFallbackToPrimaryBundles) { + auto node_fallback = GenNodeInfo(0); + (*node_fallback->mutable_resources_total())["CPU"] = 10; + node_fallback->mutable_resources_total()->erase("CustomResource"); + AddNode(node_fallback); + + // Create a PG Request that is only satisfied by fallback option. + auto request = GenCreatePlacementGroupRequest("", rpc::PlacementStrategy::PACK, 1, 1); + + // Infeasible primary bundle. + auto *primary_bundle = request.mutable_placement_group_spec()->mutable_bundles(0); + (*primary_bundle->mutable_unit_resources())["CustomResource"] = 1.0; + primary_bundle->mutable_unit_resources()->erase("CPU"); + + // Feasible fallback option. + auto *fallback_option = request.mutable_placement_group_spec()->add_fallback_options(); + auto *fallback_bundle = fallback_option->add_bundles(); + (*fallback_bundle->mutable_unit_resources())["CPU"] = 1.0; + fallback_bundle->mutable_bundle_id()->set_bundle_index(0); + fallback_bundle->mutable_bundle_id()->set_placement_group_id( + request.placement_group_spec().placement_group_id()); + + auto placement_group = std::make_shared(request, "", counter_); + + // Schedule to fallback. + scheduler_->ScheduleUnplacedBundles( + SchedulePgRequest{placement_group, + [this](std::shared_ptr pg, bool) { + absl::MutexLock lock(&placement_group_requests_mutex_); + failure_placement_groups_.emplace_back(std::move(pg)); + }, + [this](std::shared_ptr pg) { + absl::MutexLock lock(&placement_group_requests_mutex_); + success_placement_groups_.emplace_back(std::move(pg)); + }}); + + ASSERT_EQ(1, raylet_clients_[0]->num_lease_requested); + ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources()); + WaitPendingDone(raylet_clients_[0]->commit_callbacks, 1); + ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources()); + WaitPlacementGroupPendingDone(1, GcsPlacementGroupStatus::SUCCESS); + + // Verify fallback bundle is being utilized. + ASSERT_EQ(placement_group->GetBundles()[0] + ->GetRequiredResources() + .Get(scheduling::ResourceID("CPU")) + .Double(), + 1.0); + + // Simulate node failure to reset PG. 
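+  // Clearing the bundle's node_id marks it as unplaced, so the next
+  // ScheduleUnplacedBundles call reschedules the whole group and resets it to
+  // the primary strategy.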
+ RemoveNode(node_fallback); + placement_group->GetMutableBundle(0)->clear_node_id(); + + // Add a new node that satisfies the primary bundles. + auto node_primary = GenNodeInfo(1); + (*node_primary->mutable_resources_total())["CustomResource"] = 10; + node_primary->mutable_resources_total()->erase("CPU"); + AddNode(node_primary); + + // Validate that during rescheduling the scheduler should reset to primary option. + scheduler_->ScheduleUnplacedBundles( + SchedulePgRequest{placement_group, + [this](std::shared_ptr pg, bool) { + absl::MutexLock lock(&placement_group_requests_mutex_); + failure_placement_groups_.emplace_back(std::move(pg)); + }, + [this](std::shared_ptr pg) { + absl::MutexLock lock(&placement_group_requests_mutex_); + success_placement_groups_.emplace_back(std::move(pg)); + }}); + + ASSERT_EQ(1, raylet_clients_[1]->num_lease_requested); + ASSERT_TRUE(raylet_clients_[1]->GrantPrepareBundleResources()); + WaitPendingDone(raylet_clients_[1]->commit_callbacks, 1); + ASSERT_TRUE(raylet_clients_[1]->GrantCommitBundleResources()); + WaitPlacementGroupPendingDone(2, GcsPlacementGroupStatus::SUCCESS); + + // Validate the PG reverted to primary strategy. + ASSERT_EQ(placement_group->GetBundles()[0] + ->GetRequiredResources() + .Get(scheduling::ResourceID("CustomResource")) + .Double(), + 1.0); +} + } // namespace gcs } // namespace ray diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index 673d40f7c969..fc85fd19fbba 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -356,6 +356,12 @@ message GcsNodeInfo { // The death info of this node. NodeDeathInfo death_info = 29; + + // Temporary directory of this node. + string temp_dir = 32; + + // Session directory of this node. + string session_dir = 33; } // A lighter version of GcsNodeInfo containing only essential fields. @@ -646,7 +652,9 @@ message PlacementGroupTableData { bytes placement_group_id = 1; // The name of the placement group. string name = 2; - // The array of the bundle in Placement Group. + // The currently active bundles for this Placement Group. + // If a fallback strategy is applied, this field is updated to contain the bundles + // from that strategy rather than the original request. repeated Bundle bundles = 3; // The schedule strategy of this Placement Group. PlacementStrategy strategy = 4; @@ -679,6 +687,10 @@ message PlacementGroupTableData { // The time that the last bundle of a placement group is committed at, // effectively when the placement group has been "scheduled". int64 placement_group_final_bundle_placement_timestamp_ms = 16; + // The full list of scheduling options for this placement group. + // Index 0: The original, primary request. + // Index 1..N: The options defined in each fallback strategy. 
+ repeated PlacementGroupSchedulingOption scheduling_strategy = 17; } message JobTableData { From cc7a8a4bfcc594272034a78596a91f432b87e815 Mon Sep 17 00:00:00 2001 From: ryanaoleary Date: Fri, 6 Feb 2026 11:55:45 +0000 Subject: [PATCH 2/9] Add back changes dropped in bad rebase Signed-off-by: ryanaoleary --- python/ray/_raylet.pyx | 60 ++++++++++- python/ray/includes/common.pxd | 8 +- python/ray/tests/BUILD.bazel | 1 + python/ray/util/placement_group.py | 100 ++++++++++++++++-- src/ray/common/bundle_spec.cc | 21 ++-- src/ray/common/placement_group.h | 43 ++++++-- src/ray/core_worker/BUILD.bazel | 1 + src/ray/core_worker/common.h | 14 ++- src/ray/core_worker/core_worker.cc | 3 +- src/ray/gcs/gcs_autoscaler_state_manager.cc | 16 ++- src/ray/gcs/gcs_placement_group.cc | 18 ++-- src/ray/gcs/gcs_placement_group.h | 5 +- src/ray/gcs/gcs_placement_group_scheduler.cc | 23 +++- .../gcs_autoscaler_state_manager_test.cc | 11 +- .../tests/gcs_placement_group_manager_test.cc | 37 +++++-- .../gcs_placement_group_scheduler_test.cc | 40 +++++-- src/ray/protobuf/common.proto | 9 +- src/ray/protobuf/gcs.proto | 6 -- 18 files changed, 341 insertions(+), 75 deletions(-) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 006b2a620186..80d26a199ec4 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -106,6 +106,7 @@ from ray.includes.common cimport ( CLabelNotIn, CLabelSelector, CNodeResources, + CPlacementGroupSchedulingOption, CRayFunction, CWorkerType, CJobConfig, @@ -647,6 +648,54 @@ cdef int prepare_fallback_strategy( return 0 +cdef int prepare_bundle_label_selector( + list selector_list, + c_vector[CLabelSelector] *out_vector) except -1: + + cdef CLabelSelector c_label_selector + + if selector_list: + out_vector.reserve(len(selector_list)) + for selector_dict in selector_list: + c_label_selector = CLabelSelector() + prepare_label_selector(selector_dict, &c_label_selector) + out_vector.push_back(c_label_selector) + + return 0 + +cdef int prepare_placement_group_fallback_strategy( + list fallback_strategy, + c_vector[CPlacementGroupSchedulingOption] *fallback_strategy_vector) except -1: + + cdef: + CPlacementGroupSchedulingOption c_option + unordered_map[c_string, double] c_bundle_map + + if fallback_strategy is None: + return 0 + + fallback_strategy_vector.reserve(len(fallback_strategy)) + + for option_dict in fallback_strategy: + c_option = CPlacementGroupSchedulingOption() + + # Convert bundles field to C and prepare unit resources. + bundles_list = option_dict.get("bundles") + if bundles_list: + c_option.bundles.reserve(len(bundles_list)) + for bundle in bundles_list: + c_bundle_map.clear() + prepare_resources(bundle, &c_bundle_map) + c_option.bundles.push_back(c_bundle_map) + + # Convert bundle_label_selector field to C and prepare label selectors. 
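        # Missing selectors default to an empty list, so a fallback option may
        # specify only resource bundles.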
+ selector_list = option_dict.get("bundle_label_selector", []) + prepare_bundle_label_selector(selector_list, &c_option.bundle_label_selector) + + fallback_strategy_vector.push_back(c_option) + + return 0 + cdef int prepare_resources( dict resource_dict, unordered_map[c_string, double] *resource_map) except -1: @@ -3672,11 +3721,14 @@ cdef class CoreWorker: c_string strategy, c_bool is_detached, soft_target_node_id, - c_vector[unordered_map[c_string, c_string]] bundle_label_selector): + list bundle_label_selector, + list fallback_strategy=None): cdef: CPlacementGroupID c_placement_group_id CPlacementStrategy c_strategy CNodeID c_soft_target_node_id = CNodeID.Nil() + c_vector[CLabelSelector] c_bundle_label_selector + c_vector[CPlacementGroupSchedulingOption] c_fallback_strategy if strategy == b"PACK": c_strategy = PLACEMENT_STRATEGY_PACK @@ -3693,6 +3745,9 @@ cdef class CoreWorker: if soft_target_node_id is not None: c_soft_target_node_id = CNodeID.FromHex(soft_target_node_id) + prepare_bundle_label_selector(bundle_label_selector, &c_bundle_label_selector) + prepare_placement_group_fallback_strategy(fallback_strategy, &c_fallback_strategy) + with nogil: check_status( CCoreWorkerProcess.GetCoreWorker(). @@ -3703,7 +3758,8 @@ cdef class CoreWorker: bundles, is_detached, c_soft_target_node_id, - bundle_label_selector), + c_bundle_label_selector, + c_fallback_strategy), &c_placement_group_id)) return PlacementGroupID(c_placement_group_id.Binary()) diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index 40851246f2f7..610834b5c608 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -422,6 +422,11 @@ cdef extern from "ray/core_worker/common.h" nogil: CLabelSelector label_selector, c_vector[CFallbackOption] fallback_strategy) + cdef cppclass CPlacementGroupSchedulingOption "ray::core::PlacementGroupSchedulingOption": + CPlacementGroupSchedulingOption() + c_vector[unordered_map[c_string, double]] bundles + c_vector[CLabelSelector] bundle_label_selector + cdef cppclass CPlacementGroupCreationOptions \ "ray::core::PlacementGroupCreationOptions": CPlacementGroupCreationOptions() @@ -431,7 +436,8 @@ cdef extern from "ray/core_worker/common.h" nogil: const c_vector[unordered_map[c_string, double]] &bundles, c_bool is_detached, CNodeID soft_target_node_id, - const c_vector[unordered_map[c_string, c_string]] &bundle_label_selector, + const c_vector[CLabelSelector] &bundle_label_selector, + const c_vector[CPlacementGroupSchedulingOption] &fallback_strategy, ) cdef cppclass CObjectLocation "ray::core::ObjectLocation": diff --git a/python/ray/tests/BUILD.bazel b/python/ray/tests/BUILD.bazel index 74737b89643d..8fe89a5fff1e 100644 --- a/python/ray/tests/BUILD.bazel +++ b/python/ray/tests/BUILD.bazel @@ -202,6 +202,7 @@ py_test_module_list( "test_placement_group_3.py", "test_placement_group_4.py", "test_placement_group_5.py", + "test_placement_group_fallback.py", "test_scheduling.py", "test_scheduling_2.py", "test_token_auth_integration.py", diff --git a/python/ray/util/placement_group.py b/python/ray/util/placement_group.py index d2e29b81c536..6f9410492d2d 100644 --- a/python/ray/util/placement_group.py +++ b/python/ray/util/placement_group.py @@ -20,6 +20,8 @@ "STRICT_SPREAD", } +VALID_PLACEMENT_GROUP_FALLBACK_OPTIONS = {"bundles", "bundle_label_selector"} + # We need to import this method to use for ready API. 
# But ray.remote is only available in runtime, and @@ -53,6 +55,7 @@ def __init__( ): self.id = id self.bundle_cache = bundle_cache + self._all_bundle_cache = None @property def is_empty(self): @@ -101,7 +104,11 @@ def wait(self, timeout_seconds: Union[float, int] = 30) -> bool: @property def bundle_specs(self) -> List[Dict]: - """List[Dict]: Return bundles belonging to this placement group.""" + """List[Dict]: Return bundles belonging to this placement group. + + This returns the currently active bundles. If the placement group is + using a fallback strategy, this returns the fallback bundles. + """ self._fill_bundle_cache_if_needed() return self.bundle_cache @@ -110,9 +117,23 @@ def bundle_count(self) -> int: self._fill_bundle_cache_if_needed() return len(self.bundle_cache) + @property + def _all_bundle_specs(self) -> List[Dict]: + """Return all possible bundles, including the primary and fallback options. + + This is used for validation to ensure we don't reject tasks that + are valid under a fallback strategy configuration. + """ + self._fill_bundle_cache_if_needed() + + return self._all_bundle_cache or self.bundle_cache + def _fill_bundle_cache_if_needed(self) -> None: - if not self.bundle_cache: - self.bundle_cache = _get_bundle_cache(self.id) + if not self.bundle_cache or not self._all_bundle_cache: + cache_data = _get_bundle_cache(self.id) + + self.bundle_cache = cache_data["active"] + self._all_bundle_cache = cache_data["all"] def __eq__(self, other): if not isinstance(other, PlacementGroup): @@ -132,13 +153,24 @@ def _call_placement_group_ready(pg_id: PlacementGroupID, timeout_seconds: int) - @client_mode_wrap -def _get_bundle_cache(pg_id: PlacementGroupID) -> List[Dict]: +def _get_bundle_cache(pg_id: PlacementGroupID) -> Dict[str, List[Dict]]: worker = ray._private.worker.global_worker worker.check_connected() - return list( - ray._private.state.state.placement_group_table(pg_id)["bundles"].values() - ) + table = ray._private.state.state.placement_group_table(pg_id) + + # The bundles actively being used for scheduling. + active_bundles = list(table["bundles"].values()) + + # The list of bundles from all scheduling options. + if "scheduling_strategy" in table and table["scheduling_strategy"]: + all_bundles = [] + for strategy in table["scheduling_strategy"]: + all_bundles.extend(strategy.get("bundles", [])) + else: + all_bundles = active_bundles + + return {"active": active_bundles, "all": all_bundles} @PublicAPI @@ -150,6 +182,7 @@ def placement_group( lifetime: Optional[str] = None, _soft_target_node_id: Optional[str] = None, bundle_label_selector: List[Dict[str, str]] = None, + fallback_strategy: Optional[List[Dict]] = None, ) -> PlacementGroup: """Asynchronously creates a PlacementGroup. @@ -177,6 +210,9 @@ def placement_group( This currently only works with STRICT_PACK pg. bundle_label_selector: A list of label selectors to apply to a placement group on a per-bundle level. + fallback_strategy: A list of scheduling option dicts that define the fallback + options to use when attempting to schedule this placement group. Supported + options are the bundles and bundle_label_selector to attempt to schedule. Raises: ValueError: if bundle type is not a list. 
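    Example (illustrative sketch; the resource amounts and labels below are
    made-up values, not defaults):

        pg = placement_group(
            bundles=[{"GPU": 1, "CPU": 8}],
            strategy="PACK",
            fallback_strategy=[
                # Options are tried in order when the primary bundles cannot
                # be placed; the first feasible option is used.
                {"bundles": [{"CPU": 8}]},
                {
                    "bundles": [{"CPU": 4}],
                    "bundle_label_selector": [{"region": "us-west1"}],
                },
            ],
        )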
@@ -195,11 +231,15 @@ def placement_group( lifetime=lifetime, _soft_target_node_id=_soft_target_node_id, bundle_label_selector=bundle_label_selector, + fallback_strategy=fallback_strategy, ) if bundle_label_selector is None: bundle_label_selector = [] + if fallback_strategy is None: + fallback_strategy = [] + if lifetime == "detached": detached = True else: @@ -212,6 +252,7 @@ def placement_group( detached, _soft_target_node_id, bundle_label_selector, + fallback_strategy, ) return PlacementGroup(placement_group_id) @@ -344,6 +385,7 @@ def validate_placement_group( lifetime: Optional[str] = None, _soft_target_node_id: Optional[str] = None, bundle_label_selector: List[Dict[str, str]] = None, + fallback_strategy: Optional[List[Dict]] = None, ) -> bool: """Validates inputs for placement_group. @@ -370,6 +412,9 @@ def validate_placement_group( ) _validate_bundle_label_selector(bundle_label_selector) + if fallback_strategy is not None: + _validate_fallback_strategy(fallback_strategy) + if strategy not in VALID_PLACEMENT_GROUP_STRATEGIES: raise ValueError( f"Invalid placement group strategy {strategy}. " @@ -463,6 +508,45 @@ def _validate_bundle_label_selector(bundle_label_selector: List[Dict[str, str]]) ) +def _validate_fallback_strategy(fallback_strategy: List[Dict]): + """Validates the placement group fallback strategy.""" + if not isinstance(fallback_strategy, list): + raise ValueError( + f"fallback_strategy must be a list, got {type(fallback_strategy)}." + ) + + for i, option in enumerate(fallback_strategy): + if not isinstance(option, dict): + raise ValueError( + f"fallback_strategy[{i}] must be a dict, got {type(option)}." + ) + + # Validate placement group fallback options. + if "bundles" not in option: + raise ValueError(f"fallback_strategy[{i}] must contain 'bundles'.") + + _validate_bundles(option["bundles"]) + + if "bundle_label_selector" in option: + fallback_labels = option["bundle_label_selector"] + + if len(option["bundles"]) != len(fallback_labels): + raise ValueError( + f"In fallback_strategy[{i}], length of `bundle_label_selector` " + f"must equal length of `bundles`." + ) + + _validate_bundle_label_selector(fallback_labels) + + # Check that fallback strategy only specifies supported options. + invalid_options = set(option.keys()) - VALID_PLACEMENT_GROUP_FALLBACK_OPTIONS + if invalid_options: + raise ValueError( + f"fallback_strategy[{i}] contains invalid options: {invalid_options}. 
" + f"Supported options are: {VALID_PLACEMENT_GROUP_FALLBACK_OPTIONS}" + ) + + def _valid_resource_shape(resources, bundle_specs): """ If the resource shape cannot fit into every @@ -487,7 +571,7 @@ def _valid_resource_shape(resources, bundle_specs): def _validate_resource_shape( placement_group, resources, placement_resources, task_or_actor_repr ): - bundles = placement_group.bundle_specs + bundles = placement_group._all_bundle_specs resources_valid = _valid_resource_shape(resources, bundles) placement_resources_valid = _valid_resource_shape(placement_resources, bundles) diff --git a/src/ray/common/bundle_spec.cc b/src/ray/common/bundle_spec.cc index 336f8906ab11..e00c0fe92cf1 100644 --- a/src/ray/common/bundle_spec.cc +++ b/src/ray/common/bundle_spec.cc @@ -23,18 +23,27 @@ namespace ray { void BundleSpecification::ComputeResources() { auto unit_resource = MapFromProtobuf(message_->unit_resources()); - if (unit_resource.empty()) { + bool has_label_selector = message_->has_label_selector() && + message_->label_selector().label_constraints_size() > 0; + + if (unit_resource.empty() && !has_label_selector) { // A static nil object is used here to avoid allocating the empty object every time. static std::shared_ptr nil_unit_resource = std::make_shared(); unit_resource_ = nil_unit_resource; } else { - unit_resource_ = std::make_shared(ResourceMapToResourceRequest( - unit_resource, /*requires_object_store_memory=*/false)); + // Allocate if bundle specifies resources or label selector. + if (unit_resource.empty()) { + unit_resource_ = std::make_shared(); + } else { + unit_resource_ = std::make_shared(ResourceMapToResourceRequest( + unit_resource, /*requires_object_store_memory=*/false)); + } - // Set LabelSelector required for scheduling this bundle if specified. - // Parses string map from proto to LabelSelector data type. - unit_resource_->SetLabelSelector(LabelSelector(message_->label_selector())); + // Apply labels if present. + if (has_label_selector) { + unit_resource_->SetLabelSelector(LabelSelector(message_->label_selector())); + } } // Generate placement group bundle labels. diff --git a/src/ray/common/placement_group.h b/src/ray/common/placement_group.h index c3d0057d88b2..7a0fc9643d8b 100644 --- a/src/ray/common/placement_group.h +++ b/src/ray/common/placement_group.h @@ -18,6 +18,7 @@ #include "ray/common/bundle_spec.h" #include "ray/common/grpc_util.h" #include "ray/common/id.h" +#include "ray/common/scheduling/label_selector.h" #include "src/ray/protobuf/common.pb.h" namespace ray { @@ -34,6 +35,13 @@ using BundleLocations = std::pair>, pair_hash>; +// Defines the structure of a scheduling option to try when scheduling the placement +// group. +struct PlacementGroupSchedulingOption { + std::vector> bundles; + std::vector bundle_label_selector; +}; + class PlacementGroupSpecification : public MessageWrapper { public: /// Construct from a protobuf message object. @@ -87,8 +95,8 @@ class PlacementGroupSpecBuilder { const JobID &creator_job_id, const ActorID &creator_actor_id, bool is_creator_detached_actor, - const std::vector> - &bundle_label_selector = {}) { + const std::vector &bundle_label_selector = {}, + const std::vector &fallback_strategy = {}) { message_->set_placement_group_id(placement_group_id.Binary()); message_->set_name(name); message_->set_strategy(strategy); @@ -105,6 +113,7 @@ class PlacementGroupSpecBuilder { message_->set_is_detached(is_detached); message_->set_soft_target_node_id(soft_target_node_id.Binary()); + // Populate primary strategy bundles. 
for (size_t i = 0; i < bundles.size(); i++) { auto resources = bundles[i]; auto message_bundle = message_->add_bundles(); @@ -118,17 +127,39 @@ class PlacementGroupSpecBuilder { if (current->second == 0) { resources.erase(current); } else { - mutable_unit_resources->insert({current->first, current->second}); + (*mutable_unit_resources)[current->first] = current->second; } } // Set the label selector for this bundle if provided in bundle_label_selector. if (bundle_label_selector.size() > i) { - auto *mutable_label_selector = message_bundle->mutable_label_selector(); - for (const auto &pair : bundle_label_selector[i]) { - (*mutable_label_selector)[pair.first] = pair.second; + bundle_label_selector[i].ToProto(message_bundle->mutable_label_selector()); + } + } + + // Populate fallback strategy bundles. + for (const auto &option : fallback_strategy) { + auto *fallback_message = message_->add_fallback_strategy(); + for (size_t i = 0; i < option.bundles.size(); i++) { + auto *bundle_message = fallback_message->add_bundles(); + auto *mutable_bundle_id = bundle_message->mutable_bundle_id(); + + mutable_bundle_id->set_bundle_index(i); + mutable_bundle_id->set_placement_group_id(placement_group_id.Binary()); + + auto *mutable_resources = bundle_message->mutable_unit_resources(); + for (const auto &resource : option.bundles[i]) { + if (resource.second > 0) { + mutable_resources->insert({resource.first, resource.second}); + } + } + + if (option.bundle_label_selector.size() > i) { + option.bundle_label_selector[i].ToProto( + bundle_message->mutable_label_selector()); } } } + return *this; } diff --git a/src/ray/core_worker/BUILD.bazel b/src/ray/core_worker/BUILD.bazel index 4918fbbbcc0f..20332ffecab1 100644 --- a/src/ray/core_worker/BUILD.bazel +++ b/src/ray/core_worker/BUILD.bazel @@ -117,6 +117,7 @@ ray_cc_library( visibility = [":__subpackages__"], deps = [ "//src/ray/common:id", + "//src/ray/common:placement_group", "//src/ray/common:ray_object", "//src/ray/common:task_common", "//src/ray/util:process", diff --git a/src/ray/core_worker/common.h b/src/ray/core_worker/common.h index 6c91be760265..c5c4dbdbac2c 100644 --- a/src/ray/core_worker/common.h +++ b/src/ray/core_worker/common.h @@ -21,6 +21,7 @@ #include #include "ray/common/id.h" +#include "ray/common/placement_group.h" #include "ray/common/ray_object.h" #include "ray/common/scheduling/fallback_strategy.h" #include "ray/common/scheduling/label_selector.h" @@ -222,6 +223,8 @@ struct ActorCreationOptions { const std::vector fallback_strategy; }; +using PlacementGroupSchedulingOption = ray::PlacementGroupSchedulingOption; + using PlacementStrategy = rpc::PlacementStrategy; struct PlacementGroupCreationOptions { @@ -231,14 +234,15 @@ struct PlacementGroupCreationOptions { std::vector> bundles, bool is_detached_p, NodeID soft_target_node_id = NodeID::Nil(), - std::vector> bundle_label_selector = - {}) + std::vector bundle_label_selector = {}, + std::vector fallback_strategy = {}) : name_(std::move(name)), strategy_(strategy), bundles_(std::move(bundles)), is_detached_(is_detached_p), soft_target_node_id_(soft_target_node_id), - bundle_label_selector_(std::move(bundle_label_selector)) { + bundle_label_selector_(std::move(bundle_label_selector)), + fallback_strategy_(std::move(fallback_strategy)) { RAY_CHECK(soft_target_node_id_.IsNil() || strategy_ == PlacementStrategy::STRICT_PACK) << "soft_target_node_id only works with STRICT_PACK now"; } @@ -258,7 +262,9 @@ struct PlacementGroupCreationOptions { /// This only applies to STRICT_PACK pg. 
const NodeID soft_target_node_id_; /// The label selectors to apply per-bundle in this placement group. - const std::vector> bundle_label_selector_; + const std::vector bundle_label_selector_; + /// The list of fallback options to try if the primary request cannot be satisfied. + const std::vector fallback_strategy_; }; class ObjectLocation { diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 931533cd0f04..caf9a1de8727 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -2304,7 +2304,8 @@ Status CoreWorker::CreatePlacementGroup( worker_context_->GetCurrentJobID(), worker_context_->GetCurrentActorID(), worker_context_->CurrentActorDetached(), - placement_group_creation_options.bundle_label_selector_); + placement_group_creation_options.bundle_label_selector_, + placement_group_creation_options.fallback_strategy_); PlacementGroupSpecification placement_group_spec = builder.Build(); *return_placement_group_id = placement_group_id; RAY_LOG(INFO).WithField(placement_group_id) diff --git a/src/ray/gcs/gcs_autoscaler_state_manager.cc b/src/ray/gcs/gcs_autoscaler_state_manager.cc index 3f79456bc689..e39fc49f257f 100644 --- a/src/ray/gcs/gcs_autoscaler_state_manager.cc +++ b/src/ray/gcs/gcs_autoscaler_state_manager.cc @@ -221,6 +221,15 @@ void GcsAutoscalerStateManager::GetPendingGangResourceRequests( auto *bundle_selector = gang_resource_req->add_bundle_selectors(); // Copy the PG's bundles to the request. + // If PENDING and 'bundles' is empty, the demand comes from the primary strategy + // (index 0). If RESCHEDULING, 'bundles' is populated with the active strategy we are + // trying to recover. + auto *bundles_source = pg_data.mutable_bundles(); + if (pg_state == rpc::PlacementGroupTableData::PENDING && bundles_source->empty() && + pg_data.scheduling_strategy_size() > 0) { + bundles_source = pg_data.mutable_scheduling_strategy(0)->mutable_bundles(); + } + for (auto &&bundle : std::move(*pg_data.mutable_bundles())) { if (!NodeID::FromBinary(bundle.node_id()).IsNil()) { // We will be skipping **placed** bundle (which has node id associated with it). @@ -243,10 +252,9 @@ void GcsAutoscalerStateManager::GetPendingGangResourceRequests( auto *bundle_resource_req = bundle_selector->add_resource_requests(); *bundle_resource_req->mutable_resources_bundle() = unit_resources; - // Parse label selector map into LabelSelector proto in ResourceRequest - if (!bundle.label_selector().empty()) { - ray::LabelSelector selector(bundle.label_selector()); - selector.ToProto(bundle_resource_req->add_label_selectors()); + // Add label selector to ResourceRequest + if (bundle.label_selector().label_constraints_size() > 0) { + bundle_resource_req->add_label_selectors()->CopyFrom(bundle.label_selector()); } // Add the placement constraint. diff --git a/src/ray/gcs/gcs_placement_group.cc b/src/ray/gcs/gcs_placement_group.cc index f28b48c39830..3f8e576cb159 100644 --- a/src/ray/gcs/gcs_placement_group.cc +++ b/src/ray/gcs/gcs_placement_group.cc @@ -60,15 +60,8 @@ std::vector> &GcsPlacementGroup::GetB const { // Fill the cache if it wasn't. if (cached_bundle_specs_.empty()) { - // If no active bundles selected, return the highest priority scheduling strategy. - const auto &source_bundles = - (placement_group_table_data_.bundles().empty() && - placement_group_table_data_.scheduling_strategy_size() > 0) - ? 
placement_group_table_data_.scheduling_strategy(0).bundles() - : placement_group_table_data_.bundles(); - - cached_bundle_specs_.reserve(source_bundles.size()); - for (const auto &bundle : source_bundles) { + const auto &bundles = placement_group_table_data_.bundles(); + for (const auto &bundle : bundles) { cached_bundle_specs_.push_back(std::make_shared(bundle)); } } @@ -156,6 +149,13 @@ const google::protobuf::RepeatedPtrField return placement_group_table_data_.scheduling_strategy(); } +google::protobuf::RepeatedPtrField + *GcsPlacementGroup::GetMutableSchedulingStrategy() { + // Invalidate the cache because mutating the strategy. + cached_bundle_specs_.clear(); + return placement_group_table_data_.mutable_scheduling_strategy(); +} + void GcsPlacementGroup::UpdateActiveBundles( const rpc::PlacementGroupSchedulingOption &selected_option) { // Invalidate the cache because we are changing the bundles. diff --git a/src/ray/gcs/gcs_placement_group.h b/src/ray/gcs/gcs_placement_group.h index 01c04e6adb13..cb93b2bef223 100644 --- a/src/ray/gcs/gcs_placement_group.h +++ b/src/ray/gcs/gcs_placement_group.h @@ -83,7 +83,7 @@ class GcsPlacementGroup { // Index 1..N: fallback strategies. placement_group_table_data_.mutable_scheduling_strategy()->MergeFrom( - placement_group_spec.fallback_options()); + placement_group_spec.fallback_strategy()); SetupStates(); } @@ -166,6 +166,9 @@ class GcsPlacementGroup { const google::protobuf::RepeatedPtrField &GetSchedulingStrategy() const; + google::protobuf::RepeatedPtrField + *GetMutableSchedulingStrategy(); + void UpdateActiveBundles(const rpc::PlacementGroupSchedulingOption &selected_option); private: diff --git a/src/ray/gcs/gcs_placement_group_scheduler.cc b/src/ray/gcs/gcs_placement_group_scheduler.cc index 9f74cbacdb21..b4bd0fb60c4d 100644 --- a/src/ray/gcs/gcs_placement_group_scheduler.cc +++ b/src/ray/gcs/gcs_placement_group_scheduler.cc @@ -78,10 +78,16 @@ void GcsPlacementGroupScheduler::ScheduleUnplacedBundles( << ", bundles size = " << bundles.size(); const auto &scheduling_strategies = placement_group->GetSchedulingStrategy(); + + bool is_initial_pending_schedule = + (placement_group->GetState() == rpc::PlacementGroupTableData::PENDING && + placement_group->GetBundles().empty()); + bool is_scheduling_all_bundles = (bundles.size() == placement_group->GetBundles().size()); - if (is_scheduling_all_bundles && !scheduling_strategies.empty()) { + if ((is_initial_pending_schedule || is_scheduling_all_bundles) && + !scheduling_strategies.empty()) { RAY_LOG(INFO) << "Scheduling whole Placement Group " << placement_group->GetPlacementGroupID() << " using primary strategy."; @@ -90,8 +96,15 @@ void GcsPlacementGroupScheduler::ScheduleUnplacedBundles( bundles = placement_group->GetUnplacedBundles(); } + if (bundles.empty()) { + RAY_LOG(DEBUG) << "No bundles to schedule for PG " + << placement_group->GetPlacementGroupID(); + return; + } + auto scheduling_result = TrySchedule(placement_group, bundles, placement_group->GetStrategy()); + bool any_strategy_feasible = !scheduling_result.status.IsInfeasible(); const rpc::PlacementGroupSchedulingOption *applied_fallback_option = nullptr; std::vector> fallback_bundles; @@ -114,6 +127,10 @@ void GcsPlacementGroupScheduler::ScheduleUnplacedBundles( auto fallback_result = TrySchedule(placement_group, fallback_bundles, placement_group->GetStrategy()); + if (!fallback_result.status.IsInfeasible()) { + any_strategy_feasible = true; + } + if (fallback_result.status.IsSuccess()) { RAY_LOG(INFO) << "Placement Group " << 
placement_group->GetPlacementGroupID() << " primary scheduling failed, but fallback strategy succeeded."; @@ -135,10 +152,10 @@ void GcsPlacementGroupScheduler::ScheduleUnplacedBundles( << ", because current resources can't satisfy the required resource. IsFailed: " << result_status.IsFailed() << " IsInfeasible: " << result_status.IsInfeasible() << " IsPartialSuccess: " << result_status.IsPartialSuccess(); - bool infeasible = result_status.IsInfeasible(); + // If the placement group creation has failed, // but if it is not infeasible, it is retryable to create. - failure_callback(placement_group, /*is_feasible*/ !infeasible); + failure_callback(placement_group, /*is_feasible*/ any_strategy_feasible); return; } diff --git a/src/ray/gcs/tests/gcs_autoscaler_state_manager_test.cc b/src/ray/gcs/tests/gcs_autoscaler_state_manager_test.cc index b6e091020d14..38761de34cd4 100644 --- a/src/ray/gcs/tests/gcs_autoscaler_state_manager_test.cc +++ b/src/ray/gcs/tests/gcs_autoscaler_state_manager_test.cc @@ -1155,11 +1155,18 @@ TEST_F(GcsAutoscalerStateManagerTest, auto *bundle1 = pg_data->add_bundles(); (*bundle1->mutable_unit_resources())["CPU"] = 2; (*bundle1->mutable_unit_resources())["GPU"] = 1; - (*bundle1->mutable_label_selector())["accelerator"] = "in(A100,B200)"; + auto *constraint1 = bundle1->mutable_label_selector()->add_label_constraints(); + constraint1->set_label_key("accelerator"); + constraint1->set_operator_(rpc::LabelSelectorOperator::LABEL_OPERATOR_IN); + constraint1->add_label_values("A100"); + constraint1->add_label_values("B200"); auto *bundle2 = pg_data->add_bundles(); (*bundle2->mutable_unit_resources())["CPU"] = 4; - (*bundle2->mutable_label_selector())["accelerator"] = "!in(TPU)"; + auto *constraint2 = bundle2->mutable_label_selector()->add_label_constraints(); + constraint2->set_label_key("accelerator"); + constraint2->set_operator_(rpc::LabelSelectorOperator::LABEL_OPERATOR_NOT_IN); + constraint2->add_label_values("TPU"); EXPECT_CALL(*gcs_placement_group_manager_, GetPlacementGroupLoad) .WillOnce(Return(std::make_shared(std::move(load)))); diff --git a/src/ray/gcs/tests/gcs_placement_group_manager_test.cc b/src/ray/gcs/tests/gcs_placement_group_manager_test.cc index ad1d5e596a39..42f4b0956ede 100644 --- a/src/ray/gcs/tests/gcs_placement_group_manager_test.cc +++ b/src/ray/gcs/tests/gcs_placement_group_manager_test.cc @@ -131,10 +131,16 @@ class GcsPlacementGroupManagerTest : public ::testing::Test { // resources for each bundle void MockReceivePrepareRequest( const std::shared_ptr &placement_group) { - int bundles_size = placement_group->GetPlacementGroupTableData().bundles_size(); - for (int bundle_index = 0; bundle_index < bundles_size; bundle_index++) { - placement_group->GetMutableBundle(bundle_index) - ->set_node_id(NodeID::FromRandom().Binary()); + // Mutate the primary scheduling strategy because 'bundles' is empty for PENDING PGs. + auto *strategy = placement_group->GetMutableSchedulingStrategy(); + if (strategy->size() > 0) { + auto *bundles = strategy->Mutable(0)->mutable_bundles(); + for (int i = 0; i < bundles->size(); i++) { + bundles->Mutable(i)->set_node_id(NodeID::FromRandom().Binary()); + } + // Manually set the active bundles to the updated strategy. 
+ placement_group->UpdateActiveBundles( + placement_group->GetSchedulingStrategy().Get(0)); } } @@ -143,9 +149,15 @@ class GcsPlacementGroupManagerTest : public ::testing::Test { void MockReceivePrepareRequestWithBundleIndexes( const std::shared_ptr &placement_group, const std::vector &bundle_indices) { - for (const auto &bundle_index : bundle_indices) { - placement_group->GetMutableBundle(bundle_index) - ->set_node_id(NodeID::FromRandom().Binary()); + auto *strategy = placement_group->GetMutableSchedulingStrategy(); + if (strategy->size() > 0) { + auto *bundles = strategy->Mutable(0)->mutable_bundles(); + for (const auto &bundle_index : bundle_indices) { + bundles->Mutable(bundle_index)->set_node_id(NodeID::FromRandom().Binary()); + } + // Manually set the active bundles to the updated strategy. + placement_group->UpdateActiveBundles( + placement_group->GetSchedulingStrategy().Get(0)); } } @@ -245,11 +257,13 @@ TEST_F(GcsPlacementGroupManagerTest, TestPlacementGroupBundleCache) { auto placement_group = mock_placement_group_scheduler_->placement_groups_.back(); ASSERT_TRUE(placement_group->cached_bundle_specs_.empty()); // Fill the cache and verify it. + placement_group->UpdateActiveBundles(placement_group->GetSchedulingStrategy().Get(0)); const auto &bundle_specs = placement_group->GetBundles(); ASSERT_EQ(placement_group->cached_bundle_specs_, bundle_specs); ASSERT_FALSE(placement_group->cached_bundle_specs_.empty()); // Invalidate the cache and verify it. - RAY_UNUSED(placement_group->GetMutableBundle(0)); + RAY_UNUSED( + placement_group->GetMutableSchedulingStrategy()->Mutable(0)->mutable_bundles(0)); ASSERT_TRUE(placement_group->cached_bundle_specs_.empty()); } @@ -856,8 +870,11 @@ TEST_F(GcsPlacementGroupManagerTest, TestSchedulerReinitializeAfterGcsRestart) { ASSERT_EQ(mock_placement_group_scheduler_->GetPlacementGroupCount(), 1); auto placement_group = mock_placement_group_scheduler_->placement_groups_.back(); - placement_group->GetMutableBundle(0)->set_node_id(NodeID::FromRandom().Binary()); - placement_group->GetMutableBundle(1)->set_node_id(NodeID::FromRandom().Binary()); + auto *primary_bundles = + placement_group->GetMutableSchedulingStrategy()->Mutable(0)->mutable_bundles(); + primary_bundles->Mutable(0)->set_node_id(NodeID::FromRandom().Binary()); + primary_bundles->Mutable(1)->set_node_id(NodeID::FromRandom().Binary()); + mock_placement_group_scheduler_->placement_groups_.pop_back(); OnPlacementGroupCreationSuccess(placement_group); ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::CREATED); diff --git a/src/ray/gcs/tests/gcs_placement_group_scheduler_test.cc b/src/ray/gcs/tests/gcs_placement_group_scheduler_test.cc index b54708fd84b1..cb997fd887fe 100644 --- a/src/ray/gcs/tests/gcs_placement_group_scheduler_test.cc +++ b/src/ray/gcs/tests/gcs_placement_group_scheduler_test.cc @@ -1378,6 +1378,8 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestCheckingWildcardResource) { /*name=*/"", /*strategy=*/rpc::PlacementStrategy::SPREAD, /*bundles_count=*/1); auto placement_group = std::make_shared(create_placement_group_request, "", counter_); + // PG has been scheduled and has active bundles set. 
+ placement_group->UpdateActiveBundles(placement_group->GetSchedulingStrategy().Get(0)); int wildcard_resource_count = 0; for (const auto &bundle_spec : placement_group->GetBundles()) { for (const auto &resource_entry : bundle_spec->GetFormattedResources()) { @@ -1439,7 +1441,7 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestFallbackStrategyResources) { auto request = GenCreatePlacementGroupRequest("", rpc::PlacementStrategy::PACK, 1, 20); // Create a fallback strategy with feasible bundles. - auto *fallback_option = request.mutable_placement_group_spec()->add_fallback_options(); + auto *fallback_option = request.mutable_placement_group_spec()->add_fallback_strategy(); auto *bundle = fallback_option->add_bundles(); bundle->mutable_unit_resources()->insert({"CPU", 5.0}); bundle->mutable_bundle_id()->set_bundle_index(0); @@ -1486,16 +1488,25 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestFallbackStrategyLabels) { // Create a resource request with an infeasible label selector. auto request = GenCreatePlacementGroupRequest("", rpc::PlacementStrategy::PACK, 1, 1); auto *primary_bundle = request.mutable_placement_group_spec()->mutable_bundles(0); - (*primary_bundle->mutable_label_selector())["cpu-family"] = "amd"; + + auto *constraint = primary_bundle->mutable_label_selector()->add_label_constraints(); + constraint->set_label_key("cpu-family"); + constraint->set_operator_(rpc::LabelSelectorOperator::LABEL_OPERATOR_IN); + constraint->add_label_values("amd"); // Create a fallback strategy with identical bundle resources but feasible label. - auto *fallback_option = request.mutable_placement_group_spec()->add_fallback_options(); + auto *fallback_option = request.mutable_placement_group_spec()->add_fallback_strategy(); auto *fallback_bundle = fallback_option->add_bundles(); fallback_bundle->mutable_unit_resources()->insert({"CPU", 1.0}); fallback_bundle->mutable_bundle_id()->set_bundle_index(0); fallback_bundle->mutable_bundle_id()->set_placement_group_id( request.placement_group_spec().placement_group_id()); - (*fallback_bundle->mutable_label_selector())["cpu-family"] = "intel"; + + auto *fallback_constraint = + fallback_bundle->mutable_label_selector()->add_label_constraints(); + fallback_constraint->set_label_key("cpu-family"); + fallback_constraint->set_operator_(rpc::LabelSelectorOperator::LABEL_OPERATOR_IN); + fallback_constraint->add_label_values("intel"); auto placement_group = std::make_shared(request, "", counter_); @@ -1547,16 +1558,25 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestFallbackStrategyInfeasible) { // Create a resource request with an infeasible label selector. auto request = GenCreatePlacementGroupRequest("", rpc::PlacementStrategy::PACK, 1, 1); auto *primary_bundle = request.mutable_placement_group_spec()->mutable_bundles(0); - (*primary_bundle->mutable_label_selector())["cpu-family"] = "amd"; + + auto *constraint = primary_bundle->mutable_label_selector()->add_label_constraints(); + constraint->set_label_key("cpu-family"); + constraint->set_operator_(rpc::LabelSelectorOperator::LABEL_OPERATOR_IN); + constraint->add_label_values("amd"); // Create a fallback strategy with an infeasible fallback. 
- auto *fallback_option = request.mutable_placement_group_spec()->add_fallback_options(); + auto *fallback_option = request.mutable_placement_group_spec()->add_fallback_strategy(); auto *fallback_bundle = fallback_option->add_bundles(); fallback_bundle->mutable_unit_resources()->insert({"CPU", 1.0}); fallback_bundle->mutable_bundle_id()->set_bundle_index(0); fallback_bundle->mutable_bundle_id()->set_placement_group_id( request.placement_group_spec().placement_group_id()); - (*fallback_bundle->mutable_label_selector())["cpu-family"] = "arm"; + + auto *fallback_constraint = + fallback_bundle->mutable_label_selector()->add_label_constraints(); + fallback_constraint->set_label_key("cpu-family"); + fallback_constraint->set_operator_(rpc::LabelSelectorOperator::LABEL_OPERATOR_IN); + fallback_constraint->add_label_values("arm"); auto placement_group = std::make_shared(request, "", counter_); @@ -1566,9 +1586,7 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestFallbackStrategyInfeasible) { [this](std::shared_ptr pg, bool is_feasible) { absl::MutexLock lock(&placement_group_requests_mutex_); failure_placement_groups_.emplace_back(std::move(pg)); - // It should be feasible (valid request), just not currently - // schedulable. - ASSERT_TRUE(is_feasible); + ASSERT_FALSE(is_feasible); }, [this](std::shared_ptr pg) { absl::MutexLock lock(&placement_group_requests_mutex_); @@ -1597,7 +1615,7 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestPGResetFallbackToPrimaryBundles) { primary_bundle->mutable_unit_resources()->erase("CPU"); // Feasible fallback option. - auto *fallback_option = request.mutable_placement_group_spec()->add_fallback_options(); + auto *fallback_option = request.mutable_placement_group_spec()->add_fallback_strategy(); auto *fallback_bundle = fallback_option->add_bundles(); (*fallback_bundle->mutable_unit_resources())["CPU"] = 1.0; fallback_bundle->mutable_bundle_id()->set_bundle_index(0); diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index d20e57b14c29..35f32eb9884d 100644 --- a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -681,7 +681,7 @@ message Bundle { bytes node_id = 3; // Label selector associated with this bundle. // Populated from bundle_label_selector if provided. - map label_selector = 4; + LabelSelector label_selector = 4; } message PlacementGroupSpec { @@ -708,6 +708,8 @@ message PlacementGroupSpec { // Otherwise, the bundles can be placed elsewhere. // This only applies to STRICT_PACK pg. bytes soft_target_node_id = 11; + // The list of fallback options to try if the primary resource request cannot be satisfied. + repeated PlacementGroupSchedulingOption fallback_strategy = 12; } message ObjectReference { @@ -1073,6 +1075,11 @@ enum PlacementStrategy { STRICT_SPREAD = 3; } +// Represents a single scheduling option for a placement group. +message PlacementGroupSchedulingOption { + repeated Bundle bundles = 1; +} + // The type of operator to use for the label constraint. enum LabelSelectorOperator { LABEL_OPERATOR_UNSPECIFIED = 0; diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index fc85fd19fbba..d50f43025938 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -356,12 +356,6 @@ message GcsNodeInfo { // The death info of this node. NodeDeathInfo death_info = 29; - - // Temporary directory of this node. - string temp_dir = 32; - - // Session directory of this node. - string session_dir = 33; } // A lighter version of GcsNodeInfo containing only essential fields. 
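To make the control flow added to ScheduleUnplacedBundles above easier to follow, here is a minimal Python sketch of the option-selection logic. The function and argument names are illustrative only and do not correspond to symbols in the C++ source; statuses are simplified to strings, whereas the real scheduler works with the result/status objects returned by TrySchedule and with prepared bundle placements.

def schedule_with_fallbacks(options, try_schedule):
    """Sketch of the fallback ordering, not the real implementation.

    options[0] is the primary request; options[1:] are the fallback options.
    try_schedule(option) returns "success", "failed", or "infeasible".
    The group is reported as infeasible only if every option is infeasible,
    mirroring the any_strategy_feasible flag introduced in the C++ change.
    """
    any_feasible = False
    for option in options:
        status = try_schedule(option)
        if status != "infeasible":
            any_feasible = True
        if status == "success":
            # First option that can be placed wins; later options are ignored.
            return "scheduled", option
    # Nothing could be placed right now; retryable unless all options were infeasible.
    return ("retryable", None) if any_feasible else ("infeasible", None)

if __name__ == "__main__":
    # Illustrative only: the primary request is infeasible, the first fallback succeeds.
    results = iter(["infeasible", "success"])
    print(schedule_with_fallbacks([{"CPU": 20}, {"CPU": 5}], lambda _: next(results)))

The key behavioral change is the final return: the failure callback's is_feasible argument now reflects whether any option (primary or fallback) was feasible, so the placement group remains retryable as long as at least one option could eventually be satisfied.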
From f69cbaadbf9b3cfd1dcd7c11564d250674ce4847 Mon Sep 17 00:00:00 2001 From: ryanaoleary Date: Fri, 6 Feb 2026 12:18:33 +0000 Subject: [PATCH 3/9] Fix loop to iterate over bundles_source Signed-off-by: ryanaoleary --- src/ray/gcs/gcs_autoscaler_state_manager.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/gcs/gcs_autoscaler_state_manager.cc b/src/ray/gcs/gcs_autoscaler_state_manager.cc index e39fc49f257f..4f92a01f0391 100644 --- a/src/ray/gcs/gcs_autoscaler_state_manager.cc +++ b/src/ray/gcs/gcs_autoscaler_state_manager.cc @@ -230,7 +230,7 @@ void GcsAutoscalerStateManager::GetPendingGangResourceRequests( bundles_source = pg_data.mutable_scheduling_strategy(0)->mutable_bundles(); } - for (auto &&bundle : std::move(*pg_data.mutable_bundles())) { + for (auto &&bundle : std::move(*bundles_source)) { if (!NodeID::FromBinary(bundle.node_id()).IsNil()) { // We will be skipping **placed** bundle (which has node id associated with it). // This is to avoid double counting the bundles that are already placed when From fcefe96fb24ee86ef99f1793a8b1b429841d3965 Mon Sep 17 00:00:00 2001 From: ryanaoleary Date: Fri, 6 Feb 2026 13:06:44 +0000 Subject: [PATCH 4/9] Fix cursor comments and rename field to be clear its not strategy Signed-off-by: ryanaoleary --- python/ray/_private/state.py | 18 ++++++++++++++++++ python/ray/util/placement_group.py | 11 ++++++----- src/ray/common/placement_group.h | 3 +-- src/ray/gcs/gcs_autoscaler_state_manager.cc | 4 ++-- src/ray/gcs/gcs_placement_group.cc | 4 ++-- src/ray/gcs/gcs_placement_group.h | 4 ++-- src/ray/gcs/gcs_placement_group_scheduler.cc | 14 +------------- src/ray/protobuf/gcs.proto | 2 +- 8 files changed, 33 insertions(+), 27 deletions(-) diff --git a/python/ray/_private/state.py b/python/ray/_private/state.py index 8d86df754a0d..39de2a77bc98 100644 --- a/python/ray/_private/state.py +++ b/python/ray/_private/state.py @@ -345,6 +345,24 @@ def get_strategy(strategy): stats = placement_group_info.stats assert placement_group_info is not None + + scheduling_strategy = [] + for strategy_proto in placement_group_info.scheduling_strategy: + bundles_list = [] + label_selectors_list = [] + for bundle in strategy_proto.bundles: + # Extract unit resources + bundle_dict = message_to_dict(bundle) + bundles_list.append(bundle_dict.get("unitResources", {})) + # Extract label selector from the bundle + label_selectors_list.append(message_to_dict(bundle.label_selector)) + + strategy_dict = { + "bundles": bundles_list, + "bundle_label_selector": label_selectors_list, + } + scheduling_strategy.append(strategy_dict) + return { "placement_group_id": binary_to_hex( placement_group_info.placement_group_id diff --git a/python/ray/util/placement_group.py b/python/ray/util/placement_group.py index 6f9410492d2d..9d04c18216da 100644 --- a/python/ray/util/placement_group.py +++ b/python/ray/util/placement_group.py @@ -83,10 +83,11 @@ def ready(self) -> "ray._raylet.ObjectRef": _export_bundle_reservation_check_method_if_needed() - assert len(self.bundle_cache) != 0, ( - "ready() cannot be called on placement group object with a " - "bundle length == 0, current bundle length: " - f"{len(self.bundle_cache)}" + # We check _all_bundle_specs to ensure the placement group has at least + # one defined bundle strategy to wait on. + assert len(self._all_bundle_specs) != 0, ( + "ready() cannot be called on a placement group with no bundles defined. " + "Ensure the placement group was created with a non-empty list of bundles." 
) return bundle_reservation_check.options( @@ -255,7 +256,7 @@ def placement_group( fallback_strategy, ) - return PlacementGroup(placement_group_id) + return PlacementGroup(placement_group_id, bundle_cache=bundles) @PublicAPI diff --git a/src/ray/common/placement_group.h b/src/ray/common/placement_group.h index 7a0fc9643d8b..a187bedecd03 100644 --- a/src/ray/common/placement_group.h +++ b/src/ray/common/placement_group.h @@ -127,7 +127,7 @@ class PlacementGroupSpecBuilder { if (current->second == 0) { resources.erase(current); } else { - (*mutable_unit_resources)[current->first] = current->second; + mutable_unit_resources->insert({current->first, current->second}); } } // Set the label selector for this bundle if provided in bundle_label_selector. @@ -159,7 +159,6 @@ class PlacementGroupSpecBuilder { } } } - return *this; } diff --git a/src/ray/gcs/gcs_autoscaler_state_manager.cc b/src/ray/gcs/gcs_autoscaler_state_manager.cc index 4f92a01f0391..721d8f418ff4 100644 --- a/src/ray/gcs/gcs_autoscaler_state_manager.cc +++ b/src/ray/gcs/gcs_autoscaler_state_manager.cc @@ -226,8 +226,8 @@ void GcsAutoscalerStateManager::GetPendingGangResourceRequests( // trying to recover. auto *bundles_source = pg_data.mutable_bundles(); if (pg_state == rpc::PlacementGroupTableData::PENDING && bundles_source->empty() && - pg_data.scheduling_strategy_size() > 0) { - bundles_source = pg_data.mutable_scheduling_strategy(0)->mutable_bundles(); + pg_data.scheduling_options_size() > 0) { + bundles_source = pg_data.mutable_scheduling_options(0)->mutable_bundles(); } for (auto &&bundle : std::move(*bundles_source)) { diff --git a/src/ray/gcs/gcs_placement_group.cc b/src/ray/gcs/gcs_placement_group.cc index 3f8e576cb159..7777f3bf90ba 100644 --- a/src/ray/gcs/gcs_placement_group.cc +++ b/src/ray/gcs/gcs_placement_group.cc @@ -146,14 +146,14 @@ rpc::PlacementGroupStats *GcsPlacementGroup::GetMutableStats() { const google::protobuf::RepeatedPtrField &GcsPlacementGroup::GetSchedulingStrategy() const { - return placement_group_table_data_.scheduling_strategy(); + return placement_group_table_data_.scheduling_options(); } google::protobuf::RepeatedPtrField *GcsPlacementGroup::GetMutableSchedulingStrategy() { // Invalidate the cache because mutating the strategy. cached_bundle_specs_.clear(); - return placement_group_table_data_.mutable_scheduling_strategy(); + return placement_group_table_data_.mutable_scheduling_options(); } void GcsPlacementGroup::UpdateActiveBundles( diff --git a/src/ray/gcs/gcs_placement_group.h b/src/ray/gcs/gcs_placement_group.h index cb93b2bef223..616939159599 100644 --- a/src/ray/gcs/gcs_placement_group.h +++ b/src/ray/gcs/gcs_placement_group.h @@ -78,11 +78,11 @@ class GcsPlacementGroup { current_sys_time_ms()); // Construct scheduling strategy list. Index 0 contains the primary request. - auto *primary_option = placement_group_table_data_.add_scheduling_strategy(); + auto *primary_option = placement_group_table_data_.add_scheduling_options(); primary_option->mutable_bundles()->CopyFrom(placement_group_spec.bundles()); // Index 1..N: fallback strategies. 
- placement_group_table_data_.mutable_scheduling_strategy()->MergeFrom( + placement_group_table_data_.mutable_scheduling_options()->MergeFrom( placement_group_spec.fallback_strategy()); SetupStates(); diff --git a/src/ray/gcs/gcs_placement_group_scheduler.cc b/src/ray/gcs/gcs_placement_group_scheduler.cc index b4bd0fb60c4d..b4c9fba5ad41 100644 --- a/src/ray/gcs/gcs_placement_group_scheduler.cc +++ b/src/ray/gcs/gcs_placement_group_scheduler.cc @@ -79,15 +79,10 @@ void GcsPlacementGroupScheduler::ScheduleUnplacedBundles( const auto &scheduling_strategies = placement_group->GetSchedulingStrategy(); - bool is_initial_pending_schedule = - (placement_group->GetState() == rpc::PlacementGroupTableData::PENDING && - placement_group->GetBundles().empty()); - bool is_scheduling_all_bundles = (bundles.size() == placement_group->GetBundles().size()); - if ((is_initial_pending_schedule || is_scheduling_all_bundles) && - !scheduling_strategies.empty()) { + if (is_scheduling_all_bundles && !scheduling_strategies.empty()) { RAY_LOG(INFO) << "Scheduling whole Placement Group " << placement_group->GetPlacementGroupID() << " using primary strategy."; @@ -96,12 +91,6 @@ void GcsPlacementGroupScheduler::ScheduleUnplacedBundles( bundles = placement_group->GetUnplacedBundles(); } - if (bundles.empty()) { - RAY_LOG(DEBUG) << "No bundles to schedule for PG " - << placement_group->GetPlacementGroupID(); - return; - } - auto scheduling_result = TrySchedule(placement_group, bundles, placement_group->GetStrategy()); bool any_strategy_feasible = !scheduling_result.status.IsInfeasible(); @@ -152,7 +141,6 @@ void GcsPlacementGroupScheduler::ScheduleUnplacedBundles( << ", because current resources can't satisfy the required resource. IsFailed: " << result_status.IsFailed() << " IsInfeasible: " << result_status.IsInfeasible() << " IsPartialSuccess: " << result_status.IsPartialSuccess(); - // If the placement group creation has failed, // but if it is not infeasible, it is retryable to create. failure_callback(placement_group, /*is_feasible*/ any_strategy_feasible); diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index d50f43025938..26a6dc6490ed 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -684,7 +684,7 @@ message PlacementGroupTableData { // The full list of scheduling options for this placement group. // Index 0: The original, primary request. // Index 1..N: The options defined in each fallback strategy. 
- repeated PlacementGroupSchedulingOption scheduling_strategy = 17; + repeated PlacementGroupSchedulingOption scheduling_options = 17; } message JobTableData { From 230690010006fd6367c65ced89c8da9ccaee7552 Mon Sep 17 00:00:00 2001 From: ryanaoleary Date: Fri, 6 Feb 2026 22:23:38 +0000 Subject: [PATCH 5/9] Fix erroneously named field Signed-off-by: ryanaoleary --- python/ray/_private/state.py | 7 ++++--- python/ray/util/placement_group.py | 12 ++++++++---- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/python/ray/_private/state.py b/python/ray/_private/state.py index 39de2a77bc98..00e821820157 100644 --- a/python/ray/_private/state.py +++ b/python/ray/_private/state.py @@ -346,8 +346,8 @@ def get_strategy(strategy): stats = placement_group_info.stats assert placement_group_info is not None - scheduling_strategy = [] - for strategy_proto in placement_group_info.scheduling_strategy: + scheduling_options = [] + for strategy_proto in placement_group_info.scheduling_options: bundles_list = [] label_selectors_list = [] for bundle in strategy_proto.bundles: @@ -361,7 +361,7 @@ def get_strategy(strategy): "bundles": bundles_list, "bundle_label_selector": label_selectors_list, } - scheduling_strategy.append(strategy_dict) + scheduling_options.append(strategy_dict) return { "placement_group_id": binary_to_hex( @@ -391,6 +391,7 @@ def get_strategy(strategy): stats.scheduling_state ].name, }, + "scheduling_options": scheduling_options, } def _nanoseconds_to_microseconds(self, time_in_nanoseconds): diff --git a/python/ray/util/placement_group.py b/python/ray/util/placement_group.py index 9d04c18216da..996c732c2f91 100644 --- a/python/ray/util/placement_group.py +++ b/python/ray/util/placement_group.py @@ -52,10 +52,11 @@ def __init__( self, id: "ray._raylet.PlacementGroupID", bundle_cache: Optional[List[Dict]] = None, + all_bundle_cache: Optional[List[Dict]] = None, ): self.id = id self.bundle_cache = bundle_cache - self._all_bundle_cache = None + self._all_bundle_cache = all_bundle_cache @property def is_empty(self): @@ -130,6 +131,9 @@ def _all_bundle_specs(self) -> List[Dict]: return self._all_bundle_cache or self.bundle_cache def _fill_bundle_cache_if_needed(self) -> None: + if self.bundle_cache is not None: + return + if not self.bundle_cache or not self._all_bundle_cache: cache_data = _get_bundle_cache(self.id) @@ -164,9 +168,9 @@ def _get_bundle_cache(pg_id: PlacementGroupID) -> Dict[str, List[Dict]]: active_bundles = list(table["bundles"].values()) # The list of bundles from all scheduling options. 
- if "scheduling_strategy" in table and table["scheduling_strategy"]: + if "scheduling_options" in table and table["scheduling_options"]: all_bundles = [] - for strategy in table["scheduling_strategy"]: + for strategy in table["scheduling_options"]: all_bundles.extend(strategy.get("bundles", [])) else: all_bundles = active_bundles @@ -256,7 +260,7 @@ def placement_group( fallback_strategy, ) - return PlacementGroup(placement_group_id, bundle_cache=bundles) + return PlacementGroup(placement_group_id) @PublicAPI From 3a74c66c4eabb5071606afc99cae316a3085d065 Mon Sep 17 00:00:00 2001 From: ryanaoleary Date: Sat, 7 Feb 2026 02:39:00 +0000 Subject: [PATCH 6/9] Fix cacheing in placement group Signed-off-by: ryanaoleary --- python/ray/util/placement_group.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/python/ray/util/placement_group.py b/python/ray/util/placement_group.py index 996c732c2f91..0881e87b274d 100644 --- a/python/ray/util/placement_group.py +++ b/python/ray/util/placement_group.py @@ -131,14 +131,13 @@ def _all_bundle_specs(self) -> List[Dict]: return self._all_bundle_cache or self.bundle_cache def _fill_bundle_cache_if_needed(self) -> None: - if self.bundle_cache is not None: + if self.bundle_cache and self._all_bundle_cache: return - if not self.bundle_cache or not self._all_bundle_cache: - cache_data = _get_bundle_cache(self.id) + cache_data = _get_bundle_cache(self.id) - self.bundle_cache = cache_data["active"] - self._all_bundle_cache = cache_data["all"] + self.bundle_cache = cache_data["active"] + self._all_bundle_cache = cache_data["all"] def __eq__(self, other): if not isinstance(other, PlacementGroup): From 27e4a6ed6a98788bff427d0c35b9ab1b03f341b5 Mon Sep 17 00:00:00 2001 From: ryanaoleary Date: Sat, 7 Feb 2026 02:41:31 +0000 Subject: [PATCH 7/9] Clang format Signed-off-by: ryanaoleary --- src/ray/protobuf/common.proto | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index 35f32eb9884d..7a1ba420d464 100644 --- a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -708,7 +708,8 @@ message PlacementGroupSpec { // Otherwise, the bundles can be placed elsewhere. // This only applies to STRICT_PACK pg. bytes soft_target_node_id = 11; - // The list of fallback options to try if the primary resource request cannot be satisfied. + // The list of fallback options to try if the primary resource request cannot be + // scheduled. 
repeated PlacementGroupSchedulingOption fallback_strategy = 12; } From 8f6bbcbb45716555a20c903656f4923655cefb8c Mon Sep 17 00:00:00 2001 From: ryanaoleary Date: Sat, 7 Feb 2026 07:41:42 +0000 Subject: [PATCH 8/9] Fix truthiness of cache check Signed-off-by: ryanaoleary --- python/ray/util/placement_group.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/util/placement_group.py b/python/ray/util/placement_group.py index 0881e87b274d..0481ef9533cf 100644 --- a/python/ray/util/placement_group.py +++ b/python/ray/util/placement_group.py @@ -131,7 +131,7 @@ def _all_bundle_specs(self) -> List[Dict]: return self._all_bundle_cache or self.bundle_cache def _fill_bundle_cache_if_needed(self) -> None: - if self.bundle_cache and self._all_bundle_cache: + if self.bundle_cache is not None and self._all_bundle_cache is not None: return cache_data = _get_bundle_cache(self.id) From 060e03dbaf64a5e8d1a0cbb87610614192aa0327 Mon Sep 17 00:00:00 2001 From: ryanaoleary Date: Sat, 7 Feb 2026 12:28:19 +0000 Subject: [PATCH 9/9] Fix cache call and comment Signed-off-by: ryanaoleary --- python/ray/tests/test_placement_group_fallback.py | 2 +- python/ray/util/placement_group.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/python/ray/tests/test_placement_group_fallback.py b/python/ray/tests/test_placement_group_fallback.py index 952e79a14a7c..098d1573615e 100644 --- a/python/ray/tests/test_placement_group_fallback.py +++ b/python/ray/tests/test_placement_group_fallback.py @@ -89,7 +89,7 @@ def test_placement_group_fallback_priority(ray_start_cluster): ray.init(address=cluster.address) fallback_strategy = [ - {"bundles": [{"CPU": 10}]}, # Infeasible + {"bundles": [{"CPU": 11}]}, # Infeasible {"bundles": [{"CPU": 5}]}, # Feasible {"bundles": [{"CPU": 1}]}, # Feasible ] diff --git a/python/ray/util/placement_group.py b/python/ray/util/placement_group.py index 0481ef9533cf..74c5704af83d 100644 --- a/python/ray/util/placement_group.py +++ b/python/ray/util/placement_group.py @@ -131,7 +131,7 @@ def _all_bundle_specs(self) -> List[Dict]: return self._all_bundle_cache or self.bundle_cache def _fill_bundle_cache_if_needed(self) -> None: - if self.bundle_cache is not None and self._all_bundle_cache is not None: + if self.bundle_cache: return cache_data = _get_bundle_cache(self.id)
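For reference, a minimal usage sketch of how the scheduling_options field surfaced by this series could be inspected from the Python API. The cluster size and CPU numbers below are illustrative assumptions, not values taken from the patches, and the field names (fallback_strategy, scheduling_options) are as introduced in the diffs above.

import ray
from ray.util.placement_group import (
    placement_group,
    placement_group_table,
    remove_placement_group,
)

ray.init(num_cpus=4)

# The primary request cannot fit on a 4-CPU node, so the single fallback
# option (assumed feasible here) is expected to be reserved instead.
pg = placement_group(
    bundles=[{"CPU": 8}],
    strategy="PACK",
    fallback_strategy=[{"bundles": [{"CPU": 4}]}],
)
ray.get(pg.ready(), timeout=10)

table = placement_group_table(pg)
# "bundles" reflects the option that was actually reserved, while
# "scheduling_options" lists the primary request followed by each fallback option.
print(table["bundles"])
print(table["scheduling_options"])

remove_placement_group(pg)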