Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 58 additions & 3 deletions autotune/core/benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,7 @@ def __init__(self, jobs: ProfileJobs, warmup: int = 10, iters: int = 100):

def __call__(self):
"""Execute the full benchmarking pipeline: compile, then run on device."""
self._compile_all_kernels()
self.jobs.dump_json()
self._run_on_neuron_cores()
self._compile_and_run_dynamic_load_balancing()
self.jobs.dump_json()

def _compile_all_kernels(self):
Expand Down Expand Up @@ -98,3 +96,60 @@ def _run_on_neuron_cores(self):
pbar.close()
for executor in executors:
executor.shutdown(wait=True)

def _compile_and_run_dynamic_load_balancing(self):
    """Compile kernels on CPU workers and run them on Neuron cores, overlapped.

    Compilation is fanned out to a CPU process pool in groups; as each group
    finishes compiling, its successfully compiled jobs are immediately
    submitted to a Neuron runner pool, so execution overlaps with the
    remaining compilation. Results are merged back into ``self.jobs`` as
    futures complete.
    """
    # --- Compile Setup --- #
    cpu_count = os.cpu_count() or 1
    num_jobs = len(self.jobs.jobs)
    # Leave one CPU for the parent process; never exceed the number of jobs.
    num_cpu_workers = min(max(cpu_count - 1, 1), num_jobs)
    job_id_groups = split_jobs_into_groups(job_ids=list(range(num_jobs)), num_groups=num_cpu_workers)
    compiler_executor = ProcessPoolExecutor(max_workers=num_cpu_workers)
    c_pbar = tqdm(total=num_jobs, desc=f"Compiling {num_jobs} kernels on {num_cpu_workers} CPUs", unit="kernels")

    # --- Neuron Execute Setup --- #
    # NOTE(review): 128 assumes that many Neuron cores are visible on this
    # instance — confirm against the target instance type.
    num_neuron_workers = 128
    counter = mp.Value("i", 0)
    lock = mp.Lock()
    runner_executor = ProcessPoolExecutor(
        max_workers=num_neuron_workers, initializer=set_neuron_core_dynamic, initargs=(counter, lock)
    )
    e_pbar = tqdm(
        total=num_jobs,
        desc=f"Processing/Running {num_jobs} kernels on {num_neuron_workers} Neuron cores",
        unit="kernels",
    )

    try:
        # --- Compile + Execute --- #
        pending_futures = set()
        compiled_futures = set()
        for rank_job_ids in job_id_groups:
            rank_jobs = self.jobs.subset(rank_job_ids)
            compiling_future = compiler_executor.submit(compile_jobs, rank_jobs)
            compiled_futures.add(compiling_future)
            pending_futures.add(compiling_future)

        # as_completed() snapshots its argument, so execution futures added
        # while draining one snapshot are picked up by the next pass of the
        # outer while loop.
        while pending_futures:
            for future in as_completed(pending_futures):
                pending_futures.remove(future)
                updated_jobs = future.result()
                is_compile_result = future in compiled_futures
                for job_index in updated_jobs.jobs:
                    updated_job = updated_jobs.jobs[job_index]
                    self.jobs.jobs[job_index] = updated_job
                    if is_compile_result:
                        # One kernel compiled. (Bug fix: this previously
                        # advanced by len(updated_jobs.jobs) per job,
                        # overcounting the compile bar quadratically.)
                        c_pbar.update(1)
                        if not updated_job.has_error:
                            kwargs = {"warmup": self.warmup, "iters": self.iters, "jobs": self.jobs.subset([job_index])}
                            executing_future = runner_executor.submit(run_on_neuron_core_dynamic, **kwargs)
                            pending_futures.add(executing_future)
                        else:
                            # Compilation failed: the job will never execute,
                            # so count it on the execution bar as well.
                            e_pbar.update(1)
                    else:
                        # Execution future completed for this job.
                        e_pbar.update(1)
    finally:
        # --- Cleanup --- #
        # Always close progress bars and shut pools down, even on error.
        c_pbar.close()
        e_pbar.close()
        compiler_executor.shutdown(wait=True)
        runner_executor.shutdown(wait=True)
18 changes: 18 additions & 0 deletions autotune/core/parallel.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import atexit
import os
import random
from multiprocessing import Lock, Value
from typing import List

from neuronpy.runtime.spike import SpikeExecutor

worker_spike_executor = None


def split_jobs_into_groups(job_ids: List[int], num_groups: int) -> List[List[int]]:
"""
Expand Down Expand Up @@ -37,3 +43,15 @@ def set_neuron_core(core_id: int):
"""
os.environ["NEURON_RT_VISIBLE_CORES"] = str(core_id)
os.environ["NEURON_LOGICAL_NC_CONFIG"] = "1"

