Skip to content

Commit 1f1fa1f

Browse files
authored
Remove all unwraps, unsubscribe properly, make clock use subsystem handles (#39)
1 parent db590d4 commit 1f1fa1f

File tree

17 files changed

+181
-105
lines changed

17 files changed

+181
-105
lines changed

Cargo.lock

Lines changed: 4 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,12 @@ spl-token = "4.0.0"
5858
itertools = "0.13"
5959
tokio-graceful-shutdown = "0.15"
6060
solana-transaction-utils = { version = "0.3.4", path = "./solana-transaction-utils" }
61-
tuktuk-sdk = { version = "0.3.2", path = "./tuktuk-sdk" }
61+
tuktuk-sdk = { version = "0.3.3", path = "./tuktuk-sdk" }
6262
tuktuk-program = { version = "0.3.1", path = "./tuktuk-program" }
6363
solana-account-decoder = { version = "2.2.3" }
6464
solana-clock = { version = "2.2.1" }
6565
solana-transaction-status = "2.2.3"
66+
bincode = { version = "1.3.3" }
6667

6768
[profile.release]
6869
debug = true

solana-transaction-utils/src/error.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ pub enum Error {
2929
FeeTooHigh,
3030
#[error("Transaction has failed too many retries and gone stale")]
3131
StaleTransaction,
32+
#[error("Channel send error: {0}")]
33+
ChannelSendError(String),
34+
#[error("System time error: {0}")]
35+
SystemTimeError(String),
3236
}
3337

3438
impl From<solana_client::client_error::ClientError> for Error {
@@ -43,6 +47,12 @@ impl From<TpuSenderError> for Error {
4347
}
4448
}
4549

50+
impl<T> From<tokio::sync::mpsc::error::SendError<T>> for Error {
51+
fn from(value: tokio::sync::mpsc::error::SendError<T>) -> Self {
52+
Self::ChannelSendError(value.to_string())
53+
}
54+
}
55+
4656
impl Error {
4757
pub fn signer<S: ToString>(str: S) -> Self {
4858
Self::SignerError(str.to_string())

solana-transaction-utils/src/queue.rs

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ const MAX_PACKABLE_TX_SIZE: usize = 800;
7676

7777
pub async fn create_transaction_queue<T: Send + Clone + 'static + Sync>(
7878
args: TransactionQueueArgs<T>,
79-
) {
79+
) -> Result<(), Error> {
8080
let mut receiver = args.receiver;
8181

8282
// The currently staged bundle of tasks
@@ -203,16 +203,15 @@ pub async fn create_transaction_queue<T: Send + Clone + 'static + Sync>(
203203
task,
204204
fee: 0,
205205
})
206-
.await
207-
.expect("send result");
206+
.await?;
208207
}
209208
}
210209
Err(e) => {
211210
args.result_sender.send(CompletedTransactionTask {
212211
err: Some(e),
213212
task,
214213
fee: 0,
215-
}).await.expect("send result");
214+
}).await?;
216215
},
217216
_ => {
218217
// We should never get here
@@ -252,17 +251,15 @@ pub async fn create_transaction_queue<T: Send + Clone + 'static + Sync>(
252251
err: Some(Error::SimulatedTransactionError(e.clone())),
253252
task,
254253
fee: 0,
255-
})
256-
.await
257-
.expect("send result");
254+
}).await?;
258255
}
259256
} else {
260257
// Handle failed task
261258
args.result_sender.send(CompletedTransactionTask {
262259
err: Some(Error::SimulatedTransactionError(e)),
263260
task: tasks[failed_task_idx].clone(),
264-
fee: 0,
265-
}).await.expect("send result");
261+
fee: 0,
262+
}).await?;
266263

267264
// Requeue remaining tasks
268265
let mut new_bundle = TaskBundle::new();
@@ -287,7 +284,7 @@ pub async fn create_transaction_queue<T: Send + Clone + 'static + Sync>(
287284
err: Some(Error::SimulatedTransactionError(e.clone())),
288285
task,
289286
fee: 0,
290-
}).await.expect("send result");
287+
}).await?;
291288
}
292289
}
293290
}
@@ -298,7 +295,7 @@ pub async fn create_transaction_queue<T: Send + Clone + 'static + Sync>(
298295
err: Some(Error::FeeTooHigh),
299296
task,
300297
fee: 0,
301-
}).await.expect("send result");
298+
}).await?;
302299
}
303300
} else {
304301
// Simulation successful, send to transaction sender
@@ -307,7 +304,7 @@ pub async fn create_transaction_queue<T: Send + Clone + 'static + Sync>(
307304
tasks,
308305
fee,
309306
re_sign_count: 0,
310-
}).await.expect("send to tx sender");
307+
}).await?;
311308
}
312309
}
313310
Err(e) => {
@@ -317,7 +314,7 @@ pub async fn create_transaction_queue<T: Send + Clone + 'static + Sync>(
317314
err: Some(Error::RawSimulatedTransactionError(e.to_string())),
318315
task: task.clone(),
319316
fee: 0,
320-
}).await.expect("send result");
317+
}).await?;
321318
}
322319
}
323320
}

