Skip to content

Commit 08c0bee

Browse files
authored
test concurrent setCurrent/setRamping requests (temporalio#7512)
## What changed? <!-- Describe what has changed in this PR --> - Added some more tests - [Description: ](https://temporalio.atlassian.net/jira/software/c/projects/OSS/boards/31?assignee=712020:e7840624-ab5e-4cba-b6b6-189c3d835eea&assignee=712020:479e6c4f-7c7d-4472-afdd-5d4ce87fb15a&selectedIssue=OSS-3901) ## Why? <!-- Tell your future self why have you made these changes --> - verifying if the versioning feature can handle concurrent requests - desired behavior: back-to-back setCurrent/ramping requests with different inputs should not throw weird errors, only throttling or timeout, or successful result. ## How did you test it? <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> - Ran this suite locally and CI ## Potential risks <!-- Assuming the worst case, what can be broken when deploying this change to production? --> ## Documentation <!-- Have you made sure this change doesn't falsify anything currently stated in `docs/`? If significant new behavior is added, have you described that in `docs/`? --> ## Is hotfix candidate? <!-- Is this PR a hotfix candidate or does it require a notification to be sent to the broader community? (Yes/No) --> - No
1 parent 45e53c2 commit 08c0bee

File tree

1 file changed

+200
-31
lines changed

1 file changed

+200
-31
lines changed

tests/worker_deployment_test.go

Lines changed: 200 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -545,37 +545,6 @@ func (s *WorkerDeploymentSuite) TestConflictToken_SetCurrent_SetRamping_Wrong()
545545
s.NotNil(err)
546546
}
547547

548-
func (s *WorkerDeploymentSuite) TestSetCurrent_NoDeploymentVersion() {
549-
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
550-
defer cancel()
551-
tv := testvars.New(s)
552-
553-
// Setting version as current version should error since there is no created deployment version
554-
_, err := s.FrontendClient().SetWorkerDeploymentCurrentVersion(ctx, &workflowservice.SetWorkerDeploymentCurrentVersionRequest{
555-
Namespace: s.Namespace().String(),
556-
DeploymentName: tv.DeploymentVersion().GetDeploymentName(),
557-
Version: tv.DeploymentVersionString(),
558-
IgnoreMissingTaskQueues: true,
559-
})
560-
s.Error(err)
561-
s.Contains(err.Error(), "workflow not found for ID")
562-
}
563-
func (s *WorkerDeploymentSuite) TestSetCurrentVersion_Idempotent() {
564-
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
565-
defer cancel()
566-
tv := testvars.New(s)
567-
568-
firstVersion := tv.WithBuildIDNumber(1)
569-
570-
s.startVersionWorkflow(ctx, firstVersion)
571-
572-
// Set first version as current version
573-
s.setCurrentVersion(ctx, firstVersion, worker_versioning.UnversionedVersionId, true, "")
574-
575-
// Set first version as current version again
576-
s.setCurrentVersion(ctx, firstVersion, firstVersion.DeploymentVersionString(), true, "")
577-
}
578-
579548
// Testing ListWorkerDeployments
580549

581550
func (s *WorkerDeploymentSuite) TestListWorkerDeployments_OneVersion_OneDeployment() {
@@ -1123,6 +1092,206 @@ func (s *WorkerDeploymentSuite) TestSetCurrentVersion_Unversioned_PromoteUnversi
11231092
s.verifyTaskQueueVersioningInfo(ctx, currentVars.TaskQueue(), worker_versioning.UnversionedVersionId, "", 0)
11241093
}
11251094

