[RayJob] Wait for workers before submitting jobs in sidecar mode #4429
marosset wants to merge 3 commits into ray-project:master
Conversation
Future-Outlier left a comment:
Hi, can you help me merge master?
rebased - thanks!
Future-Outlier left a comment:
cc @fsNick @troychiu @CheyuWu @machichima @400Ping to help review
- If you want the job to run on the Ray cluster's worker nodes, I think you can just set the head group's rayStartParams with num-cpus: "0" like this (you can try this using the master branch):

```yaml
rayClusterSpec:
  rayVersion: '2.52.0'
  headGroupSpec:
    rayStartParams:
      num-cpus: "0"
```

- I think making sidecar mode's behavior similar to k8s Job mode is the more correct behavior, so I'm willing to accept this PR.
```go
// In SidecarMode, pass the expected minimum worker count so the submitter can wait for workers to register
if submissionMode == rayv1.SidecarMode && rayJobInstance.Spec.RayClusterSpec != nil {
	minWorkers := common.GetMinReplicasFromSpec(rayJobInstance.Spec.RayClusterSpec)
```
minReplicas can be 0 or a low number. I wonder if we should only check minReplicas when autoscaling is enabled; if autoscaling is not enabled, we should just check replicas.
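A minimal sketch of that suggestion, gating on the EnableInTreeAutoscaling field of RayClusterSpec (the helper name and shape here are hypothetical, not code from this PR):

```go
// Hypothetical helper illustrating the suggestion above: use MinReplicas only
// when in-tree autoscaling is enabled; otherwise fall back to Replicas.
func expectedWorkersForGroup(spec *rayv1.RayClusterSpec, group rayv1.WorkerGroupSpec) int32 {
	autoscaling := spec.EnableInTreeAutoscaling != nil && *spec.EnableInTreeAutoscaling
	if autoscaling && group.MinReplicas != nil {
		return *group.MinReplicas * group.NumOfHosts
	}
	if group.Replicas != nil {
		return *group.Replicas * group.NumOfHosts
	}
	return 0
}
```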
I can change this and push as a separate commit.
I'll defer to others for what the best number of workers to wait/check for is
We can use this function: kuberay/ray-operator/controllers/ray/utils/util.go, lines 410 to 418 in 79b5c30.
RayJob in Kubernetes Job mode waits for the RayCluster to reach the Ready state, and the RayCluster's Ready state requires all pods to be running, including both the head pod and the worker pods. The number of worker pods is determined by CalculateDesiredReplicas.
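For reference, a rough sketch of what such a desired-replicas calculation looks like; the real CalculateDesiredReplicas in utils/util.go may differ in signature and details:

```go
// Sketch only: sums Replicas across non-suspended worker groups, scaled by
// NumOfHosts. See utils.CalculateDesiredReplicas for the real implementation.
func desiredReplicasSketch(spec *rayv1.RayClusterSpec) int32 {
	count := int32(0)
	for _, nodeGroup := range spec.WorkerGroupSpecs {
		if nodeGroup.Suspend != nil && *nodeGroup.Suspend {
			continue // suspended groups contribute no pods
		}
		if nodeGroup.Replicas != nil {
			count += *nodeGroup.Replicas * nodeGroup.NumOfHosts
		}
	}
	return count
}
```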
Future-Outlier left a comment:
> In SidecarMode, the submitter can submit jobs before workers are connected to the Ray cluster, causing jobs to run on the head node.

If the PR's goal is to wait for workers to start, you can use a config like this; then Ray's scheduler will not schedule jobs on the head node:

```yaml
rayClusterSpec:
  rayVersion: '2.52.0'
  headGroupSpec:
    rayStartParams:
      num-cpus: "0"
```

```go
// GetMinReplicasFromSpec calculates the minimum expected worker replicas from the RayClusterSpec.
// This is used in SidecarMode to determine how many workers should be registered before submitting the job.
func GetMinReplicasFromSpec(rayClusterSpec *rayv1.RayClusterSpec) int32 {
	if rayClusterSpec == nil {
		return 0
	}
	count := int32(0)
	for _, nodeGroup := range rayClusterSpec.WorkerGroupSpecs {
		if nodeGroup.Suspend != nil && *nodeGroup.Suspend {
			continue
		}
		minReplicas := int32(0)
		if nodeGroup.MinReplicas != nil && *nodeGroup.MinReplicas > 0 {
			minReplicas = *nodeGroup.MinReplicas
		} else if nodeGroup.Replicas != nil && *nodeGroup.Replicas > 0 {
			// Fall back to Replicas when MinReplicas is not set or is 0.
			// This handles static clusters where users only set Replicas.
			minReplicas = *nodeGroup.Replicas
		}
		count += minReplicas * nodeGroup.NumOfHosts
	}
	return count
}
```
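To make the MinReplicas-to-Replicas fallback concrete, a small illustrative call (the spec values are made up; assumes the rayv1 types, k8s.io/utils/ptr, and the common package import used elsewhere in the operator):

```go
// Illustrative only: how GetMinReplicasFromSpec evaluates a mixed spec.
spec := &rayv1.RayClusterSpec{
	WorkerGroupSpecs: []rayv1.WorkerGroupSpec{
		// MinReplicas set and > 0, so it wins over Replicas: contributes 1*1 = 1.
		{MinReplicas: ptr.To(int32(1)), Replicas: ptr.To(int32(3)), NumOfHosts: 1},
		// MinReplicas unset, so Replicas is used: 2 replicas x 2 hosts = 4.
		{Replicas: ptr.To(int32(2)), NumOfHosts: 2},
		// Suspended group: skipped entirely.
		{Replicas: ptr.To(int32(5)), NumOfHosts: 1, Suspend: ptr.To(true)},
	},
}
fmt.Println(common.GetMinReplicasFromSpec(spec)) // prints 5
```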
Can we use the function CalculateDesiredReplicas? (kuberay/ray-operator/controllers/ray/utils/util.go, lines 410 to 418 in 79b5c30)
```go
// Wait for the expected number of worker nodes to register with the Ray cluster.
// RAY_EXPECTED_MIN_WORKERS is set by the controller based on the MinReplicas in the RayClusterSpec.
// The loop queries the Ray Dashboard API to get the number of alive nodes and
// continues until the number of alive nodes equals (expected_workers + 1) to account for the head node.
// This ensures that worker pods are connected before the job is submitted; otherwise
// the jobs may run on the head node.
//
// Note: This loop will never time out and will wait indefinitely if workers never register.
// This can be mitigated by setting the RayJob's `activeDeadlineSeconds` field
// to enforce a maximum job execution time.
//
// The wget command includes the x-ray-authorization header if RAY_AUTH_TOKEN is set.
// This is required when Ray auth token mode is enabled; otherwise the request will fail with 401.
wgetAuthHeader := "${" + utils.RAY_AUTH_TOKEN_ENV_VAR + ":+--header \"x-ray-authorization: Bearer $" + utils.RAY_AUTH_TOKEN_ENV_VAR + "\"}"
waitForNodesLoop := []string{
	"if", "[", "-n", "\"$" + utils.RAY_EXPECTED_MIN_WORKERS + "\"", "]", "&&", "[", "\"$" + utils.RAY_EXPECTED_MIN_WORKERS + "\"", "-gt", "\"0\"", "]", ";", "then",
	"EXPECTED_NODES=$(($" + utils.RAY_EXPECTED_MIN_WORKERS + " + 1))", ";",
	"echo", strconv.Quote("Waiting for $EXPECTED_NODES nodes (1 head + $" + utils.RAY_EXPECTED_MIN_WORKERS + " workers) to register..."), ";",
	"until", "[",
	"\"$(wget " + wgetAuthHeader + " -q -O- " + address + "/nodes?view=summary 2>/dev/null | python3 -c \"import sys,json; d=json.load(sys.stdin); print(len([n for n in d.get('data',{}).get('summary',[]) if n.get('raylet',{}).get('state','')=='ALIVE']))\" 2>/dev/null || echo 0)\"",
	"-ge", "\"$EXPECTED_NODES\"", "]", ";",
	"do", "echo", strconv.Quote("Waiting for Ray nodes to register. Expected: $EXPECTED_NODES ..."), ";", "sleep", "2", ";", "done", ";",
	"echo", strconv.Quote("All expected nodes are registered."), ";",
	"fi", ";",
}
cmd = append(cmd, waitForNodesLoop...)
```
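Joined with spaces, the slice above renders to shell roughly like the following (illustrative only: it assumes the env-var constants render as RAY_EXPECTED_MIN_WORKERS and RAY_AUTH_TOKEN, and that address is the dashboard URL, e.g. http://localhost:8265):

```sh
if [ -n "$RAY_EXPECTED_MIN_WORKERS" ] && [ "$RAY_EXPECTED_MIN_WORKERS" -gt "0" ]; then
  EXPECTED_NODES=$(($RAY_EXPECTED_MIN_WORKERS + 1));
  echo "Waiting for $EXPECTED_NODES nodes (1 head + $RAY_EXPECTED_MIN_WORKERS workers) to register...";
  # Count ALIVE raylets via the dashboard API; fall back to 0 if the query fails.
  until [ "$(wget ${RAY_AUTH_TOKEN:+--header "x-ray-authorization: Bearer $RAY_AUTH_TOKEN"} -q -O- http://localhost:8265/nodes?view=summary 2>/dev/null | python3 -c "import sys,json; d=json.load(sys.stdin); print(len([n for n in d.get('data',{}).get('summary',[]) if n.get('raylet',{}).get('state','')=='ALIVE']))" 2>/dev/null || echo 0)" -ge "$EXPECTED_NODES" ]; do
    echo "Waiting for Ray nodes to register. Expected: $EXPECTED_NODES ...";
    sleep 2;
  done;
  echo "All expected nodes are registered.";
fi
```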
Can we use something like ray list nodes using Ray's CLI? We are removing wget.
@Future-Outlier - I pushed a commit that uses Python's urllib instead of wget. Is this OK instead of ray list nodes?
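A minimal sketch of what a wget-free probe could look like on the Go side; the variable name and exact structure here are hypothetical, and the actual commit may differ:

```go
// Hypothetical sketch: move the HTTP fetch into the same python3 -c invocation
// using urllib.request, so the submitter image no longer needs wget.
aliveNodesProbe := "python3 -c \"import json,os,urllib.request; " +
	"req=urllib.request.Request('" + address + "/nodes?view=summary'); " +
	"tok=os.environ.get('" + utils.RAY_AUTH_TOKEN_ENV_VAR + "'); " +
	"tok and req.add_header('x-ray-authorization','Bearer '+tok); " +
	"d=json.load(urllib.request.urlopen(req)); " +
	"print(len([n for n in d.get('data',{}).get('summary',[]) if n.get('raylet',{}).get('state','')=='ALIVE']))\" 2>/dev/null || echo 0"
```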
This PR is to address #4199, where it does look like the primary issue is making sure the jobs do not get scheduled to the head node. @Future-Outlier - Let me know if you think it would still be worthwhile to iterate on this PR or if simply specifying the num-cpus: "0" workaround would be enough.
Hi @marosset, I'm happy to accept this PR, since we want to make k8s Job mode and sidecar mode as similar as possible.
Great, I'll address the above feedback then!
Signed-off-by: Mark Rossett <marosset@microsoft.com>
…ecar sumitter jobs
Signed-off-by: Mark Rossett <marosset@microsoft.com>
Hi @marosset, do you mind fixing the CI error?

Why are these changes needed?
In SidecarMode, the submitter can submit jobs before workers are connected to the Ray cluster, causing jobs to run on the head node.
This fix adds a wait loop that polls the Ray Dashboard API to verify that minReplicas workers have joined the cluster before submitting the job. The minimum expected worker count is retrieved from the spec and passed via the RAY_EXPECTED_MIN_WORKERS env var.
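A sketch of the hand-off the description refers to (submitterContainer is a hypothetical name; the exact wiring in the PR may differ):

```go
// Sketch: expose the computed minimum worker count to the submitter sidecar
// via the RAY_EXPECTED_MIN_WORKERS environment variable.
minWorkers := common.GetMinReplicasFromSpec(rayJobInstance.Spec.RayClusterSpec)
submitterContainer.Env = append(submitterContainer.Env, corev1.EnvVar{
	Name:  utils.RAY_EXPECTED_MIN_WORKERS,
	Value: strconv.FormatInt(int64(minWorkers), 10),
})
```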
Related issue number
Fixes #4199
Checks