Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion apis/config/v1beta1/configuration_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,18 @@ type ControllerMetrics struct {
// EnableClusterQueueResources, if true the cluster queue resource usage and quotas
// metrics will be reported.
// +optional
EnableClusterQueueResources bool `json:"enableClusterQueueResources,omitempty"`
EnableClusterQueueResources bool `json:"enableClusterQueueResources,omitempty"`
CustomMetricTags CustomMetricTags `json:"customMetricTags,omitempty"`
}

type CustomMetricTags struct {
ClusterQueue []CustomMetricTag `json:"clusterQueue,omitempty"`
LocalQueue []CustomMetricTag `json:"localQueue,omitempty"`
}

type CustomMetricTag struct {
ResourceTag string `json:"resourceTag,omitempty"`
OverrideMetricTag *string `json:"overrideMetricTag,omitempty"`
}

// ControllerHealth defines the health configs.
Expand Down
4 changes: 3 additions & 1 deletion cmd/kueue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,9 @@ func main() {
}
options.Metrics = metricsServerOptions

metrics.Register()
metrics.Register(metrics.Configuration{
CustomTags: cfg.Metrics.CustomMetricTags,
})

kubeConfig := ctrl.GetConfigOrDie()
if kubeConfig.UserAgent == "" {
Expand Down
9 changes: 5 additions & 4 deletions pkg/controller/core/clusterqueue_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ func recordResourceMetrics(cq *kueue.ClusterQueue) {
nominal := resource.QuantityToFloat(&r.NominalQuota)
borrow := resource.QuantityToFloat(r.BorrowingLimit)
lend := resource.QuantityToFloat(r.LendingLimit)
metrics.ReportClusterQueueQuotas(cq.Spec.CohortName, cq.Name, string(fq.Name), string(r.Name), nominal, borrow, lend)
metrics.ReportClusterQueueQuotas(cq.Spec.CohortName, *cq, string(fq.Name), string(r.Name), nominal, borrow, lend)
}
}
}
Expand All @@ -366,15 +366,16 @@ func recordResourceMetrics(cq *kueue.ClusterQueue) {
fr := &cq.Status.FlavorsReservation[fri]
for ri := range fr.Resources {
r := &fr.Resources[ri]
metrics.ReportClusterQueueResourceReservations(cq.Spec.CohortName, cq.Name, string(fr.Name), string(r.Name), resource.QuantityToFloat(&r.Total))

metrics.ReportClusterQueueResourceReservations(cq.Spec.CohortName, *cq, string(fr.Name), string(r.Name), resource.QuantityToFloat(&r.Total))
}
}

