Skip to content

Commit cc2e5ca

Browse files
committed
fix: fix stream only write will stuck forever
1 parent 9cec5ed commit cc2e5ca

File tree

2 files changed

+107
-20
lines changed

2 files changed

+107
-20
lines changed

yamux/src/session.rs

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1268,4 +1268,67 @@ mod test {
12681268
assert_eq!(vec![1; 1024 * 1024], buf)
12691269
})
12701270
}
1271+
1272+
#[test]
1273+
fn test_only_write_on_stream() {
1274+
let rt = rt();
1275+
rt.block_on(async {
1276+
let (remote, local) = MockSocket::new();
1277+
1278+
let config = Config::default();
1279+
1280+
let mut session = Session::new_server(local, config);
1281+
1282+
tokio::spawn(async move {
1283+
while let Some(Ok(mut stream)) = session.next().await {
1284+
tokio::spawn(async move {
1285+
let _ignore = stream.read_exact(&mut [0]).await;
1286+
assert!(stream.send_window() == 1024 * 1024);
1287+
assert!(stream.recv_window() == 256 * 1024 - 1);
1288+
let buf = vec![1; 2 * 1024 * 1024];
1289+
// https://github.com/driftluo/tentacle/issues/33
1290+
// it will stuck here forever, because the stream is only for write and can't read the window update frame
1291+
let _ignore = stream.write_all(&buf).await;
1292+
});
1293+
}
1294+
});
1295+
1296+
let config = Config {
1297+
max_stream_window_size: 1024 * 1024,
1298+
..Default::default()
1299+
};
1300+
1301+
let mut client = Session::new_client(remote, config);
1302+
1303+
let mut control = client.control();
1304+
1305+
let mut stream = client.open_stream().unwrap();
1306+
1307+
tokio::spawn(async move {
1308+
loop {
1309+
match client.next().await {
1310+
Some(Ok(_)) => (),
1311+
Some(Err(_)) => {
1312+
break;
1313+
}
1314+
None => {
1315+
break;
1316+
}
1317+
}
1318+
}
1319+
});
1320+
1321+
let _ignore = stream.write_all(&[1]).await;
1322+
assert!(stream.send_window() == 256 * 1024 - 1);
1323+
assert!(stream.recv_window() == 1024 * 1024);
1324+
let mut buf = vec![0; 2 * 1024 * 1024];
1325+
let _ignore = stream.read_exact(&mut buf).await;
1326+
1327+
tokio::spawn(async move {
1328+
control.close().await;
1329+
});
1330+
1331+
assert_eq!(vec![1; 2 * 1024 * 1024], buf)
1332+
})
1333+
}
12711334
}

yamux/src/stream.rs

Lines changed: 44 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ pub struct StreamHandle {
4545

4646
// when the cache is sent, a writable notification is issued
4747
writeable_wake: Option<Waker>,
48+
49+
// when the cache is received by write, a readable notification is issued
50+
readable_wake: Option<Waker>,
4851
}
4952

5053
impl StreamHandle {
@@ -67,6 +70,7 @@ impl StreamHandle {
6770
unbound_event_sender,
6871
frame_receiver,
6972
writeable_wake: None,
73+
readable_wake: None,
7074
}
7175
}
7276

@@ -253,15 +257,15 @@ impl StreamHandle {
253257

254258
let (_, body) = frame.into_parts();
255259
if let Some(data) = body {
256-
// only when buf is empty, poll read can read from remote
257-
self.read_buf = data;
260+
self.read_buf.extend_from_slice(&data);
258261
}
259262
self.recv_window -= length;
260263
Ok(())
261264
}
262265

