@@ -4,6 +4,9 @@ use std::path::PathBuf;
44
55use futures:: { Async , AsyncSink , Poll , Sink , StartSend , Stream } ;
66
7+ #[ cfg( feature = "unstable-futures" ) ]
8+ use futures2:: { self , task} ;
9+
710use UnixDatagram ;
811
912/// Encoding of frames via buffers.
@@ -72,14 +75,28 @@ impl<C: UnixDatagramCodec> Stream for UnixDatagramFramed<C> {
7275 type Error = io:: Error ;
7376
7477 fn poll ( & mut self ) -> Poll < Option < C :: In > , io:: Error > {
75- let ( n, addr) = try_nb ! ( self . socket. recv_from( & mut self . rd) ) ;
78+ let ( n, addr) = try_ready ! ( self . socket. recv_from( & mut self . rd) ) ;
7679 trace ! ( "received {} bytes, decoding" , n) ;
7780 let frame = try!( self . codec . decode ( & addr, & self . rd [ ..n] ) ) ;
7881 trace ! ( "frame decoded from buffer" ) ;
7982 Ok ( Async :: Ready ( Some ( frame) ) )
8083 }
8184}
8285
86+ #[ cfg( feature = "unstable-futures" ) ]
87+ impl < C : UnixDatagramCodec > futures2:: Stream for UnixDatagramFramed < C > {
88+ type Item = C :: In ;
89+ type Error = io:: Error ;
90+
91+ fn poll_next ( & mut self , cx : & mut task:: Context ) -> futures2:: Poll < Option < C :: In > , io:: Error > {
92+ let ( n, addr) = try_ready2 ! ( self . socket. recv_from2( cx, & mut self . rd) ) ;
93+ trace ! ( "received {} bytes, decoding" , n) ;
94+ let frame = try!( self . codec . decode ( & addr, & self . rd [ ..n] ) ) ;
95+ trace ! ( "frame decoded from buffer" ) ;
96+ Ok ( futures2:: Async :: Ready ( Some ( frame) ) )
97+ }
98+ }
99+
83100impl < C : UnixDatagramCodec > Sink for UnixDatagramFramed < C > {
84101 type SinkItem = C :: Out ;
85102 type SinkError = io:: Error ;
@@ -104,7 +121,7 @@ impl<C: UnixDatagramCodec> Sink for UnixDatagramFramed<C> {
104121 }
105122
106123 trace ! ( "writing; remaining={}" , self . wr. len( ) ) ;
107- let n = try_nb ! ( self . socket. send_to( & self . wr, & self . out_addr) ) ;
124+ let n = try_ready ! ( self . socket. send_to( & self . wr, & self . out_addr) ) ;
108125 trace ! ( "written {}" , n) ;
109126 let wrote_all = n == self . wr . len ( ) ;
110127 self . wr . clear ( ) ;
@@ -124,6 +141,53 @@ impl<C: UnixDatagramCodec> Sink for UnixDatagramFramed<C> {
124141 }
125142}
126143
144+ #[ cfg( feature = "unstable-futures" ) ]
145+ impl < C : UnixDatagramCodec > futures2:: Sink for UnixDatagramFramed < C > {
146+ type SinkItem = C :: Out ;
147+ type SinkError = io:: Error ;
148+
149+ fn poll_ready ( & mut self , cx : & mut task:: Context ) -> futures2:: Poll < ( ) , io:: Error > {
150+ if self . wr . len ( ) > 0 {
151+ try!( self . poll_flush ( cx) ) ;
152+ if self . wr . len ( ) > 0 {
153+ return Ok ( futures2:: Async :: Pending ) ;
154+ }
155+ }
156+ Ok ( ( ) . into ( ) )
157+ }
158+
159+ fn start_send ( & mut self , item : C :: Out ) -> Result < ( ) , io:: Error > {
160+ self . out_addr = try!( self . codec . encode ( item, & mut self . wr ) ) ;
161+ Ok ( ( ) )
162+ }
163+
164+ fn poll_flush ( & mut self , cx : & mut task:: Context ) -> futures2:: Poll < ( ) , io:: Error > {
165+ trace ! ( "flushing framed transport" ) ;
166+
167+ if self . wr . is_empty ( ) {
168+ return Ok ( futures2:: Async :: Ready ( ( ) ) ) ;
169+ }
170+
171+ trace ! ( "writing; remaining={}" , self . wr. len( ) ) ;
172+ let n = try_ready2 ! ( self . socket. send_to2( cx, & self . wr, & self . out_addr) ) ;
173+ trace ! ( "written {}" , n) ;
174+ let wrote_all = n == self . wr . len ( ) ;
175+ self . wr . clear ( ) ;
176+ if wrote_all {
177+ Ok ( futures2:: Async :: Ready ( ( ) ) )
178+ } else {
179+ Err ( io:: Error :: new (
180+ io:: ErrorKind :: Other ,
181+ "failed to write entire datagram to socket" ,
182+ ) )
183+ }
184+ }
185+
186+ fn poll_close ( & mut self , cx : & mut task:: Context ) -> futures2:: Poll < ( ) , io:: Error > {
187+ self . poll_flush ( cx)
188+ }
189+ }
190+
127191pub fn new < C : UnixDatagramCodec > ( socket : UnixDatagram , codec : C ) -> UnixDatagramFramed < C > {
128192 UnixDatagramFramed {
129193 socket : socket,
0 commit comments