solana-transaction-utils/src/sender.rs

Lines changed: 30 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
1-
use crate::{
2-
error::Error,
3-
queue::{CompletedTransactionTask, TransactionTask},
4-
send_and_confirm_transactions_in_parallel::QuicTpuClient,
1+
use std::{
2+
sync::{
3+
atomic::{AtomicU64, Ordering},
4+
Arc,
5+
},
6+
time::Duration,
57
};
8+
69
use dashmap::DashMap;
710
use futures::{stream, StreamExt, TryStreamExt};
811
use itertools::Itertools;
@@ -18,20 +21,19 @@ use solana_sdk::{
1821
signer::Signer,
1922
transaction::VersionedTransaction,
2023
};
21-
use std::{
22-
sync::{
23-
atomic::{AtomicU64, Ordering},
24-
Arc,
25-
},
26-
time::Duration,
27-
};
2824
use tokio::sync::{
2925
mpsc::{Receiver, Sender},
3026
RwLock,
3127
};
3228
use tokio_graceful_shutdown::{SubsystemBuilder, SubsystemHandle};
3329
use tracing::{error, info};
3430

31+
use crate::{
32+
error::Error,
33+
queue::{CompletedTransactionTask, TransactionTask},
34+
send_and_confirm_transactions_in_parallel::QuicTpuClient,
35+
};
36+
3537
const SEND_TIMEOUT: Duration = Duration::from_secs(5);
3638
const CONFIRMATION_CHECK_INTERVAL: Duration = Duration::from_secs(5);
3739
const BLOCKHASH_REFRESH_INTERVAL: Duration = Duration::from_secs(30);
@@ -188,7 +190,10 @@ impl<T: Send + Clone + Sync> TransactionSender<T> {
188190

189191
const TPU_SHUTDOWN_THRESHOLD: u64 = 60; // Shutdown TPU client after 60 seconds of inactivity
190192

191-
pub async fn handle_packed_tx(&self, packed_tx: PackedTransactionWithTasks<T>) {
193+
pub async fn handle_packed_tx(
194+
&self,
195+
packed_tx: PackedTransactionWithTasks<T>,
196+
) -> Result<(), Error> {
192197
if let Err(e) = self.process_packed_tx(&packed_tx).await {
193198
// Handle processing error by notifying all tasks
194199
stream::iter(packed_tx.tasks)
@@ -198,10 +203,10 @@ impl<T: Send + Clone + Sync> TransactionSender<T> {
198203
.send(CompletedTransactionTask { err, task, fee: 0 })
199204
.await
200205
})
201-
.await
202-
// TODO: This should really return an error to avoid pacnis on full channels
203-
.expect("send result");
206+
.await?
204207
}
208+
209+
Ok(())
205210
}
206211

207212
pub async fn process_packed_tx(
@@ -257,14 +262,13 @@ impl<T: Send + Clone + Sync> TransactionSender<T> {
257262
}
258263
}
259264

260-
async fn check_and_retry(&mut self) {
265+
async fn check_and_retry(&mut self) -> Result<(), Error> {
261266
let current_height = self.current_block_height.load(Ordering::Relaxed);
262267
let current_time = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH)
263268
{
264269
Ok(duration) => duration.as_secs(),
265270
Err(e) => {
266-
error!("System time error: {}", e);
267-
return;
271+
return Err(Error::SystemTimeError(e.to_string()));
268272
}
269273
};
270274

@@ -279,7 +283,7 @@ impl<T: Send + Clone + Sync> TransactionSender<T> {
279283
let signatures: Vec<_> = self.unconfirmed_txs.iter().map(|r| *r.key()).collect();
280284

281285
if signatures.is_empty() {
282-
return;
286+
return Ok(());
283287
}
284288

285289
for chunk in signatures.chunks(MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS) {
@@ -302,8 +306,7 @@ impl<T: Send + Clone + Sync> TransactionSender<T> {
302306
.fee
303307
.div_ceil(num_packed_tasks as u64),
304308
})
305-
.await
306-
.expect("send result");
309+
.await?
307310
}
308311
}
309312
}
@@ -334,7 +337,7 @@ impl<T: Send + Clone + Sync> TransactionSender<T> {
334337
self.unconfirmed_txs.remove(&signature);
335338
// Create new packed transaction with incremented resign count
336339
let new_packed = data.packed_tx.with_incremented_re_sign_count();
337-
self.handle_packed_tx(new_packed).await;
340+
self.handle_packed_tx(new_packed).await?;
338341
}
339342
}
340343
}
@@ -343,10 +346,12 @@ impl<T: Send + Clone + Sync> TransactionSender<T> {
343346
self.unconfirmed_txs.remove(&signature);
344347
// Create new packed transaction with incremented resign count
345348
let new_packed = data.packed_tx.with_incremented_re_sign_count();
346-
self.handle_packed_tx(new_packed).await;
349+
self.handle_packed_tx(new_packed).await?;
347350
}
348351
}
349352
}
353+
354+
Ok(())
350355
}
351356

