Skip to content

Commit 9242115

Browse files
Active-active domain support - Part 1/N (#6799)
1 parent 6ab886b commit 9242115

32 files changed

+1445
-157
lines changed

cmd/server/cadence/server.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -221,10 +221,7 @@ func (s *server) startService() common.Daemon {
221221
params.GetIsolationGroups = getFromDynamicConfig(params, dc)
222222

223223
params.ClusterMetadata = cluster.NewMetadata(
224-
clusterGroupMetadata.FailoverVersionIncrement,
225-
clusterGroupMetadata.PrimaryClusterName,
226-
clusterGroupMetadata.CurrentClusterName,
227-
clusterGroupMetadata.ClusterGroup,
224+
*clusterGroupMetadata,
228225
dc.GetBoolPropertyFilteredByDomain(dynamicproperties.UseNewInitialFailoverVersion),
229226
params.MetricsClient,
230227
params.Logger,

common/activecluster/fake.go

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
// The MIT License (MIT)
2+
3+
// Copyright (c) 2017-2020 Uber Technologies Inc.
4+
5+
// Permission is hereby granted, free of charge, to any person obtaining a copy
6+
// of this software and associated documentation files (the "Software"), to deal
7+
// in the Software without restriction, including without limitation the rights
8+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
// copies of the Software, and to permit persons to whom the Software is
10+
// furnished to do so, subject to the following conditions:
11+
//
12+
// The above copyright notice and this permission notice shall be included in all
13+
// copies or substantial portions of the Software.
14+
//
15+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21+
// SOFTWARE.
22+
23+
package activecluster
24+
25+
import (
26+
"sync/atomic"
27+
"time"
28+
)
29+
30+
var (
31+
wf1FailoverTime = 60 * time.Second
32+
33+
cluster0Result = &LookupResult{
34+
Region: "region0",
35+
ClusterName: "cluster0",
36+
FailoverVersion: 1,
37+
}
38+
39+
cluster1Result = &LookupResult{
40+
Region: "region1",
41+
ClusterName: "cluster1",
42+
FailoverVersion: 2,
43+
}
44+
)
45+
46+
// TODO(active-active): Remove below fake implementation and implement properly
47+
// - lookup active region given <domain id, wf id, run id> from executions table RowType=ActiveCluster.
48+
// - cache this info
49+
// - add metrics for cache hit/miss
50+
// - return cluster name
51+
52+
// Fake logic:
53+
// - wf1 is active in cluster0 for first 60 seconds, then active in cluster1.
54+
// Note: Simulation sleeps for 30s in the beginning and runs wf1 for 60s. So wf1 should start in cluster0 and complete in cluster1.
55+
// - other workflows are always active in cluster1
56+
func (m *manager) fakeLookupWorkflow(wfID string) (*LookupResult, error) {
57+
if wfID == "wf1" && (m.wf1StartTime.IsZero() || atomic.LoadInt32(&m.wf1FailedOver) == 0) {
58+
if m.wf1StartTime.IsZero() {
59+
m.logger.Debug("Initializing wf1 failover timer")
60+
m.wf1StartTime = time.Now()
61+
go m.fakeEntityMapChange()
62+
}
63+
m.logger.Debug("Returning cluster0 for wf1")
64+
return cluster0Result, nil
65+
}
66+
67+
if wfID == "wf1" {
68+
m.logger.Debug("Returning cluster1 for wf1")
69+
}
70+
71+
return cluster1Result, nil
72+
}
73+
74+
func (m *manager) fakeEntityMapChange() {
75+
// Based on the fake logic, wf1 will failover to cluster1 after 60 seconds.
76+
t := time.NewTimer(wf1FailoverTime)
77+
defer t.Stop()
78+
for {
79+
select {
80+
case <-t.C:
81+
m.logger.Debugf("Faking wf1 failover")
82+
atomic.CompareAndSwapInt32(&m.wf1FailedOver, 0, 1)
83+
m.notifyChangeCallbacks(ChangeTypeEntityMap)
84+
case <-m.ctx.Done():
85+
return
86+
}
87+
}
88+
}

common/activecluster/manager.go

Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
// The MIT License (MIT)
2+
3+
// Copyright (c) 2017-2020 Uber Technologies Inc.
4+
5+
// Permission is hereby granted, free of charge, to any person obtaining a copy
6+
// of this software and associated documentation files (the "Software"), to deal
7+
// in the Software without restriction, including without limitation the rights
8+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
// copies of the Software, and to permit persons to whom the Software is
10+
// furnished to do so, subject to the following conditions:
11+
//
12+
// The above copyright notice and this permission notice shall be included in all
13+
// copies or substantial portions of the Software.
14+
//
15+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21+
// SOFTWARE.
22+
23+
package activecluster
24+
25+
import (
26+
"context"
27+
"errors"
28+
"fmt"
29+
"sync"
30+
"time"
31+
32+
"github.com/uber/cadence/common/cache"
33+
"github.com/uber/cadence/common/cluster"
34+
"github.com/uber/cadence/common/log"
35+
"github.com/uber/cadence/common/log/tag"
36+
"github.com/uber/cadence/common/metrics"
37+
"github.com/uber/cadence/common/types"
38+
)
39+
40+
type DomainIDToDomainFn func(id string) (*cache.DomainCacheEntry, error)
41+
42+
type manager struct {
43+
domainIDToDomainFn DomainIDToDomainFn
44+
clusterMetadata cluster.Metadata
45+
metricsCl metrics.Client
46+
logger log.Logger
47+
ctx context.Context
48+
cancel context.CancelFunc
49+
50+
// TODO: fakes to be remove
51+
wf1StartTime time.Time
52+
wf1FailedOver int32
53+
54+
changeCallbacksLock sync.RWMutex
55+
changeCallbacks map[int]func(ChangeType)
56+
}
57+
58+
func NewManager(
59+
domainIDToDomainFn DomainIDToDomainFn,
60+
clusterMetadata cluster.Metadata,
61+
metricsCl metrics.Client,
62+
logger log.Logger,
63+
) Manager {
64+
ctx, cancel := context.WithCancel(context.Background())
65+
return &manager{
66+
domainIDToDomainFn: domainIDToDomainFn,
67+
clusterMetadata: clusterMetadata,
68+
metricsCl: metricsCl,
69+
logger: logger.WithTags(tag.ComponentActiveClusterManager),
70+
ctx: ctx,
71+
cancel: cancel,
72+
changeCallbacks: make(map[int]func(ChangeType)),
73+
}
74+
}
75+
76+
func (m *manager) Start() {
77+
}
78+
79+
func (m *manager) Stop() {
80+
m.cancel()
81+
}
82+
83+
func (m *manager) LookupExternalEntity(ctx context.Context, entityType, entityKey string) (*LookupResult, error) {
84+
// TODO: implement this
85+
return nil, errors.New("not implemented")
86+
}
87+
88+
func (m *manager) LookupExternalEntityOfNewWorkflow(ctx context.Context, req *types.HistoryStartWorkflowExecutionRequest) (*LookupResult, error) {
89+
d, err := m.domainIDToDomainFn(req.DomainUUID)
90+
if err != nil {
91+
return nil, err
92+
}
93+
94+
if !d.GetReplicationConfig().IsActiveActive() {
95+
// Not an active-active domain. return ActiveClusterName from domain entry
96+
return &LookupResult{
97+
ClusterName: d.GetReplicationConfig().ActiveClusterName,
98+
FailoverVersion: d.GetFailoverVersion(),
99+
}, nil
100+
}
101+
102+
wfID := req.StartRequest.WorkflowID
103+
return m.fakeLookupWorkflow(wfID)
104+
}
105+
106+
func (m *manager) LookupWorkflow(ctx context.Context, domainID, wfID, rID string) (*LookupResult, error) {
107+
d, err := m.domainIDToDomainFn(domainID)
108+
if err != nil {
109+
return nil, err
110+
}
111+
112+
if !d.GetReplicationConfig().IsActiveActive() {
113+
// Not an active-active domain. return ActiveClusterName from domain entry
114+
return &LookupResult{
115+
ClusterName: d.GetReplicationConfig().ActiveClusterName,
116+
FailoverVersion: d.GetFailoverVersion(),
117+
}, nil
118+
}
119+
120+
return m.fakeLookupWorkflow(wfID)
121+
}
122+
123+
func (m *manager) ClusterNameForFailoverVersion(failoverVersion int64, domainID string) (string, error) {
124+
d, err := m.domainIDToDomainFn(domainID)
125+
if err != nil {
126+
return "", err
127+
}
128+
129+
if !d.GetReplicationConfig().IsActiveActive() {
130+
cluster, err := m.clusterMetadata.ClusterNameForFailoverVersion(failoverVersion)
131+
if err != nil {
132+
return "", err
133+
}
134+
return cluster, nil
135+
}
136+
137+
// For active-active domains, the failover version might be mapped to a cluster or a region
138+
// First check if it maps to a cluster
139+
cluster, err := m.clusterMetadata.ClusterNameForFailoverVersion(failoverVersion)
140+
if err == nil {
141+
return cluster, nil
142+
}
143+
144+
// Check if it maps to a region.
145+
region, err := m.clusterMetadata.RegionForFailoverVersion(failoverVersion)
146+
if err != nil {
147+
return "", err
148+
}
149+
150+
// Now we know the region, find the cluster in the domain's active cluster list which belongs to the region
151+
cfg, ok := d.GetReplicationConfig().ActiveClusters.RegionToClusterMap[region]
152+
if !ok {
153+
return "", fmt.Errorf("could not find region %s in the domain's active cluster config", region)
154+
}
155+
156+
enabledClusters := m.clusterMetadata.GetEnabledClusterInfo()
157+
_, ok = enabledClusters[cfg.ActiveClusterName]
158+
if !ok {
159+
return "", fmt.Errorf("cluster %s is disabled", cfg.ActiveClusterName)
160+
}
161+
162+
return cfg.ActiveClusterName, nil
163+
}
164+
165+
func (m *manager) RegisterChangeCallback(shardID int, callback func(ChangeType)) {
166+
m.changeCallbacksLock.Lock()
167+
defer m.changeCallbacksLock.Unlock()
168+
169+
m.changeCallbacks[shardID] = callback
170+
}
171+
172+
func (m *manager) UnregisterChangeCallback(shardID int) {
173+
m.changeCallbacksLock.Lock()
174+
defer m.changeCallbacksLock.Unlock()
175+
176+
delete(m.changeCallbacks, shardID)
177+
}
178+
179+
func (m *manager) notifyChangeCallbacks(changeType ChangeType) {
180+
m.changeCallbacksLock.RLock()
181+
defer m.changeCallbacksLock.RUnlock()
182+
183+
for _, callback := range m.changeCallbacks {
184+
callback(changeType)
185+
}
186+
}

0 commit comments

Comments
 (0)