Skip to content
This repository was archived by the owner on Jul 21, 2025. It is now read-only.

Commit 438b792

Browse files
committed
query: Exec, Prepare, AsyncExec and Iter now take context as an argument
Query execution methods now take a context argument, when this context is done before sending the query, send will be cancelled, this is done by: - preventing allocation of streamID's for requests with done contexts in sendController() - passing the context to connWriter.loop() through requestCh - stopping already serialized requests from being sent in send() and compress() if context is done, those functions now return a skippedError. - connWriter.loop() now skips a request on skippedError, freeing its streamID through a callback to connReader.freeStream(). Fixes #250
1 parent 280bc13 commit 438b792

File tree

12 files changed

+290
-160
lines changed

12 files changed

+290
-160
lines changed

experiments/cmd/benchtab/main.go

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func main() {
4444
if err != nil {
4545
log.Fatal(err)
4646
}
47-
initKeyspaceAndTable(initSession, config.keyspace)
47+
initKeyspaceAndTable(ctx, initSession, config.keyspace)
4848
initSession.Close()
4949
}
5050

@@ -53,21 +53,21 @@ func main() {
5353
if err != nil {
5454
log.Fatal(err)
5555
}
56-
truncateTable(session)
56+
truncateTable(ctx, session)
5757

5858
if config.workload == Selects && !config.dontPrepare {
59-
initSelectsBenchmark(session, config)
59+
initSelectsBenchmark(ctx, session, config)
6060
}
6161

6262
if config.async {
63-
asyncBenchmark(&config, session)
63+
asyncBenchmark(ctx, &config, session)
6464
} else {
65-
benchmark(&config, session)
65+
benchmark(ctx, &config, session)
6666
}
6767
}
6868