for fui := range cq.Status.FlavorsUsage {
fu := &cq.Status.FlavorsUsage[fui]
for ri := range fu.Resources {
r := &fu.Resources[ri]
metrics.ReportClusterQueueResourceUsage(cq.Spec.CohortName, cq.Name, string(fu.Name), string(r.Name), resource.QuantityToFloat(&r.Total))
metrics.ReportClusterQueueResourceUsage(cq.Spec.CohortName, *cq, string(fu.Name), string(r.Name), resource.QuantityToFloat(&r.Total))
}
}
}
Expand Down Expand Up @@ -567,7 +568,7 @@ func (r *ClusterQueueReconciler) updateCqStatusIfChanged(
if weightedShare == math.Inf(1) {
weightedShare = math.NaN()
}
metrics.ReportClusterQueueWeightedShare(cq.Name, string(cq.Spec.CohortName), weightedShare)
metrics.ReportClusterQueueWeightedShare(*cq, string(cq.Spec.CohortName), weightedShare)
}
if cq.Status.FairSharing == nil {
cq.Status.FairSharing = &kueue.FairSharingStatus{}
Expand Down
110 changes: 110 additions & 0 deletions pkg/metrics/custom_tag.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package metrics

import (
"sync"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/kueue/apis/config/v1beta1"
)

var customTagsConf = CustomTagsConfiguration{}

type CustomTagsConfiguration struct {
ClusterQueue CustomTagsObjectConfiguration
LocalQueue CustomTagsObjectConfiguration
}

type CustomTagsObjectConfiguration struct {
ResourceTags []string
MetricTags []string
}

func getPrometheusTag(tag v1beta1.CustomMetricTag) string {
if tag.OverrideMetricTag != nil {
return *tag.OverrideMetricTag
}
return tag.ResourceTag
}

type Metric[T any] struct {
Name string
Help string
StandardLabels []string
Buckets []float64
globalVariable **T
clusterQueueCustomLabels bool
localQueueCustomLabels bool
}

type MetricsGroup[T any] struct {
metrics []Metric[T]
initFunc func(Metric[T], []string) *T
once sync.Once
}

func (m *MetricsGroup[T]) Metrics() []Metric[T] {
return m.metrics
}

func (m *MetricsGroup[T]) InitFunc() func(Metric[T], []string) *T {
return m.initFunc
}

func (m *MetricsGroup[T]) init() {
for i, metric := range m.metrics {
labels := metric.StandardLabels
if metric.clusterQueueCustomLabels {
labels = append(metric.StandardLabels, customTagsConf.ClusterQueue.MetricTags...)
}
if metric.localQueueCustomLabels {
labels = append(metric.StandardLabels, customTagsConf.LocalQueue.MetricTags...)
}
*m.metrics[i].globalVariable = m.initFunc(metric, labels)
}
}

func getResourceTagValues(cq metav1.Object, customTags CustomTagsObjectConfiguration) []string {
tags := []string{}
for _, tag := range customTags.ResourceTags {
t, ok := cq.GetLabels()[tag]
if !ok {
t = cq.GetAnnotations()[tag]
}
tags = append(tags, t)
}
return tags
}

func getCustomTagsObjectConfiguration(customMetricTags []v1beta1.CustomMetricTag) CustomTagsObjectConfiguration {
customTagsObjectConf := CustomTagsObjectConfiguration{
ResourceTags: make([]string, 0, len(customMetricTags)),
MetricTags: make([]string, 0, len(customMetricTags)),
}
for _, t := range customMetricTags {
customTagsObjectConf.ResourceTags = append(customTagsObjectConf.ResourceTags, t.ResourceTag)
customTagsObjectConf.MetricTags = append(customTagsObjectConf.MetricTags, getPrometheusTag(t))
}
return customTagsObjectConf
}

func getConfiguration(customMetricTags *v1beta1.CustomMetricTags) CustomTagsConfiguration {
if customMetricTags == nil {
return CustomTagsConfiguration{}
}
return CustomTagsConfiguration{
ClusterQueue: getCustomTagsObjectConfiguration(customMetricTags.ClusterQueue),
LocalQueue: getCustomTagsObjectConfiguration(customMetricTags.LocalQueue),
}
}

func (m *MetricsGroup[T]) Init() {
m.once.Do(m.init)
}

func initCustomTagsMetric(customMetricsTagsConfiguration *v1beta1.CustomMetricTags) {
customTagsConf = getConfiguration(customMetricsTagsConfiguration)

customTagsCounterMetrics.Init()
customTagsGaugeMetrics.Init()
customTagsHistogramMetrics.Init()
}
213 changes: 213 additions & 0 deletions pkg/metrics/custom_tag_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
package metrics

import (
"github.com/google/go-cmp/cmp"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
configv1beta1 "sigs.k8s.io/kueue/apis/config/v1beta1"
"testing"
)

func TestGetPrometheusTag(t *testing.T) {
overrideTag := "custom_tag"
tests := []struct {
name string
tag configv1beta1.CustomMetricTag
want string
}{
{
name: "no override, use resource tag",
tag: configv1beta1.CustomMetricTag{
ResourceTag: "resource_label",
},
want: "resource_label",
},
{
name: "with override, use override tag",
tag: configv1beta1.CustomMetricTag{
ResourceTag: "resource_label",
OverrideMetricTag: &overrideTag,
},
want: "custom_tag",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := getPrometheusTag(tt.tag)
if got != tt.want {
t.Errorf("getPrometheusTag() = %v, want %v", got, tt.want)
}
})
}
}

