Skip to content

Commit 9ac0b07

Browse files
committed
Add metrics of yamux ping timeout and latency
1 parent 1ccdc37 commit 9ac0b07

File tree

2 files changed

+26
-8
lines changed

2 files changed

+26
-8
lines changed

yamux/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ tokio = { version = "1.0.0" }
1818
tokio-util = { version = "0.7.0", features = ["codec"] }
1919
log = "0.4"
2020
nohash-hasher = "0.2"
21+
metrics = { version = "0.24", optional = true }
2122

2223
futures-timer = { version = "3.0.2", optional = true }
2324

@@ -46,6 +47,7 @@ tokio-timer = ["tokio/time"]
4647
generic-timer = ["futures-timer"]
4748
# use futures-timer's wasm feature
4849
wasm = ["generic-timer", "futures-timer/wasm-bindgen"]
50+
metrics = ["dep:metrics"]
4951

5052

5153
[[bench]]

yamux/src/session.rs

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,16 @@ where
226226
self.remote_go_away && self.local_go_away || self.eof
227227
}
228228

229+
#[cfg(not(all(target_family = "wasm", not(target_os = "unknown"))))]
230+
fn now(&self) -> Instant {
231+
Instant::now()
232+
}
233+
234+
#[cfg(all(target_family = "wasm", not(target_os = "unknown")))]
235+
fn now(&self) -> Instant {
236+
Instant::from_u64(self.time_mock.load(std::sync::atomic::Ordering::Acquire) as u64)
237+
}
238+
229239
fn send_ping(&mut self, cx: &mut Context, ping_id: Option<u32>) -> Result<u32, io::Error> {
230240
let (flag, ping_id) = match ping_id {
231241
Some(ping_id) => (Flag::Ack, ping_id),
@@ -291,6 +301,8 @@ where
291301
.iter()
292302
.any(|(_id, time)| ping_at.saturating_duration_since(*time) > TIMEOUT)
293303
{
304+
#[cfg(feature = "metrics")]
305+
metrics::counter!("yamux.ping_timeout").increment(1);
294306
return Err(io::ErrorKind::TimedOut.into());
295307
}
296308

@@ -509,10 +521,19 @@ where
509521
// Send ping back
510522
self.send_ping(cx, Some(frame.length()))?;
511523
} else if flags.contains(Flag::Ack) {
512-
self.pings.remove(&frame.length());
524+
let ping_id = frame.length();
525+
let sent_ping_at = self.pings.remove(&ping_id);
526+
#[cfg(feature = "metrics")]
527+
if let Some(sent_at) = sent_ping_at {
528+
let now = self.now();
529+
let latency = now.saturating_duration_since(sent_at);
530+
metrics::histogram!("yamux.ping_latency").record(latency.as_millis() as f64);
531+
}
532+
#[cfg(not(feature = "metrics"))]
533+
let _ = sent_ping_at;
513534
// If the remote peer does not follow the protocol,
514535
// there may be a memory leak, so here need to discard all ping ids below the ack.
515-
self.pings = self.pings.split_off(&frame.length());
536+
self.pings = self.pings.split_off(&ping_id);
516537
} else {
517538
// TODO: unexpected case, send a GoAwayCode::ProtocolError ?
518539
}
@@ -652,12 +673,7 @@ where
652673
// Assume that remote peer has gone away and this session should be closed.
653674
self.remote_go_away = true;
654675
} else {
655-
#[cfg(not(all(target_family = "wasm", not(target_os = "unknown"))))]
656-
let now = Instant::now();
657-
#[cfg(all(target_family = "wasm", not(target_os = "unknown")))]
658-
let now = Instant::from_u64(
659-
self.time_mock.load(std::sync::atomic::Ordering::Acquire) as u64,
660-
);
676+
let now = self.now();
661677
self.keep_alive(cx, now)?;
662678
}
663679
}

0 commit comments

Comments
 (0)