Changes from 1 commit
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

5 changes: 3 additions & 2 deletions core/src/validator.rs
@@ -141,7 +141,7 @@ use {
     solana_turbine::{
         self,
         broadcast_stage::BroadcastStageType,
-        xdp::{master_ip_if_bonded, XdpConfig, XdpRetransmitter},
+        xdp::{master_ip_if_bonded, XdpConfig, XdpRetransmitBuilder, XdpRetransmitter},
     },
     solana_unified_scheduler_logic::SchedulingMode,
     solana_unified_scheduler_pool::{DefaultSchedulerPool, SupportedSchedulingMode},
@@ -1558,8 +1558,9 @@ impl Validator {
                     .and_then(|iface| master_ip_if_bonded(iface)),
                 _ => panic!("IPv6 not supported"),
             };
-            let (rtx, sender) = XdpRetransmitter::new(xdp_config, src_port, src_ip, exit.clone())
+            let xdp_retransmit_builder = XdpRetransmitBuilder::new(xdp_config, src_port, src_ip)
                 .expect("failed to create xdp retransmitter");
+            let (rtx, sender) = xdp_retransmit_builder.build(exit.clone());
             (Some(rtx), Some(sender))
         } else {
             (None, None)
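The call-site change above splits construction from thread spawning: `XdpRetransmitBuilder::new` performs the fallible, capability-sensitive setup, and `build` consumes the builder to spawn the retransmit threads. A minimal sketch of the resulting two-phase flow, with `xdp_config`, `src_port`, `src_ip`, and `exit` assumed in scope as in the diff:

```rust
// Phase 1: fallible setup (device discovery, eBPF load, capability raise/drop).
let builder = XdpRetransmitBuilder::new(xdp_config, src_port, src_ip)
    .expect("failed to create xdp retransmitter");

// Phase 2: infallible thread spawning; the exit flag is only needed here.
let (retransmitter, sender) = builder.build(exit.clone());
```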
1 change: 1 addition & 0 deletions dev-bins/Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions programs/sbf/Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions turbine/Cargo.toml
@@ -56,6 +56,7 @@ thiserror = { workspace = true }
 wincode = { workspace = true }

 [target.'cfg(target_os = "linux")'.dependencies]
+aya = { workspace = true }
 caps = { workspace = true }

 [dev-dependencies]
155 changes: 105 additions & 50 deletions turbine/src/xdp.rs
@@ -7,9 +7,11 @@ use {
         load_xdp_program,
         route::Router,
         route_monitor::RouteMonitor,
-        tx_loop::{TxLoopBuilder, TxLoopConfigBuilder},
+        tx_loop::{TxLoop, TxLoopBuilder, TxLoopConfigBuilder},
         umem::{OwnedUmem, PageAlignedMemory},
     },
+    arc_swap::ArcSwap,
+    aya::Ebpf,
     crossbeam_channel::TryRecvError,
     std::{thread::Builder, time::Duration},
 };
@@ -114,14 +116,24 @@ pub struct XdpRetransmitter {
     threads: Vec<thread::JoinHandle<()>>,
 }

-impl XdpRetransmitter {
-    #[cfg(not(target_os = "linux"))]
+#[cfg(not(target_os = "linux"))]
+pub struct XdpRetransmitBuilder {}
+
+#[cfg(target_os = "linux")]
+pub struct XdpRetransmitBuilder {
+    tx_loops: Vec<TxLoop<OwnedUmem<PageAlignedMemory>>>,
+    rtx_channel_cap: usize,
+    maybe_ebpf: Option<Ebpf>,
+    router: Router,
+}
+
+impl XdpRetransmitBuilder {
     #[cfg(not(target_os = "linux"))]
     pub fn new(
         _config: XdpConfig,
         _src_port: u16,
         _src_ip: Option<Ipv4Addr>,
-        _exit: Arc<AtomicBool>,
-    ) -> Result<(Self, XdpSender), Box<dyn Error>> {
+    ) -> Result<Self, Box<dyn Error>> {
         Err("XDP is only supported on Linux".into())
     }

@@ -130,13 +142,45 @@ impl XdpRetransmitter {
         config: XdpConfig,
         src_port: u16,
         src_ip: Option<Ipv4Addr>,
-        exit: Arc<AtomicBool>,
-    ) -> Result<(Self, XdpSender), Box<dyn Error>> {
+    ) -> Result<Self, Box<dyn Error>> {
         use caps::{
             CapSet,
             Capability::{CAP_BPF, CAP_NET_ADMIN, CAP_NET_RAW, CAP_PERFMON},
         };
-        const DROP_CHANNEL_CAP: usize = 1_000_000;
+        let XdpConfig {
+            interface: maybe_interface,
+            cpus,
+            zero_copy,
+            rtx_channel_cap,
+        } = config;
+
+        let dev = Arc::new(if let Some(interface) = maybe_interface {
+            NetworkDevice::new(interface).unwrap()
+        } else {
+            NetworkDevice::new_from_default_route().unwrap()
+        });
+
+        let mut tx_loop_config_builder = TxLoopConfigBuilder::new(src_port);
+        tx_loop_config_builder.zero_copy(zero_copy);
+        if let Some(src_ip) = src_ip {
+            tx_loop_config_builder.override_src_ip(src_ip);
+        }
+        let tx_loop_config = tx_loop_config_builder.build_with_src_device(&dev);
+
+        // since we aren't necessarily allocating from the thread that we intend to run on,
+        // temporarily switch to the target cpu for each TxLoop to ensure that the Umem region
+        // is allocated to the correct numa node
+        let this_cpu = get_cpu().expect("linux implements sched_getcpu");
+        let tx_loop_builders = cpus
+            .into_iter()
+            .zip(std::iter::repeat_with(|| tx_loop_config.clone()))
+            .enumerate()
+            .map(|(i, (cpu_id, config))| {
+                TxLoopBuilder::new(cpu_id, QueueId(i as u64), config, &dev)
+            })
+            .collect::<Vec<_>>();
+        // migrate main thread back off of the last xdp reserved cpu
+        set_cpu_affinity([this_cpu]).unwrap();
Comment on lines +170 to +183

Author: retain the main thread cpu restoration after isolating the logic that requires caps to be raised

Reviewer: set_cpu_affinity([this_cpu]) seems incorrect? this_cpu != all the possible cpus in the original mask

Author: we haven't applied the mask yet; I moved that back

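For context on the thread above: the builder relies on Linux first-touch page placement, pinning the calling thread to each TxLoop's target CPU before that loop's Umem is allocated, then restoring affinity afterwards. A rough sketch of the pattern, reusing the `get_cpu`/`set_cpu_affinity` helpers seen in this diff (their exact signatures are assumed; `allocate_umem_on_current_cpu` is a hypothetical stand-in for the Umem allocation step):

```rust
// Remember where we started so we can migrate back afterwards.
let original_cpu = get_cpu().expect("sched_getcpu is available on linux");

for target_cpu in cpus {
    // Move onto the target CPU: pages allocated now are first-touched there,
    // so the kernel places them on that CPU's NUMA node.
    set_cpu_affinity([target_cpu]).unwrap();
    let umem = allocate_umem_on_current_cpu(); // hypothetical allocation step
    // ... hand `umem` to the TxLoop builder for `target_cpu` ...
}

// Migrate back off the last reserved CPU. As the review thread notes, the
// validator's full affinity mask has not been applied yet at this point.
set_cpu_affinity([original_cpu]).unwrap();
```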
         // switch to higher caps while we setup XDP. We assume that an error in
         // this function is irrecoverable so we don't try to drop on errors.
@@ -145,38 +189,56 @@ impl XdpRetransmitter {
             .map_err(|e| format!("failed to raise {cap:?} capability: {e}"))?;
         }

-        let dev = Arc::new(if let Some(interface) = config.interface {
-            NetworkDevice::new(interface).unwrap()
-        } else {
-            NetworkDevice::new_from_default_route().unwrap()
-        });
-
-        let ebpf = if config.zero_copy {
-            Some(load_xdp_program(&dev).map_err(|e| format!("failed to attach xdp program: {e}"))?)
+        let maybe_ebpf_result = if zero_copy {
+            Some(load_xdp_program(&dev).map_err(|e| format!("failed to attach xdp program: {e}")))
         } else {
             None
         };

+        let tx_loops = tx_loop_builders
+            .into_iter()
+            .map(|tx_loop_builder| tx_loop_builder.build())
+            .collect::<Vec<_>>();
+
+        let router_result = Router::new();
+
         for cap in [CAP_NET_ADMIN, CAP_NET_RAW, CAP_BPF, CAP_PERFMON] {
             caps::drop(None, CapSet::Effective, cap).unwrap();
         }

-        let (senders, receivers) = (0..config.cpus.len())
-            .map(|_| crossbeam_channel::bounded(config.rtx_channel_cap))
-            .unzip::<_, _, Vec<_>, Vec<_>>();
+        let router = router_result?;
+        let maybe_ebpf = maybe_ebpf_result.transpose()?;

-        // Use ArcSwap for lock-free updates of the routing table
-        let atomic_router = Arc::new(ArcSwap::from_pointee(Router::new()?));
-        let monitor_handle = RouteMonitor::start(
-            Arc::clone(&atomic_router),
-            exit.clone(),
-            ROUTE_MONITOR_UPDATE_INTERVAL,
-        );
+        Ok(Self {
+            tx_loops,
+            rtx_channel_cap,
+            maybe_ebpf,
+            router,
+        })
+    }
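A side effect of the reordering in `new` is worth calling out: the fallible steps (`load_xdp_program`, `Router::new`) are captured as `Result` values instead of being unwrapped with `?` immediately, so the capability drop always runs before any error propagates, and `Option::transpose` converts the stored `Option<Result<T, E>>` into `Result<Option<T>, E>` for the final `?`. A self-contained sketch of that shape, with hypothetical stand-ins for the setup functions:

```rust
use std::error::Error;

fn load_program() -> Result<u32, Box<dyn Error>> { Ok(7) } // stand-in for load_xdp_program
fn new_router() -> Result<String, Box<dyn Error>> { Ok("router".into()) } // stand-in for Router::new
fn drop_caps() { /* must run even when setup failed */ }

fn setup(zero_copy: bool) -> Result<(Option<u32>, String), Box<dyn Error>> {
    // Capture results without `?` so an early return cannot skip drop_caps().
    let maybe_ebpf_result = zero_copy.then(load_program);
    let router_result = new_router();

    drop_caps();

    // Only now propagate errors: Option<Result<T, E>> -> Result<Option<T>, E>.
    let maybe_ebpf = maybe_ebpf_result.transpose()?;
    let router = router_result?;
    Ok((maybe_ebpf, router))
}
```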

-        let mut threads = vec![];
-        threads.push(monitor_handle);
+    #[cfg(not(target_os = "linux"))]
+    pub fn build(self, _exit: Arc<AtomicBool>) -> (XdpRetransmitter, XdpSender) {
+        (
+            XdpRetransmitter { threads: vec![] },
+            XdpSender { senders: vec![] },
+        )
+    }
+
+    #[cfg(target_os = "linux")]
+    pub fn build(self, exit: Arc<AtomicBool>) -> (XdpRetransmitter, XdpSender) {
+        const DROP_CHANNEL_CAP: usize = 1_000_000;
+
+        let Self {
+            tx_loops,
+            rtx_channel_cap,
+            maybe_ebpf,
+            router,
+        } = self;
+
         let (drop_sender, drop_receiver) = crossbeam_channel::bounded(DROP_CHANNEL_CAP);
+        let mut threads = vec![];

         threads.push(
             Builder::new()
                 .name("solRetransmDrop".to_owned())
@@ -195,33 +257,25 @@ impl XdpRetransmitter {
                         }
                     }
                     // move the ebpf program here so it stays attached until we exit
-                    drop(ebpf);
+                    drop(maybe_ebpf);
                 })
                 .unwrap(),
         );

-        let mut tx_loop_config_builder = TxLoopConfigBuilder::new(src_port);
-        tx_loop_config_builder.zero_copy(config.zero_copy);
-        if let Some(src_ip) = src_ip {
-            tx_loop_config_builder.override_src_ip(src_ip);
-        }
-        let tx_loop_config = tx_loop_config_builder.build_with_src_device(&dev);
+        // Use ArcSwap for lock-free updates of the routing table
+        let atomic_router = Arc::new(ArcSwap::from_pointee(router));
+        let monitor_handle = RouteMonitor::start(
+            Arc::clone(&atomic_router),
+            exit.clone(),
+            ROUTE_MONITOR_UPDATE_INTERVAL,
+        );
+        threads.push(monitor_handle);
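The `ArcSwap` handoff above is what lets the route monitor thread publish refreshed routing tables while every tx loop keeps reading without taking a lock. A minimal, self-contained sketch of the two sides, assuming only the `arc-swap` crate (`RoutingTable` is a hypothetical stand-in for `Router`):

```rust
use {arc_swap::ArcSwap, std::sync::Arc};

struct RoutingTable {
    version: u64,
}

fn main() {
    let table = Arc::new(ArcSwap::from_pointee(RoutingTable { version: 0 }));

    // Reader side (a tx loop): grab a lock-free snapshot per packet/batch.
    let snapshot = table.load();
    assert_eq!(snapshot.version, 0);

    // Writer side (the route monitor): atomically swap in a rebuilt table.
    // Readers still holding the old snapshot keep it alive until they drop it.
    table.store(Arc::new(RoutingTable { version: 1 }));
    assert_eq!(table.load().version, 1);
}
```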

-        // since we aren't necessarily allocating from the thread that we intend to run on,
-        // temporarily switch to the target cpu for each TxLoop to ensure that the Umem region
-        // is allocated to the correct numa node
-        let this_cpu = get_cpu().expect("linux implements sched_getcpu");
-        for (i, (receiver, cpu_id)) in receivers
-            .into_iter()
-            .zip(config.cpus.into_iter())
-            .enumerate()
-        {
-            let dev = Arc::clone(&dev);
+        let mut senders = vec![];
+        for (i, tx_loop) in tx_loops.into_iter().enumerate() {
+            let (sender, receiver) = crossbeam_channel::bounded(rtx_channel_cap);
             let drop_sender = drop_sender.clone();
             let atomic_router = Arc::clone(&atomic_router);
-            let config = tx_loop_config.clone();
-            let tx_loop_builder = TxLoopBuilder::new(cpu_id, QueueId(i as u64), config, &dev);
-            let tx_loop = tx_loop_builder.build();
             threads.push(
                 Builder::new()
                     .name(format!("solRetransmIO{i:02}"))
@@ -233,13 +287,14 @@ impl XdpRetransmitter {
                     })
                     .unwrap(),
             );
+            senders.push(sender);
         }
-        // migrate main thread back off of the last xdp reserved cpu
-        set_cpu_affinity([this_cpu]).unwrap();

-        Ok((Self { threads }, XdpSender { senders }))
+        (XdpRetransmitter { threads }, XdpSender { senders })
     }
+}

+impl XdpRetransmitter {
     pub fn join(self) -> thread::Result<()> {
         for handle in self.threads {
             handle.join()?;