Skip to content
Merged
8 changes: 8 additions & 0 deletions ray-operator/apis/ray/v1/rayjob_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,14 @@ const (
JobDeploymentStatusWaiting JobDeploymentStatus = "Waiting"
)

func IsJobDeploymentTerminal(status JobDeploymentStatus) bool {
terminalStatusSet := map[JobDeploymentStatus]struct{}{
JobDeploymentStatusComplete: {}, JobDeploymentStatusFailed: {},
}
_, ok := terminalStatusSet[status]
return ok
}

// JobFailedReason indicates the reason the RayJob changes its JobDeploymentStatus to 'Failed'
type JobFailedReason string

Expand Down
41 changes: 33 additions & 8 deletions ray-operator/controllers/ray/metrics/ray_job_metrics.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,49 @@
package metrics

import (
"strconv"

"github.com/prometheus/client_golang/prometheus"
)

// RayJobMetricsCollector implements the prometheus.Collector interface to collect ray job metrics.
type RayJobMetricsCollector struct {
// Metrics
// RayJobCollector implements the prometheus.Collector and ray.RayJobMetricsCollector interface to collect ray job metrics.
type RayJobCollector struct {
rayJobExecutionDurationSeconds *prometheus.GaugeVec
}

// NewRayJobMetricsCollector creates a new RayJobMetricsCollector instance.
func NewRayJobMetricsCollector() *RayJobMetricsCollector {
collector := &RayJobMetricsCollector{}
// NewRayJobCollector creates a new RayJobCollector instance.
func NewRayJobCollector() *RayJobCollector {
collector := &RayJobCollector{
rayJobExecutionDurationSeconds: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "kuberay_job_execution_duration_seconds",
Help: "Duration from RayJob CR initialization to reaching a terminal state or retrying state, where retrying state indicates the CR was previously failed and backoff is enabled.",
},
[]string{"name", "namespace", "result", "retry_count"},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a better name for "result"?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change to job_deployment_result. Wdyt?

),
}
return collector
}

// Describe implements prometheus.Collector interface Describe method.
func (c *RayJobMetricsCollector) Describe(_ chan<- *prometheus.Desc) {
func (c *RayJobCollector) Describe(ch chan<- *prometheus.Desc) {
c.rayJobExecutionDurationSeconds.Describe(ch)
}

// Collect implements prometheus.Collector interface Collect method.
func (c *RayJobMetricsCollector) Collect(_ chan<- prometheus.Metric) {
func (c *RayJobCollector) Collect(ch chan<- prometheus.Metric) {
c.rayJobExecutionDurationSeconds.Collect(ch)
}

func (c *RayJobCollector) ObserveRayJobExecutionDuration(name, namespace, result string, retryCount int, duration float64) {
c.rayJobExecutionDurationSeconds.WithLabelValues(name, namespace, result, strconv.Itoa(retryCount)).Set(duration)
}

type RayJobNoopCollector struct{}

func NewRayJobNoopCollector() *RayJobNoopCollector {
return &RayJobNoopCollector{}
}

func (c *RayJobNoopCollector) ObserveRayJobExecutionDuration(_ string, _ string, _ string, _ int, _ float64) {
}
52 changes: 52 additions & 0 deletions ray-operator/controllers/ray/mocks/rayjob_controller_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 34 additions & 2 deletions ray-operator/controllers/ray/rayjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/common"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/metrics"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
"github.com/ray-project/kuberay/ray-operator/pkg/features"
)
Expand All @@ -45,8 +44,13 @@ type RayJobReconciler struct {
options RayJobReconcilerOptions
}

//go:generate mockgen -destination=mocks/rayjob_controller_mock.go -package=mocks github.com/ray-project/kuberay/ray-operator/controllers/ray RayJobMetricsCollector
type RayJobMetricsCollector interface {
ObserveRayJobExecutionDuration(name, namespace, result string, retryCount int, duration float64)
}

type RayJobReconcilerOptions struct {
RayJobMetricsCollector *metrics.RayJobMetricsCollector
RayJobMetricsCollector RayJobMetricsCollector
}

// NewRayJobReconciler returns a new reconcile.Reconciler
Expand Down Expand Up @@ -430,9 +434,37 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
logger.Info("Failed to update RayJob status", "error", err)
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
// Emit metrics for the RayJob
emitRayJobMetrics(r.options.RayJobMetricsCollector, rayJobInstance.Name, rayJobInstance.Namespace, originalRayJobInstance.Status, rayJobInstance.Status)
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
}

