Skip to content

Commit 3209828

Browse files
committed
fix: refresh cluster topology when replica slots are missing (#99)
Signed-off-by: Rueian <[email protected]>
1 parent 4ecfca8 commit 3209828

File tree

2 files changed

+187
-27
lines changed

2 files changed

+187
-27
lines changed

cluster.go

Lines changed: 26 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -460,16 +460,15 @@ func (c *clusterClient) _pick(slot uint16, toReplica bool) (p conn) {
460460
break
461461
}
462462
} else if toReplica && c.rslots != nil {
463-
if c.opt.ReadNodeSelector != nil {
464-
nodes := c.rslots[slot]
465-
rIndex := c.opt.ReadNodeSelector(slot, nodes)
466-
if rIndex >= 0 && rIndex < len(nodes) {
467-
p = c.rslots[slot][rIndex].conn
468-
} else {
469-
p = c.wslots[slot]
463+
if nodes := c.rslots[slot]; len(nodes) > 0 {
464+
rIndex := 0
465+
if c.opt.ReadNodeSelector != nil {
466+
rIndex = c.opt.ReadNodeSelector(slot, nodes)
467+
if rIndex < 0 || rIndex >= len(nodes) {
468+
rIndex = 0
469+
}
470470
}
471-
} else {
472-
p = c.rslots[slot][0].conn
471+
p = nodes[rIndex].conn
473472
}
474473
} else {
475474
p = c.wslots[slot]
@@ -617,18 +616,18 @@ func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry, init
617616
var cc conn
618617
slot := cmd.Slot()
619618
if c.opt.SendToReplicas(cmd) {
620-
bm.Set(i)
621-
if c.opt.ReadNodeSelector != nil {
622-
nodes := c.rslots[slot]
623-
rIndex := c.opt.ReadNodeSelector(slot, nodes)
624-
if rIndex > 0 && rIndex < len(nodes) {
625-
itor[i] = rIndex
626-
} else {
627-
rIndex = 0 // default itor[i] = 0
619+
if nodes := c.rslots[slot]; len(nodes) > 0 {
620+
bm.Set(i)
621+
rIndex := 0
622+
if c.opt.ReadNodeSelector != nil {
623+
rIndex = c.opt.ReadNodeSelector(slot, nodes)
624+
if rIndex < 0 || rIndex >= len(nodes) {
625+
rIndex = 0
626+
} else if rIndex != 0 { // the default itor[i] is 0
627+
itor[i] = rIndex
628+
}
628629
}
629630
cc = nodes[rIndex].conn
630-
} else {
631-
cc = c.rslots[slot][0].conn
632631
}
633632
} else {
634633
cc = c.wslots[slot]
@@ -1109,15 +1108,15 @@ func (c *clusterClient) _pickMultiCache(multi []CacheableTTL) *connretrycache {
11091108
var p conn
11101109
slot := cmd.Cmd.Slot()
11111110
if c.opt.SendToReplicas(Completed(cmd.Cmd)) {
1112-
if c.opt.ReadNodeSelector != nil {
1113-
rIndex := c.opt.ReadNodeSelector(slot, c.rslots[slot])
1114-
if rIndex >= 0 && rIndex < len(c.rslots[slot]) {
1115-
p = c.rslots[slot][rIndex].conn
1116-
} else {
1117-
p = c.wslots[slot]
1111+
if nodes := c.rslots[slot]; len(nodes) > 0 {
1112+
rIndex := 0
1113+
if c.opt.ReadNodeSelector != nil {
1114+
rIndex = c.opt.ReadNodeSelector(slot, nodes)
1115+
if rIndex < 0 || rIndex >= len(nodes) {
1116+
rIndex = 0
1117+
}
11181118
}
1119-
} else {
1120-
p = c.rslots[slot][0].conn
1119+
p = nodes[rIndex].conn
11211120
}
11221121
} else {
11231122
p = c.wslots[slot]

cluster_test.go

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10796,3 +10796,164 @@ func TestClusterClient_ReadNodeSelector_SendToAlternatePrimaryAndReplicaNodes(t
1079610796
}
1079710797
})
1079810798
}
10799+
10800+
func TestClusterClient_Refresh_MissingSlotsForReplicas_Do(t *testing.T) {
10801+
defer ShouldNotLeak(SetupLeakDetection())
10802+
10803+
var refresh int64
10804+
primaryNodeConn := &mockConn{
10805+
DoOverride: map[string]func(cmd Completed) RedisResult{
10806+
"CLUSTER SLOTS": func(cmd Completed) RedisResult {
10807+
if atomic.AddInt64(&refresh, 1) == 1 {
10808+
return newResult(slicemsg('*', slotsMultiResp.val.values()[:1]), nil)
10809+
}
10810+
return slotsMultiResp
10811+
},
10812+
},
10813+
}
10814+
replicaNodeConn := &mockConn{
10815+
DoOverride: map[string]func(cmd Completed) RedisResult{
10816+
"GET K1{d}": func(cmd Completed) RedisResult {
10817+
return newResult(strmsg('+', "GET K1{d}"), nil)
10818+
},
10819+
},
10820+
}
10821+
10822+
client, err := newClusterClient(
10823+
&ClientOption{
10824+
InitAddress: []string{"127.0.0.1:0"},
10825+
SendToReplicas: func(cmd Completed) bool {
10826+
return true
10827+
},
10828+
ReadNodeSelector: func(_ uint16, _ []NodeInfo) int {
10829+
return 1
10830+
},
10831+
},
10832+
func(dst string, opt *ClientOption) conn {
10833+
if dst == "127.0.0.1:0" { // primary nodes
10834+
return primaryNodeConn
10835+
} else if dst == "127.0.3.1:1" { // replica nodes
10836+
return replicaNodeConn
10837+
}
10838+
return &mockConn{}
10839+
},
10840+
newRetryer(defaultRetryDelayFn),
10841+
)
10842+
if err != nil {
10843+
t.Fatalf("unexpected err %v", err)
10844+
}
10845+
10846+
if v, err := client.Do(context.Background(), client.B().Get().Key("K1{d}").Build()).ToString(); err != nil || v != "GET K1{d}" {
10847+
t.Fatalf("unexpected response %v %v", v, err)
10848+
}
10849+
}
10850+
10851+
func TestClusterClient_Refresh_MissingSlotsForReplicas_DoMulti(t *testing.T) {
10852+
defer ShouldNotLeak(SetupLeakDetection())
10853+
10854+
var refresh int64
10855+
primaryNodeConn := &mockConn{
10856+
DoOverride: map[string]func(cmd Completed) RedisResult{
10857+
"CLUSTER SLOTS": func(cmd Completed) RedisResult {
10858+
if atomic.AddInt64(&refresh, 1) == 1 {
10859+
return newResult(slicemsg('*', slotsMultiResp.val.values()[:1]), nil)
10860+
}
10861+
return slotsMultiResp
10862+
},
10863+
},
10864+
}
10865+
replicaNodeConn := &mockConn{
10866+
DoMultiFn: func(multi ...Completed) *redisresults {
10867+
resps := make([]RedisResult, len(multi))
10868+
for i, cmd := range multi {
10869+
resps[i] = newResult(strmsg('+', strings.Join(cmd.Commands(), " ")), nil)
10870+
}
10871+
return &redisresults{s: resps}
10872+
},
10873+
}
10874+
10875+
client, err := newClusterClient(
10876+
&ClientOption{
10877+
InitAddress: []string{"127.0.0.1:0"},
10878+
SendToReplicas: func(cmd Completed) bool {
10879+
return true
10880+
},
10881+
ReadNodeSelector: func(_ uint16, _ []NodeInfo) int {
10882+
return 1
10883+
},
10884+
},
10885+
func(dst string, opt *ClientOption) conn {
10886+
if dst == "127.0.0.1:0" { // primary nodes
10887+
return primaryNodeConn
10888+
} else if dst == "127.0.3.1:1" { // replica nodes
10889+
return replicaNodeConn
10890+
}
10891+
return &mockConn{}
10892+
},
10893+
newRetryer(defaultRetryDelayFn),
10894+
)
10895+
if err != nil {
10896+
t.Fatalf("unexpected err %v", err)
10897+
}
10898+
10899+
for _, resp := range client.DoMulti(context.Background(), client.B().Get().Key("K1{d}").Build()) {
10900+
if v, err := resp.ToString(); err != nil || v != "GET K1{d}" {
10901+
t.Fatalf("unexpected response %v %v", v, err)
10902+
}
10903+
}
10904+
}
10905+
10906+
func TestClusterClient_Refresh_MissingSlotsForReplicas_DoMultiCache(t *testing.T) {
10907+
defer ShouldNotLeak(SetupLeakDetection())
10908+
10909+
var refresh int64
10910+
primaryNodeConn := &mockConn{
10911+
DoOverride: map[string]func(cmd Completed) RedisResult{
10912+
"CLUSTER SLOTS": func(cmd Completed) RedisResult {
10913+
if atomic.AddInt64(&refresh, 1) == 1 {
10914+
return newResult(slicemsg('*', slotsMultiResp.val.values()[:1]), nil)
10915+
}
10916+
return slotsMultiResp
10917+
},
10918+
},
10919+
}
10920+
replicaNodeConn := &mockConn{
10921+
DoMultiCacheFn: func(multi ...CacheableTTL) *redisresults {
10922+
resps := make([]RedisResult, len(multi))
10923+
for i, cmd := range multi {
10924+
resps[i] = newResult(strmsg('+', strings.Join(cmd.Cmd.Commands(), " ")), nil)
10925+
}
10926+
return &redisresults{s: resps}
10927+
},
10928+
}
10929+
10930+
client, err := newClusterClient(
10931+
&ClientOption{
10932+
InitAddress: []string{"127.0.0.1:0"},
10933+
SendToReplicas: func(cmd Completed) bool {
10934+
return true
10935+
},
10936+
ReadNodeSelector: func(_ uint16, _ []NodeInfo) int {
10937+
return 1
10938+
},
10939+
},
10940+
func(dst string, opt *ClientOption) conn {
10941+
if dst == "127.0.0.1:0" { // primary nodes
10942+
return primaryNodeConn
10943+
} else if dst == "127.0.3.1:1" { // replica nodes
10944+
return replicaNodeConn
10945+
}
10946+
return &mockConn{}
10947+
},
10948+
newRetryer(defaultRetryDelayFn),
10949+
)
10950+
if err != nil {
10951+
t.Fatalf("unexpected err %v", err)
10952+
}
10953+
10954+
for _, resp := range client.DoMultiCache(context.Background(), CT(client.B().Get().Key("K1{d}").Cache(), time.Second)) {
10955+
if v, err := resp.ToString(); err != nil || v != "GET K1{d}" {
10956+
t.Fatalf("unexpected response %v %v", v, err)
10957+
}
10958+
}
10959+
}

0 commit comments

Comments
 (0)