diff --git a/README.md b/README.md index 5990c3e..ac89d21 100644 --- a/README.md +++ b/README.md @@ -19,6 +19,7 @@ Currently, the library provides: * Ability to create custom transactions * Several ways to sign transactions (secret key, seed phrase, file, ledger, secure keychain). * Account key pool support to sign the transaction with different user keys to avoid nonce issues. +* Retry in case of invalid transaction nonce error The minimum required version is located in the [rust-version](./Cargo.toml#L4) field of the `Cargo.toml` file. diff --git a/api/src/common/query/mod.rs b/api/src/common/query/mod.rs index c1f4d65..7dfe3cf 100644 --- a/api/src/common/query/mod.rs +++ b/api/src/common/query/mod.rs @@ -245,7 +245,7 @@ where let requests = requests.into_iter().map(|request| { let reference = &self.reference; async move { - retry(network.clone(), |client| { + retry(network, |client| { let request = &request; async move { @@ -436,7 +436,7 @@ where debug!(target: QUERY_EXECUTOR_TARGET, "Preparing query"); - let query_response = retry(network.clone(), |client| { + let query_response = retry(network, |client| { let request = &request; let reference = &self.reference; async move { diff --git a/api/src/common/send.rs b/api/src/common/send.rs index 5987b66..085b890 100644 --- a/api/src/common/send.rs +++ b/api/src/common/send.rs @@ -1,10 +1,11 @@ -use std::fmt; use std::sync::Arc; +use std::{fmt, sync::LazyLock}; use near_openapi_client::types::{ ErrorWrapperForRpcTransactionError, FinalExecutionOutcomeView, JsonRpcRequestForSendTx, + JsonRpcRequestForTx, JsonRpcRequestForTxMethod, JsonRpcResponseForRpcTransactionResponseAndRpcTransactionError, RpcSendTransactionRequest, - RpcTransactionError, RpcTransactionResponse, + RpcTransactionError, RpcTransactionResponse, RpcTransactionStatusRequest, }; use near_api_types::{ @@ -33,6 +34,11 @@ use super::META_TRANSACTION_VALID_FOR_DEFAULT; const TX_EXECUTOR_TARGET: &str = "near_api::tx::executor"; const META_EXECUTOR_TARGET: &str = "near_api::meta::executor"; +pub static REQWEST_CLIENT: LazyLock> = + LazyLock::new(|| Arc::new(reqwest::Client::new())); + +pub type TxExecutionResult = Result; + /// Internal enum to distinguish between a full RPC response and a minimal pending response. enum SendImplResponse { Full(Box), @@ -86,7 +92,7 @@ pub enum TransactionableOrSigned { } impl TransactionableOrSigned { - pub fn signed(self) -> Option { + pub const fn signed(&self) -> Option<&Signed> { match self { Self::Signed((signed, _)) => Some(signed), Self::Transactionable(_) => None, @@ -138,6 +144,13 @@ impl ExecuteSignedTransaction { self } + /// Get transaction hash if the transaction is signed, otherwise returns None. + pub fn get_hash(&self) -> Option { + // FIXME: transaction hash should be retrieved without signing the transaction, + // but currently it is designed to have complete transaction msg only after signing + self.transaction.signed().map(|signed| signed.get_hash()) + } + /// Signs the transaction offline without fetching the nonce or block hash from the network. /// /// The transaction won't be broadcasted to the network and just stored signed in the [Self::transaction] struct variable. @@ -225,10 +238,7 @@ impl ExecuteSignedTransaction { /// Returns a [`TransactionResult`] which is either: /// - [`TransactionResult::Pending`] if `wait_until` is `None` or `Included` (no execution data available yet) /// - [`TransactionResult::Full`] for higher finality levels with full execution results - pub async fn send_to( - mut self, - network: &NetworkConfig, - ) -> Result { + pub async fn send_to(mut self, network: &NetworkConfig) -> TxExecutionResult { let (signed, transactionable) = match &mut self.transaction { TransactionableOrSigned::Transactionable(transaction) => { debug!(target: TX_EXECUTOR_TARGET, "Preparing unsigned transaction"); @@ -240,8 +250,6 @@ impl ExecuteSignedTransaction { } }; - let wait_until = self.wait_until; - if signed.is_none() { debug!(target: TX_EXECUTOR_TARGET, "Editing transaction with network config"); transactionable.edit_with_network(network).await?; @@ -250,34 +258,77 @@ impl ExecuteSignedTransaction { transactionable.validate_with_network(network).await?; } - let signed = match signed { - Some(s) => s, + // If the transaction is signed, send it to the network. + // If the transaction is not signed, sign it and send + match signed { + Some(signed) => Self::send_impl(network, signed, self.wait_until).await, None => { debug!(target: TX_EXECUTOR_TARGET, "Signing transaction"); - self.presign_with(network) - .await? - .transaction - .signed() - .expect("Expect to have it signed") + let prepopulated = transactionable.prepopulated()?; + self.signer + .sign_and_send( + prepopulated.signer_id.clone(), + network, + prepopulated, + self.wait_until, + ) + .await } - }; + } + } + /// Fetches the transaction status from the network. + pub async fn fetch_tx( + network: &NetworkConfig, + params: RpcTransactionStatusRequest, + ) -> TxExecutionResult { info!( target: TX_EXECUTOR_TARGET, - "Broadcasting signed transaction. Hash: {:?}, Signer: {:?}, Receiver: {:?}, Nonce: {}", - signed.get_hash(), - signed.transaction.signer_id(), - signed.transaction.receiver_id(), - signed.transaction.nonce(), + "Fetching transaction status. Params: {:?}", + params, ); - Self::send_impl(network, signed, wait_until).await + let wait_until = match params { + RpcTransactionStatusRequest::Variant0 { wait_until, .. } + | RpcTransactionStatusRequest::Variant1 { wait_until, .. } => wait_until, + }; + + let result = retry(network, |client| { + let params = params.clone(); + async move { + let result = parse_rpc_response( + wait_until, + client + .tx(&JsonRpcRequestForTx { + id: "0".to_string(), + jsonrpc: "2.0".to_string(), + method: JsonRpcRequestForTxMethod::Tx, + params, + }) + .await + .map(|r| r.into_inner()) + .map_err(SendRequestError::from), + ); + + tracing::debug!( + target: TX_EXECUTOR_TARGET, + "Fetching transaction resulted in {:?}", + result + ); + + result + } + }) + .await + .map_err(ExecuteTransactionError::TransactionError)?; + + into_final_outcome(result) } /// Sends the transaction to the default mainnet configuration. /// /// Please note that this will sign the transaction with the mainnet's nonce and block hash if it's not presigned yet. - pub async fn send_to_mainnet(self) -> Result { + pub async fn send_to_mainnet(self) -> TxExecutionResult { let network = NetworkConfig::mainnet(); self.send_to(&network).await } @@ -285,86 +336,53 @@ impl ExecuteSignedTransaction { /// Sends the transaction to the default testnet configuration. /// /// Please note that this will sign the transaction with the testnet's nonce and block hash if it's not presigned yet. - pub async fn send_to_testnet(self) -> Result { + pub async fn send_to_testnet(self) -> TxExecutionResult { let network = NetworkConfig::testnet(); self.send_to(&network).await } - async fn send_impl( + pub(crate) async fn send_impl( network: &NetworkConfig, signed_tr: SignedTransaction, wait_until: TxExecutionStatus, - ) -> Result { + ) -> TxExecutionResult { + info!( + target: TX_EXECUTOR_TARGET, + "Broadcasting signed transaction. Hash: {:?}, Signer: {:?}, Receiver: {:?}, Nonce: {}", + signed_tr.get_hash().to_string(), + signed_tr.transaction.signer_id(), + signed_tr.transaction.receiver_id(), + signed_tr.transaction.nonce(), + ); + let hash = signed_tr.get_hash(); let signed_tx_base64: near_openapi_client::types::SignedTransaction = signed_tr.into(); - let result = retry(network.clone(), |client| { + + let result = retry(network, |client| { let signed_tx_base64 = signed_tx_base64.clone(); async move { - let result = match client - .send_tx(&JsonRpcRequestForSendTx { - id: "0".to_string(), - jsonrpc: "2.0".to_string(), - method: near_openapi_client::types::JsonRpcRequestForSendTxMethod::SendTx, - params: RpcSendTransactionRequest { - signed_tx_base64, - wait_until, - }, - }) - .await - .map(|r| r.into_inner()) - .map_err(SendRequestError::from) - { - Ok( - JsonRpcResponseForRpcTransactionResponseAndRpcTransactionError::Variant0 { - result, - .. - }, - ) => RetryResponse::Ok(SendImplResponse::Full(Box::new(result))), - Ok( - JsonRpcResponseForRpcTransactionResponseAndRpcTransactionError::Variant1 { - error, - .. - }, - ) => { - let error: SendRequestError = - SendRequestError::from(error); - to_retry_error(error, is_critical_transaction_error) - } - Err(err) => { - // When wait_until is NONE or INCLUDED, the RPC returns a minimal - // response with only `final_execution_status`. The openapi client - // fails to deserialize this into RpcTransactionResponse (which - // expects full execution data) and returns InvalidResponsePayload. - // We intercept this case and parse the minimal response ourselves. - // - // We only attempt this fallback when we explicitly requested a - // minimal response, so unexpected/buggy RPC responses for higher - // finality levels don't get silently treated as Pending. - if matches!( - wait_until, - TxExecutionStatus::None | TxExecutionStatus::Included - ) { - if let SendRequestError::TransportError( - near_openapi_client::Error::InvalidResponsePayload(ref bytes, _), - ) = err - { - if let Ok(minimal) = - serde_json::from_slice::(bytes) - { - return RetryResponse::Ok(SendImplResponse::Pending( - minimal.result.final_execution_status, - )); - } - } - } - to_retry_error(err, is_critical_transaction_error) - } - }; + let result = parse_rpc_response( + wait_until, + client + .send_tx(&JsonRpcRequestForSendTx { + id: "0".to_string(), + jsonrpc: "2.0".to_string(), + method: + near_openapi_client::types::JsonRpcRequestForSendTxMethod::SendTx, + params: RpcSendTransactionRequest { + signed_tx_base64, + wait_until, + }, + }) + .await + .map(|r| r.into_inner()) + .map_err(SendRequestError::from), + ); tracing::debug!( target: TX_EXECUTOR_TARGET, "Broadcasting transaction {} resulted in {:?}", - hash, + hash.to_string(), result ); @@ -374,16 +392,7 @@ impl ExecuteSignedTransaction { .await .map_err(ExecuteTransactionError::TransactionError)?; - match result { - SendImplResponse::Pending(status) => Ok(TransactionResult::Pending { status }), - SendImplResponse::Full(rpc_response) => { - let final_execution_outcome_view = to_final_execution_outcome(*rpc_response); - - Ok(TransactionResult::Full(Box::new( - ExecutionFinalResult::try_from(final_execution_outcome_view)?, - ))) - } - } + into_final_outcome(result) } } @@ -419,6 +428,11 @@ impl ExecuteMetaTransaction { self } + pub fn get_tx_lifetime_delta(&self) -> BlockHeight { + self.tx_live_for + .unwrap_or(META_TRANSACTION_VALID_FOR_DEFAULT) + } + /// Signs the transaction offline without fetching the nonce or block hash from the network. Does not broadcast it. /// /// Signed transaction is stored in the [Self::transaction] struct variable. @@ -439,10 +453,7 @@ impl ExecuteMetaTransaction { }; let transaction = transaction.prepopulated()?; - let max_block_height = block_height - + self - .tx_live_for - .unwrap_or(META_TRANSACTION_VALID_FOR_DEFAULT); + let max_block_height = block_height + self.get_tx_lifetime_delta(); let signed_tr = self .signer @@ -507,7 +518,7 @@ impl ExecuteMetaTransaction { /// Sends the transaction to the custom provided network. /// /// This is useful if you want to send the transaction to a non-default network configuration (e.g, custom RPC URL, sandbox). - /// Please note that if the transaction is not presigned, it will be sign with the network's nonce and block hash. + /// Please note that if the transaction is not presigned, it will be signed with the network's nonce and block hash. pub async fn send_to( mut self, network: &NetworkConfig, @@ -531,28 +542,23 @@ impl ExecuteMetaTransaction { transactionable.validate_with_network(network).await?; } - let signed = match signed { - Some(s) => s, + // If the transaction is signed, send it to the relayer. + // If the transaction is not signed, sign it and send + match signed { + Some(signed) => Self::send_impl(network, signed).await, None => { debug!(target: META_EXECUTOR_TARGET, "Signing meta transaction"); - self.presign_with(network) - .await? - .transaction - .signed() - .expect("Expect to have it signed") + let prepopulated = transactionable.prepopulated()?; + self.signer + .sign_and_send_meta( + prepopulated.signer_id.clone(), + network, + prepopulated, + self.get_tx_lifetime_delta(), + ) + .await } - }; - - info!( - target: META_EXECUTOR_TARGET, - "Broadcasting signed meta transaction. Signer: {:?}, Receiver: {:?}, Nonce: {}, Valid until: {}", - signed.delegate_action.sender_id, - signed.delegate_action.receiver_id, - signed.delegate_action.nonce, - signed.delegate_action.max_block_height - ); - - Self::send_impl(network, signed).await + } } /// Sends the transaction to the default mainnet configuration. @@ -571,11 +577,20 @@ impl ExecuteMetaTransaction { self.send_to(&network).await } - async fn send_impl( + pub(crate) async fn send_impl( network: &NetworkConfig, transaction: SignedDelegateAction, ) -> Result { - let client = reqwest::Client::new(); + info!( + target: META_EXECUTOR_TARGET, + "Broadcasting signed meta transaction. Signer: {:?}, Receiver: {:?}, Nonce: {}, Valid until: {}", + transaction.delegate_action.sender_id, + transaction.delegate_action.receiver_id, + transaction.delegate_action.nonce, + transaction.delegate_action.max_block_height + ); + + let client = REQWEST_CLIENT.clone(); let json_payload = serde_json::json!({ "signed_delegate_action": SignedDelegateActionAsBase64::from( transaction.clone() @@ -608,6 +623,86 @@ impl ExecuteMetaTransaction { } } +impl From for SendRequestError { + fn from(err: ErrorWrapperForRpcTransactionError) -> Self { + match err { + ErrorWrapperForRpcTransactionError::InternalError(internal_error) => { + Self::InternalError(internal_error) + } + ErrorWrapperForRpcTransactionError::RequestValidationError( + rpc_request_validation_error_kind, + ) => Self::RequestValidationError(rpc_request_validation_error_kind), + ErrorWrapperForRpcTransactionError::HandlerError(server_error) => { + Self::ServerError(server_error) + } + } + } +} + +fn parse_rpc_response( + wait_until: TxExecutionStatus, + result: Result< + JsonRpcResponseForRpcTransactionResponseAndRpcTransactionError, + SendRequestError, + >, +) -> RetryResponse> { + match result { + Ok(JsonRpcResponseForRpcTransactionResponseAndRpcTransactionError::Variant0 { + result, + .. + }) => RetryResponse::Ok(SendImplResponse::Full(Box::new(result))), + Ok(JsonRpcResponseForRpcTransactionResponseAndRpcTransactionError::Variant1 { + error, + .. + }) => { + let error: SendRequestError = SendRequestError::from(error); + to_retry_error(error, is_critical_transaction_error) + } + Err(err) => { + // When wait_until is NONE or INCLUDED, the RPC returns a minimal + // response with only `final_execution_status`. The openapi client + // fails to deserialize this into RpcTransactionResponse (which + // expects full execution data) and returns InvalidResponsePayload. + // We intercept this case and parse the minimal response ourselves. + // + // We only attempt this fallback when we explicitly requested a + // minimal response, so unexpected/buggy RPC responses for higher + // finality levels don't get silently treated as Pending. + if matches!( + wait_until, + TxExecutionStatus::None | TxExecutionStatus::Included + ) { + if let SendRequestError::TransportError( + near_openapi_client::Error::InvalidResponsePayload(ref bytes, _), + ) = err + { + if let Ok(minimal) = serde_json::from_slice::(bytes) + { + return RetryResponse::Ok(SendImplResponse::Pending( + minimal.result.final_execution_status, + )); + } + } + } + to_retry_error(err, is_critical_transaction_error) + } + } +} + +#[allow(clippy::result_large_err)] +fn into_final_outcome(response: SendImplResponse) -> TxExecutionResult { + match response { + SendImplResponse::Pending(status) => Ok(TransactionResult::Pending { status }), + SendImplResponse::Full(rpc_response) => { + let final_execution_outcome_view = to_final_execution_outcome(*rpc_response); + + Ok(TransactionResult::Full(Box::new( + ExecutionFinalResult::try_from(final_execution_outcome_view)?, + ))) + } + } +} + /// Extracts a [`FinalExecutionOutcomeView`] from an [`RpcTransactionResponse`]. /// /// Both `send_tx` and `tx` (status query) responses share the same envelope type. @@ -623,13 +718,8 @@ pub fn to_final_execution_outcome(response: RpcTransactionResponse) -> FinalExec status, transaction, transaction_outcome, - } => FinalExecutionOutcomeView { - receipts_outcome, - status, - transaction, - transaction_outcome, - }, - RpcTransactionResponse::Variant1 { + } + | RpcTransactionResponse::Variant1 { final_execution_status: _, receipts_outcome, status, @@ -643,19 +733,3 @@ pub fn to_final_execution_outcome(response: RpcTransactionResponse) -> FinalExec }, } } - -impl From for SendRequestError { - fn from(err: ErrorWrapperForRpcTransactionError) -> Self { - match err { - ErrorWrapperForRpcTransactionError::InternalError(internal_error) => { - Self::InternalError(internal_error) - } - ErrorWrapperForRpcTransactionError::RequestValidationError( - rpc_request_validation_error_kind, - ) => Self::RequestValidationError(rpc_request_validation_error_kind), - ErrorWrapperForRpcTransactionError::HandlerError(server_error) => { - Self::ServerError(server_error) - } - } - } -} diff --git a/api/src/config.rs b/api/src/config.rs index 0647864..07e2a5d 100644 --- a/api/src/config.rs +++ b/api/src/config.rs @@ -1,11 +1,17 @@ +use std::{ + collections::HashMap, + sync::{Arc, LazyLock}, +}; + use near_api_types::AccountId; use near_openapi_client::Client; use reqwest::header::{HeaderValue, InvalidHeaderValue}; +use tokio::sync::RwLock; use url::Url; use crate::errors::RetryError; -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, Hash, Eq, PartialEq)] /// Specifies the retry strategy for RPC endpoint requests. pub enum RetryMethod { /// Exponential backoff strategy with configurable initial delay and multiplication factor. @@ -23,7 +29,7 @@ pub enum RetryMethod { }, } -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, Hash, Eq, PartialEq)] /// Configuration for a [NEAR RPC](https://docs.near.org/api/rpc/providers) endpoint with retry and backoff settings. pub struct RPCEndpoint { /// The URL of the RPC endpoint @@ -263,14 +269,53 @@ impl From> for RetryResponse { } } +pub static OPENAPI_CLIENT_CACHE: LazyLock = + LazyLock::new(OpenapiClientCache::new); + +pub struct OpenapiClientCache { + inner: RwLock>>, +} + +impl OpenapiClientCache { + pub fn new() -> Self { + Self { + inner: RwLock::new(HashMap::new()), + } + } + + pub async fn get_or_create( + &self, + endpoint: &RPCEndpoint, + ) -> Result, InvalidHeaderValue> { + { + let guard = self.inner.read().await; + + if let Some(client) = guard.get(endpoint) { + return Ok(Arc::clone(client)); + } + } + + let client = Arc::new(endpoint.client()?); + + { + let mut guard = self.inner.write().await; + guard + .entry(endpoint.clone()) + .or_insert_with(|| Arc::clone(&client)); + } + + Ok(client) + } +} + /// Retry a task with exponential backoff and failover. /// /// # Arguments /// * `network` - The network configuration to use for the retry-able operation. /// * `task` - The task to retry. -pub async fn retry(network: NetworkConfig, mut task: F) -> Result> +pub async fn retry(network: &NetworkConfig, mut task: F) -> Result> where - F: FnMut(Client) -> T + Send, + F: FnMut(Arc) -> T + Send, T: core::future::Future> + Send, T::Output: Send, E: Send, @@ -281,9 +326,7 @@ where let mut last_error = None; for endpoint in network.rpc_endpoints.iter() { - let client = endpoint - .client() - .map_err(|e| RetryError::InvalidApiKey(e))?; + let client = OPENAPI_CLIENT_CACHE.get_or_create(endpoint).await?; for retry in 0..endpoint.retries { let result = task(client.clone()).await; match result { diff --git a/api/src/signer/broadcast.rs b/api/src/signer/broadcast.rs new file mode 100644 index 0000000..95536a4 --- /dev/null +++ b/api/src/signer/broadcast.rs @@ -0,0 +1,235 @@ +use std::{sync::atomic::Ordering, time::Duration}; + +use near_api_types::{ + AccountId, BlockHeight, CryptoHash, Nonce, PublicKey, Reference, TxExecutionStatus, + transaction::{PrepopulateTransaction, result::TransactionResult}, +}; + +use near_openapi_client::types::RpcTransactionError; +use tokio::time::sleep; +use tracing::{debug, instrument, warn}; + +use crate::{ + Signer, + advanced::{ExecuteMetaTransaction, ExecuteSignedTransaction, TxExecutionResult}, + config::NetworkConfig, + errors::{ + ExecuteMetaTransactionsError, ExecuteTransactionError, MetaSignError, RetryError, + SendRequestError, SignerError, + }, + signer::SIGNER_TARGET, +}; + +impl Signer { + async fn fetch_nonce_data( + account_id: AccountId, + public_key: PublicKey, + network: &NetworkConfig, + ) -> Result<(Nonce, CryptoHash, BlockHeight), SignerError> { + debug!(target: SIGNER_TARGET, "Fetching latest nonce"); + + let nonce_data = crate::account::Account(account_id.clone()) + .access_key(public_key) + .at(Reference::Final) + .fetch_from(network) + .await + .map_err(|e| SignerError::FetchNonceError(Box::new(e)))?; + + Ok(( + nonce_data.data.nonce.0, + nonce_data.block_hash, + nonce_data.block_height, + )) + } + + /// Fetches the transaction nonce and block hash associated to the access key. Internally + /// caches the nonce as to not need to query for it every time, and ending up having to run + /// into contention with others. + /// + /// Uses finalized block hash to avoid "Transaction Expired" errors when sending transactions + /// to load-balanced RPC endpoints where different nodes may be at different chain heights. + #[allow(clippy::significant_drop_tightening)] + #[instrument(skip(self, network))] + pub async fn fetch_tx_nonce( + &self, + account_id: AccountId, + public_key: PublicKey, + network: &NetworkConfig, + ) -> Result<(Nonce, CryptoHash, BlockHeight), SignerError> { + debug!(target: SIGNER_TARGET, "Fetching transaction nonce"); + + let key = (network.network_name.clone(), account_id.clone(), public_key); + + let (fetched_nonce, block_hash, block_height) = + Self::fetch_nonce_data(account_id, public_key, network).await?; + + let nonce = { + let mut nonce_cache = self.nonce_cache.lock().await; + let nonce = nonce_cache.entry(key).or_default(); + + *nonce = (*nonce).max(fetched_nonce) + 1; + *nonce + }; + + Ok((nonce, block_hash, block_height)) + } + + /// Signs and sends a transaction to the network. + /// + /// Concurrent broadcasting of transactions of the same transaction group + /// (network, account, public key) can cause nonce conflicts + /// (`InvalidTransaction` errors), so this method retries with a fresh nonce + /// up to `max_nonce_retries` specified in the signer configuration + #[instrument(skip(self, network, transaction, account_id))] + pub async fn sign_and_send( + &self, + account_id: impl Into, + network: &NetworkConfig, + transaction: PrepopulateTransaction, + wait_until: TxExecutionStatus, + ) -> TxExecutionResult { + let account_id = account_id.into(); + let public_key = self + .get_public_key() + .await + .map_err(SignerError::PublicKeyError)?; + + self.sign_and_send_with_retry(account_id, public_key, network, transaction, wait_until) + .await + } + + async fn sign_and_send_with_retry( + &self, + account_id: AccountId, + public_key: PublicKey, + network: &NetworkConfig, + transaction: PrepopulateTransaction, + wait_until: TxExecutionStatus, + ) -> TxExecutionResult { + let max_nonce_retries = self.max_nonce_retries.load(Ordering::SeqCst); + let attempts = max_nonce_retries + 1; // +1 for the initial attempt + + for attempt in 0..attempts { + match self + .broadcast_tx( + &account_id, + public_key, + network, + transaction.clone(), + wait_until, + ) + .await + { + Err(err) if Self::is_retryable_nonce_error(&err) && attempt + 1 < attempts => { + warn!( + target: SIGNER_TARGET, + account_id = %account_id, + attempt = attempt + 1, + max_attempts = max_nonce_retries, + error = ?err, + "Invalid transaction detected, retrying after delay" + ); + + let delay = Self::calculate_retry_delay(attempt); + sleep(delay).await; + } + + result => return result, + } + } + + unreachable!("loop always returns on the final attempt") + } + + const fn is_retryable_nonce_error(error: &ExecuteTransactionError) -> bool { + // TODO: check tx nonce error after fix in near openapi types + matches!( + error, + ExecuteTransactionError::TransactionError(RetryError::Critical( + SendRequestError::ServerError(RpcTransactionError::InvalidTransaction(_)) + )) + ) + } + + fn calculate_retry_delay(attempt: u32) -> Duration { + const INITIAL_RETRY_DELAY: Duration = Duration::from_secs(2); + + INITIAL_RETRY_DELAY * 2u32.pow(attempt) + } + + /// Signs and sends a meta transaction to the relayer. + /// + /// This method is used to sign and send a meta transaction to the relayer. + #[instrument(skip(self, network, transaction, account_id))] + pub async fn sign_and_send_meta( + &self, + account_id: impl Into, + network: &NetworkConfig, + transaction: PrepopulateTransaction, + tx_live_for: BlockHeight, + ) -> Result { + let account_id = account_id.into(); + let public_key = self + .get_public_key() + .await + .map_err(SignerError::PublicKeyError) + .map_err(MetaSignError::from)?; + + self.broadcast_meta_tx(account_id, public_key, network, transaction, tx_live_for) + .await + } + + #[allow(clippy::significant_drop_tightening)] + #[instrument(skip(self, account_id, network))] + async fn broadcast_tx( + &self, + account_id: &AccountId, + public_key: PublicKey, + network: &NetworkConfig, + transaction: PrepopulateTransaction, + wait_until: TxExecutionStatus, + ) -> Result { + debug!(target: SIGNER_TARGET, "Broadcasting transaction"); + + let (fetched_nonce, block_hash, _) = self + .fetch_tx_nonce(account_id.clone(), public_key, network) + .await?; + + let signed = self + .sign(transaction, public_key, fetched_nonce, block_hash) + .await?; + + ExecuteSignedTransaction::send_impl(network, signed, wait_until).await + } + + #[allow(clippy::significant_drop_tightening)] + #[instrument(skip(self, account_id, network))] + async fn broadcast_meta_tx( + &self, + account_id: impl Into, + public_key: PublicKey, + network: &NetworkConfig, + transaction: PrepopulateTransaction, + tx_live_for: BlockHeight, + ) -> Result { + debug!(target: SIGNER_TARGET, "Broadcasting meta transaction"); + let account_id = account_id.into(); + + let (fetched_nonce, block_hash, block_height) = self + .fetch_tx_nonce(account_id, public_key, network) + .await + .map_err(MetaSignError::from)?; + + let signed = self + .sign_meta( + transaction, + public_key, + fetched_nonce, + block_hash, + block_height + tx_live_for, + ) + .await?; + + ExecuteMetaTransaction::send_impl(network, signed).await + } +} diff --git a/api/src/signer/mod.rs b/api/src/signer/mod.rs index 43fa37c..a693862 100644 --- a/api/src/signer/mod.rs +++ b/api/src/signer/mod.rs @@ -107,16 +107,17 @@ //! The user can instantiate [`Signer`] with a custom signing logic by utilizing the [`SignerTrait`] trait. use std::{ - collections::HashMap, + collections::{BTreeMap, HashMap}, path::{Path, PathBuf}, sync::{ Arc, - atomic::{AtomicUsize, Ordering}, + atomic::{AtomicU32, AtomicUsize, Ordering}, }, }; +use futures::lock::Mutex; use near_api_types::{ - AccountId, BlockHeight, CryptoHash, Nonce, PublicKey, Reference, SecretKey, Signature, + AccountId, BlockHeight, CryptoHash, Nonce, PublicKey, SecretKey, Signature, transaction::{ PrepopulateTransaction, SignedTransaction, Transaction, TransactionV0, delegate_action::{NonDelegateAction, SignedDelegateAction}, @@ -135,6 +136,7 @@ use crate::{ use secret_key::SecretKeySigner; +mod broadcast; #[cfg(feature = "keystore")] pub mod keystore; #[cfg(feature = "ledger")] @@ -388,14 +390,21 @@ pub trait SignerTrait { fn get_public_key(&self) -> Result; } +/// Each transaction group is identified by: network name, account_id, public_key +pub type TransactionGroupKey = (String, AccountId, PublicKey); + /// A [Signer](`Signer`) is a wrapper around a single or multiple signer implementations /// of [SignerTrait](`SignerTrait`). /// /// It provides an access key pooling and a nonce caching mechanism to improve transaction throughput. +/// Taking into account each transaction group: account_id + public_key + network name, +/// to manage nonces separately for each group. pub struct Signer { - pool: tokio::sync::RwLock>>, - nonce_cache: futures::lock::Mutex>, + pool: tokio::sync::RwLock>>, + nonce_cache: Mutex>, current_public_key: AtomicUsize, + // Optional max retry limit for nonce conflicts during transaction broadcasting + max_nonce_retries: AtomicU32, } impl Signer { @@ -404,17 +413,26 @@ impl Signer { pub fn new( signer: T, ) -> Result, PublicKeyError> { + const DEFAULT_MAX_RETRIES: u32 = 3; + let public_key = signer.get_public_key()?; Ok(Arc::new(Self { - pool: tokio::sync::RwLock::new(HashMap::from([( + pool: tokio::sync::RwLock::new(BTreeMap::from([( public_key, Box::new(signer) as Box, )])), - nonce_cache: futures::lock::Mutex::new(HashMap::new()), + nonce_cache: Mutex::new(HashMap::new()), current_public_key: AtomicUsize::new(0), + max_nonce_retries: AtomicU32::new(DEFAULT_MAX_RETRIES), })) } + /// Sets the maximum number of retries for nonce conflicts during transaction broadcasting + #[instrument(skip(self, retries))] + pub fn set_max_nonce_retries(&self, retries: u32) { + self.max_nonce_retries.store(retries, Ordering::SeqCst); + } + /// Adds a signer to the pool of signers. /// The [Signer](`Signer`) will rotate the provided implementation of [SignerTrait](`SignerTrait`) on each call to [get_public_key](`Signer::get_public_key`). #[instrument(skip(self, signer))] @@ -534,39 +552,6 @@ impl Signer { self.add_signer_to_pool(signer).await } - /// Fetches the transaction nonce and block hash associated to the access key. Internally - /// caches the nonce as to not need to query for it every time, and ending up having to run - /// into contention with others. - /// - /// Uses finalized block hash to avoid "Transaction Expired" errors when sending transactions - /// to load-balanced RPC endpoints where different nodes may be at different chain heights. - #[allow(clippy::significant_drop_tightening)] - #[instrument(skip(self, network), fields(account_id = %account_id))] - pub async fn fetch_tx_nonce( - &self, - account_id: AccountId, - public_key: PublicKey, - network: &NetworkConfig, - ) -> Result<(Nonce, CryptoHash, BlockHeight), SignerError> { - debug!(target: SIGNER_TARGET, "Fetching transaction nonce"); - - let nonce_data = crate::account::Account(account_id.clone()) - .access_key(public_key) - .at(Reference::Final) - .fetch_from(network) - .await - .map_err(|e| SignerError::FetchNonceError(Box::new(e)))?; - - let nonce = { - let mut nonce_cache = self.nonce_cache.lock().await; - let nonce = nonce_cache.entry((account_id, public_key)).or_default(); - *nonce = (*nonce).max(nonce_data.data.nonce.0) + 1; - *nonce - }; - - Ok((nonce, nonce_data.block_hash, nonce_data.block_height)) - } - /// Creates a [Signer](`Signer`) using seed phrase with default HD path. pub fn from_seed_phrase( seed_phrase: &str, diff --git a/api/tests/fetch_tx.rs b/api/tests/fetch_tx.rs new file mode 100644 index 0000000..7a83746 --- /dev/null +++ b/api/tests/fetch_tx.rs @@ -0,0 +1,46 @@ +use near_api::{advanced::ExecuteSignedTransaction, *}; +use near_api_types::{AccountId, NearToken}; +use near_openapi_client::types::RpcTransactionStatusRequest; +use near_sandbox::config::{DEFAULT_GENESIS_ACCOUNT, DEFAULT_GENESIS_ACCOUNT_PRIVATE_KEY}; +use testresult::TestResult; + +#[tokio::test] +async fn fetch_tx_status() -> TestResult { + let receiver: AccountId = "tmp_account".parse()?; + let account: AccountId = DEFAULT_GENESIS_ACCOUNT.into(); + + let sandbox = near_sandbox::Sandbox::start_sandbox().await?; + sandbox.create_account(receiver.clone()).send().await?; + + let network = NetworkConfig::from_rpc_url("sandbox", sandbox.rpc_addr.parse()?); + let signer = Signer::from_secret_key(DEFAULT_GENESIS_ACCOUNT_PRIVATE_KEY.parse()?)?; + + let tx = Tokens::account(account.clone()) + .send_to(receiver.clone()) + .near(NearToken::from_millinear(1)) + .with_signer(signer.clone()) + .presign_with(&network) + .await?; + + let tx_hash = tx.get_hash().unwrap(); + + tx.wait_until(near_api_types::TxExecutionStatus::Included) + .send_to(&network) + .await? + .assert_success(); + + let res = ExecuteSignedTransaction::fetch_tx( + &network, + RpcTransactionStatusRequest::Variant1 { + sender_account_id: account.clone(), + tx_hash: tx_hash.into(), + wait_until: near_api_types::TxExecutionStatus::IncludedFinal, + }, + ) + .await? + .assert_success(); + + assert!(res.outcome().is_success()); + + Ok(()) +} diff --git a/api/tests/nonces.rs b/api/tests/nonces.rs new file mode 100644 index 0000000..aaab96b --- /dev/null +++ b/api/tests/nonces.rs @@ -0,0 +1,89 @@ +use std::sync::Arc; + +use futures::future::join_all; +use near_api::*; +use near_api_types::{AccountId, NearToken}; +use near_sandbox::config::{DEFAULT_GENESIS_ACCOUNT, DEFAULT_GENESIS_ACCOUNT_PRIVATE_KEY}; +use testresult::TestResult; + +#[tokio::test] +async fn correct_nonces_for_different_networks() -> TestResult { + let account: AccountId = DEFAULT_GENESIS_ACCOUNT.into(); + let signer = Signer::from_secret_key(DEFAULT_GENESIS_ACCOUNT_PRIVATE_KEY.parse()?)?; + + let sandbox = near_sandbox::Sandbox::start_sandbox().await?; + let second_sandbox = near_sandbox::Sandbox::start_sandbox().await?; + + let network = NetworkConfig::from_rpc_url("sandbox", sandbox.rpc_addr.parse()?); + let second_network = + NetworkConfig::from_rpc_url("second_sandbox", second_sandbox.rpc_addr.parse()?); + + let tx = Tokens::account(account.clone()) + .send_to("tmp_account".parse()?) + .near(NearToken::from_millinear(1)); + + tx.clone() + .with_signer(signer.clone()) + .presign_with(&network) + .await?; + + let nonce_before = Account(account.clone()) + .access_key(signer.get_public_key().await?) + .fetch_from(&network) + .await? + .data + .nonce; + + tx.with_signer(signer.clone()) + .presign_with(&second_network) + .await?; + + let nonce_after = Account(account.clone()) + .access_key(signer.get_public_key().await?) + .fetch_from(&network) + .await? + .data + .nonce; + + // Check that presigning on a different network does not change the nonce on this network + assert_eq!(nonce_after.0, nonce_before.0); + + Ok(()) +} + +#[tokio::test] +async fn sequential_nonces() -> TestResult { + let receiver: AccountId = "tmp_account".parse()?; + let account: AccountId = DEFAULT_GENESIS_ACCOUNT.into(); + let signer = Signer::from_secret_key(DEFAULT_GENESIS_ACCOUNT_PRIVATE_KEY.parse()?)?; + + let tx_count = 10; + + let sandbox = near_sandbox::Sandbox::start_sandbox().await?; + let network = NetworkConfig::from_rpc_url("sandbox", sandbox.rpc_addr.parse()?); + + let tx = Tokens::account(account.clone()) + .send_to(receiver.clone()) + .near(NearToken::from_millinear(1)); + + // Commit sequential nonce to signer + join_all((0..tx_count).map(|_| { + tx.clone() + .with_signer(Arc::clone(&signer)) + .send_to(&network) + })) + .await + .into_iter() + .collect::, _>>()?; + + // Try to presign with non sequential nonce + tx.with_signer(Arc::clone(&signer)) + .presign_offline(signer.get_public_key().await?, CryptoHash::default(), 0) + .await? + .send_to(&network) + .await + .err() + .ok_or("Should not be able to use with non sequential nonce")?; + + Ok(()) +} diff --git a/api/tests/multiple_tx_at_same_time_from_same-_user.rs b/api/tests/send_tx.rs similarity index 52% rename from api/tests/multiple_tx_at_same_time_from_same-_user.rs rename to api/tests/send_tx.rs index bf4b5f7..74a0a94 100644 --- a/api/tests/multiple_tx_at_same_time_from_same-_user.rs +++ b/api/tests/send_tx.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, sync::Arc}; +use std::sync::Arc; use futures::future::join_all; use near_api::*; @@ -7,9 +7,10 @@ use near_sandbox::config::{ DEFAULT_GENESIS_ACCOUNT, DEFAULT_GENESIS_ACCOUNT_PRIVATE_KEY, DEFAULT_GENESIS_ACCOUNT_PUBLIC_KEY, }; -use signer::generate_secret_key; +use std::str::FromStr; use testresult::TestResult; +#[allow(clippy::result_large_err)] #[tokio::test] async fn multiple_tx_at_same_time_from_same_key() -> TestResult { let receiver: AccountId = "tmp_account".parse()?; @@ -28,25 +29,22 @@ async fn multiple_tx_at_same_time_from_same_key() -> TestResult { .data .nonce; - let tx = (0..20).map(|i| { + let tx_count = 5_000; + let tx = (0..tx_count).map(|_| { Tokens::account(account.clone()) .send_to(receiver.clone()) - .near(NearToken::from_millinear(i)) + .near(NearToken::from_millinear(1)) }); - // Even though we send 20 transactions with correct nonces, it still might fail - // because of the blockchain/network inclusion race condition - // - // TX1 gets nonce=100, TX2 gets nonce=101, TX3 gets nonce=102, TX4 gets nonce=103 - // TX4 (nonce=103) arrives and gets included in the block at validator first ✓ - // TX1 (nonce=100) arrives second ✗ (rejected - nonce too old, expected 104) - // TX3 (nonce=102) arrives third ✗ (rejected - nonce too old, expected 104) - // TX2 (nonce=101) arrives last ✗ (rejected - nonce too old, expected 104) - let txs = join_all(tx.map(|t| t.with_signer(Arc::clone(&signer)).send_to(&network))) - .await - .into_iter() - .collect::, _>>()?; - assert_eq!(txs.len(), 20); + join_all(tx.map(|t| { + t.with_signer(Arc::clone(&signer)) + .wait_until(near_api_types::TxExecutionStatus::Final) + .send_to(&network) + })) + .await + .into_iter() + .map(|t| t.map(|t| t.assert_success())) + .collect::, _>>()?; let end_nonce = Account(account.clone()) .access_key(signer.get_public_key().await?) @@ -54,64 +52,79 @@ async fn multiple_tx_at_same_time_from_same_key() -> TestResult { .await? .data .nonce; - assert_eq!(end_nonce.0, start_nonce.0 + 20); + + // Nonce can be higher than the number of transactions sent because of retries + assert!(end_nonce.0 >= start_nonce.0 + tx_count as u64); Ok(()) } -#[tokio::test] #[allow(clippy::result_large_err)] +#[tokio::test] async fn multiple_tx_at_same_time_from_different_keys() -> TestResult { let receiver: AccountId = "tmp_account".parse()?; let account: AccountId = DEFAULT_GENESIS_ACCOUNT.into(); + let pubkey_count = 9; + let tx_count = 5_000; + let first_pubkey = PublicKey::from_str(DEFAULT_GENESIS_ACCOUNT_PUBLIC_KEY)?; + let sandbox = near_sandbox::Sandbox::start_sandbox().await?; sandbox.create_account(receiver.clone()).send().await?; let network = NetworkConfig::from_rpc_url("sandbox", sandbox.rpc_addr.parse()?); let signer = Signer::from_secret_key(DEFAULT_GENESIS_ACCOUNT_PRIVATE_KEY.parse()?)?; - let secret = generate_secret_key()?; - Account(account.clone()) - .add_key(AccessKeyPermission::FullAccess, secret.public_key()) - .with_signer(signer.clone()) - .send_to(&network) - .await? - .assert_success(); - - signer.add_secret_key_to_pool(secret.clone()).await?; + join_all((0..pubkey_count).map(|_| add_key_to_pool(&account, &signer, &network))) + .await + .into_iter() + .collect::, _>>()?; - let secret2 = generate_secret_key()?; - Account(account.clone()) - .add_key(AccessKeyPermission::FullAccess, secret2.public_key()) - .with_signer(signer.clone()) - .send_to(&network) + let start_nonce = Account(account.clone()) + .access_key(first_pubkey) + .fetch_from(&network) .await? - .assert_success(); - signer.add_secret_key_to_pool(secret2.clone()).await?; + .data + .nonce; - let tx = (0..12).map(|i| { + let tx = (0..tx_count).map(|_| { Tokens::account(account.clone()) .send_to(receiver.clone()) - .near(NearToken::from_millinear(i)) + .near(NearToken::from_millinear(1)) }); - let txs = join_all(tx.map(|t| t.with_signer(Arc::clone(&signer)).send_to(&network))) + + join_all(tx.map(|t| t.with_signer(Arc::clone(&signer)).send_to(&network))) .await .into_iter() .map(|t| t.map(|t| t.assert_success())) .collect::, _>>()?; - assert_eq!(txs.len(), 12); - let mut hash_map = HashMap::new(); - for tx in txs { - let public_key = tx.transaction().public_key(); - let count: &mut i32 = hash_map.entry(public_key.to_string()).or_insert(0); - *count += 1; - } - - assert_eq!(hash_map.len(), 3); - assert_eq!(hash_map[DEFAULT_GENESIS_ACCOUNT_PUBLIC_KEY], 4); - assert_eq!(hash_map[&secret2.public_key().to_string()], 4); - assert_eq!(hash_map[&secret.public_key().to_string()], 4); + let end_nonce = Account(account.clone()) + .access_key(first_pubkey) + .fetch_from(&network) + .await? + .data + .nonce; + + // Nonce can be higher than the number of transactions sent because of retries + assert!(end_nonce.0 >= start_nonce.0 + tx_count as u64 / (pubkey_count + 1)); + + Ok(()) +} + +async fn add_key_to_pool( + account_id: &AccountId, + signer: &Arc, + network: &NetworkConfig, +) -> TestResult { + let secret = signer::generate_secret_key()?; + Account(account_id.clone()) + .add_key(AccessKeyPermission::FullAccess, secret.public_key()) + .with_signer(Arc::clone(signer)) + .send_to(network) + .await? + .assert_success(); + + signer.add_secret_key_to_pool(secret).await?; Ok(()) } diff --git a/types/src/lib.rs b/types/src/lib.rs index a45297f..0ef43e9 100644 --- a/types/src/lib.rs +++ b/types/src/lib.rs @@ -18,7 +18,7 @@ pub mod transaction; pub mod utils; pub use near_abi as abi; -pub use near_account_id::AccountId; +pub use near_account_id::{AccountId, AccountIdRef}; pub use near_gas::NearGas; pub use near_openapi_types::{ AccountView, ContractCodeView, FunctionArgs, RpcBlockResponse, diff --git a/types/src/transaction/mod.rs b/types/src/transaction/mod.rs index edaede3..c219dc8 100644 --- a/types/src/transaction/mod.rs +++ b/types/src/transaction/mod.rs @@ -140,6 +140,7 @@ impl TryFrom for SignedTransaction { public_key: public_key.try_into()?, nonce, receiver_id, + // FIXME: this is tx hash, not block hash (https://github.com/near/near-api-rs/issues/134) block_hash: hash.into(), actions: actions .into_iter() @@ -153,6 +154,7 @@ impl TryFrom for SignedTransaction { public_key: public_key.try_into()?, nonce, receiver_id, + // FIXME: this is tx hash, not block hash (https://github.com/near/near-api-rs/issues/134) block_hash: hash.into(), actions: actions .into_iter()