Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
fb3aa7d
tests: multiple sequential tx test added
hlgltvnnk Feb 24, 2026
9488867
fix: sequential tx broadcast
hlgltvnnk Feb 24, 2026
ba1d927
fix: tx group locking removed due to blocking full dashmap shard
hlgltvnnk Feb 24, 2026
aa8eef5
sequential signer implementation added
hlgltvnnk Feb 25, 2026
4a537c4
meta transaction sequential processing added
hlgltvnnk Feb 25, 2026
a82f196
test: updated tests for multisigner
hlgltvnnk Feb 25, 2026
206d3b8
simplified sequential mode impl
hlgltvnnk Feb 25, 2026
6056756
chores
hlgltvnnk Feb 25, 2026
ddd2d8c
updated sequential tests
hlgltvnnk Feb 25, 2026
ffb62d3
nonce tests added
hlgltvnnk Feb 26, 2026
c086c0d
docs added
hlgltvnnk Feb 26, 2026
100ef12
fetch tx tests added
hlgltvnnk Feb 26, 2026
3ed779e
bug with block_hash found
hlgltvnnk Feb 26, 2026
b7a973f
FinalExecutionOutcome deserialization bug found
hlgltvnnk Feb 26, 2026
b754c2b
reverted changes with into<Network>
hlgltvnnk Feb 27, 2026
ec54830
chores
hlgltvnnk Feb 27, 2026
2f75a72
Merge branch 'main' into fix/nonce-race-condition
hlgltvnnk Mar 4, 2026
86e6ccd
cspell cfg update
hlgltvnnk Mar 4, 2026
468e736
Execution final result -> TransactionResult
hlgltvnnk Mar 4, 2026
0a1c067
tmp: run workflow
hlgltvnnk Mar 4, 2026
322f3c9
tx count in non_sequential tests decreased
hlgltvnnk Mar 4, 2026
91aeee1
fmt
hlgltvnnk Mar 4, 2026
e60ca70
chores
hlgltvnnk Mar 4, 2026
240ac97
chores
hlgltvnnk Mar 5, 2026
f12e72c
Merge branch 'main' into fix/nonce-race-condition
hlgltvnnk Mar 6, 2026
f55d04a
dashmap replaced
hlgltvnnk Mar 6, 2026
8de95cb
Merge branch 'main' into fix/nonce-race-condition
vsavchyn-dev Mar 9, 2026
122ff8b
signer seq mod enabled by default
hlgltvnnk Mar 10, 2026
6459600
chores
hlgltvnnk Mar 10, 2026
56620a8
chores
hlgltvnnk Mar 10, 2026
5fe79bf
Merge branch 'main' into fix/nonce-race-condition
hlgltvnnk Mar 12, 2026
76dcacc
sequential mode removed, invalid tx retries added
hlgltvnnk Mar 12, 2026
6258563
refactoring
hlgltvnnk Mar 12, 2026
aa8a267
chores
hlgltvnnk Mar 12, 2026
2b84c08
chores
hlgltvnnk Mar 12, 2026
a502991
openapi client cache added
hlgltvnnk Mar 12, 2026
46337cb
chores
hlgltvnnk Mar 12, 2026
f8a2b01
max retriries added
hlgltvnnk Mar 12, 2026
9d68a37
chores
hlgltvnnk Mar 12, 2026
5f603b8
chores
hlgltvnnk Mar 12, 2026
04f8908
chores
hlgltvnnk Mar 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Currently, the library provides:
* Ability to create custom transactions
* Several ways to sign transactions (secret key, seed phrase, file, ledger, secure keychain).
* Account key pool support to sign the transaction with different user keys to avoid nonce issues.
* Retry in case of invalid transaction nonce error

