Skip to content

Commit bb30c0d

Browse files
authored
Check if activity is paused or stamp mismatch in processActivityTask() (temporalio#8068)
## What changed? Now checking if the activity is paused or if the stamp is outdated before dispatching a matching task. ## Why? Prevents unnecessary creation and retrying of tasks in matching queues. ## How did you test it? - [ ] built - [ ] run locally and tested manually - [ ] covered by existing tests - [x] added new unit test(s) - [ ] added new functional test(s) ## Potential risks N/A
1 parent 6bf61a0 commit bb30c0d

File tree

4 files changed

+129
-1
lines changed

4 files changed

+129
-1
lines changed

service/history/transfer_queue_active_task_executor.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,11 @@ func (t *transferQueueActiveTaskExecutor) processActivityTask(
229229
return consts.ErrActivityTaskNotFound
230230
}
231231

232+
if ai.Stamp != task.Stamp || ai.Paused {
233+
release(nil) // release(nil) so that the mutable state is not unloaded from cache
234+
return consts.ErrStaleReference // drop the task
235+
}
236+
232237
err = CheckTaskVersion(t.shardContext, t.logger, mutableState.GetNamespaceEntry(), ai.Version, task.Version, task)
233238
if err != nil {
234239
return err

service/history/transfer_queue_active_task_executor_test.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -430,6 +430,64 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessActivityTask_Duplicati
430430
s.ErrorIs(resp.ExecutionErr, consts.ErrActivityTaskNotFound)
431431
}
432432

433+
func (s *transferQueueActiveTaskExecutorSuite) TestProcessActivityTask_Paused() {
434+
execution := &commonpb.WorkflowExecution{
435+
WorkflowId: "some random workflow ID",
436+
RunId: uuid.New(),
437+
}
438+
workflowType := "some random workflow type"
439+
taskQueueName := "some random task queue"
440+
441+
mutableState := workflow.TestGlobalMutableState(s.mockShard, s.mockShard.GetEventsCache(), s.logger, s.version, execution.GetWorkflowId(), execution.GetRunId())
442+
_, err := mutableState.AddWorkflowExecutionStartedEvent(
443+
execution,
444+
&historyservice.StartWorkflowExecutionRequest{
445+
Attempt: 1,
446+
NamespaceId: s.namespaceID.String(),
447+
StartRequest: &workflowservice.StartWorkflowExecutionRequest{
448+
WorkflowType: &commonpb.WorkflowType{Name: workflowType},
449+
TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueueName},
450+
WorkflowExecutionTimeout: durationpb.New(2 * time.Second),
451+
WorkflowTaskTimeout: durationpb.New(1 * time.Second),
452+
},
453+
},
454+
)
455+
s.Nil(err)
456+
457+
wt := addWorkflowTaskScheduledEvent(mutableState)
458+
event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New())
459+
wt.StartedEventID = event.GetEventId()
460+
event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
461+
462+
taskID := s.mustGenerateTaskID()
463+
activityID := "activity-1"
464+
activityType := "some random activity type"
465+
event, ai := addActivityTaskScheduledEvent(mutableState, event.GetEventId(), activityID, activityType, taskQueueName, &commonpb.Payloads{}, 1*time.Second, 1*time.Second, 1*time.Second, 1*time.Second)
466+
467+
// Set the activity as paused
468+
ai.Paused = true
469+
470+
transferTask := &tasks.ActivityTask{
471+
WorkflowKey: definition.NewWorkflowKey(
472+
s.namespaceID.String(),
473+
execution.GetWorkflowId(),
474+
execution.GetRunId(),
475+
),
476+
Version: s.version,
477+
TaskID: taskID,
478+
TaskQueue: taskQueueName,
479+
ScheduledEventID: event.GetEventId(),
480+
VisibilityTimestamp: time.Now().UTC(),
481+
Stamp: ai.Stamp, // Ensure stamp matches
482+
}
483+
484+
persistenceMutableState := s.createPersistenceMutableState(mutableState, event.GetEventId(), event.GetVersion())
485+
s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
486+
487+
resp := s.transferQueueActiveTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask))
488+
s.ErrorIs(resp.ExecutionErr, consts.ErrStaleReference)
489+
}
490+
433491
func (s *transferQueueActiveTaskExecutorSuite) TestProcessWorkflowTask_FirstWorkflowTask() {
434492
execution := &commonpb.WorkflowExecution{
435493
WorkflowId: "some random workflow ID",

service/history/transfer_queue_standby_task_executor.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,10 @@ func (t *transferQueueStandbyTaskExecutor) processActivityTask(
163163
return nil, nil
164164
}
165165

166+
if activityInfo.Stamp != transferTask.Stamp || activityInfo.Paused {
167+
return nil, nil // drop the task
168+
}
169+
166170
err := CheckTaskVersion(t.shardContext, t.logger, mutableState.GetNamespaceEntry(), activityInfo.Version, transferTask.Version, transferTask)
167171
if err != nil {
168172
return nil, err

service/history/transfer_queue_standby_task_executor_test.go

Lines changed: 62 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -429,6 +429,68 @@ func (s *transferQueueStandbyTaskExecutorSuite) TestProcessActivityTask_Success(
429429
s.Nil(resp.ExecutionErr)
430430
}
431431

432+
func (s *transferQueueStandbyTaskExecutorSuite) TestProcessActivityTask_Paused() {
433+
execution := &commonpb.WorkflowExecution{
434+
WorkflowId: "some random workflow ID",
435+
RunId: uuid.New(),
436+
}
437+
workflowType := "some random workflow type"
438+
taskQueueName := "some random task queue"
439+
440+
mutableState := workflow.TestGlobalMutableState(s.mockShard, s.mockShard.GetEventsCache(), s.logger, s.version, execution.GetWorkflowId(), execution.GetRunId())
441+
_, err := mutableState.AddWorkflowExecutionStartedEvent(
442+
execution,
443+
&historyservice.StartWorkflowExecutionRequest{
444+
Attempt: 1,
445+
NamespaceId: s.namespaceID.String(),
446+
StartRequest: &workflowservice.StartWorkflowExecutionRequest{
447+
WorkflowType: &commonpb.WorkflowType{Name: workflowType},
448+
TaskQueue: &taskqueuepb.TaskQueue{
449+
Name: taskQueueName,
450+
Kind: enumspb.TASK_QUEUE_KIND_NORMAL,
451+
},
452+
WorkflowExecutionTimeout: durationpb.New(2 * time.Second),
453+
WorkflowTaskTimeout: durationpb.New(1 * time.Second),
454+
},
455+
},
456+
)
457+
s.Nil(err)
458+
459+
wt := addWorkflowTaskScheduledEvent(mutableState)
460+
event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New())
461+
wt.StartedEventID = event.GetEventId()
462+
event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
463+
464+
taskID := s.mustGenerateTaskID()
465+
activityID := "activity-1"
466+
activityType := "some random activity type"
467+
event, ai := addActivityTaskScheduledEvent(mutableState, event.GetEventId(), activityID, activityType, taskQueueName, &commonpb.Payloads{}, 1*time.Second, 1*time.Second, 1*time.Second, 1*time.Second)
468+
469+
// Set the activity as paused without starting it
470+
ai.Paused = true
471+
472+
now := time.Now().UTC()
473+
transferTask := &tasks.ActivityTask{
474+
WorkflowKey: definition.NewWorkflowKey(
475+
s.namespaceID.String(),
476+
execution.GetWorkflowId(),
477+
execution.GetRunId(),
478+
),
479+
Version: s.version,
480+
VisibilityTimestamp: now,
481+
TaskID: taskID,
482+
TaskQueue: taskQueueName,
483+
ScheduledEventID: event.GetEventId(),
484+
Stamp: ai.Stamp, // Ensure stamp matches
485+
}
486+
487+
persistenceMutableState := s.createPersistenceMutableState(mutableState, event.GetEventId(), event.GetVersion())
488+
s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
489+
490+
resp := s.transferQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask))
491+
s.NoError(resp.ExecutionErr)
492+
}
493+
432494
func (s *transferQueueStandbyTaskExecutorSuite) TestProcessWorkflowTask_Pending() {
433495
execution := &commonpb.WorkflowExecution{
434496
WorkflowId: "some random workflow ID",
@@ -1062,7 +1124,6 @@ func (s *transferQueueStandbyTaskExecutorSuite) TestProcessStartChildExecution_P
10621124
// clear the cache
10631125
s.transferQueueStandbyTaskExecutor.cache = wcache.NewHostLevelCache(s.mockShard.GetConfig(), s.mockShard.GetLogger(), metrics.NoopMetricsHandler)
10641126
persistenceMutableState = s.createPersistenceMutableState(mutableState, event.GetEventId(), event.GetVersion())
1065-
s.mockShard.SetCurrentTime(s.clusterName, now.Add(s.fetchHistoryDuration))
10661127
s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
10671128

10681129
s.mockHistoryClient.EXPECT().VerifyFirstWorkflowTaskScheduled(gomock.Any(), gomock.Any()).Return(nil, nil)

0 commit comments

Comments
 (0)