Skip to content
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion release/ray_release/anyscale_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ def create_cluster_env_from_image(
config_json=dict(
docker_image=image,
ray_version="nightly",
env_vars=runtime_env,
),
)
)
Expand Down
5 changes: 1 addition & 4 deletions release/ray_release/cluster_manager/cluster_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,7 @@ def set_cluster_env(self):
.replace(":", "_")
.replace(".", "_")
)
self.cluster_env_name = (
f"{byod_image_name_normalized}"
f"__env__{dict_hash(self.test.get_byod_runtime_env())}"
)
self.cluster_env_name = byod_image_name_normalized

def set_cluster_compute(
self,
Expand Down
48 changes: 11 additions & 37 deletions release/ray_release/command_runner/anyscale_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
JobBrokenError,
JobNoLogsError,
JobOutOfRetriesError,
JobTerminatedBeforeStartError,
JobTerminatedError,
LogsError,
PrepareCommandError,
PrepareCommandTimeout,
Expand Down Expand Up @@ -158,26 +156,15 @@ def wait_for_nodes(self, num_nodes: int, timeout: float = 900):
f"python wait_cluster.py {num_nodes} {timeout}", timeout=timeout + 30
)

def _handle_command_output(
self, job_status_code: int, error: str, raise_on_timeout: bool = True
):
if job_status_code == -2:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Early FAILED check skips output parsing for running jobs

Medium Severity

When job_state == -1 (a FAILED job that was already running), _handle_command_output immediately raises JobOutOfRetriesError without attempting to parse output. The old code checked this condition last, after parsing output, so it could raise more specific errors like TestCommandError or PrepareCommandError. The error message also incorrectly says "command has not been ran" even though the command did run (since -1 is only returned when job_running was True).

Fix in Cursor Fix in Web

raise JobBrokenError(f"Job state is 'BROKEN' with error:\n{error}\n")

