Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 24 additions & 17 deletions kv_cache_manager/manager/cache_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -356,31 +356,35 @@ CacheManager::StartWriteCache(RequestContext *request_context,
std::vector<std::string_view> new_location_spec_group_names;
KVCM_METRICS_COLLECTOR_CHRONO_MARK_BEGIN(service_metrics_collector, ManagerFilterWriteCache);
KeyVector query_keys = keys;

ErrorCode filter_ec;
if (!keys.empty()) {
KVCM_METRICS_COLLECTOR_SET_METRICS(service_metrics_collector, manager, request_key_count, keys.size());
ec = FilterWriteCache(request_context,
instance_id,
meta_searcher,
keys,
new_keys,
location_spec_group_names,
new_location_spec_group_names,
block_mask);
filter_ec = FilterWriteCache(request_context,
instance_id,
meta_searcher,
keys,
new_keys,
location_spec_group_names,
new_location_spec_group_names,
block_mask);
} else {
auto [ec_temp, block_size] = GetBlockSize(request_context, instance_id);
RETURN_IF_EC_NOT_OK_WITH_TYPE_LOG(WARN, ec_temp, StartWriteCacheInfo, "start write cache failed");
auto gen_keys = GenKeyVector(tokens, block_size);
query_keys = gen_keys;
KVCM_METRICS_COLLECTOR_SET_METRICS(service_metrics_collector, manager, request_key_count, gen_keys.size());
ec = FilterWriteCache(request_context,
instance_id,
meta_searcher,
gen_keys,
new_keys,
location_spec_group_names,
new_location_spec_group_names,
block_mask);
filter_ec = FilterWriteCache(request_context,
instance_id,
meta_searcher,
gen_keys,
new_keys,
location_spec_group_names,
new_location_spec_group_names,
block_mask);
}
RETURN_IF_EC_NOT_OK_WITH_TYPE_LOG(WARN, filter_ec, StartWriteCacheInfo, "filter write cache failed");

std::vector<std::string> location_ids;
std::string write_session_id = StringUtil::GenerateRandomString(32);
if (new_keys.empty()) {
Expand Down Expand Up @@ -598,6 +602,9 @@ ErrorCode CacheManager::FilterWriteCache(RequestContext *request_context,
RETURN_IF_EC_NOT_OK_WITH_LOG(WARN, ec, "BatchGetLocation failed");
assert(keys.size() == location_maps.size());
auto policy = genSelectLocationPolicy(request_context, instance_id);
if (!policy) {
return EC_ERROR;
}
auto first_empty = std::find_if(
location_maps.begin(), location_maps.end(), [&policy](const auto &m) { return !policy->ExistsForWrite(m); });
bool only_prefix_not_empty =
Expand Down Expand Up @@ -1113,7 +1120,7 @@ std::unique_ptr<SelectLocationPolicy> CacheManager::genSelectLocationPolicy(Requ
if (group_available_storages.size() >= group_storages.size()) {
return std::make_unique<StaticWeightSLPolicy>();
}
if (group_available_storages.size() == 0) {
if (group_available_storages.empty()) {
request_context->error_tracer()->AddErrorMsg("all storages are unavailable");
KVCM_INTERVAL_LOG_WARN(10, "all storages are unavailable!");
return nullptr;
Expand Down
33 changes: 27 additions & 6 deletions kv_cache_manager/manager/test/cache_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class CacheManagerTest : public TESTBASE {

std::unique_ptr<CacheManager> createCacheManager() {
std::shared_ptr<MetricsRegistry> metrics_registry = std::make_shared<MetricsRegistry>();
std::shared_ptr<RegistryManager> registry_manager = std::make_shared<RegistryManager>("", metrics_registry);
registry_manager_ = std::make_shared<RegistryManager>("", metrics_registry);
std::shared_ptr<InstanceGroup> instance_group = std::make_shared<InstanceGroup>();
auto meta_indexer_config = std::make_shared<MetaIndexerConfig>();
instance_group->cache_config_ = std::make_shared<CacheConfig>();
Expand All @@ -55,18 +55,18 @@ class CacheManagerTest : public TESTBASE {

std::shared_ptr<InstanceInfo> instance_info = std::make_shared<InstanceInfo>(
"test_quota_group", "default", "test_instance", 64, createLocationSpecInfos(), createModelDeployment());
registry_manager->instance_group_configs_["test_group"] = instance_group;
registry_manager->instance_infos_["test_instance"] = instance_info;
registry_manager->Init();
registry_manager_->instance_group_configs_["test_group"] = instance_group;
registry_manager_->instance_infos_["test_instance"] = instance_info;
registry_manager_->Init();
std::unique_ptr<CacheManager> cache_manager =
std::make_unique<CacheManager>(metrics_registry, registry_manager);
std::make_unique<CacheManager>(metrics_registry, registry_manager_);

EXPECT_TRUE(cache_manager->Init());

// load first because we need default group
// in real usage, we load startup config after recover
StartupConfigLoader loader;
loader.Init(registry_manager);
loader.Init(registry_manager_);
loader.Load("");

EXPECT_EQ(EC_OK, cache_manager->DoRecover());
Expand Down Expand Up @@ -108,6 +108,7 @@ class CacheManagerTest : public TESTBASE {
}

std::unique_ptr<CacheManager> cache_manager_;
std::shared_ptr<RegistryManager> registry_manager_;
std::shared_ptr<RequestContext> request_context_;
};

Expand Down Expand Up @@ -1333,4 +1334,24 @@ TEST_F(CacheManagerTest, TestUnavailableStorage) {
test_match_location(10, 4, "nfs_test_01"); // match available location
}

TEST_F(CacheManagerTest, TestStartWriteCacheWithNoAvailableStorage) {
auto expected = std::pair<ErrorCode, std::string>(EC_OK, default_storage_configs);
ASSERT_EQ(expected,
cache_manager_->RegisterInstance(request_context_.get(),
"default",
"test_instance",
64,
createLocationSpecInfos(),
createModelDeployment(),
std::vector<LocationSpecGroup>()));

ASSERT_EQ(EC_OK, registry_manager_->DisableStorage(request_context_.get(), "nfs_01"));

std::vector<int64_t> keys{1, 2, 3, 4};
auto [ec, start_write_cache_info] =
cache_manager_->StartWriteCache(request_context_.get(), "test_instance", keys, {}, {}, 100000000);
EXPECT_EQ(EC_ERROR, ec);
EXPECT_EQ(0, start_write_cache_info.locations().cache_locations_view().size());
}

} // namespace kv_cache_manager