From e6ee490350244407e4a4337a205d785cdb39dd90 Mon Sep 17 00:00:00 2001 From: "xiyu.wxy" Date: Fri, 13 Mar 2026 23:26:56 +0800 Subject: [PATCH] [data_storage] enhance DataStorageBackend ::Create interface --- .../data_storage/data_storage_backend.h | 26 +- .../data_storage/data_storage_manager.cc | 29 +- .../data_storage/data_storage_manager.h | 9 +- .../data_storage/hf3fs_backend.cc | 76 +++-- kv_cache_manager/data_storage/hf3fs_backend.h | 7 +- .../data_storage/mooncake_backend.cc | 36 ++- .../data_storage/mooncake_backend.h | 7 +- kv_cache_manager/data_storage/nfs_backend.cc | 70 +++-- kv_cache_manager/data_storage/nfs_backend.h | 7 +- .../test/data_storage_manager_test.cc | 26 +- .../data_storage/test/hf3fs_backend_test.cc | 212 +++++++++---- .../data_storage/test/nfs_backend_test.cc | 186 +++++++++--- kv_cache_manager/manager/cache_manager.cc | 280 ++++++------------ kv_cache_manager/manager/cache_manager.h | 22 +- .../data_storage/tair_mempool_backend.cc | 7 +- .../data_storage/tair_mempool_backend.h | 7 +- .../data_storage/vcns_hf3fs_backend.cc | 9 +- .../data_storage/vcns_hf3fs_backend.h | 7 +- 18 files changed, 599 insertions(+), 424 deletions(-) diff --git a/kv_cache_manager/data_storage/data_storage_backend.h b/kv_cache_manager/data_storage/data_storage_backend.h index 90b131ed..bfd913f8 100644 --- a/kv_cache_manager/data_storage/data_storage_backend.h +++ b/kv_cache_manager/data_storage/data_storage_backend.h @@ -1,10 +1,12 @@ #pragma once #include +#include #include #include #include #include +#include #include "kv_cache_manager/common/error_code.h" #include "kv_cache_manager/data_storage/common_define.h" @@ -13,6 +15,23 @@ namespace kv_cache_manager { +// Block keys for a single spec +struct SpecBlockKeys { + std::string spec_name; // e.g., "tp0", "Linear_TP0" + int64_t spec_size; // Size per key for this spec + std::vector block_keys; // Raw key values + std::vector original_key_indices; // Mapping to caller's keys array indices +}; + +// Request object for creating blocks, extensible for future features (e.g., lifecycle groups) +struct CreateBlocksRequest { + std::string instance_id; + std::vector spec_block_keys; +}; + +// Per-spec create result, aligned 1:1 with CreateBlocksRequest::spec_block_keys +using SpecCreateResult = std::vector>; + class DataStorageBackend { public: DataStorageBackend() = delete; @@ -42,10 +61,9 @@ class DataStorageBackend { } virtual ErrorCode DoOpen(const StorageConfig &config, const std::string &trace_id) = 0; virtual ErrorCode Close() = 0; - virtual std::vector> Create(const std::vector &keys, - size_t size_per_key, - const std::string &trace_id, - std::function cb) = 0; + virtual std::vector Create(const CreateBlocksRequest &request, + const std::string &trace_id, + std::function cb) = 0; virtual std::vector Delete(const std::vector &storage_uris, const std::string &trace_id, std::function cb) = 0; virtual std::vector Exist(const std::vector &storage_uris) = 0; diff --git a/kv_cache_manager/data_storage/data_storage_manager.cc b/kv_cache_manager/data_storage/data_storage_manager.cc index 5c8110eb..ccc09f4f 100644 --- a/kv_cache_manager/data_storage/data_storage_manager.cc +++ b/kv_cache_manager/data_storage/data_storage_manager.cc @@ -174,11 +174,10 @@ std::shared_ptr DataStorageManager::CreateStorageBackend(con } } -std::vector> DataStorageManager::Create(RequestContext *request_context, - const std::string &unique_name, - const std::vector &keys, - size_t size_per_key, - std::function cb) { +std::vector DataStorageManager::Create(RequestContext *request_context, + const std::string &unique_name, + const CreateBlocksRequest &request, + std::function cb) { SPAN_TRACER(request_context); std::shared_lock lock(rw_lock_); const std::string &trace_id = request_context->trace_id(); @@ -190,18 +189,24 @@ std::vector> DataStorageManager::Create(Req auto storage_backend = iter->second; const auto dsmc = storage_backend->GetMetricsCollector(); KVCM_METRICS_COLLECTOR_CHRONO_MARK_BEGIN(dsmc, DataStorageCreate); - std::vector> create_result = - storage_backend->Create(keys, size_per_key, trace_id, cb); + auto create_result = storage_backend->Create(request, trace_id, cb); KVCM_METRICS_COLLECTOR_CHRONO_MARK_END(dsmc, DataStorageCreate); - KVCM_METRICS_COLLECTOR_SET_METRICS(dsmc, data_storage, create_keys_qps, keys.size()); + // Count total keys from all specs for metrics + size_t total_keys = 0; + for (const auto &spec_block : request.spec_block_keys) { + total_keys += spec_block.block_keys.size(); + } + KVCM_METRICS_COLLECTOR_SET_METRICS(dsmc, data_storage, create_keys_qps, total_keys); if (request_context) { request_context->GetMetricsCollectorsVehicle().AddMetricsCollector(dsmc); } - std::for_each(create_result.begin(), create_result.end(), [&unique_name](auto &pair) { - if (pair.first == EC_OK) { - pair.second.SetHostName(unique_name); + for (auto &spec_result : create_result) { + for (auto &[ec, uri] : spec_result) { + if (ec == EC_OK) { + uri.SetHostName(unique_name); + } } - }); + } return create_result; } diff --git a/kv_cache_manager/data_storage/data_storage_manager.h b/kv_cache_manager/data_storage/data_storage_manager.h index 9192b833..b888f955 100644 --- a/kv_cache_manager/data_storage/data_storage_manager.h +++ b/kv_cache_manager/data_storage/data_storage_manager.h @@ -36,11 +36,10 @@ class DataStorageManager { ErrorCode UnRegisterStorage(const std::string &name); ErrorCode DoCleanup(); - std::vector> Create(RequestContext *request_context, - const std::string &unique_name, - const std::vector &keys, - size_t size_per_key, - std::function cb); + std::vector Create(RequestContext *request_context, + const std::string &unique_name, + const CreateBlocksRequest &request, + std::function cb); std::vector Delete(RequestContext *request_context, const std::string &unique_name, diff --git a/kv_cache_manager/data_storage/hf3fs_backend.cc b/kv_cache_manager/data_storage/hf3fs_backend.cc index 87948457..b74df30a 100644 --- a/kv_cache_manager/data_storage/hf3fs_backend.cc +++ b/kv_cache_manager/data_storage/hf3fs_backend.cc @@ -54,41 +54,59 @@ ErrorCode Hf3fsBackend::Close() { return EC_OK; }; -std::vector> Hf3fsBackend::Create(const std::vector &keys, - size_t size_per_key, - const std::string &trace_id, - std::function cb) { - std::vector> result; - std::vector> batches; +std::vector Hf3fsBackend::Create(const CreateBlocksRequest &request, + const std::string &trace_id, + std::function cb) { + std::vector result; int32_t batch_size = spec_.key_count_per_file(); batch_size = batch_size <= 0 ? 1 : batch_size; - size_t total_key_count = keys.size(); - for (size_t start = 0; start < total_key_count; start += batch_size) { - size_t end = std::min(start + batch_size, total_key_count); - batches.emplace_back(keys.begin() + start, keys.begin() + end); - } - for (auto &batch : batches) { - DataStorageUri storage_uri; - storage_uri.SetProtocol(ToString(GetType())); - if (batch.size() > 1) { - std::string combine_key = StringUtil::Join(batch, "|"); - std::string hash_str = StringUtil::Uint64ToHex(Hash64(combine_key.c_str(), combine_key.size(), 42)); - storage_uri.SetPath(base_path_ / (batch[0] + "_" + hash_str)); - } else { - storage_uri.SetPath(base_path_ / batch[0]); - } - storage_uri.SetParam("size", std::to_string(size_per_key)); - for (size_t j = 0; j < batch.size(); ++j) { - if (batch_size > 1) { - storage_uri.SetParam("blkid", std::to_string(j)); + + // Process each spec independently - never mix keys from different specs in a batch + for (const auto &spec_block : request.spec_block_keys) { + SpecCreateResult spec_result; + const auto &block_keys = spec_block.block_keys; + size_t total_key_count = block_keys.size(); + + // Batch within this spec only + for (size_t start = 0; start < total_key_count; start += batch_size) { + size_t end = std::min(start + batch_size, total_key_count); + + // Build block key strings for this batch + std::vector batch_keys; + batch_keys.reserve(end - start); + for (size_t i = start; i < end; ++i) { + std::string block_key = request.instance_id + "/" + spec_block.spec_name + "/" + + StringUtil::Uint64ToHex(block_keys[i]); + batch_keys.push_back(std::move(block_key)); } - result.push_back({EC_OK, storage_uri}); - if (spec_.touch_file_when_create()) { - Hf3fsDataStorageItem item = Hf3fsDataStorageItem::FromUri(storage_uri); - TouchFile(item.file_path); + + // Generate URI for this batch + DataStorageUri storage_uri; + storage_uri.SetProtocol(ToString(GetType())); + if (batch_keys.size() > 1) { + std::string combine_key = StringUtil::Join(batch_keys, "|"); + std::string hash_str = StringUtil::Uint64ToHex(Hash64(combine_key.c_str(), combine_key.size(), 42)); + storage_uri.SetPath(base_path_ / (batch_keys[0] + "_" + hash_str)); + } else { + storage_uri.SetPath(base_path_ / batch_keys[0]); + } + storage_uri.SetParam("size", std::to_string(spec_block.spec_size)); + + // Add result entry for each key in the batch + for (size_t j = 0; j < batch_keys.size(); ++j) { + if (batch_size > 1) { + storage_uri.SetParam("blkid", std::to_string(j)); + } + spec_result.push_back({EC_OK, storage_uri}); + if (spec_.touch_file_when_create()) { + Hf3fsDataStorageItem item = Hf3fsDataStorageItem::FromUri(storage_uri); + TouchFile(item.file_path); + } } } + result.push_back(std::move(spec_result)); } + if (cb) { cb(); } diff --git a/kv_cache_manager/data_storage/hf3fs_backend.h b/kv_cache_manager/data_storage/hf3fs_backend.h index 1d380cb0..fdeb5f71 100644 --- a/kv_cache_manager/data_storage/hf3fs_backend.h +++ b/kv_cache_manager/data_storage/hf3fs_backend.h @@ -22,10 +22,9 @@ class Hf3fsBackend : public DataStorageBackend { ErrorCode DoOpen(const StorageConfig &storage_config, const std::string &trace_id) override; ErrorCode Close() override; - std::vector> Create(const std::vector &keys, - size_t size_per_key, - const std::string &trace_id, - std::function cb) override; + std::vector Create(const CreateBlocksRequest &request, + const std::string &trace_id, + std::function cb) override; std::vector Delete(const std::vector &storage_uris, const std::string &trace_id, std::function cb) override; diff --git a/kv_cache_manager/data_storage/mooncake_backend.cc b/kv_cache_manager/data_storage/mooncake_backend.cc index bbb83c65..14cfde7f 100644 --- a/kv_cache_manager/data_storage/mooncake_backend.cc +++ b/kv_cache_manager/data_storage/mooncake_backend.cc @@ -7,6 +7,7 @@ #include #include "kv_cache_manager/common/logger.h" +#include "kv_cache_manager/common/string_util.h" #include "kv_cache_manager/metrics/metrics_registry.h" namespace kv_cache_manager { @@ -80,23 +81,32 @@ ErrorCode MooncakeBackend::Close() { return EC_OK; }; -std::vector> MooncakeBackend::Create(const std::vector &keys, - size_t size_per_key, - const std::string &trace_id, - std::function cb) { - std::vector> result; - for (int i = 0; i < keys.size(); i++) { - DataStorageUri storage_uri; - storage_uri.SetProtocol(ToString(GetType())); - storage_uri.SetPath("/"); - storage_uri.SetParam("key", keys[i]); - storage_uri.SetParam("size", std::to_string(size_per_key)); - result.push_back({EC_OK, storage_uri}); +std::vector MooncakeBackend::Create(const CreateBlocksRequest &request, + const std::string &trace_id, + std::function cb) { + std::vector result; + + // Process all specs and keys - Mooncake doesn't batch, create one URI per key + for (const auto &spec_block : request.spec_block_keys) { + SpecCreateResult spec_result; + for (size_t i = 0; i < spec_block.block_keys.size(); i++) { + // Build the key string: instance_id/spec_name/hex(key) + std::string key_str = request.instance_id + "/" + spec_block.spec_name + "/" + + StringUtil::Uint64ToHex(spec_block.block_keys[i]); + + DataStorageUri storage_uri; + storage_uri.SetProtocol(ToString(GetType())); + storage_uri.SetPath("/"); + storage_uri.SetParam("key", key_str); + storage_uri.SetParam("size", std::to_string(spec_block.spec_size)); + spec_result.push_back({EC_OK, storage_uri}); + } + result.push_back(std::move(spec_result)); } + if (cb) { cb(); } - // not supported yet return result; } std::vector MooncakeBackend::Delete(const std::vector &storage_uris, diff --git a/kv_cache_manager/data_storage/mooncake_backend.h b/kv_cache_manager/data_storage/mooncake_backend.h index 012fbf28..1a0ff151 100644 --- a/kv_cache_manager/data_storage/mooncake_backend.h +++ b/kv_cache_manager/data_storage/mooncake_backend.h @@ -23,10 +23,9 @@ class MooncakeBackend : public DataStorageBackend { ErrorCode Close() override; - std::vector> Create(const std::vector &keys, - size_t size_per_key, - const std::string &trace_id, - std::function cb) override; + std::vector Create(const CreateBlocksRequest &request, + const std::string &trace_id, + std::function cb) override; std::vector Delete(const std::vector &storage_uris, const std::string &trace_id, std::function cb) override; diff --git a/kv_cache_manager/data_storage/nfs_backend.cc b/kv_cache_manager/data_storage/nfs_backend.cc index 405fc404..c6082012 100644 --- a/kv_cache_manager/data_storage/nfs_backend.cc +++ b/kv_cache_manager/data_storage/nfs_backend.cc @@ -39,37 +39,55 @@ ErrorCode NfsBackend::Close() { return EC_OK; }; -std::vector> NfsBackend::Create(const std::vector &keys, - size_t size_per_key, - const std::string &trace_id, - std::function cb) { - std::vector> result; - std::vector> batches; +std::vector NfsBackend::Create(const CreateBlocksRequest &request, + const std::string &trace_id, + std::function cb) { + std::vector result; int32_t batch_size = spec_.key_count_per_file(); batch_size = batch_size <= 0 ? 1 : batch_size; - size_t total_key_count = keys.size(); - for (size_t start = 0; start < total_key_count; start += batch_size) { - size_t end = std::min(start + batch_size, total_key_count); - batches.emplace_back(keys.begin() + start, keys.begin() + end); - } - for (auto &batch : batches) { - DataStorageUri storage_uri; - storage_uri.SetProtocol(ToString(GetType())); - if (batch.size() > 1) { - std::string combine_key = StringUtil::Join(batch, "|"); - std::string hash_str = StringUtil::Uint64ToHex(Hash64(combine_key.c_str(), combine_key.size(), 42)); - storage_uri.SetPath(spec_.root_path() + batch[0] + "_" + hash_str); - } else { - storage_uri.SetPath(spec_.root_path() + batch[0]); - } - storage_uri.SetParam("size", std::to_string(size_per_key)); - for (size_t j = 0; j < batch.size(); ++j) { - if (batch_size > 1) { - storage_uri.SetParam("blkid", std::to_string(j)); + + // Process each spec independently - never mix keys from different specs in a batch + for (const auto &spec_block : request.spec_block_keys) { + SpecCreateResult spec_result; + const auto &block_keys = spec_block.block_keys; + size_t total_key_count = block_keys.size(); + + // Batch within this spec only + for (size_t start = 0; start < total_key_count; start += batch_size) { + size_t end = std::min(start + batch_size, total_key_count); + + // Build block key strings for this batch + std::vector batch_keys; + batch_keys.reserve(end - start); + for (size_t i = start; i < end; ++i) { + std::string block_key = request.instance_id + "/" + spec_block.spec_name + "/" + + StringUtil::Uint64ToHex(block_keys[i]); + batch_keys.push_back(std::move(block_key)); + } + + // Generate URI for this batch + DataStorageUri storage_uri; + storage_uri.SetProtocol(ToString(GetType())); + if (batch_keys.size() > 1) { + std::string combine_key = StringUtil::Join(batch_keys, "|"); + std::string hash_str = StringUtil::Uint64ToHex(Hash64(combine_key.c_str(), combine_key.size(), 42)); + storage_uri.SetPath(spec_.root_path() + batch_keys[0] + "_" + hash_str); + } else { + storage_uri.SetPath(spec_.root_path() + batch_keys[0]); + } + storage_uri.SetParam("size", std::to_string(spec_block.spec_size)); + + // Add result entry for each key in the batch + for (size_t j = 0; j < batch_keys.size(); ++j) { + if (batch_size > 1) { + storage_uri.SetParam("blkid", std::to_string(j)); + } + spec_result.push_back({EC_OK, storage_uri}); } - result.push_back({EC_OK, storage_uri}); } + result.push_back(std::move(spec_result)); } + if (cb) { cb(); } diff --git a/kv_cache_manager/data_storage/nfs_backend.h b/kv_cache_manager/data_storage/nfs_backend.h index 2c61cb80..5d1d12f4 100644 --- a/kv_cache_manager/data_storage/nfs_backend.h +++ b/kv_cache_manager/data_storage/nfs_backend.h @@ -21,10 +21,9 @@ class NfsBackend : public DataStorageBackend { ErrorCode DoOpen(const StorageConfig &storage_config, const std::string &trace_id) override; ErrorCode Close() override; - std::vector> Create(const std::vector &keys, - size_t size_per_key, - const std::string &trace_id, - std::function cb) override; + std::vector Create(const CreateBlocksRequest &request, + const std::string &trace_id, + std::function cb) override; std::vector Delete(const std::vector &storage_uris, const std::string &trace_id, std::function cb) override; diff --git a/kv_cache_manager/data_storage/test/data_storage_manager_test.cc b/kv_cache_manager/data_storage/test/data_storage_manager_test.cc index e3ee011f..b10b6e0a 100644 --- a/kv_cache_manager/data_storage/test/data_storage_manager_test.cc +++ b/kv_cache_manager/data_storage/test/data_storage_manager_test.cc @@ -51,16 +51,26 @@ TEST_F(DataStorageManagerTest, TestSimple) { ASSERT_EQ(EC_NOENT, data_storage_manager.EnableStorage("storage2")); // create exist delete - DataStorageUri storage_uri1("file://storage1/data/key1?size=128"); - // ASSERT_FALSE(data_storage_manager.Exist("storage1", {storage_uri1})[0]); RequestContext requesst_context("test"); - auto uris = data_storage_manager.Create(&requesst_context, "storage1", {"key1"}, 128, []() {}); - ASSERT_EQ(1, uris.size()); - ASSERT_EQ(EC_OK, uris[0].first); - ASSERT_EQ(storage_uri1.ToUriString(), uris[0].second.ToUriString()); + + // Build CreateBlocksRequest + CreateBlocksRequest create_request; + create_request.instance_id = "test_instance"; + SpecBlockKeys spec_block; + spec_block.spec_name = "tp0"; + spec_block.spec_size = 128; + spec_block.block_keys = {0x6b657931}; // hex of "key1" + spec_block.original_key_indices = {0}; + create_request.spec_block_keys.push_back(std::move(spec_block)); + + auto result = data_storage_manager.Create(&requesst_context, "storage1", create_request, []() {}); + ASSERT_EQ(1, result.size()); + ASSERT_EQ(1, result[0].size()); + ASSERT_EQ(EC_OK, result[0][0].first); + ASSERT_EQ("file://storage1/data/test_instance/tp0/6b657931?size=128", result[0][0].second.ToUriString()); // unique name not exist - uris = data_storage_manager.Create(&requesst_context, "storage2", {"key1"}, 128, []() {}); - ASSERT_EQ(0, uris.size()); + result = data_storage_manager.Create(&requesst_context, "storage2", create_request, []() {}); + ASSERT_EQ(0, result.size()); // unregister storage ASSERT_EQ(EC_OK, data_storage_manager.UnRegisterStorage("storage1")); diff --git a/kv_cache_manager/data_storage/test/hf3fs_backend_test.cc b/kv_cache_manager/data_storage/test/hf3fs_backend_test.cc index 1c7c9e3e..f2254dba 100644 --- a/kv_cache_manager/data_storage/test/hf3fs_backend_test.cc +++ b/kv_cache_manager/data_storage/test/hf3fs_backend_test.cc @@ -48,14 +48,24 @@ TEST_F(Hf3fsBackendTest, TestSimple) { StorageConfig storage_config(DataStorageType::DATA_STORAGE_TYPE_HF3FS, "test", spec); storage_config.set_check_storage_available_when_open(true); ASSERT_EQ(EC_OK, backend.Open(storage_config, "fake_trace_id_1")); - std::vector keys = {"key1", "key2", "key3", "key4", "key5"}; - auto results = backend.Create(keys, 128, "fake_trace_id_2", []() {}); - ASSERT_EQ(results.size(), keys.size()); - ASSERT_EQ(BuildUri("key1?size=128"), results[0].second.ToUriString()); - ASSERT_EQ(BuildUri("key2?size=128"), results[1].second.ToUriString()); - ASSERT_EQ(BuildUri("key3?size=128"), results[2].second.ToUriString()); - ASSERT_EQ(BuildUri("key4?size=128"), results[3].second.ToUriString()); - ASSERT_EQ(BuildUri("key5?size=128"), results[4].second.ToUriString()); + + CreateBlocksRequest request; + request.instance_id = "test_instance"; + SpecBlockKeys spec_block; + spec_block.spec_name = "tp0"; + spec_block.spec_size = 128; + spec_block.block_keys = {0x6b657931, 0x6b657932, 0x6b657933, 0x6b657934, 0x6b657935}; + spec_block.original_key_indices = {0, 1, 2, 3, 4}; + request.spec_block_keys.push_back(std::move(spec_block)); + + auto result = backend.Create(request, "fake_trace_id_2", []() {}); + ASSERT_EQ(result.size(), 1); + ASSERT_EQ(result[0].size(), 5); + ASSERT_EQ(BuildUri("test_instance/tp0/6b657931?size=128"), result[0][0].second.ToUriString()); + ASSERT_EQ(BuildUri("test_instance/tp0/6b657932?size=128"), result[0][1].second.ToUriString()); + ASSERT_EQ(BuildUri("test_instance/tp0/6b657933?size=128"), result[0][2].second.ToUriString()); + ASSERT_EQ(BuildUri("test_instance/tp0/6b657934?size=128"), result[0][3].second.ToUriString()); + ASSERT_EQ(BuildUri("test_instance/tp0/6b657935?size=128"), result[0][4].second.ToUriString()); } // 多个key一个文件的形式 { @@ -65,14 +75,24 @@ TEST_F(Hf3fsBackendTest, TestSimple) { StorageConfig storage_config(DataStorageType::DATA_STORAGE_TYPE_HF3FS, "test", spec); storage_config.set_check_storage_available_when_open(true); ASSERT_EQ(EC_OK, backend.Open(storage_config, "fake_trace_id_3")); - std::vector keys = {"key1", "key2", "key3", "key4", "key5"}; - auto results = backend.Create(keys, 128, "fake_trace_id_4", []() {}); - ASSERT_EQ(results.size(), keys.size()); - EXPECT_EQ(BuildUri("key1_5a560a3d977cc6f2?blkid=0&size=128"), results[0].second.ToUriString()); - EXPECT_EQ(BuildUri("key1_5a560a3d977cc6f2?blkid=1&size=128"), results[1].second.ToUriString()); - EXPECT_EQ(BuildUri("key3_1184f2d3fc112241?blkid=0&size=128"), results[2].second.ToUriString()); - EXPECT_EQ(BuildUri("key3_1184f2d3fc112241?blkid=1&size=128"), results[3].second.ToUriString()); - EXPECT_EQ(BuildUri("key5?blkid=0&size=128"), results[4].second.ToUriString()); + + CreateBlocksRequest request; + request.instance_id = "test_instance"; + SpecBlockKeys spec_block; + spec_block.spec_name = "tp0"; + spec_block.spec_size = 128; + spec_block.block_keys = {0x6b657931, 0x6b657932, 0x6b657933, 0x6b657934, 0x6b657935}; + spec_block.original_key_indices = {0, 1, 2, 3, 4}; + request.spec_block_keys.push_back(std::move(spec_block)); + + auto result = backend.Create(request, "fake_trace_id_4", []() {}); + ASSERT_EQ(result.size(), 1); + ASSERT_EQ(result[0].size(), 5); + EXPECT_EQ(BuildUri("test_instance/tp0/6b657931_36fc91bdda6e6263?blkid=0&size=128"), result[0][0].second.ToUriString()); + EXPECT_EQ(BuildUri("test_instance/tp0/6b657931_36fc91bdda6e6263?blkid=1&size=128"), result[0][1].second.ToUriString()); + EXPECT_EQ(BuildUri("test_instance/tp0/6b657933_d893940787994b17?blkid=0&size=128"), result[0][2].second.ToUriString()); + EXPECT_EQ(BuildUri("test_instance/tp0/6b657933_d893940787994b17?blkid=1&size=128"), result[0][3].second.ToUriString()); + EXPECT_EQ(BuildUri("test_instance/tp0/6b657935?blkid=0&size=128"), result[0][4].second.ToUriString()); } } @@ -94,19 +114,29 @@ TEST_F(Hf3fsBackendTest, TestCreateWithBatchingAndCallbackInvocation) { spec->set_key_count_per_file(2); StorageConfig storage_config(DataStorageType::DATA_STORAGE_TYPE_HF3FS, "test", spec); ASSERT_EQ(EC_OK, backend.Open(storage_config, "fake_trace_id_2")); - std::vector keys = {"key1", "key2", "key3", "key4", "key5"}; + + CreateBlocksRequest request; + request.instance_id = "test_instance"; + SpecBlockKeys spec_block; + spec_block.spec_name = "tp0"; + spec_block.spec_size = 100; + spec_block.block_keys = {0x6b657931, 0x6b657932, 0x6b657933, 0x6b657934, 0x6b657935}; + spec_block.original_key_indices = {0, 1, 2, 3, 4}; + request.spec_block_keys.push_back(std::move(spec_block)); + bool callback_called = false; auto callback = [&callback_called]() { callback_called = true; }; - auto results = backend.Create(keys, 100, "fake_trace_id_3", callback); + auto result = backend.Create(request, "fake_trace_id_3", callback); ASSERT_TRUE(callback_called); - ASSERT_EQ(results.size(), keys.size()); - EXPECT_EQ(BuildUri("key1_5a560a3d977cc6f2?blkid=0&size=100"), results[0].second.ToUriString()); - EXPECT_EQ(BuildUri("key1_5a560a3d977cc6f2?blkid=1&size=100"), results[1].second.ToUriString()); - EXPECT_EQ(BuildUri("key3_1184f2d3fc112241?blkid=0&size=100"), results[2].second.ToUriString()); - EXPECT_EQ(BuildUri("key3_1184f2d3fc112241?blkid=1&size=100"), results[3].second.ToUriString()); - EXPECT_EQ(BuildUri("key5?blkid=0&size=100"), results[4].second.ToUriString()); - for (size_t i = 0; i < results.size(); ++i) { - ASSERT_EQ(results[i].first, EC_OK); + ASSERT_EQ(result.size(), 1); + ASSERT_EQ(result[0].size(), 5); + EXPECT_EQ(BuildUri("test_instance/tp0/6b657931_36fc91bdda6e6263?blkid=0&size=100"), result[0][0].second.ToUriString()); + EXPECT_EQ(BuildUri("test_instance/tp0/6b657931_36fc91bdda6e6263?blkid=1&size=100"), result[0][1].second.ToUriString()); + EXPECT_EQ(BuildUri("test_instance/tp0/6b657933_d893940787994b17?blkid=0&size=100"), result[0][2].second.ToUriString()); + EXPECT_EQ(BuildUri("test_instance/tp0/6b657933_d893940787994b17?blkid=1&size=100"), result[0][3].second.ToUriString()); + EXPECT_EQ(BuildUri("test_instance/tp0/6b657935?blkid=0&size=100"), result[0][4].second.ToUriString()); + for (size_t i = 0; i < result[0].size(); ++i) { + ASSERT_EQ(result[0][i].first, EC_OK); } } @@ -116,19 +146,33 @@ TEST_F(Hf3fsBackendTest, TestCreateWithBatchSizeOneAndEmptyKeys) { spec->set_key_count_per_file(1); StorageConfig storage_config(DataStorageType::DATA_STORAGE_TYPE_HF3FS, "test", spec); ASSERT_EQ(EC_OK, backend.Open(storage_config, "fake_trace_id_1")); + + // Test empty request + CreateBlocksRequest empty_request; + empty_request.instance_id = "test_instance"; bool callback_called = false; auto cb = [&callback_called]() { callback_called = true; }; - auto results_empty = backend.Create({}, 100, "fake_trace_id_2", cb); + auto results_empty = backend.Create(empty_request, "fake_trace_id_2", cb); ASSERT_TRUE(callback_called); ASSERT_TRUE(results_empty.empty()); - std::vector keys = {"a", "b"}; + // Test with keys + CreateBlocksRequest request; + request.instance_id = "test_instance"; + SpecBlockKeys spec_block; + spec_block.spec_name = "tp0"; + spec_block.spec_size = 100; + spec_block.block_keys = {0x61, 0x62}; // hex of 'a', 'b' + spec_block.original_key_indices = {0, 1}; + request.spec_block_keys.push_back(std::move(spec_block)); + callback_called = false; - auto results = backend.Create(keys, 100, "fake_trace_id_3", [&callback_called]() { callback_called = true; }); + auto result = backend.Create(request, "fake_trace_id_3", [&callback_called]() { callback_called = true; }); ASSERT_TRUE(callback_called); - ASSERT_EQ(results.size(), keys.size()); - EXPECT_EQ(BuildUri("a?size=100"), results[0].second.ToUriString()); - EXPECT_EQ(BuildUri("b?size=100"), results[1].second.ToUriString()); + ASSERT_EQ(result.size(), 1); + ASSERT_EQ(result[0].size(), 2); + ASSERT_EQ(BuildUri("test_instance/tp0/61?size=100"), result[0][0].second.ToUriString()); + ASSERT_EQ(BuildUri("test_instance/tp0/62?size=100"), result[0][1].second.ToUriString()); } TEST_F(Hf3fsBackendTest, TestDeleteNotExistFile) { @@ -154,15 +198,25 @@ TEST_F(Hf3fsBackendTest, TestExistAndDelete) { spec->set_touch_file_when_create(true); StorageConfig storage_config(DataStorageType::DATA_STORAGE_TYPE_HF3FS, "test", spec); ASSERT_EQ(EC_OK, backend.Open(storage_config, "fake_trace_id_5")); - std::vector keys = {"key1", "key2"}; - auto results = backend.Create(keys, 100, "fake_trace_id_1", []() {}); - ASSERT_EQ(results.size(), keys.size()); - ASSERT_EQ(BuildUri("key1?size=100"), results[0].second.ToUriString()); - ASSERT_EQ(BuildUri("key2?size=100"), results[1].second.ToUriString()); + + CreateBlocksRequest request; + request.instance_id = "test_instance"; + SpecBlockKeys spec_block; + spec_block.spec_name = "tp0"; + spec_block.spec_size = 100; + spec_block.block_keys = {0x6b657931, 0x6b657932}; // hex of "key1", "key2" + spec_block.original_key_indices = {0, 1}; + request.spec_block_keys.push_back(std::move(spec_block)); + + auto result = backend.Create(request, "fake_trace_id_1", []() {}); + ASSERT_EQ(result.size(), 1); + ASSERT_EQ(result[0].size(), 2); + ASSERT_EQ(BuildUri("test_instance/tp0/6b657931?size=100"), result[0][0].second.ToUriString()); + ASSERT_EQ(BuildUri("test_instance/tp0/6b657932?size=100"), result[0][1].second.ToUriString()); { std::vector uris; - uris.emplace_back(results[0].second); - uris.emplace_back(results[1].second); + uris.emplace_back(result[0][0].second); + uris.emplace_back(result[0][1].second); uris.emplace_back(BuildUri("not_exist_key")); auto res = backend.Exist(uris); ASSERT_EQ(res.size(), uris.size()); @@ -172,8 +226,8 @@ TEST_F(Hf3fsBackendTest, TestExistAndDelete) { } { std::vector uris; - uris.emplace_back(results[0].second); - uris.emplace_back(results[1].second); + uris.emplace_back(result[0][0].second); + uris.emplace_back(result[0][1].second); uris.emplace_back(BuildUri("not_exist_key2")); auto res = backend.Delete(uris, "fake_trace_id_2", []() {}); ASSERT_EQ(res.size(), uris.size()); @@ -208,13 +262,23 @@ TEST_F(Hf3fsBackendTest, TestCreateHandlesInvalidBatchSize) { spec->set_key_count_per_file(0); StorageConfig storage_config(DataStorageType::DATA_STORAGE_TYPE_HF3FS, "test", spec); ASSERT_EQ(EC_OK, backend.Open(storage_config, "fake_trace_id_2")); - std::vector keys = {"k1", "k2"}; + + CreateBlocksRequest request; + request.instance_id = "test_instance"; + SpecBlockKeys spec_block; + spec_block.spec_name = "tp0"; + spec_block.spec_size = 50; + spec_block.block_keys = {0x6b31, 0x6b32}; // hex of "k1", "k2" + spec_block.original_key_indices = {0, 1}; + request.spec_block_keys.push_back(std::move(spec_block)); + bool cb_called = false; - auto results = backend.Create(keys, 50, "fake_trace_id_3", [&cb_called]() { cb_called = true; }); + auto result = backend.Create(request, "fake_trace_id_3", [&cb_called]() { cb_called = true; }); ASSERT_TRUE(cb_called); - ASSERT_EQ(results.size(), keys.size()); - EXPECT_EQ(BuildUri("k1?size=50"), results[0].second.ToUriString()); - EXPECT_EQ(BuildUri("k2?size=50"), results[1].second.ToUriString()); + ASSERT_EQ(result.size(), 1); + ASSERT_EQ(result[0].size(), 2); + EXPECT_EQ(BuildUri("test_instance/tp0/6b31?size=50"), result[0][0].second.ToUriString()); + EXPECT_EQ(BuildUri("test_instance/tp0/6b32?size=50"), result[0][1].second.ToUriString()); } TEST_F(Hf3fsBackendTest, TestCreateSingleKeyBatch) { @@ -223,10 +287,58 @@ TEST_F(Hf3fsBackendTest, TestCreateSingleKeyBatch) { spec->set_key_count_per_file(10); StorageConfig storage_config(DataStorageType::DATA_STORAGE_TYPE_HF3FS, "test", spec); ASSERT_EQ(EC_OK, backend.Open(storage_config, "fake_trace_id_4")); - std::vector keys = {"singlekey"}; + + CreateBlocksRequest request; + request.instance_id = "test_instance"; + SpecBlockKeys spec_block; + spec_block.spec_name = "tp0"; + spec_block.spec_size = 10; + spec_block.block_keys = {0x12345678abcdef00LL}; // a valid int64 key + spec_block.original_key_indices = {0}; + request.spec_block_keys.push_back(std::move(spec_block)); + bool cb_called = false; - auto results = backend.Create(keys, 10, "fake_trace_id_5", [&cb_called]() { cb_called = true; }); + auto result = backend.Create(request, "fake_trace_id_5", [&cb_called]() { cb_called = true; }); ASSERT_TRUE(cb_called); - ASSERT_EQ(results.size(), keys.size()); - EXPECT_EQ(BuildUri("singlekey?blkid=0&size=10"), results[0].second.ToUriString()); + ASSERT_EQ(result.size(), 1); + ASSERT_EQ(result[0].size(), 1); + EXPECT_EQ(BuildUri("test_instance/tp0/12345678abcdef00?blkid=0&size=10"), result[0][0].second.ToUriString()); +} + +TEST_F(Hf3fsBackendTest, TestMultipleSpecsNotMixed) { + // Test that keys from different specs are never mixed in the same batch + Hf3fsBackend backend(metrics_registry_); + auto spec = GetDefaultStorageSpec(); + spec->set_key_count_per_file(3); // Large batch size + StorageConfig storage_config(DataStorageType::DATA_STORAGE_TYPE_HF3FS, "test", spec); + ASSERT_EQ(EC_OK, backend.Open(storage_config, "fake_trace_id_1")); + + CreateBlocksRequest request; + request.instance_id = "test_instance"; + + // First spec with 2 keys + SpecBlockKeys spec_block1; + spec_block1.spec_name = "tp0"; + spec_block1.spec_size = 128; + spec_block1.block_keys = {0x6b657931, 0x6b657932}; + spec_block1.original_key_indices = {0, 1}; + request.spec_block_keys.push_back(std::move(spec_block1)); + + // Second spec with 2 keys + SpecBlockKeys spec_block2; + spec_block2.spec_name = "tp1"; + spec_block2.spec_size = 128; + spec_block2.block_keys = {0x6b657933, 0x6b657934}; + spec_block2.original_key_indices = {2, 3}; + request.spec_block_keys.push_back(std::move(spec_block2)); + + auto result = backend.Create(request, "fake_trace_id_2", []() {}); + ASSERT_EQ(result.size(), 2); + ASSERT_EQ(result[0].size(), 2); + ASSERT_EQ(result[1].size(), 2); + + EXPECT_EQ(BuildUri("test_instance/tp0/6b657931_36fc91bdda6e6263?blkid=0&size=128"), result[0][0].second.ToUriString()); + EXPECT_EQ(BuildUri("test_instance/tp0/6b657931_36fc91bdda6e6263?blkid=1&size=128"), result[0][1].second.ToUriString()); + EXPECT_EQ(BuildUri("test_instance/tp1/6b657933_3533532bb1ef35e1?blkid=0&size=128"), result[1][0].second.ToUriString()); + EXPECT_EQ(BuildUri("test_instance/tp1/6b657933_3533532bb1ef35e1?blkid=1&size=128"), result[1][1].second.ToUriString()); } diff --git a/kv_cache_manager/data_storage/test/nfs_backend_test.cc b/kv_cache_manager/data_storage/test/nfs_backend_test.cc index e12d066e..b9431787 100644 --- a/kv_cache_manager/data_storage/test/nfs_backend_test.cc +++ b/kv_cache_manager/data_storage/test/nfs_backend_test.cc @@ -24,14 +24,24 @@ TEST_F(NfsBackendTest, TestSimple) { spec->set_root_path("/data/"); StorageConfig storage_config(DataStorageType::DATA_STORAGE_TYPE_NFS, "test", spec); ASSERT_EQ(EC_OK, backend.Open(storage_config, "fake_trace_id_1")); - std::vector keys = {"key1", "key2", "key3", "key4", "key5"}; - auto results = backend.Create(keys, 128, "fake_trace_id_2", []() {}); - ASSERT_EQ(results.size(), keys.size()); - ASSERT_EQ("file:///data/key1?size=128", results[0].second.ToUriString()); - ASSERT_EQ("file:///data/key2?size=128", results[1].second.ToUriString()); - ASSERT_EQ("file:///data/key3?size=128", results[2].second.ToUriString()); - ASSERT_EQ("file:///data/key4?size=128", results[3].second.ToUriString()); - ASSERT_EQ("file:///data/key5?size=128", results[4].second.ToUriString()); + + CreateBlocksRequest request; + request.instance_id = "test_instance"; + SpecBlockKeys spec_block; + spec_block.spec_name = "tp0"; + spec_block.spec_size = 128; + spec_block.block_keys = {0x6b657931, 0x6b657932, 0x6b657933, 0x6b657934, 0x6b657935}; + spec_block.original_key_indices = {0, 1, 2, 3, 4}; + request.spec_block_keys.push_back(std::move(spec_block)); + + auto result = backend.Create(request, "fake_trace_id_2", []() {}); + ASSERT_EQ(result.size(), 1); + ASSERT_EQ(result[0].size(), 5); + ASSERT_EQ("file:///data/test_instance/tp0/6b657931?size=128", result[0][0].second.ToUriString()); + ASSERT_EQ("file:///data/test_instance/tp0/6b657932?size=128", result[0][1].second.ToUriString()); + ASSERT_EQ("file:///data/test_instance/tp0/6b657933?size=128", result[0][2].second.ToUriString()); + ASSERT_EQ("file:///data/test_instance/tp0/6b657934?size=128", result[0][3].second.ToUriString()); + ASSERT_EQ("file:///data/test_instance/tp0/6b657935?size=128", result[0][4].second.ToUriString()); } // 多个key一个文件的形式 { @@ -41,14 +51,24 @@ TEST_F(NfsBackendTest, TestSimple) { spec->set_root_path("/data/"); StorageConfig storage_config(DataStorageType::DATA_STORAGE_TYPE_NFS, "test", spec); ASSERT_EQ(EC_OK, backend.Open(storage_config, "fake_trace_id_3")); - std::vector keys = {"key1", "key2", "key3", "key4", "key5"}; - auto results = backend.Create(keys, 128, "fake_trace_id_4", []() {}); - ASSERT_EQ(results.size(), keys.size()); - EXPECT_EQ("file:///data/key1_5a560a3d977cc6f2?blkid=0&size=128", results[0].second.ToUriString()); - EXPECT_EQ("file:///data/key1_5a560a3d977cc6f2?blkid=1&size=128", results[1].second.ToUriString()); - EXPECT_EQ("file:///data/key3_1184f2d3fc112241?blkid=0&size=128", results[2].second.ToUriString()); - EXPECT_EQ("file:///data/key3_1184f2d3fc112241?blkid=1&size=128", results[3].second.ToUriString()); - EXPECT_EQ("file:///data/key5?blkid=0&size=128", results[4].second.ToUriString()); + + CreateBlocksRequest request; + request.instance_id = "test_instance"; + SpecBlockKeys spec_block; + spec_block.spec_name = "tp0"; + spec_block.spec_size = 128; + spec_block.block_keys = {0x6b657931, 0x6b657932, 0x6b657933, 0x6b657934, 0x6b657935}; + spec_block.original_key_indices = {0, 1, 2, 3, 4}; + request.spec_block_keys.push_back(std::move(spec_block)); + + auto result = backend.Create(request, "fake_trace_id_4", []() {}); + ASSERT_EQ(result.size(), 1); + ASSERT_EQ(result[0].size(), 5); + EXPECT_EQ("file:///data/test_instance/tp0/6b657931_36fc91bdda6e6263?blkid=0&size=128", result[0][0].second.ToUriString()); + EXPECT_EQ("file:///data/test_instance/tp0/6b657931_36fc91bdda6e6263?blkid=1&size=128", result[0][1].second.ToUriString()); + EXPECT_EQ("file:///data/test_instance/tp0/6b657933_d893940787994b17?blkid=0&size=128", result[0][2].second.ToUriString()); + EXPECT_EQ("file:///data/test_instance/tp0/6b657933_d893940787994b17?blkid=1&size=128", result[0][3].second.ToUriString()); + EXPECT_EQ("file:///data/test_instance/tp0/6b657935?blkid=0&size=128", result[0][4].second.ToUriString()); } } @@ -72,19 +92,29 @@ TEST_F(NfsBackendTest, TestCreateWithBatchingAndCallbackInvocation) { spec->set_root_path("/data/"); StorageConfig storage_config(DataStorageType::DATA_STORAGE_TYPE_NFS, "test", spec); ASSERT_EQ(EC_OK, backend.Open(storage_config, "fake_trace_id_1")); - std::vector keys = {"key1", "key2", "key3", "key4", "key5"}; + + CreateBlocksRequest request; + request.instance_id = "test_instance"; + SpecBlockKeys spec_block; + spec_block.spec_name = "tp0"; + spec_block.spec_size = 100; + spec_block.block_keys = {0x6b657931, 0x6b657932, 0x6b657933, 0x6b657934, 0x6b657935}; + spec_block.original_key_indices = {0, 1, 2, 3, 4}; + request.spec_block_keys.push_back(std::move(spec_block)); + bool callback_called = false; auto callback = [&callback_called]() { callback_called = true; }; - auto results = backend.Create(keys, 100, "fake_trace_id_2", callback); + auto result = backend.Create(request, "fake_trace_id_2", callback); ASSERT_TRUE(callback_called); - ASSERT_EQ(results.size(), keys.size()); - EXPECT_EQ(results[0].second.ToUriString(), "file:///data/key1_5a560a3d977cc6f2?blkid=0&size=100"); - EXPECT_EQ(results[1].second.ToUriString(), "file:///data/key1_5a560a3d977cc6f2?blkid=1&size=100"); - EXPECT_EQ(results[2].second.ToUriString(), "file:///data/key3_1184f2d3fc112241?blkid=0&size=100"); - EXPECT_EQ(results[3].second.ToUriString(), "file:///data/key3_1184f2d3fc112241?blkid=1&size=100"); - EXPECT_EQ(results[4].second.ToUriString(), "file:///data/key5?blkid=0&size=100"); - for (size_t i = 0; i < results.size(); ++i) { - ASSERT_EQ(results[i].first, EC_OK); + ASSERT_EQ(result.size(), 1); + ASSERT_EQ(result[0].size(), 5); + EXPECT_EQ("file:///data/test_instance/tp0/6b657931_36fc91bdda6e6263?blkid=0&size=100", result[0][0].second.ToUriString()); + EXPECT_EQ("file:///data/test_instance/tp0/6b657931_36fc91bdda6e6263?blkid=1&size=100", result[0][1].second.ToUriString()); + EXPECT_EQ("file:///data/test_instance/tp0/6b657933_d893940787994b17?blkid=0&size=100", result[0][2].second.ToUriString()); + EXPECT_EQ("file:///data/test_instance/tp0/6b657933_d893940787994b17?blkid=1&size=100", result[0][3].second.ToUriString()); + EXPECT_EQ("file:///data/test_instance/tp0/6b657935?blkid=0&size=100", result[0][4].second.ToUriString()); + for (size_t i = 0; i < result[0].size(); ++i) { + ASSERT_EQ(result[0][i].first, EC_OK); } } @@ -95,19 +125,34 @@ TEST_F(NfsBackendTest, TestCreateWithBatchSizeOneAndEmptyKeys) { spec->set_root_path("/data/"); StorageConfig storage_config(DataStorageType::DATA_STORAGE_TYPE_NFS, "test", spec); ASSERT_EQ(EC_OK, backend.Open(storage_config, "fake_trace_id_1")); + + // Test empty request + CreateBlocksRequest empty_request; + empty_request.instance_id = "test_instance"; + // No spec_block_keys added bool callback_called = false; auto cb = [&callback_called]() { callback_called = true; }; - auto results_empty = backend.Create({}, 100, "fake_trace_id_2", cb); + auto results_empty = backend.Create(empty_request, "fake_trace_id_2", cb); ASSERT_TRUE(callback_called); ASSERT_TRUE(results_empty.empty()); - std::vector keys = {"a", "b"}; + // Test with keys + CreateBlocksRequest request; + request.instance_id = "test_instance"; + SpecBlockKeys spec_block; + spec_block.spec_name = "tp0"; + spec_block.spec_size = 100; + spec_block.block_keys = {0x61, 0x62}; // hex of 'a', 'b' + spec_block.original_key_indices = {0, 1}; + request.spec_block_keys.push_back(std::move(spec_block)); + callback_called = false; - auto results = backend.Create(keys, 100, "fake_trace_id_3", [&callback_called]() { callback_called = true; }); + auto result = backend.Create(request, "fake_trace_id_3", [&callback_called]() { callback_called = true; }); ASSERT_TRUE(callback_called); - ASSERT_EQ(results.size(), keys.size()); - ASSERT_EQ(results[0].second.ToUriString(), "file:///data/a?size=100"); - ASSERT_EQ(results[1].second.ToUriString(), "file:///data/b?size=100"); + ASSERT_EQ(result.size(), 1); + ASSERT_EQ(result[0].size(), 2); + ASSERT_EQ("file:///data/test_instance/tp0/61?size=100", result[0][0].second.ToUriString()); + ASSERT_EQ("file:///data/test_instance/tp0/62?size=100", result[0][1].second.ToUriString()); } TEST_F(NfsBackendTest, TestDeleteReturnsOkAndSameSize) { @@ -168,13 +213,23 @@ TEST_F(NfsBackendTest, TestCreateHandlesInvalidBatchSize) { spec->set_root_path("/root/"); StorageConfig storage_config(DataStorageType::DATA_STORAGE_TYPE_NFS, "test", spec); ASSERT_EQ(EC_OK, backend.Open(storage_config, "fake_trace_id_1")); - std::vector keys = {"k1", "k2"}; + + CreateBlocksRequest request; + request.instance_id = "test_instance"; + SpecBlockKeys spec_block; + spec_block.spec_name = "tp0"; + spec_block.spec_size = 50; + spec_block.block_keys = {0x6b31, 0x6b32}; // hex of "k1", "k2" + spec_block.original_key_indices = {0, 1}; + request.spec_block_keys.push_back(std::move(spec_block)); + bool cb_called = false; - auto results = backend.Create(keys, 50, "fake_trace_id_2", [&cb_called]() { cb_called = true; }); + auto result = backend.Create(request, "fake_trace_id_2", [&cb_called]() { cb_called = true; }); ASSERT_TRUE(cb_called); - ASSERT_EQ(results.size(), keys.size()); - ASSERT_EQ(results[0].second.ToUriString(), "file:///root/k1?size=50"); - ASSERT_EQ(results[1].second.ToUriString(), "file:///root/k2?size=50"); + ASSERT_EQ(result.size(), 1); + ASSERT_EQ(result[0].size(), 2); + ASSERT_EQ("file:///root/test_instance/tp0/6b31?size=50", result[0][0].second.ToUriString()); + ASSERT_EQ("file:///root/test_instance/tp0/6b32?size=50", result[0][1].second.ToUriString()); } TEST_F(NfsBackendTest, TestCreateSingleKeyBatch) { @@ -184,10 +239,59 @@ TEST_F(NfsBackendTest, TestCreateSingleKeyBatch) { spec->set_root_path("/root/"); StorageConfig storage_config(DataStorageType::DATA_STORAGE_TYPE_NFS, "test", spec); ASSERT_EQ(EC_OK, backend.Open(storage_config, "fake_trace_id_1")); - std::vector keys = {"singlekey"}; + + CreateBlocksRequest request; + request.instance_id = "test_instance"; + SpecBlockKeys spec_block; + spec_block.spec_name = "tp0"; + spec_block.spec_size = 10; + spec_block.block_keys = {0x12345678abcdef00LL}; // a valid int64 key + spec_block.original_key_indices = {0}; + request.spec_block_keys.push_back(std::move(spec_block)); + bool cb_called = false; - auto results = backend.Create(keys, 10, "fake_trace_id_2", [&cb_called]() { cb_called = true; }); + auto result = backend.Create(request, "fake_trace_id_2", [&cb_called]() { cb_called = true; }); ASSERT_TRUE(cb_called); - ASSERT_EQ(results.size(), keys.size()); - ASSERT_EQ(results[0].second.ToUriString(), "file:///root/singlekey?blkid=0&size=10"); + ASSERT_EQ(result.size(), 1); + ASSERT_EQ(result[0].size(), 1); + ASSERT_EQ("file:///root/test_instance/tp0/12345678abcdef00?blkid=0&size=10", result[0][0].second.ToUriString()); +} + +TEST_F(NfsBackendTest, TestMultipleSpecsNotMixed) { + // Test that keys from different specs are never mixed in the same batch + NfsBackend backend(metrics_registry_); + std::shared_ptr spec(new NfsStorageSpec); + spec->set_key_count_per_file(3); // Large batch size + spec->set_root_path("/data/"); + StorageConfig storage_config(DataStorageType::DATA_STORAGE_TYPE_NFS, "test", spec); + ASSERT_EQ(EC_OK, backend.Open(storage_config, "fake_trace_id_1")); + + CreateBlocksRequest request; + request.instance_id = "test_instance"; + + // First spec with 2 keys + SpecBlockKeys spec_block1; + spec_block1.spec_name = "tp0"; + spec_block1.spec_size = 128; + spec_block1.block_keys = {0x6b657931, 0x6b657932}; + spec_block1.original_key_indices = {0, 1}; + request.spec_block_keys.push_back(std::move(spec_block1)); + + // Second spec with 2 keys + SpecBlockKeys spec_block2; + spec_block2.spec_name = "tp1"; + spec_block2.spec_size = 128; + spec_block2.block_keys = {0x6b657933, 0x6b657934}; + spec_block2.original_key_indices = {2, 3}; + request.spec_block_keys.push_back(std::move(spec_block2)); + + auto result = backend.Create(request, "fake_trace_id_2", []() {}); + ASSERT_EQ(result.size(), 2); + ASSERT_EQ(result[0].size(), 2); + ASSERT_EQ(result[1].size(), 2); + + EXPECT_EQ("file:///data/test_instance/tp0/6b657931_36fc91bdda6e6263?blkid=0&size=128", result[0][0].second.ToUriString()); + EXPECT_EQ("file:///data/test_instance/tp0/6b657931_36fc91bdda6e6263?blkid=1&size=128", result[0][1].second.ToUriString()); + EXPECT_EQ("file:///data/test_instance/tp1/6b657933_3533532bb1ef35e1?blkid=0&size=128", result[1][0].second.ToUriString()); + EXPECT_EQ("file:///data/test_instance/tp1/6b657933_3533532bb1ef35e1?blkid=1&size=128", result[1][1].second.ToUriString()); } diff --git a/kv_cache_manager/manager/cache_manager.cc b/kv_cache_manager/manager/cache_manager.cc index b84414e0..4e2ab847 100644 --- a/kv_cache_manager/manager/cache_manager.cc +++ b/kv_cache_manager/manager/cache_manager.cc @@ -659,155 +659,6 @@ ErrorCode CacheManager::FilterWriteCache(RequestContext *request_context, return EC_OK; } -ErrorCode -CacheManager::CreateInSingleBatch(RequestContext *request_context, - const std::string &instance_id, - const CacheManager::KeyVector &keys, - const std::vector &location_spec_group_names, - const std::shared_ptr &instance_info, - const std::shared_ptr &data_storage_manager, - const std::string &unique_name, - std::vector &allocated_uris, - std::vector>> &key_to_uris, - bool &is_create_success, - int64_t common_size) { - SPAN_TRACER(request_context); - const std::string &trace_id = request_context->trace_id(); - std::vector merged_block_keys; - std::vector merged_keys_idx; - std::vector spec_info_mapping; - merged_block_keys.reserve(instance_info->location_spec_infos().size() * keys.size()); - merged_keys_idx.reserve(instance_info->location_spec_infos().size() * keys.size()); - spec_info_mapping.reserve(instance_info->location_spec_infos().size() * keys.size()); - - for (const auto &spec_info : instance_info->location_spec_infos()) { - if (location_spec_group_names.empty()) { - for (size_t i = 0; i < keys.size(); i++) { - std::string block_key = instance_id + "/" + spec_info.name() + "/" + StringUtil::Uint64ToHex(keys[i]); - merged_block_keys.push_back(block_key); - merged_keys_idx.push_back(i); - spec_info_mapping.push_back(&spec_info); - } - } else { - for (size_t i = 0; i < keys.size(); i++) { - auto [ec, found] = IsSpecNameInSpecGroup(trace_id, - instance_id, - spec_info.name(), - location_spec_group_names[i], - instance_info->location_spec_groups()); - RETURN_IF_EC_NOT_OK_WITH_LOG(WARN, ec, "IsSpecNameInSpecGroup failed"); - if (found) { - std::string block_key = - instance_id + "/" + spec_info.name() + "/" + StringUtil::Uint64ToHex(keys[i]); - merged_block_keys.push_back(block_key); - merged_keys_idx.push_back(i); - spec_info_mapping.push_back(&spec_info); - } - } - } - } - - std::vector> results = data_storage_manager->Create( - request_context, unique_name, merged_block_keys, common_size, []() { /* do nothing */ }); - - for (size_t i = 0; i < results.size(); i++) { - if (results[i].first == ErrorCode::EC_OK) { - allocated_uris.push_back(results[i].second); - key_to_uris[merged_keys_idx[i]].push_back({allocated_uris.size() - 1, spec_info_mapping[i]}); - } - } - // TODO: move check to another function - if (results.size() != merged_block_keys.size()) { - is_create_success = false; - PREFIX_LOG(WARN, - "create data storage fail, results size:%ld, request size: %ld", - results.size(), - merged_block_keys.size()); - } - for (auto &result : results) { - if (result.first != ErrorCode::EC_OK) { - is_create_success = false; - PREFIX_LOG(WARN, "create data storage fail, ec_code: %d", result.first); - break; - } - } - return EC_OK; -} - -ErrorCode CacheManager::CreateBySpec(RequestContext *request_context, - const std::string &instance_id, - const CacheManager::KeyVector &keys, - const std::vector &location_spec_group_names, - const std::shared_ptr &instance_info, - const std::shared_ptr &data_storage_manager, - const std::string &unique_name, - std::vector &allocated_uris, - std::vector>> &key_to_uris, - bool &is_create_success) { - // avoid use file across tp ranks - SPAN_TRACER(request_context); - const std::string &trace_id = request_context->trace_id(); - for (const auto &spec_info : instance_info->location_spec_infos()) { - std::vector block_keys; - std::vector keys_idx; - block_keys.reserve(keys.size()); - keys_idx.reserve(keys.size()); - if (location_spec_group_names.empty()) { - for (size_t i = 0; i < keys.size(); i++) { - std::string block_key = instance_id + "/" + spec_info.name() + "/" + StringUtil::Uint64ToHex(keys[i]); - block_keys.push_back(block_key); - keys_idx.push_back(i); - } - } else { - for (size_t i = 0; i < keys.size(); i++) { - auto [ec, found] = IsSpecNameInSpecGroup(trace_id, - instance_id, - spec_info.name(), - location_spec_group_names[i], - instance_info->location_spec_groups()); - RETURN_IF_EC_NOT_OK_WITH_LOG(WARN, ec, "IsSpecNameInSpecGroup failed"); - if (found) { - std::string block_key = - instance_id + "/" + spec_info.name() + "/" + StringUtil::Uint64ToHex(keys[i]); - block_keys.push_back(block_key); - keys_idx.push_back(i); - } - } - } - - std::vector> results = data_storage_manager->Create( - request_context, unique_name, block_keys, spec_info.size(), []() { /* do nothing */ }); - - for (size_t i = 0; i < results.size(); i++) { - if (results[i].first == ErrorCode::EC_OK) { - allocated_uris.push_back(results[i].second); - key_to_uris[keys_idx[i]].push_back({allocated_uris.size() - 1, &spec_info}); - } - } - - // TODO: move check to another function - if (results.size() != block_keys.size()) { - is_create_success = false; - PREFIX_LOG(WARN, - "create data storage fail, results size:%ld, request size: %ld", - results.size(), - block_keys.size()); - } - for (auto &result : results) { - if (result.first != ErrorCode::EC_OK) { - is_create_success = false; - PREFIX_LOG(WARN, "create data storage fail, ec_code: %d", result.first); - break; - } - } - - if (!is_create_success) { - break; - } - } - return EC_OK; -} - ErrorCode CacheManager::GenWriteLocation(RequestContext *request_context, const std::string &instance_id, const CacheManager::KeyVector &keys, @@ -838,46 +689,103 @@ ErrorCode CacheManager::GenWriteLocation(RequestContext *request_context, request_context, instance_info->instance_group_name()); RETURN_IF_EC_NOT_OK_WITH_LOG(WARN, select_result.ec, "select storage backend failed"); + // Build CreateBlocksRequest - resolve spec matching and group keys by spec + CreateBlocksRequest create_request; + create_request.instance_id = instance_id; + create_request.spec_block_keys.reserve(instance_info->location_spec_infos().size()); + + // Build a mapping from spec name to spec info for result processing + std::vector spec_info_mapping; + spec_info_mapping.reserve(instance_info->location_spec_infos().size()); + + for (const auto &spec_info : instance_info->location_spec_infos()) { + SpecBlockKeys spec_block; + spec_block.spec_name = spec_info.name(); + spec_block.spec_size = spec_info.size(); + + // Collect keys that belong to this spec + if (location_spec_group_names.empty()) { + // No group constraints - all keys go to all specs + spec_block.block_keys.reserve(keys.size()); + spec_block.original_key_indices.reserve(keys.size()); + for (size_t i = 0; i < keys.size(); i++) { + spec_block.block_keys.push_back(keys[i]); + spec_block.original_key_indices.push_back(i); + } + } else { + // Check each key against group constraints + for (size_t i = 0; i < keys.size(); i++) { + auto [ec, found] = IsSpecNameInSpecGroup(trace_id, + instance_id, + spec_info.name(), + location_spec_group_names[i], + instance_info->location_spec_groups()); + RETURN_IF_EC_NOT_OK_WITH_LOG(WARN, ec, "IsSpecNameInSpecGroup failed"); + if (found) { + spec_block.block_keys.push_back(keys[i]); + spec_block.original_key_indices.push_back(i); + } + } + } + + // Only add spec if it has keys to create + if (!spec_block.block_keys.empty()) { + create_request.spec_block_keys.push_back(std::move(spec_block)); + spec_info_mapping.push_back(&spec_info); + } + } + + // Call backend with structured request + auto create_result = + data_storage_manager->Create(request_context, select_result.name, create_request, []() { /* do nothing */ }); + + // key_to_uris[i]: all (uri, spec_info) pairs allocated for keys[i], across every spec + // allocated_uris: flat list of every uri allocated, used only for rollback on failure + using KeyUriEntry = std::pair; + std::vector> key_to_uris(keys.size()); std::vector allocated_uris; allocated_uris.reserve(instance_info->location_spec_infos().size() * keys.size()); - std::vector>> key_to_uris(keys.size()); bool is_create_success = true; - bool merge = instance_info->location_spec_infos().empty() || - std::all_of(instance_info->location_spec_infos().begin() + 1, - instance_info->location_spec_infos().end(), - [&instance_info](const auto &spec) { - return spec.size() == instance_info->location_spec_infos().front().size(); - }); - int64_t common_size = merge && !instance_info->location_spec_infos().empty() - ? instance_info->location_spec_infos().front().size() - : 0; - - if (merge) { - auto ec = CreateInSingleBatch(request_context, - instance_id, - keys, - location_spec_group_names, - instance_info, - data_storage_manager, - select_result.name, - allocated_uris, - key_to_uris, - is_create_success, - common_size); - RETURN_IF_EC_NOT_OK_WITH_LOG(WARN, ec, "CreateInSingleBatch failed"); + // Process results - structured return aligns 1:1 with spec_block_keys + if (create_result.size() != create_request.spec_block_keys.size()) { + is_create_success = false; + PREFIX_LOG(WARN, + "create data storage fail, results size:%ld, expected size: %ld", + create_result.size(), + create_request.spec_block_keys.size()); } else { - auto ec = CreateBySpec(request_context, - instance_id, - keys, - location_spec_group_names, - instance_info, - data_storage_manager, - select_result.name, - allocated_uris, - key_to_uris, - is_create_success); - RETURN_IF_EC_NOT_OK_WITH_LOG(WARN, ec, "CreateBySpec failed"); + for (size_t spec_idx = 0; spec_idx < create_request.spec_block_keys.size(); ++spec_idx) { + const auto &spec_block = create_request.spec_block_keys[spec_idx]; + const auto &spec_result = create_result[spec_idx]; + const LocationSpecInfo *spec_info = spec_info_mapping[spec_idx]; + + if (spec_result.size() != spec_block.block_keys.size()) { + is_create_success = false; + PREFIX_LOG(WARN, + "create data storage fail, spec [%s] results size:%ld, expected size: %ld", + spec_block.spec_name.c_str(), + spec_result.size(), + spec_block.block_keys.size()); + break; + } + + for (size_t key_idx = 0; key_idx < spec_result.size(); ++key_idx) { + const auto &[ec, uri] = spec_result[key_idx]; + if (ec == ErrorCode::EC_OK) { + size_t original_key_idx = spec_block.original_key_indices[key_idx]; + key_to_uris[original_key_idx].emplace_back(uri, spec_info); + allocated_uris.push_back(uri); + } else { + is_create_success = false; + PREFIX_LOG(WARN, "create data storage fail, ec_code: %d", ec); + } + } + + if (!is_create_success) { + break; + } + } } if (!is_create_success) { @@ -905,10 +813,10 @@ ErrorCode CacheManager::GenWriteLocation(RequestContext *request_context, for (const auto &uris : key_to_uris) { CacheLocation cache_location; cache_location.set_type(select_result.type); - for (const auto &[data_storage_uri_idx, location_spec_info] : uris) { + for (const auto &[uri, location_spec_info] : uris) { LocationSpec location_spec; location_spec.set_name(location_spec_info->name()); - location_spec.set_uri(allocated_uris[data_storage_uri_idx].ToUriString()); + location_spec.set_uri(uri.ToUriString()); cache_location.push_location_spec(std::move(location_spec)); } cache_location.set_spec_size(uris.size()); diff --git a/kv_cache_manager/manager/cache_manager.h b/kv_cache_manager/manager/cache_manager.h index 18aaf1f8..52f03988 100644 --- a/kv_cache_manager/manager/cache_manager.h +++ b/kv_cache_manager/manager/cache_manager.h @@ -146,27 +146,7 @@ class CacheManager { const CacheManager::KeyVector &keys, const std::vector &location_spec_group_names, CacheLocationVector &new_locations); - ErrorCode CreateInSingleBatch(RequestContext *request_context, - const std::string &instance_id, - const CacheManager::KeyVector &keys, - const std::vector &location_spec_group_names, - const std::shared_ptr &instance_info, - const std::shared_ptr &data_storage_manager, - const std::string &unique_name, - std::vector &allocated_uris, - std::vector>> &key_to_uris, - bool &is_create_success, - int64_t common_size); - ErrorCode CreateBySpec(RequestContext *request_context, - const std::string &instance_id, - const CacheManager::KeyVector &keys, - const std::vector &location_spec_group_names, - const std::shared_ptr &instance_info, - const std::shared_ptr &data_storage_manager, - const std::string &unique_name, - std::vector &allocated_uris, - std::vector>> &key_to_uris, - bool &is_create_success); + ErrorCode TryCreateMetaSearcher(RequestContext *request_context, const std::string &instance_id); std::pair CheckInputAndGetMetaSearcher(RequestContext *request_context, diff --git a/open_source/kv_cache_manager/data_storage/tair_mempool_backend.cc b/open_source/kv_cache_manager/data_storage/tair_mempool_backend.cc index 86b9b64b..787b0b85 100644 --- a/open_source/kv_cache_manager/data_storage/tair_mempool_backend.cc +++ b/open_source/kv_cache_manager/data_storage/tair_mempool_backend.cc @@ -45,10 +45,9 @@ ErrorCode TairMempoolBackend::Close() { return EC_ERROR; } -std::vector> TairMempoolBackend::Create(const std::vector &keys, - size_t size_per_key, - const std::string &trace_id, - std::function cb) { +std::vector TairMempoolBackend::Create(const CreateBlocksRequest &request, + const std::string &trace_id, + std::function cb) { KVCM_LOG_ERROR("no implementation for TairMempoolBackend"); return {}; } diff --git a/open_source/kv_cache_manager/data_storage/tair_mempool_backend.h b/open_source/kv_cache_manager/data_storage/tair_mempool_backend.h index 75c4d2a1..ca1a2c26 100644 --- a/open_source/kv_cache_manager/data_storage/tair_mempool_backend.h +++ b/open_source/kv_cache_manager/data_storage/tair_mempool_backend.h @@ -20,10 +20,9 @@ class TairMempoolBackend : public DataStorageBackend { ErrorCode Close() override; - std::vector> Create(const std::vector &keys, - size_t size_per_key, - const std::string &trace_id, - std::function cb) override; + std::vector Create(const CreateBlocksRequest &request, + const std::string &trace_id, + std::function cb) override; std::vector Delete(const std::vector &storage_uris, const std::string &trace_id, std::function cb) override; diff --git a/open_source/kv_cache_manager/data_storage/vcns_hf3fs_backend.cc b/open_source/kv_cache_manager/data_storage/vcns_hf3fs_backend.cc index 19329dec..de1993eb 100644 --- a/open_source/kv_cache_manager/data_storage/vcns_hf3fs_backend.cc +++ b/open_source/kv_cache_manager/data_storage/vcns_hf3fs_backend.cc @@ -27,11 +27,10 @@ ErrorCode VcnsHf3fsBackend::Close() { return EC_ERROR; }; -std::vector> VcnsHf3fsBackend::Create(const std::vector &keys, - size_t size_per_key, - const std::string &trace_id, - std::function cb) { - KVCM_LOG_ERROR("no implementation for TairMempoolBackend"); +std::vector VcnsHf3fsBackend::Create(const CreateBlocksRequest &request, + const std::string &trace_id, + std::function cb) { + KVCM_LOG_ERROR("no implementation for VcnsHf3fsBackend"); return {}; }; diff --git a/open_source/kv_cache_manager/data_storage/vcns_hf3fs_backend.h b/open_source/kv_cache_manager/data_storage/vcns_hf3fs_backend.h index 371edf82..870e594a 100644 --- a/open_source/kv_cache_manager/data_storage/vcns_hf3fs_backend.h +++ b/open_source/kv_cache_manager/data_storage/vcns_hf3fs_backend.h @@ -17,10 +17,9 @@ class VcnsHf3fsBackend : public DataStorageBackend { ErrorCode DoOpen(const StorageConfig &storage_config, const std::string &trace_id) override; ErrorCode Close() override; - std::vector> Create(const std::vector &keys, - size_t size_per_key, - const std::string &trace_id, - std::function cb) override; + std::vector Create(const CreateBlocksRequest &request, + const std::string &trace_id, + std::function cb) override; std::vector Delete(const std::vector &storage_uris, const std::string &trace_id, std::function cb) override;