if job_status_code == -3:
raise JobTerminatedError(
"Job entered 'TERMINATED' state (it was terminated "
"manually or Ray was stopped):"
f"\n{error}\n"
def _handle_command_output(self, job_state: int, raise_on_timeout: bool = True):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The refactoring to use the new SDK has removed the detailed error messages from the exceptions raised on job failure. For example, JobBrokenError, PrepareCommandError, and TestCommandError are now raised with generic messages. Previously, they included a specific error message from the job, which is very helpful for debugging.

Please consider re-introducing these error details. The new Anyscale SDK's JobStatus object likely contains an error message (e.g., in a message attribute). This could be exposed via the AnyscaleJobManager and passed to this function to be included in the exceptions.

if job_state == -1:
raise JobOutOfRetriesError(
"Job returned non-success state: 'FAILED' "
"(command has not been ran or no logs could have been obtained)."
)
Comment on lines +160 to 164
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The _handle_command_output function is missing a check for job_state == -4. This state is returned by AnyscaleJobManager for jobs that fail before starting (a "soft infra error"). Without handling this case, a real failure could be missed. The previous implementation correctly raised a JobTerminatedBeforeStartError.

Please add a check for job_state == -4. You will also need to re-import JobTerminatedBeforeStartError at the top of the file.

        if job_state == -4:
            raise JobTerminatedBeforeStartError(
                "Job entered 'TERMINATED' state before it started "
                "(most likely due to inability to provision required nodes; "
                "otherwise it was terminated manually or Ray was stopped)."
            )

        if job_state == -1:
            raise JobOutOfRetriesError(
                "Job returned non-success state: 'FAILED' "
                "(command has not been ran or no logs could have been obtained)."
            )


if job_status_code == -4:
raise JobTerminatedBeforeStartError(
"Job entered 'TERMINATED' state before it started "
"(most likely due to inability to provision required nodes; "
"otherwise it was terminated manually or Ray was stopped):"
f"\n{error}\n"
)
if job_state == -2:
raise JobBrokenError("Job state is 'UNKNOWN'.")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Return code -4 unhandled in command output handler

High Severity

_wait_job can return retcode -4 when JobState.FAILED occurs before the job starts running, but _handle_command_output only handles -1 and -2. The old code raised JobTerminatedBeforeStartError (with ExitCode.CLUSTER_STARTUP_TIMEOUT, a retryable infra error) for -4. Now the code falls through to output parsing, which will fail since the job never ran, ultimately raising JobNoLogsError with ExitCode.ANYSCALE_ERROR — a different, non-retryable error category.

Additional Locations (1)

Fix in Cursor Fix in Web


# First try to obtain the output.json from S3.
# If that fails, try logs.
Expand Down Expand Up @@ -214,8 +201,7 @@ def _handle_command_output(
)
raise PrepareCommandError(
f"Prepare command '{self.prepare_commands[-1]}' returned "
f"non-success status: {prepare_return_codes[-1]} with error:"
f"\n{error}\n"
f"non-success status: {prepare_return_codes[-1]}."
)
else:
raise JobNoLogsError("Could not obtain logs for the job.")
Expand All @@ -231,15 +217,7 @@ def _handle_command_output(

if workload_status_code is not None and workload_status_code != 0:
raise TestCommandError(
f"Command returned non-success status: {workload_status_code} with "
f"error:\n{error}\n"
)

if job_status_code == -1:
raise JobOutOfRetriesError(
"Job returned non-success state: 'OUT_OF_RETRIES' "
"(command has not been ran or no logs could have been obtained) "
f"with error:\n{error}\n"
f"Command returned non-success status: {workload_status_code}."
)

def _get_full_command_env(self, env: Optional[Dict[str, str]] = None):
Expand Down Expand Up @@ -348,18 +326,14 @@ def run_command(
working_dir = azure_file_path
logger.info(f"Working dir uploaded to {working_dir}")

job_status_code, time_taken = self.job_manager.run_and_wait(
job_state, time_taken = self.job_manager.run_and_wait(
full_command,
full_env,
working_dir=working_dir,
upload_path=self.upload_path,
timeout=int(timeout),
)
error_message = self.job_manager.job_error_message()

self._handle_command_output(
job_status_code, error_message, raise_on_timeout=raise_on_timeout
)
self._handle_command_output(job_state, raise_on_timeout=raise_on_timeout)

return time_taken

Expand Down
59 changes: 44 additions & 15 deletions release/ray_release/custom_byod_build_init_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,31 @@ def generate_custom_build_step_key(image: str) -> str:

def get_images_from_tests(
tests: List[Test], build_id: str
) -> Tuple[List[Tuple[str, str, str, str]], Dict[str, List[str]]]:
) -> Tuple[
List[Tuple[str, str, Optional[str], Optional[str], Optional[Dict[str, str]]]],
Dict[str, List[str]],
]:
"""Get a list of custom BYOD images to build from a list of tests."""
custom_byod_images = set()
custom_byod_images = {}
custom_image_test_names_map = {}
for test in tests:
if not test.require_custom_byod_image():
continue
custom_byod_image_build = (
test.get_anyscale_byod_image(build_id),
test.get_anyscale_base_byod_image(build_id),
test.get_byod_post_build_script(),
test.get_byod_python_depset(),
)
custom_byod_images.add(custom_byod_image_build)
image_tag = custom_byod_image_build[0]
image_tag = test.get_anyscale_byod_image(build_id)
if image_tag not in custom_byod_images:
runtime_env = test.get_byod_runtime_env() or None
custom_byod_images[image_tag] = (
image_tag,
test.get_anyscale_base_byod_image(build_id),
test.get_byod_post_build_script(),
test.get_byod_python_depset(),
runtime_env,
)
logger.info(f"To be built: {image_tag}")
if image_tag not in custom_image_test_names_map:
custom_image_test_names_map[image_tag] = []
custom_image_test_names_map[image_tag].append(test.get_name())
return list(custom_byod_images), custom_image_test_names_map
return list(custom_byod_images.values()), custom_image_test_names_map


def create_custom_build_yaml(destination_file: str, tests: List[Test]) -> None:
Expand All @@ -57,14 +62,38 @@ def create_custom_build_yaml(destination_file: str, tests: List[Test]) -> None:
return
build_config = {"group": "Custom images build", "steps": []}
ray_want_commit = os.getenv("RAY_WANT_COMMIT_IN_IMAGE", "")
for image, base_image, post_build_script, python_depset in custom_byod_images:
for (
image,
base_image,
post_build_script,
python_depset,
runtime_env,
) in custom_byod_images:
logger.info(
f"Building custom BYOD image: {image}, base image: {base_image}, post build script: {post_build_script}"
f"Building custom BYOD image: {image}, base image: {base_image}, "
f"post build script: {post_build_script}, runtime_env: {runtime_env}"
)
if not post_build_script and not python_depset:
if not post_build_script and not python_depset and not runtime_env:
continue
step_key = generate_custom_build_step_key(image)
step_name = _get_step_name(image, step_key, custom_image_test_names_map[image])
env_args = ""
if runtime_env:
env_args = " ".join(
f"--env {k}={v}" for k, v in sorted(runtime_env.items())
)
build_cmd_parts = [
"bazelisk run //release:custom_byod_build --",
f"--image-name {image}",
f"--base-image {base_image}",
]
if post_build_script:
build_cmd_parts.append(f"--post-build-script {post_build_script}")
if python_depset:
build_cmd_parts.append(f"--python-depset {python_depset}")
if env_args:
build_cmd_parts.append(env_args)
build_cmd = " ".join(build_cmd_parts)
step = {
"label": step_name,
"key": step_key,
Expand All @@ -77,7 +106,7 @@ def create_custom_build_yaml(destination_file: str, tests: List[Test]) -> None:
"bash release/azure_docker_login.sh",
f"az acr login --name {AZURE_REGISTRY_NAME}",
f"aws ecr get-login-password --region {config['byod_ecr_region']} | docker login --username AWS --password-stdin {config['byod_ecr']}",
f"bazelisk run //release:custom_byod_build -- --image-name {image} --base-image {base_image} {f'--post-build-script {post_build_script}' if post_build_script else ''} {f'--python-depset {python_depset}' if python_depset else ''}",
build_cmd,
],
}
step["depends_on"] = get_prerequisite_step(image, base_image)
Expand Down
80 changes: 31 additions & 49 deletions release/ray_release/job_manager/anyscale_job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,7 @@
from typing import Any, Dict, Optional, Tuple

import anyscale
from anyscale.sdk.anyscale_client.models import (
CreateProductionJob,
CreateProductionJobConfig,
HaJobStates,
)
from anyscale.job.models import JobConfig, JobState

from ray_release.anyscale_util import LAST_LOGS_LENGTH
from ray_release.cluster_manager.cluster_manager import ClusterManager
Expand All @@ -25,10 +21,9 @@
)

job_status_to_return_code = {
HaJobStates.SUCCESS: 0,
HaJobStates.OUT_OF_RETRIES: -1,
HaJobStates.BROKEN: -2,
HaJobStates.TERMINATED: -3,
JobState.SUCCEEDED: 0,
JobState.FAILED: -1,
JobState.UNKNOWN: -2,
}
terminal_state = set(job_status_to_return_code.keys())

Expand All @@ -38,7 +33,6 @@ def __init__(self, cluster_manager: ClusterManager):
self.start_time = None
self.counter = 0
self.cluster_manager = cluster_manager
self._sdk = cluster_manager.sdk
self._last_job_result = None
self._job_id: Optional[str] = None
self._last_logs = None
Expand All @@ -61,44 +55,35 @@ def _run_job(
f"Executing {cmd_to_run} with {env_vars_for_job} via Anyscale job submit"
)

runtime_env = {
"env_vars": env_vars_for_job,
}
if working_dir:
runtime_env["working_dir"] = working_dir
if upload_path:
runtime_env["upload_path"] = upload_path

try:
job_request = CreateProductionJob(
job_config = JobConfig(
name=self.cluster_manager.cluster_name,
description=f"Smoke test: {self.cluster_manager.smoke_test}",
project_id=self.cluster_manager.project_id,
config=CreateProductionJobConfig(
entrypoint=cmd_to_run,
runtime_env=runtime_env,
build_id=self.cluster_manager.cluster_env_build_id,
compute_config_id=self.cluster_manager.cluster_compute_id,
max_retries=0,
),
entrypoint=cmd_to_run,
image_uri=self.cluster_manager.test.get_anyscale_byod_image(),
compute_config=self.cluster_manager.cluster_compute_id,
project=self.cluster_manager.project_name,
env_vars=env_vars_for_job,
working_dir=working_dir if working_dir else None,
max_retries=0,
)
job_response = self._sdk.create_job(job_request)
self._job_id = anyscale.job.submit(config=job_config)
except Exception as e:
raise JobStartupFailed(
"Error starting job with name "
f"{self.cluster_manager.cluster_name}: "
f"{e}"
) from e

self.save_last_job_result(job_response.result)
self._last_job_result = None
self.start_time = time.time()

logger.info(f"Link to job: " f"{format_link(self.job_url())}")
return

def save_last_job_result(self, value):
self._last_job_result = value
self._job_id = value.id if value else None
def save_last_job_status(self, status):
if status and hasattr(status, "id") and status.id != self._job_id:
logger.warning(f"Job ID mismatch: expected {self._job_id}, got {status.id}")
self._last_job_result = status

def job_id(self) -> Optional[str]:
return self._job_id
Expand All @@ -108,15 +93,10 @@ def job_url(self) -> Optional[str]:
return None
return anyscale_job_url(self._job_id)

def _last_job_status(self) -> Optional[HaJobStates]:
def _last_job_status(self) -> Optional["JobState"]:
if not self._last_job_result:
return None
return self._last_job_result.state.current_state

def job_error_message(self) -> str:
if self._last_job_result is None:
return ""
return self._last_job_result.state.error
return self._last_job_result.state

def _in_progress(self) -> bool:
if not self._last_job_result:
Expand All @@ -125,18 +105,20 @@ def _in_progress(self) -> bool:

def _get_job_status_with_retry(self):
return exponential_backoff_retry(
lambda: self._sdk.get_production_job(self._job_id),
lambda: anyscale.job.status(id=self._job_id),
retry_exceptions=Exception,
initial_retry_delay_s=1,
max_retries=3,
).result
)

def _terminate_job(self, raise_exceptions: bool = False):
if not self._in_progress():
if not self._job_id:
return
if self._last_job_result is not None and not self._in_progress():
return
logger.info(f"Terminating job {self._job_id}...")
try:
self._sdk.terminate_job(self._job_id)
anyscale.job.terminate(id=self._job_id)
logger.info(f"Job {self._job_id} terminated!")
except Exception:
msg = f"Couldn't terminate job {self._job_id}!"
Expand Down Expand Up @@ -202,12 +184,12 @@ def _wait_job(self, timeout: int):
next_status += 30

result = self._get_job_status_with_retry()
self.save_last_job_result(result)
self.save_last_job_status(result)
status = self._last_job_status()

if not job_running and status in {
HaJobStates.RUNNING,
HaJobStates.ERRORED,
JobState.STARTING,
JobState.RUNNING,
}:
logger.info(
f"... job started ...({int(now - start_time)} seconds) ..."
Expand All @@ -223,10 +205,10 @@ def _wait_job(self, timeout: int):
time.sleep(1)

result = self._get_job_status_with_retry()
self.save_last_job_result(result)
self.save_last_job_status(result)
status = self._last_job_status()
assert status in terminal_state
if status == HaJobStates.TERMINATED and not job_running:
if status == JobState.FAILED and not job_running:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Return code -4 produced but never handled

High Severity

_wait_job generates retcode = -4 when status == JobState.FAILED and not job_running, but _handle_command_output only handles -1 and -2. The old code had an explicit handler for -4 that raised JobTerminatedBeforeStartError. Now -4 falls through to the S3/log-parsing path, which will fail (since the job never ran) and raise a misleading JobNoLogsError instead of a proper infrastructure error.

Additional Locations (1)

Fix in Cursor Fix in Web

# Soft infra error
retcode = -4
else:
Expand Down
Loading