-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathrun_nki.py
More file actions
143 lines (122 loc) · 5.37 KB
/
run_nki.py
File metadata and controls
143 lines (122 loc) · 5.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
from neuronpy.runtime.spike import SpikeExecutor
import autotune.core.parallel as parallel
from autotune.core.compile import create_spike_kernel, run_spike_kernel
from autotune.core.job import ProfileJobs
from autotune.core.metrics import extract_metrics
from autotune.core.utils import capture_error_message
def run_neuron_benchmarks(jobs: ProfileJobs, warmup: int, iters: int, device_id: int = 0) -> None:
    """
    Run benchmarks on Neuron cores using a locally created SpikeExecutor.

    Each job in ``jobs`` is benchmarked in turn. Results (timing stats, the
    NTFF profile file, and derived metrics) are recorded on the job itself via
    ``add_attributes``; failures are recorded via ``add_error`` so one bad job
    does not abort the rest of the batch.

    Args:
        jobs: ProfileJobs containing all jobs to run (mutated in place)
        warmup: Number of warmup iterations
        iters: Number of benchmark iterations
        device_id: Neuron device to run on (defaults to 0, the previous
            hard-coded value)
    """
    with SpikeExecutor(verbose=0) as spike:
        for job_index in jobs.jobs:
            job = jobs.jobs[job_index]
            # Skip jobs that already failed in an earlier phase (e.g. compilation).
            if job.has_error:
                continue
            try:
                spike_kernel = create_spike_kernel(job.neff, job.kernel, job.input_tensors, job.kernel_kwargs)
                stats = spike.benchmark(
                    spike_kernel,
                    *job.input_tensors,
                    **job.kernel_kwargs,
                    warmup_iterations=warmup,
                    benchmark_iterations=iters,
                    device_id=device_id,
                )
                # Execute once more outside the benchmark loop to capture the
                # kernel outputs and the NTFF profile artifact.
                ntff_file, kernel_outputs = run_spike_kernel(
                    spike, spike_kernel, job.input_tensors, job.neff, job.kernel_kwargs
                )
                job.add_attributes(ntff=ntff_file, **stats)
                # Postprocessing validates/derives results from the raw outputs;
                # mark success explicitly so downstream consumers can filter on it.
                job.postprocessing(job.input_tensors, job.kernel_kwargs, kernel_outputs)
                job.add_attributes(postprocessing_result=True)
                metrics = extract_metrics(
                    job.neff,
                    ntff_file,
                    latency=job.min_ms,
                    matmul_mac_count=job.matmul_mac_count,
                    target_instance_family=jobs.target_instance_family,
                )
                job.add_attributes(**metrics)
            except Exception as e:
                # Broad catch is deliberate: any per-job failure is captured on
                # the job so the remaining jobs still run.
                error_msg = capture_error_message(e)
                job.add_error(error_msg)
def run_neuron_benchmarks_dynamic(jobs: ProfileJobs, warmup: int, iters: int, device_id: int = 0) -> None:
    """
    Run benchmarks on Neuron cores using the current worker's SpikeExecutor.

    Unlike :func:`run_neuron_benchmarks`, this variant does not create its own
    executor; it reuses ``parallel.worker_spike_executor``, which is expected
    to be initialized by the parallel-worker setup before this is called.
    Results are recorded on each job via ``add_attributes``; failures via
    ``add_error`` so one bad job does not abort the rest of the batch.

    Args:
        jobs: ProfileJobs containing all jobs to run (mutated in place)
        warmup: Number of warmup iterations
        iters: Number of benchmark iterations
        device_id: Neuron device to run on (defaults to 0, the previous
            hard-coded value)
    """
    # NOTE(review): assumes the worker-process executor has been initialized;
    # this is None/absent outside a parallel worker — confirm against caller.
    spike = parallel.worker_spike_executor
    for job_index in jobs.jobs:
        job = jobs.jobs[job_index]
        # Skip jobs that already failed in an earlier phase (e.g. compilation).
        if job.has_error:
            continue
        try:
            spike_kernel = create_spike_kernel(job.neff, job.kernel, job.input_tensors, job.kernel_kwargs)
            stats = spike.benchmark(
                spike_kernel,
                *job.input_tensors,
                **job.kernel_kwargs,
                warmup_iterations=warmup,
                benchmark_iterations=iters,
                device_id=device_id,
            )
            # Execute once more outside the benchmark loop to capture the
            # kernel outputs and the NTFF profile artifact.
            ntff_file, kernel_outputs = run_spike_kernel(
                spike, spike_kernel, job.input_tensors, job.neff, job.kernel_kwargs
            )
            job.add_attributes(ntff=ntff_file, **stats)
            # Postprocessing validates/derives results from the raw outputs;
            # mark success explicitly so downstream consumers can filter on it.
            job.postprocessing(job.input_tensors, job.kernel_kwargs, kernel_outputs)
            job.add_attributes(postprocessing_result=True)
            metrics = extract_metrics(
                job.neff,
                ntff_file,
                latency=job.min_ms,
                matmul_mac_count=job.matmul_mac_count,
                target_instance_family=jobs.target_instance_family,
            )
            job.add_attributes(**metrics)
        except Exception as e:
            # Broad catch is deliberate: any per-job failure is captured on
            # the job so the remaining jobs still run.
            error_msg = capture_error_message(e)
            job.add_error(error_msg)
def run_on_neuron_core(warmup: int, iters: int, jobs: ProfileJobs) -> ProfileJobs:
    """
    Prepare shared input tensors, then benchmark every job on a Neuron core.

    Tensor initialization happens once up front so jobs with identical input
    shapes can share tensors; the benchmark phase then creates a SpikeExecutor
    internally (see :func:`run_neuron_benchmarks`).

    Args:
        warmup: Number of warmup iterations
        iters: Number of benchmark iterations
        jobs: ProfileJobs containing all jobs to run

    Returns:
        The same ProfileJobs object, with results attached to each job.
    """
    # One-time tensor setup shared across all jobs with matching shapes.
    jobs.initialize_input_tensors()
    # Execution phase — requires a SpikeExecutor (created inside the helper).
    run_neuron_benchmarks(jobs, warmup, iters)
    return jobs
def run_on_neuron_core_dynamic(warmup: int, iters: int, jobs: ProfileJobs) -> ProfileJobs:
    """
    Prepare shared input tensors, then benchmark every job on a Neuron core
    using the current parallel worker's SpikeExecutor.

    Tensor initialization happens once up front so jobs with identical input
    shapes can share tensors; the benchmark phase reuses the worker-level
    executor (see :func:`run_neuron_benchmarks_dynamic`).

    Args:
        warmup: Number of warmup iterations
        iters: Number of benchmark iterations
        jobs: ProfileJobs containing all jobs to run

    Returns:
        The same ProfileJobs object, with results attached to each job.
    """
    # One-time tensor setup shared across all jobs with matching shapes.
    jobs.initialize_input_tensors()
    # Execution phase — reuses the per-worker SpikeExecutor.
    run_neuron_benchmarks_dynamic(jobs, warmup, iters)
    return jobs