Skip to content

Commit fa19f91

Browse files
sahityadgdannycjones
authored andcommitted
Refactor benchmark.py to extend to prefetcher and other benchmarks (awslabs#1507)
This change extracts fio and mountoint specific code from benchmark.py to specific modules to make it cleaner. It also separates the configuration into sections allowing us to have benchmark specific sweeper parameters. ### Does this change impact existing behavior? No, benchmark change only ### Does this change need a changelog entry? Does it require a version change? No, benchmark change only --- By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license and I agree to the terms of the [Developer Certificate of Origin (DCO)](https://developercertificate.org/). --------- Signed-off-by: Sahitya Damera <sahityad@amazon.com>
1 parent 07a6a9c commit fa19f91

6 files changed

Lines changed: 424 additions & 312 deletions

File tree

benchmark/benchmark.py

Lines changed: 62 additions & 270 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,18 @@
11
from contextlib import contextmanager
2-
from datetime import datetime, timezone
32
import json
43
import logging
54
import os
6-
from os import path
75
import signal
86
import subprocess
9-
from subprocess import Popen
10-
import tempfile
11-
from typing import List, Optional
12-
import urllib.request
7+
from datetime import datetime, timezone
8+
from typing import List, Dict, Any, Optional
139

1410
import hydra
1511
from omegaconf import DictConfig, OmegaConf
12+
import urllib.request
13+
14+
from benchmarks.benchmark_config_parser import BenchmarkConfigParser
15+
from benchmarks.fio_benchmark import FioBenchmark
1616

1717
logging.basicConfig(level=os.environ.get('LOGLEVEL', 'INFO').upper())
1818

@@ -23,259 +23,38 @@
2323
lambda separator, elements: separator.join(elements),
2424
)
2525

26-
MOUNT_DIRECTORY = "s3"
27-
MP_LOGS_DIRECTORY = "mp_logs/"
28-
29-
30-
@contextmanager
31-
def _mounted_bucket(
32-
cfg: DictConfig,
33-
):
34-
"""
35-
Mounts the S3 bucket, providing metadata about the successful mount.
36-
37-
Context manager allows use of `with` clause, automatically unmounting the bucket.
38-
"""
39-
mount_dir = tempfile.mkdtemp(suffix=".mountpoint-s3")
40-
mount_metadata = _mount_mp(cfg, mount_dir)
41-
try:
42-
yield mount_metadata
43-
finally:
44-
try:
45-
subprocess.check_output(["umount", mount_dir])
46-
log.debug(f"{mount_dir} unmounted")
47-
os.rmdir(mount_dir)
48-
os.remove(mount_metadata["mount_s3_env"]["UNSTABLE_MOUNTPOINT_PID_FILE"])
49-
except Exception:
50-
log.error(f"Error cleaning up Mountpoint at {mount_dir}:", exc_info=True)
51-
5226

