Skip to content

Commit c2f8f8c

Browse files
committed
wip
Signed-off-by: Eval EXEC <[email protected]>
1 parent a12cd96 commit c2f8f8c

File tree

4 files changed

+57
-18
lines changed

4 files changed

+57
-18
lines changed

sync/src/synchronizer/get_headers_process.rs

+21-5
Original file line numberDiff line numberDiff line change
@@ -74,17 +74,33 @@ impl<'a> GetHeadersProcess<'a> {
7474
);
7575

7676
self.synchronizer.peers().getheaders_received(self.peer);
77-
let headers_vec: Vec<Vec<core::HeaderView>> =
78-
active_chain.get_locator_responses(block_number, &hash_stop);
79-
// response headers
8077

81-
debug!("headers len={}", headers_vec.len());
82-
for headers in headers_vec {
78+
let hash_size: packed::Uint32 = 20_u32.pack();
79+
let length_20_for_test = packed::Byte32::new_unchecked(hash_size.as_bytes());
80+
if hash_stop.eq(&length_20_for_test) {
81+
let headers: Vec<core::HeaderView> =
82+
active_chain.get_locator_response(block_number, &hash_stop);
83+
// response headers
84+
85+
debug!("headers len={}", headers.len());
8386
let content = packed::SendHeaders::new_builder()
8487
.headers(headers.into_iter().map(|x| x.data()).pack())
8588
.build();
8689
let message = packed::SyncMessage::new_builder().set(content).build();
8790
attempt!(send_message_to(self.nc, self.peer, &message));
91+
} else {
92+
let headers_vec: Vec<Vec<core::HeaderView>> =
93+
active_chain.get_locator_responses(block_number, &hash_stop);
94+
// response headers
95+
96+
debug!("headers vec len={}", headers_vec.len());
97+
for headers in headers_vec {
98+
let content = packed::SendHeaders::new_builder()
99+
.headers(headers.into_iter().map(|x| x.data()).pack())
100+
.build();
101+
let message = packed::SyncMessage::new_builder().set(content).build();
102+
attempt!(send_message_to(self.nc, self.peer, &message));
103+
}
88104
}
89105
} else {
90106
return StatusCode::GetHeadersMissCommonAncestors

sync/src/synchronizer/headers_process.rs

+19-8
Original file line numberDiff line numberDiff line change
@@ -52,12 +52,13 @@ impl<'a> HeadersProcess<'a> {
5252
true
5353
}
5454

55-
fn is_parent_exists(&self, first_header: &core:HeaderView) -> bool {
55+
fn is_parent_exists(&self, first_header: &core::HeaderView) -> bool {
5656
let shared: &SyncShared = self.synchronizer.shared();
57-
shared.get_header_fields(first_header.parent_hash).is_some()
57+
shared
58+
.get_header_fields(&first_header.parent_hash())
59+
.is_some()
5860
}
5961

60-
6162
pub fn accept_first(&self, first: &core::HeaderView) -> ValidationResult {
6263
let shared: &SyncShared = self.synchronizer.shared();
6364
let verifier = HeaderVerifier::new(shared, shared.consensus());
@@ -98,15 +99,19 @@ impl<'a> HeadersProcess<'a> {
9899

99100
pub fn execute(self) -> Status {
100101
debug!("HeadersProcess begins");
101-
let shared: &SyncShared = self.synchronizer.shared();
102-
let consensus = shared.consensus();
103102
let headers = self
104103
.message
105104
.headers()
106105
.to_entity()
107106
.into_iter()
108107
.map(packed::Header::into_view)
109108
.collect::<Vec<_>>();
109+
self.execute_inner(headers)
110+
}
111+
112+
fn execute_inner(self, headers: Vec<core::HeaderView>) -> Status {
113+
let shared: &SyncShared = self.synchronizer.shared();
114+
let consensus = shared.consensus();
110115

111116
if headers.len() > MAX_HEADERS_LEN {
112117
warn!("HeadersProcess is oversized");
@@ -136,7 +141,9 @@ impl<'a> HeadersProcess<'a> {
136141

137142
if !self.is_parent_exists(&headers[0]) {
138143
// put the headers into a memory cache
139-
self.synchronizer.header_cache.insert(headers[0].parent_hash, headers);
144+
self.synchronizer
145+
.header_cache
146+
.insert(headers[0].parent_hash(), headers);
140147
// verify them later
141148
return Status::ok();
142149
}
@@ -226,8 +233,12 @@ impl<'a> HeadersProcess<'a> {
226233
{
227234
// these headers verify success
228235
// may the headers's tail header_hash exist in headers_cahce?
229-
if let Some(headers) = self.synchronizer.headers_cache.get(headers.last().expect("last header must exist").hash){
230-
HeadersProcess::new().execute();
236+
if let Some(headers) = self
237+
.synchronizer
238+
.header_cache
239+
.get(headers.last().expect("last header must exist").hash())
240+
{
241+
return self.execute_inner(headers);
231242
}
232243
}
233244

sync/src/synchronizer/mod.rs

+6-1
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,14 @@ use ckb_systemtime::unix_time_as_millis;
4444

4545
#[cfg(test)]
4646
use ckb_types::core;
47+
use ckb_types::core::HeaderView;
48+
use ckb_types::packed::Header;
4749
use ckb_types::{
4850
core::BlockNumber,
4951
packed::{self, Byte32},
5052
prelude::*,
5153
};
54+
use std::collections::HashMap;
5255
use std::{
5356
collections::HashSet,
5457
sync::{atomic::Ordering, Arc},
@@ -303,7 +306,7 @@ pub struct Synchronizer {
303306
pub shared: Arc<SyncShared>,
304307

305308
// First Headers's parent_hash -> Headers
306-
pub(crate) header_cache: HashMap<Byte32, Vec<Header>>,
309+
pub(crate) header_cache: HashMap<Byte32, Vec<HeaderView>>,
307310
fetch_channel: Option<channel::Sender<FetchCMD>>,
308311
}
309312

@@ -312,10 +315,12 @@ impl Synchronizer {
312315
///
313316
/// This is a runtime sync protocol shared state, and any Sync protocol messages will be processed and forwarded by it
314317
pub fn new(chain: ChainController, shared: Arc<SyncShared>) -> Synchronizer {
318+
let header_cache = HashMap::new();
315319
Synchronizer {
316320
chain,
317321
shared,
318322
fetch_channel: None,
323+
header_cache,
319324
}
320325
}
321326

sync/src/types/mod.rs

+11-4
Original file line numberDiff line numberDiff line change
@@ -1914,9 +1914,14 @@ impl ActiveChain {
19141914
block_number: BlockNumber,
19151915
hash_stop: &Byte32,
19161916
) -> Vec<Vec<core::HeaderView>> {
1917-
(0..32).iter().map(|index| {
1918-
get_locator_response(block_number + (i * MAX_HEADERS_LEN), &Byte32::default())
1919-
}).collect();
1917+
(0..32)
1918+
.map(|index| {
1919+
self.get_locator_response(
1920+
block_number + (index as u64 * MAX_HEADERS_LEN as u64),
1921+
&Byte32::default(),
1922+
)
1923+
})
1924+
.collect()
19201925
}
19211926

19221927
pub fn send_getheaders_to_peer(
@@ -1955,9 +1960,11 @@ impl ActiveChain {
19551960
block_number_and_hash.hash()
19561961
);
19571962
let locator_hash = self.get_locator(block_number_and_hash);
1963+
let hash_size: packed::Uint32 = 20_u32.pack();
1964+
let length_20_for_test = packed::Byte32::new_unchecked(hash_size.as_bytes());
19581965
let content = packed::GetHeaders::new_builder()
19591966
.block_locator_hashes(locator_hash.pack())
1960-
.hash_stop(packed::Byte32::zero())
1967+
.hash_stop(length_20_for_test)
19611968
.build();
19621969
let message = packed::SyncMessage::new_builder().set(content).build();
19631970
let _status = send_message(SupportProtocols::Sync.protocol_id(), nc, peer, &message);

0 commit comments

Comments
 (0)