func emitRayJobMetrics(rayJobMetricsCollector RayJobMetricsCollector, rayJobName, rayJobNamespace string, originalRayJobStatus, rayJobStatus rayv1.RayJobStatus) {
emitRayJobExecutionDuration(rayJobMetricsCollector, rayJobName, rayJobNamespace, originalRayJobStatus, rayJobStatus)
}

func emitRayJobExecutionDuration(rayJobMetricsCollector RayJobMetricsCollector, rayJobName, rayJobNamespace string, originalRayJobStatus, rayJobStatus rayv1.RayJobStatus) {
// Emit kuberay_job_execution_duration_seconds when a job transitions from a non-terminal state to either a terminal state or a retrying state (following a failure).
if !rayv1.IsJobDeploymentTerminal(originalRayJobStatus.JobDeploymentStatus) && (rayv1.IsJobDeploymentTerminal(rayJobStatus.JobDeploymentStatus) || rayJobStatus.JobDeploymentStatus == rayv1.JobDeploymentStatusRetrying) {
retryCount := 0
if originalRayJobStatus.Failed != nil {
retryCount += int(*originalRayJobStatus.Failed)
}
result := rayJobStatus.JobDeploymentStatus
if result == rayv1.JobDeploymentStatusRetrying {
// If the job is in the retrying state, it was previously failed.
result = rayv1.JobDeploymentStatusFailed
}
rayJobMetricsCollector.ObserveRayJobExecutionDuration(
rayJobName,
rayJobNamespace,
string(result),
retryCount,
time.Since(rayJobStatus.StartTime.Time).Seconds(),
)
}
}

// checkBackoffLimitAndUpdateStatusIfNeeded determines if a RayJob is eligible for retry based on the configured backoff limit,
// the job's success status, and its failure status. If eligible, sets the JobDeploymentStatus to Retrying.
func checkBackoffLimitAndUpdateStatusIfNeeded(ctx context.Context, rayJob *rayv1.RayJob) {
Expand Down
99 changes: 99 additions & 0 deletions ray-operator/controllers/ray/rayjob_controller_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,28 @@ package ray
import (
"context"
"errors"
"math"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"
clientFake "sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/client/interceptor"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/mocks"
utils "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
"github.com/ray-project/kuberay/ray-operator/pkg/client/clientset/versioned/scheme"
)
Expand Down Expand Up @@ -525,3 +531,96 @@ func TestFailedDeleteRayClusterEvent(t *testing.T) {

assert.Truef(t, foundFailureEvent, "Expected event to be generated for cluster deletion failure, got events: %s", strings.Join(events, "\n"))
}

func TestEmitRayJobExecutionDuration(t *testing.T) {
rayJobName := "test-job"
rayJobNamespace := "default"
mockTime := time.Now().Add(-60 * time.Second)

//nolint:govet // disable govet to keep the order of the struct fields
tests := []struct {
name string
originalRayJobStatus rayv1.RayJobStatus
rayJobStatus rayv1.RayJobStatus
expectMetricsCall bool
expectedResult string
expectedRetryCount int
expectedDuration float64
}{
{
name: "non-terminal to complete state should emit metrics",
originalRayJobStatus: rayv1.RayJobStatus{
JobDeploymentStatus: rayv1.JobDeploymentStatusRunning,
},
rayJobStatus: rayv1.RayJobStatus{
JobDeploymentStatus: rayv1.JobDeploymentStatusComplete,
StartTime: &metav1.Time{Time: mockTime},
},
expectMetricsCall: true,
expectedResult: string(rayv1.JobDeploymentStatusComplete),
expectedRetryCount: 0,
expectedDuration: 60.0,
},
{
name: "non-terminal to failed state should emit metrics",
originalRayJobStatus: rayv1.RayJobStatus{
JobDeploymentStatus: rayv1.JobDeploymentStatusRunning,
},
rayJobStatus: rayv1.RayJobStatus{
JobDeploymentStatus: rayv1.JobDeploymentStatusFailed,
StartTime: &metav1.Time{Time: mockTime},
},
expectMetricsCall: true,
expectedResult: string(rayv1.JobDeploymentStatusFailed),
expectedRetryCount: 0,
expectedDuration: 60.0,
},
{
name: "non-terminal to retrying state should emit metrics",
originalRayJobStatus: rayv1.RayJobStatus{
JobDeploymentStatus: rayv1.JobDeploymentStatusRunning,
Failed: pointer.Int32(2),
},
rayJobStatus: rayv1.RayJobStatus{
JobDeploymentStatus: rayv1.JobDeploymentStatusRetrying,
StartTime: &metav1.Time{Time: mockTime},
},
expectMetricsCall: true,
expectedResult: string(rayv1.JobDeploymentStatusFailed),
expectedRetryCount: 2,
expectedDuration: 60.0,
},
{
name: "non-terminal to non-terminal state should not emit metrics",
originalRayJobStatus: rayv1.RayJobStatus{
JobDeploymentStatus: rayv1.JobDeploymentStatusInitializing,
},
rayJobStatus: rayv1.RayJobStatus{
JobDeploymentStatus: rayv1.JobDeploymentStatusRunning,
},
expectMetricsCall: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
mockCollector := mocks.NewMockRayJobMetricsCollector(ctrl)
if tt.expectMetricsCall {
mockCollector.EXPECT().
ObserveRayJobExecutionDuration(
rayJobName,
rayJobNamespace,
tt.expectedResult,
tt.expectedRetryCount,
mock.MatchedBy(func(d float64) bool {
// Allow some wiggle room in timing
return math.Abs(d-tt.expectedDuration) < 1.0
}),
).Times(1)
}

emitRayJobMetrics(mockCollector, rayJobName, rayJobNamespace, tt.originalRayJobStatus, tt.rayJobStatus)
})
}
}
5 changes: 4 additions & 1 deletion ray-operator/controllers/ray/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (

configapi "github.com/ray-project/kuberay/ray-operator/apis/config/v1alpha1"
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/metrics"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
)

