diff --git a/internal/internal_task_handlers_test.go b/internal/internal_task_handlers_test.go index 1c86a5208..697c39237 100644 --- a/internal/internal_task_handlers_test.go +++ b/internal/internal_task_handlers_test.go @@ -6,6 +6,7 @@ import ( "fmt" "strconv" "sync" + "sync/atomic" "testing" "time" @@ -1771,7 +1772,7 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_Message_Admitted_Paged() { func (t *TaskHandlersTestSuite) TestLocalActivityRetry_Workflow() { backoffInterval := 10 * time.Millisecond workflowComplete := false - laFailures := 0 + var laFailures atomic.Uint64 retryLocalActivityWorkflowFunc := func(ctx Context, input []byte) error { ao := LocalActivityOptions{ @@ -1786,11 +1787,11 @@ func (t *TaskHandlersTestSuite) TestLocalActivityRetry_Workflow() { ctx = WithLocalActivityOptions(ctx, ao) err := ExecuteLocalActivity(ctx, func() error { - if laFailures > 2 { + if laFailures.Load() > 2 { return nil } - laFailures++ - return errors.New("fail number " + strconv.Itoa(laFailures)) + laFailures.Add(1) + return errors.New("fail number " + strconv.Itoa(int(laFailures.Load()))) }).Get(ctx, nil) workflowComplete = true return err @@ -1834,7 +1835,7 @@ func (t *TaskHandlersTestSuite) TestLocalActivityRetry_Workflow() { task, _ := laTaskPoller.PollTask() _ = laTaskPoller.ProcessTask(task) // Quit after we've polled enough times - if laFailures == 4 { + if laFailures.Load() == 4 { return } } diff --git a/test/integration_test.go b/test/integration_test.go index a143127f6..943fad354 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -612,12 +612,8 @@ func (ts *IntegrationTestSuite) TestActivityPause() { ts.Len(desc.GetPendingActivities(), 1) ts.Equal(desc.GetPendingActivities()[0].GetActivityType().GetName(), "ActivityToBePaused") ts.Equal(desc.GetPendingActivities()[0].GetAttempt(), int32(1)) - if os.Getenv("DISABLE_SERVER_1_27_TESTS") == "1" { - ts.Nil(desc.GetPendingActivities()[0].GetLastFailure()) - } else { - ts.NotNil(desc.GetPendingActivities()[0].GetLastFailure()) - ts.Equal(desc.GetPendingActivities()[0].GetLastFailure().GetMessage(), "activity paused") - } + ts.NotNil(desc.GetPendingActivities()[0].GetLastFailure()) + ts.Equal(desc.GetPendingActivities()[0].GetLastFailure().GetMessage(), "activity paused") ts.True(desc.GetPendingActivities()[0].GetPaused()) } @@ -6594,151 +6590,6 @@ func (ts *IntegrationTestSuite) TestScheduleUpdateWorkflowActionMemo() { } } -func (ts *IntegrationTestSuite) TestVersioningBehaviorInRespondWorkflowTaskCompletedRequest() { - versioningBehaviorAll := make([]enumspb.VersioningBehavior, 0) - ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout) - defer cancel() - - seriesName := "deploy-test-" + uuid.NewString() - res, err := ts.client.DeploymentClient().SetCurrent(ctx, client.DeploymentSetCurrentOptions{ - Deployment: client.Deployment{ - BuildID: "1.0", - SeriesName: seriesName, - }, - }) - ts.NoError(err) - ts.True(res.Current.IsCurrent) - ts.Equal(res.Current.Deployment.BuildID, "1.0") - ts.Equal(res.Current.Deployment.SeriesName, seriesName) - ts.Empty(res.Previous.Deployment) - - c, err := client.Dial(client.Options{ - HostPort: ts.config.ServiceAddr, - Namespace: ts.config.Namespace, - ConnectionOptions: client.ConnectionOptions{ - TLS: ts.config.TLS, - DialOptions: []grpc.DialOption{ - grpc.WithUnaryInterceptor(func( - ctx context.Context, - method string, - req interface{}, - reply interface{}, - cc *grpc.ClientConn, - invoker grpc.UnaryInvoker, - opts ...grpc.CallOption, - ) error { - if method == "/temporal.api.workflowservice.v1.WorkflowService/RespondWorkflowTaskCompleted" { - asReq := req.(*workflowservice.RespondWorkflowTaskCompletedRequest) - versioningBehaviorAll = append(versioningBehaviorAll, asReq.VersioningBehavior) - } - return invoker(ctx, method, req, reply, cc, opts...) - }), - }, - }, - }) - ts.NoError(err) - defer c.Close() - - ts.worker.Stop() - ts.workerStopped = true - w := worker.New(c, ts.taskQueueName, worker.Options{ - DeploymentOptions: worker.DeploymentOptions{ - UseVersioning: true, - Version: worker.WorkerDeploymentVersion{ - DeploymentName: seriesName, - BuildId: "1.0", - }, - DefaultVersioningBehavior: workflow.VersioningBehaviorAutoUpgrade, - }, - }) - ts.registerWorkflowsAndActivities(w) - ts.Nil(w.Start()) - defer w.Stop() - - wfOpts := ts.startWorkflowOptions("test-default-versioning-behavior") - ts.NoError(ts.executeWorkflowWithOption(wfOpts, ts.workflows.Basic, nil)) - - ts.Equal(enumspb.VERSIONING_BEHAVIOR_AUTO_UPGRADE, versioningBehaviorAll[0]) - for i := 1; i < len(versioningBehaviorAll); i++ { - ts.Equal(versioningBehaviorAll[i], enumspb.VERSIONING_BEHAVIOR_AUTO_UPGRADE) - } -} - -func (ts *IntegrationTestSuite) TestVersioningBehaviorPerWorkflowType() { - versioningBehaviorAll := make([]enumspb.VersioningBehavior, 0) - ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout) - defer cancel() - - seriesName := "deploy-test-" + uuid.NewString() - - res, err := ts.client.DeploymentClient().SetCurrent(ctx, client.DeploymentSetCurrentOptions{ - Deployment: client.Deployment{ - BuildID: "1.0", - SeriesName: seriesName, - }, - }) - ts.NoError(err) - ts.True(res.Current.IsCurrent) - ts.Equal(res.Current.Deployment.BuildID, "1.0") - ts.Equal(res.Current.Deployment.SeriesName, seriesName) - ts.Empty(res.Previous.Deployment) - - c, err := client.Dial(client.Options{ - HostPort: ts.config.ServiceAddr, - Namespace: ts.config.Namespace, - ConnectionOptions: client.ConnectionOptions{ - TLS: ts.config.TLS, - DialOptions: []grpc.DialOption{ - grpc.WithUnaryInterceptor(func( - ctx context.Context, - method string, - req interface{}, - reply interface{}, - cc *grpc.ClientConn, - invoker grpc.UnaryInvoker, - opts ...grpc.CallOption, - ) error { - if method == "/temporal.api.workflowservice.v1.WorkflowService/RespondWorkflowTaskCompleted" { - asReq := req.(*workflowservice.RespondWorkflowTaskCompletedRequest) - versioningBehaviorAll = append(versioningBehaviorAll, asReq.VersioningBehavior) - } - return invoker(ctx, method, req, reply, cc, opts...) - }), - }, - }, - }) - ts.NoError(err) - defer c.Close() - - ts.worker.Stop() - ts.workerStopped = true - w := worker.New(c, ts.taskQueueName, worker.Options{ - DeploymentOptions: worker.DeploymentOptions{ - UseVersioning: true, - Version: worker.WorkerDeploymentVersion{ - DeploymentName: seriesName, - BuildId: "1.0", - }, - DefaultVersioningBehavior: workflow.VersioningBehaviorAutoUpgrade, - }, - }) - - w.RegisterWorkflowWithOptions(ts.workflows.Basic, workflow.RegisterOptions{ - VersioningBehavior: workflow.VersioningBehaviorPinned, - }) - ts.activities.register(w) - - ts.Nil(w.Start()) - defer w.Stop() - wfOpts := ts.startWorkflowOptions("test-default-versioning-behavior-per-type") - ts.NoError(ts.executeWorkflowWithOption(wfOpts, ts.workflows.Basic, nil)) - - ts.Equal(enumspb.VERSIONING_BEHAVIOR_PINNED, versioningBehaviorAll[0]) - for i := 1; i < len(versioningBehaviorAll); i++ { - ts.Equal(versioningBehaviorAll[i], enumspb.VERSIONING_BEHAVIOR_PINNED) - } -} - func (ts *IntegrationTestSuite) TestNoVersioningBehaviorPanics() { seriesName := "deploy-test-" + uuid.NewString()