-
Notifications
You must be signed in to change notification settings - Fork 237
Expand file tree
/
Copy pathretry_activity_workflow.go
More file actions
75 lines (66 loc) · 2.61 KB
/
retry_activity_workflow.go
File metadata and controls
75 lines (66 loc) · 2.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package retryactivity
import (
"context"
"math/rand"
"time"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"
)
// RetryWorkflow executes BatchProcessingActivity with a retry policy and no attempt cap.
// The activity heartbeats progress after each task so retries resume from where they left off,
// rather than starting over. This makes it suitable for demonstrating activity pause/unpause:
// pausing mid-execution shows the last heartbeated task index in the UI, and unpausing
// resumes from that point.
func RetryWorkflow(ctx workflow.Context) error {
ao := workflow.ActivityOptions{
StartToCloseTimeout: 10 * time.Minute,
HeartbeatTimeout: 10 * time.Second,
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: 5 * time.Second,
BackoffCoefficient: 1.0,
MaximumInterval: 5 * time.Second,
// No MaximumAttempts — retries indefinitely until paused or cancelled.
},
}
ctx = workflow.WithActivityOptions(ctx, ao)
// Large batch ensures the activity never completes naturally; pause it to stop it.
err := workflow.ExecuteActivity(ctx, BatchProcessingActivity, 0, 10, 2*time.Second).Get(ctx, nil)
if err != nil {
workflow.GetLogger(ctx).Info("Workflow completed with error.", "Error", err)
return err
}
workflow.GetLogger(ctx).Info("Workflow completed.")
return nil
}
// BatchProcessingActivity processes tasks one at a time, sleeping to simulate real work.
// After each task it heartbeats the task index as progress. On retry the activity resumes
// from the last heartbeated index rather than starting over.
// It always fails after 3 tasks, creating a high failure rate that keeps the retry loop going.
func BatchProcessingActivity(ctx context.Context, firstTaskID, batchSize int, processDelay time.Duration) error {
logger := activity.GetLogger(ctx)
i := firstTaskID
if activity.HasHeartbeatDetails(ctx) {
// Resume from reported progress on retry.
var completedIdx int
if err := activity.GetHeartbeatDetails(ctx, &completedIdx); err == nil {
i = completedIdx + 1
logger.Info("Resuming from previous attempt", "ResumedAt", i)
}
}
taskProcessedInThisAttempt := 0
for ; i < firstTaskID+batchSize; i++ {
// Inject a 95% failure rate before doing any work on this task.
if rand.Intn(100) < 95 {
logger.Info("Simulating transient failure", "TaskID", i)
time.Sleep(5 * time.Second)
return temporal.NewApplicationError("transient error", "SomeType")
}
logger.Info("Processing task", "TaskID", i)
time.Sleep(processDelay)
activity.RecordHeartbeat(ctx, i)
taskProcessedInThisAttempt++
}
logger.Info("Activity succeeded.")
return nil
}