diff --git a/runtime/src/deterministic.rs b/runtime/src/deterministic.rs index 20bad024e8d..35c8cd17e62 100644 --- a/runtime/src/deterministic.rs +++ b/runtime/src/deterministic.rs @@ -1134,7 +1134,12 @@ impl Context { impl crate::Spawner for Context { fn dedicated(mut self) -> Self { - self.execution = Execution::Dedicated; + self.execution = Execution::Dedicated(None); + self + } + + fn pinned(mut self, cpu: usize) -> Self { + self.execution = Execution::Dedicated(Some(cpu)); self } diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index 0294900de7f..431e1910f7e 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -177,8 +177,27 @@ stability_scope!(BETA { /// shared executor. /// /// This is not the default behavior. See [`Spawner::shared`] for more information. + /// + /// # Panics + /// + /// [`Spawner::spawn`] may panic if the runtime cannot start the dedicated thread for that task. fn dedicated(self) -> Self; + /// Return a [`Spawner`] that runs tasks on a dedicated thread pinned to the given CPU. + /// + /// Use [`crate::utils::available_cpus`] to query valid CPU ids for placement decisions. + /// + /// Runtimes that implement CPU pinning perform validation and the pinning attempt when + /// [`Spawner::spawn`] starts that task. + /// + /// Implies [`Spawner::dedicated`]. + /// + /// # Panics + /// + /// [`Spawner::spawn`] may panic if CPU pinning is unavailable for the runtime or if the + /// operating system rejects `cpu` when the task starts. + fn pinned(self, cpu: usize) -> Self; + /// Spawn a task with the current context. /// /// Unlike directly awaiting a future, the task starts running immediately even if the caller @@ -208,10 +227,17 @@ stability_scope!(BETA { /// # Spawn Configuration /// /// When a context is cloned (either via [`Clone::clone`] or [`Metrics::with_label`]) or provided via - /// [`Spawner::spawn`], any configuration made via [`Spawner::dedicated`] or [`Spawner::shared`] is reset. + /// [`Spawner::spawn`], any configuration made via [`Spawner::dedicated`], [`Spawner::pinned`], or + /// [`Spawner::shared`] is reset. /// /// Child tasks should assume they start from a clean configuration without needing to inspect how their /// parent was configured. + /// + /// # Panics + /// + /// Panics if the runtime cannot start a task configured with [`Spawner::dedicated`], or if + /// a task configured with [`Spawner::pinned`] cannot be pinned when it starts running. + /// These startup panics occur in the caller of [`Spawner::spawn`]. fn spawn(self, f: F) -> Handle where F: FnOnce(Self) -> Fut + Send + 'static, @@ -865,6 +891,8 @@ mod tests { use crate::telemetry::traces::collector::TraceStorage; use bytes::Bytes; use commonware_macros::{select, test_collect_traces}; + #[cfg(target_os = "linux")] + use commonware_utils::vec::NonEmptyVec; use commonware_utils::{ channel::{mpsc, oneshot}, sync::Mutex, @@ -1729,6 +1757,19 @@ mod tests { }); } + fn test_spawn_pinned(runner: R) + where + R::Context: Spawner, + { + runner.start(|context| async { + let cpu = utils::thread::available_cpus() + .map(|cpus| *cpus.first()) + .unwrap_or(0); + let handle = context.pinned(cpu).spawn(|_| async { 42 }); + assert!(matches!(handle.await, Ok(42))); + }); + } + fn test_spawn(runner: R) where R::Context: Spawner + Clock, @@ -3544,6 +3585,12 @@ mod tests { test_spawn_dedicated(executor); } + #[test] + fn test_deterministic_spawn_pinned() { + let executor = deterministic::Runner::default(); + test_spawn_pinned(executor); + } + #[test] fn test_deterministic_spawn() { let runner = deterministic::Runner::default(); @@ -3893,6 +3940,184 @@ mod tests { test_spawn_dedicated(executor); } + #[cfg(target_os = "linux")] + #[test] + fn test_tokio_spawn_pinned() { + let executor = tokio::Runner::default(); + test_spawn_pinned(executor); + } + + #[cfg(not(target_os = "linux"))] + #[test] + #[should_panic(expected = "failed to pin task to cpu")] + fn test_tokio_spawn_pinned() { + let executor = tokio::Runner::default(); + test_spawn_pinned(executor); + } + + #[cfg(target_os = "linux")] + #[test] + fn test_tokio_spawn_pinned_dedicated_thread() { + // Verify that pinned implies dedicated. + let executor = tokio::Runner::default(); + executor.start(|context| async { + let cpu = *utils::thread::available_cpus().unwrap().first(); + let root_thread = std::thread::current().id(); + let task_thread = context + .pinned(cpu) + .spawn(|_| async { std::thread::current().id() }) + .await + .unwrap(); + // The task should run on a different thread than the root thread. + assert_ne!(root_thread, task_thread); + }); + } + + #[cfg(target_os = "linux")] + #[test] + fn test_tokio_spawn_pinned_correct_cpu() { + // Verify that a pinned task is actually running on the expected CPU, + // for every allowed CPU id. + let executor = tokio::Runner::default(); + executor.start(|context| async move { + for cpu in utils::thread::available_cpus().unwrap() { + let actual = context + .clone() + .pinned(cpu) + .spawn(|_| async { + // SAFETY: `sched_getcpu` is a read-only query with no + // preconditions. + unsafe { libc::sched_getcpu() as usize } + }) + .await + .unwrap(); + assert_eq!(actual, cpu); + } + }); + } + + #[cfg(target_os = "linux")] + #[test] + fn test_tokio_spawn_pinned_same_cpu() { + // Verify that two separate tasks pinned to the same CPU run on + // different threads but report the same CPU. + let executor = tokio::Runner::default(); + executor.start(|context| async move { + let cpu = *utils::thread::available_cpus().unwrap().last(); + let t1 = context.clone().pinned(cpu).spawn(|_| async { + // SAFETY: `sched_getcpu` is a read-only query with no + // preconditions. + (std::thread::current().id(), unsafe { libc::sched_getcpu() }) + }); + let t2 = context.clone().pinned(cpu).spawn(|_| async { + // SAFETY: `sched_getcpu` is a read-only query with no + // preconditions. + (std::thread::current().id(), unsafe { libc::sched_getcpu() }) + }); + let (r1, r2) = futures::future::join(t1, t2).await; + let (thread1, cpu1) = r1.unwrap(); + let (thread2, cpu2) = r2.unwrap(); + // Different dedicated threads. + assert_ne!(thread1, thread2); + // Same CPU. + assert_eq!(cpu1, cpu2); + }); + } + + #[cfg(target_os = "linux")] + #[test] + fn test_tokio_available_cpus_stable_inside_pinned_task() { + // The runtime placement set should stay stable even when the current + // task narrows its own thread affinity with `.pinned(...)`. + let executor = tokio::Runner::default(); + executor.start(|context| async { + let available = utils::thread::available_cpus().unwrap(); + let cpu = *available.first(); + + context + .pinned(cpu) + .spawn(move |_| async move { + // Placement queries stay stable, while the test-only helper + // reflects the live pinned mask. + assert_eq!(utils::thread::available_cpus(), Some(available)); + assert_eq!( + utils::thread::tests::current_affinity_cpus(), + Some(NonEmptyVec::new(cpu)) + ); + }) + .await + .unwrap(); + }); + } + + #[cfg(target_os = "linux")] + #[test] + fn test_tokio_spawn_dedicated_restores_baseline_affinity() { + // An unpinned dedicated child spawned from within pinned tasks should + // reset back to the runtime's placement set instead of inheriting the + // parent thread's narrowed affinity mask. + let executor = tokio::Runner::default(); + executor.start(|context| async { + let available = utils::thread::available_cpus().unwrap(); + if available.len().get() < 2 { + return; + } + + let pinned_cpu = available[0]; + context + .pinned(pinned_cpu) + .spawn(move |context| async move { + assert_eq!( + utils::thread::tests::current_affinity_cpus(), + Some(NonEmptyVec::new(pinned_cpu)) + ); + + // The dedicated child is unpinned, so it should restore the + // runtime baseline mask. + context + .dedicated() + .spawn(|_| async { + assert_eq!( + utils::thread::tests::current_affinity_cpus(), + Some(available) + ); + }) + .await + .unwrap(); + }) + .await + .unwrap(); + }); + } + + #[cfg(target_os = "linux")] + #[test] + fn test_tokio_spawn_pinned_invalid_cpu() { + // Pinning to a CPU outside the runtime placement set must panic during + // spawn-time startup, before the task future is polled, while still + // cleaning up the task's metrics and supervision state. + let executor = tokio::Runner::default(); + executor.start(|context| async move { + let invalid_cpu = utils::thread::available_cpus() + .as_ref() + .map(|cpus| *cpus.last()) + .map(|cpu| cpu + 1) + .unwrap(); + + let context = context.with_label("pinned_invalid_cpu"); + let panic = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { + context.clone().pinned(invalid_cpu).spawn(|_| async {}); + })) + .err() + .unwrap(); + assert!(utils::extract_panic_message(&*panic).contains("failed to pin task to cpu")); + + while utils::count_running_tasks(&context, "pinned_invalid_cpu") > 0 { + context.sleep(Duration::from_millis(10)).await; + } + }); + } + #[test] fn test_tokio_spawn() { let runner = tokio::Runner::default(); diff --git a/runtime/src/telemetry/metrics/task.rs b/runtime/src/telemetry/metrics/task.rs index fc0c7ab266e..786954d60a8 100644 --- a/runtime/src/telemetry/metrics/task.rs +++ b/runtime/src/telemetry/metrics/task.rs @@ -29,7 +29,7 @@ impl Label { name, kind: Kind::Task, execution: match execution { - crate::Execution::Dedicated => Execution::Dedicated, + crate::Execution::Dedicated(_) => Execution::Dedicated, crate::Execution::Shared(blocking) => { if blocking { Execution::SharedBlocking diff --git a/runtime/src/tokio/runtime.rs b/runtime/src/tokio/runtime.rs index a6de1585ee0..3aea8c196ba 100644 --- a/runtime/src/tokio/runtime.rs +++ b/runtime/src/tokio/runtime.rs @@ -43,7 +43,7 @@ use std::{ net::{IpAddr, SocketAddr}, num::NonZeroUsize, path::PathBuf, - sync::Arc, + sync::{mpsc, Arc}, time::{Duration, SystemTime}, }; use tokio::runtime::{Builder, Runtime}; @@ -569,7 +569,12 @@ impl Context { impl crate::Spawner for Context { fn dedicated(mut self) -> Self { - self.execution = Execution::Dedicated; + self.execution = Execution::Dedicated(None); + self + } + + fn pinned(mut self, cpu: usize) -> Self { + self.execution = Execution::Dedicated(Some(cpu)); self } @@ -589,7 +594,7 @@ impl crate::Spawner for Context { // Track supervision before resetting configuration let parent = Arc::clone(&self.tree); - let past = self.execution; + let execution = self.execution; let traced = self.traced; self.execution = Execution::default(); self.traced = false; @@ -617,24 +622,20 @@ impl crate::Spawner for Context { Arc::clone(&parent), ); - if matches!(past, Execution::Dedicated) { - utils::thread::spawn(executor.thread_stack_size, { - // Ensure the task can access the tokio runtime - let handle = executor.runtime.handle().clone(); - move || { - handle.block_on(f); - } - }); - } else if matches!(past, Execution::Shared(true)) { - executor.runtime.spawn_blocking({ - // Ensure the task can access the tokio runtime - let handle = executor.runtime.handle().clone(); - move || { - handle.block_on(f); - } - }); - } else { - executor.runtime.spawn(f); + match execution { + Execution::Dedicated(cpu) => spawn_dedicated(&executor, &handle, cpu, f), + Execution::Shared(true) => { + executor.runtime.spawn_blocking({ + // Ensure the task can access the tokio runtime + let handle = executor.runtime.handle().clone(); + move || { + handle.block_on(f); + } + }); + } + Execution::Shared(false) => { + executor.runtime.spawn(f); + } } // Register the task on the parent @@ -893,6 +894,67 @@ impl crate::BufferPooler for Context { } } +/// Spawn a task on a dedicated thread, optionally pinning it to `cpu`, and +/// wait for startup to complete before returning. +/// +/// When `cpu` is `None`, the dedicated thread restores the runtime's baseline +/// affinity mask before polling the task so it does not inherit a parent +/// task's temporary CPU pin. +/// +/// # Panics +/// +/// Panics if the dedicated thread cannot be created, if pinning to `cpu` fails, +/// or if the startup handshake breaks before the task begins running. +fn spawn_dedicated(executor: &Executor, handle: &Handle, cpu: Option, future: F) +where + T: Send + 'static, + F: Future + Send + 'static, +{ + // Ensure the task can access the tokio runtime. + let runtime_handle = executor.runtime.handle().clone(); + + // The dedicated thread reports whether startup succeeded before it begins + // polling the task future. + let (startup_tx, startup_rx) = mpsc::sync_channel(1); + + if let Err(err) = utils::thread::try_spawn(executor.thread_stack_size, move || { + let result = cpu.map_or_else(utils::thread::reset_cpu_affinity, |cpu| { + utils::thread::set_cpu_affinity(&[cpu]) + }); + let is_ok = result.is_ok(); + startup_tx + .send(result) + .expect("startup receiver dropped unexpectedly"); + if is_ok { + runtime_handle.block_on(future); + } + }) { + // Thread startup failed before the task future could be polled, so + // abort the handle to release its metrics and supervision state. + handle.abort(); + panic!("failed to spawn thread: {err}"); + } + + // Wait synchronously so dedicated-thread creation and CPU pinning failures + // are surfaced to the spawn caller rather than the spawned task. + match startup_rx.recv() { + Ok(Ok(())) => {} + Ok(Err(err)) => { + // Affinity setup failed before the task future started running. + handle.abort(); + match cpu { + Some(cpu) => panic!("failed to pin task to cpu {cpu}: {err}"), + None => panic!("failed to restore dedicated task affinity: {err}"), + } + } + Err(_) => { + // The dedicated thread exited before reporting startup status. + handle.abort(); + panic!("failed to start dedicated task"); + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/runtime/src/utils/cell.rs b/runtime/src/utils/cell.rs index 713ecbbcc16..a8a7ce07891 100644 --- a/runtime/src/utils/cell.rs +++ b/runtime/src/utils/cell.rs @@ -110,6 +110,10 @@ where Self::Present(self.into_present().dedicated()) } + fn pinned(self, cpu: usize) -> Self { + Self::Present(self.into_present().pinned(cpu)) + } + fn shared(self, blocking: bool) -> Self { Self::Present(self.into_present().shared(blocking)) } diff --git a/runtime/src/utils/mod.rs b/runtime/src/utils/mod.rs index 8b60cb73d52..660c084583b 100644 --- a/runtime/src/utils/mod.rs +++ b/runtime/src/utils/mod.rs @@ -16,6 +16,8 @@ commonware_macros::stability_mod!(BETA, pub mod buffer); pub mod signal; #[cfg(not(target_arch = "wasm32"))] pub(crate) mod thread; +#[cfg(not(target_arch = "wasm32"))] +pub use thread::available_cpus; mod handle; pub use handle::Handle; @@ -31,8 +33,9 @@ pub(crate) mod supervision; /// The execution mode of a task. #[derive(Copy, Clone, Debug)] pub enum Execution { - /// Task runs on a dedicated thread. - Dedicated, + /// Task runs on a dedicated thread, optionally with a logical CPU id + /// requested for pinning. + Dedicated(Option), /// Task runs on the shared executor. `true` marks short blocking work that should /// use the runtime's blocking-friendly pool. Shared(bool), @@ -67,7 +70,7 @@ pub async fn reschedule() { Reschedule { yielded: false }.await } -fn extract_panic_message(err: &(dyn Any + Send)) -> String { +pub(crate) fn extract_panic_message(err: &(dyn Any + Send)) -> String { err.downcast_ref::<&str>().map_or_else( || { err.downcast_ref::() diff --git a/runtime/src/utils/thread.rs b/runtime/src/utils/thread.rs index 76528bcb3a9..5412e1c35c7 100644 --- a/runtime/src/utils/thread.rs +++ b/runtime/src/utils/thread.rs @@ -1,15 +1,20 @@ -//! Helpers for resolving the configured thread stack size. +//! Helpers for managing runtime-owned threads. +use commonware_utils::vec::NonEmptyVec; use std::{env, sync::OnceLock, thread}; -/// Cached configured thread stack size. -static SYSTEM_THREAD_STACK_SIZE: OnceLock = OnceLock::new(); - /// Rust's default thread stack size. /// /// See . const RUST_DEFAULT_THREAD_STACK_SIZE: usize = 2 * 1024 * 1024; +/// Upper bound for affinity-mask probing and construction. +/// +/// This keeps bogus CPU ids from forcing arbitrarily large allocations while +/// still leaving ample room beyond realistic machine sizes. +#[cfg(target_os = "linux")] +const MAX_AFFINITY_CPUS: usize = 1 << 20; + /// Returns the value of the `RUST_MIN_STACK` environment variable, if set. fn rust_min_stack() -> Option { env::var_os("RUST_MIN_STACK").and_then(|s| s.to_str().and_then(|s| s.parse().ok())) @@ -32,7 +37,10 @@ fn rust_min_stack() -> Option { /// /// On other platforms, or if the platform-specific query fails, this falls back /// to [RUST_DEFAULT_THREAD_STACK_SIZE]. +/// +/// The result is cached after the first call. pub(crate) fn system_thread_stack_size() -> usize { + static SYSTEM_THREAD_STACK_SIZE: OnceLock = OnceLock::new(); *SYSTEM_THREAD_STACK_SIZE.get_or_init(|| { rust_min_stack() .or(system_thread_stack_size_impl()) @@ -100,14 +108,234 @@ const fn system_thread_stack_size_impl() -> Option { None } -/// Spawns a thread with an explicit stack size. +/// Attempts to spawn a thread with an explicit stack size. +pub(crate) fn try_spawn(stack_size: usize, f: F) -> std::io::Result> +where + F: FnOnce() -> T + Send + 'static, + T: Send + 'static, +{ + thread::Builder::new().stack_size(stack_size).spawn(f) +} + +/// Spawns a thread with an explicit stack size, panicking if thread creation fails. +/// +/// # Panics +/// +/// Panics if the thread cannot be created. +#[cfg(any(feature = "iouring-storage", feature = "iouring-network"))] pub(crate) fn spawn(stack_size: usize, f: F) -> thread::JoinHandle where F: FnOnce() -> T + Send + 'static, T: Send + 'static, { - thread::Builder::new() - .stack_size(stack_size) - .spawn(f) - .expect("failed to spawn thread") + try_spawn(stack_size, f).expect("failed to spawn thread") +} + +/// Returns the logical CPU ids enabled in `pid`'s affinity mask. +/// +/// `pid` uses the Linux `sched_getaffinity` semantics, so `0` targets the +/// calling thread and any other value names a specific thread id. +/// +/// Returns `None` if the affinity mask cannot be queried or would require an +/// unreasonably large probe buffer. +#[cfg(target_os = "linux")] +fn affinity_cpus(pid: libc::pid_t) -> Option> { + let word_bits = libc::c_ulong::BITS as usize; + let mut words = 1; + + // Probe `sched_getaffinity` with an exponentially growing buffer until the + // kernel either accepts it or reports a non-retryable error. + let (mask, bytes) = loop { + let mut mask = vec![0 as libc::c_ulong; words]; + let cpusetsize = std::mem::size_of_val(mask.as_slice()); + + // SAFETY: `mask` points to writable storage for `cpusetsize` bytes, and + // `pid` names the thread whose affinity mask should be queried. + let result = unsafe { + libc::syscall( + libc::SYS_sched_getaffinity, + pid, + cpusetsize, + mask.as_mut_ptr(), + ) + }; + + if result >= 0 { + break (mask, result as usize); + } + + let err = std::io::Error::last_os_error(); + match err.raw_os_error() { + Some(libc::EINTR) => continue, + Some(libc::EINVAL) => { + // Kernels with larger affinity masks require probing with a + // larger buffer. Cap the probe size so invalid environments + // cannot force unbounded growth. + words = words.checked_mul(2)?; + words + .checked_mul(word_bits) + .filter(|bits| *bits <= MAX_AFFINITY_CPUS)?; + } + _ => return None, + } + }; + + let mut cpus = Vec::new(); + + // `sched_getaffinity` reports how many bytes of the mask are meaningful. + // Walk that returned bitset and collect the enabled logical CPU ids. + for cpu in 0..(bytes * 8) { + let index = cpu / word_bits; + let offset = cpu % word_bits; + if (mask[index] & ((1 as libc::c_ulong) << offset)) != 0 { + cpus.push(cpu); + } + } + cpus.try_into().ok() +} + +/// Returns the logical CPU ids available for [`crate::Spawner::pinned`] placements. +/// +/// Returns `None` if the process affinity mask cannot be queried. +#[cfg(target_os = "linux")] +pub fn available_cpus() -> Option> { + // SAFETY: `getpid` has no preconditions and returns the process leader's + // pid, which `sched_getaffinity` uses to query that thread's baseline mask. + let pid = unsafe { libc::getpid() }; + affinity_cpus(pid) +} + +/// Returns `None` because CPU pinning is not available on this platform. +#[cfg(not(target_os = "linux"))] +pub const fn available_cpus() -> Option> { + None +} + +/// Sets the current thread's affinity mask to the given logical CPU ids. +/// +/// Returns an error if `cpus` is empty, contains an unreasonably large CPU id, +/// or if the operating system rejects the affinity change. +#[cfg(target_os = "linux")] +pub(crate) fn set_cpu_affinity(cpus: &[usize]) -> Result<(), std::io::Error> { + let Some(max_cpu) = cpus.iter().max() else { + return Err(std::io::Error::from_raw_os_error(libc::EINVAL)); + }; + if *max_cpu >= MAX_AFFINITY_CPUS { + return Err(std::io::Error::from_raw_os_error(libc::EINVAL)); + } + + let word_bits = libc::c_ulong::BITS as usize; + let words = (max_cpu / word_bits) + .checked_add(1) + .expect("cpu bitset size overflow"); + let mut mask = vec![0 as libc::c_ulong; words]; + for &cpu in cpus { + mask[cpu / word_bits] |= (1 as libc::c_ulong) << (cpu % word_bits); + } + let cpusetsize = std::mem::size_of_val(mask.as_slice()); + + loop { + // SAFETY: `mask` points to readable storage for `cpusetsize` bytes, and + // `pid == 0` targets the calling thread as documented by the syscall. + let result = + unsafe { libc::syscall(libc::SYS_sched_setaffinity, 0, cpusetsize, mask.as_ptr()) }; + if result == 0 { + return Ok(()); + } + + let err = std::io::Error::last_os_error(); + match err.raw_os_error() { + Some(libc::EINTR) => continue, + _ => return Err(err), + } + } +} + +/// Sets the current thread's affinity mask to the given logical CPU ids. +/// +/// Always returns an error since setting CPU affinity is not available on this platform. +#[cfg(not(target_os = "linux"))] +pub(crate) fn set_cpu_affinity(_cpus: &[usize]) -> Result<(), std::io::Error> { + Err(std::io::Error::other( + "cpu affinity is not available on this platform", + )) +} + +/// Resets the current thread's affinity mask to [`available_cpus`]. +/// +/// Returns `Ok(())` if the baseline CPU set cannot be queried. +#[cfg(target_os = "linux")] +pub(crate) fn reset_cpu_affinity() -> Result<(), std::io::Error> { + available_cpus().map_or(Ok(()), |cpus| set_cpu_affinity(&cpus)) +} + +/// Resets the current thread's affinity mask to [`available_cpus`]. +/// +/// Always returns `Ok(())` since CPU affinity is not available on this platform. +#[cfg(not(target_os = "linux"))] +pub(crate) const fn reset_cpu_affinity() -> Result<(), std::io::Error> { + Ok(()) +} + +#[cfg(test)] +pub mod tests { + use super::*; + #[cfg(target_os = "linux")] + use commonware_utils::vec::NonEmptyVec; + + #[cfg(target_os = "linux")] + pub fn current_affinity_cpus() -> Option> { + affinity_cpus(0) + } + + #[cfg(target_os = "linux")] + #[test] + fn test_available_cpus_linux() { + assert!( + available_cpus().is_some(), + "expected at least one available CPU" + ); + } + + #[cfg(target_os = "linux")] + #[test] + fn test_set_cpu_affinity_linux() { + std::thread::spawn(|| { + let cpus = available_cpus().unwrap(); + let cpu = *cpus.first(); + let pinned = NonEmptyVec::new(cpu); + set_cpu_affinity(&pinned).unwrap(); + assert_eq!(current_affinity_cpus(), Some(pinned)); + + reset_cpu_affinity().unwrap(); + assert_eq!(current_affinity_cpus(), Some(cpus.clone())); + + let invalid_cpu = cpus.last().checked_add(1).unwrap(); + assert!( + set_cpu_affinity(&[invalid_cpu]).is_err(), + "expected affinity to a disallowed CPU to fail", + ); + + let err = set_cpu_affinity(&[]).unwrap_err(); + assert_eq!(err.raw_os_error(), Some(libc::EINVAL)); + + let err = set_cpu_affinity(&[MAX_AFFINITY_CPUS]).unwrap_err(); + assert_eq!(err.raw_os_error(), Some(libc::EINVAL)); + }) + .join() + .unwrap(); + } + + #[cfg(not(target_os = "linux"))] + #[test] + fn test_available_cpus_non_linux() { + assert!(available_cpus().is_none()); + } + + #[cfg(not(target_os = "linux"))] + #[test] + fn test_set_cpu_affinity_non_linux() { + assert!(set_cpu_affinity(&[0]).is_err()); + assert!(reset_cpu_affinity().is_ok()); + } }