Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions mooncake-store/include/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,10 @@ class Replica {
descriptor_variant);
}

bool is_completed() const noexcept {
return status == ReplicaStatus::COMPLETE;
}

MemoryDescriptor& get_memory_descriptor() {
if (auto* desc =
std::get_if<MemoryDescriptor>(&descriptor_variant)) {
Expand Down
42 changes: 24 additions & 18 deletions mooncake-store/src/client_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2619,6 +2619,9 @@ ErrorCode Client::TransferRead(const Replica::Descriptor& replica_descriptor,
if (replica_descriptor.is_memory_replica()) {
auto& mem_desc = replica_descriptor.get_memory_descriptor();
total_size = mem_desc.buffer_descriptor.size_;
} else if (replica_descriptor.is_local_disk_replica()) {
auto& local_disk_desc = replica_descriptor.get_local_disk_descriptor();
total_size = local_disk_desc.object_size;
} else {
auto& disk_desc = replica_descriptor.get_disk_descriptor();
total_size = disk_desc.object_size;
Expand Down Expand Up @@ -2939,30 +2942,33 @@ tl::expected<Replica::Descriptor, ErrorCode> Client::GetPreferredReplica(
if (replica_list.empty()) {
return tl::make_unexpected(ErrorCode::INVALID_PARAMS);
}
if (mounted_segments_.empty() || replica_list.size() == 1) {
return replica_list[0];
}

std::unordered_set<std::string> local_endpoints;
{
std::lock_guard<std::mutex> lock(mounted_segments_mutex_);
for (const auto& [segment_id, segment] : mounted_segments_) {
local_endpoints.insert(segment.te_endpoint);
}
}
const Replica::Descriptor* remote_memory = nullptr;
const Replica::Descriptor* local_disk = nullptr;
const Replica::Descriptor* global_disk = nullptr;

for (const auto& rep : replica_list) {
if (!rep.is_completed()) {
continue;
}
if (rep.is_memory_replica()) {
const auto& mem_desc = rep.get_memory_descriptor();
const std::string& endpoint =
mem_desc.buffer_descriptor.transport_endpoint_;
if (local_endpoints.count(endpoint)) {
return rep;
}
// P0: Local memory is the best choice
if (IsReplicaOnLocalMemory(rep)) return rep;
// P1: Record the first remote memory found
if (!remote_memory) remote_memory = &rep;
} else if (rep.is_local_disk_replica()) {
// P2: Record the first local disk (SSD) found
if (!local_disk) local_disk = &rep;
} else if (rep.is_disk_replica()) {
// P3: Record the first global disk found
if (!global_disk) global_disk = &rep;
}
}

return replica_list[0];
if (remote_memory) return *remote_memory;
if (local_disk) return *local_disk;
if (global_disk) return *global_disk;

return tl::make_unexpected(ErrorCode::REPLICA_IS_NOT_READY);
}

size_t Client::GetLocalHotCacheSizeFromEnv() {
Expand Down
Loading
Loading