
Commit 2600631

Wait for workers before submitting jobs in sidecar mode
Signed-off-by: Mark Rossett <marosset@microsoft.com>
1 parent 910223a commit 2600631

5 files changed: 242 additions & 4 deletions

ray-operator/controllers/ray/common/job.go

Lines changed: 43 additions & 0 deletions
@@ -81,6 +81,26 @@ func GetMetadataJson(metadata map[string]string, rayVersion string) (string, err
 	return pkgutils.ConvertByteSliceToString(metadataBytes), nil
 }
 
+// GetMinReplicasFromSpec calculates the minimum expected worker replicas from the RayClusterSpec.
+// This is used in SidecarMode to determine how many workers should be registered before submitting the job.
+func GetMinReplicasFromSpec(rayClusterSpec *rayv1.RayClusterSpec) int32 {
+	if rayClusterSpec == nil {
+		return 0
+	}
+	count := int32(0)
+	for _, nodeGroup := range rayClusterSpec.WorkerGroupSpecs {
+		if nodeGroup.Suspend != nil && *nodeGroup.Suspend {
+			continue
+		}
+		minReplicas := int32(0)
+		if nodeGroup.MinReplicas != nil {
+			minReplicas = *nodeGroup.MinReplicas
+		}
+		count += minReplicas * nodeGroup.NumOfHosts
+	}
+	return count
+}
+
 // BuildJobSubmitCommand builds the `ray job submit` command based on submission mode.
 func BuildJobSubmitCommand(rayJobInstance *rayv1.RayJob, submissionMode rayv1.JobSubmissionMode) ([]string, error) {
 	var address string
@@ -139,6 +159,29 @@ func BuildJobSubmitCommand(rayJobInstance *rayv1.RayJob, submissionMode rayv1.Jo
 		"do", "echo", strconv.Quote("Waiting for Ray Dashboard GCS to become healthy at " + address + " ..."), ";", "sleep", "2", ";", "done", ";",
 	}
 	cmd = append(cmd, waitLoop...)
+
+	// Wait for the expected number of worker nodes to register with the Ray cluster.
+	// RAY_EXPECTED_MIN_WORKERS is set by the controller based on the MinReplicas in the RayClusterSpec.
+	// The loop queries the Ray Dashboard API for the number of alive nodes and
+	// continues until that number reaches (expected_workers + 1), where the +1 accounts for the head node.
+	// This ensures that worker pods are connected before the job is submitted; otherwise
+	// the job may run on the head node.
+	//
+	// Note: This loop never times out and will wait indefinitely if workers never register.
+	// This can be mitigated by setting the RayJob's `activeDeadlineSeconds` field
+	// to enforce a maximum job execution time.
+	waitForNodesLoop := []string{
+		"if", "[", "-n", "\"$" + utils.RAY_EXPECTED_MIN_WORKERS + "\"", "]", "&&", "[", "\"$" + utils.RAY_EXPECTED_MIN_WORKERS + "\"", "-gt", "\"0\"", "]", ";", "then",
+		"EXPECTED_NODES=$(($" + utils.RAY_EXPECTED_MIN_WORKERS + " + 1))", ";",
+		"echo", strconv.Quote("Waiting for $EXPECTED_NODES nodes (1 head + $" + utils.RAY_EXPECTED_MIN_WORKERS + " workers) to register..."), ";",
+		"until", "[",
+		"\"$(wget -q -O- " + address + "/nodes?view=summary 2>/dev/null | python3 -c \"import sys,json; d=json.load(sys.stdin); print(len([n for n in d.get('data',{}).get('summary',[]) if n.get('raylet',{}).get('state','')=='ALIVE']))\" 2>/dev/null || echo 0)\"",
+		"-ge", "\"$EXPECTED_NODES\"", "]", ";",
+		"do", "echo", strconv.Quote("Waiting for Ray nodes to register. Expected: $EXPECTED_NODES ..."), ";", "sleep", "2", ";", "done", ";",
+		"echo", strconv.Quote("All expected nodes are registered."), ";",
+		"fi", ";",
+	}
+	cmd = append(cmd, waitForNodesLoop...)
 	}
 
 	// In Sidecar mode, we only support RayJob level retry, which means that the submitter retry won't happen,
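For readers squinting past the shell quoting: the readiness probe bottoms out in that inline python3 one-liner, which fetches /nodes?view=summary from the Ray Dashboard and counts raylets reporting state ALIVE. The Go sketch below performs the same count. It is illustrative only and not part of this commit; the JSON field names (data.summary[].raylet.state) are assumed from the response shape the one-liner parses.

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// nodesSummary mirrors the subset of the Dashboard /nodes?view=summary
// response that the wait loop inspects (assumed shape, see note above).
type nodesSummary struct {
	Data struct {
		Summary []struct {
			Raylet struct {
				State string `json:"state"`
			} `json:"raylet"`
		} `json:"summary"`
	} `json:"data"`
}

// countAliveNodes returns how many nodes the Dashboard reports as ALIVE.
func countAliveNodes(dashboardAddress string) (int, error) {
	resp, err := http.Get(dashboardAddress + "/nodes?view=summary")
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()

	var summary nodesSummary
	if err := json.NewDecoder(resp.Body).Decode(&summary); err != nil {
		return 0, err
	}

	alive := 0
	for _, node := range summary.Data.Summary {
		if node.Raylet.State == "ALIVE" {
			alive++
		}
	}
	return alive, nil
}

func main() {
	// The submitter's loop waits until this count reaches
	// RAY_EXPECTED_MIN_WORKERS + 1 (workers plus the head node).
	alive, err := countAliveNodes("http://127.0.0.1:8265")
	if err != nil {
		fmt.Println("dashboard not reachable yet:", err)
		return
	}
	fmt.Println("alive nodes:", alive)
}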

ray-operator/controllers/ray/common/job_test.go

Lines changed: 132 additions & 2 deletions
@@ -112,7 +112,9 @@ func TestBuildJobSubmitCommandWithSidecarMode(t *testing.T) {
 		},
 	}
 
+	address := "http://127.0.0.1:8265"
 	expected := []string{
+		// Wait for Dashboard GCS health
 		"until",
 		fmt.Sprintf(
 			utils.BaseWgetHealthCommand,
@@ -121,8 +123,19 @@ func TestBuildJobSubmitCommandWithSidecarMode(t *testing.T) {
 			utils.RayDashboardGCSHealthPath,
 		),
 		">/dev/null", "2>&1", ";",
-		"do", "echo", strconv.Quote("Waiting for Ray Dashboard GCS to become healthy at http://127.0.0.1:8265 ..."), ";", "sleep", "2", ";", "done", ";",
-		"ray", "job", "submit", "--address", "http://127.0.0.1:8265",
+		"do", "echo", strconv.Quote("Waiting for Ray Dashboard GCS to become healthy at " + address + " ..."), ";", "sleep", "2", ";", "done", ";",
+		// Wait for expected nodes to register
+		"if", "[", "-n", "\"$" + utils.RAY_EXPECTED_MIN_WORKERS + "\"", "]", "&&", "[", "\"$" + utils.RAY_EXPECTED_MIN_WORKERS + "\"", "-gt", "\"0\"", "]", ";", "then",
+		"EXPECTED_NODES=$(($" + utils.RAY_EXPECTED_MIN_WORKERS + " + 1))", ";",
+		"echo", strconv.Quote("Waiting for $EXPECTED_NODES nodes (1 head + $" + utils.RAY_EXPECTED_MIN_WORKERS + " workers) to register..."), ";",
+		"until", "[",
+		"\"$(wget -q -O- " + address + "/nodes?view=summary 2>/dev/null | python3 -c \"import sys,json; d=json.load(sys.stdin); print(len([n for n in d.get('data',{}).get('summary',[]) if n.get('raylet',{}).get('state','')=='ALIVE']))\" 2>/dev/null || echo 0)\"",
+		"-ge", "\"$EXPECTED_NODES\"", "]", ";",
+		"do", "echo", strconv.Quote("Waiting for Ray nodes to register. Expected: $EXPECTED_NODES ..."), ";", "sleep", "2", ";", "done", ";",
+		"echo", strconv.Quote("All expected nodes are registered."), ";",
+		"fi", ";",
+		// Job submit command
+		"ray", "job", "submit", "--address", address,
 		"--runtime-env-json", strconv.Quote(`{"test":"test"}`),
 		"--metadata-json", strconv.Quote(`{"testKey":"testValue"}`),
 		"--submission-id", "testJobId",
@@ -240,3 +253,120 @@ func TestGetSubmitterTemplate(t *testing.T) {
 	template := GetSubmitterTemplate(&rayJob.Spec, &rayCluster.Spec)
 	assert.Equal(t, template.Spec.Containers[0].Image, rayCluster.Spec.HeadGroupSpec.Template.Spec.Containers[utils.RayContainerIndex].Image)
 }
+
+func TestGetMinReplicasFromSpec(t *testing.T) {
+	tests := []struct {
+		spec     *rayv1.RayClusterSpec
+		name     string
+		expected int32
+	}{
+		{
+			name:     "nil spec returns 0",
+			spec:     nil,
+			expected: 0,
+		},
+		{
+			name: "no worker groups returns 0",
+			spec: &rayv1.RayClusterSpec{
+				WorkerGroupSpecs: []rayv1.WorkerGroupSpec{},
+			},
+			expected: 0,
+		},
+		{
+			name: "single worker group with minReplicas",
+			spec: &rayv1.RayClusterSpec{
+				WorkerGroupSpecs: []rayv1.WorkerGroupSpec{
+					{
+						MinReplicas: ptrInt32(2),
+						NumOfHosts:  1,
+					},
+				},
+			},
+			expected: 2,
+		},
+		{
+			name: "multiple worker groups",
+			spec: &rayv1.RayClusterSpec{
+				WorkerGroupSpecs: []rayv1.WorkerGroupSpec{
+					{
+						MinReplicas: ptrInt32(2),
+						NumOfHosts:  1,
+					},
+					{
+						MinReplicas: ptrInt32(3),
+						NumOfHosts:  1,
+					},
+				},
+			},
+			expected: 5,
+		},
+		{
+			name: "worker group with NumOfHosts > 1",
+			spec: &rayv1.RayClusterSpec{
+				WorkerGroupSpecs: []rayv1.WorkerGroupSpec{
+					{
+						MinReplicas: ptrInt32(2),
+						NumOfHosts:  2,
+					},
+				},
+			},
+			expected: 4,
+		},
+		{
+			name: "suspended worker group is skipped",
+			spec: &rayv1.RayClusterSpec{
+				WorkerGroupSpecs: []rayv1.WorkerGroupSpec{
+					{
+						MinReplicas: ptrInt32(2),
+						NumOfHosts:  1,
+						Suspend:     ptrBool(true),
+					},
+					{
+						MinReplicas: ptrInt32(3),
+						NumOfHosts:  1,
+					},
+				},
+			},
+			expected: 3,
+		},
+		{
+			name: "nil minReplicas defaults to 0",
+			spec: &rayv1.RayClusterSpec{
+				WorkerGroupSpecs: []rayv1.WorkerGroupSpec{
+					{
+						MinReplicas: nil,
+						NumOfHosts:  1,
+					},
+				},
+			},
+			expected: 0,
+		},
+		{
+			name: "NumOfHosts 0 results in 0 workers for that group",
+			spec: &rayv1.RayClusterSpec{
+				WorkerGroupSpecs: []rayv1.WorkerGroupSpec{
+					{
+						MinReplicas: ptrInt32(2),
+						NumOfHosts:  0,
+					},
+				},
+			},
+			expected: 0,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			result := GetMinReplicasFromSpec(tt.spec)
+			assert.Equal(t, tt.expected, result)
+		})
+	}
+}
+
+func ptrInt32(v int32) *int32 {
+	return &v
+}
+
+func ptrBool(v bool) *bool {
+	return &v
+}

ray-operator/controllers/ray/rayjob_controller.go

Lines changed: 7 additions & 0 deletions
@@ -613,6 +613,13 @@ func configureSubmitterContainer(container *corev1.Container, rayJobInstance *ra
 	// ray job submit --address=http://$RAY_DASHBOARD_ADDRESS --submission-id=$RAY_JOB_SUBMISSION_ID ...
 	container.Env = append(container.Env, corev1.EnvVar{Name: utils.RAY_DASHBOARD_ADDRESS, Value: rayJobInstance.Status.DashboardURL})
 	container.Env = append(container.Env, corev1.EnvVar{Name: utils.RAY_JOB_SUBMISSION_ID, Value: rayJobInstance.Status.JobId})
+
+	// In SidecarMode, pass the expected minimum worker count so the submitter can wait for workers to register.
+	if submissionMode == rayv1.SidecarMode && rayJobInstance.Spec.RayClusterSpec != nil {
+		minWorkers := common.GetMinReplicasFromSpec(rayJobInstance.Spec.RayClusterSpec)
+		container.Env = append(container.Env, corev1.EnvVar{Name: utils.RAY_EXPECTED_MIN_WORKERS, Value: strconv.Itoa(int(minWorkers))})
+	}
+
 	if rayClusterInstance != nil && utils.IsAuthEnabled(&rayClusterInstance.Spec) {
 		common.SetContainerTokenAuthEnvVars(rayClusterInstance.Name, container)
 	}
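For a concrete sense of the value wired in here: GetMinReplicasFromSpec sums minReplicas * numOfHosts across non-suspended worker groups, and the submitter's wait loop then targets that count plus one for the head. A small sketch with a hypothetical two-group spec (illustrative only, not part of this commit; assumes the rayv1 and common imports used elsewhere in the operator).

// Hypothetical spec, for illustration only.
func exampleExpectedMinWorkers() int32 {
	two, one := int32(2), int32(1)
	spec := &rayv1.RayClusterSpec{
		WorkerGroupSpecs: []rayv1.WorkerGroupSpec{
			{MinReplicas: &two, NumOfHosts: 1}, // contributes 2 * 1 = 2 workers
			{MinReplicas: &one, NumOfHosts: 4}, // contributes 1 * 4 = 4 workers
		},
	}
	// Returns 6, so the submitter container gets RAY_EXPECTED_MIN_WORKERS=6
	// and its wait loop blocks until the Dashboard reports 7 ALIVE nodes
	// (6 workers + 1 head).
	return common.GetMinReplicasFromSpec(spec)
}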

ray-operator/controllers/ray/utils/constant.go

Lines changed: 3 additions & 2 deletions
@@ -151,8 +151,9 @@ const (
 
 	// Environment variables for RayJob submitter Kubernetes Job.
 	// Example: ray job submit --address=http://$RAY_DASHBOARD_ADDRESS --submission-id=$RAY_JOB_SUBMISSION_ID ...
-	RAY_DASHBOARD_ADDRESS = "RAY_DASHBOARD_ADDRESS"
-	RAY_JOB_SUBMISSION_ID = "RAY_JOB_SUBMISSION_ID"
+	RAY_DASHBOARD_ADDRESS    = "RAY_DASHBOARD_ADDRESS"
+	RAY_EXPECTED_MIN_WORKERS = "RAY_EXPECTED_MIN_WORKERS"
+	RAY_JOB_SUBMISSION_ID    = "RAY_JOB_SUBMISSION_ID"
 
 	// Environment variables for Ray Autoscaler V2.
 	// The value of RAY_CLOUD_INSTANCE_ID is the Pod name for Autoscaler V2 alpha. This may change in the future.

ray-operator/test/e2erayjob/rayjob_sidecar_mode_test.go

Lines changed: 57 additions & 0 deletions
@@ -290,4 +290,61 @@ env_vars:
 
 		LogWithTimestamp(test.T(), "RayJob %s/%s completed successfully with auth token", rayJob.Namespace, rayJob.Name)
 	})
+
+	test.T().Run("SidecarMode waits for workers before job submission", func(_ *testing.T) {
+		// This test verifies that SidecarMode waits for workers to register before submitting
+		// the job. We create a RayJob with a worker that has a slow init container (30s delay).
+		// The inline Python script verifies that workers are available when it runs. If the
+		// sidecar submitted the job before workers were ready, the job would fail because no
+		// workers would be found.
+
+		// Create a worker template with a slow init container to simulate slow worker startup
+		workerTemplate := WorkerPodTemplateApplyConfiguration()
+		workerTemplate.Spec.WithInitContainers(corev1ac.Container().
+			WithName("init-delay").
+			WithImage(GetRayImage()).
+			WithCommand("bash", "-c", "echo 'Simulating slow worker startup...'; sleep 30"))
+
+		// Simple inline Python script that verifies workers are available.
+		// Uses semicolons to avoid multi-line issues with shell escaping.
+		verifyWorkersScript := `import ray; ray.init(); nodes = ray.nodes(); worker_count = sum(1 for n in nodes if n["Alive"] and not n.get("Resources", {}).get("node:__internal_head__")); print("Worker nodes:", worker_count); assert worker_count >= 1`
+
+		rayJobAC := rayv1ac.RayJob("wait-for-workers", namespace.Name).
+			WithSpec(rayv1ac.RayJobSpec().
+				WithSubmissionMode(rayv1.SidecarMode).
+				WithEntrypoint("python -c '" + verifyWorkersScript + "'").
+				WithShutdownAfterJobFinishes(true).
+				WithRayClusterSpec(rayv1ac.RayClusterSpec().
+					WithRayVersion(GetRayVersion()).
+					WithHeadGroupSpec(rayv1ac.HeadGroupSpec().
+						WithRayStartParams(map[string]string{"dashboard-host": "0.0.0.0"}).
+						WithTemplate(HeadPodTemplateApplyConfiguration())).
+					WithWorkerGroupSpecs(rayv1ac.WorkerGroupSpec().
+						WithReplicas(1).
+						WithMinReplicas(1).
+						WithMaxReplicas(1).
+						WithGroupName("slow-worker").
+						WithRayStartParams(map[string]string{"num-cpus": "1"}).
+						WithTemplate(workerTemplate))))
+
+		rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions)
+		g.Expect(err).NotTo(HaveOccurred())
+		LogWithTimestamp(test.T(), "Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name)
+
+		// Wait for the RayJob to complete. The job will fail if workers aren't available.
+		LogWithTimestamp(test.T(), "Waiting for RayJob %s/%s to complete (this may take ~40s due to worker init delay)", rayJob.Namespace, rayJob.Name)
+		g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutLong).
+			Should(WithTransform(RayJobStatus, Satisfy(rayv1.IsJobTerminal)))
+
+		// Assert the RayJob has completed successfully.
+		// If the sidecar didn't wait for workers, the job would fail with an assertion error.
+		g.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)).
+			To(WithTransform(RayJobStatus, Equal(rayv1.JobStatusSucceeded)))
+
+		// And the RayJob deployment status is updated accordingly
+		g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name)).
+			Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusComplete)))
+
+		LogWithTimestamp(test.T(), "RayJob %s/%s completed successfully - workers were available before job submission", rayJob.Namespace, rayJob.Name)
+	})
 }
