Commit 1c389fc (parent: 90f5f9a)

Fix worker exit cleanup (#6450)

Squashed commit history:

* working but ugly
* comments
* proper but hanging in grpc server destructor
* grpc server shutdown deadline
* fix disconnect
* lint
* shutdown_only in test
* replace shutdown

8 files changed (+40 -46 lines)
python/ray/_raylet.pyx (+3 -6)
@@ -668,6 +668,8 @@ cdef execute_task(
         # If we've reached the max number of executions for this worker, exit.
         task_counter = manager.get_task_counter(job_id, function_descriptor)
         if task_counter == execution_info.max_calls:
+            # Intentionally disconnect so the raylet doesn't print an error.
+            # TODO(edoakes): we should handle max_calls in the core worker.
             worker.core_worker.disconnect()
             sys.exit(0)

@@ -715,11 +717,6 @@ cdef CRayStatus check_signals() nogil:
         return CRayStatus.OK()


-cdef void exit_handler() nogil:
-    with gil:
-        sys.exit(0)
-
-
 cdef shared_ptr[CBuffer] string_to_buffer(c_string& c_str):
     cdef shared_ptr[CBuffer] empty_metadata
     if c_str.size() == 0:

@@ -769,7 +766,7 @@ cdef class CoreWorker:
                 raylet_socket.encode("ascii"), job_id.native(),
                 gcs_options.native()[0], log_dir.encode("utf-8"),
                 node_ip_address.encode("utf-8"), node_manager_port,
-                task_execution_handler, check_signals, exit_handler, True))
+                task_execution_handler, check_signals, True))

     def disconnect(self):
         self.destory_event_loop_if_exists()
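A note on the deleted exit_handler: it re-acquired the GIL and called sys.exit(0) from inside a callback. That route is replaced by letting run_task_loop() return and having the main thread call sys.exit(0) itself (see the worker.py hunk below). One reason exit-via-callback is fragile in Python: sys.exit() only raises SystemExit in the calling thread, so from a non-main thread it ends that thread rather than the process. A runnable illustration of the hazard (not Ray code):

import sys
import threading

def exit_from_background():
    # In a non-main thread, the SystemExit raised by sys.exit() silently
    # terminates only this thread; the interpreter keeps running.
    sys.exit(0)

t = threading.Thread(target=exit_from_background)
t.start()
t.join()
print("still alive: sys.exit() in a background thread doesn't kill the process")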

python/ray/includes/libcoreworker.pxd (-1)
@@ -75,7 +75,6 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
             const c_vector[CObjectID] &return_ids,
             c_vector[shared_ptr[CRayObject]] *returns) nogil,
         CRayStatus() nogil,
-        void () nogil,
         c_bool ref_counting_enabled)
     void Disconnect()
     CWorkerType &GetWorkerType()

python/ray/tests/test_tempfile.py (+5 -8)
@@ -37,13 +37,12 @@ def test_conn_cluster():
         "temp_dir must not be provided.")


-def test_tempdir():
+def test_tempdir(shutdown_only):
     shutil.rmtree("/tmp/ray", ignore_errors=True)
     ray.init(temp_dir="/tmp/i_am_a_temp_dir")
     assert os.path.exists(
         "/tmp/i_am_a_temp_dir"), "Specified temp dir not found."
     assert not os.path.exists("/tmp/ray"), "Default temp dir should not exist."
-    ray.shutdown()
     shutil.rmtree("/tmp/i_am_a_temp_dir", ignore_errors=True)


@@ -57,7 +56,7 @@ def test_tempdir_commandline():
     shutil.rmtree("/tmp/i_am_a_temp_dir2", ignore_errors=True)


-def test_raylet_socket_name():
+def test_raylet_socket_name(shutdown_only):
     ray.init(raylet_socket_name="/tmp/i_am_a_temp_socket")
     assert os.path.exists(
         "/tmp/i_am_a_temp_socket"), "Specified socket path not found."

@@ -77,7 +76,7 @@ def test_raylet_socket_name():
         pass  # It could have been removed by Ray.


-def test_temp_plasma_store_socket():
+def test_temp_plasma_store_socket(shutdown_only):
     ray.init(plasma_store_socket_name="/tmp/i_am_a_temp_socket")
     assert os.path.exists(
         "/tmp/i_am_a_temp_socket"), "Specified socket path not found."

@@ -97,7 +96,7 @@ def test_temp_plasma_store_socket():
         pass  # It could have been removed by Ray.


-def test_raylet_tempfiles():
+def test_raylet_tempfiles(shutdown_only):
     ray.init(num_cpus=0)
     node = ray.worker._global_node
     top_levels = set(os.listdir(node.get_session_dir_path()))

@@ -132,15 +131,13 @@ def test_raylet_tempfiles():

     socket_files = set(os.listdir(node.get_sockets_dir_path()))
     assert socket_files == {"plasma_store", "raylet"}
-    ray.shutdown()


-def test_tempdir_privilege():
+def test_tempdir_privilege(shutdown_only):
     os.chmod("/tmp/ray", 0o000)
     ray.init(num_cpus=1)
     session_dir = ray.worker._global_node.get_session_dir_path()
     assert os.path.exists(session_dir), "Specified socket path not found."
-    ray.shutdown()


 def test_session_dir_uniqueness():
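These tests now take the shutdown_only pytest fixture instead of calling ray.shutdown() inline, so shutdown still runs when an assertion fails mid-test. The fixture is defined in Ray's test conftest; a minimal sketch of the pattern (the real definition may differ in detail):

import pytest
import ray

@pytest.fixture
def shutdown_only():
    # Hand control to the test body, then shut Ray down unconditionally,
    # even if the test raised, so state never leaks into the next test.
    yield None
    ray.shutdown()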

python/ray/worker.py (+8 -7)
@@ -431,6 +431,7 @@ def sigterm_handler(signum, frame):

         signal.signal(signal.SIGTERM, sigterm_handler)
         self.core_worker.run_task_loop()
+        sys.exit(0)


 def get_gpu_ids():

@@ -834,6 +835,12 @@ def shutdown(exiting_interpreter=False):

     disconnect(exiting_interpreter)

+    # We need to destruct the core worker here because after this function,
+    # we will tear down any processes spawned by ray.init() and the background
+    # IO thread in the core worker doesn't currently handle that gracefully.
+    if hasattr(global_worker, "core_worker"):
+        del global_worker.core_worker
+
     # Disconnect global state from GCS.
     ray.state.state.disconnect()

@@ -843,7 +850,7 @@ def shutdown(exiting_interpreter=False):
     _global_node.kill_all_processes(check_alive=False, allow_graceful=True)
     _global_node = None

-    # TODO(rkn): Instead of manually reseting some of the worker fields, we
+    # TODO(rkn): Instead of manually resetting some of the worker fields, we
     # should simply set "global_worker" to equal "None" or something like that.
     global_worker.set_mode(None)
     global_worker._post_get_hooks = []

@@ -1333,12 +1340,6 @@ def disconnect(exiting_interpreter=False):
     worker.cached_functions_to_run = []
     worker.serialization_context_map.clear()

-    # We need to destruct the core worker here because after this function,
-    # we will tear down any processes spawned by ray.init() and the background
-    # threads in the core worker don't currently handle that gracefully.
-    if hasattr(worker, "core_worker"):
-        del worker.core_worker
-

 @contextmanager
 def _changeproctitle(title, next_title):
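Moving the del of global_worker.core_worker from disconnect() into shutdown() places the destruction right before the spawned processes are killed. The del is effective because CPython's reference counting runs the Cython wrapper's finalizer, and with it the C++ destructor, the moment the last reference drops. A small illustration with a hypothetical stand-in class:

class CoreWorkerStandIn:
    """Hypothetical stand-in for the Cython CoreWorker wrapper."""

    def __del__(self):
        # In the real wrapper this is where the C++ destructor runs:
        # stop io_service_, join io_thread_.
        print("finalizer ran")

field = CoreWorkerStandIn()
del field  # last reference dropped: CPython runs __del__ here, synchronously,
           # before any code that follows (e.g. killing child processes)
print("teardown continues after the finalizer")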

src/ray/core_worker/core_worker.cc (+14 -16)
@@ -66,9 +66,7 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language,
                        const std::string &log_dir, const std::string &node_ip_address,
                        int node_manager_port,
                        const TaskExecutionCallback &task_execution_callback,
-                       std::function<Status()> check_signals,
-                       const std::function<void()> exit_handler,
-                       bool ref_counting_enabled)
+                       std::function<Status()> check_signals, bool ref_counting_enabled)
     : worker_type_(worker_type),
       language_(language),
       log_dir_(log_dir),

@@ -106,12 +104,13 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language,
   RAY_CHECK(task_execution_callback_ != nullptr);
   auto execute_task = std::bind(&CoreWorker::ExecuteTask, this, std::placeholders::_1,
                                 std::placeholders::_2, std::placeholders::_3);
+  auto exit = std::bind(&CoreWorker::Shutdown, this);
   raylet_task_receiver_ =
       std::unique_ptr<CoreWorkerRayletTaskReceiver>(new CoreWorkerRayletTaskReceiver(
-          worker_context_.GetWorkerID(), raylet_client_, execute_task, exit_handler));
+          worker_context_.GetWorkerID(), raylet_client_, execute_task, exit));
   direct_task_receiver_ =
       std::unique_ptr<CoreWorkerDirectTaskReceiver>(new CoreWorkerDirectTaskReceiver(
-          worker_context_, task_execution_service_, execute_task, exit_handler));
+          worker_context_, task_execution_service_, execute_task, exit));
 }

 // Start RPC server after all the task receivers are properly initialized.

@@ -212,26 +211,25 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language,
 }

 CoreWorker::~CoreWorker() {
-  Shutdown();
+  io_service_.stop();
   io_thread_.join();
+  if (log_dir_ != "") {
+    RayLog::ShutDownRayLog();
+  }
 }

 void CoreWorker::Shutdown() {
-  if (!shutdown_) {
-    shutdown_ = true;
-    io_service_.stop();
-    if (worker_type_ == WorkerType::WORKER) {
-      task_execution_service_.stop();
-    }
-    if (log_dir_ != "") {
-      RayLog::ShutDownRayLog();
-    }
+  io_service_.stop();
+  if (worker_type_ == WorkerType::WORKER) {
+    task_execution_service_.stop();
   }
 }

 void CoreWorker::Disconnect() {
   io_service_.stop();
-  gcs_client_->Disconnect();
+  if (gcs_client_) {
+    gcs_client_->Disconnect();
+  }
   if (raylet_client_) {
     RAY_IGNORE_EXPR(raylet_client_->Disconnect());
   }
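The destructor now performs only the minimal stop-then-join sequence, while Shutdown() is left as the callback the task receivers invoke. The essential ordering, stop the event loop before joining the thread that runs it, is what keeps the join from blocking forever. A Python sketch of that same discipline (an analogy, not a translation of the Ray code):

import threading

class IoLoop:
    """Sketch of the stop-then-join ordering used in ~CoreWorker()."""

    def __init__(self):
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run)
        self._thread.start()

    def _run(self):
        # Stands in for io_service_.run(): blocks until asked to stop.
        self._stop.wait()

    def close(self):
        self._stop.set()     # analogous to io_service_.stop()
        self._thread.join()  # joining *before* signalling stop would hang

loop = IoLoop()
loop.close()
print("io thread joined cleanly")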

src/ray/core_worker/core_worker.h (-3)
@@ -62,8 +62,6 @@ class CoreWorker {
   /// \param[in] check_signals Language worker function to check for signals and handle
   /// them. If the function returns anything but StatusOK, any long-running
   /// operations in the core worker will short circuit and return that status.
-  /// \param[in] exit_handler Language worker function to orderly shutdown the worker.
-  /// We guarantee this will be run on the main thread of the worker.
   /// \param[in] ref_counting_enabled Whether to enable object ref counting.
   ///
   /// NOTE(zhijunfu): the constructor would throw if a failure happens.

@@ -73,7 +71,6 @@ class CoreWorker {
              const std::string &log_dir, const std::string &node_ip_address,
              int node_manager_port, const TaskExecutionCallback &task_execution_callback,
              std::function<Status()> check_signals = nullptr,
-             std::function<void()> exit_handler = nullptr,
              bool ref_counting_enabled = false);

   ~CoreWorker();

src/ray/core_worker/transport/direct_actor_transport.h (+4 -1)
@@ -424,7 +424,10 @@ class CoreWorkerDirectTaskReceiver {

   ~CoreWorkerDirectTaskReceiver() {
     fiber_shutdown_event_.Notify();
-    fiber_runner_thread_.join();
+    // Only join the fiber thread if it was spawned in the first place.
+    if (fiber_runner_thread_.joinable()) {
+      fiber_runner_thread_.join();
+    }
   }

   /// Initialize this receiver. This must be called prior to use.
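Calling join() on a std::thread that is not joinable, for example one that was never started because Init() never ran, throws std::system_error, hence the guard above. Python's threading has the same failure mode in miniature, which makes for a compact illustration (an analogy, not the Ray code):

import threading

t = threading.Thread(target=lambda: None)
try:
    t.join()  # joining a thread that was never started raises, much as
              # join() on a non-joinable std::thread throws std::system_error
except RuntimeError as err:
    print("guard needed:", err)

t.start()
t.join()      # fine once the thread has actually been started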

src/ray/rpc/grpc_server.h (+6 -4)
@@ -1,11 +1,11 @@
 #ifndef RAY_RPC_GRPC_SERVER_H
 #define RAY_RPC_GRPC_SERVER_H

-#include <thread>
-#include <utility>
-
 #include <grpcpp/grpcpp.h>
+
 #include <boost/asio.hpp>
+#include <thread>
+#include <utility>

 #include "ray/common/status.h"
 #include "ray/rpc/server_call.h"

@@ -42,7 +42,9 @@ class GrpcServer {
   // Shutdown this server
   void Shutdown() {
     if (!is_closed_) {
-      server_->Shutdown();
+      // Shutdown the server with an immediate deadline.
+      // TODO(edoakes): do we want to do this in all cases?
+      server_->Shutdown(gpr_now(GPR_CLOCK_REALTIME));
       for (const auto &cq : cqs_) {
         cq->Shutdown();
       }
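Passing a deadline of gpr_now(GPR_CLOCK_REALTIME) tells gRPC to stop waiting for in-flight calls once the deadline (here, immediately) passes, which is what un-sticks the destructor the squashed history describes as "hanging in grpc server destructor". Python's grpcio exposes the same knob as the grace argument to server.stop(); a small sketch of the analogous immediate shutdown:

from concurrent import futures

import grpc

server = grpc.server(futures.ThreadPoolExecutor(max_workers=1))
server.add_insecure_port("127.0.0.1:0")
server.start()

# grace=None is grpcio's analogue of Shutdown() with a deadline of "now":
# outstanding RPCs are cancelled instead of being drained.
done = server.stop(grace=None)
done.wait()
print("server stopped without waiting on in-flight calls")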
