Skip to content

Commit 46089df

Browse files
committed
Refactor history queue factory
1 parent dc2ad89 commit 46089df

22 files changed

+272
-231
lines changed

service/history/engine/engineimpl/describe_queues.go

+8-3
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"context"
2626
"fmt"
2727

28+
"github.com/uber/cadence/common/persistence"
2829
"github.com/uber/cadence/common/types"
2930
"github.com/uber/cadence/service/history/queue"
3031
)
@@ -33,21 +34,25 @@ func (e *historyEngineImpl) DescribeTransferQueue(
3334
ctx context.Context,
3435
clusterName string,
3536
) (*types.DescribeQueueResponse, error) {
36-
return e.describeQueue(ctx, e.txProcessor, clusterName)
37+
return e.describeQueue(ctx, persistence.HistoryTaskCategoryTransfer, clusterName)
3738
}
3839

3940
func (e *historyEngineImpl) DescribeTimerQueue(
4041
ctx context.Context,
4142
clusterName string,
4243
) (*types.DescribeQueueResponse, error) {
43-
return e.describeQueue(ctx, e.timerProcessor, clusterName)
44+
return e.describeQueue(ctx, persistence.HistoryTaskCategoryTimer, clusterName)
4445
}
4546

4647
func (e *historyEngineImpl) describeQueue(
4748
ctx context.Context,
48-
queueProcessor queue.Processor,
49+
category persistence.HistoryTaskCategory,
4950
clusterName string,
5051
) (*types.DescribeQueueResponse, error) {
52+
queueProcessor, ok := e.queueProcessors[category]
53+
if !ok {
54+
return nil, fmt.Errorf("queue processor not found for category %v", category)
55+
}
5156
resp, err := queueProcessor.HandleAction(ctx, clusterName, queue.NewGetStateAction())
5257
if err != nil {
5358
return nil, err

service/history/engine/engineimpl/history_engine.go

+21-71
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@ import (
2727
"fmt"
2828
"time"
2929

30-
"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
31-
3230
"github.com/uber/cadence/client/matching"
3331
"github.com/uber/cadence/client/wrappers/retryable"
3432
"github.com/uber/cadence/common"
@@ -43,10 +41,7 @@ import (
4341
"github.com/uber/cadence/common/metrics"
4442
cndc "github.com/uber/cadence/common/ndc"
4543
"github.com/uber/cadence/common/persistence"
46-
"github.com/uber/cadence/common/quotas"
47-
"github.com/uber/cadence/common/quotas/permember"
4844
"github.com/uber/cadence/common/reconciliation/invariant"
49-
"github.com/uber/cadence/common/service"
5045
"github.com/uber/cadence/common/types"
5146
"github.com/uber/cadence/common/types/mapper/proto"
5247
hcommon "github.com/uber/cadence/service/history/common"
@@ -61,10 +56,8 @@ import (
6156
"github.com/uber/cadence/service/history/replication"
6257
"github.com/uber/cadence/service/history/reset"
6358
"github.com/uber/cadence/service/history/shard"
64-
"github.com/uber/cadence/service/history/task"
6559
"github.com/uber/cadence/service/history/workflow"
66-
"github.com/uber/cadence/service/history/workflowcache"
67-
warchiver "github.com/uber/cadence/service/worker/archiver"
60+
"github.com/uber/cadence/service/worker/archiver"
6861
)
6962

7063
const (
@@ -92,8 +85,7 @@ type historyEngineImpl struct {
9285
historyV2Mgr persistence.HistoryManager
9386
executionManager persistence.ExecutionManager
9487
visibilityMgr persistence.VisibilityManager
95-
txProcessor queue.Processor
96-
timerProcessor queue.Processor
88+
queueProcessors map[persistence.HistoryTaskCategory]queue.Processor
9789
nDCReplicator ndc.HistoryReplicator
9890
nDCActivityReplicator ndc.ActivityReplicator
9991
historyEventNotifier events.Notifier
@@ -104,22 +96,19 @@ type historyEngineImpl struct {
10496
throttledLogger log.Logger
10597
activeClusterManager activecluster.Manager
10698
config *config.Config
107-
archivalClient warchiver.Client
99+
archivalClient archiver.Client
108100
workflowResetter reset.WorkflowResetter
109-
queueTaskProcessor task.Processor
110101
replicationTaskProcessors []replication.TaskProcessor
111102
replicationAckManager replication.TaskAckManager
112103
replicationTaskStore *replication.TaskStore
113104
replicationHydrator replication.TaskHydrator
114105
replicationMetricsEmitter *replication.MetricsEmitterImpl
115-
publicClient workflowserviceclient.Interface
116106
eventsReapplier ndc.EventsReapplier
117107
matchingClient matching.Client
118108
rawMatchingClient matching.Client
119109
clientChecker client.VersionChecker
120110
replicationDLQHandler replication.DLQHandler
121111
failoverMarkerNotifier failover.MarkerNotifier
122-
wfIDCache workflowcache.WFCache
123112

124113
updateWithActionFn func(
125114
context.Context,
@@ -149,15 +138,12 @@ func NewEngineWithShardContext(
149138
shard shard.Context,
150139
visibilityMgr persistence.VisibilityManager,
151140
matching matching.Client,
152-
publicClient workflowserviceclient.Interface,
153141
historyEventNotifier events.Notifier,
154142
config *config.Config,
155143
replicationTaskFetchers replication.TaskFetchers,
156144
rawMatchingClient matching.Client,
157-
queueTaskProcessor task.Processor,
158145
failoverCoordinator failover.Coordinator,
159-
wfIDCache workflowcache.WFCache,
160-
queueProcessorFactory queue.ProcessorFactory,
146+
queueFactories []queue.Factory,
161147
) engine.Engine {
162148
currentClusterName := shard.GetService().GetClusterMetadata().GetCurrentClusterName()
163149

@@ -194,40 +180,13 @@ func NewEngineWithShardContext(
194180
metricsClient: shard.GetMetricsClient(),
195181
historyEventNotifier: historyEventNotifier,
196182
config: config,
197-
archivalClient: warchiver.NewClient(
198-
shard.GetMetricsClient(),
199-
logger,
200-
publicClient,
201-
shard.GetConfig().NumArchiveSystemWorkflows,
202-
quotas.NewDynamicRateLimiter(config.ArchiveRequestRPS.AsFloat64()),
203-
quotas.NewDynamicRateLimiter(func() float64 {
204-
return permember.PerMember(
205-
service.History,
206-
float64(config.ArchiveInlineHistoryGlobalRPS()),
207-
float64(config.ArchiveInlineHistoryRPS()),
208-
shard.GetService().GetMembershipResolver(),
209-
)
210-
}),
211-
quotas.NewDynamicRateLimiter(func() float64 {
212-
return permember.PerMember(
213-
service.History,
214-
float64(config.ArchiveInlineVisibilityGlobalRPS()),
215-
float64(config.ArchiveInlineVisibilityRPS()),
216-
shard.GetService().GetMembershipResolver(),
217-
)
218-
}),
219-
shard.GetService().GetArchiverProvider(),
220-
config.AllowArchivingIncompleteHistory,
221-
),
222183
workflowResetter: reset.NewWorkflowResetter(
223184
shard,
224185
executionCache,
225186
logger,
226187
),
227-
publicClient: publicClient,
228188
matchingClient: matching,
229189
rawMatchingClient: rawMatchingClient,
230-
queueTaskProcessor: queueTaskProcessor,
231190
clientChecker: client.NewVersionChecker(),
232191
failoverMarkerNotifier: failoverMarkerNotifier,
233192
replicationHydrator: replicationHydrator,
@@ -246,8 +205,8 @@ func NewEngineWithShardContext(
246205
replicationTaskStore: replicationTaskStore,
247206
replicationMetricsEmitter: replication.NewMetricsEmitter(
248207
shard.GetShardID(), shard, replicationReader, shard.GetMetricsClient()),
249-
wfIDCache: wfIDCache,
250208
updateWithActionFn: workflow.UpdateWithAction,
209+
queueProcessors: make(map[persistence.HistoryTaskCategory]queue.Processor),
251210
}
252211
historyEngImpl.decisionHandler = decision.NewHandler(
253212
shard,
@@ -261,25 +220,13 @@ func NewEngineWithShardContext(
261220
)
262221
openExecutionCheck := invariant.NewConcreteExecutionExists(pRetry, shard.GetDomainCache())
263222

264-
historyEngImpl.txProcessor = queueProcessorFactory.NewTransferQueueProcessor(
265-
shard,
266-
historyEngImpl,
267-
queueTaskProcessor,
268-
executionCache,
269-
historyEngImpl.workflowResetter,
270-
historyEngImpl.archivalClient,
271-
openExecutionCheck,
272-
historyEngImpl.wfIDCache,
273-
)
274-
275-
historyEngImpl.timerProcessor = queueProcessorFactory.NewTimerQueueProcessor(
276-
shard,
277-
historyEngImpl,
278-
queueTaskProcessor,
279-
executionCache,
280-
historyEngImpl.archivalClient,
281-
openExecutionCheck,
282-
)
223+
for _, factory := range queueFactories {
224+
historyEngImpl.queueProcessors[factory.Category()] = factory.CreateQueue(
225+
shard,
226+
executionCache,
227+
openExecutionCheck,
228+
)
229+
}
283230

284231
historyEngImpl.eventsReapplier = ndc.NewEventsReapplier(shard.GetMetricsClient(), logger)
285232

@@ -360,8 +307,9 @@ func (e *historyEngineImpl) Start() {
360307
e.logger.Info("History engine state changed", tag.LifeCycleStarting)
361308
defer e.logger.Info("History engine state changed", tag.LifeCycleStarted)
362309

363-
e.txProcessor.Start()
364-
e.timerProcessor.Start()
310+
for _, processor := range e.queueProcessors {
311+
processor.Start()
312+
}
365313
e.replicationDLQHandler.Start()
366314
e.replicationMetricsEmitter.Start()
367315

@@ -386,8 +334,9 @@ func (e *historyEngineImpl) Stop() {
386334
e.logger.Info("History engine state changed", tag.LifeCycleStopping)
387335
defer e.logger.Info("History engine state changed", tag.LifeCycleStopped)
388336

389-
e.txProcessor.Stop()
390-
e.timerProcessor.Stop()
337+
for _, processor := range e.queueProcessors {
338+
processor.Stop()
339+
}
391340
e.replicationDLQHandler.Stop()
392341
e.replicationMetricsEmitter.Stop()
393342

@@ -420,8 +369,9 @@ func (e *historyEngineImpl) SyncShardStatus(ctx context.Context, request *types.
420369
// 2. notify the timer gate in the timer queue standby processor
421370
// 3. notify the transfer (essentially a no op, just put it here so it looks symmetric)
422371
e.shard.SetCurrentTime(clusterName, now)
423-
e.txProcessor.NotifyNewTask(clusterName, &hcommon.NotifyTaskInfo{Tasks: []persistence.Task{}})
424-
e.timerProcessor.NotifyNewTask(clusterName, &hcommon.NotifyTaskInfo{Tasks: []persistence.Task{}})
372+
for _, processor := range e.queueProcessors {
373+
processor.NotifyNewTask(clusterName, &hcommon.NotifyTaskInfo{Tasks: []persistence.Task{}})
374+
}
425375
return nil
426376
}
427377

service/history/engine/engineimpl/history_engine2_test.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -150,8 +150,10 @@ func (s *engine2Suite) SetupTest() {
150150
config: s.config,
151151
timeSource: s.mockShard.GetTimeSource(),
152152
historyEventNotifier: events.NewNotifier(clock.NewRealTimeSource(), metrics.NewClient(tally.NoopScope, metrics.History), func(string) int { return 0 }),
153-
txProcessor: s.mockTxProcessor,
154-
timerProcessor: s.mockTimerProcessor,
153+
queueProcessors: map[p.HistoryTaskCategory]queue.Processor{
154+
p.HistoryTaskCategoryTransfer: s.mockTxProcessor,
155+
p.HistoryTaskCategoryTimer: s.mockTimerProcessor,
156+
},
155157
}
156158
s.mockShard.SetEngine(h)
157159
h.decisionHandler = decision.NewHandler(s.mockShard, h.executionCache, h.tokenSerializer)

service/history/engine/engineimpl/history_engine3_eventsv2_test.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -133,8 +133,10 @@ func (s *engine3Suite) SetupTest() {
133133
config: s.config,
134134
timeSource: s.mockShard.GetTimeSource(),
135135
historyEventNotifier: events.NewNotifier(clock.NewRealTimeSource(), metrics.NewClient(tally.NoopScope, metrics.History), func(string) int { return 0 }),
136-
txProcessor: s.mockTxProcessor,
137-
timerProcessor: s.mockTimerProcessor,
136+
queueProcessors: map[p.HistoryTaskCategory]queue.Processor{
137+
p.HistoryTaskCategoryTransfer: s.mockTxProcessor,
138+
p.HistoryTaskCategoryTimer: s.mockTimerProcessor,
139+
},
138140
}
139141
s.mockShard.SetEngine(h)
140142
h.decisionHandler = decision.NewHandler(s.mockShard, h.executionCache, h.tokenSerializer)

service/history/engine/engineimpl/history_engine_test.go

+7-5
Original file line numberDiff line numberDiff line change
@@ -173,11 +173,13 @@ func (s *engineSuite) SetupTest() {
173173
tokenSerializer: common.NewJSONTaskTokenSerializer(),
174174
historyEventNotifier: historyEventNotifier,
175175
config: config.NewForTest(),
176-
txProcessor: s.mockTxProcessor,
177-
timerProcessor: s.mockTimerProcessor,
178-
clientChecker: cc.NewVersionChecker(),
179-
eventsReapplier: s.mockEventsReapplier,
180-
workflowResetter: s.mockWorkflowResetter,
176+
queueProcessors: map[persistence.HistoryTaskCategory]queue.Processor{
177+
persistence.HistoryTaskCategoryTransfer: s.mockTxProcessor,
178+
persistence.HistoryTaskCategoryTimer: s.mockTimerProcessor,
179+
},
180+
clientChecker: cc.NewVersionChecker(),
181+
eventsReapplier: s.mockEventsReapplier,
182+
workflowResetter: s.mockWorkflowResetter,
181183
}
182184
s.mockShard.SetEngine(h)
183185
h.decisionHandler = decision.NewHandler(s.mockShard, h.executionCache, h.tokenSerializer)

service/history/engine/engineimpl/notify_tasks.go

+12-2
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,12 @@ func (e *historyEngineImpl) NotifyNewTransferTasks(info *hcommon.NotifyTaskInfo)
4646
task := info.Tasks[0]
4747
clusterName, err := e.shard.GetActiveClusterManager().ClusterNameForFailoverVersion(task.GetVersion(), info.ExecutionInfo.DomainID)
4848
if err == nil {
49-
e.txProcessor.NotifyNewTask(clusterName, info)
49+
transferProcessor, ok := e.queueProcessors[persistence.HistoryTaskCategoryTransfer]
50+
if !ok {
51+
e.logger.Error("transfer processor not found")
52+
return
53+
}
54+
transferProcessor.NotifyNewTask(clusterName, info)
5055
}
5156
}
5257

@@ -58,7 +63,12 @@ func (e *historyEngineImpl) NotifyNewTimerTasks(info *hcommon.NotifyTaskInfo) {
5863
task := info.Tasks[0]
5964
clusterName, err := e.shard.GetActiveClusterManager().ClusterNameForFailoverVersion(task.GetVersion(), info.ExecutionInfo.DomainID)
6065
if err == nil {
61-
e.timerProcessor.NotifyNewTask(clusterName, info)
66+
timerProcessor, ok := e.queueProcessors[persistence.HistoryTaskCategoryTimer]
67+
if !ok {
68+
e.logger.Error("timer processor not found")
69+
return
70+
}
71+
timerProcessor.NotifyNewTask(clusterName, info)
6272
}
6373
}
6474

service/history/engine/engineimpl/register_domain_failover_callback.go

+21-8
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,9 @@ func (e *historyEngineImpl) domainChangeCB(nextDomains []*cache.DomainCacheEntry
127127
e.logger.Info("Domain Failover Start.", tag.WorkflowDomainIDs(failoverDomainIDs))
128128

129129
// Failover queues are not created for active-active domains. Will revisit after new queue framework implementation.
130-
e.txProcessor.FailoverDomain(failoverDomainIDs)
131-
e.timerProcessor.FailoverDomain(failoverDomainIDs)
130+
for _, processor := range e.queueProcessors {
131+
processor.FailoverDomain(failoverDomainIDs)
132+
}
132133

133134
e.notifyQueues()
134135
}
@@ -159,8 +160,18 @@ func (e *historyEngineImpl) notifyQueues() {
159160
// its length > 0 and has correct timestamp, to trigger a db scan
160161
fakeDecisionTask := []persistence.Task{&persistence.DecisionTask{}}
161162
fakeDecisionTimeoutTask := []persistence.Task{&persistence.DecisionTimeoutTask{TaskData: persistence.TaskData{VisibilityTimestamp: now}}}
162-
e.txProcessor.NotifyNewTask(e.currentClusterName, &hcommon.NotifyTaskInfo{Tasks: fakeDecisionTask})
163-
e.timerProcessor.NotifyNewTask(e.currentClusterName, &hcommon.NotifyTaskInfo{Tasks: fakeDecisionTimeoutTask})
163+
transferProcessor, ok := e.queueProcessors[persistence.HistoryTaskCategoryTransfer]
164+
if !ok {
165+
e.logger.Error("transfer processor not found")
166+
return
167+
}
168+
transferProcessor.NotifyNewTask(e.currentClusterName, &hcommon.NotifyTaskInfo{Tasks: fakeDecisionTask})
169+
timerProcessor, ok := e.queueProcessors[persistence.HistoryTaskCategoryTimer]
170+
if !ok {
171+
e.logger.Error("timer processor not found")
172+
return
173+
}
174+
timerProcessor.NotifyNewTask(e.currentClusterName, &hcommon.NotifyTaskInfo{Tasks: fakeDecisionTimeoutTask})
164175
}
165176

166177
func (e *historyEngineImpl) generateGracefulFailoverTasksForDomainUpdateCallback(shardNotificationVersion int64, nextDomains []*cache.DomainCacheEntry) []*persistence.FailoverMarkerTask {
@@ -204,14 +215,16 @@ func (e *historyEngineImpl) generateGracefulFailoverTasksForDomainUpdateCallback
204215

205216
func (e *historyEngineImpl) lockTaskProcessingForDomainUpdate() {
206217
e.logger.Debug("Locking processing for domain update")
207-
e.txProcessor.LockTaskProcessing()
208-
e.timerProcessor.LockTaskProcessing()
218+
for _, processor := range e.queueProcessors {
219+
processor.LockTaskProcessing()
220+
}
209221
}
210222

211223
func (e *historyEngineImpl) unlockProcessingForDomainUpdate() {
212224
e.logger.Debug("Unlocking processing for failover")
213-
e.txProcessor.UnlockTaskProcessing()
214-
e.timerProcessor.UnlockTaskProcessing()
225+
for _, processor := range e.queueProcessors {
226+
processor.UnlockTaskProcessing()
227+
}
215228
}
216229

217230
func (e *historyEngineImpl) failoverPredicate(shardNotificationVersion int64, nextDomain *cache.DomainCacheEntry, action func()) {

0 commit comments

Comments
 (0)