Skip to content

Commit 4545000

Browse files
authored
Validate workflow task attempt in standby task processing (#7663)
## What changed? <!-- Describe what has changed in this PR --> - Validate workflow task attempt in standby task processing ## Why? <!-- Tell your future self why have you made these changes --> - We have the same logic for the active timer task, but not for standby. - This is only needed in the state-based replication world, where transient workflow task (attempt > 1) state will be replicated as well. Previously, in the history-based replication world, the attempt count in standby should always be 1 since a transient workflow task has no events and won't be replicated, so the attempt check is not necessary. In the state-based replication world, each replicated attempt will generate a start_to_close timeout timer, and without this check, the timer for a previous attempt will keep doing verification and eventually get dropped and trigger alerts. No impact to the workflow though. ## How did you test it? <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> - Unit test ## Potential risks <!-- Assuming the worst case, what can be broken when deploying this change to production? --> - In the worst case, after failover, a workflow task can't time out properly and the workflow will get stuck. ## Documentation <!-- Have you made sure this change doesn't falsify anything currently stated in `docs/`? If significant new behavior is added, have you described that in `docs/`? --> ## Is hotfix candidate? <!-- Is this PR a hotfix candidate or does it require a notification to be sent to the broader community? (Yes/No) --> - Could be.
1 parent d529b3a commit 4545000

File tree

2 files changed

+90
-0
lines changed

2 files changed

+90
-0
lines changed

service/history/timer_queue_standby_task_executor.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,16 @@ func (t *timerQueueStandbyTaskExecutor) executeWorkflowTaskTimeoutTask(
364364
return nil, err
365365
}
366366

367+
if workflowTask.Attempt != timerTask.ScheduleAttempt {
368+
return nil, nil
369+
}
370+
371+
// We could check if workflow task is started state (since the timeout type here is START_TO_CLOSE)
372+
// but that's unnecessary.
373+
//
374+
// If the workflow task is in scheduled state, it must have a higher attempt
375+
// count and will be captured by the attempt check above.
376+
367377
return &struct{}{}, nil
368378
}
369379

service/history/timer_queue_standby_task_executor_test.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ import (
4848
"go.temporal.io/server/common/clock"
4949
"go.temporal.io/server/common/cluster"
5050
"go.temporal.io/server/common/definition"
51+
"go.temporal.io/server/common/failure"
5152
"go.temporal.io/server/common/locks"
5253
"go.temporal.io/server/common/log"
5354
"go.temporal.io/server/common/metrics"
@@ -919,6 +920,85 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessWorkflowTaskTimeout_Succ
919920
s.Nil(resp.ExecutionErr)
920921
}
921922

923+
func (s *timerQueueStandbyTaskExecutorSuite) TestProcessWorkflowTaskTimeout_AttemptMismatch() {
924+
execution := &commonpb.WorkflowExecution{
925+
WorkflowId: "some random workflow ID",
926+
RunId: uuid.New(),
927+
}
928+
workflowType := "some random workflow type"
929+
taskQueueName := "some random task queue"
930+
931+
mutableState := workflow.TestGlobalMutableState(s.mockShard, s.mockShard.GetEventsCache(), s.logger, s.version, execution.GetWorkflowId(), execution.GetRunId())
932+
_, err := mutableState.AddWorkflowExecutionStartedEvent(
933+
execution,
934+
&historyservice.StartWorkflowExecutionRequest{
935+
Attempt: 1,
936+
NamespaceId: s.namespaceID.String(),
937+
StartRequest: &workflowservice.StartWorkflowExecutionRequest{
938+
WorkflowType: &commonpb.WorkflowType{Name: workflowType},
939+
TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueueName},
940+
WorkflowRunTimeout: durationpb.New(200 * time.Second),
941+
WorkflowTaskTimeout: durationpb.New(1 * time.Second),
942+
},
943+
},
944+
)
945+
s.Nil(err)
946+
947+
wt := addWorkflowTaskScheduledEvent(mutableState)
948+
event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New())
949+
wt.StartedEventID = event.GetEventId()
950+
951+
// We must manually update the version history here.
952+
// The logic for scheduling transient workflow task below will use the version history to determine
953+
// if there's failover and if workflow task attempt needs to be reset.
954+
vh, err := versionhistory.GetCurrentVersionHistory(mutableState.GetExecutionInfo().VersionHistories)
955+
s.NoError(err)
956+
err = versionhistory.AddOrUpdateVersionHistoryItem(vh, versionhistory.NewVersionHistoryItem(
957+
event.GetEventId(), event.GetVersion(),
958+
))
959+
s.NoError(err)
960+
961+
event, err = mutableState.AddWorkflowTaskFailedEvent(
962+
wt,
963+
enumspb.WORKFLOW_TASK_FAILED_CAUSE_NON_DETERMINISTIC_ERROR,
964+
failure.NewServerFailure("some random workflow task failure details", false),
965+
"some random workflow task failure identity",
966+
nil,
967+
"",
968+
"",
969+
"",
970+
0,
971+
)
972+
s.NoError(err)
973+
974+
wt, err = mutableState.AddWorkflowTaskScheduledEvent(false, enumsspb.WORKFLOW_TASK_TYPE_TRANSIENT)
975+
s.NoError(err)
976+
s.Equal(int32(2), wt.Attempt)
977+
978+
timerTask := &tasks.WorkflowTaskTimeoutTask{
979+
WorkflowKey: definition.NewWorkflowKey(
980+
s.namespaceID.String(),
981+
execution.GetWorkflowId(),
982+
execution.GetRunId(),
983+
),
984+
// Current task attempt is 2, so the standby verification should complete
985+
// even though the current workflow task still exists with the same scheduled ID
986+
ScheduleAttempt: 1,
987+
Version: s.version,
988+
TaskID: s.mustGenerateTaskID(),
989+
TimeoutType: enumspb.TIMEOUT_TYPE_START_TO_CLOSE,
990+
VisibilityTimestamp: s.now,
991+
EventID: wt.ScheduledEventID,
992+
}
993+
994+
persistenceMutableState := s.createPersistenceMutableState(mutableState, event.GetEventId()+1, event.GetVersion())
995+
s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
996+
997+
s.mockShard.SetCurrentTime(s.clusterName, s.now)
998+
resp := s.timerQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(timerTask))
999+
s.Nil(resp.ExecutionErr)
1000+
}
1001+
9221002
func (s *timerQueueStandbyTaskExecutorSuite) TestProcessWorkflowBackoffTimer_Pending() {
9231003
execution := &commonpb.WorkflowExecution{
9241004
WorkflowId: "some random workflow ID",

0 commit comments

Comments
 (0)