Skip to content
Open
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
1b43392
[runtime] support pinning dedicated tasks to a core
andresilva Apr 7, 2026
fd20460
[runtime] add helper for available cores
andresilva Apr 7, 2026
8c076c1
[runtime] clippy
andresilva Apr 7, 2026
df22aa9
[runtime] warn once on pin_to_core failure
andresilva Apr 7, 2026
cc2321b
[runtime] clippy
andresilva Apr 7, 2026
53bff09
[runtime] tests
andresilva Apr 7, 2026
f00c82e
[runtime] panic on invalid core number
andresilva Apr 7, 2026
d3877f5
[runtime] test pinned panic
andresilva Apr 7, 2026
595bbcc
[runtime] panic eagerly on invalid pinned
andresilva Apr 7, 2026
a3b5f97
[runtime] fix test
andresilva Apr 7, 2026
a0d9699
[runtime] available_cores works on any unix
andresilva Apr 8, 2026
cd17564
[runtime] test available_cores
andresilva Apr 8, 2026
e6077ad
Merge branch 'main' into pr/3549
patrick-ogrady Apr 21, 2026
ab37233
[runtime] remove stale instrumented spawner method
patrick-ogrady Apr 21, 2026
bb4238b
Merge branch 'main' into andre/runtime-spawn-pinned
andresilva Apr 22, 2026
cc4a3b7
[runtime] pin cpus not cores
andresilva Apr 22, 2026
017c4c7
[runtime] panic on failed cpu pinning
andresilva Apr 22, 2026
64ffa2d
[runtime] clippy
andresilva Apr 22, 2026
eb5ac73
[runtime] cleanup
andresilva Apr 22, 2026
2ee66b0
[runtime] fix large cpu oom
andresilva Apr 22, 2026
1d33fe6
[runtime] cleanup available_cpus
andresilva Apr 22, 2026
d6bdfdf
[runtime] cleanup
andresilva Apr 22, 2026
6ef8a13
[runtime] cleanup
andresilva Apr 22, 2026
380231c
[runtime] cleanup
andresilva Apr 22, 2026
61be589
[runtime] cleanup tests
andresilva Apr 22, 2026
aff0464
[runtime] avoid clone
andresilva Apr 22, 2026
d478b2a
[runtime] cleanup
andresilva Apr 22, 2026
2885b19
[runtime] nit
andresilva Apr 22, 2026
6a9261e
[runtime] nit
andresilva Apr 22, 2026
72bc5ea
[runtime] fix
andresilva Apr 22, 2026
201683d
[runtime] fix test
andresilva Apr 22, 2026
412967a
[runtime] clippy
andresilva Apr 22, 2026
8df7099
[runtime] remove Spawner::available_cpus
andresilva Apr 22, 2026
87f22f0
[runtime] export available_cpus
andresilva Apr 22, 2026
34c0cdc
[runtime] clippy
andresilva Apr 22, 2026
2e18622
[runtime] clippy
andresilva Apr 22, 2026
1fb2e83
[runtime] fix wasm
andresilva Apr 22, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion runtime/src/deterministic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ use commonware_utils::{
hex,
sync::{Mutex, RwLock},
time::SYSTEM_TIME_PRECISION,
vec::NonEmptyVec,
SystemTimeExt,
};
#[cfg(feature = "external")]
Expand Down Expand Up @@ -1133,8 +1134,17 @@ impl Context {
}

