@@ -11,14 +11,8 @@ use futures_util::{
1111 future:: { self } ,
1212 pin_mut, FutureExt , StreamExt ,
1313} ;
14- use std:: {
15- cell:: Cell ,
16- collections:: HashMap ,
17- future:: Future ,
18- rc:: Rc ,
19- sync:: atomic:: { AtomicUsize , Ordering } ,
20- task:: Waker ,
21- } ;
14+ use slab:: Slab ;
15+ use std:: { cell:: Cell , future:: Future , rc:: Rc , task:: Waker } ;
2216use std:: { fmt:: Debug , ops:: Deref } ;
2317
2418#[ doc = include_str ! ( "../docs/use_resource.md" ) ]
4135 ( rc, Rc :: new ( Cell :: new ( Some ( changed) ) ) )
4236 } ) ;
4337
44- let mut waiting_futures: Signal < HashMap < usize , Waker > > = use_signal ( HashMap :: new) ;
38+ let mut waiting_futures: Signal < ( usize , Slab < Waker > ) > = use_signal ( || ( 0 , Slab :: new ( ) ) ) ;
4539
4640 let cb = use_callback ( move |_| {
4741 // Set the state to Pending when the task is restarted
7165 value. set ( Some ( res) ) ;
7266
7367 let mut waiting_futures = waiting_futures. write ( ) ;
74- for ( _, waker) in waiting_futures. drain ( ) {
68+ let ( version, wakers) = & mut * waiting_futures;
69+ for waker in wakers. drain ( ) {
7570 waker. wake ( ) ;
7671 }
72+ * version += 1 ;
7773 } )
7874 } ) ;
7975
@@ -131,7 +127,10 @@ where
131127/// ```
132128#[ derive( Debug ) ]
133129pub struct Resource < T : ' static > {
134- waiting_futures : Signal < HashMap < usize , Waker > > ,
130+ /// (slab version, wakers)
131+ /// When the slab is drained, ids are reclaimed. We need version to ensure [`ResourceFuture`]
132+ /// does not overwrite another futures waker when polled.
133+ waiting_futures : Signal < ( usize , Slab < Waker > ) > ,
135134 value : Signal < Option < T > > ,
136135 task : Signal < Task > ,
137136 state : Signal < UseResourceState > ,
@@ -729,14 +728,12 @@ impl<T: Clone> Deref for Resource<T> {
729728 }
730729}
731730
732- static NEXT_RESOURCE_FUTURE_ID : AtomicUsize = AtomicUsize :: new ( 0 ) ;
733-
734731#[ derive( Debug ) ]
735732pub struct ResourceFuture < T >
736733where
737734 T : ' static ,
738735{
739- id : usize ,
736+ id : Option < ( usize , usize ) > ,
740737 resource : Resource < T > ,
741738}
742739
@@ -750,12 +747,24 @@ where
750747 self : std:: pin:: Pin < & mut Self > ,
751748 cx : & mut std:: task:: Context < ' _ > ,
752749 ) -> std:: task:: Poll < Self :: Output > {
753- if matches ! ( * self . resource. state. peek( ) , UseResourceState :: Ready ) {
750+ let this = unsafe { self . get_unchecked_mut ( ) } ;
751+ if matches ! ( * this. resource. state. peek( ) , UseResourceState :: Ready ) {
754752 return std:: task:: Poll :: Ready ( ( ) ) ;
755753 }
756- {
757- let mut waiting_futures = self . resource . waiting_futures ;
758- waiting_futures. insert ( self . id , cx. waker ( ) . clone ( ) ) ;
754+ let mut waiting_futures = this. resource . waiting_futures . write ( ) ;
755+ let ( current_slab_version, wakers) = & mut * waiting_futures;
756+ if let Some ( ( slab_version, id) ) = & mut this. id {
757+ if * current_slab_version == * slab_version {
758+ let old_waker = wakers. get_mut ( * id) . unwrap ( ) ;
759+ let _ = std:: mem:: replace ( old_waker, cx. waker ( ) . clone ( ) ) ;
760+ } else {
761+ let new_id = wakers. insert ( cx. waker ( ) . clone ( ) ) ;
762+ * slab_version = * current_slab_version;
763+ * id = new_id;
764+ }
765+ } else {
766+ let id = wakers. insert ( cx. waker ( ) . clone ( ) ) ;
767+ this. id = Some ( ( * current_slab_version, id) ) ;
759768 }
760769 std:: task:: Poll :: Pending
761770 }
@@ -766,9 +775,8 @@ where
766775 T : ' static ,
767776{
768777 fn clone ( & self ) -> Self {
769- let id = NEXT_RESOURCE_FUTURE_ID . fetch_add ( 1 , Ordering :: Relaxed ) ;
770- ResourceFuture {
771- id,
778+ Self {
779+ id : None ,
772780 resource : self . resource ,
773781 }
774782 }
@@ -779,7 +787,13 @@ where
779787 T : ' static ,
780788{
781789 fn drop ( & mut self ) {
782- self . resource . waiting_futures . write ( ) . remove ( & self . id ) ;
790+ if let Some ( ( slab_version, id) ) = & self . id {
791+ let mut waiting_futures = self . resource . waiting_futures . write ( ) ;
792+ let ( current_slab_version, wakers) = & mut * waiting_futures;
793+ if * current_slab_version == * slab_version {
794+ wakers. remove ( * id) ;
795+ }
796+ }
783797 }
784798}
785799
@@ -792,7 +806,9 @@ where
792806 type IntoFuture = ResourceFuture < T > ;
793807
794808 fn into_future ( self ) -> Self :: IntoFuture {
795- let id = NEXT_RESOURCE_FUTURE_ID . fetch_add ( 1 , Ordering :: Relaxed ) ;
796- ResourceFuture { id, resource : self }
809+ ResourceFuture {
810+ id : None ,
811+ resource : self ,
812+ }
797813 }
798814}
0 commit comments