-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathRunnerConfig_execution_NUC.py
More file actions
423 lines (347 loc) · 21.2 KB
/
Copy pathRunnerConfig_execution_NUC.py
File metadata and controls
423 lines (347 loc) · 21.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
from EventManager.Models.RunnerEvents import RunnerEvents
from EventManager.EventSubscriptionController import EventSubscriptionController
from ConfigValidator.Config.Models.RunTableModel import RunTableModel
from ConfigValidator.Config.Models.FactorModel import FactorModel
from ConfigValidator.Config.Models.RunnerContext import RunnerContext
from ConfigValidator.Config.Models.OperationType import OperationType
from ExtendedTyping.Typing import SupportsStr
from ProgressManager.Output.OutputProcedure import OutputProcedure as output
from typing import Dict, List, Any, Optional, Union
from pathlib import Path
from os.path import dirname, realpath
# Experiment specific imports
import time
import paramiko
import subprocess
import pandas as pd
from os import getenv, remove
from dotenv import load_dotenv
from scp import SCPClient
from collections import defaultdict
# Load variables from a .env file into the environment; the NUC_* settings used below are read with getenv().
load_dotenv()
# Offset added to all later readings each time parse_energibridge_output() sees an energy reading drop.
# NOTE(review): presumably the wrap-around range of the RAPL energy counter in joules on the target machine - confirm.
RAPL_OVERFLOW_VALUE = 262143.328850
class ExternalMachineAPI:
    """SSH/SCP helper for talking to the experimental machine.

    The connection is opened in the constructor using the NUC_HOSTNAME,
    NUC_USERNAME and NUC_PASSWORD environment variables. The channels of the
    most recent command are kept on `stdin`, `stdout` and `stderr`.
    """

    def __init__(self):
        """Create the SSH client and try to connect to the remote machine."""
        self.ssh = paramiko.SSHClient()
        # Unknown host keys are accepted automatically.
        self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        # Populated by execute_remote_command(); stay None until a command ran.
        self.stdin = None
        self.stdout = None
        self.stderr = None

        try:
            self.ssh.connect(
                hostname=getenv('NUC_HOSTNAME'),
                username=getenv('NUC_USERNAME'),
                password=getenv('NUC_PASSWORD'),
            )
        except paramiko.SSHException:
            output.console_log_FAIL('Failed to send run command to machine!')

    def execute_remote_command(self, command: str = '', env: Optional[dict] = None, overwrite_channels: bool = True):
        """Run `command` on the remote machine.

        Args:
            command: Shell command line to execute remotely.
            env: Optional environment variables forwarded to the remote command.
            overwrite_channels: When True, the command's stdin/stdout/stderr
                replace the ones stored on this object. When False, the stored
                channels are left untouched so the caller can keep reading the
                output of an earlier command.
        """
        try:
            if overwrite_channels:
                self.stdin, self.stdout, self.stderr = self.ssh.exec_command(command, environment=env)
            else:
                # Forward the environment here as well instead of silently dropping it.
                self.ssh.exec_command(command, environment=env)
        except paramiko.SSHException:
            output.console_log_FAIL('Failed to send run command to machine.')
        except TimeoutError:
            output.console_log_FAIL('Timeout reached while waiting for command output.')

    def copy_file_from_remote(self, remote_path, local_path):
        """Copy `remote_path` (recursively) from the remote machine to `local_path`."""
        # The SCP client reuses the transport of the existing SSH connection.
        with SCPClient(self.ssh.get_transport()) as scp:
            scp.get(remote_path, local_path, recursive=True)
        output.console_log_OK(f"Copied {remote_path} to {local_path}")

    def __del__(self):
        """Close whatever channels exist, then the SSH connection."""
        # The channels are None when no command was executed (or the connection
        # failed), and attributes may be missing if __init__ raised early.
        for channel_name in ('stdin', 'stdout', 'stderr'):
            channel = getattr(self, channel_name, None)
            if channel is not None:
                channel.close()
        client = getattr(self, 'ssh', None)
        if client is not None:
            client.close()
