Skip to content

Commit c2776b1

Browse files
ajcasagrande and nv-nmailhot
authored and committed
fix: cap the default number of max workers to 32
- changes the formula for record-processors to be 1 for every 4 workers if not specified
- changes formula for default workers to min(concurrency, (cpus * 0.75) - 1)
- caps the max workers to 32 regardless of cpu count unless the user specifies a --workers-max
1 parent 6a35b52 commit c2776b1

File tree

6 files changed

+89
-30
lines changed

6 files changed

+89
-30
lines changed

aiperf/common/config/worker_config.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from aiperf.common.config.cli_parameter import CLIParameter, DisableCLI
99
from aiperf.common.config.config_defaults import WorkersDefaults
1010
from aiperf.common.config.groups import Groups
11+
from aiperf.common.constants import DEFAULT_MAX_WORKERS_CAP
1112

1213

1314
class WorkersConfig(BaseConfig):
@@ -27,7 +28,9 @@ class WorkersConfig(BaseConfig):
2728
int | None,
2829
Field(
2930
description="Maximum number of workers to create. If not specified, the number of"
30-
" workers will be determined by the smaller of (concurrency + 1) and (num CPUs - 1).",
31+
" workers will be determined by the formula `min(concurrency, (num CPUs * 0.75) - 1)`, "
32+
f" with a default max cap of `{DEFAULT_MAX_WORKERS_CAP}`. Any value provided will still be capped by"
33+
f" the concurrency value (if specified), but not by the max cap.",
3134
),
3235
CLIParameter(
3336
name=("--workers-max", "--max-workers"),

aiperf/common/constants.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,5 +91,13 @@
9191
DEFAULT_WORKER_HEALTH_CHECK_INTERVAL = 2.0
9292
"""Default interval in seconds between worker health check messages."""
9393

94+
DEFAULT_RECORD_PROCESSOR_SCALE_FACTOR = 4
95+
"""Default scale factor for the number of record processors to spawn based on the number of workers.
96+
This will spawn 1 record processor for every X workers."""
97+
98+
DEFAULT_MAX_WORKERS_CAP = 32
99+
"""Default absolute maximum number of workers to spawn, regardless of the number of CPU cores.
100+
Only applies if the user does not specify a max workers value."""
101+
94102
DEFAULT_ZMQ_CONTEXT_TERM_TIMEOUT = 10.0
95103
"""Default timeout for terminating the ZMQ context in seconds."""

aiperf/controller/system_controller.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
AIPERF_DEV_MODE,
1616
DEFAULT_PROFILE_CONFIGURE_TIMEOUT,
1717
DEFAULT_PROFILE_START_TIMEOUT,
18+
DEFAULT_RECORD_PROCESSOR_SCALE_FACTOR,
1819
)
1920
from aiperf.common.enums import (
2021
CommandResponseStatus,
@@ -326,7 +327,8 @@ async def _handle_spawn_workers_command(self, message: SpawnWorkersCommand) -> N
326327
# If we are scaling the record processor service count with the number of workers, spawn the record processors
327328
if self.scale_record_processors_with_workers:
328329
await self.service_manager.run_service(
329-
ServiceType.RECORD_PROCESSOR, max(1, message.num_workers // 2)
330+
ServiceType.RECORD_PROCESSOR,
331+
max(1, message.num_workers // DEFAULT_RECORD_PROCESSOR_SCALE_FACTOR),
330332
)
331333

332334
@on_command(CommandType.SHUTDOWN_WORKERS)

aiperf/workers/worker_manager.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from aiperf.common.bootstrap import bootstrap_and_run_service
1010
from aiperf.common.config import ServiceConfig, UserConfig
1111
from aiperf.common.constants import (
12+
DEFAULT_MAX_WORKERS_CAP,
1213
DEFAULT_WORKER_CHECK_INTERVAL,
1314
DEFAULT_WORKER_ERROR_RECOVERY_TIME,
1415
DEFAULT_WORKER_HIGH_LOAD_CPU_USAGE,
@@ -75,20 +76,25 @@ def __init__(
7576
self.max_concurrency = self.user_config.loadgen.concurrency
7677
self.max_workers = self.service_config.workers.max
7778
if self.max_workers is None:
78-
# Default to the number of CPU cores - 1
79-
self.max_workers = self.cpu_count - 1
79+
# Default to 75% of the CPU cores - 1, with a cap of DEFAULT_MAX_WORKERS_CAP, and a minimum of 1
80+
self.max_workers = max(
81+
1, min(int(self.cpu_count * 0.75) - 1, DEFAULT_MAX_WORKERS_CAP)
82+
)
83+
self.debug(
84+
lambda: f"Auto-setting max workers to {self.max_workers} due to no max workers specified."
85+
)
8086

8187
# Cap the worker count to the max concurrency, but only if the user is in concurrency mode.
82-
if self.max_concurrency:
83-
self.max_workers = min(
84-
self.max_concurrency,
85-
self.max_workers,
88+
if self.max_concurrency and self.max_concurrency < self.max_workers:
89+
self.max_workers = self.max_concurrency
90+
self.debug(
91+
lambda: f"Capping max workers to {self.max_workers} due to concurrency."
8692
)
8793

8894
# Ensure we have at least the min workers
8995
self.max_workers = max(
9096
self.max_workers,
91-
self.service_config.workers.min or 0,
97+
self.service_config.workers.min or 1,
9298
)
9399
self.initial_workers = self.max_workers
94100

docs/cli_options.md

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -157,8 +157,9 @@ The following options are available when profiling using AIPerf.
157157
╰───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
158158
```
159159
```
160-
╭─ Workers ─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮
161-
│ WORKERS-MAX --workers-max --max-workers Maximum number of workers to create. If not specified, the number of workers will be determined by the smaller of (concurrency + 1) and (num │
162-
│ CPUs - 1). │
163-
╰───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
164-
```
160+
╭─ Workers ───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮
161+
│ WORKERS-MAX --workers-max --max-workers Maximum number of workers to create. If not specified, the number of workers will be determined by the formula │
162+
│ min(concurrency, (num CPUs * 0.75) - 1), with a default max cap of 32. Any value provided will still be capped by the │
163+
│ concurrency value (if specified), but not by the max cap. │
164+
╰─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
165+
```

tests/workers/test_worker_manager.py

Lines changed: 55 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,30 +18,69 @@ class TestMaxWorkers:
1818
"""Test the max workers calculation logic in WorkerManager."""
1919

2020
@pytest.mark.parametrize(
21-
"concurrency,request_rate,max_workers,expected",
21+
"cpus,concurrency,max_workers,expected",
2222
[
23-
(None, 1000, None, 9), # Default case (10 fake CPUs - 1)
24-
(None, 1000, 4, 4), # Only max set
25-
(None, None, None, 1), # Concurrency defaults to 1
26-
(3, None, None, 3), # Only concurrency set
27-
(2, None, 5, 2), # Concurrency limits max
28-
(8, None, 3, 3), # Max limits concurrency
29-
(10, 1000, 5, 5), # Normal case with all values
23+
(10, 100, None, 6), # CPU-based limit: 10 * 0.75 - 1 = 6
24+
(10, 100, 4, 4), # max_workers setting limits to 4
25+
(10, None, None, 1), # Concurrency defaults to 1, which limits workers to 1
26+
(10,3,None,3), # Low concurrency (3) limits workers below CPU calculation
27+
(10, 8, 3, 3), # max_workers (3) overrides higher concurrency (8)
28+
(10, 10, 5, 5), # max_workers (5) overrides higher concurrency (10)
29+
(224, 1000, None, 32), # High CPU count with hard cap at 32 workers
30+
(32, 1000, None, 23), # CPU-based limit: 32 * 0.75 - 1 = 23
31+
(1, 100, None, 1), # Single CPU system, should default to 1 worker minimum
32+
(2, 100, None, 1), # Two CPUs: 2 * 0.75 - 1 = 0.5, rounded up to 1
33+
(4, 100, None, 2), # Four CPUs: 4 * 0.75 - 1 = 2
34+
(44,1000,None,32), # CPU count that would exceed 32 limit: 44 * 0.75 - 1 = 32
35+
(45,1000,None,32), # CPU count that hits the hard cap: 45 * 0.75 - 1 = 32.75
36+
(4, 100, 100, 100), # Very high max_workers, not limited by CPU calculation
37+
(64, 1, None, 1), # Concurrency of 1 limits to 1 worker regardless of CPUs
3038
],
31-
)
32-
def test_max_workers_combinations(
33-
self, concurrency, request_rate, max_workers, expected
39+
) # fmt: skip
40+
def test_max_workers_combinations(self, cpus, concurrency, max_workers, expected):
41+
"""Test max workers calculation with different CPU counts, concurrency, and max_workers settings."""
42+
with patch(
43+
"aiperf.workers.worker_manager.multiprocessing.cpu_count", return_value=cpus
44+
):
45+
service_config = ServiceConfig(workers=WorkersConfig(max=max_workers))
46+
user_config = UserConfig(
47+
endpoint=EndpointConfig(model_names=["test-model"]),
48+
loadgen=LoadGeneratorConfig(concurrency=concurrency),
49+
)
50+
51+
worker_manager = WorkerManager(
52+
service_config=service_config,
53+
user_config=user_config,
54+
service_id="test-worker-manager",
55+
)
56+
57+
assert worker_manager.max_workers == expected
58+
59+
@pytest.mark.parametrize(
60+
"cpus,request_rate,max_workers,expected",
61+
[
62+
(10, 50, None, 6), # CPU-based limit: 10 * 0.75 - 1 = 6 (no concurrency limit)
63+
(10, 100, 4, 4), # max_workers setting limits to 4
64+
(4, 10, None, 2), # Low CPU count: 4 * 0.75 - 1 = 2
65+
(2, 50, None, 1), # Very low CPU: 2 * 0.75 - 1 = 0.5, rounded up to 1
66+
(1, 100, None, 1), # Single CPU system minimum
67+
(64, 500, None, 32), # High CPU count with hard cap at 32 workers
68+
(10, 1, None, 6), # Very low request rate still uses CPU calculation
69+
(10, 1000, 8, 8), # High request rate with max_workers override
70+
(8, 50, 20, 20), # max_workers higher than CPU calc
71+
],
72+
) # fmt: skip
73+
def test_max_workers_with_request_rate_combinations(
74+
self, cpus, request_rate, max_workers, expected
3475
):
35-
"""Test various combinations of configuration values."""
76+
"""Test max workers calculation with request_rate mode where concurrency is 0/None."""
3677
with patch(
37-
"aiperf.workers.worker_manager.multiprocessing.cpu_count", return_value=10
78+
"aiperf.workers.worker_manager.multiprocessing.cpu_count", return_value=cpus
3879
):
3980
service_config = ServiceConfig(workers=WorkersConfig(max=max_workers))
4081
user_config = UserConfig(
4182
endpoint=EndpointConfig(model_names=["test-model"]),
42-
loadgen=LoadGeneratorConfig(
43-
concurrency=concurrency, request_rate=request_rate
44-
),
83+
loadgen=LoadGeneratorConfig(request_rate=request_rate),
4584
)
4685

4786
worker_manager = WorkerManager(

0 commit comments

Comments (0)