Skip to content

Commit a665203

Browse files
committed
Wait for workers before submitting jobs in sidecar mode
Signed-off-by: Mark Rossett <marosset@microsoft.com>
1 parent 9bd16f2 commit a665203

File tree

5 files changed

+346
-4
lines changed

5 files changed

+346
-4
lines changed

ray-operator/controllers/ray/common/job.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,30 @@ func GetMetadataJson(metadata map[string]string, rayVersion string) (string, err
8181
return pkgutils.ConvertByteSliceToString(metadataBytes), nil
8282
}
8383

84+
// GetMinReplicasFromSpec calculates the minimum expected worker replicas from the RayClusterSpec.
85+
// This is used in SidecarMode to determine how many workers should be registered before submitting the job.
86+
func GetMinReplicasFromSpec(rayClusterSpec *rayv1.RayClusterSpec) int32 {
87+
if rayClusterSpec == nil {
88+
return 0
89+
}
90+
count := int32(0)
91+
for _, nodeGroup := range rayClusterSpec.WorkerGroupSpecs {
92+
if nodeGroup.Suspend != nil && *nodeGroup.Suspend {
93+
continue
94+
}
95+
minReplicas := int32(0)
96+
if nodeGroup.MinReplicas != nil && *nodeGroup.MinReplicas > 0 {
97+
minReplicas = *nodeGroup.MinReplicas
98+
} else if nodeGroup.Replicas != nil && *nodeGroup.Replicas > 0 {
99+
// Fall back to Replicas when MinReplicas is not set or is 0.
100+
// This handles static clusters where users only set Replicas.
101+
minReplicas = *nodeGroup.Replicas
102+
}
103+
count += minReplicas * nodeGroup.NumOfHosts
104+
}
105+
return count
106+
}
107+
84108
// BuildJobSubmitCommand builds the `ray job submit` command based on submission mode.
85109
func BuildJobSubmitCommand(rayJobInstance *rayv1.RayJob, submissionMode rayv1.JobSubmissionMode) ([]string, error) {
86110
var address string
@@ -139,6 +163,29 @@ func BuildJobSubmitCommand(rayJobInstance *rayv1.RayJob, submissionMode rayv1.Jo
139163
"do", "echo", strconv.Quote("Waiting for Ray Dashboard GCS to become healthy at " + address + " ..."), ";", "sleep", "2", ";", "done", ";",
140164
}
141165
cmd = append(cmd, waitLoop...)
166+
167+
// Wait for the expected number of worker nodes to register for the Ray cluster.
168+
// RAY_EXPECTED_MIN_WORKERS is set by the controller based on the MinReplicas in the RayClusterSpec.
169+
// The loop queries the Ray Dashboard API to get the number of alive nodes and
170+
// continues until the number of alive nodes is equal to (expected_workers + 1) for head node.
171+
// This ensures that worker pods are connected before the job is submitted otherwise
172+
// the jobs may run on the Head node.
173+
//
174+
// Note: This loop will never timeout and will wait indefinitely if workers never register.
175+
// This can be mitigated by setting the RayJob's `activeDeadlineSeconds` field
176+
// to enforce a maximum job execution time.
177+
waitForNodesLoop := []string{
178+
"if", "[", "-n", "\"$" + utils.RAY_EXPECTED_MIN_WORKERS + "\"", "]", "&&", "[", "\"$" + utils.RAY_EXPECTED_MIN_WORKERS + "\"", "-gt", "\"0\"", "]", ";", "then",
179+
"EXPECTED_NODES=$(($" + utils.RAY_EXPECTED_MIN_WORKERS + " + 1))", ";",
180+
"echo", strconv.Quote("Waiting for $EXPECTED_NODES nodes (1 head + $" + utils.RAY_EXPECTED_MIN_WORKERS + " workers) to register..."), ";",
181+
"until", "[",
182+
"\"$(wget -q -O- " + address + "/nodes?view=summary 2>/dev/null | python3 -c \"import sys,json; d=json.load(sys.stdin); print(len([n for n in d.get('data',{}).get('summary',[]) if n.get('raylet',{}).get('state','')=='ALIVE']))\" 2>/dev/null || echo 0)\"",
183+
"-ge", "\"$EXPECTED_NODES\"", "]", ";",
184+
"do", "echo", strconv.Quote("Waiting for Ray nodes to register. Expected: $EXPECTED_NODES ..."), ";", "sleep", "2", ";", "done", ";",
185+
"echo", strconv.Quote("All expected nodes are registered."), ";",
186+
"fi", ";",
187+
}
188+
cmd = append(cmd, waitForNodesLoop...)
142189
}
143190

