Skip to content

Commit

Permalink
use a metric for internal frameworks and one for external frameworks
Browse files Browse the repository at this point in the history
  • Loading branch information
kannon92 committed Feb 21, 2025
1 parent ff328e6 commit 4639bf5
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 230 deletions.
244 changes: 17 additions & 227 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,6 @@ const (
CQStatusActive ClusterQueueStatus = "active"
// CQStatusTerminating means the clusterQueue is in pending deletion.
CQStatusTerminating ClusterQueueStatus = "terminating"

// Provide a label to state if the frameworks are
// enabled in the configuration
FrameworkEnabled FrameworkBool = "true"
FrameworkDisabled FrameworkBool = "false"
)

var (
Expand Down Expand Up @@ -379,142 +374,22 @@ the maximum possible share value.`,
// Metrics counting usage of frameworks

// Counter of workloads that are managed by Kueue, labeled by integration framework
JobWorkloadCounter = prometheus.NewCounterVec(
WorkloadIntegrationsCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: constants.KueueName,
Name: "job_workload_count",
Name: "workload_integrations",
Help: `The number of batch jobs that Kueue is/has managed.
Enabled will have a value on true or false
And the count will track the number of jobs that Kueue manages.`,
}, []string{"enabled"},
)

JobSetWorkloadCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: constants.KueueName,
Name: "jobset_workload_count",
Help: `The number of jobsets that Kueue is/has managed
Enabled will have a value on true or false
And the count will track the number of jobsets that Kueue manages.`,
}, []string{"enabled"},
)

AppWrapperWorkloadCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: constants.KueueName,
Name: "appwrapper_workload_count",
Help: `The number of appwrappers that Kueue is/has managed
Enabled will have a value on true or false
And the count will track the number of appwrappers that Kueue manages.`,
}, []string{"enabled"},
)

PodWorkloadCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: constants.KueueName,
Name: "pod_workload_count",
Help: `The number of pod-based workloads that Kueue is/has managed.
Enabled will have a value on true or false
And the count will track the number of pod-based workloads that Kueue manages.`,
}, []string{"enabled"},
)

StatefulsetWorkloadCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: constants.KueueName,
Name: "statefulset_workload_count",
Help: `The number of statefulsets that Kueue is/has managed.
Enabled will have a value on true or false
And the count will track the number of statefulsets that Kueue manages.`,
}, []string{"enabled"},
)

DeploymentWorkloadCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: constants.KueueName,
Name: "deployment_workload_count",
Help: `The number of deployments that Kueue is/has managed.
Enabled will have a value on true or false
And the count will track the number of deployments that Kueue manages.`,
}, []string{"enabled"},
)

LeaderWorkerSetWorkloadCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: constants.KueueName,
Name: "leaderworkerset_workload_count",
Help: `The number of leaderworkersets that Kueue is/has managed.
Enabled will have a value on true or false
And the count will track the number of leaderworkersets that Kueue manages.`,
}, []string{"enabled"},
)

KubeflowMPIWorkloadCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: constants.KueueName,
Name: "kubeflow_mpi_workload_count",
Help: `The number of mpi jobs that Kueue is/has managed.
Enabled will have a value of true or false`,
}, []string{"enabled"},
)

KubeflowPyTorchJobWorkloadCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: constants.KueueName,
Name: "kubeflow_pytorch_workload_count",
Help: `The number of pytorch jobs that Kueue is/has managed.
Enabled will have a value of true or false`,
}, []string{"enabled"},
)

KubeflowTensorFlowJobWorkloadCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: constants.KueueName,
Name: "kubeflow_tensorflow_workload_count",
Help: `The number of tensorflow jobs that Kueue is/has managed.
Enabled will have a value of true or false.`,
}, []string{"enabled"},
)

KubeflowPaddleJobWorkloadCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: constants.KueueName,
Name: "kubeflow_paddle_workload_count",
Help: `The number of paddle jobs that Kueue is/has managed.
Enabled will have a value of true or false.`,
}, []string{"enabled"},
)

KubeflowXGBoostJobWorkloadCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: constants.KueueName,
Name: "kubeflow_xgboost_workload_count",
Help: "The number of XGBoost jobs that Kueue is/has managed",
}, []string{"enabled"},
)

RayJobWorkloadCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: constants.KueueName,
Name: "rayjob_workload_count",
Help: "The number of ray job workloads that Kueue is/has managed",
}, []string{"enabled"},
)

RayClusterWorkloadCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: constants.KueueName,
Name: "raycluster_workload_count",
Help: "The number of ray cluster workloads that Kueue is/has managed",
}, []string{"enabled"},
"Workload will specify what kind of workload`,
}, []string{"workload"},
)

ExternalFrameworksWorkloadCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: constants.KueueName,
Name: "externalframeworks_workload_count",
Help: "The number of external frameworks that Kueue is/has managed",
}, []string{"enabled"},
Name: "workload_external_integrations",
Help: `The number of external frameworks that Kueue is/has managed.
Workload will specify what kind of workload`,
}, []string{"workload"},
)
)

Expand Down Expand Up @@ -579,110 +454,23 @@ func ReportPreemption(preemptingCqName, preemptingReason, targetCqName string) {
}

