Skip to content

Commit 7814a45

Browse files
committed
use read_lock, write_lock
Signed-off-by: Eval EXEC <execvy@gmail.com>
1 parent 421295f commit 7814a45

File tree

5 files changed

+31
-100
lines changed

5 files changed

+31
-100
lines changed

light-client-lib/src/protocols/filter/block_filter.rs

Lines changed: 6 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use crate::protocols::{Peers, Status, StatusCode};
33
use crate::storage::Storage;
44
use crate::types::{Duration, Instant, RwLock};
55
use crate::utils::network::prove_or_download_matched_blocks;
6+
use crate::{read_lock, write_lock};
67
use ckb_constant::sync::INIT_BLOCKS_IN_TRANSIT_PER_PEER;
78
use ckb_network::{
89
async_trait, bytes::Bytes, BoxedCKBProtocolContext, CKBProtocolHandler, PeerIndex,
@@ -78,33 +79,18 @@ impl FilterProtocol {
7879
}
7980

8081
async fn should_ask(&self, immediately: bool) -> bool {
81-
#[cfg(target_arch = "wasm32")]
82-
let result = !self.storage.is_filter_scripts_empty()
82+
let last_ask = read_lock!(self.last_ask_time);
83+
!self.storage.is_filter_scripts_empty()
8384
&& (immediately
84-
|| self.last_ask_time.read().await.is_none()
85-
|| self.last_ask_time.read().await.unwrap().elapsed() > GET_BLOCK_FILTERS_TIMEOUT);
86-
#[cfg(not(target_arch = "wasm32"))]
87-
let result = !self.storage.is_filter_scripts_empty()
88-
&& (immediately
89-
|| self.last_ask_time.read().unwrap().is_none()
90-
|| self.last_ask_time.read().unwrap().unwrap().elapsed()
91-
> GET_BLOCK_FILTERS_TIMEOUT);
92-
93-
result
85+
|| last_ask.is_none()
86+
|| last_ask.unwrap().elapsed() > GET_BLOCK_FILTERS_TIMEOUT)
9487
}
95-
#[cfg(target_arch = "wasm32")]
9688
pub async fn update_min_filtered_block_number(&self, block_number: BlockNumber) {
9789
self.storage.update_min_filtered_block_number(block_number);
9890
self.peers
9991
.update_min_filtered_block_number(block_number)
10092
.await;
101-
self.last_ask_time.write().await.replace(Instant::now());
102-
}
103-
#[cfg(not(target_arch = "wasm32"))]
104-
pub fn update_min_filtered_block_number(&self, block_number: BlockNumber) {
105-
self.storage.update_min_filtered_block_number(block_number);
106-
self.peers.update_min_filtered_block_number(block_number);
107-
self.last_ask_time.write().unwrap().replace(Instant::now());
93+
write_lock!(self.last_ask_time).replace(Instant::now());
10894
}
10995
pub(crate) async fn try_send_get_block_filters(
11096
&self,
@@ -125,12 +111,8 @@ impl FilterProtocol {
125111
let finalized_check_point_number = self
126112
.peers
127113
.calc_check_point_number(finalized_check_point_index);
128-
#[cfg(target_arch = "wasm32")]
129114
let (cached_check_point_index, cached_hashes) =
130115
self.peers.get_cached_block_filter_hashes().await;
131-
#[cfg(not(target_arch = "wasm32"))]
132-
let (cached_check_point_index, cached_hashes) =
133-
self.peers.get_cached_block_filter_hashes();
134116

135117
let cached_check_point_number =
136118
self.peers.calc_check_point_number(cached_check_point_index);
@@ -209,19 +191,12 @@ impl FilterProtocol {
209191

210192
pub(crate) async fn try_send_get_block_filter_hashes(&self, nc: BoxedCKBProtocolContext) {
211193
let min_filtered_block_number = self.storage.get_min_filtered_block_number();
212-
#[cfg(target_arch = "wasm32")]
213194
self.peers
214195
.update_min_filtered_block_number(min_filtered_block_number)
215196
.await;
216-
#[cfg(not(target_arch = "wasm32"))]
217-
self.peers
218-
.update_min_filtered_block_number(min_filtered_block_number);
219197

220198
let finalized_check_point_index = self.storage.get_max_check_point_index();
221-
#[cfg(target_arch = "wasm32")]
222199
let cached_check_point_index = self.peers.get_cached_block_filter_hashes().await.0;
223-
#[cfg(not(target_arch = "wasm32"))]
224-
let cached_check_point_index = self.peers.get_cached_block_filter_hashes().0;
225200

226201
if let Some(start_number) = self
227202
.peers

light-client-lib/src/protocols/filter/components/block_filter_hashes_process.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,12 +67,8 @@ impl<'a> BlockFilterHashesProcess<'a> {
6767
.protocol
6868
.peers
6969
.calc_check_point_number(finalized_check_point_index);
70-
#[cfg(target_arch = "wasm32")]
7170
let (cached_check_point_index, cached_hashes) =
7271
self.protocol.peers.get_cached_block_filter_hashes().await;
73-
#[cfg(not(target_arch = "wasm32"))]
74-
let (cached_check_point_index, cached_hashes) =
75-
self.protocol.peers.get_cached_block_filter_hashes();
7672

7773
let cached_check_point_number = self
7874
.protocol

light-client-lib/src/protocols/filter/components/block_filters_process.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -105,12 +105,8 @@ impl<'a> BlockFiltersProcess<'a> {
105105
let (mut parent_block_filter_hash, expected_block_filter_hashes) =
106106
if start_number <= finalized_check_point_number {
107107
// Use cached block filter hashes to check the block filters.
108-
#[cfg(target_arch = "wasm32")]
109108
let (cached_check_point_index, mut cached_block_filter_hashes) =
110109
self.filter.peers.get_cached_block_filter_hashes().await;
111-
#[cfg(not(target_arch = "wasm32"))]
112-
let (cached_check_point_index, mut cached_block_filter_hashes) =
113-
self.filter.peers.get_cached_block_filter_hashes();
114110

115111
let cached_check_point_number = self
116112
.filter
@@ -250,13 +246,9 @@ impl<'a> BlockFiltersProcess<'a> {
250246
.storage
251247
.update_block_number(filtered_block_number)
252248
}
253-
#[cfg(target_arch = "wasm32")]
254249
self.filter
255250
.update_min_filtered_block_number(filtered_block_number)
256251
.await;
257-
#[cfg(not(target_arch = "wasm32"))]
258-
self.filter
259-
.update_min_filtered_block_number(filtered_block_number);
260252

261253
let could_request_more_block_filters = self
262254
.filter

light-client-lib/src/protocols/light_client/peers.rs

Lines changed: 11 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,11 @@ use governor::{clock::DefaultClock, state::keyed::DefaultKeyedStateStore, Quota,
1919

2020
use super::prelude::*;
2121
use crate::{
22+
mutex_lock,
2223
protocols::{Status, StatusCode, BAD_MESSAGE_ALLOWED_EACH_HOUR, MESSAGE_TIMEOUT},
24+
read_lock,
2325
types::{Mutex, RwLock},
26+
write_lock,
2427
};
2528

2629
pub type BadMessageRateLimiter<T> = RateLimiter<T, DefaultKeyedStateStore<T>, DefaultClock>;
@@ -1299,10 +1302,7 @@ impl Peers {
12991302
self.mark_fetching_headers_timeout(index);
13001303
self.mark_fetching_txs_timeout(index);
13011304
self.inner.remove(&index);
1302-
#[cfg(target_arch = "wasm32")]
1303-
self.rate_limiter.lock().await.retain_recent();
1304-
#[cfg(not(target_arch = "wasm32"))]
1305-
let _ignore_error = self.rate_limiter.lock().map(|inner| inner.retain_recent());
1305+
mutex_lock!(self.rate_limiter).retain_recent();
13061306
}
13071307

13081308
pub(crate) fn get_peers_index(&self) -> Vec<PeerIndex> {
@@ -1644,64 +1644,33 @@ impl Peers {
16441644
Err(StatusCode::PeerIsNotFound.into())
16451645
}
16461646
}
1647-
#[cfg(target_arch = "wasm32")]
16481647
pub(crate) async fn update_min_filtered_block_number(
16491648
&self,
16501649
min_filtered_block_number: BlockNumber,
16511650
) {
16521651
let should_cached_check_point_index =
16531652
self.calc_cached_check_point_index_when_sync_at(min_filtered_block_number + 1);
1654-
let current_cached_check_point_index = self.cached_block_filter_hashes.read().await.0;
1653+
let current_cached_check_point_index = read_lock!(self.cached_block_filter_hashes).0;
16551654
if current_cached_check_point_index != should_cached_check_point_index {
1656-
let mut tmp = self.cached_block_filter_hashes.write().await;
1657-
tmp.0 = should_cached_check_point_index;
1658-
tmp.1.clear();
1659-
}
1660-
}
1661-
#[cfg(not(target_arch = "wasm32"))]
1662-
pub(crate) fn update_min_filtered_block_number(&self, min_filtered_block_number: BlockNumber) {
1663-
let should_cached_check_point_index =
1664-
self.calc_cached_check_point_index_when_sync_at(min_filtered_block_number + 1);
1665-
let current_cached_check_point_index =
1666-
self.cached_block_filter_hashes.read().expect("poisoned").0;
1667-
if current_cached_check_point_index != should_cached_check_point_index {
1668-
let mut tmp = self.cached_block_filter_hashes.write().expect("poisoned");
1655+
let mut tmp = write_lock!(self.cached_block_filter_hashes);
16691656
tmp.0 = should_cached_check_point_index;
16701657
tmp.1.clear();
16711658
}
16721659
}
16731660

1674-
#[cfg(target_arch = "wasm32")]
16751661
pub(crate) async fn get_cached_block_filter_hashes(&self) -> (u32, Vec<packed::Byte32>) {
1676-
self.cached_block_filter_hashes.read().await.clone()
1677-
}
1678-
#[cfg(not(target_arch = "wasm32"))]
1679-
pub(crate) fn get_cached_block_filter_hashes(&self) -> (u32, Vec<packed::Byte32>) {
1680-
self.cached_block_filter_hashes
1681-
.read()
1682-
.expect("poisoned")
1683-
.clone()
1662+
read_lock!(self.cached_block_filter_hashes).clone()
16841663
}
16851664
pub(crate) async fn update_cached_block_filter_hashes(&self, hashes: Vec<packed::Byte32>) {
1686-
#[cfg(target_arch = "wasm32")]
1687-
{
1688-
self.cached_block_filter_hashes.write().await.1 = hashes;
1689-
}
1690-
#[cfg(not(target_arch = "wasm32"))]
1691-
{
1692-
self.cached_block_filter_hashes.write().expect("poisoned").1 = hashes;
1693-
}
1665+
write_lock!(self.cached_block_filter_hashes).1 = hashes;
16941666
}
16951667

16961668
pub(crate) async fn if_cached_block_filter_hashes_require_update(
16971669
&self,
16981670
finalized_check_point_index: u32,
16991671
) -> Option<BlockNumber> {
17001672
let (cached_index, cached_length) = {
1701-
#[cfg(target_arch = "wasm32")]
1702-
let tmp = self.cached_block_filter_hashes.read().await;
1703-
#[cfg(not(target_arch = "wasm32"))]
1704-
let tmp = self.cached_block_filter_hashes.read().expect("poisoned");
1673+
let tmp = read_lock!(self.cached_block_filter_hashes);
17051674
(tmp.0, tmp.1.len())
17061675
};
17071676
if cached_index >= finalized_check_point_index {
@@ -1884,10 +1853,7 @@ impl Peers {
18841853
// Check:
18851854
// - If cached block filter hashes is same check point as the required,
18861855
// - If all block filter hashes in that check point are downloaded.
1887-
#[cfg(target_arch = "wasm32")]
18881856
let cached_data = self.get_cached_block_filter_hashes().await;
1889-
#[cfg(not(target_arch = "wasm32"))]
1890-
let cached_data = self.get_cached_block_filter_hashes();
18911857

18921858
let current_cached_check_point_index = cached_data.0;
18931859
should_cached_check_point_index == current_cached_check_point_index
@@ -2026,22 +1992,10 @@ impl Peers {
20261992
if self.bad_message_allowed_each_hour == 0 {
20271993
return true;
20281994
}
2029-
#[cfg(target_arch = "wasm32")]
2030-
let result = self
2031-
.rate_limiter
2032-
.lock()
2033-
.await
1995+
mutex_lock!(self.rate_limiter)
20341996
.check_key(&peer_index)
20351997
.map_err(|_| ())
2036-
.is_err();
2037-
#[cfg(not(target_arch = "wasm32"))]
2038-
let result = self
2039-
.rate_limiter
2040-
.lock()
2041-
.map_err(|_| ())
2042-
.and_then(|inner| inner.check_key(&peer_index).map_err(|_| ()))
2043-
.is_err();
2044-
result
1998+
.is_err()
20451999
}
20462000
}
20472001

light-client-lib/src/types.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,3 +91,17 @@ macro_rules! write_lock {
9191
}
9292
}};
9393
}
94+
95+
#[macro_export]
96+
macro_rules! mutex_lock {
97+
($lock:expr) => {{
98+
#[cfg(target_arch = "wasm32")]
99+
{
100+
$lock.lock().await
101+
}
102+
#[cfg(not(target_arch = "wasm32"))]
103+
{
104+
$lock.lock().unwrap()
105+
}
106+
}};
107+
}

0 commit comments

Comments
 (0)