Skip to content

Commit 7e1f44e

Browse files
committed
update test
1 parent 7e47ca5 commit 7e1f44e

File tree

6 files changed

+203
-106
lines changed

6 files changed

+203
-106
lines changed

fbgemm_gpu/src/dram_kv_embedding_cache/dram_kv_embedding_cache.h

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,8 @@ class DramKVEmbeddingCache : public kv_db::EmbeddingKVDB {
8585
num_shards_(num_shards),
8686
weight_ttl_in_hours_(weight_ttl_in_hours),
8787
block_size_(FixedBlockPool::calculate_block_size<weight_type>(max_D)),
88-
block_alignment_(FixedBlockPool::calculate_block_alignment<weight_type>()),
88+
block_alignment_(
89+
FixedBlockPool::calculate_block_alignment<weight_type>()),
8990
kv_store_(SynchronizedShardedMap<int64_t, weight_type*>(
9091
num_shards_,
9192
block_size_,
@@ -262,8 +263,11 @@ class DramKVEmbeddingCache : public kv_db::EmbeddingKVDB {
262263
feature_evict_->update_feature_statistics(block);
263264
}
264265
auto* data_ptr = FixedBlockPool::data_ptr<weight_type>(block);
265-
std::copy(weights[id_index].template data_ptr<weight_type>(),
266-
weights[id_index].template data_ptr<weight_type>() + weights[id_index].numel(),
266+
std::copy(weights[id_index]
267+
.template data_ptr<weight_type>(),
268+
weights[id_index]
269+
.template data_ptr<weight_type>() +
270+
weights[id_index].numel(),
267271
data_ptr);
268272
}
269273
}
@@ -366,7 +370,9 @@ class DramKVEmbeddingCache : public kv_db::EmbeddingKVDB {
366370
continue;
367371
}
368372
// use mempool
369-
const auto* data_ptr = FixedBlockPool::data_ptr<weight_type>(cached_iter->second);
373+
const auto* data_ptr =
374+
FixedBlockPool::data_ptr<weight_type>(
375+
cached_iter->second);
370376
std::copy(
371377
data_ptr + width_offset,
372378
data_ptr + width_offset + row_width,

fbgemm_gpu/src/dram_kv_embedding_cache/fixed_block_pool.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ class FixedBlockPool : public std::pmr::memory_resource {
236236
const std::size_t block_alignment_; // Block alignment requirement
237237
const std::size_t blocks_per_chunk_; // Number of blocks per chunk
238238
std::pmr::memory_resource* upstream_; // Upstream memory resource
239-
std::pmr::vector<chunk_info> chunks_{1024}; // Records of all allocated chunks
239+
std::pmr::vector<ChunkInfo> chunks_{1024}; // Records of all allocated chunks
240240
void* free_list_ = nullptr; // Free block list head pointer
241241
};
242242
} // namespace kv_mem

fbgemm_gpu/test/dram_kv_embedding_cache/feature_evict_test.cpp

Lines changed: 120 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -18,45 +18,69 @@ size_t BLOCK_ALIGNMENT = FixedBlockPool::calculate_block_alignment<float>();
1818
TEST(FeatureEvictTest, CounterBasedEviction) {
1919
static constexpr int NUM_SHARDS = 8;
2020
auto executor_ = std::make_unique<folly::CPUThreadPoolExecutor>(4);
21-
auto kv_store_ = std::make_unique<SynchronizedShardedMap<int64_t, float*>>(NUM_SHARDS, BLOCK_SIZE, BLOCK_ALIGNMENT);
21+
auto kv_store_ = std::make_unique<SynchronizedShardedMap<int64_t, float*>>(
22+
NUM_SHARDS, BLOCK_SIZE, BLOCK_ALIGNMENT);
2223

2324
// Insert test data
2425
for (int i = 0; i < 1000; ++i) {
2526
int shard_id = i % NUM_SHARDS;
2627
auto wlock = kv_store_->by(shard_id).wlock();
2728
auto* pool = kv_store_->pool_by(shard_id);
2829
auto* block = pool->allocate_t<float>();
30+
ASSERT_NE(block, nullptr);
2931
FixedBlockPool::set_key(block, i);
30-
FixedBlockPool::set_count(block, 1); // Initial score
32+
FixedBlockPool::set_count(block, 1);
3133
FixedBlockPool::set_used(block, true);
32-
wlock->insert({i, block});
34+
auto result = wlock->insert({i, block});
35+
ASSERT_TRUE(result.second);
3336
}
3437

3538
for (int i = 1000; i < 2000; ++i) {
3639
int shard_id = i % NUM_SHARDS;
3740
auto wlock = kv_store_->by(shard_id).wlock();
3841
auto* pool = kv_store_->pool_by(shard_id);
3942
auto* block = pool->allocate_t<float>();
43+
ASSERT_NE(block, nullptr);
4044
FixedBlockPool::set_key(block, i);
41-
FixedBlockPool::set_count(block, 2); // Initial score
45+
FixedBlockPool::set_count(block, 2);
4246
FixedBlockPool::set_used(block, true);
43-
wlock->insert({i, block});
47+
auto result = wlock->insert({i, block});
48+
ASSERT_TRUE(result.second);
49+
}
50+
51+
size_t count_1_blocks = 0, count_2_blocks = 0;
52+
for (int shard_id = 0; shard_id < NUM_SHARDS; ++shard_id) {
53+
auto rlock = kv_store_->by(shard_id).rlock();
54+
for (const auto& [key, block] : *rlock) {
55+
uint32_t count = FixedBlockPool::get_count(block);
56+
if (count == 1)
57+
count_1_blocks++;
58+
else if (count == 2)
59+
count_2_blocks++;
60+
ASSERT_TRUE(FixedBlockPool::get_used(block));
61+
}
4462
}
63+
ASSERT_EQ(count_1_blocks, 1000);
64+
ASSERT_EQ(count_2_blocks, 1000);
4565

4666
std::unique_ptr<FeatureEvict<float>> feature_evict;
4767
int evict_trigger_mode = 2;
4868
int evict_trigger_strategy = 1;
4969
uint32_t count_threshold = 1;
5070
float count_decay_rate = 0.5;
51-
// feature evict config
52-
FeatureEvictConfig feature_evict_config;
53-
feature_evict_config.trigger_mode = static_cast<EvictTriggerMode>(evict_trigger_mode);
54-
feature_evict_config.trigger_strategy = static_cast<EvictTriggerStrategy>(evict_trigger_strategy);
71+
72+
FeatureEvictConfig feature_evict_config{};
73+
feature_evict_config.trigger_mode =
74+
static_cast<EvictTriggerMode>(evict_trigger_mode);
75+
feature_evict_config.trigger_strategy =
76+
static_cast<EvictTriggerStrategy>(evict_trigger_strategy);
5577
feature_evict_config.count_threshold = count_threshold;
5678
feature_evict_config.count_decay_rate = count_decay_rate;
5779

5880
if (feature_evict_config.trigger_mode != EvictTriggerMode::DISABLED) {
59-
feature_evict = create_feature_evict(feature_evict_config, executor_.get(),*kv_store_.get(), 4);
81+
feature_evict = create_feature_evict(
82+
feature_evict_config, executor_.get(), *kv_store_.get(), 4);
83+
ASSERT_NE(feature_evict, nullptr);
6084
}
6185

6286
// Initial validation
@@ -69,69 +93,101 @@ TEST(FeatureEvictTest, CounterBasedEviction) {
6993

7094
// Perform eviction
7195
feature_evict->trigger_evict();
96+
ASSERT_TRUE(feature_evict->is_evicting());
7297

7398
// Validate eviction process
74-
while (feature_evict->is_evicting()) {
99+
int max_iterations = 1000;
100+
int iterations = 0;
101+
while (feature_evict->is_evicting() && iterations < max_iterations) {
75102
feature_evict->resume();
76103
std::this_thread::sleep_for(std::chrono::microseconds(5));
77104
feature_evict->pause();
105+
iterations++;
78106
}
107+
ASSERT_LT(iterations, max_iterations);
108+
ASSERT_FALSE(feature_evict->is_evicting());
79109

80110
// Validate results
81111
size_t remaining = 0;
112+
size_t remaining_count_1 = 0;
82113
for (int shard_id = 0; shard_id < NUM_SHARDS; ++shard_id) {
83114
auto rlock = kv_store_->by(shard_id).rlock();
84115
remaining += rlock->size();
85116
// Validate score decay
86117
for (const auto& [key, block] : *rlock) {
87-
ASSERT_EQ(FixedBlockPool::get_count(block), 1);
118+
uint32_t count = FixedBlockPool::get_count(block);
119+
ASSERT_EQ(count, 1);
120+
if (count == 1) remaining_count_1++;
88121
}
89122
}
90123
std::cout << "remaining: " << remaining << std::endl;
91124
ASSERT_EQ(remaining, 1000);
125+
ASSERT_EQ(remaining_count_1, 1000);
92126
}
93127

94128
TEST(FeatureEvictTest, TimeBasedEviction) {
95129
static constexpr int NUM_SHARDS = 8;
96130
auto executor_ = std::make_unique<folly::CPUThreadPoolExecutor>(4);
97-
auto kv_store_ = std::make_unique<SynchronizedShardedMap<int64_t, float*>>(NUM_SHARDS, BLOCK_SIZE, BLOCK_ALIGNMENT);
131+
auto kv_store_ = std::make_unique<SynchronizedShardedMap<int64_t, float*>>(
132+
NUM_SHARDS, BLOCK_SIZE, BLOCK_ALIGNMENT);
133+
134+
auto start_time = std::chrono::steady_clock::now();
98135

99136
// Insert test data
100137
for (int i = 0; i < 1000; ++i) {
101138
int shard_id = i % NUM_SHARDS;
102139
auto wlock = kv_store_->by(shard_id).wlock();
103140
auto* pool = kv_store_->pool_by(shard_id);
104141
auto* block = pool->allocate_t<float>();
142+
ASSERT_NE(block, nullptr);
105143
FixedBlockPool::set_key(block, i);
106-
FixedBlockPool::update_timestamp(block); // Initial score
144+
FixedBlockPool::update_timestamp(block);
107145
FixedBlockPool::set_used(block, true);
108-
wlock->insert({i, block});
146+
auto result = wlock->insert({i, block});
147+
ASSERT_TRUE(result.second);
109148
}
149+
110150
std::this_thread::sleep_for(std::chrono::seconds(5));
151+
auto mid_time = std::chrono::steady_clock::now();
111152

112153
for (int i = 1000; i < 2000; ++i) {
113154
int shard_id = i % NUM_SHARDS;
114155
auto wlock = kv_store_->by(shard_id).wlock();
115156
auto* pool = kv_store_->pool_by(shard_id);
116157
auto* block = pool->allocate_t<float>();
158+
ASSERT_NE(block, nullptr);
117159
FixedBlockPool::set_key(block, i);
118-
FixedBlockPool::update_timestamp(block); // Initial score
160+
FixedBlockPool::update_timestamp(block);
119161
FixedBlockPool::set_used(block, true);
120-
wlock->insert({i, block});
162+
auto result = wlock->insert({i, block});
163+
ASSERT_TRUE(result.second);
121164
}
122165

166+
auto current_time = std::chrono::steady_clock::now();
167+
auto old_blocks_age = std::chrono::duration_cast<std::chrono::seconds>(
168+
current_time - start_time)
169+
.count();
170+
auto new_blocks_age =
171+
std::chrono::duration_cast<std::chrono::seconds>(current_time - mid_time)
172+
.count();
173+
ASSERT_GT(old_blocks_age, new_blocks_age); // 确保时间差异存在
174+
123175
std::unique_ptr<FeatureEvict<float>> feature_evict;
124176
int evict_trigger_mode = 2;
125177
int evict_trigger_strategy = 0;
126178
uint32_t ttl = 4;
127-
// feature evict config
179+
128180
FeatureEvictConfig feature_evict_config;
129-
feature_evict_config.trigger_mode = static_cast<EvictTriggerMode>(evict_trigger_mode);
130-
feature_evict_config.trigger_strategy = static_cast<EvictTriggerStrategy>(evict_trigger_strategy);
181+
feature_evict_config.trigger_mode =
182+
static_cast<EvictTriggerMode>(evict_trigger_mode);
183+
feature_evict_config.trigger_strategy =
184+
static_cast<EvictTriggerStrategy>(evict_trigger_strategy);
131185
feature_evict_config.ttl = ttl;
132186

133187
if (feature_evict_config.trigger_mode != EvictTriggerMode::DISABLED) {
134-
feature_evict = create_feature_evict(feature_evict_config, executor_.get(),*kv_store_.get(), 4);
188+
feature_evict = create_feature_evict(
189+
feature_evict_config, executor_.get(), *kv_store_.get(), 4);
190+
ASSERT_NE(feature_evict, nullptr);
135191
}
136192

137193
// Initial validation
@@ -144,28 +200,40 @@ TEST(FeatureEvictTest, TimeBasedEviction) {
144200

145201
// Perform eviction
146202
feature_evict->trigger_evict();
203+
ASSERT_TRUE(feature_evict->is_evicting());
147204

148205
// Validate eviction process
149-
while (feature_evict->is_evicting()) {
206+
int max_iterations = 1000;
207+
int iterations = 0;
208+
while (feature_evict->is_evicting() && iterations < max_iterations) {
150209
feature_evict->resume();
151210
std::this_thread::sleep_for(std::chrono::microseconds(5));
152211
feature_evict->pause();
212+
iterations++;
153213
}
214+
ASSERT_LT(iterations, max_iterations);
215+
ASSERT_FALSE(feature_evict->is_evicting());
154216

155217
// Validate results
156218
size_t remaining = 0;
219+
size_t newer_blocks = 0;
157220
for (int shard_id = 0; shard_id < NUM_SHARDS; ++shard_id) {
158221
auto rlock = kv_store_->by(shard_id).rlock();
159222
remaining += rlock->size();
223+
for (const auto& [key, block] : *rlock) {
224+
if (key >= 1000) newer_blocks++;
225+
ASSERT_TRUE(FixedBlockPool::get_used(block));
226+
}
160227
}
161228
std::cout << "remaining: " << remaining << std::endl;
162229
ASSERT_EQ(remaining, 1000);
230+
ASSERT_GT(newer_blocks, 800);
163231
}
164-
165232
TEST(FeatureEvictTest, TimeCounterBasedEviction) {
166233
static constexpr int NUM_SHARDS = 8;
167234
auto executor_ = std::make_unique<folly::CPUThreadPoolExecutor>(4);
168-
auto kv_store_ = std::make_unique<SynchronizedShardedMap<int64_t, float*>>(NUM_SHARDS, BLOCK_SIZE, BLOCK_ALIGNMENT);
235+
auto kv_store_ = std::make_unique<SynchronizedShardedMap<int64_t, float*>>(
236+
NUM_SHARDS, BLOCK_SIZE, BLOCK_ALIGNMENT);
169237

170238
// Insert test data
171239
for (int i = 0; i < 500; ++i) {
@@ -198,7 +266,7 @@ TEST(FeatureEvictTest, TimeCounterBasedEviction) {
198266
auto* pool = kv_store_->pool_by(shard_id);
199267
auto* block = pool->allocate_t<float>();
200268
FixedBlockPool::set_key(block, i);
201-
FixedBlockPool::update_timestamp(block); // Initial score
269+
FixedBlockPool::update_timestamp(block); // Initial score
202270
FixedBlockPool::set_count(block, 2);
203271
FixedBlockPool::set_used(block, true);
204272
wlock->insert({i, block});
@@ -213,14 +281,17 @@ TEST(FeatureEvictTest, TimeCounterBasedEviction) {
213281

214282
// feature evict config
215283
FeatureEvictConfig feature_evict_config;
216-
feature_evict_config.trigger_mode = static_cast<EvictTriggerMode>(evict_trigger_mode);
217-
feature_evict_config.trigger_strategy = static_cast<EvictTriggerStrategy>(evict_trigger_strategy);
284+
feature_evict_config.trigger_mode =
285+
static_cast<EvictTriggerMode>(evict_trigger_mode);
286+
feature_evict_config.trigger_strategy =
287+
static_cast<EvictTriggerStrategy>(evict_trigger_strategy);
218288
feature_evict_config.ttl = ttl;
219289
feature_evict_config.count_threshold = count_threshold;
220290
feature_evict_config.count_decay_rate = count_decay_rate;
221291

222292
if (feature_evict_config.trigger_mode != EvictTriggerMode::DISABLED) {
223-
feature_evict = create_feature_evict(feature_evict_config, executor_.get(),*kv_store_.get(), 4);
293+
feature_evict = create_feature_evict(
294+
feature_evict_config, executor_.get(), *kv_store_.get(), 4);
224295
}
225296

226297
// Initial validation
@@ -254,7 +325,8 @@ TEST(FeatureEvictTest, TimeCounterBasedEviction) {
254325
TEST(FeatureEvictTest, L2WeightBasedEviction) {
255326
static constexpr int NUM_SHARDS = 8;
256327
auto executor_ = std::make_unique<folly::CPUThreadPoolExecutor>(4);
257-
auto kv_store_ = std::make_unique<SynchronizedShardedMap<int64_t, float*>>(NUM_SHARDS, BLOCK_SIZE, BLOCK_ALIGNMENT);
328+
auto kv_store_ = std::make_unique<SynchronizedShardedMap<int64_t, float*>>(
329+
NUM_SHARDS, BLOCK_SIZE, BLOCK_ALIGNMENT);
258330
int dim = 4;
259331
std::vector<float> weight1(dim, 1.0);
260332
// Insert test data
@@ -288,12 +360,15 @@ TEST(FeatureEvictTest, L2WeightBasedEviction) {
288360
double l2_weight_threshold = 3.0;
289361
// feature evict config
290362
FeatureEvictConfig feature_evict_config;
291-
feature_evict_config.trigger_mode = static_cast<EvictTriggerMode>(evict_trigger_mode);
292-
feature_evict_config.trigger_strategy = static_cast<EvictTriggerStrategy>(evict_trigger_strategy);
363+
feature_evict_config.trigger_mode =
364+
static_cast<EvictTriggerMode>(evict_trigger_mode);
365+
feature_evict_config.trigger_strategy =
366+
static_cast<EvictTriggerStrategy>(evict_trigger_strategy);
293367
feature_evict_config.l2_weight_threshold = l2_weight_threshold;
294368

295369
if (feature_evict_config.trigger_mode != EvictTriggerMode::DISABLED) {
296-
feature_evict = create_feature_evict(feature_evict_config, executor_.get(),*kv_store_.get(), dim);
370+
feature_evict = create_feature_evict(
371+
feature_evict_config, executor_.get(), *kv_store_.get(), dim);
297372
}
298373

299374
// Initial validation
@@ -327,7 +402,8 @@ TEST(FeatureEvictTest, L2WeightBasedEviction) {
327402
TEST(FeatureEvictTest, PerformanceTest) {
328403
static constexpr int NUM_SHARDS = 1;
329404
// Test configurations
330-
const std::vector<int> test_sizes = {100'000, 500'000, 1'000'000, 5'000'000, 10'000'000};
405+
const std::vector<int> test_sizes = {
406+
100'000, 500'000, 1'000'000, 5'000'000, 10'000'000};
331407

332408
fmt::print("\nPerformance Test Results:\n");
333409
fmt::print("{:<15} {:<15} {:<15}\n", "Size", "Time(ms)", "Items/ms");
@@ -336,8 +412,8 @@ TEST(FeatureEvictTest, PerformanceTest) {
336412
for (const auto& size : test_sizes) {
337413
// Create executor and store for each test size
338414
auto executor = std::make_unique<folly::CPUThreadPoolExecutor>(8);
339-
auto kv_store =
340-
std::make_unique<SynchronizedShardedMap<int64_t, float*>>(NUM_SHARDS, BLOCK_SIZE, BLOCK_ALIGNMENT, 1000);
415+
auto kv_store = std::make_unique<SynchronizedShardedMap<int64_t, float*>>(
416+
NUM_SHARDS, BLOCK_SIZE, BLOCK_ALIGNMENT, 1000);
341417

342418
// Insert test data with different initial scores
343419
for (int i = 0; i < size; ++i) {
@@ -346,7 +422,8 @@ TEST(FeatureEvictTest, PerformanceTest) {
346422
auto* pool = kv_store->pool_by(shard_id);
347423
auto* block = pool->allocate_t<float>();
348424
FixedBlockPool::set_key(block, i);
349-
FixedBlockPool::set_count(block, (i % 2) ? 1 : 2); // Alternate between scores
425+
FixedBlockPool::set_count(block,
426+
(i % 2) ? 1 : 2); // Alternate between scores
350427
FixedBlockPool::set_used(block, true);
351428
wlock->insert({i, block});
352429
}
@@ -365,17 +442,22 @@ TEST(FeatureEvictTest, PerformanceTest) {
365442
}
366443

367444
auto end_time = std::chrono::high_resolution_clock::now();
368-
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count();
445+
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
446+
end_time - start_time)
447+
.count();
369448

370449
std::size_t current_size = 0;
371450
for (int shard_id = 0; shard_id < NUM_SHARDS; ++shard_id) {
372451
auto wlock = kv_store->by(shard_id).wlock();
373452
current_size += wlock->size();
374453
}
375-
double eviction_rate = static_cast<double>(size - current_size) / static_cast<double>(size);
454+
double eviction_rate =
455+
static_cast<double>(size - current_size) / static_cast<double>(size);
376456

377457
// Print results
378458
fmt::print("{:<15d} {:<15d} {:<15.2f}\n", size, duration, eviction_rate);
459+
ASSERT_LT(current_size, size);
460+
ASSERT_GT(eviction_rate, 0.0);
379461
}
380462
}
381463
} // namespace kv_mem

0 commit comments

Comments
 (0)