Skip to content

Commit e4854b1

Browse files
committed
Revert "[gRPC] Migrate raylet client implementation to grpc (#5120)"
This reverts commit 40395ac.
1 parent 00beb50 commit e4854b1

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+1157
-1699
lines changed

BUILD.bazel

+18-49
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ proto_library(
5252

5353
cc_proto_library(
5454
name = "node_manager_cc_proto",
55-
deps = [":node_manager_proto"],
55+
deps = ["node_manager_proto"],
5656
)
5757

5858
proto_library(
@@ -62,21 +62,7 @@ proto_library(
6262

6363
cc_proto_library(
6464
name = "object_manager_cc_proto",
65-
deps = [":object_manager_proto"],
66-
)
67-
68-
proto_library(
69-
name = "raylet_proto",
70-
srcs = ["src/ray/protobuf/raylet.proto"],
71-
deps = [
72-
":common_proto",
73-
":gcs_proto",
74-
],
75-
)
76-
77-
cc_proto_library(
78-
name = "raylet_cc_proto",
79-
deps = [":raylet_proto"],
65+
deps = ["object_manager_proto"],
8066
)
8167

8268
proto_library(
@@ -105,7 +91,7 @@ cc_proto_library(
10591

10692
# === Begin of rpc definitions ===
10793

108-
# GRPC common lib.
94+
# grpc common lib
10995
cc_library(
11096
name = "grpc_common_lib",
11197
srcs = glob([
@@ -155,7 +141,7 @@ cc_grpc_library(
155141
deps = [":object_manager_cc_proto"],
156142
)
157143

158-
# Object manager rpc server and client.
144+
# Object manager server and client.
159145
cc_library(
160146
name = "object_manager_rpc",
161147
hdrs = glob([
@@ -171,43 +157,15 @@ cc_library(
171157
],
172158
)
173159

174-
# Raylet gRPC lib.
175-
cc_grpc_library(
176-
name = "raylet_cc_grpc",
177-
srcs = [":raylet_proto"],
178-
grpc_only = True,
179-
deps = [":raylet_cc_proto"],
180-
)
181-
182-
# Raylet rpc server and client.
183-
cc_library(
184-
name = "raylet_rpc",
185-
srcs = glob([
186-
"src/ray/rpc/raylet/*.cc",
187-
]),
188-
hdrs = glob([
189-
"src/ray/rpc/raylet/*.h",
190-
"src/ray/raylet/*.h",
191-
]),
192-
copts = COPTS,
193-
deps = [
194-
":grpc_common_lib",
195-
":ray_common",
196-
":raylet_cc_grpc",
197-
"@boost//:asio",
198-
"@com_github_grpc_grpc//:grpc++",
199-
],
200-
)
201-
202-
# Worker gRPC lib.
160+
# worker gRPC lib.
203161
cc_grpc_library(
204162
name = "worker_cc_grpc",
205163
srcs = [":worker_proto"],
206164
grpc_only = True,
207165
deps = [":worker_cc_proto"],
208166
)
209167

210-
# Worker server and client.
168+
# worker server and client.
211169
cc_library(
212170
name = "worker_rpc",
213171
hdrs = glob([
@@ -243,6 +201,7 @@ cc_library(
243201
copts = COPTS,
244202
deps = [
245203
":common_cc_proto",
204+
":node_manager_fbs",
246205
":ray_util",
247206
"@boost//:asio",
248207
"@com_github_grpc_grpc//:grpc++",
@@ -341,11 +300,11 @@ cc_library(
341300
deps = [
342301
":common_cc_proto",
343302
":gcs",
303+
":node_manager_fbs",
344304
":node_manager_rpc",
345305
":object_manager",
346306
":ray_common",
347307
":ray_util",
348-
":raylet_rpc",
349308
":stats_lib",
350309
":worker_rpc",
351310
"@boost//:asio",
@@ -416,6 +375,7 @@ cc_test(
416375
srcs = ["src/ray/raylet/lineage_cache_test.cc"],
417376
copts = COPTS,
418377
deps = [
378+
":node_manager_fbs",
419379
":raylet_lib",
420380
"@com_google_googletest//:gtest_main",
421381
],
@@ -426,6 +386,7 @@ cc_test(
426386
srcs = ["src/ray/raylet/reconstruction_policy_test.cc"],
427387
copts = COPTS,
428388
deps = [
389+
":node_manager_fbs",
429390
":object_manager",
430391
":raylet_lib",
431392
"@com_google_googletest//:gtest_main",
@@ -615,6 +576,7 @@ cc_library(
615576
deps = [
616577
":gcs_cc_proto",
617578
":hiredis",
579+
":node_manager_fbs",
618580
":node_manager_rpc",
619581
":ray_common",
620582
":ray_util",
@@ -670,6 +632,13 @@ flatbuffer_cc_library(
670632
out_prefix = "src/ray/common/",
671633
)
672634

635+
flatbuffer_cc_library(
636+
name = "node_manager_fbs",
637+
srcs = ["src/ray/raylet/format/node_manager.fbs"],
638+
flatc_args = FLATC_ARGS,
639+
out_prefix = "src/ray/raylet/format/",
640+
)
641+
673642
flatbuffer_cc_library(
674643
name = "object_manager_fbs",
675644
srcs = ["src/ray/object_manager/format/object_manager.fbs"],

java/runtime/src/main/java/org/ray/runtime/raylet/RayletClientImpl.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,9 @@ public class RayletClientImpl implements RayletClient {
3737
private long client = 0;
3838

3939
// TODO(qwang): JobId parameter can be removed once we embed jobId in driverId.
40-
public RayletClientImpl(String schedulerSockName, UniqueId workerId,
40+
public RayletClientImpl(String schedulerSockName, UniqueId clientId,
4141
boolean isWorker, JobId jobId) {
42-
client = nativeInit(schedulerSockName, workerId.getBytes(),
42+
client = nativeInit(schedulerSockName, clientId.getBytes(),
4343
isWorker, jobId.getBytes());
4444
}
4545

python/ray/__init__.py

-7
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,6 @@
55
import os
66
import sys
77

8-
# MUST import ray._raylet before pyarrow to initialize some global variables.
9-
# It seems the library related to memory allocation in pyarrow will destroy the
10-
# initialization of grpc if we import pyarrow at first.
11-
# NOTE(JoeyJiang): See https://github.com/ray-project/ray/issues/5219 for more
12-
# details.
13-
import ray._raylet
14-
158
if "pyarrow" in sys.modules:
169
raise ImportError("Ray must be imported before pyarrow because Ray "
1710
"requires a specific version of pyarrow (which is "

python/ray/_raylet.pyx

+3-3
Original file line numberDiff line numberDiff line change
@@ -220,14 +220,14 @@ cdef class RayletClient:
220220
cdef unique_ptr[CRayletClient] client
221221

222222
def __cinit__(self, raylet_socket,
223-
WorkerID worker_id,
223+
ClientID client_id,
224224
c_bool is_worker,
225225
JobID job_id):
226226
# We know that we are using Python, so just skip the language
227227
# parameter.
228228
# TODO(suquark): Should we allow unicode chars in "raylet_socket"?
229229
self.client.reset(new CRayletClient(
230-
raylet_socket.encode("ascii"), worker_id.native(), is_worker,
230+
raylet_socket.encode("ascii"), client_id.native(), is_worker,
231231
job_id.native(), LANGUAGE_PYTHON))
232232

233233
def disconnect(self):
@@ -374,7 +374,7 @@ cdef class RayletClient:
374374

375375
@property
376376
def client_id(self):
377-
return ClientID(self.client.get().GetWorkerId().Binary())
377+
return ClientID(self.client.get().GetClientID().Binary())
378378

379379
@property
380380
def job_id(self):

python/ray/includes/common.pxd

+5-5
Original file line numberDiff line numberDiff line change
@@ -88,16 +88,16 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil:
8888

8989

9090
cdef extern from "ray/protobuf/common.pb.h" nogil:
91-
cdef cppclass CLanguage "ray::rpc::Language":
91+
cdef cppclass CLanguage "Language":
9292
pass
9393

9494

9595
# This is a workaround for C++ enum class since Cython has no corresponding
9696
# representation.
97-
cdef extern from "ray/protobuf/common.pb.h" namespace "ray::rpc::Language" nogil:
98-
cdef CLanguage LANGUAGE_PYTHON "ray::rpc::Language::PYTHON"
99-
cdef CLanguage LANGUAGE_CPP "ray::rpc::Language::CPP"
100-
cdef CLanguage LANGUAGE_JAVA "ray::rpc::Language::JAVA"
97+
cdef extern from "ray/protobuf/common.pb.h" namespace "Language" nogil:
98+
cdef CLanguage LANGUAGE_PYTHON "Language::PYTHON"
99+
cdef CLanguage LANGUAGE_CPP "Language::CPP"
100+
cdef CLanguage LANGUAGE_JAVA "Language::JAVA"
101101

102102

103103
cdef extern from "ray/common/task/scheduling_resources.h" \

python/ray/includes/libraylet.pxd

+7-7
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,14 @@ from ray.includes.task cimport CTaskSpec
2323

2424

2525
cdef extern from "ray/protobuf/gcs.pb.h" nogil:
26-
cdef cppclass GCSProfileEvent "ray::rpc::ProfileTableData::ProfileEvent":
26+
cdef cppclass GCSProfileEvent "ProfileTableData::ProfileEvent":
2727
void set_event_type(const c_string &value)
2828
void set_start_time(double value)
2929
void set_end_time(double value)
3030
c_string set_extra_data(const c_string &value)
3131
GCSProfileEvent()
3232

33-
cdef cppclass GCSProfileTableData "ray::rpc::ProfileTableData":
33+
cdef cppclass GCSProfileTableData "ProfileTableData":
3434
void set_component_type(const c_string &value)
3535
void set_component_id(const c_string &value)
3636
void set_node_ip_address(const c_string &value)
@@ -43,12 +43,13 @@ ctypedef unordered_map[c_string, c_vector[pair[int64_t, double]]] \
4343
ctypedef pair[c_vector[CObjectID], c_vector[CObjectID]] WaitResultPair
4444

4545

46-
cdef extern from "ray/rpc/raylet/raylet_client.h" namespace "ray::rpc" nogil:
47-
cdef cppclass CRayletClient "ray::rpc::RayletClient":
46+
cdef extern from "ray/raylet/raylet_client.h" nogil:
47+
cdef cppclass CRayletClient "RayletClient":
4848
CRayletClient(const c_string &raylet_socket,
49-
const CWorkerID &worker_id,
49+
const CClientID &client_id,
5050
c_bool is_worker, const CJobID &job_id,
5151
const CLanguage &language)
52+
CRayStatus Disconnect()
5253
CRayStatus SubmitTask(const CTaskSpec &task_spec)
5354
CRayStatus GetTask(unique_ptr[CTaskSpec] *task_spec)
5455
CRayStatus TaskDone()
@@ -72,8 +73,7 @@ cdef extern from "ray/rpc/raylet/raylet_client.h" namespace "ray::rpc" nogil:
7273
const CActorID &actor_id, const CActorCheckpointID &checkpoint_id)
7374
CRayStatus SetResource(const c_string &resource_name, const double capacity, const CClientID &client_Id)
7475
CLanguage GetLanguage() const
75-
CWorkerID GetWorkerId() const
76+
CClientID GetClientID() const
7677
CJobID GetJobID() const
7778
c_bool IsWorker() const
78-
CRayStatus Disconnect()
7979
const ResourceMappingType &GetResourceIDs() const

python/ray/tests/test_multi_node.py

+8-12
Original file line numberDiff line numberDiff line change
@@ -458,9 +458,11 @@ def g():
458458
# Make sure the first driver ran to completion.
459459
assert "success" in out
460460

461+
nonexistent_id_bytes = _random_string()
462+
nonexistent_id_hex = ray.utils.binary_to_hex(nonexistent_id_bytes)
461463
# Define a driver that creates one task that depends on a nonexistent
462464
# object. This task will be queued as waiting to execute.
463-
driver_script_template = """
465+
driver_script = """
464466
import time
465467
import ray
466468
ray.init(redis_address="{}")
@@ -470,24 +472,22 @@ def g(x):
470472
g.remote(ray.ObjectID(ray.utils.hex_to_binary("{}")))
471473
time.sleep(1)
472474
print("success")
473-
"""
475+
""".format(redis_address, nonexistent_id_hex)
474476

475477
# Create some drivers and let them exit and make sure everything is
476478
# still alive.
477479
for _ in range(3):
478-
nonexistent_id_bytes = _random_string()
479-
nonexistent_id_hex = ray.utils.binary_to_hex(nonexistent_id_bytes)
480-
driver_script = driver_script_template.format(redis_address,
481-
nonexistent_id_hex)
482480
out = run_string_as_driver(driver_script)
483481
# Simulate the nonexistent dependency becoming available.
484482
ray.worker.global_worker.put_object(
485483
ray.ObjectID(nonexistent_id_bytes), None)
486484
# Make sure the first driver ran to completion.
487485
assert "success" in out
488486

487+
nonexistent_id_bytes = _random_string()
488+
nonexistent_id_hex = ray.utils.binary_to_hex(nonexistent_id_bytes)
489489
# Define a driver that calls `ray.wait` on a nonexistent object.
490-
driver_script_template = """
490+
driver_script = """
491491
import time
492492
import ray
493493
ray.init(redis_address="{}")
@@ -497,15 +497,11 @@ def g():
497497
g.remote()
498498
time.sleep(1)
499499
print("success")
500-
"""
500+
""".format(redis_address, nonexistent_id_hex)
501501

502502
# Create some drivers and let them exit and make sure everything is
503503
# still alive.
504504
for _ in range(3):
505-
nonexistent_id_bytes = _random_string()
506-
nonexistent_id_hex = ray.utils.binary_to_hex(nonexistent_id_bytes)
507-
driver_script = driver_script_template.format(redis_address,
508-
nonexistent_id_hex)
509505
out = run_string_as_driver(driver_script)
510506
# Simulate the nonexistent dependency becoming available.
511507
ray.worker.global_worker.put_object(

python/ray/worker.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
from ray import (
4242
ActorHandleID,
4343
ActorID,
44+
ClientID,
4445
WorkerID,
4546
JobID,
4647
ObjectID,
@@ -1946,7 +1947,7 @@ def connect(node,
19461947

19471948
worker.raylet_client = ray._raylet.RayletClient(
19481949
node.raylet_socket_name,
1949-
WorkerID(worker.worker_id),
1950+
ClientID(worker.worker_id),
19501951
(mode == WORKER_MODE),
19511952
worker.current_job_id,
19521953
)

src/ray/common/grpc_util.h

+1-11
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include <google/protobuf/map.h>
55
#include <google/protobuf/repeated_field.h>
66
#include <grpcpp/grpcpp.h>
7+
78
#include "status.h"
89

910
namespace ray {
@@ -72,17 +73,6 @@ inline std::vector<T> VectorFromProtobuf(
7273
return std::vector<T>(pb_repeated.begin(), pb_repeated.end());
7374
}
7475

75-
template <typename Message>
76-
using AddFunction = void (Message::*)(const ::std::string &value);
77-
/// Add a vector of type ID to protobuf message.
78-
template <typename ID, typename Message>
79-
inline void IdVectorToProtobuf(const std::vector<ID> &ids, Message &message,
80-
AddFunction<Message> add_func) {
81-
for (const auto &id : ids) {
82-
(message.*add_func)(id.Binary());
83-
}
84-
}
85-
8676
/// Converts a Protobuf `RepeatedField` to a vector of IDs.
8777
template <class ID>
8878
inline std::vector<ID> IdVectorFromProtobuf(

src/ray/common/ray_config_def.h

+1-8
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,8 @@ RAY_CONFIG(int64_t, ray_cookie, 0x5241590000000000)
2020
/// warning is logged that the handler is taking too long.
2121
RAY_CONFIG(int64_t, handler_warning_timeout_ms, 100)
2222

23-
/// The duration between heartbeats. This value is used for both worker and raylet.
23+
/// The duration between heartbeats. These are sent by the raylet.
2424
RAY_CONFIG(int64_t, heartbeat_timeout_milliseconds, 100)
25-
/// Worker heartbeats also use `heartbeat_timeout_milliseconds` as timer timeout period.
26-
/// If a worker has not sent a heartbeat in the last `num_worker_heartbeats_timeout`
27-
/// heartbeat intervals, raylet will mark this worker as dead.
28-
RAY_CONFIG(int64_t, num_worker_heartbeats_timeout, 30)
2925
/// If a component has not sent a heartbeat in the last num_heartbeats_timeout
3026
/// heartbeat intervals, the raylet monitor process will report
3127
/// it as dead to the db_client table.
@@ -156,9 +152,6 @@ RAY_CONFIG(uint32_t, num_actor_checkpoints_to_keep, 20)
156152
/// Maximum number of ids in one batch to send to GCS to delete keys.
157153
RAY_CONFIG(uint32_t, maximum_gcs_deletion_batch_size, 1000)
158154

159-
/// Number of times for a raylet client to retry to register.
160-
RAY_CONFIG(int, num_raylet_client_retry_times, 25)
161-
162155
/// When getting objects from object store, print a warning every this number of attempts.
163156
RAY_CONFIG(uint32_t, object_store_get_warn_per_num_attempts, 50)
164157

0 commit comments

Comments
 (0)