Commit 2f3beb0

Author: taylanisikdemir
[active-active] Fix failover version increment logic (cadence-workflow#7246)
**What changed?**

Active-active domain failovers (changing the region-to-cluster map) increment the failover version of only the updated entries; the rest of the entries are untouched. This causes a problem when an ongoing workflow is attempted to be started again after failover.

Example scenario:

- Domain's initial state:
  ```
  ActiveClustersByRegion: [
    phx: {staging_phx 2}
    dca: {staging_dca 0}
  ]
  ```
- A workflow with id `cron.phx` is running with version 2. It's active on the staging_phx cluster.
- The domain is failed over from PHX to DCA by an operator/automation. The map in the DB now looks like this:
  ```
  ActiveClustersByRegion: [
    phx: {staging_dca 10} # version is incremented from 2 to 10.
    dca: {staging_dca 0}
  ]
  ```
- A `StartWorkflow(cron.phx)` request is made by a client.
- The call arrives at the PHX frontend.
- It checks the domain's `ActiveClustersByRegion` and decides to forward to the DCA frontend.
- The DCA frontend receives the request and makes the corresponding request to DCA history.
- The history engine responsible for the `cron.phx` shard processes the request.
- It gets a "workflow already started" error and compares the new mutable state version with the existing one in the DB.
- The existing mutable state in the DB (which was replicated from the PHX cluster) has version 2.
- The new mutable state in memory has version 0, which is the version DCA uses.
- The new version is less than the previous version, so it returns a "domain not active" error ([ref](https://github.com/cadence-workflow/cadence/blob/147489a7e507a04eade6594854234396daebcd8f/service/history/engine/engineimpl/start_workflow_execution.go#L253)).

The problem is caused by multiple entries in the `ActiveClustersByRegion` map pointing to the same cluster but having different versions. One way to prevent getting into this state is to increment the failover versions of all entries pointing to the same cluster when updating `ActiveClustersByRegion`.

Before:
```
ActiveClustersByRegion: [
  phx: {staging_phx 2}
  dca: {staging_dca 0}
]
```

After:
```
ActiveClustersByRegion: [
  phx: {staging_dca 10} # incremented from 2 to 10 so it points to the dca cluster
  dca: {staging_dca 10} # incremented from 0 to 10 so it's greater than or equal to the entry that was updated (above)
]
```

**Validation Change**

Another change in this PR is to restrict what kinds of updates are allowed for the `ActiveClustersByRegion` map. It doesn't make sense to support multiple hops, and we should prevent cycles. For example, the map below contains multiple hops and will not be allowed:
```
ActiveClustersByRegion: [
  phx: {staging_dca}
  dca: {staging_klm}
  klm: {staging_klm}
]
```

**How did you test it?**
- unit tests
- new simulation
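To make the failing comparison concrete, here is a minimal standalone Go sketch of the version check described above. It is illustrative only: the names and types are made up for the example and are not the actual code in `start_workflow_execution.go`.

```go
package main

import "fmt"

// checkStartVersion sketches the "workflow already started" reconciliation step:
// if the failover version of the new start attempt is lower than the version
// recorded in the existing (replicated) mutable state, the request is rejected
// with a domain-not-active style error.
func checkStartVersion(existingVersion, incomingVersion int64) error {
	if incomingVersion < existingVersion {
		return fmt.Errorf("domain not active: incoming version %d < existing version %d", incomingVersion, existingVersion)
	}
	return nil
}

func main() {
	// Before the fix: the dca entry still carries version 0 while the mutable
	// state replicated from staging_phx has version 2, so the start is rejected.
	fmt.Println(checkStartVersion(2, 0))

	// After the fix: both entries pointing to staging_dca carry version 10,
	// so the retried start passes the version check.
	fmt.Println(checkStartVersion(2, 10))
}
```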
1 parent d6c7517 commit 2f3beb0

9 files changed: +252 / -6 lines changed

.github/workflows/replication-simulation.yml

Lines changed: 1 addition & 0 deletions

```diff
@@ -16,6 +16,7 @@ jobs:
           - activeactive
           - activeactive_cron
           - activeactive_regional_failover
+          - activeactive_regional_failover_start_same_wfid
           - activepassive_to_activeactive
           - clusterredirection
           - default
```

common/domain/attrValidator.go

Lines changed: 45 additions & 0 deletions

```diff
@@ -22,6 +22,7 @@ package domain
 
 import (
 	"fmt"
+	"strings"
 
 	"github.com/uber/cadence/common/cluster"
 	"github.com/uber/cadence/common/persistence"
@@ -117,6 +118,7 @@ func (d *AttrValidatorImpl) validateDomainReplicationConfigForGlobalDomain(
 	}
 
 	if replicationConfig.IsActiveActive() {
+		// validate cluster names and check whether they exist
 		for _, cluster := range activeClusters.ActiveClustersByRegion {
 			if err := d.validateClusterName(cluster.ActiveClusterName); err != nil {
 				return err
@@ -126,6 +128,12 @@ func (d *AttrValidatorImpl) validateDomainReplicationConfigForGlobalDomain(
 				return errActiveClusterNotInClusters
 			}
 		}
+
+		// check region mappings are valid
+		err := d.checkActiveClusterRegionMappings(activeClusters)
+		if err != nil {
+			return err
+		}
 	} else {
 		if err := d.validateClusterName(activeCluster); err != nil {
 			return err
@@ -177,3 +185,40 @@ func (d *AttrValidatorImpl) validateClusterName(
 	}
 	return nil
 }
+
+// checkActiveClusterRegionMappings validates:
+// 1. There's no cycle in region dependencies.
+//    e.g. Following not allowed: region0 maps to a cluster in region1, and region1 maps to a cluster in region0.
+// 2. There's at most one hop in the region dependency chain.
+//    e.g. Following not allowed: region0 maps to a cluster in region1, and region1 maps to a cluster in region2
+func (d *AttrValidatorImpl) checkActiveClusterRegionMappings(activeClusters *types.ActiveClusters) error {
+	inbounds := make(map[string][]string)
+	outbounds := make(map[string]string)
+	allClusters := d.clusterMetadata.GetAllClusterInfo()
+	for fromRegion, cluster := range activeClusters.ActiveClustersByRegion {
+		clusterInfo, ok := allClusters[cluster.ActiveClusterName]
+		if !ok {
+			return &types.BadRequestError{Message: fmt.Sprintf("Cluster %v not found", cluster.ActiveClusterName)}
+		}
+
+		toRegion := clusterInfo.Region
+		if fromRegion == toRegion {
+			continue
+		}
+
+		inbounds[toRegion] = append(inbounds[toRegion], fromRegion)
+		outbounds[fromRegion] = toRegion
+	}
+
+	// The entries that point to a cluster in the same region is omitted in inbounds and outbounds
+	// So if a region X is in inbounds it means a cluster in X region is used by other region(s).
+	// Region X must not be in outbounds. (allow at most one hop rule)
+	// Validating this also ensures that there's no cycle in region dependencies.
+	for toRegion := range inbounds {
+		if _, ok := outbounds[toRegion]; ok {
+			return &types.BadRequestError{Message: "Region " + toRegion + " cannot map to a cluster in another region because it is used as target region by other regions: " + strings.Join(inbounds[toRegion], ", ")}
+		}
+	}
+
+	return nil
+}
```
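As a quick walk-through of the rule above, the sketch below derives the inbound/outbound region maps for the multi-hop example from the PR description (phx -> staging_dca, dca -> staging_klm, klm -> staging_klm). It is a standalone illustration with the cluster-to-region lookup hard-coded, not the validator's actual cluster metadata.

```go
package main

import "fmt"

func main() {
	// Hard-coded region of each cluster; the validator reads this from cluster metadata.
	clusterRegion := map[string]string{
		"staging_phx": "phx",
		"staging_dca": "dca",
		"staging_klm": "klm",
	}
	// The multi-hop map from the PR description: phx -> dca -> klm.
	activeClustersByRegion := map[string]string{
		"phx": "staging_dca",
		"dca": "staging_klm",
		"klm": "staging_klm",
	}

	inbounds := make(map[string][]string) // target region -> regions pointing at it
	outbounds := make(map[string]string)  // source region -> target region
	for fromRegion, clusterName := range activeClustersByRegion {
		toRegion := clusterRegion[clusterName]
		if fromRegion == toRegion {
			continue // entries that stay in their own region are ignored
		}
		inbounds[toRegion] = append(inbounds[toRegion], fromRegion)
		outbounds[fromRegion] = toRegion
	}

	// dca is both a failover target (phx points at it) and a source (it points at klm),
	// so the at-most-one-hop rule rejects this map. A cycle would be caught the same way.
	for toRegion, sources := range inbounds {
		if _, ok := outbounds[toRegion]; ok {
			fmt.Printf("rejected: region %s maps to another region but is already a target of %v\n", toRegion, sources)
		}
	}
}
```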

common/domain/attrValidator_test.go

Lines changed: 90 additions & 0 deletions

```diff
@@ -23,9 +23,13 @@ package domain
 import (
 	"testing"
 
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/suite"
 
 	"github.com/uber/cadence/common/cluster"
+	"github.com/uber/cadence/common/config"
+	"github.com/uber/cadence/common/log"
+	"github.com/uber/cadence/common/metrics"
 	"github.com/uber/cadence/common/persistence"
 	"github.com/uber/cadence/common/types"
 )
@@ -284,3 +288,89 @@ func (s *attrValidatorSuite) TestValidateDomainReplicationConfigClustersDoesNotR
 	)
 	s.IsType(&types.BadRequestError{}, err)
 }
+
+func TestCheckActiveClusterRegionMappings(t *testing.T) {
+	clusterMetadata := cluster.NewMetadata(
+		config.ClusterGroupMetadata{
+			ClusterGroup: map[string]config.ClusterInformation{
+				"A1": {
+					Region: "A",
+				},
+				"A2": {
+					Region: "A",
+				},
+				"B1": {
+					Region: "B",
+				},
+				"B2": {
+					Region: "B",
+				},
+				"C1": {
+					Region: "C",
+				},
+				"C2": {
+					Region: "C",
+				},
+			},
+		},
+		func(d string) bool { return false },
+		metrics.NewNoopMetricsClient(),
+		log.NewNoop(),
+	)
+
+	tests := []struct {
+		desc           string
+		activeClusters *types.ActiveClusters
+		wantErr        bool
+	}{
+		{
+			desc: "non-existing cluster",
+			activeClusters: &types.ActiveClusters{ActiveClustersByRegion: map[string]types.ActiveClusterInfo{
+				"D": {ActiveClusterName: "D1"},
+			}},
+			wantErr: true,
+		},
+		{
+			desc: "no cycle. every region is mapped to a local cluster",
+			activeClusters: &types.ActiveClusters{ActiveClustersByRegion: map[string]types.ActiveClusterInfo{
+				"A": {ActiveClusterName: "A1"},
+				"B": {ActiveClusterName: "B1"},
+				"C": {ActiveClusterName: "C1"},
+			}},
+			wantErr: false,
+		},
+		{
+			desc: "no cycle. A and C failed over to B",
+			activeClusters: &types.ActiveClusters{ActiveClustersByRegion: map[string]types.ActiveClusterInfo{
+				"A": {ActiveClusterName: "B1"},
+				"B": {ActiveClusterName: "B1"},
+				"C": {ActiveClusterName: "B1"},
+			}},
+			wantErr: false,
+		},
+		{
+			desc: "cycle. A -> B -> C -> A",
+			activeClusters: &types.ActiveClusters{ActiveClustersByRegion: map[string]types.ActiveClusterInfo{
+				"A": {ActiveClusterName: "B2"},
+				"B": {ActiveClusterName: "C2"},
+				"C": {ActiveClusterName: "A2"},
+			}},
+			wantErr: true,
+		},
+		{
+			desc: "no cycle but more than one hop. A -> B -> C",
+			activeClusters: &types.ActiveClusters{ActiveClustersByRegion: map[string]types.ActiveClusterInfo{
+				"A": {ActiveClusterName: "B2"},
+				"B": {ActiveClusterName: "C2"},
+				"C": {ActiveClusterName: "C1"},
+			}},
+			wantErr: true,
+		},
+	}
+	for _, tc := range tests {
+		validator := newAttrValidator(clusterMetadata, int32(1))
+		err := validator.checkActiveClusterRegionMappings(tc.activeClusters)
+		assert.Equal(t, tc.wantErr, err != nil)
+	}
+
+}
```

common/domain/handler.go

Lines changed: 20 additions & 0 deletions

```diff
@@ -1384,6 +1384,10 @@ func (d *handlerImpl) updateReplicationConfig(
 			d.logger.Debugf("Setting activeCluster for region %v to %v. no update case, just copy the existing active cluster", region, activeCluster)
 		}
 	}
+
+	// adjust failover versions so that same cluster in different regions have same failover versions
+	d.adjustFailoverVersions(finalActiveClusters)
+
 	config.ActiveClusters = &types.ActiveClusters{
 		ActiveClustersByRegion: finalActiveClusters,
 	}
@@ -1394,6 +1398,22 @@ func (d *handlerImpl) updateReplicationConfig(
 	return config, clusterUpdated, activeClusterUpdated, nil
 }
 
+func (d *handlerImpl) adjustFailoverVersions(activeClusters map[string]types.ActiveClusterInfo) {
+	clusterToRegions := make(map[string][]string)
+	clusterMaxFailoverVersion := make(map[string]int64)
+	for region, activeCluster := range activeClusters {
+		clusterToRegions[activeCluster.ActiveClusterName] = append(clusterToRegions[activeCluster.ActiveClusterName], region)
+		clusterMaxFailoverVersion[activeCluster.ActiveClusterName] = max(clusterMaxFailoverVersion[activeCluster.ActiveClusterName], activeCluster.FailoverVersion)
+	}
+	for cluster, regions := range clusterToRegions {
+		for _, region := range regions {
+			activeCluster := activeClusters[region]
+			activeCluster.FailoverVersion = clusterMaxFailoverVersion[cluster]
+			activeClusters[region] = activeCluster
+		}
+	}
+}
+
 func (d *handlerImpl) handleGracefulFailover(
 	updateRequest *types.UpdateDomainRequest,
 	replicationConfig *persistence.DomainReplicationConfig,
```
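For a concrete picture of what `adjustFailoverVersions` does to the map from the PR description, here is a small standalone sketch that applies the same raise-to-the-cluster-maximum logic. `ActiveClusterInfo` here is a simplified stand-in, not the cadence type.

```go
package main

import "fmt"

// ActiveClusterInfo is a simplified stand-in for the per-region entry.
type ActiveClusterInfo struct {
	ActiveClusterName string
	FailoverVersion   int64
}

// adjustFailoverVersions raises every entry that points to the same cluster
// to that cluster's maximum failover version, mirroring the handler change above.
func adjustFailoverVersions(byRegion map[string]ActiveClusterInfo) {
	maxVersion := make(map[string]int64)
	for _, info := range byRegion {
		if info.FailoverVersion > maxVersion[info.ActiveClusterName] {
			maxVersion[info.ActiveClusterName] = info.FailoverVersion
		}
	}
	for region, info := range byRegion {
		info.FailoverVersion = maxVersion[info.ActiveClusterName]
		byRegion[region] = info
	}
}

func main() {
	// State right after the PHX -> DCA failover from the description:
	// phx was bumped to 10, dca still carries 0.
	byRegion := map[string]ActiveClusterInfo{
		"phx": {ActiveClusterName: "staging_dca", FailoverVersion: 10},
		"dca": {ActiveClusterName: "staging_dca", FailoverVersion: 0},
	}
	adjustFailoverVersions(byRegion)
	fmt.Println(byRegion) // both staging_dca entries now carry failover version 10
}
```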

common/domain/handler_test.go

Lines changed: 5 additions & 4 deletions

```diff
@@ -1722,7 +1722,8 @@ func TestHandler_UpdateDomain(t *testing.T) {
 				ActiveClustersByRegion: map[string]types.ActiveClusterInfo{
 					cluster.TestRegion1: {
 						ActiveClusterName: cluster.TestCurrentClusterName,
-						FailoverVersion:   cluster.TestCurrentClusterInitialFailoverVersion,
+						// This is incremented to match below.
+						FailoverVersion: cluster.TestCurrentClusterInitialFailoverVersion + cluster.TestFailoverVersionIncrement,
 					},
 					cluster.TestRegion2: {
 						ActiveClusterName: cluster.TestCurrentClusterName,
@@ -1786,7 +1787,7 @@ func TestHandler_UpdateDomain(t *testing.T) {
 				ActiveClustersByRegion: map[string]types.ActiveClusterInfo{
 					cluster.TestRegion1: {
 						ActiveClusterName: cluster.TestCurrentClusterName,
-						FailoverVersion:   cluster.TestCurrentClusterInitialFailoverVersion,
+						FailoverVersion:   cluster.TestCurrentClusterInitialFailoverVersion + cluster.TestFailoverVersionIncrement,
 					},
 					cluster.TestRegion2: {
 						ActiveClusterName: cluster.TestCurrentClusterName,
@@ -1822,7 +1823,7 @@ func TestHandler_UpdateDomain(t *testing.T) {
 				ActiveClustersByRegion: map[string]types.ActiveClusterInfo{
 					cluster.TestRegion1: {
 						ActiveClusterName: cluster.TestCurrentClusterName,
-						FailoverVersion:   cluster.TestCurrentClusterInitialFailoverVersion,
+						FailoverVersion:   cluster.TestCurrentClusterInitialFailoverVersion + cluster.TestFailoverVersionIncrement,
 					},
 					cluster.TestRegion2: {
 						ActiveClusterName: cluster.TestCurrentClusterName,
@@ -2885,7 +2886,7 @@ func TestUpdateReplicationConfig(t *testing.T) {
 				ActiveClustersByRegion: map[string]types.ActiveClusterInfo{
 					cluster.TestRegion1: {
 						ActiveClusterName: cluster.TestCurrentClusterName,
-						FailoverVersion:   cluster.TestCurrentClusterInitialFailoverVersion,
+						FailoverVersion:   cluster.TestCurrentClusterInitialFailoverVersion + cluster.TestFailoverVersionIncrement,
 					},
 					cluster.TestRegion2: {
 						ActiveClusterName: cluster.TestCurrentClusterName,
```
config/dynamicconfig/replication_simulation_activeactive_regional_failover_start_same_wfid.yml

Lines changed: 22 additions & 0 deletions

```diff
@@ -0,0 +1,22 @@
+# This file is used as dynamicconfig override for "activeactive_regional_failover_start_same_wfid" replication simulation scenario configured via simulation/replication/testdata/replication_simulation_activeactive_regional_failover_start_same_wfid.yaml
+system.writeVisibilityStoreName:
+  - value: "db"
+system.readVisibilityStoreName:
+  - value: "db"
+history.replicatorTaskBatchSize:
+  - value: 25
+    constraints: {}
+frontend.failoverCoolDown:
+  - value: 5s
+history.ReplicationTaskProcessorStartWait: # default is 5s. repl task processor sleeps this much before processing received messages.
+  - value: 10ms
+history.standbyTaskMissingEventsResendDelay:
+  - value: 5s
+history.standbyTaskMissingEventsDiscardDelay:
+  - value: 10s
+history.standbyClusterDelay:
+  - value: 10s
+history.enableTransferQueueV2:
+  - value: true
+history.enableTimerQueueV2:
+  - value: true
```

simulation/replication/replication_simulation_test.go

Lines changed: 7 additions & 0 deletions

```diff
@@ -153,6 +153,13 @@ func startWorkflow(
 	})
 
 	if err != nil {
+		if op.Want.Error != "" {
+			if strings.Contains(err.Error(), op.Want.Error) {
+				simTypes.Logf(t, "Start workflow got expected error: %s on domain: %s on cluster: %s. Error: %s", op.WorkflowID, op.Domain, op.Cluster, err.Error())
+				return nil
+			}
+			return fmt.Errorf("expected error: %s, but got: %s", op.Want.Error, err.Error())
+		}
 		return err
 	}
 
```

simulation/replication/testdata/replication_simulation_activeactive_regional_failover.yaml

Lines changed: 2 additions & 2 deletions

```diff
@@ -45,9 +45,9 @@ operations:
       region0: cluster1 # this is changed from cluster0 to cluster1
       region1: cluster1
 
-  # Start wf2 on cluster0 at the same time as failover. It will be started by cluster0 workers and completed by cluster1 workers.
+  # Start wf2 on cluster0 right before failover. It will be started by cluster0 workers and completed by cluster1 workers.
   - op: start_workflow
-    at: 10s
+    at: 9s
     workflowID: wf2
     workflowType: timer-activity-loop-workflow
     cluster: cluster0
```
simulation/replication/testdata/replication_simulation_activeactive_regional_failover_start_same_wfid.yaml

Lines changed: 60 additions & 0 deletions

```diff
@@ -0,0 +1,60 @@
+# This file is a replication simulation scenario spec.
+# It is parsed into ReplicationSimulationConfig struct.
+# Replication simulation for this file can be run via ./simulation/replication/run.sh activeactive_regional_failover_start_same_wfid
+# Dynamic config overrides can be set via config/dynamicconfig/replication_simulation_activeactive_regional_failover_start_same_wfid.yml
+clusters:
+  cluster0:
+    grpcEndpoint: "cadence-cluster0:7833"
+  cluster1:
+    grpcEndpoint: "cadence-cluster1:7833"
+
+# primaryCluster is where domain data is written to and replicates to others. e.g. domain registration
+primaryCluster: "cluster0"
+
+domains:
+  test-domain-aa:
+    activeClustersByRegion:
+      region0: cluster0
+      region1: cluster1
+
+operations:
+  # Start wf1 on cluster1 before failover.
+  - op: start_workflow
+    at: 0s
+    workflowID: wf1
+    workflowType: timer-activity-loop-workflow
+    cluster: cluster1
+    domain: test-domain-aa
+    workflowExecutionStartToCloseTimeout: 70s
+    workflowDuration: 60s
+
+
+  # Failover from cluster1 to cluster0
+  - op: change_active_clusters
+    at: 10s
+    domain: test-domain-aa
+    newActiveClustersByRegion:
+      region1: cluster0
+
+  # Attempt to start wf1 on cluster1 again. It will be forwarded to cluster0 and cluster0 should reject with error "Workflow execution is already running".
+  - op: start_workflow
+    at: 30s
+    workflowID: wf1
+    workflowType: timer-activity-loop-workflow
+    cluster: cluster1
+    domain: test-domain-aa
+    workflowExecutionStartToCloseTimeout: 70s
+    workflowDuration: 60s
+    want:
+      error: "Workflow execution is already running"
+
+  # Validate that wf1 is completed in cluster0.
+  - op: validate
+    at: 80s
+    workflowID: wf1
+    cluster: cluster1
+    domain: test-domain-aa
+    want:
+      status: completed
+      startedByWorkersInCluster: cluster1
+      completedByWorkersInCluster: cluster0
```
