
Commit 5a1a3da

Fix task queue rate-limit for invalid tasks (#7590)
## What changed?
Implement `RecycleToken` in `MultiRateLimiter` and add functional tests.

## Why?
`ClockedRateLimiter` is the base rate limiter for many rate limiters in our system, so I made the mistake of assuming that implementing `RecycleToken` there and then calling `ClockedRateLimiter.RecycleToken` from other wrapper rate limiters (such as `MultiRateLimiter` and `DynamicRateLimiter`) would "just work." However, a limitation of the recycle-token implementation is that another process must be waiting on a token (via `ClockedRateLimiter.Wait`) at the time `RecycleToken` is called. `DynamicRateLimiter.Wait` calls `ClockedRateLimiter.Wait`, which means that `DynamicRateLimiter.RecycleToken` can simply call `ClockedRateLimiter.RecycleToken`. However, `MultiRateLimiter.Wait` does not call the `Wait` method of its sub-rate-limiters; instead it implements a custom `Wait` using the `Reserve` methods of its sub-rate-limiters. This means the sub-rate-limiters never call `Wait` and never wait for the recycled token to arrive. Giving `MultiRateLimiter` a custom `RecycleToken` implementation that pairs with the custom `MultiRateLimiter.Wait` solves this problem.

## How did you test it?
Tested in a test server using sample workflows, and with functional tests for both the old and new matchers. Note that the new matcher did not have this bug, because it does not use `MultiRateLimiter` for task queue rate limiting.

## Potential risks
<!-- Assuming the worst case, what can be broken when deploying this change to production? -->

## Documentation
<!-- Have you made sure this change doesn't falsify anything currently stated in `docs/`? If significant new behavior is added, have you described that in `docs/`? -->

## Is hotfix candidate?
It fixes a bug, but the bug only occurs when many workflows that have backlogged tasks become invalid (i.e. via termination or deletion).

---------

Signed-off-by: Carly de Frondeville <[email protected]>
1 parent 20156eb commit 5a1a3da
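The failure mode described in the commit message comes down to a Go channel handoff: `RecycleToken` does a non-blocking send on an unbuffered channel, so the recycled token is picked up only if another goroutine is already blocked receiving from that channel (i.e. sitting in `Wait`/`WaitN`). Here is a minimal standalone sketch of that handoff; the code and names are illustrative, not the server implementation:

```go
package main

import (
	"fmt"
	"time"
)

// recycle performs the non-blocking handoff used by RecycleToken: the token is
// delivered only if some goroutine is already blocked receiving from ch.
func recycle(ch chan struct{}) bool {
	select {
	case ch <- struct{}{}:
		return true // a waiter picked up the recycled token
	default:
		return false // nobody was waiting; the recycled token is dropped
	}
}

func main() {
	ch := make(chan struct{}) // unbuffered, like recycleCh

	// No waiter: the send hits the default branch and the token is lost.
	fmt.Println("recycled without waiter:", recycle(ch)) // false

	// With a waiter blocked on the channel (as a WaitN caller would be),
	// the same send succeeds and unblocks it immediately.
	done := make(chan struct{})
	go func() {
		<-ch // stands in for the `case <-recycleCh:` branch in WaitN
		close(done)
	}()
	time.Sleep(10 * time.Millisecond) // crude way to ensure the waiter is blocked
	fmt.Println("recycled with waiter:", recycle(ch)) // true
	<-done
}
```

`DynamicRateLimiter` delegates `Wait` to the underlying `ClockedRateLimiter`, so a blocked receiver exists; `MultiRateLimiter` builds its own wait loop out of `Reserve` calls, so there was no receiver and the send always took the no-op path.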

File tree

3 files changed: +274 -36 lines changed

common/quotas/clocked_rate_limiter.go

+1 -26
@@ -143,7 +143,7 @@ func (l ClockedRateLimiter) WaitN(ctx context.Context, token int) error {
 		return nil
 	case <-l.recycleCh:
 		if token > 1 {
-			break // recycling 1 token to a process requesting >1 tokens is a no-op
+			break // recycling 1 token to a process requesting >1 tokens is a no-op, because we only know that at least one token was not used
 		}
 
 		// Cancel() reverses the effects of this Reservation on the rate limit as much as possible,
@@ -182,31 +182,6 @@ func (l ClockedRateLimiter) TokensAt(t time.Time) int {
 // In this case, we want to immediately unblock another process that is waiting for one token
 // so that the actual rate of completed actions is as close to the intended rate limit as possible.
 // If no process is waiting for a token when RecycleToken is called, this is a no-op.
-//
-// Since we don't know how many tokens were reserved by the process calling recycle, we will only unblock
-// new reservations that are for one token (otherwise we could recycle a 1-token-reservation and unblock
-// a 100-token-reservation). If all waiting processes are waiting for >1 tokens, this is a no-op.
-//
-// Because recycleCh is an unbuffered channel, the token will be reused for the next waiter as long
-// as there exists a waiter at the time RecycleToken is called. Usually the attempted rate is consistently
-// above or below the limit for a period of time, so if rate limiting is in effect and recycling matters,
-// most likely there will be a waiter. If the actual rate is erratically bouncing to either side of the
-// rate limit AND we perform many recycles, this will drop some recycled tokens.
-// If that situation turns out to be common, we may want to make it a buffered channel instead.
-//
-// Our goal is to ensure that each token in our bucket is used every second, meaning the time between
-// taking and successfully using a token must be <= 1s. For this to be true, we must have:
-//
-//	time_to_recycle * number_of_recycles_per_second <= 1s
-//	time_to_recycle * probability_of_recycle * number_of_attempts_per_second <= 1s
-//
-// Therefore, it is also possible for this strategy to be inaccurate if the delay between taking and
-// successfully using a token is greater than one second.
-//
-// Currently, RecycleToken is called when we take a token to attempt a matching task dispatch and
-// then later find out (usually via RPC to History) that the task should not be dispatched.
-// If history rpc takes 10ms --> 100 opportunities for the token to be used that second --> 99% recycle probability is ok.
-// If recycle probability is 50% --> need at least 2 opportunities for token to be used --> 500ms history rpc time is ok.
 func (l ClockedRateLimiter) RecycleToken() {
 	select {
 	case l.recycleCh <- struct{}{}:

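One subtlety in the recycle branch of `WaitN` above: `break` inside a `select` only exits the `select`, so a waiter that needs more than one token simply loops around and keeps waiting for its own reservation. A toy version of that control flow (illustrative names; a buffered channel stands in for a concurrent `RecycleToken` caller so the sketch runs without extra goroutines):

```go
package main

import (
	"fmt"
	"time"
)

// waitOrRecycle mimics the shape of the WaitN select loop: a waiter for
// numToken tokens ignores recycled single tokens (break only exits the select,
// not the enclosing for loop) and keeps waiting for its reservation timer.
func waitOrRecycle(numToken int, recycleCh <-chan struct{}, timer <-chan time.Time) string {
	for {
		select {
		case <-timer:
			return "reservation matured"
		case <-recycleCh:
			if numToken > 1 {
				break // no-op for multi-token waiters; loop around and keep waiting
			}
			return "unblocked early by a recycled token"
		}
	}
}

func main() {
	recycleCh := make(chan struct{}, 1)
	recycleCh <- struct{}{} // pretend someone just called RecycleToken

	// A 1-token waiter is released immediately by the recycled token.
	fmt.Println(waitOrRecycle(1, recycleCh, time.After(time.Second)))

	// A 3-token waiter ignores the recycle and waits out its timer.
	recycleCh <- struct{}{}
	fmt.Println(waitOrRecycle(3, recycleCh, time.After(100*time.Millisecond)))
}
```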
common/quotas/multi_rate_limiter_impl.go

+68 -10
@@ -41,6 +41,7 @@ type (
 	// MultiRateLimiterImpl is a wrapper around the limiter interface
 	MultiRateLimiterImpl struct {
 		rateLimiters []RateLimiter
+		recycleCh    chan struct{}
 	}
 )
 
@@ -55,6 +56,7 @@ func NewMultiRateLimiter(
 	}
 	return &MultiRateLimiterImpl{
 		rateLimiters: rateLimiters,
+		recycleCh:    make(chan struct{}),
 	}
 }
 
@@ -150,13 +152,34 @@ func (rl *MultiRateLimiterImpl) WaitN(ctx context.Context, numToken int) error {
 
 	t := time.NewTimer(delay)
 	defer t.Stop()
-	select {
-	case <-t.C:
-		return nil
+	for {
+		select {
+		case <-t.C:
+			return nil
+		case <-ctx.Done():
+			reservation.CancelAt(time.Now())
+			return ctx.Err()
+		case <-rl.recycleCh:
+			if numToken > 1 {
+				break // recycling 1 token to a process requesting >1 tokens is a no-op, because we only know that at least one token was not used
+			}
 
-	case <-ctx.Done():
-		reservation.CancelAt(time.Now())
-		return ctx.Err()
+			// Cancel() reverses the effects of this Reservation on the rate limit as much as possible,
+			// considering that other reservations may have already been made. Normally, Cancel() indicates
+			// that the reservation holder will not perform the reserved action, so it would make the most
+			// sense to cancel the reservation whose token was just recycled. However, we don't have access
+			// to the recycled reservation anymore, and even if we did, Cancel on a reservation that
+			// has fully waited is a no-op, so instead we cancel the current reservation as a proxy.
+			//
+			// Since Cancel() just restores tokens to the rate limiter, cancelling the current 1-token
+			// reservation should have approximately the same effect on the actual rate as cancelling the
+			// recycled reservation.
+			//
+			// If the recycled reservation was for >1 token, cancelling the current 1-token reservation will
+			// lead to a slower actual rate than cancelling the original, so the approximation is conservative.
+			reservation.Cancel()
+			return nil
+		}
 	}
 }
 
@@ -192,10 +215,45 @@ func (rl *MultiRateLimiterImpl) TokensAt(t time.Time) int {
 	return tokens
 }
 
-// RecycleToken returns a token to each sub-rate-limiter, unblocking each
-// sub-rate-limiter's WaitN callers.
+// RecycleToken should be called when the action being rate limited was not completed
+// for some reason (i.e. a task is not dispatched because it was invalid).
+// In this case, we want to immediately unblock another process that is waiting for one token
+// so that the actual rate of completed actions is as close to the intended rate limit as possible.
+// If no process is waiting for a token when RecycleToken is called, this is a no-op.
+//
+// For most rate limiters, recycle token is implemented in the base level ClockedRateLimiter,
+// but because MultiRateLimiter implements WaitN by calling ReserveN on all its sub-rate-limiters,
+// instead of WaitN, there is no one waiting on ClockedRateLimiter's recycle token channel.
+// So MultiRateLimiter needs its own recycle method and channel.
+//
+// Since we don't know how many tokens were reserved by the process calling recycle, we will only unblock
+// new reservations that are for one token (otherwise we could recycle a 1-token-reservation and unblock
+// a 100-token-reservation). If all waiting processes are waiting for >1 tokens, this is a no-op.
+//
+// Because recycleCh is an unbuffered channel, the token will be reused for the next waiter only if
+// there exists a waiter at the time RecycleToken is called. Usually the attempted rate is consistently
+// above or below the limit for a period of time, so if rate limiting is in effect and recycling matters,
+// most likely there will be a waiter. If the actual rate is erratically bouncing to either side of the
+// rate limit AND we perform many recycles, this will drop some recycled tokens.
+// If that situation turns out to be common, we may want to make it a buffered channel instead.
+//
+// Our goal is to ensure that each token in our bucket is used every second, meaning the time between
+// taking and successfully using a token must be <= 1s. For this to be true, we must have:
+//
+//	time_to_recycle * number_of_recycles_per_second <= 1s
+//	time_to_recycle * probability_of_recycle * number_of_attempts_per_second <= 1s
+//
+// Therefore, it is also possible for this strategy to be inaccurate if the delay between taking and
+// successfully using a token is greater than one second.
+//
+// Currently, RecycleToken is called when we take a token to attempt a matching task dispatch and
+// then later find out (usually via RPC to History) that the task should not be dispatched.
+// If history rpc takes 10ms --> 100 opportunities for the token to be used that second --> 99% recycle probability is ok.
+// If recycle probability is 50% --> need at least 2 opportunities for token to be used --> 500ms history rpc time is ok.
+// Note that the task forwarder rate limit impacts the rate of recycle, which can add inaccuracy.
 func (rl *MultiRateLimiterImpl) RecycleToken() {
-	for _, rateLimiter := range rl.rateLimiters {
-		rateLimiter.RecycleToken()
+	select {
+	case rl.recycleCh <- struct{}{}:
+	default:
 	}
 }

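For context, here is a sketch of the caller-side pattern that the `RecycleToken` comment describes: take a token before attempting a dispatch, and recycle it if the task turns out to be invalid. Everything in this snippet (`recycler`, `fakeLimiter`, `tryDispatch`) is illustrative scaffolding so it runs on its own; it is not matching-service code. The real interface is the server's `quotas.RateLimiter`, which exposes `Wait`/`WaitN` and, with this change, `RecycleToken`.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// recycler is the minimal surface the caller needs from the rate limiter.
type recycler interface {
	Wait(ctx context.Context) error
	RecycleToken()
}

// fakeLimiter is a stand-in so the sketch is self-contained.
type fakeLimiter struct{ recycled int }

func (f *fakeLimiter) Wait(ctx context.Context) error { return ctx.Err() }
func (f *fakeLimiter) RecycleToken()                  { f.recycled++ }

// tryDispatch shows the caller-side pattern described in the comments above:
// take a token, and if the task turns out to be invalid, recycle the token so
// another waiter can use it instead of letting the rate budget go to waste.
func tryDispatch(ctx context.Context, rl recycler, taskIsValid func() (bool, error)) error {
	if err := rl.Wait(ctx); err != nil {
		return err
	}
	valid, err := taskIsValid() // stands in for the RPC to history
	if err != nil {
		return err
	}
	if !valid {
		rl.RecycleToken()
		return errors.New("task invalid; token recycled")
	}
	return nil // dispatch would happen here
}

func main() {
	rl := &fakeLimiter{}
	_ = tryDispatch(context.Background(), rl, func() (bool, error) { return false, nil })
	fmt.Println("tokens recycled:", rl.recycled) // 1
}
```

The key point is that "the task is invalid" is only learned after the token has already been consumed, which is why the token has to be handed back rather than simply not taken.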
tests/task_queue_test.go

+205
@@ -0,0 +1,205 @@
+// The MIT License
+//
+// Copyright (c) 2025 Temporal Technologies Inc. All rights reserved.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+package tests
+
+import (
+	"context"
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/suite"
+	commonpb "go.temporal.io/api/common/v1"
+	enumspb "go.temporal.io/api/enums/v1"
+	workflowpb "go.temporal.io/api/workflow/v1"
+	"go.temporal.io/api/workflowservice/v1"
+	sdkclient "go.temporal.io/sdk/client"
+	"go.temporal.io/sdk/worker"
+	"go.temporal.io/sdk/workflow"
+	"go.temporal.io/server/common/dynamicconfig"
+	"go.temporal.io/server/common/testing/testvars"
+	"go.temporal.io/server/tests/testcore"
+)
+
+type TaskQueueSuite struct {
+	testcore.FunctionalTestSuite
+	sdkClient sdkclient.Client
+}
+
+func TestTaskQueueSuite(t *testing.T) {
+	t.Parallel()
+	suite.Run(t, new(TaskQueueSuite))
+}
+
+func (s *TaskQueueSuite) SetupSuite() {
+	dynamicConfigOverrides := map[dynamicconfig.Key]any{
+		dynamicconfig.MatchingNumTaskqueueWritePartitions.Key(): 4,
+		dynamicconfig.MatchingNumTaskqueueReadPartitions.Key():  4,
+	}
+	s.FunctionalTestSuite.SetupSuiteWithDefaultCluster(testcore.WithDynamicConfigOverrides(dynamicConfigOverrides))
+}
+
+func (s *TaskQueueSuite) SetupTest() {
+	s.FunctionalTestSuite.SetupTest()
+
+	var err error
+	s.sdkClient, err = sdkclient.Dial(sdkclient.Options{
+		HostPort:  s.FrontendGRPCAddress(),
+		Namespace: s.Namespace().String(),
+	})
+	s.NoError(err)
+}
+
+func (s *TaskQueueSuite) TearDownTest() {
+	if s.sdkClient != nil {
+		s.sdkClient.Close()
+	}
+	s.FunctionalTestBase.TearDownTest()
+}
+
+// Not using RunTestWithMatchingBehavior because I want to pass different expected drain times for different configurations
+func (s *TaskQueueSuite) TestTaskQueueRateLimit() {
+	s.RunTaskQueueRateLimitTest(1, 1, 12*time.Second, true)  // ~0.75s avg
+	s.RunTaskQueueRateLimitTest(1, 1, 12*time.Second, false) // ~1.1s avg
+
+	// Testing multiple partitions with insufficient pollers is too flaky, because token recycling
+	// depends on a process being available to accept the token, so I'm not testing it
+	s.RunTaskQueueRateLimitTest(4, 8, 24*time.Second, true)  // ~1.6s avg
+	s.RunTaskQueueRateLimitTest(4, 8, 24*time.Second, false) // ~6s avg
+}
+
+func (s *TaskQueueSuite) RunTaskQueueRateLimitTest(nPartitions, nWorkers int, timeToDrain time.Duration, useNewMatching bool) {
+	s.Run(s.testTaskQueueRateLimitName(nPartitions, nWorkers, useNewMatching), func() { s.taskQueueRateLimitTest(nPartitions, nWorkers, timeToDrain, useNewMatching) })
+}
+
+func (s *TaskQueueSuite) taskQueueRateLimitTest(nPartitions, nWorkers int, timeToDrain time.Duration, useNewMatching bool) {
+	if useNewMatching {
+		s.OverrideDynamicConfig(dynamicconfig.MatchingUseNewMatcher, true)
+	}
+	s.OverrideDynamicConfig(dynamicconfig.MatchingNumTaskqueueReadPartitions, nPartitions)
+	s.OverrideDynamicConfig(dynamicconfig.MatchingNumTaskqueueWritePartitions, nPartitions)
+
+	// exclude the effect of the default forwarding rate limit (10)
+	s.OverrideDynamicConfig(dynamicconfig.MatchingForwarderMaxRatePerSecond, 1000)
+
+	// 30 tasks at 1 task per second is 30 seconds.
+	// if invalid tasks are NOT using the rate limit, then this should take well below that long.
+	// task forwarding between task queue partitions is rate-limited by default to 10 rps.
+	s.OverrideDynamicConfig(dynamicconfig.AdminMatchingNamespaceTaskqueueToPartitionDispatchRate, 1)
+	s.OverrideDynamicConfig(dynamicconfig.TaskQueueInfoByBuildIdTTL, 0)
+
+	const maxBacklog = 30
+	tv := testvars.New(s.T())
+
+	helloRateLimitTest := func(ctx workflow.Context, name string) (string, error) {
+		return "Hello " + name + " !", nil
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
+	defer cancel()
+
+	// start workflows to create a backlog
+	for wfidx := 0; wfidx < maxBacklog; wfidx++ {
+		_, err := s.sdkClient.ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{
+			TaskQueue: tv.TaskQueue().GetName(),
+			ID:        fmt.Sprintf("wf%d", wfidx),
+		}, helloRateLimitTest, "Donna")
+		s.NoError(err)
+	}
+
+	// wait for backlog to be >= maxBacklog
+	wfBacklogCount := int64(0)
+	s.Eventually(
+		func() bool {
+			wfBacklogCount = s.getBacklogCount(ctx, tv)
+			return wfBacklogCount >= maxBacklog
+		},
+		5*time.Second,
+		200*time.Millisecond,
+	)
+
+	// terminate all those workflow executions so that all the tasks in the backlog are invalid
+	var wfList []*workflowpb.WorkflowExecutionInfo
+	s.Eventually(
+		func() bool {
+			listResp, err := s.FrontendClient().ListWorkflowExecutions(ctx, &workflowservice.ListWorkflowExecutionsRequest{
+				Namespace: s.Namespace().String(),
+				Query:     fmt.Sprintf("TaskQueue = '%s'", tv.TaskQueue().GetName()),
+			})
+			s.NoError(err)
+			wfList = listResp.GetExecutions()
+			return len(wfList) == maxBacklog
+		},
+		5*time.Second,
+		200*time.Millisecond,
+	)
+
+	for _, exec := range wfList {
+		_, err := s.FrontendClient().TerminateWorkflowExecution(ctx, &workflowservice.TerminateWorkflowExecutionRequest{
+			Namespace:         s.Namespace().String(),
+			WorkflowExecution: &commonpb.WorkflowExecution{WorkflowId: exec.GetExecution().GetWorkflowId(), RunId: exec.GetExecution().GetRunId()},
+			Reason:            "test",
+			Identity:          tv.ClientIdentity(),
+		})
+		s.NoError(err)
+	}
+
+	// start some workers
+	workers := make([]worker.Worker, nWorkers)
+	for i := 0; i < nWorkers; i++ {
+		workers[i] = worker.New(s.sdkClient, tv.TaskQueue().GetName(), worker.Options{})
+		workers[i].RegisterWorkflow(helloRateLimitTest)
+		err := workers[i].Start()
+		s.NoError(err)
+	}
+
+	// wait for backlog to be 0
+	s.Eventually(
+		func() bool {
+			wfBacklogCount = s.getBacklogCount(ctx, tv)
+			return wfBacklogCount == 0
+		},
+		timeToDrain,
+		500*time.Millisecond,
+	)
+
+}
+
+func (s *TaskQueueSuite) getBacklogCount(ctx context.Context, tv *testvars.TestVars) int64 {
+	resp, err := s.FrontendClient().DescribeTaskQueue(ctx, &workflowservice.DescribeTaskQueueRequest{
+		Namespace:   s.Namespace().String(),
+		TaskQueue:   tv.TaskQueue(),
+		ApiMode:     enumspb.DESCRIBE_TASK_QUEUE_MODE_ENHANCED,
+		ReportStats: true,
+	})
+	s.NoError(err)
+	return resp.GetVersionsInfo()[""].GetTypesInfo()[sdkclient.TaskQueueTypeWorkflow].GetStats().GetApproximateBacklogCount()
+}
+
+func (s *TaskQueueSuite) testTaskQueueRateLimitName(nPartitions, nWorkers int, useNewMatching bool) string {
+	ret := fmt.Sprintf("%vPartitions_%vWorkers", nPartitions, nWorkers)
+	if useNewMatching {
+		return "NewMatching_" + ret
+	}
+	return "OldMatching_" + ret
+}

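The drain-time budgets in the test above depend on recycling: 30 invalid tasks at 1 token/sec would need roughly 30s to drain if their tokens were lost, but recycled tokens let the backlog clear well inside the 12s/24s budgets. As a closing reference, the accuracy bound documented on `RecycleToken` (`time_to_recycle * probability_of_recycle * number_of_attempts_per_second <= 1s`) can be sanity-checked by plugging in the two examples from that comment; the snippet below is a standalone check with those illustrative numbers only.

```go
package main

import "fmt"

// Plug the two worked examples from the RecycleToken comment into its bound:
//   time_to_recycle * probability_of_recycle * number_of_attempts_per_second <= 1s
func main() {
	type scenario struct {
		name              string
		timeToRecycleSec  float64 // e.g. history RPC latency before we learn the task is invalid
		recycleProb       float64 // fraction of attempts whose token gets recycled
		attemptsPerSecond float64 // opportunities per second for the token to be used
	}
	for _, s := range []scenario{
		{"10ms history RPC, 99% recycles", 0.010, 0.99, 100},
		{"500ms history RPC, 50% recycles", 0.500, 0.50, 2},
	} {
		lhs := s.timeToRecycleSec * s.recycleProb * s.attemptsPerSecond
		fmt.Printf("%s: %.2fs <= 1s? %v\n", s.name, lhs, lhs <= 1.0)
	}
}
```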