Skip to content

Commit abc81ab

Browse files
cg505 and Michaelvll authored
[jobs] catch jobs that are DONE but non-terminal (#4641)
* [jobs] catch jobs that are DONE but non-terminal

  Usually, a job should always transition to a terminal status before it is set to DONE. However, if a bug makes that fail for some reason (e.g. the issues fixed in #4637), we should still catch this state.
* address PR comments
* address more PR comments
* Update sky/jobs/utils.py

  Co-authored-by: Zhanghao Wu <[email protected]>
* Update sky/jobs/utils.py

  Co-authored-by: Zhanghao Wu <[email protected]>

---------

Co-authored-by: Zhanghao Wu <[email protected]>
1 parent 3e9bd9d commit abc81ab

File tree

3 files changed

+39
-8
lines changed

3 files changed

+39
-8
lines changed

sky/jobs/state.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -758,8 +758,9 @@ def get_jobs_to_check_status(job_id: Optional[int] = None) -> List[int]:
758758
job_id: Optional job ID to check. If None, checks all jobs.
759759
760760
Returns a list of job_ids, including the following:
761-
- For jobs with schedule state: jobs that have schedule state not DONE
762-
- For legacy jobs (no schedule state): jobs that are in non-terminal status
761+
- Jobs that have a schedule_state that is not DONE
762+
- Jobs that have schedule_state DONE but are in a non-terminal status
763+
- Legacy jobs (that is, no schedule state) that are in non-terminal status
763764
"""
764765
job_filter = '' if job_id is None else 'AND spot.spot_job_id=(?)'
765766
job_value = () if job_id is None else (job_id,)
@@ -772,7 +773,9 @@ def get_jobs_to_check_status(job_id: Optional[int] = None) -> List[int]:
772773

773774
# Get jobs that are either:
774775
# 1. Have schedule state that is not DONE, or
775-
# 2. Have no schedule state (legacy) AND are in non-terminal status
776+
# 2. Have schedule state DONE AND are in non-terminal status (unexpected
777+
# inconsistent state), or
778+
# 3. Have no schedule state (legacy) AND are in non-terminal status
776779
with db_utils.safe_cursor(_DB_PATH) as cursor:
777780
rows = cursor.execute(
778781
f"""\
@@ -781,14 +784,23 @@ def get_jobs_to_check_status(job_id: Optional[int] = None) -> List[int]:
781784
LEFT OUTER JOIN job_info
782785
ON spot.spot_job_id=job_info.spot_job_id
783786
WHERE (
787+
-- non-legacy jobs that are not DONE
784788
(job_info.schedule_state IS NOT NULL AND
785789
job_info.schedule_state IS NOT ?)
786790
OR
787-
(job_info.schedule_state IS NULL AND
791+
-- legacy jobs that are in non-terminal status, or
792+
-- DONE jobs that are in non-terminal status
793+
((-- legacy jobs
794+
job_info.schedule_state IS NULL OR
795+
-- non-legacy DONE jobs
796+
job_info.schedule_state IS ?
797+
) AND
798+
-- non-terminal
788799
status NOT IN ({status_filter_str}))
789800
)
790801
{job_filter}
791802
ORDER BY spot.spot_job_id DESC""", [
803+
ManagedJobScheduleState.DONE.value,
792804
ManagedJobScheduleState.DONE.value, *terminal_status_values,
793805
*job_value
794806
]).fetchall()

sky/jobs/utils.py

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -245,16 +245,35 @@ def _handle_legacy_job(job_id: int):
245245
schedule_state = tasks[0]['schedule_state']
246246

247247
# Backwards compatibility: this job was submitted when ray was still
248-
# used for managing the parallelism of job controllers.
248+
# used for managing the parallelism of job controllers, before #4485.
249249
# TODO(cooperc): Remove before 0.11.0.
250250
if (schedule_state is
251251
managed_job_state.ManagedJobScheduleState.INVALID):
252252
_handle_legacy_job(job_id)
253253
continue
254254

255-
# For jobs with schedule state:
255+
# Handle jobs with schedule state (non-legacy jobs):
256256
pid = tasks[0]['controller_pid']
257-
if pid is None:
257+
if schedule_state == managed_job_state.ManagedJobScheduleState.DONE:
258+
# There are two cases where we could get a job that is DONE.
259+
# 1. At query time (get_jobs_to_check_status), the job was not yet
260+
# DONE, but since then (before get_managed_jobs is called) it has
261+
# hit a terminal status, marked itself done, and exited. This is
262+
# fine.
263+
# 2. The job is DONE, but in a non-terminal status. This is
264+
# unexpected. For instance, the task status is RUNNING, but the
265+
# job schedule_state is DONE.
266+
if all(task['status'].is_terminal() for task in tasks):
267+
# Turns out this job is fine, even though it got pulled by
268+
# get_jobs_to_check_status. Probably case #1 above.
269+
continue
270+
271+
logger.error(f'Job {job_id} has DONE schedule state, but some '
272+
f'tasks are not terminal. Task statuses: '
273+
f'{", ".join(task["status"].value for task in tasks)}')
274+
failure_reason = ('Inconsistent internal job state. This is a bug.')
275+
elif pid is None:
276+
# Non-legacy job and controller process has not yet started.
258277
if schedule_state in (
259278
managed_job_state.ManagedJobScheduleState.INACTIVE,
260279
managed_job_state.ManagedJobScheduleState.WAITING):

sky/skylet/constants.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@
8686
# cluster yaml is updated.
8787
#
8888
# TODO(zongheng,zhanghao): make the upgrading of skylet automatic?
89-
SKYLET_VERSION = '11'
89+
SKYLET_VERSION = '12'
9090
# The version of the lib files that skylet/jobs use. Whenever there is an API
9191
# change for the job_lib or log_lib, we need to bump this version, so that the
9292
# user can be notified to update their SkyPilot version on the remote cluster.

0 commit comments

Comments
 (0)