Skip to content

Commit fd632d8

Browse files
committed
Remove HostPoolHostPolicy from gocql package
HostPoolHostPolicy was moved to a separate package, and users don't need to download dependency if they aren't using it. patch by Oleksandr Luzhniy; reviewed by João Reis, Stanislav Bychkov, James Hartig, for CASSGO-21
1 parent d345630 commit fd632d8

File tree

5 files changed

+210
-186
lines changed

5 files changed

+210
-186
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
2727
- Keep nil slices in MapScan (CASSGO-44)
2828

2929
- Improve error messages for marshalling (CASSGO-38)
30+
31+
- Remove HostPoolHostPolicy from gocql package (CASSGO-21)
32+
3033
### Fixed
3134

3235
- Retry policy now takes into account query idempotency (CASSGO-27)

hostpool/hostpool.go

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
package hostpool
2+
3+
import (
4+
"sync"
5+
6+
"github.com/hailocab/go-hostpool"
7+
8+
"github.com/gocql/gocql"
9+
)
10+
11+
// HostPoolHostPolicy is a host policy which uses the bitly/go-hostpool library
12+
// to distribute queries between hosts and prevent sending queries to
13+
// unresponsive hosts. When creating the host pool that is passed to the policy
14+
// use an empty slice of hosts as the hostpool will be populated later by gocql.
15+
// See below for examples of usage:
16+
//
17+
// // Create host selection policy using a simple host pool
18+
// cluster.PoolConfig.HostSelectionPolicy = HostPoolHostPolicy(hostpool.New(nil))
19+
//
20+
// // Create host selection policy using an epsilon greedy pool
21+
// cluster.PoolConfig.HostSelectionPolicy = HostPoolHostPolicy(
22+
// hostpool.NewEpsilonGreedy(nil, 0, &hostpool.LinearEpsilonValueCalculator{}),
23+
// )
24+
func HostPoolHostPolicy(hp hostpool.HostPool) *hostPoolHostPolicy {
25+
return &hostPoolHostPolicy{hostMap: map[string]*gocql.HostInfo{}, hp: hp}
26+
}
27+
28+
type hostPoolHostPolicy struct {
29+
hp hostpool.HostPool
30+
mu sync.RWMutex
31+
hostMap map[string]*gocql.HostInfo
32+
}
33+
34+
func (r *hostPoolHostPolicy) Init(*gocql.Session) {}
35+
func (r *hostPoolHostPolicy) KeyspaceChanged(gocql.KeyspaceUpdateEvent) {}
36+
func (r *hostPoolHostPolicy) SetPartitioner(string) {}
37+
func (r *hostPoolHostPolicy) IsLocal(*gocql.HostInfo) bool { return true }
38+
39+
func (r *hostPoolHostPolicy) SetHosts(hosts []*gocql.HostInfo) {
40+
peers := make([]string, len(hosts))
41+
hostMap := make(map[string]*gocql.HostInfo, len(hosts))
42+
43+
for i, host := range hosts {
44+
ip := host.ConnectAddress().String()
45+
peers[i] = ip
46+
hostMap[ip] = host
47+
}
48+
49+
r.mu.Lock()
50+
r.hp.SetHosts(peers)
51+
r.hostMap = hostMap
52+
r.mu.Unlock()
53+
}
54+
55+
func (r *hostPoolHostPolicy) AddHost(host *gocql.HostInfo) {
56+
ip := host.ConnectAddress().String()
57+
58+
r.mu.Lock()
59+
defer r.mu.Unlock()
60+
61+
// If the host addr is present and isn't nil return
62+
if h, ok := r.hostMap[ip]; ok && h != nil {
63+
return
64+
}
65+
// otherwise, add the host to the map
66+
r.hostMap[ip] = host
67+
// and construct a new peer list to give to the HostPool
68+
hosts := make([]string, 0, len(r.hostMap))
69+
for addr := range r.hostMap {
70+
hosts = append(hosts, addr)
71+
}
72+
73+
r.hp.SetHosts(hosts)
74+
}
75+
76+
func (r *hostPoolHostPolicy) RemoveHost(host *gocql.HostInfo) {
77+
ip := host.ConnectAddress().String()
78+
79+
r.mu.Lock()
80+
defer r.mu.Unlock()
81+
82+
if _, ok := r.hostMap[ip]; !ok {
83+
return
84+
}
85+
86+
delete(r.hostMap, ip)
87+
hosts := make([]string, 0, len(r.hostMap))
88+
for _, host := range r.hostMap {
89+
hosts = append(hosts, host.ConnectAddress().String())
90+
}
91+
92+
r.hp.SetHosts(hosts)
93+
}
94+
95+
func (r *hostPoolHostPolicy) HostUp(host *gocql.HostInfo) {
96+
r.AddHost(host)
97+
}
98+
99+
func (r *hostPoolHostPolicy) HostDown(host *gocql.HostInfo) {
100+
r.RemoveHost(host)
101+
}
102+
103+
func (r *hostPoolHostPolicy) Pick(qry gocql.ExecutableQuery) gocql.NextHost {
104+
return func() gocql.SelectedHost {
105+
r.mu.RLock()
106+
defer r.mu.RUnlock()
107+
108+
if len(r.hostMap) == 0 {
109+
return nil
110+
}
111+
112+
hostR := r.hp.Get()
113+
host, ok := r.hostMap[hostR.Host()]
114+
if !ok {
115+
return nil
116+
}
117+
118+
return selectedHostPoolHost{
119+
policy: r,
120+
info: host,
121+
hostR: hostR,
122+
}
123+
}
124+
}
125+
126+
// selectedHostPoolHost is a host returned by the hostPoolHostPolicy and
127+
// implements the SelectedHost interface
128+
type selectedHostPoolHost struct {
129+
policy *hostPoolHostPolicy
130+
info *gocql.HostInfo
131+
hostR hostpool.HostPoolResponse
132+
}
133+
134+
func (host selectedHostPoolHost) Info() *gocql.HostInfo {
135+
return host.info
136+
}
137+
138+
func (host selectedHostPoolHost) Mark(err error) {
139+
ip := host.info.ConnectAddress().String()
140+
141+
host.policy.mu.RLock()
142+
defer host.policy.mu.RUnlock()
143+
144+
if _, ok := host.policy.hostMap[ip]; !ok {
145+
// host was removed between pick and mark
146+
return
147+
}
148+
149+
host.hostR.Mark(err)
150+
}

