generated from amazon-archives/__template_Apache-2.0
-
Notifications
You must be signed in to change notification settings - Fork 234
Expand file tree
/
Copy pathprefetch_benchmark.py
More file actions
86 lines (64 loc) · 3.3 KB
/
prefetch_benchmark.py
File metadata and controls
86 lines (64 loc) · 3.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import logging
import os
import subprocess
from typing import Dict, Any
from benchmarks.base_benchmark import BaseBenchmark
from omegaconf import DictConfig
from benchmarks.benchmark_config_parser import BenchmarkConfigParser
log = logging.getLogger(__name__)
class PrefetchBenchmark(BaseBenchmark):
    """Drives the `prefetch_benchmark` cargo example against S3 objects.

    Builds the benchmark command line from the parsed configuration, sets the
    prefetcher-related environment variables, and runs the benchmark as a
    subprocess.
    """

    def __init__(self, cfg: DictConfig, metadata: Dict[str, Any]):
        """Parse the benchmark configuration.

        Args:
            cfg: OmegaConf configuration for this run.
            metadata: Run metadata; not used by this benchmark but part of
                the common constructor interface — TODO confirm against
                BaseBenchmark.
        """
        self.config_parser = BenchmarkConfigParser(cfg)
        self.common_config = self.config_parser.get_common_config()
        self.prefetch_config = self.config_parser.get_prefetch_config()

    def setup(self) -> None:
        """No setup required for this benchmark."""
        pass

    def run_benchmark(self) -> None:
        """Launch the `prefetch_benchmark` cargo example and wait for it.

        Raises:
            ValueError: if fewer S3 object keys are available than
                application workers.
            subprocess.CalledProcessError: if the benchmark process exits
                with a non-zero status (its output is logged first).
        """
        subprocess_args = [
            "cargo",
            "run",
            "--release",
            "--example",
            "prefetch_benchmark",
            self.common_config['s3_bucket'],
        ]

        size_gib = str(self.common_config['object_size_in_gib'])
        app_workers = self.common_config['application_workers']

        # Use explicitly configured keys when present; otherwise fall back
        # to the default keys of objects generated by fio.
        objects = self.common_config['s3_keys']
        if not objects:
            objects = self.config_parser.default_object_keys(app_workers, size_gib)

        # Guard clause: every application worker needs its own object key.
        if len(objects) < app_workers:
            raise ValueError("Seeing fewer objects than app workers. So cannot proceed with the run.")
        subprocess_args.extend(objects[:app_workers])

        subprocess_args.extend(["--region", self.common_config["region"]])

        # Optional tuning flags: only forwarded when configured.
        if (max_throughput := self.common_config['max_throughput_gbps']) is not None:
            subprocess_args.extend(["--maximum-throughput-gbps", str(max_throughput)])
        if (max_memory_target := self.prefetch_config['max_memory_target']) is not None:
            subprocess_args.extend(["--max-memory-target", str(max_memory_target)])
        if (read_part_size := self.common_config['read_part_size']) is not None:
            subprocess_args.extend(["--part-size", str(read_part_size)])

        subprocess_args.extend(["--read-size", str(self.common_config['read_size'])])

        # Bind one `--bind` flag per configured network interface.
        for interface in self.common_config['network_interfaces']:
            subprocess_args.extend(["--bind", interface])

        if (run_time := self.common_config['run_time']) is not None:
            subprocess_args.extend(["--max-duration", str(run_time)])

        subprocess_args.extend(["--output-file", "prefetch-output.json"])

        # Environment knobs consumed by the benchmark binary.
        prefetch_env = {}
        if not self.common_config['download_checksums']:
            prefetch_env["EXPERIMENTAL_MOUNTPOINT_NO_DOWNLOAD_INTEGRITY_VALIDATION"] = "ON"
        if self.prefetch_config['max_read_window_size'] is not None:
            prefetch_env["UNSTABLE_MOUNTPOINT_MAX_PREFETCH_WINDOW_SIZE"] = str(self.prefetch_config['max_read_window_size'])

        subprocess_env = os.environ.copy() | prefetch_env
        log.debug("Subprocess env: %s", subprocess_env)
        log.info("Running prefetch benchmark with args: %s", subprocess_args)
        try:
            subprocess.run(subprocess_args, check=True, capture_output=True, text=True, env=subprocess_env)
        except subprocess.CalledProcessError as e:
            # capture_output=True swallows the child's streams; without this
            # logging, a failed run raises with no diagnostics at all.
            log.error(
                "Prefetch benchmark failed (exit %s).\nstdout: %s\nstderr: %s",
                e.returncode, e.stdout, e.stderr,
            )
            raise
        log.info("Prefetch benchmark completed successfully.")

    def post_process(self) -> None:
        """No post-processing required for this benchmark."""
        pass