Skip to content

Temporary fix for #1138 #1140

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions core/src/block_data_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1111,10 +1111,11 @@ impl BlockDataManager {
.recover_unsigned_tx_with_order(transactions)
}

pub fn build_partial(
pub fn find_missing_tx_indices_encoded(
&self, compact_block: &mut CompactBlock,
) -> Vec<usize> {
self.tx_data_manager.build_partial(compact_block)
self.tx_data_manager
.find_missing_tx_indices_encoded(compact_block)
}

/// Caller should make sure the state exists.
Expand Down
24 changes: 12 additions & 12 deletions core/src/block_data_manager/tx_data_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,8 @@ impl TransactionDataManager {
end_idx += 1;
remainder -= 1;
}
let unsigned_txes = Vec::new();
unsigned_trans.push(unsigned_txes);
let unsigned_txns = Vec::new();
unsigned_trans.push(unsigned_txns);
}

unsigned_trans.last_mut().unwrap().push(tx);
Expand All @@ -202,14 +202,14 @@ impl TransactionDataManager {

let (sender, receiver) = channel();
let n_thread = unsigned_trans.len();
for unsigned_txes in unsigned_trans {
RECOVER_PUB_KEY_QUEUE.enqueue(unsigned_txes.len());
for unsigned_txns in unsigned_trans {
RECOVER_PUB_KEY_QUEUE.enqueue(unsigned_txns.len());
let sender = sender.clone();
self.worker_pool.lock().execute(move || {
let mut signed_txes = Vec::new();
for (idx, tx) in unsigned_txes {
let mut signed_txns = Vec::new();
for (idx, tx) in unsigned_txns {
if let Ok(public) = tx.recover_public() {
signed_txes.push((idx, Arc::new(SignedTransaction::new(
signed_txns.push((idx, Arc::new(SignedTransaction::new(
public,
tx.clone(),
))));
Expand All @@ -221,7 +221,7 @@ impl TransactionDataManager {
break;
}
}
sender.send(signed_txes).unwrap();
sender.send(signed_txns).unwrap();
});
}

Expand All @@ -243,14 +243,14 @@ impl TransactionDataManager {
}

/// Find tx in tx_time_window that matches tx_short_ids to fill in
/// reconstruced_txes Return the differentially encoded index of missing
/// reconstruced_txns Return the differentially encoded index of missing
/// transactions Now should only called once after CompactBlock is
/// decoded
pub fn build_partial(
pub fn find_missing_tx_indices_encoded(
&self, compact_block: &mut CompactBlock,
) -> Vec<usize> {
compact_block
.reconstructed_txes
.reconstructed_txns
.resize(compact_block.len(), None);

let (random_bytes_vector, fixed_bytes_vector) =
Expand All @@ -270,7 +270,7 @@ impl TransactionDataManager {
k1,
) {
Some(tx) => {
compact_block.reconstructed_txes[i] = Some(tx.clone());
compact_block.reconstructed_txns[i] = Some(tx.clone());
}
None => {
missing_index.push(i);
Expand Down
18 changes: 15 additions & 3 deletions core/src/sync/message/get_block_txn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@
use crate::{
message::RequestId,
sync::{
message::{Context, GetBlockTxnResponse, Handleable, KeyContainer},
message::{
msgid, Context, GetBlockTxnResponse, GetBlocks, Handleable, Key,
KeyContainer,
},
request_manager::{AsAny, Request},
Error, ErrorKind, ProtocolConfiguration,
},
Expand All @@ -32,7 +35,10 @@ impl Request for GetBlockTxn {
conf.blocks_request_timeout
}

fn on_removed(&self, _inflight_keys: &KeyContainer) {}
fn on_removed(&self, inflight_keys: &KeyContainer) {
let mut inflight_keys = inflight_keys.write(msgid::GET_BLOCKS);
inflight_keys.remove(&Key::Hash(self.block_hash.clone()));
}

fn with_inflight(&mut self, _inflight_keys: &KeyContainer) {
// reuse the inflight key of GetCompactBlocks
Expand All @@ -41,7 +47,13 @@ impl Request for GetBlockTxn {
fn is_empty(&self) -> bool { false }

fn resend(&self) -> Option<Box<dyn Request>> {
Some(Box::new(self.clone()))
Some(Box::new(GetBlocks {
request_id: 0,
// request_block_need_public can only be true in catch_up_mode,
// where GetBlockTxn can not be initiated.
with_public: false,
hashes: vec![self.block_hash.clone()],
}))
}
}

Expand Down
10 changes: 5 additions & 5 deletions core/src/sync/message/get_block_txn_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,21 +52,21 @@ impl Handleable for GetBlockTxnResponse {
ctx.manager.graph.block_header_by_hash(&resp_hash)
{
debug!("Process blocktxn hash={:?}", resp_hash);
let signed_txes = ctx
let signed_txns = ctx
.manager
.graph
.data_man
.recover_unsigned_tx_with_order(&self.block_txn)?;
match ctx.manager.graph.data_man.compact_block_by_hash(&resp_hash) {
Some(cmpct) => {
let mut trans =
Vec::with_capacity(cmpct.reconstructed_txes.len());
Vec::with_capacity(cmpct.reconstructed_txns.len());
let mut index = 0;
for tx in cmpct.reconstructed_txes {
for tx in cmpct.reconstructed_txns {
match tx {
Some(tx) => trans.push(tx),
None => {
trans.push(signed_txes[index].clone());
trans.push(signed_txns[index].clone());
index += 1;
}
}
Expand Down Expand Up @@ -98,7 +98,7 @@ impl Handleable for GetBlockTxnResponse {
// added to received pool
ctx.manager
.request_manager
.append_received_transactions(signed_txes);
.append_received_transactions(signed_txns);
}
if insert_result.should_relay()
&& !ctx.manager.catch_up_mode()
Expand Down
20 changes: 12 additions & 8 deletions core/src/sync/message/get_compact_blocks_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ impl Handleable for GetCompactBlocksResponse {
let mut to_relay_blocks = Vec::new();
let mut received_reconstructed_blocks = Vec::new();

let mut requested: HashSet<H256> = req
let mut requested_except_inflight_txn: HashSet<H256> = req
.downcast_ref::<GetCompactBlocks>(
ctx.io,
&ctx.manager.request_manager,
Expand All @@ -62,7 +62,7 @@ impl Handleable for GetCompactBlocksResponse {
for mut cmpct in self.compact_blocks {
let hash = cmpct.hash();

if !requested.contains(&hash) {
if !requested_except_inflight_txn.contains(&hash) {
warn!("Response has not requested compact block {:?}", hash);
continue;
}
Expand Down Expand Up @@ -96,7 +96,10 @@ impl Handleable for GetCompactBlocksResponse {
let missing = {
let _timer =
MeterTimer::time_func(CMPCT_BLOCK_RECOVER_TIMER.as_ref());
ctx.manager.graph.data_man.build_partial(&mut cmpct)
ctx.manager
.graph
.data_man
.find_missing_tx_indices_encoded(&mut cmpct)
};
if !missing.is_empty() {
debug!("Request {} missing tx in {}", missing.len(), hash);
Expand All @@ -105,10 +108,10 @@ impl Handleable for GetCompactBlocksResponse {
.request_manager
.request_blocktxn(ctx.io, ctx.peer, hash, missing, None);
// The block remains inflight.
requested.remove(&hash);
requested_except_inflight_txn.remove(&hash);
} else {
let trans = cmpct
.reconstructed_txes
.reconstructed_txns
.into_iter()
.map(|tx| tx.unwrap())
.collect();
Expand Down Expand Up @@ -138,15 +141,16 @@ impl Handleable for GetCompactBlocksResponse {
// We cannot just mark `self.blocks` as completed here because they
// might be invalid.
let mut received_full_blocks = HashSet::new();
let mut not_block_responded_requests = requested;
let mut compact_block_responded_requests =
requested_except_inflight_txn;
for block in &self.blocks {
received_full_blocks.insert(block.hash());
not_block_responded_requests.remove(&block.hash());
compact_block_responded_requests.remove(&block.hash());
}

ctx.manager.blocks_received(
ctx.io,
not_block_responded_requests.clone(),
compact_block_responded_requests.clone(),
received_reconstructed_blocks.iter().cloned().collect(),
true,
Some(ctx.peer),
Expand Down
19 changes: 7 additions & 12 deletions core/src/sync/request_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,7 @@ impl RequestManager {
self.request_with_delay(io, Box::new(request), Some(peer_id), delay);
}

pub fn send_request_again(
fn send_request_again(
&self, io: &dyn NetworkContext, msg: &RequestMessage,
) {
debug!("send_request_again, request={:?}", msg.request);
Expand Down Expand Up @@ -657,20 +657,20 @@ impl RequestManager {
/// received or will be requested by the caller again (the case for
/// `Blocktxn`).
pub fn blocks_received(
&self, io: &dyn NetworkContext, req_hashes: HashSet<H256>,
&self, io: &dyn NetworkContext, requested_hashes: HashSet<H256>,
mut received_blocks: HashSet<H256>, ask_full_block: bool,
peer: Option<PeerId>, with_public: bool, delay: Option<Duration>,
)
{
let _timer = MeterTimer::time_func(REQUEST_MANAGER_TIMER.as_ref());
debug!(
"blocks_received: req_hashes={:?} received_blocks={:?} peer={:?}",
req_hashes, received_blocks, peer
requested_hashes, received_blocks, peer
);
let missing_blocks = {
let mut inflight_keys = self.inflight_keys.write(msgid::GET_BLOCKS);
let mut missing_blocks = Vec::new();
for req_hash in &req_hashes {
for req_hash in &requested_hashes {
if !received_blocks.remove(req_hash) {
// If `req_hash` is not in `blocks_in_flight`, it may has
// been received or requested
Expand Down Expand Up @@ -910,17 +910,12 @@ impl RequestManager {
}

pub fn on_peer_disconnected(&self, io: &dyn NetworkContext, peer: PeerId) {
if let Some(mut unfinished_requests) =
if let Some(unfinished_requests) =
self.request_handler.remove_peer(peer)
{
{
for msg in &unfinished_requests {
msg.request.on_removed(&self.inflight_keys);
}
}
for msg in unfinished_requests.iter_mut() {
for mut msg in unfinished_requests {
msg.delay = None;
self.send_request_again(io, &msg);
self.resend_request_to_another_peer(io, &msg);
}
} else {
debug!("Peer already removed form request manager when disconnected peer={}", peer);
Expand Down
4 changes: 2 additions & 2 deletions core/src/sync/synchronization_protocol_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1417,14 +1417,14 @@ impl SynchronizationProtocolHandler {
}

pub fn blocks_received(
&self, io: &dyn NetworkContext, req_hashes: HashSet<H256>,
&self, io: &dyn NetworkContext, requested_hashes: HashSet<H256>,
returned_blocks: HashSet<H256>, ask_full_block: bool,
peer: Option<PeerId>, delay: Option<Duration>,
)
{
self.request_manager.blocks_received(
io,
req_hashes,
requested_hashes,
returned_blocks,
ask_full_block,
peer,
Expand Down
12 changes: 6 additions & 6 deletions primitives/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ impl Block {
k0,
k1,
),
// reconstructed_txes constructed here will not be used
reconstructed_txes: Vec::new(),
// reconstructed_txns constructed here will not be used
reconstructed_txns: Vec::new(),
}
}

Expand Down Expand Up @@ -235,8 +235,8 @@ pub struct CompactBlock {
pub nonce: u64,
/// A list of tx short ids
pub tx_short_ids: Vec<u8>,
/// Store the txes reconstructed, None means not received
pub reconstructed_txes: Vec<Option<Arc<SignedTransaction>>>,
/// Store the txns reconstructed, None means not received
pub reconstructed_txns: Vec<Option<Arc<SignedTransaction>>>,
}

impl Debug for CompactBlock {
Expand All @@ -251,7 +251,7 @@ impl Debug for CompactBlock {

impl MallocSizeOf for CompactBlock {
fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
self.tx_short_ids.size_of(ops) + self.reconstructed_txes.size_of(ops)
self.tx_short_ids.size_of(ops) + self.reconstructed_txns.size_of(ops)
}
}

Expand Down Expand Up @@ -354,7 +354,7 @@ impl Decodable for CompactBlock {
block_header: rlp.val_at(0)?,
nonce: rlp.val_at(1)?,
tx_short_ids: short_ids,
reconstructed_txes: Vec::new(),
reconstructed_txns: Vec::new(),
})
}
}