Skip to content

Commit 8a61d12

Browse files
author
Vasilyev Dmitriy Viktorovich
committed
cleaner
1 parent 748e6db commit 8a61d12

File tree

4 files changed

+30
-10
lines changed

4 files changed

+30
-10
lines changed

src/main.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ use pg_doorman::core_affinity;
5353
use pg_doorman::daemon;
5454
use pg_doorman::format_duration;
5555
use pg_doorman::messages::configure_tcp_socket;
56-
use pg_doorman::pool::{ClientServerMap, ConnectionPool};
56+
use pg_doorman::pool::{clean_connections, ClientServerMap, ConnectionPool};
5757
use pg_doorman::rate_limit::RateLimiter;
5858
use pg_doorman::stats::{Collector, Reporter, REPORTER, TOTAL_CONNECTION_COUNTER};
5959
use pg_doorman::tls::load_identity;
@@ -194,6 +194,10 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
194194
stats_collector.collect().await;
195195
});
196196

197+
tokio::task::spawn(async move {
198+
clean_connections().await;
199+
});
200+
197201
#[cfg(windows)]
198202
let mut term_signal = win_signal::ctrl_close().unwrap();
199203
#[cfg(windows)]

src/mobc/lib.rs

+14-8
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use futures_util::lock::{Mutex, MutexGuard};
1414
use futures_util::select;
1515
use futures_util::FutureExt;
1616
use futures_util::StreamExt;
17+
use log::info;
1718
use std::fmt;
1819
use std::future::Future;
1920
use std::ops::{Deref, DerefMut};
@@ -209,7 +210,19 @@ impl<M: Manager> Pool<M> {
209210
&& self.0.state.num_open.load(Ordering::Relaxed) > 0
210211
&& internals.cleaner_ch.is_none()
211212
{
212-
log::debug!("run connection cleaner");
213+
let shared1 = Arc::downgrade(&self.0);
214+
let clean_rate = self.0.config.clean_rate;
215+
let (cleaner_ch_sender, cleaner_ch) = mpsc::channel(1);
216+
internals.cleaner_ch = Some(cleaner_ch_sender);
217+
self.0.manager.spawn_task(async move {
218+
connection_cleaner(shared1, cleaner_ch, clean_rate).await;
219+
});
220+
}
221+
}
222+
223+
pub async fn clean_connections(&self) {
224+
let mut internals = self.0.internals.lock().await;
225+
if self.0.state.num_open.load(Ordering::Relaxed) > 0 && internals.cleaner_ch.is_none() {
213226
let shared1 = Arc::downgrade(&self.0);
214227
let clean_rate = self.0.config.clean_rate;
215228
let (cleaner_ch_sender, cleaner_ch) = mpsc::channel(1);
@@ -459,13 +472,10 @@ async fn clean_connection<M: Manager>(shared: &Weak<SharedPool<M>>) -> bool {
459472
let shared = match shared.upgrade() {
460473
Some(shared) => shared,
461474
None => {
462-
log::debug!("Failed to clean connections");
463475
return false;
464476
}
465477
};
466478

467-
log::debug!("Clean connections");
468-
469479
let mut internals = shared.internals.lock().await;
470480
if shared.state.num_open.load(Ordering::Relaxed) == 0 || internals.config.max_lifetime.is_none()
471481
{
@@ -477,10 +487,6 @@ async fn clean_connection<M: Manager>(shared: &Weak<SharedPool<M>>) -> bool {
477487
let mut closing = vec![];
478488

479489
let mut i = 0;
480-
log::debug!(
481-
"clean connections, idle conns {}",
482-
internals.free_conns.len()
483-
);
484490

485491
loop {
486492
if i >= internals.free_conns.len() {

src/pool.rs

+10
Original file line numberDiff line numberDiff line change
@@ -510,3 +510,13 @@ pub fn get_pool(db: &str, user: &str, virtual_pool_id: u16) -> Option<Connection
510510
pub fn get_all_pools() -> HashMap<PoolIdentifierVirtual, ConnectionPool> {
511511
(*(*POOLS.load())).clone()
512512
}
513+
514+
pub async fn clean_connections() {
515+
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(10));
516+
loop {
517+
interval.tick().await;
518+
for (_, pool) in get_all_pools() {
519+
pool.database.clean_connections().await;
520+
}
521+
}
522+
}

tests/tests.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ default_tcp_so_linger = 0
2525
# non-buffer streaming messages smaller than 1mb
2626
max_message_size = 1048576
2727

28-
server_lifetime = 500000
28+
server_lifetime = 5000
2929
idle_timeout = 3000
3030

3131
# admin user.

0 commit comments

Comments
 (0)