Skip to content

Commit a481fe2

Browse files
authored
Update clusterMoveNodeSlots to also move importing slots and migrating slots (valkey-io#2370)
In valkey-io#2301, we added clusterMoveNodeSlots to implement the logic of moving slots from the old primary to the new primary when myself receives the replica (old primary) message first and the new primary message later in a shard failover. However, because of this, when myself later receives the new primary message again, there is no way to call clusterUpdateSlotsConfigWith, because we have already updated the slots of the new primary beforehand. This results in, for example, importing slots and migrating slots not being updated; see valkey-io#445. In this commit, we also make clusterMoveNodeSlots move importing slots and migrating slots. Fixes valkey-io#2363. Signed-off-by: Binbin <binloveplay1314@qq.com>
1 parent 2a44506 commit a481fe2

File tree

4 files changed

+107
-34
lines changed

4 files changed

+107
-34
lines changed

src/cluster_legacy.c

Lines changed: 59 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ int clusterNodeAddReplica(clusterNode *primary, clusterNode *replica);
7272
int clusterAddSlot(clusterNode *n, int slot);
7373
int clusterDelSlot(int slot);
7474
int clusterDelNodeSlots(clusterNode *node);
75-
int clusterMoveNodeSlots(clusterNode *from_node, clusterNode *to_node);
75+
void clusterMoveNodeSlots(clusterNode *from_node, clusterNode *to_node, int *slots, int *importing_slots, int *migrating_slots);
7676
void clusterNodeSetSlotBit(clusterNode *n, int slot);
7777
static void clusterSetPrimary(clusterNode *n, int closeSlots, int full_sync_required);
7878
void clusterHandleReplicaFailover(void);
@@ -2783,7 +2783,7 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
27832783
/* Update importing_slots_from to point to the sender, if it is in the
27842784
* same shard as the previous slot owner */
27852785
if (areInSameShard(sender, server.cluster->importing_slots_from[j])) {
2786-
serverLog(LL_NOTICE,
2786+
serverLog(LL_VERBOSE,
27872787
"Failover occurred in migration source. Update importing "
27882788
"source for slot %d to node %.40s (%s) in shard %.40s.",
27892789
j, sender->name, sender->human_nodename, sender->shard_id);
@@ -2825,7 +2825,7 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
28252825
(server.cluster->migrating_slots_to[j]->configEpoch < senderConfigEpoch ||
28262826
nodeIsReplica(server.cluster->migrating_slots_to[j])) &&
28272827
areInSameShard(server.cluster->migrating_slots_to[j], sender)) {
2828-
serverLog(LL_NOTICE,
2828+
serverLog(LL_VERBOSE,
28292829
"Failover occurred in migration target."
28302830
" Slot %d is now being migrated to node %.40s (%s) in shard %.40s.",
28312831
j, sender->name, sender->human_nodename, sender->shard_id);
@@ -3717,17 +3717,35 @@ int clusterProcessPacket(clusterLink *link) {
37173717
/* A failover occurred in the shard where `sender` belongs to and `sender` is
37183718
* no longer a primary. Update slot assignment to `sender_claimed_config_epoch`,
37193719
* which is the new primary in the shard. */
3720-
int slots = clusterMoveNodeSlots(sender, sender_claimed_primary);
3720+
int slots = 0, importing_slots = 0, migrating_slots = 0;
3721+
clusterMoveNodeSlots(sender, sender_claimed_primary,
3722+
&slots, &importing_slots, &migrating_slots);
37213723
/* `primary` is still a `replica` in this observer node's view;
37223724
* update its role and configEpoch */
37233725
clusterSetNodeAsPrimary(sender_claimed_primary);
37243726
sender_claimed_primary->configEpoch = sender_claimed_config_epoch;
3725-
serverLog(LL_NOTICE,
3726-
"A failover occurred in shard %.40s; node %.40s (%s) lost %d slot(s) and"
3727-
" failed over to node %.40s (%s) with a config epoch of %llu",
3728-
sender->shard_id, sender->name, sender->human_nodename, slots,
3729-
sender_claimed_primary->name, sender_claimed_primary->human_nodename,
3730-
(unsigned long long)sender_claimed_primary->configEpoch);
3727+
if (slots) {
3728+
serverLog(LL_NOTICE,
3729+
"A failover occurred in shard %.40s; node %.40s (%s) lost %d slot(s) and"
3730+
" failed over to node %.40s (%s) with a config epoch of %llu",
3731+
sender->shard_id, sender->name, sender->human_nodename, slots,
3732+
sender_claimed_primary->name, sender_claimed_primary->human_nodename,
3733+
(unsigned long long)sender_claimed_primary->configEpoch);
3734+
}
3735+
if (importing_slots) {
3736+
serverLog(LL_NOTICE,
3737+
"A failover occurred in migration source. Update importing "
3738+
"source of %d slot(s) to node %.40s (%s) in shard %.40s.",
3739+
importing_slots, sender_claimed_primary->name,
3740+
sender_claimed_primary->human_nodename, sender_claimed_primary->shard_id);
3741+
}
3742+
if (migrating_slots) {
3743+
serverLog(LL_NOTICE,
3744+
"A failover occurred in migration target. Update migrating "
3745+
"target of %d slot(s) to node %.40s (%s) in shard %.40s.",
3746+
migrating_slots, sender_claimed_primary->name,
3747+
sender_claimed_primary->human_nodename, sender_claimed_primary->shard_id);
3748+
}
37313749
serverAssert(sender->numslots == 0);
37323750
}
37333751
} else {
@@ -3748,7 +3766,7 @@ int clusterProcessPacket(clusterLink *link) {
37483766
sender->flags |= CLUSTER_NODE_REPLICA;
37493767

37503768
/* Update config and state. */
3751-
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE);
3769+
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG);
37523770
}
37533771

37543772
/* Primary node changed for this replica? */
@@ -5886,18 +5904,43 @@ int clusterDelNodeSlots(clusterNode *node) {
58865904
/* Transfer slots from `from_node` to `to_node`.
58875905
*
58885906
* Iterates over all cluster slots, transferring each slot covered
5889-
* by `from_node` to `to_node`. Counts and returns the number of
5890-
* slots transferred. */
5891-
int clusterMoveNodeSlots(clusterNode *from_node, clusterNode *to_node) {
5892-
int processed = 0;
5907+
* by `from_node` to `to_node`. Includes importing slots and migrating
5908+
* slots. This function is currently only called after a failover occurs
5909+
* within a shard, i.e. moving slots from the old primary to the new
5910+
* primary. It is a special case of clusterUpdateSlotsConfigWith. */
5911+
void clusterMoveNodeSlots(clusterNode *from_node, clusterNode *to_node, int *slots, int *importing_slots, int *migrating_slots) {
5912+
serverAssert(areInSameShard(from_node, to_node));
5913+
int processed = 0, importing_processed = 0, migrating_processed = 0;
5914+
58935915
for (int j = 0; j < CLUSTER_SLOTS; j++) {
58945916
if (clusterNodeCoversSlot(from_node, j)) {
58955917
clusterDelSlot(j);
58965918
clusterAddSlot(to_node, j);
58975919
processed++;
58985920
}
5921+
5922+
if (server.cluster->importing_slots_from[j] == from_node) {
5923+
serverLog(LL_VERBOSE,
5924+
"Failover occurred in migration source. Update importing "
5925+
"source for slot %d to node %.40s (%s) in shard %.40s.",
5926+
j, to_node->name, to_node->human_nodename, to_node->shard_id);
5927+
server.cluster->importing_slots_from[j] = to_node;
5928+
importing_processed++;
5929+
}
5930+
5931+
if (server.cluster->migrating_slots_to[j] == from_node) {
5932+
serverLog(LL_VERBOSE,
5933+
"Failover occurred in migration target."
5934+
" Slot %d is now being migrated to node %.40s (%s) in shard %.40s.",
5935+
j, to_node->name, to_node->human_nodename, to_node->shard_id);
5936+
server.cluster->migrating_slots_to[j] = to_node;
5937+
migrating_processed++;
5938+
}
58995939
}
5900-
return processed;
5940+
5941+
if (slots) *slots = processed;
5942+
if (importing_slots) *importing_slots = importing_processed;
5943+
if (migrating_slots) *migrating_slots = migrating_processed;
59015944
}
59025945

59035946
/* Clear the migrating / importing state for all the slots.

tests/support/cluster_util.tcl

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -433,3 +433,21 @@ proc check_cluster_node_mark {flag ref_node_index instance_id_to_check} {
433433
proc get_slot_field {slot_output shard_id node_id attrib_id} {
434434
return [lindex [lindex [lindex $slot_output $shard_id] $node_id] $attrib_id]
435435
}
436+
437+
proc get_open_slots {srv_idx} {
438+
set slots [dict get [cluster_get_myself $srv_idx] slots]
439+
if {[regexp {\[.*} $slots slots]} {
440+
set slots [regsub -all {[{}]} $slots ""]
441+
return $slots
442+
} else {
443+
return {}
444+
}
445+
}
446+
447+
proc wait_for_slot_state {srv_idx pattern} {
448+
wait_for_condition 100 100 {
449+
[get_open_slots $srv_idx] eq $pattern
450+
} else {
451+
fail "incorrect slot state on R $srv_idx: expected $pattern; got [get_open_slots $srv_idx]"
452+
}
453+
}

tests/unit/cluster/manual-failover.tcl

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -422,6 +422,16 @@ start_cluster 3 1 {tags {external:skip cluster}} {
422422
set R3_shardid [R 3 cluster myshardid]
423423
assert_equal $R0_shardid $R3_shardid
424424

425+
# We also take this opportunity to verify slot migration.
426+
# Move slot 0 from R0 to R1. Move slot 5462 from R1 to R0.
427+
R 0 cluster setslot 0 migrating $R1_nodeid
428+
R 1 cluster setslot 0 importing $R0_nodeid
429+
R 1 cluster setslot 5462 migrating $R0_nodeid
430+
R 0 cluster setslot 5462 importing $R1_nodeid
431+
assert_equal [get_open_slots 0] "\[0->-$R1_nodeid\] \[5462-<-$R1_nodeid\]"
432+
assert_equal [get_open_slots 1] "\[0-<-$R0_nodeid\] \[5462->-$R0_nodeid\]"
433+
wait_for_slot_state 3 "\[0->-$R1_nodeid\] \[5462-<-$R1_nodeid\]"
434+
425435
# Ensure that related nodes do not reconnect.
426436
R 1 debug disable-cluster-reconnection 1
427437
R 2 debug disable-cluster-reconnection 1
@@ -458,13 +468,33 @@ start_cluster 3 1 {tags {external:skip cluster}} {
458468
assert_equal {0-5461} [dict get [cluster_get_node_by_id 1 $R3_nodeid] slots]
459469
assert_equal {0-5461} [dict get [cluster_get_node_by_id 2 $R3_nodeid] slots]
460470

471+
# Check that in the R1 perspective, both migration-source and migration-target
472+
# have moved from R0 to R3.
473+
assert_equal [get_open_slots 0] "\[0->-$R1_nodeid\] \[5462-<-$R1_nodeid\]"
474+
assert_equal [get_open_slots 1] "\[0-<-$R3_nodeid\] \[5462->-$R3_nodeid\]"
475+
assert_equal [get_open_slots 3] "\[0->-$R1_nodeid\] \[5462-<-$R1_nodeid\]"
476+
461477
# A failover occurred in shard, we will only go to this code branch,
462478
# verify we print the logs.
479+
480+
# Both importing slots and migrating slots are moved to R3.
481+
set pattern "*Failover occurred in migration source. Update importing source for slot 0 to node $R3_nodeid () in shard $R3_shardid*"
482+
verify_log_message -1 $pattern $loglines1
483+
set pattern "*Failover occurred in migration target. Slot 5462 is now being migrated to node $R3_nodeid () in shard $R3_shardid*"
484+
verify_log_message -1 $pattern $loglines1
485+
486+
# Both slots are moved to R3.
463487
set R0_slots 5462
464488
set pattern "*A failover occurred in shard $R3_shardid; node $R0_nodeid () lost $R0_slots slot(s) and failed over to node $R3_nodeid*"
465489
verify_log_message -1 $pattern $loglines1
466490
verify_log_message -2 $pattern $loglines2
467491

492+
# Both importing slots and migrating slots are moved to R3.
493+
set pattern "*A failover occurred in migration source. Update importing source of 1 slot(s) to node $R3_nodeid () in shard $R3_shardid*"
494+
verify_log_message -1 $pattern $loglines1
495+
set pattern "*A failover occurred in migration target. Update migrating target of 1 slot(s) to node $R3_nodeid () in shard $R3_shardid*"
496+
verify_log_message -1 $pattern $loglines1
497+
468498
R 1 debug disable-cluster-reconnection 0
469499
R 2 debug disable-cluster-reconnection 0
470500
R 3 debug disable-cluster-reconnection 0

tests/unit/cluster/slot-migration.tcl

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,3 @@
1-
proc get_open_slots {srv_idx} {
2-
set slots [dict get [cluster_get_myself $srv_idx] slots]
3-
if {[regexp {\[.*} $slots slots]} {
4-
set slots [regsub -all {[{}]} $slots ""]
5-
return $slots
6-
} else {
7-
return {}
8-
}
9-
}
10-
111
proc get_cluster_role {srv_idx} {
122
set flags [dict get [cluster_get_myself $srv_idx] flags]
133
set role [lindex $flags 1]
@@ -80,14 +70,6 @@ proc wait_for_role {srv_idx role} {
8070
wait_for_cluster_propagation
8171
}
8272

83-
proc wait_for_slot_state {srv_idx pattern} {
84-
wait_for_condition 100 100 {
85-
[get_open_slots $srv_idx] eq $pattern
86-
} else {
87-
fail "incorrect slot state on R $srv_idx: expected $pattern; got [get_open_slots $srv_idx]"
88-
}
89-
}
90-
9173
# restart a server and wait for it to come back online
9274
proc fail_server {server_id} {
9375
set node_timeout [lindex [R 0 CONFIG GET cluster-node-timeout] 1]

0 commit comments

Comments
 (0)