Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 59 additions & 15 deletions python/ray/_private/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@
)


def _posix_spawn_can_close_fds() -> bool:
"""Return whether CPython can use posix_spawn with close_fds=True."""
return hasattr(os, "POSIX_SPAWN_CLOSEFROM")


def _site_flags() -> List[str]:
"""Detect whether flags related to site packages are enabled for the current
interpreter. To run Ray in hermetic build environments, it helps to pass these flags
Expand Down Expand Up @@ -745,7 +750,7 @@ def extract_ip_port(bootstrap_address: str):
ip_port = parse_address(bootstrap_address)
if ip_port is None:
raise ValueError(
f"Malformed address {bootstrap_address}. " f"Expected '<host>:<port>'."
f"Malformed address {bootstrap_address}. Expected '<host>:<port>'."
)
ip, port = ip_port
try:
Expand All @@ -754,8 +759,7 @@ def extract_ip_port(bootstrap_address: str):
raise ValueError(f"Malformed address port {port}. Must be an integer.")
if port < 1024 or port > 65535:
raise ValueError(
f"Invalid address port {port}. Must be between 1024 "
"and 65535 (inclusive)."
f"Invalid address port {port}. Must be between 1024 and 65535 (inclusive)."
)
return ip, port

Expand Down Expand Up @@ -867,6 +871,7 @@ def start_ray_process(
stdout_file: Optional[IO[AnyStr]] = None,
stderr_file: Optional[IO[AnyStr]] = None,
pipe_stdin: bool = False,
use_posix_spawn: bool = False,
):
"""Start one of the Ray processes.

