Skip to content

Commit 2a92b20

Browse files
github-actions[bot], rjpower, and claude
committed
fix: add force-kill fallback and resolve safety to graceful actor shutdown
- Use handle._resolve() instead of handle._actor_ref to handle name-based handles that haven't been resolved yet
- Wait on __ray_terminate__ refs with a 30s timeout
- Force-kill via ray.kill() any actors that don't terminate in time
- Addresses review feedback from both Claude and Codex reviews

Co-authored-by: Russell Power <rjpower@users.noreply.github.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 3ddaca3 commit 2a92b20

1 file changed

Lines changed: 24 additions & 5 deletions

File tree

lib/fray/src/fray/v2/ray_backend/backend.py

Lines changed: 24 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -611,17 +611,36 @@ def is_done(self) -> bool:
611611
"""Ray actors managed by Zephyr don't have an independent job lifecycle."""
612612
return False
613613

614-
def shutdown(self) -> None:
615-
"""Gracefully terminate all Ray actors.
614+
def shutdown(self, graceful_timeout_s: float = 30.0) -> None:
615+
"""Gracefully terminate all Ray actors, with force-kill fallback.
616616
617617
Uses __ray_terminate__ instead of ray.kill() so that in-flight tasks
618618
finish before the actor exits. ray.kill() races with task completion
619619
callbacks in Ray's C++ task_manager, triggering a fatal assertion
620620
(ray-project/ray#54260). __ray_terminate__ queues behind pending
621-
tasks and escalates to a force-kill after 30 s.
621+
tasks; we wait up to *graceful_timeout_s* then force-kill stragglers.
622622
"""
623+
# Phase 1: request graceful termination for all actors.
624+
terminate_refs: list[tuple[RayActorHandle, ray.ObjectRef]] = []
623625
for handle in self._handles:
624626
try:
625-
handle._actor_ref.__ray_terminate__.remote()
627+
ref = handle._resolve().__ray_terminate__.remote()
628+
terminate_refs.append((handle, ref))
626629
except Exception as e:
627-
logger.warning("Failed to terminate Ray actor: %s", e)
630+
logger.warning("Failed to send terminate to Ray actor: %s", e)
631+
632+
# Phase 2: wait for graceful termination with a timeout.
633+
pending = [ref for _, ref in terminate_refs]
634+
if pending:
635+
try:
636+
_, still_pending = ray.wait(pending, num_returns=len(pending), timeout=graceful_timeout_s)
637+
except Exception:
638+
still_pending = pending
639+
# Phase 3: force-kill actors that did not terminate in time.
640+
still_pending_set = set(still_pending)
641+
for handle, ref in terminate_refs:
642+
if ref in still_pending_set:
643+
try:
644+
ray.kill(handle._resolve())
645+
except Exception as e:
646+
logger.warning("Failed to force-kill Ray actor: %s", e)

0 commit comments

Comments
 (0)