Skip to content

Commit cc65c31

Browse files
committed
add expirationPools to optimize expire efficiency
Signed-off-by: Ray Cao <zisong.cw@alibaba-inc.com>
1 parent 772bd8b commit cc65c31

File tree

3 files changed

+103
-1
lines changed

3 files changed

+103
-1
lines changed

src/expire.c

Lines changed: 100 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,68 @@
3737

3838
#include "server.h"
3939

40+
41+
/* TODO: comment */
42+
43+
#define EPPOOL_SIZE 20
44+
#define EPPOOL_CACHED_SDS_SIZE 255
45+
struct expirationPoolEntry {
46+
long long expiretime; /* Absolute expiration time */
47+
robj *entry; /* The entry with expire time */
48+
};
49+
50+
/* Create an array of expire pools, one for each database. */
51+
static struct expirationPoolEntry **ExpirationPools;
52+
53+
void expirationPoolAlloc(void) {
54+
int dbnum = server.dbnum;
55+
56+
ExpirationPools = zmalloc(sizeof(struct expirationPoolEntry *) * dbnum);
57+
58+
for (int i = 0; i < dbnum; i++) {
59+
ExpirationPools[i] = zmalloc(sizeof(struct expirationPoolEntry) * EPPOOL_SIZE);
60+
for (int j = 0; j < EPPOOL_SIZE; j++) {
61+
ExpirationPools[i][j].expiretime = 0;
62+
ExpirationPools[i][j].entry = NULL;
63+
}
64+
}
65+
}
66+
67+
void expirationPoolPopulate(serverDb *db, robj *val, long long expiretime) {
68+
struct expirationPoolEntry *pool = ExpirationPools[db->id];
69+
int k;
70+
71+
k = 0;
72+
while (k < EPPOOL_SIZE && pool[k].entry && pool[k].expiretime < expiretime) k++;
73+
74+
/* If we can't insert (all slots filled with sooner-to-expire entries), return */
75+
if (k == 0 && pool[EPPOOL_SIZE - 1].entry != NULL) {
76+
return;
77+
} else if (k < EPPOOL_SIZE && pool[k].entry == NULL) {
78+
/* Inserting into empty position. No setup needed before insert. */
79+
} else {
80+
/* Inserting in the middle. Now k points to the first element
81+
* with expire time greater than the element to insert. */
82+
if (pool[EPPOOL_SIZE - 1].entry == NULL) {
83+
/* Free space on the right? Insert at k shifting
84+
* all the elements from k to end to the right. */
85+
memmove(pool + k + 1, pool + k, sizeof(pool[0]) * (EPPOOL_SIZE - k - 1));
86+
} else {
87+
/* No free space on right? Insert at k-1 */
88+
k--;
89+
/* Shift all elements on the left of k (included) to the
90+
* left, so we discard the element with smallest expire time. */
91+
if (pool[0].entry) decrRefCount(pool[0].entry);
92+
memmove(pool, pool + 1, sizeof(pool[0]) * k);
93+
}
94+
}
95+
96+
/* Store the entry and increment its refcount */
97+
pool[k].entry = val;
98+
incrRefCount(val);
99+
pool[k].expiretime = expiretime;
100+
}
101+
40102
/*-----------------------------------------------------------------------------
41103
* Incremental collection of expired keys.
42104
*
@@ -135,7 +197,8 @@ typedef struct {
135197
void expireScanCallback(void *privdata, void *entry) {
136198
robj *val = entry;
137199
expireScanData *data = privdata;
138-
long long ttl = objectGetExpire(val) - data->now;
200+
long long expiretime = objectGetExpire(val);
201+
long long ttl = expiretime - data->now;
139202
if (activeExpireCycleTryExpire(data->db, val, data->now)) {
140203
data->expired++;
141204
/* Propagate the DEL command */
@@ -145,6 +208,9 @@ void expireScanCallback(void *privdata, void *entry) {
145208
/* We want the average TTL of keys yet not expired. */
146209
data->ttl_sum += ttl;
147210
data->ttl_samples++;
211+
212+
/* Try to add this entry to the expiration pool for future expiration */
213+
expirationPoolPopulate(data->db, val, expiretime);
148214
}
149215
data->sampled++;
150216
}
@@ -252,6 +318,39 @@ void activeExpireCycle(int type) {
252318

253319
if (kvstoreSize(db->expires)) dbs_performed++;
254320

321+
/* First, try to expire keys from the expire pool */
322+
struct expirationPoolEntry *pool = ExpirationPools[db->id];
323+
long long now = mstime();
324+
int expired_from_pool = 0;
325+
326+
/* Traverse from back to front, stop when we find a non-expired key */
327+
for (int i = EPPOOL_SIZE - 1; i >= 0; i--) {
328+
if (pool[i].entry == NULL) continue;
329+
if (pool[i].expiretime <= now) {
330+
/* Key is expired, try to expire it */
331+
if (activeExpireCycleTryExpire(db, pool[i].entry, now)) {
332+
expired_from_pool++;
333+
/* Propagate the DEL command */
334+
postExecutionUnitOperations();
335+
}
336+
337+
/* Clean up the pool entry */
338+
decrRefCount(pool[i].entry);
339+
pool[i].entry = NULL;
340+
pool[i].expiretime = 0;
341+
} else {
342+
/* Since we're traversing from back to front and keys are sorted by expire time,
343+
* if we find a non-expired key, all keys before it are also not expired */
344+
break;
345+
}
346+
}
347+
348+
/* If we found expired keys in the pool, update stats */
349+
if (expired_from_pool > 0) {
350+
total_expired += expired_from_pool;
351+
total_sampled += expired_from_pool;
352+
}
353+
255354
/* Continue to expire if at the end of the cycle there are still
256355
* a big percentage of keys to expire, compared to the number of keys
257356
* we scanned. The percentage, stored in config_cycle_acceptable_stale

src/server.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2844,6 +2844,7 @@ void initServer(void) {
28442844
server.db[j].avg_ttl = 0;
28452845
}
28462846
evictionPoolAlloc(); /* Initialize the LRU keys pool. */
2847+
expirationPoolAlloc(); /* Initialize the expire pools. */
28472848
/* Note that server.pubsub_channels was chosen to be a kvstore (with only one dict, which
28482849
* seems odd) just to make the code cleaner by making it be the same type as server.pubsubshard_channels
28492850
* (which has to be kvstore), see pubsubtype.serverPubSubChannels */

src/server.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3555,6 +3555,8 @@ void handleBlockedClientsTimeout(void);
35553555
int clientsCronHandleTimeout(client *c, mstime_t now_ms);
35563556

35573557
/* expire.c -- Handling of expired keys */
3558+
void expirationPoolAlloc(void);
3559+
void expirationPoolPopulate(serverDb *db, robj *val, long long expiretime);
35583560
void activeExpireCycle(int type);
35593561
void expireReplicaKeys(void);
35603562
void rememberReplicaKeyWithExpire(serverDb *db, robj *key);

0 commit comments

Comments
 (0)