Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- Added `ObservedQuery.Consistency` and `ObservedBatch.Consistency` fields (CASSGO-53)

### Changed

- Don't restrict server authenticator unless PasswordAuthentictor.AllowedAuthenticators is provided (CASSGO-19)
Expand Down
50 changes: 39 additions & 11 deletions cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,10 @@ func TestObserve(t *testing.T) {
}

var (
observedErr error
observedKeyspace string
observedStmt string
observedErr error
observedKeyspace string
observedStmt string
observedConsistency Consistency
)

const keyspace = "gocql_test"
Expand All @@ -184,12 +185,14 @@ func TestObserve(t *testing.T) {
observedErr = errors.New("placeholder only") // used to distinguish err=nil cases
observedKeyspace = ""
observedStmt = ""
observedConsistency = 0
}

observer := funcQueryObserver(func(ctx context.Context, o ObservedQuery) {
observedKeyspace = o.Keyspace
observedStmt = o.Statement
observedErr = o.Err
observedConsistency = o.Consistency
})

// select before inserted, will error but the reporting is err=nil as the query is valid
Expand All @@ -203,6 +206,8 @@ func TestObserve(t *testing.T) {
t.Fatal("select: unexpected observed keyspace", observedKeyspace)
} else if observedStmt != `SELECT id FROM observe WHERE id = ?` {
t.Fatal("select: unexpected observed stmt", observedStmt)
} else if observedConsistency != Quorum {
t.Fatal("select: unexpected observed consistency", observedConsistency)
}

resetObserved()
Expand All @@ -214,6 +219,8 @@ func TestObserve(t *testing.T) {
t.Fatal("insert: unexpected observed keyspace", observedKeyspace)
} else if observedStmt != `INSERT INTO observe (id) VALUES (?)` {
t.Fatal("insert: unexpected observed stmt", observedStmt)
} else if observedConsistency != Quorum {
t.Fatal("select: unexpected observed consistency", observedConsistency)
}

resetObserved()
Expand All @@ -228,6 +235,8 @@ func TestObserve(t *testing.T) {
t.Fatal("select: unexpected observed keyspace", observedKeyspace)
} else if observedStmt != `SELECT id FROM observe WHERE id = ?` {
t.Fatal("select: unexpected observed stmt", observedStmt)
} else if observedConsistency != Quorum {
t.Fatal("select: unexpected observed consistency", observedConsistency)
}

// also works from session observer
Expand All @@ -241,6 +250,8 @@ func TestObserve(t *testing.T) {
t.Fatal("select: unexpected observed keyspace", observedKeyspace)
} else if observedStmt != `SELECT id FROM observe WHERE id = ?` {
t.Fatal("select: unexpected observed stmt", observedStmt)
} else if observedConsistency != Quorum {
t.Fatal("select: unexpected observed consistency", observedConsistency)
}

// reports errors when the query is poorly formed
Expand All @@ -254,6 +265,16 @@ func TestObserve(t *testing.T) {
t.Fatal("select: unexpected observed keyspace", observedKeyspace)
} else if observedStmt != `SELECT id FROM unknown_table WHERE id = ?` {
t.Fatal("select: unexpected observed stmt", observedStmt)
} else if observedConsistency != Quorum {
t.Fatal("select: unexpected observed consistency", observedConsistency)
}

resetObserved()
expectedConsistency := One
if err := session.Query(`SELECT id FROM unknown_table WHERE id = ?`, 42).Observer(observer).Consistency(expectedConsistency).Scan(&value); err == nil {
t.Fatal("select: expecting error")
} else if observedConsistency != expectedConsistency {
t.Fatalf("select: unexpected observed consistency %s, expected %s", observedConsistency, expectedConsistency)
}
}

Expand Down Expand Up @@ -1992,25 +2013,28 @@ func TestBatchObserve(t *testing.T) {
}

type observation struct {
observedErr error
observedKeyspace string
observedStmts []string
observedValues [][]interface{}
observedErr error
observedKeyspace string
observedStmts []string
observedValues [][]interface{}
observedConsistency Consistency
}

var observedBatch *observation

batch := session.Batch(LoggedBatch)
batch.SetConsistency(Quorum)
batch.Observer(funcBatchObserver(func(ctx context.Context, o ObservedBatch) {
if observedBatch != nil {
t.Fatal("batch observe called more than once")
}

observedBatch = &observation{
observedKeyspace: o.Keyspace,
observedStmts: o.Statements,
observedErr: o.Err,
observedValues: o.Values,
observedKeyspace: o.Keyspace,
observedStmts: o.Statements,
observedErr: o.Err,
observedValues: o.Values,
observedConsistency: o.Consistency,
}
}))
for i := 0; i < 100; i++ {
Expand Down Expand Up @@ -2040,6 +2064,10 @@ func TestBatchObserve(t *testing.T) {

assertDeepEqual(t, "observed value", []interface{}{i}, observedBatch.observedValues[i])
}

if observedBatch.observedConsistency != Quorum {
t.Fatalf("expecting consistency %s, got %s", batch.Cons, observedBatch.observedConsistency)
}
}

// TestNilInQuery tests to see that a nil value passed to a query is handled by Cassandra
Expand Down
36 changes: 22 additions & 14 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1130,16 +1130,17 @@ func (q *Query) attempt(keyspace string, end, start time.Time, iter *Iter, host

if q.observer != nil {
q.observer.ObserveQuery(q.Context(), ObservedQuery{
Keyspace: keyspace,
Statement: q.stmt,
Values: q.values,
Start: start,
End: end,
Rows: iter.numRows,
Host: host,
Metrics: metricsForHost,
Err: iter.err,
Attempt: attempt,
Keyspace: keyspace,
Statement: q.stmt,
Values: q.values,
Start: start,
End: end,
Rows: iter.numRows,
Host: host,
Metrics: metricsForHost,
Err: iter.err,
Attempt: attempt,
Consistency: q.cons,
})
}
}
Expand Down Expand Up @@ -1980,10 +1981,11 @@ func (b *Batch) attempt(keyspace string, end, start time.Time, iter *Iter, host
Start: start,
End: end,
// Rows not used in batch observations // TODO - might be able to support it when using BatchCAS
Host: host,
Metrics: metricsForHost,
Err: iter.err,
Attempt: attempt,
Host: host,
Metrics: metricsForHost,
Err: iter.err,
Attempt: attempt,
Consistency: b.Cons,
})
}

Expand Down Expand Up @@ -2218,6 +2220,9 @@ type ObservedQuery struct {
// Attempt is the index of attempt at executing this query.
// The first attempt is number zero and any retries have non-zero attempt number.
Attempt int

// Consistency is a consistency of the query which is being attempted.
Consistency Consistency
}

// QueryObserver is the interface implemented by query observers / stat collectors.
Expand Down Expand Up @@ -2255,6 +2260,9 @@ type ObservedBatch struct {
// Attempt is the index of attempt at executing this query.
// The first attempt is number zero and any retries have non-zero attempt number.
Attempt int

// Consistency is a consistency of the batch which is being attempted.
Consistency Consistency
}

// BatchObserver is the interface implemented by batch observers / stat collectors.
Expand Down
Loading