From 1b433922bb3c897140946d20d53f59573bee55fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Tue, 7 Apr 2026 10:41:18 +0100 Subject: [PATCH 01/35] [runtime] support pinning dedicated tasks to a core --- runtime/src/deterministic.rs | 7 +++++- runtime/src/lib.rs | 6 +++++ runtime/src/telemetry/metrics/task.rs | 2 +- runtime/src/tokio/runtime.rs | 13 +++++++++-- runtime/src/utils/cell.rs | 4 ++++ runtime/src/utils/mod.rs | 5 +++-- runtime/src/utils/thread.rs | 32 +++++++++++++++++++++++++++ 7 files changed, 63 insertions(+), 6 deletions(-) diff --git a/runtime/src/deterministic.rs b/runtime/src/deterministic.rs index 3779795d345..ba8211a743f 100644 --- a/runtime/src/deterministic.rs +++ b/runtime/src/deterministic.rs @@ -1134,7 +1134,12 @@ impl Context { impl crate::Spawner for Context { fn dedicated(mut self) -> Self { - self.execution = Execution::Dedicated; + self.execution = Execution::Dedicated(None); + self + } + + fn pinned(mut self, core: usize) -> Self { + self.execution = Execution::Dedicated(Some(core)); self } diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index 78dd49e7729..f73b9dc5c95 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -179,6 +179,12 @@ stability_scope!(BETA { /// This is not the default behavior. See [`Spawner::shared`] for more information. fn dedicated(self) -> Self; + /// Return a [`Spawner`] that runs tasks on a dedicated thread pinned to the given core + /// (best-effort, Linux only). The core value wraps around the number of available CPUs. + /// + /// Implies [`Spawner::dedicated`]. + fn pinned(self, core: usize) -> Self; + /// Return a [`Spawner`] that instruments the next spawned task with the label of the spawning context. fn instrumented(self) -> Self; diff --git a/runtime/src/telemetry/metrics/task.rs b/runtime/src/telemetry/metrics/task.rs index fc0c7ab266e..786954d60a8 100644 --- a/runtime/src/telemetry/metrics/task.rs +++ b/runtime/src/telemetry/metrics/task.rs @@ -29,7 +29,7 @@ impl Label { name, kind: Kind::Task, execution: match execution { - crate::Execution::Dedicated => Execution::Dedicated, + crate::Execution::Dedicated(_) => Execution::Dedicated, crate::Execution::Shared(blocking) => { if blocking { Execution::SharedBlocking diff --git a/runtime/src/tokio/runtime.rs b/runtime/src/tokio/runtime.rs index 82bea493e03..36a45b10727 100644 --- a/runtime/src/tokio/runtime.rs +++ b/runtime/src/tokio/runtime.rs @@ -548,7 +548,12 @@ impl Context { impl crate::Spawner for Context { fn dedicated(mut self) -> Self { - self.execution = Execution::Dedicated; + self.execution = Execution::Dedicated(None); + self + } + + fn pinned(mut self, core: usize) -> Self { + self.execution = Execution::Dedicated(Some(core)); self } @@ -601,11 +606,15 @@ impl crate::Spawner for Context { Arc::clone(&parent), ); - if matches!(past, Execution::Dedicated) { + if let Execution::Dedicated(core) = past { utils::thread::spawn(executor.thread_stack_size, { // Ensure the task can access the tokio runtime let handle = executor.runtime.handle().clone(); move || { + // Pin before running any work on this thread + if let Some(core) = core { + utils::thread::pin_to_core(core); + } handle.block_on(f); } }); diff --git a/runtime/src/utils/cell.rs b/runtime/src/utils/cell.rs index 849eaf96de4..dc64de7b270 100644 --- a/runtime/src/utils/cell.rs +++ b/runtime/src/utils/cell.rs @@ -110,6 +110,10 @@ where Self::Present(self.into_present().dedicated()) } + fn pinned(self, core: usize) -> Self { + Self::Present(self.into_present().pinned(core)) + } + fn shared(self, blocking: bool) -> Self { Self::Present(self.into_present().shared(blocking)) } diff --git a/runtime/src/utils/mod.rs b/runtime/src/utils/mod.rs index 4b3b3fc2704..8ad532e86b1 100644 --- a/runtime/src/utils/mod.rs +++ b/runtime/src/utils/mod.rs @@ -31,8 +31,9 @@ pub(crate) mod supervision; /// The execution mode of a task. #[derive(Copy, Clone, Debug)] pub enum Execution { - /// Task runs on a dedicated thread. - Dedicated, + /// Task runs on a dedicated thread, optionally pinned to a core + /// (best-effort, Linux only). The core value wraps around available CPUs. + Dedicated(Option), /// Task runs on the shared executor. `true` marks short blocking work that should /// use the runtime's blocking-friendly pool. Shared(bool), diff --git a/runtime/src/utils/thread.rs b/runtime/src/utils/thread.rs index 76528bcb3a9..adb0f9a1765 100644 --- a/runtime/src/utils/thread.rs +++ b/runtime/src/utils/thread.rs @@ -111,3 +111,35 @@ where .spawn(f) .expect("failed to spawn thread") } + +/// Best-effort attempt to pin the current thread to the given core. +/// The `core` value wraps around the number of available CPUs. +#[cfg(target_os = "linux")] +pub(crate) fn pin_to_core(core: usize) { + // SAFETY: `sysconf(_SC_NPROCESSORS_ONLN)` is a read-only query with no + // preconditions. + let num_cpus = unsafe { libc::sysconf(libc::_SC_NPROCESSORS_ONLN) }; + if num_cpus <= 0 { + tracing::warn!("failed to query CPU count, skipping core pinning"); + return; + } + let cpu = core % (num_cpus as usize); + + // SAFETY: `cpu_set` is zeroed and then a single valid CPU index is set. + unsafe { + let mut cpu_set: libc::cpu_set_t = std::mem::zeroed(); + libc::CPU_SET(cpu, &mut cpu_set); + let result = libc::sched_setaffinity( + 0, // current thread + std::mem::size_of::(), + &cpu_set, + ); + if result != 0 { + tracing::warn!(cpu, "sched_setaffinity failed, skipping core pinning"); + } + } +} + +/// No-op on non-Linux platforms. See the Linux implementation for details. +#[cfg(not(target_os = "linux"))] +pub(crate) fn pin_to_core(_core: usize) {} From fd204607a529023429a1e8dcbf0cb4c49d65a90a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Tue, 7 Apr 2026 10:49:07 +0100 Subject: [PATCH 02/35] [runtime] add helper for available cores --- runtime/src/lib.rs | 2 ++ runtime/src/utils/mod.rs | 2 ++ runtime/src/utils/thread.rs | 30 ++++++++++++++++++++++++------ 3 files changed, 28 insertions(+), 6 deletions(-) diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index f73b9dc5c95..d7c1d988a7d 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -182,6 +182,8 @@ stability_scope!(BETA { /// Return a [`Spawner`] that runs tasks on a dedicated thread pinned to the given core /// (best-effort, Linux only). The core value wraps around the number of available CPUs. /// + /// Use [`available_cores`] to query the number of online CPUs. + /// /// Implies [`Spawner::dedicated`]. fn pinned(self, core: usize) -> Self; diff --git a/runtime/src/utils/mod.rs b/runtime/src/utils/mod.rs index 8ad532e86b1..d4b9bd47867 100644 --- a/runtime/src/utils/mod.rs +++ b/runtime/src/utils/mod.rs @@ -16,6 +16,8 @@ commonware_macros::stability_mod!(BETA, pub mod buffer); pub mod signal; #[cfg(not(target_arch = "wasm32"))] pub(crate) mod thread; +#[cfg(not(target_arch = "wasm32"))] +pub use thread::available_cores; mod handle; pub use handle::Handle; diff --git a/runtime/src/utils/thread.rs b/runtime/src/utils/thread.rs index adb0f9a1765..0b3daf240a6 100644 --- a/runtime/src/utils/thread.rs +++ b/runtime/src/utils/thread.rs @@ -112,18 +112,36 @@ where .expect("failed to spawn thread") } +/// Returns the number of online CPUs, or `None` if it cannot be determined. +#[cfg(target_os = "linux")] +pub fn available_cores() -> Option { + // SAFETY: `sysconf(_SC_NPROCESSORS_ONLN)` is a read-only query with no + // preconditions. + let n = unsafe { libc::sysconf(libc::_SC_NPROCESSORS_ONLN) }; + if n <= 0 { + None + } else { + Some(n as usize) + } +} + +/// Returns the number of online CPUs, or `None` if it cannot be determined. +/// +/// Always returns `None` on non-Linux platforms. +#[cfg(not(target_os = "linux"))] +pub fn available_cores() -> Option { + None +} + /// Best-effort attempt to pin the current thread to the given core. /// The `core` value wraps around the number of available CPUs. #[cfg(target_os = "linux")] pub(crate) fn pin_to_core(core: usize) { - // SAFETY: `sysconf(_SC_NPROCESSORS_ONLN)` is a read-only query with no - // preconditions. - let num_cpus = unsafe { libc::sysconf(libc::_SC_NPROCESSORS_ONLN) }; - if num_cpus <= 0 { + let Some(num_cores) = available_cores() else { tracing::warn!("failed to query CPU count, skipping core pinning"); return; - } - let cpu = core % (num_cpus as usize); + }; + let cpu = core % num_cores; // SAFETY: `cpu_set` is zeroed and then a single valid CPU index is set. unsafe { From 8c076c1da4fe4b3644652ed2c16fa17dd4785167 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Tue, 7 Apr 2026 11:02:57 +0100 Subject: [PATCH 03/35] [runtime] clippy --- runtime/src/utils/thread.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/runtime/src/utils/thread.rs b/runtime/src/utils/thread.rs index 0b3daf240a6..02fc4faa50c 100644 --- a/runtime/src/utils/thread.rs +++ b/runtime/src/utils/thread.rs @@ -129,7 +129,7 @@ pub fn available_cores() -> Option { /// /// Always returns `None` on non-Linux platforms. #[cfg(not(target_os = "linux"))] -pub fn available_cores() -> Option { +pub const fn available_cores() -> Option { None } @@ -160,4 +160,4 @@ pub(crate) fn pin_to_core(core: usize) { /// No-op on non-Linux platforms. See the Linux implementation for details. #[cfg(not(target_os = "linux"))] -pub(crate) fn pin_to_core(_core: usize) {} +pub(crate) const fn pin_to_core(_core: usize) {} From df22aa9175362c99e0f36c6b99f235bc723f3d65 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Tue, 7 Apr 2026 11:08:06 +0100 Subject: [PATCH 04/35] [runtime] warn once on pin_to_core failure --- runtime/src/utils/thread.rs | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/runtime/src/utils/thread.rs b/runtime/src/utils/thread.rs index 02fc4faa50c..ac3a9c6036d 100644 --- a/runtime/src/utils/thread.rs +++ b/runtime/src/utils/thread.rs @@ -1,5 +1,7 @@ //! Helpers for resolving the configured thread stack size. +#[cfg(target_os = "linux")] +use std::sync::Once; use std::{env, sync::OnceLock, thread}; /// Cached configured thread stack size. @@ -137,8 +139,13 @@ pub const fn available_cores() -> Option { /// The `core` value wraps around the number of available CPUs. #[cfg(target_os = "linux")] pub(crate) fn pin_to_core(core: usize) { + static WARN_CPUS: Once = Once::new(); + static WARN_AFFINITY: Once = Once::new(); + let Some(num_cores) = available_cores() else { - tracing::warn!("failed to query CPU count, skipping core pinning"); + WARN_CPUS.call_once(|| { + tracing::warn!("failed to query CPU count, skipping core pinning"); + }); return; }; let cpu = core % num_cores; @@ -153,7 +160,9 @@ pub(crate) fn pin_to_core(core: usize) { &cpu_set, ); if result != 0 { - tracing::warn!(cpu, "sched_setaffinity failed, skipping core pinning"); + WARN_AFFINITY.call_once(|| { + tracing::warn!(cpu, "sched_setaffinity failed, skipping core pinning"); + }); } } } From cc2321bb7ba66bf7aaa1f789cd3dabad7469f762 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Tue, 7 Apr 2026 11:11:28 +0100 Subject: [PATCH 05/35] [runtime] clippy --- runtime/src/utils/thread.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtime/src/utils/thread.rs b/runtime/src/utils/thread.rs index ac3a9c6036d..37dd8f2ed4d 100644 --- a/runtime/src/utils/thread.rs +++ b/runtime/src/utils/thread.rs @@ -1,7 +1,7 @@ //! Helpers for resolving the configured thread stack size. #[cfg(target_os = "linux")] -use std::sync::Once; +use commonware_utils::sync::Once; use std::{env, sync::OnceLock, thread}; /// Cached configured thread stack size. From 53bff090cd79028a47b04e6e26a9b7dd1d44cdab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Tue, 7 Apr 2026 11:29:28 +0100 Subject: [PATCH 06/35] [runtime] tests --- runtime/src/lib.rs | 89 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 89 insertions(+) diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index d7c1d988a7d..33af003e777 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -1694,6 +1694,16 @@ mod tests { }); } + fn test_spawn_pinned(runner: R) + where + R::Context: Spawner, + { + runner.start(|context| async move { + let handle = context.pinned(0).spawn(|_| async move { 42 }); + assert!(matches!(handle.await, Ok(42))); + }); + } + fn test_spawn(runner: R) where R::Context: Spawner + Clock, @@ -3325,6 +3335,12 @@ mod tests { test_spawn_dedicated(executor); } + #[test] + fn test_deterministic_spawn_pinned() { + let executor = deterministic::Runner::default(); + test_spawn_pinned(executor); + } + #[test] fn test_deterministic_spawn() { let runner = deterministic::Runner::default(); @@ -3674,6 +3690,79 @@ mod tests { test_spawn_dedicated(executor); } + #[test] + fn test_tokio_spawn_pinned() { + let executor = tokio::Runner::default(); + test_spawn_pinned(executor); + } + + #[test] + fn test_tokio_spawn_pinned_dedicated_thread() { + // Verify that pinned implies dedicated. + let executor = tokio::Runner::default(); + executor.start(|context| async move { + let root_thread = std::thread::current().id(); + let task_thread = context + .pinned(0) + .spawn(|_| async move { std::thread::current().id() }) + .await + .unwrap(); + // The task should run on a different thread than the root thread. + assert_ne!(root_thread, task_thread); + }); + } + + #[cfg(target_os = "linux")] + #[test] + fn test_tokio_spawn_pinned_correct_core() { + // Verify that a pinned task is actually running on the expected core, + // for every available core. + let num_cores = crate::available_cores().unwrap(); + let executor = tokio::Runner::default(); + executor.start(|context| async move { + for core in 0..num_cores { + let actual = context + .clone() + .pinned(core) + .spawn(|_| async move { + // SAFETY: `sched_getcpu` is a read-only query with no + // preconditions. + unsafe { libc::sched_getcpu() as usize } + }) + .await + .unwrap(); + assert_eq!(actual, core); + } + }); + } + + #[cfg(target_os = "linux")] + #[test] + fn test_tokio_spawn_pinned_same_core() { + // Verify that two separate tasks pinned to the same core run on + // different threads but report the same CPU. + let executor = tokio::Runner::default(); + executor.start(|context| async move { + let t1 = context.clone().pinned(1).spawn(|_| async move { + // SAFETY: `sched_getcpu` is a read-only query with no + // preconditions. + (std::thread::current().id(), unsafe { libc::sched_getcpu() }) + }); + let t2 = context.clone().pinned(1).spawn(|_| async move { + // SAFETY: `sched_getcpu` is a read-only query with no + // preconditions. + (std::thread::current().id(), unsafe { libc::sched_getcpu() }) + }); + let (r1, r2) = futures::future::join(t1, t2).await; + let (thread1, cpu1) = r1.unwrap(); + let (thread2, cpu2) = r2.unwrap(); + // Different dedicated threads. + assert_ne!(thread1, thread2); + // Same core. + assert_eq!(cpu1, cpu2); + }); + } + #[test] fn test_tokio_spawn() { let runner = tokio::Runner::default(); From f00c82ed0d5b7a8eaae76b585231c8645e18a800 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Tue, 7 Apr 2026 16:28:56 +0100 Subject: [PATCH 07/35] [runtime] panic on invalid core number --- runtime/src/lib.rs | 13 ++++++++++--- runtime/src/utils/mod.rs | 4 ++-- runtime/src/utils/thread.rs | 27 +++++++++++++++++++-------- 3 files changed, 31 insertions(+), 13 deletions(-) diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index 33af003e777..cff49531f5a 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -179,12 +179,19 @@ stability_scope!(BETA { /// This is not the default behavior. See [`Spawner::shared`] for more information. fn dedicated(self) -> Self; - /// Return a [`Spawner`] that runs tasks on a dedicated thread pinned to the given core - /// (best-effort, Linux only). The core value wraps around the number of available CPUs. + /// Return a [`Spawner`] that runs tasks on a dedicated thread pinned to the given core. /// - /// Use [`available_cores`] to query the number of online CPUs. + /// Core pinning is currently Linux only and a no-op on other platforms. Pinning may + /// silently fail in restricted environments (e.g. containers with cgroup CPU limits), + /// this method will still succeed but the thread will run unpinned. + /// + /// Use [`available_cores`] to query the number of available CPUs. /// /// Implies [`Spawner::dedicated`]. + /// + /// # Panics + /// + /// Panics if `core` is greater than or equal to the number of available CPUs. fn pinned(self, core: usize) -> Self; /// Return a [`Spawner`] that instruments the next spawned task with the label of the spawning context. diff --git a/runtime/src/utils/mod.rs b/runtime/src/utils/mod.rs index d4b9bd47867..85866972116 100644 --- a/runtime/src/utils/mod.rs +++ b/runtime/src/utils/mod.rs @@ -33,8 +33,8 @@ pub(crate) mod supervision; /// The execution mode of a task. #[derive(Copy, Clone, Debug)] pub enum Execution { - /// Task runs on a dedicated thread, optionally pinned to a core - /// (best-effort, Linux only). The core value wraps around available CPUs. + /// Task runs on a dedicated thread, optionally pinned to a core. Core pinning is + /// currently Linux only and a no-op on other platforms. Dedicated(Option), /// Task runs on the shared executor. `true` marks short blocking work that should /// use the runtime's blocking-friendly pool. diff --git a/runtime/src/utils/thread.rs b/runtime/src/utils/thread.rs index 37dd8f2ed4d..30d28a52658 100644 --- a/runtime/src/utils/thread.rs +++ b/runtime/src/utils/thread.rs @@ -114,7 +114,7 @@ where .expect("failed to spawn thread") } -/// Returns the number of online CPUs, or `None` if it cannot be determined. +/// Returns the number of available CPUs, or `None` if it cannot be determined. #[cfg(target_os = "linux")] pub fn available_cores() -> Option { // SAFETY: `sysconf(_SC_NPROCESSORS_ONLN)` is a read-only query with no @@ -127,7 +127,7 @@ pub fn available_cores() -> Option { } } -/// Returns the number of online CPUs, or `None` if it cannot be determined. +/// Returns the number of available CPUs, or `None` if it cannot be determined. /// /// Always returns `None` on non-Linux platforms. #[cfg(not(target_os = "linux"))] @@ -135,8 +135,14 @@ pub const fn available_cores() -> Option { None } -/// Best-effort attempt to pin the current thread to the given core. -/// The `core` value wraps around the number of available CPUs. +/// Pins the current thread to the given core. +/// +/// If the CPU count cannot be queried or `sched_setaffinity` fails, a warning +/// is logged once and the thread continues unpinned. +/// +/// # Panics +/// +/// Panics if `core` is greater than or equal to the number of available CPUs. #[cfg(target_os = "linux")] pub(crate) fn pin_to_core(core: usize) { static WARN_CPUS: Once = Once::new(); @@ -148,12 +154,15 @@ pub(crate) fn pin_to_core(core: usize) { }); return; }; - let cpu = core % num_cores; + assert!( + core < num_cores, + "core {core} out of range ({num_cores} available)" + ); // SAFETY: `cpu_set` is zeroed and then a single valid CPU index is set. unsafe { let mut cpu_set: libc::cpu_set_t = std::mem::zeroed(); - libc::CPU_SET(cpu, &mut cpu_set); + libc::CPU_SET(core, &mut cpu_set); let result = libc::sched_setaffinity( 0, // current thread std::mem::size_of::(), @@ -161,12 +170,14 @@ pub(crate) fn pin_to_core(core: usize) { ); if result != 0 { WARN_AFFINITY.call_once(|| { - tracing::warn!(cpu, "sched_setaffinity failed, skipping core pinning"); + tracing::warn!(core, "sched_setaffinity failed, skipping core pinning"); }); } } } -/// No-op on non-Linux platforms. See the Linux implementation for details. +/// Pins the current thread to the given core. +/// +/// No-op on non-Linux platforms. #[cfg(not(target_os = "linux"))] pub(crate) const fn pin_to_core(_core: usize) {} From d3877f519b680fcddd44d3728fcbee588e00e8c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Tue, 7 Apr 2026 16:44:19 +0100 Subject: [PATCH 08/35] [runtime] test pinned panic --- runtime/src/lib.rs | 16 +++++++++++++++- runtime/src/utils/thread.rs | 27 ++++++++++++++++----------- 2 files changed, 31 insertions(+), 12 deletions(-) diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index cff49531f5a..3e9e541904d 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -191,7 +191,8 @@ stability_scope!(BETA { /// /// # Panics /// - /// Panics if `core` is greater than or equal to the number of available CPUs. + /// The spawned thread panics if `core` is greater than or equal to the number of available + /// CPUs. fn pinned(self, core: usize) -> Self; /// Return a [`Spawner`] that instruments the next spawned task with the label of the spawning context. @@ -3770,6 +3771,19 @@ mod tests { }); } + #[cfg(target_os = "linux")] + #[test] + fn test_tokio_spawn_pinned_invalid_core() { + // Pinning to a core beyond the available count panics on the dedicated + // thread. + let num_cores = crate::available_cores().unwrap(); + let executor = tokio::Runner::default(); + executor.start(|context| async move { + let result = context.pinned(num_cores).spawn(|_| async {}).await; + assert!(result.is_err()); + }); + } + #[test] fn test_tokio_spawn() { let runner = tokio::Runner::default(); diff --git a/runtime/src/utils/thread.rs b/runtime/src/utils/thread.rs index 30d28a52658..da9fe96f3e8 100644 --- a/runtime/src/utils/thread.rs +++ b/runtime/src/utils/thread.rs @@ -4,9 +4,6 @@ use commonware_utils::sync::Once; use std::{env, sync::OnceLock, thread}; -/// Cached configured thread stack size. -static SYSTEM_THREAD_STACK_SIZE: OnceLock = OnceLock::new(); - /// Rust's default thread stack size. /// /// See . @@ -34,7 +31,10 @@ fn rust_min_stack() -> Option { /// /// On other platforms, or if the platform-specific query fails, this falls back /// to [RUST_DEFAULT_THREAD_STACK_SIZE]. +/// +/// The result is cached after the first call. pub(crate) fn system_thread_stack_size() -> usize { + static SYSTEM_THREAD_STACK_SIZE: OnceLock = OnceLock::new(); *SYSTEM_THREAD_STACK_SIZE.get_or_init(|| { rust_min_stack() .or(system_thread_stack_size_impl()) @@ -115,16 +115,21 @@ where } /// Returns the number of available CPUs, or `None` if it cannot be determined. +/// +/// The result is cached after the first call. #[cfg(target_os = "linux")] pub fn available_cores() -> Option { - // SAFETY: `sysconf(_SC_NPROCESSORS_ONLN)` is a read-only query with no - // preconditions. - let n = unsafe { libc::sysconf(libc::_SC_NPROCESSORS_ONLN) }; - if n <= 0 { - None - } else { - Some(n as usize) - } + static CORES: OnceLock> = OnceLock::new(); + *CORES.get_or_init(|| { + // SAFETY: `sysconf(_SC_NPROCESSORS_ONLN)` is a read-only query with no + // preconditions. + let n = unsafe { libc::sysconf(libc::_SC_NPROCESSORS_ONLN) }; + if n <= 0 { + None + } else { + Some(n as usize) + } + }) } /// Returns the number of available CPUs, or `None` if it cannot be determined. From 595bbcc6501128aa3985f8bd3b78d7c84ef4f083 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Tue, 7 Apr 2026 16:50:04 +0100 Subject: [PATCH 09/35] [runtime] panic eagerly on invalid pinned --- runtime/src/lib.rs | 10 ++++------ runtime/src/tokio/runtime.rs | 6 ++++++ 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index 3e9e541904d..2906e10a901 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -191,8 +191,7 @@ stability_scope!(BETA { /// /// # Panics /// - /// The spawned thread panics if `core` is greater than or equal to the number of available - /// CPUs. + /// Panics if `core` is greater than or equal to the number of available CPUs. fn pinned(self, core: usize) -> Self; /// Return a [`Spawner`] that instruments the next spawned task with the label of the spawning context. @@ -3773,14 +3772,13 @@ mod tests { #[cfg(target_os = "linux")] #[test] + #[should_panic(expected = "out of range")] fn test_tokio_spawn_pinned_invalid_core() { - // Pinning to a core beyond the available count panics on the dedicated - // thread. + // Pinning to a core beyond the available count panics eagerly. let num_cores = crate::available_cores().unwrap(); let executor = tokio::Runner::default(); executor.start(|context| async move { - let result = context.pinned(num_cores).spawn(|_| async {}).await; - assert!(result.is_err()); + context.pinned(num_cores).spawn(|_| async {}); }); } diff --git a/runtime/src/tokio/runtime.rs b/runtime/src/tokio/runtime.rs index 36a45b10727..a9aaf7d816c 100644 --- a/runtime/src/tokio/runtime.rs +++ b/runtime/src/tokio/runtime.rs @@ -553,6 +553,12 @@ impl crate::Spawner for Context { } fn pinned(mut self, core: usize) -> Self { + if let Some(num_cores) = utils::thread::available_cores() { + assert!( + core < num_cores, + "core {core} out of range ({num_cores} available)" + ); + } self.execution = Execution::Dedicated(Some(core)); self } From a3b5f97e5e8ec8f9f84374830ca9d78176eaee81 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Tue, 7 Apr 2026 16:53:20 +0100 Subject: [PATCH 10/35] [runtime] fix test --- runtime/src/lib.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index 2906e10a901..1d78ac646de 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -3750,12 +3750,13 @@ mod tests { // different threads but report the same CPU. let executor = tokio::Runner::default(); executor.start(|context| async move { - let t1 = context.clone().pinned(1).spawn(|_| async move { + let core = crate::available_cores().unwrap() - 1; + let t1 = context.clone().pinned(core).spawn(|_| async move { // SAFETY: `sched_getcpu` is a read-only query with no // preconditions. (std::thread::current().id(), unsafe { libc::sched_getcpu() }) }); - let t2 = context.clone().pinned(1).spawn(|_| async move { + let t2 = context.clone().pinned(core).spawn(|_| async move { // SAFETY: `sched_getcpu` is a read-only query with no // preconditions. (std::thread::current().id(), unsafe { libc::sched_getcpu() }) From a0d9699bd13eae8c328469206149fa2ed630d017 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Wed, 8 Apr 2026 18:46:40 +0100 Subject: [PATCH 11/35] [runtime] available_cores works on any unix --- runtime/src/utils/thread.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/runtime/src/utils/thread.rs b/runtime/src/utils/thread.rs index da9fe96f3e8..29311df5b14 100644 --- a/runtime/src/utils/thread.rs +++ b/runtime/src/utils/thread.rs @@ -117,7 +117,7 @@ where /// Returns the number of available CPUs, or `None` if it cannot be determined. /// /// The result is cached after the first call. -#[cfg(target_os = "linux")] +#[cfg(unix)] pub fn available_cores() -> Option { static CORES: OnceLock> = OnceLock::new(); *CORES.get_or_init(|| { @@ -134,8 +134,8 @@ pub fn available_cores() -> Option { /// Returns the number of available CPUs, or `None` if it cannot be determined. /// -/// Always returns `None` on non-Linux platforms. -#[cfg(not(target_os = "linux"))] +/// Always returns `None` on non-Unix platforms. +#[cfg(not(unix))] pub const fn available_cores() -> Option { None } From cd17564a1cdd972879bd7f8bb263746e595a69bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Wed, 8 Apr 2026 18:49:50 +0100 Subject: [PATCH 12/35] [runtime] test available_cores --- runtime/src/utils/thread.rs | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/runtime/src/utils/thread.rs b/runtime/src/utils/thread.rs index 29311df5b14..77ac007318f 100644 --- a/runtime/src/utils/thread.rs +++ b/runtime/src/utils/thread.rs @@ -186,3 +186,21 @@ pub(crate) fn pin_to_core(core: usize) { /// No-op on non-Linux platforms. #[cfg(not(target_os = "linux"))] pub(crate) const fn pin_to_core(_core: usize) {} + +#[cfg(test)] +mod tests { + use super::*; + + #[cfg(unix)] + #[test] + fn test_available_cores() { + let n = available_cores().expect("available_cores returned None on Unix"); + assert!(n >= 1, "expected at least 1 core, got {n}"); + } + + #[cfg(not(unix))] + #[test] + fn test_available_cores_non_unix() { + assert!(available_cores().is_none()); + } +} From ab372337a5e6d7b2e003dea33efb7deab384ded7 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Tue, 21 Apr 2026 15:59:46 -0700 Subject: [PATCH 13/35] [runtime] remove stale instrumented spawner method --- runtime/src/lib.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index ea3024f5da4..f0d52988130 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -194,8 +194,6 @@ stability_scope!(BETA { /// Panics if `core` is greater than or equal to the number of available CPUs. fn pinned(self, core: usize) -> Self; - /// Return a [`Spawner`] that instruments the next spawned task with the label of the spawning context. - fn instrumented(self) -> Self; /// Spawn a task with the current context. /// /// Unlike directly awaiting a future, the task starts running immediately even if the caller From cc4a3b76cb6af43e019db01080db2b173d2bac03 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Wed, 22 Apr 2026 12:18:46 +0100 Subject: [PATCH 14/35] [runtime] pin cpus not cores --- runtime/src/deterministic.rs | 4 +- runtime/src/lib.rs | 59 +++++++------ runtime/src/tokio/runtime.rs | 19 +++-- runtime/src/utils/cell.rs | 4 +- runtime/src/utils/mod.rs | 6 +- runtime/src/utils/thread.rs | 156 +++++++++++++++++++++-------------- 6 files changed, 145 insertions(+), 103 deletions(-) diff --git a/runtime/src/deterministic.rs b/runtime/src/deterministic.rs index b1dc1616d50..35c8cd17e62 100644 --- a/runtime/src/deterministic.rs +++ b/runtime/src/deterministic.rs @@ -1138,8 +1138,8 @@ impl crate::Spawner for Context { self } - fn pinned(mut self, core: usize) -> Self { - self.execution = Execution::Dedicated(Some(core)); + fn pinned(mut self, cpu: usize) -> Self { + self.execution = Execution::Dedicated(Some(cpu)); self } diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index f0d52988130..5cbac4b05bd 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -179,20 +179,21 @@ stability_scope!(BETA { /// This is not the default behavior. See [`Spawner::shared`] for more information. fn dedicated(self) -> Self; - /// Return a [`Spawner`] that runs tasks on a dedicated thread pinned to the given core. + /// Return a [`Spawner`] that runs tasks on a dedicated thread pinned to the given CPU. /// - /// Core pinning is currently Linux only and a no-op on other platforms. Pinning may + /// The `cpu` value is a Linux logical CPU id used with `sched_setaffinity`. + /// Use [`available_cpus`] to query the current thread's allowed CPU ids. + /// + /// CPU pinning is currently Linux only and a no-op on other platforms. Pinning may /// silently fail in restricted environments (e.g. containers with cgroup CPU limits), /// this method will still succeed but the thread will run unpinned. /// - /// Use [`available_cores`] to query the number of available CPUs. - /// /// Implies [`Spawner::dedicated`]. /// /// # Panics /// - /// Panics if `core` is greater than or equal to the number of available CPUs. - fn pinned(self, core: usize) -> Self; + /// Panics if `cpu` is not in the current affinity mask, when that can be determined. + fn pinned(self, cpu: usize) -> Self; /// Spawn a task with the current context. /// @@ -1749,7 +1750,8 @@ mod tests { R::Context: Spawner, { runner.start(|context| async move { - let handle = context.pinned(0).spawn(|_| async move { 42 }); + let cpu = crate::available_cpus().into_iter().next().unwrap_or(0); + let handle = context.pinned(cpu).spawn(|_| async move { 42 }); assert!(matches!(handle.await, Ok(42))); }); } @@ -3935,9 +3937,10 @@ mod tests { // Verify that pinned implies dedicated. let executor = tokio::Runner::default(); executor.start(|context| async move { + let cpu = crate::available_cpus().into_iter().next().unwrap_or(0); let root_thread = std::thread::current().id(); let task_thread = context - .pinned(0) + .pinned(cpu) .spawn(|_| async move { std::thread::current().id() }) .await .unwrap(); @@ -3948,16 +3951,16 @@ mod tests { #[cfg(target_os = "linux")] #[test] - fn test_tokio_spawn_pinned_correct_core() { - // Verify that a pinned task is actually running on the expected core, - // for every available core. - let num_cores = crate::available_cores().unwrap(); + fn test_tokio_spawn_pinned_correct_cpu() { + // Verify that a pinned task is actually running on the expected CPU, + // for every allowed CPU id. + let cpus = crate::available_cpus(); let executor = tokio::Runner::default(); executor.start(|context| async move { - for core in 0..num_cores { + for cpu in cpus { let actual = context .clone() - .pinned(core) + .pinned(cpu) .spawn(|_| async move { // SAFETY: `sched_getcpu` is a read-only query with no // preconditions. @@ -3965,25 +3968,25 @@ mod tests { }) .await .unwrap(); - assert_eq!(actual, core); + assert_eq!(actual, cpu); } }); } #[cfg(target_os = "linux")] #[test] - fn test_tokio_spawn_pinned_same_core() { - // Verify that two separate tasks pinned to the same core run on + fn test_tokio_spawn_pinned_same_cpu() { + // Verify that two separate tasks pinned to the same CPU run on // different threads but report the same CPU. + let cpu = crate::available_cpus().into_iter().last().unwrap(); let executor = tokio::Runner::default(); executor.start(|context| async move { - let core = crate::available_cores().unwrap() - 1; - let t1 = context.clone().pinned(core).spawn(|_| async move { + let t1 = context.clone().pinned(cpu).spawn(|_| async move { // SAFETY: `sched_getcpu` is a read-only query with no // preconditions. (std::thread::current().id(), unsafe { libc::sched_getcpu() }) }); - let t2 = context.clone().pinned(core).spawn(|_| async move { + let t2 = context.clone().pinned(cpu).spawn(|_| async move { // SAFETY: `sched_getcpu` is a read-only query with no // preconditions. (std::thread::current().id(), unsafe { libc::sched_getcpu() }) @@ -3993,20 +3996,24 @@ mod tests { let (thread2, cpu2) = r2.unwrap(); // Different dedicated threads. assert_ne!(thread1, thread2); - // Same core. + // Same CPU. assert_eq!(cpu1, cpu2); }); } #[cfg(target_os = "linux")] #[test] - #[should_panic(expected = "out of range")] - fn test_tokio_spawn_pinned_invalid_core() { - // Pinning to a core beyond the available count panics eagerly. - let num_cores = crate::available_cores().unwrap(); + #[should_panic(expected = "not in the current affinity mask")] + fn test_tokio_spawn_pinned_invalid_cpu() { + // Pinning to a CPU outside the current affinity mask panics eagerly. + let invalid_cpu = crate::available_cpus() + .into_iter() + .last() + .map(|cpu| cpu + 1) + .unwrap(); let executor = tokio::Runner::default(); executor.start(|context| async move { - context.pinned(num_cores).spawn(|_| async {}); + context.pinned(invalid_cpu).spawn(|_| async {}); }); } diff --git a/runtime/src/tokio/runtime.rs b/runtime/src/tokio/runtime.rs index bdd30ba4b8f..ea889a74843 100644 --- a/runtime/src/tokio/runtime.rs +++ b/runtime/src/tokio/runtime.rs @@ -573,14 +573,15 @@ impl crate::Spawner for Context { self } - fn pinned(mut self, core: usize) -> Self { - if let Some(num_cores) = utils::thread::available_cores() { + fn pinned(mut self, cpu: usize) -> Self { + let cpus = utils::thread::available_cpus(); + if !cpus.is_empty() { assert!( - core < num_cores, - "core {core} out of range ({num_cores} available)" + cpus.contains(&cpu), + "cpu {cpu} not in the current affinity mask" ); } - self.execution = Execution::Dedicated(Some(core)); + self.execution = Execution::Dedicated(Some(cpu)); self } @@ -628,14 +629,14 @@ impl crate::Spawner for Context { Arc::clone(&parent), ); - if let Execution::Dedicated(core) = past { + if let Execution::Dedicated(cpu) = past { utils::thread::spawn(executor.thread_stack_size, { // Ensure the task can access the tokio runtime let handle = executor.runtime.handle().clone(); move || { - // Pin before running any work on this thread - if let Some(core) = core { - utils::thread::pin_to_core(core); + // Pin before running any work on this thread. + if let Some(cpu) = cpu { + utils::thread::pin_to_cpu(cpu); } handle.block_on(f); } diff --git a/runtime/src/utils/cell.rs b/runtime/src/utils/cell.rs index 4118fbd4a38..a8a7ce07891 100644 --- a/runtime/src/utils/cell.rs +++ b/runtime/src/utils/cell.rs @@ -110,8 +110,8 @@ where Self::Present(self.into_present().dedicated()) } - fn pinned(self, core: usize) -> Self { - Self::Present(self.into_present().pinned(core)) + fn pinned(self, cpu: usize) -> Self { + Self::Present(self.into_present().pinned(cpu)) } fn shared(self, blocking: bool) -> Self { diff --git a/runtime/src/utils/mod.rs b/runtime/src/utils/mod.rs index 8329c27da36..7236f1c37a8 100644 --- a/runtime/src/utils/mod.rs +++ b/runtime/src/utils/mod.rs @@ -17,7 +17,7 @@ pub mod signal; #[cfg(not(target_arch = "wasm32"))] pub(crate) mod thread; #[cfg(not(target_arch = "wasm32"))] -pub use thread::available_cores; +pub use thread::available_cpus; mod handle; pub use handle::Handle; @@ -33,8 +33,8 @@ pub(crate) mod supervision; /// The execution mode of a task. #[derive(Copy, Clone, Debug)] pub enum Execution { - /// Task runs on a dedicated thread, optionally pinned to a core. Core pinning is - /// currently Linux only and a no-op on other platforms. + /// Task runs on a dedicated thread, optionally pinned to a logical CPU id. + /// CPU pinning is currently Linux only and a no-op on other platforms. Dedicated(Option), /// Task runs on the shared executor. `true` marks short blocking work that should /// use the runtime's blocking-friendly pool. diff --git a/runtime/src/utils/thread.rs b/runtime/src/utils/thread.rs index 77ac007318f..7949efe3a1c 100644 --- a/runtime/src/utils/thread.rs +++ b/runtime/src/utils/thread.rs @@ -114,93 +114,127 @@ where .expect("failed to spawn thread") } -/// Returns the number of available CPUs, or `None` if it cannot be determined. -/// -/// The result is cached after the first call. -#[cfg(unix)] -pub fn available_cores() -> Option { - static CORES: OnceLock> = OnceLock::new(); - *CORES.get_or_init(|| { - // SAFETY: `sysconf(_SC_NPROCESSORS_ONLN)` is a read-only query with no - // preconditions. - let n = unsafe { libc::sysconf(libc::_SC_NPROCESSORS_ONLN) }; - if n <= 0 { - None - } else { - Some(n as usize) +#[cfg(target_os = "linux")] +fn affinity_mask() -> Option<(Vec, usize)> { + let word_bits = libc::c_ulong::BITS as usize; + let mut words = 1usize; + loop { + let mut mask = vec![0 as libc::c_ulong; words]; + let cpusetsize = std::mem::size_of_val(mask.as_slice()); + let result = unsafe { + libc::syscall( + libc::SYS_sched_getaffinity, + 0, + cpusetsize, + mask.as_mut_ptr(), + ) + }; + if result >= 0 { + let bytes = result as usize; + return Some((mask, bytes)); } - }) + + let err = std::io::Error::last_os_error(); + match err.raw_os_error() { + Some(libc::EINTR) => continue, + Some(libc::EINVAL) => { + // Kernels with larger affinity masks require probing with a larger buffer. + words = words.checked_mul(2)?; + if words.checked_mul(word_bits)? > 1 << 20 { + return None; + } + } + _ => return None, + } + } } -/// Returns the number of available CPUs, or `None` if it cannot be determined. +/// Returns the logical CPU ids currently allowed for the calling thread. /// -/// Always returns `None` on non-Unix platforms. -#[cfg(not(unix))] -pub const fn available_cores() -> Option { - None +/// On Linux this queries the calling thread's affinity mask via `sched_getaffinity`. +/// On other platforms, or if the affinity mask cannot be queried, it returns an +/// empty vector. +#[cfg(target_os = "linux")] +pub fn available_cpus() -> Vec { + let Some((mask, bytes)) = affinity_mask() else { + return Vec::new(); + }; + let word_bits = libc::c_ulong::BITS as usize; + let mut cpus = Vec::new(); + for cpu in 0..(bytes * 8) { + let index = cpu / word_bits; + let offset = cpu % word_bits; + if index < mask.len() && (mask[index] & ((1 as libc::c_ulong) << offset)) != 0 { + cpus.push(cpu); + } + } + cpus } -/// Pins the current thread to the given core. +/// Returns the logical CPU ids currently allowed for the calling thread. /// -/// If the CPU count cannot be queried or `sched_setaffinity` fails, a warning -/// is logged once and the thread continues unpinned. -/// -/// # Panics +/// Always returns an empty vector on non-Linux platforms. +#[cfg(not(target_os = "linux"))] +pub fn available_cpus() -> Vec { + Vec::new() +} + +/// Pins the current thread to the given logical CPU id. /// -/// Panics if `core` is greater than or equal to the number of available CPUs. +/// If `sched_setaffinity` fails, a warning is logged once and the thread +/// continues unpinned. #[cfg(target_os = "linux")] -pub(crate) fn pin_to_core(core: usize) { - static WARN_CPUS: Once = Once::new(); +pub(crate) fn pin_to_cpu(cpu: usize) { static WARN_AFFINITY: Once = Once::new(); - let Some(num_cores) = available_cores() else { - WARN_CPUS.call_once(|| { - tracing::warn!("failed to query CPU count, skipping core pinning"); - }); - return; - }; - assert!( - core < num_cores, - "core {core} out of range ({num_cores} available)" - ); - - // SAFETY: `cpu_set` is zeroed and then a single valid CPU index is set. - unsafe { - let mut cpu_set: libc::cpu_set_t = std::mem::zeroed(); - libc::CPU_SET(core, &mut cpu_set); - let result = libc::sched_setaffinity( - 0, // current thread - std::mem::size_of::(), - &cpu_set, - ); - if result != 0 { - WARN_AFFINITY.call_once(|| { - tracing::warn!(core, "sched_setaffinity failed, skipping core pinning"); - }); + let word_bits = libc::c_ulong::BITS as usize; + let words = (cpu / word_bits) + .checked_add(1) + .expect("cpu bitset size overflow"); + let mut mask = vec![0 as libc::c_ulong; words]; + mask[cpu / word_bits] |= (1 as libc::c_ulong) << (cpu % word_bits); + let cpusetsize = std::mem::size_of_val(mask.as_slice()); + + loop { + let result = + unsafe { libc::syscall(libc::SYS_sched_setaffinity, 0, cpusetsize, mask.as_ptr()) }; + if result == 0 { + return; + } + + let err = std::io::Error::last_os_error(); + match err.raw_os_error() { + Some(libc::EINTR) => continue, + _ => { + WARN_AFFINITY.call_once(|| { + tracing::warn!(cpu, ?err, "sched_setaffinity failed, skipping CPU pinning"); + }); + return; + } } } } -/// Pins the current thread to the given core. +/// Pins the current thread to the given logical CPU id. /// /// No-op on non-Linux platforms. #[cfg(not(target_os = "linux"))] -pub(crate) const fn pin_to_core(_core: usize) {} +pub(crate) const fn pin_to_cpu(_cpu: usize) {} #[cfg(test)] mod tests { use super::*; - #[cfg(unix)] + #[cfg(target_os = "linux")] #[test] - fn test_available_cores() { - let n = available_cores().expect("available_cores returned None on Unix"); - assert!(n >= 1, "expected at least 1 core, got {n}"); + fn test_available_cpus() { + let cpus = available_cpus(); + assert!(!cpus.is_empty(), "expected at least one available CPU"); } - #[cfg(not(unix))] + #[cfg(not(target_os = "linux"))] #[test] - fn test_available_cores_non_unix() { - assert!(available_cores().is_none()); + fn test_available_cpus_non_linux() { + assert!(available_cpus().is_empty()); } } From 017c4c7a6273cae2976197efa420d09537957d2e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Wed, 22 Apr 2026 12:23:14 +0100 Subject: [PATCH 15/35] [runtime] panic on failed cpu pinning --- runtime/src/lib.rs | 18 +++++++++++++----- runtime/src/tokio/runtime.rs | 14 ++++++++------ runtime/src/utils/thread.rs | 24 ++++++++++-------------- 3 files changed, 31 insertions(+), 25 deletions(-) diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index 5cbac4b05bd..f276d56d116 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -184,15 +184,12 @@ stability_scope!(BETA { /// The `cpu` value is a Linux logical CPU id used with `sched_setaffinity`. /// Use [`available_cpus`] to query the current thread's allowed CPU ids. /// - /// CPU pinning is currently Linux only and a no-op on other platforms. Pinning may - /// silently fail in restricted environments (e.g. containers with cgroup CPU limits), - /// this method will still succeed but the thread will run unpinned. - /// /// Implies [`Spawner::dedicated`]. /// /// # Panics /// - /// Panics if `cpu` is not in the current affinity mask, when that can be determined. + /// Panics if CPU pinning is unavailable, if `cpu` is not in the current affinity mask, + /// or if pinning fails. fn pinned(self, cpu: usize) -> Self; /// Spawn a task with the current context. @@ -3572,6 +3569,7 @@ mod tests { } #[test] + #[should_panic(expected = "cpu pinning is not available in deterministic runtime")] fn test_deterministic_spawn_pinned() { let executor = deterministic::Runner::default(); test_spawn_pinned(executor); @@ -3926,12 +3924,14 @@ mod tests { test_spawn_dedicated(executor); } + #[cfg(target_os = "linux")] #[test] fn test_tokio_spawn_pinned() { let executor = tokio::Runner::default(); test_spawn_pinned(executor); } + #[cfg(target_os = "linux")] #[test] fn test_tokio_spawn_pinned_dedicated_thread() { // Verify that pinned implies dedicated. @@ -4017,6 +4017,14 @@ mod tests { }); } + #[cfg(not(target_os = "linux"))] + #[test] + #[should_panic(expected = "cpu pinning is not available")] + fn test_tokio_spawn_pinned_unavailable() { + let executor = tokio::Runner::default(); + test_spawn_pinned(executor); + } + #[test] fn test_tokio_spawn() { let runner = tokio::Runner::default(); diff --git a/runtime/src/tokio/runtime.rs b/runtime/src/tokio/runtime.rs index ea889a74843..589e1eca0c3 100644 --- a/runtime/src/tokio/runtime.rs +++ b/runtime/src/tokio/runtime.rs @@ -575,12 +575,14 @@ impl crate::Spawner for Context { fn pinned(mut self, cpu: usize) -> Self { let cpus = utils::thread::available_cpus(); - if !cpus.is_empty() { - assert!( - cpus.contains(&cpu), - "cpu {cpu} not in the current affinity mask" - ); - } + assert!( + !cpus.is_empty(), + "cpu pinning is not available on this platform or the affinity mask could not be queried" + ); + assert!( + cpus.contains(&cpu), + "cpu {cpu} not in the current affinity mask" + ); self.execution = Execution::Dedicated(Some(cpu)); self } diff --git a/runtime/src/utils/thread.rs b/runtime/src/utils/thread.rs index 7949efe3a1c..d3630ea44e1 100644 --- a/runtime/src/utils/thread.rs +++ b/runtime/src/utils/thread.rs @@ -1,7 +1,5 @@ //! Helpers for resolving the configured thread stack size. -#[cfg(target_os = "linux")] -use commonware_utils::sync::Once; use std::{env, sync::OnceLock, thread}; /// Rust's default thread stack size. @@ -181,12 +179,11 @@ pub fn available_cpus() -> Vec { /// Pins the current thread to the given logical CPU id. /// -/// If `sched_setaffinity` fails, a warning is logged once and the thread -/// continues unpinned. +/// # Panics +/// +/// Panics if `sched_setaffinity` fails. #[cfg(target_os = "linux")] pub(crate) fn pin_to_cpu(cpu: usize) { - static WARN_AFFINITY: Once = Once::new(); - let word_bits = libc::c_ulong::BITS as usize; let words = (cpu / word_bits) .checked_add(1) @@ -205,21 +202,20 @@ pub(crate) fn pin_to_cpu(cpu: usize) { let err = std::io::Error::last_os_error(); match err.raw_os_error() { Some(libc::EINTR) => continue, - _ => { - WARN_AFFINITY.call_once(|| { - tracing::warn!(cpu, ?err, "sched_setaffinity failed, skipping CPU pinning"); - }); - return; - } + _ => panic!("sched_setaffinity failed for cpu {cpu}: {err}"), } } } /// Pins the current thread to the given logical CPU id. /// -/// No-op on non-Linux platforms. +/// # Panics +/// +/// Always panics on non-Linux platforms, where CPU pinning is unavailable. #[cfg(not(target_os = "linux"))] -pub(crate) const fn pin_to_cpu(_cpu: usize) {} +pub(crate) fn pin_to_cpu(_cpu: usize) { + panic!("cpu pinning is not available on this platform") +} #[cfg(test)] mod tests { From 64ffa2d355cad7e509efbbebbdfa252309d35d2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Wed, 22 Apr 2026 12:26:27 +0100 Subject: [PATCH 16/35] [runtime] clippy --- runtime/src/utils/thread.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/runtime/src/utils/thread.rs b/runtime/src/utils/thread.rs index d3630ea44e1..0cf705308c4 100644 --- a/runtime/src/utils/thread.rs +++ b/runtime/src/utils/thread.rs @@ -119,6 +119,8 @@ fn affinity_mask() -> Option<(Vec, usize)> { loop { let mut mask = vec![0 as libc::c_ulong; words]; let cpusetsize = std::mem::size_of_val(mask.as_slice()); + // SAFETY: `mask` points to writable storage for `cpusetsize` bytes, and + // `pid == 0` targets the calling thread as documented by the syscall. let result = unsafe { libc::syscall( libc::SYS_sched_getaffinity, @@ -193,6 +195,8 @@ pub(crate) fn pin_to_cpu(cpu: usize) { let cpusetsize = std::mem::size_of_val(mask.as_slice()); loop { + // SAFETY: `mask` points to readable storage for `cpusetsize` bytes, and + // `pid == 0` targets the calling thread as documented by the syscall. let result = unsafe { libc::syscall(libc::SYS_sched_setaffinity, 0, cpusetsize, mask.as_ptr()) }; if result == 0 { From eb5ac733acd61f7df840ff7f56aa99bdefa35851 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Wed, 22 Apr 2026 15:07:10 +0100 Subject: [PATCH 17/35] [runtime] cleanup --- runtime/src/lib.rs | 42 ++++++++++++--- runtime/src/tokio/runtime.rs | 102 ++++++++++++++++++++++++----------- runtime/src/utils/thread.rs | 41 +++++++------- 3 files changed, 129 insertions(+), 56 deletions(-) diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index f276d56d116..951330fb18e 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -184,12 +184,15 @@ stability_scope!(BETA { /// The `cpu` value is a Linux logical CPU id used with `sched_setaffinity`. /// Use [`available_cpus`] to query the current thread's allowed CPU ids. /// + /// This only configures the next spawned task. Runtimes that implement CPU pinning + /// perform validation and the pinning attempt when [`Spawner::spawn`] starts that task. + /// /// Implies [`Spawner::dedicated`]. /// /// # Panics /// - /// Panics if CPU pinning is unavailable, if `cpu` is not in the current affinity mask, - /// or if pinning fails. + /// [`Spawner::spawn`] may panic if CPU pinning is unavailable for the runtime, if `cpu` + /// is not in the current affinity mask, or if pinning fails when the task starts. fn pinned(self, cpu: usize) -> Self; /// Spawn a task with the current context. @@ -225,6 +228,12 @@ stability_scope!(BETA { /// /// Child tasks should assume they start from a clean configuration without needing to inspect how their /// parent was configured. + /// + /// # Panics + /// + /// Panics if the runtime cannot start a task configured with [`Spawner::dedicated`], or + /// if a task configured with [`Spawner::pinned`] cannot be pinned when it starts running. + /// These startup panics occur in the caller of [`Spawner::spawn`]. fn spawn(self, f: F) -> Handle where F: FnOnce(Self) -> Fut + Send + 'static, @@ -3569,7 +3578,6 @@ mod tests { } #[test] - #[should_panic(expected = "cpu pinning is not available in deterministic runtime")] fn test_deterministic_spawn_pinned() { let executor = deterministic::Runner::default(); test_spawn_pinned(executor); @@ -4003,9 +4011,9 @@ mod tests { #[cfg(target_os = "linux")] #[test] - #[should_panic(expected = "not in the current affinity mask")] + #[should_panic(expected = "failed to pin task to cpu")] fn test_tokio_spawn_pinned_invalid_cpu() { - // Pinning to a CPU outside the current affinity mask panics eagerly. + // Pinning to a CPU outside the current affinity mask panics when the task starts. let invalid_cpu = crate::available_cpus() .into_iter() .last() @@ -4017,9 +4025,31 @@ mod tests { }); } + #[cfg(target_os = "linux")] + #[test] + fn test_tokio_spawn_pinned_invalid_cpu_cleans_metrics() { + let invalid_cpu = crate::available_cpus() + .into_iter() + .last() + .map(|cpu| cpu + 1) + .unwrap(); + let executor = tokio::Runner::default(); + executor.start(|context| async move { + let context = context.with_label("pinned_invalid_cpu"); + let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { + context.clone().pinned(invalid_cpu).spawn(|_| async {}); + })); + assert!(result.is_err(), "expected invalid pinned cpu to panic"); + + while utils::count_running_tasks(&context, "pinned_invalid_cpu") > 0 { + context.sleep(Duration::from_millis(10)).await; + } + }); + } + #[cfg(not(target_os = "linux"))] #[test] - #[should_panic(expected = "cpu pinning is not available")] + #[should_panic(expected = "cpu pinning is not available on this platform")] fn test_tokio_spawn_pinned_unavailable() { let executor = tokio::Runner::default(); test_spawn_pinned(executor); diff --git a/runtime/src/tokio/runtime.rs b/runtime/src/tokio/runtime.rs index 589e1eca0c3..ced7d6c105b 100644 --- a/runtime/src/tokio/runtime.rs +++ b/runtime/src/tokio/runtime.rs @@ -43,7 +43,7 @@ use std::{ net::{IpAddr, SocketAddr}, num::NonZeroUsize, path::PathBuf, - sync::Arc, + sync::{mpsc, Arc}, time::{Duration, SystemTime}, }; use tokio::runtime::{Builder, Runtime}; @@ -574,15 +574,6 @@ impl crate::Spawner for Context { } fn pinned(mut self, cpu: usize) -> Self { - let cpus = utils::thread::available_cpus(); - assert!( - !cpus.is_empty(), - "cpu pinning is not available on this platform or the affinity mask could not be queried" - ); - assert!( - cpus.contains(&cpu), - "cpu {cpu} not in the current affinity mask" - ); self.execution = Execution::Dedicated(Some(cpu)); self } @@ -603,7 +594,7 @@ impl crate::Spawner for Context { // Track supervision before resetting configuration let parent = Arc::clone(&self.tree); - let past = self.execution; + let execution = self.execution; let traced = self.traced; self.execution = Execution::default(); self.traced = false; @@ -631,28 +622,20 @@ impl crate::Spawner for Context { Arc::clone(&parent), ); - if let Execution::Dedicated(cpu) = past { - utils::thread::spawn(executor.thread_stack_size, { - // Ensure the task can access the tokio runtime - let handle = executor.runtime.handle().clone(); - move || { - // Pin before running any work on this thread. - if let Some(cpu) = cpu { - utils::thread::pin_to_cpu(cpu); + match execution { + Execution::Dedicated(cpu) => spawn_dedicated(&executor, &handle, cpu, f), + Execution::Shared(true) => { + executor.runtime.spawn_blocking({ + // Ensure the task can access the tokio runtime + let handle = executor.runtime.handle().clone(); + move || { + handle.block_on(f); } - handle.block_on(f); - } - }); - } else if matches!(past, Execution::Shared(true)) { - executor.runtime.spawn_blocking({ - // Ensure the task can access the tokio runtime - let handle = executor.runtime.handle().clone(); - move || { - handle.block_on(f); - } - }); - } else { - executor.runtime.spawn(f); + }); + } + Execution::Shared(false) => { + executor.runtime.spawn(f); + } } // Register the task on the parent @@ -911,6 +894,61 @@ impl crate::BufferPooler for Context { } } +/// Spawn a task on a dedicated thread, optionally pinning it to `cpu`, and +/// wait for startup to complete before returning. +/// +/// # Panics +/// +/// Panics if the dedicated thread cannot be created, if pinning to `cpu` fails, +/// or if the startup handshake breaks before the task begins running. +fn spawn_dedicated(executor: &Executor, handle: &Handle, cpu: Option, future: F) +where + T: Send + 'static, + F: Future + Send + 'static, +{ + // Ensure the task can access the tokio runtime. + let runtime_handle = executor.runtime.handle().clone(); + + // The dedicated thread reports whether startup succeeded before it begins + // polling the task future. + let (startup_tx, startup_rx) = mpsc::sync_channel(1); + + if let Err(err) = utils::thread::try_spawn(executor.thread_stack_size, move || { + let result = cpu.map_or(Ok(()), utils::thread::pin_to_cpu); + let pinned = result.is_ok(); + startup_tx + .send(result) + .expect("startup receiver dropped unexpectedly"); + if pinned { + runtime_handle.block_on(future); + } + }) { + // Thread startup failed before the task future could be polled, so + // abort the handle to release its metrics and supervision state. + handle.abort(); + panic!("failed to spawn thread: {err}"); + } + + // Wait synchronously so dedicated-thread creation and CPU pinning failures + // are surfaced to the spawn caller rather than the spawned task. + match startup_rx.recv() { + Ok(Ok(())) => {} + Ok(Err(err)) => { + // Pinning failed before the task future started running. + handle.abort(); + match cpu { + Some(cpu) => panic!("failed to pin task to cpu {cpu}: {err}"), + None => unreachable!(), + } + } + Err(_) => { + // The dedicated thread exited before reporting startup status. + handle.abort(); + panic!("failed to start dedicated task"); + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/runtime/src/utils/thread.rs b/runtime/src/utils/thread.rs index 0cf705308c4..ca5ec111155 100644 --- a/runtime/src/utils/thread.rs +++ b/runtime/src/utils/thread.rs @@ -100,16 +100,27 @@ const fn system_thread_stack_size_impl() -> Option { None } -/// Spawns a thread with an explicit stack size. +/// Attempts to spawn a thread with an explicit stack size. +pub(crate) fn try_spawn(stack_size: usize, f: F) -> std::io::Result> +where + F: FnOnce() -> T + Send + 'static, + T: Send + 'static, +{ + thread::Builder::new().stack_size(stack_size).spawn(f) +} + +/// Spawns a thread with an explicit stack size, panicking if thread creation fails. +/// +/// # Panics +/// +/// Panics if the thread cannot be created. +#[cfg(any(feature = "iouring-storage", feature = "iouring-network"))] pub(crate) fn spawn(stack_size: usize, f: F) -> thread::JoinHandle where F: FnOnce() -> T + Send + 'static, T: Send + 'static, { - thread::Builder::new() - .stack_size(stack_size) - .spawn(f) - .expect("failed to spawn thread") + try_spawn(stack_size, f).expect("failed to spawn thread") } #[cfg(target_os = "linux")] @@ -180,12 +191,8 @@ pub fn available_cpus() -> Vec { } /// Pins the current thread to the given logical CPU id. -/// -/// # Panics -/// -/// Panics if `sched_setaffinity` fails. #[cfg(target_os = "linux")] -pub(crate) fn pin_to_cpu(cpu: usize) { +pub(crate) fn pin_to_cpu(cpu: usize) -> Result<(), std::io::Error> { let word_bits = libc::c_ulong::BITS as usize; let words = (cpu / word_bits) .checked_add(1) @@ -200,25 +207,23 @@ pub(crate) fn pin_to_cpu(cpu: usize) { let result = unsafe { libc::syscall(libc::SYS_sched_setaffinity, 0, cpusetsize, mask.as_ptr()) }; if result == 0 { - return; + return Ok(()); } let err = std::io::Error::last_os_error(); match err.raw_os_error() { Some(libc::EINTR) => continue, - _ => panic!("sched_setaffinity failed for cpu {cpu}: {err}"), + _ => return Err(err), } } } /// Pins the current thread to the given logical CPU id. -/// -/// # Panics -/// -/// Always panics on non-Linux platforms, where CPU pinning is unavailable. #[cfg(not(target_os = "linux"))] -pub(crate) fn pin_to_cpu(_cpu: usize) { - panic!("cpu pinning is not available on this platform") +pub(crate) fn pin_to_cpu(_cpu: usize) -> Result<(), std::io::Error> { + Err(std::io::Error::other( + "cpu pinning is not available on this platform", + )) } #[cfg(test)] From 2ee66b05b0b13532f971fe8c15398412cef309dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Wed, 22 Apr 2026 15:24:19 +0100 Subject: [PATCH 18/35] [runtime] fix large cpu oom --- runtime/src/lib.rs | 13 ++++++++++++- runtime/src/utils/mod.rs | 4 ++-- runtime/src/utils/thread.rs | 13 ++++++++++++- 3 files changed, 26 insertions(+), 4 deletions(-) diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index 951330fb18e..cbdc76e10f1 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -224,7 +224,8 @@ stability_scope!(BETA { /// # Spawn Configuration /// /// When a context is cloned (either via [`Clone::clone`] or [`Metrics::with_label`]) or provided via - /// [`Spawner::spawn`], any configuration made via [`Spawner::dedicated`] or [`Spawner::shared`] is reset. + /// [`Spawner::spawn`], any configuration made via [`Spawner::dedicated`], [`Spawner::pinned`], or + /// [`Spawner::shared`] is reset. /// /// Child tasks should assume they start from a clean configuration without needing to inspect how their /// parent was configured. @@ -4025,6 +4026,16 @@ mod tests { }); } + #[cfg(target_os = "linux")] + #[test] + #[should_panic(expected = "failed to pin task to cpu")] + fn test_tokio_spawn_pinned_huge_cpu() { + let executor = tokio::Runner::default(); + executor.start(|context| async move { + context.pinned(usize::MAX).spawn(|_| async {}); + }); + } + #[cfg(target_os = "linux")] #[test] fn test_tokio_spawn_pinned_invalid_cpu_cleans_metrics() { diff --git a/runtime/src/utils/mod.rs b/runtime/src/utils/mod.rs index 7236f1c37a8..ea75371581f 100644 --- a/runtime/src/utils/mod.rs +++ b/runtime/src/utils/mod.rs @@ -33,8 +33,8 @@ pub(crate) mod supervision; /// The execution mode of a task. #[derive(Copy, Clone, Debug)] pub enum Execution { - /// Task runs on a dedicated thread, optionally pinned to a logical CPU id. - /// CPU pinning is currently Linux only and a no-op on other platforms. + /// Task runs on a dedicated thread, optionally with a logical CPU id + /// requested for pinning. Dedicated(Option), /// Task runs on the shared executor. `true` marks short blocking work that should /// use the runtime's blocking-friendly pool. diff --git a/runtime/src/utils/thread.rs b/runtime/src/utils/thread.rs index ca5ec111155..7b5683dd11d 100644 --- a/runtime/src/utils/thread.rs +++ b/runtime/src/utils/thread.rs @@ -7,6 +7,13 @@ use std::{env, sync::OnceLock, thread}; /// See . const RUST_DEFAULT_THREAD_STACK_SIZE: usize = 2 * 1024 * 1024; +/// Upper bound for affinity-mask probing and construction. +/// +/// This keeps bogus CPU ids from forcing arbitrarily large allocations while +/// still leaving ample room beyond realistic machine sizes. +#[cfg(target_os = "linux")] +const MAX_AFFINITY_CPUS: usize = 1 << 20; + /// Returns the value of the `RUST_MIN_STACK` environment variable, if set. fn rust_min_stack() -> Option { env::var_os("RUST_MIN_STACK").and_then(|s| s.to_str().and_then(|s| s.parse().ok())) @@ -151,7 +158,7 @@ fn affinity_mask() -> Option<(Vec, usize)> { Some(libc::EINVAL) => { // Kernels with larger affinity masks require probing with a larger buffer. words = words.checked_mul(2)?; - if words.checked_mul(word_bits)? > 1 << 20 { + if words.checked_mul(word_bits)? > MAX_AFFINITY_CPUS { return None; } } @@ -193,6 +200,10 @@ pub fn available_cpus() -> Vec { /// Pins the current thread to the given logical CPU id. #[cfg(target_os = "linux")] pub(crate) fn pin_to_cpu(cpu: usize) -> Result<(), std::io::Error> { + if cpu >= MAX_AFFINITY_CPUS { + return Err(std::io::Error::from_raw_os_error(libc::EINVAL)); + } + let word_bits = libc::c_ulong::BITS as usize; let words = (cpu / word_bits) .checked_add(1) From 1d33fe6db2771e08c4cae1a1fe2cb180849d5baa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Wed, 22 Apr 2026 15:37:41 +0100 Subject: [PATCH 19/35] [runtime] cleanup available_cpus --- runtime/src/utils/thread.rs | 53 +++++++++++++++++++++---------------- 1 file changed, 30 insertions(+), 23 deletions(-) diff --git a/runtime/src/utils/thread.rs b/runtime/src/utils/thread.rs index 7b5683dd11d..0e6a42504fe 100644 --- a/runtime/src/utils/thread.rs +++ b/runtime/src/utils/thread.rs @@ -130,13 +130,21 @@ where try_spawn(stack_size, f).expect("failed to spawn thread") } +/// Returns the logical CPU ids currently allowed for the calling thread. +/// +/// This queries the calling thread's affinity mask via `sched_getaffinity` and +/// returns an empty vector if that query fails. #[cfg(target_os = "linux")] -fn affinity_mask() -> Option<(Vec, usize)> { +pub fn available_cpus() -> Vec { let word_bits = libc::c_ulong::BITS as usize; let mut words = 1usize; - loop { + + // Probe `sched_getaffinity` with an exponentially growing buffer until the + // kernel either accepts it or reports a non-retryable error. + let (mask, bytes) = loop { let mut mask = vec![0 as libc::c_ulong; words]; let cpusetsize = std::mem::size_of_val(mask.as_slice()); + // SAFETY: `mask` points to writable storage for `cpusetsize` bytes, and // `pid == 0` targets the calling thread as documented by the syscall. let result = unsafe { @@ -147,42 +155,41 @@ fn affinity_mask() -> Option<(Vec, usize)> { mask.as_mut_ptr(), ) }; + if result >= 0 { - let bytes = result as usize; - return Some((mask, bytes)); + break (mask, result as usize); } let err = std::io::Error::last_os_error(); match err.raw_os_error() { Some(libc::EINTR) => continue, Some(libc::EINVAL) => { - // Kernels with larger affinity masks require probing with a larger buffer. - words = words.checked_mul(2)?; - if words.checked_mul(word_bits)? > MAX_AFFINITY_CPUS { - return None; + // Kernels with larger affinity masks require probing with a + // larger buffer. Cap the probe size so invalid environments + // cannot force unbounded growth. + words = match words.checked_mul(2) { + Some(words) => words, + None => return Vec::new(), + }; + if words + .checked_mul(word_bits) + .is_none_or(|bits| bits > MAX_AFFINITY_CPUS) + { + return Vec::new(); } } - _ => return None, + _ => return Vec::new(), } - } -} - -/// Returns the logical CPU ids currently allowed for the calling thread. -/// -/// On Linux this queries the calling thread's affinity mask via `sched_getaffinity`. -/// On other platforms, or if the affinity mask cannot be queried, it returns an -/// empty vector. -#[cfg(target_os = "linux")] -pub fn available_cpus() -> Vec { - let Some((mask, bytes)) = affinity_mask() else { - return Vec::new(); }; - let word_bits = libc::c_ulong::BITS as usize; + let mut cpus = Vec::new(); + + // `sched_getaffinity` reports how many bytes of the mask are meaningful. + // Walk that returned bitset and collect the enabled logical CPU ids. for cpu in 0..(bytes * 8) { let index = cpu / word_bits; let offset = cpu % word_bits; - if index < mask.len() && (mask[index] & ((1 as libc::c_ulong) << offset)) != 0 { + if (mask[index] & ((1 as libc::c_ulong) << offset)) != 0 { cpus.push(cpu); } } From d6bdfdfb8f71ade2a8c444d19abc2351124c473a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Wed, 22 Apr 2026 16:15:00 +0100 Subject: [PATCH 20/35] [runtime] cleanup --- runtime/src/deterministic.rs | 5 ++ runtime/src/lib.rs | 127 +++++++++++++++++++++++++++++------ runtime/src/tokio/runtime.rs | 26 +++++-- runtime/src/utils/cell.rs | 5 ++ runtime/src/utils/mod.rs | 2 - runtime/src/utils/thread.rs | 80 +++++++++++++++------- 6 files changed, 197 insertions(+), 48 deletions(-) diff --git a/runtime/src/deterministic.rs b/runtime/src/deterministic.rs index 35c8cd17e62..367210f90cf 100644 --- a/runtime/src/deterministic.rs +++ b/runtime/src/deterministic.rs @@ -71,6 +71,7 @@ use commonware_utils::{ hex, sync::{Mutex, RwLock}, time::SYSTEM_TIME_PRECISION, + vec::NonEmptyVec, SystemTimeExt, }; #[cfg(feature = "external")] @@ -1133,6 +1134,10 @@ impl Context { } impl crate::Spawner for Context { + fn available_cpus(&self) -> Option> { + None + } + fn dedicated(mut self) -> Self { self.execution = Execution::Dedicated(None); self diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index cbdc76e10f1..65fe08dce43 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -46,6 +46,7 @@ stability_scope!(BETA, cfg(not(target_arch = "wasm32")) { stability_scope!(BETA { use commonware_macros::select; use commonware_parallel::{Rayon, ThreadPool}; + use commonware_utils::vec::NonEmptyVec; use iobuf::PoolError; use prometheus_client::registry::Metric; use rayon::ThreadPoolBuildError; @@ -161,6 +162,11 @@ stability_scope!(BETA { /// Interface that any task scheduler must implement to spawn tasks. pub trait Spawner: Clone + Send + Sync + 'static { + /// Return the logical CPU ids this runtime makes available for [`Spawner::pinned`] placements. + /// + /// Returns `None` if the runtime does not support CPU pinning. + fn available_cpus(&self) -> Option>; + /// Return a [`Spawner`] that schedules tasks onto the runtime's shared executor. /// /// Set `blocking` to `true` when the task may hold the thread for a short, blocking operation. @@ -182,7 +188,8 @@ stability_scope!(BETA { /// Return a [`Spawner`] that runs tasks on a dedicated thread pinned to the given CPU. /// /// The `cpu` value is a Linux logical CPU id used with `sched_setaffinity`. - /// Use [`available_cpus`] to query the current thread's allowed CPU ids. + /// Use [`Spawner::available_cpus`] to query the runtime's CPU ids for placement + /// decisions rather than the current thread's live affinity mask. /// /// This only configures the next spawned task. Runtimes that implement CPU pinning /// perform validation and the pinning attempt when [`Spawner::spawn`] starts that task. @@ -191,8 +198,8 @@ stability_scope!(BETA { /// /// # Panics /// - /// [`Spawner::spawn`] may panic if CPU pinning is unavailable for the runtime, if `cpu` - /// is not in the current affinity mask, or if pinning fails when the task starts. + /// [`Spawner::spawn`] may panic if CPU pinning is unavailable for the runtime or if the + /// operating system rejects `cpu` when the task starts. fn pinned(self, cpu: usize) -> Self; /// Spawn a task with the current context. @@ -1757,7 +1764,10 @@ mod tests { R::Context: Spawner, { runner.start(|context| async move { - let cpu = crate::available_cpus().into_iter().next().unwrap_or(0); + let cpu = context + .available_cpus() + .map(|cpus| *cpus.first()) + .unwrap_or(0); let handle = context.pinned(cpu).spawn(|_| async move { 42 }); assert!(matches!(handle.await, Ok(42))); }); @@ -3946,7 +3956,7 @@ mod tests { // Verify that pinned implies dedicated. let executor = tokio::Runner::default(); executor.start(|context| async move { - let cpu = crate::available_cpus().into_iter().next().unwrap_or(0); + let cpu = *context.available_cpus().unwrap().first(); let root_thread = std::thread::current().id(); let task_thread = context .pinned(cpu) @@ -3963,10 +3973,9 @@ mod tests { fn test_tokio_spawn_pinned_correct_cpu() { // Verify that a pinned task is actually running on the expected CPU, // for every allowed CPU id. - let cpus = crate::available_cpus(); let executor = tokio::Runner::default(); executor.start(|context| async move { - for cpu in cpus { + for cpu in context.available_cpus().unwrap() { let actual = context .clone() .pinned(cpu) @@ -3987,9 +3996,9 @@ mod tests { fn test_tokio_spawn_pinned_same_cpu() { // Verify that two separate tasks pinned to the same CPU run on // different threads but report the same CPU. - let cpu = crate::available_cpus().into_iter().last().unwrap(); let executor = tokio::Runner::default(); executor.start(|context| async move { + let cpu = *context.available_cpus().unwrap().last(); let t1 = context.clone().pinned(cpu).spawn(|_| async move { // SAFETY: `sched_getcpu` is a read-only query with no // preconditions. @@ -4010,18 +4019,94 @@ mod tests { }); } + #[cfg(target_os = "linux")] + #[test] + fn test_tokio_available_cpus_stable_inside_pinned_task() { + // `Spawner::available_cpus` should keep reporting the runtime's + // placement set even when the current task narrows its own thread + // affinity with `.pinned(...)`. + let executor = tokio::Runner::default(); + executor.start(|context| async move { + let available = context.available_cpus().unwrap(); + let cpu = *available.first(); + + context + .pinned(cpu) + .spawn(move |context| { + let available = available.clone(); + async move { + // The public runtime API stays stable, while the + // thread-local helper reflects the live pinned mask. + assert_eq!(context.available_cpus(), Some(available.clone())); + assert_eq!(utils::thread::available_cpus(), Some(NonEmptyVec::new(cpu)),); + } + }) + .await + .unwrap(); + }); + } + + #[cfg(target_os = "linux")] + #[test] + fn test_tokio_spawn_dedicated_restores_baseline_affinity() { + // An unpinned dedicated child spawned from within pinned tasks should + // reset back to the runtime's placement set instead of inheriting the + // parent thread's narrowed affinity mask. + let executor = tokio::Runner::default(); + executor.start(|context| async move { + let available = context.available_cpus().unwrap(); + if available.len().get() < 2 { + return; + } + + let outer_cpu = available[0]; + let inner_cpu = available[1]; + context + .pinned(outer_cpu) + .spawn(move |context| { + let available = available.clone(); + async move { + context + .pinned(inner_cpu) + .spawn(move |context| { + let available = available.clone(); + async move { + assert_eq!( + utils::thread::available_cpus(), + Some(NonEmptyVec::new(inner_cpu)), + ); + // The dedicated child is unpinned, so it + // should restore the runtime baseline mask. + let dedicated_allowed = context + .dedicated() + .spawn(|_| async move { utils::thread::available_cpus() }) + .await + .unwrap(); + assert_eq!(dedicated_allowed, Some(available.clone())); + } + }) + .await + .unwrap() + } + }) + .await + .unwrap(); + }); + } + #[cfg(target_os = "linux")] #[test] #[should_panic(expected = "failed to pin task to cpu")] fn test_tokio_spawn_pinned_invalid_cpu() { - // Pinning to a CPU outside the current affinity mask panics when the task starts. - let invalid_cpu = crate::available_cpus() - .into_iter() - .last() - .map(|cpu| cpu + 1) - .unwrap(); + // Pinning to a CPU outside the runtime baseline CPU set panics when the task starts. let executor = tokio::Runner::default(); executor.start(|context| async move { + let invalid_cpu = context + .available_cpus() + .as_ref() + .map(|cpus| *cpus.last()) + .map(|cpu| cpu + 1); + let invalid_cpu = invalid_cpu.unwrap(); context.pinned(invalid_cpu).spawn(|_| async {}); }); } @@ -4039,13 +4124,17 @@ mod tests { #[cfg(target_os = "linux")] #[test] fn test_tokio_spawn_pinned_invalid_cpu_cleans_metrics() { - let invalid_cpu = crate::available_cpus() - .into_iter() - .last() - .map(|cpu| cpu + 1) - .unwrap(); + // A pinning failure happens during spawn-time startup, before the task + // future is polled. That failure must still clean up the task's + // metrics and supervision state. let executor = tokio::Runner::default(); executor.start(|context| async move { + let invalid_cpu = context + .available_cpus() + .as_ref() + .map(|cpus| *cpus.last()) + .map(|cpu| cpu + 1); + let invalid_cpu = invalid_cpu.unwrap(); let context = context.with_label("pinned_invalid_cpu"); let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { context.clone().pinned(invalid_cpu).spawn(|_| async {}); diff --git a/runtime/src/tokio/runtime.rs b/runtime/src/tokio/runtime.rs index ced7d6c105b..2e3948731ce 100644 --- a/runtime/src/tokio/runtime.rs +++ b/runtime/src/tokio/runtime.rs @@ -26,7 +26,7 @@ use crate::{ use commonware_macros::{select, stability}; #[stability(BETA)] use commonware_parallel::ThreadPool; -use commonware_utils::{sync::Mutex, NZUsize}; +use commonware_utils::{sync::Mutex, vec::NonEmptyVec, NZUsize}; use futures::future::Either; use governor::clock::{Clock as GClock, ReasonablyRealtime}; use prometheus_client::{ @@ -335,6 +335,7 @@ pub struct Executor { shutdown: Mutex, panicker: Panicker, thread_stack_size: usize, + available_cpus: Option>, } /// Implementation of [crate::Runner] for the `tokio` runtime. @@ -480,6 +481,7 @@ impl crate::Runner for Runner { shutdown: Mutex::new(Stopper::default()), panicker, thread_stack_size: self.cfg.thread_stack_size, + available_cpus: utils::thread::available_cpus(), }); // Get metrics @@ -573,6 +575,10 @@ impl crate::Spawner for Context { self } + fn available_cpus(&self) -> Option> { + self.executor.available_cpus.clone() + } + fn pinned(mut self, cpu: usize) -> Self { self.execution = Execution::Dedicated(Some(cpu)); self @@ -897,6 +903,10 @@ impl crate::BufferPooler for Context { /// Spawn a task on a dedicated thread, optionally pinning it to `cpu`, and /// wait for startup to complete before returning. /// +/// When `cpu` is `None`, the dedicated thread restores the runtime's baseline +/// affinity mask before polling the task so it does not inherit a parent +/// task's temporary CPU pin. +/// /// # Panics /// /// Panics if the dedicated thread cannot be created, if pinning to `cpu` fails, @@ -908,18 +918,26 @@ where { // Ensure the task can access the tokio runtime. let runtime_handle = executor.runtime.handle().clone(); + let available_cpus = executor.available_cpus.clone(); // The dedicated thread reports whether startup succeeded before it begins // polling the task future. let (startup_tx, startup_rx) = mpsc::sync_channel(1); if let Err(err) = utils::thread::try_spawn(executor.thread_stack_size, move || { - let result = cpu.map_or(Ok(()), utils::thread::pin_to_cpu); - let pinned = result.is_ok(); + let result = cpu.map_or_else( + || { + available_cpus.as_ref().map_or(Ok(()), |available_cpus| { + utils::thread::set_cpu_affinity(available_cpus) + }) + }, + |cpu| utils::thread::set_cpu_affinity(&[cpu]), + ); + let started = result.is_ok(); startup_tx .send(result) .expect("startup receiver dropped unexpectedly"); - if pinned { + if started { runtime_handle.block_on(future); } }) { diff --git a/runtime/src/utils/cell.rs b/runtime/src/utils/cell.rs index a8a7ce07891..5137071741b 100644 --- a/runtime/src/utils/cell.rs +++ b/runtime/src/utils/cell.rs @@ -1,4 +1,5 @@ use crate::{signal, Error, Handle}; +use commonware_utils::vec::NonEmptyVec; use governor::clock::{Clock as GClock, ReasonablyRealtime}; use prometheus_client::registry::Metric; use rand::{CryptoRng, RngCore}; @@ -106,6 +107,10 @@ impl crate::Spawner for Cell where C: crate::Spawner, { + fn available_cpus(&self) -> Option> { + self.as_present().available_cpus() + } + fn dedicated(self) -> Self { Self::Present(self.into_present().dedicated()) } diff --git a/runtime/src/utils/mod.rs b/runtime/src/utils/mod.rs index ea75371581f..801840c913f 100644 --- a/runtime/src/utils/mod.rs +++ b/runtime/src/utils/mod.rs @@ -16,8 +16,6 @@ commonware_macros::stability_mod!(BETA, pub mod buffer); pub mod signal; #[cfg(not(target_arch = "wasm32"))] pub(crate) mod thread; -#[cfg(not(target_arch = "wasm32"))] -pub use thread::available_cpus; mod handle; pub use handle::Handle; diff --git a/runtime/src/utils/thread.rs b/runtime/src/utils/thread.rs index 0e6a42504fe..7d48211ee30 100644 --- a/runtime/src/utils/thread.rs +++ b/runtime/src/utils/thread.rs @@ -1,5 +1,6 @@ //! Helpers for resolving the configured thread stack size. +use commonware_utils::vec::NonEmptyVec; use std::{env, sync::OnceLock, thread}; /// Rust's default thread stack size. @@ -133,9 +134,9 @@ where /// Returns the logical CPU ids currently allowed for the calling thread. /// /// This queries the calling thread's affinity mask via `sched_getaffinity` and -/// returns an empty vector if that query fails. +/// returns `None` if that query fails. #[cfg(target_os = "linux")] -pub fn available_cpus() -> Vec { +pub(crate) fn available_cpus() -> Option> { let word_bits = libc::c_ulong::BITS as usize; let mut words = 1usize; @@ -167,18 +168,15 @@ pub fn available_cpus() -> Vec { // Kernels with larger affinity masks require probing with a // larger buffer. Cap the probe size so invalid environments // cannot force unbounded growth. - words = match words.checked_mul(2) { - Some(words) => words, - None => return Vec::new(), - }; + words = words.checked_mul(2)?; if words .checked_mul(word_bits) .is_none_or(|bits| bits > MAX_AFFINITY_CPUS) { - return Vec::new(); + return None; } } - _ => return Vec::new(), + _ => return None, } }; @@ -193,30 +191,35 @@ pub fn available_cpus() -> Vec { cpus.push(cpu); } } - cpus + cpus.try_into().ok() } /// Returns the logical CPU ids currently allowed for the calling thread. /// -/// Always returns an empty vector on non-Linux platforms. +/// Always returns `None` on non-Linux platforms. #[cfg(not(target_os = "linux"))] -pub fn available_cpus() -> Vec { - Vec::new() +pub(crate) fn available_cpus() -> Option> { + None } -/// Pins the current thread to the given logical CPU id. +/// Sets the current thread's affinity mask to the given logical CPU ids. #[cfg(target_os = "linux")] -pub(crate) fn pin_to_cpu(cpu: usize) -> Result<(), std::io::Error> { - if cpu >= MAX_AFFINITY_CPUS { +pub(crate) fn set_cpu_affinity(cpus: &[usize]) -> Result<(), std::io::Error> { + let Some(max_cpu) = cpus.iter().copied().max() else { + return Err(std::io::Error::from_raw_os_error(libc::EINVAL)); + }; + if max_cpu >= MAX_AFFINITY_CPUS { return Err(std::io::Error::from_raw_os_error(libc::EINVAL)); } let word_bits = libc::c_ulong::BITS as usize; - let words = (cpu / word_bits) + let words = (max_cpu / word_bits) .checked_add(1) .expect("cpu bitset size overflow"); let mut mask = vec![0 as libc::c_ulong; words]; - mask[cpu / word_bits] |= (1 as libc::c_ulong) << (cpu % word_bits); + for &cpu in cpus { + mask[cpu / word_bits] |= (1 as libc::c_ulong) << (cpu % word_bits); + } let cpusetsize = std::mem::size_of_val(mask.as_slice()); loop { @@ -236,9 +239,8 @@ pub(crate) fn pin_to_cpu(cpu: usize) -> Result<(), std::io::Error> { } } -/// Pins the current thread to the given logical CPU id. #[cfg(not(target_os = "linux"))] -pub(crate) fn pin_to_cpu(_cpu: usize) -> Result<(), std::io::Error> { +pub(crate) fn set_cpu_affinity(_cpus: &[usize]) -> Result<(), std::io::Error> { Err(std::io::Error::other( "cpu pinning is not available on this platform", )) @@ -250,14 +252,46 @@ mod tests { #[cfg(target_os = "linux")] #[test] - fn test_available_cpus() { - let cpus = available_cpus(); - assert!(!cpus.is_empty(), "expected at least one available CPU"); + fn test_available_cpus_linux() { + assert!( + available_cpus().is_some(), + "expected at least one available CPU" + ); + } + + #[cfg(target_os = "linux")] + #[test] + fn test_set_cpu_affinity_linux() { + std::thread::spawn(|| { + let cpus = available_cpus().unwrap(); + + let cpu = *cpus.first(); + set_cpu_affinity(&[cpu]).unwrap(); + assert_eq!(available_cpus().unwrap().into_vec(), vec![cpu]); + + let invalid_cpu = (0..=MAX_AFFINITY_CPUS) + .find(|candidate| !cpus.contains(candidate)) + .unwrap(); + assert!( + set_cpu_affinity(&[invalid_cpu]).is_err(), + "expected pinning to a disallowed CPU to fail", + ); + }) + .join() + .unwrap(); } #[cfg(not(target_os = "linux"))] #[test] fn test_available_cpus_non_linux() { - assert!(available_cpus().is_empty()); + assert!(available_cpus().is_none()); + } + + #[cfg(not(target_os = "linux"))] + #[test] + fn test_set_cpu_affinity_non_linux() { + for cpus in [vec![0], vec![1], vec![usize::MAX]] { + assert!(set_cpu_affinity(&cpus).is_err()); + } } } From 6ef8a13f3ed513df2bfaeade0932a3a25f801767 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Wed, 22 Apr 2026 16:18:06 +0100 Subject: [PATCH 21/35] [runtime] cleanup --- runtime/src/utils/thread.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/runtime/src/utils/thread.rs b/runtime/src/utils/thread.rs index 7d48211ee30..fa8ac5a8157 100644 --- a/runtime/src/utils/thread.rs +++ b/runtime/src/utils/thread.rs @@ -138,7 +138,7 @@ where #[cfg(target_os = "linux")] pub(crate) fn available_cpus() -> Option> { let word_bits = libc::c_ulong::BITS as usize; - let mut words = 1usize; + let mut words = 1; // Probe `sched_getaffinity` with an exponentially growing buffer until the // kernel either accepts it or reports a non-retryable error. @@ -169,12 +169,9 @@ pub(crate) fn available_cpus() -> Option> { // larger buffer. Cap the probe size so invalid environments // cannot force unbounded growth. words = words.checked_mul(2)?; - if words + words .checked_mul(word_bits) - .is_none_or(|bits| bits > MAX_AFFINITY_CPUS) - { - return None; - } + .map(|bits| bits > MAX_AFFINITY_CPUS)?; } _ => return None, } From 380231c469b33656ee085bb06ea57d82ffca3ace Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Wed, 22 Apr 2026 16:34:12 +0100 Subject: [PATCH 22/35] [runtime] cleanup --- runtime/src/utils/thread.rs | 38 +++++++++++++++++++++++-------------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/runtime/src/utils/thread.rs b/runtime/src/utils/thread.rs index fa8ac5a8157..a057bb98fd9 100644 --- a/runtime/src/utils/thread.rs +++ b/runtime/src/utils/thread.rs @@ -171,7 +171,7 @@ pub(crate) fn available_cpus() -> Option> { words = words.checked_mul(2)?; words .checked_mul(word_bits) - .map(|bits| bits > MAX_AFFINITY_CPUS)?; + .filter(|bits| *bits <= MAX_AFFINITY_CPUS)?; } _ => return None, } @@ -200,12 +200,15 @@ pub(crate) fn available_cpus() -> Option> { } /// Sets the current thread's affinity mask to the given logical CPU ids. +/// +/// Returns an error if `cpus` is empty, contains an unreasonably large CPU id, +/// or if the operating system rejects the affinity change. #[cfg(target_os = "linux")] pub(crate) fn set_cpu_affinity(cpus: &[usize]) -> Result<(), std::io::Error> { - let Some(max_cpu) = cpus.iter().copied().max() else { + let Some(max_cpu) = cpus.iter().max() else { return Err(std::io::Error::from_raw_os_error(libc::EINVAL)); }; - if max_cpu >= MAX_AFFINITY_CPUS { + if *max_cpu >= MAX_AFFINITY_CPUS { return Err(std::io::Error::from_raw_os_error(libc::EINVAL)); } @@ -236,16 +239,21 @@ pub(crate) fn set_cpu_affinity(cpus: &[usize]) -> Result<(), std::io::Error> { } } +/// Sets the current thread's affinity mask to the given logical CPU ids. +/// +/// Always returns an error since setting CPU affinity is not available on this platform. #[cfg(not(target_os = "linux"))] pub(crate) fn set_cpu_affinity(_cpus: &[usize]) -> Result<(), std::io::Error> { Err(std::io::Error::other( - "cpu pinning is not available on this platform", + "cpu affinity is not available on this platform", )) } #[cfg(test)] mod tests { use super::*; + #[cfg(target_os = "linux")] + use commonware_utils::vec::NonEmptyVec; #[cfg(target_os = "linux")] #[test] @@ -261,18 +269,22 @@ mod tests { fn test_set_cpu_affinity_linux() { std::thread::spawn(|| { let cpus = available_cpus().unwrap(); - let cpu = *cpus.first(); - set_cpu_affinity(&[cpu]).unwrap(); - assert_eq!(available_cpus().unwrap().into_vec(), vec![cpu]); + let pinned = NonEmptyVec::new(cpu); + set_cpu_affinity(&pinned).unwrap(); + assert_eq!(available_cpus(), Some(pinned)); - let invalid_cpu = (0..=MAX_AFFINITY_CPUS) - .find(|candidate| !cpus.contains(candidate)) - .unwrap(); + let invalid_cpu = cpus.last().checked_add(1).unwrap(); assert!( set_cpu_affinity(&[invalid_cpu]).is_err(), - "expected pinning to a disallowed CPU to fail", + "expected affinity to a disallowed CPU to fail", ); + + let err = set_cpu_affinity(&[]).unwrap_err(); + assert_eq!(err.raw_os_error(), Some(libc::EINVAL)); + + let err = set_cpu_affinity(&[MAX_AFFINITY_CPUS]).unwrap_err(); + assert_eq!(err.raw_os_error(), Some(libc::EINVAL)); }) .join() .unwrap(); @@ -287,8 +299,6 @@ mod tests { #[cfg(not(target_os = "linux"))] #[test] fn test_set_cpu_affinity_non_linux() { - for cpus in [vec![0], vec![1], vec![usize::MAX]] { - assert!(set_cpu_affinity(&cpus).is_err()); - } + assert!(set_cpu_affinity(&[0]).is_err()); } } From 61be5899cdb0475dcd6e110c322a990ccecfa9da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Wed, 22 Apr 2026 17:00:12 +0100 Subject: [PATCH 23/35] [runtime] cleanup tests --- runtime/src/lib.rs | 152 +++++++++++++++------------------------ runtime/src/utils/mod.rs | 2 +- 2 files changed, 59 insertions(+), 95 deletions(-) diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index 65fe08dce43..278dc2621a7 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -183,16 +183,18 @@ stability_scope!(BETA { /// shared executor. /// /// This is not the default behavior. See [`Spawner::shared`] for more information. + /// + /// # Panics + /// + /// [`Spawner::spawn`] may panic if the runtime cannot start the dedicated thread for that task. fn dedicated(self) -> Self; /// Return a [`Spawner`] that runs tasks on a dedicated thread pinned to the given CPU. /// - /// The `cpu` value is a Linux logical CPU id used with `sched_setaffinity`. - /// Use [`Spawner::available_cpus`] to query the runtime's CPU ids for placement - /// decisions rather than the current thread's live affinity mask. + /// Use [`Spawner::available_cpus`] to query the runtime's CPU ids for placement decisions. /// - /// This only configures the next spawned task. Runtimes that implement CPU pinning - /// perform validation and the pinning attempt when [`Spawner::spawn`] starts that task. + /// Runtimes that implement CPU pinning perform validation and the pinning attempt when + /// [`Spawner::spawn`] starts that task. /// /// Implies [`Spawner::dedicated`]. /// @@ -239,8 +241,8 @@ stability_scope!(BETA { /// /// # Panics /// - /// Panics if the runtime cannot start a task configured with [`Spawner::dedicated`], or - /// if a task configured with [`Spawner::pinned`] cannot be pinned when it starts running. + /// Panics if the runtime cannot start a task configured with [`Spawner::dedicated`], or if + /// a task configured with [`Spawner::pinned`] cannot be pinned when it starts running. /// These startup panics occur in the caller of [`Spawner::spawn`]. fn spawn(self, f: F) -> Handle where @@ -1763,12 +1765,12 @@ mod tests { where R::Context: Spawner, { - runner.start(|context| async move { + runner.start(|context| async { let cpu = context .available_cpus() .map(|cpus| *cpus.first()) .unwrap_or(0); - let handle = context.pinned(cpu).spawn(|_| async move { 42 }); + let handle = context.pinned(cpu).spawn(|_| async { 42 }); assert!(matches!(handle.await, Ok(42))); }); } @@ -3950,17 +3952,25 @@ mod tests { test_spawn_pinned(executor); } + #[cfg(not(target_os = "linux"))] + #[test] + #[should_panic(expected = "cpu pinning is not available on this platform")] + fn test_tokio_spawn_pinned() { + let executor = tokio::Runner::default(); + test_spawn_pinned(executor); + } + #[cfg(target_os = "linux")] #[test] fn test_tokio_spawn_pinned_dedicated_thread() { // Verify that pinned implies dedicated. let executor = tokio::Runner::default(); - executor.start(|context| async move { + executor.start(|context| async { let cpu = *context.available_cpus().unwrap().first(); let root_thread = std::thread::current().id(); let task_thread = context .pinned(cpu) - .spawn(|_| async move { std::thread::current().id() }) + .spawn(|_| async { std::thread::current().id() }) .await .unwrap(); // The task should run on a different thread than the root thread. @@ -3979,7 +3989,7 @@ mod tests { let actual = context .clone() .pinned(cpu) - .spawn(|_| async move { + .spawn(|_| async { // SAFETY: `sched_getcpu` is a read-only query with no // preconditions. unsafe { libc::sched_getcpu() as usize } @@ -3999,12 +4009,12 @@ mod tests { let executor = tokio::Runner::default(); executor.start(|context| async move { let cpu = *context.available_cpus().unwrap().last(); - let t1 = context.clone().pinned(cpu).spawn(|_| async move { + let t1 = context.clone().pinned(cpu).spawn(|_| async { // SAFETY: `sched_getcpu` is a read-only query with no // preconditions. (std::thread::current().id(), unsafe { libc::sched_getcpu() }) }); - let t2 = context.clone().pinned(cpu).spawn(|_| async move { + let t2 = context.clone().pinned(cpu).spawn(|_| async { // SAFETY: `sched_getcpu` is a read-only query with no // preconditions. (std::thread::current().id(), unsafe { libc::sched_getcpu() }) @@ -4026,20 +4036,17 @@ mod tests { // placement set even when the current task narrows its own thread // affinity with `.pinned(...)`. let executor = tokio::Runner::default(); - executor.start(|context| async move { + executor.start(|context| async { let available = context.available_cpus().unwrap(); let cpu = *available.first(); context .pinned(cpu) - .spawn(move |context| { - let available = available.clone(); - async move { - // The public runtime API stays stable, while the - // thread-local helper reflects the live pinned mask. - assert_eq!(context.available_cpus(), Some(available.clone())); - assert_eq!(utils::thread::available_cpus(), Some(NonEmptyVec::new(cpu)),); - } + .spawn(move |context| async move { + // The public runtime API stays stable, while the + // thread-local helper reflects the live pinned mask. + assert_eq!(context.available_cpus(), Some(available)); + assert_eq!(utils::thread::available_cpus(), Some(NonEmptyVec::new(cpu))); }) .await .unwrap(); @@ -4053,41 +4060,30 @@ mod tests { // reset back to the runtime's placement set instead of inheriting the // parent thread's narrowed affinity mask. let executor = tokio::Runner::default(); - executor.start(|context| async move { + executor.start(|context| async { let available = context.available_cpus().unwrap(); if available.len().get() < 2 { return; } - let outer_cpu = available[0]; - let inner_cpu = available[1]; + let pinned_cpu = available[0]; context - .pinned(outer_cpu) - .spawn(move |context| { - let available = available.clone(); - async move { - context - .pinned(inner_cpu) - .spawn(move |context| { - let available = available.clone(); - async move { - assert_eq!( - utils::thread::available_cpus(), - Some(NonEmptyVec::new(inner_cpu)), - ); - // The dedicated child is unpinned, so it - // should restore the runtime baseline mask. - let dedicated_allowed = context - .dedicated() - .spawn(|_| async move { utils::thread::available_cpus() }) - .await - .unwrap(); - assert_eq!(dedicated_allowed, Some(available.clone())); - } - }) - .await - .unwrap() - } + .pinned(pinned_cpu) + .spawn(move |context| async move { + assert_eq!( + utils::thread::available_cpus(), + Some(NonEmptyVec::new(pinned_cpu)), + ); + + // The dedicated child is unpinned, so it should restore the + // runtime baseline mask. + context + .dedicated() + .spawn(|_| async { + assert_eq!(utils::thread::available_cpus(), Some(available)); + }) + .await + .unwrap(); }) .await .unwrap(); @@ -4096,50 +4092,26 @@ mod tests { #[cfg(target_os = "linux")] #[test] - #[should_panic(expected = "failed to pin task to cpu")] fn test_tokio_spawn_pinned_invalid_cpu() { - // Pinning to a CPU outside the runtime baseline CPU set panics when the task starts. + // Pinning to a CPU outside the runtime placement set must panic during + // spawn-time startup, before the task future is polled, while still + // cleaning up the task's metrics and supervision state. let executor = tokio::Runner::default(); executor.start(|context| async move { let invalid_cpu = context .available_cpus() .as_ref() .map(|cpus| *cpus.last()) - .map(|cpu| cpu + 1); - let invalid_cpu = invalid_cpu.unwrap(); - context.pinned(invalid_cpu).spawn(|_| async {}); - }); - } - - #[cfg(target_os = "linux")] - #[test] - #[should_panic(expected = "failed to pin task to cpu")] - fn test_tokio_spawn_pinned_huge_cpu() { - let executor = tokio::Runner::default(); - executor.start(|context| async move { - context.pinned(usize::MAX).spawn(|_| async {}); - }); - } + .map(|cpu| cpu + 1) + .unwrap(); - #[cfg(target_os = "linux")] - #[test] - fn test_tokio_spawn_pinned_invalid_cpu_cleans_metrics() { - // A pinning failure happens during spawn-time startup, before the task - // future is polled. That failure must still clean up the task's - // metrics and supervision state. - let executor = tokio::Runner::default(); - executor.start(|context| async move { - let invalid_cpu = context - .available_cpus() - .as_ref() - .map(|cpus| *cpus.last()) - .map(|cpu| cpu + 1); - let invalid_cpu = invalid_cpu.unwrap(); let context = context.with_label("pinned_invalid_cpu"); - let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { + let panic = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { context.clone().pinned(invalid_cpu).spawn(|_| async {}); - })); - assert!(result.is_err(), "expected invalid pinned cpu to panic"); + })) + .err() + .unwrap(); + assert!(utils::extract_panic_message(&*panic).contains("failed to pin task to cpu")); while utils::count_running_tasks(&context, "pinned_invalid_cpu") > 0 { context.sleep(Duration::from_millis(10)).await; @@ -4147,14 +4119,6 @@ mod tests { }); } - #[cfg(not(target_os = "linux"))] - #[test] - #[should_panic(expected = "cpu pinning is not available on this platform")] - fn test_tokio_spawn_pinned_unavailable() { - let executor = tokio::Runner::default(); - test_spawn_pinned(executor); - } - #[test] fn test_tokio_spawn() { let runner = tokio::Runner::default(); diff --git a/runtime/src/utils/mod.rs b/runtime/src/utils/mod.rs index 801840c913f..83a12d65920 100644 --- a/runtime/src/utils/mod.rs +++ b/runtime/src/utils/mod.rs @@ -68,7 +68,7 @@ pub async fn reschedule() { Reschedule { yielded: false }.await } -fn extract_panic_message(err: &(dyn Any + Send)) -> String { +pub(crate) fn extract_panic_message(err: &(dyn Any + Send)) -> String { err.downcast_ref::<&str>().map_or_else( || { err.downcast_ref::() From aff04646e1125ababba0ef37ddc164f1a28f4b4a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Wed, 22 Apr 2026 17:05:37 +0100 Subject: [PATCH 24/35] [runtime] avoid clone --- runtime/src/deterministic.rs | 2 +- runtime/src/lib.rs | 10 +++++----- runtime/src/tokio/runtime.rs | 4 ++-- runtime/src/utils/cell.rs | 2 +- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/runtime/src/deterministic.rs b/runtime/src/deterministic.rs index 367210f90cf..7c397a40aa4 100644 --- a/runtime/src/deterministic.rs +++ b/runtime/src/deterministic.rs @@ -1134,7 +1134,7 @@ impl Context { } impl crate::Spawner for Context { - fn available_cpus(&self) -> Option> { + fn available_cpus(&self) -> Option<&NonEmptyVec> { None } diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index 278dc2621a7..4ac2cbb63a3 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -165,7 +165,7 @@ stability_scope!(BETA { /// Return the logical CPU ids this runtime makes available for [`Spawner::pinned`] placements. /// /// Returns `None` if the runtime does not support CPU pinning. - fn available_cpus(&self) -> Option>; + fn available_cpus(&self) -> Option<&NonEmptyVec>; /// Return a [`Spawner`] that schedules tasks onto the runtime's shared executor. /// @@ -3985,7 +3985,7 @@ mod tests { // for every allowed CPU id. let executor = tokio::Runner::default(); executor.start(|context| async move { - for cpu in context.available_cpus().unwrap() { + for cpu in context.available_cpus().cloned().unwrap() { let actual = context .clone() .pinned(cpu) @@ -4037,7 +4037,7 @@ mod tests { // affinity with `.pinned(...)`. let executor = tokio::Runner::default(); executor.start(|context| async { - let available = context.available_cpus().unwrap(); + let available = context.available_cpus().cloned().unwrap(); let cpu = *available.first(); context @@ -4045,7 +4045,7 @@ mod tests { .spawn(move |context| async move { // The public runtime API stays stable, while the // thread-local helper reflects the live pinned mask. - assert_eq!(context.available_cpus(), Some(available)); + assert_eq!(context.available_cpus(), Some(&available)); assert_eq!(utils::thread::available_cpus(), Some(NonEmptyVec::new(cpu))); }) .await @@ -4061,7 +4061,7 @@ mod tests { // parent thread's narrowed affinity mask. let executor = tokio::Runner::default(); executor.start(|context| async { - let available = context.available_cpus().unwrap(); + let available = context.available_cpus().cloned().unwrap(); if available.len().get() < 2 { return; } diff --git a/runtime/src/tokio/runtime.rs b/runtime/src/tokio/runtime.rs index 2e3948731ce..147a88941d0 100644 --- a/runtime/src/tokio/runtime.rs +++ b/runtime/src/tokio/runtime.rs @@ -575,8 +575,8 @@ impl crate::Spawner for Context { self } - fn available_cpus(&self) -> Option> { - self.executor.available_cpus.clone() + fn available_cpus(&self) -> Option<&NonEmptyVec> { + self.executor.available_cpus.as_ref() } fn pinned(mut self, cpu: usize) -> Self { diff --git a/runtime/src/utils/cell.rs b/runtime/src/utils/cell.rs index 5137071741b..a029d3c14c2 100644 --- a/runtime/src/utils/cell.rs +++ b/runtime/src/utils/cell.rs @@ -107,7 +107,7 @@ impl crate::Spawner for Cell where C: crate::Spawner, { - fn available_cpus(&self) -> Option> { + fn available_cpus(&self) -> Option<&NonEmptyVec> { self.as_present().available_cpus() } From d478b2a58d19f34b331d5e370552d77119a3ed51 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Wed, 22 Apr 2026 17:23:40 +0100 Subject: [PATCH 25/35] [runtime] cleanup --- runtime/src/tokio/runtime.rs | 27 +++++++++++++++------------ 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/runtime/src/tokio/runtime.rs b/runtime/src/tokio/runtime.rs index 147a88941d0..79eeb0a2a6b 100644 --- a/runtime/src/tokio/runtime.rs +++ b/runtime/src/tokio/runtime.rs @@ -918,26 +918,29 @@ where { // Ensure the task can access the tokio runtime. let runtime_handle = executor.runtime.handle().clone(); - let available_cpus = executor.available_cpus.clone(); + let thread_stack_size = executor.thread_stack_size; + let available_cpus = if cpu.is_none() { + executor.available_cpus.clone() + } else { + None + }; // The dedicated thread reports whether startup succeeded before it begins // polling the task future. let (startup_tx, startup_rx) = mpsc::sync_channel(1); - if let Err(err) = utils::thread::try_spawn(executor.thread_stack_size, move || { - let result = cpu.map_or_else( - || { - available_cpus.as_ref().map_or(Ok(()), |available_cpus| { - utils::thread::set_cpu_affinity(available_cpus) - }) - }, - |cpu| utils::thread::set_cpu_affinity(&[cpu]), - ); - let started = result.is_ok(); + if let Err(err) = utils::thread::try_spawn(thread_stack_size, move || { + let cpu = cpu.map(|cpu| [cpu]); + let cpus = cpu + .as_ref() + .map(|cpu| cpu.as_slice()) + .or(available_cpus.as_deref()); + let result = cpus.map_or(Ok(()), utils::thread::set_cpu_affinity); + let is_ok = result.is_ok(); startup_tx .send(result) .expect("startup receiver dropped unexpectedly"); - if started { + if is_ok { runtime_handle.block_on(future); } }) { From 2885b198aece40e37bad0be2fbbffe25a385d6ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Wed, 22 Apr 2026 17:24:21 +0100 Subject: [PATCH 26/35] [runtime] nit --- runtime/src/tokio/runtime.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/runtime/src/tokio/runtime.rs b/runtime/src/tokio/runtime.rs index 79eeb0a2a6b..2e518772cc2 100644 --- a/runtime/src/tokio/runtime.rs +++ b/runtime/src/tokio/runtime.rs @@ -570,15 +570,15 @@ impl Context { } impl crate::Spawner for Context { + fn available_cpus(&self) -> Option<&NonEmptyVec> { + self.executor.available_cpus.as_ref() + } + fn dedicated(mut self) -> Self { self.execution = Execution::Dedicated(None); self } - fn available_cpus(&self) -> Option<&NonEmptyVec> { - self.executor.available_cpus.as_ref() - } - fn pinned(mut self, cpu: usize) -> Self { self.execution = Execution::Dedicated(Some(cpu)); self From 6a9261ed99af386323b12e2aac581f3895135e13 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Wed, 22 Apr 2026 17:26:06 +0100 Subject: [PATCH 27/35] [runtime] nit --- runtime/src/tokio/runtime.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/runtime/src/tokio/runtime.rs b/runtime/src/tokio/runtime.rs index 2e518772cc2..9083e7aafb9 100644 --- a/runtime/src/tokio/runtime.rs +++ b/runtime/src/tokio/runtime.rs @@ -918,7 +918,6 @@ where { // Ensure the task can access the tokio runtime. let runtime_handle = executor.runtime.handle().clone(); - let thread_stack_size = executor.thread_stack_size; let available_cpus = if cpu.is_none() { executor.available_cpus.clone() } else { @@ -929,7 +928,7 @@ where // polling the task future. let (startup_tx, startup_rx) = mpsc::sync_channel(1); - if let Err(err) = utils::thread::try_spawn(thread_stack_size, move || { + if let Err(err) = utils::thread::try_spawn(executor.thread_stack_size, move || { let cpu = cpu.map(|cpu| [cpu]); let cpus = cpu .as_ref() From 72bc5ea21e3019295dc4c69110e2b9f2d7f7dd81 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Wed, 22 Apr 2026 17:27:39 +0100 Subject: [PATCH 28/35] [runtime] fix --- runtime/src/tokio/runtime.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/runtime/src/tokio/runtime.rs b/runtime/src/tokio/runtime.rs index 9083e7aafb9..2a8b8e365ae 100644 --- a/runtime/src/tokio/runtime.rs +++ b/runtime/src/tokio/runtime.rs @@ -954,11 +954,11 @@ where match startup_rx.recv() { Ok(Ok(())) => {} Ok(Err(err)) => { - // Pinning failed before the task future started running. + // Affinity setup failed before the task future started running. handle.abort(); match cpu { Some(cpu) => panic!("failed to pin task to cpu {cpu}: {err}"), - None => unreachable!(), + None => panic!("failed to restore dedicated task affinity: {err}"), } } Err(_) => { From 201683daa086ece779dcbd460641b8a1fdcf34a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Wed, 22 Apr 2026 17:33:23 +0100 Subject: [PATCH 29/35] [runtime] fix test --- runtime/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index 4ac2cbb63a3..04f9178bdd9 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -3954,7 +3954,7 @@ mod tests { #[cfg(not(target_os = "linux"))] #[test] - #[should_panic(expected = "cpu pinning is not available on this platform")] + #[should_panic(expected = "failed to pin task to cpu")] fn test_tokio_spawn_pinned() { let executor = tokio::Runner::default(); test_spawn_pinned(executor); From 412967a33edfc116f160b155d38a6588ec6f4a67 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Wed, 22 Apr 2026 18:43:21 +0100 Subject: [PATCH 30/35] [runtime] clippy --- runtime/src/utils/thread.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtime/src/utils/thread.rs b/runtime/src/utils/thread.rs index a057bb98fd9..76841fe2d79 100644 --- a/runtime/src/utils/thread.rs +++ b/runtime/src/utils/thread.rs @@ -195,7 +195,7 @@ pub(crate) fn available_cpus() -> Option> { /// /// Always returns `None` on non-Linux platforms. #[cfg(not(target_os = "linux"))] -pub(crate) fn available_cpus() -> Option> { +pub(crate) const fn available_cpus() -> Option> { None } From 8df709989e31863e21580c8e7e592b367d676d5a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Wed, 22 Apr 2026 19:26:36 +0100 Subject: [PATCH 31/35] [runtime] remove Spawner::available_cpus --- runtime/src/deterministic.rs | 5 --- runtime/src/lib.rs | 49 ++++++++++++--------------- runtime/src/tokio/runtime.rs | 22 +++--------- runtime/src/utils/cell.rs | 5 --- runtime/src/utils/thread.rs | 65 ++++++++++++++++++++++++++++++------ 5 files changed, 80 insertions(+), 66 deletions(-) diff --git a/runtime/src/deterministic.rs b/runtime/src/deterministic.rs index 7c397a40aa4..35c8cd17e62 100644 --- a/runtime/src/deterministic.rs +++ b/runtime/src/deterministic.rs @@ -71,7 +71,6 @@ use commonware_utils::{ hex, sync::{Mutex, RwLock}, time::SYSTEM_TIME_PRECISION, - vec::NonEmptyVec, SystemTimeExt, }; #[cfg(feature = "external")] @@ -1134,10 +1133,6 @@ impl Context { } impl crate::Spawner for Context { - fn available_cpus(&self) -> Option<&NonEmptyVec> { - None - } - fn dedicated(mut self) -> Self { self.execution = Execution::Dedicated(None); self diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index 04f9178bdd9..dc0a9b81b14 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -46,7 +46,6 @@ stability_scope!(BETA, cfg(not(target_arch = "wasm32")) { stability_scope!(BETA { use commonware_macros::select; use commonware_parallel::{Rayon, ThreadPool}; - use commonware_utils::vec::NonEmptyVec; use iobuf::PoolError; use prometheus_client::registry::Metric; use rayon::ThreadPoolBuildError; @@ -162,11 +161,6 @@ stability_scope!(BETA { /// Interface that any task scheduler must implement to spawn tasks. pub trait Spawner: Clone + Send + Sync + 'static { - /// Return the logical CPU ids this runtime makes available for [`Spawner::pinned`] placements. - /// - /// Returns `None` if the runtime does not support CPU pinning. - fn available_cpus(&self) -> Option<&NonEmptyVec>; - /// Return a [`Spawner`] that schedules tasks onto the runtime's shared executor. /// /// Set `blocking` to `true` when the task may hold the thread for a short, blocking operation. @@ -191,7 +185,7 @@ stability_scope!(BETA { /// Return a [`Spawner`] that runs tasks on a dedicated thread pinned to the given CPU. /// - /// Use [`Spawner::available_cpus`] to query the runtime's CPU ids for placement decisions. + /// Use [`crate::utils::thread::available_cpus`] to query valid CPU ids for placement decisions. /// /// Runtimes that implement CPU pinning perform validation and the pinning attempt when /// [`Spawner::spawn`] starts that task. @@ -900,6 +894,7 @@ mod tests { use commonware_utils::{ channel::{mpsc, oneshot}, sync::Mutex, + vec::NonEmptyVec, NZUsize, SystemTimeExt, }; use futures::{ @@ -1766,8 +1761,7 @@ mod tests { R::Context: Spawner, { runner.start(|context| async { - let cpu = context - .available_cpus() + let cpu = utils::thread::available_cpus() .map(|cpus| *cpus.first()) .unwrap_or(0); let handle = context.pinned(cpu).spawn(|_| async { 42 }); @@ -3966,7 +3960,7 @@ mod tests { // Verify that pinned implies dedicated. let executor = tokio::Runner::default(); executor.start(|context| async { - let cpu = *context.available_cpus().unwrap().first(); + let cpu = *utils::thread::available_cpus().unwrap().first(); let root_thread = std::thread::current().id(); let task_thread = context .pinned(cpu) @@ -3985,7 +3979,7 @@ mod tests { // for every allowed CPU id. let executor = tokio::Runner::default(); executor.start(|context| async move { - for cpu in context.available_cpus().cloned().unwrap() { + for cpu in utils::thread::available_cpus().unwrap() { let actual = context .clone() .pinned(cpu) @@ -4008,7 +4002,7 @@ mod tests { // different threads but report the same CPU. let executor = tokio::Runner::default(); executor.start(|context| async move { - let cpu = *context.available_cpus().unwrap().last(); + let cpu = *utils::thread::available_cpus().unwrap().last(); let t1 = context.clone().pinned(cpu).spawn(|_| async { // SAFETY: `sched_getcpu` is a read-only query with no // preconditions. @@ -4032,21 +4026,23 @@ mod tests { #[cfg(target_os = "linux")] #[test] fn test_tokio_available_cpus_stable_inside_pinned_task() { - // `Spawner::available_cpus` should keep reporting the runtime's - // placement set even when the current task narrows its own thread - // affinity with `.pinned(...)`. + // The runtime placement set should stay stable even when the current + // task narrows its own thread affinity with `.pinned(...)`. let executor = tokio::Runner::default(); executor.start(|context| async { - let available = context.available_cpus().cloned().unwrap(); + let available = utils::thread::available_cpus().unwrap(); let cpu = *available.first(); context .pinned(cpu) - .spawn(move |context| async move { - // The public runtime API stays stable, while the - // thread-local helper reflects the live pinned mask. - assert_eq!(context.available_cpus(), Some(&available)); - assert_eq!(utils::thread::available_cpus(), Some(NonEmptyVec::new(cpu))); + .spawn(move |_| async move { + // Placement queries stay stable, while the test-only helper + // reflects the live pinned mask. + assert_eq!(utils::thread::available_cpus(), Some(available)); + assert_eq!( + utils::thread::current_affinity_cpus(), + Some(NonEmptyVec::new(cpu)) + ); }) .await .unwrap(); @@ -4061,7 +4057,7 @@ mod tests { // parent thread's narrowed affinity mask. let executor = tokio::Runner::default(); executor.start(|context| async { - let available = context.available_cpus().cloned().unwrap(); + let available = utils::thread::available_cpus().unwrap(); if available.len().get() < 2 { return; } @@ -4071,8 +4067,8 @@ mod tests { .pinned(pinned_cpu) .spawn(move |context| async move { assert_eq!( - utils::thread::available_cpus(), - Some(NonEmptyVec::new(pinned_cpu)), + utils::thread::current_affinity_cpus(), + Some(NonEmptyVec::new(pinned_cpu)) ); // The dedicated child is unpinned, so it should restore the @@ -4080,7 +4076,7 @@ mod tests { context .dedicated() .spawn(|_| async { - assert_eq!(utils::thread::available_cpus(), Some(available)); + assert_eq!(utils::thread::current_affinity_cpus(), Some(available)); }) .await .unwrap(); @@ -4098,8 +4094,7 @@ mod tests { // cleaning up the task's metrics and supervision state. let executor = tokio::Runner::default(); executor.start(|context| async move { - let invalid_cpu = context - .available_cpus() + let invalid_cpu = utils::thread::available_cpus() .as_ref() .map(|cpus| *cpus.last()) .map(|cpu| cpu + 1) diff --git a/runtime/src/tokio/runtime.rs b/runtime/src/tokio/runtime.rs index 2a8b8e365ae..3aea8c196ba 100644 --- a/runtime/src/tokio/runtime.rs +++ b/runtime/src/tokio/runtime.rs @@ -26,7 +26,7 @@ use crate::{ use commonware_macros::{select, stability}; #[stability(BETA)] use commonware_parallel::ThreadPool; -use commonware_utils::{sync::Mutex, vec::NonEmptyVec, NZUsize}; +use commonware_utils::{sync::Mutex, NZUsize}; use futures::future::Either; use governor::clock::{Clock as GClock, ReasonablyRealtime}; use prometheus_client::{ @@ -335,7 +335,6 @@ pub struct Executor { shutdown: Mutex, panicker: Panicker, thread_stack_size: usize, - available_cpus: Option>, } /// Implementation of [crate::Runner] for the `tokio` runtime. @@ -481,7 +480,6 @@ impl crate::Runner for Runner { shutdown: Mutex::new(Stopper::default()), panicker, thread_stack_size: self.cfg.thread_stack_size, - available_cpus: utils::thread::available_cpus(), }); // Get metrics @@ -570,10 +568,6 @@ impl Context { } impl crate::Spawner for Context { - fn available_cpus(&self) -> Option<&NonEmptyVec> { - self.executor.available_cpus.as_ref() - } - fn dedicated(mut self) -> Self { self.execution = Execution::Dedicated(None); self @@ -918,23 +912,15 @@ where { // Ensure the task can access the tokio runtime. let runtime_handle = executor.runtime.handle().clone(); - let available_cpus = if cpu.is_none() { - executor.available_cpus.clone() - } else { - None - }; // The dedicated thread reports whether startup succeeded before it begins // polling the task future. let (startup_tx, startup_rx) = mpsc::sync_channel(1); if let Err(err) = utils::thread::try_spawn(executor.thread_stack_size, move || { - let cpu = cpu.map(|cpu| [cpu]); - let cpus = cpu - .as_ref() - .map(|cpu| cpu.as_slice()) - .or(available_cpus.as_deref()); - let result = cpus.map_or(Ok(()), utils::thread::set_cpu_affinity); + let result = cpu.map_or_else(utils::thread::reset_cpu_affinity, |cpu| { + utils::thread::set_cpu_affinity(&[cpu]) + }); let is_ok = result.is_ok(); startup_tx .send(result) diff --git a/runtime/src/utils/cell.rs b/runtime/src/utils/cell.rs index a029d3c14c2..a8a7ce07891 100644 --- a/runtime/src/utils/cell.rs +++ b/runtime/src/utils/cell.rs @@ -1,5 +1,4 @@ use crate::{signal, Error, Handle}; -use commonware_utils::vec::NonEmptyVec; use governor::clock::{Clock as GClock, ReasonablyRealtime}; use prometheus_client::registry::Metric; use rand::{CryptoRng, RngCore}; @@ -107,10 +106,6 @@ impl crate::Spawner for Cell where C: crate::Spawner, { - fn available_cpus(&self) -> Option<&NonEmptyVec> { - self.as_present().available_cpus() - } - fn dedicated(self) -> Self { Self::Present(self.into_present().dedicated()) } diff --git a/runtime/src/utils/thread.rs b/runtime/src/utils/thread.rs index 76841fe2d79..ba0938c306d 100644 --- a/runtime/src/utils/thread.rs +++ b/runtime/src/utils/thread.rs @@ -1,4 +1,4 @@ -//! Helpers for resolving the configured thread stack size. +//! Helpers for managing runtime-owned threads. use commonware_utils::vec::NonEmptyVec; use std::{env, sync::OnceLock, thread}; @@ -131,12 +131,15 @@ where try_spawn(stack_size, f).expect("failed to spawn thread") } -/// Returns the logical CPU ids currently allowed for the calling thread. +/// Returns the logical CPU ids enabled in `pid`'s affinity mask. /// -/// This queries the calling thread's affinity mask via `sched_getaffinity` and -/// returns `None` if that query fails. +/// `pid` uses the Linux `sched_getaffinity` semantics, so `0` targets the +/// calling thread and any other value names a specific thread id. +/// +/// Returns `None` if the affinity mask cannot be queried or would require an +/// unreasonably large probe buffer. #[cfg(target_os = "linux")] -pub(crate) fn available_cpus() -> Option> { +fn affinity_cpus(pid: libc::pid_t) -> Option> { let word_bits = libc::c_ulong::BITS as usize; let mut words = 1; @@ -147,11 +150,11 @@ pub(crate) fn available_cpus() -> Option> { let cpusetsize = std::mem::size_of_val(mask.as_slice()); // SAFETY: `mask` points to writable storage for `cpusetsize` bytes, and - // `pid == 0` targets the calling thread as documented by the syscall. + // `pid` names the thread whose affinity mask should be queried. let result = unsafe { libc::syscall( libc::SYS_sched_getaffinity, - 0, + pid, cpusetsize, mask.as_mut_ptr(), ) @@ -191,11 +194,31 @@ pub(crate) fn available_cpus() -> Option> { cpus.try_into().ok() } -/// Returns the logical CPU ids currently allowed for the calling thread. +/// Returns the logical CPU ids available for [`crate::Spawner::pinned`] placements. /// -/// Always returns `None` on non-Linux platforms. +/// Returns `None` if the process affinity mask cannot be queried. +#[cfg(target_os = "linux")] +pub fn available_cpus() -> Option> { + // SAFETY: `getpid` has no preconditions and returns the process leader's + // pid, which `sched_getaffinity` uses to query that thread's baseline mask. + let pid = unsafe { libc::getpid() }; + affinity_cpus(pid) +} + +/// Returns `None` because CPU pinning is not available on this platform. #[cfg(not(target_os = "linux"))] -pub(crate) const fn available_cpus() -> Option> { +#[commonware_macros::stability(BETA)] +pub const fn available_cpus() -> Option> { + None +} + +#[cfg(all(target_os = "linux", test))] +pub(crate) fn current_affinity_cpus() -> Option> { + affinity_cpus(0) +} + +#[cfg(all(not(target_os = "linux"), test))] +pub(crate) const fn current_affinity_cpus() -> Option> { None } @@ -249,6 +272,22 @@ pub(crate) fn set_cpu_affinity(_cpus: &[usize]) -> Result<(), std::io::Error> { )) } +/// Resets the current thread's affinity mask to [`available_cpus`]. +/// +/// Returns `Ok(())` if the baseline CPU set cannot be queried. +#[cfg(target_os = "linux")] +pub(crate) fn reset_cpu_affinity() -> Result<(), std::io::Error> { + available_cpus().map_or(Ok(()), |cpus| set_cpu_affinity(&cpus)) +} + +/// Resets the current thread's affinity mask to [`available_cpus`]. +/// +/// Always returns `Ok(())` since CPU affinity is not available on this platform. +#[cfg(not(target_os = "linux"))] +pub(crate) fn reset_cpu_affinity() -> Result<(), std::io::Error> { + Ok(()) +} + #[cfg(test)] mod tests { use super::*; @@ -272,7 +311,10 @@ mod tests { let cpu = *cpus.first(); let pinned = NonEmptyVec::new(cpu); set_cpu_affinity(&pinned).unwrap(); - assert_eq!(available_cpus(), Some(pinned)); + assert_eq!(current_affinity_cpus(), Some(pinned)); + + reset_cpu_affinity().unwrap(); + assert_eq!(current_affinity_cpus(), Some(cpus.clone())); let invalid_cpu = cpus.last().checked_add(1).unwrap(); assert!( @@ -300,5 +342,6 @@ mod tests { #[test] fn test_set_cpu_affinity_non_linux() { assert!(set_cpu_affinity(&[0]).is_err()); + assert!(reset_cpu_affinity().is_ok()); } } From 87f22f0aaf23a07e0f2152888f4b5945aea18efc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Wed, 22 Apr 2026 19:30:28 +0100 Subject: [PATCH 32/35] [runtime] export available_cpus --- runtime/src/lib.rs | 2 +- runtime/src/utils/mod.rs | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index dc0a9b81b14..59284840e65 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -185,7 +185,7 @@ stability_scope!(BETA { /// Return a [`Spawner`] that runs tasks on a dedicated thread pinned to the given CPU. /// - /// Use [`crate::utils::thread::available_cpus`] to query valid CPU ids for placement decisions. + /// Use [`crate::utils::available_cpus`] to query valid CPU ids for placement decisions. /// /// Runtimes that implement CPU pinning perform validation and the pinning attempt when /// [`Spawner::spawn`] starts that task. diff --git a/runtime/src/utils/mod.rs b/runtime/src/utils/mod.rs index 83a12d65920..d659e6a274e 100644 --- a/runtime/src/utils/mod.rs +++ b/runtime/src/utils/mod.rs @@ -16,6 +16,7 @@ commonware_macros::stability_mod!(BETA, pub mod buffer); pub mod signal; #[cfg(not(target_arch = "wasm32"))] pub(crate) mod thread; +pub use thread::available_cpus; mod handle; pub use handle::Handle; From 34c0cdcc80b62e2c519602347a821b81a7789a23 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Wed, 22 Apr 2026 19:42:12 +0100 Subject: [PATCH 33/35] [runtime] clippy --- runtime/src/lib.rs | 12 ++++++++---- runtime/src/utils/thread.rs | 18 ++++++------------ 2 files changed, 14 insertions(+), 16 deletions(-) diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index 59284840e65..431e1910f7e 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -891,10 +891,11 @@ mod tests { use crate::telemetry::traces::collector::TraceStorage; use bytes::Bytes; use commonware_macros::{select, test_collect_traces}; + #[cfg(target_os = "linux")] + use commonware_utils::vec::NonEmptyVec; use commonware_utils::{ channel::{mpsc, oneshot}, sync::Mutex, - vec::NonEmptyVec, NZUsize, SystemTimeExt, }; use futures::{ @@ -4040,7 +4041,7 @@ mod tests { // reflects the live pinned mask. assert_eq!(utils::thread::available_cpus(), Some(available)); assert_eq!( - utils::thread::current_affinity_cpus(), + utils::thread::tests::current_affinity_cpus(), Some(NonEmptyVec::new(cpu)) ); }) @@ -4067,7 +4068,7 @@ mod tests { .pinned(pinned_cpu) .spawn(move |context| async move { assert_eq!( - utils::thread::current_affinity_cpus(), + utils::thread::tests::current_affinity_cpus(), Some(NonEmptyVec::new(pinned_cpu)) ); @@ -4076,7 +4077,10 @@ mod tests { context .dedicated() .spawn(|_| async { - assert_eq!(utils::thread::current_affinity_cpus(), Some(available)); + assert_eq!( + utils::thread::tests::current_affinity_cpus(), + Some(available) + ); }) .await .unwrap(); diff --git a/runtime/src/utils/thread.rs b/runtime/src/utils/thread.rs index ba0938c306d..9d3c3fbbe29 100644 --- a/runtime/src/utils/thread.rs +++ b/runtime/src/utils/thread.rs @@ -207,21 +207,10 @@ pub fn available_cpus() -> Option> { /// Returns `None` because CPU pinning is not available on this platform. #[cfg(not(target_os = "linux"))] -#[commonware_macros::stability(BETA)] pub const fn available_cpus() -> Option> { None } -#[cfg(all(target_os = "linux", test))] -pub(crate) fn current_affinity_cpus() -> Option> { - affinity_cpus(0) -} - -#[cfg(all(not(target_os = "linux"), test))] -pub(crate) const fn current_affinity_cpus() -> Option> { - None -} - /// Sets the current thread's affinity mask to the given logical CPU ids. /// /// Returns an error if `cpus` is empty, contains an unreasonably large CPU id, @@ -289,11 +278,16 @@ pub(crate) fn reset_cpu_affinity() -> Result<(), std::io::Error> { } #[cfg(test)] -mod tests { +pub mod tests { use super::*; #[cfg(target_os = "linux")] use commonware_utils::vec::NonEmptyVec; + #[cfg(target_os = "linux")] + pub fn current_affinity_cpus() -> Option> { + affinity_cpus(0) + } + #[cfg(target_os = "linux")] #[test] fn test_available_cpus_linux() { From 2e18622109107b256d2e222ec87e86f3ddab5f30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Wed, 22 Apr 2026 19:51:06 +0100 Subject: [PATCH 34/35] [runtime] clippy --- runtime/src/utils/thread.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtime/src/utils/thread.rs b/runtime/src/utils/thread.rs index 9d3c3fbbe29..5412e1c35c7 100644 --- a/runtime/src/utils/thread.rs +++ b/runtime/src/utils/thread.rs @@ -273,7 +273,7 @@ pub(crate) fn reset_cpu_affinity() -> Result<(), std::io::Error> { /// /// Always returns `Ok(())` since CPU affinity is not available on this platform. #[cfg(not(target_os = "linux"))] -pub(crate) fn reset_cpu_affinity() -> Result<(), std::io::Error> { +pub(crate) const fn reset_cpu_affinity() -> Result<(), std::io::Error> { Ok(()) } From 1fb2e837c28a532b320faef3b90feeab77521f47 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Wed, 22 Apr 2026 23:42:18 +0100 Subject: [PATCH 35/35] [runtime] fix wasm --- runtime/src/utils/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/runtime/src/utils/mod.rs b/runtime/src/utils/mod.rs index d659e6a274e..660c084583b 100644 --- a/runtime/src/utils/mod.rs +++ b/runtime/src/utils/mod.rs @@ -16,6 +16,7 @@ commonware_macros::stability_mod!(BETA, pub mod buffer); pub mod signal; #[cfg(not(target_arch = "wasm32"))] pub(crate) mod thread; +#[cfg(not(target_arch = "wasm32"))] pub use thread::available_cpus; mod handle;