Skip to content

Commit f52b92b

Browse files
committed
Add better tests, fix them, bump version
1 parent c6f4336 commit f52b92b

File tree

3 files changed

+456
-23
lines changed

3 files changed

+456
-23
lines changed

Cargo.toml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "async-io-typed"
3-
version = "1.0.2"
3+
version = "1.0.3"
44
edition = "2021"
55
license = "MIT OR Apache-2.0"
66
description = "Adapts any AsyncRead or AsyncWrite type to send serde compatible types"
@@ -21,4 +21,6 @@ thiserror = "1.0.37"
2121

2222
[dev-dependencies]
2323
rand = "0.8"
24-
tokio = { version = "1.22.0", features = ["rt-multi-thread"]}
24+
tokio = { version = "1.22.0", features = ["rt-multi-thread", "sync", "macros", "time"]}
25+
tokio-util = "0.7"
26+
futures-util = { version = "0.3", features = ["io"] }

src/lib.rs

Lines changed: 31 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ use futures_io::{AsyncRead, AsyncWrite};
2727
use futures_util::{stream::Stream, Sink, SinkExt};
2828
use serde::{de::DeserializeOwned, Serialize};
2929

30+
#[cfg(test)]
31+
mod tests;
32+
3033
const U16_MARKER: u8 = 252;
3134
const U32_MARKER: u8 = 253;
3235
const U64_MARKER: u8 = 254;
@@ -313,24 +316,22 @@ impl<R: AsyncRead + Unpin, T: Serialize + DeserializeOwned + Unpin> AsyncReadTyp
313316
let mut buf = [0; 8];
314317
let accumulated = *len_in_progress_assigned as usize;
315318
let slice = match len_read_mode {
316-
LenReadMode::U16 => &mut buf[0..(2 - accumulated)],
317-
LenReadMode::U32 => &mut buf[0..(4 - accumulated)],
318-
LenReadMode::U64 => &mut buf[0..(8 - accumulated)],
319+
LenReadMode::U16 => &mut buf[accumulated..2],
320+
LenReadMode::U32 => &mut buf[accumulated..4],
321+
LenReadMode::U64 => &mut buf[accumulated..8],
319322
};
320323
let len = futures_core::ready!(Pin::new(&mut raw).poll_read(cx, slice))?;
321-
len_in_progress[accumulated..(accumulated + slice.len())]
324+
len_in_progress[accumulated..(accumulated + len)]
322325
.copy_from_slice(&slice[..len]);
323326
*len_in_progress_assigned += len as u8;
324327
if len == slice.len() {
325328
let new_len = match len_read_mode {
326329
LenReadMode::U16 => u16::from_le_bytes(
327330
(&len_in_progress[0..2]).try_into().expect("infallible"),
328-
)
329-
as u64,
331+
) as u64,
330332
LenReadMode::U32 => u32::from_le_bytes(
331333
(&len_in_progress[0..4]).try_into().expect("infallible"),
332-
)
333-
as u64,
334+
) as u64,
334335
LenReadMode::U64 => u64::from_le_bytes(*len_in_progress),
335336
};
336337
if new_len > size_limit {
@@ -344,7 +345,9 @@ impl<R: AsyncRead + Unpin, T: Serialize + DeserializeOwned + Unpin> AsyncReadTyp
344345
}
345346
AsyncReadState::ReadingItem { ref mut len_read } => {
346347
while *len_read < item_buffer.len() {
347-
let len = futures_core::ready!(Pin::new(&mut raw).poll_read(cx, &mut item_buffer[*len_read..]))?;
348+
let len = futures_core::ready!(
349+
Pin::new(&mut raw).poll_read(cx, &mut item_buffer[*len_read..])
350+
)?;
348351
*len_read += len;
349352
if *len_read == item_buffer.len() {
350353
break;
@@ -386,7 +389,8 @@ enum AsyncWriteState {
386389
Idle,
387390
WritingLen {
388391
current_len: [u8; 9],
389-
len_to_be_sent: u8,
392+
len_to_be_sent: usize,
393+
len_sent: usize,
390394
},
391395
WritingValue {
392396
bytes_sent: usize,
@@ -513,15 +517,19 @@ impl<W: AsyncWrite + Unpin, T: Serialize + DeserializeOwned + Unpin> AsyncWriteT
513517
};
514518
*state = AsyncWriteState::WritingLen {
515519
current_len: new_current_len,
516-
len_to_be_sent: to_be_sent as u8,
520+
len_to_be_sent: to_be_sent,
521+
len_sent: 0,
517522
};
518-
let len = futures_core::ready!(Pin::new(&mut *raw).poll_write(cx, &new_current_len[0..to_be_sent]))?;
523+
let len = futures_core::ready!(
524+
Pin::new(&mut *raw).poll_write(cx, &new_current_len[0..to_be_sent])
525+
)?;
519526
*state = if len == to_be_sent {
520527
AsyncWriteState::WritingValue { bytes_sent: 0 }
521528
} else {
522529
AsyncWriteState::WritingLen {
523530
current_len: new_current_len,
524-
len_to_be_sent: (to_be_sent - len) as u8,
531+
len_to_be_sent: to_be_sent,
532+
len_sent: len,
525533
}
526534
};
527535
continue;
@@ -533,20 +541,22 @@ impl<W: AsyncWrite + Unpin, T: Serialize + DeserializeOwned + Unpin> AsyncWriteT
533541
}
534542
}
535543
AsyncWriteState::WritingLen {
536-
current_len,
537-
len_to_be_sent,
544+
ref current_len,
545+
ref len_to_be_sent,
546+
ref mut len_sent,
538547
} => {
539548
let len = futures_core::ready!(Pin::new(&mut *raw)
540-
.poll_write(cx, &current_len[0..(*len_to_be_sent as usize)]))?;
541-
if len == *len_to_be_sent as usize {
549+
.poll_write(cx, &current_len[(*len_sent)..(*len_to_be_sent)]))?;
550+
*len_sent += len;
551+
if *len_sent == *len_to_be_sent {
542552
*state = AsyncWriteState::WritingValue { bytes_sent: 0 };
543-
} else {
544-
*len_to_be_sent -= len as u8;
545553
}
546554
continue;
547555
}
548556
AsyncWriteState::WritingValue { bytes_sent } => {
549-
let len = futures_core::ready!(Pin::new(&mut *raw).poll_write(cx, &write_buffer[*bytes_sent..]))?;
557+
let len = futures_core::ready!(
558+
Pin::new(&mut *raw).poll_write(cx, &write_buffer[*bytes_sent..])
559+
)?;
550560
*bytes_sent += len;
551561
if *bytes_sent == write_buffer.len() {
552562
*state = AsyncWriteState::Idle;
@@ -564,7 +574,7 @@ impl<W: AsyncWrite + Unpin, T: Serialize + DeserializeOwned + Unpin> AsyncWriteT
564574
} else {
565575
continue;
566576
}
567-
},
577+
}
568578
AsyncWriteState::Closed => Poll::Ready(Ok(None)),
569579
};
570580
}

0 commit comments

Comments
 (0)