Skip to content

Commit aa15f2f

Browse files
committed
Minor updates to peek
- Ensure both peek + poll_peek have docs, and those docs match the tokio ones. - Add a trace point for peek
1 parent 0aa4600 commit aa15f2f

File tree

2 files changed

+31
-14
lines changed

2 files changed

+31
-14
lines changed

src/net/tcp/split_owned.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,9 @@ impl OwnedReadHalf {
3737
reunite(self, other)
3838
}
3939

40-
/// Attempt to receive data on the socket, without removing that data from the queue, registering the current task for wakeup if data is not yet available.
40+
/// Attempts to receive data on the socket, without removing that data from
41+
/// the queue, registering the current task for wakeup if data is not yet
42+
/// available.
4143
pub fn poll_peek(
4244
mut self: Pin<&mut Self>,
4345
cx: &mut Context<'_>,
@@ -46,6 +48,11 @@ impl OwnedReadHalf {
4648
Pin::new(&mut self.inner).poll_peek(cx, buf)
4749
}
4850

51+
/// Receives data on the socket from the remote address to which it is
52+
/// connected, without removing that data from the queue. On success,
53+
/// returns the number of bytes peeked.
54+
///
55+
/// Successive calls return the same data.
4956
pub async fn peek(&mut self, buf: &mut [u8]) -> io::Result<usize> {
5057
self.inner.peek(buf).await
5158
}

src/net/tcp/stream.rs

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -166,12 +166,18 @@ impl TcpStream {
166166
Ok(())
167167
}
168168

169-
/// Receives data on the socket from the remote address to which it is connected,
170-
/// without removing that data from the queue. On success, returns the number of bytes peeked.
169+
/// Receives data on the socket from the remote address to which it is
170+
/// connected, without removing that data from the queue. On success,
171+
/// returns the number of bytes peeked.
172+
///
173+
/// Successive calls return the same data.
171174
pub async fn peek(&mut self, buf: &mut [u8]) -> Result<usize> {
172175
self.read_half.peek(buf).await
173176
}
174177

178+
/// Attempts to receive data on the socket, without removing that data from
179+
/// the queue, registering the current task for wakeup if data is not yet
180+
/// available.
175181
pub fn poll_peek(&mut self, cx: &mut Context<'_>, buf: &mut ReadBuf) -> Poll<Result<usize>> {
176182
self.read_half.poll_peek(cx, buf)
177183
}
@@ -262,19 +268,23 @@ impl ReadHalf {
262268
}
263269

264270
match ready!(self.rx.recv.poll_recv(cx)) {
265-
Some(seg) => match seg {
266-
SequencedSegment::Data(bytes) => {
267-
let len = std::cmp::min(bytes.len(), buf.remaining());
268-
buf.put_slice(&bytes[..len]);
269-
self.rx.buffer = Some(bytes);
271+
Some(seg) => {
272+
tracing::trace!(target: TRACING_TARGET, src = ?self.pair.remote, dst = ?self.pair.local, protocol = %seg, "Peek");
270273

271-
Poll::Ready(Ok(len))
272-
}
273-
SequencedSegment::Fin => {
274-
self.is_closed = true;
275-
Poll::Ready(Ok(0))
274+
match seg {
275+
SequencedSegment::Data(bytes) => {
276+
let len = std::cmp::min(bytes.len(), buf.remaining());
277+
buf.put_slice(&bytes[..len]);
278+
self.rx.buffer = Some(bytes);
279+
280+
Poll::Ready(Ok(len))
281+
}
282+
SequencedSegment::Fin => {
283+
self.is_closed = true;
284+
Poll::Ready(Ok(0))
285+
}
276286
}
277-
},
287+
}
278288
None => Poll::Ready(Err(io::Error::new(
279289
io::ErrorKind::ConnectionReset,
280290
"Connection reset",

0 commit comments

Comments
 (0)