Skip to content

Commit 823950b

Browse files
authored
Fix RespondWorkflowTaskFailed workflow termination (temporalio#8725)
## What changed? - Properly terminate the workflow in RespondWorkflowTaskFailed so that buffered events are flushed. ## Why? - The `workflow.TerminateWorkflow` utility function fails the started workflow task, flushes buffered events, and terminates the workflow, but it does not work properly if the workflow task has already been failed. ## How did you test it? - [ ] built - [ ] run locally and tested manually - [ ] covered by existing tests - [ ] added new unit test(s) - [x] added new functional test(s)
1 parent 2513ad5 commit 823950b

File tree

2 files changed

+29
-14
lines changed

2 files changed

+29
-14
lines changed

service/history/api/respondworkflowtaskfailed/api.go

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -75,19 +75,6 @@ func Invoke(
7575
}, nil
7676
}
7777

78-
if _, err := mutableState.AddWorkflowTaskFailedEvent(
79-
workflowTask,
80-
request.GetCause(),
81-
request.GetFailure(),
82-
request.GetIdentity(),
83-
request.GetWorkerVersion(),
84-
request.GetBinaryChecksum(),
85-
"",
86-
"",
87-
0); err != nil {
88-
return nil, err
89-
}
90-
9178
metrics.FailedWorkflowTasksCounter.With(shardContext.GetMetricsHandler()).Record(
9279
1,
9380
metrics.OperationTag(metrics.HistoryRespondWorkflowTaskFailedScope),
@@ -112,6 +99,21 @@ func Invoke(
11299
return api.UpdateWorkflowTerminate, nil
113100
}
114101

102+
if _, err := mutableState.AddWorkflowTaskFailedEvent(
103+
workflowTask,
104+
request.GetCause(),
105+
request.GetFailure(),
106+
request.GetIdentity(),
107+
//nolint:staticcheck
108+
request.GetWorkerVersion(),
109+
//nolint:staticcheck
110+
request.GetBinaryChecksum(),
111+
"",
112+
"",
113+
0); err != nil {
114+
return nil, err
115+
}
116+
115117
// TODO (alex-update): if it was speculative WT that failed, and there is nothing but pending updates,
116118
// new WT also should be created as speculative (or not?). Currently, it will be recreated as normal WT.
117119
return &api.UpdateWorkflowAction{

tests/workflow_test.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1021,6 +1021,17 @@ func (s *WorkflowTestSuite) TestTerminateWorkflowOnMessageTooLargeFailure() {
10211021
s.Logger.Info("PollWorkflowTaskQueue", tag.Error(err))
10221022
s.NoError(err)
10231023

1024+
// send a signal to test buffered event case.
1025+
_, err = s.FrontendClient().SignalWorkflowExecution(testContext, &workflowservice.SignalWorkflowExecutionRequest{
1026+
Namespace: s.Namespace().String(),
1027+
WorkflowExecution: &commonpb.WorkflowExecution{
1028+
WorkflowId: tv.WorkflowID(),
1029+
RunId: we.RunId,
1030+
},
1031+
SignalName: "buffered-signal",
1032+
})
1033+
s.NoError(err)
1034+
10241035
// respond workflow task as failed with grpc message too large error
10251036
_, err = s.FrontendClient().RespondWorkflowTaskFailed(testContext, &workflowservice.RespondWorkflowTaskFailedRequest{
10261037
Namespace: s.Namespace().String(),
@@ -1034,12 +1045,14 @@ func (s *WorkflowTestSuite) TestTerminateWorkflowOnMessageTooLargeFailure() {
10341045
RunId: we.RunId,
10351046
})
10361047

1048+
// verify that workflow is terminated and buffered signal is flushed
10371049
s.EqualHistoryEvents(`
10381050
1 WorkflowExecutionStarted
10391051
2 WorkflowTaskScheduled
10401052
3 WorkflowTaskStarted
10411053
4 WorkflowTaskFailed
1042-
5 WorkflowExecutionTerminated`, // verify that workflow is terminated
1054+
5 WorkflowExecutionSignaled
1055+
6 WorkflowExecutionTerminated`,
10431056
historyEvents)
10441057
}
10451058

0 commit comments

Comments
 (0)