144191
// In Sidecar mode, we only support RayJob level retry, which means that the submitter retry won't happen,

ray-operator/controllers/ray/common/job_test.go

Lines changed: 232 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,9 @@ func TestBuildJobSubmitCommandWithSidecarMode(t *testing.T) {
112112
},
113113
}
114114

115+
address := "http://127.0.0.1:8265"
115116
expected := []string{
117+
// Wait for Dashboard GCS health
116118
"until",
117119
fmt.Sprintf(
118120
utils.BaseWgetHealthCommand,
@@ -121,8 +123,19 @@ func TestBuildJobSubmitCommandWithSidecarMode(t *testing.T) {
121123
utils.RayDashboardGCSHealthPath,
122124
),
123125
">/dev/null", "2>&1", ";",
124-
"do", "echo", strconv.Quote("Waiting for Ray Dashboard GCS to become healthy at http://127.0.0.1:8265 ..."), ";", "sleep", "2", ";", "done", ";",
125-
"ray", "job", "submit", "--address", "http://127.0.0.1:8265",
126+
"do", "echo", strconv.Quote("Waiting for Ray Dashboard GCS to become healthy at " + address + " ..."), ";", "sleep", "2", ";", "done", ";",
127+
// Wait for expected nodes to register
128+
"if", "[", "-n", "\"$" + utils.RAY_EXPECTED_MIN_WORKERS + "\"", "]", "&&", "[", "\"$" + utils.RAY_EXPECTED_MIN_WORKERS + "\"", "-gt", "\"0\"", "]", ";", "then",
129+
"EXPECTED_NODES=$(($" + utils.RAY_EXPECTED_MIN_WORKERS + " + 1))", ";",
130+
"echo", strconv.Quote("Waiting for $EXPECTED_NODES nodes (1 head + $" + utils.RAY_EXPECTED_MIN_WORKERS + " workers) to register..."), ";",
131+
"until", "[",
132+
"\"$(wget -q -O- " + address + "/nodes?view=summary 2>/dev/null | python3 -c \"import sys,json; d=json.load(sys.stdin); print(len([n for n in d.get('data',{}).get('summary',[]) if n.get('raylet',{}).get('state','')=='ALIVE']))\" 2>/dev/null || echo 0)\"",
133+
"-ge", "\"$EXPECTED_NODES\"", "]", ";",
134+
"do", "echo", strconv.Quote("Waiting for Ray nodes to register. Expected: $EXPECTED_NODES ..."), ";", "sleep", "2", ";", "done", ";",
135+
"echo", strconv.Quote("All expected nodes are registered."), ";",
136+
"fi", ";",
137+
// Job submit command
138+
"ray", "job", "submit", "--address", address,
126139
"--runtime-env-json", strconv.Quote(`{"test":"test"}`),
127140
"--metadata-json", strconv.Quote(`{"testKey":"testValue"}`),
128141
"--submission-id", "testJobId",
@@ -240,3 +253,220 @@ func TestGetSubmitterTemplate(t *testing.T) {
240253
template := GetSubmitterTemplate(&rayJob.Spec, &rayCluster.Spec)
241254
assert.Equal(t, template.Spec.Containers[0].Image, rayCluster.Spec.HeadGroupSpec.Template.Spec.Containers[utils.RayContainerIndex].Image)
242255
}
256+
257+
// TestGetMinReplicasFromSpec verifies the minimum-worker calculation used by
// SidecarMode: nil/empty specs, suspended groups, the MinReplicas -> Replicas
// fallback, and multiplication by NumOfHosts.
func TestGetMinReplicasFromSpec(t *testing.T) {
	tests := []struct {
		spec     *rayv1.RayClusterSpec
		name     string
		expected int32
	}{
		{
			name:     "nil spec returns 0",
			spec:     nil,
			expected: 0,
		},
		{
			name: "no worker groups returns 0",
			spec: &rayv1.RayClusterSpec{
				WorkerGroupSpecs: []rayv1.WorkerGroupSpec{},
			},
			expected: 0,
		},
		{
			name: "single worker group with minReplicas",
			spec: &rayv1.RayClusterSpec{
				WorkerGroupSpecs: []rayv1.WorkerGroupSpec{
					{
						MinReplicas: ptrInt32(2),
						NumOfHosts:  1,
					},
				},
			},
			expected: 2,
		},
		{
			// Groups are summed: 2 + 3.
			name: "multiple worker groups",
			spec: &rayv1.RayClusterSpec{
				WorkerGroupSpecs: []rayv1.WorkerGroupSpec{
					{
						MinReplicas: ptrInt32(2),
						NumOfHosts:  1,
					},
					{
						MinReplicas: ptrInt32(3),
						NumOfHosts:  1,
					},
				},
			},
			expected: 5,
		},
		{
			// Each replica spans NumOfHosts pods: 2 * 2.
			name: "worker group with NumOfHosts > 1",
			spec: &rayv1.RayClusterSpec{
				WorkerGroupSpecs: []rayv1.WorkerGroupSpec{
					{
						MinReplicas: ptrInt32(2),
						NumOfHosts:  2,
					},
				},
			},
			expected: 4,
		},
		{
			// Suspended groups contribute no workers; only the second group counts.
			name: "suspended worker group is skipped",
			spec: &rayv1.RayClusterSpec{
				WorkerGroupSpecs: []rayv1.WorkerGroupSpec{
					{
						MinReplicas: ptrInt32(2),
						NumOfHosts:  1,
						Suspend:     ptrBool(true),
					},
					{
						MinReplicas: ptrInt32(3),
						NumOfHosts:  1,
					},
				},
			},
			expected: 3,
		},
		{
			name: "nil minReplicas defaults to 0",
			spec: &rayv1.RayClusterSpec{
				WorkerGroupSpecs: []rayv1.WorkerGroupSpec{
					{
						MinReplicas: nil,
						NumOfHosts:  1,
					},
				},
			},
			expected: 0,
		},
		{
			name: "NumOfHosts 0 results in 0 workers for that group",
			spec: &rayv1.RayClusterSpec{
				WorkerGroupSpecs: []rayv1.WorkerGroupSpec{
					{
						MinReplicas: ptrInt32(2),
						NumOfHosts:  0,
					},
				},
			},
			expected: 0,
		},
		{
			name: "falls back to Replicas when MinReplicas is nil",
			spec: &rayv1.RayClusterSpec{
				WorkerGroupSpecs: []rayv1.WorkerGroupSpec{
					{
						Replicas:    ptrInt32(3),
						MinReplicas: nil,
						NumOfHosts:  1,
					},
				},
			},
			expected: 3,
		},
		{
			name: "falls back to Replicas when MinReplicas is 0",
			spec: &rayv1.RayClusterSpec{
				WorkerGroupSpecs: []rayv1.WorkerGroupSpec{
					{
						Replicas:    ptrInt32(5),
						MinReplicas: ptrInt32(0),
						NumOfHosts:  1,
					},
				},
			},
			expected: 5,
		},
		{
			// A positive MinReplicas takes precedence over Replicas.
			name: "uses MinReplicas when both are set and MinReplicas > 0",
			spec: &rayv1.RayClusterSpec{
				WorkerGroupSpecs: []rayv1.WorkerGroupSpec{
					{
						Replicas:    ptrInt32(5),
						MinReplicas: ptrInt32(2),
						NumOfHosts:  1,
					},
				},
			},
			expected: 2,
		},
		{
			name: "both MinReplicas and Replicas are nil",
			spec: &rayv1.RayClusterSpec{
				WorkerGroupSpecs: []rayv1.WorkerGroupSpec{
					{
						Replicas:    nil,
						MinReplicas: nil,
						NumOfHosts:  1,
					},
				},
			},
			expected: 0,
		},
		{
			name: "both MinReplicas and Replicas are 0",
			spec: &rayv1.RayClusterSpec{
				WorkerGroupSpecs: []rayv1.WorkerGroupSpec{
					{
						Replicas:    ptrInt32(0),
						MinReplicas: ptrInt32(0),
						NumOfHosts:  1,
					},
				},
			},
			expected: 0,
		},
		{
			name: "mixed worker groups - some with MinReplicas some with only Replicas",
			spec: &rayv1.RayClusterSpec{
				WorkerGroupSpecs: []rayv1.WorkerGroupSpec{
					{
						MinReplicas: ptrInt32(2),
						NumOfHosts:  1,
					},
					{
						Replicas:    ptrInt32(3),
						MinReplicas: nil,
						NumOfHosts:  1,
					},
					{
						Replicas:    ptrInt32(4),
						MinReplicas: ptrInt32(0),
						NumOfHosts:  1,
					},
				},
			},
			expected: 9, // 2 + 3 + 4
		},
		{
			name: "NumOfHosts > 1 with Replicas fallback",
			spec: &rayv1.RayClusterSpec{
				WorkerGroupSpecs: []rayv1.WorkerGroupSpec{
					{
						Replicas:    ptrInt32(3),
						MinReplicas: nil,
						NumOfHosts:  2,
					},
				},
			},
			expected: 6, // 3 * 2
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			result := GetMinReplicasFromSpec(tt.spec)
			assert.Equal(t, tt.expected, result)
		})
	}
}
465+
466+
// ptrInt32 returns a pointer to a copy of v, for building test fixtures.
func ptrInt32(v int32) *int32 {
	p := v
	return &p
}
469+
470+
// ptrBool returns a pointer to a copy of v, for building test fixtures.
func ptrBool(v bool) *bool {
	p := v
	return &p
}

ray-operator/controllers/ray/rayjob_controller.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -613,6 +613,13 @@ func configureSubmitterContainer(container *corev1.Container, rayJobInstance *ra
613613
// ray job submit --address=http://$RAY_DASHBOARD_ADDRESS --submission-id=$RAY_JOB_SUBMISSION_ID ...
614614
container.Env = append(container.Env, corev1.EnvVar{Name: utils.RAY_DASHBOARD_ADDRESS, Value: rayJobInstance.Status.DashboardURL})
615615
container.Env = append(container.Env, corev1.EnvVar{Name: utils.RAY_JOB_SUBMISSION_ID, Value: rayJobInstance.Status.JobId})
616+
617+
// In SidecarMode, pass the expected minimum worker count so the submitter can wait for workers to register
618+
if submissionMode == rayv1.SidecarMode && rayJobInstance.Spec.RayClusterSpec != nil {
619+
minWorkers := common.GetMinReplicasFromSpec(rayJobInstance.Spec.RayClusterSpec)
620+
container.Env = append(container.Env, corev1.EnvVar{Name: utils.RAY_EXPECTED_MIN_WORKERS, Value: strconv.Itoa(int(minWorkers))})
621+
}
622+
616623
if rayClusterInstance != nil && utils.IsAuthEnabled(&rayClusterInstance.Spec) {
617624
common.SetContainerTokenAuthEnvVars(rayClusterInstance.Name, container)
618625
}

ray-operator/controllers/ray/utils/constant.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,8 +151,9 @@ const (
151151

152152
// Environment variables for RayJob submitter Kubernetes Job.
153153
// Example: ray job submit --address=http://$RAY_DASHBOARD_ADDRESS --submission-id=$RAY_JOB_SUBMISSION_ID ...
154-
RAY_DASHBOARD_ADDRESS = "RAY_DASHBOARD_ADDRESS"
155-
RAY_JOB_SUBMISSION_ID = "RAY_JOB_SUBMISSION_ID"
154+
RAY_DASHBOARD_ADDRESS = "RAY_DASHBOARD_ADDRESS"
155+
RAY_EXPECTED_MIN_WORKERS = "RAY_EXPECTED_MIN_WORKERS"
156+
RAY_JOB_SUBMISSION_ID = "RAY_JOB_SUBMISSION_ID"
156157

157158
// Environment variables for Ray Autoscaler V2.
158159
// The value of RAY_CLOUD_INSTANCE_ID is the Pod name for Autoscaler V2 alpha. This may change in the future.

0 commit comments

Comments
 (0)