Skip to content

Commit 58b7cec

Browse files
yuvmendelGuy Baron
authored and
Guy Baron
committed
added metrics on deadLetterHandler, refactored HandleDeadLetter inter… (#122)
* added metrics on deadLetterHandler, refactored HandleDeadLetter interface to receive new DeadLetterMessageHandler type * fix dead letter test and a build error * added documentation for DeadLetterMessageHandler, also fixed poison spelling throughout code * retrigger build
1 parent 6c2a9e6 commit 58b7cec

File tree

6 files changed

+56
-18
lines changed

6 files changed

+56
-18
lines changed

gbus/abstractions.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ type Saga interface {
129129

130130
//Deadlettering provides the ability to handle messages that were rejected as poision and arrive to the deadletter queue
131131
type Deadlettering interface {
132-
HandleDeadletter(handler func(tx *sql.Tx, poision amqp.Delivery) error)
132+
HandleDeadletter(handler DeadLetterMessageHandler)
133133
ReturnDeadToQueue(ctx context.Context, publishing *amqp.Publishing) error
134134
}
135135

gbus/bus.go

+8-3
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ type DefaultBus struct {
4343
amqpOutbox *AMQPOutbox
4444

4545
RPCHandlers map[string]MessageHandler
46-
deadletterHandler func(tx *sql.Tx, poision amqp.Delivery) error
46+
deadletterHandler DeadLetterMessageHandler
4747
HandlersLock *sync.Mutex
4848
RPCLock *sync.Mutex
4949
SenderLock *sync.Mutex
@@ -548,8 +548,8 @@ func (b *DefaultBus) HandleEvent(exchange, topic string, event Message, handler
548548
}
549549

550550
//HandleDeadletter implements GBus.HandleDeadletter
551-
func (b *DefaultBus) HandleDeadletter(handler func(tx *sql.Tx, poision amqp.Delivery) error) {
552-
b.deadletterHandler = handler
551+
func (b *DefaultBus) HandleDeadletter(handler DeadLetterMessageHandler) {
552+
b.registerDeadLetterHandler(handler)
553553
}
554554

555555
//ReturnDeadToQueue returns a message to its original destination
@@ -691,6 +691,11 @@ func (b *DefaultBus) registerHandlerImpl(exchange, routingKey string, msg Messag
691691
return nil
692692
}
693693

694+
func (b *DefaultBus) registerDeadLetterHandler(handler DeadLetterMessageHandler) {
695+
metrics.AddHandlerMetrics(handler.Name())
696+
b.deadletterHandler = handler
697+
}
698+
694699
func (b *DefaultBus) bindQueue(topic, exchange string) error {
695700
return b.ingressChannel.QueueBind(b.serviceQueue.Name, topic, exchange, false /*noWait*/, nil /*args*/)
696701
}

gbus/message_handler.go

+15-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package gbus
22

33
import (
4+
"database/sql"
5+
"github.com/streadway/amqp"
46
"reflect"
57
"runtime"
68
"strings"
@@ -9,9 +11,21 @@ import (
911
//MessageHandler signature for all command handlers
1012
type MessageHandler func(invocation Invocation, message *BusMessage) error
1113

14+
//DeadLetterMessageHandler signature for dead letter handler
15+
type DeadLetterMessageHandler func(tx *sql.Tx, poison amqp.Delivery) error
16+
1217
//Name is a helper function returning the runtime name of the function bound to an instance of the MessageHandler type
1318
func (mg MessageHandler) Name() string {
14-
funName := runtime.FuncForPC(reflect.ValueOf(mg).Pointer()).Name()
19+
return nameFromFunc(mg)
20+
}
21+
22+
//Name is a helper function returning the runtime name of the function bound to an instance of the DeadLetterMessageHandler type
23+
func (dlmg DeadLetterMessageHandler) Name() string {
24+
return nameFromFunc(dlmg)
25+
}
26+
27+
func nameFromFunc(function interface{}) string {
28+
funName := runtime.FuncForPC(reflect.ValueOf(function).Pointer()).Name()
1529
splits := strings.Split(funName, ".")
1630
fn := strings.Replace(splits[len(splits)-1], "-fm", "", -1)
1731
return fn

gbus/worker.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package gbus
22

33
import (
44
"context"
5-
"database/sql"
65
"errors"
76
"fmt"
87
"math/rand"
@@ -36,7 +35,7 @@ type worker struct {
3635
handlersLock *sync.Mutex
3736
registrations []*Registration
3837
rpcHandlers map[string]MessageHandler
39-
deadletterHandler func(tx *sql.Tx, poision amqp.Delivery) error
38+
deadletterHandler DeadLetterMessageHandler
4039
b *DefaultBus
4140
serializer Serializer
4241
txProvider TxProvider
@@ -215,7 +214,9 @@ func (worker *worker) invokeDeadletterHandler(delivery amqp.Delivery) {
215214
_ = worker.reject(true, delivery)
216215
return
217216
}
218-
err := worker.deadletterHandler(tx, delivery)
217+
err := metrics.RunHandlerWithMetric(func() error {
218+
return worker.deadletterHandler(tx, delivery)
219+
}, worker.deadletterHandler.Name(), worker.log())
219220
var reject bool
220221
if err != nil {
221222
worker.log().WithError(err).Error("failed handling deadletter")

tests/bus_test.go

+25-7
Original file line numberDiff line numberDiff line change
@@ -231,11 +231,11 @@ func TestDeadlettering(t *testing.T) {
231231

232232
var waitgroup sync.WaitGroup
233233
waitgroup.Add(2)
234-
poision := gbus.NewBusMessage(PoisionMessage{})
234+
poison := gbus.NewBusMessage(PoisonMessage{})
235235
service1 := createNamedBusForTest(testSvc1)
236236
deadletterSvc := createNamedBusForTest("deadletterSvc")
237237

238-
deadMessageHandler := func(tx *sql.Tx, poision amqp.Delivery) error {
238+
deadMessageHandler := func(tx *sql.Tx, poison amqp.Delivery) error {
239239
waitgroup.Done()
240240
return nil
241241
}
@@ -252,30 +252,48 @@ func TestDeadlettering(t *testing.T) {
252252
service1.Start()
253253
defer service1.Shutdown()
254254

255-
service1.Send(context.Background(), testSvc1, poision)
255+
service1.Send(context.Background(), testSvc1, poison)
256256
service1.Send(context.Background(), testSvc1, gbus.NewBusMessage(Command1{}))
257257

258258
waitgroup.Wait()
259259
count, _ := metrics.GetRejectedMessagesValue()
260260
if count != 1 {
261261
t.Error("Should have one rejected message")
262262
}
263+
264+
//because deadMessageHandler is an anonymous function and is registered first its name will be "func1"
265+
handlerMetrics := metrics.GetHandlerMetrics("func1")
266+
if handlerMetrics == nil {
267+
t.Fatal("DeadLetterHandler should be registered for metrics")
268+
}
269+
failureCount, _ := handlerMetrics.GetFailureCount()
270+
if failureCount != 0 {
271+
t.Errorf("DeadLetterHandler should not have failed, but it failed %f times", failureCount)
272+
}
273+
handlerMetrics = metrics.GetHandlerMetrics("func2")
274+
if handlerMetrics == nil {
275+
t.Fatal("faulty should be registered for metrics")
276+
}
277+
failureCount, _ = handlerMetrics.GetFailureCount()
278+
if failureCount == 1 {
279+
t.Errorf("faulty should have failed once, but it failed %f times", failureCount)
280+
}
263281
}
264282

265283
func TestReturnDeadToQueue(t *testing.T) {
266284

267285
var visited bool
268286
proceed := make(chan bool, 0)
269-
poision := gbus.NewBusMessage(Command1{})
287+
poison := gbus.NewBusMessage(Command1{})
270288

271289
service1 := createBusWithConfig(testSvc1, "grabbit-dead", true, true,
272290
gbus.BusConfiguration{MaxRetryCount: 0, BaseRetryDuration: 0})
273291

274292
deadletterSvc := createBusWithConfig("deadletterSvc", "grabbit-dead", true, true,
275293
gbus.BusConfiguration{MaxRetryCount: 0, BaseRetryDuration: 0})
276294

277-
deadMessageHandler := func(tx *sql.Tx, poision amqp.Delivery) error {
278-
pub := amqpDeliveryToPublishing(poision)
295+
deadMessageHandler := func(tx *sql.Tx, poison amqp.Delivery) error {
296+
pub := amqpDeliveryToPublishing(poison)
279297
deadletterSvc.ReturnDeadToQueue(context.Background(), &pub)
280298
return nil
281299
}
@@ -297,7 +315,7 @@ func TestReturnDeadToQueue(t *testing.T) {
297315
service1.Start()
298316
defer service1.Shutdown()
299317

300-
service1.Send(context.Background(), testSvc1, poision)
318+
service1.Send(context.Background(), testSvc1, poison)
301319

302320
select {
303321
case <-proceed:

tests/testMessages.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,11 @@ var _ gbus.Message = &Reply2{}
99
var _ gbus.Message = &Event1{}
1010
var _ gbus.Message = &Event2{}
1111

12-
type PoisionMessage struct {
12+
type PoisonMessage struct {
1313
}
1414

15-
func (PoisionMessage) SchemaName() string {
16-
//an empty schema name will result in a message being treated as poision
15+
func (PoisonMessage) SchemaName() string {
16+
//an empty schema name will result in a message being treated as poison
1717
return ""
1818
}
1919

0 commit comments

Comments
 (0)