feat: add aggregate pushdown support (COUNT/SUM/AVG/MIN/MAX)#586
feat: add aggregate pushdown support (COUNT/SUM/AVG/MIN/MAX)#586wenerme wants to merge 5 commits intosupabase:mainfrom
Conversation
Based on PR supabase#549 by JohnCari with critical fix: aggregates and group_by are now stored in FdwState and passed through output_rel->fdw_private to the executor. Previously fdw_private was null, so extracted aggregates were silently discarded. Changes: - Add AggregateKind enum, Aggregate struct with deparse() methods - Add FDW trait methods: supported_aggregates(), supports_group_by(), get_aggregate_rel_size(), begin_aggregate_scan() - Add GetForeignUpperPaths callback in new upper.rs module - Add aggregates/group_by fields to FdwState in scan.rs - begin_foreign_scan detects aggregate path and calls begin_aggregate_scan - EXPLAIN output includes aggregate info when present - PG version compat: PG13-PG18 (disabled_nodes, fdw_restrictinfo)
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Central YAML (base), Organization UI (inherited) Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (1)
📝 WalkthroughSummary by CodeRabbit
WalkthroughThe pull request adds aggregate pushdown to the ForeignDataWrapper framework. It introduces public types AggregateKind and Aggregate, extends the ForeignDataWrapper trait with supported_aggregates, supports_group_by, get_aggregate_rel_size, and begin_aggregate_scan, and wires an upper-path planner hook that extracts aggregates and GROUP BY columns during the GROUP_AGG stage. The scan layer gains aggregates and group_by state, aggregate-aware begin/init logic, and explain output. Documentation is updated with guidance and examples for implementing aggregate pushdown. Sequence Diagram(s)sequenceDiagram
participant QueryPlanner as Query Planner
participant UpperPath as Upper Path Module
participant FDWInterface as FDW Interface
participant FDWScan as FDW Scan
participant RemoteSource as Remote Data Source
QueryPlanner->>UpperPath: get_foreign_upper_paths(stage = GROUP_AGG)
UpperPath->>UpperPath: extract_aggregates() and extract_group_by_columns()
UpperPath->>FDWInterface: supported_aggregates() / supports_group_by() validation
alt Aggregates Supported
UpperPath->>FDWInterface: populate FdwState.aggregates & .group_by
UpperPath->>FDWInterface: get_aggregate_rel_size() for cost
UpperPath->>QueryPlanner: register foreign upper path
else Unsupported
UpperPath->>QueryPlanner: skip pushdown (no path)
end
QueryPlanner->>FDWScan: begin_foreign_scan()
FDWScan->>FDWScan: is_aggregate_scan()?
alt Aggregate Scan Active
FDWScan->>FDWInterface: begin_aggregate_scan(aggregates, group_by)
FDWInterface->>RemoteSource: execute pushed-down aggregate query (GROUP BY)
else Standard Scan
FDWScan->>RemoteSource: execute standard row scan
end
RemoteSource-->>FDWScan: return rows/aggregates
FDWScan-->>QueryPlanner: complete
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
supabase-wrappers/src/interface.rs (1)
1183-1199:⚠️ Potential issue | 🔴 CriticalGuard
upperimport and usage with the same feature cfg as module declaration.The
uppermodule is conditionally compiled only when PG features are enabled (lib.rs lines 324–331), but it's imported unconditionally infdw_routine()(line 1183) and used unconditionally (line 1198). This causes a compile error when building withoutpg13..pg18features.Wrap lines 1183 and 1198 with
#[cfg(any(feature = "pg13", feature = "pg14", feature = "pg15", feature = "pg16", feature = "pg17", feature = "pg18"))]or conditionally assignGetForeignUpperPathsonly when the module is available.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@supabase-wrappers/src/interface.rs` around lines 1183 - 1199, The call site unconditionally imports and assigns the upper module functionality causing build failures when PG feature flags are absent; update fdw_routine() so the import of upper and the assignment to fdw_routine.GetForeignUpperPaths (upper::get_foreign_upper_paths::<E, Self>) are guarded by the same cfg used where the upper module is declared (any(feature = "pg13", "pg14", "pg15", "pg16", "pg17", "pg18")), either by adding #[cfg(...)] to the import and the single assignment line or by performing a conditional assignment only when that cfg is true, while leaving the rest of fdw_routine unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@supabase-wrappers/src/upper.rs`:
- Around line 328-330: The FDW state is only saving state.aggregates and
state.group_by but not updating state.tgts (the target-column metadata) that the
executor and the scan path (tuple validation/mapping) rely on; update state.tgts
after computing aggregates/group_by so it reflects the actual output columns
(including aggregate result columns and any group-by columns) and their
types/names, ensuring the scan/executor will validate/match the tuple descriptor
correctly (adjust the same place where you set state.aggregates/state.group_by
to also rebuild/assign state.tgts based on those aggregates and group_by).
- Around line 231-245: The loop over SortGroupClause currently skips non-Var
group keys which leads to incomplete GROUP BY pushdowns; change the logic in the
block iterating list_iter::<pg_sys::SortGroupClause>(group_clause) so that if a
group's TargetEntry expr is not a Var or extract_column_from_var(root, expr as
*mut pg_sys::Var) returns None you abort the pushdown path (e.g., return an Err
or a specific "cannot pushdown" result) instead of silently continuing; update
references to tle_resno, the inner list_iter::<pg_sys::TargetEntry>(target_list)
loop, and group_by so that only when extract_column_from_var succeeds do you
push to group_by, otherwise immediately stop and propagate the failure to
prevent constructing incomplete GROUP BY clauses.
- Around line 161-177: The current code lets aggregates with non-column
arguments fall through (setting column to None) and later deparses with
unwrap_or("*"), producing invalid SQL; update extract_aggregates to validate the
aggregate kind (e.g., check aggref->aggkind or the agg function OID for
Sum/Avg/Min/Max) and if the aggregate requires a simple column argument but
extract_column_from_var returned None, reject the aggregate by returning None
from extract_aggregates rather than allowing pushdown; ensure this check
references the same aggref and the local column variable so only aggregates with
a non-None column proceed to deparse.
---
Outside diff comments:
In `@supabase-wrappers/src/interface.rs`:
- Around line 1183-1199: The call site unconditionally imports and assigns the
upper module functionality causing build failures when PG feature flags are
absent; update fdw_routine() so the import of upper and the assignment to
fdw_routine.GetForeignUpperPaths (upper::get_foreign_upper_paths::<E, Self>) are
guarded by the same cfg used where the upper module is declared (any(feature =
"pg13", "pg14", "pg15", "pg16", "pg17", "pg18")), either by adding #[cfg(...)]
to the import and the single assignment line or by performing a conditional
assignment only when that cfg is true, while leaving the rest of fdw_routine
unchanged.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Central YAML (base), Organization UI (inherited)
Review profile: CHILL
Plan: Pro
Run ID: 54507870-00cd-40b3-820e-d24d479ec17e
📒 Files selected for processing (6)
docs/contributing/native.mddocs/guides/query-pushdown.mdsupabase-wrappers/src/interface.rssupabase-wrappers/src/lib.rssupabase-wrappers/src/scan.rssupabase-wrappers/src/upper.rs
1. Reject pushdown when SUM/AVG/MIN/MAX has no simple column reference (e.g. SUM(a+b)) to avoid generating invalid SQL like SUM(*) 2. Abort pushdown when any GROUP BY item is not a plain column reference to prevent incomplete GROUP BY clauses 3. Rebuild state.tgts for aggregate paths to reflect the actual output shape (group_by columns + aggregate result columns), preventing ERRCODE_FDW_INVALID_COLUMN_NUMBER from the executor
The upper module is conditionally compiled with pg13-pg18 feature flags, but fdw_routine() imported it unconditionally, causing build failures without PG features. Wrap the import and GetForeignUpperPaths assignment with the same cfg guard.
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@supabase-wrappers/src/interface.rs`:
- Around line 1027-1029: The default no-op implementation of
begin_aggregate_scan() must not succeed silently when supported_aggregates()
advertises aggregate support; update the default to return an error (e.g.,
Err(FdwError::unsupported) or a descriptive Err indicating begin_aggregate_scan
is unimplemented) so the planner fails fast instead of producing wrong results,
and/or change supported_aggregates() default to return an empty Vec unless the
implementer overrides it; apply the same fix to the other occurrence noted
around begin_aggregate_scan (lines 1148-1159) so any FDW that advertises
aggregates but doesn't implement begin_aggregate_scan will error rather than
proceed.
In `@supabase-wrappers/src/upper.rs`:
- Around line 352-381: The Aggregate output types are being inferred from the
input column; add an aggtype field to the Aggregate struct (populated in
extract_aggregates from Aggref::aggtype) and use that agg.aggtype when
rebuilding state.tgts in the loop that constructs new_tgts (instead of
agg.column.as_ref().map(|c| c.type_oid)). Ensure extract_aggregates sets aggtype
for COUNT/SUM/AVG cases (fallback to pg_sys::INT8OID only if aggtype is absent)
so state.tgts contains the correct OID for executor tuple/slot validation.
- Around line 99-209: extract_aggregates currently ignores Aggref.aggfilter and
thus may push down aggregates with FILTER clauses producing incorrect results;
update extract_aggregates to reject any aggregate with a non-null
(*aggref).aggfilter (or extend the Aggregate struct to carry the filter
predicate and ensure remote SQL generation uses it). Specifically, in
extract_aggregates (where aggref is processed) check if
!(*aggref).aggfilter.is_null(); if so either return None (to abort pushdown) or
add a new field (e.g., filter: Option<Expr> or filter_sql: Option<String>) to
the Aggregate struct and populate it from (*aggref).aggfilter so the filter can
be preserved and applied by the remote SQL generator that consumes Aggregate.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Central YAML (base), Organization UI (inherited)
Review profile: CHILL
Plan: Pro
Run ID: b8fddce0-bce8-457a-99ef-e2771d39e69e
📒 Files selected for processing (2)
supabase-wrappers/src/interface.rssupabase-wrappers/src/upper.rs
1. Reject pushdown for aggregates with FILTER clause (aggfilter check) 2. Add type_oid field to Aggregate struct, populated from Aggref::aggtype instead of inferring from the input column type 3. Default begin_aggregate_scan() now emits a warning if called without an override, to surface missing implementations early
There was a problem hiding this comment.
♻️ Duplicate comments (1)
supabase-wrappers/src/interface.rs (1)
1156-1170:⚠️ Potential issue | 🔴 CriticalFail fast when aggregate pushdown is advertised but not implemented.
Lines 1163-1170 still warn and return
Ok(()). If an FDW overridessupported_aggregates()but forgetsbegin_aggregate_scan(), the planner can pick the aggregate path and execution will continue without any aggregate result source being initialized, which is a wrong-results failure mode. This default needs to hard-fail instead of succeeding.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@supabase-wrappers/src/interface.rs` around lines 1156 - 1170, The default implementation of begin_aggregate_scan currently only logs a warning and returns Ok(()), which leads to incorrect execution when supported_aggregates() advertises aggregate pushdown but begin_aggregate_scan() is not implemented; change begin_aggregate_scan to fail fast by returning an Err with a clear error indicating that aggregate pushdown was advertised but begin_aggregate_scan is unimplemented (use the error type E expected by the signature), and include a message referencing supported_aggregates() and begin_aggregate_scan to aid debugging; update the crate::utils::report_warning call to a report_error or remove it in favor of returning the Err so callers receive the failure immediately.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@supabase-wrappers/src/interface.rs`:
- Around line 1156-1170: The default implementation of begin_aggregate_scan
currently only logs a warning and returns Ok(()), which leads to incorrect
execution when supported_aggregates() advertises aggregate pushdown but
begin_aggregate_scan() is not implemented; change begin_aggregate_scan to fail
fast by returning an Err with a clear error indicating that aggregate pushdown
was advertised but begin_aggregate_scan is unimplemented (use the error type E
expected by the signature), and include a message referencing
supported_aggregates() and begin_aggregate_scan to aid debugging; update the
crate::utils::report_warning call to a report_error or remove it in favor of
returning the Err so callers receive the failure immediately.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Central YAML (base), Organization UI (inherited)
Review profile: CHILL
Plan: Pro
Run ID: 4651f9d9-6044-4d73-b47e-6e1e8228bf56
📒 Files selected for processing (2)
supabase-wrappers/src/interface.rssupabase-wrappers/src/upper.rs
🚧 Files skipped from review as they are similar to previous changes (1)
- supabase-wrappers/src/upper.rs
Use report_error to abort the transaction when begin_aggregate_scan is called without being overridden. This prevents wrong-results failures when an FDW declares aggregate support but forgets to implement the scan method.
Continuation of #549 by @JohnCari — picks up the aggregate pushdown work with a critical bug fix.
Summary
AggregateKindenum,Aggregatestruct withdeparse()/deparse_with_alias()methodssupported_aggregates(),supports_group_by(),get_aggregate_rel_size(),begin_aggregate_scan()— all with backward-compatible defaultsGetForeignUpperPathscallback in newupper.rsmoduleGetForeignUpperPathsinfdw_routine()disabled_nodes,fdw_restrictinfo)Critical fix from #549
Bug:
upper.rsextracted aggregates and group_by from the query but passedfdw_private: ptr::null_mut()when creating the foreign upper path. The executor never received the aggregate info, sobegin_aggregate_scan()was never called.Fix: Store aggregates and group_by in
FdwState, pass the state pointer throughoutput_rel->fdw_privateso it survives from planning to execution.begin_foreign_scannow detects aggregate paths and dispatches tobegin_aggregate_scan().Test plan
cargo fmt --checkpassesclippy -D warningspasses (zero warnings)cargo test— 19 passed, 0 failed (unit + doc tests)cargo pgrx test --features "helloworld_fdw pg16"— compiles and passessupported_aggregates()default)