Expert-wise scheduling and executing #80

Open

wants to merge 26 commits into main

26 commits
aaeae22
support split batch by expert id
shawlleyw Apr 17, 2025
f9e64b6
fix serial expert execution, enable expert wise execution
shawlleyw Apr 17, 2025
dfdf27b
benchmark grouped_gemm
shawlleyw Apr 17, 2025
06a429b
visualize expert-wise queue length
shawlleyw Apr 18, 2025
cb102f6
change attention scheduler
shawlleyw Apr 18, 2025
9317659
support increasing poisson & log peak throughput
shawlleyw Apr 18, 2025
4111df5
fuse cuda graph metadata copy
shawlleyw Apr 18, 2025
54c3498
1. fix incremental poisson
shawlleyw Apr 19, 2025
0da1c97
scheduler looks back
shawlleyw Apr 20, 2025
111ec71
fix execution bug and cuda graph copy
shawlleyw Apr 20, 2025
02dfca6
1. fix timing
shawlleyw Apr 21, 2025
e295e2a
fix lookback compilation error
shawlleyw Apr 21, 2025
1710109
fix looking back in scheduler
shawlleyw Apr 21, 2025
63c4264
fix look back mechanism
shawlleyw Apr 22, 2025
164f439
refactor sampler
shawlleyw Apr 22, 2025
a79fb07
fix topk, remove topk weights for attn
shawlleyw Apr 22, 2025
72ed16c
Merge pull request #81 from USC-NSL/sample_at_attn
shawlleyw Apr 22, 2025
dc42e63
execution batch size histogram
shawlleyw Apr 23, 2025
1434ae1
refine plotter
shawlleyw Apr 23, 2025
61c7e89
refine tracer, print memory logs
shawlleyw Apr 23, 2025
bbc14e5
prepare for benchmark
shawlleyw Apr 24, 2025
277c49c
dp scheduler support variable length
shawlleyw Apr 24, 2025
391bdbf
Merge variable_output_length
shawlleyw Apr 24, 2025
b4b4021
add benchmark suites
shawlleyw Apr 24, 2025
778822b
lambda benchmark
shawlleyw Apr 24, 2025
cc4763e
update script
shawlleyw Apr 24, 2025
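
The PR title and the early commits ("support split batch by expert id", "fix serial expert execution, enable expert wise execution") describe splitting an MoE execution batch by expert id so each expert's tokens can be queued and executed independently. The following is only a rough sketch of that idea, not code from this PR; `split_batch_by_expert` and the tensor names are hypothetical.

```python
import torch

def split_batch_by_expert(hidden_states: torch.Tensor,
                          expert_ids: torch.Tensor,
                          num_experts: int):
    """Group token rows by their assigned expert id.

    Returns one tensor per expert plus the permutation used, so per-expert
    outputs can later be scattered back into the original token order.
    """
    order = torch.argsort(expert_ids)                        # tokens grouped by expert
    counts = torch.bincount(expert_ids, minlength=num_experts)
    per_expert = torch.split(hidden_states[order], counts.tolist())
    return per_expert, order

# Example: 6 tokens with hidden size 4, routed across 3 experts.
states = torch.randn(6, 4)
experts = torch.tensor([2, 0, 1, 0, 2, 1])
chunks, order = split_batch_by_expert(states, experts, num_experts=3)
assert [c.shape[0] for c in chunks] == [2, 2, 2]
```

Each per-expert chunk can then be scheduled and run on its own, or the tokens can be kept together and handed to a grouped GEMM; that trade-off is consistent with the diff below disabling `enable_grouped_gemm` whenever `expert_wise_schedule` is on.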
125 changes: 83 additions & 42 deletions benchmark/benchmark_serving.py
@@ -97,14 +97,15 @@ def launch(args):
model_config.tp_enable_inter_group = False
model_config.enable_cuda_graph_attn = args.cuda_graph_attn
model_config.enable_cuda_graph_expert = False
model_config.enable_grouped_gemm = not args.serial_gemm
model_config.enable_grouped_gemm = not args.serial_gemm and not args.expert_wise_schedule
model_config.num_experts = args.num_experts
model_config.dp_size = args.dp_size
model_config.max_batch_size_attn = args.max_batch_size_attn
model_config.max_batch_size_expert = args.max_batch_size_expert
model_config.graph_stride = args.graph_stride
model_config.top_k = args.topk
model_config.enable_trace = args.trace
model_config.num_kv_heads = args.num_kv_heads

mp = get_model_placement(model_config, cluster_config, args.placement,
step_attn=args.step_attn, step_expert=args.step_expert,
@@ -113,7 +114,12 @@ def launch(args):

global master

master = init_controller(cluster_config.n_node, cluster_config.n_gpu, args.nsys)
master = init_controller(
cluster_config.n_node,
cluster_config.n_gpu,
expert_wise_schedule=args.expert_wise_schedule,
enable_nsys=args.nsys
)

cache_config = CacheConfig(args.block_size, args.gpu_usage, 2, "auto",
num_gpu_blocks=args.num_blocks + RESERVED_BLOCKS if args.num_blocks else None, # default should be None
@@ -123,9 +129,6 @@ def launch(args):

master.init_engine(mp, model_config, cache_config, sampling_config)

if args.profile_dir is not None:
master.start_profile(args.profile_dir)

master.start_engine()

return master
@@ -148,7 +151,7 @@ def analyze_results(results: List[SloStat], duration: float):
itls.extend(result.t_tokens)
req_latency.append(result.t_decode)

return BenchmarkMetrics(
metrics = BenchmarkMetrics(
e2e_duration=duration,
req_throughput=num_reqs / duration,
token_throughput=total_output_tokens / duration,
@@ -161,7 +164,8 @@ def analyze_results(results: List[SloStat], duration: float):
itl_latency_median_ms=np.median(itls) * 1000,
itl_latency_p99_ms=np.percentile(itls, 99) * 1000,
)


return metrics

def analyze_batch_sizes(all_batch_sizes: List[List[int]]):
try:
@@ -248,51 +252,75 @@ def ms_to_us(ms):
metrics[pid] = asdict(metric)

from benchmark.plotter.namer import get_trace_name, get_queue_length_name, get_trace_metrics_name

trace_dir = os.path.dirname(args.file)

with gzip.open(get_trace_name(args), "w") as f:
with gzip.open(get_trace_name(args, trace_dir), "w") as f:
f.write(json.dumps(events).encode("utf-8"))

with open(get_trace_metrics_name(args), "w") as f:
with open(get_trace_metrics_name(args, trace_dir), "w") as f:
json.dump(metrics, f)

with open(get_queue_length_name(args), "wb") as f:
with open(get_queue_length_name(args, trace_dir), "wb") as f:
pickle.dump(queue_length_per_step, f)

def analyze_throughput(args,
req_submit_timestamps: List[float],
req_finish_timestamps: List[float],
sampler_step_infos: List[SamplerStepInfo],
attn_queueing_delays: List[List[float]],
exp_queueing_delays: List[List[float]],
t_submitted: Dict[int, int],
slo_stats: List[SloStat]):
from benchmark.plotter.namer import get_sampler_step_name, get_worker_queueing_delay_name, get_ttft_name, get_req_finish_time_name
from benchmark.plotter.namer import get_sampler_step_name, get_worker_queueing_delay_name, \
get_ttft_name, get_req_finish_time_name, get_req_submit_time_name
trace_dir = os.path.dirname(args.file)
# request submit timestamp
req_submit_fn = get_req_submit_time_name(args, trace_dir)
req_submit_df = pd.DataFrame(req_submit_timestamps)
req_submit_df.to_csv(req_submit_fn, index=False)

# request finish timestamp
req_finish_fn = get_req_finish_time_name(args)
req_finish_fn = get_req_finish_time_name(args, trace_dir)
req_finish_df = pd.DataFrame(req_finish_timestamps)
req_finish_df.to_csv(req_finish_fn, index=False)

# sampler throughput
sampler_fn = get_sampler_step_name(args)
sampler_fn = get_sampler_step_name(args, trace_dir)
sampler_df = pd.DataFrame([asdict(info) for info in sampler_step_infos])
sampler_df.to_csv(sampler_fn, index=False)

def get_peak_throughput(time_bin=5):
sampler_df['time_stamp'] = (sampler_df['time_stamp'] - sampler_df['time_stamp'].iloc[0]) / (10 ** 3)
seg = int((sampler_df['time_stamp'].iloc[-1] - sampler_df['time_stamp'].iloc[0] + time_bin - 1) // time_bin)
time_bins = [sampler_df['time_stamp'].iloc[0] + i * time_bin for i in range(seg + 1)]
time_sums = sampler_df.groupby(pd.cut(sampler_df['time_stamp'], bins=time_bins))['num_tokens'].sum()
time_sums /= time_bin

num_bins = len(time_sums)
peak_throughput_time_range = 60 # seconds
step = peak_throughput_time_range // 2 // time_bin
peak_throughput_range = time_sums[num_bins // 2 - step : num_bins // 2 + step]
return sum(peak_throughput_range) / len(peak_throughput_range)

# queueing delay
attn_fn = get_worker_queueing_delay_name(args, "attn")
attn_fn = get_worker_queueing_delay_name(args, "attn", trace_dir)
attn_df = pd.DataFrame(attn_queueing_delays)
attn_df.to_csv(attn_fn, index=False)

exp_fn = get_worker_queueing_delay_name(args, "exp")
exp_fn = get_worker_queueing_delay_name(args, "exp", trace_dir)
exp_df = pd.DataFrame(exp_queueing_delays)
exp_df.to_csv(exp_fn, index=False)

# TTFT
ttft_fn = get_ttft_name(args)
ttft_fn = get_ttft_name(args, trace_dir)
ttft_df = pd.DataFrame([
stat.t_prefill_std - t_submitted[stat.req_id]
for stat in slo_stats
])
ttft_df.to_csv(ttft_fn, index=False)

return get_peak_throughput()


async def run_benchmark(master: Controller, args,
@@ -301,20 +329,21 @@ async def run_benchmark(master: Controller, args,
min_output_len, max_output_len,
rate, warmup=False):
GeneratorType = get_generator(generator_type)
generator = GeneratorType(rate, 1, min_input_len, max_input_len)
generator = GeneratorType(rate, 1, min_input_len, max_input_len, min_output_len, max_output_len)
workload = generator.generate_num(num_requests)
pbar = tqdm.tqdm(total=num_requests)
t_start = time.perf_counter()
tasks = []
req_submit_timestamps = []
req_finish_timestamps = []
logger.info(f"generating requests at rate {args.rate} s/req, in total {args.num_requests} requests")
for i in range(num_requests):
t_elapsed = time.perf_counter() - t_start
arrival, input_len = workload[i]
# logger.debug(f"Request {i} arrives at {arrival}s, input_len={input_len}")
arrival, input_len, output_len = workload[i]
if t_elapsed < arrival:
await asyncio.sleep(arrival - t_elapsed)
resp = master.put_single_request(input_len)
req_submit_timestamps.append(time.perf_counter() - t_start)
resp = master.put_single_request(input_len, output_len)
tasks.append(asyncio.create_task(process_response(resp, req_finish_timestamps, pbar)))

results: List[SloStat] = await asyncio.gather(*tasks)
@@ -325,32 +354,30 @@ async def run_benchmark(master: Controller, args,
return None

logger.info("Benchmark finished, now analyznig results ...")
metrics = analyze_results(results, t_duration)
logger.debug(f"{metrics}")


req_finish_timestamps.sort()
for i in range(num_requests):
req_finish_timestamps[i] -= t_start

metrics.write_to_file(args)
logger.info("Results written to file.")

return results, req_finish_timestamps

return results, req_submit_timestamps, req_finish_timestamps, t_duration

async def benchmark_warmup(master: Controller, args):
logger.info("Now running warmup ...")
_num_warmup_requests = 10
_num_warmup_rate = 5
warmup_input = 100
warmup_output = 50
await run_benchmark(
master, args, args.generator_type, _num_warmup_requests,
args.min_input_len, args.max_input_len,
args.min_output_len, args.max_output_len,
warmup_input, warmup_input + 1, warmup_output, warmup_output + 1,
_num_warmup_rate, warmup=True
)
master.reset()
logger.info("Warmup done.")

def post_benchmark(master, args, results, req_finish_timestamps):
def post_benchmark(master, args, results, req_submit_timestamps, req_finish_timestamps, duration):
metrics = analyze_results(results, duration)

if args.trace:
step_stats = master.fetch_step_stats()
generate_step_trace(args, step_stats)
@@ -359,11 +386,17 @@ def post_benchmark(master, args, results, req_finish_timestamps):
sampler_step_infos = master.fetch_sampler_step_infos()
attn_delays, exp_delays = master.fetch_queueing_delays()
t_submitted = master.fetch_submitted_time()
analyze_throughput(args,
req_finish_timestamps,
sampler_step_infos,
attn_delays, exp_delays,
t_submitted, results)
throughput = analyze_throughput(args,
req_submit_timestamps,
req_finish_timestamps,
sampler_step_infos,
attn_delays, exp_delays,
t_submitted, results)
metrics.token_throughput = throughput

metrics.write_to_file(args)
logger.info("Results written to file.")
return metrics

async def benchmark_serving(
master: Controller,
@@ -376,10 +409,13 @@ async def benchmark_serving(
master.start_polling_results()
await master.start_scheduler()
await benchmark_warmup(master, args)


if args.profile_dir is not None:
master.start_profile(args.profile_dir)

# run benchmark
logger.info("Now running benchmark.")
results, req_finish_timestamps = await run_benchmark(
results, req_submit_timestamps, req_finish_timestamps, duration = await run_benchmark(
master, args, args.generator_type, args.num_requests,
args.min_input_len, args.max_input_len,
args.min_output_len, args.max_output_len, args.rate
@@ -388,11 +424,15 @@ async def benchmark_serving(
if not is_api_server:
await master.stop_scheduler()
master.stop_workers()

master.stop_profile()

post_benchmark(master, args, results, req_finish_timestamps)
metrics = post_benchmark(master, args, results, req_submit_timestamps, req_finish_timestamps, duration)

master.reset()


return metrics

def get_args():
args = get_parser_base().parse_args()

@@ -414,9 +454,10 @@ def main():

try:
launch(args)
asyncio.run(benchmark_serving(master, args))
metrics = asyncio.run(benchmark_serving(master, args))
print(metrics) # Or do something else with the metrics
except Exception as e:
print("Error: failed to run benchmark, with exception:", e.with_traceback())
print("Error: failed to run benchmark, with exception:", e.with_traceback(None))
BenchmarkMetrics().write_to_file(args)

if __name__ == "__main__":
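
For reference on the throughput logging added above: `get_peak_throughput` rebases the sampler timestamps to seconds, sums `num_tokens` over fixed `time_bin`-second bins, and averages the bins inside a 60-second window centered on the middle of the run. With `time_bin=5` that is `step = 60 // 2 // 5 = 6` bins on each side, i.e. twelve 5-second bins. Below is a minimal standalone sketch of the same binning, written for this note with synthetic inputs; `mid_run_throughput` is a hypothetical name and not part of the PR.

```python
import numpy as np
import pandas as pd

def mid_run_throughput(timestamps_ms, num_tokens, time_bin=5, window_s=60):
    """Average tokens/s over a window_s-second window centered in the run."""
    ts = (np.asarray(timestamps_ms, dtype=float) - timestamps_ms[0]) / 1e3   # ms -> s, rebased to 0
    df = pd.DataFrame({"time_stamp": ts, "num_tokens": num_tokens})
    n_bins = int((ts[-1] - ts[0] + time_bin - 1) // time_bin)
    edges = [ts[0] + i * time_bin for i in range(n_bins + 1)]
    per_bin = (df.groupby(pd.cut(df["time_stamp"], bins=edges), observed=False)["num_tokens"]
                 .sum() / time_bin)                      # tokens/s in each bin
    step = window_s // 2 // time_bin                     # bins on each side of the midpoint
    mid = len(per_bin) // 2
    return per_bin.iloc[mid - step : mid + step].mean()  # mean over the centered window

# Synthetic example: 200 sampler steps, 0.5 s apart, 256 tokens each.
stamps = [i * 500.0 for i in range(200)]                 # milliseconds
print(mid_run_throughput(stamps, [256] * 200))           # ~512 tokens/s
```

Averaging a mid-run window rather than the whole trace leaves out the ramp-up and drain phases, which matters once the load generator increases the request rate over time, as the "support increasing poisson & log peak throughput" commit suggests it does.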