Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 1 addition & 20 deletions runtime/src/iobuf/benches/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@

use super::utils::{measure, Threading};
use commonware_runtime::{
tokio, BufferPool, BufferPoolConfig, BufferPooler, IoBufMut, Runner as _,
page_size, tokio, BufferPool, BufferPoolConfig, BufferPooler, IoBufMut, Runner as _,
};
use commonware_utils::{NZUsize, NZU32};
use criterion::Criterion;
Expand Down Expand Up @@ -228,22 +228,3 @@ fn build_pool(size: usize, threads: usize) -> BufferPool {

tokio::Runner::new(runner_cfg).start(|ctx| async move { ctx.network_buffer_pool().clone() })
}

#[allow(clippy::missing_const_for_fn)]
fn page_size() -> usize {
#[cfg(unix)]
{
// SAFETY: sysconf is safe to call.
let size = unsafe { libc::sysconf(libc::_SC_PAGESIZE) };
if size <= 0 {
4096
} else {
size as usize
}
}

#[cfg(not(unix))]
{
4096
}
}
13 changes: 6 additions & 7 deletions runtime/src/iobuf/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -830,17 +830,16 @@ impl BufMut<PooledBacking> {
mod tests {
use super::*;
use crate::{
iobuf::pool::{BufferPool, BufferPoolConfig, BufferPoolThreadCacheConfig},
iobuf::{
cache_line_size, page_size, pool::BufferPoolThreadCacheConfig, BufferPool,
BufferPoolConfig,
},
telemetry::metrics::Registry,
};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use commonware_utils::{NZUsize, NZU32};
use std::ops::Bound;

fn page_size() -> usize {
BufferPoolConfig::for_storage().min_size.get()
}

fn test_pool(config: BufferPoolConfig) -> BufferPool {
let mut registry = Registry::default();
BufferPool::new(config, &mut registry)
Expand Down Expand Up @@ -868,7 +867,7 @@ mod tests {
assert!((buf.as_ptr() as usize).is_multiple_of(page));

// Cache-line-aligned allocation should also satisfy its alignment.
let cache_line = BufferPoolConfig::for_network().alignment.get();
let cache_line = cache_line_size();
let buf2 = AlignedBuffer::new(4096, cache_line);
assert_eq!(buf2.capacity(), 4096);
assert!((buf2.as_ptr() as usize).is_multiple_of(cache_line));
Expand Down Expand Up @@ -1319,7 +1318,7 @@ mod tests {
fn test_alignment_after_advance() {
// Advancing breaks base-pointer alignment, which is expected.
let page = page_size();
let pool = test_pool(BufferPoolConfig::for_storage());
let pool = test_pool(BufferPoolConfig::for_storage().with_alignment(NZUsize!(page)));
Comment thread
cursor[bot] marked this conversation as resolved.

let mut buf = pool.try_alloc(100).unwrap();
buf.put_slice(&[0; 100]);
Expand Down
31 changes: 30 additions & 1 deletion runtime/src/iobuf/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,37 @@ pub(crate) use buffer::AlignedBuffer;
use buffer::{AlignedBuf, AlignedBufMut, PooledBuf, PooledBufMut};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use commonware_codec::{util::at_least, BufsMut, EncodeSize, Error, RangeCfg, Read, Write};
use crossbeam_utils::CachePadded;
pub use pool::{BufferPool, BufferPoolConfig, BufferPoolThreadCache, PoolError};
use std::{collections::VecDeque, io::IoSlice, num::NonZeroUsize, ops::RangeBounds};
use std::{collections::VecDeque, io::IoSlice, mem::align_of, num::NonZeroUsize, ops::RangeBounds};

/// Returns the system page size.
///
/// On Unix systems, queries the actual page size via `sysconf`.
/// On other systems (Windows), defaults to 4KB.
#[allow(clippy::missing_const_for_fn)]
pub fn page_size() -> usize {
#[cfg(unix)]
{
// SAFETY: sysconf is safe to call.
let size = unsafe { libc::sysconf(libc::_SC_PAGESIZE) };
if size <= 0 {
4096 // Safe fallback if sysconf fails
} else {
size as usize
}
}

#[cfg(not(unix))]
{
4096
}
}

/// Returns the cache line size for the current architecture.
pub const fn cache_line_size() -> usize {
align_of::<CachePadded<u8>>()
}

#[cfg(feature = "bench")]
pub mod bench {
Expand Down
86 changes: 27 additions & 59 deletions runtime/src/iobuf/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,16 @@
//! first try to re-enter the dropping thread's local cache, spilling a bounded
//! batch back to the global freelist if needed.

use super::{freelist::Freelist, IoBufMut};
use super::{freelist::Freelist, page_size, IoBufMut};
use crate::{
iobuf::buffer::{PooledBufMut, PooledBuffer},
telemetry::metrics::{raw, Counter, CounterFamily, EncodeLabelSet, GaugeFamily, Register},
};
use commonware_utils::{NZUsize, NZU32};
use crossbeam_utils::CachePadded;
use std::{
alloc::Layout,
cell::{Cell, UnsafeCell},
mem::{align_of, MaybeUninit},
mem::MaybeUninit,
num::{NonZeroU32, NonZeroUsize},
ptr,
sync::{
Expand Down Expand Up @@ -95,35 +94,6 @@ impl std::fmt::Display for PoolError {

impl std::error::Error for PoolError {}

/// Returns the system page size.
///
/// On Unix systems, queries the actual page size via `sysconf`.
/// On other systems (Windows), defaults to 4KB.
#[cfg(unix)]
fn page_size() -> usize {
// SAFETY: sysconf is safe to call.
let size = unsafe { libc::sysconf(libc::_SC_PAGESIZE) };
if size <= 0 {
4096 // Safe fallback if sysconf fails
} else {
size as usize
}
}

#[cfg(not(unix))]
#[allow(clippy::missing_const_for_fn)]
fn page_size() -> usize {
4096
}

/// Returns the cache line size for the current architecture.
///
/// Matches the architecture-specific alignment used by
/// [`crossbeam_utils::CachePadded`].
const fn cache_line_size() -> usize {
align_of::<CachePadded<u8>>()
}

/// Policy for sizing each thread's cache within a buffer pool size class.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum BufferPoolThreadCacheConfig {
Expand Down Expand Up @@ -184,40 +154,36 @@ pub struct BufferPoolConfig {
}

impl BufferPoolConfig {
/// Network I/O preset: cache-line aligned, 1KB to 64KB buffers,
/// 4096 per class, not prefilled.
/// Network I/O preset: 1KB to 128KB buffers, 4096 per class, not prefilled.
///
/// Network operations typically need multiple concurrent buffers per connection
/// (message, encoding, encryption) so we allow 4096 buffers per size class.
/// Cache-line alignment is used because network buffers don't require page
/// alignment for DMA, and smaller alignment reduces internal fragmentation.
/// Network operations typically need multiple concurrent buffers per
/// connection (message, encoding, encryption) so we allow 4096 buffers per
/// size class.
pub const fn for_network() -> Self {
let cache_line = NZUsize!(cache_line_size());
Self {
pool_min_size: 1024,
pool_min_size: 0,
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The pool is now faster than going to the system allocator, so there's no reason to bypass it in the defaults.

Comment thread
patrick-ogrady marked this conversation as resolved.
min_size: NZUsize!(1024),
max_size: NZUsize!(64 * 1024),
max_size: NZUsize!(128 * 1024),
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not? This means this pool will use 1GB of memory at peak, same as the storage pool below.

max_per_class: NZU32!(4096),
prefill: false,
alignment: cache_line,
alignment: NZUsize!(1),
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't actually need any specific alignment here, and while this doesn't make any difference once the buffer is allocated and pooled the initial allocation is slower.

parallelism: NZUsize!(1),
thread_cache_config: BufferPoolThreadCacheConfig::Enabled(None),
}
}

/// Storage I/O preset: page-aligned, page_size to 8MB buffers, 64 per class,
/// Storage I/O preset: `page_size` (usually 4KB) to 8MB buffers, 64 per class,
/// not prefilled.
///
/// Page alignment is required for direct I/O and efficient DMA transfers.
pub fn for_storage() -> Self {
let page = NZUsize!(page_size());
Self {
pool_min_size: 1024,
pool_min_size: 0,
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above.

min_size: page,
max_size: NZUsize!(8 * 1024 * 1024),
max_per_class: NZU32!(64),
prefill: false,
alignment: page,
// TODO (#2960): this needs to be page/block aligned for O_DIRECT
alignment: NZUsize!(1),
Comment thread
patrick-ogrady marked this conversation as resolved.
Comment thread
cursor[bot] marked this conversation as resolved.
parallelism: NZUsize!(1),
thread_cache_config: BufferPoolThreadCacheConfig::Enabled(None),
}
Expand Down Expand Up @@ -1745,7 +1711,7 @@ impl BufferPool {
mod tests {
use super::*;
use crate::{
iobuf::{freelist, IoBuf},
iobuf::{cache_line_size, freelist, IoBuf},
telemetry::metrics::Registry,
};
use bytes::{Buf, BufMut};
Expand Down Expand Up @@ -2053,24 +2019,24 @@ mod tests {
fn test_config_for_network() {
let config = BufferPoolConfig::for_network();
config.validate();
assert_eq!(config.pool_min_size, 1024);
assert_eq!(config.pool_min_size, 0);
assert_eq!(config.min_size.get(), 1024);
assert_eq!(config.max_size.get(), 64 * 1024);
assert_eq!(config.max_size.get(), 128 * 1024);
assert_eq!(config.max_per_class.get(), 4096);
assert_eq!(config.parallelism, NZUsize!(1));
assert_eq!(
config.thread_cache_config,
BufferPoolThreadCacheConfig::Enabled(None)
);
assert!(!config.prefill);
assert_eq!(config.alignment.get(), cache_line_size());
assert_eq!(config.alignment.get(), 1);
}

#[test]
fn test_config_for_storage() {
let config = BufferPoolConfig::for_storage();
config.validate();
assert_eq!(config.pool_min_size, 1024);
assert_eq!(config.pool_min_size, 0);
assert_eq!(config.min_size.get(), page_size());
assert_eq!(config.max_size.get(), 8 * 1024 * 1024);
assert_eq!(config.max_per_class.get(), 64);
Expand All @@ -2080,7 +2046,7 @@ mod tests {
BufferPoolThreadCacheConfig::Enabled(None)
);
assert!(!config.prefill);
assert_eq!(config.alignment.get(), page_size());
assert_eq!(config.alignment.get(), 1);
}

#[test]
Expand Down Expand Up @@ -2115,8 +2081,7 @@ mod tests {
BufferPoolThreadCacheConfig::Enabled(Some(NZUsize!(8)))
);
assert!(config.prefill);
// Storage profile alignment stays page-sized unless explicitly changed.
assert_eq!(config.alignment.get(), page_size());
assert_eq!(config.alignment.get(), 1);

// Alignment can be tuned explicitly as long as min_size is also adjusted.
let aligned = BufferPoolConfig::for_network()
Expand Down Expand Up @@ -3142,20 +3107,23 @@ mod tests {
fn test_buffer_alignment() {
let page = page_size();
let cache_line = cache_line_size();

// Reduce max_per_class under miri (atomics are slow)
cfg_if::cfg_if! {
if #[cfg(miri)] {
let storage_config = BufferPoolConfig {
max_per_class: NZU32!(32),
..BufferPoolConfig::for_storage()
..BufferPoolConfig::for_storage().with_alignment(NZUsize!(page))
};
let network_config = BufferPoolConfig {
max_per_class: NZU32!(32),
..BufferPoolConfig::for_network()
..BufferPoolConfig::for_network().with_alignment(NZUsize!(cache_line))
};
} else {
let storage_config = BufferPoolConfig::for_storage();
let network_config = BufferPoolConfig::for_network();
let storage_config =
BufferPoolConfig::for_storage().with_alignment(NZUsize!(page));
let network_config =
BufferPoolConfig::for_network().with_alignment(NZUsize!(cache_line));
}
}

Expand Down
4 changes: 2 additions & 2 deletions runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ stability_scope!(BETA {

pub mod iobuf;
pub use iobuf::{
BufferPool, BufferPoolConfig, BufferPoolThreadCache, Builder as IoBufsBuilder, IoBuf,
IoBufMut, IoBufs, IoBufsMut,
cache_line_size, page_size, BufferPool, BufferPoolConfig, BufferPoolThreadCache,
Builder as IoBufsBuilder, IoBuf, IoBufMut, IoBufs, IoBufsMut,
};

pub mod utils;
Expand Down
Loading