Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 1 addition & 4 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1573,10 +1573,7 @@ impl Validator {
.port();
let src_ip = match node.bind_ip_addrs.active() {
IpAddr::V4(ip) if !ip.is_unspecified() => Some(ip),
IpAddr::V4(_unspecified) => xdp_config
.interface
.as_ref()
.and_then(|iface| master_ip_if_bonded(iface)),
IpAddr::V4(_unspecified) => master_ip_if_bonded(&xdp_config.network_device.if_name),
_ => panic!("IPv6 not supported"),
};
let (rtx, sender) = XdpRetransmitter::new(xdp_config, src_port, src_ip, exit.clone())
Expand Down
1 change: 1 addition & 0 deletions programs/sbf/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

51 changes: 17 additions & 34 deletions turbine/src/xdp.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,7 @@
// re-export since this is needed at validator startup
pub use agave_xdp::set_cpu_affinity;
#[cfg(target_os = "linux")]
use {
agave_xdp::{
device::{NetworkDevice, QueueId},
load_xdp_program,
route::Router,
route_monitor::RouteMonitor,
tx_loop::tx_loop,
},
arc_swap::ArcSwap,
crossbeam_channel::TryRecvError,
std::{thread::Builder, time::Duration},
};
use {
agave_xdp::get_network_device::NetworkDevice,
crossbeam_channel::{Sender, TrySendError},
solana_ledger::shred,
std::{
Expand All @@ -23,13 +11,23 @@ use {
thread,
},
};
#[cfg(target_os = "linux")]
use {
agave_xdp::{
device::QueueId, load_xdp_program, route::Router, route_monitor::RouteMonitor,
tx_loop::tx_loop,
},
arc_swap::ArcSwap,
crossbeam_channel::TryRecvError,
std::{thread::Builder, time::Duration},
};

#[cfg(target_os = "linux")]
const ROUTE_MONITOR_UPDATE_INTERVAL: Duration = Duration::from_millis(50);

#[derive(Clone, Debug)]
pub struct XdpConfig {
pub interface: Option<String>,
pub network_device: NetworkDevice,
pub cpus: Vec<usize>,
pub zero_copy: bool,
// The capacity of the channel that sits between retransmit stage and each XDP thread that
Expand All @@ -42,21 +40,10 @@ impl XdpConfig {
const DEFAULT_RTX_CHANNEL_CAP: usize = 1_000_000;
}

impl Default for XdpConfig {
fn default() -> Self {
Self {
interface: None,
cpus: vec![],
zero_copy: false,
rtx_channel_cap: Self::DEFAULT_RTX_CHANNEL_CAP,
}
}
}

impl XdpConfig {
pub fn new(interface: Option<impl Into<String>>, cpus: Vec<usize>, zero_copy: bool) -> Self {
pub fn new(network_device: NetworkDevice, cpus: Vec<usize>, zero_copy: bool) -> Self {
Self {
interface: interface.map(|s| s.into()),
network_device,
cpus,
zero_copy,
rtx_channel_cap: XdpConfig::DEFAULT_RTX_CHANNEL_CAP,
Expand Down Expand Up @@ -138,19 +125,15 @@ impl XdpRetransmitter {
};
const DROP_CHANNEL_CAP: usize = 1_000_000;

let dev = config.network_device;

// switch to higher caps while we setup XDP. We assume that an error in
// this function is irrecoverable so we don't try to drop on errors.
for cap in [CAP_NET_ADMIN, CAP_NET_RAW, CAP_BPF, CAP_PERFMON] {
caps::raise(None, CapSet::Effective, cap)
.map_err(|e| format!("failed to raise {cap:?} capability: {e}"))?;
}

let dev = Arc::new(if let Some(interface) = config.interface {
NetworkDevice::new(interface).unwrap()
} else {
NetworkDevice::new_from_default_route().unwrap()
});

let ebpf = if config.zero_copy {
Some(load_xdp_program(&dev).map_err(|e| format!("failed to attach xdp program: {e}"))?)
} else {
Expand Down Expand Up @@ -205,7 +188,7 @@ impl XdpRetransmitter {
.zip(config.cpus.into_iter())
.enumerate()
{
let dev = Arc::clone(&dev);
let dev = dev.clone();
let drop_sender = drop_sender.clone();
let atomic_router = Arc::clone(&atomic_router);
threads.push(
Expand Down
1 change: 1 addition & 0 deletions validator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ agave-unstable-api = []
agave-geyser-plugin-interface = { workspace = true }
agave-logger = { workspace = true }
agave-snapshots = { workspace = true }
agave-xdp = { workspace = true }
chrono = { workspace = true, features = ["default", "serde"] }
clap = { workspace = true }
console = { workspace = true }
Expand Down
10 changes: 5 additions & 5 deletions validator/src/commands/run/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use {
snapshot_config::{SnapshotConfig, SnapshotUsage},
ArchiveFormat, SnapshotInterval, SnapshotVersion,
},
agave_xdp::get_network_device::get_network_device,
clap::{crate_name, value_t, value_t_or_exit, values_t, values_t_or_exit, ArgMatches},
crossbeam_channel::unbounded,
log::*,
Expand Down Expand Up @@ -462,11 +463,10 @@ pub fn execute(
let xdp_interface = matches.value_of("retransmit_xdp_interface");
let xdp_zero_copy = matches.is_present("retransmit_xdp_zero_copy");
let retransmit_xdp = matches.value_of("retransmit_xdp_cpu_cores").map(|cpus| {
XdpConfig::new(
xdp_interface,
parse_cpu_ranges(cpus).unwrap(),
xdp_zero_copy,
)
// this will only succeed on linux
let xdp_device =
get_network_device(xdp_interface).expect("XDP interface must be a valid net device");
XdpConfig::new(xdp_device, parse_cpu_ranges(cpus).unwrap(), xdp_zero_copy)
});

let account_paths: Vec<PathBuf> =
Expand Down
3 changes: 2 additions & 1 deletion xdp/src/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ use {
#[derive(Copy, Clone, Debug)]
pub struct QueueId(pub u64);

#[derive(Debug, Clone)]
pub struct NetworkDevice {
if_index: u32,
if_name: String,
pub if_name: String,
}

impl NetworkDevice {
Expand Down
39 changes: 39 additions & 0 deletions xdp/src/get_network_device.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#[cfg(target_os = "linux")]
pub use crate::device::NetworkDevice;
use std::error::Error;

#[cfg(target_os = "linux")]
pub fn get_network_device(interface: Option<&str>) -> Result<NetworkDevice, Box<dyn Error>> {
// switch to higher caps while we setup XDP. We assume that an error in
// this function is irrecoverable so we don't try to drop on errors.
use caps::{
CapSet,
Capability::{CAP_BPF, CAP_NET_ADMIN, CAP_NET_RAW, CAP_PERFMON},
};
for cap in [CAP_NET_ADMIN, CAP_NET_RAW, CAP_BPF, CAP_PERFMON] {
caps::raise(None, CapSet::Effective, cap)
.map_err(|e| format!("failed to raise {cap:?} capability: {e}"))?;
}

let dev = if let Some(interface) = interface {
NetworkDevice::new(interface)?
} else {
NetworkDevice::new_from_default_route()?
};

for cap in [CAP_NET_ADMIN, CAP_NET_RAW, CAP_BPF, CAP_PERFMON] {
caps::drop(None, CapSet::Effective, cap)?;
}
Ok(dev)
}

#[cfg(not(target_os = "linux"))]
#[derive(Debug, Clone)]
pub struct NetworkDevice {
pub if_name: String,
}

#[cfg(not(target_os = "linux"))]
pub fn get_network_device(_interface: Option<&str>) -> Result<NetworkDevice, Box<dyn Error>> {
Err("XDP not supported on this platform!".into())
}
3 changes: 3 additions & 0 deletions xdp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ pub mod umem;

#[cfg(target_os = "linux")]
pub use program::load_xdp_program;

pub mod get_network_device;

use std::io;

#[cfg(target_os = "linux")]
Expand Down
Loading