Skip to content

Commit 83fb848

Browse files
committed
HostPoolHostPolicy was moved to separate package
1 parent 34fdeeb commit 83fb848

File tree

4 files changed

+207
-186
lines changed

4 files changed

+207
-186
lines changed
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
package hostpoolhostpolicy
2+
3+
import (
4+
"sync"
5+
6+
"github.com/hailocab/go-hostpool"
7+
8+
"github.com/gocql/gocql"
9+
)
10+
11+
// HostPoolHostPolicy is a host policy which uses the bitly/go-hostpool library
12+
// to distribute queries between hosts and prevent sending queries to
13+
// unresponsive hosts. When creating the host pool that is passed to the policy
14+
// use an empty slice of hosts as the hostpool will be populated later by gocql.
15+
// See below for examples of usage:
16+
//
17+
// // Create host selection policy using a simple host pool
18+
// cluster.PoolConfig.HostSelectionPolicy = HostPoolHostPolicy(hostpool.New(nil))
19+
//
20+
// // Create host selection policy using an epsilon greedy pool
21+
// cluster.PoolConfig.HostSelectionPolicy = HostPoolHostPolicy(
22+
// hostpool.NewEpsilonGreedy(nil, 0, &hostpool.LinearEpsilonValueCalculator{}),
23+
// )
24+
func HostPoolHostPolicy(hp hostpool.HostPool) *hostPoolHostPolicy {
25+
return &hostPoolHostPolicy{hostMap: map[string]*gocql.HostInfo{}, hp: hp}
26+
}
27+
28+
type hostPoolHostPolicy struct {
29+
hp hostpool.HostPool
30+
mu sync.RWMutex
31+
hostMap map[string]*gocql.HostInfo
32+
}
33+
34+
func (r *hostPoolHostPolicy) Init(*gocql.Session) {}
35+
func (r *hostPoolHostPolicy) KeyspaceChanged(gocql.KeyspaceUpdateEvent) {}
36+
func (r *hostPoolHostPolicy) SetPartitioner(string) {}
37+
func (r *hostPoolHostPolicy) IsLocal(*gocql.HostInfo) bool { return true }
38+
39+
func (r *hostPoolHostPolicy) SetHosts(hosts []*gocql.HostInfo) {
40+
peers := make([]string, len(hosts))
41+
hostMap := make(map[string]*gocql.HostInfo, len(hosts))
42+
43+
for i, host := range hosts {
44+
ip := host.ConnectAddress().String()
45+
peers[i] = ip
46+
hostMap[ip] = host
47+
}
48+
49+
r.mu.Lock()
50+
r.hp.SetHosts(peers)
51+
r.hostMap = hostMap
52+
r.mu.Unlock()
53+
}
54+
55+
func (r *hostPoolHostPolicy) AddHost(host *gocql.HostInfo) {
56+
ip := host.ConnectAddress().String()
57+
58+
r.mu.Lock()
59+
defer r.mu.Unlock()
60+
61+
// If the host addr is present and isn't nil return
62+
if h, ok := r.hostMap[ip]; ok && h != nil {
63+
return
64+
}
65+
// otherwise, add the host to the map
66+
r.hostMap[ip] = host
67+
// and construct a new peer list to give to the HostPool
68+
hosts := make([]string, 0, len(r.hostMap))
69+
for addr := range r.hostMap {
70+
hosts = append(hosts, addr)
71+
}
72+
73+
r.hp.SetHosts(hosts)
74+
}
75+
76+
func (r *hostPoolHostPolicy) RemoveHost(host *gocql.HostInfo) {
77+
ip := host.ConnectAddress().String()
78+
79+
r.mu.Lock()
80+
defer r.mu.Unlock()
81+
82+
if _, ok := r.hostMap[ip]; !ok {
83+
return
84+
}
85+
86+
delete(r.hostMap, ip)
87+
hosts := make([]string, 0, len(r.hostMap))
88+
for _, host := range r.hostMap {
89+
hosts = append(hosts, host.ConnectAddress().String())
90+
}
91+
92+
r.hp.SetHosts(hosts)
93+
}
94+
95+
func (r *hostPoolHostPolicy) HostUp(host *gocql.HostInfo) {
96+
r.AddHost(host)
97+
}
98+
99+
func (r *hostPoolHostPolicy) HostDown(host *gocql.HostInfo) {
100+
r.RemoveHost(host)
101+
}
102+
103+
func (r *hostPoolHostPolicy) Pick(qry gocql.ExecutableQuery) gocql.NextHost {
104+
return func() gocql.SelectedHost {
105+
r.mu.RLock()
106+
defer r.mu.RUnlock()
107+
108+
if len(r.hostMap) == 0 {
109+
return nil
110+
}
111+
112+
hostR := r.hp.Get()
113+
host, ok := r.hostMap[hostR.Host()]
114+
if !ok {
115+
return nil
116+
}
117+
118+
return selectedHostPoolHost{
119+
policy: r,
120+
info: host,
121+
hostR: hostR,
122+
}
123+
}
124+
}
125+
126+
// selectedHostPoolHost is a host returned by the hostPoolHostPolicy and
127+
// implements the SelectedHost interface
128+
type selectedHostPoolHost struct {
129+
policy *hostPoolHostPolicy
130+
info *gocql.HostInfo
131+
hostR hostpool.HostPoolResponse
132+
}
133+
134+
func (host selectedHostPoolHost) Info() *gocql.HostInfo {
135+
return host.info
136+
}
137+
138+
func (host selectedHostPoolHost) Mark(err error) {
139+
ip := host.info.ConnectAddress().String()
140+
141+
host.policy.mu.RLock()
142+
defer host.policy.mu.RUnlock()
143+
144+
if _, ok := host.policy.hostMap[ip]; !ok {
145+
// host was removed between pick and mark
146+
return
147+
}
148+
149+
host.hostR.Mark(err)
150+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package hostpoolhostpolicy
2+
3+
import (
4+
"fmt"
5+
"net"
6+
"testing"
7+
8+
"github.com/hailocab/go-hostpool"
9+
10+
"github.com/gocql/gocql"
11+
)
12+
13+
func TestHostPolicy_HostPool(t *testing.T) {
14+
policy := HostPoolHostPolicy(hostpool.New(nil))
15+
16+
//hosts := []*gocql.HostInfo{
17+
// {hostId: "0", connectAddress: net.IPv4(10, 0, 0, 0)},
18+
// {hostId: "1", connectAddress: net.IPv4(10, 0, 0, 1)},
19+
//}
20+
firstHost := &gocql.HostInfo{}
21+
firstHost.SetHostID("0")
22+
firstHost.SetConnectAddress(net.IPv4(10, 0, 0, 0))
23+
secHost := &gocql.HostInfo{}
24+
secHost.SetHostID("1")
25+
secHost.SetConnectAddress(net.IPv4(10, 0, 0, 1))
26+
hosts := []*gocql.HostInfo{firstHost, secHost}
27+
// Using set host to control the ordering of the hosts as calling "AddHost" iterates the map
28+
// which will result in an unpredictable ordering
29+
policy.SetHosts(hosts)
30+
31+
// the first host selected is actually at [1], but this is ok for RR
32+
// interleaved iteration should always increment the host
33+
iter := policy.Pick(nil)
34+
actualA := iter()
35+
if actualA.Info().HostID() != "0" {
36+
t.Errorf("Expected hosts[0] but was hosts[%s]", actualA.Info().HostID())
37+
}
38+
actualA.Mark(nil)
39+
40+
actualB := iter()
41+
if actualB.Info().HostID() != "1" {
42+
t.Errorf("Expected hosts[1] but was hosts[%s]", actualB.Info().HostID())
43+
}
44+
actualB.Mark(fmt.Errorf("error"))
45+
46+
actualC := iter()
47+
if actualC.Info().HostID() != "0" {
48+
t.Errorf("Expected hosts[0] but was hosts[%s]", actualC.Info().HostID())
49+
}
50+
actualC.Mark(nil)
51+
52+
actualD := iter()
53+
if actualD.Info().HostID() != "0" {
54+
t.Errorf("Expected hosts[0] but was hosts[%s]", actualD.Info().HostID())
55+
}
56+
actualD.Mark(nil)
57+
}

policies.go

Lines changed: 0 additions & 143 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@ import (
1616
"sync"
1717
"sync/atomic"
1818
"time"
19-
20-
"github.com/hailocab/go-hostpool"
2119
)
2220

2321
// cowHostList implements a copy on write host list, its equivalent type is []*HostInfo
@@ -670,147 +668,6 @@ func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost {
670668
}
671669
}
672670

