Skip to content

Commit 26c61ef

Browse files
Add LOAD_BALANCING_POLICY_SLOW_AVOIDANCE funtionality
The java driver has the feature to automatically avoid slow replicas by doing simple heuristics. This is one of the key feature to have a stable latency. This commit adds additional field in tokenAwareHostPolicy to control if the feature is enabled and what is the maximum in flight threshold. If feature is enabled driver sorts the replicas to first try those with less than specified maximum in flight connections. Fixes: #154
1 parent 3c32c6c commit 26c61ef

File tree

6 files changed

+79
-0
lines changed

6 files changed

+79
-0
lines changed

connectionpool.go

+20
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,17 @@ func (p *policyConnPool) SetHosts(hosts []*HostInfo) {
211211
}
212212
}
213213

214+
func (p *policyConnPool) InFlight() int {
215+
p.mu.RLock()
216+
count := 0
217+
for _, pool := range p.hostConnPools {
218+
count += pool.InFlight()
219+
}
220+
p.mu.RUnlock()
221+
222+
return count
223+
}
224+
214225
func (p *policyConnPool) Size() int {
215226
p.mu.RLock()
216227
count := 0
@@ -348,6 +359,15 @@ func (pool *hostConnPool) Size() int {
348359
return size
349360
}
350361

362+
// Size returns the number of connections currently active in the pool
363+
func (pool *hostConnPool) InFlight() int {
364+
pool.mu.RLock()
365+
defer pool.mu.RUnlock()
366+
367+
size := pool.connPicker.InFlight()
368+
return size
369+
}
370+
351371
// Close the connection pool
352372
func (pool *hostConnPool) Close() {
353373
pool.mu.Lock()

connpicker.go

+10
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ type ConnPicker interface {
1010
Pick(token, string, string) *Conn
1111
Put(*Conn)
1212
Remove(conn *Conn)
13+
InFlight() int
1314
Size() (int, int)
1415
Close()
1516

@@ -60,6 +61,11 @@ func (p *defaultConnPicker) Close() {
6061
}
6162
}
6263

64+
func (p *defaultConnPicker) InFlight() int {
65+
size := len(p.conns)
66+
return size
67+
}
68+
6369
func (p *defaultConnPicker) Size() (int, int) {
6470
size := len(p.conns)
6571
return size, p.size - size
@@ -114,6 +120,10 @@ func (nopConnPicker) Put(*Conn) {
114120
func (nopConnPicker) Remove(conn *Conn) {
115121
}
116122

123+
func (nopConnPicker) InFlight() int {
124+
return 0
125+
}
126+
117127
func (nopConnPicker) Size() (int, int) {
118128
// Return 1 to make hostConnPool to try to establish a connection.
119129
// When first connection is established hostConnPool replaces nopConnPicker

host_source.go

+5
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,11 @@ func (h *HostInfo) IsUp() bool {
409409
return h != nil && h.State() == NodeUp
410410
}
411411

412+
func (h *HostInfo) IsBusy(s *Session) bool {
413+
pool, ok := s.pool.getPool(h)
414+
return ok && h != nil && pool.InFlight() >= MAX_IN_FLIGHT_THRESHOLD
415+
}
416+
412417
func (h *HostInfo) HostnameAndPort() string {
413418
h.mu.Lock()
414419
defer h.mu.Unlock()

internal/streams/streams.go

+4
Original file line numberDiff line numberDiff line change
@@ -145,3 +145,7 @@ func (s *IDGenerator) Clear(stream int) (inuse bool) {
145145
func (s *IDGenerator) Available() int {
146146
return s.NumStreams - int(atomic.LoadInt32(&s.inuseStreams)) - 1
147147
}
148+
149+
func (s *IDGenerator) InUse() int {
150+
return int(atomic.LoadInt32(&s.inuseStreams))
151+
}

policies.go

+30
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,17 @@ func ShuffleReplicas() func(*tokenAwareHostPolicy) {
391391
}
392392
}
393393

394+
// AvoidSlowReplicas enabled avoiding slow replicas
395+
//
396+
// TokenAwareHostPolicy normally does not check how busy replica is, with avoidSlowReplicas enabled it avoids replicas
397+
// if they have equal or more than MAX_IN_FLIGHT_THRESHOLD requests in flight
398+
func AvoidSlowReplicas(max_in_flight_threshold int) func(policy *tokenAwareHostPolicy) {
399+
return func(t *tokenAwareHostPolicy) {
400+
t.avoidSlowReplicas = true
401+
MAX_IN_FLIGHT_THRESHOLD = max_in_flight_threshold
402+
}
403+
}
404+
394405
// NonLocalReplicasFallback enables fallback to replicas that are not considered local.
395406
//
396407
// TokenAwareHostPolicy used with DCAwareHostPolicy fallback first selects replicas by partition key in local DC, then
@@ -424,6 +435,8 @@ type clusterMeta struct {
424435
tokenRing *tokenRing
425436
}
426437

438+
var MAX_IN_FLIGHT_THRESHOLD int = 10
439+
427440
type tokenAwareHostPolicy struct {
428441
fallback HostSelectionPolicy
429442
getKeyspaceMetadata func(keyspace string) (*KeyspaceMetadata, error)
@@ -443,6 +456,8 @@ type tokenAwareHostPolicy struct {
443456

444457
// Experimental, this interface and use may change
445458
tablets cowTabletList
459+
460+
avoidSlowReplicas bool
446461
}
447462

448463
func (t *tokenAwareHostPolicy) Init(s *Session) {
@@ -687,6 +702,21 @@ func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost {
687702
}
688703
}
689704

705+
if s := qry.GetSession(); s != nil && t.avoidSlowReplicas {
706+
healthyReplicas := make([]*HostInfo, 0, len(replicas))
707+
unhealthyReplicas := make([]*HostInfo, 0, len(replicas))
708+
709+
for _, h := range replicas {
710+
if h.IsBusy(s) {
711+
unhealthyReplicas = append(unhealthyReplicas, h)
712+
} else {
713+
healthyReplicas = append(healthyReplicas, h)
714+
}
715+
}
716+
717+
replicas = append(healthyReplicas, unhealthyReplicas...)
718+
}
719+
690720
var (
691721
fallbackIter NextHost
692722
i, j, k int

scylla.go

+10
Original file line numberDiff line numberDiff line change
@@ -544,6 +544,16 @@ func (p *scyllaConnPicker) Remove(conn *Conn) {
544544
}
545545
}
546546

547+
func (p *scyllaConnPicker) InFlight() int {
548+
result := 0
549+
for _, conn := range p.conns {
550+
if conn != nil {
551+
result = result + (conn.streams.InUse())
552+
}
553+
}
554+
return result
555+
}
556+
547557
func (p *scyllaConnPicker) Size() (int, int) {
548558
return p.nrConns, p.nrShards - p.nrConns
549559
}

0 commit comments

Comments
 (0)