Skip to content
This repository was archived by the owner on Jul 21, 2025. It is now read-only.

Commit f4741ae

Browse files
committed
transport: return errors on missing connections
1 parent 42ee7d6 commit f4741ae

File tree

5 files changed

+56
-21
lines changed

5 files changed

+56
-21
lines changed

query.go

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,12 @@ func (q *Query) Exec(ctx context.Context) (Result, error) {
3131
for n != nil {
3232
sameNodeRetries:
3333
for {
34-
conn := n.Conn(info)
34+
conn, err := n.Conn(info)
35+
if err != nil {
36+
lastErr = err
37+
break sameNodeRetries
38+
}
39+
3540
res, err := q.exec(ctx, conn, q.stmt, nil)
3641
if err != nil {
3742
ri := transport.RetryInfo{
@@ -60,15 +65,19 @@ func (q *Query) Exec(ctx context.Context) (Result, error) {
6065
i++
6166
n = q.session.cfg.HostSelectionPolicy.Node(info, i)
6267
}
68+
69+
if lastErr == nil {
70+
return Result{}, ErrNoConnection
71+
}
6372
return Result{}, lastErr
6473
}
6574

6675
func (q *Query) pickConn(qi transport.QueryInfo) (*transport.Conn, error) {
6776
n := q.session.cfg.HostSelectionPolicy.Node(qi, 0)
6877

69-
conn := n.Conn(qi)
70-
if conn == nil {
71-
return nil, errNoConnection
78+
conn, err := n.Conn(qi)
79+
if err != nil {
80+
return nil, ErrNoConnection
7281
}
7382

7483
return conn, nil
@@ -281,6 +290,7 @@ type iterWorker struct {
281290
pickNode func(transport.QueryInfo, int) *transport.Node
282291
nodeIdx int
283292
conn *transport.Conn
293+
connErr error
284294

285295
rd transport.RetryDecider
286296

@@ -295,7 +305,7 @@ func (w *iterWorker) loop(ctx context.Context) {
295305
w.errCh <- fmt.Errorf("can't pick a node to execute request")
296306
return
297307
}
298-
w.conn = n.Conn(w.queryInfo)
308+
w.conn, w.connErr = n.Conn(w.queryInfo)
299309

300310
for {
301311
_, ok := <-w.requestCh
@@ -324,6 +334,10 @@ func (w *iterWorker) exec(ctx context.Context) (transport.QueryResult, error) {
324334
for {
325335
sameNodeRetries:
326336
for {
337+
if w.connErr != nil {
338+
lastErr = w.connErr
339+
break
340+
}
327341
res, err := w.queryExec(ctx, w.conn, w.stmt, w.pagingState)
328342
if err != nil {
329343
ri := transport.RetryInfo{
@@ -349,9 +363,12 @@ func (w *iterWorker) exec(ctx context.Context) (transport.QueryResult, error) {
349363
w.nodeIdx++
350364
n := w.pickNode(w.queryInfo, w.nodeIdx)
351365
if n == nil {
366+
if lastErr == nil {
367+
return transport.QueryResult{}, ErrNoConnection
368+
}
352369
return transport.QueryResult{}, lastErr
353370
}
354371

355-
w.conn = n.Conn(w.queryInfo)
372+
w.conn, w.connErr = n.Conn(w.queryInfo)
356373
}
357374
}

session.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ var (
5656
"SERIAL Consistency = 0x0008\n" +
5757
"LOCALSERIAL Consistency = 0x0009\n" +
5858
"LOCALONE Consistency = 0x000A")
59-
errNoConnection = fmt.Errorf("no working connection")
59+
ErrNoConnection = fmt.Errorf("no connection to execute the query on")
6060
)
6161

6262
type Compression = frame.Compression

transport/node.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,10 @@ func (n *Node) Close() {
5353
n.setStatus(statusDown)
5454
}
5555

56-
func (n *Node) LeastBusyConn() *Conn {
56+
func (n *Node) LeastBusyConn() (*Conn, error) {
5757
return n.pool.LeastBusyConn()
5858
}
59-
func (n *Node) Conn(qi QueryInfo) *Conn {
59+
func (n *Node) Conn(qi QueryInfo) (*Conn, error) {
6060
if qi.tokenAware {
6161
return n.pool.Conn(qi.token)
6262
}
@@ -65,7 +65,11 @@ func (n *Node) Conn(qi QueryInfo) *Conn {
6565
}
6666

6767
func (n *Node) Prepare(ctx context.Context, s Statement) (Statement, error) {
68-
return n.LeastBusyConn().Prepare(ctx, s)
68+
conn, err := n.LeastBusyConn()
69+
if err != nil {
70+
return Statement{}, err
71+
}
72+
return conn.Prepare(ctx, s)
6973
}
7074

7175
var versionQuery = Statement{
@@ -74,7 +78,11 @@ var versionQuery = Statement{
7478
}
7579

7680
func (n *Node) FetchSchemaVersion(ctx context.Context) (frame.UUID, error) {
77-
conn := n.LeastBusyConn()
81+
conn, err := n.LeastBusyConn()
82+
if err != nil {
83+
return frame.UUID{}, err
84+
}
85+
7886
res, err := conn.Query(ctx, versionQuery, nil)
7987
if err != nil {
8088
return frame.UUID{}, err

transport/pool.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,13 @@ func (p *ConnPool) String() string {
4141
return fmt.Sprintf("pool %s [shards=%d]", p.host, p.nrShards)
4242
}
4343

44-
func (p *ConnPool) Conn(token Token) *Conn {
44+
func (p *ConnPool) Conn(token Token) (*Conn, error) {
4545
idx := p.shardOf(token)
4646
if conn := p.loadConn(idx); conn != nil {
4747
if isHeavyLoaded(conn) {
48-
return p.maybeReplaceWithLessBusyConn(conn)
48+
return p.maybeReplaceWithLessBusyConn(conn), nil
4949
}
50-
return conn
50+
return conn, nil
5151
}
5252
return p.LeastBusyConn()
5353
}
@@ -57,7 +57,7 @@ func isHeavyLoaded(conn *Conn) bool {
5757
}
5858

5959
func (p *ConnPool) maybeReplaceWithLessBusyConn(conn *Conn) *Conn {
60-
if lb := p.LeastBusyConn(); conn.Waiting()-lb.Waiting() > maxStreamID<<1/10 {
60+
if lb, err := p.LeastBusyConn(); err == nil && conn.Waiting()-lb.Waiting() > maxStreamID<<1/10 {
6161
if p.connObs != nil {
6262
p.connObs.OnPickReplacedWithLessBusyConn(conn.Event())
6363
}
@@ -66,7 +66,7 @@ func (p *ConnPool) maybeReplaceWithLessBusyConn(conn *Conn) *Conn {
6666
return conn
6767
}
6868

69-
func (p *ConnPool) LeastBusyConn() *Conn {
69+
func (p *ConnPool) LeastBusyConn() (*Conn, error) {
7070
var (
7171
leastBusyConn *Conn
7272
minBusy = maxStreamID + 2 // adding 2 more is required due to atomics
@@ -80,7 +80,11 @@ func (p *ConnPool) LeastBusyConn() *Conn {
8080
}
8181
}
8282
}
83-
return leastBusyConn
83+
84+
if leastBusyConn == nil {
85+
return nil, fmt.Errorf("no connections available for host %s", p.host)
86+
}
87+
return leastBusyConn, nil
8488
}
8589

8690
func (p *ConnPool) shardOf(token Token) int {

transport/pool_integration_test.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,21 +72,27 @@ func TestConnPoolConnIntegration(t *testing.T) {
7272
defer p.Close()
7373

7474
t0 := MurmurToken([]byte(""))
75-
if conn := p.Conn(t0); conn == nil || conn.Shard() != 0 {
75+
if conn, err := p.Conn(t0); err != nil || conn.Shard() != 0 {
7676
t.Fatal("invalid return of Conn")
7777
}
7878

7979
load := uint32(math.Floor(maxStreamID/2 + 1))
80-
p.Conn(t0).stats.inQueue.Store(load)
8180

82-
if conn := p.Conn(t0); conn == nil {
81+
conn, err := p.Conn(t0)
82+
if err != nil {
83+
t.Fatal("invalid return of Conn")
84+
}
85+
86+
conn.stats.inQueue.Store(load)
87+
88+
if conn, err := p.Conn(t0); err != nil {
8389
t.Fatal("invalid return of Conn")
8490
} else if conn.Shard() == 0 {
8591
t.Fatalf("invalid load distribution")
8692
}
8793

8894
t1 := MurmurToken([]byte("0")) // Very big number approx. 3 * 10^18.
89-
if conn := p.Conn(t1); conn == nil {
95+
if _, err := p.Conn(t1); err != nil {
9096
t.Fatal("invalid return of Conn")
9197
}
9298
}

0 commit comments

Comments
 (0)