Skip to content

Commit 026c7b4

Browse files
aslonnieclaude
andcommitted
[Release] Migrate job status polling from get_production_job to anyscale.job.status()
Replace the old SDK's `self._sdk.get_production_job()` with the new `anyscale.job.status()` API in AnyscaleJobManager. The new API uses `JobState` enums (SUCCEEDED/FAILED/RUNNING) instead of `HaJobStates`. The critical distinction between FAILED-before-running (infra error, retcode -4) and FAILED-after-running (test failure, retcode -1) is preserved. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: Lonnie Liu <lonnie@anyscale.com>
1 parent f3d444a commit 026c7b4

File tree

3 files changed

+39
-65
lines changed

3 files changed

+39
-65
lines changed

release/ray_release/command_runner/anyscale_job_runner.py

Lines changed: 11 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818
JobBrokenError,
1919
JobNoLogsError,
2020
JobOutOfRetriesError,
21-
JobTerminatedBeforeStartError,
22-
JobTerminatedError,
2321
LogsError,
2422
PrepareCommandError,
2523
PrepareCommandTimeout,
@@ -158,26 +156,15 @@ def wait_for_nodes(self, num_nodes: int, timeout: float = 900):
158156
f"python wait_cluster.py {num_nodes} {timeout}", timeout=timeout + 30
159157
)
160158

