Skip to content

Commit 4c813e7

Browse files
authored
Add capability to have multiple domains in replication simulation (#6923)
* Add capability to have multiple domains in replication simulation
1 parent 352e8d4 commit 4c813e7

7 files changed

+111
-66
lines changed

docker/buildkite/docker-compose-local-replication-simulation.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ services:
44
environment:
55
- "MAX_HEAP_SIZE=256M"
66
- "HEAP_NEWSIZE=128M"
7-
expose:
8-
- "9042"
7+
ports:
8+
- "9042:9042"
99
networks:
1010
services-network:
1111
aliases:

simulation/replication/replication_simulation_test.go

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,11 @@ func TestReplicationSimulation(t *testing.T) {
6767
simCfg.MustInitClientsFor(t, clusterName)
6868
}
6969

70-
simCfg.MustRegisterDomain(t)
70+
simTypes.Logf(t, "Registering domains")
71+
for domainName := range simCfg.Domains {
72+
simTypes.Logf(t, "Domain: %s", domainName)
73+
simCfg.MustRegisterDomain(t, domainName)
74+
}
7175

7276
// wait for domain data to be replicated and workers to start.
7377
waitUntilWorkersReady(t)
@@ -115,14 +119,14 @@ func startWorkflow(
115119
) error {
116120
t.Helper()
117121

118-
simTypes.Logf(t, "Starting workflow: %s on cluster: %s", op.WorkflowID, op.Cluster)
122+
simTypes.Logf(t, "Starting workflow: %s on domain %s on cluster: %s", op.WorkflowID, op.Domain, op.Cluster)
119123

120124
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
121125
defer cancel()
122126
resp, err := simCfg.MustGetFrontendClient(t, op.Cluster).StartWorkflowExecution(ctx,
123127
&types.StartWorkflowExecutionRequest{
124128
RequestID: uuid.New(),
125-
Domain: simCfg.Domain.Name,
129+
Domain: op.Domain,
126130
WorkflowID: op.WorkflowID,
127131
WorkflowType: &types.WorkflowType{Name: simTypes.WorkflowName},
128132
TaskList: &types.TaskList{Name: simTypes.TasklistName},
@@ -135,7 +139,7 @@ func startWorkflow(
135139
return err
136140
}
137141

138-
simTypes.Logf(t, "Started workflow: %s on cluster: %s. RunID: %s", op.WorkflowID, op.Cluster, resp.GetRunID())
142+
simTypes.Logf(t, "Started workflow: %s on domain: %s on cluster: %s. RunID: %s", op.WorkflowID, op.Domain, op.Cluster, resp.GetRunID())
139143

140144
return nil
141145
}
@@ -147,24 +151,24 @@ func changeActiveClusters(
147151
) error {
148152
t.Helper()
149153

150-
simTypes.Logf(t, "Changing active clusters to: %v", op.NewActiveClusters)
154+
simTypes.Logf(t, "Changing active clusters for domain %s to: %v", op.Domain, op.NewActiveClusters)
151155

152156
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
153157
defer cancel()
154-
descResp, err := simCfg.MustGetFrontendClient(t, simCfg.PrimaryCluster).DescribeDomain(ctx, &types.DescribeDomainRequest{Name: common.StringPtr(simCfg.Domain.Name)})
158+
descResp, err := simCfg.MustGetFrontendClient(t, simCfg.PrimaryCluster).DescribeDomain(ctx, &types.DescribeDomainRequest{Name: common.StringPtr(op.Domain)})
155159
if err != nil {
156-
return fmt.Errorf("failed to describe domain %s: %w", simCfg.Domain.Name, err)
160+
return fmt.Errorf("failed to describe domain %s: %w", op.Domain, err)
157161
}
158162

159-
if !simCfg.IsActiveActiveDomain() {
163+
if !simCfg.IsActiveActiveDomain(op.Domain) {
160164
fromCluster := descResp.ReplicationConfiguration.ActiveClusterName
161165
toCluster := op.NewActiveClusters[0]
162166

163167
ctx, cancel = context.WithTimeout(context.Background(), 2*time.Second)
164168
defer cancel()
165169
_, err = simCfg.MustGetFrontendClient(t, simCfg.PrimaryCluster).UpdateDomain(ctx,
166170
&types.UpdateDomainRequest{
167-
Name: simCfg.Domain.Name,
171+
Name: op.Domain,
168172
ActiveClusterName: &toCluster,
169173
FailoverTimeoutInSeconds: op.FailoverTimeout,
170174
})
@@ -196,7 +200,7 @@ func validate(
196200
defer cancel()
197201
resp, err := simCfg.MustGetFrontendClient(t, op.Cluster).DescribeWorkflowExecution(ctx,
198202
&types.DescribeWorkflowExecutionRequest{
199-
Domain: simCfg.Domain.Name,
203+
Domain: op.Domain,
200204
Execution: &types.WorkflowExecution{
201205
WorkflowID: op.WorkflowID,
202206
},
@@ -214,7 +218,7 @@ func validate(
214218

215219
// Get history to validate the worker identity that started and completed the workflow
216220
// Some workflows start in cluster0 and complete in cluster1. This is to validate that the workflow was started and completed by workers in the expected clusters.
217-
history, err := getAllHistory(t, simCfg, op.Cluster, op.WorkflowID)
221+
history, err := getAllHistory(t, simCfg, op.Cluster, op.Domain, op.WorkflowID)
218222
if err != nil {
219223
return err
220224
}
@@ -227,17 +231,17 @@ func validate(
227231
if err != nil {
228232
return err
229233
}
230-
if op.Want.StartedByWorkersInCluster != "" && startedWorker != simTypes.WorkerIdentityFor(op.Want.StartedByWorkersInCluster) {
231-
return fmt.Errorf("workflow %s started by worker %s, expected %s", op.WorkflowID, startedWorker, simTypes.WorkerIdentityFor(op.Want.StartedByWorkersInCluster))
234+
if op.Want.StartedByWorkersInCluster != "" && startedWorker != simTypes.WorkerIdentityFor(op.Want.StartedByWorkersInCluster, op.Domain) {
235+
return fmt.Errorf("workflow %s started by worker %s, expected %s", op.WorkflowID, startedWorker, simTypes.WorkerIdentityFor(op.Want.StartedByWorkersInCluster, op.Domain))
232236
}
233237

234238
completedWorker, err := lastDecisionTaskWorker(history)
235239
if err != nil {
236240
return err
237241
}
238242

239-
if op.Want.CompletedByWorkersInCluster != "" && completedWorker != simTypes.WorkerIdentityFor(op.Want.CompletedByWorkersInCluster) {
240-
return fmt.Errorf("workflow %s completed by worker %s, expected %s", op.WorkflowID, completedWorker, simTypes.WorkerIdentityFor(op.Want.CompletedByWorkersInCluster))
243+
if op.Want.CompletedByWorkersInCluster != "" && completedWorker != simTypes.WorkerIdentityFor(op.Want.CompletedByWorkersInCluster, op.Domain) {
244+
return fmt.Errorf("workflow %s completed by worker %s, expected %s", op.WorkflowID, completedWorker, simTypes.WorkerIdentityFor(op.Want.CompletedByWorkersInCluster, op.Domain))
241245
}
242246

243247
return nil
@@ -273,14 +277,14 @@ func waitForOpTime(t *testing.T, op *simTypes.Operation, startTime time.Time) {
273277
simTypes.Logf(t, "Operation time (t + %ds) reached: %v", int(op.At.Seconds()), startTime.Add(op.At))
274278
}
275279

276-
func getAllHistory(t *testing.T, simCfg *simTypes.ReplicationSimulationConfig, clusterName, wfID string) ([]types.HistoryEvent, error) {
280+
func getAllHistory(t *testing.T, simCfg *simTypes.ReplicationSimulationConfig, clusterName, domainName, wfID string) ([]types.HistoryEvent, error) {
277281
frontendCl := simCfg.MustGetFrontendClient(t, clusterName)
278282
var nextPageToken []byte
279283
var history []types.HistoryEvent
280284
for {
281285
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
282286
response, err := frontendCl.GetWorkflowExecutionHistory(ctx, &types.GetWorkflowExecutionHistoryRequest{
283-
Domain: simCfg.Domain.Name,
287+
Domain: domainName,
284288
Execution: &types.WorkflowExecution{
285289
WorkflowID: wfID,
286290
},

simulation/replication/testdata/replication_simulation_activeactive.yaml

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,29 +11,33 @@ clusters:
1111
# primaryCluster is where domain data is written and from which it is replicated to the other clusters, e.g. domain registration
1212
primaryCluster: "cluster0"
1313

14-
domain:
15-
name: test-domain-aa
16-
activeClusters:
17-
- cluster0
18-
- cluster1
14+
domains:
15+
test-domain-aa:
16+
name: test-domain-aa
17+
activeClusters:
18+
- cluster0
19+
- cluster1
1920

2021
operations:
2122
- op: start_workflow
2223
at: 0s
2324
workflowID: wf1
2425
cluster: cluster0
26+
domain: test-domain-aa
2527
workflowDuration: 60s
2628

2729
- op: start_workflow
2830
at: 0s
2931
workflowID: wf2
3032
cluster: cluster1
33+
domain: test-domain-aa
3134
workflowDuration: 60s
3235

3336
- op: validate
3437
at: 70s
3538
workflowID: wf1
3639
cluster: cluster0
40+
domain: test-domain-aa
3741
want:
3842
status: completed
3943
startedByWorkersInCluster: cluster0
@@ -43,6 +47,7 @@ operations:
4347
at: 70s
4448
workflowID: wf2
4549
cluster: cluster1
50+
domain: test-domain-aa
4651
want:
4752
status: completed
4853
startedByWorkersInCluster: cluster1

simulation/replication/testdata/replication_simulation_default.yaml

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,27 +11,31 @@ clusters:
1111
primaryCluster: "cluster0"
1212

1313

14-
domain:
15-
name: test-domain
16-
activeClusters:
17-
- cluster0
14+
domains:
15+
test-domain:
16+
name: test-domain
17+
activeClusters:
18+
- cluster0
1819

1920
operations:
2021
- op: start_workflow
2122
at: 0s
2223
workflowID: wf1
2324
cluster: cluster0
25+
domain: test-domain
2426
workflowDuration: 35s
2527

2628
- op: change_active_clusters # failover from cluster0 to cluster1
2729
at: 20s
30+
domain: test-domain
2831
newActiveClusters: ["cluster1"]
2932
# failoverTimeoutSec: 5 # unset means force failover. setting it means graceful failover request
3033

3134
- op: validate
3235
at: 120s # todo: this should work at 40s mark
3336
workflowID: wf1
3437
cluster: cluster1
38+
domain: test-domain
3539
want:
3640
status: completed
3741
startedByWorkersInCluster: cluster0

simulation/replication/types/repl_sim_config.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ type ReplicationSimulationConfig struct {
5757
// PrimaryCluster is used for domain registration
5858
PrimaryCluster string `yaml:"primaryCluster"`
5959

60-
Domain ReplicationDomainConfig `yaml:"domain"`
60+
Domains map[string]ReplicationDomainConfig `yaml:"domains"`
6161

6262
Operations []*Operation `yaml:"operations"`
6363
}
@@ -79,6 +79,7 @@ type Operation struct {
7979
WorkflowID string `yaml:"workflowID"`
8080
WorkflowDuration time.Duration `yaml:"workflowDuration"`
8181

82+
Domain string `yaml:"domain"`
8283
NewActiveClusters []string `yaml:"newActiveClusters"`
8384
FailoverTimeout *int32 `yaml:"failoverTimeoutSec"`
8485

@@ -155,12 +156,12 @@ func (s *ReplicationSimulationConfig) MustInitClientsFor(t *testing.T, clusterNa
155156
Logf(t, "Initialized clients for cluster %s", clusterName)
156157
}
157158

158-
func (s *ReplicationSimulationConfig) IsActiveActiveDomain() bool {
159-
return len(s.Domain.ActiveClusters) > 1
159+
func (s *ReplicationSimulationConfig) IsActiveActiveDomain(domainName string) bool {
160+
return len(s.Domains[domainName].ActiveClusters) > 1
160161
}
161162

162-
func (s *ReplicationSimulationConfig) MustRegisterDomain(t *testing.T) {
163-
Logf(t, "Registering domain: %s", s.Domain.Name)
163+
func (s *ReplicationSimulationConfig) MustRegisterDomain(t *testing.T, domainName string) {
164+
Logf(t, "Registering domain: %s", domainName)
164165
var clusters []*types.ClusterReplicationConfiguration
165166
for name := range s.Clusters {
166167
clusters = append(clusters, &types.ClusterReplicationConfiguration{
@@ -170,7 +171,7 @@ func (s *ReplicationSimulationConfig) MustRegisterDomain(t *testing.T) {
170171
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
171172
defer cancel()
172173
err := s.MustGetFrontendClient(t, s.PrimaryCluster).RegisterDomain(ctx, &types.RegisterDomainRequest{
173-
Name: s.Domain.Name,
174+
Name: domainName,
174175
Clusters: clusters,
175176
WorkflowExecutionRetentionPeriodInDays: 1,
176177
IsGlobalDomain: true,
@@ -183,10 +184,10 @@ func (s *ReplicationSimulationConfig) MustRegisterDomain(t *testing.T) {
183184
if _, ok := err.(*shared.DomainAlreadyExistsError); !ok {
184185
require.NoError(t, err, "failed to register domain")
185186
} else {
186-
Logf(t, "Domain already exists: %s", s.Domain.Name)
187+
Logf(t, "Domains already exists: %s", domainName)
187188
}
188189
return
189190
}
190191

191-
Logf(t, "Registered domain: %s", s.Domain.Name)
192+
Logf(t, "Registered domain: %s", domainName)
192193
}

simulation/replication/types/types.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,11 @@ type WorkflowOutput struct {
4343
Count int
4444
}
4545

46-
func WorkerIdentityFor(clusterName string) string {
47-
return fmt.Sprintf("worker-%s", clusterName)
46+
func WorkerIdentityFor(clusterName string, domainName string) string {
47+
if domainName == "" {
48+
return fmt.Sprintf("worker-%s", clusterName)
49+
}
50+
return fmt.Sprintf("worker-%s-%s", domainName, clusterName)
4851
}
4952

5053
func Logf(t *testing.T, msg string, args ...interface{}) {

0 commit comments

Comments
 (0)