Expand Down Expand Up @@ -898,6 +903,12 @@ def start_ray_process(
no redirection should happen, then this should be None.
pipe_stdin: If true, subprocess.PIPE will be passed to the process as
stdin.
use_posix_spawn: If true on POSIX, avoid preexec_fn so CPython can use
its posix_spawn fast path. On runtimes that support closing file
descriptors from posix_spawn, keep close_fds=True. Older runtimes
need close_fds=False to stay off the fork path. This also skips
Ray's SIGINT-masking preexec hook, so it is only safe for
subprocesses that do not need fate sharing or that signal mask.

Returns:
Information about the process that was started including a handle to
Expand Down Expand Up @@ -963,6 +974,7 @@ def start_ray_process(
env_updates = {}
if not isinstance(env_updates, dict):
raise ValueError("The 'env_updates' argument must be a dictionary.")
use_posix_spawn = use_posix_spawn and sys.platform != "win32"

modified_env = os.environ.copy()
modified_env.update(env_updates)
Expand Down Expand Up @@ -1015,6 +1027,9 @@ def start_ray_process(
"kernel-level fate-sharing must only be specified if "
"detect_fate_sharing_support() has returned True"
)
if use_posix_spawn and fate_share:
raise ValueError("'use_posix_spawn' cannot be combined with 'fate_share'.")
close_fds = not use_posix_spawn or _posix_spawn_can_close_fds()

def preexec_fn():
import signal
Expand All @@ -1037,20 +1052,35 @@ def preexec_fn():
total_chrs = sum([len(x) for x in command])
if total_chrs > 31766:
raise ValueError(
f"command is limited to a total of 31767 characters, "
f"got {total_chrs}"
f"command is limited to a total of 31767 characters, got {total_chrs}"
)

process = ConsolePopen(
command,
env=modified_env,
cwd=cwd,
stdout=stdout_file,
stderr=stderr_file,
stdin=subprocess.PIPE if pipe_stdin else None,
preexec_fn=preexec_fn if sys.platform != "win32" else None,
creationflags=CREATE_SUSPENDED if win32_fate_sharing else 0,
previous_sigmask = None
should_block_sigint_for_spawn = (
use_posix_spawn
and hasattr(signal, "pthread_sigmask")
and hasattr(signal, "SIG_BLOCK")
and hasattr(signal, "SIG_SETMASK")
)
if should_block_sigint_for_spawn:
previous_sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, {signal.SIGINT})
try:
process = ConsolePopen(
command,
env=modified_env,
cwd=cwd,
stdout=stdout_file,
stderr=stderr_file,
stdin=subprocess.PIPE if pipe_stdin else None,
preexec_fn=(
None if sys.platform == "win32" or use_posix_spawn else preexec_fn
),
close_fds=close_fds,
creationflags=CREATE_SUSPENDED if win32_fate_sharing else 0,
)
finally:
if previous_sigmask is not None:
signal.pthread_sigmask(signal.SIG_SETMASK, previous_sigmask)

if win32_fate_sharing:
try:
Expand Down Expand Up @@ -2473,13 +2503,27 @@ def start_ray_client_server(
if node_id:
command.append(f"--node-id={node_id}")

use_posix_spawn = server_type == "specific-server" and sys.platform != "win32"
# Specific Ray Client servers are spawned by the proxier, which is itself a
# multi-threaded gRPC server. Avoid a fork+preexec path there: gRPC may have
# active poller threads and can skip fork handlers, leaving the child to
# crash before it opens its channel. Specific servers self-terminate after
# being idle, monitor stdin EOF from setup_worker for abnormal parent death,
# and inherit a temporarily-blocked SIGINT mask from the spawning thread, so
# they can trade kernel fate sharing for a fork-safe spawn path.
process_fate_share = False if use_posix_spawn else fate_share
if use_posix_spawn:
command.append("--monitor-parent-pipe")
Comment on lines +2514 to +2516

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Preserve parent-death cleanup during server bootstrap

When a POSIX specific server is launched, this disables kernel fate sharing immediately, but the replacement stdin monitor is only started later after setup_worker has finished and ray.util.client.server.main() begins. If the proxier process dies during that bootstrap/import/runtime-env command path, the child no longer gets killed by fate sharing and has not yet started monitoring stdin EOF, so it can continue running orphaned until it eventually reaches server startup. Start the parent-pipe monitor in the setup-worker phase or keep a startup-time fate-sharing mechanism until the monitor is active.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Handled in the current branch. The parent-pipe monitor now starts in setup_worker.py before RuntimeEnvContext.deserialize() / exec_worker(), so parent death during runtime-env/bootstrap exits the child early. Because exec_worker() replaces the process with os.execvp, setup_worker also forwards --monitor-parent-pipe to the final Ray Client server, which starts its own post-exec stdin EOF monitor.

This preserves parent-death cleanup both before and after the exec boundary while keeping the fork-safe spawn path.

Validated with compileall, Ruff check, Ruff format check, and git diff --check. Targeted pytest still cannot collect locally because this checkout lacks ray._raylet.


process_info = start_ray_process(
command,
ray_constants.PROCESS_TYPE_RAY_CLIENT_SERVER,
stdout_file=stdout_file,
stderr_file=stderr_file,
fate_share=fate_share,
fate_share=process_fate_share,
env_updates=env_updates,
pipe_stdin=use_posix_spawn,
use_posix_spawn=use_posix_spawn,
)
return process_info

Expand Down
47 changes: 47 additions & 0 deletions python/ray/_private/workers/setup_worker.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import argparse
import logging
import os
import subprocess
import sys

from ray._private.ray_constants import LOGGER_FORMAT, LOGGER_LEVEL
from ray._private.ray_logging import setup_logger
Expand All @@ -8,6 +11,30 @@

logger = logging.getLogger(__name__)

_PARENT_PIPE_MONITOR_SCRIPT = """
import os
import select
import signal
import sys
target_pid = int(sys.argv[1])
while True:
readable, _, _ = select.select([sys.stdin], [], [], 1.0)
if readable:
data = os.read(sys.stdin.fileno(), 1)
if not data:
try:
os.kill(target_pid, signal.SIGTERM)
except ProcessLookupError:
pass
sys.exit(0)
try:
os.kill(target_pid, 0)
except ProcessLookupError:
sys.exit(0)
"""

parser = argparse.ArgumentParser(
description=("Set up the environment for a Ray worker and launch the worker.")
)
Expand All @@ -20,10 +47,30 @@

parser.add_argument("--language", type=str, help="the language type of the worker")

parser.add_argument(
"--monitor-parent-pipe",
required=False,
action="store_true",
help="Internal: exit when inherited stdin pipe reaches EOF.",
)


def _start_parent_pipe_monitor(enabled: bool):
if not enabled:
return None
stdin = getattr(sys.stdin, "buffer", sys.stdin)
return subprocess.Popen(
[sys.executable, "-c", _PARENT_PIPE_MONITOR_SCRIPT, str(os.getpid())],
stdin=stdin,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)


if __name__ == "__main__":
setup_logger(LOGGER_LEVEL, LOGGER_FORMAT)
args, remaining_args = parser.parse_known_args()
_start_parent_pipe_monitor(args.monitor_parent_pipe)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Keep parent monitor alive through runtime-env exec

When --monitor-parent-pipe is used with a non-empty runtime-env command_prefix, this monitor thread only exists until runtime_env_context.exec_worker(...) replaces setup_worker via os.execvp("bash", ...) in python/ray/_private/runtime_env/context.py; the daemon thread is then gone while the shell runs the prefix and before ray.util.client.server starts its own monitor. In that interval, if the proxier dies, stdin EOF is not consumed and fate_share has already been disabled for the specific server, so the child can still be orphaned during bootstrap. The monitor needs to survive the exec path or the parent-death mechanism needs to remain active until the final server monitor is running.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Handled in the current branch. The parent-pipe monitor is no longer a daemon thread inside setup_worker; _start_parent_pipe_monitor() now starts a small subprocess that inherits the same stdin pipe and targets the setup_worker PID. That process survives RuntimeEnvContext.exec_worker() / os.execvp(...), so it continues covering the runtime-env command-prefix window and the final Ray Client server process. It also exits when the target PID is gone to avoid leaking after normal server shutdown.

The final server no longer starts a second stdin reader, so there is only one monitor consuming the inherited pipe.

Validated:

  • python -m compileall -q python/ray/_private/services.py python/ray/_private/workers/setup_worker.py python/ray/util/client/server/server.py python/ray/tests/test_debug_tools.py
  • ruff check python/ray/_private/services.py python/ray/_private/workers/setup_worker.py python/ray/util/client/server/server.py python/ray/tests/test_debug_tools.py
  • ruff format --check python/ray/_private/services.py python/ray/_private/workers/setup_worker.py python/ray/util/client/server/server.py python/ray/tests/test_debug_tools.py
  • git diff --check

I also retried the targeted pytest command, but this local checkout still cannot collect Ray tests because it lacks the compiled ray._raylet extension.

# NOTE(edoakes): args.serialized_runtime_env_context is only None when
# we're starting the main Ray client proxy server. That case should
# probably not even go through this codepath.
Expand Down
Loading
Loading