Skip to content

Commit 211f219

Browse files
author
Guy Baron
authored
fix wrongly set correlation ids when sending a command (#220)
* adding the SagaCorrelationID header only for replies Fixing a bug where grabbit would set the message.CorrelationID and the message.SagaCorrelationID header even if the message was a command and not a reply. * fixing minor documentation issue
1 parent ce8face commit 211f219

File tree

2 files changed

+12
-14
lines changed

2 files changed

+12
-14
lines changed

gbus/abstractions.go

+2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ const (
1818
CMD Semantics = "cmd"
1919
//EVT represenst a messge with event semantics in grabbit
2020
EVT Semantics = "evt"
21+
//REPLY represenst a messge with reply semantics in grabbit
22+
REPLY Semantics = "reply"
2123
)
2224

2325
//Bus interface provides the majority of functionality to Send, Reply and Publish messages to the Bus

gbus/saga/invocation.go

+10-14
Original file line numberDiff line numberDiff line change
@@ -34,25 +34,21 @@ type sagaInvocation struct {
3434
startedByRPCID string
3535
}
3636

37-
func (si *sagaInvocation) setCorrelationIDs(message *gbus.BusMessage, isEvent bool, targetService string) {
37+
func (si *sagaInvocation) setCorrelationIDs(message *gbus.BusMessage, targetService string, semantics gbus.Semantics) {
3838

39-
message.CorrelationID = si.inboundMsg.ID
4039
message.SagaID = si.sagaID
4140

42-
if !isEvent {
43-
//support saga-to-saga communication
44-
if si.inboundMsg.SagaID != "" {
45-
message.SagaCorrelationID = si.inboundMsg.SagaID
46-
}
41+
if semantics == gbus.REPLY {
42+
message.CorrelationID = si.inboundMsg.ID
43+
message.SagaCorrelationID = si.inboundMsg.SagaID
44+
45+
} else if semantics == gbus.CMD {
4746
//if the saga is potentially invoking itself then set the SagaCorrelationID to reflect that
4847
//https://github.com/wework/grabbit/issues/64
49-
5048
if targetService == si.hostingSvc {
5149
message.SagaCorrelationID = message.SagaID
5250
}
53-
5451
}
55-
5652
}
5753
func (si *sagaInvocation) HostingSvc() string {
5854
return si.hostingSvc
@@ -64,13 +60,13 @@ func (si *sagaInvocation) InvokingSvc() string {
6460

6561
func (si *sagaInvocation) Reply(ctx context.Context, message *gbus.BusMessage) error {
6662
_, targetService := si.decoratedInvocation.Routing()
67-
si.setCorrelationIDs(message, false, targetService)
63+
si.setCorrelationIDs(message, targetService, gbus.REPLY)
6864
return si.decoratedInvocation.Reply(ctx, message)
6965
}
7066

7167
func (si *sagaInvocation) ReplyToInitiator(ctx context.Context, message *gbus.BusMessage) error {
7268

73-
si.setCorrelationIDs(message, false, si.startedBy)
69+
si.setCorrelationIDs(message, si.startedBy, gbus.REPLY)
7470

7571
//override the correlation ids to those of the message creating the saga
7672
message.SagaCorrelationID = si.startedBySaga
@@ -93,13 +89,13 @@ func (si *sagaInvocation) Ctx() context.Context {
9389

9490
func (si *sagaInvocation) Send(ctx context.Context, toService string,
9591
command *gbus.BusMessage, policies ...gbus.MessagePolicy) error {
96-
si.setCorrelationIDs(command, false, toService)
92+
si.setCorrelationIDs(command, toService, gbus.CMD)
9793
return si.decoratedBus.Send(ctx, toService, command, policies...)
9894
}
9995

10096
func (si *sagaInvocation) Publish(ctx context.Context, exchange, topic string,
10197
event *gbus.BusMessage, policies ...gbus.MessagePolicy) error {
102-
si.setCorrelationIDs(event, true, "")
98+
si.setCorrelationIDs(event, "", gbus.EVT)
10399
return si.decoratedBus.Publish(ctx, exchange, topic, event, policies...)
104100
}
105101

0 commit comments

Comments
 (0)