53-
class MountError(Exception):
54-
pass
55-
56-
57-
def _mount_mp(
58-
cfg: DictConfig,
59-
mount_dir: str,
60-
) -> dict[str, any] | MountError | subprocess.CalledProcessError:
61-
"""
62-
Mount an S3 bucket using Mountpoint,
63-
using the configuration to apply Mountpoint arguments.
64-
65-
Returns Mountpoint version string.
66-
"""
67-
bucket = cfg['s3_bucket']
68-
stub_mode = str(cfg["stub_mode"]).lower()
69-
70-
if cfg['mountpoint_binary'] is None:
71-
mountpoint_args = [
72-
"cargo",
73-
"run",
74-
"--quiet",
75-
"--release",
76-
"--features=mock",
77-
]
78-
79-
if stub_mode == "s3_client":
80-
# `mock-mount-s3` requires bucket to be prefixed with `sthree-` to verify we're not actually reaching S3
81-
logging.debug("using mock-mount-s3 due to `stub_mode`, bucket will be prefixed with \"sthree-\"")
82-
bucket = f"sthree-{cfg['s3_bucket']}"
83-
84-
mountpoint_args.append("--bin=mock-mount-s3")
85-
86-
# End Cargo command, begin passing arguments to Mountpoint
87-
mountpoint_args.append("--")
88-
else:
89-
mountpoint_args = [cfg['mountpoint_binary']]
90-
91-
os.makedirs(MP_LOGS_DIRECTORY, exist_ok=True)
92-
93-
mountpoint_version_output = subprocess.check_output([*mountpoint_args, "--version"]).decode("utf-8")
94-
log.info("Mountpoint version: %s", mountpoint_version_output.strip())
95-
96-
subprocess_args = [
97-
*mountpoint_args,
98-
bucket,
99-
mount_dir,
100-
"--log-metrics",
101-
"--allow-overwrite",
102-
"--allow-delete",
103-
f"--log-directory={MP_LOGS_DIRECTORY}",
104-
]
105-
subprocess_env = os.environ.copy()
106-
107-
if cfg['s3_prefix'] is not None:
108-
subprocess_args.append(f"--prefix={cfg['s3_prefix']}")
109-
110-
if cfg['mountpoint_debug']:
111-
subprocess_args.append("--debug")
112-
if cfg['mountpoint_debug_crt']:
113-
subprocess_args.append("--debug-crt")
114-
115-
if cfg["read_part_size"]:
116-
subprocess_args.append(f"--read-part-size={cfg['read_part_size']}")
117-
if cfg["write_part_size"]:
118-
subprocess_args.append(f"--write-part-size={cfg['write_part_size']}")
119-
120-
if cfg['metadata_ttl'] is not None:
121-
subprocess_args.append(f"--metadata-ttl={cfg['metadata_ttl']}")
122-
123-
if cfg['upload_checksums'] is not None:
124-
subprocess_args.append(f"--upload-checksums={cfg['upload_checksums']}")
125-
126-
if cfg['fuse_threads'] is not None:
127-
subprocess_args.append(f"--max-threads={cfg['fuse_threads']}")
128-
129-
for network_interface in cfg['network']['interface_names']:
130-
subprocess_args.append(f"--bind={network_interface}")
131-
if (max_throughput := cfg['network']['maximum_throughput_gbps']) is not None:
132-
if stub_mode == "s3_client":
133-
raise ValueError(
134-
"should not use `stub_mode=s3_client` with `maximum_throughput_gbps`, throughput will be limited"
135-
)
136-
subprocess_args.append(f"--maximum-throughput-gbps={max_throughput}")
137-
138-
if cfg['mountpoint_max_background'] is not None:
139-
subprocess_env["UNSTABLE_MOUNTPOINT_MAX_BACKGROUND"] = str(cfg['mountpoint_max_background'])
140-
141-
if cfg['mountpoint_congestion_threshold'] is not None:
142-
subprocess_env["UNSTABLE_MOUNTPOINT_CONGESTION_THRESHOLD"] = str(cfg["mountpoint_congestion_threshold"])
143-
144-
subprocess_env["UNSTABLE_MOUNTPOINT_PID_FILE"] = f"{mount_dir}.pid"
145-
146-
if stub_mode != "off" and cfg["mountpoint_binary"] is not None:
147-
raise ValueError("Cannot use `stub_mode` with `mountpoint_binary`, `stub_mode` requires recompilation")
148-
match stub_mode:
149-
case "off":
150-
pass
151-
case "fs_handler":
152-
subprocess_env["MOUNTPOINT_BUILD_STUB_FS_HANDLER"] = "1"
153-
case "s3_client":
154-
# Already handled when building cargo command
155-
pass
156-
case _:
157-
raise ValueError(f"Unknown stub_mode: {stub_mode}")
158-
159-
log.info(f"Mounting S3 bucket {bucket} with args: %s; env: %s", subprocess_args, subprocess_env)
160-
try:
161-
output = subprocess.check_output(subprocess_args, env=subprocess_env)
162-
except subprocess.CalledProcessError as e:
163-
log.error(f"Error during mounting: {e}")
164-
raise MountError() from e
165-
166-
mountpoint_pid = _get_mount_s3_pid(subprocess_env["UNSTABLE_MOUNTPOINT_PID_FILE"])
167-
log.info("Mountpoint pid: %d, output: %s", mountpoint_pid, output.decode("utf-8").strip())
168-
169-
return {
170-
"mount_dir": mount_dir,
171-
"mount_s3_command": " ".join(subprocess_args),
172-
"mount_s3_env": subprocess_env,
173-
"mp_version": mountpoint_version_output.strip(),
174-
"mp_pid": mountpoint_pid,
175-
}
176-
177-
178-
def _run_fio(cfg: DictConfig, mount_dir: str) -> None:
179-
"""
180-
Run the FIO workload against the file system.
181-
"""
182-
FIO_BINARY = "fio"
183-
fio_job_name = cfg["fio_benchmark"]
184-
fio_output_filepath = f"fio.{fio_job_name}.json"
185-
186-
# TODO: Avoid duplicating/diverging the FIO jobs between `benchmark/fio/` and `mountpoint-s3/scripts/fio/`
187-
fio_job_filepath = hydra.utils.to_absolute_path(f"fio/{fio_job_name}.fio")
188-
subprocess_args = [
189-
FIO_BINARY,
190-
"--eta=never",
191-
"--output-format=json",
192-
f"--output={fio_output_filepath}",
193-
f"--directory={mount_dir}",
194-
fio_job_filepath,
195-
]
196-
subprocess_env = os.environ.copy()
197-
subprocess_env["APP_WORKERS"] = str(cfg['application_workers'])
198-
subprocess_env["SIZE_GIB"] = "100"
199-
subprocess_env["DIRECT"] = "1" if cfg['direct_io'] else "0"
200-
subprocess_env["UNIQUE_DIR"] = datetime.now(tz=timezone.utc).isoformat()
201-
subprocess_env["IO_ENGINE"] = cfg['fio_io_engine']
202-
log.info("Running FIO with args: %s; env: %s", subprocess_args, subprocess_env)
203-
204-
# Use Popen instead of check_output, as we had some issues when trying to attach perf
205-
with Popen(subprocess_args, env=subprocess_env) as process:
206-
exit_code = process.wait()
207-
if exit_code != 0:
208-
log.error(f"FIO process failed with exit code {exit_code}")
209-
raise subprocess.CalledProcessError(exit_code, subprocess_args)
210-
else:
211-
log.info("FIO process completed successfully")
212-
213-
214-
def _collect_logs() -> None:
215-
"""
216-
Collect the Mountpoint log if it exists and move to the output directory.
217-
Mountpoint log filename will be normalized removing the date, etc..
218-
The old log directory is removed.
219-
220-
Fails if more than one log file is found.
221-
"""
222-
logs_directory = path.join(os.getcwd(), MP_LOGS_DIRECTORY)
223-
dir_entries = os.listdir(logs_directory)
224-
225-
if not dir_entries:
226-
log.debug(f"No Mountpoint log files in directory {logs_directory}")
227-
return
228-
229-
assert len(dir_entries) <= 1, f"Expected no more than one log file in {logs_directory}"
230-
231-
old_log_dir = path.join(logs_directory, dir_entries[0])
232-
new_log_path = "mountpoint-s3.log"
233-
log.debug(f"Renaming {old_log_dir} to {new_log_path}")
234-
os.rename(old_log_dir, new_log_path)
235-
os.rmdir(logs_directory)
236-
237-
238-
def _write_metadata(metadata: dict[str, any]) -> None:
239-
with open("metadata.json", "w") as f:
240-
json.dump(metadata, f, default=str)
241-
242-
243-
def _postprocessing(metadata: dict[str, any]) -> None:
244-
_collect_logs()
245-
_write_metadata(metadata)
246-
247-
248-
def _get_ec2_instance_id() -> Optional[str]:
27+
def get_ec2_instance_id() -> Optional[str]:
28+
"""Get the EC2 instance ID if running on EC2."""
24929
if os.getenv("AWS_EC2_METADATA_DISABLED") == "true":
25030
return None
25131

