Skip to content

Commit 99ee06a

Browse files
committed
Implement Prometheus metrics for LocalQueue (kubernetes-sigs#3673)
* add LocalQueue metrics (no feature gate) Signed-off-by: Kevin <[email protected]> * add all clear and report calls Signed-off-by: Kevin <[email protected]> * add feature gate Signed-off-by: Kevin <[email protected]> * cleanup todos and add more feature gates Signed-off-by: Kevin <[email protected]> * use feature gate instead of config Signed-off-by: Kevin <[email protected]> * cleanup Signed-off-by: Kevin <[email protected]> * add metrics checks to a test Signed-off-by: Kevin <[email protected]> * add lq metrics to cq integration test Signed-off-by: Kevin <[email protected]> * lint fix Signed-off-by: Kevin <[email protected]> * use name instead of local_queue Signed-off-by: Kevin <[email protected]> * update status metric description Signed-off-by: Kevin <[email protected]> * fix key name Signed-off-by: Kevin <[email protected]> * move registerLQ into metrics package Signed-off-by: Kevin <[email protected]> --------- Signed-off-by: Kevin <[email protected]>
1 parent db8b9da commit 99ee06a

14 files changed

+528
-4
lines changed

pkg/cache/cache.go

+5
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,11 @@ func (c *Cache) DeleteClusterQueue(cq *kueue.ClusterQueue) {
409409
if !ok {
410410
return
411411
}
412+
if features.Enabled(features.LocalQueueMetrics) {
413+
for _, q := range c.clusterQueues[cq.Name].localQueues {
414+
metrics.ClearLocalQueueCacheMetrics(metrics.LQRefFromLocalQueueKey(q.key))
415+
}
416+
}
412417
c.deleteClusterQueueFromCohort(cqImpl)
413418
delete(c.clusterQueues, cq.Name)
414419
metrics.ClearCacheMetrics(cq.Name)

pkg/cache/clusterqueue.go

+15
Original file line numberDiff line numberDiff line change
@@ -427,6 +427,12 @@ func (c *clusterQueue) reportActiveWorkloads() {
427427
metrics.ReservingActiveWorkloads.WithLabelValues(c.Name).Set(float64(len(c.Workloads)))
428428
}
429429

430+
func (q *queue) reportActiveWorkloads() {
431+
qKeySlice := strings.Split(q.key, "/")
432+
metrics.LocalQueueAdmittedActiveWorkloads.WithLabelValues(qKeySlice[1], qKeySlice[0]).Set(float64(q.admittedWorkloads))
433+
metrics.LocalQueueReservingActiveWorkloads.WithLabelValues(qKeySlice[1], qKeySlice[0]).Set(float64(q.reservingWorkloads))
434+
}
435+
430436
// updateWorkloadUsage updates the usage of the ClusterQueue for the workload
431437
// and the number of admitted workloads for local queues.
432438
func (c *clusterQueue) updateWorkloadUsage(wi *workload.Info, m int64) {
@@ -445,6 +451,9 @@ func (c *clusterQueue) updateWorkloadUsage(wi *workload.Info, m int64) {
445451
updateFlavorUsage(frUsage, lq.admittedUsage, m)
446452
lq.admittedWorkloads += int(m)
447453
}
454+
if features.Enabled(features.LocalQueueMetrics) {
455+
lq.reportActiveWorkloads()
456+
}
448457
}
449458
}
450459

@@ -496,11 +505,17 @@ func (c *clusterQueue) addLocalQueue(q *kueue.LocalQueue) error {
496505
}
497506
}
498507
c.localQueues[qKey] = qImpl
508+
if features.Enabled(features.LocalQueueMetrics) {
509+
qImpl.reportActiveWorkloads()
510+
}
499511
return nil
500512
}
501513

502514
func (c *clusterQueue) deleteLocalQueue(q *kueue.LocalQueue) {
503515
qKey := queueKey(q)
516+
if features.Enabled(features.LocalQueueMetrics) {
517+
metrics.ClearLocalQueueCacheMetrics(metrics.LQRefFromLocalQueueKey(qKey))
518+
}
504519
delete(c.localQueues, qKey)
505520
}
506521

