Skip to content

Commit 7cf6817

Browse files
authored
[cherry-pick] [core] Fix deadlock when cancelling stale requests on in-order actors (#57746) (#57768)
## Description: Cherry-picks #57746. Signed-off-by: dayshah <dhyey2019@gmail.com>
1 parent ed6d638 commit 7cf6817

File tree

4 files changed

+151
-153
lines changed

4 files changed

+151
-153
lines changed

python/ray/tests/test_core_worker_fault_tolerance.py

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,85 @@
1+
import sys
2+
3+
import numpy as np
14
import pytest
25

36
import ray
7+
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy
8+
9+
10+
@pytest.mark.parametrize(
11+
"allow_out_of_order_execution",
12+
[True, False],
13+
)
14+
@pytest.mark.parametrize("deterministic_failure", ["request", "response"])
15+
def test_push_actor_task_failure(
16+
monkeypatch,
17+
ray_start_cluster,
18+
allow_out_of_order_execution: bool,
19+
deterministic_failure: str,
20+
):
21+
with monkeypatch.context() as m:
22+
m.setenv(
23+
"RAY_testing_rpc_failure",
24+
"CoreWorkerService.grpc_client.PushTask=2:"
25+
+ ("100:0" if deterministic_failure == "request" else "0:100"),
26+
)
27+
m.setenv("RAY_actor_scheduling_queue_max_reorder_wait_seconds", "0")
28+
cluster = ray_start_cluster
29+
cluster.add_node(num_cpus=1)
30+
ray.init(address=cluster.address)
31+
32+
@ray.remote(
33+
max_task_retries=-1,
34+
allow_out_of_order_execution=allow_out_of_order_execution,
35+
)
36+
class RetryActor:
37+
def echo(self, value):
38+
return value
39+
40+
refs = []
41+
actor = RetryActor.remote()
42+
for i in range(10):
43+
refs.append(actor.echo.remote(i))
44+
assert ray.get(refs) == list(range(10))
45+
46+
47+
@pytest.mark.parametrize("deterministic_failure", ["request", "response"])
48+
def test_update_object_location_batch_failure(
49+
monkeypatch, ray_start_cluster, deterministic_failure
50+
):
51+
with monkeypatch.context() as m:
52+
m.setenv(
53+
"RAY_testing_rpc_failure",
54+
"CoreWorkerService.grpc_client.UpdateObjectLocationBatch=1:"
55+
+ ("100:0" if deterministic_failure == "request" else "0:100"),
56+
)
57+
cluster = ray_start_cluster
58+
head_node_id = cluster.add_node(
59+
num_cpus=0,
60+
).node_id
61+
ray.init(address=cluster.address)
62+
worker_node_id = cluster.add_node(num_cpus=1).node_id
63+
64+
@ray.remote(num_cpus=1)
65+
def create_large_object():
66+
return np.zeros(100 * 1024 * 1024, dtype=np.uint8)
67+
68+
@ray.remote(num_cpus=0)
69+
def consume_large_object(obj):
70+
return sys.getsizeof(obj)
71+
72+
obj_ref = create_large_object.options(
73+
scheduling_strategy=NodeAffinitySchedulingStrategy(
74+
node_id=worker_node_id, soft=False
75+
)
76+
).remote()
77+
consume_ref = consume_large_object.options(
78+
scheduling_strategy=NodeAffinitySchedulingStrategy(
79+
node_id=head_node_id, soft=False
80+
)
81+
).remote(obj_ref)
82+
assert ray.get(consume_ref, timeout=10) > 0
483

584

685
@pytest.mark.parametrize("deterministic_failure", ["request", "response"])

python/ray/tests/test_failure.py

Lines changed: 0 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
init_error_pubsub,
1818
)
1919
from ray.exceptions import ActorDiedError, GetTimeoutError, RayActorError, RayTaskError
20-
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy
2120

2221

2322
def test_unhandled_errors(ray_start_regular):
@@ -688,70 +687,6 @@ def func():
688687
caplog.clear()
689688

690689

