Skip to content

Commit ddafa1b

Browse files
committed
Fix cowHostList can't have hosts with same ConnectAddress
cowHostList uses HostInfo.Equal to confirm host uniqueness, which relies on `ConnectAddress.Equal`, which does not allow to have different hosts with same `ConnectAddress`
1 parent d32a392 commit ddafa1b

File tree

3 files changed

+34
-9
lines changed

3 files changed

+34
-9
lines changed

host_source.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ func (h *HostInfo) Equal(host *HostInfo) bool {
201201
return true
202202
}
203203

204-
return h.ConnectAddress().Equal(host.ConnectAddress())
204+
return h.HostID() == host.HostID() && h.ConnectAddressAndPort() == host.ConnectAddressAndPort()
205205
}
206206

207207
func (h *HostInfo) Peer() net.IP {

policies.go

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ import (
3232
"fmt"
3333
"math"
3434
"math/rand"
35-
"net"
3635
"sync"
3736
"sync/atomic"
3837
"time"
@@ -82,7 +81,7 @@ func (c *cowHostList) add(host *HostInfo) bool {
8281
return true
8382
}
8483

85-
func (c *cowHostList) remove(ip net.IP) bool {
84+
func (c *cowHostList) remove(host *HostInfo) bool {
8685
c.mu.Lock()
8786
l := c.get()
8887
size := len(l)
@@ -94,7 +93,7 @@ func (c *cowHostList) remove(ip net.IP) bool {
9493
found := false
9594
newL := make([]*HostInfo, 0, size)
9695
for i := 0; i < len(l); i++ {
97-
if !l[i].ConnectAddress().Equal(ip) {
96+
if !l[i].Equal(host) {
9897
newL = append(newL, l[i])
9998
} else {
10099
found = true
@@ -351,7 +350,7 @@ func (r *roundRobinHostPolicy) AddHost(host *HostInfo) {
351350
}
352351

353352
func (r *roundRobinHostPolicy) RemoveHost(host *HostInfo) {
354-
r.hosts.remove(host.ConnectAddress())
353+
r.hosts.remove(host)
355354
}
356355

357356
func (r *roundRobinHostPolicy) HostUp(host *HostInfo) {
@@ -517,7 +516,7 @@ func (t *tokenAwareHostPolicy) AddHosts(hosts []*HostInfo) {
517516

518517
func (t *tokenAwareHostPolicy) RemoveHost(host *HostInfo) {
519518
t.mu.Lock()
520-
if t.hosts.remove(host.ConnectAddress()) {
519+
if t.hosts.remove(host) {
521520
meta := t.getMetadataForUpdate()
522521
meta.resetTokenRing(t.partitioner, t.hosts.get(), t.logger)
523522
t.updateReplicas(meta, t.getKeyspaceName())
@@ -720,9 +719,9 @@ func (d *dcAwareRR) AddHost(host *HostInfo) {
720719

721720
func (d *dcAwareRR) RemoveHost(host *HostInfo) {
722721
if d.IsLocal(host) {
723-
d.localHosts.remove(host.ConnectAddress())
722+
d.localHosts.remove(host)
724723
} else {
725-
d.remoteHosts.remove(host.ConnectAddress())
724+
d.remoteHosts.remove(host)
726725
}
727726
}
728727

@@ -826,7 +825,7 @@ func (d *rackAwareRR) AddHost(host *HostInfo) {
826825

827826
func (d *rackAwareRR) RemoveHost(host *HostInfo) {
828827
dist := d.HostTier(host)
829-
d.hosts[dist].remove(host.ConnectAddress())
828+
d.hosts[dist].remove(host)
830829
}
831830

832831
func (d *rackAwareRR) HostUp(host *HostInfo) { d.AddHost(host) }

policies_test.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,32 @@ func TestRoundRobbin(t *testing.T) {
6565
}
6666
}
6767

68+
func TestRoundRobbinSameConnectAddress(t *testing.T) {
69+
policy := RoundRobinHostPolicy()
70+
71+
hosts := [...]*HostInfo{
72+
{hostId: "0", connectAddress: net.IPv4(0, 0, 0, 1), port: 9042},
73+
{hostId: "1", connectAddress: net.IPv4(0, 0, 0, 1), port: 9043},
74+
}
75+
76+
for _, host := range hosts {
77+
policy.AddHost(host)
78+
}
79+
80+
got := make(map[string]bool)
81+
it := policy.Pick(nil)
82+
for h := it(); h != nil; h = it() {
83+
id := h.Info().hostId
84+
if got[id] {
85+
t.Fatalf("got duplicate host: %v", id)
86+
}
87+
got[id] = true
88+
}
89+
if len(got) != len(hosts) {
90+
t.Fatalf("expected %d hosts got %d", len(hosts), len(got))
91+
}
92+
}
93+
6894
// Tests of the token-aware host selection policy implementation with a
6995
// round-robin host selection policy fallback.
7096
func TestHostPolicy_TokenAware_SimpleStrategy(t *testing.T) {

0 commit comments

Comments
 (0)