Skip to content

Commit ff18fa1

Browse files
committed
feat(metrics): dual-emit timer metrics as histograms (batch 7)
Signed-off-by: Neil Xie <neil.xie@uber.com>
1 parent 8e0a703 commit ff18fa1

22 files changed

Lines changed: 329 additions & 44 deletions

File tree

common/asyncworkflow/queue/consumer/default_consumer.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,8 +156,12 @@ func (c *DefaultConsumer) processMessage(msg messaging.Message) {
156156
logger := c.logger.WithTags(tag.Dynamic("partition", msg.Partition()), tag.Dynamic("offset", msg.Offset()))
157157
logger.Debug("Received message")
158158

159+
asyncProcessStart := time.Now()
159160
sw := c.scope.StartTimer(metrics.AsyncWorkflowProcessMsgLatency)
160-
defer sw.Stop()
161+
defer func() {
162+
sw.Stop()
163+
c.scope.RecordHistogramDuration(metrics.AsyncWorkflowProcessMsgLatencyHistogram, time.Since(asyncProcessStart))
164+
}()
161165

162166
var request sqlblobs.AsyncRequestMessage
163167
if err := c.msgDecoder.Decode(msg.Value(), &request); err != nil {

common/metrics/config.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,80 @@ var HistogramMigrationMetrics = map[string]struct{}{
265265
"direct_query_dispatch_non_sticky_latency_ns": {},
266266
"direct_query_dispatch_clear_stickiness_latency": {},
267267
"direct_query_dispatch_clear_stickiness_latency_ns": {},
268+
"cadence_authorization_latency": {},
269+
"cadence_authorization_latency_ns": {},
270+
271+
"pinot_latency": {},
272+
"pinot_latency_ns": {},
273+
"pinot_latency_per_domain": {},
274+
"pinot_latency_per_domain_ns": {},
275+
276+
"sequentialtask_submit_latency": {},
277+
"sequentialtask_submit_latency_ns": {},
278+
"sequentialtask_queue_size": {},
279+
"sequentialtask_queue_size_counts": {},
280+
"sequentialtask_queue_processing_latency": {},
281+
"sequentialtask_queue_processing_latency_ns": {},
282+
"sequentialtask_task_processing_latency": {},
283+
"sequentialtask_task_processing_latency_ns": {},
284+
285+
"prioritytask_submit_latency": {},
286+
"prioritytask_submit_latency_ns": {},
287+
288+
"graceful_failover_latency": {},
289+
"graceful_failover_latency_ns": {},
290+
291+
"async_request_payload_size_per_domain": {},
292+
"async_request_payload_size_per_domain_counts": {},
293+
294+
"task_redispatch_queue_pending_tasks": {},
295+
"task_redispatch_queue_pending_tasks_counts": {},
296+
297+
"workflow_context_lock_latency": {},
298+
"workflow_context_lock_latency_ns": {},
299+
300+
"get_replication_messages_for_shard": {},
301+
"get_replication_messages_for_shard_ns": {},
302+
"get_dlq_replication_messages": {},
303+
"get_dlq_replication_messages_ns": {},
304+
305+
"decision_task_query_latency": {},
306+
"decision_task_query_latency_ns": {},
307+
308+
"syncmatch_latency_per_tl": {},
309+
"syncmatch_latency_per_tl_ns": {},
310+
"asyncmatch_latency_per_tl": {},
311+
"asyncmatch_latency_per_tl_ns": {},
312+
313+
"asyncmatch_local_poll_attempt_per_tl": {},
314+
"asyncmatch_local_poll_attempt_per_tl_counts": {},
315+
"asyncmatch_forward_poll_attempt_per_tl": {},
316+
"asyncmatch_forward_poll_attempt_per_tl_counts": {},
317+
"asyncmatch_local_poll_after_forward_failed_attempt_per_tl": {},
318+
"asyncmatch_local_poll_after_forward_failed_attempt_per_tl_counts": {},
319+
320+
"poll_local_match_latency_per_tl": {},
321+
"poll_local_match_latency_per_tl_ns": {},
322+
"poll_forward_match_latency_per_tl": {},
323+
"poll_forward_match_latency_per_tl_ns": {},
324+
"poll_local_match_after_forward_failed_latency_per_tl": {},
325+
"poll_local_match_after_forward_failed_latency_per_tl_ns": {},
326+
327+
"es_processor_process_msg_latency": {},
328+
"es_processor_process_msg_latency_ns": {},
329+
"index_processor_process_msg_latency": {},
330+
"index_processor_process_msg_latency_ns": {},
331+
332+
"async_workflow_process_msg_latency": {},
333+
"async_workflow_process_msg_latency_ns": {},
334+
"diagnostics_workflow_execution_latency": {},
335+
"diagnostics_workflow_execution_latency_ns": {},
336+
337+
"shard_distributor_latency": {},
338+
"shard_distributor_latency_ns": {},
339+
340+
"global_ratelimiter_update_latency": {},
341+
"global_ratelimiter_update_latency_ns": {},
268342
}
269343

270344
func (h HistogramMigration) EmitTimer(name string) bool {

common/metrics/defs.go

Lines changed: 62 additions & 6 deletions
Large diffs are not rendered by default.

common/persistence/pinot/pinot_visibility_metric_clients.go

Lines changed: 86 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ package pinotvisibility
2222

2323
import (
2424
"context"
25+
"time"
2526

2627
"github.com/uber/cadence/common/log"
2728
"github.com/uber/cadence/common/log/tag"
@@ -59,8 +60,12 @@ func (p *pinotVisibilityMetricsClient) RecordWorkflowExecutionStarted(
5960
scopeWithDomainTag := p.metricClient.Scope(metrics.PinotRecordWorkflowExecutionStartedScope, metrics.DomainTag(request.Domain))
6061
scopeWithDomainTag.IncCounter(metrics.PinotRequestsPerDomain)
6162

63+
pinotStart := time.Now()
6264
sw := scopeWithDomainTag.StartTimer(metrics.PinotLatencyPerDomain)
63-
defer sw.Stop()
65+
defer func() {
66+
sw.Stop()
67+
scopeWithDomainTag.RecordHistogramDuration(metrics.PinotLatencyPerDomainHistogram, time.Since(pinotStart))
68+
}()
6469
err := p.persistence.RecordWorkflowExecutionStarted(ctx, request)
6570

6671
if err != nil {
@@ -78,8 +83,12 @@ func (p *pinotVisibilityMetricsClient) RecordWorkflowExecutionClosed(
7883
scopeWithDomainTag := p.metricClient.Scope(metrics.PinotRecordWorkflowExecutionClosedScope, metrics.DomainTag(request.Domain))
7984
scopeWithDomainTag.IncCounter(metrics.PinotRequestsPerDomain)
8085

86+
pinotStart := time.Now()
8187
sw := scopeWithDomainTag.StartTimer(metrics.PinotLatencyPerDomain)
82-
defer sw.Stop()
88+
defer func() {
89+
sw.Stop()
90+
scopeWithDomainTag.RecordHistogramDuration(metrics.PinotLatencyPerDomainHistogram, time.Since(pinotStart))
91+
}()
8392
err := p.persistence.RecordWorkflowExecutionClosed(ctx, request)
8493

8594
if err != nil {
@@ -97,8 +106,12 @@ func (p *pinotVisibilityMetricsClient) RecordWorkflowExecutionUninitialized(
97106
scopeWithDomainTag := p.metricClient.Scope(metrics.PinotRecordWorkflowExecutionUninitializedScope, metrics.DomainTag(request.Domain))
98107
scopeWithDomainTag.IncCounter(metrics.PinotRequestsPerDomain)
99108

109+
pinotStart := time.Now()
100110
sw := scopeWithDomainTag.StartTimer(metrics.PinotLatencyPerDomain)
101-
defer sw.Stop()
111+
defer func() {
112+
sw.Stop()
113+
scopeWithDomainTag.RecordHistogramDuration(metrics.PinotLatencyPerDomainHistogram, time.Since(pinotStart))
114+
}()
102115
err := p.persistence.RecordWorkflowExecutionUninitialized(ctx, request)
103116

104117
if err != nil {
@@ -116,8 +129,12 @@ func (p *pinotVisibilityMetricsClient) UpsertWorkflowExecution(
116129
scopeWithDomainTag := p.metricClient.Scope(metrics.PinotUpsertWorkflowExecutionScope, metrics.DomainTag(request.Domain))
117130
scopeWithDomainTag.IncCounter(metrics.PinotRequestsPerDomain)
118131

132+
pinotStart := time.Now()
119133
sw := scopeWithDomainTag.StartTimer(metrics.PinotLatencyPerDomain)
120-
defer sw.Stop()
134+
defer func() {
135+
sw.Stop()
136+
scopeWithDomainTag.RecordHistogramDuration(metrics.PinotLatencyPerDomainHistogram, time.Since(pinotStart))
137+
}()
121138
err := p.persistence.UpsertWorkflowExecution(ctx, request)
122139

123140
if err != nil {
@@ -135,8 +152,12 @@ func (p *pinotVisibilityMetricsClient) ListOpenWorkflowExecutions(
135152
scopeWithDomainTag := p.metricClient.Scope(metrics.PinotListOpenWorkflowExecutionsScope, metrics.DomainTag(request.Domain))
136153
scopeWithDomainTag.IncCounter(metrics.PinotRequestsPerDomain)
137154

155+
pinotStart := time.Now()
138156
sw := scopeWithDomainTag.StartTimer(metrics.PinotLatencyPerDomain)
139-
defer sw.Stop()
157+
defer func() {
158+
sw.Stop()
159+
scopeWithDomainTag.RecordHistogramDuration(metrics.PinotLatencyPerDomainHistogram, time.Since(pinotStart))
160+
}()
140161
response, err := p.persistence.ListOpenWorkflowExecutions(ctx, request)
141162

142163
if err != nil {
@@ -154,8 +175,12 @@ func (p *pinotVisibilityMetricsClient) ListClosedWorkflowExecutions(
154175
scopeWithDomainTag := p.metricClient.Scope(metrics.PinotListClosedWorkflowExecutionsScope, metrics.DomainTag(request.Domain))
155176
scopeWithDomainTag.IncCounter(metrics.PinotRequestsPerDomain)
156177

178+
pinotStart := time.Now()
157179
sw := scopeWithDomainTag.StartTimer(metrics.PinotLatencyPerDomain)
158-
defer sw.Stop()
180+
defer func() {
181+
sw.Stop()
182+
scopeWithDomainTag.RecordHistogramDuration(metrics.PinotLatencyPerDomainHistogram, time.Since(pinotStart))
183+
}()
159184
response, err := p.persistence.ListClosedWorkflowExecutions(ctx, request)
160185

161186
if err != nil {
@@ -173,8 +198,12 @@ func (p *pinotVisibilityMetricsClient) ListOpenWorkflowExecutionsByType(
173198
scopeWithDomainTag := p.metricClient.Scope(metrics.PinotListOpenWorkflowExecutionsByTypeScope, metrics.DomainTag(request.Domain))
174199
scopeWithDomainTag.IncCounter(metrics.PinotRequestsPerDomain)
175200

201+
pinotStart := time.Now()
176202
sw := scopeWithDomainTag.StartTimer(metrics.PinotLatencyPerDomain)
177-
defer sw.Stop()
203+
defer func() {
204+
sw.Stop()
205+
scopeWithDomainTag.RecordHistogramDuration(metrics.PinotLatencyPerDomainHistogram, time.Since(pinotStart))
206+
}()
178207
response, err := p.persistence.ListOpenWorkflowExecutionsByType(ctx, request)
179208

180209
if err != nil {
@@ -191,8 +220,12 @@ func (p *pinotVisibilityMetricsClient) ListClosedWorkflowExecutionsByType(
191220

192221
scopeWithDomainTag := p.metricClient.Scope(metrics.PinotListClosedWorkflowExecutionsByTypeScope, metrics.DomainTag(request.Domain))
193222
scopeWithDomainTag.IncCounter(metrics.PinotRequestsPerDomain)
223+
pinotStart := time.Now()
194224
sw := scopeWithDomainTag.StartTimer(metrics.PinotLatencyPerDomain)
195-
defer sw.Stop()
225+
defer func() {
226+
sw.Stop()
227+
scopeWithDomainTag.RecordHistogramDuration(metrics.PinotLatencyPerDomainHistogram, time.Since(pinotStart))
228+
}()
196229
response, err := p.persistence.ListClosedWorkflowExecutionsByType(ctx, request)
197230

198231
if err != nil {
@@ -209,8 +242,12 @@ func (p *pinotVisibilityMetricsClient) ListOpenWorkflowExecutionsByWorkflowID(
209242

210243
scopeWithDomainTag := p.metricClient.Scope(metrics.PinotListOpenWorkflowExecutionsByWorkflowIDScope, metrics.DomainTag(request.Domain))
211244
scopeWithDomainTag.IncCounter(metrics.PinotRequestsPerDomain)
245+
pinotStart := time.Now()
212246
sw := scopeWithDomainTag.StartTimer(metrics.PinotLatencyPerDomain)
213-
defer sw.Stop()
247+
defer func() {
248+
sw.Stop()
249+
scopeWithDomainTag.RecordHistogramDuration(metrics.PinotLatencyPerDomainHistogram, time.Since(pinotStart))
250+
}()
214251
response, err := p.persistence.ListOpenWorkflowExecutionsByWorkflowID(ctx, request)
215252

216253
if err != nil {
@@ -227,8 +264,12 @@ func (p *pinotVisibilityMetricsClient) ListClosedWorkflowExecutionsByWorkflowID(
227264

228265
scopeWithDomainTag := p.metricClient.Scope(metrics.PinotListClosedWorkflowExecutionsByWorkflowIDScope, metrics.DomainTag(request.Domain))
229266
scopeWithDomainTag.IncCounter(metrics.PinotRequestsPerDomain)
267+
pinotStart := time.Now()
230268
sw := scopeWithDomainTag.StartTimer(metrics.PinotLatencyPerDomain)
231-
defer sw.Stop()
269+
defer func() {
270+
sw.Stop()
271+
scopeWithDomainTag.RecordHistogramDuration(metrics.PinotLatencyPerDomainHistogram, time.Since(pinotStart))
272+
}()
232273
response, err := p.persistence.ListClosedWorkflowExecutionsByWorkflowID(ctx, request)
233274

234275
if err != nil {
@@ -245,8 +286,12 @@ func (p *pinotVisibilityMetricsClient) ListClosedWorkflowExecutionsByStatus(
245286

246287
scopeWithDomainTag := p.metricClient.Scope(metrics.PinotListClosedWorkflowExecutionsByStatusScope, metrics.DomainTag(request.Domain))
247288
scopeWithDomainTag.IncCounter(metrics.PinotRequestsPerDomain)
289+
pinotStart := time.Now()
248290
sw := scopeWithDomainTag.StartTimer(metrics.PinotLatencyPerDomain)
249-
defer sw.Stop()
291+
defer func() {
292+
sw.Stop()
293+
scopeWithDomainTag.RecordHistogramDuration(metrics.PinotLatencyPerDomainHistogram, time.Since(pinotStart))
294+
}()
250295
response, err := p.persistence.ListClosedWorkflowExecutionsByStatus(ctx, request)
251296

252297
if err != nil {
@@ -263,8 +308,12 @@ func (p *pinotVisibilityMetricsClient) GetClosedWorkflowExecution(
263308

264309
scopeWithDomainTag := p.metricClient.Scope(metrics.PinotGetClosedWorkflowExecutionScope, metrics.DomainTag(request.Domain))
265310
scopeWithDomainTag.IncCounter(metrics.PinotRequestsPerDomain)
311+
pinotStart := time.Now()
266312
sw := scopeWithDomainTag.StartTimer(metrics.PinotLatencyPerDomain)
267-
defer sw.Stop()
313+
defer func() {
314+
sw.Stop()
315+
scopeWithDomainTag.RecordHistogramDuration(metrics.PinotLatencyPerDomainHistogram, time.Since(pinotStart))
316+
}()
268317
response, err := p.persistence.GetClosedWorkflowExecution(ctx, request)
269318

270319
if err != nil {
@@ -281,8 +330,12 @@ func (p *pinotVisibilityMetricsClient) ListWorkflowExecutions(
281330

282331
scopeWithDomainTag := p.metricClient.Scope(metrics.PinotListWorkflowExecutionsScope, metrics.DomainTag(request.Domain))
283332
scopeWithDomainTag.IncCounter(metrics.PinotRequestsPerDomain)
333+
pinotStart := time.Now()
284334
sw := scopeWithDomainTag.StartTimer(metrics.PinotLatencyPerDomain)
285-
defer sw.Stop()
335+
defer func() {
336+
sw.Stop()
337+
scopeWithDomainTag.RecordHistogramDuration(metrics.PinotLatencyPerDomainHistogram, time.Since(pinotStart))
338+
}()
286339
response, err := p.persistence.ListWorkflowExecutions(ctx, request)
287340

288341
if err != nil {
@@ -299,8 +352,12 @@ func (p *pinotVisibilityMetricsClient) ScanWorkflowExecutions(
299352

300353
scopeWithDomainTag := p.metricClient.Scope(metrics.PinotScanWorkflowExecutionsScope, metrics.DomainTag(request.Domain))
301354
scopeWithDomainTag.IncCounter(metrics.PinotRequestsPerDomain)
355+
pinotStart := time.Now()
302356
sw := scopeWithDomainTag.StartTimer(metrics.PinotLatencyPerDomain)
303-
defer sw.Stop()
357+
defer func() {
358+
sw.Stop()
359+
scopeWithDomainTag.RecordHistogramDuration(metrics.PinotLatencyPerDomainHistogram, time.Since(pinotStart))
360+
}()
304361
response, err := p.persistence.ScanWorkflowExecutions(ctx, request)
305362

306363
if err != nil {
@@ -317,8 +374,12 @@ func (p *pinotVisibilityMetricsClient) CountWorkflowExecutions(
317374

318375
scopeWithDomainTag := p.metricClient.Scope(metrics.PinotCountWorkflowExecutionsScope, metrics.DomainTag(request.Domain))
319376
scopeWithDomainTag.IncCounter(metrics.PinotRequestsPerDomain)
377+
pinotStart := time.Now()
320378
sw := scopeWithDomainTag.StartTimer(metrics.PinotLatencyPerDomain)
321-
defer sw.Stop()
379+
defer func() {
380+
sw.Stop()
381+
scopeWithDomainTag.RecordHistogramDuration(metrics.PinotLatencyPerDomainHistogram, time.Since(pinotStart))
382+
}()
322383
response, err := p.persistence.CountWorkflowExecutions(ctx, request)
323384

324385
if err != nil {
@@ -335,8 +396,12 @@ func (p *pinotVisibilityMetricsClient) DeleteWorkflowExecution(
335396

336397
scopeWithDomainTag := p.metricClient.Scope(metrics.PinotDeleteWorkflowExecutionsScope, metrics.DomainTag(request.Domain))
337398
scopeWithDomainTag.IncCounter(metrics.PinotRequestsPerDomain)
399+
pinotStart := time.Now()
338400
sw := scopeWithDomainTag.StartTimer(metrics.PinotLatencyPerDomain)
339-
defer sw.Stop()
401+
defer func() {
402+
sw.Stop()
403+
scopeWithDomainTag.RecordHistogramDuration(metrics.PinotLatencyPerDomainHistogram, time.Since(pinotStart))
404+
}()
340405
err := p.persistence.DeleteWorkflowExecution(ctx, request)
341406

342407
if err != nil {
@@ -353,8 +418,12 @@ func (p *pinotVisibilityMetricsClient) DeleteUninitializedWorkflowExecution(
353418

354419
scopeWithDomainTag := p.metricClient.Scope(metrics.PinotDeleteWorkflowExecutionsScope, metrics.DomainTag(request.Domain))
355420
scopeWithDomainTag.IncCounter(metrics.PinotRequestsPerDomain)
421+
pinotStart := time.Now()
356422
sw := scopeWithDomainTag.StartTimer(metrics.PinotLatencyPerDomain)
357-
defer sw.Stop()
423+
defer func() {
424+
sw.Stop()
425+
scopeWithDomainTag.RecordHistogramDuration(metrics.PinotLatencyPerDomainHistogram, time.Since(pinotStart))
426+
}()
358427
err := p.persistence.DeleteWorkflowExecution(ctx, request)
359428

360429
if err != nil {

common/quotas/global/collection/collection.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -407,9 +407,11 @@ func (c *Collection) backgroundUpdateLoop() {
407407
c.scope.RecordHistogramValue(metrics.GlobalRatelimiterGlobalUsageHistogram, float64(globals))
408408

409409
if len(usage) > 0 {
410+
ratelimiterUpdateStart := time.Now()
410411
sw := c.scope.StartTimer(metrics.GlobalRatelimiterUpdateLatency)
411412
c.doUpdate(now.Sub(lastGatherTime), usage)
412413
sw.Stop()
414+
c.scope.RecordHistogramDuration(metrics.GlobalRatelimiterUpdateLatencyHistogram, time.Since(ratelimiterUpdateStart))
413415
}
414416

415417
<-localMetricsDone // should be much faster than doUpdate, unless it's no-opped

common/task/hierarchical_weighted_round_robin_task_scheduler.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,8 +116,12 @@ func (w *hierarchicalWeightedRoundRobinTaskSchedulerImpl[K, T]) Stop() {
116116

117117
func (w *hierarchicalWeightedRoundRobinTaskSchedulerImpl[K, T]) Submit(task T) error {
118118
w.metricsScope.IncCounter(metrics.PriorityTaskSubmitRequest)
119+
submitStart := time.Now()
119120
sw := w.metricsScope.StartTimer(metrics.PriorityTaskSubmitLatency)
120-
defer sw.Stop()
121+
defer func() {
122+
sw.Stop()
123+
w.metricsScope.RecordHistogramDuration(metrics.PriorityTaskSubmitLatencyHistogram, time.Since(submitStart))
124+
}()
121125

122126
w.RLock()
123127
defer w.RUnlock()
@@ -137,8 +141,12 @@ func (w *hierarchicalWeightedRoundRobinTaskSchedulerImpl[K, T]) TrySubmit(
137141
task T,
138142
) (bool, error) {
139143
w.metricsScope.IncCounter(metrics.PriorityTaskSubmitRequest)
144+
submitStart := time.Now()
140145
sw := w.metricsScope.StartTimer(metrics.PriorityTaskSubmitLatency)
141-
defer sw.Stop()
146+
defer func() {
147+
sw.Stop()
148+
w.metricsScope.RecordHistogramDuration(metrics.PriorityTaskSubmitLatencyHistogram, time.Since(submitStart))
149+
}()
142150

143151
w.RLock()
144152
defer w.RUnlock()

0 commit comments

Comments
 (0)