6969
// benchmark is the same as in gocql.
70-
func benchmark(config *Config, session *scylla.Session) {
70+
func benchmark(ctx context.Context, config *Config, session *scylla.Session) {
7171
var wg sync.WaitGroup
7272
nextBatchStart := -config.batchSize
7373

@@ -78,11 +78,11 @@ func benchmark(config *Config, session *scylla.Session) {
7878
wg.Add(1)
7979
go func() {
8080
defer wg.Done()
81-
insertQ, err := session.Prepare(insertStmt)
81+
insertQ, err := session.Prepare(ctx, insertStmt)
8282
if err != nil {
8383
log.Fatal(err)
8484
}
85-
selectQ, err := session.Prepare(selectStmt)
85+
selectQ, err := session.Prepare(ctx, selectStmt)
8686
if err != nil {
8787
log.Fatal(err)
8888
}
@@ -98,15 +98,15 @@ func benchmark(config *Config, session *scylla.Session) {
9898

9999
for pk := curBatchStart; pk < curBatchEnd; pk++ {
100100
if config.workload == Inserts || config.workload == Mixed {
101-
_, err := insertQ.BindInt64(0, pk).BindInt64(1, 2*pk).BindInt64(2, 3*pk).Exec()
101+
_, err := insertQ.BindInt64(0, pk).BindInt64(1, 2*pk).BindInt64(2, 3*pk).Exec(ctx)
102102
if err != nil {
103103
panic(err)
104104
}
105105
}
106106

107107
if config.workload == Selects || config.workload == Mixed {
108108
var v1, v2 int64
109-
res, err := selectQ.BindInt64(0, pk).Exec()
109+
res, err := selectQ.BindInt64(0, pk).Exec(ctx)
110110
if err != nil {
111111
panic(err)
112112
}
@@ -133,7 +133,7 @@ func benchmark(config *Config, session *scylla.Session) {
133133
log.Printf("Finished\nBenchmark time: %d ms\n", benchTime.Milliseconds())
134134
}
135135

136-
func asyncBenchmark(config *Config, session *scylla.Session) {
136+
func asyncBenchmark(ctx context.Context, config *Config, session *scylla.Session) {
137137
var wg sync.WaitGroup
138138
nextBatchStart := -config.batchSize
139139

@@ -145,11 +145,11 @@ func asyncBenchmark(config *Config, session *scylla.Session) {
145145
go func() {
146146
defer wg.Done()
147147

148-
insertQ, err := session.Prepare(insertStmt)
148+
insertQ, err := session.Prepare(ctx, insertStmt)
149149
if err != nil {
150150
log.Fatal(err)
151151
}
152-
selectQ, err := session.Prepare(selectStmt)
152+
selectQ, err := session.Prepare(ctx, selectStmt)
153153
if err != nil {
154154
log.Fatal(err)
155155
}
@@ -164,11 +164,11 @@ func asyncBenchmark(config *Config, session *scylla.Session) {
164164
curBatchEnd := min(curBatchStart+config.batchSize, config.tasks)
165165

166166
if config.workload == Inserts || config.workload == Mixed {
167-
asyncInserts(&insertQ, curBatchStart, curBatchEnd)
167+
asyncInserts(ctx, &insertQ, curBatchStart, curBatchEnd)
168168
}
169169

170170
if config.workload == Selects || config.workload == Mixed {
171-
asyncSelects(&selectQ, curBatchStart, curBatchEnd)
171+
asyncSelects(ctx, &selectQ, curBatchStart, curBatchEnd)
172172
}
173173
}
174174
}()
@@ -179,12 +179,12 @@ func asyncBenchmark(config *Config, session *scylla.Session) {
179179
log.Printf("Finished\nBenchmark time: %d ms\n", benchTime.Milliseconds())
180180
}
181181

182-
func asyncInserts(insertQ *scylla.Query, curBatchStart, curBatchEnd int64) {
182+
func asyncInserts(ctx context.Context, insertQ *scylla.Query, curBatchStart, curBatchEnd int64) {
183183
for pk := curBatchStart; pk < curBatchEnd; pk++ {
184184
insertQ.BindInt64(0, pk)
185185
insertQ.BindInt64(1, 2*pk)
186186
insertQ.BindInt64(2, 3*pk)
187-
insertQ.AsyncExec()
187+
insertQ.AsyncExec(ctx)
188188
}
189189
for pk := curBatchStart; pk < curBatchEnd; pk++ {
190190
if _, err := insertQ.Fetch(); err != nil {
@@ -193,10 +193,10 @@ func asyncInserts(insertQ *scylla.Query, curBatchStart, curBatchEnd int64) {
193193
}
194194
}
195195

196-
func asyncSelects(selectQ *scylla.Query, curBatchStart, curBatchEnd int64) {
196+
func asyncSelects(ctx context.Context, selectQ *scylla.Query, curBatchStart, curBatchEnd int64) {
197197
for pk := curBatchStart; pk < curBatchEnd; pk++ {
198198
selectQ.BindInt64(0, pk)
199-
selectQ.AsyncExec()
199+
selectQ.AsyncExec(ctx)
200200
}
201201
for pk := curBatchStart; pk < curBatchEnd; pk++ {
202202
res, err := selectQ.Fetch()
@@ -222,31 +222,31 @@ func asyncSelects(selectQ *scylla.Query, curBatchStart, curBatchEnd int64) {
222222
}
223223
}
224224

225-
func truncateTable(session *scylla.Session) {
225+
func truncateTable(ctx context.Context, session *scylla.Session) {
226226
q := session.Query("TRUNCATE TABLE benchtab")
227-
if _, err := q.Exec(); err != nil {
227+
if _, err := q.Exec(ctx); err != nil {
228228
log.Fatal(err)
229229
}
230230
}
231231

232-
func initKeyspaceAndTable(session *scylla.Session, ks string) {
232+
func initKeyspaceAndTable(ctx context.Context, session *scylla.Session, ks string) {
233233
q := session.Query("DROP KEYSPACE IF EXISTS " + ks)
234-
if _, err := q.Exec(); err != nil {
234+
if _, err := q.Exec(ctx); err != nil {
235235
log.Fatal(err)
236236
}
237237

238238
q = session.Query("CREATE KEYSPACE " + ks + " WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 1}")
239-
if _, err := q.Exec(); err != nil {
239+
if _, err := q.Exec(ctx); err != nil {
240240
log.Fatal(err)
241241
}
242242

243243
q = session.Query("CREATE TABLE " + ks + ".benchtab (pk bigint PRIMARY KEY, v1 bigint, v2 bigint)")
244-
if _, err := q.Exec(); err != nil {
244+
if _, err := q.Exec(ctx); err != nil {
245245
log.Fatal(err)
246246
}
247247
}
248248

249-
func initSelectsBenchmark(session *scylla.Session, config Config) {
249+
func initSelectsBenchmark(ctx context.Context, session *scylla.Session, config Config) {
250250
log.Println("inserting values...")
251251

252252
var wg sync.WaitGroup
@@ -257,7 +257,7 @@ func initSelectsBenchmark(session *scylla.Session, config Config) {
257257
go func() {
258258
defer wg.Done()
259259

260-
insertQ, err := session.Prepare(insertStmt)
260+
insertQ, err := session.Prepare(ctx, insertStmt)
261261
if err != nil {
262262
log.Fatal(err)
263263
}
@@ -275,7 +275,7 @@ func initSelectsBenchmark(session *scylla.Session, config Config) {
275275
insertQ.BindInt64(0, pk)
276276
insertQ.BindInt64(1, 2*pk)
277277
insertQ.BindInt64(2, 3*pk)
278-
if _, err := insertQ.Exec(); err != nil {
278+
if _, err := insertQ.Exec(ctx); err != nil {
279279
log.Fatal(err)
280280
}
281281
}

query.go

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package scylla
22

33
import (
4+
"context"
45
"fmt"
56

67
"github.com/scylladb/scylla-go-driver/frame"
@@ -11,18 +12,18 @@ type Query struct {
1112
session *Session
1213
stmt transport.Statement
1314
buf frame.Buffer
14-
exec func(*transport.Conn, transport.Statement, frame.Bytes) (transport.QueryResult, error)
15-
asyncExec func(*transport.Conn, transport.Statement, frame.Bytes, transport.ResponseHandler)
15+
exec func(context.Context, *transport.Conn, transport.Statement, frame.Bytes) (transport.QueryResult, error)
16+
asyncExec func(context.Context, *transport.Conn, transport.Statement, frame.Bytes, transport.ResponseHandler)
1617
res []transport.ResponseHandler
1718
}
1819

19-
func (q *Query) Exec() (Result, error) {
20+
func (q *Query) Exec(ctx context.Context) (Result, error) {
2021
conn, err := q.pickConn()
2122
if err != nil {
2223
return Result{}, err
2324
}
2425

25-
res, err := q.exec(conn, q.stmt, nil)
26+
res, err := q.exec(ctx, conn, q.stmt, nil)
2627
return Result(res), err
2728
}
2829

@@ -47,7 +48,7 @@ func (q *Query) pickConn() (*transport.Conn, error) {
4748
return conn, nil
4849
}
4950

50-
func (q *Query) AsyncExec() {
51+
func (q *Query) AsyncExec(ctx context.Context) {
5152
stmt := q.stmt.Clone()
5253

5354
conn, err := q.pickConn()
@@ -58,7 +59,7 @@ func (q *Query) AsyncExec() {
5859

5960
h := transport.MakeResponseHandler()
6061
q.res = append(q.res, h)
61-
q.asyncExec(conn, stmt, nil, h)
62+
q.asyncExec(ctx, conn, stmt, nil, h)
6263
}
6364

6465
var ErrNoQueryResults = fmt.Errorf("no query results to be fetched")
@@ -148,7 +149,7 @@ func (q *Query) Compression() bool {
148149

149150
type Result transport.QueryResult
150151

151-
func (q *Query) Iter() Iter {
152+
func (q *Query) Iter(ctx context.Context) Iter {
152153
it := Iter{
153154
requestCh: make(chan struct{}, 1),
154155
nextCh: make(chan transport.QueryResult),
@@ -171,7 +172,7 @@ func (q *Query) Iter() Iter {
171172
}
172173

173174
it.requestCh <- struct{}{}
174-
go worker.loop()
175+
go worker.loop(ctx)
175176
return it
176177
}
177178

@@ -232,21 +233,21 @@ type iterWorker struct {
232233
stmt transport.Statement
233234
conn *transport.Conn
234235
pagingState []byte
235-
queryExec func(*transport.Conn, transport.Statement, frame.Bytes) (transport.QueryResult, error)
236+
queryExec func(context.Context, *transport.Conn, transport.Statement, frame.Bytes) (transport.QueryResult, error)
236237

237238
requestCh chan struct{}
238239
nextCh chan transport.QueryResult
239240
errCh chan error
240241
}
241242

242-
func (w *iterWorker) loop() {
243+
func (w *iterWorker) loop(ctx context.Context) {
243244
for {
244245
_, ok := <-w.requestCh
245246
if !ok {
246247
return
247248
}
248249

249-
res, err := w.queryExec(w.conn, w.stmt, w.pagingState)
250+
res, err := w.queryExec(ctx, w.conn, w.stmt, w.pagingState)
250251
if err != nil {
251252
w.errCh <- err
252253
return

session.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -139,16 +139,16 @@ func NewSession(ctx context.Context, cfg SessionConfig) (*Session, error) {
139139
func (s *Session) Query(content string) Query {
140140
return Query{session: s,
141141
stmt: transport.Statement{Content: content, Consistency: s.cfg.DefaultConsistency},
142-
exec: func(conn *transport.Conn, stmt transport.Statement, pagingState frame.Bytes) (transport.QueryResult, error) {
143-
return conn.Query(stmt, pagingState)
142+
exec: func(ctx context.Context, conn *transport.Conn, stmt transport.Statement, pagingState frame.Bytes) (transport.QueryResult, error) {
143+
return conn.Query(ctx, stmt, pagingState)
144144
},
145-
asyncExec: func(conn *transport.Conn, stmt transport.Statement, pagingState frame.Bytes, handler transport.ResponseHandler) {
146-
conn.AsyncQuery(stmt, pagingState, handler)
145+
asyncExec: func(ctx context.Context, conn *transport.Conn, stmt transport.Statement, pagingState frame.Bytes, handler transport.ResponseHandler) {
146+
conn.AsyncQuery(ctx, stmt, pagingState, handler)
147147
},
148148
}
149149
}
150150

151-
func (s *Session) Prepare(content string) (Query, error) {
151+
func (s *Session) Prepare(ctx context.Context, content string) (Query, error) {
152152
stmt := transport.Statement{Content: content, Consistency: frame.ALL}
153153

154154
// Prepare on all nodes concurrently.
@@ -160,7 +160,7 @@ func (s *Session) Prepare(content string) (Query, error) {
160160
wg.Add(1)
161161
go func(idx int) {
162162
defer wg.Done()
163-
resStmt[idx], resErr[idx] = nodes[idx].Prepare(stmt)
163+
resStmt[idx], resErr[idx] = nodes[idx].Prepare(ctx, stmt)
164164
}(i)
165165
}
166166
wg.Wait()
@@ -171,11 +171,11 @@ func (s *Session) Prepare(content string) (Query, error) {
171171
return Query{
172172
session: s,
173173
stmt: resStmt[i],
174-
exec: func(conn *transport.Conn, stmt transport.Statement, pagingState frame.Bytes) (transport.QueryResult, error) {
175-
return conn.Execute(stmt, pagingState)
174+
exec: func(ctx context.Context, conn *transport.Conn, stmt transport.Statement, pagingState frame.Bytes) (transport.QueryResult, error) {
175+
return conn.Execute(ctx, stmt, pagingState)
176176
},
177-
asyncExec: func(conn *transport.Conn, stmt transport.Statement, pagingState frame.Bytes, handler transport.ResponseHandler) {
178-
conn.AsyncExecute(stmt, pagingState, handler)
177+
asyncExec: func(ctx context.Context, conn *transport.Conn, stmt transport.Statement, pagingState frame.Bytes, handler transport.ResponseHandler) {
178+
conn.AsyncExecute(ctx, stmt, pagingState, handler)
179179
},
180180
}, nil
181181
}

0 commit comments

Comments
 (0)