Skip to content

Commit f7174c2

Browse files
Fix TestActivityPause for Temporal Server v1.28.0 (temporalio#1989)
Fix TestActivityPause for Temporal Server v1.28.0
1 parent a1a14b9 commit f7174c2

2 files changed

Lines changed: 8 additions & 156 deletions

File tree

internal/internal_task_handlers_test.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"strconv"
88
"sync"
9+
"sync/atomic"
910
"testing"
1011
"time"
1112

@@ -1771,7 +1772,7 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_Message_Admitted_Paged() {
17711772
func (t *TaskHandlersTestSuite) TestLocalActivityRetry_Workflow() {
17721773
backoffInterval := 10 * time.Millisecond
17731774
workflowComplete := false
1774-
laFailures := 0
1775+
var laFailures atomic.Uint64
17751776

17761777
retryLocalActivityWorkflowFunc := func(ctx Context, input []byte) error {
17771778
ao := LocalActivityOptions{
@@ -1786,11 +1787,11 @@ func (t *TaskHandlersTestSuite) TestLocalActivityRetry_Workflow() {
17861787
ctx = WithLocalActivityOptions(ctx, ao)
17871788

17881789
err := ExecuteLocalActivity(ctx, func() error {
1789-
if laFailures > 2 {
1790+
if laFailures.Load() > 2 {
17901791
return nil
17911792
}
1792-
laFailures++
1793-
return errors.New("fail number " + strconv.Itoa(laFailures))
1793+
laFailures.Add(1)
1794+
return errors.New("fail number " + strconv.Itoa(int(laFailures.Load())))
17941795
}).Get(ctx, nil)
17951796
workflowComplete = true
17961797
return err
@@ -1834,7 +1835,7 @@ func (t *TaskHandlersTestSuite) TestLocalActivityRetry_Workflow() {
18341835
task, _ := laTaskPoller.PollTask()
18351836
_ = laTaskPoller.ProcessTask(task)
18361837
// Quit after we've polled enough times
1837-
if laFailures == 4 {
1838+
if laFailures.Load() == 4 {
18381839
return
18391840
}
18401841
}

test/integration_test.go

Lines changed: 2 additions & 151 deletions
Original file line numberDiff line numberDiff line change
@@ -612,12 +612,8 @@ func (ts *IntegrationTestSuite) TestActivityPause() {
612612
ts.Len(desc.GetPendingActivities(), 1)
613613
ts.Equal(desc.GetPendingActivities()[0].GetActivityType().GetName(), "ActivityToBePaused")
614614
ts.Equal(desc.GetPendingActivities()[0].GetAttempt(), int32(1))
615-
if os.Getenv("DISABLE_SERVER_1_27_TESTS") == "1" {
616-
ts.Nil(desc.GetPendingActivities()[0].GetLastFailure())
617-
} else {
618-
ts.NotNil(desc.GetPendingActivities()[0].GetLastFailure())
619-
ts.Equal(desc.GetPendingActivities()[0].GetLastFailure().GetMessage(), "activity paused")
620-
}
615+
ts.NotNil(desc.GetPendingActivities()[0].GetLastFailure())
616+
ts.Equal(desc.GetPendingActivities()[0].GetLastFailure().GetMessage(), "activity paused")
621617
ts.True(desc.GetPendingActivities()[0].GetPaused())
622618
}
623619

@@ -6594,151 +6590,6 @@ func (ts *IntegrationTestSuite) TestScheduleUpdateWorkflowActionMemo() {
65946590
}
65956591
}
65966592

6597-
func (ts *IntegrationTestSuite) TestVersioningBehaviorInRespondWorkflowTaskCompletedRequest() {
6598-
versioningBehaviorAll := make([]enumspb.VersioningBehavior, 0)
6599-
ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
6600-
defer cancel()
6601-
6602-
seriesName := "deploy-test-" + uuid.NewString()
6603-
res, err := ts.client.DeploymentClient().SetCurrent(ctx, client.DeploymentSetCurrentOptions{
6604-
Deployment: client.Deployment{
6605-
BuildID: "1.0",
6606-
SeriesName: seriesName,
6607-
},
6608-
})
6609-
ts.NoError(err)
6610-
ts.True(res.Current.IsCurrent)
6611-
ts.Equal(res.Current.Deployment.BuildID, "1.0")
6612-
ts.Equal(res.Current.Deployment.SeriesName, seriesName)
6613-
ts.Empty(res.Previous.Deployment)
6614-
6615-
c, err := client.Dial(client.Options{
6616-
HostPort: ts.config.ServiceAddr,
6617-
Namespace: ts.config.Namespace,
6618-
ConnectionOptions: client.ConnectionOptions{
6619-
TLS: ts.config.TLS,
6620-
DialOptions: []grpc.DialOption{
6621-
grpc.WithUnaryInterceptor(func(
6622-
ctx context.Context,
6623-
method string,
6624-
req interface{},
6625-
reply interface{},
6626-
cc *grpc.ClientConn,
6627-
invoker grpc.UnaryInvoker,
6628-
opts ...grpc.CallOption,
6629-
) error {
6630-
if method == "/temporal.api.workflowservice.v1.WorkflowService/RespondWorkflowTaskCompleted" {
6631-
asReq := req.(*workflowservice.RespondWorkflowTaskCompletedRequest)
6632-
versioningBehaviorAll = append(versioningBehaviorAll, asReq.VersioningBehavior)
6633-
}
6634-
return invoker(ctx, method, req, reply, cc, opts...)
6635-
}),
6636-
},
6637-
},
6638-
})
6639-
ts.NoError(err)
6640-
defer c.Close()
6641-
6642-
ts.worker.Stop()
6643-
ts.workerStopped = true
6644-
w := worker.New(c, ts.taskQueueName, worker.Options{
6645-
DeploymentOptions: worker.DeploymentOptions{
6646-
UseVersioning: true,
6647-
Version: worker.WorkerDeploymentVersion{
6648-
DeploymentName: seriesName,
6649-
BuildId: "1.0",
6650-
},
6651-
DefaultVersioningBehavior: workflow.VersioningBehaviorAutoUpgrade,
6652-
},
6653-
})
6654-
ts.registerWorkflowsAndActivities(w)
6655-
ts.Nil(w.Start())
6656-
defer w.Stop()
6657-
6658-
wfOpts := ts.startWorkflowOptions("test-default-versioning-behavior")
6659-
ts.NoError(ts.executeWorkflowWithOption(wfOpts, ts.workflows.Basic, nil))
6660-
6661-
ts.Equal(enumspb.VERSIONING_BEHAVIOR_AUTO_UPGRADE, versioningBehaviorAll[0])
6662-
for i := 1; i < len(versioningBehaviorAll); i++ {
6663-
ts.Equal(versioningBehaviorAll[i], enumspb.VERSIONING_BEHAVIOR_AUTO_UPGRADE)
6664-
}
6665-
}
6666-
6667-
func (ts *IntegrationTestSuite) TestVersioningBehaviorPerWorkflowType() {
6668-
versioningBehaviorAll := make([]enumspb.VersioningBehavior, 0)
6669-
ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
6670-
defer cancel()
6671-
6672-
seriesName := "deploy-test-" + uuid.NewString()
6673-
6674-
res, err := ts.client.DeploymentClient().SetCurrent(ctx, client.DeploymentSetCurrentOptions{
6675-
Deployment: client.Deployment{
6676-
BuildID: "1.0",
6677-
SeriesName: seriesName,
6678-
},
6679-
})
6680-
ts.NoError(err)
6681-
ts.True(res.Current.IsCurrent)
6682-
ts.Equal(res.Current.Deployment.BuildID, "1.0")
6683-
ts.Equal(res.Current.Deployment.SeriesName, seriesName)
6684-
ts.Empty(res.Previous.Deployment)
6685-
6686-
c, err := client.Dial(client.Options{
6687-
HostPort: ts.config.ServiceAddr,
6688-
Namespace: ts.config.Namespace,
6689-
ConnectionOptions: client.ConnectionOptions{
6690-
TLS: ts.config.TLS,
6691-
DialOptions: []grpc.DialOption{
6692-
grpc.WithUnaryInterceptor(func(
6693-
ctx context.Context,
6694-
method string,
6695-
req interface{},
6696-
reply interface{},
6697-
cc *grpc.ClientConn,
6698-
invoker grpc.UnaryInvoker,
6699-
opts ...grpc.CallOption,
6700-
) error {
6701-
if method == "/temporal.api.workflowservice.v1.WorkflowService/RespondWorkflowTaskCompleted" {
6702-
asReq := req.(*workflowservice.RespondWorkflowTaskCompletedRequest)
6703-
versioningBehaviorAll = append(versioningBehaviorAll, asReq.VersioningBehavior)
6704-
}
6705-
return invoker(ctx, method, req, reply, cc, opts...)
6706-
}),
6707-
},
6708-
},
6709-
})
6710-
ts.NoError(err)
6711-
defer c.Close()
6712-
6713-
ts.worker.Stop()
6714-
ts.workerStopped = true
6715-
w := worker.New(c, ts.taskQueueName, worker.Options{
6716-
DeploymentOptions: worker.DeploymentOptions{
6717-
UseVersioning: true,
6718-
Version: worker.WorkerDeploymentVersion{
6719-
DeploymentName: seriesName,
6720-
BuildId: "1.0",
6721-
},
6722-
DefaultVersioningBehavior: workflow.VersioningBehaviorAutoUpgrade,
6723-
},
6724-
})
6725-
6726-
w.RegisterWorkflowWithOptions(ts.workflows.Basic, workflow.RegisterOptions{
6727-
VersioningBehavior: workflow.VersioningBehaviorPinned,
6728-
})
6729-
ts.activities.register(w)
6730-
6731-
ts.Nil(w.Start())
6732-
defer w.Stop()
6733-
wfOpts := ts.startWorkflowOptions("test-default-versioning-behavior-per-type")
6734-
ts.NoError(ts.executeWorkflowWithOption(wfOpts, ts.workflows.Basic, nil))
6735-
6736-
ts.Equal(enumspb.VERSIONING_BEHAVIOR_PINNED, versioningBehaviorAll[0])
6737-
for i := 1; i < len(versioningBehaviorAll); i++ {
6738-
ts.Equal(versioningBehaviorAll[i], enumspb.VERSIONING_BEHAVIOR_PINNED)
6739-
}
6740-
}
6741-
67426593
func (ts *IntegrationTestSuite) TestNoVersioningBehaviorPanics() {
67436594
seriesName := "deploy-test-" + uuid.NewString()
67446595

0 commit comments

Comments
 (0)