diff --git a/ray-operator/apis/ray/v1/rayjob_types.go b/ray-operator/apis/ray/v1/rayjob_types.go index 4cf9d1acfe5..3f044d8a4c6 100644 --- a/ray-operator/apis/ray/v1/rayjob_types.go +++ b/ray-operator/apis/ray/v1/rayjob_types.go @@ -56,6 +56,16 @@ const ( JobDeploymentStatusWaiting JobDeploymentStatus = "Waiting" ) +// IsJobDeploymentTerminal returns true if the given JobDeploymentStatus +// is in a terminal state. Terminal states are either Complete or Failed. +func IsJobDeploymentTerminal(status JobDeploymentStatus) bool { + terminalStatusSet := map[JobDeploymentStatus]struct{}{ + JobDeploymentStatusComplete: {}, JobDeploymentStatusFailed: {}, + } + _, ok := terminalStatusSet[status] + return ok +} + // JobFailedReason indicates the reason the RayJob changes its JobDeploymentStatus to 'Failed' type JobFailedReason string diff --git a/ray-operator/controllers/ray/metrics/mocks/ray_job_metrics_mock.go b/ray-operator/controllers/ray/metrics/mocks/ray_job_metrics_mock.go new file mode 100644 index 00000000000..0248670a571 --- /dev/null +++ b/ray-operator/controllers/ray/metrics/mocks/ray_job_metrics_mock.go @@ -0,0 +1,53 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/ray-project/kuberay/ray-operator/controllers/ray/metrics (interfaces: RayJobMetricsObserver) +// +// Generated by this command: +// +// mockgen -destination=mocks/ray_job_metrics_mock.go -package=mocks github.com/ray-project/kuberay/ray-operator/controllers/ray/metrics RayJobMetricsObserver +// + +// Package mocks is a generated GoMock package. +package mocks + +import ( + reflect "reflect" + + v1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" + gomock "go.uber.org/mock/gomock" +) + +// MockRayJobMetricsObserver is a mock of RayJobMetricsObserver interface. +type MockRayJobMetricsObserver struct { + ctrl *gomock.Controller + recorder *MockRayJobMetricsObserverMockRecorder + isgomock struct{} +} + +// MockRayJobMetricsObserverMockRecorder is the mock recorder for MockRayJobMetricsObserver. +type MockRayJobMetricsObserverMockRecorder struct { + mock *MockRayJobMetricsObserver +} + +// NewMockRayJobMetricsObserver creates a new mock instance. +func NewMockRayJobMetricsObserver(ctrl *gomock.Controller) *MockRayJobMetricsObserver { + mock := &MockRayJobMetricsObserver{ctrl: ctrl} + mock.recorder = &MockRayJobMetricsObserverMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockRayJobMetricsObserver) EXPECT() *MockRayJobMetricsObserverMockRecorder { + return m.recorder +} + +// ObserveRayJobExecutionDuration mocks base method. +func (m *MockRayJobMetricsObserver) ObserveRayJobExecutionDuration(name, namespace string, jobDeploymentStatus v1.JobDeploymentStatus, retryCount int, duration float64) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "ObserveRayJobExecutionDuration", name, namespace, jobDeploymentStatus, retryCount, duration) +} + +// ObserveRayJobExecutionDuration indicates an expected call of ObserveRayJobExecutionDuration. +func (mr *MockRayJobMetricsObserverMockRecorder) ObserveRayJobExecutionDuration(name, namespace, jobDeploymentStatus, retryCount, duration any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ObserveRayJobExecutionDuration", reflect.TypeOf((*MockRayJobMetricsObserver)(nil).ObserveRayJobExecutionDuration), name, namespace, jobDeploymentStatus, retryCount, duration) +} diff --git a/ray-operator/controllers/ray/metrics/ray_job_metrics.go b/ray-operator/controllers/ray/metrics/ray_job_metrics.go index 0c9565a3006..4bc0233771f 100644 --- a/ray-operator/controllers/ray/metrics/ray_job_metrics.go +++ b/ray-operator/controllers/ray/metrics/ray_job_metrics.go @@ -1,24 +1,47 @@ package metrics import ( + "strconv" + "github.com/prometheus/client_golang/prometheus" + + rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" ) -// RayJobMetricsCollector implements the prometheus.Collector interface to collect ray job metrics. -type RayJobMetricsCollector struct { - // Metrics +//go:generate mockgen -destination=mocks/ray_job_metrics_mock.go -package=mocks github.com/ray-project/kuberay/ray-operator/controllers/ray/metrics RayJobMetricsObserver +type RayJobMetricsObserver interface { + ObserveRayJobExecutionDuration(name, namespace string, jobDeploymentStatus rayv1.JobDeploymentStatus, retryCount int, duration float64) +} + +// RayJobMetricsManager implements the prometheus.Collector and RayJobMetricsObserver interface to collect ray job metrics. +type RayJobMetricsManager struct { + rayJobExecutionDurationSeconds *prometheus.GaugeVec } -// NewRayJobMetricsCollector creates a new RayJobMetricsCollector instance. -func NewRayJobMetricsCollector() *RayJobMetricsCollector { - collector := &RayJobMetricsCollector{} +// NewRayJobMetricsManager creates a new RayJobMetricsManager instance. +func NewRayJobMetricsManager() *RayJobMetricsManager { + collector := &RayJobMetricsManager{ + rayJobExecutionDurationSeconds: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "kuberay_job_execution_duration_seconds", + Help: "Duration from when the RayJob CR’s JobDeploymentStatus transitions from Initializing to either the Retrying state or a terminal state, such as Complete or Failed. The Retrying state indicates that the CR previously failed and that spec.backoffLimit is enabled.", + }, + []string{"name", "namespace", "job_deployment_status", "retry_count"}, + ), + } return collector } // Describe implements prometheus.Collector interface Describe method. -func (c *RayJobMetricsCollector) Describe(_ chan<- *prometheus.Desc) { +func (c *RayJobMetricsManager) Describe(ch chan<- *prometheus.Desc) { + c.rayJobExecutionDurationSeconds.Describe(ch) } // Collect implements prometheus.Collector interface Collect method. -func (c *RayJobMetricsCollector) Collect(_ chan<- prometheus.Metric) { +func (c *RayJobMetricsManager) Collect(ch chan<- prometheus.Metric) { + c.rayJobExecutionDurationSeconds.Collect(ch) +} + +func (c *RayJobMetricsManager) ObserveRayJobExecutionDuration(name, namespace string, jobDeploymentStatus rayv1.JobDeploymentStatus, retryCount int, duration float64) { + c.rayJobExecutionDurationSeconds.WithLabelValues(name, namespace, string(jobDeploymentStatus), strconv.Itoa(retryCount)).Set(duration) } diff --git a/ray-operator/controllers/ray/rayjob_controller.go b/ray-operator/controllers/ray/rayjob_controller.go index c211b6dd06d..537301b1e4d 100644 --- a/ray-operator/controllers/ray/rayjob_controller.go +++ b/ray-operator/controllers/ray/rayjob_controller.go @@ -46,7 +46,7 @@ type RayJobReconciler struct { } type RayJobReconcilerOptions struct { - RayJobMetricsCollector *metrics.RayJobMetricsCollector + RayJobMetricsManager *metrics.RayJobMetricsManager } // NewRayJobReconciler returns a new reconcile.Reconciler @@ -430,9 +430,34 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request) logger.Info("Failed to update RayJob status", "error", err) return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err } + emitRayJobMetrics(r.options.RayJobMetricsManager, rayJobInstance.Name, rayJobInstance.Namespace, originalRayJobInstance.Status, rayJobInstance.Status) return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil } +func emitRayJobMetrics(rayJobMetricsManager *metrics.RayJobMetricsManager, rayJobName, rayJobNamespace string, originalRayJobStatus, rayJobStatus rayv1.RayJobStatus) { + if rayJobMetricsManager == nil { + return + } + emitRayJobExecutionDuration(rayJobMetricsManager, rayJobName, rayJobNamespace, originalRayJobStatus, rayJobStatus) +} + +func emitRayJobExecutionDuration(rayJobMetricsObserver metrics.RayJobMetricsObserver, rayJobName, rayJobNamespace string, originalRayJobStatus, rayJobStatus rayv1.RayJobStatus) { + // Emit kuberay_job_execution_duration_seconds when a job transitions from a non-terminal state to either a terminal state or a retrying state (following a failure). + if !rayv1.IsJobDeploymentTerminal(originalRayJobStatus.JobDeploymentStatus) && (rayv1.IsJobDeploymentTerminal(rayJobStatus.JobDeploymentStatus) || rayJobStatus.JobDeploymentStatus == rayv1.JobDeploymentStatusRetrying) { + retryCount := 0 + if originalRayJobStatus.Failed != nil { + retryCount += int(*originalRayJobStatus.Failed) + } + rayJobMetricsObserver.ObserveRayJobExecutionDuration( + rayJobName, + rayJobNamespace, + rayJobStatus.JobDeploymentStatus, + retryCount, + time.Since(rayJobStatus.StartTime.Time).Seconds(), + ) + } +} + // checkBackoffLimitAndUpdateStatusIfNeeded determines if a RayJob is eligible for retry based on the configured backoff limit, // the job's success status, and its failure status. If eligible, sets the JobDeploymentStatus to Retrying. func checkBackoffLimitAndUpdateStatusIfNeeded(ctx context.Context, rayJob *rayv1.RayJob) { diff --git a/ray-operator/controllers/ray/rayjob_controller_unit_test.go b/ray-operator/controllers/ray/rayjob_controller_unit_test.go index 39e63c98f67..4bee882f8ad 100644 --- a/ray-operator/controllers/ray/rayjob_controller_unit_test.go +++ b/ray-operator/controllers/ray/rayjob_controller_unit_test.go @@ -3,22 +3,28 @@ package ray import ( "context" "errors" + "math" "strings" "testing" + "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" + "k8s.io/utils/pointer" "sigs.k8s.io/controller-runtime/pkg/client" clientFake "sigs.k8s.io/controller-runtime/pkg/client/fake" "sigs.k8s.io/controller-runtime/pkg/client/interceptor" rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" + "github.com/ray-project/kuberay/ray-operator/controllers/ray/metrics/mocks" utils "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils" "github.com/ray-project/kuberay/ray-operator/pkg/client/clientset/versioned/scheme" ) @@ -525,3 +531,96 @@ func TestFailedDeleteRayClusterEvent(t *testing.T) { assert.Truef(t, foundFailureEvent, "Expected event to be generated for cluster deletion failure, got events: %s", strings.Join(events, "\n")) } + +func TestEmitRayJobExecutionDuration(t *testing.T) { + rayJobName := "test-job" + rayJobNamespace := "default" + mockTime := time.Now().Add(-60 * time.Second) + + //nolint:govet // disable govet to keep the order of the struct fields + tests := []struct { + name string + originalRayJobStatus rayv1.RayJobStatus + rayJobStatus rayv1.RayJobStatus + expectMetricsCall bool + expectedJobDeploymentStatus rayv1.JobDeploymentStatus + expectedRetryCount int + expectedDuration float64 + }{ + { + name: "non-terminal to complete state should emit metrics", + originalRayJobStatus: rayv1.RayJobStatus{ + JobDeploymentStatus: rayv1.JobDeploymentStatusRunning, + }, + rayJobStatus: rayv1.RayJobStatus{ + JobDeploymentStatus: rayv1.JobDeploymentStatusComplete, + StartTime: &metav1.Time{Time: mockTime}, + }, + expectMetricsCall: true, + expectedJobDeploymentStatus: rayv1.JobDeploymentStatusComplete, + expectedRetryCount: 0, + expectedDuration: 60.0, + }, + { + name: "non-terminal to failed state should emit metrics", + originalRayJobStatus: rayv1.RayJobStatus{ + JobDeploymentStatus: rayv1.JobDeploymentStatusRunning, + }, + rayJobStatus: rayv1.RayJobStatus{ + JobDeploymentStatus: rayv1.JobDeploymentStatusFailed, + StartTime: &metav1.Time{Time: mockTime}, + }, + expectMetricsCall: true, + expectedJobDeploymentStatus: rayv1.JobDeploymentStatusFailed, + expectedRetryCount: 0, + expectedDuration: 60.0, + }, + { + name: "non-terminal to retrying state should emit metrics", + originalRayJobStatus: rayv1.RayJobStatus{ + JobDeploymentStatus: rayv1.JobDeploymentStatusRunning, + Failed: pointer.Int32(2), + }, + rayJobStatus: rayv1.RayJobStatus{ + JobDeploymentStatus: rayv1.JobDeploymentStatusRetrying, + StartTime: &metav1.Time{Time: mockTime}, + }, + expectMetricsCall: true, + expectedJobDeploymentStatus: rayv1.JobDeploymentStatusRetrying, + expectedRetryCount: 2, + expectedDuration: 60.0, + }, + { + name: "non-terminal to non-terminal state should not emit metrics", + originalRayJobStatus: rayv1.RayJobStatus{ + JobDeploymentStatus: rayv1.JobDeploymentStatusInitializing, + }, + rayJobStatus: rayv1.RayJobStatus{ + JobDeploymentStatus: rayv1.JobDeploymentStatusRunning, + }, + expectMetricsCall: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + mockObserver := mocks.NewMockRayJobMetricsObserver(ctrl) + if tt.expectMetricsCall { + mockObserver.EXPECT(). + ObserveRayJobExecutionDuration( + rayJobName, + rayJobNamespace, + tt.expectedJobDeploymentStatus, + tt.expectedRetryCount, + mock.MatchedBy(func(d float64) bool { + // Allow some wiggle room in timing + return math.Abs(d-tt.expectedDuration) < 1.0 + }), + ).Times(1) + } + + emitRayJobExecutionDuration(mockObserver, rayJobName, rayJobNamespace, tt.originalRayJobStatus, tt.rayJobStatus) + }) + } +} diff --git a/ray-operator/go.mod b/ray-operator/go.mod index fe294615ff8..b3e5e00247c 100644 --- a/ray-operator/go.mod +++ b/ray-operator/go.mod @@ -18,6 +18,7 @@ require ( github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.22.0 github.com/stretchr/testify v1.10.0 + go.uber.org/mock v0.5.1 go.uber.org/zap v1.27.0 gopkg.in/natefinch/lumberjack.v2 v2.2.1 k8s.io/api v0.32.3 @@ -75,6 +76,7 @@ require ( github.com/prometheus/procfs v0.15.1 // indirect github.com/spf13/cobra v1.8.1 // indirect github.com/spf13/pflag v1.0.6 // indirect + github.com/stretchr/objx v0.5.2 // indirect github.com/x448/float16 v0.8.4 // indirect go.opentelemetry.io/otel v1.28.0 // indirect go.opentelemetry.io/otel/trace v1.28.0 // indirect diff --git a/ray-operator/go.sum b/ray-operator/go.sum index 459cff00486..f7cd2c3ffe5 100644 --- a/ray-operator/go.sum +++ b/ray-operator/go.sum @@ -118,6 +118,8 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An github.com/spf13/pflag v1.0.6 h1:jFzHGLGAlb3ruxLB8MhbI6A8+AQX/2eW4qeyNZXNp2o= github.com/spf13/pflag v1.0.6/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= @@ -131,6 +133,8 @@ go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+ go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/mock v0.5.1 h1:ASgazW/qBmR+A32MYFDB6E2POoTgOwT509VP0CT/fjs= +go.uber.org/mock v0.5.1/go.mod h1:ge71pBPLYDk7QIi1LupWxdAykm7KIEFchiOqd6z7qMM= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= diff --git a/ray-operator/main.go b/ray-operator/main.go index 00e2f0c7127..bb8f17603ea 100644 --- a/ray-operator/main.go +++ b/ray-operator/main.go @@ -231,13 +231,13 @@ func main() { exitOnError(err, "unable to start manager") var rayClusterMetricCollector *metrics.RayClusterMetricCollector - var rayJobMetricsCollector *metrics.RayJobMetricsCollector + var rayJobMetricsManager *metrics.RayJobMetricsManager if config.EnableMetrics { rayClusterMetricCollector = metrics.NewRayClusterMetricCollector() - rayJobMetricsCollector = metrics.NewRayJobMetricsCollector() + rayJobMetricsManager = metrics.NewRayJobMetricsManager() ctrlmetrics.Registry.MustRegister( rayClusterMetricCollector, - rayJobMetricsCollector, + rayJobMetricsManager, ) } @@ -254,7 +254,7 @@ func main() { "unable to create controller", "controller", "RayService") rayJobOptions := ray.RayJobReconcilerOptions{ - RayJobMetricsCollector: rayJobMetricsCollector, + RayJobMetricsManager: rayJobMetricsManager, } exitOnError(ray.NewRayJobReconciler(ctx, mgr, rayJobOptions, config).SetupWithManager(mgr, config.ReconcileConcurrency), "unable to create controller", "controller", "RayJob")