@@ -5,7 +5,7 @@ use std::pin::Pin;
55use std:: task:: { Context , Poll } ;
66
77type AsyncClosure < T > =
8- Box < dyn FnOnce ( ) -> Pin < Box < dyn Future < Output = Option < T > > + Send > > + Send + ' static > ;
8+ Box < dyn FnOnce ( ) -> Pin < Box < dyn Future < Output = T > + Send > > + Send + ' static > ;
99
1010enum Producer < T > {
1111 SyncClosure ( Box < dyn FnOnce ( ) -> T + Send + ' static > ) ,
@@ -36,9 +36,9 @@ impl<T: Send + 'static> Batch<T> {
3636 pub fn queue_async < F , Fut > ( & mut self , f : F )
3737 where
3838 F : FnOnce ( ) -> Fut + Send + ' static ,
39- Fut : Future < Output = Option < T > > + Send + ' static ,
39+ Fut : Future < Output = T > + Send + ' static ,
4040 {
41- let closure = move || -> Pin < Box < dyn Future < Output = Option < T > > + Send > > { Box :: pin ( f ( ) ) } ;
41+ let closure = move || -> Pin < Box < dyn Future < Output = T > + Send > > { Box :: pin ( f ( ) ) } ;
4242 self . producers
4343 . push_back ( Producer :: AsyncClosure ( Box :: new ( closure) ) ) ;
4444 }
@@ -63,7 +63,7 @@ impl<T: Send + 'static> Batch<T> {
6363
6464enum Current < T > {
6565 Idle ,
66- Future ( Pin < Box < dyn Future < Output = Option < T > > + Send > > ) ,
66+ Future ( Pin < Box < dyn Future < Output = T > + Send > > ) ,
6767 Iterator ( Box < dyn Iterator < Item = T > + Send > ) ,
6868}
6969
@@ -80,18 +80,15 @@ impl<T: Send + 'static> Stream for BatchStream<T> {
8080
8181 loop {
8282 match & mut this. current {
83- Current :: Future ( fut) => match fut. as_mut ( ) . poll ( cx) {
84- Poll :: Ready ( Some ( item) ) => {
85- this. current = Current :: Idle ;
86- return Poll :: Ready ( Some ( item) ) ;
87- }
88- Poll :: Ready ( None ) => {
89- this. current = Current :: Idle ;
90- }
91- Poll :: Pending => {
92- return Poll :: Pending ;
93- }
94- } ,
83+ Current :: Future ( fut) => {
84+ return match fut. as_mut ( ) . poll ( cx) {
85+ Poll :: Ready ( item) => {
86+ this. current = Current :: Idle ;
87+ Poll :: Ready ( Some ( item) )
88+ }
89+ Poll :: Pending => Poll :: Pending ,
90+ } ;
91+ }
9592 Current :: Iterator ( iter) => {
9693 if let Some ( item) = iter. next ( ) {
9794 return Poll :: Ready ( Some ( item) ) ;
@@ -127,7 +124,7 @@ mod tests {
127124 let mut batch = Batch :: new ( ) ;
128125
129126 batch. queue ( || 1 ) ;
130- batch. queue_async ( || async { Some ( 2 ) } ) ;
127+ batch. queue_async ( || async { 2 } ) ;
131128 batch. chain_iter ( 3 ..5 ) ;
132129
133130 let mut stream = batch. into_stream ( ) ;
0 commit comments