diff --git a/mooncake-store/include/replica.h b/mooncake-store/include/replica.h index dba29c3bbd..0013153400 100644 --- a/mooncake-store/include/replica.h +++ b/mooncake-store/include/replica.h @@ -367,6 +367,10 @@ class Replica { descriptor_variant); } + bool is_completed() const noexcept { + return status == ReplicaStatus::COMPLETE; + } + MemoryDescriptor& get_memory_descriptor() { if (auto* desc = std::get_if(&descriptor_variant)) { diff --git a/mooncake-store/src/client_service.cpp b/mooncake-store/src/client_service.cpp index 16c7dfd029..c0b7732753 100644 --- a/mooncake-store/src/client_service.cpp +++ b/mooncake-store/src/client_service.cpp @@ -2619,6 +2619,9 @@ ErrorCode Client::TransferRead(const Replica::Descriptor& replica_descriptor, if (replica_descriptor.is_memory_replica()) { auto& mem_desc = replica_descriptor.get_memory_descriptor(); total_size = mem_desc.buffer_descriptor.size_; + } else if (replica_descriptor.is_local_disk_replica()) { + auto& local_disk_desc = replica_descriptor.get_local_disk_descriptor(); + total_size = local_disk_desc.object_size; } else { auto& disk_desc = replica_descriptor.get_disk_descriptor(); total_size = disk_desc.object_size; @@ -2939,30 +2942,33 @@ tl::expected Client::GetPreferredReplica( if (replica_list.empty()) { return tl::make_unexpected(ErrorCode::INVALID_PARAMS); } - if (mounted_segments_.empty() || replica_list.size() == 1) { - return replica_list[0]; - } - - std::unordered_set local_endpoints; - { - std::lock_guard lock(mounted_segments_mutex_); - for (const auto& [segment_id, segment] : mounted_segments_) { - local_endpoints.insert(segment.te_endpoint); - } - } + const Replica::Descriptor* remote_memory = nullptr; + const Replica::Descriptor* local_disk = nullptr; + const Replica::Descriptor* global_disk = nullptr; for (const auto& rep : replica_list) { + if (!rep.is_completed()) { + continue; + } if (rep.is_memory_replica()) { - const auto& mem_desc = rep.get_memory_descriptor(); - const std::string& endpoint = - mem_desc.buffer_descriptor.transport_endpoint_; - if (local_endpoints.count(endpoint)) { - return rep; - } + // P0: Local memory is the best choice + if (IsReplicaOnLocalMemory(rep)) return rep; + // P1: Record the first remote memory found + if (!remote_memory) remote_memory = &rep; + } else if (rep.is_local_disk_replica()) { + // P2: Record the first local disk (SSD) found + if (!local_disk) local_disk = &rep; + } else if (rep.is_disk_replica()) { + // P3: Record the first global disk found + if (!global_disk) global_disk = &rep; } } - return replica_list[0]; + if (remote_memory) return *remote_memory; + if (local_disk) return *local_disk; + if (global_disk) return *global_disk; + + return tl::make_unexpected(ErrorCode::REPLICA_IS_NOT_READY); } size_t Client::GetLocalHotCacheSizeFromEnv() { diff --git a/mooncake-store/src/real_client.cpp b/mooncake-store/src/real_client.cpp index ae37096165..f298b9ebba 100644 --- a/mooncake-store/src/real_client.cpp +++ b/mooncake-store/src/real_client.cpp @@ -1840,11 +1840,23 @@ std::shared_ptr RealClient::get_buffer_internal( allocateSlices(slices, replica, buffer_handle.ptr()); // Get the object data - auto get_result = client_->Get(key, query_result.value(), slices); - if (!get_result) { - LOG(ERROR) << "Get failed for key: " << key - << " with error: " << toString(get_result.error()); - return nullptr; + if (replica.is_local_disk_replica()) { + std::unordered_map slices_map; + slices_map.emplace(key, slices.at(0)); + auto get_result = batch_get_into_offload_object_internal( + replica.get_local_disk_descriptor().transport_endpoint, slices_map); + if (!get_result) { + LOG(ERROR) << "Get failed from LOCAL_DISK for key: " << key + << " with error: " << toString(get_result.error()); + return nullptr; + } + } else { + auto get_result = client_->Get(key, query_result.value(), slices); + if (!get_result) { + LOG(ERROR) << "Get failed for key: " << key + << " with error: " << toString(get_result.error()); + return nullptr; + } } // Create BufferHandle with the allocated memory @@ -2055,10 +2067,12 @@ RealClient::batch_get_buffer_internal( size_t original_index; std::string key; QueryResult query_result; + Replica::Descriptor preferred_replica; std::unique_ptr buffer_handle; std::vector slices; }; std::vector valid_ops; + std::vector valid_local_disk_ops; valid_ops.reserve(keys.size()); for (size_t i = 0; i < keys.size(); ++i) { @@ -2079,7 +2093,14 @@ RealClient::batch_get_buffer_internal( continue; } - const auto &replica = query_result_values.replicas[0]; + const auto &res = + client_->GetPreferredReplica(query_result_values.replicas); + if (!res) { + LOG(ERROR) << "No preferred replica found for key: " << key; + continue; + } + const auto &replica = res.value(); + uint64_t total_size = calculate_total_size(replica); if (total_size == 0) { continue; @@ -2098,43 +2119,87 @@ RealClient::batch_get_buffer_internal( std::vector slices; allocateSlices(slices, replica, buffer_handle->ptr()); - valid_ops.emplace_back( - KeyOp{.original_index = i, - .key = key, - .query_result = std::move(query_result_values), - .buffer_handle = std::move(buffer_handle), - .slices = std::move(slices)}); + if (replica.is_local_disk_replica()) { + valid_local_disk_ops.emplace_back( + KeyOp{.original_index = i, + .key = key, + .query_result = std::move(query_result_values), + .preferred_replica = replica, + .buffer_handle = std::move(buffer_handle), + .slices = std::move(slices)}); + } else { + valid_ops.emplace_back( + KeyOp{.original_index = i, + .key = key, + .query_result = std::move(query_result_values), + .preferred_replica = replica, + .buffer_handle = std::move(buffer_handle), + .slices = std::move(slices)}); + } } - if (valid_ops.empty()) { + if (valid_ops.empty() && valid_local_disk_ops.empty()) { return final_results; } // 3. Execute batch get - std::vector batch_keys; - std::vector batch_query_results; - std::unordered_map> batch_slices; - batch_keys.reserve(valid_ops.size()); - batch_query_results.reserve(valid_ops.size()); + if (!valid_ops.empty()) { + std::vector batch_keys; + std::vector batch_query_results; + std::unordered_map> batch_slices; + batch_keys.reserve(valid_ops.size()); + batch_query_results.reserve(valid_ops.size()); + + for (auto &op : valid_ops) { + batch_keys.push_back(op.key); + batch_query_results.push_back(op.query_result); + batch_slices[op.key] = op.slices; + } - for (auto &op : valid_ops) { - batch_keys.push_back(op.key); - batch_query_results.push_back(op.query_result); - batch_slices[op.key] = op.slices; + auto batch_get_results = + client_->BatchGet(batch_keys, batch_query_results, batch_slices); + + // 4. Process results and create BufferHandles + for (size_t i = 0; i < valid_ops.size(); ++i) { + if (batch_get_results[i]) { + auto &op = valid_ops[i]; + final_results[op.original_index] = + std::make_shared( + std::move(*op.buffer_handle)); + } else { + LOG(ERROR) << "BatchGet failed for key '" << valid_ops[i].key + << "': " << toString(batch_get_results[i].error()); + } + } } - auto batch_get_results = - client_->BatchGet(batch_keys, batch_query_results, batch_slices); + // 5. Execute batch get for local disk + if (!valid_local_disk_ops.empty()) { + std::unordered_map> + offload_objects; + for (const auto &op : valid_local_disk_ops) { + const auto &replica = op.preferred_replica; + auto [it, _] = offload_objects.try_emplace( + replica.get_local_disk_descriptor().transport_endpoint); + it->second.emplace(op.key, op.slices.at(0)); + } - // 4. Process results and create BufferHandles - for (size_t i = 0; i < valid_ops.size(); ++i) { - if (batch_get_results[i]) { - auto &op = valid_ops[i]; - final_results[op.original_index] = - std::make_shared(std::move(*op.buffer_handle)); - } else { - LOG(ERROR) << "BatchGet failed for key '" << valid_ops[i].key - << "': " << toString(batch_get_results[i].error()); + for (auto &offload_objects_it : offload_objects) { + auto batch_get_offload_result = + batch_get_into_offload_object_internal( + offload_objects_it.first, offload_objects_it.second); + if (!batch_get_offload_result) { + LOG(ERROR) << "Batch get store object failed with error: " + << batch_get_offload_result.error(); + } else { + for (auto &op : valid_local_disk_ops) { + if (offload_objects_it.second.count(op.key)) { + final_results[op.original_index] = + std::make_shared( + std::move(*op.buffer_handle)); + } + } + } } } @@ -3177,6 +3242,7 @@ RealClient::batch_get_into_internal(const std::vector &keys, std::string key; size_t original_index; QueryResult query_result; + Replica::Descriptor preferred_replica; std::vector slices; uint64_t total_size; }; @@ -3209,7 +3275,14 @@ RealClient::batch_get_into_internal(const std::vector &keys, } // Calculate required buffer size - const auto &replica = query_result_values.replicas[0]; + const auto &res = + client_->GetPreferredReplica(query_result_values.replicas); + if (!res) { + results[i] = tl::unexpected(ErrorCode::INVALID_REPLICA); + continue; + } + const auto &replica = res.value(); + uint64_t total_size = calculate_total_size(replica); // Validate buffer capacity @@ -3225,13 +3298,13 @@ RealClient::batch_get_into_internal(const std::vector &keys, std::vector key_slices; allocateSlices(key_slices, replica, buffers[i]); - if (query_result_values.replicas.size() == 1 && - query_result_values.replicas.at(0).is_local_disk_replica()) { + if (replica.is_local_disk_replica()) { valid_local_disk_operations.emplace( key, ValidKeyInfo{.key = key, .original_index = i, .query_result = std::move(query_result_values), + .preferred_replica = replica, .slices = std::move(key_slices), .total_size = total_size}); results[i] = static_cast(total_size); @@ -3242,6 +3315,7 @@ RealClient::batch_get_into_internal(const std::vector &keys, {.key = key, .original_index = i, .query_result = std::move(query_result_values), + .preferred_replica = replica, .slices = std::move(key_slices), .total_size = total_size}); @@ -3290,7 +3364,7 @@ RealClient::batch_get_into_internal(const std::vector &keys, offload_objects; for (const auto &op_it : valid_local_disk_operations) { - const auto &replica = op_it.second.query_result.replicas.at(0); + const auto &replica = op_it.second.preferred_replica; auto [store_segment_it, _] = offload_objects.try_emplace( replica.get_local_disk_descriptor().transport_endpoint); store_segment_it->second.emplace(op_it.first, @@ -3511,11 +3585,13 @@ RealClient::batch_get_into_multi_buffers_internal( std::string key; size_t original_index; QueryResult query_result; + Replica::Descriptor preferred_replica; std::vector slices; uint64_t total_size; }; std::vector valid_operations; + std::unordered_map valid_local_disk_operations; valid_operations.reserve(num_keys); for (size_t i = 0; i < num_keys; ++i) { const auto &key = keys[i]; @@ -3537,7 +3613,13 @@ RealClient::batch_get_into_multi_buffers_internal( continue; } // Calculate required buffer size - const auto &replica = query_result_values.replicas[0]; + const auto &res = + client_->GetPreferredReplica(query_result_values.replicas); + if (!res) { + results.emplace_back(tl::unexpected(ErrorCode::INVALID_REPLICA)); + continue; + } + const auto &replica = res.value(); uint64_t total_size = calculate_total_size(replica); const auto &sizes = all_sizes[i]; uint64_t dst_total_size = 0; @@ -3555,7 +3637,28 @@ RealClient::batch_get_into_multi_buffers_internal( const auto &buffers = all_buffers[i]; std::vector key_slices; key_slices.reserve(buffers.size()); - if (replica.is_memory_replica()) { + if (replica.is_local_disk_replica()) { + for (size_t j = 0; j < buffers.size(); ++j) { + key_slices.emplace_back(Slice{buffers[j], sizes[j]}); + } + if (key_slices.size() != 1) { + LOG(ERROR) << "Local disk offload currently only supports 1 " + "slice per key for multi-buffers, given: " + << key_slices.size() << " for key: " << key; + results.emplace_back(tl::unexpected(ErrorCode::INVALID_PARAMS)); + continue; + } + valid_local_disk_operations.emplace( + key, + ValidKeyInfo{.key = key, + .original_index = i, + .query_result = std::move(query_result_values), + .preferred_replica = replica, + .slices = std::move(key_slices), + .total_size = total_size}); + results.emplace_back(static_cast(total_size)); + continue; + } else if (replica.is_memory_replica()) { for (size_t j = 0; j < buffers.size(); ++j) { key_slices.emplace_back(Slice{buffers[j], sizes[j]}); } @@ -3569,41 +3672,74 @@ RealClient::batch_get_into_multi_buffers_internal( {.key = key, .original_index = i, .query_result = std::move(query_result_values), + .preferred_replica = replica, .slices = std::move(key_slices), .total_size = total_size}); // Set success result (actual bytes transferred) results.emplace_back(static_cast(total_size)); } // Early return if no valid operations - if (valid_operations.empty()) { + if (valid_operations.empty() && valid_local_disk_operations.empty()) { return results; } // Prepare batch transfer data structures - std::vector batch_keys; - std::vector batch_query_results; - std::unordered_map> batch_slices; - batch_keys.reserve(valid_operations.size()); - batch_query_results.reserve(valid_operations.size()); - for (auto &op : valid_operations) { - batch_keys.push_back(op.key); - batch_query_results.push_back(op.query_result); - batch_slices[op.key] = op.slices; + if (!valid_operations.empty()) { + std::vector batch_keys; + std::vector batch_query_results; + std::unordered_map> batch_slices; + batch_keys.reserve(valid_operations.size()); + batch_query_results.reserve(valid_operations.size()); + for (auto &op : valid_operations) { + batch_keys.push_back(op.key); + batch_query_results.push_back(op.query_result); + batch_slices[op.key] = op.slices; + } + + auto batch_get_results = + client_->BatchGet(batch_keys, batch_query_results, batch_slices, + prefer_alloc_in_same_node); + + // Process transfer results + for (size_t j = 0; j < batch_get_results.size(); ++j) { + const auto &op = valid_operations[j]; + + if (!batch_get_results[j]) { + const auto error = batch_get_results[j].error(); + LOG(ERROR) << "BatchGet failed for key '" << op.key + << "': " << toString(error); + results[op.original_index] = tl::unexpected(error); + } + } } - auto batch_get_results = - client_->BatchGet(batch_keys, batch_query_results, batch_slices, - prefer_alloc_in_same_node); + if (!valid_local_disk_operations.empty()) { + std::unordered_map> + offload_objects; - // Process transfer results - for (size_t j = 0; j < batch_get_results.size(); ++j) { - const auto &op = valid_operations[j]; + for (const auto &op_it : valid_local_disk_operations) { + const auto &replica = op_it.second.preferred_replica; + auto [store_segment_it, _] = offload_objects.try_emplace( + replica.get_local_disk_descriptor().transport_endpoint); + store_segment_it->second.emplace(op_it.first, + op_it.second.slices.at(0)); + } - if (!batch_get_results[j]) { - const auto error = batch_get_results[j].error(); - LOG(ERROR) << "BatchGet failed for key '" << op.key - << "': " << toString(error); - results[op.original_index] = tl::unexpected(error); + for (auto &offload_objects_it : offload_objects) { + auto batch_get_offload_result = + batch_get_into_offload_object_internal( + offload_objects_it.first, offload_objects_it.second); + if (!batch_get_offload_result) { + LOG(ERROR) << "Batch get store object failed with error: " + << batch_get_offload_result.error(); + for (const auto &offload_object_it : + offload_objects_it.second) { + results[valid_local_disk_operations + .at(offload_object_it.first) + .original_index] = + tl::make_unexpected(batch_get_offload_result.error()); + } + } } } return results;