Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions compio-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ serde_json = { version = "1.0.140", optional = true }
[dev-dependencies]
compio-runtime = { workspace = true }
compio-macros = { workspace = true }
compio-driver = { workspace = true, features = ["polling"] }
tokio = { workspace = true, features = ["macros", "rt"] }
serde = { version = "1.0.219", features = ["derive"] }
futures-executor = "0.3.30"
Expand All @@ -38,3 +39,11 @@ codec-serde-json = ["dep:serde", "dep:serde_json", "dep:thiserror"]
allocator_api = ["compio-buf/allocator_api"]
read_buf = ["compio-buf/read_buf"]
nightly = ["allocator_api", "read_buf"]

[[test]]
name = "compat"
required-features = ["compat"]

[[test]]
name = "framed"
required-features = ["codec-serde-json"]
99 changes: 0 additions & 99 deletions compio-io/src/framed/codec/serde_json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,102 +112,3 @@ impl<T: DeserializeOwned> Decoder<T> for SerdeJsonCodec {
serde_json::from_slice(buf).map_err(SerdeJsonCodecError::SerdeJsonError)
}
}

#[cfg(test)]
mod test {
use std::{
io::{self, Cursor},
rc::Rc,
};

use compio_buf::{BufResult, IoBuf, IoBufMut};
use futures_util::{SinkExt, StreamExt, lock::Mutex};
use serde::{Deserialize, Serialize};

use crate::{
AsyncRead, AsyncReadAt, AsyncWrite, AsyncWriteAt,
framed::{Framed, codec::serde_json::SerdeJsonCodec, frame::LengthDelimited},
};

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
struct Test {
foo: String,
bar: usize,
}

struct InMemoryPipe(Cursor<Rc<Mutex<Vec<u8>>>>);

impl AsyncRead for InMemoryPipe {
async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
let BufResult(res, buf) = self
.0
.get_ref()
.lock()
.await
.read_at(buf, self.0.position())
.await;
match res {
Ok(len) => {
self.0.set_position(self.0.position() + len as u64);
BufResult(Ok(len), buf)
}
Err(_) => BufResult(res, buf),
}
}
}

impl AsyncWrite for InMemoryPipe {
async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
let BufResult(res, buf) = self
.0
.get_ref()
.lock()
.await
.write_at(buf, self.0.position())
.await;
match res {
Ok(len) => {
self.0.set_position(self.0.position() + len as u64);
BufResult(Ok(len), buf)
}
Err(_) => BufResult(res, buf),
}
}

async fn flush(&mut self) -> io::Result<()> {
self.0.get_ref().lock().await.flush().await
}

async fn shutdown(&mut self) -> io::Result<()> {
self.0.get_ref().lock().await.shutdown().await
}
}

