Skip to content

Commit bdb525b

Browse files
committed
Don't collect host metrics if a query/batch observer is not provided
Patch by João Reis; reviewed by Bohdan Siryk and James Hartig for CASSGO-90
1 parent a6e8291 commit bdb525b

File tree

8 files changed

+149
-116
lines changed

8 files changed

+149
-116
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1919

2020
### Added
2121

22+
#### 2.0.0
23+
24+
- Don't collect host metrics if a query/batch observer is not provided (CASSGO-90)
25+
2226
#### 2.0.0-rc1
2327

2428
- Support vector type (CASSGO-11)

cassandra_test.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2061,6 +2061,19 @@ func TestQueryStats(t *testing.T) {
20612061
qry := session.Query("SELECT * FROM system.peers")
20622062
iter := qry.Iter()
20632063
err := iter.Close()
2064+
if err != nil {
2065+
t.Fatalf("query failed. %v", err)
2066+
} else {
2067+
if iter.Attempts() < 1 {
2068+
t.Fatal("expected at least 1 attempt, but got 0")
2069+
}
2070+
if iter.Latency() != 0 {
2071+
t.Fatalf("expected latency to be 0, but got %v instead.", iter.Latency())
2072+
}
2073+
}
2074+
qry = session.Query("SELECT * FROM system.peers")
2075+
iter = qry.Iter()
2076+
err = iter.Close()
20642077
if err != nil {
20652078
t.Fatalf("query failed. %v", err)
20662079
} else {
@@ -2103,6 +2116,18 @@ func TestBatchStats(t *testing.T) {
21032116
b.Query("INSERT INTO batchStats (id) VALUES (?)", 2)
21042117
iter := b.Iter()
21052118
err := iter.Close()
2119+
if err != nil {
2120+
t.Fatalf("query failed. %v", err)
2121+
} else {
2122+
if iter.Attempts() < 1 {
2123+
t.Fatal("expected at least 1 attempt, but got 0")
2124+
}
2125+
if iter.Latency() != 0 {
2126+
t.Fatalf("expected latency to be 0, but got %v instead.", iter.Latency())
2127+
}
2128+
}
2129+
iter = b.Iter()
2130+
err = iter.Close()
21062131
if err != nil {
21072132
t.Fatalf("query failed. %v", err)
21082133
} else {

conn.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1665,7 +1665,12 @@ func (c *Conn) executeQuery(ctx context.Context, q *internalQuery) *Iter {
16651665
newQry := new(internalQuery)
16661666
*newQry = *q
16671667
newQry.pageState = copyBytes(x.meta.pagingState)
1668-
newQry.metrics = &queryMetrics{m: make(map[string]*hostMetrics)}
1668+
newQry.metrics = &queryMetrics{}
1669+
if newQry.qryOpts.observer != nil {
1670+
newQry.hostMetricsManager = newHostMetricsManager()
1671+
} else {
1672+
newQry.hostMetricsManager = emptyHostMetricsManager
1673+
}
16691674

16701675
iter.next = &nextIter{
16711676
q: newQry,

conn_test.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -450,28 +450,28 @@ func TestQueryMultinodeWithMetrics(t *testing.T) {
450450
if err == nil {
451451
t.Fatalf("expected error")
452452
}
453+
totalLatency := int64(0)
454+
totalAttempts := int64(0)
453455

454456
for i, ip := range addresses {
455457
host := &HostInfo{connectAddress: net.ParseIP(ip)}
456-
queryMetric := iter.metrics.hostMetrics(host)
457458
observedMetrics := observer.GetMetrics(host)
458-
459459
requests := int(atomic.LoadInt64(&nodes[i].nKillReq))
460-
hostAttempts := queryMetric.Attempts
461-
if requests != hostAttempts {
462-
t.Fatalf("expected requests %v to match query attempts %v", requests, hostAttempts)
463-
}
464460

465-
if hostAttempts != observedMetrics.Attempts {
466-
t.Fatalf("expected observed attempts %v to match query attempts %v on host %v", observedMetrics.Attempts, hostAttempts, ip)
461+
if requests != observedMetrics.Attempts {
462+
t.Fatalf("expected observed attempts %v to match server requests %v on host %v", observedMetrics.Attempts, requests, ip)
467463
}
468464

469-
hostLatency := queryMetric.TotalLatency
470465
observedLatency := observedMetrics.TotalLatency
471-
if hostLatency != observedLatency {
472-
t.Fatalf("expected observed latency %v to match query latency %v on host %v", observedLatency, hostLatency, ip)
473-
}
466+
totalLatency += observedLatency
467+
totalAttempts += int64(observedMetrics.Attempts)
474468
}
469+
470+
observedLatency := totalLatency / totalAttempts
471+
if observedLatency != iter.Latency() {
472+
t.Fatalf("expected observed latency %v (%v/%v) to match query latency %v", observedLatency, totalLatency, totalAttempts, iter.Latency())
473+
}
474+
475475
// the query will only be attempted once, but is being retried
476476
attempts := iter.Attempts()
477477
if attempts != rt.NumRetries {

control.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -557,7 +557,7 @@ func (c *controlConn) withConnHost(fn func(*connHost) *Iter) *Iter {
557557
return fn(ch)
558558
}
559559

560-
return newErrIter(errNoControl, newQueryMetrics(), "", nil, nil)
560+
return newErrIter(errNoControl, &queryMetrics{}, "", nil, nil)
561561
}
562562

563563
func (c *controlConn) withConn(fn func(*Conn) *Iter) *Iter {
@@ -582,7 +582,8 @@ func (c *controlConn) query(statement string, values ...interface{}) (iter *Iter
582582
newLogFieldString("statement", statement), newLogFieldError("err", iter.err))
583583
}
584584

585-
iter.metrics.attempt(1, 0, c.getConn().host, false)
585+
qry.metrics.attempt(0)
586+
qry.hostMetricsManager.attempt(0, c.getConn().host)
586587
if iter.err == nil || !c.retry.Attempt(qry) {
587588
break
588589
}
@@ -593,7 +594,7 @@ func (c *controlConn) query(statement string, values ...interface{}) (iter *Iter
593594

594595
func (c *controlConn) awaitSchemaAgreement() error {
595596
return c.withConn(func(conn *Conn) *Iter {
596-
return newErrIter(conn.awaitSchemaAgreement(context.TODO()), newQueryMetrics(), "", nil, nil)
597+
return newErrIter(conn.awaitSchemaAgreement(context.TODO()), &queryMetrics{}, "", nil, nil)
597598
}).err
598599
}
599600

policies_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,8 @@ func TestSimpleRetryPolicy(t *testing.T) {
261261
}
262262

263263
for _, c := range cases {
264-
q.metrics = preFilledQueryMetrics(map[string]*hostMetrics{"127.0.0.1": {Attempts: c.attempts}})
264+
q.metrics = &queryMetrics{totalAttempts: int64(c.attempts)}
265+
q.hostMetricsManager = preFilledHostMetricsMetricsManager(map[string]*hostMetrics{"127.0.0.1": {Attempts: c.attempts}})
265266
if c.allow && !rt.Attempt(q) {
266267
t.Fatalf("should allow retry after %d attempts", c.attempts)
267268
}
@@ -345,7 +346,8 @@ func TestDowngradingConsistencyRetryPolicy(t *testing.T) {
345346
}
346347

347348
for _, c := range cases {
348-
q.metrics = preFilledQueryMetrics(map[string]*hostMetrics{"127.0.0.1": {Attempts: c.attempts}})
349+
q.metrics = &queryMetrics{totalAttempts: int64(c.attempts)}
350+
q.hostMetricsManager = preFilledHostMetricsMetricsManager(map[string]*hostMetrics{"127.0.0.1": {Attempts: c.attempts}})
349351
if c.retryType != rt.GetRetryType(c.err) {
350352
t.Fatalf("retry type should be %v", c.retryType)
351353
}

query_executor.go

Lines changed: 49 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -334,14 +334,15 @@ func newQueryOptions(q *Query, ctx context.Context) *queryOptions {
334334
}
335335

336336
type internalQuery struct {
337-
originalQuery *Query
338-
qryOpts *queryOptions
339-
pageState []byte
340-
metrics *queryMetrics
341-
conn *Conn
342-
consistency uint32
343-
session *Session
344-
routingInfo *queryRoutingInfo
337+
originalQuery *Query
338+
qryOpts *queryOptions
339+
pageState []byte
340+
conn *Conn
341+
consistency uint32
342+
session *Session
343+
routingInfo *queryRoutingInfo
344+
metrics *queryMetrics
345+
hostMetricsManager hostMetricsManager
345346
}
346347

347348
func newInternalQuery(q *Query, ctx context.Context) *internalQuery {
@@ -351,15 +352,22 @@ func newInternalQuery(q *Query, ctx context.Context) *internalQuery {
351352
newPageState = make([]byte, len(pageState))
352353
copy(newPageState, pageState)
353354
}
355+
var hostMetricsMgr hostMetricsManager
356+
if q.observer != nil {
357+
hostMetricsMgr = newHostMetricsManager()
358+
} else {
359+
hostMetricsMgr = emptyHostMetricsManager
360+
}
354361
return &internalQuery{
355-
originalQuery: q,
356-
qryOpts: newQueryOptions(q, ctx),
357-
metrics: &queryMetrics{m: make(map[string]*hostMetrics)},
358-
consistency: uint32(q.initialConsistency),
359-
pageState: newPageState,
360-
conn: nil,
361-
session: q.session,
362-
routingInfo: &queryRoutingInfo{},
362+
originalQuery: q,
363+
qryOpts: newQueryOptions(q, ctx),
364+
metrics: &queryMetrics{},
365+
hostMetricsManager: hostMetricsMgr,
366+
consistency: uint32(q.initialConsistency),
367+
pageState: newPageState,
368+
conn: nil,
369+
session: q.session,
370+
routingInfo: &queryRoutingInfo{},
363371
}
364372
}
365373

@@ -370,9 +378,10 @@ func (q *internalQuery) Attempts() int {
370378

371379
func (q *internalQuery) attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo) {
372380
latency := end.Sub(start)
373-
attempt, metricsForHost := q.metrics.attempt(1, latency, host, q.qryOpts.observer != nil)
381+
attempt := q.metrics.attempt(latency)
374382

375383
if q.qryOpts.observer != nil {
384+
metricsForHost := q.hostMetricsManager.attempt(latency, host)
376385
q.qryOpts.observer.ObserveQuery(q.qryOpts.context, ObservedQuery{
377386
Keyspace: keyspace,
378387
Statement: q.qryOpts.stmt,
@@ -546,22 +555,30 @@ func newBatchOptions(b *Batch, ctx context.Context) *batchOptions {
546555
}
547556

548557
type internalBatch struct {
549-
originalBatch *Batch
550-
batchOpts *batchOptions
551-
metrics *queryMetrics
552-
consistency uint32
553-
routingInfo *queryRoutingInfo
554-
session *Session
558+
originalBatch *Batch
559+
batchOpts *batchOptions
560+
consistency uint32
561+
routingInfo *queryRoutingInfo
562+
session *Session
563+
metrics *queryMetrics
564+
hostMetricsManager hostMetricsManager
555565
}
556566

557567
func newInternalBatch(batch *Batch, ctx context.Context) *internalBatch {
568+
var hostMetricsMgr hostMetricsManager
569+
if batch.observer != nil {
570+
hostMetricsMgr = newHostMetricsManager()
571+
} else {
572+
hostMetricsMgr = emptyHostMetricsManager
573+
}
558574
return &internalBatch{
559-
originalBatch: batch,
560-
batchOpts: newBatchOptions(batch, ctx),
561-
metrics: &queryMetrics{m: make(map[string]*hostMetrics)},
562-
routingInfo: &queryRoutingInfo{},
563-
session: batch.session,
564-
consistency: uint32(batch.GetConsistency()),
575+
originalBatch: batch,
576+
batchOpts: newBatchOptions(batch, ctx),
577+
routingInfo: &queryRoutingInfo{},
578+
session: batch.session,
579+
consistency: uint32(batch.GetConsistency()),
580+
metrics: &queryMetrics{},
581+
hostMetricsManager: hostMetricsMgr,
565582
}
566583
}
567584

@@ -572,12 +589,14 @@ func (b *internalBatch) Attempts() int {
572589

573590
func (b *internalBatch) attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo) {
574591
latency := end.Sub(start)
575-
attempt, metricsForHost := b.metrics.attempt(1, latency, host, b.batchOpts.observer != nil)
592+
attempt := b.metrics.attempt(latency)
576593

577594
if b.batchOpts.observer == nil {
578595
return
579596
}
580597

598+
metricsForHost := b.hostMetricsManager.attempt(latency, host)
599+
581600
statements := make([]string, len(b.batchOpts.entries))
582601
values := make([][]interface{}, len(b.batchOpts.entries))
583602

0 commit comments

Comments
 (0)