-
Notifications
You must be signed in to change notification settings - Fork 298
Expand file tree
/
Copy pathactivity.go
More file actions
495 lines (450 loc) · 21 KB
/
activity.go
File metadata and controls
495 lines (450 loc) · 21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
package internal
import (
"context"
"fmt"
"time"
commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/converter"
"go.temporal.io/sdk/internal/common/metrics"
"go.temporal.io/sdk/log"
)
type (
// ActivityType identifies an activity type.
//
// Exposed as: [go.temporal.io/sdk/activity.Type]
ActivityType struct {
// Name is the name of the activity type.
Name string
}
// ActivityInfo contains information about a currently executing activity.
//
// Exposed as: [go.temporal.io/sdk/activity.Info]
ActivityInfo struct {
// TaskToken is the token that identifies the activity task.
TaskToken []byte
// WorkflowType is the type of the workflow that started this activity.
WorkflowType *WorkflowType
// Namespace of the workflow that started this activity. Empty if this activity was not started by a workflow.
// If present, the value is always the same as Namespace since workflows can only run activities in their own
// namespace.
//
// Deprecated: use Namespace instead.
WorkflowNamespace string
// Execution details of the workflow that started this activity. All fields are empty if this activity was not
// started by a workflow.
WorkflowExecution WorkflowExecution
// ActivityID is the ID of the activity.
ActivityID string
// ActivityRunID is the run ID of the activity. Empty if the activity was started by a workflow.
ActivityRunID string
// ActivityType is the type of the activity.
ActivityType ActivityType
// TaskQueue is the name of the task queue that the activity needs to be scheduled on.
TaskQueue string
// Namespace is the namespace of this activity.
Namespace string
// HeartbeatTimeout is the maximum time between heartbeats. 0 means no heartbeat needed.
HeartbeatTimeout time.Duration
// ScheduleToCloseTimeout is the schedule to close timeout set by the activity options.
ScheduleToCloseTimeout time.Duration
// StartToCloseTimeout is the start to close timeout set by the activity options.
StartToCloseTimeout time.Duration
// ScheduledTime is the time when the activity was scheduled by a workflow.
ScheduledTime time.Time
// StartedTime is the time when the activity started.
StartedTime time.Time
// Deadline is the time of activity timeout.
Deadline time.Time
// Attempt starts from 1, and increased by 1 for every retry if retry policy is specified.
Attempt int32
// IsLocalActivity is true if it is a local activity.
IsLocalActivity bool
// Priority settings that control relative ordering of task processing when activity tasks are backed up in a queue.
// If no priority is set, the default value is the zero value.
//
// WARNING: Task queue priority is currently experimental.
Priority Priority
// Retry policy for the activity. Note that the server may have set a different policy than the one provided
// when scheduling the activity. If the value is nil, it means the server didn't send information about
// retry policy (e.g. due to old server version), but it may still be defined server-side.
RetryPolicy *RetryPolicy
}
// RegisterActivityOptions consists of options for registering an activity.
//
// Exposed as: [go.temporal.io/sdk/activity.RegisterOptions]
RegisterActivityOptions struct {
// When an activity is a function the name is an actual activity type name.
// When an activity is part of a structure then each member of the structure becomes an activity with
// this Name as a prefix + activity function name.
//
// If this is set, users are strongly recommended to set
// worker.Options.DisableRegistrationAliasing at the worker level to prevent
// ambiguity between string names and function references. Also users should
// always use this string name when executing this activity.
Name string
// DisableAlreadyRegisteredCheck disables the check for already registered activities.
DisableAlreadyRegisteredCheck bool
// When registering a struct with activities, skip functions that are not valid activities. If false,
// registration panics.
SkipInvalidStructFunctions bool
}
// ActivityOptions stores all activity-specific parameters that will be stored inside of a context.
// The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. But this is
// subject to change in the future.
//
// Exposed as: [go.temporal.io/sdk/workflow.ActivityOptions]
ActivityOptions struct {
// TaskQueue - Name of the task queue that the activity needs to be scheduled on.
//
// Optional: The default task queue with the same name as the workflow task queue.
TaskQueue string
// ScheduleToCloseTimeout - Total time that the workflow will wait for an Activity to complete.
// ScheduleToCloseTimeout limits the total time of an Activity's execution including retries
// (use StartToCloseTimeout to limit the time of a single attempt).
// The zero value of this uses default value.
// Either this option or StartToCloseTimeout is required: Defaults to unlimited.
ScheduleToCloseTimeout time.Duration
// ScheduleToStartTimeout - Time that the Activity Task can stay in the Task Queue before it is picked up by
// a Worker. Do not specify this timeout unless using host specific Task Queues for Activity Tasks are being
// used for routing. In almost all situations that don't involve routing activities to specific hosts, it is
// better to rely on the default value.
// ScheduleToStartTimeout is always non-retryable. Retrying after this timeout doesn't make sense, as it would
// just put the Activity Task back into the same Task Queue.
//
// Optional: Defaults to unlimited.
ScheduleToStartTimeout time.Duration
// StartToCloseTimeout - Maximum time of a single Activity execution attempt.
// Note that the Temporal Server doesn't detect Worker process failures directly. It relies on this timeout
// to detect that an Activity that didn't complete on time. So this timeout should be as short as the longest
// possible execution of the Activity body. Potentially long running Activities must specify HeartbeatTimeout
// and call Activity.RecordHeartbeat(ctx, "my-heartbeat") periodically for timely failure detection.
// Either this option or ScheduleToCloseTimeout is required: Defaults to the ScheduleToCloseTimeout value.
StartToCloseTimeout time.Duration
// HeartbeatTimeout - Heartbeat interval. Activity must call Activity.RecordHeartbeat(ctx, "my-heartbeat")
// before this interval passes after the last heartbeat or the Activity starts.
HeartbeatTimeout time.Duration
// WaitForCancellation - Whether to wait for canceled activity to be completed(
// activity can be failed, completed, cancel accepted)
//
// Optional: default false
WaitForCancellation bool
// ActivityID - Business-level activity ID. This is not typically needed.
//
// Optional: default empty string
ActivityID string
// RetryPolicy - Specifies how to retry an Activity if an error occurs.
// More details are available at docs.temporal.io.
// RetryPolicy is optional. If one is not specified, a default RetryPolicy is provided by the server.
// The default RetryPolicy provided by the server specifies:
// - InitialInterval of 1 second
// - BackoffCoefficient of 2.0
// - MaximumInterval of 100 x InitialInterval
// - MaximumAttempts of 0 (unlimited)
// To disable retries, set MaximumAttempts to 1.
// The default RetryPolicy provided by the server can be overridden by the dynamic config.
RetryPolicy *RetryPolicy
// If true, eager execution will not be requested, regardless of worker settings.
// If false, eager execution may still be disabled at the worker level or
// may not be requested due to lack of available slots.
//
// Eager activity execution means the server returns requested eager
// activities directly from the workflow task back to this worker. This is
// faster than non-eager, which may be dispatched to a separate worker.
DisableEagerExecution bool
// VersioningIntent - Specifies whether this activity should run on a worker with a compatible
// build ID or not. See temporal.VersioningIntent.
//
// Deprecated: Use Worker Deployment Versioning instead. See https://docs.temporal.io/worker-versioning
VersioningIntent VersioningIntent
// Summary is a single-line summary for this activity that will appear in UI/CLI. This can be
// in single-line Temporal Markdown format.
//
// Optional: defaults to none/empty.
//
// NOTE: Experimental
Summary string
// Priority - Optional priority settings that control relative ordering of
// task processing when tasks are backed up in a queue.
//
// WARNING: Task queue priority is currently experimental.
Priority Priority
}
// LocalActivityOptions stores local activity specific parameters that will be stored inside of a context.
//
// Exposed as: [go.temporal.io/sdk/workflow.LocalActivityOptions]
LocalActivityOptions struct {
// ScheduleToCloseTimeout - The end to end timeout for the local activity, including retries.
// At least one of ScheduleToCloseTimeout or StartToCloseTimeout is required.
// Defaults to StartToCloseTimeout if not set.
ScheduleToCloseTimeout time.Duration
// StartToCloseTimeout - The timeout for a single execution of the local activity.
// At least one of ScheduleToCloseTimeout or StartToCloseTimeout is required.
// Defaults to ScheduleToCloseTimeout if not set.
StartToCloseTimeout time.Duration
// RetryPolicy - Specify how to retry activity if error happens.
//
// Optional: default is to retry according to the default retry policy up to ScheduleToCloseTimeout
// with 1sec initial delay between retries and 2x backoff.
RetryPolicy *RetryPolicy
// Summary is a single-line summary for this activity that will appear in UI/CLI. This can be
// in single-line Temporal Markdown format.
//
// Optional: defaults to none/empty.
//
// NOTE: Experimental
Summary string
}
)
// IsWorkflowActivity returns true if this activity was started by a workflow.
func (i *ActivityInfo) IsWorkflowActivity() bool {
return i.WorkflowExecution.ID != ""
}
// GetActivityInfo returns information about the currently executing activity.
//
// Exposed as: [go.temporal.io/sdk/activity.GetInfo]
func GetActivityInfo(ctx context.Context) ActivityInfo {
return getActivityOutboundInterceptor(ctx).GetInfo(ctx)
}
// HasHeartbeatDetails checks if there are heartbeat details from last attempt.
//
// Exposed as: [go.temporal.io/sdk/activity.HasHeartbeatDetails]
func HasHeartbeatDetails(ctx context.Context) bool {
return getActivityOutboundInterceptor(ctx).HasHeartbeatDetails(ctx)
}
// IsActivity checks if the context is an activity context from a normal or local activity.
//
// Exposed as: [go.temporal.io/sdk/activity.IsActivity]
func IsActivity(ctx context.Context) bool {
a := ctx.Value(activityInterceptorContextKey)
return a != nil
}
// GetHeartbeatDetails extracts heartbeat details from the last failed attempt. This is used in combination with the retry policy.
// An activity could be scheduled with an optional retry policy on ActivityOptions. If the activity failed, then server
// would attempt to dispatch another activity task to retry according to the retry policy. If there were heartbeat
// details reported by activity from the failed attempt, the details would be delivered along with the activity task for
// the retry attempt. An activity can extract the details from GetHeartbeatDetails() and resume progress from there.
//
// Note: Values should not be reused for extraction here because merging on top
// of existing values may result in unexpected behavior similar to json.Unmarshal.
//
// Exposed as: [go.temporal.io/sdk/activity.GetHeartbeatDetails]
func GetHeartbeatDetails(ctx context.Context, d ...interface{}) error {
return getActivityOutboundInterceptor(ctx).GetHeartbeatDetails(ctx, d...)
}
// GetActivityLogger returns a logger that can be used in the activity.
//
// Exposed as: [go.temporal.io/sdk/activity.GetLogger]
func GetActivityLogger(ctx context.Context) log.Logger {
return getActivityOutboundInterceptor(ctx).GetLogger(ctx)
}
// GetActivityMetricsHandler returns a metrics handler that can be used in the activity.
//
// Exposed as: [go.temporal.io/sdk/activity.GetMetricsHandler]
func GetActivityMetricsHandler(ctx context.Context) metrics.Handler {
return getActivityOutboundInterceptor(ctx).GetMetricsHandler(ctx)
}
// GetWorkerStopChannel returns a read-only channel. The closure of this channel indicates the activity worker is stopping.
// When the worker is stopping, it will close this channel and wait until the worker stop timeout finishes. After the timeout
// hits, the worker will cancel the activity context and then exit. The timeout can be defined by worker option: WorkerStopTimeout.
// Use this channel to handle a graceful activity exit when the activity worker stops.
//
// Exposed as: [go.temporal.io/sdk/activity.GetWorkerStopChannel]
func GetWorkerStopChannel(ctx context.Context) <-chan struct{} {
return getActivityOutboundInterceptor(ctx).GetWorkerStopChannel(ctx)
}
// RecordActivityHeartbeat sends a heartbeat for the currently executing activity.
// If the activity is either canceled or workflow/activity doesn't exist, then we would cancel
// the context with error context.Canceled.
//
// TODO: Implement automatic heartbeating with cancellation through ctx.
//
// details - The details that you provided here can be seen in the workflow when it receives TimeoutError. You
// can check error TimeoutType()/Details().
//
// Exposed as: [go.temporal.io/sdk/activity.RecordHeartbeat]
func RecordActivityHeartbeat(ctx context.Context, details ...interface{}) {
getActivityOutboundInterceptor(ctx).RecordHeartbeat(ctx, details...)
}
// GetClient returns a client that can be used to interact with the Temporal
// service from an activity.
//
// Exposed as: [go.temporal.io/sdk/activity.GetClient]
func GetClient(ctx context.Context) Client {
return getActivityOutboundInterceptor(ctx).GetClient(ctx)
}
// ServiceInvoker abstracts calls to the Temporal service from an activity implementation.
// Implement to unit test activities.
type ServiceInvoker interface {
// Returns ActivityTaskCanceledError if activity is canceled
Heartbeat(ctx context.Context, details *commonpb.Payloads, skipBatching bool) error
Close(ctx context.Context, flushBufferedHeartbeat bool)
GetClient(options ClientOptions) Client
}
// WithActivityTask adds activity specific information into context.
// Use this method to unit test activity implementations that use context extractor methodshared.
func WithActivityTask(
ctx context.Context,
task *workflowservice.PollActivityTaskQueueResponse,
taskQueue string,
invoker ServiceInvoker,
logger log.Logger,
metricsHandler metrics.Handler,
dataConverter converter.DataConverter,
workerStopChannel <-chan struct{},
contextPropagators []ContextPropagator,
interceptors []WorkerInterceptor,
client *WorkflowClient,
) (context.Context, error) {
scheduled := task.GetScheduledTime().AsTime()
started := task.GetStartedTime().AsTime()
scheduleToCloseTimeout := task.GetScheduleToCloseTimeout().AsDuration()
startToCloseTimeout := task.GetStartToCloseTimeout().AsDuration()
heartbeatTimeout := task.GetHeartbeatTimeout().AsDuration()
deadline := calculateActivityDeadline(scheduled, scheduleToCloseTimeout, startToCloseTimeout)
env := &activityEnvironment{
taskToken: task.TaskToken,
serviceInvoker: invoker,
activityType: ActivityType{Name: task.ActivityType.GetName()},
activityID: task.ActivityId,
metricsHandler: metricsHandler,
deadline: deadline,
heartbeatTimeout: heartbeatTimeout,
scheduleToCloseTimeout: scheduleToCloseTimeout,
startToCloseTimeout: startToCloseTimeout,
scheduledTime: scheduled,
startedTime: started,
taskQueue: taskQueue,
dataConverter: dataConverter,
attempt: task.GetAttempt(),
priority: task.GetPriority(),
heartbeatDetails: task.HeartbeatDetails,
namespace: task.WorkflowNamespace,
retryPolicy: convertFromPBRetryPolicy(task.RetryPolicy),
workerStopChannel: workerStopChannel,
contextPropagators: contextPropagators,
client: client,
}
if task.WorkflowExecution.GetWorkflowId() == "" {
env.activityRunID = task.ActivityRunId
env.logger = log.With(logger,
tagActivityID, task.ActivityId,
tagActivityRunID, task.ActivityRunId,
tagActivityType, task.ActivityType.GetName(),
tagAttempt, task.Attempt,
)
} else {
env.workflowExecution = WorkflowExecution{
ID: task.WorkflowExecution.GetWorkflowId(),
RunID: task.WorkflowExecution.GetRunId(),
}
env.workflowType = &WorkflowType{
Name: task.WorkflowType.GetName(),
}
env.logger = log.With(logger,
tagActivityID, task.ActivityId,
tagActivityType, task.ActivityType.GetName(),
tagAttempt, task.Attempt,
tagWorkflowType, task.WorkflowType.GetName(),
tagWorkflowID, task.WorkflowExecution.GetWorkflowId(),
tagRunID, task.WorkflowExecution.GetRunId(),
)
}
return newActivityContext(ctx, interceptors, env)
}
// WithLocalActivityTask adds local activity specific information into context.
func WithLocalActivityTask(
ctx context.Context,
task *localActivityTask,
logger log.Logger,
metricsHandler metrics.Handler,
dataConverter converter.DataConverter,
interceptors []WorkerInterceptor,
client *WorkflowClient,
workerStopChannel <-chan struct{},
) (context.Context, error) {
if ctx == nil {
ctx = context.Background()
}
workflowTypeLocal := task.params.WorkflowInfo.WorkflowType
workflowType := task.params.WorkflowInfo.WorkflowType.Name
activityType := task.params.ActivityType
logger = log.With(logger,
tagActivityID, task.activityID,
tagActivityType, activityType,
tagAttempt, task.attempt,
tagWorkflowType, workflowType,
tagWorkflowID, task.params.WorkflowInfo.WorkflowExecution.ID,
tagRunID, task.params.WorkflowInfo.WorkflowExecution.RunID,
)
startedTime := time.Now()
scheduleToCloseTimeout := task.params.ScheduleToCloseTimeout
startToCloseTimeout := task.params.StartToCloseTimeout
if startToCloseTimeout == 0 {
startToCloseTimeout = scheduleToCloseTimeout
}
if scheduleToCloseTimeout == 0 {
scheduleToCloseTimeout = startToCloseTimeout
}
deadline := calculateActivityDeadline(task.scheduledTime, scheduleToCloseTimeout, startToCloseTimeout)
if task.attempt > 1 && !task.expireTime.IsZero() && task.expireTime.Before(deadline) {
// this is attempt and expire time is before SCHEDULE_TO_CLOSE timeout
deadline = task.expireTime
}
return newActivityContext(ctx, interceptors, &activityEnvironment{
workflowType: &workflowTypeLocal,
namespace: task.params.WorkflowInfo.Namespace,
taskQueue: task.params.WorkflowInfo.TaskQueueName,
activityType: ActivityType{Name: activityType},
activityID: fmt.Sprintf("%v", task.activityID),
workflowExecution: task.params.WorkflowInfo.WorkflowExecution,
logger: logger,
metricsHandler: metricsHandler,
scheduleToCloseTimeout: scheduleToCloseTimeout,
startToCloseTimeout: startToCloseTimeout,
isLocalActivity: true,
deadline: deadline,
scheduledTime: task.scheduledTime,
startedTime: startedTime,
dataConverter: dataConverter,
attempt: task.attempt,
retryPolicy: task.retryPolicy,
client: client,
workerStopChannel: workerStopChannel,
})
}
func newActivityContext(
ctx context.Context,
interceptors []WorkerInterceptor,
env *activityEnvironment,
) (context.Context, error) {
ctx = context.WithValue(ctx, activityEnvContextKey, env)
// Create interceptor with default inbound and outbound values and put on
// context
envInterceptor := &activityEnvironmentInterceptor{env: env}
envInterceptor.inboundInterceptor = envInterceptor
envInterceptor.outboundInterceptor = envInterceptor
ctx = context.WithValue(ctx, activityEnvInterceptorContextKey, envInterceptor)
ctx = context.WithValue(ctx, activityInterceptorContextKey, envInterceptor.outboundInterceptor)
// Intercept, run init, and put the new outbound interceptor on the context
for i := len(interceptors) - 1; i >= 0; i-- {
envInterceptor.inboundInterceptor = interceptors[i].InterceptActivity(ctx, envInterceptor.inboundInterceptor)
}
err := envInterceptor.inboundInterceptor.Init(envInterceptor)
if err != nil {
return nil, err
}
ctx = context.WithValue(ctx, activityInterceptorContextKey, envInterceptor.outboundInterceptor)
return ctx, nil
}
func calculateActivityDeadline(scheduled time.Time, scheduleToCloseTimeout, startToCloseTimeout time.Duration) time.Time {
startToCloseDeadline := time.Now().Add(startToCloseTimeout)
if scheduleToCloseTimeout > 0 {
scheduleToCloseDeadline := scheduled.Add(scheduleToCloseTimeout)
// Minimum of the two deadlines.
if scheduleToCloseDeadline.Before(startToCloseDeadline) {
return scheduleToCloseDeadline
}
}
return startToCloseDeadline
}