Skip to content

Commit 12e3399

Browse files
authored
Merge pull request #43 from helium/refactor/mutexes
Refactor to remove mutexes
2 parents c0b6b73 + 6c323c3 commit 12e3399

File tree

11 files changed

+502
-124
lines changed

11 files changed

+502
-124
lines changed
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
use std::{collections::HashMap, sync::Arc};
2+
3+
use solana_client::nonblocking::rpc_client::RpcClient;
4+
use solana_sdk::{
5+
address_lookup_table::{state::AddressLookupTable, AddressLookupTableAccount},
6+
pubkey::Pubkey,
7+
};
8+
use tokio_graceful_shutdown::SubsystemHandle;
9+
use tracing::info;
10+
11+
use crate::{cache::LookupTableRequest, sync};
12+
13+
/// Sending half used by clients to issue `LookupTableRequest`s to the cache task.
pub type LookupTablesSender = sync::MessageSender<LookupTableRequest>;
/// Receiving half consumed by `LookupTablesCache::run`.
pub type LookupTablesReceiver = sync::MessageReceiver<LookupTableRequest>;
15+
16+
impl LookupTablesSender {
17+
pub async fn get_lookup_tables(
18+
&self,
19+
lookup_table_keys: Vec<Pubkey>,
20+
) -> Result<Vec<AddressLookupTableAccount>, sync::Error> {
21+
self.request(|resp| LookupTableRequest::Get {
22+
lookup_table_keys,
23+
resp,
24+
})
25+
.await
26+
}
27+
}
28+
29+
/// Creates the bounded request channel (capacity 100) connecting
/// `LookupTablesSender` clients to a `LookupTablesCache`.
pub fn lookup_tables_channel() -> (LookupTablesSender, LookupTablesReceiver) {
    // Return the pair directly; binding `(tx, rx)` only to return it was
    // redundant (clippy::let_and_return).
    sync::message_channel(100)
}
33+
34+
/// Single-owner, message-driven cache of address lookup table accounts.
/// All access flows through the request channel, so only the cache task
/// itself ever touches the map — no mutex required.
pub struct LookupTablesCache {
    // RPC client used to fetch lookup tables that are not yet cached.
    rpc_client: Arc<RpcClient>,
    // Fetched tables keyed by lookup-table address. Entries are never
    // evicted — NOTE(review): assumes table contents are stable for the
    // lifetime of this service; confirm.
    cache: HashMap<Pubkey, AddressLookupTableAccount>,
    // Incoming requests from `LookupTablesSender` handles.
    receiver: LookupTablesReceiver,
}
39+
40+
impl LookupTablesCache {
41+
pub fn new(rpc_client: Arc<RpcClient>, receiver: LookupTablesReceiver) -> Self {
42+
Self {
43+
rpc_client,
44+
cache: HashMap::new(),
45+
receiver,
46+
}
47+
}
48+
49+
pub async fn run(mut self, handle: SubsystemHandle) -> anyhow::Result<()> {
50+
info!("starting lookup tables cache");
51+
loop {
52+
tokio::select! {
53+
_ = handle.on_shutdown_requested() => {
54+
info!("shutting down lookup tables cache");
55+
break;
56+
}
57+
Some(req) = self.receiver.recv() => {
58+
match req {
59+
LookupTableRequest::Get {
60+
lookup_table_keys,
61+
resp,
62+
} => {
63+
let mut result = Vec::with_capacity(lookup_table_keys.len());
64+
let mut missing_keys = Vec::new();
65+
66+
// First check cache
67+
for key in &lookup_table_keys {
68+
if let Some(table) = self.cache.get(key) {
69+
result.push(table.clone());
70+
} else {
71+
missing_keys.push(*key);
72+
}
73+
}
74+
75+
// Fetch missing tables
76+
if !missing_keys.is_empty() {
77+
for key in missing_keys {
78+
if let Ok(account) = self.rpc_client.get_account(&key).await {
79+
if let Ok(lut) = AddressLookupTable::deserialize(&account.data) {
80+
let table = AddressLookupTableAccount {
81+
key,
82+
addresses: lut.addresses.to_vec(),
83+
};
84+
self.cache.insert(key, table.clone());
85+
result.push(table);
86+
}
87+
}
88+
}
89+
}
90+
91+
// Sort result to match input order
92+
result.sort_by_key(|table| {
93+
lookup_table_keys.iter()
94+
.position(|key| key == &table.key)
95+
.unwrap_or(usize::MAX)
96+
});
97+
98+
resp.send(result);
99+
}
100+
}
101+
}
102+
else => break,
103+
}
104+
}
105+
Ok(())
106+
}
107+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
use std::collections::HashSet;
2+
3+
use solana_sdk::{address_lookup_table::AddressLookupTableAccount, pubkey::Pubkey};
4+
use tuktuk_program::TaskQueueV0;
5+
6+
use crate::sync::ResponseSender;
7+
8+
// Request types for state management
9+
/// Requests handled by the task-state cache: tracks which task ids are
/// currently in progress per pubkey (presumably a task-queue address —
/// TODO confirm against callers).
#[derive(Debug)]
pub enum TaskStateRequest {
    /// Mark `task_ids` as in progress for `pubkey` (fire-and-forget).
    AddInProgressTasks {
        pubkey: Pubkey,
        task_ids: HashSet<u16>,
    },
    /// Clear `task_ids` from the in-progress set for `pubkey` (fire-and-forget).
    RemoveInProgressTasks {
        pubkey: Pubkey,
        task_ids: HashSet<u16>,
    },
    /// Reply with the current in-progress set for `pubkey` (empty if none).
    GetInProgressTasks {
        pubkey: Pubkey,
        resp: ResponseSender<HashSet<u16>>,
    },
}
24+
25+
/// Requests handled by the lookup-tables cache.
#[derive(Debug)]
pub enum LookupTableRequest {
    /// Resolve `lookup_table_keys` into full lookup-table accounts and
    /// reply on `resp`.
    Get {
        lookup_table_keys: Vec<Pubkey>,
        resp: ResponseSender<Vec<AddressLookupTableAccount>>,
    },
}
32+
33+
/// Requests handled by the task-queue cache.
#[derive(Debug)]
pub enum TaskQueueRequest {
    /// Reply with the cached queue for `pubkey`, or `None` if not cached.
    Get {
        pubkey: Pubkey,
        resp: ResponseSender<Option<TaskQueueV0>>,
    },
    /// Insert or replace the cached queue for `pubkey` (fire-and-forget).
    /// Boxed to keep the enum small relative to its other variants.
    Update {
        pubkey: Pubkey,
        queue: Box<TaskQueueV0>,
    },
}
44+
45+
mod lookup_tables;
46+
mod task_queues;
47+
mod task_state;
48+
49+
pub use lookup_tables::{lookup_tables_channel, LookupTablesCache, LookupTablesSender};
50+
pub use task_queues::{task_queues_channel, TaskQueueCache, TaskQueuesSender};
51+
pub use task_state::{task_state_channel, TaskStateCache, TaskStateSender};
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
use std::collections::HashMap;
2+
3+
use solana_sdk::pubkey::Pubkey;
4+
use tokio_graceful_shutdown::SubsystemHandle;
5+
use tracing::info;
6+
use tuktuk_program::TaskQueueV0;
7+
8+
use crate::{cache::TaskQueueRequest, sync};
9+
10+
/// Sending half used by clients to issue `TaskQueueRequest`s to the cache task.
pub type TaskQueuesSender = sync::MessageSender<TaskQueueRequest>;
/// Receiving half consumed by `TaskQueueCache::run`.
pub type TaskQueuesReceiver = sync::MessageReceiver<TaskQueueRequest>;
12+
13+
impl TaskQueuesSender {
14+
pub async fn get_task_queue(&self, pubkey: Pubkey) -> Result<Option<TaskQueueV0>, sync::Error> {
15+
self.request(|resp| TaskQueueRequest::Get { pubkey, resp })
16+
.await
17+
}
18+
19+
pub async fn update_task_queue(&self, pubkey: Pubkey, queue: TaskQueueV0) {
20+
self.send(TaskQueueRequest::Update {
21+
pubkey,
22+
queue: Box::new(queue),
23+
})
24+
.await
25+
}
26+
}
27+
28+
/// Creates the bounded request channel (capacity 100) connecting
/// `TaskQueuesSender` clients to a `TaskQueueCache`.
pub fn task_queues_channel() -> (TaskQueuesSender, TaskQueuesReceiver) {
    // Return the pair directly; binding `(tx, rx)` only to return it was
    // redundant (clippy::let_and_return).
    sync::message_channel(100)
}
32+
33+
/// Single-owner, message-driven cache of `TaskQueueV0` state. All access
/// flows through the request channel, so only the cache task itself ever
/// touches the map — no mutex required.
pub struct TaskQueueCache {
    // Cached queues keyed by task-queue pubkey.
    cache: HashMap<Pubkey, TaskQueueV0>,
    // Incoming requests from `TaskQueuesSender` handles.
    receiver: TaskQueuesReceiver,
}
37+
38+
impl TaskQueueCache {
39+
pub fn new(receiver: TaskQueuesReceiver) -> Self {
40+
Self {
41+
cache: HashMap::new(),
42+
receiver,
43+
}
44+
}
45+
46+
pub async fn run(mut self, handle: SubsystemHandle) -> anyhow::Result<()> {
47+
info!("starting task queue cache");
48+
loop {
49+
tokio::select! {
50+
_ = handle.on_shutdown_requested() => {
51+
info!("shutting down task queue cache");
52+
break;
53+
}
54+
Some(req) = self.receiver.recv() => {
55+
match req {
56+
TaskQueueRequest::Get { pubkey, resp } => {
57+
resp.send(self.cache.get(&pubkey).cloned());
58+
}
59+
TaskQueueRequest::Update { pubkey, queue } => {
60+
self.cache.insert(pubkey, *queue);
61+
}
62+
}
63+
}
64+
else => break,
65+
}
66+
}
67+
Ok(())
68+
}
69+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
use std::collections::{HashMap, HashSet};
2+
3+
use solana_sdk::pubkey::Pubkey;
4+
use tokio_graceful_shutdown::SubsystemHandle;
5+
use tracing::info;
6+
7+
use crate::{cache::TaskStateRequest, sync};
8+
9+
/// Sending half used by clients to issue `TaskStateRequest`s to the cache task.
pub type TaskStateSender = sync::MessageSender<TaskStateRequest>;
/// Receiving half consumed by `TaskStateCache::run`.
pub type TaskStateReceiver = sync::MessageReceiver<TaskStateRequest>;
11+
12+
impl TaskStateSender {
13+
pub async fn get_in_progress_tasks(&self, pubkey: Pubkey) -> Result<HashSet<u16>, sync::Error> {
14+
self.request(|resp| TaskStateRequest::GetInProgressTasks { pubkey, resp })
15+
.await
16+
}
17+
18+
pub async fn add_in_progress_tasks(&self, pubkey: Pubkey, task_ids: HashSet<u16>) {
19+
self.send(TaskStateRequest::AddInProgressTasks { pubkey, task_ids })
20+
.await
21+
}
22+
23+
pub async fn remove_in_progress_tasks(&self, pubkey: Pubkey, task_ids: HashSet<u16>) {
24+
self.send(TaskStateRequest::RemoveInProgressTasks { pubkey, task_ids })
25+
.await
26+
}
27+
}
28+
29+
/// Creates the bounded request channel (capacity 100) connecting
/// `TaskStateSender` clients to a `TaskStateCache`.
pub fn task_state_channel() -> (TaskStateSender, TaskStateReceiver) {
    // Return the pair directly; binding `(tx, rx)` only to return it was
    // redundant (clippy::let_and_return).
    sync::message_channel(100)
}
33+
34+
/// Single-owner, message-driven tracker of in-progress task ids. All access
/// flows through the request channel, so only the cache task itself ever
/// touches the map — no mutex required.
pub struct TaskStateCache {
    // In-progress task ids per pubkey. Entries with an empty set are removed
    // rather than kept around.
    cache: HashMap<Pubkey, HashSet<u16>>,
    // Incoming requests from `TaskStateSender` handles.
    receiver: TaskStateReceiver,
}
38+
39+
impl TaskStateCache {
40+
pub fn new(receiver: TaskStateReceiver) -> Self {
41+
Self {
42+
cache: HashMap::new(),
43+
receiver,
44+
}
45+
}
46+
47+
pub async fn run(mut self, handle: SubsystemHandle) -> anyhow::Result<()> {
48+
info!("starting task state cache");
49+
loop {
50+
tokio::select! {
51+
_ = handle.on_shutdown_requested() => {
52+
info!("shutting down task state cache");
53+
break;
54+
}
55+
Some(req) = self.receiver.recv() => {
56+
match req {
57+
TaskStateRequest::AddInProgressTasks { pubkey, task_ids } => {
58+
self.cache.entry(pubkey).or_default().extend(task_ids);
59+
}
60+
TaskStateRequest::RemoveInProgressTasks { pubkey, task_ids } => {
61+
if let Some(tasks) = self.cache.get_mut(&pubkey) {
62+
for task_id in task_ids {
63+
tasks.remove(&task_id);
64+
}
65+
if tasks.is_empty() {
66+
self.cache.remove(&pubkey);
67+
}
68+
}
69+
}
70+
TaskStateRequest::GetInProgressTasks { pubkey, resp } => {
71+
resp.send(self.cache.get(&pubkey).cloned().unwrap_or_default());
72+
}
73+
}
74+
}
75+
else => break,
76+
}
77+
}
78+
Ok(())
79+
}
80+
}

0 commit comments

Comments
 (0)