Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions ray-operator/controllers/ray/common/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,39 @@ func BuildJobSubmitCommand(rayJobInstance *rayv1.RayJob, submissionMode rayv1.Jo
"do", "echo", strconv.Quote("Waiting for Ray Dashboard GCS to become healthy at " + address + " ..."), ";", "sleep", "2", ";", "done", ";",
}
cmd = append(cmd, waitLoop...)

// Wait for the expected number of worker nodes to register for the Ray cluster.
// RAY_EXPECTED_WORKERS is set by the controller based on CalculateDesiredReplicas.
// The loop queries the Ray Dashboard API to get the number of alive nodes and
// continues until the number of alive nodes is at least (expected_workers + 1), where the +1 accounts for the head node.
// This ensures that worker pods are connected before the job is submitted; otherwise,
// the jobs may run on the head node.
//
// Note: This loop has no timeout and will wait indefinitely if workers never register.
// This can be mitigated by setting the RayJob's `activeDeadlineSeconds` field
// to enforce a maximum job execution time.
//
// The Python script uses urllib (stdlib) to query the Ray Dashboard API.
// It includes the x-ray-authorization header if RAY_AUTH_TOKEN is set.
// This is required when Ray auth token mode is enabled, otherwise the request will fail with 401.
pythonNodeCountScript := "import urllib.request,json,os; " +
"req=urllib.request.Request('" + address + "/nodes?view=summary'); " +
"t=os.environ.get('" + utils.RAY_AUTH_TOKEN_ENV_VAR + "',''); " +
"t and req.add_header('x-ray-authorization','Bearer '+t); " +
"d=json.loads(urllib.request.urlopen(req,timeout=5).read()); " +
"print(len([n for n in d.get('data',{}).get('summary',[]) if n.get('raylet',{}).get('state')=='ALIVE']))"
waitForNodesLoop := []string{
"if", "[", "-n", "\"$" + utils.RAY_EXPECTED_WORKERS + "\"", "]", "&&", "[", "\"$" + utils.RAY_EXPECTED_WORKERS + "\"", "-gt", "\"0\"", "]", ";", "then",
"EXPECTED_NODES=$(($" + utils.RAY_EXPECTED_WORKERS + " + 1))", ";",
"echo", strconv.Quote("Waiting for $EXPECTED_NODES nodes (1 head + $" + utils.RAY_EXPECTED_WORKERS + " workers) to register..."), ";",
"until", "[",
"\"$(python3 -c \"" + pythonNodeCountScript + "\" 2>/dev/null || echo 0)\"",
"-ge", "\"$EXPECTED_NODES\"", "]", ";",
"do", "echo", strconv.Quote("Waiting for Ray nodes to register. Expected: $EXPECTED_NODES ..."), ";", "sleep", "2", ";", "done", ";",
"echo", strconv.Quote("All expected nodes are registered."), ";",
"fi", ";",
}
cmd = append(cmd, waitForNodesLoop...)
}

// In Sidecar mode, we only support RayJob level retry, which means that the submitter retry won't happen,
Expand Down
17 changes: 15 additions & 2 deletions ray-operator/controllers/ray/common/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,9 @@ func TestBuildJobSubmitCommandWithSidecarMode(t *testing.T) {
},
}