pkg/controller/core/core.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,8 @@ func SetupControllers(mgr ctrl.Manager, qManager *queue.Manager, cc *cache.Cache
5757
qManager,
5858
cc,
5959
WithQueueVisibilityUpdateInterval(queueVisibilityUpdateInterval(cfg)),
60-
WithQueueVisibilityClusterQueuesMaxCount(queueVisibilityClusterQueuesMaxCount(cfg)),
6160
WithReportResourceMetrics(cfg.Metrics.EnableClusterQueueResources),
61+
WithQueueVisibilityClusterQueuesMaxCount(queueVisibilityClusterQueuesMaxCount(cfg)),
6262
WithFairSharing(fairSharingEnabled),
6363
WithWatchers(rfRec, acRec),
6464
)

pkg/controller/core/localqueue_controller.go

+51-1
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,10 @@ import (
4040
"sigs.k8s.io/kueue/pkg/cache"
4141
"sigs.k8s.io/kueue/pkg/constants"
4242
"sigs.k8s.io/kueue/pkg/controller/core/indexer"
43+
"sigs.k8s.io/kueue/pkg/features"
44+
"sigs.k8s.io/kueue/pkg/metrics"
4345
"sigs.k8s.io/kueue/pkg/queue"
46+
"sigs.k8s.io/kueue/pkg/util/resource"
4447
)
4548

