Skip to content

Commit 6665bd3

Browse files
committed
feat: support async api
1 parent 8e6217d commit 6665bd3

28 files changed

+688
-271
lines changed

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,15 @@ log = "0.4.6"
2323
reqwest = { version = "0.12", default-features = false, features = ["json"] }
2424
secp256k1 = { version = "0.29.0", features = ["recovery"] }
2525
tokio-util = { version = "0.7.7", features = ["codec"] }
26-
tokio = { version = "1" }
26+
tokio = { version = "1", features = ["full"] }
2727
bytes = "1"
2828
futures = "0.3"
2929
jsonrpc-core = "18"
3030
parking_lot = "0.12"
3131
lru = "0.7.1"
3232
dashmap = "5.4"
3333
dyn-clone = "1.0"
34+
async-trait = "0.1"
3435

3536
ckb-types = "0.119.0"
3637
ckb-dao-utils = "0.119.0"
@@ -50,7 +51,6 @@ ckb-mock-tx-types = { version = "0.119.0" }
5051
ckb-chain-spec = "0.119.0"
5152

5253
sparse-merkle-tree = "0.6.1"
53-
lazy_static = "1.3.0"
5454

5555
[features]
5656
default = ["default-tls"]

deny.toml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,8 @@ ignore = [
7474
#{ id = "RUSTSEC-0000-0000", reason = "you can specify a reason the advisory is ignored" },
7575
#"a-crate-that-is-yanked@0.1.1", # you can also ignore yanked crate versions if you wish
7676
#{ crate = "a-crate-that-is-yanked@0.1.1", reason = "you can specify why you are ignoring the yanked crate"
77-
"RUSTSEC-2024-0370" # proc-macro-error's maintainer seems to be unreachable, ignore this
77+
"RUSTSEC-2024-0370", # proc-macro-error's maintainer seems to be unreachable, ignore this
78+
"RUSTSEC-2024-0384", # instant is no longer maintained, ignore this
7879
]
7980
# If this is true, then cargo deny will use the git executable to fetch advisory database.
8081
# If this is false, then it uses a built-in git library.
@@ -97,8 +98,8 @@ allow = [
9798
"ISC",
9899
"MIT",
99100
"Unicode-DFS-2016",
100-
"BSL-1.0", # xxhash-rust 0.8.10
101-
101+
"BSL-1.0", # xxhash-rust 0.8.10
102+
"Unicode-3.0",
102103
#"MIT",
103104
#"Apache-2.0",
104105
#"Apache-2.0 WITH LLVM-exception",

examples/script_unlocker_example.rs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,14 @@ use std::collections::HashMap;
1717
/// [CapacityDiff]: https://github.com/doitian/ckb-sdk-examples-capacity-diff
1818
struct CapacityDiffUnlocker {}
1919

20+
#[async_trait::async_trait]
2021
impl ScriptUnlocker for CapacityDiffUnlocker {
2122
// This works for any args
2223
fn match_args(&self, _args: &[u8]) -> bool {
2324
true
2425
}
2526

26-
fn unlock(
27+
async fn unlock_async(
2728
&self,
2829
tx: &TransactionView,
2930
script_group: &ScriptGroup,
@@ -45,12 +46,14 @@ impl ScriptUnlocker for CapacityDiffUnlocker {
4546

4647
let mut total = 0i64;
4748
for i in &script_group.input_indices {
48-
let cell = tx_dep_provider.get_cell(
49-
&tx.inputs()
50-
.get(*i)
51-
.ok_or_else(|| other_unlock_error("input index out of bound"))?
52-
.previous_output(),
53-
)?;
49+
let cell = tx_dep_provider
50+
.get_cell_async(
51+
&tx.inputs()
52+
.get(*i)
53+
.ok_or_else(|| other_unlock_error("input index out of bound"))?
54+
.previous_output(),
55+
)
56+
.await?;
5457
let capacity: u64 = cell.capacity().unpack();
5558
total -= capacity as i64;
5659
}
@@ -71,7 +74,7 @@ impl ScriptUnlocker for CapacityDiffUnlocker {
7174

7275
// This is called before balancer. It's responsible to fill witness for inputs added manually
7376
// by users.
74-
fn fill_placeholder_witness(
77+
async fn fill_placeholder_witness_async(
7578
&self,
7679
tx: &TransactionView,
7780
script_group: &ScriptGroup,

rust-toolchain

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1.75.0
1+
1.81.0

src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ pub mod test_util;
1616
#[cfg(test)]
1717
mod tests;
1818

19-
pub use rpc::{CkbRpcClient, IndexerRpcClient, RpcError};
19+
pub use rpc::{CkbRpcAsyncClient, CkbRpcClient, IndexerRpcAsyncClient, IndexerRpcClient, RpcError};
2020
pub use types::{
2121
Address, AddressPayload, AddressType, CodeHashIndex, HumanCapacity, NetworkInfo, NetworkType,
2222
OldAddress, OldAddressFormat, ScriptGroup, ScriptGroupType, ScriptId, Since, SinceType,

src/rpc/ckb.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ crate::jsonrpc!(pub struct CkbRpcClient {
110110
pub fn calculate_dao_maximum_withdraw(&self, out_point: OutPoint, kind: DaoWithdrawingCalculationKind) -> Capacity;
111111
});
112112

113-
crate::jsonrpc_async!(pub struct CkbRpcAyncClient {
113+
crate::jsonrpc_async!(pub struct CkbRpcAsyncClient {
114114
// Chain
115115
pub fn get_block(&self, hash: H256) -> Option<BlockView>;
116116
pub fn get_block_by_number(&self, number: BlockNumber) -> Option<BlockView>;
@@ -212,6 +212,15 @@ fn transform_cycles(cycles: Option<Vec<ckb_jsonrpc_types::Cycle>>) -> Vec<Cycle>
212212
.unwrap_or_default()
213213
}
214214

215+
impl From<&CkbRpcClient> for CkbRpcAsyncClient {
216+
fn from(value: &CkbRpcClient) -> Self {
217+
Self {
218+
client: value.client.clone(),
219+
id: 0.into(),
220+
}
221+
}
222+
}
223+
215224
impl CkbRpcClient {
216225
pub fn get_packed_block(&self, hash: H256) -> Result<Option<JsonBytes>, crate::RpcError> {
217226
self.post("get_block", (hash, Some(Uint32::from(0u32))))

src/rpc/ckb_indexer.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,3 +202,12 @@ crate::jsonrpc_async!(pub struct IndexerRpcAsyncClient {
202202
pub fn get_transactions(&self, search_key: SearchKey, order: Order, limit: Uint32, after: Option<JsonBytes>) -> Pagination<Tx>;
203203
pub fn get_cells_capacity(&self, search_key: SearchKey) -> Option<CellsCapacity>;
204204
});
205+
206+
impl From<&IndexerRpcClient> for IndexerRpcAsyncClient {
207+
fn from(value: &IndexerRpcClient) -> Self {
208+
Self {
209+
client: value.client.clone(),
210+
id: 0.into(),
211+
}
212+
}
213+
}

src/rpc/ckb_light_client.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,3 +199,12 @@ crate::jsonrpc_async!(pub struct LightClientRpcAsyncClient {
199199
pub fn get_peers(&self) -> Vec<RemoteNode>;
200200
pub fn local_node_info(&self) -> LocalNode;
201201
});
202+
203+
impl From<&LightClientRpcClient> for LightClientRpcAsyncClient {
204+
fn from(value: &LightClientRpcClient) -> Self {
205+
Self {
206+
client: value.client.clone(),
207+
id: 0.into(),
208+
}
209+
}
210+
}

src/rpc/mod.rs

Lines changed: 65 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,60 @@ pub mod ckb_indexer;
33
pub mod ckb_light_client;
44

55
use anyhow::anyhow;
6-
pub use ckb::CkbRpcClient;
7-
pub use ckb_indexer::IndexerRpcClient;
6+
pub use ckb::{CkbRpcAsyncClient, CkbRpcClient};
7+
pub use ckb_indexer::{IndexerRpcAsyncClient, IndexerRpcClient};
88
use ckb_jsonrpc_types::{JsonBytes, ResponseFormat};
9-
pub use ckb_light_client::LightClientRpcClient;
9+
pub use ckb_light_client::{LightClientRpcAsyncClient, LightClientRpcClient};
1010

11-
use std::sync::LazyLock;
11+
use std::{cell::LazyCell, future::Future};
1212
use thiserror::Error;
1313

14-
static RUNTIME: LazyLock<tokio::runtime::Runtime> =
15-
LazyLock::new(|| tokio::runtime::Runtime::new().unwrap());
14+
thread_local! {
15+
pub static RUNTIME: LazyCell<tokio::runtime::Runtime> =
16+
LazyCell::new(|| tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap());
17+
}
18+
19+
pub(crate) fn block_on<F: Send>(future: impl Future<Output = F> + Send) -> F {
20+
match tokio::runtime::Handle::try_current() {
21+
Ok(h)
22+
if matches!(
23+
h.runtime_flavor(),
24+
tokio::runtime::RuntimeFlavor::MultiThread
25+
) =>
26+
{
27+
tokio::task::block_in_place(|| {
28+
h.block_on(tokio::time::timeout(
29+
std::time::Duration::from_secs(10),
30+
future,
31+
))
32+
.unwrap()
33+
})
34+
}
35+
// if we on the current runtime, it must use another thread to poll this future,
36+
// can't block on current runtime, it will block current reactor to stop forever
37+
// in tokio runtime, this time will panic
38+
Ok(_) => std::thread::scope(|s| {
39+
s.spawn(|| {
40+
RUNTIME.with(|rt| {
41+
rt.block_on(tokio::time::timeout(
42+
std::time::Duration::from_secs(10),
43+
future,
44+
))
45+
.unwrap()
46+
})
47+
})
48+
.join()
49+
.unwrap()
50+
}),
51+
Err(_) => RUNTIME.with(|rt| {
52+
rt.block_on(tokio::time::timeout(
53+
std::time::Duration::from_secs(10),
54+
future,
55+
))
56+
.unwrap()
57+
}),
58+
}
59+
}
1660

1761
#[derive(Error, Debug)]
1862
pub enum RpcError {
@@ -38,8 +82,8 @@ macro_rules! jsonrpc {
3882
) => (
3983
$(#[$struct_attr])*
4084
pub struct $struct_name {
41-
client: crate::rpc::RpcClient,
42-
id: std::sync::atomic::AtomicU64,
85+
pub(crate) client: $crate::rpc::RpcClient,
86+
pub(crate) id: std::sync::atomic::AtomicU64,
4387
}
4488

4589
impl Clone for $struct_name {
@@ -53,13 +97,13 @@ macro_rules! jsonrpc {
5397

5498
impl $struct_name {
5599
pub fn new(uri: &str) -> Self {
56-
$struct_name { id: 0.into(), client: crate::rpc::RpcClient::new(uri), }
100+
$struct_name { id: 0.into(), client: $crate::rpc::RpcClient::new(uri), }
57101
}
58102

59103
pub fn post<PARAM, RET>(&self, method:&str, params: PARAM)->Result<RET, $crate::rpc::RpcError>
60104
where
61-
PARAM:serde::ser::Serialize,
62-
RET: serde::de::DeserializeOwned,
105+
PARAM:serde::ser::Serialize + Send + 'static,
106+
RET: serde::de::DeserializeOwned + Send + 'static,
63107
{
64108
let id = self.id.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
65109
let params_fn = || -> Result<_,_> {
@@ -73,7 +117,7 @@ macro_rules! jsonrpc {
73117
};
74118

75119
let task = self.client.post(params_fn);
76-
crate::rpc::RUNTIME.block_on(task)
120+
$crate::rpc::block_on(task)
77121

78122
}
79123

@@ -94,7 +138,7 @@ macro_rules! jsonrpc {
94138
};
95139

96140
let task = $selff.client.post(params_fn);
97-
crate::rpc::RUNTIME.block_on(task)
141+
$crate::rpc::block_on(task)
98142
}
99143
)*
100144
}
@@ -113,8 +157,8 @@ macro_rules! jsonrpc_async {
113157
) => (
114158
$(#[$struct_attr])*
115159
pub struct $struct_name {
116-
client: crate::rpc::RpcClient,
117-
id: std::sync::atomic::AtomicU64,
160+
pub(crate) client: $crate::rpc::RpcClient,
161+
pub(crate) id: std::sync::atomic::AtomicU64,
118162
}
119163

120164
impl Clone for $struct_name {
@@ -128,13 +172,13 @@ macro_rules! jsonrpc_async {
128172

129173
impl $struct_name {
130174
pub fn new(uri: &str) -> Self {
131-
$struct_name { id: 0.into(), client: crate::rpc::RpcClient::new(uri), }
175+
$struct_name { id: 0.into(), client: $crate::rpc::RpcClient::new(uri), }
132176
}
133177

134-
pub fn post<PARAM, RET>(&self, method:&str, params: PARAM)->impl std::future::Future<Output =Result<RET, $crate::rpc::RpcError>>
178+
pub fn post<PARAM, RET>(&self, method:&str, params: PARAM)->impl std::future::Future<Output =Result<RET, $crate::rpc::RpcError>> + Send + 'static
135179
where
136-
PARAM:serde::ser::Serialize,
137-
RET: serde::de::DeserializeOwned,
180+
PARAM:serde::ser::Serialize + Send + 'static,
181+
RET: serde::de::DeserializeOwned + Send + 'static,
138182
{
139183
let id = self.id.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
140184
let method = serde_json::json!(method);
@@ -196,8 +240,8 @@ impl RpcClient {
196240
json_post_params: T,
197241
) -> impl std::future::Future<Output = Result<RET, crate::rpc::RpcError>>
198242
where
199-
PARAM: serde::ser::Serialize,
200-
RET: serde::de::DeserializeOwned,
243+
PARAM: serde::ser::Serialize + Send + 'static,
244+
RET: serde::de::DeserializeOwned + Send + 'static,
201245
T: FnOnce() -> Result<PARAM, crate::rpc::RpcError>,
202246
{
203247
let url = self.url.clone();

0 commit comments

Comments
 (0)