-
-
Notifications
You must be signed in to change notification settings - Fork 99
scan: refresh parameter state and rescan on parameter changes (PARAM_EXTERN + PARAM_EXEC) #587
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -6,6 +6,8 @@ use pgrx::{ | |||||||||||||||||||||||
| prelude::*, | ||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||
| use std::collections::HashMap; | ||||||||||||||||||||||||
| use std::collections::hash_map::DefaultHasher; | ||||||||||||||||||||||||
| use std::hash::{Hash, Hasher}; | ||||||||||||||||||||||||
| use std::marker::PhantomData; | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| use pgrx::pg_sys::panic::ErrorReport; | ||||||||||||||||||||||||
|
|
@@ -51,6 +53,8 @@ struct FdwState<E: Into<ErrorReport>, W: ForeignDataWrapper<E>> { | |||||||||||||||||||||||
| values: Vec<Datum>, | ||||||||||||||||||||||||
| nulls: Vec<bool>, | ||||||||||||||||||||||||
| row: Row, | ||||||||||||||||||||||||
| // fingerprint of current parameter values to detect rescan changes | ||||||||||||||||||||||||
| param_fingerprint: u64, | ||||||||||||||||||||||||
| _phantom: PhantomData<E>, | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
|
|
@@ -67,6 +71,7 @@ impl<E: Into<ErrorReport>, W: ForeignDataWrapper<E>> FdwState<E, W> { | |||||||||||||||||||||||
| values: Vec::new(), | ||||||||||||||||||||||||
| nulls: Vec::new(), | ||||||||||||||||||||||||
| row: Row::new(), | ||||||||||||||||||||||||
| param_fingerprint: 0, | ||||||||||||||||||||||||
| _phantom: PhantomData, | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
|
|
@@ -341,19 +346,23 @@ unsafe fn assign_paramenter_value<E: Into<ErrorReport>, W: ForeignDataWrapper<E> | |||||||||||||||||||||||
| // assign parameter value to qual | ||||||||||||||||||||||||
| for qual in &mut state.quals.iter_mut() { | ||||||||||||||||||||||||
| if let Some(param) = &mut qual.param { | ||||||||||||||||||||||||
| let mut current_value: Option<Value> = None; | ||||||||||||||||||||||||
| match param.kind { | ||||||||||||||||||||||||
| ParamKind::PARAM_EXTERN => { | ||||||||||||||||||||||||
| // get parameter list in execution state | ||||||||||||||||||||||||
| let plist_info = (*estate).es_param_list_info; | ||||||||||||||||||||||||
| if plist_info.is_null() { | ||||||||||||||||||||||||
| continue; | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| let params_cnt = (*plist_info).numParams as usize; | ||||||||||||||||||||||||
| let plist = (*plist_info).params.as_slice(params_cnt); | ||||||||||||||||||||||||
| let p: pg_sys::ParamExternData = plist[param.id - 1]; | ||||||||||||||||||||||||
| if let Some(cell) = Cell::from_polymorphic_datum(p.value, p.isnull, p.ptype) | ||||||||||||||||||||||||
| { | ||||||||||||||||||||||||
| qual.value = Value::Cell(cell); | ||||||||||||||||||||||||
| if !plist_info.is_null() { | ||||||||||||||||||||||||
| let params_cnt = (*plist_info).numParams as usize; | ||||||||||||||||||||||||
| if param.id > 0 && param.id <= params_cnt { | ||||||||||||||||||||||||
| let plist = (*plist_info).params.as_slice(params_cnt); | ||||||||||||||||||||||||
| let p: pg_sys::ParamExternData = plist[param.id - 1]; | ||||||||||||||||||||||||
| if let Some(cell) = | ||||||||||||||||||||||||
| Cell::from_polymorphic_datum(p.value, p.isnull, p.ptype) | ||||||||||||||||||||||||
| { | ||||||||||||||||||||||||
| qual.value = Value::Cell(cell.clone()); | ||||||||||||||||||||||||
| current_value = Some(Value::Cell(cell)); | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| ParamKind::PARAM_EXEC => { | ||||||||||||||||||||||||
|
|
@@ -370,21 +379,63 @@ unsafe fn assign_paramenter_value<E: Into<ErrorReport>, W: ForeignDataWrapper<E> | |||||||||||||||||||||||
| ) && let Some(cell) = | ||||||||||||||||||||||||
| Cell::from_polymorphic_datum(datum, isnull, param.type_oid) | ||||||||||||||||||||||||
| { | ||||||||||||||||||||||||
| let mut eval_value = param | ||||||||||||||||||||||||
| .eval_value | ||||||||||||||||||||||||
| .lock() | ||||||||||||||||||||||||
| .expect("param.eval_value should be locked"); | ||||||||||||||||||||||||
| *eval_value = Some(Value::Cell(cell.clone())); | ||||||||||||||||||||||||
| qual.value = Value::Cell(cell); | ||||||||||||||||||||||||
| qual.value = Value::Cell(cell.clone()); | ||||||||||||||||||||||||
| current_value = Some(Value::Cell(cell)); | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
|
Comment on lines
357
to
382
|
||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| _ => {} | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| let mut eval_value = param | ||||||||||||||||||||||||
| .eval_value | ||||||||||||||||||||||||
| .lock() | ||||||||||||||||||||||||
| .expect("param.eval_value should be locked"); | ||||||||||||||||||||||||
| *eval_value = current_value; | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| #[derive(Hash, PartialEq, Eq)] | ||||||||||||||||||||||||
| struct ParamFingerPrint<'a> { | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
| field: &'a str, | ||||||||||||||||||||||||
| operator: &'a str, | ||||||||||||||||||||||||
| use_or: bool, | ||||||||||||||||||||||||
| kind: u32, | ||||||||||||||||||||||||
| id: usize, | ||||||||||||||||||||||||
| type_oid: Oid, | ||||||||||||||||||||||||
| eval_value: String, | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| fn compute_param_fingerprint<E: Into<ErrorReport>, W: ForeignDataWrapper<E>>( | ||||||||||||||||||||||||
| state: &FdwState<E, W>, | ||||||||||||||||||||||||
| ) -> u64 { | ||||||||||||||||||||||||
| let mut hasher = DefaultHasher::new(); | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| for qual in &state.quals { | ||||||||||||||||||||||||
| let Some(param) = &qual.param else { | ||||||||||||||||||||||||
| continue; | ||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||
| let param_finger_print = ParamFingerPrint { | ||||||||||||||||||||||||
| field: &qual.field, | ||||||||||||||||||||||||
| operator: &qual.operator, | ||||||||||||||||||||||||
| use_or: qual.use_or, | ||||||||||||||||||||||||
| kind: param.kind, | ||||||||||||||||||||||||
| id: param.id, | ||||||||||||||||||||||||
| type_oid: param.type_oid, | ||||||||||||||||||||||||
| eval_value: match param.eval_value.lock() { | ||||||||||||||||||||||||
| Ok(value) => { | ||||||||||||||||||||||||
| format!("{:?}", *value) | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
| format!("{:?}", *value) | |
| // Derive a stable fingerprint from the parameter value itself | |
| let mut value_hasher = DefaultHasher::new(); | |
| (*value).hash(&mut value_hasher); | |
| value_hasher.finish().to_string() |
Copilot
AI
Mar 9, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The new caller in re_scan_foreign_scan at this line still uses the typo-containing name assign_paramenter_value (should be assign_parameter_value). Since the PR adds this new call site, it would be appropriate to also rename the function to fix the pre-existing spelling error paramenter → parameter.
Copilot
AI
Mar 9, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When re_scan_foreign_scan detects a fingerprint change and calls begin_scan() (line 556), the previous scan session's resources have never been cleaned up via end_scan(). FDW implementations such as ClickHouseFdw, BigQueryFdw, IcebergFdw, etc., typically allocate connections, cursors, or buffers in begin_scan and release them in end_scan. Calling begin_scan() again without a preceding end_scan() may cause connection/resource leaks, or even double initialization bugs.
To fix this, end_scan() should be called on the state before invoking begin_scan() when the fingerprint has changed.
| state.param_fingerprint = next_fingerprint; | |
| state.begin_scan() | |
| // The parameters have changed; clean up the previous scan session | |
| // before starting a new one to avoid leaking resources. | |
| let end_result = state.end_scan(); | |
| if end_result.is_err() { | |
| end_result | |
| } else { | |
| state.param_fingerprint = next_fingerprint; | |
| state.begin_scan() | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't re-enter begin_scan() on a live FDW instance.
Line 556 turns begin_scan() into a rescan path without any teardown. The trait only defines re_scan()/end_scan() for restart/cleanup, so existing FDWs that allocate cursors or materialized buffers in begin_scan() can leak or append stale state when parameters change.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@supabase-wrappers/src/scan.rs` around lines 554 - 557, The code currently
calls state.begin_scan() when parameters change (comparing next_fingerprint to
state.param_fingerprint), which can re-enter begin_scan() on a live FDW and leak
or corrupt resources; instead detect the parameter change and invoke the proper
restart/cleanup flow: if the FDW implements re_scan(), call state.re_scan() (or
the trait-equivalent) to let the FDW refresh cursors/buffers; otherwise call
state.end_scan() to tear down existing state and then call state.begin_scan() to
reinitialize cleanly; update the branch that compares next_fingerprint and
state.param_fingerprint to use re_scan()/end_scan()+begin_scan() rather than
directly calling begin_scan() on a live instance.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Clear
qual.valueon NULL / missing parameter updates.Line 393 resets
param.eval_valuetoNone, but Lines 362 and 382 only overwritequal.valueon theSome(cell)path. After a non-NULL → NULL/missing transition, FDWs that readqual.valuestill see the previous predicate, so the NULL-rescan case remains stale.Also applies to: 375-394
🤖 Prompt for AI Agents