Skip to content

Commit 962d4d0

Browse files
committed
Endless query execution fix
1 parent c75ff5f commit 962d4d0

File tree

4 files changed

+16
-12
lines changed

4 files changed

+16
-12
lines changed

events_ccm_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ func TestEventNodeDownControl(t *testing.T) {
104104
}
105105
session.pool.mu.RUnlock()
106106

107-
host := session.ring.getHost(node.Addr)
107+
host, _ := session.ring.getHost(node.Addr)
108108
if host == nil {
109109
t.Fatal("node not in metadata ring")
110110
} else if host.IsUp() {
@@ -146,7 +146,7 @@ func TestEventNodeDown(t *testing.T) {
146146
t.Fatal("node not removed after remove event")
147147
}
148148

149-
host := session.ring.getHost(node.Addr)
149+
host, _ := session.ring.getHost(node.Addr)
150150
if host == nil {
151151
t.Fatal("node not in metadata ring")
152152
} else if host.IsUp() {
@@ -203,7 +203,7 @@ func TestEventNodeUp(t *testing.T) {
203203
t.Fatal("node not added after node added event")
204204
}
205205

206-
host := session.ring.getHost(node.Addr)
206+
host, _ := session.ring.getHost(node.Addr)
207207
if host == nil {
208208
t.Fatal("node not in metadata ring")
209209
} else if !host.IsUp() {

policies.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,7 @@ func (host *selectedHost) Info() *HostInfo {
323323
func (host *selectedHost) Mark(err error) {}
324324

325325
// NextHost is an iteration function over picked hosts
326+
// Should return nil eventually to prevent endless query execution.
326327
type NextHost func() SelectedHost
327328

328329
// RoundRobinHostPolicy is a round-robin load balancing policy, where each host

query_executor.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ package gocql
2727
import (
2828
"context"
2929
"sync"
30+
"sync/atomic"
3031
"time"
3132
)
3233

@@ -89,14 +90,16 @@ func (q *queryExecutor) executeQuery(qry ExecutableQuery) (*Iter, error) {
8990
// check if the host id is specified for the query,
9091
// if it is, the query should be executed at the corresponding host.
9192
if hostID := qry.GetHostID(); hostID != "" {
93+
host, ok := q.pool.session.ring.getHost(hostID)
94+
if !ok {
95+
return nil, ErrNoConnections
96+
}
97+
var returnedHostOnce int32 = 0
9298
hostIter = func() SelectedHost {
93-
pool, ok := q.pool.getPoolByHostID(hostID)
94-
// if the specified host is down
95-
// we return nil to avoid endless query execution in queryExecutor.do()
96-
if !ok || !pool.host.IsUp() {
97-
return nil
99+
if atomic.CompareAndSwapInt32(&returnedHostOnce, 0, 1) {
100+
return (*selectedHost)(host)
98101
}
99-
return (*selectedHost)(pool.host)
102+
return nil
100103
}
101104
}
102105

ring.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,11 +67,11 @@ func (r *ring) getHostByIP(ip string) (*HostInfo, bool) {
6767
return r.hosts[hi], ok
6868
}
6969

70-
func (r *ring) getHost(hostID string) *HostInfo {
70+
func (r *ring) getHost(hostID string) (host *HostInfo, ok bool) {
7171
r.mu.RLock()
72-
host := r.hosts[hostID]
72+
host, ok = r.hosts[hostID]
7373
r.mu.RUnlock()
74-
return host
74+
return
7575
}
7676

7777
func (r *ring) allHosts() []*HostInfo {

0 commit comments

Comments
 (0)