Skip to content

Disconnect tasklist pollers on domain failover using callback #6903

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 72 additions & 2 deletions service/matching/handler/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ type (
membershipResolver membership.Resolver
isolationState isolationgroup.State
timeSource clock.TimeSource
notificationVersion int64
}

// HistoryInfo consists of two integer regarding the history size and history count
Expand Down Expand Up @@ -162,6 +163,7 @@ func NewEngine(
}

func (e *matchingEngineImpl) Start() {
e.registerDomainFailoverCallback()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Matching engine is not created on-demand so it doesn't matter probably but for consistency reasons let's unregister during Stop.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The matching engine can be completely removed and everything can be put inside handler directly, but the change is not unnecessary.

}

func (e *matchingEngineImpl) Stop() {
Expand All @@ -170,6 +172,7 @@ func (e *matchingEngineImpl) Stop() {
for _, l := range e.getTaskLists(math.MaxInt32) {
l.Stop()
}
e.unregisterDomainFailoverCallback()
e.shutdownCompletion.Wait()
}

Expand Down Expand Up @@ -535,7 +538,7 @@ pollLoop:
pollerCtx = tasklist.ContextWithIsolationGroup(pollerCtx, req.GetIsolationGroup())
tlMgr, err := e.getTaskListManager(taskListID, taskListKind)
if err != nil {
return nil, fmt.Errorf("couldn't load tasklist namanger: %w", err)
return nil, fmt.Errorf("couldn't load tasklist manager: %w", err)
}
startT := time.Now() // Record the start time
task, err := tlMgr.GetTask(pollerCtx, nil)
Expand Down Expand Up @@ -724,7 +727,7 @@ pollLoop:
taskListKind := request.TaskList.Kind
tlMgr, err := e.getTaskListManager(taskListID, taskListKind)
if err != nil {
return nil, fmt.Errorf("couldn't load tasklist namanger: %w", err)
return nil, fmt.Errorf("couldn't load tasklist manager: %w", err)
}
startT := time.Now() // Record the start time
task, err := tlMgr.GetTask(pollerCtx, maxDispatch)
Expand Down Expand Up @@ -1425,6 +1428,66 @@ func (e *matchingEngineImpl) isShuttingDown() bool {
}
}

func (e *matchingEngineImpl) domainChangeCallback(nextDomains []*cache.DomainCacheEntry) {
newNotificationVersion := e.notificationVersion

for _, domain := range nextDomains {
if domain.GetNotificationVersion() > newNotificationVersion {
newNotificationVersion = domain.GetNotificationVersion()
}

if !isDomainEligibleToDisconnectPollers(domain, e.notificationVersion) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does domain notification version only change when active-> passive switch happens?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is true. I guess it can be more efficient, I'll change that

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

notification version should change for every domain change I should think.

I guess my question is here: what's the use-case or thing you're guarding against here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was trying to be more efficient when getting domain updates, but if we are monotonically increasing the value I'm not sure if it adds any value. If we always get values that were higher than the stored one, it'll not make any difference. Is my understanding correct here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

took me a second to get my head around the code structure, that makes sense. No concerns.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this could be more efficient by checking the failover version of the domain. From my understanding, notification version is also updated when the domain metadata is updated.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But the failover version is independent for each domain, right? I'd have to keep track of each domain failover version independently in the manager, not in the engine. I guess I could use that to track failover instead of using the domain's active name. Does that make sense?

continue
}

req := &types.GetTaskListsByDomainRequest{
Domain: domain.GetInfo().Name,
}

resp, err := e.GetTaskListsByDomain(nil, req)
if err != nil {
continue
}

for taskListName := range resp.DecisionTaskListMap {
e.disconnectTaskListPollersAfterDomainFailover(taskListName, domain, persistence.TaskListTypeDecision)
}

for taskListName := range resp.ActivityTaskListMap {
e.disconnectTaskListPollersAfterDomainFailover(taskListName, domain, persistence.TaskListTypeActivity)
}
}
e.notificationVersion = newNotificationVersion
}

// registerDomainFailoverCallback subscribes the matching engine to domain
// cache change notifications; domainChangeCallback disconnects task list
// pollers when an eligible domain fails over.
func (e *matchingEngineImpl) registerDomainFailoverCallback() {
	// Only the post-change notification is used; the prepare/apply hooks
	// required by the registration API are deliberate no-ops.
	prepareHook := func(_ cache.DomainCache, _ cache.PrepareCallbackFn, _ cache.CallbackFn) {}
	applyHook := func() {}
	e.domainCache.RegisterDomainChangeCallback(service.Matching, prepareHook, applyHook, e.domainChangeCallback)
}

