@@ -160,3 +160,108 @@ impl<M: RawMutex, T, const N: usize> ConnectedPipe<M, T, N> {
160160 }
161161 }
162162}
163+
164+
165+ #[ cfg( test) ]
166+ mod tests {
167+ use core:: time:: Duration ;
168+
169+ use embassy_sync:: blocking_mutex:: raw:: CriticalSectionRawMutex ;
170+ use futures_executor:: { LocalPool , ThreadPool } ;
171+ use futures_util:: { future:: select, pin_mut, task:: SpawnExt , FutureExt } ;
172+ use futures_timer:: Delay ;
173+
174+ use super :: ConnectedPipe ;
175+
176+ async fn wait_milis ( milis : u64 ) {
177+ Delay :: new ( Duration :: from_millis ( milis) ) . await ;
178+ }
179+
180+ // #[futures_test::test]
181+ #[ test]
182+ fn test_send_receive ( ) {
183+ let mut executor = LocalPool :: new ( ) ;
184+ let spawner = executor. spawner ( ) ;
185+
186+ static PIPE : ConnectedPipe < CriticalSectionRawMutex , usize , 5 > = ConnectedPipe :: new ( ) ;
187+
188+ // Task that sends
189+ spawner. spawn ( async {
190+ wait_milis ( 10 ) . await ;
191+
192+ PIPE . push ( 23 ) . await ;
193+ PIPE . push ( 56 ) . await ;
194+ PIPE . push ( 67 ) . await ;
195+ } ) . unwrap ( ) ;
196+
197+
198+ // Task that receives
199+ spawner. spawn ( async {
200+ let reader = PIPE . reader ( ) ;
201+ let value = reader. receive ( ) . await ;
202+ assert_eq ! ( value, 23 ) ;
203+ let value = reader. receive ( ) . await ;
204+ assert_eq ! ( value, 56 ) ;
205+ let value = reader. receive ( ) . await ;
206+ assert_eq ! ( value, 67 ) ;
207+
208+ } ) . unwrap ( ) ;
209+
210+ executor. run ( ) ;
211+ }
212+
213+ #[ futures_test:: test]
214+ async fn test_send_drop ( ) {
215+
216+ static PIPE : ConnectedPipe < CriticalSectionRawMutex , usize , 5 > = ConnectedPipe :: new ( ) ;
217+
218+ PIPE . push ( 23 ) . await ;
219+ PIPE . push ( 56 ) . await ;
220+ PIPE . push ( 67 ) . await ;
221+
222+
223+ // Create a reader after sending
224+ let reader = PIPE . reader ( ) ;
225+ let receive = reader. receive ( )
226+ . fuse ( ) ;
227+ pin_mut ! ( receive) ;
228+
229+ let timeout = wait_milis ( 50 ) . fuse ( ) ;
230+ pin_mut ! ( timeout) ;
231+
232+ let either = select ( receive, timeout) . await ;
233+
234+ match either {
235+ futures_util:: future:: Either :: Left ( _) => {
236+ panic ! ( "There should be nothing to receive!" ) ;
237+ } ,
238+ futures_util:: future:: Either :: Right ( _) => { } ,
239+ }
240+ }
241+
242+ #[ futures_test:: test]
243+ async fn test_bulk_send_publish ( ) {
244+ static PIPE : ConnectedPipe < CriticalSectionRawMutex , usize , 5 > = ConnectedPipe :: new ( ) ;
245+
246+ let executor = ThreadPool :: new ( ) . unwrap ( ) ;
247+
248+ executor. spawn ( async {
249+ for i in 0 ..1000 {
250+ PIPE . push ( i) . await ;
251+ }
252+ } ) . unwrap ( ) ;
253+
254+ executor. spawn ( async {
255+ for i in 1000 ..2000 {
256+ PIPE . push ( i) . await ;
257+ }
258+ } ) . unwrap ( ) ;
259+
260+ let reader = PIPE . reader ( ) ;
261+ for _ in 0 ..800 {
262+ reader. receive ( ) . await ;
263+ }
264+
265+ }
266+
267+ }
0 commit comments