Skip to content

Commit 09a61bf

Browse files
authored
enable state-based replication for some xdc tests (#6958)
## What changed? <!-- Describe what has changed in this PR --> enable state-based replication for some xdc tests ## Why? <!-- Tell your future self why have you made these changes --> ## How did you test it? <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> ## Potential risks <!-- Assuming the worst case, what can be broken when deploying this change to production? --> ## Documentation <!-- Have you made sure this change doesn't falsify anything currently stated in `docs/`? If significant new behavior is added, have you described that in `docs/`? --> ## Is hotfix candidate? <!-- Is this PR a hotfix candidate or does it require a notification to be sent to the broader community? (Yes/No) -->
1 parent 154b40d commit 09a61bf

7 files changed

+221
-55
lines changed

Diff for: tests/xdc/advanced_visibility_test.go

+54-13
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"time"
3333

3434
"github.com/pborman/uuid"
35+
"github.com/stretchr/testify/assert"
3536
"github.com/stretchr/testify/require"
3637
"github.com/stretchr/testify/suite"
3738
commandpb "go.temporal.io/api/command/v1"
@@ -46,8 +47,10 @@ import (
4647
"go.temporal.io/api/workflowservice/v1"
4748
"go.temporal.io/server/api/adminservice/v1"
4849
"go.temporal.io/server/common"
50+
"go.temporal.io/server/common/dynamicconfig"
4951
"go.temporal.io/server/common/log"
5052
"go.temporal.io/server/common/log/tag"
53+
"go.temporal.io/server/common/namespace"
5154
"go.temporal.io/server/common/payload"
5255
"go.temporal.io/server/common/payloads"
5356
"go.temporal.io/server/common/searchattribute"
@@ -77,6 +80,9 @@ type AdvVisCrossDCTestSuite struct {
7780
clusterConfigs []*testcore.TestClusterConfig
7881
isElasticsearchEnabled bool
7982

83+
dynamicConfigOverrides map[dynamicconfig.Key]interface{}
84+
enableTransitionHistory bool
85+
8086
testSearchAttributeKey string
8187
testSearchAttributeVal string
8288

@@ -86,7 +92,26 @@ type AdvVisCrossDCTestSuite struct {
8692

8793
func TestAdvVisCrossDCTestSuite(t *testing.T) {
8894
t.Parallel()
89-
suite.Run(t, new(AdvVisCrossDCTestSuite))
95+
for _, tc := range []struct {
96+
name string
97+
enableTransitionHistory bool
98+
}{
99+
{
100+
name: "EnableTransitionHistory",
101+
enableTransitionHistory: true,
102+
},
103+
{
104+
name: "DisableTransitionHistory",
105+
enableTransitionHistory: false,
106+
},
107+
} {
108+
t.Run(tc.name, func(t *testing.T) {
109+
s := &AdvVisCrossDCTestSuite{
110+
enableTransitionHistory: tc.enableTransitionHistory,
111+
}
112+
suite.Run(t, s)
113+
})
114+
}
90115
}
91116

92117
var (
@@ -105,6 +130,10 @@ func (s *AdvVisCrossDCTestSuite) SetupSuite() {
105130
s.logger = log.NewTestLogger()
106131
s.testClusterFactory = testcore.NewTestClusterFactory()
107132

133+
s.dynamicConfigOverrides = map[dynamicconfig.Key]any{
134+
dynamicconfig.EnableTransitionHistory.Key(): s.enableTransitionHistory,
135+
}
136+
108137
var fileName string
109138
if testcore.UsingSQLAdvancedVisibility() {
110139
// NOTE: can't use xdc_clusters.yaml here because it somehow interferes with the other xDC tests.
@@ -130,6 +159,10 @@ func (s *AdvVisCrossDCTestSuite) SetupSuite() {
130159
s.Require().NoError(yaml.Unmarshal(confContent, &clusterConfigs))
131160
s.clusterConfigs = clusterConfigs
132161

162+
for _, config := range clusterConfigs {
163+
config.DynamicConfigOverrides = s.dynamicConfigOverrides
164+
}
165+
133166
c, err := s.testClusterFactory.NewCluster(s.T(), clusterConfigs[0], log.With(s.logger, tag.ClusterName(clusterNameAdvVis[0])))
134167
s.Require().NoError(err)
135168
s.cluster1 = c
@@ -178,10 +211,10 @@ func (s *AdvVisCrossDCTestSuite) TearDownSuite() {
178211
}
179212

180213
func (s *AdvVisCrossDCTestSuite) TestSearchAttributes() {
181-
namespace := "test-xdc-search-attr-" + common.GenerateRandomString(5)
214+
ns := "test-xdc-search-attr-" + common.GenerateRandomString(5)
182215
client1 := s.cluster1.FrontendClient() // active
183216
regReq := &workflowservice.RegisterNamespaceRequest{
184-
Namespace: namespace,
217+
Namespace: ns,
185218
Clusters: clusterReplicationConfigAdvVis,
186219
ActiveClusterName: clusterNameAdvVis[0],
187220
IsGlobalNamespace: true,
@@ -195,7 +228,7 @@ func (s *AdvVisCrossDCTestSuite) TestSearchAttributes() {
195228
if !s.isElasticsearchEnabled {
196229
// When Elasticsearch is enabled, the search attribute aliases are not used.
197230
_, err = client1.UpdateNamespace(testcore.NewContext(), &workflowservice.UpdateNamespaceRequest{
198-
Namespace: namespace,
231+
Namespace: ns,
199232
Config: &namespacepb.NamespaceConfig{
200233
CustomSearchAttributeAliases: map[string]string{
201234
"Bool01": "CustomBoolField",
@@ -212,8 +245,16 @@ func (s *AdvVisCrossDCTestSuite) TestSearchAttributes() {
212245
time.Sleep(cacheRefreshInterval) // nolint:forbidigo
213246
}
214247

248+
s.EventuallyWithT(func(t *assert.CollectT) {
249+
// Wait for namespace record to be replicated and loaded into memory.
250+
for _, r := range s.cluster2.Host().FrontendNamespaceRegistries() {
251+
_, err := r.GetNamespace(namespace.Name(ns))
252+
assert.NoError(t, err)
253+
}
254+
}, 15*time.Second, 500*time.Millisecond)
255+
215256
descReq := &workflowservice.DescribeNamespaceRequest{
216-
Namespace: namespace,
257+
Namespace: ns,
217258
}
218259
resp, err := client1.DescribeNamespace(testcore.NewContext(), descReq)
219260
s.NoError(err)
@@ -239,7 +280,7 @@ func (s *AdvVisCrossDCTestSuite) TestSearchAttributes() {
239280
}
240281
startReq := &workflowservice.StartWorkflowExecutionRequest{
241282
RequestId: uuid.New(),
242-
Namespace: namespace,
283+
Namespace: ns,
243284
WorkflowId: id,
244285
WorkflowType: workflowType,
245286
TaskQueue: taskQueue,
@@ -259,7 +300,7 @@ func (s *AdvVisCrossDCTestSuite) TestSearchAttributes() {
259300
startFilter := &filterpb.StartTimeFilter{}
260301
startFilter.EarliestTime = timestamppb.New(startTime)
261302
saListRequest := &workflowservice.ListWorkflowExecutionsRequest{
262-
Namespace: namespace,
303+
Namespace: ns,
263304
PageSize: 5,
264305
Query: fmt.Sprintf(`WorkflowId = "%s" and %s = "%s"`, id, s.testSearchAttributeKey, s.testSearchAttributeVal),
265306
}
@@ -307,7 +348,7 @@ func (s *AdvVisCrossDCTestSuite) TestSearchAttributes() {
307348

308349
poller := testcore.TaskPoller{
309350
Client: client1,
310-
Namespace: namespace,
351+
Namespace: ns,
311352
TaskQueue: taskQueue,
312353
Identity: identity,
313354
WorkflowTaskHandler: wtHandler,
@@ -354,7 +395,7 @@ func (s *AdvVisCrossDCTestSuite) TestSearchAttributes() {
354395
}
355396

356397
saListRequest = &workflowservice.ListWorkflowExecutionsRequest{
357-
Namespace: namespace,
398+
Namespace: ns,
358399
PageSize: int32(2),
359400
Query: fmt.Sprintf(`WorkflowId = "%s" and %s = "another string"`, id, s.testSearchAttributeKey),
360401
}
@@ -365,7 +406,7 @@ func (s *AdvVisCrossDCTestSuite) TestSearchAttributes() {
365406
testListResult(engine2, saListRequest)
366407

367408
runningListRequest := &workflowservice.ListWorkflowExecutionsRequest{
368-
Namespace: namespace,
409+
Namespace: ns,
369410
PageSize: int32(2),
370411
Query: fmt.Sprintf(`WorkflowType = '%s' and ExecutionStatus = 'Running'`, wt),
371412
}
@@ -378,7 +419,7 @@ func (s *AdvVisCrossDCTestSuite) TestSearchAttributes() {
378419
terminateReason := "force terminate to make sure standby process tasks"
379420
terminateDetails := payloads.EncodeString("terminate details")
380421
_, err = client1.TerminateWorkflowExecution(testcore.NewContext(), &workflowservice.TerminateWorkflowExecutionRequest{
381-
Namespace: namespace,
422+
Namespace: ns,
382423
WorkflowExecution: &commonpb.WorkflowExecution{
383424
WorkflowId: id,
384425
},
@@ -390,7 +431,7 @@ func (s *AdvVisCrossDCTestSuite) TestSearchAttributes() {
390431

391432
// check terminate done
392433
getHistoryReq := &workflowservice.GetWorkflowExecutionHistoryRequest{
393-
Namespace: namespace,
434+
Namespace: ns,
394435
Execution: &commonpb.WorkflowExecution{
395436
WorkflowId: id,
396437
},
@@ -426,7 +467,7 @@ func (s *AdvVisCrossDCTestSuite) TestSearchAttributes() {
426467
}, waitTimeInMs*numOfRetry*time.Millisecond, waitTimeInMs*time.Millisecond)
427468

428469
terminatedListRequest := &workflowservice.ListWorkflowExecutionsRequest{
429-
Namespace: namespace,
470+
Namespace: ns,
430471
PageSize: int32(2),
431472
Query: fmt.Sprintf(`WorkflowType = '%s' and ExecutionStatus = 'Terminated'`, wt),
432473
}

Diff for: tests/xdc/base.go

+5
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import (
4040
"go.temporal.io/sdk/converter"
4141
"go.temporal.io/server/api/adminservice/v1"
4242
"go.temporal.io/server/api/historyservice/v1"
43+
"go.temporal.io/server/common"
4344
"go.temporal.io/server/common/cluster"
4445
"go.temporal.io/server/common/dynamicconfig"
4546
"go.temporal.io/server/common/log"
@@ -93,6 +94,10 @@ func (s *xdcBaseSuite) setupSuite(clusterNames []string, opts ...testcore.Option
9394
params := testcore.ApplyTestClusterParams(opts)
9495

9596
s.clusterNames = clusterNames
97+
for idx, clusterName := range s.clusterNames {
98+
s.clusterNames[idx] = clusterName + "_" + common.GenerateRandomString(5)
99+
}
100+
96101
if s.logger == nil {
97102
s.logger = log.NewTestLogger()
98103
}

Diff for: tests/xdc/failover_test.go

+24-6
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,25 @@ type (
7070

7171
func TestFuncClustersTestSuite(t *testing.T) {
7272
t.Parallel()
73-
suite.Run(t, new(FunctionalClustersTestSuite))
73+
for _, tc := range []struct {
74+
name string
75+
enableTransitionHistory bool
76+
}{
77+
{
78+
name: "EnableTransitionHistory",
79+
enableTransitionHistory: true,
80+
},
81+
{
82+
name: "DisableTransitionHistory",
83+
enableTransitionHistory: false,
84+
},
85+
} {
86+
t.Run(tc.name, func(t *testing.T) {
87+
s := &FunctionalClustersTestSuite{}
88+
s.enableTransitionHistory = tc.enableTransitionHistory
89+
suite.Run(t, s)
90+
})
91+
}
7492
}
7593

7694
func (s *FunctionalClustersTestSuite) SetupSuite() {
@@ -480,11 +498,11 @@ func (s *FunctionalClustersTestSuite) TestStickyWorkflowTaskFailover() {
480498
client2 := s.cluster2.FrontendClient() // standby
481499

482500
// Start a workflow
483-
id := "functional-sticky-workflow-task-workflow-failover-test"
484-
wt := "functional-sticky-workflow-task-workflow-failover-test-type"
485-
tq := "functional-sticky-workflow-task-workflow-failover-test-taskqueue"
486-
stq1 := "functional-sticky-workflow-task-workflow-failover-test-taskqueue-sticky1"
487-
stq2 := "functional-sticky-workflow-task-workflow-failover-test-taskqueue-sticky2"
501+
id := "functional-sticky-workflow-task-workflow-failover-test-" + "TransitionHistory" + strconv.FormatBool(s.enableTransitionHistory)
502+
wt := id + "-type"
503+
tq := id + "-taskqueue"
504+
stq1 := id + "-taskqueue-sticky1"
505+
stq2 := id + "-taskqueue-sticky2"
488506
identity1 := "worker1"
489507
identity2 := "worker2"
490508

0 commit comments

Comments
 (0)