Skip to content
Merged
10 changes: 10 additions & 0 deletions ray-operator/apis/ray/v1/rayjob_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,16 @@ const (
JobDeploymentStatusWaiting JobDeploymentStatus = "Waiting"
)

// IsJobDeploymentTerminal returns true if the given JobDeploymentStatus
// is in a terminal state. Terminal states are either Complete or Failed.
func IsJobDeploymentTerminal(status JobDeploymentStatus) bool {
	// A direct comparison avoids allocating a two-entry set on every call.
	return status == JobDeploymentStatusComplete || status == JobDeploymentStatusFailed
}

// JobFailedReason indicates the reason the RayJob changes its JobDeploymentStatus to 'Failed'
type JobFailedReason string

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 29 additions & 8 deletions ray-operator/controllers/ray/metrics/ray_job_metrics.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,45 @@
package metrics

import (
"strconv"

"github.com/prometheus/client_golang/prometheus"
)

//go:generate mockgen -destination=mocks/ray_job_metrics_mock.go -package=mocks github.com/ray-project/kuberay/ray-operator/controllers/ray/metrics RayJobMetricsCollector

// RayJobMetricsCollector is the interface consumed by the RayJob controller to
// record RayJob metrics; implemented by RayJobCollector and mocked in tests.
type RayJobMetricsCollector interface {
	// ObserveRayJobExecutionDuration records how long a RayJob ran before
	// reaching a terminal (Complete/Failed) or retrying state.
	ObserveRayJobExecutionDuration(name, namespace, result string, retryCount int, duration float64)
}

// RayJobCollector implements the prometheus.Collector and RayJobMetricsCollector interface to collect ray job metrics.
type RayJobCollector struct {
	// rayJobExecutionDurationSeconds is a gauge labeled by name, namespace,
	// job_deployment_result, and retry_count (see NewRayJobCollector).
	rayJobExecutionDurationSeconds *prometheus.GaugeVec
}

// NewRayJobMetricsCollector creates a new RayJobMetricsCollector instance.
func NewRayJobMetricsCollector() *RayJobMetricsCollector {
collector := &RayJobMetricsCollector{}
// NewRayJobCollector creates a new RayJobCollector instance.
func NewRayJobCollector() *RayJobCollector {
collector := &RayJobCollector{
rayJobExecutionDurationSeconds: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "kuberay_job_execution_duration_seconds",
Help: "Duration from when the RayJob CR’s JobDeploymentStatus transitions from Initializing to either the Retrying state or a terminal state, such as Complete or Failed. The Retrying state indicates that the CR previously failed and that spec.backoffLimit is enabled.",
},
[]string{"name", "namespace", "job_deployment_result", "retry_count"},
),
}
return collector
}

