Skip to content

Commit 2252845

Browse files
authored
Temporary fix for #1138 (#1140)
* Temporary fix for #1138. When GetBlockTxn is timed out, do not send the request again, instead send GetBlocks request.
1 parent 2679c51 commit 2252845

File tree

8 files changed

+62
-50
lines changed

8 files changed

+62
-50
lines changed

core/src/block_data_manager/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1111,10 +1111,11 @@ impl BlockDataManager {
11111111
.recover_unsigned_tx_with_order(transactions)
11121112
}
11131113

1114-
pub fn build_partial(
1114+
pub fn find_missing_tx_indices_encoded(
11151115
&self, compact_block: &mut CompactBlock,
11161116
) -> Vec<usize> {
1117-
self.tx_data_manager.build_partial(compact_block)
1117+
self.tx_data_manager
1118+
.find_missing_tx_indices_encoded(compact_block)
11181119
}
11191120

11201121
/// Caller should make sure the state exists.

core/src/block_data_manager/tx_data_manager.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -191,8 +191,8 @@ impl TransactionDataManager {
191191
end_idx += 1;
192192
remainder -= 1;
193193
}
194-
let unsigned_txes = Vec::new();
195-
unsigned_trans.push(unsigned_txes);
194+
let unsigned_txns = Vec::new();
195+
unsigned_trans.push(unsigned_txns);
196196
}
197197

