Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Keep nil slices in MapScan (CASSGO-44)

- Improve error messages for marshalling (CASSGO-38)

- Remove HostPoolHostPolicy from gocql package (CASSGO-21)

### Fixed

- Retry policy now takes into account query idempotency (CASSGO-27)
Expand Down
150 changes: 150 additions & 0 deletions hostpool/hostpool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package hostpool

import (
"sync"

"github.com/hailocab/go-hostpool"

"github.com/gocql/gocql"
)

// HostPoolHostPolicy is a host policy which uses the bitly/go-hostpool library
// to distribute queries between hosts and prevent sending queries to
// unresponsive hosts. When creating the host pool that is passed to the policy
// use an empty slice of hosts as the hostpool will be populated later by gocql.
// See below for examples of usage:
//
// // Create host selection policy using a simple host pool
// cluster.PoolConfig.HostSelectionPolicy = HostPoolHostPolicy(hostpool.New(nil))
//
// // Create host selection policy using an epsilon greedy pool
// cluster.PoolConfig.HostSelectionPolicy = HostPoolHostPolicy(
// hostpool.NewEpsilonGreedy(nil, 0, &hostpool.LinearEpsilonValueCalculator{}),
// )
func HostPoolHostPolicy(hp hostpool.HostPool) *hostPoolHostPolicy {
return &hostPoolHostPolicy{hostMap: map[string]*gocql.HostInfo{}, hp: hp}
}

type hostPoolHostPolicy struct {
hp hostpool.HostPool
mu sync.RWMutex
hostMap map[string]*gocql.HostInfo
}

func (r *hostPoolHostPolicy) Init(*gocql.Session) {}
func (r *hostPoolHostPolicy) KeyspaceChanged(gocql.KeyspaceUpdateEvent) {}
func (r *hostPoolHostPolicy) SetPartitioner(string) {}
func (r *hostPoolHostPolicy) IsLocal(*gocql.HostInfo) bool { return true }

func (r *hostPoolHostPolicy) SetHosts(hosts []*gocql.HostInfo) {
peers := make([]string, len(hosts))
hostMap := make(map[string]*gocql.HostInfo, len(hosts))

for i, host := range hosts {
ip := host.ConnectAddress().String()
peers[i] = ip
hostMap[ip] = host
}

r.mu.Lock()
r.hp.SetHosts(peers)
r.hostMap = hostMap
r.mu.Unlock()
}

func (r *hostPoolHostPolicy) AddHost(host *gocql.HostInfo) {
ip := host.ConnectAddress().String()

r.mu.Lock()
defer r.mu.Unlock()

// If the host addr is present and isn't nil return
if h, ok := r.hostMap[ip]; ok && h != nil {
return
}
// otherwise, add the host to the map
r.hostMap[ip] = host
// and construct a new peer list to give to the HostPool
hosts := make([]string, 0, len(r.hostMap))
for addr := range r.hostMap {
hosts = append(hosts, addr)
}

r.hp.SetHosts(hosts)
}

func (r *hostPoolHostPolicy) RemoveHost(host *gocql.HostInfo) {
ip := host.ConnectAddress().String()

r.mu.Lock()
defer r.mu.Unlock()

if _, ok := r.hostMap[ip]; !ok {
return
}

delete(r.hostMap, ip)
hosts := make([]string, 0, len(r.hostMap))
for _, host := range r.hostMap {
hosts = append(hosts, host.ConnectAddress().String())
}

r.hp.SetHosts(hosts)
}

func (r *hostPoolHostPolicy) HostUp(host *gocql.HostInfo) {
r.AddHost(host)
}

func (r *hostPoolHostPolicy) HostDown(host *gocql.HostInfo) {
r.RemoveHost(host)
}

func (r *hostPoolHostPolicy) Pick(qry gocql.ExecutableQuery) gocql.NextHost {
return func() gocql.SelectedHost {
r.mu.RLock()
defer r.mu.RUnlock()

if len(r.hostMap) == 0 {
return nil
}

hostR := r.hp.Get()
host, ok := r.hostMap[hostR.Host()]
if !ok {
return nil
}

return selectedHostPoolHost{
policy: r,
info: host,
hostR: hostR,
}
}
}

// selectedHostPoolHost is a host returned by the hostPoolHostPolicy and
// implements the SelectedHost interface
type selectedHostPoolHost struct {
policy *hostPoolHostPolicy
info *gocql.HostInfo
hostR hostpool.HostPoolResponse
}

func (host selectedHostPoolHost) Info() *gocql.HostInfo {
return host.info
}

func (host selectedHostPoolHost) Mark(err error) {
ip := host.info.ConnectAddress().String()

host.policy.mu.RLock()
defer host.policy.mu.RUnlock()

if _, ok := host.policy.hostMap[ip]; !ok {
// host was removed between pick and mark
return
}

host.hostR.Mark(err)
}
57 changes: 57 additions & 0 deletions hostpool/hostpool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package hostpool

import (
"fmt"
"net"
"testing"

"github.com/hailocab/go-hostpool"

"github.com/gocql/gocql"
)

