Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/internal_nexus_task_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (ntp *nexusTaskPoller) Cleanup() error {
}

// PollTask polls a new task
func (ntp *nexusTaskPoller) PollTask() (taskForWorker, error) {
// PollTask polls the service for a single new Nexus task via doPoll.
// The sticky/normal poll hint applies only to workflow task pollers and is
// ignored here.
func (ntp *nexusTaskPoller) PollTask(_ *pollHint) (taskForWorker, error) {
	return ntp.doPoll(ntp.poll)
}

Expand Down
4 changes: 2 additions & 2 deletions internal/internal_task_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1832,7 +1832,7 @@ func (t *TaskHandlersTestSuite) TestLocalActivityRetry_Workflow() {
laTaskPoller := newLocalActivityPoller(params, laTunnel, nil, nil, stopCh)
go func() {
for {
task, _ := laTaskPoller.PollTask()
task, _ := laTaskPoller.PollTask(nil)
_ = laTaskPoller.ProcessTask(task)
// Quit after we've polled enough times
if laFailures.Load() == 4 {
Expand Down Expand Up @@ -1915,7 +1915,7 @@ func (t *TaskHandlersTestSuite) TestLocalActivityRetry_WorkflowTaskHeartbeatFail
doneCh := make(chan struct{})
go func() {
// laTaskPoller needs to poll the local activity and process it
task, err := laTaskPoller.PollTask()
task, err := laTaskPoller.PollTask(nil)
t.NoError(err)
err = laTaskPoller.ProcessTask(task)
t.NoError(err)
Expand Down
98 changes: 77 additions & 21 deletions internal/internal_task_pollers.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,19 @@ const (
Sticky
)

// pollHint carries a pre-decided sticky/normal decision from runPoller to
// PollTask. When non-nil, the workflow task poller uses the hint instead of
// re-deciding; non-workflow pollers ignore it.
type pollHint struct {
	// isSticky is true when the pre-decided poll targets the sticky task queue.
	isSticky bool
}

type (
// taskPoller interface to poll for tasks
taskPoller interface {
// PollTask polls for one new task
PollTask() (taskForWorker, error)
// PollTask polls for one new task. The hint, when non-nil, carries a
// pre-decided sticky/normal choice for workflow task pollers. Non-workflow
// pollers should ignore it.
PollTask(hint *pollHint) (taskForWorker, error)
// Called when the poller will no longer be polled. Presently only useful for
// workflow workers.
Cleanup() error
Expand Down Expand Up @@ -373,10 +381,13 @@ func (wtp *workflowTaskPoller) Cleanup() error {
return err
}

// PollTask polls a new task
func (wtp *workflowTaskPoller) PollTask() (taskForWorker, error) {
// PollTask polls a new task. If hint is non-nil, the pre-decided sticky/normal
// choice is used instead of re-deciding in getNextPollRequest.
func (wtp *workflowTaskPoller) PollTask(hint *pollHint) (taskForWorker, error) {
// Get the task.
workflowTask, err := wtp.doPoll(wtp.poll)
workflowTask, err := wtp.doPoll(func(ctx context.Context) (taskForWorker, error) {
return wtp.poll(ctx, hint)
})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -741,7 +752,7 @@ func (latp *localActivityTaskPoller) Cleanup() error {
return nil
}

func (latp *localActivityTaskPoller) PollTask() (taskForWorker, error) {
// PollTask returns the next local activity task from the in-process tunnel;
// the error result is always nil. Local activity polling does not go through
// the server, so the sticky/normal poll hint is ignored.
func (latp *localActivityTaskPoller) PollTask(_ *pollHint) (taskForWorker, error) {
	return latp.laTunnel.getTask(), nil
}

Expand Down Expand Up @@ -915,16 +926,50 @@ func (wtp *workflowTaskPoller) updateBacklog(taskQueueKind enumspb.TaskQueueKind
wtp.requestLock.Unlock()
}

// decideNextPollKind commits to a sticky/normal decision for the next poll.
// In Mixed mode with sticky execution enabled it records the decision by
// incrementing the matching pending-poll counter under requestLock. For
// Sticky and NonSticky modes (and Mixed with the sticky cache disabled) the
// answer is fixed by the mode and no counter is changed.
func (wtp *workflowTaskPoller) decideNextPollKind() (isSticky bool) {
	// Outside of Mixed mode with sticky enabled, the target queue kind is
	// fully determined by the poller mode itself.
	if wtp.mode != Mixed || wtp.stickyCacheSize <= 0 {
		return wtp.mode == Sticky
	}
	wtp.requestLock.Lock()
	// Prefer sticky when it has backlog, or when its pending-poll count does
	// not exceed the regular queue's (ties go sticky).
	sticky := wtp.stickyBacklog > 0 || wtp.pendingStickyPollCount <= wtp.pendingRegularPollCount
	if sticky {
		wtp.pendingStickyPollCount++
	} else {
		wtp.pendingRegularPollCount++
	}
	wtp.requestLock.Unlock()
	return sticky
}

// undoPollDecision reverses the counter increment made by decideNextPollKind.
// Call this when a pre-decided poll will not happen (e.g. slot reservation
// was cancelled or failed). Outside of Mixed mode with sticky enabled,
// decideNextPollKind changed no counters, so this is a no-op.
func (wtp *workflowTaskPoller) undoPollDecision(isSticky bool) {
	if wtp.mode != Mixed || wtp.stickyCacheSize <= 0 {
		return
	}
	kind := enumspb.TASK_QUEUE_KIND_NORMAL
	if isSticky {
		kind = enumspb.TASK_QUEUE_KIND_STICKY
	}
	wtp.release(kind)
}

// getNextPollRequest returns appropriate next poll request based on poller configuration and mode.
// Simple rules:
// 1. if mode is NonSticky, always poll from regular task queue
// 2. if mode is Sticky, always poll from sticky task queue
// 3. if mode is Mixed
// 3.1. if sticky execution is disabled, always poll for regular task queue
// 3.2. otherwise:
// 3.2.1) if sticky task queue has backlog, always prefer to process sticky task first
// 3.2.2) poll from the task queue that has less pending requests (prefer sticky when they are the same).
func (wtp *workflowTaskPoller) getNextPollRequest() (request *workflowservice.PollWorkflowTaskQueueRequest) {
// 3.2. otherwise, if a hint is provided, use the pre-decided kind (counter
// was already incremented by decideNextPollKind)
// 3.3. otherwise (no hint / nil hint):
// 3.3.1) if sticky task queue has backlog, always prefer to process sticky task first
// 3.3.2) poll from the task queue that has less pending requests (prefer sticky when they are the same).
func (wtp *workflowTaskPoller) getNextPollRequest(hint *pollHint) (request *workflowservice.PollWorkflowTaskQueueRequest) {
taskQueue := &taskqueuepb.TaskQueue{
Name: wtp.taskQueueName,
Kind: enumspb.TASK_QUEUE_KIND_NORMAL,
Expand All @@ -937,16 +982,27 @@ func (wtp *workflowTaskPoller) getNextPollRequest() (request *workflowservice.Po
taskQueue.Kind = enumspb.TASK_QUEUE_KIND_STICKY
taskQueue.NormalName = wtp.taskQueueName
} else if wtp.mode == Mixed {
wtp.requestLock.Lock()
if wtp.stickyBacklog > 0 || wtp.pendingStickyPollCount <= wtp.pendingRegularPollCount {
wtp.pendingStickyPollCount++
taskQueue.Name = getWorkerTaskQueue(wtp.stickyUUID)
taskQueue.Kind = enumspb.TASK_QUEUE_KIND_STICKY
taskQueue.NormalName = wtp.taskQueueName
if hint != nil {
// Use the pre-decided kind; counter was already incremented
// by decideNextPollKind.
if hint.isSticky {
taskQueue.Name = getWorkerTaskQueue(wtp.stickyUUID)
taskQueue.Kind = enumspb.TASK_QUEUE_KIND_STICKY
taskQueue.NormalName = wtp.taskQueueName
}
} else {
wtp.pendingRegularPollCount++
// Fallback: decide inline (original behavior).
wtp.requestLock.Lock()
if wtp.stickyBacklog > 0 || wtp.pendingStickyPollCount <= wtp.pendingRegularPollCount {
wtp.pendingStickyPollCount++
taskQueue.Name = getWorkerTaskQueue(wtp.stickyUUID)
taskQueue.Kind = enumspb.TASK_QUEUE_KIND_STICKY
taskQueue.NormalName = wtp.taskQueueName
} else {
wtp.pendingRegularPollCount++
}
wtp.requestLock.Unlock()
}
wtp.requestLock.Unlock()
} else {
panic("unknown workflow task poller mode")
}
Expand Down Expand Up @@ -987,12 +1043,12 @@ func (wtp *workflowTaskPoller) pollWorkflowTaskQueue(ctx context.Context, reques
}

// Poll for a single workflow task from the service
func (wtp *workflowTaskPoller) poll(ctx context.Context) (taskForWorker, error) {
func (wtp *workflowTaskPoller) poll(ctx context.Context, hint *pollHint) (taskForWorker, error) {
traceLog(func() {
wtp.logger.Debug("workflowTaskPoller::Poll")
})

request := wtp.getNextPollRequest()
request := wtp.getNextPollRequest(hint)
defer wtp.release(request.TaskQueue.GetKind())

response, err := wtp.pollWorkflowTaskQueue(ctx, request)
Expand Down Expand Up @@ -1221,7 +1277,7 @@ func (atp *activityTaskPoller) Cleanup() error {
}

// PollTask polls a new task
func (atp *activityTaskPoller) PollTask() (taskForWorker, error) {
func (atp *activityTaskPoller) PollTask(_ *pollHint) (taskForWorker, error) {
// Get the task.
activityTask, err := atp.doPoll(atp.poll)
if err != nil {
Expand Down
27 changes: 23 additions & 4 deletions internal/internal_worker_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,11 +442,26 @@ func (bw *baseWorker) runPoller(taskWorker scalableTaskPoller) {
}
}

// Pre-decide sticky/normal for workflow task pollers so that
// IsSticky() is accurate at slot reservation time.
data := bw.options.slotReservationData
var hint *pollHint
wtp, isWorkflowPoller := taskWorker.taskPoller.(*workflowTaskPoller)
if isWorkflowPoller {
isSticky := wtp.decideNextPollKind()
data.isSticky = isSticky
hint = &pollHint{isSticky: isSticky}
}

bw.stopWG.Add(1)
go func() {
defer bw.stopWG.Done()
s, err := bw.slotSupplier.ReserveSlot(ctx, &bw.options.slotReservationData)
s, err := bw.slotSupplier.ReserveSlot(ctx, &data)
if err != nil {
// Undo pre-decided counter since the poll will not happen.
if isWorkflowPoller {
wtp.undoPollDecision(hint.isSticky)
}
if !errors.Is(err, context.Canceled) {
bw.logger.Error("Error while trying to reserve slot", "error", err)
select {
Expand All @@ -460,6 +475,10 @@ func (bw *baseWorker) runPoller(taskWorker scalableTaskPoller) {
select {
case reserveChan <- s:
case <-ctx.Done():
// Worker stopped: undo pre-decided counter and release the slot.
if isWorkflowPoller {
wtp.undoPollDecision(hint.isSticky)
}
bw.releaseSlot(s, SlotReleaseReasonUnused)
}
}()
Expand All @@ -481,7 +500,7 @@ func (bw *baseWorker) runPoller(taskWorker scalableTaskPoller) {
if bw.pollerBalancer != nil {
bw.pollerBalancer.incrementPoller(taskWorker.taskPollerType)
}
bw.pollTask(taskWorker, permit)
bw.pollTask(taskWorker, permit, hint)
if bw.pollerBalancer != nil {
bw.pollerBalancer.decrementPoller(taskWorker.taskPollerType)
}
Expand Down Expand Up @@ -588,7 +607,7 @@ func (bw *baseWorker) runEagerTaskDispatcher() {
}
}

func (bw *baseWorker) pollTask(taskWorker scalableTaskPoller, slotPermit *SlotPermit) {
func (bw *baseWorker) pollTask(taskWorker scalableTaskPoller, slotPermit *SlotPermit, hint *pollHint) {
var err error
var task taskForWorker
didSendTask := false
Expand All @@ -600,7 +619,7 @@ func (bw *baseWorker) pollTask(taskWorker scalableTaskPoller, slotPermit *SlotPe

bw.retrier.Throttle(bw.stopCh)
if bw.pollLimiter == nil || bw.pollLimiter.Wait(bw.limiterContext) == nil {
task, err = taskWorker.taskPoller.PollTask()
task, err = taskWorker.taskPoller.PollTask(hint)
bw.logPollTaskError(err)
if err != nil {
// We retry "non retriable" errors while long polling for a while, because some proxies return
Expand Down
2 changes: 1 addition & 1 deletion internal/internal_worker_base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func newSemaphoreProbeTaskPoller() *semaphoreProbeTaskPoller {
}

// PollTask implements taskPoller and blocks until a signal is provided so the semaphore permits stay acquired.
func (p *semaphoreProbeTaskPoller) PollTask() (taskForWorker, error) {
func (p *semaphoreProbeTaskPoller) PollTask(_ *pollHint) (taskForWorker, error) {
_, ok := <-p.signals
if !ok {
return nil, nil
Expand Down
12 changes: 12 additions & 0 deletions internal/tuning.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ type SlotReservationInfo interface {
// MetricsHandler returns an appropriately tagged metrics handler that can be used to record
// custom metrics.
MetricsHandler() metrics.Handler
// IsSticky returns true if the slot being reserved will be used to poll a sticky task queue.
// This is only meaningful for workflow task slots. For activity and local activity slots,
// this will always return false.
IsSticky() bool
}

// SlotMarkUsedInfo contains information that SlotSupplier instances can use during
Expand Down Expand Up @@ -287,6 +291,7 @@ func (f *FixedSizeSlotSupplier) MaxSlots() int {

// slotReservationData is the internal payload passed to slot suppliers when
// reserving a slot; it backs the exported SlotReservationInfo view.
type slotReservationData struct {
	// taskQueue is the name of the task queue the slot is reserved for.
	taskQueue string
	// isSticky is true when a workflow task slot will be used to poll a
	// sticky task queue; it stays false for activity and local activity slots.
	isSticky bool
}

type slotReserveInfoImpl struct {
Expand All @@ -296,6 +301,7 @@ type slotReserveInfoImpl struct {
issuedSlots *atomic.Int32
logger log.Logger
metrics metrics.Handler
sticky bool
}

func (s slotReserveInfoImpl) TaskQueue() string {
Expand All @@ -322,6 +328,10 @@ func (s slotReserveInfoImpl) MetricsHandler() metrics.Handler {
return s.metrics
}

// IsSticky reports whether the slot being reserved will be used to poll a
// sticky task queue. Only meaningful for workflow task slots; false otherwise.
func (s slotReserveInfoImpl) IsSticky() bool {
	return s.sticky
}

type slotMarkUsedContextImpl struct {
permit *SlotPermit
logger log.Logger
Expand Down Expand Up @@ -410,6 +420,7 @@ func (t *trackingSlotSupplier) ReserveSlot(
issuedSlots: &t.issuedSlotsAtomic,
logger: t.logger,
metrics: t.metrics,
sticky: data.isSticky,
})
if err != nil {
return nil, err
Expand All @@ -433,6 +444,7 @@ func (t *trackingSlotSupplier) TryReserveSlot(data *slotReservationData) *SlotPe
issuedSlots: &t.issuedSlotsAtomic,
logger: t.logger,
metrics: t.metrics,
sticky: data.isSticky,
})
if permit != nil {
t.issuedSlotsAtomic.Add(1)
Expand Down
Loading
Loading