Merged
10 changes: 10 additions & 0 deletions ray-operator/apis/ray/v1/rayjob_types.go
@@ -56,6 +56,16 @@ const (
JobDeploymentStatusWaiting JobDeploymentStatus = "Waiting"
)

// IsJobDeploymentTerminal returns true if the given JobDeploymentStatus
// is in a terminal state. Terminal states are either Complete or Failed.
func IsJobDeploymentTerminal(status JobDeploymentStatus) bool {
terminalStatusSet := map[JobDeploymentStatus]struct{}{
JobDeploymentStatusComplete: {}, JobDeploymentStatusFailed: {},
}
_, ok := terminalStatusSet[status]
return ok
}

// JobFailedReason indicates the reason the RayJob changes its JobDeploymentStatus to 'Failed'
type JobFailedReason string

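For reference, here is a minimal sketch (not part of this PR) of how the new IsJobDeploymentTerminal helper could be exercised in a table-driven test. It assumes the apis/ray/v1 package is named v1 and reuses status constants that appear elsewhere in this diff.

package v1

import "testing"

// Illustrative only: verify that only Complete and Failed are treated as terminal.
func TestIsJobDeploymentTerminalSketch(t *testing.T) {
	cases := map[JobDeploymentStatus]bool{
		JobDeploymentStatusRunning:  false,
		JobDeploymentStatusRetrying: false,
		JobDeploymentStatusComplete: true,
		JobDeploymentStatusFailed:   true,
	}
	for status, want := range cases {
		if got := IsJobDeploymentTerminal(status); got != want {
			t.Errorf("IsJobDeploymentTerminal(%q) = %v, want %v", status, got, want)
		}
	}
}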


39 changes: 31 additions & 8 deletions ray-operator/controllers/ray/metrics/ray_job_metrics.go
@@ -1,24 +1,47 @@
package metrics

import (
+	"strconv"
+
	"github.com/prometheus/client_golang/prometheus"
+
+	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
)

-// RayJobMetricsCollector implements the prometheus.Collector interface to collect ray job metrics.
-type RayJobMetricsCollector struct {
-	// Metrics
+//go:generate mockgen -destination=mocks/ray_job_metrics_mock.go -package=mocks github.com/ray-project/kuberay/ray-operator/controllers/ray/metrics RayJobMetricsObserver
+type RayJobMetricsObserver interface {
+	ObserveRayJobExecutionDuration(name, namespace string, jobDeploymentStatus rayv1.JobDeploymentStatus, retryCount int, duration float64)
+}
+
+// RayJobMetricsManager implements the prometheus.Collector and RayJobMetricsObserver interface to collect ray job metrics.
+type RayJobMetricsManager struct {
+	rayJobExecutionDurationSeconds *prometheus.GaugeVec
}

-// NewRayJobMetricsCollector creates a new RayJobMetricsCollector instance.
-func NewRayJobMetricsCollector() *RayJobMetricsCollector {
-	collector := &RayJobMetricsCollector{}
+// NewRayJobMetricsManager creates a new RayJobMetricsManager instance.
+func NewRayJobMetricsManager() *RayJobMetricsManager {
+	collector := &RayJobMetricsManager{
+		rayJobExecutionDurationSeconds: prometheus.NewGaugeVec(
+			prometheus.GaugeOpts{
+				Name: "kuberay_job_execution_duration_seconds",
+				Help: "Duration from when the RayJob CR’s JobDeploymentStatus transitions from Initializing to either the Retrying state or a terminal state, such as Complete or Failed. The Retrying state indicates that the CR previously failed and that spec.backoffLimit is enabled.",
+			},
+			[]string{"name", "namespace", "job_deployment_status", "retry_count"},
+		),
+	}
	return collector
}

// Describe implements prometheus.Collector interface Describe method.
-func (c *RayJobMetricsCollector) Describe(_ chan<- *prometheus.Desc) {
+func (c *RayJobMetricsManager) Describe(ch chan<- *prometheus.Desc) {
+	c.rayJobExecutionDurationSeconds.Describe(ch)
}

// Collect implements prometheus.Collector interface Collect method.
-func (c *RayJobMetricsCollector) Collect(_ chan<- prometheus.Metric) {
+func (c *RayJobMetricsManager) Collect(ch chan<- prometheus.Metric) {
+	c.rayJobExecutionDurationSeconds.Collect(ch)
}
+
+func (c *RayJobMetricsManager) ObserveRayJobExecutionDuration(name, namespace string, jobDeploymentStatus rayv1.JobDeploymentStatus, retryCount int, duration float64) {
+	c.rayJobExecutionDurationSeconds.WithLabelValues(name, namespace, string(jobDeploymentStatus), strconv.Itoa(retryCount)).Set(duration)
+}
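To show how the new RayJobMetricsManager is intended to be wired up and what the resulting series looks like, here is an illustrative standalone sketch (not part of this PR). It registers the manager on a throwaway Prometheus registry rather than the controller-runtime registry used in main.go below, and the RayJob name and values are made up.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"

	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
	"github.com/ray-project/kuberay/ray-operator/controllers/ray/metrics"
)

func main() {
	// Register the manager as a prometheus.Collector on a standalone registry.
	reg := prometheus.NewRegistry()
	manager := metrics.NewRayJobMetricsManager()
	reg.MustRegister(manager)

	// Record one observation: a RayJob that completed on its first attempt after 42.5 seconds.
	manager.ObserveRayJobExecutionDuration("sample-rayjob", "default", rayv1.JobDeploymentStatusComplete, 0, 42.5)

	// Gathering should yield one gauge, exposed roughly as:
	// kuberay_job_execution_duration_seconds{job_deployment_status="Complete",name="sample-rayjob",namespace="default",retry_count="0"} 42.5
	families, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range families {
		fmt.Printf("%s: %d series\n", mf.GetName(), len(mf.GetMetric()))
	}
}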
27 changes: 26 additions & 1 deletion ray-operator/controllers/ray/rayjob_controller.go
@@ -46,7 +46,7 @@ type RayJobReconciler struct {
}

type RayJobReconcilerOptions struct {
-	RayJobMetricsCollector *metrics.RayJobMetricsCollector
+	RayJobMetricsManager *metrics.RayJobMetricsManager
}

// NewRayJobReconciler returns a new reconcile.Reconciler
@@ -430,9 +430,34 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
logger.Info("Failed to update RayJob status", "error", err)
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
emitRayJobMetrics(r.options.RayJobMetricsManager, rayJobInstance.Name, rayJobInstance.Namespace, originalRayJobInstance.Status, rayJobInstance.Status)
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
}

func emitRayJobMetrics(rayJobMetricsManager *metrics.RayJobMetricsManager, rayJobName, rayJobNamespace string, originalRayJobStatus, rayJobStatus rayv1.RayJobStatus) {
if rayJobMetricsManager == nil {
return
}
emitRayJobExecutionDuration(rayJobMetricsManager, rayJobName, rayJobNamespace, originalRayJobStatus, rayJobStatus)
}

func emitRayJobExecutionDuration(rayJobMetricsObserver metrics.RayJobMetricsObserver, rayJobName, rayJobNamespace string, originalRayJobStatus, rayJobStatus rayv1.RayJobStatus) {
// Emit kuberay_job_execution_duration_seconds when a job transitions from a non-terminal state to either a terminal state or a retrying state (following a failure).
if !rayv1.IsJobDeploymentTerminal(originalRayJobStatus.JobDeploymentStatus) && (rayv1.IsJobDeploymentTerminal(rayJobStatus.JobDeploymentStatus) || rayJobStatus.JobDeploymentStatus == rayv1.JobDeploymentStatusRetrying) {
retryCount := 0
if originalRayJobStatus.Failed != nil {
retryCount += int(*originalRayJobStatus.Failed)
}
rayJobMetricsObserver.ObserveRayJobExecutionDuration(
rayJobName,
rayJobNamespace,
rayJobStatus.JobDeploymentStatus,
retryCount,
time.Since(rayJobStatus.StartTime.Time).Seconds(),
)
}
}

// checkBackoffLimitAndUpdateStatusIfNeeded determines if a RayJob is eligible for retry based on the configured backoff limit,
// the job's success status, and its failure status. If eligible, sets the JobDeploymentStatus to Retrying.
func checkBackoffLimitAndUpdateStatusIfNeeded(ctx context.Context, rayJob *rayv1.RayJob) {
99 changes: 99 additions & 0 deletions ray-operator/controllers/ray/rayjob_controller_unit_test.go
@@ -3,22 +3,28 @@ package ray
import (
"context"
"errors"
"math"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"
clientFake "sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/client/interceptor"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/metrics/mocks"
utils "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
"github.com/ray-project/kuberay/ray-operator/pkg/client/clientset/versioned/scheme"
)
@@ -525,3 +531,96 @@ func TestFailedDeleteRayClusterEvent(t *testing.T) {

assert.Truef(t, foundFailureEvent, "Expected event to be generated for cluster deletion failure, got events: %s", strings.Join(events, "\n"))
}

func TestEmitRayJobExecutionDuration(t *testing.T) {
rayJobName := "test-job"
rayJobNamespace := "default"
mockTime := time.Now().Add(-60 * time.Second)

//nolint:govet // disable govet to keep the order of the struct fields
tests := []struct {
name string
originalRayJobStatus rayv1.RayJobStatus
rayJobStatus rayv1.RayJobStatus
expectMetricsCall bool
expectedJobDeploymentStatus rayv1.JobDeploymentStatus
expectedRetryCount int
expectedDuration float64
}{
{
name: "non-terminal to complete state should emit metrics",
originalRayJobStatus: rayv1.RayJobStatus{
JobDeploymentStatus: rayv1.JobDeploymentStatusRunning,
},
rayJobStatus: rayv1.RayJobStatus{
JobDeploymentStatus: rayv1.JobDeploymentStatusComplete,
StartTime: &metav1.Time{Time: mockTime},
},
expectMetricsCall: true,
expectedJobDeploymentStatus: rayv1.JobDeploymentStatusComplete,
expectedRetryCount: 0,
expectedDuration: 60.0,
},
{
name: "non-terminal to failed state should emit metrics",
originalRayJobStatus: rayv1.RayJobStatus{
JobDeploymentStatus: rayv1.JobDeploymentStatusRunning,
},
rayJobStatus: rayv1.RayJobStatus{
JobDeploymentStatus: rayv1.JobDeploymentStatusFailed,
StartTime: &metav1.Time{Time: mockTime},
},
expectMetricsCall: true,
expectedJobDeploymentStatus: rayv1.JobDeploymentStatusFailed,
expectedRetryCount: 0,
expectedDuration: 60.0,
},
{
name: "non-terminal to retrying state should emit metrics",
originalRayJobStatus: rayv1.RayJobStatus{
JobDeploymentStatus: rayv1.JobDeploymentStatusRunning,
Failed: pointer.Int32(2),
},
rayJobStatus: rayv1.RayJobStatus{
JobDeploymentStatus: rayv1.JobDeploymentStatusRetrying,
StartTime: &metav1.Time{Time: mockTime},
},
expectMetricsCall: true,
expectedJobDeploymentStatus: rayv1.JobDeploymentStatusRetrying,
expectedRetryCount: 2,
expectedDuration: 60.0,
},
{
name: "non-terminal to non-terminal state should not emit metrics",
originalRayJobStatus: rayv1.RayJobStatus{
JobDeploymentStatus: rayv1.JobDeploymentStatusInitializing,
},
rayJobStatus: rayv1.RayJobStatus{
JobDeploymentStatus: rayv1.JobDeploymentStatusRunning,
},
expectMetricsCall: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
mockObserver := mocks.NewMockRayJobMetricsObserver(ctrl)
if tt.expectMetricsCall {
mockObserver.EXPECT().
ObserveRayJobExecutionDuration(
rayJobName,
rayJobNamespace,
tt.expectedJobDeploymentStatus,
tt.expectedRetryCount,
mock.MatchedBy(func(d float64) bool {
// Allow some wiggle room in timing
return math.Abs(d-tt.expectedDuration) < 1.0
}),
).Times(1)
}

emitRayJobExecutionDuration(mockObserver, rayJobName, rayJobNamespace, tt.originalRayJobStatus, tt.rayJobStatus)
})
}
}
2 changes: 2 additions & 0 deletions ray-operator/go.mod
@@ -18,6 +18,7 @@ require (
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.22.0
github.com/stretchr/testify v1.10.0
go.uber.org/mock v0.5.1
go.uber.org/zap v1.27.0
gopkg.in/natefinch/lumberjack.v2 v2.2.1
k8s.io/api v0.32.3
@@ -75,6 +76,7 @@ require (
github.com/prometheus/procfs v0.15.1 // indirect
github.com/spf13/cobra v1.8.1 // indirect
github.com/spf13/pflag v1.0.6 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/x448/float16 v0.8.4 // indirect
go.opentelemetry.io/otel v1.28.0 // indirect
go.opentelemetry.io/otel/trace v1.28.0 // indirect
4 changes: 4 additions & 0 deletions ray-operator/go.sum


8 changes: 4 additions & 4 deletions ray-operator/main.go
@@ -231,13 +231,13 @@ func main() {
exitOnError(err, "unable to start manager")

var rayClusterMetricCollector *metrics.RayClusterMetricCollector
-	var rayJobMetricsCollector *metrics.RayJobMetricsCollector
+	var rayJobMetricsManager *metrics.RayJobMetricsManager
if config.EnableMetrics {
rayClusterMetricCollector = metrics.NewRayClusterMetricCollector()
-		rayJobMetricsCollector = metrics.NewRayJobMetricsCollector()
+		rayJobMetricsManager = metrics.NewRayJobMetricsManager()
ctrlmetrics.Registry.MustRegister(
rayClusterMetricCollector,
-			rayJobMetricsCollector,
+			rayJobMetricsManager,
)
}

@@ -254,7 +254,7 @@
"unable to create controller", "controller", "RayService")

rayJobOptions := ray.RayJobReconcilerOptions{
-		RayJobMetricsCollector: rayJobMetricsCollector,
+		RayJobMetricsManager: rayJobMetricsManager,
}
exitOnError(ray.NewRayJobReconciler(ctx, mgr, rayJobOptions, config).SetupWithManager(mgr, config.ReconcileConcurrency),
"unable to create controller", "controller", "RayJob")