Skip to content

Commit 1d03d59

Browse files
committed
fix: memory leaks
1 parent 87d1add commit 1d03d59

4 files changed

Lines changed: 130 additions & 91 deletions

File tree

src/types/redis_topk.cc

Lines changed: 80 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ rocksdb::Status TopK::IncrBy(engine::Context &ctx, const Slice &user_key, const
5959
s = getTopKData(ctx, ns_key, topk_metadata, &topk);
6060
if (!s.ok()) return s;
6161

62+
std::vector<bool> is_dirty_buckets(topk_metadata.width * topk_metadata.depth, false);
63+
std::vector<bool> is_dirty_heaps(topk_metadata.top_k, false);
6264
topk.Add(items.data_, incr);
6365

6466
s = setTopkData(ctx, ns_key, topk_metadata, topk);
@@ -96,7 +98,7 @@ rocksdb::Status TopK::List(engine::Context &ctx, const Slice &user_key, std::vec
9698

9799
auto heap_buckets = topk.List();
98100
for (auto &bucket : heap_buckets) {
99-
items.emplace_back(bucket.item, bucket.itemlen);
101+
items.emplace_back(bucket.item);
100102
}
101103

102104
return rocksdb::Status::OK();
@@ -140,7 +142,10 @@ rocksdb::Status TopK::createTopK(engine::Context &ctx, const Slice &ns_key, uint
140142
s = batch->Put(metadata_cf_handle_, ns_key, top_k_meta_bytes);
141143
if (!s.ok()) return s;
142144

143-
s = setTopkData(ctx, ns_key, *metadata, block_split_top_k);
145+
// Dirty flags let setTopkData skip unchanged entries; on creation every bucket and heap slot is marked dirty so all are written
146+
std::vector<bool> is_dirty_buckets(width * depth, true);
147+
std::vector<bool> is_dirty_heaps(k, true);
148+
s = setTopkData(ctx, ns_key, *metadata, block_split_top_k, is_dirty_buckets, is_dirty_heaps);
144149
if (!s.ok()) return s;
145150

146151
return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
@@ -154,25 +159,40 @@ rocksdb::Status TopK::getTopKData(engine::Context &ctx, const Slice &ns_key, con
154159
rocksdb::Status s = storage_->Get(ctx, ctx.GetReadOptions(), tk_key, &pinnable_value);
155160
if (!s.ok()) return s;
156161
if (i == 0) {
157-
if (pinnable_value.size() != metadata.width * metadata.depth * sizeof(Bucket)) {
158-
return rocksdb::Status::Corruption("TopK data corrupted: buckets size mismatch");
162+
// Load each bucket's fp and count from their per-field sub-keys
163+
for (uint32_t j = 0; j < metadata.width * metadata.depth; j++) {
164+
for (uint8_t k = 0; k < 2; k++) {
165+
std::string bk_key = getSubKey(ns_key, metadata, i, j, k);
166+
rocksdb::PinnableSlice bk_value;
167+
rocksdb::Status s = storage_->Get(ctx, ctx.GetReadOptions(), bk_key, &bk_value);
168+
if (!s.ok()) return s;
169+
170+
int dep = j / metadata.width;
171+
int wid = j % metadata.width;
172+
if (k == 0) {
173+
topk->buckets[dep][wid].fp = static_cast<uint32_t>(std::stoul(pinnable_value.data()));
174+
} else {
175+
topk->buckets[dep][wid].count = static_cast<uint32_t>(std::stoul(pinnable_value.data()));
176+
}
177+
}
159178
}
160-
memcpy(topk->buckets, pinnable_value.data(), pinnable_value.size());
161179
} else if (i == 1) {
162-
if (pinnable_value.size() != metadata.top_k * sizeof(HeapBucket)) {
163-
return rocksdb::Status::Corruption("TopK data corrupted: heap size mismatch");
164-
}
165-
memcpy(topk->heap, pinnable_value.data(), pinnable_value.size());
180+
// Load each heap entry's count, fp, and item from their per-field sub-keys
166181
for (uint32_t j = 0; j < metadata.top_k; j++) {
167-
std::string hb_key = getHBKey(ns_key, metadata, i, j);
168-
rocksdb::PinnableSlice hb_value;
169-
rocksdb::Status s = storage_->Get(ctx, ctx.GetReadOptions(), hb_key, &hb_value);
170-
if (!s.ok()) return s;
171-
if (hb_value.size() != topk->heap[j].itemlen) {
172-
return rocksdb::Status::Corruption("TopK data corrupted: heap bucket size mismatch");
182+
for (uint8_t k = 0; k < 3; k++) {
183+
std::string hb_key = getSubKey(ns_key, metadata, i, j, k);
184+
rocksdb::PinnableSlice hb_value;
185+
rocksdb::Status s = storage_->Get(ctx, ctx.GetReadOptions(), hb_key, &hb_value);
186+
if (!s.ok()) return s;
187+
188+
if (k == 0) {
189+
topk->heap[j].count = static_cast<uint32_t>(std::stoul(pinnable_value.data()));
190+
} else if (k == 1) {
191+
topk->heap[j].fp = static_cast<uint32_t>(std::stoul(pinnable_value.data()));
192+
} else {
193+
topk->heap[j].item = hb_value.data();
194+
}
173195
}
174-
topk->heap[j].item = new char[topk->heap[j].itemlen];
175-
memcpy(topk->heap[j].item, hb_value.data(), hb_value.size());
176196
}
177197
} else {
178198
topk->heap_size = static_cast<int>(std::stoul(pinnable_value.data()));
@@ -182,30 +202,56 @@ rocksdb::Status TopK::getTopKData(engine::Context &ctx, const Slice &ns_key, con
182202
}
183203

184204
rocksdb::Status TopK::setTopkData(engine::Context &ctx, const Slice &ns_key, const TopKMetadata &metadata,
185-
const BlockSplitTopK &topk) {
205+
const BlockSplitTopK &topk, const std::vector<bool> &is_dirty_buckets,
206+
const std::vector<bool> &is_dirty_heaps) {
186207
auto batch = storage_->GetWriteBatchBase();
187-
WriteBatchLogData log_data(kRedisTopK, {"setTopkData"});
188-
rocksdb::Status s = batch->PutLogData(log_data.Encode());
189-
if (!s.ok()) return s;
190208

191209
for (uint8_t i = 0; i < 3; i++) {
192-
std::string tk_key = getTKKey(ns_key, metadata, i);
193-
std::string tk_value;
194210
if (i == 0) {
195-
tk_value.assign(reinterpret_cast<const char *>(topk.buckets), metadata.width * metadata.depth * sizeof(Bucket));
211+
for (uint32_t j = 0; j < metadata.width * metadata.depth; j++) {
212+
if (!is_dirty_buckets[j]) {
213+
continue;
214+
}
215+
for (uint32_t k = 0; k < 2; k++) {
216+
std::string sub_key = getSubKey(ns_key, metadata, i, j, k);
217+
std::string sub_value;
218+
int dep = j / metadata.width;
219+
int wid = j % metadata.width;
220+
if (k == 0) {
221+
sub_value = std::to_string(topk.buckets[dep][wid].fp);
222+
} else {
223+
sub_value = std::to_string(topk.buckets[dep][wid].count);
224+
}
225+
rocksdb::Status s = batch->Put(sub_key, sub_value);
226+
if (!s.ok()) return s;
227+
}
228+
}
196229
} else if (i == 1) {
197-
tk_value.assign(reinterpret_cast<const char *>(topk.heap), metadata.top_k * sizeof(HeapBucket));
198230
for (uint32_t j = 0; j < metadata.top_k; j++) {
199-
std::string hb_key = getHBKey(ns_key, metadata, i, j);
200-
std::string hb_value(topk.heap[j].item, topk.heap[j].itemlen);
201-
s = batch->Put(hb_key, hb_value);
202-
if (!s.ok()) return s;
231+
if (!is_dirty_heaps[j]) {
232+
continue;
233+
}
234+
for (uint8_t k = 0; k < 3; k++) {
235+
std::string sub_key = getSubKey(ns_key, metadata, i, j, k);
236+
std::string sub_value;
237+
if (k == 0) {
238+
sub_value = std::to_string(topk.heap[j].count);
239+
} else if (k == 1) {
240+
sub_value = std::to_string(topk.heap[j].fp);
241+
} else {
242+
sub_value = topk.heap[j].item;
243+
}
244+
rocksdb::Status s = batch->Put(sub_key, sub_value);
245+
if (!s.ok()) return s;
246+
}
203247
}
204248
} else {
249+
std::string tk_key = getTKKey(ns_key, metadata, i);
250+
std::string tk_value;
205251
tk_value = std::to_string(topk.heap_size);
252+
rocksdb::Status s = batch->Put(tk_key, tk_value);
253+
if (!s.ok()) return s;
206254
}
207-
rocksdb::Status s = batch->Put(tk_key, tk_value);
208-
if (!s.ok()) return s;
209255
}
210256

211257
return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
@@ -218,10 +264,11 @@ std::string TopK::getTKKey(const Slice &ns_key, const TopKMetadata &metadata, ui
218264
return bf_key;
219265
}
220266

221-
std::string TopK::getHBKey(const Slice &ns_key, const TopKMetadata &metadata, uint8_t topk_index, uint32_t hp_index) {
267+
std::string TopK::getSubKey(const Slice &ns_key, const TopKMetadata &metadata, uint8_t topk_index, uint32_t sub_index, uint8_t index) {
222268
std::string sub_key;
223269
PutFixed8(&sub_key, topk_index);
224-
PutFixed32(&sub_key, hp_index);
270+
PutFixed32(&sub_key, sub_index);
271+
PutFixed8(&sub_key, index);
225272
return InternalKey(ns_key, sub_key, metadata.version, storage_->IsSlotIdEncoded()).Encode();
226273
}
227274

src/types/redis_topk.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,12 @@ class TopK : public SubKeyScanner {
5656
rocksdb::Status getTopKData(engine::Context &ctx, const Slice &ns_key, const TopKMetadata &metadata,
5757
BlockSplitTopK *topk);
5858
rocksdb::Status setTopkData(engine::Context &ctx, const Slice &ns_key, const TopKMetadata &metadata,
59-
const BlockSplitTopK &topk);
59+
const BlockSplitTopK &topk, const std::vector<bool> &is_dirty_buckets,
60+
const std::vector<bool> &is_dirty_heaps);
6061

6162
std::string getTKKey(const Slice &ns_key, const TopKMetadata &metadata, uint8_t index);
6263

63-
std::string getHBKey(const Slice &ns_key, const TopKMetadata &metadata, uint8_t topk_index, uint32_t hp_index);
64+
std::string getSubKey(const Slice &ns_key, const TopKMetadata &metadata, uint8_t topk_index, uint32_t sub_index, uint8_t index);
6465
};
6566

6667
} // namespace redis

src/types/topk.cc

Lines changed: 27 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ static uint32_t TopkHash(const void *item, int itemlen, uint32_t i) { return Mur
9898
constexpr uint32_t GA = 1919;
9999

100100
/* ---------------------------------------------------------------------- */
101-
void BlockSplitTopK::HeapifyDown(int start) const {
101+
void BlockSplitTopK::HeapifyDown(int start) {
102102
int child = start;
103103

104104
// check whether larger than children
@@ -114,10 +114,9 @@ void BlockSplitTopK::HeapifyDown(int start) const {
114114
return;
115115
}
116116

117-
HeapBucket top;
118-
memcpy(&top, &heap[start], sizeof(HeapBucket));
117+
HeapBucket top = heap[start];
119118
do {
120-
memcpy(&heap[start], &heap[child], sizeof(HeapBucket));
119+
heap[start] = heap[child];
121120
start = child;
122121

123122
if ((heap_size - 2) / 2 < child) {
@@ -129,10 +128,10 @@ void BlockSplitTopK::HeapifyDown(int start) const {
129128
++child;
130129
}
131130
} while (heap[child].count < top.count);
132-
memcpy(&heap[start], &top, sizeof(HeapBucket));
131+
heap[start] = top;
133132
}
134133

135-
void BlockSplitTopK::HeapifyUp(int start) const {
134+
void BlockSplitTopK::HeapifyUp(int start) {
136135
int parent = start;
137136

138137
// check whether smaller than parent
@@ -145,25 +144,22 @@ void BlockSplitTopK::HeapifyUp(int start) const {
145144
return;
146145
}
147146

148-
HeapBucket bottom;
149-
memcpy(&bottom, &heap[start], sizeof(HeapBucket));
147+
HeapBucket bottom = heap[start];
150148
do {
151-
memcpy(&heap[start], &heap[parent], sizeof(HeapBucket));
149+
heap[start] = heap[parent];
152150
start = parent;
153151

154152
if (start == 0) {
155153
break;
156154
}
157155
parent = (parent - 1) / 2;
158156
} while (heap[parent].count > bottom.count);
159-
memcpy(&heap[start], &bottom, sizeof(HeapBucket));
157+
heap[start] = bottom;
160158
}
161159

162160
int BlockSplitTopK::CheckExistInHeap(const std::string &item) const {
163-
uint32_t itemlen = item.size();
164-
const char *data = item.c_str();
165161
for (int i = heap_size - 1; i >= 0; --i) {
166-
if (heap[i].itemlen == itemlen && memcmp(heap[i].item, data, itemlen) == 0) {
162+
if (heap[i].item == item) {
167163
return i;
168164
}
169165
}
@@ -185,32 +181,31 @@ void BlockSplitTopK::Add(const std::string &item, uint32_t increment) {
185181
for (size_t i = 0; i < depth; ++i) {
186182
uint32_t loc = TopkHash(data, (int)itemlen, i) % width;
187183

188-
loc += i * width;
189-
if (buckets[loc].count == 0) {
190-
buckets[loc].fp = fp;
191-
buckets[loc].count = increment;
192-
max_count = std::max(max_count, buckets[loc].count);
193-
} else if (buckets[loc].fp == fp && location != -1) {
194-
buckets[loc].count += increment;
195-
max_count = std::max(max_count, buckets[loc].count);
184+
if (buckets[i][loc].count == 0) {
185+
buckets[i][loc].fp = fp;
186+
buckets[i][loc].count = increment;
187+
max_count = std::max(max_count, buckets[i][loc].count);
188+
} else if (buckets[i][loc].fp == fp && location != -1) {
189+
buckets[i][loc].count += increment;
190+
max_count = std::max(max_count, buckets[i][loc].count);
196191
} else {
197192
// decay
198193
uint32_t local_incr = increment;
199194
for (; local_incr > 0; --local_incr) {
200195
double decay = 0.0;
201-
if (buckets[loc].count < TOPK_DECAY_LOOKUP_TABLE) {
202-
decay = lookup_table[buckets[loc].count];
196+
if (buckets[i][loc].count < TOPK_DECAY_LOOKUP_TABLE) {
197+
decay = lookup_table[buckets[i][loc].count];
203198
} else {
204-
decay = pow(lookup_table[TOPK_DECAY_LOOKUP_TABLE - 1], (buckets[loc].count / (TOPK_DECAY_LOOKUP_TABLE - 1))) *
205-
lookup_table[buckets[loc].count % (TOPK_DECAY_LOOKUP_TABLE - 1)];
199+
decay = pow(lookup_table[TOPK_DECAY_LOOKUP_TABLE - 1], (buckets[i][loc].count / (TOPK_DECAY_LOOKUP_TABLE - 1))) *
200+
lookup_table[buckets[i][loc].count % (TOPK_DECAY_LOOKUP_TABLE - 1)];
206201
}
207202
double chance = rand() / (double)RAND_MAX;
208203
if (chance < decay) {
209-
--buckets[loc].count;
210-
if (buckets[loc].count == 0) {
211-
buckets[loc].fp = fp;
212-
buckets[loc].count = 1;
213-
max_count = std::max(max_count, buckets[loc].count);
204+
--buckets[i][loc].count;
205+
if (buckets[i][loc].count == 0) {
206+
buckets[i][loc].fp = fp;
207+
buckets[i][loc].count = 1;
208+
max_count = std::max(max_count, buckets[i][loc].count);
214209
break;
215210
}
216211
}
@@ -222,10 +217,7 @@ void BlockSplitTopK::Add(const std::string &item, uint32_t increment) {
222217
if (location == -1) {
223218
if (heap[0].count == max_count || heap[0].count + 1 == max_count) {
224219
heap[0].fp = fp;
225-
heap[0].itemlen = itemlen;
226-
delete[] heap[0].item;
227-
heap[0].item = new char[itemlen];
228-
memcpy(heap[0].item, data, itemlen);
220+
heap[0].item = item;
229221

230222
heap[0].count = max_count;
231223

@@ -237,9 +229,7 @@ void BlockSplitTopK::Add(const std::string &item, uint32_t increment) {
237229
}
238230
} else {
239231
heap[heap_size].fp = fp;
240-
heap[heap_size].itemlen = itemlen;
241-
heap[heap_size].item = new char[itemlen];
242-
memcpy(heap[heap_size].item, data, itemlen);
232+
heap[heap_size].item = item;
243233
heap[heap_size].count = max_count;
244234

245235
HeapifyUp((int)heap_size);

0 commit comments

Comments
 (0)