def parse_perf_output(file_path):
    """Parse a comma-separated `perf stat` report into counters and miss percentages.

    Args:
        file_path: Path to the perf CSV file. The first two lines are treated
            as header lines and skipped.

    Returns:
        A dict with the six raw counters ('cache-references', 'cache-misses',
        'LLC-loads', 'LLC-load-misses', 'LLC-stores', 'LLC-store-misses') and
        the three derived '*_percent' entries. A counter is None when it was
        not found or its count field was not a plain integer; a percentage is
        None when either operand is missing or the denominator is zero.
    """
    perf_data = {
        'cache-references': None, 'cache-misses': None,
        'LLC-loads': None, 'LLC-load-misses': None,
        'LLC-stores': None, 'LLC-store-misses': None,
        'cache-misses_percent': None, 'LLC-load-misses_percent': None, 'LLC-store-misses_percent': None,
    }

    with open(file_path, 'r') as file:
        lines = file.readlines()

    for line in lines[2:]:
        parts = line.split(',')
        # Blank or truncated lines carry no event name; skip them instead of crashing.
        if len(parts) < 3:
            continue

        raw_count = parts[0].strip()
        # Non-numeric count fields are stored as None.
        event_count = int(raw_count) if raw_count.isdigit() else None
        event_type = parts[2].strip()

        # Substring match so that qualified names such as 'cpu_core/cache-misses/' are recognised.
        for key in perf_data:
            if key in event_type:
                perf_data[key] = event_count

    ratio_pairs = (
        ('cache-misses', 'cache-references'),
        ('LLC-load-misses', 'LLC-loads'),
        ('LLC-store-misses', 'LLC-stores'),
    )
    for misses_key, total_key in ratio_pairs:
        misses = perf_data[misses_key]
        total = perf_data[total_key]
        # A miss count of 0 is a valid measurement (0 %); only a missing value or
        # a zero/missing denominator leaves the percentage undefined.
        if misses is not None and total:
            perf_data[f'{misses_key}_percent'] = misses / total * 100

    return perf_data
def parse_energibridge_output(file_path):
    """Summarise an energibridge CSV into per-column averages and energy deltas.

    Args:
        file_path: Path to the CSV file. Every cell is coerced to a number;
            cells that cannot be parsed become NaN.

    Returns:
        A dict holding the mean of every usage/memory/frequency column and, for
        each energy column, the difference between its last and first reading
        after compensating for counter wrap-arounds.
    """
    # Columns that are averaged over the whole run.
    target_columns = [
        'TOTAL_MEMORY', 'TOTAL_SWAP', 'USED_MEMORY', 'USED_SWAP',
        'PROCESS_CPU_USAGE', 'PROCESS_MEMORY', 'PROCESS_VIRTUAL_MEMORY',
    ] + [f'CPU_USAGE_{i}' for i in range(12)] + [f'CPU_FREQUENCY_{i}' for i in range(12)]
    # Columns for which only the last-minus-first difference is reported.
    delta_target_columns = [
        'DRAM_ENERGY (J)', 'PACKAGE_ENERGY (J)', 'PP0_ENERGY (J)', 'PP1_ENERGY (J)'
    ]

    df = pd.read_csv(file_path).apply(pd.to_numeric, errors='coerce')

    averages = df[target_columns].mean().to_dict()
    deltas = {}

    for column in delta_target_columns:
        # Work on a private float copy: the correction below must neither write
        # through to the DataFrame nor fail on integer-typed columns.
        column_data = df[column].to_numpy(dtype=float, copy=True)
        for i in range(1, len(column_data)):
            # A reading lower than its predecessor is treated as a counter wrap-around.
            if column_data[i] < column_data[i - 1]:
                output.console_log_WARNING(f"RAPL Overflow found:\nReading {i-1}: {column_data[i-1]}\nReading {i}: {column_data[i]}")
                # The tail already carries the offsets of all earlier wrap-arounds,
                # so exactly one more range is added per newly detected wrap.
                column_data[i:] += RAPL_OVERFLOW_VALUE
        deltas[column] = column_data[-1] - column_data[0]

    return {**averages, **deltas}
