Skip to content

Commit 39dccd1

Browse files
jshmchenximanirajv06
authored andcommitted
[YUNIKORN-3269] Respect Kubernetes node cordon state (#1020)
Closes: #1020 Signed-off-by: Manikandan R <manirajv06@gmail.com>
1 parent d993fa9 commit 39dccd1

2 files changed

Lines changed: 176 additions & 4 deletions

File tree

pkg/cache/context.go

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -219,9 +219,11 @@ func (ctx *Context) updateNodeInternal(node *v1.Node, register bool) {
219219
}
220220
}
221221

222-
// if node was registered in-line, enable it in the core
223-
if err := ctx.enableNode(node); err != nil {
224-
log.Log(log.ShimContext).Warn("Failed to enable node", zap.Error(err))
222+
// if node is not cordoned, enable it in the core
223+
if !node.Spec.Unschedulable {
224+
if err := ctx.enableNode(node); err != nil {
225+
log.Log(log.ShimContext).Warn("Failed to enable node", zap.Error(err))
226+
}
225227
}
226228
} else {
227229
// existing node
@@ -236,9 +238,24 @@ func (ctx *Context) updateNodeInternal(node *v1.Node, register bool) {
236238
log.Log(log.ShimContext).Warn("Failed to update cached node capacity", zap.String("nodeName", node.Name))
237239
}
238240
}
241+
if err := ctx.updateNodeSchedulability(prevNode, node); err != nil {
242+
log.Log(log.ShimContext).Warn("Failed to update node schedulability", zap.Error(err))
243+
}
239244
}
240245
}
241246

