Skip to content

[core] UpdateObjectLocationBatch RPC Failure Handling #52723

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,8 @@ ray_cc_library(
"//src/ray/common:asio",
"//src/ray/common:id",
"@com_github_grpc_grpc//:grpc++",
"//src/ray/gcs/gcs_client:gcs_client_lib",
"//src/ray/raylet_client:raylet_client_lib",
],
)

Expand Down
32 changes: 5 additions & 27 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -557,33 +557,11 @@ CoreWorker::CoreWorker(CoreWorkerOptions options, const WorkerID &worker_id)
return std::make_shared<rpc::CoreWorkerClient>(
addr,
*client_call_manager_,
/*core_worker_unavailable_timeout_callback=*/[this, addr]() {
const NodeID node_id = NodeID::FromBinary(addr.raylet_id());
const WorkerID worker_id = WorkerID::FromBinary(addr.worker_id());
const rpc::GcsNodeInfo *node_info =
gcs_client_->Nodes().Get(node_id, /*filter_dead_nodes=*/false);
if (node_info != nullptr && node_info->state() == rpc::GcsNodeInfo::DEAD) {
RAY_LOG(INFO).WithField(worker_id).WithField(node_id)
<< "Disconnect core worker client since its node is dead";
core_worker_client_pool_->Disconnect(worker_id);
return;
}

raylet::RayletClient raylet_client(
rpc::NodeManagerWorkerClient::make(node_info->node_manager_address(),
node_info->node_manager_port(),
*client_call_manager_));
raylet_client.IsLocalWorkerDead(
worker_id,
[this, worker_id](const Status &status,
rpc::IsLocalWorkerDeadReply &&reply) {
if (status.ok() && reply.is_dead()) {
RAY_LOG(INFO).WithField(worker_id)
<< "Disconnect core worker client since it is dead";
core_worker_client_pool_->Disconnect(worker_id);
}
});
});
rpc::CoreWorkerClientPool::GetDefaultUnavailableTimeoutCallback(
gcs_client_.get(),
core_worker_client_pool_.get(),
client_call_manager_.get(),
addr));
});

