Skip to content

Commit 5bd9f29

Browse files
committed
Add replica_role label to metrics for HA setups
1 parent a9f281d commit 5bd9f29

File tree

25 files changed

+401
-307
lines changed

25 files changed

+401
-307
lines changed

cmd/kueue/main.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -242,8 +242,14 @@ func main() {
242242
} else {
243243
close(certsReady)
244244
}
245-
cacheOptions := []schdcache.Option{schdcache.WithPodsReadyTracking(blockForPodsReady(&cfg))}
246-
queueOptions := []qcache.Option{qcache.WithPodsReadyRequeuingTimestamp(podsReadyRequeuingTimestamp(&cfg))}
245+
cacheOptions := []schdcache.Option{
246+
schdcache.WithPodsReadyTracking(blockForPodsReady(&cfg)),
247+
schdcache.WithRoleTracker(roleTracker),
248+
}
249+
queueOptions := []qcache.Option{
250+
qcache.WithPodsReadyRequeuingTimestamp(podsReadyRequeuingTimestamp(&cfg)),
251+
qcache.WithRoleTracker(roleTracker),
252+
}
247253
if cfg.Resources != nil && len(cfg.Resources.ExcludeResourcePrefixes) > 0 {
248254
cacheOptions = append(cacheOptions, schdcache.WithExcludedResourcePrefixes(cfg.Resources.ExcludeResourcePrefixes))
249255
queueOptions = append(queueOptions, qcache.WithExcludedResourcePrefixes(cfg.Resources.ExcludeResourcePrefixes))

pkg/cache/queue/manager.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import (
4040
"sigs.k8s.io/kueue/pkg/metrics"
4141
afs "sigs.k8s.io/kueue/pkg/util/admissionfairsharing"
4242
"sigs.k8s.io/kueue/pkg/util/queue"
43+
"sigs.k8s.io/kueue/pkg/util/roletracker"
4344
"sigs.k8s.io/kueue/pkg/workload"
4445
)
4546

@@ -87,6 +88,13 @@ func WithResourceTransformations(transforms []config.ResourceTransformation) Opt
8788
}
8889
}
8990

91+
// WithRoleTracker sets the roleTracker for HA metrics.
92+
func WithRoleTracker(tracker *roletracker.RoleTracker) Option {
93+
return func(m *Manager) {
94+
m.roleTracker = tracker
95+
}
96+
}
97+
9098
// SetDRAReconcileChannel sets the DRA reconcile channel after manager creation.
9199
func (m *Manager) SetDRAReconcileChannel(ch chan<- event.TypedGenericEvent[*kueue.Workload]) {
92100
m.draReconcileChannel = ch
@@ -124,6 +132,8 @@ type Manager struct {
124132
workloadUpdateWatchers []WorkloadUpdateWatcher
125133

126134
draReconcileChannel chan<- event.TypedGenericEvent[*kueue.Workload]
135+
136+
roleTracker *roletracker.RoleTracker
127137
}
128138

129139
func NewManager(client client.Client, checker StatusChecker, options ...Option) *Manager {
@@ -717,7 +727,7 @@ func (m *Manager) reportLQPendingWorkloads(lq *LocalQueue) {
717727
metrics.ReportLocalQueuePendingWorkloads(metrics.LocalQueueReference{
718728
Name: lqName,
719729
Namespace: namespace,
720-
}, active, inadmissible)
730+
}, active, inadmissible, m.roleTracker)
721731
}
722732

723733
func (m *Manager) reportPendingWorkloads(cqName kueue.ClusterQueueReference, cq *ClusterQueue) {
@@ -726,7 +736,7 @@ func (m *Manager) reportPendingWorkloads(cqName kueue.ClusterQueueReference, cq
726736
inadmissible += active
727737
active = 0
728738
}
729-
metrics.ReportPendingWorkloads(cqName, active, inadmissible)
739+
metrics.ReportPendingWorkloads(cqName, active, inadmissible, m.roleTracker)
730740
}
731741

732742
func (m *Manager) GetClusterQueueNames() []kueue.ClusterQueueReference {

pkg/cache/scheduler/cache.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ import (
4141
"sigs.k8s.io/kueue/pkg/metrics"
4242
"sigs.k8s.io/kueue/pkg/resources"
4343
"sigs.k8s.io/kueue/pkg/util/queue"
44+
"sigs.k8s.io/kueue/pkg/util/roletracker"
4445
"sigs.k8s.io/kueue/pkg/workload"
4546
)
4647

@@ -95,6 +96,13 @@ func WithAdmissionFairSharing(afs *config.AdmissionFairSharing) Option {
9596
}
9697
}
9798

