Skip to content
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
1b43392
[runtime] support pinning dedicated tasks to a core
andresilva Apr 7, 2026
fd20460
[runtime] add helper for available cores
andresilva Apr 7, 2026
8c076c1
[runtime] clippy
andresilva Apr 7, 2026
df22aa9
[runtime] warn once on pin_to_core failure
andresilva Apr 7, 2026
cc2321b
[runtime] clippy
andresilva Apr 7, 2026
53bff09
[runtime] tests
andresilva Apr 7, 2026
f00c82e
[runtime] panic on invalid core number
andresilva Apr 7, 2026
d3877f5
[runtime] test pinned panic
andresilva Apr 7, 2026
595bbcc
[runtime] panic eagerly on invalid pinned
andresilva Apr 7, 2026
a3b5f97
[runtime] fix test
andresilva Apr 7, 2026
a0d9699
[runtime] available_cores works on any unix
andresilva Apr 8, 2026
cd17564
[runtime] test available_cores
andresilva Apr 8, 2026
e6077ad
Merge branch 'main' into pr/3549
patrick-ogrady Apr 21, 2026
ab37233
[runtime] remove stale instrumented spawner method
patrick-ogrady Apr 21, 2026
bb4238b
Merge branch 'main' into andre/runtime-spawn-pinned
andresilva Apr 22, 2026
cc4a3b7
[runtime] pin cpus not cores
andresilva Apr 22, 2026
017c4c7
[runtime] panic on failed cpu pinning
andresilva Apr 22, 2026
64ffa2d
[runtime] clippy
andresilva Apr 22, 2026
eb5ac73
[runtime] cleanup
andresilva Apr 22, 2026
2ee66b0
[runtime] fix large cpu oom
andresilva Apr 22, 2026
1d33fe6
[runtime] cleanup available_cpus
andresilva Apr 22, 2026
d6bdfdf
[runtime] cleanup
andresilva Apr 22, 2026
6ef8a13
[runtime] cleanup
andresilva Apr 22, 2026
380231c
[runtime] cleanup
andresilva Apr 22, 2026
61be589
[runtime] cleanup tests
andresilva Apr 22, 2026
aff0464
[runtime] avoid clone
andresilva Apr 22, 2026
d478b2a
[runtime] cleanup
andresilva Apr 22, 2026
2885b19
[runtime] nit
andresilva Apr 22, 2026
6a9261e
[runtime] nit
andresilva Apr 22, 2026
72bc5ea
[runtime] fix
andresilva Apr 22, 2026
201683d
[runtime] fix test
andresilva Apr 22, 2026
412967a
[runtime] clippy
andresilva Apr 22, 2026
8df7099
[runtime] remove Spawner::available_cpus
andresilva Apr 22, 2026
87f22f0
[runtime] export available_cpus
andresilva Apr 22, 2026
34c0cdc
[runtime] clippy
andresilva Apr 22, 2026
2e18622
[runtime] clippy
andresilva Apr 22, 2026
1fb2e83
[runtime] fix wasm
andresilva Apr 22, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion runtime/src/deterministic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1134,7 +1134,12 @@ impl Context {

impl crate::Spawner for Context {
fn dedicated(mut self) -> Self {
self.execution = Execution::Dedicated;
self.execution = Execution::Dedicated(None);
self
}

fn pinned(mut self, core: usize) -> Self {
self.execution = Execution::Dedicated(Some(core));
Comment thread
patrick-ogrady marked this conversation as resolved.
Outdated
self
}

Expand Down
117 changes: 117 additions & 0 deletions runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,21 @@ stability_scope!(BETA {
/// This is not the default behavior. See [`Spawner::shared`] for more information.
fn dedicated(self) -> Self;

/// Return a [`Spawner`] that runs tasks on a dedicated thread pinned to the given core.
///
/// Core pinning is currently Linux only and a no-op on other platforms. Pinning may
/// silently fail in restricted environments (e.g. containers with cgroup CPU limits),
/// this method will still succeed but the thread will run unpinned.
///
/// Use [`available_cores`] to query the number of available CPUs.
///
/// Implies [`Spawner::dedicated`].
///
/// # Panics
///
/// Panics if `core` is greater than or equal to the number of available CPUs.
fn pinned(self, core: usize) -> Self;

/// Return a [`Spawner`] that instruments the next spawned task with the label of the spawning context.
fn instrumented(self) -> Self;

Expand Down Expand Up @@ -1686,6 +1701,16 @@ mod tests {
});
}

fn test_spawn_pinned<R: Runner>(runner: R)
where
R::Context: Spawner,
{
runner.start(|context| async move {
let handle = context.pinned(0).spawn(|_| async move { 42 });
assert!(matches!(handle.await, Ok(42)));
});
}

fn test_spawn<R: Runner>(runner: R)
where
R::Context: Spawner + Clock,
Expand Down Expand Up @@ -3317,6 +3342,12 @@ mod tests {
test_spawn_dedicated(executor);
}

#[test]
fn test_deterministic_spawn_pinned() {
let executor = deterministic::Runner::default();
test_spawn_pinned(executor);
}

#[test]
fn test_deterministic_spawn() {
let runner = deterministic::Runner::default();
Expand Down Expand Up @@ -3666,6 +3697,92 @@ mod tests {
test_spawn_dedicated(executor);
}

#[test]
fn test_tokio_spawn_pinned() {
let executor = tokio::Runner::default();
test_spawn_pinned(executor);
}

#[test]
fn test_tokio_spawn_pinned_dedicated_thread() {
// Verify that pinned implies dedicated.
let executor = tokio::Runner::default();
executor.start(|context| async move {
let root_thread = std::thread::current().id();
let task_thread = context
.pinned(0)
.spawn(|_| async move { std::thread::current().id() })
.await
.unwrap();
// The task should run on a different thread than the root thread.
assert_ne!(root_thread, task_thread);
});
}

#[cfg(target_os = "linux")]
#[test]
fn test_tokio_spawn_pinned_correct_core() {
// Verify that a pinned task is actually running on the expected core,
// for every available core.
let num_cores = crate::available_cores().unwrap();
let executor = tokio::Runner::default();
executor.start(|context| async move {
for core in 0..num_cores {
let actual = context
.clone()
.pinned(core)
.spawn(|_| async move {
// SAFETY: `sched_getcpu` is a read-only query with no
// preconditions.
unsafe { libc::sched_getcpu() as usize }
})
.await
.unwrap();
assert_eq!(actual, core);
}
});
}

#[cfg(target_os = "linux")]
#[test]
fn test_tokio_spawn_pinned_same_core() {
// Verify that two separate tasks pinned to the same core run on
// different threads but report the same CPU.
let executor = tokio::Runner::default();
executor.start(|context| async move {
let core = crate::available_cores().unwrap() - 1;
let t1 = context.clone().pinned(core).spawn(|_| async move {
// SAFETY: `sched_getcpu` is a read-only query with no
// preconditions.
(std::thread::current().id(), unsafe { libc::sched_getcpu() })
});
let t2 = context.clone().pinned(core).spawn(|_| async move {
// SAFETY: `sched_getcpu` is a read-only query with no
// preconditions.
(std::thread::current().id(), unsafe { libc::sched_getcpu() })
Comment thread
cursor[bot] marked this conversation as resolved.
});
let (r1, r2) = futures::future::join(t1, t2).await;
let (thread1, cpu1) = r1.unwrap();
let (thread2, cpu2) = r2.unwrap();
// Different dedicated threads.
assert_ne!(thread1, thread2);
// Same core.
assert_eq!(cpu1, cpu2);
});
}

#[cfg(target_os = "linux")]
#[test]
#[should_panic(expected = "out of range")]
fn test_tokio_spawn_pinned_invalid_core() {
// Pinning to a core beyond the available count panics eagerly.
let num_cores = crate::available_cores().unwrap();
let executor = tokio::Runner::default();
executor.start(|context| async move {
context.pinned(num_cores).spawn(|_| async {});
});
}

#[test]
fn test_tokio_spawn() {
let runner = tokio::Runner::default();
Expand Down
2 changes: 1 addition & 1 deletion runtime/src/telemetry/metrics/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl Label {
name,
kind: Kind::Task,
execution: match execution {
crate::Execution::Dedicated => Execution::Dedicated,
crate::Execution::Dedicated(_) => Execution::Dedicated,
crate::Execution::Shared(blocking) => {
if blocking {
Execution::SharedBlocking
Expand Down
19 changes: 17 additions & 2 deletions runtime/src/tokio/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,18 @@ impl Context {

impl crate::Spawner for Context {
fn dedicated(mut self) -> Self {
self.execution = Execution::Dedicated;
self.execution = Execution::Dedicated(None);
self
}

fn pinned(mut self, core: usize) -> Self {
if let Some(num_cores) = utils::thread::available_cores() {
assert!(
core < num_cores,
"core {core} out of range ({num_cores} available)"
);
}
self.execution = Execution::Dedicated(Some(core));
self
}

Expand Down Expand Up @@ -601,11 +612,15 @@ impl crate::Spawner for Context {
Arc::clone(&parent),
);

if matches!(past, Execution::Dedicated) {
if let Execution::Dedicated(core) = past {
utils::thread::spawn(executor.thread_stack_size, {
// Ensure the task can access the tokio runtime
let handle = executor.runtime.handle().clone();
move || {
// Pin before running any work on this thread
if let Some(core) = core {
utils::thread::pin_to_core(core);
}
Comment thread
cursor[bot] marked this conversation as resolved.
handle.block_on(f);
}
});
Expand Down
4 changes: 4 additions & 0 deletions runtime/src/utils/cell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ where
Self::Present(self.into_present().dedicated())
}

fn pinned(self, core: usize) -> Self {
Self::Present(self.into_present().pinned(core))
}

fn shared(self, blocking: bool) -> Self {
Self::Present(self.into_present().shared(blocking))
}
Expand Down
7 changes: 5 additions & 2 deletions runtime/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ commonware_macros::stability_mod!(BETA, pub mod buffer);
pub mod signal;
#[cfg(not(target_arch = "wasm32"))]
pub(crate) mod thread;
#[cfg(not(target_arch = "wasm32"))]
pub use thread::available_cores;

mod handle;
pub use handle::Handle;
Expand All @@ -31,8 +33,9 @@ pub(crate) mod supervision;
/// The execution mode of a task.
#[derive(Copy, Clone, Debug)]
pub enum Execution {
/// Task runs on a dedicated thread.
Dedicated,
/// Task runs on a dedicated thread, optionally pinned to a core. Core pinning is
/// currently Linux only and a no-op on other platforms.
Dedicated(Option<usize>),
/// Task runs on the shared executor. `true` marks short blocking work that should
/// use the runtime's blocking-friendly pool.
Shared(bool),
Expand Down
99 changes: 96 additions & 3 deletions runtime/src/utils/thread.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
//! Helpers for resolving the configured thread stack size.

#[cfg(target_os = "linux")]
use commonware_utils::sync::Once;
use std::{env, sync::OnceLock, thread};

/// Cached configured thread stack size.
static SYSTEM_THREAD_STACK_SIZE: OnceLock<usize> = OnceLock::new();

/// Rust's default thread stack size.
///
/// See <https://doc.rust-lang.org/std/thread/#stack-size>.
Expand Down Expand Up @@ -32,7 +31,10 @@ fn rust_min_stack() -> Option<usize> {
///
/// On other platforms, or if the platform-specific query fails, this falls back
/// to [RUST_DEFAULT_THREAD_STACK_SIZE].
///
/// The result is cached after the first call.
pub(crate) fn system_thread_stack_size() -> usize {
static SYSTEM_THREAD_STACK_SIZE: OnceLock<usize> = OnceLock::new();
*SYSTEM_THREAD_STACK_SIZE.get_or_init(|| {
rust_min_stack()
.or(system_thread_stack_size_impl())
Expand Down Expand Up @@ -111,3 +113,94 @@ where
.spawn(f)
.expect("failed to spawn thread")
}

/// Returns the number of available CPUs, or `None` if it cannot be determined.
///
/// The result is cached after the first call.
#[cfg(unix)]
pub fn available_cores() -> Option<usize> {
static CORES: OnceLock<Option<usize>> = OnceLock::new();
*CORES.get_or_init(|| {
// SAFETY: `sysconf(_SC_NPROCESSORS_ONLN)` is a read-only query with no
// preconditions.
let n = unsafe { libc::sysconf(libc::_SC_NPROCESSORS_ONLN) };
if n <= 0 {
None
} else {
Some(n as usize)
}
})
}

/// Returns the number of available CPUs, or `None` if it cannot be determined.
///
/// Always returns `None` on non-Unix platforms.
#[cfg(not(unix))]
pub const fn available_cores() -> Option<usize> {
None
}

/// Pins the current thread to the given core.
///
/// If the CPU count cannot be queried or `sched_setaffinity` fails, a warning
/// is logged once and the thread continues unpinned.
///
/// # Panics
///
/// Panics if `core` is greater than or equal to the number of available CPUs.
#[cfg(target_os = "linux")]
pub(crate) fn pin_to_core(core: usize) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you are designing your application with core pinning in mind, I wonder if there is some credence to just panicking if core pining fails?

For example, I suspect glommio just dies if it can't pin?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think in a lot of cases core pinning is mostly a performance optimization but not necessarily something that MUST happen, i.e. a best-effort attempt, since it doesn't incur any correctness issue if it doesn't happen. Maybe here we can just return a bool and that let's the user decide what to do? The issue is that in Context::pined(...) we have no way to expose this 🫤. I'm thinking mostly of a scenario where someone distributes a node binary, that tries to pin threads as a performance optimization, and then it suddenly panics because someone is running that (general) binary in a container.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My concern (maybe too pedantic) is that correct/smooth behavior does actually rely on pinning in the applications that want it (or else there is such a big performance hit it doesn't work well). 🤔

Copy link
Copy Markdown
Member Author

@andresilva andresilva Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe the behavior could be controlled by a runtime configuration option? IMO it should default to "best-effort", but can be set to strict in which case we panic if pinning fails.

static WARN_CPUS: Once = Once::new();
static WARN_AFFINITY: Once = Once::new();

let Some(num_cores) = available_cores() else {
WARN_CPUS.call_once(|| {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the idea that warning each time would become very noisy/preference would just be to continue?

Copy link
Copy Markdown
Member Author

@andresilva andresilva Apr 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, if this fails once, it will very likely just keep failing forever (i.e. the syscall just isn't allowed for some reason). I can remove and just warn every time though.

tracing::warn!("failed to query CPU count, skipping core pinning");
});
return;
};
assert!(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to assert in two locations?

Copy link
Copy Markdown
Member Author

@andresilva andresilva Apr 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we don't assert in Context::pinned(...) then the failure only happens at spawn time. That may be fine, but may also be a bit weird? Also the panic will happen in the new spawned thread, and won't necessarily take down the main thread (we can workaround that though if you think panicking at spawn time is better).

core < num_cores,
"core {core} out of range ({num_cores} available)"
);

// SAFETY: `cpu_set` is zeroed and then a single valid CPU index is set.
unsafe {
let mut cpu_set: libc::cpu_set_t = std::mem::zeroed();
libc::CPU_SET(core, &mut cpu_set);
let result = libc::sched_setaffinity(
0, // current thread
std::mem::size_of::<libc::cpu_set_t>(),
&cpu_set,
);
if result != 0 {
WARN_AFFINITY.call_once(|| {
tracing::warn!(core, "sched_setaffinity failed, skipping core pinning");
});
}
}
}

/// Pins the current thread to the given core.
///
/// No-op on non-Linux platforms.
#[cfg(not(target_os = "linux"))]
pub(crate) const fn pin_to_core(_core: usize) {}

#[cfg(test)]
mod tests {
use super::*;

#[cfg(unix)]
#[test]
fn test_available_cores() {
let n = available_cores().expect("available_cores returned None on Unix");
assert!(n >= 1, "expected at least 1 core, got {n}");
}

#[cfg(not(unix))]
#[test]
fn test_available_cores_non_unix() {
assert!(available_cores().is_none());
}
}
Loading