Skip to content

Commit 7f7b2df

Browse files
committed
add expirePools to optimize expire efficiency
Signed-off-by: Ray Cao <zisong.cw@alibaba-inc.com>
1 parent 772bd8b commit 7f7b2df

File tree

3 files changed

+106
-0
lines changed

3 files changed

+106
-0
lines changed

src/expire.c

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,74 @@
3737

3838
#include "server.h"
3939

40+
41+
/* TODO: comment */
42+
#define EPPOOL_SIZE 20
43+
44+
/* Define expirationPool as an array of robj */
45+
typedef robj **expirationPool;
46+
/* Create an array of expiration pool, one for each database. */
47+
static expirationPool *ExpirationPools;
48+
49+
void expirationPoolAlloc(void) {
50+
int dbnum = server.dbnum;
51+
52+
ExpirationPools = zmalloc(sizeof(expirationPool) * dbnum);
53+
54+
for (int i = 0; i < dbnum; i++) {
55+
ExpirationPools[i] = zmalloc(sizeof(struct robj*) * EPPOOL_SIZE);
56+
for (int j = 0; j < EPPOOL_SIZE; j++) {
57+
ExpirationPools[i][j] = NULL;
58+
}
59+
}
60+
}
61+
62+
void expirationPoolPopulate(serverDb *db, robj *val) {
63+
expirationPool pool = ExpirationPools[db->id];
64+
int k;
65+
66+
/* First check if the key already exists in the pool */
67+
for (int i = 0; i < EPPOOL_SIZE; i++) {
68+
if (pool[i] && pool[i] == val) {
69+
return; /* Key already exists in the pool, no need to insert again */
70+
}
71+
}
72+
73+
k = 0;
74+
while (k < EPPOOL_SIZE && pool[k] && objectGetExpire(pool[k]) > objectGetExpire(val)) k++;
75+
76+
/* If we can't insert (all slots filled with sooner-to-expire entries), return */
77+
if (k == 0 && pool[EPPOOL_SIZE - 1] != NULL) {
78+
return;
79+
} else if (k < EPPOOL_SIZE && pool[k] == NULL) {
80+
/* Inserting into empty position. No setup needed before insert. */
81+
} else {
82+
/* Inserting in the middle. Now k points to the first element
83+
* with expire time greater than the element to insert. */
84+
if (pool[EPPOOL_SIZE - 1] == NULL) {
85+
/* Free space on the right? Insert at k shifting
86+
* all the elements from k to end to the right. */
87+
memmove(pool + k + 1, pool + k, sizeof(pool[0]) * (EPPOOL_SIZE - k - 1));
88+
} else {
89+
/* No free space on right? Insert at k-1 */
90+
k--;
91+
/* Shift all elements on the left of k (included) to the
92+
* left, so we discard the element with smallest expire time. */
93+
if (pool[0]) decrRefCount(pool[0]);
94+
memmove(pool, pool + 1, sizeof(pool[0]) * k);
95+
}
96+
}
97+
98+
/* Store the entry and increment its refcount */
99+
pool[k] = val;
100+
incrRefCount(val);
101+
102+
for (int i = 0; i < EPPOOL_SIZE; i++) {
103+
if (pool[i])
104+
serverLog(LL_WARNING, "epiration pool index: %d, expiretime: %lld", i, objectGetExpire(pool[i]));
105+
}
106+
}
107+
40108
/*-----------------------------------------------------------------------------
41109
* Incremental collection of expired keys.
42110
*
@@ -145,6 +213,9 @@ void expireScanCallback(void *privdata, void *entry) {
145213
/* We want the average TTL of keys yet not expired. */
146214
data->ttl_sum += ttl;
147215
data->ttl_samples++;
216+
217+
/* Try to add this entry to the expiration pool for future expiration */
218+
expirationPoolPopulate(data->db, val);
148219
}
149220
data->sampled++;
150221
}
@@ -252,6 +323,38 @@ void activeExpireCycle(int type) {
252323

253324
if (kvstoreSize(db->expires)) dbs_performed++;
254325

326+
/* First, try to expire keys from the expire pool */
327+
robj **pool = ExpirationPools[db->id];
328+
long long now = mstime();
329+
int expired_from_pool = 0;
330+
331+
/* Traverse from back to front, stop when we find a non-expired key */
332+
for (int i = EPPOOL_SIZE - 1; i >= 0; i--) {
333+
if (pool[i] == NULL) continue;
334+
if (objectGetExpire(pool[i]) <= now) {
335+
/* Key is expired, try to expire it */
336+
if (activeExpireCycleTryExpire(db, pool[i], now)) {
337+
expired_from_pool++;
338+
/* Propagate the DEL command */
339+
postExecutionUnitOperations();
340+
}
341+
342+
/* Clean up the pool entry */
343+
decrRefCount(pool[i]);
344+
pool[i] = NULL;
345+
} else {
346+
/* Since we're traversing from back to front and keys are sorted by expire time,
347+
* if we find a non-expired key, all keys before it are also not expired */
348+
break;
349+
}
350+
}
351+
352+
/* If we found expired keys in the pool, update stats */
353+
if (expired_from_pool > 0) {
354+
total_expired += expired_from_pool;
355+
total_sampled += expired_from_pool;
356+
}
357+
255358
/* Continue to expire if at the end of the cycle there are still
256359
* a big percentage of keys to expire, compared to the number of keys
257360
* we scanned. The percentage, stored in config_cycle_acceptable_stale

src/server.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2844,6 +2844,7 @@ void initServer(void) {
28442844
server.db[j].avg_ttl = 0;
28452845
}
28462846
evictionPoolAlloc(); /* Initialize the LRU keys pool. */
2847+
expirationPoolAlloc(); /* Initialize the expiration pools. */
28472848
/* Note that server.pubsub_channels was chosen to be a kvstore (with only one dict, which
28482849
* seems odd) just to make the code cleaner by making it be the same type as server.pubsubshard_channels
28492850
* (which has to be kvstore), see pubsubtype.serverPubSubChannels */

src/server.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3555,6 +3555,8 @@ void handleBlockedClientsTimeout(void);
35553555
int clientsCronHandleTimeout(client *c, mstime_t now_ms);
35563556

35573557
/* expire.c -- Handling of expired keys */
3558+
void expirationPoolAlloc(void);
3559+
void expirationPoolPopulate(serverDb *db, robj *val);
35583560
void activeExpireCycle(int type);
35593561
void expireReplicaKeys(void);
35603562
void rememberReplicaKeyWithExpire(serverDb *db, robj *key);

0 commit comments

Comments
 (0)