
Commit 092aaae

Sync cleanups (#8230)
N/A

1. In the batch retry logic, we were failing to set the batch state to `AwaitingDownload` before attempting a retry. This PR sets it to `AwaitingDownload` before the retry and sets it back to `Downloading` if the retry succeeded in sending out a request.
2. Remove all peer scoring logic from retrying and rely on just de-prioritizing the failed peer. I finally concede the point to @dapplion 😄
3. Changes `block_components_by_range_request` to accept `block_peers` and `column_peers`. This ensures that we use the full synced peer set for requesting columns, in order to avoid splitting the column peers among multiple head chains. During forward sync, we want the block peers to be the peers from the syncing chain and the column peers to be all synced peers from the peer db.

Also fixes a typo and calls `attempt_send_awaiting_download_batches` from more places.

Co-Authored-By: Pawan Dhananjay <pawandhananjay@gmail.com>
1 parent c012f46 · commit 092aaae
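The batch-state fix in point 1 can be illustrated with a minimal, self-contained Rust sketch. The types below (`Batch`, `BatchState`, the `send_request` closure) are hypothetical stand-ins for illustration, not the actual Lighthouse `BatchInfo` API; the real logic lives in `beacon_node/network/src/sync/range_sync/{batch,chain}.rs`.

// Self-contained model of the fix in point 1 (hypothetical types).

#[derive(Debug, PartialEq)]
enum BatchState {
    AwaitingDownload,
    Downloading(u64), // holds the request id
    Failed,
}

struct Batch {
    state: BatchState,
    retries: u8,
    max_retries: u8,
}

impl Batch {
    /// Move `Downloading` back to `AwaitingDownload` without registering a
    /// failed attempt. Only safe because `retry` below enforces a retry cap.
    fn downloading_to_awaiting_download(&mut self) -> Result<(), &'static str> {
        match self.state {
            BatchState::Downloading(_) => {
                self.state = BatchState::AwaitingDownload;
                Ok(())
            }
            _ => Err("batch in wrong state"),
        }
    }

    /// Retry a failed download. `send_request` returns a request id only if a
    /// request was actually sent out.
    fn retry(&mut self, send_request: impl Fn() -> Option<u64>) -> Result<(), &'static str> {
        if self.retries >= self.max_retries {
            self.state = BatchState::Failed;
            return Err("retries exceeded");
        }
        self.retries += 1;
        // The fix: reset to `AwaitingDownload` *before* attempting the retry,
        // so a failed send never leaves the batch stuck in `Downloading`.
        self.downloading_to_awaiting_download()?;
        if let Some(id) = send_request() {
            // The retry succeeded in sending a request: downloading again.
            self.state = BatchState::Downloading(id);
        }
        Ok(())
    }
}

fn main() {
    let mut batch = Batch { state: BatchState::Downloading(1), retries: 0, max_retries: 3 };
    batch.retry(|| Some(2)).expect("retry should be attempted");
    assert_eq!(batch.state, BatchState::Downloading(2));
}

The key ordering is that the state is reset before the send is attempted, so a send failure leaves the batch retryable in `AwaitingDownload` rather than stuck in `Downloading`.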

File tree

6 files changed: +77, -44 lines

beacon_node/lighthouse_network/src/peer_manager/peerdb.rs

Lines changed: 4 additions & 11 deletions
@@ -247,23 +247,16 @@ impl<E: EthSpec> PeerDB<E> {
             .map(|(peer_id, _)| peer_id)
     }
 
-    /// Returns all the synced peers from the list of allowed peers that claim to have the block
+    /// Returns all the synced peers from the peer db that claim to have the block
     /// components for the given epoch based on `status.earliest_available_slot`.
     ///
     /// If `earliest_available_slot` info is not available, then return peer anyway assuming it has the
     /// required data.
-    ///
-    /// If `allowed_peers` is `Some`, then filters for the epoch only for those peers.
-    pub fn synced_peers_for_epoch<'a>(
-        &'a self,
-        epoch: Epoch,
-        allowed_peers: Option<&'a HashSet<PeerId>>,
-    ) -> impl Iterator<Item = &'a PeerId> {
+    pub fn synced_peers_for_epoch(&self, epoch: Epoch) -> impl Iterator<Item = &PeerId> {
         self.peers
             .iter()
-            .filter(move |(peer_id, info)| {
-                allowed_peers.is_none_or(|allowed| allowed.contains(peer_id))
-                    && info.is_connected()
+            .filter(move |(_, info)| {
+                info.is_connected()
                     && match info.sync_status() {
                         SyncStatus::Synced { info } => {
                             info.has_slot(epoch.end_slot(E::slots_per_epoch()))

beacon_node/network/src/sync/backfill_sync/mod.rs

Lines changed: 5 additions & 8 deletions
@@ -210,7 +210,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
            .network_globals
            .peers
            .read()
-            .synced_peers_for_epoch(self.to_be_downloaded, None)
+            .synced_peers_for_epoch(self.to_be_downloaded)
            .next()
            .is_some()
        // backfill can't progress if we do not have peers in the required subnets post peerdas.
@@ -313,7 +313,6 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
            CouplingError::DataColumnPeerFailure {
                error,
                faulty_peers,
-                action,
                exceeded_retries,
            } => {
                debug!(?batch_id, error, "Block components coupling error");
@@ -325,11 +324,8 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
                    failed_columns.insert(*column);
                    failed_peers.insert(*peer);
                }
-                for peer in failed_peers.iter() {
-                    network.report_peer(*peer, *action, "failed to return columns");
-                }
 
-                // Only retry if peer failure **and** retries have been exceeded
+                // Only retry if peer failure **and** retries haven't been exceeded
                if !*exceeded_retries {
                    return self.retry_partial_batch(
                        network,
@@ -888,7 +884,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
            .network_globals
            .peers
            .read()
-            .synced_peers_for_epoch(batch_id, None)
+            .synced_peers_for_epoch(batch_id)
            .cloned()
            .collect::<HashSet<_>>();
 
@@ -899,6 +895,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
            request,
            RangeRequestId::BackfillSync { batch_id },
            &synced_peers,
+            &synced_peers, // All synced peers have imported up to the finalized slot so they must have their custody columns available
            &failed_peers,
        ) {
            Ok(request_id) => {
@@ -964,7 +961,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
            .network_globals()
            .peers
            .read()
-            .synced_peers_for_epoch(batch_id, None)
+            .synced_peers_for_epoch(batch_id)
            .cloned()
            .collect::<HashSet<_>>();
 
beacon_node/network/src/sync/block_sidecar_coupling.rs

Lines changed: 2 additions & 10 deletions
@@ -2,7 +2,7 @@ use beacon_chain::{
    block_verification_types::RpcBlock, data_column_verification::CustodyDataColumn, get_block_root,
 };
 use lighthouse_network::{
-    PeerAction, PeerId,
+    PeerId,
    service::api_types::{
        BlobsByRangeRequestId, BlocksByRangeRequestId, DataColumnsByRangeRequestId,
    },
@@ -63,7 +63,6 @@ pub(crate) enum CouplingError {
    DataColumnPeerFailure {
        error: String,
        faulty_peers: Vec<(ColumnIndex, PeerId)>,
-        action: PeerAction,
        exceeded_retries: bool,
    },
    BlobPeerFailure(String),
@@ -253,7 +252,6 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
        if let Err(CouplingError::DataColumnPeerFailure {
            error: _,
            faulty_peers,
-            action: _,
            exceeded_retries: _,
        }) = &resp
        {
@@ -377,7 +375,6 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
            return Err(CouplingError::DataColumnPeerFailure {
                error: format!("No columns for block {block_root:?} with data"),
                faulty_peers: responsible_peers,
-                action: PeerAction::LowToleranceError,
                exceeded_retries,
 
            });
@@ -402,7 +399,6 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
            return Err(CouplingError::DataColumnPeerFailure {
                error: format!("Peers did not return column for block_root {block_root:?} {naughty_peers:?}"),
                faulty_peers: naughty_peers,
-                action: PeerAction::LowToleranceError,
                exceeded_retries
            });
        }
@@ -468,7 +464,7 @@ mod tests {
        NumBlobs, generate_rand_block_and_blobs, generate_rand_block_and_data_columns, test_spec,
    };
    use lighthouse_network::{
-        PeerAction, PeerId,
+        PeerId,
        service::api_types::{
            BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId,
            DataColumnsByRangeRequestId, Id, RangeRequestId,
@@ -785,15 +781,13 @@ mod tests {
        if let Err(super::CouplingError::DataColumnPeerFailure {
            error,
            faulty_peers,
-            action,
            exceeded_retries,
        }) = result
        {
            assert!(error.contains("Peers did not return column"));
            assert_eq!(faulty_peers.len(), 2); // columns 3 and 4 missing
            assert_eq!(faulty_peers[0].0, 3); // column index 3
            assert_eq!(faulty_peers[1].0, 4); // column index 4
-            assert!(matches!(action, PeerAction::LowToleranceError));
            assert!(!exceeded_retries); // First attempt, should be false
        } else {
            panic!("Expected PeerFailure error");
@@ -957,13 +951,11 @@ mod tests {
        if let Err(super::CouplingError::DataColumnPeerFailure {
            error: _,
            faulty_peers,
-            action,
            exceeded_retries,
        }) = result
        {
            assert_eq!(faulty_peers.len(), 1); // column 2 missing
            assert_eq!(faulty_peers[0].0, 2); // column index 2
-            assert!(matches!(action, PeerAction::LowToleranceError));
            assert!(exceeded_retries); // Should be true after max retries
        } else {
            panic!("Expected PeerFailure error with exceeded_retries=true");

beacon_node/network/src/sync/network_context.rs

Lines changed: 6 additions & 5 deletions
@@ -533,19 +533,21 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
        batch_type: ByRangeRequestType,
        request: BlocksByRangeRequest,
        requester: RangeRequestId,
-        peers: &HashSet<PeerId>,
+        block_peers: &HashSet<PeerId>,
+        column_peers: &HashSet<PeerId>,
        peers_to_deprioritize: &HashSet<PeerId>,
    ) -> Result<Id, RpcRequestSendError> {
        let range_request_span = debug_span!(
            parent: None,
            SPAN_OUTGOING_RANGE_REQUEST,
            range_req_id = %requester,
-            peers = peers.len()
+            block_peers = block_peers.len(),
+            column_peers = column_peers.len()
        );
        let _guard = range_request_span.clone().entered();
        let active_request_count_by_peer = self.active_request_count_by_peer();
 
-        let Some(block_peer) = peers
+        let Some(block_peer) = block_peers
            .iter()
            .map(|peer| {
                (
@@ -579,7 +581,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
                .collect();
            Some(self.select_columns_by_range_peers_to_request(
                &column_indexes,
-                peers,
+                column_peers,
                active_request_count_by_peer,
                peers_to_deprioritize,
            )?)
@@ -770,7 +772,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
        let range_req = entry.get_mut();
        if let Some(blocks_result) = range_req.responses(&self.chain.spec) {
            if let Err(CouplingError::DataColumnPeerFailure {
-                action: _,
                error,
                faulty_peers: _,
                exceeded_retries,

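To make the new `block_peers`/`column_peers` split concrete, here is a minimal, self-contained sketch of the peer-set selection during forward sync. `range_request_peer_sets` and the `u64` `PeerId` stand-in are hypothetical simplifications for illustration, not the actual Lighthouse call sites:

use std::collections::HashSet;

// Stand-in for lighthouse_network::PeerId; hypothetical simplification.
type PeerId = u64;

/// Hypothetical helper: pick the peer sets for a range request. Blocks come
/// from the syncing chain's own peers; columns come from the full synced set,
/// so column peers are not partitioned among multiple head chains.
fn range_request_peer_sets(
    chain_peers: &HashSet<PeerId>,
    all_synced_peers: &HashSet<PeerId>,
) -> (HashSet<PeerId>, HashSet<PeerId>) {
    let block_peers = chain_peers.clone();
    let column_peers = all_synced_peers.clone();
    (block_peers, column_peers)
}

fn main() {
    let chain_peers: HashSet<PeerId> = [1, 2].into_iter().collect();
    let all_synced: HashSet<PeerId> = [1, 2, 3, 4].into_iter().collect();
    let (block_peers, column_peers) = range_request_peer_sets(&chain_peers, &all_synced);
    assert_eq!(block_peers.len(), 2); // blocks: only this chain's peers
    assert_eq!(column_peers.len(), 4); // columns: every synced peer
}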
beacon_node/network/src/sync/range_sync/batch.rs

Lines changed: 25 additions & 0 deletions
@@ -334,6 +334,31 @@ impl<E: EthSpec, B: BatchConfig> BatchInfo<E, B> {
        }
    }
 
+    /// Change the batch state from `Self::Downloading` to `Self::AwaitingDownload` without
+    /// registering a failed attempt.
+    ///
+    /// Note: must use this cautiously with some level of retry protection
+    /// as not registering a failed attempt could lead to requesting in a loop.
+    #[must_use = "Batch may have failed"]
+    pub fn downloading_to_awaiting_download(
+        &mut self,
+    ) -> Result<BatchOperationOutcome, WrongState> {
+        match self.state.poison() {
+            BatchState::Downloading(_) => {
+                self.state = BatchState::AwaitingDownload;
+                Ok(self.outcome())
+            }
+            BatchState::Poisoned => unreachable!("Poisoned batch"),
+            other => {
+                self.state = other;
+                Err(WrongState(format!(
+                    "Download failed for batch in wrong state {:?}",
+                    self.state
+                )))
+            }
+        }
+    }
+
    pub fn start_downloading(&mut self, request_id: Id) -> Result<(), WrongState> {
        match self.state.poison() {
            BatchState::AwaitingDownload => {

beacon_node/network/src/sync/range_sync/chain.rs

Lines changed: 35 additions & 10 deletions
@@ -871,7 +871,6 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
            CouplingError::DataColumnPeerFailure {
                error,
                faulty_peers,
-                action,
                exceeded_retries,
            } => {
                debug!(?batch_id, error, "Block components coupling error");
@@ -883,12 +882,22 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
                    failed_columns.insert(*column);
                    failed_peers.insert(*peer);
                }
-                for peer in failed_peers.iter() {
-                    network.report_peer(*peer, *action, "failed to return columns");
-                }
                // Retry the failed columns if the column requests haven't exceeded the
                // max retries. Otherwise, remove treat it as a failed batch below.
                if !*exceeded_retries {
+                    // Set the batch back to `AwaitingDownload` before retrying.
+                    // This is to ensure that the batch doesn't get stuck in `Downloading` state.
+                    //
+                    // DataColumn retries has a retry limit so calling `downloading_to_awaiting_download`
+                    // is safe.
+                    if let BatchOperationOutcome::Failed { blacklist } =
+                        batch.downloading_to_awaiting_download()?
+                    {
+                        return Err(RemoveChain::ChainFailed {
+                            blacklist,
+                            failing_batch: batch_id,
+                        });
+                    }
                    return self.retry_partial_batch(
                        network,
                        batch_id,
@@ -936,7 +945,10 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
                failing_batch: batch_id,
            });
        }
-        self.send_batch(network, batch_id)
+        // The errored batch is set to AwaitingDownload above.
+        // We now just attempt to download all batches stuck in `AwaitingDownload`
+        // state in the right order.
+        self.attempt_send_awaiting_download_batches(network, "injecting error")
    } else {
        debug!(
            batch_epoch = %batch_id,
@@ -969,7 +981,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
            .collect();
        debug!(
            ?awaiting_downloads,
-            src, "Attempting to send batches awaiting downlaod"
+            src, "Attempting to send batches awaiting download"
        );
 
        for batch_id in awaiting_downloads {
@@ -998,11 +1010,11 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
        let (request, batch_type) = batch.to_blocks_by_range_request();
        let failed_peers = batch.failed_peers();
 
-        let synced_peers = network
+        let synced_column_peers = network
            .network_globals()
            .peers
            .read()
-            .synced_peers_for_epoch(batch_id, Some(&self.peers))
+            .synced_peers_for_epoch(batch_id)
            .cloned()
            .collect::<HashSet<_>>();
 
@@ -1013,7 +1025,13 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
                chain_id: self.id,
                batch_id,
            },
-            &synced_peers,
+            // Request blocks only from peers of this specific chain
+            &self.peers,
+            // Request column from all synced peers, even if they are not part of this chain.
+            // This is to avoid splitting of good column peers across many head chains in a heavy forking
+            // environment. If the column peers and block peer are on different chains, then we return
+            // a coupling error and retry only the columns that failed to couple. See `Self::retry_partial_batch`.
+            &synced_column_peers,
            &failed_peers,
        ) {
            Ok(request_id) => {
@@ -1081,7 +1099,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
            .network_globals()
            .peers
            .read()
-            .synced_peers_for_epoch(batch_id, Some(&self.peers))
+            .synced_peers_for_epoch(batch_id)
            .cloned()
            .collect::<HashSet<_>>();
 
@@ -1093,13 +1111,17 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
            &failed_columns,
        ) {
            Ok(_) => {
+                // inform the batch about the new request
+                batch.start_downloading(id)?;
                debug!(
                    ?batch_id,
                    id, "Retried column requests from different peers"
                );
                return Ok(KeepChain);
            }
            Err(e) => {
+                // No need to explicitly fail the batch since its in `AwaitingDownload` state
+                // before we attempted to retry.
                debug!(?batch_id, id, e, "Failed to retry partial batch");
            }
        }
@@ -1123,6 +1145,9 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
    ) -> Result<KeepChain, RemoveChain> {
        let _guard = self.span.clone().entered();
        debug!("Resuming chain");
+        // attempt to download any batches stuck in the `AwaitingDownload` state because of
+        // a lack of peers before.
+        self.attempt_send_awaiting_download_batches(network, "resume")?;
        // Request more batches if needed.
        self.request_batches(network)?;
        // If there is any batch ready for processing, send it.

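Point 2 of the commit message, replacing peer scoring with de-prioritization, can be sketched as follows. `pick_retry_peer` is a hypothetical helper over simplified types, not the actual `select_columns_by_range_peers_to_request` implementation:

use std::collections::HashSet;

// Stand-in for the real PeerId type; hypothetical simplification.
type PeerId = u64;

/// Hypothetical sketch of de-prioritization: rather than reporting or
/// down-scoring peers that failed to return columns, prefer any candidate
/// outside the failed set, falling back to a failed peer only when no other
/// peer is available.
fn pick_retry_peer<'a>(
    candidates: &'a HashSet<PeerId>,
    failed_peers: &HashSet<PeerId>,
) -> Option<&'a PeerId> {
    candidates
        .iter()
        .find(|p| !failed_peers.contains(*p))
        .or_else(|| candidates.iter().next())
}

fn main() {
    let candidates: HashSet<PeerId> = [10, 20, 30].into_iter().collect();
    let failed: HashSet<PeerId> = [10, 20].into_iter().collect();
    // 30 is the only candidate that has not failed, so it is preferred.
    assert_eq!(pick_retry_peer(&candidates, &failed), Some(&30));
}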