Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions release/ray_release/command_runner/anyscale_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -449,3 +449,9 @@ def fetch_output(self) -> Dict[str, Any]:
return self._fetch_json(
_join_cloud_storage_paths(self.path_in_bucket, self.output_json),
)

def job_url(self) -> Optional[str]:
return self.job_manager.job_url()

def job_id(self) -> Optional[str]:
return self.job_manager.job_id()
4 changes: 2 additions & 2 deletions release/ray_release/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -537,8 +537,8 @@ def run_release_test_anyscale(

# Obtain the cluster info again as it is set after the
# command was run in case of anyscale jobs
result.job_url = runner.job_manager.job_url
result.job_id = runner.job_manager.job_id
result.job_url = runner.job_url()
result.job_id = runner.job_id()
result.last_logs = runner.get_last_logs()

except Exception as e:
Expand Down
30 changes: 14 additions & 16 deletions release/ray_release/job_manager/anyscale_job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def __init__(self, cluster_manager: ClusterManager):
self.cluster_manager = cluster_manager
self._sdk = cluster_manager.sdk
self._last_job_result = None
self._job_id: Optional[str] = None
self._last_logs = None
self.cluster_startup_timeout = 600
self._duration = None
Expand Down Expand Up @@ -92,7 +93,7 @@ def _run_job(
self.last_job_result = job_response.result
self.start_time = time.time()

logger.info(f"Link to job: " f"{format_link(self.job_url)}")
logger.info(f"Link to job: " f"{format_link(self.job_url())}")
return

@property
Expand All @@ -102,18 +103,15 @@ def last_job_result(self):
@last_job_result.setter
def last_job_result(self, value):
self._last_job_result = value
self._job_id = value.id if value else None

@property
def job_id(self) -> Optional[str]:
if not self.last_job_result:
return None
return self.last_job_result.id
return self._job_id

@property
def job_url(self) -> Optional[str]:
if not self.job_id:
if not self._job_id:
return None
return anyscale_job_url(self.job_id)
return anyscale_job_url(self._job_id)

@property
def last_job_status(self) -> Optional[HaJobStates]:
Expand All @@ -127,7 +125,7 @@ def in_progress(self) -> bool:

def _get_job_status_with_retry(self):
return exponential_backoff_retry(
lambda: self._sdk.get_production_job(self.job_id),
lambda: self._sdk.get_production_job(self._job_id),
retry_exceptions=Exception,
initial_retry_delay_s=1,
max_retries=3,
Expand All @@ -136,12 +134,12 @@ def _get_job_status_with_retry(self):
def _terminate_job(self, raise_exceptions: bool = False):
if not self.in_progress:
return
logger.info(f"Terminating job {self.job_id}...")
logger.info(f"Terminating job {self._job_id}...")
try:
self._sdk.terminate_job(self.job_id)
logger.info(f"Job {self.job_id} terminated!")
self._sdk.terminate_job(self._job_id)
logger.info(f"Job {self._job_id} terminated!")
except Exception:
msg = f"Couldn't terminate job {self.job_id}!"
msg = f"Couldn't terminate job {self._job_id}!"
if raise_exceptions:
logger.error(msg)
raise
Expand Down Expand Up @@ -172,7 +170,7 @@ def _wait_job(self, timeout: int):
# The context ensures the job always either finishes normally
# or is terminated.
with self._terminate_job_context():
assert self.job_id, "Job must have been started"
assert self._job_id, "Job must have been started"

start_time = time.monotonic()
# Waiting for cluster needs to be a part of the whole
Expand Down Expand Up @@ -254,10 +252,10 @@ def run_and_wait(

def _get_ray_logs(self) -> str:
"""Obtain the last few log"""
return anyscale.job.get_logs(id=self.job_id, max_lines=LAST_LOGS_LENGTH)
return anyscale.job.get_logs(id=self._job_id, max_lines=LAST_LOGS_LENGTH)

def get_last_logs(self):
if not self.job_id:
if not self._job_id:
raise RuntimeError(
"Job has not been started, therefore there are no logs to obtain."
)
Expand Down
2 changes: 1 addition & 1 deletion release/ray_release/tests/test_anyscale_job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def test_get_last_logs_long_running_job():
)
anyscale_job_manager = AnyscaleJobManager(cluster_manager=cluster_manager)
anyscale_job_manager._duration = 4 * 3_600 + 1
anyscale_job_manager._last_job_result = FakeJobResult(_id="foo")
anyscale_job_manager.last_job_result = FakeJobResult(_id="foo")
assert anyscale_job_manager.get_last_logs() is None


Expand Down