Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,10 +224,16 @@ type ClusterConfig struct {
// Use it to collect metrics / stats from queries by providing an implementation of QueryObserver.
QueryObserver QueryObserver

// Similar to QueryObserver but tracks total latency for all attempts.
QueryRequestObserver QueryObserver

// BatchObserver will set the provided batch observer on all queries created from this session.
// Use it to collect metrics / stats from batch queries by providing an implementation of BatchObserver.
BatchObserver BatchObserver

// Similar to BatchObserver but tracks total latency for all attempts.
BatchRequestObserver BatchObserver

// ConnectObserver will set the provided connect observer on all queries
// created from this session.
ConnectObserver ConnectObserver
Expand Down
10 changes: 5 additions & 5 deletions query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (q *queryExecutor) speculate(ctx context.Context, qry ExecutableQuery, sp S
return nil
}

func (q *queryExecutor) executeQuery(qry ExecutableQuery) (*Iter, error) {
func (q *queryExecutor) executeQuery(qry ExecutableQuery) *Iter {
var hostIter NextHost

// check if the host id is specified for the query,
Expand All @@ -110,7 +110,7 @@ func (q *queryExecutor) executeQuery(qry ExecutableQuery) (*Iter, error) {
// it is, we force the policy to NonSpeculative
sp := qry.speculativeExecutionPolicy()
if qry.GetHostID() != "" || !qry.IsIdempotent() || sp.Attempts() == 0 {
return q.do(qry.Context(), qry, hostIter), nil
return q.do(qry.Context(), qry, hostIter)
}

// When speculative execution is enabled, we could be accessing the host iterator from multiple goroutines below.
Expand All @@ -136,14 +136,14 @@ func (q *queryExecutor) executeQuery(qry ExecutableQuery) (*Iter, error) {
// execution, on a timer. So Speculation{2} would make 3 executions running
// in total.
if iter := q.speculate(ctx, qry, sp, hostIter, results); iter != nil {
return iter, nil
return iter
}

select {
case iter := <-results:
return iter, nil
return iter
case <-ctx.Done():
return &Iter{err: ctx.Err()}, nil
return &Iter{err: ctx.Err()}
}
}

Expand Down
100 changes: 62 additions & 38 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,20 +51,22 @@ import (
// and automatically sets a default consistency level on all operations
// that do not have a consistency level set.
type Session struct {
cons Consistency
pageSize int
prefetch float64
routingKeyInfoCache routingKeyInfoLRU
schemaDescriber *schemaDescriber
trace Tracer
queryObserver QueryObserver
batchObserver BatchObserver
connectObserver ConnectObserver
frameObserver FrameHeaderObserver
streamObserver StreamObserver
hostSource *ringDescriber
ringRefresher *refreshDebouncer
stmtsLRU *preparedLRU
cons Consistency
pageSize int
prefetch float64
routingKeyInfoCache routingKeyInfoLRU
schemaDescriber *schemaDescriber
trace Tracer
queryObserver QueryObserver
queryRequestObserver QueryObserver
batchObserver BatchObserver
batchRequestObserver BatchObserver
connectObserver ConnectObserver
frameObserver FrameHeaderObserver
streamObserver StreamObserver
hostSource *ringDescriber
ringRefresher *refreshDebouncer
stmtsLRU *preparedLRU

connCfg *ConnConfig

Expand Down Expand Up @@ -187,7 +189,9 @@ func NewSession(cfg ClusterConfig) (*Session, error) {
}

s.queryObserver = cfg.QueryObserver
s.queryRequestObserver = cfg.QueryRequestObserver
s.batchObserver = cfg.BatchObserver
s.batchRequestObserver = cfg.BatchRequestObserver
s.connectObserver = cfg.ConnectObserver
s.frameObserver = cfg.FrameHeaderObserver
s.streamObserver = cfg.StreamObserver
Expand Down Expand Up @@ -548,15 +552,7 @@ func (s *Session) executeQuery(qry *Query) (it *Iter) {
return &Iter{err: ErrSessionClosed}
}

iter, err := s.executor.executeQuery(qry)
if err != nil {
return &Iter{err: err}
}
if iter == nil {
panic("nil iter")
}

return iter
return s.executor.executeQuery(qry)
}

func (s *Session) removeHost(h *HostInfo) {
Expand Down Expand Up @@ -767,10 +763,9 @@ func (s *Session) executeBatch(batch *Batch) *Iter {
return &Iter{err: ErrTooManyStmts}
}

iter, err := s.executor.executeQuery(batch)
if err != nil {
return &Iter{err: err}
}
start := time.Now()
iter := s.executor.executeQuery(batch)
batch.observeRequest(start, time.Now(), iter)

return iter
}
Expand Down Expand Up @@ -939,6 +934,7 @@ type Query struct {
prefetch float64
trace Tracer
observer QueryObserver
requestObserver QueryObserver
session *Session
conn *Conn
rt RetryPolicy
Expand Down Expand Up @@ -991,6 +987,7 @@ func (q *Query) defaultsFromSession() {
q.pageSize = s.pageSize
q.trace = s.trace
q.observer = s.queryObserver
q.requestObserver = s.queryRequestObserver
q.prefetch = s.prefetch
q.rt = s.cfg.RetryPolicy
q.serialCons = s.cfg.SerialConsistency
Expand Down Expand Up @@ -1167,6 +1164,20 @@ func (q *Query) attempt(keyspace string, end, start time.Time, iter *Iter, host
}
}

func (q *Query) observeRequest(start, end time.Time, iter *Iter) {
if q.requestObserver != nil {
q.requestObserver.ObserveQuery(q.Context(), ObservedQuery{
Keyspace: q.session.pool.keyspace,
Statement: q.stmt,
Values: q.values,
Start: start,
End: end,
Rows: iter.numRows,
Err: iter.err,
})
}
}

func (q *Query) retryPolicy() RetryPolicy {
return q.rt
}
Expand Down Expand Up @@ -1352,12 +1363,20 @@ func (q *Query) Iter() *Iter {
if isUseStatement(q.stmt) {
return &Iter{err: ErrUseStmt}
}
return q.iter()
}

func (q *Query) iter() *Iter {
start := time.Now()
var iter *Iter
// if the query was specifically run on a connection then re-use that
// connection when fetching the next results
if q.conn != nil {
return q.conn.executeQuery(q.Context(), q)
iter = q.conn.executeQuery(q.Context(), q)
}
return q.session.executeQuery(q)
iter = q.session.executeQuery(q)
q.observeRequest(start, time.Now(), iter)
return iter
}

// MapScan executes the query, copies the columns of the first selected
Expand Down Expand Up @@ -1792,15 +1811,7 @@ func (n *nextIter) fetchAsync() {
}

func (n *nextIter) fetch() *Iter {
n.once.Do(func() {
// if the query was specifically run on a connection then re-use that
// connection when fetching the next results
if n.qry.conn != nil {
n.next = n.qry.conn.executeQuery(n.qry.Context(), n.qry)
} else {
n.next = n.qry.session.executeQuery(n.qry)
}
})
n.once.Do(func() { n.next = n.qry.iter() })
return n.next
}

Expand All @@ -1814,6 +1825,7 @@ type Batch struct {
spec SpeculativeExecutionPolicy
trace Tracer
observer BatchObserver
requestObserver BatchObserver
session *Session
serialCons Consistency
defaultTimestamp bool
Expand Down Expand Up @@ -1844,6 +1856,7 @@ func (s *Session) Batch(typ BatchType) *Batch {
serialCons: s.cfg.SerialConsistency,
trace: s.trace,
observer: s.batchObserver,
requestObserver: s.batchRequestObserver,
session: s,
Cons: s.cons,
defaultTimestamp: s.cfg.DefaultTimestamp,
Expand Down Expand Up @@ -2052,6 +2065,17 @@ func (b *Batch) attempt(keyspace string, end, start time.Time, iter *Iter, host
})
}

func (b *Batch) observeRequest(start, end time.Time, iter *Iter) {
if b.requestObserver != nil {
b.requestObserver.ObserveBatch(b.Context(), ObservedBatch{
Keyspace: b.session.pool.keyspace,
Start: start,
End: end,
Err: iter.err,
})
}
}

func (b *Batch) GetRoutingKey() ([]byte, error) {
if b.routingKey != nil {
return b.routingKey, nil
Expand Down