def set_neuron_core_dynamic(core_counter, lock):
    """Process-pool initializer: claim the next free Neuron core and open a SpikeExecutor.

    Args:
        core_counter: shared ``multiprocessing.Value("i")`` holding the next
            unclaimed core id. (Previous annotation used the ``Value`` factory
            callable as if it were a type; removed as misleading.)
        lock: shared ``multiprocessing.Lock`` guarding the counter so each
            worker claims a unique core id.
    """
    with lock:
        # Claim a unique core id, then advance the counter for the next worker.
        set_neuron_core(core_counter.value)
        core_counter.value += 1
    # Each worker process keeps exactly one executor for its lifetime.
    global worker_spike_executor
    worker_spike_executor = SpikeExecutor(verbose=0)

def cleanup_spike():
    """atexit hook: close this worker's SpikeExecutor connection, if one exists.

    Guarded against ``None``: the hook is registered at import time, so it also
    runs in processes (e.g. the parent) that never called
    ``set_neuron_core_dynamic`` and therefore never created an executor —
    previously that raised ``AttributeError`` at interpreter exit.
    """
    if worker_spike_executor is not None:
        worker_spike_executor.sc.close()


atexit.register(cleanup_spike)
67 changes: 67 additions & 0 deletions autotune/core/run_nki.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from neuronpy.runtime.spike import SpikeExecutor

import autotune.core.parallel as parallel
from autotune.core.compile import create_spike_kernel, run_spike_kernel
from autotune.core.job import ProfileJobs
from autotune.core.metrics import extract_metrics
Expand Down Expand Up @@ -52,6 +53,50 @@ def run_neuron_benchmarks(jobs: ProfileJobs, warmup: int, iters: int) -> None:
error_msg = capture_error_message(e)
job.add_error(error_msg)

def run_neuron_benchmarks_dynamic(jobs: ProfileJobs, warmup: int, iters: int) -> None:
    """
    Run benchmarks for each job in-place using the per-worker SpikeExecutor.

    Uses the module-global ``parallel.worker_spike_executor`` created by the
    ``set_neuron_core_dynamic`` pool initializer, so this must be called from
    a worker process that was initialized that way.

    Args:
        jobs: ProfileJobs containing all jobs to run; each job is updated
            in-place with benchmark stats, profiling metrics, or an error
        warmup: Number of warmup iterations
        iters: Number of benchmark iterations
    """
    spike = parallel.worker_spike_executor
    for job_index in jobs.jobs:
        job = jobs.jobs[job_index]
        # Skip if job already failed (e.g. during compilation)
        if job.has_error:
            continue

        try:
            # Build an executable kernel from the compiled NEFF and inputs.
            spike_kernel = create_spike_kernel(job.neff, job.kernel, job.input_tensors, job.kernel_kwargs)
            # device_id=0: presumably NEURON_RT_VISIBLE_CORES remaps the
            # claimed core to logical id 0 inside this worker — confirm.
            stats = spike.benchmark(
                spike_kernel,
                *job.input_tensors,
                **job.kernel_kwargs,
                warmup_iterations=warmup,
                benchmark_iterations=iters,
                device_id=0,
            )
            # One profiled run to produce the NTFF trace and kernel outputs.
            ntff_file, kernel_outputs = run_spike_kernel(
                spike, spike_kernel, job.input_tensors, job.neff, job.kernel_kwargs
            )
            job.add_attributes(ntff=ntff_file, **stats)
            job.postprocessing(job.input_tensors, job.kernel_kwargs, kernel_outputs)
            job.add_attributes(postprocessing_result=True)
            metrics = extract_metrics(
                job.neff,
                ntff_file,
                latency=job.min_ms,
                matmul_mac_count=job.matmul_mac_count,
                target_instance_family=jobs.target_instance_family,
            )
            job.add_attributes(**metrics)

        except Exception as e:
            # Record the failure on the job instead of aborting the batch.
            error_msg = capture_error_message(e)
            job.add_error(error_msg)

def run_on_neuron_core(warmup: int, iters: int, jobs: ProfileJobs) -> ProfileJobs:
"""
Expand All @@ -74,3 +119,25 @@ def run_on_neuron_core(warmup: int, iters: int, jobs: ProfileJobs) -> ProfileJob
run_neuron_benchmarks(jobs, warmup, iters)

return jobs

def run_on_neuron_core_dynamic(warmup: int, iters: int, jobs: ProfileJobs) -> ProfileJobs:
    """
    Run pre-compiled kernels on this worker's Neuron core.

    Unlike the static variant, compilation has already happened on a CPU
    worker by the time this runs: this function only initializes input
    tensors and benchmarks the jobs with the worker's SpikeExecutor.
    (The previous docstring, copied from the static path, incorrectly
    claimed this function compiles kernels on CPU.)

    Args:
        warmup: Number of warmup iterations
        iters: Number of benchmark iterations
        jobs: ProfileJobs containing all jobs to run

    Returns:
        The same ProfileJobs, with each job updated in-place with results.
    """

    # Pre-initialize all input tensors once for all jobs with the same shapes
    jobs.initialize_input_tensors()

    # Run benchmarks on Neuron (requires SpikeExecutor)
    run_neuron_benchmarks_dynamic(jobs, warmup, iters)

    return jobs