263-
fn recv_frames(&mut self, cx: &mut Context) -> Result<(), Error> {
266+
fn recv_frames(&mut self, cx: &mut Context, mut should_wake_reader: bool) -> Result<(), Error> {
264267
trace!("stream-handle({}) recv_frames", self.id);
268+
let buf_len = self.read_buf.len();
265269
loop {
266270
match self.state {
267271
StreamState::RemoteClosing => {
@@ -273,32 +277,29 @@ impl StreamHandle {
273277
_ => {}
274278
}
275279

276-
// After get data, break here
277-
// if not, it will never wake upstream here have some cache buffer
278-
// buffer will left here, waiting for the next session wake
279-
// this will cause the message to be delayed, unable to read, etc.
280-
if !self.read_buf.is_empty() {
281-
trace!(
282-
"stream-handle({}) recv_frames break since buf is not empty",
283-
self.id
284-
);
285-
break;
286-
}
287-
288280
if self.frame_receiver.is_terminated() {
289281
self.state = StreamState::RemoteClosing;
290-
return Err(Error::SessionShutdown);
282+
return Err(Error::SubStreamRemoteClosing);
291283
}
292284

293285
match Pin::new(&mut self.frame_receiver).as_mut().poll_next(cx) {
294-
Poll::Ready(Some(frame)) => self.handle_frame(frame)?,
286+
Poll::Ready(Some(frame)) => {
287+
self.handle_frame(frame)?;
288+
should_wake_reader &= true;
289+
}
295290
Poll::Ready(None) => {
296291
self.state = StreamState::RemoteClosing;
297-
return Err(Error::SessionShutdown);
292+
return Err(Error::SubStreamRemoteClosing);
298293
}
299294
Poll::Pending => break,
300295
}
301296
}
297+
// poll by write and read something, then wake read
298+
if should_wake_reader && self.read_buf.len() != buf_len {
299+
if let Some(waker) = self.readable_wake.take() {
300+
waker.wake();
301+
}
302+
}
302303
Ok(())
303304
}
304305

@@ -335,7 +336,7 @@ impl StreamHandle {
335336
}
336337

337338
if let Err(Error::UnexpectedFlag | Error::RecvWindowExceeded | Error::InvalidMsgType) =
338-
self.recv_frames(cx)
339+
self.recv_frames(cx, false)
339340
{
340341
// read flag error or read data error
341342
self.send_go_away();
@@ -355,6 +356,7 @@ impl StreamHandle {
355356
n,
356357
);
357358
if n == 0 {
359+
self.readable_wake = Some(cx.waker().clone());
358360
return Poll::Pending;
359361
}
360362

@@ -383,7 +385,7 @@ impl AsyncRead for StreamHandle {
383385
}
384386

385387
if let Err(Error::UnexpectedFlag | Error::RecvWindowExceeded | Error::InvalidMsgType) =
386-
self.recv_frames(cx)
388+
self.recv_frames(cx, false)
387389
{
388390
// read flag error or read data error
389391
self.send_go_away();
@@ -403,6 +405,7 @@ impl AsyncRead for StreamHandle {
403405
n,
404406
);
405407
if n == 0 {
408+
self.readable_wake = Some(cx.waker().clone());
406409
return Poll::Pending;
407410
}
408411
let b = self.read_buf.split_to(n);
@@ -429,6 +432,27 @@ impl AsyncWrite for StreamHandle {
429432
cx: &mut Context,
430433
buf: &[u8],
431434
) -> Poll<io::Result<usize>> {
435+
// https://github.com/driftluo/tentacle/issues/33
436+
// read frame from session is necessary.
437+
// The window update message of Yamux must be updated normally.
438+
// If the user only writes but does not read, the entire stream will be stuck.
439+
// To avoid this, read operations are required when there is a frame in the session.
440+
//
441+
// Another solution to avoid this problem is to let the session and stream share the state.
442+
// In the rust implementation, at least the following three states are required:
443+
// 1. writeable_wake
444+
// 2. send_window
445+
// 3. state
446+
//
447+
// When the session receives a window update frame, it can update the state of the stream.
448+
// In the implementation here, we try not to share state between the session and the stream.
449+
if let Err(Error::UnexpectedFlag | Error::RecvWindowExceeded | Error::InvalidMsgType) =
450+
self.recv_frames(cx, true)
451+
{
452+
// read flag error or read data error
453+
self.send_go_away();
454+
}
455+
432456
match self.state {
433457
StreamState::Reset => return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into())),
434458
StreamState::LocalClosing | StreamState::Closed => {

0 commit comments

Comments
 (0)