WIP: add metrics for workloads count #4350

Open
wants to merge 2 commits into base: main

Changes from 1 commit
4 changes: 4 additions & 0 deletions cmd/kueue/main.go
@@ -349,6 +349,10 @@ func setupControllers(ctx context.Context, mgr ctrl.Manager, cCache *cache.Cache
setupLog.Error(err, "Unable to create controller or webhook", "kubernetesVersion", serverVersionFetcher.GetServerVersion())
os.Exit(1)
}

// Report which workload framework integrations are enabled in the configuration
metrics.ReportExternalFrameworksSupported(cfg.Integrations.ExternalFrameworks)
metrics.ReportIntegrationsFrameworksSupported(cfg.Integrations.Frameworks)
}

// setupProbeEndpoints registers the health endpoints
254 changes: 254 additions & 0 deletions pkg/metrics/metrics.go
@@ -31,6 +31,7 @@ import (

type AdmissionResult string
type ClusterQueueStatus string
type FrameworkBool string

type LocalQueueReference struct {
Name string
@@ -57,6 +58,11 @@ const (
CQStatusActive ClusterQueueStatus = "active"
// CQStatusTerminating means the clusterQueue is in pending deletion.
CQStatusTerminating ClusterQueueStatus = "terminating"

// FrameworkEnabled and FrameworkDisabled are label values stating
// whether a framework is enabled in the configuration.
FrameworkEnabled FrameworkBool = "true"
FrameworkDisabled FrameworkBool = "false"
)

var (
@@ -369,6 +375,147 @@ If the ClusterQueue has a weight of zero, this will return 9223372036854775807,
the maximum possible share value.`,
}, []string{"cluster_queue"},
)

// Metrics counting usage of frameworks

// Counter of jobs that are managed by Kueue
JobWorkloadCounter = prometheus.NewCounterVec(
Contributor:

Please don't multiply the metrics by the framework name. The framework name should be a label in the metrics.

Member:

Basically, increasing the number of metric kinds affects scheduling performance, since the metrics counter needs to take a lock every time. So I would recommend adding the workload type, like batch/v1 Job, to the existing workload-counting metrics, as @mimowo mentioned.

Contributor Author:

Do you all think I should have two metrics: one for supported frameworks and the other for external? I wasn't sure if exposing the GVK for external frameworks would be valid.

Member:

I imagined adding a framework-type label to the existing workload-related metrics, like

QuotaReservedWorkloadsTotal = prometheus.NewCounterVec(

@mimowo If you have any other view, let us know.

Contributor:

I like the idea of extending the existing workload metrics more than adding entirely new ones. If this is risky, we may use a feature gate for a release or two.

Contributor:

You can aggregate the metrics at the Grafana level by summing over the CQ names. Then you will display results by frameworkName.

Contributor Author:

Are you okay with including external frameworks in that same label?

Contributor:

I'm OK with that; every workload needs to belong to some framework, either built-in or external, as otherwise we would need to say "". I guess the user needs to have some other way (if they want) to determine which are external and which are built-in.

Contributor:

> Basically, increasing the number of metric kinds affects scheduling performance, since the metrics counter needs to take a lock every time.

Prometheus metric counters are thread-safe, no?

Member:

> Prometheus metric counters are thread-safe, no?

Yes, they are thread-safe. I meant that the metrics counter takes a lock internally; we do not need to take a lock on our side.
prometheus.CounterOpts{
Subsystem: constants.KueueName,
Name: "job_workload_count",
Help: `The number of batch jobs that Kueue is/has managed.
The enabled label will have a value of true or false,
and the count will track the number of jobs that Kueue manages.`,
}, []string{"enabled"},
)

JobSetWorkloadCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: constants.KueueName,
Name: "jobset_workload_count",
Help: `The number of jobsets that Kueue is/has managed.
The enabled label will have a value of true or false,
and the count will track the number of jobsets that Kueue manages.`,
}, []string{"enabled"},
)

AppWrapperWorkloadCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: constants.KueueName,
Name: "appwrapper_workload_count",
Help: `The number of appwrappers that Kueue is/has managed.
The enabled label will have a value of true or false,
and the count will track the number of appwrappers that Kueue manages.`,
}, []string{"enabled"},
)

PodWorkloadCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: constants.KueueName,
Name: "pod_workload_count",
Help: `The number of pod-based workloads that Kueue is/has managed.
The enabled label will have a value of true or false,
and the count will track the number of pod-based workloads that Kueue manages.`,
}, []string{"enabled"},
)

StatefulsetWorkloadCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: constants.KueueName,
Name: "statefulset_workload_count",
Help: `The number of statefulsets that Kueue is/has managed.
The enabled label will have a value of true or false,
and the count will track the number of statefulsets that Kueue manages.`,
}, []string{"enabled"},
)

DeploymentWorkloadCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: constants.KueueName,
Name: "deployment_workload_count",
Help: `The number of deployments that Kueue is/has managed.
The enabled label will have a value of true or false,
and the count will track the number of deployments that Kueue manages.`,
}, []string{"enabled"},
)

LeaderWorkerSetWorkloadCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: constants.KueueName,
Name: "leaderworkerset_workload_count",
Help: `The number of leaderworkersets that Kueue is/has managed.
The enabled label will have a value of true or false,
and the count will track the number of leaderworkersets that Kueue manages.`,
}, []string{"enabled"},
)

KubeflowMPIWorkloadCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: constants.KueueName,
Name: "kubeflow_mpi_workload_count",
Help: `The number of mpi jobs that Kueue is/has managed.
The enabled label will have a value of true or false.`,
}, []string{"enabled"},
)

KubeflowPyTorchJobWorkloadCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: constants.KueueName,
Name: "kubeflow_pytorch_workload_count",
Help: `The number of pytorch jobs that Kueue is/has managed.
The enabled label will have a value of true or false.`,
}, []string{"enabled"},
)

KubeflowTensorFlowJobWorkloadCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: constants.KueueName,
Name: "kubeflow_tensorflow_workload_count",
Help: `The number of tensorflow jobs that Kueue is/has managed.
Enabled will have a value of true or false.`,
}, []string{"enabled"},
)

KubeflowPaddleJobWorkloadCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: constants.KueueName,
Name: "kubeflow_paddle_workload_count",
Help: `The number of paddle jobs that Kueue is/has managed.
Enabled will have a value of true or false.`,
}, []string{"enabled"},
)

KubeflowXGBoostJobWorkloadCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: constants.KueueName,
Name: "kubeflow_xgboost_workload_count",
Help: "The number of XGBoost jobs that Kueue is/has managed.",
}, []string{"enabled"},
)

RayJobWorkloadCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: constants.KueueName,
Name: "rayjob_workload_count",
Help: "The number of ray job workloads that Kueue is/has managed.",
}, []string{"enabled"},
)

RayClusterWorkloadCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: constants.KueueName,
Name: "raycluster_workload_count",
Help: "The number of ray cluster workloads that Kueue is/has managed.",
}, []string{"enabled"},
)

ExternalFrameworksWorkloadCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: constants.KueueName,
Name: "externalframeworks_workload_count",
Help: "The number of external framework workloads that Kueue is/has managed.",
}, []string{"enabled"},
)
)

func generateExponentialBuckets(count int) []float64 {
@@ -431,6 +578,113 @@ func ReportPreemption(preemptingCqName, preemptingReason, targetCqName string) {
ReportEvictedWorkloads(targetCqName, kueue.WorkloadEvictedByPreemption)
}

func ReportIntegrationsFrameworksSupported(frameworks []string) {
if len(frameworks) == 0 {
JobWorkloadCounter.WithLabelValues(string(FrameworkDisabled)).Add(0)
JobSetWorkloadCounter.WithLabelValues(string(FrameworkDisabled)).Add(0)
RayJobWorkloadCounter.WithLabelValues(string(FrameworkDisabled)).Add(0)
RayClusterWorkloadCounter.WithLabelValues(string(FrameworkDisabled)).Add(0)
PodWorkloadCounter.WithLabelValues(string(FrameworkDisabled)).Add(0)
LeaderWorkerSetWorkloadCounter.WithLabelValues(string(FrameworkDisabled)).Add(0)
StatefulsetWorkloadCounter.WithLabelValues(string(FrameworkDisabled)).Add(0)
AppWrapperWorkloadCounter.WithLabelValues(string(FrameworkDisabled)).Add(0)
KubeflowMPIWorkloadCounter.WithLabelValues(string(FrameworkDisabled)).Add(0)
KubeflowPyTorchJobWorkloadCounter.WithLabelValues(string(FrameworkDisabled)).Add(0)
KubeflowPaddleJobWorkloadCounter.WithLabelValues(string(FrameworkDisabled)).Add(0)
KubeflowTensorFlowJobWorkloadCounter.WithLabelValues(string(FrameworkDisabled)).Add(0)
KubeflowXGBoostJobWorkloadCounter.WithLabelValues(string(FrameworkDisabled)).Add(0)
DeploymentWorkloadCounter.WithLabelValues(string(FrameworkDisabled)).Add(0)
}
for _, val := range frameworks {
switch val {
case "batch/job":
JobWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Add(0)
case "jobset.x-k8s.io/jobset":
JobSetWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Add(0)
case "ray.io/rayjob":
RayJobWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Add(0)
case "ray.io/raycluster":
RayClusterWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Add(0)
case "kubeflow.org/mpijob":
KubeflowMPIWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Add(0)
case "kubeflow.org/pytorchjob":
KubeflowPyTorchJobWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Add(0)
case "kubeflow.org/paddlejob":
KubeflowPaddleJobWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Add(0)
case "kubeflow.org/tfjob":
KubeflowTensorFlowJobWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Add(0)
case "kubeflow.org/xgboostjob":
KubeflowXGBoostJobWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Add(0)
case "workload.codeflare.dev/appwrapper":
AppWrapperWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Add(0)
case "pod":
PodWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Add(0)
case "statefulset":
StatefulsetWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Add(0)
case "deployment":
DeploymentWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Add(0)
case "leaderworkerset.x-k8s.io/leaderworkerset":
LeaderWorkerSetWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Add(0)
}
}
}

func ReportExternalFrameworksSupported(externalFrameworks []string) {
var externalFramework FrameworkBool = FrameworkDisabled
if len(externalFrameworks) > 0 {
externalFramework = FrameworkEnabled
}
ExternalFrameworksWorkloadCounter.WithLabelValues(string(externalFramework)).Add(0)
}

func CountExternalFramework() {
ExternalFrameworksWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Inc()
}

func CountJobWorkloads() {
JobWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Inc()
}

func CountJobSetWorkloads() {
JobSetWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Inc()
}

func CountRayJobWorkloads() {
RayJobWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Inc()
}

func CountRayClusterWorkloads() {
RayClusterWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Inc()
}

func CountMPIJobWorkloads() {
KubeflowMPIWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Inc()
}

func CountPyTorchJobWorkloads() {
KubeflowPyTorchJobWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Inc()
}

func CountPaddleJobWorkloads() {
KubeflowPaddleJobWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Inc()
}

func CountXGBoostJobWorkloads() {
KubeflowXGBoostJobWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Inc()
}

func CountPodWorkloads() {
PodWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Inc()
}

func CountStatefulSetWorkloads() {
StatefulsetWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Inc()
}

func CountDeploymentWorkloads() {
DeploymentWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Inc()
}

func CountLeaderWorkerSetWorkloads() {
LeaderWorkerSetWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Inc()
}

func LQRefFromWorkload(wl *kueue.Workload) LocalQueueReference {
return LocalQueueReference{
Name: wl.Spec.QueueName,
14 changes: 14 additions & 0 deletions pkg/metrics/metrics_test.go
@@ -173,3 +173,17 @@ func TestReportAndCleanupClusterQueuePreemptedNumber(t *testing.T) {
expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 0, "preempting_cluster_queue", "cluster_queue1")
expectFilteredMetricsCount(t, EvictedWorkloadsTotal, 0, "cluster_queue", "cluster_queue1")
}

func TestReportExternalFrameworkMetricsEnabled(t *testing.T) {
ReportExternalFrameworksSupported([]string{"test"})
expectFilteredMetricsCount(t, ExternalFrameworksWorkloadCounter, 1, "enabled", string(FrameworkEnabled))
}

func TestReportExternalFrameworkMetricsDisabled(t *testing.T) {
ReportExternalFrameworksSupported([]string{})
expectFilteredMetricsCount(t, ExternalFrameworksWorkloadCounter, 1, "enabled", string(FrameworkDisabled))
}

func TestReportIntegrationsFrameworksSupported(t *testing.T) {
ReportIntegrationsFrameworksSupported([]string{})
expectFilteredMetricsCount(t, JobWorkloadCounter, 1, "enabled", string(FrameworkDisabled))
}