Skip to content

Commit 182f1ca

Browse files
authored
Merge pull request #254 from eval-exec/exec/wasm-refactor
Refactor: Reduce wasm32 and non-wasm32 platform duplication code
2 parents aa31df1 + f439702 commit 182f1ca

File tree

8 files changed

+117
-211
lines changed

8 files changed

+117
-211
lines changed

.github/workflows/ci.yaml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,11 @@ jobs:
4848
uses: actions/setup-node@v4
4949
with:
5050
node-version: 20
51+
- name: Install cargo-binstall
52+
uses: cargo-bins/cargo-binstall@v1.16.3
5153
- name: Install dependencies
5254
run: |
53-
cargo install wasm-pack
55+
cargo binstall wasm-pack
5456
- name: Build
5557
run: |
5658
make build-wasm

light-client-lib/src/protocols/filter/block_filter.rs

Lines changed: 8 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
use super::{components, BAD_MESSAGE_BAN_TIME};
22
use crate::protocols::{Peers, Status, StatusCode};
33
use crate::storage::Storage;
4-
use crate::types::RwLock;
4+
use crate::types::{Duration, Instant, RwLock};
55
use crate::utils::network::prove_or_download_matched_blocks;
6+
use crate::{read_lock, write_lock};
67
use ckb_constant::sync::INIT_BLOCKS_IN_TRANSIT_PER_PEER;
78
use ckb_network::{
89
async_trait, bytes::Bytes, BoxedCKBProtocolContext, CKBProtocolHandler, PeerIndex,
@@ -12,11 +13,7 @@ use golomb_coded_set::{GCSFilterReader, SipHasher24Builder, M, P};
1213
use log::{debug, info, log_enabled, trace, warn, Level};
1314
use rand::seq::SliceRandom as _;
1415
use std::io::Cursor;
15-
#[cfg(not(target_arch = "wasm32"))]
16-
use std::time::Instant;
17-
use std::{sync::Arc, time::Duration};
18-
#[cfg(target_arch = "wasm32")]
19-
use web_time::Instant;
16+
use std::sync::Arc;
2017

2118
pub(crate) const GET_BLOCK_FILTERS_TOKEN: u64 = 0;
2219
pub(crate) const GET_BLOCK_FILTER_HASHES_TOKEN: u64 = 1;
@@ -82,33 +79,18 @@ impl FilterProtocol {
8279
}
8380

8481
async fn should_ask(&self, immediately: bool) -> bool {
85-
#[cfg(target_arch = "wasm32")]
86-
let result = !self.storage.is_filter_scripts_empty()
82+
let last_ask = read_lock!(self.last_ask_time);
83+
!self.storage.is_filter_scripts_empty()
8784
&& (immediately
88-
|| self.last_ask_time.read().await.is_none()
89-
|| self.last_ask_time.read().await.unwrap().elapsed() > GET_BLOCK_FILTERS_TIMEOUT);
90-
#[cfg(not(target_arch = "wasm32"))]
91-
let result = !self.storage.is_filter_scripts_empty()
92-
&& (immediately
93-
|| self.last_ask_time.read().unwrap().is_none()
94-
|| self.last_ask_time.read().unwrap().unwrap().elapsed()
95-
> GET_BLOCK_FILTERS_TIMEOUT);
96-
97-
result
85+
|| last_ask.is_none()
86+
|| last_ask.unwrap().elapsed() > GET_BLOCK_FILTERS_TIMEOUT)
9887
}
99-
#[cfg(target_arch = "wasm32")]
10088
pub async fn update_min_filtered_block_number(&self, block_number: BlockNumber) {
10189
self.storage.update_min_filtered_block_number(block_number);
10290
self.peers
10391
.update_min_filtered_block_number(block_number)
10492
.await;
105-
self.last_ask_time.write().await.replace(Instant::now());
106-
}
107-
#[cfg(not(target_arch = "wasm32"))]
108-
pub fn update_min_filtered_block_number(&self, block_number: BlockNumber) {
109-
self.storage.update_min_filtered_block_number(block_number);
110-
self.peers.update_min_filtered_block_number(block_number);
111-
self.last_ask_time.write().unwrap().replace(Instant::now());
93+
write_lock!(self.last_ask_time).replace(Instant::now());
11294
}
11395
pub(crate) async fn try_send_get_block_filters(
11496
&self,
@@ -129,12 +111,8 @@ impl FilterProtocol {
129111
let finalized_check_point_number = self
130112
.peers
131113
.calc_check_point_number(finalized_check_point_index);
132-
#[cfg(target_arch = "wasm32")]
133114
let (cached_check_point_index, cached_hashes) =
134115
self.peers.get_cached_block_filter_hashes().await;
135-
#[cfg(not(target_arch = "wasm32"))]
136-
let (cached_check_point_index, cached_hashes) =
137-
self.peers.get_cached_block_filter_hashes();
138116

139117
let cached_check_point_number =
140118
self.peers.calc_check_point_number(cached_check_point_index);
@@ -213,19 +191,12 @@ impl FilterProtocol {
213191

214192
pub(crate) async fn try_send_get_block_filter_hashes(&self, nc: BoxedCKBProtocolContext) {
215193
let min_filtered_block_number = self.storage.get_min_filtered_block_number();
216-
#[cfg(target_arch = "wasm32")]
217194
self.peers
218195
.update_min_filtered_block_number(min_filtered_block_number)
219196
.await;
220-
#[cfg(not(target_arch = "wasm32"))]
221-
self.peers
222-
.update_min_filtered_block_number(min_filtered_block_number);
223197

224198
let finalized_check_point_index = self.storage.get_max_check_point_index();
225-
#[cfg(target_arch = "wasm32")]
226199
let cached_check_point_index = self.peers.get_cached_block_filter_hashes().await.0;
227-
#[cfg(not(target_arch = "wasm32"))]
228-
let cached_check_point_index = self.peers.get_cached_block_filter_hashes().0;
229200

230201
if let Some(start_number) = self
231202
.peers

light-client-lib/src/protocols/filter/components/block_filter_hashes_process.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,12 +67,8 @@ impl<'a> BlockFilterHashesProcess<'a> {
6767
.protocol
6868
.peers
6969
.calc_check_point_number(finalized_check_point_index);
70-
#[cfg(target_arch = "wasm32")]
7170
let (cached_check_point_index, cached_hashes) =
7271
self.protocol.peers.get_cached_block_filter_hashes().await;
73-
#[cfg(not(target_arch = "wasm32"))]
74-
let (cached_check_point_index, cached_hashes) =
75-
self.protocol.peers.get_cached_block_filter_hashes();
7672

7773
let cached_check_point_number = self
7874
.protocol

light-client-lib/src/protocols/filter/components/block_filters_process.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -105,12 +105,8 @@ impl<'a> BlockFiltersProcess<'a> {
105105
let (mut parent_block_filter_hash, expected_block_filter_hashes) =
106106
if start_number <= finalized_check_point_number {
107107
// Use cached block filter hashes to check the block filters.
108-
#[cfg(target_arch = "wasm32")]
109108
let (cached_check_point_index, mut cached_block_filter_hashes) =
110109
self.filter.peers.get_cached_block_filter_hashes().await;
111-
#[cfg(not(target_arch = "wasm32"))]
112-
let (cached_check_point_index, mut cached_block_filter_hashes) =
113-
self.filter.peers.get_cached_block_filter_hashes();
114110

115111
let cached_check_point_number = self
116112
.filter
@@ -250,13 +246,9 @@ impl<'a> BlockFiltersProcess<'a> {
250246
.storage
251247
.update_block_number(filtered_block_number)
252248
}
253-
#[cfg(target_arch = "wasm32")]
254249
self.filter
255250
.update_min_filtered_block_number(filtered_block_number)
256251
.await;
257-
#[cfg(not(target_arch = "wasm32"))]
258-
self.filter
259-
.update_min_filtered_block_number(filtered_block_number);
260252

261253
let could_request_more_block_filters = self
262254
.filter

light-client-lib/src/protocols/light_client/peers.rs

Lines changed: 11 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,11 @@ use governor::{clock::DefaultClock, state::keyed::DefaultKeyedStateStore, Quota,
1919

2020
use super::prelude::*;
2121
use crate::{
22+
mutex_lock,
2223
protocols::{Status, StatusCode, BAD_MESSAGE_ALLOWED_EACH_HOUR, MESSAGE_TIMEOUT},
24+
read_lock,
2325
types::{Mutex, RwLock},
26+
write_lock,
2427
};
2528

2629
pub type BadMessageRateLimiter<T> = RateLimiter<T, DefaultKeyedStateStore<T>, DefaultClock>;
@@ -1299,10 +1302,7 @@ impl Peers {
12991302
self.mark_fetching_headers_timeout(index);
13001303
self.mark_fetching_txs_timeout(index);
13011304
self.inner.remove(&index);
1302-
#[cfg(target_arch = "wasm32")]
1303-
self.rate_limiter.lock().await.retain_recent();
1304-
#[cfg(not(target_arch = "wasm32"))]
1305-
let _ignore_error = self.rate_limiter.lock().map(|inner| inner.retain_recent());
1305+
mutex_lock!(self.rate_limiter).retain_recent();
13061306
}
13071307

13081308
pub(crate) fn get_peers_index(&self) -> Vec<PeerIndex> {
@@ -1644,64 +1644,33 @@ impl Peers {
16441644
Err(StatusCode::PeerIsNotFound.into())
16451645
}
16461646
}
1647-
#[cfg(target_arch = "wasm32")]
16481647
pub(crate) async fn update_min_filtered_block_number(
16491648
&self,
16501649
min_filtered_block_number: BlockNumber,
16511650
) {
16521651
let should_cached_check_point_index =
16531652
self.calc_cached_check_point_index_when_sync_at(min_filtered_block_number + 1);
1654-
let current_cached_check_point_index = self.cached_block_filter_hashes.read().await.0;
1653+
let current_cached_check_point_index = read_lock!(self.cached_block_filter_hashes).0;
16551654
if current_cached_check_point_index != should_cached_check_point_index {
1656-
let mut tmp = self.cached_block_filter_hashes.write().await;
1657-
tmp.0 = should_cached_check_point_index;
1658-
tmp.1.clear();
1659-
}
1660-
}
1661-
#[cfg(not(target_arch = "wasm32"))]
1662-
pub(crate) fn update_min_filtered_block_number(&self, min_filtered_block_number: BlockNumber) {
1663-
let should_cached_check_point_index =
1664-
self.calc_cached_check_point_index_when_sync_at(min_filtered_block_number + 1);
1665-
let current_cached_check_point_index =
1666-
self.cached_block_filter_hashes.read().expect("poisoned").0;
1667-
if current_cached_check_point_index != should_cached_check_point_index {
1668-
let mut tmp = self.cached_block_filter_hashes.write().expect("poisoned");
1655+
let mut tmp = write_lock!(self.cached_block_filter_hashes);
16691656
tmp.0 = should_cached_check_point_index;
16701657
tmp.1.clear();
16711658
}
16721659
}
16731660

1674-
#[cfg(target_arch = "wasm32")]
16751661
pub(crate) async fn get_cached_block_filter_hashes(&self) -> (u32, Vec<packed::Byte32>) {
1676-
self.cached_block_filter_hashes.read().await.clone()
1677-
}
1678-
#[cfg(not(target_arch = "wasm32"))]
1679-
pub(crate) fn get_cached_block_filter_hashes(&self) -> (u32, Vec<packed::Byte32>) {
1680-
self.cached_block_filter_hashes
1681-
.read()
1682-
.expect("poisoned")
1683-
.clone()
1662+
read_lock!(self.cached_block_filter_hashes).clone()
16841663
}
16851664
pub(crate) async fn update_cached_block_filter_hashes(&self, hashes: Vec<packed::Byte32>) {
1686-
#[cfg(target_arch = "wasm32")]
1687-
{
1688-
self.cached_block_filter_hashes.write().await.1 = hashes;
1689-
}
1690-
#[cfg(not(target_arch = "wasm32"))]
1691-
{
1692-
self.cached_block_filter_hashes.write().expect("poisoned").1 = hashes;
1693-
}
1665+
write_lock!(self.cached_block_filter_hashes).1 = hashes;
16941666
}
16951667

16961668
pub(crate) async fn if_cached_block_filter_hashes_require_update(
16971669
&self,
16981670
finalized_check_point_index: u32,
16991671
) -> Option<BlockNumber> {
17001672
let (cached_index, cached_length) = {
1701-
#[cfg(target_arch = "wasm32")]
1702-
let tmp = self.cached_block_filter_hashes.read().await;
1703-
#[cfg(not(target_arch = "wasm32"))]
1704-
let tmp = self.cached_block_filter_hashes.read().expect("poisoned");
1673+
let tmp = read_lock!(self.cached_block_filter_hashes);
17051674
(tmp.0, tmp.1.len())
17061675
};
17071676
if cached_index >= finalized_check_point_index {
@@ -1884,10 +1853,7 @@ impl Peers {
18841853
// Check:
18851854
// - If cached block filter hashes is same check point as the required,
18861855
// - If all block filter hashes in that check point are downloaded.
1887-
#[cfg(target_arch = "wasm32")]
18881856
let cached_data = self.get_cached_block_filter_hashes().await;
1889-
#[cfg(not(target_arch = "wasm32"))]
1890-
let cached_data = self.get_cached_block_filter_hashes();
18911857

18921858
let current_cached_check_point_index = cached_data.0;
18931859
should_cached_check_point_index == current_cached_check_point_index
@@ -2026,22 +1992,10 @@ impl Peers {
20261992
if self.bad_message_allowed_each_hour == 0 {
20271993
return true;
20281994
}
2029-
#[cfg(target_arch = "wasm32")]
2030-
let result = self
2031-
.rate_limiter
2032-
.lock()
2033-
.await
1995+
mutex_lock!(self.rate_limiter)
20341996
.check_key(&peer_index)
20351997
.map_err(|_| ())
2036-
.is_err();
2037-
#[cfg(not(target_arch = "wasm32"))]
2038-
let result = self
2039-
.rate_limiter
2040-
.lock()
2041-
.map_err(|_| ())
2042-
.and_then(|inner| inner.check_key(&peer_index).map_err(|_| ()))
2043-
.is_err();
2044-
result
1998+
.is_err()
20451999
}
20462000
}
20472001

light-client-lib/src/protocols/relayer.rs

Lines changed: 8 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,11 @@ use linked_hash_map::LinkedHashMap;
99
use log::{debug, trace, warn};
1010
use std::collections::{HashMap, HashSet};
1111
use std::sync::Arc;
12-
#[cfg(not(target_arch = "wasm32"))]
13-
use std::time::{Duration, Instant};
14-
#[cfg(target_arch = "wasm32")]
15-
use web_time::{Duration, Instant};
1612

1713
use crate::protocols::{Peers, BAD_MESSAGE_BAN_TIME};
1814
use crate::storage::Storage;
19-
use crate::types::RwLock;
15+
use crate::types::{Duration, Instant, RwLock};
16+
use crate::{read_lock, write_lock};
2017

2118
const CHECK_PENDING_TXS_TOKEN: u64 = 0;
2219

@@ -167,37 +164,15 @@ impl CKBProtocolHandler for RelayProtocol {
167164
debug!("peer={} is ckb2023 enabled, ignore", peer);
168165
return;
169166
}
170-
#[cfg(target_arch = "wasm32")]
171-
let flag = self
172-
.pending_txs
173-
.read()
174-
.await
175-
.is_not_empty_and_updated_at(60);
176-
177-
#[cfg(not(target_arch = "wasm32"))]
178-
let flag = self
179-
.pending_txs
180-
.read()
181-
.unwrap()
182-
.is_not_empty_and_updated_at(60);
167+
let flag = read_lock!(self.pending_txs).is_not_empty_and_updated_at(60);
183168

184169
if flag {
185170
let peer_id = nc
186171
.get_peer(peer)
187172
.and_then(|p| extract_peer_id(&p.connected_addr))
188173
.unwrap();
189-
#[cfg(target_arch = "wasm32")]
190-
let tx_hashes = self
191-
.pending_txs
192-
.write()
193-
.await
194-
.fetch_transaction_hashes_for_broadcast(peer_id);
195-
#[cfg(not(target_arch = "wasm32"))]
196-
let tx_hashes = self
197-
.pending_txs
198-
.write()
199-
.unwrap()
200-
.fetch_transaction_hashes_for_broadcast(peer_id);
174+
let tx_hashes =
175+
write_lock!(self.pending_txs).fetch_transaction_hashes_for_broadcast(peer_id);
201176
if !tx_hashes.is_empty() {
202177
let content = packed::RelayTransactionHashes::new_builder()
203178
.tx_hashes(tx_hashes.pack())
@@ -246,10 +221,7 @@ impl CKBProtocolHandler for RelayProtocol {
246221
message.item_name()
247222
);
248223
if let packed::RelayMessageUnionReader::GetRelayTransactions(reader) = message {
249-
#[cfg(target_arch = "wasm32")]
250-
let pending_txs = self.pending_txs.read().await;
251-
#[cfg(not(target_arch = "wasm32"))]
252-
let pending_txs = self.pending_txs.read().expect("read access should be OK");
224+
let pending_txs = read_lock!(self.pending_txs);
253225
let relay_txs: Vec<_> = reader
254226
.tx_hashes()
255227
.iter()
@@ -285,19 +257,7 @@ impl CKBProtocolHandler for RelayProtocol {
285257
CHECK_PENDING_TXS_TOKEN => {
286258
// we check pending txs every 2 seconds, if the timestamp of the pending txs is updated in the last minute
287259
// and connected relay protocol peers is empty, we try to open the protocol and broadcast the pending txs
288-
#[cfg(target_arch = "wasm32")]
289-
let flag = self
290-
.pending_txs
291-
.read()
292-
.await
293-
.is_not_empty_and_updated_at(60);
294-
295-
#[cfg(not(target_arch = "wasm32"))]
296-
let flag = self
297-
.pending_txs
298-
.read()
299-
.unwrap()
300-
.is_not_empty_and_updated_at(60);
260+
let flag = read_lock!(self.pending_txs).is_not_empty_and_updated_at(60);
301261

302262
if flag && self.opened_peers.is_empty() {
303263
let p2p_control = nc.p2p_control().expect("p2p_control should be exist");
@@ -310,10 +270,7 @@ impl CKBProtocolHandler for RelayProtocol {
310270
}
311271
}
312272
} else {
313-
#[cfg(target_arch = "wasm32")]
314-
let mut pending_txs = self.pending_txs.write().await;
315-
#[cfg(not(target_arch = "wasm32"))]
316-
let mut pending_txs = self.pending_txs.write().unwrap();
273+
let mut pending_txs = write_lock!(self.pending_txs);
317274
for (&peer, instant) in self.opened_peers.iter_mut() {
318275
if let Some(peer_id) = nc
319276
.get_peer(peer)

0 commit comments

Comments
 (0)