// unregisterDomainFailoverCallback removes the domain-change subscription
// installed by registerDomainFailoverCallback; invoked during engine Stop.
func (e *matchingEngineImpl) unregisterDomainFailoverCallback() {
	e.domainCache.UnregisterDomainChangeCallback(service.Matching)
}

// disconnectTaskListPollersAfterDomainFailover disconnects blocked pollers on
// a single task list when the domain's active cluster no longer matches the
// cluster recorded by the task list manager (i.e. a failover happened).
// taskType is one of the persistence.TaskListType* constants.
func (e *matchingEngineImpl) disconnectTaskListPollersAfterDomainFailover(taskListName string, domain *cache.DomainCacheEntry, taskType int) {
	taskList, err := tasklist.NewIdentifier(domain.GetInfo().ID, taskListName, taskType)
	if err != nil {
		// An invalid identifier cannot map to a live task list; log instead of
		// silently dropping the disconnect (consistent with the branch below).
		e.logger.Error("Couldn't create tasklist identifier", tag.Error(err))
		return
	}
	tlMgr, err := e.getTaskListManager(taskList, types.TaskListKindNormal.Ptr())
	if err != nil {
		e.logger.Error("Couldn't load tasklist manager", tag.Error(err))
		return
	}

	// Hoist the replication config; ActiveClusterName is the domain's new
	// active cluster after the failover notification.
	replicationConfig := domain.GetReplicationConfig()
	if tlMgr.GetDomainActiveCluster() != "" && tlMgr.GetDomainActiveCluster() != replicationConfig.ActiveClusterName {
		tlMgr.DisconnectBlockedPollers(&replicationConfig.ActiveClusterName)
	}
}

func (m *lockableQueryTaskMap) put(key string, value chan *queryResult) {
m.Lock()
defer m.Unlock()
Expand All @@ -1451,3 +1514,10 @@ func isMatchingRetryableError(err error) bool {
}
return true
}

