2 changes: 2 additions & 0 deletions src/ray/raylet/scheduling/cluster_lease_manager.cc
@@ -194,6 +194,8 @@ bool ClusterLeaseManager::CancelAllLeasesOwnedBy(
}

void ClusterLeaseManager::ScheduleAndGrantLeases() {
ClusterResourceScheduler::SchedulingRoundGuard guard(cluster_resource_scheduler_);

// Always try to schedule infeasible tasks in case they are now feasible.
TryScheduleInfeasibleLease();
std::deque<std::shared_ptr<internal::Work>> works_to_cancel;
47 changes: 43 additions & 4 deletions src/ray/raylet/scheduling/cluster_resource_scheduler.cc
@@ -107,13 +107,52 @@ bool ClusterResourceScheduler::NodeAvailable(scheduling::NodeID node_id) const {
return false;
}

RAY_CHECK(is_node_available_fn_ != nullptr);
if (!is_node_available_fn_(node_id) ||
cluster_resource_manager_->IsNodeDraining(node_id)) {
// Draining status is never cached and always checked live
if (cluster_resource_manager_->IsNodeDraining(node_id)) {
return false;
}

return true;
if (!node_available_snapshot_.empty()) {
auto it = node_available_snapshot_.find(node_id);
if (it != node_available_snapshot_.end()) {
return it->second;
}
}

return NodeAvailableImpl(node_id);
}

bool ClusterResourceScheduler::NodeAvailableImpl(scheduling::NodeID node_id) const {
RAY_CHECK(is_node_available_fn_ != nullptr);
return is_node_available_fn_(node_id);
}

void ClusterResourceScheduler::BeginSchedulingRound() {
scheduling_round_depth_++;

if (scheduling_round_depth_ > 1) {
return;
}

node_available_snapshot_.clear();
for (const auto &[node_id, _] : cluster_resource_manager_->GetResourceView()) {
// Skip local node - it's always checked directly
if (node_id == local_node_id_) {
continue;
}
node_available_snapshot_[node_id] = NodeAvailableImpl(node_id);
}
}

void ClusterResourceScheduler::EndSchedulingRound() {
RAY_CHECK(scheduling_round_depth_ > 0)
<< "EndSchedulingRound called without matching BeginSchedulingRound";

scheduling_round_depth_--;

if (scheduling_round_depth_ == 0) {
node_available_snapshot_.clear();
}
}

bool ClusterResourceScheduler::IsSchedulable(const ResourceRequest &resource_request,
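
For context, here is a minimal, self-contained sketch of the caching pattern this hunk introduces: node availability is snapshotted once per scheduling round, nested rounds reuse the outermost snapshot, and lookups fall back to the live callback when no round is active. The class and variable names below are simplified for illustration and are not part of the PR.

// Illustrative sketch only (simplified names, not Ray's actual classes).
#include <cassert>
#include <functional>
#include <iostream>
#include <unordered_map>
#include <vector>

class SnapshotScheduler {
 public:
  SnapshotScheduler(std::vector<int> node_ids,
                    std::function<bool(int)> is_node_available_fn)
      : node_ids_(std::move(node_ids)),
        is_node_available_fn_(std::move(is_node_available_fn)) {}

  // Only the outermost Begin takes a snapshot; nested calls just bump the depth.
  void BeginSchedulingRound() {
    if (++depth_ > 1) {
      return;
    }
    snapshot_.clear();
    for (int id : node_ids_) {
      snapshot_[id] = is_node_available_fn_(id);
    }
  }

  // The snapshot is dropped only when the outermost round ends.
  void EndSchedulingRound() {
    assert(depth_ > 0);
    if (--depth_ == 0) {
      snapshot_.clear();
    }
  }

  // Inside a round, answer from the snapshot; otherwise fall back to the live check.
  bool NodeAvailable(int id) const {
    auto it = snapshot_.find(id);
    if (it != snapshot_.end()) {
      return it->second;
    }
    return is_node_available_fn_(id);
  }

  // RAII guard mirroring ClusterResourceScheduler::SchedulingRoundGuard.
  class RoundGuard {
   public:
    explicit RoundGuard(SnapshotScheduler &s) : s_(s) { s_.BeginSchedulingRound(); }
    ~RoundGuard() { s_.EndSchedulingRound(); }

   private:
    SnapshotScheduler &s_;
  };

 private:
  std::vector<int> node_ids_;
  std::function<bool(int)> is_node_available_fn_;
  std::unordered_map<int, bool> snapshot_;
  int depth_ = 0;
};

int main() {
  int live_checks = 0;
  SnapshotScheduler scheduler({1, 2, 3}, [&live_checks](int /*node_id*/) {
    ++live_checks;
    return true;
  });

  {
    SnapshotScheduler::RoundGuard guard(scheduler);  // Snapshots 3 nodes: 3 live checks.
    for (int pass = 0; pass < 10; ++pass) {
      for (int id : {1, 2, 3}) {
        scheduler.NodeAvailable(id);  // Served from the snapshot: no extra live checks.
      }
    }
  }

  std::cout << "live availability checks: " << live_checks << std::endl;  // Prints 3.
  return 0;
}
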
31 changes: 31 additions & 0 deletions src/ray/raylet/scheduling/cluster_resource_scheduler.h
@@ -138,6 +138,27 @@ class ClusterResourceScheduler {

bool IsLocalNodeWithRaylet() { return is_local_node_with_raylet_; }

/// Call at the start of a scheduling round to snapshot node availability.
/// All subsequent NodeAvailable() calls will use this snapshot until
/// EndSchedulingRound() is called. This method is reentrant-safe.
void BeginSchedulingRound();

/// End a scheduling round. Each call must match a prior BeginSchedulingRound();
/// the snapshot is cleared only when the outermost round ends.
void EndSchedulingRound();

/// RAII guard for managing a scheduling round.
class SchedulingRoundGuard {
public:
explicit SchedulingRoundGuard(ClusterResourceScheduler &scheduler)
: scheduler_(scheduler) {
scheduler_.BeginSchedulingRound();
}
~SchedulingRoundGuard() { scheduler_.EndSchedulingRound(); }

private:
ClusterResourceScheduler &scheduler_;
};

private:
void Init(instrumented_io_context &io_service,
const NodeResources &local_node_resources,
@@ -148,6 +169,10 @@

bool NodeAvailable(scheduling::NodeID node_id) const;

/// Check if a node is available without using the snapshot.
/// This is the actual implementation that queries is_node_available_fn_.
bool NodeAvailableImpl(scheduling::NodeID node_id) const;

/// Decrease the available resources of a node when a resource request is
/// scheduled on the given node.
///
@@ -228,6 +253,12 @@
/// Whether there is a raylet on the local node.
bool is_local_node_with_raylet_ = true;

/// Pre-computed node availability for the current scheduling round.
/// Empty when not in a scheduling round.
absl::flat_hash_map<scheduling::NodeID, bool> node_available_snapshot_;
/// Depth counter for reentrant BeginSchedulingRound/EndSchedulingRound calls.
int scheduling_round_depth_ = 0;

friend class ClusterResourceSchedulerTest;
FRIEND_TEST(ClusterResourceSchedulerTest, PopulatePredefinedResources);
FRIEND_TEST(ClusterResourceSchedulerTest, SchedulingDeleteClusterNodeTest);
2 changes: 2 additions & 0 deletions src/ray/raylet/scheduling/local_lease_manager.cc
@@ -124,6 +124,8 @@ void LocalLeaseManager::WaitForLeaseArgsRequests(std::shared_ptr<internal::Work>
}

void LocalLeaseManager::ScheduleAndGrantLeases() {
ClusterResourceScheduler::SchedulingRoundGuard guard(cluster_resource_scheduler_);

GrantScheduledLeasesToWorkers();
// TODO(swang): Spill from waiting queue first? Otherwise, we may end up
// spilling a lease whose args are already local.
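
A note on the reentrancy the guard relies on: both ClusterLeaseManager::ScheduleAndGrantLeases and LocalLeaseManager::ScheduleAndGrantLeases take a SchedulingRoundGuard, and the cluster-level pass can presumably reach the local one within the same round, so the depth counter ensures the snapshot is taken and cleared exactly once. A tiny hypothetical sketch of that nesting behavior (all names below are invented for illustration):

#include <iostream>

// Hypothetical stand-ins for the two lease managers sharing one scheduler.
struct Scheduler {
  int depth = 0;
  void Begin() { if (++depth == 1) std::cout << "snapshot taken\n"; }
  void End()   { if (--depth == 0) std::cout << "snapshot cleared\n"; }
};

struct RoundGuard {
  Scheduler &s;
  explicit RoundGuard(Scheduler &scheduler) : s(scheduler) { s.Begin(); }
  ~RoundGuard() { s.End(); }
};

void LocalScheduleAndGrant(Scheduler &s) {
  RoundGuard guard(s);  // Nested: depth goes 1 -> 2, the existing snapshot is reused.
}

void ClusterScheduleAndGrant(Scheduler &s) {
  RoundGuard guard(s);       // Outermost: takes the snapshot.
  LocalScheduleAndGrant(s);  // Reuses it.
}                            // Snapshot cleared here, when the outermost guard exits.

int main() {
  Scheduler s;
  ClusterScheduleAndGrant(s);  // Prints "snapshot taken" and "snapshot cleared" once each.
  LocalScheduleAndGrant(s);    // Called on its own, it is the outermost round itself.
  return 0;
}
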
163 changes: 163 additions & 0 deletions src/ray/raylet/scheduling/tests/cluster_resource_scheduler_test.cc
@@ -132,6 +132,18 @@ class ClusterResourceSchedulerTest : public ::testing::Test {
scheduling::ResourceID(OBJECT_STORE_MEM).Binary());
ASSERT_EQ(ray::kMemory_ResourceLabel, scheduling::ResourceID(MEM).Binary());
}

bool NodeAvailable(const ClusterResourceScheduler &scheduler,
scheduling::NodeID node_id) {
return scheduler.NodeAvailable(node_id);
}

void AddOrUpdateNode(ClusterResourceScheduler &scheduler,
scheduling::NodeID node_id,
const NodeResources &resources) {
scheduler.GetClusterResourceManager().AddOrUpdateNode(node_id, resources);
}

std::unique_ptr<gcs::MockGcsClient> gcs_client_;
std::function<bool(scheduling::NodeID)> is_node_available_fn_;
std::string node_name;
@@ -2448,6 +2460,157 @@ TEST_F(ClusterResourceSchedulerTest, FallbackReturnsNilForGCSIfAllNodesUnavailab
ASSERT_TRUE(result_node.IsNil());
}

TEST_F(ClusterResourceSchedulerTest, NodeAvailableSnapshotTest) {
// Test that BeginSchedulingRound() creates a snapshot that reduces
// is_node_available_fn_ calls from N*M to M (where N=checks, M=nodes)

instrumented_io_context io_context;
auto local_node_id = scheduling::NodeID(0);

// Counter to track how many times is_node_available_fn_ is called
int call_count = 0;
auto is_node_available_fn = [&call_count](scheduling::NodeID node_id) {
call_count++;
return true; // All nodes are alive
};

ClusterResourceScheduler resource_scheduler(
io_context, local_node_id, {{"CPU", 8}}, is_node_available_fn, fake_gauge_);

// Add 5 remote nodes
for (int i = 1; i <= 5; i++) {
NodeResources node_resources = CreateNodeResources({{ResourceID::CPU(), 8.0}});
AddOrUpdateNode(resource_scheduler, scheduling::NodeID(i), node_resources);
}

// Reset counter
call_count = 0;

// Without snapshot: each NodeAvailable call should invoke is_node_available_fn_
for (int i = 1; i <= 5; i++) {
ASSERT_TRUE(NodeAvailable(resource_scheduler, scheduling::NodeID(i)));
}
ASSERT_EQ(call_count, 5) << "Without snapshot, should call is_node_available_fn_ 5 times";

// Reset counter
call_count = 0;

// With snapshot: SchedulingRoundGuard should call is_node_available_fn_ once per node
{
ClusterResourceScheduler::SchedulingRoundGuard guard(resource_scheduler);
ASSERT_EQ(call_count, 5) << "SchedulingRoundGuard should snapshot all 5 nodes";

// Reset counter to verify subsequent calls don't invoke is_node_available_fn_
call_count = 0;

// Call NodeAvailable multiple times for each node
for (int check = 0; check < 10; check++) {
for (int i = 1; i <= 5; i++) {
ASSERT_TRUE(NodeAvailable(resource_scheduler, scheduling::NodeID(i)));
}
}

// Should have made 50 NodeAvailable calls (10 checks * 5 nodes)
// but is_node_available_fn_ should not be called at all (using snapshot)
ASSERT_EQ(call_count, 0)
<< "With snapshot active, should not call is_node_available_fn_";
}

// Reset counter
call_count = 0;

// After guard is destroyed, should go back to calling is_node_available_fn_
ASSERT_TRUE(NodeAvailable(resource_scheduler, scheduling::NodeID(1)));
ASSERT_EQ(call_count, 1)
<< "After SchedulingRoundGuard destroyed, should call is_node_available_fn_ again";
}

TEST_F(ClusterResourceSchedulerTest, NodeAvailableSnapshotReentrantTest) {
// Test that nested Begin/End calls work correctly

instrumented_io_context io_context;
auto local_node_id = scheduling::NodeID(0);

int call_count = 0;
auto is_node_available_fn = [&call_count](scheduling::NodeID node_id) {
call_count++;
return true;
};

ClusterResourceScheduler resource_scheduler(
io_context, local_node_id, {{"CPU", 8}}, is_node_available_fn, fake_gauge_);

// Add 2 remote nodes
for (int i = 1; i <= 2; i++) {
NodeResources node_resources = CreateNodeResources({{ResourceID::CPU(), 8.0}});
AddOrUpdateNode(resource_scheduler, scheduling::NodeID(i), node_resources);
}

call_count = 0;

// First Begin - should create snapshot
resource_scheduler.BeginSchedulingRound();
int calls_after_first_begin = call_count;
ASSERT_EQ(calls_after_first_begin, 2) << "First Begin should snapshot 2 nodes";

// Second Begin (reentrant) - should NOT create new snapshot
resource_scheduler.BeginSchedulingRound();
ASSERT_EQ(call_count, calls_after_first_begin)
<< "Reentrant Begin should not re-snapshot";

// Check nodes - should use existing snapshot
call_count = 0;
ASSERT_TRUE(NodeAvailable(resource_scheduler, scheduling::NodeID(1)));
ASSERT_EQ(call_count, 0) << "Should use snapshot, not call is_node_available_fn_";

// First End - should NOT clear snapshot (still nested)
resource_scheduler.EndSchedulingRound();
call_count = 0;
ASSERT_TRUE(NodeAvailable(resource_scheduler, scheduling::NodeID(1)));
ASSERT_EQ(call_count, 0) << "After first End, should still use snapshot";

// Second End - should clear snapshot
resource_scheduler.EndSchedulingRound();
call_count = 0;
ASSERT_TRUE(NodeAvailable(resource_scheduler, scheduling::NodeID(1)));
ASSERT_EQ(call_count, 1) << "After second End, should call is_node_available_fn_";
}

TEST_F(ClusterResourceSchedulerTest, NodeAvailableSnapshotDrainingTest) {
// Test that draining status is still checked even with snapshot

instrumented_io_context io_context;
auto local_node_id = scheduling::NodeID(0);

// Node is alive according to is_node_available_fn_
auto is_node_available_fn = [](scheduling::NodeID node_id) {
return true;
};

ClusterResourceScheduler resource_scheduler(
io_context, local_node_id, {{"CPU", 8}}, is_node_available_fn, fake_gauge_);

// Add a remote node
scheduling::NodeID remote_node(1);
NodeResources node_resources = CreateNodeResources({{ResourceID::CPU(), 8.0}});
AddOrUpdateNode(resource_scheduler, remote_node, node_resources);

// Begin scheduling round - node should be available in snapshot
{
ClusterResourceScheduler::SchedulingRoundGuard guard(resource_scheduler);
ASSERT_TRUE(NodeAvailable(resource_scheduler, remote_node))
<< "Node should be available initially";

// Mark node as draining
resource_scheduler.GetClusterResourceManager().SetNodeDraining(
remote_node, true, 0);

// Even though snapshot says node is alive, draining check should make it unavailable
ASSERT_FALSE(NodeAvailable(resource_scheduler, remote_node))
<< "Node should be unavailable when draining, even with snapshot";
}
}

} // namespace ray

int main(int argc, char **argv) {