Skip to content

Commit 69daa0c

Browse files
rkannan82claude
andcommitted
Proactively cancel activities on workflow termination and timeout
When a workflow is terminated or times out, dispatch cancel commands to workers for in-flight activities that registered a control queue. Extends the mechanism from #9233 to cover forceful workflow close scenarios (including reset, which terminates the old run). Guarded by EnableCancelActivityWorkerCommand. Disabled by default. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 95481a7 commit 69daa0c

8 files changed

Lines changed: 584 additions & 2 deletions

File tree

service/history/interfaces/mutable_state.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ type (
4848
AddActivityTaskCancelRequestedEvent(int64, int64, string) (*historypb.HistoryEvent, *persistencespb.ActivityInfo, error)
4949
AddActivityTaskCanceledEvent(int64, int64, int64, *commonpb.Payloads, string) (*historypb.HistoryEvent, error)
5050
AddWorkerCommandsTasks(commands []*workerpb.WorkerCommand, controlQueue string) error
51+
GenerateActivityCancelCommandsForClose() error
5152
AddActivityTaskCompletedEvent(int64, int64, *workflowservice.RespondActivityTaskCompletedRequest) (*historypb.HistoryEvent, error)
5253
AddActivityTaskFailedEvent(int64, int64, *failurepb.Failure, enumspb.RetryState, string, *commonpb.WorkerVersionStamp) (*historypb.HistoryEvent, error)
5354
AddActivityTaskScheduledEvent(int64, *commandpb.ScheduleActivityTaskCommandAttributes, bool) (*historypb.HistoryEvent, *persistencespb.ActivityInfo, error)

service/history/interfaces/mutable_state_mock.go

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

service/history/ndc/events_reapplier_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -459,6 +459,7 @@ func (s *nDCEventReapplicationSuite) TestReapplyEvents_AppliedEvent_Termination(
459459
false,
460460
nil,
461461
).Return(nil, nil)
462+
msCurrent.EXPECT().GenerateActivityCancelCommandsForClose().Return(nil)
462463
events := []*historypb.HistoryEvent{
463464
{EventType: enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED},
464465
event,

service/history/ndc/workflow_resetter_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -561,6 +561,7 @@ func (s *workflowResetterSuite) TestTerminateWorkflow() {
561561
false,
562562
nil,
563563
).Return(&historypb.HistoryEvent{}, nil)
564+
mutableState.EXPECT().GenerateActivityCancelCommandsForClose().Return(nil)
564565

565566
err := s.workflowResetter.terminateWorkflow(mutableState, terminateReason)
566567
s.NoError(err)
@@ -1255,6 +1256,7 @@ func (s *workflowResetterSuite) TestReapplyEvents() {
12551256
false,
12561257
event.Links,
12571258
).Return(&historypb.HistoryEvent{}, nil)
1259+
ms.EXPECT().GenerateActivityCancelCommandsForClose().Return(nil)
12581260
}
12591261
}
12601262
}

