Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 16 additions & 8 deletions solana-programs/programs/tuktuk/src/instructions/run_task_v0.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use crate::{
task_seeds, utils,
};

const MEMO_PROGRAM_ID: Pubkey = pubkey!("MemoSq4gqABAXKb96qnH8TysNcWxMyWCqXgDLGmfcHr");

// You can either fit the task in a return value directly, or you need to return accounts
// that have their ownership set to this program, and are stuffed with ReturnedTasksV0.
// The account method is useful if you want to return a lot of tasks, and don't want to
Expand Down Expand Up @@ -205,13 +207,19 @@ impl<'a, 'info> TaskProcessor<'a, 'info> {
}

// Pass free tasks as remaining accounts so the task can know which IDs will be used
let free_tasks = &self.ctx.remaining_accounts[self.free_task_index..];
accounts.extend(free_tasks.iter().cloned());
account_infos.extend(free_tasks.iter().map(|acct| AccountMeta {
pubkey: acct.key(),
is_signer: false,
is_writable: false,
}));
let program_id = *remaining_accounts[ix.program_id_index as usize].key;
// Bit of a hack:
// Memo program expects the remaining accounts to be signers, when they're not.
// Most programs don't read additional accounts unless they explicitly ask for them.
if program_id != MEMO_PROGRAM_ID {
let free_tasks = &self.ctx.remaining_accounts[self.free_task_index..];
accounts.extend(free_tasks.iter().cloned());
account_infos.extend(free_tasks.iter().map(|acct| AccountMeta {
pubkey: acct.key(),
is_signer: false,
is_writable: false,
}));
}

