Skip to content

Commit 3faa7eb

Browse files
andreas-abel authored and copybara-github committed
Add support for multi-threaded benchmarks to the parallel runner
PiperOrigin-RevId: 794119102 Change-Id: I07a1aefdfcfbdf9767452822c399f7fc8bf2c607
1 parent b8c07c7 commit 3faa7eb

File tree

5 files changed

+215
-21
lines changed

5 files changed

+215
-21
lines changed

fleetbench/parallel/parallel_bench.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,14 @@
130130
" default weights.",
131131
)
132132

133+
_CUSTOM_BENCHMARK_THREADS = flags.DEFINE_multi_string(
134+
"benchmark_threads",
135+
[],
136+
"Number of threads to use for selected benchmarks. The input should be in"
137+
" the format of <benchmark_name|benchmark_filter>:<n_threads>. Benchmarks"
138+
" for which no thread count is specified will use one thread by default.",
139+
)
140+
133141
_NUM_CPUS = flags.DEFINE_integer(
134142
"num_cpus",
135143
len(cpu.Available()),
@@ -169,6 +177,35 @@
169177
)
170178

171179

180+
def _ParseBenchmarkThreads(
181+
benchmark_threads_list: list[str],
182+
) -> dict[str, int]:
183+
"""Parses a list of benchmark thread count specs into a dictionary.
184+
185+
The string element in the list should be in the format:
186+
<benchmark_name|benchmark_filter>:<n_threads>.
187+
188+
Args:
189+
benchmark_threads_list: A list of strings to parse.
190+
191+
Returns:
192+
A dictionary of {<benchmark_name|benchmark_filter>: <n_threads>}.
193+
"""
194+
benchmark_threads = {}
195+
for spec in benchmark_threads_list:
196+
try:
197+
benchmark, n_threads = spec.rsplit(":", maxsplit=1)
198+
benchmark_threads[benchmark] = int(n_threads)
199+
except ValueError:
200+
logging.warning(
201+
"Invalid benchmark string: %s. The format should be"
202+
" <benchmark_name|benchmark_filter>:<n_threads>. Skipping...",
203+
spec,
204+
)
205+
206+
return benchmark_threads
207+
208+
172209
def main(argv: Sequence[str]) -> None:
173210
if len(argv) > 1:
174211
raise app.UsageError("Too many command-line arguments.")
@@ -201,6 +238,7 @@ def main(argv: Sequence[str]) -> None:
201238
temp_parent_root=_TEMP_ROOT.value,
202239
keep_raw_data=_KEEP_RAW_DATA.value,
203240
benchmark_perf_counters=_BENCHMARK_PERF_COUNTERS.value,
241+
benchmark_threads=_ParseBenchmarkThreads(_CUSTOM_BENCHMARK_THREADS.value),
204242
)
205243

