2 changes: 2 additions & 0 deletions src/ray/raylet/scheduling/cluster_lease_manager.cc
@@ -194,6 +194,8 @@ bool ClusterLeaseManager::CancelAllLeasesOwnedBy(
}

void ClusterLeaseManager::ScheduleAndGrantLeases() {
ClusterResourceScheduler::SchedulingRoundGuard guard(cluster_resource_scheduler_);

// Always try to schedule infeasible tasks in case they are now feasible.
TryScheduleInfeasibleLease();
std::deque<std::shared_ptr<internal::Work>> works_to_cancel;
48 changes: 44 additions & 4 deletions src/ray/raylet/scheduling/cluster_resource_scheduler.cc
@@ -107,13 +107,53 @@ bool ClusterResourceScheduler::NodeAvailable(scheduling::NodeID node_id) const {
return false;
}

if (!node_available_snapshot_.empty()) {
auto it = node_available_snapshot_.find(node_id);
if (it != node_available_snapshot_.end()) {
return it->second;
}
}

// Not in a round or node not in snapshot (e.g. newly added): full live check.
return IsRemoteNodeAvailable(node_id);
}

bool ClusterResourceScheduler::IsRemoteNodeAvailable(scheduling::NodeID node_id) const {
return !cluster_resource_manager_->IsNodeDraining(node_id) &&
NodeAvailableImpl(node_id);
}

bool ClusterResourceScheduler::NodeAvailableImpl(scheduling::NodeID node_id) const {
  RAY_CHECK(is_node_available_fn_ != nullptr);
  return is_node_available_fn_(node_id);
}

void ClusterResourceScheduler::BeginSchedulingRound() {
  scheduling_round_depth_++;

  if (scheduling_round_depth_ > 1) {
    return;
  }

  node_available_snapshot_.clear();
for (const auto &[node_id, _] : cluster_resource_manager_->GetResourceView()) {
// Skip local node - it's always checked directly
if (node_id == local_node_id_) {
continue;
}
node_available_snapshot_[node_id] = IsRemoteNodeAvailable(node_id);
}
}

void ClusterResourceScheduler::EndSchedulingRound() {
RAY_CHECK(scheduling_round_depth_ > 0)
<< "EndSchedulingRound called without matching BeginSchedulingRound";

scheduling_round_depth_--;

if (scheduling_round_depth_ == 0) {
node_available_snapshot_.clear();
}
}

bool ClusterResourceScheduler::IsSchedulable(const ResourceRequest &resource_request,
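The new BeginSchedulingRound()/EndSchedulingRound() code above is a scoped memoization of the liveness callback guarded by a reentrancy depth counter. The standalone sketch below illustrates that pattern in isolation; every name in it (LivenessCache, RoundGuard, IsAlive, and so on) is invented for the illustration and is not a Ray API, and the real scheduler additionally consults the draining state and skips the local node, which this sketch omits.

// Self-contained sketch of the scoped-snapshot pattern (not Ray code).
#include <cassert>
#include <functional>
#include <iostream>
#include <unordered_map>
#include <vector>

class LivenessCache {
 public:
  explicit LivenessCache(std::function<bool(int)> is_alive_fn)
      : is_alive_fn_(std::move(is_alive_fn)) {}

  // Served from the snapshot during a round; live check otherwise, or for
  // nodes the snapshot does not know about (e.g. added mid-round).
  bool IsAlive(int node_id) const {
    auto it = snapshot_.find(node_id);
    if (it != snapshot_.end()) {
      return it->second;
    }
    return is_alive_fn_(node_id);
  }

  void BeginRound(const std::vector<int> &known_nodes) {
    if (++depth_ > 1) {
      return;  // Nested round: keep the outer snapshot.
    }
    snapshot_.clear();
    for (int node_id : known_nodes) {
      snapshot_[node_id] = is_alive_fn_(node_id);  // One live check per node.
    }
  }

  void EndRound() {
    assert(depth_ > 0 && "EndRound without matching BeginRound");
    if (--depth_ == 0) {
      snapshot_.clear();  // Outermost end: revert to live checks.
    }
  }

  // RAII guard analogous to SchedulingRoundGuard.
  class RoundGuard {
   public:
    RoundGuard(LivenessCache &cache, const std::vector<int> &nodes)
        : cache_(cache) {
      cache_.BeginRound(nodes);
    }
    ~RoundGuard() { cache_.EndRound(); }
    RoundGuard(const RoundGuard &) = delete;
    RoundGuard &operator=(const RoundGuard &) = delete;

   private:
    LivenessCache &cache_;
  };

 private:
  std::function<bool(int)> is_alive_fn_;
  std::unordered_map<int, bool> snapshot_;  // node id -> alive, valid during a round
  int depth_ = 0;                           // reentrancy depth
};

int main() {
  int live_checks = 0;
  LivenessCache cache([&live_checks](int) {
    ++live_checks;
    return true;
  });
  const std::vector<int> nodes = {1, 2, 3, 4, 5};
  {
    LivenessCache::RoundGuard guard(cache, nodes);  // 5 live checks happen here
    for (int pass = 0; pass < 10; ++pass) {
      for (int node : nodes) {
        cache.IsAlive(node);  // answered from the snapshot, no extra live check
      }
    }
  }
  std::cout << "live checks during the round: " << live_checks << "\n";  // prints 5
  return 0;
}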
40 changes: 40 additions & 0 deletions src/ray/raylet/scheduling/cluster_resource_scheduler.h
@@ -138,6 +138,32 @@ class ClusterResourceScheduler {

bool IsLocalNodeWithRaylet() { return is_local_node_with_raylet_; }

/// Call at the start of a scheduling round to snapshot remote-node availability.
/// Subsequent NodeAvailable() calls are answered from this snapshot until the
/// matching EndSchedulingRound(); nodes missing from the snapshot (e.g. added
/// mid-round) fall back to a live check. Calls may be nested: only the
/// outermost call rebuilds the snapshot.
void BeginSchedulingRound();

/// End a scheduling round. Each call must match a prior BeginSchedulingRound();
/// the snapshot is cleared only when the outermost round ends.
void EndSchedulingRound();

/// RAII guard for managing a scheduling round.
class SchedulingRoundGuard {
public:
explicit SchedulingRoundGuard(ClusterResourceScheduler &scheduler)
: scheduler_(scheduler) {
scheduler_.BeginSchedulingRound();
}
~SchedulingRoundGuard() { scheduler_.EndSchedulingRound(); }

SchedulingRoundGuard(const SchedulingRoundGuard &) = delete;
SchedulingRoundGuard &operator=(const SchedulingRoundGuard &) = delete;
SchedulingRoundGuard(SchedulingRoundGuard &&) = delete;
SchedulingRoundGuard &operator=(SchedulingRoundGuard &&) = delete;

private:
ClusterResourceScheduler &scheduler_;
};

private:
void Init(instrumented_io_context &io_service,
const NodeResources &local_node_resources,
@@ -148,6 +174,14 @@

bool NodeAvailable(scheduling::NodeID node_id) const;

/// Full availability check for a remote node (alive and not draining), bypassing
/// the snapshot. Used when building the snapshot and as the NodeAvailable() fallback.
bool IsRemoteNodeAvailable(scheduling::NodeID node_id) const;

/// Liveness check that queries is_node_available_fn_ directly, without the
/// snapshot and without the draining check.
bool NodeAvailableImpl(scheduling::NodeID node_id) const;

/// Decrease the available resources of a node when a resource request is
/// scheduled on the given node.
///
@@ -228,6 +262,12 @@
/// Whether there is a raylet on the local node.
bool is_local_node_with_raylet_ = true;

/// Pre-computed node availability for the current scheduling round.
/// Empty when not in a scheduling round.
absl::flat_hash_map<scheduling::NodeID, bool> node_available_snapshot_;
/// Depth counter for reentrant BeginSchedulingRound/EndSchedulingRound calls.
int scheduling_round_depth_ = 0;

friend class ClusterResourceSchedulerTest;
FRIEND_TEST(ClusterResourceSchedulerTest, PopulatePredefinedResources);
FRIEND_TEST(ClusterResourceSchedulerTest, SchedulingDeleteClusterNodeTest);
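For orientation before the call-site diffs below: a caller wraps a scheduling pass in the guard, and because the guard is an RAII object, EndSchedulingRound() runs on every exit path, including early returns and exceptions. A minimal sketch of the intended usage, mirroring the two lease managers in this PR (SomeLeaseManager and SchedulePendingWork() are placeholders, not real Ray names):

void SomeLeaseManager::ScheduleAndGrantLeases() {
  // Snapshot remote-node availability once for this pass.
  ClusterResourceScheduler::SchedulingRoundGuard guard(cluster_resource_scheduler_);

  SchedulePendingWork();  // placeholder; NodeAvailable() calls in here are served from the snapshot
}  // guard destroyed: EndSchedulingRound(), snapshot cleared at the outermost round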
2 changes: 2 additions & 0 deletions src/ray/raylet/scheduling/local_lease_manager.cc
@@ -124,6 +124,8 @@ void LocalLeaseManager::WaitForLeaseArgsRequests(std::shared_ptr<internal::Work>
}

void LocalLeaseManager::ScheduleAndGrantLeases() {
ClusterResourceScheduler::SchedulingRoundGuard guard(cluster_resource_scheduler_);

GrantScheduledLeasesToWorkers();
// TODO(swang): Spill from waiting queue first? Otherwise, we may end up
// spilling a lease whose args are already local.
156 changes: 156 additions & 0 deletions src/ray/raylet/scheduling/tests/cluster_resource_scheduler_test.cc
@@ -132,6 +132,18 @@ class ClusterResourceSchedulerTest : public ::testing::Test {
scheduling::ResourceID(OBJECT_STORE_MEM).Binary());
ASSERT_EQ(ray::kMemory_ResourceLabel, scheduling::ResourceID(MEM).Binary());
}

bool NodeAvailable(const ClusterResourceScheduler &scheduler,
scheduling::NodeID node_id) {
return scheduler.NodeAvailable(node_id);
}

void AddOrUpdateNode(ClusterResourceScheduler &scheduler,
scheduling::NodeID node_id,
const NodeResources &resources) {
scheduler.GetClusterResourceManager().AddOrUpdateNode(node_id, resources);
}

std::unique_ptr<gcs::MockGcsClient> gcs_client_;
std::function<bool(scheduling::NodeID)> is_node_available_fn_;
std::string node_name;
@@ -2448,6 +2460,150 @@ TEST_F(ClusterResourceSchedulerTest, FallbackReturnsNilForGCSIfAllNodesUnavailab
ASSERT_TRUE(result_node.IsNil());
}

TEST_F(ClusterResourceSchedulerTest, NodeAvailableSnapshotTest) {
// Test that BeginSchedulingRound() creates a snapshot that reduces
// is_node_available_fn_ calls from N*M to M (where N=checks, M=nodes)

instrumented_io_context io_context;
auto local_node_id = scheduling::NodeID(0);

// Counter to track how many times is_node_available_fn_ is called
int call_count = 0;
auto is_node_available_fn = [&call_count](scheduling::NodeID node_id) {
call_count++;
return true; // All nodes are alive
};

ClusterResourceScheduler resource_scheduler(
io_context, local_node_id, {{"CPU", 8}}, is_node_available_fn, fake_gauge_);

// Add 5 remote nodes
for (int i = 1; i <= 5; i++) {
NodeResources node_resources = CreateNodeResources({{ResourceID::CPU(), 8.0}});
AddOrUpdateNode(resource_scheduler, scheduling::NodeID(i), node_resources);
}

// Reset counter
call_count = 0;

// Without snapshot: each NodeAvailable call should invoke is_node_available_fn_
for (int i = 1; i <= 5; i++) {
ASSERT_TRUE(NodeAvailable(resource_scheduler, scheduling::NodeID(i)));
}
ASSERT_EQ(call_count, 5) << "Without snapshot, should call is_node_available_fn_ 5 times";

// Reset counter
call_count = 0;

// With snapshot: SchedulingRoundGuard should call is_node_available_fn_ once per node
{
ClusterResourceScheduler::SchedulingRoundGuard guard(resource_scheduler);
ASSERT_EQ(call_count, 5) << "SchedulingRoundGuard should snapshot all 5 nodes";

// Reset counter to verify subsequent calls don't invoke is_node_available_fn_
call_count = 0;

// Call NodeAvailable multiple times for each node
for (int check = 0; check < 10; check++) {
for (int i = 1; i <= 5; i++) {
ASSERT_TRUE(NodeAvailable(resource_scheduler, scheduling::NodeID(i)));
}
}

// Should have made 50 NodeAvailable calls (10 checks * 5 nodes)
// but is_node_available_fn_ should not be called at all (using snapshot)
ASSERT_EQ(call_count, 0)
<< "With snapshot active, should not call is_node_available_fn_";
}

// Reset counter
call_count = 0;

// After guard is destroyed, should go back to calling is_node_available_fn_
ASSERT_TRUE(NodeAvailable(resource_scheduler, scheduling::NodeID(1)));
ASSERT_EQ(call_count, 1)
<< "After SchedulingRoundGuard destroyed, should call is_node_available_fn_ again";
}

TEST_F(ClusterResourceSchedulerTest, NodeAvailableSnapshotReentrantTest) {
// Test that nested Begin/End calls work correctly

instrumented_io_context io_context;
auto local_node_id = scheduling::NodeID(0);

int call_count = 0;
auto is_node_available_fn = [&call_count](scheduling::NodeID node_id) {
call_count++;
return true;
};

ClusterResourceScheduler resource_scheduler(
io_context, local_node_id, {{"CPU", 8}}, is_node_available_fn, fake_gauge_);

// Add 2 remote nodes
for (int i = 1; i <= 2; i++) {
NodeResources node_resources = CreateNodeResources({{ResourceID::CPU(), 8.0}});
AddOrUpdateNode(resource_scheduler, scheduling::NodeID(i), node_resources);
}

call_count = 0;

// First Begin - should create snapshot
resource_scheduler.BeginSchedulingRound();
int calls_after_first_begin = call_count;
ASSERT_EQ(calls_after_first_begin, 2) << "First Begin should snapshot 2 nodes";

// Second Begin (reentrant) - should NOT create new snapshot
resource_scheduler.BeginSchedulingRound();
ASSERT_EQ(call_count, calls_after_first_begin)
<< "Reentrant Begin should not re-snapshot";

// Check nodes - should use existing snapshot
call_count = 0;
ASSERT_TRUE(NodeAvailable(resource_scheduler, scheduling::NodeID(1)));
ASSERT_EQ(call_count, 0) << "Should use snapshot, not call is_node_available_fn_";

// First End - should NOT clear snapshot (still nested)
resource_scheduler.EndSchedulingRound();
call_count = 0;
ASSERT_TRUE(NodeAvailable(resource_scheduler, scheduling::NodeID(1)));
ASSERT_EQ(call_count, 0) << "After first End, should still use snapshot";

// Second End - should clear snapshot
resource_scheduler.EndSchedulingRound();
call_count = 0;
ASSERT_TRUE(NodeAvailable(resource_scheduler, scheduling::NodeID(1)));
ASSERT_EQ(call_count, 1) << "After second End, should call is_node_available_fn_";
}

TEST_F(ClusterResourceSchedulerTest, NodeAvailableSnapshotDrainingTest) {
// Snapshot includes draining; Raylet is single-threaded so draining cannot
// change mid-round. Test that a node marked draining before the round is
// snapshotted as unavailable.
instrumented_io_context io_context;
auto local_node_id = scheduling::NodeID(0);

auto is_node_available_fn = [](scheduling::NodeID node_id) { return true; };

ClusterResourceScheduler resource_scheduler(
io_context, local_node_id, {{"CPU", 8}}, is_node_available_fn, fake_gauge_);

scheduling::NodeID remote_node(1);
NodeResources node_resources = CreateNodeResources({{ResourceID::CPU(), 8.0}});
AddOrUpdateNode(resource_scheduler, remote_node, node_resources);

// Mark node as draining before the scheduling round
resource_scheduler.GetClusterResourceManager().SetNodeDraining(
remote_node, true, 0);

{
ClusterResourceScheduler::SchedulingRoundGuard guard(resource_scheduler);
// Snapshot was taken with draining=true, so node is unavailable for the round
ASSERT_FALSE(NodeAvailable(resource_scheduler, remote_node))
<< "Node should be snapshotted as unavailable when draining";
}
}

} // namespace ray

int main(int argc, char **argv) {