352357
pub async fn run(
@@ -374,10 +379,10 @@ impl<T: Send + Clone + Sync> TransactionSender<T> {
374379
return Ok(());
375380
}
376381
Some(packed_tx) = rx.recv() => {
377-
self.handle_packed_tx(packed_tx).await;
382+
self.handle_packed_tx(packed_tx).await?;
378383
}
379384
_ = check_interval.tick() => {
380-
self.check_and_retry().await;
385+
self.check_and_retry().await?;
381386
}
382387
}
383388
}

tuktuk-crank-turner/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "tuktuk-crank-turner"
3-
version = "0.2.17"
3+
version = "0.2.18"
44
authors.workspace = true
55
edition.workspace = true
66
license.workspace = true
@@ -15,6 +15,7 @@ path = "src/main.rs"
1515
[dependencies]
1616
anchor-lang = { workspace = true }
1717
anchor-client = { workspace = true, features = ["async"] }
18+
bincode = { workspace = true }
1819
solana-sdk = { workspace = true }
1920
tokio = { workspace = true }
2021
solana-client = { workspace = true }

tuktuk-crank-turner/src/main.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,11 @@ use tokio::{
2222
use tokio_graceful_shutdown::{SubsystemBuilder, Toplevel};
2323
use tracing_subscriber::{fmt::format::FmtSpan, layer::SubscriberExt, util::SubscriberInitExt};
2424
use transaction::TransactionSenderSubsystem;
25-
use tuktuk_sdk::{prelude::*, pubsub_client::PubsubClient, watcher::PubsubTracker};
25+
use tuktuk_sdk::{
26+
clock::{track, SYSVAR_CLOCK},
27+
pubsub_client::PubsubClient,
28+
watcher::PubsubTracker,
29+
};
2630
use warp::{reject::Rejection, reply::Reply, Filter};
2731
use watchers::{args::WatcherArgs, task_queues::get_and_watch_task_queues};
2832

@@ -130,7 +134,9 @@ impl Cli {
130134
commitment,
131135
));
132136

133-
let now_rx = clock::track(Arc::clone(&rpc_client), Arc::clone(&pubsub_tracker)).await?;
137+
let clock_acc = rpc_client.get_account(&SYSVAR_CLOCK).await?;
138+
let clock: solana_sdk::clock::Clock = bincode::deserialize(&clock_acc.data)?;
139+
let (now_tx, now_rx) = tokio::sync::watch::channel(clock.unix_timestamp as u64);
134140

135141
let (tasks, task_queue) = create_task_queue(TaskQueueArgs {
136142
channel_capacity: 100,
@@ -222,6 +228,7 @@ impl Cli {
222228
}));
223229
// Poll RPC for changes to pubsub keys every 30 seconds
224230
top_level.start(SubsystemBuilder::new("pubsub-tracker", {
231+
let pubsub_tracker = pubsub_tracker.clone();
225232
move |handle| async move {
226233
let mut interval = interval(pubsub_repoll);
227234
loop {
@@ -240,12 +247,17 @@ impl Cli {
240247
anyhow::Ok(())
241248
}
242249
}));
250+
top_level.start(SubsystemBuilder::new("clock-tracker", {
251+
let now_tx = now_tx.clone();
252+
let pubsub_tracker = pubsub_tracker.clone();
253+
move |handle| track(now_tx, pubsub_tracker, handle)
254+
}));
243255
top_level.start(SubsystemBuilder::new("pubsub-client", {
244256
move |handle| async move {
245257
tokio::select! {
246258
_ = handle.on_shutdown_requested() => {
247259
tracing::info!("Shutdown requested, exiting pubsub-client");
248-
shutdown_sender.send(()).unwrap();
260+
shutdown_sender.send(()).map_err(|_| anyhow::anyhow!("Failed to send shutdown signal"))?;
249261
anyhow::Ok(())
250262
},
251263
res = pubsub_handle => {

0 commit comments

Comments
 (0)