service/history/workflow/mutable_state_impl.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ import (
6161
"go.temporal.io/server/common/searchattribute/sadefs"
6262
serviceerrors "go.temporal.io/server/common/serviceerror"
6363
"go.temporal.io/server/common/softassert"
64+
"go.temporal.io/server/common/tasktoken"
6465
"go.temporal.io/server/common/util"
6566
"go.temporal.io/server/common/worker_versioning"
6667
"go.temporal.io/server/components/callbacks"
@@ -4556,6 +4557,73 @@ func (ms *MutableStateImpl) AddWorkerCommandsTasks(commands []*workerpb.WorkerCo
45564557
return ms.taskGenerator.GenerateWorkerCommandsTasks(commands, controlQueue)
45574558
}
45584559

4560+
// GenerateActivityCancelCommandsForClose generates WorkerCommandsTasks to cancel all
4561+
// in-flight activities that have a worker control queue. Called when the workflow is being
4562+
// terminated (or otherwise forcefully closed) to proactively notify workers.
4563+
func (ms *MutableStateImpl) GenerateActivityCancelCommandsForClose() error {
4564+
if !ms.config.EnableCancelActivityWorkerCommand() {
4565+
return nil
4566+
}
4567+
4568+
serializer := tasktoken.NewSerializer()
4569+
wfKey := ms.GetWorkflowKey()
4570+
nsID := ms.GetNamespaceEntry().ID().String()
4571+
4572+
commandsByQueue := make(map[string][]*workerpb.WorkerCommand)
4573+
for _, ai := range ms.pendingActivityInfoIDs {
4574+
// No control queue means the activity was started before this feature was deployed.
4575+
if ai.WorkerControlTaskQueue == "" {
4576+
continue
4577+
}
4578+
if ai.StartedClock == nil {
4579+
// StartedClock may be nil for activities started before this feature was deployed.
4580+
// Skip cancel command; the activity will time out normally.
4581+
ms.logger.Debug("Skipping worker cancel command: activity missing StartedClock (pre-deploy)",
4582+
tag.WorkflowNamespaceID(wfKey.NamespaceID),
4583+
tag.WorkflowID(wfKey.WorkflowID),
4584+
tag.WorkflowRunID(wfKey.RunID),
4585+
tag.WorkflowScheduledEventID(ai.ScheduledEventId),
4586+
)
4587+
continue
4588+
}
4589+
4590+
taskToken, err := serializer.Serialize(tasktoken.NewActivityTaskToken(
4591+
nsID,
4592+
wfKey.WorkflowID,
4593+
wfKey.RunID,
4594+
ai.ScheduledEventId,
4595+
ai.ActivityId,
4596+
ai.ActivityType.GetName(),
4597+
ai.Attempt,
4598+
ai.StartedClock,
4599+
ai.Version,
4600+
ai.StartVersion,
4601+
nil,
4602+
))
4603+
if err != nil {
4604+
return err
4605+
}
4606+
4607+
commandsByQueue[ai.WorkerControlTaskQueue] = append(
4608+
commandsByQueue[ai.WorkerControlTaskQueue],
4609+
&workerpb.WorkerCommand{
4610+
Type: &workerpb.WorkerCommand_CancelActivity{
4611+
CancelActivity: &workerpb.CancelActivityCommand{
4612+
TaskToken: taskToken,
4613+
},
4614+
},
4615+
},
4616+
)
4617+
}
4618+
4619+
for controlQueue, commands := range commandsByQueue {
4620+
if err := ms.AddWorkerCommandsTasks(commands, controlQueue); err != nil {
4621+
return err
4622+
}
4623+
}
4624+
return nil
4625+
}
4626+
45594627
func (ms *MutableStateImpl) ApplyActivityTaskCancelRequestedEvent(
45604628
event *historypb.HistoryEvent,
45614629
) error {

service/history/workflow/mutable_state_impl_test.go

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7285,3 +7285,161 @@ func (s *mutableStateSuite) TestApplyWorkflowExecutionOptionsUpdatedEvent_TimeSk
72857285
})
72867286
}
72877287
}
7288+
7289+
func TestGenerateActivityCancelCommandsForClose(t *testing.T) {
7290+
t.Parallel()
7291+
7292+
startedClock := &clockspb.VectorClock{ShardId: 1, Clock: 100}
7293+
7294+
testCases := []struct {
7295+
name string
7296+
featureEnabled bool
7297+
activities map[int64]*persistencespb.ActivityInfo
7298+
expectedQueues map[string]int // controlQueue -> expected command count
7299+
expectedNoTasks bool
7300+
}{
7301+
{
7302+
name: "activities with control queue and started clock",
7303+
featureEnabled: true,
7304+
activities: map[int64]*persistencespb.ActivityInfo{
7305+
1: {
7306+
ScheduledEventId: 1,
7307+
ActivityId: "act-1",
7308+
ActivityType: &commonpb.ActivityType{Name: "type1"},
7309+
WorkerControlTaskQueue: "control-queue-1",
7310+
StartedClock: startedClock,
7311+
Attempt: 1,
7312+
},
7313+
},
7314+
expectedQueues: map[string]int{"control-queue-1": 1},
7315+
},
7316+
{
7317+
name: "skips activities without control queue",
7318+
featureEnabled: true,
7319+
activities: map[int64]*persistencespb.ActivityInfo{
7320+
1: {
7321+
ScheduledEventId: 1,
7322+
ActivityId: "act-1",
7323+
ActivityType: &commonpb.ActivityType{Name: "type1"},
7324+
StartedClock: startedClock,
7325+
Attempt: 1,
7326+
},
7327+
},
7328+
expectedNoTasks: true,
7329+
},
7330+
{
7331+
name: "skips activities without started clock",
7332+
featureEnabled: true,
7333+
activities: map[int64]*persistencespb.ActivityInfo{
7334+
1: {
7335+
ScheduledEventId: 1,
7336+
ActivityId: "act-1",
7337+
ActivityType: &commonpb.ActivityType{Name: "type1"},
7338+
WorkerControlTaskQueue: "control-queue-1",
7339+
Attempt: 1,
7340+
},
7341+
},
7342+
expectedNoTasks: true,
7343+
},
7344+
{
7345+
name: "multiple activities batched by control queue",
7346+
featureEnabled: true,
7347+
activities: map[int64]*persistencespb.ActivityInfo{
7348+
1: {
7349+
ScheduledEventId: 1,
7350+
ActivityId: "act-1",
7351+
ActivityType: &commonpb.ActivityType{Name: "type1"},
7352+
WorkerControlTaskQueue: "queue-A",
7353+
StartedClock: startedClock,
7354+
Attempt: 1,
7355+
},
7356+
2: {
7357+
ScheduledEventId: 2,
7358+
ActivityId: "act-2",
7359+
ActivityType: &commonpb.ActivityType{Name: "type2"},
7360+
WorkerControlTaskQueue: "queue-A",
7361+
StartedClock: startedClock,
7362+
Attempt: 1,
7363+
},
7364+
3: {
7365+
ScheduledEventId: 3,
7366+
ActivityId: "act-3",
7367+
ActivityType: &commonpb.ActivityType{Name: "type3"},
7368+
WorkerControlTaskQueue: "queue-B",
7369+
StartedClock: startedClock,
7370+
Attempt: 1,
7371+
},
7372+
},
7373+
expectedQueues: map[string]int{"queue-A": 2, "queue-B": 1},
7374+
},
7375+
{
7376+
name: "feature flag disabled - no tasks generated",
7377+
featureEnabled: false,
7378+
activities: map[int64]*persistencespb.ActivityInfo{
7379+
1: {
7380+
ScheduledEventId: 1,
7381+
ActivityId: "act-1",
7382+
ActivityType: &commonpb.ActivityType{Name: "type1"},
7383+
WorkerControlTaskQueue: "control-queue-1",
7384+
StartedClock: startedClock,
7385+
Attempt: 1,
7386+
},
7387+
},
7388+
expectedNoTasks: true,
7389+
},
7390+
}
7391+
7392+
for _, tc := range testCases {
7393+
t.Run(tc.name, func(t *testing.T) {
7394+
ctrl := gomock.NewController(t)
7395+
mockEventsCache := events.NewMockCache(ctrl)
7396+
mockConfig := tests.NewDynamicConfig()
7397+
mockConfig.EnableCancelActivityWorkerCommand = dynamicconfig.GetBoolPropertyFn(tc.featureEnabled)
7398+
7399+
mockShard := shard.NewTestContext(
7400+
ctrl,
7401+
&persistencespb.ShardInfo{ShardId: 0, RangeId: 1},
7402+
mockConfig,
7403+
)
7404+
defer mockShard.StopForTest()
7405+
reg := hsm.NewRegistry()
7406+
require.NoError(t, RegisterStateMachine(reg))
7407+
require.NoError(t, callbacks.RegisterStateMachine(reg))
7408+
require.NoError(t, nexusoperations.RegisterStateMachines(reg))
7409+
mockShard.SetStateMachineRegistry(reg)
7410+
mockShard.SetEventsCacheForTesting(mockEventsCache)
7411+
7412+
namespaceEntry := tests.GlobalNamespaceEntry
7413+
mockShard.Resource.NamespaceCache.EXPECT().GetNamespaceByID(tests.NamespaceID).Return(namespaceEntry, nil).AnyTimes()
7414+
mockShard.Resource.ClusterMetadata.EXPECT().ClusterNameForFailoverVersion(gomock.Any(), gomock.Any()).Return(cluster.TestCurrentClusterName).AnyTimes()
7415+
mockShard.Resource.ClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes()
7416+
mockShard.Resource.ClusterMetadata.EXPECT().GetClusterID().Return(int64(1)).AnyTimes()
7417+
7418+
ms := NewMutableState(mockShard, mockEventsCache, log.NewTestLogger(), namespaceEntry, tests.WorkflowID, tests.RunID, time.Now().UTC())
7419+
ms.pendingActivityInfoIDs = tc.activities
7420+
7421+
err := ms.GenerateActivityCancelCommandsForClose()
7422+
require.NoError(t, err)
7423+
7424+
if tc.expectedNoTasks {
7425+
require.Empty(t, ms.InsertTasks[tasks.CategoryOutbound])
7426+
return
7427+
}
7428+
7429+
// Verify tasks were generated by checking outbound task messages
7430+
var workerCommandTasks []*tasks.WorkerCommandsTask
7431+
for _, task := range ms.InsertTasks[tasks.CategoryOutbound] {
7432+
if wct, ok := task.(*tasks.WorkerCommandsTask); ok {
7433+
workerCommandTasks = append(workerCommandTasks, wct)
7434+
}
7435+
}
7436+
7437+
// Verify each expected queue got the right number of commands
7438+
tasksByQueue := make(map[string]int)
7439+
for _, wct := range workerCommandTasks {
7440+
tasksByQueue[wct.Destination] = len(wct.Commands)
7441+
}
7442+
require.Equal(t, tc.expectedQueues, tasksByQueue)
7443+
})
7444+
}
7445+
}

service/history/workflow/util.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,12 @@ func TimeoutWorkflow(
9595
retryState,
9696
continuedRunID,
9797
)
98-
return err
98+
if err != nil {
99+
return err
100+
}
101+
102+
// Proactively cancel in-flight activities so they don't run uselessly after the workflow is closed.
103+
return mutableState.GenerateActivityCancelCommandsForClose()
99104
}
100105

101106
// TerminateWorkflow will write a WorkflowExecutionTerminated event with a fresh
@@ -142,8 +147,12 @@ func TerminateWorkflow(
142147
deleteAfterTerminate,
143148
links,
144149
)
150+
if err != nil {
151+
return err
152+
}
145153

146-
return err
154+
// Proactively cancel in-flight activities so they don't run uselessly after the workflow is closed.
155+
return mutableState.GenerateActivityCancelCommandsForClose()
147156
}
148157

149158
// FindAutoResetPoint returns the auto reset point

0 commit comments

Comments
 (0)