Skip to content

Commit 18af3a4

Browse files
Xerxes-2claude
andauthored
feat(io): add SyncStream::into_parts (#847)
Add methods to safely extract the underlying stream and buffered data from the compat adapter layers, needed for protocol upgrades (e.g. WebSocket) to bypass double-buffering. - `SyncStream::into_parts()`: returns `(S, Vec<u8>)` with unread data Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 54e801c commit 18af3a4

3 files changed

Lines changed: 40 additions & 3 deletions

File tree

compio-io/src/compat/async_stream.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ impl<S> AsyncStream<S> {
6868
self.inner.get_mut()
6969
}
7070

71-
/// Consumes the `SyncStream`, returning the underlying stream.
71+
/// Consumes the `AsyncStream`, returning the underlying stream.
7272
pub fn into_inner(self) -> S {
7373
self.inner.into_inner()
7474
}

compio-io/src/compat/sync_stream.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,10 +102,33 @@ impl<S> SyncStream<S> {
102102
}
103103

104104
/// Consumes the `SyncStream`, returning the underlying stream.
105+
///
106+
/// Any buffered data is discarded. Use [`into_parts`](Self::into_parts)
107+
/// if you need to preserve unread data.
105108
pub fn into_inner(self) -> S {
106109
self.inner
107110
}
108111

112+
/// Consumes the `SyncStream`, returning the underlying stream and any
113+
/// unread buffered data.
114+
///
115+
/// If the read buffer is currently lent to an IO operation, the returned
116+
/// `Vec` will be empty.
117+
pub fn into_parts(mut self) -> (S, Vec<u8>) {
118+
let remaining = if self.read_buf.has_inner() {
119+
let slice = self.read_buf.take_inner();
120+
let begin = slice.begin();
121+
let mut vec = slice.into_inner();
122+
if begin > 0 {
123+
vec.drain(..begin);
124+
}
125+
vec
126+
} else {
127+
Vec::new()
128+
};
129+
(self.inner, remaining)
130+
}
131+
109132
/// Returns `true` if the stream has reached EOF.
110133
pub fn is_eof(&self) -> bool {
111134
self.eof

compio-io/tests/compat.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
use std::io::Cursor;
1+
use std::io::{Cursor, Read};
22

3-
use compio_io::compat::{AsyncReadStream, AsyncWriteStream};
3+
use compio_io::compat::{AsyncReadStream, AsyncWriteStream, SyncStream};
44
use futures_executor::block_on;
55
use futures_util::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt};
66

@@ -87,3 +87,17 @@ fn async_compat_flush_fail() {
8787
assert_eq!(err.kind(), std::io::ErrorKind::WriteZero);
8888
})
8989
}
90+
91+
#[test]
92+
fn sync_stream_into_parts_keeps_unread_buffer() {
93+
let mut stream = SyncStream::new(Cursor::new(b"hello".to_vec()));
94+
let mut buf = [0; 2];
95+
96+
Read::read(&mut stream, &mut buf).unwrap_err();
97+
futures_executor::block_on(stream.fill_read_buf()).unwrap();
98+
assert_eq!(Read::read(&mut stream, &mut buf).unwrap(), 2);
99+
assert_eq!(&buf, b"he");
100+
101+
let (_, remaining) = stream.into_parts();
102+
assert_eq!(remaining, b"llo");
103+
}

0 commit comments

Comments
 (0)