Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 20 additions & 10 deletions lib/iris/src/iris/cluster/controller/transitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,8 @@ def _kill_non_terminal_tasks(
placeholders = ",".join("?" * len(terminal_states))
rows = cur.execute(
"SELECT t.task_id, t.current_attempt_id, t.current_worker_id, "
"jc.res_cpu_millicores, jc.res_memory_bytes, jc.res_disk_bytes, jc.res_device_json "
"jc.res_cpu_millicores, jc.res_memory_bytes, jc.res_disk_bytes, jc.res_device_json, "
"j.is_reservation_holder "
"FROM tasks t "
"JOIN jobs j ON j.job_id = t.job_id "
f"{JOB_CONFIG_JOIN} "
Expand All @@ -556,15 +557,24 @@ def _kill_non_terminal_tasks(
task_id = str(row["task_id"])
worker_id = row["current_worker_id"]
task_name = JobName.from_wire(task_id)
resources = None
is_reservation_holder = bool(int(row["is_reservation_holder"]))
decommit_worker: str | None = None
decommit_resources = None
if worker_id is not None:
resources = resource_spec_from_scalars(
int(row["res_cpu_millicores"]),
int(row["res_memory_bytes"]),
int(row["res_disk_bytes"]),
row["res_device_json"],
)
task_kill_workers[task_name] = WorkerId(str(worker_id))
# Reservation holders never commit resources on assignment
# (see _assign_task), so they must not decommit on termination —
# otherwise we subtract chips that were never added, which floors
# committed_* below a co-tenant's legitimate reservation and lets
# the scheduler double-book the worker.
if not is_reservation_holder:
decommit_worker = str(worker_id)
decommit_resources = resource_spec_from_scalars(
int(row["res_cpu_millicores"]),
int(row["res_memory_bytes"]),
int(row["res_disk_bytes"]),
row["res_device_json"],
)
_terminate_task(
cur,
registry,
Expand All @@ -573,8 +583,8 @@ def _kill_non_terminal_tasks(
job_pb2.TASK_STATE_KILLED,
reason,
now_ms,
worker_id=str(worker_id) if worker_id is not None else None,
resources=resources,
worker_id=decommit_worker,
resources=decommit_resources,
)
tasks_to_kill.add(task_name)
return tasks_to_kill, task_kill_workers
Expand Down
55 changes: 55 additions & 0 deletions lib/iris/tests/cluster/controller/test_transitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3544,6 +3544,61 @@ def test_kill_non_terminal_direct_provider_tasks(state):
assert task_ids[0] in result.tasks_to_kill


def test_kill_non_terminal_reservation_holder_does_not_decommit_co_tenant(harness):
    """Finalizing a reservation-holder task must not decommit a co-tenant's resources.

    Regression: ``_kill_non_terminal_tasks`` passed ``resources`` into
    ``_terminate_task`` unconditionally. Reservation-holder tasks never commit
    on assignment (see ``_assign_task``), so decommitting them on termination
    subtracts chips that were never added — on a worker co-tenanted by a real
    task, this floored ``committed_*`` below the co-tenant's true reservation,
    letting the scheduler double-book the VM (seen in prod: two v5p-8 jobs on
    the same 4-chip VM, with the second crashing on ``/dev/vfio/0 busy``).
    """
    # NOTE(review): local import flagged in review — per AGENTS.md this belongs
    # in the top-level import block unless a circular dependency forces it here;
    # confirm and hoist if possible.
    from iris.cluster.controller.transitions import _kill_non_terminal_tasks

    worker_id = harness.add_worker("w1")

    # A real (non-holder) task commits resources on dispatch; its committed
    # counters are the baseline the holder's finalization must leave untouched.
    real_tasks = harness.submit("real-job", replicas=1)
    harness.dispatch(real_tasks[0], worker_id)

    baseline_cpu = _query_worker(harness.state, worker_id).committed_cpu_millicores
    baseline_mem = _query_worker(harness.state, worker_id).committed_mem
    assert baseline_cpu > 0

    # Submit a second job and flag it as a reservation holder directly in the
    # DB (parameterized UPDATE), then dispatch it onto the same worker.
    holder_tasks = harness.submit("holder-job", replicas=1)
    holder_job_id = JobName.root("test-user", "holder-job")
    harness.state._db.execute(
        "UPDATE jobs SET is_reservation_holder = 1 WHERE job_id = ?",
        (holder_job_id.to_wire(),),
    )
    dispatch_task(harness.state, holder_tasks[0], worker_id)

    # Holder did not consume capacity on assignment.
    assert _query_worker(harness.state, worker_id).committed_cpu_millicores == baseline_cpu
    assert _query_worker(harness.state, worker_id).committed_mem == baseline_mem

    # Exercise the exact finalization path: _finalize_terminal_job cascades to
    # the holder sub-job via _kill_non_terminal_tasks. cancel_job has its own
    # inline gated path and doesn't cover this.
    with harness.state._db.transaction() as cur:
        _kill_non_terminal_tasks(
            cur,
            harness.state._db.endpoints,
            holder_job_id.to_wire(),
            "Job finalized",
            0,
        )

    # Holder's termination must not touch the co-tenant's committed counters.
    assert (
        _query_worker(harness.state, worker_id).committed_cpu_millicores == baseline_cpu
    ), "holder finalization leaked committed_cpu_millicores onto co-tenant's reservation"
    assert (
        _query_worker(harness.state, worker_id).committed_mem == baseline_mem
    ), "holder finalization leaked committed_mem onto co-tenant's reservation"


def test_max_failures_kills_direct_provider_tasks(state):
"""When a task fails and triggers kill of siblings, direct-provider tasks appear in tasks_to_kill."""
task_ids = _submit_job_direct(state, "/user/job1", replicas=2, max_retries_failure=0)
Expand Down
Loading