14 changes: 11 additions & 3 deletions clients/python-client/python_client_test/test_job_api.py
@@ -510,7 +510,15 @@ def test_list_jobs(self):
)
self.assertIsNotNone(status, f"Job {job_info['name']} status should be available")

result = self.api.list_jobs(k8s_namespace=namespace)
# Retry list to allow for eventual consistency (list may lag behind creates)
expected_min_count = initial_count + len(test_jobs)
result = None
for _ in range(10):
result = self.api.list_jobs(k8s_namespace=namespace)
if result and len(result.get("items", [])) >= expected_min_count:
break
time.sleep(2)

self.assertIsNotNone(result, "List jobs should return a result")
self.assertIn("items", result, "Result should contain 'items' field")

@@ -519,8 +527,8 @@ def test_list_jobs(self):

self.assertGreaterEqual(
current_count,
initial_count + len(test_jobs),
f"Should have at least {len(test_jobs)} more jobs than initially"
expected_min_count,
f"Should have at least {len(test_jobs)} more jobs than initially (got {current_count}, need {expected_min_count})"
)

job_names_in_list = [item.get("metadata", {}).get("name") for item in items]
20 changes: 17 additions & 3 deletions ray-operator/controllers/ray/common/job.go
@@ -126,13 +126,27 @@ func BuildJobSubmitCommand(rayJobInstance *rayv1.RayJob, submissionMode rayv1.Jo

if submissionMode == rayv1.SidecarMode {
// Wait until Ray Dashboard GCS is healthy before proceeding.
// Use the same Ray Dashboard GCS health check command as the readiness probe
// Use wget for Ray < 2.53 (older images), and Python when wget may be unavailable (e.g. slim images).
rayDashboardGCSHealthCommand := fmt.Sprintf(
utils.BaseWgetHealthCommand,
utils.DefaultReadinessProbeFailureThreshold,
utils.BasePythonHealthCommand,
port,
utils.RayDashboardGCSHealthPath,
utils.DefaultReadinessProbeFailureThreshold,
)
if rayJobInstance.Spec.RayClusterSpec != nil {
if v, err := semver.NewVersion(rayJobInstance.Spec.RayClusterSpec.RayVersion); err == nil {
// Ray 2.53.0 introduced a unified HTTP health endpoint; slim images without wget exist for newer versions.
minVersion := semver.MustParse("2.53.0")
if v.LessThan(minVersion) {
rayDashboardGCSHealthCommand = fmt.Sprintf(
utils.BaseWgetHealthCommand,
utils.DefaultReadinessProbeFailureThreshold,
port,
utils.RayDashboardGCSHealthPath,
)
}
}
}

waitLoop := []string{
"until", rayDashboardGCSHealthCommand, ">/dev/null", "2>&1", ";",
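For context, a minimal runnable sketch of the version gate above, assuming the operator's `github.com/Masterminds/semver/v3` dependency (`NewVersion`, `MustParse`, and `LessThan` are the calls the diff uses); the helper name `usesWget` and the sample version strings are illustrative, not part of the PR:

```go
package main

import (
	"fmt"

	"github.com/Masterminds/semver/v3"
)

// usesWget mirrors the gate above: only a RayVersion that parses and sits
// below 2.53.0 falls back to the wget health command. Unparsable versions
// (e.g. "nightly") keep the python3 default, matching the diff, which only
// switches when semver.NewVersion succeeds.
func usesWget(rayVersion string) bool {
	v, err := semver.NewVersion(rayVersion)
	if err != nil {
		return false
	}
	return v.LessThan(semver.MustParse("2.53.0"))
}

func main() {
	for _, ver := range []string{"2.41.0", "2.53.0", "nightly"} {
		fmt.Printf("%s -> wget fallback: %v\n", ver, usesWget(ver))
	}
}
```

Defaulting to python3 on parse failure seems like the safer direction: a custom or nightly tag is more likely to be a recent image, where wget may be missing but a Python interpreter is present.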
59 changes: 31 additions & 28 deletions ray-operator/controllers/ray/common/pod.go
@@ -443,27 +443,33 @@ func initLivenessAndReadinessProbe(rayContainer *corev1.Container, rayNodeType r
},
}

rayAgentRayletHealthCommand := fmt.Sprintf(
utils.BaseWgetHealthCommand,
utils.DefaultReadinessProbeTimeoutSeconds,
getPort("dashboard-agent-listen-port", utils.DefaultDashboardAgentListenPort),
utils.RayAgentRayletHealthPath,
)
rayDashboardGCSHealthCommand := fmt.Sprintf(
utils.BaseWgetHealthCommand,
utils.DefaultReadinessProbeFailureThreshold,
getPort("dashboard-port", utils.DefaultDashboardPort),
utils.RayDashboardGCSHealthPath,
)

// For Ray < 2.53, liveness/readiness use exec probes (bash) and rely on CLI tools.
// Generally, the liveness and readiness probes perform the same checks.
// For head node => Check GCS and Raylet status.
// For worker node => Check Raylet status.
commands := []string{}
if rayNodeType == rayv1.HeadNode {
commands = append(commands, rayAgentRayletHealthCommand, rayDashboardGCSHealthCommand)
} else {
commands = append(commands, rayAgentRayletHealthCommand)
if !httpHealthCheck {
dashboardAgentPort := getPort("dashboard-agent-listen-port", utils.DefaultDashboardAgentListenPort)
dashboardPort := getPort("dashboard-port", utils.DefaultDashboardPort)

rayAgentRayletHealthCommand := fmt.Sprintf(
utils.BaseWgetHealthCommand,
utils.DefaultReadinessProbeTimeoutSeconds,
dashboardAgentPort,
utils.RayAgentRayletHealthPath,
)
rayDashboardGCSHealthCommand := fmt.Sprintf(
utils.BaseWgetHealthCommand,
utils.DefaultReadinessProbeFailureThreshold,
dashboardPort,
utils.RayDashboardGCSHealthPath,
)

if rayNodeType == rayv1.HeadNode {
commands = append(commands, rayAgentRayletHealthCommand, rayDashboardGCSHealthCommand)
} else {
commands = append(commands, rayAgentRayletHealthCommand)
}
}

if rayContainer.LivenessProbe == nil {
@@ -504,20 +510,17 @@ func initLivenessAndReadinessProbe(rayContainer *corev1.Container, rayNodeType r
rayContainer.ReadinessProbe.Exec = &corev1.ExecAction{Command: []string{"bash", "-c", strings.Join(commands, " && ")}}
}

// For worker Pods serving traffic, we need to add an additional HTTP proxy health check for the readiness probe.
// Note: head Pod checks the HTTP proxy's health at every rayservice controller reconcile instaed of using readiness probe.
// For worker Pods serving traffic, readiness checks Ray Serve proxy health only (liveness covers node health).
// Note: head Pod checks the HTTP proxy's health at every rayservice controller reconcile instead of using readiness probe.
// See https://github.com/ray-project/kuberay/pull/1808 for reasons.
if creatorCRDType == utils.RayServiceCRD && rayNodeType == rayv1.WorkerNode {
rayContainer.ReadinessProbe.FailureThreshold = utils.ServeReadinessProbeFailureThreshold
rayServeProxyHealthCommand := fmt.Sprintf(
utils.BaseWgetHealthCommand,
utils.DefaultReadinessProbeInitialDelaySeconds,
utils.FindContainerPort(rayContainer, utils.ServingPortName, utils.DefaultServingPort),
utils.RayServeProxyHealthPath,
)
commands = append(commands, rayServeProxyHealthCommand)
rayContainer.ReadinessProbe.HTTPGet = nil
rayContainer.ReadinessProbe.Exec = &corev1.ExecAction{Command: []string{"bash", "-c", strings.Join(commands, " && ")}}
servingPort := utils.FindContainerPort(rayContainer, utils.ServingPortName, utils.DefaultServingPort)
rayContainer.ReadinessProbe.HTTPGet = &corev1.HTTPGetAction{
Path: "/" + utils.RayServeProxyHealthPath,
Port: intstr.IntOrString{Type: intstr.Int, IntVal: servingPort},
}
rayContainer.ReadinessProbe.Exec = nil
}
}
}
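A minimal sketch of the readiness probe a RayService worker ends up with after this change, assuming the default serving port 8000 and the `-/healthz` proxy path from constant.go; thresholds and the liveness side are elided:

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/intstr"
)

func main() {
	// The kubelet issues this GET itself, so the container no longer needs
	// wget or python3 for Serve readiness. Port 8000 is the assumed default
	// serving port; the real code resolves it via utils.FindContainerPort.
	readiness := &corev1.Probe{
		ProbeHandler: corev1.ProbeHandler{
			HTTPGet: &corev1.HTTPGetAction{
				Path: "/-/healthz",
				Port: intstr.IntOrString{Type: intstr.Int, IntVal: 8000},
			},
		},
		// FailureThreshold comes from utils.ServeReadinessProbeFailureThreshold
		// in the real code; its value is not assumed here.
	}
	fmt.Printf("readiness: GET localhost:%d%s\n", readiness.HTTPGet.Port.IntVal, readiness.HTTPGet.Path)
}
```

Splitting the concerns this way means a wedged Serve proxy flips only readiness (the Pod drops out of Service endpoints), while the liveness probe alone decides restarts.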
35 changes: 21 additions & 14 deletions ray-operator/controllers/ray/common/pod_test.go
@@ -1693,16 +1693,18 @@ func TestInitLivenessAndReadinessProbe(t *testing.T) {
assert.Nil(t, rayContainer.LivenessProbe.Exec)
assert.Nil(t, rayContainer.ReadinessProbe.Exec)

// Test 2: User does not define a custom probe. KubeRay will inject Exec probe for worker pod.
// Here we test the case where the Ray Pod originates from RayServiceCRD,
// implying that an additional serve health check will be added to the readiness probe.
// Test 2: User does not define a custom probe. RayService worker: liveness = exec (node health), readiness = HTTPGet /-/healthz (Serve proxy).
rayContainer.LivenessProbe = nil
rayContainer.ReadinessProbe = nil
initLivenessAndReadinessProbe(rayContainer, rayv1.WorkerNode, utils.RayServiceCRD, rayStartParams, "")
assert.NotNil(t, rayContainer.LivenessProbe.Exec)
assert.NotNil(t, rayContainer.ReadinessProbe.Exec)
assert.NotContains(t, strings.Join(rayContainer.LivenessProbe.Exec.Command, " "), utils.RayServeProxyHealthPath)
assert.Contains(t, strings.Join(rayContainer.ReadinessProbe.Exec.Command, " "), utils.RayServeProxyHealthPath)
livenessCmd := strings.Join(rayContainer.LivenessProbe.Exec.Command, " ")
assert.Contains(t, livenessCmd, "wget", "exec probe should use wget for Ray < 2.53")
assert.NotContains(t, livenessCmd, "python3", "exec probe should not require Python for Ray < 2.53")
assert.NotContains(t, livenessCmd, utils.RayServeProxyHealthPath)
assert.NotNil(t, rayContainer.ReadinessProbe.HTTPGet, "RayService worker readiness should use HTTP probe for Serve proxy")
assert.Nil(t, rayContainer.ReadinessProbe.Exec)
assert.Equal(t, "/"+utils.RayServeProxyHealthPath, rayContainer.ReadinessProbe.HTTPGet.Path)
assert.Equal(t, int32(2), rayContainer.LivenessProbe.TimeoutSeconds)
assert.Equal(t, int32(2), rayContainer.ReadinessProbe.TimeoutSeconds)

@@ -1734,6 +1736,8 @@ func TestInitLivenessAndReadinessProbe(t *testing.T) {
livenessCommand := strings.Join(rayContainer.LivenessProbe.Exec.Command, " ")
readinessCommand := strings.Join(rayContainer.ReadinessProbe.Exec.Command, " ")

assert.Contains(t, livenessCommand, "wget", "exec probe should use wget for Ray < 2.53")
assert.NotContains(t, livenessCommand, "python3")
assert.Contains(t, livenessCommand, ":8266", "Head pod liveness probe should use custom dashboard-agent-listen-port")
assert.Contains(t, livenessCommand, ":8365", "Head pod liveness probe should use custom dashboard-port")
assert.Contains(t, readinessCommand, ":8266", "Head pod readiness probe should use custom dashboard-agent-listen-port")
@@ -1757,7 +1761,7 @@ func TestInitLivenessAndReadinessProbe(t *testing.T) {
assert.NotContains(t, workerLivenessCommand, fmt.Sprintf(":%d", utils.DefaultDashboardPort), "Worker pod should not check dashboard-port")
assert.NotContains(t, workerReadinessCommand, fmt.Sprintf(":%d", utils.DefaultDashboardPort), "Worker pod should not check dashboard-port")

// Test 6: Test RayService worker with custom ports and serve proxy health check
// Test 6: Test RayService worker with custom dashboard port and serve proxy health check (readiness = HTTPGet /-/healthz).
rayContainer.LivenessProbe = nil
rayContainer.ReadinessProbe = nil
rayContainer.Ports = []corev1.ContainerPort{
@@ -1770,9 +1774,10 @@ func TestInitLivenessAndReadinessProbe(t *testing.T) {
"dashboard-agent-listen-port": "8500",
}
initLivenessAndReadinessProbe(rayContainer, rayv1.WorkerNode, utils.RayServiceCRD, rayServiceWorkerParams, "")
rayServiceReadinessCommand := strings.Join(rayContainer.ReadinessProbe.Exec.Command, " ")
assert.Contains(t, rayServiceReadinessCommand, ":8500", "RayService worker should use custom dashboard-agent-listen-port")
assert.Contains(t, rayServiceReadinessCommand, utils.RayServeProxyHealthPath, "RayService worker should include serve proxy health check")
assert.NotNil(t, rayContainer.ReadinessProbe.HTTPGet, "RayService worker readiness should use HTTP probe for Serve proxy")
assert.Nil(t, rayContainer.ReadinessProbe.Exec)
assert.Equal(t, "/"+utils.RayServeProxyHealthPath, rayContainer.ReadinessProbe.HTTPGet.Path)
assert.Equal(t, int32(utils.DefaultServingPort), rayContainer.ReadinessProbe.HTTPGet.Port.IntVal)
assert.Equal(t, int32(utils.ServeReadinessProbeFailureThreshold), rayContainer.ReadinessProbe.FailureThreshold, "RayService worker should have correct failure threshold")

// Test 8: Test invalid port values (should fall back to defaults)
@@ -1786,6 +1791,8 @@ func TestInitLivenessAndReadinessProbe(t *testing.T) {

invalidPortLivenessCommand := strings.Join(rayContainer.LivenessProbe.Exec.Command, " ")

assert.Contains(t, invalidPortLivenessCommand, "wget", "exec probe should use wget for Ray < 2.53")
assert.NotContains(t, invalidPortLivenessCommand, "python3")
// Should fall back to default ports when invalid values are provided
assert.Contains(t, invalidPortLivenessCommand, fmt.Sprintf(":%d", utils.DefaultDashboardAgentListenPort), "Should fall back to default dashboard-agent-listen-port for invalid input")
assert.Contains(t, invalidPortLivenessCommand, fmt.Sprintf(":%d", utils.DefaultDashboardPort), "Should fall back to default dashboard-port for invalid input")
@@ -1811,15 +1818,15 @@ func TestInitLivenessAndReadinessProbe(t *testing.T) {
assert.NotNil(t, rayContainer.LivenessProbe.HTTPGet)
assert.NotNil(t, rayContainer.ReadinessProbe.HTTPGet)

// Ray Serve workers still use exec probes for readiness to check the proxy actor.
// Ray Serve workers: liveness = HTTPGet /api/healthz (node), readiness = HTTPGet /-/healthz (Serve proxy).
rayContainer.LivenessProbe = nil
rayContainer.ReadinessProbe = nil
initLivenessAndReadinessProbe(rayContainer, rayv1.WorkerNode, utils.RayServiceCRD, rayStartParams, "2.53.0")
assert.NotNil(t, rayContainer.LivenessProbe.HTTPGet)
assert.Nil(t, rayContainer.LivenessProbe.Exec)
assert.Nil(t, rayContainer.ReadinessProbe.HTTPGet)
assert.NotNil(t, rayContainer.ReadinessProbe.Exec)
assert.Contains(t, strings.Join(rayContainer.ReadinessProbe.Exec.Command, " "), utils.RayServeProxyHealthPath)
assert.NotNil(t, rayContainer.ReadinessProbe.HTTPGet, "RayService worker readiness should use HTTP probe for Serve proxy")
assert.Nil(t, rayContainer.ReadinessProbe.Exec)
assert.Equal(t, "/"+utils.RayServeProxyHealthPath, rayContainer.ReadinessProbe.HTTPGet.Path)

// Versions parsed below 2.53 must use exec probes.
rayContainer.LivenessProbe = nil
9 changes: 7 additions & 2 deletions ray-operator/controllers/ray/utils/constant.go
@@ -244,8 +244,13 @@ const (
RayAgentRayletHealthPath = "api/local_raylet_healthz"
RayDashboardGCSHealthPath = "api/gcs_healthz"
RayServeProxyHealthPath = "-/healthz"
BaseWgetHealthCommand = "wget --tries 1 -T %d -q -O- http://localhost:%d/%s | grep success"
RayNodeHealthPath = "/api/healthz"
// BaseWgetHealthCommand checks a single health URL; args: timeout_sec, port, path (no leading slash).
// This is used for Ray versions that rely on exec probes and assume common CLI tools exist in the image.
BaseWgetHealthCommand = `wget -q -T %d -O- http://localhost:%d/%s | grep -q success`
// BasePythonHealthCommand checks a single health URL; args: port, path (no leading slash), timeout_sec.
// This is used when wget is not available (e.g. slim Ray images).
BasePythonHealthCommand = `python3 -c "import urllib.request; r=urllib.request.urlopen('http://localhost:%d/%s', timeout=%d); exit(0 if b'success' in r.read() else 1)"`
RayNodeHealthPath = "/api/healthz"

// Finalizers for RayJob
RayJobStopJobFinalizer = "ray.io/rayjob-finalizer"
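To make the argument order concrete, a small sketch that renders both templates with assumed values (dashboard port 8265, the GCS health path, a 5-second timeout) and prints the shell strings an exec probe or the sidecar wait loop would run; the format strings are copied from the constants above, the values are illustrative:

```go
package main

import "fmt"

const (
	baseWgetHealthCommand     = `wget -q -T %d -O- http://localhost:%d/%s | grep -q success`
	basePythonHealthCommand   = `python3 -c "import urllib.request; r=urllib.request.urlopen('http://localhost:%d/%s', timeout=%d); exit(0 if b'success' in r.read() else 1)"`
	rayDashboardGCSHealthPath = "api/gcs_healthz"
)

func main() {
	port, timeout := 8265, 5 // assumed dashboard port and timeout, for illustration
	// Ray < 2.53: the wget template takes (timeout, port, path).
	fmt.Println(fmt.Sprintf(baseWgetHealthCommand, timeout, port, rayDashboardGCSHealthPath))
	// Slim images without wget: the python template takes (port, path, timeout).
	fmt.Println(fmt.Sprintf(basePythonHealthCommand, port, rayDashboardGCSHealthPath, timeout))
}
```

Note that the two templates take their timeout in different positions, which is why BuildJobSubmitCommand in job.go reorders the Sprintf arguments when it switches between them.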