diff --git a/CHANGELOG.md b/CHANGELOG.md index 5cfff094a..485fe6320 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Added `ObservedQuery.Consistency` and `ObservedBatch.Consistency` fields (CASSGO-53) + ### Changed - Don't restrict server authenticator unless PasswordAuthentictor.AllowedAuthenticators is provided (CASSGO-19) diff --git a/cassandra_test.go b/cassandra_test.go index 545b307e0..dcd5694a5 100644 --- a/cassandra_test.go +++ b/cassandra_test.go @@ -173,9 +173,10 @@ func TestObserve(t *testing.T) { } var ( - observedErr error - observedKeyspace string - observedStmt string + observedErr error + observedKeyspace string + observedStmt string + observedConsistency Consistency ) const keyspace = "gocql_test" @@ -184,12 +185,14 @@ func TestObserve(t *testing.T) { observedErr = errors.New("placeholder only") // used to distinguish err=nil cases observedKeyspace = "" observedStmt = "" + observedConsistency = 0 } observer := funcQueryObserver(func(ctx context.Context, o ObservedQuery) { observedKeyspace = o.Keyspace observedStmt = o.Statement observedErr = o.Err + observedConsistency = o.Consistency }) // select before inserted, will error but the reporting is err=nil as the query is valid @@ -203,6 +206,8 @@ func TestObserve(t *testing.T) { t.Fatal("select: unexpected observed keyspace", observedKeyspace) } else if observedStmt != `SELECT id FROM observe WHERE id = ?` { t.Fatal("select: unexpected observed stmt", observedStmt) + } else if observedConsistency != Quorum { + t.Fatal("select: unexpected observed consistency", observedConsistency) } resetObserved() @@ -214,6 +219,8 @@ func TestObserve(t *testing.T) { t.Fatal("insert: unexpected observed keyspace", observedKeyspace) } else if observedStmt != `INSERT INTO observe (id) VALUES (?)` { t.Fatal("insert: unexpected observed stmt", observedStmt) + } else if observedConsistency != Quorum { + t.Fatal("select: unexpected observed consistency", observedConsistency) } resetObserved() @@ -228,6 +235,8 @@ func TestObserve(t *testing.T) { t.Fatal("select: unexpected observed keyspace", observedKeyspace) } else if observedStmt != `SELECT id FROM observe WHERE id = ?` { t.Fatal("select: unexpected observed stmt", observedStmt) + } else if observedConsistency != Quorum { + t.Fatal("select: unexpected observed consistency", observedConsistency) } // also works from session observer @@ -241,6 +250,8 @@ func TestObserve(t *testing.T) { t.Fatal("select: unexpected observed keyspace", observedKeyspace) } else if observedStmt != `SELECT id FROM observe WHERE id = ?` { t.Fatal("select: unexpected observed stmt", observedStmt) + } else if observedConsistency != Quorum { + t.Fatal("select: unexpected observed consistency", observedConsistency) } // reports errors when the query is poorly formed @@ -254,6 +265,16 @@ func TestObserve(t *testing.T) { t.Fatal("select: unexpected observed keyspace", observedKeyspace) } else if observedStmt != `SELECT id FROM unknown_table WHERE id = ?` { t.Fatal("select: unexpected observed stmt", observedStmt) + } else if observedConsistency != Quorum { + t.Fatal("select: unexpected observed consistency", observedConsistency) + } + + resetObserved() + expectedConsistency := One + if err := session.Query(`SELECT id FROM unknown_table WHERE id = ?`, 42).Observer(observer).Consistency(expectedConsistency).Scan(&value); err == nil { + t.Fatal("select: expecting error") + } else if observedConsistency != expectedConsistency { + t.Fatalf("select: unexpected observed consistency %s, expected %s", observedConsistency, expectedConsistency) } } @@ -1992,25 +2013,28 @@ func TestBatchObserve(t *testing.T) { } type observation struct { - observedErr error - observedKeyspace string - observedStmts []string - observedValues [][]interface{} + observedErr error + observedKeyspace string + observedStmts []string + observedValues [][]interface{} + observedConsistency Consistency } var observedBatch *observation batch := session.Batch(LoggedBatch) + batch.SetConsistency(Quorum) batch.Observer(funcBatchObserver(func(ctx context.Context, o ObservedBatch) { if observedBatch != nil { t.Fatal("batch observe called more than once") } observedBatch = &observation{ - observedKeyspace: o.Keyspace, - observedStmts: o.Statements, - observedErr: o.Err, - observedValues: o.Values, + observedKeyspace: o.Keyspace, + observedStmts: o.Statements, + observedErr: o.Err, + observedValues: o.Values, + observedConsistency: o.Consistency, } })) for i := 0; i < 100; i++ { @@ -2040,6 +2064,10 @@ func TestBatchObserve(t *testing.T) { assertDeepEqual(t, "observed value", []interface{}{i}, observedBatch.observedValues[i]) } + + if observedBatch.observedConsistency != Quorum { + t.Fatalf("expecting consistency %s, got %s", batch.Cons, observedBatch.observedConsistency) + } } // TestNilInQuery tests to see that a nil value passed to a query is handled by Cassandra diff --git a/session.go b/session.go index c47e753d9..96d8c66ec 100644 --- a/session.go +++ b/session.go @@ -1130,16 +1130,17 @@ func (q *Query) attempt(keyspace string, end, start time.Time, iter *Iter, host if q.observer != nil { q.observer.ObserveQuery(q.Context(), ObservedQuery{ - Keyspace: keyspace, - Statement: q.stmt, - Values: q.values, - Start: start, - End: end, - Rows: iter.numRows, - Host: host, - Metrics: metricsForHost, - Err: iter.err, - Attempt: attempt, + Keyspace: keyspace, + Statement: q.stmt, + Values: q.values, + Start: start, + End: end, + Rows: iter.numRows, + Host: host, + Metrics: metricsForHost, + Err: iter.err, + Attempt: attempt, + Consistency: q.cons, }) } } @@ -1980,10 +1981,11 @@ func (b *Batch) attempt(keyspace string, end, start time.Time, iter *Iter, host Start: start, End: end, // Rows not used in batch observations // TODO - might be able to support it when using BatchCAS - Host: host, - Metrics: metricsForHost, - Err: iter.err, - Attempt: attempt, + Host: host, + Metrics: metricsForHost, + Err: iter.err, + Attempt: attempt, + Consistency: b.Cons, }) } @@ -2218,6 +2220,9 @@ type ObservedQuery struct { // Attempt is the index of attempt at executing this query. // The first attempt is number zero and any retries have non-zero attempt number. Attempt int + + // Consistency is a consistency of the query which is being attempted. + Consistency Consistency } // QueryObserver is the interface implemented by query observers / stat collectors. @@ -2255,6 +2260,9 @@ type ObservedBatch struct { // Attempt is the index of attempt at executing this query. // The first attempt is number zero and any retries have non-zero attempt number. Attempt int + + // Consistency is a consistency of the batch which is being attempted. + Consistency Consistency } // BatchObserver is the interface implemented by batch observers / stat collectors.