Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 28 additions & 14 deletions solana-transaction-utils/src/sender.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
use crate::{
blockhash_watcher,
error::Error,
queue::{CompletedTransactionTask, TransactionTask},
};
use std::{sync::Arc, time::Duration};

use dashmap::DashMap;
use futures::{stream, Stream, StreamExt, TryFutureExt, TryStreamExt};
use itertools::Itertools;
Expand All @@ -16,11 +13,16 @@ use solana_sdk::{
transaction::VersionedTransaction,
};
use solana_transaction_status::TransactionStatus;
use std::{sync::Arc, time::Duration};
use tokio::sync::mpsc::{Receiver, Sender};
use tokio_graceful_shutdown::{SubsystemBuilder, SubsystemHandle};
use tracing::warn;

use crate::{
blockhash_watcher,
error::Error,
queue::{CompletedTransactionTask, TransactionTask},
};

const CONFIRMATION_CHECK_INTERVAL: Duration = Duration::from_secs(2);
const MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS: usize = 100;
const RPC_TXN_SEND_CONCURRENCY: usize = 50;
Expand Down Expand Up @@ -259,24 +261,36 @@ impl<T: Send + Clone + Sync> TransactionSender<T> {
// Retry unconfirmed
let current_height = blockhash_rx.borrow().current_block_height;
if !self.unconfirmed_txs.is_empty() {
let (unexpired, expired): (Vec<_>, Vec<_>) = self
// Collect all entries first to release the DashMap lock
let entries = self
.unconfirmed_txs
.iter()
.partition(|entry| entry.value().last_valid_block_height < current_height);

// Resend unexpred/unconfirmed to rpc
let unexpired_txns = unexpired
.iter()
.map(|entry| entry.value().tx.clone())
.map(|entry| {
(
*entry.key(),
entry.value().last_valid_block_height,
entry.value().tx.clone(),
)
})
.collect_vec();

let (unexpired, expired): (Vec<_>, Vec<_>) =
entries
.into_iter()
.partition(|(_, last_valid_block_height, _)| {
*last_valid_block_height < current_height
});

let unexpired_txns = unexpired.into_iter().map(|(_, _, tx)| tx).collect_vec();

// Collect failed transactions (likely expired) and handle as expired
let unexpired_error_signatures = self
.send_transactions(unexpired_txns.as_slice())
.filter_map(|(signature, result)| async move { result.err().map(|_| signature) });
self.handle_expired(unexpired_error_signatures, blockhash_rx)
.await;

let expired_signatures = expired.iter().map(|entry| *entry.key());
let expired_signatures = expired.iter().map(|(signature, _, _)| *signature);
self.handle_expired(stream::iter(expired_signatures), blockhash_rx)
.await;
}
Expand Down