161-
def _handle_command_output(
162-
self, job_status_code: int, error: str, raise_on_timeout: bool = True
163-
):
164-
if job_status_code == -2:
165-
raise JobBrokenError(f"Job state is 'BROKEN' with error:\n{error}\n")
166-
167-
if job_status_code == -3:
168-
raise JobTerminatedError(
169-
"Job entered 'TERMINATED' state (it was terminated "
170-
"manually or Ray was stopped):"
171-
f"\n{error}\n"
159+
def _handle_command_output(self, job_state: int, raise_on_timeout: bool = True):
160+
if job_state == -1:
161+
raise JobOutOfRetriesError(
162+
"Job returned non-success state: 'FAILED' "
163+
"(command has not been ran or no logs could have been obtained)."
172164
)
173165

174-
if job_status_code == -4:
175-
raise JobTerminatedBeforeStartError(
176-
"Job entered 'TERMINATED' state before it started "
177-
"(most likely due to inability to provision required nodes; "
178-
"otherwise it was terminated manually or Ray was stopped):"
179-
f"\n{error}\n"
180-
)
166+
if job_state == -2:
167+
raise JobBrokenError("Job state is 'UNKNOWN'.")
181168

182169
# First try to obtain the output.json from S3.
183170
# If that fails, try logs.
@@ -214,8 +201,7 @@ def _handle_command_output(
214201
)
215202
raise PrepareCommandError(
216203
f"Prepare command '{self.prepare_commands[-1]}' returned "
217-
f"non-success status: {prepare_return_codes[-1]} with error:"
218-
f"\n{error}\n"
204+
f"non-success status: {prepare_return_codes[-1]}."
219205
)
220206
else:
221207
raise JobNoLogsError("Could not obtain logs for the job.")
@@ -231,15 +217,7 @@ def _handle_command_output(
231217

232218
if workload_status_code is not None and workload_status_code != 0:
233219
raise TestCommandError(
234-
f"Command returned non-success status: {workload_status_code} with "
235-
f"error:\n{error}\n"
236-
)
237-
238-
if job_status_code == -1:
239-
raise JobOutOfRetriesError(
240-
"Job returned non-success state: 'OUT_OF_RETRIES' "
241-
"(command has not been ran or no logs could have been obtained) "
242-
f"with error:\n{error}\n"
220+
f"Command returned non-success status: {workload_status_code}."
243221
)
244222

245223
def _get_full_command_env(self, env: Optional[Dict[str, str]] = None):
@@ -348,18 +326,14 @@ def run_command(
348326
working_dir = azure_file_path
349327
logger.info(f"Working dir uploaded to {working_dir}")
350328

351-
job_status_code, time_taken = self.job_manager.run_and_wait(
329+
job_state, time_taken = self.job_manager.run_and_wait(
352330
full_command,
353331
full_env,
354332
working_dir=working_dir,
355333
upload_path=self.upload_path,
356334
timeout=int(timeout),
357335
)
358-
error_message = self.job_manager.job_error_message()
359-
360-
self._handle_command_output(
361-
job_status_code, error_message, raise_on_timeout=raise_on_timeout
362-
)
336+
self._handle_command_output(job_state, raise_on_timeout=raise_on_timeout)
363337

364338
return time_taken
365339

release/ray_release/job_manager/anyscale_job_manager.py

Lines changed: 22 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@
33
from typing import Any, Dict, Optional, Tuple
44

55
import anyscale
6+
from anyscale.job.models import JobState
67
from anyscale.sdk.anyscale_client.models import (
78
CreateProductionJob,
89
CreateProductionJobConfig,
9-
HaJobStates,
1010
)
1111

1212
from ray_release.anyscale_util import LAST_LOGS_LENGTH
@@ -25,10 +25,9 @@
2525
)
2626

2727
job_status_to_return_code = {
28-
HaJobStates.SUCCESS: 0,
29-
HaJobStates.OUT_OF_RETRIES: -1,
30-
HaJobStates.BROKEN: -2,
31-
HaJobStates.TERMINATED: -3,
28+
JobState.SUCCEEDED: 0,
29+
JobState.FAILED: -1,
30+
JobState.UNKNOWN: -2,
3231
}
3332
terminal_state = set(job_status_to_return_code.keys())
3433

@@ -90,15 +89,17 @@ def _run_job(
9089
f"{e}"
9190
) from e
9291

93-
self.save_last_job_result(job_response.result)
92+
self._job_id = job_response.result.id
93+
self._last_job_result = None
9494
self.start_time = time.time()
9595

9696
logger.info(f"Link to job: " f"{format_link(self.job_url())}")
9797
return
9898

99-
def save_last_job_result(self, value):
100-
self._last_job_result = value
101-
self._job_id = value.id if value else None
99+
def save_last_job_status(self, status):
100+
if status and hasattr(status, "id") and status.id != self._job_id:
101+
logger.warning(f"Job ID mismatch: expected {self._job_id}, got {status.id}")
102+
self._last_job_result = status
102103

103104
def job_id(self) -> Optional[str]:
104105
return self._job_id
@@ -108,15 +109,10 @@ def job_url(self) -> Optional[str]:
108109
return None
109110
return anyscale_job_url(self._job_id)
110111

111-
def _last_job_status(self) -> Optional[HaJobStates]:
112+
def _last_job_status(self) -> Optional["JobState"]:
112113
if not self._last_job_result:
113114
return None
114-
return self._last_job_result.state.current_state
115-
116-
def job_error_message(self) -> str:
117-
if self._last_job_result is None:
118-
return ""
119-
return self._last_job_result.state.error
115+
return self._last_job_result.state
120116

121117
def _in_progress(self) -> bool:
122118
if not self._last_job_result:
@@ -125,14 +121,16 @@ def _in_progress(self) -> bool:
125121

126122
def _get_job_status_with_retry(self):
127123
return exponential_backoff_retry(
128-
lambda: self._sdk.get_production_job(self._job_id),
124+
lambda: anyscale.job.status(id=self._job_id),
129125
retry_exceptions=Exception,
130126
initial_retry_delay_s=1,
131127
max_retries=3,
132-
).result
128+
)
133129

134130
def _terminate_job(self, raise_exceptions: bool = False):
135-
if not self._in_progress():
131+
if not self._job_id:
132+
return
133+
if self._last_job_result is not None and not self._in_progress():
136134
return
137135
logger.info(f"Terminating job {self._job_id}...")
138136
try:
@@ -202,12 +200,12 @@ def _wait_job(self, timeout: int):
202200
next_status += 30
203201

204202
result = self._get_job_status_with_retry()
205-
self.save_last_job_result(result)
203+
self.save_last_job_status(result)
206204
status = self._last_job_status()
207205

208206
if not job_running and status in {
209-
HaJobStates.RUNNING,
210-
HaJobStates.ERRORED,
207+
JobState.STARTING,
208+
JobState.RUNNING,
211209
}:
212210
logger.info(
213211
f"... job started ...({int(now - start_time)} seconds) ..."
@@ -223,10 +221,10 @@ def _wait_job(self, timeout: int):
223221
time.sleep(1)
224222

225223
result = self._get_job_status_with_retry()
226-
self.save_last_job_result(result)
224+
self.save_last_job_status(result)
227225
status = self._last_job_status()
228226
assert status in terminal_state
229-
if status == HaJobStates.TERMINATED and not job_running:
227+
if status == JobState.FAILED and not job_running:
230228
# Soft infra error
231229
retcode = -4
232230
else:

release/ray_release/tests/test_anyscale_job_manager.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,17 @@
11
import sys
22

33
import pytest
4+
from anyscale.job.models import JobState
45

56
from ray_release.anyscale_util import Anyscale
67
from ray_release.cluster_manager.cluster_manager import ClusterManager
78
from ray_release.job_manager.anyscale_job_manager import AnyscaleJobManager
89
from ray_release.test import Test
910

1011

11-
class FakeJobResult:
12-
def __init__(self, _id: str):
13-
self.id = _id
12+
class FakeJobStatus:
13+
def __init__(self, state: JobState):
14+
self.state = state
1415

1516

1617
class FakeSDK(Anyscale):
@@ -31,7 +32,8 @@ def test_get_last_logs_long_running_job():
3132
)
3233
anyscale_job_manager = AnyscaleJobManager(cluster_manager=cluster_manager)
3334
anyscale_job_manager._duration = 4 * 3_600 + 1
34-
anyscale_job_manager.save_last_job_result(FakeJobResult(_id="foo"))
35+
anyscale_job_manager._job_id = "foo"
36+
anyscale_job_manager.save_last_job_status(FakeJobStatus(state=JobState.SUCCEEDED))
3537
assert anyscale_job_manager.get_last_logs() is None
3638

3739

0 commit comments

Comments
 (0)