
Commit 6fa4ea8

wenerme and burmecia authored
feat: add aggregate pushdown support (COUNT/SUM/AVG/MIN/MAX) (#586)
* feat: add aggregate pushdown support (COUNT/SUM/AVG/MIN/MAX)

  Based on PR #549 by JohnCari with critical fix: aggregates and group_by are now stored in FdwState and passed through output_rel->fdw_private to the executor. Previously fdw_private was null, so extracted aggregates were silently discarded.

  Changes:
  - Add AggregateKind enum, Aggregate struct with deparse() methods
  - Add FDW trait methods: supported_aggregates(), supports_group_by(), get_aggregate_rel_size(), begin_aggregate_scan()
  - Add GetForeignUpperPaths callback in new upper.rs module
  - Add aggregates/group_by fields to FdwState in scan.rs
  - begin_foreign_scan detects aggregate path and calls begin_aggregate_scan
  - EXPLAIN output includes aggregate info when present
  - PG version compat: PG13-PG18 (disabled_nodes, fdw_restrictinfo)

* fix: address CodeRabbit review - validate aggregates and GROUP BY

  1. Reject pushdown when SUM/AVG/MIN/MAX has no simple column reference (e.g. SUM(a+b)) to avoid generating invalid SQL like SUM(*)
  2. Abort pushdown when any GROUP BY item is not a plain column reference to prevent incomplete GROUP BY clauses
  3. Rebuild state.tgts for aggregate paths to reflect the actual output shape (group_by columns + aggregate result columns), preventing ERRCODE_FDW_INVALID_COLUMN_NUMBER from the executor

* fix: guard upper module import with PG feature cfg

  The upper module is conditionally compiled with pg13-pg18 feature flags, but fdw_routine() imported it unconditionally, causing build failures without PG features. Wrap the import and GetForeignUpperPaths assignment with the same cfg guard.

* fix: address CodeRabbit round 2 - aggfilter, aggtype, default warning

  1. Reject pushdown for aggregates with FILTER clause (aggfilter check)
  2. Add type_oid field to Aggregate struct, populated from Aggref::aggtype instead of inferring from the input column type
  3. Default begin_aggregate_scan() now emits a warning if called without an override, to surface missing implementations early

* fix: fail fast in default begin_aggregate_scan instead of warning

  Use report_error to abort the transaction when begin_aggregate_scan is called without being overridden. This prevents wrong-results failures when an FDW declares aggregate support but forgets to implement the scan method.

* feat: implement aggregate pushdown support for ClickHouse FDW
* feat: add aggregate pushdown support for BigQuery and SQL Server FDWs
* fix: update explain statements to use EXPLAIN ANALYZE for pushdown tests

---------

Co-authored-by: Bo Lu <lv.patrick@gmail.com>
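The validation rules from the review fixes (no `SUM(*)`, and per the docs no `DISTINCT` outside `COUNT`) can be illustrated with a minimal sketch. The `AggregateKind` and `Aggregate` names come from the commit message, but the fields, the `Option` return type and the SQL casing below are assumptions made for illustration, not the code in this commit.

```rust
/// Aggregate functions named in the commit title.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AggregateKind {
    Count,
    Sum,
    Avg,
    Min,
    Max,
}

/// Hypothetical shape of one extracted aggregate call.
#[derive(Debug, Clone, PartialEq)]
struct Aggregate {
    kind: AggregateKind,
    /// `None` stands for `*`, which is only meaningful for COUNT.
    column: Option<String>,
    distinct: bool,
}

impl Aggregate {
    /// Renders the aggregate as SQL, or returns `None` when the shape
    /// should stay local instead of being pushed down.
    fn deparse(&self) -> Option<String> {
        let func = match self.kind {
            AggregateKind::Count => "COUNT",
            AggregateKind::Sum => "SUM",
            AggregateKind::Avg => "AVG",
            AggregateKind::Min => "MIN",
            AggregateKind::Max => "MAX",
        };
        match (&self.column, self.kind, self.distinct) {
            // COUNT(*) is the only aggregate allowed without a column.
            (None, AggregateKind::Count, false) => Some("COUNT(*)".to_string()),
            // SUM/AVG/MIN/MAX without a plain column would become e.g. SUM(*).
            (None, _, _) => None,
            (Some(col), AggregateKind::Count, true) => Some(format!("COUNT(DISTINCT {col})")),
            // DISTINCT on anything other than COUNT is not pushed down.
            (Some(_), _, true) => None,
            (Some(col), _, false) => Some(format!("{func}({col})")),
        }
    }
}

fn main() {
    let sum_amount = Aggregate {
        kind: AggregateKind::Sum,
        column: Some("amount".to_string()),
        distinct: false,
    };
    // Stands in for SUM(a+b): no simple column reference could be extracted.
    let sum_expr = Aggregate {
        kind: AggregateKind::Sum,
        column: None,
        distinct: false,
    };
    println!("{:?}", sum_amount.deparse());
    println!("{:?}", sum_expr.deparse());
}
```

Returning `None` rather than an error mirrors the documented behaviour that a rejected shape still runs, just with the aggregation done in Postgres.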
1 parent 926df23 commit 6fa4ea8

19 files changed

Lines changed: 2993 additions & 100 deletions


docs/catalog/bigquery.md

Lines changed: 97 additions & 1 deletion

    @@ -144,14 +144,46 @@ create foreign table bigquery.my_bigquery_table (

     #### Notes

    -- Supports `where`, `order by` and `limit` clause pushdown
    +- Supports `where`, `order by`, `limit` and aggregate clause pushdown
     - When using `rowid_column`, it must be specified for data modification operations
     - Data in the streaming buffer cannot be updated or deleted until the buffer is flushed (up to 90 minutes)

     ## Query Pushdown Support

     This FDW supports `where`, `order by` and `limit` clause pushdown.

    +### Aggregate Pushdown
    +
    +The FDW pushes common aggregate queries down to BigQuery so the aggregation
    +runs remotely and only the final result rows are transferred to Postgres. This
    +is much faster than fetching every row and aggregating locally, especially
    +over large tables, and on BigQuery it also reduces the bytes scanned and billed
    +to your project.
    +
    +**Supported aggregates**: `count(*)`, `count(col)`, `count(distinct col)`,
    +`sum(col)`, `avg(col)`, `min(col)`, `max(col)`.
    +
    +**Supported shapes**: scalar aggregates, `group by` over plain columns, with
    +or without a `where` clause. Pushdown also works when the foreign `table`
    +option is a sub-query.
    +
    +```sql
    +-- All of these run as a single aggregate query on BigQuery:
    +select count(*) from bigquery.my_table;
    +select id, sum(amount) from bigquery.my_table group by id;
    +select count(distinct name) from bigquery.my_table where id = 1;
    +```
    +
    +**Cases that are not pushed down**: the query still returns the correct
    +result, but the aggregation happens in Postgres after fetching the rows:
    +
    +- The query has a `having` clause
    +- The aggregate has a `filter (where …)` clause
    +- A `distinct` modifier is used on anything other than `count`
    +- The aggregate's argument is not a plain column (for example `sum(a + 1)`)
    +- A `group by` item is not a plain column (for example `group by id + 1`)
    +- The aggregate function is not in the list above (for example `stddev`, `string_agg`)
    +
     ## Inserting Rows & the Streaming Buffer

     This foreign data wrapper uses BigQuery’s `insertAll` API method to create a `streamingBuffer` with an associated partition time. **Within that partition time, the data cannot be updated, deleted, or fully exported**. Only after the time has elapsed (up to 90 minutes according to [BigQuery’s documentation](https://cloud.google.com/bigquery/docs/streaming-data-into-bigquery)), can you perform operations.

    @@ -256,3 +288,67 @@ where id = 1;
     delete from bigquery.people
     where id = 2;
     ```
    +
    +### Aggregate Query Examples
    +
    +These examples assume an `orders` table on BigQuery and a matching foreign
    +table on Postgres:
    +
    +```sql
    +-- Run on BigQuery
    +create table your_project_id.your_dataset_id.orders (
    +id int64,
    +user_id int64,
    +amount numeric,
    +status string
    +);
    +
    +insert into your_project_id.your_dataset_id.orders values
    +(1, 1, 100.0, 'paid'),
    +(2, 1, 50.0, 'paid'),
    +(3, 2, 200.0, 'pending'),
    +(4, 2, 75.0, 'paid'),
    +(5, 3, 300.0, 'paid');
    +```
    +
    +```sql
    +-- Foreign table on Postgres
    +create foreign table bigquery.orders (
    +id bigint,
    +user_id bigint,
    +amount numeric,
    +status text
    +)
    +server bigquery_server
    +options (
    +table 'orders'
    +);
    +```
    +
    +Each query below runs a single aggregate query against BigQuery and returns
    +just the result rows:
    +
    +```sql
    +-- Total order count
    +select count(*) from bigquery.orders;
    +
    +-- Total revenue from paid orders
    +select sum(amount) from bigquery.orders where status = 'paid';
    +
    +-- Per-user order count and revenue
    +select user_id, count(*) as orders, sum(amount) as revenue
    +from bigquery.orders
    +group by user_id
    +order by user_id;
    +
    +-- Smallest and largest order
    +select min(amount), max(amount) from bigquery.orders;
    +
    +-- Number of distinct users who placed an order
    +select count(distinct user_id) from bigquery.orders;
    +
    +-- Average order value per status
    +select status, avg(amount) as avg_amount
    +from bigquery.orders
    +group by status;
    +```
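As a cross-check on the BigQuery example queries, the expected results for the five sample rows can be worked out by hand. The sketch below does the same arithmetic locally in Rust, which is roughly the work Postgres performs when a query is not pushed down. The `Order` struct and helper names are hypothetical, and `amount` is simplified to `f64` instead of `numeric`.

```rust
use std::collections::BTreeMap;

/// One row of the sample `orders` table (amount simplified to f64).
#[allow(dead_code)]
#[derive(Debug, Clone)]
struct Order {
    id: i64,
    user_id: i64,
    amount: f64,
    status: &'static str,
}

/// The five rows inserted in the BigQuery example.
fn sample_orders() -> Vec<Order> {
    vec![
        Order { id: 1, user_id: 1, amount: 100.0, status: "paid" },
        Order { id: 2, user_id: 1, amount: 50.0, status: "paid" },
        Order { id: 3, user_id: 2, amount: 200.0, status: "pending" },
        Order { id: 4, user_id: 2, amount: 75.0, status: "paid" },
        Order { id: 5, user_id: 3, amount: 300.0, status: "paid" },
    ]
}

/// Local equivalent of `select sum(amount) ... where status = 'paid'`.
fn paid_revenue(orders: &[Order]) -> f64 {
    orders
        .iter()
        .filter(|o| o.status == "paid")
        .map(|o| o.amount)
        .sum()
}

/// Local equivalent of the per-user `count(*)` / `sum(amount)` query.
/// BTreeMap keeps keys sorted, matching `order by user_id`.
fn per_user(orders: &[Order]) -> BTreeMap<i64, (i64, f64)> {
    let mut out = BTreeMap::new();
    for o in orders {
        let entry = out.entry(o.user_id).or_insert((0i64, 0.0f64));
        entry.0 += 1;
        entry.1 += o.amount;
    }
    out
}

fn main() {
    let orders = sample_orders();
    println!("count = {}", orders.len());
    println!("paid revenue = {}", paid_revenue(&orders));
    for (user_id, (count, revenue)) in per_user(&orders) {
        println!("user {user_id}: {count} orders, revenue {revenue}");
    }
}
```

With pushdown, only the handful of result rows would cross the wire instead of all five input rows; on a real table the gap is correspondingly larger.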

docs/catalog/clickhouse.md

Lines changed: 98 additions & 1 deletion

    @@ -153,14 +153,45 @@ create foreign table clickhouse.my_table (

     #### Notes

    -- Supports `where`, `order by` and `limit` clause pushdown
    +- Supports `where`, `order by`, `limit` and aggregate clause pushdown
     - Supports parametrized views in subqueries
     - When using `rowid_column`, it must be specified for data modification operations

     ## Query Pushdown Support

     This FDW supports `where`, `order by` and `limit` clause pushdown, as well as parametrized view (see above).

    +### Aggregate Pushdown
    +
    +The FDW pushes common aggregate queries down to ClickHouse so the aggregation
    +runs remotely and only the final result rows are transferred to Postgres. This
    +is much faster than fetching every row and aggregating locally, especially
    +over large tables.
    +
    +**Supported aggregates**: `count(*)`, `count(col)`, `count(distinct col)`,
    +`sum(col)`, `avg(col)`, `min(col)`, `max(col)`.
    +
    +**Supported shapes**: scalar aggregates, `group by` over plain columns, with
    +or without a `where` clause. Pushdown also works when the foreign `table`
    +option is a sub-query or a parametrized view.
    +
    +```sql
    +-- All of these run as a single aggregate query on ClickHouse:
    +select count(*) from clickhouse.my_table;
    +select id, sum(amount) from clickhouse.my_table group by id;
    +select count(distinct name) from clickhouse.my_table where id = 1;
    +```
    +
    +**Cases that are not pushed down**: the query still returns the correct
    +result, but the aggregation happens in Postgres after fetching the rows:
    +
    +- The query has a `having` clause
    +- The aggregate has a `filter (where …)` clause
    +- A `distinct` modifier is used on anything other than `count`
    +- The aggregate's argument is not a plain column (for example `sum(a + 1)`)
    +- A `group by` item is not a plain column (for example `group by id + 1`)
    +- The aggregate function is not in the list above (for example `stddev`, `string_agg`)
    +
     ## Supported Data Types

     | Postgres Type | ClickHouse Type |

    @@ -244,3 +275,69 @@ insert into clickhouse.people values (4, 'Yoda');
     update clickhouse.people set name = 'Princess Leia' where id = 2;
     delete from clickhouse.people where id = 3;
     ```
    +
    +### Aggregate Query Examples
    +
    +These examples assume the `clickhouse.people` foreign table from the previous
    +section, plus an `orders` table with a row per order:
    +
    +```sql
    +-- Run on ClickHouse
    +create table orders (
    +id Int64,
    +person_id Int64,
    +amount Float64,
    +status String
    +)
    +engine=MergeTree()
    +order by id;
    +
    +insert into orders values
    +(1, 1, 100.0, 'paid'),
    +(2, 1, 50.0, 'paid'),
    +(3, 2, 200.0, 'pending'),
    +(4, 2, 75.0, 'paid'),
    +(5, 3, 300.0, 'paid');
    +```
    +
    +```sql
    +-- Foreign table on Postgres
    +create foreign table clickhouse.orders (
    +id bigint,
    +person_id bigint,
    +amount double precision,
    +status text
    +)
    +server clickhouse_server
    +options (
    +table 'orders'
    +);
    +```
    +
    +Each query below runs a single aggregate query against ClickHouse and returns
    +just the result rows:
    +
    +```sql
    +-- Total order count
    +select count(*) from clickhouse.orders;
    +
    +-- Total revenue from paid orders
    +select sum(amount) from clickhouse.orders where status = 'paid';
    +
    +-- Per-person order count and revenue
    +select person_id, count(*) as orders, sum(amount) as revenue
    +from clickhouse.orders
    +group by person_id
    +order by person_id;
    +
    +-- Smallest and largest order
    +select min(amount), max(amount) from clickhouse.orders;
    +
    +-- Number of distinct customers who placed an order
    +select count(distinct person_id) from clickhouse.orders;
    +
    +-- Average order value per status
    +select status, avg(amount) as avg_amount
    +from clickhouse.orders
    +group by status;
    +```
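To make "runs a single aggregate query" concrete, here is a hedged sketch of how a remote statement could be assembled from the pieces the planner extracts. The select list puts the `group by` columns first and the aggregates after them, following the output shape described in the commit message. `build_remote_query` and its parameters are hypothetical; the real deparsing in the FDW may quote identifiers or order clauses differently.

```rust
/// Assembles one remote aggregate statement.
/// `aggregates` holds already-deparsed expressions such as "sum(amount)".
fn build_remote_query(
    table: &str,
    group_by: &[&str],
    aggregates: &[&str],
    filter: Option<&str>,
) -> String {
    // Output shape: group-by columns first, then aggregate result columns.
    let mut select_list: Vec<String> = group_by.iter().map(|c| c.to_string()).collect();
    select_list.extend(aggregates.iter().map(|a| a.to_string()));

    let mut sql = format!("select {} from {}", select_list.join(", "), table);
    if let Some(cond) = filter {
        sql.push_str(&format!(" where {cond}"));
    }
    if !group_by.is_empty() {
        sql.push_str(&format!(" group by {}", group_by.join(", ")));
    }
    sql
}

fn main() {
    // Per-person order count and revenue from the examples.
    let grouped = build_remote_query("orders", &["person_id"], &["count(*)", "sum(amount)"], None);
    println!("{grouped}");

    // Scalar aggregate with a pushed-down filter.
    let scalar = build_remote_query("orders", &[], &["sum(amount)"], Some("status = 'paid'"));
    println!("{scalar}");
}
```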

docs/catalog/mssql.md

Lines changed: 102 additions & 0 deletions

    @@ -145,12 +145,50 @@ create foreign table mssql.users (
     - `where` clauses
     - `order by` clauses
     - `limit` clauses
    +- aggregate clauses
     - See Data Types section for type mappings between PostgreSQL and SQL Server

     ## Query Pushdown Support

     This FDW supports `where`, `order by` and `limit` clause pushdown.

    +### Aggregate Pushdown
    +
    +The FDW pushes common aggregate queries down to SQL Server so the aggregation
    +runs remotely and only the final result rows are transferred to Postgres. This
    +is much faster than fetching every row and aggregating locally, especially
    +over large tables.
    +
    +**Supported aggregates**: `count(*)`, `count(col)`, `count(distinct col)`,
    +`sum(col)`, `avg(col)`, `min(col)`, `max(col)`.
    +
    +**Supported shapes**: scalar aggregates, `group by` over plain columns, with
    +or without a `where` clause. Pushdown also works when the foreign `table`
    +option is a sub-query.
    +
    +```sql
    +-- All of these run as a single aggregate query on SQL Server:
    +select count(*) from mssql.users;
    +select id, sum(amount) from mssql.users group by id;
    +select count(distinct name) from mssql.users where id = 42;
    +```
    +
    +`count(*)` and `count(col)` are translated to SQL Server's `count_big` so the
    +result fits Postgres' `bigint` without overflow. Each aggregate is also wrapped
    +in a `cast(... as <sql_server_type>)` so values come back in the exact type
    +Postgres expects (for example, `sum` over a `bigint` column is cast to
    +`numeric`, matching Postgres' `sum(bigint) → numeric` rule).
    +
    +**Cases that are not pushed down**: the query still returns the correct
    +result, but the aggregation happens in Postgres after fetching the rows:
    +
    +- The query has a `having` clause
    +- The aggregate has a `filter (where …)` clause
    +- A `distinct` modifier is used on anything other than `count`
    +- The aggregate's argument is not a plain column (for example `sum(a + 1)`)
    +- A `group by` item is not a plain column (for example `group by id + 1`)
    +- The aggregate function is not in the list above (for example `stddev`, `string_agg`)
    +
     ## Supported Data Types

     | Postgres Type | SQL Server Type |

    @@ -231,3 +269,67 @@ create foreign table mssql.users_subquery (

     select * from mssql.users_subquery;
     ```
    +
    +### Aggregate Query Examples
    +
    +These examples assume an `orders` table on SQL Server and a matching foreign
    +table on Postgres:
    +
    +```sql
    +-- Run on SQL Server
    +create table orders (
    +id bigint,
    +user_id bigint,
    +amount numeric(18,2),
    +status varchar(20)
    +);
    +
    +insert into orders (id, user_id, amount, status) values
    +(1, 42, 100.00, 'paid'),
    +(2, 42, 50.00, 'paid'),
    +(3, 43, 200.00, 'pending'),
    +(4, 43, 75.00, 'paid'),
    +(5, 44, 300.00, 'paid');
    +```
    +
    +```sql
    +-- Foreign table on Postgres
    +create foreign table mssql.orders (
    +id bigint,
    +user_id bigint,
    +amount numeric(18,2),
    +status text
    +)
    +server mssql_server
    +options (
    +table 'orders'
    +);
    +```
    +
    +Each query below runs a single aggregate query against SQL Server and returns
    +just the result rows:
    +
    +```sql
    +-- Total order count
    +select count(*) from mssql.orders;
    +
    +-- Total revenue from paid orders
    +select sum(amount) from mssql.orders where status = 'paid';
    +
    +-- Per-user order count and revenue
    +select user_id, count(*) as orders, sum(amount) as revenue
    +from mssql.orders
    +group by user_id
    +order by user_id;
    +
    +-- Smallest and largest order
    +select min(amount), max(amount) from mssql.orders;
    +
    +-- Number of distinct users who placed an order
    +select count(distinct user_id) from mssql.orders;
    +
    +-- Average order value per status
    +select status, avg(amount) as avg_amount
    +from mssql.orders
    +group by status;
    +```
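The SQL Server docs above describe two rewrites: `count` becomes `count_big`, and every aggregate is wrapped in a `cast`. A minimal sketch of that translation follows. The `Agg` enum, the function name and the caller-supplied `target_type` string are assumptions for illustration; how the FDW actually picks the SQL Server type (including any precision) is not shown in this diff.

```rust
/// Aggregate functions covered by the pushdown docs.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy)]
enum Agg {
    Count,
    Sum,
    Avg,
    Min,
    Max,
}

/// Renders one aggregate for SQL Server.
/// Assumes the caller has already rejected non-count aggregates that lack
/// a plain column, so `None` only ever reaches the `Count` arm as `*`.
fn deparse_for_sql_server(agg: Agg, column: Option<&str>, target_type: &str) -> String {
    let arg = column.unwrap_or("*");
    let call = match agg {
        // count_big returns bigint on SQL Server, matching Postgres' count().
        Agg::Count => format!("count_big({arg})"),
        Agg::Sum => format!("sum({arg})"),
        Agg::Avg => format!("avg({arg})"),
        Agg::Min => format!("min({arg})"),
        Agg::Max => format!("max({arg})"),
    };
    // The cast pins the result to the type Postgres expects for this aggregate.
    format!("cast({call} as {target_type})")
}

fn main() {
    println!("{}", deparse_for_sql_server(Agg::Count, None, "bigint"));
    println!("{}", deparse_for_sql_server(Agg::Sum, Some("user_id"), "numeric"));
}
```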

docs/contributing/native.md

Lines changed: 6 additions & 0 deletions

    @@ -25,6 +25,12 @@ pub trait ForeignDataWrapper {
     fn delete(...);
     fn end_modify(...);

    +// functions for aggregate pushdown (optional)
    +fn supported_aggregates(...) -> Vec<AggregateKind>;
    +fn supports_group_by(...) -> bool;
    +fn get_aggregate_rel_size(...) -> (i64, i32);
    +fn begin_aggregate_scan(...);
    +
     // other optional functions
     ...
     }
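The four optional methods are easiest to read as opt-in hooks with conservative defaults. The sketch below uses a stand-alone, hypothetical `AggregatePushdown` trait rather than the real `ForeignDataWrapper` trait, because the diff elides the actual parameter lists. The argument types, the `(0, 0)` size default and the `Result` return are assumptions. The failing default for `begin_aggregate_scan` follows the fail-fast behaviour described in the commit message.

```rust
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AggregateKind {
    Count,
    Sum,
    Avg,
    Min,
    Max,
}

/// Hypothetical stand-in for the optional aggregate hooks.
trait AggregatePushdown {
    /// Default: advertise nothing, so no aggregate is pushed down.
    fn supported_aggregates(&self) -> Vec<AggregateKind> {
        Vec::new()
    }

    /// Default: GROUP BY stays local.
    fn supports_group_by(&self) -> bool {
        false
    }

    /// Assumed to return (estimated rows, row width).
    fn get_aggregate_rel_size(&self, _aggs: &[AggregateKind], _group_by: &[String]) -> (i64, i32) {
        (0, 0)
    }

    /// Default fails fast: an FDW that advertises aggregates but forgets
    /// to override this should error out rather than return wrong results.
    fn begin_aggregate_scan(&mut self, _aggs: &[AggregateKind], _group_by: &[String]) -> Result<(), String> {
        Err("begin_aggregate_scan is not implemented".to_string())
    }
}

/// An FDW that opts out entirely by keeping every default.
struct NoPushdownFdw;
impl AggregatePushdown for NoPushdownFdw {}

/// An FDW that only pushes down COUNT.
struct CountOnlyFdw {
    started: bool,
}

impl AggregatePushdown for CountOnlyFdw {
    fn supported_aggregates(&self) -> Vec<AggregateKind> {
        vec![AggregateKind::Count]
    }

    fn begin_aggregate_scan(&mut self, aggs: &[AggregateKind], _group_by: &[String]) -> Result<(), String> {
        let supported = self.supported_aggregates();
        if aggs.iter().all(|a| supported.contains(a)) {
            // A real FDW would build and send the remote query here.
            self.started = true;
            Ok(())
        } else {
            Err("unsupported aggregate requested".to_string())
        }
    }
}

fn main() {
    let mut plain = NoPushdownFdw;
    println!("{:?}", plain.begin_aggregate_scan(&[AggregateKind::Count], &[]));

    let mut fdw = CountOnlyFdw { started: false };
    println!("{:?}", fdw.begin_aggregate_scan(&[AggregateKind::Count], &[]));
    println!("started = {}", fdw.started);
}
```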
