@@ -3,21 +3,24 @@ use std::pin::Pin;
3
3
use std:: task:: { Context , Poll } ;
4
4
use std:: time:: Duration ;
5
5
6
+ use async_io:: Timer ;
7
+ use async_net:: TcpStream ;
8
+ use asyncs:: select;
6
9
use bytes:: buf:: BufMut ;
10
+ use futures:: io:: BufReader ;
11
+ use futures:: prelude:: * ;
12
+ use futures_lite:: AsyncReadExt ;
7
13
use ignore_result:: Ignore ;
8
- use tokio:: io:: { AsyncBufReadExt , AsyncRead , AsyncReadExt , AsyncWrite , AsyncWriteExt , BufStream , ReadBuf } ;
9
- use tokio:: net:: TcpStream ;
10
- use tokio:: { select, time} ;
11
14
use tracing:: { debug, trace} ;
12
15
13
16
#[ cfg( feature = "tls" ) ]
14
17
mod tls {
15
18
pub use std:: sync:: Arc ;
16
19
20
+ pub use futures_rustls:: client:: TlsStream ;
21
+ pub use futures_rustls:: TlsConnector ;
17
22
pub use rustls:: pki_types:: ServerName ;
18
23
pub use rustls:: ClientConfig ;
19
- pub use tokio_rustls:: client:: TlsStream ;
20
- pub use tokio_rustls:: TlsConnector ;
21
24
}
22
25
#[ cfg( feature = "tls" ) ]
23
26
use tls:: * ;
@@ -51,7 +54,7 @@ pub trait AsyncReadToBuf: AsyncReadExt {
51
54
impl < T > AsyncReadToBuf for T where T : AsyncReadExt { }
52
55
53
56
impl AsyncRead for Connection {
54
- fn poll_read ( self : Pin < & mut Self > , cx : & mut Context < ' _ > , buf : & mut ReadBuf < ' _ > ) -> Poll < Result < ( ) > > {
57
+ fn poll_read ( self : Pin < & mut Self > , cx : & mut Context < ' _ > , buf : & mut [ u8 ] ) -> Poll < Result < usize > > {
55
58
match self . get_mut ( ) {
56
59
Self :: Raw ( stream) => Pin :: new ( stream) . poll_read ( cx, buf) ,
57
60
#[ cfg( feature = "tls" ) ]
@@ -85,11 +88,11 @@ impl AsyncWrite for Connection {
85
88
}
86
89
}
87
90
88
- fn poll_shutdown ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) > > {
91
+ fn poll_close ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) > > {
89
92
match self . get_mut ( ) {
90
- Self :: Raw ( stream) => Pin :: new ( stream) . poll_shutdown ( cx) ,
93
+ Self :: Raw ( stream) => Pin :: new ( stream) . poll_close ( cx) ,
91
94
#[ cfg( feature = "tls" ) ]
92
- Self :: Tls ( stream) => Pin :: new ( stream) . poll_shutdown ( cx) ,
95
+ Self :: Tls ( stream) => Pin :: new ( stream) . poll_close ( cx) ,
93
96
}
94
97
}
95
98
}
@@ -99,7 +102,7 @@ pub struct ConnReader<'a> {
99
102
}
100
103
101
104
impl AsyncRead for ConnReader < ' _ > {
102
- fn poll_read ( self : Pin < & mut Self > , cx : & mut Context < ' _ > , buf : & mut ReadBuf < ' _ > ) -> Poll < Result < ( ) > > {
105
+ fn poll_read ( self : Pin < & mut Self > , cx : & mut Context < ' _ > , buf : & mut [ u8 ] ) -> Poll < Result < usize > > {
103
106
Pin :: new ( & mut self . get_mut ( ) . conn ) . poll_read ( cx, buf)
104
107
}
105
108
}
@@ -121,8 +124,8 @@ impl AsyncWrite for ConnWriter<'_> {
121
124
Pin :: new ( & mut self . get_mut ( ) . conn ) . poll_flush ( cx)
122
125
}
123
126
124
- fn poll_shutdown ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) > > {
125
- Pin :: new ( & mut self . get_mut ( ) . conn ) . poll_shutdown ( cx)
127
+ fn poll_close ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) > > {
128
+ Pin :: new ( & mut self . get_mut ( ) . conn ) . poll_close ( cx)
126
129
}
127
130
}
128
131
@@ -142,13 +145,14 @@ impl Connection {
142
145
Self :: Tls ( stream)
143
146
}
144
147
145
- pub async fn command ( self , cmd : & str ) -> Result < String > {
146
- let mut stream = BufStream :: new ( self ) ;
147
- stream . write_all ( cmd. as_bytes ( ) ) . await ?;
148
- stream . flush ( ) . await ?;
148
+ pub async fn command ( mut self , cmd : & str ) -> Result < String > {
149
+ // let mut stream = BufStream::new(self);
150
+ self . write_all ( cmd. as_bytes ( ) ) . await ?;
151
+ self . flush ( ) . await ?;
149
152
let mut line = String :: new ( ) ;
150
- stream. read_line ( & mut line) . await ?;
151
- stream. shutdown ( ) . await . ignore ( ) ;
153
+ let mut reader = BufReader :: new ( self ) ;
154
+ reader. read_line ( & mut line) . await ?;
155
+ reader. close ( ) . await . ignore ( ) ;
152
156
Ok ( line)
153
157
}
154
158
@@ -212,7 +216,7 @@ impl Connector {
212
216
}
213
217
select ! {
214
218
_ = unsafe { Pin :: new_unchecked( deadline) } => Err ( Error :: new( ErrorKind :: TimedOut , "deadline exceed" ) ) ,
215
- _ = time :: sleep ( self . timeout) => Err ( Error :: new( ErrorKind :: TimedOut , format!( "connection timeout{:?} exceed" , self . timeout) ) ) ,
219
+ _ = Timer :: after ( self . timeout) => Err ( Error :: new( ErrorKind :: TimedOut , format!( "connection timeout{:?} exceed" , self . timeout) ) ) ,
216
220
r = TcpStream :: connect( ( endpoint. host, endpoint. port) ) => {
217
221
match r {
218
222
Err ( err) => Err ( err) ,
@@ -255,10 +259,10 @@ impl Connector {
255
259
"fails to contact writable server from endpoints {:?}" ,
256
260
endpoints. endpoints( )
257
261
) ;
258
- time :: sleep ( timeout) . await ;
262
+ Timer :: after ( timeout) . await ;
259
263
timeout = max_timeout. min ( timeout * 2 ) ;
260
264
} else {
261
- time :: sleep ( Duration :: from_millis ( 5 ) ) . await ;
265
+ Timer :: after ( Duration :: from_millis ( 5 ) ) . await ;
262
266
}
263
267
}
264
268
None
@@ -273,7 +277,7 @@ mod tests {
273
277
use crate :: deadline:: Deadline ;
274
278
use crate :: endpoint:: EndpointRef ;
275
279
276
- #[ tokio :: test]
280
+ #[ asyncs :: test]
277
281
async fn raw ( ) {
278
282
let connector = Connector :: new ( ) ;
279
283
let endpoint = EndpointRef :: new ( "host1" , 2181 , true ) ;
0 commit comments