Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions cmd/kueue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,14 @@ func main() {
} else {
close(certsReady)
}
cacheOptions := []schdcache.Option{schdcache.WithPodsReadyTracking(blockForPodsReady(&cfg))}
queueOptions := []qcache.Option{qcache.WithPodsReadyRequeuingTimestamp(podsReadyRequeuingTimestamp(&cfg))}
cacheOptions := []schdcache.Option{
schdcache.WithPodsReadyTracking(blockForPodsReady(&cfg)),
schdcache.WithRoleTracker(roleTracker),
}
queueOptions := []qcache.Option{
qcache.WithPodsReadyRequeuingTimestamp(podsReadyRequeuingTimestamp(&cfg)),
qcache.WithRoleTracker(roleTracker),
}
if cfg.Resources != nil && len(cfg.Resources.ExcludeResourcePrefixes) > 0 {
cacheOptions = append(cacheOptions, schdcache.WithExcludedResourcePrefixes(cfg.Resources.ExcludeResourcePrefixes))
queueOptions = append(queueOptions, qcache.WithExcludedResourcePrefixes(cfg.Resources.ExcludeResourcePrefixes))
Expand Down
14 changes: 12 additions & 2 deletions pkg/cache/queue/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"sigs.k8s.io/kueue/pkg/metrics"
afs "sigs.k8s.io/kueue/pkg/util/admissionfairsharing"
"sigs.k8s.io/kueue/pkg/util/queue"
"sigs.k8s.io/kueue/pkg/util/roletracker"
"sigs.k8s.io/kueue/pkg/workload"
)

Expand Down Expand Up @@ -87,6 +88,13 @@ func WithResourceTransformations(transforms []config.ResourceTransformation) Opt
}
}

// WithRoleTracker sets the roleTracker for HA metrics.
func WithRoleTracker(tracker *roletracker.RoleTracker) Option {
return func(m *Manager) {
m.roleTracker = tracker
}
}

// SetDRAReconcileChannel sets the DRA reconcile channel after manager creation.
func (m *Manager) SetDRAReconcileChannel(ch chan<- event.TypedGenericEvent[*kueue.Workload]) {
m.draReconcileChannel = ch
Expand Down Expand Up @@ -124,6 +132,8 @@ type Manager struct {
workloadUpdateWatchers []WorkloadUpdateWatcher

draReconcileChannel chan<- event.TypedGenericEvent[*kueue.Workload]

roleTracker *roletracker.RoleTracker
}

func NewManager(client client.Client, checker StatusChecker, options ...Option) *Manager {
Expand Down Expand Up @@ -717,7 +727,7 @@ func (m *Manager) reportLQPendingWorkloads(lq *LocalQueue) {
metrics.ReportLocalQueuePendingWorkloads(metrics.LocalQueueReference{
Name: lqName,
Namespace: namespace,
}, active, inadmissible)
}, active, inadmissible, m.roleTracker)
}

func (m *Manager) reportPendingWorkloads(cqName kueue.ClusterQueueReference, cq *ClusterQueue) {
Expand All @@ -726,7 +736,7 @@ func (m *Manager) reportPendingWorkloads(cqName kueue.ClusterQueueReference, cq
inadmissible += active
active = 0
}
metrics.ReportPendingWorkloads(cqName, active, inadmissible)
metrics.ReportPendingWorkloads(cqName, active, inadmissible, m.roleTracker)
}

func (m *Manager) GetClusterQueueNames() []kueue.ClusterQueueReference {
Expand Down
13 changes: 12 additions & 1 deletion pkg/cache/scheduler/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"sigs.k8s.io/kueue/pkg/metrics"
"sigs.k8s.io/kueue/pkg/resources"
"sigs.k8s.io/kueue/pkg/util/queue"
"sigs.k8s.io/kueue/pkg/util/roletracker"
"sigs.k8s.io/kueue/pkg/workload"
)

