Skip to content

Commit ee2332b

Browse files
authored
Backport #954 to v2.3: Fix shred retransmit (#955)
1 parent a065951 commit ee2332b

File tree

1 file changed

+27
-21
lines changed

1 file changed

+27
-21
lines changed

turbine/src/retransmit_stage.rs

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -411,15 +411,8 @@ fn retransmit_shred(
411411
return None;
412412
}
413413
let mut compute_turbine_peers = Measure::start("turbine_start");
414-
let (root_distance, addrs) = get_retransmit_addrs(
415-
&key,
416-
root_bank,
417-
cache,
418-
addr_cache,
419-
socket_addr_space,
420-
stats,
421-
shred_receiver_addr,
422-
)?;
414+
let (root_distance, addrs) =
415+
get_retransmit_addrs(&key, root_bank, cache, addr_cache, socket_addr_space, stats)?;
423416
compute_turbine_peers.stop();
424417
stats
425418
.compute_turbine_peers_total
@@ -441,7 +434,13 @@ fn retransmit_shred(
441434
RetransmitSocket::Xdp(sender) => {
442435
let mut sent = num_addrs;
443436
if num_addrs > 0 {
444-
if let Err(e) = sender.try_send(key.index(), addrs.to_vec(), shred) {
437+
// shred receiver not included in the stats
438+
let mut send_addrs = Vec::with_capacity(num_addrs + 1);
439+
send_addrs.extend(addrs.iter());
440+
if let Some(addr) = shred_receiver_addr {
441+
send_addrs.push(*addr);
442+
}
443+
if let Err(e) = sender.try_send(key.index(), send_addrs, shred) {
445444
log::warn!("xdp channel full: {e:?}");
446445
stats
447446
.num_shreds_dropped_xdp_full
@@ -451,13 +450,24 @@ fn retransmit_shred(
451450
}
452451
sent
453452
}
454-
RetransmitSocket::Socket(socket) => match multi_target_send(socket, shred, &addrs) {
455-
Ok(()) => num_addrs,
456-
Err(SendPktsError::IoError(ioerr, num_failed)) => {
457-
error!("retransmit_to multi_target_send error: {ioerr:?}, {num_failed}/{} packets failed", num_addrs);
458-
num_addrs - num_failed
453+
RetransmitSocket::Socket(socket) => {
454+
let mut send_addrs = Vec::with_capacity(num_addrs + 1);
455+
send_addrs.extend(addrs.iter());
456+
// shred receiver not included in the stats
457+
if let Some(addr) = shred_receiver_addr {
458+
send_addrs.push(*addr);
459+
}
460+
match multi_target_send(socket, shred, &send_addrs) {
461+
Ok(()) => num_addrs,
462+
Err(SendPktsError::IoError(ioerr, num_failed)) => {
463+
error!(
464+
"retransmit_to multi_target_send error: {ioerr:?}, \
465+
{num_failed}/{num_addrs} packets failed"
466+
);
467+
num_addrs.saturating_sub(num_failed)
468+
}
459469
}
460-
},
470+
}
461471
},
462472
};
463473
retransmit_time.stop();
@@ -487,25 +497,21 @@ fn get_retransmit_addrs<'a>(
487497
addr_cache: &'a AddrCache,
488498
socket_addr_space: &SocketAddrSpace,
489499
stats: &RetransmitStats,
490-
shred_receiver_addr: &Option<SocketAddr>,
491500
) -> Option<(/*root_distance:*/ u8, Cow<'a, [SocketAddr]>)> {
492501
if let Some((root_distance, addrs)) = addr_cache.get(shred) {
493502
stats.addr_cache_hit.fetch_add(1, Ordering::Relaxed);
494503
return Some((root_distance, Cow::Borrowed(addrs)));
495504
}
496505
let (slot_leader, cluster_nodes) = cache.get(&shred.slot())?;
497506
let data_plane_fanout = cluster_nodes::get_data_plane_fanout(shred.slot(), root_bank);
498-
let (root_distance, mut addrs) = cluster_nodes
507+
let (root_distance, addrs) = cluster_nodes
499508
.get_retransmit_addrs(slot_leader, shred, data_plane_fanout, socket_addr_space)
500509
.inspect_err(|err| match err {
501510
Error::Loopback { .. } => {
502511
stats.num_loopback_errs.fetch_add(1, Ordering::Relaxed);
503512
}
504513
})
505514
.ok()?;
506-
if let Some(shred_receiver_addr) = shred_receiver_addr {
507-
addrs.push(*shred_receiver_addr);
508-
}
509515
stats.addr_cache_miss.fetch_add(1, Ordering::Relaxed);
510516
Some((root_distance, Cow::Owned(addrs)))
511517
}

0 commit comments

Comments
 (0)