Skip to content

Commit 08c403e

Browse files
committed
feat(eth): Implement transaction replacement (speed up/cancel)
This commit introduces the ability to replace pending Ethereum-based transactions. Users can now use the `replace_transaction` RPC to either "speed up" a transaction by re-broadcasting it with a higher fee, or "cancel" it by sending a zero-value transaction to themselves with the same nonce. Key changes include: - A new `replace_transaction` method on the `MmCoin` trait, with a concrete implementation for `EthCoin`. - The `EthWithdraw` logic has been updated to allow for nonce overriding, which is essential for replacing transactions. - An in-memory `local_tx_cache` has been added to `EthCoin` to enable replacement of transactions that have not yet been propagated to RPC nodes. - A new `replace_transaction` RPC endpoint is exposed through the dispatcher. refactor(coins): Centralize HD address scanning logic As part of this work, the `scan_for_new_addresses` function has been refactored from a required trait method into a default implementation on the `HDWalletBalanceOps` trait. This change simplifies the codebase by removing redundant address scanning logic from multiple UTXO-based coins (`BchCoin`, `QtumCoin`, `UtxoStandardCoin`) and `EthCoin`, making them rely on the new generic implementation. This also involved adding a `bip44_chains` method to specify which chains to scan.
1 parent 5c8a4b1 commit 08c403e

15 files changed

Lines changed: 494 additions & 216 deletions

File tree

mm2src/coins/coin_balance.rs

Lines changed: 81 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -276,15 +276,93 @@ pub trait HDWalletBalanceOps: HDWalletCoinOps {
276276
where
277277
XPubExtractor: HDXPubExtractor + Send;
278278

279-
/// Scans for the new addresses of the specified `hd_account` using the given `address_scanner`.
280-
/// Returns balances of the new addresses.
279+
/// Scans for new, used addresses for the specified `hd_account` and returns their balances.
280+
///
281+
/// The function begins scanning from the last known address index, using the provided
282+
/// `address_scanner` to check for any transaction history. This process continues until
283+
/// `gap_limit` consecutive unused addresses are found, which is a standard
284+
/// heuristic to determine that there are likely no more used addresses to discover.
285+
///
286+
/// Returns a `Vec` containing the balances for any newly discovered used addresses.
281287
async fn scan_for_new_addresses(
282288
&self,
283289
hd_wallet: &Self::HDWallet,
284290
hd_account: &mut HDCoinHDAccount<Self>,
285291
address_scanner: &Self::HDAddressScanner,
286292
gap_limit: u32,
287-
) -> BalanceResult<Vec<HDAddressBalance<Self::BalanceObject>>>;
293+
) -> BalanceResult<Vec<HDAddressBalance<Self::BalanceObject>>> {
294+
let mut all_balances = Vec::new();
295+
for chain in self.bip44_chains() {
296+
let mut chain_balances = Vec::with_capacity(gap_limit as usize);
297+
298+
// Get the first address index that we haven't scanned yet for this chain.
299+
let mut checking_address_id = hd_account
300+
.known_addresses_number(chain)
301+
.mm_err(|e| BalanceError::Internal(e.to_string()))?;
302+
303+
let mut unused_addresses_counter = 0;
304+
let max_addresses_number = hd_account.address_limit();
305+
306+
// Scan addresses until we hit the gap limit or the maximum address limit.
307+
while checking_address_id < max_addresses_number && unused_addresses_counter <= gap_limit {
308+
let hd_address = self
309+
.derive_address(hd_account, chain, checking_address_id)
310+
.await
311+
.map_mm_err()?;
312+
let checking_address = hd_address.address();
313+
let checking_address_der_path = hd_address.derivation_path();
314+
315+
match self.is_address_used(&checking_address, address_scanner).await? {
316+
// If the address has been used, add it and any preceding empty addresses to our list.
317+
AddressBalanceStatus::Used(non_empty_balance) => {
318+
let last_non_empty_address_id = checking_address_id - unused_addresses_counter;
319+
320+
// Add all the empty addresses we skipped over.
321+
let address_ids = (last_non_empty_address_id..checking_address_id)
322+
.map(|address_id| HDAddressId { chain, address_id });
323+
let empty_addresses = self
324+
.derive_addresses(hd_account, address_ids)
325+
.await
326+
.map_mm_err()?
327+
.into_iter()
328+
.map(|empty_address| HDAddressBalance {
329+
address: empty_address.address().display_address(),
330+
derivation_path: RpcDerivationPath(empty_address.derivation_path().clone()),
331+
chain,
332+
balance: Self::BalanceObject::new(),
333+
});
334+
chain_balances.extend(empty_addresses);
335+
336+
// Add the used address we just found.
337+
chain_balances.push(HDAddressBalance {
338+
address: checking_address.display_address(),
339+
derivation_path: RpcDerivationPath(checking_address_der_path.clone()),
340+
chain,
341+
balance: non_empty_balance,
342+
});
343+
344+
// Reset the counter since we found a used address.
345+
unused_addresses_counter = 0;
346+
},
347+
AddressBalanceStatus::NotUsed => unused_addresses_counter += 1,
348+
}
349+
checking_address_id += 1;
350+
}
351+
352+
// Update the wallet's state with the new number of known addresses.
353+
self.set_known_addresses_number(
354+
hd_wallet,
355+
hd_account,
356+
chain,
357+
checking_address_id - unused_addresses_counter,
358+
)
359+
.await
360+
.map_mm_err()?;
361+
362+
all_balances.extend(chain_balances);
363+
}
364+
Ok(all_balances)
365+
}
288366

