Skip to content

Commit 164d602

Browse files
committed
Merge remote-tracking branch 'origin/main' into develop
2 parents f6d7190 + 067a3da commit 164d602

File tree

20 files changed

+386
-173
lines changed

20 files changed

+386
-173
lines changed

Cargo.lock

Lines changed: 4 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,12 @@ spl-token = "4.0.0"
5858
itertools = "0.13"
5959
tokio-graceful-shutdown = "0.15"
6060
solana-transaction-utils = { version = "0.3.4", path = "./solana-transaction-utils" }
61-
tuktuk-sdk = { version = "0.3.2", path = "./tuktuk-sdk" }
61+
tuktuk-sdk = { version = "0.3.3", path = "./tuktuk-sdk" }
6262
tuktuk-program = { version = "0.3.1", path = "./tuktuk-program" }
6363
solana-account-decoder = { version = "2.2.3" }
6464
solana-clock = { version = "2.2.1" }
6565
solana-transaction-status = "2.2.3"
66+
bincode = { version = "1.3.3" }
6667

6768
[profile.release]
6869
debug = true
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
use crate::Error;
2+
use futures::{future, TryFutureExt};
3+
use solana_client::nonblocking::rpc_client::RpcClient;
4+
use solana_sdk::hash::Hash;
5+
use std::{sync::Arc, time::Duration};
6+
use tokio::{sync::watch, time};
7+
use tokio_graceful_shutdown::SubsystemHandle;
8+
use tracing::{info, warn};
9+
10+
pub type MessageSender = watch::Sender<BlockHashData>;
11+
pub type MessageReceiver = watch::Receiver<BlockHashData>;
12+
pub const BLOCKHASH_REFRESH_INTERVAL: Duration = Duration::from_secs(30);
13+
14+
pub fn last_valid<T>(receiver: &watch::Receiver<T>) -> watch::Ref<'_, T>
15+
where
16+
T: Clone,
17+
{
18+
receiver.borrow()
19+
}
20+
21+
#[derive(Debug, Clone, Default)]
22+
pub struct BlockHashData {
23+
pub last_valid_block_height: u64,
24+
pub last_valid_blockhash: Hash,
25+
pub current_block_height: u64,
26+
}
27+
28+
#[derive(Clone)]
29+
pub struct BlockhashWatcher {
30+
watch: MessageSender,
31+
interval: Duration,
32+
client: Arc<RpcClient>,
33+
}
34+
35+
impl BlockhashWatcher {
36+
pub fn new(interval: Duration, client: Arc<RpcClient>) -> Self {
37+
let (watch, _) = watch::channel(Default::default());
38+
Self {
39+
watch,
40+
interval,
41+
client,
42+
}
43+
}
44+
45+
pub fn watcher(&mut self) -> MessageReceiver {
46+
self.watch.subscribe()
47+
}
48+
49+
pub async fn run(mut self, shutdown: SubsystemHandle) -> Result<(), Error> {
50+
info!("starting");
51+
let mut interval = time::interval(self.interval);
52+
loop {
53+
tokio::select! {
54+
_ = shutdown.on_shutdown_requested() => {
55+
info!("shutting down");
56+
return Ok(());
57+
}
58+
_ = interval.tick() => {
59+
match self.fetch_data(&shutdown).await {
60+
Ok(Some(new_data)) => {
61+
let _ = self.watch.send_replace(new_data);
62+
}
63+
Ok(None) => (),
64+
Err(err) => warn!(?err, "failed to get block hash data"),
65+
};
66+
}
67+
}
68+
}
69+
}
70+
71+
pub async fn fetch_data(
72+
&mut self,
73+
shutdown: &SubsystemHandle,
74+
) -> Result<Option<BlockHashData>, Error> {
75+
let fetch_fut = future::try_join(
76+
self.client
77+
.get_latest_blockhash_with_commitment(self.client.commitment()),
78+
self.client.get_block_height(),
79+
)
80+
.map_err(Error::from)
81+
.map_ok(
82+
|((last_valid_blockhash, last_valid_block_height), current_block_height)| {
83+
BlockHashData {
84+
last_valid_block_height,
85+
last_valid_blockhash,
86+
current_block_height,
87+
}
88+
},
89+
);
90+
tokio::select! {
91+
result = fetch_fut => result.map(Some),
92+
_ = shutdown.on_shutdown_requested() => Ok(None)
93+
}
94+
}
95+
}

