From 1617ae62c613db5f61b948471ecf2fb3558b537d Mon Sep 17 00:00:00 2001 From: EmelSimsek Date: Wed, 31 Jan 2024 15:21:48 +0300 Subject: [PATCH] Add reconfigure udf --- src/background_worker.rs | 33 +++++++++++++++++++++++++- src/manager.rs | 20 ++++++++++++++++ src/udfs.rs | 51 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 103 insertions(+), 1 deletion(-) diff --git a/src/background_worker.rs b/src/background_worker.rs index d983d60..5ab2e88 100644 --- a/src/background_worker.rs +++ b/src/background_worker.rs @@ -1,8 +1,29 @@ use crate::gucs::PG_PGBOUNCER_DATABASE; use crate::*; +use pgrx::pg_sys::{ConditionVariable, ConditionVariableInit}; +use pgrx::shmem::*; +use pgrx::PgLwLock; +use pgrx::{pg_shmem_init, warning}; +use std::borrow::BorrowMut; + +// types behind a `LwLock` must derive/implement `Copy` and `Clone` +#[derive(Copy, Clone, Default)] + +pub struct Reconfiguremsg { + pub count: u64, + pub pid: pg_sys::pid_t, + pub cv: ConditionVariable, +} + +unsafe impl PGRXSharedMemory for Reconfiguremsg {} +unsafe impl Sync for Reconfiguremsg {} +unsafe impl Send for Reconfiguremsg {} + +pub static RECONFIGURE: PgLwLock = PgLwLock::new(); #[pg_guard] pub extern "C" fn _PG_init() { + pg_shmem_init!(RECONFIGURE); gucs::init(); BackgroundWorkerBuilder::new("PgBouncer Manager") .set_function("pgbouncer_manager_main") @@ -38,6 +59,10 @@ pub extern "C" fn pgbouncer_manager_main() { | SignalWakeFlags::SIGCHLD, ); + unsafe { + ConditionVariableInit(RECONFIGURE.exclusive().cv.borrow_mut()); + } + // we want to be able to use SPI against the specified database (pg_pgbouncer), as the superuser which // did the initdb. You can specify a specific user with Some("my_user") BackgroundWorker::connect_worker_to_spi(Some(database_name), None); @@ -47,11 +72,17 @@ pub extern "C" fn pgbouncer_manager_main() { BackgroundWorker::get_name(), ); + // put the process id in shared memory so that reconfigure callers + // can signal the process. + let mut msg = RECONFIGURE.exclusive(); + msg.pid = unsafe { pg_sys::MyProcPid }; + drop(msg); + create_dir_all(TEMP_DIR).unwrap(); let mut state = ManagerState::default(); // wake up every 10s or if we received a signal - while BackgroundWorker::wait_latch(Some(Duration::from_secs(10))) { + while BackgroundWorker::wait_latch(None) { if let Err(err) = state.do_main_loop() { warning!("error in main pg_pgbouncer loop: {err}"); // We're in an unknown state because of the error. Clear out previous_groups to diff --git a/src/manager.rs b/src/manager.rs index 9ca7bec..5159473 100644 --- a/src/manager.rs +++ b/src/manager.rs @@ -1,4 +1,7 @@ +use crate::background_worker::RECONFIGURE; use crate::*; +use pgrx::pg_sys::ConditionVariableBroadcast; +use std::borrow::BorrowMut; #[derive(Default)] pub struct ManagerState { @@ -11,6 +14,7 @@ impl ManagerState { pub fn do_main_loop(&mut self) -> Result<()> { if BackgroundWorker::sighup_received() { // on SIGHUP, you might want to reload some external configuration or something + log!("SIGHUP received. Reconfiguring"); } if BackgroundWorker::sigint_received() { panic!("ASKED TO QUIT") @@ -18,6 +22,22 @@ impl ManagerState { self.groups = BackgroundWorker::transaction(Group::all)?; self.create_all()?; self.cleanup_old_files()?; + + let mut msg = RECONFIGURE.exclusive(); + log!( + "Increase reconfiguration count from { } to { }", + msg.count, + msg.count + 1 + ); + + msg.count += 1; + + unsafe { + ConditionVariableBroadcast(msg.cv.borrow_mut()); + } + + drop(msg); + Ok(()) } diff --git a/src/udfs.rs b/src/udfs.rs index 5655d94..80e70ca 100644 --- a/src/udfs.rs +++ b/src/udfs.rs @@ -1,6 +1,13 @@ +use crate::background_worker::RECONFIGURE; +use crate::pg_sys::ConditionVariableCancelSleep; +use crate::pg_sys::ConditionVariableTimedSleep; use crate::Group; use anyhow::{Context, Result}; +use nix::sys::signal::{self, Signal}; +use nix::unistd::Pid; use pgrx::prelude::*; +use std::borrow::BorrowMut; +use std::time; #[pg_schema] mod pgbouncer { @@ -15,4 +22,48 @@ mod pgbouncer { .admin(command)?; Ok(()) } + + #[pg_extern] + fn reconfigure() -> i64 { + let ten_millis = time::Duration::from_millis(10000); + + let curr_config_count = RECONFIGURE.share().count; + + let mut next_config_count = curr_config_count; + + signal::kill(Pid::from_raw(RECONFIGURE.share().pid), Signal::SIGHUP).unwrap(); + log!("Sent SIGHUP"); + + while curr_config_count >= next_config_count { + //thread::sleep(ten_millis); + log!( + "curr_config_count { } next_config_count { } ", + curr_config_count, + next_config_count + ); + log!("Sleep"); + + unsafe { + if ConditionVariableTimedSleep( + RECONFIGURE.exclusive().cv.borrow_mut(), + ten_millis.as_millis().try_into().unwrap(), + pg_sys::WL_LATCH_SET | pg_sys::WL_TIMEOUT | pg_sys::WL_POSTMASTER_DEATH, + ) { + log!("ConditionVariableTimedSleep timed out"); + } else { + log!("CV signalled"); + } + } + + pg_sys::check_for_interrupts!(); + + next_config_count = RECONFIGURE.share().count; + } + + unsafe { + ConditionVariableCancelSleep(); + } + + next_config_count as i64 + } }