Skip to content

Commit 5ef0359

Browse files
authored
feat: use priority queue for node drainer worker pipeline (#1341)
Signed-off-by: Ajay Mishra <ajmishra@nvidia.com>
1 parent 7643422 commit 5ef0359

8 files changed

Lines changed: 350 additions & 9 deletions

File tree

docs/designs/040-node-drainer-priority-queue.md renamed to docs/designs/041-node-drainer-priority-queue.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# ADR-040: Node Drainer — Priority Queue
1+
# ADR-041: Node Drainer — Priority Queue
22

33
## Context
44

node-drainer/pkg/metrics/metrics.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,15 @@ var (
119119
},
120120
)
121121

122+
// QueueItemsAssigned tracks priority decisions made when events enter the ready queue
123+
QueueItemsAssigned = promauto.NewCounterVec(
124+
prometheus.CounterOpts{
125+
Name: "node_drainer_queue_items_assigned_total",
126+
Help: "Total number of ready queue items assigned by priority and reason.",
127+
},
128+
[]string{"priority", "reason"},
129+
)
130+
122131
// CustomDrainCRDNotFound tracks failures when custom drain CRD is not found
123132
CustomDrainCRDNotFound = promauto.NewCounterVec(
124133
prometheus.CounterOpts{
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
// Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package queue
16+
17+
import (
18+
"log/slog"
19+
"sync"
20+
21+
"k8s.io/client-go/util/workqueue"
22+
23+
"github.com/nvidia/nvsentinel/node-drainer/pkg/metrics"
24+
)
25+
26+
type queuePriority string
27+
type nodePriorityStateKind int
28+
29+
const (
30+
queuePriorityHigh queuePriority = "high"
31+
queuePriorityLow queuePriority = "low"
32+
33+
priorityReasonNodeNotYetDraining = "node_not_yet_draining"
34+
priorityReasonNodeAlreadyDraining = "node_already_draining"
35+
priorityReasonNodeHighPriorityQueued = "node_high_priority_queued"
36+
)
37+
38+
const (
39+
nodePriorityStateNone nodePriorityStateKind = iota
40+
nodePriorityStateDraining
41+
nodePriorityStateRepresented
42+
)
43+
44+
// nodePriorityState tracks the node-level state needed to classify new ready
45+
// queue items without changing the event lifecycle itself.
46+
type nodePriorityState struct {
47+
mu sync.Mutex
48+
49+
nodes map[string]nodePriorityStateEntry
50+
}
51+
52+
type nodePriorityStateEntry struct {
53+
kind nodePriorityStateKind
54+
representativeKey string // only meaningful when kind == nodePriorityStateRepresented
55+
}
56+
57+
// nodeEventPriorityQueue is the ready queue used under Kubernetes' rate
58+
// limiting workqueue. It preserves FIFO order within each priority lane.
59+
type nodeEventPriorityQueue struct {
60+
state *nodePriorityState
61+
high []NodeEvent
62+
low []NodeEvent
63+
}
64+
65+
var _ workqueue.Queue[NodeEvent] = (*nodeEventPriorityQueue)(nil)
66+
67+
func newNodePriorityState() *nodePriorityState {
68+
return &nodePriorityState{
69+
nodes: make(map[string]nodePriorityStateEntry),
70+
}
71+
}
72+
73+
func newNodeEventPriorityQueue(state *nodePriorityState) *nodeEventPriorityQueue {
74+
return &nodeEventPriorityQueue{
75+
state: state,
76+
}
77+
}
78+
79+
func (q *nodeEventPriorityQueue) Touch(NodeEvent) {}
80+
81+
// Push assigns priority when an item becomes ready. Only the first queued item
82+
// for a not-yet-draining node gets the high-priority lane.
83+
func (q *nodeEventPriorityQueue) Push(item NodeEvent) {
84+
priority, reason := q.state.classifyForEnqueue(item)
85+
if priority == queuePriorityHigh {
86+
q.high = append(q.high, item)
87+
} else {
88+
q.low = append(q.low, item)
89+
}
90+
91+
metrics.QueueItemsAssigned.WithLabelValues(string(priority), reason).Inc()
92+
slog.Debug("Assigned node-drainer queue priority",
93+
"node", item.NodeName,
94+
"eventID", item.EventID,
95+
"priority", priority,
96+
"reason", reason)
97+
}
98+
99+
func (q *nodeEventPriorityQueue) Len() int {
100+
return len(q.high) + len(q.low)
101+
}
102+
103+
// Pop always drains high-priority representatives before duplicate or
104+
// already-draining work in the low-priority lane.
105+
func (q *nodeEventPriorityQueue) Pop() NodeEvent {
106+
if len(q.high) > 0 {
107+
return popNodeEvent(&q.high)
108+
}
109+
110+
return popNodeEvent(&q.low)
111+
}
112+
113+
// classifyForEnqueue decides whether an item can still improve the time to
114+
// first draining transition for its node.
115+
func (s *nodePriorityState) classifyForEnqueue(item NodeEvent) (queuePriority, string) {
116+
s.mu.Lock()
117+
defer s.mu.Unlock()
118+
119+
entry := s.nodes[item.NodeName]
120+
if entry.kind == nodePriorityStateDraining {
121+
return queuePriorityLow, priorityReasonNodeAlreadyDraining
122+
}
123+
124+
if entry.kind == nodePriorityStateRepresented {
125+
return queuePriorityLow, priorityReasonNodeHighPriorityQueued
126+
}
127+
128+
s.nodes[item.NodeName] = nodePriorityStateEntry{
129+
kind: nodePriorityStateRepresented,
130+
representativeKey: representativeKey(item),
131+
}
132+
133+
return queuePriorityHigh, priorityReasonNodeNotYetDraining
134+
}
135+
136+
// releaseRepresentative allows a node to receive another high-priority item
137+
// after its current high-priority representative has left the ready queue.
138+
func (s *nodePriorityState) releaseRepresentative(item NodeEvent) {
139+
s.mu.Lock()
140+
defer s.mu.Unlock()
141+
142+
entry := s.nodes[item.NodeName]
143+
if entry.kind != nodePriorityStateRepresented || entry.representativeKey != representativeKey(item) {
144+
return
145+
}
146+
147+
delete(s.nodes, item.NodeName)
148+
}
149+
150+
// markNodeDraining moves future work for this node to low priority until the
151+
// draining label is removed or replaced by a terminal state.
152+
func (s *nodePriorityState) markNodeDraining(nodeName string) {
153+
s.mu.Lock()
154+
defer s.mu.Unlock()
155+
156+
s.nodes[nodeName] = nodePriorityStateEntry{kind: nodePriorityStateDraining}
157+
}
158+
159+
// clearNodeDraining lets the node receive high-priority work again after it
160+
// leaves the draining state.
161+
func (s *nodePriorityState) clearNodeDraining(nodeName string) {
162+
s.mu.Lock()
163+
defer s.mu.Unlock()
164+
165+
if s.nodes[nodeName].kind == nodePriorityStateDraining {
166+
delete(s.nodes, nodeName)
167+
}
168+
}
169+
170+
func popNodeEvent(items *[]NodeEvent) NodeEvent {
171+
item := (*items)[0]
172+
(*items)[0] = NodeEvent{}
173+
*items = (*items)[1:]
174+
175+
return item
176+
}
177+
178+
func representativeKey(item NodeEvent) string {
179+
return item.NodeName + ":" + item.EventID
180+
}

node-drainer/pkg/queue/queue.go

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,23 @@ import (
2929
)
3030

3131
func NewEventQueueManager() EventQueueManager {
32+
priorityState := newNodePriorityState()
33+
baseQueue := workqueue.NewTypedWithConfig(workqueue.TypedQueueConfig[NodeEvent]{
34+
Queue: newNodeEventPriorityQueue(priorityState),
35+
})
36+
delayingQueue := workqueue.NewTypedDelayingQueueWithConfig(workqueue.TypedDelayingQueueConfig[NodeEvent]{
37+
Queue: baseQueue,
38+
})
39+
3240
mgr := &eventQueueManager{
33-
queue: workqueue.NewTypedRateLimitingQueue(
41+
queue: workqueue.NewTypedRateLimitingQueueWithConfig(
3442
workqueue.NewTypedItemExponentialFailureRateLimiter[NodeEvent](10*time.Second, 2*time.Minute),
43+
workqueue.TypedRateLimitingQueueConfig[NodeEvent]{
44+
DelayingQueue: delayingQueue,
45+
},
3546
),
36-
shutdown: make(chan struct{}),
47+
shutdown: make(chan struct{}),
48+
priorityState: priorityState,
3749
}
3850

3951
return mgr
@@ -45,6 +57,14 @@ func (m *eventQueueManager) SetDataStoreEventProcessor(processor DataStoreEventP
4557
m.dataStoreEventProcessor = processor
4658
}
4759

60+
func (m *eventQueueManager) MarkNodeDraining(nodeName string) {
61+
m.priorityState.markNodeDraining(nodeName)
62+
}
63+
64+
func (m *eventQueueManager) ClearNodeDraining(nodeName string) {
65+
m.priorityState.clearNodeDraining(nodeName)
66+
}
67+
4868
// EnqueueEventGeneric enqueues an event using the new database-agnostic interface.
4969
// Only the document ID is stored in the queue; the full event is fetched from the
5070
// database lazily when the worker processes the item, keeping queue memory minimal.

node-drainer/pkg/queue/queue_test.go

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,122 @@ func TestWorkqueueDeduplication_DifferentNodes(t *testing.T) {
343343
queueImpl.queue.Done(item2)
344344
}
345345

346+
func TestPriorityQueue_GroupedFloodPrioritizesUnrepresentedNodes(t *testing.T) {
347+
mgr := NewEventQueueManager()
348+
defer mgr.Shutdown()
349+
350+
ctx := context.Background()
351+
mockDB := &mockDataStore{}
352+
mockHealthEventStore := &MockHealthEventStore{}
353+
354+
node1Event1 := datastore.Event{
355+
"_id": "507f1f77bcf86cd799439011",
356+
"nodeName": "node-1",
357+
}
358+
node1Event2 := datastore.Event{
359+
"_id": "507f1f77bcf86cd799439012",
360+
"nodeName": "node-1",
361+
}
362+
node2Event1 := datastore.Event{
363+
"_id": "507f1f77bcf86cd799439013",
364+
"nodeName": "node-2",
365+
}
366+
367+
require.NoError(t, mgr.EnqueueEventGeneric(ctx, "node-1", node1Event1, mockDB, mockHealthEventStore, node1Event1["_id"]))
368+
require.NoError(t, mgr.EnqueueEventGeneric(ctx, "node-1", node1Event2, mockDB, mockHealthEventStore, node1Event2["_id"]))
369+
require.NoError(t, mgr.EnqueueEventGeneric(ctx, "node-2", node2Event1, mockDB, mockHealthEventStore, node2Event1["_id"]))
370+
371+
queueImpl := mgr.(*eventQueueManager)
372+
373+
item1, shutdown := queueImpl.queue.Get()
374+
require.False(t, shutdown)
375+
assert.Equal(t, "507f1f77bcf86cd799439011", item1.EventID)
376+
377+
item2, shutdown := queueImpl.queue.Get()
378+
require.False(t, shutdown)
379+
assert.Equal(t, "507f1f77bcf86cd799439013", item2.EventID, "unrepresented node gets the high-priority lane")
380+
381+
item3, shutdown := queueImpl.queue.Get()
382+
require.False(t, shutdown)
383+
assert.Equal(t, "507f1f77bcf86cd799439012", item3.EventID, "duplicate work for represented node remains low priority")
384+
385+
queueImpl.queue.Done(item1)
386+
queueImpl.queue.Done(item2)
387+
queueImpl.queue.Done(item3)
388+
}
389+
390+
func TestPriorityQueue_DrainingNodesStayLowPriorityUntilCleared(t *testing.T) {
391+
mgr := NewEventQueueManager()
392+
defer mgr.Shutdown()
393+
394+
ctx := context.Background()
395+
mockDB := &mockDataStore{}
396+
mockHealthEventStore := &MockHealthEventStore{}
397+
398+
queueImpl := mgr.(*eventQueueManager)
399+
queueImpl.MarkNodeDraining("node-1")
400+
401+
node1DrainingEvent := datastore.Event{
402+
"_id": "507f1f77bcf86cd799439021",
403+
"nodeName": "node-1",
404+
}
405+
node2Event := datastore.Event{
406+
"_id": "507f1f77bcf86cd799439022",
407+
"nodeName": "node-2",
408+
}
409+
node1AfterClearEvent := datastore.Event{
410+
"_id": "507f1f77bcf86cd799439023",
411+
"nodeName": "node-1",
412+
}
413+
414+
require.NoError(t, mgr.EnqueueEventGeneric(ctx, "node-1", node1DrainingEvent, mockDB, mockHealthEventStore, node1DrainingEvent["_id"]))
415+
require.NoError(t, mgr.EnqueueEventGeneric(ctx, "node-2", node2Event, mockDB, mockHealthEventStore, node2Event["_id"]))
416+
417+
item1, shutdown := queueImpl.queue.Get()
418+
require.False(t, shutdown)
419+
assert.Equal(t, "507f1f77bcf86cd799439022", item1.EventID, "not-yet-draining node should run before draining duplicate work")
420+
421+
queueImpl.ClearNodeDraining("node-1")
422+
require.NoError(t, mgr.EnqueueEventGeneric(ctx, "node-1", node1AfterClearEvent, mockDB, mockHealthEventStore, node1AfterClearEvent["_id"]))
423+
424+
item2, shutdown := queueImpl.queue.Get()
425+
require.False(t, shutdown)
426+
assert.Equal(t, "507f1f77bcf86cd799439023", item2.EventID, "cleared node can receive high priority again")
427+
428+
item3, shutdown := queueImpl.queue.Get()
429+
require.False(t, shutdown)
430+
assert.Equal(t, "507f1f77bcf86cd799439021", item3.EventID)
431+
432+
queueImpl.queue.Done(item1)
433+
queueImpl.queue.Done(item2)
434+
queueImpl.queue.Done(item3)
435+
}
436+
437+
func TestPriorityQueue_NonComparableDocumentID_DoesNotPanic(t *testing.T) {
438+
state := newNodePriorityState()
439+
queueImpl := newNodeEventPriorityQueue(state)
440+
item := NodeEvent{
441+
NodeName: "node-1",
442+
EventID: "event-1",
443+
DocumentID: map[string]string{"id": "non-comparable"},
444+
}
445+
446+
require.NotPanics(t, func() {
447+
queueImpl.Push(item)
448+
entry := state.nodes[item.NodeName]
449+
require.Equal(t, nodePriorityStateRepresented, entry.kind)
450+
require.Equal(t, representativeKey(item), entry.representativeKey)
451+
452+
got := queueImpl.Pop()
453+
assert.Equal(t, item.NodeName, got.NodeName)
454+
assert.Equal(t, item.EventID, got.EventID)
455+
456+
state.releaseRepresentative(got)
457+
_, represented := state.nodes[item.NodeName]
458+
assert.False(t, represented)
459+
})
460+
}
461+
346462
// Mock DataStore for testing
347463
type mockDataStore struct{}
348464

node-drainer/pkg/queue/types.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ type EventQueueManager interface {
5656

5757
Start(ctx context.Context)
5858
Shutdown()
59+
MarkNodeDraining(nodeName string)
60+
ClearNodeDraining(nodeName string)
5961

6062
// New database-agnostic method
6163
SetDataStoreEventProcessor(processor DataStoreEventProcessor)
@@ -68,4 +70,5 @@ type eventQueueManager struct {
6870
dataStoreEventProcessor DataStoreEventProcessor // New database-agnostic processor
6971
shutdown chan struct{}
7072
sessions sync.Map // EventID -> *DrainSession
73+
priorityState *nodePriorityState
7174
}

node-drainer/pkg/queue/worker.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,10 @@ func (m *eventQueueManager) processNextWorkItem(ctx context.Context) bool {
7373
return false
7474
}
7575

76-
defer m.queue.Done(nodeEvent)
76+
defer func() {
77+
m.priorityState.releaseRepresentative(nodeEvent)
78+
m.queue.Done(nodeEvent)
79+
}()
7780

7881
if nodeEvent.Database == nil || nodeEvent.HealthEventStore == nil {
7982
slog.ErrorContext(ctx, "NodeEvent missing database or health event store, dropping",

0 commit comments

Comments
 (0)