Skip to content

Commit f56845e

Browse files
authored
Gossip: Load shed packets on send and pull fixed number of packets on recv (#5177)
* Implement dropping sender to streamline gossip send/recv bounds * Remove now unnecessary metrics
1 parent 32209c7 commit f56845e

File tree

4 files changed

+155
-131
lines changed

4 files changed

+155
-131
lines changed

gossip/src/cluster_info.rs

Lines changed: 110 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ use {
4242
},
4343
weighted_shuffle::WeightedShuffle,
4444
},
45-
crossbeam_channel::{Receiver, Sender, TrySendError},
45+
crossbeam_channel::{bounded, Receiver, SendError, Sender, TryRecvError, TrySendError},
4646
itertools::{Either, Itertools},
4747
rand::{seq::SliceRandom, CryptoRng, Rng},
4848
rayon::{prelude::*, ThreadPool, ThreadPoolBuilder},
@@ -58,7 +58,7 @@ use {
5858
},
5959
solana_perf::{
6060
data_budget::DataBudget,
61-
packet::{Packet, PacketBatch, PacketBatchRecycler, PACKET_DATA_SIZE},
61+
packet::{Packet, PacketBatch, PacketBatchRecycler},
6262
},
6363
solana_pubkey::Pubkey,
6464
solana_quic_definitions::QUIC_PORT_OFFSET,
@@ -71,14 +71,14 @@ use {
7171
packet,
7272
quic::DEFAULT_QUIC_ENDPOINTS,
7373
socket::SocketAddrSpace,
74-
streamer::{PacketBatchReceiver, PacketBatchSender},
74+
streamer::{ChannelSend, PacketBatchReceiver},
7575
},
7676
solana_time_utils::timestamp,
7777
solana_transaction::Transaction,
7878
solana_vote::vote_parser,
7979
std::{
8080
borrow::Borrow,
81-
collections::{HashMap, HashSet, VecDeque},
81+
collections::{HashMap, HashSet},
8282
fmt::Debug,
8383
fs::{self, File},
8484
io::{BufReader, BufWriter, Write},
@@ -103,25 +103,24 @@ const DEFAULT_EPOCH_DURATION: Duration =
103103
Duration::from_millis(DEFAULT_SLOTS_PER_EPOCH * DEFAULT_MS_PER_SLOT);
104104
/// milliseconds we sleep for between gossip requests
105105
pub const GOSSIP_SLEEP_MILLIS: u64 = 100;
106-
/// A hard limit on incoming gossip messages
107-
/// Chosen to be able to handle 1Gbps of pure gossip traffic
108-
/// 128MB/PACKET_DATA_SIZE
109-
const MAX_GOSSIP_TRAFFIC: usize = 128_000_000 / PACKET_DATA_SIZE;
110106
/// Capacity for the [`ClusterInfo::run_socket_consume`] and [`ClusterInfo::run_listen`]
111107
/// intermediate packet batch buffers.
112108
///
113-
/// Uses a heuristic of 28 packets per [`PacketBatch`], which is an observed
114-
/// average of packets per batch. The buffers are re-used across processing loops,
115-
/// so any extra capacity that may be reserved due to traffic variations will be preserved,
116-
/// avoiding excessive resizing and re-allocation.
117-
const CHANNEL_RECV_BUFFER_INITIAL_CAPACITY: usize = MAX_GOSSIP_TRAFFIC.div_ceil(28);
109+
/// To avoid the overhead of dropping large sets of packet batches in each processing loop,
110+
/// we limit the number of packet batches that are pulled from the corresponding channel on each iteration.
111+
/// This ensures that the number of `madvise` system calls is minimized and, as such, that large interruptions
112+
/// to the processing loop are avoided.
113+
const CHANNEL_CONSUME_CAPACITY: usize = 1024;
118114
/// Channel capacity for gossip channels.
119115
///
120-
/// It was observed that under extreme load, the channel caps out
121-
/// around 11k capacity. This rounds that up to the next power of 2
122-
/// such that load shedding is highly unlikely to occur on the sender
123-
/// and continues to be done on the receiver side.
124-
pub(crate) const GOSSIP_CHANNEL_CAPACITY: usize = 16_384; // 2^14
116+
/// A hard limit on incoming gossip messages.
117+
///
118+
/// 262,144 packets with saturated packet batches (64 packets).
119+
///
120+
/// 114,688 packets with observed average packet batch size (28 packets),
121+
/// putting this within reasonable range of previous hard limit
122+
/// of `MAX_GOSSIP_TRAFFIC` (103,896).
123+
pub(crate) const GOSSIP_CHANNEL_CAPACITY: usize = 4096; // 2^12
125124
const GOSSIP_PING_CACHE_CAPACITY: usize = 126976;
126125
const GOSSIP_PING_CACHE_TTL: Duration = Duration::from_secs(1280);
127126
const GOSSIP_PING_CACHE_RATE_LIMIT_DELAY: Duration = Duration::from_secs(1280 / 64);
@@ -208,6 +207,75 @@ fn should_retain_crds_value(
208207
}
209208
}
210209

