diff --git a/Cargo.lock b/Cargo.lock index dd27b4b5e..06906ea2a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -979,9 +979,9 @@ dependencies = [ [[package]] name = "ckb-sdk" -version = "3.6.0" +version = "3.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "781bd01a7f8880177caad556e7df407b8a6ff428ec7037b01e7c7ac7b2f68e42" +checksum = "d855c4850d01d5d1cacf932fae8c9f56cdc56b8bf4f7f82a80ac82631f05b561" dependencies = [ "anyhow", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index e1c550e7f..a9273acdd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,7 @@ bech32 = "0.9.1" rand = "0.8.5" serde_json = { version = "1.0" } home = "0.5.9" -ckb-sdk = "3.6" +ckb-sdk = "3.7" thiserror = "1.0.58" anyhow = "1.0.81" tentacle = { version = "0.6.6", default-features = false, features = ["upnp", "parking_lot", "openssl-vendored", "tokio-runtime", "tokio-timer", "ws"] } diff --git a/src/ckb/actor.rs b/src/ckb/actor.rs index 7b7cf4fd9..3dd981d22 100644 --- a/src/ckb/actor.rs +++ b/src/ckb/actor.rs @@ -1,4 +1,4 @@ -use ckb_sdk::{CkbRpcClient, RpcError}; +use ckb_sdk::{CkbRpcAsyncClient, RpcError}; use ckb_types::{ core::{tx_pool::TxStatus, TransactionView}, packed, @@ -160,13 +160,11 @@ impl Actor for CkbChainActor { if !reply_port.is_closed() { let context = state.build_funding_context(&request); let exclusion = &mut state.live_cells_exclusion_map; - tokio::task::block_in_place(move || { - let result = tx.fulfill(request, context, exclusion); - if !reply_port.is_closed() { - // ignore error - let _ = reply_port.send(result); - } - }); + let result = tx.fulfill(request, context, exclusion).await; + if !reply_port.is_closed() { + // ignore error + let _ = reply_port.send(result); + } } } CkbChainMessage::AddFundingTx(tx) => { @@ -184,62 +182,56 @@ impl Actor for CkbChainActor { if !reply_port.is_closed() { let secret_key = state.secret_key; let rpc_url = state.config.rpc_url.clone(); - tokio::task::block_in_place(move || { - let result = tx.sign(secret_key, rpc_url); - if !reply_port.is_closed() { - // ignore error - let _ = reply_port.send(result); - } - }); + let result = tx.sign(secret_key, rpc_url).await; + if !reply_port.is_closed() { + // ignore error + let _ = reply_port.send(result); + } } } CkbChainMessage::SendTx(tx, reply_port) => { let rpc_url = state.config.rpc_url.clone(); - tokio::task::block_in_place(move || { - let ckb_client = CkbRpcClient::new(&rpc_url); - let result = match ckb_client.send_transaction(tx.data().into(), None) { - Ok(_) => Ok(()), - Err(err) => { - //FIXME(yukang): RBF or duplicated transaction handling - match err { - RpcError::Rpc(e) - if (e.code.code() == -1107 || e.code.code() == -1111) => - { - tracing::warn!( - "[{}] transaction { } already in pool", - myself.get_name().unwrap_or_default(), - tx.hash(), - ); - Ok(()) - } - _ => { - tracing::error!( - "[{}] send transaction {} failed: {:?}", - myself.get_name().unwrap_or_default(), - tx.hash(), - err - ); - Err(err) - } + let ckb_client = CkbRpcAsyncClient::new(&rpc_url); + let result = match ckb_client.send_transaction(tx.data().into(), None).await { + Ok(_) => Ok(()), + Err(err) => { + //FIXME(yukang): RBF or duplicated transaction handling + match err { + RpcError::Rpc(e) + if (e.code.code() == -1107 || e.code.code() == -1111) => + { + tracing::warn!( + "[{}] transaction { } already in pool", + myself.get_name().unwrap_or_default(), + tx.hash(), + ); + Ok(()) + } + _ => { + tracing::error!( + "[{}] send transaction {} failed: {:?}", + myself.get_name().unwrap_or_default(), + tx.hash(), + err + ); + Err(err) } } - }; - if !reply_port.is_closed() { - // ignore error - let _ = reply_port.send(result); } - }); + }; + if !reply_port.is_closed() { + // ignore error + let _ = reply_port.send(result); + } } CkbChainMessage::GetTx(tx_hash, reply_port) => { let rpc_url = state.config.rpc_url.clone(); - tokio::task::block_in_place(move || { - let ckb_client = CkbRpcClient::new(&rpc_url); - let result = ckb_client.get_transaction(tx_hash.into()); - if !reply_port.is_closed() { - // ignore error - let _ = reply_port.send(result.map(Into::into)); - } - }); + let ckb_client = CkbRpcAsyncClient::new(&rpc_url); + let result = ckb_client.get_transaction(tx_hash.into()).await; + if !reply_port.is_closed() { + // ignore error + let _ = reply_port.send(result.map(Into::into)); + } } CkbChainMessage::CreateTxTracer(tracer) => { debug!( @@ -262,14 +254,13 @@ impl Actor for CkbChainActor { reply_port, ) => { let rpc_url = state.config.rpc_url.clone(); - tokio::task::block_in_place(move || { - let ckb_client = CkbRpcClient::new(&rpc_url); - let _ = reply_port.send( - ckb_client - .get_header(block_hash.into()) - .map(|x| x.map(|x| x.inner.timestamp.into())), - ); - }); + let ckb_client = CkbRpcAsyncClient::new(&rpc_url); + let _ = reply_port.send( + ckb_client + .get_header(block_hash.into()) + .await + .map(|x| x.map(|x| x.inner.timestamp.into())), + ); } CkbChainMessage::Stop => { myself.stop(Some("stop received".to_string())); diff --git a/src/ckb/funding/funding_tx.rs b/src/ckb/funding/funding_tx.rs index 812ba0de5..33fbc4d2c 100644 --- a/src/ckb/funding/funding_tx.rs +++ b/src/ckb/funding/funding_tx.rs @@ -7,11 +7,12 @@ use ckb_sdk::{ traits::{ CellCollector, CellDepResolver, CellQueryOptions, DefaultCellCollector, DefaultCellDepResolver, DefaultHeaderDepResolver, DefaultTransactionDependencyProvider, - HeaderDepResolver, SecpCkbRawKeySigner, TransactionDependencyProvider, ValueRangeOption, + HeaderDepResolver, SecpCkbRawKeySigner, TransactionDependencyError, + TransactionDependencyProvider, ValueRangeOption, }, - tx_builder::{unlock_tx, CapacityBalancer, TxBuilder, TxBuilderError}, - unlock::{ScriptUnlocker, SecpSighashUnlocker}, - CkbRpcClient, ScriptId, + tx_builder::{CapacityBalancer, ScriptGroups, TxBuilder, TxBuilderError}, + unlock::{ScriptUnlocker, SecpSighashUnlocker, UnlockError}, + CkbRpcAsyncClient, ScriptGroup, ScriptId, }; use ckb_types::{ core::{BlockView, Capacity, TransactionView}, @@ -353,7 +354,7 @@ impl FundingTxBuilder { ))); } - fn build( + async fn build( self, live_cells_exclusion_map: &mut LiveCellsExclusionMap, ) -> Result { @@ -379,9 +380,10 @@ impl FundingTxBuilder { self.request.funding_fee_rate, ); - let ckb_client = CkbRpcClient::new(&self.context.rpc_url); + let ckb_client = CkbRpcAsyncClient::new(&self.context.rpc_url); let cell_dep_resolver = ckb_client .get_block_by_number(0.into()) + .await .map_err(FundingError::CkbRpcError)? .and_then(|genesis_block| { DefaultCellDepResolver::from_genesis(&BlockView::from(genesis_block)).ok() @@ -394,20 +396,22 @@ impl FundingTxBuilder { let mut cell_collector = DefaultCellCollector::new(&self.context.rpc_url); let tx_dep_provider = DefaultTransactionDependencyProvider::new(&self.context.rpc_url, 10); - let tip_block_number: u64 = ckb_client.get_tip_block_number()?.into(); + let tip_block_number: u64 = ckb_client.get_tip_block_number().await?.into(); live_cells_exclusion_map.truncate(tip_block_number); live_cells_exclusion_map .apply(&mut cell_collector) .map_err(|err| FundingError::CkbTxBuilderError(TxBuilderError::Other(err.into())))?; - let (tx, _) = self.build_unlocked( - &mut cell_collector, - &cell_dep_resolver, - &header_dep_resolver, - &tx_dep_provider, - &balancer, - &unlockers, - )?; + let tx = self + .build_balanced_async( + &mut cell_collector, + &cell_dep_resolver, + &header_dep_resolver, + &tx_dep_provider, + &balancer, + &unlockers, + ) + .await?; let old_tx_hash = self.funding_tx.tx.as_ref().map(|tx| tx.hash()); let mut funding_tx = self.funding_tx; @@ -443,7 +447,7 @@ impl FundingTx { self.tx } - pub fn fulfill( + pub async fn fulfill( self, request: FundingRequest, context: FundingContext, @@ -454,10 +458,10 @@ impl FundingTx { request, context, }; - builder.build(live_cells_exclusion_map) + builder.build(live_cells_exclusion_map).await } - pub fn sign( + pub async fn sign( mut self, secret_key: secp256k1::SecretKey, rpc_url: String, @@ -487,7 +491,7 @@ impl FundingTx { let tx = self.take().ok_or(FundingError::AbsentTx)?; let tx_dep_provider = DefaultTransactionDependencyProvider::new(&rpc_url, 10); - let (tx, _) = unlock_tx(tx, &tx_dep_provider, &unlockers)?; + let (tx, _) = unlock_tx(tx, &tx_dep_provider, &unlockers).await?; self.update_for_self(tx)?; Ok(self) } @@ -504,3 +508,72 @@ impl FundingTx { Ok(()) } } + +async fn unlock_tx( + balanced_tx: TransactionView, + tx_dep_provider: &dyn TransactionDependencyProvider, + unlockers: &HashMap>, +) -> Result<(TransactionView, Vec), UnlockError> { + let ScriptGroups { lock_groups, .. } = gen_script_groups(&balanced_tx, tx_dep_provider).await?; + let mut tx = balanced_tx; + let mut not_unlocked = Vec::new(); + for script_group in lock_groups.values() { + let script_id = ScriptId::from(&script_group.script); + let script_args = script_group.script.args().raw_data(); + if let Some(unlocker) = unlockers.get(&script_id) { + if unlocker + .is_unlocked_async(&tx, script_group, tx_dep_provider) + .await? + { + tx = unlocker.clear_placeholder_witness(&tx, script_group)?; + } else if unlocker.match_args(script_args.as_ref()) { + tx = unlocker + .unlock_async(&tx, script_group, tx_dep_provider) + .await?; + } else { + not_unlocked.push(script_group.clone()); + } + } else { + not_unlocked.push(script_group.clone()); + } + } + Ok((tx, not_unlocked)) +} + +async fn gen_script_groups( + tx: &TransactionView, + tx_dep_provider: &dyn TransactionDependencyProvider, +) -> Result { + use ckb_types::packed::Byte32; + #[allow(clippy::mutable_key_type)] + let mut lock_groups: HashMap = HashMap::default(); + #[allow(clippy::mutable_key_type)] + let mut type_groups: HashMap = HashMap::default(); + for (i, input) in tx.inputs().into_iter().enumerate() { + let output = tx_dep_provider + .get_cell_async(&input.previous_output()) + .await?; + let lock_group_entry = lock_groups + .entry(output.calc_lock_hash()) + .or_insert_with(|| ScriptGroup::from_lock_script(&output.lock())); + lock_group_entry.input_indices.push(i); + if let Some(t) = &output.type_().to_opt() { + let type_group_entry = type_groups + .entry(t.calc_script_hash()) + .or_insert_with(|| ScriptGroup::from_type_script(t)); + type_group_entry.input_indices.push(i); + } + } + for (i, output) in tx.outputs().into_iter().enumerate() { + if let Some(t) = &output.type_().to_opt() { + let type_group_entry = type_groups + .entry(t.calc_script_hash()) + .or_insert_with(|| ScriptGroup::from_type_script(t)); + type_group_entry.output_indices.push(i); + } + } + Ok(ScriptGroups { + lock_groups, + type_groups, + }) +} diff --git a/src/watchtower/actor.rs b/src/watchtower/actor.rs index e9156890a..04dd0a74d 100644 --- a/src/watchtower/actor.rs +++ b/src/watchtower/actor.rs @@ -6,7 +6,7 @@ use ckb_sdk::{ traits::{CellCollector, CellQueryOptions, DefaultCellCollector, ValueRangeOption}, transaction::builder::FeeCalculator, util::blake160, - CkbRpcClient, RpcError, Since, SinceType, + CkbRpcAsyncClient, RpcError, Since, SinceType, }; use ckb_types::{ self, @@ -131,7 +131,7 @@ where } } } - WatchtowerMessage::PeriodicCheck => self.periodic_check(state), + WatchtowerMessage::PeriodicCheck => self.periodic_check(state).await, } Ok(()) } @@ -141,100 +141,100 @@ impl WatchtowerActor where S: InvoiceStore + WatchtowerStore, { - fn periodic_check(&self, state: &WatchtowerState) { + async fn periodic_check(&self, state: &WatchtowerState) { let secret_key = state.secret_key; let rpc_url = state.config.rpc_url.clone(); - tokio::task::block_in_place(move || { - let mut cell_collector = DefaultCellCollector::new(&rpc_url); + let mut cell_collector = DefaultCellCollector::new(&rpc_url); - for channel_data in self.store.get_watch_channels() { - let ckb_client = CkbRpcClient::new(&rpc_url); - let search_key = SearchKey { - script: channel_data.funding_tx_lock.clone().into(), - script_type: ScriptType::Lock, - script_search_mode: Some(SearchMode::Exact), - with_data: None, - filter: None, - group_by_transaction: None, - }; - // we need two parties' signatures to unlock the funding tx, so we can check the last one transaction only to see if it's an old version commitment tx - match ckb_client.get_transactions(search_key, Order::Desc, 1u32.into(), None) { - Ok(txs) => { - if let Some(Tx::Ungrouped(tx)) = txs.objects.first() { - if matches!(tx.io_type, CellType::Input) { - match ckb_client.get_transaction(tx.tx_hash.clone()) { - Ok(Some(tx_with_status)) => { - if tx_with_status.tx_status.status != Status::Committed { - error!("Cannot find the commitment tx: {:?}, status is {:?}, maybe ckb indexer bug?", tx_with_status.tx_status.status, tx.tx_hash); - } else if let Some(tx) = tx_with_status.transaction { - match tx.inner { - Either::Left(tx) => { - let tx: Transaction = tx.inner.into(); - if tx.raw().outputs().len() == 1 { - let output = tx - .raw() - .outputs() - .get(0) - .expect("get output 0 of tx"); - let commitment_lock = output.lock(); - let lock_args = - commitment_lock.args().raw_data(); - let pub_key_hash: [u8; 20] = lock_args - [0..20] + for channel_data in self.store.get_watch_channels() { + let ckb_client = CkbRpcAsyncClient::new(&rpc_url); + let search_key = SearchKey { + script: channel_data.funding_tx_lock.clone().into(), + script_type: ScriptType::Lock, + script_search_mode: Some(SearchMode::Exact), + with_data: None, + filter: None, + group_by_transaction: None, + }; + // we need two parties' signatures to unlock the funding tx, so we can check the last one transaction only to see if it's an old version commitment tx + match ckb_client + .get_transactions(search_key, Order::Desc, 1u32.into(), None) + .await + { + Ok(txs) => { + if let Some(Tx::Ungrouped(tx)) = txs.objects.first() { + if matches!(tx.io_type, CellType::Input) { + match ckb_client.get_transaction(tx.tx_hash.clone()).await { + Ok(Some(tx_with_status)) => { + if tx_with_status.tx_status.status != Status::Committed { + error!("Cannot find the commitment tx: {:?}, status is {:?}, maybe ckb indexer bug?", tx_with_status.tx_status.status, tx.tx_hash); + } else if let Some(tx) = tx_with_status.transaction { + match tx.inner { + Either::Left(tx) => { + let tx: Transaction = tx.inner.into(); + if tx.raw().outputs().len() == 1 { + let output = tx + .raw() + .outputs() + .get(0) + .expect("get output 0 of tx"); + let commitment_lock = output.lock(); + let lock_args = + commitment_lock.args().raw_data(); + let pub_key_hash: [u8; 20] = lock_args[0..20] + .try_into() + .expect("checked length"); + let commitment_number = u64::from_be_bytes( + lock_args[28..36] .try_into() - .expect("checked length"); - let commitment_number = u64::from_be_bytes( - lock_args[28..36] - .try_into() - .expect("u64 from slice"), - ); + .expect("u64 from slice"), + ); - if blake160( - &channel_data - .remote_settlement_data - .x_only_aggregated_pubkey, - ) - .0 == pub_key_hash - { - match channel_data - .revocation_data - .clone() + if blake160( + &channel_data + .remote_settlement_data + .x_only_aggregated_pubkey, + ) + .0 == pub_key_hash + { + match channel_data.revocation_data.clone() { + Some(revocation_data) + if revocation_data + .commitment_number + >= commitment_number => { - Some(revocation_data) - if revocation_data - .commitment_number - >= commitment_number => - { - let commitment_tx_out_point = - OutPoint::new( - tx.calc_tx_hash(), - 0, - ); - match ckb_client.get_live_cell( + let commitment_tx_out_point = + OutPoint::new( + tx.calc_tx_hash(), + 0, + ); + match ckb_client + .get_live_cell( commitment_tx_out_point .clone() .into(), false, - ) { - Ok(cell_with_status) => { - if cell_with_status - .status - == "live" - { - warn!("Found an old version commitment tx submitted by remote: {:#x}", tx.calc_tx_hash()); - match build_revocation_tx( + ) + .await + { + Ok(cell_with_status) => { + if cell_with_status.status + == "live" + { + warn!("Found an old version commitment tx submitted by remote: {:#x}", tx.calc_tx_hash()); + match build_revocation_tx( commitment_tx_out_point, revocation_data, secret_key, &mut cell_collector, - ) { + ).await { Ok(tx) => { match ckb_client .send_transaction( tx.data() .into(), None, - ) { + ).await { Ok(tx_hash) => { info!("Revocation tx: {:?} sent, tx_hash: {:?}", tx, tx_hash); } @@ -247,81 +247,80 @@ where error!("Failed to build revocation tx: {:?}", err); } } - } - } - Err(err) => { - error!("Failed to get live cell: {:?}", err); } } - } - _ => { - try_settle_commitment_tx( - commitment_lock, - ckb_client, - channel_data - .remote_settlement_data - .clone(), - true, - secret_key, - &mut cell_collector, - &self.store, - ); + Err(err) => { + error!("Failed to get live cell: {:?}", err); + } } } - } else { - try_settle_commitment_tx( - commitment_lock, - ckb_client, - channel_data - .local_settlement_data - .clone() - .expect( - "local settlement data", - ), - false, - secret_key, - &mut cell_collector, - &self.store, - ); + _ => { + try_settle_commitment_tx( + commitment_lock, + ckb_client, + channel_data + .remote_settlement_data + .clone(), + true, + secret_key, + &mut cell_collector, + &self.store, + ) + .await; + } } } else { - // there may be a race condition that PeriodicCheck is triggered before the remove_channel fn is called - // it's a close channel tx, ignore + try_settle_commitment_tx( + commitment_lock, + ckb_client, + channel_data + .local_settlement_data + .clone() + .expect("local settlement data"), + false, + secret_key, + &mut cell_collector, + &self.store, + ) + .await; } + } else { + // there may be a race condition that PeriodicCheck is triggered before the remove_channel fn is called + // it's a close channel tx, ignore } - Either::Right(_tx) => { - // unreachable, ignore - } } - } else { - error!("Cannot find the commitment tx: {:?}, transaction is none, maybe ckb indexer bug?", tx.tx_hash); + Either::Right(_tx) => { + // unreachable, ignore + } } + } else { + error!("Cannot find the commitment tx: {:?}, transaction is none, maybe ckb indexer bug?", tx.tx_hash); } - Ok(None) => { - error!("Cannot find the commitment tx: {:?}, maybe ckb indexer bug?", tx.tx_hash); - } - Err(err) => { - error!("Failed to get funding tx: {:?}", err); - } + } + Ok(None) => { + error!("Cannot find the commitment tx: {:?}, maybe ckb indexer bug?", tx.tx_hash); + } + Err(err) => { + error!("Failed to get funding tx: {:?}", err); } } } } - Err(err) => { - error!("Failed to get transactions: {:?}", err); - } + } + Err(err) => { + error!("Failed to get transactions: {:?}", err); } } - }); + } } } -fn build_revocation_tx( +async fn build_revocation_tx( commitment_tx_out_point: OutPoint, revocation_data: RevocationData, secret_key: SecretKey, cell_collector: &mut DefaultCellCollector, -) -> Result> { +) -> Result { let witness = [ XUDT_COMPATIBLE_WITNESS.to_vec(), vec![0xFF], @@ -377,7 +376,9 @@ fn build_revocation_tx( query.secondary_script_len_range = Some(ValueRangeOption::new_exact(0)); query.data_len_range = Some(ValueRangeOption::new_exact(0)); query.min_total_capacity = min_total_capacity; - let (cells, total_capacity) = cell_collector.collect_live_cells(&query, false)?; + let (cells, total_capacity) = cell_collector + .collect_live_cells_async(&query, false) + .await?; debug!( "cells len: {}, total_capacity: {}", cells.len(), @@ -413,12 +414,12 @@ fn build_revocation_tx( } } - Err(Box::new(RpcError::Other(anyhow!("Not enough capacity")))) + Err(RpcError::Other(anyhow!("Not enough capacity")).into()) } -fn try_settle_commitment_tx( +async fn try_settle_commitment_tx( commitment_lock: Script, - ckb_client: CkbRpcClient, + ckb_client: CkbRpcAsyncClient, settlement_data: SettlementData, for_remote: bool, secret_key: SecretKey, @@ -439,10 +440,13 @@ fn try_settle_commitment_tx( group_by_transaction: Some(true), }; - find_preimages(search_key.clone(), &ckb_client, store); + find_preimages(search_key.clone(), &ckb_client, store).await; - let (current_epoch, current_time) = match ckb_client.get_tip_header() { - Ok(tip_header) => match ckb_client.get_block_median_time(tip_header.hash.clone()) { + let (current_epoch, current_time) = match ckb_client.get_tip_header().await { + Ok(tip_header) => match ckb_client + .get_block_median_time(tip_header.hash.clone()) + .await + { Ok(Some(median_time)) => { let tip_header: HeaderView = tip_header.into(); let epoch = tip_header.epoch(); @@ -469,12 +473,15 @@ fn try_settle_commitment_tx( // however, an attacker may create a lot of cells to implement a tx pinning attack, we have to use loop to get all cells let mut after = None; loop { - match ckb_client.get_cells( - search_key.clone(), - Order::Desc, - 100u32.into(), - after.clone(), - ) { + match ckb_client + .get_cells( + search_key.clone(), + Order::Desc, + 100u32.into(), + after.clone(), + ) + .await + { Ok(cells) => { if cells.objects.is_empty() { break; @@ -510,7 +517,7 @@ fn try_settle_commitment_tx( } let cell_header: HeaderView = - match ckb_client.get_header_by_number(cell.block_number) { + match ckb_client.get_header_by_number(cell.block_number).await { Ok(Some(header)) => header.into(), Ok(None) => { error!("Cannot find header: {}", cell.block_number); @@ -527,7 +534,7 @@ fn try_settle_commitment_tx( "Found a force closed commitment tx with pending tlcs: {:#x}", commitment_tx_hash ); - match ckb_client.get_transaction(commitment_tx_hash.clone()) { + match ckb_client.get_transaction(commitment_tx_hash.clone()).await { Ok(Some(tx_with_status)) => { if tx_with_status.tx_status.status != Status::Committed { error!("Cannot find the commitment tx: {:?}, status is {:?}, maybe ckb indexer bug?", tx_with_status.tx_status.status, commitment_tx_hash); @@ -584,9 +591,12 @@ fn try_settle_commitment_tx( current_time, current_epoch, store, - ) { + ) + .await + { Ok(Some(tx)) => match ckb_client .send_transaction(tx.data().into(), None) + .await { Ok(tx_hash) => { info!("Settlement tx for pending tlcs: {:?} sent, tx_hash: {:#x}", tx, tx_hash); @@ -638,22 +648,26 @@ fn try_settle_commitment_tx( settlement_data.clone(), secret_key, cell_collector, - ) { - Ok(tx) => match ckb_client.send_transaction(tx.data().into(), None) - { - Ok(tx_hash) => { - info!( - "Settlement tx: {:?} sent, tx_hash: {:#x}", - tx, tx_hash - ); - } - Err(err) => { - error!( - "Failed to send settlement tx: {:?}, error: {:?}", - tx, err - ); + ) + .await + { + Ok(tx) => { + match ckb_client.send_transaction(tx.data().into(), None).await + { + Ok(tx_hash) => { + info!( + "Settlement tx: {:?} sent, tx_hash: {:#x}", + tx, tx_hash + ); + } + Err(err) => { + error!( + "Failed to send settlement tx: {:?}, error: {:?}", + tx, err + ); + } } - }, + } Err(err) => { error!("Failed to build settlement tx: {:?}", err); } @@ -670,22 +684,29 @@ fn try_settle_commitment_tx( } // find all on-chain transactions with the preimage and store them -fn find_preimages(search_key: SearchKey, ckb_client: &CkbRpcClient, store: &S) { +async fn find_preimages( + search_key: SearchKey, + ckb_client: &CkbRpcAsyncClient, + store: &S, +) { let mut after = None; loop { - match ckb_client.get_transactions( - search_key.clone(), - Order::Desc, - 100u32.into(), - after.clone(), - ) { + match ckb_client + .get_transactions( + search_key.clone(), + Order::Desc, + 100u32.into(), + after.clone(), + ) + .await + { Ok(txs) => { if txs.objects.is_empty() { break; } after = Some(txs.last_cursor.clone()); for tx in txs.objects { - match ckb_client.get_transaction(tx.tx_hash()) { + match ckb_client.get_transaction(tx.tx_hash()).await { Ok(Some(tx_with_status)) => { if tx_with_status.tx_status.status != Status::Committed { error!("Cannot find the tx: {:?}, status is {:?}, maybe ckb indexer bug?", tx_with_status.tx_status.status, tx.tx_hash()); @@ -753,13 +774,13 @@ fn find_preimages(search_key: SearchKey, ckb_client: &CkbRpcCli } } -fn build_settlement_tx( +async fn build_settlement_tx( commitment_tx_out_point: OutPoint, since: u64, settlement_data: SettlementData, secret_key: SecretKey, cell_collector: &mut DefaultCellCollector, -) -> Result> { +) -> Result { let pubkey = PublicKey::from_secret_key(&Secp256k1::new(), &secret_key); let args = blake160(pubkey.serialize().as_ref()); let fee_provider_lock_script = get_script_by_contract(Contract::Secp256k1Lock, args.as_bytes()); @@ -822,7 +843,9 @@ fn build_settlement_tx( query.secondary_script_len_range = Some(ValueRangeOption::new_exact(0)); query.data_len_range = Some(ValueRangeOption::new_exact(0)); query.min_total_capacity = min_total_capacity; - let (cells, total_capacity) = cell_collector.collect_live_cells(&query, true)?; + let (cells, total_capacity) = cell_collector + .collect_live_cells_async(&query, true) + .await?; debug!( "cells len: {}, total_capacity: {}", cells.len(), @@ -856,13 +879,10 @@ fn build_settlement_tx( } } - Err(Box::new(RpcError::Other(anyhow!("Not enough capacity")))) + Err(RpcError::Other(anyhow!("Not enough capacity")).into()) } -fn sign_tx( - tx: TransactionView, - secret_key: SecretKey, -) -> Result> { +fn sign_tx(tx: TransactionView, secret_key: SecretKey) -> Result { let tx = tx.data(); let witness = tx.witnesses().get(1).expect("get witness at index 1"); let mut blake2b = new_blake2b(); @@ -894,7 +914,7 @@ fn sign_tx_with_settlement( tx: TransactionView, change_secret_key: SecretKey, settlement_secret_key: SecretKey, -) -> Result> { +) -> Result { let tx = tx.data().into_view(); let message = compute_tx_message(&tx); @@ -940,7 +960,7 @@ fn sign_tx_with_settlement( } #[allow(clippy::too_many_arguments)] -fn build_settlement_tx_for_pending_tlcs( +async fn build_settlement_tx_for_pending_tlcs( commitment_tx_cell: Cell, cell_header: HeaderView, delay_epoch: EpochNumberWithFraction, @@ -952,7 +972,7 @@ fn build_settlement_tx_for_pending_tlcs( current_time: u64, current_epoch: EpochNumberWithFraction, store: &S, -) -> Result, Box> { +) -> Result, anyhow::Error> { let settlement_tlc: Option<(usize, SettlementTlc, Option)> = match pending_tlcs.clone() { Some(pending_tlcs) => pending_tlcs @@ -1144,7 +1164,9 @@ fn build_settlement_tx_for_pending_tlcs( if min_total_capacity > 0 { query.min_total_capacity = min_total_capacity; } - let (cells, total_capacity) = cell_collector.collect_live_cells(&query, false)?; + let (cells, total_capacity) = cell_collector + .collect_live_cells_async(&query, false) + .await?; debug!( "cells len: {}, total_capacity: {}", cells.len(), @@ -1182,7 +1204,7 @@ fn build_settlement_tx_for_pending_tlcs( } } - Err(Box::new(RpcError::Other(anyhow!("Not enough capacity")))) + Err(RpcError::Other(anyhow!("Not enough capacity")).into()) } else { let amount = u128::from_le_bytes( commitment_tx_cell.output_data.unwrap().as_bytes()[0..16] @@ -1266,7 +1288,9 @@ fn build_settlement_tx_for_pending_tlcs( query.secondary_script_len_range = Some(ValueRangeOption::new_exact(0)); query.data_len_range = Some(ValueRangeOption::new_exact(0)); query.min_total_capacity = min_total_capacity; - let (cells, total_capacity) = cell_collector.collect_live_cells(&query, false)?; + let (cells, total_capacity) = cell_collector + .collect_live_cells_async(&query, false) + .await?; debug!( "cells len: {}, total_capacity: {}", cells.len(), @@ -1304,7 +1328,7 @@ fn build_settlement_tx_for_pending_tlcs( } } - Err(Box::new(RpcError::Other(anyhow!("Not enough capacity")))) + Err(RpcError::Other(anyhow!("Not enough capacity")).into()) } } else if cell_header.epoch().to_rational() + delay_epoch.to_rational() > current_epoch.to_rational() @@ -1331,7 +1355,8 @@ fn build_settlement_tx_for_pending_tlcs( settlement_data, secret_key, cell_collector, - )?; + ) + .await?; Ok(Some(tx)) } }