diff --git a/src/transport/tcp/substream.rs b/src/transport/tcp/substream.rs index 0ce8a779..3a8c5410 100644 --- a/src/transport/tcp/substream.rs +++ b/src/transport/tcp/substream.rs @@ -66,10 +66,12 @@ impl AsyncRead for Substream { cx: &mut Context<'_>, buf: &mut tokio::io::ReadBuf<'_>, ) -> Poll> { + let len = buf.filled().len(); match futures::ready!(Pin::new(&mut self.io).poll_read(cx, buf)) { Err(error) => Poll::Ready(Err(error)), Ok(res) => { - self.bandwidth_sink.increase_inbound(buf.filled().len()); + let inbound_size = buf.filled().len().saturating_sub(len); + self.bandwidth_sink.increase_inbound(inbound_size); Poll::Ready(Ok(res)) } } diff --git a/src/transport/websocket/substream.rs b/src/transport/websocket/substream.rs index 427b8c87..8a2db5ea 100644 --- a/src/transport/websocket/substream.rs +++ b/src/transport/websocket/substream.rs @@ -64,10 +64,12 @@ impl AsyncRead for Substream { cx: &mut Context<'_>, buf: &mut tokio::io::ReadBuf<'_>, ) -> Poll> { + let len = buf.filled().len(); match futures::ready!(Pin::new(&mut self.io).poll_read(cx, buf)) { Err(error) => Poll::Ready(Err(error)), Ok(res) => { - self.bandwidth_sink.increase_inbound(buf.filled().len()); + let inbound_size = buf.filled().len().saturating_sub(len); + self.bandwidth_sink.increase_inbound(inbound_size); Poll::Ready(Ok(res)) } }