Skip to content

Commit 9accae4

Browse files
authored
Parallelize simulations for better cli perf (#66)
* Minor cli improvements * Bump version * Uneeded logging
1 parent 37c2504 commit 9accae4

File tree

1 file changed

+41
-3
lines changed

1 file changed

+41
-3
lines changed

tuktuk-cli/src/cmd/task.rs

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
1-
use std::{collections::HashSet, sync::Arc, time::Duration};
1+
use std::{
2+
collections::{HashMap, HashSet},
3+
sync::Arc,
4+
time::Duration,
5+
};
26

37
use anyhow::anyhow;
48
use chrono::{Local, TimeZone};
@@ -447,12 +451,19 @@ impl TaskCmd {
447451
.as_ref()
448452
.anchor_accounts::<TaskV0>(&task_keys)
449453
.await?;
454+
let clock_acc = client.rpc_client.get_account(&SYSVAR_CLOCK).await?;
455+
let clock: solana_sdk::clock::Clock = bincode::deserialize(&clock_acc.data)?;
456+
let now = clock.unix_timestamp;
450457
tasks
451458
.into_iter()
452459
.filter(|(_, task)| {
453460
if let Some(task) = task {
461+
if *failed && !task.trigger.is_active(now) {
462+
return false;
463+
}
454464
return task.description.starts_with(description);
455465
}
466+
456467
false
457468
})
458469
.map(|(p, task)| (p, task.unwrap().clone()))
@@ -465,10 +476,37 @@ impl TaskCmd {
465476
let mut to_close = Vec::new();
466477

467478
// If failed flag is set, simulate each task first
479+
let client = Arc::new(client);
480+
let simulation_tasks = tasks
481+
.iter()
482+
.filter(|(_, task)| seen_ids.insert(task.id))
483+
.map(|(pubkey, _)| *pubkey)
484+
.collect::<Vec<_>>();
485+
486+
let mut simulation_results = HashMap::new();
487+
if *failed {
488+
// Run simulations in parallel with a limit of 10 concurrent tasks
489+
let results = futures::stream::iter(simulation_tasks)
490+
.map(|pubkey| {
491+
let client = client.clone();
492+
async move {
493+
let result = simulate_task(&client, pubkey).await;
494+
(pubkey, result)
495+
}
496+
})
497+
.buffer_unordered(10)
498+
.collect::<Vec<_>>()
499+
.await;
500+
501+
// Collect results into a HashMap for O(1) lookups
502+
simulation_results = results.into_iter().collect();
503+
}
504+
505+
// Filter tasks based on simulation results if failed flag is set
468506
for (pubkey, task) in &tasks {
469-
if seen_ids.insert(task.id) {
507+
if seen_ids.contains(&task.id) {
470508
if *failed {
471-
if let Some(sim_result) = simulate_task(&client, *pubkey).await? {
509+
if let Some(Ok(Some(sim_result))) = simulation_results.get(pubkey) {
472510
if sim_result.error.is_some() {
473511
to_close.push(task.clone());
474512
}

0 commit comments

Comments
 (0)