From 736e4890cf06bf414ec322c4e2ae654d936bbe70 Mon Sep 17 00:00:00 2001 From: dayshah Date: Thu, 1 May 2025 10:30:08 -0700 Subject: [PATCH 1/5] update obj lcoation resilience Signed-off-by: dayshah --- src/ray/object_manager/BUILD.bazel | 1 + .../ownership_object_directory.cc | 66 +++++++++++-------- .../ownership_object_directory.h | 29 ++++---- .../test/ownership_object_directory_test.cc | 5 +- src/ray/protobuf/core_worker.proto | 1 + src/ray/raylet/node_manager.cc | 2 - .../raylet/test/local_object_manager_test.cc | 1 - 7 files changed, 56 insertions(+), 49 deletions(-) diff --git a/src/ray/object_manager/BUILD.bazel b/src/ray/object_manager/BUILD.bazel index 619e208c9df4f..3a7f1ae28961c 100644 --- a/src/ray/object_manager/BUILD.bazel +++ b/src/ray/object_manager/BUILD.bazel @@ -69,6 +69,7 @@ ray_cc_library( "//src/ray/common:id", "//src/ray/gcs/gcs_client:gcs_client_lib", "//src/ray/pubsub:subscriber_lib", + "//src/ray/raylet_client:raylet_client_lib", "@com_google_absl//absl/container:flat_hash_map", ], ) diff --git a/src/ray/object_manager/ownership_object_directory.cc b/src/ray/object_manager/ownership_object_directory.cc index 0641a8a58525c..99c688093a133 100644 --- a/src/ray/object_manager/ownership_object_directory.cc +++ b/src/ray/object_manager/ownership_object_directory.cc @@ -20,6 +20,7 @@ #include #include +#include "ray/raylet_client/raylet_client.h" #include "ray/stats/metric_defs.h" namespace ray { @@ -29,20 +30,19 @@ OwnershipBasedObjectDirectory::OwnershipBasedObjectDirectory( std::shared_ptr &gcs_client, pubsub::SubscriberInterface *object_location_subscriber, rpc::CoreWorkerClientPool *owner_client_pool, - int64_t max_object_report_batch_size, std::function mark_as_failed) : io_service_(io_service), gcs_client_(gcs_client), client_call_manager_(io_service), object_location_subscriber_(object_location_subscriber), owner_client_pool_(owner_client_pool), - kMaxObjectReportBatchSize(max_object_report_batch_size), - mark_as_failed_(mark_as_failed) {} + kMaxObjectReportBatchSize(RayConfig::instance().max_object_report_batch_size()), + mark_as_failed_(std::move(mark_as_failed)) {} namespace { /// Filter out the removed nodes from the object locations. -void FilterRemovedNodes(std::shared_ptr gcs_client, +void FilterRemovedNodes(const std::shared_ptr &gcs_client, std::unordered_set *node_ids) { for (auto it = node_ids->begin(); it != node_ids->end();) { if (gcs_client->Nodes().IsRemoved(*it)) { @@ -55,7 +55,7 @@ void FilterRemovedNodes(std::shared_ptr gcs_client, /// Update object location data based on response from the owning core worker. bool UpdateObjectLocations(const rpc::WorkerObjectLocationsPubMessage &location_info, - std::shared_ptr gcs_client, + const std::shared_ptr &gcs_client, std::unordered_set *node_ids, std::string *spilled_url, NodeID *spilled_node_id, @@ -249,28 +249,38 @@ void OwnershipBasedObjectDirectory::SendObjectLocationUpdateBatchIfNeeded( owner_client->UpdateObjectLocationBatch( request, [this, worker_id, node_id, owner_address]( - Status status, const rpc::UpdateObjectLocationBatchReply &reply) { - auto in_flight_request_it = in_flight_requests_.find(worker_id); - RAY_CHECK(in_flight_request_it != in_flight_requests_.end()); - in_flight_requests_.erase(in_flight_request_it); - - // TODO(sang): Handle network failures. + const Status &status, const rpc::UpdateObjectLocationBatchReply &reply) { if (!status.ok()) { - // Currently we consider the owner is dead if the network is failed. - // Clean up the metadata. No need to mark objects as failed because - // that's only needed for the object pulling path (and this RPC is not on - // pulling path). - RAY_LOG(DEBUG).WithField(worker_id).WithField(node_id) - << "Owner failed to update locations for node. The owner is most likely " - "dead. Status: " - << status.ToString(); - auto it = location_buffers_.find(worker_id); - if (it != location_buffers_.end()) { - location_buffers_.erase(it); + const auto *node_info = gcs_client_->Nodes().Get(node_id, true); + if (node_info == nullptr) { + RAY_LOG(INFO) + << "UpdateObjectLocationBatch failed to node already marked dead. Node: " + << node_id; + location_buffers_.erase(worker_id); + owner_client_pool_->Disconnect(worker_id); + return; } - owner_client_pool_->Disconnect(worker_id); - return; + raylet::RayletClient node_manager_client( + rpc::NodeManagerWorkerClient::make(node_info->node_manager_address(), + node_info->node_manager_port(), + client_call_manager_)); + node_manager_client.IsLocalWorkerDead( + worker_id, + [this, node_id, worker_id](const Status &status, + rpc::IsLocalWorkerDeadReply &&reply) { + if (status.ok() && reply.is_dead()) { + RAY_LOG(INFO) + << "UpdateObjectLocationBatch failed to worker already marked " + "dead. Node: " + << node_id << ", worker: " << worker_id; + location_buffers_.erase(worker_id); + owner_client_pool_->Disconnect(worker_id); + } + }); } + auto in_flight_request_it = in_flight_requests_.find(worker_id); + RAY_CHECK(in_flight_request_it != in_flight_requests_.end()); + in_flight_requests_.erase(in_flight_request_it); SendObjectLocationUpdateBatchIfNeeded(worker_id, node_id, owner_address); }); @@ -303,8 +313,8 @@ void OwnershipBasedObjectDirectory::ObjectLocationSubscriptionCallback( &it->second.pending_creation, &it->second.object_size); - // If the lookup has failed, that means the object is lost. Trigger the callback in this - // case to handle failure properly. + // If the lookup has failed, that means the object is lost. Trigger the callback in + // this case to handle failure properly. if (location_updated || location_lookup_failed) { RAY_LOG(DEBUG).WithField(object_id) << "Pushing location updates to subscribers for object: " @@ -462,7 +472,7 @@ ray::Status OwnershipBasedObjectDirectory::UnsubscribeObjectLocations( void OwnershipBasedObjectDirectory::LookupRemoteConnectionInfo( RemoteConnectionInfo &connection_info) const { auto node_info = gcs_client_->Nodes().Get(connection_info.node_id); - if (node_info) { + if (node_info != nullptr) { NodeID result_node_id = NodeID::FromBinary(node_info->node_id()); RAY_CHECK(result_node_id == connection_info.node_id); connection_info.ip = node_info->node_manager_address(); @@ -486,7 +496,7 @@ OwnershipBasedObjectDirectory::LookupAllRemoteConnections() const { void OwnershipBasedObjectDirectory::HandleNodeRemoved(const NodeID &node_id) { for (auto &[object_id, listener] : listeners_) { - bool updated = listener.current_object_locations.erase(node_id); + bool updated = listener.current_object_locations.erase(node_id) > 0; if (listener.spilled_node_id == node_id) { listener.spilled_node_id = NodeID::Nil(); listener.spilled_url = ""; diff --git a/src/ray/object_manager/ownership_object_directory.h b/src/ray/object_manager/ownership_object_directory.h index 870be8b5fc054..59a80fcf1ba40 100644 --- a/src/ray/object_manager/ownership_object_directory.h +++ b/src/ray/object_manager/ownership_object_directory.h @@ -46,11 +46,8 @@ class OwnershipBasedObjectDirectory : public IObjectDirectory { std::shared_ptr &gcs_client, pubsub::SubscriberInterface *object_location_subscriber, rpc::CoreWorkerClientPool *owner_client_pool, - int64_t max_object_report_batch_size, std::function mark_as_failed); - virtual ~OwnershipBasedObjectDirectory() {} - void LookupRemoteConnectionInfo(RemoteConnectionInfo &connection_info) const override; std::vector LookupAllRemoteConnections() const override; @@ -97,7 +94,7 @@ class OwnershipBasedObjectDirectory : public IObjectDirectory { /// The current set of known locations of this object. std::unordered_set current_object_locations; /// The location where this object has been spilled, if any. - std::string spilled_url = ""; + std::string spilled_url; // The node id that spills the object to the disk. // It will be Nil if it uses a distributed external storage. NodeID spilled_node_id = NodeID::Nil(); @@ -127,7 +124,7 @@ class OwnershipBasedObjectDirectory : public IObjectDirectory { /// Client pool to owners. rpc::CoreWorkerClientPool *owner_client_pool_; /// The max batch size for ReportObjectAdded and ReportObjectRemoved. - const int64_t kMaxObjectReportBatchSize; + int64_t kMaxObjectReportBatchSize; /// The callback used to mark an object as failed. std::function mark_as_failed_; @@ -165,22 +162,22 @@ class OwnershipBasedObjectDirectory : public IObjectDirectory { /// Metrics /// Number of object locations added to this object directory. - uint64_t metrics_num_object_locations_added_; - double metrics_num_object_locations_added_per_second_; + uint64_t metrics_num_object_locations_added_ = 0; + double metrics_num_object_locations_added_per_second_ = 0; - /// Number of object locations removed from this object directory. - uint64_t metrics_num_object_locations_removed_; - double metrics_num_object_locations_removed_per_second_; + /// Number of object locations removed from this object directory. = 0; + uint64_t metrics_num_object_locations_removed_ = 0; + double metrics_num_object_locations_removed_per_second_ = 0; - /// Number of object location lookups. - uint64_t metrics_num_object_location_lookups_; - double metrics_num_object_location_lookups_per_second_; + /// Number of object location lookups. = 0; + uint64_t metrics_num_object_location_lookups_ = 0; + double metrics_num_object_location_lookups_per_second_ = 0; /// Number of object location updates. - uint64_t metrics_num_object_location_updates_; - double metrics_num_object_location_updates_per_second_; + uint64_t metrics_num_object_location_updates_ = 0; + double metrics_num_object_location_updates_per_second_ = 0; - uint64_t cum_metrics_num_object_location_updates_; + uint64_t cum_metrics_num_object_location_updates_ = 0; friend class OwnershipBasedObjectDirectoryTest; }; diff --git a/src/ray/object_manager/test/ownership_object_directory_test.cc b/src/ray/object_manager/test/ownership_object_directory_test.cc index a511db7fb28af..6faa379cd78d8 100644 --- a/src/ray/object_manager/test/ownership_object_directory_test.cc +++ b/src/ray/object_manager/test/ownership_object_directory_test.cc @@ -130,10 +130,11 @@ class OwnershipBasedObjectDirectoryTest : public ::testing::Test { gcs_client_mock_, subscriber_.get(), &client_pool, - /*max_object_report_batch_size=*/20, [this](const ObjectID &object_id, const rpc::ErrorType &error_type) { MarkAsFailed(object_id, error_type); - }) {} + }) { + obod_.kMaxObjectReportBatchSize = 20; + } void TearDown() { owner_client->Reset(); } diff --git a/src/ray/protobuf/core_worker.proto b/src/ray/protobuf/core_worker.proto index 6946ab0672b3b..eec8c3ab4ed39 100644 --- a/src/ray/protobuf/core_worker.proto +++ b/src/ray/protobuf/core_worker.proto @@ -481,6 +481,7 @@ service CoreWorkerService { rpc PubsubCommandBatch(PubsubCommandBatchRequest) returns (PubsubCommandBatchReply); // Update the batched object location information to the ownership-based object // directory. + // Failure: Retry rpc UpdateObjectLocationBatch(UpdateObjectLocationBatchRequest) returns (UpdateObjectLocationBatchReply); // Get object locations from the ownership-based object directory. diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 4bb5b95ea651c..3047e5454c9e6 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -179,8 +179,6 @@ NodeManager::NodeManager( gcs_client_, core_worker_subscriber_.get(), /*owner_client_pool=*/&worker_rpc_pool_, - /*max_object_report_batch_size=*/ - RayConfig::instance().max_object_report_batch_size(), [this](const ObjectID &obj_id, const ErrorType &error_type) { rpc::ObjectReference ref; ref.set_object_id(obj_id.Binary()); diff --git a/src/ray/raylet/test/local_object_manager_test.cc b/src/ray/raylet/test/local_object_manager_test.cc index 9e7b5e486a48e..61a577a681893 100644 --- a/src/ray/raylet/test/local_object_manager_test.cc +++ b/src/ray/raylet/test/local_object_manager_test.cc @@ -336,7 +336,6 @@ class LocalObjectManagerTestWithMinSpillingSize { gcs_client_, subscriber_.get(), &client_pool, - /*max_object_report_batch_size=*/20000, [](const ObjectID &object_id, const rpc::ErrorType &error_type) {})), manager( manager_node_id_, From dcb3f1cba72a002e768008360a67c49d5888db9f Mon Sep 17 00:00:00 2001 From: dayshah Date: Thu, 1 May 2025 16:25:48 -0700 Subject: [PATCH 2/5] use retryable grpc client Signed-off-by: dayshah --- BUILD.bazel | 2 ++ src/ray/core_worker/core_worker.cc | 32 +++-------------- src/ray/object_manager/BUILD.bazel | 1 - .../ownership_object_directory.cc | 30 ++-------------- src/ray/protobuf/core_worker.proto | 2 +- src/ray/raylet/node_manager.cc | 8 +++-- src/ray/rpc/worker/core_worker_client.cc | 6 ++-- src/ray/rpc/worker/core_worker_client.h | 11 +++--- src/ray/rpc/worker/core_worker_client_pool.cc | 36 +++++++++++++++++++ src/ray/rpc/worker/core_worker_client_pool.h | 9 +++++ 10 files changed, 70 insertions(+), 67 deletions(-) diff --git a/BUILD.bazel b/BUILD.bazel index 1760705429e9c..d0108af67dfc2 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -347,6 +347,8 @@ ray_cc_library( "//src/ray/common:asio", "//src/ray/common:id", "@com_github_grpc_grpc//:grpc++", + "//src/ray/gcs/gcs_client:gcs_client_lib", + "//src/ray/raylet_client:raylet_client_lib", ], ) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 879d6002d4b1e..359ceaa638e82 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -557,33 +557,11 @@ CoreWorker::CoreWorker(CoreWorkerOptions options, const WorkerID &worker_id) return std::make_shared( addr, *client_call_manager_, - /*core_worker_unavailable_timeout_callback=*/[this, addr]() { - const NodeID node_id = NodeID::FromBinary(addr.raylet_id()); - const WorkerID worker_id = WorkerID::FromBinary(addr.worker_id()); - const rpc::GcsNodeInfo *node_info = - gcs_client_->Nodes().Get(node_id, /*filter_dead_nodes=*/false); - if (node_info != nullptr && node_info->state() == rpc::GcsNodeInfo::DEAD) { - RAY_LOG(INFO).WithField(worker_id).WithField(node_id) - << "Disconnect core worker client since its node is dead"; - core_worker_client_pool_->Disconnect(worker_id); - return; - } - - raylet::RayletClient raylet_client( - rpc::NodeManagerWorkerClient::make(node_info->node_manager_address(), - node_info->node_manager_port(), - *client_call_manager_)); - raylet_client.IsLocalWorkerDead( - worker_id, - [this, worker_id](const Status &status, - rpc::IsLocalWorkerDeadReply &&reply) { - if (status.ok() && reply.is_dead()) { - RAY_LOG(INFO).WithField(worker_id) - << "Disconnect core worker client since it is dead"; - core_worker_client_pool_->Disconnect(worker_id); - } - }); - }); + rpc::CoreWorkerClientPool::GetDefaultUnavailableTimeoutCallback( + gcs_client_.get(), + core_worker_client_pool_.get(), + client_call_manager_.get(), + addr)); }); object_info_publisher_ = std::make_unique( diff --git a/src/ray/object_manager/BUILD.bazel b/src/ray/object_manager/BUILD.bazel index 3a7f1ae28961c..619e208c9df4f 100644 --- a/src/ray/object_manager/BUILD.bazel +++ b/src/ray/object_manager/BUILD.bazel @@ -69,7 +69,6 @@ ray_cc_library( "//src/ray/common:id", "//src/ray/gcs/gcs_client:gcs_client_lib", "//src/ray/pubsub:subscriber_lib", - "//src/ray/raylet_client:raylet_client_lib", "@com_google_absl//absl/container:flat_hash_map", ], ) diff --git a/src/ray/object_manager/ownership_object_directory.cc b/src/ray/object_manager/ownership_object_directory.cc index 99c688093a133..2ceec5228476c 100644 --- a/src/ray/object_manager/ownership_object_directory.cc +++ b/src/ray/object_manager/ownership_object_directory.cc @@ -20,7 +20,6 @@ #include #include -#include "ray/raylet_client/raylet_client.h" #include "ray/stats/metric_defs.h" namespace ray { @@ -251,32 +250,9 @@ void OwnershipBasedObjectDirectory::SendObjectLocationUpdateBatchIfNeeded( [this, worker_id, node_id, owner_address]( const Status &status, const rpc::UpdateObjectLocationBatchReply &reply) { if (!status.ok()) { - const auto *node_info = gcs_client_->Nodes().Get(node_id, true); - if (node_info == nullptr) { - RAY_LOG(INFO) - << "UpdateObjectLocationBatch failed to node already marked dead. Node: " - << node_id; - location_buffers_.erase(worker_id); - owner_client_pool_->Disconnect(worker_id); - return; - } - raylet::RayletClient node_manager_client( - rpc::NodeManagerWorkerClient::make(node_info->node_manager_address(), - node_info->node_manager_port(), - client_call_manager_)); - node_manager_client.IsLocalWorkerDead( - worker_id, - [this, node_id, worker_id](const Status &status, - rpc::IsLocalWorkerDeadReply &&reply) { - if (status.ok() && reply.is_dead()) { - RAY_LOG(INFO) - << "UpdateObjectLocationBatch failed to worker already marked " - "dead. Node: " - << node_id << ", worker: " << worker_id; - location_buffers_.erase(worker_id); - owner_client_pool_->Disconnect(worker_id); - } - }); + RAY_LOG(ERROR).WithField(worker_id) + << "Failed to get object location update. This should not happen, this " + "should not happen because this is using the retryable grpc client."; } auto in_flight_request_it = in_flight_requests_.find(worker_id); RAY_CHECK(in_flight_request_it != in_flight_requests_.end()); diff --git a/src/ray/protobuf/core_worker.proto b/src/ray/protobuf/core_worker.proto index eec8c3ab4ed39..91767cbdd1822 100644 --- a/src/ray/protobuf/core_worker.proto +++ b/src/ray/protobuf/core_worker.proto @@ -481,7 +481,7 @@ service CoreWorkerService { rpc PubsubCommandBatch(PubsubCommandBatchRequest) returns (PubsubCommandBatchReply); // Update the batched object location information to the ownership-based object // directory. - // Failure: Retry + // Failure: Retries, it's idempotent. rpc UpdateObjectLocationBatch(UpdateObjectLocationBatchRequest) returns (UpdateObjectLocationBatchReply); // Get object locations from the ownership-based object directory. diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 3047e5454c9e6..57e2d0cebbbbc 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -157,9 +157,11 @@ NodeManager::NodeManager( config.enable_resource_isolation), client_call_manager_(io_service), worker_rpc_pool_([this](const rpc::Address &addr) { - return std::make_shared(addr, client_call_manager_, []() { - RAY_LOG(FATAL) << "Raylet doesn't call any retryable core worker grpc methods."; - }); + return std::make_shared( + addr, + client_call_manager_, + rpc::CoreWorkerClientPool::GetDefaultUnavailableTimeoutCallback( + gcs_client_.get(), &worker_rpc_pool_, &client_call_manager_, addr)); }), core_worker_subscriber_(std::make_unique( self_node_id_, diff --git a/src/ray/rpc/worker/core_worker_client.cc b/src/ray/rpc/worker/core_worker_client.cc index 56ca2cb02e78e..8b7899e1511ae 100644 --- a/src/ray/rpc/worker/core_worker_client.cc +++ b/src/ray/rpc/worker/core_worker_client.cc @@ -22,10 +22,10 @@ namespace ray { namespace rpc { CoreWorkerClient::CoreWorkerClient( - const rpc::Address &address, + rpc::Address address, ClientCallManager &client_call_manager, std::function core_worker_unavailable_timeout_callback) - : addr_(address) { + : addr_(std::move(address)) { grpc_client_ = std::make_shared>( addr_.ip_address(), addr_.port(), client_call_manager); @@ -39,7 +39,7 @@ CoreWorkerClient::CoreWorkerClient( /*server_unavailable_timeout_seconds=*/ ::RayConfig::instance().core_worker_rpc_server_reconnect_timeout_s(), /*server_unavailable_timeout_callback=*/ - core_worker_unavailable_timeout_callback, + std::move(core_worker_unavailable_timeout_callback), /*server_name=*/"Core worker " + addr_.ip_address()); } diff --git a/src/ray/rpc/worker/core_worker_client.h b/src/ray/rpc/worker/core_worker_client.h index ab9def207c497..6be68af420b57 100644 --- a/src/ray/rpc/worker/core_worker_client.h +++ b/src/ray/rpc/worker/core_worker_client.h @@ -259,11 +259,12 @@ class CoreWorkerClient : public std::enable_shared_from_this, /*method_timeout_ms*/ -1, override) - VOID_RPC_CLIENT_METHOD(CoreWorkerService, - UpdateObjectLocationBatch, - grpc_client_, - /*method_timeout_ms*/ -1, - override) + VOID_RETRYABLE_RPC_CLIENT_METHOD(retryable_grpc_client_, + CoreWorkerService, + UpdateObjectLocationBatch, + grpc_client_, + /*method_timeout_ms*/ -1, + override) VOID_RPC_CLIENT_METHOD(CoreWorkerService, GetObjectLocationsOwner, diff --git a/src/ray/rpc/worker/core_worker_client_pool.cc b/src/ray/rpc/worker/core_worker_client_pool.cc index 6a0ea069f35e2..1acbd0884e24c 100644 --- a/src/ray/rpc/worker/core_worker_client_pool.cc +++ b/src/ray/rpc/worker/core_worker_client_pool.cc @@ -16,9 +16,45 @@ #include +#include "ray/raylet_client/raylet_client.h" + namespace ray { namespace rpc { +std::function CoreWorkerClientPool::GetDefaultUnavailableTimeoutCallback( + gcs::GcsClient *gcs_client, + rpc::CoreWorkerClientPool *worker_client_pool, + rpc::ClientCallManager *client_call_manager, + const rpc::Address &addr) { + return [addr, gcs_client, worker_client_pool, client_call_manager]() { + const NodeID node_id = NodeID::FromBinary(addr.raylet_id()); + const WorkerID worker_id = WorkerID::FromBinary(addr.worker_id()); + const rpc::GcsNodeInfo *node_info = + gcs_client->Nodes().Get(node_id, /*filter_dead_nodes=*/false); + if (node_info != nullptr && node_info->state() == rpc::GcsNodeInfo::DEAD) { + RAY_LOG(INFO).WithField(worker_id).WithField(node_id) + << "Disconnect core worker client since its node is dead"; + worker_client_pool->Disconnect(worker_id); + return; + } + + raylet::RayletClient raylet_client( + rpc::NodeManagerWorkerClient::make(node_info->node_manager_address(), + node_info->node_manager_port(), + *client_call_manager)); + raylet_client.IsLocalWorkerDead( + worker_id, + [worker_client_pool, worker_id](const Status &status, + rpc::IsLocalWorkerDeadReply &&reply) { + if (status.ok() && reply.is_dead()) { + RAY_LOG(INFO).WithField(worker_id) + << "Disconnect core worker client since it is dead"; + worker_client_pool->Disconnect(worker_id); + } + }); + }; +} + std::shared_ptr CoreWorkerClientPool::GetOrConnect( const Address &addr_proto) { RAY_CHECK_NE(addr_proto.worker_id(), ""); diff --git a/src/ray/rpc/worker/core_worker_client_pool.h b/src/ray/rpc/worker/core_worker_client_pool.h index b737ebc638440..e2fcff438318b 100644 --- a/src/ray/rpc/worker/core_worker_client_pool.h +++ b/src/ray/rpc/worker/core_worker_client_pool.h @@ -22,6 +22,7 @@ #include "absl/container/flat_hash_map.h" #include "absl/synchronization/mutex.h" #include "ray/common/id.h" +#include "ray/gcs/gcs_client/gcs_client.h" #include "ray/rpc/worker/core_worker_client.h" namespace ray { @@ -35,6 +36,14 @@ class CoreWorkerClientPool { explicit CoreWorkerClientPool(CoreWorkerClientFactoryFn client_factory) : core_worker_client_factory_(std::move(client_factory)){}; + /// Default unavailable_timeout_callback for retryable rpc's used by client factories on + /// core worker and node manager. + static std::function GetDefaultUnavailableTimeoutCallback( + gcs::GcsClient *gcs_client, + rpc::CoreWorkerClientPool *worker_client_pool, + rpc::ClientCallManager *client_call_manager, + const rpc::Address &addr); + /// Returns an open CoreWorkerClientInterface if one exists, and connect to one /// if it does not. The returned pointer is borrowed, and expected to be used /// briefly. From e2259f520c47d463a0f03d03c75e4579aee0968f Mon Sep 17 00:00:00 2001 From: dayshah Date: Thu, 1 May 2025 16:30:11 -0700 Subject: [PATCH 3/5] fix message Signed-off-by: dayshah --- src/ray/object_manager/ownership_object_directory.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ray/object_manager/ownership_object_directory.cc b/src/ray/object_manager/ownership_object_directory.cc index 2ceec5228476c..2c0c76d0e175f 100644 --- a/src/ray/object_manager/ownership_object_directory.cc +++ b/src/ray/object_manager/ownership_object_directory.cc @@ -251,8 +251,8 @@ void OwnershipBasedObjectDirectory::SendObjectLocationUpdateBatchIfNeeded( const Status &status, const rpc::UpdateObjectLocationBatchReply &reply) { if (!status.ok()) { RAY_LOG(ERROR).WithField(worker_id) - << "Failed to get object location update. This should not happen, this " - "should not happen because this is using the retryable grpc client."; + << "Failed to get object location update. This should not happen because " + "this rpc is using the retryable grpc client."; } auto in_flight_request_it = in_flight_requests_.find(worker_id); RAY_CHECK(in_flight_request_it != in_flight_requests_.end()); From 7f4a6cfe3f604a0dd229b237bcaa7c07d5e19bb6 Mon Sep 17 00:00:00 2001 From: dayshah Date: Sun, 4 May 2025 23:46:28 -0400 Subject: [PATCH 4/5] fix build Signed-off-by: dayshah --- src/ray/rpc/worker/core_worker_client.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/rpc/worker/core_worker_client.h b/src/ray/rpc/worker/core_worker_client.h index 6be68af420b57..6436618766b0d 100644 --- a/src/ray/rpc/worker/core_worker_client.h +++ b/src/ray/rpc/worker/core_worker_client.h @@ -200,7 +200,7 @@ class CoreWorkerClient : public std::enable_shared_from_this, /// /// \param[in] address Address of the worker server. /// \param[in] client_call_manager The `ClientCallManager` used for managing requests. - CoreWorkerClient(const rpc::Address &address, + CoreWorkerClient(rpc::Address address, ClientCallManager &client_call_manager, std::function core_worker_unavailable_timeout_callback); From d921cb28d51ab35298cf8252b8ca81b4fbdf0d79 Mon Sep 17 00:00:00 2001 From: dayshah Date: Mon, 5 May 2025 02:58:04 -0400 Subject: [PATCH 5/5] fix cpp test Signed-off-by: dayshah --- .../test/ownership_object_directory_test.cc | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/ray/object_manager/test/ownership_object_directory_test.cc b/src/ray/object_manager/test/ownership_object_directory_test.cc index 6faa379cd78d8..f748b561eb5a4 100644 --- a/src/ray/object_manager/test/ownership_object_directory_test.cc +++ b/src/ray/object_manager/test/ownership_object_directory_test.cc @@ -472,8 +472,12 @@ TEST_F(OwnershipBasedObjectDirectoryTest, TestOwnerFailed) { // The second batch is replied, but failed. ASSERT_TRUE(owner_client->ReplyUpdateObjectLocationBatch(ray::Status::Invalid(""))); - // Requests are not sent anymore. - ASSERT_EQ(NumBatchRequestSent(), 2); + // Retry is sent and replied to + ASSERT_TRUE(owner_client->ReplyUpdateObjectLocationBatch()); + ASSERT_TRUE(owner_client->ReplyUpdateObjectLocationBatch()); + // One extra request is sent. + ASSERT_EQ(NumBatchRequestSent(), 4); + // Make sure metadata is cleaned up properly. AssertNoLeak(); }