Skip to content

Commit e4121ee

Browse files
yonromai, yoblin, claude
authored
fix: worker processes exit cleanly after SHUTDOWN (#4099)
## Summary Fixes #4098 — worker processes now exit cleanly after receiving SHUTDOWN instead of blocking forever. - **Root cause**: `_host_actor()` called `threading.Event().wait()` on an anonymous Event with no external reference — nothing could ever call `.set()`. The non-daemon `ActorServer` thread independently kept the process alive. - **Fix**: Add `request_shutdown()` to the fray actor API. It sets a module-level shutdown event that `_host_actor` waits on. When signaled, `_host_actor` calls `server.stop()` for clean teardown. `ZephyrWorker` calls `request_shutdown()` when its polling loop ends (after SHUTDOWN or coordinator death). No-op on Ray/local backends. Uses a module-level global (not ContextVar) so child threads can reach it. ## Test plan - [x] `test_request_shutdown_unblocks_wait` — sets the event from the same thread - [x] `test_request_shutdown_noop_outside_actor` — no-op on Ray/local backends - [x] `test_host_actor_shutdown_stops_server` — full lifecycle: start server, signal shutdown, verify server + threads exit - [x] `test_request_shutdown_works_from_child_thread` — cross-thread: child thread signals shutdown event - [ ] Integration: run a Zephyr pipeline and verify workers exit after SHUTDOWN without external kill --- 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-authored-by: yoblin <268258002+yoblin@users.noreply.github.com> Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 3de7918 commit e4121ee

5 files changed

Lines changed: 136 additions & 5 deletions

File tree

lib/fray/src/fray/v2/__init__.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,15 @@
33

44
"""Fray v2: minimal job and actor scheduling interface."""
55

6-
from fray.v2.actor import ActorContext, ActorFuture, ActorGroup, ActorHandle, ActorMethod, current_actor
6+
from fray.v2.actor import (
7+
ActorContext,
8+
ActorFuture,
9+
ActorGroup,
10+
ActorHandle,
11+
ActorMethod,
12+
current_actor,
13+
request_shutdown,
14+
)
715
from fray.v2.client import Client, JobAlreadyExists, JobFailed, JobHandle, current_client, set_current_client, wait_all
816
from fray.v2.local_backend import LocalActorHandle, LocalActorMethod, LocalClient, LocalJobHandle
917
from fray.v2.types import (
@@ -61,6 +69,7 @@
6169
"current_actor",
6270
"current_client",
6371
"get_tpu_topology",
72+
"request_shutdown",
6473
"set_current_client",
6574
"wait_all",
6675
]

lib/fray/src/fray/v2/actor.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
from __future__ import annotations
1212

13+
import threading
1314
from contextvars import ContextVar
1415
from dataclasses import dataclass
1516
from typing import Any, Protocol
@@ -50,6 +51,10 @@ def shutdown(self) -> None:
5051

5152
_current_actor_ctx: ContextVar[ActorContext | None] = ContextVar("actor_context", default=None)
5253

54+
# Module-level (not ContextVar) so child threads spawned by the actor can call
55+
# request_shutdown(). _host_actor runs one actor per process, so global scope is correct.
56+
_actor_shutdown_event: threading.Event | None = None
57+
5358

5459
def current_actor() -> ActorContext:
5560
"""Get the current actor's context. Must be called from within an actor.
@@ -75,6 +80,27 @@ def _reset_current_actor(token):
7580
_current_actor_ctx.reset(token)
7681

7782

83+
# Module-level (not ContextVar) so child threads spawned by the actor can call
# request_shutdown(). _host_actor runs one actor per process, so global scope is correct.
_actor_shutdown_event: threading.Event | None = None


def request_shutdown() -> None:
    """Signal that the hosting actor process should exit.

    Call from within an actor (e.g. after receiving SHUTDOWN from a coordinator)
    to unblock _host_actor and trigger a clean server teardown. No-op when
    running under a backend that doesn't use _host_actor (Ray, LocalClient).
    """
    # Bind the global to a local before using it: _clear_shutdown_event() runs
    # concurrently on the host thread (just before server.stop()), so a
    # check-then-call on the global itself races — the global could become None
    # between the `is not None` test and the .set() call, raising AttributeError.
    # Setting an already-cleared event via the stale local is harmless.
    event = _actor_shutdown_event
    if event is not None:
        event.set()


def _set_shutdown_event(event: threading.Event) -> None:
    """Install *event* as the process-wide shutdown signal (called by _host_actor)."""
    global _actor_shutdown_event
    _actor_shutdown_event = event


def _clear_shutdown_event() -> None:
    """Uninstall the shutdown event; request_shutdown() becomes a no-op afterwards."""
    global _actor_shutdown_event
    _actor_shutdown_event = None
78104
class ActorFuture(Protocol):
79105
"""Future for an actor method call."""
80106

lib/fray/src/fray/v2/iris_backend.py

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,16 @@
3535
from iris.cluster.types import Entrypoint as IrisEntrypoint
3636
from iris.rpc import cluster_pb2
3737

38-
from fray.v2.actor import ActorContext, ActorFuture, ActorHandle, HostedActor, _reset_current_actor, _set_current_actor
38+
from fray.v2.actor import (
39+
ActorContext,
40+
ActorFuture,
41+
ActorHandle,
42+
HostedActor,
43+
_clear_shutdown_event,
44+
_reset_current_actor,
45+
_set_current_actor,
46+
_set_shutdown_event,
47+
)
3948
from fray.v2.client import JobAlreadyExists as FrayJobAlreadyExists
4049
from fray.v2.types import (
4150
ActorConfig,
@@ -216,6 +225,11 @@ def _host_actor(actor_class: type, args: tuple, kwargs: dict, name_prefix: str)
216225
actor_name = f"{ctx.job_id}/{name_prefix}-{job_info.task_index}"
217226
logger.info(f"Starting actor: {actor_name} (job_id={ctx.job_id})")
218227

228+
# Shutdown event lets the actor signal that the hosting process should exit.
229+
# request_shutdown() sets this event, unblocking the wait below.
230+
shutdown_event = threading.Event()
231+
_set_shutdown_event(shutdown_event)
232+
219233
# Create handle BEFORE instance so actor can access it during __init__
220234
handle = IrisActorHandle(actor_name)
221235
actor_ctx = ActorContext(handle=handle, index=job_info.task_index, group_name=name_prefix)
@@ -236,8 +250,11 @@ def _host_actor(actor_class: type, args: tuple, kwargs: dict, name_prefix: str)
236250
ctx.registry.register(actor_name, address)
237251
logger.info(f"Actor {actor_name} ready and listening")
238252

239-
# Block forever — job termination kills the process
240-
threading.Event().wait()
253+
# Block until the actor signals shutdown via request_shutdown()
254+
shutdown_event.wait()
255+
logger.info(f"Actor {actor_name} shutting down")
256+
_clear_shutdown_event()
257+
server.stop()
241258

242259

243260
class IrisActorHandle:

lib/zephyr/src/zephyr/execution.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import pyarrow as pa
3535
from iris.marin_fs import open_url, url_to_fs
3636
from fray.v2 import ActorConfig, ActorFuture, ActorHandle, Client, ResourceConfig
37+
from fray.v2.actor import request_shutdown
3738
from fray.v2.client import JobHandle
3839
from fray.v2.types import Entrypoint, JobRequest
3940
from iris.marin_fs import marin_temp_bucket
@@ -928,7 +929,8 @@ def _run_polling(self, coordinator: ActorHandle) -> None:
928929
finally:
929930
self._shutdown_event.set()
930931
heartbeat_thread.join(timeout=5.0)
931-
logger.debug("[%s] Polling loop ended", self._worker_id)
932+
logger.debug("[%s] Polling loop ended, requesting host shutdown", self._worker_id)
933+
request_shutdown()
932934

933935
def _heartbeat_loop(
934936
self, coordinator: ActorHandle, interval: float = 5.0, max_consecutive_failures: int = 5

tests/test_host_actor_shutdown.py

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
# Copyright The Marin Authors
2+
# SPDX-License-Identifier: Apache-2.0
3+
4+
"""Regression tests for #4098: _host_actor blocks forever after SHUTDOWN."""
5+
6+
import threading
7+
8+
from iris.actor.server import ActorServer
9+
from iris.managed_thread import thread_container_scope
10+
11+
from fray.v2.actor import _clear_shutdown_event, _set_shutdown_event, request_shutdown
12+
13+
14+
class _Noop:
    """Trivial actor with a single method, used to give ActorServer something to host."""

    def ping(self) -> str:
        """Return a fixed response so callers can confirm the actor is reachable."""
        return "pong"
17+
18+
19+
def test_request_shutdown_unblocks_wait():
    """request_shutdown() must set the installed event so _host_actor's wait returns."""
    shutdown_event = threading.Event()
    _set_shutdown_event(shutdown_event)
    try:
        assert not shutdown_event.is_set()
        request_shutdown()
        assert shutdown_event.is_set()
    finally:
        # Restore module-level state so later tests see no installed event.
        _clear_shutdown_event()
28+
29+
30+
def test_request_shutdown_noop_outside_actor():
    """Calling request_shutdown() with no hosted actor must be a silent no-op.

    Ray and local backends never install a shutdown event, so the call must
    simply return without raising.
    """
    request_shutdown()
33+
34+
35+
def test_host_actor_shutdown_stops_server():
    """A shutdown signal must unblock the host thread and tear the ActorServer down."""
    with thread_container_scope("test-shutdown") as threads:
        server = ActorServer(host="127.0.0.1", port=0, threads=threads)
        server.register("test-actor", _Noop())
        server.serve_background()

        shutdown_event = threading.Event()
        host_done = threading.Event()

        # Mirror _host_actor's tail: block on the event, then stop the server.
        def run_host():
            shutdown_event.wait()
            server.stop()
            host_done.set()

        hosting = threading.Thread(target=run_host, daemon=True)
        hosting.start()
        assert threads.is_alive

        # Signal shutdown and verify the host thread and server threads wind down.
        shutdown_event.set()
        assert host_done.wait(timeout=5.0)
        hosting.join(timeout=2.0)
        assert not hosting.is_alive()
        assert not threads.is_alive
59+
60+
61+
def test_request_shutdown_works_from_child_thread():
    """A thread spawned by the actor (e.g. its polling thread) can signal shutdown."""
    shutdown_event = threading.Event()
    _set_shutdown_event(shutdown_event)
    try:
        signalled = threading.Event()

        # The child thread — not the installer — calls request_shutdown().
        def worker():
            request_shutdown()
            signalled.set()

        child = threading.Thread(target=worker, daemon=True)
        child.start()
        assert signalled.wait(timeout=2.0)
        assert shutdown_event.is_set()
    finally:
        # Restore module-level state for subsequent tests.
        _clear_shutdown_event()

0 commit comments

Comments
 (0)