Skip to content

Commit 63c6f5d

Browse files
committed
SetHost API for Query
Query.SetHost() allows users to specify on which node the Query will be executed. It is not a tipycal use case, but it makes sense with virtual tables which are available since C* 5.0.0. Patch by Bohdan Siryk; Reviewed by João Reis for CASSGO-4
1 parent 37030fb commit 63c6f5d

File tree

4 files changed

+103
-6
lines changed

4 files changed

+103
-6
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
88

99
### Added
1010

11+
- SetHost API for Query (CASSGO-4)
12+
1113
### Changed
1214

1315
- Don't restrict server authenticator unless PasswordAuthentictor.AllowedAuthenticators is provided (CASSGO-19)

cassandra_test.go

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ import (
3232
"context"
3333
"errors"
3434
"fmt"
35-
"github.com/stretchr/testify/require"
3635
"io"
3736
"math"
3837
"math/big"
@@ -45,6 +44,8 @@ import (
4544
"time"
4645
"unicode"
4746

47+
"github.com/stretchr/testify/require"
48+
4849
"gopkg.in/inf.v0"
4950
)
5051

@@ -3303,7 +3304,6 @@ func TestUnsetColBatch(t *testing.T) {
33033304
}
33043305
var id, mInt, count int
33053306
var mText string
3306-
33073307
if err := session.Query("SELECT count(*) FROM gocql_test.batchUnsetInsert;").Scan(&count); err != nil {
33083308
t.Fatalf("Failed to select with err: %v", err)
33093309
} else if count != 2 {
@@ -3338,3 +3338,46 @@ func TestQuery_NamedValues(t *testing.T) {
33383338
t.Fatal(err)
33393339
}
33403340
}
3341+
3342+
func TestQuery_SetHost(t *testing.T) {
3343+
// This test ensures that queries are sent to the specified host only
3344+
3345+
session := createSession(t)
3346+
defer session.Close()
3347+
3348+
hosts, err := session.GetHosts()
3349+
if err != nil {
3350+
t.Fatal(err)
3351+
}
3352+
3353+
for _, expectedHost := range hosts {
3354+
const iterations = 5
3355+
for i := 0; i < iterations; i++ {
3356+
var actualHostID string
3357+
err := session.Query("SELECT host_id FROM system.local").
3358+
SetHost(expectedHost).
3359+
Scan(&actualHostID)
3360+
if err != nil {
3361+
t.Fatal(err)
3362+
}
3363+
3364+
if expectedHost.HostID() != actualHostID {
3365+
t.Fatalf("Expected query to be executed on host %s, but it was executed on %s",
3366+
expectedHost.HostID(),
3367+
actualHostID,
3368+
)
3369+
}
3370+
}
3371+
}
3372+
3373+
// ensuring that the driver properly handles the case
3374+
// when specified host for the query is down
3375+
host := hosts[0]
3376+
host.state = NodeDown
3377+
err = session.Query("SELECT host_id FROM system.local").
3378+
SetHost(host).
3379+
Exec()
3380+
if !errors.Is(err, ErrNoConnections) {
3381+
t.Fatalf("Expected error to be: %v, but got %v", ErrNoConnections, err)
3382+
}
3383+
}

query_executor.go

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ type ExecutableQuery interface {
4141
Keyspace() string
4242
Table() string
4343
IsIdempotent() bool
44+
GetHost() *HostInfo
4445

4546
withContext(context.Context) ExecutableQuery
4647

@@ -83,12 +84,27 @@ func (q *queryExecutor) speculate(ctx context.Context, qry ExecutableQuery, sp S
8384
}
8485

8586
func (q *queryExecutor) executeQuery(qry ExecutableQuery) (*Iter, error) {
86-
hostIter := q.policy.Pick(qry)
87+
var hostIter NextHost
88+
89+
// checking if the host is specified for the query,
90+
// if it is, the query should be executed at the specified host
91+
host := qry.GetHost()
92+
if host != nil {
93+
hostIter = func() SelectedHost {
94+
return (*selectedHost)(host)
95+
}
96+
}
97+
98+
// if host is not specified for the query,
99+
// then a host will be picked by HostSelectionPolicy
100+
if hostIter == nil {
101+
hostIter = q.policy.Pick(qry)
102+
}
87103

88104
// check if the query is not marked as idempotent, if
89105
// it is, we force the policy to NonSpeculative
90106
sp := qry.speculativeExecutionPolicy()
91-
if !qry.IsIdempotent() || sp.Attempts() == 0 {
107+
if host != nil || !qry.IsIdempotent() || sp.Attempts() == 0 {
92108
return q.do(qry.Context(), qry, hostIter), nil
93109
}
94110

@@ -129,12 +145,17 @@ func (q *queryExecutor) executeQuery(qry ExecutableQuery) (*Iter, error) {
129145
func (q *queryExecutor) do(ctx context.Context, qry ExecutableQuery, hostIter NextHost) *Iter {
130146
selectedHost := hostIter()
131147
rt := qry.retryPolicy()
148+
specifiedHost := qry.GetHost()
132149

133150
var lastErr error
134151
var iter *Iter
135152
for selectedHost != nil {
136153
host := selectedHost.Info()
137-
if host == nil || !host.IsUp() {
154+
if specifiedHost != nil && host != nil && !host.IsUp() {
155+
return &Iter{err: ErrNoConnections}
156+
}
157+
158+
if (host == nil || !host.IsUp()) && specifiedHost == nil {
138159
selectedHost = hostIter()
139160
continue
140161
}
@@ -166,7 +187,9 @@ func (q *queryExecutor) do(ctx context.Context, qry ExecutableQuery, hostIter Ne
166187

167188
// Exit if the query was successful
168189
// or query is not idempotent or no retry policy defined
169-
if iter.err == nil || !qry.IsIdempotent() || rt == nil {
190+
// Also, if there is specified host for the query to be executed on
191+
// and query execution is failed we should exit
192+
if iter.err == nil || specifiedHost != nil || !qry.IsIdempotent() || rt == nil {
170193
return iter
171194
}
172195

session.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -943,6 +943,10 @@ type Query struct {
943943

944944
// routingInfo is a pointer because Query can be copied and copyable struct can't hold a mutex.
945945
routingInfo *queryRoutingInfo
946+
947+
// host specifies the host on which the query should be executed.
948+
// If it is nil, then the host is picked by HostSelectionPolicy
949+
host *HostInfo
946950
}
947951

948952
type queryRoutingInfo struct {
@@ -1430,6 +1434,18 @@ func (q *Query) releaseAfterExecution() {
14301434
q.decRefCount()
14311435
}
14321436

1437+
// SetHosts allows to define on which host the query should be executed.
1438+
// If host == nil, then the HostSelectionPolicy will be used to pick a host.
1439+
func (q *Query) SetHost(host *HostInfo) *Query {
1440+
q.host = host
1441+
return q
1442+
}
1443+
1444+
// GetHost returns host on which query should be executed.
1445+
func (q *Query) GetHost() *HostInfo {
1446+
return q.host
1447+
}
1448+
14331449
// Iter represents an iterator that can be used to iterate over all rows that
14341450
// were returned by a query. The iterator might send additional queries to the
14351451
// database during the iteration if paging was enabled.
@@ -2045,6 +2061,10 @@ func (b *Batch) releaseAfterExecution() {
20452061
// that would race with speculative executions.
20462062
}
20472063

2064+
func (b *Batch) GetHost() *HostInfo {
2065+
return nil
2066+
}
2067+
20482068
type BatchType byte
20492069

20502070
const (
@@ -2177,6 +2197,15 @@ func (t *traceWriter) Trace(traceId []byte) {
21772197
}
21782198
}
21792199

2200+
// GetHosts returns a list of hosts found via queries to system.local and system.peers
2201+
func (s *Session) GetHosts() ([]*HostInfo, error) {
2202+
hosts, _, err := s.hostSource.GetHosts()
2203+
if err != nil {
2204+
return nil, err
2205+
}
2206+
return hosts, nil
2207+
}
2208+
21802209
type ObservedQuery struct {
21812210
Keyspace string
21822211
Statement string

0 commit comments

Comments
 (0)