From 8f64c7b4f97379e09b7e13d1477733bba37d1840 Mon Sep 17 00:00:00 2001 From: Benjamin <5719034+bnjjj@users.noreply.github.com> Date: Mon, 9 Mar 2026 10:19:01 +0100 Subject: [PATCH 1/2] add support for param fingerprint to know if it changed or not during rescan Signed-off-by: Benjamin <5719034+bnjjj@users.noreply.github.com> --- supabase-wrappers/src/scan.rs | 91 +++++++++++++++++++++++++++++------ 1 file changed, 75 insertions(+), 16 deletions(-) diff --git a/supabase-wrappers/src/scan.rs b/supabase-wrappers/src/scan.rs index ee4d9fe28..dc2030bb5 100644 --- a/supabase-wrappers/src/scan.rs +++ b/supabase-wrappers/src/scan.rs @@ -6,6 +6,8 @@ use pgrx::{ prelude::*, }; use std::collections::HashMap; +use std::collections::hash_map::DefaultHasher; +use std::hash::{Hash, Hasher}; use std::marker::PhantomData; use pgrx::pg_sys::panic::ErrorReport; @@ -51,6 +53,8 @@ struct FdwState, W: ForeignDataWrapper> { values: Vec, nulls: Vec, row: Row, + // fingerprint of current parameter values to detect rescan changes + param_fingerprint: u64, _phantom: PhantomData, } @@ -67,6 +71,7 @@ impl, W: ForeignDataWrapper> FdwState { values: Vec::new(), nulls: Vec::new(), row: Row::new(), + param_fingerprint: 0, _phantom: PhantomData, } } @@ -341,19 +346,23 @@ unsafe fn assign_paramenter_value, W: ForeignDataWrapper // assign parameter value to qual for qual in &mut state.quals.iter_mut() { if let Some(param) = &mut qual.param { + let mut current_value: Option = None; match param.kind { ParamKind::PARAM_EXTERN => { // get parameter list in execution state let plist_info = (*estate).es_param_list_info; - if plist_info.is_null() { - continue; - } - let params_cnt = (*plist_info).numParams as usize; - let plist = (*plist_info).params.as_slice(params_cnt); - let p: pg_sys::ParamExternData = plist[param.id - 1]; - if let Some(cell) = Cell::from_polymorphic_datum(p.value, p.isnull, p.ptype) - { - qual.value = Value::Cell(cell); + if !plist_info.is_null() { + let params_cnt = (*plist_info).numParams as usize; + if param.id > 0 && param.id <= params_cnt { + let plist = (*plist_info).params.as_slice(params_cnt); + let p: pg_sys::ParamExternData = plist[param.id - 1]; + if let Some(cell) = + Cell::from_polymorphic_datum(p.value, p.isnull, p.ptype) + { + qual.value = Value::Cell(cell.clone()); + current_value = Some(Value::Cell(cell)); + } + } } } ParamKind::PARAM_EXEC => { @@ -370,21 +379,63 @@ unsafe fn assign_paramenter_value, W: ForeignDataWrapper ) && let Some(cell) = Cell::from_polymorphic_datum(datum, isnull, param.type_oid) { - let mut eval_value = param - .eval_value - .lock() - .expect("param.eval_value should be locked"); - *eval_value = Some(Value::Cell(cell.clone())); - qual.value = Value::Cell(cell); + qual.value = Value::Cell(cell.clone()); + current_value = Some(Value::Cell(cell)); } } _ => {} } + + let mut eval_value = param + .eval_value + .lock() + .expect("param.eval_value should be locked"); + *eval_value = current_value; } } } } +#[derive(Hash, PartialEq, Eq)] +struct ParamFingerPrint<'a> { + field: &'a str, + operator: &'a str, + use_or: bool, + kind: u32, + id: usize, + type_oid: Oid, + eval_value: String, +} + +fn compute_param_fingerprint, W: ForeignDataWrapper>( + state: &FdwState, +) -> u64 { + let mut hasher = DefaultHasher::new(); + + for qual in &state.quals { + let Some(param) = &qual.param else { + continue; + }; + let param_finger_print = ParamFingerPrint { + field: &qual.field, + operator: &qual.operator, + use_or: qual.use_or, + kind: param.kind, + id: param.id, + type_oid: param.type_oid, + eval_value: match param.eval_value.lock() { + Ok(value) => { + format!("{:?}", *value) + } + Err(_) => "param_eval_lock_error".to_string(), + }, + }; + + param_finger_print.hash(&mut hasher); + } + hasher.finish() +} + #[pg_guard] pub(super) extern "C-unwind" fn begin_foreign_scan< E: Into, @@ -402,6 +453,7 @@ pub(super) extern "C-unwind" fn begin_foreign_scan< // assign parameter values to qual assign_paramenter_value(node, &mut state); + state.param_fingerprint = compute_param_fingerprint(&state); // begin scan if it is not EXPLAIN statement if eflags & pg_sys::EXEC_FLAG_EXPLAIN_ONLY as c_int <= 0 { @@ -497,7 +549,14 @@ pub(super) extern "C-unwind" fn re_scan_foreign_scan< let fdw_state = (*node).fdw_state as *mut FdwState; if !fdw_state.is_null() { let mut state = PgBox::>::from_pg(fdw_state); - let result = state.re_scan(); + assign_paramenter_value(node, &mut state); + let next_fingerprint = compute_param_fingerprint(&state); + let result = if next_fingerprint != state.param_fingerprint { + state.param_fingerprint = next_fingerprint; + state.begin_scan() + } else { + state.re_scan() + }; if result.is_err() { drop_fdw_state(state.as_ptr()); (*node).fdw_state = ptr::null::>() as _; From 8dc0cda8b3308fee7e3fc8f8775c2fedf29fcd99 Mon Sep 17 00:00:00 2001 From: Bo Lu Date: Thu, 12 Mar 2026 18:41:03 +1100 Subject: [PATCH 2/2] refactor: update parameter fingerprint to use String for better handling during rescan --- supabase-wrappers/src/scan.rs | 78 +++++++++++++++-------------------- 1 file changed, 33 insertions(+), 45 deletions(-) diff --git a/supabase-wrappers/src/scan.rs b/supabase-wrappers/src/scan.rs index dc2030bb5..7c27d119a 100644 --- a/supabase-wrappers/src/scan.rs +++ b/supabase-wrappers/src/scan.rs @@ -6,8 +6,6 @@ use pgrx::{ prelude::*, }; use std::collections::HashMap; -use std::collections::hash_map::DefaultHasher; -use std::hash::{Hash, Hasher}; use std::marker::PhantomData; use pgrx::pg_sys::panic::ErrorReport; @@ -54,7 +52,7 @@ struct FdwState, W: ForeignDataWrapper> { nulls: Vec, row: Row, // fingerprint of current parameter values to detect rescan changes - param_fingerprint: u64, + param_fingerprint: String, _phantom: PhantomData, } @@ -71,7 +69,7 @@ impl, W: ForeignDataWrapper> FdwState { values: Vec::new(), nulls: Vec::new(), row: Row::new(), - param_fingerprint: 0, + param_fingerprint: String::new(), _phantom: PhantomData, } } @@ -334,8 +332,8 @@ pub(super) extern "C-unwind" fn explain_foreign_scan< } } -// extract paramter value and assign it to qual in scan state -unsafe fn assign_paramenter_value, W: ForeignDataWrapper>( +// extract parameter value and assign it to qual in scan state +unsafe fn assign_parameter_value, W: ForeignDataWrapper>( node: *mut pg_sys::ForeignScanState, state: &mut FdwState, ) { @@ -396,44 +394,32 @@ unsafe fn assign_paramenter_value, W: ForeignDataWrapper } } -#[derive(Hash, PartialEq, Eq)] -struct ParamFingerPrint<'a> { - field: &'a str, - operator: &'a str, - use_or: bool, - kind: u32, - id: usize, - type_oid: Oid, - eval_value: String, -} - fn compute_param_fingerprint, W: ForeignDataWrapper>( state: &FdwState, -) -> u64 { - let mut hasher = DefaultHasher::new(); - - for qual in &state.quals { - let Some(param) = &qual.param else { - continue; - }; - let param_finger_print = ParamFingerPrint { - field: &qual.field, - operator: &qual.operator, - use_or: qual.use_or, - kind: param.kind, - id: param.id, - type_oid: param.type_oid, - eval_value: match param.eval_value.lock() { - Ok(value) => { - format!("{:?}", *value) - } - Err(_) => "param_eval_lock_error".to_string(), - }, - }; - - param_finger_print.hash(&mut hasher); - } - hasher.finish() +) -> String { + state + .quals + .iter() + .filter_map(|qual| { + qual.param.as_ref().map(|param| { + let eval_value = match param.eval_value.lock() { + Ok(value) => format!("{:?}", *value), + Err(_) => "lock_error".to_string(), + }; + format!( + "{}|{}|{}|{}|{}|{}|{}", + qual.field, + qual.operator, + qual.use_or, + param.kind, + param.id, + param.type_oid, + eval_value, + ) + }) + }) + .collect::>() + .join(";") } #[pg_guard] @@ -452,7 +438,7 @@ pub(super) extern "C-unwind" fn begin_foreign_scan< assert!(!state.is_null()); // assign parameter values to qual - assign_paramenter_value(node, &mut state); + assign_parameter_value(node, &mut state); state.param_fingerprint = compute_param_fingerprint(&state); // begin scan if it is not EXPLAIN statement @@ -492,7 +478,7 @@ pub(super) extern "C-unwind" fn iterate_foreign_scan< let mut state = PgBox::>::from_pg((*node).fdw_state as _); // evaluate parameter values - assign_paramenter_value(node, &mut state); + assign_parameter_value(node, &mut state); // clear slot let slot = (*node).ss.ss_ScanTupleSlot; @@ -549,10 +535,12 @@ pub(super) extern "C-unwind" fn re_scan_foreign_scan< let fdw_state = (*node).fdw_state as *mut FdwState; if !fdw_state.is_null() { let mut state = PgBox::>::from_pg(fdw_state); - assign_paramenter_value(node, &mut state); + assign_parameter_value(node, &mut state); let next_fingerprint = compute_param_fingerprint(&state); let result = if next_fingerprint != state.param_fingerprint { state.param_fingerprint = next_fingerprint; + // end the active scan to release resources before restarting with new params + let _ = state.end_scan(); state.begin_scan() } else { state.re_scan()