Skip to content

Use CKB SDK async API #680

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ bech32 = "0.9.1"
rand = "0.8.5"
serde_json = { version = "1.0" }
home = "0.5.9"
ckb-sdk = "3.6"
ckb-sdk = "3.7"
thiserror = "1.0.58"
anyhow = "1.0.81"
tentacle = { version = "0.6.6", default-features = false, features = ["upnp", "parking_lot", "openssl-vendored", "tokio-runtime", "tokio-timer", "ws"] }
Expand Down
115 changes: 53 additions & 62 deletions src/ckb/actor.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use ckb_sdk::{CkbRpcClient, RpcError};
use ckb_sdk::{CkbRpcAsyncClient, RpcError};
use ckb_types::{
core::{tx_pool::TxStatus, TransactionView},
packed,
Expand Down Expand Up @@ -160,13 +160,11 @@ impl Actor for CkbChainActor {
if !reply_port.is_closed() {
let context = state.build_funding_context(&request);
let exclusion = &mut state.live_cells_exclusion_map;
tokio::task::block_in_place(move || {
let result = tx.fulfill(request, context, exclusion);
if !reply_port.is_closed() {
// ignore error
let _ = reply_port.send(result);
}
});
let result = tx.fulfill(request, context, exclusion).await;
if !reply_port.is_closed() {
// ignore error
let _ = reply_port.send(result);
}
}
}
CkbChainMessage::AddFundingTx(tx) => {
Expand All @@ -184,62 +182,56 @@ impl Actor for CkbChainActor {
if !reply_port.is_closed() {
let secret_key = state.secret_key;
let rpc_url = state.config.rpc_url.clone();
tokio::task::block_in_place(move || {
let result = tx.sign(secret_key, rpc_url);
if !reply_port.is_closed() {
// ignore error
let _ = reply_port.send(result);
}
});
let result = tx.sign(secret_key, rpc_url).await;
if !reply_port.is_closed() {
// ignore error
let _ = reply_port.send(result);
}
}
}
CkbChainMessage::SendTx(tx, reply_port) => {
let rpc_url = state.config.rpc_url.clone();
tokio::task::block_in_place(move || {
let ckb_client = CkbRpcClient::new(&rpc_url);
let result = match ckb_client.send_transaction(tx.data().into(), None) {
Ok(_) => Ok(()),
Err(err) => {
//FIXME(yukang): RBF or duplicated transaction handling
match err {
RpcError::Rpc(e)
if (e.code.code() == -1107 || e.code.code() == -1111) =>
{
tracing::warn!(
"[{}] transaction { } already in pool",
myself.get_name().unwrap_or_default(),
tx.hash(),
);
Ok(())
}
_ => {
tracing::error!(
"[{}] send transaction {} failed: {:?}",
myself.get_name().unwrap_or_default(),
tx.hash(),
err
);
Err(err)
}
let ckb_client = CkbRpcAsyncClient::new(&rpc_url);
let result = match ckb_client.send_transaction(tx.data().into(), None).await {
Ok(_) => Ok(()),
Err(err) => {
//FIXME(yukang): RBF or duplicated transaction handling
match err {
RpcError::Rpc(e)
if (e.code.code() == -1107 || e.code.code() == -1111) =>
{
tracing::warn!(
"[{}] transaction { } already in pool",
myself.get_name().unwrap_or_default(),
tx.hash(),
);
Ok(())
}
_ => {
tracing::error!(
"[{}] send transaction {} failed: {:?}",
myself.get_name().unwrap_or_default(),
tx.hash(),
err
);
Err(err)
}
}
};
if !reply_port.is_closed() {
// ignore error
let _ = reply_port.send(result);
}
});
};
if !reply_port.is_closed() {
// ignore error
let _ = reply_port.send(result);
}
}
CkbChainMessage::GetTx(tx_hash, reply_port) => {
let rpc_url = state.config.rpc_url.clone();
tokio::task::block_in_place(move || {
let ckb_client = CkbRpcClient::new(&rpc_url);
let result = ckb_client.get_transaction(tx_hash.into());
if !reply_port.is_closed() {
// ignore error
let _ = reply_port.send(result.map(Into::into));
}
});
let ckb_client = CkbRpcAsyncClient::new(&rpc_url);
let result = ckb_client.get_transaction(tx_hash.into()).await;
if !reply_port.is_closed() {
// ignore error
let _ = reply_port.send(result.map(Into::into));
}
}
CkbChainMessage::CreateTxTracer(tracer) => {
debug!(
Expand All @@ -262,14 +254,13 @@ impl Actor for CkbChainActor {
reply_port,
) => {
let rpc_url = state.config.rpc_url.clone();
tokio::task::block_in_place(move || {
let ckb_client = CkbRpcClient::new(&rpc_url);
let _ = reply_port.send(
ckb_client
.get_header(block_hash.into())
.map(|x| x.map(|x| x.inner.timestamp.into())),
);
});
let ckb_client = CkbRpcAsyncClient::new(&rpc_url);
let _ = reply_port.send(
ckb_client
.get_header(block_hash.into())
.await
.map(|x| x.map(|x| x.inner.timestamp.into())),
);
}
CkbChainMessage::Stop => {
myself.stop(Some("stop received".to_string()));
Expand Down
111 changes: 92 additions & 19 deletions src/ckb/funding/funding_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ use ckb_sdk::{
traits::{
CellCollector, CellDepResolver, CellQueryOptions, DefaultCellCollector,
DefaultCellDepResolver, DefaultHeaderDepResolver, DefaultTransactionDependencyProvider,
HeaderDepResolver, SecpCkbRawKeySigner, TransactionDependencyProvider, ValueRangeOption,
HeaderDepResolver, SecpCkbRawKeySigner, TransactionDependencyError,
TransactionDependencyProvider, ValueRangeOption,
},
tx_builder::{unlock_tx, CapacityBalancer, TxBuilder, TxBuilderError},
unlock::{ScriptUnlocker, SecpSighashUnlocker},
CkbRpcClient, ScriptId,
tx_builder::{CapacityBalancer, ScriptGroups, TxBuilder, TxBuilderError},
unlock::{ScriptUnlocker, SecpSighashUnlocker, UnlockError},
CkbRpcAsyncClient, ScriptGroup, ScriptId,
};
use ckb_types::{
core::{BlockView, Capacity, TransactionView},
Expand Down Expand Up @@ -353,7 +354,7 @@ impl FundingTxBuilder {
)));
}

fn build(
async fn build(
self,
live_cells_exclusion_map: &mut LiveCellsExclusionMap,
) -> Result<FundingTx, FundingError> {
Expand All @@ -379,9 +380,10 @@ impl FundingTxBuilder {
self.request.funding_fee_rate,
);

let ckb_client = CkbRpcClient::new(&self.context.rpc_url);
let ckb_client = CkbRpcAsyncClient::new(&self.context.rpc_url);
let cell_dep_resolver = ckb_client
.get_block_by_number(0.into())
.await
.map_err(FundingError::CkbRpcError)?
.and_then(|genesis_block| {
DefaultCellDepResolver::from_genesis(&BlockView::from(genesis_block)).ok()
Expand All @@ -394,20 +396,22 @@ impl FundingTxBuilder {
let mut cell_collector = DefaultCellCollector::new(&self.context.rpc_url);
let tx_dep_provider = DefaultTransactionDependencyProvider::new(&self.context.rpc_url, 10);

let tip_block_number: u64 = ckb_client.get_tip_block_number()?.into();
let tip_block_number: u64 = ckb_client.get_tip_block_number().await?.into();
live_cells_exclusion_map.truncate(tip_block_number);
live_cells_exclusion_map
.apply(&mut cell_collector)
.map_err(|err| FundingError::CkbTxBuilderError(TxBuilderError::Other(err.into())))?;

let (tx, _) = self.build_unlocked(
&mut cell_collector,
&cell_dep_resolver,
&header_dep_resolver,
&tx_dep_provider,
&balancer,
&unlockers,
)?;
let tx = self
.build_balanced_async(
&mut cell_collector,
&cell_dep_resolver,
&header_dep_resolver,
&tx_dep_provider,
&balancer,
&unlockers,
)
.await?;

let old_tx_hash = self.funding_tx.tx.as_ref().map(|tx| tx.hash());
let mut funding_tx = self.funding_tx;
Expand Down Expand Up @@ -443,7 +447,7 @@ impl FundingTx {
self.tx
}

pub fn fulfill(
pub async fn fulfill(
self,
request: FundingRequest,
context: FundingContext,
Expand All @@ -454,10 +458,10 @@ impl FundingTx {
request,
context,
};
builder.build(live_cells_exclusion_map)
builder.build(live_cells_exclusion_map).await
}

pub fn sign(
pub async fn sign(
mut self,
secret_key: secp256k1::SecretKey,
rpc_url: String,
Expand Down Expand Up @@ -487,7 +491,7 @@ impl FundingTx {
let tx = self.take().ok_or(FundingError::AbsentTx)?;
let tx_dep_provider = DefaultTransactionDependencyProvider::new(&rpc_url, 10);

let (tx, _) = unlock_tx(tx, &tx_dep_provider, &unlockers)?;
let (tx, _) = unlock_tx(tx, &tx_dep_provider, &unlockers).await?;
self.update_for_self(tx)?;
Ok(self)
}
Expand All @@ -504,3 +508,72 @@ impl FundingTx {
Ok(())
}
}

async fn unlock_tx(
balanced_tx: TransactionView,
tx_dep_provider: &dyn TransactionDependencyProvider,
unlockers: &HashMap<ScriptId, Box<dyn ScriptUnlocker>>,
) -> Result<(TransactionView, Vec<ScriptGroup>), UnlockError> {
let ScriptGroups { lock_groups, .. } = gen_script_groups(&balanced_tx, tx_dep_provider).await?;
let mut tx = balanced_tx;
let mut not_unlocked = Vec::new();
for script_group in lock_groups.values() {
let script_id = ScriptId::from(&script_group.script);
let script_args = script_group.script.args().raw_data();
if let Some(unlocker) = unlockers.get(&script_id) {
if unlocker
.is_unlocked_async(&tx, script_group, tx_dep_provider)
.await?
{
tx = unlocker.clear_placeholder_witness(&tx, script_group)?;
} else if unlocker.match_args(script_args.as_ref()) {
tx = unlocker
.unlock_async(&tx, script_group, tx_dep_provider)
.await?;
} else {
not_unlocked.push(script_group.clone());
}
} else {
not_unlocked.push(script_group.clone());
}
}
Ok((tx, not_unlocked))
}

async fn gen_script_groups(
tx: &TransactionView,
tx_dep_provider: &dyn TransactionDependencyProvider,
) -> Result<ScriptGroups, TransactionDependencyError> {
use ckb_types::packed::Byte32;
#[allow(clippy::mutable_key_type)]
let mut lock_groups: HashMap<Byte32, ScriptGroup> = HashMap::default();
#[allow(clippy::mutable_key_type)]
let mut type_groups: HashMap<Byte32, ScriptGroup> = HashMap::default();
for (i, input) in tx.inputs().into_iter().enumerate() {
let output = tx_dep_provider
.get_cell_async(&input.previous_output())
.await?;
let lock_group_entry = lock_groups
.entry(output.calc_lock_hash())
.or_insert_with(|| ScriptGroup::from_lock_script(&output.lock()));
lock_group_entry.input_indices.push(i);
if let Some(t) = &output.type_().to_opt() {
let type_group_entry = type_groups
.entry(t.calc_script_hash())
.or_insert_with(|| ScriptGroup::from_type_script(t));
type_group_entry.input_indices.push(i);
}
}
for (i, output) in tx.outputs().into_iter().enumerate() {
if let Some(t) = &output.type_().to_opt() {
let type_group_entry = type_groups
.entry(t.calc_script_hash())
.or_insert_with(|| ScriptGroup::from_type_script(t));
type_group_entry.output_indices.push(i);
}
}
Ok(ScriptGroups {
lock_groups,
type_groups,
})
}
Loading
Loading