// isDomainEligibleToDisconnectPollers reports whether a domain update should
// trigger disconnecting blocked pollers: the domain must be a global,
// active-passive domain whose notification version is newer than the
// engine's current watermark.
func isDomainEligibleToDisconnectPollers(domain *cache.DomainCacheEntry, currentVersion int64) bool {
	if !domain.IsGlobalDomain() {
		return false
	}
	replicationConfig := domain.GetReplicationConfig()
	if replicationConfig == nil || replicationConfig.IsActiveActive() {
		return false
	}
	return domain.GetNotificationVersion() > currentVersion
}
2 changes: 2 additions & 0 deletions service/matching/handler/engine_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ func (s *matchingEngineSuite) SetupTest() {
s.mockDomainCache.EXPECT().GetDomainByID(gomock.Any()).Return(cache.CreateDomainCacheEntry(matchingTestDomainName), nil).AnyTimes()
s.mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(cache.CreateDomainCacheEntry(matchingTestDomainName), nil).AnyTimes()
s.mockDomainCache.EXPECT().GetDomainName(gomock.Any()).Return(matchingTestDomainName, nil).AnyTimes()
s.mockDomainCache.EXPECT().RegisterDomainChangeCallback(service.Matching, gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
s.mockDomainCache.EXPECT().UnregisterDomainChangeCallback(service.Matching).AnyTimes()
s.mockMembershipResolver = membership.NewMockResolver(s.controller)
s.mockMembershipResolver.EXPECT().Lookup(gomock.Any(), gomock.Any()).Return(membership.HostInfo{}, nil).AnyTimes()
s.mockMembershipResolver.EXPECT().WhoAmI().Return(membership.HostInfo{}, nil).AnyTimes()
Expand Down
123 changes: 123 additions & 0 deletions service/matching/handler/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/membership"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/service"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/matching/config"
Expand Down Expand Up @@ -645,7 +646,11 @@ func TestWaitForQueryResult(t *testing.T) {
func TestIsShuttingDown(t *testing.T) {
wg := sync.WaitGroup{}
wg.Add(0)
mockDomainCache := cache.NewMockDomainCache(gomock.NewController(t))
mockDomainCache.EXPECT().RegisterDomainChangeCallback(service.Matching, gomock.Any(), gomock.Any(), gomock.Any()).Times(1)
mockDomainCache.EXPECT().UnregisterDomainChangeCallback(service.Matching).Times(1)
e := matchingEngineImpl{
domainCache: mockDomainCache,
shutdownCompletion: &wg,
shutdown: make(chan struct{}),
}
Expand Down Expand Up @@ -1138,3 +1143,121 @@ func TestRefreshTaskListPartitionConfig(t *testing.T) {
})
}
}

// Test_domainChangeCallback verifies that a batch of domain updates disconnects
// blocked pollers only for eligible domains (global, active-passive, and with a
// notification version newer than the engine's watermark), and that the
// engine's watermark advances to the highest version in the batch.
func Test_domainChangeCallback(t *testing.T) {
	mockCtrl := gomock.NewController(t)
	mockDomainCache := cache.NewMockDomainCache(mockCtrl)

	clusters := []string{"cluster0", "cluster1"}

	// One mock manager per domain; its decision and activity task list
	// identifiers both map to the same manager in the engine's taskLists map.
	mockTaskListManagerGlobal1 := tasklist.NewMockManager(mockCtrl)
	mockTaskListManagerGlobal2 := tasklist.NewMockManager(mockCtrl)
	mockTaskListManagerGlobal3 := tasklist.NewMockManager(mockCtrl)
	mockTaskListManagerLocal1 := tasklist.NewMockManager(mockCtrl)
	mockTaskListManagerActiveActive1 := tasklist.NewMockManager(mockCtrl)

	tlIdentifierDecisionGlobal1, _ := tasklist.NewIdentifier("global-domain-1-id", "global-domain-1", persistence.TaskListTypeDecision)
	tlIdentifierActivityGlobal1, _ := tasklist.NewIdentifier("global-domain-1-id", "global-domain-1", persistence.TaskListTypeActivity)
	tlIdentifierDecisionGlobal2, _ := tasklist.NewIdentifier("global-domain-2-id", "global-domain-2", persistence.TaskListTypeDecision)
	tlIdentifierActivityGlobal2, _ := tasklist.NewIdentifier("global-domain-2-id", "global-domain-2", persistence.TaskListTypeActivity)
	tlIdentifierDecisionGlobal3, _ := tasklist.NewIdentifier("global-domain-3-id", "global-domain-3", persistence.TaskListTypeDecision)
	tlIdentifierActivityGlobal3, _ := tasklist.NewIdentifier("global-domain-3-id", "global-domain-3", persistence.TaskListTypeActivity)
	tlIdentifierDecisionLocal1, _ := tasklist.NewIdentifier("local-domain-1-id", "local-domain-1", persistence.TaskListTypeDecision)
	tlIdentifierActivityLocal1, _ := tasklist.NewIdentifier("local-domain-1-id", "local-domain-1", persistence.TaskListTypeActivity)
	tlIdentifierDecisionActiveActive1, _ := tasklist.NewIdentifier("active-active-domain-1-id", "active-active-domain-1", persistence.TaskListTypeDecision)
	tlIdentifierActivityActiveActive1, _ := tasklist.NewIdentifier("active-active-domain-1-id", "active-active-domain-1", persistence.TaskListTypeActivity)

	// Engine starts with watermark 0, so only domains with notification
	// version > 0 are eligible for processing.
	engine := &matchingEngineImpl{
		domainCache:         mockDomainCache,
		notificationVersion: 0,
		config:              defaultTestConfig(),
		taskLists: map[tasklist.Identifier]tasklist.Manager{
			*tlIdentifierDecisionGlobal1:       mockTaskListManagerGlobal1,
			*tlIdentifierActivityGlobal1:       mockTaskListManagerGlobal1,
			*tlIdentifierDecisionGlobal2:       mockTaskListManagerGlobal2,
			*tlIdentifierActivityGlobal2:       mockTaskListManagerGlobal2,
			*tlIdentifierDecisionGlobal3:       mockTaskListManagerGlobal3,
			*tlIdentifierActivityGlobal3:       mockTaskListManagerGlobal3,
			*tlIdentifierDecisionLocal1:        mockTaskListManagerLocal1,
			*tlIdentifierActivityLocal1:        mockTaskListManagerLocal1,
			*tlIdentifierDecisionActiveActive1: mockTaskListManagerActiveActive1,
			*tlIdentifierActivityActiveActive1: mockTaskListManagerActiveActive1,
		},
	}

	// global-domain-1 (notification version 0) is never processed.
	mockTaskListManagerGlobal1.EXPECT().DisconnectBlockedPollers(&clusters[0]).Times(0)
	// global-domain-2 failed over cluster0 -> cluster1: its manager still
	// reports cluster0 as active, so both task lists get disconnected.
	mockTaskListManagerGlobal2.EXPECT().GetDomainActiveCluster().Return(clusters[0]).Times(4)
	mockTaskListManagerGlobal2.EXPECT().GetTaskListKind().Return(types.TaskListKindNormal).Times(2)
	mockTaskListManagerGlobal2.EXPECT().DescribeTaskList(gomock.Any()).Return(&types.DescribeTaskListResponse{}).Times(2)
	mockTaskListManagerGlobal2.EXPECT().DisconnectBlockedPollers(&clusters[1]).Times(2)
	// global-domain-3's manager already points at the new active cluster, so
	// no disconnect happens despite the domain being eligible.
	mockTaskListManagerGlobal3.EXPECT().GetDomainActiveCluster().Return(clusters[1]).Times(4)
	mockTaskListManagerGlobal3.EXPECT().GetTaskListKind().Return(types.TaskListKindNormal).Times(2)
	mockTaskListManagerGlobal3.EXPECT().DescribeTaskList(gomock.Any()).Return(&types.DescribeTaskListResponse{}).Times(2)
	mockTaskListManagerGlobal3.EXPECT().DisconnectBlockedPollers(clusters[1]).Times(0)
	// Local and active-active domains are filtered out by eligibility checks.
	mockTaskListManagerLocal1.EXPECT().DisconnectBlockedPollers(gomock.Any()).Times(0)
	mockTaskListManagerActiveActive1.EXPECT().DisconnectBlockedPollers(gomock.Any()).Times(0)
	mockDomainCache.EXPECT().GetDomainID("global-domain-2").Return("global-domain-2-id", nil).Times(1)
	mockDomainCache.EXPECT().GetDomainID("global-domain-3").Return("global-domain-3-id", nil).Times(1)

	// The final argument of NewDomainCacheEntryForTest is the notification
	// version (0, 4, 5, 3, 3 below) — the watermark assertion at the end
	// expects the maximum, 5.
	domains := []*cache.DomainCacheEntry{
		cache.NewDomainCacheEntryForTest(
			&persistence.DomainInfo{Name: "global-domain-1", ID: "global-domain-1-id"},
			nil,
			true,
			&persistence.DomainReplicationConfig{ActiveClusterName: clusters[0], Clusters: []*persistence.ClusterReplicationConfig{{ClusterName: "cluster0"}, {ClusterName: "cluster1"}}},
			0,
			nil,
			0,
			0,
			0,
		),
		cache.NewDomainCacheEntryForTest(
			&persistence.DomainInfo{Name: "global-domain-2", ID: "global-domain-2-id"},
			nil,
			true,
			&persistence.DomainReplicationConfig{ActiveClusterName: clusters[1], Clusters: []*persistence.ClusterReplicationConfig{{ClusterName: "cluster0"}, {ClusterName: "cluster1"}}},
			0,
			nil,
			0,
			0,
			4,
		),
		cache.NewDomainCacheEntryForTest(
			&persistence.DomainInfo{Name: "global-domain-3", ID: "global-domain-3-id"},
			nil,
			true,
			&persistence.DomainReplicationConfig{ActiveClusterName: clusters[1], Clusters: []*persistence.ClusterReplicationConfig{{ClusterName: "cluster0"}, {ClusterName: "cluster1"}}},
			0,
			nil,
			0,
			0,
			5,
		),
		cache.NewDomainCacheEntryForTest(
			&persistence.DomainInfo{Name: "local-domain-1", ID: "local-domain-1-id"},
			nil,
			false,
			nil,
			0,
			nil,
			0,
			0,
			3,
		),
		cache.NewDomainCacheEntryForTest(
			&persistence.DomainInfo{Name: "active-active-domain-1", ID: "active-active-domain-1-id"},
			nil,
			true,
			&persistence.DomainReplicationConfig{ActiveClusters: &persistence.ActiveClustersConfig{}},
			0,
			nil,
			0,
			0,
			3,
		),
	}

	engine.domainChangeCallback(domains)

	assert.Equal(t, int64(5), engine.notificationVersion)
}
1 change: 1 addition & 0 deletions service/matching/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func NewHandler(

// Start starts the handler
func (h *handlerImpl) Start() {
	// Start the engine before releasing startWG so that callers unblocked by
	// the wait group observe a fully started engine.
	h.engine.Start()
	h.startWG.Done()
}

Expand Down
3 changes: 3 additions & 0 deletions service/matching/handler/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ func (s *handlerSuite) TestStart() {
cfg := config.NewConfig(dynamicconfig.NewCollection(dynamicconfig.NewInMemoryClient(), s.mockResource.Logger), "matching-test", getIsolationGroupsHelper)
handler := s.getHandler(cfg)

s.mockEngine.EXPECT().Start().Times(1)

handler.Start()
}

Expand All @@ -132,6 +134,7 @@ func (s *handlerSuite) TestStop() {
cfg := config.NewConfig(dynamicconfig.NewCollection(dynamicconfig.NewInMemoryClient(), s.mockResource.Logger), "matching-test", getIsolationGroupsHelper)
handler := s.getHandler(cfg)

s.mockEngine.EXPECT().Start().Times(1)
s.mockEngine.EXPECT().Stop().Times(1)

handler.Start()
Expand Down
Loading