Skip to content

Commit e078636

Browse files
committed
session: expose API to get host pool information
1 parent 426c16f commit e078636

File tree

6 files changed

+140
-19
lines changed

6 files changed

+140
-19
lines changed

connectionpool.go

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,16 @@ func (p *policyConnPool) getPoolByHostID(hostID string) (pool *hostConnPool, ok
203203
return
204204
}
205205

206+
func (p *policyConnPool) iteratePool(iter func(info HostPoolInfo) bool) {
207+
p.mu.RLock()
208+
for _, pool := range p.hostConnPools {
209+
if !iter(pool) {
210+
break
211+
}
212+
}
213+
p.mu.RUnlock()
214+
}
215+
206216
func (p *policyConnPool) Close() {
207217
p.mu.Lock()
208218
defer p.mu.Unlock()
@@ -264,12 +274,12 @@ type hostConnPool struct {
264274
filling bool
265275
}
266276

267-
func (h *hostConnPool) String() string {
268-
h.mu.RLock()
269-
defer h.mu.RUnlock()
270-
size, _ := h.connPicker.Size()
277+
func (pool *hostConnPool) String() string {
278+
pool.mu.RLock()
279+
defer pool.mu.RUnlock()
280+
size, _ := pool.connPicker.Size()
271281
return fmt.Sprintf("[filling=%v closed=%v conns=%v size=%v host=%v]",
272-
h.filling, h.closed, size, h.size, h.host)
282+
pool.filling, pool.closed, size, pool.size, pool.host)
273283
}
274284

275285
func newHostConnPool(session *Session, host *HostInfo, port, size int,
@@ -587,3 +597,31 @@ func (pool *hostConnPool) HandleError(conn *Conn, err error, closed bool) {
587597
pool.connPicker.Remove(conn)
588598
go pool.fill_debounce()
589599
}
600+
601+
func (pool *hostConnPool) GetConnectionCount() int {
602+
pool.mu.Lock()
603+
defer pool.mu.Unlock()
604+
return pool.connPicker.GetConnectionCount()
605+
}
606+
607+
func (pool *hostConnPool) GetExcessConnectionCount() int {
608+
pool.mu.Lock()
609+
defer pool.mu.Unlock()
610+
return pool.connPicker.GetExcessConnectionCount()
611+
}
612+
613+
func (pool *hostConnPool) GetShardCount() int {
614+
pool.mu.Lock()
615+
defer pool.mu.Unlock()
616+
return pool.connPicker.GetShardCount()
617+
}
618+
619+
func (pool *hostConnPool) Host() HostInformation {
620+
return pool.host
621+
}
622+
623+
func (pool *hostConnPool) IsClosed() bool {
624+
pool.mu.Lock()
625+
defer pool.mu.Unlock()
626+
return pool.closed
627+
}

connpicker.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@ type ConnPicker interface {
1818
// nrShard specifies how many shards the host has.
1919
// If nrShards is zero, the caller shouldn't use shard-aware port.
2020
NextShard() (shardID, nrShards int)
21+
22+
GetConnectionCount() int
23+
GetExcessConnectionCount() int
24+
GetShardCount() int
2125
}
2226

2327
type defaultConnPicker struct {
@@ -27,6 +31,21 @@ type defaultConnPicker struct {
2731
mu sync.RWMutex
2832
}
2933

34+
func (p *defaultConnPicker) GetConnectionCount() int {
35+
p.mu.Lock()
36+
defer p.mu.Unlock()
37+
return len(p.conns)
38+
}
39+
40+
func (p *defaultConnPicker) GetExcessConnectionCount() int {
41+
return 0
42+
}
43+
44+
func (p *defaultConnPicker) GetShardCount() int {
45+
// It is not supposed to be used for scylla nodes and therefore does not know anything about shards count
46+
return 0
47+
}
48+
3049
func newDefaultConnPicker(size int) *defaultConnPicker {
3150
if size <= 0 {
3251
panic(fmt.Sprintf("invalid pool size %d", size))
@@ -111,6 +130,18 @@ func (*defaultConnPicker) NextShard() (shardID, nrShards int) {
111130
// to the point where we have first connection.
112131
type nopConnPicker struct{}
113132

133+
func (p nopConnPicker) GetConnectionCount() int {
134+
return 0
135+
}
136+
137+
func (p nopConnPicker) GetExcessConnectionCount() int {
138+
return 0
139+
}
140+
141+
func (p nopConnPicker) GetShardCount() int {
142+
return 0
143+
}
144+
114145
func (nopConnPicker) Pick(Token, ExecutableQuery) *Conn {
115146
return nil
116147
}

host_source.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ type HostInfo struct {
176176
state nodeState
177177
scyllaShardAwarePort uint16
178178
scyllaShardAwarePortTLS uint16
179+
scyllaShardCount int
179180
graph bool
180181
}
181182

@@ -488,8 +489,10 @@ func (h *HostInfo) String() string {
488489
func (h *HostInfo) setScyllaSupported(s scyllaSupported) {
489490
h.mu.Lock()
490491
defer h.mu.Unlock()
492+
h.partitioner = s.partitioner
491493
h.scyllaShardAwarePort = s.shardAwarePort
492494
h.scyllaShardAwarePortTLS = s.shardAwarePortSSL
495+
h.scyllaShardCount = s.nrShards
493496
}
494497

495498
// ScyllaShardAwarePort returns the shard aware port of this host.
@@ -508,6 +511,13 @@ func (h *HostInfo) ScyllaShardAwarePortTLS() uint16 {
508511
return h.scyllaShardAwarePortTLS
509512
}
510513

514+
// ScyllaShardCount returns count of shards on the node.
515+
func (h *HostInfo) ScyllaShardCount() int {
516+
h.mu.RLock()
517+
defer h.mu.RUnlock()
518+
return h.scyllaShardCount
519+
}
520+
511521
// Returns true if we are using system_schema.keyspaces instead of system.schema_keyspaces
512522
func checkSystemSchema(control controlConnection) (bool, error) {
513523
iter := control.querySystem("SELECT * FROM system_schema.keyspaces")

integration_only.go

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,6 @@ package gocql
88

99
import "fmt"
1010

11-
func (pool *hostConnPool) MissingConnections() (int, error) {
12-
pool.mu.Lock()
13-
defer pool.mu.Unlock()
14-
15-
if pool.closed {
16-
return 0, fmt.Errorf("pool is closed")
17-
}
18-
_, missing := pool.connPicker.Size()
19-
return missing, nil
20-
}
21-
2211
func (p *policyConnPool) MissingConnections() (int, error) {
2312
p.mu.Lock()
2413
defer p.mu.Unlock()
@@ -27,9 +16,9 @@ func (p *policyConnPool) MissingConnections() (int, error) {
2716

2817
// close the pools
2918
for _, pool := range p.hostConnPools {
30-
missing, err := pool.MissingConnections()
31-
if err != nil {
32-
return 0, err
19+
missing := pool.GetShardCount() - pool.GetConnectionCount()
20+
if pool.IsClosed() {
21+
return 0, fmt.Errorf("pool for %s is closed", pool.host.HostID())
3322
}
3423
total += missing
3524
}

scylla.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -518,6 +518,18 @@ func (p *scyllaConnPicker) shouldCloseExcessConns() bool {
518518
return len(p.excessConns) > int(p.excessConnsLimitRate*float32(p.nrShards))
519519
}
520520

521+
func (p *scyllaConnPicker) GetConnectionCount() int {
522+
return p.nrConns
523+
}
524+
525+
func (p *scyllaConnPicker) GetExcessConnectionCount() int {
526+
return len(p.excessConns)
527+
}
528+
529+
func (p *scyllaConnPicker) GetShardCount() int {
530+
return p.nrShards
531+
}
532+
521533
func (p *scyllaConnPicker) Remove(conn *Conn) {
522534
shard := conn.scyllaSupported.shard
523535

session.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2425,6 +2425,47 @@ func (s *Session) GetHosts() []*HostInfo {
24252425
return s.hostSource.getHostsList()
24262426
}
24272427

2428+
type HostInformation interface {
2429+
Peer() net.IP
2430+
ConnectAddress() net.IP
2431+
UntranslatedConnectAddress() net.IP
2432+
BroadcastAddress() net.IP
2433+
ListenAddress() net.IP
2434+
RPCAddress() net.IP
2435+
PreferredIP() net.IP
2436+
DataCenter() string
2437+
Rack() string
2438+
HostID() string
2439+
WorkLoad() string
2440+
Partitioner() string
2441+
ClusterName() string
2442+
Tokens() []string
2443+
Port() int
2444+
IsUp() bool
2445+
ScyllaShardAwarePort() uint16
2446+
ScyllaShardAwarePortTLS() uint16
2447+
ScyllaShardCount() int
2448+
}
2449+
2450+
type HostPoolInfo interface {
2451+
GetConnectionCount() int
2452+
GetExcessConnectionCount() int
2453+
GetShardCount() int
2454+
String() string
2455+
InFlight() int
2456+
Host() HostInformation
2457+
IsClosed() bool
2458+
}
2459+
2460+
func (s *Session) GetHostPoolByID(hostID string) HostPoolInfo {
2461+
hostPool, _ := s.pool.getPoolByHostID(hostID)
2462+
return hostPool
2463+
}
2464+
2465+
func (s *Session) IterateHostPools(iter func(info HostPoolInfo) bool) {
2466+
s.pool.iteratePool(iter)
2467+
}
2468+
24282469
type ObservedQuery struct {
24292470
// Start is a time when the query was attempted
24302471
Start time.Time

0 commit comments

Comments
 (0)