diff --git a/integ/any_command_combination_test.go b/integ/any_command_combination_test.go index 085c44e7..84b1bbf4 100644 --- a/integ/any_command_combination_test.go +++ b/integ/any_command_combination_test.go @@ -10,7 +10,7 @@ import ( "time" "github.com/indeedeng/iwf/gen/iwfidl" - anycommandconbination "github.com/indeedeng/iwf/integ/workflow/any_command_combination" + anycommandcombination "github.com/indeedeng/iwf/integ/workflow/any_command_combination" "github.com/indeedeng/iwf/service" "github.com/stretchr/testify/assert" ) @@ -59,7 +59,7 @@ func TestAnyCommandCombinationWorkflowCadenceContinueAsNew(t *testing.T) { func doTestAnyCommandCombinationWorkflow(t *testing.T, backendType service.BackendType, config *iwfidl.WorkflowConfig) { assertions := assert.New(t) // start test workflow server - wfHandler := anycommandconbination.NewHandler() + wfHandler := anycommandcombination.NewHandler() closeFunc1 := startWorkflowWorker(wfHandler, t) defer closeFunc1() @@ -74,14 +74,14 @@ func doTestAnyCommandCombinationWorkflow(t *testing.T, backendType service.Backe }, }, }) - wfId := anycommandconbination.WorkflowType + strconv.Itoa(int(time.Now().UnixNano())) + wfId := anycommandcombination.WorkflowType + strconv.Itoa(int(time.Now().UnixNano())) req := apiClient.DefaultApi.ApiV1WorkflowStartPost(context.Background()) _, httpResp, err := req.WorkflowStartRequest(iwfidl.WorkflowStartRequest{ WorkflowId: wfId, - IwfWorkflowType: anycommandconbination.WorkflowType, + IwfWorkflowType: anycommandcombination.WorkflowType, WorkflowTimeoutSeconds: 40, IwfWorkerUrl: "http://localhost:" + testWorkflowServerPort, - StartStateId: ptr.Any(anycommandconbination.State1), + StartStateId: ptr.Any(anycommandcombination.State1), WorkflowStartOptions: &iwfidl.WorkflowStartOptions{ WorkflowConfigOverride: config, }, @@ -97,13 +97,13 @@ func doTestAnyCommandCombinationWorkflow(t *testing.T, backendType service.Backe req2 := apiClient.DefaultApi.ApiV1WorkflowSignalPost(context.Background()) httpResp, err = req2.WorkflowSignalRequest(iwfidl.WorkflowSignalRequest{ WorkflowId: wfId, - SignalChannelName: anycommandconbination.SignalNameAndId1, + SignalChannelName: anycommandcombination.SignalNameAndId1, SignalValue: &signalValue, }).Execute() failTestAtHttpError(err, httpResp, t) httpResp, err = req2.WorkflowSignalRequest(iwfidl.WorkflowSignalRequest{ WorkflowId: wfId, - SignalChannelName: anycommandconbination.SignalNameAndId1, + SignalChannelName: anycommandcombination.SignalNameAndId1, SignalValue: &signalValue, }).Execute() failTestAtHttpError(err, httpResp, t) @@ -114,7 +114,7 @@ func doTestAnyCommandCombinationWorkflow(t *testing.T, backendType service.Backe httpResp, err = req3.WorkflowSkipTimerRequest(iwfidl.WorkflowSkipTimerRequest{ WorkflowId: wfId, WorkflowStateExecutionId: "S1-1", - TimerCommandId: iwfidl.PtrString(anycommandconbination.TimerId1), + TimerCommandId: iwfidl.PtrString(anycommandcombination.TimerId1), }).Execute() failTestAtHttpError(err, httpResp, t) @@ -124,7 +124,7 @@ func doTestAnyCommandCombinationWorkflow(t *testing.T, backendType service.Backe // send first signal for s2 httpResp, err = req2.WorkflowSignalRequest(iwfidl.WorkflowSignalRequest{ WorkflowId: wfId, - SignalChannelName: anycommandconbination.SignalNameAndId1, + SignalChannelName: anycommandcombination.SignalNameAndId1, SignalValue: &signalValue, }).Execute() failTestAtHttpError(err, httpResp, t) @@ -140,7 +140,7 @@ func doTestAnyCommandCombinationWorkflow(t *testing.T, backendType service.Backe httpResp, err = req2.WorkflowSignalRequest(iwfidl.WorkflowSignalRequest{ WorkflowId: wfId, - SignalChannelName: anycommandconbination.SignalNameAndId3, + SignalChannelName: anycommandcombination.SignalNameAndId3, SignalValue: &signalValue, }).Execute() failTestAtHttpError(err, httpResp, t) @@ -148,7 +148,7 @@ func doTestAnyCommandCombinationWorkflow(t *testing.T, backendType service.Backe // send 2nd signal for s2 httpResp, err = req2.WorkflowSignalRequest(iwfidl.WorkflowSignalRequest{ WorkflowId: wfId, - SignalChannelName: anycommandconbination.SignalNameAndId2, + SignalChannelName: anycommandcombination.SignalNameAndId2, SignalValue: &signalValue, }).Execute() failTestAtHttpError(err, httpResp, t) @@ -176,7 +176,7 @@ func doTestAnyCommandCombinationWorkflow(t *testing.T, backendType service.Backe "S1_decide": 1, "S2_start": 2, "S2_decide": 1, - }, history, "anycommandconbination test fail, %v", history) + }, history, "anycommandcombination test fail, %v", history) var s1CommandResults iwfidl.CommandResults var s2CommandResults iwfidl.CommandResults diff --git a/integ/workflow/any_command_close/routers.go b/integ/workflow/any_command_close/routers.go index c7ca3675..86e7f437 100644 --- a/integ/workflow/any_command_close/routers.go +++ b/integ/workflow/any_command_close/routers.go @@ -8,6 +8,7 @@ import ( "github.com/indeedeng/iwf/service/common/ptr" "log" "net/http" + "sync" "testing" ) @@ -30,14 +31,14 @@ const ( ) type handler struct { - invokeHistory map[string]int64 - invokeData map[string]interface{} + invokeHistory sync.Map + invokeData sync.Map } func NewHandler() common.WorkflowHandler { return &handler{ - invokeHistory: make(map[string]int64), - invokeData: make(map[string]interface{}), + invokeHistory: sync.Map{}, + invokeData: sync.Map{}, } } @@ -51,7 +52,11 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context, t *testing.T) { log.Println("received state start request, ", req) if req.GetWorkflowType() == WorkflowType { - h.invokeHistory[req.GetWorkflowStateId()+"_start"]++ + if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_start"); ok { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", value.(int64)+1) + } else { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", int64(1)) + } if req.GetWorkflowStateId() == State1 { // Proceed after either signal is received @@ -95,21 +100,25 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { log.Println("received state decide request, ", req) if req.GetWorkflowType() == WorkflowType { - h.invokeHistory[req.GetWorkflowStateId()+"_decide"]++ + if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_decide"); ok { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", value.(int64)+1) + } else { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", int64(1)) + } if req.GetWorkflowStateId() == State1 { signalResults := req.GetCommandResults() - h.invokeData["signalCommandResultsLength"] = len(signalResults.SignalResults) + h.invokeData.Store("signalCommandResultsLength", len(signalResults.SignalResults)) // Trigger signals - h.invokeData["signalChannelName0"] = signalResults.SignalResults[0].GetSignalChannelName() - h.invokeData["signalCommandId0"] = signalResults.SignalResults[0].GetCommandId() - h.invokeData["signalStatus0"] = signalResults.SignalResults[0].GetSignalRequestStatus() + h.invokeData.Store("signalChannelName0", signalResults.SignalResults[0].GetSignalChannelName()) + h.invokeData.Store("signalCommandId0", signalResults.SignalResults[0].GetCommandId()) + h.invokeData.Store("signalStatus0", signalResults.SignalResults[0].GetSignalRequestStatus()) - h.invokeData["signalChannelName1"] = signalResults.SignalResults[1].GetSignalChannelName() - h.invokeData["signalCommandId1"] = signalResults.SignalResults[1].GetCommandId() - h.invokeData["signalStatus1"] = signalResults.SignalResults[1].GetSignalRequestStatus() - h.invokeData["signalValue1"] = signalResults.SignalResults[1].GetSignalValue() + h.invokeData.Store("signalChannelName1", signalResults.SignalResults[1].GetSignalChannelName()) + h.invokeData.Store("signalCommandId1", signalResults.SignalResults[1].GetCommandId()) + h.invokeData.Store("signalStatus1", signalResults.SignalResults[1].GetSignalRequestStatus()) + h.invokeData.Store("signalValue1", signalResults.SignalResults[1].GetSignalValue()) // Move to State 2 c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{ @@ -141,5 +150,15 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { } func (h *handler) GetTestResult() (map[string]int64, map[string]interface{}) { - return h.invokeHistory, h.invokeData + invokeHistory := make(map[string]int64) + h.invokeHistory.Range(func(key, value interface{}) bool { + invokeHistory[key.(string)] = value.(int64) + return true + }) + invokeData := make(map[string]interface{}) + h.invokeData.Range(func(key, value interface{}) bool { + invokeData[key.(string)] = value + return true + }) + return invokeHistory, invokeData } diff --git a/integ/workflow/any_command_combination/routers.go b/integ/workflow/any_command_combination/routers.go index dd58ced9..7a946012 100644 --- a/integ/workflow/any_command_combination/routers.go +++ b/integ/workflow/any_command_combination/routers.go @@ -9,6 +9,7 @@ import ( "github.com/indeedeng/iwf/service/common/ptr" "log" "net/http" + "sync" "testing" "time" ) @@ -34,8 +35,8 @@ const ( ) type handler struct { - invokeHistory map[string]int64 - invokeData map[string]interface{} + invokeHistory sync.Map + invokeData sync.Map //we want to confirm that the interpreter workflow activity will fail when the commandId is empty with ANY_COMMAND_COMBINATION_COMPLETED hasS1RetriedForInvalidCommandId bool hasS2RetriedForInvalidCommandId bool @@ -43,8 +44,8 @@ type handler struct { func NewHandler() common.WorkflowHandler { return &handler{ - invokeHistory: make(map[string]int64), - invokeData: make(map[string]interface{}), + invokeHistory: sync.Map{}, + invokeData: sync.Map{}, hasS1RetriedForInvalidCommandId: false, hasS2RetriedForInvalidCommandId: false, } @@ -99,7 +100,11 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context, t *testing.T) { } if req.GetWorkflowType() == WorkflowType { - h.invokeHistory[req.GetWorkflowStateId()+"_start"]++ + if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_start"); ok { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", value.(int64)+1) + } else { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", int64(1)) + } if req.GetWorkflowStateId() == State1 { // If the state has already retried an invalid command, proceed on combination completed @@ -192,11 +197,15 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { log.Println("received state decide request, ", req) if req.GetWorkflowType() == WorkflowType { - h.invokeHistory[req.GetWorkflowStateId()+"_decide"]++ + if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_decide"); ok { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", value.(int64)+1) + } else { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", int64(1)) + } // Trigger signals and move to State 2 if req.GetWorkflowStateId() == State1 { - h.invokeData["s1_commandResults"] = req.GetCommandResults() + h.invokeData.Store("s1_commandResults", req.GetCommandResults()) c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{ StateDecision: &iwfidl.StateDecision{ @@ -210,7 +219,7 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { return } else if req.GetWorkflowStateId() == State2 { // Trigger data and move to completion - h.invokeData["s2_commandResults"] = req.GetCommandResults() + h.invokeData.Store("s2_commandResults", req.GetCommandResults()) c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{ StateDecision: &iwfidl.StateDecision{ NextStates: []iwfidl.StateMovement{ @@ -228,5 +237,15 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { } func (h *handler) GetTestResult() (map[string]int64, map[string]interface{}) { - return h.invokeHistory, h.invokeData + invokeHistory := make(map[string]int64) + h.invokeHistory.Range(func(key, value interface{}) bool { + invokeHistory[key.(string)] = value.(int64) + return true + }) + invokeData := make(map[string]interface{}) + h.invokeData.Range(func(key, value interface{}) bool { + invokeData[key.(string)] = value + return true + }) + return invokeHistory, invokeData } diff --git a/integ/workflow/any_timer_signal/routers.go b/integ/workflow/any_timer_signal/routers.go index d42f42aa..20f83fe8 100644 --- a/integ/workflow/any_timer_signal/routers.go +++ b/integ/workflow/any_timer_signal/routers.go @@ -8,6 +8,7 @@ import ( "github.com/indeedeng/iwf/service/common/ptr" "log" "net/http" + "sync" "testing" "time" ) @@ -30,14 +31,14 @@ const ( ) type handler struct { - invokeHistory map[string]int64 - invokeData map[string]interface{} + invokeHistory sync.Map + invokeData sync.Map } func NewHandler() common.WorkflowHandler { return &handler{ - invokeHistory: make(map[string]int64), - invokeData: make(map[string]interface{}), + invokeHistory: sync.Map{}, + invokeData: sync.Map{}, } } @@ -51,7 +52,11 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context, t *testing.T) { log.Println("received state start request, ", req) if req.GetWorkflowType() == WorkflowType { - h.invokeHistory[req.GetWorkflowStateId()+"_start"]++ + if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_start"); ok { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", value.(int64)+1) + } else { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", int64(1)) + } if req.GetWorkflowStateId() == State1 { var timerCommands []iwfidl.TimerCommand @@ -105,7 +110,11 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { log.Println("received state decide request, ", req) if req.GetWorkflowType() == WorkflowType { - h.invokeHistory[req.GetWorkflowStateId()+"_decide"]++ + if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_decide"); ok { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", value.(int64)+1) + } else { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", int64(1)) + } if req.GetWorkflowStateId() == State1 { signalResults := req.GetCommandResults() var movements []iwfidl.StateMovement @@ -113,16 +122,16 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { context := req.GetContext() // On first State 1 attempt, trigger signals and stay on the first state if context.GetStateExecutionId() == State1+"-"+"1" { - h.invokeData["signalChannelName1"] = signalResults.SignalResults[0].GetSignalChannelName() - h.invokeData["signalCommandId1"] = signalResults.SignalResults[0].GetCommandId() - h.invokeData["signalStatus1"] = signalResults.SignalResults[0].GetSignalRequestStatus() + h.invokeData.Store("signalChannelName1", signalResults.SignalResults[0].GetSignalChannelName()) + h.invokeData.Store("signalCommandId1", signalResults.SignalResults[0].GetCommandId()) + h.invokeData.Store("signalStatus1", signalResults.SignalResults[0].GetSignalRequestStatus()) movements = []iwfidl.StateMovement{{StateId: State1}} } else { // After the first State 1 attempt, trigger signals and move to next state - h.invokeData["signalChannelName2"] = signalResults.SignalResults[0].GetSignalChannelName() - h.invokeData["signalCommandId2"] = signalResults.SignalResults[0].GetCommandId() - h.invokeData["signalStatus2"] = signalResults.SignalResults[0].GetSignalRequestStatus() - h.invokeData["signalValue2"] = signalResults.SignalResults[0].GetSignalValue() + h.invokeData.Store("signalChannelName2", signalResults.SignalResults[0].GetSignalChannelName()) + h.invokeData.Store("signalCommandId2", signalResults.SignalResults[0].GetCommandId()) + h.invokeData.Store("signalStatus2", signalResults.SignalResults[0].GetSignalRequestStatus()) + h.invokeData.Store("signalValue2", signalResults.SignalResults[0].GetSignalValue()) movements = []iwfidl.StateMovement{{StateId: State2}} } @@ -151,5 +160,15 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { } func (h *handler) GetTestResult() (map[string]int64, map[string]interface{}) { - return h.invokeHistory, h.invokeData + invokeHistory := make(map[string]int64) + h.invokeHistory.Range(func(key, value interface{}) bool { + invokeHistory[key.(string)] = value.(int64) + return true + }) + invokeData := make(map[string]interface{}) + h.invokeData.Range(func(key, value interface{}) bool { + invokeData[key.(string)] = value + return true + }) + return invokeHistory, invokeData } diff --git a/integ/workflow/basic/routers.go b/integ/workflow/basic/routers.go index a1e6b01b..5ba438b7 100644 --- a/integ/workflow/basic/routers.go +++ b/integ/workflow/basic/routers.go @@ -7,6 +7,7 @@ import ( "github.com/indeedeng/iwf/service" "log" "net/http" + "sync" "testing" ) @@ -27,12 +28,12 @@ const ( ) type handler struct { - invokeHistory map[string]int64 + invokeHistory sync.Map } func NewHandler() *handler { return &handler{ - invokeHistory: make(map[string]int64), + invokeHistory: sync.Map{}, } } @@ -53,7 +54,11 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context, t *testing.T) { if req.GetWorkflowType() == WorkflowType { // Basic workflow go straight to decide methods without any commands if req.GetWorkflowStateId() == State1 || req.GetWorkflowStateId() == State2 { - h.invokeHistory[req.GetWorkflowStateId()+"_start"]++ + if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_start"); ok { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", value.(int64)+1) + } else { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", int64(1)) + } c.JSON(http.StatusOK, iwfidl.WorkflowStateStartResponse{ CommandRequest: &iwfidl.CommandRequest{ DeciderTriggerType: iwfidl.ALL_COMMAND_COMPLETED.Ptr(), @@ -79,7 +84,11 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { } if req.GetWorkflowType() == WorkflowType { - h.invokeHistory[req.GetWorkflowStateId()+"_decide"]++ + if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_decide"); ok { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", value.(int64)+1) + } else { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", int64(1)) + } if req.GetWorkflowStateId() == State1 { // Move to next state @@ -130,5 +139,10 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { } func (h *handler) GetTestResult() (map[string]int64, map[string]interface{}) { - return h.invokeHistory, nil + invokeHistory := make(map[string]int64) + h.invokeHistory.Range(func(key, value interface{}) bool { + invokeHistory[key.(string)] = value.(int64) + return true + }) + return invokeHistory, nil } diff --git a/integ/workflow/conditional_close/routers.go b/integ/workflow/conditional_close/routers.go index 6b6e008e..b900927b 100644 --- a/integ/workflow/conditional_close/routers.go +++ b/integ/workflow/conditional_close/routers.go @@ -8,6 +8,7 @@ import ( "github.com/indeedeng/iwf/service/common/ptr" "log" "net/http" + "sync" "testing" "time" ) @@ -35,14 +36,14 @@ var TestInput = iwfidl.EncodedObject{ } type handler struct { - invokeHistory map[string]int64 - invokeData map[string]interface{} + invokeHistory sync.Map + invokeData sync.Map } func NewHandler() common.WorkflowHandlerWithRpc { return &handler{ - invokeHistory: make(map[string]int64), - invokeData: make(map[string]interface{}), + invokeHistory: sync.Map{}, + invokeData: sync.Map{}, } } @@ -53,7 +54,11 @@ func (h *handler) ApiV1WorkflowWorkerRpc(c *gin.Context, t *testing.T) { return } log.Println("received workflow worker rpc request, ", req) - h.invokeHistory[req.RpcName]++ + if value, ok := h.invokeHistory.Load(req.RpcName); ok { + h.invokeHistory.Store(req.RpcName, value.(int64)+1) + } else { + h.invokeHistory.Store(req.RpcName, int64(1)) + } // Return channel name c.JSON(http.StatusOK, iwfidl.WorkflowWorkerRpcResponse{ @@ -75,7 +80,11 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context, t *testing.T) { log.Println("received state start request, ", req) if req.GetWorkflowType() == WorkflowType { - h.invokeHistory[req.GetWorkflowStateId()+"_start"]++ + if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_start"); ok { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", value.(int64)+1) + } else { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", int64(1)) + } if req.GetWorkflowStateId() == State1 { // Proceed when channel is published to @@ -120,7 +129,12 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { log.Println("received state decide request, ", req) if req.GetWorkflowType() == WorkflowType { - h.invokeHistory[req.GetWorkflowStateId()+"_decide"]++ + if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_decide"); ok { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", value.(int64)+1) + } else { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", int64(1)) + } + if req.GetWorkflowStateId() == State1 { var internalChanPub []iwfidl.InterStateChannelPublishing @@ -174,5 +188,15 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { } func (h *handler) GetTestResult() (map[string]int64, map[string]interface{}) { - return h.invokeHistory, h.invokeData + invokeHistory := make(map[string]int64) + h.invokeHistory.Range(func(key, value interface{}) bool { + invokeHistory[key.(string)] = value.(int64) + return true + }) + invokeData := make(map[string]interface{}) + h.invokeData.Range(func(key, value interface{}) bool { + invokeData[key.(string)] = value + return true + }) + return invokeHistory, invokeData } diff --git a/integ/workflow/deadend/routers.go b/integ/workflow/deadend/routers.go index 257b01ba..c5246da4 100644 --- a/integ/workflow/deadend/routers.go +++ b/integ/workflow/deadend/routers.go @@ -9,6 +9,7 @@ import ( "github.com/indeedeng/iwf/service" "log" "net/http" + "sync" "testing" ) @@ -32,14 +33,12 @@ const ( ) type handler struct { - invokeHistory map[string]int64 - invokeData map[string]interface{} + invokeHistory sync.Map } func NewHandler() common.WorkflowHandlerWithRpc { return &handler{ - invokeHistory: make(map[string]int64), - invokeData: make(map[string]interface{}), + invokeHistory: sync.Map{}, } } @@ -103,7 +102,11 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { log.Println("received state decide request, ", req) if req.GetWorkflowType() == WorkflowType { - h.invokeHistory[req.GetWorkflowStateId()+"_decide"]++ + if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_decide"); ok { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", value.(int64)+1) + } else { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", int64(1)) + } // Move to the dead-end state if req.GetWorkflowStateId() == State1 { @@ -125,5 +128,10 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { } func (h *handler) GetTestResult() (map[string]int64, map[string]interface{}) { - return h.invokeHistory, h.invokeData + invokeHistory := make(map[string]int64) + h.invokeHistory.Range(func(key, value interface{}) bool { + invokeHistory[key.(string)] = value.(int64) + return true + }) + return invokeHistory, nil } diff --git a/integ/workflow/greedy_timer/routers.go b/integ/workflow/greedy_timer/routers.go index 830217ce..01632491 100644 --- a/integ/workflow/greedy_timer/routers.go +++ b/integ/workflow/greedy_timer/routers.go @@ -12,6 +12,7 @@ import ( "log" "net/http" "strconv" + "sync" "testing" ) @@ -26,8 +27,8 @@ const ( ) type handler struct { - invokeHistory map[string]int64 - invokeData map[string]interface{} + invokeHistory sync.Map + invokeData sync.Map } type Input struct { @@ -36,8 +37,8 @@ type Input struct { func NewHandler() common.WorkflowHandlerWithRpc { return &handler{ - invokeHistory: make(map[string]int64), - invokeData: make(map[string]interface{}), + invokeHistory: sync.Map{}, + invokeData: sync.Map{}, } } @@ -84,7 +85,12 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context, t *testing.T) { if req.GetWorkflowType() == WorkflowType { - h.invokeHistory[req.GetWorkflowStateId()+"_start"]++ + if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_start"); ok { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", value.(int64)+1) + } else { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", int64(1)) + } + if req.GetWorkflowStateId() == ScheduleTimerState { var input Input @@ -124,13 +130,17 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { log.Println("received state decide request, ", req) if req.GetWorkflowType() == WorkflowType { + if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_decide"); ok { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", value.(int64)+1) + } else { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", int64(1)) + } - h.invokeHistory[req.GetWorkflowStateId()+"_decide"]++ if req.GetWorkflowStateId() == ScheduleTimerState { - h.invokeData["completed_state_id"] = req.GetContext().StateExecutionId + h.invokeData.Store("completed_state_id", req.GetContext().StateExecutionId) results := req.GetCommandResults() timerResults := results.GetTimerResults() - h.invokeData["completed_timer_id"] = timerResults[0].CommandId + h.invokeData.Store("completed_timer_id", timerResults[0].CommandId) c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{ StateDecision: &iwfidl.StateDecision{ @@ -150,5 +160,15 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { } func (h *handler) GetTestResult() (map[string]int64, map[string]interface{}) { - return h.invokeHistory, h.invokeData + invokeHistory := make(map[string]int64) + h.invokeHistory.Range(func(key, value interface{}) bool { + invokeHistory[key.(string)] = value.(int64) + return true + }) + invokeData := make(map[string]interface{}) + h.invokeData.Range(func(key, value interface{}) bool { + invokeData[key.(string)] = value + return true + }) + return invokeHistory, invokeData } diff --git a/integ/workflow/headers/routers.go b/integ/workflow/headers/routers.go index f4b5d6c2..d14634d9 100644 --- a/integ/workflow/headers/routers.go +++ b/integ/workflow/headers/routers.go @@ -6,6 +6,7 @@ import ( "github.com/indeedeng/iwf/service" "log" "net/http" + "sync" "testing" ) @@ -25,12 +26,12 @@ const ( ) type handler struct { - invokeHistory map[string]int64 + invokeHistory sync.Map } func NewHandler() *handler { return &handler{ - invokeHistory: make(map[string]int64), + invokeHistory: sync.Map{}, } } @@ -52,7 +53,12 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context, t *testing.T) { if req.GetWorkflowType() == WorkflowType { // Basic workflow to go straight to the decide methods without any commands if req.GetWorkflowStateId() == State1 { - h.invokeHistory[req.GetWorkflowStateId()+"_start"]++ + if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_start"); ok { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", value.(int64)+1) + } else { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", int64(1)) + } + c.JSON(http.StatusOK, iwfidl.WorkflowStateStartResponse{ CommandRequest: &iwfidl.CommandRequest{ DeciderTriggerType: iwfidl.ALL_COMMAND_COMPLETED.Ptr(), @@ -80,7 +86,12 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { log.Println("received state decide request, ", req) if req.GetWorkflowType() == WorkflowType { - h.invokeHistory[req.GetWorkflowStateId()+"_decide"]++ + if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_decide"); ok { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", value.(int64)+1) + } else { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", int64(1)) + } + if req.GetWorkflowStateId() == State1 { // Move to completion c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{ @@ -101,5 +112,10 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { } func (h *handler) GetTestResult() (map[string]int64, map[string]interface{}) { - return h.invokeHistory, nil + invokeHistory := make(map[string]int64) + h.invokeHistory.Range(func(key, value interface{}) bool { + invokeHistory[key.(string)] = value.(int64) + return true + }) + return invokeHistory, nil } diff --git a/integ/workflow/interstate/routers.go b/integ/workflow/interstate/routers.go index d0476afc..b1705fa4 100644 --- a/integ/workflow/interstate/routers.go +++ b/integ/workflow/interstate/routers.go @@ -7,6 +7,7 @@ import ( "github.com/indeedeng/iwf/service/common/ptr" "log" "net/http" + "sync" "testing" "time" ) @@ -49,14 +50,14 @@ var TestVal2 = iwfidl.EncodedObject{ } type handler struct { - invokeHistory map[string]int64 - invokeData map[string]interface{} + invokeHistory sync.Map + invokeData sync.Map } func NewHandler() *handler { return &handler{ - invokeHistory: make(map[string]int64), - invokeData: make(map[string]interface{}), + invokeHistory: sync.Map{}, + invokeData: sync.Map{}, } } @@ -70,7 +71,11 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context, t *testing.T) { log.Println("received state start request, ", req) if req.GetWorkflowType() == WorkflowType { - h.invokeHistory[req.GetWorkflowStateId()+"_start"]++ + if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_start"); ok { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", value.(int64)+1) + } else { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", int64(1)) + } // Go straight to the decide methods without any commands if req.GetWorkflowStateId() == State1 { @@ -143,7 +148,12 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { log.Println("received state decide request, ", req) if req.GetWorkflowType() == WorkflowType { - h.invokeHistory[req.GetWorkflowStateId()+"_decide"]++ + if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_decide"); ok { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", value.(int64)+1) + } else { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", int64(1)) + } + if req.GetWorkflowStateId() == State1 { // State 1 requires no pre-reqs // Move to state 21 & 22: @@ -166,7 +176,7 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { if req.GetWorkflowStateId() == State21 { results := req.GetCommandResults() - h.invokeData[State21+"received"] = results.GetInterStateChannelResults()[0].GetValue() + h.invokeData.Store(State21+"received", results.GetInterStateChannelResults()[0].GetValue()) // Move to state 31, which will wait for channel 2 c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{ @@ -183,7 +193,7 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { if req.GetWorkflowStateId() == State31 { results := req.GetCommandResults() - h.invokeData[State31+"received"] = results.GetInterStateChannelResults()[0].GetValue() + h.invokeData.Store(State31+"received", results.GetInterStateChannelResults()[0].GetValue()) // Move to completion c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{ @@ -224,5 +234,15 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { } func (h *handler) GetTestResult() (map[string]int64, map[string]interface{}) { - return h.invokeHistory, h.invokeData + invokeHistory := make(map[string]int64) + h.invokeHistory.Range(func(key, value interface{}) bool { + invokeHistory[key.(string)] = value.(int64) + return true + }) + invokeData := make(map[string]interface{}) + h.invokeData.Range(func(key, value interface{}) bool { + invokeData[key.(string)] = value + return true + }) + return invokeHistory, invokeData } diff --git a/integ/workflow/locking/routers.go b/integ/workflow/locking/routers.go index 7ab9abdb..e86cdf42 100644 --- a/integ/workflow/locking/routers.go +++ b/integ/workflow/locking/routers.go @@ -11,6 +11,7 @@ import ( "log" "net/http" "strconv" + "sync" "testing" "time" ) @@ -89,15 +90,13 @@ var state2Movement = iwfidl.StateMovement{ } type handler struct { - invokeHistory map[string]int64 - invokeData map[string]interface{} + invokeHistory sync.Map rpcInvokes int32 } func NewHandler() common.WorkflowHandlerWithRpc { return &handler{ - invokeHistory: make(map[string]int64), - invokeData: make(map[string]interface{}), + invokeHistory: sync.Map{}, } } @@ -237,7 +236,11 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context, t *testing.T) { log.Println("received state start request, ", req) if req.GetWorkflowType() == WorkflowType { - h.invokeHistory[req.GetWorkflowStateId()+"_start"]++ + if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_start"); ok { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", value.(int64)+1) + } else { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", int64(1)) + } // Go straight to the decide methods without any commands if req.GetWorkflowStateId() == State1 { @@ -312,7 +315,12 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { log.Println("received state decide request, ", req) if req.GetWorkflowType() == WorkflowType { - h.invokeHistory[req.GetWorkflowStateId()+"_decide"]++ + if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_decide"); ok { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", value.(int64)+1) + } else { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", int64(1)) + } + if req.GetWorkflowStateId() == State1 { stms := []iwfidl.StateMovement{ @@ -401,5 +409,10 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { } func (h *handler) GetTestResult() (map[string]int64, map[string]interface{}) { - return h.invokeHistory, h.invokeData + invokeHistory := make(map[string]int64) + h.invokeHistory.Range(func(key, value interface{}) bool { + invokeHistory[key.(string)] = value.(int64) + return true + }) + return invokeHistory, nil } diff --git a/integ/workflow/parallel/routers.go b/integ/workflow/parallel/routers.go index adb24162..d1a4acec 100644 --- a/integ/workflow/parallel/routers.go +++ b/integ/workflow/parallel/routers.go @@ -7,6 +7,7 @@ import ( "github.com/indeedeng/iwf/service" "log" "net/http" + "sync" "testing" "time" ) @@ -52,12 +53,12 @@ const ( ) type handler struct { - invokeHistory map[string]int64 + invokeHistory sync.Map } func NewHandler() common.WorkflowHandler { return &handler{ - invokeHistory: make(map[string]int64), + invokeHistory: sync.Map{}, } } @@ -71,7 +72,11 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context, t *testing.T) { log.Println("received state start request, ", req) if req.GetWorkflowType() == WorkflowType { - h.invokeHistory[req.GetWorkflowStateId()+"_start"]++ + if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_start"); ok { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", value.(int64)+1) + } else { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", int64(1)) + } // Go straight to the decide methods without any commands c.JSON(http.StatusOK, iwfidl.WorkflowStateStartResponse{ @@ -94,7 +99,12 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { log.Println("received state decide request, ", req) if req.GetWorkflowType() == WorkflowType { - h.invokeHistory[req.GetWorkflowStateId()+"_decide"]++ + if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_decide"); ok { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", value.(int64)+1) + } else { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", int64(1)) + } + var nextStates []iwfidl.StateMovement switch req.GetWorkflowStateId() { case State1: @@ -185,5 +195,10 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { } func (h *handler) GetTestResult() (map[string]int64, map[string]interface{}) { - return h.invokeHistory, nil + invokeHistory := make(map[string]int64) + h.invokeHistory.Range(func(key, value interface{}) bool { + invokeHistory[key.(string)] = value.(int64) + return true + }) + return invokeHistory, nil } diff --git a/integ/workflow/persistence/routers.go b/integ/workflow/persistence/routers.go index 01889e75..aee809de 100644 --- a/integ/workflow/persistence/routers.go +++ b/integ/workflow/persistence/routers.go @@ -9,6 +9,7 @@ import ( "github.com/indeedeng/iwf/service/common/ptr" "log" "net/http" + "sync" "testing" ) @@ -64,14 +65,14 @@ var testStateLocalVal = iwfidl.EncodedObject{ } type handler struct { - invokeHistory map[string]int64 - invokeData map[string]interface{} + invokeHistory sync.Map + invokeData sync.Map } func NewHandler() common.WorkflowHandler { return &handler{ - invokeHistory: make(map[string]int64), - invokeData: make(map[string]interface{}), + invokeHistory: sync.Map{}, + invokeData: sync.Map{}, } } @@ -97,7 +98,12 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context, t *testing.T) { } if req.GetWorkflowType() == WorkflowType { - h.invokeHistory[req.GetWorkflowStateId()+"_start"]++ + if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_start"); ok { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", value.(int64)+1) + } else { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", int64(1)) + } + if req.GetWorkflowStateId() == State1 { var sa []iwfidl.SearchAttribute sa = []iwfidl.SearchAttribute{ @@ -159,8 +165,8 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context, t *testing.T) { intSaFounds++ } } - h.invokeData["S2_start_kwSaFounds"] = kwSaFounds - h.invokeData["S2_start_intSaFounds"] = intSaFounds + h.invokeData.Store("S2_start_kwSaFounds", kwSaFounds) + h.invokeData.Store("S2_start_intSaFounds", intSaFounds) // Determine if the attribute is found in the request queryAttFound := false @@ -174,7 +180,7 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context, t *testing.T) { helpers.FailTestWithErrorMessage("should not load key that is not included in partial loading", t) } } - h.invokeData["S2_start_queryAttFound"] = queryAttFound + h.invokeData.Store("S2_start_queryAttFound", queryAttFound) // Go straight to the decide methods without any commands c.JSON(http.StatusOK, iwfidl.WorkflowStateStartResponse{ @@ -238,7 +244,12 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { log.Println("received state decide request, ", req) if req.GetWorkflowType() == WorkflowType { - h.invokeHistory[req.GetWorkflowStateId()+"_decide"]++ + if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_decide"); ok { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", value.(int64)+1) + } else { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", int64(1)) + } + if req.GetWorkflowStateId() == State1 { sas := req.GetSearchAttributes() @@ -255,8 +266,8 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { intSaFounds++ } } - h.invokeData["S1_decide_kwSaFounds"] = kwSaFounds - h.invokeData["S1_decide_intSaFounds"] = intSaFounds + h.invokeData.Store("S1_decide_kwSaFounds", kwSaFounds) + h.invokeData.Store("S1_decide_intSaFounds", intSaFounds) queryAttFound := 0 queryAtts := req.GetDataObjects() @@ -271,7 +282,7 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { queryAttFound++ } } - h.invokeData["S1_decide_queryAttFound"] = queryAttFound + h.invokeData.Store("S1_decide_queryAttFound", queryAttFound) // Determine if local attribute is found localAttFound := false @@ -280,7 +291,7 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { if localAtt.GetKey() == TestStateLocalKey && value.GetData() == testStateLocalVal.GetData() && value.GetEncoding() == testStateLocalVal.GetEncoding() { localAttFound = true } - h.invokeData["S1_decide_localAttFound"] = localAttFound + h.invokeData.Store("S1_decide_localAttFound", localAttFound) var sa []iwfidl.SearchAttribute sa = []iwfidl.SearchAttribute{ @@ -345,8 +356,8 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { intSaFounds++ } } - h.invokeData["S2_decide_kwSaFounds"] = kwSaFounds - h.invokeData["S2_decide_intSaFounds"] = intSaFounds + h.invokeData.Store("S2_decide_kwSaFounds", kwSaFounds) + h.invokeData.Store("S2_decide_intSaFounds", intSaFounds) queryAttFound := false queryAtts := req.GetDataObjects() @@ -362,7 +373,7 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { } } - h.invokeData["S2_decide_queryAttFound"] = queryAttFound + h.invokeData.Store("S2_decide_queryAttFound", queryAttFound) // Move to state 3 after with set options c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{ @@ -443,5 +454,15 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { } func (h *handler) GetTestResult() (map[string]int64, map[string]interface{}) { - return h.invokeHistory, h.invokeData + invokeHistory := make(map[string]int64) + h.invokeHistory.Range(func(key, value interface{}) bool { + invokeHistory[key.(string)] = value.(int64) + return true + }) + invokeData := make(map[string]interface{}) + h.invokeData.Range(func(key, value interface{}) bool { + invokeData[key.(string)] = value + return true + }) + return invokeHistory, invokeData } diff --git a/integ/workflow/persistence_loading_policy/routers.go b/integ/workflow/persistence_loading_policy/routers.go index ee32f7f5..9da9b487 100644 --- a/integ/workflow/persistence_loading_policy/routers.go +++ b/integ/workflow/persistence_loading_policy/routers.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/assert" "log" "net/http" + "sync" "testing" ) @@ -31,12 +32,12 @@ const ( ) type handler struct { - invokeHistory map[string]int64 + invokeHistory sync.Map } func NewHandler() common.WorkflowHandlerWithRpc { return &handler{ - invokeHistory: make(map[string]int64), + invokeHistory: sync.Map{}, } } @@ -54,7 +55,11 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context, t *testing.T) { return } - h.invokeHistory[req.GetWorkflowStateId()+"_start"]++ + if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_start"); ok { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", value.(int64)+1) + } else { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", int64(1)) + } if req.GetWorkflowStateId() == State2 { // Dynamically get the loadingType from input @@ -85,7 +90,11 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { return } - h.invokeHistory[req.GetWorkflowStateId()+"_decide"]++ + if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_decide"); ok { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", value.(int64)+1) + } else { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", int64(1)) + } // Dynamically get the loadingType from input loadingTypeFromInput := req.GetStateInput() @@ -159,7 +168,11 @@ func (h *handler) ApiV1WorkflowWorkerRpc(c *gin.Context, t *testing.T) { return } - h.invokeHistory["rpc"]++ + if value, ok := h.invokeHistory.Load("rpc"); ok { + h.invokeHistory.Store("rpc", value.(int64)+1) + } else { + h.invokeHistory.Store("rpc", int64(1)) + } // dynamically get the loadingType from input loadingTypeFromInput := req.GetInput() @@ -174,9 +187,13 @@ func (h *handler) ApiV1WorkflowWorkerRpc(c *gin.Context, t *testing.T) { } func (h *handler) GetTestResult() (map[string]int64, map[string]interface{}) { - return h.invokeHistory, nil + invokeHistory := make(map[string]int64) + h.invokeHistory.Range(func(key, value interface{}) bool { + invokeHistory[key.(string)] = value.(int64) + return true + }) + return invokeHistory, nil } - func verifyLoadedAttributes( t *testing.T, searchAttributes []iwfidl.SearchAttribute, diff --git a/integ/workflow/reset/routers.go b/integ/workflow/reset/routers.go index e785ed99..b50f993e 100644 --- a/integ/workflow/reset/routers.go +++ b/integ/workflow/reset/routers.go @@ -9,6 +9,7 @@ import ( "log" "net/http" "strconv" + "sync" "testing" ) @@ -29,12 +30,12 @@ const ( ) type handler struct { - invokeHistory map[string]int64 + invokeHistory sync.Map } func NewHandler() *handler { return &handler{ - invokeHistory: make(map[string]int64), + invokeHistory: sync.Map{}, } } @@ -57,7 +58,12 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { } if req.GetWorkflowType() == WorkflowType { - h.invokeHistory[req.GetWorkflowStateId()+"_decide"]++ + if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_decide"); ok { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", value.(int64)+1) + } else { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", int64(1)) + } + if req.GetWorkflowStateId() == State1 { // go to S2 c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{ @@ -119,5 +125,10 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { } func (h *handler) GetTestResult() (map[string]int64, map[string]interface{}) { - return h.invokeHistory, nil + invokeHistory := make(map[string]int64) + h.invokeHistory.Range(func(key, value interface{}) bool { + invokeHistory[key.(string)] = value.(int64) + return true + }) + return invokeHistory, nil } diff --git a/integ/workflow/rpc/routers.go b/integ/workflow/rpc/routers.go index 7d3e7784..4b45419a 100644 --- a/integ/workflow/rpc/routers.go +++ b/integ/workflow/rpc/routers.go @@ -10,6 +10,7 @@ import ( "github.com/indeedeng/iwf/service/common/ptr" "log" "net/http" + "sync" "testing" ) @@ -48,14 +49,14 @@ const ( ) type handler struct { - invokeHistory map[string]int64 - invokeData map[string]interface{} + invokeHistory sync.Map + invokeData sync.Map } func NewHandler() common.WorkflowHandlerWithRpc { return &handler{ - invokeHistory: make(map[string]int64), - invokeData: make(map[string]interface{}), + invokeHistory: sync.Map{}, + invokeData: sync.Map{}, } } @@ -106,9 +107,9 @@ func (h *handler) ApiV1WorkflowWorkerRpc(c *gin.Context, t *testing.T) { helpers.FailTestWithErrorMessage(fmt.Sprintf("invalid rpc name: %s", req.RpcName), t) } - h.invokeData[req.RpcName+"-input"] = req.Input - h.invokeData[req.RpcName+"-search-attributes"] = req.SearchAttributes - h.invokeData[req.RpcName+"-data-attributes"] = req.DataAttributes + h.invokeData.Store(req.RpcName+"-input", req.Input) + h.invokeData.Store(req.RpcName+"-search-attributes", req.SearchAttributes) + h.invokeData.Store(req.RpcName+"-data-attributes", req.DataAttributes) if req.RpcName == RPCNameReadOnly { c.JSON(http.StatusOK, iwfidl.WorkflowWorkerRpcResponse{ @@ -177,7 +178,12 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context, t *testing.T) { log.Println("received state start request, ", req) if req.GetWorkflowType() == WorkflowType { - h.invokeHistory[req.GetWorkflowStateId()+"_start"]++ + if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_start"); ok { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", value.(int64)+1) + } else { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", int64(1)) + } + if req.GetWorkflowStateId() == State1 { upsertSAs := []iwfidl.SearchAttribute{ { @@ -240,14 +246,19 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { log.Println("received state decide request, ", req) if req.GetWorkflowType() == WorkflowType { - h.invokeHistory[req.GetWorkflowStateId()+"_decide"]++ + if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_decide"); ok { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", value.(int64)+1) + } else { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", int64(1)) + } + if req.GetWorkflowStateId() == State1 { commandRes := req.GetCommandResults() res := commandRes.GetInterStateChannelResults()[0] if res.GetRequestStatus() != iwfidl.RECEIVED || res.GetChannelName() != TestInterStateChannelName { helpers.FailTestWithErrorMessage("the signal should be received", t) } - h.invokeData[TestInterStateChannelName] = res.Value + h.invokeData.Store(TestInterStateChannelName, res.Value) // Move to state 2 c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{ @@ -279,5 +290,15 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { } func (h *handler) GetTestResult() (map[string]int64, map[string]interface{}) { - return h.invokeHistory, h.invokeData + invokeHistory := make(map[string]int64) + h.invokeHistory.Range(func(key, value interface{}) bool { + invokeHistory[key.(string)] = value.(int64) + return true + }) + invokeData := make(map[string]interface{}) + h.invokeData.Range(func(key, value interface{}) bool { + invokeData[key.(string)] = value + return true + }) + return invokeHistory, invokeData } diff --git a/integ/workflow/signal/routers.go b/integ/workflow/signal/routers.go index 7ca8d11e..063f6c06 100644 --- a/integ/workflow/signal/routers.go +++ b/integ/workflow/signal/routers.go @@ -11,6 +11,7 @@ import ( "github.com/indeedeng/iwf/service/common/ptr" "log" "net/http" + "sync" "testing" ) @@ -42,8 +43,15 @@ var StateOptionsForLargeDataAttributes = iwfidl.WorkflowStateOptions{ } type handler struct { - invokeHistory map[string]int64 - invokeData map[string]interface{} + invokeHistory sync.Map + invokeData sync.Map +} + +func NewHandler() common.WorkflowHandlerWithRpc { + return &handler{ + invokeHistory: sync.Map{}, + invokeData: sync.Map{}, + } } func (h *handler) ApiV1WorkflowWorkerRpc(c *gin.Context, t *testing.T) { @@ -87,13 +95,6 @@ func (h *handler) ApiV1WorkflowWorkerRpc(c *gin.Context, t *testing.T) { return } -func NewHandler() common.WorkflowHandlerWithRpc { - return &handler{ - invokeHistory: make(map[string]int64), - invokeData: make(map[string]interface{}), - } -} - // ApiV1WorkflowStartPost - for a workflow func (h *handler) ApiV1WorkflowStateStart(c *gin.Context, t *testing.T) { var req iwfidl.WorkflowStateStartRequest @@ -104,7 +105,12 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context, t *testing.T) { log.Println("received state start request, ", req) if req.GetWorkflowType() == WorkflowType { - h.invokeHistory[req.GetWorkflowStateId()+"_start"]++ + if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_start"); ok { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", value.(int64)+1) + } else { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", int64(1)) + } + if req.GetWorkflowStateId() == State1 { // Proceed when 4 signals are received c.JSON(http.StatusOK, iwfidl.WorkflowStateStartResponse{ @@ -153,7 +159,12 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { log.Println("received state decide request, ", req) if req.GetWorkflowType() == WorkflowType { - h.invokeHistory[req.GetWorkflowStateId()+"_decide"]++ + if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_decide"); ok { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", value.(int64)+1) + } else { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", int64(1)) + } + if req.GetWorkflowStateId() == State1 { signalResults := req.GetCommandResults() @@ -162,8 +173,8 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { signalId := signalResults.SignalResults[i].GetCommandId() signalValue := signalResults.SignalResults[i].GetSignalValue() - h.invokeData[fmt.Sprintf("signalId%v", i)] = signalId - h.invokeData[fmt.Sprintf("signalValue%v", i)] = signalValue + h.invokeData.Store(fmt.Sprintf("signalId%v", i), signalId) + h.invokeData.Store(fmt.Sprintf("signalValue%v", i), signalValue) } // Move to State 2 @@ -197,5 +208,15 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { } func (h *handler) GetTestResult() (map[string]int64, map[string]interface{}) { - return h.invokeHistory, h.invokeData + invokeHistory := make(map[string]int64) + h.invokeHistory.Range(func(key, value interface{}) bool { + invokeHistory[key.(string)] = value.(int64) + return true + }) + invokeData := make(map[string]interface{}) + h.invokeData.Range(func(key, value interface{}) bool { + invokeData[key.(string)] = value + return true + }) + return invokeHistory, invokeData } diff --git a/integ/workflow/skipstart/routers.go b/integ/workflow/skipstart/routers.go index c9e0a910..c242de0a 100644 --- a/integ/workflow/skipstart/routers.go +++ b/integ/workflow/skipstart/routers.go @@ -7,6 +7,7 @@ import ( "github.com/indeedeng/iwf/service" "log" "net/http" + "sync" "testing" ) @@ -27,12 +28,12 @@ const ( ) type handler struct { - invokeHistory map[string]int64 + invokeHistory sync.Map } func NewHandler() *handler { return &handler{ - invokeHistory: make(map[string]int64), + invokeHistory: sync.Map{}, } } @@ -50,7 +51,12 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { log.Println("received state decide request, ", req) if req.GetWorkflowType() == WorkflowType { - h.invokeHistory[req.GetWorkflowStateId()+"_decide"]++ + if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_decide"); ok { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", value.(int64)+1) + } else { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", int64(1)) + } + if req.GetWorkflowStateId() == State1 { // Move to State 2 with the provided input & options c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{ @@ -87,5 +93,10 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { } func (h *handler) GetTestResult() (map[string]int64, map[string]interface{}) { - return h.invokeHistory, nil + invokeHistory := make(map[string]int64) + h.invokeHistory.Range(func(key, value interface{}) bool { + invokeHistory[key.(string)] = value.(int64) + return true + }) + return invokeHistory, nil } diff --git a/integ/workflow/timer/routers.go b/integ/workflow/timer/routers.go index 4b3c76b6..b8ce61f5 100644 --- a/integ/workflow/timer/routers.go +++ b/integ/workflow/timer/routers.go @@ -6,6 +6,7 @@ import ( "log" "net/http" "strconv" + "sync" "testing" "time" @@ -32,14 +33,14 @@ const ( ) type handler struct { - invokeHistory map[string]int64 - invokeData map[string]interface{} + invokeHistory sync.Map + invokeData sync.Map } func NewHandler() common.WorkflowHandler { return &handler{ - invokeHistory: make(map[string]int64), - invokeData: make(map[string]interface{}), + invokeHistory: sync.Map{}, + invokeData: sync.Map{}, } } @@ -53,14 +54,19 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context, t *testing.T) { log.Println("received state start request, ", req) if req.GetWorkflowType() == WorkflowType { - h.invokeHistory[req.GetWorkflowStateId()+"_start"]++ + if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_start"); ok { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", value.(int64)+1) + } else { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", int64(1)) + } + if req.GetWorkflowStateId() == State1 { nowInt, err := strconv.Atoi(req.StateInput.GetData()) if err != nil { helpers.FailTestWithError(err, t) } now := int64(nowInt) - h.invokeData["scheduled_at"] = now + h.invokeData.Store("scheduled_at", now) // Proceed after 3 timers complete c.JSON(http.StatusOK, iwfidl.WorkflowStateStartResponse{ @@ -108,13 +114,18 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { log.Println("received state decide request, ", req) if req.GetWorkflowType() == WorkflowType { - h.invokeHistory[req.GetWorkflowStateId()+"_decide"]++ + if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_decide"); ok { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", value.(int64)+1) + } else { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", int64(1)) + } + if req.GetWorkflowStateId() == State1 { now := time.Now().Unix() - h.invokeData["fired_at"] = now + h.invokeData.Store("fired_at", now) timerResults := req.GetCommandResults() timerId := timerResults.GetTimerResults()[0].GetCommandId() - h.invokeData["timer_id"] = timerId + h.invokeData.Store("timer_id", timerId) // Move to State 2 c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{ StateDecision: &iwfidl.StateDecision{ @@ -145,5 +156,15 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { } func (h *handler) GetTestResult() (map[string]int64, map[string]interface{}) { - return h.invokeHistory, h.invokeData + invokeHistory := make(map[string]int64) + h.invokeHistory.Range(func(key, value interface{}) bool { + invokeHistory[key.(string)] = value.(int64) + return true + }) + invokeData := make(map[string]interface{}) + h.invokeData.Range(func(key, value interface{}) bool { + invokeData[key.(string)] = value + return true + }) + return invokeHistory, invokeData } diff --git a/integ/workflow/wait_for_state_completion/routers.go b/integ/workflow/wait_for_state_completion/routers.go index 10fb7f5a..c0fc0064 100644 --- a/integ/workflow/wait_for_state_completion/routers.go +++ b/integ/workflow/wait_for_state_completion/routers.go @@ -6,6 +6,7 @@ import ( "log" "net/http" "strconv" + "sync" "testing" "time" @@ -32,14 +33,14 @@ const ( ) type handler struct { - invokeHistory map[string]int64 - invokeData map[string]interface{} + invokeHistory sync.Map + invokeData sync.Map } func NewHandler() common.WorkflowHandler { return &handler{ - invokeHistory: make(map[string]int64), - invokeData: make(map[string]interface{}), + invokeHistory: sync.Map{}, + invokeData: sync.Map{}, } } @@ -53,14 +54,19 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context, t *testing.T) { log.Println("received state start request, ", req) if req.GetWorkflowType() == WorkflowType { - h.invokeHistory[req.GetWorkflowStateId()+"_start"]++ + if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_start"); ok { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", value.(int64)+1) + } else { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", int64(1)) + } + if req.GetWorkflowStateId() == State1 { nowInt, err := strconv.Atoi(req.StateInput.GetData()) if err != nil { helpers.FailTestWithError(err, t) } now := int64(nowInt) - h.invokeData["scheduled_at"] = now + h.invokeData.Store("scheduled_at", now) // Proceed after 10s c.JSON(http.StatusOK, iwfidl.WorkflowStateStartResponse{ CommandRequest: &iwfidl.CommandRequest{ @@ -99,13 +105,18 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { log.Println("received state decide request, ", req) if req.GetWorkflowType() == WorkflowType { - h.invokeHistory[req.GetWorkflowStateId()+"_decide"]++ + if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_decide"); ok { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", value.(int64)+1) + } else { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", int64(1)) + } + if req.GetWorkflowStateId() == State1 { now := time.Now().Unix() - h.invokeData["fired_at"] = now + h.invokeData.Store("fired_at", now) timerResults := req.GetCommandResults() timerId := timerResults.GetTimerResults()[0].GetCommandId() - h.invokeData["timer_id"] = timerId + h.invokeData.Store("timer_id", timerId) // Move to State 2 c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{ StateDecision: &iwfidl.StateDecision{ @@ -137,5 +148,15 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { } func (h *handler) GetTestResult() (map[string]int64, map[string]interface{}) { - return h.invokeHistory, h.invokeData + invokeHistory := make(map[string]int64) + h.invokeHistory.Range(func(key, value interface{}) bool { + invokeHistory[key.(string)] = value.(int64) + return true + }) + invokeData := make(map[string]interface{}) + h.invokeData.Range(func(key, value interface{}) bool { + invokeData[key.(string)] = value + return true + }) + return invokeHistory, invokeData } diff --git a/integ/workflow/wait_until_search_attributes/routers.go b/integ/workflow/wait_until_search_attributes/routers.go index 1954ea6f..f04469a6 100644 --- a/integ/workflow/wait_until_search_attributes/routers.go +++ b/integ/workflow/wait_until_search_attributes/routers.go @@ -7,6 +7,7 @@ import ( "github.com/indeedeng/iwf/service" "log" "net/http" + "sync" "testing" "time" ) @@ -32,12 +33,12 @@ const ( ) type handler struct { - invokeHistory map[string]int64 + invokeHistory sync.Map } func NewHandler() common.WorkflowHandler { return &handler{ - invokeHistory: make(map[string]int64), + invokeHistory: sync.Map{}, } } @@ -50,7 +51,12 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context, t *testing.T) { log.Println("received state start request, ", req) if req.GetWorkflowType() == WorkflowType { - h.invokeHistory[req.GetWorkflowStateId()+"_start"]++ + if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_start"); ok { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", value.(int64)+1) + } else { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", int64(1)) + } + if req.GetWorkflowStateId() == State1 { // Go straight to the decide methods without any commands c.JSON(http.StatusOK, iwfidl.WorkflowStateStartResponse{ @@ -83,7 +89,12 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { log.Println("received state decide request, ", req) if req.GetWorkflowType() == WorkflowType { - h.invokeHistory[req.GetWorkflowStateId()+"_decide"]++ + if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_decide"); ok { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", value.(int64)+1) + } else { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", int64(1)) + } + if req.GetWorkflowStateId() == State1 { // Move to State 2, skipping wait until c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{ @@ -120,5 +131,10 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { } func (h *handler) GetTestResult() (map[string]int64, map[string]interface{}) { - return h.invokeHistory, nil + invokeHistory := make(map[string]int64) + h.invokeHistory.Range(func(key, value interface{}) bool { + invokeHistory[key.(string)] = value.(int64) + return true + }) + return invokeHistory, nil } diff --git a/integ/workflow/wait_until_search_attributes_optimization/routers.go b/integ/workflow/wait_until_search_attributes_optimization/routers.go index 296123c9..c6c5bc44 100644 --- a/integ/workflow/wait_until_search_attributes_optimization/routers.go +++ b/integ/workflow/wait_until_search_attributes_optimization/routers.go @@ -8,6 +8,7 @@ import ( "github.com/indeedeng/iwf/service/common/ptr" "log" "net/http" + "sync" "testing" "time" ) @@ -51,12 +52,12 @@ const ( ) type handler struct { - invokeHistory map[string]int64 + invokeHistory sync.Map } func NewHandler() common.WorkflowHandler { return &handler{ - invokeHistory: make(map[string]int64), + invokeHistory: sync.Map{}, } } @@ -69,7 +70,12 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context, t *testing.T) { log.Println("received state start request, ", req) if req.GetWorkflowType() == WorkflowType { - h.invokeHistory[req.GetWorkflowStateId()+"_start"]++ + if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_start"); ok { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", value.(int64)+1) + } else { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", int64(1)) + } + if req.GetWorkflowStateId() == State1 || req.GetWorkflowStateId() == State2 || req.GetWorkflowStateId() == State3 || req.GetWorkflowStateId() == State5 || req.GetWorkflowStateId() == State6 || req.GetWorkflowStateId() == State7 { // Go straight to the decide methods without any commands @@ -109,7 +115,12 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { log.Println("received state decide request, ", req) if req.GetWorkflowType() == WorkflowType { - h.invokeHistory[req.GetWorkflowStateId()+"_decide"]++ + if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_decide"); ok { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", value.(int64)+1) + } else { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", int64(1)) + } + if req.GetWorkflowStateId() == State1 { context := req.GetContext() if context.GetStateExecutionId() == "S1-5" { @@ -254,5 +265,10 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { } func (h *handler) GetTestResult() (map[string]int64, map[string]interface{}) { - return h.invokeHistory, nil + invokeHistory := make(map[string]int64) + h.invokeHistory.Range(func(key, value interface{}) bool { + invokeHistory[key.(string)] = value.(int64) + return true + }) + return invokeHistory, nil } diff --git a/integ/workflow/wf_execute_api_fail_and_proceed/routers.go b/integ/workflow/wf_execute_api_fail_and_proceed/routers.go index 2b7be978..8681a6a1 100644 --- a/integ/workflow/wf_execute_api_fail_and_proceed/routers.go +++ b/integ/workflow/wf_execute_api_fail_and_proceed/routers.go @@ -5,6 +5,7 @@ import ( "github.com/indeedeng/iwf/service" "log" "net/http" + "sync" "testing" "github.com/gin-gonic/gin" @@ -30,14 +31,12 @@ const ( ) type handler struct { - invokeHistory map[string]int64 - invokeData map[string]interface{} + invokeHistory sync.Map } func NewHandler() common.WorkflowHandler { return &handler{ - invokeHistory: make(map[string]int64), - invokeData: make(map[string]interface{}), + invokeHistory: sync.Map{}, } } @@ -54,7 +53,11 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { log.Println("received state decide request, ", req) if req.GetWorkflowType() == WorkflowType { - h.invokeHistory[req.GetWorkflowStateId()+"_decide"]++ + if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_decide"); ok { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", value.(int64)+1) + } else { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", int64(1)) + } } input := req.StateInput if req.WorkflowStateId == State1 { @@ -88,5 +91,10 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { } func (h *handler) GetTestResult() (map[string]int64, map[string]interface{}) { - return h.invokeHistory, h.invokeData + invokeHistory := make(map[string]int64) + h.invokeHistory.Range(func(key, value interface{}) bool { + invokeHistory[key.(string)] = value.(int64) + return true + }) + return invokeHistory, nil } diff --git a/integ/workflow/wf_force_fail/routers.go b/integ/workflow/wf_force_fail/routers.go index 286354fa..0de950cc 100644 --- a/integ/workflow/wf_force_fail/routers.go +++ b/integ/workflow/wf_force_fail/routers.go @@ -8,6 +8,7 @@ import ( "github.com/indeedeng/iwf/service" "log" "net/http" + "sync" "testing" ) @@ -24,8 +25,7 @@ const ( ) type handler struct { - invokeHistory map[string]int64 - invokeData map[string]interface{} + invokeHistory sync.Map } var TestData = &iwfidl.EncodedObject{ @@ -35,8 +35,7 @@ var TestData = &iwfidl.EncodedObject{ func NewHandler() common.WorkflowHandler { return &handler{ - invokeHistory: make(map[string]int64), - invokeData: make(map[string]interface{}), + invokeHistory: sync.Map{}, } } @@ -49,7 +48,12 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context, t *testing.T) { log.Println("received state start request, ", req) if req.GetWorkflowType() == WorkflowType { - h.invokeHistory[req.GetWorkflowStateId()+"_start"]++ + if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_start"); ok { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", value.(int64)+1) + } else { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", int64(1)) + } + if req.GetWorkflowStateId() == State1 { // Empty response c.JSON(http.StatusOK, iwfidl.WorkflowStateStartResponse{}) @@ -69,7 +73,12 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { log.Println("received state decide request, ", req) if req.GetWorkflowType() == WorkflowType && req.GetWorkflowStateId() == State1 { - h.invokeHistory[req.GetWorkflowStateId()+"_decide"]++ + if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_decide"); ok { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", value.(int64)+1) + } else { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", int64(1)) + } + // Force fail c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{ StateDecision: &iwfidl.StateDecision{ @@ -88,5 +97,10 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { } func (h *handler) GetTestResult() (map[string]int64, map[string]interface{}) { - return h.invokeHistory, h.invokeData + invokeHistory := make(map[string]int64) + h.invokeHistory.Range(func(key, value interface{}) bool { + invokeHistory[key.(string)] = value.(int64) + return true + }) + return invokeHistory, nil } diff --git a/integ/workflow/wf_ignore_already_started/routers.go b/integ/workflow/wf_ignore_already_started/routers.go index 3f0b58a9..6375cae2 100644 --- a/integ/workflow/wf_ignore_already_started/routers.go +++ b/integ/workflow/wf_ignore_already_started/routers.go @@ -6,6 +6,7 @@ import ( "log" "net/http" "strconv" + "sync" "testing" "time" @@ -27,13 +28,13 @@ const ( ) type handler struct { - invokeHistory map[string]int64 - invokeData map[string]interface{} + invokeHistory sync.Map + invokeData sync.Map } func NewHandler() *handler { return &handler{ - invokeHistory: make(map[string]int64), + invokeHistory: sync.Map{}, } } @@ -47,14 +48,19 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context, t *testing.T) { log.Println("received state start request, ", req) if req.GetWorkflowType() == WorkflowType { - h.invokeHistory[req.GetWorkflowStateId()+"_start"]++ + if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_start"); ok { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", value.(int64)+1) + } else { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", int64(1)) + } + if req.GetWorkflowStateId() == State1 { nowInt, err := strconv.Atoi(req.StateInput.GetData()) if err != nil { helpers.FailTestWithError(err, t) } now := int64(nowInt) - h.invokeData["scheduled_at"] = now + h.invokeData.Store("scheduled_at", now) c.JSON(http.StatusOK, iwfidl.WorkflowStateStartResponse{ CommandRequest: &iwfidl.CommandRequest{ TimerCommands: []iwfidl.TimerCommand{ @@ -82,13 +88,18 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { log.Println("received state decide request, ", req) if req.GetWorkflowType() == WorkflowType { - h.invokeHistory[req.GetWorkflowStateId()+"_decide"]++ + if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_decide"); ok { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", value.(int64)+1) + } else { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", int64(1)) + } + if req.GetWorkflowStateId() == State1 { now := time.Now().Unix() - h.invokeData["fired_at"] = now + h.invokeData.Store("fired_at", now) timerResults := req.GetCommandResults() timerId := timerResults.GetTimerResults()[0].GetCommandId() - h.invokeData["timer_id"] = timerId + h.invokeData.Store("timer_id", timerId) c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{ StateDecision: &iwfidl.StateDecision{ NextStates: []iwfidl.StateMovement{ @@ -106,5 +117,15 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { } func (h *handler) GetTestResult() (map[string]int64, map[string]interface{}) { - return h.invokeHistory, h.invokeData + invokeHistory := make(map[string]int64) + h.invokeHistory.Range(func(key, value interface{}) bool { + invokeHistory[key.(string)] = value.(int64) + return true + }) + invokeData := make(map[string]interface{}) + h.invokeData.Range(func(key, value interface{}) bool { + invokeData[key.(string)] = value + return true + }) + return invokeHistory, invokeData } diff --git a/integ/workflow/wf_state_api_fail/routers.go b/integ/workflow/wf_state_api_fail/routers.go index f33f72a5..cf483b0e 100644 --- a/integ/workflow/wf_state_api_fail/routers.go +++ b/integ/workflow/wf_state_api_fail/routers.go @@ -7,6 +7,7 @@ import ( "github.com/indeedeng/iwf/integ/workflow/common" "log" "net/http" + "sync" "testing" ) @@ -22,14 +23,12 @@ const ( ) type handler struct { - invokeHistory map[string]int64 - invokeData map[string]interface{} + invokeHistory sync.Map } func NewHandler() common.WorkflowHandler { return &handler{ - invokeHistory: make(map[string]int64), - invokeData: make(map[string]interface{}), + invokeHistory: sync.Map{}, } } @@ -42,7 +41,12 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context, t *testing.T) { log.Println("received state start request, ", req) if req.GetWorkflowType() == WorkflowType { - h.invokeHistory[req.GetWorkflowStateId()+"_start"]++ + if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_start"); ok { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", value.(int64)+1) + } else { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", int64(1)) + } + if req.GetWorkflowStateId() == State1 { // Bad Request response c.JSON(http.StatusBadRequest, iwfidl.WorkflowStateStartResponse{}) @@ -58,5 +62,10 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { } func (h *handler) GetTestResult() (map[string]int64, map[string]interface{}) { - return h.invokeHistory, h.invokeData + invokeHistory := make(map[string]int64) + h.invokeHistory.Range(func(key, value interface{}) bool { + invokeHistory[key.(string)] = value.(int64) + return true + }) + return invokeHistory, nil } diff --git a/integ/workflow/wf_state_api_fail_and_proceed/routers.go b/integ/workflow/wf_state_api_fail_and_proceed/routers.go index e4a63560..7a8a583f 100644 --- a/integ/workflow/wf_state_api_fail_and_proceed/routers.go +++ b/integ/workflow/wf_state_api_fail_and_proceed/routers.go @@ -5,6 +5,7 @@ import ( "github.com/indeedeng/iwf/service" "log" "net/http" + "sync" "testing" "github.com/gin-gonic/gin" @@ -24,14 +25,12 @@ const ( ) type handler struct { - invokeHistory map[string]int64 - invokeData map[string]interface{} + invokeHistory sync.Map } func NewHandler() common.WorkflowHandler { return &handler{ - invokeHistory: make(map[string]int64), - invokeData: make(map[string]interface{}), + invokeHistory: sync.Map{}, } } @@ -44,7 +43,12 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context, t *testing.T) { log.Println("received state start request, ", req) if req.GetWorkflowType() == WorkflowType { - h.invokeHistory[req.GetWorkflowStateId()+"_start"]++ + if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_start"); ok { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", value.(int64)+1) + } else { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", int64(1)) + } + if req.GetWorkflowStateId() == State1 { // Bad Request response c.JSON(http.StatusBadRequest, iwfidl.WorkflowStateStartResponse{}) @@ -68,7 +72,11 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { } if req.GetWorkflowType() == WorkflowType { - h.invokeHistory[req.GetWorkflowStateId()+"_decide"]++ + if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_decide"); ok { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", value.(int64)+1) + } else { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", int64(1)) + } } // Move to completion c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{ @@ -83,5 +91,10 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { } func (h *handler) GetTestResult() (map[string]int64, map[string]interface{}) { - return h.invokeHistory, h.invokeData + invokeHistory := make(map[string]int64) + h.invokeHistory.Range(func(key, value interface{}) bool { + invokeHistory[key.(string)] = value.(int64) + return true + }) + return invokeHistory, nil } diff --git a/integ/workflow/wf_state_api_timeout/routers.go b/integ/workflow/wf_state_api_timeout/routers.go index 0c849c38..d906b545 100644 --- a/integ/workflow/wf_state_api_timeout/routers.go +++ b/integ/workflow/wf_state_api_timeout/routers.go @@ -8,6 +8,7 @@ import ( "github.com/indeedeng/iwf/integ/workflow/common" "log" "net/http" + "sync" "testing" "time" ) @@ -25,14 +26,12 @@ const ( ) type handler struct { - invokeHistory map[string]int64 - invokeData map[string]interface{} + invokeHistory sync.Map } func NewHandler() common.WorkflowHandler { return &handler{ - invokeHistory: make(map[string]int64), - invokeData: make(map[string]interface{}), + invokeHistory: sync.Map{}, } } @@ -45,7 +44,12 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context, t *testing.T) { log.Println("received state start request, ", req) if req.GetWorkflowType() == WorkflowType { - h.invokeHistory[req.GetWorkflowStateId()+"_start"]++ + if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_start"); ok { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", value.(int64)+1) + } else { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", int64(1)) + } + if req.GetWorkflowStateId() == State1 { // Sleep for longer than the timeout time.Sleep(time.Second * 30) @@ -65,5 +69,10 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { } func (h *handler) GetTestResult() (map[string]int64, map[string]interface{}) { - return h.invokeHistory, h.invokeData + invokeHistory := make(map[string]int64) + h.invokeHistory.Range(func(key, value interface{}) bool { + invokeHistory[key.(string)] = value.(int64) + return true + }) + return invokeHistory, nil } diff --git a/integ/workflow/wf_state_options_data_attributes_loading/routers.go b/integ/workflow/wf_state_options_data_attributes_loading/routers.go index b7210e53..954db5a6 100644 --- a/integ/workflow/wf_state_options_data_attributes_loading/routers.go +++ b/integ/workflow/wf_state_options_data_attributes_loading/routers.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/assert" "log" "net/http" + "sync" "testing" ) @@ -48,12 +49,12 @@ const ( ) type handler struct { - invokeHistory map[string]int64 + invokeHistory sync.Map } func NewHandler() common.WorkflowHandlerWithRpc { return &handler{ - invokeHistory: make(map[string]int64), + invokeHistory: sync.Map{}, } } @@ -71,7 +72,11 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context, t *testing.T) { log.Println("state_options_data_attributes_loading: received state start request, ", req) - h.invokeHistory[req.GetWorkflowStateId()+"_start"]++ + if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_start"); ok { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", value.(int64)+1) + } else { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", int64(1)) + } currentMethod := "WaitUntil" loadingTypeFromInput := req.GetStateInput() @@ -107,7 +112,11 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { log.Println("state_options_data_attributes_loading: received state decide request, ", req) - h.invokeHistory[req.GetWorkflowStateId()+"_decide"]++ + if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_decide"); ok { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", value.(int64)+1) + } else { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", int64(1)) + } currentMethod := "Execute" loadingTypeFromInput := req.GetStateInput() @@ -352,5 +361,10 @@ func getExpectedDataAttributes(stateId string, method string, loadingType iwfidl } func (h *handler) GetTestResult() (map[string]int64, map[string]interface{}) { - return h.invokeHistory, nil + invokeHistory := make(map[string]int64) + h.invokeHistory.Range(func(key, value interface{}) bool { + invokeHistory[key.(string)] = value.(int64) + return true + }) + return invokeHistory, nil } diff --git a/integ/workflow/wf_state_options_search_attributes_loading/routers.go b/integ/workflow/wf_state_options_search_attributes_loading/routers.go index 97f8e6b5..a35b8d88 100644 --- a/integ/workflow/wf_state_options_search_attributes_loading/routers.go +++ b/integ/workflow/wf_state_options_search_attributes_loading/routers.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/assert" "log" "net/http" + "sync" "testing" ) @@ -48,12 +49,12 @@ const ( ) type handler struct { - invokeHistory map[string]int64 + invokeHistory sync.Map } func NewHandler() common.WorkflowHandlerWithRpc { return &handler{ - invokeHistory: make(map[string]int64), + invokeHistory: sync.Map{}, } } @@ -71,7 +72,11 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context, t *testing.T) { log.Println("state_options_search_attributes_loading: received state decide request, ", req) - h.invokeHistory[req.GetWorkflowStateId()+"_start"]++ + if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_start"); ok { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", value.(int64)+1) + } else { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", int64(1)) + } currentMethod := "WaitUntil" loadingTypeFromInput := req.GetStateInput() @@ -107,7 +112,11 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { log.Println("state_options_search_attributes_loading: received state decide request, ", req) - h.invokeHistory[req.GetWorkflowStateId()+"_decide"]++ + if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_decide"); ok { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", value.(int64)+1) + } else { + h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", int64(1)) + } currentMethod := "Execute" loadingTypeFromInput := req.GetStateInput() @@ -345,5 +354,10 @@ func getExpectedSearchAttributes(stateId string, method string, loadingType iwfi } func (h *handler) GetTestResult() (map[string]int64, map[string]interface{}) { - return h.invokeHistory, nil + invokeHistory := make(map[string]int64) + h.invokeHistory.Range(func(key, value interface{}) bool { + invokeHistory[key.(string)] = value.(int64) + return true + }) + return invokeHistory, nil }