51 changes: 51 additions & 0 deletions ray-operator/controllers/ray/common/job.go
@@ -81,6 +81,30 @@ func GetMetadataJson(metadata map[string]string, rayVersion string) (string, error) {
return pkgutils.ConvertByteSliceToString(metadataBytes), nil
}

// GetMinReplicasFromSpec calculates the minimum expected worker replicas from the RayClusterSpec.
// This is used in SidecarMode to determine how many workers should be registered before submitting the job.
func GetMinReplicasFromSpec(rayClusterSpec *rayv1.RayClusterSpec) int32 {
if rayClusterSpec == nil {
return 0
}
count := int32(0)
for _, nodeGroup := range rayClusterSpec.WorkerGroupSpecs {
if nodeGroup.Suspend != nil && *nodeGroup.Suspend {
continue
}
minReplicas := int32(0)
if nodeGroup.MinReplicas != nil && *nodeGroup.MinReplicas > 0 {
minReplicas = *nodeGroup.MinReplicas
} else if nodeGroup.Replicas != nil && *nodeGroup.Replicas > 0 {
// Fall back to Replicas when MinReplicas is not set or is 0.
// This handles static clusters where users only set Replicas.
minReplicas = *nodeGroup.Replicas
}
count += minReplicas * nodeGroup.NumOfHosts
}
return count
}

Member: Can we use the function CalculateDesiredReplicas?

// CalculateDesiredReplicas calculate desired worker replicas at the cluster level
func CalculateDesiredReplicas(ctx context.Context, cluster *rayv1.RayCluster) int32 {
count := int32(0)
for _, nodeGroup := range cluster.Spec.WorkerGroupSpecs {
count += GetWorkerGroupDesiredReplicas(ctx, nodeGroup)
}
return count
}

// BuildJobSubmitCommand builds the `ray job submit` command based on submission mode.
func BuildJobSubmitCommand(rayJobInstance *rayv1.RayJob, submissionMode rayv1.JobSubmissionMode) ([]string, error) {
var address string
@@ -139,6 +163,33 @@ func BuildJobSubmitCommand(rayJobInstance *rayv1.RayJob, submissionMode rayv1.JobSubmissionMode) ([]string, error) {
"do", "echo", strconv.Quote("Waiting for Ray Dashboard GCS to become healthy at " + address + " ..."), ";", "sleep", "2", ";", "done", ";",
}
cmd = append(cmd, waitLoop...)

// Wait for the expected number of worker nodes to register with the Ray cluster.
// RAY_EXPECTED_MIN_WORKERS is set by the controller from GetMinReplicasFromSpec on the RayClusterSpec.
// The loop queries the Ray Dashboard API for the number of alive nodes and continues
// until that count reaches at least (expected_workers + 1), the extra node being the head.
// This ensures worker pods are connected before the job is submitted; otherwise the
// job may be scheduled entirely on the head node.
//
// Note: this loop never times out and will wait indefinitely if workers never register.
// This can be mitigated by setting the RayJob's `activeDeadlineSeconds` field to
// enforce a maximum job execution time.
//
// The wget command includes the x-ray-authorization header if RAY_AUTH_TOKEN is set.
// This is required when Ray auth token mode is enabled; otherwise the request fails with 401.
wgetAuthHeader := "${" + utils.RAY_AUTH_TOKEN_ENV_VAR + ":+--header \"x-ray-authorization: Bearer $" + utils.RAY_AUTH_TOKEN_ENV_VAR + "\"}"
waitForNodesLoop := []string{
"if", "[", "-n", "\"$" + utils.RAY_EXPECTED_MIN_WORKERS + "\"", "]", "&&", "[", "\"$" + utils.RAY_EXPECTED_MIN_WORKERS + "\"", "-gt", "\"0\"", "]", ";", "then",
"EXPECTED_NODES=$(($" + utils.RAY_EXPECTED_MIN_WORKERS + " + 1))", ";",
"echo", strconv.Quote("Waiting for $EXPECTED_NODES nodes (1 head + $" + utils.RAY_EXPECTED_MIN_WORKERS + " workers) to register..."), ";",
"until", "[",
"\"$(wget " + wgetAuthHeader + " -q -O- " + address + "/nodes?view=summary 2>/dev/null | python3 -c \"import sys,json; d=json.load(sys.stdin); print(len([n for n in d.get('data',{}).get('summary',[]) if n.get('raylet',{}).get('state','')=='ALIVE']))\" 2>/dev/null || echo 0)\"",
"-ge", "\"$EXPECTED_NODES\"", "]", ";",
"do", "echo", strconv.Quote("Waiting for Ray nodes to register. Expected: $EXPECTED_NODES ..."), ";", "sleep", "2", ";", "done", ";",
"echo", strconv.Quote("All expected nodes are registered."), ";",
"fi", ";",
}
cmd = append(cmd, waitForNodesLoop...)
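
For readability, the appended tokens expand to roughly the following shell, using the example address from the tests (http://127.0.0.1:8265) and assuming RAY_AUTH_TOKEN is unset, so the ${RAY_AUTH_TOKEN:+--header ...} expansion is empty:

if [ -n "$RAY_EXPECTED_MIN_WORKERS" ] && [ "$RAY_EXPECTED_MIN_WORKERS" -gt "0" ]; then
  EXPECTED_NODES=$(($RAY_EXPECTED_MIN_WORKERS + 1))
  echo "Waiting for $EXPECTED_NODES nodes (1 head + $RAY_EXPECTED_MIN_WORKERS workers) to register..."
  # Count raylets in state ALIVE from the Dashboard's /nodes?view=summary endpoint;
  # fall back to 0 if the request or the JSON parsing fails.
  until [ "$(wget -q -O- http://127.0.0.1:8265/nodes?view=summary 2>/dev/null | python3 -c "import sys,json; d=json.load(sys.stdin); print(len([n for n in d.get('data',{}).get('summary',[]) if n.get('raylet',{}).get('state','')=='ALIVE']))" 2>/dev/null || echo 0)" -ge "$EXPECTED_NODES" ]; do
    echo "Waiting for Ray nodes to register. Expected: $EXPECTED_NODES ..."
    sleep 2
  done
  echo "All expected nodes are registered."
fi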
Member (on the node-wait loop): Can we use something like ray list nodes using Ray's CLI? We are removing wget.

Contributor Author: @Future-Outlier - I pushed a commit that uses Python's urllib instead of wget. Is this OK instead of ray list nodes?
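
That urllib-based probe might look something like this (a sketch of the idea, not the exact pushed commit; the address and the RAY_AUTH_TOKEN env var name mirror the wget version and are assumptions):

# Sketch only -- not the exact commit. Counts ALIVE raylets via urllib instead of wget.
import json
import os
import urllib.request

req = urllib.request.Request("http://127.0.0.1:8265/nodes?view=summary")
token = os.environ.get("RAY_AUTH_TOKEN")  # assumed to match the wget header logic above
if token:
    req.add_header("x-ray-authorization", "Bearer " + token)
try:
    with urllib.request.urlopen(req, timeout=5) as resp:
        d = json.load(resp)
    alive = [n for n in d.get("data", {}).get("summary", []) if n.get("raylet", {}).get("state", "") == "ALIVE"]
    print(len(alive))
except Exception:
    # Mirror the wget version's `|| echo 0` fallback so the wait loop keeps polling.
    print(0)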

}

// In Sidecar mode, we only support RayJob level retry, which means that the submitter retry won't happen,
227 changes: 225 additions & 2 deletions ray-operator/controllers/ray/common/job_test.go
@@ -9,6 +9,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
"k8s.io/utils/ptr"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
@@ -112,7 +113,9 @@ func TestBuildJobSubmitCommandWithSidecarMode(t *testing.T) {
},
}

address := "http://127.0.0.1:8265"
expected := []string{
// Wait for Dashboard GCS health
"until",
fmt.Sprintf(
utils.BaseWgetHealthCommand,
@@ -121,8 +124,19 @@ func TestBuildJobSubmitCommandWithSidecarMode(t *testing.T) {
utils.RayDashboardGCSHealthPath,
),
">/dev/null", "2>&1", ";",
"do", "echo", strconv.Quote("Waiting for Ray Dashboard GCS to become healthy at http://127.0.0.1:8265 ..."), ";", "sleep", "2", ";", "done", ";",
"ray", "job", "submit", "--address", "http://127.0.0.1:8265",
"do", "echo", strconv.Quote("Waiting for Ray Dashboard GCS to become healthy at " + address + " ..."), ";", "sleep", "2", ";", "done", ";",
// Wait for expected nodes to register
"if", "[", "-n", "\"$" + utils.RAY_EXPECTED_MIN_WORKERS + "\"", "]", "&&", "[", "\"$" + utils.RAY_EXPECTED_MIN_WORKERS + "\"", "-gt", "\"0\"", "]", ";", "then",
"EXPECTED_NODES=$(($" + utils.RAY_EXPECTED_MIN_WORKERS + " + 1))", ";",
"echo", strconv.Quote("Waiting for $EXPECTED_NODES nodes (1 head + $" + utils.RAY_EXPECTED_MIN_WORKERS + " workers) to register..."), ";",
"until", "[",
"\"$(wget ${" + utils.RAY_AUTH_TOKEN_ENV_VAR + ":+--header \"x-ray-authorization: Bearer $" + utils.RAY_AUTH_TOKEN_ENV_VAR + "\"} -q -O- " + address + "/nodes?view=summary 2>/dev/null | python3 -c \"import sys,json; d=json.load(sys.stdin); print(len([n for n in d.get('data',{}).get('summary',[]) if n.get('raylet',{}).get('state','')=='ALIVE']))\" 2>/dev/null || echo 0)\"",
"-ge", "\"$EXPECTED_NODES\"", "]", ";",
"do", "echo", strconv.Quote("Waiting for Ray nodes to register. Expected: $EXPECTED_NODES ..."), ";", "sleep", "2", ";", "done", ";",
"echo", strconv.Quote("All expected nodes are registered."), ";",
"fi", ";",
// Job submit command
"ray", "job", "submit", "--address", address,
"--runtime-env-json", strconv.Quote(`{"test":"test"}`),
"--metadata-json", strconv.Quote(`{"testKey":"testValue"}`),
"--submission-id", "testJobId",
@@ -240,3 +254,212 @@ func TestGetSubmitterTemplate(t *testing.T) {
template := GetSubmitterTemplate(&rayJob.Spec, &rayCluster.Spec)
assert.Equal(t, template.Spec.Containers[0].Image, rayCluster.Spec.HeadGroupSpec.Template.Spec.Containers[utils.RayContainerIndex].Image)
}

func TestGetMinReplicasFromSpec(t *testing.T) {
tests := []struct {
spec *rayv1.RayClusterSpec
name string
expected int32
}{
{
name: "nil spec returns 0",
spec: nil,
expected: 0,
},
{
name: "no worker groups returns 0",
spec: &rayv1.RayClusterSpec{
WorkerGroupSpecs: []rayv1.WorkerGroupSpec{},
},
expected: 0,
},
{
name: "single worker group with minReplicas",
spec: &rayv1.RayClusterSpec{
WorkerGroupSpecs: []rayv1.WorkerGroupSpec{
{
MinReplicas: ptr.To[int32](2),
NumOfHosts: 1,
},
},
},
expected: 2,
},
{
name: "multiple worker groups",
spec: &rayv1.RayClusterSpec{
WorkerGroupSpecs: []rayv1.WorkerGroupSpec{
{
MinReplicas: ptr.To[int32](2),
NumOfHosts: 1,
},
{
MinReplicas: ptr.To[int32](3),
NumOfHosts: 1,
},
},
},
expected: 5,
},
{
name: "worker group with NumOfHosts > 1",
spec: &rayv1.RayClusterSpec{
WorkerGroupSpecs: []rayv1.WorkerGroupSpec{
{
MinReplicas: ptr.To[int32](2),
NumOfHosts: 2,
},
},
},
expected: 4,
},
{
name: "suspended worker group is skipped",
spec: &rayv1.RayClusterSpec{
WorkerGroupSpecs: []rayv1.WorkerGroupSpec{
{
MinReplicas: ptr.To[int32](2),
NumOfHosts: 1,
Suspend: ptr.To(true),
},
{
MinReplicas: ptr.To[int32](3),
NumOfHosts: 1,
},
},
},
expected: 3,
},
{
name: "nil minReplicas defaults to 0",
spec: &rayv1.RayClusterSpec{
WorkerGroupSpecs: []rayv1.WorkerGroupSpec{
{
MinReplicas: nil,
NumOfHosts: 1,
},
},
},
expected: 0,
},
{
name: "NumOfHosts 0 results in 0 workers for that group",
spec: &rayv1.RayClusterSpec{
WorkerGroupSpecs: []rayv1.WorkerGroupSpec{
{
MinReplicas: ptr.To[int32](2),
NumOfHosts: 0,
},
},
},
expected: 0,
},
{
name: "falls back to Replicas when MinReplicas is nil",
spec: &rayv1.RayClusterSpec{
WorkerGroupSpecs: []rayv1.WorkerGroupSpec{
{
Replicas: ptr.To[int32](3),
MinReplicas: nil,
NumOfHosts: 1,
},
},
},
expected: 3,
},
{
name: "falls back to Replicas when MinReplicas is 0",
spec: &rayv1.RayClusterSpec{
WorkerGroupSpecs: []rayv1.WorkerGroupSpec{
{
Replicas: ptr.To[int32](5),
MinReplicas: ptr.To[int32](0),
NumOfHosts: 1,
},
},
},
expected: 5,
},
{
name: "uses MinReplicas when both are set and MinReplicas > 0",
spec: &rayv1.RayClusterSpec{
WorkerGroupSpecs: []rayv1.WorkerGroupSpec{
{
Replicas: ptr.To[int32](5),
MinReplicas: ptr.To[int32](2),
NumOfHosts: 1,
},
},
},
expected: 2,
},
{
name: "both MinReplicas and Replicas are nil",
spec: &rayv1.RayClusterSpec{
WorkerGroupSpecs: []rayv1.WorkerGroupSpec{
{
Replicas: nil,
MinReplicas: nil,
NumOfHosts: 1,
},
},
},
expected: 0,
},
{
name: "both MinReplicas and Replicas are 0",
spec: &rayv1.RayClusterSpec{
WorkerGroupSpecs: []rayv1.WorkerGroupSpec{
{
Replicas: ptr.To[int32](0),
MinReplicas: ptr.To[int32](0),
NumOfHosts: 1,
},
},
},
expected: 0,
},
{
name: "mixed worker groups - some with MinReplicas some with only Replicas",
spec: &rayv1.RayClusterSpec{
WorkerGroupSpecs: []rayv1.WorkerGroupSpec{
{
MinReplicas: ptr.To[int32](2),
NumOfHosts: 1,
},
{
Replicas: ptr.To[int32](3),
MinReplicas: nil,
NumOfHosts: 1,
},
{
Replicas: ptr.To[int32](4),
MinReplicas: ptr.To[int32](0),
NumOfHosts: 1,
},
},
},
expected: 9, // 2 + 3 + 4
},
{
name: "NumOfHosts > 1 with Replicas fallback",
spec: &rayv1.RayClusterSpec{
WorkerGroupSpecs: []rayv1.WorkerGroupSpec{
{
Replicas: ptr.To[int32](3),
MinReplicas: nil,
NumOfHosts: 2,
},
},
},
expected: 6, // 3 * 2
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := GetMinReplicasFromSpec(tt.spec)
assert.Equal(t, tt.expected, result)
})
}
}
7 changes: 7 additions & 0 deletions ray-operator/controllers/ray/rayjob_controller.go
@@ -613,6 +613,13 @@ func configureSubmitterContainer(container *corev1.Container, rayJobInstance *rayv1.RayJob, …
// ray job submit --address=http://$RAY_DASHBOARD_ADDRESS --submission-id=$RAY_JOB_SUBMISSION_ID ...
container.Env = append(container.Env, corev1.EnvVar{Name: utils.RAY_DASHBOARD_ADDRESS, Value: rayJobInstance.Status.DashboardURL})
container.Env = append(container.Env, corev1.EnvVar{Name: utils.RAY_JOB_SUBMISSION_ID, Value: rayJobInstance.Status.JobId})

// In SidecarMode, pass the expected minimum worker count so the submitter can wait for workers to register
if submissionMode == rayv1.SidecarMode && rayJobInstance.Spec.RayClusterSpec != nil {
minWorkers := common.GetMinReplicasFromSpec(rayJobInstance.Spec.RayClusterSpec)
Member: minReplicas can be 0 or a low number. I wonder if we should only check MinReplicas when autoscaling is enabled; if autoscaling is not enabled, we should just check Replicas.

Contributor Author: I can change this and push it as a separate commit. I'll defer to others on the best number of workers to wait for.

Member: We can use this function:

// CalculateDesiredReplicas calculate desired worker replicas at the cluster level
func CalculateDesiredReplicas(ctx context.Context, cluster *rayv1.RayCluster) int32 {
count := int32(0)
for _, nodeGroup := range cluster.Spec.WorkerGroupSpecs {
count += GetWorkerGroupDesiredReplicas(ctx, nodeGroup)
}
return count
}

Member: RayJob in Kubernetes Job mode waits for the RayCluster to reach the Ready state, and the Ready state requires all pods to be running, including both the head pod and the worker pods. The number of worker pods is determined by CalculateDesiredReplicas.
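
A sketch of that direction (hypothetical, not part of this diff; expectedWorkers is an illustrative name): gate the floor on Spec.EnableInTreeAutoscaling, and use Replicas directly for static clusters:

// Hypothetical sketch, not part of this diff: when autoscaling is enabled,
// MinReplicas is the only floor the submitter can rely on; for static
// clusters, Replicas is the actual expected worker count.
func expectedWorkers(spec *rayv1.RayClusterSpec) int32 {
	if spec == nil {
		return 0
	}
	autoscaling := spec.EnableInTreeAutoscaling != nil && *spec.EnableInTreeAutoscaling
	count := int32(0)
	for _, nodeGroup := range spec.WorkerGroupSpecs {
		if nodeGroup.Suspend != nil && *nodeGroup.Suspend {
			continue
		}
		replicas := int32(0)
		if autoscaling {
			if nodeGroup.MinReplicas != nil {
				replicas = *nodeGroup.MinReplicas
			}
		} else if nodeGroup.Replicas != nil {
			replicas = *nodeGroup.Replicas
		}
		count += replicas * nodeGroup.NumOfHosts
	}
	return count
}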

container.Env = append(container.Env, corev1.EnvVar{Name: utils.RAY_EXPECTED_MIN_WORKERS, Value: strconv.Itoa(int(minWorkers))})
}

if rayClusterInstance != nil && utils.IsAuthEnabled(&rayClusterInstance.Spec) {
common.SetContainerTokenAuthEnvVars(rayClusterInstance.Name, container)
}
5 changes: 3 additions & 2 deletions ray-operator/controllers/ray/utils/constant.go
@@ -151,8 +151,9 @@ const (

// Environment variables for RayJob submitter Kubernetes Job.
// Example: ray job submit --address=http://$RAY_DASHBOARD_ADDRESS --submission-id=$RAY_JOB_SUBMISSION_ID ...
RAY_DASHBOARD_ADDRESS = "RAY_DASHBOARD_ADDRESS"
RAY_JOB_SUBMISSION_ID = "RAY_JOB_SUBMISSION_ID"
RAY_DASHBOARD_ADDRESS = "RAY_DASHBOARD_ADDRESS"
RAY_EXPECTED_MIN_WORKERS = "RAY_EXPECTED_MIN_WORKERS"
RAY_JOB_SUBMISSION_ID = "RAY_JOB_SUBMISSION_ID"

// Environment variables for Ray Autoscaler V2.
// The value of RAY_CLOUD_INSTANCE_ID is the Pod name for Autoscaler V2 alpha. This may change in the future.