Skip to content

Commit

Permalink
add metrics for workloads count
Browse files Browse the repository at this point in the history
  • Loading branch information
kannon92 committed Feb 21, 2025
1 parent 260070f commit ff328e6
Show file tree
Hide file tree
Showing 3 changed files with 272 additions and 0 deletions.
4 changes: 4 additions & 0 deletions cmd/kueue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,10 @@ func setupControllers(ctx context.Context, mgr ctrl.Manager, cCache *cache.Cache
setupLog.Error(err, "Unable to create controller or webhook", "kubernetesVersion", serverVersionFetcher.GetServerVersion())
os.Exit(1)
}

// Set metrics for workloads enablement
metrics.ReportExternalFrameworksSupported(cfg.Integrations.ExternalFrameworks)
metrics.ReportIntegrationsFrameworksSupported(cfg.Integrations.Frameworks)
}

// setupProbeEndpoints registers the health endpoints
Expand Down
254 changes: 254 additions & 0 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

type AdmissionResult string
type ClusterQueueStatus string
// FrameworkBool is the value of the "enabled" label on the per-framework
// workload counters; see FrameworkEnabled and FrameworkDisabled.
type FrameworkBool string

type LocalQueueReference struct {
Name string
Expand All @@ -57,6 +58,11 @@ const (
CQStatusActive ClusterQueueStatus = "active"
// CQStatusTerminating means the clusterQueue is in pending deletion.
CQStatusTerminating ClusterQueueStatus = "terminating"

// Provide a label to state if the frameworks are
// enabled in the configuration
FrameworkEnabled FrameworkBool = "true"
FrameworkDisabled FrameworkBool = "false"
)

var (
Expand Down Expand Up @@ -369,6 +375,147 @@ If the ClusterQueue has a weight of zero, this will return 9223372036854775807,
the maximum possible share value.`,
}, []string{"cluster_queue"},
)

// Metrics counting usage of frameworks

// JobWorkloadCounter counts the batch jobs managed by Kueue. The "enabled"
// label carries "true" or "false".
JobWorkloadCounter = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Subsystem: constants.KueueName,
		Name:      "job_workload_count",
		Help: `The number of batch jobs that Kueue is/has managed.
Enabled will have a value of true or false.
The count will track the number of jobs that Kueue manages.`,
	}, []string{"enabled"},
)

// JobSetWorkloadCounter counts the jobsets managed by Kueue. The "enabled"
// label carries "true" or "false".
JobSetWorkloadCounter = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Subsystem: constants.KueueName,
		Name:      "jobset_workload_count",
		Help: `The number of jobsets that Kueue is/has managed.
Enabled will have a value of true or false.
The count will track the number of jobsets that Kueue manages.`,
	}, []string{"enabled"},
)

// AppWrapperWorkloadCounter counts the appwrappers managed by Kueue. The
// "enabled" label carries "true" or "false".
AppWrapperWorkloadCounter = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Subsystem: constants.KueueName,
		Name:      "appwrapper_workload_count",
		Help: `The number of appwrappers that Kueue is/has managed.
Enabled will have a value of true or false.
The count will track the number of appwrappers that Kueue manages.`,
	}, []string{"enabled"},
)

// PodWorkloadCounter counts the pod-based workloads managed by Kueue. The
// "enabled" label carries "true" or "false".
PodWorkloadCounter = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Subsystem: constants.KueueName,
		Name:      "pod_workload_count",
		Help: `The number of pod-based workloads that Kueue is/has managed.
Enabled will have a value of true or false.
The count will track the number of pod-based workloads that Kueue manages.`,
	}, []string{"enabled"},
)

// StatefulsetWorkloadCounter counts the statefulsets managed by Kueue. The
// "enabled" label carries "true" or "false".
StatefulsetWorkloadCounter = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Subsystem: constants.KueueName,
		Name:      "statefulset_workload_count",
		Help: `The number of statefulsets that Kueue is/has managed.
Enabled will have a value of true or false.
The count will track the number of statefulsets that Kueue manages.`,
	}, []string{"enabled"},
)

// DeploymentWorkloadCounter counts the deployments managed by Kueue. The
// "enabled" label carries "true" or "false".
DeploymentWorkloadCounter = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Subsystem: constants.KueueName,
		Name:      "deployment_workload_count",
		Help: `The number of deployments that Kueue is/has managed.
Enabled will have a value of true or false.
The count will track the number of deployments that Kueue manages.`,
	}, []string{"enabled"},
)

// LeaderWorkerSetWorkloadCounter counts the leaderworkersets managed by Kueue.
// The "enabled" label carries "true" or "false".
LeaderWorkerSetWorkloadCounter = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Subsystem: constants.KueueName,
		Name:      "leaderworkerset_workload_count",
		Help: `The number of leaderworkersets that Kueue is/has managed.
Enabled will have a value of true or false.
The count will track the number of leaderworkersets that Kueue manages.`,
	}, []string{"enabled"},
)

// KubeflowMPIWorkloadCounter exposes the kubeflow_mpi_workload_count metric,
// split by the "enabled" label.
KubeflowMPIWorkloadCounter = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name:      "kubeflow_mpi_workload_count",
		Subsystem: constants.KueueName,
		Help: `The number of mpi jobs that Kueue is/has managed.
Enabled will have a value of true or false`,
	},
	[]string{"enabled"},
)

// KubeflowPyTorchJobWorkloadCounter exposes the
// kubeflow_pytorch_workload_count metric, split by the "enabled" label.
KubeflowPyTorchJobWorkloadCounter = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name:      "kubeflow_pytorch_workload_count",
		Subsystem: constants.KueueName,
		Help: `The number of pytorch jobs that Kueue is/has managed.
Enabled will have a value of true or false`,
	},
	[]string{"enabled"},
)

// KubeflowTensorFlowJobWorkloadCounter exposes the
// kubeflow_tensorflow_workload_count metric, split by the "enabled" label.
KubeflowTensorFlowJobWorkloadCounter = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name:      "kubeflow_tensorflow_workload_count",
		Subsystem: constants.KueueName,
		Help: `The number of tensorflow jobs that Kueue is/has managed.
Enabled will have a value of true or false.`,
	},
	[]string{"enabled"},
)

// KubeflowPaddleJobWorkloadCounter exposes the
// kubeflow_paddle_workload_count metric, split by the "enabled" label.
KubeflowPaddleJobWorkloadCounter = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name:      "kubeflow_paddle_workload_count",
		Subsystem: constants.KueueName,
		Help: `The number of paddle jobs that Kueue is/has managed.
Enabled will have a value of true or false.`,
	},
	[]string{"enabled"},
)

// KubeflowXGBoostJobWorkloadCounter exposes the
// kubeflow_xgboost_workload_count metric, split by the "enabled" label.
KubeflowXGBoostJobWorkloadCounter = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name:      "kubeflow_xgboost_workload_count",
		Subsystem: constants.KueueName,
		Help:      "The number of XGBoost jobs that Kueue is/has managed",
	},
	[]string{"enabled"},
)

// RayJobWorkloadCounter exposes the rayjob_workload_count metric, split by
// the "enabled" label.
RayJobWorkloadCounter = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name:      "rayjob_workload_count",
		Subsystem: constants.KueueName,
		Help:      "The number of ray job workloads that Kueue is/has managed",
	},
	[]string{"enabled"},
)

// RayClusterWorkloadCounter exposes the raycluster_workload_count metric,
// split by the "enabled" label.
RayClusterWorkloadCounter = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name:      "raycluster_workload_count",
		Subsystem: constants.KueueName,
		Help:      "The number of ray cluster workloads that Kueue is/has managed",
	},
	[]string{"enabled"},
)

// ExternalFrameworksWorkloadCounter exposes the
// externalframeworks_workload_count metric, split by the "enabled" label.
ExternalFrameworksWorkloadCounter = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name:      "externalframeworks_workload_count",
		Subsystem: constants.KueueName,
		Help:      "The number of external frameworks that Kueue is/has managed",
	},
	[]string{"enabled"},
)
)

func generateExponentialBuckets(count int) []float64 {
Expand Down Expand Up @@ -431,6 +578,113 @@ func ReportPreemption(preemptingCqName, preemptingReason, targetCqName string) {
ReportEvictedWorkloads(targetCqName, kueue.WorkloadEvictedByPreemption)
}

func ReportIntegrationsFrameworksSupported(frameworks []string) {
if len(frameworks) == 0 {
JobWorkloadCounter.WithLabelValues(string(FrameworkDisabled)).Add(0)
JobSetWorkloadCounter.WithLabelValues(string(FrameworkDisabled)).Add(0)
RayJobWorkloadCounter.WithLabelValues(string(FrameworkDisabled)).Add(0)
RayClusterWorkloadCounter.WithLabelValues(string(FrameworkDisabled)).Add(0)
PodWorkloadCounter.WithLabelValues(string(FrameworkDisabled)).Add(0)
LeaderWorkerSetWorkloadCounter.WithLabelValues(string(FrameworkDisabled)).Add(0)
StatefulsetWorkloadCounter.WithLabelValues(string(FrameworkDisabled)).Add(0)
AppWrapperWorkloadCounter.WithLabelValues(string(FrameworkDisabled)).Add(0)
KubeflowMPIWorkloadCounter.WithLabelValues(string(FrameworkDisabled)).Add(0)
KubeflowPyTorchJobWorkloadCounter.WithLabelValues(string(FrameworkDisabled)).Add(0)
KubeflowPaddleJobWorkloadCounter.WithLabelValues(string(FrameworkDisabled)).Add(0)
KubeflowTensorFlowJobWorkloadCounter.WithLabelValues(string(FrameworkDisabled)).Add(0)
KubeflowXGBoostJobWorkloadCounter.WithLabelValues(string(FrameworkDisabled)).Add(0)
}
for _, val := range frameworks {
switch val {
case "batch/job":
JobWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Add(0)
case "jobset.x-k8s.io/jobset":
JobSetWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Add(0)
case "ray.io/rayjob":
RayJobWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Add(0)
case "kubeflow.org/mpijob":
KubeflowMPIWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Add(0)
case "kubeflow.org/pytorchjob":
KubeflowPyTorchJobWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Add(0)
case "kubeflow.org/tfjob":
KubeflowTensorFlowJobWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Add(0)
case "kubeflow.org/xgboostjob":
KubeflowXGBoostJobWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Add(0)
case "workload.codeflare.dev/appwrapper":
AppWrapperWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Add(0)
case "pod":
PodWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Add(0)
case "statefulset":
StatefulsetWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Add(0)
case "deployment":
DeploymentWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Add(0)
case "leaderworkerset.x-k8s.io/leaderworkerset":
LeaderWorkerSetWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Add(0)
}

}
}

func ReportExternalFrameworksSupported(externalFrameworks []string) {
var externalFramework FrameworkBool = FrameworkDisabled
if len(externalFrameworks) > 0 {
externalFramework = FrameworkEnabled
}
ExternalFrameworksWorkloadCounter.WithLabelValues(string(externalFramework)).Add(0)
}

func CountExternalFramework() {
ExternalFrameworksWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Inc()
}

func CountJobWorkloads() {
JobWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Inc()
}

func CountJobSetWorkloads() {
JobSetWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Inc()
}

func CountRayJobWorkloads() {
RayJobWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Inc()
}

func CountRayClusterWorkloads() {
RayClusterWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Inc()
}

func CountMPIJobWorkloads() {
KubeflowMPIWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Inc()
}

// CountPyTorchJobWorkloads adds one to the PyTorch job counter under the
// enabled="true" label.
func CountPyTorchJobWorkloads() {
	KubeflowPyTorchJobWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Inc()
}

// CountTensorFlowJobWorkloads adds one to the TensorFlow job counter under
// the enabled="true" label. It mirrors the Count helpers of the other
// framework counters.
func CountTensorFlowJobWorkloads() {
	KubeflowTensorFlowJobWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Inc()
}

func CountPaddleJobWorkloads() {
KubeflowPaddleJobWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Inc()
}

func CountXGBoostJobWorkloads() {
KubeflowXGBoostJobWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Inc()
}

func CountPodWorkloads() {
PodWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Inc()
}

func CountStatefulSetWorkloads() {
StatefulsetWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Inc()
}

func CountDeploymentWorkloads() {
DeploymentWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Inc()
}

func CountLeaderWorkerSetWorkloads() {
LeaderWorkerSetWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Inc()
}

func LQRefFromWorkload(wl *kueue.Workload) LocalQueueReference {
return LocalQueueReference{
Name: wl.Spec.QueueName,
Expand Down
14 changes: 14 additions & 0 deletions pkg/metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,3 +173,17 @@ func TestReportAndCleanupClusterQueuePreemptedNumber(t *testing.T) {
expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 0, "preempting_cluster_queue", "cluster_queue1")
expectFilteredMetricsCount(t, EvictedWorkloadsTotal, 0, "cluster_queue", "cluster_queue1")
}

// TestReportExternalFrameworkMetricsEnabled checks that a non-empty list of
// external frameworks yields exactly one enabled="true" series.
func TestReportExternalFrameworkMetricsEnabled(t *testing.T) {
	configured := []string{"test"}
	ReportExternalFrameworksSupported(configured)

	wantLabel := string(FrameworkEnabled)
	expectFilteredMetricsCount(t, ExternalFrameworksWorkloadCounter, 1, "enabled", wantLabel)
}

// TestReportExternalFrameworkMetricsDisabled checks that an empty list of
// external frameworks yields exactly one enabled="false" series.
func TestReportExternalFrameworkMetricsDisabled(t *testing.T) {
	configured := []string{}
	ReportExternalFrameworksSupported(configured)

	wantLabel := string(FrameworkDisabled)
	expectFilteredMetricsCount(t, ExternalFrameworksWorkloadCounter, 1, "enabled", wantLabel)
}

// TestReportIntegrationsFrameworksSupported checks that reporting an empty
// integrations list creates exactly one enabled="false" series on the
// per-framework counters.
func TestReportIntegrationsFrameworksSupported(t *testing.T) {
	ReportIntegrationsFrameworksSupported([]string{})

	disabled := string(FrameworkDisabled)
	expectFilteredMetricsCount(t, JobWorkloadCounter, 1, "enabled", disabled)
	expectFilteredMetricsCount(t, JobSetWorkloadCounter, 1, "enabled", disabled)
	expectFilteredMetricsCount(t, RayJobWorkloadCounter, 1, "enabled", disabled)
	expectFilteredMetricsCount(t, RayClusterWorkloadCounter, 1, "enabled", disabled)
	expectFilteredMetricsCount(t, PodWorkloadCounter, 1, "enabled", disabled)
	expectFilteredMetricsCount(t, LeaderWorkerSetWorkloadCounter, 1, "enabled", disabled)
	expectFilteredMetricsCount(t, StatefulsetWorkloadCounter, 1, "enabled", disabled)
	expectFilteredMetricsCount(t, AppWrapperWorkloadCounter, 1, "enabled", disabled)
	expectFilteredMetricsCount(t, KubeflowMPIWorkloadCounter, 1, "enabled", disabled)
	expectFilteredMetricsCount(t, KubeflowPyTorchJobWorkloadCounter, 1, "enabled", disabled)
	expectFilteredMetricsCount(t, KubeflowPaddleJobWorkloadCounter, 1, "enabled", disabled)
	expectFilteredMetricsCount(t, KubeflowTensorFlowJobWorkloadCounter, 1, "enabled", disabled)
	expectFilteredMetricsCount(t, KubeflowXGBoostJobWorkloadCounter, 1, "enabled", disabled)
}

0 comments on commit ff328e6

Please sign in to comment.