// Describe implements prometheus.Collector interface Describe method.
func (c *RayJobMetricsCollector) Describe(_ chan<- *prometheus.Desc) {
func (c *RayJobCollector) Describe(ch chan<- *prometheus.Desc) {
c.rayJobExecutionDurationSeconds.Describe(ch)
}

// Collect implements prometheus.Collector interface Collect method.
func (c *RayJobMetricsCollector) Collect(_ chan<- prometheus.Metric) {
func (c *RayJobCollector) Collect(ch chan<- prometheus.Metric) {
c.rayJobExecutionDurationSeconds.Collect(ch)
}

// ObserveRayJobExecutionDuration records the execution duration in seconds for a
// RayJob, labeled by name, namespace, the terminal deployment result, and how many
// times the job has been retried.
func (c *RayJobCollector) ObserveRayJobExecutionDuration(name, namespace, jobDeploymentResult string, retryCount int, duration float64) {
	c.rayJobExecutionDurationSeconds.WithLabelValues(name, namespace, jobDeploymentResult, strconv.Itoa(retryCount)).Set(duration)
}
32 changes: 31 additions & 1 deletion ray-operator/controllers/ray/rayjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type RayJobReconciler struct {
}

// RayJobReconcilerOptions holds optional dependencies for RayJobReconciler.
type RayJobReconcilerOptions struct {
	// RayJobMetricsCollector records RayJob metrics; a nil value disables
	// metric emission (see emitRayJobMetrics).
	RayJobMetricsCollector metrics.RayJobMetricsCollector
}

// NewRayJobReconciler returns a new reconcile.Reconciler
Expand Down Expand Up @@ -430,9 +430,39 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
logger.Info("Failed to update RayJob status", "error", err)
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
emitRayJobMetrics(r.options.RayJobMetricsCollector, rayJobInstance.Name, rayJobInstance.Namespace, originalRayJobInstance.Status, rayJobInstance.Status)
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
}

// emitRayJobMetrics forwards a RayJob status transition to the metrics collector.
// It is a no-op when metrics are disabled (nil collector).
func emitRayJobMetrics(rayJobMetricsCollector metrics.RayJobMetricsCollector, rayJobName, rayJobNamespace string, originalRayJobStatus, rayJobStatus rayv1.RayJobStatus) {
	if rayJobMetricsCollector != nil {
		emitRayJobExecutionDuration(rayJobMetricsCollector, rayJobName, rayJobNamespace, originalRayJobStatus, rayJobStatus)
	}
}

// emitRayJobExecutionDuration emits kuberay_job_execution_duration_seconds when a
// RayJob transitions from a non-terminal JobDeploymentStatus to either a terminal
// state (Complete/Failed) or the Retrying state (which follows a failure when
// spec.backoffLimit is enabled). The duration is measured from StartTime, and the
// Retrying state is reported with the Failed result label.
func emitRayJobExecutionDuration(rayJobMetricsCollector metrics.RayJobMetricsCollector, rayJobName, rayJobNamespace string, originalRayJobStatus, rayJobStatus rayv1.RayJobStatus) {
	// Only emit on a transition out of a non-terminal state.
	if rayv1.IsJobDeploymentTerminal(originalRayJobStatus.JobDeploymentStatus) {
		return
	}
	if !rayv1.IsJobDeploymentTerminal(rayJobStatus.JobDeploymentStatus) && rayJobStatus.JobDeploymentStatus != rayv1.JobDeploymentStatusRetrying {
		return
	}
	// Guard against a nil StartTime to avoid a nil-pointer panic; a job with no
	// recorded start has no meaningful execution duration to report.
	if rayJobStatus.StartTime == nil {
		return
	}
	retryCount := 0
	if originalRayJobStatus.Failed != nil {
		retryCount += int(*originalRayJobStatus.Failed)
	}
	jobDeploymentResult := rayJobStatus.JobDeploymentStatus
	if jobDeploymentResult == rayv1.JobDeploymentStatusRetrying {
		// If the job is in the retrying state, it was previously failed.
		jobDeploymentResult = rayv1.JobDeploymentStatusFailed
	}
	rayJobMetricsCollector.ObserveRayJobExecutionDuration(
		rayJobName,
		rayJobNamespace,
		string(jobDeploymentResult),
		retryCount,
		time.Since(rayJobStatus.StartTime.Time).Seconds(),
	)
}

// checkBackoffLimitAndUpdateStatusIfNeeded determines if a RayJob is eligible for retry based on the configured backoff limit,
// the job's success status, and its failure status. If eligible, sets the JobDeploymentStatus to Retrying.
func checkBackoffLimitAndUpdateStatusIfNeeded(ctx context.Context, rayJob *rayv1.RayJob) {
Expand Down
99 changes: 99 additions & 0 deletions ray-operator/controllers/ray/rayjob_controller_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,28 @@ package ray
import (
"context"
"errors"
"math"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"
clientFake "sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/client/interceptor"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/metrics/mocks"
utils "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
"github.com/ray-project/kuberay/ray-operator/pkg/client/clientset/versioned/scheme"
)
Expand Down Expand Up @@ -525,3 +531,96 @@ func TestFailedDeleteRayClusterEvent(t *testing.T) {

assert.Truef(t, foundFailureEvent, "Expected event to be generated for cluster deletion failure, got events: %s", strings.Join(events, "\n"))
}

// TestEmitRayJobExecutionDuration verifies that emitRayJobMetrics emits the
// kuberay_job_execution_duration_seconds metric exactly once on transitions from a
// non-terminal JobDeploymentStatus to a terminal (Complete/Failed) or Retrying
// state — with the expected result label, retry count, and duration — and not at
// all for non-terminal-to-non-terminal transitions.
func TestEmitRayJobExecutionDuration(t *testing.T) {
	rayJobName := "test-job"
	rayJobNamespace := "default"
	// Start time fixed 60s in the past so the expected duration is ~60s.
	mockTime := time.Now().Add(-60 * time.Second)

	//nolint:govet // disable govet to keep the order of the struct fields
	tests := []struct {
		name                 string
		originalRayJobStatus rayv1.RayJobStatus
		rayJobStatus         rayv1.RayJobStatus
		expectMetricsCall    bool
		expectedResult       string
		expectedRetryCount   int
		expectedDuration     float64
	}{
		{
			name: "non-terminal to complete state should emit metrics",
			originalRayJobStatus: rayv1.RayJobStatus{
				JobDeploymentStatus: rayv1.JobDeploymentStatusRunning,
			},
			rayJobStatus: rayv1.RayJobStatus{
				JobDeploymentStatus: rayv1.JobDeploymentStatusComplete,
				StartTime:           &metav1.Time{Time: mockTime},
			},
			expectMetricsCall:  true,
			expectedResult:     string(rayv1.JobDeploymentStatusComplete),
			expectedRetryCount: 0,
			expectedDuration:   60.0,
		},
		{
			name: "non-terminal to failed state should emit metrics",
			originalRayJobStatus: rayv1.RayJobStatus{
				JobDeploymentStatus: rayv1.JobDeploymentStatusRunning,
			},
			rayJobStatus: rayv1.RayJobStatus{
				JobDeploymentStatus: rayv1.JobDeploymentStatusFailed,
				StartTime:           &metav1.Time{Time: mockTime},
			},
			expectMetricsCall:  true,
			expectedResult:     string(rayv1.JobDeploymentStatusFailed),
			expectedRetryCount: 0,
			expectedDuration:   60.0,
		},
		{
			// Retrying is reported with the Failed result label, and the prior
			// failure count (Status.Failed) becomes the retry_count label.
			name: "non-terminal to retrying state should emit metrics",
			originalRayJobStatus: rayv1.RayJobStatus{
				JobDeploymentStatus: rayv1.JobDeploymentStatusRunning,
				Failed:              pointer.Int32(2),
			},
			rayJobStatus: rayv1.RayJobStatus{
				JobDeploymentStatus: rayv1.JobDeploymentStatusRetrying,
				StartTime:           &metav1.Time{Time: mockTime},
			},
			expectMetricsCall:  true,
			expectedResult:     string(rayv1.JobDeploymentStatusFailed),
			expectedRetryCount: 2,
			expectedDuration:   60.0,
		},
		{
			name: "non-terminal to non-terminal state should not emit metrics",
			originalRayJobStatus: rayv1.RayJobStatus{
				JobDeploymentStatus: rayv1.JobDeploymentStatusInitializing,
			},
			rayJobStatus: rayv1.RayJobStatus{
				JobDeploymentStatus: rayv1.JobDeploymentStatusRunning,
			},
			expectMetricsCall: false,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			ctrl := gomock.NewController(t)
			mockCollector := mocks.NewMockRayJobMetricsCollector(ctrl)
			if tt.expectMetricsCall {
				// NOTE(review): mock.MatchedBy is testify's matcher; it appears to
				// satisfy gomock's Matcher interface here, but gomock.Cond is the
				// native gomock equivalent — confirm this mixing is intentional.
				mockCollector.EXPECT().
					ObserveRayJobExecutionDuration(
						rayJobName,
						rayJobNamespace,
						tt.expectedResult,
						tt.expectedRetryCount,
						mock.MatchedBy(func(d float64) bool {
							// Allow some wiggle room in timing
							return math.Abs(d-tt.expectedDuration) < 1.0
						}),
					).Times(1)
			}

			emitRayJobMetrics(mockCollector, rayJobName, rayJobNamespace, tt.originalRayJobStatus, tt.rayJobStatus)
		})
	}
}
2 changes: 2 additions & 0 deletions ray-operator/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.22.0
github.com/stretchr/testify v1.10.0
go.uber.org/mock v0.5.1
go.uber.org/zap v1.27.0
gopkg.in/natefinch/lumberjack.v2 v2.2.1
k8s.io/api v0.32.3
Expand Down Expand Up @@ -75,6 +76,7 @@ require (
github.com/prometheus/procfs v0.15.1 // indirect
github.com/spf13/cobra v1.8.1 // indirect
github.com/spf13/pflag v1.0.6 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/x448/float16 v0.8.4 // indirect
go.opentelemetry.io/otel v1.28.0 // indirect
go.opentelemetry.io/otel/trace v1.28.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions ray-operator/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 7 additions & 2 deletions ray-operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/go-logr/zapr"
routev1 "github.com/openshift/api/route/v1"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"gopkg.in/natefinch/lumberjack.v2"
Expand Down Expand Up @@ -231,10 +232,14 @@ func main() {
exitOnError(err, "unable to start manager")

var rayClusterMetricCollector *metrics.RayClusterMetricCollector
var rayJobMetricsCollector *metrics.RayJobMetricsCollector
var rayJobMetricsCollector metrics.RayJobMetricsCollector
if config.EnableMetrics {
rayClusterMetricCollector = metrics.NewRayClusterMetricCollector()
rayJobMetricsCollector = metrics.NewRayJobMetricsCollector()
rayJobMetricsCollector = metrics.NewRayJobCollector()
rayJobMetricsCollector, ok := rayJobMetricsCollector.(prometheus.Collector)
if !ok {
exitOnError(fmt.Errorf("RayJobMetricsCollector does not implement prometheus.Collector"), "failed to register RayJobMetricsCollector")
}
ctrlmetrics.Registry.MustRegister(
rayClusterMetricCollector,
rayJobMetricsCollector,
Expand Down
Loading