Skip to content

Commit a66b5e6

Browse files
author
Michael Ingley
committed
Optimize TSO waker scheduling and add Criterion benchmark
Signed-off-by: Michael Ingley <mingley@linkedin.com>
1 parent 9d9b680 commit a66b5e6

File tree

4 files changed

+194
-8
lines changed

4 files changed

+194
-8
lines changed

Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ tonic = { version = "0.10", features = ["tls", "gzip"] }
4949

5050
[dev-dependencies]
5151
clap = "2"
52+
criterion = "0.5"
5253
env_logger = "0.10"
5354
fail = { version = "0.4", features = ["failpoints"] }
5455
proptest = "1"
@@ -64,3 +65,7 @@ tokio = { version = "1", features = ["sync", "rt-multi-thread", "macros"] }
6465
name = "failpoint_tests"
6566
path = "tests/failpoint_tests.rs"
6667
required-features = ["fail/failpoints"]
68+
69+
[[bench]]
70+
name = "tso_waker_policy"
71+
harness = false

benches/tso_waker_policy.rs

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
use std::hint::black_box;
2+
use std::sync::Arc;
3+
use std::task::{Wake, Waker};
4+
use std::time::{Duration, Instant};
5+
6+
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
7+
use futures::task::AtomicWaker;
8+
9+
const MAX_PENDING_COUNT: usize = 1 << 16;
10+
const FULL_EVERY: u64 = 1024;
11+
const FULL_WINDOW: u64 = 16;
12+
13+
struct NoopWake;
14+
15+
impl Wake for NoopWake {
16+
fn wake(self: Arc<Self>) {}
17+
fn wake_by_ref(self: &Arc<Self>) {}
18+
}
19+
20+
fn response_policy_old(iterations: u64) -> Duration {
21+
let atomic_waker = AtomicWaker::new();
22+
let waker = Waker::from(Arc::new(NoopWake));
23+
atomic_waker.register(&waker);
24+
25+
let mut pending_len = 0usize;
26+
let start = Instant::now();
27+
for i in 0..iterations {
28+
if i % FULL_EVERY == 0 {
29+
pending_len = MAX_PENDING_COUNT;
30+
}
31+
black_box(pending_len >= MAX_PENDING_COUNT);
32+
pending_len = pending_len.saturating_sub(1);
33+
atomic_waker.wake();
34+
}
35+
start.elapsed()
36+
}
37+
38+
fn response_policy_new(iterations: u64) -> Duration {
39+
let atomic_waker = AtomicWaker::new();
40+
let waker = Waker::from(Arc::new(NoopWake));
41+
atomic_waker.register(&waker);
42+
43+
let mut pending_len = 0usize;
44+
let start = Instant::now();
45+
for i in 0..iterations {
46+
if i % FULL_EVERY == 0 {
47+
pending_len = MAX_PENDING_COUNT;
48+
}
49+
let was_full = pending_len >= MAX_PENDING_COUNT;
50+
pending_len = pending_len.saturating_sub(1);
51+
let should_wake = was_full && pending_len < MAX_PENDING_COUNT;
52+
if black_box(should_wake) {
53+
atomic_waker.wake();
54+
}
55+
}
56+
start.elapsed()
57+
}
58+
59+
fn register_policy_old(iterations: u64) -> Duration {
60+
let atomic_waker = AtomicWaker::new();
61+
let waker = Waker::from(Arc::new(NoopWake));
62+
63+
let start = Instant::now();
64+
for i in 0..iterations {
65+
let pending_len = if i % FULL_EVERY < FULL_WINDOW {
66+
MAX_PENDING_COUNT
67+
} else {
68+
MAX_PENDING_COUNT - 1
69+
};
70+
black_box(pending_len);
71+
atomic_waker.register(&waker);
72+
}
73+
start.elapsed()
74+
}
75+
76+
fn register_policy_new(iterations: u64) -> Duration {
77+
let atomic_waker = AtomicWaker::new();
78+
let waker = Waker::from(Arc::new(NoopWake));
79+
80+
let start = Instant::now();
81+
for i in 0..iterations {
82+
let pending_len = if i % FULL_EVERY < FULL_WINDOW {
83+
MAX_PENDING_COUNT
84+
} else {
85+
MAX_PENDING_COUNT - 1
86+
};
87+
if black_box(pending_len >= MAX_PENDING_COUNT) {
88+
atomic_waker.register(&waker);
89+
}
90+
}
91+
start.elapsed()
92+
}
93+
94+
fn bench_tso_waker_policy(c: &mut Criterion) {
95+
let mut group = c.benchmark_group("tso_waker_policy");
96+
group.warm_up_time(Duration::from_secs(2));
97+
group.measurement_time(Duration::from_secs(6));
98+
99+
group.bench_function(BenchmarkId::new("response", "old"), |b| {
100+
b.iter_custom(response_policy_old);
101+
});
102+
group.bench_function(BenchmarkId::new("response", "new"), |b| {
103+
b.iter_custom(response_policy_new);
104+
});
105+
group.bench_function(BenchmarkId::new("register", "old"), |b| {
106+
b.iter_custom(register_policy_old);
107+
});
108+
group.bench_function(BenchmarkId::new("register", "new"), |b| {
109+
b.iter_custom(register_policy_new);
110+
});
111+
112+
group.finish();
113+
}
114+
115+
criterion_group!(benches, bench_tso_waker_policy);
116+
criterion_main!(benches);

