generated from amazon-archives/__template_Apache-2.0
-
Notifications
You must be signed in to change notification settings - Fork 234
Expand file tree
/
Copy pathclient_benchmark.py
More file actions
86 lines (65 loc) · 3.33 KB
/
client_benchmark.py
File metadata and controls
86 lines (65 loc) · 3.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import logging
import subprocess
import os
from typing import Dict, Any
from benchmarks.base_benchmark import BaseBenchmark
from omegaconf import DictConfig
from benchmarks.benchmark_config_parser import BenchmarkConfigParser
log = logging.getLogger(__name__)
class ClientBenchmark(BaseBenchmark):
    """Benchmark that runs the ``client_benchmark`` cargo example as a subprocess.

    The command line is assembled from the "common" and "client" sections
    produced by ``BenchmarkConfigParser`` and executed with ``cargo run``.
    """

    def __init__(self, cfg: DictConfig, metadata: Dict[str, Any], backpressure=False):
        """Store the run metadata and parse the benchmark configuration.

        Args:
            cfg: Configuration handed to ``BenchmarkConfigParser``.
            metadata: Dictionary returned unchanged by ``setup`` and
                ``post_process``.
            backpressure: When true, ``--enable-backpressure`` is passed to the
                benchmark binary.
        """
        self.metadata = metadata
        self.backpressure = backpressure
        self.config_parser = BenchmarkConfigParser(cfg)
        self.common_config = self.config_parser.get_common_config()
        self.client_config = self.config_parser.get_client_config()

    def setup(self) -> Dict[str, Any]:
        """Return the metadata unchanged; this benchmark has no setup work."""
        return self.metadata

    def run_benchmark(self) -> Dict[str, Any]:
        """Build the ``cargo run`` command line and execute the client benchmark.

        NOTE(review): the annotation promises a dict, but this method has always
        returned ``None`` implicitly. That is left unchanged here; confirm what
        the base class and callers expect.

        Raises:
            ValueError: If fewer object keys than application workers are
                available.
            subprocess.CalledProcessError: If the benchmark process exits with a
                non-zero status. Its captured output is logged before the
                exception is re-raised.
        """
        common = self.common_config

        subprocess_args = [
            "cargo",
            "run",
            "--release",
            "--example",
            "client_benchmark",
            "--",
        ]

        # Options placed before the "real" positional argument.
        if self.backpressure:
            subprocess_args.append("--enable-backpressure")
            if (initial_window_size := self.client_config['max_read_window_size']) is not None:
                subprocess_args.extend(["--initial-window-size", str(initial_window_size)])

        # Every argv item must be a string: config values may be numeric, so
        # each one is converted explicitly before being handed to subprocess.
        if (run_time := common['run_time']) is not None:
            subprocess_args.extend(["--max-duration", f"{run_time}"])
        if (max_throughput := common.get('max_throughput_gbps')) is not None:
            subprocess_args.extend(["--throughput-target-gbps", str(max_throughput)])
        if (read_part_size := common['read_part_size']) is not None:
            subprocess_args.extend(["--part-size", str(read_part_size)])
        if (crt_mem_limit_gib := common.get('crt_mem_limit_gib')) is not None:
            subprocess_args.extend(["--crt-memory-limit-gb", str(crt_mem_limit_gib)])
        subprocess_args.extend(["--output-file", "client-output.json"])

        # Arguments placed after "real" (presumably a subcommand of the
        # benchmark binary -- confirm against its CLI definition).
        subprocess_args.append("real")
        subprocess_args.extend(["--region", common['region']])
        for interface in common['network_interfaces']:
            subprocess_args.extend(["--bind", interface])
        subprocess_args.append(common['s3_bucket'])

        # Use the configured keys, or fall back to the parser's defaults.
        objects = common['s3_keys']
        app_workers = common['application_workers']
        object_size_in_gib = common['object_size_in_gib']
        if not objects:
            objects = self.config_parser.default_object_keys(app_workers, object_size_in_gib)
        if len(objects) < app_workers:
            raise ValueError("Seeing fewer objects than app workers. So cannot proceed with the run.")
        subprocess_args.extend(objects)

        client_env = {}
        if not common['download_checksums']:
            client_env["EXPERIMENTAL_MOUNTPOINT_NO_DOWNLOAD_INTEGRITY_VALIDATION"] = "ON"
        subprocess_env = os.environ.copy() | client_env

        log.debug("Subprocess env: %s", subprocess_env)
        log.info("Running client benchmark with args: %s", subprocess_args)
        try:
            subprocess.run(subprocess_args, check=True, capture_output=True, text=True, env=subprocess_env)
        except subprocess.CalledProcessError as err:
            # The output is captured rather than inherited, so without this the
            # child's diagnostics would never reach the logs.
            log.error(
                "Client benchmark failed with exit code %s.\nstdout:\n%s\nstderr:\n%s",
                err.returncode,
                err.stdout,
                err.stderr,
            )
            raise
        log.info("Client benchmark completed successfully.")

    def post_process(self) -> Dict[str, Any]:
        """Return the metadata unchanged; this benchmark has no post-processing."""
        return self.metadata