1- use std:: sync:: {
2- atomic:: { AtomicUsize , Ordering } ,
3- mpsc, Arc ,
1+ use std:: {
2+ error, fmt, mem,
3+ sync:: {
4+ atomic:: { AtomicUsize , Ordering } ,
5+ Arc ,
6+ } ,
47} ;
58
69use crate :: {
710 context:: { Context , TaskContext } ,
811 result:: NeonResult ,
12+ result:: ResultExt ,
913 sys:: { raw:: Env , tsfn:: ThreadsafeFunction } ,
1014} ;
1115
16+ #[ cfg( feature = "futures" ) ]
17+ use {
18+ std:: future:: Future ,
19+ std:: pin:: Pin ,
20+ std:: task:: { self , Poll } ,
21+ tokio:: sync:: oneshot,
22+ } ;
23+
24+ #[ cfg( not( feature = "futures" ) ) ]
25+ // Synchronous oneshot channel API compatible with `futures-channel`
26+ mod oneshot {
27+ use std:: sync:: mpsc;
28+
29+ pub ( super ) use std:: sync:: mpsc:: Receiver ;
30+
31+ pub ( super ) fn channel < T > ( ) -> ( mpsc:: SyncSender < T > , mpsc:: Receiver < T > ) {
32+ mpsc:: sync_channel ( 1 )
33+ }
34+ }
35+
1236type Callback = Box < dyn FnOnce ( Env ) + Send + ' static > ;
1337
1438/// Channel for scheduling Rust closures to execute on the JavaScript main thread.
@@ -70,8 +94,8 @@ pub struct Channel {
7094 has_ref : bool ,
7195}
7296
73- impl std :: fmt:: Debug for Channel {
74- fn fmt ( & self , f : & mut std :: fmt:: Formatter < ' _ > ) -> std :: fmt:: Result {
97+ impl fmt:: Debug for Channel {
98+ fn fmt ( & self , f : & mut fmt:: Formatter ) -> fmt:: Result {
7599 f. write_str ( "Channel" )
76100 }
77101}
@@ -131,9 +155,9 @@ impl Channel {
131155 T : Send + ' static ,
132156 F : FnOnce ( TaskContext ) -> NeonResult < T > + Send + ' static ,
133157 {
134- let ( tx, rx) = mpsc :: sync_channel ( 1 ) ;
158+ let ( tx, rx) = oneshot :: channel ( ) ;
135159 let callback = Box :: new ( move |env| {
136- let env = unsafe { std :: mem:: transmute ( env) } ;
160+ let env = unsafe { mem:: transmute ( env) } ;
137161
138162 // Note: It is sufficient to use `TaskContext`'s `InheritedHandleScope` because
139163 // N-API creates a `HandleScope` before calling the callback.
@@ -225,20 +249,34 @@ impl Drop for Channel {
225249/// thread with [`Channel::send`].
226250pub struct JoinHandle < T > {
227251 // `Err` is always `Throw`, but `Throw` cannot be sent across threads
228- rx : mpsc :: Receiver < Result < T , ( ) > > ,
252+ rx : oneshot :: Receiver < Result < T , ( ) > > ,
229253}
230254
231255impl < T > JoinHandle < T > {
232256 /// Waits for the associated closure to finish executing
233257 ///
234258 /// If the closure panics or throws an exception, `Err` is returned
259+ ///
260+ /// # Panics
261+ ///
262+ /// This function panics if called within an asynchronous execution context.
235263 pub fn join ( self ) -> Result < T , JoinError > {
236- self . rx
237- . recv ( )
238- // If the sending side dropped without sending, it must have panicked
239- . map_err ( |_| JoinError ( JoinErrorType :: Panic ) ) ?
240- // If the closure returned `Err`, a JavaScript exception was thrown
241- . map_err ( |_| JoinError ( JoinErrorType :: Throw ) )
264+ #[ cfg( feature = "futures" ) ]
265+ let result = self . rx . blocking_recv ( ) ;
266+ #[ cfg( not( feature = "futures" ) ) ]
267+ let result = self . rx . recv ( ) ;
268+
269+ JoinError :: map_res ( result)
270+ }
271+ }
272+
273+ #[ cfg( feature = "futures" ) ]
274+ #[ cfg_attr( docsrs, doc( cfg( feature = "futures" ) ) ) ]
275+ impl < T > Future for JoinHandle < T > {
276+ type Output = Result < T , JoinError > ;
277+
278+ fn poll ( mut self : Pin < & mut Self > , cx : & mut task:: Context ) -> Poll < Self :: Output > {
279+ JoinError :: map_poll ( & mut self . rx , cx)
242280 }
243281}
244282
@@ -253,16 +291,50 @@ enum JoinErrorType {
253291 Throw ,
254292}
255293
256- impl std :: fmt :: Display for JoinError {
257- fn fmt ( & self , f : & mut std :: fmt :: Formatter < ' _ > ) -> std :: fmt :: Result {
294+ impl JoinError {
295+ fn as_str ( & self ) -> & str {
258296 match & self . 0 {
259- JoinErrorType :: Panic => f. write_str ( "Closure panicked before returning" ) ,
260- JoinErrorType :: Throw => f. write_str ( "Closure threw an exception" ) ,
297+ JoinErrorType :: Panic => "Closure panicked before returning" ,
298+ JoinErrorType :: Throw => "Closure threw an exception" ,
299+ }
300+ }
301+
302+ #[ cfg( feature = "futures" ) ]
303+ // Helper for writing a `Future` implementation by wrapping a `Future` and
304+ // mapping to `Result<T, JoinError>`
305+ pub ( crate ) fn map_poll < T , E > (
306+ f : & mut ( impl Future < Output = Result < Result < T , ( ) > , E > > + Unpin ) ,
307+ cx : & mut task:: Context ,
308+ ) -> Poll < Result < T , Self > > {
309+ match Pin :: new ( f) . poll ( cx) {
310+ Poll :: Ready ( result) => Poll :: Ready ( Self :: map_res ( result) ) ,
311+ Poll :: Pending => Poll :: Pending ,
261312 }
262313 }
314+
315+ // Helper for mapping a nested `Result` from joining to a `Result<T, JoinError>`
316+ pub ( crate ) fn map_res < T , E > ( res : Result < Result < T , ( ) > , E > ) -> Result < T , Self > {
317+ res
318+ // If the sending side dropped without sending, it must have panicked
319+ . map_err ( |_| JoinError ( JoinErrorType :: Panic ) ) ?
320+ // If the closure returned `Err`, a JavaScript exception was thrown
321+ . map_err ( |_| JoinError ( JoinErrorType :: Throw ) )
322+ }
323+ }
324+
325+ impl fmt:: Display for JoinError {
326+ fn fmt ( & self , f : & mut fmt:: Formatter ) -> fmt:: Result {
327+ f. write_str ( self . as_str ( ) )
328+ }
263329}
264330
265- impl std:: error:: Error for JoinError { }
331+ impl error:: Error for JoinError { }
332+
333+ impl < T > ResultExt < T > for Result < T , JoinError > {
334+ fn or_throw < ' a , C : Context < ' a > > ( self , cx : & mut C ) -> NeonResult < T > {
335+ self . or_else ( |err| cx. throw_error ( err. as_str ( ) ) )
336+ }
337+ }
266338
267339/// Error indicating that a closure was unable to be scheduled to execute on the event loop.
268340///
@@ -275,19 +347,19 @@ impl std::error::Error for JoinError {}
275347#[ cfg_attr( docsrs, doc( cfg( feature = "napi-4" ) ) ) ]
276348pub struct SendError ;
277349
278- impl std :: fmt:: Display for SendError {
279- fn fmt ( & self , f : & mut std :: fmt:: Formatter < ' _ > ) -> std :: fmt:: Result {
350+ impl fmt:: Display for SendError {
351+ fn fmt ( & self , f : & mut fmt:: Formatter ) -> fmt:: Result {
280352 write ! ( f, "SendError" )
281353 }
282354}
283355
284- impl std :: fmt:: Debug for SendError {
285- fn fmt ( & self , f : & mut std :: fmt:: Formatter < ' _ > ) -> std :: fmt:: Result {
286- std :: fmt:: Display :: fmt ( self , f)
356+ impl fmt:: Debug for SendError {
357+ fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
358+ fmt:: Display :: fmt ( self , f)
287359 }
288360}
289361
290- impl std :: error:: Error for SendError { }
362+ impl error:: Error for SendError { }
291363
292364struct ChannelState {
293365 tsfn : ThreadsafeFunction < Callback > ,
0 commit comments