Skip to content

Commit f6ae8c5

Browse files
author
Guy Baron
authored
adding all logging context data to worker and saga log entries (#215)
* adding all logging context data to worker and saga log entries * added logging with context when command or reply received for saga def but no saga correlation id found * removing minor discrepancies and updating documentation
1 parent 657037d commit f6ae8c5

File tree

4 files changed

+97
-81
lines changed

4 files changed

+97
-81
lines changed

docs/LOGGING.md

+5-1
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,16 @@ annotated with the following contextual data (added as logrus fields to the log
99
allowing for a better debugging experience.
1010

1111
- _service: the service name
12+
- correlation_id: the correlation id set for the message
13+
- exchange: the exchange the message was published to
1214
- handler_name: the name of the handler being invoked
15+
- idempotency_key: the idempotency key set for the message
1316
- message_id: the id of the processed message
1417
- message_name: the type of the message that is being processed
1518
- routing_key: the routing_key of the message
1619
- saga_id: the id of the saga instance being invoked
17-
- saga_def: the type of the saga that is being invoked
20+
- saga_def: the type of the saga that is being invoked
21+
- worker: the worker identifier that is processing the message
1822

1923
```go
2024

gbus/messages.go

+15
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77

88
"github.com/opentracing/opentracing-go/log"
99
"github.com/rs/xid"
10+
"github.com/sirupsen/logrus"
1011
"github.com/streadway/amqp"
1112
)
1213

@@ -109,6 +110,20 @@ func (bm *BusMessage) GetTraceLog() (fields []log.Field) {
109110
}
110111
}
111112

113+
func GetDeliveryLogEntries(delivery amqp.Delivery) logrus.Fields {
114+
115+
return logrus.Fields{
116+
"message_name": castToString(delivery.Headers["x-msg-name"]),
117+
"message_id": delivery.MessageId,
118+
"routing_key": delivery.RoutingKey,
119+
"exchange": delivery.Exchange,
120+
"idempotency_key": castToString(delivery.Headers["x-idempotency-key"]),
121+
"correlation_id": castToString(delivery.CorrelationId),
122+
"rpc_id": castToString(delivery.Headers["x-grabbit-msg-rpc-id"]),
123+
}
124+
125+
}
126+
112127
func castToString(i interface{}) string {
113128
v, ok := i.(string)
114129
if !ok {

gbus/saga/glue.go

+20-14
Original file line numberDiff line numberDiff line change
@@ -108,24 +108,25 @@ func (imsm *Glue) handleNewSaga(def *Def, invocation gbus.Invocation, message *g
108108
newInstance.StartedByRPCID = message.RPCID
109109
newInstance.StartedByMessageID = message.ID
110110

111-
imsm.Log().
112-
WithFields(logrus.Fields{"saga_def": def.String(), "saga_id": newInstance.ID}).
111+
logInContext := invocation.Log().WithFields(logrus.Fields{"saga_def": def.String(), "saga_id": newInstance.ID})
112+
113+
logInContext.
113114
Info("created new saga")
114115
if invkErr := imsm.invokeSagaInstance(def, newInstance, invocation, message); invkErr != nil {
115-
imsm.Log().WithError(invkErr).WithField("saga_id", newInstance.ID).Error("failed to invoke saga")
116+
logInContext.Error("failed to invoke saga")
116117
return invkErr
117118
}
118119

119120
if !newInstance.isComplete() {
120-
imsm.Log().WithField("saga_id", newInstance.ID).Info("saving new saga")
121+
logInContext.Info("saving new saga")
121122

122123
if e := imsm.sagaStore.SaveNewSaga(invocation.Tx(), def.sagaType, newInstance); e != nil {
123-
imsm.Log().WithError(e).WithField("saga_id", newInstance.ID).Error("saving new saga failed")
124+
logInContext.Error("saving new saga failed")
124125
return e
125126
}
126127

127128
if requestsTimeout, duration := newInstance.requestsTimeout(); requestsTimeout {
128-
imsm.Log().WithFields(logrus.Fields{"saga_id": newInstance.ID, "timeout_duration": duration}).Info("new saga requested timeout")
129+
logInContext.WithField("timeout_duration", duration).Info("new saga requested timeout")
129130
if tme := imsm.timeoutManager.RegisterTimeout(invocation.Tx(), newInstance.ID, duration); tme != nil {
130131
return tme
131132
}
@@ -152,15 +153,19 @@ func (imsm *Glue) SagaHandler(invocation gbus.Invocation, message *gbus.BusMessa
152153
4) Else if message is not an event drop it (cmd messages should have 1 specific target)
153154
5) Else iterate over all instances and invoke the needed handler
154155
*/
156+
logInContext := invocation.Log().WithFields(
157+
logrus.Fields{"saga_def": def.String(),
158+
"saga_type": def.sagaType})
155159
startNew := def.shouldStartNewSaga(message)
156160
if startNew {
157161
return imsm.handleNewSaga(def, invocation, message)
158162

159163
} else if message.SagaCorrelationID != "" {
160164
instance, getErr := imsm.sagaStore.GetSagaByID(invocation.Tx(), message.SagaCorrelationID)
161165

166+
logInContext = logInContext.WithField("saga_correlation_id", message.SagaCorrelationID)
162167
if getErr != nil {
163-
imsm.Log().WithError(getErr).WithField("saga_id", message.SagaCorrelationID).Error("failed to fetch saga by id")
168+
logInContext.Error("failed to fetch saga by id")
164169
return getErr
165170
}
166171
if instance == nil {
@@ -173,34 +178,35 @@ func (imsm *Glue) SagaHandler(invocation gbus.Invocation, message *gbus.BusMessa
173178
174179
https://github.com/wework/grabbit/issues/196
175180
*/
176-
imsm.Log().WithField("saga_correlation_id", message.SagaCorrelationID).Warn("message routed with SagaCorrelationID but no saga instance with the same id found")
181+
logInContext.Warn("message routed with SagaCorrelationID but no saga instance with the same id found")
177182
return nil
178183
}
184+
logInContext = logInContext.WithField("saga_id", instance.ID)
179185
def.configureSaga(instance)
180186
if invkErr := imsm.invokeSagaInstance(def, instance, invocation, message); invkErr != nil {
181-
imsm.Log().WithError(invkErr).WithField("saga_id", instance.ID).Error("failed to invoke saga")
187+
logInContext.WithError(invkErr).Error("failed to invoke saga")
182188
return invkErr
183189
}
184190

185191
return imsm.completeOrUpdateSaga(invocation.Tx(), instance)
186192

187193
} else if message.Semantics == gbus.CMD {
188-
e := fmt.Errorf("Warning:Command or Reply message with no saga reference received. message will be dropped.\nmessage as of type:%v", reflect.TypeOf(message).Name())
189-
return e
194+
logInContext.Warn("command or reply message with no saga reference received")
195+
return errors.New("can not resolve saga instance for message")
190196
} else {
191197

192-
imsm.Log().WithFields(logrus.Fields{"saga_type": def.sagaType, "message": msgName}).Info("fetching saga instances by type")
198+
logInContext.Info("fetching saga instances by type")
193199
instances, e := imsm.sagaStore.GetSagasByType(invocation.Tx(), def.sagaType)
194200

195201
if e != nil {
196202
return e
197203
}
198-
imsm.Log().WithFields(logrus.Fields{"message": msgName, "instances_fetched": len(instances)}).Info("fetched saga instances")
204+
logInContext.WithFields(logrus.Fields{"instances_fetched": len(instances)}).Info("fetched saga instances")
199205

200206
for _, instance := range instances {
201207
def.configureSaga(instance)
202208
if invkErr := imsm.invokeSagaInstance(def, instance, invocation, message); invkErr != nil {
203-
imsm.Log().WithError(invkErr).WithField("saga_id", instance.ID).Error("failed to invoke saga")
209+
logInContext.WithError(invkErr).Error("failed to invoke saga")
204210
return invkErr
205211
}
206212
e = imsm.completeOrUpdateSaga(invocation.Tx(), instance)

0 commit comments

Comments
 (0)