289367
/// Requests balances of every activated HD account.
290368
async fn all_accounts_balances(

mm2src/coins/eth.rs

Lines changed: 205 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,10 @@ use crate::rpc_command::init_scan_for_new_addresses::{InitScanAddressesRpcOps, S
4747
use crate::rpc_command::init_withdraw::{InitWithdrawCoin, WithdrawTaskHandleShared};
4848
use crate::rpc_command::{account_balance, get_new_address, init_account_balance, init_create_account,
4949
init_scan_for_new_addresses};
50-
use crate::{coin_balance, scan_for_new_addresses_impl, BalanceResult, CoinWithDerivationMethod, DerivationMethod,
51-
DexFee, Eip1559Ops, MakerNftSwapOpsV2, ParseCoinAssocTypes, ParseNftAssocTypes, PayForGasParams,
52-
PrivKeyPolicy, RpcCommonOps, SendNftMakerPaymentArgs, SpendNftMakerPaymentArgs, ToBytes,
53-
ValidateNftMakerPaymentArgs, ValidateWatcherSpendInput, WatcherSpendType};
50+
use crate::{coin_balance, BalanceResult, CoinWithDerivationMethod, DerivationMethod, DexFee, Eip1559Ops,
51+
MakerNftSwapOpsV2, ParseCoinAssocTypes, ParseNftAssocTypes, PayForGasParams, PrivKeyPolicy, RpcCommonOps,
52+
SendNftMakerPaymentArgs, SpendNftMakerPaymentArgs, ToBytes, ValidateNftMakerPaymentArgs,
53+
ValidateWatcherSpendInput, WatcherSpendType};
5454
use async_trait::async_trait;
5555
use bitcrypto::{dhash160, keccak256, ripemd160, sha256};
5656
use common::custom_futures::repeatable::{Ready, Retry, RetryOnError};
@@ -62,7 +62,7 @@ use common::number_type_casting::SafeTypeCastingNumbers;
6262
use common::wait_until_sec;
6363
use common::{now_sec, small_rng, DEX_FEE_ADDR_RAW_PUBKEY};
6464
use crypto::privkey::key_pair_from_secret;
65-
use crypto::{Bip44Chain, CryptoCtx, CryptoCtxError, GlobalHDAccountArc, KeyPairPolicy};
65+
use crypto::{Bip44Chain, CryptoCtx, CryptoCtxError, GlobalHDAccountArc, KeyPairPolicy, StandardHDPath};
6666
use derive_more::Display;
6767
use enum_derives::EnumFromStringify;
6868

@@ -899,6 +899,10 @@ pub struct EthCoinImpl {
899899
pub(crate) gas_limit: EthGasLimit,
900900
/// Config provided gas limits v2 for swap v2 transactions
901901
pub(crate) gas_limit_v2: EthGasLimitV2,
902+
/// A local cache for transactions sent from this wallet. Only kept in memory for a running KDF instance.
903+
/// This allows replacing a transaction even if it was sent through a private node
904+
/// and is not yet publicly visible.
905+
local_tx_cache: Arc<AsyncMutex<HashMap<H256, BytesJson>>>,
902906
/// This spawner is used to spawn coin's related futures that should be aborted on coin deactivation
903907
/// and on [`MmArc::stop`].
904908
pub abortable_system: AbortableQueue,
@@ -5848,6 +5852,200 @@ impl MmCoin for EthCoin {
58485852
Box::new(get_tx_hex_by_hash_impl(self.clone(), tx_hash).boxed().compat())
58495853
}
58505854

5855+
async fn replace_transaction(
5856+
&self,
5857+
tx_hash: String,
5858+
fee: WithdrawFee,
5859+
action: ReplacementAction,
5860+
broadcast: bool,
5861+
) -> Result<TransactionDetails, MmError<ReplaceTxError>> {
5862+
// 1. Try to fetch the transaction from the local cache first.
5863+
// This allows replacing transactions sent via a private node
5864+
// and are not yet publicly visible.
5865+
let tx_hash_h256 = H256::from_str(&tx_hash).map_to_mm(|e| ReplaceTxError::InvalidTxHash(e.to_string()))?;
5866+
let tx_hex_from_cache = self.local_tx_cache.lock().await.get(&tx_hash_h256).cloned();
5867+
5868+
let original_tx_raw = if let Some(tx_hex) = tx_hex_from_cache {
5869+
RawTransactionRes { tx_hex }
5870+
} else {
5871+
// 2. If not in the cache, fall back to fetching from the RPC node.
5872+
self.get_raw_transaction(RawTransactionRequest {
5873+
coin: self.ticker().to_string(),
5874+
tx_hash: tx_hash.clone(),
5875+
})
5876+
.compat()
5877+
.await
5878+
.map_err(|e| ReplaceTxError::TxNotFound(e.to_string()))?
5879+
};
5880+
5881+
let original_tx: UnverifiedTransactionWrapper =
5882+
rlp::decode(&original_tx_raw.tx_hex.0).map_to_mm(|e| ReplaceTxError::TxDecodeError(e.to_string()))?;
5883+
5884+
let signed_original_tx =
5885+
SignedEthTx::new(original_tx.clone()).map_to_mm(|e| ReplaceTxError::TxDecodeError(e.to_string()))?;
5886+
5887+
let from_addr = signed_original_tx.sender();
5888+
let nonce_to_replace = original_tx.unsigned().nonce();
5889+
5890+
let from = match self.derivation_method() {
5891+
DerivationMethod::SingleAddress(my_address) => {
5892+
if *my_address != from_addr {
5893+
return MmError::err(ReplaceTxError::TxNotSentFromAddress {
5894+
address: from_addr.display_address(),
5895+
});
5896+
}
5897+
None
5898+
},
5899+
DerivationMethod::HDWallet(hd_wallet) => {
5900+
let hd_address = self
5901+
.find_wallet_address(hd_wallet, &from_addr)
5902+
.await
5903+
.map_err(|e| ReplaceTxError::InternalError(e.to_string()))?
5904+
.ok_or_else(|| {
5905+
MmError::new(ReplaceTxError::TxNotSentFromAddress {
5906+
address: from_addr.display_address(),
5907+
})
5908+
})?;
5909+
5910+
let standard_path =
5911+
StandardHDPath::from_str(&hd_address.derivation_path().to_string()).map_to_mm(|e| {
5912+
ReplaceTxError::InternalError(format!(
5913+
"Invalid HD path for the original transaction's sender address: {:?}",
5914+
e,
5915+
))
5916+
})?;
5917+
5918+
Some(HDAddressSelector::AddressId(standard_path.into()))
5919+
},
5920+
};
5921+
5922+
// Check if the transaction is a swap contract interaction and disallow replacement if so.
5923+
// This is crucial because the counterparty in a swap would not be aware of the new transaction hash.
5924+
// Note: This check is a temporary measure until we implement a proper replacement mechanism for swaps.
5925+
// Replacing spending transactions from swap contracts is allowed though.
5926+
if let Call(to_addr) = original_tx.unsigned().action() {
5927+
let is_swap_v1 = *to_addr == self.swap_contract_address || self.fallback_swap_contract == Some(*to_addr);
5928+
let is_swap_v2 = self.swap_v2_contracts.map_or(false, |c| {
5929+
c.maker_swap_v2_contract == *to_addr
5930+
|| c.taker_swap_v2_contract == *to_addr
5931+
|| c.nft_maker_swap_v2_contract == *to_addr
5932+
});
5933+
5934+
if is_swap_v1 || is_swap_v2 {
5935+
return MmError::err(ReplaceTxError::NotSupported(
5936+
"Replacing transactions for swaps is not yet supported.".to_string(),
5937+
));
5938+
}
5939+
}
5940+
5941+
// 3. Determine parameters for the new transaction.
5942+
let (to_addr_str, amount, max) = match action {
5943+
ReplacementAction::SpeedUp => {
5944+
let to_addr = match original_tx.unsigned().action() {
5945+
Call(addr) => addr,
5946+
Create => {
5947+
return MmError::err(ReplaceTxError::NotSupported(
5948+
"Speeding up contract creation is not yet supported.".to_string(),
5949+
))
5950+
},
5951+
};
5952+
5953+
match self.coin_type {
5954+
EthCoinType::Eth => {
5955+
let original_amount_wei = original_tx.unsigned().value();
5956+
5957+
// Try to estimate the new fee to see if we have enough balance.
5958+
let details_res = get_eth_gas_details_from_withdraw_fee(
5959+
self,
5960+
Some(fee.clone()),
5961+
original_amount_wei,
5962+
original_tx.unsigned().data().clone().into(),
5963+
from_addr,
5964+
*to_addr,
5965+
false, // We don't know if it's a max withdrawal yet
5966+
)
5967+
.await;
5968+
5969+
match details_res {
5970+
Ok(_) => {
5971+
// If successful, it means there's enough balance to cover the original amount and the new fee.
5972+
let amount_dec = u256_to_big_decimal(original_amount_wei, self.decimals())
5973+
.mm_err(|e| ReplaceTxError::InternalError(e.to_string()))?;
5974+
(to_addr.display_address(), amount_dec, false)
5975+
},
5976+
Err(e) => match e.into_inner() {
5977+
// This error indicates we don't have enough funds for the original amount + new fee.
5978+
// This is a strong indicator that the original transaction was a "max" withdrawal.
5979+
// In this case, we switch to a max withdrawal for the replacement transaction.
5980+
EthGasDetailsErr::AmountTooLow { .. } => {
5981+
(to_addr.display_address(), BigDecimal::from(0), true)
5982+
},
5983+
// For any other error, we propagate it.
5984+
other => {
5985+
let withdraw_error: WithdrawError = other.into();
5986+
return Err(withdraw_error.into());
5987+
},
5988+
},
5989+
}
5990+
},
5991+
EthCoinType::Erc20 { token_addr, .. } => {
5992+
if to_addr != &token_addr {
5993+
return MmError::err(ReplaceTxError::NotSupported(
5994+
"Transaction does not belong to this ERC20 token.".to_string(),
5995+
));
5996+
}
5997+
let function = ERC20_CONTRACT
5998+
.function("transfer")
5999+
.map_to_mm(|e| ReplaceTxError::TxDecodeError(e.to_string()))?;
6000+
let tokens = function
6001+
.decode_input(&original_tx.unsigned().data()[4..])
6002+
.map_to_mm(|e| ReplaceTxError::TxDecodeError(e.to_string()))?;
6003+
6004+
let recipient_addr =
6005+
tokens.first().and_then(|t| t.clone().into_address()).ok_or_else(|| {
6006+
MmError::new(ReplaceTxError::TxDecodeError("Couldn't decode recipient".into()))
6007+
})?;
6008+
let token_amount = tokens.get(1).and_then(|t| t.clone().into_uint()).ok_or_else(|| {
6009+
MmError::new(ReplaceTxError::TxDecodeError("Couldn't decode amount".into()))
6010+
})?;
6011+
6012+
let amount_dec = u256_to_big_decimal(token_amount, self.decimals())
6013+
.map_err(|e| MmError::new(ReplaceTxError::InternalError(e.to_string())))?;
6014+
6015+
(recipient_addr.display_address(), amount_dec, false)
6016+
},
6017+
// Todo: Handle NFT transactions
6018+
EthCoinType::Nft { .. } => {
6019+
return MmError::err(ReplaceTxError::NotSupported(
6020+
"Speeding up NFT transactions is not yet supported.".to_string(),
6021+
));
6022+
},
6023+
}
6024+
},
6025+
ReplacementAction::Cancel => (from_addr.display_address(), 0.into(), false),
6026+
};
6027+
6028+
// 4. Create the WithdrawRequest.
6029+
let withdraw_req = WithdrawRequest {
6030+
coin: self.ticker().to_string(),
6031+
to: to_addr_str,
6032+
amount,
6033+
from,
6034+
max,
6035+
fee: Some(fee),
6036+
memo: None,
6037+
broadcast,
6038+
ibc_source_channel: None,
6039+
};
6040+
6041+
// 5. Use the builder pattern to set the nonce and build the transaction.
6042+
let mut withdraw_flow = StandardEthWithdraw::new(self.clone(), withdraw_req).map_mm_err()?;
6043+
withdraw_flow.set_nonce_override(nonce_to_replace);
6044+
6045+
let result = withdraw_flow.build().await.map_mm_err()?;
6046+
Ok(result)
6047+
}
6048+
58516049
fn withdraw(&self, req: WithdrawRequest) -> WithdrawFut {
58526050
Box::new(Box::pin(withdraw_impl(self.clone(), req)).compat())
58536051
}
@@ -6673,6 +6871,7 @@ pub async fn eth_coin_from_conf_and_request(
66736871
nfts_infos: Default::default(),
66746872
gas_limit,
66756873
gas_limit_v2,
6874+
local_tx_cache: Arc::new(AsyncMutex::new(HashMap::new())),
66766875
abortable_system,
66776876
};
66786877

@@ -7540,6 +7739,7 @@ impl EthCoin {
75407739
nfts_infos: Arc::clone(&self.nfts_infos),
75417740
gas_limit: EthGasLimit::default(),
75427741
gas_limit_v2: EthGasLimitV2::default(),
7742+
local_tx_cache: Arc::new(AsyncMutex::new(HashMap::new())),
75437743
abortable_system: self.abortable_system.create_subsystem().unwrap(),
75447744
};
75457745
EthCoin(Arc::new(coin))

mm2src/coins/eth/eth_hd_wallet.rs

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ impl ExtractExtendedPubkey for EthCoin {
3737
impl HDWalletCoinOps for EthCoin {
3838
type HDWallet = EthHDWallet;
3939

40+
fn bip44_chains(&self) -> Vec<Bip44Chain> { vec![Bip44Chain::External] }
41+
4042
fn address_from_extended_pubkey(
4143
&self,
4244
extended_pubkey: &Secp256k1ExtendedPublicKey,
@@ -110,24 +112,6 @@ impl HDWalletBalanceOps for EthCoin {
110112
coin_balance::common_impl::enable_hd_wallet(self, hd_wallet, xpub_extractor, params, path_to_address).await
111113
}
112114

113-
async fn scan_for_new_addresses(
114-
&self,
115-
hd_wallet: &Self::HDWallet,
116-
hd_account: &mut EthHDAccount,
117-
address_scanner: &Self::HDAddressScanner,
118-
gap_limit: u32,
119-
) -> BalanceResult<Vec<HDAddressBalance<Self::BalanceObject>>> {
120-
scan_for_new_addresses_impl(
121-
self,
122-
hd_wallet,
123-
hd_account,
124-
address_scanner,
125-
Bip44Chain::External,
126-
gap_limit,
127-
)
128-
.await
129-
}
130-
131115
async fn all_known_addresses_balances(
132116
&self,
133117
hd_account: &EthHDAccount,

0 commit comments

Comments
 (0)