Skip to content

Commit a745c5e

Browse files
feat(io): make bytes optional (#750)
* feat(io): make bytes optional * fix: impl * fix: minmax * fix: apply suggestion Co-authored-by: Yuyi Wang <Strawberry_Str@hotmail.com> --------- Co-authored-by: Yuyi Wang <Strawberry_Str@hotmail.com>
1 parent 6aa217a commit a745c5e

4 files changed

Lines changed: 66 additions & 25 deletions

File tree

compio-io/Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ license = { workspace = true }
1010
repository = { workspace = true }
1111

1212
[dependencies]
13-
compio-buf = { workspace = true, features = ["arrayvec", "bytes"] }
13+
compio-buf = { workspace = true, features = ["arrayvec"] }
1414
futures-util = { workspace = true, features = ["sink"] }
1515
paste = { workspace = true }
1616
pin-project-lite = { workspace = true, optional = true }
@@ -36,10 +36,11 @@ serde = { version = "1.0.219", features = ["derive"] }
3636
futures-executor = "0.3.30"
3737

3838
[features]
39-
default = []
39+
default = ["bytes"]
4040
compat = ["futures-util/io", "dep:pin-project-lite"]
4141
sync = []
4242
ancillary = ["dep:cfg-if", "dep:libc", "dep:windows-sys"]
43+
bytes = ["compio-buf/bytes"]
4344

4445
# Codecs
4546
# Serde json codec

compio-io/src/framed/codec/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use std::io;
55

66
use compio_buf::{IoBuf, IoBufMut, Slice};
77

8+
#[cfg(feature = "bytes")]
89
pub mod bytes;
910

1011
#[cfg(feature = "codec-serde-json")]

compio-io/src/framed/frame.rs

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,7 @@
22
33
use std::io;
44

5-
use compio_buf::{
6-
IoBuf, IoBufMut, Slice,
7-
bytes::{Buf, BufMut},
8-
};
5+
use compio_buf::{IoBuf, IoBufMut, Slice};
96

107
/// An extracted frame
118
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -87,6 +84,9 @@ impl Default for LengthDelimited {
8784
}
8885

8986
impl LengthDelimited {
87+
/// Max allowed length of `len` field
88+
const MAX_LFL: usize = 8;
89+
9090
/// Creates a new `LengthDelimited` framer.
9191
pub fn new() -> Self {
9292
Self::default()
@@ -98,7 +98,16 @@ impl LengthDelimited {
9898
}
9999

100100
/// Sets the length of the length field in bytes.
101+
///
102+
/// # Panics
103+
///
104+
/// This will panic if `len_field_len` is too long (8 bytes is the maximum
105+
/// allowed for now).
101106
pub fn set_length_field_len(mut self, len_field_len: usize) -> Self {
107+
assert!(
108+
len_field_len <= Self::MAX_LFL,
109+
"Length field cannot take over 8 bytes"
110+
);
102111
self.length_field_len = len_field_len;
103112
self
104113
}
@@ -124,29 +133,36 @@ impl<B: IoBufMut> Framer<B> for LengthDelimited {
124133
unsafe { buf.advance_to(len + self.length_field_len) };
125134

126135
let slice = buf.as_mut_slice();
136+
let lfl = self.length_field_len;
127137

128138
// Write the length at the beginning
129-
if self.length_field_is_big_endian {
130-
(&mut slice[0..self.length_field_len]).put_uint(len as _, self.length_field_len);
139+
let len = len as u64;
140+
let len_bytes = if self.length_field_is_big_endian {
141+
&len.to_be_bytes()[Self::MAX_LFL - lfl..]
131142
} else {
132-
(&mut slice[0..self.length_field_len]).put_uint_le(len as _, self.length_field_len);
133-
}
143+
&len.to_le_bytes()[..lfl]
144+
};
145+
slice[..lfl].copy_from_slice(len_bytes);
134146
}
135147

136148
fn extract(&mut self, buf: &Slice<B>) -> io::Result<Option<Frame>> {
137149
if buf.len() < self.length_field_len {
138150
return Ok(None);
139151
}
140152

141-
let mut buf = buf.as_init();
153+
let buf = buf.as_init();
154+
let lfl = self.length_field_len;
155+
let mut len_bytes = [0; Self::MAX_LFL];
142156

143157
let len = if self.length_field_is_big_endian {
144-
buf.get_uint(self.length_field_len)
158+
len_bytes[Self::MAX_LFL - lfl..].copy_from_slice(&buf[..lfl]);
159+
u64::from_be_bytes(len_bytes)
145160
} else {
146-
buf.get_uint_le(self.length_field_len)
161+
len_bytes[..lfl].copy_from_slice(&buf[..lfl]);
162+
u64::from_le_bytes(len_bytes)
147163
} as usize;
148164

149-
if buf.len() < len {
165+
if buf.len() < self.length_field_len + len {
150166
return Ok(None);
151167
}
152168

compio-io/src/framed/mod.rs

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,12 @@
55
66
use std::marker::PhantomData;
77

8-
use compio_buf::{IoBufMut, bytes::Bytes};
8+
use compio_buf::IoBufMut;
99
use futures_util::FutureExt;
1010

1111
use crate::{
1212
AsyncRead,
13-
framed::{
14-
codec::{Decoder, bytes::BytesCodec},
15-
frame::NoopFramer,
16-
},
13+
framed::{codec::Decoder, frame::NoopFramer},
1714
util::Splittable,
1815
};
1916

@@ -151,17 +148,43 @@ impl<C, F> Framed<(), (), C, F, (), (), ()> {
151148
}
152149
}
153150

154-
/// A type alias for a `Framed` with bytes as the input and output type.
155-
pub type BytesFramed<R, W> = Framed<R, W, BytesCodec, NoopFramer, Bytes, Bytes>;
156-
151+
/// [`Framed`] that bridges [`AsyncRead`]/[`AsyncWrite`] with [`Bytes`].
152+
///
153+
/// This is useful when you want to read/write raw bytes into/from [`Bytes`]
154+
/// without any additional framing or de/encoding.
155+
///
156+
/// See also: [`ReaderStream`] and [`ReaderStream`].
157+
///
158+
/// [`Bytes`]: compio_buf::bytes::Bytes
159+
/// [`AsyncWrite`]: crate::AsyncWrite
160+
/// [`ReaderStream`]: https://docs.rs/tokio-util/latest/tokio_util/io/struct.ReaderStream.html
161+
/// [`StreamReader`]: https://docs.rs/tokio-util/latest/tokio_util/io/struct.StreamReader.html
162+
#[cfg(feature = "bytes")]
163+
pub type BytesFramed<R, W> = Framed<
164+
R,
165+
W,
166+
codec::bytes::BytesCodec,
167+
NoopFramer,
168+
compio_buf::bytes::Bytes,
169+
compio_buf::bytes::Bytes,
170+
>;
171+
172+
#[cfg(feature = "bytes")]
157173
impl BytesFramed<(), ()> {
158-
/// Creates a new `Framed` with the given I/O object, codec, and framer with
159-
/// bytes as the input and output type.
174+
/// Creates a new [`BytesFramed`] that bridges [`AsyncRead`]/[`AsyncWrite`]
175+
/// with [`Bytes`].
176+
///
177+
/// See also: [`ReaderStream`] and [`StreamReader`].
178+
///
179+
/// [`Bytes`]: compio_buf::bytes::Bytes
180+
/// [`AsyncWrite`]: crate::AsyncWrite
181+
/// [`ReaderStream`]: https://docs.rs/tokio-util/latest/tokio_util/io/struct.ReaderStream.html
182+
/// [`StreamReader`]: https://docs.rs/tokio-util/latest/tokio_util/io/struct.StreamReader.html
160183
pub fn new_bytes() -> Self {
161184
Framed {
162185
read_state: read::State::empty(),
163186
write_state: write::State::empty(),
164-
codec: BytesCodec::new(),
187+
codec: codec::bytes::BytesCodec::new(),
165188
framer: NoopFramer::new(),
166189
types: PhantomData,
167190
}

0 commit comments

Comments
 (0)