let signer_seeds: Vec<Vec<&[u8]>> = self
.signers
Expand All @@ -221,7 +229,7 @@ impl<'a, 'info> TaskProcessor<'a, 'info> {

solana_program::program::invoke_signed(
&Instruction {
program_id: *remaining_accounts[ix.program_id_index as usize].key,
program_id,
accounts: account_infos,
data: ix.data.clone(),
},
Expand Down
1 change: 1 addition & 0 deletions solana-transaction-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ pub mod queue;
pub mod send_and_confirm_transactions_in_parallel;
pub mod sender;
pub mod sync;
pub mod tx_tracker;

pub use error::Error;
6 changes: 3 additions & 3 deletions solana-transaction-utils/src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::{
error::Error,
pack::{PackedTransaction, MAX_TRANSACTION_SIZE},
priority_fee::{auto_compute_price, compute_budget_instruction},
sender::PackedTransactionWithTasks,
tx_tracker::PackedTransactionWithTasks,
};

#[derive(Debug, Clone)]
Expand All @@ -40,7 +40,7 @@ pub struct CompletedTransactionTask<T: Send + Clone> {
pub task: TransactionTask<T>,
}

pub struct TransactionQueueArgs<T: Send + Clone> {
pub struct TransactionQueueArgs<T: Send + Clone + std::fmt::Debug> {
pub rpc_client: Arc<RpcClient>,
pub ws_url: String,
pub payer: Arc<Keypair>,
Expand Down Expand Up @@ -74,7 +74,7 @@ pub fn create_transaction_queue_handles<T: Send + Clone>(

const MAX_PACKABLE_TX_SIZE: usize = 800;

pub async fn create_transaction_queue<T: Send + Clone + 'static + Sync>(
pub async fn create_transaction_queue<T: Send + Clone + std::fmt::Debug + 'static + Sync>(
args: TransactionQueueArgs<T>,
) -> Result<(), Error> {
let mut receiver = args.receiver;
Expand Down
121 changes: 68 additions & 53 deletions solana-transaction-utils/src/sender.rs
Original file line number Diff line number Diff line change
@@ -1,39 +1,32 @@
use crate::{
blockhash_watcher,
error::Error,
queue::{CompletedTransactionTask, TransactionTask},
};
use dashmap::DashMap;
use std::{sync::Arc, time::Duration};

use futures::{stream, Stream, StreamExt, TryFutureExt, TryStreamExt};
use itertools::Itertools;
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::{
hash::Hash,
instruction::Instruction,
message::{v0, AddressLookupTableAccount, VersionedMessage},
signature::{Keypair, Signature},
signer::Signer,
transaction::VersionedTransaction,
};
use solana_transaction_status::TransactionStatus;
use std::{sync::Arc, time::Duration};
use tokio::sync::mpsc::{Receiver, Sender};
use tokio_graceful_shutdown::{SubsystemBuilder, SubsystemHandle};
use tracing::warn;

use crate::{
blockhash_watcher,
error::Error,
queue::CompletedTransactionTask,
tx_tracker::{PackedTransactionWithTasks, TransactionData, TxTrackerRequest, TxTrackerSender},
};

const CONFIRMATION_CHECK_INTERVAL: Duration = Duration::from_secs(2);
const MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS: usize = 100;
const RPC_TXN_SEND_CONCURRENCY: usize = 50;

#[derive(Clone, Debug)]
pub struct PackedTransactionWithTasks<T: Send + Clone> {
pub instructions: Vec<Instruction>,
pub tasks: Vec<TransactionTask<T>>,
pub fee: u64,
pub re_sign_count: u32,
}

impl<T: Send + Clone> PackedTransactionWithTasks<T> {
impl<T: Send + Clone + std::fmt::Debug> PackedTransactionWithTasks<T> {
pub fn with_incremented_re_sign_count(&self) -> Self {
let mut result = self.clone();
result.re_sign_count += 1;
Expand Down Expand Up @@ -84,30 +77,24 @@ impl<T: Send + Clone> PackedTransactionWithTasks<T> {
}
}

#[derive(Debug, Clone)]
struct TransactionData<T: Send + Clone> {
packed_tx: PackedTransactionWithTasks<T>,
tx: VersionedTransaction,
last_valid_block_height: u64,
}

pub struct TransactionSender<T: Send + Clone + Sync> {
unconfirmed_txs: Arc<DashMap<Signature, TransactionData<T>>>,
pub struct TransactionSender<T: Send + Clone + Sync + std::fmt::Debug> {
tx_tracker: TxTrackerSender<T>,
rpc_client: Arc<RpcClient>,
result_tx: Sender<CompletedTransactionTask<T>>,
payer: Arc<Keypair>,
max_re_sign_count: u32,
}

impl<T: Send + Clone + Sync> TransactionSender<T> {
impl<T: Send + Clone + Sync + std::fmt::Debug> TransactionSender<T> {
pub async fn new(
rpc_client: Arc<RpcClient>,
payer: Arc<Keypair>,
result_tx: Sender<CompletedTransactionTask<T>>,
max_re_sign_count: u32,
tx_tracker: TxTrackerSender<T>,
) -> Result<Self, Error> {
Ok(Self {
unconfirmed_txs: Arc::new(DashMap::new()),
tx_tracker,
rpc_client,
result_tx,
payer,
Expand All @@ -120,7 +107,10 @@ impl<T: Send + Clone + Sync> TransactionSender<T> {
packed_tx: PackedTransactionWithTasks<T>,
blockhash_rx: &blockhash_watcher::MessageReceiver,
) {
// Clone the values we need immediately to avoid holding the borrow
let blockhash = blockhash_rx.borrow().last_valid_blockhash;
let last_valid_block_height = blockhash_rx.borrow().last_valid_block_height;

// Convert the packed txn into a versioned transaction. If this fails it's not recoverable through retries
// so notify and exit on errors without queueing for retries
let tx = match packed_tx.mk_transaction(self.max_re_sign_count, blockhash, &self.payer) {
Expand All @@ -140,14 +130,16 @@ impl<T: Send + Clone + Sync> TransactionSender<T> {
.inspect_err(|err| warn!(?err, "sending transaction"))
.await;

self.unconfirmed_txs.insert(
tx.signatures[0],
TransactionData {
tx,
packed_tx: packed_tx.clone(),
last_valid_block_height: blockhash_rx.borrow().last_valid_block_height,
},
);
self.tx_tracker
.send(TxTrackerRequest::Add {
signature: tx.signatures[0],
transaction_data: Box::new(TransactionData {
tx,
packed_tx: packed_tx.clone(),
last_valid_block_height,
}),
})
.await;
}

pub fn send_transactions<'a>(
Expand Down Expand Up @@ -178,13 +170,17 @@ impl<T: Send + Clone + Sync> TransactionSender<T> {
blockhash_rx: &blockhash_watcher::MessageReceiver,
) {
signatures
.filter_map(|signature| async move {
self.unconfirmed_txs
.remove(&signature)
.map(|(_, data)| data.packed_tx.with_incremented_re_sign_count())
.filter_map(async move |signature| {
self.tx_tracker
.remove(signature)
.await
.inspect_err(|err| warn!(?err, "removing signature from tx tracker"))
.ok()
.flatten()
.map(|data| data.packed_tx.with_incremented_re_sign_count())
})
.for_each_concurrent(RPC_TXN_SEND_CONCURRENCY, |packed_tx| async move {
self.handle_packed_tx(packed_tx, blockhash_rx).await
self.handle_packed_tx(packed_tx, blockhash_rx).await;
})
.await
}
Expand All @@ -196,9 +192,13 @@ impl<T: Send + Clone + Sync> TransactionSender<T> {
let completions = signature_statuses
// Look up transaction data for signature
.filter_map(|(signature, status)| async move {
self.unconfirmed_txs
.remove(&signature)
.map(|(_, v)| (v, status))
self.tx_tracker
.remove(signature)
.await
.inspect_err(|err| warn!(?err, "removing signature from tx tracker"))
.ok()
.flatten()
.map(|data| (data, status))
})
// Map status into completion messages
.flat_map(|(data, status)| {
Expand All @@ -221,8 +221,18 @@ impl<T: Send + Clone + Sync> TransactionSender<T> {
}

async fn handle_tick(&mut self, blockhash_rx: &blockhash_watcher::MessageReceiver) {
// Clone the value we need immediately to avoid holding the borrow
let current_height = blockhash_rx.borrow().current_block_height;

// Check confirmations and process as completed
let signatures = self.unconfirmed_txs.iter().map(|r| *r.key()).collect_vec();
let signatures = self
.tx_tracker
.get_all()
.await
.inspect_err(|err| warn!(?err, "getting all tx tracker"))
.ok()
.map(|tx_tracker| tx_tracker.into_keys().collect_vec())
.unwrap_or_default();
// Make a stream of completed (signature, status) tuples
let completed_txns = stream::iter(signatures)
.chunks(MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS)
Expand Down Expand Up @@ -255,19 +265,24 @@ impl<T: Send + Clone + Sync> TransactionSender<T> {
.flatten_unordered(10);
// Remove completed and notify
self.handle_completed(completed_txns).await;

// Retry unconfirmed
let current_height = blockhash_rx.borrow().current_block_height;
if !self.unconfirmed_txs.is_empty() {
let (unexpired, expired): (Vec<_>, Vec<_>) = self
.unconfirmed_txs
let tx_map = self
.tx_tracker
.get_all()
.await
.inspect_err(|err| warn!(?err, "getting all tx tracker"))
.ok()
.unwrap_or_default();
let signatures = tx_map.clone().into_keys().collect_vec();
if !signatures.is_empty() {
let (unexpired, expired): (Vec<_>, Vec<_>) = tx_map
.iter()
.partition(|entry| entry.value().last_valid_block_height < current_height);
.partition(|entry| entry.1.last_valid_block_height < current_height);

// Resend unexpred/unconfirmed to rpc
let unexpired_txns = unexpired
.iter()
.map(|entry| entry.value().tx.clone())
.map(|entry| entry.1.tx.clone())
.collect_vec();
// Collect failed transactions (likely expired) and handle as expired
let unexpired_error_signatures = self
Expand All @@ -276,7 +291,7 @@ impl<T: Send + Clone + Sync> TransactionSender<T> {
self.handle_expired(unexpired_error_signatures, blockhash_rx)
.await;

let expired_signatures = expired.iter().map(|entry| *entry.key());
let expired_signatures = expired.iter().map(|entry| *entry.0);
self.handle_expired(stream::iter(expired_signatures), blockhash_rx)
.await;
}
Expand Down
Loading
Loading