Skip to content

Commit 24319bf

Browse files
committed
Deprecate Arbiter's spawn related helpers
1 parent 17eb19c commit 24319bf

File tree

8 files changed

+141
-93
lines changed

8 files changed

+141
-93
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ ntex-dispatcher = "3.1.0"
5959
ntex-net = "3.7.0"
6060
ntex-http = "1.0.0"
6161
ntex-router = "1.0.0"
62-
ntex-rt = "3.7.0"
62+
ntex-rt = "3.8.0"
6363
ntex-server = "3.8.0"
6464
ntex-service = "4.5.0"
6565
ntex-tls = "3.3.0"

ntex-rt/CHANGES.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
# Changes
22

3+
## [3.8.0] - 2026-02-16
4+
5+
* Export `get_item()` and `set_item()` methods
6+
7+
* Deprecate Arbiter's spawn related helpers
8+
39
## [3.7.1] - 2026-02-13
410

511
* Export JoinHandle for neon runtime

ntex-rt/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "ntex-rt"
3-
version = "3.7.1"
3+
version = "3.8.0"
44
authors = ["ntex contributors <team@ntex.rs>"]
55
description = "ntex runtime"
66
keywords = ["network", "framework", "async", "futures"]

ntex-rt/src/arbiter.rs

Lines changed: 57 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use std::{cell::RefCell, collections::HashMap, fmt, future::Future, pin::Pin, th
55

66
use async_channel::{Receiver, Sender, unbounded};
77

8+
use crate::Handle;
89
use crate::system::{FnExec, Id, System, SystemCommand};
910

1011
thread_local!(
@@ -29,6 +30,7 @@ pub struct Arbiter {
2930
id: usize,
3031
pub(crate) sys_id: usize,
3132
name: Arc<String>,
33+
hnd: Handle,
3234
sender: Sender<ArbiterCommand>,
3335
thread_handle: Option<thread::JoinHandle<()>>,
3436
}
@@ -108,19 +110,23 @@ impl Arbiter {
108110

109111
let name = name2.clone();
110112
let sys_id = sys.id();
113+
let (arb_hnd_tx, arb_hnd_rx) = oneshot::channel();
111114

112115
let handle = builder
113116
.spawn(move || {
114117
log::info!("Starting {name2:?} arbiter");
115118

116-
let arb = Arbiter::with_sender(sys_id.0, id, name2, arb_tx);
117-
118119
let (stop, stop_rx) = oneshot::channel();
119120
STORAGE.with(|cell| cell.borrow_mut().clear());
120121

121122
System::set_current(sys);
122123

123-
config.block_on(async move {
124+
crate::driver::block_on(config.runner.as_ref(), async move {
125+
let arb = Arbiter::with_sender(sys_id.0, id, name2, arb_tx);
126+
arb_hnd_tx
127+
.send(arb.hnd.clone())
128+
.expect("Controller thread has gone");
129+
124130
// start arbiter controller
125131
crate::spawn(
126132
ArbiterController {
@@ -149,8 +155,11 @@ impl Arbiter {
149155
panic!("Cannot spawn an arbiter's thread {:?}: {:?}", &name, err)
150156
});
151157

158+
let hnd = arb_hnd_rx.recv().expect("Could not start new arbiter");
159+
152160
Arbiter {
153161
id,
162+
hnd,
154163
name,
155164
sys_id: sys_id.0,
156165
sender: arb_tx2,
@@ -169,6 +178,7 @@ impl Arbiter {
169178
sys_id,
170179
name,
171180
sender,
181+
hnd: Handle::current(),
172182
thread_handle: None,
173183
}
174184
}
@@ -183,6 +193,13 @@ impl Arbiter {
183193
self.name.as_ref()
184194
}
185195

196+
/// Handle to a runtime
197+
pub fn handle(&self) -> &Handle {
198+
&self.hnd
199+
}
200+
201+
#[doc(hidden)]
202+
#[deprecated(since = "3.8.0", note = "use `ntex_rt::spawn()`")]
186203
/// Send a future to the Arbiter's thread, and spawn it.
187204
pub fn spawn<F>(&self, future: F)
188205
where
@@ -193,12 +210,13 @@ impl Arbiter {
193210
.try_send(ArbiterCommand::Execute(Box::pin(future)));
194211
}
195212

196-
#[rustfmt::skip]
213+
#[doc(hidden)]
214+
#[deprecated(since = "3.8.0", note = "use `ntex_rt::Handle::spawn()`")]
197215
/// Send a function to the Arbiter's thread and spawns it's resulting future.
198216
/// This can be used to spawn non-send futures on the arbiter thread.
199217
pub fn spawn_with<F, R, O>(
200218
&self,
201-
f: F
219+
f: F,
202220
) -> impl Future<Output = Result<O, oneshot::RecvError>> + Send + 'static
203221
where
204222
F: FnOnce() -> R + Send + 'static,
@@ -216,11 +234,15 @@ impl Arbiter {
216234
rx
217235
}
218236

219-
#[rustfmt::skip]
237+
#[doc(hidden)]
238+
#[deprecated(since = "3.8.0", note = "use `ntex_rt::Handle::spawn()`")]
220239
/// Send a function to the Arbiter's thread. This function will be executed asynchronously.
221240
/// A future is created, and when resolved will contain the result of the function sent
222241
/// to the Arbiters thread.
223-
pub fn exec<F, R>(&self, f: F) -> impl Future<Output = Result<R, oneshot::RecvError>> + Send + 'static
242+
pub fn exec<F, R>(
243+
&self,
244+
f: F,
245+
) -> impl Future<Output = Result<R, oneshot::RecvError>> + Send + 'static
224246
where
225247
F: FnOnce() -> R + Send + 'static,
226248
R: Send + 'static,
@@ -234,6 +256,8 @@ impl Arbiter {
234256
rx
235257
}
236258

259+
#[doc(hidden)]
260+
#[deprecated(since = "3.8.0", note = "use `ntex_rt::Handle::spawn()`")]
237261
/// Send a function to the Arbiter's thread, and execute it. Any result from the function
238262
/// is discarded.
239263
pub fn exec_fn<F>(&self, f: F)
@@ -247,17 +271,22 @@ impl Arbiter {
247271
})));
248272
}
249273

274+
#[doc(hidden)]
275+
#[deprecated(since = "3.8.0", note = "use `ntex_rt::set_item()`")]
250276
/// Set item to current arbiter's storage
251277
pub fn set_item<T: 'static>(item: T) {
252-
STORAGE
253-
.with(move |cell| cell.borrow_mut().insert(TypeId::of::<T>(), Box::new(item)));
278+
set_item(item);
254279
}
255280

281+
#[doc(hidden)]
282+
#[deprecated(since = "3.8.0", note = "use `ntex_rt::get_item()`")]
256283
/// Check if arbiter storage contains item
257284
pub fn contains_item<T: 'static>() -> bool {
258285
STORAGE.with(move |cell| cell.borrow().get(&TypeId::of::<T>()).is_some())
259286
}
260287

288+
#[doc(hidden)]
289+
#[deprecated(since = "3.8.0", note = "use `ntex_rt::get_item()`")]
261290
/// Get a reference to a type previously inserted on this arbiter's storage
262291
///
263292
/// # Panics
@@ -353,3 +382,22 @@ impl ArbiterController {
353382
}
354383
}
355384
}
385+
386+
/// Set item to current runtime's storage
387+
pub fn set_item<T: 'static>(item: T) {
388+
STORAGE.with(move |cell| cell.borrow_mut().insert(TypeId::of::<T>(), Box::new(item)));
389+
}
390+
391+
/// Get a reference to a type previously inserted on this runtime's storage
392+
pub fn get_item<T: 'static, F, R>(f: F) -> R
393+
where
394+
F: FnOnce(Option<&mut T>) -> R,
395+
{
396+
STORAGE.with(move |cell| {
397+
let mut st = cell.borrow_mut();
398+
let item = st
399+
.get_mut(&TypeId::of::<T>())
400+
.and_then(|boxed| boxed.downcast_mut());
401+
f(item)
402+
})
403+
}

ntex-rt/src/builder.rs

Lines changed: 50 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -108,43 +108,26 @@ impl Builder {
108108
///
109109
/// This method panics if it can not create tokio runtime
110110
pub fn build<R: Runner>(self, runner: R) -> SystemRunner {
111-
let name = self.name.clone();
112-
let testing = self.testing;
113-
let stack_size = self.stack_size;
114-
let stop_on_panic = self.stop_on_panic;
115-
116-
self.build_with(SystemConfig {
117-
name,
118-
testing,
119-
stack_size,
120-
stop_on_panic,
111+
let config = SystemConfig {
112+
name: self.name.clone(),
113+
testing: self.testing,
114+
stack_size: self.stack_size,
115+
stop_on_panic: self.stop_on_panic,
116+
ping_interval: self.ping_interval,
117+
pool_limit: self.pool_limit,
118+
pool_recv_timeout: self.pool_recv_timeout,
121119
runner: Arc::new(runner),
122-
})
120+
};
121+
self.build_with(config)
123122
}
124123

125124
/// Create new System.
126125
///
127126
/// This method panics if it can not create tokio runtime
128127
pub fn build_with(self, config: SystemConfig) -> SystemRunner {
129-
let (stop_tx, stop) = oneshot::channel();
130-
let (sys_sender, sys_receiver) = unbounded();
131-
132-
// thread pool
133-
let pool = ThreadPool::new(&self.name, self.pool_limit, self.pool_recv_timeout);
134-
135-
let (arb, controller) = Arbiter::new_system(self.name.clone());
136-
let _ = sys_sender.try_send(SystemCommand::RegisterArbiter(arb.id(), arb.clone()));
137-
let system = System::construct(sys_sender, arb, config.clone(), pool);
138-
139-
// system arbiter
140-
let support = SystemSupport::new(stop_tx, sys_receiver, self.ping_interval);
141-
142128
// init system arbiter and run configuration method
143129
SystemRunner {
144-
stop,
145-
support,
146-
controller,
147-
system,
130+
runner: config.runner.clone(),
148131
config,
149132
_t: PhantomData,
150133
}
@@ -154,20 +137,12 @@ impl Builder {
154137
/// Helper object that runs System's event loop
155138
#[must_use = "SystemRunner must be run"]
156139
pub struct SystemRunner {
157-
stop: oneshot::Receiver<i32>,
158-
support: SystemSupport,
159-
controller: ArbiterController,
160-
system: System,
161140
config: SystemConfig,
141+
runner: Arc<dyn Runner>,
162142
_t: PhantomData<Rc<()>>,
163143
}
164144

165145
impl SystemRunner {
166-
/// Get current system.
167-
pub fn system(&self) -> System {
168-
self.system.clone()
169-
}
170-
171146
/// This function will start event loop and will finish once the
172147
/// `System::stop()` function is called.
173148
pub fn run_until_stop(self) -> io::Result<()> {
@@ -182,16 +157,12 @@ impl SystemRunner {
182157
{
183158
log::info!("Starting {:?} system", self.config.name);
184159

185-
let SystemRunner {
186-
controller,
187-
stop,
188-
support,
189-
config,
190-
..
191-
} = self;
160+
let SystemRunner { config, runner, .. } = self;
192161

193162
// run loop
194-
config.block_on(async move {
163+
crate::driver::block_on(runner.as_ref(), async move {
164+
let (_, support, controller, stop) = create(&config);
165+
195166
f()?;
196167

197168
crate::spawn(support.run());
@@ -215,14 +186,11 @@ impl SystemRunner {
215186
F: Future<Output = R> + 'static,
216187
R: 'static,
217188
{
218-
let SystemRunner {
219-
controller,
220-
support,
221-
config,
222-
..
223-
} = self;
189+
let SystemRunner { config, runner, .. } = self;
190+
191+
crate::driver::block_on(runner.as_ref(), async move {
192+
let (_, support, controller, _) = create(&config);
224193

225-
config.block_on(async move {
226194
crate::spawn(support.run());
227195
crate::spawn(controller.run());
228196
fut.await
@@ -236,15 +204,13 @@ impl SystemRunner {
236204
F: Future<Output = R> + 'static,
237205
R: 'static,
238206
{
239-
let SystemRunner {
240-
controller,
241-
support,
242-
..
243-
} = self;
207+
let SystemRunner { config, .. } = self;
244208

245209
// run loop
246210
tok_io::task::LocalSet::new()
247211
.run_until(async move {
212+
let (sys, support, controller, _) = create(&config);
213+
248214
crate::spawn(support.run());
249215
crate::spawn(controller.run());
250216
fut.await
@@ -256,9 +222,34 @@ impl SystemRunner {
256222
impl fmt::Debug for SystemRunner {
257223
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
258224
f.debug_struct("SystemRunner")
259-
.field("system", &self.system)
260-
.field("support", &self.support)
261225
.field("config", &self.config)
262226
.finish()
263227
}
264228
}
229+
230+
/// Create new System.
231+
///
232+
/// This method panics if it can not create tokio runtime
233+
fn create(
234+
config: &SystemConfig,
235+
) -> (
236+
System,
237+
SystemSupport,
238+
ArbiterController,
239+
oneshot::Receiver<i32>,
240+
) {
241+
let (stop_tx, stop) = oneshot::channel();
242+
let (sys_sender, sys_receiver) = unbounded();
243+
244+
// thread pool
245+
let pool = ThreadPool::new(&config.name, config.pool_limit, config.pool_recv_timeout);
246+
247+
let (arb, controller) = Arbiter::new_system(config.name.clone());
248+
let _ = sys_sender.try_send(SystemCommand::RegisterArbiter(arb.id(), arb.clone()));
249+
let system = System::construct(sys_sender, arb, config.clone(), pool);
250+
251+
// system arbiter
252+
let support = SystemSupport::new(stop_tx, sys_receiver, config.ping_interval);
253+
254+
(system, support, controller, stop)
255+
}

ntex-rt/src/driver.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
//! The platform-specified driver.
2-
use std::{fmt, io, pin::Pin};
2+
use std::{cell::RefCell, fmt, io, pin::Pin, rc::Rc};
33

44
use crate::rt::Runtime;
55

@@ -102,3 +102,20 @@ macro_rules! syscall {
102102
}
103103
}};
104104
}
105+
106+
/// Execute a future with custom `block_on` method and wait for result
107+
pub(super) fn block_on<F, R>(run: &dyn Runner, fut: F) -> R
108+
where
109+
F: Future<Output = R> + 'static,
110+
R: 'static,
111+
{
112+
// run loop
113+
let result = Rc::new(RefCell::new(None));
114+
let result_inner = result.clone();
115+
116+
run.block_on(Box::pin(async move {
117+
let r = fut.await;
118+
*result_inner.borrow_mut() = Some(r);
119+
}));
120+
result.borrow_mut().take().unwrap()
121+
}

0 commit comments

Comments
 (0)