Skip to content

Commit 2509935

Browse files
committed
[iris] Add heartbeat and slice-lifecycle debug logging
Per-task slow_log timers on submit_task (500ms) and synchronous kill (2000ms) inside handle_heartbeat identify which task stalls a heartbeat. Worker service heartbeat entrypoint gets an outer slow_log (1000ms) and a DEBUG payload-size line to correlate with controller-side sync timing. Slice ready/failed transitions log registered worker counts and ids to expose partial bootstrap on large slices. Complements #4792 and #4793.
1 parent 489bfc0 commit 2509935

File tree

3 files changed

+31
-3
lines changed

3 files changed

+31
-3
lines changed

lib/iris/src/iris/cluster/controller/autoscaler/scaling_group.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -466,6 +466,13 @@ def mark_slice_ready(self, slice_id: str, worker_ids: list[str], timestamp: Time
466466
state.last_active = timestamp
467467
if state is not None:
468468
self._db_upsert_slice(slice_id, state)
469+
logger.info(
470+
"slice ready group=%s slice=%s n_workers=%d worker_ids=%s",
471+
self._config.name,
472+
slice_id,
473+
len(worker_ids),
474+
worker_ids,
475+
)
469476

470477
def mark_slice_failed(self, slice_id: str, error_message: str = "") -> None:
471478
"""Mark a slice as FAILED. Called when bootstrap fails."""
@@ -474,8 +481,19 @@ def mark_slice_failed(self, slice_id: str, error_message: str = "") -> None:
474481
if state is not None:
475482
state.lifecycle = SliceLifecycleState.FAILED
476483
state.error_message = error_message
484+
registered = list(state.worker_ids)
485+
else:
486+
registered = []
477487
if state is not None:
478488
self._db_upsert_slice(slice_id, state)
489+
logger.warning(
490+
"slice failed group=%s slice=%s n_registered=%d registered=%s error=%s",
491+
self._config.name,
492+
slice_id,
493+
len(registered),
494+
registered,
495+
error_message,
496+
)
479497

480498
def reconcile(self) -> None:
481499
"""Discover and adopt existing slices from the cloud.

lib/iris/src/iris/cluster/worker/service.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from iris.rpc import job_pb2
1919
from iris.rpc import worker_pb2
2020
from iris.rpc.errors import rpc_error_handler
21+
from rigging.log_setup import slow_log
2122
from rigging.timing import Timer
2223

2324
logger = logging.getLogger(__name__)
@@ -100,7 +101,7 @@ def heartbeat(
100101
101102
Processes tasks_to_run and tasks_to_kill, then returns current state.
102103
"""
103-
with rpc_error_handler("heartbeat"):
104+
with rpc_error_handler("heartbeat"), slow_log(logger, "heartbeat rpc", threshold_ms=1000):
104105
# Chaos injection for testing heartbeat failures and delays
105106
if rule := chaos("worker.heartbeat"):
106107
if rule.delay_seconds > 0:
@@ -111,6 +112,13 @@ def heartbeat(
111112
if not rule.delay_seconds:
112113
raise RuntimeError("chaos: worker.heartbeat")
113114

115+
logger.debug(
116+
"heartbeat rpc received n_run=%d n_kill=%d n_expected=%d req_bytes=%d",
117+
len(request.tasks_to_run),
118+
len(request.tasks_to_kill),
119+
len(request.expected_tasks),
120+
request.ByteSize(),
121+
)
114122
# Delegate to worker for reconciliation
115123
return self._provider.handle_heartbeat(request)
116124

lib/iris/src/iris/cluster/worker/worker.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -734,7 +734,8 @@ def handle_heartbeat(self, request: job_pb2.HeartbeatRequest) -> job_pb2.Heartbe
734734
with slow_log(logger, "heartbeat submit_tasks", threshold_ms=200):
735735
for run_req in request.tasks_to_run:
736736
try:
737-
self.submit_task(run_req)
737+
with slow_log(logger, f"heartbeat submit_task[{run_req.task_id}]", threshold_ms=500):
738+
self.submit_task(run_req)
738739
logger.info("Heartbeat: submitted task %s", run_req.task_id)
739740
except Exception as e:
740741
logger.warning("Heartbeat: failed to submit task %s: %s", run_req.task_id, e)
@@ -747,7 +748,8 @@ def handle_heartbeat(self, request: job_pb2.HeartbeatRequest) -> job_pb2.Heartbe
747748
try:
748749
current = self._get_current_attempt(task_id)
749750
if current:
750-
self._kill_task_attempt(task_id, current.attempt_id, async_kill=False)
751+
with slow_log(logger, f"heartbeat kill_task[{task_id}]", threshold_ms=2000):
752+
self._kill_task_attempt(task_id, current.attempt_id, async_kill=False)
751753
logger.info("Heartbeat: killed task %s", task_id)
752754
except Exception as e:
753755
logger.warning("Heartbeat: failed to kill task %s: %s", task_id, e)

0 commit comments

Comments
 (0)