The minimum required version is located in the [rust-version](./Cargo.toml#L4) field of the `Cargo.toml` file.

Expand Down
4 changes: 2 additions & 2 deletions api/src/common/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ where
let requests = requests.into_iter().map(|request| {
let reference = &self.reference;
async move {
retry(network.clone(), |client| {
retry(network, |client| {
let request = &request;

async move {
Expand Down Expand Up @@ -436,7 +436,7 @@ where

debug!(target: QUERY_EXECUTOR_TARGET, "Preparing query");

let query_response = retry(network.clone(), |client| {
let query_response = retry(network, |client| {
let request = &request;
let reference = &self.reference;
async move {
Expand Down
370 changes: 222 additions & 148 deletions api/src/common/send.rs

Large diffs are not rendered by default.

57 changes: 50 additions & 7 deletions api/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
use std::{
collections::HashMap,
sync::{Arc, LazyLock},
};

use near_api_types::AccountId;
use near_openapi_client::Client;
use reqwest::header::{HeaderValue, InvalidHeaderValue};
use tokio::sync::RwLock;
use url::Url;

use crate::errors::RetryError;

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, Hash, Eq, PartialEq)]
/// Specifies the retry strategy for RPC endpoint requests.
pub enum RetryMethod {
/// Exponential backoff strategy with configurable initial delay and multiplication factor.
Expand All @@ -23,7 +29,7 @@ pub enum RetryMethod {
},
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, Hash, Eq, PartialEq)]
/// Configuration for a [NEAR RPC](https://docs.near.org/api/rpc/providers) endpoint with retry and backoff settings.
pub struct RPCEndpoint {
/// The URL of the RPC endpoint
Expand Down Expand Up @@ -263,14 +269,53 @@ impl<R, E> From<Result<R, E>> for RetryResponse<R, E> {
}
}

pub static OPENAPI_CLIENT_CACHE: LazyLock<OpenapiClientCache> =
LazyLock::new(OpenapiClientCache::new);

pub struct OpenapiClientCache {
inner: RwLock<HashMap<RPCEndpoint, Arc<Client>>>,
}

impl OpenapiClientCache {
pub fn new() -> Self {
Self {
inner: RwLock::new(HashMap::new()),
}
}

pub async fn get_or_create(
&self,
endpoint: &RPCEndpoint,
) -> Result<Arc<Client>, InvalidHeaderValue> {
{
let guard = self.inner.read().await;

if let Some(client) = guard.get(endpoint) {
return Ok(Arc::clone(client));
}
}

let client = Arc::new(endpoint.client()?);

{
let mut guard = self.inner.write().await;
guard
.entry(endpoint.clone())
.or_insert_with(|| Arc::clone(&client));
}

Ok(client)
}
Comment on lines +272 to +308
Copy link

Copilot AI Mar 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OpenapiClientCache uses the entire RPCEndpoint (including retries/retry_method and bearer_header) as the cache key. This can significantly reduce reuse (same URL but different retry settings yields different clients) and it also stores/clones the API key string in a process-wide static map for the lifetime of the program. Consider keying the cache only by client-affecting fields (e.g., URL + auth header) and avoid retaining the raw secret in the key (store a digest/fingerprint instead).

Copilot uses AI. Check for mistakes.
}

/// Retry a task with exponential backoff and failover.
///
/// # Arguments
/// * `network` - The network configuration to use for the retry-able operation.
/// * `task` - The task to retry.
pub async fn retry<R, E, T, F>(network: NetworkConfig, mut task: F) -> Result<R, RetryError<E>>
pub async fn retry<R, E, T, F>(network: &NetworkConfig, mut task: F) -> Result<R, RetryError<E>>
where
F: FnMut(Client) -> T + Send,
F: FnMut(Arc<Client>) -> T + Send,
T: core::future::Future<Output = RetryResponse<R, E>> + Send,
T::Output: Send,
E: Send,
Expand All @@ -281,9 +326,7 @@ where

let mut last_error = None;
for endpoint in network.rpc_endpoints.iter() {
let client = endpoint
.client()
.map_err(|e| RetryError::InvalidApiKey(e))?;
let client = OPENAPI_CLIENT_CACHE.get_or_create(endpoint).await?;
for retry in 0..endpoint.retries {
let result = task(client.clone()).await;
match result {
Expand Down
235 changes: 235 additions & 0 deletions api/src/signer/broadcast.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
use std::{sync::atomic::Ordering, time::Duration};

use near_api_types::{
AccountId, BlockHeight, CryptoHash, Nonce, PublicKey, Reference, TxExecutionStatus,
transaction::{PrepopulateTransaction, result::TransactionResult},
};

use near_openapi_client::types::RpcTransactionError;
use tokio::time::sleep;
use tracing::{debug, instrument, warn};

use crate::{
Signer,
advanced::{ExecuteMetaTransaction, ExecuteSignedTransaction, TxExecutionResult},
config::NetworkConfig,
errors::{
ExecuteMetaTransactionsError, ExecuteTransactionError, MetaSignError, RetryError,
SendRequestError, SignerError,
},
signer::SIGNER_TARGET,
};

impl Signer {
async fn fetch_nonce_data(
account_id: AccountId,
public_key: PublicKey,
network: &NetworkConfig,
) -> Result<(Nonce, CryptoHash, BlockHeight), SignerError> {
debug!(target: SIGNER_TARGET, "Fetching latest nonce");

let nonce_data = crate::account::Account(account_id.clone())
.access_key(public_key)
.at(Reference::Final)
.fetch_from(network)
.await
.map_err(|e| SignerError::FetchNonceError(Box::new(e)))?;

Ok((
nonce_data.data.nonce.0,
nonce_data.block_hash,
nonce_data.block_height,
))
}

/// Fetches the transaction nonce and block hash associated to the access key. Internally
/// caches the nonce as to not need to query for it every time, and ending up having to run
/// into contention with others.
///
/// Uses finalized block hash to avoid "Transaction Expired" errors when sending transactions
/// to load-balanced RPC endpoints where different nodes may be at different chain heights.
#[allow(clippy::significant_drop_tightening)]
#[instrument(skip(self, network))]
pub async fn fetch_tx_nonce(
&self,
account_id: AccountId,
public_key: PublicKey,
network: &NetworkConfig,
) -> Result<(Nonce, CryptoHash, BlockHeight), SignerError> {
debug!(target: SIGNER_TARGET, "Fetching transaction nonce");

let key = (network.network_name.clone(), account_id.clone(), public_key);

let (fetched_nonce, block_hash, block_height) =
Self::fetch_nonce_data(account_id, public_key, network).await?;

let nonce = {
let mut nonce_cache = self.nonce_cache.lock().await;
let nonce = nonce_cache.entry(key).or_default();

*nonce = (*nonce).max(fetched_nonce) + 1;
*nonce
};

Ok((nonce, block_hash, block_height))
}

/// Signs and sends a transaction to the network.
///
/// Concurrent broadcasting of transactions of the same transaction group
/// (network, account, public key) can cause nonce conflicts
/// (`InvalidTransaction` errors), so this method retries with a fresh nonce
/// up to `max_nonce_retries` specified in the signer configuration
#[instrument(skip(self, network, transaction, account_id))]
pub async fn sign_and_send(
&self,
account_id: impl Into<AccountId>,
network: &NetworkConfig,
transaction: PrepopulateTransaction,
wait_until: TxExecutionStatus,
) -> TxExecutionResult {
let account_id = account_id.into();
let public_key = self
.get_public_key()
.await
.map_err(SignerError::PublicKeyError)?;

self.sign_and_send_with_retry(account_id, public_key, network, transaction, wait_until)
.await
}

async fn sign_and_send_with_retry(
&self,
account_id: AccountId,
public_key: PublicKey,
network: &NetworkConfig,
transaction: PrepopulateTransaction,
wait_until: TxExecutionStatus,
) -> TxExecutionResult {
let max_nonce_retries = self.max_nonce_retries.load(Ordering::SeqCst);
let attempts = max_nonce_retries + 1; // +1 for the initial attempt

for attempt in 0..attempts {
match self
.broadcast_tx(
&account_id,
public_key,
network,
transaction.clone(),
wait_until,
)
.await
{
Err(err) if Self::is_retryable_nonce_error(&err) && attempt + 1 < attempts => {
warn!(
target: SIGNER_TARGET,
account_id = %account_id,
attempt = attempt + 1,
max_attempts = max_nonce_retries,
error = ?err,
"Invalid transaction detected, retrying after delay"
);

let delay = Self::calculate_retry_delay(attempt);
sleep(delay).await;
}

result => return result,
}
}

unreachable!("loop always returns on the final attempt")
}

const fn is_retryable_nonce_error(error: &ExecuteTransactionError) -> bool {
// TODO: check tx nonce error after fix in near openapi types
matches!(
error,
ExecuteTransactionError::TransactionError(RetryError::Critical(
SendRequestError::ServerError(RpcTransactionError::InvalidTransaction(_))
))
)
}

fn calculate_retry_delay(attempt: u32) -> Duration {
const INITIAL_RETRY_DELAY: Duration = Duration::from_secs(2);

INITIAL_RETRY_DELAY * 2u32.pow(attempt)
}

/// Signs and sends a meta transaction to the relayer.
///
/// This method is used to sign and send a meta transaction to the relayer.
#[instrument(skip(self, network, transaction, account_id))]
pub async fn sign_and_send_meta(
&self,
account_id: impl Into<AccountId>,
network: &NetworkConfig,
transaction: PrepopulateTransaction,
tx_live_for: BlockHeight,
) -> Result<reqwest::Response, ExecuteMetaTransactionsError> {
let account_id = account_id.into();
let public_key = self
.get_public_key()
.await
.map_err(SignerError::PublicKeyError)
.map_err(MetaSignError::from)?;

self.broadcast_meta_tx(account_id, public_key, network, transaction, tx_live_for)
.await
}

#[allow(clippy::significant_drop_tightening)]
#[instrument(skip(self, account_id, network))]
async fn broadcast_tx(
&self,
account_id: &AccountId,
public_key: PublicKey,
network: &NetworkConfig,
transaction: PrepopulateTransaction,
wait_until: TxExecutionStatus,
) -> Result<TransactionResult, ExecuteTransactionError> {
debug!(target: SIGNER_TARGET, "Broadcasting transaction");

let (fetched_nonce, block_hash, _) = self
.fetch_tx_nonce(account_id.clone(), public_key, network)
.await?;

let signed = self
.sign(transaction, public_key, fetched_nonce, block_hash)
.await?;

ExecuteSignedTransaction::send_impl(network, signed, wait_until).await
}

#[allow(clippy::significant_drop_tightening)]
#[instrument(skip(self, account_id, network))]
async fn broadcast_meta_tx(
&self,
account_id: impl Into<AccountId>,
public_key: PublicKey,
network: &NetworkConfig,
transaction: PrepopulateTransaction,
tx_live_for: BlockHeight,
) -> Result<reqwest::Response, ExecuteMetaTransactionsError> {
debug!(target: SIGNER_TARGET, "Broadcasting meta transaction");
let account_id = account_id.into();

let (fetched_nonce, block_hash, block_height) = self
.fetch_tx_nonce(account_id, public_key, network)
.await
.map_err(MetaSignError::from)?;

let signed = self
.sign_meta(
transaction,
public_key,
fetched_nonce,
block_hash,
block_height + tx_live_for,
)
.await?;

ExecuteMetaTransaction::send_impl(network, signed).await
}
}
Loading