Expand Down Expand Up @@ -125,7 +126,9 @@ var _ = BeforeSuite(func(ctx SpecContext) {
err = NewRayServiceReconciler(ctx, mgr, testClientProvider).SetupWithManager(mgr, 1)
Expect(err).NotTo(HaveOccurred(), "failed to setup RayService controller")

rayJobOptions := RayJobReconcilerOptions{}
rayJobOptions := RayJobReconcilerOptions{
RayJobMetricsCollector: metrics.NewRayJobNoopCollector(),
}
err = NewRayJobReconciler(ctx, mgr, rayJobOptions, testClientProvider).SetupWithManager(mgr, 1)
Expect(err).NotTo(HaveOccurred(), "failed to setup RayJob controller")

Expand Down
2 changes: 2 additions & 0 deletions ray-operator/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.22.0
github.com/stretchr/testify v1.10.0
go.uber.org/mock v0.5.1
go.uber.org/zap v1.27.0
gopkg.in/natefinch/lumberjack.v2 v2.2.1
k8s.io/api v0.32.3
Expand Down Expand Up @@ -75,6 +76,7 @@ require (
github.com/prometheus/procfs v0.15.1 // indirect
github.com/spf13/cobra v1.8.1 // indirect
github.com/spf13/pflag v1.0.6 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/x448/float16 v0.8.4 // indirect
go.opentelemetry.io/otel v1.28.0 // indirect
go.opentelemetry.io/otel/trace v1.28.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions ray-operator/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 7 additions & 2 deletions ray-operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/go-logr/zapr"
routev1 "github.com/openshift/api/route/v1"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"gopkg.in/natefinch/lumberjack.v2"
Expand Down Expand Up @@ -231,10 +232,14 @@ func main() {
exitOnError(err, "unable to start manager")

var rayClusterMetricCollector *metrics.RayClusterMetricCollector
var rayJobMetricsCollector *metrics.RayJobMetricsCollector
var rayJobMetricsCollector ray.RayJobMetricsCollector = metrics.NewRayJobNoopCollector()
if config.EnableMetrics {
rayClusterMetricCollector = metrics.NewRayClusterMetricCollector()
rayJobMetricsCollector = metrics.NewRayJobMetricsCollector()
rayJobMetricsCollector = metrics.NewRayJobCollector()
rayJobMetricsCollector, ok := rayJobMetricsCollector.(prometheus.Collector)
if !ok {
exitOnError(fmt.Errorf("RayJobMetricsCollector does not implement prometheus.Collector"), "failed to register RayJobMetricsCollector")
}
ctrlmetrics.Registry.MustRegister(
rayClusterMetricCollector,
rayJobMetricsCollector,
Expand Down
Loading