Skip to content

Commit b13e54b

Browse files
committed
Add replica_role label to metrics and replica-role field to logs
1 parent 81cae06 commit b13e54b

37 files changed

+1085
-389
lines changed

cmd/kueue/main.go

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ import (
7979
"sigs.k8s.io/kueue/pkg/scheduler/preemption/fairsharing"
8080
"sigs.k8s.io/kueue/pkg/util/cert"
8181
"sigs.k8s.io/kueue/pkg/util/kubeversion"
82+
"sigs.k8s.io/kueue/pkg/util/roletracker"
8283
"sigs.k8s.io/kueue/pkg/util/useragent"
8384
"sigs.k8s.io/kueue/pkg/util/waitforpodsready"
8485
"sigs.k8s.io/kueue/pkg/version"
@@ -233,6 +234,15 @@ func main() {
233234
os.Exit(1)
234235
}
235236

237+
var roleTracker *roletracker.RoleTracker
238+
if cfg.LeaderElection != nil && ptr.Deref(cfg.LeaderElection.LeaderElect, false) {
239+
roleTracker = roletracker.NewRoleTracker(mgr.Elected())
240+
go roleTracker.Start(ctx, setupLog)
241+
setupLog.Info("RoleTracker: leader election enabled")
242+
} else {
243+
setupLog.Info("RoleTracker: running in standalone mode")
244+
}
245+
236246
certsReady := make(chan struct{})
237247

238248
if cfg.InternalCertManagement != nil && *cfg.InternalCertManagement.Enable {
@@ -243,8 +253,8 @@ func main() {
243253
} else {
244254
close(certsReady)
245255
}
246-
cacheOptions := []schdcache.Option{schdcache.WithPodsReadyTracking(blockForPodsReady(&cfg))}
247-
queueOptions := []qcache.Option{qcache.WithPodsReadyRequeuingTimestamp(podsReadyRequeuingTimestamp(&cfg))}
256+
cacheOptions := []schdcache.Option{schdcache.WithPodsReadyTracking(blockForPodsReady(&cfg)), schdcache.WithRoleTracker(roleTracker)}
257+
queueOptions := []qcache.Option{qcache.WithPodsReadyRequeuingTimestamp(podsReadyRequeuingTimestamp(&cfg)), qcache.WithRoleTracker(roleTracker)}
248258
if cfg.Resources != nil && len(cfg.Resources.ExcludeResourcePrefixes) > 0 {
249259
cacheOptions = append(cacheOptions, schdcache.WithExcludedResourcePrefixes(cfg.Resources.ExcludeResourcePrefixes))
250260
queueOptions = append(queueOptions, qcache.WithExcludedResourcePrefixes(cfg.Resources.ExcludeResourcePrefixes))
@@ -287,12 +297,12 @@ func main() {
287297
os.Exit(1)
288298
}
289299

290-
if err := setupControllers(ctx, mgr, cCache, queues, &cfg, serverVersionFetcher); err != nil {
300+
if err := setupControllers(ctx, mgr, cCache, queues, &cfg, serverVersionFetcher, roleTracker); err != nil {
291301
setupLog.Error(err, "Unable to setup controllers")
292302
os.Exit(1)
293303
}
294304

295-
if failedWebhook, err := webhooks.Setup(mgr); err != nil {
305+
if failedWebhook, err := webhooks.Setup(mgr, webhooks.WithRoleTracker(roleTracker)); err != nil {
296306
setupLog.Error(err, "Unable to create webhook", "webhook", failedWebhook)
297307
os.Exit(1)
298308
}
@@ -309,7 +319,7 @@ func main() {
309319
}()
310320
}
311321

312-
if err := setupScheduler(mgr, cCache, queues, &cfg); err != nil {
322+
if err := setupScheduler(mgr, cCache, queues, &cfg, roleTracker); err != nil {
313323
setupLog.Error(err, "Could not setup scheduler")
314324
os.Exit(1)
315325
}
@@ -352,8 +362,8 @@ func setupIndexes(ctx context.Context, mgr ctrl.Manager, cfg *configapi.Configur
352362
return jobframework.SetupIndexes(ctx, mgr.GetFieldIndexer(), opts...)
353363
}
354364

355-
func setupControllers(ctx context.Context, mgr ctrl.Manager, cCache *schdcache.Cache, queues *qcache.Manager, cfg *configapi.Configuration, serverVersionFetcher *kubeversion.ServerVersionFetcher) error {
356-
if failedCtrl, err := core.SetupControllers(mgr, queues, cCache, cfg); err != nil {
365+
func setupControllers(ctx context.Context, mgr ctrl.Manager, cCache *schdcache.Cache, queues *qcache.Manager, cfg *configapi.Configuration, serverVersionFetcher *kubeversion.ServerVersionFetcher, roleTracker *roletracker.RoleTracker) error {
366+
if failedCtrl, err := core.SetupControllers(mgr, queues, cCache, cfg, core.WithSetupRoleTracker(roleTracker)); err != nil {
357367
return fmt.Errorf("unable to create controller %s: %w", failedCtrl, err)
358368
}
359369
if features.Enabled(features.FailureRecoveryPolicy) {
@@ -413,7 +423,7 @@ func setupControllers(ctx context.Context, mgr ctrl.Manager, cCache *schdcache.C
413423
}
414424

415425
if features.Enabled(features.TopologyAwareScheduling) {
416-
if failedCtrl, err := tas.SetupControllers(mgr, queues, cCache, cfg); err != nil {
426+
if failedCtrl, err := tas.SetupControllers(mgr, queues, cCache, cfg, tas.WithRoleTracker(roleTracker)); err != nil {
417427
return fmt.Errorf("could not setup TAS controller %s: %w", failedCtrl, err)
418428
}
419429
}
@@ -429,6 +439,7 @@ func setupControllers(ctx context.Context, mgr ctrl.Manager, cCache *schdcache.C
429439
jobframework.WithCache(cCache),
430440
jobframework.WithQueues(queues),
431441
jobframework.WithObjectRetentionPolicies(cfg.ObjectRetentionPolicies),
442+
jobframework.WithRoleTracker(roleTracker),
432443
}
433444
nsSelector, err := metav1.LabelSelectorAsSelector(cfg.ManagedJobsNamespaceSelector)
434445
if err != nil {
@@ -472,7 +483,7 @@ func setupProbeEndpoints(mgr ctrl.Manager, certsReady <-chan struct{}) error {
472483
return nil
473484
}
474485

475-
func setupScheduler(mgr ctrl.Manager, cCache *schdcache.Cache, queues *qcache.Manager, cfg *configapi.Configuration) error {
486+
func setupScheduler(mgr ctrl.Manager, cCache *schdcache.Cache, queues *qcache.Manager, cfg *configapi.Configuration, roleTracker *roletracker.RoleTracker) error {
476487
sched := scheduler.New(
477488
queues,
478489
cCache,
@@ -481,6 +492,7 @@ func setupScheduler(mgr ctrl.Manager, cCache *schdcache.Cache, queues *qcache.Ma
481492
scheduler.WithPodsReadyRequeuingTimestamp(podsReadyRequeuingTimestamp(cfg)),
482493
scheduler.WithFairSharing(cfg.FairSharing),
483494
scheduler.WithAdmissionFairSharing(cfg.AdmissionFairSharing),
495+
scheduler.WithRoleTracker(roleTracker),
484496
)
485497
if err := mgr.Add(sched); err != nil {
486498
return fmt.Errorf("unable to add scheduler to manager: %w", err)

pkg/cache/queue/manager.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import (
4040
"sigs.k8s.io/kueue/pkg/metrics"
4141
afs "sigs.k8s.io/kueue/pkg/util/admissionfairsharing"
4242
"sigs.k8s.io/kueue/pkg/util/queue"
43+
"sigs.k8s.io/kueue/pkg/util/roletracker"
4344
"sigs.k8s.io/kueue/pkg/workload"
4445
)
4546

@@ -52,6 +53,13 @@ var (
5253
// Option configures the manager.
5354
type Option func(*Manager)
5455

56+
// WithRoleTracker sets the role tracker for logs and metrics. Can be nil if leader election is disabled.
57+
func WithRoleTracker(tracker *roletracker.RoleTracker) Option {
58+
return func(m *Manager) {
59+
m.roleTracker = tracker
60+
}
61+
}
62+
5563
// WithClock allows to specify a custom clock
5664
func WithClock(c clock.WithDelayedExecution) Option {
5765
return func(m *Manager) {
@@ -105,6 +113,7 @@ type Manager struct {
105113

106114
clock clock.WithDelayedExecution
107115
client client.Client
116+
roleTracker *roletracker.RoleTracker
108117
statusChecker StatusChecker
109118
localQueues map[queue.LocalQueueReference]*LocalQueue
110119

@@ -717,7 +726,7 @@ func (m *Manager) reportLQPendingWorkloads(lq *LocalQueue) {
717726
metrics.ReportLocalQueuePendingWorkloads(metrics.LocalQueueReference{
718727
Name: lqName,
719728
Namespace: namespace,
720-
}, active, inadmissible)
729+
}, active, inadmissible, m.roleTracker)
721730
}
722731

723732
func (m *Manager) reportPendingWorkloads(cqName kueue.ClusterQueueReference, cq *ClusterQueue) {
@@ -726,7 +735,7 @@ func (m *Manager) reportPendingWorkloads(cqName kueue.ClusterQueueReference, cq
726735
inadmissible += active
727736
active = 0
728737
}
729-
metrics.ReportPendingWorkloads(cqName, active, inadmissible)
738+
metrics.ReportPendingWorkloads(cqName, active, inadmissible, m.roleTracker)
730739
}
731740

732741
func (m *Manager) GetClusterQueueNames() []kueue.ClusterQueueReference {

pkg/cache/scheduler/cache.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ import (
4141
"sigs.k8s.io/kueue/pkg/metrics"
4242
"sigs.k8s.io/kueue/pkg/resources"
4343
"sigs.k8s.io/kueue/pkg/util/queue"
44+
"sigs.k8s.io/kueue/pkg/util/roletracker"
4445
"sigs.k8s.io/kueue/pkg/workload"
4546
)
4647

@@ -61,6 +62,13 @@ const (
6162
// Option configures the reconciler.
6263
type Option func(*Cache)
6364

65+
// WithRoleTracker sets the role tracker for logs and metrics. Can be nil if leader election is disabled.
66+
func WithRoleTracker(tracker *roletracker.RoleTracker) Option {
67+
return func(c *Cache) {
68+
c.roleTracker = tracker
69+
}
70+
}
71+
6472
// WithPodsReadyTracking indicates the cache controller tracks the PodsReady
6573
// condition for admitted workloads, and allows to block admission of new
6674
// workloads until all admitted workloads are in the PodsReady condition.
@@ -101,6 +109,7 @@ type Cache struct {
101109
podsReadyCond sync.Cond
102110

103111
client client.Client
112+
roleTracker *roletracker.RoleTracker
104113
assumedWorkloads map[workload.Reference]kueue.ClusterQueueReference
105114
resourceFlavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor
106115
podsReadyTracking bool
@@ -142,6 +151,7 @@ func (c *Cache) newClusterQueue(log logr.Logger, cq *kueue.ClusterQueue) (*clust
142151
resourceNode: NewResourceNode(),
143152
tasCache: &c.tasCache,
144153
AdmissionScope: cq.Spec.AdmissionScope,
154+
roleTracker: c.roleTracker,
145155

146156
workloadsNotAccountedForTAS: sets.New[workload.Reference](),
147157
}
@@ -359,7 +369,7 @@ func (c *Cache) TerminateClusterQueue(name kueue.ClusterQueueReference) {
359369
defer c.Unlock()
360370
if cq := c.hm.ClusterQueue(name); cq != nil {
361371
cq.Status = terminating
362-
metrics.ReportClusterQueueStatus(cq.Name, cq.Status)
372+
metrics.ReportClusterQueueStatus(cq.Name, cq.Status, c.roleTracker)
363373
}
364374
}
365375

pkg/cache/scheduler/clusterqueue.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import (
3939
"sigs.k8s.io/kueue/pkg/util/admissioncheck"
4040
"sigs.k8s.io/kueue/pkg/util/api"
4141
"sigs.k8s.io/kueue/pkg/util/queue"
42+
"sigs.k8s.io/kueue/pkg/util/roletracker"
4243
stringsutils "sigs.k8s.io/kueue/pkg/util/strings"
4344
"sigs.k8s.io/kueue/pkg/workload"
4445
)
@@ -88,6 +89,7 @@ type clusterQueue struct {
8889

8990
workloadsNotAccountedForTAS sets.Set[workload.Reference]
9091
AdmissionScope *kueue.AdmissionScope
92+
roleTracker *roletracker.RoleTracker
9193
}
9294

9395
func (c *clusterQueue) GetName() kueue.ClusterQueueReference {
@@ -228,7 +230,7 @@ func (c *clusterQueue) updateQueueStatus(log logr.Logger) {
228230
if status != c.Status {
229231
log.V(3).Info("Updating status in cache", "clusterQueue", c.Name, "newStatus", status, "oldStatus", c.Status)
230232
c.Status = status
231-
metrics.ReportClusterQueueStatus(c.Name, c.Status)
233+
metrics.ReportClusterQueueStatus(c.Name, c.Status, c.roleTracker)
232234
}
233235
}
234236

@@ -464,14 +466,16 @@ func (c *clusterQueue) deleteWorkload(log logr.Logger, w *kueue.Workload) {
464466
}
465467

466468
func (c *clusterQueue) reportActiveWorkloads() {
467-
metrics.AdmittedActiveWorkloads.WithLabelValues(string(c.Name)).Set(float64(c.admittedWorkloadsCount))
468-
metrics.ReservingActiveWorkloads.WithLabelValues(string(c.Name)).Set(float64(len(c.Workloads)))
469+
role := roletracker.GetMetricsRole(c.roleTracker)
470+
metrics.AdmittedActiveWorkloads.WithLabelValues(string(c.Name), role).Set(float64(c.admittedWorkloadsCount))
471+
metrics.ReservingActiveWorkloads.WithLabelValues(string(c.Name), role).Set(float64(len(c.Workloads)))
469472
}
470473

471-
func (q *LocalQueue) reportActiveWorkloads() {
474+
func (q *LocalQueue) reportActiveWorkloads(roleTracker *roletracker.RoleTracker) {
472475
namespace, name := queue.MustParseLocalQueueReference(q.key)
473-
metrics.LocalQueueAdmittedActiveWorkloads.WithLabelValues(string(name), namespace).Set(float64(q.admittedWorkloads))
474-
metrics.LocalQueueReservingActiveWorkloads.WithLabelValues(string(name), namespace).Set(float64(q.reservingWorkloads))
476+
role := roletracker.GetMetricsRole(roleTracker)
477+
metrics.LocalQueueAdmittedActiveWorkloads.WithLabelValues(string(name), namespace, role).Set(float64(q.admittedWorkloads))
478+
metrics.LocalQueueReservingActiveWorkloads.WithLabelValues(string(name), namespace, role).Set(float64(q.reservingWorkloads))
475479
}
476480

477481
// updateWorkloadUsage updates the usage of the ClusterQueue for the workload
@@ -501,7 +505,7 @@ func (c *clusterQueue) updateWorkloadUsage(log logr.Logger, wi *workload.Info, o
501505
lq.admittedWorkloads += op.asSignedOne()
502506
}
503507
if features.Enabled(features.LocalQueueMetrics) {
504-
lq.reportActiveWorkloads()
508+
lq.reportActiveWorkloads(c.roleTracker)
505509
}
506510
}
507511
}
@@ -571,7 +575,7 @@ func (c *clusterQueue) addLocalQueue(q *kueue.LocalQueue) error {
571575
}
572576
c.localQueues[qKey] = qImpl
573577
if features.Enabled(features.LocalQueueMetrics) {
574-
qImpl.reportActiveWorkloads()
578+
qImpl.reportActiveWorkloads(c.roleTracker)
575579
}
576580
return nil
577581
}

pkg/controller/core/admissioncheck_controller.go

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import (
4040
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta2"
4141
qcache "sigs.k8s.io/kueue/pkg/cache/queue"
4242
schdcache "sigs.k8s.io/kueue/pkg/cache/scheduler"
43+
"sigs.k8s.io/kueue/pkg/util/roletracker"
4344
"sigs.k8s.io/kueue/pkg/util/slices"
4445
)
4546

@@ -49,12 +50,13 @@ type AdmissionCheckUpdateWatcher interface {
4950

5051
// AdmissionCheckReconciler reconciles a AdmissionCheck object
5152
type AdmissionCheckReconciler struct {
52-
log logr.Logger
53-
qManager *qcache.Manager
54-
client client.Client
55-
cache *schdcache.Cache
56-
cqUpdateCh chan event.GenericEvent
57-
watchers []AdmissionCheckUpdateWatcher
53+
log logr.Logger
54+
qManager *qcache.Manager
55+
client client.Client
56+
cache *schdcache.Cache
57+
cqUpdateCh chan event.GenericEvent
58+
watchers []AdmissionCheckUpdateWatcher
59+
roleTracker *roletracker.RoleTracker
5860
}
5961

6062
var _ reconcile.Reconciler = (*AdmissionCheckReconciler)(nil)
@@ -64,13 +66,15 @@ func NewAdmissionCheckReconciler(
6466
client client.Client,
6567
qMgr *qcache.Manager,
6668
cache *schdcache.Cache,
69+
roleTracker *roletracker.RoleTracker,
6770
) *AdmissionCheckReconciler {
6871
return &AdmissionCheckReconciler{
69-
log: ctrl.Log.WithName("admissioncheck-reconciler"),
70-
qManager: qMgr,
71-
client: client,
72-
cache: cache,
73-
cqUpdateCh: make(chan event.GenericEvent, updateChBuffer),
72+
log: ctrl.Log.WithName("admissioncheck-reconciler"),
73+
qManager: qMgr,
74+
client: client,
75+
cache: cache,
76+
cqUpdateCh: make(chan event.GenericEvent, updateChBuffer),
77+
roleTracker: roleTracker,
7478
}
7579
}
7680

@@ -239,6 +243,7 @@ func (r *AdmissionCheckReconciler) SetupWithManager(mgr ctrl.Manager, cfg *confi
239243
WithOptions(controller.Options{
240244
NeedLeaderElection: ptr.To(false),
241245
MaxConcurrentReconciles: mgr.GetControllerOptions().GroupKindConcurrency[kueue.GroupVersion.WithKind("AdmissionCheck").GroupKind().String()],
246+
LogConstructor: roletracker.NewLogConstructor(r.roleTracker, "admissioncheck-reconciler"),
242247
}).
243248
WatchesRawSource(source.Channel(r.cqUpdateCh, &h)).
244249
Complete(WithLeadingManager(mgr, r, &kueue.AdmissionCheck{}, cfg))

0 commit comments

Comments (0)