
Commit 26a9201

authored by SangBin Cho
GCS client test failure flakiness (ray-project#34656)
Why are these changes needed?

Right now the theory is as follows. The pubsub io service is created and run inside the GcsServer. That means if the pubsub io service is accessed after the GcsServer has been GC'ed, it will segfault.

Currently, upon teardown, when we call rpc::DrainAndResetServerCallExecutor, it recreates the executor thread pool. So if, during teardown, DrainAndResetServerCallExecutor runs -> the GcsServer's internal pubsub posts a new SendReply to the newly created thread pool -> GcsServer.reset -> the pubsub io service is GC'ed -> SendReply is invoked from the newly created thread pool, we segfault.

NOTE: you can tell the segfault comes from the pubsub service from the failure:

#2 0x7f92034d9129 in ray::rpc::ServerCallImpl<ray::rpc::InternalPubSubGcsServiceHandler, ray::rpc::GcsSubscriberPollRequest, ray::rpc::GcsSubscriberPollReply>::HandleRequestImpl()::'lambda'(ray::Status, std::__1::function<void ()>, std::__1::function<void ()>)::operator()(ray::Status, std::__1::function<void ()>, std::__1::function<void ()>) const::'lambda'()::operator()() const /proc/self/cwd/bazel-out/k8-opt/bin/_virtual_includes/grpc_common_lib/ray/rpc/server_call.h:212:48

As a fix, I only drain the thread pool, and reset it after all operations are fully cleaned up (only from tests). There is no need to reset it for regular process termination like raylet, gcs, and core workers.

Related issue number

Closes ray-project#34344

Signed-off-by: SangBin Cho <rkooo567@gmail.com>
1 parent 6c35629 commit 26a9201
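To make that ordering concrete, here is a minimal sketch of the safe teardown sequence the fix establishes. It is illustrative only: server_io stands in for the GcsServer-owned pubsub io service, the pool for the global ServerCall executor, and the posted lambda for SendReply.

#include <boost/asio/io_context.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/thread_pool.hpp>
#include <memory>

int main() {
  // Stand-in for the pubsub io service owned by the GcsServer.
  auto server_io = std::make_unique<boost::asio::io_context>();
  // Stand-in for the global ServerCall executor thread pool.
  boost::asio::thread_pool pool(4);

  // Stand-in for SendReply: a pool task that touches server-owned state.
  boost::asio::post(pool, [&server_io] {
    boost::asio::post(*server_io, [] { /* reply bookkeeping */ });
  });

  // The fixed ordering: drain first, so no pool task can observe dead
  // server state; destroy the server next; recreate the pool last (tests only).
  pool.join();        // DrainServerCallExecutor()
  server_io.reset();  // ~GcsServer(): the pubsub io service dies here, safely
  // rpc::ResetServerCallExecutor() would recreate the pool here in tests.
  return 0;
}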

File tree: 7 files changed (+18, −10)

src/ray/core_worker/core_worker.cc

Lines changed: 1 addition & 1 deletion
@@ -783,7 +783,7 @@ void CoreWorker::Exit(
         exit_type,
         detail = std::move(detail),
         creation_task_exception_pb_bytes]() {
-        rpc::DrainAndResetServerCallExecutor();
+        rpc::DrainServerCallExecutor();
         Disconnect(exit_type, detail, creation_task_exception_pb_bytes);
         KillChildProcs();
         Shutdown();

src/ray/gcs/gcs_client/test/gcs_client_test.cc

Lines changed: 2 additions & 1 deletion
@@ -105,13 +105,14 @@ class GcsClientTest : public ::testing::TestWithParam<bool> {
     gcs_client_.reset();

     server_io_service_->stop();
-    rpc::DrainAndResetServerCallExecutor();
+    rpc::DrainServerCallExecutor();
     server_io_service_thread_->join();
     gcs_server_->Stop();
     gcs_server_.reset();
     if (!no_redis_) {
       TestSetupUtil::FlushAllRedisServers();
     }
+    rpc::ResetServerCallExecutor();
   }

   void RestartGcsServer() {
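The ordering in this TearDown is the heart of the fix. Condensed from the diff above, with comments added for the three phases:

void TearDown() override {
  gcs_client_.reset();
  server_io_service_->stop();
  rpc::DrainServerCallExecutor();  // 1. wait out any in-flight SendReply tasks
  server_io_service_thread_->join();
  gcs_server_->Stop();
  gcs_server_.reset();             // 2. pubsub io service destroyed, now safely
  if (!no_redis_) {
    TestSetupUtil::FlushAllRedisServers();
  }
  rpc::ResetServerCallExecutor();  // 3. fresh global pool for the next test
}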

src/ray/gcs/gcs_server/gcs_server_main.cc

Lines changed: 1 addition & 1 deletion
@@ -107,7 +107,7 @@ int main(int argc, char *argv[]) {
                           int signal_number) {
     RAY_LOG(INFO) << "GCS server received SIGTERM, shutting down...";
     main_service.stop();
-    ray::rpc::DrainAndResetServerCallExecutor();
+    ray::rpc::DrainServerCallExecutor();
     gcs_server.Stop();
     ray::stats::Shutdown();
   };

src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc

Lines changed: 2 additions & 1 deletion
@@ -59,11 +59,12 @@ class GcsServerTest : public ::testing::Test {

   void TearDown() override {
     io_service_.stop();
-    rpc::DrainAndResetServerCallExecutor();
+    rpc::DrainServerCallExecutor();
     gcs_server_->Stop();
     thread_io_service_->join();
     gcs_server_.reset();
     ray::gcs::RedisCallbackManager::instance().Clear();
+    rpc::ResetServerCallExecutor();
   }

   bool AddJob(const rpc::AddJobRequest &request) {

src/ray/raylet/node_manager.cc

Lines changed: 1 addition & 1 deletion
@@ -2028,7 +2028,7 @@ void NodeManager::HandleShutdownRaylet(rpc::ShutdownRayletRequest request,
     return;
   }
   auto shutdown_after_reply = []() {
-    rpc::DrainAndResetServerCallExecutor();
+    rpc::DrainServerCallExecutor();
     // Note that the callback is posted to the io service after the shutdown GRPC request
     // is replied. Otherwise, the RPC might not be replied to GCS before it shutsdown
     // itself. Implementation note: When raylet is shutdown by ray stop, the CLI sends a

src/ray/rpc/server_call.cc

Lines changed: 3 additions & 2 deletions
@@ -30,8 +30,9 @@ std::unique_ptr<boost::asio::thread_pool> &_GetServerCallExecutor() {

 boost::asio::thread_pool &GetServerCallExecutor() { return *_GetServerCallExecutor(); }

-void DrainAndResetServerCallExecutor() {
-  GetServerCallExecutor().join();
+void DrainServerCallExecutor() { GetServerCallExecutor().join(); }
+
+void ResetServerCallExecutor() {
   _GetServerCallExecutor() = std::make_unique<boost::asio::thread_pool>(
       ::RayConfig::instance().num_server_call_thread());
 }
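Splitting the old helper matters because joining a boost::asio::thread_pool is terminal: once join() returns, the pool's threads have exited and anything posted afterwards never runs. A tiny stand-alone sketch of that behavior (my illustration, not Ray code):

#include <boost/asio/post.hpp>
#include <boost/asio/thread_pool.hpp>
#include <iostream>

int main() {
  boost::asio::thread_pool pool(2);
  pool.join();  // drain: after this, the pool cannot execute new work
  // This handler is queued but never runs; without ResetServerCallExecutor()
  // recreating the pool, the next test's replies would be silently dropped.
  boost::asio::post(pool, [] { std::cout << "never printed\n"; });
  return 0;
}

This is why Drain alone suffices for process exit, while tests must also Reset before the next case posts new server calls.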

src/ray/rpc/server_call.h

Lines changed: 8 additions & 3 deletions
@@ -32,9 +32,14 @@ namespace rpc {
 /// This pool is shared across gRPC servers.
 boost::asio::thread_pool &GetServerCallExecutor();

-/// For testing
-/// Drain the executor and reset it.
-void DrainAndResetServerCallExecutor();
+/// Drain the executor.
+void DrainServerCallExecutor();
+
+/// Reset the server call executor.
+/// Testing only. After you drain the executor
+/// you need to regenerate the executor
+/// because they are global.
+void ResetServerCallExecutor();

 /// Represents the callback function to be called when a `ServiceHandler` finishes
 /// handling a request.
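Taken together, the header's contract for callers looks roughly like this (hypothetical call sites; only the two executor functions are Ray's API):

// Regular process termination (raylet, gcs, core worker): drain only.
void ShutdownProcess() {
  ray::rpc::DrainServerCallExecutor();
  // ... stop servers and exit; no Reset needed, the process is going away.
}

// Test teardown: drain, destroy server-owned state, then recreate the pool.
void TestTearDown() {
  ray::rpc::DrainServerCallExecutor();
  // ... destroy the server and the io services it owns ...
  ray::rpc::ResetServerCallExecutor();  // the pool is global, so regenerate it
}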
