diff --git a/Cargo.lock b/Cargo.lock index 09055a8dddd..080696a9666 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -141,6 +141,12 @@ dependencies = [ "zeroize", ] +[[package]] +name = "async-task" +version = "4.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de" + [[package]] name = "async-trait" version = "0.1.89" @@ -1580,6 +1586,7 @@ name = "commonware-runtime" version = "2026.4.0" dependencies = [ "arbitrary", + "async-task", "axum", "bytes", "cfg-if", diff --git a/Cargo.toml b/Cargo.toml index caecc1ff288..b33ab09e59e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -74,6 +74,7 @@ suspicious_op_assign_impl = "allow" [workspace.dependencies] anyhow = { version = "1.0.99", default-features = false } arbitrary = "1.4.1" +async-task = "4.7.1" aws-config = "1.8.12" aws-lc-rs = "1.15.2" aws-sdk-ec2 = "1.200.0" diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index c583c4ea422..e6656f53f3c 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -15,6 +15,7 @@ workspace = true [dependencies] arbitrary = { workspace = true, optional = true } +async-task.workspace = true bytes.workspace = true cfg-if.workspace = true commonware-codec.workspace = true diff --git a/runtime/src/deterministic.rs b/runtime/src/deterministic.rs index 20bad024e8d..39f5a3d04bb 100644 --- a/runtime/src/deterministic.rs +++ b/runtime/src/deterministic.rs @@ -885,6 +885,18 @@ impl Tasks { type Network = MeteredNetwork>; type Storage = MeteredStorage>>; +/// Marks a dedicated subtree as closed before the parent task's result is made +/// observable through its handle. +struct DedicatedGuard { + tree: Arc, +} + +impl Drop for DedicatedGuard { + fn drop(&mut self) { + self.tree.abort(); + } +} + /// Implementation of [crate::Spawner], [crate::Clock], /// [crate::Network], and [crate::Storage] for the `deterministic` /// runtime. @@ -900,6 +912,7 @@ pub struct Context { tree: Arc, execution: Execution, traced: bool, + dedicated: Option>, } impl Clone for Context { @@ -918,6 +931,7 @@ impl Clone for Context { tree: child, execution: Execution::default(), traced: false, + dedicated: self.dedicated.clone(), } } } @@ -999,6 +1013,7 @@ impl Context { tree: Tree::root(), execution: Execution::default(), traced: false, + dedicated: None, }, executor, panicked, @@ -1071,6 +1086,7 @@ impl Context { tree: Tree::root(), execution: Execution::default(), traced: false, + dedicated: None, }, executor, panicked, @@ -1138,6 +1154,11 @@ impl crate::Spawner for Context { self } + fn colocated(mut self) -> Self { + self.execution = Execution::Colocated; + self + } + fn shared(mut self, blocking: bool) -> Self { self.execution = Execution::Shared(blocking); self @@ -1154,14 +1175,38 @@ impl crate::Spawner for Context { // Track supervision before resetting configuration let parent = Arc::clone(&self.tree); + let past = self.execution; let traced = self.traced; + let inherited_dedicated = self.dedicated.clone(); self.execution = Execution::default(); self.traced = false; + if matches!(past, Execution::Colocated) { + let dedicated = inherited_dedicated + .as_ref() + .expect("`colocated()` requires a running dedicated ancestor"); + assert!( + !dedicated.is_aborted(), + "`colocated()` requires a running dedicated ancestor" + ); + } let (child, aborted) = Tree::child(&parent); if aborted { return Handle::closed(metric); } + let child_dedicated = match past { + // Dedicated creates a new execution domain for the spawned child. + // In the deterministic runtime, the child context node itself is + // sufficient to model that domain because the supervision tree + // already tracks whether the dedicated root task is still alive. + Execution::Dedicated => Some(child.clone()), + // Colocated reuses the nearest dedicated ancestor, which must + // already have been validated before creating the child node. + Execution::Colocated => inherited_dedicated, + // Shared clears the dedicated branch assignment. + Execution::Shared(_) => None, + }; self.tree = child; + self.dedicated = child_dedicated.clone(); // Spawn the task (we don't care about Model) let executor = self.executor(); @@ -1174,6 +1219,15 @@ impl crate::Spawner for Context { } else { Either::Right(f(self)) }; + let future = if matches!(past, Execution::Dedicated) { + let dedicated = child_dedicated.expect("dedicated tree missing"); + Either::Left(async move { + let _guard = DedicatedGuard { tree: dedicated }; + future.await + }) + } else { + Either::Right(future) + }; let (f, handle) = Handle::init( future, metric, diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index 0294900de7f..3c13c86eae3 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -179,6 +179,18 @@ stability_scope!(BETA { /// This is not the default behavior. See [`Spawner::shared`] for more information. fn dedicated(self) -> Self; + /// Return a [`Spawner`] that co-locates the next spawned task on the same thread + /// as the closest dedicated ancestor already assigned to this context lineage. + /// + /// [`Spawner::shared`] clears that assignment for descendants, and + /// [`Spawner::dedicated`] creates a new one for the spawned child. + /// + /// This configuration only applies to the next [`Spawner::spawn`] call. + /// It does not refer to a future dedicated child that has not been + /// spawned yet. If no running dedicated ancestor is assigned when + /// [`Spawner::spawn`] is called, the runtime panics. + fn colocated(self) -> Self; + /// Spawn a task with the current context. /// /// Unlike directly awaiting a future, the task starts running immediately even if the caller @@ -207,8 +219,19 @@ stability_scope!(BETA { /// /// # Spawn Configuration /// - /// When a context is cloned (either via [`Clone::clone`] or [`Metrics::with_label`]) or provided via - /// [`Spawner::spawn`], any configuration made via [`Spawner::dedicated`] or [`Spawner::shared`] is reset. + /// Context lineage and spawn configuration are resolved at different times: + /// + /// - [`Clone::clone`] and [`Metrics::with_label`] eagerly create a new child context node and + /// preserve any execution domain that is already assigned to the lineage. + /// - [`Spawner::dedicated`], [`Spawner::shared`], and [`Spawner::colocated`] only label the next + /// spawn edge. + /// - [`Spawner::spawn`] materializes the child task, creates the child context passed to the + /// closure, and resolves that pending execution label into the child's assigned execution domain. + /// + /// Because of that, cloning a context or providing one via [`Spawner::spawn`] preserves the + /// resolved execution domain already attached to the lineage, but resets any pending + /// configuration made via [`Spawner::dedicated`], [`Spawner::shared`], or + /// [`Spawner::colocated`]. /// /// Child tasks should assume they start from a clean configuration without needing to inspect how their /// parent was configured. @@ -882,6 +905,7 @@ mod tests { use std::{ collections::HashMap, net::{IpAddr, Ipv4Addr, Ipv6Addr}, + panic::{catch_unwind, AssertUnwindSafe}, pin::Pin, str::FromStr, sync::{ @@ -1729,6 +1753,125 @@ mod tests { }); } + fn test_spawn_colocated(runner: R) + where + R::Context: Spawner, + { + runner.start(|context| async move { + // A dedicated parent assigns a dedicated execution domain to its + // child context, so a colocated child should resolve successfully. + let handle = context.dedicated().spawn(|context| async move { + let handle = context.colocated().spawn(|_| async move { 42 }); + handle.await.unwrap() + }); + assert!(matches!(handle.await, Ok(42))); + }); + } + + fn test_spawn_colocated_nested(runner: R) + where + R::Context: Spawner, + { + runner.start(|context| async move { + // Colocation is inherited through further colocated descendants as + // long as the lineage is not reset by `shared()` or `dedicated()`. + let handle = context.dedicated().spawn(|context| async move { + let handle = context.colocated().spawn(|context| async move { + let handle = context.colocated().spawn(|_| async move { 7 }); + handle.await.unwrap() + }); + handle.await.unwrap() + }); + assert!(matches!(handle.await, Ok(7))); + }); + } + + fn test_spawn_colocated_without_dedicated_ancestor_panics(runner: R) + where + R::Context: Spawner, + { + runner.start(|context| async move { + // A root context has no assigned dedicated execution domain, so + // `colocated()` must reject the spawn immediately. + let panic = catch_unwind(AssertUnwindSafe(|| { + drop(context.colocated().spawn(|_| async move { 99 })); + })); + assert!(panic.is_err()); + }); + } + + fn test_spawn_colocated_shared_breaks_assignment(runner: R) + where + R::Context: Spawner, + { + runner.start(|context| async move { + // `shared()` clears the dedicated assignment for descendants, so a + // later colocated spawn from that branch must panic instead of + // walking back to an older dedicated ancestor. + let handle = context.dedicated().spawn(|context| async move { + let panic = context + .shared(false) + .spawn(|context| async move { + catch_unwind(AssertUnwindSafe(|| { + drop(context.colocated().spawn(|_| async move { 1 })); + })) + .is_err() + }) + .await + .unwrap(); + assert!(panic); + }); + assert!(handle.await.is_ok()); + }); + } + + fn test_spawn_colocated_abort_on_parent_completion(runner: R) + where + R::Context: Spawner, + { + runner.start(|context| async move { + let child_handle = Arc::new(Mutex::new(None)); + let child_handle2 = child_handle.clone(); + + // Dedicated parent spawns a colocated child that hangs forever + let parent_handle = context.dedicated().spawn(move |context| async move { + let handle = context.colocated().spawn(|_| pending::<()>()); + *child_handle2.lock() = Some(handle); + }); + + // Parent completes immediately, colocated child should be aborted + assert!(parent_handle.await.is_ok()); + let child_handle = child_handle.lock().take().unwrap(); + assert!(matches!(child_handle.await, Err(Error::Closed))); + }); + } + + fn test_spawn_colocated_stale_clone_after_ancestor_exit_panics(runner: R) + where + R::Context: Spawner, + { + runner.start(|context| async move { + let stale = Arc::new(Mutex::new(None)); + let stale2 = stale.clone(); + + // Clone a context while the dedicated ancestor is still running so + // the clone inherits that assignment. + let handle = context.dedicated().spawn(move |context| async move { + *stale2.lock() = Some(context.clone()); + }); + assert!(handle.await.is_ok()); + + // Once the dedicated ancestor exits, the inherited assignment is + // stale and colocated spawns must panic instead of silently + // reviving the old dedicated branch. + let stale = stale.lock().take().unwrap(); + let panic = catch_unwind(AssertUnwindSafe(|| { + drop(stale.colocated().spawn(|_| async move { 5 })); + })); + assert!(panic.is_err()); + }); + } + fn test_spawn(runner: R) where R::Context: Spawner + Clock, @@ -3544,6 +3687,42 @@ mod tests { test_spawn_dedicated(executor); } + #[test] + fn test_deterministic_spawn_colocated() { + let executor = deterministic::Runner::default(); + test_spawn_colocated(executor); + } + + #[test] + fn test_deterministic_spawn_colocated_nested() { + let executor = deterministic::Runner::default(); + test_spawn_colocated_nested(executor); + } + + #[test] + fn test_deterministic_spawn_colocated_without_dedicated_ancestor_panics() { + let executor = deterministic::Runner::default(); + test_spawn_colocated_without_dedicated_ancestor_panics(executor); + } + + #[test] + fn test_deterministic_spawn_colocated_shared_breaks_assignment() { + let executor = deterministic::Runner::default(); + test_spawn_colocated_shared_breaks_assignment(executor); + } + + #[test] + fn test_deterministic_spawn_colocated_abort_on_parent_completion() { + let executor = deterministic::Runner::default(); + test_spawn_colocated_abort_on_parent_completion(executor); + } + + #[test] + fn test_deterministic_spawn_colocated_stale_clone_after_ancestor_exit_panics() { + let executor = deterministic::Runner::default(); + test_spawn_colocated_stale_clone_after_ancestor_exit_panics(executor); + } + #[test] fn test_deterministic_spawn() { let runner = deterministic::Runner::default(); @@ -3893,6 +4072,232 @@ mod tests { test_spawn_dedicated(executor); } + #[test] + fn test_tokio_spawn_colocated() { + let executor = tokio::Runner::default(); + test_spawn_colocated(executor); + } + + #[test] + fn test_tokio_spawn_colocated_nested() { + let executor = tokio::Runner::default(); + test_spawn_colocated_nested(executor); + } + + #[test] + fn test_tokio_spawn_colocated_without_dedicated_ancestor_panics() { + let executor = tokio::Runner::default(); + test_spawn_colocated_without_dedicated_ancestor_panics(executor); + } + + #[test] + fn test_tokio_spawn_colocated_shared_breaks_assignment() { + let executor = tokio::Runner::default(); + test_spawn_colocated_shared_breaks_assignment(executor); + } + + #[test] + fn test_tokio_spawn_colocated_abort_on_parent_completion() { + let executor = tokio::Runner::default(); + test_spawn_colocated_abort_on_parent_completion(executor); + } + + #[test] + fn test_tokio_spawn_colocated_stale_clone_after_ancestor_exit_panics() { + let executor = tokio::Runner::default(); + test_spawn_colocated_stale_clone_after_ancestor_exit_panics(executor); + } + + #[test] + fn test_tokio_spawn_colocated_full_assignment_tree() { + // Exercise the full assignment tree: + // + // - root colocated panics + // - dedicated -> colocated stays on thread 1 + // - dedicated -> dedicated creates thread 2 + // - dedicated -> shared clears the assignment + // - shared -> dedicated creates thread 3 + // - shared -> colocated panics + let executor = tokio::Runner::default(); + executor.start(|context| async move { + // Without any dedicated ancestor in the lineage, the root cannot + // place a colocated child anywhere. + let root_panic = catch_unwind(AssertUnwindSafe(|| { + drop( + context + .clone() + .colocated() + .spawn(|_| async move { std::thread::current().id() }), + ); + })); + assert!(root_panic.is_err()); + + let handle = context.dedicated().spawn(|context| async move { + let thread1 = std::thread::current().id(); + + // A colocated child from the dedicated root stays on thread 1. + let thread1_colocated = context + .clone() + .colocated() + .spawn(|_| async move { std::thread::current().id() }) + .await + .unwrap(); + assert_eq!(thread1, thread1_colocated); + + // A nested dedicated child creates thread 2, and colocated + // descendants inside that branch stay on thread 2. + let (thread2, thread2_colocated, thread2_nested) = context + .clone() + .dedicated() + .spawn(|context| async move { + let thread2 = std::thread::current().id(); + let (thread2_colocated, thread2_nested) = context + .colocated() + .spawn(|context| async move { + let thread2_colocated = std::thread::current().id(); + let thread2_nested = context + .colocated() + .spawn(|_| async move { std::thread::current().id() }) + .await + .unwrap(); + (thread2_colocated, thread2_nested) + }) + .await + .unwrap(); + (thread2, thread2_colocated, thread2_nested) + }) + .await + .unwrap(); + assert_ne!(thread1, thread2); + assert_eq!(thread2, thread2_colocated); + assert_eq!(thread2, thread2_nested); + + // Returning to the original dedicated branch still targets + // thread 1. A shared hop then clears that assignment, so only a + // new dedicated child may reintroduce colocation. + let (thread1_again, shared_thread, thread3, thread3_colocated, shared_panic) = + context + .clone() + .colocated() + .spawn(|context| async move { + let thread1_again = std::thread::current().id(); + let (shared_thread, thread3, thread3_colocated, shared_panic) = context + .shared(false) + .spawn(|context| async move { + let shared_thread = std::thread::current().id(); + let (thread3, thread3_colocated) = context + .clone() + .dedicated() + .spawn(|context| async move { + let thread3 = std::thread::current().id(); + let thread3_colocated = context + .colocated() + .spawn( + |_| async move { std::thread::current().id() }, + ) + .await + .unwrap(); + (thread3, thread3_colocated) + }) + .await + .unwrap(); + + let shared_panic = catch_unwind(AssertUnwindSafe(|| { + drop(context.colocated().spawn(|_| async move { 0usize })); + })) + .is_err(); + + (shared_thread, thread3, thread3_colocated, shared_panic) + }) + .await + .unwrap(); + ( + thread1_again, + shared_thread, + thread3, + thread3_colocated, + shared_panic, + ) + }) + .await + .unwrap(); + assert_eq!(thread1, thread1_again); + assert_ne!(thread1, shared_thread); + assert_ne!(thread1, thread3); + assert_eq!(thread3, thread3_colocated); + assert!(shared_panic); + }); + handle.await.unwrap(); + }); + } + + #[test] + fn test_tokio_spawn_colocated_before_dedicated_child_starts_targets_new_thread() { + // The dedicated child context receives its new assignment before the + // closure returns, so a colocated spawn made during construction must + // already target that new dedicated thread. + let executor = tokio::Runner::default(); + executor.start(|context| async move { + let handle = context.dedicated().spawn(|context| async move { + let thread1 = std::thread::current().id(); + + let (thread2, thread2_colocated) = context + .dedicated() + .spawn(|context| { + // This call happens on thread 1, before the new + // dedicated child has started running. + let child = context + .colocated() + .spawn(|_| async move { std::thread::current().id() }); + + async move { + let thread2 = std::thread::current().id(); + let thread2_colocated = child.await.unwrap(); + (thread2, thread2_colocated) + } + }) + .await + .unwrap(); + + assert_ne!(thread1, thread2); + assert_eq!(thread2, thread2_colocated); + }); + handle.await.unwrap(); + }); + } + + #[test] + fn test_tokio_spawn_colocated_cloned_context_on_root_thread_targets_dedicated_ancestor() { + // Moving a context clone back to the root thread must still target the + // stored dedicated ancestor rather than whatever thread happens to call + // `spawn`. + let executor = tokio::Runner::default(); + executor.start(|context| async move { + let (clone_tx, clone_rx) = oneshot::channel(); + let (done_tx, done_rx) = oneshot::channel(); + + let handle = context.dedicated().spawn(move |context| async move { + let dedicated_thread = std::thread::current().id(); + + // Hand a clone back to the root thread while the dedicated + // ancestor remains alive so the clone can spawn against it. + assert!(clone_tx.send((context.clone(), dedicated_thread)).is_ok()); + done_rx.await.unwrap(); + }); + + let (clone, dedicated_thread) = clone_rx.await.unwrap(); + let child_thread = clone + .colocated() + .spawn(|_| async move { std::thread::current().id() }) + .await + .unwrap(); + assert_eq!(dedicated_thread, child_thread); + + done_tx.send(()).unwrap(); + handle.await.unwrap(); + }); + } + #[test] fn test_tokio_spawn() { let runner = tokio::Runner::default(); diff --git a/runtime/src/telemetry/metrics/task.rs b/runtime/src/telemetry/metrics/task.rs index fc0c7ab266e..649d5724cb8 100644 --- a/runtime/src/telemetry/metrics/task.rs +++ b/runtime/src/telemetry/metrics/task.rs @@ -9,7 +9,8 @@ pub struct Label { name: String, /// The type of task (root, async, or blocking). kind: Kind, - /// Whether the task runs on a dedicated thread or the shared runtime. + /// Whether the task runs on a shared executor, a dedicated thread, or is + /// colocated with a dedicated ancestor. execution: Execution, } @@ -30,6 +31,7 @@ impl Label { kind: Kind::Task, execution: match execution { crate::Execution::Dedicated => Execution::Dedicated, + crate::Execution::Colocated => Execution::Colocated, crate::Execution::Shared(blocking) => { if blocking { Execution::SharedBlocking @@ -65,4 +67,6 @@ pub enum Execution { SharedBlocking, /// Task runs on a dedicated thread. Dedicated, + /// Task is co-located on the same thread as its dedicated ancestor. + Colocated, } diff --git a/runtime/src/tokio/dedicated.rs b/runtime/src/tokio/dedicated.rs new file mode 100644 index 00000000000..b1faa0e9292 --- /dev/null +++ b/runtime/src/tokio/dedicated.rs @@ -0,0 +1,293 @@ +//! Dedicated single-thread executor used by Tokio `dedicated()` and +//! `colocated()` spawns. +//! +//! The public runtime API keeps [`crate::Spawner`] and [`crate::Metrics`] +//! contexts `Send + Sync`, which means a context carrying dedicated affinity may +//! be moved to another thread before it spawns a colocated child. Tokio's +//! [`tokio::task::LocalSet`] is not a sendable handle for that use case, so this +//! module provides a small dedicated executor abstraction instead: +//! +//! - each dedicated branch owns one OS thread; +//! - that thread enters the shared Tokio runtime once and polls all branch +//! tasks there; +//! - callers only hold a sendable [`DedicatedExecutor`] handle that can submit +//! more work onto the owner thread. +//! +//! `async-task` supplies the typed task allocation and wakeup machinery, while +//! this module supplies the dedicated thread, queueing, lifecycle tracking, and +//! "root task exit closes the branch" semantics required by `colocated()`. + +use crate::utils; +use async_task::Runnable; +use std::{ + future::Future, + sync::{ + atomic::{AtomicBool, AtomicUsize, Ordering}, + mpsc, Arc, + }, +}; + +/// Work submitted to a dedicated executor thread. +enum DedicatedWork { + /// Poll this task on the dedicated thread. + Runnable(Runnable), + /// Wake the executor loop so it can observe a state change, such as the + /// branch closing after its root exits. + Wake, +} + +/// Shared state for a dedicated executor thread. +struct DedicatedInner { + /// Queue used by remote submitters and task wakeups to hand runnable tasks + /// back to the owner thread. + tx: mpsc::Sender, + /// Whether the branch still accepts new colocated descendants. + running: AtomicBool, + /// Number of tasks that still need to finish before the thread may exit. + active: AtomicUsize, +} + +impl DedicatedInner { + /// Mark the dedicated executor as closed and wake its thread so it can + /// observe the state transition. + fn close(&self) { + if self.running.swap(false, Ordering::AcqRel) { + let _ = self.tx.send(DedicatedWork::Wake); + } + } +} + +/// A sendable handle for submitting work onto a dedicated executor thread. +#[derive(Clone)] +pub(crate) struct DedicatedExecutor { + inner: Arc, +} + +/// Decrements the active task count when a task leaves the dedicated executor. +struct ActiveGuard { + inner: Arc, +} + +impl Drop for ActiveGuard { + fn drop(&mut self) { + if self.inner.active.fetch_sub(1, Ordering::AcqRel) == 1 + && !self.inner.running.load(Ordering::Acquire) + { + let _ = self.inner.tx.send(DedicatedWork::Wake); + } + } +} + +/// Closes the dedicated executor once its root task exits. +struct RootGuard { + inner: Arc, +} + +impl Drop for RootGuard { + fn drop(&mut self) { + self.inner.close(); + } +} + +impl DedicatedExecutor { + /// Spawn a new dedicated executor thread that keeps Tokio's shared runtime + /// entered for the lifetime of the executor loop. + pub(crate) fn start(handle: tokio::runtime::Handle, stack_size: usize) -> Arc { + // The sender lives in the shared handle so callers can submit from any + // thread. The receiver is owned by the dedicated thread and is the only + // place tasks are ever polled. + let (tx, rx) = mpsc::channel(); + let executor = Arc::new(Self { + inner: Arc::new(DedicatedInner { + tx, + running: AtomicBool::new(true), + active: AtomicUsize::new(0), + }), + }); + let inner = executor.inner.clone(); + + utils::thread::spawn(stack_size, move || { + // Dedicated tasks still use Tokio-backed time, DNS, networking, + // storage, and blocking facilities. Enter the shared runtime once + // and then poll our own single-threaded task queue on top of it. + let _entered = handle.enter(); + loop { + // Once the branch is closed and all outstanding tasks have + // drained, the dedicated thread can exit. + if !inner.running.load(Ordering::Acquire) + && inner.active.load(Ordering::Acquire) == 0 + { + break; + } + + match rx.recv() { + Ok(DedicatedWork::Runnable(runnable)) => { + // `Runnable::run()` returns whether the task was woken + // while it was running. The scheduling policy here does + // not need that information, so we ignore it. + let _ = runnable.run(); + } + Ok(DedicatedWork::Wake) => { + // This is just a nudge to re-check `running` and + // `active`; there is no runnable payload to process. + } + Err(_) => break, + } + } + }); + + executor + } + + /// Returns whether this dedicated executor still accepts new colocated + /// children. + pub(crate) fn is_running(&self) -> bool { + self.inner.running.load(Ordering::Acquire) + } + + /// Spawn the root task for a dedicated branch. When it exits, the branch + /// stops accepting new colocated descendants. + pub(crate) fn spawn_root(&self, future: F) + where + F: Future + Send + 'static, + { + let inner = self.inner.clone(); + self.spawn_task(async move { + let _guard = RootGuard { inner }; + future.await; + }); + } + + /// Spawn a colocated descendant onto the dedicated thread. + pub(crate) fn spawn(&self, future: F) + where + F: Future + Send + 'static, + { + // Stale clones must fail at spawn time once the dedicated root has + // exited; silently dropping work or reviving the branch would violate + // the `colocated()` contract. + assert!( + self.is_running(), + "`colocated()` requires a running dedicated ancestor" + ); + self.spawn_task(future); + } + + /// Schedule a task on the dedicated executor using `async-task` so the + /// concrete future type stays inside the task allocation rather than being + /// erased behind `Box`. + fn spawn_task(&self, future: F) + where + F: Future + Send + 'static, + { + // Count the task before it is scheduled so the branch cannot shut down + // between submission and the first poll. + self.inner.active.fetch_add(1, Ordering::AcqRel); + + let task_inner = self.inner.clone(); + let schedule_inner = self.inner.clone(); + let schedule = move |runnable: Runnable| { + // Both the initial submission and all later wakeups return here, + // which keeps the dedicated thread as the only poller. + let _ = schedule_inner.tx.send(DedicatedWork::Runnable(runnable)); + }; + + let wrapped = async move { + // The guard releases the active task count when the future + // completes, is aborted, or unwinds through the task harness. + let _guard = ActiveGuard { inner: task_inner }; + future.await; + }; + + // `async-task` stores the concrete future inside its own task + // allocation, so the queue only needs to carry uniform `Runnable` + // handles instead of boxed trait objects. + let (runnable, task) = async_task::spawn(wrapped, schedule); + // Our public `Handle` already owns task completion, abort, panic, + // and supervision semantics, so we detach the `async-task` handle and + // keep only the runnable side. + task.detach(); + runnable.schedule(); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::{ + panic::{catch_unwind, AssertUnwindSafe}, + thread, + time::Duration, + }; + + /// Wait until the executor closes its branch or time out if something is + /// keeping the root task alive unexpectedly. + fn wait_for_close(executor: &DedicatedExecutor) { + for _ in 0..200 { + if !executor.is_running() { + return; + } + thread::sleep(Duration::from_millis(5)); + } + panic!("dedicated executor did not close in time"); + } + + #[test] + fn test_remote_submit_runs_on_dedicated_owner_thread() { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + let executor = DedicatedExecutor::start( + runtime.handle().clone(), + utils::thread::system_thread_stack_size(), + ); + + let (owner_tx, owner_rx) = mpsc::channel(); + let (child_tx, child_rx) = mpsc::channel(); + let (release_tx, release_rx) = tokio::sync::oneshot::channel::<()>(); + + // Keep the root task alive so we can submit a colocated child from the + // test thread while the branch is still running. + executor.spawn_root(async move { + owner_tx.send(thread::current().id()).unwrap(); + let _ = release_rx.await; + }); + + let owner_thread = owner_rx.recv_timeout(Duration::from_secs(1)).unwrap(); + executor.spawn(async move { + child_tx.send(thread::current().id()).unwrap(); + }); + let child_thread = child_rx.recv_timeout(Duration::from_secs(1)).unwrap(); + + assert_eq!(owner_thread, child_thread); + + assert!(release_tx.send(()).is_ok()); + wait_for_close(&executor); + } + + #[test] + fn test_spawn_rejects_after_root_exit() { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + let executor = DedicatedExecutor::start( + runtime.handle().clone(), + utils::thread::system_thread_stack_size(), + ); + + let (done_tx, done_rx) = mpsc::channel(); + executor.spawn_root(async move { + done_tx.send(()).unwrap(); + }); + + done_rx.recv_timeout(Duration::from_secs(1)).unwrap(); + wait_for_close(&executor); + + let panic = catch_unwind(AssertUnwindSafe(|| { + executor.spawn(async move {}); + })); + assert!(panic.is_err()); + } +} diff --git a/runtime/src/tokio/mod.rs b/runtime/src/tokio/mod.rs index e88215fe966..7c74afeebd4 100644 --- a/runtime/src/tokio/mod.rs +++ b/runtime/src/tokio/mod.rs @@ -22,6 +22,7 @@ //! }); //! ``` +mod dedicated; mod runtime; pub use runtime::*; pub mod telemetry; diff --git a/runtime/src/tokio/runtime.rs b/runtime/src/tokio/runtime.rs index a6de1585ee0..da6673bd075 100644 --- a/runtime/src/tokio/runtime.rs +++ b/runtime/src/tokio/runtime.rs @@ -1,3 +1,4 @@ +use super::dedicated::DedicatedExecutor; #[cfg(not(feature = "iouring-network"))] use crate::network::tokio::{Config as TokioNetworkConfig, Network as TokioNetwork}; #[cfg(feature = "iouring-storage")] @@ -500,6 +501,7 @@ impl crate::Runner for Runner { tree: Tree::root(), execution: Execution::default(), traced: false, + dedicated: None, }; let output = executor.runtime.block_on(panicked.interrupt(f(context))); gauge.dec(); @@ -539,6 +541,7 @@ pub struct Context { tree: Arc, execution: Execution, traced: bool, + dedicated: Option>, } impl Clone for Context { @@ -556,6 +559,7 @@ impl Clone for Context { tree: child, execution: Execution::default(), traced: false, + dedicated: self.dedicated.clone(), } } } @@ -573,6 +577,11 @@ impl crate::Spawner for Context { self } + fn colocated(mut self) -> Self { + self.execution = Execution::Colocated; + self + } + fn shared(mut self, blocking: bool) -> Self { self.execution = Execution::Shared(blocking); self @@ -591,13 +600,36 @@ impl crate::Spawner for Context { let parent = Arc::clone(&self.tree); let past = self.execution; let traced = self.traced; + let inherited_dedicated = self.dedicated.clone(); self.execution = Execution::default(); self.traced = false; + let child_dedicated = match past { + // Dedicated creates a new execution domain for the spawned child. + Execution::Dedicated => Some(DedicatedExecutor::start( + self.executor.runtime.handle().clone(), + self.executor.thread_stack_size, + )), + // Colocated reuses the closest dedicated ancestor already encoded + // in the context lineage. The ancestor must still be running. + Execution::Colocated => { + let dedicated = inherited_dedicated + .expect("`colocated()` requires a running dedicated ancestor"); + assert!( + dedicated.is_running(), + "`colocated()` requires a running dedicated ancestor" + ); + Some(dedicated) + } + // Shared breaks the dedicated assignment for descendants. + Execution::Shared(_) => None, + }; + let (child, aborted) = Tree::child(&parent); if aborted { return Handle::closed(metric); } self.tree = child; + self.dedicated = child_dedicated.clone(); // Spawn the task let executor = self.executor.clone(); @@ -618,13 +650,13 @@ impl crate::Spawner for Context { ); if matches!(past, Execution::Dedicated) { - utils::thread::spawn(executor.thread_stack_size, { - // Ensure the task can access the tokio runtime - let handle = executor.runtime.handle().clone(); - move || { - handle.block_on(f); - } - }); + child_dedicated + .expect("dedicated executor missing") + .spawn_root(f); + } else if matches!(past, Execution::Colocated) { + child_dedicated + .expect("dedicated executor missing") + .spawn(f); } else if matches!(past, Execution::Shared(true)) { executor.runtime.spawn_blocking({ // Ensure the task can access the tokio runtime diff --git a/runtime/src/utils/cell.rs b/runtime/src/utils/cell.rs index 713ecbbcc16..f7ac990da6d 100644 --- a/runtime/src/utils/cell.rs +++ b/runtime/src/utils/cell.rs @@ -110,6 +110,10 @@ where Self::Present(self.into_present().dedicated()) } + fn colocated(self) -> Self { + Self::Present(self.into_present().colocated()) + } + fn shared(self, blocking: bool) -> Self { Self::Present(self.into_present().shared(blocking)) } diff --git a/runtime/src/utils/mod.rs b/runtime/src/utils/mod.rs index 8b60cb73d52..53acd108113 100644 --- a/runtime/src/utils/mod.rs +++ b/runtime/src/utils/mod.rs @@ -33,6 +33,9 @@ pub(crate) mod supervision; pub enum Execution { /// Task runs on a dedicated thread. Dedicated, + /// Task runs on the same dedicated execution domain as its closest + /// dedicated ancestor. + Colocated, /// Task runs on the shared executor. `true` marks short blocking work that should /// use the runtime's blocking-friendly pool. Shared(bool), diff --git a/runtime/src/utils/supervision.rs b/runtime/src/utils/supervision.rs index b65438cc43c..72b18589324 100644 --- a/runtime/src/utils/supervision.rs +++ b/runtime/src/utils/supervision.rs @@ -170,6 +170,11 @@ impl Tree { } } + /// Returns whether this node has already been aborted. + pub(crate) fn is_aborted(&self) -> bool { + self.inner.lock().aborted + } + /// Drops a strong ancestry chain iteratively to avoid recursive `Arc` /// teardown. fn drop_ancestry(parent: Arc) {