doc/tso_waker_criterion.md

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
# TSO Waker Criterion Benchmark
2+
3+
Date: 2026-02-06
4+
Repo: `tikv/client-rust`
5+
Branch: `mingley/tso-waker-criterion`
6+
Host: macOS 26.2 (Darwin 25.2.0), Apple M4 Pro, arm64
7+
Rust toolchain: 1.84.1
8+
9+
## Goal
10+
11+
Quantify the latency impact of reducing TSO stream wake/registration churn in
12+
`src/pd/timestamp.rs`.
13+
14+
## Method
15+
16+
Benchmark framework:
17+
- Criterion (`cargo bench`)
18+
19+
Bench target:
20+
- `benches/tso_waker_policy.rs`
21+
22+
Command used:
23+
24+
```bash
25+
cargo bench --bench tso_waker_policy -- --noplot
26+
```
27+
28+
Criterion configuration in benchmark:
29+
- warmup: 2 seconds
30+
- measurement: 6 seconds
31+
- samples: 100 (Criterion default; not set explicitly in the benchmark code)
32+
33+
The benchmark compares old vs new policies in two isolated hot paths:
34+
- `response/*`: wake policy when processing responses
35+
- `register/*`: self-waker registration policy in no-request branch
36+
37+
## Results (Absolute Latency)
38+
39+
From Criterion output (`time` line):
40+
41+
- `tso_waker_policy/response/old`: `[3.2519 ns 3.2712 ns 3.2926 ns]`
42+
- `tso_waker_policy/response/new`: `[763.41 ps 766.39 ps 769.43 ps]`
43+
44+
- `tso_waker_policy/register/old`: `[2.3768 ns 2.3819 ns 2.3874 ns]`
45+
- `tso_waker_policy/register/new`: `[286.76 ps 287.51 ps 288.27 ps]`
46+
47+
Speedups based on Criterion's point estimate (the middle value of the reported `[low estimate high]` interval):
48+
- response path: `3.2712 ns / 0.76639 ns = 4.27x`
49+
- registration path: `2.3819 ns / 0.28751 ns = 8.28x`
50+
51+
## Interpretation
52+
53+
The new policy materially reduces per-operation latency in both isolated paths,
54+
with sub-nanosecond median latency for the optimized variants in this synthetic
55+
microbenchmark.
56+
57+
This benchmark is intentionally focused on internal policy overhead. It does not
58+
by itself measure end-to-end PD/TSO RPC latency in a real TiKV deployment.

src/pd/timestamp.rs

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use tonic::transport::Channel;
3131
use crate::internal_err;
3232
use crate::proto::pdpb::pd_client::PdClient;
3333
use crate::proto::pdpb::*;
34+
use crate::stats::observe_tso_batch;
3435
use crate::Result;
3536

3637
/// It is an empirical value.
@@ -98,13 +99,17 @@ async fn run_tso(
9899
let mut responses = pd_client.tso(request_stream).await?.into_inner();
99100

100101
while let Some(Ok(resp)) = responses.next().await {
101-
{
102+
let should_wake_sender = {
102103
let mut pending_requests = pending_requests.lock().await;
104+
let was_full = pending_requests.len() >= MAX_PENDING_COUNT;
103105
allocate_timestamps(&resp, &mut pending_requests)?;
104-
}
106+
was_full && pending_requests.len() < MAX_PENDING_COUNT
107+
};
105108

106-
// Wake up the sending future blocked by too many pending requests or locked.
107-
sending_future_waker.wake();
109+
// Only wake sender when a previously full queue gains capacity.
110+
if should_wake_sender {
111+
sending_future_waker.wake();
112+
}
108113
}
109114
// TODO: distinguish between unexpected stream termination and expected end of test
110115
info!("TSO stream terminated");
@@ -137,7 +142,6 @@ impl Stream for TsoRequestStream {
137142
{
138143
pending_requests
139144
} else {
140-
this.self_waker.register(cx.waker());
141145
return Poll::Pending;
142146
};
143147
let mut requests = Vec::new();
@@ -153,6 +157,7 @@ impl Stream for TsoRequestStream {
153157
}
154158

155159
if !requests.is_empty() {
160+
observe_tso_batch(requests.len());
156161
let req = TsoRequest {
157162
header: Some(RequestHeader {
158163
cluster_id: *this.cluster_id,
@@ -170,9 +175,11 @@ impl Stream for TsoRequestStream {
170175

171176
Poll::Ready(Some(req))
172177
} else {
173-
// Set the waker to the context, then the stream can be waked up after the pending queue
174-
// is no longer full.
175-
this.self_waker.register(cx.waker());
178+
// Register self waker only when blocked by a full pending queue.
179+
// When queue is not full, poll_recv above has already registered the receiver waker.
180+
if pending_requests.len() >= MAX_PENDING_COUNT {
181+
this.self_waker.register(cx.waker());
182+
}
176183
Poll::Pending
177184
}
178185
}

0 commit comments

Comments
 (0)