Skip to content

Commit 1f05e50

Browse files
committed
Tracking Block Number for Consensus Messages,
outdated messages for consensus are now ignored in the resend loop. DMDcoin#261
1 parent 311337f commit 1f05e50

File tree

9 files changed

+69
-26
lines changed

9 files changed

+69
-26
lines changed

crates/ethcore/src/client/chain_notify.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use std::{collections::HashMap, time::Duration};
2222
/// Messages to broadcast via chain
2323
pub enum ChainMessageType {
2424
/// Consensus message
25-
Consensus(Vec<u8>),
25+
Consensus(u64, Vec<u8>),
2626
}
2727

2828
/// Route type to indicate whether it is enacted or retracted.

crates/ethcore/src/client/client.rs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3288,18 +3288,23 @@ impl super::traits::EngineClient for Client {
32883288
}
32893289
}
32903290

3291-
fn broadcast_consensus_message(&self, message: Bytes) {
3291+
fn broadcast_consensus_message(&self, future_block_id: u64, message: Bytes) {
32923292
self.statistics
32933293
.broadcasted_consensus_messages
32943294
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
32953295
self.statistics
32963296
.broadcasted_consensus_messages_bytes
32973297
.fetch_add(message.len() as u64, std::sync::atomic::Ordering::Relaxed);
32983298

3299-
self.notify(|notify| notify.broadcast(ChainMessageType::Consensus(message.clone())));
3299+
self.notify(|notify| {
3300+
notify.broadcast(ChainMessageType::Consensus(
3301+
future_block_id,
3302+
message.clone(),
3303+
))
3304+
});
33003305
}
33013306

3302-
fn send_consensus_message(&self, message: Bytes, node_id: Option<H512>) {
3307+
fn send_consensus_message(&self, future_block_id: u64, message: Bytes, node_id: Option<H512>) {
33033308
self.statistics
33043309
.sent_consensus_messages
33053310
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
@@ -3308,7 +3313,12 @@ impl super::traits::EngineClient for Client {
33083313
.fetch_add(message.len() as u64, std::sync::atomic::Ordering::Relaxed);
33093314

33103315
if let Some(n) = node_id {
3311-
self.notify(|notify| notify.send(ChainMessageType::Consensus(message.clone()), &n));
3316+
self.notify(|notify| {
3317+
notify.send(
3318+
ChainMessageType::Consensus(future_block_id, message.clone()),
3319+
&n,
3320+
)
3321+
});
33123322
}
33133323
}
33143324

crates/ethcore/src/client/test_client.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1232,9 +1232,14 @@ impl super::traits::EngineClient for TestBlockChainClient {
12321232
}
12331233
}
12341234

1235-
fn broadcast_consensus_message(&self, _message: Bytes) {}
1235+
fn broadcast_consensus_message(&self, _future_block_id: u64, _message: Bytes) {}
12361236

1237-
fn send_consensus_message(&self, _message: Bytes, _node_id: Option<H512>) {
1237+
fn send_consensus_message(
1238+
&self,
1239+
_future_block_id: u64,
1240+
_message: Bytes,
1241+
_node_id: Option<H512>,
1242+
) {
12381243
// TODO: allow test to intercept the message to relay it to other test clients
12391244
}
12401245

crates/ethcore/src/client/traits.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -668,10 +668,10 @@ pub trait EngineClient: Sync + Send + ChainInfo {
668668
fn submit_seal(&self, block_hash: H256, seal: Vec<Bytes>);
669669

670670
/// Broadcast a consensus message to the network.
671-
fn broadcast_consensus_message(&self, message: Bytes);
671+
fn broadcast_consensus_message(&self, future_block_id: u64, message: Bytes);
672672

673673
/// Send a consensus message to the specified peer
674-
fn send_consensus_message(&self, message: Bytes, node_id: Option<H512>);
674+
fn send_consensus_message(&self, future_block_id: u64, message: Bytes, node_id: Option<H512>);
675675

676676
/// Get the transition to the epoch the given parent hash is part of
677677
/// or transitions to.

crates/ethcore/src/engines/authority_round/mod.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1158,7 +1158,7 @@ impl AuthorityRound {
11581158
self.empty_steps.lock().insert(empty_step);
11591159
}
11601160

1161-
fn generate_empty_step(&self, parent_hash: &H256) {
1161+
fn generate_empty_step(&self, future_block_id: u64, parent_hash: &H256) {
11621162
let step = self.step.inner.load();
11631163
let empty_step_rlp = empty_step_rlp(step, parent_hash);
11641164

@@ -1173,16 +1173,16 @@ impl AuthorityRound {
11731173
};
11741174

11751175
trace!(target: "engine", "broadcasting empty step message: {:?}", empty_step);
1176-
self.broadcast_message(message_rlp);
1176+
self.broadcast_message(future_block_id, message_rlp);
11771177
self.handle_empty_step_message(empty_step);
11781178
} else {
11791179
warn!(target: "engine", "generate_empty_step: FAIL: accounts secret key unavailable");
11801180
}
11811181
}
11821182

1183-
fn broadcast_message(&self, message: Vec<u8>) {
1183+
fn broadcast_message(&self, future_block_id: u64, message: Vec<u8>) {
11841184
if let Ok(c) = self.upgrade_client_or(None) {
1185-
c.broadcast_consensus_message(message);
1185+
c.broadcast_consensus_message(future_block_id, message);
11861186
}
11871187
}
11881188

@@ -1759,7 +1759,7 @@ impl Engine<EthereumMachine> for AuthorityRound {
17591759
.compare_exchange(true, false, AtomicOrdering::SeqCst, AtomicOrdering::SeqCst)
17601760
.is_ok()
17611761
{
1762-
self.generate_empty_step(header.parent_hash());
1762+
self.generate_empty_step(header.number(), header.parent_hash());
17631763
}
17641764

17651765
return Seal::None;

crates/ethcore/src/engines/hbbft/hbbft_engine.rs

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,16 @@ enum Message {
7272
Sealing(BlockNumber, sealing::Message),
7373
}
7474

75+
impl Message {
76+
/// Returns the epoch (block number) of the message.
77+
pub fn block_number(&self) -> BlockNumber {
78+
match self {
79+
Message::HoneyBadger(_, msg) => msg.epoch(),
80+
Message::Sealing(block_num, _) => *block_num,
81+
}
82+
}
83+
}
84+
7585
/// The Honey Badger BFT Engine.
7686
pub struct HoneyBadgerBFT {
7787
transition_service: IoService<()>,
@@ -727,7 +737,11 @@ impl HoneyBadgerBFT {
727737
trace!(target: "consensus", "Dispatching message {:?} to {:?}", m.message, set);
728738
for node_id in set.into_iter().filter(|p| p != net_info.our_id()) {
729739
trace!(target: "consensus", "Sending message to {}", node_id.0);
730-
client.send_consensus_message(ser.clone(), Some(node_id.0));
740+
client.send_consensus_message(
741+
m.message.block_number(),
742+
ser.clone(),
743+
Some(node_id.0),
744+
);
731745
}
732746
}
733747
Target::AllExcept(set) => {
@@ -737,7 +751,11 @@ impl HoneyBadgerBFT {
737751
.filter(|p| (p != &net_info.our_id() && !set.contains(p)))
738752
{
739753
trace!(target: "consensus", "Sending exclusive message to {}", node_id.0);
740-
client.send_consensus_message(ser.clone(), Some(node_id.0));
754+
client.send_consensus_message(
755+
m.message.block_number(),
756+
ser.clone(),
757+
Some(node_id.0),
758+
);
741759
}
742760
}
743761
}
@@ -780,6 +798,7 @@ impl HoneyBadgerBFT {
780798
message: Message::HoneyBadger(*message_counter, msg.message),
781799
}
782800
});
801+
783802
self.dispatch_messages(&client, messages, network_info);
784803
std::mem::drop(message_counter);
785804
self.process_output(client, step.output, network_info);
@@ -813,7 +832,7 @@ impl HoneyBadgerBFT {
813832

814833
let step = match self
815834
.hbbft_state
816-
.try_write_for(std::time::Duration::from_millis(10))
835+
.try_write_for(std::time::Duration::from_millis(100))
817836
{
818837
Some(mut state_lock) => state_lock.try_send_contribution(client.clone(), &self.signer),
819838
None => {

crates/ethcore/sync/src/api.rs

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -506,6 +506,11 @@ impl SyncProtocolHandler {
506506
}
507507

508508
fn send_cached_consensus_messages_for(&self, sync_io: &mut dyn SyncIo, node_id: &NodeId) {
509+
let last_interesting_block = self
510+
.chain
511+
.block_number(types::ids::BlockId::Latest)
512+
.unwrap_or(0);
513+
509514
// now since we are connected, lets send any cached messages
510515
if let Some(vec_msg) = self.message_cache.write().remove(&Some(*node_id)) {
511516
trace!(target: "consensus", "Cached Messages: Trying to send cached messages to {:?}", node_id);
@@ -514,7 +519,11 @@ impl SyncProtocolHandler {
514519

515520
for msg in vec_msg {
516521
match msg {
517-
ChainMessageType::Consensus(message) => {
522+
ChainMessageType::Consensus(block, message) => {
523+
if block < last_interesting_block {
524+
// https://github.com/DMDcoin/diamond-node/issues/261
525+
continue;
526+
}
518527
let send_consensus_result = self.sync.write().send_consensus_packet(
519528
sync_io,
520529
message.clone(),
@@ -525,7 +534,7 @@ impl SyncProtocolHandler {
525534
Ok(_) => {}
526535
Err(e) => {
527536
info!(target: "consensus", "Error sending cached consensus message to peer (re-adding) {:?}: {:?}", node_id, e);
528-
failed_messages.push(ChainMessageType::Consensus(message));
537+
failed_messages.push(ChainMessageType::Consensus(block, message));
529538
}
530539
}
531540
}
@@ -534,8 +543,9 @@ impl SyncProtocolHandler {
534543

535544
if !failed_messages.is_empty() {
536545
// If we failed to send some messages, cache them for later
537-
let mut lock = self.message_cache.write();
538-
lock.entry(Some(*node_id))
546+
self.message_cache
547+
.write()
548+
.entry(Some(*node_id))
539549
.or_default()
540550
.extend(failed_messages);
541551
} else {
@@ -729,7 +739,7 @@ impl ChainNotify for EthSync {
729739
&self.eth_handler.overlay,
730740
);
731741
match message_type {
732-
ChainMessageType::Consensus(message) => self
742+
ChainMessageType::Consensus(_block, message) => self
733743
.eth_handler
734744
.sync
735745
.write()
@@ -746,13 +756,13 @@ impl ChainNotify for EthSync {
746756
&self.eth_handler.overlay);
747757

748758
match message_type {
749-
ChainMessageType::Consensus(message) => {
759+
ChainMessageType::Consensus(block, message) => {
750760
let send_result = self.eth_handler.sync.write().send_consensus_packet(&mut sync_io, message.clone(), node_id);
751761
if let Err(e) = send_result {
752762
info!(target: "consensus", "Error sending consensus message to peer - caching message {:?}: {:?}", node_id, e);
753763
// If we failed to send the message, cache it for later
754764
let mut lock = self.eth_handler.message_cache.write();
755-
lock.entry(Some(node_id.clone())).or_default().push(ChainMessageType::Consensus(message));
765+
lock.entry(Some(node_id.clone())).or_default().push(ChainMessageType::Consensus(block, message));
756766
}
757767
},
758768
}

crates/ethcore/sync/src/chain/propagator.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -405,7 +405,6 @@ impl ChainSync {
405405
for peer_id in lucky_peers {
406406
let send_result =
407407
ChainSync::send_packet(io, peer_id, ConsensusDataPacket, packet.clone());
408-
409408
if let Err(e) = send_result {
410409
info!(target: "sync", "Error broadcast consensus packet to peer {}: {:?}", peer_id, e);
411410
} else {

crates/net/network-devp2p/src/session_container.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -433,7 +433,7 @@ impl SessionContainer {
433433
pub(crate) fn deregister_session_stream<Host: mio::deprecated::Handler>(
434434
&self,
435435
stream: usize,
436-
436+
437437
event_loop: &mut mio::deprecated::EventLoop<Host>,
438438
) {
439439
let connections = if stream < self.last_handshake {

0 commit comments

Comments
 (0)