Skip to content

Commit ae7d1c6

Browse files
author
Henrik Johansson
authored
Merge pull request #116 from scylladb/backport-terminate-fix
Backport termination fix to 1.0
2 parents 74944b6 + 1362fbf commit ae7d1c6

File tree

3 files changed

+48
-25
lines changed

3 files changed

+48
-25
lines changed

Diff for: CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
## Unreleased
44

5+
- Ensure proper termination when errors happen.
56
- Fix mutation timestamps to match on system under test and test oracle.
67
- Gemini now timestamps errors for easier correlation.
78

Diff for: cmd/gemini/root.go

+24-15
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ func run(cmd *cobra.Command, args []string) {
170170
if verbose {
171171
fmt.Println(stmt)
172172
}
173-
if err := session.Mutate(stmt); err != nil {
173+
if err := session.Mutate(context.Background(), stmt); err != nil {
174174
fmt.Printf("%v", err)
175175
return
176176
}
@@ -180,7 +180,7 @@ func run(cmd *cobra.Command, args []string) {
180180
if verbose {
181181
fmt.Println(stmt)
182182
}
183-
if err := session.Mutate(stmt); err != nil {
183+
if err := session.Mutate(context.Background(), stmt); err != nil {
184184
fmt.Printf("%v", err)
185185
return
186186
}
@@ -229,37 +229,39 @@ func runJob(f testJob, schema *gemini.Schema, s *gemini.Session, mode string, ou
229229
for {
230230
select {
231231
case <-timer.C:
232+
cancelWorkers()
233+
testRes = drain(c, testRes)
232234
testRes.PrintResult(out)
233235
fmt.Println("Test run completed. Exiting.")
234-
cancelWorkers()
235236
return
236237
case <-reporterCtx.Done():
237-
testRes.PrintResult(out)
238238
return
239239
case res := <-c:
240240
testRes = res.Merge(&testRes)
241241
if sp != nil {
242242
sp.Suffix = fmt.Sprintf(" Running Gemini... %v", testRes)
243243
}
244244
if testRes.ReadErrors > 0 {
245-
testRes.PrintResult(out)
246-
fmt.Println(testRes.Errors)
247245
if failFast {
248246
fmt.Println("Error in data validation. Exiting.")
249247
cancelWorkers()
248+
testRes = drain(c, testRes)
249+
testRes.PrintResult(out)
250250
return
251251
}
252+
testRes.PrintResult(out)
252253
}
253254
}
254255
}
255256
}(duration)
256257

257258
workers.Wait()
259+
close(c)
258260
cancelReporter()
259261
reporter.Wait()
260262
}
261263

262-
func mutationJob(schema *gemini.Schema, table gemini.Table, s *gemini.Session, p gemini.PartitionRange, testStatus *Status, out *os.File) {
264+
func mutationJob(ctx context.Context, schema *gemini.Schema, table gemini.Table, s *gemini.Session, p gemini.PartitionRange, testStatus *Status, out *os.File) {
263265
mutateStmt, err := schema.GenMutateStmt(table, &p)
264266
if err != nil {
265267
fmt.Printf("Failed! Mutation statement generation failed: '%v'\n", err)
@@ -271,7 +273,7 @@ func mutationJob(schema *gemini.Schema, table gemini.Table, s *gemini.Session, p
271273
if verbose {
272274
fmt.Println(mutateStmt.PrettyCQL())
273275
}
274-
if err := s.Mutate(mutateQuery, mutateValues...); err != nil {
276+
if err := s.Mutate(ctx, mutateQuery, mutateValues...); err != nil {
275277
e := gemini.JobError{
276278
Timestamp: time.Now(),
277279
Message: "Mutation failed: " + err.Error(),
@@ -284,14 +286,14 @@ func mutationJob(schema *gemini.Schema, table gemini.Table, s *gemini.Session, p
284286
}
285287
}
286288

287-
func validationJob(schema *gemini.Schema, table gemini.Table, s *gemini.Session, p gemini.PartitionRange, testStatus *Status, out *os.File) {
289+
func validationJob(ctx context.Context, schema *gemini.Schema, table gemini.Table, s *gemini.Session, p gemini.PartitionRange, testStatus *Status, out *os.File) {
288290
checkStmt := schema.GenCheckStmt(table, &p)
289291
checkQuery := checkStmt.Query
290292
checkValues := checkStmt.Values()
291293
if verbose {
292294
fmt.Println(checkStmt.PrettyCQL())
293295
}
294-
if err := s.Check(table, checkQuery, checkValues...); err != nil {
296+
if err := s.Check(ctx, table, checkQuery, checkValues...); err != nil {
295297
// De-duplication needed?
296298
e := gemini.JobError{
297299
Timestamp: time.Now(),
@@ -318,23 +320,23 @@ func Job(ctx context.Context, wg *sync.WaitGroup, schema *gemini.Schema, table g
318320
}
319321
switch mode {
320322
case writeMode:
321-
mutationJob(schema, table, s, p, &testStatus, out)
323+
mutationJob(ctx, schema, table, s, p, &testStatus, out)
322324
case readMode:
323-
validationJob(schema, table, s, p, &testStatus, out)
325+
validationJob(ctx, schema, table, s, p, &testStatus, out)
324326
default:
325327
ind := p.Rand.Intn(100000) % 2
326328
if ind == 0 {
327-
mutationJob(schema, table, s, p, &testStatus, out)
329+
mutationJob(ctx, schema, table, s, p, &testStatus, out)
328330
} else {
329-
validationJob(schema, table, s, p, &testStatus, out)
331+
validationJob(ctx, schema, table, s, p, &testStatus, out)
330332
}
331333
}
332334

333335
if i%1000 == 0 {
334336
c <- testStatus
335337
testStatus = Status{}
336338
}
337-
if failFast && testStatus.ReadErrors > 0 {
339+
if failFast && (testStatus.ReadErrors > 0 || testStatus.WriteErrors > 0) {
338340
break
339341
}
340342
i++
@@ -390,3 +392,10 @@ func printSetup() error {
390392
tw.Flush()
391393
return nil
392394
}
395+
396+
func drain(ch chan Status, testRes Status) Status {
397+
for res := range ch {
398+
testRes = res.Merge(&testRes)
399+
}
400+
return testRes
401+
}

Diff for: session.go

+23-10
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package gemini
22

33
import (
4+
"context"
45
"fmt"
56
"math/big"
67
"sort"
@@ -55,27 +56,27 @@ func (s *Session) Close() {
5556
s.oracleSession.Close()
5657
}
5758

58-
func (s *Session) Mutate(query string, values ...interface{}) error {
59+
func (s *Session) Mutate(ctx context.Context, query string, values ...interface{}) error {
5960
ts := time.Now()
6061
var tsUsec int64 = ts.UnixNano() / 1000
61-
if err := s.testSession.Query(query, values...).WithTimestamp(tsUsec).Exec(); err != nil {
62+
if err := s.testSession.Query(query, values...).WithContext(ctx).WithTimestamp(tsUsec).Exec(); !ignore(err) {
6263
return fmt.Errorf("%v [cluster = test, query = '%s']", err, query)
6364
}
64-
if err := s.oracleSession.Query(query, values...).WithTimestamp(tsUsec).Exec(); err != nil {
65+
if err := s.oracleSession.Query(query, values...).WithContext(ctx).WithTimestamp(tsUsec).Exec(); !ignore(err) {
6566
return fmt.Errorf("%v [cluster = oracle, query = '%s']", err, query)
6667
}
6768
return nil
6869
}
6970

70-
func (s *Session) Check(table Table, query string, values ...interface{}) (err error) {
71-
testIter := s.testSession.Query(query, values...).Iter()
72-
oracleIter := s.oracleSession.Query(query, values...).Iter()
71+
func (s *Session) Check(ctx context.Context, table Table, query string, values ...interface{}) (err error) {
72+
testIter := s.testSession.Query(query, values...).WithContext(ctx).Iter()
73+
oracleIter := s.oracleSession.Query(query, values...).WithContext(ctx).Iter()
7374
defer func() {
74-
if e := testIter.Close(); e != nil {
75-
err = multierr.Append(err, errors.Errorf("test system failed: %s", err.Error()))
75+
if e := testIter.Close(); !ignore(e) {
76+
err = multierr.Append(err, errors.Errorf("test system failed: %s", e.Error()))
7677
}
77-
if e := oracleIter.Close(); e != nil {
78-
err = multierr.Append(err, errors.Errorf("oracle failed: %s", err.Error()))
78+
if e := oracleIter.Close(); !ignore(e) {
79+
err = multierr.Append(err, errors.Errorf("oracle failed: %s", e.Error()))
7980
}
8081
}()
8182

@@ -169,3 +170,15 @@ func loadSet(iter *gocql.Iter) []map[string]interface{} {
169170
}
170171
return rows
171172
}
173+
174+
func ignore(err error) bool {
175+
if err == nil {
176+
return true
177+
}
178+
switch err {
179+
case context.Canceled, context.DeadlineExceeded:
180+
return true
181+
default:
182+
return false
183+
}
184+
}

0 commit comments

Comments
 (0)