691-
def test_transient_error_retry(monkeypatch, ray_start_cluster):
692-
with monkeypatch.context() as m:
693-
# This test submits 200 tasks with infinite retries and verifies that all tasks eventually succeed in the unstable network environment.
694-
m.setenv(
695-
"RAY_testing_rpc_failure",
696-
"CoreWorkerService.grpc_client.PushTask=100:25:25",
697-
)
698-
cluster = ray_start_cluster
699-
cluster.add_node(
700-
num_cpus=1,
701-
resources={"head": 1},
702-
)
703-
ray.init(address=cluster.address)
704-
705-
@ray.remote(max_task_retries=-1, resources={"head": 1})
706-
class RetryActor:
707-
def echo(self, value):
708-
return value
709-
710-
refs = []
711-
actor = RetryActor.remote()
712-
for i in range(200):
713-
refs.append(actor.echo.remote(i))
714-
assert ray.get(refs) == list(range(200))
715-
716-
717-
@pytest.mark.parametrize("deterministic_failure", ["request", "response"])
718-
def test_update_object_location_batch_failure(
719-
monkeypatch, ray_start_cluster, deterministic_failure
720-
):
721-
with monkeypatch.context() as m:
722-
m.setenv(
723-
"RAY_testing_rpc_failure",
724-
"CoreWorkerService.grpc_client.UpdateObjectLocationBatch=1:"
725-
+ ("100:0" if deterministic_failure == "request" else "0:100"),
726-
)
727-
cluster = ray_start_cluster
728-
head_node_id = cluster.add_node(
729-
num_cpus=0,
730-
).node_id
731-
ray.init(address=cluster.address)
732-
worker_node_id = cluster.add_node(num_cpus=1).node_id
733-
734-
@ray.remote(num_cpus=1)
735-
def create_large_object():
736-
return np.zeros(100 * 1024 * 1024, dtype=np.uint8)
737-
738-
@ray.remote(num_cpus=0)
739-
def consume_large_object(obj):
740-
return sys.getsizeof(obj)
741-
742-
obj_ref = create_large_object.options(
743-
scheduling_strategy=NodeAffinitySchedulingStrategy(
744-
node_id=worker_node_id, soft=False
745-
)
746-
).remote()
747-
consume_ref = consume_large_object.options(
748-
scheduling_strategy=NodeAffinitySchedulingStrategy(
749-
node_id=head_node_id, soft=False
750-
)
751-
).remote(obj_ref)
752-
assert ray.get(consume_ref, timeout=10) > 0
753-
754-
755690
def test_raytaskerror_serialization(ray_start_regular):
756691
"""Test that RayTaskError with dual exception instances can be properly serialized."""
757692
import ray.cloudpickle as pickle

src/ray/core_worker/task_execution/task_receiver.cc