252-
token_url = "http://169.254.169.254/latest/api/token"
253-
token_request = urllib.request.Request(token_url, method='PUT')
254-
token_request.add_header("X-aws-ec2-metadata-token-ttl-seconds", "21600")
255-
with urllib.request.urlopen(token_request) as token_response:
256-
token = token_response.read().decode()
257-
258-
metadata_url = "http://169.254.169.254/latest/meta-data/instance-id"
259-
metadata_request = urllib.request.Request(metadata_url, headers={"X-aws-ec2-metadata-token": token})
260-
with urllib.request.urlopen(metadata_request) as metadata_response:
261-
instance_id = metadata_response.read().decode()
262-
263-
return instance_id
264-
265-
266-
def _get_mount_s3_pid(pid_file: str) -> int:
26732
try:
268-
with open(pid_file, 'r') as f:
269-
pid = int(f.read().strip())
270-
271-
log.debug(f"Read mount-s3 pid: {pid} from file: {pid_file}")
33+
token_url = "http://169.254.169.254/latest/api/token"
34+
token_request = urllib.request.Request(token_url, method='PUT')
35+
token_request.add_header("X-aws-ec2-metadata-token-ttl-seconds", "21600")
36+
with urllib.request.urlopen(token_request) as token_response:
37+
token = token_response.read().decode()
38+
39+
metadata_url = "http://169.254.169.254/latest/meta-data/instance-id"
40+
metadata_request = urllib.request.Request(metadata_url, headers={"X-aws-ec2-metadata-token": token})
41+
with urllib.request.urlopen(metadata_request) as metadata_response:
42+
instance_id = metadata_response.read().decode()
43+
44+
return instance_id
45+
except Exception:
46+
log.warning("Failed to retrieve EC2 instance ID", exc_info=True)
47+
return None
27248