impl crate::Spawner for Context {
fn available_cpus(&self) -> Option<NonEmptyVec<usize>> {
None
}

fn dedicated(mut self) -> Self {
self.execution = Execution::Dedicated;
self.execution = Execution::Dedicated(None);
self
}

fn pinned(mut self, cpu: usize) -> Self {
self.execution = Execution::Dedicated(Some(cpu));
self
}

Expand Down
264 changes: 263 additions & 1 deletion runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ stability_scope!(BETA, cfg(not(target_arch = "wasm32")) {
stability_scope!(BETA {
use commonware_macros::select;
use commonware_parallel::{Rayon, ThreadPool};
use commonware_utils::vec::NonEmptyVec;
use iobuf::PoolError;
use prometheus_client::registry::Metric;
use rayon::ThreadPoolBuildError;
Expand Down Expand Up @@ -161,6 +162,11 @@ stability_scope!(BETA {

/// Interface that any task scheduler must implement to spawn tasks.
pub trait Spawner: Clone + Send + Sync + 'static {
/// Return the logical CPU ids this runtime makes available for [`Spawner::pinned`] placements.
///
/// Returns `None` if the runtime does not support CPU pinning.
fn available_cpus(&self) -> Option<NonEmptyVec<usize>>;

/// Return a [`Spawner`] that schedules tasks onto the runtime's shared executor.
///
/// Set `blocking` to `true` when the task may hold the thread for a short, blocking operation.
Expand All @@ -179,6 +185,23 @@ stability_scope!(BETA {
/// This is not the default behavior. See [`Spawner::shared`] for more information.
fn dedicated(self) -> Self;

/// Return a [`Spawner`] that runs tasks on a dedicated thread pinned to the given CPU.
///
/// The `cpu` value is a Linux logical CPU id used with `sched_setaffinity`.
/// Use [`Spawner::available_cpus`] to query the runtime's CPU ids for placement
/// decisions rather than the current thread's live affinity mask.
///
/// This only configures the next spawned task. Runtimes that implement CPU pinning
/// perform validation and the pinning attempt when [`Spawner::spawn`] starts that task.
///
/// Implies [`Spawner::dedicated`].
///
/// # Panics
///
/// [`Spawner::spawn`] may panic if CPU pinning is unavailable for the runtime or if the
/// operating system rejects `cpu` when the task starts.
fn pinned(self, cpu: usize) -> Self;

/// Spawn a task with the current context.
///
/// Unlike directly awaiting a future, the task starts running immediately even if the caller
Expand Down Expand Up @@ -208,10 +231,17 @@ stability_scope!(BETA {
/// # Spawn Configuration
///
/// When a context is cloned (either via [`Clone::clone`] or [`Metrics::with_label`]) or provided via
/// [`Spawner::spawn`], any configuration made via [`Spawner::dedicated`] or [`Spawner::shared`] is reset.
/// [`Spawner::spawn`], any configuration made via [`Spawner::dedicated`], [`Spawner::pinned`], or
/// [`Spawner::shared`] is reset.
///
/// Child tasks should assume they start from a clean configuration without needing to inspect how their
/// parent was configured.
///
/// # Panics
///
/// Panics if the runtime cannot start a task configured with [`Spawner::dedicated`], or
/// if a task configured with [`Spawner::pinned`] cannot be pinned when it starts running.
/// These startup panics occur in the caller of [`Spawner::spawn`].
fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
where
F: FnOnce(Self) -> Fut + Send + 'static,
Expand Down Expand Up @@ -1729,6 +1759,20 @@ mod tests {
});
}

fn test_spawn_pinned<R: Runner>(runner: R)
where
R::Context: Spawner,
{
runner.start(|context| async move {
let cpu = context
.available_cpus()
.map(|cpus| *cpus.first())
.unwrap_or(0);
let handle = context.pinned(cpu).spawn(|_| async move { 42 });
assert!(matches!(handle.await, Ok(42)));
});
}

fn test_spawn<R: Runner>(runner: R)
where
R::Context: Spawner + Clock,
Expand Down Expand Up @@ -3544,6 +3588,12 @@ mod tests {
test_spawn_dedicated(executor);
}

#[test]
fn test_deterministic_spawn_pinned() {
let executor = deterministic::Runner::default();
test_spawn_pinned(executor);
}

#[test]
fn test_deterministic_spawn() {
let runner = deterministic::Runner::default();
Expand Down Expand Up @@ -3893,6 +3943,218 @@ mod tests {
test_spawn_dedicated(executor);
}

#[cfg(target_os = "linux")]
#[test]
fn test_tokio_spawn_pinned() {
let executor = tokio::Runner::default();
test_spawn_pinned(executor);
}

#[cfg(target_os = "linux")]
#[test]
fn test_tokio_spawn_pinned_dedicated_thread() {
// Verify that pinned implies dedicated.
let executor = tokio::Runner::default();
executor.start(|context| async move {
let cpu = *context.available_cpus().unwrap().first();
let root_thread = std::thread::current().id();
let task_thread = context
.pinned(cpu)
.spawn(|_| async move { std::thread::current().id() })
.await
.unwrap();
// The task should run on a different thread than the root thread.
assert_ne!(root_thread, task_thread);
});
}

#[cfg(target_os = "linux")]
#[test]
fn test_tokio_spawn_pinned_correct_cpu() {
// Verify that a pinned task is actually running on the expected CPU,
// for every allowed CPU id.
let executor = tokio::Runner::default();
executor.start(|context| async move {
for cpu in context.available_cpus().unwrap() {
let actual = context
.clone()
.pinned(cpu)
.spawn(|_| async move {
// SAFETY: `sched_getcpu` is a read-only query with no
// preconditions.
unsafe { libc::sched_getcpu() as usize }
})
.await
.unwrap();
assert_eq!(actual, cpu);
}
});
}

#[cfg(target_os = "linux")]
#[test]
fn test_tokio_spawn_pinned_same_cpu() {
// Verify that two separate tasks pinned to the same CPU run on
// different threads but report the same CPU.
let executor = tokio::Runner::default();
executor.start(|context| async move {
let cpu = *context.available_cpus().unwrap().last();
let t1 = context.clone().pinned(cpu).spawn(|_| async move {
// SAFETY: `sched_getcpu` is a read-only query with no
// preconditions.
(std::thread::current().id(), unsafe { libc::sched_getcpu() })
});
let t2 = context.clone().pinned(cpu).spawn(|_| async move {
// SAFETY: `sched_getcpu` is a read-only query with no
// preconditions.
(std::thread::current().id(), unsafe { libc::sched_getcpu() })
Comment thread
cursor[bot] marked this conversation as resolved.
});
let (r1, r2) = futures::future::join(t1, t2).await;
let (thread1, cpu1) = r1.unwrap();
let (thread2, cpu2) = r2.unwrap();
// Different dedicated threads.
assert_ne!(thread1, thread2);
// Same CPU.
assert_eq!(cpu1, cpu2);
});
}

#[cfg(target_os = "linux")]
#[test]
fn test_tokio_available_cpus_stable_inside_pinned_task() {
// `Spawner::available_cpus` should keep reporting the runtime's
// placement set even when the current task narrows its own thread
// affinity with `.pinned(...)`.
let executor = tokio::Runner::default();
executor.start(|context| async move {
let available = context.available_cpus().unwrap();
let cpu = *available.first();

context
.pinned(cpu)
.spawn(move |context| {
let available = available.clone();
async move {
// The public runtime API stays stable, while the
// thread-local helper reflects the live pinned mask.
assert_eq!(context.available_cpus(), Some(available.clone()));
assert_eq!(utils::thread::available_cpus(), Some(NonEmptyVec::new(cpu)),);
}
})
.await
.unwrap();
});
}

#[cfg(target_os = "linux")]
#[test]
fn test_tokio_spawn_dedicated_restores_baseline_affinity() {
// An unpinned dedicated child spawned from within pinned tasks should
// reset back to the runtime's placement set instead of inheriting the
// parent thread's narrowed affinity mask.
let executor = tokio::Runner::default();
executor.start(|context| async move {
let available = context.available_cpus().unwrap();
if available.len().get() < 2 {
return;
}

let outer_cpu = available[0];
let inner_cpu = available[1];
context
.pinned(outer_cpu)
.spawn(move |context| {
let available = available.clone();
async move {
context
.pinned(inner_cpu)
.spawn(move |context| {
let available = available.clone();
async move {
assert_eq!(
utils::thread::available_cpus(),
Some(NonEmptyVec::new(inner_cpu)),
);
// The dedicated child is unpinned, so it
// should restore the runtime baseline mask.
let dedicated_allowed = context
.dedicated()
.spawn(|_| async move { utils::thread::available_cpus() })
.await
.unwrap();
assert_eq!(dedicated_allowed, Some(available.clone()));
}
})
.await
.unwrap()
}
})
.await
.unwrap();
});
}

#[cfg(target_os = "linux")]
#[test]
#[should_panic(expected = "failed to pin task to cpu")]
fn test_tokio_spawn_pinned_invalid_cpu() {
// Pinning to a CPU outside the runtime baseline CPU set panics when the task starts.
let executor = tokio::Runner::default();
executor.start(|context| async move {
let invalid_cpu = context
.available_cpus()
.as_ref()
.map(|cpus| *cpus.last())
.map(|cpu| cpu + 1);
let invalid_cpu = invalid_cpu.unwrap();
context.pinned(invalid_cpu).spawn(|_| async {});
});
}

#[cfg(target_os = "linux")]
#[test]
#[should_panic(expected = "failed to pin task to cpu")]
fn test_tokio_spawn_pinned_huge_cpu() {
let executor = tokio::Runner::default();
executor.start(|context| async move {
context.pinned(usize::MAX).spawn(|_| async {});
});
}

#[cfg(target_os = "linux")]
#[test]
fn test_tokio_spawn_pinned_invalid_cpu_cleans_metrics() {
// A pinning failure happens during spawn-time startup, before the task
// future is polled. That failure must still clean up the task's
// metrics and supervision state.
let executor = tokio::Runner::default();
executor.start(|context| async move {
let invalid_cpu = context
.available_cpus()
.as_ref()
.map(|cpus| *cpus.last())
.map(|cpu| cpu + 1);
let invalid_cpu = invalid_cpu.unwrap();
let context = context.with_label("pinned_invalid_cpu");
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
context.clone().pinned(invalid_cpu).spawn(|_| async {});
}));
assert!(result.is_err(), "expected invalid pinned cpu to panic");

while utils::count_running_tasks(&context, "pinned_invalid_cpu") > 0 {
context.sleep(Duration::from_millis(10)).await;
}
});
}

#[cfg(not(target_os = "linux"))]
#[test]
#[should_panic(expected = "cpu pinning is not available on this platform")]
Comment thread
cursor[bot] marked this conversation as resolved.
Outdated
fn test_tokio_spawn_pinned_unavailable() {
let executor = tokio::Runner::default();
test_spawn_pinned(executor);
}

#[test]
fn test_tokio_spawn() {
let runner = tokio::Runner::default();
Expand Down
2 changes: 1 addition & 1 deletion runtime/src/telemetry/metrics/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl Label {
name,
kind: Kind::Task,
execution: match execution {
crate::Execution::Dedicated => Execution::Dedicated,
crate::Execution::Dedicated(_) => Execution::Dedicated,
crate::Execution::Shared(blocking) => {
if blocking {
Execution::SharedBlocking
Expand Down
Loading
Loading