hostpool/hostpool_test.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package hostpool
2+
3+
import (
4+
"fmt"
5+
"net"
6+
"testing"
7+
8+
"github.com/hailocab/go-hostpool"
9+
10+
"github.com/gocql/gocql"
11+
)
12+
13+
func TestHostPolicy_HostPool(t *testing.T) {
14+
policy := HostPoolHostPolicy(hostpool.New(nil))
15+
16+
//hosts := []*gocql.HostInfo{
17+
// {hostId: "0", connectAddress: net.IPv4(10, 0, 0, 0)},
18+
// {hostId: "1", connectAddress: net.IPv4(10, 0, 0, 1)},
19+
//}
20+
firstHost := &gocql.HostInfo{}
21+
firstHost.SetHostID("0")
22+
firstHost.SetConnectAddress(net.IPv4(10, 0, 0, 0))
23+
secHost := &gocql.HostInfo{}
24+
secHost.SetHostID("1")
25+
secHost.SetConnectAddress(net.IPv4(10, 0, 0, 1))
26+
hosts := []*gocql.HostInfo{firstHost, secHost}
27+
// Using set host to control the ordering of the hosts as calling "AddHost" iterates the map
28+
// which will result in an unpredictable ordering
29+
policy.SetHosts(hosts)
30+
31+
// the first host selected is actually at [1], but this is ok for RR
32+
// interleaved iteration should always increment the host
33+
iter := policy.Pick(nil)
34+
actualA := iter()
35+
if actualA.Info().HostID() != "0" {
36+
t.Errorf("Expected hosts[0] but was hosts[%s]", actualA.Info().HostID())
37+
}
38+
actualA.Mark(nil)
39+
40+
actualB := iter()
41+
if actualB.Info().HostID() != "1" {
42+
t.Errorf("Expected hosts[1] but was hosts[%s]", actualB.Info().HostID())
43+
}
44+
actualB.Mark(fmt.Errorf("error"))
45+
46+
actualC := iter()
47+
if actualC.Info().HostID() != "0" {
48+
t.Errorf("Expected hosts[0] but was hosts[%s]", actualC.Info().HostID())
49+
}
50+
actualC.Mark(nil)
51+
52+
actualD := iter()
53+
if actualD.Info().HostID() != "0" {
54+
t.Errorf("Expected hosts[0] but was hosts[%s]", actualD.Info().HostID())
55+
}
56+
actualD.Mark(nil)
57+
}

policies.go

Lines changed: 0 additions & 143 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,6 @@ import (
3636
"sync"
3737
"sync/atomic"
3838
"time"
39-
40-
"github.com/hailocab/go-hostpool"
4139
)
4240

4341
// cowHostList implements a copy on write host list, its equivalent type is []*HostInfo
@@ -690,147 +688,6 @@ func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost {
690688
}
691689
}
692690