solana-transaction-utils/src/error.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ pub enum Error {
2929
FeeTooHigh,
3030
#[error("Transaction has failed too many retries and gone stale")]
3131
StaleTransaction,
32+
#[error("System time error: {0}")]
33+
SystemTimeError(String),
34+
#[error("message channel closed")]
35+
ChannelClosed,
3236
}
3337

3438
impl From<solana_client::client_error::ClientError> for Error {
@@ -43,6 +47,12 @@ impl From<TpuSenderError> for Error {
4347
}
4448
}
4549

50+
impl<T> From<tokio::sync::mpsc::error::SendError<T>> for Error {
51+
fn from(_value: tokio::sync::mpsc::error::SendError<T>) -> Self {
52+
Self::ChannelClosed
53+
}
54+
}
55+
4656
impl Error {
4757
pub fn signer<S: ToString>(str: S) -> Self {
4858
Self::SignerError(str.to_string())
@@ -51,4 +61,8 @@ impl Error {
5161
pub fn serialization<S: ToString>(str: S) -> Self {
5262
Self::SerializationError(str.to_string())
5363
}
64+
65+
pub fn channel_closed() -> Error {
66+
Error::ChannelClosed
67+
}
5468
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
1+
pub mod blockhash_watcher;
12
pub mod error;
23
pub mod pack;
34
pub mod priority_fee;
45
pub mod queue;
56
pub mod send_and_confirm_transactions_in_parallel;
67
pub mod sender;
8+
pub mod sync;
9+
10+
pub use error::Error;

solana-transaction-utils/src/queue.rs

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ const MAX_PACKABLE_TX_SIZE: usize = 800;
7676

7777
pub async fn create_transaction_queue<T: Send + Clone + 'static + Sync>(
7878
args: TransactionQueueArgs<T>,
79-
) {
79+
) -> Result<(), Error> {
8080
let mut receiver = args.receiver;
8181

8282
// The currently staged bundle of tasks
@@ -203,16 +203,15 @@ pub async fn create_transaction_queue<T: Send + Clone + 'static + Sync>(
203203
task,
204204
fee: 0,
205205
})
206-
.await
207-
.expect("send result");
206+
.await?;
208207
}
209208
}
210209
Err(e) => {
211210
args.result_sender.send(CompletedTransactionTask {
212211
err: Some(e),
213212
task,
214213
fee: 0,
215-
}).await.expect("send result");
214+
}).await?;
216215
},
217216
_ => {
218217
// We should never get here
@@ -252,17 +251,15 @@ pub async fn create_transaction_queue<T: Send + Clone + 'static + Sync>(
252251
err: Some(Error::SimulatedTransactionError(e.clone())),
253252
task,
254253
fee: 0,
255-
})
256-
.await
257-
.expect("send result");
254+
}).await?;
258255
}
259256
} else {
260257
// Handle failed task
261258
args.result_sender.send(CompletedTransactionTask {
262259
err: Some(Error::SimulatedTransactionError(e)),
263260
task: tasks[failed_task_idx].clone(),
264-
fee: 0,
265-
}).await.expect("send result");
261+
fee: 0,
262+
}).await?;
266263

267264
// Requeue remaining tasks
268265
let mut new_bundle = TaskBundle::new();
@@ -287,7 +284,7 @@ pub async fn create_transaction_queue<T: Send + Clone + 'static + Sync>(
287284
err: Some(Error::SimulatedTransactionError(e.clone())),
288285
task,
289286
fee: 0,
290-
}).await.expect("send result");
287+
}).await?;
291288
}
292289
}
293290
}
@@ -298,7 +295,7 @@ pub async fn create_transaction_queue<T: Send + Clone + 'static + Sync>(
298295
err: Some(Error::FeeTooHigh),
299296
task,
300297
fee: 0,
301-
}).await.expect("send result");
298+
}).await?;
302299
}
303300
} else {
304301
// Simulation successful, send to transaction sender
@@ -307,7 +304,7 @@ pub async fn create_transaction_queue<T: Send + Clone + 'static + Sync>(
307304
tasks,
308305
fee,
309306
re_sign_count: 0,
310-
}).await.expect("send to tx sender");
307+
}).await?;
311308
}
312309
}
313310
Err(e) => {
@@ -317,7 +314,7 @@ pub async fn create_transaction_queue<T: Send + Clone + 'static + Sync>(
317314
err: Some(Error::RawSimulatedTransactionError(e.to_string())),
318315
task: task.clone(),
319316
fee: 0,
320-
}).await.expect("send result");
317+
}).await?;
321318
}
322319
}
323320
}

0 commit comments

Comments
 (0)