Skip to content

Commit 1cafd1d

Browse files
authored
Fix workload slices when creation timestamps collide (#6203)
1 parent 06930a2 commit 1cafd1d

File tree

3 files changed

+26
-2
lines changed

3 files changed

+26
-2
lines changed

pkg/scheduler/scheduler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -328,7 +328,7 @@ func (s *Scheduler) schedule(ctx context.Context) wait.SpeedSignal {
328328
// Evict old workload-slice if any. Note: that oldWorkloadSlice is not nil only if
329329
// this is a workload-slice enabled workload and there is an old slice to evict.
330330
if features.Enabled(features.ElasticJobsViaWorkloadSlices) && oldWorkloadSlice != nil {
331-
if err := s.replaceWorkloadSlice(ctx, oldWorkloadSlice.WorkloadInfo.ClusterQueue, e.Obj, oldWorkloadSlice.WorkloadInfo.Obj); err != nil {
331+
if err := s.replaceWorkloadSlice(ctx, oldWorkloadSlice.WorkloadInfo.ClusterQueue, e.Obj, oldWorkloadSlice.WorkloadInfo.Obj.DeepCopy()); err != nil {
332332
log.Error(err, "Failed to aggregate workload slice")
333333
continue
334334
}

pkg/workloadslicing/workloadslicing.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
apimeta "k8s.io/apimachinery/pkg/api/meta"
2929
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3030
"k8s.io/apimachinery/pkg/runtime/schema"
31+
"k8s.io/apimachinery/pkg/util/sets"
3132
"sigs.k8s.io/controller-runtime/pkg/client"
3233

3334
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
@@ -124,9 +125,15 @@ func FindNotFinishedWorkloads(ctx context.Context, clnt client.Client, jobObject
124125
return list.Items[i].CreationTimestamp.Before(&list.Items[j].CreationTimestamp)
125126
})
126127

128+
replacedSlices := sets.New[workload.Reference]()
129+
for _, w := range list.Items {
130+
if replacedKey := ReplacementForKey(&w); replacedKey != nil {
131+
replacedSlices.Insert(*replacedKey)
132+
}
133+
}
127134
// Filter out workloads with activated "Finished" condition.
128135
return slices.DeleteFunc(list.Items, func(w kueue.Workload) bool {
129-
return apimeta.IsStatusConditionTrue(w.Status.Conditions, kueue.WorkloadFinished)
136+
return apimeta.IsStatusConditionTrue(w.Status.Conditions, kueue.WorkloadFinished) || replacedSlices.Has(workload.Key(&w))
130137
}), nil
131138
}
132139

pkg/workloadslicing/workloadslicing_test.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,23 @@ func TestFindNotFinishedWorkloads(t *testing.T) {
219219
*testWorkload("test-2", testJobObject.Name, testJobObject.UID, now).Obj(),
220220
},
221221
},
222+
"TwoActiveWorkloadsSelectNotReplaced": {
223+
args: args{
224+
ctx: t.Context(),
225+
clnt: testWorkloadClientBuilder().WithLists(&kueue.WorkloadList{
226+
Items: []kueue.Workload{
227+
*testWorkload("test-2", testJobObject.Name, testJobObject.UID, now).
228+
Annotation(WorkloadSliceReplacementFor, string(workload.NewReference("default", "test-2"))).Obj(),
229+
*testWorkload("test-1", testJobObject.Name, testJobObject.UID, now).Obj(),
230+
},
231+
}).Build(),
232+
jobObject: testJobObject,
233+
jobObjectGVK: testJobGVK,
234+
},
235+
want: []kueue.Workload{
236+
*testWorkload("test-1", testJobObject.Name, testJobObject.UID, now).Obj(),
237+
},
238+
},
222239
}
223240
for name, tt := range tests {
224241
t.Run(name, func(t *testing.T) {

0 commit comments

Comments
 (0)