#[compio_macros::test]
async fn test_framed() {
let codec = SerdeJsonCodec::new();
let framer = LengthDelimited::new();
let buf = Rc::new(Mutex::new(vec![]));
let r = InMemoryPipe(Cursor::new(buf.clone()));
let w = InMemoryPipe(Cursor::new(buf));
let mut framed = Framed::symmetric::<Test>(codec, framer)
.with_reader(r)
.with_writer(w);

let origin = Test {
foo: "hello, world!".to_owned(),
bar: 114514,
};
framed.send(origin.clone()).await.unwrap();
framed.send(origin.clone()).await.unwrap();

let des = framed.next().await.unwrap().unwrap();
println!("{des:?}");

assert_eq!(origin, des);
let des = framed.next().await.unwrap().unwrap();
println!("{des:?}");

assert_eq!(origin, des);
}
}
31 changes: 20 additions & 11 deletions compio-io/src/framed/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl<R, W, C, F, In, Out> Framed<R, W, C, F, In, Out> {
/// Change the reader of the `Framed` object.
pub fn with_reader<Io>(self, reader: Io) -> Framed<Io, W, C, F, In, Out> {
Framed {
read_state: read::State::Idle(Some((reader, Buffer::with_capacity(64)))),
read_state: read::State::new(reader, Buffer::with_capacity(64)),
write_state: self.write_state,
codec: self.codec,
framer: self.framer,
Expand All @@ -48,7 +48,7 @@ impl<R, W, C, F, In, Out> Framed<R, W, C, F, In, Out> {
pub fn with_writer<Io>(self, writer: Io) -> Framed<R, Io, C, F, In, Out> {
Framed {
read_state: self.read_state,
write_state: write::State::Idle(Some((writer, Vec::new()))),
write_state: write::State::new(writer, Vec::new()),
codec: self.codec,
framer: self.framer,
types: PhantomData,
Expand All @@ -57,18 +57,27 @@ impl<R, W, C, F, In, Out> Framed<R, W, C, F, In, Out> {

/// Change the codec of the `Framed` object.
///
/// This is useful when you have a duplex I/O type, e.g., a `TcpStream` or
/// `File`, and you want [`Framed`] to implement both
/// [`Sink`](futures_util::Sink) and [`Stream`](futures_util::Stream).
/// This is useful when you have a duplex I/O type, e.g., a
/// `compio::net::TcpStream` or `compio::fs::File`, and you want
/// [`Framed`] to implement both [`Sink`](futures_util::Sink) and
/// [`Stream`](futures_util::Stream).
///
/// Some types like the ones mentioned above are multiplexed by nature, so
/// they implement the [`Splittable`] trait by themselves. For other types,
/// you may want to wrap them in [`Split`] or [`UnsyncSplit`] first, which
/// uses lock or `RefCell` under the hood.
///
/// [`Split`]: crate::util::split::Split
/// [`UnsyncSplit`]: crate::util::split::UnsyncSplit
pub fn with_duplex<Io: Splittable>(
self,
io: Io,
) -> Framed<Io::ReadHalf, Io::WriteHalf, C, F, In, Out> {
let (read_half, write_half) = io.split();

Framed {
read_state: read::State::Idle(Some((read_half, Buffer::with_capacity(64)))),
write_state: write::State::Idle(Some((write_half, Vec::new()))),
read_state: read::State::new(read_half, Buffer::with_capacity(64)),
write_state: write::State::new(write_half, Vec::new()),
codec: self.codec,
framer: self.framer,
types: PhantomData,
Expand All @@ -81,8 +90,8 @@ impl<C, F> Framed<(), (), C, F, (), ()> {
/// different input and output type.
pub fn new<In, Out>(codec: C, framer: F) -> Framed<(), (), C, F, In, Out> {
Framed {
read_state: read::State::Idle(None),
write_state: write::State::Idle(None),
read_state: read::State::empty(),
write_state: write::State::empty(),
codec,
framer,
types: PhantomData,
Expand All @@ -93,8 +102,8 @@ impl<C, F> Framed<(), (), C, F, (), ()> {
/// the same input and output type.
pub fn symmetric<T>(codec: C, framer: F) -> Framed<(), (), C, F, T, T> {
Framed {
read_state: read::State::Idle(None),
write_state: write::State::Idle(None),
read_state: read::State::empty(),
write_state: write::State::empty(),
codec,
framer,
types: PhantomData,
Expand Down
53 changes: 41 additions & 12 deletions compio-io/src/framed/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,32 @@ use compio_buf::BufResult;
use futures_util::Stream;

use super::*;
use crate::{AsyncReadExt, PinBoxFuture, buffer::Buffer};
use crate::{AsyncReadExt, PinBoxFuture, buffer::Buffer, framed::frame::Framer};

type ReadResult = BufResult<usize, Buffer>;

pub enum State<Io> {
pub struct State<Io> {
inner: StateInner<Io>,
eof: bool,
}

impl<Io> State<Io> {
pub fn new(io: Io, buf: Buffer) -> Self {
State {
inner: StateInner::Idle(Some((io, buf))),
eof: false,
}
}

pub fn empty() -> Self {
State {
inner: StateInner::Idle(None),
eof: false,
}
}
}

enum StateInner<Io> {
Idle(Option<(Io, Buffer)>),
Reading(PinBoxFuture<(Io, ReadResult)>),
}
Expand All @@ -20,7 +41,7 @@ impl<R, W, C, F, In, Out> Stream for Framed<R, W, C, F, In, Out>
where
R: AsyncRead + 'static,
C: Decoder<Out>,
F: frame::Framer,
F: Framer,
Self: Unpin,
{
type Item = Result<Out, C::Error>;
Expand All @@ -29,20 +50,21 @@ where
let this = self.get_mut();

loop {
match &mut this.read_state {
State::Idle(idle) => {
match &mut this.read_state.inner {
StateInner::Idle(idle) => {
let (mut io, mut buf) = idle.take().expect("Inconsistent state");
let slice = buf.slice();

// First try decode from the buffer
if let Some(frame) = this.framer.extract(buf.slice()) {
let decoded = this.codec.decode(frame.payload(buf.slice()))?;
if let Some(frame) = this.framer.extract(slice) {
let decoded = this.codec.decode(frame.payload(slice))?;
buf.advance(frame.len());

if buf.all_done() {
buf.reset();
}

this.read_state = State::Idle(Some((io, buf)));
this.read_state.inner = StateInner::Idle(Some((io, buf)));

return Poll::Ready(Some(Ok(decoded)));
}
Expand All @@ -54,12 +76,19 @@ where
(io, BufResult(res, buf))
});

this.read_state = State::Reading(fut)
this.read_state.inner = StateInner::Reading(fut)
}
State::Reading(fut) => {
StateInner::Reading(fut) => {
let (io, BufResult(res, buf)) = ready!(fut.poll_unpin(cx));
this.read_state = State::Idle(Some((io, buf)));
res?;
this.read_state.inner = StateInner::Idle(Some((io, buf)));
if res? == 0 {
// It's the second time EOF is reached, return None
if this.read_state.eof {
return Poll::Ready(None);
}

this.read_state.eof = true;
}
}
};
}
Expand Down
49 changes: 29 additions & 20 deletions compio-io/src/framed/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,46 @@ pub enum State<Io> {
}

impl<Io> State<Io> {
pub fn new(io: Io, buf: Vec<u8>) -> Self {
State::Idle(Some((io, buf)))
}

pub fn empty() -> Self {
State::Idle(None)
}

fn take_idle(&mut self) -> (Io, Vec<u8>) {
match self {
State::Idle(idle) => idle.take().expect("Inconsistent state"),
_ => unreachable!("`Framed` not in idle state"),
}
}

pub fn buf(&mut self) -> Option<&mut Vec<u8>> {
fn buf(&mut self) -> Option<&mut Vec<u8>> {
match self {
State::Idle(Some((_, buf))) => Some(buf),
_ => None,
}
}

pub fn start_flush(&mut self)
fn poll_sink(&mut self, cx: &mut std::task::Context<'_>) -> Poll<io::Result<()>> {
let (io, res, buf) = match self {
State::Writing(fut) => {
let (io, BufResult(res, buf)) = ready!(fut.poll_unpin(cx));
(io, res, buf)
}
State::Closing(fut) | State::Flushing(fut) => ready!(fut.poll_unpin(cx)),
State::Idle(_) => {
return Poll::Ready(Ok(()));
}
};
*self = State::Idle(Some((io, buf)));
Poll::Ready(res)
}
}

impl<Io: AsyncWrite + 'static> State<Io> {
fn start_flush(&mut self)
where
Io: AsyncWrite + 'static,
Comment thread
George-Miao marked this conversation as resolved.
Outdated
{
Expand All @@ -45,7 +70,7 @@ impl<Io> State<Io> {
*self = State::Flushing(fut);
}

pub fn start_close(&mut self)
fn start_close(&mut self)
where
Io: AsyncWrite + 'static,
Comment thread
George-Miao marked this conversation as resolved.
Outdated
{
Expand All @@ -57,7 +82,7 @@ impl<Io> State<Io> {
*self = State::Closing(fut);
}

pub fn start_write(&mut self)
fn start_write(&mut self)
where
Io: AsyncWrite + 'static,
Comment thread
George-Miao marked this conversation as resolved.
Outdated
{
Expand All @@ -68,22 +93,6 @@ impl<Io> State<Io> {
});
*self = State::Writing(fut);
}

/// State that may occur when `Framed` is acting as a [`Sink`].
pub fn poll_sink(&mut self, cx: &mut std::task::Context<'_>) -> Poll<io::Result<()>> {
let (io, res, buf) = match self {
State::Writing(fut) => {
let (io, BufResult(res, buf)) = ready!(fut.poll_unpin(cx));
(io, res, buf)
}
State::Closing(fut) | State::Flushing(fut) => ready!(fut.poll_unpin(cx)),
State::Idle(_) => {
return Poll::Ready(Ok(()));
}
};
*self = State::Idle(Some((io, buf)));
Poll::Ready(res)
}
}

impl<R, W, C, F, In, Out> Sink<In> for Framed<R, W, C, F, In, Out>
Expand Down
Loading
Loading