Skip to content

Commit e580300

Browse files
TAS: Filter out Node updates with LastHeartbeatTime changes (#6636)
Signed-off-by: utam0k <k0ma@utam0k.jp> Co-authored-by: utam0k <k0ma@utam0k.jp>
1 parent 7360f11 commit e580300

File tree

3 files changed

+311
-0
lines changed

3 files changed

+311
-0
lines changed

pkg/controller/tas/resource_flavor.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ import (
2121

2222
"github.com/go-logr/logr"
2323
corev1 "k8s.io/api/core/v1"
24+
"k8s.io/apimachinery/pkg/api/equality"
25+
"k8s.io/apimachinery/pkg/api/resource"
26+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27+
"k8s.io/apimachinery/pkg/conversion"
2428
"k8s.io/apimachinery/pkg/types"
2529
"k8s.io/client-go/tools/record"
2630
"k8s.io/client-go/util/workqueue"
@@ -43,6 +47,24 @@ import (
4347
"sigs.k8s.io/kueue/pkg/queue"
4448
)
4549

50+
var nodeSemantic = conversion.EqualitiesOrDie(
51+
nodeConditionEqual,
52+
// Handle metav1.Time comparison to avoid panic on unexported fields
53+
func(a, b metav1.Time) bool {
54+
return a.Equal(&b)
55+
},
56+
// Handle resource.Quantity comparison to avoid panic on unexported fields
57+
func(a, b resource.Quantity) bool {
58+
return a.Equal(b)
59+
},
60+
)
61+
62+
func nodeConditionEqual(a, b corev1.NodeCondition) bool {
63+
aCopy, bCopy := a.DeepCopy(), b.DeepCopy()
64+
aCopy.LastHeartbeatTime, bCopy.LastHeartbeatTime = metav1.Time{}, metav1.Time{}
65+
return equality.Semantic.DeepEqual(aCopy, bCopy)
66+
}
67+
4668
type rfReconciler struct {
4769
log logr.Logger
4870
queues *queue.Manager
@@ -109,6 +131,12 @@ func (h *nodeHandler) Update(ctx context.Context, e event.UpdateEvent, q workque
109131
if !isOldNode || !isNewNode {
110132
return
111133
}
134+
135+
if !checkNodeSchedulingPropertiesChanged(oldNode, newNode) {
136+
ctrl.LoggerFrom(ctx).V(5).Info("Skipping node update as new Node is semantically same as old Node", "node", newNode.Name)
137+
return
138+
}
139+
112140
h.queueReconcileForNode(oldNode, q)
113141
h.queueReconcileForNode(newNode, q)
114142
}
@@ -203,3 +231,8 @@ func nodeBelongsToFlavor(node *corev1.Node, nodeLabels map[string]string, levels
203231
}
204232
return true
205233
}
234+
235+
// checkNodeSchedulingPropertiesChanged checks if the node update affects TAS scheduling.
236+
func checkNodeSchedulingPropertiesChanged(oldNode, newNode *corev1.Node) bool {
237+
return !nodeSemantic.DeepEqual(oldNode, newNode)
238+
}
Lines changed: 252 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,252 @@
1+
/*
2+
Copyright The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package tas
18+
19+
import (
20+
"testing"
21+
"time"
22+
23+
corev1 "k8s.io/api/core/v1"
24+
"k8s.io/apimachinery/pkg/api/resource"
25+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26+
27+
testingnode "sigs.k8s.io/kueue/pkg/util/testingjobs/node"
28+
)
29+
30+
func TestNodeHandler_Update(t *testing.T) {
31+
now := metav1.Now()
32+
later := metav1.NewTime(now.Add(10 * time.Second))
33+
34+
baseNode := testingnode.MakeNode("test-node").
35+
Annotation("test-annotation", "value").
36+
Label("topology.kubernetes.io/zone", "zone-a").
37+
Label("node-role", "worker").
38+
Taints(corev1.Taint{
39+
Key: "test-taint",
40+
Value: "value",
41+
Effect: corev1.TaintEffectNoSchedule,
42+
}).
43+
StatusAllocatable(corev1.ResourceList{
44+
corev1.ResourceCPU: resource.MustParse("8"),
45+
corev1.ResourceMemory: resource.MustParse("32Gi"),
46+
})
47+
48+
testCases := map[string]struct {
49+
oldNode *corev1.Node
50+
newNode *corev1.Node
51+
wantChanged bool
52+
}{
53+
"LastHeartbeatTime changed": {
54+
oldNode: baseNode.Clone().
55+
StatusConditions(
56+
corev1.NodeCondition{
57+
Type: corev1.NodeReady,
58+
Status: corev1.ConditionTrue,
59+
LastHeartbeatTime: now,
60+
LastTransitionTime: now,
61+
},
62+
corev1.NodeCondition{
63+
Type: corev1.NodeMemoryPressure,
64+
Status: corev1.ConditionFalse,
65+
LastHeartbeatTime: now,
66+
LastTransitionTime: now,
67+
},
68+
).Obj(),
69+
newNode: baseNode.Clone().
70+
StatusConditions(
71+
corev1.NodeCondition{
72+
Type: corev1.NodeReady,
73+
Status: corev1.ConditionTrue,
74+
LastHeartbeatTime: later,
75+
LastTransitionTime: now,
76+
},
77+
corev1.NodeCondition{
78+
Type: corev1.NodeMemoryPressure,
79+
Status: corev1.ConditionFalse,
80+
LastHeartbeatTime: later,
81+
LastTransitionTime: now,
82+
},
83+
).Obj(),
84+
wantChanged: false,
85+
},
86+
"Annotation changed": {
87+
oldNode: baseNode.Clone().Obj(),
88+
newNode: baseNode.Clone().Annotation("new-annotation", "new-value").Obj(),
89+
wantChanged: true,
90+
},
91+
"Label changed": {
92+
oldNode: baseNode.Clone().Obj(),
93+
newNode: baseNode.Clone().Label("new-label", "new-value").Obj(),
94+
wantChanged: true,
95+
},
96+
"Node Ready status changed": {
97+
oldNode: baseNode.Clone().
98+
StatusConditions(
99+
corev1.NodeCondition{
100+
Type: corev1.NodeReady,
101+
Status: corev1.ConditionTrue,
102+
LastHeartbeatTime: now,
103+
LastTransitionTime: now,
104+
},
105+
).Obj(),
106+
newNode: baseNode.Clone().
107+
StatusConditions(
108+
corev1.NodeCondition{
109+
Type: corev1.NodeReady,
110+
Status: corev1.ConditionFalse,
111+
LastHeartbeatTime: later,
112+
LastTransitionTime: later,
113+
},
114+
).Obj(),
115+
wantChanged: true,
116+
},
117+
"Allocatable resources changed": {
118+
oldNode: baseNode.Clone().Obj(),
119+
newNode: baseNode.Clone().StatusAllocatable(corev1.ResourceList{
120+
corev1.ResourceCPU: resource.MustParse("16"),
121+
corev1.ResourceMemory: resource.MustParse("32Gi"),
122+
}).Obj(),
123+
wantChanged: true,
124+
},
125+
"Taints changed": {
126+
oldNode: baseNode.Clone().Obj(),
127+
newNode: baseNode.Clone().Taints(corev1.Taint{
128+
Key: "new-taint",
129+
Value: "new-value",
130+
Effect: corev1.TaintEffectNoExecute,
131+
}).Obj(),
132+
wantChanged: true,
133+
},
134+
"Taints with TimeAdded": {
135+
oldNode: baseNode.Clone().Taints(corev1.Taint{
136+
Key: "test",
137+
Value: "value",
138+
Effect: corev1.TaintEffectNoExecute,
139+
TimeAdded: &now,
140+
}).Obj(),
141+
newNode: baseNode.Clone().Taints(corev1.Taint{
142+
Key: "test",
143+
Value: "value",
144+
Effect: corev1.TaintEffectNoExecute,
145+
TimeAdded: &later,
146+
}).Obj(),
147+
wantChanged: true,
148+
},
149+
"Taints TimeAdded from null to non-null": {
150+
oldNode: baseNode.Clone().Taints(corev1.Taint{
151+
Key: "test",
152+
Value: "value",
153+
Effect: corev1.TaintEffectNoExecute,
154+
TimeAdded: nil,
155+
}).Obj(),
156+
newNode: baseNode.Clone().Taints(corev1.Taint{
157+
Key: "test",
158+
Value: "value",
159+
Effect: corev1.TaintEffectNoExecute,
160+
TimeAdded: &later,
161+
}).Obj(),
162+
wantChanged: true,
163+
},
164+
"Unschedulable changed": {
165+
oldNode: baseNode.Clone().Obj(),
166+
newNode: baseNode.Clone().Unschedulable().Obj(),
167+
wantChanged: true,
168+
},
169+
"Update Multiple properties": {
170+
oldNode: baseNode.Clone().
171+
StatusConditions(
172+
corev1.NodeCondition{
173+
Type: corev1.NodeReady,
174+
Status: corev1.ConditionTrue,
175+
LastHeartbeatTime: now,
176+
LastTransitionTime: now,
177+
},
178+
corev1.NodeCondition{
179+
Type: corev1.NodeMemoryPressure,
180+
Status: corev1.ConditionFalse,
181+
LastHeartbeatTime: now,
182+
LastTransitionTime: now,
183+
},
184+
).Obj(),
185+
newNode: baseNode.Clone().
186+
StatusConditions(
187+
corev1.NodeCondition{
188+
Type: corev1.NodeReady,
189+
Status: corev1.ConditionTrue,
190+
LastHeartbeatTime: later,
191+
LastTransitionTime: now,
192+
},
193+
corev1.NodeCondition{
194+
Type: corev1.NodeMemoryPressure,
195+
Status: corev1.ConditionFalse,
196+
LastHeartbeatTime: later,
197+
LastTransitionTime: now,
198+
},
199+
).
200+
Annotation("another-annotation", "another-value").
201+
ResourceVersion("12345").
202+
Obj(),
203+
wantChanged: true,
204+
},
205+
"New condition type added": {
206+
oldNode: baseNode.Clone().Obj(),
207+
newNode: baseNode.Clone().StatusConditions(corev1.NodeCondition{
208+
Type: corev1.NodeDiskPressure,
209+
Status: corev1.ConditionTrue,
210+
LastHeartbeatTime: now,
211+
LastTransitionTime: now,
212+
}).Obj(),
213+
wantChanged: true,
214+
},
215+
"Condition removed": {
216+
oldNode: baseNode.Clone().
217+
StatusConditions(
218+
corev1.NodeCondition{
219+
Type: corev1.NodeReady,
220+
Status: corev1.ConditionTrue,
221+
LastHeartbeatTime: now,
222+
LastTransitionTime: now,
223+
},
224+
corev1.NodeCondition{
225+
Type: corev1.NodeDiskPressure,
226+
Status: corev1.ConditionFalse,
227+
LastHeartbeatTime: now,
228+
LastTransitionTime: now,
229+
},
230+
).Obj(),
231+
newNode: baseNode.Clone().
232+
StatusConditions(
233+
corev1.NodeCondition{
234+
Type: corev1.NodeReady,
235+
Status: corev1.ConditionTrue,
236+
LastHeartbeatTime: now,
237+
LastTransitionTime: now,
238+
},
239+
).Obj(),
240+
wantChanged: true,
241+
},
242+
}
243+
244+
for name, tc := range testCases {
245+
t.Run(name, func(t *testing.T) {
246+
got := checkNodeSchedulingPropertiesChanged(tc.oldNode, tc.newNode)
247+
if got != tc.wantChanged {
248+
t.Errorf("nodeSchedulingPropertiesChanged() = %v, want %v", got, tc.wantChanged)
249+
}
250+
})
251+
}
252+
}

pkg/util/testingjobs/node/wrappers.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,15 @@ func (n *NodeWrapper) Label(k, v string) *NodeWrapper {
6161
return n
6262
}
6363

64+
// Annotation adds an annotation to the Node
65+
func (n *NodeWrapper) Annotation(k, v string) *NodeWrapper {
66+
if n.Annotations == nil {
67+
n.Annotations = make(map[string]string)
68+
}
69+
n.Annotations[k] = v
70+
return n
71+
}
72+
6473
// StatusConditions appends the given status conditions to the Node.
6574
func (n *NodeWrapper) StatusConditions(conditions ...corev1.NodeCondition) *NodeWrapper {
6675
n.Status.Conditions = append(n.Status.Conditions, conditions...)
@@ -107,3 +116,20 @@ func (n *NodeWrapper) Unschedulable() *NodeWrapper {
107116
n.Spec.Unschedulable = true
108117
return n
109118
}
119+
120+
// ConditionHeartbeat updates the LastHeartbeatTime of an existing condition.
121+
func (n *NodeWrapper) ConditionHeartbeat(conditionType corev1.NodeConditionType, heartbeat metav1.Time) *NodeWrapper {
122+
for i := range n.Status.Conditions {
123+
if n.Status.Conditions[i].Type == conditionType {
124+
n.Status.Conditions[i].LastHeartbeatTime = heartbeat
125+
break
126+
}
127+
}
128+
return n
129+
}
130+
131+
// ResourceVersion sets the ResourceVersion of the Node.
132+
func (n *NodeWrapper) ResourceVersion(version string) *NodeWrapper {
133+
n.ObjectMeta.ResourceVersion = version
134+
return n
135+
}

0 commit comments

Comments
 (0)