Downscore and retry custody failures #7510

Open · wants to merge 15 commits into base: peerdas-devnet-7
8 changes: 2 additions & 6 deletions beacon_node/beacon_chain/src/block_verification_types.rs
@@ -199,7 +199,7 @@ impl<E: EthSpec> RpcBlock<E> {
custody_columns: Vec<CustodyDataColumn<E>>,
expected_custody_indices: Vec<ColumnIndex>,
spec: &ChainSpec,
) -> Result<Self, AvailabilityCheckError> {
) -> Result<Self, String> {
let block_root = block_root.unwrap_or_else(|| get_block_root(&block));

let custody_columns_count = expected_custody_indices.len();
@@ -209,11 +209,7 @@ impl<E: EthSpec> RpcBlock<E> {
custody_columns,
spec.number_of_columns as usize,
)
.map_err(|e| {
AvailabilityCheckError::Unexpected(format!(
"custody_columns len exceeds number_of_columns: {e:?}"
))
})?,
.map_err(|e| format!("custody_columns len exceeds number_of_columns: {e:?}"))?,
expected_custody_indices,
};
Ok(Self {
3 changes: 2 additions & 1 deletion beacon_node/beacon_chain/src/test_utils.rs
@@ -2416,7 +2416,8 @@ where
columns,
expected_custody_indices,
&self.spec,
)?
)
.map_err(BlockError::InternalError)?
} else {
RpcBlock::new_without_blobs(Some(block_root), block, sampling_column_count)
}
22 changes: 17 additions & 5 deletions beacon_node/lighthouse_network/src/service/api_types.rs
@@ -59,6 +59,14 @@ pub struct BlobsByRangeRequestId {
pub struct DataColumnsByRangeRequestId {
/// Id to identify this attempt at a data_columns_by_range request for `parent_request_id`
pub id: Id,
/// The Id of the parent custody by range request that issued this data_columns_by_range request
pub parent_request_id: CustodyByRangeRequestId,
}

#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub struct CustodyByRangeRequestId {
/// Id to identify this attempt at a meta custody by range request for `parent_request_id`
pub id: Id,
/// The Id of the overall By Range request for block components.
pub parent_request_id: ComponentsByRangeRequestId,
}
@@ -221,6 +229,7 @@ macro_rules! impl_display {
impl_display!(BlocksByRangeRequestId, "{}/{}", id, parent_request_id);
impl_display!(BlobsByRangeRequestId, "{}/{}", id, parent_request_id);
impl_display!(DataColumnsByRangeRequestId, "{}/{}", id, parent_request_id);
impl_display!(CustodyByRangeRequestId, "{}/{}", id, parent_request_id);
impl_display!(ComponentsByRangeRequestId, "{}/{}", id, requester);
impl_display!(DataColumnsByRootRequestId, "{}/{}", id, requester);
impl_display!(SingleLookupReqId, "{}/Lookup/{}", req_id, lookup_id);
@@ -299,14 +308,17 @@ mod tests {
fn display_id_data_columns_by_range() {
let id = DataColumnsByRangeRequestId {
id: 123,
parent_request_id: ComponentsByRangeRequestId {
parent_request_id: CustodyByRangeRequestId {
id: 122,
requester: RangeRequestId::RangeSync {
chain_id: 54,
batch_id: Epoch::new(0),
parent_request_id: ComponentsByRangeRequestId {
id: 121,
requester: RangeRequestId::RangeSync {
chain_id: 54,
batch_id: Epoch::new(0),
},
},
},
};
assert_eq!(format!("{id}"), "123/122/RangeSync/0/54");
assert_eq!(format!("{id}"), "123/122/121/RangeSync/0/54");
}
}
19 changes: 19 additions & 0 deletions beacon_node/lighthouse_network/src/types/globals.rs
@@ -245,6 +245,25 @@ impl<E: EthSpec> NetworkGlobals<E> {
Self::new_test_globals_with_metadata(trusted_peers, metadata, config, spec)
}

pub fn new_test_globals_as_supernode(
trusted_peers: Vec<PeerId>,
config: Arc<NetworkConfig>,
spec: Arc<ChainSpec>,
is_supernode: bool,
Member

I don't think it matters currently, but it might be worth mentioning: we use config.subscribe_all_data_column_subnets to determine the gossip topics, and it doesn't seem like we use it to determine is_supernode anywhere else. Something to keep in mind: if we want consistency we could derive is_supernode from the network config, though that may no longer be relevant once we move to validator custody.

Collaborator Author

Good point, we should revisit after implementing validator custody (a possible derivation from the network config is sketched after this diff). Since this function is only for unit tests without gossip, it doesn't matter here.

) -> NetworkGlobals<E> {
let metadata = MetaData::V3(MetaDataV3 {
seq_number: 0,
attnets: Default::default(),
syncnets: Default::default(),
custody_group_count: if is_supernode {
spec.number_of_custody_groups
} else {
spec.custody_requirement
},
});
Self::new_test_globals_with_metadata(trusted_peers, metadata, config, spec)
}

pub(crate) fn new_test_globals_with_metadata(
trusted_peers: Vec<PeerId>,
metadata: MetaData<E>,
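Picking up the review thread above: a minimal sketch, not part of this PR, of what deriving the supernode flag from the network config could look like. It assumes the `NetworkConfig::subscribe_all_data_column_subnets` flag mentioned in the comment and the `ChainSpec` fields already used in the diff; the helper name is hypothetical.

```rust
// Hypothetical helper (not in this PR): derive the custody group count for
// test metadata from the network config instead of an explicit `is_supernode`
// argument. Assumes `subscribe_all_data_column_subnets` marks supernodes.
fn test_custody_group_count(config: &NetworkConfig, spec: &ChainSpec) -> u64 {
    if config.subscribe_all_data_column_subnets {
        // A supernode custodies every custody group.
        spec.number_of_custody_groups
    } else {
        // Otherwise use the protocol's minimum custody requirement.
        spec.custody_requirement
    }
}
```

As noted in the thread, this is only a consistency concern; the test helper above never joins gossip, so passing `is_supernode` explicitly is fine until validator custody lands.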
2 changes: 1 addition & 1 deletion beacon_node/network/src/network_beacon_processor/mod.rs
@@ -554,7 +554,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
pub fn send_rpc_validate_data_columns(
self: &Arc<Self>,
block_root: Hash256,
data_columns: Vec<Arc<DataColumnSidecar<T::EthSpec>>>,
data_columns: DataColumnSidecarList<T::EthSpec>,
seen_timestamp: Duration,
id: SamplingId,
) -> Result<(), Error<T::EthSpec>> {
60 changes: 28 additions & 32 deletions beacon_node/network/src/sync/backfill_sync/mod.rs
@@ -23,9 +23,10 @@ use lighthouse_network::service::api_types::Id;
use lighthouse_network::types::{BackFillState, NetworkGlobals};
use lighthouse_network::{PeerAction, PeerId};
use logging::crit;
use parking_lot::RwLock;
use std::collections::{
btree_map::{BTreeMap, Entry},
HashSet,
HashMap, HashSet,
};
use std::sync::Arc;
use tracing::{debug, error, info, instrument, warn};
@@ -135,6 +136,8 @@ pub struct BackFillSync<T: BeaconChainTypes> {
/// This signifies that we are able to attempt to restart a failed chain.
restart_failed_sync: bool,

peers: Arc<RwLock<HashSet<PeerId>>>,

/// Reference to the beacon chain to obtain initial starting points for the backfill sync.
beacon_chain: Arc<BeaconChain<T>>,

@@ -179,6 +182,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
current_processing_batch: None,
validated_batches: 0,
restart_failed_sync: false,
peers: <_>::default(),
beacon_chain,
};

@@ -218,14 +222,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
match self.state() {
BackFillState::Syncing => {} // already syncing ignore.
BackFillState::Paused => {
if self
.network_globals
.peers
.read()
.synced_peers()
.next()
.is_some()
{
if !self.peers.read().is_empty() {
// If there are peers to resume with, begin the resume.
debug!(start_epoch = ?self.current_start, awaiting_batches = self.batches.len(), processing_target = ?self.processing_target, "Resuming backfill sync");
self.set_state(BackFillState::Syncing);
@@ -298,6 +295,14 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
}
}

pub fn add_peer(&mut self, peer_id: PeerId) {
self.peers.write().insert(peer_id);
}

pub fn peer_disconnected(&mut self, peer_id: &PeerId) {
self.peers.write().remove(peer_id);
}

/// An RPC error has occurred.
///
/// If the batch exists it is re-requested.
@@ -312,7 +317,6 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
&mut self,
network: &mut SyncNetworkContext<T>,
batch_id: BatchId,
peer_id: &PeerId,
request_id: Id,
err: RpcResponseError,
) -> Result<(), BackFillError> {
@@ -326,11 +330,18 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
return Ok(());
}
debug!(batch_epoch = %batch_id, error = ?err, "Batch download failed");
match batch.download_failed(Some(*peer_id)) {
// TODO(das): Is it necessary for the batch to track failed peers? Can we make this
// mechanism compatible with PeerDAS and before PeerDAS?
match batch.download_failed(None) {
Err(e) => self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0)),
Ok(BatchOperationOutcome::Failed { blacklist: _ }) => {
self.fail_sync(BackFillError::BatchDownloadFailed(batch_id))
}
Ok(BatchOperationOutcome::Failed { blacklist: _ }) => self.fail_sync(match err {
RpcResponseError::RpcError(_)
| RpcResponseError::VerifyError(_)
| RpcResponseError::InternalError(_) => {
BackFillError::BatchDownloadFailed(batch_id)
}
RpcResponseError::RequestExpired(_) => BackFillError::Paused,
}),
Ok(BatchOperationOutcome::Continue) => self.send_batch(network, batch_id),
}
} else {
@@ -914,21 +925,15 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
batch_id: BatchId,
) -> Result<(), BackFillError> {
if let Some(batch) = self.batches.get_mut(&batch_id) {
let synced_peers = self
.network_globals
.peers
.read()
.synced_peers()
.cloned()
.collect::<HashSet<_>>();

let request = batch.to_blocks_by_range_request();
let failed_peers = batch.failed_block_peers();
match network.block_components_by_range_request(
request,
RangeRequestId::BackfillSync { batch_id },
&synced_peers,
self.peers.clone(),
&failed_peers,
// Does not track total requests per peers for now
&HashMap::new(),
) {
Ok(request_id) => {
// inform the batch about the new request
@@ -940,15 +945,6 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
return Ok(());
}
Err(e) => match e {
RpcRequestSendError::NoPeer(no_peer) => {
// If we are here the chain has no more synced peers
info!(
"reason" = format!("insufficient_synced_peers({no_peer:?})"),
"Backfill sync paused"
);
self.set_state(BackFillState::Paused);
return Err(BackFillError::Paused);
}
RpcRequestSendError::InternalError(e) => {
// NOTE: under normal conditions this shouldn't happen but we handle it anyway
warn!(%batch_id, error = ?e, %batch,"Could not send batch request");
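Since `BackFillSync` now owns its peer set rather than reading `network_globals.peers.synced_peers()`, the caller has to feed it peer events through the new `add_peer` / `peer_disconnected` hooks. A rough caller-side sketch under assumed wiring (the surrounding sync-manager plumbing is not shown in this diff):

```rust
// Hypothetical wiring, not part of this PR: forward peer status changes into
// the backfill sync so its shared `peers` set stays current.
fn on_peer_status_change<T: BeaconChainTypes>(
    backfill: &mut BackFillSync<T>,
    peer_id: PeerId,
    connected_and_synced: bool,
) {
    if connected_and_synced {
        // Make the peer available to `send_batch`; a paused backfill can
        // resume once this set is non-empty.
        backfill.add_peer(peer_id);
    } else {
        // Remove the peer so it is not chosen for future batch requests.
        backfill.peer_disconnected(&peer_id);
    }
}
```

With the set shared via `Arc<RwLock<_>>` and handed to `block_components_by_range_request`, peer selection presumably happens inside the network context, which fits the change above where a batch download failure no longer records a specific peer (`download_failed(None)`).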
8 changes: 4 additions & 4 deletions beacon_node/network/src/sync/block_lookups/mod.rs
@@ -494,7 +494,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
let Some(lookup) = self.single_block_lookups.get_mut(&id.lookup_id) else {
// We don't have the ability to cancel in-flight RPC requests. So this can happen
// if we started this RPC request, and later saw the block/blobs via gossip.
debug!(?id, "Block returned for single block lookup not present");
debug!(%id, "Block returned for single block lookup not present");
return Err(LookupRequestError::UnknownLookup);
};

@@ -507,7 +507,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
Ok((response, peer_group, seen_timestamp)) => {
debug!(
?block_root,
?id,
%id,
?peer_group,
?response_type,
"Received lookup download success"
@@ -540,7 +540,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
// the peer and the request ID which is linked to this `id` value here.
debug!(
?block_root,
?id,
%id,
?response_type,
error = ?e,
"Received lookup download failure"
@@ -724,7 +724,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
// Collect all peers that sent a column that was invalid. Must
// run .unique as a single peer can send multiple invalid
// columns. Penalize once to avoid insta-bans
.flat_map(|(index, _)| peer_group.of_index((*index) as usize))
.flat_map(|(index, _)| peer_group.of_index(&(*index as usize)))
.unique()
.collect(),
_ => peer_group.all().collect(),