Skip to content

Commit 69d6af4

Browse files
committed
Fix snapshots ._.
1 parent f5044bf commit 69d6af4

5 files changed

Lines changed: 141 additions & 12 deletions

File tree

src/bin/agent/core/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use pony::SubscriptionOp;
1717
mod http;
1818
pub(crate) mod metrics;
1919
pub mod service;
20+
mod snapshot;
2021
mod stats;
2122
mod tasks;
2223

src/bin/agent/core/service.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ use pony::Subscriber as ZmqSubscriber;
4040
use pony::Subscription;
4141
use pony::Tag;
4242

43+
use super::snapshot::SnapshotRestore;
4344
use super::tasks::Tasks;
4445
use super::Agent;
4546
use crate::core::http::ApiRequests;
@@ -160,6 +161,13 @@ pub async fn run(settings: AgentSettings) -> Result<()> {
160161
match snapshot_manager.load_snapshot().await {
161162
Ok(Some(timestamp)) => {
162163
let count = snapshot_manager.count().await;
164+
if let Err(e) = snapshot_manager
165+
.restore_connections(xray_handler_client.clone(), wg_client)
166+
.await
167+
{
168+
log::error!("Couldn't restore connections from memory, {}", e);
169+
panic!("Couldn't restore connections from memory, {}", e);
170+
}
163171
log::info!("Loaded connections snapshot from {} {}", timestamp, count);
164172
Some(timestamp)
165173
}

src/bin/agent/core/snapshot.rs

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
use rkyv::Archive;
2+
use std::sync::Arc;
3+
use tokio::sync::Mutex;
4+
5+
use pony::memory::connection::wireguard::IpAddrMaskSerializable;
6+
use pony::memory::snapshot::SnapshotManager;
7+
use pony::wireguard_op::WgApi;
8+
use pony::xray_op::client::HandlerActions;
9+
use pony::xray_op::client::HandlerClient;
10+
use pony::Connection;
11+
use pony::ConnectionBaseOp as ConnOp;
12+
use pony::NodeStorageOp as NodeOp;
13+
use pony::Result;
14+
use pony::Tag;
15+
16+
#[async_trait::async_trait]
17+
pub trait SnapshotRestore {
18+
async fn restore_connections(
19+
&self,
20+
xray_client: Option<Arc<Mutex<HandlerClient>>>,
21+
wg_client: Option<WgApi>,
22+
) -> Result<()>;
23+
}
24+
25+
#[async_trait::async_trait]
26+
impl<N, C, S> SnapshotRestore for SnapshotManager<N, C, S>
27+
where
28+
C: Archive + Send + Sync + Clone + 'static + ConnOp + From<Connection>,
29+
N: Send + Sync + Clone + 'static + NodeOp,
30+
S: Send + Sync + Clone + 'static,
31+
{
32+
async fn restore_connections(
33+
&self,
34+
xray_client: Option<Arc<Mutex<HandlerClient>>>,
35+
wg_client: Option<WgApi>,
36+
) -> Result<()> {
37+
let mem = self.memory.read().await;
38+
39+
for (conn_id, conn) in mem.connections.iter() {
40+
let conn_id = *conn_id;
41+
let conn = conn.clone();
42+
let memory = self.memory.clone();
43+
let wg_client = wg_client.clone();
44+
let xray_client = xray_client.clone();
45+
46+
tokio::spawn(async move {
47+
match conn.get_proto().proto() {
48+
Tag::Wireguard => {
49+
if let Some(wg) = conn.get_wireguard() {
50+
let node_id = {
51+
let mem = memory.read().await;
52+
mem.nodes.get_self().map(|n| n.uuid)
53+
};
54+
55+
if let Some(_node_id) = node_id {
56+
if let Some(api) = wg_client.as_ref() {
57+
if let Err(e) = api.create(
58+
&wg.keys.pubkey,
59+
<IpAddrMaskSerializable as Clone>::clone(&wg.address)
60+
.into(),
61+
) {
62+
log::error!(
63+
"Failed to restore WireGuard connection {}: {}",
64+
conn_id,
65+
e
66+
);
67+
}
68+
}
69+
}
70+
}
71+
}
72+
Tag::VlessTcpReality
73+
| Tag::VlessGrpcReality
74+
| Tag::VlessXhttpReality
75+
| Tag::Vmess => {
76+
if let Some(client) = xray_client.as_ref() {
77+
if let Err(e) = client
78+
.create(&conn_id, conn.get_proto().proto(), None)
79+
.await
80+
{
81+
log::error!("Failed to restore Xray connection {}: {}", conn_id, e);
82+
} else {
83+
log::debug!("Restored Xray connection {}", conn_id);
84+
}
85+
}
86+
}
87+
Tag::Shadowsocks => {
88+
if let Some(password) = conn.get_password() {
89+
if let Some(client) = xray_client.as_ref() {
90+
if let Err(e) = client
91+
.create(&conn_id, conn.get_proto().proto(), Some(password))
92+
.await
93+
{
94+
log::error!(
95+
"Failed to restore Shadowsocks connection {}: {}",
96+
conn_id,
97+
e
98+
);
99+
} else {
100+
log::debug!("Restored Shadowsocks connection {}", conn_id);
101+
}
102+
}
103+
}
104+
}
105+
Tag::Hysteria2 | Tag::Mtproto => {
106+
log::warn!(
107+
"Skipping unsupported connection {} with tag {:?}",
108+
conn_id,
109+
conn.get_proto().proto()
110+
);
111+
}
112+
}
113+
});
114+
}
115+
116+
Ok(())
117+
}
118+
}

src/memory/snapshot.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ use tokio::fs as async_fs;
1010
use tokio::sync::RwLock;
1111

1212
use crate::error::Result;
13-
use crate::MemoryCache;
1413

14+
use super::cache::Cache;
1515
use super::cache::Connections;
1616
use super::connection::conn::Conn;
1717

@@ -33,7 +33,7 @@ where
3333
S: Send + Sync + Clone + 'static,
3434
{
3535
pub snapshot_path: String,
36-
pub memory: Arc<RwLock<MemoryCache<N, C, S>>>,
36+
pub memory: Arc<RwLock<Cache<N, C, S>>>,
3737
}
3838

3939
impl<N, C, S> Clone for SnapshotManager<N, C, S>
@@ -86,7 +86,7 @@ where
8686
N: Send + Sync + Clone + 'static,
8787
S: Send + Sync + Clone + 'static,
8888
{
89-
pub fn new(snapshot_path: String, memory: Arc<RwLock<MemoryCache<N, C, S>>>) -> Self {
89+
pub fn new(snapshot_path: String, memory: Arc<RwLock<Cache<N, C, S>>>) -> Self {
9090
Self {
9191
snapshot_path,
9292
memory,

src/xray_op/client.rs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -77,37 +77,39 @@ pub trait HandlerActions {
7777
#[async_trait::async_trait]
7878
impl HandlerActions for Arc<Mutex<HandlerClient>> {
7979
async fn create(&self, conn_id: &uuid::Uuid, tag: Tag, password: Option<String>) -> Result<()> {
80+
log::debug!("Creating user {} for xray proto: {}", conn_id, tag);
8081
match tag {
8182
Tag::VlessTcpReality => {
8283
let user_info = VlessConnInfo::new(conn_id, ConnFlow::Vision, Tag::VlessTcpReality);
83-
let _ = user_info.create(self.clone()).await;
84+
user_info.create(self.clone()).await?;
8485
Ok(())
8586
}
8687
Tag::VlessGrpcReality => {
8788
let user_info =
8889
VlessConnInfo::new(conn_id, ConnFlow::Direct, Tag::VlessGrpcReality);
89-
let _ = user_info.create(self.clone()).await;
90+
user_info.create(self.clone()).await?;
9091
Ok(())
9192
}
9293
Tag::VlessXhttpReality => {
9394
let user_info = VlessConnInfo::new(conn_id, ConnFlow::None, Tag::VlessXhttpReality);
94-
let _ = user_info.create(self.clone()).await;
95+
user_info.create(self.clone()).await?;
9596
Ok(())
9697
}
9798
Tag::Vmess => {
9899
let user_info = VmessConnInfo::new(conn_id);
99-
let _ = user_info.create(self.clone()).await;
100+
user_info.create(self.clone()).await?;
100101
Ok(())
101102
}
102103
Tag::Shadowsocks => {
103104
if let Some(pass) = password.clone() {
104105
let user_info = SsConnInfo::new(conn_id, Some(pass));
105-
let _ = user_info.create(self.clone()).await;
106-
return Ok(());
106+
user_info.create(self.clone()).await?;
107+
Ok(())
108+
} else {
109+
Err(crate::PonyError::Custom(
110+
"Create SS user error, password not provided".to_string(),
111+
))
107112
}
108-
Err(crate::PonyError::Custom(
109-
"Create SS user error, password not provided".to_string(),
110-
))
111113
}
112114
_ => Err(PonyError::Custom("Not supported Proto".into())),
113115
}

0 commit comments

Comments
 (0)