Skip to content

Commit 7b15d6c

Browse files
authored
Prevent workflow task and activity task generation when workflow is paused (temporalio#8687)
## What changed? - Prevents workflow task generation when paused (ex: signals, activity completions, child workflow completions etc) - Prevents activity task generation when a workflow is paused and an inflight workflow task comes back with start activity commands. - Prevents eager activity starts when the workflow is paused. ## Why? When a workflow is paused, we should not generate any more new activity and workflow tasks. ## How did you test it? - [x] built - [ ] run locally and tested manually - [x] covered by existing tests - [ ] added new unit test(s) - [x] added new functional test(s) <!-- CURSOR_SUMMARY --> --- > [!NOTE] > Prevents workflow/activity task creation and certain API operations when a workflow is paused, adds safeguards and tests across history, NDC, and task refresh paths. > > - **Behavior/State**: > - Add `MutableState.IsWorkflowExecutionStatusPaused()` and honor paused status across scheduling paths. > - `workflow/util.ScheduleWorkflowTask` returns early if paused. > - **API Handlers**: > - `RespondWorkflowTaskCompleted`: reject `ForceCreateNewWorkflowTask` when paused; block eager activity and bypass activity task gen when paused. > - `SignalWithStartWorkflow`: do not schedule a new workflow task if paused. > - `UpdateWorkflow`: reject updates while paused. > - Activity failure/respond paths: avoid new WFT when paused. > - **NDC/Replication**: > - Reapply events and buffered-event flusher: skip scheduling WFT if paused. > - `ndc.Workflow.FlushBufferedEvents`: don't schedule WFT when paused. > - **Task Refresher**: > - Skip refreshing WFT/activity tasks when paused; allow timers/search-attr updates; skip close tasks for running/paused. > - **Interfaces/Mocks**: > - Add `IsWorkflowExecutionStatusPaused` to `MutableState` interface and update mocks. > - **Tests**: > - New unit tests for paused behavior in handlers, NDC, task refresher, and transfer queue; functional tests for pause/unpause workflow behavior. > > <sup>Written by [Cursor Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit 17c11c6. This will update automatically on new commits. Configure [here](https://cursor.com/dashboard?tab=bugbot).</sup> <!-- /CURSOR_SUMMARY -->
1 parent 9c33cf9 commit 7b15d6c

22 files changed

+721
-59
lines changed

service/history/api/recordchildworkflowcompleted/api_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ func Test_Recordchildworkflowcompleted_WithForwards(t *testing.T) {
7272

7373
newParentMutableState := historyi.NewMockMutableState(ctrl)
7474
newParentMutableState.EXPECT().IsWorkflowExecutionRunning().Return(true)
75+
newParentMutableState.EXPECT().IsWorkflowExecutionStatusPaused().Return(false)
7576
newParentMutableState.EXPECT().GetNextEventID().Return(int64(10))
7677
newParentMutableState.EXPECT().AddChildWorkflowExecutionCompletedEvent(anyArg, anyArg, anyArg).Return(nil, nil)
7778
childExecutionInfo := &persistencespb.ChildExecutionInfo{

service/history/api/respondactivitytaskfailed/api_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,7 @@ func (s *workflowSuite) Test_NoMoreRetriesAndMutableStateHasNoPendingTasks_WillR
305305
request.FailedRequest.GetIdentity(),
306306
request.FailedRequest.WorkerVersion,
307307
).Return(nil, nil)
308+
s.currentMutableState.EXPECT().IsWorkflowExecutionStatusPaused().Return(false)
308309
s.currentMutableState.EXPECT().AddWorkflowTaskScheduledEvent(false, enumsspb.WORKFLOW_TASK_TYPE_NORMAL)
309310
s.currentMutableState.EXPECT().GetEffectiveVersioningBehavior().Return(enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED)
310311
s.workflowContext.EXPECT().UpdateWorkflowExecutionAsActive(ctx, s.shardContext).Return(nil)

service/history/api/respondworkflowtaskcompleted/api.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,13 @@ func (handler *WorkflowTaskCompletedHandler) Invoke(
195195
return nil, serviceerror.NewNotFound("Workflow task not found.")
196196
}
197197

198+
// We don't accept the request to create a new workflow task if the workflow is paused.
199+
if ms.IsWorkflowExecutionStatusPaused() && request.GetForceCreateNewWorkflowTask() {
200+
// Mutable state wasn't changed yet and doesn't have to be cleared.
201+
releaseLeaseWithError = false
202+
return nil, serviceerror.NewFailedPrecondition("Workflow is paused and force create new workflow task is not allowed.")
203+
}
204+
198205
behavior := request.GetVersioningBehavior()
199206
deployment := worker_versioning.DeploymentFromDeploymentVersion(worker_versioning.DeploymentVersionFromOptions(request.GetDeploymentOptions()))
200207
//nolint:staticcheck // SA1019 deprecated Deployment will clean up later

service/history/api/respondworkflowtaskcompleted/api_test.go

Lines changed: 119 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -435,13 +435,39 @@ func (s *WorkflowTaskCompletedHandlerSuite) TestUpdateWorkflow() {
435435
s.Equal("rejection-of-"+tv.UpdateID(), updStatus.Outcome.GetFailure().GetMessage())
436436

437437
s.EqualHistoryEvents(`
438-
13 WorkflowTaskScheduled // WFT events were created even if it was a rejection (because number of events > 10).
438+
13 WorkflowTaskScheduled // WFT events were created even if it was a rejection (because number of events > 10).
439439
14 WorkflowTaskStarted
440440
15 WorkflowTaskCompleted
441441
`, <-writtenHistoryCh)
442442
})
443443
}
444444

