Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions light-client-lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ anyhow = "1.0.56"
thiserror = "1.0.30"
toml = "0.5.8"
tokio = { version = "1.20" }
async-trait = "0.1.56"

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
rocksdb = { package = "ckb-rocksdb", version = "=0.21.1", features = [
Expand Down
16 changes: 3 additions & 13 deletions light-client-lib/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use std::{fmt, result};

use thiserror::Error;

pub mod db;

#[derive(Error, Debug)]
pub enum Error {
#[error("config error: {0}")]
Expand All @@ -11,20 +13,8 @@ pub enum Error {
#[error("runtime error: {0}")]
Runtime(String),

#[cfg(not(target_arch = "wasm32"))]
#[error("db error: {0}")]
Db(#[from] rocksdb::Error),

#[cfg(target_arch = "wasm32")]
#[error("db error: {0}")]
Indexdb(String),
}

#[cfg(target_arch = "wasm32")]
impl From<idb::Error> for Error {
fn from(value: idb::Error) -> Self {
Error::Indexdb(value.to_string())
}
Db(#[from] db::DatabaseError),
}

pub type Result<T> = result::Result<T, Error>;
Expand Down
22 changes: 22 additions & 0 deletions light-client-lib/src/error/db.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
//! Cross-platform database error.

#[cfg(not(target_arch = "wasm32"))]
pub use rocksdb::Error as DatabaseError;

#[cfg(target_arch = "wasm32")]
#[derive(Debug)]
pub struct DatabaseError(String);

#[cfg(target_arch = "wasm32")]
impl std::fmt::Display for DatabaseError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}

#[cfg(target_arch = "wasm32")]
impl From<idb::Error> for DatabaseError {
fn from(value: idb::Error) -> Self {
DatabaseError(value.to_string())
}
}
2 changes: 2 additions & 0 deletions light-client-lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ pub mod error;
pub mod protocols;
pub mod service;
pub mod storage;
pub mod sync;
pub mod time;
pub mod types;
pub mod utils;
pub mod verify;
49 changes: 12 additions & 37 deletions light-client-lib/src/protocols/filter/block_filter.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use super::{components, BAD_MESSAGE_BAN_TIME};
use crate::protocols::{Peers, Status, StatusCode};
use crate::storage::Storage;
use crate::types::RwLock;
use crate::sync::{RwLock, RwLockExt};
use crate::time::{Duration, Instant};
use crate::utils::network::prove_or_download_matched_blocks;
use ckb_constant::sync::INIT_BLOCKS_IN_TRANSIT_PER_PEER;
use ckb_network::{
Expand All @@ -12,11 +13,7 @@ use golomb_coded_set::{GCSFilterReader, SipHasher24Builder, M, P};
use log::{debug, info, log_enabled, trace, warn, Level};
use rand::seq::SliceRandom as _;
use std::io::Cursor;
#[cfg(not(target_arch = "wasm32"))]
use std::time::Instant;
use std::{sync::Arc, time::Duration};
#[cfg(target_arch = "wasm32")]
use web_time::Instant;
use std::sync::Arc;

pub(crate) const GET_BLOCK_FILTERS_TOKEN: u64 = 0;
pub(crate) const GET_BLOCK_FILTER_HASHES_TOKEN: u64 = 1;
Expand Down Expand Up @@ -82,33 +79,22 @@ impl FilterProtocol {
}

async fn should_ask(&self, immediately: bool) -> bool {
#[cfg(target_arch = "wasm32")]
let result = !self.storage.is_filter_scripts_empty()
let last_ask_time = self.last_ask_time.read_ext().await.unwrap();
!self.storage.is_filter_scripts_empty()
&& (immediately
|| self.last_ask_time.read().await.is_none()
|| self.last_ask_time.read().await.unwrap().elapsed() > GET_BLOCK_FILTERS_TIMEOUT);
#[cfg(not(target_arch = "wasm32"))]
let result = !self.storage.is_filter_scripts_empty()
&& (immediately
|| self.last_ask_time.read().unwrap().is_none()
|| self.last_ask_time.read().unwrap().unwrap().elapsed()
> GET_BLOCK_FILTERS_TIMEOUT);

result
|| last_ask_time.is_none()
|| last_ask_time.unwrap().elapsed() > GET_BLOCK_FILTERS_TIMEOUT)
}
#[cfg(target_arch = "wasm32")]
pub async fn update_min_filtered_block_number(&self, block_number: BlockNumber) {
self.storage.update_min_filtered_block_number(block_number);
self.peers
.update_min_filtered_block_number(block_number)
.await;
self.last_ask_time.write().await.replace(Instant::now());
}
#[cfg(not(target_arch = "wasm32"))]
pub fn update_min_filtered_block_number(&self, block_number: BlockNumber) {
self.storage.update_min_filtered_block_number(block_number);
self.peers.update_min_filtered_block_number(block_number);
self.last_ask_time.write().unwrap().replace(Instant::now());
self.last_ask_time
.write_ext()
.await
.unwrap()
.replace(Instant::now());
}
pub(crate) async fn try_send_get_block_filters(
&self,
Expand All @@ -129,12 +115,8 @@ impl FilterProtocol {
let finalized_check_point_number = self
.peers
.calc_check_point_number(finalized_check_point_index);
#[cfg(target_arch = "wasm32")]
let (cached_check_point_index, cached_hashes) =
self.peers.get_cached_block_filter_hashes().await;
#[cfg(not(target_arch = "wasm32"))]
let (cached_check_point_index, cached_hashes) =
self.peers.get_cached_block_filter_hashes();

let cached_check_point_number =
self.peers.calc_check_point_number(cached_check_point_index);
Expand Down Expand Up @@ -213,19 +195,12 @@ impl FilterProtocol {

pub(crate) async fn try_send_get_block_filter_hashes(&self, nc: BoxedCKBProtocolContext) {
let min_filtered_block_number = self.storage.get_min_filtered_block_number();
#[cfg(target_arch = "wasm32")]
self.peers
.update_min_filtered_block_number(min_filtered_block_number)
.await;
#[cfg(not(target_arch = "wasm32"))]
self.peers
.update_min_filtered_block_number(min_filtered_block_number);

let finalized_check_point_index = self.storage.get_max_check_point_index();
#[cfg(target_arch = "wasm32")]
let cached_check_point_index = self.peers.get_cached_block_filter_hashes().await.0;
#[cfg(not(target_arch = "wasm32"))]
let cached_check_point_index = self.peers.get_cached_block_filter_hashes().0;

if let Some(start_number) = self
.peers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,8 @@ impl<'a> BlockFilterHashesProcess<'a> {
.protocol
.peers
.calc_check_point_number(finalized_check_point_index);
#[cfg(target_arch = "wasm32")]
let (cached_check_point_index, cached_hashes) =
self.protocol.peers.get_cached_block_filter_hashes().await;
#[cfg(not(target_arch = "wasm32"))]
let (cached_check_point_index, cached_hashes) =
self.protocol.peers.get_cached_block_filter_hashes();

let cached_check_point_number = self
.protocol
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,8 @@ impl<'a> BlockFiltersProcess<'a> {
let (mut parent_block_filter_hash, expected_block_filter_hashes) =
if start_number <= finalized_check_point_number {
// Use cached block filter hashes to check the block filters.
#[cfg(target_arch = "wasm32")]
let (cached_check_point_index, mut cached_block_filter_hashes) =
self.filter.peers.get_cached_block_filter_hashes().await;
#[cfg(not(target_arch = "wasm32"))]
let (cached_check_point_index, mut cached_block_filter_hashes) =
self.filter.peers.get_cached_block_filter_hashes();

let cached_check_point_number = self
.filter
Expand Down Expand Up @@ -250,13 +246,9 @@ impl<'a> BlockFiltersProcess<'a> {
.storage
.update_block_number(filtered_block_number)
}
#[cfg(target_arch = "wasm32")]
self.filter
.update_min_filtered_block_number(filtered_block_number)
.await;
#[cfg(not(target_arch = "wasm32"))]
self.filter
.update_min_filtered_block_number(filtered_block_number);

let could_request_more_block_filters = self
.filter
Expand Down
2 changes: 1 addition & 1 deletion light-client-lib/src/protocols/filter/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::time::Duration;
use crate::time::Duration;

mod block_filter;
mod components;
Expand Down
2 changes: 1 addition & 1 deletion light-client-lib/src/protocols/light_client/constant.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::time::Duration;
use crate::time::Duration;

pub const REFRESH_PEERS_TOKEN: u64 = 0;
pub const FETCH_HEADER_TX_TOKEN: u64 = 1;
Expand Down
68 changes: 13 additions & 55 deletions light-client-lib/src/protocols/light_client/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use governor::{clock::DefaultClock, state::keyed::DefaultKeyedStateStore, Quota,
use super::prelude::*;
use crate::{
protocols::{Status, StatusCode, BAD_MESSAGE_ALLOWED_EACH_HOUR, MESSAGE_TIMEOUT},
types::{Mutex, RwLock},
sync::{Mutex, MutexExt, RwLock, RwLockExt},
};

pub type BadMessageRateLimiter<T> = RateLimiter<T, DefaultKeyedStateStore<T>, DefaultClock>;
Expand Down Expand Up @@ -1299,10 +1299,7 @@ impl Peers {
self.mark_fetching_headers_timeout(index);
self.mark_fetching_txs_timeout(index);
self.inner.remove(&index);
#[cfg(target_arch = "wasm32")]
self.rate_limiter.lock().await.retain_recent();
#[cfg(not(target_arch = "wasm32"))]
let _ignore_error = self.rate_limiter.lock().map(|inner| inner.retain_recent());
self.rate_limiter.lock_ext().await.unwrap().retain_recent();
}

pub(crate) fn get_peers_index(&self) -> Vec<PeerIndex> {
Expand Down Expand Up @@ -1644,64 +1641,38 @@ impl Peers {
Err(StatusCode::PeerIsNotFound.into())
}
}
#[cfg(target_arch = "wasm32")]
pub(crate) async fn update_min_filtered_block_number(
&self,
min_filtered_block_number: BlockNumber,
) {
let should_cached_check_point_index =
self.calc_cached_check_point_index_when_sync_at(min_filtered_block_number + 1);
let current_cached_check_point_index = self.cached_block_filter_hashes.read().await.0;
if current_cached_check_point_index != should_cached_check_point_index {
let mut tmp = self.cached_block_filter_hashes.write().await;
tmp.0 = should_cached_check_point_index;
tmp.1.clear();
}
}
#[cfg(not(target_arch = "wasm32"))]
pub(crate) fn update_min_filtered_block_number(&self, min_filtered_block_number: BlockNumber) {
let should_cached_check_point_index =
self.calc_cached_check_point_index_when_sync_at(min_filtered_block_number + 1);
let current_cached_check_point_index =
self.cached_block_filter_hashes.read().expect("poisoned").0;
self.cached_block_filter_hashes.read_ext().await.unwrap().0;
if current_cached_check_point_index != should_cached_check_point_index {
let mut tmp = self.cached_block_filter_hashes.write().expect("poisoned");
let mut tmp = self.cached_block_filter_hashes.write_ext().await.unwrap();
tmp.0 = should_cached_check_point_index;
tmp.1.clear();
}
}

#[cfg(target_arch = "wasm32")]
pub(crate) async fn get_cached_block_filter_hashes(&self) -> (u32, Vec<packed::Byte32>) {
self.cached_block_filter_hashes.read().await.clone()
}
#[cfg(not(target_arch = "wasm32"))]
pub(crate) fn get_cached_block_filter_hashes(&self) -> (u32, Vec<packed::Byte32>) {
self.cached_block_filter_hashes
.read()
.expect("poisoned")
.read_ext()
.await
.unwrap()
.clone()
}
pub(crate) async fn update_cached_block_filter_hashes(&self, hashes: Vec<packed::Byte32>) {
#[cfg(target_arch = "wasm32")]
{
self.cached_block_filter_hashes.write().await.1 = hashes;
}
#[cfg(not(target_arch = "wasm32"))]
{
self.cached_block_filter_hashes.write().expect("poisoned").1 = hashes;
}
self.cached_block_filter_hashes.write_ext().await.unwrap().1 = hashes;
}

pub(crate) async fn if_cached_block_filter_hashes_require_update(
&self,
finalized_check_point_index: u32,
) -> Option<BlockNumber> {
let (cached_index, cached_length) = {
#[cfg(target_arch = "wasm32")]
let tmp = self.cached_block_filter_hashes.read().await;
#[cfg(not(target_arch = "wasm32"))]
let tmp = self.cached_block_filter_hashes.read().expect("poisoned");
let tmp = self.cached_block_filter_hashes.read_ext().await.unwrap();
(tmp.0, tmp.1.len())
};
if cached_index >= finalized_check_point_index {
Expand Down Expand Up @@ -1884,10 +1855,7 @@ impl Peers {
// Check:
// - If cached block filter hashes is same check point as the required,
// - If all block filter hashes in that check point are downloaded.
#[cfg(target_arch = "wasm32")]
let cached_data = self.get_cached_block_filter_hashes().await;
#[cfg(not(target_arch = "wasm32"))]
let cached_data = self.get_cached_block_filter_hashes();

let current_cached_check_point_index = cached_data.0;
should_cached_check_point_index == current_cached_check_point_index
Expand Down Expand Up @@ -2026,22 +1994,12 @@ impl Peers {
if self.bad_message_allowed_each_hour == 0 {
return true;
}
#[cfg(target_arch = "wasm32")]
let result = self
.rate_limiter
.lock()
self.rate_limiter
.lock_ext()
.await
.unwrap()
.check_key(&peer_index)
.map_err(|_| ())
.is_err();
#[cfg(not(target_arch = "wasm32"))]
let result = self
.rate_limiter
.lock()
.map_err(|_| ())
.and_then(|inner| inner.check_key(&peer_index).map_err(|_| ()))
.is_err();
result
.is_err()
}
}

Expand Down
2 changes: 1 addition & 1 deletion light-client-lib/src/protocols/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::time::Duration;
use crate::time::Duration;

use ckb_types::core::BlockNumber;

Expand Down
Loading
Loading