Skip to content

Commit 446fbb7

Browse files
ChewingGlassChiefWoods
authored andcommitted
Add ability to watch tasks by prefixes (#73)
* Add ability to watch tasks by prefixes * Bump cli
1 parent 8395543 commit 446fbb7

File tree

3 files changed

+251
-4
lines changed

3 files changed

+251
-4
lines changed

Cargo.lock

Lines changed: 2 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

tuktuk-cli/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "tuktuk-cli"
3-
version = "0.2.12"
3+
version = "0.2.13"
44
description = "A cli for tuktuk"
55
homepage.workspace = true
66
repository.workspace = true
@@ -20,6 +20,7 @@ thiserror = "1"
2020
anchor-lang = { workspace = true }
2121
solana-quic-client = { workspace = true }
2222
solana-client = { workspace = true }
23+
solana-transaction-status-client-types = "2.2.1"
2324
anyhow = { workspace = true }
2425
clap = { workspace = true }
2526
tokio = { workspace = true }

tuktuk-cli/src/cmd/task.rs

Lines changed: 247 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{collections::HashSet, sync::Arc};
1+
use std::{collections::HashSet, sync::Arc, time::Duration};
22

33
use anyhow::anyhow;
44
use chrono::{Local, TimeZone};
@@ -7,14 +7,18 @@ use clock::SYSVAR_CLOCK;
77
use futures::stream::StreamExt;
88
use itertools::Itertools;
99
use serde::Serialize;
10-
use solana_client::rpc_config::RpcSimulateTransactionConfig;
10+
use solana_client::{
11+
rpc_client::GetConfirmedSignaturesForAddress2Config,
12+
rpc_config::{RpcSimulateTransactionConfig, RpcTransactionConfig},
13+
};
1114
use solana_sdk::{
1215
commitment_config::CommitmentLevel,
1316
message::{v0, VersionedMessage},
1417
pubkey::Pubkey,
1518
signer::Signer,
1619
transaction::VersionedTransaction,
1720
};
21+
use solana_transaction_status_client_types::UiTransactionEncoding;
1822
use solana_transaction_utils::{
1923
pack::pack_instructions_into_transactions, priority_fee::auto_compute_limit_and_price,
2024
};
@@ -101,6 +105,15 @@ pub enum Cmd {
101105
)]
102106
failed: bool,
103107
},
108+
Watch {
109+
#[command(flatten)]
110+
task_queue: TaskQueueArg,
111+
#[arg(
112+
long,
113+
help = "Description prefix to watch for (can be specified multiple times)"
114+
)]
115+
description: Vec<String>,
116+
},
104117
}
105118

