Skip to content

Commit 8e5b9a1

Browse files
rjogrady authored and copybara-github committed
Performance improvements for parallel_bench.
PiperOrigin-RevId: 669349053 Change-Id: Ib690e15ef1bd636b34ceb7d1b6450e33b6690ae9
1 parent 0aecf88 commit 8e5b9a1

File tree

3 files changed

+41
-41
lines changed

3 files changed

+41
-41
lines changed

fleetbench/parallel/parallel_bench.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -131,18 +131,15 @@ def main(argv: Sequence[str]) -> None:
131131
temp_root=_TEMP_ROOT.value,
132132
)
133133

134-
# TODO(rjogrady): Summarize / Print results.
135-
results = bench.Run(
134+
bench.Run(
136135
benchmark_target=_BENCHMARK_TARGET.value,
137136
benchmark_filter=_BENCHMARK_FILTER.value,
138137
workload_filter=_WORKLOAD_FILTER.value,
139138
benchmark_perf_counters=_BENCHMARK_PERF_COUNTERS.value,
140139
benchmark_repetitions=_BENCHMARK_REPETITIONS.value,
141140
benchmark_min_time=_BENCHMARK_MIN_TIME.value,
142141
)
143-
logging.info(
144-
"Ran %d benchmarks. Output is in %s", len(results), _TEMP_ROOT.value
145-
)
142+
logging.info("Benchmark output is in %s", _TEMP_ROOT.value)
146143

147144

148145
if __name__ == "__main__":

fleetbench/parallel/parallel_bench_lib.py

Lines changed: 38 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,10 @@
1818
import json
1919
import math
2020
import os
21-
import random
22-
import statistics
2321
import time
2422

2523
from absl import logging
24+
import numpy as np
2625
import pandas as pd
2726

2827
from fleetbench.parallel import benchmark as bm
@@ -199,7 +198,6 @@ def __init__(
199198
self.temp_root = temp_root
200199
self.runtimes: dict[str, list[BenchmarkTimes]] = {}
201200
self.workers: dict[int, worker.Worker] = {}
202-
self.results: list[result.Result] = []
203201
self.utilization_samples: list[tuple[pd.Timestamp, float]] = []
204202

205203
def _PreRun(
@@ -257,29 +255,30 @@ def _PreRun(
257255
logging.debug("CPU activity: %s", self.cpus)
258256
os.sched_setaffinity(os.getpid(), [self.controller_cpu])
259257

260-
def _SelectNextBenchmark(self) -> bm.Benchmark:
261-
"""Randomly choose the next benchmark to run.
258+
def _ComputeBenchmarkWeights(self) -> np.ndarray:
259+
"""Probability is inversely based on expected runtime."""
262260

263-
Probability is inversely based on expected runtime.
261+
inverse_weights = np.empty(len(self.runtimes.keys()))
264262

265-
Returns:
266-
Benchmark name to run next.
267-
"""
268-
inverse_weights = []
269263
# Use the last 10 runtimes to estimate expected runtime.
270-
for times in self.runtimes.values():
271-
last_10_wall_times = [time.wall_time for time in times[-10:]]
272-
inverse_weights.append(1 / (statistics.mean(last_10_wall_times)))
273-
probabilities = [x / sum(inverse_weights) for x in inverse_weights]
274-
264+
for i, times in enumerate(self.runtimes.values()):
265+
last_10_wall_times = np.array([t.wall_time for t in times[-10:]])
266+
inverse_weights[i] = 1 / last_10_wall_times.mean()
267+
return inverse_weights / inverse_weights.sum()
268+
269+
def _SelectNextBenchmarks(self, count: int) -> list[bm.Benchmark]:
270+
"""Randomly choose some benchmarks to run."""
271+
272+
if count <= 0:
273+
return []
274+
# Probabilities based on the expected runtime.
275+
probabilities = self._ComputeBenchmarkWeights()
275276
# self.runtimes is a dict of benchmark name -> list of runtimes.
276-
# Pick a random one based on the expected runtime.
277-
benchmark_name = random.choices(
278-
k=1,
279-
population=list(self.runtimes.keys()),
280-
weights=probabilities,
281-
)[0]
282-
return self.benchmarks[benchmark_name]
277+
benchmark_names = list(self.runtimes.keys())
278+
selected_names = np.random.choice(
279+
benchmark_names, p=probabilities, size=count, replace=True
280+
)
281+
return [self.benchmarks[name] for name in selected_names]
283282

284283
def _RunSchedulingLoop(self) -> None:
285284
"""Check CPU utilization and pick the next job to schedule."""
@@ -318,19 +317,26 @@ def _RunSchedulingLoop(self) -> None:
318317
expected_utilization = 100 / len(self.cpus)
319318
jobs_required = math.ceil(utilization_gap / expected_utilization)
320319

320+
# Randomly select benchmarks, weighting by inverse runtime.
321+
# This makes it less likely to select long-running benchmarks and
322+
# get an even distribution.
323+
benchmarks = self._SelectNextBenchmarks(jobs_required)
324+
321325
# Schedule something on an available CPUs if we're under-utilized.
322326
# Otherwise, do nothing while we wait for utilization to drop.
323-
for _ in range(jobs_required):
327+
for benchmark in benchmarks:
328+
path = os.path.join(self.temp_root, "run_%03d" % next_run_id)
329+
r = run.Run(
330+
benchmark=benchmark,
331+
out_file=path,
332+
)
324333
for cpu_id in least_busy_cpus:
325-
benchmark = self._SelectNextBenchmark()
326-
path = os.path.join(self.temp_root, "run_%03d" % next_run_id)
327-
r = run.Run(
328-
benchmark=benchmark,
329-
out_file=path,
330-
)
331334
if self.workers[cpu_id].TryAddRun(r):
332335
next_run_id += 1
333336
logging.debug("Scheduling %s on CPU %d", benchmark, cpu_id)
337+
# We just added something to this worker, so presumably
338+
# trying to add the next benchmark to it will fail.
339+
least_busy_cpus.remove(cpu_id)
334340
break
335341

336342
# Process any available results. This updates the runtimes of each
@@ -343,7 +349,6 @@ def _RunSchedulingLoop(self) -> None:
343349
self.runtimes[r.benchmark].append(
344350
BenchmarkTimes(wall_time=r.duration, cpu_time=r.bm_cpu_time)
345351
)
346-
self.results.append(r)
347352

348353
def GenerateBenchmarkReport(self) -> None:
349354
"""Print some aggregated results of the benchmark runs."""
@@ -362,6 +367,7 @@ def GenerateBenchmarkReport(self) -> None:
362367
runtimes = pd.DataFrame(data)
363368

364369
# TODO(rjogrady): We can do more here - plot utilization over time, etc.
370+
logging.info("Ran %d total benchmarks", runtimes["Benchmark"].count())
365371
logging.info("Median Utilization: %f", utilization["utilization"].median())
366372
logging.info("Benchmark runtimes report:")
367373

@@ -419,7 +425,7 @@ def Run(
419425
benchmark_perf_counters: str = "",
420426
benchmark_repetitions: int = 0,
421427
benchmark_min_time: str = "",
422-
) -> list[result.Result]:
428+
):
423429
"""Run benchmarks in parallel."""
424430
logging.info("Running with benchmark_filter: %s", benchmark_filter)
425431
logging.info("Running with workload_filter: %s", workload_filter)
@@ -451,7 +457,7 @@ def Run(
451457

452458
logging.info("Shutting down all workers...")
453459
for w in self.workers.values():
454-
self.results.extend(w.StopAndGetResults())
460+
w.StopAndGetResults()
455461

456462
for cpu_id, w in self.workers.items():
457463
logging.debug("Joining worker on CPU %d", cpu_id)
@@ -462,5 +468,3 @@ def Run(
462468
if benchmark_perf_counters:
463469
counters = benchmark_perf_counters.split(",")
464470
self.GeneratePerfCounterReport(counters)
465-
466-
return self.results

fleetbench/parallel/parallel_bench_test.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,8 @@ def fake_utilization(unused_cpus):
6161
duration=0.1,
6262
temp_root=absltest.get_default_test_tmpdir(),
6363
)
64-
results = pb.Run("fake_bench", [])
64+
pb.Run("fake_bench", [])
6565
mock_execute.assert_called_once()
66-
self.assertLen(results, 1)
6766

6867
@mock.patch.object(bm, "GetSubBenchmarks", autospec=True)
6968
@flagsaver.flagsaver(

0 commit comments

Comments (0)