Skip to content

Commit db89ef7

Browse files
committed
Fix ThroughputStressWorkflow
1 parent cbf9369 commit db89ef7

2 files changed

Lines changed: 21 additions & 17 deletions

File tree

scenarios/throughput_stress.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ func (t *tpsExecutor) Run(ctx context.Context, info loadgen.ScenarioInfo) error
111111
return fmt.Errorf("failed to parse %s: %w", VisibilityVerificationTimeout, err)
112112
}
113113
timeout := time.Duration(1*internalIterations) * internalIterTimeout
114+
fmt.Println("Timeout for each workflow run: ", timeout)
114115

115116
// Initialize the scenario run.
116117
if t.isResuming {

workers/go/throughputstress/workflow.go

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -40,15 +40,15 @@ func ThroughputStressWorkflow(ctx workflow.Context, params *throughputstress.Wor
4040
return nil, err
4141
}
4242
err = workflow.SetUpdateHandler(ctx, UpdateActivity, func(ctx workflow.Context) error {
43-
actCtx := workflow.WithActivityOptions(ctx, defaultActivityOpts())
43+
actCtx := workflow.WithActivityOptions(ctx, defaultActivityOpts(ctx))
4444
return workflow.ExecuteActivity(
4545
actCtx, activityStub.Payload, MakePayloadInput(0, 256)).Get(ctx, nil)
4646
})
4747
if err != nil {
4848
return nil, err
4949
}
5050
err = workflow.SetUpdateHandler(ctx, UpdateLocalActivity, func(ctx workflow.Context) error {
51-
localActCtx := workflow.WithLocalActivityOptions(ctx, defaultLocalActivityOpts())
51+
localActCtx := workflow.WithLocalActivityOptions(ctx, defaultLocalActivityOpts(ctx))
5252
return workflow.ExecuteLocalActivity(
5353
localActCtx, activityStub.Payload, MakePayloadInput(0, 256)).Get(ctx, nil)
5454
})
@@ -57,7 +57,7 @@ func ThroughputStressWorkflow(ctx workflow.Context, params *throughputstress.Wor
5757
}
5858
for i := params.InitialIteration; i < params.Iterations; i++ {
5959
// Repeat the steps as defined by the ancient ritual
60-
actCtx := workflow.WithActivityOptions(ctx, defaultActivityOpts())
60+
actCtx := workflow.WithActivityOptions(ctx, defaultActivityOpts(ctx))
6161
err = workflow.ExecuteActivity(
6262
actCtx, activityStub.Payload, MakePayloadInput(256, 256)).Get(ctx, nil)
6363
if err != nil {
@@ -74,7 +74,7 @@ func ThroughputStressWorkflow(ctx workflow.Context, params *throughputstress.Wor
7474
return nil, err
7575
}
7676

77-
localActCtx := workflow.WithLocalActivityOptions(ctx, defaultLocalActivityOpts())
77+
localActCtx := workflow.WithLocalActivityOptions(ctx, defaultLocalActivityOpts(ctx))
7878
err = workflow.ExecuteLocalActivity(
7979
localActCtx, activityStub.Payload, MakePayloadInput(0, 256)).Get(ctx, nil)
8080
if err != nil {
@@ -93,7 +93,8 @@ func ThroughputStressWorkflow(ctx workflow.Context, params *throughputstress.Wor
9393
// run to the child.
9494
attrs := workflow.GetInfo(ctx).SearchAttributes.IndexedFields
9595
childCtx := workflow.WithChildOptions(ctx, workflow.ChildWorkflowOptions{
96-
WorkflowID: fmt.Sprintf("%s/child-%d", workflow.GetInfo(ctx).WorkflowExecution.ID, params.ChildrenSpawned),
96+
WorkflowExecutionTimeout: workflow.GetInfo(ctx).WorkflowExecutionTimeout,
97+
WorkflowID: fmt.Sprintf("%s/child-%d", workflow.GetInfo(ctx).WorkflowExecution.ID, params.ChildrenSpawned),
9798
SearchAttributes: map[string]interface{}{
9899
scenarios.ThroughputStressScenarioIdSearchAttribute: attrs[scenarios.ThroughputStressScenarioIdSearchAttribute],
99100
},
@@ -103,30 +104,30 @@ func ThroughputStressWorkflow(ctx workflow.Context, params *throughputstress.Wor
103104
return child.Get(ctx, nil)
104105
},
105106
func(ctx workflow.Context) error {
106-
actCtx := workflow.WithActivityOptions(ctx, defaultActivityOpts())
107+
actCtx := workflow.WithActivityOptions(ctx, defaultActivityOpts(ctx))
107108
return workflow.ExecuteActivity(
108109
actCtx, activityStub.Payload, MakePayloadInput(256, 256)).Get(ctx, nil)
109110
},
110111
func(ctx workflow.Context) error {
111-
actCtx := workflow.WithActivityOptions(ctx, defaultActivityOpts())
112+
actCtx := workflow.WithActivityOptions(ctx, defaultActivityOpts(ctx))
112113
return workflow.ExecuteActivity(
113114
actCtx, activityStub.Payload, MakePayloadInput(256, 256)).Get(ctx, nil)
114115
},
115116
func(ctx workflow.Context) error {
116-
localActCtx := workflow.WithLocalActivityOptions(ctx, defaultLocalActivityOpts())
117+
localActCtx := workflow.WithLocalActivityOptions(ctx, defaultLocalActivityOpts(ctx))
117118
return workflow.ExecuteLocalActivity(
118119
localActCtx, activityStub.Payload, MakePayloadInput(0, 256)).Get(ctx, nil)
119120
},
120121
func(ctx workflow.Context) error {
121-
localActCtx := workflow.WithLocalActivityOptions(ctx, defaultLocalActivityOpts())
122+
localActCtx := workflow.WithLocalActivityOptions(ctx, defaultLocalActivityOpts(ctx))
122123
return workflow.ExecuteLocalActivity(
123124
localActCtx, activityStub.Payload, MakePayloadInput(0, 256)).Get(ctx, nil)
124125
},
125126
func(ctx workflow.Context) error {
126127
// This self-signal activity didn't exist in the original bench-go workflow, but
127128
// it was signaled semi-routinely externally. This is slightly more obvious and
128129
// introduces receiving signals in the workflow.
129-
localActCtx := workflow.WithLocalActivityOptions(ctx, defaultLocalActivityOpts())
130+
localActCtx := workflow.WithLocalActivityOptions(ctx, defaultLocalActivityOpts(ctx))
130131
return workflow.ExecuteLocalActivity(localActCtx, activityStub.SelfSignal, ASignal).Get(ctx, nil)
131132
},
132133
func(ctx workflow.Context) error {
@@ -136,26 +137,26 @@ func ThroughputStressWorkflow(ctx workflow.Context, params *throughputstress.Wor
136137
if actInput == nil {
137138
return nil
138139
}
139-
opts := defaultActivityOpts()
140+
opts := defaultActivityOpts(ctx)
140141
opts.Priority = temporal.Priority{
141142
PriorityKey: int(actInput.Priority),
142143
}
143144
actCtx := workflow.WithActivityOptions(ctx, opts)
144145
return workflow.ExecuteActivity(actCtx, activityStub.Sleep, actInput).Get(ctx, nil)
145146
},
146147
func(ctx workflow.Context) error {
147-
actCtx := workflow.WithActivityOptions(ctx, defaultActivityOpts())
148+
actCtx := workflow.WithActivityOptions(ctx, defaultActivityOpts(ctx))
148149
if params.SkipSleep {
149150
return nil
150151
}
151152
return workflow.ExecuteActivity(actCtx, activityStub.SelfUpdate, UpdateSleep).Get(ctx, nil)
152153
},
153154
func(ctx workflow.Context) error {
154-
actCtx := workflow.WithActivityOptions(ctx, defaultActivityOpts())
155+
actCtx := workflow.WithActivityOptions(ctx, defaultActivityOpts(ctx))
155156
return workflow.ExecuteActivity(actCtx, activityStub.SelfUpdate, UpdateActivity).Get(ctx, nil)
156157
},
157158
func(ctx workflow.Context) error {
158-
actCtx := workflow.WithActivityOptions(ctx, defaultActivityOpts())
159+
actCtx := workflow.WithActivityOptions(ctx, defaultActivityOpts(ctx))
159160
return workflow.ExecuteActivity(actCtx, activityStub.SelfUpdate, UpdateLocalActivity).Get(ctx, nil)
160161
},
161162
func(ctx workflow.Context) error {
@@ -249,7 +250,7 @@ func runEchoNexusOperation(ctx workflow.Context, params *throughputstress.Workfl
249250

250251
func ThroughputStressChild(ctx workflow.Context) error {
251252
for i := 0; i < 3; i++ {
252-
actCtx := workflow.WithActivityOptions(ctx, defaultActivityOpts())
253+
actCtx := workflow.WithActivityOptions(ctx, defaultActivityOpts(ctx))
253254
err := workflow.ExecuteActivity(
254255
actCtx, activityStub.Payload, MakePayloadInput(256, 256)).Get(ctx, nil)
255256
if err != nil {
@@ -259,16 +260,18 @@ func ThroughputStressChild(ctx workflow.Context) error {
259260
return nil
260261
}
261262

262-
func defaultActivityOpts() workflow.ActivityOptions {
263+
func defaultActivityOpts(ctx workflow.Context) workflow.ActivityOptions {
263264
return workflow.ActivityOptions{
265+
StartToCloseTimeout: workflow.GetInfo(ctx).WorkflowExecutionTimeout,
264266
RetryPolicy: &temporal.RetryPolicy{
265267
MaximumAttempts: 5,
266268
},
267269
}
268270
}
269271

270-
func defaultLocalActivityOpts() workflow.LocalActivityOptions {
272+
func defaultLocalActivityOpts(ctx workflow.Context) workflow.LocalActivityOptions {
271273
return workflow.LocalActivityOptions{
274+
StartToCloseTimeout: workflow.GetInfo(ctx).WorkflowExecutionTimeout,
272275
RetryPolicy: &temporal.RetryPolicy{
273276
InitialInterval: 10 * time.Millisecond,
274277
MaximumAttempts: 10,

0 commit comments

Comments
 (0)