
Commit da1016b

chore: Patch in race condition for eviction queue fixes to v0.32.x (#1035)
1 parent: 94e7412

5 files changed: +58 -13 lines
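
For context on what is being patched: Queue previously embedded sets.Set[QueueKey] directly, and that set (a plain Go map underneath) was mutated both by callers of Add() and by the queue's own Reconcile() loop without any synchronization. Go maps are not safe for concurrent writes, so this is a data race. Below is a hypothetical, stripped-down reproduction of that access pattern — not code from this repo — that `go run -race` flags:

package main

import (
    "sync"

    "k8s.io/apimachinery/pkg/util/sets"
)

func main() {
    s := sets.New[string]() // the same generic set type the Queue embeds
    var wg sync.WaitGroup
    wg.Add(2)
    go func() { // mirrors Queue.Add inserting keys
        defer wg.Done()
        for i := 0; i < 10000; i++ {
            s.Insert("pod-key")
        }
    }()
    go func() { // mirrors Reconcile deleting keys after a successful eviction
        defer wg.Done()
        for i := 0; i < 10000; i++ {
            s.Delete("pod-key")
        }
    }()
    wg.Wait() // under -race, the detector reports the concurrent map writes
}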

pkg/controllers/node/termination/machine_test.go (+1, -1)

@@ -568,7 +568,7 @@ var _ = Describe("Machine/Termination", func() {
     ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node))
 
     // Expect that the old pod's key still exists in the queue
-    Expect(queue.Has(terminator.NewQueueKey(pod)))
+    Expect(queue.Has(pod)).To(BeTrue())
 
     // Re-create the pod and node, it should now have the same name, but a different UUID
     node = test.Node(test.NodeOptions{

pkg/controllers/node/termination/nodeclaim_test.go (+1, -1)

@@ -570,7 +570,7 @@ var _ = Describe("NodeClaim/Termination", func() {
     ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node))
 
     // Expect that the old pod's key still exists in the queue
-    Expect(queue.Has(terminator.NewQueueKey(pod)))
+    Expect(queue.Has(pod)).To(BeTrue())
 
     // Re-create the pod and node, it should now have the same name, but a different UUID
     node = test.Node(test.NodeOptions{
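
The two test changes above do double duty: they route through the queue's new Has accessor, and they fix a latent no-op assertion. With Gomega, Expect(...) by itself only constructs an assertion; nothing is evaluated until a matcher is applied, so the old line could never fail. A fragment illustrating the difference (assumes Gomega's dot-import, as in these suites):

Expect(queue.Has(pod))              // builds the assertion but never runs a check
Expect(queue.Has(pod)).To(BeTrue()) // applies a matcher, so the test fails if Has returns false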

pkg/controllers/node/termination/suite_test.go (+4, -3)

@@ -20,6 +20,9 @@ import (
     "testing"
     "time"
 
+    . "github.com/onsi/ginkgo/v2"
+    . "github.com/onsi/gomega"
+
     "github.com/samber/lo"
     clock "k8s.io/utils/clock/testing"
     "sigs.k8s.io/controller-runtime/pkg/client"
@@ -34,8 +37,6 @@ import (
     "github.com/aws/karpenter-core/pkg/operator/scheme"
     "github.com/aws/karpenter-core/pkg/test"
 
-    . "github.com/onsi/ginkgo/v2"
-    . "github.com/onsi/gomega"
     v1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     . "knative.dev/pkg/logging/testing"
@@ -75,7 +76,7 @@ var _ = AfterSuite(func() {
 func ExpectNotEnqueuedForEviction(e *terminator.Queue, pods ...*v1.Pod) {
     GinkgoHelper()
     for _, pod := range pods {
-        Expect(e.Has(terminator.NewQueueKey(pod))).To(BeFalse())
+        Expect(e.Has(pod)).To(BeFalse())
     }
 }

pkg/controllers/node/termination/terminator/eviction.go (+24, -6)

@@ -18,6 +18,7 @@ import (
     "context"
     "errors"
     "fmt"
+    "sync"
     "time"
 
     "github.com/samber/lo"
@@ -74,7 +75,9 @@ func NewQueueKey(pod *v1.Pod) QueueKey {
 
 type Queue struct {
     workqueue.RateLimitingInterface
-    sets.Set[QueueKey]
+
+    mu  sync.Mutex
+    set sets.Set[QueueKey]
 
     kubeClient client.Client
     recorder   events.Recorder
@@ -83,7 +86,7 @@ type Queue struct {
 func NewQueue(kubeClient client.Client, recorder events.Recorder) *Queue {
     queue := &Queue{
         RateLimitingInterface: workqueue.NewRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(evictionQueueBaseDelay, evictionQueueMaxDelay)),
-        Set:                   sets.New[QueueKey](),
+        set:                   sets.New[QueueKey](),
         kubeClient:            kubeClient,
         recorder:              recorder,
     }
@@ -100,15 +103,25 @@ func (q *Queue) Builder(_ context.Context, m manager.Manager) controller.Builder
 
 // Add adds pods to the Queue
 func (q *Queue) Add(pods ...*v1.Pod) {
+    q.mu.Lock()
+    defer q.mu.Unlock()
+
     for _, pod := range pods {
         qk := NewQueueKey(pod)
-        if !q.Set.Has(qk) {
-            q.Set.Insert(qk)
+        if !q.set.Has(qk) {
+            q.set.Insert(qk)
             q.RateLimitingInterface.Add(qk)
         }
     }
 }
 
+func (q *Queue) Has(pod *v1.Pod) bool {
+    q.mu.Lock()
+    defer q.mu.Unlock()
+
+    return q.set.Has(NewQueueKey(pod))
+}
+
 func (q *Queue) Reconcile(ctx context.Context, _ reconcile.Request) (reconcile.Result, error) {
     // Check if the queue is empty. client-go recommends not using this function to gate the subsequent
     // get call, but since we're popping items off the queue synchronously, there should be no synchonization
@@ -126,7 +139,9 @@ func (q *Queue) Reconcile(ctx context.Context, _ reconcile.Request) (reconcile.R
     // Evict pod
     if q.Evict(ctx, qk) {
         q.RateLimitingInterface.Forget(qk)
-        q.Set.Delete(qk)
+        q.mu.Lock()
+        q.set.Delete(qk)
+        q.mu.Unlock()
         return reconcile.Result{RequeueAfter: controller.Immediately}, nil
     }
     // Requeue pod if eviction failed
@@ -170,6 +185,9 @@ func (q *Queue) Evict(ctx context.Context, key QueueKey) bool {
 }
 
 func (q *Queue) Reset() {
+    q.mu.Lock()
+    defer q.mu.Unlock()
+
     q.RateLimitingInterface = workqueue.NewRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(evictionQueueBaseDelay, evictionQueueMaxDelay))
-    q.Set = sets.New[QueueKey]()
+    q.set = sets.New[QueueKey]()
 }
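
The struct change here does more than add a lock: by replacing the embedded sets.Set[QueueKey] with an unexported set field, the patch also stops Queue from re-exporting the set's unsynchronized methods, so every access is forced through the mutex-holding Add, Has, Reconcile, and Reset paths. A minimal sketch of the resulting pattern, using simplified hypothetical names (dedupeQueue is not a type in this repo):

package terminator

import (
    "sync"

    "k8s.io/apimachinery/pkg/util/sets"
)

// dedupeQueue isolates the locking pattern the patch applies to Queue:
// the set is unexported, so callers cannot reach the map without the lock.
type dedupeQueue struct {
    mu  sync.Mutex       // guards set
    set sets.Set[string]
}

func newDedupeQueue() *dedupeQueue {
    return &dedupeQueue{set: sets.New[string]()}
}

// Add inserts keys under the lock, mirroring Queue.Add.
func (q *dedupeQueue) Add(keys ...string) {
    q.mu.Lock()
    defer q.mu.Unlock()
    for _, k := range keys {
        q.set.Insert(k)
    }
}

// Has replaces direct access to the formerly embedded set, mirroring Queue.Has.
func (q *dedupeQueue) Has(key string) bool {
    q.mu.Lock()
    defer q.mu.Unlock()
    return q.set.Has(key)
}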

pkg/controllers/node/termination/terminator/suite_test.go (+28, -2)

@@ -16,20 +16,20 @@ package terminator_test
 
 import (
     "context"
+    "sync"
     "testing"
 
     . "github.com/onsi/ginkgo/v2"
     . "github.com/onsi/gomega"
     "github.com/samber/lo"
+    v1 "k8s.io/api/core/v1"
     policyv1 "k8s.io/api/policy/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/util/intstr"
     "k8s.io/apimachinery/pkg/util/uuid"
     . "knative.dev/pkg/logging/testing"
     "sigs.k8s.io/controller-runtime/pkg/client"
 
-    v1 "k8s.io/api/core/v1"
-
     "github.com/aws/karpenter-core/pkg/controllers/node/termination/terminator"
 
     "github.com/aws/karpenter-core/pkg/apis"
@@ -125,5 +125,31 @@ var _ = Describe("Eviction/Queue", func() {
         ExpectApplied(ctx, env.Client, pdb, pdb2, pod)
         Expect(queue.Evict(ctx, terminator.NewQueueKey(pod))).To(BeFalse())
     })
+    It("should ensure that calling Evict() is valid while making Add() calls", func() {
+        cancelCtx, cancel := context.WithCancel(ctx)
+        wg := sync.WaitGroup{}
+        DeferCleanup(func() {
+            cancel()
+            wg.Wait() // Ensure that we wait for reconcile loop to finish so that we don't get a RACE
+        })
+
+        // Keep calling Reconcile() for the entirety of this test
+        wg.Add(1)
+        go func() {
+            defer wg.Done()
+
+            for {
+                ExpectReconcileSucceeded(ctx, queue, client.ObjectKey{})
+                if cancelCtx.Err() != nil {
+                    return
+                }
+            }
+        }()
+
+        // Ensure that we add enough pods to the queue while we are pulling items off of the queue (enough to trigger a DATA RACE)
+        for i := 0; i < 10000; i++ {
+            queue.Add(test.Pod())
+        }
+    })
 })
 })
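
One practical note on the new test: it only proves anything when the race detector is enabled, since the Add loop and the background Reconcile goroutine are each individually valid Go. Run under `go test -race ./...`, it fails against the pre-patch Queue (where Reconcile deleted from the set while Add inserted into it) and passes once the mutex is in place.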