247+
func (ctx *Context) updateNodeSchedulability(prevNode, node *v1.Node) error {
248+
if prevNode.Spec.Unschedulable == node.Spec.Unschedulable {
249+
return nil
250+
}
251+
action := si.NodeInfo_DRAIN_TO_SCHEDULABLE
252+
if node.Spec.Unschedulable {
253+
action = si.NodeInfo_DRAIN_NODE
254+
}
255+
request := common.CreateUpdateRequestForDeleteOrRestoreNode(node.Name, action)
256+
return ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateNode(request)
257+
}
258+
242259
func (ctx *Context) deleteNode(obj interface{}) {
243260
ctx.lock.Lock()
244261
defer ctx.lock.Unlock()
@@ -1405,7 +1422,7 @@ func (ctx *Context) InitializeState() error {
14051422

14061423
// Step 4: Enable nodes. At this point all allocations and asks have been processed, so it is safe to allow the
14071424
// core to begin scheduling.
1408-
err = ctx.enableNodes(acceptedNodes)
1425+
err = ctx.enableNodes(filterSchedulableNodes(acceptedNodes))
14091426
if err != nil {
14101427
log.Log(log.ShimContext).Error("failed to enable nodes", zap.Error(err))
14111428
return err
@@ -1646,6 +1663,16 @@ func (ctx *Context) enableNodes(nodes []*v1.Node) error {
16461663
return nil
16471664
}
16481665

1666+
func filterSchedulableNodes(nodes []*v1.Node) []*v1.Node {
1667+
schedulableNodes := make([]*v1.Node, 0, len(nodes))
1668+
for _, node := range nodes {
1669+
if node != nil && !node.Spec.Unschedulable {
1670+
schedulableNodes = append(schedulableNodes, node)
1671+
}
1672+
}
1673+
return schedulableNodes
1674+
}
1675+
16491676
func (ctx *Context) finalizeNodes(existingNodes []*v1.Node) error {
16501677
// list all nodes via the informer
16511678
nodes, err := ctx.apiProvider.GetAPIs().NodeInformer.Lister().List(labels.Everything())

pkg/cache/context_test.go

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,110 @@ func TestUpdateNodes(t *testing.T) {
212212
ctx.updateNode(&oldNode, &newNode)
213213
}
214214

215+
func TestAddCordonedNodeDoesNotEnable(t *testing.T) {
216+
ctx, apiProvider := initContextAndAPIProviderForTest()
217+
dispatcher.Start()
218+
defer dispatcher.UnregisterAllEventHandlers()
219+
defer dispatcher.Stop()
220+
221+
actions := make([]si.NodeInfo_ActionFromRM, 0)
222+
apiProvider.MockSchedulerAPIUpdateNodeFn(func(request *si.NodeRequest) error {
223+
for _, node := range request.Nodes {
224+
actions = append(actions, node.Action)
225+
if node.Action == si.NodeInfo_CREATE_DRAIN {
226+
dispatcher.Dispatch(CachedSchedulerNodeEvent{
227+
NodeID: node.NodeID,
228+
Event: NodeAccepted,
229+
})
230+
}
231+
}
232+
return nil
233+
})
234+
235+
node := v1.Node{
236+
ObjectMeta: apis.ObjectMeta{
237+
Name: Host1,
238+
Namespace: "default",
239+
UID: uid1,
240+
},
241+
Spec: v1.NodeSpec{
242+
Unschedulable: true,
243+
},
244+
}
245+
246+
ctx.addNode(&node)
247+
248+
assert.Equal(t, len(actions), 1)
249+
assert.Equal(t, actions[0], si.NodeInfo_CREATE_DRAIN)
250+
}
251+
252+
func TestUpdateNodeSchedulability(t *testing.T) {
253+
ctx, apiProvider := initContextAndAPIProviderForTest()
254+
dispatcher.Start()
255+
defer dispatcher.UnregisterAllEventHandlers()
256+
defer dispatcher.Stop()
257+
258+
actions := make([]si.NodeInfo_ActionFromRM, 0)
259+
apiProvider.MockSchedulerAPIUpdateNodeFn(func(request *si.NodeRequest) error {
260+
for _, node := range request.Nodes {
261+
actions = append(actions, node.Action)
262+
if node.Action == si.NodeInfo_CREATE_DRAIN {
263+
dispatcher.Dispatch(CachedSchedulerNodeEvent{
264+
NodeID: node.NodeID,
265+
Event: NodeAccepted,
266+
})
267+
}
268+
}
269+
return nil
270+
})
271+
272+
node := v1.Node{
273+
ObjectMeta: apis.ObjectMeta{
274+
Name: Host1,
275+
Namespace: "default",
276+
UID: uid1,
277+
},
278+
}
279+
280+
ctx.addNode(&node)
281+
assert.Equal(t, len(actions), 2)
282+
assert.Equal(t, actions[0], si.NodeInfo_CREATE_DRAIN)
283+
assert.Equal(t, actions[1], si.NodeInfo_DRAIN_TO_SCHEDULABLE)
284+
285+
cordoned := node.DeepCopy()
286+
cordoned.Spec.Unschedulable = true
287+
ctx.updateNode(&node, cordoned)
288+
assert.Equal(t, len(actions), 3)
289+
assert.Equal(t, actions[2], si.NodeInfo_DRAIN_NODE)
290+
291+
cordonedCopy := cordoned.DeepCopy()
292+
ctx.updateNode(cordoned, cordonedCopy)
293+
assert.Equal(t, len(actions), 3)
294+
295+
uncordoned := cordoned.DeepCopy()
296+
uncordoned.Spec.Unschedulable = false
297+
ctx.updateNode(cordoned, uncordoned)
298+
assert.Equal(t, len(actions), 4)
299+
assert.Equal(t, actions[3], si.NodeInfo_DRAIN_TO_SCHEDULABLE)
300+
}
301+
302+
func TestFilterSchedulableNodes(t *testing.T) {
303+
node1 := &v1.Node{
304+
ObjectMeta: apis.ObjectMeta{Name: Host1},
305+
}
306+
node2 := &v1.Node{
307+
ObjectMeta: apis.ObjectMeta{Name: Host2},
308+
Spec: v1.NodeSpec{
309+
Unschedulable: true,
310+
},
311+
}
312+
313+
nodes := filterSchedulableNodes([]*v1.Node{node1, nil, node2})
314+
315+
assert.Equal(t, len(nodes), 1)
316+
assert.Equal(t, nodes[0].Name, Host1)
317+
}
318+
215319
func TestDeleteNodes(t *testing.T) {
216320
ctx, apiProvider := initContextAndAPIProviderForTest()
217321
dispatcher.Start()
@@ -2000,6 +2104,47 @@ func TestInitializeState(t *testing.T) {
20002104
assert.Assert(t, task3 == nil, "pod3 was found")
20012105
}
20022106

2107+
func TestInitializeStateDoesNotEnableCordonedNodes(t *testing.T) {
2108+
context, apiProvider := initContextAndAPIProviderForTest()
2109+
apiProvider.RunEventHandler()
2110+
nodeLister, ok := apiProvider.GetAPIs().NodeInformer.Lister().(*test.NodeListerMock)
2111+
assert.Assert(t, ok, "unable to get mock node lister")
2112+
2113+
dispatcher.Start()
2114+
defer dispatcher.UnregisterAllEventHandlers()
2115+
defer dispatcher.Stop()
2116+
2117+
actionsByNode := make(map[string][]si.NodeInfo_ActionFromRM)
2118+
apiProvider.MockSchedulerAPIUpdateNodeFn(func(request *si.NodeRequest) error {
2119+
for _, node := range request.Nodes {
2120+
actionsByNode[node.NodeID] = append(actionsByNode[node.NodeID], node.Action)
2121+
if node.Action == si.NodeInfo_CREATE_DRAIN {
2122+
dispatcher.Dispatch(CachedSchedulerNodeEvent{
2123+
NodeID: node.NodeID,
2124+
Event: NodeAccepted,
2125+
})
2126+
}
2127+
}
2128+
return nil
2129+
})
2130+
2131+
schedulableNode := nodeForTest(nodeName1, "10G", "4")
2132+
cordonedNode := nodeForTest(nodeName2, "10G", "4")
2133+
cordonedNode.Spec.Unschedulable = true
2134+
nodeLister.AddNode(schedulableNode)
2135+
nodeLister.AddNode(cordonedNode)
2136+
2137+
err := context.InitializeState()
2138+
assert.NilError(t, err, "InitializeState failed")
2139+
2140+
assert.Assert(t, context.schedulerCache.GetNode(nodeName1) != nil, "schedulable node was not cached")
2141+
assert.Assert(t, context.schedulerCache.GetNode(nodeName2) != nil, "cordoned node was not cached")
2142+
2143+
cordonedActions := actionsByNode[nodeName2]
2144+
assert.Equal(t, len(cordonedActions), 1)
2145+
assert.Equal(t, cordonedActions[0], si.NodeInfo_CREATE_DRAIN)
2146+
}
2147+
20032148
func TestPodAdoption(t *testing.T) {
20042149
ctx, apiProvider := initContextAndAPIProviderForTest()
20052150
dispatcher.Start()

0 commit comments

Comments
 (0)