Skip to content

Commit 6d8f1ff

Browse files
committed
add read lock to process item for concurrent workers, protect default queue
1 parent 7f70991 commit 6d8f1ff

File tree

2 files changed

+20
-9
lines changed

2 files changed

+20
-9
lines changed

pkg/dynamiccontroller/dynamic_controller.go

+18-7
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,6 @@ func NewDynamicController(
175175
// pass version and pod id from env
176176
}
177177

178-
dc.ensureWeightedQueue(defaultQueueWeight)
179178
dc.setGVRWeight(schema.GroupVersionResource{}, defaultQueueWeight)
180179

181180
return dc
@@ -198,6 +197,8 @@ func (dc *DynamicController) getGVRWeight(gvr schema.GroupVersionResource) int {
198197
// setGVRWeight sets the weight for a given GroupVersionResource (GVR) and updates the
199198
// corresponding weighted queue.
200199
func (dc *DynamicController) setGVRWeight(gvr schema.GroupVersionResource, weight int) {
200+
dc.ensureWeightedQueue(weight)
201+
201202
dc.mu.Lock()
202203
defer dc.mu.Unlock()
203204

@@ -215,6 +216,11 @@ func (dc *DynamicController) deleteGVRWeight(gvr schema.GroupVersionResource) {
215216
weight := dc.gvrWeights[gvr]
216217
wq := dc.weightedQueues[weight]
217218

219+
if weight == defaultQueueWeight {
220+
dc.log.Error(nil, "cannot delete default queue", "weight", weight)
221+
return
222+
}
223+
218224
delete(dc.gvrWeights, gvr)
219225
delete(wq.gvrSet, gvr)
220226
if len(wq.gvrSet) < 1 {
@@ -295,9 +301,9 @@ func (dc *DynamicController) WaitForInformersSync(stopCh <-chan struct{}) bool {
295301
// shutdownQueues shuts down all the weighted queues managed by the DynamicController.
296302
func (dc *DynamicController) shutdownQueues() {
297303
dc.log.Info("Shutting down dynamic controller queues")
298-
for key := range dc.weightedQueues {
304+
for key, wq := range dc.weightedQueues {
299305
dc.log.Info("Shutting down weighted queue", "key", key)
300-
dc.weightedQueues[key].queue.ShutDown()
306+
wq.queue.ShutDown()
301307
}
302308
}
303309

@@ -335,13 +341,18 @@ func (dc *DynamicController) worker(ctx context.Context) {
335341
// A queue with a weight of 200 will be selected twice as often assuming an even
336342
// number of events are distributed between the queues
337343
func (dc *DynamicController) selectQueueByWeight() *WeightedQueue {
344+
startTime := time.Now()
345+
338346
var (
339347
totalWeight int = 0
340348
maxWeight int = 0
341349
selectedQueue *WeightedQueue
342350
activeQueues = make([]*WeightedQueue, 0, len(dc.weightedQueues))
343351
)
344352

353+
dc.mu.RLock()
354+
defer dc.mu.RUnlock()
355+
345356
for _, wq := range dc.weightedQueues {
346357
if wq.queue.Len() > 0 {
347358
totalWeight += wq.weight
@@ -365,6 +376,9 @@ func (dc *DynamicController) selectQueueByWeight() *WeightedQueue {
365376
}
366377
}
367378

379+
duration := time.Since(startTime)
380+
dc.log.V(1).Info("Time to select queue", "duration", duration.String(), "weight", selectedQueue.weight)
381+
368382
return selectedQueue
369383
}
370384

@@ -374,7 +388,7 @@ func (dc *DynamicController) processNextWorkItem(ctx context.Context) bool {
374388

375389
obj, shutdown := weightedQueue.queue.Get()
376390
if shutdown {
377-
return false
391+
return true
378392
}
379393
defer weightedQueue.queue.Done(obj)
380394

@@ -570,9 +584,6 @@ func (dc *DynamicController) StartServingGVK(ctx context.Context, gvr schema.Gro
570584
dc.log.V(1).Info("Registering new GVK", "gvr", gvr)
571585

572586
// Set the weight for the GVR and ensure the queue exists for that weight class
573-
if ok := dc.ensureWeightedQueue(queueWeight); !ok {
574-
return fmt.Errorf("failed to create or get weighted queue with weight: %d", queueWeight)
575-
}
576587
dc.setGVRWeight(gvr, queueWeight)
577588

578589
_, exists := dc.informers.Load(gvr)

pkg/metadata/labels.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -115,8 +115,8 @@ func (gl GenericLabeler) Copy() map[string]string {
115115
// ResourceGraphDefinitionLabel and ResourceGraphDefinitionIDLabel labels on a resource.
116116
func NewResourceGraphDefinitionLabeler(rgMeta metav1.Object) GenericLabeler {
117117
return map[string]string{
118-
ResourceGraphDefinitionIDLabel: string(rgMeta.GetUID()),
119-
ResourceGraphDefinitionNameLabel: rgMeta.GetName(),
118+
ResourceGraphDefinitionIDLabel: string(rgMeta.GetUID()),
119+
ResourceGraphDefinitionNameLabel: rgMeta.GetName(),
120120
}
121121
}
122122

0 commit comments

Comments
 (0)