693-
// HostPoolHostPolicy is a host policy which uses the bitly/go-hostpool library
694-
// to distribute queries between hosts and prevent sending queries to
695-
// unresponsive hosts. When creating the host pool that is passed to the policy
696-
// use an empty slice of hosts as the hostpool will be populated later by gocql.
697-
// See below for examples of usage:
698-
//
699-
// // Create host selection policy using a simple host pool
700-
// cluster.PoolConfig.HostSelectionPolicy = HostPoolHostPolicy(hostpool.New(nil))
701-
//
702-
// // Create host selection policy using an epsilon greedy pool
703-
// cluster.PoolConfig.HostSelectionPolicy = HostPoolHostPolicy(
704-
// hostpool.NewEpsilonGreedy(nil, 0, &hostpool.LinearEpsilonValueCalculator{}),
705-
// )
706-
func HostPoolHostPolicy(hp hostpool.HostPool) HostSelectionPolicy {
707-
return &hostPoolHostPolicy{hostMap: map[string]*HostInfo{}, hp: hp}
708-
}
709-
710-
type hostPoolHostPolicy struct {
711-
hp hostpool.HostPool
712-
mu sync.RWMutex
713-
hostMap map[string]*HostInfo
714-
}
715-
716-
func (r *hostPoolHostPolicy) Init(*Session) {}
717-
func (r *hostPoolHostPolicy) KeyspaceChanged(KeyspaceUpdateEvent) {}
718-
func (r *hostPoolHostPolicy) SetPartitioner(string) {}
719-
func (r *hostPoolHostPolicy) IsLocal(*HostInfo) bool { return true }
720-
721-
func (r *hostPoolHostPolicy) SetHosts(hosts []*HostInfo) {
722-
peers := make([]string, len(hosts))
723-
hostMap := make(map[string]*HostInfo, len(hosts))
724-
725-
for i, host := range hosts {
726-
ip := host.ConnectAddress().String()
727-
peers[i] = ip
728-
hostMap[ip] = host
729-
}
730-
731-
r.mu.Lock()
732-
r.hp.SetHosts(peers)
733-
r.hostMap = hostMap
734-
r.mu.Unlock()
735-
}
736-
737-
func (r *hostPoolHostPolicy) AddHost(host *HostInfo) {
738-
ip := host.ConnectAddress().String()
739-
740-
r.mu.Lock()
741-
defer r.mu.Unlock()
742-
743-
// If the host addr is present and isn't nil return
744-
if h, ok := r.hostMap[ip]; ok && h != nil {
745-
return
746-
}
747-
// otherwise, add the host to the map
748-
r.hostMap[ip] = host
749-
// and construct a new peer list to give to the HostPool
750-
hosts := make([]string, 0, len(r.hostMap))
751-
for addr := range r.hostMap {
752-
hosts = append(hosts, addr)
753-
}
754-
755-
r.hp.SetHosts(hosts)
756-
}
757-
758-
func (r *hostPoolHostPolicy) RemoveHost(host *HostInfo) {
759-
ip := host.ConnectAddress().String()
760-
761-
r.mu.Lock()
762-
defer r.mu.Unlock()
763-
764-
if _, ok := r.hostMap[ip]; !ok {
765-
return
766-
}
767-
768-
delete(r.hostMap, ip)
769-
hosts := make([]string, 0, len(r.hostMap))
770-
for _, host := range r.hostMap {
771-
hosts = append(hosts, host.ConnectAddress().String())
772-
}
773-
774-
r.hp.SetHosts(hosts)
775-
}
776-
777-
func (r *hostPoolHostPolicy) HostUp(host *HostInfo) {
778-
r.AddHost(host)
779-
}
780-
781-
func (r *hostPoolHostPolicy) HostDown(host *HostInfo) {
782-
r.RemoveHost(host)
783-
}
784-
785-
func (r *hostPoolHostPolicy) Pick(qry ExecutableQuery) NextHost {
786-
return func() SelectedHost {
787-
r.mu.RLock()
788-
defer r.mu.RUnlock()
789-
790-
if len(r.hostMap) == 0 {
791-
return nil
792-
}
793-
794-
hostR := r.hp.Get()
795-
host, ok := r.hostMap[hostR.Host()]
796-
if !ok {
797-
return nil
798-
}
799-
800-
return selectedHostPoolHost{
801-
policy: r,
802-
info: host,
803-
hostR: hostR,
804-
}
805-
}
806-
}
807-
808-
// selectedHostPoolHost is a host returned by the hostPoolHostPolicy and
809-
// implements the SelectedHost interface
810-
type selectedHostPoolHost struct {
811-
policy *hostPoolHostPolicy
812-
info *HostInfo
813-
hostR hostpool.HostPoolResponse
814-
}
815-
816-
func (host selectedHostPoolHost) Info() *HostInfo {
817-
return host.info
818-
}
819-
820-
func (host selectedHostPoolHost) Mark(err error) {
821-
ip := host.info.ConnectAddress().String()
822-
823-
host.policy.mu.RLock()
824-
defer host.policy.mu.RUnlock()
825-
826-
if _, ok := host.policy.hostMap[ip]; !ok {
827-
// host was removed between pick and mark
828-
return
829-
}
830-
831-
host.hostR.Mark(err)
832-
}
833-
834691
type dcAwareRR struct {
835692
local string
836693
localHosts cowHostList

0 commit comments

Comments
 (0)