Skip to content

Commit 535c8da

Browse files
committed
Revert "[ServerGlobalCallbacks] Stabilize experiment (grpc#40041)"
This reverts commit 5e3ea81.
1 parent 8c0be34 commit 535c8da

File tree

1 file changed

+58
-16
lines changed

1 file changed

+58
-16
lines changed

src/cpp/server/server_cc.cc

Lines changed: 58 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,14 @@ Server::GlobalCallbacks* g_raw_callbacks = nullptr;
109109
gpr_once g_once_init_callbacks = GPR_ONCE_INIT;
110110

111111
void InitGlobalCallbacks() {
112-
if (!g_raw_callbacks) {
113-
g_raw_callbacks = new DefaultGlobalCallbacks;
112+
if (grpc_core::IsServerGlobalCallbacksOwnershipEnabled()) {
113+
if (!g_raw_callbacks) {
114+
g_raw_callbacks = new DefaultGlobalCallbacks;
115+
}
116+
} else {
117+
if (!g_callbacks) {
118+
g_callbacks = std::make_shared<DefaultGlobalCallbacks>();
119+
}
114120
}
115121
}
116122

@@ -416,7 +422,8 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag {
416422
return true;
417423
}
418424

419-
void Run(bool resources) {
425+
void Run(const std::shared_ptr<GlobalCallbacks>& global_callbacks,
426+
bool resources) {
420427
ctx_.Init(deadline_, &request_metadata_);
421428
wrapped_call_.Init(
422429
call_, server_, &cq_, server_->max_receive_message_size(),
@@ -426,6 +433,10 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag {
426433
server_->server_metric_recorder());
427434
ctx_->ctx.cq_ = &cq_;
428435
request_metadata_.count = 0;
436+
437+
if (!grpc_core::IsServerGlobalCallbacksOwnershipEnabled()) {
438+
global_callbacks_ = global_callbacks;
439+
}
429440
resources_ = resources;
430441

431442
interceptor_methods_.SetCall(&*wrapped_call_);
@@ -461,13 +472,21 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag {
461472

462473
void ContinueRunAfterInterception() {
463474
ctx_->ctx.BeginCompletionOp(&*wrapped_call_, nullptr, nullptr);
464-
g_raw_callbacks->PreSynchronousRequest(&ctx_->ctx);
475+
if (grpc_core::IsServerGlobalCallbacksOwnershipEnabled()) {
476+
g_raw_callbacks->PreSynchronousRequest(&ctx_->ctx);
477+
} else {
478+
global_callbacks_->PreSynchronousRequest(&ctx_->ctx);
479+
}
465480
auto* handler = resources_ ? method_->handler()
466481
: server_->resource_exhausted_handler_.get();
467482
handler->RunHandler(grpc::internal::MethodHandler::HandlerParameter(
468483
&*wrapped_call_, &ctx_->ctx, deserialized_request_, request_status_,
469484
nullptr, nullptr));
470-
g_raw_callbacks->PostSynchronousRequest(&ctx_->ctx);
485+
if (grpc_core::IsServerGlobalCallbacksOwnershipEnabled()) {
486+
g_raw_callbacks->PostSynchronousRequest(&ctx_->ctx);
487+
} else {
488+
global_callbacks_->PostSynchronousRequest(&ctx_->ctx);
489+
}
471490

472491
cq_.Shutdown();
473492

@@ -522,6 +541,7 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag {
522541
grpc_byte_buffer* request_payload_ = nullptr;
523542
grpc::CompletionQueue cq_;
524543
grpc::Status request_status_;
544+
std::shared_ptr<GlobalCallbacks> global_callbacks_;
525545
bool resources_;
526546
void* deserialized_request_ = nullptr;
527547
grpc::internal::InterceptorBatchMethodsImpl interceptor_methods_;
@@ -784,12 +804,14 @@ const char* Server::CallbackRequest<
784804
class Server::SyncRequestThreadManager : public grpc::ThreadManager {
785805
public:
786806
SyncRequestThreadManager(Server* server, grpc::CompletionQueue* server_cq,
807+
std::shared_ptr<GlobalCallbacks> global_callbacks,
787808
grpc_resource_quota* rq, int min_pollers,
788809
int max_pollers, int cq_timeout_msec)
789810
: ThreadManager("SyncServer", rq, min_pollers, max_pollers),
790811
server_(server),
791812
server_cq_(server_cq),
792-
cq_timeout_msec_(cq_timeout_msec) {}
813+
cq_timeout_msec_(cq_timeout_msec),
814+
global_callbacks_(std::move(global_callbacks)) {}
793815

794816
WorkStatus PollForWork(void** tag, bool* ok) override {
795817
*tag = nullptr;
@@ -820,7 +842,7 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager {
820842
DCHECK_NE(sync_req, nullptr);
821843
DCHECK(ok);
822844

823-
sync_req->Run(resources);
845+
sync_req->Run(global_callbacks_, resources);
824846
}
825847

826848
void AddSyncMethod(grpc::internal::RpcServiceMethod* method, void* tag) {
@@ -876,6 +898,7 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager {
876898
int cq_timeout_msec_;
877899
bool has_sync_method_ = false;
878900
std::unique_ptr<grpc::internal::RpcServiceMethod> unknown_method_;
901+
std::shared_ptr<Server::GlobalCallbacks> global_callbacks_;
879902
};
880903

881904
Server::Server(
@@ -903,7 +926,12 @@ Server::Server(
903926
health_check_service_disabled_(false),
904927
server_metric_recorder_(server_metric_recorder) {
905928
gpr_once_init(&grpc::g_once_init_callbacks, grpc::InitGlobalCallbacks);
906-
g_raw_callbacks->UpdateArguments(args);
929+
if (grpc_core::IsServerGlobalCallbacksOwnershipEnabled()) {
930+
g_raw_callbacks->UpdateArguments(args);
931+
} else {
932+
global_callbacks_ = grpc::g_callbacks;
933+
global_callbacks_->UpdateArguments(args);
934+
}
907935

908936
if (sync_server_cqs_ != nullptr) {
909937
bool default_rq_created = false;
@@ -915,9 +943,9 @@ Server::Server(
915943
}
916944

917945
for (const auto& it : *sync_server_cqs_) {
918-
sync_req_mgrs_.emplace_back(
919-
new SyncRequestThreadManager(this, it.get(), server_rq, min_pollers,
920-
max_pollers, sync_cq_timeout_msec));
946+
sync_req_mgrs_.emplace_back(new SyncRequestThreadManager(
947+
this, it.get(), global_callbacks_, server_rq, min_pollers,
948+
max_pollers, sync_cq_timeout_msec));
921949
}
922950

923951
if (default_rq_created) {
@@ -988,9 +1016,15 @@ Server::~Server() {
9881016
}
9891017

9901018
void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) {
991-
CHECK(!g_raw_callbacks);
992-
CHECK(callbacks);
993-
g_raw_callbacks = callbacks;
1019+
if (grpc_core::IsServerGlobalCallbacksOwnershipEnabled()) {
1020+
CHECK(!g_raw_callbacks);
1021+
CHECK(callbacks);
1022+
g_raw_callbacks = callbacks;
1023+
} else {
1024+
CHECK(!g_callbacks);
1025+
CHECK(callbacks);
1026+
g_callbacks.reset(callbacks);
1027+
}
9941028
}
9951029

9961030
grpc_server* Server::c_server() { return server_; }
@@ -1117,7 +1151,11 @@ int Server::AddListeningPort(const std::string& addr,
11171151
grpc::ServerCredentials* creds) {
11181152
CHECK(!started_);
11191153
int port = creds->AddPortToServer(addr, server_);
1120-
g_raw_callbacks->AddPort(this, addr, creds, port);
1154+
if (grpc_core::IsServerGlobalCallbacksOwnershipEnabled()) {
1155+
g_raw_callbacks->AddPort(this, addr, creds, port);
1156+
} else {
1157+
global_callbacks_->AddPort(this, addr, creds, port);
1158+
}
11211159
return port;
11221160
}
11231161

@@ -1150,7 +1188,11 @@ void Server::UnrefAndWaitLocked() {
11501188

11511189
void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) {
11521190
CHECK(!started_);
1153-
g_raw_callbacks->PreServerStart(this);
1191+
if (grpc_core::IsServerGlobalCallbacksOwnershipEnabled()) {
1192+
g_raw_callbacks->PreServerStart(this);
1193+
} else {
1194+
g_callbacks->PreServerStart(this);
1195+
}
11541196
started_ = true;
11551197

11561198
// Only create default health check service when user did not provide an

0 commit comments

Comments
 (0)