def compare_files_bash(file1, file2):
    """Return True only when the `diff` utility reports both files as identical.

    Args:
        file1: Path of the first file.
        file2: Path of the second file.

    Returns:
        True when `diff` exits with status 0. False when the files differ, when
        `diff` reports trouble (for example a missing file), or when `diff`
        cannot be started at all.
    """
    try:
        # Only the exit status is needed, so the diff text itself is discarded.
        result = subprocess.run(
            ["diff", file1, file2],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except OSError:
        # The diff executable could not be launched.
        return False
    # diff exit status: 0 = identical, 1 = different, 2 = trouble. Empty stdout
    # alone is not proof of equality because errors are written to stderr.
    return result.returncode == 0
class RunnerConfig:
    # Experiment configuration: runs every subject/target combination on the remote
    # machine through energibridge and perf, then collects the results locally.
    ROOT_DIR = Path(dirname(realpath(__file__)))

    # ================================ USER SPECIFIC CONFIG ================================
    """The name of the experiment."""
    name:                       str             = "NUC_experiment_fix"

    """The path in which Experiment Runner will create a folder with the name `self.name`, in order to store the
    results from this experiment. (Path does not need to exist - it will be created if necessary.)
    Output path defaults to the config file's path, inside the folder 'experiments'"""
    results_output_path:        Path            = ROOT_DIR / 'experiments'

    """Experiment operation type. Unless you manually want to initiate each run, use `OperationType.AUTO`."""
    operation_type:             OperationType   = OperationType.AUTO

    """The time Experiment Runner will wait after a run completes.
    This can be essential to accommodate for cooldown periods on some systems."""
    time_between_runs_in_ms:    int             = 120000

    # Dynamic configurations can be one-time satisfied here before the program takes the config as-is
    # e.g. Setting some variable based on some criteria
    def __init__(self):
        """Executes immediately after program start, on config load"""
        load_dotenv()

        EventSubscriptionController.subscribe_to_multiple_events([
            (RunnerEvents.BEFORE_EXPERIMENT, self.before_experiment),
            (RunnerEvents.BEFORE_RUN       , self.before_run       ),
            (RunnerEvents.START_RUN        , self.start_run        ),
            (RunnerEvents.START_MEASUREMENT, self.start_measurement),
            (RunnerEvents.INTERACT         , self.interact         ),
            (RunnerEvents.STOP_MEASUREMENT , self.stop_measurement ),
            (RunnerEvents.STOP_RUN         , self.stop_run         ),
            (RunnerEvents.POPULATE_RUN_DATA, self.populate_run_data),
            (RunnerEvents.AFTER_EXPERIMENT , self.after_experiment )
        ])
        self.run_table_model = None  # Initialized later

        # Paths below are used inside commands executed on the remote machine.
        self.project_directory = "./ease25-repl-pkg"
        self.venv_python_path = './.cache/pypoetry/virtualenvs/'
        self.metric_capturing_interval : int = 200  # Milliseconds
        self.warmup_time               : int = 60   # Seconds
        self.post_warmup_cooldown_time : int = 30   # Seconds

        # Command templates per subject. 'file_io' is used when the target input is a
        # string (a file path), 'value_io' otherwise. When 'python_version' is present,
        # a matching virtualenv directory is looked up on the remote machine.
        self.subject_execution_templates = {
            'cpython' : {
                'file_io' : '{venv_python} {target_path}/{target}.py',
                'value_io' : '{venv_python} {target_path}/{target}.py {input}',
                'python_version' : '3.10'
            },
            'cython' : {
                'build' : '{venv_python} {project_directory}/control_group/{target}/build.py {project_directory}/control_group/{target}/source',
                'file_io' : '{venv_python} -c "import sys; sys.path.insert(0, \'{target_path}/build/\'); import {target}; {target}.main()"',
                'value_io' : '{venv_python} -c "import sys; sys.path.insert(0, \'{target_path}/build/\'); import {target}; {target}.main({input})"',
                'python_version' : '3.10'
            },
            'nuitka' : {
                'build' : '{project_directory}/control_group/{target}/build.sh {project_directory}/control_group/{target}/source {project_directory}/control_group/{target}/build',
                'file_io' : '{target_path}/build/{target}.bin',
                'value_io' : '{target_path}/build/{target}.bin {input}',
            },
            'pypy' : {
                'file_io' : 'pypy3 {target_path}/{target}.py',
                'value_io' : 'pypy3 {target_path}/{target}.py {input}',
            },
            'mypyc' : {
                'build' : '{venv_python} {project_directory}/control_group/{target}/build.py {project_directory}/control_group/{target}/source',
                'file_io' : '{venv_python} -c "import sys; sys.path.insert(0, \'{target_path}/build/\'); import {target}; {target}.main()"',
                'value_io' : '{venv_python} -c "import sys; sys.path.insert(0, \'{target_path}/build/\'); import {target}; {target}.main({input})"',
                'python_version' : '3.10'
            },
            'codon' : {
                'build' : '{project_directory}/control_group/{target}/build.sh {project_directory}/control_group/{target}/source {project_directory}/control_group/{target}/build',
                'file_io' : '{target_path}/build/{target} {input}',
                'value_io' : '{target_path}/build/{target} {input}',
            },
            'numba' : {
                'file_io' : '{venv_python} {target_path}/{target}.py',
                'value_io' : '{venv_python} {target_path}/{target}.py {input}',
                'python_version' : '3.10'
            },
            'py3.13-jit' : {
                'file_io' : '{venv_python} {target_path}/{target}.py',
                'value_io' : '{venv_python} {target_path}/{target}.py {input}',
                'python_version' : '3.13'
            },
            'pyston-lite' : {
                'file_io' : '{venv_python} {target_path}/{target}.py',
                'value_io' : '{venv_python} {target_path}/{target}.py {input}',
                'python_version' : '3.10'
            },
            'pyjion' : {
                'file_io' : None,
                'value_io' : None,
                'python_version' : '3.10'
            },
            'jython' : {
                'file_io' : None,
                'value_io' : None,
                'python_version' : '3.10'
            }
        }

        # Input per target: a number is passed as a command-line argument, a string
        # is treated as a file whose content is piped to the target's stdin.
        self.target_inputs = {
            'spectralnorm' : 5500,
            'binary_trees' : 21,
            'fasta' : 25000000,
            'k_nucleotide' : f'{self.project_directory}/code/fasta.txt',
            'n_body' : 50000000,
            'mandelbrot' : 16000,
            'fannkuch_redux' : 12,
        }

        output.console_log("Custom config loaded")

    @staticmethod
    def _redact_password(command: str) -> str:
        """Return `command` with the NUC_PASSWORD value masked so it can be logged safely."""
        password = getenv('NUC_PASSWORD')
        if not password:
            return command
        return command.replace(password, '********')

    def create_run_table_model(self) -> RunTableModel:
        """Create and return the run_table model here. A run_table is a List (rows) of tuples (columns),
        representing each run performed"""
        # Subjects currently left out: 'jython', 'pyjion', 'numba', 'cython', 'pypy', 'codon', 'mypyc', 'nuitka'
        factor1 = FactorModel("subject", ['cpython', 'py3.13-jit', 'pyston-lite'])
        factor2 = FactorModel("target", ['mandelbrot', 'spectralnorm', 'binary_trees', 'fasta', 'k_nucleotide', 'n_body', 'fannkuch_redux'])
        self.run_table_model = RunTableModel(
            factors=[factor1, factor2],
            repetitions=15,
            shuffle=True,
            data_columns=['cache-references', 'cache-misses', 'LLC-loads', 'LLC-load-misses', 'LLC-stores', 'LLC-store-misses',
                          'cache-misses_percent', 'LLC-load-misses_percent', 'LLC-store-misses_percent',
                          'DRAM_ENERGY (J)', 'PACKAGE_ENERGY (J)', 'PP0_ENERGY (J)', 'PP1_ENERGY (J)',
                          'TOTAL_MEMORY', 'TOTAL_SWAP', 'USED_MEMORY', 'USED_SWAP', 'execution_time',
                          'PROCESS_CPU_USAGE', 'PROCESS_MEMORY', 'PROCESS_VIRTUAL_MEMORY'] + [f"CPU_USAGE_{i}" for i in range(12)] + [f'CPU_FREQUENCY_{i}' for i in range(12)]
        )
        return self.run_table_model

    def before_experiment(self) -> None:
        """Prepare the remote machine: unpack the fasta input if missing, then warm up and cool down."""
        output.console_log("Config.before_experiment() called!")
        ssh = ExternalMachineAPI()

        # Extract fasta_input.txt file
        check_command = f"[ -f {self.project_directory}/code/fasta.txt ] && echo 'exists' || echo 'not_exists'"
        ssh.execute_remote_command(check_command)
        check_status = ssh.stdout.readline()
        if check_status.strip() == 'not_exists':
            output.console_log("Unpacking expected results of fasta_input.txt on experimental machine...")
            extract_command = f'tar -xf {self.project_directory}/code/fasta.tar.xz -C {self.project_directory}/code/'
            ssh.execute_remote_command(extract_command)

        # Warm up the machine for `warmup_time` seconds; the command echoes a PID
        # that is used to kill the warmup afterwards.
        output.console_log("Warming up machine using a fibonnaci sequence...")
        warmup_command = f"echo {getenv('NUC_PASSWORD')} | sudo -S python {self.project_directory}/code/warmup.py 1000 & pid=$!; echo $pid"
        ssh.execute_remote_command(warmup_command)
        time.sleep(self.warmup_time)
        ssh.execute_remote_command(f"echo {getenv('NUC_PASSWORD')} | sudo -S kill {ssh.stdout.readline()}")

        # Cooldown machine
        time.sleep(self.post_warmup_cooldown_time)
        output.console_log_OK("Warmup finished. Experiment is starting now!")

    def before_run(self) -> None:
        """Reset the per-run timing state."""
        self.run_time = None

    def start_run(self, context: RunnerContext) -> None:
        """Build the energibridge and perf command lines for this run and create the remote run directory."""
        output.console_log("Config.start_run() called!")
        ssh = ExternalMachineAPI()

        subject = context.run_variation['subject']
        target = context.run_variation['target']
        # Named target_input so the builtin `input` is not shadowed.
        target_input = self.target_inputs[context.run_variation['target']]
        target_path = f'{self.project_directory}/code/control_group/{subject}'
        self.external_run_dir = f'{self.project_directory}/experiments/{self.name}/{context.run_dir.name}'
        energibrige_command = f'sudo -S {self.project_directory}/EnergiBridge/target/release/energibridge --interval {self.metric_capturing_interval} --summary --output {self.external_run_dir}/energibridge.csv --command-output {self.external_run_dir}/output.txt taskset -c 0'

        venv_python = ''
        if 'python_version' in self.subject_execution_templates[subject]:
            python_version = self.subject_execution_templates[subject]['python_version']
            ssh.execute_remote_command(f"find {self.venv_python_path} -type d -name '*{python_version}' | head -n 1")
            # Get path to desired python venv
            venv_python = ssh.stdout.readline().strip()

        # Fill command with current run values; placeholders without a value
        # resolve to an empty string through the defaultdict.
        template_key = 'file_io' if isinstance(target_input, str) else 'value_io'
        subject_command = self.subject_execution_templates[subject][template_key].format_map(defaultdict(str, {
            'target_path' : target_path,
            'target' : target,
            'venv_python' : f"{venv_python}/bin/python",
            'input' : target_input
        }))

        # Wrap command to fit paramiko call: stdin carries the sudo password first
        # and, for file inputs, the file content afterwards.
        if isinstance(target_input, str):
            self.execution_command = f"{energibrige_command} {subject_command}< <(echo {getenv('NUC_PASSWORD')} && cat {target_input})"
        else:
            self.execution_command = f"echo {getenv('NUC_PASSWORD')} | {energibrige_command} {subject_command}"

        # self.perf_command = f'sudo -S perf stat -C 0 -x, -o {self.external_run_dir}/perf.csv -e cache-references,cache-misses,LLC-loads,LLC-load-misses,LLC-stores,LLC-store-misses -p'
        self.perf_command = f'sudo -S perf stat -C 0 -x, -o {self.external_run_dir}/perf.csv -e cpu_core/cache-references/,cpu_core/cache-misses/,cpu_core/LLC-loads/,cpu_core/LLC-load-misses/,cpu_core/LLC-stores/,cpu_core/LLC-store-misses/ -p'

        # Make directory of run on experimental machine
        ssh.execute_remote_command(f"echo {getenv('NUC_PASSWORD')} | sudo -S mkdir -p {self.external_run_dir}")
        del ssh

        output.console_log(f'Run directory on experimental machine: {self.external_run_dir}')
        output.console_log_OK('Run configuration is successful.')

    def start_measurement(self, context: RunnerContext) -> None:
        """Start the run through energibridge, attach perf to it and wait for the energibridge summary."""
        output.console_log("Config.start_measurement() called!")
        ssh = ExternalMachineAPI()

        # The command line embeds the sudo password, so it is masked before logging.
        output.console_log(f'Running command through energibridge with:\n{self._redact_password(self.execution_command)}')
        self.run_time = time.time()
        env_var = {'DISABLE_PYSTON' : '1'} if context.run_variation['subject'] != 'pyston-lite' else None
        ssh.execute_remote_command(self.execution_command, env=env_var)
        # The first stdout line of the command is parsed as the PID to attach perf to.
        self.interaction_pid = int(ssh.stdout.readline())

        # Start perf monitor and attach it to the target process
        output.console_log(f'PID of run interaction: {self.interaction_pid}')
        perf_command = f"echo {getenv('NUC_PASSWORD')} | {self.perf_command} {str(self.interaction_pid)}"
        output.console_log(f'Running perf with:\n{self._redact_password(perf_command)}')
        # Keep the energibridge channels so its summary line can still be read below.
        ssh.execute_remote_command(perf_command, overwrite_channels=False)
        output.console_log_OK('Run has successfuly started.')

        # Energy Bridge Summary format: Energy consumption in joules: 7.630859375 for 2.0023594 sec of execution.
        next_line : str = ssh.stdout.readline()
        # Elapsed wall-clock time from sending the command until the next stdout line arrived.
        self.run_time = time.time() - self.run_time
        output.console_log_bold(f'Summary of energibridge: {next_line}')
        del ssh
        output.console_log_OK('Run has successfuly finished.')

    def interact(self, context: RunnerContext) -> None:
        """No interaction is performed for this experiment."""
        pass

    def stop_measurement(self, context: RunnerContext) -> None:
        """Nothing to stop explicitly for this experiment."""
        pass

    def stop_run(self, context: RunnerContext) -> None:
        """Nothing to tear down per run for this experiment."""
        pass

    def populate_run_data(self, context: RunnerContext) -> Optional[Dict[str, SupportsStr]]:
        """Parse and process any measurement data here.
        You can also store the raw measurement data under `context.run_dir`
        Returns a dictionary with keys `self.run_table_model.data_columns` and their values populated"""
        output.console_log("Config.populate_run_data() called!")
        ssh = ExternalMachineAPI()

        # Copy every file of the remote run directory into the local run directory.
        ssh.execute_remote_command(f'ls {self.external_run_dir}')
        files = ssh.stdout.readlines()
        for file in files:
            output.console_log_WARNING(f"This is the file on the other machine {file}")
            ssh.copy_file_from_remote(f'{self.external_run_dir}/{file.strip()}', context.run_dir)

        # local_output_validation_file = f"./code/outputs/{context.run_variation['target']}.txt"
        # # Check if run results are correct before storing data for the run
        # if compare_files_bash(local_output_validation_file, received_output_file):
        #     # Extract perf output from experimental machine for current run
        # else:
        #     output.console_log_FAIL(f'Target function did not return the expected result.')
        # # Remove output files because they take a lot of space
        # # ssh.execute_remote_command(f"echo {getenv('NUC_PASSWORD')} | sudo -S rm -r {self.external_run_dir}")

        # The local copy of the target's output is deleted after the transfer.
        received_output_file = f"{context.run_dir}/output.txt"
        remove(received_output_file)

        perf_output = parse_perf_output(f"{context.run_dir}/perf.csv")
        energibridge_output = parse_energibridge_output(f"{context.run_dir}/energibridge.csv")
        energibridge_output['execution_time'] = self.run_time
        del ssh
        # Plain dict merge; does not require the values to be hashable.
        return {**perf_output, **energibridge_output}

    def after_experiment(self) -> None:
        """Remove the experiments directory on the remote machine."""
        ssh = ExternalMachineAPI()
        ssh.execute_remote_command(f"echo {getenv('NUC_PASSWORD')} | sudo -S rm -r {self.project_directory}/experiments")
        del ssh

    # ================================ DO NOT ALTER BELOW THIS LINE ================================
    experiment_path:            Path             = None