@@ -101,6 +101,39 @@ func (imsm *Glue) getDefsForMsgName(msgName string) []*Def {
101
101
return defs
102
102
}
103
103
104
+ func (imsm * Glue ) handleNewSaga (def * Def , invocation gbus.Invocation , message * gbus.BusMessage ) error {
105
+ newInstance := def .newInstance ()
106
+ newInstance .StartedBy = invocation .InvokingSvc ()
107
+ newInstance .StartedBySaga = message .SagaCorrelationID
108
+ newInstance .StartedByRPCID = message .RPCID
109
+ newInstance .StartedByMessageID = message .ID
110
+
111
+ imsm .Log ().
112
+ WithFields (logrus.Fields {"saga_def" : def .String (), "saga_id" : newInstance .ID }).
113
+ Info ("created new saga" )
114
+ if invkErr := imsm .invokeSagaInstance (def , newInstance , invocation , message ); invkErr != nil {
115
+ imsm .Log ().WithError (invkErr ).WithField ("saga_id" , newInstance .ID ).Error ("failed to invoke saga" )
116
+ return invkErr
117
+ }
118
+
119
+ if ! newInstance .isComplete () {
120
+ imsm .Log ().WithField ("saga_id" , newInstance .ID ).Info ("saving new saga" )
121
+
122
+ if e := imsm .sagaStore .SaveNewSaga (invocation .Tx (), def .sagaType , newInstance ); e != nil {
123
+ imsm .Log ().WithError (e ).WithField ("saga_id" , newInstance .ID ).Error ("saving new saga failed" )
124
+ return e
125
+ }
126
+
127
+ if requestsTimeout , duration := newInstance .requestsTimeout (); requestsTimeout {
128
+ imsm .Log ().WithFields (logrus.Fields {"saga_id" : newInstance .ID , "timeout_duration" : duration }).Info ("new saga requested timeout" )
129
+ if tme := imsm .timeoutManager .RegisterTimeout (invocation .Tx (), newInstance .ID , duration ); tme != nil {
130
+ return tme
131
+ }
132
+ }
133
+ }
134
+ return nil
135
+ }
136
+
104
137
//SagaHandler is the generic handler invoking saga instances
105
138
func (imsm * Glue ) SagaHandler (invocation gbus.Invocation , message * gbus.BusMessage ) error {
106
139
@@ -121,37 +154,8 @@ func (imsm *Glue) SagaHandler(invocation gbus.Invocation, message *gbus.BusMessa
121
154
*/
122
155
startNew := def .shouldStartNewSaga (message )
123
156
if startNew {
157
+ return imsm .handleNewSaga (def , invocation , message )
124
158
125
- newInstance := def .newInstance ()
126
- newInstance .StartedBy = invocation .InvokingSvc ()
127
- newInstance .StartedBySaga = message .SagaCorrelationID
128
- newInstance .StartedByRPCID = message .RPCID
129
- newInstance .StartedByMessageID = message .ID
130
-
131
- imsm .Log ().
132
- WithFields (logrus.Fields {"saga_def" : def .String (), "saga_id" : newInstance .ID }).
133
- Info ("created new saga" )
134
- if invkErr := imsm .invokeSagaInstance (def , newInstance , invocation , message ); invkErr != nil {
135
- imsm .Log ().WithError (invkErr ).WithField ("saga_id" , newInstance .ID ).Error ("failed to invoke saga" )
136
- return invkErr
137
- }
138
-
139
- if ! newInstance .isComplete () {
140
- imsm .Log ().WithField ("saga_id" , newInstance .ID ).Info ("saving new saga" )
141
-
142
- if e := imsm .sagaStore .SaveNewSaga (invocation .Tx (), def .sagaType , newInstance ); e != nil {
143
- imsm .Log ().WithError (e ).WithField ("saga_id" , newInstance .ID ).Error ("saving new saga failed" )
144
- return e
145
- }
146
-
147
- if requestsTimeout , duration := newInstance .requestsTimeout (); requestsTimeout {
148
- imsm .Log ().WithFields (logrus.Fields {"saga_id" : newInstance .ID , "timeout_duration" : duration }).Info ("new saga requested timeout" )
149
- if tme := imsm .timeoutManager .RegisterTimeout (invocation .Tx (), newInstance .ID , duration ); tme != nil {
150
- return tme
151
- }
152
- }
153
- }
154
- return nil
155
159
} else if message .SagaCorrelationID != "" {
156
160
instance , getErr := imsm .sagaStore .GetSagaByID (invocation .Tx (), message .SagaCorrelationID )
157
161
0 commit comments