273-
return pid
27449

275-
except FileNotFoundError:
276-
raise RuntimeError(f"Mountpoint pid file not found: {pid_file}")
277-
except Exception as e:
278-
raise RuntimeError("Could not determine mountpoint pid") from e
50+
def write_metadata(metadata: Dict[str, Any]) -> None:
51+
"""Write metadata to a file."""
52+
try:
53+
with open("metadata.json", "w") as f:
54+
json.dump(metadata, f, default=str)
55+
log.debug("Metadata written to metadata.json")
56+
except Exception:
57+
log.error("Failed to write metadata", exc_info=True)
27958

28059

28160
class ResourceMonitoring:
@@ -380,34 +159,47 @@ def managed(target_pid, with_bwm=False, with_perf_stat=False):
380159
@hydra.main(version_base=None, config_path="conf", config_name="config")
381160
def run_experiment(cfg: DictConfig) -> None:
382161
"""
383-
At a high level, we want to mount the S3 bucket using Mountpoint,
384-
run a synthetic workload against Mountpoint while capturing metrics and logs,
385-
then end the load and unmount the bucket.
162+
Run the benchmark experiment with the given configuration.
386163
387-
We should collect all of the logs and metric and dump them in the output directory.
164+
Args:
165+
cfg: Configuration object containing benchmark parameters
388166
"""
389167
log.debug("Experiment starting")
168+
169+
config_parser = BenchmarkConfigParser(cfg)
170+
common_config = config_parser.get_common_config()
171+
benchmark_type = common_config['benchmark_type']
390172
metadata = {
391-
"ec2_instance_id": _get_ec2_instance_id(),
173+
"ec2_instance_id": get_ec2_instance_id(),
392174
"start_time": datetime.now(tz=timezone.utc),
393175
"success": False,
394176
}
395177

396-
with _mounted_bucket(cfg) as mount_metadata:
397-
metadata.update(mount_metadata)
398-
mount_dir = mount_metadata["mount_dir"]
399-
mountpoint_pid = mount_metadata["mp_pid"]
400-
try:
401-
with ResourceMonitoring.managed(mountpoint_pid, cfg['with_bwm'], cfg['with_perf_stat']):
402-
_run_fio(cfg, mount_dir)
403-
metadata["success"] = True
404-
except Exception as e:
405-
log.error(f"Error running experiment: {e}")
178+
if benchmark_type == "fio":
179+
benchmark = FioBenchmark(cfg, metadata)
180+
else:
181+
raise ValueError(f"Unsupported benchmark type: {benchmark_type}")
182+
183+
try:
184+
benchmark.setup()
185+
target_pid = metadata.get("target_pid")
406186

407-
metadata["end_time"] = datetime.now(tz=timezone.utc)
187+
with ResourceMonitoring.managed(target_pid, cfg.monitoring.with_bwm, cfg.monitoring.with_perf_stat):
188+
benchmark.run_benchmark()
408189

409-
_postprocessing(metadata)
410-
log.info("Experiment ended")
190+
# Mark success if we get here without exceptions
191+
metadata["success"] = True
192+
except Exception:
193+
log.error("Benchmark execution failed:", exc_info=True)
194+
raise
195+
finally:
196+
try:
197+
benchmark.post_process()
198+
except Exception:
199+
log.error("Post-processing failed:", exc_info=True)
200+
finally:
201+
write_metadata(metadata)
202+
metadata["end_time"] = datetime.now(tz=timezone.utc)
411203

412204

413205
if __name__ == "__main__":

0 commit comments

Comments
 (0)