Skip to content

Active-active exploration/prototyping #6724

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .cursor/mcp.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"mcpServers": {
"cadence-mcp-server": {
"command": "/Users/taylan/code/forks/cadence/.bin/cadence_mcp",
"args": [],
"env": {}
}
}
}
5 changes: 1 addition & 4 deletions cmd/server/cadence/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,10 +222,7 @@ func (s *server) startService() common.Daemon {
params.GetIsolationGroups = getFromDynamicConfig(params, dc)

params.ClusterMetadata = cluster.NewMetadata(
clusterGroupMetadata.FailoverVersionIncrement,
clusterGroupMetadata.PrimaryClusterName,
clusterGroupMetadata.CurrentClusterName,
clusterGroupMetadata.ClusterGroup,
*clusterGroupMetadata,
dc.GetBoolPropertyFilteredByDomain(dynamicproperties.UseNewInitialFailoverVersion),
params.MetricsClient,
params.Logger,
Expand Down
88 changes: 88 additions & 0 deletions common/activecluster/fake.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// The MIT License (MIT)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package activecluster

import (
"sync/atomic"
"time"
)

var (
	// wf1FailoverTime is how long wf1 stays active in cluster0 before the
	// fake failover to cluster1 fires (see fakeEntityMapChange).
	wf1FailoverTime = 60 * time.Second

	// cluster0Result is returned for wf1 while it is still active in
	// cluster0 (i.e. before the fake failover fires).
	cluster0Result = &LookupResult{
		Region:          "region0",
		ClusterName:     "cluster0",
		FailoverVersion: 1,
	}

	// cluster1Result is returned for every other workflow, and for wf1 once
	// the fake failover has happened.
	cluster1Result = &LookupResult{
		Region:          "region1",
		ClusterName:     "cluster1",
		FailoverVersion: 2,
	}
)

// TODO: Remove below fake implementation and implement properly
// - lookup active region given <domain id, wf id, run id> from executions table RowType=ActiveCluster.
// - cache this info
// - add metrics for cache hit/miss
// - return cluster name

// fakeLookupWorkflow resolves the active cluster for a workflow using
// hard-coded rules instead of real lookup data.
//
// Fake logic:
// - wf1 is active in cluster0 for first 60 seconds, then active in cluster1.
// Note: Simulation sleeps for 30s in the beginning and runs wf1 for 60s. So wf1 should start in cluster0 and complete in cluster1.
// - other workflows are always active in cluster1
//
// NOTE(review): m.wf1StartTime is read and written here without any
// synchronization, while m.wf1FailedOver is accessed atomically. Concurrent
// lookups for "wf1" can race on wf1StartTime and may launch
// fakeEntityMapChange more than once — confirm callers are serialized, or
// guard the one-time initialization (e.g. with sync.Once) before this fake
// is replaced by the real implementation.
func (m *manager) fakeLookupWorkflow(wfID string) (*LookupResult, error) {
	// wf1 maps to cluster0 until the failover flag is flipped by
	// fakeEntityMapChange.
	if wfID == "wf1" && (m.wf1StartTime.IsZero() || atomic.LoadInt32(&m.wf1FailedOver) == 0) {
		if m.wf1StartTime.IsZero() {
			// First lookup of wf1 arms the one-shot failover timer.
			m.logger.Debug("Initializing wf1 failover timer")
			m.wf1StartTime = time.Now()
			go m.fakeEntityMapChange()
		}
		m.logger.Debug("Returning cluster0 for wf1")
		return cluster0Result, nil
	}

	if wfID == "wf1" {
		m.logger.Debug("Returning cluster1 for wf1")
	}

	return cluster1Result, nil
}

// fakeEntityMapChange waits wf1FailoverTime, then marks wf1 as failed over to
// cluster1 and notifies all registered change callbacks. It returns as soon as
// the one-shot failover has fired, or earlier if the manager is stopped.
func (m *manager) fakeEntityMapChange() {
	// Based on the fake logic, wf1 will failover to cluster1 after 60 seconds.
	t := time.NewTimer(wf1FailoverTime)
	defer t.Stop()
	// The timer fires at most once, so there is no need to loop: handle the
	// single event (or cancellation) and exit instead of keeping the
	// goroutine parked until Stop.
	select {
	case <-t.C:
		// Debug (not Debugf) for consistency with the other logger calls in
		// this file; the message has no format arguments anyway.
		m.logger.Debug("Faking wf1 failover")
		atomic.CompareAndSwapInt32(&m.wf1FailedOver, 0, 1)
		m.notifyChangeCallbacks(ChangeTypeEntityMap)
	case <-m.ctx.Done():
	}
}
201 changes: 201 additions & 0 deletions common/activecluster/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
// The MIT License (MIT)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package activecluster

import (
"context"
"errors"
"sync"
"time"

"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/types"
)

type DomainIDToDomainFn func(id string) (*cache.DomainCacheEntry, error)

// manager implements Manager: it resolves which cluster a workflow or
// external entity is currently active in, with special handling for
// active-active domains.
type manager struct {
	// domainIDToDomainFn resolves a domain ID to its cache entry.
	domainIDToDomainFn DomainIDToDomainFn
	clusterMetadata    cluster.Metadata
	metricsCl          metrics.Client
	logger             log.Logger
	// ctx/cancel bound the lifetime of background goroutines started by the
	// manager; cancel is invoked by Stop.
	ctx    context.Context
	cancel context.CancelFunc

	// TODO: remove these fakes (see fake.go)
	wf1StartTime  time.Time
	wf1FailedOver int32

	// changeCallbacksLock guards changeCallbacks.
	changeCallbacksLock sync.RWMutex
	// changeCallbacks is keyed by shard ID; see RegisterChangeCallback.
	changeCallbacks map[int]func(ChangeType)
}

// NewManager constructs an active-cluster Manager. The returned manager owns
// a cancellable context that bounds its background work; Stop releases it.
func NewManager(
	domainIDToDomainFn DomainIDToDomainFn,
	clusterMetadata cluster.Metadata,
	metricsCl metrics.Client,
	logger log.Logger,
) Manager {
	ctx, cancel := context.WithCancel(context.Background())
	m := &manager{
		domainIDToDomainFn: domainIDToDomainFn,
		clusterMetadata:    clusterMetadata,
		metricsCl:          metricsCl,
		logger:             logger.WithTags(tag.ComponentActiveClusterManager),
		ctx:                ctx,
		cancel:             cancel,
		changeCallbacks:    make(map[int]func(ChangeType)),
	}
	return m
}

// Start is a no-op; the manager currently has no background work to kick off
// at start time (lookups are served on demand).
func (m *manager) Start() {
}

// Stop cancels the manager's context, signalling any background goroutines
// (e.g. the fake failover timer) to exit.
func (m *manager) Stop() {
	m.cancel()
}

// LookupExternalEntity returns the active cluster for the given external
// entity. Not yet implemented.
func (m *manager) LookupExternalEntity(ctx context.Context, entityType, entityKey string) (*LookupResult, error) {
	// TODO: implement this
	return nil, errors.New("not implemented")
}

// LookupExternalEntityOfNewWorkflow resolves the cluster in which a workflow
// being started should be considered active. For domains that are not
// active-active the answer comes straight from the domain's replication
// config; otherwise the per-workflow lookup (fake, for now) decides.
func (m *manager) LookupExternalEntityOfNewWorkflow(ctx context.Context, req *types.HistoryStartWorkflowExecutionRequest) (*LookupResult, error) {
	domain, err := m.domainIDToDomainFn(req.DomainUUID)
	if err != nil {
		return nil, err
	}

	replicationCfg := domain.GetReplicationConfig()
	if replicationCfg.IsActiveActive() {
		return m.fakeLookupWorkflow(req.StartRequest.WorkflowID)
	}

	// Single-active domain: the active cluster is recorded on the domain entry.
	return &LookupResult{
		ClusterName:     replicationCfg.ActiveClusterName,
		FailoverVersion: domain.GetFailoverVersion(),
	}, nil
}

// LookupWorkflow returns the cluster in which the given workflow execution is
// currently active. Non-active-active domains resolve directly from the
// domain's replication config; active-active domains go through the (fake,
// for now) per-workflow lookup.
func (m *manager) LookupWorkflow(ctx context.Context, domainID, wfID, rID string) (*LookupResult, error) {
	domain, err := m.domainIDToDomainFn(domainID)
	if err != nil {
		return nil, err
	}

	cfg := domain.GetReplicationConfig()
	if !cfg.IsActiveActive() {
		// Single-active domain: its active cluster is recorded directly on
		// the domain entry.
		res := &LookupResult{
			ClusterName:     cfg.ActiveClusterName,
			FailoverVersion: domain.GetFailoverVersion(),
		}
		return res, nil
	}

	return m.fakeLookupWorkflow(wfID)
}

// LookupFailoverVersion maps a failover version back to the cluster (and,
// when determinable, the region) it belongs to.
//
// For non-active-active domains the version identifies a cluster directly.
// For active-active domains the version may instead identify a region; in
// that case the result is the cluster from the domain's active-cluster list
// that lives in that region.
//
// Locals are named clusterName (not cluster) so they do not shadow the
// imported cluster package used elsewhere in this file.
func (m *manager) LookupFailoverVersion(failoverVersion int64, domainID string) (*LookupResult, error) {
	d, err := m.domainIDToDomainFn(domainID)
	if err != nil {
		return nil, err
	}

	if !d.GetReplicationConfig().IsActiveActive() {
		clusterName, err := m.clusterMetadata.ClusterNameForFailoverVersion(failoverVersion)
		if err != nil {
			return nil, err
		}
		return &LookupResult{
			ClusterName:     clusterName,
			FailoverVersion: failoverVersion,
		}, nil
	}

	// For active-active domains, the failover version might be mapped to a
	// cluster or a region. First check if it maps to a cluster.
	clusterName, err := m.clusterMetadata.ClusterNameForFailoverVersion(failoverVersion)
	if err == nil {
		return &LookupResult{
			ClusterName:     clusterName,
			FailoverVersion: failoverVersion,
			Region:          m.regionOfCluster(clusterName),
		}, nil
	}

	// Check if it maps to a region.
	region, err := m.clusterMetadata.RegionForFailoverVersion(failoverVersion)
	if err != nil {
		return nil, err
	}

	// Now we know the region; find the cluster in the domain's active cluster
	// list which belongs to the region.
	enabledClusters := m.clusterMetadata.GetEnabledClusterInfo()
	for _, c := range d.GetReplicationConfig().ActiveClusters {
		info, ok := enabledClusters[c.ClusterName]
		if !ok {
			// Cluster listed on the domain but not enabled in this deployment.
			continue
		}
		if info.Region == region {
			return &LookupResult{
				ClusterName:     c.ClusterName,
				Region:          region,
				FailoverVersion: failoverVersion,
			}, nil
		}
	}

	return nil, errors.New("could not find cluster in the domain's active cluster list which belongs to the region")
}

// RegisterChangeCallback installs (or replaces) the change-notification
// callback for the given shard.
func (m *manager) RegisterChangeCallback(shardID int, callback func(ChangeType)) {
	m.changeCallbacksLock.Lock()
	m.changeCallbacks[shardID] = callback
	m.changeCallbacksLock.Unlock()
}

// UnregisterChangeCallback removes the change-notification callback for the
// given shard, if any.
func (m *manager) UnregisterChangeCallback(shardID int) {
	m.changeCallbacksLock.Lock()
	delete(m.changeCallbacks, shardID)
	m.changeCallbacksLock.Unlock()
}

// notifyChangeCallbacks invokes every registered shard callback with the
// given change type, holding the read lock for the duration of the calls.
func (m *manager) notifyChangeCallbacks(changeType ChangeType) {
	m.changeCallbacksLock.RLock()
	defer m.changeCallbacksLock.RUnlock()
	for _, cb := range m.changeCallbacks {
		cb(changeType)
	}
}

// regionOfCluster returns the region of a cluster as defined in cluster
// metadata. Returns the empty string when the cluster is unknown or has no
// region (a map miss yields a zero-valued entry).
func (m *manager) regionOfCluster(cluster string) string {
	info := m.clusterMetadata.GetAllClusterInfo()[cluster]
	return info.Region
}
Loading
Loading