Skip to content

Add a UDF named "reconfigure" that triggers a pgbouncer reconfig and blocks until it is completed. #6

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion src/background_worker.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,29 @@
use crate::gucs::PG_PGBOUNCER_DATABASE;
use crate::*;
use pgrx::pg_sys::{ConditionVariable, ConditionVariableInit};
use pgrx::shmem::*;
use pgrx::PgLwLock;
use pgrx::{pg_shmem_init, warning};
use std::borrow::BorrowMut;

// types behind a `LwLock` must derive/implement `Copy` and `Clone`
#[derive(Copy, Clone, Default)]

pub struct Reconfiguremsg {
pub count: u64,
pub pid: pg_sys::pid_t,
pub cv: ConditionVariable,
}

unsafe impl PGRXSharedMemory for Reconfiguremsg {}
unsafe impl Sync for Reconfiguremsg {}
unsafe impl Send for Reconfiguremsg {}

pub static RECONFIGURE: PgLwLock<Reconfiguremsg> = PgLwLock::new();

#[pg_guard]
pub extern "C" fn _PG_init() {
pg_shmem_init!(RECONFIGURE);
gucs::init();
BackgroundWorkerBuilder::new("PgBouncer Manager")
.set_function("pgbouncer_manager_main")
Expand Down Expand Up @@ -38,6 +59,10 @@ pub extern "C" fn pgbouncer_manager_main() {
| SignalWakeFlags::SIGCHLD,
);

unsafe {
ConditionVariableInit(RECONFIGURE.exclusive().cv.borrow_mut());
}

// we want to be able to use SPI against the specified database (pg_pgbouncer), as the superuser which
// did the initdb. You can specify a specific user with Some("my_user")
BackgroundWorker::connect_worker_to_spi(Some(database_name), None);
Expand All @@ -47,11 +72,17 @@ pub extern "C" fn pgbouncer_manager_main() {
BackgroundWorker::get_name(),
);

// put the process id in shared memory so that reconfigure callers
// can signal the process.
let mut msg = RECONFIGURE.exclusive();
msg.pid = unsafe { pg_sys::MyProcPid };
drop(msg);

create_dir_all(TEMP_DIR).unwrap();
let mut state = ManagerState::default();

// wake up every 10s or if we received a signal
while BackgroundWorker::wait_latch(Some(Duration::from_secs(10))) {
while BackgroundWorker::wait_latch(None) {
if let Err(err) = state.do_main_loop() {
warning!("error in main pg_pgbouncer loop: {err}");
// We're in an unknown state because of the error. Clear out previous_groups to
Expand Down
20 changes: 20 additions & 0 deletions src/manager.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use crate::background_worker::RECONFIGURE;
use crate::*;
use pgrx::pg_sys::ConditionVariableBroadcast;
use std::borrow::BorrowMut;

#[derive(Default)]
pub struct ManagerState {
Expand All @@ -11,13 +14,30 @@ impl ManagerState {
pub fn do_main_loop(&mut self) -> Result<()> {
if BackgroundWorker::sighup_received() {
// on SIGHUP, you might want to reload some external configuration or something
log!("SIGHUP received. Reconfiguring");
}
if BackgroundWorker::sigint_received() {
panic!("ASKED TO QUIT")
}
self.groups = BackgroundWorker::transaction(Group::all)?;
self.create_all()?;
self.cleanup_old_files()?;

let mut msg = RECONFIGURE.exclusive();
log!(
"Increase reconfiguration count from { } to { }",
msg.count,
msg.count + 1
);

msg.count += 1;

unsafe {
ConditionVariableBroadcast(msg.cv.borrow_mut());
}

drop(msg);

Ok(())
}

Expand Down
51 changes: 51 additions & 0 deletions src/udfs.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
use crate::background_worker::RECONFIGURE;
use crate::pg_sys::ConditionVariableCancelSleep;
use crate::pg_sys::ConditionVariableTimedSleep;
use crate::Group;
use anyhow::{Context, Result};
use nix::sys::signal::{self, Signal};
use nix::unistd::Pid;
use pgrx::prelude::*;
use std::borrow::BorrowMut;
use std::time;

#[pg_schema]
mod pgbouncer {
Expand All @@ -15,4 +22,48 @@ mod pgbouncer {
.admin(command)?;
Ok(())
}

#[pg_extern]
fn reconfigure() -> i64 {
let ten_millis = time::Duration::from_millis(10000);

let curr_config_count = RECONFIGURE.share().count;

let mut next_config_count = curr_config_count;

signal::kill(Pid::from_raw(RECONFIGURE.share().pid), Signal::SIGHUP).unwrap();
log!("Sent SIGHUP");

while curr_config_count >= next_config_count {
//thread::sleep(ten_millis);
log!(
"curr_config_count { } next_config_count { } ",
curr_config_count,
next_config_count
);
log!("Sleep");

unsafe {
if ConditionVariableTimedSleep(
RECONFIGURE.exclusive().cv.borrow_mut(),
ten_millis.as_millis().try_into().unwrap(),
pg_sys::WL_LATCH_SET | pg_sys::WL_TIMEOUT | pg_sys::WL_POSTMASTER_DEATH,
) {
log!("ConditionVariableTimedSleep timed out");
} else {
log!("CV signalled");
}
}

pg_sys::check_for_interrupts!();

next_config_count = RECONFIGURE.share().count;
}

unsafe {
ConditionVariableCancelSleep();
}

next_config_count as i64
}
}