Skip to content

Commit fc459ca

Browse files
authored
Bugfix/tx sender hang3 (#50)
* Fix tx sender hang
1 parent 62026a6 commit fc459ca

File tree

1 file changed

+28
-14
lines changed

1 file changed

+28
-14
lines changed

solana-transaction-utils/src/sender.rs

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
1-
use crate::{
2-
blockhash_watcher,
3-
error::Error,
4-
queue::{CompletedTransactionTask, TransactionTask},
5-
};
1+
use std::{sync::Arc, time::Duration};
2+
63
use dashmap::DashMap;
74
use futures::{stream, Stream, StreamExt, TryFutureExt, TryStreamExt};
85
use itertools::Itertools;
@@ -16,11 +13,16 @@ use solana_sdk::{
1613
transaction::VersionedTransaction,
1714
};
1815
use solana_transaction_status::TransactionStatus;
19-
use std::{sync::Arc, time::Duration};
2016
use tokio::sync::mpsc::{Receiver, Sender};
2117
use tokio_graceful_shutdown::{SubsystemBuilder, SubsystemHandle};
2218
use tracing::warn;
2319

20+
use crate::{
21+
blockhash_watcher,
22+
error::Error,
23+
queue::{CompletedTransactionTask, TransactionTask},
24+
};
25+
2426
const CONFIRMATION_CHECK_INTERVAL: Duration = Duration::from_secs(2);
2527
const MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS: usize = 100;
2628
const RPC_TXN_SEND_CONCURRENCY: usize = 50;
@@ -259,24 +261,36 @@ impl<T: Send + Clone + Sync> TransactionSender<T> {
259261
// Retry unconfirmed
260262
let current_height = blockhash_rx.borrow().current_block_height;
261263
if !self.unconfirmed_txs.is_empty() {
262-
let (unexpired, expired): (Vec<_>, Vec<_>) = self
264+
// Collect all entries first to release the DashMap lock
265+
let entries = self
263266
.unconfirmed_txs
264267
.iter()
265-
.partition(|entry| entry.value().last_valid_block_height < current_height);
266-
267-
// Resend unexpred/unconfirmed to rpc
268-
let unexpired_txns = unexpired
269-
.iter()
270-
.map(|entry| entry.value().tx.clone())
268+
.map(|entry| {
269+
(
270+
*entry.key(),
271+
entry.value().last_valid_block_height,
272+
entry.value().tx.clone(),
273+
)
274+
})
271275
.collect_vec();
276+
277+
let (unexpired, expired): (Vec<_>, Vec<_>) =
278+
entries
279+
.into_iter()
280+
.partition(|(_, last_valid_block_height, _)| {
281+
*last_valid_block_height < current_height
282+
});
283+
284+
let unexpired_txns = unexpired.into_iter().map(|(_, _, tx)| tx).collect_vec();
285+
272286
// Collect failed transactions (likely expired) and handle as expired
273287
let unexpired_error_signatures = self
274288
.send_transactions(unexpired_txns.as_slice())
275289
.filter_map(|(signature, result)| async move { result.err().map(|_| signature) });
276290
self.handle_expired(unexpired_error_signatures, blockhash_rx)
277291
.await;
278292

279-
let expired_signatures = expired.iter().map(|entry| *entry.key());
293+
let expired_signatures = expired.iter().map(|(signature, _, _)| *signature);
280294
self.handle_expired(stream::iter(expired_signatures), blockhash_rx)
281295
.await;
282296
}

0 commit comments

Comments
 (0)