
Commit 2db2b33

myandprlimarkdcunha
authored and committed
[Serve] limit num_workers in replica's ThreadPoolExecutor to num_cpus (ray-project#60271)
## Description

- Limit the user-code event loop's default `ThreadPoolExecutor` size to the deployment's `ray_actor_options["num_cpus"]` (fractional values round up; if `num_cpus` is unset, the executor defaults are left unchanged).
- This ensures `asyncio.to_thread` in Serve replicas respects the CPU reservation and avoids oversubscription.
- Added a Serve test that verifies the default executor's `max_workers` matches `num_cpus`.

## Related issues

Fixes ray-project#59750.

## Additional information

Tests run:

- python -m pytest python/ray/serve/tests/unit/test_user_callable_wrapper.py
- python -m pytest python/ray/serve/tests/test_replica_sync_methods.py

---------

Signed-off-by: yaommen <myanstu@163.com>
1 parent 44ba600 commit 2db2b33
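For reference, here is a minimal sketch (not part of this commit; the helper name is hypothetical) of the sizing rule the `replica.py` diff below introduces, using the same example values as the new parametrized test:

```python
from math import ceil
from typing import Optional


def default_executor_max_workers(num_cpus: Optional[float]) -> Optional[int]:
    # Mirrors ThreadPoolExecutor's stock default, min(32, n + 4), but uses the
    # replica's CPU reservation instead of the host's CPU count.
    if num_cpus is None:
        return None  # no reservation -> leave asyncio's default executor alone
    return min(32, max(1, ceil(num_cpus)) + 4)


# Example mappings, matching the parametrized test added below:
assert default_executor_max_workers(0) == 5
assert default_executor_max_workers(2.2) == 7   # fractional values round up
assert default_executor_max_workers(30) == 32   # capped at 32, like the stock default
```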

File tree

6 files changed, +93 -2 lines changed

doc/source/serve/advanced-guides/asyncio-best-practices.md

Lines changed: 20 additions & 0 deletions
@@ -93,6 +93,26 @@ Important differences:
 - FastAPI always dispatches `def` endpoints to a threadpool.
 - In pure Serve, `def` methods run on the event loop unless you opt into threadpool behavior.
 
+## Threadpool sizing and overrides
+
+Serve sets a default threadpool size for user code that mirrors Python's
+`ThreadPoolExecutor` defaults while respecting `ray_actor_options["num_cpus"]`.
+
+In most cases, the default is fine. If you need to tune it, you can override the default
+executor inside your deployment:
+
+```{literalinclude} ../doc_code/asyncio_best_practices.py
+:start-after: __threadpool_override_begin__
+:end-before: __threadpool_override_end__
+:language: python
+```
+
+Guidance for choosing a size:
+
+- Default is fine in most cases.
+- For I/O-blocking code, consider a threadpool larger than `num_cpus`.
+- For GIL-releasing compute (NumPy/Pandas/SciPy, etc.), keep the threadpool at or below `num_cpus`.
+
 ## Blocking versus non-blocking in practice
 
 Blocking code keeps the event loop from processing other work. Non-blocking code yields control back to the loop when it's waiting on something.
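To make the sizing guidance above concrete, here is a hedged sketch (not part of this commit; the deployment name, pool size, and use of the `requests` library are illustrative assumptions) of an I/O-heavy deployment that opts into a pool larger than its CPU reservation:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

import requests
from ray import serve


@serve.deployment(ray_actor_options={"num_cpus": 1})
class IOHeavyExample:
    def __init__(self):
        # Blocking HTTP calls spend most of their time waiting rather than
        # computing, so a pool larger than num_cpus raises throughput without
        # oversubscribing the CPU reservation.
        asyncio.get_running_loop().set_default_executor(
            ThreadPoolExecutor(max_workers=32)
        )

    async def __call__(self, request) -> int:
        resp = await asyncio.to_thread(requests.get, "https://example.com")
        return resp.status_code
```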

doc/source/serve/doc_code/asyncio_best_practices.py

Lines changed: 21 additions & 2 deletions
@@ -89,6 +89,19 @@ def fetch():
         return await asyncio.to_thread(fetch)
 # __threaded_http_end__
 
+# __threadpool_override_begin__
+from concurrent.futures import ThreadPoolExecutor
+
+@serve.deployment
+class CustomThreadPool:
+    def __init__(self):
+        loop = asyncio.get_running_loop()
+        loop.set_default_executor(ThreadPoolExecutor(max_workers=16))
+
+    async def __call__(self, request):
+        return await asyncio.to_thread(lambda: "ok")
+# __threadpool_override_end__
+
 
 # __numpy_deployment_begin__
 @serve.deployment
@@ -331,7 +344,13 @@ async def __call__(self, request):
     result = cpu_threadpool_handle.remote(None).result()
     print(f"CPUWithThreadpool result: {result}")
     assert result == "ok"
-
+
+    print("\nTesting CustomThreadPool deployment...")
+    custom_threadpool_handle = serve.run(CustomThreadPool.bind())
+    result = custom_threadpool_handle.remote(None).result()
+    print(f"CustomThreadPool result: {result}")
+    assert result == "ok"
+
     print("\nTesting BlockingStream deployment...")
     # Test BlockingStream - just verify it can be created and called
     blocking_stream_handle = serve.run(BlockingStream.bind())
@@ -389,7 +408,7 @@ async def __call__(self, request):
         print("✅ ThreadedHTTP test passed")
     except Exception as e:
         print(f"⚠️ ThreadedHTTP test failed (expected): {type(e).__name__}: {e}")
-
+
     print("\nTesting OffloadIO deployment...")
     try:
         offload_io_handle = serve.run(OffloadIO.bind())

python/ray/serve/_private/local_testing_mode.py

Lines changed: 1 addition & 0 deletions
@@ -74,6 +74,7 @@ def make_local_deployment_handle(
         local_testing_mode=True,
         deployment_config=deployment._deployment_config,
         actor_id="local",
+        ray_actor_options=deployment.ray_actor_options,
     )
     try:
         logger.info(f"Initializing local replica class for {deployment_id}.")

python/ray/serve/_private/replica.py

Lines changed: 28 additions & 0 deletions
@@ -3,6 +3,7 @@
 import functools
 import inspect
 import logging
+import math
 import os
 import pickle
 import threading
@@ -635,6 +636,7 @@ def __init__(
             local_testing_mode=False,
             deployment_config=deployment_config,
             actor_id=actor_id,
+            ray_actor_options=self._version.ray_actor_options,
         )
         self._semaphore = Semaphore(lambda: self.max_ongoing_requests)
 
@@ -1692,6 +1694,7 @@ def __init__(
         local_testing_mode: bool,
         deployment_config: DeploymentConfig,
         actor_id: str,
+        ray_actor_options: Optional[Dict] = None,
     ):
         if not (inspect.isfunction(deployment_def) or inspect.isclass(deployment_def)):
             raise TypeError(
@@ -1715,6 +1718,10 @@ def __init__(
         # Will be populated in `initialize_callable`.
         self._callable = None
         self._deployment_config = deployment_config
+        self._ray_actor_options = ray_actor_options or {}
+        self._user_code_threadpool: Optional[
+            concurrent.futures.ThreadPoolExecutor
+        ] = None
 
         if self._run_user_code_in_separate_thread:
             # All interactions with user code run on this loop to avoid blocking the
@@ -1740,6 +1747,7 @@ def _run_user_code_event_loop():
                 # Required so that calls to get the current running event loop work
                 # properly in user code.
                 asyncio.set_event_loop(self._user_code_event_loop)
+                self._configure_user_code_threadpool()
                 # Start monitoring before run_forever so the task is scheduled.
                 self._user_code_loop_monitor.start(self._user_code_event_loop)
                 self._user_code_event_loop.run_forever()
@@ -1752,11 +1760,28 @@
         else:
             self._user_code_event_loop = asyncio.get_running_loop()
             self._user_code_loop_monitor = None
+            self._configure_user_code_threadpool()
 
     @property
     def event_loop(self) -> asyncio.AbstractEventLoop:
         return self._user_code_event_loop
 
+    def _get_user_code_threadpool_max_workers(self) -> Optional[int]:
+        num_cpus = self._ray_actor_options.get("num_cpus")
+        if num_cpus is None:
+            return None
+        # Mirror ThreadPoolExecutor default behavior while respecting num_cpus.
+        return min(32, max(1, int(math.ceil(num_cpus))) + 4)
+
+    def _configure_user_code_threadpool(self) -> None:
+        max_workers = self._get_user_code_threadpool_max_workers()
+        if max_workers is None:
+            return
+        self._user_code_threadpool = concurrent.futures.ThreadPoolExecutor(
+            max_workers=max_workers
+        )
+        self._user_code_event_loop.set_default_executor(self._user_code_threadpool)
+
     def _run_user_code(f: Callable) -> Callable:
         """Decorator to run a coroutine method on the user code event loop.
 
@@ -2477,3 +2502,6 @@ async def call_destructor(self):
 
         except Exception as e:
             logger.exception(f"Exception during graceful shutdown of replica: {e}")
+        finally:
+            if self._user_code_threadpool is not None:
+                self._user_code_threadpool.shutdown(wait=False)
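As a sanity check of the mechanism this change relies on, here is a standalone sketch (plain asyncio, no Serve; names and sizes are illustrative) showing that `asyncio.to_thread` submits work to the loop's default executor, so bounding that executor bounds how many blocking calls run concurrently:

```python
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor


async def main() -> None:
    loop = asyncio.get_running_loop()
    # Replace the loop's default executor with a small, bounded pool.
    loop.set_default_executor(ThreadPoolExecutor(max_workers=2))

    worker_threads = set()

    def blocking_call() -> None:
        worker_threads.add(threading.get_ident())
        time.sleep(0.1)

    # Launch more tasks than worker threads; asyncio.to_thread runs each one
    # on the loop's default executor, so at most 2 execute at a time.
    await asyncio.gather(*(asyncio.to_thread(blocking_call) for _ in range(8)))
    print(f"distinct worker threads used: {len(worker_threads)}")  # expected: 2


if __name__ == "__main__":
    asyncio.run(main())
```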

python/ray/serve/tests/test_replica_sync_methods.py

Lines changed: 22 additions & 0 deletions
@@ -126,5 +126,27 @@ async def __call__(self):
     assert h.remote().result() == 10
 
 
+@pytest.mark.skipif(
+    not RAY_SERVE_RUN_SYNC_IN_THREADPOOL,
+    reason="Run sync method in threadpool FF disabled.",
+)
+@pytest.mark.parametrize(
+    ("num_cpus", "expected_workers"),
+    [(0, 5), (2.2, 7), (30, 32)],
+)
+def test_asyncio_default_executor_limited_by_num_cpus(
+    serve_instance, num_cpus, expected_workers
+):
+    @serve.deployment(ray_actor_options={"num_cpus": num_cpus})
+    class D:
+        async def __call__(self):
+            loop = asyncio.get_running_loop()
+            executor = loop._default_executor
+            return executor._max_workers if executor is not None else None
+
+    h = serve.run(D.bind())
+    assert h.remote().result() == expected_workers
+
+
 if __name__ == "__main__":
     sys.exit(pytest.main(["-v", "-s", __file__]))

python/ray/serve/tests/unit/test_user_callable_wrapper.py

Lines changed: 1 addition & 0 deletions
@@ -106,6 +106,7 @@ def _make_user_callable_wrapper(
         local_testing_mode=False,
         deployment_config=DeploymentConfig(max_ongoing_requests=100),
         actor_id="test-actor-id",
+        ray_actor_options={},
     )
 
 