[release test] use new sdk to get job state #60821
base: master
```diff
@@ -18,8 +18,6 @@
     JobBrokenError,
     JobNoLogsError,
     JobOutOfRetriesError,
-    JobTerminatedBeforeStartError,
-    JobTerminatedError,
     LogsError,
     PrepareCommandError,
     PrepareCommandTimeout,
```
```diff
@@ -158,26 +156,15 @@ def wait_for_nodes(self, num_nodes: int, timeout: float = 900):
             f"python wait_cluster.py {num_nodes} {timeout}", timeout=timeout + 30
         )

-    def _handle_command_output(
-        self, job_status_code: int, error: str, raise_on_timeout: bool = True
-    ):
-        if job_status_code == -2:
-            raise JobBrokenError(f"Job state is 'BROKEN' with error:\n{error}\n")
-
-        if job_status_code == -3:
-            raise JobTerminatedError(
-                "Job entered 'TERMINATED' state (it was terminated "
-                "manually or Ray was stopped):"
-                f"\n{error}\n"
-            )
-
-        if job_status_code == -4:
-            raise JobTerminatedBeforeStartError(
-                "Job entered 'TERMINATED' state before it started "
-                "(most likely due to inability to provision required nodes; "
-                "otherwise it was terminated manually or Ray was stopped):"
-                f"\n{error}\n"
-            )
+    def _handle_command_output(self, job_state: int, raise_on_timeout: bool = True):
+        if job_state == -1:
+            raise JobOutOfRetriesError(
+                "Job returned non-success state: 'FAILED' "
+                "(command has not been ran or no logs could have been obtained)."
+            )
+        if job_state == -2:
+            raise JobBrokenError("Job state is 'UNKNOWN'.")
```
Comment on lines +166 to +167

Contributor

The migration to the new SDK seems to have removed the detailed error message that was previously included in this exception. To maintain debuggability, could you please fetch and include the job logs in the exception message? This will be very helpful for diagnosing failures.

Unhandled return code -4 for soft infra errors
High Severity
The rewritten `_handle_command_output` only handles return codes `-1` and `-2`, while `_wait_job` still produces `-4` for soft infra errors, so that code is never translated into an exception. Additional Locations (1)
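A rough sketch of what the suggestion could look like; this is not part of the PR. The `get_last_logs()` helper on the job manager is an assumption (any equivalent log-fetching call from the new SDK would do), and only the two branches in question are shown:

```python
# Sketch (inside the command runner class): re-attach the tail of the job logs
# to the exceptions so failures stay debuggable after the SDK migration.
def _handle_command_output(self, job_state: int, raise_on_timeout: bool = True):
    logs = ""
    try:
        # Assumed helper; best-effort, never let log fetching mask the real error.
        logs = self.job_manager.get_last_logs() or ""
    except Exception:
        pass

    if job_state == -1:
        raise JobOutOfRetriesError(
            f"Job returned non-success state: 'FAILED'.\nLast logs:\n{logs}\n"
        )
    if job_state == -2:
        raise JobBrokenError(f"Job state is 'UNKNOWN'.\nLast logs:\n{logs}\n")
```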
```diff
         # First try to obtain the output.json from S3.
         # If that fails, try logs.
```
```diff
@@ -214,8 +201,7 @@ def _handle_command_output(
                 )
                 raise PrepareCommandError(
                     f"Prepare command '{self.prepare_commands[-1]}' returned "
-                    f"non-success status: {prepare_return_codes[-1]} with error:"
-                    f"\n{error}\n"
+                    f"non-success status: {prepare_return_codes[-1]}."
                 )
             else:
                 raise JobNoLogsError("Could not obtain logs for the job.")
```
```diff
@@ -231,15 +217,7 @@ def _handle_command_output(

         if workload_status_code is not None and workload_status_code != 0:
             raise TestCommandError(
-                f"Command returned non-success status: {workload_status_code} with "
-                f"error:\n{error}\n"
-            )
-
-        if job_status_code == -1:
-            raise JobOutOfRetriesError(
-                "Job returned non-success state: 'OUT_OF_RETRIES' "
-                "(command has not been ran or no logs could have been obtained) "
-                f"with error:\n{error}\n"
+                f"Command returned non-success status: {workload_status_code}."
             )

     def _get_full_command_env(self, env: Optional[Dict[str, str]] = None):
```
```diff
@@ -348,18 +326,14 @@ def run_command(
             working_dir = azure_file_path
             logger.info(f"Working dir uploaded to {working_dir}")

-        job_status_code, time_taken = self.job_manager.run_and_wait(
+        job_state, time_taken = self.job_manager.run_and_wait(
             full_command,
             full_env,
             working_dir=working_dir,
             upload_path=self.upload_path,
             timeout=int(timeout),
         )
-        error_message = self.job_manager.job_error_message()

-        self._handle_command_output(
-            job_status_code, error_message, raise_on_timeout=raise_on_timeout
-        )
+        self._handle_command_output(job_state, raise_on_timeout=raise_on_timeout)

         return time_taken
```
```diff
@@ -3,10 +3,10 @@
 from typing import Any, Dict, Optional, Tuple

+import anyscale
+from anyscale.job.models import JobState
 from anyscale.sdk.anyscale_client.models import (
     CreateProductionJob,
     CreateProductionJobConfig,
-    HaJobStates,
 )

 from ray_release.anyscale_util import LAST_LOGS_LENGTH
```
```diff
@@ -25,10 +25,9 @@
 )

 job_status_to_return_code = {
-    HaJobStates.SUCCESS: 0,
-    HaJobStates.OUT_OF_RETRIES: -1,
-    HaJobStates.BROKEN: -2,
-    HaJobStates.TERMINATED: -3,
+    JobState.SUCCEEDED: 0,
+    JobState.FAILED: -1,
+    JobState.UNKNOWN: -2,
}
terminal_state = set(job_status_to_return_code.keys())
```
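For orientation (not part of the diff), a minimal sketch of how the new mapping is consumed, using only calls that appear elsewhere in this PR (`anyscale.job.status(id=...)` and the `.state` field of its result); the `job_return_code` wrapper itself is hypothetical:

```python
import anyscale
from anyscale.job.models import JobState

job_status_to_return_code = {
    JobState.SUCCEEDED: 0,
    JobState.FAILED: -1,
    JobState.UNKNOWN: -2,
}
terminal_state = set(job_status_to_return_code.keys())


def job_return_code(job_id: str):
    """Map the job's current state to the release-test return code.

    Returns None while the job is still in a non-terminal state.
    """
    status = anyscale.job.status(id=job_id)  # new-SDK call used by this PR
    if status.state not in terminal_state:
        return None
    return job_status_to_return_code[status.state]
```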
```diff
@@ -90,15 +89,17 @@ def _run_job(
                 f"{e}"
             ) from e

-        self.save_last_job_result(job_response.result)
+        self._job_id = job_response.result.id
+        self._last_job_result = None
         self.start_time = time.time()

         logger.info(f"Link to job: " f"{format_link(self.job_url())}")
         return

-    def save_last_job_result(self, value):
-        self._last_job_result = value
-        self._job_id = value.id if value else None
+    def save_last_job_status(self, status):
+        if status and hasattr(status, "id") and status.id != self._job_id:
+            logger.warning(f"Job ID mismatch: expected {self._job_id}, got {status.id}")
+        self._last_job_result = status

     def job_id(self) -> Optional[str]:
         return self._job_id
```
```diff
@@ -108,15 +109,10 @@ def job_url(self) -> Optional[str]:
             return None
         return anyscale_job_url(self._job_id)

-    def _last_job_status(self) -> Optional[HaJobStates]:
+    def _last_job_status(self) -> Optional["JobState"]:
         if not self._last_job_result:
             return None
-        return self._last_job_result.state.current_state
-
-    def job_error_message(self) -> str:
-        if self._last_job_result is None:
-            return ""
-        return self._last_job_result.state.error
+        return self._last_job_result.state

     def _in_progress(self) -> bool:
         if not self._last_job_result:
```
```diff
@@ -125,14 +121,16 @@ def _in_progress(self) -> bool:

     def _get_job_status_with_retry(self):
         return exponential_backoff_retry(
-            lambda: self._sdk.get_production_job(self._job_id),
+            lambda: anyscale.job.status(id=self._job_id),
             retry_exceptions=Exception,
             initial_retry_delay_s=1,
             max_retries=3,
-        ).result
+        )

     def _terminate_job(self, raise_exceptions: bool = False):
-        if not self._in_progress():
+        if not self._job_id:
+            return
+        if self._last_job_result is not None and not self._in_progress():
             return
         logger.info(f"Terminating job {self._job_id}...")
         try:
```
```diff
@@ -202,12 +200,12 @@ def _wait_job(self, timeout: int):
                 next_status += 30

             result = self._get_job_status_with_retry()
-            self.save_last_job_result(result)
+            self.save_last_job_status(result)
             status = self._last_job_status()

             if not job_running and status in {
-                HaJobStates.RUNNING,
-                HaJobStates.ERRORED,
+                JobState.STARTING,
+                JobState.RUNNING,
             }:
                 logger.info(
                     f"... job started ...({int(now - start_time)} seconds) ..."
```
```diff
@@ -223,10 +221,10 @@
             time.sleep(1)

         result = self._get_job_status_with_retry()
-        self.save_last_job_result(result)
+        self.save_last_job_status(result)
         status = self._last_job_status()
         assert status in terminal_state
-        if status == HaJobStates.TERMINATED and not job_running:
+        if status == JobState.FAILED and not job_running:
             # Soft infra error
             retcode = -4
         else:
```

Return code -4 generated but never handled
High Severity
`_wait_job` still sets `retcode = -4` when the job ends in `JobState.FAILED` without the workload having started (a soft infra error), but the new `_handle_command_output` has no branch for `-4`, so this case is never surfaced as an exception. Additional Locations (1)
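One possible way to close this gap, sketched only: keep a `-4` branch inside the rewritten `_handle_command_output`. The exception class is an assumption, since this PR removes `JobTerminatedBeforeStartError`; either that class stays imported or a comparable infra-error exception would be used instead:

```python
# Sketch: translate the soft-infra retcode produced by _wait_job into an
# exception instead of letting it fall through _handle_command_output unhandled.
if job_state == -4:
    raise JobTerminatedBeforeStartError(  # assumed to remain available
        "Job reached 'FAILED' before the workload started "
        "(most likely a soft infra error such as failed node provisioning)."
    )
```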


Early -1 check skips output analysis and has wrong message
Medium Severity
The check for `job_state == -1` was moved from the end of `_handle_command_output` to the beginning, changing the error-handling behavior. In the original code, even when the job returned `-1` (OUT_OF_RETRIES), the function would still fetch `output.json` and check for specific errors like `PrepareCommandError` or `TestCommandError`, only raising `JobOutOfRetriesError` as a fallback. Now it raises immediately without analyzing output. Additionally, the error message says "command has not been ran", but `-1` is only returned when `job_running == True` (the job did start), making the message inaccurate.
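A minimal sketch of the restructuring this comment implies: analyze the job output first and keep `JobOutOfRetriesError` as the fallback. `_analyze_output()` is a stand-in for the existing output.json / logs handling in `_handle_command_output`, not a real method name:

```python
def _handle_command_output(self, job_state: int, raise_on_timeout: bool = True):
    if job_state == -2:
        raise JobBrokenError("Job state is 'UNKNOWN'.")

    # Existing behavior: fetch output.json (or logs) and raise specific errors
    # such as PrepareCommandError or TestCommandError when they apply.
    self._analyze_output(raise_on_timeout=raise_on_timeout)

    # Fall back only when no more specific failure was identified. Note the job
    # did start in this case (-1 is only produced when job_running is True).
    if job_state == -1:
        raise JobOutOfRetriesError("Job returned non-success state: 'FAILED'.")
```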