Skip to content

Commit 78c7756

Browse files
troychiu authored and laurafitzgerald committed
[SLI Metrics] kuberay_job_execution_duration_seconds (ray-project#3488)
1 parent b9fdd93 commit 78c7756

File tree

8 files changed

+227
-13
lines changed

8 files changed

+227
-13
lines changed

ray-operator/apis/ray/v1/rayjob_types.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,16 @@ const (
5656
JobDeploymentStatusWaiting JobDeploymentStatus = "Waiting"
5757
)
5858

59+
// IsJobDeploymentTerminal returns true if the given JobDeploymentStatus
60+
// is in a terminal state. Terminal states are either Complete or Failed.
61+
func IsJobDeploymentTerminal(status JobDeploymentStatus) bool {
62+
terminalStatusSet := map[JobDeploymentStatus]struct{}{
63+
JobDeploymentStatusComplete: {}, JobDeploymentStatusFailed: {},
64+
}
65+
_, ok := terminalStatusSet[status]
66+
return ok
67+
}
68+
5969
// JobFailedReason indicates the reason the RayJob changes its JobDeploymentStatus to 'Failed'
6070
type JobFailedReason string
6171

ray-operator/controllers/ray/metrics/mocks/ray_job_metrics_mock.go

Lines changed: 53 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,47 @@
11
package metrics
22

33
import (
4+
"strconv"
5+
46
"github.com/prometheus/client_golang/prometheus"
7+
8+
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
59
)
610

7-
// RayJobMetricsCollector implements the prometheus.Collector interface to collect ray job metrics.
8-
type RayJobMetricsCollector struct {
9-
// Metrics
11+
//go:generate mockgen -destination=mocks/ray_job_metrics_mock.go -package=mocks github.com/ray-project/kuberay/ray-operator/controllers/ray/metrics RayJobMetricsObserver
12+
type RayJobMetricsObserver interface {
13+
ObserveRayJobExecutionDuration(name, namespace string, jobDeploymentStatus rayv1.JobDeploymentStatus, retryCount int, duration float64)
14+
}
15+
16+
// RayJobMetricsManager implements the prometheus.Collector and RayJobMetricsObserver interface to collect ray job metrics.
17+
type RayJobMetricsManager struct {
18+
rayJobExecutionDurationSeconds *prometheus.GaugeVec
1019
}
1120

12-
// NewRayJobMetricsCollector creates a new RayJobMetricsCollector instance.
13-
func NewRayJobMetricsCollector() *RayJobMetricsCollector {
14-
collector := &RayJobMetricsCollector{}
21+
// NewRayJobMetricsManager creates a new RayJobMetricsManager instance.
22+
func NewRayJobMetricsManager() *RayJobMetricsManager {
23+
collector := &RayJobMetricsManager{
24+
rayJobExecutionDurationSeconds: prometheus.NewGaugeVec(
25+
prometheus.GaugeOpts{
26+
Name: "kuberay_job_execution_duration_seconds",
27+
Help: "Duration from when the RayJob CR’s JobDeploymentStatus transitions from Initializing to either the Retrying state or a terminal state, such as Complete or Failed. The Retrying state indicates that the CR previously failed and that spec.backoffLimit is enabled.",
28+
},
29+
[]string{"name", "namespace", "job_deployment_status", "retry_count"},
30+
),
31+
}
1532
return collector
1633
}
1734

1835
// Describe implements prometheus.Collector interface Describe method.
19-
func (c *RayJobMetricsCollector) Describe(_ chan<- *prometheus.Desc) {
36+
func (c *RayJobMetricsManager) Describe(ch chan<- *prometheus.Desc) {
	// Forward to the single gauge vector this manager holds.
	durationGauge := c.rayJobExecutionDurationSeconds
	durationGauge.Describe(ch)
}
2139

2240
// Collect implements prometheus.Collector interface Collect method.
23-
func (c *RayJobMetricsCollector) Collect(_ chan<- prometheus.Metric) {
41+
func (c *RayJobMetricsManager) Collect(ch chan<- prometheus.Metric) {
42+
c.rayJobExecutionDurationSeconds.Collect(ch)
43+
}
44+
45+
// ObserveRayJobExecutionDuration sets the execution-duration gauge for the series
// identified by name, namespace, jobDeploymentStatus and retryCount to duration.
func (c *RayJobMetricsManager) ObserveRayJobExecutionDuration(name, namespace string, jobDeploymentStatus rayv1.JobDeploymentStatus, retryCount int, duration float64) {
	// Values are listed in the same order as the labels the gauge vector was created with.
	labelValues := []string{
		name,
		namespace,
		string(jobDeploymentStatus),
		strconv.Itoa(retryCount),
	}
	c.rayJobExecutionDurationSeconds.WithLabelValues(labelValues...).Set(duration)
}

ray-operator/controllers/ray/rayjob_controller.go

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ type RayJobReconciler struct {
4646
}
4747

4848
type RayJobReconcilerOptions struct {
49-
RayJobMetricsCollector *metrics.RayJobMetricsCollector
49+
RayJobMetricsManager *metrics.RayJobMetricsManager
5050
}
5151

5252
// NewRayJobReconciler returns a new reconcile.Reconciler
@@ -430,9 +430,34 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
430430
logger.Info("Failed to update RayJob status", "error", err)
431431
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
432432
}
433+
emitRayJobMetrics(r.options.RayJobMetricsManager, rayJobInstance.Name, rayJobInstance.Namespace, originalRayJobInstance.Status, rayJobInstance.Status)
433434
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
434435
}
435436

437+
func emitRayJobMetrics(rayJobMetricsManager *metrics.RayJobMetricsManager, rayJobName, rayJobNamespace string, originalRayJobStatus, rayJobStatus rayv1.RayJobStatus) {
438+
if rayJobMetricsManager == nil {
439+
return
440+
}
441+
emitRayJobExecutionDuration(rayJobMetricsManager, rayJobName, rayJobNamespace, originalRayJobStatus, rayJobStatus)
442+
}
443+
444+
func emitRayJobExecutionDuration(rayJobMetricsObserver metrics.RayJobMetricsObserver, rayJobName, rayJobNamespace string, originalRayJobStatus, rayJobStatus rayv1.RayJobStatus) {
445+
// Emit kuberay_job_execution_duration_seconds when a job transitions from a non-terminal state to either a terminal state or a retrying state (following a failure).
446+
if !rayv1.IsJobDeploymentTerminal(originalRayJobStatus.JobDeploymentStatus) && (rayv1.IsJobDeploymentTerminal(rayJobStatus.JobDeploymentStatus) || rayJobStatus.JobDeploymentStatus == rayv1.JobDeploymentStatusRetrying) {
447+
retryCount := 0
448+
if originalRayJobStatus.Failed != nil {
449+
retryCount += int(*originalRayJobStatus.Failed)
450+
}
451+
rayJobMetricsObserver.ObserveRayJobExecutionDuration(
452+
rayJobName,
453+
rayJobNamespace,
454+
rayJobStatus.JobDeploymentStatus,
455+
retryCount,
456+
time.Since(rayJobStatus.StartTime.Time).Seconds(),
457+
)
458+
}
459+
}
460+
436461
// checkBackoffLimitAndUpdateStatusIfNeeded determines if a RayJob is eligible for retry based on the configured backoff limit,
437462
// the job's success status, and its failure status. If eligible, sets the JobDeploymentStatus to Retrying.
438463
func checkBackoffLimitAndUpdateStatusIfNeeded(ctx context.Context, rayJob *rayv1.RayJob) {

ray-operator/controllers/ray/rayjob_controller_unit_test.go

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,22 +3,28 @@ package ray
33
import (
44
"context"
55
"errors"
6+
"math"
67
"strings"
78
"testing"
9+
"time"
810

911
"github.com/stretchr/testify/assert"
12+
"github.com/stretchr/testify/mock"
1013
"github.com/stretchr/testify/require"
14+
"go.uber.org/mock/gomock"
1115
batchv1 "k8s.io/api/batch/v1"
1216
corev1 "k8s.io/api/core/v1"
1317
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1418
"k8s.io/apimachinery/pkg/runtime"
1519
"k8s.io/apimachinery/pkg/types"
1620
"k8s.io/client-go/tools/record"
21+
"k8s.io/utils/pointer"
1722
"sigs.k8s.io/controller-runtime/pkg/client"
1823
clientFake "sigs.k8s.io/controller-runtime/pkg/client/fake"
1924
"sigs.k8s.io/controller-runtime/pkg/client/interceptor"
2025

2126
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
27+
"github.com/ray-project/kuberay/ray-operator/controllers/ray/metrics/mocks"
2228
utils "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
2329
"github.com/ray-project/kuberay/ray-operator/pkg/client/clientset/versioned/scheme"
2430
)
@@ -525,3 +531,96 @@ func TestFailedDeleteRayClusterEvent(t *testing.T) {
525531

526532
assert.Truef(t, foundFailureEvent, "Expected event to be generated for cluster deletion failure, got events: %s", strings.Join(events, "\n"))
527533
}
534+
535+
func TestEmitRayJobExecutionDuration(t *testing.T) {
536+
rayJobName := "test-job"
537+
rayJobNamespace := "default"
538+
mockTime := time.Now().Add(-60 * time.Second)
539+
540+
//nolint:govet // disable govet to keep the order of the struct fields
541+
tests := []struct {
542+
name string
543+
originalRayJobStatus rayv1.RayJobStatus
544+
rayJobStatus rayv1.RayJobStatus
545+
expectMetricsCall bool
546+
expectedJobDeploymentStatus rayv1.JobDeploymentStatus
547+
expectedRetryCount int
548+
expectedDuration float64
549+
}{
550+
{
551+
name: "non-terminal to complete state should emit metrics",
552+
originalRayJobStatus: rayv1.RayJobStatus{
553+
JobDeploymentStatus: rayv1.JobDeploymentStatusRunning,
554+
},
555+
rayJobStatus: rayv1.RayJobStatus{
556+
JobDeploymentStatus: rayv1.JobDeploymentStatusComplete,
557+
StartTime: &metav1.Time{Time: mockTime},
558+
},
559+
expectMetricsCall: true,
560+
expectedJobDeploymentStatus: rayv1.JobDeploymentStatusComplete,
561+
expectedRetryCount: 0,
562+
expectedDuration: 60.0,
563+
},
564+
{
565+
name: "non-terminal to failed state should emit metrics",
566+
originalRayJobStatus: rayv1.RayJobStatus{
567+
JobDeploymentStatus: rayv1.JobDeploymentStatusRunning,
568+
},
569+
rayJobStatus: rayv1.RayJobStatus{
570+
JobDeploymentStatus: rayv1.JobDeploymentStatusFailed,
571+
StartTime: &metav1.Time{Time: mockTime},
572+
},
573+
expectMetricsCall: true,
574+
expectedJobDeploymentStatus: rayv1.JobDeploymentStatusFailed,
575+
expectedRetryCount: 0,
576+
expectedDuration: 60.0,
577+
},
578+
{
579+
name: "non-terminal to retrying state should emit metrics",
580+
originalRayJobStatus: rayv1.RayJobStatus{
581+
JobDeploymentStatus: rayv1.JobDeploymentStatusRunning,
582+
Failed: pointer.Int32(2),
583+
},
584+
rayJobStatus: rayv1.RayJobStatus{
585+
JobDeploymentStatus: rayv1.JobDeploymentStatusRetrying,
586+
StartTime: &metav1.Time{Time: mockTime},
587+
},
588+
expectMetricsCall: true,
589+
expectedJobDeploymentStatus: rayv1.JobDeploymentStatusRetrying,
590+
expectedRetryCount: 2,
591+
expectedDuration: 60.0,
592+
},
593+
{
594+
name: "non-terminal to non-terminal state should not emit metrics",
595+
originalRayJobStatus: rayv1.RayJobStatus{
596+
JobDeploymentStatus: rayv1.JobDeploymentStatusInitializing,
597+
},
598+
rayJobStatus: rayv1.RayJobStatus{
599+
JobDeploymentStatus: rayv1.JobDeploymentStatusRunning,
600+
},
601+
expectMetricsCall: false,
602+
},
603+
}
604+
605+
for _, tt := range tests {
606+
t.Run(tt.name, func(t *testing.T) {
607+
ctrl := gomock.NewController(t)
608+
mockObserver := mocks.NewMockRayJobMetricsObserver(ctrl)
609+
if tt.expectMetricsCall {
610+
mockObserver.EXPECT().
611+
ObserveRayJobExecutionDuration(
612+
rayJobName,
613+
rayJobNamespace,
614+
tt.expectedJobDeploymentStatus,
615+
tt.expectedRetryCount,
616+
mock.MatchedBy(func(d float64) bool {
617+
// Allow some wiggle room in timing
618+
return math.Abs(d-tt.expectedDuration) < 1.0
619+
}),
620+
).Times(1)
621+
}
622+
623+
emitRayJobExecutionDuration(mockObserver, rayJobName, rayJobNamespace, tt.originalRayJobStatus, tt.rayJobStatus)
624+
})
625+
}
626+
}

ray-operator/go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ require (
1818
github.com/pkg/errors v0.9.1
1919
github.com/prometheus/client_golang v1.22.0
2020
github.com/stretchr/testify v1.10.0
21+
go.uber.org/mock v0.5.1
2122
go.uber.org/zap v1.27.0
2223
gopkg.in/natefinch/lumberjack.v2 v2.2.1
2324
k8s.io/api v0.33.0
@@ -73,6 +74,7 @@ require (
7374
github.com/prometheus/common v0.62.0 // indirect
7475
github.com/prometheus/procfs v0.15.1 // indirect
7576
github.com/spf13/pflag v1.0.6 // indirect
77+
github.com/stretchr/objx v0.5.2 // indirect
7678
github.com/x448/float16 v0.8.4 // indirect
7779
go.opentelemetry.io/otel v1.33.0 // indirect
7880
go.opentelemetry.io/otel/trace v1.33.0 // indirect

ray-operator/go.sum

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

ray-operator/main.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -231,13 +231,13 @@ func main() {
231231
exitOnError(err, "unable to start manager")
232232

233233
var rayClusterMetricCollector *metrics.RayClusterMetricCollector
234-
var rayJobMetricsCollector *metrics.RayJobMetricsCollector
234+
var rayJobMetricsManager *metrics.RayJobMetricsManager
235235
if config.EnableMetrics {
236236
rayClusterMetricCollector = metrics.NewRayClusterMetricCollector()
237-
rayJobMetricsCollector = metrics.NewRayJobMetricsCollector()
237+
rayJobMetricsManager = metrics.NewRayJobMetricsManager()
238238
ctrlmetrics.Registry.MustRegister(
239239
rayClusterMetricCollector,
240-
rayJobMetricsCollector,
240+
rayJobMetricsManager,
241241
)
242242
}
243243

@@ -254,7 +254,7 @@ func main() {
254254
"unable to create controller", "controller", "RayService")
255255

256256
rayJobOptions := ray.RayJobReconcilerOptions{
257-
RayJobMetricsCollector: rayJobMetricsCollector,
257+
RayJobMetricsManager: rayJobMetricsManager,
258258
}
259259
exitOnError(ray.NewRayJobReconciler(ctx, mgr, rayJobOptions, config).SetupWithManager(mgr, config.ReconcileConcurrency),
260260
"unable to create controller", "controller", "RayJob")

0 commit comments

Comments
 (0)