Skip to content

Commit 56b34af

Browse files
author
Maksim Konovalov
committed
feature/balancing-faces: add external load balancing methods support
I added the ability to use custom balancing methods when connecting with a pool.
1 parent 8b2be01 commit 56b34af

File tree

5 files changed

+43
-20
lines changed

5 files changed

+43
-20
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
1414
also added logs for error case of `ConnectionPool.tryConnect()` calls in
1515
`ConnectionPool.controller()` and `ConnectionPool.reconnect()`
1616
- Methods that are implemented but not included in the pooler interface (#395).
17+
- Add support for external load balancing methods when connecting via Pool (#400).
1718

1819
### Changed
1920

pool/balancer.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package pool
2+
3+
import "github.com/tarantool/go-tarantool/v2"
4+
5+
type BalancingMethod = func(int) BalancingPool
6+
7+
type BalancingPool interface {
8+
IsEmpty() bool
9+
GetConnection(string) *tarantool.Connection
10+
DeleteConnection(string) *tarantool.Connection
11+
AddConnection(id string, conn *tarantool.Connection)
12+
GetNextConnection() *tarantool.Connection
13+
GetConnections() map[string]*tarantool.Connection
14+
}

pool/connection_pool.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@ type Opts struct {
8181
CheckTimeout time.Duration
8282
// ConnectionHandler provides an ability to handle connection updates.
8383
ConnectionHandler ConnectionHandler
84+
// BalancingMethod is how connections for the request will be selected
85+
BalancingMethod BalancingMethod
8486
}
8587

8688
/*
@@ -110,9 +112,9 @@ type ConnectionPool struct {
110112

111113
state state
112114
done chan struct{}
113-
roPool *roundRobinStrategy
114-
rwPool *roundRobinStrategy
115-
anyPool *roundRobinStrategy
115+
roPool BalancingPool
116+
rwPool BalancingPool
117+
anyPool BalancingPool
116118
poolsMutex sync.RWMutex
117119
watcherContainer watcherContainer
118120
}
@@ -153,6 +155,10 @@ func newEndpoint(name string, dialer tarantool.Dialer, opts tarantool.Opts) *end
153155
// opts. Instances must have unique names.
154156
func ConnectWithOpts(ctx context.Context, instances []Instance,
155157
opts Opts) (*ConnectionPool, error) {
158+
if opts.BalancingMethod == nil {
159+
opts.BalancingMethod = NewRoundRobinStrategy
160+
}
161+
156162
unique := make(map[string]bool)
157163
for _, instance := range instances {
158164
if _, ok := unique[instance.Name]; ok {
@@ -166,9 +172,9 @@ func ConnectWithOpts(ctx context.Context, instances []Instance,
166172
}
167173

168174
size := len(instances)
169-
rwPool := newRoundRobinStrategy(size)
170-
roPool := newRoundRobinStrategy(size)
171-
anyPool := newRoundRobinStrategy(size)
175+
rwPool := opts.BalancingMethod(size)
176+
roPool := opts.BalancingMethod(size)
177+
anyPool := opts.BalancingMethod(size)
172178

173179
connPool := &ConnectionPool{
174180
ends: make(map[string]*endpoint),

pool/round_robin.go

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,24 +7,26 @@ import (
77
"github.com/tarantool/go-tarantool/v2"
88
)
99

10-
type roundRobinStrategy struct {
10+
var _ BalancingPool = (*RoundRobinStrategy)(nil)
11+
12+
type RoundRobinStrategy struct {
1113
conns []*tarantool.Connection
1214
indexById map[string]uint
1315
mutex sync.RWMutex
1416
size uint64
1517
current uint64
1618
}
1719

18-
func newRoundRobinStrategy(size int) *roundRobinStrategy {
19-
return &roundRobinStrategy{
20+
func NewRoundRobinStrategy(size int) BalancingPool {
21+
return &RoundRobinStrategy{
2022
conns: make([]*tarantool.Connection, 0, size),
2123
indexById: make(map[string]uint, size),
2224
size: 0,
2325
current: 0,
2426
}
2527
}
2628

27-
func (r *roundRobinStrategy) GetConnection(id string) *tarantool.Connection {
29+
func (r *RoundRobinStrategy) GetConnection(id string) *tarantool.Connection {
2830
r.mutex.RLock()
2931
defer r.mutex.RUnlock()
3032

@@ -36,7 +38,7 @@ func (r *roundRobinStrategy) GetConnection(id string) *tarantool.Connection {
3638
return r.conns[index]
3739
}
3840

39-
func (r *roundRobinStrategy) DeleteConnection(id string) *tarantool.Connection {
41+
func (r *RoundRobinStrategy) DeleteConnection(id string) *tarantool.Connection {
4042
r.mutex.Lock()
4143
defer r.mutex.Unlock()
4244

@@ -64,14 +66,14 @@ func (r *roundRobinStrategy) DeleteConnection(id string) *tarantool.Connection {
6466
return conn
6567
}
6668

67-
func (r *roundRobinStrategy) IsEmpty() bool {
69+
func (r *RoundRobinStrategy) IsEmpty() bool {
6870
r.mutex.RLock()
6971
defer r.mutex.RUnlock()
7072

7173
return r.size == 0
7274
}
7375

74-
func (r *roundRobinStrategy) GetNextConnection() *tarantool.Connection {
76+
func (r *RoundRobinStrategy) GetNextConnection() *tarantool.Connection {
7577
r.mutex.RLock()
7678
defer r.mutex.RUnlock()
7779

@@ -81,7 +83,7 @@ func (r *roundRobinStrategy) GetNextConnection() *tarantool.Connection {
8183
return r.conns[r.nextIndex()]
8284
}
8385

84-
func (r *roundRobinStrategy) GetConnections() map[string]*tarantool.Connection {
86+
func (r *RoundRobinStrategy) GetConnections() map[string]*tarantool.Connection {
8587
r.mutex.RLock()
8688
defer r.mutex.RUnlock()
8789

@@ -93,7 +95,7 @@ func (r *roundRobinStrategy) GetConnections() map[string]*tarantool.Connection {
9395
return conns
9496
}
9597

96-
func (r *roundRobinStrategy) AddConnection(id string, conn *tarantool.Connection) {
98+
func (r *RoundRobinStrategy) AddConnection(id string, conn *tarantool.Connection) {
9799
r.mutex.Lock()
98100
defer r.mutex.Unlock()
99101

@@ -106,7 +108,7 @@ func (r *roundRobinStrategy) AddConnection(id string, conn *tarantool.Connection
106108
}
107109
}
108110

109-
func (r *roundRobinStrategy) nextIndex() uint64 {
111+
func (r *RoundRobinStrategy) nextIndex() uint64 {
110112
next := atomic.AddUint64(&r.current, 1)
111113
return (next - 1) % r.size
112114
}

pool/round_robin_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ const (
1212
)
1313

1414
func TestRoundRobinAddDelete(t *testing.T) {
15-
rr := newRoundRobinStrategy(10)
15+
rr := NewRoundRobinStrategy(10)
1616

1717
addrs := []string{validAddr1, validAddr2}
1818
conns := []*tarantool.Connection{&tarantool.Connection{}, &tarantool.Connection{}}
@@ -32,7 +32,7 @@ func TestRoundRobinAddDelete(t *testing.T) {
3232
}
3333

3434
func TestRoundRobinAddDuplicateDelete(t *testing.T) {
35-
rr := newRoundRobinStrategy(10)
35+
rr := NewRoundRobinStrategy(10)
3636

3737
conn1 := &tarantool.Connection{}
3838
conn2 := &tarantool.Connection{}
@@ -52,7 +52,7 @@ func TestRoundRobinAddDuplicateDelete(t *testing.T) {
5252
}
5353

5454
func TestRoundRobinGetNextConnection(t *testing.T) {
55-
rr := newRoundRobinStrategy(10)
55+
rr := NewRoundRobinStrategy(10)
5656

5757
addrs := []string{validAddr1, validAddr2}
5858
conns := []*tarantool.Connection{&tarantool.Connection{}, &tarantool.Connection{}}
@@ -70,7 +70,7 @@ func TestRoundRobinGetNextConnection(t *testing.T) {
7070
}
7171

7272
func TestRoundRobinStrategy_GetConnections(t *testing.T) {
73-
rr := newRoundRobinStrategy(10)
73+
rr := NewRoundRobinStrategy(10)
7474

7575
addrs := []string{validAddr1, validAddr2}
7676
conns := []*tarantool.Connection{&tarantool.Connection{}, &tarantool.Connection{}}

0 commit comments

Comments
 (0)