Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## yamux 0.3.15

### Bug Fix
- fix keep alive should work on no communication scenario(#412)

## tentacle 0.7.0 yamux 0.3.14 secio 0.6.6

### Features
Expand Down
2 changes: 1 addition & 1 deletion yamux/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "tokio-yamux"
version = "0.3.14"
version = "0.3.15"
license = "MIT"
repository = "https://github.com/nervosnetwork/tentacle"
description = "Rust implementation of Yamux"
Expand Down
51 changes: 50 additions & 1 deletion yamux/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -642,9 +642,11 @@ where
);
}

let mut keep_alive_wake = false;
if let Some(ref mut interval) = self.keepalive {
match Pin::new(interval).as_mut().poll_next(cx) {
Poll::Ready(Some(_)) => {
keep_alive_wake = true;
if self.local_go_away {
// The remote peer has not responded to our sent go away code.
// Assume that remote peer has gone away and this session should be closed.
Expand Down Expand Up @@ -697,7 +699,7 @@ where
}
}

if need_wake {
if need_wake || keep_alive_wake {
// To ensure we do not starve other tasks waiting on the executor,
// we yield here, but immediately wake ourselves up to continue.
cx.waker().wake_by_ref()
Expand Down Expand Up @@ -1021,6 +1023,53 @@ mod test {
}
}

// after TIMEOUT time, will finished
#[test]
fn test_keepalive_should_work_on_no_communication_scenario() {
let rt = rt();

rt.block_on(async {
let (remote, local) = MockSocket::new();
let config = Config {
enable_keepalive: true,
keepalive_interval: Duration::from_millis(100),
connection_write_timeout: Duration::from_secs(1),
..Default::default()
};

let mut session = Session::new_server(local, config);
tokio::spawn(async move {
let mut client = Framed::new(
remote,
FrameCodec::default().max_frame_size(config.max_stream_window_size),
);
loop {
client.next().await;
}
});
loop {
match session.next().await {
Some(Ok(mut stream)) => {
tokio::spawn(async move {
let mut buf = [0; 100];
let _ignore = stream.read(&mut buf).await;
});
}
Some(Err(err)) => {
if err.kind() == io::ErrorKind::TimedOut {
// This is expected, since we are not sending any data
break;
}
}
None => {
// Session closed
unreachable!();
}
}
}
})
}

#[test]
fn test_open_exist_stream() {
let rt = rt();
Expand Down