4649
const (
@@ -63,7 +66,11 @@ type LocalQueueReconciler struct {
6366
wlUpdateCh chan event.GenericEvent
6467
}
6568

66-
func NewLocalQueueReconciler(client client.Client, queues *queue.Manager, cache *cache.Cache) *LocalQueueReconciler {
69+
func NewLocalQueueReconciler(
70+
client client.Client,
71+
queues *queue.Manager,
72+
cache *cache.Cache,
73+
) *LocalQueueReconciler {
6774
return &LocalQueueReconciler{
6875
log: ctrl.Log.WithName("localqueue-reconciler"),
6976
queues: queues,
@@ -142,6 +149,10 @@ func (r *LocalQueueReconciler) Create(e event.CreateEvent) bool {
142149
log.Error(err, "Failed to add localQueue to the cache")
143150
}
144151

152+
if features.Enabled(features.LocalQueueMetrics) {
153+
recordLocalQueueUsageMetrics(q)
154+
}
155+
145156
return true
146157
}
147158

@@ -151,6 +162,11 @@ func (r *LocalQueueReconciler) Delete(e event.DeleteEvent) bool {
151162
// No need to interact with the queue manager for other objects.
152163
return true
153164
}
165+
166+
if features.Enabled(features.LocalQueueMetrics) {
167+
metrics.ClearLocalQueueResourceMetrics(localQueueReferenceFromLocalQueue(q))
168+
}
169+
154170
r.log.V(2).Info("LocalQueue delete event", "localQueue", klog.KObj(q))
155171
r.queues.DeleteLocalQueue(q)
156172
r.cache.DeleteLocalQueue(q)
@@ -191,10 +207,38 @@ func (r *LocalQueueReconciler) Update(e event.UpdateEvent) bool {
191207
}
192208

193209
r.queues.DeleteLocalQueue(oldLq)
210+
if features.Enabled(features.LocalQueueMetrics) {
211+
updateLocalQueueResourceMetrics(newLq)
212+
}
194213

195214
return true
196215
}
197216

217+
func localQueueReferenceFromLocalQueue(lq *kueue.LocalQueue) metrics.LocalQueueReference {
218+
return metrics.LocalQueueReference{
219+
Name: lq.Name,
220+
Namespace: lq.Namespace,
221+
}
222+
}
223+
224+
func recordLocalQueueUsageMetrics(queue *kueue.LocalQueue) {
225+
for _, flavor := range queue.Status.FlavorUsage {
226+
for _, r := range flavor.Resources {
227+
metrics.ReportLocalQueueResourceUsage(localQueueReferenceFromLocalQueue(queue), string(flavor.Name), string(r.Name), resource.QuantityToFloat(&r.Total))
228+
}
229+
}
230+
for _, flavor := range queue.Status.FlavorsReservation {
231+
for _, r := range flavor.Resources {
232+
metrics.ReportLocalQueueResourceReservations(localQueueReferenceFromLocalQueue(queue), string(flavor.Name), string(r.Name), resource.QuantityToFloat(&r.Total))
233+
}
234+
}
235+
}
236+
237+
func updateLocalQueueResourceMetrics(queue *kueue.LocalQueue) {
238+
metrics.ClearLocalQueueResourceMetrics(localQueueReferenceFromLocalQueue(queue))
239+
recordLocalQueueUsageMetrics(queue)
240+
}
241+
198242
func (r *LocalQueueReconciler) Generic(e event.GenericEvent) bool {
199243
r.log.V(3).Info("Got Workload event", "workload", klog.KObj(e.Object))
200244
return true
@@ -337,6 +381,12 @@ func (r *LocalQueueReconciler) UpdateStatusIfChanged(
337381
Message: msg,
338382
ObservedGeneration: queue.Generation,
339383
})
384+
if features.Enabled(features.LocalQueueMetrics) {
385+
metrics.ReportLocalQueueStatus(metrics.LocalQueueReference{
386+
Name: queue.Name,
387+
Namespace: queue.Namespace,
388+
}, conditionStatus)
389+
}
340390
}
341391
if !equality.Semantic.DeepEqual(oldStatus, queue.Status) {
342392
return r.client.Status().Update(ctx, queue)

pkg/controller/core/workload_controller.go

+8
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ import (
4949
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
5050
"sigs.k8s.io/kueue/pkg/cache"
5151
"sigs.k8s.io/kueue/pkg/controller/core/indexer"
52+
"sigs.k8s.io/kueue/pkg/features"
5253
"sigs.k8s.io/kueue/pkg/metrics"
5354
"sigs.k8s.io/kueue/pkg/queue"
5455
utilac "sigs.k8s.io/kueue/pkg/util/admissioncheck"
@@ -258,6 +259,10 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
258259
r.recorder.Eventf(&wl, corev1.EventTypeNormal, "Admitted", "Admitted by ClusterQueue %v, wait time since reservation was %.0fs", wl.Status.Admission.ClusterQueue, quotaReservedWaitTime.Seconds())
259260
metrics.AdmittedWorkload(kueue.ClusterQueueReference(cqName), queuedWaitTime)
260261
metrics.AdmissionChecksWaitTime(kueue.ClusterQueueReference(cqName), quotaReservedWaitTime)
262+
if features.Enabled(features.LocalQueueMetrics) {
263+
metrics.LocalQueueAdmittedWorkload(metrics.LQRefFromWorkload(&wl), queuedWaitTime)
264+
metrics.LocalQueueAdmissionChecksWaitTime(metrics.LQRefFromWorkload(&wl), quotaReservedWaitTime)
265+
}
261266
}
262267
return ctrl.Result{}, nil
263268
}
@@ -384,6 +389,9 @@ func (r *WorkloadReconciler) reconcileOnLocalQueueActiveState(ctx context.Contex
384389
cqName := string(lq.Spec.ClusterQueue)
385390
if slices.Contains(r.queues.GetClusterQueueNames(), cqName) {
386391
metrics.ReportEvictedWorkloads(cqName, kueue.WorkloadEvictedByLocalQueueStopped)
392+
if features.Enabled(features.LocalQueueMetrics) {
393+
metrics.ReportLocalQueueEvictedWorkloads(metrics.LQRefFromWorkload(wl), kueue.WorkloadEvictedByLocalQueueStopped)
394+
}
387395
}
388396
}
389397
return true, client.IgnoreNotFound(err)

pkg/features/kube_features.go

+7
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,12 @@ const (
105105
// Enable the Flavors status field in the LocalQueue, allowing users to view
106106
// all currently available ResourceFlavors in the LocalQueue.
107107
ExposeFlavorsInLocalQueue featuregate.Feature = "ExposeFlavorsInLocalQueue"
108+
109+
// owner: @kpostoffice
110+
// alpha: v0.10
111+
//
112+
// Enabled gathering of LocalQueue metrics
113+
LocalQueueMetrics featuregate.Feature = "LocalQueueMetrics"
108114
)
109115

110116
func init() {
@@ -129,6 +135,7 @@ var defaultFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
129135
MultiKueueBatchJobWithManagedBy: {Default: false, PreRelease: featuregate.Alpha},
130136
MultiplePreemptions: {Default: false, PreRelease: featuregate.Alpha},
131137
ExposeFlavorsInLocalQueue: {Default: true, PreRelease: featuregate.Beta},
138+
LocalQueueMetrics: {Default: false, PreRelease: featuregate.Alpha},
132139
}
133140

134141
func SetFeatureGateDuringTest(tb testing.TB, f featuregate.Feature, value bool) func() {

0 commit comments

Comments
 (0)