diff --git a/CHANGELOG.md b/CHANGELOG.md index 19f353ad8..1e95fbeb9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +#### 2.0.0 + +- Don't collect host metrics if a query/batch observer is not provided (CASSGO-90) + #### 2.0.0-rc1 - Support vector type (CASSGO-11) diff --git a/cassandra_test.go b/cassandra_test.go index b92c3cd62..cb6607551 100644 --- a/cassandra_test.go +++ b/cassandra_test.go @@ -2068,7 +2068,7 @@ func TestQueryStats(t *testing.T) { t.Fatal("expected at least 1 attempt, but got 0") } if iter.Latency() <= 0 { - t.Fatalf("expected latency to be greater than 0, but got %v instead.", iter.Latency()) + t.Fatalf("expected latency to be > 0, but got %v instead.", iter.Latency()) } } } diff --git a/conn.go b/conn.go index 6cfb6fe9a..c728247d9 100644 --- a/conn.go +++ b/conn.go @@ -1665,7 +1665,12 @@ func (c *Conn) executeQuery(ctx context.Context, q *internalQuery) *Iter { newQry := new(internalQuery) *newQry = *q newQry.pageState = copyBytes(x.meta.pagingState) - newQry.metrics = &queryMetrics{m: make(map[string]*hostMetrics)} + newQry.metrics = &queryMetrics{} + if newQry.qryOpts.observer != nil { + newQry.hostMetricsManager = newHostMetricsManager() + } else { + newQry.hostMetricsManager = emptyHostMetricsManager + } iter.next = &nextIter{ q: newQry, diff --git a/conn_test.go b/conn_test.go index d3271acca..3646dcc85 100644 --- a/conn_test.go +++ b/conn_test.go @@ -450,28 +450,28 @@ func TestQueryMultinodeWithMetrics(t *testing.T) { if err == nil { t.Fatalf("expected error") } + totalLatency := int64(0) + totalAttempts := int64(0) for i, ip := range addresses { host := &HostInfo{connectAddress: net.ParseIP(ip)} - queryMetric := iter.metrics.hostMetrics(host) observedMetrics := observer.GetMetrics(host) - requests := int(atomic.LoadInt64(&nodes[i].nKillReq)) - hostAttempts := queryMetric.Attempts - if requests != hostAttempts { - t.Fatalf("expected requests %v to match query attempts %v", requests, hostAttempts) - } - if hostAttempts != observedMetrics.Attempts { - t.Fatalf("expected observed attempts %v to match query attempts %v on host %v", observedMetrics.Attempts, hostAttempts, ip) + if requests != observedMetrics.Attempts { + t.Fatalf("expected observed attempts %v to match server requests %v on host %v", observedMetrics.Attempts, requests, ip) } - hostLatency := queryMetric.TotalLatency observedLatency := observedMetrics.TotalLatency - if hostLatency != observedLatency { - t.Fatalf("expected observed latency %v to match query latency %v on host %v", observedLatency, hostLatency, ip) - } + totalLatency += observedLatency + totalAttempts += int64(observedMetrics.Attempts) } + + observedLatency := totalLatency / totalAttempts + if observedLatency != iter.Latency() { + t.Fatalf("expected observed latency %v (%v/%v) to match query latency %v", observedLatency, totalLatency, totalAttempts, iter.Latency()) + } + // the query will only be attempted once, but is being retried attempts := iter.Attempts() if attempts != rt.NumRetries { diff --git a/control.go b/control.go index b521c41d8..21ec27f58 100644 --- a/control.go +++ b/control.go @@ -557,7 +557,7 @@ func (c *controlConn) withConnHost(fn func(*connHost) *Iter) *Iter { return fn(ch) } - return newErrIter(errNoControl, newQueryMetrics(), "", nil, nil) + return newErrIter(errNoControl, &queryMetrics{}, "", nil, nil) } func (c *controlConn) withConn(fn func(*Conn) *Iter) *Iter { @@ -582,7 +582,8 @@ func (c *controlConn) query(statement string, values ...interface{}) (iter *Iter newLogFieldString("statement", statement), newLogFieldError("err", iter.err)) } - iter.metrics.attempt(1, 0, c.getConn().host, false) + qry.metrics.attempt(0) + qry.hostMetricsManager.attempt(0, c.getConn().host) if iter.err == nil || !c.retry.Attempt(qry) { break } @@ -593,7 +594,7 @@ func (c *controlConn) query(statement string, values ...interface{}) (iter *Iter func (c *controlConn) awaitSchemaAgreement() error { return c.withConn(func(conn *Conn) *Iter { - return newErrIter(conn.awaitSchemaAgreement(context.TODO()), newQueryMetrics(), "", nil, nil) + return newErrIter(conn.awaitSchemaAgreement(context.TODO()), &queryMetrics{}, "", nil, nil) }).err } diff --git a/policies_test.go b/policies_test.go index ab40a62a6..62b3ad127 100644 --- a/policies_test.go +++ b/policies_test.go @@ -261,7 +261,8 @@ func TestSimpleRetryPolicy(t *testing.T) { } for _, c := range cases { - q.metrics = preFilledQueryMetrics(map[string]*hostMetrics{"127.0.0.1": {Attempts: c.attempts}}) + q.metrics = &queryMetrics{totalAttempts: int64(c.attempts)} + q.hostMetricsManager = preFilledHostMetricsMetricsManager(map[string]*hostMetrics{"127.0.0.1": {Attempts: c.attempts}}) if c.allow && !rt.Attempt(q) { t.Fatalf("should allow retry after %d attempts", c.attempts) } @@ -345,7 +346,8 @@ func TestDowngradingConsistencyRetryPolicy(t *testing.T) { } for _, c := range cases { - q.metrics = preFilledQueryMetrics(map[string]*hostMetrics{"127.0.0.1": {Attempts: c.attempts}}) + q.metrics = &queryMetrics{totalAttempts: int64(c.attempts)} + q.hostMetricsManager = preFilledHostMetricsMetricsManager(map[string]*hostMetrics{"127.0.0.1": {Attempts: c.attempts}}) if c.retryType != rt.GetRetryType(c.err) { t.Fatalf("retry type should be %v", c.retryType) } diff --git a/query_executor.go b/query_executor.go index 552d0b97c..35422ffeb 100644 --- a/query_executor.go +++ b/query_executor.go @@ -334,14 +334,15 @@ func newQueryOptions(q *Query, ctx context.Context) *queryOptions { } type internalQuery struct { - originalQuery *Query - qryOpts *queryOptions - pageState []byte - metrics *queryMetrics - conn *Conn - consistency uint32 - session *Session - routingInfo *queryRoutingInfo + originalQuery *Query + qryOpts *queryOptions + pageState []byte + conn *Conn + consistency uint32 + session *Session + routingInfo *queryRoutingInfo + metrics *queryMetrics + hostMetricsManager hostMetricsManager } func newInternalQuery(q *Query, ctx context.Context) *internalQuery { @@ -351,15 +352,22 @@ func newInternalQuery(q *Query, ctx context.Context) *internalQuery { newPageState = make([]byte, len(pageState)) copy(newPageState, pageState) } + var hostMetricsMgr hostMetricsManager + if q.observer != nil { + hostMetricsMgr = newHostMetricsManager() + } else { + hostMetricsMgr = emptyHostMetricsManager + } return &internalQuery{ - originalQuery: q, - qryOpts: newQueryOptions(q, ctx), - metrics: &queryMetrics{m: make(map[string]*hostMetrics)}, - consistency: uint32(q.initialConsistency), - pageState: newPageState, - conn: nil, - session: q.session, - routingInfo: &queryRoutingInfo{}, + originalQuery: q, + qryOpts: newQueryOptions(q, ctx), + metrics: &queryMetrics{}, + hostMetricsManager: hostMetricsMgr, + consistency: uint32(q.initialConsistency), + pageState: newPageState, + conn: nil, + session: q.session, + routingInfo: &queryRoutingInfo{}, } } @@ -370,9 +378,10 @@ func (q *internalQuery) Attempts() int { func (q *internalQuery) attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo) { latency := end.Sub(start) - attempt, metricsForHost := q.metrics.attempt(1, latency, host, q.qryOpts.observer != nil) + attempt := q.metrics.attempt(latency) if q.qryOpts.observer != nil { + metricsForHost := q.hostMetricsManager.attempt(latency, host) q.qryOpts.observer.ObserveQuery(q.qryOpts.context, ObservedQuery{ Keyspace: keyspace, Statement: q.qryOpts.stmt, @@ -546,22 +555,30 @@ func newBatchOptions(b *Batch, ctx context.Context) *batchOptions { } type internalBatch struct { - originalBatch *Batch - batchOpts *batchOptions - metrics *queryMetrics - consistency uint32 - routingInfo *queryRoutingInfo - session *Session + originalBatch *Batch + batchOpts *batchOptions + consistency uint32 + routingInfo *queryRoutingInfo + session *Session + metrics *queryMetrics + hostMetricsManager hostMetricsManager } func newInternalBatch(batch *Batch, ctx context.Context) *internalBatch { + var hostMetricsMgr hostMetricsManager + if batch.observer != nil { + hostMetricsMgr = newHostMetricsManager() + } else { + hostMetricsMgr = emptyHostMetricsManager + } return &internalBatch{ - originalBatch: batch, - batchOpts: newBatchOptions(batch, ctx), - metrics: &queryMetrics{m: make(map[string]*hostMetrics)}, - routingInfo: &queryRoutingInfo{}, - session: batch.session, - consistency: uint32(batch.GetConsistency()), + originalBatch: batch, + batchOpts: newBatchOptions(batch, ctx), + routingInfo: &queryRoutingInfo{}, + session: batch.session, + consistency: uint32(batch.GetConsistency()), + metrics: &queryMetrics{}, + hostMetricsManager: hostMetricsMgr, } } @@ -572,12 +589,14 @@ func (b *internalBatch) Attempts() int { func (b *internalBatch) attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo) { latency := end.Sub(start) - attempt, metricsForHost := b.metrics.attempt(1, latency, host, b.batchOpts.observer != nil) + attempt := b.metrics.attempt(latency) if b.batchOpts.observer == nil { return } + metricsForHost := b.hostMetricsManager.attempt(latency, host) + statements := make([]string, len(b.batchOpts.entries)) values := make([][]interface{}, len(b.batchOpts.entries)) diff --git a/session.go b/session.go index 4e83e5e5f..625dac4a9 100644 --- a/session.go +++ b/session.go @@ -389,7 +389,7 @@ func (s *Session) AwaitSchemaAgreement(ctx context.Context) error { return errNoControl } return s.control.withConn(func(conn *Conn) *Iter { - return newErrIter(conn.awaitSchemaAgreement(ctx), newQueryMetrics(), "", nil, nil) + return newErrIter(conn.awaitSchemaAgreement(ctx), &queryMetrics{}, "", nil, nil) }).err } @@ -861,40 +861,48 @@ type hostMetrics struct { } type queryMetrics struct { - l sync.RWMutex - m map[string]*hostMetrics - // totalAttempts is total number of attempts. - // Equal to sum of all hostMetrics' Attempts. - totalAttempts int + totalAttempts int64 + totalLatency int64 } -func newQueryMetrics() *queryMetrics { - return &queryMetrics{m: make(map[string]*hostMetrics)} +func (qm *queryMetrics) attempt(addLatency time.Duration) int { + atomic.AddInt64(&qm.totalLatency, addLatency.Nanoseconds()) + return int(atomic.AddInt64(&qm.totalAttempts, 1) - 1) +} + +func (qm *queryMetrics) attempts() int { + return int(atomic.LoadInt64(&qm.totalAttempts)) } -// preFilledQueryMetrics initializes new queryMetrics based on per-host supplied data. -func preFilledQueryMetrics(m map[string]*hostMetrics) *queryMetrics { - qm := &queryMetrics{m: m} - for _, hm := range qm.m { - qm.totalAttempts += hm.Attempts +func (qm *queryMetrics) latency() int64 { + attempts := atomic.LoadInt64(&qm.totalAttempts) + if attempts == 0 { + return atomic.LoadInt64(&qm.totalLatency) } - return qm + return atomic.LoadInt64(&qm.totalLatency) / attempts } -// hostMetrics returns a snapshot of metrics for given host. -// If the metrics for host don't exist, they are created. -func (qm *queryMetrics) hostMetrics(host *HostInfo) *hostMetrics { - qm.l.Lock() - metrics := qm.hostMetricsLocked(host) - copied := new(hostMetrics) - *copied = *metrics - qm.l.Unlock() - return copied +type hostMetricsManager interface { + attempt(addLatency time.Duration, host *HostInfo) *hostMetrics +} + +type hostMetricsManagerImpl struct { + l sync.RWMutex + m map[string]*hostMetrics +} + +func newHostMetricsManager() *hostMetricsManagerImpl { + return &hostMetricsManagerImpl{m: make(map[string]*hostMetrics)} +} + +// preFilledHostMetricsMetricsManager initializes new hostMetrics based on per-host supplied data. +func preFilledHostMetricsMetricsManager(m map[string]*hostMetrics) *hostMetricsManagerImpl { + return &hostMetricsManagerImpl{m: m} } // hostMetricsLocked gets or creates host metrics for given host. // It must be called only while holding qm.l lock. -func (qm *queryMetrics) hostMetricsLocked(host *HostInfo) *hostMetrics { +func (qm *hostMetricsManagerImpl) hostMetricsLocked(host *HostInfo) *hostMetrics { metrics, exists := qm.m[host.ConnectAddress().String()] if !exists { // if the host is not in the map, it means it's been accessed for the first time @@ -905,53 +913,22 @@ func (qm *queryMetrics) hostMetricsLocked(host *HostInfo) *hostMetrics { return metrics } -// attempts returns the number of times the query was executed. -func (qm *queryMetrics) attempts() int { +func (qm *hostMetricsManagerImpl) attempt(addLatency time.Duration, host *HostInfo) *hostMetrics { qm.l.Lock() - attempts := qm.totalAttempts - qm.l.Unlock() - return attempts -} - -func (qm *queryMetrics) latency() int64 { - qm.l.Lock() - var ( - attempts int - latency int64 - ) - for _, metric := range qm.m { - attempts += metric.Attempts - latency += metric.TotalLatency - } + updateHostMetrics := qm.hostMetricsLocked(host) + updateHostMetrics.Attempts += 1 + updateHostMetrics.TotalLatency += addLatency.Nanoseconds() qm.l.Unlock() - if attempts > 0 { - return latency / int64(attempts) - } - return 0 + return updateHostMetrics } -// attempt adds given number of attempts and latency for given host. -// It returns previous total attempts. -// If needsHostMetrics is true, a copy of updated hostMetrics is returned. -func (qm *queryMetrics) attempt(addAttempts int, addLatency time.Duration, - host *HostInfo, needsHostMetrics bool) (int, *hostMetrics) { - qm.l.Lock() - - totalAttempts := qm.totalAttempts - qm.totalAttempts += addAttempts - - updateHostMetrics := qm.hostMetricsLocked(host) - updateHostMetrics.Attempts += addAttempts - updateHostMetrics.TotalLatency += addLatency.Nanoseconds() +var emptyHostMetricsManager = &emptyHostMetricsManagerImpl{} - var hostMetricsCopy *hostMetrics - if needsHostMetrics { - hostMetricsCopy = new(hostMetrics) - *hostMetricsCopy = *updateHostMetrics - } +type emptyHostMetricsManagerImpl struct { +} - qm.l.Unlock() - return totalAttempts, hostMetricsCopy +func (qm *emptyHostMetricsManagerImpl) attempt(_ time.Duration, _ *HostInfo) *hostMetrics { + return nil } // Query represents a CQL statement that can be executed. @@ -1297,7 +1274,7 @@ func (q *Query) Iter() *Iter { // over all results. func (q *Query) IterContext(ctx context.Context) *Iter { if isUseStatement(q.stmt) { - return newErrIter(ErrUseStmt, newQueryMetrics(), q.Keyspace(), nil, q.getKeyspace) + return newErrIter(ErrUseStmt, &queryMetrics{}, q.Keyspace(), nil, q.getKeyspace) } internalQry := newInternalQuery(q, ctx)