func TestHostPolicy_HostPool(t *testing.T) {
policy := HostPoolHostPolicy(hostpool.New(nil))

//hosts := []*gocql.HostInfo{
// {hostId: "0", connectAddress: net.IPv4(10, 0, 0, 0)},
// {hostId: "1", connectAddress: net.IPv4(10, 0, 0, 1)},
//}
firstHost := &gocql.HostInfo{}
firstHost.SetHostID("0")
firstHost.SetConnectAddress(net.IPv4(10, 0, 0, 0))
secHost := &gocql.HostInfo{}
secHost.SetHostID("1")
secHost.SetConnectAddress(net.IPv4(10, 0, 0, 1))
hosts := []*gocql.HostInfo{firstHost, secHost}
// Using set host to control the ordering of the hosts as calling "AddHost" iterates the map
// which will result in an unpredictable ordering
policy.SetHosts(hosts)

// the first host selected is actually at [1], but this is ok for RR
// interleaved iteration should always increment the host
iter := policy.Pick(nil)
actualA := iter()
if actualA.Info().HostID() != "0" {
t.Errorf("Expected hosts[0] but was hosts[%s]", actualA.Info().HostID())
}
actualA.Mark(nil)

actualB := iter()
if actualB.Info().HostID() != "1" {
t.Errorf("Expected hosts[1] but was hosts[%s]", actualB.Info().HostID())
}
actualB.Mark(fmt.Errorf("error"))

actualC := iter()
if actualC.Info().HostID() != "0" {
t.Errorf("Expected hosts[0] but was hosts[%s]", actualC.Info().HostID())
}
actualC.Mark(nil)

actualD := iter()
if actualD.Info().HostID() != "0" {
t.Errorf("Expected hosts[0] but was hosts[%s]", actualD.Info().HostID())
}
actualD.Mark(nil)
}
143 changes: 0 additions & 143 deletions policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ import (
"sync"
"sync/atomic"
"time"

"github.com/hailocab/go-hostpool"
)

// cowHostList implements a copy on write host list, its equivalent type is []*HostInfo
Expand Down Expand Up @@ -690,147 +688,6 @@ func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost {
}
}

// HostPoolHostPolicy is a host policy which uses the bitly/go-hostpool library
// to distribute queries between hosts and prevent sending queries to
// unresponsive hosts. When creating the host pool that is passed to the policy
// use an empty slice of hosts as the hostpool will be populated later by gocql.
// See below for examples of usage:
//
// // Create host selection policy using a simple host pool
// cluster.PoolConfig.HostSelectionPolicy = HostPoolHostPolicy(hostpool.New(nil))
//
// // Create host selection policy using an epsilon greedy pool
// cluster.PoolConfig.HostSelectionPolicy = HostPoolHostPolicy(
// hostpool.NewEpsilonGreedy(nil, 0, &hostpool.LinearEpsilonValueCalculator{}),
// )
func HostPoolHostPolicy(hp hostpool.HostPool) HostSelectionPolicy {
return &hostPoolHostPolicy{hostMap: map[string]*HostInfo{}, hp: hp}
}

type hostPoolHostPolicy struct {
hp hostpool.HostPool
mu sync.RWMutex
hostMap map[string]*HostInfo
}

func (r *hostPoolHostPolicy) Init(*Session) {}
func (r *hostPoolHostPolicy) KeyspaceChanged(KeyspaceUpdateEvent) {}
func (r *hostPoolHostPolicy) SetPartitioner(string) {}
func (r *hostPoolHostPolicy) IsLocal(*HostInfo) bool { return true }

func (r *hostPoolHostPolicy) SetHosts(hosts []*HostInfo) {
peers := make([]string, len(hosts))
hostMap := make(map[string]*HostInfo, len(hosts))

for i, host := range hosts {
ip := host.ConnectAddress().String()
peers[i] = ip
hostMap[ip] = host
}

r.mu.Lock()
r.hp.SetHosts(peers)
r.hostMap = hostMap
r.mu.Unlock()
}

func (r *hostPoolHostPolicy) AddHost(host *HostInfo) {
ip := host.ConnectAddress().String()

r.mu.Lock()
defer r.mu.Unlock()

// If the host addr is present and isn't nil return
if h, ok := r.hostMap[ip]; ok && h != nil {
return
}
// otherwise, add the host to the map
r.hostMap[ip] = host
// and construct a new peer list to give to the HostPool
hosts := make([]string, 0, len(r.hostMap))
for addr := range r.hostMap {
hosts = append(hosts, addr)
}

r.hp.SetHosts(hosts)
}

func (r *hostPoolHostPolicy) RemoveHost(host *HostInfo) {
ip := host.ConnectAddress().String()

r.mu.Lock()
defer r.mu.Unlock()

if _, ok := r.hostMap[ip]; !ok {
return
}

delete(r.hostMap, ip)
hosts := make([]string, 0, len(r.hostMap))
for _, host := range r.hostMap {
hosts = append(hosts, host.ConnectAddress().String())
}

r.hp.SetHosts(hosts)
}

func (r *hostPoolHostPolicy) HostUp(host *HostInfo) {
r.AddHost(host)
}

func (r *hostPoolHostPolicy) HostDown(host *HostInfo) {
r.RemoveHost(host)
}

func (r *hostPoolHostPolicy) Pick(qry ExecutableQuery) NextHost {
return func() SelectedHost {
r.mu.RLock()
defer r.mu.RUnlock()

if len(r.hostMap) == 0 {
return nil
}

hostR := r.hp.Get()
host, ok := r.hostMap[hostR.Host()]
if !ok {
return nil
}

return selectedHostPoolHost{
policy: r,
info: host,
hostR: hostR,
}
}
}

// selectedHostPoolHost is a host returned by the hostPoolHostPolicy and
// implements the SelectedHost interface
type selectedHostPoolHost struct {
policy *hostPoolHostPolicy
info *HostInfo
hostR hostpool.HostPoolResponse
}

func (host selectedHostPoolHost) Info() *HostInfo {
return host.info
}

func (host selectedHostPoolHost) Mark(err error) {
ip := host.info.ConnectAddress().String()

host.policy.mu.RLock()
defer host.policy.mu.RUnlock()

if _, ok := host.policy.hostMap[ip]; !ok {
// host was removed between pick and mark
return
}

host.hostR.Mark(err)
}

type dcAwareRR struct {
local string
localHosts cowHostList
Expand Down
Loading