Skip to content

Commit ba1c559

Browse files
committed
Add txn to unconfirmed on either rpc success/err
1 parent c0df81b commit ba1c559

File tree

1 file changed

+24
-27
lines changed

1 file changed

+24
-27
lines changed

solana-transaction-utils/src/sender.rs

Lines changed: 24 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use tokio::sync::mpsc::{Receiver, Sender};
2121
use tokio_graceful_shutdown::{SubsystemBuilder, SubsystemHandle};
2222
use tracing::warn;
2323

24-
const CONFIRMATION_CHECK_INTERVAL: Duration = Duration::from_secs(5);
24+
const CONFIRMATION_CHECK_INTERVAL: Duration = Duration::from_secs(2);
2525
const MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS: usize = 100;
2626
const RPC_TXN_SEND_CONCURRENCY: usize = 50;
2727

@@ -120,37 +120,34 @@ impl<T: Send + Clone + Sync> TransactionSender<T> {
120120
packed_tx: PackedTransactionWithTasks<T>,
121121
blockhash_rx: &blockhash_watcher::MessageReceiver,
122122
) {
123-
match self.send_packed_tx(&packed_tx, blockhash_rx).await {
123+
let blockhash = blockhash_rx.borrow().last_valid_blockhash;
124+
// Convert the packed txn into a versioned transaction. If this fails it's not recoverable through retries
125+
// so notify and exit on errors without queueing for retries
126+
let tx = match packed_tx.mk_transaction(self.max_re_sign_count, blockhash, &self.payer) {
127+
Ok(tx) => tx,
124128
Err(err) => {
125-
warn!(?err, "sending packed transaction");
126129
self.handle_completions(packed_tx.into_completions_with_status(Some(err), Some(0)))
127130
.await;
131+
return;
128132
}
129-
Ok(tx) => {
130-
// Add to unconfirmed map
131-
self.unconfirmed_txs.insert(
132-
tx.signatures[0],
133-
TransactionData {
134-
tx,
135-
packed_tx: packed_tx.clone(),
136-
last_valid_block_height: blockhash_rx.borrow().last_valid_block_height,
137-
},
138-
);
139-
}
140-
}
141-
}
142-
143-
pub async fn send_packed_tx(
144-
&self,
145-
packed_tx: &PackedTransactionWithTasks<T>,
146-
blockhash_rx: &blockhash_watcher::MessageReceiver,
147-
) -> Result<VersionedTransaction, Error> {
148-
let blockhash = blockhash_rx.borrow().last_valid_blockhash;
149-
let tx = packed_tx.mk_transaction(self.max_re_sign_count, blockhash, &self.payer)?;
150-
let Some((_, result)) = self.send_transactions(&[tx.clone()]).next().await else {
151-
return Err(Error::RpcError("Unexpected stream close".to_string()));
152133
};
153-
result.map(|_| tx)
134+
// Send transaction. Queue for checks and retries whether errored or not to handle rpc being down
135+
// temporarily
136+
let _ = self
137+
.rpc_client
138+
.send_transaction(&tx)
139+
.map_err(Error::from)
140+
.inspect_err(|err| warn!(?err, "sending transaction"))
141+
.await;
142+
143+
self.unconfirmed_txs.insert(
144+
tx.signatures[0],
145+
TransactionData {
146+
tx,
147+
packed_tx: packed_tx.clone(),
148+
last_valid_block_height: blockhash_rx.borrow().last_valid_block_height,
149+
},
150+
);
154151
}
155152

156153
pub fn send_transactions<'a>(

0 commit comments

Comments
 (0)