address := "http://127.0.0.1:8265"
expected := []string{
// Wait for Dashboard GCS health
"until",
fmt.Sprintf(
utils.BaseWgetHealthCommand,
Expand All @@ -121,8 +123,19 @@ func TestBuildJobSubmitCommandWithSidecarMode(t *testing.T) {
utils.RayDashboardGCSHealthPath,
),
">/dev/null", "2>&1", ";",
"do", "echo", strconv.Quote("Waiting for Ray Dashboard GCS to become healthy at http://127.0.0.1:8265 ..."), ";", "sleep", "2", ";", "done", ";",
"ray", "job", "submit", "--address", "http://127.0.0.1:8265",
"do", "echo", strconv.Quote("Waiting for Ray Dashboard GCS to become healthy at " + address + " ..."), ";", "sleep", "2", ";", "done", ";",
// Wait for expected nodes to register
"if", "[", "-n", "\"$" + utils.RAY_EXPECTED_WORKERS + "\"", "]", "&&", "[", "\"$" + utils.RAY_EXPECTED_WORKERS + "\"", "-gt", "\"0\"", "]", ";", "then",
"EXPECTED_NODES=$(($" + utils.RAY_EXPECTED_WORKERS + " + 1))", ";",
"echo", strconv.Quote("Waiting for $EXPECTED_NODES nodes (1 head + $" + utils.RAY_EXPECTED_WORKERS + " workers) to register..."), ";",
"until", "[",
"\"$(python3 -c \"import urllib.request,json,os; req=urllib.request.Request('" + address + "/nodes?view=summary'); t=os.environ.get('" + utils.RAY_AUTH_TOKEN_ENV_VAR + "',''); t and req.add_header('x-ray-authorization','Bearer '+t); d=json.loads(urllib.request.urlopen(req,timeout=5).read()); print(len([n for n in d.get('data',{}).get('summary',[]) if n.get('raylet',{}).get('state')=='ALIVE']))\" 2>/dev/null || echo 0)\"",
"-ge", "\"$EXPECTED_NODES\"", "]", ";",
"do", "echo", strconv.Quote("Waiting for Ray nodes to register. Expected: $EXPECTED_NODES ..."), ";", "sleep", "2", ";", "done", ";",
"echo", strconv.Quote("All expected nodes are registered."), ";",
"fi", ";",
// Job submit command
"ray", "job", "submit", "--address", address,
"--runtime-env-json", strconv.Quote(`{"test":"test"}`),
"--metadata-json", strconv.Quote(`{"testKey":"testValue"}`),
"--submission-id", "testJobId",
Expand Down
25 changes: 16 additions & 9 deletions ray-operator/controllers/ray/rayjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,7 @@ func (r *RayJobReconciler) createK8sJobIfNeed(ctx context.Context, rayJobInstanc
namespacedName := common.RayJobK8sJobNamespacedName(rayJobInstance)
if err := r.Client.Get(ctx, namespacedName, job); err != nil {
if errors.IsNotFound(err) {
submitterTemplate, err := getSubmitterTemplate(rayJobInstance, rayClusterInstance)
submitterTemplate, err := getSubmitterTemplate(ctx, rayJobInstance, rayClusterInstance)
if err != nil {
return err
}
Expand All @@ -567,30 +567,30 @@ func (r *RayJobReconciler) createK8sJobIfNeed(ctx context.Context, rayJobInstanc
}

// getSubmitterTemplate builds the submitter pod template for the Ray job.
func getSubmitterTemplate(rayJobInstance *rayv1.RayJob, rayClusterInstance *rayv1.RayCluster) (corev1.PodTemplateSpec, error) {
func getSubmitterTemplate(ctx context.Context, rayJobInstance *rayv1.RayJob, rayClusterInstance *rayv1.RayCluster) (corev1.PodTemplateSpec, error) {
// Set the default value for the optional field SubmitterPodTemplate if not provided.
submitterTemplate := common.GetSubmitterTemplate(&rayJobInstance.Spec, &rayClusterInstance.Spec)

if err := configureSubmitterContainer(&submitterTemplate.Spec.Containers[utils.RayContainerIndex], rayJobInstance, rayClusterInstance, rayv1.K8sJobMode); err != nil {
if err := configureSubmitterContainer(ctx, &submitterTemplate.Spec.Containers[utils.RayContainerIndex], rayJobInstance, rayClusterInstance, rayv1.K8sJobMode); err != nil {
return corev1.PodTemplateSpec{}, err
}

return submitterTemplate, nil
}

// getSubmitterContainer builds the submitter container for the Ray job Sidecar mode.
func getSubmitterContainer(rayJobInstance *rayv1.RayJob, rayClusterInstance *rayv1.RayCluster) (corev1.Container, error) {
func getSubmitterContainer(ctx context.Context, rayJobInstance *rayv1.RayJob, rayClusterInstance *rayv1.RayCluster) (corev1.Container, error) {
submitterContainer := common.GetDefaultSubmitterContainer(&rayClusterInstance.Spec)

if err := configureSubmitterContainer(&submitterContainer, rayJobInstance, rayClusterInstance, rayv1.SidecarMode); err != nil {
if err := configureSubmitterContainer(ctx, &submitterContainer, rayJobInstance, rayClusterInstance, rayv1.SidecarMode); err != nil {
return corev1.Container{}, err
}

return submitterContainer, nil
}

// configureSubmitterContainer configures the submitter container for the given submission
// mode. The RayCluster instance is passed explicitly to support the cluster selector case.
func configureSubmitterContainer(container *corev1.Container, rayJobInstance *rayv1.RayJob, rayClusterInstance *rayv1.RayCluster, submissionMode rayv1.JobSubmissionMode) error {
func configureSubmitterContainer(ctx context.Context, container *corev1.Container, rayJobInstance *rayv1.RayJob, rayClusterInstance *rayv1.RayCluster, submissionMode rayv1.JobSubmissionMode) error {
// If the command in the submitter container manifest isn't set, use the default command.
jobCmd, err := common.BuildJobSubmitCommand(rayJobInstance, submissionMode)
if err != nil {
Expand All @@ -613,6 +613,13 @@ func configureSubmitterContainer(container *corev1.Container, rayJobInstance *ra
// ray job submit --address=http://$RAY_DASHBOARD_ADDRESS --submission-id=$RAY_JOB_SUBMISSION_ID ...
container.Env = append(container.Env, corev1.EnvVar{Name: utils.RAY_DASHBOARD_ADDRESS, Value: rayJobInstance.Status.DashboardURL})
container.Env = append(container.Env, corev1.EnvVar{Name: utils.RAY_JOB_SUBMISSION_ID, Value: rayJobInstance.Status.JobId})

// In SidecarMode, pass the expected worker count so the submitter can wait for workers to register
if submissionMode == rayv1.SidecarMode && rayClusterInstance != nil {
expectedWorkers := utils.CalculateDesiredReplicas(ctx, rayClusterInstance)
container.Env = append(container.Env, corev1.EnvVar{Name: utils.RAY_EXPECTED_WORKERS, Value: strconv.Itoa(int(expectedWorkers))})
}

if rayClusterInstance != nil && utils.IsAuthEnabled(&rayClusterInstance.Spec) {
common.SetContainerTokenAuthEnvVars(rayClusterInstance.Name, container)
}
Expand Down Expand Up @@ -912,7 +919,7 @@ func (r *RayJobReconciler) getOrCreateRayClusterInstance(ctx context.Context, ra
}

logger.Info("RayCluster not found, creating RayCluster!", "RayCluster", rayClusterNamespacedName)
rayClusterInstance, err = r.constructRayClusterForRayJob(rayJobInstance, rayClusterNamespacedName.Name)
rayClusterInstance, err = r.constructRayClusterForRayJob(ctx, rayJobInstance, rayClusterNamespacedName.Name)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -946,7 +953,7 @@ func (r *RayJobReconciler) getOrCreateRayClusterInstance(ctx context.Context, ra
return rayClusterInstance, nil
}

func (r *RayJobReconciler) constructRayClusterForRayJob(rayJobInstance *rayv1.RayJob, rayClusterName string) (*rayv1.RayCluster, error) {
func (r *RayJobReconciler) constructRayClusterForRayJob(ctx context.Context, rayJobInstance *rayv1.RayJob, rayClusterName string) (*rayv1.RayCluster, error) {
labels := make(map[string]string, len(rayJobInstance.Labels))
maps.Copy(labels, rayJobInstance.Labels)
labels[utils.RayOriginatedFromCRNameLabelKey] = rayJobInstance.Name
Expand Down Expand Up @@ -976,7 +983,7 @@ func (r *RayJobReconciler) constructRayClusterForRayJob(rayJobInstance *rayv1.Ra

// Inject a submitter container into the head Pod in SidecarMode.
if rayJobInstance.Spec.SubmissionMode == rayv1.SidecarMode {
sidecar, err := getSubmitterContainer(rayJobInstance, rayCluster)
sidecar, err := getSubmitterContainer(ctx, rayJobInstance, rayCluster)
if err != nil {
return nil, err
}
Expand Down
11 changes: 6 additions & 5 deletions ray-operator/controllers/ray/rayjob_controller_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,34 +161,35 @@ func TestGetSubmitterTemplate(t *testing.T) {
}

// Test 1: User provided template with command
submitterTemplate, err := getSubmitterTemplate(rayJobInstanceWithTemplate, rayClusterInstance)
ctx := context.Background()
submitterTemplate, err := getSubmitterTemplate(ctx, rayJobInstanceWithTemplate, rayClusterInstance)
require.NoError(t, err)
assert.Equal(t, "user-command", submitterTemplate.Spec.Containers[utils.RayContainerIndex].Command[0])

// Test 2: User provided template without command
rayJobInstanceWithTemplate.Spec.SubmitterPodTemplate.Spec.Containers[utils.RayContainerIndex].Command = []string{}
submitterTemplate, err = getSubmitterTemplate(rayJobInstanceWithTemplate, rayClusterInstance)
submitterTemplate, err = getSubmitterTemplate(ctx, rayJobInstanceWithTemplate, rayClusterInstance)
require.NoError(t, err)
assert.Equal(t, []string{"/bin/bash", "-ce", "--"}, submitterTemplate.Spec.Containers[utils.RayContainerIndex].Command)
assert.Equal(t, []string{"if ! ray job status --address http://test-url test-job-id >/dev/null 2>&1 ; then ray job submit --address http://test-url --no-wait --submission-id test-job-id -- echo no quote 'single quote' \"double quote\" ; fi ; ray job logs --address http://test-url --follow test-job-id"}, submitterTemplate.Spec.Containers[utils.RayContainerIndex].Args)

// Test 3: User did not provide template, should use the image of the Ray Head
submitterTemplate, err = getSubmitterTemplate(rayJobInstanceWithoutTemplate, rayClusterInstance)
submitterTemplate, err = getSubmitterTemplate(ctx, rayJobInstanceWithoutTemplate, rayClusterInstance)
require.NoError(t, err)
assert.Equal(t, []string{"/bin/bash", "-ce", "--"}, submitterTemplate.Spec.Containers[utils.RayContainerIndex].Command)
assert.Equal(t, []string{"if ! ray job status --address http://test-url test-job-id >/dev/null 2>&1 ; then ray job submit --address http://test-url --no-wait --submission-id test-job-id -- echo no quote 'single quote' \"double quote\" ; fi ; ray job logs --address http://test-url --follow test-job-id"}, submitterTemplate.Spec.Containers[utils.RayContainerIndex].Args)
assert.Equal(t, "rayproject/ray:custom-version", submitterTemplate.Spec.Containers[utils.RayContainerIndex].Image)

// Test 4: Check default PYTHONUNBUFFERED setting
submitterTemplate, err = getSubmitterTemplate(rayJobInstanceWithoutTemplate, rayClusterInstance)
submitterTemplate, err = getSubmitterTemplate(ctx, rayJobInstanceWithoutTemplate, rayClusterInstance)
require.NoError(t, err)

envVar, found := utils.EnvVarByName(PythonUnbufferedEnvVarName, submitterTemplate.Spec.Containers[utils.RayContainerIndex].Env)
assert.True(t, found)
assert.Equal(t, "1", envVar.Value)

// Test 5: Check default RAY_DASHBOARD_ADDRESS env var
submitterTemplate, err = getSubmitterTemplate(rayJobInstanceWithTemplate, rayClusterInstance)
submitterTemplate, err = getSubmitterTemplate(ctx, rayJobInstanceWithTemplate, rayClusterInstance)
require.NoError(t, err)

envVar, found = utils.EnvVarByName(utils.RAY_DASHBOARD_ADDRESS, submitterTemplate.Spec.Containers[utils.RayContainerIndex].Env)
Expand Down
1 change: 1 addition & 0 deletions ray-operator/controllers/ray/utils/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ const (
// Environment variables for RayJob submitter Kubernetes Job.
// Example: ray job submit --address=http://$RAY_DASHBOARD_ADDRESS --submission-id=$RAY_JOB_SUBMISSION_ID ...
RAY_DASHBOARD_ADDRESS = "RAY_DASHBOARD_ADDRESS"
RAY_EXPECTED_WORKERS = "RAY_EXPECTED_WORKERS"
RAY_JOB_SUBMISSION_ID = "RAY_JOB_SUBMISSION_ID"

// Environment variables for Ray Autoscaler V2.
Expand Down
57 changes: 57 additions & 0 deletions ray-operator/test/e2erayjob/rayjob_sidecar_mode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,4 +290,61 @@ env_vars:

LogWithTimestamp(test.T(), "RayJob %s/%s completed successfully with auth token", rayJob.Namespace, rayJob.Name)
})

test.T().Run("SidecarMode waits for workers before job submission", func(_ *testing.T) {
// This test verifies that SidecarMode waits for workers to register before submitting
// the job. We create a RayJob with a worker that has a slow init container (30s delay).
// The inline Python script verifies that workers are available when it runs. If the
// sidecar submitted the job before workers were ready, the job would fail because no
// workers would be found.

// Create a worker template with a slow init container to simulate slow worker startup
workerTemplate := WorkerPodTemplateApplyConfiguration()
workerTemplate.Spec.WithInitContainers(corev1ac.Container().
WithName("init-delay").
WithImage(GetRayImage()).
WithCommand("bash", "-c", "echo 'Simulating slow worker startup...'; sleep 30"))

// Simple inline Python script that verifies workers are available
// Uses semicolons to avoid multi-line issues with shell escaping
verifyWorkersScript := `import ray; ray.init(); nodes = ray.nodes(); worker_count = sum(1 for n in nodes if n["Alive"] and not n.get("Resources", {}).get("node:__internal_head__")); print("Worker nodes:", worker_count); assert worker_count >= 1`

rayJobAC := rayv1ac.RayJob("wait-for-workers", namespace.Name).
WithSpec(rayv1ac.RayJobSpec().
WithSubmissionMode(rayv1.SidecarMode).
WithEntrypoint("python -c '" + verifyWorkersScript + "'").
WithShutdownAfterJobFinishes(true).
WithRayClusterSpec(rayv1ac.RayClusterSpec().
WithRayVersion(GetRayVersion()).
WithHeadGroupSpec(rayv1ac.HeadGroupSpec().
WithRayStartParams(map[string]string{"dashboard-host": "0.0.0.0"}).
WithTemplate(HeadPodTemplateApplyConfiguration())).
WithWorkerGroupSpecs(rayv1ac.WorkerGroupSpec().
WithReplicas(1).
WithMinReplicas(1).
WithMaxReplicas(1).
WithGroupName("slow-worker").
WithRayStartParams(map[string]string{"num-cpus": "1"}).
WithTemplate(workerTemplate))))

rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions)
g.Expect(err).NotTo(HaveOccurred())
LogWithTimestamp(test.T(), "Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name)

// Wait for the RayJob to complete. The job will fail if workers aren't available.
LogWithTimestamp(test.T(), "Waiting for RayJob %s/%s to complete (this may take ~40s due to worker init delay)", rayJob.Namespace, rayJob.Name)
g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutLong).
Should(WithTransform(RayJobStatus, Satisfy(rayv1.IsJobTerminal)))

// Assert the RayJob has completed successfully
// If the sidecar didn't wait for workers, the job would fail with assertion error
g.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)).
To(WithTransform(RayJobStatus, Equal(rayv1.JobStatusSucceeded)))

// And the RayJob deployment status is updated accordingly
g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name)).
Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusComplete)))

LogWithTimestamp(test.T(), "RayJob %s/%s completed successfully - workers were available before job submission", rayJob.Namespace, rayJob.Name)
})
}
Loading