func ReportIntegrationsFrameworksSupported(frameworks []string) {
if len(frameworks) == 0 {
JobWorkloadCounter.WithLabelValues(string(FrameworkDisabled)).Add(0)
JobSetWorkloadCounter.WithLabelValues(string(FrameworkDisabled)).Add(0)
RayJobWorkloadCounter.WithLabelValues(string(FrameworkDisabled)).Add(0)
RayClusterWorkloadCounter.WithLabelValues(string(FrameworkDisabled)).Add(0)
PodWorkloadCounter.WithLabelValues(string(FrameworkDisabled)).Add(0)
LeaderWorkerSetWorkloadCounter.WithLabelValues(string(FrameworkDisabled)).Add(0)
StatefulsetWorkloadCounter.WithLabelValues(string(FrameworkDisabled)).Add(0)
AppWrapperWorkloadCounter.WithLabelValues(string(FrameworkDisabled)).Add(0)
KubeflowMPIWorkloadCounter.WithLabelValues(string(FrameworkDisabled)).Add(0)
KubeflowPyTorchJobWorkloadCounter.WithLabelValues(string(FrameworkDisabled)).Add(0)
KubeflowPaddleJobWorkloadCounter.WithLabelValues(string(FrameworkDisabled)).Add(0)
KubeflowTensorFlowJobWorkloadCounter.WithLabelValues(string(FrameworkDisabled)).Add(0)
KubeflowXGBoostJobWorkloadCounter.WithLabelValues(string(FrameworkDisabled)).Add(0)
}
for _, val := range frameworks {
switch val {
case "batch/job":
JobWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Add(0)
case "jobset.x-k8s.io/jobset":
JobSetWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Add(0)
case "ray.io/rayjob":
RayJobWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Add(0)
case "kubeflow.org/mpijob":
KubeflowMPIWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Add(0)
case "kubeflow.org/pytorchjob":
KubeflowPyTorchJobWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Add(0)
case "kubeflow.org/tfjob":
KubeflowTensorFlowJobWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Add(0)
case "kubeflow.org/xgboostjob":
KubeflowXGBoostJobWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Add(0)
case "workload.codeflare.dev/appwrapper":
AppWrapperWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Add(0)
case "pod":
PodWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Add(0)
case "statefulset":
StatefulsetWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Add(0)
case "deployment":
DeploymentWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Add(0)
case "leaderworkerset.x-k8s.io/leaderworkerset":
LeaderWorkerSetWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Add(0)
}

WorkloadIntegrationsCounter.WithLabelValues(val).Add(0)
}
}

func ReportExternalFrameworksSupported(externalFrameworks []string) {
var externalFramework FrameworkBool = FrameworkDisabled
if len(externalFrameworks) > 0 {
externalFramework = FrameworkEnabled
for _, val := range externalFrameworks {
ExternalFrameworksWorkloadCounter.WithLabelValues(val).Add(0)
}
ExternalFrameworksWorkloadCounter.WithLabelValues(string(externalFramework)).Add(0)
}

func CountExternalFramework() {
ExternalFrameworksWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Inc()
}

func CountJobWorkloads() {
JobWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Inc()
}

func CountJobSetWorkloads() {
JobSetWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Inc()
}

func CountRayJobWorkloads() {
RayJobWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Inc()
}

func CountRayClusterWorkloads() {
RayClusterWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Inc()
}

func CountMPIJobWorkloads() {
KubeflowMPIWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Inc()
}

func CountPyTorchJobWorkloads() {
KubeflowPyTorchJobWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Inc()
}

func CountPaddleJobWorkloads() {
KubeflowPaddleJobWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Inc()
}

func CountXGBoostJobWorkloads() {
KubeflowXGBoostJobWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Inc()
}

func CountPodWorkloads() {
PodWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Inc()
}

func CountStatefulSetWorkloads() {
StatefulsetWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Inc()
}

func CountDeploymentWorkloads() {
DeploymentWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Inc()
func CountExternalFramework(externalGVK string) {
ExternalFrameworksWorkloadCounter.WithLabelValues(externalGVK).Inc()
}

func CountLeaderWorkerSetWorkloads() {
LeaderWorkerSetWorkloadCounter.WithLabelValues(string(FrameworkEnabled)).Inc()
func CountFramework(framework string) {
WorkloadIntegrationsCounter.WithLabelValues(framework).Inc()
}

func LQRefFromWorkload(wl *kueue.Workload) LocalQueueReference {
Expand Down Expand Up @@ -879,6 +667,8 @@ func Register() {
ClusterQueueResourceBorrowingLimit,
ClusterQueueResourceLendingLimit,
ClusterQueueWeightedShare,
WorkloadIntegrationsCounter,
ExternalFrameworksWorkloadCounter,
)
if features.Enabled(features.LocalQueueMetrics) {
RegisterLQMetrics()
Expand Down
7 changes: 4 additions & 3 deletions pkg/metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,14 +176,15 @@ func TestReportAndCleanupClusterQueuePreemptedNumber(t *testing.T) {

func TestReportExternalFrameworkMetricsEnabled(t *testing.T) {
ReportExternalFrameworksSupported([]string{"test"})
expectFilteredMetricsCount(t, ExternalFrameworksWorkloadCounter, 1, "enabled", string(FrameworkEnabled))
expectFilteredMetricsCount(t, ExternalFrameworksWorkloadCounter, 1, "workload", "test")
}

func TestReportExternalFrameworkMetricsDisabled(t *testing.T) {
ReportExternalFrameworksSupported([]string{})
expectFilteredMetricsCount(t, ExternalFrameworksWorkloadCounter, 1, "enabled", string(FrameworkDisabled))
expectFilteredMetricsCount(t, ExternalFrameworksWorkloadCounter, 0)
}

func TestReportIntegrationsFrameworksSupported(t *testing.T) {
ReportIntegrationsFrameworksSupported([]string{})
ReportIntegrationsFrameworksSupported([]string{"batch/job"})
expectFilteredMetricsCount(t, WorkloadIntegrationsCounter, 1, "workload", "batch/job")
}

0 comments on commit 4639bf5

Please sign in to comment.