release/ray_release/glue.py: 32 additions & 37 deletions
@@ -17,7 +17,6 @@
from ray_release.cluster_manager.cluster_manager import ClusterManager
from ray_release.cluster_manager.minimal import MinimalClusterManager
from ray_release.command_runner.anyscale_job_runner import AnyscaleJobRunner
from ray_release.command_runner.command_runner import CommandRunner
from ray_release.config import (
DEFAULT_AUTOSUSPEND_MINS,
DEFAULT_BUILD_TIMEOUT,
@@ -80,7 +79,7 @@ def _load_test_configuration(
anyscale_project: str,
result: Result,
smoke_test: bool = False,
) -> Tuple[ClusterManager, CommandRunner, str]:
) -> Tuple[ClusterManager, AnyscaleJobRunner, str]:
logger.info(f"Test config: {test}")

    # Populate result parameters
@@ -136,6 +135,9 @@ def _load_test_configuration(
except Exception as e:
raise ReleaseTestSetupError(f"Error setting up release test: {e}") from e

if not isinstance(command_runner, AnyscaleJobRunner):
raise ReleaseTestSetupError("Command runner is not an AnyscaleJobRunner")

return cluster_manager, command_runner, artifact_path


@@ -221,32 +223,27 @@ def _setup_cluster_environment(
return prepare_cmd, prepare_timeout, build_timeout, cluster_timeout, command_timeout


def _local_environment_information(
result: Result,
def _build_local_environment_information(
cluster_manager: ClusterManager,
command_runner: CommandRunner,
runner: AnyscaleJobRunner,
build_timeout: int,
cluster_timeout: int,
cluster_env_id: Optional[str],
) -> None:
# Start cluster
buildkite_group(":gear: Building cluster environment")

cluster_manager.cluster_env_id = cluster_env_id

cluster_manager.build_configs(timeout=build_timeout)

if isinstance(command_runner, AnyscaleJobRunner):
command_runner.job_manager.cluster_startup_timeout = cluster_timeout
runner.job_manager.cluster_startup_timeout = cluster_timeout


def _prepare_remote_environment(
test: Test,
command_runner: CommandRunner,
runner: AnyscaleJobRunner,
prepare_cmd: bool,
prepare_timeout: int,
) -> None:
command_runner.prepare_remote_env()
runner.prepare_remote_env()

wait_for_nodes = test["run"].get("wait_for_nodes", None)

@@ -257,11 +254,11 @@ def _prepare_remote_environment(
wait_for_nodes.get("timeout", DEFAULT_WAIT_FOR_NODES_TIMEOUT)
)
num_nodes = test["run"]["wait_for_nodes"]["num_nodes"]
command_runner.wait_for_nodes(num_nodes, wait_timeout)
runner.wait_for_nodes(num_nodes, wait_timeout)

if prepare_cmd:
try:
command_runner.run_prepare_command(prepare_cmd, timeout=prepare_timeout)
runner.run_prepare_command(prepare_cmd, timeout=prepare_timeout)
except CommandError as e:
raise PrepareCommandError(e)
except CommandTimeout as e:
@@ -293,7 +290,7 @@ def _upload_working_dir_to_gcs(working_dir: str) -> str:
def _running_test_script(
test: Test,
smoke_test: bool,
command_runner: CommandRunner,
runner: AnyscaleJobRunner,
command_timeout: int,
) -> None:
command = test["run"]["script"]
@@ -306,7 +303,7 @@ def _running_test_script(
is_long_running = test["run"].get("long_running", False)

try:
command_runner.run_command(
runner.run_command(
command,
env=command_env,
timeout=command_timeout,
@@ -329,22 +326,22 @@

def _fetching_results(
result: Result,
command_runner: CommandRunner,
runner: AnyscaleJobRunner,
artifact_path: Optional[str],
smoke_test: bool,
start_time_unix: int,
) -> Tuple[dict, Exception]:
fetch_result_exception = None
try:
command_results = command_runner.fetch_results()
command_results = runner.fetch_results()
except Exception as e:
logger.exception(f"Could not fetch results for test command: {e}")
command_results = {}
fetch_result_exception = e

if artifact_path:
try:
command_runner.fetch_artifact()
runner.fetch_artifact()
except Exception as e:
logger.error("Could not fetch artifact for test command")
logger.exception(e)
@@ -356,7 +353,7 @@ def _fetching_results(
)

try:
metrics = command_runner.fetch_metrics()
metrics = runner.fetch_metrics()
except Exception as e:
logger.exception(f"Could not fetch metrics for test command: {e}")
metrics = {}
@@ -462,15 +459,15 @@ def run_release_test_anyscale(
) -> Result:
old_wd = os.getcwd()
start_time = time.monotonic()
command_runner = None
runner = None
cluster_manager = None
pipeline_exception = None
    # Not critical for some tests, so keep it separate from the general exception.
fetch_result_exception = None
try:

buildkite_group(":spiral_note_pad: Loading test configuration")
cluster_manager, command_runner, artifact_path = _load_test_configuration(
cluster_manager, runner, artifact_path = _load_test_configuration(
test,
anyscale_project,
result,
@@ -502,11 +499,10 @@ def run_release_test_anyscale(
test_definition_root,
)

buildkite_group(":bulb: Local environment information")
_local_environment_information(
result,
buildkite_group(":bulb: Building local environment information")
_build_local_environment_information(
cluster_manager,
command_runner,
runner,
build_timeout,
cluster_timeout,
cluster_env_id,
@@ -516,7 +512,7 @@ def run_release_test_anyscale(
buildkite_group(":wrench: Preparing remote environment")
_prepare_remote_environment(
test,
command_runner,
runner,
prepare_cmd,
prepare_timeout,
)
@@ -526,32 +522,31 @@ def run_release_test_anyscale(
_running_test_script(
test,
smoke_test,
command_runner,
runner,
command_timeout,
)

buildkite_group(":floppy_disk: Fetching results")
metrics, fetch_result_exception = _fetching_results(
result,
command_runner,
runner,
artifact_path,
smoke_test,
start_time_unix,
)

# Obtain the cluster info again as it is set after the
# command was run in case of anyscale jobs
result.job_url = runner.job_manager.job_url
result.job_id = runner.job_manager.job_id
result.last_logs = runner.get_last_logs()
Inline review comment:
Diagnostic info lost when test execution fails

High Severity

Moving the code that sets result.job_url, result.job_id, and result.last_logs inside the try block means these diagnostic values won't be captured when an exception occurs during test execution. Previously, this code ran unconditionally after the try-except, ensuring job info and logs were available for debugging failed tests. Now, when tests fail (the most critical scenario for debugging), last_logs falls back to just the exception traceback instead of actual job logs, and job_url/job_id are not set at all.
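One way to address this, sketched below under the assumption that `runner` can still be `None` when setup fails before a runner is created: keep the job-info capture outside the `try` block so it runs even after an exception, mirroring the pre-change behavior with the renamed `runner`.

    except Exception as e:
        logger.exception(e)
        buildkite_open_last()
        pipeline_exception = e
        metrics = {}

    # Capture job info and logs unconditionally so failed runs keep their diagnostics.
    # Guard against runner being None if _load_test_configuration never completed.
    if runner is not None:
        result.job_url = runner.job_manager.job_url
        result.job_id = runner.job_manager.job_id
        result.last_logs = runner.get_last_logs()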



except Exception as e:
logger.exception(e)
buildkite_open_last()
pipeline_exception = e
metrics = {}

# Obtain the cluster info again as it is set after the
# command was run in case of anyscale jobs
if isinstance(command_runner, AnyscaleJobRunner):
result.job_url = command_runner.job_manager.job_url
result.job_id = command_runner.job_manager.job_id

result.last_logs = command_runner.get_last_logs() if command_runner else None

reset_signal_handling()

time_taken = time.monotonic() - start_time