Skip to content

Commit c1e46be

Browse files
committed
CASSGO-4 SetHost API for Query
This patch provides mechanism that allows users to specify on which node the query will be executed. It is not a tipycal use case, but it makes sense with virtual tables which are available since C* 5.0.0. Patch by Bohdan Siryk; Reviewed by <reviewers> for CASSGO-4
1 parent 109a892 commit c1e46be

File tree

3 files changed

+84
-5
lines changed

3 files changed

+84
-5
lines changed

cassandra_test.go

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3303,7 +3303,6 @@ func TestUnsetColBatch(t *testing.T) {
33033303
}
33043304
var id, mInt, count int
33053305
var mText string
3306-
33073306
if err := session.Query("SELECT count(*) FROM gocql_test.batchUnsetInsert;").Scan(&count); err != nil {
33083307
t.Fatalf("Failed to select with err: %v", err)
33093308
} else if count != 2 {
@@ -3338,3 +3337,35 @@ func TestQuery_NamedValues(t *testing.T) {
33383337
t.Fatal(err)
33393338
}
33403339
}
3340+
3341+
func TestQuery_SetHost(t *testing.T) {
3342+
// This test ensures that queries are sent to the specified host only
3343+
3344+
session := createSession(t)
3345+
defer session.Close()
3346+
3347+
hosts, err := session.GetHosts()
3348+
if err != nil {
3349+
t.Fatal(err)
3350+
}
3351+
3352+
for _, expectedHost := range hosts {
3353+
const iterations = 5
3354+
for i := 0; i < iterations; i++ {
3355+
var actualHostID string
3356+
err := session.Query("SELECT host_id FROM system.local").
3357+
SetHost(expectedHost).
3358+
Scan(&actualHostID)
3359+
if err != nil {
3360+
t.Fatal(err)
3361+
}
3362+
3363+
if expectedHost.HostID() != actualHostID {
3364+
t.Fatalf("Expected query to be executed on host %s, but it was executed on %s",
3365+
expectedHost.HostID(),
3366+
actualHostID,
3367+
)
3368+
}
3369+
}
3370+
}
3371+
}

query_executor.go

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ type ExecutableQuery interface {
4141
Keyspace() string
4242
Table() string
4343
IsIdempotent() bool
44+
GetHost() *HostInfo
4445

4546
withContext(context.Context) ExecutableQuery
4647

@@ -83,12 +84,27 @@ func (q *queryExecutor) speculate(ctx context.Context, qry ExecutableQuery, sp S
8384
}
8485

8586
func (q *queryExecutor) executeQuery(qry ExecutableQuery) (*Iter, error) {
86-
hostIter := q.policy.Pick(qry)
87+
var hostIter NextHost
88+
89+
// checking if the host is specified for the query,
90+
// if it is, the query should be executed at the specified host
91+
host := qry.GetHost()
92+
if host != nil {
93+
hostIter = func() SelectedHost {
94+
return (*selectedHost)(host)
95+
}
96+
}
97+
98+
// if host is not specified for the query,
99+
// then a host will be picked by HostSelectionPolicy
100+
if hostIter == nil {
101+
hostIter = q.policy.Pick(qry)
102+
}
87103

88104
// check if the query is not marked as idempotent, if
89105
// it is, we force the policy to NonSpeculative
90106
sp := qry.speculativeExecutionPolicy()
91-
if !qry.IsIdempotent() || sp.Attempts() == 0 {
107+
if host != nil || !qry.IsIdempotent() || sp.Attempts() == 0 {
92108
return q.do(qry.Context(), qry, hostIter), nil
93109
}
94110

@@ -129,12 +145,13 @@ func (q *queryExecutor) executeQuery(qry ExecutableQuery) (*Iter, error) {
129145
func (q *queryExecutor) do(ctx context.Context, qry ExecutableQuery, hostIter NextHost) *Iter {
130146
selectedHost := hostIter()
131147
rt := qry.retryPolicy()
148+
specifiedHost := qry.GetHost()
132149

133150
var lastErr error
134151
var iter *Iter
135152
for selectedHost != nil {
136153
host := selectedHost.Info()
137-
if host == nil || !host.IsUp() {
154+
if (host == nil || !host.IsUp()) && specifiedHost == nil {
138155
selectedHost = hostIter()
139156
continue
140157
}
@@ -166,7 +183,9 @@ func (q *queryExecutor) do(ctx context.Context, qry ExecutableQuery, hostIter Ne
166183

167184
// Exit if the query was successful
168185
// or no retry policy defined or retry attempts were reached
169-
if iter.err == nil || !qry.IsIdempotent() || rt == nil || !rt.Attempt(qry) {
186+
// Also, if there is specified host for the query to be executed on
187+
// and query execution is failed we should exit
188+
if iter.err == nil || specifiedHost != nil || !qry.IsIdempotent() || rt == nil || !rt.Attempt(qry) {
170189
return iter
171190
}
172191
lastErr = iter.err

session.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -936,6 +936,10 @@ type Query struct {
936936

937937
// routingInfo is a pointer because Query can be copied and copyable struct can't hold a mutex.
938938
routingInfo *queryRoutingInfo
939+
940+
// host specifies the host on which the query should be executed.
941+
// If it is nil, then the host is picked by HostSelectionPolicy
942+
host *HostInfo
939943
}
940944

941945
type queryRoutingInfo struct {
@@ -1423,6 +1427,18 @@ func (q *Query) releaseAfterExecution() {
14231427
q.decRefCount()
14241428
}
14251429

1430+
// SetHosts allows to define on which host the query should be executed.
1431+
// If host == nil, then the HostSelectionPolicy will be used to pick a host.
1432+
func (q *Query) SetHost(host *HostInfo) *Query {
1433+
q.host = host
1434+
return q
1435+
}
1436+
1437+
// GetHost returns host on which query should be executed.
1438+
func (q *Query) GetHost() *HostInfo {
1439+
return q.host
1440+
}
1441+
14261442
// Iter represents an iterator that can be used to iterate over all rows that
14271443
// were returned by a query. The iterator might send additional queries to the
14281444
// database during the iteration if paging was enabled.
@@ -2030,6 +2046,10 @@ func (b *Batch) releaseAfterExecution() {
20302046
// that would race with speculative executions.
20312047
}
20322048

2049+
func (b *Batch) GetHost() *HostInfo {
2050+
return nil
2051+
}
2052+
20332053
type BatchType byte
20342054

20352055
const (
@@ -2162,6 +2182,15 @@ func (t *traceWriter) Trace(traceId []byte) {
21622182
}
21632183
}
21642184

2185+
// GetHosts returns a list of hosts found via queries to system.local and system.peers
2186+
func (s *Session) GetHosts() ([]*HostInfo, error) {
2187+
hosts, _, err := s.hostSource.GetHosts()
2188+
if err != nil {
2189+
return nil, err
2190+
}
2191+
return hosts, nil
2192+
}
2193+
21652194
type ObservedQuery struct {
21662195
Keyspace string
21672196
Statement string

0 commit comments

Comments
 (0)