
Commit fd19194

address follow-up comments to #8484 (#8686) (#8693)
1 parent a6c4e94 commit fd19194

8 files changed: +78 −29 lines changed


pkg/cache/scheduler/tas_cache.go

Lines changed: 1 addition & 1 deletion
@@ -120,7 +120,7 @@ func (t *tasCache) DeleteTopology(name kueue.TopologyReference) {
 
 // Update may add a pod to the cache, or
 // delete a terminated pod.
-func (t *tasCache) Update(pod corev1.Pod, log logr.Logger) {
+func (t *tasCache) Update(pod *corev1.Pod, log logr.Logger) {
 	t.nonTasUsageCache.update(pod, log)
 }
 
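The change above switches the cache update to take `*corev1.Pod`, so callers hand over a pointer instead of copying the full Pod struct. A minimal, self-contained sketch (not Kueue code) of the caller-side pattern the tests adopt; `podCache` and its `Update` method are hypothetical stand-ins for `tasCache`:

```go
// Hypothetical stand-in for the pointer-based update pattern; not Kueue code.
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

type podCache struct {
	names []string
}

// Update takes *corev1.Pod so callers do not copy the large Pod struct.
func (c *podCache) Update(pod *corev1.Pod) {
	c.names = append(c.names, pod.Name)
}

func main() {
	pods := []corev1.Pod{
		{ObjectMeta: metav1.ObjectMeta{Name: "pod-a"}},
		{ObjectMeta: metav1.ObjectMeta{Name: "pod-b"}},
	}
	cache := &podCache{}
	for _, pod := range pods {
		// Since Go 1.22 each loop iteration gets its own variable,
		// so taking &pod here passes a pointer to the current element's copy.
		cache.Update(&pod)
	}
	fmt.Println(cache.names) // [pod-a pod-b]
}
```
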
pkg/cache/scheduler/tas_cache_test.go

Lines changed: 2 additions & 3 deletions
@@ -19,7 +19,6 @@ package scheduler
 import (
 	"testing"
 
-	"github.com/go-logr/logr"
 	"github.com/google/go-cmp/cmp"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
@@ -4264,7 +4263,7 @@ func TestFindTopologyAssignments(t *testing.T) {
 	}
 	for name, tc := range cases {
 		t.Run(name, func(t *testing.T) {
-			ctx, _ := utiltesting.ContextWithLog(t)
+			ctx, log := utiltesting.ContextWithLog(t)
 			// TODO: remove after dropping the TAS profiles feature gates
 			for _, gate := range tc.enableFeatureGates {
 				features.SetFeatureGateDuringTest(t, gate, true)
@@ -4291,7 +4290,7 @@ func TestFindTopologyAssignments(t *testing.T) {
 				NodeLabels: tc.nodeLabels,
 			}
 			for _, pod := range tc.pods {
-				tasCache.Update(pod, logr.FromContextOrDiscard(ctx))
+				tasCache.Update(&pod, log)
 			}
 			tasFlavorCache := tasCache.NewTASFlavorCache(topologyInformation, flavorInformation)
 
pkg/cache/scheduler/tas_non_tas_pod_cache.go

Lines changed: 5 additions & 5 deletions
@@ -43,21 +43,21 @@ type podUsageValue struct {
 
 // update may add a pod to the cache, or
 // delete a terminated pod.
-func (n *nonTasUsageCache) update(pod corev1.Pod, log logr.Logger) {
+func (n *nonTasUsageCache) update(pod *corev1.Pod, log logr.Logger) {
 	n.lock.Lock()
 	defer n.lock.Unlock()
 
 	// delete terminated pods as they no longer use any capacity.
-	if utilpod.IsTerminated(&pod) {
+	if utilpod.IsTerminated(pod) {
 		log.V(5).Info("Deleting terminated pod from the cache")
-		delete(n.podUsage, client.ObjectKeyFromObject(&pod))
+		delete(n.podUsage, client.ObjectKeyFromObject(pod))
 		return
 	}
 
 	log.V(5).Info("Adding non-TAS pod to the cache")
 	requests := resources.NewRequests(
-		resourcehelpers.PodRequests(&pod, resourcehelpers.PodResourcesOptions{}))
-	n.podUsage[client.ObjectKeyFromObject(&pod)] = podUsageValue{
+		resourcehelpers.PodRequests(pod, resourcehelpers.PodResourcesOptions{}))
+	n.podUsage[client.ObjectKeyFromObject(pod)] = podUsageValue{
 		node:  pod.Spec.NodeName,
 		usage: requests,
 	}

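For context, the method above evicts terminated pods from the cache and records the requests of other pods under their namespace/name key. A simplified, self-contained sketch of that flow using only public Kubernetes libraries; `usageCache` and `isTerminated` are stand-ins for Kueue's `nonTasUsageCache` and `utilpod.IsTerminated`, and requests are summed by hand rather than via `resourcehelpers.PodRequests`:

```go
// Simplified stand-in for nonTasUsageCache.update; not Kueue code.
package sketch

import (
	"sync"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

type usageCache struct {
	lock     sync.Mutex
	podUsage map[types.NamespacedName]corev1.ResourceList
}

func newUsageCache() *usageCache {
	return &usageCache{podUsage: map[types.NamespacedName]corev1.ResourceList{}}
}

// isTerminated mirrors the intent of utilpod.IsTerminated (assumption).
func isTerminated(pod *corev1.Pod) bool {
	return pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed
}

// update takes the pod by pointer, so helpers that expect *corev1.Pod
// (ObjectKeyFromObject, the terminated check) need no extra &pod.
func (c *usageCache) update(pod *corev1.Pod) {
	c.lock.Lock()
	defer c.lock.Unlock()
	key := client.ObjectKeyFromObject(pod)
	// Terminated pods no longer use capacity, so drop them from the cache.
	if isTerminated(pod) {
		delete(c.podUsage, key)
		return
	}
	// Sum the container requests and store them under the pod's key.
	requests := corev1.ResourceList{}
	for _, ctr := range pod.Spec.Containers {
		for name, qty := range ctr.Resources.Requests {
			total := requests[name]
			total.Add(qty)
			requests[name] = total
		}
	}
	c.podUsage[key] = requests
}
```
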
pkg/controller/tas/controllers.go

Lines changed: 1 addition & 1 deletion
@@ -45,7 +45,7 @@ func SetupControllers(mgr ctrl.Manager, queues *qcache.Manager, cache *schdcache
 			return ctrlName, err
 		}
 	}
-	nonTasUsageController := newNonTasUsageReconciler(mgr.GetClient(), cache, queues)
+	nonTasUsageController := newNonTasUsageReconciler(mgr.GetClient(), cache)
 	if ctrlName, err := nonTasUsageController.SetupWithManager(mgr); err != nil {
 		return ctrlName, err
 	}

pkg/controller/tas/non_tas_usage_controller.go

Lines changed: 8 additions & 11 deletions
@@ -31,25 +31,22 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/reconcile"
 	"sigs.k8s.io/controller-runtime/pkg/source"
 
-	qcache "sigs.k8s.io/kueue/pkg/cache/queue"
 	schdcache "sigs.k8s.io/kueue/pkg/cache/scheduler"
 	utiltas "sigs.k8s.io/kueue/pkg/util/tas"
 )
 
-func newNonTasUsageReconciler(client client.Client, cache *schdcache.Cache, qcache *qcache.Manager) *NonTasUsageReconciler {
+func newNonTasUsageReconciler(k8sClient client.Client, cache *schdcache.Cache) *NonTasUsageReconciler {
 	return &NonTasUsageReconciler{
-		Client: client,
-		Cache:  cache,
-		QCache: qcache,
+		k8sClient: k8sClient,
+		cache:     cache,
 	}
 }
 
 // NonTasUsageReconciler monitors pods to update
 // the TAS cache with non-TAS usage.
 type NonTasUsageReconciler struct {
-	client.Client
-	Cache  *schdcache.Cache
-	QCache *qcache.Manager
+	k8sClient client.Client
+	cache     *schdcache.Cache
 }
 
 var _ reconcile.Reconciler = (*NonTasUsageReconciler)(nil)
@@ -61,17 +58,17 @@ func (r *NonTasUsageReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 	log := klog.FromContext(ctx).WithValues("pod", req.NamespacedName)
 	log.V(3).Info("Non-TAS usage cache reconciling")
 	var pod corev1.Pod
-	err := r.Get(ctx, req.NamespacedName, &pod)
+	err := r.k8sClient.Get(ctx, req.NamespacedName, &pod)
 	if err != nil {
 		if client.IgnoreNotFound(err) != nil {
 			return ctrl.Result{}, err
 		}
 		log.V(5).Info("Idempotently deleting not found pod")
-		r.Cache.TASCache().DeletePodByKey(req.NamespacedName)
+		r.cache.TASCache().DeletePodByKey(req.NamespacedName)
 		return ctrl.Result{}, nil
 	}
 
-	r.Cache.TASCache().Update(pod, log)
+	r.cache.TASCache().Update(&pod, log)
 	return ctrl.Result{}, nil
 }
 
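The constructor and struct now keep the client and scheduler cache in unexported fields instead of embedding `client.Client`, and the queue-manager dependency is dropped. A minimal sketch (not Kueue's actual wiring; its `SetupWithManager` is not shown in this diff) of how a pod reconciler with that shape is typically registered with controller-runtime; `podUsageReconciler` and `setupWithManager` are hypothetical:

```go
// Hypothetical pod reconciler mirroring the unexported-field shape above; not Kueue code.
package sketch

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

type podUsageReconciler struct {
	// An unexported field (rather than an embedded client.Client) keeps the
	// reconciler's public surface small.
	k8sClient client.Client
}

func (r *podUsageReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	var pod corev1.Pod
	if err := r.k8sClient.Get(ctx, req.NamespacedName, &pod); err != nil {
		// On NotFound the real controller deletes the pod's cache entry by key.
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}
	// Here the real controller refreshes the cache entry via Update(&pod, log).
	return ctrl.Result{}, nil
}

func (r *podUsageReconciler) setupWithManager(mgr ctrl.Manager) error {
	// Watch Pods; controller-runtime enqueues a request per Pod event.
	return ctrl.NewControllerManagedBy(mgr).
		For(&corev1.Pod{}).
		Complete(r)
}
```
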
pkg/scheduler/scheduler_tas_test.go

Lines changed: 1 addition & 2 deletions
@@ -23,7 +23,6 @@ import (
 	"testing"
 	"time"
 
-	"github.com/go-logr/logr"
 	"github.com/google/go-cmp/cmp"
 	"github.com/google/go-cmp/cmp/cmpopts"
 	corev1 "k8s.io/api/core/v1"
@@ -2716,7 +2715,7 @@ func TestScheduleForTAS(t *testing.T) {
 				}
 			}
 			for _, pod := range tc.pods {
-				cqCache.TASCache().Update(pod, logr.FromContextOrDiscard(ctx))
+				cqCache.TASCache().Update(&pod, log)
 			}
 			initiallyAdmittedWorkloads := sets.New[workload.Reference]()
 			for _, w := range tc.workloads {

test/integration/singlecluster/tas/tas_test.go

Lines changed: 55 additions & 2 deletions
@@ -343,7 +343,7 @@ var _ = ginkgo.Describe("Topology Aware Scheduling", ginkgo.Ordered, func() {
 				gomega.Consistently(func(g gomega.Gomega) {
 					g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(wl), wl)).To(gomega.Succeed())
 					g.Expect(workload.IsAdmitted(wl)).To(gomega.BeFalse())
-				}, util.ConsistentDuration, util.Interval).Should(gomega.Succeed())
+				}, util.ShortConsistentDuration, util.Interval).Should(gomega.Succeed())
 			})
 		})
 
@@ -393,7 +393,7 @@ var _ = ginkgo.Describe("Topology Aware Scheduling", ginkgo.Ordered, func() {
 				gomega.Consistently(func(g gomega.Gomega) {
 					g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(wl), wl)).To(gomega.Succeed())
 					g.Expect(workload.IsAdmitted(wl)).To(gomega.BeFalse())
-				}, util.ConsistentDuration, util.Interval).Should(gomega.Succeed())
+				}, util.ShortConsistentDuration, util.Interval).Should(gomega.Succeed())
 			})
 
 			ginkgo.By("delete the non-TAS pod", func() {
@@ -476,6 +476,59 @@ var _ = ginkgo.Describe("Topology Aware Scheduling", ginkgo.Ordered, func() {
 				util.ExpectAdmittedWorkloadsTotalMetric(clusterQueue, "", 2)
 			})
 		})
+
+		ginkgo.It("non-TAS pod terminating; capacity not released", func() {
+			var wl *kueue.Workload
+			var nonTasPod *corev1.Pod
+
+			ginkgo.By("create a non-TAS pod which consumes the node's capacity", func() {
+				nonTasPod = testingpod.MakePod("pod", ns.Name).
+					Request(corev1.ResourceCPU, "1").
+					NodeName("node1").
+					StatusPhase(corev1.PodRunning).
+					Finalizer("kueue.sigs.k8s.io/test-finalizer").
+					Obj()
+				util.MustCreate(ctx, k8sClient, nonTasPod)
+			})
+
+			ginkgo.By("create a workload which requires the node's capacity", func() {
+				wl = utiltestingapi.MakeWorkload("wl", ns.Name).
+					Queue("local-queue").
+					Request(corev1.ResourceCPU, "1").
+					Obj()
+				util.MustCreate(ctx, k8sClient, wl)
+
+				ginkgo.By("verify the workload is not admitted", func() {
+					util.ExpectPendingWorkloadsMetric(clusterQueue, 0, 1)
+					util.ExpectWorkloadsToBePending(ctx, k8sClient, wl)
+					gomega.Consistently(func(g gomega.Gomega) {
+						g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(wl), wl)).To(gomega.Succeed())
+						g.Expect(workload.IsAdmitted(wl)).To(gomega.BeFalse())
+					}, util.ShortConsistentDuration, util.Interval).Should(gomega.Succeed())
+				})
+			})
+
+			ginkgo.By("delete the non-TAS pod", func() {
+				gomega.Expect(k8sClient.Delete(ctx, nonTasPod)).To(gomega.Succeed())
+			})
+
+			ginkgo.By("verify the non-TAS pod has deletionTimestamp", func() {
+				gomega.Eventually(func(g gomega.Gomega) {
+					g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(nonTasPod), nonTasPod)).To(gomega.Succeed())
+					g.Expect(nonTasPod.DeletionTimestamp).NotTo(gomega.BeNil())
+				}, util.Timeout, util.Interval).Should(gomega.Succeed())
+			})
+
+			ginkgo.By("Verify that the TAS-workload doesn't admit", func() {
+				gomega.Consistently(func(g gomega.Gomega) {
+					g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(wl), wl)).To(gomega.Succeed())
+					g.Expect(workload.IsAdmitted(wl)).To(gomega.BeFalse())
+				}, util.ShortConsistentDuration, util.Interval).Should(gomega.Succeed())
+			})
+			// note to future developer: this non-TAS pod doesn't delete properly after ns clean-up,
+			// so it will keep taking node1's capacity.
+			// need to debug this before writing future tests.
+		})
 	})
 
 	ginkgo.When("Single TAS Resource Flavor", func() {

test/util/constants.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,11 @@ const (
4141
// Taken into account that after the certificates are ready, all Kueue's components
4242
// need started and the time it takes for a change in ready probe response triggers
4343
// a change in the deployment status.
44-
StartUpTimeout = 5 * time.Minute
45-
ConsistentDuration = time.Second
46-
ShortInterval = 10 * time.Millisecond
47-
Interval = time.Millisecond * 250
44+
StartUpTimeout = 5 * time.Minute
45+
ShortConsistentDuration = 10 * time.Millisecond
46+
ConsistentDuration = 1 * time.Second
47+
ShortInterval = 10 * time.Millisecond
48+
Interval = time.Millisecond * 250
4849
)
4950

5051
var (
