@@ -45,6 +45,9 @@ pub struct StreamHandle {
4545
4646 // when the cache is sent, a writable notification is issued
4747 writeable_wake : Option < Waker > ,
48+
49+ // when the cache is received by write, a readable notification is issued
50+ readable_wake : Option < Waker > ,
4851}
4952
5053impl StreamHandle {
@@ -67,6 +70,7 @@ impl StreamHandle {
6770 unbound_event_sender,
6871 frame_receiver,
6972 writeable_wake : None ,
73+ readable_wake : None ,
7074 }
7175 }
7276
@@ -253,15 +257,15 @@ impl StreamHandle {
253257
254258 let ( _, body) = frame. into_parts ( ) ;
255259 if let Some ( data) = body {
256- // only when buf is empty, poll read can read from remote
257- self . read_buf = data;
260+ self . read_buf . extend_from_slice ( & data) ;
258261 }
259262 self . recv_window -= length;
260263 Ok ( ( ) )
261264 }
262265
263- fn recv_frames ( & mut self , cx : & mut Context ) -> Result < ( ) , Error > {
266+ fn recv_frames ( & mut self , cx : & mut Context , mut should_wake_reader : bool ) -> Result < ( ) , Error > {
264267 trace ! ( "stream-handle({}) recv_frames" , self . id) ;
268+ let buf_len = self . read_buf . len ( ) ;
265269 loop {
266270 match self . state {
267271 StreamState :: RemoteClosing => {
@@ -273,32 +277,29 @@ impl StreamHandle {
273277 _ => { }
274278 }
275279
276- // After get data, break here
277- // if not, it will never wake upstream here have some cache buffer
278- // buffer will left here, waiting for the next session wake
279- // this will cause the message to be delayed, unable to read, etc.
280- if !self . read_buf . is_empty ( ) {
281- trace ! (
282- "stream-handle({}) recv_frames break since buf is not empty" ,
283- self . id
284- ) ;
285- break ;
286- }
287-
288280 if self . frame_receiver . is_terminated ( ) {
289281 self . state = StreamState :: RemoteClosing ;
290- return Err ( Error :: SessionShutdown ) ;
282+ return Err ( Error :: SubStreamRemoteClosing ) ;
291283 }
292284
293285 match Pin :: new ( & mut self . frame_receiver ) . as_mut ( ) . poll_next ( cx) {
294- Poll :: Ready ( Some ( frame) ) => self . handle_frame ( frame) ?,
286+ Poll :: Ready ( Some ( frame) ) => {
287+ self . handle_frame ( frame) ?;
288+ should_wake_reader &= true ;
289+ }
295290 Poll :: Ready ( None ) => {
296291 self . state = StreamState :: RemoteClosing ;
297- return Err ( Error :: SessionShutdown ) ;
292+ return Err ( Error :: SubStreamRemoteClosing ) ;
298293 }
299294 Poll :: Pending => break ,
300295 }
301296 }
297+ // poll by write and read something, then wake read
298+ if should_wake_reader && self . read_buf . len ( ) != buf_len {
299+ if let Some ( waker) = self . readable_wake . take ( ) {
300+ waker. wake ( ) ;
301+ }
302+ }
302303 Ok ( ( ) )
303304 }
304305
@@ -335,7 +336,7 @@ impl StreamHandle {
335336 }
336337
337338 if let Err ( Error :: UnexpectedFlag | Error :: RecvWindowExceeded | Error :: InvalidMsgType ) =
338- self . recv_frames ( cx)
339+ self . recv_frames ( cx, false )
339340 {
340341 // read flag error or read data error
341342 self . send_go_away ( ) ;
@@ -355,6 +356,7 @@ impl StreamHandle {
355356 n,
356357 ) ;
357358 if n == 0 {
359+ self . readable_wake = Some ( cx. waker ( ) . clone ( ) ) ;
358360 return Poll :: Pending ;
359361 }
360362
@@ -383,7 +385,7 @@ impl AsyncRead for StreamHandle {
383385 }
384386
385387 if let Err ( Error :: UnexpectedFlag | Error :: RecvWindowExceeded | Error :: InvalidMsgType ) =
386- self . recv_frames ( cx)
388+ self . recv_frames ( cx, false )
387389 {
388390 // read flag error or read data error
389391 self . send_go_away ( ) ;
@@ -429,6 +431,27 @@ impl AsyncWrite for StreamHandle {
429431 cx : & mut Context ,
430432 buf : & [ u8 ] ,
431433 ) -> Poll < io:: Result < usize > > {
434+ // https://github.com/driftluo/tentacle/issues/33
435+ // read frame from session is necessary.
436+ // The window update message of Yamux must be updated normally.
437+ // If the user only writes but does not read, the entire stream will be stuck.
438+ // To avoid this, read operations are required when there is a frame in the session.
439+ //
440+ // Another solution to avoid this problem is to let the session and stream share the state.
441+ // In the rust implementation, at least the following three states are required:
442+ // 1. writeable_wake
443+ // 2. send_window
444+ // 3. state
445+ //
446+ // When the session receives a window update frame, it can update the state of the stream.
447+ // In the implementation here, we try not to share state between the session and the stream.
448+ if let Err ( Error :: UnexpectedFlag | Error :: RecvWindowExceeded | Error :: InvalidMsgType ) =
449+ self . recv_frames ( cx, true )
450+ {
451+ // read flag error or read data error
452+ self . send_go_away ( ) ;
453+ }
454+
432455 match self . state {
433456 StreamState :: Reset => return Poll :: Ready ( Err ( io:: ErrorKind :: BrokenPipe . into ( ) ) ) ,
434457 StreamState :: LocalClosing | StreamState :: Closed => {
0 commit comments