Skip to content

Commit 6a5559e

Browse files
authored
Pause activity in recordactivitytaskstarted (temporalio#7559)
## What changed? <!-- Describe what has changed in this PR --> Added code to the "recordactivitytaskstarted" API to run workflow rules and pause the activity if needed. ## Why? <!-- Tell your future self why have you made these changes --> To cover the scenario in which a workflow rule should prevent an activity from starting. ## How did you test it? <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> Added a functional test specifically for this scenario.
1 parent 17f93a7 commit 6a5559e

File tree

2 files changed

+264
-81
lines changed

2 files changed

+264
-81
lines changed

service/history/api/recordactivitytaskstarted/api.go

Lines changed: 92 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
deploymentspb "go.temporal.io/server/api/deployment/v1"
3535
"go.temporal.io/server/api/historyservice/v1"
3636
"go.temporal.io/server/api/matchingservice/v1"
37+
persistencespb "go.temporal.io/server/api/persistence/v1"
3738
"go.temporal.io/server/common"
3839
"go.temporal.io/server/common/definition"
3940
"go.temporal.io/server/common/metrics"
@@ -48,6 +49,15 @@ import (
4849
"go.temporal.io/server/service/history/workflow"
4950
)
5051

52+
// rejectCode tells Invoke how recordActivityTaskStarted resolved an activity
// start request, so that Invoke can decide whether to return the response or
// reject the task.
type rejectCode int32

const (
	// rejectCodeUndefined is the zero value. It is returned alongside errors
	// and whenever no specific outcome was determined.
	rejectCodeUndefined rejectCode = 0
	// rejectCodeAccepted means the activity start was recorded (or had already
	// been recorded for the same request ID) and a response is available.
	rejectCodeAccepted rejectCode = 1
	// rejectCodePaused means a workflow rule paused the activity, so the start
	// request is rejected.
	rejectCodePaused rejectCode = 2
	// rejectCodeStartedTransition means handling this request started a
	// deployment transition, so the start request is rejected.
	rejectCodeStartedTransition rejectCode = 3
)
60+
5161
func Invoke(
5262
ctx context.Context,
5363
request *historyservice.RecordActivityTaskStartedRequest,
@@ -58,7 +68,7 @@ func Invoke(
5868

5969
var err error
6070
response := &historyservice.RecordActivityTaskStartedResponse{}
61-
var startedTransition bool
71+
var rejectCode rejectCode
6272

6373
err = api.GetAndUpdateWorkflowWithNew(
6474
ctx,
@@ -74,7 +84,9 @@ func Invoke(
7484
return nil, consts.ErrWorkflowCompleted
7585
}
7686

77-
response, startedTransition, err = recordActivityTaskStarted(ctx, shardContext, mutableState, request, matchingClient)
87+
response, rejectCode, err = recordActivityTaskStarted(
88+
ctx, shardContext, mutableState, request, matchingClient,
89+
)
7890
if err != nil {
7991
return nil, err
8092
}
@@ -84,7 +96,7 @@ func Invoke(
8496
// Create new wft if a transition started with this activity.
8597
// StartDeploymentTransition rescheduled pending wft, but this creates new
8698
// one if there is no pending wft.
87-
CreateWorkflowTask: startedTransition,
99+
CreateWorkflowTask: rejectCode == rejectCodeStartedTransition,
88100
}, nil
89101
},
90102
nil,
@@ -96,11 +108,19 @@ func Invoke(
96108
return nil, err
97109
}
98110

99-
if startedTransition {
111+
if rejectCode == rejectCodeStartedTransition {
100112
// Rejecting the activity start because the workflow is now in transition. Matching can drop
101113
// the task, new activity task will be scheduled after transition completion.
102114
return nil, serviceerrors.NewActivityStartDuringTransition()
103115
}
116+
117+
if rejectCode == rejectCodePaused {
118+
// Rejecting the activity start because the activity was modified (paused).
119+
// Matching can drop the task. New activity will be scheduled once activity is resumed.
120+
errorMessage := fmt.Sprintf("Activity task with this stamp not found: %v", request.Stamp)
121+
return nil, serviceerror.NewNotFound(errorMessage)
122+
}
123+
104124
return response, err
105125
}
106126

@@ -110,10 +130,10 @@ func recordActivityTaskStarted(
110130
mutableState historyi.MutableState,
111131
request *historyservice.RecordActivityTaskStartedRequest,
112132
matchingClient matchingservice.MatchingServiceClient,
113-
) (*historyservice.RecordActivityTaskStartedResponse, bool, error) {
133+
) (*historyservice.RecordActivityTaskStartedResponse, rejectCode, error) {
114134
namespaceEntry, err := api.GetActiveNamespace(shardContext, namespace.ID(request.GetNamespaceId()))
115135
if err != nil {
116-
return nil, false, err
136+
return nil, rejectCodeUndefined, err
117137
}
118138
namespaceName := namespaceEntry.Name().String()
119139

@@ -127,20 +147,20 @@ func recordActivityTaskStarted(
127147
// some extreme cassandra failure cases.
128148
if !isRunning && scheduledEventID >= mutableState.GetNextEventID() {
129149
metrics.StaleMutableStateCounter.With(taggedMetrics).Record(1)
130-
return nil, false, consts.ErrStaleState
150+
return nil, rejectCodeUndefined, consts.ErrStaleState
131151
}
132152

133153
// Check execution state to make sure task is in the list of outstanding tasks and it is not yet started. If
134154
// task is not outstanding then it is most probably a duplicate, so complete the task.
135155
if !isRunning {
136156
// Looks like ActivityTask already completed as a result of another call.
137157
// It is OK to drop the task at this point.
138-
return nil, false, consts.ErrActivityTaskNotFound
158+
return nil, rejectCodeUndefined, consts.ErrActivityTaskNotFound
139159
}
140160

141161
scheduledEvent, err := mutableState.GetActivityScheduledEvent(ctx, scheduledEventID)
142162
if err != nil {
143-
return nil, false, err
163+
return nil, rejectCodeUndefined, err
144164
}
145165

146166
response := &historyservice.RecordActivityTaskStartedResponse{
@@ -154,12 +174,17 @@ func recordActivityTaskStarted(
154174
if ai.RequestId == requestID {
155175
response.StartedTime = ai.StartedTime
156176
response.Attempt = ai.Attempt
157-
return response, false, nil
177+
return response, rejectCodeAccepted, nil
158178
}
159179

160180
// Looks like ActivityTask already started as a result of another call.
161181
// It is OK to drop the task at this point.
162-
return nil, false, serviceerrors.NewTaskAlreadyStarted("Activity")
182+
return nil, rejectCodeUndefined, serviceerrors.NewTaskAlreadyStarted("Activity")
183+
}
184+
185+
code, err := processActivityWorkflowRules(shardContext, request, mutableState, ai)
186+
if err != nil || code == rejectCodePaused {
187+
return nil, code, err
163188
}
164189

165190
if ai.Stamp != request.Stamp {
@@ -168,7 +193,7 @@ func recordActivityTaskStarted(
168193
errorMessage := fmt.Sprintf(
169194
"Activity task with this stamp not found. Id: %s,: type: %s, current stamp: %d",
170195
ai.ActivityId, ai.ActivityType.Name, ai.Stamp)
171-
return nil, false, serviceerror.NewNotFound(errorMessage)
196+
return nil, rejectCodeUndefined, serviceerror.NewNotFound(errorMessage)
172197
}
173198

174199
wfBehavior := mutableState.GetEffectiveVersioningBehavior()
@@ -177,13 +202,13 @@ func recordActivityTaskStarted(
177202
pollerDeployment := worker_versioning.DeploymentFromCapabilities(request.PollRequest.WorkerVersionCapabilities, request.PollRequest.DeploymentOptions)
178203
err = worker_versioning.ValidateTaskVersionDirective(request.GetVersionDirective(), wfBehavior, wfDeployment, request.ScheduledDeployment)
179204
if err != nil {
180-
return nil, false, err
205+
return nil, rejectCodeUndefined, err
181206
}
182207

183208
if mutableState.GetDeploymentTransition() != nil {
184209
// Can't start activity during a redirect. We reject this request so Matching drops
185210
// the task. The activity will be rescheduled when the redirect completes/fails.
186-
return nil, false, serviceerrors.NewActivityStartDuringTransition()
211+
return nil, rejectCodeUndefined, serviceerrors.NewActivityStartDuringTransition()
187212
}
188213

189214
if !pollerDeployment.Equal(wfDeployment) &&
@@ -202,7 +227,7 @@ func recordActivityTaskStarted(
202227
)
203228
if err != nil {
204229
// Let matching retry
205-
return nil, false, err
230+
return nil, rejectCodeUndefined, err
206231
}
207232
if pollerDeployment.Equal(worker_versioning.DeploymentFromDeploymentVersion(wftDepVer)) {
208233
if err := mutableState.StartDeploymentTransition(pollerDeployment); err != nil {
@@ -211,13 +236,13 @@ func recordActivityTaskStarted(
211236
// now pinned so can't transition. Matching can drop the task safely.
212237
// TODO (shahab): remove this special error check because it is not
213238
// expected to happen once scheduledBehavior is always populated. see TODOs above.
214-
return nil, false, serviceerrors.NewObsoleteMatchingTask(err.Error())
239+
return nil, rejectCodeUndefined, serviceerrors.NewObsoleteMatchingTask(err.Error())
215240
}
216-
return nil, false, err
241+
return nil, rejectCodeUndefined, err
217242
}
218243
// This activity started a transition, make sure the MS changes are written but
219244
// reject the activity task.
220-
return nil, true, nil
245+
return nil, rejectCodeStartedTransition, nil
221246
}
222247
}
223248

@@ -226,7 +251,7 @@ func recordActivityTaskStarted(
226251
ai, scheduledEventID, requestID, request.PollRequest.GetIdentity(),
227252
versioningStamp, pollerDeployment, request.GetBuildIdRedirectInfo(),
228253
); err != nil {
229-
return nil, false, err
254+
return nil, rejectCodeUndefined, err
230255
}
231256

232257
scheduleToStartLatency := ai.GetStartedTime().AsTime().Sub(ai.GetScheduledTime().AsTime())
@@ -251,7 +276,7 @@ func recordActivityTaskStarted(
251276
response.WorkflowType = mutableState.GetWorkflowType()
252277
response.WorkflowNamespace = namespaceName
253278

254-
return response, false, nil
279+
return response, rejectCodeAccepted, nil
255280
}
256281

257282
// TODO (Shahab): move this method to a better place
@@ -281,3 +306,50 @@ func getDeploymentVersionForWorkflowId(
281306
current, ramping := worker_versioning.CalculateTaskQueueVersioningInfo(tqData.GetDeploymentData())
282307
return worker_versioning.FindDeploymentVersionForWorkflowID(current, ramping, workflowId), nil
283308
}
309+
310+
func processActivityWorkflowRules(
311+
shardContext historyi.ShardContext,
312+
request *historyservice.RecordActivityTaskStartedRequest,
313+
ms historyi.MutableState,
314+
ai *persistencespb.ActivityInfo,
315+
) (rejectCode, error) {
316+
if ai.Stamp == request.Stamp && ai.Paused {
317+
// this shouldn't happen. For now log an error
318+
shardContext.GetLogger().Error(
319+
fmt.Sprintf(
320+
"Activity is paused, but new task was schedulled. Activity ID: %v, Stamp: %v, event ID: %v",
321+
ai.ActivityId, request.Stamp, ai.StartedEventId),
322+
)
323+
}
324+
325+
// if activity is already paused or Stamp is not the same as the one in the request we shouldn't process workflow rules
326+
// this is a no-op
327+
if ai.Stamp != request.Stamp || ai.Paused {
328+
return rejectCodeUndefined, nil
329+
}
330+
331+
// This is an attempt to reduce the number of workflow rules calls.
332+
// We only need to process the first invocation of an activity.
333+
// Other invocations should be blocked by either RetryActivity or retryTask.
334+
if ai.Attempt > 1 {
335+
return rejectCodeUndefined, nil
336+
}
337+
338+
ruleMatched := workflow.ActivityMatchWorkflowRules(ms, shardContext.GetLogger(), ai)
339+
if !ruleMatched || !ai.Paused {
340+
return rejectCodeUndefined, nil
341+
}
342+
343+
// activity was paused, need to update activity
344+
if err := ms.UpdateActivity(ai.ScheduledEventId, func(activityInfo *persistencespb.ActivityInfo, _ historyi.MutableState) error {
345+
activityInfo.StartedEventId = common.EmptyEventID
346+
activityInfo.StartedTime = nil
347+
activityInfo.RequestId = ""
348+
return nil
349+
}); err != nil {
350+
return rejectCodeUndefined, err
351+
}
352+
353+
// if activity was paused we will need to update mutable state
354+
return rejectCodePaused, nil
355+
}

0 commit comments

Comments
 (0)