@@ -40,6 +40,7 @@ def __init__(self, cluster_manager: ClusterManager):
         self.cluster_manager = cluster_manager
         self._sdk = cluster_manager.sdk
         self._last_job_result = None
+        self._job_id: Optional[str] = None
         self._last_logs = None
         self.cluster_startup_timeout = 600
         self._duration = None
@@ -92,7 +93,7 @@ def _run_job(
         self.last_job_result = job_response.result
         self.start_time = time.time()

-        logger.info(f"Link to job: " f"{format_link(self.job_url)}")
+        logger.info(f"Link to job: " f"{format_link(self.job_url())}")
         return

     @property
@@ -102,18 +103,15 @@ def last_job_result(self):
     @last_job_result.setter
     def last_job_result(self, value):
         self._last_job_result = value
+        self._job_id = value.id if value else None

-    @property
     def job_id(self) -> Optional[str]:
-        if not self.last_job_result:
-            return None
-        return self.last_job_result.id
+        return self._job_id

-    @property
     def job_url(self) -> Optional[str]:
-        if not self.job_id:
+        if not self._job_id:
             return None
-        return anyscale_job_url(self.job_id)
+        return anyscale_job_url(self._job_id)

     @property
     def last_job_status(self) -> Optional[HaJobStates]:
@@ -127,7 +125,7 @@ def in_progress(self) -> bool:

     def _get_job_status_with_retry(self):
         return exponential_backoff_retry(
-            lambda: self._sdk.get_production_job(self.job_id),
+            lambda: self._sdk.get_production_job(self._job_id),
             retry_exceptions=Exception,
             initial_retry_delay_s=1,
             max_retries=3,
@@ -136,12 +134,12 @@ def _get_job_status_with_retry(self):
     def _terminate_job(self, raise_exceptions: bool = False):
         if not self.in_progress:
             return
-        logger.info(f"Terminating job {self.job_id}...")
+        logger.info(f"Terminating job {self._job_id}...")
         try:
-            self._sdk.terminate_job(self.job_id)
-            logger.info(f"Job {self.job_id} terminated!")
+            self._sdk.terminate_job(self._job_id)
+            logger.info(f"Job {self._job_id} terminated!")
         except Exception:
-            msg = f"Couldn't terminate job {self.job_id}!"
+            msg = f"Couldn't terminate job {self._job_id}!"
             if raise_exceptions:
                 logger.error(msg)
                 raise
@@ -172,7 +170,7 @@ def _wait_job(self, timeout: int):
         # The context ensures the job always either finishes normally
         # or is terminated.
         with self._terminate_job_context():
-            assert self.job_id, "Job must have been started"
+            assert self._job_id, "Job must have been started"

             start_time = time.monotonic()
             # Waiting for cluster needs to be a part of the whole
@@ -254,10 +252,10 @@ def run_and_wait(

     def _get_ray_logs(self) -> str:
         """Obtain the last few log"""
-        return anyscale.job.get_logs(id=self.job_id, max_lines=LAST_LOGS_LENGTH)
+        return anyscale.job.get_logs(id=self._job_id, max_lines=LAST_LOGS_LENGTH)

     def get_last_logs(self):
-        if not self.job_id:
+        if not self._job_id:
             raise RuntimeError(
                 "Job has not been started, therefore there are no logs to obtain."
             )
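
For context, a minimal self-contained sketch of the accessor pattern this diff moves to: the last_job_result setter caches the job ID, and job_id/job_url become plain methods that read the cached value. The JobHandle class name, the docstring, and the URL string below are illustrative assumptions; only the setter logic and the method signatures mirror the diff.

from typing import Optional


class JobHandle:
    """Illustrative stand-in for the manager class touched by this diff."""

    def __init__(self) -> None:
        self._last_job_result = None
        self._job_id: Optional[str] = None

    @property
    def last_job_result(self):
        return self._last_job_result

    @last_job_result.setter
    def last_job_result(self, value):
        self._last_job_result = value
        # Cache the job ID once so later accessors don't depend on the result object.
        self._job_id = value.id if value else None

    # Plain methods (no @property), as in the diff: call sites must use job_url().
    def job_id(self) -> Optional[str]:
        return self._job_id

    def job_url(self) -> Optional[str]:
        if not self._job_id:
            return None
        # Hypothetical URL format; the real code delegates to anyscale_job_url().
        return f"https://console.anyscale.com/jobs/{self._job_id}"

The call-site consequence is visible in the second hunk: once the @property decorator is removed, self.job_url becomes self.job_url().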