Expand Down Expand Up @@ -95,6 +96,13 @@ func WithAdmissionFairSharing(afs *config.AdmissionFairSharing) Option {
}
}

// WithRoleTracker sets the roleTracker for HA metrics.
func WithRoleTracker(tracker *roletracker.RoleTracker) Option {
return func(c *Cache) {
c.roleTracker = tracker
}
}

// Cache keeps track of the Workloads that got admitted through ClusterQueues.
type Cache struct {
sync.RWMutex
Expand All @@ -112,6 +120,8 @@ type Cache struct {
hm hierarchy.Manager[*clusterQueue, *cohort]

tasCache tasCache

roleTracker *roletracker.RoleTracker
}

func New(client client.Client, options ...Option) *Cache {
Expand Down Expand Up @@ -144,6 +154,7 @@ func (c *Cache) newClusterQueue(log logr.Logger, cq *kueue.ClusterQueue) (*clust
AdmissionScope: cq.Spec.AdmissionScope,

workloadsNotAccountedForTAS: sets.New[workload.Reference](),
roleTracker: c.roleTracker,
}
c.hm.AddClusterQueue(cqImpl)
c.hm.UpdateClusterQueueEdge(kueue.ClusterQueueReference(cq.Name), cq.Spec.CohortName)
Expand Down Expand Up @@ -359,7 +370,7 @@ func (c *Cache) TerminateClusterQueue(name kueue.ClusterQueueReference) {
defer c.Unlock()
if cq := c.hm.ClusterQueue(name); cq != nil {
cq.Status = terminating
metrics.ReportClusterQueueStatus(cq.Name, cq.Status)
metrics.ReportClusterQueueStatus(cq.Name, cq.Status, c.roleTracker)
}
}

Expand Down
21 changes: 13 additions & 8 deletions pkg/cache/scheduler/clusterqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"sigs.k8s.io/kueue/pkg/util/admissioncheck"
"sigs.k8s.io/kueue/pkg/util/api"
"sigs.k8s.io/kueue/pkg/util/queue"
"sigs.k8s.io/kueue/pkg/util/roletracker"
stringsutils "sigs.k8s.io/kueue/pkg/util/strings"
"sigs.k8s.io/kueue/pkg/workload"
)
Expand Down Expand Up @@ -88,6 +89,8 @@ type clusterQueue struct {

workloadsNotAccountedForTAS sets.Set[workload.Reference]
AdmissionScope *kueue.AdmissionScope

roleTracker *roletracker.RoleTracker
}

func (c *clusterQueue) GetName() kueue.ClusterQueueReference {
Expand Down Expand Up @@ -228,7 +231,7 @@ func (c *clusterQueue) updateQueueStatus(log logr.Logger) {
if status != c.Status {
log.V(3).Info("Updating status in cache", "clusterQueue", c.Name, "newStatus", status, "oldStatus", c.Status)
c.Status = status
metrics.ReportClusterQueueStatus(c.Name, c.Status)
metrics.ReportClusterQueueStatus(c.Name, c.Status, c.roleTracker)
}
}

Expand Down Expand Up @@ -464,14 +467,16 @@ func (c *clusterQueue) deleteWorkload(log logr.Logger, w *kueue.Workload) {
}

func (c *clusterQueue) reportActiveWorkloads() {
metrics.AdmittedActiveWorkloads.WithLabelValues(string(c.Name)).Set(float64(c.admittedWorkloadsCount))
metrics.ReservingActiveWorkloads.WithLabelValues(string(c.Name)).Set(float64(len(c.Workloads)))
role := roletracker.GetRole(c.roleTracker)
metrics.AdmittedActiveWorkloads.WithLabelValues(string(c.Name), role).Set(float64(c.admittedWorkloadsCount))
metrics.ReservingActiveWorkloads.WithLabelValues(string(c.Name), role).Set(float64(len(c.Workloads)))
}

func (q *LocalQueue) reportActiveWorkloads() {
func (q *LocalQueue) reportActiveWorkloads(tracker *roletracker.RoleTracker) {
role := roletracker.GetRole(tracker)
namespace, name := queue.MustParseLocalQueueReference(q.key)
metrics.LocalQueueAdmittedActiveWorkloads.WithLabelValues(string(name), namespace).Set(float64(q.admittedWorkloads))
metrics.LocalQueueReservingActiveWorkloads.WithLabelValues(string(name), namespace).Set(float64(q.reservingWorkloads))
metrics.LocalQueueAdmittedActiveWorkloads.WithLabelValues(string(name), namespace, role).Set(float64(q.admittedWorkloads))
metrics.LocalQueueReservingActiveWorkloads.WithLabelValues(string(name), namespace, role).Set(float64(q.reservingWorkloads))
}

// updateWorkloadUsage updates the usage of the ClusterQueue for the workload
Expand Down Expand Up @@ -501,7 +506,7 @@ func (c *clusterQueue) updateWorkloadUsage(log logr.Logger, wi *workload.Info, o
lq.admittedWorkloads += op.asSignedOne()
}
if features.Enabled(features.LocalQueueMetrics) {
lq.reportActiveWorkloads()
lq.reportActiveWorkloads(c.roleTracker)
}
}
}
Expand Down Expand Up @@ -571,7 +576,7 @@ func (c *clusterQueue) addLocalQueue(q *kueue.LocalQueue) error {
}
c.localQueues[qKey] = qImpl
if features.Enabled(features.LocalQueueMetrics) {
qImpl.reportActiveWorkloads()
qImpl.reportActiveWorkloads(c.roleTracker)
}
return nil
}
Expand Down
18 changes: 9 additions & 9 deletions pkg/controller/core/clusterqueue_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ func (r *ClusterQueueReconciler) Create(e event.TypedCreateEvent[*kueue.ClusterQ
}

if r.reportResourceMetrics {
recordResourceMetrics(e.Object)
recordResourceMetrics(e.Object, r.roleTracker)
}

return true
Expand Down Expand Up @@ -347,7 +347,7 @@ func (r *ClusterQueueReconciler) Update(e event.TypedUpdateEvent[*kueue.ClusterQ
}

if r.reportResourceMetrics {
updateResourceMetrics(e.ObjectOld, e.ObjectNew)
updateResourceMetrics(e.ObjectOld, e.ObjectNew, r.roleTracker)
}
return true
}
Expand All @@ -357,7 +357,7 @@ func (r *ClusterQueueReconciler) Generic(e event.TypedGenericEvent[*kueue.Cluste
return true
}

func recordResourceMetrics(cq *kueue.ClusterQueue) {
func recordResourceMetrics(cq *kueue.ClusterQueue, tracker *roletracker.RoleTracker) {
for rgi := range cq.Spec.ResourceGroups {
rg := &cq.Spec.ResourceGroups[rgi]
for fqi := range rg.Flavors {
Expand All @@ -367,7 +367,7 @@ func recordResourceMetrics(cq *kueue.ClusterQueue) {
nominal := resource.QuantityToFloat(&r.NominalQuota)
borrow := resource.QuantityToFloat(r.BorrowingLimit)
lend := resource.QuantityToFloat(r.LendingLimit)
metrics.ReportClusterQueueQuotas(cq.Spec.CohortName, cq.Name, string(fq.Name), string(r.Name), nominal, borrow, lend)
metrics.ReportClusterQueueQuotas(cq.Spec.CohortName, cq.Name, string(fq.Name), string(r.Name), nominal, borrow, lend, tracker)
}
}
}
Expand All @@ -376,28 +376,28 @@ func recordResourceMetrics(cq *kueue.ClusterQueue) {
fr := &cq.Status.FlavorsReservation[fri]
for ri := range fr.Resources {
r := &fr.Resources[ri]
metrics.ReportClusterQueueResourceReservations(cq.Spec.CohortName, cq.Name, string(fr.Name), string(r.Name), resource.QuantityToFloat(&r.Total))
metrics.ReportClusterQueueResourceReservations(cq.Spec.CohortName, cq.Name, string(fr.Name), string(r.Name), resource.QuantityToFloat(&r.Total), tracker)
}
}

for fui := range cq.Status.FlavorsUsage {
fu := &cq.Status.FlavorsUsage[fui]
for ri := range fu.Resources {
r := &fu.Resources[ri]
metrics.ReportClusterQueueResourceUsage(cq.Spec.CohortName, cq.Name, string(fu.Name), string(r.Name), resource.QuantityToFloat(&r.Total))
metrics.ReportClusterQueueResourceUsage(cq.Spec.CohortName, cq.Name, string(fu.Name), string(r.Name), resource.QuantityToFloat(&r.Total), tracker)
}
}
}

func updateResourceMetrics(oldCq, newCq *kueue.ClusterQueue) {
func updateResourceMetrics(oldCq, newCq *kueue.ClusterQueue, tracker *roletracker.RoleTracker) {
// if the cohort changed, drop all the old metrics
if oldCq.Spec.CohortName != newCq.Spec.CohortName {
metrics.ClearClusterQueueResourceMetrics(oldCq.Name)
} else {
// selective remove
clearOldResourceQuotas(oldCq, newCq)
}
recordResourceMetrics(newCq)
recordResourceMetrics(newCq, tracker)
}

func clearOldResourceQuotas(oldCq, newCq *kueue.ClusterQueue) {
Expand Down Expand Up @@ -578,7 +578,7 @@ func (r *ClusterQueueReconciler) updateCqStatusIfChanged(
if weightedShare == math.Inf(1) {
weightedShare = math.NaN()
}
metrics.ReportClusterQueueWeightedShare(cq.Name, string(cq.Spec.CohortName), weightedShare)
metrics.ReportClusterQueueWeightedShare(cq.Name, string(cq.Spec.CohortName), weightedShare, r.roleTracker)
}
if cq.Status.FairSharing == nil {
cq.Status.FairSharing = &kueue.FairSharingStatus{}
Expand Down
6 changes: 4 additions & 2 deletions pkg/controller/core/clusterqueue_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
qcache "sigs.k8s.io/kueue/pkg/cache/queue"
schdcache "sigs.k8s.io/kueue/pkg/cache/scheduler"
"sigs.k8s.io/kueue/pkg/metrics"
"sigs.k8s.io/kueue/pkg/util/roletracker"
utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
testingmetrics "sigs.k8s.io/kueue/pkg/util/testing/metrics"
utiltestingapi "sigs.k8s.io/kueue/pkg/util/testing/v1beta2"
Expand Down Expand Up @@ -266,6 +267,7 @@ func resourceDataPoint(cohort, name, flavor, res string, v float64) testingmetri
"cluster_queue": name,
"flavor": flavor,
"resource": res,
"replica_role": roletracker.RoleStandalone,
},
Value: v,
}
Expand Down Expand Up @@ -492,14 +494,14 @@ func TestRecordResourceMetrics(t *testing.T) {

for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
recordResourceMetrics(tc.queue)
recordResourceMetrics(tc.queue, nil)
gotMetrics := allMetricsForQueue(tc.queue.Name)
if diff := cmp.Diff(tc.wantMetrics, gotMetrics, opts...); len(diff) != 0 {
t.Errorf("Unexpected metrics (-want,+got):\n%s", diff)
}

if tc.updatedQueue != nil {
updateResourceMetrics(tc.queue, tc.updatedQueue)
updateResourceMetrics(tc.queue, tc.updatedQueue, nil)
gotMetricsAfterUpdate := allMetricsForQueue(tc.queue.Name)
if diff := cmp.Diff(tc.wantUpdatedMetrics, gotMetricsAfterUpdate, opts...); len(diff) != 0 {
t.Errorf("Unexpected metrics (-want,+got):\n%s", diff)
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/core/cohort_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func (r *CohortReconciler) updateCohortStatusIfChanged(ctx context.Context, coho
}

if r.fairSharingEnabled {
metrics.ReportCohortWeightedShare(cohort.Name, stats.WeightedShare)
metrics.ReportCohortWeightedShare(cohort.Name, stats.WeightedShare, r.roleTracker)
if cohort.Status.FairSharing == nil {
cohort.Status.FairSharing = &kueue.FairSharingStatus{}
}
Expand Down
16 changes: 8 additions & 8 deletions pkg/controller/core/localqueue_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func (r *LocalQueueReconciler) Create(e event.TypedCreateEvent[*kueue.LocalQueue
}

if features.Enabled(features.LocalQueueMetrics) {
recordLocalQueueUsageMetrics(e.Object)
recordLocalQueueUsageMetrics(e.Object, r.roleTracker)
}

return true
Expand Down Expand Up @@ -250,7 +250,7 @@ func (r *LocalQueueReconciler) Update(e event.TypedUpdateEvent[*kueue.LocalQueue
log.V(2).Info("Queue update event")

if features.Enabled(features.LocalQueueMetrics) {
updateLocalQueueResourceMetrics(e.ObjectNew)
updateLocalQueueResourceMetrics(e.ObjectNew, r.roleTracker)
}

oldStopPolicy := ptr.Deref(e.ObjectOld.Spec.StopPolicy, kueue.None)
Expand Down Expand Up @@ -366,22 +366,22 @@ func localQueueReferenceFromLocalQueue(lq *kueue.LocalQueue) metrics.LocalQueueR
}
}

func recordLocalQueueUsageMetrics(queue *kueue.LocalQueue) {
func recordLocalQueueUsageMetrics(queue *kueue.LocalQueue, tracker *roletracker.RoleTracker) {
for _, flavor := range queue.Status.FlavorsUsage {
for _, r := range flavor.Resources {
metrics.ReportLocalQueueResourceUsage(localQueueReferenceFromLocalQueue(queue), string(flavor.Name), string(r.Name), resource.QuantityToFloat(&r.Total))
metrics.ReportLocalQueueResourceUsage(localQueueReferenceFromLocalQueue(queue), string(flavor.Name), string(r.Name), resource.QuantityToFloat(&r.Total), tracker)
}
}
for _, flavor := range queue.Status.FlavorsReservation {
for _, r := range flavor.Resources {
metrics.ReportLocalQueueResourceReservations(localQueueReferenceFromLocalQueue(queue), string(flavor.Name), string(r.Name), resource.QuantityToFloat(&r.Total))
metrics.ReportLocalQueueResourceReservations(localQueueReferenceFromLocalQueue(queue), string(flavor.Name), string(r.Name), resource.QuantityToFloat(&r.Total), tracker)
}
}
}

func updateLocalQueueResourceMetrics(queue *kueue.LocalQueue) {
func updateLocalQueueResourceMetrics(queue *kueue.LocalQueue, tracker *roletracker.RoleTracker) {
metrics.ClearLocalQueueResourceMetrics(localQueueReferenceFromLocalQueue(queue))
recordLocalQueueUsageMetrics(queue)
recordLocalQueueUsageMetrics(queue, tracker)
}

func (r *LocalQueueReconciler) Generic(e event.TypedGenericEvent[*kueue.LocalQueue]) bool {
Expand Down Expand Up @@ -538,7 +538,7 @@ func (r *LocalQueueReconciler) UpdateStatusIfChanged(
metrics.ReportLocalQueueStatus(metrics.LocalQueueReference{
Name: kueue.LocalQueueName(queue.Name),
Namespace: queue.Namespace,
}, conditionStatus)
}, conditionStatus, r.roleTracker)
}
}
if !equality.Semantic.DeepEqual(oldStatus, queue.Status) {
Expand Down
Loading