Skip to content

Commit 2483350

Browse files
committed
[release test] convert job_id and job_url into methods
and save the job ID in `_job_id`. This makes the information flow clearer and simpler. This is preparation for refactoring the job SDK usage. Signed-off-by: Lonnie Liu <95255098+aslonnie@users.noreply.github.com>
1 parent d2b55a4 commit 2483350

File tree

3 files changed

+24
-24
lines changed

3 files changed

+24
-24
lines changed

release/ray_release/command_runner/anyscale_job_runner.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -429,3 +429,9 @@ def fetch_output(self) -> Dict[str, Any]:
429429
return self._fetch_json(
430430
_join_cloud_storage_paths(self.path_in_bucket, self.output_json),
431431
)
432+
433+
def job_url(self) -> Optional[str]:
434+
return self.job_manager.job_url()
435+
436+
def job_id(self) -> Optional[str]:
437+
return self.job_manager.job_id()

release/ray_release/glue.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -560,11 +560,9 @@ def run_release_test_anyscale(
560560

561561
# Obtain the cluster info again as it is set after the
562562
# command was run in case of anyscale jobs
563-
if isinstance(command_runner, AnyscaleJobRunner):
564-
result.job_url = command_runner.job_manager.job_url
565-
result.job_id = command_runner.job_manager.job_id
566-
567-
result.last_logs = command_runner.get_last_logs() if command_runner else None
563+
result.job_url = command_runner.job_url()
564+
result.job_id = command_runner.job_id()
565+
result.last_logs = command_runner.get_last_logs()
568566

569567
reset_signal_handling()
570568

release/ray_release/job_manager/anyscale_job_manager.py

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ def __init__(self, cluster_manager: ClusterManager):
3838
self.counter = 0
3939
self.cluster_manager = cluster_manager
4040
self._last_job_result = None
41+
self._job_id: Optional[str] = None
4142
self._last_logs = None
4243
self.cluster_startup_timeout = 600
4344
self._duration = None
@@ -92,7 +93,7 @@ def _run_job(
9293
self.last_job_result = job_response.result
9394
self.start_time = time.time()
9495

95-
logger.info(f"Link to job: " f"{format_link(self.job_url)}")
96+
logger.info(f"Link to job: " f"{format_link(self.job_url())}")
9697
return
9798

9899
@property
@@ -106,18 +107,15 @@ def last_job_result(self):
106107
@last_job_result.setter
107108
def last_job_result(self, value):
108109
self._last_job_result = value
110+
self._job_id = value.id if value else None
109111

110-
@property
111112
def job_id(self) -> Optional[str]:
112-
if not self.last_job_result:
113-
return None
114-
return self.last_job_result.id
113+
return self._job_id
115114

116-
@property
117115
def job_url(self) -> Optional[str]:
118-
if not self.job_id:
116+
if not self._job_id:
119117
return None
120-
return anyscale_job_url(self.job_id)
118+
return anyscale_job_url(self._job_id)
121119

122120
@property
123121
def last_job_status(self) -> Optional[HaJobStates]:
@@ -132,7 +130,7 @@ def in_progress(self) -> bool:
132130
def _get_job_status_with_retry(self):
133131
anyscale_client = self.cluster_manager.sdk
134132
return exponential_backoff_retry(
135-
lambda: anyscale_client.get_production_job(self.job_id),
133+
lambda: anyscale_client.get_production_job(self._job_id),
136134
retry_exceptions=Exception,
137135
initial_retry_delay_s=1,
138136
max_retries=3,
@@ -141,12 +139,12 @@ def _get_job_status_with_retry(self):
141139
def _terminate_job(self, raise_exceptions: bool = False):
142140
if not self.in_progress:
143141
return
144-
logger.info(f"Terminating job {self.job_id}...")
142+
logger.info(f"Terminating job {self._job_id}...")
145143
try:
146-
self.sdk.terminate_job(self.job_id)
147-
logger.info(f"Job {self.job_id} terminated!")
144+
self.sdk.terminate_job(self._job_id)
145+
logger.info(f"Job {self._job_id} terminated!")
148146
except Exception:
149-
msg = f"Couldn't terminate job {self.job_id}!"
147+
msg = f"Couldn't terminate job {self._job_id}!"
150148
if raise_exceptions:
151149
logger.error(msg)
152150
raise
@@ -258,17 +256,15 @@ def run_and_wait(
258256
return self._wait_job(timeout)
259257

260258
def _get_ray_logs(self) -> str:
261-
"""
262-
Obtain the last few logs
263-
"""
259+
"""Obtain the last few logs."""
264260
if self.cluster_manager.log_streaming_limit == -1:
265-
return anyscale.job.get_logs(id=self.job_id)
261+
return anyscale.job.get_logs(id=self._job_id)
266262
return anyscale.job.get_logs(
267-
id=self.job_id, max_lines=self.cluster_manager.log_streaming_limit
263+
id=self._job_id, max_lines=self.cluster_manager.log_streaming_limit
268264
)
269265

270266
def get_last_logs(self):
271-
if not self.job_id:
267+
if not self._job_id:
272268
raise RuntimeError(
273269
"Job has not been started, therefore there are no logs to obtain."
274270
)

0 commit comments

Comments
 (0)