Lines changed: 71 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -171,16 +171,9 @@ void TaskReceiver::HandleTask(rpc::PushTaskRequest request,
171171
if (canceled_task_spec.IsActorTask()) {
172172
// If task cancelation is due to worker shutdown, propagate that information
173173
// to the submitter.
174-
bool is_worker_exiting = false;
175-
{
176-
absl::MutexLock lock(&stop_mu_);
177-
is_worker_exiting = stopping_;
178-
if (stopping_) {
179-
reply->set_worker_exiting(true);
180-
reply->set_was_cancelled_before_running(true);
181-
}
182-
}
183-
if (is_worker_exiting) {
174+
if (stopping_) {
175+
reply->set_worker_exiting(true);
176+
reply->set_was_cancelled_before_running(true);
184177
canceled_send_reply_callback(Status::OK(), nullptr, nullptr);
185178
} else {
186179
canceled_send_reply_callback(status, nullptr, nullptr);
@@ -191,82 +184,79 @@ void TaskReceiver::HandleTask(rpc::PushTaskRequest request,
191184
}
192185
};
193186

194-
{
195-
absl::MutexLock lock(&stop_mu_);
196-
task_spec = TaskSpecification(std::move(*request.mutable_task_spec()));
197-
if (stopping_) {
198-
reply->set_was_cancelled_before_running(true);
199-
if (task_spec.IsActorTask()) {
200-
reply->set_worker_exiting(true);
201-
}
202-
send_reply_callback(Status::OK(), nullptr, nullptr);
203-
return;
187+
task_spec = TaskSpecification(std::move(*request.mutable_task_spec()));
188+
if (stopping_) {
189+
reply->set_was_cancelled_before_running(true);
190+
if (task_spec.IsActorTask()) {
191+
reply->set_worker_exiting(true);
204192
}
193+
send_reply_callback(Status::OK(), nullptr, nullptr);
194+
return;
195+
}
205196

206-
if (task_spec.IsActorCreationTask()) {
207-
SetupActor(task_spec.IsAsyncioActor(),
208-
task_spec.MaxActorConcurrency(),
209-
task_spec.AllowOutOfOrderExecution());
210-
}
197+
if (task_spec.IsActorCreationTask()) {
198+
SetupActor(task_spec.IsAsyncioActor(),
199+
task_spec.MaxActorConcurrency(),
200+
task_spec.AllowOutOfOrderExecution());
201+
}
211202

212-
if (!task_spec.IsActorTask()) {
213-
resource_ids = ResourceMappingType{};
214-
for (const auto &mapping : request.resource_mapping()) {
215-
std::vector<std::pair<int64_t, double>> rids;
216-
rids.reserve(mapping.resource_ids().size());
217-
for (const auto &ids : mapping.resource_ids()) {
218-
rids.emplace_back(ids.index(), ids.quantity());
219-
}
220-
(*resource_ids)[mapping.name()] = std::move(rids);
203+
if (!task_spec.IsActorTask()) {
204+
resource_ids = ResourceMappingType{};
205+
for (const auto &mapping : request.resource_mapping()) {
206+
std::vector<std::pair<int64_t, double>> rids;
207+
rids.reserve(mapping.resource_ids().size());
208+
for (const auto &ids : mapping.resource_ids()) {
209+
rids.emplace_back(ids.index(), ids.quantity());
221210
}
211+
(*resource_ids)[mapping.name()] = std::move(rids);
222212
}
213+
}
223214

224-
if (task_spec.IsActorTask()) {
225-
auto it = actor_scheduling_queues_.find(task_spec.CallerWorkerId());
226-
if (it == actor_scheduling_queues_.end()) {
227-
it = actor_scheduling_queues_
228-
.emplace(
229-
task_spec.CallerWorkerId(),
230-
allow_out_of_order_execution_
231-
? std::unique_ptr<SchedulingQueue>(
232-
std::make_unique<OutOfOrderActorSchedulingQueue>(
233-
task_execution_service_,
234-
waiter_,
235-
task_event_buffer_,
236-
pool_manager_,
237-
fiber_state_manager_,
238-
is_asyncio_,
239-
fiber_max_concurrency_,
240-
concurrency_groups_))
241-
: std::unique_ptr<
242-
SchedulingQueue>(std::make_unique<ActorSchedulingQueue>(
243-
task_execution_service_,
244-
waiter_,
245-
task_event_buffer_,
246-
pool_manager_,
247-
RayConfig::instance()
248-
.actor_scheduling_queue_max_reorder_wait_seconds())))
249-
.first;
250-
}
251-
252-
auto accept_callback = make_accept_callback();
253-
it->second->Add(request.sequence_number(),
254-
request.client_processed_up_to(),
255-
std::move(accept_callback),
256-
std::move(cancel_callback),
257-
std::move(send_reply_callback),
258-
std::move(task_spec));
259-
} else {
260-
RAY_LOG(DEBUG) << "Adding task " << task_spec.TaskId()
261-
<< " to normal scheduling task queue.";
262-
auto accept_callback = make_accept_callback();
263-
normal_scheduling_queue_->Add(request.sequence_number(),
264-
request.client_processed_up_to(),
265-
std::move(accept_callback),
266-
std::move(cancel_callback),
267-
std::move(send_reply_callback),
268-
std::move(task_spec));
215+
if (task_spec.IsActorTask()) {
216+
auto it = actor_scheduling_queues_.find(task_spec.CallerWorkerId());
217+
if (it == actor_scheduling_queues_.end()) {
218+
it = actor_scheduling_queues_
219+
.emplace(
220+
task_spec.CallerWorkerId(),
221+
allow_out_of_order_execution_
222+
? std::unique_ptr<SchedulingQueue>(
223+
std::make_unique<OutOfOrderActorSchedulingQueue>(
224+
task_execution_service_,
225+
waiter_,
226+
task_event_buffer_,
227+
pool_manager_,
228+
fiber_state_manager_,
229+
is_asyncio_,
230+
fiber_max_concurrency_,
231+
concurrency_groups_))
232+
: std::unique_ptr<SchedulingQueue>(
233+
std::make_unique<ActorSchedulingQueue>(
234+
task_execution_service_,
235+
waiter_,
236+
task_event_buffer_,
237+
pool_manager_,
238+
RayConfig::instance()
239+
.actor_scheduling_queue_max_reorder_wait_seconds())))
240+
.first;
269241
}
242+
243+
auto accept_callback = make_accept_callback();
244+
it->second->Add(request.sequence_number(),
245+
request.client_processed_up_to(),
246+
std::move(accept_callback),
247+
std::move(cancel_callback),
248+
std::move(send_reply_callback),
249+
std::move(task_spec));
250+
} else {
251+
RAY_LOG(DEBUG) << "Adding task " << task_spec.TaskId()
252+
<< " to normal scheduling task queue.";
253+
auto accept_callback = make_accept_callback();
254+
normal_scheduling_queue_->Add(request.sequence_number(),
255+
request.client_processed_up_to(),
256+
std::move(accept_callback),
257+
std::move(cancel_callback),
258+
std::move(send_reply_callback),
259+
std::move(task_spec));
270260
}
271261
}
272262

@@ -315,12 +305,8 @@ void TaskReceiver::SetupActor(bool is_asyncio,
315305
}
316306

317307
void TaskReceiver::Stop() {
318-
{
319-
absl::MutexLock lock(&stop_mu_);
320-
if (stopping_) {
321-
return;
322-
}
323-
stopping_ = true;
308+
if (stopping_.exchange(true)) {
309+
return;
324310
}
325311
for (const auto &[_, scheduling_queue] : actor_scheduling_queues_) {
326312
scheduling_queue->Stop();

src/ray/core_worker/task_execution/task_receiver.h

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -104,10 +104,8 @@ class TaskReceiver {
104104
void SetActorReprName(const std::string &repr_name);
105105

106106
private:
107-
/// Guard for shutdown state.
108-
absl::Mutex stop_mu_;
109107
// True once shutdown begins. Requests to execute new tasks will be rejected.
110-
bool stopping_ ABSL_GUARDED_BY(stop_mu_) = false;
108+
std::atomic<bool> stopping_ = false;
111109
/// Set up the configs for an actor.
112110
/// This should be called once for the actor creation task.
113111
void SetupActor(bool is_asyncio,

0 commit comments

Comments
 (0)