Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions turbine/src/xdp.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// re-export since this is needed at validator startup
pub use agave_xdp::set_cpu_affinity;
pub use agave_xdp::{get_cpu, set_cpu_affinity};
#[cfg(target_os = "linux")]
use {
agave_xdp::{
Expand Down Expand Up @@ -207,6 +207,10 @@ impl XdpRetransmitter {
}
let tx_loop_config = tx_loop_config_builder.build_with_src_device(&dev);

// since we aren't necessarily allocating from the thread that we intend to run on,
// temporarily switch to the target cpu for each TxLoop to ensure that the Umem region
// is allocated to the correct numa node
let this_cpu = get_cpu().expect("linux implements sched_getcpu");
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Next, we pull the TxLoopBuilder initialization out to the main thread: stash the main thread's initial CPU and restore it once all of the tx loops have been started.

for (i, (receiver, cpu_id)) in receivers
.into_iter()
.zip(config.cpus.into_iter())
Expand All @@ -216,13 +220,12 @@ impl XdpRetransmitter {
let drop_sender = drop_sender.clone();
let atomic_router = Arc::clone(&atomic_router);
let config = tx_loop_config.clone();
let tx_loop_builder = TxLoopBuilder::new(cpu_id, QueueId(i as u64), config, &dev);
let tx_loop = tx_loop_builder.build();
threads.push(
Builder::new()
.name(format!("solRetransmIO{i:02}"))
.spawn(move || {
let tx_loop_builder =
TxLoopBuilder::new(cpu_id, QueueId(i as u64), config, &dev);
let tx_loop = tx_loop_builder.build();
tx_loop.run(receiver, drop_sender, move |ip| {
let r = atomic_router.load();
r.route(*ip).ok()
Expand All @@ -231,6 +234,8 @@ impl XdpRetransmitter {
.unwrap(),
);
}
// migrate main thread back off of the last xdp reserved cpu
set_cpu_affinity([this_cpu]).unwrap();

Ok((Self { threads }, XdpSender { senders }))
}
Expand Down
9 changes: 9 additions & 0 deletions xdp/src/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,9 @@ pub(crate) struct RingConsumer {
cached_consumer: u32,
}

/// Safety: `RingConsumer` is constructed from raw `AtomicU32` pointers (see
/// `RingConsumer::new`), which prevents the compiler from auto-deriving `Send`.
/// Instances of `RingConsumer` MUST only be resident on one thread at a time;
/// callers transferring one across threads must uphold this invariant.
unsafe impl Send for RingConsumer {}

impl RingConsumer {
pub fn new(producer: *mut AtomicU32, consumer: *mut AtomicU32) -> Self {
Self {
Expand Down Expand Up @@ -306,6 +309,9 @@ pub(crate) struct RingProducer {
size: u32,
}

/// Safety: `RingProducer` is constructed from raw `AtomicU32` pointers (see
/// `RingProducer::new`), which prevents the compiler from auto-deriving `Send`.
/// Instances of `RingProducer` MUST only be resident on one thread at a time;
/// callers transferring one across threads must uphold this invariant.
unsafe impl Send for RingProducer {}

impl RingProducer {
pub fn new(producer: *mut AtomicU32, consumer: *mut AtomicU32, size: u32) -> Self {
Self {
Expand Down Expand Up @@ -435,6 +441,9 @@ pub struct RingMmap<T> {
pub flags: *mut AtomicU32,
}

/// Safety: `RingMmap<T>` holds raw pointers into a ring mmap (e.g. the
/// `flags: *mut AtomicU32` field), so `Send` cannot be auto-derived.
/// Instances of `RingMmap<T>` MUST only be resident on one thread at a time;
/// callers transferring one across threads must uphold this invariant.
/// NOTE(review): `T` itself is not required to be `Send` here — confirm that
/// is intentional for all instantiations of `T`.
unsafe impl<T> Send for RingMmap<T> {}

impl<T> Drop for RingMmap<T> {
fn drop(&mut self) {
unsafe {
Expand Down
18 changes: 18 additions & 0 deletions xdp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,21 @@ pub fn set_cpu_affinity(cpus: impl IntoIterator<Item = usize>) -> Result<(), io:
/// Stub for non-Linux targets: CPU affinity is a Linux-only feature in this
/// crate, so calling this off-Linux is a programmer error and panics via
/// `unimplemented!`.
pub fn set_cpu_affinity(_cpus: impl IntoIterator<Item = usize>) -> Result<(), io::Error> {
    unimplemented!()
}

#[cfg(target_os = "linux")]
/// Returns the CPU number the calling thread is currently running on.
///
/// # Errors
///
/// Returns the `errno`-derived [`io::Error`] if `sched_getcpu(2)` fails.
pub fn get_cpu() -> Result<usize, io::Error> {
    // SAFETY: sched_getcpu takes no arguments and only queries per-thread
    // kernel state; it has no memory-safety preconditions.
    let result = unsafe { libc::sched_getcpu() };
    if result < 0 {
        // Per sched_getcpu(2), the only failure return value is -1 (with
        // errno set); anything else negative would indicate a libc bug.
        assert_eq!(result, -1);
        Err(io::Error::last_os_error())
    } else {
        Ok(result as usize)
    }
}

#[cfg(not(target_os = "linux"))]
/// Stub for non-Linux targets: `sched_getcpu` is Linux-only, so calling this
/// off-Linux is a programmer error and panics via `unimplemented!`.
pub fn get_cpu() -> Result<usize, io::Error> {
    unimplemented!()
}
3 changes: 3 additions & 0 deletions xdp/src/umem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,9 @@ pub struct PageAlignedMemory {
len: usize,
}

/// Safety: a `PageAlignedMemory` instance MUST only be resident in one thread
/// at a time; callers transferring one across threads must uphold this
/// invariant. The manual impl exists because `Send` is not auto-derived —
/// presumably the struct holds a raw pointer to the allocated region alongside
/// the visible `len` field; TODO(review) confirm against the full definition.
unsafe impl Send for PageAlignedMemory {}

impl PageAlignedMemory {
pub fn alloc(frame_size: usize, frame_count: usize) -> Result<Self, AllocError> {
Self::alloc_with_page_size(
Expand Down