From d217f0dd018f16d62de3e3cef42647c782c14a42 Mon Sep 17 00:00:00 2001 From: Bohdan Siryk Date: Mon, 10 Feb 2025 15:10:14 +0200 Subject: [PATCH] Added ObservedQuery.Consistency and ObservedBatch.Consistency fields During retries, consistency of the query may be changed by retry policy implementations (e.g. DowngradingConsistencyRetryPolicy). To observe consistency changes new ObservedQuery.Consistency and ObservedBatch.Consistency fields were added. Patch by Bohdan Siryk; Reviewed by <> for CASSGO-53 --- CHANGELOG.md | 2 ++ cassandra_test.go | 50 ++++++++++++++++++++++++++++++++++++----------- session.go | 36 +++++++++++++++++++++------------- 3 files changed, 63 insertions(+), 25 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5cfff094a..485fe6320 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Added `ObservedQuery.Consistency` and `ObservedBatch.Consistency` fields (CASSGO-53) + ### Changed - Don't restrict server authenticator unless PasswordAuthentictor.AllowedAuthenticators is provided (CASSGO-19) diff --git a/cassandra_test.go b/cassandra_test.go index 545b307e0..dcd5694a5 100644 --- a/cassandra_test.go +++ b/cassandra_test.go @@ -173,9 +173,10 @@ func TestObserve(t *testing.T) { } var ( - observedErr error - observedKeyspace string - observedStmt string + observedErr error + observedKeyspace string + observedStmt string + observedConsistency Consistency ) const keyspace = "gocql_test" @@ -184,12 +185,14 @@ func TestObserve(t *testing.T) { observedErr = errors.New("placeholder only") // used to distinguish err=nil cases observedKeyspace = "" observedStmt = "" + observedConsistency = 0 } observer := funcQueryObserver(func(ctx context.Context, o ObservedQuery) { observedKeyspace = o.Keyspace observedStmt = o.Statement observedErr = o.Err + observedConsistency = o.Consistency }) // select before inserted, will error but the reporting is err=nil as the query is valid @@ -203,6 +206,8 @@ func TestObserve(t *testing.T) { t.Fatal("select: unexpected observed keyspace", observedKeyspace) } else if observedStmt != `SELECT id FROM observe WHERE id = ?` { t.Fatal("select: unexpected observed stmt", observedStmt) + } else if observedConsistency != Quorum { + t.Fatal("select: unexpected observed consistency", observedConsistency) } resetObserved() @@ -214,6 +219,8 @@ func TestObserve(t *testing.T) { t.Fatal("insert: unexpected observed keyspace", observedKeyspace) } else if observedStmt != `INSERT INTO observe (id) VALUES (?)` { t.Fatal("insert: unexpected observed stmt", observedStmt) + } else if observedConsistency != Quorum { + t.Fatal("select: unexpected observed consistency", observedConsistency) } resetObserved() @@ -228,6 +235,8 @@ func TestObserve(t *testing.T) { t.Fatal("select: unexpected observed keyspace", observedKeyspace) } else if observedStmt != `SELECT id FROM observe WHERE id = ?` { t.Fatal("select: unexpected observed stmt", observedStmt) + } else if observedConsistency != Quorum { + t.Fatal("select: unexpected observed consistency", observedConsistency) } // also works from session observer @@ -241,6 +250,8 @@ func TestObserve(t *testing.T) { t.Fatal("select: unexpected observed keyspace", observedKeyspace) } else if observedStmt != `SELECT id FROM observe WHERE id = ?` { t.Fatal("select: unexpected observed stmt", observedStmt) + } else if observedConsistency != Quorum { + t.Fatal("select: unexpected observed consistency", observedConsistency) } // reports errors when the query is poorly formed @@ -254,6 +265,16 @@ func TestObserve(t *testing.T) { t.Fatal("select: unexpected observed keyspace", observedKeyspace) } else if observedStmt != `SELECT id FROM unknown_table WHERE id = ?` { t.Fatal("select: unexpected observed stmt", observedStmt) + } else if observedConsistency != Quorum { + t.Fatal("select: unexpected observed consistency", observedConsistency) + } + + resetObserved() + expectedConsistency := One + if err := session.Query(`SELECT id FROM unknown_table WHERE id = ?`, 42).Observer(observer).Consistency(expectedConsistency).Scan(&value); err == nil { + t.Fatal("select: expecting error") + } else if observedConsistency != expectedConsistency { + t.Fatalf("select: unexpected observed consistency %s, expected %s", observedConsistency, expectedConsistency) } } @@ -1992,25 +2013,28 @@ func TestBatchObserve(t *testing.T) { } type observation struct { - observedErr error - observedKeyspace string - observedStmts []string - observedValues [][]interface{} + observedErr error + observedKeyspace string + observedStmts []string + observedValues [][]interface{} + observedConsistency Consistency } var observedBatch *observation batch := session.Batch(LoggedBatch) + batch.SetConsistency(Quorum) batch.Observer(funcBatchObserver(func(ctx context.Context, o ObservedBatch) { if observedBatch != nil { t.Fatal("batch observe called more than once") } observedBatch = &observation{ - observedKeyspace: o.Keyspace, - observedStmts: o.Statements, - observedErr: o.Err, - observedValues: o.Values, + observedKeyspace: o.Keyspace, + observedStmts: o.Statements, + observedErr: o.Err, + observedValues: o.Values, + observedConsistency: o.Consistency, } })) for i := 0; i < 100; i++ { @@ -2040,6 +2064,10 @@ func TestBatchObserve(t *testing.T) { assertDeepEqual(t, "observed value", []interface{}{i}, observedBatch.observedValues[i]) } + + if observedBatch.observedConsistency != Quorum { + t.Fatalf("expecting consistency %s, got %s", batch.Cons, observedBatch.observedConsistency) + } } // TestNilInQuery tests to see that a nil value passed to a query is handled by Cassandra diff --git a/session.go b/session.go index c47e753d9..96d8c66ec 100644 --- a/session.go +++ b/session.go @@ -1130,16 +1130,17 @@ func (q *Query) attempt(keyspace string, end, start time.Time, iter *Iter, host if q.observer != nil { q.observer.ObserveQuery(q.Context(), ObservedQuery{ - Keyspace: keyspace, - Statement: q.stmt, - Values: q.values, - Start: start, - End: end, - Rows: iter.numRows, - Host: host, - Metrics: metricsForHost, - Err: iter.err, - Attempt: attempt, + Keyspace: keyspace, + Statement: q.stmt, + Values: q.values, + Start: start, + End: end, + Rows: iter.numRows, + Host: host, + Metrics: metricsForHost, + Err: iter.err, + Attempt: attempt, + Consistency: q.cons, }) } } @@ -1980,10 +1981,11 @@ func (b *Batch) attempt(keyspace string, end, start time.Time, iter *Iter, host Start: start, End: end, // Rows not used in batch observations // TODO - might be able to support it when using BatchCAS - Host: host, - Metrics: metricsForHost, - Err: iter.err, - Attempt: attempt, + Host: host, + Metrics: metricsForHost, + Err: iter.err, + Attempt: attempt, + Consistency: b.Cons, }) } @@ -2218,6 +2220,9 @@ type ObservedQuery struct { // Attempt is the index of attempt at executing this query. // The first attempt is number zero and any retries have non-zero attempt number. Attempt int + + // Consistency is a consistency of the query which is being attempted. + Consistency Consistency } // QueryObserver is the interface implemented by query observers / stat collectors. @@ -2255,6 +2260,9 @@ type ObservedBatch struct { // Attempt is the index of attempt at executing this query. // The first attempt is number zero and any retries have non-zero attempt number. Attempt int + + // Consistency is a consistency of the batch which is being attempted. + Consistency Consistency } // BatchObserver is the interface implemented by batch observers / stat collectors.