Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 45 additions & 2 deletions protocol/flows/src/flow_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use kaspa_p2p_mining::rule_engine::MiningRuleEngine;
use kaspa_utils::iter::IterExtensions;
use kaspa_utils::networking::PeerId;
use parking_lot::{Mutex, RwLock};
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::time::Instant;
use std::{collections::hash_map::Entry, fmt::Display};
use std::{
Expand Down Expand Up @@ -212,6 +212,7 @@ pub struct FlowContextInner {
hub: Hub,
orphans_pool: AsyncRwLock<OrphanBlocksPool>,
shared_block_requests: Arc<Mutex<HashMap<Hash, RequestScopeMetadata>>>,
block_inv_processing_metadata: Arc<Mutex<BlockInvProcessingMetadata>>,
transactions_spread: AsyncRwLock<TransactionsSpread>,
shared_transaction_requests: Arc<Mutex<HashMap<TransactionId, RequestScopeMetadata>>>,
is_ibd_running: Arc<AtomicBool>,
Expand Down Expand Up @@ -251,6 +252,40 @@ impl Drop for IbdRunningGuard {
}
}

#[derive(Debug, Clone, Default)]
struct BlockInvProcessingMetadata {
processing: HashMap<Hash, HashSet<PeerKey>>,
}

impl BlockInvProcessingMetadata {
// Bound the amount of hashes kept.
const MAX_SIZE: usize = 10_000;

fn insert_hash(&mut self, peer: PeerKey, hash: Hash) -> bool {
if self.processing.len() >= Self::MAX_SIZE {
// Remove a random entry to make space
if let Some(first_key) = self.processing.keys().next().cloned() {
self.processing.remove(&first_key);
}
};
match self.processing.entry(hash) {
Entry::Occupied(mut e) => {
let peers = e.get_mut();
peers.insert(peer);
false
}
Entry::Vacant(e) => {
e.insert(HashSet::from_iter(once(peer)));
true
}
}
}

fn remove_hash(&mut self, hash: &Hash) -> Option<HashSet<PeerKey>> {
self.processing.remove(hash)
}
}

#[derive(Debug, Clone, Copy)]
struct IbdMetadata {
/// The peer from which current IBD is syncing from
Expand Down Expand Up @@ -320,6 +355,7 @@ impl FlowContext {
consensus_manager,
orphans_pool: AsyncRwLock::new(OrphanBlocksPool::new(max_orphans)),
shared_block_requests: Arc::new(Mutex::new(HashMap::new())),
block_inv_processing_metadata: Arc::new(Mutex::new(BlockInvProcessingMetadata::default())),
transactions_spread: AsyncRwLock::new(TransactionsSpread::new(hub.clone())),
shared_transaction_requests: Arc::new(Mutex::new(HashMap::new())),
is_ibd_running: Default::default(),
Expand Down Expand Up @@ -491,7 +527,6 @@ impl FlowContext {
}
// Broadcast as soon as the block has been validated and inserted into the DAG
self.hub.broadcast(make_message!(Payload::InvRelayBlock, InvRelayBlockMessage { hash: Some(hash.into()) }), None).await;

self.on_new_block(consensus, Default::default(), block, virtual_state_task).await;
self.log_block_event(BlockLogEvent::Submit(hash));

Expand Down Expand Up @@ -693,6 +728,14 @@ impl FlowContext {
pub async fn broadcast_transactions<I: IntoIterator<Item = TransactionId>>(&self, transaction_ids: I, should_throttle: bool) {
self.transactions_spread.write().await.broadcast_transactions(transaction_ids, should_throttle).await
}

/// Registers `hash` as being processed by the relay loop on behalf of `peer`.
///
/// Returns `true` iff this is the first registration of the hash (the caller should
/// proceed to process it); `false` means another flow is already handling it and the
/// peer was merely recorded as an additional announcer.
///
/// NOTE(review): the body takes a synchronous `parking_lot` lock and contains no
/// `.await`; `async` appears to be kept for call-site uniformity — confirm.
pub async fn register_hash_for_processing_loop(&self, peer: PeerKey, hash: Hash) -> bool {
self.block_inv_processing_metadata.lock().insert_hash(peer, hash)
}

/// Unregisters `hash` from the relay processing loop, returning the set of peers
/// that announced it (used to filter those peers out of the subsequent broadcast),
/// or `None` if the hash was not registered.
///
/// NOTE(review): like its `register_` counterpart, this takes a synchronous
/// `parking_lot` lock and never awaits — `async` seems to be for interface
/// consistency only; confirm.
pub async fn unregister_hash_from_processing_loop(&self, hash: &Hash) -> Option<HashSet<PeerKey>> {
self.block_inv_processing_metadata.lock().remove_hash(hash)
}
}

#[async_trait]
Expand Down
50 changes: 39 additions & 11 deletions protocol/flows/src/v7/blockrelay/flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
};
use kaspa_consensus_core::{api::BlockValidationFutures, block::Block, blockstatus::BlockStatus, errors::block::RuleError};
use kaspa_consensusmanager::{BlockProcessingBatch, ConsensusProxy};
use kaspa_core::debug;
use kaspa_core::{debug, warn};
use kaspa_hashes::Hash;
use kaspa_p2p_lib::{
IncomingRoute, Router, SharedIncomingRoute,
Expand Down Expand Up @@ -98,25 +98,37 @@ impl HandleRelayInvsFlow {
let session = self.ctx.consensus().unguarded_session();
let is_ibd_in_transitional_state = session.async_is_consensus_in_transitional_ibd_state().await;

let is_new_hash = self.ctx.register_hash_for_processing_loop(self.router.key(), inv.hash).await;
if !is_new_hash {
debug!("Received inv of block {} which is already being processed, continuing...", inv.hash);
continue;
};

match session.async_get_block_status(inv.hash).await {
None | Some(BlockStatus::StatusHeaderOnly) => {} // Continue processing this missing inv
Some(BlockStatus::StatusInvalid) => {
// Report a protocol error
let _ = self.ctx.unregister_hash_from_processing_loop(&inv.hash).await;
return Err(ProtocolError::OtherOwned(format!("sent inv of an invalid block {}", inv.hash)));
}
_ => {
// Block is already known, skip to next inv
let _ = self.ctx.unregister_hash_from_processing_loop(&inv.hash).await;
debug!("Relay block {} already exists, continuing...", inv.hash);
continue;
}
}

match self.ctx.get_orphan_roots_if_known(&session, inv.hash).await {
OrphanOutput::Unknown => {} // Keep processing this inv
OrphanOutput::NoRoots(_) => continue, // Existing orphan w/o missing roots
OrphanOutput::Unknown => {} // Keep processing this inv
OrphanOutput::NoRoots(_) => {
let _ = self.ctx.unregister_hash_from_processing_loop(&inv.hash).await;
continue; // Existing orphan w/o missing roots
}
OrphanOutput::Roots(roots) => {
// Known orphan with roots to enqueue
self.enqueue_orphan_roots(inv.hash, roots, inv.known_within_range);
let _ = self.ctx.unregister_hash_from_processing_loop(&inv.hash).await;
continue;
}
}
Expand All @@ -125,17 +137,19 @@ impl HandleRelayInvsFlow {
// Note: If the node is considered nearly synced we continue processing relay blocks even though an IBD is in progress.
// For instance this means that downloading a side-chain from a delayed node does not interrupt the normal flow of live blocks.
debug!("Got relay block {} while in IBD and the node is out of sync, continuing...", inv.hash);
let _ = self.ctx.unregister_hash_from_processing_loop(&inv.hash).await;
continue;
}

// We keep the request scope alive until consensus processes the block
let Some((block, request_scope)) = self.request_block(inv.hash, self.msg_route.id(), self.header_format).await? else {
debug!("Relay block {} was already requested from another peer, continuing...", inv.hash);
warn!("Relay block {} was already requested from another peer, this is unexpected, continuing...", inv.hash);
continue;
};
request_scope.report_obtained();

if block.is_header_only() {
let _ = self.ctx.unregister_hash_from_processing_loop(&inv.hash).await;
return Err(ProtocolError::OtherOwned(format!("sent header of {} where expected block with body", block.hash())));
}

Expand All @@ -152,11 +166,13 @@ impl HandleRelayInvsFlow {
"Relay block {} has lower blue work than virtual's merge depth root ({} <= {}), hence we are skipping it",
inv.hash, block.header.blue_work, blue_work_threshold
);
let _ = self.ctx.unregister_hash_from_processing_loop(&inv.hash).await;
continue;
}
// if in a transitional ibd state, do not wait, sync immediately
if is_ibd_in_transitional_state {
self.try_trigger_ibd(block)?;
let _ = self.ctx.unregister_hash_from_processing_loop(&inv.hash).await;
continue;
}

Expand All @@ -176,7 +192,10 @@ impl HandleRelayInvsFlow {
Ok(_) => {}
// We disconnect on invalidness even though this is not a direct relay from this peer, because
// current relay is a descendant of this block (i.e. this peer claims all its ancestors are valid)
Err(rule_error) => return Err(rule_error.into()),
Err(rule_error) => {
let _ = self.ctx.unregister_hash_from_processing_loop(&inv.hash).await;
return Err(rule_error.into());
}
}
}

Expand All @@ -188,16 +207,25 @@ impl HandleRelayInvsFlow {
debug!("Unorphaned {} ancestors and retried orphan block {} successfully", n, block.hash())
}
},
Err(rule_error) => return Err(rule_error.into()),
Err(rule_error) => {
let _ = self.ctx.unregister_hash_from_processing_loop(&inv.hash).await;
return Err(rule_error.into());
}
}
ancestor_batch
} else {
continue;
}
}
Err(rule_error) => return Err(rule_error.into()),
Err(rule_error) => {
let _ = self.ctx.unregister_hash_from_processing_loop(&inv.hash).await;
return Err(rule_error.into());
}
};

