Skip to content

Commit 715d66c

Browse files
committed
Replace Mem first with DB first approac
1 parent 71a5333 commit 715d66c

20 files changed

Lines changed: 918 additions & 547 deletions

File tree

src/bin/agent/core/http.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ where
3434
node_type: NodeType,
3535
) -> Result<()> {
3636
let node = {
37-
let mem = self.memory.lock().await;
37+
let mem = self.memory.read().await;
3838
mem.nodes
3939
.get_self()
4040
.expect("No node available to register")

src/bin/agent/core/metrics.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ where
2727
T: NodeStorageOp + Sync + Send + Clone + 'static,
2828
{
2929
let mut metrics: Vec<MetricType> = Vec::new();
30-
let mem = self.memory.lock().await;
30+
let mem = self.memory.read().await;
3131
let connections = mem.connections.clone();
3232

3333
let node = mem.nodes.get_self();
@@ -57,7 +57,7 @@ where
5757
}
5858

5959
async fn collect_hb_metrics<M>(&self) -> MetricType {
60-
let mem = self.memory.lock().await;
60+
let mem = self.memory.read().await;
6161
let node = mem.nodes.get_self();
6262
if let Some(node) = node {
6363
heartbeat_metrics(&node.env, &node.uuid, &node.hostname)

src/bin/agent/core/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::sync::Arc;
22
use tokio::sync::Mutex;
3+
use tokio::sync::RwLock;
34

45
use pony::memory::node::Node;
56
use pony::wireguard_op::WgApi;
@@ -24,7 +25,7 @@ where
2425
N: NodeStorageOp + Send + Sync + Clone + 'static,
2526
C: ConnectionBaseOp + Send + Sync + Clone + 'static,
2627
{
27-
pub memory: Arc<Mutex<MemoryCache<N, C>>>,
28+
pub memory: Arc<RwLock<MemoryCache<N, C>>>,
2829
pub subscriber: ZmqSubscriber,
2930
pub xray_stats_client: Option<Arc<Mutex<StatsClient>>>,
3031
pub xray_handler_client: Option<Arc<Mutex<HandlerClient>>>,
@@ -37,7 +38,7 @@ where
3738
C: ConnectionBaseOp + Send + Sync + Clone + 'static,
3839
{
3940
pub fn new(
40-
memory: Arc<Mutex<MemoryCache<N, C>>>,
41+
memory: Arc<RwLock<MemoryCache<N, C>>>,
4142
subscriber: ZmqSubscriber,
4243
xray_stats_client: Option<Arc<Mutex<StatsClient>>>,
4344
xray_handler_client: Option<Arc<Mutex<HandlerClient>>>,

src/bin/agent/core/service.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use std::sync::Arc;
66
use tokio::signal;
77
use tokio::sync::broadcast;
88
use tokio::sync::Mutex;
9+
use tokio::sync::RwLock;
910
use tokio::task::JoinHandle;
1011
use tokio::time::sleep;
1112
use tokio::time::Duration;
@@ -97,7 +98,8 @@ pub async fn run(settings: AgentSettings) -> Result<()> {
9798
let node_config = NodeConfig::from_raw(settings.node.clone());
9899
let node = Node::new(node_config?, xray_config, wg_config.clone());
99100

100-
let memory: Arc<Mutex<AgentState>> = Arc::new(Mutex::new(MemoryCache::with_node(node.clone())));
101+
let memory: Arc<RwLock<AgentState>> =
102+
Arc::new(RwLock::new(MemoryCache::with_node(node.clone())));
101103

102104
let agent = Arc::new(Agent::new(
103105
memory.clone(),
@@ -118,16 +120,16 @@ pub async fn run(settings: AgentSettings) -> Result<()> {
118120
tokio::select! {
119121
_ = async {
120122
let _ = {
121-
let mut memory = agent.memory.lock().await;
123+
let mut mem = agent.memory.write().await;
122124
for (tag, _) in node.inbounds {
123125
let proto = Proto::new_xray(&tag);
124126
let conn = Connection::new(proto);
125-
let _ = memory.connections.insert(conn_id, conn.into());
127+
let _ = mem.connections.insert(conn_id, conn.into());
126128
let _ = xray_handler_client.create(&conn_id, tag, None).await;
127129
}
128130
};
129131

130-
let mem = agent.memory.lock().await;
132+
let mem = agent.memory.read().await;
131133
if let Some(node) = mem.nodes.get_self() {
132134
println!(
133135
r#"
@@ -178,7 +180,7 @@ pub async fn run(settings: AgentSettings) -> Result<()> {
178180
async move {
179181
tokio::select! {
180182
_ = async {
181-
let mut mem = agent.memory.lock().await;
183+
let mut mem = agent.memory.write().await;
182184
for (tag, _inbound) in &node.inbounds {
183185
if tag.is_wireguard() {
184186
let conn_id = uuid::Uuid::new_v4();

src/bin/agent/core/stats.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ where
262262
{
263263
async fn collect_conn_stats(self: Arc<Self>, conn_id: uuid::Uuid) -> PonyResult<()> {
264264
let conn_stat = self.conn_stats(Prefix::ConnPrefix(conn_id)).await?;
265-
let mut mem = self.memory.lock().await;
265+
let mut mem = self.memory.write().await;
266266
let _ = mem
267267
.connections
268268
.update_downlink(&conn_id, conn_stat.downlink);
@@ -280,7 +280,7 @@ where
280280
let inbound_stat = self
281281
.inbound_stats(Prefix::InboundPrefix(tag.clone()))
282282
.await?;
283-
let mut mem = self.memory.lock().await;
283+
let mut mem = self.memory.write().await;
284284
let _ = mem
285285
.nodes
286286
.update_node_downlink(&tag, inbound_stat.downlink, &env, &node_uuid);
@@ -298,7 +298,7 @@ where
298298
let mut tasks: Vec<JoinHandle<()>> = Vec::new();
299299

300300
let conn_ids = {
301-
let mem = self.memory.lock().await;
301+
let mem = self.memory.write().await;
302302
mem.connections.keys().cloned().collect::<Vec<_>>()
303303
};
304304

@@ -312,7 +312,7 @@ where
312312
}
313313

314314
if let Some(node) = {
315-
let mem = self.memory.lock().await;
315+
let mem = self.memory.read().await;
316316
mem.nodes.get_self()
317317
} {
318318
let node_tags = node.inbounds.keys().cloned().collect::<Vec<_>>();
@@ -353,7 +353,7 @@ where
353353
};
354354

355355
let conns = {
356-
let mem = self.memory.lock().await;
356+
let mem = self.memory.read().await;
357357
mem.connections
358358
.iter()
359359
.filter_map(|(id, conn)| {
@@ -373,7 +373,7 @@ where
373373
if let Some(wg) = conn.get_wireguard() {
374374
match wg_client.peer_stats(&wg.keys.pubkey.clone()) {
375375
Ok((uplink, downlink)) => {
376-
let mut mem = agent.memory.lock().await;
376+
let mut mem = agent.memory.write().await;
377377
if let Some(existing) = mem.connections.get_mut(&conn_id) {
378378
existing.set_uplink(uplink);
379379
existing.set_downlink(downlink);

src/bin/agent/core/tasks.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ where
9494
.ok_or_else(|| PonyError::Custom("Missing WireGuard keys".into()))?;
9595

9696
let node_id = {
97-
let mem = self.memory.lock().await;
97+
let mem = self.memory.read().await;
9898
let node = mem.nodes.get_self();
9999
node.map(|n| n.uuid).ok_or_else(|| {
100100
PonyError::Custom("Current node UUID not found".to_string())
@@ -105,7 +105,7 @@ where
105105
let conn = Connection::new(proto);
106106

107107
{
108-
let mut mem = self.memory.lock().await;
108+
let mut mem = self.memory.write().await;
109109
mem.connections
110110
.add(&conn_id, conn.clone().into())
111111
.map_err(|err| {
@@ -153,7 +153,7 @@ where
153153
))
154154
})?;
155155

156-
let mut mem = self.memory.lock().await;
156+
let mut mem = self.memory.write().await;
157157
mem.connections.add(&conn_id, conn.into()).map_err(|err| {
158158
PonyError::Custom(format!("Failed to add conn {}: {}", conn_id, err))
159159
})?;
@@ -179,7 +179,7 @@ where
179179
))
180180
})?;
181181

182-
let mut mem = self.memory.lock().await;
182+
let mut mem = self.memory.write().await;
183183
mem.connections.add(&conn_id, conn.into()).map_err(|err| {
184184
PonyError::Custom(format!(
185185
"Failed to add conn {}: {}",
@@ -212,7 +212,7 @@ where
212212
PonyError::Custom(format!("Failed to delete WireGuard peer: {}", e))
213213
})?;
214214

215-
let mut mem = self.memory.lock().await;
215+
let mut mem = self.memory.write().await;
216216
let _ = mem.connections.remove(&msg.conn_id);
217217

218218
Ok(())
@@ -232,7 +232,7 @@ where
232232
))
233233
})?;
234234

235-
let mut mem = self.memory.lock().await;
235+
let mut mem = self.memory.write().await;
236236
let _ = mem.connections.remove(&msg.conn_id);
237237

238238
Ok(())

src/bin/api/core/postgres/mod.rs

Lines changed: 53 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,23 @@
1+
use pony::ConnectionStorageApiOp;
2+
use std::collections::HashMap;
13
use std::sync::Arc;
2-
use tokio::sync::mpsc;
34
use tokio::sync::Mutex;
45
use tokio_postgres::Client as PgClient;
56
use tokio_postgres::NoTls;
67

8+
use pony::config::settings::PostgresConfig;
9+
use pony::memory::node::Node;
10+
use pony::Connection;
11+
use pony::MemoryCache;
12+
use pony::NodeStorageOp;
13+
use pony::OperationStatus;
14+
use pony::Result;
15+
716
use crate::core::postgres::connection::ConnRow;
817
use crate::core::postgres::connection::PgConn;
918
use crate::core::postgres::node::PgNode;
1019
use crate::core::postgres::user::PgUser;
1120
use crate::core::postgres::user::UserRow;
12-
use crate::core::sync::SyncTask;
13-
use crate::Result;
14-
use pony::config::settings::PostgresConfig;
1521

1622
pub mod connection;
1723
pub mod node;
@@ -55,66 +61,50 @@ impl PgContext {
5561
}
5662
}
5763

58-
pub async fn run_shadow_sync(mut rx: mpsc::Receiver<SyncTask>, db: PgContext) {
59-
while let Some(task) = rx.recv().await {
60-
match task {
61-
SyncTask::InsertUser { user_id, user } => {
62-
let user_row: UserRow = (user_id, user).into();
63-
if let Err(err) = db.user().insert(user_row).await {
64-
log::error!("Failed to sync InsertUser: {err}");
65-
}
66-
}
67-
SyncTask::UpdateUser { user_id, user } => {
68-
if let Err(err) = db.user().update(&user_id, user).await {
69-
log::error!("Failed to sync UpdateUser: {err}");
70-
}
71-
}
72-
SyncTask::DeleteUser { user_id } => {
73-
if let Err(err) = db.user().delete(&user_id).await {
74-
log::error!("Failed to sync DeleteUser: {err}");
75-
}
76-
}
77-
SyncTask::InsertNode { node_id, node } => {
78-
if let Err(err) = db.node().insert(node_id, node).await {
79-
log::error!("Failed to sync InsertNode: {err}");
80-
}
81-
}
82-
SyncTask::UpdateNodeStatus {
83-
node_id,
84-
env,
85-
status,
86-
} => {
87-
if let Err(err) = db.node().update_status(&node_id, &env, status).await {
88-
log::error!("Failed to sync UpdateNodeStatus: {err}");
89-
}
90-
}
91-
SyncTask::InsertConn { conn_id, conn } => {
92-
let conn_row: ConnRow = (conn_id, conn).into();
93-
if let Err(err) = db.conn().insert(conn_row).await {
94-
log::error!("Failed to sync InsertConn: {err}");
95-
}
96-
}
97-
SyncTask::UpdateConn { conn_id, conn } => {
98-
let conn_row: ConnRow = (conn_id, conn).into();
99-
if let Err(err) = db.conn().update(conn_row).await {
100-
log::error!("Failed to sync UpdateConn: {err}");
101-
}
102-
}
103-
SyncTask::DeleteConn { conn_id } => {
104-
if let Err(err) = db.conn().delete(&conn_id).await {
105-
log::error!("Failed to sync DeleteConn: {err}");
106-
}
107-
}
108-
SyncTask::UpdateConnStat { conn_id, stat } => {
109-
if let Err(err) = db.conn().update_stat(&conn_id, stat).await {
110-
log::error!("Failed to sync UpdateConnStat: {err}");
111-
}
112-
}
113-
SyncTask::UpdateConnStatus { conn_id, status } => {
114-
if let Err(err) = db.conn().update_status(&conn_id, status).await {
115-
log::error!("Failed to sync UpdateConnStatus: {err}");
116-
}
64+
#[async_trait::async_trait]
65+
pub trait Tasks {
66+
async fn add_node(&mut self, db_node: Node) -> Result<()>;
67+
async fn add_conn(&mut self, db_conn: ConnRow) -> Result<OperationStatus>;
68+
async fn add_user(&mut self, db_user: UserRow) -> Result<OperationStatus>;
69+
}
70+
71+
#[async_trait::async_trait]
72+
impl Tasks for MemoryCache<HashMap<String, Vec<Node>>, Connection> {
73+
async fn add_user(&mut self, db_user: UserRow) -> Result<OperationStatus> {
74+
let user_id = db_user.user_id;
75+
let user = db_user.try_into()?;
76+
77+
if self.users.contains_key(&user_id) {
78+
return Ok(OperationStatus::AlreadyExist(user_id));
79+
}
80+
81+
self.users.insert(user_id, user);
82+
Ok(OperationStatus::Ok(user_id))
83+
}
84+
85+
async fn add_conn(&mut self, db_conn: ConnRow) -> Result<OperationStatus> {
86+
let conn_id = db_conn.conn_id;
87+
let conn: Connection = db_conn.try_into()?;
88+
89+
self.connections.add(&conn_id, conn.into()).map_err(|e| {
90+
format!(
91+
"Create: Failed to add connection {} to state: {}",
92+
conn_id, e
93+
)
94+
.into()
95+
})
96+
}
97+
async fn add_node(&mut self, db_node: Node) -> Result<()> {
98+
match self.nodes.add(db_node.clone()) {
99+
Ok(_) => {
100+
log::debug!("Node added to State: {}", db_node.uuid);
101+
Ok(())
117102
}
103+
Err(e) => Err(format!(
104+
"Create: Failed to add node {} to state: {}",
105+
db_node.uuid, e
106+
)
107+
.into()),
118108
}
119109
}
120110
}

0 commit comments

Comments
 (0)