210+
/// A sender implementation that evicts the oldest message when the channel is full.
211+
#[derive(Clone)]
212+
pub(crate) struct EvictingSender<T> {
213+
sender: Sender<T>,
214+
receiver: Receiver<T>,
215+
}
216+
217+
impl<T> EvictingSender<T> {
218+
/// Create a new evicting sender with provided sender, receiver.
219+
#[inline]
220+
pub(crate) fn new(sender: Sender<T>, receiver: Receiver<T>) -> Self {
221+
Self { sender, receiver }
222+
}
223+
224+
/// Create a new `EvictingSender` with a bounded channel of the specified capacity.
225+
#[inline]
226+
pub(crate) fn new_bounded(capacity: usize) -> (Self, Receiver<T>) {
227+
let (sender, receiver) = bounded(capacity);
228+
(Self::new(sender, receiver.clone()), receiver)
229+
}
230+
}
231+
232+
impl<T> ChannelSend<T> for EvictingSender<T>
233+
where
234+
T: Send + 'static,
235+
{
236+
#[inline]
237+
fn send(&self, msg: T) -> std::result::Result<(), SendError<T>> {
238+
self.sender.send(msg)
239+
}
240+
241+
fn try_send(&self, msg: T) -> std::result::Result<(), TrySendError<T>> {
242+
let Err(e) = self.sender.try_send(msg) else {
243+
return Ok(());
244+
};
245+
246+
match e {
247+
// Prefer newer messages over older messages.
248+
TrySendError::Full(msg) => match self.receiver.try_recv() {
249+
Ok(older) => {
250+
// Attempt to requeue the newer message.
251+
// NB: if multiple senders are used, and another sender is faster than us to send() after we've popped `older`,
252+
// our try_send() will fail with Full(msg), in which case we drop the new message.
253+
self.sender.try_send(msg)?;
254+
// Propagate the error _with the older message_.
255+
Err(TrySendError::Full(older))
256+
}
257+
// Unlikely race condition -- it was just indicated that the channel is full.
258+
// Attempt to requeue the message.
259+
Err(TryRecvError::Empty) => self.sender.try_send(msg),
260+
// Unreachable in practice since we maintain a reference to both the sender and receiver.
261+
Err(TryRecvError::Disconnected) => unreachable!(),
262+
},
263+
// Unreachable in practice since we maintain a reference to both the sender and receiver.
264+
TrySendError::Disconnected(_) => unreachable!(),
265+
}
266+
}
267+
268+
#[inline]
269+
fn is_empty(&self) -> bool {
270+
self.receiver.is_empty()
271+
}
272+
273+
#[inline]
274+
fn len(&self) -> usize {
275+
self.receiver.len()
276+
}
277+
}
278+
211279
impl ClusterInfo {
212280
pub fn new(
213281
contact_info: ContactInfo,
@@ -252,7 +320,7 @@ impl ClusterInfo {
252320
recycler: &PacketBatchRecycler,
253321
stakes: &HashMap<Pubkey, u64>,
254322
gossip_validators: Option<&HashSet<Pubkey>>,
255-
sender: &PacketBatchSender,
323+
sender: &impl ChannelSend<PacketBatch>,
256324
) {
257325
let shred_version = self.my_contact_info.read().unwrap().shred_version();
258326
let self_keypair: Arc<Keypair> = self.keypair().clone();
@@ -1347,7 +1415,7 @@ impl ClusterInfo {
13471415
gossip_validators: Option<&HashSet<Pubkey>>,
13481416
recycler: &PacketBatchRecycler,
13491417
stakes: &HashMap<Pubkey, u64>,
1350-
sender: &PacketBatchSender,
1418+
sender: &impl ChannelSend<PacketBatch>,
13511419
generate_pull_requests: bool,
13521420
) -> Result<(), GossipError> {
13531421
let _st = ScopedTimer::from(&self.stats.gossip_transmit_loop_time);
@@ -1468,7 +1536,7 @@ impl ClusterInfo {
14681536
pub fn gossip(
14691537
self: Arc<Self>,
14701538
bank_forks: Option<Arc<RwLock<BankForks>>>,
1471-
sender: PacketBatchSender,
1539+
sender: impl ChannelSend<PacketBatch>,
14721540
gossip_validators: Option<HashSet<Pubkey>>,
14731541
exit: Arc<AtomicBool>,
14741542
) -> JoinHandle<()> {
@@ -1601,7 +1669,7 @@ impl ClusterInfo {
16011669
thread_pool: &ThreadPool,
16021670
recycler: &PacketBatchRecycler,
16031671
stakes: &HashMap<Pubkey, u64>,
1604-
response_sender: &PacketBatchSender,
1672+
response_sender: &impl ChannelSend<PacketBatch>,
16051673
) {
16061674
let _st = ScopedTimer::from(&self.stats.handle_batch_pull_requests_time);
16071675
if !requests.is_empty() {
@@ -1836,7 +1904,7 @@ impl ClusterInfo {
18361904
&self,
18371905
pings: impl IntoIterator<Item = (S, Ping), IntoIter: ExactSizeIterator>,
18381906
recycler: &PacketBatchRecycler,
1839-
response_sender: &PacketBatchSender,
1907+
response_sender: &impl ChannelSend<PacketBatch>,
18401908
) {
18411909
let _st = ScopedTimer::from(&self.stats.handle_batch_ping_messages_time);
18421910
let keypair: Arc<Keypair> = self.keypair().clone();
@@ -1867,7 +1935,7 @@ impl ClusterInfo {
18671935
thread_pool: &ThreadPool,
18681936
recycler: &PacketBatchRecycler,
18691937
stakes: &HashMap<Pubkey, u64>,
1870-
response_sender: &PacketBatchSender,
1938+
response_sender: &impl ChannelSend<PacketBatch>,
18711939
) {
18721940
let _st = ScopedTimer::from(&self.stats.handle_batch_push_messages_time);
18731941
if messages.is_empty() {
@@ -1960,10 +2028,10 @@ impl ClusterInfo {
19602028

19612029
fn process_packets(
19622030
&self,
1963-
packets: &mut VecDeque<Vec<(/*from:*/ SocketAddr, Protocol)>>,
2031+
packets: &mut Vec<Vec<(/*from:*/ SocketAddr, Protocol)>>,
19642032
thread_pool: &ThreadPool,
19652033
recycler: &PacketBatchRecycler,
1966-
response_sender: &PacketBatchSender,
2034+
response_sender: &impl ChannelSend<PacketBatch>,
19672035
stakes: &HashMap<Pubkey, u64>,
19682036
epoch_duration: Duration,
19692037
should_check_duplicate_instance: bool,
@@ -2108,45 +2176,24 @@ impl ClusterInfo {
21082176
thread_pool: &ThreadPool,
21092177
epoch_specs: Option<&mut EpochSpecs>,
21102178
receiver: &PacketBatchReceiver,
2111-
sender: &Sender<Vec<(/*from:*/ SocketAddr, Protocol)>>,
2112-
packet_buf: &mut VecDeque<PacketBatch>,
2179+
sender: &impl ChannelSend<Vec<(/*from:*/ SocketAddr, Protocol)>>,
2180+
packet_buf: &mut Vec<PacketBatch>,
21132181
) -> Result<(), GossipError> {
2114-
fn count_dropped_packets(packets: &PacketBatch, dropped_packets_counts: &mut [u64; 7]) {
2115-
for packet in packets {
2116-
let k = packet
2117-
.data(..4)
2118-
.and_then(|data| <[u8; 4]>::try_from(data).ok())
2119-
.map(u32::from_le_bytes)
2120-
.filter(|&k| k < 6)
2121-
.unwrap_or(/*invalid:*/ 6) as usize;
2122-
dropped_packets_counts[k] += 1;
2123-
}
2124-
}
2125-
let mut dropped_packets_counts = [0u64; 7];
21262182
let mut num_packets = 0;
21272183
for packet_batch in receiver
21282184
.recv()
21292185
.map(std::iter::once)?
21302186
.chain(receiver.try_iter())
21312187
{
21322188
num_packets += packet_batch.len();
2133-
packet_buf.push_back(packet_batch);
2134-
while num_packets > MAX_GOSSIP_TRAFFIC {
2135-
// Discard older packets in favor of more recent ones.
2136-
let Some(packet_batch) = packet_buf.pop_front() else {
2137-
break;
2138-
};
2139-
num_packets -= packet_batch.len();
2140-
count_dropped_packets(&packet_batch, &mut dropped_packets_counts);
2189+
packet_buf.push(packet_batch);
2190+
if packet_buf.len() == CHANNEL_CONSUME_CAPACITY {
2191+
break;
21412192
}
21422193
}
2143-
let num_packets_dropped = self.stats.record_dropped_packets(&dropped_packets_counts);
21442194
self.stats
21452195
.packets_received_count
2146-
.add_relaxed(num_packets as u64 + num_packets_dropped);
2147-
self.stats
2148-
.socket_consume_packet_buf_capacity
2149-
.max_relaxed(packet_buf.capacity() as u64);
2196+
.add_relaxed(num_packets as u64);
21502197
fn verify_packet(
21512198
packet: &Packet,
21522199
stakes: &HashMap<Pubkey, u64>,
@@ -2210,33 +2257,22 @@ impl ClusterInfo {
22102257
recycler: &PacketBatchRecycler,
22112258
mut epoch_specs: Option<&mut EpochSpecs>,
22122259
receiver: &Receiver<Vec<(/*from:*/ SocketAddr, Protocol)>>,
2213-
response_sender: &PacketBatchSender,
2260+
response_sender: &impl ChannelSend<PacketBatch>,
22142261
thread_pool: &ThreadPool,
22152262
should_check_duplicate_instance: bool,
2216-
packet_buf: &mut VecDeque<Vec<(/*from:*/ SocketAddr, Protocol)>>,
2263+
packet_buf: &mut Vec<Vec<(/*from:*/ SocketAddr, Protocol)>>,
22172264
) -> Result<(), GossipError> {
22182265
let _st = ScopedTimer::from(&self.stats.gossip_listen_loop_time);
2219-
let mut num_packets = 0;
22202266
for pkts in receiver
22212267
.recv()
22222268
.map(std::iter::once)?
22232269
.chain(receiver.try_iter())
22242270
{
2225-
num_packets += pkts.len();
2226-
packet_buf.push_back(pkts);
2227-
while num_packets > MAX_GOSSIP_TRAFFIC {
2228-
let Some(num) = packet_buf.pop_front().as_ref().map(Vec::len) else {
2229-
break;
2230-
};
2231-
self.stats
2232-
.gossip_packets_dropped_count
2233-
.add_relaxed(num as u64);
2234-
num_packets -= num;
2271+
packet_buf.push(pkts);
2272+
if packet_buf.len() == CHANNEL_CONSUME_CAPACITY {
2273+
break;
22352274
}
22362275
}
2237-
self.stats
2238-
.listen_packet_buf_capacity
2239-
.max_relaxed(packet_buf.capacity() as u64);
22402276
let stakes = epoch_specs
22412277
.as_mut()
22422278
.map(|epoch_specs| epoch_specs.current_epoch_staked_nodes())
@@ -2265,7 +2301,7 @@ impl ClusterInfo {
22652301
self: Arc<Self>,
22662302
bank_forks: Option<Arc<RwLock<BankForks>>>,
22672303
receiver: PacketBatchReceiver,
2268-
sender: Sender<Vec<(/*from:*/ SocketAddr, Protocol)>>,
2304+
sender: impl ChannelSend<Vec<(/*from:*/ SocketAddr, Protocol)>>,
22692305
exit: Arc<AtomicBool>,
22702306
) -> JoinHandle<()> {
22712307
let thread_pool = ThreadPoolBuilder::new()
@@ -2274,7 +2310,7 @@ impl ClusterInfo {
22742310
.build()
22752311
.unwrap();
22762312
let mut epoch_specs = bank_forks.map(EpochSpecs::from);
2277-
let mut packet_buf = VecDeque::with_capacity(CHANNEL_RECV_BUFFER_INITIAL_CAPACITY);
2313+
let mut packet_buf = Vec::with_capacity(CHANNEL_CONSUME_CAPACITY);
22782314
let run_consume = move || {
22792315
while !exit.load(Ordering::Relaxed) {
22802316
match self.run_socket_consume(
@@ -2303,7 +2339,7 @@ impl ClusterInfo {
23032339
self: Arc<Self>,
23042340
bank_forks: Option<Arc<RwLock<BankForks>>>,
23052341
requests_receiver: Receiver<Vec<(/*from:*/ SocketAddr, Protocol)>>,
2306-
response_sender: PacketBatchSender,
2342+
response_sender: impl ChannelSend<PacketBatch>,
23072343
should_check_duplicate_instance: bool,
23082344
exit: Arc<AtomicBool>,
23092345
) -> JoinHandle<()> {
@@ -2314,7 +2350,7 @@ impl ClusterInfo {
23142350
.build()
23152351
.unwrap();
23162352
let mut epoch_specs = bank_forks.map(EpochSpecs::from);
2317-
let mut packet_buf = VecDeque::with_capacity(CHANNEL_RECV_BUFFER_INITIAL_CAPACITY);
2353+
let mut packet_buf = Vec::with_capacity(CHANNEL_CONSUME_CAPACITY);
23182354
Builder::new()
23192355
.name("solGossipListen".to_string())
23202356
.spawn(move || {
@@ -3012,7 +3048,7 @@ fn verify_gossip_addr<R: Rng + CryptoRng>(
30123048
fn send_gossip_packets<S: Borrow<SocketAddr>>(
30133049
pkts: impl IntoIterator<Item = (S, Protocol), IntoIter: ExactSizeIterator>,
30143050
recycler: &PacketBatchRecycler,
3015-
sender: &PacketBatchSender,
3051+
sender: &impl ChannelSend<PacketBatch>,
30163052
stats: &GossipStats,
30173053
) {
30183054
let pkts = pkts.into_iter();

0 commit comments

Comments
 (0)