445+
func (s *WorkflowTaskCompletedHandlerSuite) TestForceCreateNewWorkflowTaskOnPausedWorkflow() {
446+
s.Run("Returns error when workflow is paused and ForceCreateNewWorkflowTask is true", func() {
447+
tv := testvars.New(s.T())
448+
tv = tv.WithRunID(tv.Any().RunID())
449+
s.mockNamespaceCache.EXPECT().GetNamespaceByID(tv.NamespaceID()).Return(tv.Namespace(), nil).AnyTimes()
450+
wfContext, serializedTaskToken := s.createPausedWorkflowWithWFT(tv)
451+
452+
ms, err := wfContext.LoadMutableState(context.Background(), s.workflowTaskCompletedHandler.shardContext)
453+
s.NoError(err)
454+
s.True(ms.IsWorkflowExecutionStatusPaused())
455+
456+
_, err = s.workflowTaskCompletedHandler.Invoke(context.Background(), &historyservice.RespondWorkflowTaskCompletedRequest{
457+
NamespaceId: tv.NamespaceID().String(),
458+
CompleteRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{
459+
TaskToken: serializedTaskToken,
460+
Identity: tv.Any().String(),
461+
ForceCreateNewWorkflowTask: true,
462+
},
463+
})
464+
s.Error(err)
465+
var failedPrecondition *serviceerror.FailedPrecondition
466+
s.ErrorAs(err, &failedPrecondition)
467+
s.Contains(err.Error(), "Workflow is paused and force create new workflow task is not allowed")
468+
})
469+
}
470+
445471
func (s *WorkflowTaskCompletedHandlerSuite) TestHandleBufferedQueries() {
446472
constructQueryResults := func(ids []string, resultSize int) map[string]*querypb.WorkflowQueryResult {
447473
results := make(map[string]*querypb.WorkflowQueryResult)
@@ -617,3 +643,95 @@ func (s *WorkflowTaskCompletedHandlerSuite) createSentUpdate(tv *testvars.TestVa
617643

618644
return updRequestMsg, upd, serializedTaskToken
619645
}
646+
647+
func (s *WorkflowTaskCompletedHandlerSuite) createPausedWorkflowWithWFT(tv *testvars.TestVars) (historyi.WorkflowContext, []byte) {
648+
ms := workflow.TestLocalMutableState(s.workflowTaskCompletedHandler.shardContext, s.mockEventsCache, tv.Namespace(),
649+
tv.WorkflowID(), tv.RunID(), log.NewTestLogger())
650+
651+
startRequest := &workflowservice.StartWorkflowExecutionRequest{
652+
WorkflowId: tv.WorkflowID(),
653+
WorkflowType: tv.WorkflowType(),
654+
TaskQueue: tv.TaskQueue(),
655+
Input: tv.Any().Payloads(),
656+
WorkflowExecutionTimeout: tv.Any().InfiniteTimeout(),
657+
WorkflowRunTimeout: tv.Any().InfiniteTimeout(),
658+
WorkflowTaskTimeout: tv.Any().InfiniteTimeout(),
659+
Identity: tv.ClientIdentity(),
660+
}
661+
662+
_, _ = ms.AddWorkflowExecutionStartedEvent(
663+
tv.WorkflowExecution(),
664+
&historyservice.StartWorkflowExecutionRequest{
665+
Attempt: 1,
666+
NamespaceId: tv.NamespaceID().String(),
667+
StartRequest: startRequest,
668+
ParentExecutionInfo: nil,
669+
},
670+
)
671+
672+
// Complete the first workflow task to transition state to Running.
673+
wt, _ := ms.AddWorkflowTaskScheduledEvent(false, enumsspb.WORKFLOW_TASK_TYPE_NORMAL)
674+
_, _, _ = ms.AddWorkflowTaskStartedEvent(
675+
wt.ScheduledEventID,
676+
tv.RunID(),
677+
tv.TaskQueue(),
678+
tv.Any().String(),
679+
nil,
680+
nil,
681+
nil,
682+
false,
683+
)
684+
_, _ = ms.AddWorkflowTaskCompletedEvent(wt, &workflowservice.RespondWorkflowTaskCompletedRequest{
685+
Identity: tv.Any().String(),
686+
}, historyi.WorkflowTaskCompletionLimits{MaxResetPoints: 10, MaxSearchAttributeValueSize: 2048})
687+
688+
// Add a speculative WFT before pausing.
689+
wt2, _ := ms.AddWorkflowTaskScheduledEvent(false, enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE)
690+
_, _, _ = ms.AddWorkflowTaskStartedEvent(
691+
wt2.ScheduledEventID,
692+
tv.RunID(),
693+
tv.StickyTaskQueue(),
694+
tv.Any().String(),
695+
nil,
696+
nil,
697+
nil,
698+
false,
699+
)
700+
taskToken := &tokenspb.Task{
701+
Attempt: 1,
702+
NamespaceId: tv.NamespaceID().String(),
703+
WorkflowId: tv.WorkflowID(),
704+
RunId: tv.RunID(),
705+
ScheduledEventId: wt2.ScheduledEventID,
706+
}
707+
serializedTaskToken, err := taskToken.Marshal()
708+
s.NoError(err)
709+
710+
// Pause the workflow
711+
_, err = ms.AddWorkflowExecutionPausedEvent("test-identity", "test-reason", tv.Any().String())
712+
s.NoError(err)
713+
s.True(ms.IsWorkflowExecutionStatusPaused())
714+
ms.FlushBufferedEvents()
715+
716+
s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).DoAndReturn(
717+
func(ctx context.Context, request *persistence.GetWorkflowExecutionRequest) (*persistence.GetWorkflowExecutionResponse, error) {
718+
return &persistence.GetWorkflowExecutionResponse{State: workflow.TestCloneToProto(ms)}, nil
719+
}).AnyTimes()
720+
721+
wfContext, release, err := s.workflowCache.GetOrCreateWorkflowExecution(
722+
metrics.AddMetricsContext(context.Background()),
723+
s.mockShard,
724+
tv.NamespaceID(),
725+
tv.WorkflowExecution(),
726+
locks.PriorityHigh,
727+
)
728+
s.NoError(err)
729+
s.NotNil(wfContext)
730+
731+
loadedMS, err := wfContext.LoadMutableState(context.Background(), s.mockShard)
732+
s.NoError(err)
733+
s.NotNil(loadedMS)
734+
release(nil)
735+
736+
return wfContext, serializedTaskToken
737+
}

service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -497,10 +497,17 @@ func (handler *workflowTaskCompletedHandler) handleCommandScheduleActivity(
497497
eagerStartActivity := attr.RequestEagerExecution && handler.config.EnableActivityEagerExecution(namespace) &&
498498
(!versioningUsed || attr.UseWorkflowBuildId)
499499

500+
// if the workflow is paused, we bypass activity task generation and also prevent eager activity execution.
501+
bypassActivityTaskGeneration := eagerStartActivity
502+
if handler.mutableState.GetExecutionState().Status == enumspb.WORKFLOW_EXECUTION_STATUS_PAUSED {
503+
bypassActivityTaskGeneration = true
504+
eagerStartActivity = false
505+
}
506+
500507
event, _, err := handler.mutableState.AddActivityTaskScheduledEvent(
501508
handler.workflowTaskCompletedID,
502509
attr,
503-
eagerStartActivity,
510+
bypassActivityTaskGeneration,
504511
)
505512
if err != nil {
506513
return nil, nil, handler.failWorkflowTaskOnInvalidArgument(enumspb.WORKFLOW_TASK_FAILED_CAUSE_SCHEDULE_ACTIVITY_DUPLICATE_ID, err)

service/history/api/signalwithstartworkflow/signal_with_start_workflow.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,7 @@ func signalWorkflow(
311311
}
312312

313313
// Create a transfer task to schedule a workflow task
314-
if !mutableState.HasPendingWorkflowTask() {
314+
if !mutableState.HasPendingWorkflowTask() && !mutableState.IsWorkflowExecutionStatusPaused() {
315315

316316
executionInfo := mutableState.GetExecutionInfo()
317317
executionState := mutableState.GetExecutionState()

service/history/api/signalwithstartworkflow/signal_with_start_workflow_test.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ func (s *signalWithStartWorkflowSuite) TestSignalWorkflow_NewWorkflowTask() {
143143
request.GetLinks(),
144144
).Return(&historypb.HistoryEvent{}, nil)
145145
s.currentMutableState.EXPECT().HasPendingWorkflowTask().Return(false)
146+
s.currentMutableState.EXPECT().IsWorkflowExecutionStatusPaused().Return(false)
146147
s.currentMutableState.EXPECT().HadOrHasWorkflowTask().Return(true)
147148
s.currentMutableState.EXPECT().AddWorkflowTaskScheduledEvent(false, enumsspb.WORKFLOW_TASK_TYPE_NORMAL).Return(&historyi.WorkflowTaskInfo{}, nil)
148149
s.currentContext.EXPECT().UpdateWorkflowExecutionAsActive(ctx, s.shardContext).Return(nil)
@@ -187,6 +188,40 @@ func (s *signalWithStartWorkflowSuite) TestSignalWorkflow_NoNewWorkflowTask() {
187188
s.NoError(err)
188189
}
189190

191+
// Tests SignalWithStart when the workflow is paused.
192+
// Asserts that no new workflow task is scheduled.
193+
func (s *signalWithStartWorkflowSuite) TestSignalWorkflow_WhenPaused() {
194+
ctx := context.Background()
195+
currentWorkflowLease := api.NewWorkflowLease(
196+
s.currentContext,
197+
wcache.NoopReleaseFn,
198+
s.currentMutableState,
199+
)
200+
request := s.randomRequest()
201+
202+
s.currentMutableState.EXPECT().IsWorkflowCloseAttempted().Return(false)
203+
s.currentMutableState.EXPECT().IsSignalRequested(request.GetRequestId()).Return(false)
204+
s.currentMutableState.EXPECT().AddSignalRequested(request.GetRequestId())
205+
s.currentMutableState.EXPECT().AddWorkflowExecutionSignaled(
206+
request.GetSignalName(),
207+
request.GetSignalInput(),
208+
request.GetIdentity(),
209+
request.GetHeader(),
210+
request.GetLinks(),
211+
).Return(&historypb.HistoryEvent{}, nil)
212+
s.currentMutableState.EXPECT().HasPendingWorkflowTask().Return(false)
213+
s.currentMutableState.EXPECT().IsWorkflowExecutionStatusPaused().Return(true)
214+
s.currentContext.EXPECT().UpdateWorkflowExecutionAsActive(ctx, s.shardContext).Return(nil)
215+
216+
err := signalWorkflow(
217+
ctx,
218+
s.shardContext,
219+
currentWorkflowLease,
220+
request,
221+
)
222+
s.NoError(err)
223+
}
224+
190225
func (s *signalWithStartWorkflowSuite) randomRequest() *workflowservice.SignalWithStartWorkflowExecutionRequest {
191226
var request workflowservice.SignalWithStartWorkflowExecutionRequest
192227
_ = fakedata.FakeStruct(&request)

service/history/api/update_workflow_util.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,8 @@ func UpdateWorkflowWithNew(
7878

7979
mutableState := workflowLease.GetMutableState()
8080
if postActions.CreateWorkflowTask {
81-
// Create a transfer task to schedule a workflow task
82-
if !mutableState.HasPendingWorkflowTask() {
81+
// Create a transfer task to schedule a workflow task only if the workflow is not paused and there is no pending workflow task.
82+
if !mutableState.HasPendingWorkflowTask() && !mutableState.IsWorkflowExecutionStatusPaused() {
8383
if _, err := mutableState.AddWorkflowTaskScheduledEvent(
8484
false,
8585
enumsspb.WORKFLOW_TASK_TYPE_NORMAL,

service/history/api/updateworkflow/api.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,11 @@ func (u *Updater) ApplyRequest(
124124
return nil, consts.ErrWorkflowCompleted
125125
}
126126

127+
// We don't accept the request to update the workflow if the workflow is paused.
128+
if ms.IsWorkflowExecutionStatusPaused() {
129+
return nil, serviceerror.NewFailedPrecondition("Workflow is paused. Cannot update the workflow.")
130+
}
131+
127132
if ms.GetExecutionInfo().WorkflowTaskAttempt >= failUpdateWorkflowTaskAttemptCount {
128133
// If workflow task is constantly failing, the update to that workflow will also fail.
129134
// Additionally, workflow update can't "fix" workflow state because updates (delivered with messages)
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package updateworkflow
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
"github.com/stretchr/testify/require"
8+
"go.temporal.io/api/serviceerror"
9+
updatepb "go.temporal.io/api/update/v1"
10+
"go.temporal.io/api/workflowservice/v1"
11+
"go.temporal.io/server/api/historyservice/v1"
12+
"go.temporal.io/server/common/definition"
13+
historyi "go.temporal.io/server/service/history/interfaces"
14+
"go.temporal.io/server/service/history/workflow/update"
15+
"go.uber.org/mock/gomock"
16+
)
17+
18+
func TestApplyRequest_RejectsUpdateOnPausedWorkflow(t *testing.T) {
19+
t.Parallel()
20+
21+
ctrl := gomock.NewController(t)
22+
23+
ms := historyi.NewMockMutableState(ctrl)
24+
ms.EXPECT().GetWorkflowKey().Return(definition.NewWorkflowKey("ns-id", "wf-id", "run-id"))
25+
ms.EXPECT().IsWorkflowExecutionRunning().Return(true)
26+
ms.EXPECT().IsWorkflowExecutionStatusPaused().Return(true)
27+
// Required by update.NewRegistry
28+
ms.EXPECT().GetCurrentVersion().Return(int64(1))
29+
ms.EXPECT().VisitUpdates(gomock.Any())
30+
31+
updateReg := update.NewRegistry(ms)
32+
33+
updater := &Updater{
34+
req: createUpdateRequest("test-update-id"),
35+
}
36+
37+
action, err := updater.ApplyRequest(context.Background(), updateReg, ms)
38+
39+
require.Nil(t, action)
40+
require.Error(t, err)
41+
var failedPrecondition *serviceerror.FailedPrecondition
42+
require.ErrorAs(t, err, &failedPrecondition)
43+
require.Contains(t, err.Error(), "Workflow is paused")
44+
}
45+
46+
func createUpdateRequest(updateID string) *historyservice.UpdateWorkflowExecutionRequest {
47+
return &historyservice.UpdateWorkflowExecutionRequest{
48+
Request: &workflowservice.UpdateWorkflowExecutionRequest{
49+
Request: &updatepb.Request{
50+
Meta: &updatepb.Meta{
51+
UpdateId: updateID,
52+
},
53+
},
54+
},
55+
}
56+
}

0 commit comments

Comments
 (0)