Skip to content

Commit 1ed0401

Browse files
authored
Disconnect tasklist pollers on domain failover using callback (#6903)
1 parent 4c813e7 commit 1ed0401

13 files changed

+500
-31
lines changed

service/matching/handler/engine.go

Lines changed: 106 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -79,24 +79,25 @@ type (
7979
}
8080

8181
matchingEngineImpl struct {
82-
shutdownCompletion *sync.WaitGroup
83-
shutdown chan struct{}
84-
taskManager persistence.TaskManager
85-
clusterMetadata cluster.Metadata
86-
historyService history.Client
87-
matchingClient matching.Client
88-
tokenSerializer common.TaskTokenSerializer
89-
logger log.Logger
90-
metricsClient metrics.Client
91-
taskListsLock sync.RWMutex // locks mutation of taskLists
92-
taskLists map[tasklist.Identifier]tasklist.Manager // Convert to LRU cache
93-
config *config.Config
94-
lockableQueryTaskMap lockableQueryTaskMap
95-
domainCache cache.DomainCache
96-
versionChecker client.VersionChecker
97-
membershipResolver membership.Resolver
98-
isolationState isolationgroup.State
99-
timeSource clock.TimeSource
82+
shutdownCompletion *sync.WaitGroup
83+
shutdown chan struct{}
84+
taskManager persistence.TaskManager
85+
clusterMetadata cluster.Metadata
86+
historyService history.Client
87+
matchingClient matching.Client
88+
tokenSerializer common.TaskTokenSerializer
89+
logger log.Logger
90+
metricsClient metrics.Client
91+
taskListsLock sync.RWMutex // locks mutation of taskLists
92+
taskLists map[tasklist.Identifier]tasklist.Manager // Convert to LRU cache
93+
config *config.Config
94+
lockableQueryTaskMap lockableQueryTaskMap
95+
domainCache cache.DomainCache
96+
versionChecker client.VersionChecker
97+
membershipResolver membership.Resolver
98+
isolationState isolationgroup.State
99+
timeSource clock.TimeSource
100+
failoverNotificationVersion int64
100101
}
101102

102103
// HistoryInfo consists of two integers: the history size and the history count
@@ -162,6 +163,7 @@ func NewEngine(
162163
}
163164

164165
// Start brings the engine up. Its only action is to register the engine's
// domain failover callback via registerDomainFailoverCallback.
func (e *matchingEngineImpl) Start() {
	e.registerDomainFailoverCallback()
}
166168

167169
func (e *matchingEngineImpl) Stop() {
@@ -170,6 +172,7 @@ func (e *matchingEngineImpl) Stop() {
170172
for _, l := range e.getTaskLists(math.MaxInt32) {
171173
l.Stop()
172174
}
175+
e.unregisterDomainFailoverCallback()
173176
e.shutdownCompletion.Wait()
174177
}
175178

@@ -535,7 +538,7 @@ pollLoop:
535538
pollerCtx = tasklist.ContextWithIsolationGroup(pollerCtx, req.GetIsolationGroup())
536539
tlMgr, err := e.getTaskListManager(taskListID, taskListKind)
537540
if err != nil {
538-
return nil, fmt.Errorf("couldn't load tasklist namanger: %w", err)
541+
return nil, fmt.Errorf("couldn't load tasklist manager: %w", err)
539542
}
540543
startT := time.Now() // Record the start time
541544
task, err := tlMgr.GetTask(pollerCtx, nil)
@@ -724,7 +727,7 @@ pollLoop:
724727
taskListKind := request.TaskList.Kind
725728
tlMgr, err := e.getTaskListManager(taskListID, taskListKind)
726729
if err != nil {
727-
return nil, fmt.Errorf("couldn't load tasklist namanger: %w", err)
730+
return nil, fmt.Errorf("couldn't load tasklist manager: %w", err)
728731
}
729732
startT := time.Now() // Record the start time
730733
task, err := tlMgr.GetTask(pollerCtx, maxDispatch)
@@ -1425,6 +1428,82 @@ func (e *matchingEngineImpl) isShuttingDown() bool {
14251428
}
14261429
}
14271430

1431+
func (e *matchingEngineImpl) domainChangeCallback(nextDomains []*cache.DomainCacheEntry) {
1432+
newFailoverNotificationVersion := e.failoverNotificationVersion
1433+
1434+
for _, domain := range nextDomains {
1435+
if domain.GetFailoverNotificationVersion() > newFailoverNotificationVersion {
1436+
newFailoverNotificationVersion = domain.GetFailoverNotificationVersion()
1437+
}
1438+
1439+
if !isDomainEligibleToDisconnectPollers(domain, e.failoverNotificationVersion) {
1440+
continue
1441+
}
1442+
1443+
req := &types.GetTaskListsByDomainRequest{
1444+
Domain: domain.GetInfo().Name,
1445+
}
1446+
1447+
resp, err := e.GetTaskListsByDomain(nil, req)
1448+
if err != nil {
1449+
continue
1450+
}
1451+
1452+
for taskListName := range resp.DecisionTaskListMap {
1453+
e.disconnectTaskListPollersAfterDomainFailover(taskListName, domain, persistence.TaskListTypeDecision)
1454+
}
1455+
1456+
for taskListName := range resp.ActivityTaskListMap {
1457+
e.disconnectTaskListPollersAfterDomainFailover(taskListName, domain, persistence.TaskListTypeActivity)
1458+
}
1459+
}
1460+
e.failoverNotificationVersion = newFailoverNotificationVersion
1461+
}
1462+
1463+
func (e *matchingEngineImpl) registerDomainFailoverCallback() {
1464+
catchUpFn := func(domainCache cache.DomainCache, _ cache.PrepareCallbackFn, _ cache.CallbackFn) {
1465+
for _, domain := range domainCache.GetAllDomain() {
1466+
if domain.GetFailoverNotificationVersion() > e.failoverNotificationVersion {
1467+
e.failoverNotificationVersion = domain.GetFailoverNotificationVersion()
1468+
}
1469+
}
1470+
}
1471+
1472+
e.domainCache.RegisterDomainChangeCallback(
1473+
service.Matching,
1474+
catchUpFn,
1475+
func() {},
1476+
e.domainChangeCallback)
1477+
}
1478+
1479+
func (e *matchingEngineImpl) unregisterDomainFailoverCallback() {
1480+
e.domainCache.UnregisterDomainChangeCallback(service.Matching)
1481+
}
1482+
1483+
func (e *matchingEngineImpl) disconnectTaskListPollersAfterDomainFailover(taskListName string, domain *cache.DomainCacheEntry, taskType int) {
1484+
taskList, err := tasklist.NewIdentifier(domain.GetInfo().ID, taskListName, taskType)
1485+
if err != nil {
1486+
return
1487+
}
1488+
tlMgr, err := e.getTaskListManager(taskList, types.TaskListKindNormal.Ptr())
1489+
if err != nil {
1490+
e.logger.Error("Couldn't load tasklist manager", tag.Error(err))
1491+
return
1492+
}
1493+
1494+
err = tlMgr.ReleaseBlockedPollers()
1495+
if err != nil {
1496+
e.logger.Error("Couldn't disconnect tasklist pollers after domain failover",
1497+
tag.Error(err),
1498+
tag.WorkflowDomainID(domain.GetInfo().ID),
1499+
tag.WorkflowDomainName(domain.GetInfo().Name),
1500+
tag.WorkflowTaskListName(taskListName),
1501+
tag.WorkflowTaskListType(taskType),
1502+
)
1503+
return
1504+
}
1505+
}
1506+
14281507
func (m *lockableQueryTaskMap) put(key string, value chan *queryResult) {
14291508
m.Lock()
14301509
defer m.Unlock()
@@ -1451,3 +1530,10 @@ func isMatchingRetryableError(err error) bool {
14511530
}
14521531
return true
14531532
}
1533+
1534+
func isDomainEligibleToDisconnectPollers(domain *cache.DomainCacheEntry, currentVersion int64) bool {
1535+
return domain.IsGlobalDomain() &&
1536+
domain.GetReplicationConfig() != nil &&
1537+
!domain.GetReplicationConfig().IsActiveActive() &&
1538+
domain.GetFailoverNotificationVersion() > currentVersion
1539+
}

service/matching/handler/engine_integration_test.go

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -131,6 +131,8 @@ func (s *matchingEngineSuite) SetupTest() {
131131
s.mockDomainCache.EXPECT().GetDomainByID(gomock.Any()).Return(cache.CreateDomainCacheEntry(matchingTestDomainName), nil).AnyTimes()
132132
s.mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(cache.CreateDomainCacheEntry(matchingTestDomainName), nil).AnyTimes()
133133
s.mockDomainCache.EXPECT().GetDomainName(gomock.Any()).Return(matchingTestDomainName, nil).AnyTimes()
134+
s.mockDomainCache.EXPECT().RegisterDomainChangeCallback(service.Matching, gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
135+
s.mockDomainCache.EXPECT().UnregisterDomainChangeCallback(service.Matching).AnyTimes()
134136
s.mockMembershipResolver = membership.NewMockResolver(s.controller)
135137
s.mockMembershipResolver.EXPECT().Lookup(gomock.Any(), gomock.Any()).Return(membership.HostInfo{}, nil).AnyTimes()
136138
s.mockMembershipResolver.EXPECT().WhoAmI().Return(membership.HostInfo{}, nil).AnyTimes()

0 commit comments

Comments
 (0)