48 changes: 11 additions & 37 deletions release/ray_release/command_runner/anyscale_job_runner.py
@@ -18,8 +18,6 @@
JobBrokenError,
JobNoLogsError,
JobOutOfRetriesError,
JobTerminatedBeforeStartError,
JobTerminatedError,
LogsError,
PrepareCommandError,
PrepareCommandTimeout,
@@ -158,26 +156,15 @@ def wait_for_nodes(self, num_nodes: int, timeout: float = 900):
f"python wait_cluster.py {num_nodes} {timeout}", timeout=timeout + 30
)

def _handle_command_output(
self, job_status_code: int, error: str, raise_on_timeout: bool = True
):
if job_status_code == -2:

Early -1 check skips output analysis and has wrong message

Medium Severity

The check for job_state == -1 was moved from the end of _handle_command_output to the beginning, changing the error-handling behavior. In the original code, even when the job returned -1 (OUT_OF_RETRIES), the function would still fetch output.json and check for specific errors like PrepareCommandError or TestCommandError, only raising JobOutOfRetriesError as a fallback. Now it raises immediately without analyzing output. Additionally, the error message says "command has not been ran" but -1 is only returned when job_running == True (the job did start), making the message inaccurate.
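A minimal sketch of one way to keep the original ordering, with output analysis first and the -1 state only as a fallback; the fallback message wording here is an assumption, not code from this PR:

def _handle_command_output(self, job_state: int, raise_on_timeout: bool = True):
    if job_state == -2:
        raise JobBrokenError("Job state is 'UNKNOWN'.")

    # ... fetch output.json / last logs and raise the more specific
    # PrepareCommandError / TestCommandError exceptions first, as below ...

    # Only fall back to the generic failure when nothing more specific was
    # found; -1 is only returned after the job actually started running.
    if job_state == -1:
        raise JobOutOfRetriesError(
            "Job returned non-success state: 'FAILED' "
            "(no more specific error could be derived from the job output)."
        )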


raise JobBrokenError(f"Job state is 'BROKEN' with error:\n{error}\n")

if job_status_code == -3:
raise JobTerminatedError(
"Job entered 'TERMINATED' state (it was terminated "
"manually or Ray was stopped):"
f"\n{error}\n"
def _handle_command_output(self, job_state: int, raise_on_timeout: bool = True):
if job_state == -1:
raise JobOutOfRetriesError(
"Job returned non-success state: 'FAILED' "
"(command has not been ran or no logs could have been obtained)."
)

if job_status_code == -4:
raise JobTerminatedBeforeStartError(
"Job entered 'TERMINATED' state before it started "
"(most likely due to inability to provision required nodes; "
"otherwise it was terminated manually or Ray was stopped):"
f"\n{error}\n"
)
if job_state == -2:
raise JobBrokenError("Job state is 'UNKNOWN'.")
Comment on lines +166 to +167

medium

The migration to the new SDK seems to have removed the detailed error message that was previously included in this exception. To maintain debuggability, could you please fetch and include the job logs in the exception message? This will be very helpful for diagnosing failures.

Suggested change
if job_state == -2:
raise JobBrokenError("Job state is 'UNKNOWN'.")
if job_state == -2:
logs = self.get_last_logs() or "Could not retrieve logs."
raise JobBrokenError(f"Job state is 'UNKNOWN'.\n--- Logs ---\n{logs}")


Unhandled return code -4 for soft infra errors

High Severity

The _handle_command_output function only handles return codes -1 and -2, but run_and_wait can still return -4 for soft infrastructure errors (when JobState.FAILED occurs before the job starts running). The old handling for -4 that raised JobTerminatedBeforeStartError was removed, but the code path that returns -4 still exists. This causes jobs that fail during cluster provisioning to fall through to output fetching, eventually raising an incorrect JobNoLogsError instead of the appropriate infrastructure failure exception.
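One possible shape for a restored -4 branch, placed before the output/log fetching below; this assumes JobTerminatedBeforeStartError is re-added to the exception imports that this diff removes, and the message wording is only indicative:

def _handle_command_output(self, job_state: int, raise_on_timeout: bool = True):
    if job_state == -4:
        # Soft infra error: the job reached FAILED before it ever started
        # running, so there is no output.json or log worth analyzing.
        raise JobTerminatedBeforeStartError(
            "Job failed before it started (most likely the required "
            "nodes could not be provisioned)."
        )
    # ... the existing -1 / -2 checks and output analysis continue here ...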


# First try to obtain the output.json from S3.
# If that fails, try logs.
@@ -214,8 +201,7 @@ def _handle_command_output(
)
raise PrepareCommandError(
f"Prepare command '{self.prepare_commands[-1]}' returned "
f"non-success status: {prepare_return_codes[-1]} with error:"
f"\n{error}\n"
f"non-success status: {prepare_return_codes[-1]}."
)
else:
raise JobNoLogsError("Could not obtain logs for the job.")
@@ -231,15 +217,7 @@ def _handle_command_output(

if workload_status_code is not None and workload_status_code != 0:
raise TestCommandError(
f"Command returned non-success status: {workload_status_code} with "
f"error:\n{error}\n"
)

if job_status_code == -1:
raise JobOutOfRetriesError(
"Job returned non-success state: 'OUT_OF_RETRIES' "
"(command has not been ran or no logs could have been obtained) "
f"with error:\n{error}\n"
f"Command returned non-success status: {workload_status_code}."
)

def _get_full_command_env(self, env: Optional[Dict[str, str]] = None):
@@ -348,18 +326,14 @@ def run_command(
working_dir = azure_file_path
logger.info(f"Working dir uploaded to {working_dir}")

job_status_code, time_taken = self.job_manager.run_and_wait(
job_state, time_taken = self.job_manager.run_and_wait(
full_command,
full_env,
working_dir=working_dir,
upload_path=self.upload_path,
timeout=int(timeout),
)
error_message = self.job_manager.job_error_message()

self._handle_command_output(
job_status_code, error_message, raise_on_timeout=raise_on_timeout
)
self._handle_command_output(job_state, raise_on_timeout=raise_on_timeout)

return time_taken

46 changes: 22 additions & 24 deletions release/ray_release/job_manager/anyscale_job_manager.py
@@ -3,10 +3,10 @@
from typing import Any, Dict, Optional, Tuple

import anyscale
from anyscale.job.models import JobState
from anyscale.sdk.anyscale_client.models import (
CreateProductionJob,
CreateProductionJobConfig,
HaJobStates,
)

from ray_release.anyscale_util import LAST_LOGS_LENGTH
@@ -25,10 +25,9 @@
)

job_status_to_return_code = {
HaJobStates.SUCCESS: 0,
HaJobStates.OUT_OF_RETRIES: -1,
HaJobStates.BROKEN: -2,
HaJobStates.TERMINATED: -3,
JobState.SUCCEEDED: 0,
JobState.FAILED: -1,
JobState.UNKNOWN: -2,
}
terminal_state = set(job_status_to_return_code.keys())

@@ -90,15 +89,17 @@ def _run_job(
f"{e}"
) from e

self.save_last_job_result(job_response.result)
self._job_id = job_response.result.id
self._last_job_result = None
self.start_time = time.time()

logger.info(f"Link to job: " f"{format_link(self.job_url())}")
return

def save_last_job_result(self, value):
self._last_job_result = value
self._job_id = value.id if value else None
def save_last_job_status(self, status):
if status and hasattr(status, "id") and status.id != self._job_id:
logger.warning(f"Job ID mismatch: expected {self._job_id}, got {status.id}")
self._last_job_result = status

def job_id(self) -> Optional[str]:
return self._job_id
@@ -108,15 +109,10 @@ def job_url(self) -> Optional[str]:
return None
return anyscale_job_url(self._job_id)

def _last_job_status(self) -> Optional[HaJobStates]:
def _last_job_status(self) -> Optional["JobState"]:
if not self._last_job_result:
return None
return self._last_job_result.state.current_state

def job_error_message(self) -> str:
if self._last_job_result is None:
return ""
return self._last_job_result.state.error
return self._last_job_result.state

def _in_progress(self) -> bool:
if not self._last_job_result:
@@ -125,14 +121,16 @@ def _in_progress(self) -> bool:

def _get_job_status_with_retry(self):
return exponential_backoff_retry(
lambda: self._sdk.get_production_job(self._job_id),
lambda: anyscale.job.status(id=self._job_id),
retry_exceptions=Exception,
initial_retry_delay_s=1,
max_retries=3,
).result
)

def _terminate_job(self, raise_exceptions: bool = False):
if not self._in_progress():
if not self._job_id:
return
if self._last_job_result is not None and not self._in_progress():
return
logger.info(f"Terminating job {self._job_id}...")
try:
@@ -202,12 +200,12 @@ def _wait_job(self, timeout: int):
next_status += 30

result = self._get_job_status_with_retry()
self.save_last_job_result(result)
self.save_last_job_status(result)
status = self._last_job_status()

if not job_running and status in {
HaJobStates.RUNNING,
HaJobStates.ERRORED,
JobState.STARTING,
JobState.RUNNING,
}:
logger.info(
f"... job started ...({int(now - start_time)} seconds) ..."
@@ -223,10 +221,10 @@
time.sleep(1)

result = self._get_job_status_with_retry()
self.save_last_job_result(result)
self.save_last_job_status(result)
status = self._last_job_status()
assert status in terminal_state
if status == HaJobStates.TERMINATED and not job_running:

Return code -4 generated but never handled

High Severity

_wait_job produces retcode = -4 when JobState.FAILED occurs before the job starts running, but _handle_command_output only handles -1 and -2. The old code raised JobTerminatedBeforeStartError (with retryable CLUSTER_STARTUP_TIMEOUT exit code) for -4. Now the unhandled -4 falls through to output/log collection, which will likely fail for a job that never started, resulting in a JobNoLogsError with a non-retryable exit code — misclassifying a retryable infrastructure failure.
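A hypothetical regression-test sketch for this case (the anyscale_job_runner fixture and the exception import path are assumptions, and it presumes _handle_command_output gains a -4 branch as suggested above):

import pytest

from ray_release.exception import JobTerminatedBeforeStartError  # import path assumed


def test_soft_infra_failure_is_classified(anyscale_job_runner):
    # -4 is returned when JobState.FAILED is reached before the job ever
    # started running; it should surface as an infrastructure failure,
    # not fall through to log collection and an incorrect JobNoLogsError.
    with pytest.raises(JobTerminatedBeforeStartError):
        anyscale_job_runner._handle_command_output(-4)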


if status == JobState.FAILED and not job_running:
# Soft infra error
retcode = -4
else:
10 changes: 6 additions & 4 deletions release/ray_release/tests/test_anyscale_job_manager.py
@@ -1,16 +1,17 @@
import sys

import pytest
from anyscale.job.models import JobState

from ray_release.anyscale_util import Anyscale
from ray_release.cluster_manager.cluster_manager import ClusterManager
from ray_release.job_manager.anyscale_job_manager import AnyscaleJobManager
from ray_release.test import Test


class FakeJobResult:
def __init__(self, _id: str):
self.id = _id
class FakeJobStatus:
def __init__(self, state: JobState):
self.state = state


class FakeSDK(Anyscale):
@@ -31,7 +32,8 @@ def test_get_last_logs_long_running_job():
)
anyscale_job_manager = AnyscaleJobManager(cluster_manager=cluster_manager)
anyscale_job_manager._duration = 4 * 3_600 + 1
anyscale_job_manager.save_last_job_result(FakeJobResult(_id="foo"))
anyscale_job_manager._job_id = "foo"
anyscale_job_manager.save_last_job_status(FakeJobStatus(state=JobState.SUCCEEDED))
assert anyscale_job_manager.get_last_logs() is None

