Commit 29f1016
Revert "[Core] Cancel lease requests before returning a PG bundle (#4… (ray-project#46091)
This reverts commit f0f52fa, which was breaking https://github.com/orgs/anyscale/projects/76/views/1?pane=issue&itemId=67654997
1 parent 3f36b1b commit 29f1016

17 files changed (+237, -366 lines)
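For context, the reverted change altered how a queued worker-lease request is handled when its placement group bundle is returned. A minimal sketch of the Ray pattern that exercises this path (hypothetical single-node, 1-CPU setup; not taken from this commit):

import ray
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

ray.init(num_cpus=1)

# Reserve a 1-CPU bundle, schedule a task onto it, then remove the group.
pg = ray.util.placement_group(bundles=[{"CPU": 1}])
ray.get(pg.ready())

@ray.remote(num_cpus=1)
def task():
    return "ok"

ref = task.options(
    scheduling_strategy=PlacementGroupSchedulingStrategy(placement_group=pg)
).remote()
print(ray.get(ref))

# Removing the group returns the bundle's resources; any lease request still
# queued against the bundle has to be cancelled or cleaned up by the raylet.
ray.util.remove_placement_group(pg)

Removing the group while a lease request for one of its bundles is still being processed is exactly the window that the reverted PR and this revert handle differently.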

python/ray/tests/test_gcs_fault_tolerance.py

Lines changed: 0 additions & 84 deletions
@@ -1,5 +1,4 @@
 import sys
-import asyncio
 import os
 import threading
 from time import sleep
@@ -23,8 +22,6 @@
 )
 from ray.job_submission import JobSubmissionClient, JobStatus
 from ray._raylet import GcsClient
-from ray._private.runtime_env.plugin import RuntimeEnvPlugin
-from ray.util.state import list_placement_groups

 import psutil

@@ -1216,87 +1213,6 @@ def spawn(self, name, namespace):
         raise ValueError(f"Unknown case: {case}")


-MyPlugin = "MyPlugin"
-MY_PLUGIN_CLASS_PATH = "ray.tests.test_gcs_fault_tolerance.HangPlugin"
-
-
-class HangPlugin(RuntimeEnvPlugin):
-    name = MyPlugin
-
-    async def create(
-        self,
-        uri,
-        runtime_env,
-        ctx,
-        logger,  # noqa: F821
-    ) -> float:
-        while True:
-            await asyncio.sleep(1)
-
-    @staticmethod
-    def validate(runtime_env_dict: dict) -> str:
-        return 1
-
-
-@pytest.mark.parametrize(
-    "ray_start_regular_with_external_redis",
-    [
-        generate_system_config_map(
-            gcs_rpc_server_reconnect_timeout_s=60,
-            testing_asio_delay_us="NodeManagerService.grpc_server.CancelResourceReserve=500000000:500000000",  # noqa: E501
-        ),
-    ],
-    indirect=True,
-)
-@pytest.mark.parametrize(
-    "set_runtime_env_plugins",
-    [
-        '[{"class":"' + MY_PLUGIN_CLASS_PATH + '"}]',
-    ],
-    indirect=True,
-)
-def test_placement_group_removal_after_gcs_restarts(
-    set_runtime_env_plugins, ray_start_regular_with_external_redis
-):
-    @ray.remote
-    def task():
-        pass
-
-    pg = ray.util.placement_group(bundles=[{"CPU": 1}])
-    _ = task.options(
-        max_retries=0,
-        num_cpus=1,
-        scheduling_strategy=PlacementGroupSchedulingStrategy(
-            placement_group=pg,
-        ),
-        runtime_env={
-            MyPlugin: {"name": "f2"},
-            "config": {"setup_timeout_seconds": -1},
-        },
-    ).remote()
-
-    # The task should be popping worker
-    # TODO(jjyao) Use a more determinstic way to
-    # decide whether the task is popping worker
-    sleep(5)
-
-    ray.util.remove_placement_group(pg)
-    # The PG is marked as REMOVED in redis but not removed yet from raylet
-    # due to the injected delay of CancelResourceReserve rpc
-    wait_for_condition(lambda: list_placement_groups()[0].state == "REMOVED")
-
-    ray._private.worker._global_node.kill_gcs_server()
-    # After GCS restarts, it will try to remove the PG resources
-    # again via ReleaseUnusedBundles rpc
-    ray._private.worker._global_node.start_gcs_server()
-
-    def verify_pg_resources_cleaned():
-        r_keys = ray.available_resources().keys()
-        return all("group" not in k for k in r_keys)
-
-    wait_for_condition(verify_pg_resources_cleaned, timeout=30)
-
-
 if __name__ == "__main__":

     import pytest
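The deleted test targeted the race between removing a placement group and a task that is still popping a worker, then asserted that no group-prefixed resources survive a GCS restart. A standalone sketch of that final assertion (hypothetical helper that polls instead of using the test's wait_for_condition utility):

import time

import ray

def pg_resources_cleaned(timeout_s: float = 30.0) -> bool:
    # A removed placement group should leave no "group"-prefixed entries
    # behind in the cluster's resource view (same check as the deleted test).
    deadline = time.time() + timeout_s
    while time.time() < deadline:
        if all("group" not in key for key in ray.available_resources()):
            return True
        time.sleep(1)
    return False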

python/ray/tests/test_placement_group_5.py

Lines changed: 4 additions & 3 deletions
@@ -470,9 +470,10 @@ async def create(
     ) -> float:
         await asyncio.sleep(PLUGIN_TIMEOUT)

-    @staticmethod
-    def validate(runtime_env_dict: dict) -> str:
-        return 1
+
+    @staticmethod
+    def validate(runtime_env_dict: dict) -> str:
+        return 1


 @pytest.mark.parametrize(

src/ray/core_worker/core_worker_process.cc

Lines changed: 0 additions & 1 deletion
@@ -233,7 +233,6 @@ void CoreWorkerProcessImpl::InitializeSystemConfig() {
   thread.join();

   RayConfig::instance().initialize(promise.get_future().get());
-  ray::asio::testing::init();
 }

 void CoreWorkerProcessImpl::RunWorkerTaskExecutionLoop() {

src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc

Lines changed: 24 additions & 15 deletions
@@ -243,9 +243,9 @@ void GcsPlacementGroupScheduler::CancelResourceReserve(
   auto node_id = NodeID::FromBinary(node.value()->node_id());

   if (max_retry == current_retry_cnt) {
-    RAY_LOG(ERROR) << "Failed to cancel resource reserved for bundle because the max "
-                      "retry count is reached. "
-                   << bundle_spec->DebugString() << " at node " << node_id;
+    RAY_LOG(INFO) << "Failed to cancel resource reserved for bundle because the max "
+                     "retry count is reached. "
+                  << bundle_spec->DebugString() << " at node " << node_id;
     return;
   }

@@ -261,10 +261,11 @@ void GcsPlacementGroupScheduler::CancelResourceReserve(
         RAY_LOG(INFO) << "Finished cancelling the resource reserved for bundle: "
                       << bundle_spec->DebugString() << " at node " << node_id;
       } else {
-        // We couldn't delete the pg resources because of network issue. Retry.
-        RAY_LOG(WARNING) << "Failed to cancel the resource reserved for bundle: "
-                         << bundle_spec->DebugString() << " at node " << node_id
-                         << ". Status: " << status;
+        // We couldn't delete the pg resources either becuase it is in use
+        // or network issue. Retry.
+        RAY_LOG(INFO) << "Failed to cancel the resource reserved for bundle: "
+                      << bundle_spec->DebugString() << " at node " << node_id
+                      << ". Status: " << status;
         execute_after(
             io_context_,
             [this, bundle_spec, node, max_retry, current_retry_cnt] {
@@ -567,10 +568,14 @@ void GcsPlacementGroupScheduler::DestroyPlacementGroupPreparedBundleResources(
    for (const auto &iter : *(leasing_bundle_locations)) {
      auto &bundle_spec = iter.second.second;
      auto &node_id = iter.second.first;
-      CancelResourceReserve(bundle_spec,
-                            gcs_node_manager_.GetAliveNode(node_id),
-                            /*max_retry*/ 5,
-                            /*num_retry*/ 0);
+      CancelResourceReserve(
+          bundle_spec,
+          gcs_node_manager_.GetAliveNode(node_id),
+          // Retry 10 * worker registeration timeout to avoid race condition.
+          // See https://github.com/ray-project/ray/pull/42942
+          // for more details.
+          /*max_retry*/ RayConfig::instance().worker_register_timeout_seconds() * 10,
+          /*num_retry*/ 0);
    }
  }
 }
@@ -589,10 +594,14 @@ void GcsPlacementGroupScheduler::DestroyPlacementGroupCommittedBundleResources(
    for (const auto &iter : *(committed_bundle_locations)) {
      auto &bundle_spec = iter.second.second;
      auto &node_id = iter.second.first;
-      CancelResourceReserve(bundle_spec,
-                            gcs_node_manager_.GetAliveNode(node_id),
-                            /*max_retry*/ 5,
-                            /*num_retry*/ 0);
+      CancelResourceReserve(
+          bundle_spec,
+          gcs_node_manager_.GetAliveNode(node_id),
+          // Retry 10 * worker registeration timeout to avoid race condition.
+          // See https://github.com/ray-project/ray/pull/42942
+          // for more details.
+          /*max_retry*/ RayConfig::instance().worker_register_timeout_seconds() * 10,
+          /*num_retry*/ 0);
    }
    committed_bundle_location_index_.Erase(placement_group_id);
    cluster_resource_scheduler_.GetClusterResourceManager()
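The restored call sites size the retry budget from the worker registration timeout instead of a fixed 5 attempts, so a bundle that is temporarily still in use keeps being retried rather than abandoned. A rough Python sketch of that retry shape (hypothetical names and delay; the real code reschedules itself via execute_after on the GCS io_context):

import time

# Hypothetical stand-ins for RayConfig::worker_register_timeout_seconds()
# and the per-attempt delay used by the scheduler.
WORKER_REGISTER_TIMEOUT_S = 60
RETRY_DELAY_S = 1.0

def cancel_resource_reserve(cancel_rpc, max_retry=WORKER_REGISTER_TIMEOUT_S * 10):
    """Keep retrying the cancel RPC until it succeeds or the budget runs out."""
    for _attempt in range(max_retry + 1):
        if cancel_rpc():  # returns True once the bundle is actually released
            return True
        # The bundle may still be in use (e.g. a worker is registering) or the
        # node may be temporarily unreachable; wait and try again.
        time.sleep(RETRY_DELAY_S)
    return False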

src/ray/gcs/gcs_server/gcs_server_main.cc

Lines changed: 0 additions & 1 deletion
@@ -62,7 +62,6 @@ int main(int argc, char *argv[]) {
   gflags::ShutDownCommandLineFlags();

   RayConfig::instance().initialize(config_list);
-  ray::asio::testing::init();

   // IO Service for main loop.
   instrumented_io_context main_service;

src/ray/raylet/local_task_manager.cc

Lines changed: 59 additions & 65 deletions
@@ -546,15 +546,21 @@ bool LocalTaskManager::PoppedWorkerHandler(
     not_detached_with_owner_failed = true;
   }

-  if (!canceled) {
-    const auto &required_resource =
-        task.GetTaskSpecification().GetRequiredResources().GetResourceMap();
-    for (auto &entry : required_resource) {
-      // This is to make sure PG resource is not deleted during popping worker
-      // unless the lease request is cancelled.
-      RAY_CHECK(cluster_resource_scheduler_->GetLocalResourceManager().ResourcesExist(
-          scheduling::ResourceID(entry.first)))
-          << entry.first;
+  const auto &required_resource =
+      task.GetTaskSpecification().GetRequiredResources().GetResourceMap();
+  for (auto &entry : required_resource) {
+    if (!cluster_resource_scheduler_->GetLocalResourceManager().ResourcesExist(
+            scheduling::ResourceID(entry.first))) {
+      RAY_CHECK(task.GetTaskSpecification().PlacementGroupBundleId().first !=
+                PlacementGroupID::Nil());
+      RAY_LOG(DEBUG) << "The placement group: "
+                     << task.GetTaskSpecification().PlacementGroupBundleId().first
+                     << " was removed when poping workers for task: " << task_id
+                     << ", will cancel the task.";
+      CancelTask(
+          task_id,
+          rpc::RequestWorkerLeaseReply::SCHEDULING_CANCELLED_PLACEMENT_GROUP_REMOVED);
+      canceled = true;
     }
   }

@@ -849,7 +855,7 @@ void LocalTaskManager::ReleaseTaskArgs(const TaskID &task_id) {
 }

 namespace {
-void ReplyCancelled(const std::shared_ptr<internal::Work> &work,
+void ReplyCancelled(std::shared_ptr<internal::Work> &work,
                     rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type,
                     const std::string &scheduling_failure_message) {
   auto reply = work->reply;
@@ -861,67 +867,55 @@ void ReplyCancelled(const std::shared_ptr<internal::Work> &work,
 }
 }  // namespace

-bool LocalTaskManager::CancelTasks(
-    std::function<bool(const std::shared_ptr<internal::Work> &)> predicate,
+bool LocalTaskManager::CancelTask(
+    const TaskID &task_id,
     rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type,
     const std::string &scheduling_failure_message) {
-  bool tasks_cancelled = false;
-
-  ray::erase_if<SchedulingClass, std::shared_ptr<internal::Work>>(
-      tasks_to_dispatch_, [&](const std::shared_ptr<internal::Work> &work) {
-        if (predicate(work)) {
-          const TaskID task_id = work->task.GetTaskSpecification().TaskId();
-          RAY_LOG(DEBUG) << "Canceling task " << task_id << " from dispatch queue.";
-          ReplyCancelled(work, failure_type, scheduling_failure_message);
-          if (work->GetState() == internal::WorkStatus::WAITING_FOR_WORKER) {
-            // We've already acquired resources so we need to release them.
-            cluster_resource_scheduler_->GetLocalResourceManager().ReleaseWorkerResources(
-                work->allocated_instances);
-            // Release pinned task args.
-            ReleaseTaskArgs(task_id);
-          }
-          if (!work->task.GetTaskSpecification().GetDependencies().empty()) {
-            task_dependency_manager_.RemoveTaskDependencies(
-                work->task.GetTaskSpecification().TaskId());
-          }
-          RemoveFromRunningTasksIfExists(work->task);
-          work->SetStateCancelled();
-          tasks_cancelled = true;
-          return true;
-        } else {
-          return false;
+  for (auto shapes_it = tasks_to_dispatch_.begin(); shapes_it != tasks_to_dispatch_.end();
+       shapes_it++) {
+    auto &work_queue = shapes_it->second;
+    for (auto work_it = work_queue.begin(); work_it != work_queue.end(); work_it++) {
+      const auto &task = (*work_it)->task;
+      if (task.GetTaskSpecification().TaskId() == task_id) {
+        RAY_LOG(DEBUG) << "Canceling task " << task_id << " from dispatch queue.";
+        ReplyCancelled(*work_it, failure_type, scheduling_failure_message);
+        if ((*work_it)->GetState() == internal::WorkStatus::WAITING_FOR_WORKER) {
+          // We've already acquired resources so we need to release them.
+          cluster_resource_scheduler_->GetLocalResourceManager().ReleaseWorkerResources(
+              (*work_it)->allocated_instances);
+          // Release pinned task args.
+          ReleaseTaskArgs(task_id);
        }
-      });
-
-  ray::erase_if<std::shared_ptr<internal::Work>>(
-      waiting_task_queue_, [&](const std::shared_ptr<internal::Work> &work) {
-        if (predicate(work)) {
-          ReplyCancelled(work, failure_type, scheduling_failure_message);
-          if (!work->task.GetTaskSpecification().GetDependencies().empty()) {
-            task_dependency_manager_.RemoveTaskDependencies(
-                work->task.GetTaskSpecification().TaskId());
-          }
-          waiting_tasks_index_.erase(work->task.GetTaskSpecification().TaskId());
-          tasks_cancelled = true;
-          return true;
-        } else {
-          return false;
+        if (!task.GetTaskSpecification().GetDependencies().empty()) {
+          task_dependency_manager_.RemoveTaskDependencies(
+              task.GetTaskSpecification().TaskId());
+        }
+        RemoveFromRunningTasksIfExists(task);
+        (*work_it)->SetStateCancelled();
+        work_queue.erase(work_it);
+        if (work_queue.empty()) {
+          tasks_to_dispatch_.erase(shapes_it);
        }
-      });
+        return true;
+      }
+    }
+  }

-  return tasks_cancelled;
-}
+  auto iter = waiting_tasks_index_.find(task_id);
+  if (iter != waiting_tasks_index_.end()) {
+    const auto &task = (*iter->second)->task;
+    ReplyCancelled(*iter->second, failure_type, scheduling_failure_message);
+    if (!task.GetTaskSpecification().GetDependencies().empty()) {
+      task_dependency_manager_.RemoveTaskDependencies(
+          task.GetTaskSpecification().TaskId());
+    }
+    waiting_task_queue_.erase(iter->second);
+    waiting_tasks_index_.erase(iter);

-bool LocalTaskManager::CancelTask(
-    const TaskID &task_id,
-    rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type,
-    const std::string &scheduling_failure_message) {
-  return CancelTasks(
-      [task_id](const std::shared_ptr<internal::Work> &work) {
-        return work->task.GetTaskSpecification().TaskId() == task_id;
-      },
-      failure_type,
-      scheduling_failure_message);
+    return true;
+  }
+
+  return false;
 }

 bool LocalTaskManager::AnyPendingTasksForResourceAcquisition(
src/ray/raylet/local_task_manager.h

Lines changed: 10 additions & 20 deletions
@@ -111,15 +111,17 @@ class LocalTaskManager : public ILocalTaskManager {
   /// \param task: Output parameter.
   void TaskFinished(std::shared_ptr<WorkerInterface> worker, RayTask *task);

-  /// Attempt to cancel all queued tasks that match the predicate.
+  /// Attempt to cancel an already queued task.
   ///
-  /// \param predicate: A function that returns true if a task needs to be cancelled.
-  /// \param failure_type: The reason for cancellation.
-  /// \param scheduling_failure_message: The reason message for cancellation.
-  /// \return True if any task was successfully cancelled.
-  bool CancelTasks(std::function<bool(const std::shared_ptr<internal::Work> &)> predicate,
-                   rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type,
-                   const std::string &scheduling_failure_message) override;
+  /// \param task_id: The id of the task to remove.
+  /// \param failure_type: The failure type.
+  ///
+  /// \return True if task was successfully removed. This function will return
+  /// false if the task is already running.
+  bool CancelTask(const TaskID &task_id,
+                  rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type =
+                      rpc::RequestWorkerLeaseReply::SCHEDULING_CANCELLED_INTENDED,
+                  const std::string &scheduling_failure_message = "") override;

   /// Return if any tasks are pending resource acquisition.
   ///
@@ -201,18 +203,6 @@ class LocalTaskManager : public ILocalTaskManager {
                     const rpc::Address &owner_address,
                     const std::string &runtime_env_setup_error_message);

-  /// Attempt to cancel an already queued task.
-  ///
-  /// \param task_id: The id of the task to remove.
-  /// \param failure_type: The failure type.
-  ///
-  /// \return True if task was successfully removed. This function will return
-  /// false if the task is already running.
-  bool CancelTask(const TaskID &task_id,
-                  rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type =
-                      rpc::RequestWorkerLeaseReply::SCHEDULING_CANCELLED_INTENDED,
-                  const std::string &scheduling_failure_message = "");
-
   /// Attempts to dispatch all tasks which are ready to run. A task
   /// will be dispatched if it is on `tasks_to_dispatch_` and there are still
   /// available resources on the node.
