1- use std:: io ;
1+ use std:: { hint :: unreachable_unchecked , io } ;
22
3- use bytes:: { Buf , BufMut , BytesMut } ;
4- use monoio:: io:: { AsyncReadRent , AsyncWriteRent , AsyncWriteRentExt } ;
3+ use monoio:: {
4+ buf:: { IoBuf , IoBufMut } ,
5+ io:: { AsyncReadRent , AsyncWriteRent , AsyncWriteRentExt } ,
6+ } ;
57
68const BUFFER_SIZE : usize = 16 * 1024 ;
79
10+ struct Buffer {
11+ read : usize ,
12+ write : usize ,
13+ buf : Box < [ u8 ] > ,
14+ }
15+
16+ impl Buffer {
17+ fn new ( ) -> Self {
18+ Self {
19+ read : 0 ,
20+ write : 0 ,
21+ buf : vec ! [ 0 ; BUFFER_SIZE ] . into_boxed_slice ( ) ,
22+ }
23+ }
24+
25+ fn len ( & self ) -> usize {
26+ self . write - self . read
27+ }
28+
29+ fn is_empty ( & self ) -> bool {
30+ self . len ( ) == 0
31+ }
32+
33+ fn available ( & self ) -> usize {
34+ self . buf . len ( ) - self . write
35+ }
36+
37+ fn is_full ( & self ) -> bool {
38+ self . available ( ) == 0
39+ }
40+
41+ fn advance ( & mut self , n : usize ) {
42+ assert ! ( self . write - self . read <= n) ;
43+ self . read += n;
44+ if self . read == self . write {
45+ self . read = 0 ;
46+ self . write = 0 ;
47+ }
48+ }
49+ }
50+
51+ unsafe impl monoio:: buf:: IoBuf for Buffer {
52+ fn read_ptr ( & self ) -> * const u8 {
53+ unsafe { self . buf . as_ptr ( ) . add ( self . read ) }
54+ }
55+
56+ fn bytes_init ( & self ) -> usize {
57+ self . write - self . read
58+ }
59+ }
60+
61+ unsafe impl monoio:: buf:: IoBufMut for Buffer {
62+ fn write_ptr ( & mut self ) -> * mut u8 {
63+ unsafe { self . buf . as_mut_ptr ( ) . add ( self . write ) }
64+ }
65+
66+ fn bytes_total ( & mut self ) -> usize {
67+ self . buf . len ( ) - self . write
68+ }
69+
70+ unsafe fn set_init ( & mut self , pos : usize ) {
71+ self . write += pos;
72+ }
73+ }
74+
875pub ( crate ) struct SafeRead {
976 // the option is only meant for temporary take, it always should be some
10- buffer : Option < BytesMut > ,
77+ buffer : Option < Buffer > ,
78+ status : ReadStatus ,
79+ }
80+
81+ enum ReadStatus {
82+ Eof ,
83+ Err ( io:: Error ) ,
84+ Ok ,
1185}
1286
1387impl Default for SafeRead {
1488 fn default ( ) -> Self {
1589 Self {
16- buffer : Some ( BytesMut :: default ( ) ) ,
90+ buffer : Some ( Buffer :: new ( ) ) ,
91+ status : ReadStatus :: Ok ,
1792 }
1893 }
1994}
@@ -27,11 +102,24 @@ impl SafeRead {
27102 }
28103
29104 // read from raw io
30- let mut buffer = self . buffer . take ( ) . expect ( "buffer ownership expected" ) ;
31- buffer. reserve ( BUFFER_SIZE ) ;
105+ let buffer = self . buffer . take ( ) . expect ( "buffer ownership expected" ) ;
32106 let ( result, buf) = io. read ( buffer) . await ;
33107 self . buffer = Some ( buf) ;
34- result
108+ match result {
109+ Ok ( 0 ) => {
110+ self . status = ReadStatus :: Eof ;
111+ return result;
112+ }
113+ Ok ( _) => {
114+ self . status = ReadStatus :: Ok ;
115+ return result;
116+ }
117+ Err ( e) => {
118+ let rerr = e. kind ( ) . into ( ) ;
119+ self . status = ReadStatus :: Err ( e) ;
120+ return Err ( rerr) ;
121+ }
122+ }
35123 }
36124}
37125
@@ -40,12 +128,19 @@ impl io::Read for SafeRead {
40128 // if buffer is empty, return WoundBlock.
41129 let buffer = self . buffer . as_mut ( ) . expect ( "buffer mut expected" ) ;
42130 if buffer. is_empty ( ) {
131+ if !matches ! ( self . status, ReadStatus :: Ok ) {
132+ match std:: mem:: replace ( & mut self . status , ReadStatus :: Ok ) {
133+ ReadStatus :: Eof => return Ok ( 0 ) ,
134+ ReadStatus :: Err ( e) => return Err ( e) ,
135+ ReadStatus :: Ok => unsafe { unreachable_unchecked ( ) } ,
136+ }
137+ }
43138 return Err ( io:: ErrorKind :: WouldBlock . into ( ) ) ;
44139 }
45140
46141 // now buffer is not empty. copy it.
47142 let to_copy = buffer. len ( ) . min ( buf. len ( ) ) ;
48- unsafe { std:: ptr:: copy_nonoverlapping ( buffer. as_ptr ( ) , buf. as_mut_ptr ( ) , to_copy) } ;
143+ unsafe { std:: ptr:: copy_nonoverlapping ( buffer. read_ptr ( ) , buf. as_mut_ptr ( ) , to_copy) } ;
49144 buffer. advance ( to_copy) ;
50145
51146 Ok ( to_copy)
@@ -54,13 +149,20 @@ impl io::Read for SafeRead {
54149
55150pub ( crate ) struct SafeWrite {
56151 // the option is only meant for temporary take, it always should be some
57- buffer : Option < BytesMut > ,
152+ buffer : Option < Buffer > ,
153+ status : WriteStatus ,
154+ }
155+
156+ enum WriteStatus {
157+ Err ( io:: Error ) ,
158+ Ok ,
58159}
59160
60161impl Default for SafeWrite {
61162 fn default ( ) -> Self {
62163 Self {
63- buffer : Some ( BytesMut :: default ( ) ) ,
164+ buffer : Some ( Buffer :: new ( ) ) ,
165+ status : WriteStatus :: Ok ,
64166 }
65167 }
66168}
@@ -77,32 +179,49 @@ impl SafeWrite {
77179 let buffer = self . buffer . take ( ) . expect ( "buffer ownership expected" ) ;
78180 let ( result, buffer) = io. write_all ( buffer) . await ;
79181 self . buffer = Some ( buffer) ;
80- let written_len = result?;
81- unsafe { self . buffer . as_mut ( ) . unwrap_unchecked ( ) . advance ( written_len) } ;
82-
83- Ok ( written_len)
182+ match result {
183+ Ok ( written_len) => {
184+ unsafe { self . buffer . as_mut ( ) . unwrap_unchecked ( ) . advance ( written_len) } ;
185+ Ok ( written_len)
186+ }
187+ Err ( e) => {
188+ let rerr = e. kind ( ) . into ( ) ;
189+ self . status = WriteStatus :: Err ( e) ;
190+ Err ( rerr)
191+ }
192+ }
84193 }
85194}
86195
87196impl io:: Write for SafeWrite {
88197 fn write ( & mut self , buf : & [ u8 ] ) -> io:: Result < usize > {
89198 // if there is too much data inside the buffer, return WoundBlock
90199 let buffer = self . buffer . as_mut ( ) . expect ( "buffer mut expected" ) ;
91- if buffer. len ( ) >= BUFFER_SIZE {
200+ if !matches ! ( self . status, WriteStatus :: Ok ) {
201+ match std:: mem:: replace ( & mut self . status , WriteStatus :: Ok ) {
202+ WriteStatus :: Err ( e) => return Err ( e) ,
203+ WriteStatus :: Ok => unsafe { unreachable_unchecked ( ) } ,
204+ }
205+ }
206+ if buffer. is_full ( ) {
92207 return Err ( io:: ErrorKind :: WouldBlock . into ( ) ) ;
93208 }
94209
95- // there is space inside the buffer, copy it.
96- let space_left = BUFFER_SIZE - buffer. len ( ) ;
97- buffer. reserve ( space_left) ;
98- let to_copy = buf. len ( ) . min ( space_left) ;
99- unsafe { std:: ptr:: copy_nonoverlapping ( buf. as_ptr ( ) , buffer. as_mut_ptr ( ) , to_copy) } ;
100- unsafe { buffer. advance_mut ( to_copy) } ;
210+ // there is space inside the buffer, copy to it.
211+ let to_copy = buf. len ( ) . min ( buffer. available ( ) ) ;
212+ unsafe { std:: ptr:: copy_nonoverlapping ( buf. as_ptr ( ) , buffer. write_ptr ( ) , to_copy) } ;
213+ unsafe { buffer. set_init ( to_copy) } ;
101214 Ok ( to_copy)
102215 }
103216
104217 fn flush ( & mut self ) -> io:: Result < ( ) > {
105218 let buffer = self . buffer . as_mut ( ) . expect ( "buffer mut expected" ) ;
219+ if !matches ! ( self . status, WriteStatus :: Ok ) {
220+ match std:: mem:: replace ( & mut self . status , WriteStatus :: Ok ) {
221+ WriteStatus :: Err ( e) => return Err ( e) ,
222+ WriteStatus :: Ok => unsafe { unreachable_unchecked ( ) } ,
223+ }
224+ }
106225 if !buffer. is_empty ( ) {
107226 return Err ( io:: ErrorKind :: WouldBlock . into ( ) ) ;
108227 }
0 commit comments