Skip to content

Commit 4952470

Browse files
liyuying0000 and copybara-github
authored and committed
Support different scheduling strategies:
1) Equal Benchmark Execution Time 2) Equal Workload Execution Time, which is set as default PiperOrigin-RevId: 740453475 Change-Id: I768d87eaebcd8e60076616f70179510bc31bae86
1 parent 3d582fd commit 4952470

File tree

6 files changed

+308
-40
lines changed

6 files changed

+308
-40
lines changed

fleetbench/parallel/BUILD

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ py_binary(
102102
deps = [
103103
":cpu",
104104
":parallel_bench_lib",
105+
":weights",
105106
"@com_google_absl_py//absl:app",
106107
"@com_google_absl_py//absl/flags",
107108
"@com_google_absl_py//absl/logging",
@@ -118,8 +119,10 @@ py_test(
118119
":reporter",
119120
":result",
120121
":run",
122+
":weights",
121123
"@com_google_absl_py//absl/testing:absltest",
122124
"@com_google_absl_py//absl/testing:flagsaver",
125+
"@com_google_absl_py//absl/testing:parameterized",
123126
requirement("pandas"),
124127
],
125128
)

fleetbench/parallel/parallel_bench.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
from fleetbench.parallel import cpu
3131
from fleetbench.parallel import parallel_bench_lib
32+
from fleetbench.parallel import weights
3233

3334

3435
_CPU_AFFINITY = flags.DEFINE_bool(
@@ -105,6 +106,17 @@
105106
)
106107

107108

109+
_SCHEDULING_STRATEGY = flags.DEFINE_enum_class(
110+
"scheduling_strategy",
111+
weights.SchedulingStrategy.WORKLOAD_WEIGHTED,
112+
weights.SchedulingStrategy,
113+
"The scheduling strategy to use. The default is WORKLOAD_WEIGHTED, which"
114+
" means the benchmarks will be selected based on the expected"
115+
" workload runtime. If set to BM_WEIGHTED, the benchmarks will be selected"
116+
" based on the expected benchmark runtime.",
117+
)
118+
119+
108120
_CUSTOM_BENCHMARK_WEIGHTS = flags.DEFINE_multi_string(
109121
"benchmark_weights",
110122
[],
@@ -175,6 +187,7 @@ def main(argv: Sequence[str]) -> None:
175187
_BENCHMARK_TARGET.value,
176188
_BENCHMARK_FILTER.value,
177189
_WORKLOAD_FILTER.value,
190+
_SCHEDULING_STRATEGY.value,
178191
_CUSTOM_BENCHMARK_WEIGHTS.value,
179192
)
180193

fleetbench/parallel/parallel_bench_lib.py

Lines changed: 112 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,12 @@ class ParallelBench:
8181
results: List of results from all runs.
8282
utilization_samples: List of (timestamp, utilization) tuples. Used to
8383
generate utilization over time plots.
84+
target_ratios: List of target ratios for each benchmark. This is used to
85+
calculate the probability of each benchmark being selected, and determined
86+
by the benchmark weights.
87+
first_run: Boolean indicating if this is the first run. We use this to
88+
determine if we can randomly select benchmarks or if we need to run all
89+
benchmarks at least once.
8490
"""
8591

8692
def __init__(
@@ -108,12 +114,15 @@ def __init__(
108114
self.runtimes: dict[str, list[BenchmarkMetrics]] = {}
109115
self.workers: dict[int, worker.Worker] = {}
110116
self.utilization_samples: list[tuple[pd.Timestamp, float]] = []
117+
self.target_ratios: list[float] = []
118+
self.first_run = True
111119

112120
def SetWeights(
113121
self,
114122
benchmark_target: str,
115123
benchmark_filter: list[str] | None,
116124
workload_filter: list[str] | None,
125+
scheduling_strategy: weights.SchedulingStrategy,
117126
custom_benchmark_weights: list[str] | None,
118127
) -> None:
119128
"""Sets the benchmark weights."""
@@ -135,7 +144,7 @@ def SetWeights(
135144
)
136145
# Gets the number of workloads and num of benchmark for each workload
137146
self.benchmark_weights = weights.GetBenchmarkWeights(
138-
self.benchmarks, custom_benchmark_weights
147+
self.benchmarks, scheduling_strategy, custom_benchmark_weights
139148
)
140149

141150
def _PreRun(
@@ -157,19 +166,23 @@ def _PreRun(
157166
for benchmark in self.benchmarks.values():
158167
benchmark.AddCommandFlags(benchmark_flags)
159168

160-
# Initialize the runtimes with a fake wall time of 1. This causes all
161-
# benchmarks to be equally likely at first.
162-
self.runtimes = {
163-
benchmark: [
164-
BenchmarkMetrics(
165-
total_duration=1.0,
166-
per_iteration_wall_time=0.0,
167-
per_iteration_cpu_time=0.0,
168-
per_bm_run_iteration=0,
169-
)
170-
]
171-
for benchmark in self.benchmarks.keys()
172-
}
169+
# Initialize the runtimes with a fake wall time. Based on empirically
170+
# observed runtimes, TCMalloc take 4x longer to run than others.
171+
for benchmark in self.benchmarks.keys():
172+
total_duration = 4 if "TCMALLOC" in benchmark else 1
173+
self.runtimes[benchmark] = [
174+
BenchmarkMetrics(
175+
total_duration=total_duration,
176+
per_iteration_wall_time=0.0,
177+
per_iteration_cpu_time=0.0,
178+
per_bm_run_iteration=0,
179+
)
180+
]
181+
182+
self.target_ratios = [
183+
self.benchmark_weights[instance.BenchmarkName()]
184+
for instance in self.benchmarks.values()
185+
]
173186

174187
# Create a worker thread for each CPU.
175188
self.workers = {
@@ -188,39 +201,109 @@ def _PreRun(
188201
logging.debug("CPU activity: %s", self.cpus)
189202
os.sched_setaffinity(os.getpid(), [self.controller_cpu])
190203

204+
def _AdjustProbabilities(
205+
self, target_ratios: list[float], current_runtime: list[float]
206+
) -> np.ndarray:
207+
"""Calculates benchmark probabilities.
208+
209+
We want to run benchmarks based on their expected runtime so that the
210+
actual runtimes align with desired target ratios.
211+
212+
Probability_i = Current RT_0 * Target Ratio_i / (Target Ratio_0 * Current
213+
RT_i)
214+
215+
Args:
216+
target_ratios: List of target ratios.
217+
current_runtime: List of current runtime.
218+
219+
Returns:
220+
List of normalized probabilities.
221+
"""
222+
223+
num_benchmarks = len(target_ratios)
224+
if num_benchmarks != len(current_runtime):
225+
raise ValueError(
226+
"Target and current ratio lists must have the same length."
227+
)
228+
229+
# Set the first BM to be the reference with probability 1.0.
230+
probabilities = [1.0]
231+
232+
# Calculate probabilities for other benchmarks
233+
for j in range(1, num_benchmarks):
234+
probability = (current_runtime[0] * target_ratios[j]) / (
235+
current_runtime[j] * target_ratios[0]
236+
)
237+
probabilities.append(probability)
238+
239+
# Normalize the values
240+
return np.array(probabilities) / np.sum(probabilities)
241+
242+
def _AdjustRuntime(self) -> np.ndarray:
243+
"""Adjusts the runtime of each benchmark to account for variance."""
244+
valid_data = False
245+
valid_index = -1
246+
current_runtimes = np.empty(len(self.runtimes.keys()))
247+
need_adjustment = []
248+
249+
for i, (times) in enumerate(self.runtimes.values()):
250+
# Skip the first run, which is a fake run.
251+
last_10_wall_times = np.array([t.total_duration for t in times[1:][-10:]])
252+
253+
if last_10_wall_times.size:
254+
current_runtimes[i] = last_10_wall_times.mean()
255+
valid_data = True
256+
valid_index = i
257+
else:
258+
current_runtimes[i] = times[0].total_duration
259+
# We need to adjust the runtime as it's not yet a valid measured value.
260+
need_adjustment.append(i)
261+
262+
if valid_data:
263+
for index in need_adjustment:
264+
current_runtimes[index] = (
265+
current_runtimes[index] * current_runtimes[valid_index]
266+
)
267+
return current_runtimes
268+
191269
def _ComputeBenchmarkWeights(self) -> np.ndarray:
192270
"""Probability is inversely based on expected runtime."""
193271

194-
inverse_weights = np.empty(len(self.runtimes.keys()))
272+
current_runtimes = self._AdjustRuntime()
195273

196-
# Use the last 10 runtimes to estimate expected runtime.
197-
for i, (benchmark_name, times) in enumerate(self.runtimes.items()):
198-
last_10_wall_times = np.array([t.total_duration for t in times[-10:]])
199-
base_weight = 1 / last_10_wall_times.mean()
274+
if not np.all(current_runtimes == 1.0):
275+
current_rt_ratios = current_runtimes / current_runtimes.sum()
200276

201-
# If we're using adaptive benchmark selection, adjust the weight based on
202-
# the benchmark's performance relative to the fleet.
203-
if self.benchmark_weights:
204-
for keyword, weight in self.benchmark_weights.items():
205-
if keyword in benchmark_name.upper():
206-
base_weight *= weight
207-
break
208-
inverse_weights[i] = base_weight
209-
return inverse_weights / inverse_weights.sum()
277+
probabilities = self._AdjustProbabilities(
278+
self.target_ratios, current_rt_ratios
279+
)
280+
else:
281+
probabilities = np.array(self.target_ratios) / np.sum(self.target_ratios)
282+
283+
return probabilities
210284

211285
def _SelectNextBenchmarks(self, count: int) -> list[bm.Benchmark]:
212286
"""Randomly choose some benchmarks to run."""
213287

214288
if count <= 0:
215289
return []
290+
291+
# We try to run all benchmarks at least once.
292+
benchmarks = []
293+
if self.first_run:
294+
if count > len(self.benchmarks):
295+
benchmarks = list(self.benchmarks.values())
296+
count = count - len(benchmarks)
297+
self.first_run = False
298+
216299
# Probabilities based on the expected runtime.
217300
probabilities = self._ComputeBenchmarkWeights()
218301
# self.runtimes is a dict of benchmark name -> list of runtimes.
219302
benchmark_names = list(self.runtimes.keys())
220303
selected_names = np.random.choice(
221304
benchmark_names, p=probabilities, size=count, replace=True
222305
)
223-
return [self.benchmarks[name] for name in selected_names]
306+
return benchmarks + [self.benchmarks[name] for name in selected_names]
224307

225308
def _RunSchedulingLoop(self) -> None:
226309
"""Check CPU utilization and pick the next job to schedule."""

fleetbench/parallel/parallel_bench_lib_test.py

Lines changed: 105 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
from absl.testing import absltest
2020
from absl.testing import flagsaver
21+
from absl.testing import parameterized
2122
import pandas as pd
2223

2324
from fleetbench.parallel import benchmark as bm
@@ -26,9 +27,10 @@
2627
from fleetbench.parallel import reporter
2728
from fleetbench.parallel import result
2829
from fleetbench.parallel import run
30+
from fleetbench.parallel import weights
2931

3032

31-
class ParallelBenchTest(absltest.TestCase):
33+
class ParallelBenchTest(parameterized.TestCase):
3234

3335
def setUp(self):
3436
super().setUp()
@@ -96,6 +98,7 @@ def fake_utilization(unused_cpus):
9698
benchmark_target="fake_bench",
9799
benchmark_filter=None,
98100
workload_filter=None,
101+
scheduling_strategy=weights.SchedulingStrategy.BM_WEIGHTED,
99102
custom_benchmark_weights=None,
100103
)
101104
self.pb.Run()
@@ -244,6 +247,107 @@ def test_post_processing_benchmark_results(
244247
mock_generate_benchmark_report.assert_called_once()
245248
mock_save_benchmark_results.assert_called_once()
246249

250+
@parameterized.named_parameters(
251+
dict(
252+
testcase_name="adjust_probabilities_1",
253+
target_ratios=[0.2, 0.8],
254+
current_runtime=[1, 4],
255+
expected=[0.5, 0.5],
256+
),
257+
dict(
258+
testcase_name="adjust_probabilities_2",
259+
target_ratios=[0.5, 0.5],
260+
current_runtime=[1, 1],
261+
expected=[0.5, 0.5],
262+
),
263+
dict(
264+
testcase_name="adjust_probabilities_3",
265+
target_ratios=[0.2, 0.8],
266+
current_runtime=[2, 2],
267+
expected=[0.2, 0.8],
268+
),
269+
dict(
270+
testcase_name="adjust_probabilities_4",
271+
target_ratios=[0.2, 0.8],
272+
current_runtime=[1, 1],
273+
expected=[0.2, 0.8],
274+
),
275+
)
276+
def test_adjust_probabilities(self, target_ratios, current_runtime, expected):
277+
probabilities = self.pb._AdjustProbabilities(target_ratios, current_runtime)
278+
self.assertSequenceAlmostEqual(probabilities, expected)
279+
280+
def test_adjust_runtime(self):
281+
# First entries are fake durations, the second entries are real durations.
282+
self.pb.runtimes["BM_Test1"] = [
283+
parallel_bench_lib.BenchmarkMetrics(
284+
total_duration=10,
285+
per_iteration_wall_time=1,
286+
per_iteration_cpu_time=1,
287+
per_bm_run_iteration=2,
288+
),
289+
parallel_bench_lib.BenchmarkMetrics(
290+
total_duration=2,
291+
per_iteration_wall_time=3.01,
292+
per_iteration_cpu_time=3,
293+
per_bm_run_iteration=4,
294+
),
295+
parallel_bench_lib.BenchmarkMetrics(
296+
total_duration=4,
297+
per_iteration_wall_time=3.01,
298+
per_iteration_cpu_time=3,
299+
per_bm_run_iteration=4,
300+
),
301+
]
302+
self.pb.runtimes["BM_Test2"] = [
303+
parallel_bench_lib.BenchmarkMetrics(
304+
total_duration=10,
305+
per_iteration_wall_time=1,
306+
per_iteration_cpu_time=1,
307+
per_bm_run_iteration=10,
308+
),
309+
parallel_bench_lib.BenchmarkMetrics(
310+
total_duration=4,
311+
per_iteration_wall_time=4,
312+
per_iteration_cpu_time=5,
313+
per_bm_run_iteration=8,
314+
),
315+
parallel_bench_lib.BenchmarkMetrics(
316+
total_duration=6,
317+
per_iteration_wall_time=4,
318+
per_iteration_cpu_time=5,
319+
per_bm_run_iteration=8,
320+
),
321+
]
322+
323+
current_runtimes = self.pb._AdjustRuntime()
324+
self.assertSequenceAlmostEqual(current_runtimes, [3.0, 5.0])
325+
326+
self.pb.runtimes["BM_Test1"] = [
327+
parallel_bench_lib.BenchmarkMetrics(
328+
total_duration=10,
329+
per_iteration_wall_time=1,
330+
per_iteration_cpu_time=1,
331+
per_bm_run_iteration=2,
332+
),
333+
]
334+
self.pb.runtimes["BM_Test2"] = [
335+
parallel_bench_lib.BenchmarkMetrics(
336+
total_duration=10,
337+
per_iteration_wall_time=1,
338+
per_iteration_cpu_time=1,
339+
per_bm_run_iteration=10,
340+
),
341+
parallel_bench_lib.BenchmarkMetrics(
342+
total_duration=4,
343+
per_iteration_wall_time=4,
344+
per_iteration_cpu_time=5,
345+
per_bm_run_iteration=8,
346+
),
347+
]
348+
current_runtimes = self.pb._AdjustRuntime()
349+
self.assertSequenceAlmostEqual(current_runtimes, [40.0, 4.0])
350+
247351

248352
if __name__ == "__main__":
249353
absltest.main()

0 commit comments

Comments
 (0)