Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions sky/jobs/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,10 @@ async def _run_one_task(self, task_id: int, task: 'sky.Task') -> bool:
if task.run is None:
logger.info(f'Skip running task {task_id} ({task.name}) due to its '
'run commands being empty.')
# Note: `submitted_at` is recorded once when the task row is first
# inserted at PENDING (see `set_pending`), so this no-op task --
# which short-circuits straight to RUNNING/SUCCEEDED without going
# through STARTING -- still has a real submission time.
# Call set_started first to initialize columns in the state table,
# including start_at and last_recovery_at to avoid issues for
# uninitialized columns.
Expand Down
63 changes: 50 additions & 13 deletions sky/jobs/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -880,6 +880,15 @@ def set_pending(
add_job_event(job_id, task_id, ManagedJobStatus.PENDING,
'Job submitted to queue')

# Record `submitted_at` once, when the task row is first inserted at
# PENDING (i.e. the moment the job entered the queue). Doing it here rather
# than at the PENDING -> STARTING transition ensures:
# - No-op tasks (`task.run is None`) that short-circuit straight to
# SUCCEEDED without ever going through STARTING still get a real
# submission time (previously NULL -> blank "Submitted" column).
# - Jobs that back off and re-enter STARTING keep their original
# submission time instead of having it clobbered with the latest
# attempt's timestamp.
engine = _db_manager.get_engine()
with orm.Session(engine) as session:
session.execute(
Expand All @@ -889,6 +898,7 @@ def set_pending(
task_name=task_name,
resources=resources_str,
metadata=metadata,
submitted_at=time.time(),
status=ManagedJobStatus.PENDING.value,
is_primary_in_job_group=is_primary_in_job_group,
))
Expand Down Expand Up @@ -1598,12 +1608,18 @@ def build_managed_jobs_with_filters_no_status_query(
if user_hashes is not None:
query = query.where(job_info_table.c.user_hash.in_(user_hashes))
if submitted_after is not None or submitted_before is not None:
# submitted_at is NULL until a job leaves PENDING (it is set at
# STARTING). For a still-active job that just means "not submitted
# yet", so treat it as submitted "now". A terminal job with no
# submitted_at never started (cancelled/failed before STARTING) and
# has no submission time, so leave it NULL to exclude it from the
# window rather than letting it masquerade as "now".
# submitted_at is now recorded when a job is first queued (see
# set_pending), so normal rows always have a non-NULL submitted_at
# and are filtered directly against the window.
#
# The NULL branches below only apply to legacy rows created before
# this change (pre-upgrade), which may still have a NULL
# submitted_at. For such a legacy job that is still active, NULL just
# means "submission time unknown / not yet recorded", so treat it as
# submitted "now" to keep it visible. A legacy terminal job with no
# submitted_at has no recoverable submission time, so leave it NULL to
# exclude it from the window rather than letting it masquerade as
# "now".
terminal_values = [
s.value for s in ManagedJobStatus.terminal_statuses()
]
Expand Down Expand Up @@ -1885,9 +1901,15 @@ def get_managed_jobs_with_filters(
if sort_column != spot_table.c.spot_job_id:
sort_column = sqlalchemy.func.max(sort_column)
if sort_order == 'asc':
job_ids_subquery = job_ids_subquery.order_by(sort_column.asc())
ordering = sort_column.asc()
else:
job_ids_subquery = job_ids_subquery.order_by(sort_column.desc())
ordering = sort_column.desc()
# Keep jobs without a `submitted_at` (e.g. cancelled/failed while
# still PENDING) deterministically at the end regardless of DB
# engine NULL-ordering semantics.
if sort_by == 'submitted_at':
ordering = ordering.nulls_last()
Comment thread
cg505 marked this conversation as resolved.
Outdated
job_ids_subquery = job_ids_subquery.order_by(ordering)
else:
# Default sort: job_id desc (newest first)
job_ids_subquery = job_ids_subquery.order_by(
Expand Down Expand Up @@ -1938,11 +1960,15 @@ def get_managed_jobs_with_filters(
if sort_by and sort_by in sort_field_map:
sort_column = sort_field_map[sort_by]
if sort_order == 'asc':
query = query.order_by(sort_column.asc(),
spot_table.c.task_id.asc())
ordering = sort_column.asc()
else:
query = query.order_by(sort_column.desc(),
spot_table.c.task_id.asc())
ordering = sort_column.desc()
# Keep jobs without a `submitted_at` (e.g. cancelled/failed while still
# PENDING) deterministically at the end regardless of DB engine
# NULL-ordering semantics.
if sort_by == 'submitted_at':
ordering = ordering.nulls_last()
query = query.order_by(ordering, spot_table.c.task_id.asc())
else:
# Default sort: job_id desc, task_id asc
query = query.order_by(spot_table.c.spot_job_id.desc(),
Expand Down Expand Up @@ -2825,7 +2851,18 @@ async def set_starting_async(job_id: int,
async def _op(session: sql_async.AsyncSession) -> int:
values = {
spot_table.c.resources: resources_str,
spot_table.c.submitted_at: submit_time,
# `submitted_at` is normally recorded once at PENDING (see
# `set_pending`), which is the canonical queue-entry time. Only
# backfill it here if it is missing (e.g. a legacy row created
# before that change) so we never clobber the original submission
# time on a (re-)entry into STARTING.
Comment thread
cg505 marked this conversation as resolved.
# TODO(v1.2): remove this COALESCE backcompat (~2 minor versions
Comment thread
cg505 marked this conversation as resolved.
Outdated
# after this change ships, i.e. after v1.2). It only backfills
# submitted_at for jobs created before submitted_at was recorded
# at queue-entry (set_pending); once no such rows remain,
# set_starting_async can stop writing submitted_at entirely.
spot_table.c.submitted_at: sqlalchemy.func.coalesce(
spot_table.c.submitted_at, submit_time),
spot_table.c.status: ManagedJobStatus.STARTING.value,
spot_table.c.run_timestamp: run_timestamp,
spot_table.c.specs: json.dumps(specs),
Expand Down
Loading
Loading