Skip to content

Commit ce58186

Browse files
committed
Adding fix for Issue 503
1 parent 2a9e477 commit ce58186

6 files changed

Lines changed: 198 additions & 1 deletion

File tree

service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -725,6 +725,10 @@ func (handler *workflowTaskCompletedHandler) handleCommandCompleteWorkflow(
725725
newExecutionRunID = uuid.NewString()
726726
}
727727

728+
if err := handler.mutableState.FlushPendingActivityEventsForCompletion(); err != nil {
729+
return nil, err
730+
}
731+
728732
// Always add workflow completed event to this one
729733
event, err := handler.mutableState.AddCompletedWorkflowEvent(handler.workflowTaskCompletedID, attr, newExecutionRunID)
730734
if err != nil {
@@ -1024,6 +1028,10 @@ func (handler *workflowTaskCompletedHandler) handleCommandContinueAsNewWorkflow(
10241028
}
10251029
}
10261030

1031+
if err := handler.mutableState.FlushPendingActivityEventsForCompletion(); err != nil {
1032+
return nil, err
1033+
}
1034+
10271035
event, newMutableState, err := handler.mutableState.AddContinueAsNewEvent(
10281036
ctx,
10291037
handler.workflowTaskCompletedID,

service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,8 +186,9 @@ func TestCommandProtocolMessage(t *testing.T) {
186186
CompleteWorkflowExecutionCommandAttributes: completeWorkflowExecutionCommandAttributes,
187187
}
188188

189-
// mock a failed event creation.
189+
// mock a failed event creation (handler calls FlushPendingActivityEventsForCompletion before AddCompletedWorkflowEvent).
190190
event := &historypb.HistoryEvent{}
191+
tc.ms.EXPECT().FlushPendingActivityEventsForCompletion().Return(nil)
191192
tc.ms.EXPECT().AddCompletedWorkflowEvent(tc.handler.workflowTaskCompletedID, completeWorkflowExecutionCommandAttributes, "").MaxTimes(1).Return(event, fmt.Errorf("FAIL"))
192193
tc.ms.EXPECT().GetExecutionInfo().AnyTimes().Return(&persistencespb.WorkflowExecutionInfo{})
193194
tc.ms.EXPECT().GetExecutionState().AnyTimes().Return(&persistencespb.WorkflowExecutionState{})

service/history/interfaces/mutable_state.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ type (
6767
AddChildWorkflowExecutionTimedOutEvent(int64, *commonpb.WorkflowExecution, *historypb.WorkflowExecutionTimedOutEventAttributes) (*historypb.HistoryEvent, error)
6868
AddCompletedWorkflowEvent(int64, *commandpb.CompleteWorkflowExecutionCommandAttributes, string) (*historypb.HistoryEvent, error)
6969
AddContinueAsNewEvent(context.Context, int64, int64, namespace.Name, *commandpb.ContinueAsNewWorkflowExecutionCommandAttributes, worker_versioning.IsWFTaskQueueInVersionDetector) (*historypb.HistoryEvent, MutableState, error)
70+
FlushPendingActivityEventsForCompletion() error
7071
AddWorkflowTaskCompletedEvent(*WorkflowTaskInfo, *workflowservice.RespondWorkflowTaskCompletedRequest, WorkflowTaskCompletionLimits) (*historypb.HistoryEvent, error)
7172
AddWorkflowTaskFailedEvent(workflowTask *WorkflowTaskInfo, cause enumspb.WorkflowTaskFailedCause, failure *failurepb.Failure, identity string, versioningStamp *commonpb.WorkerVersionStamp, binChecksum, baseRunID, newRunID string, forkEventVersion int64) (*historypb.HistoryEvent, error)
7273
AddWorkflowTaskScheduleToStartTimeoutEvent(workflowTask *WorkflowTaskInfo) (*historypb.HistoryEvent, error)

service/history/interfaces/mutable_state_mock.go

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

service/history/workflow/mutable_state_impl.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4048,6 +4048,71 @@ func (ms *MutableStateImpl) addStartedEventForTransientActivity(
40484048
return ms.ApplyActivityTaskStartedEvent(event)
40494049
}
40504050

4051+
// FlushPendingActivityEventsForCompletion records ActivityTaskStarted and (where applicable)
4052+
// ActivityTaskFailed for pending activities that have a retry policy, so history is complete before
4053+
// the workflow completes or continues-as-new. See issue #503.
//
// For every pending activity with HasRetryPolicy set:
//   - if RetryLastFailure is set and StartedEventId is not common.TransientEventID, it records
//     ActivityTaskStarted for the failed attempt followed by ActivityTaskFailed with
//     RETRY_STATE_IN_PROGRESS;
//   - otherwise, if StartedEventId is common.TransientEventID, it records only ActivityTaskStarted
//     via addStartedEventForTransientActivity.
//
// Pending activities without a retry policy are skipped. The first error from the mutability check
// or from recording an event is returned as-is.
4054+
func (ms *MutableStateImpl) FlushPendingActivityEventsForCompletion() error {
4055+
// NOTE(review): the same "workflow completed" tag is used when this is called on the
// continue-as-new path — confirm that is intended.
opTag := tag.WorkflowActionWorkflowCompleted
4056+
if err := ms.checkMutability(opTag); err != nil {
4057+
return err
4058+
}
4059+
4060+
pending := ms.GetPendingActivityInfos()
4061+
if len(pending) == 0 {
4062+
return nil
4063+
}
4064+
4065+
// Snapshot the scheduled event IDs before recording anything; presumably this avoids ranging
// over the pending map while the calls below modify it — TODO confirm.
// NOTE(review): Go map iteration order is unspecified, so the order in which activities are
// flushed (and therefore the order of the recorded events) is not deterministic.
scheduledEventIDs := make([]int64, 0, len(pending))
4066+
for id := range pending {
4067+
scheduledEventIDs = append(scheduledEventIDs, id)
4068+
}
4069+
4070+
for _, scheduledEventID := range scheduledEventIDs {
4071+
ai, ok := ms.GetActivityInfo(scheduledEventID)
4072+
// Only activities that still exist and carry a retry policy are flushed.
if !ok || !ai.HasRetryPolicy {
4073+
continue
4074+
}
4075+
4076+
// A previous attempt failed and the activity is not currently marked as transiently started:
// record Started for that failed attempt, then Failed.
if ai.RetryLastFailure != nil && ai.StartedEventId != common.TransientEventID {
4077+
// NOTE(review): assumes ai.Attempt already counts the upcoming attempt, so Attempt-1 is the
// attempt that failed — confirm. The value is clamped to a minimum of 1.
failedAttempt := ai.Attempt - 1
4078+
if failedAttempt < 1 {
4079+
failedAttempt = 1
4080+
}
4081+
startedEvent := ms.hBuilder.AddActivityTaskStartedEvent(
4082+
scheduledEventID,
4083+
failedAttempt,
4084+
"",
4085+
ai.RetryLastWorkerIdentity,
4086+
ai.RetryLastFailure,
4087+
nil,
4088+
0,
4089+
)
4090+
if err := ms.ApplyActivityTaskStartedEvent(startedEvent); err != nil {
4091+
return err
4092+
}
4093+
// The Failed event references the Started event that was just created above.
if _, err := ms.AddActivityTaskFailedEvent(
4094+
scheduledEventID,
4095+
startedEvent.GetEventId(),
4096+
ai.RetryLastFailure,
4097+
enumspb.RETRY_STATE_IN_PROGRESS,
4098+
ai.RetryLastWorkerIdentity,
4099+
nil,
4100+
); err != nil {
4101+
return err
4102+
}
4103+
continue
4104+
}
4105+
4106+
// The activity is marked as started with the transient placeholder ID: record its
// ActivityTaskStarted event only.
if ai.StartedEventId == common.TransientEventID {
4107+
if err := ms.addStartedEventForTransientActivity(scheduledEventID, nil); err != nil {
4108+
return err
4109+
}
4110+
}
4111+
}
4112+
4113+
return nil
4114+
}
4115+
40514116
func (ms *MutableStateImpl) AddActivityTaskStartedEvent(
40524117
ai *persistencespb.ActivityInfo,
40534118
scheduledEventID int64,

service/history/workflow/mutable_state_impl_test.go

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2959,6 +2959,114 @@ func (s *mutableStateSuite) TestRetryActivity_PausedIncrementsStamp() {
29592959
s.Equal(int32(2), updatedActivityInfo.Attempt)
29602960
}
29612961

2962+
// TestFlushPendingActivityEventsForCompletion exercises FlushPendingActivityEventsForCompletion
// against four mutable-state setups: no pending activities, a started activity whose
// StartedEventId is still the transient placeholder, an activity that failed and was passed to
// RetryActivity, and a pending activity scheduled without a retry policy.
func (s *mutableStateSuite) TestFlushPendingActivityEventsForCompletion() {
2963+
// Events-cache writes are allowed any number of times; they are not what this test asserts on.
s.mockEventsCache.EXPECT().PutEvent(gomock.Any(), gomock.Any()).AnyTimes()
2964+
2965+
tq := &taskqueuepb.TaskQueue{Name: "tq"}
2966+
retryPolicy := &commonpb.RetryPolicy{
2967+
InitialInterval: durationpb.New(time.Second),
2968+
}
2969+
2970+
// With nothing pending, the flush succeeds and the next event ID does not move.
s.Run("no_pending_activities", func() {
2971+
s.createVersionedMutableStateWithCompletedWFT(tq)
2972+
nextBefore := s.mutableState.GetNextEventID()
2973+
err := s.mutableState.FlushPendingActivityEventsForCompletion()
2974+
s.NoError(err)
2975+
s.Equal(nextBefore, s.mutableState.GetNextEventID())
2976+
})
2977+
2978+
// An activity with a retry policy is scheduled and started; its StartedEventId is asserted to be
// the transient placeholder before the flush and something else afterwards.
s.Run("pending_activity_with_transient_started_flushes_started", func() {
2979+
s.createVersionedMutableStateWithCompletedWFT(tq)
2980+
workflowTaskCompletedEventID := s.mutableState.GetNextEventID() - 1
2981+
_, activityInfo, err := s.mutableState.AddActivityTaskScheduledEvent(
2982+
workflowTaskCompletedEventID,
2983+
&commandpb.ScheduleActivityTaskCommandAttributes{
2984+
ActivityId: "flush-started",
2985+
ActivityType: &commonpb.ActivityType{Name: "activity-type"},
2986+
TaskQueue: tq,
2987+
RetryPolicy: retryPolicy,
2988+
},
2989+
false,
2990+
)
2991+
s.NoError(err)
2992+
_, err = s.mutableState.AddActivityTaskStartedEvent(
2993+
activityInfo,
2994+
activityInfo.ScheduledEventId,
2995+
uuid.NewString(),
2996+
"worker",
2997+
nil,
2998+
nil,
2999+
nil,
3000+
)
3001+
s.NoError(err)
3002+
ai, ok := s.mutableState.GetActivityInfo(activityInfo.ScheduledEventId)
3003+
s.True(ok)
3004+
s.Equal(common.TransientEventID, ai.StartedEventId)
3005+
3006+
err = s.mutableState.FlushPendingActivityEventsForCompletion()
3007+
s.NoError(err)
3008+
ai, ok = s.mutableState.GetActivityInfo(activityInfo.ScheduledEventId)
3009+
s.True(ok)
3010+
s.NotEqual(common.TransientEventID, ai.StartedEventId, "flush should have written ActivityTaskStarted so StartedEventId is no longer transient")
3011+
})
3012+
3013+
// An activity with a retry policy is scheduled, started, and then passed to RetryActivity with a
// failure. One activity is pending before the flush; afterwards none is, and it can no longer
// be looked up by its scheduled event ID.
s.Run("pending_activity_between_retries_flushes_started_and_failed", func() {
3014+
s.createVersionedMutableStateWithCompletedWFT(tq)
3015+
workflowTaskCompletedEventID := s.mutableState.GetNextEventID() - 1
3016+
_, activityInfo, err := s.mutableState.AddActivityTaskScheduledEvent(
3017+
workflowTaskCompletedEventID,
3018+
&commandpb.ScheduleActivityTaskCommandAttributes{
3019+
ActivityId: "flush-failed",
3020+
ActivityType: &commonpb.ActivityType{Name: "activity-type"},
3021+
TaskQueue: tq,
3022+
RetryPolicy: retryPolicy,
3023+
},
3024+
false,
3025+
)
3026+
s.NoError(err)
3027+
_, err = s.mutableState.AddActivityTaskStartedEvent(
3028+
activityInfo,
3029+
activityInfo.ScheduledEventId,
3030+
uuid.NewString(),
3031+
"worker",
3032+
nil,
3033+
nil,
3034+
nil,
3035+
)
3036+
s.NoError(err)
3037+
_, err = s.mutableState.RetryActivity(activityInfo, &failurepb.Failure{Message: "attempt failed"})
3038+
s.NoError(err)
3039+
s.Len(s.mutableState.GetPendingActivityInfos(), 1)
3040+
3041+
err = s.mutableState.FlushPendingActivityEventsForCompletion()
3042+
s.NoError(err)
3043+
s.Len(s.mutableState.GetPendingActivityInfos(), 0)
3044+
_, ok := s.mutableState.GetActivityInfo(activityInfo.ScheduledEventId)
3045+
s.False(ok)
3046+
})
3047+
3048+
// An activity scheduled without a RetryPolicy is left alone: the next event ID and the pending
// count are the same before and after the flush.
s.Run("pending_activity_without_retry_skipped", func() {
3049+
s.createVersionedMutableStateWithCompletedWFT(tq)
3050+
workflowTaskCompletedEventID := s.mutableState.GetNextEventID() - 1
3051+
_, _, err := s.mutableState.AddActivityTaskScheduledEvent(
3052+
workflowTaskCompletedEventID,
3053+
&commandpb.ScheduleActivityTaskCommandAttributes{
3054+
ActivityId: "no-retry",
3055+
ActivityType: &commonpb.ActivityType{Name: "activity-type"},
3056+
TaskQueue: tq,
3057+
},
3058+
false,
3059+
)
3060+
s.NoError(err)
3061+
s.Len(s.mutableState.GetPendingActivityInfos(), 1)
3062+
nextBefore := s.mutableState.GetNextEventID()
3063+
err = s.mutableState.FlushPendingActivityEventsForCompletion()
3064+
s.NoError(err)
3065+
s.Equal(nextBefore, s.mutableState.GetNextEventID())
3066+
s.Len(s.mutableState.GetPendingActivityInfos(), 1)
3067+
})
3068+
}
3069+
29623070
func (s *mutableStateSuite) TestupdateBuildIdsAndDeploymentSearchAttributes() {
29633071
versioned := func(buildId string) *commonpb.WorkerVersionStamp {
29643072
return &commonpb.WorkerVersionStamp{BuildId: buildId, UseVersioning: true}

0 commit comments

Comments
 (0)