Skip to content

Commit e37076b

Browse files
author
Vasilyev Dmitriy Viktorovich
committed
cleaner
1 parent 748e6db commit e37076b

File tree

4 files changed

+29
-10
lines changed

4 files changed

+29
-10
lines changed

src/main.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ use pg_doorman::core_affinity;
5353
use pg_doorman::daemon;
5454
use pg_doorman::format_duration;
5555
use pg_doorman::messages::configure_tcp_socket;
56-
use pg_doorman::pool::{ClientServerMap, ConnectionPool};
56+
use pg_doorman::pool::{clean_connections, ClientServerMap, ConnectionPool};
5757
use pg_doorman::rate_limit::RateLimiter;
5858
use pg_doorman::stats::{Collector, Reporter, REPORTER, TOTAL_CONNECTION_COUNTER};
5959
use pg_doorman::tls::load_identity;
@@ -194,6 +194,10 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
194194
stats_collector.collect().await;
195195
});
196196

197+
tokio::task::spawn(async move {
198+
clean_connections().await;
199+
});
200+
197201
#[cfg(windows)]
198202
let mut term_signal = win_signal::ctrl_close().unwrap();
199203
#[cfg(windows)]

src/mobc/lib.rs

+13-8
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,19 @@ impl<M: Manager> Pool<M> {
209209
&& self.0.state.num_open.load(Ordering::Relaxed) > 0
210210
&& internals.cleaner_ch.is_none()
211211
{
212-
log::debug!("run connection cleaner");
212+
let shared1 = Arc::downgrade(&self.0);
213+
let clean_rate = self.0.config.clean_rate;
214+
let (cleaner_ch_sender, cleaner_ch) = mpsc::channel(1);
215+
internals.cleaner_ch = Some(cleaner_ch_sender);
216+
self.0.manager.spawn_task(async move {
217+
connection_cleaner(shared1, cleaner_ch, clean_rate).await;
218+
});
219+
}
220+
}
221+
222+
pub async fn clean_connections(&self) {
223+
let mut internals = self.0.internals.lock().await;
224+
if self.0.state.num_open.load(Ordering::Relaxed) > 0 && internals.cleaner_ch.is_none() {
213225
let shared1 = Arc::downgrade(&self.0);
214226
let clean_rate = self.0.config.clean_rate;
215227
let (cleaner_ch_sender, cleaner_ch) = mpsc::channel(1);
@@ -459,13 +471,10 @@ async fn clean_connection<M: Manager>(shared: &Weak<SharedPool<M>>) -> bool {
459471
let shared = match shared.upgrade() {
460472
Some(shared) => shared,
461473
None => {
462-
log::debug!("Failed to clean connections");
463474
return false;
464475
}
465476
};
466477

467-
log::debug!("Clean connections");
468-
469478
let mut internals = shared.internals.lock().await;
470479
if shared.state.num_open.load(Ordering::Relaxed) == 0 || internals.config.max_lifetime.is_none()
471480
{
@@ -477,10 +486,6 @@ async fn clean_connection<M: Manager>(shared: &Weak<SharedPool<M>>) -> bool {
477486
let mut closing = vec![];
478487

479488
let mut i = 0;
480-
log::debug!(
481-
"clean connections, idle conns {}",
482-
internals.free_conns.len()
483-
);
484489

485490
loop {
486491
if i >= internals.free_conns.len() {

src/pool.rs

+10
Original file line numberDiff line numberDiff line change
@@ -510,3 +510,13 @@ pub fn get_pool(db: &str, user: &str, virtual_pool_id: u16) -> Option<Connection
510510
pub fn get_all_pools() -> HashMap<PoolIdentifierVirtual, ConnectionPool> {
511511
(*(*POOLS.load())).clone()
512512
}
513+
514+
pub async fn clean_connections() {
515+
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(10));
516+
loop {
517+
interval.tick().await;
518+
for (_, pool) in get_all_pools() {
519+
pool.database.clean_connections().await;
520+
}
521+
}
522+
}

tests/tests.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ default_tcp_so_linger = 0
2525
# non-buffer streaming messages smaller than 1mb
2626
max_message_size = 1048576
2727

28-
server_lifetime = 500000
28+
server_lifetime = 5000
2929
idle_timeout = 3000
3030

3131
# admin user.

0 commit comments

Comments
 (0)