Skip to content

Commit c09c580

Browse files
authored
Back off on DescribeWorkerDeployment ResourceExhausted error (#291)
<!--- Note to EXTERNAL Contributors --> <!-- Thanks for opening a PR! If it is a significant code change, please **make sure there is an open issue** for this. We work best with you when we have accepted the idea first before you code. --> <!--- For ALL Contributors 👇 --> ## What was changed and why #### Problem Namespaces with many TemporalWorkerDeployment objects can trigger Temporal's per-namespace DescribeWorkerDeployment rate limit (frontend.globalNamespaceWorkerDeploymentReadRPS, default 50 RPS). When this happens, the reconciler was returning the error immediately, causing the workqueue to requeue with an exponential backoff starting at ~5ms — effectively a tight retry loop that makes the rate-limit problem worse. The error comes back as *serviceerror.ResourceExhausted (not a standard gRPC codes.ResourceExhausted status), so it must be detected with errors.As rather than grpcstatus.FromError. #### Changes - internal/controller/worker_controller.go: detect *serviceerror.ResourceExhausted from GetWorkerDeploymentState and return RequeueAfter: 30s instead of an immediate error requeue. Sets ConditionProgressing=False with ReasonTemporalStateFetchFailed and a "Rate limited" message so the condition is visible to users. - internal/tests/internal/rate_limit_integration_test.go: new integration test that creates 10 TWDs against a 1 RPS limit, confirming the error surfaces with the expected condition reason and message. - go.mod / go.sum / go.work: bump Temporal server dependency to v1.31.0-154.2, which is the first version that enforces globalNamespaceWorkerDeploymentReadRPS. ## Checklist 1. Closes #278 2. How was this tested: - KUBEBUILDER_ASSETS=.../bin/k8s/1.27.1-darwin-arm64 go test -tags test_dep ./internal/tests/internal -run "TestIntegration/rate-limit" -timeout 120s -v passes - Same test fails when the errors.As block is removed (reverts to immediate retry with generic message) - Full integration suite passes: go test -tags test_dep ./internal/tests/internal -run TestIntegration -timeout 600s 3. Any docs updates needed? <!--- update README if applicable or point out where to update docs.temporal.io -->
1 parent 1255472 commit c09c580

3 files changed

Lines changed: 115 additions & 0 deletions

File tree

internal/controller/worker_controller.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/temporalio/temporal-worker-controller/internal/controller/clientpool"
1515
"github.com/temporalio/temporal-worker-controller/internal/k8s"
1616
"github.com/temporalio/temporal-worker-controller/internal/temporal"
17+
"go.temporal.io/api/serviceerror"
1718
appsv1 "k8s.io/api/apps/v1"
1819
corev1 "k8s.io/api/core/v1"
1920
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -242,6 +243,14 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req
242243
getControllerIdentity(),
243244
)
244245
if err != nil {
246+
var rateLimitErr *serviceerror.ResourceExhausted
247+
if errors.As(err, &rateLimitErr) {
248+
r.recordWarningAndSetBlocked(ctx, &workerDeploy,
249+
temporaliov1alpha1.ReasonTemporalStateFetchFailed,
250+
fmt.Sprintf("Rate limited fetching Temporal worker deployment state: %v", err),
251+
fmt.Sprintf("Rate limited by Temporal server: %v", err))
252+
return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
253+
}
245254
r.recordWarningAndSetBlocked(ctx, &workerDeploy,
246255
temporaliov1alpha1.ReasonTemporalStateFetchFailed,
247256
fmt.Sprintf("Unable to get Temporal worker deployment state: %v", err),

internal/tests/internal/integration_test.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -950,6 +950,17 @@ func TestIntegration(t *testing.T) {
950950

951951
// Conditions and events tests
952952
runConditionsAndEventsTests(t, k8sClient, mgr, ts, testNamespace.Name)
953+
954+
// Rate limit test: uses a dedicated server to avoid interfering with the tests above.
955+
// Ten TWDs in the same Temporal namespace produce 10 concurrent DescribeWorkerDeployment
956+
// calls per second against a 1 RPS limit, reliably triggering ResourceExhausted errors.
957+
dcRateLimit := dynamicconfig.NewMemoryClient()
958+
dcRateLimit.OverrideValue(dynamicconfig.MakeKey("frontend.globalNamespaceWorkerDeploymentReadRPS"), 1)
959+
tsRateLimit := temporaltest.NewServer(
960+
temporaltest.WithT(t),
961+
temporaltest.WithBaseServerOptions(temporal.WithDynamicConfigClient(dcRateLimit)),
962+
)
963+
runRateLimitTest(t, k8sClient, tsRateLimit, testNamespace.Name)
953964
}
954965

955966
// testTemporalWorkerDeploymentCreation tests the creation of a TemporalWorkerDeployment and waits for the expected status
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package internal
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"strings"
8+
"testing"
9+
"time"
10+
11+
temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1"
12+
"github.com/temporalio/temporal-worker-controller/internal/testhelpers"
13+
"go.temporal.io/server/temporaltest"
14+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
15+
"k8s.io/apimachinery/pkg/types"
16+
"sigs.k8s.io/controller-runtime/pkg/client"
17+
)
18+
19+
// runRateLimitTest verifies that when multiple TWDs in the same Temporal namespace
20+
// burst-reconcile concurrently against a 1 RPS DescribeWorkerDeployment limit, the
21+
// controller surfaces the error as ConditionProgressing=False with
22+
// ReasonTemporalStateFetchFailed and a "Rate limited" message.
23+
//
24+
// The server passed in must have frontend.globalNamespaceWorkerDeploymentReadRPS=1.
25+
// With 10 TWDs each reconciling every 1s (RECONCILE_INTERVAL=1s set by setupTestEnvironment),
26+
// steady-state load is 10x the limit, ensuring the burst is reliably triggered.
27+
func runRateLimitTest(
28+
t *testing.T,
29+
k8sClient client.Client,
30+
ts *temporaltest.TestServer,
31+
testNamespace string,
32+
) {
33+
t.Run("rate-limit-burst-surfaces-error", func(t *testing.T) {
34+
ctx := context.Background()
35+
const n = 10
36+
37+
twds := make([]*temporaliov1alpha1.TemporalWorkerDeployment, n)
38+
39+
for i := 0; i < n; i++ {
40+
name := fmt.Sprintf("rate-limit-%d", i)
41+
42+
conn := &temporaliov1alpha1.TemporalConnection{
43+
ObjectMeta: metav1.ObjectMeta{
44+
Name: name,
45+
Namespace: testNamespace,
46+
},
47+
Spec: temporaliov1alpha1.TemporalConnectionSpec{
48+
HostPort: ts.GetFrontendHostPort(),
49+
},
50+
}
51+
if err := k8sClient.Create(ctx, conn); err != nil {
52+
t.Fatalf("failed to create TemporalConnection %s: %v", name, err)
53+
}
54+
55+
twd := testhelpers.NewTemporalWorkerDeploymentBuilder().
56+
WithManualStrategy().
57+
WithTargetTemplate("v1.0").
58+
WithName(name).
59+
WithNamespace(testNamespace).
60+
WithTemporalConnection(name).
61+
WithTemporalNamespace(ts.GetDefaultNamespace()).
62+
Build()
63+
if err := k8sClient.Create(ctx, twd); err != nil {
64+
t.Fatalf("failed to create TWD %s: %v", name, err)
65+
}
66+
twds[i] = twd
67+
}
68+
69+
// With 10 TWDs reconciling concurrently every 1s and a 1 RPS limit, most will be
70+
// rate-limited almost immediately. Verify at least one surfaces the error with the
71+
// expected condition reason and "Rate limited" message (set by our ResourceExhausted
72+
// handler in worker_controller.go).
73+
eventually(t, 30*time.Second, time.Second, func() error {
74+
for _, twd := range twds {
75+
var fresh temporaliov1alpha1.TemporalWorkerDeployment
76+
if err := k8sClient.Get(ctx, types.NamespacedName{
77+
Name: twd.Name,
78+
Namespace: twd.Namespace,
79+
}, &fresh); err != nil {
80+
return fmt.Errorf("get TWD %s: %w", twd.Name, err)
81+
}
82+
for _, c := range fresh.Status.Conditions {
83+
if c.Type == temporaliov1alpha1.ConditionProgressing &&
84+
c.Status == metav1.ConditionFalse &&
85+
c.Reason == temporaliov1alpha1.ReasonTemporalStateFetchFailed &&
86+
strings.Contains(c.Message, "Rate limited") {
87+
t.Logf("TWD %s confirmed rate-limited: %s", twd.Name, c.Message)
88+
return nil
89+
}
90+
}
91+
}
92+
return errors.New("no TWD has been rate-limited yet")
93+
})
94+
})
95+
}

0 commit comments

Comments
 (0)