198198
unsigned_trans.last_mut().unwrap().push(tx);
@@ -202,14 +202,14 @@ impl TransactionDataManager {
202202

203203
let (sender, receiver) = channel();
204204
let n_thread = unsigned_trans.len();
205-
for unsigned_txes in unsigned_trans {
206-
RECOVER_PUB_KEY_QUEUE.enqueue(unsigned_txes.len());
205+
for unsigned_txns in unsigned_trans {
206+
RECOVER_PUB_KEY_QUEUE.enqueue(unsigned_txns.len());
207207
let sender = sender.clone();
208208
self.worker_pool.lock().execute(move || {
209-
let mut signed_txes = Vec::new();
210-
for (idx, tx) in unsigned_txes {
209+
let mut signed_txns = Vec::new();
210+
for (idx, tx) in unsigned_txns {
211211
if let Ok(public) = tx.recover_public() {
212-
signed_txes.push((idx, Arc::new(SignedTransaction::new(
212+
signed_txns.push((idx, Arc::new(SignedTransaction::new(
213213
public,
214214
tx.clone(),
215215
))));
@@ -221,7 +221,7 @@ impl TransactionDataManager {
221221
break;
222222
}
223223
}
224-
sender.send(signed_txes).unwrap();
224+
sender.send(signed_txns).unwrap();
225225
});
226226
}
227227

@@ -243,14 +243,14 @@ impl TransactionDataManager {
243243
}
244244

245245
/// Find tx in tx_time_window that matches tx_short_ids to fill in
246-
/// reconstruced_txes Return the differentially encoded index of missing
246+
/// reconstruced_txns Return the differentially encoded index of missing
247247
/// transactions Now should only called once after CompactBlock is
248248
/// decoded
249-
pub fn build_partial(
249+
pub fn find_missing_tx_indices_encoded(
250250
&self, compact_block: &mut CompactBlock,
251251
) -> Vec<usize> {
252252
compact_block
253-
.reconstructed_txes
253+
.reconstructed_txns
254254
.resize(compact_block.len(), None);
255255

256256
let (random_bytes_vector, fixed_bytes_vector) =
@@ -270,7 +270,7 @@ impl TransactionDataManager {
270270
k1,
271271
) {
272272
Some(tx) => {
273-
compact_block.reconstructed_txes[i] = Some(tx.clone());
273+
compact_block.reconstructed_txns[i] = Some(tx.clone());
274274
}
275275
None => {
276276
missing_index.push(i);

core/src/sync/message/get_block_txn.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@
55
use crate::{
66
message::RequestId,
77
sync::{
8-
message::{Context, GetBlockTxnResponse, Handleable, KeyContainer},
8+
message::{
9+
msgid, Context, GetBlockTxnResponse, GetBlocks, Handleable, Key,
10+
KeyContainer,
11+
},
912
request_manager::{AsAny, Request},
1013
Error, ErrorKind, ProtocolConfiguration,
1114
},
@@ -32,7 +35,10 @@ impl Request for GetBlockTxn {
3235
conf.blocks_request_timeout
3336
}
3437

35-
fn on_removed(&self, _inflight_keys: &KeyContainer) {}
38+
fn on_removed(&self, inflight_keys: &KeyContainer) {
39+
let mut inflight_keys = inflight_keys.write(msgid::GET_BLOCKS);
40+
inflight_keys.remove(&Key::Hash(self.block_hash.clone()));
41+
}
3642

3743
fn with_inflight(&mut self, _inflight_keys: &KeyContainer) {
3844
// reuse the inflight key of GetCompactBlocks
@@ -41,7 +47,13 @@ impl Request for GetBlockTxn {
4147
fn is_empty(&self) -> bool { false }
4248

4349
fn resend(&self) -> Option<Box<dyn Request>> {
44-
Some(Box::new(self.clone()))
50+
Some(Box::new(GetBlocks {
51+
request_id: 0,
52+
// request_block_need_public can only be true in catch_up_mode,
53+
// where GetBlockTxn can not be initiated.
54+
with_public: false,
55+
hashes: vec![self.block_hash.clone()],
56+
}))
4557
}
4658
}
4759

core/src/sync/message/get_block_txn_response.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,21 +52,21 @@ impl Handleable for GetBlockTxnResponse {
5252
ctx.manager.graph.block_header_by_hash(&resp_hash)
5353
{
5454
debug!("Process blocktxn hash={:?}", resp_hash);
55-
let signed_txes = ctx
55+
let signed_txns = ctx
5656
.manager
5757
.graph
5858
.data_man
5959
.recover_unsigned_tx_with_order(&self.block_txn)?;
6060
match ctx.manager.graph.data_man.compact_block_by_hash(&resp_hash) {
6161
Some(cmpct) => {
6262
let mut trans =
63-
Vec::with_capacity(cmpct.reconstructed_txes.len());
63+
Vec::with_capacity(cmpct.reconstructed_txns.len());
6464
let mut index = 0;
65-
for tx in cmpct.reconstructed_txes {
65+
for tx in cmpct.reconstructed_txns {
6666
match tx {
6767
Some(tx) => trans.push(tx),
6868
None => {
69-
trans.push(signed_txes[index].clone());
69+
trans.push(signed_txns[index].clone());
7070
index += 1;
7171
}
7272
}
@@ -98,7 +98,7 @@ impl Handleable for GetBlockTxnResponse {
9898
// added to received pool
9999
ctx.manager
100100
.request_manager
101-
.append_received_transactions(signed_txes);
101+
.append_received_transactions(signed_txns);
102102
}
103103
if insert_result.should_relay()
104104
&& !ctx.manager.catch_up_mode()

core/src/sync/message/get_compact_blocks_response.rs

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ impl Handleable for GetCompactBlocksResponse {
4949
let mut to_relay_blocks = Vec::new();
5050
let mut received_reconstructed_blocks = Vec::new();
5151

52-
let mut requested: HashSet<H256> = req
52+
let mut requested_except_inflight_txn: HashSet<H256> = req
5353
.downcast_ref::<GetCompactBlocks>(
5454
ctx.io,
5555
&ctx.manager.request_manager,
@@ -62,7 +62,7 @@ impl Handleable for GetCompactBlocksResponse {
6262
for mut cmpct in self.compact_blocks {
6363
let hash = cmpct.hash();
6464

65-
if !requested.contains(&hash) {
65+
if !requested_except_inflight_txn.contains(&hash) {
6666
warn!("Response has not requested compact block {:?}", hash);
6767
continue;
6868
}
@@ -96,7 +96,10 @@ impl Handleable for GetCompactBlocksResponse {
9696
let missing = {
9797
let _timer =
9898
MeterTimer::time_func(CMPCT_BLOCK_RECOVER_TIMER.as_ref());
99-
ctx.manager.graph.data_man.build_partial(&mut cmpct)
99+
ctx.manager
100+
.graph
101+
.data_man
102+
.find_missing_tx_indices_encoded(&mut cmpct)
100103
};
101104
if !missing.is_empty() {
102105
debug!("Request {} missing tx in {}", missing.len(), hash);
@@ -105,10 +108,10 @@ impl Handleable for GetCompactBlocksResponse {
105108
.request_manager
106109
.request_blocktxn(ctx.io, ctx.peer, hash, missing, None);
107110
// The block remains inflight.
108-
requested.remove(&hash);
111+
requested_except_inflight_txn.remove(&hash);
109112
} else {
110113
let trans = cmpct
111-
.reconstructed_txes
114+
.reconstructed_txns
112115
.into_iter()
113116
.map(|tx| tx.unwrap())
114117
.collect();
@@ -138,15 +141,16 @@ impl Handleable for GetCompactBlocksResponse {
138141
// We cannot just mark `self.blocks` as completed here because they
139142
// might be invalid.
140143
let mut received_full_blocks = HashSet::new();
141-
let mut not_block_responded_requests = requested;
144+
let mut compact_block_responded_requests =
145+
requested_except_inflight_txn;
142146
for block in &self.blocks {
143147
received_full_blocks.insert(block.hash());
144-
not_block_responded_requests.remove(&block.hash());
148+
compact_block_responded_requests.remove(&block.hash());
145149
}
146150

147151
ctx.manager.blocks_received(
148152
ctx.io,
149-
not_block_responded_requests.clone(),
153+
compact_block_responded_requests.clone(),
150154
received_reconstructed_blocks.iter().cloned().collect(),
151155
true,
152156
Some(ctx.peer),

core/src/sync/request_manager/mod.rs

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -533,7 +533,7 @@ impl RequestManager {
533533
self.request_with_delay(io, Box::new(request), Some(peer_id), delay);
534534
}
535535

536-
pub fn send_request_again(
536+
fn send_request_again(
537537
&self, io: &dyn NetworkContext, msg: &RequestMessage,
538538
) {
539539
debug!("send_request_again, request={:?}", msg.request);
@@ -657,20 +657,20 @@ impl RequestManager {
657657
/// received or will be requested by the caller again (the case for
658658
/// `Blocktxn`).
659659
pub fn blocks_received(
660-
&self, io: &dyn NetworkContext, req_hashes: HashSet<H256>,
660+
&self, io: &dyn NetworkContext, requested_hashes: HashSet<H256>,
661661
mut received_blocks: HashSet<H256>, ask_full_block: bool,
662662
peer: Option<PeerId>, with_public: bool, delay: Option<Duration>,
663663
)
664664
{
665665
let _timer = MeterTimer::time_func(REQUEST_MANAGER_TIMER.as_ref());
666666
debug!(
667667
"blocks_received: req_hashes={:?} received_blocks={:?} peer={:?}",
668-
req_hashes, received_blocks, peer
668+
requested_hashes, received_blocks, peer
669669
);
670670
let missing_blocks = {
671671
let mut inflight_keys = self.inflight_keys.write(msgid::GET_BLOCKS);
672672
let mut missing_blocks = Vec::new();
673-
for req_hash in &req_hashes {
673+
for req_hash in &requested_hashes {
674674
if !received_blocks.remove(req_hash) {
675675
// If `req_hash` is not in `blocks_in_flight`, it may has
676676
// been received or requested
@@ -910,17 +910,12 @@ impl RequestManager {
910910
}
911911

912912
pub fn on_peer_disconnected(&self, io: &dyn NetworkContext, peer: PeerId) {
913-
if let Some(mut unfinished_requests) =
913+
if let Some(unfinished_requests) =
914914
self.request_handler.remove_peer(peer)
915915
{
916-
{
917-
for msg in &unfinished_requests {
918-
msg.request.on_removed(&self.inflight_keys);
919-
}
920-
}
921-
for msg in unfinished_requests.iter_mut() {
916+
for mut msg in unfinished_requests {
922917
msg.delay = None;
923-
self.send_request_again(io, &msg);
918+
self.resend_request_to_another_peer(io, &msg);
924919
}
925920
} else {
926921
debug!("Peer already removed form request manager when disconnected peer={}", peer);

core/src/sync/synchronization_protocol_handler.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1417,14 +1417,14 @@ impl SynchronizationProtocolHandler {
14171417
}
14181418

14191419
pub fn blocks_received(
1420-
&self, io: &dyn NetworkContext, req_hashes: HashSet<H256>,
1420+
&self, io: &dyn NetworkContext, requested_hashes: HashSet<H256>,
14211421
returned_blocks: HashSet<H256>, ask_full_block: bool,
14221422
peer: Option<PeerId>, delay: Option<Duration>,
14231423
)
14241424
{
14251425
self.request_manager.blocks_received(
14261426
io,
1427-
req_hashes,
1427+
requested_hashes,
14281428
returned_blocks,
14291429
ask_full_block,
14301430
peer,

primitives/src/block.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -119,8 +119,8 @@ impl Block {
119119
k0,
120120
k1,
121121
),
122-
// reconstructed_txes constructed here will not be used
123-
reconstructed_txes: Vec::new(),
122+
// reconstructed_txns constructed here will not be used
123+
reconstructed_txns: Vec::new(),
124124
}
125125
}
126126

@@ -235,8 +235,8 @@ pub struct CompactBlock {
235235
pub nonce: u64,
236236
/// A list of tx short ids
237237
pub tx_short_ids: Vec<u8>,
238-
/// Store the txes reconstructed, None means not received
239-
pub reconstructed_txes: Vec<Option<Arc<SignedTransaction>>>,
238+
/// Store the txns reconstructed, None means not received
239+
pub reconstructed_txns: Vec<Option<Arc<SignedTransaction>>>,
240240
}
241241

242242
impl Debug for CompactBlock {
@@ -251,7 +251,7 @@ impl Debug for CompactBlock {
251251

252252
impl MallocSizeOf for CompactBlock {
253253
fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
254-
self.tx_short_ids.size_of(ops) + self.reconstructed_txes.size_of(ops)
254+
self.tx_short_ids.size_of(ops) + self.reconstructed_txns.size_of(ops)
255255
}
256256
}
257257

@@ -354,7 +354,7 @@ impl Decodable for CompactBlock {
354354
block_header: rlp.val_at(0)?,
355355
nonce: rlp.val_at(1)?,
356356
tx_short_ids: short_ids,
357-
reconstructed_txes: Vec::new(),
357+
reconstructed_txns: Vec::new(),
358358
})
359359
}
360360
}

0 commit comments

Comments
 (0)