Skip to content

Commit 99767c1

Browse files
authored
[manager] handle policy empty when all storage unavailable (#7)
1 parent 99607d3 commit 99767c1

File tree

2 files changed

+51
-23
lines changed

2 files changed

+51
-23
lines changed

kv_cache_manager/manager/cache_manager.cc

Lines changed: 24 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -356,31 +356,35 @@ CacheManager::StartWriteCache(RequestContext *request_context,
356356
std::vector<std::string_view> new_location_spec_group_names;
357357
KVCM_METRICS_COLLECTOR_CHRONO_MARK_BEGIN(service_metrics_collector, ManagerFilterWriteCache);
358358
KeyVector query_keys = keys;
359+
360+
ErrorCode filter_ec;
359361
if (!keys.empty()) {
360362
KVCM_METRICS_COLLECTOR_SET_METRICS(service_metrics_collector, manager, request_key_count, keys.size());
361-
ec = FilterWriteCache(request_context,
362-
instance_id,
363-
meta_searcher,
364-
keys,
365-
new_keys,
366-
location_spec_group_names,
367-
new_location_spec_group_names,
368-
block_mask);
363+
filter_ec = FilterWriteCache(request_context,
364+
instance_id,
365+
meta_searcher,
366+
keys,
367+
new_keys,
368+
location_spec_group_names,
369+
new_location_spec_group_names,
370+
block_mask);
369371
} else {
370372
auto [ec_temp, block_size] = GetBlockSize(request_context, instance_id);
371373
RETURN_IF_EC_NOT_OK_WITH_TYPE_LOG(WARN, ec_temp, StartWriteCacheInfo, "start write cache failed");
372374
auto gen_keys = GenKeyVector(tokens, block_size);
373375
query_keys = gen_keys;
374376
KVCM_METRICS_COLLECTOR_SET_METRICS(service_metrics_collector, manager, request_key_count, gen_keys.size());
375-
ec = FilterWriteCache(request_context,
376-
instance_id,
377-
meta_searcher,
378-
gen_keys,
379-
new_keys,
380-
location_spec_group_names,
381-
new_location_spec_group_names,
382-
block_mask);
377+
filter_ec = FilterWriteCache(request_context,
378+
instance_id,
379+
meta_searcher,
380+
gen_keys,
381+
new_keys,
382+
location_spec_group_names,
383+
new_location_spec_group_names,
384+
block_mask);
383385
}
386+
RETURN_IF_EC_NOT_OK_WITH_TYPE_LOG(WARN, filter_ec, StartWriteCacheInfo, "filter write cache failed");
387+
384388
std::vector<std::string> location_ids;
385389
std::string write_session_id = StringUtil::GenerateRandomString(32);
386390
if (new_keys.empty()) {
@@ -598,6 +602,9 @@ ErrorCode CacheManager::FilterWriteCache(RequestContext *request_context,
598602
RETURN_IF_EC_NOT_OK_WITH_LOG(WARN, ec, "BatchGetLocation failed");
599603
assert(keys.size() == location_maps.size());
600604
auto policy = genSelectLocationPolicy(request_context, instance_id);
605+
if (!policy) {
606+
return EC_ERROR;
607+
}
601608
auto first_empty = std::find_if(
602609
location_maps.begin(), location_maps.end(), [&policy](const auto &m) { return !policy->ExistsForWrite(m); });
603610
bool only_prefix_not_empty =
@@ -1113,7 +1120,7 @@ std::unique_ptr<SelectLocationPolicy> CacheManager::genSelectLocationPolicy(Requ
11131120
if (group_available_storages.size() >= group_storages.size()) {
11141121
return std::make_unique<StaticWeightSLPolicy>();
11151122
}
1116-
if (group_available_storages.size() == 0) {
1123+
if (group_available_storages.empty()) {
11171124
request_context->error_tracer()->AddErrorMsg("all storages are unavailable");
11181125
KVCM_INTERVAL_LOG_WARN(10, "all storages are unavailable!");
11191126
return nullptr;

kv_cache_manager/manager/test/cache_manager_test.cc

Lines changed: 27 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -41,7 +41,7 @@ class CacheManagerTest : public TESTBASE {
4141

4242
std::unique_ptr<CacheManager> createCacheManager() {
4343
std::shared_ptr<MetricsRegistry> metrics_registry = std::make_shared<MetricsRegistry>();
44-
std::shared_ptr<RegistryManager> registry_manager = std::make_shared<RegistryManager>("", metrics_registry);
44+
registry_manager_ = std::make_shared<RegistryManager>("", metrics_registry);
4545
std::shared_ptr<InstanceGroup> instance_group = std::make_shared<InstanceGroup>();
4646
auto meta_indexer_config = std::make_shared<MetaIndexerConfig>();
4747
instance_group->cache_config_ = std::make_shared<CacheConfig>();
@@ -55,18 +55,18 @@ class CacheManagerTest : public TESTBASE {
5555

5656
std::shared_ptr<InstanceInfo> instance_info = std::make_shared<InstanceInfo>(
5757
"test_quota_group", "default", "test_instance", 64, createLocationSpecInfos(), createModelDeployment());
58-
registry_manager->instance_group_configs_["test_group"] = instance_group;
59-
registry_manager->instance_infos_["test_instance"] = instance_info;
60-
registry_manager->Init();
58+
registry_manager_->instance_group_configs_["test_group"] = instance_group;
59+
registry_manager_->instance_infos_["test_instance"] = instance_info;
60+
registry_manager_->Init();
6161
std::unique_ptr<CacheManager> cache_manager =
62-
std::make_unique<CacheManager>(metrics_registry, registry_manager);
62+
std::make_unique<CacheManager>(metrics_registry, registry_manager_);
6363

6464
EXPECT_TRUE(cache_manager->Init());
6565

6666
// load first because we need default group
6767
// in real usage, we load startup config after recover
6868
StartupConfigLoader loader;
69-
loader.Init(registry_manager);
69+
loader.Init(registry_manager_);
7070
loader.Load("");
7171

7272
EXPECT_EQ(EC_OK, cache_manager->DoRecover());
@@ -108,6 +108,7 @@ class CacheManagerTest : public TESTBASE {
108108
}
109109

110110
std::unique_ptr<CacheManager> cache_manager_;
111+
std::shared_ptr<RegistryManager> registry_manager_;
111112
std::shared_ptr<RequestContext> request_context_;
112113
};
113114

@@ -1333,4 +1334,24 @@ TEST_F(CacheManagerTest, TestUnavailableStorage) {
13331334
test_match_location(10, 4, "nfs_test_01"); // match available location
13341335
}
13351336

1337+
// Checks that StartWriteCache fails with EC_ERROR and returns no cache
// locations once the "nfs_01" storage has been disabled.
// NOTE(review): presumably "nfs_01" is the only storage the fixture configures,
// so disabling it leaves none available — confirm against the fixture setup.
TEST_F(CacheManagerTest, TestStartWriteCacheWithNoAvailableStorage) {
1338+
// Registration is expected to succeed and to return default_storage_configs.
auto expected = std::pair<ErrorCode, std::string>(EC_OK, default_storage_configs);
1339+
// ASSERT (not EXPECT): the rest of the test is meaningless without a registered instance.
ASSERT_EQ(expected,
1340+
cache_manager_->RegisterInstance(request_context_.get(),
1341+
"default",
1342+
"test_instance",
1343+
64,
1344+
createLocationSpecInfos(),
1345+
createModelDeployment(),
1346+
std::vector<LocationSpecGroup>()));
1347+
1348+
// Disable the storage through the registry manager held by the fixture.
ASSERT_EQ(EC_OK, registry_manager_->DisableStorage(request_context_.get(), "nfs_01"));
1349+
1350+
// Request a write for four explicit keys against the registered instance.
std::vector<int64_t> keys{1, 2, 3, 4};
1351+
auto [ec, start_write_cache_info] =
1352+
cache_manager_->StartWriteCache(request_context_.get(), "test_instance", keys, {}, {}, 100000000);
1353+
// The call must report an error rather than succeed.
EXPECT_EQ(EC_ERROR, ec);
1354+
// No cache locations may be handed out on failure.
EXPECT_EQ(0, start_write_cache_info.locations().cache_locations_view().size());
1355+
}
1356+
13361357
} // namespace kv_cache_manager

0 commit comments

Comments
 (0)