func TestGetConfiguration(t *testing.T) {
overrideTag1 := "custom_cq_tag"
overrideTag2 := "custom_lq_tag"

tests := []struct {
name string
customMetricTags configv1beta1.CustomMetricTags
want CustomTagsConfiguration
}{
{
name: "cluster queue tags only",
customMetricTags: configv1beta1.CustomMetricTags{
ClusterQueue: []configv1beta1.CustomMetricTag{
{ResourceTag: "team"},
{ResourceTag: "env"},
},
LocalQueue: []configv1beta1.CustomMetricTag{},
},
want: CustomTagsConfiguration{
ClusterQueue: CustomTagsObjectConfiguration{
ResourceTags: []string{"team", "env"},
MetricTags: []string{"team", "env"},
},
LocalQueue: CustomTagsObjectConfiguration{
ResourceTags: []string{},
MetricTags: []string{},
},
},
},
{
name: "local queue tags only",
customMetricTags: configv1beta1.CustomMetricTags{
ClusterQueue: []configv1beta1.CustomMetricTag{},
LocalQueue: []configv1beta1.CustomMetricTag{
{ResourceTag: "project"},
},
},
want: CustomTagsConfiguration{
ClusterQueue: CustomTagsObjectConfiguration{
ResourceTags: []string{},
MetricTags: []string{},
},
LocalQueue: CustomTagsObjectConfiguration{
ResourceTags: []string{"project"},
MetricTags: []string{"project"},
},
},
},
{
name: "both cluster and local queue tags with overrides",
customMetricTags: configv1beta1.CustomMetricTags{
ClusterQueue: []configv1beta1.CustomMetricTag{
{ResourceTag: "team", OverrideMetricTag: &overrideTag1},
{ResourceTag: "env"},
},
LocalQueue: []configv1beta1.CustomMetricTag{
{ResourceTag: "project", OverrideMetricTag: &overrideTag2},
},
},
want: CustomTagsConfiguration{
ClusterQueue: CustomTagsObjectConfiguration{
ResourceTags: []string{"team", "env"},
MetricTags: []string{"custom_cq_tag", "env"},
},
LocalQueue: CustomTagsObjectConfiguration{
ResourceTags: []string{"project"},
MetricTags: []string{"custom_lq_tag"},
},
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := getConfiguration(&tt.customMetricTags)
if diff := cmp.Diff(tt.want, got); diff != "" {
t.Errorf("getConfiguration() mismatch (-want +got):\n%s", diff)
}
})
}
}

func TestGetResourceTagValues(t *testing.T) {
tests := []struct {
name string
obj metav1.Object
customTags CustomTagsObjectConfiguration
want []string
}{
{
name: "empty tags configuration",
obj: &metav1.ObjectMeta{
Name: "test-queue",
Labels: map[string]string{
"team": "platform",
},
},
customTags: CustomTagsObjectConfiguration{
ResourceTags: []string{},
},
want: []string{},
},
{
name: "tags from labels only",
obj: &metav1.ObjectMeta{
Name: "test-queue",
Labels: map[string]string{
"team": "platform",
"env": "production",
},
},
customTags: CustomTagsObjectConfiguration{
ResourceTags: []string{"team", "env"},
},
want: []string{"platform", "production"},
},
{
name: "tags from annotations only",
obj: &metav1.ObjectMeta{
Name: "test-queue",
Annotations: map[string]string{
"team": "data",
"cost": "high",
},
},
customTags: CustomTagsObjectConfiguration{
ResourceTags: []string{"team", "cost"},
},
want: []string{"data", "high"},
},
{
name: "labels take precedence over annotations",
obj: &metav1.ObjectMeta{
Name: "test-queue",
Labels: map[string]string{
"team": "platform",
},
Annotations: map[string]string{
"team": "data",
},
},
customTags: CustomTagsObjectConfiguration{
ResourceTags: []string{"team"},
},
want: []string{"platform"},
},
{
name: "missing tags return empty strings",
obj: &metav1.ObjectMeta{
Name: "test-queue",
Labels: map[string]string{
"team": "platform",
},
},
customTags: CustomTagsObjectConfiguration{
ResourceTags: []string{"team", "missing_tag"},
},
want: []string{"platform", ""},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := getResourceTagValues(tt.obj, tt.customTags)

if diff := cmp.Diff(tt.want, got); diff != "" {
t.Errorf("getResourceTagValues() mismatch (-want +got):\n%s", diff)
}
})
}
}
Loading