Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/ctrip_swap.h
Original file line number Diff line number Diff line change
Expand Up @@ -976,6 +976,8 @@ long swapListTypeLength(robj *list, objectMeta *object_meta);
void swapListTypePush(robj *subject, robj *value, int where, redisDb *db, robj *key);
robj *swapListTypePop(robj *subject, int where, redisDb *db, robj *key);
void swapListMetaDelRange(redisDb *db, robj *key, long ltrim, long rtrim);
/* Shrink the list meta of `key` (if it has one) so that its length does not
 * exceed the length implied by the original LTRIM range
 * [orig_lstart..orig_lend]. `ltrim`/`rtrim` are the element counts already
 * trimmed from the head/tail; they are used to reconstruct the list length
 * before the trim. No-op when the key has no list meta. */
void swapListMetaTrimCold(redisDb *db, robj *key, long ltrim, long rtrim,
long long orig_lstart, long long orig_lend);
/* zset */
typedef struct zsetSwapData {
swapData sd;
Expand Down
145 changes: 145 additions & 0 deletions src/ctrip_swap_list.c
Original file line number Diff line number Diff line change
Expand Up @@ -1019,6 +1019,9 @@ int metaListDelete(metaList *main, int direction, long ridx) {
return delete;
}

/* Forward declaration: metaListMerge() below needs listMetaTrimToLen() to
 * clip a delta meta, but the helper is defined later in this file (after
 * swapListMetaDelRange). */
static void listMetaTrimToLen(listMeta *meta, long desired_len);

long metaListMerge(metaList *main, metaList *delta) {
long merged = 0, ridx, main_mean_ridx, delta_mean_ridx;
int segtype;
Expand All @@ -1028,6 +1031,55 @@ long metaListMerge(metaList *main, metaList *delta) {
serverAssert(metaListIsValid(main,LIST_META_STRICT_NOEMPTY|LIST_META_STRICT_CONTINOUS));
serverAssert(metaListIsValid(delta,0));

/* Clip delta to main's ridx range before merging.
*
* Race condition: swapListMetaTrimCold() may shrink the in-memory meta
* (reducing its last ridx) AFTER a SWAP_IN was dispatched but BEFORE it
* completes. The decoded delta then contains elements at ridx positions
* beyond the current main range, which would violate listMetaAlign's
* invariant and cause an assertion failure.
*
* Fix: discard any delta elements (and the corresponding meta segments)
* whose ridx positions fall outside main's range. These are "orphaned"
* entries – the meta trim made them logically deleted even though the
* RocksDB entries were not yet physically removed. */
{
segment *main_last = listMetaLastSegment(main->meta);
segment *delta_last_seg = listMetaLastSegment(delta->meta);
if (main_last && delta_last_seg) {
long main_end = main_last->index + main_last->len;
long delta_end = delta_last_seg->index + delta_last_seg->len;
if (delta_end > main_end) {
/* Count HOT elements in delta at ridx >= main_end.
* LIST_HEAD direction iterates from tail (high ridx) to head. */
long excess_hot = 0;
{
metaListIterator iter;
metaListIterInit(&iter, delta, LIST_HEAD);
while (!metaListIterFinished(&iter)) {
long cur_ridx = metaListIterCur(&iter, NULL, NULL);
if (cur_ridx >= main_end) {
excess_hot++;
metaListIterNext(&iter);
} else {
break;
}
}
metaListIterDeinit(&iter);
}
if (excess_hot > 0) {
/* Remove excess elements from the tail of delta's list
* (high-ridx elements are stored at the tail). */
listTypeDelRange(delta->list, -(excess_hot), excess_hot);
/* Trim delta's meta to match: remove the ridx range
* [main_end .. delta_end) from the tail. */
listMetaTrimToLen(delta->meta,
delta->meta->len - (delta_end - main_end));
}
}
}
}

/* always merge small inst into big one */
if (metaListLen(main,SEGMENT_TYPE_HOT) <
metaListLen(delta,SEGMENT_TYPE_HOT)) {
Expand Down Expand Up @@ -2022,6 +2074,52 @@ void swapListMetaDelRange(redisDb *db, robj *key, long ltrim, long rtrim) {
if (meta) listMetaExtend(meta,-ltrim,-rtrim);
}

/* Shorten `meta` from its tail until its total length equals `desired_len`.
 *
 * Whole trailing segments are dropped while they fit entirely inside the
 * amount still to be removed; the first segment that does not fit is merely
 * shortened. The call is a no-op when `desired_len` is negative or is not
 * smaller than the current length. */
static void listMetaTrimToLen(listMeta *meta, long desired_len) {
    long remaining;

    if (desired_len < 0 || meta->len <= desired_len) return;

    remaining = meta->len - desired_len;
    while (remaining > 0 && meta->num > 0) {
        segment *tail = listMetaLastSegment(meta);

        if (tail->len <= remaining) {
            /* The whole tail segment lies in the excess: drop it. */
            long dropped = tail->len;
            remaining -= dropped;
            meta->len -= dropped;
            meta->num--;
        } else {
            /* Only part of the tail segment is excess: shrink it in place. */
            tail->len -= remaining;
            meta->len -= remaining;
            remaining = 0;
        }
    }
}

/* Bring the list meta of `key` in line with the range an LTRIM was asked to
 * keep, [orig_lstart..orig_lend].
 *
 * Per the original author's note, listMetaExtend only removes hot elements
 * from the head/tail, so cold segments that are logically out of range may
 * survive an LTRIM on a warm/cold list. This function computes how many
 * elements the requested range keeps and, if the meta is longer than that,
 * trims the meta from its tail down to that length.
 *
 * `ltrim` and `rtrim` are added back onto the current meta length to rebuild
 * the logical list length before the trim, which is needed to resolve
 * negative indices. Does nothing when the key has no list meta. */
void swapListMetaTrimCold(redisDb *db, robj *key, long ltrim, long rtrim,
        long long orig_lstart, long long orig_lend) {
    listMeta *meta = lookupListMeta(db, key);
    long llen_before, keep_len;

    if (meta == NULL) return;

    /* Logical length of the list before this LTRIM was applied. */
    llen_before = meta->len + ltrim + rtrim;

    /* Resolve negative indices relative to the end, then clamp the start
     * (mirrors the index handling the original comment attributes to
     * ltrimCommand). */
    if (orig_lstart < 0) orig_lstart += llen_before;
    if (orig_lend < 0) orig_lend += llen_before;
    if (orig_lstart < 0) orig_lstart = 0;

    if (orig_lstart >= llen_before || orig_lend < orig_lstart) {
        /* Empty or out-of-range window: nothing is kept. */
        keep_len = 0;
    } else {
        long long last = orig_lend >= llen_before ? llen_before - 1 : orig_lend;
        keep_len = last - orig_lstart + 1;
    }

    if (keep_len < meta->len) listMetaTrimToLen(meta, keep_len);
}

/* List rdb save, note that:
* - hot lists are saved as RDB_TYPE_LIST_QUICKLIST (same as origin redis)
* - warm/cold list are saved as RDB_TYPE_LIST, which are more suitable
Expand Down Expand Up @@ -2872,6 +2970,53 @@ int swapListMetaTest(int argc, char *argv[], int accurate) {
metaListDestroy(main);
}

TEST("meta-list: merge clips delta exceeding main ridx range") {
/* Regression test for the listMetaAlign assertion failure that occurs
* when swapListMetaTrimCold() shrinks main's ridx range after a SWAP_IN
* has been dispatched but before metaListMerge() runs.
*
* The decoded delta still covers the old (larger) ridx range. Without
* the fix, metaListMerge() would call
* listMetaAlign(main(last=40), orig_delta_meta(last=60))
* which trips a serverAssert in listMetaAlign because main ends at
* ridx 40 while delta ends at ridx 60 (40 < 60).
*
* With the fix, the delta is clipped to main's ridx range before the
* merge, so the assertion is never violated. */

/* main: HOT(0,10) | COLD(10,20) | HOT(30,10) => ridx 0..39, 20 HOT */
listMeta *main_meta = listMetaCreate();
listMetaAppendSegment(main_meta, SEGMENT_TYPE_HOT, 0, 10);
listMetaAppendSegment(main_meta, SEGMENT_TYPE_COLD, 10, 20);
listMetaAppendSegment(main_meta, SEGMENT_TYPE_HOT, 30, 10);
robj *main_list = createQuicklistObject(-2, 0);
metaList *main = metaListBuild(main_meta, main_list);
metaListPopulateList(main); /* populate hot elements ridx 0..9, 30..39 */
test_assert(listTypeLength(main_list) == 20);

/* delta: HOT(0,60) => ridx 0..59, 60 HOT elements.
* Simulates a SWAP_IN decoded based on the OLD (untrimmed) meta that
* covered ridx 0..59. After swapListMetaTrimCold trimmed main to
* ridx 0..39, this delta would violate listMetaAlign's invariant. */
listMeta *delta_meta = listMetaCreate();
listMetaAppendSegment(delta_meta, SEGMENT_TYPE_HOT, 0, 60);
robj *delta_list = createQuicklistObject(-2, 0);
metaList *delta = metaListBuild(delta_meta, delta_list);
metaListPopulateList(delta); /* populate ridx 0..59 */
test_assert(listTypeLength(delta_list) == 60);

/* metaListMerge must NOT crash (the fix clips delta to ridx 0..39). */
metaListMerge(main, delta);

/* After merge: main covers ridx 0..39, all HOT (40 elements):
* the last segment must end at ridx 40 and the list must hold 40 items. */
segment *last = listMetaLastSegment(main->meta);
test_assert(last != NULL);
test_assert(last->index + last->len == 40);
test_assert(listTypeLength(main->list) == 40);

metaListDestroy(delta);
metaListDestroy(main);
}

TEST("meta-list: exclude") {
listMeta *meta = listMetaCreate();
robj *list = createQuicklistObject(-2, 0);
Expand Down
96 changes: 88 additions & 8 deletions src/ctrip_swap_repl.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,87 @@

#include "ctrip_swap.h"

/* One finished key request whose swapDataBeforeCall() step has been
 * postponed. Entries are created by replDeferredBeforeCallEnqueue() and
 * consumed (and freed) by replDeferredBeforeCallFlush(). */
typedef struct replDeferredBeforeCall {
/* Private copy of the swapCtx's key request (made with copyKeyRequest). */
keyRequest key_request;
/* Swap data taken over from the swapCtx; released with swapDataFree()
 * when the entry is flushed. */
swapData *data;
/* Context pointer taken over from the swapCtx together with `data`. */
void *datactx;
} replDeferredBeforeCall;

/* Per-worker-client queue of deferred beforeCall entries. */
typedef struct replDeferredBeforeCallQueue {
/* Worker client this queue belongs to; used as the lookup key. */
client *wc;
list *items; /* list of replDeferredBeforeCall* */
} replDeferredBeforeCallQueue;

/* All live replDeferredBeforeCallQueue instances (one per worker client that
 * currently has deferred entries). Created lazily on first enqueue; NULL
 * until then. */
static list *replDeferredBeforeCallQueues = NULL;

/* Return the deferred-beforeCall queue owned by worker client `wc`.
 *
 * The global queue list is scanned linearly for an entry whose `wc` matches.
 * When none is found the result depends on `create`:
 *   - create == 0: NULL is returned and nothing is allocated;
 *   - create != 0: the global list (if still missing) and a fresh, empty
 *     queue for `wc` are allocated, the queue is appended to the global list
 *     and returned. */
static replDeferredBeforeCallQueue *replDeferredBeforeCallGetQueue(client *wc, int create) {
    listIter iter;
    listNode *node;
    replDeferredBeforeCallQueue *queue;

    if (replDeferredBeforeCallQueues == NULL) {
        if (!create) return NULL;
        replDeferredBeforeCallQueues = listCreate();
    }

    /* Look for an existing queue bound to this worker client. */
    listRewind(replDeferredBeforeCallQueues, &iter);
    for (node = listNext(&iter); node != NULL; node = listNext(&iter)) {
        queue = listNodeValue(node);
        if (queue->wc == wc) return queue;
    }

    if (!create) return NULL;

    /* Not found: register a new empty queue for wc. */
    queue = zmalloc(sizeof(*queue));
    queue->wc = wc;
    queue->items = listCreate();
    listAddNodeTail(replDeferredBeforeCallQueues, queue);
    return queue;
}

/* Queue the swap data carried by `ctx` on the deferred list of worker client
 * `wc`, so that its beforeCall step can be run later by
 * replDeferredBeforeCallFlush().
 *
 * The key request is copied into the new entry, while `ctx->data` and
 * `ctx->datactx` are moved: both fields are reset to NULL in `ctx`, making the
 * queued entry their only holder. Nothing is queued when `ctx` has no data. */
static void replDeferredBeforeCallEnqueue(client *wc, swapCtx *ctx) {
    replDeferredBeforeCallQueue *queue;
    replDeferredBeforeCall *entry;

    if (ctx->data == NULL) return;

    queue = replDeferredBeforeCallGetQueue(wc, 1);

    entry = zmalloc(sizeof(*entry));
    memset(entry, 0, sizeof(*entry));
    copyKeyRequest(&entry->key_request, ctx->key_request);

    /* Move data/datactx out of the ctx and into the entry. */
    entry->data = ctx->data;
    entry->datactx = ctx->datactx;
    ctx->data = NULL;
    ctx->datactx = NULL;

    listAddNodeTail(queue->items, entry);
}

/* Drain and destroy the deferred-beforeCall queue of worker client `wc`.
 *
 * For every queued entry, in queue order:
 *   - when `apply_before_call` is non-zero and the entry's swap data reports
 *     swapDataAlreadySetup(), swapDataBeforeCall() is invoked for it;
 *   - the swap data's `value` reference (if any) is dropped and cleared;
 *   - the copied key request, the swap data and the entry itself are freed.
 * Afterwards the queue is released and unlinked from the global list.
 *
 * With `apply_before_call` == 0 the entries are simply discarded. The call is
 * a no-op when `wc` has no queue. */
static void replDeferredBeforeCallFlush(client *wc, int apply_before_call) {
    listNode *queue_node;
    listNode *item_node;
    replDeferredBeforeCallQueue *queue = NULL;

    if (replDeferredBeforeCallQueues == NULL) return;

    /* Find the (first) queue that belongs to wc. */
    for (queue_node = listFirst(replDeferredBeforeCallQueues);
         queue_node != NULL;
         queue_node = listNextNode(queue_node)) {
        replDeferredBeforeCallQueue *candidate = listNodeValue(queue_node);
        if (candidate->wc == wc) {
            queue = candidate;
            break;
        }
    }
    if (queue == NULL) return;

    /* Consume every entry; the successor is fetched before the node is
     * unlinked so the walk survives the deletion. */
    item_node = listFirst(queue->items);
    while (item_node != NULL) {
        listNode *following = listNextNode(item_node);
        replDeferredBeforeCall *entry = listNodeValue(item_node);
        swapData *data = entry->data;

        if (data != NULL) {
            if (apply_before_call && swapDataAlreadySetup(data)) {
                swapDataBeforeCall(data, &entry->key_request, wc, entry->datactx);
            }
            if (data->value != NULL) {
                decrRefCount(data->value);
                data->value = NULL;
            }
        }

        keyRequestDeinit(&entry->key_request);
        if (data != NULL) swapDataFree(data, entry->datactx);
        zfree(entry);
        listDelNode(queue->items, item_node);

        item_node = following;
    }

    /* Tear down the now-empty queue and unregister it. */
    listRelease(queue->items);
    zfree(queue);
    listDelNode(replDeferredBeforeCallQueues, queue_node);
}

/* See replicationHandleMasterDisconnection for more details */
void replicationHandleMasterDisconnectionWithoutReconnect(void) {
/* Fire the master link modules event. */
Expand Down Expand Up @@ -130,6 +211,7 @@ static void replClientUpdateSelectedDb(client *c) {
static void replCommandDispatch(client *wc, client *c) {
/* wc may still have argv from last dispatched command, free it safely. */
if (wc->argv) freeClientArgv(wc);
replDeferredBeforeCallFlush(wc, 0);

wc->db = c->db;

Expand Down Expand Up @@ -169,7 +251,9 @@ static void processFinishedReplCommands() {

while ((ln = listFirst(server.swap_repl_worker_clients_used))) {
wc = listNodeValue(ln);
if (wc->CLIENT_REPL_SWAPPING) break;
if (wc->CLIENT_REPL_SWAPPING) {
break;
}
c = wc->swap_repl_client;

wc->flags &= ~CLIENT_SWAPPING;
Expand All @@ -190,9 +274,11 @@ static void processFinishedReplCommands() {
}

if (wc->swap_errcode) {
replDeferredBeforeCallFlush(wc, 0);
rejectCommandFormat(c,"Swap failed (code=%d)",wc->swap_errcode);
wc->swap_errcode = 0;
} else {
replDeferredBeforeCallFlush(wc, 1);
call(wc, CMD_CALL_FULL);

/* post call */
Expand Down Expand Up @@ -252,19 +338,13 @@ void replWorkerClientKeyRequestFinished(client *wc, swapCtx *ctx) {
client *c;
listNode *ln;
list *repl_swapping_clients;
UNUSED(ctx);

serverLog(LL_DEBUG, "> replWorkerClientSwapFinished client(id=%ld,cmd=%s,key=%s)",
wc->id,wc->cmd->fullname,wc->argc <= 1 ? "": (sds)wc->argv[1]->ptr);

DEBUG_MSGS_APPEND(&ctx->msgs, "request-finished", "errcode=%d",ctx->errcode);

if (ctx->errcode) clientSwapError(wc,ctx->errcode);
keyRequestBeforeCall(wc,ctx);
if (ctx->data && ctx->data->value != NULL) {
decrRefCount(ctx->data->value);
ctx->data->value = NULL;
}
replDeferredBeforeCallEnqueue(wc, ctx);
/* Flag swap finished, note that command processing will be deferred to
* processFinishedReplCommands because there might be unfinished preceding swaps. */
wc->keyrequests_count--;
Expand Down
9 changes: 7 additions & 2 deletions src/iothread.c
Original file line number Diff line number Diff line change
Expand Up @@ -942,8 +942,13 @@ void ioThreadsScaleDownTryEnd(void) {
if (thread_state == THREAD_STATE_STOPPED) {
destroyIOThread(t);
server.io_threads_num--;
/* scaling down is not urgent
(it doesn't consume time from the main thread) */
/* If all threads have been scaled down to the target
* count, transition immediately so callers see NONE
* in the same beforeSleep pass. */
if (server.io_threads_num <= server.config_io_threads_num) {
server.io_threads_scale_status = IO_THREAD_SCALE_STATUS_NONE;
serverLog(LL_NOTICE, "IO threads scale-down end");
}
}
} else {
pauseIOThread(t->id);
Expand Down
19 changes: 18 additions & 1 deletion src/t_list.c
Original file line number Diff line number Diff line change
Expand Up @@ -817,7 +817,6 @@ void listElementsRemoved(client *c, robj *key, int where, robj *o, long count, i
updateKeysizesHist(c->db, getKeySlot(key->ptr), OBJ_LIST, llen + count, llen);
if (llen == 0) {
if (deleted) *deleted = 1;

dbDelete(c->db, key);
notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, c->db->id);
} else {
Expand Down Expand Up @@ -1012,6 +1011,24 @@ void ltrimCommand(client *c) {
notifyKeyspaceEvent(NOTIFY_LIST,"ltrim",c->argv[1],c->db->id);
#ifdef ENABLE_SWAP
objectMeta *om = lookupMeta(c->db,c->argv[1]);
/* After trimming the hot list, also trim cold meta segments that are logically
* outside the master's keep range. This corrects a divergence that occurs when
* eviction happens between LTRIM dispatch (when the key may be hot/meta-less)
* and LTRIM apply (when the key is warm with cold segments).
* We detect this case by checking whether a beforeCall rewrite occurred
* (c->swap_arg_rewrites->num == 2), which means the original logical indices
* are saved in the rewrites and the meta was present at apply time. */
if (om && c->swap_arg_rewrites && c->swap_arg_rewrites->num == 2 &&
c->swap_arg_rewrites->rewrites[0].arg_req.arg_idx == 2 &&
c->swap_arg_rewrites->rewrites[1].arg_req.arg_idx == 3) {
long long orig_lstart, orig_lend;
if (getLongLongFromObject(c->swap_arg_rewrites->rewrites[0].orig_arg, &orig_lstart) == C_OK &&
getLongLongFromObject(c->swap_arg_rewrites->rewrites[1].orig_arg, &orig_lend) == C_OK) {
swapListMetaTrimCold(c->db, c->argv[1], ltrim, rtrim, orig_lstart, orig_lend);
/* Re-lookup om after potential meta modification */
om = lookupMeta(c->db, c->argv[1]);
}
}
notifyKeyspaceEventDirty(NOTIFY_LIST,"ltrim",c->argv[1],c->db->id,o,NULL);
if ((llenNew = swapListTypeLength(o,om)) == 0) {
#else
Expand Down
Loading
Loading