Skip to content

Commit 10351eb

Browse files
committed
- errors in sending messages now require to be handled.
- failed messages are now readded to the send cache. - in non critical paths of the consesus, those errors are ignored, like it has been the case all the time.
1 parent 3794a1d commit 10351eb

File tree

3 files changed

+81
-25
lines changed

3 files changed

+81
-25
lines changed

crates/ethcore/sync/src/api.rs

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -511,17 +511,39 @@ impl SyncProtocolHandler {
511511
node_id: &NodeId,
512512
peer_id: PeerId,
513513
) {
514+
514515
// now since we are connected, lets send any cached messages
515516
if let Some(vec_msg) = self.message_cache.write().remove(&Some(*node_id)) {
516517
trace!(target: "consensus", "Cached Messages: Trying to send cached messages to {:?}", node_id);
518+
519+
let mut failed_messages: Vec<ChainMessageType> = Vec::new();
520+
517521
for msg in vec_msg {
518522
match msg {
519-
ChainMessageType::Consensus(message) => self
523+
ChainMessageType::Consensus(message) => {
524+
let send_consensus_result = self
520525
.sync
521526
.write()
522-
.send_consensus_packet(sync_io, message, peer_id),
527+
.send_consensus_packet(sync_io, message.clone(), peer_id);
528+
529+
match send_consensus_result {
530+
Ok(_) => {},
531+
Err(e) => {
532+
info!(target: "consensus", "Error sending cached consensus message to peer (re-adding) {:?}: {:?}", peer_id, e);
533+
failed_messages.push(ChainMessageType::Consensus(message));
534+
},
535+
}
536+
}
523537
}
524538
}
539+
540+
if !failed_messages.is_empty() {
541+
// If we failed to send some messages, cache them for later
542+
let mut lock = self.message_cache.write();
543+
lock.entry(Some(*node_id)).or_default().extend(failed_messages);
544+
} else {
545+
trace!(target: "consensus", "Cached Messages: Successfully sent all cached messages to {:?}", node_id);
546+
}
525547
}
526548
}
527549
}
@@ -742,7 +764,15 @@ impl ChainNotify for EthSync {
742764
&self.eth_handler.overlay);
743765

744766
match message_type {
745-
ChainMessageType::Consensus(message) => self.eth_handler.sync.write().send_consensus_packet(&mut sync_io, message, my_peer_id),
767+
ChainMessageType::Consensus(message) => {
768+
let send_result = self.eth_handler.sync.write().send_consensus_packet(&mut sync_io, message.clone(), my_peer_id);
769+
if let Err(e) = send_result {
770+
info!(target: "consensus", "Error sending consensus message to peer - caching message {:?}: {:?}", my_peer_id, e);
771+
// If we failed to send the message, cache it for later
772+
let mut lock = self.eth_handler.message_cache.write();
773+
lock.entry(node_id.clone()).or_default().push(ChainMessageType::Consensus(message));
774+
}
775+
},
746776
}
747777
});
748778
}

