Skip to content

Commit 5a91d26

Browse files
committed
Consensus messages are now targeted at NodeIDs (H512) instead of the internally used peer_id, to make sure that they are sent to the correct peer.
DMDcoin#248
1 parent 10351eb commit 5a91d26

File tree

11 files changed

+75
-68
lines changed

11 files changed

+75
-68
lines changed

crates/ethcore/src/client/chain_notify.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ pub trait ChainNotify: Send + Sync {
184184
}
185185

186186
/// fires when chain sends a message to a specific peer
187-
fn send(&self, _message_type: ChainMessageType, _node_id: Option<H512>) {
187+
fn send(&self, _message_type: ChainMessageType, _node_id: &H512) {
188188
// does nothing by default
189189
}
190190

crates/ethcore/src/client/client.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3307,7 +3307,9 @@ impl super::traits::EngineClient for Client {
33073307
.sent_consensus_messages_bytes
33083308
.fetch_add(message.len() as u64, std::sync::atomic::Ordering::Relaxed);
33093309

3310-
self.notify(|notify| notify.send(ChainMessageType::Consensus(message.clone()), node_id));
3310+
if let Some(n) = node_id {
3311+
self.notify(|notify| notify.send(ChainMessageType::Consensus(message.clone()), &n));
3312+
}
33113313
}
33123314

33133315
fn epoch_transition_for(&self, parent_hash: H256) -> Option<crate::engines::EpochTransition> {

crates/ethcore/src/test_helpers.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -695,11 +695,13 @@ impl ChainNotify for TestNotify {
695695
self.messages.write().push(data);
696696
}
697697

698-
fn send(&self, message: ChainMessageType, node_id: Option<H512>) {
698+
fn send(&self, message: ChainMessageType, node_id: &H512) {
699699
let data = match message {
700700
ChainMessageType::Consensus(data) => data,
701701
};
702-
self.targeted_messages.write().push((data, node_id));
702+
self.targeted_messages
703+
.write()
704+
.push((data, Some(node_id.clone())));
703705
}
704706
}
705707

crates/ethcore/sync/src/api.rs

Lines changed: 18 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -494,24 +494,18 @@ impl SyncProtocolHandler {
494494
let mut sync_io = NetSyncIo::new(nc, &*self.chain, &*self.snapshot_service, &self.overlay);
495495

496496
for node_id in pub_keys.iter() {
497-
if let Some(peer_id) = nc.node_id_to_peer_id(*node_id) {
497+
if let Some(peer_id) = nc.node_id_to_peer_id(node_id) {
498498
let found_peers = self.sync.peer_info(&[peer_id]);
499499
if let Some(peer_info) = found_peers.first() {
500500
if let Some(_) = peer_info {
501-
self.send_cached_consensus_messages_for(&mut sync_io, node_id, peer_id);
501+
self.send_cached_consensus_messages_for(&mut sync_io, node_id);
502502
}
503503
}
504504
}
505505
}
506506
}
507507

508-
fn send_cached_consensus_messages_for(
509-
&self,
510-
sync_io: &mut dyn SyncIo,
511-
node_id: &NodeId,
512-
peer_id: PeerId,
513-
) {
514-
508+
fn send_cached_consensus_messages_for(&self, sync_io: &mut dyn SyncIo, node_id: &NodeId) {
515509
// now since we are connected, lets send any cached messages
516510
if let Some(vec_msg) = self.message_cache.write().remove(&Some(*node_id)) {
517511
trace!(target: "consensus", "Cached Messages: Trying to send cached messages to {:?}", node_id);
@@ -521,17 +515,18 @@ impl SyncProtocolHandler {
521515
for msg in vec_msg {
522516
match msg {
523517
ChainMessageType::Consensus(message) => {
524-
let send_consensus_result = self
525-
.sync
526-
.write()
527-
.send_consensus_packet(sync_io, message.clone(), peer_id);
518+
let send_consensus_result = self.sync.write().send_consensus_packet(
519+
sync_io,
520+
message.clone(),
521+
node_id,
522+
);
528523

529524
match send_consensus_result {
530-
Ok(_) => {},
525+
Ok(_) => {}
531526
Err(e) => {
532-
info!(target: "consensus", "Error sending cached consensus message to peer (re-adding) {:?}: {:?}", peer_id, e);
527+
info!(target: "consensus", "Error sending cached consensus message to peer (re-adding) {:?}: {:?}", node_id, e);
533528
failed_messages.push(ChainMessageType::Consensus(message));
534-
},
529+
}
535530
}
536531
}
537532
}
@@ -540,7 +535,9 @@ impl SyncProtocolHandler {
540535
if !failed_messages.is_empty() {
541536
// If we failed to send some messages, cache them for later
542537
let mut lock = self.message_cache.write();
543-
lock.entry(Some(*node_id)).or_default().extend(failed_messages);
538+
lock.entry(Some(*node_id))
539+
.or_default()
540+
.extend(failed_messages);
544541
} else {
545542
trace!(target: "consensus", "Cached Messages: Successfully sent all cached messages to {:?}", node_id);
546543
}
@@ -736,41 +733,21 @@ impl ChainNotify for EthSync {
736733
});
737734
}
738735

739-
fn send(&self, message_type: ChainMessageType, node_id: Option<H512>) {
736+
fn send(&self, message_type: ChainMessageType, node_id: &H512) {
740737
self.network.with_context(PAR_PROTOCOL, |context| {
741-
let peer_ids = self.network.connected_peers();
742-
let target_peer_id = peer_ids.into_iter().find(|p| {
743-
match context.session_info(*p){
744-
Some(session_info) => {
745-
session_info.id == node_id
746-
},
747-
None => { warn!(target:"sync", "No session exists for peerId {:?} Node: {:?}", p, node_id); false},
748-
}
749-
});
750-
751-
let my_peer_id = match target_peer_id {
752-
None => {
753-
trace!(target: "consensus", "Cached Messages: peer {:?} not connected, caching message...", node_id);
754-
let mut lock = self.eth_handler.message_cache.write();
755-
lock.entry(node_id.clone()).or_default().push(message_type);
756-
return;
757-
}
758-
Some(n) => n,
759-
};
760-
761738
let mut sync_io = NetSyncIo::new(context,
762739
&*self.eth_handler.chain,
763740
&*self.eth_handler.snapshot_service,
764741
&self.eth_handler.overlay);
765742

766743
match message_type {
767744
ChainMessageType::Consensus(message) => {
768-
let send_result = self.eth_handler.sync.write().send_consensus_packet(&mut sync_io, message.clone(), my_peer_id);
745+
let send_result = self.eth_handler.sync.write().send_consensus_packet(&mut sync_io, message.clone(), node_id);
769746
if let Err(e) = send_result {
770-
info!(target: "consensus", "Error sending consensus message to peer - caching message {:?}: {:?}", my_peer_id, e);
747+
info!(target: "consensus", "Error sending consensus message to peer - caching message {:?}: {:?}", node_id, e);
771748
// If we failed to send the message, cache it for later
772749
let mut lock = self.eth_handler.message_cache.write();
773-
lock.entry(node_id.clone()).or_default().push(ChainMessageType::Consensus(message));
750+
lock.entry(Some(node_id.clone())).or_default().push(ChainMessageType::Consensus(message));
774751
}
775752
},
776753
}

crates/ethcore/sync/src/chain/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -577,7 +577,8 @@ impl ChainSyncApi {
577577
for peers in sync.get_peers(&chain_info, PeerState::SameBlock).chunks(10) {
578578
check_deadline(deadline)?;
579579
for peer in peers {
580-
let send_result = ChainSync::send_packet(io, *peer, NewBlockPacket, rlp.clone());
580+
let send_result =
581+
ChainSync::send_packet(io, *peer, NewBlockPacket, rlp.clone());
581582
if send_result.is_ok() {
582583
if let Some(ref mut peer) = sync.peers.get_mut(peer) {
583584
peer.latest_hash = hash;

crates/ethcore/sync/src/chain/propagator.rs

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@ use crate::{
2525
types::{BlockNumber, blockchain_info::BlockChainInfo, transaction::SignedTransaction},
2626
};
2727
use bytes::Bytes;
28-
use ethereum_types::H256;
28+
use ethereum_types::{H256, H512};
2929
use fastmap::H256FastSet;
30-
use network::{client_version::ClientCapabilities, Error, PeerId};
30+
use network::{Error, PeerId, client_version::ClientCapabilities};
3131
use rand::RngCore;
3232
use rlp::RlpStream;
3333

@@ -202,7 +202,6 @@ impl ChainSync {
202202
rlp: Bytes| {
203203
let size = rlp.len();
204204

205-
206205
let send_result = ChainSync::send_packet(
207206
io,
208207
peer_id,
@@ -214,17 +213,14 @@ impl ChainSync {
214213
rlp,
215214
);
216215

217-
218-
219216
if send_result.is_ok() {
220217
if is_hashes {
221218
stats.log_propagated_hashes(sent, size);
222219
} else {
223220
stats.log_propagated_transactions(sent, size);
224-
}
221+
}
225222
trace!(target: "sync", "{:02} <- {} ({} entries; {} bytes)", peer_id, if is_hashes { "NewPooledTransactionHashes" } else { "Transactions" }, sent, size);
226223
}
227-
228224
};
229225

230226
let mut sent_to_peers = HashSet::new();
@@ -403,32 +399,46 @@ impl ChainSync {
403399
let lucky_peers = ChainSync::select_random_peers(&self.get_consensus_peers());
404400
trace!(target: "sync", "Sending consensus packet to {:?}", lucky_peers);
405401

406-
self.statistics
407-
.log_consensus_broadcast(lucky_peers.len(), packet.len());
402+
let mut num_sent_messages = 0;
408403
for peer_id in lucky_peers {
409-
let send_result = ChainSync::send_packet(io, peer_id, ConsensusDataPacket, packet.clone());
404+
let send_result =
405+
ChainSync::send_packet(io, peer_id, ConsensusDataPacket, packet.clone());
410406

411407
if let Err(e) = send_result {
412408
info!(target: "sync", "Error broadcast consensus packet to peer {}: {:?}", peer_id, e);
409+
} else {
410+
num_sent_messages += 1;
413411
}
414412
}
413+
414+
self.statistics
415+
.log_consensus_broadcast(num_sent_messages, packet.len());
415416
}
416417

418+
/// Sends a packet to a specific peer.
419+
/// The caller has to take care about Errors, and reshedule if an error occurs.
417420
pub(crate) fn send_consensus_packet(
418421
&mut self,
419422
io: &mut dyn SyncIo,
420423
packet: Bytes,
421-
peer_id: usize,
424+
peer: &H512,
422425
) -> Result<(), Error> {
426+
let peer_id = match io.node_id_to_peer_id(peer) {
427+
Some(id) => id,
428+
None => {
429+
warn!(target: "sync", "Peer with node id {} not found in peers list.", peer);
430+
return Err("No Session for Peer".into());
431+
}
432+
};
423433
let packet_len = packet.len();
424434
let send_result = ChainSync::send_packet(io, peer_id, ConsensusDataPacket, packet.clone());
425435
match &send_result {
426436
Ok(_) => {
427-
self.statistics.log_consensus(peer_id, packet_len);
428-
},
437+
self.statistics.log_consensus(packet_len);
438+
}
429439
Err(e) => {
430440
warn!(target: "sync", "Error sending consensus packet to peer {}: {:?}", peer_id, e);
431-
},
441+
}
432442
}
433443
return send_result;
434444
}
@@ -467,7 +477,7 @@ impl ChainSync {
467477
packet: Bytes,
468478
) -> Result<(), Error> {
469479
let result = sync.send(peer_id, packet_id, packet);
470-
if let Err(e) = &result {
480+
if let Err(e) = &result {
471481
debug!(target:"sync", "Error sending packet: {:?}", e);
472482
sync.disconnect_peer(peer_id);
473483
}

crates/ethcore/sync/src/chain/propagator_statistics.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ impl SyncPropagatorStatistics {
7979
}
8080
}
8181

82-
pub(crate) fn log_consensus(&mut self, _peer_id: usize, bytelen: usize) {
82+
pub(crate) fn log_consensus(&mut self, bytelen: usize) {
8383
if self.logging_enabled {
8484
self.consensus_bytes += bytelen as i64;
8585
self.consensus_packages += 1;

crates/ethcore/sync/src/sync_io.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use crate::{
2020
};
2121
use bytes::Bytes;
2222
use ethcore::{client::BlockChainClient, snapshot::SnapshotService};
23+
use ethereum_types::H512;
2324
use network::{
2425
Error, NetworkContext, PacketId, PeerId, ProtocolId, SessionInfo, client_version::ClientVersion,
2526
};
@@ -58,6 +59,9 @@ pub trait SyncIo {
5859
fn is_expired(&self) -> bool;
5960
/// Return sync overlay
6061
fn chain_overlay(&self) -> &RwLock<HashMap<BlockNumber, Bytes>>;
62+
63+
/// Returns the peer ID for a given node id, if a corresponding peer exists.
64+
fn node_id_to_peer_id(&self, node_id: &H512) -> Option<PeerId>;
6165
}
6266

6367
/// Wraps `NetworkContext` and the blockchain client
@@ -132,4 +136,8 @@ impl<'s> SyncIo for NetSyncIo<'s> {
132136
fn peer_version(&self, peer_id: PeerId) -> ClientVersion {
133137
self.network.peer_client_version(peer_id)
134138
}
139+
140+
fn node_id_to_peer_id(&self, node_id: &H512) -> Option<PeerId> {
141+
self.network.node_id_to_peer_id(node_id)
142+
}
135143
}

crates/ethcore/sync/src/tests/helpers.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,10 @@ where
181181
fn chain_overlay(&self) -> &RwLock<HashMap<BlockNumber, Bytes>> {
182182
&self.overlay
183183
}
184+
185+
fn node_id_to_peer_id(&self, node_id: &ethereum_types::H512) -> Option<PeerId> {
186+
return Some(node_id.to_low_u64_le() as PeerId);
187+
}
184188
}
185189

186190
/// Mock for emulution of async run of new blocks

crates/net/network-devp2p/src/host.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -296,14 +296,17 @@ impl<'s> NetworkContextTrait for NetworkContext<'s> {
296296
.unwrap_or(false)
297297
}
298298

299-
fn node_id_to_peer_id(&self, node_id: NodeId) -> Option<PeerId> {
299+
fn node_id_to_peer_id(&self, node_id: &NodeId) -> Option<PeerId> {
300300
let sessions = self.sessions.read();
301301
let sessions = &*sessions;
302302

303303
for i in (0..MAX_SESSIONS).map(|x| x + FIRST_SESSION) {
304304
if let Some(session) = sessions.get(i) {
305-
if session.lock().info.id == Some(node_id) {
306-
return Some(i);
305+
let session_node_id_o = session.lock().info.id;
306+
if let Some(session_node_id) = session_node_id_o {
307+
if session_node_id == *node_id {
308+
return Some(i);
309+
}
307310
}
308311
}
309312
}

0 commit comments

Comments
 (0)