Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion common/asyncworkflow/queue/consumer/default_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,12 @@ func (c *DefaultConsumer) processMessage(msg messaging.Message) {
logger := c.logger.WithTags(tag.Dynamic("partition", msg.Partition()), tag.Dynamic("offset", msg.Offset()))
logger.Debug("Received message")

asyncProcessStart := time.Now()
sw := c.scope.StartTimer(metrics.AsyncWorkflowProcessMsgLatency)
defer sw.Stop()
defer func() {
sw.Stop()
c.scope.RecordHistogramDuration(metrics.AsyncWorkflowProcessMsgLatencyHistogram, time.Since(asyncProcessStart))
}()

var request sqlblobs.AsyncRequestMessage
if err := c.msgDecoder.Decode(msg.Value(), &request); err != nil {
Expand Down
83 changes: 83 additions & 0 deletions common/metrics/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,89 @@ var HistogramMigrationMetrics = map[string]struct{}{
"direct_query_dispatch_non_sticky_latency_ns": {},
"direct_query_dispatch_clear_stickiness_latency": {},
"direct_query_dispatch_clear_stickiness_latency_ns": {},
"cadence_authorization_latency": {},
"cadence_authorization_latency_ns": {},

"pinot_latency": {},
"pinot_latency_ns": {},
"pinot_latency_per_domain": {},
"pinot_latency_per_domain_ns": {},

"sequentialtask_submit_latency": {},
"sequentialtask_submit_latency_ns": {},
"sequentialtask_queue_size": {},
"sequentialtask_queue_size_counts": {},
"sequentialtask_queue_processing_latency": {},
"sequentialtask_queue_processing_latency_ns": {},
"sequentialtask_task_processing_latency": {},
"sequentialtask_task_processing_latency_ns": {},

"prioritytask_submit_latency": {},
"prioritytask_submit_latency_ns": {},

"graceful_failover_latency": {},
"graceful_failover_latency_ns": {},

"async_request_payload_size_per_domain": {},
"async_request_payload_size_per_domain_counts": {},

"task_redispatch_queue_pending_tasks": {},
"task_redispatch_queue_pending_tasks_counts": {},

"workflow_context_lock_latency": {},
"workflow_context_lock_latency_ns": {},

"get_replication_messages_for_shard": {},
"get_replication_messages_for_shard_ns": {},
"get_dlq_replication_messages": {},
"get_dlq_replication_messages_ns": {},

"decision_task_query_latency": {},
"decision_task_query_latency_ns": {},

"syncmatch_latency_per_tl": {},
"syncmatch_latency_per_tl_ns": {},
"asyncmatch_latency_per_tl": {},
"asyncmatch_latency_per_tl_ns": {},

"asyncmatch_local_poll_attempt_per_tl": {},
"asyncmatch_local_poll_attempt_per_tl_counts": {},
"asyncmatch_forward_poll_attempt_per_tl": {},
"asyncmatch_forward_poll_attempt_per_tl_counts": {},
"asyncmatch_local_poll_after_forward_failed_attempt_per_tl": {},
"asyncmatch_local_poll_after_forward_failed_attempt_per_tl_counts": {},

"poll_local_match_latency_per_tl": {},
"poll_local_match_latency_per_tl_ns": {},
"poll_forward_match_latency_per_tl": {},
"poll_forward_match_latency_per_tl_ns": {},
"poll_local_match_after_forward_failed_latency_per_tl": {},
"poll_local_match_after_forward_failed_latency_per_tl_ns": {},

"es_processor_process_msg_latency": {},
"es_processor_process_msg_latency_ns": {},
"index_processor_process_msg_latency": {},
"index_processor_process_msg_latency_ns": {},

"async_workflow_process_msg_latency": {},
"async_workflow_process_msg_latency_ns": {},
"diagnostics_workflow_execution_latency": {},
"diagnostics_workflow_execution_latency_ns": {},

"shard_distributor_latency": {},
"shard_distributor_latency_ns": {},

"global_ratelimiter_update_latency": {},
"global_ratelimiter_update_latency_ns": {},

"cadence_latency": {},
"cadence_latency_ns": {},
"cadence_client_latency": {},
"cadence_client_latency_ns": {},
"cadence_client_latency_redirection": {},
"cadence_client_latency_redirection_ns": {},
"cadence_latency_per_tl": {},
"cadence_latency_per_tl_ns": {},
}

func (h HistogramMigration) EmitTimer(name string) bool {
Expand Down
250 changes: 158 additions & 92 deletions common/metrics/defs.go

Large diffs are not rendered by default.

103 changes: 86 additions & 17 deletions common/persistence/pinot/pinot_visibility_metric_clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package pinotvisibility

import (
"context"
"time"

"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
Expand Down Expand Up @@ -59,8 +60,12 @@ func (p *pinotVisibilityMetricsClient) RecordWorkflowExecutionStarted(
scopeWithDomainTag := p.metricClient.Scope(metrics.PinotRecordWorkflowExecutionStartedScope, metrics.DomainTag(request.Domain))
scopeWithDomainTag.IncCounter(metrics.PinotRequestsPerDomain)

pinotStart := time.Now()
sw := scopeWithDomainTag.StartTimer(metrics.PinotLatencyPerDomain)
defer sw.Stop()
defer func() {
sw.Stop()
scopeWithDomainTag.RecordHistogramDuration(metrics.PinotLatencyPerDomainHistogram, time.Since(pinotStart))
}()
err := p.persistence.RecordWorkflowExecutionStarted(ctx, request)

if err != nil {
Expand All @@ -78,8 +83,12 @@ func (p *pinotVisibilityMetricsClient) RecordWorkflowExecutionClosed(
scopeWithDomainTag := p.metricClient.Scope(metrics.PinotRecordWorkflowExecutionClosedScope, metrics.DomainTag(request.Domain))
scopeWithDomainTag.IncCounter(metrics.PinotRequestsPerDomain)

pinotStart := time.Now()
sw := scopeWithDomainTag.StartTimer(metrics.PinotLatencyPerDomain)
defer sw.Stop()
defer func() {
sw.Stop()
scopeWithDomainTag.RecordHistogramDuration(metrics.PinotLatencyPerDomainHistogram, time.Since(pinotStart))
}()
err := p.persistence.RecordWorkflowExecutionClosed(ctx, request)

if err != nil {
Expand All @@ -97,8 +106,12 @@ func (p *pinotVisibilityMetricsClient) RecordWorkflowExecutionUninitialized(
scopeWithDomainTag := p.metricClient.Scope(metrics.PinotRecordWorkflowExecutionUninitializedScope, metrics.DomainTag(request.Domain))
scopeWithDomainTag.IncCounter(metrics.PinotRequestsPerDomain)

pinotStart := time.Now()
sw := scopeWithDomainTag.StartTimer(metrics.PinotLatencyPerDomain)
defer sw.Stop()
defer func() {
sw.Stop()
scopeWithDomainTag.RecordHistogramDuration(metrics.PinotLatencyPerDomainHistogram, time.Since(pinotStart))
}()
err := p.persistence.RecordWorkflowExecutionUninitialized(ctx, request)

if err != nil {
Expand All @@ -116,8 +129,12 @@ func (p *pinotVisibilityMetricsClient) UpsertWorkflowExecution(
scopeWithDomainTag := p.metricClient.Scope(metrics.PinotUpsertWorkflowExecutionScope, metrics.DomainTag(request.Domain))
scopeWithDomainTag.IncCounter(metrics.PinotRequestsPerDomain)

pinotStart := time.Now()
sw := scopeWithDomainTag.StartTimer(metrics.PinotLatencyPerDomain)
defer sw.Stop()
defer func() {
sw.Stop()
scopeWithDomainTag.RecordHistogramDuration(metrics.PinotLatencyPerDomainHistogram, time.Since(pinotStart))
}()
err := p.persistence.UpsertWorkflowExecution(ctx, request)

if err != nil {
Expand All @@ -135,8 +152,12 @@ func (p *pinotVisibilityMetricsClient) ListOpenWorkflowExecutions(
scopeWithDomainTag := p.metricClient.Scope(metrics.PinotListOpenWorkflowExecutionsScope, metrics.DomainTag(request.Domain))
scopeWithDomainTag.IncCounter(metrics.PinotRequestsPerDomain)

pinotStart := time.Now()
sw := scopeWithDomainTag.StartTimer(metrics.PinotLatencyPerDomain)
defer sw.Stop()
defer func() {
sw.Stop()
scopeWithDomainTag.RecordHistogramDuration(metrics.PinotLatencyPerDomainHistogram, time.Since(pinotStart))
}()
response, err := p.persistence.ListOpenWorkflowExecutions(ctx, request)

if err != nil {
Expand All @@ -154,8 +175,12 @@ func (p *pinotVisibilityMetricsClient) ListClosedWorkflowExecutions(
scopeWithDomainTag := p.metricClient.Scope(metrics.PinotListClosedWorkflowExecutionsScope, metrics.DomainTag(request.Domain))
scopeWithDomainTag.IncCounter(metrics.PinotRequestsPerDomain)

pinotStart := time.Now()
sw := scopeWithDomainTag.StartTimer(metrics.PinotLatencyPerDomain)
defer sw.Stop()
defer func() {
sw.Stop()
scopeWithDomainTag.RecordHistogramDuration(metrics.PinotLatencyPerDomainHistogram, time.Since(pinotStart))
}()
response, err := p.persistence.ListClosedWorkflowExecutions(ctx, request)

if err != nil {
Expand All @@ -173,8 +198,12 @@ func (p *pinotVisibilityMetricsClient) ListOpenWorkflowExecutionsByType(
scopeWithDomainTag := p.metricClient.Scope(metrics.PinotListOpenWorkflowExecutionsByTypeScope, metrics.DomainTag(request.Domain))
scopeWithDomainTag.IncCounter(metrics.PinotRequestsPerDomain)

pinotStart := time.Now()
sw := scopeWithDomainTag.StartTimer(metrics.PinotLatencyPerDomain)
defer sw.Stop()
defer func() {
sw.Stop()
scopeWithDomainTag.RecordHistogramDuration(metrics.PinotLatencyPerDomainHistogram, time.Since(pinotStart))
}()
response, err := p.persistence.ListOpenWorkflowExecutionsByType(ctx, request)

if err != nil {
Expand All @@ -191,8 +220,12 @@ func (p *pinotVisibilityMetricsClient) ListClosedWorkflowExecutionsByType(

scopeWithDomainTag := p.metricClient.Scope(metrics.PinotListClosedWorkflowExecutionsByTypeScope, metrics.DomainTag(request.Domain))
scopeWithDomainTag.IncCounter(metrics.PinotRequestsPerDomain)
pinotStart := time.Now()
sw := scopeWithDomainTag.StartTimer(metrics.PinotLatencyPerDomain)
defer sw.Stop()
defer func() {
sw.Stop()
scopeWithDomainTag.RecordHistogramDuration(metrics.PinotLatencyPerDomainHistogram, time.Since(pinotStart))
}()
response, err := p.persistence.ListClosedWorkflowExecutionsByType(ctx, request)

if err != nil {
Expand All @@ -209,8 +242,12 @@ func (p *pinotVisibilityMetricsClient) ListOpenWorkflowExecutionsByWorkflowID(

scopeWithDomainTag := p.metricClient.Scope(metrics.PinotListOpenWorkflowExecutionsByWorkflowIDScope, metrics.DomainTag(request.Domain))
scopeWithDomainTag.IncCounter(metrics.PinotRequestsPerDomain)
pinotStart := time.Now()
sw := scopeWithDomainTag.StartTimer(metrics.PinotLatencyPerDomain)
defer sw.Stop()
defer func() {
sw.Stop()
scopeWithDomainTag.RecordHistogramDuration(metrics.PinotLatencyPerDomainHistogram, time.Since(pinotStart))
}()
response, err := p.persistence.ListOpenWorkflowExecutionsByWorkflowID(ctx, request)

if err != nil {
Expand All @@ -227,8 +264,12 @@ func (p *pinotVisibilityMetricsClient) ListClosedWorkflowExecutionsByWorkflowID(

scopeWithDomainTag := p.metricClient.Scope(metrics.PinotListClosedWorkflowExecutionsByWorkflowIDScope, metrics.DomainTag(request.Domain))
scopeWithDomainTag.IncCounter(metrics.PinotRequestsPerDomain)
pinotStart := time.Now()
sw := scopeWithDomainTag.StartTimer(metrics.PinotLatencyPerDomain)
defer sw.Stop()
defer func() {
sw.Stop()
scopeWithDomainTag.RecordHistogramDuration(metrics.PinotLatencyPerDomainHistogram, time.Since(pinotStart))
}()
response, err := p.persistence.ListClosedWorkflowExecutionsByWorkflowID(ctx, request)

if err != nil {
Expand All @@ -245,8 +286,12 @@ func (p *pinotVisibilityMetricsClient) ListClosedWorkflowExecutionsByStatus(

scopeWithDomainTag := p.metricClient.Scope(metrics.PinotListClosedWorkflowExecutionsByStatusScope, metrics.DomainTag(request.Domain))
scopeWithDomainTag.IncCounter(metrics.PinotRequestsPerDomain)
pinotStart := time.Now()
sw := scopeWithDomainTag.StartTimer(metrics.PinotLatencyPerDomain)
defer sw.Stop()
defer func() {
sw.Stop()
scopeWithDomainTag.RecordHistogramDuration(metrics.PinotLatencyPerDomainHistogram, time.Since(pinotStart))
}()
response, err := p.persistence.ListClosedWorkflowExecutionsByStatus(ctx, request)

if err != nil {
Expand All @@ -263,8 +308,12 @@ func (p *pinotVisibilityMetricsClient) GetClosedWorkflowExecution(

scopeWithDomainTag := p.metricClient.Scope(metrics.PinotGetClosedWorkflowExecutionScope, metrics.DomainTag(request.Domain))
scopeWithDomainTag.IncCounter(metrics.PinotRequestsPerDomain)
pinotStart := time.Now()
sw := scopeWithDomainTag.StartTimer(metrics.PinotLatencyPerDomain)
defer sw.Stop()
defer func() {
sw.Stop()
scopeWithDomainTag.RecordHistogramDuration(metrics.PinotLatencyPerDomainHistogram, time.Since(pinotStart))
}()
response, err := p.persistence.GetClosedWorkflowExecution(ctx, request)

if err != nil {
Expand All @@ -281,8 +330,12 @@ func (p *pinotVisibilityMetricsClient) ListWorkflowExecutions(

scopeWithDomainTag := p.metricClient.Scope(metrics.PinotListWorkflowExecutionsScope, metrics.DomainTag(request.Domain))
scopeWithDomainTag.IncCounter(metrics.PinotRequestsPerDomain)
pinotStart := time.Now()
sw := scopeWithDomainTag.StartTimer(metrics.PinotLatencyPerDomain)
defer sw.Stop()
defer func() {
sw.Stop()
scopeWithDomainTag.RecordHistogramDuration(metrics.PinotLatencyPerDomainHistogram, time.Since(pinotStart))
}()
response, err := p.persistence.ListWorkflowExecutions(ctx, request)

if err != nil {
Expand All @@ -299,8 +352,12 @@ func (p *pinotVisibilityMetricsClient) ScanWorkflowExecutions(

scopeWithDomainTag := p.metricClient.Scope(metrics.PinotScanWorkflowExecutionsScope, metrics.DomainTag(request.Domain))
scopeWithDomainTag.IncCounter(metrics.PinotRequestsPerDomain)
pinotStart := time.Now()
sw := scopeWithDomainTag.StartTimer(metrics.PinotLatencyPerDomain)
defer sw.Stop()
defer func() {
sw.Stop()
scopeWithDomainTag.RecordHistogramDuration(metrics.PinotLatencyPerDomainHistogram, time.Since(pinotStart))
}()
response, err := p.persistence.ScanWorkflowExecutions(ctx, request)

if err != nil {
Expand All @@ -317,8 +374,12 @@ func (p *pinotVisibilityMetricsClient) CountWorkflowExecutions(

scopeWithDomainTag := p.metricClient.Scope(metrics.PinotCountWorkflowExecutionsScope, metrics.DomainTag(request.Domain))
scopeWithDomainTag.IncCounter(metrics.PinotRequestsPerDomain)
pinotStart := time.Now()
sw := scopeWithDomainTag.StartTimer(metrics.PinotLatencyPerDomain)
defer sw.Stop()
defer func() {
sw.Stop()
scopeWithDomainTag.RecordHistogramDuration(metrics.PinotLatencyPerDomainHistogram, time.Since(pinotStart))
}()
response, err := p.persistence.CountWorkflowExecutions(ctx, request)

if err != nil {
Expand All @@ -335,8 +396,12 @@ func (p *pinotVisibilityMetricsClient) DeleteWorkflowExecution(

scopeWithDomainTag := p.metricClient.Scope(metrics.PinotDeleteWorkflowExecutionsScope, metrics.DomainTag(request.Domain))
scopeWithDomainTag.IncCounter(metrics.PinotRequestsPerDomain)
pinotStart := time.Now()
sw := scopeWithDomainTag.StartTimer(metrics.PinotLatencyPerDomain)
defer sw.Stop()
defer func() {
sw.Stop()
scopeWithDomainTag.RecordHistogramDuration(metrics.PinotLatencyPerDomainHistogram, time.Since(pinotStart))
}()
err := p.persistence.DeleteWorkflowExecution(ctx, request)

if err != nil {
Expand All @@ -353,8 +418,12 @@ func (p *pinotVisibilityMetricsClient) DeleteUninitializedWorkflowExecution(

scopeWithDomainTag := p.metricClient.Scope(metrics.PinotDeleteWorkflowExecutionsScope, metrics.DomainTag(request.Domain))
scopeWithDomainTag.IncCounter(metrics.PinotRequestsPerDomain)
pinotStart := time.Now()
sw := scopeWithDomainTag.StartTimer(metrics.PinotLatencyPerDomain)
defer sw.Stop()
defer func() {
sw.Stop()
scopeWithDomainTag.RecordHistogramDuration(metrics.PinotLatencyPerDomainHistogram, time.Since(pinotStart))
}()
err := p.persistence.DeleteWorkflowExecution(ctx, request)

if err != nil {
Expand Down
Loading
Loading