Skip to content

Commit 4d5ed8d

Browse files
[runtime/tokio] use system thread stack size for all threads (#3475)
Co-authored-by: Patrick O'Grady <me@patrickogrady.xyz>
1 parent ba3ab43 commit 4d5ed8d

5 files changed

Lines changed: 190 additions & 14 deletions

File tree

runtime/src/network/iouring.rs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
2626
use crate::{
2727
iouring::{self, should_retry, OpBuffer, OpFd, OpIovecs},
28-
Buf, BufferPool, Error, IoBuf, IoBufMut, IoBufs,
28+
utils, Buf, BufferPool, Error, IoBuf, IoBufMut, IoBufs,
2929
};
3030
use commonware_utils::channel::oneshot;
3131
use io_uring::{opcode, types::Fd};
@@ -71,6 +71,8 @@ pub struct Config {
7171
pub read_buffer_size: usize,
7272
/// Configuration for the iouring instance.
7373
pub iouring_config: iouring::Config,
74+
/// Stack size for the dedicated send and receive io_uring threads.
75+
pub thread_stack_size: usize,
7476
}
7577

7678
impl Default for Config {
@@ -82,6 +84,7 @@ impl Default for Config {
8284
read_write_timeout: iouring_config.max_op_timeout,
8385
iouring_config,
8486
read_buffer_size: DEFAULT_READ_BUFFER_SIZE,
87+
thread_stack_size: utils::thread::system_thread_stack_size(),
8588
}
8689
}
8790
}
@@ -134,13 +137,13 @@ impl Network {
134137
let sender_registry = registry.sub_registry_with_prefix("iouring_sender");
135138
let (send_submitter, send_loop) =
136139
iouring::IoUringLoop::new(cfg.iouring_config.clone(), sender_registry);
137-
std::thread::spawn(move || send_loop.run());
140+
utils::thread::spawn(cfg.thread_stack_size, move || send_loop.run());
138141

139142
// Create an io_uring instance to handle receive operations.
140143
let receiver_registry = registry.sub_registry_with_prefix("iouring_receiver");
141144
let (recv_submitter, recv_loop) =
142145
iouring::IoUringLoop::new(cfg.iouring_config, receiver_registry);
143-
std::thread::spawn(move || recv_loop.run());
146+
utils::thread::spawn(cfg.thread_stack_size, move || recv_loop.run());
144147

145148
Ok(Self {
146149
tcp_nodelay: cfg.tcp_nodelay,
@@ -664,7 +667,8 @@ mod tests {
664667
iouring::{Config, Network},
665668
tests,
666669
},
667-
BufferPool, BufferPoolConfig, Error, Listener as _, Network as _, Sink as _, Stream as _,
670+
thread, BufferPool, BufferPoolConfig, Error, Listener as _, Network as _, Sink as _,
671+
Stream as _,
668672
};
669673
use commonware_macros::{select, test_group};
670674
use prometheus_client::registry::Registry;
@@ -677,6 +681,14 @@ mod tests {
677681
BufferPool::new(BufferPoolConfig::for_network(), &mut Registry::default())
678682
}
679683

684+
#[test]
fn test_default_thread_stack_size_uses_system_default() {
    // The default network config must carry the stack size reported by the
    // shared thread helper rather than a hard-coded value.
    let expected = thread::system_thread_stack_size();
    let cfg = Config::default();
    assert_eq!(cfg.thread_stack_size, expected);
}
691+
680692
#[tokio::test]
681693
async fn test_trait() {
682694
tests::test_network_trait(|| {

runtime/src/storage/iouring.rs

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
use super::Header;
2525
use crate::{
2626
iouring::{self, should_retry, OpBuffer, OpFd, OpIovecs},
27-
Buf, BufferPool, Error, IoBuf, IoBufs, IoBufsMut,
27+
utils, Buf, BufferPool, Error, IoBuf, IoBufs, IoBufsMut,
2828
};
2929
use commonware_codec::Encode;
3030
use commonware_utils::{channel::oneshot, from_hex, hex};
@@ -69,6 +69,8 @@ pub struct Config {
6969
pub storage_directory: PathBuf,
7070
/// Configuration for the iouring instance.
7171
pub iouring_config: iouring::Config,
72+
/// Stack size for the dedicated io_uring worker thread.
73+
pub thread_stack_size: usize,
7274
}
7375

7476
#[derive(Clone)]
@@ -80,22 +82,28 @@ pub struct Storage {
8082

8183
impl Storage {
8284
/// Returns a new `Storage` instance.
83-
pub fn start(mut cfg: Config, registry: &mut Registry, pool: BufferPool) -> Self {
85+
pub fn start(cfg: Config, registry: &mut Registry, pool: BufferPool) -> Self {
86+
let Config {
87+
storage_directory,
88+
mut iouring_config,
89+
thread_stack_size,
90+
} = cfg;
91+
8492
// Optimize performance by hinting the kernel that a single task will
8593
// submit requests. This is safe because each iouring instance runs in a
8694
// dedicated thread, which guarantees that the same thread that creates
8795
// the ring is the only thread submitting work to it.
88-
cfg.iouring_config.single_issuer = true;
96+
iouring_config.single_issuer = true;
8997

90-
let (io_submitter, iouring_loop) = iouring::IoUringLoop::new(cfg.iouring_config, registry);
98+
let (io_submitter, iouring_loop) = iouring::IoUringLoop::new(iouring_config, registry);
9199

92100
let storage = Self {
93-
storage_directory: cfg.storage_directory,
101+
storage_directory,
94102
io_submitter,
95103
pool,
96104
};
97105

98-
std::thread::spawn(move || iouring_loop.run());
106+
utils::thread::spawn(thread_stack_size, move || iouring_loop.run());
99107
storage
100108
}
101109
}
@@ -579,7 +587,8 @@ impl crate::Blob for Blob {
579587
mod tests {
580588
use super::{Header, *};
581589
use crate::{
582-
storage::tests::run_storage_tests, Blob, BufferPool, BufferPoolConfig, Storage as _,
590+
storage::tests::run_storage_tests, utils::thread, Blob, BufferPool, BufferPoolConfig,
591+
Storage as _,
583592
};
584593
use rand::{Rng as _, SeedableRng as _};
585594
use std::env;
@@ -595,6 +604,7 @@ mod tests {
595604
Config {
596605
storage_directory: storage_directory.clone(),
597606
iouring_config: Default::default(),
607+
thread_stack_size: thread::system_thread_stack_size(),
598608
},
599609
&mut Registry::default(),
600610
pool,

runtime/src/tokio/runtime.rs

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@ use crate::{
1717
signal::Signal,
1818
storage::metered::Storage as MeteredStorage,
1919
telemetry::metrics::task::Label,
20-
utils::{add_attribute, signal::Stopper, supervision::Tree, Panicker, Registry, ScopeGuard},
20+
utils::{
21+
self, add_attribute, signal::Stopper, supervision::Tree, Panicker, Registry, ScopeGuard,
22+
},
2123
BufferPool, BufferPoolConfig, Clock, Error, Execution, Handle, Metrics as _, SinkOf,
2224
Spawner as _, StreamOf, METRICS_PREFIX,
2325
};
@@ -42,7 +44,6 @@ use std::{
4244
num::NonZeroUsize,
4345
path::PathBuf,
4446
sync::Arc,
45-
thread,
4647
time::{Duration, SystemTime},
4748
};
4849
use tokio::runtime::{Builder, Runtime};
@@ -141,6 +142,14 @@ pub struct Config {
141142
/// operations that require blocking (like `fs` and writing to `Stdout`).
142143
max_blocking_threads: usize,
143144

145+
/// Stack size to use for runtime-owned threads.
146+
///
147+
/// Defaults to the system stack size when the current platform exposes it,
148+
/// and otherwise falls back to Rust's default spawned-thread stack size.
149+
///
150+
/// See [utils::thread::system_thread_stack_size].
151+
thread_stack_size: usize,
152+
144153
/// Whether or not to catch panics.
145154
catch_panics: bool,
146155

@@ -170,6 +179,7 @@ impl Config {
170179
Self {
171180
worker_threads: 2,
172181
max_blocking_threads: 512,
182+
thread_stack_size: utils::thread::system_thread_stack_size(),
173183
catch_panics: false,
174184
storage_directory,
175185
maximum_buffer_size: 2 * 1024 * 1024, // 2 MB
@@ -191,6 +201,11 @@ impl Config {
191201
self
192202
}
193203
/// See [Config]
204+
pub const fn with_thread_stack_size(mut self, n: usize) -> Self {
205+
self.thread_stack_size = n;
206+
self
207+
}
208+
/// See [Config]
194209
pub const fn with_catch_panics(mut self, b: bool) -> Self {
195210
self.catch_panics = b;
196211
self
@@ -241,6 +256,10 @@ impl Config {
241256
self.max_blocking_threads
242257
}
243258
/// See [Config]
259+
pub const fn thread_stack_size(&self) -> usize {
260+
self.thread_stack_size
261+
}
262+
/// See [Config]
244263
pub const fn catch_panics(&self) -> bool {
245264
self.catch_panics
246265
}
@@ -297,6 +316,7 @@ pub struct Executor {
297316
runtime: Runtime,
298317
shutdown: Mutex<Stopper>,
299318
panicker: Panicker,
319+
thread_stack_size: usize,
300320
}
301321

302322
/// Implementation of [crate::Runner] for the `tokio` runtime.
@@ -334,6 +354,7 @@ impl crate::Runner for Runner {
334354
let runtime = Builder::new_multi_thread()
335355
.worker_threads(self.cfg.worker_threads)
336356
.max_blocking_threads(self.cfg.max_blocking_threads)
357+
.thread_stack_size(self.cfg.thread_stack_size)
337358
.enable_all()
338359
.build()
339360
.expect("failed to create Tokio runtime");
@@ -368,6 +389,7 @@ impl crate::Runner for Runner {
368389
IoUringConfig {
369390
storage_directory: self.cfg.storage_directory.clone(),
370391
iouring_config: Default::default(),
392+
thread_stack_size: self.cfg.thread_stack_size,
371393
},
372394
iouring_registry,
373395
storage_buffer_pool.clone(),
@@ -404,6 +426,7 @@ impl crate::Runner for Runner {
404426
shutdown_timeout: Some(self.cfg.network_cfg.read_write_timeout),
405427
..Default::default()
406428
},
429+
thread_stack_size: self.cfg.thread_stack_size,
407430
..Default::default()
408431
};
409432
let network = MeteredNetwork::new(
@@ -435,6 +458,7 @@ impl crate::Runner for Runner {
435458
runtime,
436459
shutdown: Mutex::new(Stopper::default()),
437460
panicker,
461+
thread_stack_size: self.cfg.thread_stack_size,
438462
});
439463

440464
// Get metrics
@@ -578,7 +602,7 @@ impl crate::Spawner for Context {
578602
);
579603

580604
if matches!(past, Execution::Dedicated) {
581-
thread::spawn({
605+
utils::thread::spawn(executor.thread_stack_size, {
582606
// Ensure the task can access the tokio runtime
583607
let handle = executor.runtime.handle().clone();
584608
move || {
@@ -871,6 +895,21 @@ mod tests {
871895
);
872896
}
873897

898+
#[test]
fn test_default_thread_stack_size_uses_system_default() {
    // A config built with `new` must pick up the stack size resolved by the
    // shared thread helper.
    let expected = utils::thread::system_thread_stack_size();
    assert_eq!(Config::new().thread_stack_size(), expected);
}
906+
907+
#[test]
fn test_thread_stack_size_override() {
    // An explicit override must be returned unchanged by the getter.
    const STACK_SIZE: usize = 4 * 1024 * 1024;
    let cfg = Config::new().with_thread_stack_size(STACK_SIZE);
    assert_eq!(cfg.thread_stack_size(), STACK_SIZE);
}
912+
874913
#[test]
875914
fn test_explicit_buffer_pool_configs_override_worker_threads() {
876915
// Order does not matter -- explicit configs always win.

runtime/src/utils/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ use std::{
1414

1515
commonware_macros::stability_mod!(BETA, pub mod buffer);
1616
pub mod signal;
17+
#[cfg(not(target_arch = "wasm32"))]
18+
pub(crate) mod thread;
1719

1820
mod handle;
1921
pub use handle::Handle;

runtime/src/utils/thread.rs

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
//! Helpers for resolving the configured thread stack size.
2+
3+
use std::{env, sync::OnceLock, thread};
4+
5+
/// Cached configured thread stack size.
6+
static SYSTEM_THREAD_STACK_SIZE: OnceLock<usize> = OnceLock::new();
7+
8+
/// Rust's default thread stack size.
9+
///
10+
/// See <https://doc.rust-lang.org/std/thread/#stack-size>.
11+
const RUST_DEFAULT_THREAD_STACK_SIZE: usize = 2 * 1024 * 1024;
12+
13+
/// Reads the `RUST_MIN_STACK` environment variable.
///
/// Returns `None` when the variable is unset, is not valid Unicode, or does
/// not parse as a `usize`.
fn rust_min_stack() -> Option<usize> {
    let raw = env::var_os("RUST_MIN_STACK")?;
    let text = raw.to_str()?;
    text.parse().ok()
}
17+
18+
/// Resolves the stack size to use for runtime-owned threads.
19+
///
20+
/// If `RUST_MIN_STACK` is set, this uses that value so runtime-owned threads
21+
/// preserve Rust's process-wide spawned-thread override.
22+
///
23+
/// Otherwise, on Unix platforms other than macOS, this queries the default
24+
/// stack size for newly created pthreads via `pthread_attr_init` and
25+
/// `pthread_attr_getstacksize`.
26+
///
27+
/// On macOS, this instead uses `RLIMIT_STACK`. macOS distinguishes between the
28+
/// process stack limit and the smaller default stack size for secondary
29+
/// pthreads, so `pthread_attr_getstacksize` would otherwise resolve the wrong
30+
/// default for this use case. In practice, that means preferring the larger
31+
/// 8 MB process default over the 512 KB secondary-thread pthread default.
32+
///
33+
/// On other platforms, or if the platform-specific query fails, this falls back
34+
/// to [RUST_DEFAULT_THREAD_STACK_SIZE].
35+
pub(crate) fn system_thread_stack_size() -> usize {
36+
*SYSTEM_THREAD_STACK_SIZE.get_or_init(|| {
37+
rust_min_stack()
38+
.or(system_thread_stack_size_impl())
39+
.unwrap_or(RUST_DEFAULT_THREAD_STACK_SIZE)
40+
})
41+
}
42+
43+
#[cfg(all(unix, not(target_os = "macos")))]
44+
fn system_thread_stack_size_impl() -> Option<usize> {
45+
let mut attr = std::mem::MaybeUninit::<libc::pthread_attr_t>::uninit();
46+
47+
// SAFETY: `attr` points to uninitialized storage reserved for
48+
// `pthread_attr_t`, exactly as required by `pthread_attr_init`.
49+
if unsafe { libc::pthread_attr_init(attr.as_mut_ptr()) } != 0 {
50+
return None;
51+
}
52+
53+
// SAFETY: `pthread_attr_init` succeeded, so `attr` is now initialized.
54+
let mut attr = unsafe { attr.assume_init() };
55+
let mut stack_size = 0;
56+
// SAFETY: `attr` is a valid initialized pthread attribute object and
57+
// `stack_size` points to writable storage for the result.
58+
let get_result = unsafe { libc::pthread_attr_getstacksize(&attr, &mut stack_size) };
59+
// SAFETY: `attr` remains initialized until it is destroyed here.
60+
let destroy_result = unsafe { libc::pthread_attr_destroy(&mut attr) };
61+
62+
if get_result != 0 || destroy_result != 0 || stack_size == 0 {
63+
return None;
64+
}
65+
66+
Some(stack_size)
67+
}
68+
69+
/// Reads the soft `RLIMIT_STACK` limit as the stack size on macOS.
///
/// Returns `None` if `getrlimit` fails, the limit is unlimited, the limit
/// does not fit in a `usize`, or the limit is zero.
#[cfg(target_os = "macos")]
fn system_thread_stack_size_impl() -> Option<usize> {
    // macOS uses different defaults for the main thread and spawned threads:
    // the main thread stack is 8 MB, while secondary threads default to
    // 512 KB. `RLIMIT_STACK` is used so the smaller secondary-thread default
    // reported by `pthread_attr_getstacksize` is not inherited.
    let mut raw_limit = std::mem::MaybeUninit::<libc::rlimit>::uninit();

    // SAFETY: `raw_limit` points to uninitialized storage reserved for
    // `rlimit`, exactly as required by `getrlimit`.
    if unsafe { libc::getrlimit(libc::RLIMIT_STACK, raw_limit.as_mut_ptr()) } != 0 {
        return None;
    }

    // SAFETY: `getrlimit` succeeded, so `raw_limit` is initialized.
    let soft_limit = unsafe { raw_limit.assume_init() }.rlim_cur;

    // An unlimited stack gives no usable size.
    if soft_limit == libc::RLIM_INFINITY {
        return None;
    }

    usize::try_from(soft_limit).ok().filter(|&bytes| bytes != 0)
}
97+
98+
/// Fallback for non-Unix platforms, where no platform query is implemented.
///
/// Always returns `None`.
#[cfg(not(unix))]
const fn system_thread_stack_size_impl() -> Option<usize> {
    None
}
102+
103+
/// Starts a new OS thread that runs `f` on a stack of `stack_size` bytes.
///
/// # Panics
///
/// Panics if the thread cannot be created.
pub(crate) fn spawn<F, T>(stack_size: usize, f: F) -> thread::JoinHandle<T>
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    let builder = thread::Builder::new().stack_size(stack_size);
    let outcome = builder.spawn(f);
    outcome.expect("failed to spawn thread")
}

0 commit comments

Comments
 (0)