@@ -2,7 +2,10 @@ use std::io;
22use std:: os:: unix:: net:: SocketAddr ;
33use std:: path:: PathBuf ;
44
5- use futures:: { Async , Poll , Stream , Sink , StartSend , AsyncSink } ;
5+ use futures:: { Async , AsyncSink , Poll , Sink , StartSend , Stream } ;
6+
7+ #[ cfg( feature = "unstable-futures" ) ]
8+ use futures2:: { self , task} ;
69
710use UnixDatagram ;
811
@@ -50,8 +53,7 @@ pub trait UnixDatagramCodec {
5053 ///
5154 /// The encode method also determines the destination to which the buffer
5255 /// should be directed, which will be returned as a `SocketAddr`.
53- fn encode ( & mut self , msg : Self :: Out , buf : & mut Vec < u8 > )
54- -> io:: Result < PathBuf > ;
56+ fn encode ( & mut self , msg : Self :: Out , buf : & mut Vec < u8 > ) -> io:: Result < PathBuf > ;
5557}
5658
5759/// A unified `Stream` and `Sink` interface to an underlying
@@ -73,14 +75,28 @@ impl<C: UnixDatagramCodec> Stream for UnixDatagramFramed<C> {
7375 type Error = io:: Error ;
7476
7577 fn poll ( & mut self ) -> Poll < Option < C :: In > , io:: Error > {
76- let ( n, addr) = try_nb ! ( self . socket. recv_from( & mut self . rd) ) ;
78+ let ( n, addr) = try_ready ! ( self . socket. recv_from( & mut self . rd) ) ;
7779 trace ! ( "received {} bytes, decoding" , n) ;
7880 let frame = try!( self . codec . decode ( & addr, & self . rd [ ..n] ) ) ;
7981 trace ! ( "frame decoded from buffer" ) ;
8082 Ok ( Async :: Ready ( Some ( frame) ) )
8183 }
8284}
8385
86+ #[ cfg( feature = "unstable-futures" ) ]
87+ impl < C : UnixDatagramCodec > futures2:: Stream for UnixDatagramFramed < C > {
88+ type Item = C :: In ;
89+ type Error = io:: Error ;
90+
91+ fn poll_next ( & mut self , cx : & mut task:: Context ) -> futures2:: Poll < Option < C :: In > , io:: Error > {
92+ let ( n, addr) = try_ready2 ! ( self . socket. recv_from2( cx, & mut self . rd) ) ;
93+ trace ! ( "received {} bytes, decoding" , n) ;
94+ let frame = try!( self . codec . decode ( & addr, & self . rd [ ..n] ) ) ;
95+ trace ! ( "frame decoded from buffer" ) ;
96+ Ok ( futures2:: Async :: Ready ( Some ( frame) ) )
97+ }
98+ }
99+
84100impl < C : UnixDatagramCodec > Sink for UnixDatagramFramed < C > {
85101 type SinkItem = C :: Out ;
86102 type SinkError = io:: Error ;
@@ -101,19 +117,21 @@ impl<C: UnixDatagramCodec> Sink for UnixDatagramFramed<C> {
101117 trace ! ( "flushing framed transport" ) ;
102118
103119 if self . wr . is_empty ( ) {
104- return Ok ( Async :: Ready ( ( ) ) )
120+ return Ok ( Async :: Ready ( ( ) ) ) ;
105121 }
106122
107123 trace ! ( "writing; remaining={}" , self . wr. len( ) ) ;
108- let n = try_nb ! ( self . socket. send_to( & self . wr, & self . out_addr) ) ;
124+ let n = try_ready ! ( self . socket. send_to( & self . wr, & self . out_addr) ) ;
109125 trace ! ( "written {}" , n) ;
110126 let wrote_all = n == self . wr . len ( ) ;
111127 self . wr . clear ( ) ;
112128 if wrote_all {
113129 Ok ( Async :: Ready ( ( ) ) )
114130 } else {
115- Err ( io:: Error :: new ( io:: ErrorKind :: Other ,
116- "failed to write entire datagram to socket" ) )
131+ Err ( io:: Error :: new (
132+ io:: ErrorKind :: Other ,
133+ "failed to write entire datagram to socket" ,
134+ ) )
117135 }
118136 }
119137
@@ -123,6 +141,53 @@ impl<C: UnixDatagramCodec> Sink for UnixDatagramFramed<C> {
123141 }
124142}
125143
144+ #[ cfg( feature = "unstable-futures" ) ]
145+ impl < C : UnixDatagramCodec > futures2:: Sink for UnixDatagramFramed < C > {
146+ type SinkItem = C :: Out ;
147+ type SinkError = io:: Error ;
148+
149+ fn poll_ready ( & mut self , cx : & mut task:: Context ) -> futures2:: Poll < ( ) , io:: Error > {
150+ if self . wr . len ( ) > 0 {
151+ try!( self . poll_flush ( cx) ) ;
152+ if self . wr . len ( ) > 0 {
153+ return Ok ( futures2:: Async :: Pending ) ;
154+ }
155+ }
156+ Ok ( ( ) . into ( ) )
157+ }
158+
159+ fn start_send ( & mut self , item : C :: Out ) -> Result < ( ) , io:: Error > {
160+ self . out_addr = try!( self . codec . encode ( item, & mut self . wr ) ) ;
161+ Ok ( ( ) )
162+ }
163+
164+ fn poll_flush ( & mut self , cx : & mut task:: Context ) -> futures2:: Poll < ( ) , io:: Error > {
165+ trace ! ( "flushing framed transport" ) ;
166+
167+ if self . wr . is_empty ( ) {
168+ return Ok ( futures2:: Async :: Ready ( ( ) ) ) ;
169+ }
170+
171+ trace ! ( "writing; remaining={}" , self . wr. len( ) ) ;
172+ let n = try_ready2 ! ( self . socket. send_to2( cx, & self . wr, & self . out_addr) ) ;
173+ trace ! ( "written {}" , n) ;
174+ let wrote_all = n == self . wr . len ( ) ;
175+ self . wr . clear ( ) ;
176+ if wrote_all {
177+ Ok ( futures2:: Async :: Ready ( ( ) ) )
178+ } else {
179+ Err ( io:: Error :: new (
180+ io:: ErrorKind :: Other ,
181+ "failed to write entire datagram to socket" ,
182+ ) )
183+ }
184+ }
185+
186+ fn poll_close ( & mut self , cx : & mut task:: Context ) -> futures2:: Poll < ( ) , io:: Error > {
187+ self . poll_flush ( cx)
188+ }
189+ }
190+
126191pub fn new < C : UnixDatagramCodec > ( socket : UnixDatagram , codec : C ) -> UnixDatagramFramed < C > {
127192 UnixDatagramFramed {
128193 socket : socket,
0 commit comments