Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions sky/jobs/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,10 @@ async def _run_one_task(self, task_id: int, task: 'sky.Task') -> bool:
if task.run is None:
logger.info(f'Skip running task {task_id} ({task.name}) due to its '
'run commands being empty.')
# Note: `submitted_at` is recorded once when the task row is first
# inserted at PENDING (see `set_pending`), so this no-op task --
# which short-circuits straight to RUNNING/SUCCEEDED without going
# through STARTING -- still has a real submission time.
# Call set_started first to initialize columns in the state table,
# including start_at and last_recovery_at to avoid issues for
# uninitialized columns.
Expand Down
41 changes: 34 additions & 7 deletions sky/jobs/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -880,6 +880,15 @@ def set_pending(
add_job_event(job_id, task_id, ManagedJobStatus.PENDING,
'Job submitted to queue')

# Record `submitted_at` once, when the task row is first inserted at
# PENDING (i.e. the moment the job entered the queue). Doing it here rather
# than at the PENDING -> STARTING transition ensures:
# - No-op tasks (`task.run is None`) that short-circuit straight to
# SUCCEEDED without ever going through STARTING still get a real
# submission time (previously NULL -> blank "Submitted" column).
# - Jobs that back off and re-enter STARTING keep their original
# submission time instead of having it clobbered with the latest
# attempt's timestamp.
engine = _db_manager.get_engine()
with orm.Session(engine) as session:
session.execute(
Expand All @@ -889,6 +898,7 @@ def set_pending(
task_name=task_name,
resources=resources_str,
metadata=metadata,
submitted_at=time.time(),
status=ManagedJobStatus.PENDING.value,
is_primary_in_job_group=is_primary_in_job_group,
))
Expand Down Expand Up @@ -1598,12 +1608,18 @@ def build_managed_jobs_with_filters_no_status_query(
if user_hashes is not None:
query = query.where(job_info_table.c.user_hash.in_(user_hashes))
if submitted_after is not None or submitted_before is not None:
# submitted_at is NULL until a job leaves PENDING (it is set at
# STARTING). For a still-active job that just means "not submitted
# yet", so treat it as submitted "now". A terminal job with no
# submitted_at never started (cancelled/failed before STARTING) and
# has no submission time, so leave it NULL to exclude it from the
# window rather than letting it masquerade as "now".
# submitted_at is now recorded when a job is first queued (see
# set_pending), so normal rows always have a non-NULL submitted_at
# and are filtered directly against the window.
#
# The NULL branches below only apply to legacy rows created before
# this change (pre-upgrade), which may still have a NULL
# submitted_at. For such a legacy job that is still active, NULL just
# means "submission time unknown / not yet recorded", so treat it as
# submitted "now" to keep it visible. A legacy terminal job with no
# submitted_at has no recoverable submission time, so leave it NULL to
# exclude it from the window rather than letting it masquerade as
# "now".
terminal_values = [
s.value for s in ManagedJobStatus.terminal_statuses()
]
Expand Down Expand Up @@ -2825,7 +2841,18 @@ async def set_starting_async(job_id: int,
async def _op(session: sql_async.AsyncSession) -> int:
values = {
spot_table.c.resources: resources_str,
spot_table.c.submitted_at: submit_time,
# `submitted_at` is normally recorded once at PENDING (see
# `set_pending`), which is the canonical queue-entry time. Only
# backfill it here if it is missing (e.g. a legacy row created
# before that change) so we never clobber the original submission
# time on a (re-)entry into STARTING.
Comment thread
cg505 marked this conversation as resolved.
# TODO(cooperc): remove this COALESCE backcompat in v0.14.0 (~2
# minor releases after this change ships). It only backfills
# submitted_at for jobs created before submitted_at was recorded
# at queue-entry (set_pending); once no such rows remain,
# set_starting_async can stop writing submitted_at entirely.
spot_table.c.submitted_at: sqlalchemy.func.coalesce(
spot_table.c.submitted_at, submit_time),
spot_table.c.status: ManagedJobStatus.STARTING.value,
spot_table.c.run_timestamp: run_timestamp,
spot_table.c.specs: json.dumps(specs),
Expand Down
188 changes: 177 additions & 11 deletions tests/unit_tests/test_sky/jobs/test_jobs_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,21 @@
from sky.skylet import constants


def _force_submitted_at(engine, job_id, submitted_at):
    """Pin a job's ``submitted_at`` column to an explicit value (test helper).

    ``set_pending`` stamps ``submitted_at`` with ``time.time()`` at
    queue-entry, which is not reproducible. Tests that need exact,
    comparable timestamps rewrite the column through this helper instead.

    Args:
        engine: Engine whose ``begin()`` opens the transaction used for the
            update.
        job_id: Value matched against ``spot_table.c.spot_job_id``.
        submitted_at: Value stored in the ``submitted_at`` column; callers in
            this file pass either a timestamp or ``None``.
    """
    import sqlalchemy

    jobs_table = state.spot_table
    # Build the UPDATE up front so the transaction body is a single execute.
    overwrite_stmt = (
        sqlalchemy.update(jobs_table)
        .where(jobs_table.c.spot_job_id == job_id)
        .values(submitted_at=submitted_at)
    )
    with engine.begin() as connection:
        connection.execute(overwrite_stmt)


@pytest.fixture
def _mock_managed_jobs_db_conn(tmp_path, monkeypatch):
"""Create a temporary SQLite DB for managed jobs state and monkeypatch engines.
Expand Down Expand Up @@ -903,10 +918,12 @@ async def create_job_states():
task_name='task0',
resources_str='{}',
metadata='{}')
# submitted_at is recorded at set_pending (queue-entry) time; force
# it to the explicit, deterministic submit_time for these tests.
_force_submitted_at(_mock_managed_jobs_db_conn, job_id, submit_time)
state.scheduler_set_waiting([job_id], f'/tmp/dag-{key}.yaml',
f'/tmp/user-{key}.yaml',
f'/tmp/env-{key}', None, 100)
# submitted_at is written here from the explicit submit_time.
await state.set_starting_async(job_id, 0, f'run-{key}', submit_time,
'{}', {}, mock_callback)
await state.set_started_async(job_id, 0, submit_time, mock_callback)
Expand All @@ -922,7 +939,8 @@ async def create_job_states():

@pytest.fixture
def _seed_pending_job(_mock_managed_jobs_db_conn):
"""Seed one job left in PENDING, so its submitted_at stays NULL."""
"""Seed one job left in PENDING; submitted_at is recorded at set_pending
(queue-entry), so it carries a non-NULL submitted_at of ~now."""
job_id = state.set_job_info_without_job_id(name='job-pending',
workspace='ws1',
entrypoint='ep',
Expand All @@ -939,8 +957,13 @@ def _seed_pending_job(_mock_managed_jobs_db_conn):

@pytest.fixture
def _seed_terminal_no_submit(_mock_managed_jobs_db_conn):
"""Seed a job cancelled while PENDING: terminal, with a NULL submitted_at
(it never reached STARTING)."""
"""Seed a legacy terminal row with a NULL submitted_at.

Such rows can only predate this change (pre-upgrade): they were created
before `submitted_at` was recorded at queue-entry (`set_pending`) and never
reached STARTING, so they never got a `submitted_at`. We simulate one by
cancelling a PENDING job and then forcing `submitted_at` back to NULL.
"""
job_id = state.set_job_info_without_job_id(name='job-cancelled',
workspace='ws1',
entrypoint='ep',
Expand All @@ -955,6 +978,8 @@ def _seed_terminal_no_submit(_mock_managed_jobs_db_conn):
state.scheduler_set_waiting([job_id], '/tmp/dag-c.yaml', '/tmp/user-c.yaml',
'/tmp/env-c', None, 100)
state.set_pending_cancelled(job_id)
# Simulate a legacy pre-upgrade row that never recorded a submitted_at.
_force_submitted_at(_mock_managed_jobs_db_conn, job_id, None)
return job_id


Expand Down Expand Up @@ -1017,12 +1042,15 @@ def test_status_count_respects_window(self, _seed_timed_jobs):
counts = state.get_status_count_with_filters(submitted_after=_T200)
assert counts == {state.ManagedJobStatus.RUNNING.value: 2}

# A pending job (NULL submitted_at) is treated as submitted "now", so the
# window keeps or drops it the same way it would a job submitted right now.
# A pending job records its `submitted_at` at queue-entry time (when the
# task row is first inserted), which for these tests is ~now. The window
# therefore keeps or drops it the same way it would a job submitted right
# now -- the outcomes below are identical to the previous NULL-as-now
# behavior, only now the row carries a real (non-NULL) submission time.
def test_pending_kept_by_past_lower_bound(self, _seed_pending_job):
jobs, total = state.get_managed_jobs_with_filters(submitted_after=_T100)
assert total == 1
assert jobs[0]['submitted_at'] is None
assert jobs[0]['submitted_at'] is not None

def test_pending_dropped_by_past_upper_bound(self, _seed_pending_job):
jobs, total = state.get_managed_jobs_with_filters(
Expand All @@ -1034,17 +1062,19 @@ def test_pending_kept_by_future_upper_bound(self, _seed_pending_job):
jobs, total = state.get_managed_jobs_with_filters(
submitted_before=_FAR_FUTURE)
assert total == 1
assert jobs[0]['submitted_at'] is None
assert jobs[0]['submitted_at'] is not None

def test_pending_dropped_by_future_lower_bound(self, _seed_pending_job):
jobs, total = state.get_managed_jobs_with_filters(
submitted_after=_FAR_FUTURE)
assert jobs == []
assert total == 0

# A terminal job that never got a submitted_at (cancelled/failed before
# STARTING) has no submission time, so it is excluded from any window —
# it must not be treated as submitted "now" like a still-pending job.
# A legacy terminal row (pre-upgrade) that never recorded a submitted_at
# has no recoverable submission time, so it is excluded from any window —
# it must not be treated as submitted "now" like a still-active job.
# (Post-change rows always carry a submitted_at from set_pending, so they
# never hit this branch; see TestSubmittedAtRecordedAtPending.)
def test_terminal_no_submit_present_without_filter(
self, _seed_terminal_no_submit):
jobs, total = state.get_managed_jobs_with_filters()
Expand Down Expand Up @@ -1250,3 +1280,139 @@ def test_sort_by_status_uses_status_expr(self, _mock_managed_jobs_db_conn):
fields=['job_id', 'status', 'task_id'])
seq_d = [refined[j['job_id']] for j in desc]
assert seq_d == sorted(seq_d, reverse=True), seq_d


class TestSubmittedAtRecordedAtPending:
    """Queue-entry semantics of `submitted_at`.

    The timestamp is written exactly once, when the task row is first
    inserted at PENDING, rather than on the PENDING -> STARTING transition.
    That gives no-op jobs (which never pass through STARTING) a real
    submission time and keeps the value from drifting when a job re-enters
    STARTING after backoff/recovery.
    """

    async def _mock_callback(self, status: str):
        """No-op status callback handed to the async state setters."""

    def _run(self, coro_factory):
        """Call `coro_factory` and run its coroutine on a private event loop.

        The loop is always closed afterwards; the coroutine's result is
        returned.
        """
        event_loop = asyncio.new_event_loop()
        try:
            outcome = event_loop.run_until_complete(coro_factory())
        finally:
            event_loop.close()
        return outcome

    def _make_pending_job(self, name):
        """Register a job called `name`, insert task 0 at PENDING, and mark
        it waiting in the scheduler. Returns the new job id."""
        new_job_id = state.set_job_info_without_job_id(
            name=name,
            workspace='ws1',
            entrypoint='ep',
            pool=None,
            pool_hash=None,
            user_hash='user1',
        )
        state.set_pending(
            new_job_id,
            task_id=0,
            task_name='task0',
            resources_str='{}',
            metadata='{}',
        )
        state.scheduler_set_waiting(
            [new_job_id],
            '/tmp/dag.yaml',
            '/tmp/user.yaml',
            '/tmp/env',
            None,
            100,
        )
        return new_job_id

    @staticmethod
    def _first_submitted_at():
        """Return `submitted_at` of the first job from an unfiltered query."""
        listed_jobs, _ = state.get_managed_jobs_with_filters()
        return listed_jobs[0]['submitted_at']

    def test_set_pending_records_submitted_at(self, _mock_managed_jobs_db_conn):
        # Even while still PENDING, a freshly queued job must already carry
        # a submitted_at that falls inside the insertion time window.
        lower = time.time()
        job_id = self._make_pending_job('queued-job')
        upper = time.time()

        jobs, total = state.get_managed_jobs_with_filters()
        assert total == 1
        queued = jobs[0]
        assert queued['job_id'] == job_id
        assert queued['submitted_at'] is not None
        assert lower <= queued['submitted_at'] <= upper

    def test_noop_job_has_submitted_at(self, _mock_managed_jobs_db_conn):
        # No-op jobs move PENDING -> RUNNING -> SUCCEEDED and skip STARTING;
        # the submitted_at recorded at PENDING must survive that path.
        job_id = self._make_pending_job('noop-job')
        recorded = self._first_submitted_at()

        async def _finish_without_starting():
            await state.set_started_async(job_id, 0, time.time(),
                                          self._mock_callback)
            await state.set_succeeded_async(job_id, 0, time.time(),
                                            self._mock_callback)

        self._run(_finish_without_starting)

        jobs, total = state.get_managed_jobs_with_filters()
        assert total == 1
        finished = jobs[0]
        assert finished['status'] == state.ManagedJobStatus.SUCCEEDED
        assert finished['submitted_at'] == recorded
        assert finished['submitted_at'] is not None

    def test_starting_does_not_clobber_submitted_at(self,
                                                    _mock_managed_jobs_db_conn):
        # Entering STARTING must leave the queue-entry submitted_at alone, so
        # a backoff/recovery re-entry cannot push it forward.
        job_id = self._make_pending_job('starting-job')
        recorded = self._first_submitted_at()
        # Deliberately far from the recorded value: it has to be ignored
        # because submitted_at is already populated.
        later = recorded + 10_000.0

        async def _enter_starting():
            await state.set_starting_async(job_id, 0, 'run-x', later, '{}', {},
                                           self._mock_callback)

        self._run(_enter_starting)

        jobs, _ = state.get_managed_jobs_with_filters()
        started = jobs[0]
        assert started['status'] == state.ManagedJobStatus.STARTING
        # Still the original queue-entry time, not `later`.
        assert started['submitted_at'] == recorded

    def test_starting_backfills_null_submitted_at(self,
                                                  _mock_managed_jobs_db_conn):
        # Defensive path: a legacy row whose submitted_at is NULL is filled
        # in by the STARTING transition (COALESCE writes only when NULL).
        job_id = self._make_pending_job('legacy-job')
        # Reset submitted_at to NULL to imitate a legacy row.
        _force_submitted_at(_mock_managed_jobs_db_conn, job_id, None)
        backfill = 4242.0

        async def _enter_starting():
            await state.set_starting_async(job_id, 0, 'run-x', backfill, '{}',
                                           {}, self._mock_callback)

        self._run(_enter_starting)

        jobs, _ = state.get_managed_jobs_with_filters()
        assert jobs[0]['submitted_at'] == backfill

    def test_cancelled_while_pending_retains_submitted_at(
            self, _mock_managed_jobs_db_conn):
        # Cancelling a job that is still PENDING does not undo the fact that
        # it was queued: the submitted_at from set_pending must be neither
        # cleared nor modified.
        job_id = self._make_pending_job('cancel-pending')
        recorded = self._first_submitted_at()
        assert recorded is not None

        state.set_pending_cancelled(job_id)

        jobs, total = state.get_managed_jobs_with_filters()
        assert total == 1
        cancelled = jobs[0]
        assert cancelled['status'] == state.ManagedJobStatus.CANCELLED
        assert cancelled['submitted_at'] == recorded

    def test_terminal_while_pending_included_by_submitted_window(
            self, _mock_managed_jobs_db_conn):
        # A job that reaches a terminal state while still PENDING keeps its
        # non-NULL submitted_at, so the submitted-window filter treats it
        # like any other submitted job instead of excluding it.
        job_id = self._make_pending_job('cancel-pending-window')
        recorded = self._first_submitted_at()
        assert recorded is not None

        state.set_pending_cancelled(job_id)

        # The row was submitted ~now, so a lower bound in the past keeps it.
        jobs, total = state.get_managed_jobs_with_filters(submitted_after=_T100)
        assert total == 1
        cancelled = jobs[0]
        assert cancelled['job_id'] == job_id
        assert cancelled['status'] == state.ManagedJobStatus.CANCELLED
        assert cancelled['submitted_at'] is not None
16 changes: 11 additions & 5 deletions tests/unit_tests/test_sky/skylet/test_managed_jobs_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,10 @@ def test_get_job_table_basic(self):
assert not target_job.HasField('failure_reason')
assert not target_job.HasField('user_name')
assert target_job.user_hash == 'abcd1234'
assert not target_job.HasField('submitted_at')
# submitted_at is recorded at queue-entry (set_pending) time, so even a
# still-PENDING job has a non-NULL submission time.
assert target_job.HasField('submitted_at')
assert target_job.submitted_at > 0
assert not target_job.HasField('start_at')
assert not target_job.HasField('end_at')
assert not target_job.HasField('user_yaml')
Expand Down Expand Up @@ -408,7 +411,10 @@ def test_get_job_table_no_accessible_workspaces(self):
assert not target_job.HasField('failure_reason')
assert not target_job.HasField('user_name')
assert target_job.user_hash == 'abcd1234'
assert not target_job.HasField('submitted_at')
# submitted_at is recorded at queue-entry (set_pending) time, so even a
# still-PENDING job has a non-NULL submission time.
assert target_job.HasField('submitted_at')
assert target_job.submitted_at > 0
assert not target_job.HasField('start_at')
assert not target_job.HasField('end_at')
assert not target_job.HasField('user_yaml')
Expand Down Expand Up @@ -458,9 +464,9 @@ def test_get_job_table_different_states(self):

job_data = {job.job_id: job for job in response.jobs}

# STARTING, RUNNING, SUCCEEDED jobs should have submitted_at > 0
# PENDING job should have submitted_at = 0.0
assert job_data[self.job_ids['job_id1']].submitted_at == 0.0
# submitted_at is recorded at queue-entry (set_pending) time, so all
# jobs -- including the still-PENDING job1 -- have submitted_at > 0.
assert job_data[self.job_ids['job_id1']].submitted_at > 0
assert job_data[self.job_ids['job_id2']].submitted_at > 0
assert job_data[self.job_ids['job_id3']].submitted_at > 0
assert job_data[self.job_ids['job_id4']].submitted_at > 0
Expand Down
Loading