Skip to content

Commit 7b8415d

Browse files
committed
Make QueryMetrics (Latency and Attempts) opt-in.
Patch by João Reis; reviewed by TBD for CASSGO-90
1 parent a6e8291 commit 7b8415d

File tree

7 files changed

+115
-44
lines changed

7 files changed

+115
-44
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
3636
#### 2.0.0
3737

3838
- Remove release date from changelog and add 2.0.0-rc1 (CASSGO-86)
39+
- Make Query Metrics (Latency and Attempts) opt-in (CASSGO-90)
3940

4041
#### 2.0.0-rc1
4142

cluster.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,11 @@ type ClusterConfig struct {
280280
// default: 0.25.
281281
NextPagePrefetch float64
282282

283+
// WithQueryMetrics sets whether query metrics should be enabled for all queries by default.
284+
// Query metrics are available through Iter.Latency and Iter.Attempts.
285+
// default: false.
286+
WithQueryMetrics bool
287+
283288
// RegisteredTypes will be copied for all sessions created from this Cluster.
284289
// If not provided, a copy of GlobalTypes will be used.
285290
RegisteredTypes *RegisteredTypes

conn.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1665,7 +1665,11 @@ func (c *Conn) executeQuery(ctx context.Context, q *internalQuery) *Iter {
16651665
newQry := new(internalQuery)
16661666
*newQry = *q
16671667
newQry.pageState = copyBytes(x.meta.pagingState)
1668-
newQry.metrics = &queryMetrics{m: make(map[string]*hostMetrics)}
1668+
if qryOpts.withQueryMetrics {
1669+
newQry.metrics = &queryMetricsImpl{m: make(map[string]*hostMetrics)}
1670+
} else {
1671+
newQry.metrics = nilQueryMetrics
1672+
}
16691673

16701674
iter.next = &nextIter{
16711675
q: newQry,

conn_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,7 @@ func TestCancel(t *testing.T) {
348348
}
349349

350350
type testQueryObserver struct {
351-
metrics map[string]*hostMetrics
351+
metrics map[string]hostMetrics
352352
logger StructuredLogger
353353
}
354354

@@ -365,7 +365,7 @@ func (o *testQueryObserver) ObserveQuery(ctx context.Context, q ObservedQuery) {
365365
newLogFieldError("err", q.Err))
366366
}
367367

368-
func (o *testQueryObserver) GetMetrics(host *HostInfo) *hostMetrics {
368+
func (o *testQueryObserver) GetMetrics(host *HostInfo) hostMetrics {
369369
return o.metrics[host.ConnectAddress().String()]
370370
}
371371

@@ -395,7 +395,7 @@ func TestQueryRetry(t *testing.T) {
395395

396396
rt := &SimpleRetryPolicy{NumRetries: 1}
397397

398-
qry := db.Query("kill").RetryPolicy(rt)
398+
qry := db.Query("kill").WithQueryMetrics(true).RetryPolicy(rt)
399399
iter := qry.Iter()
400400
err = iter.Close()
401401
if err == nil {
@@ -443,8 +443,8 @@ func TestQueryMultinodeWithMetrics(t *testing.T) {
443443

444444
// 1 retry per host
445445
rt := &SimpleRetryPolicy{NumRetries: 3}
446-
observer := &testQueryObserver{metrics: make(map[string]*hostMetrics), logger: log}
447-
qry := db.Query("kill").RetryPolicy(rt).Observer(observer).Idempotent(true)
446+
observer := &testQueryObserver{metrics: make(map[string]hostMetrics), logger: log}
447+
qry := db.Query("kill").RetryPolicy(rt).Observer(observer).Idempotent(true).WithQueryMetrics(true)
448448
iter := qry.Iter()
449449
err = iter.Close()
450450
if err == nil {
@@ -453,7 +453,7 @@ func TestQueryMultinodeWithMetrics(t *testing.T) {
453453

454454
for i, ip := range addresses {
455455
host := &HostInfo{connectAddress: net.ParseIP(ip)}
456-
queryMetric := iter.metrics.hostMetrics(host)
456+
queryMetric := iter.metrics.(*queryMetricsImpl).hostMetrics(host)
457457
observedMetrics := observer.GetMetrics(host)
458458

459459
requests := int(atomic.LoadInt64(&nodes[i].nKillReq))

control.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -582,7 +582,7 @@ func (c *controlConn) query(statement string, values ...interface{}) (iter *Iter
582582
newLogFieldString("statement", statement), newLogFieldError("err", iter.err))
583583
}
584584

585-
iter.metrics.attempt(1, 0, c.getConn().host, false)
585+
iter.metrics.attempt(1, 0, c.getConn().host)
586586
if iter.err == nil || !c.retry.Attempt(qry) {
587587
break
588588
}

query_executor.go

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ type internalRequest interface {
6161
attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo)
6262
retryPolicy() RetryPolicy
6363
speculativeExecutionPolicy() SpeculativeExecutionPolicy
64-
getQueryMetrics() *queryMetrics
64+
getQueryMetrics() queryMetrics
6565
getRoutingInfo() *queryRoutingInfo
6666
getKeyspaceFunc() func() string
6767
RetryableQuery
@@ -280,6 +280,8 @@ type queryOptions struct {
280280
// Protocol flag
281281
disableSkipMetadata bool
282282

283+
withQueryMetrics bool
284+
283285
customPayload map[string][]byte
284286
prefetch float64
285287
rt RetryPolicy
@@ -330,14 +332,15 @@ func newQueryOptions(q *Query, ctx context.Context) *queryOptions {
330332
nowInSecondsValue: q.nowInSecondsValue,
331333
keyspace: q.keyspace,
332334
hostID: q.hostID,
335+
withQueryMetrics: q.withQueryMetrics,
333336
}
334337
}
335338

336339
type internalQuery struct {
337340
originalQuery *Query
338341
qryOpts *queryOptions
339342
pageState []byte
340-
metrics *queryMetrics
343+
metrics queryMetrics
341344
conn *Conn
342345
consistency uint32
343346
session *Session
@@ -351,10 +354,16 @@ func newInternalQuery(q *Query, ctx context.Context) *internalQuery {
351354
newPageState = make([]byte, len(pageState))
352355
copy(newPageState, pageState)
353356
}
357+
var metrics queryMetrics
358+
if q.withQueryMetrics {
359+
metrics = &queryMetricsImpl{m: make(map[string]*hostMetrics)}
360+
} else {
361+
metrics = nilQueryMetrics
362+
}
354363
return &internalQuery{
355364
originalQuery: q,
356365
qryOpts: newQueryOptions(q, ctx),
357-
metrics: &queryMetrics{m: make(map[string]*hostMetrics)},
366+
metrics: metrics,
358367
consistency: uint32(q.initialConsistency),
359368
pageState: newPageState,
360369
conn: nil,
@@ -370,7 +379,7 @@ func (q *internalQuery) Attempts() int {
370379

371380
func (q *internalQuery) attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo) {
372381
latency := end.Sub(start)
373-
attempt, metricsForHost := q.metrics.attempt(1, latency, host, q.qryOpts.observer != nil)
382+
attempt, metricsForHost := q.metrics.attempt(1, latency, host)
374383

375384
if q.qryOpts.observer != nil {
376385
q.qryOpts.observer.ObserveQuery(q.qryOpts.context, ObservedQuery{
@@ -381,7 +390,7 @@ func (q *internalQuery) attempt(keyspace string, end, start time.Time, iter *Ite
381390
End: end,
382391
Rows: iter.numRows,
383392
Host: host,
384-
Metrics: metricsForHost,
393+
Metrics: *metricsForHost,
385394
Err: iter.err,
386395
Attempt: attempt,
387396
Query: q.originalQuery,
@@ -457,7 +466,7 @@ func (q *internalQuery) IsIdempotent() bool {
457466
return q.qryOpts.idempotent
458467
}
459468

460-
func (q *internalQuery) getQueryMetrics() *queryMetrics {
469+
func (q *internalQuery) getQueryMetrics() queryMetrics {
461470
return q.metrics
462471
}
463472

@@ -548,17 +557,23 @@ func newBatchOptions(b *Batch, ctx context.Context) *batchOptions {
548557
type internalBatch struct {
549558
originalBatch *Batch
550559
batchOpts *batchOptions
551-
metrics *queryMetrics
560+
metrics queryMetrics
552561
consistency uint32
553562
routingInfo *queryRoutingInfo
554563
session *Session
555564
}
556565

557566
func newInternalBatch(batch *Batch, ctx context.Context) *internalBatch {
567+
var metrics queryMetrics
568+
if batch.withQueryMetrics {
569+
metrics = &queryMetricsImpl{m: make(map[string]*hostMetrics)}
570+
} else {
571+
metrics = nilQueryMetrics
572+
}
558573
return &internalBatch{
559574
originalBatch: batch,
560575
batchOpts: newBatchOptions(batch, ctx),
561-
metrics: &queryMetrics{m: make(map[string]*hostMetrics)},
576+
metrics: metrics,
562577
routingInfo: &queryRoutingInfo{},
563578
session: batch.session,
564579
consistency: uint32(batch.GetConsistency()),
@@ -572,7 +587,7 @@ func (b *internalBatch) Attempts() int {
572587

573588
func (b *internalBatch) attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo) {
574589
latency := end.Sub(start)
575-
attempt, metricsForHost := b.metrics.attempt(1, latency, host, b.batchOpts.observer != nil)
590+
attempt, metricsForHost := b.metrics.attempt(1, latency, host)
576591

577592
if b.batchOpts.observer == nil {
578593
return
@@ -594,7 +609,7 @@ func (b *internalBatch) attempt(keyspace string, end, start time.Time, iter *Ite
594609
End: end,
595610
// Rows not used in batch observations // TODO - might be able to support it when using BatchCAS
596611
Host: host,
597-
Metrics: metricsForHost,
612+
Metrics: *metricsForHost,
598613
Err: iter.err,
599614
Attempt: attempt,
600615
Batch: b.originalBatch,
@@ -651,7 +666,7 @@ func (b *internalBatch) IsIdempotent() bool {
651666
return b.batchOpts.idempotent
652667
}
653668

654-
func (b *internalBatch) getQueryMetrics() *queryMetrics {
669+
func (b *internalBatch) getQueryMetrics() queryMetrics {
655670
return b.metrics
656671
}
657672

0 commit comments

Comments
 (0)