206244
bench.SetWeights(

fleetbench/parallel/parallel_bench_lib.py

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,14 @@
1414

1515
"""Run Fleetbench benchmarks in parallel."""
1616

17+
import collections
1718
import dataclasses
1819
import json
1920
import math
2021
import os
2122
import shutil
2223
import threading
2324
import time
24-
from typing import Any
2525

2626
from absl import logging
2727
import numpy as np
@@ -74,6 +74,7 @@ class ParallelBench:
7474
cpu_affinity: Whether to bind each worker to a CPU or allow the scheduler to
7575
move them around.
7676
benchmark_weights: Whether to use adaptive benchmark selection.
77+
benchmark_threads: Number of threads to use for selected benchmarks.
7778
benchmarks: List of benchmarks to run.
7879
target_utilization: Target utilization from 0 to 1.
7980
duration: How long in seconds to run for.
@@ -107,6 +108,7 @@ def __init__(
107108
temp_parent_root: str,
108109
keep_raw_data: bool,
109110
benchmark_perf_counters: str,
111+
benchmark_threads: dict[str, int],
110112
):
111113
"""Initialize the parallel benchmark runner."""
112114

@@ -118,6 +120,7 @@ def __init__(
118120
self.cpus = cpus[1:]
119121
self.cpu_affinity = cpu_affinity
120122
self.benchmark_weights: dict[str, float] = {}
123+
self.benchmark_threads = benchmark_threads
121124
self.benchmarks: dict[str, bm.Benchmark] = {}
122125
self.target_utilization = utilization * 100
123126
self.duration = duration
@@ -350,8 +353,8 @@ def _RunSchedulingLoop(self) -> None:
350353

351354
self.utilization_samples.append((pd.Timestamp.now(), total_utilization))
352355

353-
least_busy_cpus = sorted(
354-
utilization_per_cpu.keys(), key=utilization_per_cpu.get
356+
least_busy_cpus = collections.OrderedDict(
357+
sorted(utilization_per_cpu.items(), key=lambda item: item[1])
355358
)
356359

357360
# E.g. we are at 50% utilization, target is 70%.
@@ -374,14 +377,40 @@ def _RunSchedulingLoop(self) -> None:
374377
benchmark=benchmark,
375378
out_file=path,
376379
)
377-
for cpu_id in least_busy_cpus:
378-
if self.workers[cpu_id].TryAddRun(r):
380+
required_n_threads = self.benchmark_threads.get(
381+
benchmark.BenchmarkName(), 1
382+
)
383+
# If required_n_threads > 1, we will try to reserve required_n_threads-1
384+
# additional workers, whose CPUs we can temporarily assign to the main
385+
# worker for the benchmark.
386+
extra_workers = []
387+
for cpu_id in list(least_busy_cpus.keys()):
388+
if required_n_threads > 1:
389+
if self.workers[cpu_id].TryBlock():
390+
logging.debug("Reserving CPU %d for %s", cpu_id, benchmark)
391+
extra_workers.append(self.workers[cpu_id])
392+
del least_busy_cpus[cpu_id]
393+
required_n_threads -= 1
394+
elif self.workers[cpu_id].TryAddRun(r, extra_workers):
379395
next_run_id += 1
380-
logging.debug("Scheduling %s on CPU %d", benchmark, cpu_id)
396+
if extra_workers:
397+
logging.debug(
398+
"Scheduling %s on CPUs %s",
399+
benchmark,
400+
[cpu_id]
401+
+ [extra_worker.cpu for extra_worker in extra_workers],
402+
)
403+
else:
404+
logging.debug("Scheduling %s on CPU %d", benchmark, cpu_id)
381405
# We just added something to this worker, so presumably
382406
# trying to add the next benchmark to it will fail.
383-
least_busy_cpus.remove(cpu_id)
407+
del least_busy_cpus[cpu_id]
384408
break
409+
else:
410+
# We did not find enough CPUs for the benchmark. If we blocked any
411+
# extra workers, we will unblock them here.
412+
for w in extra_workers:
413+
w.Unblock()
385414

386415
# Process any available results. This updates the runtimes of each
387416
# benchmark to make the scheduling probabilities more accurate.

fleetbench/parallel/parallel_bench_lib_test.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ def setUp(self):
4444
temp_parent_root=absltest.get_default_test_tmpdir(),
4545
keep_raw_data=True,
4646
benchmark_perf_counters="",
47+
benchmark_threads={},
4748
)
4849

4950
def tearDown(self):
@@ -104,6 +105,7 @@ def fake_utilization(unused_cpus):
104105
temp_parent_root=absltest.get_default_test_tmpdir(),
105106
keep_raw_data=True,
106107
benchmark_perf_counters="",
108+
benchmark_threads={},
107109
)
108110
self.pb.SetWeights(
109111
benchmark_target="fake_bench",
@@ -188,6 +190,68 @@ def test_set_extra_benchmark_flags(self):
188190
],
189191
)
190192

