Skip to content

Commit 9c25306

Browse files
authored
Change routing policy API to be async to support more policies (sgl-project#17048)
1 parent d2c8638 commit 9c25306

16 files changed

Lines changed: 350 additions & 217 deletions

sgl-model-gateway/benches/manual_policy_benchmark.rs

Lines changed: 57 additions & 37 deletions
Original file line number | Diff line number | Diff line change
@@ -1,10 +1,11 @@
1-
use std::{sync::Arc, thread};
1+
use std::sync::Arc;
22

33
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
44
use smg::{
55
core::{BasicWorkerBuilder, Worker, WorkerType},
66
policies::{LoadBalancingPolicy, ManualPolicy, SelectWorkerInfo},
77
};
8+
use tokio::runtime::Runtime;
89

910
// ============================================================================
1011
// Test Helpers
@@ -22,19 +23,24 @@ fn create_workers(count: usize) -> Vec<Arc<dyn Worker>> {
2223
.collect()
2324
}
2425

25-
fn select_with_key(policy: &ManualPolicy, workers: &[Arc<dyn Worker>], key: &str) -> Option<usize> {
26+
fn select_with_key(
27+
rt: &Runtime,
28+
policy: &ManualPolicy,
29+
workers: &[Arc<dyn Worker>],
30+
key: &str,
31+
) -> Option<usize> {
2632
let mut headers = http::HeaderMap::new();
2733
headers.insert("x-smg-routing-key", key.parse().unwrap());
2834
let info = SelectWorkerInfo {
2935
headers: Some(&headers),
3036
..Default::default()
3137
};
32-
policy.select_worker(workers, &info)
38+
rt.block_on(policy.select_worker(workers, &info))
3339
}
3440

35-
fn warmup_keys(policy: &ManualPolicy, workers: &[Arc<dyn Worker>], keys: &[String]) {
41+
fn warmup_keys(rt: &Runtime, policy: &ManualPolicy, workers: &[Arc<dyn Worker>], keys: &[String]) {
3642
for key in keys {
37-
select_with_key(policy, workers, key);
43+
select_with_key(rt, policy, workers, key);
3844
}
3945
}
4046

@@ -47,13 +53,14 @@ fn gen_keys(count: usize, prefix: &str) -> Vec<String> {
4753
// ============================================================================
4854

4955
fn bench_fast_path_hit(c: &mut Criterion) {
56+
let rt = Runtime::new().unwrap();
5057
let mut group = c.benchmark_group("manual_policy/fast_path");
5158

5259
for worker_count in [4, 16, 64, 256] {
5360
let policy = ManualPolicy::new();
5461
let workers = create_workers(worker_count);
5562
let keys = gen_keys(1000, "user-");
56-
warmup_keys(&policy, &workers, &keys);
63+
warmup_keys(&rt, &policy, &workers, &keys);
5764

5865
group.throughput(Throughput::Elements(1));
5966
group.bench_with_input(
@@ -62,7 +69,7 @@ fn bench_fast_path_hit(c: &mut Criterion) {
6269
|b, _| {
6370
let mut idx = 0;
6471
b.iter(|| {
65-
let result = select_with_key(&policy, &workers, &keys[idx % keys.len()]);
72+
let result = select_with_key(&rt, &policy, &workers, &keys[idx % keys.len()]);
6673
idx += 1;
6774
black_box(result)
6875
});
@@ -73,6 +80,7 @@ fn bench_fast_path_hit(c: &mut Criterion) {
7380
}
7481

7582
fn bench_slow_path_vacant(c: &mut Criterion) {
83+
let rt = Runtime::new().unwrap();
7684
let mut group = c.benchmark_group("manual_policy/slow_path_vacant");
7785

7886
for worker_count in [4, 16, 64, 256] {
@@ -87,7 +95,7 @@ fn bench_slow_path_vacant(c: &mut Criterion) {
8795
let mut idx = 0;
8896
b.iter(|| {
8997
let key = format!("new-user-{}", idx);
90-
let result = select_with_key(&policy, &workers, &key);
98+
let result = select_with_key(&rt, &policy, &workers, &key);
9199
idx += 1;
92100
black_box(result)
93101
});
@@ -98,6 +106,7 @@ fn bench_slow_path_vacant(c: &mut Criterion) {
98106
}
99107

100108
fn bench_no_routing_key(c: &mut Criterion) {
109+
let rt = Runtime::new().unwrap();
101110
let mut group = c.benchmark_group("manual_policy/no_routing_key");
102111

103112
for worker_count in [4, 16, 64, 256] {
@@ -110,14 +119,15 @@ fn bench_no_routing_key(c: &mut Criterion) {
110119
&worker_count,
111120
|b, _| {
112121
let info = SelectWorkerInfo::default();
113-
b.iter(|| black_box(policy.select_worker(&workers, &info)));
122+
b.iter(|| black_box(rt.block_on(policy.select_worker(&workers, &info))));
114123
},
115124
);
116125
}
117126
group.finish();
118127
}
119128

120129
fn bench_failover(c: &mut Criterion) {
130+
let rt = Runtime::new().unwrap();
121131
let mut group = c.benchmark_group("manual_policy/failover");
122132
group.sample_size(50);
123133

@@ -130,12 +140,12 @@ fn bench_failover(c: &mut Criterion) {
130140
|| {
131141
let policy = ManualPolicy::new();
132142
let workers = create_workers(count);
133-
let idx = select_with_key(&policy, &workers, "failover-test").unwrap();
143+
let idx = select_with_key(&rt, &policy, &workers, "failover-test").unwrap();
134144
workers[idx].set_healthy(false);
135145
(policy, workers)
136146
},
137147
|(policy, workers)| {
138-
black_box(select_with_key(&policy, &workers, "failover-test"))
148+
black_box(select_with_key(&rt, &policy, &workers, "failover-test"))
139149
},
140150
);
141151
},
@@ -145,6 +155,12 @@ fn bench_failover(c: &mut Criterion) {
145155
}
146156

147157
fn bench_concurrent(c: &mut Criterion) {
158+
let rt = Arc::new(
159+
tokio::runtime::Builder::new_multi_thread()
160+
.worker_threads(4)
161+
.build()
162+
.unwrap(),
163+
);
148164
let mut group = c.benchmark_group("manual_policy/concurrent");
149165
group.sample_size(50);
150166

@@ -157,32 +173,35 @@ fn bench_concurrent(c: &mut Criterion) {
157173
let policy = Arc::new(ManualPolicy::new());
158174
let workers: Arc<Vec<Arc<dyn Worker>>> = Arc::new(create_workers(16));
159175

160-
let handles: Vec<_> = (0..threads)
161-
.map(|t| {
162-
let policy = Arc::clone(&policy);
163-
let workers = Arc::clone(&workers);
164-
thread::spawn(move || {
165-
for i in 0..500 {
166-
let key = if i % 5 == 0 {
167-
format!("thread{}_user{}", t, i)
168-
} else {
169-
format!("shared_user{}", i % 50)
170-
};
171-
let mut headers = http::HeaderMap::new();
172-
headers.insert("x-smg-routing-key", key.parse().unwrap());
173-
let info = SelectWorkerInfo {
174-
headers: Some(&headers),
175-
..Default::default()
176-
};
177-
let _ = black_box(policy.select_worker(&workers, &info));
178-
}
176+
rt.block_on(async {
177+
let handles: Vec<_> = (0..threads)
178+
.map(|t| {
179+
let policy = Arc::clone(&policy);
180+
let workers = Arc::clone(&workers);
181+
tokio::spawn(async move {
182+
for i in 0..500 {
183+
let key = if i % 5 == 0 {
184+
format!("thread{}_user{}", t, i)
185+
} else {
186+
format!("shared_user{}", i % 50)
187+
};
188+
let mut headers = http::HeaderMap::new();
189+
headers.insert("x-smg-routing-key", key.parse().unwrap());
190+
let info = SelectWorkerInfo {
191+
headers: Some(&headers),
192+
..Default::default()
193+
};
194+
let _ =
195+
black_box(policy.select_worker(&workers, &info).await);
196+
}
197+
})
179198
})
180-
})
181-
.collect();
199+
.collect();
182200

183-
for h in handles {
184-
h.join().unwrap();
185-
}
201+
for h in handles {
202+
h.await.unwrap();
203+
}
204+
});
186205
});
187206
},
188207
);
@@ -191,19 +210,20 @@ fn bench_concurrent(c: &mut Criterion) {
191210
}
192211

193212
fn bench_cache_size_impact(c: &mut Criterion) {
213+
let rt = Runtime::new().unwrap();
194214
let mut group = c.benchmark_group("manual_policy/cache_size");
195215

196216
for cache_size in [100, 1000, 10000, 100000] {
197217
let policy = ManualPolicy::new();
198218
let workers = create_workers(16);
199219
let keys = gen_keys(cache_size, "user-");
200-
warmup_keys(&policy, &workers, &keys);
220+
warmup_keys(&rt, &policy, &workers, &keys);
201221

202222
group.throughput(Throughput::Elements(1));
203223
group.bench_with_input(BenchmarkId::new("keys", cache_size), &cache_size, |b, _| {
204224
let mut idx = 0;
205225
b.iter(|| {
206-
let result = select_with_key(&policy, &workers, &keys[idx % keys.len()]);
226+
let result = select_with_key(&rt, &policy, &workers, &keys[idx % keys.len()]);
207227
idx += 1;
208228
black_box(result)
209229
});

0 commit comments

Comments (0)