Skip to content

Commit 139d56c

Browse files
active-active prototyping
1 parent f7d936d commit 139d56c

File tree

133 files changed

+2203
-583
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

133 files changed

+2203
-583
lines changed

Diff for: .cursor/mcp.json

+9
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
{
2+
"mcpServers": {
3+
"cadence-mcp-server": {
4+
"command": "/Users/taylan/code/forks/cadence/.bin/cadence_mcp",
5+
"args": [],
6+
"env": {}
7+
}
8+
}
9+
}

Diff for: cmd/server/cadence/server.go

+1-4
Original file line numberDiff line numberDiff line change
@@ -221,10 +221,7 @@ func (s *server) startService() common.Daemon {
221221
params.GetIsolationGroups = getFromDynamicConfig(params, dc)
222222

223223
params.ClusterMetadata = cluster.NewMetadata(
224-
clusterGroupMetadata.FailoverVersionIncrement,
225-
clusterGroupMetadata.PrimaryClusterName,
226-
clusterGroupMetadata.CurrentClusterName,
227-
clusterGroupMetadata.ClusterGroup,
224+
*clusterGroupMetadata,
228225
dc.GetBoolPropertyFilteredByDomain(dynamicconfig.UseNewInitialFailoverVersion),
229226
params.MetricsClient,
230227
params.Logger,

Diff for: common/activecluster/fake.go

+88
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
// The MIT License (MIT)
2+
3+
// Copyright (c) 2017-2020 Uber Technologies Inc.
4+
5+
// Permission is hereby granted, free of charge, to any person obtaining a copy
6+
// of this software and associated documentation files (the "Software"), to deal
7+
// in the Software without restriction, including without limitation the rights
8+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
// copies of the Software, and to permit persons to whom the Software is
10+
// furnished to do so, subject to the following conditions:
11+
//
12+
// The above copyright notice and this permission notice shall be included in all
13+
// copies or substantial portions of the Software.
14+
//
15+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21+
// SOFTWARE.
22+
23+
package activecluster
24+
25+
import (
26+
"sync/atomic"
27+
"time"
28+
)
29+
30+
var (
31+
wf1FailoverTime = 60 * time.Second
32+
33+
cluster0Result = &LookupResult{
34+
Region: "region0",
35+
ClusterName: "cluster0",
36+
FailoverVersion: 1,
37+
}
38+
39+
cluster1Result = &LookupResult{
40+
Region: "region1",
41+
ClusterName: "cluster1",
42+
FailoverVersion: 2,
43+
}
44+
)
45+
46+
// TODO: Remove below fake implementation and implement properly
47+
// - lookup active region given <domain id, wf id, run id> from executions table RowType=ActiveCluster.
48+
// - cache this info
49+
// - add metrics for cache hit/miss
50+
// - return cluster name
51+
52+
// Fake logic:
53+
// - wf1 is active in cluster0 for first 60 seconds, then active in cluster1.
54+
// Note: Simulation sleeps for 30s in the beginning and runs wf1 for 60s. So wf1 should start in cluster0 and complete in cluster1.
55+
// - other workflows are always active in cluster1
56+
func (m *manager) fakeLookupWorkflow(wfID string) (*LookupResult, error) {
57+
if wfID == "wf1" && (m.wf1StartTime.IsZero() || atomic.LoadInt32(&m.wf1FailedOver) == 0) {
58+
if m.wf1StartTime.IsZero() {
59+
m.logger.Debug("Initializing wf1 failover timer")
60+
m.wf1StartTime = time.Now()
61+
go m.fakeEntityMapChange()
62+
}
63+
m.logger.Debug("Returning cluster0 for wf1")
64+
return cluster0Result, nil
65+
}
66+
67+
if wfID == "wf1" {
68+
m.logger.Debug("Returning cluster1 for wf1")
69+
}
70+
71+
return cluster1Result, nil
72+
}
73+
74+
func (m *manager) fakeEntityMapChange() {
75+
// Based on the fake logic, wf1 will failover to cluster1 after 60 seconds.
76+
t := time.NewTimer(wf1FailoverTime)
77+
defer t.Stop()
78+
for {
79+
select {
80+
case <-t.C:
81+
m.logger.Debugf("Faking wf1 failover")
82+
atomic.CompareAndSwapInt32(&m.wf1FailedOver, 0, 1)
83+
m.notifyChangeCallbacks(ChangeTypeEntityMap)
84+
case <-m.ctx.Done():
85+
return
86+
}
87+
}
88+
}

Diff for: common/activecluster/manager.go

+201
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
// The MIT License (MIT)
2+
3+
// Copyright (c) 2017-2020 Uber Technologies Inc.
4+
5+
// Permission is hereby granted, free of charge, to any person obtaining a copy
6+
// of this software and associated documentation files (the "Software"), to deal
7+
// in the Software without restriction, including without limitation the rights
8+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
// copies of the Software, and to permit persons to whom the Software is
10+
// furnished to do so, subject to the following conditions:
11+
//
12+
// The above copyright notice and this permission notice shall be included in all
13+
// copies or substantial portions of the Software.
14+
//
15+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21+
// SOFTWARE.
22+
23+
package activecluster
24+
25+
import (
26+
"context"
27+
"errors"
28+
"sync"
29+
"time"
30+
31+
"github.com/uber/cadence/common/cache"
32+
"github.com/uber/cadence/common/cluster"
33+
"github.com/uber/cadence/common/log"
34+
"github.com/uber/cadence/common/log/tag"
35+
"github.com/uber/cadence/common/metrics"
36+
"github.com/uber/cadence/common/types"
37+
)
38+
39+
// DomainIDToDomainFn resolves a domain ID to its domain cache entry, or
// returns an error when the lookup fails.
type DomainIDToDomainFn func(id string) (*cache.DomainCacheEntry, error)
40+
41+
type manager struct {
42+
domainIDToDomainFn DomainIDToDomainFn
43+
clusterMetadata cluster.Metadata
44+
metricsCl metrics.Client
45+
logger log.Logger
46+
ctx context.Context
47+
cancel context.CancelFunc
48+
49+
// TODO: fakes to be remove
50+
wf1StartTime time.Time
51+
wf1FailedOver int32
52+
53+
changeCallbacksLock sync.RWMutex
54+
changeCallbacks map[int]func(ChangeType)
55+
}
56+
57+
func NewManager(
58+
domainIDToDomainFn DomainIDToDomainFn,
59+
clusterMetadata cluster.Metadata,
60+
metricsCl metrics.Client,
61+
logger log.Logger,
62+
) Manager {
63+
ctx, cancel := context.WithCancel(context.Background())
64+
return &manager{
65+
domainIDToDomainFn: domainIDToDomainFn,
66+
clusterMetadata: clusterMetadata,
67+
metricsCl: metricsCl,
68+
logger: logger.WithTags(tag.ComponentActiveClusterManager),
69+
ctx: ctx,
70+
cancel: cancel,
71+
changeCallbacks: make(map[int]func(ChangeType)),
72+
}
73+
}
74+
75+
func (m *manager) Start() {
76+
}
77+
78+
func (m *manager) Stop() {
79+
m.cancel()
80+
}
81+
82+
func (m *manager) LookupExternalEntity(ctx context.Context, entityType, entityKey string) (*LookupResult, error) {
83+
// TODO: implement this
84+
return nil, errors.New("not implemented")
85+
}
86+
87+
func (m *manager) LookupExternalEntityOfNewWorkflow(ctx context.Context, req *types.HistoryStartWorkflowExecutionRequest) (*LookupResult, error) {
88+
d, err := m.domainIDToDomainFn(req.DomainUUID)
89+
if err != nil {
90+
return nil, err
91+
}
92+
93+
if !d.GetReplicationConfig().IsActiveActive() {
94+
// Not an active-active domain. return ActiveClusterName from domain entry
95+
return &LookupResult{
96+
ClusterName: d.GetReplicationConfig().ActiveClusterName,
97+
FailoverVersion: d.GetFailoverVersion(),
98+
}, nil
99+
}
100+
101+
wfID := req.StartRequest.WorkflowID
102+
return m.fakeLookupWorkflow(wfID)
103+
}
104+
105+
func (m *manager) LookupWorkflow(ctx context.Context, domainID, wfID, rID string) (*LookupResult, error) {
106+
d, err := m.domainIDToDomainFn(domainID)
107+
if err != nil {
108+
return nil, err
109+
}
110+
111+
if !d.GetReplicationConfig().IsActiveActive() {
112+
// Not an active-active domain. return ActiveClusterName from domain entry
113+
return &LookupResult{
114+
ClusterName: d.GetReplicationConfig().ActiveClusterName,
115+
FailoverVersion: d.GetFailoverVersion(),
116+
}, nil
117+
}
118+
119+
return m.fakeLookupWorkflow(wfID)
120+
}
121+
122+
func (m *manager) LookupFailoverVersion(failoverVersion int64, domainID string) (*LookupResult, error) {
123+
d, err := m.domainIDToDomainFn(domainID)
124+
if err != nil {
125+
return nil, err
126+
}
127+
128+
if !d.GetReplicationConfig().IsActiveActive() {
129+
cluster, err := m.clusterMetadata.ClusterNameForFailoverVersion(failoverVersion)
130+
if err != nil {
131+
return nil, err
132+
}
133+
return &LookupResult{
134+
ClusterName: cluster,
135+
FailoverVersion: failoverVersion,
136+
}, nil
137+
}
138+
139+
// For active-active domains, the failover version might be mapped to a cluster or a region
140+
// First check if it maps to a cluster
141+
cluster, err := m.clusterMetadata.ClusterNameForFailoverVersion(failoverVersion)
142+
if err == nil {
143+
return &LookupResult{
144+
ClusterName: cluster,
145+
FailoverVersion: failoverVersion,
146+
Region: m.regionOfCluster(cluster),
147+
}, nil
148+
}
149+
150+
// Check if it maps to a region.
151+
region, err := m.clusterMetadata.RegionForFailoverVersion(failoverVersion)
152+
if err != nil {
153+
return nil, err
154+
}
155+
156+
// Now we know the region, find the cluster in the domain's active cluster list which belongs to the region
157+
enabledClusters := m.clusterMetadata.GetEnabledClusterInfo()
158+
for _, c := range d.GetReplicationConfig().ActiveClusters {
159+
cl, ok := enabledClusters[c.ClusterName]
160+
if !ok {
161+
continue
162+
}
163+
if cl.Region == region {
164+
return &LookupResult{
165+
ClusterName: c.ClusterName,
166+
Region: region,
167+
FailoverVersion: failoverVersion,
168+
}, nil
169+
}
170+
}
171+
172+
return nil, errors.New("could not find cluster in the domain's active cluster list which belongs to the region")
173+
}
174+
175+
func (m *manager) RegisterChangeCallback(shardID int, callback func(ChangeType)) {
176+
m.changeCallbacksLock.Lock()
177+
defer m.changeCallbacksLock.Unlock()
178+
179+
m.changeCallbacks[shardID] = callback
180+
}
181+
182+
func (m *manager) UnregisterChangeCallback(shardID int) {
183+
m.changeCallbacksLock.Lock()
184+
defer m.changeCallbacksLock.Unlock()
185+
186+
delete(m.changeCallbacks, shardID)
187+
}
188+
189+
func (m *manager) notifyChangeCallbacks(changeType ChangeType) {
190+
m.changeCallbacksLock.RLock()
191+
defer m.changeCallbacksLock.RUnlock()
192+
193+
for _, callback := range m.changeCallbacks {
194+
callback(changeType)
195+
}
196+
}
197+
198+
// regionOfCluster returns the region of a cluster as defined in cluster metadata. May return empty if cluster is not found or have no region.
199+
func (m *manager) regionOfCluster(cluster string) string {
200+
return m.clusterMetadata.GetAllClusterInfo()[cluster].Region
201+
}

0 commit comments

Comments
 (0)