
Commit 1a33290

danielwitz authored and Guy Baron committed
added generic handler metrics with message type as the label (#144)
* added generic handler metrics with message type as the label
* add handler name label to the metrics
* adding new metrics to the read me
1 parent 61c7dc4 commit 1a33290
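In practice, callers now pass the message type alongside the handler name, so every run is counted and timed per handler and per message type. A minimal sketch of the new call shape follows, assuming the repo's gbus/metrics import path; the handler name and message type values are illustrative placeholders, not taken from this commit:

package main

import (
    "errors"

    "github.com/sirupsen/logrus"
    "github.com/wework/grabbit/gbus/metrics" // assumed import path for this repo's gbus/metrics package
)

func main() {
    logger := logrus.New()

    // Stand-in handler body; any func() error can be wrapped this way.
    handle := func() error {
        return errors.New("simulated handler failure")
    }

    // After this change RunHandlerWithMetric also takes the message type, which is
    // reported as the message_type label on the generic handlers metrics.
    // "handleSomething" and "Command1" are placeholder label values.
    if err := metrics.RunHandlerWithMetric(handle, "handleSomething", "Command1", logger); err != nil {
        logger.WithError(err).Warn("handler returned an error")
    }
}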

6 files changed: +119 -42 lines changed

docs/METRICS.md

+4 -2
@@ -4,7 +4,9 @@ grabbit exposes and reports the following metrics to Prometheus
 
 | Namespace | Subsystem | Name | Description |
 | ------------- | ------------- | ----------------------------------| --------------------------------------------------------------------------- |
-| grabbit | handler | [name of message handler]_result | records and counts each succesfull or failed execution of a message handler |
-| grabbit | handler | [name of message handler]_latency | records the execution time of each handler |
+| grabbit | handlers | [name of message handler]_result | records and counts each successful or failed execution of a message handler |
+| grabbit | handlers | [name of message handler]_latency | records the execution time of each handler |
+| grabbit | handlers | result | records and counts each run of a handler, having the handler's name, message type and the result as labels|
+| grabbit | handlers | latency | records the execution time of each run of a handler, having the handler's name, message type as labels|
 | grabbit | messages | rejected_messages | increments each time a message gets rejected |
 | grabbit | saga | timedout_sagas | counting the number of timedout saga instances |
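For orientation, a scrape of the Prometheus endpoint would expose the two new generic series roughly as below; the handler name, message type and sample values are illustrative, and the latency summary additionally emits quantile series:

grabbit_handlers_result{handler="handleSomething",message_type="Command1",result="success"} 3
grabbit_handlers_result{handler="handleSomething",message_type="Command1",result="failure"} 1
grabbit_handlers_latency_sum{handler="handleSomething",message_type="Command1"} 0.023
grabbit_handlers_latency_count{handler="handleSomething",message_type="Command1"} 4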

gbus/metrics/handler_metrics.go

+75 -23
@@ -2,31 +2,47 @@ package metrics
 
 import (
     "fmt"
+    "github.com/prometheus/client_golang/prometheus/promauto"
     "sync"
 
     "github.com/prometheus/client_golang/prometheus"
-    io_prometheus_client "github.com/prometheus/client_model/go"
+    "github.com/prometheus/client_model/go"
     "github.com/sirupsen/logrus"
 )
 
 var (
     handlerMetricsByHandlerName = &sync.Map{}
+    handlersResultCounter       = promauto.NewCounterVec(prometheus.CounterOpts{
+        Namespace: grabbitPrefix,
+        Subsystem: handlers,
+        Name:      handlerResult,
+        Help:      "The result of the message handler. The handler's name, message type and result are labeled",
+    }, []string{handler, messageTypeLabel, handlerResult})
+    handlersLatencySummary = promauto.NewSummaryVec(prometheus.SummaryOpts{
+        Namespace: grabbitPrefix,
+        Subsystem: handlers,
+        Name:      handlerLatency,
+        Help:      "The latency of the message handler. The handler's name and message type are labeled",
+    }, []string{handler, messageTypeLabel})
 )
 
 const (
-    failure       = "failure"
-    success       = "success"
-    handlerResult = "result"
-    handlers      = "handlers"
-    grabbitPrefix = "grabbit"
+    failure          = "failure"
+    success          = "success"
+    handlerResult    = "result"
+    handlers         = "handlers"
+    grabbitPrefix    = "grabbit"
+    messageTypeLabel = "message_type"
+    handlerLatency   = "latency"
+    handler          = "handler"
 )
 
 type handlerMetrics struct {
     result  *prometheus.CounterVec
     latency prometheus.Summary
 }
 
-//AddHandlerMetrics adds a handlere to be tracked with metrics
+//AddHandlerMetrics adds a handler to be tracked with metrics
 func AddHandlerMetrics(handlerName string) {
     handlerMetrics := newHandlerMetrics(handlerName)
     _, exists := handlerMetricsByHandlerName.LoadOrStore(handlerName, handlerMetrics)
@@ -37,29 +53,31 @@ func AddHandlerMetrics(handlerName string) {
 }
 
 //RunHandlerWithMetric runs a specific handler with metrics being collected and reported to prometheus
-func RunHandlerWithMetric(handleMessage func() error, handlerName string, logger logrus.FieldLogger) error {
+func RunHandlerWithMetric(handleMessage func() error, handlerName, messageType string, logger logrus.FieldLogger) error {
     handlerMetrics := GetHandlerMetrics(handlerName)
     defer func() {
         if p := recover(); p != nil {
             if handlerMetrics != nil {
                 handlerMetrics.result.WithLabelValues(failure).Inc()
             }
-
+            handlersResultCounter.With(prometheus.Labels{handler: handlerName, messageTypeLabel: messageType, handlerResult: failure}).Inc()
             panic(p)
         }
     }()
 
     if handlerMetrics == nil {
         logger.WithField("handler", handlerName).Warn("Running with metrics - couldn't find metrics for the given handler")
-        return handleMessage()
+        return trackTime(handleMessage, handlersLatencySummary.WithLabelValues(handlerName, messageType))
     }
 
-    err := trackTime(handleMessage, handlerMetrics.latency)
+    err := trackTime(handleMessage, handlerMetrics.latency, handlersLatencySummary.WithLabelValues(handlerName, messageType))
 
     if err != nil {
         handlerMetrics.result.WithLabelValues(failure).Inc()
+        handlersResultCounter.With(prometheus.Labels{handler: handlerName, messageTypeLabel: messageType, handlerResult: failure}).Inc()
     } else {
         handlerMetrics.result.WithLabelValues(success).Inc()
+        handlersResultCounter.With(prometheus.Labels{handler: handlerName, messageTypeLabel: messageType, handlerResult: success}).Inc()
     }
 
     return err
@@ -95,41 +113,75 @@ func newHandlerMetrics(handlerName string) *handlerMetrics {
     }
 }
 
-func trackTime(functionToTrack func() error, observer prometheus.Observer) error {
-    timer := prometheus.NewTimer(observer)
-    defer timer.ObserveDuration()
+func trackTime(functionToTrack func() error, observers ...prometheus.Observer) error {
+    timers := make([]*prometheus.Timer, 0)
+    for _, observer := range observers {
+        timers = append(timers, prometheus.NewTimer(observer))
+    }
+
+    defer func() {
+        for _, timer := range timers {
+            timer.ObserveDuration()
+        }
+    }()
 
     return functionToTrack()
 }
 
+//GetSuccessCountByMessageTypeAndHandlerName gets the counter value for the successful handlers' run for a given message type and handler's name
+func GetSuccessCountByMessageTypeAndHandlerName(messageType, handlerName string) (float64, error) {
+    return getCounterValue(handlersResultCounter.With(prometheus.Labels{messageTypeLabel: messageType, handler: handlerName, handlerResult: success}))
+}
+
+//GetFailureCountByMessageTypeAndHandlerName gets the counter value for the failed handlers' run for a given message type and handler's name
+func GetFailureCountByMessageTypeAndHandlerName(messageType, handlerName string) (float64, error) {
+    return getCounterValue(handlersResultCounter.With(prometheus.Labels{messageTypeLabel: messageType, handler: handlerName, handlerResult: failure}))
+}
+
+//GetLatencySampleCountByMessageTypeAndHandlerName gets the summary sample count value for the handlers' run for a given message type and handler's name
+func GetLatencySampleCountByMessageTypeAndHandlerName(messageType, handlerName string) (*uint64, error) {
+    summary, ok := handlersLatencySummary.With(prometheus.Labels{messageTypeLabel: messageType, handler: handlerName}).(prometheus.Summary)
+
+    if !ok {
+        return nil, fmt.Errorf("couldn't find summary for event type: %s", messageType)
+    }
+
+    return getSummarySampleCount(summary)
+}
+
 //GetSuccessCount gets the value of the handlers success value
 func (hm *handlerMetrics) GetSuccessCount() (float64, error) {
-    return hm.getLabeledCounterValue(success)
+    return getCounterValue(hm.result.WithLabelValues(success))
 }
 
 //GetFailureCount gets the value of the handlers failure value
 func (hm *handlerMetrics) GetFailureCount() (float64, error) {
-    return hm.getLabeledCounterValue(failure)
+    return getCounterValue(hm.result.WithLabelValues(failure))
 }
 
 //GetLatencySampleCount gets the value of the handlers latency value
 func (hm *handlerMetrics) GetLatencySampleCount() (*uint64, error) {
+    return getSummarySampleCount(hm.latency)
+}
+
+func getCounterValue(counter prometheus.Counter) (float64, error) {
     m := &io_prometheus_client.Metric{}
-    err := hm.latency.Write(m)
+    err := counter.Write(m)
+
     if err != nil {
-        return nil, err
+        return 0, err
     }
 
-    return m.GetSummary().SampleCount, nil
+    return m.GetCounter().GetValue(), nil
 }
 
-func (hm *handlerMetrics) getLabeledCounterValue(label string) (float64, error) {
+func getSummarySampleCount(summary prometheus.Summary) (*uint64, error) {
     m := &io_prometheus_client.Metric{}
-    err := hm.result.WithLabelValues(label).Write(m)
+    err := summary.Write(m)
 
     if err != nil {
-        return 0, err
+        return nil, err
     }
 
-    return m.GetCounter().GetValue(), nil
+    return m.GetSummary().SampleCount, nil
 }
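The new package-level getters let tests and other callers read the labeled counters and the latency summary's sample count back without holding a handler-specific handlerMetrics instance. A sketch of how they might be used in a test helper; the package name, import path, handler name and message type below are assumptions for illustration:

package tests

import (
    "testing"

    "github.com/wework/grabbit/gbus/metrics" // assumed import path
)

// assertLabeledMetrics reads back the new labeled series.
// "Command1" and "handleSomething" are placeholder message-type and handler-name values.
func assertLabeledMetrics(t *testing.T) {
    successes, err := metrics.GetSuccessCountByMessageTypeAndHandlerName("Command1", "handleSomething")
    if err != nil {
        t.Fatalf("could not read success counter: %v", err)
    }

    failures, err := metrics.GetFailureCountByMessageTypeAndHandlerName("Command1", "handleSomething")
    if err != nil {
        t.Fatalf("could not read failure counter: %v", err)
    }

    samples, err := metrics.GetLatencySampleCountByMessageTypeAndHandlerName("Command1", "handleSomething")
    if err != nil || samples == nil {
        t.Fatalf("could not read latency sample count: %v", err)
    }

    t.Logf("success=%v failure=%v latency samples=%d", successes, failures, *samples)
}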

gbus/saga/instance.go

+1 -1
@@ -73,7 +73,7 @@ func (si *Instance) invoke(exchange, routingKey string, invocation *sagaInvocati
             return val.Interface().(error)
         }
         return nil
-    }, methodName, invocation.Log())
+    }, methodName, message.PayloadFQN, invocation.Log())
 
     if err != nil {
         return err

gbus/worker.go

+3 -3
@@ -211,7 +211,7 @@ func (worker *worker) invokeDeadletterHandler(delivery amqp.Delivery) {
     handlerWrapper := func() error {
         return worker.deadletterHandler(tx, &delivery)
     }
-    return metrics.RunHandlerWithMetric(handlerWrapper, worker.deadletterHandler.Name(), worker.log())
+    return metrics.RunHandlerWithMetric(handlerWrapper, worker.deadletterHandler.Name(), fmt.Sprintf("deadletter_%s", delivery.Type), worker.log())
 }
 
 err := worker.withTx(txWrapper)
@@ -250,7 +250,7 @@ func (worker *worker) runGlobalHandler(delivery *amqp.Delivery) error {
         return worker.withTx(txWrapper)
     }
     //run the global handler with metrics
-    return metrics.RunHandlerWithMetric(metricsWrapper, handlerName, worker.log())
+    return metrics.RunHandlerWithMetric(metricsWrapper, handlerName, delivery.Type, worker.log())
 }
 return worker.SafeWithRetries(retryAction, MaxRetryCount)
 }
@@ -417,7 +417,7 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan
     //execute the handler with metrics
     handlerErr := metrics.RunHandlerWithMetric(func() error {
         return pinedHandler(invocation, message)
-    }, handlerName, worker.log())
+    }, handlerName, message.PayloadFQN, worker.log())
 
     if handlerErr != nil {
         hspan.LogFields(slog.Error(handlerErr))

tests/bus_test.go

+18 -1
@@ -174,14 +174,22 @@ func TestHandlerRetry(t *testing.T) {
         t.Error("Metrics for handleRetry should be initiated")
     }
     f, _ := hm.GetFailureCount()
+    mtf, _ := metrics.GetFailureCountByMessageTypeAndHandlerName(reply.PayloadFQN, "handleRetry")
     s, _ := hm.GetSuccessCount()
+    mts, _ := metrics.GetSuccessCountByMessageTypeAndHandlerName(reply.PayloadFQN, "handleRetry")
 
     if f != 2 {
         t.Errorf("Failure count should be 2 but was %f", f)
     }
+    if mtf != 2 {
+        t.Errorf("Failure count should be 2 but was %f", mtf)
+    }
     if s != 1 {
         t.Errorf("Success count should be 1 but was %f", s)
     }
+    if mts != 1 {
+        t.Errorf("Success count should be 1 but was %f", mts)
+    }
 }
 
 func handleRetry(invocation gbus.Invocation, message *gbus.BusMessage) error {
@@ -249,8 +257,9 @@ func TestDeadlettering(t *testing.T) {
     service1.Start()
     defer assertBusShutdown(service1, t)
 
+    cmd := gbus.NewBusMessage(Command1{})
     service1.Send(context.Background(), testSvc1, poison)
-    service1.Send(context.Background(), testSvc1, gbus.NewBusMessage(Command1{}))
+    service1.Send(context.Background(), testSvc1, cmd)
 
     proceedOrTimeout(2, proceed, nil, t)
 
@@ -268,6 +277,10 @@ func TestDeadlettering(t *testing.T) {
     if failureCount != 0 {
         t.Errorf("DeadLetterHandler should not have failed, but it failed %f times", failureCount)
     }
+    poisonF, _ := metrics.GetFailureCountByMessageTypeAndHandlerName(poison.PayloadFQN, "func1")
+    if poisonF != 0 {
+        t.Errorf("DeadLetterHandler should not have failed, but it failed %f times", poisonF)
+    }
     handlerMetrics = metrics.GetHandlerMetrics("func2")
     if handlerMetrics == nil {
         t.Fatal("faulty should be registered for metrics")
@@ -276,6 +289,10 @@ func TestDeadlettering(t *testing.T) {
     if failureCount == 1 {
         t.Errorf("faulty should have failed once, but it failed %f times", failureCount)
     }
+    cmdF, _ := metrics.GetFailureCountByMessageTypeAndHandlerName(cmd.PayloadFQN, "func2")
+    if cmdF == 1 {
+        t.Errorf("faulty should have failed once, but it failed %f times", cmdF)
+    }
 }
 
 func TestRawMessageHandling(t *testing.T) {

tests/metrics_test.go

+18 -12
@@ -59,7 +59,7 @@ func TestRunHandlerWithMetric_FailureCounter(t *testing.T) {
     }
 
     for i := 1; i < runningTries; i++ {
-        err := metrics.RunHandlerWithMetric(failure, name, logger)
+        err := metrics.RunHandlerWithMetric(failure, name, name, logger)
 
         if err == nil {
             t.Error("Failed handler run should return an error")
@@ -90,7 +90,7 @@ func TestRunHandlerWithMetric_SuccessCounter(t *testing.T) {
     }
 
     for i := 1; i < runningTries; i++ {
-        err := metrics.RunHandlerWithMetric(success, name, logger)
+        err := metrics.RunHandlerWithMetric(success, name, name, logger)
 
         if err != nil {
             t.Error("Successful handler run shouldn't return an error")
@@ -121,17 +121,23 @@ func TestRunHandlerWithMetric_Latency(t *testing.T) {
     }
 
     for i := 1; i < runningTries; i++ {
-        _ = metrics.RunHandlerWithMetric(success, name, logger)
+        _ = metrics.RunHandlerWithMetric(success, name, name, logger)
         sc, err := hm.GetLatencySampleCount()
 
-        if err != nil {
-            t.Errorf("Failed to get latency value: %e", err)
-        }
-        if sc == nil {
-            t.Errorf("Expected latency sample count not be nil")
-        }
-        if *sc != uint64(i) {
-            t.Errorf("Expected to get %d as the value of the latency sample count, but got %d", uint64(i), *sc)
-        }
+        checkLatency(t, sc, uint64(i), err)
+        mtsc, err := metrics.GetLatencySampleCountByMessageTypeAndHandlerName(name, name)
+        checkLatency(t, mtsc, uint64(i), err)
+    }
+}
+
+func checkLatency(t *testing.T, sc *uint64, expected uint64, err error) {
+    if err != nil {
+        t.Errorf("Failed to get latency value: %e", err)
+    }
+    if sc == nil {
+        t.Errorf("Expected latency sample count not be nil")
+    }
+    if *sc != expected {
+        t.Errorf("Expected to get %d as the value of the latency sample count, but got %d", expected, *sc)
     }
 }