// Once we unregister the hash at this point, the hash is known to be in the status store, which then performs the filtering.
let registered_peers_for_hash = self.ctx.unregister_hash_from_processing_loop(&inv.hash).await;

// As a policy, we only relay blocks who stand a chance to enter past(virtual).
// The only mining rule which permanently excludes a block is the merge depth bound
// (as opposed to "max parents" and "mergeset size limit" rules)
Expand All @@ -207,15 +235,15 @@ impl HandleRelayInvsFlow {
.iter()
.map(|b| make_message!(Payload::InvRelayBlock, InvRelayBlockMessage { hash: Some(b.hash().into()) }))
.collect();
// we filter out the current peer to avoid sending it back invs we know it already has
self.ctx.hub().broadcast_many(msgs, Some(self.router.key())).await;
// we filter out peers that (in the meantime) sent us the original processed hash to avoid sending it back invs we know it already has.
self.ctx.hub().broadcast_many(msgs, registered_peers_for_hash.as_ref()).await;

// we filter out the current peer to avoid sending it back the same invs
// we filter out peers that (in the meantime) sent us the original processed hash to avoid sending it back invs we know it already has.
self.ctx
.hub()
.broadcast(
make_message!(Payload::InvRelayBlock, InvRelayBlockMessage { hash: Some(inv.hash.into()) }),
Some(self.router.key()),
registered_peers_for_hash.as_ref(),
)
.await;
}
Expand Down
24 changes: 7 additions & 17 deletions protocol/p2p/src/core/hub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{ConnectionInitializer, Peer, Router, common::ProtocolError, pb::Kasp
use kaspa_core::{debug, info, warn};
use parking_lot::RwLock;
use std::{
collections::{HashMap, hash_map::Entry::Occupied},
collections::{HashMap, HashSet, hash_map::Entry::Occupied},
sync::Arc,
};
use tokio::sync::mpsc::Receiver as MpscReceiver;
Expand Down Expand Up @@ -131,14 +131,9 @@ impl Hub {
}

/// Broadcast a message to all peers (except an optional filtered peer)
pub async fn broadcast(&self, msg: KaspadMessage, filter_peer: Option<PeerKey>) {
let peers = self
.peers
.read()
.values()
.filter(|&r| filter_peer.is_none_or(|filter_peer| r.key() != filter_peer))
.cloned()
.collect::<Vec<_>>();
pub async fn broadcast(&self, msg: KaspadMessage, filter_peers: Option<&HashSet<PeerKey>>) {
let peers =
self.peers.read().values().filter(|&r| !filter_peers.is_some_and(|f| f.contains(&r.key()))).cloned().collect::<Vec<_>>();
for router in peers {
let _ = router.enqueue(msg.clone()).await;
}
Expand All @@ -156,17 +151,12 @@ impl Hub {
}

/// Broadcast a vector of messages to all peers (except an optional filtered peer)
pub async fn broadcast_many(&self, msgs: Vec<KaspadMessage>, filter_peer: Option<PeerKey>) {
pub async fn broadcast_many(&self, msgs: Vec<KaspadMessage>, filter_peers: Option<&HashSet<PeerKey>>) {
if msgs.is_empty() {
return;
}
let peers = self
.peers
.read()
.values()
.filter(|&r| filter_peer.is_none_or(|filter_peer| r.key() != filter_peer))
.cloned()
.collect::<Vec<_>>();
let peers =
self.peers.read().values().filter(|&r| !filter_peers.is_some_and(|f| f.contains(&r.key()))).cloned().collect::<Vec<_>>();
for router in peers {
for msg in msgs.iter().cloned() {
let _ = router.enqueue(msg).await;
Expand Down