1095+
func (s *WorkerDeploymentSuite) TestSetCurrentVersion_Concurrent_DifferentVersions_NoUnexpectedErrors() {
1096+
s.OverrideDynamicConfig(dynamicconfig.WorkflowExecutionMaxInFlightUpdates, 10) // this is the default
1097+
1098+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
1099+
defer cancel()
1100+
1101+
tv := testvars.New(s)
1102+
errChan := make(chan error)
1103+
1104+
versions := 10
1105+
for i := 0; i < versions; i++ {
1106+
s.startVersionWorkflow(ctx, tv.WithBuildIDNumber(i))
1107+
}
1108+
1109+
// Concurrently set 10 different versions as current version
1110+
for i := 0; i < versions; i++ {
1111+
go func() {
1112+
_, err := s.FrontendClient().SetWorkerDeploymentCurrentVersion(ctx, &workflowservice.SetWorkerDeploymentCurrentVersionRequest{
1113+
Namespace: s.Namespace().String(),
1114+
DeploymentName: tv.DeploymentVersion().GetDeploymentName(),
1115+
Version: tv.WithBuildIDNumber(i).DeploymentVersionString(),
1116+
IgnoreMissingTaskQueues: true,
1117+
Identity: tv.ClientIdentity(),
1118+
})
1119+
errChan <- err
1120+
}()
1121+
}
1122+
1123+
for i := 0; i < versions; i++ {
1124+
err := <-errChan
1125+
if err != nil {
1126+
switch err.(type) {
1127+
// DeadlineExceeded and ResourceExhausted are expected errors since there could be more
1128+
// in-flight updates than WorkflowExecutionMaxInFlightUpdates or we could get a timeout error.
1129+
case *serviceerror.DeadlineExceeded, *serviceerror.ResourceExhausted:
1130+
continue
1131+
default:
1132+
s.FailNow("Unexpected error: ", err)
1133+
}
1134+
}
1135+
}
1136+
1137+
// Verify that the current version is set.
1138+
resp, err := s.FrontendClient().DescribeWorkerDeployment(ctx, &workflowservice.DescribeWorkerDeploymentRequest{
1139+
Namespace: s.Namespace().String(),
1140+
DeploymentName: tv.DeploymentVersion().GetDeploymentName(),
1141+
})
1142+
s.NoError(err)
1143+
s.NotEqual(worker_versioning.UnversionedVersionId, resp.GetWorkerDeploymentInfo().GetRoutingConfig().GetCurrentVersion())
1144+
}
1145+
1146+
func (s *WorkerDeploymentSuite) TestSetCurrentVersion_Concurrent_SameVersion_NoUnexpectedErrors() {
1147+
s.OverrideDynamicConfig(dynamicconfig.WorkflowExecutionMaxInFlightUpdates, 10) // this is the default
1148+
1149+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
1150+
defer cancel()
1151+
1152+
tv := testvars.New(s)
1153+
errChan := make(chan error)
1154+
1155+
s.startVersionWorkflow(ctx, tv) // create version
1156+
1157+
// Concurrently set the same version as current version 10 times.
1158+
for i := 0; i < 10; i++ {
1159+
go func() {
1160+
_, err := s.FrontendClient().SetWorkerDeploymentCurrentVersion(ctx, &workflowservice.SetWorkerDeploymentCurrentVersionRequest{
1161+
Namespace: s.Namespace().String(),
1162+
DeploymentName: tv.DeploymentVersion().GetDeploymentName(),
1163+
Version: tv.DeploymentVersionString(),
1164+
IgnoreMissingTaskQueues: true,
1165+
Identity: tv.ClientIdentity(),
1166+
})
1167+
errChan <- err
1168+
}()
1169+
}
1170+
1171+
for i := 0; i < 10; i++ {
1172+
err := <-errChan
1173+
if err != nil {
1174+
switch err.(type) {
1175+
// DeadlineExceeded and ResourceExhausted are expected errors since there could be more
1176+
// in-flight updates than WorkflowExecutionMaxInFlightUpdates or we could get a timeout error.
1177+
case *serviceerror.DeadlineExceeded, *serviceerror.ResourceExhausted:
1178+
continue
1179+
default:
1180+
s.FailNow("Unexpected error: ", err)
1181+
}
1182+
}
1183+
}
1184+
1185+
// Verify that the current version is set.
1186+
resp, err := s.FrontendClient().DescribeWorkerDeployment(ctx, &workflowservice.DescribeWorkerDeploymentRequest{
1187+
Namespace: s.Namespace().String(),
1188+
DeploymentName: tv.DeploymentVersion().GetDeploymentName(),
1189+
})
1190+
s.NoError(err)
1191+
s.Equal(tv.DeploymentVersionString(), resp.GetWorkerDeploymentInfo().GetRoutingConfig().GetCurrentVersion())
1192+
}
1193+
1194+
func (s *WorkerDeploymentSuite) TestSetRampingVersion_Concurrent_DifferentVersions_NoUnexpectedErrors() {
1195+
s.OverrideDynamicConfig(dynamicconfig.WorkflowExecutionMaxInFlightUpdates, 10) // this is the default
1196+
1197+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
1198+
defer cancel()
1199+
1200+
tv := testvars.New(s)
1201+
errChan := make(chan error)
1202+
1203+
versions := 10
1204+
for i := 0; i < versions; i++ {
1205+
s.startVersionWorkflow(ctx, tv.WithBuildIDNumber(i))
1206+
}
1207+
1208+
// Concurrently set 10 different versions as ramping version
1209+
for i := 0; i < versions; i++ {
1210+
go func() {
1211+
_, err := s.FrontendClient().SetWorkerDeploymentRampingVersion(ctx, &workflowservice.SetWorkerDeploymentRampingVersionRequest{
1212+
Namespace: s.Namespace().String(),
1213+
DeploymentName: tv.DeploymentVersion().GetDeploymentName(),
1214+
Version: tv.WithBuildIDNumber(i).DeploymentVersionString(),
1215+
IgnoreMissingTaskQueues: true,
1216+
Identity: tv.ClientIdentity(),
1217+
Percentage: 50,
1218+
})
1219+
errChan <- err
1220+
}()
1221+
}
1222+
1223+
for i := 0; i < versions; i++ {
1224+
err := <-errChan
1225+
if err != nil {
1226+
switch err.(type) {
1227+
// DeadlineExceeded and ResourceExhausted are expected errors since there could be more
1228+
// in-flight updates than WorkflowExecutionMaxInFlightUpdates or we could get a timeout error.
1229+
case *serviceerror.DeadlineExceeded, *serviceerror.ResourceExhausted:
1230+
continue
1231+
default:
1232+
s.FailNow("Unexpected error: ", err)
1233+
}
1234+
}
1235+
}
1236+
1237+
// Verify that the ramping version is set.
1238+
resp, err := s.FrontendClient().DescribeWorkerDeployment(ctx, &workflowservice.DescribeWorkerDeploymentRequest{
1239+
Namespace: s.Namespace().String(),
1240+
DeploymentName: tv.DeploymentVersion().GetDeploymentName(),
1241+
})
1242+
s.NoError(err)
1243+
s.NotNil(resp.GetWorkerDeploymentInfo().GetRoutingConfig().GetRampingVersion())
1244+
}
1245+
1246+
func (s *WorkerDeploymentSuite) TestSetRampingVersion_Concurrent_SameVersion_NoUnexpectedErrors() {
1247+
s.OverrideDynamicConfig(dynamicconfig.WorkflowExecutionMaxInFlightUpdates, 10) // this is the default
1248+
1249+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
1250+
defer cancel()
1251+
1252+
tv := testvars.New(s)
1253+
errChan := make(chan error)
1254+
1255+
s.startVersionWorkflow(ctx, tv) // create version
1256+
1257+
// Concurrently set the same version as ramping version 10 times.
1258+
for i := 0; i < 10; i++ {
1259+
go func() {
1260+
_, err := s.FrontendClient().SetWorkerDeploymentRampingVersion(ctx, &workflowservice.SetWorkerDeploymentRampingVersionRequest{
1261+
Namespace: s.Namespace().String(),
1262+
DeploymentName: tv.DeploymentVersion().GetDeploymentName(),
1263+
Version: tv.DeploymentVersionString(),
1264+
IgnoreMissingTaskQueues: true,
1265+
Identity: tv.ClientIdentity(),
1266+
Percentage: 50,
1267+
})
1268+
errChan <- err
1269+
}()
1270+
}
1271+
1272+
for i := 0; i < 10; i++ {
1273+
err := <-errChan
1274+
if err != nil {
1275+
switch err.(type) {
1276+
// DeadlineExceeded and ResourceExhausted are expected errors since there could be more
1277+
// in-flight updates than WorkflowExecutionMaxInFlightUpdates or we could get a timeout error.
1278+
case *serviceerror.DeadlineExceeded, *serviceerror.ResourceExhausted:
1279+
continue
1280+
default:
1281+
s.FailNow("Unexpected error: ", err)
1282+
}
1283+
}
1284+
}
1285+
1286+
// Verify that the ramping version is set.
1287+
resp, err := s.FrontendClient().DescribeWorkerDeployment(ctx, &workflowservice.DescribeWorkerDeploymentRequest{
1288+
Namespace: s.Namespace().String(),
1289+
DeploymentName: tv.DeploymentVersion().GetDeploymentName(),
1290+
})
1291+
s.NoError(err)
1292+
s.Equal(tv.DeploymentVersionString(), resp.GetWorkerDeploymentInfo().GetRoutingConfig().GetRampingVersion())
1293+
}
1294+
11261295
// Should see it fail because unversioned is already current
11271296
func (s *WorkerDeploymentSuite) TestSetWorkerDeploymentRampingVersion_Unversioned_UnversionedCurrent() {
11281297
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)

0 commit comments

Comments
 (0)