Refactor history queue factory #6907

Merged · 1 commit · May 13, 2025

11 changes: 8 additions & 3 deletions service/history/engine/engineimpl/describe_queues.go
@@ -25,6 +25,7 @@ import (
"context"
"fmt"

"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/history/queue"
)
@@ -33,21 +34,25 @@ func (e *historyEngineImpl) DescribeTransferQueue(
ctx context.Context,
clusterName string,
) (*types.DescribeQueueResponse, error) {
return e.describeQueue(ctx, e.txProcessor, clusterName)
return e.describeQueue(ctx, persistence.HistoryTaskCategoryTransfer, clusterName)
}

func (e *historyEngineImpl) DescribeTimerQueue(
ctx context.Context,
clusterName string,
) (*types.DescribeQueueResponse, error) {
return e.describeQueue(ctx, e.timerProcessor, clusterName)
return e.describeQueue(ctx, persistence.HistoryTaskCategoryTimer, clusterName)
}

func (e *historyEngineImpl) describeQueue(
ctx context.Context,
queueProcessor queue.Processor,
category persistence.HistoryTaskCategory,
clusterName string,
) (*types.DescribeQueueResponse, error) {
queueProcessor, ok := e.queueProcessors[category]
if !ok {
return nil, fmt.Errorf("queue processor not found for category %v", category)
}
resp, err := queueProcessor.HandleAction(ctx, clusterName, queue.NewGetStateAction())
if err != nil {
return nil, err
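For reference, persistence.HistoryTaskCategoryTransfer and persistence.HistoryTaskCategoryTimer are predeclared category values that key the engine's new queueProcessors map. The snippet below is only an illustrative, self-contained stand-in for how such a category type can be modeled; the actual definition in common/persistence may differ in names and fields.

package persistence // illustrative stand-in, not the real Cadence package

// HistoryTaskCategoryType distinguishes immediate queues (ordered by task ID,
// e.g. transfer) from scheduled queues (ordered by visibility time, e.g. timer).
type HistoryTaskCategoryType int

const (
	HistoryTaskCategoryTypeImmediate HistoryTaskCategoryType = iota + 1
	HistoryTaskCategoryTypeScheduled
)

// HistoryTaskCategory identifies one class of history tasks; as a small
// comparable struct it can be used directly as a map key.
type HistoryTaskCategory struct {
	id           int
	categoryType HistoryTaskCategoryType
	name         string
}

var (
	HistoryTaskCategoryTransfer = HistoryTaskCategory{id: 1, categoryType: HistoryTaskCategoryTypeImmediate, name: "transfer"}
	HistoryTaskCategoryTimer    = HistoryTaskCategory{id: 2, categoryType: HistoryTaskCategoryTypeScheduled, name: "timer"}
)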
92 changes: 21 additions & 71 deletions service/history/engine/engineimpl/history_engine.go
@@ -27,8 +27,6 @@ import (
"fmt"
"time"

"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"

"github.com/uber/cadence/client/matching"
"github.com/uber/cadence/client/wrappers/retryable"
"github.com/uber/cadence/common"
@@ -43,10 +41,7 @@ import (
"github.com/uber/cadence/common/metrics"
cndc "github.com/uber/cadence/common/ndc"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/quotas"
"github.com/uber/cadence/common/quotas/permember"
"github.com/uber/cadence/common/reconciliation/invariant"
"github.com/uber/cadence/common/service"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/common/types/mapper/proto"
hcommon "github.com/uber/cadence/service/history/common"
@@ -61,10 +56,8 @@ import (
"github.com/uber/cadence/service/history/replication"
"github.com/uber/cadence/service/history/reset"
"github.com/uber/cadence/service/history/shard"
"github.com/uber/cadence/service/history/task"
"github.com/uber/cadence/service/history/workflow"
"github.com/uber/cadence/service/history/workflowcache"
warchiver "github.com/uber/cadence/service/worker/archiver"
"github.com/uber/cadence/service/worker/archiver"
)

const (
@@ -92,8 +85,7 @@ type historyEngineImpl struct {
historyV2Mgr persistence.HistoryManager
executionManager persistence.ExecutionManager
visibilityMgr persistence.VisibilityManager
txProcessor queue.Processor
timerProcessor queue.Processor
queueProcessors map[persistence.HistoryTaskCategory]queue.Processor
nDCReplicator ndc.HistoryReplicator
nDCActivityReplicator ndc.ActivityReplicator
historyEventNotifier events.Notifier
@@ -104,22 +96,19 @@ type historyEngineImpl struct {
throttledLogger log.Logger
activeClusterManager activecluster.Manager
config *config.Config
archivalClient warchiver.Client
archivalClient archiver.Client
workflowResetter reset.WorkflowResetter
queueTaskProcessor task.Processor
replicationTaskProcessors []replication.TaskProcessor
replicationAckManager replication.TaskAckManager
replicationTaskStore *replication.TaskStore
replicationHydrator replication.TaskHydrator
replicationMetricsEmitter *replication.MetricsEmitterImpl
publicClient workflowserviceclient.Interface
eventsReapplier ndc.EventsReapplier
matchingClient matching.Client
rawMatchingClient matching.Client
clientChecker client.VersionChecker
replicationDLQHandler replication.DLQHandler
failoverMarkerNotifier failover.MarkerNotifier
wfIDCache workflowcache.WFCache

updateWithActionFn func(
context.Context,
@@ -149,15 +138,12 @@ func NewEngineWithShardContext(
shard shard.Context,
visibilityMgr persistence.VisibilityManager,
matching matching.Client,
publicClient workflowserviceclient.Interface,
historyEventNotifier events.Notifier,
config *config.Config,
replicationTaskFetchers replication.TaskFetchers,
rawMatchingClient matching.Client,
queueTaskProcessor task.Processor,
failoverCoordinator failover.Coordinator,
wfIDCache workflowcache.WFCache,
queueProcessorFactory queue.ProcessorFactory,
queueFactories []queue.Factory,
) engine.Engine {
currentClusterName := shard.GetService().GetClusterMetadata().GetCurrentClusterName()

@@ -194,40 +180,13 @@ func NewEngineWithShardContext(
metricsClient: shard.GetMetricsClient(),
historyEventNotifier: historyEventNotifier,
config: config,
archivalClient: warchiver.NewClient(
shard.GetMetricsClient(),
logger,
publicClient,
shard.GetConfig().NumArchiveSystemWorkflows,
quotas.NewDynamicRateLimiter(config.ArchiveRequestRPS.AsFloat64()),
quotas.NewDynamicRateLimiter(func() float64 {
return permember.PerMember(
service.History,
float64(config.ArchiveInlineHistoryGlobalRPS()),
float64(config.ArchiveInlineHistoryRPS()),
shard.GetService().GetMembershipResolver(),
)
}),
quotas.NewDynamicRateLimiter(func() float64 {
return permember.PerMember(
service.History,
float64(config.ArchiveInlineVisibilityGlobalRPS()),
float64(config.ArchiveInlineVisibilityRPS()),
shard.GetService().GetMembershipResolver(),
)
}),
shard.GetService().GetArchiverProvider(),
config.AllowArchivingIncompleteHistory,
),
workflowResetter: reset.NewWorkflowResetter(
shard,
executionCache,
logger,
),
publicClient: publicClient,
matchingClient: matching,
rawMatchingClient: rawMatchingClient,
queueTaskProcessor: queueTaskProcessor,
clientChecker: client.NewVersionChecker(),
failoverMarkerNotifier: failoverMarkerNotifier,
replicationHydrator: replicationHydrator,
@@ -246,8 +205,8 @@ func NewEngineWithShardContext(
replicationTaskStore: replicationTaskStore,
replicationMetricsEmitter: replication.NewMetricsEmitter(
shard.GetShardID(), shard, replicationReader, shard.GetMetricsClient()),
wfIDCache: wfIDCache,
updateWithActionFn: workflow.UpdateWithAction,
queueProcessors: make(map[persistence.HistoryTaskCategory]queue.Processor),
}
historyEngImpl.decisionHandler = decision.NewHandler(
shard,
@@ -261,25 +220,13 @@
)
openExecutionCheck := invariant.NewConcreteExecutionExists(pRetry, shard.GetDomainCache())

historyEngImpl.txProcessor = queueProcessorFactory.NewTransferQueueProcessor(
shard,
historyEngImpl,
queueTaskProcessor,
executionCache,
historyEngImpl.workflowResetter,
historyEngImpl.archivalClient,
openExecutionCheck,
historyEngImpl.wfIDCache,
)

historyEngImpl.timerProcessor = queueProcessorFactory.NewTimerQueueProcessor(
shard,
historyEngImpl,
queueTaskProcessor,
executionCache,
historyEngImpl.archivalClient,
openExecutionCheck,
)
for _, factory := range queueFactories {
historyEngImpl.queueProcessors[factory.Category()] = factory.CreateQueue(
shard,
executionCache,
openExecutionCheck,
)
}

Review comment (Member): shard.SetEngine() is called later in this function, but the queue factory is calling shard.GetEngine(). Let's make sure the dependencies are clear and future-proof.

Review comment (Member, author): The queue processors depend on the shard context and the history engine; the history engine depends on the shard context.

Review comment (Member, author): What is not clear is the dependency between the history engine and the shard context, but it's unrelated to this PR.
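The thread above is about construction order: NewEngineWithShardContext builds the queue processors before the caller invokes shard.SetEngine, so a factory (or the processor it returns) presumably cannot dereference shard.GetEngine() inside CreateQueue and must resolve the engine lazily once it has been registered. A minimal sketch of that pattern, assuming shard.Context exposes GetEngine()/SetEngine() as the comments indicate (lazyEngineProcessor is hypothetical, not code from this PR):

package sketch

import (
	"github.com/uber/cadence/service/history/shard"
)

// lazyEngineProcessor defers the engine lookup until Start, by which time the
// engine has been registered on the shard context via shard.SetEngine.
type lazyEngineProcessor struct {
	shardCtx shard.Context
}

func (p *lazyEngineProcessor) Start() {
	// Safe here: NewEngineWithShardContext has returned and SetEngine has run.
	// Resolving the engine inside CreateQueue would run before SetEngine.
	engine := p.shardCtx.GetEngine()
	_ = engine // wire the engine into task executors, filters, etc.
}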
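The queue.Factory interface itself is not shown in this diff. From the loop above one can infer roughly the shape below: Category() supplies the map key and CreateQueue receives the shard context plus the shared collaborators. This is an inferred sketch; the real interface in service/history/queue may take additional dependencies (task processor, archiver client, workflow-ID cache, and so on) that previously flowed through NewEngineWithShardContext.

package sketch

import (
	"github.com/uber/cadence/common/persistence"
	"github.com/uber/cadence/common/reconciliation/invariant"
	"github.com/uber/cadence/service/history/execution"
	"github.com/uber/cadence/service/history/queue"
	"github.com/uber/cadence/service/history/shard"
)

// Factory is an inferred sketch of the interface consumed by the loop above,
// not the actual definition from service/history/queue.
type Factory interface {
	// Category identifies the history task category (transfer, timer, ...)
	// served by the created processor; it keys the engine's queueProcessors map.
	Category() persistence.HistoryTaskCategory

	// CreateQueue builds the processor for this category from the shard context
	// and the shared collaborators passed in by NewEngineWithShardContext.
	CreateQueue(
		shardCtx shard.Context,
		executionCache execution.Cache,
		openExecutionCheck invariant.Invariant,
	) queue.Processor
}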

historyEngImpl.eventsReapplier = ndc.NewEventsReapplier(shard.GetMetricsClient(), logger)

@@ -360,8 +307,9 @@ func (e *historyEngineImpl) Start() {
e.logger.Info("History engine state changed", tag.LifeCycleStarting)
defer e.logger.Info("History engine state changed", tag.LifeCycleStarted)

e.txProcessor.Start()
e.timerProcessor.Start()
for _, processor := range e.queueProcessors {
processor.Start()
}
e.replicationDLQHandler.Start()
e.replicationMetricsEmitter.Start()

@@ -386,8 +334,9 @@ func (e *historyEngineImpl) Stop() {
e.logger.Info("History engine state changed", tag.LifeCycleStopping)
defer e.logger.Info("History engine state changed", tag.LifeCycleStopped)

e.txProcessor.Stop()
e.timerProcessor.Stop()
for _, processor := range e.queueProcessors {
processor.Stop()
}
e.replicationDLQHandler.Stop()
e.replicationMetricsEmitter.Stop()

@@ -420,8 +369,9 @@ func (e *historyEngineImpl) SyncShardStatus(ctx context.Context, request *types.
// 2. notify the timer gate in the timer queue standby processor
// 3. notify the transfer (essentially a no op, just put it here so it looks symmetric)
e.shard.SetCurrentTime(clusterName, now)
e.txProcessor.NotifyNewTask(clusterName, &hcommon.NotifyTaskInfo{Tasks: []persistence.Task{}})
e.timerProcessor.NotifyNewTask(clusterName, &hcommon.NotifyTaskInfo{Tasks: []persistence.Task{}})
for _, processor := range e.queueProcessors {
processor.NotifyNewTask(clusterName, &hcommon.NotifyTaskInfo{Tasks: []persistence.Task{}})
}
return nil
}

6 changes: 4 additions & 2 deletions service/history/engine/engineimpl/history_engine2_test.go
@@ -150,8 +150,10 @@ func (s *engine2Suite) SetupTest() {
config: s.config,
timeSource: s.mockShard.GetTimeSource(),
historyEventNotifier: events.NewNotifier(clock.NewRealTimeSource(), metrics.NewClient(tally.NoopScope, metrics.History), func(string) int { return 0 }),
txProcessor: s.mockTxProcessor,
timerProcessor: s.mockTimerProcessor,
queueProcessors: map[p.HistoryTaskCategory]queue.Processor{
p.HistoryTaskCategoryTransfer: s.mockTxProcessor,
p.HistoryTaskCategoryTimer: s.mockTimerProcessor,
},
}
s.mockShard.SetEngine(h)
h.decisionHandler = decision.NewHandler(s.mockShard, h.executionCache, h.tokenSerializer)
@@ -133,8 +133,10 @@ func (s *engine3Suite) SetupTest() {
config: s.config,
timeSource: s.mockShard.GetTimeSource(),
historyEventNotifier: events.NewNotifier(clock.NewRealTimeSource(), metrics.NewClient(tally.NoopScope, metrics.History), func(string) int { return 0 }),
txProcessor: s.mockTxProcessor,
timerProcessor: s.mockTimerProcessor,
queueProcessors: map[p.HistoryTaskCategory]queue.Processor{
p.HistoryTaskCategoryTransfer: s.mockTxProcessor,
p.HistoryTaskCategoryTimer: s.mockTimerProcessor,
},
}
s.mockShard.SetEngine(h)
h.decisionHandler = decision.NewHandler(s.mockShard, h.executionCache, h.tokenSerializer)
12 changes: 7 additions & 5 deletions service/history/engine/engineimpl/history_engine_test.go
@@ -173,11 +173,13 @@ func (s *engineSuite) SetupTest() {
tokenSerializer: common.NewJSONTaskTokenSerializer(),
historyEventNotifier: historyEventNotifier,
config: config.NewForTest(),
txProcessor: s.mockTxProcessor,
timerProcessor: s.mockTimerProcessor,
clientChecker: cc.NewVersionChecker(),
eventsReapplier: s.mockEventsReapplier,
workflowResetter: s.mockWorkflowResetter,
queueProcessors: map[persistence.HistoryTaskCategory]queue.Processor{
persistence.HistoryTaskCategoryTransfer: s.mockTxProcessor,
persistence.HistoryTaskCategoryTimer: s.mockTimerProcessor,
},
clientChecker: cc.NewVersionChecker(),
eventsReapplier: s.mockEventsReapplier,
workflowResetter: s.mockWorkflowResetter,
}
s.mockShard.SetEngine(h)
h.decisionHandler = decision.NewHandler(s.mockShard, h.executionCache, h.tokenSerializer)
14 changes: 12 additions & 2 deletions service/history/engine/engineimpl/notify_tasks.go
@@ -46,7 +46,12 @@ func (e *historyEngineImpl) NotifyNewTransferTasks(info *hcommon.NotifyTaskInfo)
task := info.Tasks[0]
clusterName, err := e.shard.GetActiveClusterManager().ClusterNameForFailoverVersion(task.GetVersion(), info.ExecutionInfo.DomainID)
if err == nil {
e.txProcessor.NotifyNewTask(clusterName, info)
transferProcessor, ok := e.queueProcessors[persistence.HistoryTaskCategoryTransfer]
if !ok {
e.logger.Error("transfer processor not found", tag.Error(err))
return
}
transferProcessor.NotifyNewTask(clusterName, info)
}
}

@@ -58,7 +63,12 @@ func (e *historyEngineImpl) NotifyNewTimerTasks(info *hcommon.NotifyTaskInfo) {
task := info.Tasks[0]
clusterName, err := e.shard.GetActiveClusterManager().ClusterNameForFailoverVersion(task.GetVersion(), info.ExecutionInfo.DomainID)
if err == nil {
e.timerProcessor.NotifyNewTask(clusterName, info)
timerProcessor, ok := e.queueProcessors[persistence.HistoryTaskCategoryTimer]
if !ok {
e.logger.Error("timer processor not found", tag.Error(err))
return
}
timerProcessor.NotifyNewTask(clusterName, info)
}
}
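The transfer and timer branches above differ only in the category they look up. A hedged sketch of a helper that could factor out the shared lookup (notifyCategory is hypothetical and not part of this PR):

// notifyCategory is a hypothetical consolidation of the two branches above:
// look up the processor registered for a category and forward the
// notification, logging and returning if no processor is registered.
func (e *historyEngineImpl) notifyCategory(
	category persistence.HistoryTaskCategory,
	clusterName string,
	info *hcommon.NotifyTaskInfo,
) {
	processor, ok := e.queueProcessors[category]
	if !ok {
		e.logger.Error("queue processor not found")
		return
	}
	processor.NotifyNewTask(clusterName, info)
}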

@@ -127,8 +127,9 @@ func (e *historyEngineImpl) domainChangeCB(nextDomains []*cache.DomainCacheEntry
e.logger.Info("Domain Failover Start.", tag.WorkflowDomainIDs(failoverDomainIDs))

// Failover queues are not created for active-active domains. Will revisit after new queue framework implementation.
e.txProcessor.FailoverDomain(failoverDomainIDs)
e.timerProcessor.FailoverDomain(failoverDomainIDs)
for _, processor := range e.queueProcessors {
processor.FailoverDomain(failoverDomainIDs)
}

e.notifyQueues()
}
@@ -159,8 +160,18 @@ func (e *historyEngineImpl) notifyQueues() {
// its length > 0 and has correct timestamp, to trigger a db scan
fakeDecisionTask := []persistence.Task{&persistence.DecisionTask{}}
fakeDecisionTimeoutTask := []persistence.Task{&persistence.DecisionTimeoutTask{TaskData: persistence.TaskData{VisibilityTimestamp: now}}}
e.txProcessor.NotifyNewTask(e.currentClusterName, &hcommon.NotifyTaskInfo{Tasks: fakeDecisionTask})
e.timerProcessor.NotifyNewTask(e.currentClusterName, &hcommon.NotifyTaskInfo{Tasks: fakeDecisionTimeoutTask})
transferProcessor, ok := e.queueProcessors[persistence.HistoryTaskCategoryTransfer]
if !ok {
e.logger.Error("transfer processor not found")
return
}
transferProcessor.NotifyNewTask(e.currentClusterName, &hcommon.NotifyTaskInfo{Tasks: fakeDecisionTask})
timerProcessor, ok := e.queueProcessors[persistence.HistoryTaskCategoryTimer]
if !ok {
e.logger.Error("timer processor not found")
return
}
timerProcessor.NotifyNewTask(e.currentClusterName, &hcommon.NotifyTaskInfo{Tasks: fakeDecisionTimeoutTask})
}

func (e *historyEngineImpl) generateGracefulFailoverTasksForDomainUpdateCallback(shardNotificationVersion int64, nextDomains []*cache.DomainCacheEntry) []*persistence.FailoverMarkerTask {
Expand Down Expand Up @@ -204,14 +215,16 @@ func (e *historyEngineImpl) generateGracefulFailoverTasksForDomainUpdateCallback

func (e *historyEngineImpl) lockTaskProcessingForDomainUpdate() {
e.logger.Debug("Locking processing for domain update")
e.txProcessor.LockTaskProcessing()
e.timerProcessor.LockTaskProcessing()
for _, processor := range e.queueProcessors {
processor.LockTaskProcessing()
}
}

Review comment (Member): Not in the scope of this PR, but let's see if we really need to lock task processing for these domain change callbacks.

func (e *historyEngineImpl) unlockProcessingForDomainUpdate() {
e.logger.Debug("Unlocking processing for failover")
e.txProcessor.UnlockTaskProcessing()
e.timerProcessor.UnlockTaskProcessing()
for _, processor := range e.queueProcessors {
processor.UnlockTaskProcessing()
}
}

func (e *historyEngineImpl) failoverPredicate(shardNotificationVersion int64, nextDomain *cache.DomainCacheEntry, action func()) {