
Commit 3b6f8df

Merge pull request #274 from kerthcet/automated-cherry-pick-of-#245-upstream-release-0.1
Automated cherry pick of #245: fix: always requeue with the latest object
2 parents 89df4c6 + 0fa2967 commit 3b6f8df

3 files changed: +130 -27 lines changed

CHANGELOG/CHANGELOG-0.1.md

Lines changed: 1 addition & 0 deletions
@@ -5,3 +5,4 @@ Changes since `v0.1.0`:
 - Fixed number of pending workloads in a BestEffortFIFO ClusterQueue.
 - Fixed bug in a BestEffortFIFO ClusterQueue where a workload might not be
   retried after a transient error.
+- Fixed requeuing an out-of-date workload when failed to admit it.

pkg/queue/manager.go

Lines changed: 8 additions & 6 deletions
@@ -288,24 +288,26 @@ func (m *Manager) RequeueWorkload(ctx context.Context, info *workload.Info, immediate bool) bool {
 	m.Lock()
 	defer m.Unlock()
 
-	q := m.queues[queueKeyForWorkload(info.Obj)]
-	if q == nil {
-		return false
-	}
-
 	var w kueue.Workload
+	// Always get the newest workload to avoid requeuing the out-of-date obj.
 	err := m.client.Get(ctx, client.ObjectKeyFromObject(info.Obj), &w)
 	// Since the client is cached, the only possible error is NotFound
 	if apierrors.IsNotFound(err) || w.Spec.Admission != nil {
 		return false
 	}
 
-	q.AddIfNotPresent(info)
+	q := m.queues[queueKeyForWorkload(&w)]
+	if q == nil {
+		return false
+	}
+	q.AddOrUpdate(&w)
 	cq := m.clusterQueues[q.ClusterQueue]
 	if cq == nil {
 		return false
 	}
 
+	// TODO(#162) Update the info object instead of constructing one.
+	info = workload.NewInfo(&w)
 	added := cq.RequeueIfNotPresent(info, immediate)
 	if added {
 		m.cond.Broadcast()
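
Taken together, the hunk moves the queue lookup to after a fresh Get of the Workload: the local queue is resolved from the newest spec.queueName, the up-to-date object replaces whatever was pending via AddOrUpdate, and the Info handed to the ClusterQueue is rebuilt from that fresh copy. A sketch of how the patched function reads with the hunk applied; the closing lines after m.cond.Broadcast() fall outside the diff context and are assumed here (on the assumption that the function reports whether the workload was added):

func (m *Manager) RequeueWorkload(ctx context.Context, info *workload.Info, immediate bool) bool {
	m.Lock()
	defer m.Unlock()

	var w kueue.Workload
	// Always get the newest workload to avoid requeuing the out-of-date obj.
	err := m.client.Get(ctx, client.ObjectKeyFromObject(info.Obj), &w)
	// Since the client is cached, the only possible error is NotFound.
	if apierrors.IsNotFound(err) || w.Spec.Admission != nil {
		return false
	}

	// Resolve the queue from the freshly fetched object rather than from the
	// caller-provided info, so a changed spec.queueName is honored.
	q := m.queues[queueKeyForWorkload(&w)]
	if q == nil {
		return false
	}
	q.AddOrUpdate(&w)
	cq := m.clusterQueues[q.ClusterQueue]
	if cq == nil {
		return false
	}

	// TODO(#162) Update the info object instead of constructing one.
	info = workload.NewInfo(&w)
	added := cq.RequeueIfNotPresent(info, immediate)
	if added {
		m.cond.Broadcast()
	}
	// Assumed closing lines; not shown in the hunk above.
	return added
}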

pkg/queue/manager_test.go

Lines changed: 121 additions & 21 deletions
@@ -23,6 +23,7 @@ import (
 	"time"
 
 	"github.com/google/go-cmp/cmp"
+	"github.com/google/go-cmp/cmp/cmpopts"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/util/sets"
@@ -699,6 +700,8 @@ func TestHeads(t *testing.T) {
 	}
 }
 
+var ignoreTypeMeta = cmpopts.IgnoreTypes(metav1.TypeMeta{})
+
 // TestHeadAsync ensures that Heads call is blocked until the queues are filled
 // asynchronously.
 func TestHeadsAsync(t *testing.T) {
@@ -707,76 +710,101 @@ func TestHeadsAsync(t *testing.T) {
 		t.Fatalf("Failed adding kueue scheme: %s", err)
 	}
 	now := time.Now().Truncate(time.Second)
-	cq := utiltesting.MakeClusterQueue("fooCq").Obj()
+	clusterQueues := []*kueue.ClusterQueue{
+		utiltesting.MakeClusterQueue("fooCq").Obj(),
+		utiltesting.MakeClusterQueue("barCq").Obj(),
+	}
 	wl := kueue.Workload{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:              "a",
 			CreationTimestamp: metav1.NewTime(now),
 		},
 		Spec: kueue.WorkloadSpec{QueueName: "foo"},
 	}
-	q := kueue.Queue{
-		ObjectMeta: metav1.ObjectMeta{Name: "foo"},
-		Spec: kueue.QueueSpec{
-			ClusterQueue: "fooCq",
+	var newWl kueue.Workload
+	queues := []kueue.Queue{
+		{
+			ObjectMeta: metav1.ObjectMeta{Name: "foo"},
+			Spec: kueue.QueueSpec{
+				ClusterQueue: "fooCq",
+			},
 		},
-	}
-	wantHeads := []workload.Info{
 		{
-			Obj:          &wl,
-			ClusterQueue: "fooCq",
+			ObjectMeta: metav1.ObjectMeta{Name: "bar"},
+			Spec: kueue.QueueSpec{
+				ClusterQueue: "barCq",
+			},
 		},
 	}
 	cases := map[string]struct {
 		initialObjs []client.Object
 		op          func(context.Context, *Manager)
+		wantHeads   []workload.Info
 	}{
 		"AddClusterQueue": {
-			initialObjs: []client.Object{&wl, &q},
+			initialObjs: []client.Object{&wl, &queues[0]},
 			op: func(ctx context.Context, mgr *Manager) {
-				if err := mgr.AddQueue(ctx, &q); err != nil {
+				if err := mgr.AddQueue(ctx, &queues[0]); err != nil {
 					t.Errorf("Failed adding queue: %s", err)
 				}
 				mgr.AddOrUpdateWorkload(&wl)
 				go func() {
-					if err := mgr.AddClusterQueue(ctx, cq); err != nil {
+					if err := mgr.AddClusterQueue(ctx, clusterQueues[0]); err != nil {
 						t.Errorf("Failed adding clusterQueue: %v", err)
 					}
 				}()
 			},
+			wantHeads: []workload.Info{
+				{
+					Obj:          &wl,
+					ClusterQueue: "fooCq",
+				},
+			},
 		},
 		"AddQueue": {
 			initialObjs: []client.Object{&wl},
 			op: func(ctx context.Context, mgr *Manager) {
-				if err := mgr.AddClusterQueue(ctx, cq); err != nil {
+				if err := mgr.AddClusterQueue(ctx, clusterQueues[0]); err != nil {
 					t.Errorf("Failed adding clusterQueue: %v", err)
 				}
 				go func() {
-					if err := mgr.AddQueue(ctx, &q); err != nil {
+					if err := mgr.AddQueue(ctx, &queues[0]); err != nil {
 						t.Errorf("Failed adding queue: %s", err)
 					}
 				}()
 			},
+			wantHeads: []workload.Info{
+				{
+					Obj:          &wl,
+					ClusterQueue: "fooCq",
+				},
+			},
 		},
 		"AddWorkload": {
 			op: func(ctx context.Context, mgr *Manager) {
-				if err := mgr.AddClusterQueue(ctx, cq); err != nil {
+				if err := mgr.AddClusterQueue(ctx, clusterQueues[0]); err != nil {
 					t.Errorf("Failed adding clusterQueue: %v", err)
 				}
-				if err := mgr.AddQueue(ctx, &q); err != nil {
+				if err := mgr.AddQueue(ctx, &queues[0]); err != nil {
 					t.Errorf("Failed adding queue: %s", err)
 				}
 				go func() {
 					mgr.AddOrUpdateWorkload(&wl)
 				}()
 			},
+			wantHeads: []workload.Info{
+				{
+					Obj:          &wl,
+					ClusterQueue: "fooCq",
+				},
+			},
 		},
 		"UpdateWorkload": {
 			op: func(ctx context.Context, mgr *Manager) {
-				if err := mgr.AddClusterQueue(ctx, cq); err != nil {
+				if err := mgr.AddClusterQueue(ctx, clusterQueues[0]); err != nil {
 					t.Errorf("Failed adding clusterQueue: %v", err)
 				}
-				if err := mgr.AddQueue(ctx, &q); err != nil {
+				if err := mgr.AddQueue(ctx, &queues[0]); err != nil {
 					t.Errorf("Failed adding queue: %s", err)
 				}
 				go func() {
@@ -785,14 +813,20 @@ func TestHeadsAsync(t *testing.T) {
 					mgr.UpdateWorkload(wlCopy, &wl)
 				}()
 			},
+			wantHeads: []workload.Info{
+				{
+					Obj:          &wl,
+					ClusterQueue: "fooCq",
+				},
+			},
 		},
 		"RequeueWorkload": {
 			initialObjs: []client.Object{&wl},
 			op: func(ctx context.Context, mgr *Manager) {
-				if err := mgr.AddClusterQueue(ctx, cq); err != nil {
+				if err := mgr.AddClusterQueue(ctx, clusterQueues[0]); err != nil {
 					t.Errorf("Failed adding clusterQueue: %v", err)
 				}
-				if err := mgr.AddQueue(ctx, &q); err != nil {
+				if err := mgr.AddQueue(ctx, &queues[0]); err != nil {
 					t.Errorf("Failed adding queue: %s", err)
 				}
 				// Remove the initial workload from the manager.
@@ -801,6 +835,72 @@ func TestHeadsAsync(t *testing.T) {
 					mgr.RequeueWorkload(ctx, workload.NewInfo(&wl), true)
 				}()
 			},
+			wantHeads: []workload.Info{
+				{
+					Obj:          &wl,
+					ClusterQueue: "fooCq",
+				},
+			},
+		},
+		"RequeueWithOutOfDateWorkload": {
+			initialObjs: []client.Object{&wl},
+			op: func(ctx context.Context, mgr *Manager) {
+				if err := mgr.AddClusterQueue(ctx, clusterQueues[0]); err != nil {
+					t.Errorf("Failed adding clusterQueue: %v", err)
+				}
+				if err := mgr.AddQueue(ctx, &queues[0]); err != nil {
+					t.Errorf("Failed adding queue: %s", err)
+				}
+
+				newWl = wl
+				newWl.Annotations = map[string]string{"foo": "bar"}
+				if err := mgr.client.Update(ctx, &newWl, &client.UpdateOptions{}); err != nil {
+					t.Errorf("Failed to update the workload; %s", err)
+				}
+				// Remove the initial workload from the manager.
+				mgr.Heads(ctx)
+				go func() {
+					mgr.RequeueWorkload(ctx, workload.NewInfo(&wl), true)
+				}()
+			},
+			wantHeads: []workload.Info{
+				{
+					Obj:          &newWl,
+					ClusterQueue: "fooCq",
+				},
+			},
+		},
+		"RequeueWithQueueChangedWorkload": {
+			initialObjs: []client.Object{&wl},
+			op: func(ctx context.Context, mgr *Manager) {
+				for _, cq := range clusterQueues {
+					if err := mgr.AddClusterQueue(ctx, cq); err != nil {
+						t.Errorf("Failed adding clusterQueue: %v", err)
+					}
+				}
+				for _, q := range queues {
+					if err := mgr.AddQueue(ctx, &q); err != nil {
+						t.Errorf("Failed adding queue: %s", err)
+					}
+				}
+
+				newWl = wl
+				newWl.Spec.QueueName = "bar"
+				if err := mgr.client.Update(ctx, &newWl, &client.UpdateOptions{}); err != nil {
+					t.Errorf("Failed to update the workload; %s", err)
+				}
+				// Remove the initial workload from the manager.
+				mgr.Heads(ctx)
+				go func() {
+					mgr.RequeueWorkload(ctx, workload.NewInfo(&wl), true)
+				}()
+			},
+			wantHeads: []workload.Info{
+				{
+					Obj:          &newWl,
+					ClusterQueue: "barCq",
+				},
+			},
 		},
 	}
 	for name, tc := range cases {
@@ -812,7 +912,7 @@ func TestHeadsAsync(t *testing.T) {
 			go manager.CleanUpOnContext(ctx)
 			tc.op(ctx, manager)
 			heads := manager.Heads(ctx)
-			if diff := cmp.Diff(wantHeads, heads); diff != "" {
+			if diff := cmp.Diff(tc.wantHeads, heads, ignoreTypeMeta); diff != "" {
 				t.Errorf("GetHeads returned wrong heads (-want,+got):\n%s", diff)
 			}
 		})
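
The assertion at the end now diffs against the per-case tc.wantHeads and passes ignoreTypeMeta, i.e. cmpopts.IgnoreTypes(metav1.TypeMeta{}), so that TypeMeta differences (for example on objects read back through the test client) do not fail the comparison. A small self-contained sketch of that go-cmp pattern; the obj type and field values below are illustrative, not taken from the repository:

package main

import (
	"fmt"

	"github.com/google/go-cmp/cmp"
	"github.com/google/go-cmp/cmp/cmpopts"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// obj is a stand-in for any struct embedding TypeMeta, such as a Workload.
type obj struct {
	metav1.TypeMeta
	Name string
}

func main() {
	want := obj{Name: "a"}
	got := obj{TypeMeta: metav1.TypeMeta{Kind: "Workload"}, Name: "a"}

	// Ignore all values of type metav1.TypeMeta during the comparison.
	ignoreTypeMeta := cmpopts.IgnoreTypes(metav1.TypeMeta{})

	fmt.Println(cmp.Diff(want, got) == "")                 // false: TypeMeta differs
	fmt.Println(cmp.Diff(want, got, ignoreTypeMeta) == "") // true: difference ignored
}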
