Skip to content
27 changes: 1 addition & 26 deletions common/quotas/clocked_rate_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (l ClockedRateLimiter) WaitN(ctx context.Context, token int) error {
return nil
case <-l.recycleCh:
if token > 1 {
break // recycling 1 token to a process requesting >1 tokens is a no-op
break // recycling 1 token to a process requesting >1 tokens is a no-op, because we only know that at least one token was not used
}

// Cancel() reverses the effects of this Reservation on the rate limit as much as possible,
Expand Down Expand Up @@ -182,31 +182,6 @@ func (l ClockedRateLimiter) TokensAt(t time.Time) int {
// In this case, we want to immediately unblock another process that is waiting for one token
// so that the actual rate of completed actions is as close to the intended rate limit as possible.
// If no process is waiting for a token when RecycleToken is called, this is a no-op.
//
// Since we don't know how many tokens were reserved by the process calling recycle, we will only unblock
// new reservations that are for one token (otherwise we could recycle a 1-token-reservation and unblock
// a 100-token-reservation). If all waiting processes are waiting for >1 tokens, this is a no-op.
//
// Because recycleCh is an unbuffered channel, the token will be reused for the next waiter as long
// as there exists a waiter at the time RecycleToken is called. Usually the attempted rate is consistently
// above or below the limit for a period of time, so if rate limiting is in effect and recycling matters,
// most likely there will be a waiter. If the actual rate is erratically bouncing to either side of the
// rate limit AND we perform many recycles, this will drop some recycled tokens.
// If that situation turns out to be common, we may want to make it a buffered channel instead.
//
// Our goal is to ensure that each token in our bucket is used every second, meaning the time between
// taking and successfully using a token must be <= 1s. For this to be true, we must have:
//
// time_to_recycle * number_of_recycles_per_second <= 1s
// time_to_recycle * probability_of_recycle * number_of_attempts_per_second <= 1s
//
// Therefore, it is also possible for this strategy to be inaccurate if the delay between taking and
// successfully using a token is greater than one second.
//
// Currently, RecycleToken is called when we take a token to attempt a matching task dispatch and
// then later find out (usually via RPC to History) that the task should not be dispatched.
// If history rpc takes 10ms --> 100 opportunities for the token to be used that second --> 99% recycle probability is ok.
// If recycle probability is 50% --> need at least 2 opportunities for token to be used --> 500ms history rpc time is ok.
func (l ClockedRateLimiter) RecycleToken() {
select {
case l.recycleCh <- struct{}{}:
Expand Down
78 changes: 68 additions & 10 deletions common/quotas/multi_rate_limiter_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type (
// MultiRateLimiterImpl is a wrapper around the limiter interface
MultiRateLimiterImpl struct {
rateLimiters []RateLimiter
recycleCh chan struct{}
}
)

Expand All @@ -55,6 +56,7 @@ func NewMultiRateLimiter(
}
return &MultiRateLimiterImpl{
rateLimiters: rateLimiters,
recycleCh: make(chan struct{}),
}
}

Expand Down Expand Up @@ -150,13 +152,34 @@ func (rl *MultiRateLimiterImpl) WaitN(ctx context.Context, numToken int) error {

t := time.NewTimer(delay)
defer t.Stop()
select {
case <-t.C:
return nil
for {
select {
case <-t.C:
return nil
case <-ctx.Done():
reservation.CancelAt(time.Now())
return ctx.Err()
case <-rl.recycleCh:
if numToken > 1 {
break // recycling 1 token to a process requesting >1 tokens is a no-op, because we only know that at least one token was not used
}

case <-ctx.Done():
reservation.CancelAt(time.Now())
return ctx.Err()
// Cancel() reverses the effects of this Reservation on the rate limit as much as possible,
// considering that other reservations may have already been made. Normally, Cancel() indicates
// that the reservation holder will not perform the reserved action, so it would make the most
// sense to cancel the reservation whose token was just recycled. However, we don't have access
// to the recycled reservation anymore, and even if we did, Cancel on a reservation that
// has fully waited is a no-op, so instead we cancel the current reservation as a proxy.
//
// Since Cancel() just restores tokens to the rate limiter, cancelling the current 1-token
// reservation should have approximately the same effect on the actual rate as cancelling the
// recycled reservation.
//
// If the recycled reservation was for >1 token, cancelling the current 1-token reservation will
// lead to a slower actual rate than cancelling the original, so the approximation is conservative.
reservation.Cancel()
return nil
}
}
}

Expand Down Expand Up @@ -192,10 +215,45 @@ func (rl *MultiRateLimiterImpl) TokensAt(t time.Time) int {
return tokens
}

// RecycleToken returns a token to each sub-rate-limiter, unblocking each
// sub-rate-limiter's WaitN callers.
// RecycleToken should be called when the action being rate limited was not completed
// for some reason (i.e. a task is not dispatched because it was invalid).
// In this case, we want to immediately unblock another process that is waiting for one token
// so that the actual rate of completed actions is as close to the intended rate limit as possible.
// If no process is waiting for a token when RecycleToken is called, this is a no-op.
//
// For most rate limiters, recycle token is implemented in the base level ClockedRateLimiter,
// but because MultiRateLimiter implements WaitN by calling ReserveN on all its sub-rate-limiters,
// instead of WaitN, there is no one waiting on ClockedRateLimiter's recycle token channel.
// So MultiRateLimiter needs its own recycle method and channel.
//
// Since we don't know how many tokens were reserved by the process calling recycle, we will only unblock
// new reservations that are for one token (otherwise we could recycle a 1-token-reservation and unblock
// a 100-token-reservation). If all waiting processes are waiting for >1 tokens, this is a no-op.
//
// Because recycleCh is an unbuffered channel, the token will be reused for the next waiter only if
// there exists a waiter at the time RecycleToken is called. Usually the attempted rate is consistently
// above or below the limit for a period of time, so if rate limiting is in effect and recycling matters,
// most likely there will be a waiter. If the actual rate is erratically bouncing to either side of the
// rate limit AND we perform many recycles, this will drop some recycled tokens.
// If that situation turns out to be common, we may want to make it a buffered channel instead.
//
// Our goal is to ensure that each token in our bucket is used every second, meaning the time between
// taking and successfully using a token must be <= 1s. For this to be true, we must have:
//
// time_to_recycle * number_of_recycles_per_second <= 1s
// time_to_recycle * probability_of_recycle * number_of_attempts_per_second <= 1s
//
// Therefore, it is also possible for this strategy to be inaccurate if the delay between taking and
// successfully using a token is greater than one second.
//
// Currently, RecycleToken is called when we take a token to attempt a matching task dispatch and
// then later find out (usually via RPC to History) that the task should not be dispatched.
// If history rpc takes 10ms --> 100 opportunities for the token to be used that second --> 99% recycle probability is ok.
// If recycle probability is 50% --> need at least 2 opportunities for token to be used --> 500ms history rpc time is ok.
// Note that the task forwarder rate limit impacts the rate of recycle, which can add inaccuracy.
func (rl *MultiRateLimiterImpl) RecycleToken() {
for _, rateLimiter := range rl.rateLimiters {
rateLimiter.RecycleToken()
select {
case rl.recycleCh <- struct{}{}:
default:
}
}
205 changes: 205 additions & 0 deletions tests/task_queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
// The MIT License
//
// Copyright (c) 2025 Temporal Technologies Inc. All rights reserved.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package tests

import (
"context"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/suite"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
workflowpb "go.temporal.io/api/workflow/v1"
"go.temporal.io/api/workflowservice/v1"
sdkclient "go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/testing/testvars"
"go.temporal.io/server/tests/testcore"
)

// TaskQueueSuite contains functional tests exercising task queue behavior
// (e.g. dispatch rate limiting) against a running test cluster. It embeds the
// shared functional-test suite and holds a per-test SDK client used to start
// workflows and workers.
type TaskQueueSuite struct {
	testcore.FunctionalTestSuite
	sdkClient sdkclient.Client // dialed in SetupTest, closed in TearDownTest
}

func TestTaskQueueSuite(t *testing.T) {
t.Parallel()
suite.Run(t, new(TaskQueueSuite))
}

// SetupSuite starts the default test cluster with the matching task queue
// partition counts overridden to 4 read and 4 write partitions.
func (s *TaskQueueSuite) SetupSuite() {
	overrides := make(map[dynamicconfig.Key]any, 2)
	overrides[dynamicconfig.MatchingNumTaskqueueWritePartitions.Key()] = 4
	overrides[dynamicconfig.MatchingNumTaskqueueReadPartitions.Key()] = 4
	s.FunctionalTestSuite.SetupSuiteWithDefaultCluster(testcore.WithDynamicConfigOverrides(overrides))
}

// SetupTest initializes the base suite for each test and dials a fresh SDK
// client against the test cluster's frontend.
func (s *TaskQueueSuite) SetupTest() {
	s.FunctionalTestSuite.SetupTest()

	client, err := sdkclient.Dial(sdkclient.Options{
		HostPort:  s.FrontendGRPCAddress(),
		Namespace: s.Namespace().String(),
	})
	s.NoError(err)
	s.sdkClient = client
}

// TearDownTest closes the per-test SDK client (if one was created) and then
// tears down the base suite.
func (s *TaskQueueSuite) TearDownTest() {
	// sdkClient may be nil if SetupTest failed before the dial succeeded.
	if s.sdkClient != nil {
		s.sdkClient.Close()
	}
	// NOTE(review): SetupTest calls s.FunctionalTestSuite.SetupTest() but this
	// calls s.FunctionalTestBase.TearDownTest() — presumably TearDownTest is
	// promoted from the embedded base and the pair is symmetric; confirm.
	s.FunctionalTestBase.TearDownTest()
}

// TestTaskQueueRateLimit drives the rate-limit drain scenario across several
// configurations. Not using RunTestWithMatchingBehavior because each
// configuration needs a different expected drain time.
func (s *TaskQueueSuite) TestTaskQueueRateLimit() {
	cases := []struct {
		partitions  int
		workers     int
		timeToDrain time.Duration
		newMatching bool
	}{
		{1, 1, 12 * time.Second, true},  // ~0.75s avg
		{1, 1, 12 * time.Second, false}, // ~1.1s avg
		// Testing multiple partitions with insufficient pollers is too flaky, because token recycling
		// depends on a process being available to accept the token, so that case is not tested.
		{4, 8, 24 * time.Second, true},  // ~1.6s avg
		{4, 8, 24 * time.Second, false}, // ~6s avg
	}
	for _, c := range cases {
		s.RunTaskQueueRateLimitTest(c.partitions, c.workers, c.timeToDrain, c.newMatching)
	}
}

// RunTaskQueueRateLimitTest runs the rate-limit drain scenario as a named
// subtest for the given partition/worker/matcher configuration.
func (s *TaskQueueSuite) RunTaskQueueRateLimitTest(nPartitions, nWorkers int, timeToDrain time.Duration, useNewMatching bool) {
	name := s.testTaskQueueRateLimitName(nPartitions, nWorkers, useNewMatching)
	s.Run(name, func() {
		s.taskQueueRateLimitTest(nPartitions, nWorkers, timeToDrain, useNewMatching)
	})
}

// taskQueueRateLimitTest verifies that tasks for already-terminated workflows
// (invalid tasks) still respect the task queue dispatch rate limit: with the
// dispatch rate forced to 1 task/sec, a backlog of 30 invalid tasks must take
// a meaningful amount of time to drain (bounded by timeToDrain) rather than
// draining instantly, which would indicate the limit was bypassed.
//
// Fix over the original: the workers started below were never stopped, leaking
// pollers/goroutines into subsequent subtests in the same process; they are
// now stopped via defer when the test function returns.
func (s *TaskQueueSuite) taskQueueRateLimitTest(nPartitions, nWorkers int, timeToDrain time.Duration, useNewMatching bool) {
	if useNewMatching {
		s.OverrideDynamicConfig(dynamicconfig.MatchingUseNewMatcher, true)
	}
	s.OverrideDynamicConfig(dynamicconfig.MatchingNumTaskqueueReadPartitions, nPartitions)
	s.OverrideDynamicConfig(dynamicconfig.MatchingNumTaskqueueWritePartitions, nPartitions)

	// exclude the effect of the default forwarding rate limit (10)
	s.OverrideDynamicConfig(dynamicconfig.MatchingForwarderMaxRatePerSecond, 1000)

	// 30 tasks at 1 task per second is 30 seconds.
	// if invalid tasks are NOT using the rate limit, then this should take well below that long.
	// task forwarding between task queue partitions is rate-limited by default to 10 rps.
	s.OverrideDynamicConfig(dynamicconfig.AdminMatchingNamespaceTaskqueueToPartitionDispatchRate, 1)
	s.OverrideDynamicConfig(dynamicconfig.TaskQueueInfoByBuildIdTTL, 0)

	const maxBacklog = 30
	tv := testvars.New(s.T())

	// Trivial workflow; it never actually executes because every started
	// workflow is terminated before a worker is available to pick it up.
	helloRateLimitTest := func(ctx workflow.Context, name string) (string, error) {
		return "Hello " + name + " !", nil
	}

	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancel()

	// start workflows to create a backlog
	for wfidx := 0; wfidx < maxBacklog; wfidx++ {
		_, err := s.sdkClient.ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{
			TaskQueue: tv.TaskQueue().GetName(),
			ID:        fmt.Sprintf("wf%d", wfidx),
		}, helloRateLimitTest, "Donna")
		s.NoError(err)
	}

	// wait for backlog to be >= maxBacklog
	wfBacklogCount := int64(0)
	s.Eventually(
		func() bool {
			wfBacklogCount = s.getBacklogCount(ctx, tv)
			return wfBacklogCount >= maxBacklog
		},
		5*time.Second,
		200*time.Millisecond,
	)

	// terminate all those workflow executions so that all the tasks in the backlog are invalid
	var wfList []*workflowpb.WorkflowExecutionInfo
	s.Eventually(
		func() bool {
			listResp, err := s.FrontendClient().ListWorkflowExecutions(ctx, &workflowservice.ListWorkflowExecutionsRequest{
				Namespace: s.Namespace().String(),
				Query:     fmt.Sprintf("TaskQueue = '%s'", tv.TaskQueue().GetName()),
			})
			s.NoError(err)
			wfList = listResp.GetExecutions()
			// visibility is eventually consistent, so retry until all
			// maxBacklog executions are listed
			return len(wfList) == maxBacklog
		},
		5*time.Second,
		200*time.Millisecond,
	)

	for _, exec := range wfList {
		_, err := s.FrontendClient().TerminateWorkflowExecution(ctx, &workflowservice.TerminateWorkflowExecutionRequest{
			Namespace:         s.Namespace().String(),
			WorkflowExecution: &commonpb.WorkflowExecution{WorkflowId: exec.GetExecution().GetWorkflowId(), RunId: exec.GetExecution().GetRunId()},
			Reason:            "test",
			Identity:          tv.ClientIdentity(),
		})
		s.NoError(err)
	}

	// start some workers; stop them all when the test returns so their pollers
	// don't leak into subsequent subtests
	workers := make([]worker.Worker, nWorkers)
	defer func() {
		for _, w := range workers {
			if w != nil {
				w.Stop()
			}
		}
	}()
	for i := 0; i < nWorkers; i++ {
		workers[i] = worker.New(s.sdkClient, tv.TaskQueue().GetName(), worker.Options{})
		workers[i].RegisterWorkflow(helloRateLimitTest)
		err := workers[i].Start()
		s.NoError(err)
	}

	// wait for backlog to be 0; timeToDrain bounds how long draining the
	// invalid tasks through the 1/sec rate limit is allowed to take
	s.Eventually(
		func() bool {
			wfBacklogCount = s.getBacklogCount(ctx, tv)
			return wfBacklogCount == 0
		},
		timeToDrain,
		500*time.Millisecond,
	)
}

// getBacklogCount returns the approximate workflow-task backlog count for the
// test's task queue, as reported by enhanced-mode DescribeTaskQueue stats.
func (s *TaskQueueSuite) getBacklogCount(ctx context.Context, tv *testvars.TestVars) int64 {
	req := &workflowservice.DescribeTaskQueueRequest{
		Namespace:   s.Namespace().String(),
		TaskQueue:   tv.TaskQueue(),
		ApiMode:     enumspb.DESCRIBE_TASK_QUEUE_MODE_ENHANCED,
		ReportStats: true,
	}
	resp, err := s.FrontendClient().DescribeTaskQueue(ctx, req)
	s.NoError(err)
	// "" is the unversioned build-id bucket; proto getters are nil-safe, so a
	// missing entry yields zero rather than panicking.
	typesInfo := resp.GetVersionsInfo()[""].GetTypesInfo()
	stats := typesInfo[sdkclient.TaskQueueTypeWorkflow].GetStats()
	return stats.GetApproximateBacklogCount()
}

// testTaskQueueRateLimitName builds the subtest name encoding the matcher
// implementation, partition count, and worker count, e.g.
// "NewMatching_4Partitions_8Workers".
func (s *TaskQueueSuite) testTaskQueueRateLimitName(nPartitions, nWorkers int, useNewMatching bool) string {
	matcher := "OldMatching"
	if useNewMatching {
		matcher = "NewMatching"
	}
	return fmt.Sprintf("%s_%vPartitions_%vWorkers", matcher, nPartitions, nWorkers)
}
Loading