Skip to content

Commit 5ad8d40

Browse files
active-active prototyping
1 parent f7d936d commit 5ad8d40

File tree

127 files changed

+1969
-566
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

127 files changed

+1969
-566
lines changed

Diff for: .cursor/mcp.json

+9
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
{
2+
"mcpServers": {
3+
"cadence-mcp-server": {
4+
"command": "/Users/taylan/code/forks/cadence/.bin/cadence_mcp",
5+
"args": [],
6+
"env": {}
7+
}
8+
}
9+
}

Diff for: cmd/server/cadence/server.go

+1-4
Original file line numberDiff line numberDiff line change
@@ -221,10 +221,7 @@ func (s *server) startService() common.Daemon {
221221
params.GetIsolationGroups = getFromDynamicConfig(params, dc)
222222

223223
params.ClusterMetadata = cluster.NewMetadata(
224-
clusterGroupMetadata.FailoverVersionIncrement,
225-
clusterGroupMetadata.PrimaryClusterName,
226-
clusterGroupMetadata.CurrentClusterName,
227-
clusterGroupMetadata.ClusterGroup,
224+
*clusterGroupMetadata,
228225
dc.GetBoolPropertyFilteredByDomain(dynamicconfig.UseNewInitialFailoverVersion),
229226
params.MetricsClient,
230227
params.Logger,

Diff for: common/activecluster/manager.go

+198
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
// The MIT License (MIT)
2+
3+
// Copyright (c) 2017-2020 Uber Technologies Inc.
4+
5+
// Permission is hereby granted, free of charge, to any person obtaining a copy
6+
// of this software and associated documentation files (the "Software"), to deal
7+
// in the Software without restriction, including without limitation the rights
8+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
// copies of the Software, and to permit persons to whom the Software is
10+
// furnished to do so, subject to the following conditions:
11+
//
12+
// The above copyright notice and this permission notice shall be included in all
13+
// copies or substantial portions of the Software.
14+
//
15+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21+
// SOFTWARE.
22+
23+
package activecluster
24+
25+
import (
26+
"context"
27+
"errors"
28+
"time"
29+
30+
"github.com/uber/cadence/common/cache"
31+
"github.com/uber/cadence/common/cluster"
32+
"github.com/uber/cadence/common/log"
33+
"github.com/uber/cadence/common/log/tag"
34+
"github.com/uber/cadence/common/metrics"
35+
"github.com/uber/cadence/common/types"
36+
)
37+
38+
type DomainIDToDomainFn func(id string) (*cache.DomainCacheEntry, error)
39+
40+
type manager struct {
41+
domainIDToDomainFn DomainIDToDomainFn
42+
clusterMetadata cluster.Metadata
43+
metricsCl metrics.Client
44+
logger log.Logger
45+
startTime time.Time
46+
}
47+
48+
func NewManager(
49+
domainIDToDomainFn DomainIDToDomainFn,
50+
clusterMetadata cluster.Metadata,
51+
metricsCl metrics.Client,
52+
logger log.Logger,
53+
) Manager {
54+
return &manager{
55+
domainIDToDomainFn: domainIDToDomainFn,
56+
clusterMetadata: clusterMetadata,
57+
metricsCl: metricsCl,
58+
logger: logger.WithTags(tag.ComponentActiveClusterManager),
59+
startTime: time.Now(),
60+
}
61+
}
62+
63+
func (m *manager) Start() {
64+
// TODO: implement this
65+
}
66+
67+
func (m *manager) Stop() {
68+
// TODO: implement this
69+
}
70+
71+
func (m *manager) LookupExternalEntity(ctx context.Context, entityType, entityKey string) (*LookupResult, error) {
72+
// TODO: implement this
73+
return nil, errors.New("not implemented")
74+
}
75+
76+
func (m *manager) LookupExternalEntityOfNewWorkflow(ctx context.Context, req *types.HistoryStartWorkflowExecutionRequest) (*LookupResult, error) {
77+
d, err := m.domainIDToDomainFn(req.DomainUUID)
78+
if err != nil {
79+
return nil, err
80+
}
81+
82+
if !d.GetReplicationConfig().IsActiveActive() {
83+
// Not an active-active domain. return ActiveClusterName from domain entry
84+
return &LookupResult{
85+
ClusterName: d.GetReplicationConfig().ActiveClusterName,
86+
FailoverVersion: d.GetFailoverVersion(),
87+
}, nil
88+
}
89+
90+
wfID := req.StartRequest.WorkflowID
91+
return m.helperToBeRemoved(wfID)
92+
}
93+
94+
func (m *manager) LookupWorkflow(ctx context.Context, domainID, wfID, rID string) (*LookupResult, error) {
95+
d, err := m.domainIDToDomainFn(domainID)
96+
if err != nil {
97+
return nil, err
98+
}
99+
100+
if !d.GetReplicationConfig().IsActiveActive() {
101+
// Not an active-active domain. return ActiveClusterName from domain entry
102+
return &LookupResult{
103+
ClusterName: d.GetReplicationConfig().ActiveClusterName,
104+
FailoverVersion: d.GetFailoverVersion(),
105+
}, nil
106+
}
107+
108+
return m.helperToBeRemoved(wfID)
109+
}
110+
111+
func (m *manager) LookupFailoverVersion(failoverVersion int64, domainID string) (*LookupResult, error) {
112+
d, err := m.domainIDToDomainFn(domainID)
113+
if err != nil {
114+
return nil, err
115+
}
116+
117+
if !d.GetReplicationConfig().IsActiveActive() {
118+
cluster, err := m.clusterMetadata.ClusterNameForFailoverVersion(failoverVersion)
119+
if err != nil {
120+
return nil, err
121+
}
122+
return &LookupResult{
123+
ClusterName: cluster,
124+
FailoverVersion: failoverVersion,
125+
}, nil
126+
}
127+
128+
// For active-active domains, the failover version might be mapped to a cluster or a region
129+
// First check if it maps to a cluster
130+
cluster, err := m.clusterMetadata.ClusterNameForFailoverVersion(failoverVersion)
131+
if err == nil {
132+
return &LookupResult{
133+
ClusterName: cluster,
134+
FailoverVersion: failoverVersion,
135+
Region: m.regionOfCluster(cluster),
136+
}, nil
137+
}
138+
139+
// Check if it maps to a region.
140+
region, err := m.clusterMetadata.RegionForFailoverVersion(failoverVersion)
141+
if err != nil {
142+
return nil, err
143+
}
144+
145+
// Now we know the region, find the cluster in the domain's active cluster list which belongs to the region
146+
enabledClusters := m.clusterMetadata.GetEnabledClusterInfo()
147+
for _, c := range d.GetReplicationConfig().ActiveClusters {
148+
cl, ok := enabledClusters[c.ClusterName]
149+
if !ok {
150+
continue
151+
}
152+
if cl.Region == region {
153+
return &LookupResult{
154+
ClusterName: c.ClusterName,
155+
Region: region,
156+
FailoverVersion: failoverVersion,
157+
}, nil
158+
}
159+
}
160+
161+
return nil, errors.New("could not find cluster in the domain's active cluster list which belongs to the region")
162+
}
163+
164+
// regionOfCluster returns the region of a cluster as defined in cluster metadata. May return empty if cluster is not found or have no region.
165+
func (m *manager) regionOfCluster(cluster string) string {
166+
return m.clusterMetadata.GetAllClusterInfo()[cluster].Region
167+
}
168+
169+
func (m *manager) helperToBeRemoved(wfID string) (*LookupResult, error) {
170+
// TODO: Remove below fake implementation and implement properly
171+
// - lookup active region given <domain id, wf id, run id> from executions table RowType=ActiveCluster.
172+
// - cache this info
173+
// - add metrics for cache hit/miss
174+
// - return cluster name
175+
176+
// Fake logic:
177+
// - wf1 is active in cluster0 for first 60 seconds, then active in cluster1.
178+
// Note: Simulation sleeps for 30s in the beginning and runs wf1 for 60s. So wf1 should start in cluster0 and complete in cluster1.
179+
// - other workflows are always active in cluster1
180+
if wfID == "wf1" && time.Since(m.startTime) < 60*time.Second {
181+
m.logger.Debug("Returning cluster0 for wf1")
182+
return &LookupResult{
183+
Region: "region0",
184+
ClusterName: "cluster0",
185+
FailoverVersion: 1,
186+
}, nil
187+
}
188+
189+
if wfID == "wf1" {
190+
m.logger.Debug("Returning cluster1 for wf1")
191+
}
192+
193+
return &LookupResult{
194+
Region: "region1",
195+
ClusterName: "cluster1",
196+
FailoverVersion: 2,
197+
}, nil
198+
}

Diff for: common/activecluster/manager_mock.go

+149
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)