Skip to content

Commit b7f9b07

Browse files
committed
[release test] convert job_id and job_url into methods
and save the job ID in `_job_id`. This makes the information flow clearer and simpler. This is preparation for refactoring the job SDK usage. Signed-off-by: Lonnie Liu <95255098+aslonnie@users.noreply.github.com>
1 parent 5ce6a93 commit b7f9b07

File tree

4 files changed

+28
-25
lines changed

4 files changed

+28
-25
lines changed

release/ray_release/command_runner/anyscale_job_runner.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -449,3 +449,9 @@ def fetch_output(self) -> Dict[str, Any]:
449449
return self._fetch_json(
450450
_join_cloud_storage_paths(self.path_in_bucket, self.output_json),
451451
)
452+
453+
def job_url(self) -> Optional[str]:
454+
return self.job_manager.job_url()
455+
456+
def job_id(self) -> Optional[str]:
457+
return self.job_manager.job_id()

release/ray_release/glue.py

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -538,20 +538,19 @@ def run_release_test_anyscale(
538538
smoke_test,
539539
start_time_unix,
540540
)
541+
542+
# Obtain the cluster info again as it is set after the
543+
# command was run in case of anyscale jobs
544+
result.job_url = command_runner.job_url()
545+
result.job_id = command_runner.job_id()
546+
result.last_logs = command_runner.get_last_logs()
547+
541548
except Exception as e:
542549
logger.exception(e)
543550
buildkite_open_last()
544551
pipeline_exception = e
545552
metrics = {}
546553

547-
# Obtain the cluster info again as it is set after the
548-
# command was run in case of anyscale jobs
549-
if isinstance(command_runner, AnyscaleJobRunner):
550-
result.job_url = command_runner.job_manager.job_url
551-
result.job_id = command_runner.job_manager.job_id
552-
553-
result.last_logs = command_runner.get_last_logs() if command_runner else None
554-
555554
reset_signal_handling()
556555

557556
time_taken = time.monotonic() - start_time

release/ray_release/job_manager/anyscale_job_manager.py

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ def __init__(self, cluster_manager: ClusterManager):
4040
self.cluster_manager = cluster_manager
4141
self._sdk = cluster_manager.sdk
4242
self._last_job_result = None
43+
self._job_id: Optional[str] = None
4344
self._last_logs = None
4445
self.cluster_startup_timeout = 600
4546
self._duration = None
@@ -92,7 +93,7 @@ def _run_job(
9293
self.last_job_result = job_response.result
9394
self.start_time = time.time()
9495

95-
logger.info(f"Link to job: " f"{format_link(self.job_url)}")
96+
logger.info(f"Link to job: " f"{format_link(self.job_url())}")
9697
return
9798

9899
@property
@@ -102,18 +103,15 @@ def last_job_result(self):
102103
@last_job_result.setter
103104
def last_job_result(self, value):
104105
self._last_job_result = value
106+
self._job_id = value.id if value else None
105107

106-
@property
107108
def job_id(self) -> Optional[str]:
108-
if not self.last_job_result:
109-
return None
110-
return self.last_job_result.id
109+
return self._job_id
111110

112-
@property
113111
def job_url(self) -> Optional[str]:
114-
if not self.job_id:
112+
if not self._job_id:
115113
return None
116-
return anyscale_job_url(self.job_id)
114+
return anyscale_job_url(self._job_id)
117115

118116
@property
119117
def last_job_status(self) -> Optional[HaJobStates]:
@@ -127,7 +125,7 @@ def in_progress(self) -> bool:
127125

128126
def _get_job_status_with_retry(self):
129127
return exponential_backoff_retry(
130-
lambda: self._sdk.get_production_job(self.job_id),
128+
lambda: self._sdk.get_production_job(self._job_id),
131129
retry_exceptions=Exception,
132130
initial_retry_delay_s=1,
133131
max_retries=3,
@@ -136,12 +134,12 @@ def _get_job_status_with_retry(self):
136134
def _terminate_job(self, raise_exceptions: bool = False):
137135
if not self.in_progress:
138136
return
139-
logger.info(f"Terminating job {self.job_id}...")
137+
logger.info(f"Terminating job {self._job_id}...")
140138
try:
141-
self._sdk.terminate_job(self.job_id)
142-
logger.info(f"Job {self.job_id} terminated!")
139+
self._sdk.terminate_job(self._job_id)
140+
logger.info(f"Job {self._job_id} terminated!")
143141
except Exception:
144-
msg = f"Couldn't terminate job {self.job_id}!"
142+
msg = f"Couldn't terminate job {self._job_id}!"
145143
if raise_exceptions:
146144
logger.error(msg)
147145
raise
@@ -172,7 +170,7 @@ def _wait_job(self, timeout: int):
172170
# The context ensures the job always either finishes normally
173171
# or is terminated.
174172
with self._terminate_job_context():
175-
assert self.job_id, "Job must have been started"
173+
assert self._job_id, "Job must have been started"
176174

177175
start_time = time.monotonic()
178176
# Waiting for cluster needs to be a part of the whole
@@ -254,10 +252,10 @@ def run_and_wait(
254252

255253
def _get_ray_logs(self) -> str:
256254
"""Obtain the last few log"""
257-
return anyscale.job.get_logs(id=self.job_id, max_lines=LAST_LOGS_LENGTH)
255+
return anyscale.job.get_logs(id=self._job_id, max_lines=LAST_LOGS_LENGTH)
258256

259257
def get_last_logs(self):
260-
if not self.job_id:
258+
if not self._job_id:
261259
raise RuntimeError(
262260
"Job has not been started, therefore there are no logs to obtain."
263261
)

release/ray_release/tests/test_anyscale_job_manager.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ def test_get_last_logs_long_running_job():
3131
)
3232
anyscale_job_manager = AnyscaleJobManager(cluster_manager=cluster_manager)
3333
anyscale_job_manager._duration = 4 * 3_600 + 1
34-
anyscale_job_manager._last_job_result = FakeJobResult(_id="foo")
34+
anyscale_job_manager.last_job_result = FakeJobResult(_id="foo")
3535
assert anyscale_job_manager.get_last_logs() is None
3636

3737

0 commit comments

Comments
 (0)