99+
// WithRoleTracker sets the roleTracker for HA metrics.
100+
func WithRoleTracker(tracker *roletracker.RoleTracker) Option {
101+
return func(c *Cache) {
102+
c.roleTracker = tracker
103+
}
104+
}
105+
98106
// Cache keeps track of the Workloads that got admitted through ClusterQueues.
99107
type Cache struct {
100108
sync.RWMutex
@@ -112,6 +120,8 @@ type Cache struct {
112120
hm hierarchy.Manager[*clusterQueue, *cohort]
113121

114122
tasCache tasCache
123+
124+
roleTracker *roletracker.RoleTracker
115125
}
116126

117127
func New(client client.Client, options ...Option) *Cache {
@@ -144,6 +154,7 @@ func (c *Cache) newClusterQueue(log logr.Logger, cq *kueue.ClusterQueue) (*clust
144154
AdmissionScope: cq.Spec.AdmissionScope,
145155

146156
workloadsNotAccountedForTAS: sets.New[workload.Reference](),
157+
roleTracker: c.roleTracker,
147158
}
148159
c.hm.AddClusterQueue(cqImpl)
149160
c.hm.UpdateClusterQueueEdge(kueue.ClusterQueueReference(cq.Name), cq.Spec.CohortName)
@@ -359,7 +370,7 @@ func (c *Cache) TerminateClusterQueue(name kueue.ClusterQueueReference) {
359370
defer c.Unlock()
360371
if cq := c.hm.ClusterQueue(name); cq != nil {
361372
cq.Status = terminating
362-
metrics.ReportClusterQueueStatus(cq.Name, cq.Status)
373+
metrics.ReportClusterQueueStatus(cq.Name, cq.Status, c.roleTracker)
363374
}
364375
}
365376

pkg/cache/scheduler/clusterqueue.go

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import (
3939
"sigs.k8s.io/kueue/pkg/util/admissioncheck"
4040
"sigs.k8s.io/kueue/pkg/util/api"
4141
"sigs.k8s.io/kueue/pkg/util/queue"
42+
"sigs.k8s.io/kueue/pkg/util/roletracker"
4243
stringsutils "sigs.k8s.io/kueue/pkg/util/strings"
4344
"sigs.k8s.io/kueue/pkg/workload"
4445
)
@@ -88,6 +89,8 @@ type clusterQueue struct {
8889

8990
workloadsNotAccountedForTAS sets.Set[workload.Reference]
9091
AdmissionScope *kueue.AdmissionScope
92+
93+
roleTracker *roletracker.RoleTracker
9194
}
9295

9396
func (c *clusterQueue) GetName() kueue.ClusterQueueReference {
@@ -228,7 +231,7 @@ func (c *clusterQueue) updateQueueStatus(log logr.Logger) {
228231
if status != c.Status {
229232
log.V(3).Info("Updating status in cache", "clusterQueue", c.Name, "newStatus", status, "oldStatus", c.Status)
230233
c.Status = status
231-
metrics.ReportClusterQueueStatus(c.Name, c.Status)
234+
metrics.ReportClusterQueueStatus(c.Name, c.Status, c.roleTracker)
232235
}
233236
}
234237

@@ -464,14 +467,16 @@ func (c *clusterQueue) deleteWorkload(log logr.Logger, w *kueue.Workload) {
464467
}
465468

466469
func (c *clusterQueue) reportActiveWorkloads() {
467-
metrics.AdmittedActiveWorkloads.WithLabelValues(string(c.Name)).Set(float64(c.admittedWorkloadsCount))
468-
metrics.ReservingActiveWorkloads.WithLabelValues(string(c.Name)).Set(float64(len(c.Workloads)))
470+
role := roletracker.GetRole(c.roleTracker)
471+
metrics.AdmittedActiveWorkloads.WithLabelValues(string(c.Name), role).Set(float64(c.admittedWorkloadsCount))
472+
metrics.ReservingActiveWorkloads.WithLabelValues(string(c.Name), role).Set(float64(len(c.Workloads)))
469473
}
470474

471-
func (q *LocalQueue) reportActiveWorkloads() {
475+
func (q *LocalQueue) reportActiveWorkloads(tracker *roletracker.RoleTracker) {
476+
role := roletracker.GetRole(tracker)
472477
namespace, name := queue.MustParseLocalQueueReference(q.key)
473-
metrics.LocalQueueAdmittedActiveWorkloads.WithLabelValues(string(name), namespace).Set(float64(q.admittedWorkloads))
474-
metrics.LocalQueueReservingActiveWorkloads.WithLabelValues(string(name), namespace).Set(float64(q.reservingWorkloads))
478+
metrics.LocalQueueAdmittedActiveWorkloads.WithLabelValues(string(name), namespace, role).Set(float64(q.admittedWorkloads))
479+
metrics.LocalQueueReservingActiveWorkloads.WithLabelValues(string(name), namespace, role).Set(float64(q.reservingWorkloads))
475480
}
476481

477482
// updateWorkloadUsage updates the usage of the ClusterQueue for the workload
@@ -501,7 +506,7 @@ func (c *clusterQueue) updateWorkloadUsage(log logr.Logger, wi *workload.Info, o
501506
lq.admittedWorkloads += op.asSignedOne()
502507
}
503508
if features.Enabled(features.LocalQueueMetrics) {
504-
lq.reportActiveWorkloads()
509+
lq.reportActiveWorkloads(c.roleTracker)
505510
}
506511
}
507512
}
@@ -571,7 +576,7 @@ func (c *clusterQueue) addLocalQueue(q *kueue.LocalQueue) error {
571576
}
572577
c.localQueues[qKey] = qImpl
573578
if features.Enabled(features.LocalQueueMetrics) {
574-
qImpl.reportActiveWorkloads()
579+
qImpl.reportActiveWorkloads(c.roleTracker)
575580
}
576581
return nil
577582
}

pkg/controller/core/clusterqueue_controller.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,7 @@ func (r *ClusterQueueReconciler) Create(e event.TypedCreateEvent[*kueue.ClusterQ
310310
}
311311

312312
if r.reportResourceMetrics {
313-
recordResourceMetrics(e.Object)
313+
recordResourceMetrics(e.Object, r.roleTracker)
314314
}
315315

316316
return true
@@ -347,7 +347,7 @@ func (r *ClusterQueueReconciler) Update(e event.TypedUpdateEvent[*kueue.ClusterQ
347347
}
348348

349349
if r.reportResourceMetrics {
350-
updateResourceMetrics(e.ObjectOld, e.ObjectNew)
350+
updateResourceMetrics(e.ObjectOld, e.ObjectNew, r.roleTracker)
351351
}
352352
return true
353353
}
@@ -357,7 +357,7 @@ func (r *ClusterQueueReconciler) Generic(e event.TypedGenericEvent[*kueue.Cluste
357357
return true
358358
}
359359

360-
func recordResourceMetrics(cq *kueue.ClusterQueue) {
360+
func recordResourceMetrics(cq *kueue.ClusterQueue, tracker *roletracker.RoleTracker) {
361361
for rgi := range cq.Spec.ResourceGroups {
362362
rg := &cq.Spec.ResourceGroups[rgi]
363363
for fqi := range rg.Flavors {
@@ -367,7 +367,7 @@ func recordResourceMetrics(cq *kueue.ClusterQueue) {
367367
nominal := resource.QuantityToFloat(&r.NominalQuota)
368368
borrow := resource.QuantityToFloat(r.BorrowingLimit)
369369
lend := resource.QuantityToFloat(r.LendingLimit)
370-
metrics.ReportClusterQueueQuotas(cq.Spec.CohortName, cq.Name, string(fq.Name), string(r.Name), nominal, borrow, lend)
370+
metrics.ReportClusterQueueQuotas(cq.Spec.CohortName, cq.Name, string(fq.Name), string(r.Name), nominal, borrow, lend, tracker)
371371
}
372372
}
373373
}
@@ -376,28 +376,28 @@ func recordResourceMetrics(cq *kueue.ClusterQueue) {
376376
fr := &cq.Status.FlavorsReservation[fri]
377377
for ri := range fr.Resources {
378378
r := &fr.Resources[ri]
379-
metrics.ReportClusterQueueResourceReservations(cq.Spec.CohortName, cq.Name, string(fr.Name), string(r.Name), resource.QuantityToFloat(&r.Total))
379+
metrics.ReportClusterQueueResourceReservations(cq.Spec.CohortName, cq.Name, string(fr.Name), string(r.Name), resource.QuantityToFloat(&r.Total), tracker)
380380
}
381381
}
382382

383383
for fui := range cq.Status.FlavorsUsage {
384384
fu := &cq.Status.FlavorsUsage[fui]
385385
for ri := range fu.Resources {
386386
r := &fu.Resources[ri]
387-
metrics.ReportClusterQueueResourceUsage(cq.Spec.CohortName, cq.Name, string(fu.Name), string(r.Name), resource.QuantityToFloat(&r.Total))
387+
metrics.ReportClusterQueueResourceUsage(cq.Spec.CohortName, cq.Name, string(fu.Name), string(r.Name), resource.QuantityToFloat(&r.Total), tracker)
388388
}
389389
}
390390
}
391391

392-
func updateResourceMetrics(oldCq, newCq *kueue.ClusterQueue) {
392+
func updateResourceMetrics(oldCq, newCq *kueue.ClusterQueue, tracker *roletracker.RoleTracker) {
393393
// if the cohort changed, drop all the old metrics
394394
if oldCq.Spec.CohortName != newCq.Spec.CohortName {
395395
metrics.ClearClusterQueueResourceMetrics(oldCq.Name)
396396
} else {
397397
// selective remove
398398
clearOldResourceQuotas(oldCq, newCq)
399399
}
400-
recordResourceMetrics(newCq)
400+
recordResourceMetrics(newCq, tracker)
401401
}
402402

403403
func clearOldResourceQuotas(oldCq, newCq *kueue.ClusterQueue) {
@@ -578,7 +578,7 @@ func (r *ClusterQueueReconciler) updateCqStatusIfChanged(
578578
if weightedShare == math.Inf(1) {
579579
weightedShare = math.NaN()
580580
}
581-
metrics.ReportClusterQueueWeightedShare(cq.Name, string(cq.Spec.CohortName), weightedShare)
581+
metrics.ReportClusterQueueWeightedShare(cq.Name, string(cq.Spec.CohortName), weightedShare, r.roleTracker)
582582
}
583583
if cq.Status.FairSharing == nil {
584584
cq.Status.FairSharing = &kueue.FairSharingStatus{}

pkg/controller/core/clusterqueue_controller_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
qcache "sigs.k8s.io/kueue/pkg/cache/queue"
3131
schdcache "sigs.k8s.io/kueue/pkg/cache/scheduler"
3232
"sigs.k8s.io/kueue/pkg/metrics"
33+
"sigs.k8s.io/kueue/pkg/util/roletracker"
3334
utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
3435
testingmetrics "sigs.k8s.io/kueue/pkg/util/testing/metrics"
3536
utiltestingapi "sigs.k8s.io/kueue/pkg/util/testing/v1beta2"
@@ -266,6 +267,7 @@ func resourceDataPoint(cohort, name, flavor, res string, v float64) testingmetri
266267
"cluster_queue": name,
267268
"flavor": flavor,
268269
"resource": res,
270+
"replica_role": roletracker.RoleStandalone,
269271
},
270272
Value: v,
271273
}
@@ -492,14 +494,14 @@ func TestRecordResourceMetrics(t *testing.T) {
492494

493495
for name, tc := range testCases {
494496
t.Run(name, func(t *testing.T) {
495-
recordResourceMetrics(tc.queue)
497+
recordResourceMetrics(tc.queue, nil)
496498
gotMetrics := allMetricsForQueue(tc.queue.Name)
497499
if diff := cmp.Diff(tc.wantMetrics, gotMetrics, opts...); len(diff) != 0 {
498500
t.Errorf("Unexpected metrics (-want,+got):\n%s", diff)
499501
}
500502

501503
if tc.updatedQueue != nil {
502-
updateResourceMetrics(tc.queue, tc.updatedQueue)
504+
updateResourceMetrics(tc.queue, tc.updatedQueue, nil)
503505
gotMetricsAfterUpdate := allMetricsForQueue(tc.queue.Name)
504506
if diff := cmp.Diff(tc.wantUpdatedMetrics, gotMetricsAfterUpdate, opts...); len(diff) != 0 {
505507
t.Errorf("Unexpected metrics (-want,+got):\n%s", diff)

pkg/controller/core/cohort_controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ func (r *CohortReconciler) updateCohortStatusIfChanged(ctx context.Context, coho
178178
}
179179

180180
if r.fairSharingEnabled {
181-
metrics.ReportCohortWeightedShare(cohort.Name, stats.WeightedShare)
181+
metrics.ReportCohortWeightedShare(cohort.Name, stats.WeightedShare, r.roleTracker)
182182
if cohort.Status.FairSharing == nil {
183183
cohort.Status.FairSharing = &kueue.FairSharingStatus{}
184184
}

pkg/controller/core/localqueue_controller.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ func (r *LocalQueueReconciler) Create(e event.TypedCreateEvent[*kueue.LocalQueue
222222
}
223223

224224
if features.Enabled(features.LocalQueueMetrics) {
225-
recordLocalQueueUsageMetrics(e.Object)
225+
recordLocalQueueUsageMetrics(e.Object, r.roleTracker)
226226
}
227227

228228
return true
@@ -250,7 +250,7 @@ func (r *LocalQueueReconciler) Update(e event.TypedUpdateEvent[*kueue.LocalQueue
250250
log.V(2).Info("Queue update event")
251251

252252
if features.Enabled(features.LocalQueueMetrics) {
253-
updateLocalQueueResourceMetrics(e.ObjectNew)
253+
updateLocalQueueResourceMetrics(e.ObjectNew, r.roleTracker)
254254
}
255255

256256
oldStopPolicy := ptr.Deref(e.ObjectOld.Spec.StopPolicy, kueue.None)
@@ -366,22 +366,22 @@ func localQueueReferenceFromLocalQueue(lq *kueue.LocalQueue) metrics.LocalQueueR
366366
}
367367
}
368368

369-
func recordLocalQueueUsageMetrics(queue *kueue.LocalQueue) {
369+
func recordLocalQueueUsageMetrics(queue *kueue.LocalQueue, tracker *roletracker.RoleTracker) {
370370
for _, flavor := range queue.Status.FlavorsUsage {
371371
for _, r := range flavor.Resources {
372-
metrics.ReportLocalQueueResourceUsage(localQueueReferenceFromLocalQueue(queue), string(flavor.Name), string(r.Name), resource.QuantityToFloat(&r.Total))
372+
metrics.ReportLocalQueueResourceUsage(localQueueReferenceFromLocalQueue(queue), string(flavor.Name), string(r.Name), resource.QuantityToFloat(&r.Total), tracker)
373373
}
374374
}
375375
for _, flavor := range queue.Status.FlavorsReservation {
376376
for _, r := range flavor.Resources {
377-
metrics.ReportLocalQueueResourceReservations(localQueueReferenceFromLocalQueue(queue), string(flavor.Name), string(r.Name), resource.QuantityToFloat(&r.Total))
377+
metrics.ReportLocalQueueResourceReservations(localQueueReferenceFromLocalQueue(queue), string(flavor.Name), string(r.Name), resource.QuantityToFloat(&r.Total), tracker)
378378
}
379379
}
380380
}
381381

382-
func updateLocalQueueResourceMetrics(queue *kueue.LocalQueue) {
382+
func updateLocalQueueResourceMetrics(queue *kueue.LocalQueue, tracker *roletracker.RoleTracker) {
383383
metrics.ClearLocalQueueResourceMetrics(localQueueReferenceFromLocalQueue(queue))
384-
recordLocalQueueUsageMetrics(queue)
384+
recordLocalQueueUsageMetrics(queue, tracker)
385385
}
386386

387387
func (r *LocalQueueReconciler) Generic(e event.TypedGenericEvent[*kueue.LocalQueue]) bool {
@@ -538,7 +538,7 @@ func (r *LocalQueueReconciler) UpdateStatusIfChanged(
538538
metrics.ReportLocalQueueStatus(metrics.LocalQueueReference{
539539
Name: kueue.LocalQueueName(queue.Name),
540540
Namespace: queue.Namespace,
541-
}, conditionStatus)
541+
}, conditionStatus, r.roleTracker)
542542
}
543543
}
544544
if !equality.Semantic.DeepEqual(oldStatus, queue.Status) {

0 commit comments

Comments
 (0)