Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 61 additions & 10 deletions src/guidellm/scheduler/worker_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from __future__ import annotations

import asyncio
import contextlib
import math
import threading
import time
Expand Down Expand Up @@ -130,6 +131,10 @@ def __init__(
self.shutdown_event: Event | None = None
self.error_event: Event | None = None

# Background health monitor, created in create_processes
self._health_monitor_task: asyncio.Task | None = None
self._worker_error_details: str | None = None

# Scheduler and messaging state, created in start
self.state: WorkerGroupState[RequestT, ResponseT] | None = None
self.messaging: WorkGroupMessengerT[RequestT, ResponseT] | None = None
Expand Down Expand Up @@ -252,6 +257,8 @@ async def create_processes(self):
proc.start()
self.processes.append(proc)

self._health_monitor_task = asyncio.create_task(self._process_health_monitor())

wait_key = await wait_for_sync_objects(
{
"startup_barrier": self.startup_barrier,
Expand All @@ -262,9 +269,37 @@ async def create_processes(self):
)

if wait_key == "error_event":
raise RuntimeError(
"Worker process group startup failed: error_event is set"
)
detail = self._worker_error_details or "error_event is set"
raise RuntimeError(f"Worker process group startup failed: {detail}")

async def _process_health_monitor(self):
    """
    Poll the worker processes and report any that have terminated.

    Detects worker processes killed by OS signals (e.g. SIGSEGV, OOM) that
    bypass Python exception handling and never set error_event. Each polling
    pass sleeps for ``settings.mp_poll_interval`` and then inspects every
    process in ``self.processes``. When at least one process is no longer
    alive and has an exit code, a description of every such process is logged,
    the combined text is stored in ``self._worker_error_details``,
    ``self.error_event`` is set (when it exists), and the monitor stops.

    The loop also ends on its own once ``self.processes`` is empty or unset.
    """
    while self.processes:
        await asyncio.sleep(settings.mp_poll_interval)

        failures: list[str] = []
        signalled = False
        for proc in self.processes:
            # Only processes that have stopped AND report an exit code count.
            if proc.is_alive():
                continue
            if proc.exitcode is None:
                continue

            # A negative exit code is reported as the negated signal number.
            if proc.exitcode < 0:
                signalled = True
                exit_info = f"signal {-proc.exitcode}"
            else:
                exit_info = f"exit code {proc.exitcode}"

            report = f"Worker process {proc.pid} died unexpectedly ({exit_info})"
            logger.error(report)
            failures.append(report)

        if not failures:
            # Nothing terminated during this pass; keep polling.
            continue

        summary = "; ".join(failures)
        if signalled:
            summary += ". Check system logs for details"

        # Store the details before setting the event so that code reacting to
        # error_event can include them in the error it raises.
        self._worker_error_details = summary
        if self.error_event is not None:
            self.error_event.set()
        return

async def start(self, start_time: float):
"""
Expand Down Expand Up @@ -314,10 +349,11 @@ async def start(self, start_time: float):
if (wait_time := start_time - time.time()) > 0:
await asyncio.sleep(wait_time)
if self.error_event.is_set():
raise RuntimeError(
"error_event is set in WorkerProcessGroup, "
"indicating an error occurred in one of the worker processes."
detail = (
self._worker_error_details
or "an error occurred in one of the worker processes"
)
raise RuntimeError(f"error_event is set in WorkerProcessGroup: {detail}")

async def request_updates(
self,
Expand All @@ -343,9 +379,12 @@ async def request_updates(
while True:
if self.error_event.is_set(): # type: ignore[union-attr]
logger.error("Error event set in WorkerProcessGroup")
detail = (
self._worker_error_details
or "an error occurred in one of the worker processes"
)
raise RuntimeError(
"error_event is set in WorkerProcessGroup, "
"indicating an error occurred in one of the worker processes."
f"error_event is set in WorkerProcessGroup: {detail}"
)

try:
Expand Down Expand Up @@ -373,6 +412,13 @@ async def shutdown(self) -> list[Exception]: # noqa: C901
:return: List of exceptions encountered during shutdown; empty if no errors
"""
exceptions: list[Exception] = []

if self._health_monitor_task is not None:
self._health_monitor_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await self._health_monitor_task
self._health_monitor_task = None

if self.shutdown_event is not None:
self.shutdown_event.set()

Expand All @@ -390,10 +436,15 @@ async def shutdown(self) -> list[Exception]: # noqa: C901
for proc in self.processes:
try:
await asyncio.to_thread(proc.join, timeout=5.0)
if proc.exitcode is not None and proc.exitcode > 0:
if proc.exitcode is not None and proc.exitcode != 0:
exit_info = (
f"signal {-proc.exitcode}"
if proc.exitcode < 0
else f"exit code {proc.exitcode}"
)
exceptions.append(
RuntimeError(
f"Worker {proc.pid} exited with code {proc.exitcode}"
f"Worker {proc.pid} exited abnormally ({exit_info})"
)
)
except Exception as err: # noqa: BLE001
Expand Down