Skip to content

Commit 1362fbf

Browse files
Henrik Johansson, penberg
Henrik Johansson
authored and committed
gemini: terminating properly upon an error
We now drain the remaining work after a quit signal is detected, so that no worker gets blocked trying to send a status message after the receiving end has stopped listening. We also pass the termination context down to the driver so that it can abort what it is doing as fast as possible. (cherry picked from commit c8b2425)
1 parent 0c69950 commit 1362fbf

File tree

3 files changed

+46
-23
lines changed

3 files changed

+46
-23
lines changed

Diff for: CHANGELOG.md

+1
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,7 @@
22

33
## Unreleased
44

5+
- Ensure proper termination when errors happen.
56
- Fix mutation timestamps to match on system under test and test oracle.
67
- Gemini now timestamps errors for easier correlation.
78

Diff for: cmd/gemini/root.go

+24-15
Original file line number | Diff line number | Diff line change
@@ -170,7 +170,7 @@ func run(cmd *cobra.Command, args []string) {
170170
if verbose {
171171
fmt.Println(stmt)
172172
}
173-
if err := session.Mutate(stmt); err != nil {
173+
if err := session.Mutate(context.Background(), stmt); err != nil {
174174
fmt.Printf("%v", err)
175175
return
176176
}
@@ -180,7 +180,7 @@ func run(cmd *cobra.Command, args []string) {
180180
if verbose {
181181
fmt.Println(stmt)
182182
}
183-
if err := session.Mutate(stmt); err != nil {
183+
if err := session.Mutate(context.Background(), stmt); err != nil {
184184
fmt.Printf("%v", err)
185185
return
186186
}
@@ -229,37 +229,39 @@ func runJob(f testJob, schema *gemini.Schema, s *gemini.Session, mode string, ou
229229
for {
230230
select {
231231
case <-timer.C:
232+
cancelWorkers()
233+
testRes = drain(c, testRes)
232234
testRes.PrintResult(out)
233235
fmt.Println("Test run completed. Exiting.")
234-
cancelWorkers()
235236
return
236237
case <-reporterCtx.Done():
237-
testRes.PrintResult(out)
238238
return
239239
case res := <-c:
240240
testRes = res.Merge(&testRes)
241241
if sp != nil {
242242
sp.Suffix = fmt.Sprintf(" Running Gemini... %v", testRes)
243243
}
244244
if testRes.ReadErrors > 0 {
245-
testRes.PrintResult(out)
246-
fmt.Println(testRes.Errors)
247245
if failFast {
248246
fmt.Println("Error in data validation. Exiting.")
249247
cancelWorkers()
248+
testRes = drain(c, testRes)
249+
testRes.PrintResult(out)
250250
return
251251
}
252+
testRes.PrintResult(out)
252253
}
253254
}
254255
}
255256
}(duration)
256257

257258
workers.Wait()
259+
close(c)
258260
cancelReporter()
259261
reporter.Wait()
260262
}
261263

262-
func mutationJob(schema *gemini.Schema, table gemini.Table, s *gemini.Session, p gemini.PartitionRange, testStatus *Status, out *os.File) {
264+
func mutationJob(ctx context.Context, schema *gemini.Schema, table gemini.Table, s *gemini.Session, p gemini.PartitionRange, testStatus *Status, out *os.File) {
263265
mutateStmt, err := schema.GenMutateStmt(table, &p)
264266
if err != nil {
265267
fmt.Printf("Failed! Mutation statement generation failed: '%v'\n", err)
@@ -271,7 +273,7 @@ func mutationJob(schema *gemini.Schema, table gemini.Table, s *gemini.Session, p
271273
if verbose {
272274
fmt.Println(mutateStmt.PrettyCQL())
273275
}
274-
if err := s.Mutate(mutateQuery, mutateValues...); err != nil {
276+
if err := s.Mutate(ctx, mutateQuery, mutateValues...); err != nil {
275277
e := gemini.JobError{
276278
Timestamp: time.Now(),
277279
Message: "Mutation failed: " + err.Error(),
@@ -284,14 +286,14 @@ func mutationJob(schema *gemini.Schema, table gemini.Table, s *gemini.Session, p
284286
}
285287
}
286288

287-
func validationJob(schema *gemini.Schema, table gemini.Table, s *gemini.Session, p gemini.PartitionRange, testStatus *Status, out *os.File) {
289+
func validationJob(ctx context.Context, schema *gemini.Schema, table gemini.Table, s *gemini.Session, p gemini.PartitionRange, testStatus *Status, out *os.File) {
288290
checkStmt := schema.GenCheckStmt(table, &p)
289291
checkQuery := checkStmt.Query
290292
checkValues := checkStmt.Values()
291293
if verbose {
292294
fmt.Println(checkStmt.PrettyCQL())
293295
}
294-
if err := s.Check(table, checkQuery, checkValues...); err != nil {
296+
if err := s.Check(ctx, table, checkQuery, checkValues...); err != nil {
295297
// De-duplication needed?
296298
e := gemini.JobError{
297299
Timestamp: time.Now(),
@@ -318,23 +320,23 @@ func Job(ctx context.Context, wg *sync.WaitGroup, schema *gemini.Schema, table g
318320
}
319321
switch mode {
320322
case writeMode:
321-
mutationJob(schema, table, s, p, &testStatus, out)
323+
mutationJob(ctx, schema, table, s, p, &testStatus, out)
322324
case readMode:
323-
validationJob(schema, table, s, p, &testStatus, out)
325+
validationJob(ctx, schema, table, s, p, &testStatus, out)
324326
default:
325327
ind := p.Rand.Intn(100000) % 2
326328
if ind == 0 {
327-
mutationJob(schema, table, s, p, &testStatus, out)
329+
mutationJob(ctx, schema, table, s, p, &testStatus, out)
328330
} else {
329-
validationJob(schema, table, s, p, &testStatus, out)
331+
validationJob(ctx, schema, table, s, p, &testStatus, out)
330332
}
331333
}
332334

333335
if i%1000 == 0 {
334336
c <- testStatus
335337
testStatus = Status{}
336338
}
337-
if failFast && testStatus.ReadErrors > 0 {
339+
if failFast && (testStatus.ReadErrors > 0 || testStatus.WriteErrors > 0) {
338340
break
339341
}
340342
i++
@@ -390,3 +392,10 @@ func printSetup() error {
390392
tw.Flush()
391393
return nil
392394
}
395+
396+
// drain receives from ch until the channel is closed, folding every received
// Status into testRes via Merge, and returns the accumulated result.
// The range loop only terminates when ch is closed, so this call blocks
// until then.
func drain(ch chan Status, testRes Status) Status {
397+
for res := range ch {
398+
testRes = res.Merge(&testRes)
399+
}
400+
return testRes
401+
}

Diff for: session.go

+21-8
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,7 @@
11
package gemini
22

33
import (
4+
"context"
45
"fmt"
56
"math/big"
67
"sort"
@@ -55,26 +56,26 @@ func (s *Session) Close() {
5556
s.oracleSession.Close()
5657
}
5758

58-
func (s *Session) Mutate(query string, values ...interface{}) error {
59+
func (s *Session) Mutate(ctx context.Context, query string, values ...interface{}) error {
5960
ts := time.Now()
6061
var tsUsec int64 = ts.UnixNano() / 1000
61-
if err := s.testSession.Query(query, values...).WithTimestamp(tsUsec).Exec(); err != nil {
62+
if err := s.testSession.Query(query, values...).WithContext(ctx).WithTimestamp(tsUsec).Exec(); !ignore(err) {
6263
return fmt.Errorf("%v [cluster = test, query = '%s']", err, query)
6364
}
64-
if err := s.oracleSession.Query(query, values...).WithTimestamp(tsUsec).Exec(); err != nil {
65+
if err := s.oracleSession.Query(query, values...).WithContext(ctx).WithTimestamp(tsUsec).Exec(); !ignore(err) {
6566
return fmt.Errorf("%v [cluster = oracle, query = '%s']", err, query)
6667
}
6768
return nil
6869
}
6970

70-
func (s *Session) Check(table Table, query string, values ...interface{}) (err error) {
71-
testIter := s.testSession.Query(query, values...).Iter()
72-
oracleIter := s.oracleSession.Query(query, values...).Iter()
71+
func (s *Session) Check(ctx context.Context, table Table, query string, values ...interface{}) (err error) {
72+
testIter := s.testSession.Query(query, values...).WithContext(ctx).Iter()
73+
oracleIter := s.oracleSession.Query(query, values...).WithContext(ctx).Iter()
7374
defer func() {
74-
if e := testIter.Close(); e != nil {
75+
if e := testIter.Close(); !ignore(e) {
7576
err = multierr.Append(err, errors.Errorf("test system failed: %s", e.Error()))
7677
}
77-
if e := oracleIter.Close(); e != nil {
78+
if e := oracleIter.Close(); !ignore(e) {
7879
err = multierr.Append(err, errors.Errorf("oracle failed: %s", e.Error()))
7980
}
8081
}()
@@ -169,3 +170,15 @@ func loadSet(iter *gocql.Iter) []map[string]interface{} {
169170
}
170171
return rows
171172
}
173+
174+
// ignore reports whether err can be disregarded by the caller: it returns
// true for a nil error and for the context.Canceled and
// context.DeadlineExceeded sentinel errors, and false for anything else.
// The switch compares err by equality, so an error that merely wraps one of
// the sentinels is not ignored.
func ignore(err error) bool {
175+
if err == nil {
176+
return true
177+
}
178+
switch err {
179+
case context.Canceled, context.DeadlineExceeded:
180+
return true
181+
default:
182+
return false
183+
}
184+
}

0 commit comments

Comments
 (0)