Skip to content

Commit 067a3da

Browse files
authored
use blockhash_watcher (#41)
1 parent d292922 commit 067a3da

File tree

6 files changed

+232
-95
lines changed

6 files changed

+232
-95
lines changed
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
use crate::Error;
2+
use futures::{future, TryFutureExt};
3+
use solana_client::nonblocking::rpc_client::RpcClient;
4+
use solana_sdk::hash::Hash;
5+
use std::{sync::Arc, time::Duration};
6+
use tokio::{sync::watch, time};
7+
use tokio_graceful_shutdown::SubsystemHandle;
8+
use tracing::{info, warn};
9+
10+
pub type MessageSender = watch::Sender<BlockHashData>;
11+
pub type MessageReceiver = watch::Receiver<BlockHashData>;
12+
pub const BLOCKHASH_REFRESH_INTERVAL: Duration = Duration::from_secs(30);
13+
14+
pub fn last_valid<T>(receiver: &watch::Receiver<T>) -> watch::Ref<'_, T>
15+
where
16+
T: Clone,
17+
{
18+
receiver.borrow()
19+
}
20+
21+
#[derive(Debug, Clone, Default)]
22+
pub struct BlockHashData {
23+
pub last_valid_block_height: u64,
24+
pub last_valid_blockhash: Hash,
25+
pub current_block_height: u64,
26+
}
27+
28+
#[derive(Clone)]
29+
pub struct BlockhashWatcher {
30+
watch: MessageSender,
31+
interval: Duration,
32+
client: Arc<RpcClient>,
33+
}
34+
35+
impl BlockhashWatcher {
36+
pub fn new(interval: Duration, client: Arc<RpcClient>) -> Self {
37+
let (watch, _) = watch::channel(Default::default());
38+
Self {
39+
watch,
40+
interval,
41+
client,
42+
}
43+
}
44+
45+
pub fn watcher(&mut self) -> MessageReceiver {
46+
self.watch.subscribe()
47+
}
48+
49+
pub async fn run(mut self, shutdown: SubsystemHandle) -> Result<(), Error> {
50+
info!("starting");
51+
let mut interval = time::interval(self.interval);
52+
loop {
53+
tokio::select! {
54+
_ = shutdown.on_shutdown_requested() => {
55+
info!("shutting down");
56+
return Ok(());
57+
}
58+
_ = interval.tick() => {
59+
match self.fetch_data(&shutdown).await {
60+
Ok(Some(new_data)) => {
61+
let _ = self.watch.send_replace(new_data);
62+
}
63+
Ok(None) => (),
64+
Err(err) => warn!(?err, "failed to get block hash data"),
65+
};
66+
}
67+
}
68+
}
69+
}
70+
71+
pub async fn fetch_data(
72+
&mut self,
73+
shutdown: &SubsystemHandle,
74+
) -> Result<Option<BlockHashData>, Error> {
75+
let fetch_fut = future::try_join(
76+
self.client
77+
.get_latest_blockhash_with_commitment(self.client.commitment()),
78+
self.client.get_block_height(),
79+
)
80+
.map_err(Error::from)
81+
.map_ok(
82+
|((last_valid_blockhash, last_valid_block_height), current_block_height)| {
83+
BlockHashData {
84+
last_valid_block_height,
85+
last_valid_blockhash,
86+
current_block_height,
87+
}
88+
},
89+
);
90+
tokio::select! {
91+
result = fetch_fut => result.map(Some),
92+
_ = shutdown.on_shutdown_requested() => Ok(None)
93+
}
94+
}
95+
}

solana-transaction-utils/src/error.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,10 @@ pub enum Error {
2929
FeeTooHigh,
3030
#[error("Transaction has failed too many retries and gone stale")]
3131
StaleTransaction,
32-
#[error("Channel send error: {0}")]
33-
ChannelSendError(String),
3432
#[error("System time error: {0}")]
3533
SystemTimeError(String),
34+
#[error("message channel closed")]
35+
ChannelClosed,
3636
}
3737

3838
impl From<solana_client::client_error::ClientError> for Error {
@@ -48,8 +48,8 @@ impl From<TpuSenderError> for Error {
4848
}
4949

5050
impl<T> From<tokio::sync::mpsc::error::SendError<T>> for Error {
51-
fn from(value: tokio::sync::mpsc::error::SendError<T>) -> Self {
52-
Self::ChannelSendError(value.to_string())
51+
fn from(_value: tokio::sync::mpsc::error::SendError<T>) -> Self {
52+
Self::ChannelClosed
5353
}
5454
}
5555

@@ -61,4 +61,8 @@ impl Error {
6161
pub fn serialization<S: ToString>(str: S) -> Self {
6262
Self::SerializationError(str.to_string())
6363
}
64+
65+
pub fn channel_closed() -> Error {
66+
Error::ChannelClosed
67+
}
6468
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
1+
pub mod blockhash_watcher;
12
pub mod error;
23
pub mod pack;
34
pub mod priority_fee;
45
pub mod queue;
56
pub mod send_and_confirm_transactions_in_parallel;
67
pub mod sender;
8+
pub mod sync;
9+
10+
pub use error::Error;

solana-transaction-utils/src/sender.rs

Lines changed: 28 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
1-
use std::{
2-
sync::{
3-
atomic::{AtomicU64, Ordering},
4-
Arc,
5-
},
6-
time::Duration,
1+
use crate::{
2+
blockhash_watcher,
3+
error::Error,
4+
queue::{CompletedTransactionTask, TransactionTask},
5+
send_and_confirm_transactions_in_parallel::QuicTpuClient,
76
};
8-
97
use dashmap::DashMap;
108
use futures::{stream, StreamExt, TryStreamExt};
119
use itertools::Itertools;
@@ -14,29 +12,19 @@ use solana_client::{
1412
tpu_client::TpuClientConfig,
1513
};
1614
use solana_sdk::{
17-
hash::Hash,
1815
instruction::Instruction,
1916
message::{v0, AddressLookupTableAccount, VersionedMessage},
2017
signature::{Keypair, Signature},
2118
signer::Signer,
2219
transaction::VersionedTransaction,
2320
};
24-
use tokio::sync::{
25-
mpsc::{Receiver, Sender},
26-
RwLock,
27-
};
21+
use std::{sync::Arc, time::Duration};
22+
use tokio::sync::mpsc::{Receiver, Sender};
2823
use tokio_graceful_shutdown::{SubsystemBuilder, SubsystemHandle};
2924
use tracing::{error, info};
3025

31-
use crate::{
32-
error::Error,
33-
queue::{CompletedTransactionTask, TransactionTask},
34-
send_and_confirm_transactions_in_parallel::QuicTpuClient,
35-
};
36-
3726
const SEND_TIMEOUT: Duration = Duration::from_secs(5);
3827
const CONFIRMATION_CHECK_INTERVAL: Duration = Duration::from_secs(5);
39-
const BLOCKHASH_REFRESH_INTERVAL: Duration = Duration::from_secs(30);
4028
const MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS: usize = 100;
4129

4230
#[derive(Clone, Debug)]
@@ -71,16 +59,8 @@ struct TransactionData<T: Send + Clone> {
7159
sent_to_rpc: bool,
7260
}
7361

74-
#[derive(Clone, Debug, Copy)]
75-
pub struct BlockHashData {
76-
last_valid_block_height: u64,
77-
blockhash: Hash,
78-
}
79-
8062
pub struct TransactionSender<T: Send + Clone + Sync> {
8163
unconfirmed_txs: Arc<DashMap<Signature, TransactionData<T>>>,
82-
blockhash_data: Arc<RwLock<BlockHashData>>,
83-
current_block_height: Arc<AtomicU64>,
8464
rpc_client: Arc<RpcClient>,
8565
tpu_client: Option<QuicTpuClient>,
8666
ws_url: String,
@@ -90,37 +70,6 @@ pub struct TransactionSender<T: Send + Clone + Sync> {
9070
last_tpu_use: u64,
9171
}
9272

93-
pub async fn spawn_background_tasks(
94-
handle: SubsystemHandle,
95-
blockhash_data: Arc<RwLock<BlockHashData>>,
96-
current_block_height: Arc<AtomicU64>,
97-
rpc_client: Arc<RpcClient>,
98-
) -> Result<(), Error> {
99-
// Spawn blockhash updater
100-
let mut interval = tokio::time::interval(BLOCKHASH_REFRESH_INTERVAL);
101-
loop {
102-
tokio::select! {
103-
_ = handle.on_shutdown_requested() => {
104-
return Ok(());
105-
}
106-
_ = interval.tick() => {
107-
if let Ok((blockhash, last_valid_block_height)) = rpc_client
108-
.get_latest_blockhash_with_commitment(rpc_client.commitment())
109-
.await
110-
{
111-
*blockhash_data.write().await = BlockHashData {
112-
last_valid_block_height,
113-
blockhash,
114-
};
115-
}
116-
if let Ok(block_height) = rpc_client.get_block_height().await {
117-
current_block_height.store(block_height, Ordering::Relaxed);
118-
}
119-
}
120-
}
121-
}
122-
}
123-
12473
impl<T: Send + Clone + Sync> TransactionSender<T> {
12574
pub async fn new(
12675
rpc_client: Arc<RpcClient>,
@@ -129,24 +78,8 @@ impl<T: Send + Clone + Sync> TransactionSender<T> {
12978
result_tx: Sender<CompletedTransactionTask<T>>,
13079
max_re_sign_count: u32,
13180
) -> Result<Self, Error> {
132-
// Initialize blockhash data
133-
let (blockhash, last_valid_block_height) = rpc_client
134-
.get_latest_blockhash_with_commitment(rpc_client.commitment())
135-
.await?;
136-
137-
let blockhash_data = Arc::new(RwLock::new(BlockHashData {
138-
last_valid_block_height,
139-
blockhash,
140-
}));
141-
142-
// Initialize block height
143-
let block_height = rpc_client.get_block_height().await?;
144-
let current_block_height = Arc::new(AtomicU64::new(block_height));
145-
14681
Ok(Self {
14782
unconfirmed_txs: Arc::new(DashMap::new()),
148-
blockhash_data,
149-
current_block_height,
15083
rpc_client,
15184
tpu_client: None,
15285
ws_url,
@@ -193,8 +126,9 @@ impl<T: Send + Clone + Sync> TransactionSender<T> {
193126
pub async fn handle_packed_tx(
194127
&self,
195128
packed_tx: PackedTransactionWithTasks<T>,
129+
blockhash_rx: &blockhash_watcher::MessageReceiver,
196130
) -> Result<(), Error> {
197-
if let Err(e) = self.process_packed_tx(&packed_tx).await {
131+
if let Err(e) = self.process_packed_tx(&packed_tx, blockhash_rx).await {
198132
// Handle processing error by notifying all tasks
199133
stream::iter(packed_tx.tasks)
200134
.map(|task| Ok((task, Some(e.clone()))))
@@ -212,13 +146,14 @@ impl<T: Send + Clone + Sync> TransactionSender<T> {
212146
pub async fn process_packed_tx(
213147
&self,
214148
packed: &PackedTransactionWithTasks<T>,
149+
blockhash_rx: &blockhash_watcher::MessageReceiver,
215150
) -> Result<(), Error> {
216151
// Check if transaction has been resigned too many times
217152
if packed.re_sign_count >= self.max_re_sign_count {
218153
return Err(Error::StaleTransaction);
219154
}
220155

221-
let blockhash = self.blockhash_data.read().await.blockhash;
156+
let blockhash = blockhash_rx.borrow().last_valid_blockhash;
222157
let message = v0::Message::try_compile(
223158
&self.payer.pubkey(),
224159
&packed.instructions,
@@ -240,7 +175,7 @@ impl<T: Send + Clone + Sync> TransactionSender<T> {
240175
signature,
241176
TransactionData {
242177
packed_tx: packed.clone(),
243-
last_valid_block_height: self.blockhash_data.read().await.last_valid_block_height,
178+
last_valid_block_height: blockhash_rx.borrow().last_valid_block_height,
244179
serialized_tx: serialized,
245180
sent_to_rpc: false,
246181
},
@@ -262,8 +197,11 @@ impl<T: Send + Clone + Sync> TransactionSender<T> {
262197
}
263198
}
264199

265-
async fn check_and_retry(&mut self) -> Result<(), Error> {
266-
let current_height = self.current_block_height.load(Ordering::Relaxed);
200+
async fn check_and_retry(
201+
&mut self,
202+
blockhash_rx: &blockhash_watcher::MessageReceiver,
203+
) -> Result<(), Error> {
204+
let current_height = blockhash_rx.borrow().current_block_height;
267205
let current_time = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH)
268206
{
269207
Ok(duration) => duration.as_secs(),
@@ -337,7 +275,7 @@ impl<T: Send + Clone + Sync> TransactionSender<T> {
337275
self.unconfirmed_txs.remove(&signature);
338276
// Create new packed transaction with incremented resign count
339277
let new_packed = data.packed_tx.with_incremented_re_sign_count();
340-
self.handle_packed_tx(new_packed).await?;
278+
self.handle_packed_tx(new_packed, blockhash_rx).await?;
341279
}
342280
}
343281
}
@@ -346,7 +284,7 @@ impl<T: Send + Clone + Sync> TransactionSender<T> {
346284
self.unconfirmed_txs.remove(&signature);
347285
// Create new packed transaction with incremented resign count
348286
let new_packed = data.packed_tx.with_incremented_re_sign_count();
349-
self.handle_packed_tx(new_packed).await?;
287+
self.handle_packed_tx(new_packed, blockhash_rx).await?;
350288
}
351289
}
352290
}
@@ -359,16 +297,17 @@ impl<T: Send + Clone + Sync> TransactionSender<T> {
359297
mut rx: Receiver<PackedTransactionWithTasks<T>>,
360298
handle: SubsystemHandle,
361299
) -> Result<(), Error> {
300+
let mut blockhash_watcher = blockhash_watcher::BlockhashWatcher::new(
301+
blockhash_watcher::BLOCKHASH_REFRESH_INTERVAL,
302+
self.rpc_client.clone(),
303+
);
362304
handle.start(SubsystemBuilder::new("blockhash-updater", {
363-
let blockhash_data = self.blockhash_data.clone();
364-
let current_block_height = self.current_block_height.clone();
365-
let rpc_client = self.rpc_client.clone();
366-
move |handle| {
367-
spawn_background_tasks(handle, blockhash_data, current_block_height, rpc_client)
368-
}
305+
let watcher = blockhash_watcher.clone();
306+
move |handle| watcher.run(handle)
369307
}));
370308

371309
let mut check_interval = tokio::time::interval(CONFIRMATION_CHECK_INTERVAL);
310+
let blockchain_rx = blockhash_watcher.watcher();
372311

373312
loop {
374313
tokio::select! {
@@ -379,10 +318,10 @@ impl<T: Send + Clone + Sync> TransactionSender<T> {
379318
return Ok(());
380319
}
381320
Some(packed_tx) = rx.recv() => {
382-
self.handle_packed_tx(packed_tx).await?;
321+
self.handle_packed_tx(packed_tx, &blockchain_rx).await?;
383322
}
384323
_ = check_interval.tick() => {
385-
self.check_and_retry().await?;
324+
self.check_and_retry(&blockchain_rx).await?;
386325
}
387326
}
388327
}

0 commit comments

Comments
 (0)