crates/ethcore/sync/src/chain/mod.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -577,9 +577,11 @@ impl ChainSyncApi {
577577
for peers in sync.get_peers(&chain_info, PeerState::SameBlock).chunks(10) {
578578
check_deadline(deadline)?;
579579
for peer in peers {
580-
ChainSync::send_packet(io, *peer, NewBlockPacket, rlp.clone());
581-
if let Some(ref mut peer) = sync.peers.get_mut(peer) {
582-
peer.latest_hash = hash;
580+
let send_result = ChainSync::send_packet(io, *peer, NewBlockPacket, rlp.clone());
581+
if send_result.is_ok() {
582+
if let Some(ref mut peer) = sync.peers.get_mut(peer) {
583+
peer.latest_hash = hash;
584+
}
583585
}
584586
}
585587
}

crates/ethcore/sync/src/chain/propagator.rs

Lines changed: 43 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use crate::{
2727
use bytes::Bytes;
2828
use ethereum_types::H256;
2929
use fastmap::H256FastSet;
30-
use network::{PeerId, client_version::ClientCapabilities};
30+
use network::{client_version::ClientCapabilities, Error, PeerId};
3131
use rand::RngCore;
3232
use rlp::RlpStream;
3333

@@ -77,10 +77,12 @@ impl ChainSync {
7777
self.statistics
7878
.log_propagated_block(io, *peer_id, blocks.len(), rlp.len());
7979

80-
ChainSync::send_packet(io, *peer_id, NewBlockPacket, rlp.clone());
80+
let send_result = ChainSync::send_packet(io, *peer_id, NewBlockPacket, rlp.clone());
8181

82-
if let Some(ref mut peer) = self.peers.get_mut(peer_id) {
83-
peer.latest_hash = chain_info.best_block_hash.clone();
82+
if send_result.is_ok() {
83+
if let Some(ref mut peer) = self.peers.get_mut(peer_id) {
84+
peer.latest_hash = chain_info.best_block_hash.clone();
85+
}
8486
}
8587
}
8688
};
@@ -122,7 +124,7 @@ impl ChainSync {
122124
if let Some(ref mut peer) = self.peers.get_mut(peer_id) {
123125
peer.latest_hash = best_block_hash;
124126
}
125-
ChainSync::send_packet(io, *peer_id, NewBlockHashesPacket, rlp.clone());
127+
let _ = ChainSync::send_packet(io, *peer_id, NewBlockHashesPacket, rlp.clone());
126128
}
127129
sent
128130
}
@@ -200,13 +202,8 @@ impl ChainSync {
200202
rlp: Bytes| {
201203
let size = rlp.len();
202204

203-
if is_hashes {
204-
stats.log_propagated_hashes(sent, size);
205-
} else {
206-
stats.log_propagated_transactions(sent, size);
207-
}
208205

209-
ChainSync::send_packet(
206+
let send_result = ChainSync::send_packet(
210207
io,
211208
peer_id,
212209
if is_hashes {
@@ -216,7 +213,18 @@ impl ChainSync {
216213
},
217214
rlp,
218215
);
219-
trace!(target: "sync", "{:02} <- {} ({} entries; {} bytes)", peer_id, if is_hashes { "NewPooledTransactionHashes" } else { "Transactions" }, sent, size);
216+
217+
218+
219+
if send_result.is_ok() {
220+
if is_hashes {
221+
stats.log_propagated_hashes(sent, size);
222+
} else {
223+
stats.log_propagated_transactions(sent, size);
224+
}
225+
trace!(target: "sync", "{:02} <- {} ({} entries; {} bytes)", peer_id, if is_hashes { "NewPooledTransactionHashes" } else { "Transactions" }, sent, size);
226+
}
227+
220228
};
221229

222230
let mut sent_to_peers = HashSet::new();
@@ -385,7 +393,7 @@ impl ChainSync {
385393
// more about: https://github.com/DMDcoin/diamond-node/issues/61
386394
let rlp = ChainSync::create_block_rlp(block, io.chain().chain_info().total_difficulty);
387395
for peer_id in &peers {
388-
ChainSync::send_packet(io, *peer_id, NewBlockPacket, rlp.clone());
396+
let _ = ChainSync::send_packet(io, *peer_id, NewBlockPacket, rlp.clone());
389397
}
390398
}
391399
}
@@ -398,7 +406,11 @@ impl ChainSync {
398406
self.statistics
399407
.log_consensus_broadcast(lucky_peers.len(), packet.len());
400408
for peer_id in lucky_peers {
401-
ChainSync::send_packet(io, peer_id, ConsensusDataPacket, packet.clone());
409+
let send_result = ChainSync::send_packet(io, peer_id, ConsensusDataPacket, packet.clone());
410+
411+
if let Err(e) = send_result {
412+
info!(target: "sync", "Error broadcast consensus packet to peer {}: {:?}", peer_id, e);
413+
}
402414
}
403415
}
404416

@@ -407,9 +419,18 @@ impl ChainSync {
407419
io: &mut dyn SyncIo,
408420
packet: Bytes,
409421
peer_id: usize,
410-
) {
411-
self.statistics.log_consensus(peer_id, packet.len());
412-
ChainSync::send_packet(io, peer_id, ConsensusDataPacket, packet);
422+
) -> Result<(), Error> {
423+
let packet_len = packet.len();
424+
let send_result = ChainSync::send_packet(io, peer_id, ConsensusDataPacket, packet.clone());
425+
match &send_result {
426+
Ok(_) => {
427+
self.statistics.log_consensus(peer_id, packet_len);
428+
},
429+
Err(e) => {
430+
warn!(target: "sync", "Error sending consensus packet to peer {}: {:?}", peer_id, e);
431+
},
432+
}
433+
return send_result;
413434
}
414435

415436
fn select_peers_for_transactions<F>(&self, filter: F, are_new: bool) -> Vec<PeerId>
@@ -444,11 +465,14 @@ impl ChainSync {
444465
peer_id: PeerId,
445466
packet_id: SyncPacket,
446467
packet: Bytes,
447-
) {
448-
if let Err(e) = sync.send(peer_id, packet_id, packet) {
468+
) -> Result<(), Error> {
469+
let result = sync.send(peer_id, packet_id, packet);
470+
if let Err(e) = &result {
449471
debug!(target:"sync", "Error sending packet: {:?}", e);
450472
sync.disconnect_peer(peer_id);
451473
}
474+
475+
return result;
452476
}
453477

454478
/// propagates new transactions to all peers

0 commit comments

Comments
 (0)