673-
// HostPoolHostPolicy is a host policy which uses the bitly/go-hostpool library
674-
// to distribute queries between hosts and prevent sending queries to
675-
// unresponsive hosts. When creating the host pool that is passed to the policy
676-
// use an empty slice of hosts as the hostpool will be populated later by gocql.
677-
// See below for examples of usage:
678-
//
679-
// // Create host selection policy using a simple host pool
680-
// cluster.PoolConfig.HostSelectionPolicy = HostPoolHostPolicy(hostpool.New(nil))
681-
//
682-
// // Create host selection policy using an epsilon greedy pool
683-
// cluster.PoolConfig.HostSelectionPolicy = HostPoolHostPolicy(
684-
// hostpool.NewEpsilonGreedy(nil, 0, &hostpool.LinearEpsilonValueCalculator{}),
685-
// )
686-
func HostPoolHostPolicy(hp hostpool.HostPool) HostSelectionPolicy {
687-
return &hostPoolHostPolicy{hostMap: map[string]*HostInfo{}, hp: hp}
688-
}
689-
690-
type hostPoolHostPolicy struct {
691-
hp hostpool.HostPool
692-
mu sync.RWMutex
693-
hostMap map[string]*HostInfo
694-
}
695-
696-
func (r *hostPoolHostPolicy) Init(*Session) {}
697-
func (r *hostPoolHostPolicy) KeyspaceChanged(KeyspaceUpdateEvent) {}
698-
func (r *hostPoolHostPolicy) SetPartitioner(string) {}
699-
func (r *hostPoolHostPolicy) IsLocal(*HostInfo) bool { return true }
700-
701-
func (r *hostPoolHostPolicy) SetHosts(hosts []*HostInfo) {
702-
peers := make([]string, len(hosts))
703-
hostMap := make(map[string]*HostInfo, len(hosts))
704-
705-
for i, host := range hosts {
706-
ip := host.ConnectAddress().String()
707-
peers[i] = ip
708-
hostMap[ip] = host
709-
}
710-
711-
r.mu.Lock()
712-
r.hp.SetHosts(peers)
713-
r.hostMap = hostMap
714-
r.mu.Unlock()
715-
}
716-
717-
func (r *hostPoolHostPolicy) AddHost(host *HostInfo) {
718-
ip := host.ConnectAddress().String()
719-
720-
r.mu.Lock()
721-
defer r.mu.Unlock()
722-
723-
// If the host addr is present and isn't nil return
724-
if h, ok := r.hostMap[ip]; ok && h != nil {
725-
return
726-
}
727-
// otherwise, add the host to the map
728-
r.hostMap[ip] = host
729-
// and construct a new peer list to give to the HostPool
730-
hosts := make([]string, 0, len(r.hostMap))
731-
for addr := range r.hostMap {
732-
hosts = append(hosts, addr)
733-
}
734-
735-
r.hp.SetHosts(hosts)
736-
}
737-
738-
func (r *hostPoolHostPolicy) RemoveHost(host *HostInfo) {
739-
ip := host.ConnectAddress().String()
740-
741-
r.mu.Lock()
742-
defer r.mu.Unlock()
743-
744-
if _, ok := r.hostMap[ip]; !ok {
745-
return
746-
}
747-
748-
delete(r.hostMap, ip)
749-
hosts := make([]string, 0, len(r.hostMap))
750-
for _, host := range r.hostMap {
751-
hosts = append(hosts, host.ConnectAddress().String())
752-
}
753-
754-
r.hp.SetHosts(hosts)
755-
}
756-
757-
func (r *hostPoolHostPolicy) HostUp(host *HostInfo) {
758-
r.AddHost(host)
759-
}
760-
761-
func (r *hostPoolHostPolicy) HostDown(host *HostInfo) {
762-
r.RemoveHost(host)
763-
}
764-
765-
func (r *hostPoolHostPolicy) Pick(qry ExecutableQuery) NextHost {
766-
return func() SelectedHost {
767-
r.mu.RLock()
768-
defer r.mu.RUnlock()
769-
770-
if len(r.hostMap) == 0 {
771-
return nil
772-
}
773-
774-
hostR := r.hp.Get()
775-
host, ok := r.hostMap[hostR.Host()]
776-
if !ok {
777-
return nil
778-
}
779-
780-
return selectedHostPoolHost{
781-
policy: r,
782-
info: host,
783-
hostR: hostR,
784-
}
785-
}
786-
}
787-
788-
// selectedHostPoolHost is a host returned by the hostPoolHostPolicy and
789-
// implements the SelectedHost interface
790-
type selectedHostPoolHost struct {
791-
policy *hostPoolHostPolicy
792-
info *HostInfo
793-
hostR hostpool.HostPoolResponse
794-
}
795-
796-
func (host selectedHostPoolHost) Info() *HostInfo {
797-
return host.info
798-
}
799-
800-
func (host selectedHostPoolHost) Mark(err error) {
801-
ip := host.info.ConnectAddress().String()
802-
803-
host.policy.mu.RLock()
804-
defer host.policy.mu.RUnlock()
805-
806-
if _, ok := host.policy.hostMap[ip]; !ok {
807-
// host was removed between pick and mark
808-
return
809-
}
810-
811-
host.hostR.Mark(err)
812-
}
813-
814671
type dcAwareRR struct {
815672
local string
816673
localHosts cowHostList

0 commit comments

Comments
 (0)