Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions solana-transaction-utils/src/sender.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
use crate::{
blockhash_watcher,
error::Error,
queue::{CompletedTransactionTask, TransactionTask},
};
use std::{sync::Arc, time::Duration};

use dashmap::DashMap;
use futures::{stream, Stream, StreamExt, TryFutureExt, TryStreamExt};
use itertools::Itertools;
Expand All @@ -16,11 +13,16 @@ use solana_sdk::{
transaction::VersionedTransaction,
};
use solana_transaction_status::TransactionStatus;
use std::{sync::Arc, time::Duration};
use tokio::sync::mpsc::{Receiver, Sender};
use tokio_graceful_shutdown::{SubsystemBuilder, SubsystemHandle};
use tracing::warn;

use crate::{
blockhash_watcher,
error::Error,
queue::{CompletedTransactionTask, TransactionTask},
};

const CONFIRMATION_CHECK_INTERVAL: Duration = Duration::from_secs(2);
const MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS: usize = 100;
const RPC_TXN_SEND_CONCURRENCY: usize = 50;
Expand Down Expand Up @@ -222,7 +224,12 @@ impl<T: Send + Clone + Sync> TransactionSender<T> {

async fn handle_tick(&mut self, blockhash_rx: &blockhash_watcher::MessageReceiver) {
// Check confirmations and process as completed
let signatures = self.unconfirmed_txs.iter().map(|r| *r.key()).collect_vec();
let signatures = self
.unconfirmed_txs
.iter()
.map(|r| *r.key())
.collect_vec()
.clone();
// Make a stream of completed (signature, status) tuples
let completed_txns = stream::iter(signatures)
.chunks(MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS)
Expand Down