Skip to content

Commit 338aeab

Browse files
[runtime] Add spawn_blocking_ref to Spawner trait (#957)
1 parent 0bb0e56 commit 338aeab

3 files changed

Lines changed: 159 additions & 6 deletions

File tree

runtime/src/deterministic.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -814,6 +814,45 @@ impl crate::Spawner for Context {
814814
handle
815815
}
816816

817+
fn spawn_blocking_ref<F, T>(&mut self, dedicated: bool) -> impl FnOnce(F) -> Handle<T> + 'static
818+
where
819+
F: FnOnce() -> T + Send + 'static,
820+
T: Send + 'static,
821+
{
822+
// Ensure a context only spawns one task
823+
assert!(!self.spawned, "already spawned");
824+
self.spawned = true;
825+
826+
// Get metrics
827+
let label = if dedicated {
828+
Label::blocking_dedicated(self.name.clone())
829+
} else {
830+
Label::blocking_shared(self.name.clone())
831+
};
832+
self.executor
833+
.metrics
834+
.tasks_spawned
835+
.get_or_create(&label)
836+
.inc();
837+
let gauge = self
838+
.executor
839+
.metrics
840+
.tasks_running
841+
.get_or_create(&label)
842+
.clone();
843+
844+
// Set up the task
845+
let executor = self.executor.clone();
846+
move |f: F| {
847+
let (f, handle) = Handle::init_blocking(f, gauge, false);
848+
849+
// Spawn the task
850+
let f = async move { f() };
851+
Tasks::register_work(&executor.tasks, label, Box::pin(f));
852+
handle
853+
}
854+
}
855+
817856
fn stop(&self, value: i32) {
818857
self.executor.auditor.event(b"stop", |hasher| {
819858
hasher.update(value.to_be_bytes());

runtime/src/lib.rs

Lines changed: 77 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -123,11 +123,7 @@ pub trait Spawner: Clone + Send + Sync + 'static {
123123

124124
/// Enqueue a task to be executed (without consuming the context).
125125
///
126-
/// Unlike a future, a spawned task will start executing immediately (even if the caller
127-
/// does not await the handle).
128-
///
129-
/// In some cases, it may be useful to spawn a task without consuming the context (e.g. starting
130-
/// an actor that already has a reference to context).
126+
/// The semantics are the same as [Spawner::spawn].
131127
///
132128
/// # Warning
133129
///
@@ -161,6 +157,22 @@ pub trait Spawner: Clone + Send + Sync + 'static {
161157
F: FnOnce(Self) -> T + Send + 'static,
162158
T: Send + 'static;
163159

160+
/// Enqueue a blocking task to be executed (without consuming the context).
161+
///
162+
/// The semantics are the same as [Spawner::spawn_blocking].
163+
///
164+
/// # Warning
165+
///
166+
/// If this function is used to spawn multiple tasks from the same context,
167+
/// the runtime will panic to prevent accidental misuse.
168+
fn spawn_blocking_ref<F, T>(
169+
&mut self,
170+
dedicated: bool,
171+
) -> impl FnOnce(F) -> Handle<T> + 'static
172+
where
173+
F: FnOnce() -> T + Send + 'static,
174+
T: Send + 'static;
175+
164176
/// Signals the runtime to stop execution and that all outstanding tasks
165177
/// should perform any required cleanup and exit. This method is idempotent and
166178
/// can be called multiple times.
@@ -933,6 +945,32 @@ mod tests {
933945
});
934946
}
935947

948+
fn test_spawn_blocking_ref<R: Runner>(runner: R, dedicated: bool)
949+
where
950+
R::Context: Spawner,
951+
{
952+
runner.start(|mut context| async move {
953+
let spawn = context.spawn_blocking_ref(dedicated);
954+
let handle = spawn(|| 42);
955+
let result = handle.await;
956+
assert!(matches!(result, Ok(42)));
957+
});
958+
}
959+
960+
fn test_spawn_blocking_ref_duplicate<R: Runner>(runner: R, dedicated: bool)
961+
where
962+
R::Context: Spawner,
963+
{
964+
runner.start(|mut context| async move {
965+
let spawn = context.spawn_blocking_ref(dedicated);
966+
let result = spawn(|| 42).await;
967+
assert!(matches!(result, Ok(42)));
968+
969+
// Ensure context is consumed
970+
context.spawn_blocking(dedicated, |_| 42);
971+
});
972+
}
973+
936974
fn test_spawn_blocking_abort<R: Runner>(runner: R, dedicated: bool)
937975
where
938976
R::Context: Spawner,
@@ -1157,6 +1195,23 @@ mod tests {
11571195
}
11581196
}
11591197

1198+
#[test]
1199+
fn test_deterministic_spawn_blocking_ref() {
1200+
for dedicated in [false, true] {
1201+
let executor = deterministic::Runner::default();
1202+
test_spawn_blocking_ref(executor, dedicated);
1203+
}
1204+
}
1205+
1206+
#[test]
1207+
#[should_panic]
1208+
fn test_deterministic_spawn_blocking_ref_duplicate() {
1209+
for dedicated in [false, true] {
1210+
let executor = deterministic::Runner::default();
1211+
test_spawn_blocking_ref_duplicate(executor, dedicated);
1212+
}
1213+
}
1214+
11601215
#[test]
11611216
fn test_deterministic_metrics() {
11621217
let executor = deterministic::Runner::default();
@@ -1311,6 +1366,23 @@ mod tests {
13111366
}
13121367
}
13131368

1369+
#[test]
1370+
fn test_tokio_spawn_blocking_ref() {
1371+
for dedicated in [false, true] {
1372+
let executor = tokio::Runner::default();
1373+
test_spawn_blocking_ref(executor, dedicated);
1374+
}
1375+
}
1376+
1377+
#[test]
1378+
#[should_panic]
1379+
fn test_tokio_spawn_blocking_ref_duplicate() {
1380+
for dedicated in [false, true] {
1381+
let executor = tokio::Runner::default();
1382+
test_spawn_blocking_ref_duplicate(executor, dedicated);
1383+
}
1384+
}
1385+
13141386
#[test]
13151387
fn test_tokio_metrics() {
13161388
let executor = tokio::Runner::default();

runtime/src/tokio/runtime.rs

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -429,7 +429,7 @@ impl crate::Spawner for Context {
429429
.get_or_create(&label)
430430
.clone();
431431

432-
// Initialize the blocking task using the new function
432+
// Set up the task
433433
let executor = self.executor.clone();
434434
let (f, handle) = Handle::init_blocking(|| f(self), gauge, executor.cfg.catch_panics);
435435

@@ -442,6 +442,48 @@ impl crate::Spawner for Context {
442442
handle
443443
}
444444

445+
fn spawn_blocking_ref<F, T>(&mut self, dedicated: bool) -> impl FnOnce(F) -> Handle<T> + 'static
446+
where
447+
F: FnOnce() -> T + Send + 'static,
448+
T: Send + 'static,
449+
{
450+
// Ensure a context only spawns one task
451+
assert!(!self.spawned, "already spawned");
452+
self.spawned = true;
453+
454+
// Get metrics
455+
let label = if dedicated {
456+
Label::blocking_dedicated(self.name.clone())
457+
} else {
458+
Label::blocking_shared(self.name.clone())
459+
};
460+
self.executor
461+
.metrics
462+
.tasks_spawned
463+
.get_or_create(&label)
464+
.inc();
465+
let gauge = self
466+
.executor
467+
.metrics
468+
.tasks_running
469+
.get_or_create(&label)
470+
.clone();
471+
472+
// Set up the task
473+
let executor = self.executor.clone();
474+
move |f: F| {
475+
let (f, handle) = Handle::init_blocking(f, gauge, executor.cfg.catch_panics);
476+
477+
// Spawn the blocking task
478+
if dedicated {
479+
std::thread::spawn(f);
480+
} else {
481+
executor.runtime.spawn_blocking(f);
482+
}
483+
handle
484+
}
485+
}
486+
445487
fn stop(&self, value: i32) {
446488
self.executor.signaler.lock().unwrap().signal(value);
447489
}

0 commit comments

Comments
 (0)