@@ -5,6 +5,7 @@ use std::{cell::RefCell, collections::HashMap, fmt, future::Future, pin::Pin, th
55
66use async_channel:: { Receiver , Sender , unbounded} ;
77
8+ use crate :: Handle ;
89use crate :: system:: { FnExec , Id , System , SystemCommand } ;
910
1011thread_local ! (
@@ -29,7 +30,8 @@ pub struct Arbiter {
2930 id : usize ,
3031 pub ( crate ) sys_id : usize ,
3132 name : Arc < String > ,
32- sender : Sender < ArbiterCommand > ,
33+ pub ( crate ) hnd : Option < Handle > ,
34+ pub ( crate ) sender : Sender < ArbiterCommand > ,
3335 thread_handle : Option < thread:: JoinHandle < ( ) > > ,
3436}
3537
@@ -47,22 +49,40 @@ impl Default for Arbiter {
4749
4850impl Clone for Arbiter {
4951 fn clone ( & self ) -> Self {
50- Self :: with_sender ( self . sys_id , self . id , self . name . clone ( ) , self . sender . clone ( ) )
52+ Self {
53+ id : self . id ,
54+ sys_id : self . sys_id ,
55+ name : self . name . clone ( ) ,
56+ sender : self . sender . clone ( ) ,
57+ hnd : self . hnd . clone ( ) ,
58+ thread_handle : None ,
59+ }
5160 }
5261}
5362
5463impl Arbiter {
5564 #[ allow( clippy:: borrowed_box) ]
56- pub ( super ) fn new_system ( name : String ) -> ( Self , ArbiterController ) {
65+ pub ( super ) fn new_system ( sys_id : usize , name : String ) -> ( Self , ArbiterController ) {
5766 let ( tx, rx) = unbounded ( ) ;
5867
59- let arb = Arbiter :: with_sender ( 0 , 0 , Arc :: new ( name) , tx) ;
68+ let arb = Arbiter :: with_sender ( sys_id , 0 , Arc :: new ( name) , tx) ;
6069 ADDR . with ( |cell| * cell. borrow_mut ( ) = Some ( arb. clone ( ) ) ) ;
6170 STORAGE . with ( |cell| cell. borrow_mut ( ) . clear ( ) ) ;
6271
6372 ( arb, ArbiterController { rx, stop : None } )
6473 }
6574
75+ pub ( super ) fn dummy ( ) -> Self {
76+ Arbiter {
77+ id : 0 ,
78+ hnd : None ,
79+ name : String :: new ( ) . into ( ) ,
80+ sys_id : 0 ,
81+ sender : unbounded ( ) . 0 ,
82+ thread_handle : None ,
83+ }
84+ }
85+
6686 /// Returns the current thread's arbiter's address
6787 ///
6888 /// # Panics
@@ -75,6 +95,12 @@ impl Arbiter {
7595 } )
7696 }
7797
98+ pub ( crate ) fn set_current ( & self ) {
99+ ADDR . with ( |cell| {
100+ * cell. borrow_mut ( ) = Some ( self . clone ( ) ) ;
101+ } ) ;
102+ }
103+
78104 /// Stop arbiter from continuing it's event loop.
79105 pub fn stop ( & self ) {
80106 let _ = self . sender . try_send ( ArbiterCommand :: Stop ) ;
@@ -108,19 +134,23 @@ impl Arbiter {
108134
109135 let name = name2. clone ( ) ;
110136 let sys_id = sys. id ( ) ;
137+ let ( arb_hnd_tx, arb_hnd_rx) = oneshot:: channel ( ) ;
111138
112139 let handle = builder
113140 . spawn ( move || {
114141 log:: info!( "Starting {name2:?} arbiter" ) ;
115142
116- let arb = Arbiter :: with_sender ( sys_id. 0 , id, name2, arb_tx) ;
117-
118143 let ( stop, stop_rx) = oneshot:: channel ( ) ;
119144 STORAGE . with ( |cell| cell. borrow_mut ( ) . clear ( ) ) ;
120145
121- System :: set_current ( sys) ;
146+ System :: set_current ( sys. clone ( ) ) ;
147+
148+ crate :: driver:: block_on ( config. runner . as_ref ( ) , async move {
149+ let arb = Arbiter :: with_sender ( sys_id. 0 , id, name2, arb_tx) ;
150+ arb_hnd_tx
151+ . send ( arb. hnd . clone ( ) )
152+ . expect ( "Controller thread has gone" ) ;
122153
123- config. block_on ( async move {
124154 // start arbiter controller
125155 crate :: spawn (
126156 ArbiterController {
@@ -132,7 +162,7 @@ impl Arbiter {
132162 ADDR . with ( |cell| * cell. borrow_mut ( ) = Some ( arb. clone ( ) ) ) ;
133163
134164 // register arbiter
135- let _ = System :: current ( )
165+ let _ = sys
136166 . sys ( )
137167 . try_send ( SystemCommand :: RegisterArbiter ( Id ( id) , arb) ) ;
138168
@@ -144,13 +174,18 @@ impl Arbiter {
144174 let _ = System :: current ( )
145175 . sys ( )
146176 . try_send ( SystemCommand :: UnregisterArbiter ( Id ( id) ) ) ;
177+
178+ STORAGE . with ( |cell| cell. borrow_mut ( ) . clear ( ) ) ;
147179 } )
148180 . unwrap_or_else ( |err| {
149181 panic ! ( "Cannot spawn an arbiter's thread {:?}: {:?}" , & name, err)
150182 } ) ;
151183
184+ let hnd = arb_hnd_rx. recv ( ) . expect ( "Could not start new arbiter" ) ;
185+
152186 Arbiter {
153187 id,
188+ hnd,
154189 name,
155190 sys_id : sys_id. 0 ,
156191 sender : arb_tx2,
@@ -164,11 +199,21 @@ impl Arbiter {
164199 name : Arc < String > ,
165200 sender : Sender < ArbiterCommand > ,
166201 ) -> Self {
202+ #[ cfg( feature = "tokio" ) ]
203+ let hnd = { Handle :: new ( sender. clone ( ) ) } ;
204+
205+ #[ cfg( feature = "compio" ) ]
206+ let hnd = { Handle :: new ( sender. clone ( ) ) } ;
207+
208+ #[ cfg( all( not( feature = "compio" ) , not( feature = "tokio" ) ) ) ]
209+ let hnd = { Handle :: current ( ) } ;
210+
167211 Self {
168212 id,
169213 sys_id,
170214 name,
171215 sender,
216+ hnd : Some ( hnd) ,
172217 thread_handle : None ,
173218 }
174219 }
@@ -183,6 +228,14 @@ impl Arbiter {
183228 self . name . as_ref ( )
184229 }
185230
231+ #[ inline]
232+ /// Handle to a runtime
233+ pub fn handle ( & self ) -> & Handle {
234+ self . hnd . as_ref ( ) . unwrap ( )
235+ }
236+
237+ #[ doc( hidden) ]
238+ #[ deprecated( since = "3.8.0" , note = "use `ntex_rt::spawn()`" ) ]
186239 /// Send a future to the Arbiter's thread, and spawn it.
187240 pub fn spawn < F > ( & self , future : F )
188241 where
@@ -193,12 +246,13 @@ impl Arbiter {
193246 . try_send ( ArbiterCommand :: Execute ( Box :: pin ( future) ) ) ;
194247 }
195248
196- #[ rustfmt:: skip]
249+ #[ doc( hidden) ]
250+ #[ deprecated( since = "3.8.0" , note = "use `ntex_rt::Handle::spawn()`" ) ]
197251 /// Send a function to the Arbiter's thread and spawns it's resulting future.
198252 /// This can be used to spawn non-send futures on the arbiter thread.
199253 pub fn spawn_with < F , R , O > (
200254 & self ,
201- f : F
255+ f : F ,
202256 ) -> impl Future < Output = Result < O , oneshot:: RecvError > > + Send + ' static
203257 where
204258 F : FnOnce ( ) -> R + Send + ' static ,
@@ -216,11 +270,15 @@ impl Arbiter {
216270 rx
217271 }
218272
219- #[ rustfmt:: skip]
273+ #[ doc( hidden) ]
274+ #[ deprecated( since = "3.8.0" , note = "use `ntex_rt::Handle::spawn()`" ) ]
220275 /// Send a function to the Arbiter's thread. This function will be executed asynchronously.
221276 /// A future is created, and when resolved will contain the result of the function sent
222277 /// to the Arbiters thread.
223- pub fn exec < F , R > ( & self , f : F ) -> impl Future < Output = Result < R , oneshot:: RecvError > > + Send + ' static
278+ pub fn exec < F , R > (
279+ & self ,
280+ f : F ,
281+ ) -> impl Future < Output = Result < R , oneshot:: RecvError > > + Send + ' static
224282 where
225283 F : FnOnce ( ) -> R + Send + ' static ,
226284 R : Send + ' static ,
@@ -234,6 +292,8 @@ impl Arbiter {
234292 rx
235293 }
236294
295+ #[ doc( hidden) ]
296+ #[ deprecated( since = "3.8.0" , note = "use `ntex_rt::Handle::spawn()`" ) ]
237297 /// Send a function to the Arbiter's thread, and execute it. Any result from the function
238298 /// is discarded.
239299 pub fn exec_fn < F > ( & self , f : F )
@@ -247,17 +307,22 @@ impl Arbiter {
247307 } ) ) ) ;
248308 }
249309
310+ #[ doc( hidden) ]
311+ #[ deprecated( since = "3.8.0" , note = "use `ntex_rt::set_item()`" ) ]
250312 /// Set item to current arbiter's storage
251313 pub fn set_item < T : ' static > ( item : T ) {
252- STORAGE
253- . with ( move |cell| cell. borrow_mut ( ) . insert ( TypeId :: of :: < T > ( ) , Box :: new ( item) ) ) ;
314+ set_item ( item) ;
254315 }
255316
317+ #[ doc( hidden) ]
318+ #[ deprecated( since = "3.8.0" , note = "use `ntex_rt::get_item()`" ) ]
256319 /// Check if arbiter storage contains item
257320 pub fn contains_item < T : ' static > ( ) -> bool {
258321 STORAGE . with ( move |cell| cell. borrow ( ) . get ( & TypeId :: of :: < T > ( ) ) . is_some ( ) )
259322 }
260323
324+ #[ doc( hidden) ]
325+ #[ deprecated( since = "3.8.0" , note = "use `ntex_rt::get_item()`" ) ]
261326 /// Get a reference to a type previously inserted on this arbiter's storage
262327 ///
263328 /// # Panics
@@ -353,3 +418,22 @@ impl ArbiterController {
353418 }
354419 }
355420}
421+
422+ /// Set item to current runtime's storage
423+ pub fn set_item < T : ' static > ( item : T ) {
424+ STORAGE . with ( move |cell| cell. borrow_mut ( ) . insert ( TypeId :: of :: < T > ( ) , Box :: new ( item) ) ) ;
425+ }
426+
427+ /// Get a reference to a type previously inserted on this runtime's storage
428+ pub fn get_item < T : ' static , F , R > ( f : F ) -> R
429+ where
430+ F : FnOnce ( Option < & mut T > ) -> R ,
431+ {
432+ STORAGE . with ( move |cell| {
433+ let mut st = cell. borrow_mut ( ) ;
434+ let item = st
435+ . get_mut ( & TypeId :: of :: < T > ( ) )
436+ . and_then ( |boxed| boxed. downcast_mut ( ) ) ;
437+ f ( item)
438+ } )
439+ }
0 commit comments