Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions lib/iris/src/iris/cluster/controller/transitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,17 +191,34 @@ def task_updates_from_proto(entries) -> list[TaskUpdate]:

Skips UNSPECIFIED/PENDING — the controller is only interested in
transitions to BUILDING or beyond.

TASK_STATE_MISSING is a worker-only signal (the worker lost the task or
reconciled it away). It is translated to WORKER_FAILED so the retry
machinery rolls the task back to PENDING; see issue #5041 for why we
never let a worker-reported KILLED survive onto a task the controller
did not itself ask to kill.
"""
updates: list[TaskUpdate] = []
for entry in entries:
if entry.state in (job_pb2.TASK_STATE_UNSPECIFIED, job_pb2.TASK_STATE_PENDING):
continue
new_state = entry.state
error = entry.error or None
if new_state == job_pb2.TASK_STATE_MISSING:
Copy link
Copy Markdown
Contributor

@ravwojdyla ravwojdyla Apr 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should log here, no? To leave trace for future debugging.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, good call, I should do a pass and add a lot more logging to Iris in general.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I should do a pass and add a lot more logging to Iris in general.

+1

new_state = job_pb2.TASK_STATE_WORKER_FAILED
error = error or "Worker reported task as missing"
logger.info(
"Translating worker-reported MISSING to WORKER_FAILED for task %s attempt %d: %s",
entry.task_id,
entry.attempt_id,
error,
)
updates.append(
TaskUpdate(
task_id=JobName.from_wire(entry.task_id),
attempt_id=entry.attempt_id,
new_state=entry.state,
error=entry.error or None,
new_state=new_state,
error=error,
exit_code=entry.exit_code if entry.HasField("exit_code") else None,
resource_usage=entry.resource_usage if entry.resource_usage.ByteSize() > 0 else None,
container_id=entry.container_id or None,
Expand Down
103 changes: 65 additions & 38 deletions lib/iris/src/iris/cluster/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -794,14 +794,22 @@ def _encode_task_status(self, task: TaskAttempt, task_id: str) -> job_pb2.Worker
return entry

@staticmethod
def _missing_task_status(task_id: str, expected_attempt_id: int) -> job_pb2.WorkerTaskStatus:
"""Status for an expected task that the worker has no record of (lost state)."""
def _missing_task_status(task_id: str, expected_attempt_id: int, error: str) -> job_pb2.WorkerTaskStatus:
"""Signal to the controller that this worker has no live record of the task.

TASK_STATE_MISSING is a transport-only signal: the worker either lost its
state (worker restart between heartbeats) or reconciled the task away
locally. The controller translates MISSING to the appropriate task
outcome (WORKER_FAILED with preemption-retry semantics) — a missing
task must never be interpreted as a user-requested KILLED, which would
cascade to rolling up the whole parent job. See issue #5041.
"""
return job_pb2.WorkerTaskStatus(
task_id=task_id,
attempt_id=expected_attempt_id,
state=job_pb2.TASK_STATE_WORKER_FAILED,
state=job_pb2.TASK_STATE_MISSING,
exit_code=0,
error="Task not found on worker",
error=error,
finished_at=timestamp_to_proto(Timestamp.now()),
)

Expand All @@ -822,11 +830,14 @@ def _prune_and_get_recent_submission_keys(self) -> set[tuple[str, int]]:
def _reconcile_expected_tasks(
self,
expected_entries,
) -> tuple[list[job_pb2.WorkerTaskStatus], list[tuple[str, int]]]:
) -> tuple[list[job_pb2.WorkerTaskStatus], list[TaskAttempt]]:
"""Build status entries for expected tasks; collect non-terminal local tasks
not in the expected set as targets to kill.
not in the expected set as TaskAttempts to kill.

