Skip to content

Commit 223d1c4

Browse files
committed
Less confusing log messages.
1 parent 12b0df8 commit 223d1c4

File tree

1 file changed

+31
-12
lines changed

1 file changed

+31
-12
lines changed

lib/iris/src/iris/cluster/controller/controller.py

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,8 @@ class _SyncFailureAccumulator:
185185
"""Mutable accumulator for tracking failures during provider sync."""
186186

187187
fail_count: int = 0
188-
failed_workers: list[str] = field(default_factory=list)
188+
transient_failed_workers: list[str] = field(default_factory=list)
189+
terminal_failed_workers: list[str] = field(default_factory=list)
189190
all_tasks_to_kill: set[JobName] = field(default_factory=set)
190191
all_task_kill_workers: dict[JobName, WorkerId] = field(default_factory=dict)
191192

@@ -2188,7 +2189,8 @@ def _sync_all_execution_units(self) -> None:
21882189
self._log_sync_health_summary(
21892190
batch_count=len(batches),
21902191
fail_count=acc.fail_count,
2191-
failed_workers=acc.failed_workers,
2192+
transient_failed_workers=acc.transient_failed_workers,
2193+
terminal_failed_workers=acc.terminal_failed_workers,
21922194
elapsed_ms=round_timer.elapsed_ms(),
21932195
)
21942196

@@ -2252,12 +2254,12 @@ def _handle_failed_heartbeats(
22522254
)
22532255
if result.action == HeartbeatAction.WORKER_FAILED:
22542256
acc.fail_count += 1
2255-
acc.failed_workers.append(batch.worker_id)
2257+
acc.terminal_failed_workers.append(batch.worker_id)
22562258
self._provider.on_worker_failed(batch.worker_id, batch.worker_address)
22572259
primary_failed_workers.append(str(batch.worker_id))
22582260
elif result.action == HeartbeatAction.TRANSIENT_FAILURE:
22592261
acc.fail_count += 1
2260-
acc.failed_workers.append(batch.worker_id)
2262+
acc.transient_failed_workers.append(batch.worker_id)
22612263
return primary_failed_workers
22622264

22632265
def _handle_sibling_worker_failures(
@@ -2282,7 +2284,7 @@ def _handle_sibling_worker_failures(
22822284
self._provider.on_worker_failed(wid, addr)
22832285
if sibling_failures.removed_workers:
22842286
acc.fail_count += len(sibling_failures.removed_workers)
2285-
acc.failed_workers.extend(wid for wid, _ in sibling_failures.removed_workers)
2287+
acc.terminal_failed_workers.extend(wid for wid, _ in sibling_failures.removed_workers)
22862288
logger.info(
22872289
"Failed %d sibling workers from slices: %s",
22882290
len(sibling_failures.removed_workers),
@@ -2293,17 +2295,34 @@ def _log_sync_health_summary(
22932295
self,
22942296
batch_count: int,
22952297
fail_count: int,
2296-
failed_workers: list[str],
2298+
transient_failed_workers: list[str],
2299+
terminal_failed_workers: list[str],
22972300
elapsed_ms: int,
22982301
) -> None:
22992302
"""Log provider sync timing and periodic cluster health summary."""
23002303
level = logging.WARNING if elapsed_ms > _SLOW_HEARTBEAT_MS else logging.DEBUG
2301-
fmt = "Provider sync: %d workers, %d failed, %dms"
2302-
args: list[object] = [batch_count, fail_count, elapsed_ms]
2303-
if failed_workers:
2304-
fmt += " failed=[%s]"
2305-
args.append(", ".join(failed_workers))
2306-
logger.log(level, fmt, *args)
2304+
logger.log(
2305+
level,
2306+
"Provider sync: %d workers, %d failed (%d transient, %d terminal), %dms",
2307+
batch_count,
2308+
fail_count,
2309+
len(transient_failed_workers),
2310+
len(terminal_failed_workers),
2311+
elapsed_ms,
2312+
)
2313+
if transient_failed_workers:
2314+
logger.log(
2315+
level,
2316+
"Provider sync transient failures (%d): [%s]",
2317+
len(transient_failed_workers),
2318+
", ".join(transient_failed_workers),
2319+
)
2320+
if terminal_failed_workers:
2321+
logger.warning(
2322+
"Provider sync terminal failures (%d): [%s]",
2323+
len(terminal_failed_workers),
2324+
", ".join(terminal_failed_workers),
2325+
)
23072326

23082327
self._heartbeat_iteration += 1
23092328
if _HEALTH_SUMMARY_INTERVAL.should_run():

0 commit comments

Comments
 (0)