Skip to content

Commit e1ac9dc

Browse files
adiweissGuy Baron
adiweiss
authored and
Guy Baron
committed
Handle empty body messages (#147)
1 parent eef69fb commit e1ac9dc

File tree

2 files changed

+201
-69
lines changed

2 files changed

+201
-69
lines changed

gbus/worker.go

+11-6
Original file line numberDiff line numberDiff line change
@@ -102,19 +102,13 @@ func (worker *worker) createMessagesChannel(q amqp.Queue, consumerTag string) (<
102102
func (worker *worker) consumeMessages() {
103103

104104
for msg := range worker.messages {
105-
if msg.Body == nil || len(msg.Body) == 0 {
106-
continue
107-
}
108105
worker.processMessage(msg, false)
109106
}
110107
}
111108

112109
func (worker *worker) consumeRPC() {
113110

114111
for msg := range worker.rpcMessages {
115-
if msg.Body == nil || len(msg.Body) == 0 {
116-
continue
117-
}
118112
worker.processMessage(msg, true)
119113
}
120114
}
@@ -311,6 +305,17 @@ func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) {
311305
_ = worker.ack(delivery)
312306
return
313307
}
308+
309+
if delivery.Body == nil || len(delivery.Body) == 0 {
310+
worker.log().
311+
WithFields(
312+
logrus.Fields{"message-name": msgName}).
313+
Warn("body is missing for message. Cannot invoke handlers.")
314+
worker.span.LogFields(slog.String("grabbit", "no body found"))
315+
// if there are handlers registered for this type of message, it's a bug and the message must be rejected.
316+
_ = worker.reject(false, delivery)
317+
return
318+
}
314319
/*
315320
extract the bus message only after we are sure there are registered handlers since
316321
it includes deserializing the amqp payload which we want to avoid if no handlers are found

0 commit comments

Comments
 (0)