Caller must hold ``self._lock``.
Caller must hold ``self._lock``. Tasks selected for reconciliation-kill
are removed from ``self._tasks`` before return so subsequent polls
surface them via the not-found → MISSING path rather than echoing a
worker-internal KILLED back to the controller (see issue #5041).

Freshly-submitted tasks (``self._recent_submissions``) are protected
from reconciliation kills via the grace window, which covers the
Expand All @@ -842,15 +853,21 @@ def _reconcile_expected_tasks(
expected_keys.add(key)
task = self._tasks.get(key)
if task is None:
tasks.append(self._missing_task_status(task_id, expected_attempt_id))
logger.warning(
"Reporting TASK_STATE_MISSING for expected task %s attempt %d (not found on worker)",
task_id,
expected_attempt_id,
)
tasks.append(self._missing_task_status(task_id, expected_attempt_id, "Task not found on worker"))
else:
tasks.append(self._encode_task_status(task, task_id))
expected_keys |= self._prune_and_get_recent_submission_keys()
tasks_to_kill: list[tuple[str, int]] = []
for key, task in self._tasks.items():
attempts_to_kill: list[TaskAttempt] = []
for key, task in list(self._tasks.items()):
if key not in expected_keys and task.status not in self._TERMINAL_STATES:
tasks_to_kill.append(key)
return tasks, tasks_to_kill
attempts_to_kill.append(task)
del self._tasks[key]
Comment on lines +868 to +869
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Delay removing unexpected attempts until kill is initiated

Deleting the attempt from self._tasks inside reconciliation before kill execution can misclassify successful work as missing. In the StartTasks→PollTasks race, an attempt can become terminal between this deletion and the later _kill_attempt(...) call; _kill_attempt then no-ops on terminal states, but the worker has already dropped tracking for that attempt. On the next poll (when the controller finally includes that task in expected_tasks), the worker reports MISSING, which the controller maps to WORKER_FAILED and retries, causing duplicate execution and potentially incorrect outcomes for short-lived tasks.

Useful? React with 👍 / 👎.

return tasks, attempts_to_kill

def _collect_resource_metrics(self) -> job_pb2.WorkerResourceSnapshot:
"""Collect host metrics with running-task and process aggregates filled in."""
Expand All @@ -874,10 +891,13 @@ def handle_heartbeat(self, request: job_pb2.HeartbeatRequest) -> job_pb2.Heartbe
2. Kill tasks_to_kill — synchronously, blocks until old process is stopped
so the controller does not assign new work while old tasks hold resources
3. Reconcile expected_tasks — for each expected task, report its current
state. If not found in self._tasks, report WORKER_FAILED ("Task not
found on worker"). This happens when the worker has reset its state
(_tasks.clear() in _reset_worker_state) between heartbeats — from
the controller's perspective this is equivalent to a worker restart.
state. If not found in self._tasks, or if locally KILLED (worker
self-kill during reconciliation or supersede), report
TASK_STATE_MISSING so the controller retries via WORKER_FAILED
instead of accepting an unsolicited KILLED. This happens when the
worker has reset its state (_tasks.clear() in _reset_worker_state)
between heartbeats — from the controller's perspective equivalent
to a worker restart.
4. Kill unexpected tasks — any non-terminal task in self._tasks that is
NOT in expected_tasks and is not within the recent-submission grace
window is killed (controller no longer wants it). The grace window
Expand Down Expand Up @@ -926,12 +946,16 @@ def handle_heartbeat(self, request: job_pb2.HeartbeatRequest) -> job_pb2.Heartbe
# self._recent_submissions and are protected from the race by
# _reconcile_expected_tasks' grace-window logic.
with self._lock:
tasks, tasks_to_kill = self._reconcile_expected_tasks(request.expected_tasks)
tasks, attempts_to_kill = self._reconcile_expected_tasks(request.expected_tasks)

# Kill removed tasks asynchronously outside lock to avoid deadlock
for task_id, attempt_id in tasks_to_kill:
logger.warning("Killing task %s attempt %d (no longer in expected_tasks)", task_id, attempt_id)
self._kill_task_attempt(task_id, attempt_id, async_kill=True)
for attempt in attempts_to_kill:
logger.warning(
"Killing task %s attempt %d (no longer in expected_tasks)",
attempt.task_id,
attempt.attempt_id,
)
self._kill_attempt(attempt, async_kill=True)

# Collect host metrics and aggregate task stats
with slow_log(logger, "heartbeat host_metrics", threshold_ms=100):
Expand Down Expand Up @@ -996,10 +1020,10 @@ def handle_poll_tasks(self, request: worker_pb2.Worker.PollTasksRequest) -> work
applied in _reconcile_expected_tasks.
"""
with self._lock:
tasks, tasks_to_kill = self._reconcile_expected_tasks(request.expected_tasks)
for task_id, attempt_id in tasks_to_kill:
logger.warning("PollTasks: killing task %s attempt %d (unexpected)", task_id, attempt_id)
self._kill_task_attempt(task_id, attempt_id, async_kill=True)
tasks, attempts_to_kill = self._reconcile_expected_tasks(request.expected_tasks)
for attempt in attempts_to_kill:
logger.warning("PollTasks: killing task %s attempt %d (unexpected)", attempt.task_id, attempt.attempt_id)
self._kill_attempt(attempt, async_kill=True)
return worker_pb2.Worker.PollTasksResponse(tasks=tasks)

def _kill_task_attempt(
Expand All @@ -1009,37 +1033,40 @@ def _kill_task_attempt(
term_timeout_ms: int = 5000,
async_kill: bool = False,
) -> bool:
"""Kill a specific task attempt.

Args:
task_id: Wire-format task ID.
attempt_id: Attempt number to kill.
term_timeout_ms: Time to wait for graceful shutdown before SIGKILL.
async_kill: If True, signal the task immediately but perform the
container stop/wait/force-kill sequence in a daemon thread.
Used by heartbeat to avoid blocking the RPC response.
"""
"""Kill a specific task attempt by (task_id, attempt_id) lookup."""
task = self._tasks.get((task_id, attempt_id))
if not task:
return False
return self._kill_attempt(task, term_timeout_ms=term_timeout_ms, async_kill=async_kill)

# Check if already in terminal state
def _kill_attempt(
self,
task: TaskAttempt,
term_timeout_ms: int = 5000,
async_kill: bool = False,
) -> bool:
"""Kill a TaskAttempt we already hold a reference to.

Split from ``_kill_task_attempt`` so callers that have already removed
the attempt from ``self._tasks`` (reconciliation kill) can still kill
it. ``async_kill=True`` signals the execution thread immediately and
performs the container SIGTERM → wait → SIGKILL sequence in a daemon
thread so RPC handlers don't block.
"""
if task.status not in (
job_pb2.TASK_STATE_RUNNING,
job_pb2.TASK_STATE_BUILDING,
job_pb2.TASK_STATE_PENDING,
):
return False

# Set flag to signal the task's execution thread to stop.
# This is always done immediately regardless of async_kill.
task.should_stop = True

if async_kill:
thread = threading.Thread(
target=self._do_kill_container,
args=(task, term_timeout_ms),
name=f"kill-{task_id}-{attempt_id}",
name=f"kill-{task.task_id}-{task.attempt_id}",
daemon=True,
)
thread.start()
Expand Down
6 changes: 6 additions & 0 deletions lib/iris/src/iris/rpc/job.proto
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,12 @@ enum TaskState {
TASK_STATE_UNSCHEDULABLE = 8;
TASK_STATE_ASSIGNED = 9;
TASK_STATE_PREEMPTED = 10;
// Worker-only signal (never a controller-persisted state): the worker has no
// record of this task, or killed it during reconciliation. The controller
// maps MISSING to the appropriate outcome (typically WORKER_FAILED with
// preemption-retry semantics) instead of trusting a worker-reported KILLED
// it never asked for. See issue #5041.
TASK_STATE_MISSING = 11;
}

// Status of a single task within a job
Expand Down
Loading
Loading