diff --git a/core/src/block_data_manager/mod.rs b/core/src/block_data_manager/mod.rs index 2fdbcb0b68..7bf3159916 100644 --- a/core/src/block_data_manager/mod.rs +++ b/core/src/block_data_manager/mod.rs @@ -1111,10 +1111,11 @@ impl BlockDataManager { .recover_unsigned_tx_with_order(transactions) } - pub fn build_partial( + pub fn find_missing_tx_indices_encoded( &self, compact_block: &mut CompactBlock, ) -> Vec { - self.tx_data_manager.build_partial(compact_block) + self.tx_data_manager + .find_missing_tx_indices_encoded(compact_block) } /// Caller should make sure the state exists. diff --git a/core/src/block_data_manager/tx_data_manager.rs b/core/src/block_data_manager/tx_data_manager.rs index 4c39837370..6382008533 100644 --- a/core/src/block_data_manager/tx_data_manager.rs +++ b/core/src/block_data_manager/tx_data_manager.rs @@ -191,8 +191,8 @@ impl TransactionDataManager { end_idx += 1; remainder -= 1; } - let unsigned_txes = Vec::new(); - unsigned_trans.push(unsigned_txes); + let unsigned_txns = Vec::new(); + unsigned_trans.push(unsigned_txns); } unsigned_trans.last_mut().unwrap().push(tx); @@ -202,14 +202,14 @@ impl TransactionDataManager { let (sender, receiver) = channel(); let n_thread = unsigned_trans.len(); - for unsigned_txes in unsigned_trans { - RECOVER_PUB_KEY_QUEUE.enqueue(unsigned_txes.len()); + for unsigned_txns in unsigned_trans { + RECOVER_PUB_KEY_QUEUE.enqueue(unsigned_txns.len()); let sender = sender.clone(); self.worker_pool.lock().execute(move || { - let mut signed_txes = Vec::new(); - for (idx, tx) in unsigned_txes { + let mut signed_txns = Vec::new(); + for (idx, tx) in unsigned_txns { if let Ok(public) = tx.recover_public() { - signed_txes.push((idx, Arc::new(SignedTransaction::new( + signed_txns.push((idx, Arc::new(SignedTransaction::new( public, tx.clone(), )))); @@ -221,7 +221,7 @@ impl TransactionDataManager { break; } } - sender.send(signed_txes).unwrap(); + sender.send(signed_txns).unwrap(); }); } @@ -243,14 +243,14 @@ impl TransactionDataManager { } /// Find tx in tx_time_window that matches tx_short_ids to fill in - /// reconstruced_txes Return the differentially encoded index of missing + /// reconstruced_txns Return the differentially encoded index of missing /// transactions Now should only called once after CompactBlock is /// decoded - pub fn build_partial( + pub fn find_missing_tx_indices_encoded( &self, compact_block: &mut CompactBlock, ) -> Vec { compact_block - .reconstructed_txes + .reconstructed_txns .resize(compact_block.len(), None); let (random_bytes_vector, fixed_bytes_vector) = @@ -270,7 +270,7 @@ impl TransactionDataManager { k1, ) { Some(tx) => { - compact_block.reconstructed_txes[i] = Some(tx.clone()); + compact_block.reconstructed_txns[i] = Some(tx.clone()); } None => { missing_index.push(i); diff --git a/core/src/sync/message/get_block_txn.rs b/core/src/sync/message/get_block_txn.rs index ada1640cde..010724e945 100644 --- a/core/src/sync/message/get_block_txn.rs +++ b/core/src/sync/message/get_block_txn.rs @@ -5,7 +5,10 @@ use crate::{ message::RequestId, sync::{ - message::{Context, GetBlockTxnResponse, Handleable, KeyContainer}, + message::{ + msgid, Context, GetBlockTxnResponse, GetBlocks, Handleable, Key, + KeyContainer, + }, request_manager::{AsAny, Request}, Error, ErrorKind, ProtocolConfiguration, }, @@ -32,7 +35,10 @@ impl Request for GetBlockTxn { conf.blocks_request_timeout } - fn on_removed(&self, _inflight_keys: &KeyContainer) {} + fn on_removed(&self, inflight_keys: &KeyContainer) { + let mut inflight_keys = inflight_keys.write(msgid::GET_BLOCKS); + inflight_keys.remove(&Key::Hash(self.block_hash.clone())); + } fn with_inflight(&mut self, _inflight_keys: &KeyContainer) { // reuse the inflight key of GetCompactBlocks @@ -41,7 +47,13 @@ impl Request for GetBlockTxn { fn is_empty(&self) -> bool { false } fn resend(&self) -> Option> { - Some(Box::new(self.clone())) + Some(Box::new(GetBlocks { + request_id: 0, + // request_block_need_public can only be true in catch_up_mode, + // where GetBlockTxn can not be initiated. + with_public: false, + hashes: vec![self.block_hash.clone()], + })) } } diff --git a/core/src/sync/message/get_block_txn_response.rs b/core/src/sync/message/get_block_txn_response.rs index ddc395a120..4719d5bb1d 100644 --- a/core/src/sync/message/get_block_txn_response.rs +++ b/core/src/sync/message/get_block_txn_response.rs @@ -52,7 +52,7 @@ impl Handleable for GetBlockTxnResponse { ctx.manager.graph.block_header_by_hash(&resp_hash) { debug!("Process blocktxn hash={:?}", resp_hash); - let signed_txes = ctx + let signed_txns = ctx .manager .graph .data_man @@ -60,13 +60,13 @@ impl Handleable for GetBlockTxnResponse { match ctx.manager.graph.data_man.compact_block_by_hash(&resp_hash) { Some(cmpct) => { let mut trans = - Vec::with_capacity(cmpct.reconstructed_txes.len()); + Vec::with_capacity(cmpct.reconstructed_txns.len()); let mut index = 0; - for tx in cmpct.reconstructed_txes { + for tx in cmpct.reconstructed_txns { match tx { Some(tx) => trans.push(tx), None => { - trans.push(signed_txes[index].clone()); + trans.push(signed_txns[index].clone()); index += 1; } } @@ -98,7 +98,7 @@ impl Handleable for GetBlockTxnResponse { // added to received pool ctx.manager .request_manager - .append_received_transactions(signed_txes); + .append_received_transactions(signed_txns); } if insert_result.should_relay() && !ctx.manager.catch_up_mode() diff --git a/core/src/sync/message/get_compact_blocks_response.rs b/core/src/sync/message/get_compact_blocks_response.rs index 93ab4a4a3f..9befaa4aee 100644 --- a/core/src/sync/message/get_compact_blocks_response.rs +++ b/core/src/sync/message/get_compact_blocks_response.rs @@ -49,7 +49,7 @@ impl Handleable for GetCompactBlocksResponse { let mut to_relay_blocks = Vec::new(); let mut received_reconstructed_blocks = Vec::new(); - let mut requested: HashSet = req + let mut requested_except_inflight_txn: HashSet = req .downcast_ref::( ctx.io, &ctx.manager.request_manager, @@ -62,7 +62,7 @@ impl Handleable for GetCompactBlocksResponse { for mut cmpct in self.compact_blocks { let hash = cmpct.hash(); - if !requested.contains(&hash) { + if !requested_except_inflight_txn.contains(&hash) { warn!("Response has not requested compact block {:?}", hash); continue; } @@ -96,7 +96,10 @@ impl Handleable for GetCompactBlocksResponse { let missing = { let _timer = MeterTimer::time_func(CMPCT_BLOCK_RECOVER_TIMER.as_ref()); - ctx.manager.graph.data_man.build_partial(&mut cmpct) + ctx.manager + .graph + .data_man + .find_missing_tx_indices_encoded(&mut cmpct) }; if !missing.is_empty() { debug!("Request {} missing tx in {}", missing.len(), hash); @@ -105,10 +108,10 @@ impl Handleable for GetCompactBlocksResponse { .request_manager .request_blocktxn(ctx.io, ctx.peer, hash, missing, None); // The block remains inflight. - requested.remove(&hash); + requested_except_inflight_txn.remove(&hash); } else { let trans = cmpct - .reconstructed_txes + .reconstructed_txns .into_iter() .map(|tx| tx.unwrap()) .collect(); @@ -138,15 +141,16 @@ impl Handleable for GetCompactBlocksResponse { // We cannot just mark `self.blocks` as completed here because they // might be invalid. let mut received_full_blocks = HashSet::new(); - let mut not_block_responded_requests = requested; + let mut compact_block_responded_requests = + requested_except_inflight_txn; for block in &self.blocks { received_full_blocks.insert(block.hash()); - not_block_responded_requests.remove(&block.hash()); + compact_block_responded_requests.remove(&block.hash()); } ctx.manager.blocks_received( ctx.io, - not_block_responded_requests.clone(), + compact_block_responded_requests.clone(), received_reconstructed_blocks.iter().cloned().collect(), true, Some(ctx.peer), diff --git a/core/src/sync/request_manager/mod.rs b/core/src/sync/request_manager/mod.rs index a09c804137..e07754497c 100644 --- a/core/src/sync/request_manager/mod.rs +++ b/core/src/sync/request_manager/mod.rs @@ -533,7 +533,7 @@ impl RequestManager { self.request_with_delay(io, Box::new(request), Some(peer_id), delay); } - pub fn send_request_again( + fn send_request_again( &self, io: &dyn NetworkContext, msg: &RequestMessage, ) { debug!("send_request_again, request={:?}", msg.request); @@ -657,7 +657,7 @@ impl RequestManager { /// received or will be requested by the caller again (the case for /// `Blocktxn`). pub fn blocks_received( - &self, io: &dyn NetworkContext, req_hashes: HashSet, + &self, io: &dyn NetworkContext, requested_hashes: HashSet, mut received_blocks: HashSet, ask_full_block: bool, peer: Option, with_public: bool, delay: Option, ) @@ -665,12 +665,12 @@ impl RequestManager { let _timer = MeterTimer::time_func(REQUEST_MANAGER_TIMER.as_ref()); debug!( "blocks_received: req_hashes={:?} received_blocks={:?} peer={:?}", - req_hashes, received_blocks, peer + requested_hashes, received_blocks, peer ); let missing_blocks = { let mut inflight_keys = self.inflight_keys.write(msgid::GET_BLOCKS); let mut missing_blocks = Vec::new(); - for req_hash in &req_hashes { + for req_hash in &requested_hashes { if !received_blocks.remove(req_hash) { // If `req_hash` is not in `blocks_in_flight`, it may has // been received or requested @@ -910,17 +910,12 @@ impl RequestManager { } pub fn on_peer_disconnected(&self, io: &dyn NetworkContext, peer: PeerId) { - if let Some(mut unfinished_requests) = + if let Some(unfinished_requests) = self.request_handler.remove_peer(peer) { - { - for msg in &unfinished_requests { - msg.request.on_removed(&self.inflight_keys); - } - } - for msg in unfinished_requests.iter_mut() { + for mut msg in unfinished_requests { msg.delay = None; - self.send_request_again(io, &msg); + self.resend_request_to_another_peer(io, &msg); } } else { debug!("Peer already removed form request manager when disconnected peer={}", peer); diff --git a/core/src/sync/synchronization_protocol_handler.rs b/core/src/sync/synchronization_protocol_handler.rs index b460ddaa55..a9c4c0dab5 100644 --- a/core/src/sync/synchronization_protocol_handler.rs +++ b/core/src/sync/synchronization_protocol_handler.rs @@ -1417,14 +1417,14 @@ impl SynchronizationProtocolHandler { } pub fn blocks_received( - &self, io: &dyn NetworkContext, req_hashes: HashSet, + &self, io: &dyn NetworkContext, requested_hashes: HashSet, returned_blocks: HashSet, ask_full_block: bool, peer: Option, delay: Option, ) { self.request_manager.blocks_received( io, - req_hashes, + requested_hashes, returned_blocks, ask_full_block, peer, diff --git a/primitives/src/block.rs b/primitives/src/block.rs index f2b1db1b88..ada85c0999 100644 --- a/primitives/src/block.rs +++ b/primitives/src/block.rs @@ -119,8 +119,8 @@ impl Block { k0, k1, ), - // reconstructed_txes constructed here will not be used - reconstructed_txes: Vec::new(), + // reconstructed_txns constructed here will not be used + reconstructed_txns: Vec::new(), } } @@ -235,8 +235,8 @@ pub struct CompactBlock { pub nonce: u64, /// A list of tx short ids pub tx_short_ids: Vec, - /// Store the txes reconstructed, None means not received - pub reconstructed_txes: Vec>>, + /// Store the txns reconstructed, None means not received + pub reconstructed_txns: Vec>>, } impl Debug for CompactBlock { @@ -251,7 +251,7 @@ impl Debug for CompactBlock { impl MallocSizeOf for CompactBlock { fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize { - self.tx_short_ids.size_of(ops) + self.reconstructed_txes.size_of(ops) + self.tx_short_ids.size_of(ops) + self.reconstructed_txns.size_of(ops) } } @@ -354,7 +354,7 @@ impl Decodable for CompactBlock { block_header: rlp.val_at(0)?, nonce: rlp.val_at(1)?, tx_short_ids: short_ids, - reconstructed_txes: Vec::new(), + reconstructed_txns: Vec::new(), }) } }