Skip to content

Commit c7c00bb

Browse files
committed
feat: optimize TopK heapify with dirty tracking
1 parent 1d03d59 commit c7c00bb

4 files changed

Lines changed: 78 additions & 33 deletions

File tree

src/types/redis_topk.cc

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,9 @@ rocksdb::Status TopK::IncrBy(engine::Context &ctx, const Slice &user_key, const
6161

6262
std::vector<bool> is_dirty_buckets(topk_metadata.width * topk_metadata.depth, false);
6363
std::vector<bool> is_dirty_heaps(topk_metadata.top_k, false);
64-
topk.Add(items.data_, incr);
64+
topk.Add(items.data_, incr, is_dirty_buckets, is_dirty_heaps);
6565

66-
s = setTopkData(ctx, ns_key, topk_metadata, topk);
66+
s = setTopkData(ctx, ns_key, topk_metadata, topk, is_dirty_buckets, is_dirty_heaps);
6767
if (!s.ok()) return s;
6868

6969
return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
@@ -154,10 +154,6 @@ rocksdb::Status TopK::createTopK(engine::Context &ctx, const Slice &ns_key, uint
154154
rocksdb::Status TopK::getTopKData(engine::Context &ctx, const Slice &ns_key, const TopKMetadata &metadata,
155155
BlockSplitTopK *topk) {
156156
for (uint8_t i = 0; i < 3; i++) {
157-
std::string tk_key = getTKKey(ns_key, metadata, i);
158-
rocksdb::PinnableSlice pinnable_value;
159-
rocksdb::Status s = storage_->Get(ctx, ctx.GetReadOptions(), tk_key, &pinnable_value);
160-
if (!s.ok()) return s;
161157
if (i == 0) {
162158
// get buckets of topk structure
163159
for (uint32_t j = 0; j < metadata.width * metadata.depth; j++) {
@@ -166,13 +162,13 @@ rocksdb::Status TopK::getTopKData(engine::Context &ctx, const Slice &ns_key, con
166162
rocksdb::PinnableSlice bk_value;
167163
rocksdb::Status s = storage_->Get(ctx, ctx.GetReadOptions(), bk_key, &bk_value);
168164
if (!s.ok()) return s;
169-
170-
int dep = j / metadata.width;
171-
int wid = j % metadata.width;
165+
166+
uint32_t dep = j / metadata.width;
167+
uint32_t wid = j % metadata.width;
172168
if (k == 0) {
173-
topk->buckets[dep][wid].fp = static_cast<uint32_t>(std::stoul(pinnable_value.data()));
169+
topk->buckets[dep][wid].fp = static_cast<uint32_t>(std::stoul(bk_value.data()));
174170
} else {
175-
topk->buckets[dep][wid].count = static_cast<uint32_t>(std::stoul(pinnable_value.data()));
171+
topk->buckets[dep][wid].count = static_cast<uint32_t>(std::stoul(bk_value.data()));
176172
}
177173
}
178174
}
@@ -186,23 +182,27 @@ rocksdb::Status TopK::getTopKData(engine::Context &ctx, const Slice &ns_key, con
186182
if (!s.ok()) return s;
187183

188184
if (k == 0) {
189-
topk->heap[j].count = static_cast<uint32_t>(std::stoul(pinnable_value.data()));
185+
topk->heap[j].count = static_cast<uint32_t>(std::stoul(hb_value.data()));
190186
} else if (k == 1) {
191-
topk->heap[j].fp = static_cast<uint32_t>(std::stoul(pinnable_value.data()));
187+
topk->heap[j].fp = static_cast<uint32_t>(std::stoul(hb_value.data()));
192188
} else {
193189
topk->heap[j].item = hb_value.data();
194190
}
195191
}
196192
}
197193
} else {
194+
std::string tk_key = getTKKey(ns_key, metadata, i);
195+
rocksdb::PinnableSlice pinnable_value;
196+
rocksdb::Status s = storage_->Get(ctx, ctx.GetReadOptions(), tk_key, &pinnable_value);
197+
if (!s.ok()) return s;
198198
topk->heap_size = static_cast<int>(std::stoul(pinnable_value.data()));
199199
}
200200
}
201201
return rocksdb::Status::OK();
202202
}
203203

204204
rocksdb::Status TopK::setTopkData(engine::Context &ctx, const Slice &ns_key, const TopKMetadata &metadata,
205-
const BlockSplitTopK &topk, const std::vector<bool> &is_dirty_buckets,
205+
const BlockSplitTopK &topk, const std::vector<bool> &is_dirty_buckets,
206206
const std::vector<bool> &is_dirty_heaps) {
207207
auto batch = storage_->GetWriteBatchBase();
208208

@@ -215,8 +215,8 @@ rocksdb::Status TopK::setTopkData(engine::Context &ctx, const Slice &ns_key, con
215215
for (uint32_t k = 0; k < 2; k++) {
216216
std::string sub_key = getSubKey(ns_key, metadata, i, j, k);
217217
std::string sub_value;
218-
int dep = j / metadata.width;
219-
int wid = j % metadata.width;
218+
uint32_t dep = j / metadata.width;
219+
uint32_t wid = j % metadata.width;
220220
if (k == 0) {
221221
sub_value = std::to_string(topk.buckets[dep][wid].fp);
222222
} else {
@@ -264,7 +264,8 @@ std::string TopK::getTKKey(const Slice &ns_key, const TopKMetadata &metadata, ui
264264
return bf_key;
265265
}
266266

267-
std::string TopK::getSubKey(const Slice &ns_key, const TopKMetadata &metadata, uint8_t topk_index, uint32_t sub_index, uint8_t index) {
267+
std::string TopK::getSubKey(const Slice &ns_key, const TopKMetadata &metadata, uint8_t topk_index, uint32_t sub_index,
268+
uint8_t index) {
268269
std::string sub_key;
269270
PutFixed8(&sub_key, topk_index);
270271
PutFixed32(&sub_key, sub_index);

src/types/redis_topk.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ class TopK : public SubKeyScanner {
6161

6262
std::string getTKKey(const Slice &ns_key, const TopKMetadata &metadata, uint8_t index);
6363

64-
std::string getSubKey(const Slice &ns_key, const TopKMetadata &metadata, uint8_t topk_index, uint32_t sub_index, uint8_t index);
64+
std::string getSubKey(const Slice &ns_key, const TopKMetadata &metadata, uint8_t topk_index, uint32_t sub_index,
65+
uint8_t index);
6566
};
6667

6768
} // namespace redis

src/types/topk.cc

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ static uint32_t TopkHash(const void *item, int itemlen, uint32_t i) { return Mur
9898
constexpr uint32_t GA = 1919;
9999

100100
/* ---------------------------------------------------------------------- */
101-
void BlockSplitTopK::HeapifyDown(int start) {
101+
void BlockSplitTopK::HeapifyDown(int start, std::vector<bool> &is_dirty_heaps) {
102102
int child = start;
103103

104104
// check whether larger than children
@@ -117,6 +117,7 @@ void BlockSplitTopK::HeapifyDown(int start) {
117117
HeapBucket top = heap[start];
118118
do {
119119
heap[start] = heap[child];
120+
is_dirty_heaps[start] = true;
120121
start = child;
121122

122123
if ((heap_size - 2) / 2 < child) {
@@ -129,9 +130,10 @@ void BlockSplitTopK::HeapifyDown(int start) {
129130
}
130131
} while (heap[child].count < top.count);
131132
heap[start] = top;
133+
is_dirty_heaps[start] = true;
132134
}
133135

134-
void BlockSplitTopK::HeapifyUp(int start) {
136+
void BlockSplitTopK::HeapifyUp(int start, std::vector<bool> &is_dirty_heaps) {
135137
int parent = start;
136138

137139
// check whether smaller than parent
@@ -147,6 +149,7 @@ void BlockSplitTopK::HeapifyUp(int start) {
147149
HeapBucket bottom = heap[start];
148150
do {
149151
heap[start] = heap[parent];
152+
is_dirty_heaps[start] = true;
150153
start = parent;
151154

152155
if (start == 0) {
@@ -155,6 +158,7 @@ void BlockSplitTopK::HeapifyUp(int start) {
155158
parent = (parent - 1) / 2;
156159
} while (heap[parent].count > bottom.count);
157160
heap[start] = bottom;
161+
is_dirty_heaps[start] = true;
158162
}
159163

160164
int BlockSplitTopK::CheckExistInHeap(const std::string &item) const {
@@ -170,7 +174,8 @@ int BlockSplitTopK::CmpHeapBucketCount(const HeapBucket &a, const HeapBucket &b)
170174
return a.count < b.count ? 1 : a.count > b.count ? -1 : 0;
171175
}
172176

173-
void BlockSplitTopK::Add(const std::string &item, uint32_t increment) {
177+
void BlockSplitTopK::Add(const std::string &item, uint32_t increment, std::vector<bool> &is_dirty_buckets,
178+
std::vector<bool> &is_dirty_heaps) {
174179
uint32_t itemlen = item.size();
175180
const char *data = item.c_str();
176181
CounterT max_count = 0;
@@ -184,10 +189,12 @@ void BlockSplitTopK::Add(const std::string &item, uint32_t increment) {
184189
if (buckets[i][loc].count == 0) {
185190
buckets[i][loc].fp = fp;
186191
buckets[i][loc].count = increment;
192+
is_dirty_buckets[i * width + loc] = true;
187193
max_count = std::max(max_count, buckets[i][loc].count);
188194
} else if (buckets[i][loc].fp == fp && location != -1) {
189195
buckets[i][loc].count += increment;
190196
max_count = std::max(max_count, buckets[i][loc].count);
197+
is_dirty_buckets[i * width + loc] = true;
191198
} else {
192199
// decay
193200
uint32_t local_incr = increment;
@@ -196,15 +203,17 @@ void BlockSplitTopK::Add(const std::string &item, uint32_t increment) {
196203
if (buckets[i][loc].count < TOPK_DECAY_LOOKUP_TABLE) {
197204
decay = lookup_table[buckets[i][loc].count];
198205
} else {
199-
decay = pow(lookup_table[TOPK_DECAY_LOOKUP_TABLE - 1], (buckets[i][loc].count / (TOPK_DECAY_LOOKUP_TABLE - 1))) *
200-
lookup_table[buckets[i][loc].count % (TOPK_DECAY_LOOKUP_TABLE - 1)];
206+
decay =
207+
pow(lookup_table[TOPK_DECAY_LOOKUP_TABLE - 1], (buckets[i][loc].count / (TOPK_DECAY_LOOKUP_TABLE - 1))) *
208+
lookup_table[buckets[i][loc].count % (TOPK_DECAY_LOOKUP_TABLE - 1)];
201209
}
202210
double chance = rand() / (double)RAND_MAX;
203211
if (chance < decay) {
204212
--buckets[i][loc].count;
205213
if (buckets[i][loc].count == 0) {
206214
buckets[i][loc].fp = fp;
207215
buckets[i][loc].count = 1;
216+
is_dirty_buckets[i * width + loc] = true;
208217
max_count = std::max(max_count, buckets[i][loc].count);
209218
break;
210219
}
@@ -220,19 +229,21 @@ void BlockSplitTopK::Add(const std::string &item, uint32_t increment) {
220229
heap[0].item = item;
221230

222231
heap[0].count = max_count;
223-
224-
HeapifyDown(0);
232+
is_dirty_heaps[0] = true;
233+
HeapifyDown(0, is_dirty_heaps);
225234
}
226235
} else {
227236
heap[location].count += increment;
228-
HeapifyDown(location);
237+
HeapifyDown(location, is_dirty_heaps);
229238
}
230239
} else {
231240
heap[heap_size].fp = fp;
232241
heap[heap_size].item = item;
233242
heap[heap_size].count = max_count;
234243

235-
HeapifyUp((int)heap_size);
244+
is_dirty_heaps[heap_size] = true;
245+
246+
HeapifyUp((int)heap_size, is_dirty_heaps);
236247
heap_size++;
237248
}
238249
}

src/types/topk.h

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,45 @@ struct HeapBucket {
3737
CounterT count;
3838
std::string item;
3939

40-
HeapBucket& operator=(const HeapBucket& other) {
40+
HeapBucket() = default;
41+
42+
HeapBucket(uint32_t fp, CounterT count, std::string item) : fp(fp), count(count), item(std::move(item)) {}
43+
44+
HeapBucket(const HeapBucket &other) {
45+
if (this != &other) {
46+
fp = other.fp;
47+
count = other.count;
48+
item = other.item;
49+
}
50+
}
51+
52+
HeapBucket(const HeapBucket &&other) noexcept {
53+
if (this != &other) {
54+
fp = other.fp;
55+
count = other.count;
56+
item = other.item;
57+
}
58+
}
59+
60+
HeapBucket &operator=(const HeapBucket &other) {
61+
if (this != &other) {
62+
fp = other.fp;
63+
count = other.count;
64+
item = other.item;
65+
}
66+
return *this;
67+
}
68+
69+
HeapBucket &operator=(const HeapBucket &&other) noexcept {
4170
if (this != &other) {
4271
fp = other.fp;
4372
count = other.count;
4473
item = other.item;
45-
return *this;
4674
}
75+
return *this;
4776
}
77+
78+
~HeapBucket() = default;
4879
};
4980

5081
struct Bucket {
@@ -73,14 +104,15 @@ class BlockSplitTopK {
73104
}
74105
}
75106

76-
~BlockSplitTopK() {}
107+
~BlockSplitTopK() = default;
77108

78-
void Add(const std::string &item, uint32_t increment);
109+
void Add(const std::string &item, uint32_t increment, std::vector<bool> &is_dirty_buckets,
110+
std::vector<bool> &is_dirty_heaps);
79111
bool Query(const std::string &item) const;
80112
std::vector<HeapBucket> List();
81113

82-
void HeapifyDown(int start);
83-
void HeapifyUp(int start);
114+
void HeapifyDown(int start, std::vector<bool> &is_dirty_heaps);
115+
void HeapifyUp(int start, std::vector<bool> &is_dirty_heaps);
84116
int CheckExistInHeap(const std::string &item) const;
85117
static int CmpHeapBucketCount(const HeapBucket &a, const HeapBucket &b);
86118

0 commit comments

Comments
 (0)