Skip to content

Commit 7d849a9

Browse files
authored
Merge pull request #412 from nervosnetwork/fix-keep-alive-on-yamux
fix: fix keep alive should work on no communication scenario
2 parents 1d26505 + 3c42cc9 commit 7d849a9

File tree

3 files changed

+56
-2
lines changed

3 files changed

+56
-2
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
## yamux 0.3.15
2+
3+
### Bug Fix
4+
- fix keep alive should work on no communication scenario(#412)
5+
16
## tentacle 0.7.0 yamux 0.3.14 secio 0.6.6
27

38
### Features

yamux/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "tokio-yamux"
3-
version = "0.3.14"
3+
version = "0.3.15"
44
license = "MIT"
55
repository = "https://github.com/nervosnetwork/tentacle"
66
description = "Rust implementation of Yamux"

yamux/src/session.rs

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -642,9 +642,11 @@ where
642642
);
643643
}
644644

645+
let mut keep_alive_wake = false;
645646
if let Some(ref mut interval) = self.keepalive {
646647
match Pin::new(interval).as_mut().poll_next(cx) {
647648
Poll::Ready(Some(_)) => {
649+
keep_alive_wake = true;
648650
if self.local_go_away {
649651
// The remote peer has not responded to our sent go away code.
650652
// Assume that remote peer has gone away and this session should be closed.
@@ -697,7 +699,7 @@ where
697699
}
698700
}
699701

700-
if need_wake {
702+
if need_wake || keep_alive_wake {
701703
// To ensure we do not starve other tasks waiting on the executor,
702704
// we yield here, but immediately wake ourselves up to continue.
703705
cx.waker().wake_by_ref()
@@ -1021,6 +1023,53 @@ mod test {
10211023
}
10221024
}
10231025

1026+
// after TIMEOUT time, will finished
1027+
#[test]
1028+
fn test_keepalive_should_work_on_no_communication_scenario() {
1029+
let rt = rt();
1030+
1031+
rt.block_on(async {
1032+
let (remote, local) = MockSocket::new();
1033+
let config = Config {
1034+
enable_keepalive: true,
1035+
keepalive_interval: Duration::from_millis(100),
1036+
connection_write_timeout: Duration::from_secs(1),
1037+
..Default::default()
1038+
};
1039+
1040+
let mut session = Session::new_server(local, config);
1041+
tokio::spawn(async move {
1042+
let mut client = Framed::new(
1043+
remote,
1044+
FrameCodec::default().max_frame_size(config.max_stream_window_size),
1045+
);
1046+
loop {
1047+
client.next().await;
1048+
}
1049+
});
1050+
loop {
1051+
match session.next().await {
1052+
Some(Ok(mut stream)) => {
1053+
tokio::spawn(async move {
1054+
let mut buf = [0; 100];
1055+
let _ignore = stream.read(&mut buf).await;
1056+
});
1057+
}
1058+
Some(Err(err)) => {
1059+
if err.kind() == io::ErrorKind::TimedOut {
1060+
// This is expected, since we are not sending any data
1061+
break;
1062+
}
1063+
}
1064+
None => {
1065+
// Session closed
1066+
unreachable!();
1067+
}
1068+
}
1069+
}
1070+
})
1071+
}
1072+
10241073
#[test]
10251074
fn test_open_exist_stream() {
10261075
let rt = rt();

0 commit comments

Comments
 (0)