31 changes: 29 additions & 2 deletions cluster.go
@@ -241,10 +241,18 @@ func (c *clusterClient) _refresh() (err error) {
var removes []conn

c.mu.RLock()
// Preserve old slot mappings to use as fallback for incomplete topology
oldWslots := c.wslots
oldRslots := c.rslots
for addr, cc := range c.conns {
if fresh, ok := conns[addr]; ok {
fresh.conn = cc.conn
conns[addr] = fresh
// Validate connection health before reusing
// If connection has error, it will be replaced with a new one
if cc.conn.Error() == nil {
Collaborator review comment: mux handles reconnections internally, so no Error() check at the cluster client is okay. (A hedged sketch of this point follows the hunk below.)

fresh.conn = cc.conn
conns[addr] = fresh
}
// else: let the new connection be created (conns[addr] already has fresh conn from connFn)
} else {
removes = append(removes, cc.conn)
}
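
The reviewer's objection above rests on the mux layer re-dialing on its own. Below is a minimal, hypothetical Go sketch of that behavior (illustrative names only, not rueidis internals), showing why a connection wrapper can be reused without an Error() check at the cluster level:

```go
// Hypothetical sketch: a mux that re-dials lazily, so callers above it never
// need to discard a wrapper just because it reported an error earlier.
package main

import (
	"fmt"
	"sync"
)

type mux struct {
	mu     sync.Mutex
	dial   func() (string, error) // illustrative dial hook
	target string
	broken bool
}

// Do (re)establishes the underlying connection before each command if needed.
func (m *mux) Do(cmd string) (string, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.target == "" || m.broken {
		t, err := m.dial()
		if err != nil {
			return "", err
		}
		m.target, m.broken = t, false
	}
	return fmt.Sprintf("%s -> %s", m.target, cmd), nil
}

func main() {
	m := &mux{dial: func() (string, error) { return "127.0.0.1:6379", nil }}
	m.broken = true // simulate a dropped connection
	if out, err := m.Do("PING"); err == nil {
		fmt.Println(out, "(reconnected transparently; no Error() check needed above the mux)")
	}
}
```
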
@@ -321,6 +329,19 @@ func (c *clusterClient) _refresh() (err error) {
}
}

// Preserve old slot mappings for any slots not covered by the new topology
// This prevents nil connections when CLUSTER SLOTS returns incomplete data
for i := 0; i < 16384; i++ {
Collaborator review comment on lines +332 to +334: We should not do this. CLUSTER SLOTS should be the only truth. (A hedged sketch of the fallback in question follows the hunk below.)

if wslots[i] == nil && oldWslots[i] != nil {
wslots[i] = oldWslots[i]
}
if c.rOpt != nil && len(rslots) > 0 {
if len(rslots[i]) == 0 && len(oldRslots) > i && len(oldRslots[i]) > 0 {
rslots[i] = oldRslots[i]
}
}
}

c.mu.Lock()
c.wslots = wslots
c.rslots = rslots
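
For clarity, here is a standalone restatement of the fallback the hunk above introduces; mergeSlots, conn, and fakeConn are illustrative names rather than the library's API, and the reviewer's position is that an incomplete CLUSTER SLOTS reply should not be papered over this way:

```go
// Hypothetical restatement of the slot-preservation fallback from the hunk above.
package main

import "fmt"

const totalSlots = 16384

type conn interface{ Addr() string }

type fakeConn string

func (f fakeConn) Addr() string { return string(f) }

// mergeSlots fills any slot left unassigned by the new topology with its
// previous owner. The reviewer's objection is that this hides an incomplete
// CLUSTER SLOTS reply instead of treating that reply as the single source of truth.
func mergeSlots(fresh, old *[totalSlots]conn) {
	for i := 0; i < totalSlots; i++ {
		if fresh[i] == nil && old[i] != nil {
			fresh[i] = old[i]
		}
	}
}

func main() {
	var old, fresh [totalSlots]conn
	old[42] = fakeConn("10.0.0.1:6379")
	mergeSlots(&fresh, &old) // slot 42 keeps its old owner after the merge
	fmt.Println(fresh[42].Addr())
}
```
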
@@ -469,6 +490,8 @@ func (c *clusterClient) _pick(slot uint16, toReplica bool) (p conn) {
}
}
p = nodes[rIndex].conn
} else {
p = c.wslots[slot] // fallback to master when replica slots are missing
}
} else {
p = c.wslots[slot]
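
A minimal sketch (hypothetical types, not the PR's exact code) of the read-path change repeated in _pick, _pickMulti, and _pickMultiCache: prefer a replica for the slot, but degrade to the slot's master when no replica entry exists instead of returning a nil conn:

```go
// Hypothetical illustration of the replica-to-master fallback added above.
package main

import "fmt"

type conn interface{ Addr() string }

type node struct{ conn conn }

type fakeConn string

func (f fakeConn) Addr() string { return string(f) }

// pickForRead prefers a replica but falls back to the master when the
// replica list for the slot is empty.
func pickForRead(replicas []node, master conn) conn {
	if len(replicas) > 0 {
		return replicas[0].conn // the real code selects an index (e.g. random/round-robin)
	}
	return master // fallback to master when replica slots are missing
}

func main() {
	master := fakeConn("10.0.0.1:6379")
	fmt.Println(pickForRead(nil, master).Addr()) // no replicas: the master is used
}
```
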
@@ -628,6 +651,8 @@ func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry, init
}
}
cc = nodes[rIndex].conn
} else {
cc = c.wslots[slot] // fallback to master when replica slots are missing
}
} else {
cc = c.wslots[slot]
@@ -1117,6 +1142,8 @@ func (c *clusterClient) _pickMultiCache(multi []CacheableTTL) *connretrycache {
}
}
p = nodes[rIndex].conn
} else {
p = c.wslots[slot] // fallback to master when replica slots are missing
}
} else {
p = c.wslots[slot]
156 changes: 156 additions & 0 deletions cluster_test.go
@@ -10957,3 +10957,159 @@ func TestClusterClient_Refresh_MissingSlotsForReplicas_DoMultiCache(t *testing.T
}
}
}

func TestClusterClient_Refresh_BrokenConnectionNotReused(t *testing.T) {
defer ShouldNotLeak(SetupLeakDetection())

var refreshCount int64
var brokenConnReused atomic.Bool

// Simulate a connection that becomes broken after first use
brokenConn := &mockConn{
ErrorFn: func() error {
return errors.New("connection broken")
},
DoOverride: map[string]func(cmd Completed) RedisResult{
"SET K1{d} V1": func(cmd Completed) RedisResult {
// If this broken connection is reused, set the flag
brokenConnReused.Store(true)
return newErrResult(errors.New("broken connection should not be reused"))
},
"CLUSTER SLOTS": func(cmd Completed) RedisResult {
atomic.AddInt64(&refreshCount, 1)
return slotsMultiResp
},
},
}

// Healthy connection that should be used after refresh
healthyConn := &mockConn{
DoOverride: map[string]func(cmd Completed) RedisResult{
"SET K1{d} V1": func(cmd Completed) RedisResult {
return newResult(strmsg('+', "OK"), nil)
},
"CLUSTER SLOTS": func(cmd Completed) RedisResult {
atomic.AddInt64(&refreshCount, 1)
return slotsMultiResp
},
},
}

var connCreationCount int64
client, err := newClusterClient(
&ClientOption{
InitAddress: []string{"127.0.0.1:0"},
},
func(dst string, opt *ClientOption) conn {
count := atomic.AddInt64(&connCreationCount, 1)
if count == 1 {
// First connection is healthy initially
return healthyConn
} else if count == 2 {
// During refresh, this connection appears broken
// Simulating network issue that broke the connection
return brokenConn
}
// Third connection created should be used (healthy replacement)
return healthyConn
},
newRetryer(defaultRetryDelayFn),
)
if err != nil {
t.Fatalf("unexpected err %v", err)
}

// First command should succeed with initial healthy connection
if v, err := client.Do(context.Background(), client.B().Set().Key("K1{d}").Value("V1").Build()).ToString(); err != nil || v != "OK" {
t.Fatalf("initial command failed: %v %v", v, err)
}

// Force a topology refresh which encounters the broken connection
if err := client.refresh(context.Background()); err != nil {
t.Fatalf("refresh failed: %v", err)
}

// Verify that refresh actually happened
if atomic.LoadInt64(&refreshCount) == 0 {
t.Fatal("topology refresh did not occur")
}

// Now execute command again - it should use a new healthy connection
// NOT the broken connection
if v, err := client.Do(context.Background(), client.B().Set().Key("K1{d}").Value("V1").Build()).ToString(); err != nil || v != "OK" {
t.Fatalf("command after refresh failed: %v %v", v, err)
}

// Verify that the broken connection was NOT reused
if brokenConnReused.Load() {
t.Fatal("broken connection was reused - connection health validation failed!")
}
}

func TestClusterClient_Refresh_BrokenConnectionReplacedForAllSlots(t *testing.T) {
defer ShouldNotLeak(SetupLeakDetection())

var brokenConnUsed int64

// Connection that reports error
brokenConn := &mockConn{
ErrorFn: func() error {
return errors.New("connection broken due to network issue")
},
DoOverride: map[string]func(cmd Completed) RedisResult{
"GET K1{d}": func(cmd Completed) RedisResult {
atomic.AddInt64(&brokenConnUsed, 1)
return newErrResult(errors.New("using broken connection"))
},
"CLUSTER SLOTS": func(cmd Completed) RedisResult {
return slotsMultiResp
},
},
}

healthyConn := &mockConn{
DoOverride: map[string]func(cmd Completed) RedisResult{
"GET K1{d}": func(cmd Completed) RedisResult {
return newResult(strmsg('+', "value1"), nil)
},
"CLUSTER SLOTS": func(cmd Completed) RedisResult {
return slotsMultiResp
},
},
}

var useHealthy atomic.Bool
client, err := newClusterClient(
&ClientOption{
InitAddress: []string{"127.0.0.1:0"},
},
func(dst string, opt *ClientOption) conn {
if useHealthy.Load() {
return healthyConn
}
// First pass: connection becomes broken
// After refresh with validation: should get healthy conn
useHealthy.Store(true)
return brokenConn
},
newRetryer(defaultRetryDelayFn),
)
if err != nil {
t.Fatalf("unexpected err %v", err)
}

// Trigger topology refresh - broken connection should be detected and replaced
if err := client.refresh(context.Background()); err != nil {
t.Fatalf("refresh failed: %v", err)
}

// Execute command - should use healthy connection, not broken one
if v, err := client.Do(context.Background(), client.B().Get().Key("K1{d}").Build()).ToString(); err != nil || v != "value1" {
t.Fatalf("command failed: %v %v", v, err)
}

// Verify broken connection was never used for commands
if count := atomic.LoadInt64(&brokenConnUsed); count > 0 {
t.Fatalf("broken connection was used %d times - should be 0", count)
}
}
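
Assuming these tests sit alongside the existing cluster tests, they can be run selectively with the standard Go tooling, for example: go test -run 'TestClusterClient_Refresh_BrokenConnection' .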