Skip to content

Commit a12f2af

Browse files
committed
Add PG reconnects
1 parent 715d66c commit a12f2af

9 files changed

Lines changed: 126 additions & 54 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "pony"
3-
version = "0.1.23"
3+
version = "0.1.30"
44
edition = "2021"
55
build = "build.rs"
66

src/bin/api/core/postgres/connection.rs

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,18 @@ use serde::Deserialize;
55
use serde::Serialize;
66
use std::sync::Arc;
77
use tokio::sync::Mutex;
8-
use tokio_postgres::Client as PgClient;
98

109
use pony::Connection as Conn;
11-
1210
use pony::ConnectionStat;
1311
use pony::ConnectionStatus;
1412
use pony::Proto;
1513
use pony::Tag;
1614
use pony::WgKeys;
1715
use pony::WgParam;
1816

19-
use crate::{PonyError, Result};
17+
use pony::{PonyError, Result};
18+
19+
use super::PgClientManager;
2020

2121
#[derive(Deserialize, Serialize, Debug, Clone)]
2222
pub struct ConnRow {
@@ -108,16 +108,17 @@ impl TryFrom<ConnRow> for Conn {
108108
}
109109

110110
pub struct PgConn {
111-
pub client: Arc<Mutex<PgClient>>,
111+
pub manager: Arc<Mutex<PgClientManager>>,
112112
}
113113

114114
impl PgConn {
115-
pub fn new(client: Arc<Mutex<PgClient>>) -> Self {
116-
Self { client }
115+
pub fn new(manager: Arc<Mutex<PgClientManager>>) -> Self {
116+
Self { manager }
117117
}
118118

119119
pub async fn all(&self) -> Result<Vec<ConnRow>> {
120-
let client = self.client.lock().await;
120+
let mut manager = self.manager.lock().await;
121+
let client = manager.get_client().await?;
121122

122123
let query = "
123124
SELECT
@@ -204,12 +205,14 @@ impl PgConn {
204205
}
205206

206207
pub async fn update_stat(&self, conn_id: &uuid::Uuid, stat: ConnectionStat) -> Result<()> {
208+
let mut manager = self.manager.lock().await;
209+
let client = manager.get_client().await?;
210+
207211
let query = "
208212
UPDATE connections
209213
SET downlink = $1, uplink = $2, online = $3
210214
WHERE id = $4";
211215

212-
let client = self.client.lock().await;
213216
client
214217
.execute(
215218
query,
@@ -225,25 +228,30 @@ impl PgConn {
225228
conn_id: &uuid::Uuid,
226229
status: ConnectionStatus,
227230
) -> Result<()> {
231+
let mut manager = self.manager.lock().await;
232+
let client = manager.get_client().await?;
233+
228234
let query = format!("UPDATE connections SET status = $1::conn_status WHERE id = $2");
229235

230-
let client = self.client.lock().await;
231236
client.execute(&query, &[&status, conn_id]).await?;
232237

233238
Ok(())
234239
}
235240

236241
pub async fn delete(&self, conn_id: &uuid::Uuid) -> Result<()> {
242+
let mut manager = self.manager.lock().await;
243+
let client = manager.get_client().await?;
244+
237245
let query = format!("UPDATE connections SET is_deleted = true WHERE id = $1");
238246

239-
let client = self.client.lock().await;
240247
client.execute(&query, &[conn_id]).await?;
241248

242249
Ok(())
243250
}
244251

245252
pub async fn insert(&self, conn: ConnRow) -> Result<()> {
246-
let client = self.client.lock().await;
253+
let mut manager = self.manager.lock().await;
254+
let client = manager.get_client().await?;
247255

248256
let query = "
249257
INSERT INTO connections (
@@ -315,7 +323,8 @@ impl PgConn {
315323
}
316324
}
317325
pub async fn update(&self, conn: ConnRow) -> Result<()> {
318-
let client = self.client.lock().await;
326+
let mut manager = self.manager.lock().await;
327+
let client = manager.get_client().await?;
319328

320329
let query = "
321330
UPDATE connections SET

src/bin/api/core/postgres/mod.rs

Lines changed: 50 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,41 +23,80 @@ pub mod connection;
2323
pub mod node;
2424
pub mod user;
2525

26-
#[derive(Clone)]
27-
pub struct PgContext {
28-
pub client: Arc<Mutex<PgClient>>,
26+
pub struct PgClientManager {
27+
config: PostgresConfig,
28+
client: Option<PgClient>,
2929
}
3030

31-
impl PgContext {
32-
pub async fn init(config: &PostgresConfig) -> Result<Self> {
31+
impl PgClientManager {
32+
pub async fn new(config: PostgresConfig) -> Result<Self> {
33+
Ok(Self {
34+
config,
35+
client: None,
36+
})
37+
}
38+
39+
async fn connect(&mut self) -> Result<()> {
3340
let connection_line = format!(
3441
"host={} user={} dbname={} password={} port={}",
35-
config.host, config.username, config.db, config.password, config.port
42+
self.config.host,
43+
self.config.username,
44+
self.config.db,
45+
self.config.password,
46+
self.config.port
3647
);
3748

3849
let (client, connection) = tokio_postgres::connect(&connection_line, NoTls).await?;
3950

4051
tokio::spawn(async move {
4152
if let Err(e) = connection.await {
42-
eprintln!("Postgres connection error: {}", e);
53+
log::error!("Postgres connection dropped: {}", e);
4354
}
4455
});
4556

57+
self.client = Some(client);
58+
Ok(())
59+
}
60+
61+
pub async fn get_client(&mut self) -> Result<&mut PgClient> {
62+
if self.client.is_none() {
63+
self.connect().await?;
64+
}
65+
66+
// ping with simple query
67+
let client = self.client.as_mut().unwrap();
68+
if let Err(e) = client.simple_query("SELECT 1").await {
69+
log::warn!("PG ping failed: {}. Reconnecting...", e);
70+
self.connect().await?;
71+
}
72+
73+
Ok(self.client.as_mut().unwrap())
74+
}
75+
}
76+
77+
#[derive(Clone)]
78+
pub struct PgContext {
79+
pub manager: Arc<Mutex<PgClientManager>>,
80+
}
81+
82+
impl PgContext {
83+
pub async fn init(config: &PostgresConfig) -> Result<Self> {
84+
let manager = PgClientManager::new(config.clone()).await?;
4685
Ok(Self {
47-
client: Arc::new(Mutex::new(client)),
86+
manager: Arc::new(Mutex::new(manager)),
4887
})
4988
}
5089

5190
pub fn node(&self) -> PgNode {
52-
PgNode::new(self.client.clone())
91+
PgNode::new(self.manager.clone())
5392
}
5493

5594
pub fn conn(&self) -> PgConn {
56-
PgConn::new(self.client.clone())
95+
PgConn::new(self.manager.clone())
5796
}
5897

5998
pub fn user(&self) -> PgUser {
60-
PgUser::new(self.client.clone())
99+
PgUser::new(self.manager.clone())
61100
}
62101
}
63102

src/bin/api/core/postgres/node.rs

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,28 +8,28 @@ use std::sync::Arc;
88
use chrono::DateTime;
99
use chrono::Utc;
1010
use tokio::sync::Mutex;
11-
use tokio_postgres::Client as PgClient;
1211

12+
use pony::config::wireguard::WireguardSettings;
13+
use pony::config::xray::Inbound;
1314
use pony::memory::node::Node;
1415
use pony::memory::node::Status as NodeStatus;
1516
use pony::utils::to_ipv4;
17+
use pony::Result;
1618

17-
use pony::config::wireguard::WireguardSettings;
18-
use pony::config::xray::Inbound;
19-
20-
use crate::Result;
19+
use super::PgClientManager;
2120

2221
pub struct PgNode {
23-
pub client: Arc<Mutex<PgClient>>,
22+
pub manager: Arc<Mutex<PgClientManager>>,
2423
}
2524

2625
impl PgNode {
27-
pub fn new(client: Arc<Mutex<PgClient>>) -> Self {
28-
Self { client }
26+
pub fn new(manager: Arc<Mutex<PgClientManager>>) -> Self {
27+
Self { manager }
2928
}
3029

3130
pub async fn upsert(&self, node_id: uuid::Uuid, node: Node) -> Result<()> {
32-
let mut client = self.client.lock().await;
31+
let mut manager = self.manager.lock().await;
32+
let client = manager.get_client().await?;
3333
let tx = client.transaction().await?;
3434

3535
let address: IpAddr = IpAddr::V4(node.address);
@@ -146,8 +146,8 @@ impl PgNode {
146146
}
147147

148148
pub async fn insert(&self, node_id: uuid::Uuid, node: Node) -> Result<()> {
149-
let mut client = self.client.lock().await;
150-
149+
let mut manager = self.manager.lock().await;
150+
let client = manager.get_client().await?;
151151
let tx = client.transaction().await?;
152152

153153
let node_query = "
@@ -239,7 +239,8 @@ impl PgNode {
239239
}
240240

241241
pub async fn all(&self) -> Result<Vec<Node>> {
242-
let client = self.client.lock().await;
242+
let mut manager = self.manager.lock().await;
243+
let client = manager.get_client().await?;
243244

244245
let rows = client
245246
.query(
@@ -356,7 +357,8 @@ impl PgNode {
356357
env: &str,
357358
new_status: NodeStatus,
358359
) -> Result<()> {
359-
let client = self.client.lock().await;
360+
let mut manager = self.manager.lock().await;
361+
let client = manager.get_client().await?;
360362

361363
let query = "
362364
UPDATE nodes

src/bin/api/core/postgres/user.rs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,14 @@ use chrono::Utc;
33
use std::sync::Arc;
44
use tokio::sync::Mutex;
55
use tokio_postgres::error::SqlState;
6-
use tokio_postgres::Client as PgClient;
76

87
use pony::memory::user::User;
98
use pony::utils::from_pg_bigint;
109
use pony::utils::to_pg_bigint;
1110
use pony::{PonyError, Result};
1211

12+
use super::PgClientManager;
13+
1314
#[derive(Clone)]
1415
pub struct UserRow {
1516
pub user_id: uuid::Uuid,
@@ -62,16 +63,17 @@ impl TryFrom<UserRow> for User {
6263
}
6364

6465
pub struct PgUser {
65-
pub client: Arc<Mutex<PgClient>>,
66+
pub manager: Arc<Mutex<PgClientManager>>,
6667
}
6768

6869
impl PgUser {
69-
pub fn new(client: Arc<Mutex<PgClient>>) -> Self {
70-
Self { client }
70+
pub fn new(manager: Arc<Mutex<PgClientManager>>) -> Self {
71+
Self { manager }
7172
}
7273

7374
pub async fn insert(&self, user_row: UserRow) -> Result<()> {
74-
let client = self.client.lock().await;
75+
let mut manager = self.manager.lock().await;
76+
let client = manager.get_client().await?;
7577

7678
let query = "
7779
INSERT INTO users (id, username, telegram_id, env, daily_limit_mb, password, created_at, modified_at, is_deleted)
@@ -103,7 +105,8 @@ impl PgUser {
103105
}
104106

105107
pub async fn delete(&self, user_id: &uuid::Uuid) -> Result<()> {
106-
let client = self.client.lock().await;
108+
let mut manager = self.manager.lock().await;
109+
let client = manager.get_client().await?;
107110

108111
let _ = client
109112
.execute(
@@ -115,7 +118,8 @@ impl PgUser {
115118
Ok(())
116119
}
117120
pub async fn update(&self, user_id: &uuid::Uuid, user: User) -> Result<()> {
118-
let client = self.client.lock().await;
121+
let mut manager = self.manager.lock().await;
122+
let client = manager.get_client().await?;
119123

120124
let query = "
121125
UPDATE users SET env = $2, daily_limit_mb = $3, password = $4, modified_at = $5, is_deleted = $6
@@ -144,7 +148,8 @@ impl PgUser {
144148
}
145149

146150
pub async fn all(&self) -> Result<Vec<UserRow>> {
147-
let client = self.client.lock().await;
151+
let mut manager = self.manager.lock().await;
152+
let client = manager.get_client().await?;
148153

149154
let rows = client
150155
.query(

src/bin/api/core/tasks.rs

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
use async_trait::async_trait;
2-
use std::time::Duration;
3-
42
use chrono::NaiveTime;
53
use chrono::TimeZone;
64
use chrono::Utc;
75
use futures::future::join_all;
6+
use rand::Rng;
87
use std::collections::HashMap;
9-
use tokio::time::interval;
8+
use std::time::Duration;
109

1110
use pony::memory::node::Node;
1211
use pony::memory::node::Status as NodeStatus;
12+
use pony::utils::measure_time;
1313
use pony::Action;
1414
use pony::Connection;
1515
use pony::ConnectionApiOp;
@@ -42,16 +42,26 @@ pub trait Tasks {
4242
#[async_trait]
4343
impl Tasks for Api<HashMap<String, Vec<Node>>, Connection> {
4444
async fn periodic_db_sync(&self, interval_sec: u64) {
45-
let mut ticker = interval(Duration::from_secs(interval_sec));
45+
let base = Duration::from_secs(interval_sec);
46+
4647
loop {
47-
ticker.tick().await;
48-
if let Err(e) = self.init_state_from_db().await {
48+
let jitter = rand::thread_rng().gen_range(0..=30);
49+
let interval_sec_with_jitter = base + Duration::from_secs(jitter);
50+
tokio::time::sleep(interval_sec_with_jitter).await;
51+
52+
if let Err(e) = measure_time(
53+
self.init_state_from_db(),
54+
format!("Periodic DB Sync: interval + jitter {interval_sec}, {jitter}"),
55+
)
56+
.await
57+
{
4958
log::error!("Periodic DB sync failed: {:?}", e);
5059
} else {
5160
log::info!("Periodic DB sync completed successfully");
5261
}
5362
}
5463
}
64+
5565
async fn init_state_from_db(&self) -> Result<()> {
5666
let db = self.sync.db.clone();
5767
let mut mem = self.sync.memory.write().await;

0 commit comments

Comments
 (0)