106119
async fn simulate_task(client: &CliClient, task_key: Pubkey) -> Result<Option<SimulationResult>> {
@@ -173,6 +186,108 @@ struct SimulationResult {
173186
pub compute_units: Option<u64>,
174187
}
175188

189+
async fn handle_task_completion(client: &CliClient, task_key: Pubkey, task_id: u16) -> Result {
190+
println!(
191+
"Task {} completed! Getting transaction signature...",
192+
task_id
193+
);
194+
195+
// Get the last 10 transaction signatures for this task
196+
let signatures = client
197+
.rpc_client
198+
.get_signatures_for_address_with_config(
199+
&task_key,
200+
GetConfirmedSignaturesForAddress2Config {
201+
limit: Some(10),
202+
..Default::default()
203+
},
204+
)
205+
.await?;
206+
207+
if signatures.is_empty() {
208+
println!("No transaction signature found for task {}", task_id);
209+
return Ok(());
210+
}
211+
212+
// Limit to last 10 transactions
213+
let recent_signatures: Vec<solana_sdk::signature::Signature> = signatures
214+
.iter()
215+
.take(10)
216+
.map(|sig_info| sig_info.signature.parse().unwrap())
217+
.collect();
218+
219+
// Get statuses for all signatures at once
220+
let signature_statuses = client
221+
.rpc_client
222+
.get_signature_statuses_with_history(&recent_signatures)
223+
.await?;
224+
225+
// Find the first successful transaction
226+
let mut successful_signature = None;
227+
for (i, status_result) in signature_statuses.value.iter().enumerate() {
228+
match status_result {
229+
Some(status) => {
230+
// Check if the transaction was successful (no error)
231+
if status.err.is_none() {
232+
successful_signature = Some(recent_signatures[i].to_string());
233+
break;
234+
}
235+
}
236+
None => {
237+
// Transaction not found, continue to next
238+
continue;
239+
}
240+
}
241+
}
242+
243+
if let Some(signature) = successful_signature {
244+
println!("Successful transaction signature: {}", signature);
245+
246+
// Get the full transaction to extract logs
247+
match client
248+
.rpc_client
249+
.get_transaction_with_config(
250+
&signature.parse()?,
251+
RpcTransactionConfig {
252+
encoding: Some(UiTransactionEncoding::Json),
253+
max_supported_transaction_version: Some(0),
254+
..Default::default()
255+
},
256+
)
257+
.await
258+
{
259+
Ok(tx) => {
260+
if let Some(meta) = tx.transaction.meta {
261+
match meta.log_messages {
262+
solana_transaction_status_client_types::option_serializer::OptionSerializer::Some(logs) => {
263+
println!("Transaction logs:");
264+
for log in logs {
265+
println!(" {}", log);
266+
}
267+
}
268+
_ => {
269+
println!("No logs found in transaction");
270+
}
271+
}
272+
} else {
273+
println!("No transaction metadata found");
274+
}
275+
}
276+
Err(e) => {
277+
println!("Error getting transaction details: {}", e);
278+
}
279+
}
280+
} else {
281+
println!(
282+
"No successful transaction found for task {} (all {} recent transactions failed)",
283+
task_id,
284+
recent_signatures.len()
285+
);
286+
}
287+
288+
Ok(())
289+
}
290+
176291
impl TaskCmd {
177292
pub async fn run(&self, opts: Opts) -> Result {
178293
match &self.cmd {
@@ -593,6 +708,136 @@ impl TaskCmd {
593708
println!("New task key: {new_task_key}");
594709
}
595710
}
711+
Cmd::Watch {
712+
task_queue,
713+
description,
714+
} => {
715+
if description.is_empty() {
716+
return Err(anyhow!(
717+
"At least one description must be provided for watch command"
718+
));
719+
}
720+
721+
let client = opts.client().await?;
722+
let task_queue_pubkey = task_queue.get_pubkey(&client).await?.unwrap();
723+
let task_queue: TaskQueueV0 = client
724+
.as_ref()
725+
.anchor_account(&task_queue_pubkey)
726+
.await?
727+
.ok_or_else(|| anyhow!("Task queue account not found"))?;
728+
729+
let trimmed_descriptions: Vec<String> = description
730+
.iter()
731+
.map(|prefix| {
732+
if prefix.len() > 40 {
733+
prefix.chars().take(40).collect()
734+
} else {
735+
prefix.clone()
736+
}
737+
})
738+
.collect();
739+
740+
// First, get and display all existing tasks that match the description prefixes
741+
let task_keys = tuktuk::task::keys(&task_queue_pubkey, &task_queue)?;
742+
let existing_tasks = client
743+
.as_ref()
744+
.anchor_accounts::<TaskV0>(&task_keys)
745+
.await?;
746+
747+
let mut watched_tasks = std::collections::HashMap::new();
748+
749+
// Filter and start watching existing tasks that match our prefixes
750+
for (task_key, maybe_task) in existing_tasks {
751+
if let Some(task) = maybe_task {
752+
// Check if task description matches any of the prefixes
753+
let matches = trimmed_descriptions
754+
.iter()
755+
.any(|prefix| task.description.starts_with(prefix));
756+
if matches {
757+
println!(
758+
"Found existing matching task: {} (ID: {}, KEY: {})",
759+
task.description, task.id, task_key
760+
);
761+
watched_tasks.insert(task_key, task.id);
762+
}
763+
}
764+
}
765+
766+
// Set up pubsub tracker for watching
767+
let (pubsub_client_raw, _pubsub_handle, _shutdown_sender) =
768+
tuktuk_sdk::pubsub_client::PubsubClient::new(client.opts.ws_url().as_str())
769+
.await?;
770+
let pubsub_client = Arc::new(pubsub_client_raw);
771+
let pubsub_tracker = Arc::new(tuktuk_sdk::watcher::PubsubTracker::new(
772+
client.rpc_client.clone(),
773+
pubsub_client,
774+
Duration::from_secs(30),
775+
solana_sdk::commitment_config::CommitmentConfig::confirmed(),
776+
));
777+
778+
// Start watching for task updates
779+
let (stream, _unsub) = tuktuk::task::on_new(
780+
client.as_ref(),
781+
&pubsub_tracker,
782+
&task_queue_pubkey,
783+
&task_queue,
784+
)
785+
.await?;
786+
println!(
787+
"Watching for tasks with description prefixes: {:?}",
788+
trimmed_descriptions
789+
);
790+
println!("Press Ctrl+C to stop watching...");
791+
792+
let mut stream = Box::pin(stream);
793+
794+
while let Some(update) = stream.next().await {
795+
match update {
796+
Ok(task_update) => {
797+
// Check for new tasks that match any of our descriptions
798+
for (task_key, maybe_task) in task_update.tasks {
799+
if let Some(task) = maybe_task {
800+
// Check if task description matches any of the prefixes
801+
let matches = trimmed_descriptions
802+
.iter()
803+
.any(|prefix| task.description.starts_with(prefix));
804+
if matches {
805+
println!(
806+
"Found matching task: {} (ID: {}, KEY: {})",
807+
task.description, task.id, task_key
808+
);
809+
watched_tasks.insert(task_key, task.id);
810+
}
811+
} else {
812+
// Task was removed (completed)
813+
if let Some(task_id) = watched_tasks.remove(&task_key) {
814+
if let Err(e) =
815+
handle_task_completion(&client, task_key, task_id).await
816+
{
817+
eprintln!("Error handling task completion: {}", e);
818+
}
819+
}
820+
}
821+
}
822+
823+
// Check for removed tasks
824+
for removed_task_key in task_update.removed {
825+
if let Some(task_id) = watched_tasks.remove(&removed_task_key) {
826+
if let Err(e) =
827+
handle_task_completion(&client, removed_task_key, task_id)
828+
.await
829+
{
830+
eprintln!("Error handling task completion: {}", e);
831+
}
832+
}
833+
}
834+
}
835+
Err(e) => {
836+
eprintln!("Error receiving task update: {}", e);
837+
}
838+
}
839+
}
840+
}
596841
}
597842
Ok(())
598843
}

0 commit comments

Comments
 (0)