object_info_publisher_ = std::make_unique<pubsub::Publisher>(
Expand Down
42 changes: 14 additions & 28 deletions src/ray/object_manager/ownership_object_directory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,19 @@ OwnershipBasedObjectDirectory::OwnershipBasedObjectDirectory(
std::shared_ptr<gcs::GcsClient> &gcs_client,
pubsub::SubscriberInterface *object_location_subscriber,
rpc::CoreWorkerClientPool *owner_client_pool,
int64_t max_object_report_batch_size,
std::function<void(const ObjectID &, const rpc::ErrorType &)> mark_as_failed)
: io_service_(io_service),
gcs_client_(gcs_client),
client_call_manager_(io_service),
object_location_subscriber_(object_location_subscriber),
owner_client_pool_(owner_client_pool),
kMaxObjectReportBatchSize(max_object_report_batch_size),
mark_as_failed_(mark_as_failed) {}
kMaxObjectReportBatchSize(RayConfig::instance().max_object_report_batch_size()),
mark_as_failed_(std::move(mark_as_failed)) {}

namespace {

/// Filter out the removed nodes from the object locations.
void FilterRemovedNodes(std::shared_ptr<gcs::GcsClient> gcs_client,
void FilterRemovedNodes(const std::shared_ptr<gcs::GcsClient> &gcs_client,
std::unordered_set<NodeID> *node_ids) {
for (auto it = node_ids->begin(); it != node_ids->end();) {
if (gcs_client->Nodes().IsRemoved(*it)) {
Expand All @@ -55,7 +54,7 @@ void FilterRemovedNodes(std::shared_ptr<gcs::GcsClient> gcs_client,

/// Update object location data based on response from the owning core worker.
bool UpdateObjectLocations(const rpc::WorkerObjectLocationsPubMessage &location_info,
std::shared_ptr<gcs::GcsClient> gcs_client,
const std::shared_ptr<gcs::GcsClient> &gcs_client,
std::unordered_set<NodeID> *node_ids,
std::string *spilled_url,
NodeID *spilled_node_id,
Expand Down Expand Up @@ -249,29 +248,16 @@ void OwnershipBasedObjectDirectory::SendObjectLocationUpdateBatchIfNeeded(
owner_client->UpdateObjectLocationBatch(
request,
[this, worker_id, node_id, owner_address](
Status status, const rpc::UpdateObjectLocationBatchReply &reply) {
const Status &status, const rpc::UpdateObjectLocationBatchReply &reply) {
if (!status.ok()) {
RAY_LOG(ERROR).WithField(worker_id)
<< "Failed to get object location update. This should not happen, this "
"should not happen because this is using the retryable grpc client.";
}
auto in_flight_request_it = in_flight_requests_.find(worker_id);
RAY_CHECK(in_flight_request_it != in_flight_requests_.end());
in_flight_requests_.erase(in_flight_request_it);

// TODO(sang): Handle network failures.
if (!status.ok()) {
// Currently we consider the owner is dead if the network is failed.
// Clean up the metadata. No need to mark objects as failed because
// that's only needed for the object pulling path (and this RPC is not on
// pulling path).
RAY_LOG(DEBUG).WithField(worker_id).WithField(node_id)
<< "Owner failed to update locations for node. The owner is most likely "
"dead. Status: "
<< status.ToString();
auto it = location_buffers_.find(worker_id);
if (it != location_buffers_.end()) {
location_buffers_.erase(it);
}
owner_client_pool_->Disconnect(worker_id);
return;
}

SendObjectLocationUpdateBatchIfNeeded(worker_id, node_id, owner_address);
});
}
Expand Down Expand Up @@ -303,8 +289,8 @@ void OwnershipBasedObjectDirectory::ObjectLocationSubscriptionCallback(
&it->second.pending_creation,
&it->second.object_size);

// If the lookup has failed, that means the object is lost. Trigger the callback in this
// case to handle failure properly.
// If the lookup has failed, that means the object is lost. Trigger the callback in
// this case to handle failure properly.
if (location_updated || location_lookup_failed) {
RAY_LOG(DEBUG).WithField(object_id)
<< "Pushing location updates to subscribers for object: "
Expand Down Expand Up @@ -462,7 +448,7 @@ ray::Status OwnershipBasedObjectDirectory::UnsubscribeObjectLocations(
void OwnershipBasedObjectDirectory::LookupRemoteConnectionInfo(
RemoteConnectionInfo &connection_info) const {
auto node_info = gcs_client_->Nodes().Get(connection_info.node_id);
if (node_info) {
if (node_info != nullptr) {
NodeID result_node_id = NodeID::FromBinary(node_info->node_id());
RAY_CHECK(result_node_id == connection_info.node_id);
connection_info.ip = node_info->node_manager_address();
Expand All @@ -486,7 +472,7 @@ OwnershipBasedObjectDirectory::LookupAllRemoteConnections() const {

void OwnershipBasedObjectDirectory::HandleNodeRemoved(const NodeID &node_id) {
for (auto &[object_id, listener] : listeners_) {
bool updated = listener.current_object_locations.erase(node_id);
bool updated = listener.current_object_locations.erase(node_id) > 0;
if (listener.spilled_node_id == node_id) {
listener.spilled_node_id = NodeID::Nil();
listener.spilled_url = "";
Expand Down
29 changes: 13 additions & 16 deletions src/ray/object_manager/ownership_object_directory.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,8 @@ class OwnershipBasedObjectDirectory : public IObjectDirectory {
std::shared_ptr<gcs::GcsClient> &gcs_client,
pubsub::SubscriberInterface *object_location_subscriber,
rpc::CoreWorkerClientPool *owner_client_pool,
int64_t max_object_report_batch_size,
std::function<void(const ObjectID &, const rpc::ErrorType &)> mark_as_failed);

virtual ~OwnershipBasedObjectDirectory() {}

void LookupRemoteConnectionInfo(RemoteConnectionInfo &connection_info) const override;

std::vector<RemoteConnectionInfo> LookupAllRemoteConnections() const override;
Expand Down Expand Up @@ -97,7 +94,7 @@ class OwnershipBasedObjectDirectory : public IObjectDirectory {
/// The current set of known locations of this object.
std::unordered_set<NodeID> current_object_locations;
/// The location where this object has been spilled, if any.
std::string spilled_url = "";
std::string spilled_url;
// The node id that spills the object to the disk.
// It will be Nil if it uses a distributed external storage.
NodeID spilled_node_id = NodeID::Nil();
Expand Down Expand Up @@ -127,7 +124,7 @@ class OwnershipBasedObjectDirectory : public IObjectDirectory {
/// Client pool to owners.
rpc::CoreWorkerClientPool *owner_client_pool_;
/// The max batch size for ReportObjectAdded and ReportObjectRemoved.
const int64_t kMaxObjectReportBatchSize;
int64_t kMaxObjectReportBatchSize;
/// The callback used to mark an object as failed.
std::function<void(const ObjectID &, const rpc::ErrorType &)> mark_as_failed_;

Expand Down Expand Up @@ -165,22 +162,22 @@ class OwnershipBasedObjectDirectory : public IObjectDirectory {
/// Metrics

/// Number of object locations added to this object directory.
uint64_t metrics_num_object_locations_added_;
double metrics_num_object_locations_added_per_second_;
uint64_t metrics_num_object_locations_added_ = 0;
double metrics_num_object_locations_added_per_second_ = 0;

/// Number of object locations removed from this object directory.
uint64_t metrics_num_object_locations_removed_;
double metrics_num_object_locations_removed_per_second_;
/// Number of object locations removed from this object directory. = 0;
uint64_t metrics_num_object_locations_removed_ = 0;
double metrics_num_object_locations_removed_per_second_ = 0;

/// Number of object location lookups.
uint64_t metrics_num_object_location_lookups_;
double metrics_num_object_location_lookups_per_second_;
/// Number of object location lookups. = 0;
uint64_t metrics_num_object_location_lookups_ = 0;
double metrics_num_object_location_lookups_per_second_ = 0;

/// Number of object location updates.
uint64_t metrics_num_object_location_updates_;
double metrics_num_object_location_updates_per_second_;
uint64_t metrics_num_object_location_updates_ = 0;
double metrics_num_object_location_updates_per_second_ = 0;

uint64_t cum_metrics_num_object_location_updates_;
uint64_t cum_metrics_num_object_location_updates_ = 0;

friend class OwnershipBasedObjectDirectoryTest;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,11 @@ class OwnershipBasedObjectDirectoryTest : public ::testing::Test {
gcs_client_mock_,
subscriber_.get(),
&client_pool,
/*max_object_report_batch_size=*/20,
[this](const ObjectID &object_id, const rpc::ErrorType &error_type) {
MarkAsFailed(object_id, error_type);
}) {}
}) {
obod_.kMaxObjectReportBatchSize = 20;
}

void TearDown() { owner_client->Reset(); }

Expand Down
1 change: 1 addition & 0 deletions src/ray/protobuf/core_worker.proto
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,7 @@ service CoreWorkerService {
rpc PubsubCommandBatch(PubsubCommandBatchRequest) returns (PubsubCommandBatchReply);
// Update the batched object location information to the ownership-based object
// directory.
// Failure: Retries, it's idempotent.
rpc UpdateObjectLocationBatch(UpdateObjectLocationBatchRequest)
returns (UpdateObjectLocationBatchReply);
// Get object locations from the ownership-based object directory.
Expand Down
10 changes: 5 additions & 5 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,11 @@ NodeManager::NodeManager(
config.enable_resource_isolation),
client_call_manager_(io_service),
worker_rpc_pool_([this](const rpc::Address &addr) {
return std::make_shared<rpc::CoreWorkerClient>(addr, client_call_manager_, []() {
RAY_LOG(FATAL) << "Raylet doesn't call any retryable core worker grpc methods.";
});
return std::make_shared<rpc::CoreWorkerClient>(
addr,
client_call_manager_,
rpc::CoreWorkerClientPool::GetDefaultUnavailableTimeoutCallback(
gcs_client_.get(), &worker_rpc_pool_, &client_call_manager_, addr));
}),
core_worker_subscriber_(std::make_unique<pubsub::Subscriber>(
self_node_id_,
Expand All @@ -179,8 +181,6 @@ NodeManager::NodeManager(
gcs_client_,
core_worker_subscriber_.get(),
/*owner_client_pool=*/&worker_rpc_pool_,
/*max_object_report_batch_size=*/
RayConfig::instance().max_object_report_batch_size(),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it just annoyed me that I had to move one level up for this so i moved it one level down

[this](const ObjectID &obj_id, const ErrorType &error_type) {
rpc::ObjectReference ref;
ref.set_object_id(obj_id.Binary());
Expand Down
1 change: 0 additions & 1 deletion src/ray/raylet/test/local_object_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,6 @@ class LocalObjectManagerTestWithMinSpillingSize {
gcs_client_,
subscriber_.get(),
&client_pool,
/*max_object_report_batch_size=*/20000,
[](const ObjectID &object_id, const rpc::ErrorType &error_type) {})),
manager(
manager_node_id_,
Expand Down
6 changes: 3 additions & 3 deletions src/ray/rpc/worker/core_worker_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ namespace ray {
namespace rpc {

CoreWorkerClient::CoreWorkerClient(
const rpc::Address &address,
rpc::Address address,
ClientCallManager &client_call_manager,
std::function<void()> core_worker_unavailable_timeout_callback)
: addr_(address) {
: addr_(std::move(address)) {
grpc_client_ = std::make_shared<GrpcClient<CoreWorkerService>>(
addr_.ip_address(), addr_.port(), client_call_manager);

Expand All @@ -39,7 +39,7 @@ CoreWorkerClient::CoreWorkerClient(
/*server_unavailable_timeout_seconds=*/
::RayConfig::instance().core_worker_rpc_server_reconnect_timeout_s(),
/*server_unavailable_timeout_callback=*/
core_worker_unavailable_timeout_callback,
std::move(core_worker_unavailable_timeout_callback),
/*server_name=*/"Core worker " + addr_.ip_address());
}

Expand Down
11 changes: 6 additions & 5 deletions src/ray/rpc/worker/core_worker_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -259,11 +259,12 @@ class CoreWorkerClient : public std::enable_shared_from_this<CoreWorkerClient>,
/*method_timeout_ms*/ -1,
override)

VOID_RPC_CLIENT_METHOD(CoreWorkerService,
UpdateObjectLocationBatch,
grpc_client_,
/*method_timeout_ms*/ -1,
override)
VOID_RETRYABLE_RPC_CLIENT_METHOD(retryable_grpc_client_,
CoreWorkerService,
UpdateObjectLocationBatch,
grpc_client_,
/*method_timeout_ms*/ -1,
override)

VOID_RPC_CLIENT_METHOD(CoreWorkerService,
GetObjectLocationsOwner,
Expand Down
36 changes: 36 additions & 0 deletions src/ray/rpc/worker/core_worker_client_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,45 @@

#include <memory>

#include "ray/raylet_client/raylet_client.h"

namespace ray {
namespace rpc {

std::function<void()> CoreWorkerClientPool::GetDefaultUnavailableTimeoutCallback(
gcs::GcsClient *gcs_client,
rpc::CoreWorkerClientPool *worker_client_pool,
rpc::ClientCallManager *client_call_manager,
const rpc::Address &addr) {
return [addr, gcs_client, worker_client_pool, client_call_manager]() {
const NodeID node_id = NodeID::FromBinary(addr.raylet_id());
const WorkerID worker_id = WorkerID::FromBinary(addr.worker_id());
const rpc::GcsNodeInfo *node_info =
gcs_client->Nodes().Get(node_id, /*filter_dead_nodes=*/false);
if (node_info != nullptr && node_info->state() == rpc::GcsNodeInfo::DEAD) {
RAY_LOG(INFO).WithField(worker_id).WithField(node_id)
<< "Disconnect core worker client since its node is dead";
worker_client_pool->Disconnect(worker_id);
return;
}

raylet::RayletClient raylet_client(
rpc::NodeManagerWorkerClient::make(node_info->node_manager_address(),
node_info->node_manager_port(),
*client_call_manager));
raylet_client.IsLocalWorkerDead(
worker_id,
[worker_client_pool, worker_id](const Status &status,
rpc::IsLocalWorkerDeadReply &&reply) {
if (status.ok() && reply.is_dead()) {
RAY_LOG(INFO).WithField(worker_id)
<< "Disconnect core worker client since it is dead";
worker_client_pool->Disconnect(worker_id);
}
});
};
}

std::shared_ptr<CoreWorkerClientInterface> CoreWorkerClientPool::GetOrConnect(
const Address &addr_proto) {
RAY_CHECK_NE(addr_proto.worker_id(), "");
Expand Down
9 changes: 9 additions & 0 deletions src/ray/rpc/worker/core_worker_client_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "absl/container/flat_hash_map.h"
#include "absl/synchronization/mutex.h"
#include "ray/common/id.h"
#include "ray/gcs/gcs_client/gcs_client.h"
#include "ray/rpc/worker/core_worker_client.h"

namespace ray {
Expand All @@ -35,6 +36,14 @@ class CoreWorkerClientPool {
explicit CoreWorkerClientPool(CoreWorkerClientFactoryFn client_factory)
: core_worker_client_factory_(std::move(client_factory)){};

/// Default unavailable_timeout_callback for retryable rpc's used by client factories on
/// core worker and node manager.
static std::function<void()> GetDefaultUnavailableTimeoutCallback(
gcs::GcsClient *gcs_client,
rpc::CoreWorkerClientPool *worker_client_pool,
rpc::ClientCallManager *client_call_manager,
const rpc::Address &addr);

/// Returns an open CoreWorkerClientInterface if one exists, and connect to one
/// if it does not. The returned pointer is borrowed, and expected to be used
/// briefly.
Expand Down