193+
@mock.patch.object(bm, "GetSubBenchmarks", autospec=True)
194+
@mock.patch.object(run.Run, "Execute", autospec=True)
195+
@mock.patch.object(cpu, "Utilization", autospec=True)
196+
@mock.patch.object(reporter, "GenerateBenchmarkReport", autospec=True)
197+
@mock.patch.object(
198+
reporter, "SaveBenchmarkResults", autospec=True, return_value=(None, None)
199+
)
200+
@flagsaver.flagsaver(
201+
benchmark_dir=absltest.get_default_test_tmpdir(),
202+
)
203+
def testRunThreads(
204+
self,
205+
mock_save_benchmark_results,
206+
mock_generate_benchmark_report,
207+
mock_utilization,
208+
mock_execute,
209+
mock_get_subbenchmarks,
210+
):
211+
mock_get_subbenchmarks.return_value = ["BM_Test1"]
212+
mock_execute.return_value = result.Result(
213+
benchmark="fake_bench (BM_Test1)",
214+
rc=0,
215+
stdout="fake_stdout",
216+
stderr="fake_stderr",
217+
duration=0.01,
218+
bm_cpu_time=0.01,
219+
result="fake_result",
220+
)
221+
self.create_tempfile(
222+
os.path.join(absltest.get_default_test_tmpdir(), "fake_bench")
223+
)
224+
225+
def fake_utilization(unused_cpus):
226+
# Return 0% for the first call, then 55% for the rest.
227+
fake_utilizations = [(0, {1: 0, 2: 0}, 0), (55, {1: 55, 2: 55}, 1)]
228+
return fake_utilizations[min(mock_utilization.call_count - 1, 1)]
229+
230+
mock_utilization.side_effect = fake_utilization
231+
232+
mock_generate_benchmark_report.return_value = pd.DataFrame()
233+
234+
self.pb = parallel_bench_lib.ParallelBench(
235+
cpus=[0, 1, 2],
236+
cpu_affinity=False,
237+
utilization=0.5,
238+
duration=0.1,
239+
repetitions=1,
240+
temp_parent_root=absltest.get_default_test_tmpdir(),
241+
keep_raw_data=True,
242+
benchmark_perf_counters="",
243+
benchmark_threads={"BM_Test1": 2},
244+
)
245+
self.pb.SetWeights(
246+
benchmark_target="fake_bench",
247+
benchmark_filter=None,
248+
workload_filter=None,
249+
scheduling_strategy=weights.SchedulingStrategy.BM_WEIGHTED,
250+
custom_benchmark_weights=None,
251+
)
252+
self.pb.Run()
253+
mock_execute.assert_called_once()
254+
191255
def test_convert_to_dataframe(self):
192256
# First entries are fake durations, the second entries are real durations.
193257
self.pb.runtimes["BM_Test1"] = [

fleetbench/parallel/worker.py

Lines changed: 58 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,26 @@
1414

1515
"""Worker for parallel benchmark."""
1616

17+
import dataclasses
1718
import os
1819
import queue
1920
import threading
20-
from typing import Optional
21+
from typing import Optional, Self
2122

2223
from absl import logging
2324

2425
from fleetbench.parallel import result
2526
from fleetbench.parallel import run as parallel_run
2627

2728

29+
@dataclasses.dataclass
30+
class RunAndExtraWorkers:
31+
"""Groups a benchmark run with any additional workers it needs."""
32+
33+
run: parallel_run.Run
34+
extra_workers: list["Worker"]
35+
36+
2837
class Worker(threading.Thread):
2938
"""Per-CPU worker for running benchmarks.
3039
@@ -34,32 +43,57 @@ class Worker(threading.Thread):
3443
result_q: Queue of results from completed runs.
3544
cpu: The CPU number this worker is assigned to.
3645
affinity: If true, bind this thread to the assigned CPU.
46+
lock: Lock to temporarily block this worker while its CPU is being used by
47+
another worker that requires more than one CPU.
48+
in_use_as_extra_worker: Whether this worker is currently being used as an
49+
extra worker.
3750
"""
3851

3952
def __init__(self, cpu: int, affinity: bool):
4053
super().__init__()
41-
self._command_q: queue.Queue[Optional[parallel_run.Run]] = queue.Queue(
54+
self._command_q: queue.Queue[Optional[RunAndExtraWorkers]] = queue.Queue(
4255
maxsize=1
4356
)
4457
self._result_q: queue.Queue[Optional[result.Result]] = queue.Queue()
45-
self._cpu = cpu
58+
self.cpu = cpu
4659
self._affinity = affinity
60+
self._lock = threading.Lock()
61+
self._in_use_as_extra_worker = False
4762

48-
def TryAddRun(self, run: parallel_run.Run) -> bool:
63+
def TryAddRun(self, run: parallel_run.Run, extra_workers: list[Self]) -> bool:
4964
"""Tries to add a run to the worker's queue.
5065
5166
Args:
5267
run: The run to add to the queue.
68+
extra_workers: The extra workers to add to the queue.
5369
5470
Returns:
5571
True if successful, False if the queue is full.
5672
"""
73+
if self._in_use_as_extra_worker:
74+
return False
5775
try:
58-
self._command_q.put_nowait(run)
76+
self._command_q.put_nowait(RunAndExtraWorkers(run, extra_workers))
5977
return True
6078
except queue.Full:
6179
return False
6280

81+
def TryBlock(self) -> bool:
82+
"""Tries to block the worker so that its CPU can be used by another worker.
83+
84+
Returns:
85+
True if successful, False if the worker is busy.
86+
"""
87+
lock_acquired = self._lock.acquire(blocking=False)
88+
if lock_acquired:
89+
self._in_use_as_extra_worker = True
90+
return lock_acquired
91+
92+
def Unblock(self) -> None:
93+
"""Resume the worker's normal operation."""
94+
self._in_use_as_extra_worker = False
95+
self._lock.release()
96+
6397
def StopAndGetResults(self) -> list[result.Result]:
6498
"""Shut down the worker loop, then wait for results."""
6599
self._command_q.put(None)
@@ -87,15 +121,26 @@ def TryGetResults(self) -> list[result.Result]:
87121
def run(self):
88122
"""Called when the thread is started. Loops executing commands."""
89123
if self._affinity:
90-
os.sched_setaffinity(threading.get_native_id(), [self._cpu])
91-
logging.debug("Worker %d running", self._cpu)
124+
os.sched_setaffinity(threading.get_native_id(), [self.cpu])
125+
logging.debug("Worker %d running", self.cpu)
92126

93127
while True:
94-
run_object = self._command_q.get()
95-
if run_object is None:
96-
logging.debug("Worker %d shutting down", self._cpu)
128+
task = self._command_q.get()
129+
if task is None:
130+
logging.debug("Worker %d shutting down", self.cpu)
97131
self._result_q.put(None)
98132
break
99-
self._result_q.put(run_object.Execute())
100-
101-
logging.debug("Worker %d exiting", self._cpu)
133+
with self._lock:
134+
extra_workers = task.extra_workers
135+
if extra_workers and self._affinity:
136+
os.sched_setaffinity(
137+
threading.get_native_id(),
138+
[self.cpu] + [extra_worker.cpu for extra_worker in extra_workers],
139+
)
140+
self._result_q.put(task.run.Execute())
141+
if extra_workers and self._affinity:
142+
os.sched_setaffinity(threading.get_native_id(), [self.cpu])
143+
for extra_worker in extra_workers:
144+
extra_worker.Unblock()
145+
146+
logging.debug("Worker %d exiting", self.cpu)

0 commit comments

Comments
 (0)