Skip to content

Commit dd61397

Browse files
authored
refactor: migrate ActivityApiBatchUnpauseClientTestSuite to parallelsuite (#9677)
Migrate `ActivityApiBatchUnpauseClientTestSuite` from testify suite + `FunctionalTestBase` to `parallelsuite`, following the same pattern as `ActivityAPIBatchResetClientTestSuite`.
1 parent c289998 commit dd61397

1 file changed

Lines changed: 42 additions & 39 deletions

File tree

tests/activity_api_batch_unpause_test.go

Lines changed: 42 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"github.com/google/uuid"
1212
"github.com/stretchr/testify/assert"
1313
"github.com/stretchr/testify/require"
14-
"github.com/stretchr/testify/suite"
1514
"github.com/temporalio/sqlparser"
1615
batchpb "go.temporal.io/api/batch/v1"
1716
commonpb "go.temporal.io/api/common/v1"
@@ -22,17 +21,17 @@ import (
2221
"go.temporal.io/sdk/temporal"
2322
"go.temporal.io/sdk/workflow"
2423
"go.temporal.io/server/common/searchattribute/sadefs"
24+
"go.temporal.io/server/common/testing/parallelsuite"
2525
"go.temporal.io/server/tests/testcore"
2626
"google.golang.org/grpc/codes"
2727
)
2828

2929
type ActivityApiBatchUnpauseClientTestSuite struct {
30-
testcore.FunctionalTestBase
30+
parallelsuite.Suite[*ActivityApiBatchUnpauseClientTestSuite]
3131
}
3232

3333
func TestActivityApiBatchUnpauseClientTestSuite(t *testing.T) {
34-
s := new(ActivityApiBatchUnpauseClientTestSuite)
35-
suite.Run(t, s)
34+
parallelsuite.Run(t, &ActivityApiBatchUnpauseClientTestSuite{})
3635
}
3736

3837
type internalTestWorkflow struct {
@@ -80,62 +79,63 @@ func (w *internalTestWorkflow) ActivityFunc() (string, error) {
8079
return "done!", nil
8180
}
8281

83-
func (s *ActivityApiBatchUnpauseClientTestSuite) createWorkflow(ctx context.Context, workflowFn WorkflowFunction) sdkclient.WorkflowRun {
82+
func (s *ActivityApiBatchUnpauseClientTestSuite) createWorkflow(env *testcore.TestEnv, workflowFn WorkflowFunction) sdkclient.WorkflowRun {
8483
workflowOptions := sdkclient.StartWorkflowOptions{
8584
ID: testcore.RandomizeStr("wf_id-" + s.T().Name()),
86-
TaskQueue: s.TaskQueue(),
85+
TaskQueue: env.WorkerTaskQueue(),
8786
}
88-
workflowRun, err := s.SdkClient().ExecuteWorkflow(ctx, workflowOptions, workflowFn)
87+
workflowRun, err := env.SdkClient().ExecuteWorkflow(env.Context(), workflowOptions, workflowFn)
8988
s.NoError(err)
9089
s.NotNil(workflowRun)
9190

9291
return workflowRun
9392
}
9493

9594
func (s *ActivityApiBatchUnpauseClientTestSuite) TestActivityBatchUnpause_Success() {
95+
env := testcore.NewEnv(s.T())
9696
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
9797
defer cancel()
9898

9999
internalWorkflow := newInternalWorkflow()
100100

101-
s.SdkWorker().RegisterWorkflow(internalWorkflow.WorkflowFunc)
102-
s.SdkWorker().RegisterActivity(internalWorkflow.ActivityFunc)
101+
env.SdkWorker().RegisterWorkflow(internalWorkflow.WorkflowFunc)
102+
env.SdkWorker().RegisterActivity(internalWorkflow.ActivityFunc)
103103

104-
workflowRun1 := s.createWorkflow(ctx, internalWorkflow.WorkflowFunc)
105-
workflowRun2 := s.createWorkflow(ctx, internalWorkflow.WorkflowFunc)
104+
workflowRun1 := s.createWorkflow(env, internalWorkflow.WorkflowFunc)
105+
workflowRun2 := s.createWorkflow(env, internalWorkflow.WorkflowFunc)
106106

107107
// wait for activity to start in both workflows
108108
s.EventuallyWithT(func(t *assert.CollectT) {
109-
description, err := s.SdkClient().DescribeWorkflowExecution(ctx, workflowRun1.GetID(), workflowRun1.GetRunID())
109+
description, err := env.SdkClient().DescribeWorkflowExecution(ctx, workflowRun1.GetID(), workflowRun1.GetRunID())
110110
require.NoError(t, err)
111111
require.Len(t, description.GetPendingActivities(), 1)
112112
require.Positive(t, internalWorkflow.startedActivityCount.Load())
113113

114-
description, err = s.SdkClient().DescribeWorkflowExecution(ctx, workflowRun2.GetID(), workflowRun2.GetRunID())
114+
description, err = env.SdkClient().DescribeWorkflowExecution(ctx, workflowRun2.GetID(), workflowRun2.GetRunID())
115115
require.NoError(t, err)
116116
require.Len(t, description.GetPendingActivities(), 1)
117117
require.Positive(t, internalWorkflow.startedActivityCount.Load())
118118
}, 5*time.Second, 100*time.Millisecond)
119119

120120
// pause activities in both workflows
121121
pauseRequest := &workflowservice.PauseActivityRequest{
122-
Namespace: s.Namespace().String(),
122+
Namespace: env.Namespace().String(),
123123
Execution: &commonpb.WorkflowExecution{},
124124
Activity: &workflowservice.PauseActivityRequest_Id{Id: "activity-id"},
125125
}
126126
pauseRequest.Execution.WorkflowId = workflowRun1.GetID()
127-
resp, err := s.FrontendClient().PauseActivity(ctx, pauseRequest)
127+
resp, err := env.FrontendClient().PauseActivity(ctx, pauseRequest)
128128
s.NoError(err)
129129
s.NotNil(resp)
130130

131131
pauseRequest.Execution.WorkflowId = workflowRun2.GetID()
132-
resp, err = s.FrontendClient().PauseActivity(ctx, pauseRequest)
132+
resp, err = env.FrontendClient().PauseActivity(ctx, pauseRequest)
133133
s.NoError(err)
134134
s.NotNil(resp)
135135

136136
// wait for activities to be paused
137137
s.EventuallyWithT(func(t *assert.CollectT) {
138-
description, err := s.SdkClient().DescribeWorkflowExecution(ctx, workflowRun1.GetID(), workflowRun1.GetRunID())
138+
description, err := env.SdkClient().DescribeWorkflowExecution(ctx, workflowRun1.GetID(), workflowRun1.GetRunID())
139139
require.NoError(t, err)
140140
require.Len(t, description.GetPendingActivities(), 1)
141141
require.True(t, description.PendingActivities[0].Paused)
@@ -151,8 +151,8 @@ func (s *ActivityApiBatchUnpauseClientTestSuite) TestActivityBatchUnpause_Succes
151151
query := fmt.Sprintf("(WorkflowType='%s' AND %s)", workflowTypeName, unpauseCause)
152152

153153
s.EventuallyWithT(func(t *assert.CollectT) {
154-
listResp, err = s.FrontendClient().ListWorkflowExecutions(ctx, &workflowservice.ListWorkflowExecutionsRequest{
155-
Namespace: s.Namespace().String(),
154+
listResp, err = env.FrontendClient().ListWorkflowExecutions(ctx, &workflowservice.ListWorkflowExecutionsRequest{
155+
Namespace: env.Namespace().String(),
156156
PageSize: 10,
157157
Query: query,
158158
})
@@ -162,8 +162,8 @@ func (s *ActivityApiBatchUnpauseClientTestSuite) TestActivityBatchUnpause_Succes
162162
}, 5*time.Second, 500*time.Millisecond)
163163

164164
// unpause the activities in both workflows with batch unpause
165-
_, err = s.SdkClient().WorkflowService().StartBatchOperation(context.Background(), &workflowservice.StartBatchOperationRequest{
166-
Namespace: s.Namespace().String(),
165+
_, err = env.SdkClient().WorkflowService().StartBatchOperation(context.Background(), &workflowservice.StartBatchOperationRequest{
166+
Namespace: env.Namespace().String(),
167167
Operation: &workflowservice.StartBatchOperationRequest_UnpauseActivitiesOperation{
168168
UnpauseActivitiesOperation: &batchpb.BatchOperationUnpauseActivities{
169169
Activity: &batchpb.BatchOperationUnpauseActivities_Type{Type: activityTypeName},
@@ -177,11 +177,11 @@ func (s *ActivityApiBatchUnpauseClientTestSuite) TestActivityBatchUnpause_Succes
177177

178178
// make sure activities are unpaused
179179
s.EventuallyWithT(func(t *assert.CollectT) {
180-
description, err := s.SdkClient().DescribeWorkflowExecution(ctx, workflowRun1.GetID(), workflowRun1.GetRunID())
180+
description, err := env.SdkClient().DescribeWorkflowExecution(ctx, workflowRun1.GetID(), workflowRun1.GetRunID())
181181
require.NoError(t, err)
182182
require.Len(t, description.PendingActivities, 1)
183183
require.False(t, description.PendingActivities[0].Paused)
184-
description, err = s.SdkClient().DescribeWorkflowExecution(ctx, workflowRun2.GetID(), workflowRun2.GetRunID())
184+
description, err = env.SdkClient().DescribeWorkflowExecution(ctx, workflowRun2.GetID(), workflowRun2.GetRunID())
185185
require.NoError(t, err)
186186
require.Len(t, description.PendingActivities, 1)
187187
require.False(t, description.PendingActivities[0].Paused)
@@ -199,9 +199,11 @@ func (s *ActivityApiBatchUnpauseClientTestSuite) TestActivityBatchUnpause_Succes
199199
}
200200

201201
func (s *ActivityApiBatchUnpauseClientTestSuite) TestActivityBatchUnpause_Failed() {
202+
env := testcore.NewEnv(s.T())
203+
202204
// neither activity type not "match all" is provided
203-
_, err := s.SdkClient().WorkflowService().StartBatchOperation(context.Background(), &workflowservice.StartBatchOperationRequest{
204-
Namespace: s.Namespace().String(),
205+
_, err := env.SdkClient().WorkflowService().StartBatchOperation(context.Background(), &workflowservice.StartBatchOperationRequest{
206+
Namespace: env.Namespace().String(),
205207
Operation: &workflowservice.StartBatchOperationRequest_UnpauseActivitiesOperation{
206208
UnpauseActivitiesOperation: &batchpb.BatchOperationUnpauseActivities{},
207209
},
@@ -214,8 +216,8 @@ func (s *ActivityApiBatchUnpauseClientTestSuite) TestActivityBatchUnpause_Failed
214216
s.ErrorAs(err, new(*serviceerror.InvalidArgument))
215217

216218
// neither activity type not "match all" is provided
217-
_, err = s.SdkClient().WorkflowService().StartBatchOperation(context.Background(), &workflowservice.StartBatchOperationRequest{
218-
Namespace: s.Namespace().String(),
219+
_, err = env.SdkClient().WorkflowService().StartBatchOperation(context.Background(), &workflowservice.StartBatchOperationRequest{
220+
Namespace: env.Namespace().String(),
219221
Operation: &workflowservice.StartBatchOperationRequest_UnpauseActivitiesOperation{
220222
UnpauseActivitiesOperation: &batchpb.BatchOperationUnpauseActivities{
221223
Activity: &batchpb.BatchOperationUnpauseActivities_Type{Type: ""},
@@ -235,6 +237,7 @@ func (s *ActivityApiBatchUnpauseClientTestSuite) TestActivityBatchUnpause_Failed
235237
// This is an end-to-end complement to the unit-level checkNamespace tests: it
236238
// exercises the full path from StartBatchOperation through the batcher worker.
237239
func (s *ActivityApiBatchUnpauseClientTestSuite) TestBatchTerminate_NamespaceIsolation() {
240+
env := testcore.NewEnv(s.T())
238241
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
239242
defer cancel()
240243

@@ -243,7 +246,7 @@ func (s *ActivityApiBatchUnpauseClientTestSuite) TestBatchTerminate_NamespaceIso
243246
sleepWorkflow := func(ctx workflow.Context) error {
244247
return workflow.Sleep(ctx, 24*time.Hour)
245248
}
246-
s.SdkWorker().RegisterWorkflowWithOptions(sleepWorkflow, workflow.RegisterOptions{Name: wfTypeName})
249+
env.SdkWorker().RegisterWorkflowWithOptions(sleepWorkflow, workflow.RegisterOptions{Name: wfTypeName})
247250

248251
// Start two workflows in the primary namespace (worker is registered and will execute them).
249252
startWf := func(client sdkclient.Client, taskQueue string) sdkclient.WorkflowRun {
@@ -254,26 +257,26 @@ func (s *ActivityApiBatchUnpauseClientTestSuite) TestBatchTerminate_NamespaceIso
254257
s.NoError(err)
255258
return run
256259
}
257-
primaryRun1 := startWf(s.SdkClient(), s.TaskQueue())
258-
primaryRun2 := startWf(s.SdkClient(), s.TaskQueue())
260+
primaryRun1 := startWf(env.SdkClient(), env.WorkerTaskQueue())
261+
primaryRun2 := startWf(env.SdkClient(), env.WorkerTaskQueue())
259262

260263
// Create a client for the external namespace and start two workflows there.
261264
// No worker polls this task queue in the external namespace, so these workflows
262265
// will remain in RUNNING state without executing.
263266
extClient, err := sdkclient.Dial(sdkclient.Options{
264-
HostPort: s.FrontendGRPCAddress(),
265-
Namespace: s.ExternalNamespace().String(),
267+
HostPort: env.FrontendGRPCAddress(),
268+
Namespace: env.ExternalNamespace().String(),
266269
})
267270
s.NoError(err)
268271
defer extClient.Close()
269-
extRun1 := startWf(extClient, s.TaskQueue())
270-
extRun2 := startWf(extClient, s.TaskQueue())
272+
extRun1 := startWf(extClient, env.WorkerTaskQueue())
273+
extRun2 := startWf(extClient, env.WorkerTaskQueue())
271274

272275
// Wait for both primary-namespace workflows to be indexed in visibility before
273276
// submitting the batch, which uses a visibility query to find its targets.
274277
s.EventuallyWithT(func(t *assert.CollectT) {
275-
resp, err := s.FrontendClient().ListWorkflowExecutions(ctx, &workflowservice.ListWorkflowExecutionsRequest{
276-
Namespace: s.Namespace().String(),
278+
resp, err := env.FrontendClient().ListWorkflowExecutions(ctx, &workflowservice.ListWorkflowExecutionsRequest{
279+
Namespace: env.Namespace().String(),
277280
Query: fmt.Sprintf("WorkflowType='%s'", wfTypeName),
278281
PageSize: 10,
279282
})
@@ -282,8 +285,8 @@ func (s *ActivityApiBatchUnpauseClientTestSuite) TestBatchTerminate_NamespaceIso
282285
}, 10*time.Second, 500*time.Millisecond)
283286

284287
// Batch-terminate all workflows of this type in the primary namespace only.
285-
_, err = s.SdkClient().WorkflowService().StartBatchOperation(ctx, &workflowservice.StartBatchOperationRequest{
286-
Namespace: s.Namespace().String(),
288+
_, err = env.SdkClient().WorkflowService().StartBatchOperation(ctx, &workflowservice.StartBatchOperationRequest{
289+
Namespace: env.Namespace().String(),
287290
VisibilityQuery: fmt.Sprintf("WorkflowType='%s'", wfTypeName),
288291
JobId: uuid.NewString(),
289292
Reason: "namespace-isolation-test",
@@ -296,7 +299,7 @@ func (s *ActivityApiBatchUnpauseClientTestSuite) TestBatchTerminate_NamespaceIso
296299
// Primary-namespace workflows must reach TERMINATED status.
297300
s.EventuallyWithT(func(t *assert.CollectT) {
298301
for _, run := range []sdkclient.WorkflowRun{primaryRun1, primaryRun2} {
299-
desc, err := s.SdkClient().DescribeWorkflowExecution(ctx, run.GetID(), run.GetRunID())
302+
desc, err := env.SdkClient().DescribeWorkflowExecution(ctx, run.GetID(), run.GetRunID())
300303
require.NoError(t, err)
301304
require.Equal(t, enumspb.WORKFLOW_EXECUTION_STATUS_TERMINATED, desc.WorkflowExecutionInfo.Status)
302305
}

0 commit comments

Comments
 (0)