Skip to content

Commit e56cc79

Browse files
committed
Use order by pushdowns from dbconnector
1 parent 6738a19 commit e56cc79

5 files changed

Lines changed: 230 additions & 79 deletions

File tree

src/include/postgres_scanner.hpp

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@
99
#pragma once
1010

1111
#include "duckdb.hpp"
12+
13+
#include "dbconnector/bind_data.hpp"
14+
1215
#include "postgres_utils.hpp"
1316
#include "postgres_connection.hpp"
1417
#include "postgres_parameters.hpp"
@@ -20,7 +23,7 @@ struct PostgresLocalState;
2023
struct PostgresGlobalState;
2124
class PostgresTransaction;
2225

23-
struct PostgresBindData : public FunctionData {
26+
struct PostgresBindData : public dbconnector::BindData {
2427
static constexpr const idx_t DEFAULT_PAGES_PER_TASK = 1000;
2528

2629
public:
@@ -31,7 +34,6 @@ struct PostgresBindData : public FunctionData {
3134
string table_name;
3235
string sql;
3336
PostgresParameters params;
34-
string limit;
3537
idx_t pages_approx = 0;
3638

3739
vector<PostgresType> postgres_types;
@@ -50,6 +52,9 @@ struct PostgresBindData : public FunctionData {
5052
bool use_text_protocol = false;
5153
idx_t max_threads = 1;
5254

55+
dbconnector::optimizer::OrderByAndLimitBindData order_by_and_limit_bind_data;
56+
dbconnector::optimizer::AggregateBindData aggregate_bind_data;
57+
5358
public:
5459
void SetTablePages(idx_t approx_num_pages);
5560

@@ -69,6 +74,14 @@ struct PostgresBindData : public FunctionData {
6974
return false;
7075
}
7176

77+
// Accessor override for the dbconnector::BindData base: hands out a mutable
// reference to the order_by_and_limit_bind_data member stored on this bind data.
dbconnector::optimizer::OrderByAndLimitBindData &GetOrderByAndLimitBindData() override {
78+
return order_by_and_limit_bind_data;
79+
}
80+
81+
// Accessor override for the dbconnector::BindData base: hands out a mutable
// reference to the aggregate_bind_data member stored on this bind data.
dbconnector::optimizer::AggregateBindData &GetAggregateBindData() override {
82+
return aggregate_bind_data;
83+
}
84+
7285
private:
7386
optional_ptr<PostgresCatalog> pg_catalog;
7487
optional_ptr<PostgresTableEntry> pg_table;

src/postgres_extension.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,8 @@ static void LoadInternal(ExtensionLoader &loader) {
195195
LogicalType::BOOLEAN, Value::BOOLEAN(true), DisablePool);
196196
config.AddExtensionOption("pg_experimental_filter_pushdown", "Whether or not to use filter pushdown",
197197
LogicalType::BOOLEAN, Value::BOOLEAN(true));
198+
config.AddExtensionOption("pg_order_pushdown", "Push ORDER BY and LIMIT clauses to Postgres (default: true)",
199+
LogicalType::BOOLEAN, Value::BOOLEAN(true));
198200
config.AddExtensionOption("pg_null_byte_replacement",
199201
"When writing NULL bytes to Postgres, replace them with the given character",
200202
LogicalType::VARCHAR, Value(), SetPostgresNullByteReplacement);

src/postgres_scanner.cpp

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -286,13 +286,20 @@ static void PostgresInitInternal(ClientContext &context, const PostgresBindData
286286
string query;
287287
if (bind_data->table_name.empty()) {
288288
D_ASSERT(!bind_data->sql.empty());
289-
query = StringUtil::Format(R"(SELECT %s FROM (%s) AS __unnamed_subquery %s%s)", col_names, bind_data->sql,
290-
filter, bind_data->limit);
289+
query =
290+
StringUtil::Format(R"(SELECT %s FROM (%s) AS __unnamed_subquery %s)", col_names, bind_data->sql, filter);
291291

292292
} else {
293-
query = StringUtil::Format(R"(SELECT %s FROM %s.%s %s%s)", col_names,
293+
query = StringUtil::Format(R"(SELECT %s FROM %s.%s %s)", col_names,
294294
PostgresUtils::WriteIdentifier(bind_data->schema_name),
295-
PostgresUtils::WriteIdentifier(bind_data->table_name), filter, bind_data->limit);
295+
PostgresUtils::WriteIdentifier(bind_data->table_name), filter);
296+
}
297+
if (!bind_data->order_by_and_limit_bind_data.order_by_clause.empty()) {
298+
query += bind_data->order_by_and_limit_bind_data.order_by_clause;
299+
query += " NULLS LAST";
300+
}
301+
if (!bind_data->order_by_and_limit_bind_data.limit_clause.empty()) {
302+
query += bind_data->order_by_and_limit_bind_data.limit_clause;
296303
}
297304
if (!bind_data->use_text_protocol) {
298305
query = StringUtil::Format(R"(COPY (%s) TO STDOUT (FORMAT "binary");)", query);

src/storage/postgres_optimizer.cpp

Lines changed: 37 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -1,86 +1,24 @@
1-
#include "storage/postgres_index_set.hpp"
2-
#include "storage/postgres_schema_entry.hpp"
3-
#include "storage/postgres_transaction.hpp"
41
#include "storage/postgres_optimizer.hpp"
2+
53
#include "duckdb/planner/operator/logical_get.hpp"
64
#include "duckdb/planner/operator/logical_limit.hpp"
7-
#include "storage/postgres_catalog.hpp"
5+
6+
#include "dbconnector/optimizer/order_by_and_limit_optimizer.hpp"
7+
#include "dbconnector/optimizer/optimizer_util.hpp"
8+
89
#include "postgres_scanner.hpp"
10+
#include "storage/postgres_index_set.hpp"
11+
#include "storage/postgres_schema_entry.hpp"
12+
#include "storage/postgres_transaction.hpp"
13+
#include "storage/postgres_catalog.hpp"
914

1015
namespace duckdb {
1116

1217
struct PostgresOperators {
1318
reference_map_t<PostgresCatalog, vector<reference<LogicalGet>>> scans;
1419
};
1520

16-
static void OptimizePostgresScanLimitPushdown(unique_ptr<LogicalOperator> &op) {
17-
if (op->type == LogicalOperatorType::LOGICAL_LIMIT) {
18-
auto &limit = op->Cast<LogicalLimit>();
19-
reference<LogicalOperator> child = *op->children[0];
20-
21-
while (child.get().type == LogicalOperatorType::LOGICAL_PROJECTION) {
22-
child = *child.get().children[0];
23-
}
24-
25-
if (child.get().type != LogicalOperatorType::LOGICAL_GET) {
26-
OptimizePostgresScanLimitPushdown(op->children[0]);
27-
return;
28-
}
29-
30-
auto &get = child.get().Cast<LogicalGet>();
31-
if (!PostgresCatalog::IsPostgresScan(get.function.name)) {
32-
OptimizePostgresScanLimitPushdown(op->children[0]);
33-
return;
34-
}
35-
36-
switch (limit.limit_val.Type()) {
37-
case LimitNodeType::CONSTANT_VALUE:
38-
case LimitNodeType::UNSET:
39-
break;
40-
default:
41-
// not a constant or unset limit
42-
OptimizePostgresScanLimitPushdown(op->children[0]);
43-
return;
44-
}
45-
switch (limit.offset_val.Type()) {
46-
case LimitNodeType::CONSTANT_VALUE:
47-
case LimitNodeType::UNSET:
48-
break;
49-
default:
50-
// not a constant or unset offset
51-
OptimizePostgresScanLimitPushdown(op->children[0]);
52-
return;
53-
}
54-
55-
auto &bind_data = get.bind_data->Cast<PostgresBindData>();
56-
57-
string generated_limit_clause = "";
58-
if (limit.limit_val.Type() != LimitNodeType::UNSET) {
59-
generated_limit_clause += " LIMIT " + to_string(limit.limit_val.GetConstantValue());
60-
}
61-
if (limit.offset_val.Type() != LimitNodeType::UNSET) {
62-
generated_limit_clause += " OFFSET " + to_string(limit.offset_val.GetConstantValue());
63-
}
64-
65-
if (!generated_limit_clause.empty()) {
66-
bind_data.limit = generated_limit_clause;
67-
// When LIMIT is pushed down to Postgres, we must ensure single-task execution
68-
// to avoid each task (whether parallel or sequential) applying the LIMIT independently.
69-
// Setting pages_approx = 0 disables CTID-based task splitting, ensuring a single query.
70-
bind_data.pages_approx = 0;
71-
bind_data.max_threads = 1;
72-
73-
op = std::move(op->children[0]);
74-
return;
75-
}
76-
}
77-
78-
for (auto &child : op->children) {
79-
OptimizePostgresScanLimitPushdown(child);
80-
}
81-
}
82-
83-
void GatherPostgresScans(LogicalOperator &op, PostgresOperators &result) {
21+
static void GatherPostgresScans(LogicalOperator &op, PostgresOperators &result) {
8422
if (op.type == LogicalOperatorType::LOGICAL_GET) {
8523
auto &get = op.Cast<LogicalGet>();
8624
auto &table_scan = get.function;
@@ -102,9 +40,35 @@ void GatherPostgresScans(LogicalOperator &op, PostgresOperators &result) {
10240
}
10341
}
10442

43+
// Walks the logical plan rooted at `op` and, for every operator that
// FindExtensionGet reports as a "postgres_scan" get whose bind data carries a
// non-empty limit_clause, resets pages_approx to 0 and max_threads to 1.
// The function then recurses into all children of `op`, regardless of whether
// `op` itself matched.
static void DisableParallelLimit(LogicalOperator &op) {
44+
LogicalGet *get = nullptr;
45+
dbconnector::BindData *bind_data = nullptr;
46+
// NOTE(review): presumably FindExtensionGet returns true and fills `get` / `bind_data`
// only when `op` resolves to a postgres_scan get — confirm against dbconnector.
// `get` is not used further in this function.
if (dbconnector::optimizer::OptimizerUtil::FindExtensionGet("postgres_scan", op, get, bind_data)) {
47+
auto &pg_bind_data = bind_data->Cast<PostgresBindData>();
48+
if (!pg_bind_data.order_by_and_limit_bind_data.limit_clause.empty()) {
49+
// When LIMIT is pushed down to Postgres, we must ensure single-task execution
50+
// to avoid each task (whether parallel or sequential) applying the LIMIT independently.
51+
// Setting pages_approx = 0 disables CTID-based task splitting, ensuring a single query.
52+
pg_bind_data.pages_approx = 0;
53+
pg_bind_data.max_threads = 1;
54+
}
55+
}
56+
57+
// Visit every child so that scans located anywhere below `op` are processed too.
for (auto &child : op.children) {
58+
DisableParallelLimit(*child);
59+
}
60+
}
61+
10562
void PostgresOptimizer::Optimize(OptimizerExtensionInput &input, unique_ptr<LogicalOperator> &plan) {
63+
using namespace dbconnector;
10664
// look at the query plan and check if we can find ORDER BY and LIMIT/OFFSET clauses to push down
107-
OptimizePostgresScanLimitPushdown(plan);
65+
// (the former OptimizePostgresScanLimitPushdown(plan) call is replaced by the dbconnector optimizer call below)
66+
67+
auto order_config = optimizer::OrderByAndLimitOptimizer::CreateConfig(
68+
input.context, "pg_order_pushdown", '"', query::QuoteEscapeStyle::DOUBLE_QUOTE, "postgres_scan");
69+
optimizer::OrderByAndLimitOptimizer::Optimize(order_config, input, plan);
70+
DisableParallelLimit(*plan);
71+
10872
// look at the query plan and check if we can enable streaming query scans
10973
PostgresOperators operators;
11074
GatherPostgresScans(*plan, operators);
Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
# name: test/sql/storage/attach_order_pushdown.test
2+
# description: Test ORDER BY pushdown over an attached Postgres table
3+
# group: [storage]
4+
5+
require postgres_scanner
6+
7+
require-env POSTGRES_TEST_DATABASE_AVAILABLE
8+
9+
statement ok
10+
ATTACH 'dbname=postgresscanner' AS s1 (TYPE POSTGRES);
11+
12+
statement ok
13+
DROP TABLE IF EXISTS s1.order_test;
14+
15+
statement ok
16+
CALL postgres_execute('s1', 'CREATE TABLE order_test (id INT NOT NULL PRIMARY KEY, val INT, name TEXT)');
17+
18+
statement ok
19+
CALL pg_clear_cache();
20+
21+
statement ok
22+
INSERT INTO s1.order_test VALUES
23+
(1, 10, 'alice'),
24+
(2, 20, 'bob'),
25+
(3, NULL, 'carol'),
26+
(4, 30, 'dave'),
27+
(5, 15, 'eve'),
28+
(6, NULL, 'frank'),
29+
(7, 25, 'grace'),
30+
(8, 5, 'heidi'),
31+
(9, 35, 'ivan'),
32+
(10, 20, 'judy');
33+
34+
# ASC with NULLS LAST (DuckDB default) + LIMIT
35+
query III
36+
SELECT * FROM s1.order_test ORDER BY val, id LIMIT 5;
37+
----
38+
8 5 heidi
39+
1 10 alice
40+
5 15 eve
41+
2 20 bob
42+
10 20 judy
43+
44+
# DESC + LIMIT
45+
query III
46+
SELECT * FROM s1.order_test ORDER BY val DESC LIMIT 3;
47+
----
48+
9 35 ivan
49+
4 30 dave
50+
7 25 grace
51+
52+
# TopN with OFFSET
53+
query III
54+
SELECT * FROM s1.order_test ORDER BY val, id LIMIT 3 OFFSET 2;
55+
----
56+
5 15 eve
57+
2 20 bob
58+
10 20 judy
59+
60+
# Standalone ORDER BY
61+
62+
query III
63+
SELECT * FROM s1.order_test ORDER BY val, id;
64+
----
65+
8 5 heidi
66+
1 10 alice
67+
5 15 eve
68+
2 20 bob
69+
10 20 judy
70+
7 25 grace
71+
4 30 dave
72+
9 35 ivan
73+
3 NULL carol
74+
6 NULL frank
75+
76+
# Composition with WHERE filter
77+
78+
query III
79+
SELECT * FROM s1.order_test WHERE val > 15 ORDER BY val DESC LIMIT 3;
80+
----
81+
9 35 ivan
82+
4 30 dave
83+
7 25 grace
84+
85+
# Multi-column ORDER BY with filter
86+
query III
87+
SELECT * FROM s1.order_test WHERE val IS NOT NULL ORDER BY val DESC, name ASC LIMIT 4;
88+
----
89+
9 35 ivan
90+
4 30 dave
91+
7 25 grace
92+
2 20 bob
93+
94+
# Standalone ORDER BY with filter
95+
query III
96+
SELECT * FROM s1.order_test WHERE val IS NOT NULL ORDER BY val, id;
97+
----
98+
8 5 heidi
99+
1 10 alice
100+
5 15 eve
101+
2 20 bob
102+
10 20 judy
103+
7 25 grace
104+
4 30 dave
105+
9 35 ivan
106+
107+
# Projection pushdown: ORDER BY on column not in SELECT
108+
109+
# Sort by column not in output
110+
query I
111+
SELECT name FROM s1.order_test ORDER BY val, id;
112+
----
113+
heidi
114+
alice
115+
eve
116+
bob
117+
judy
118+
grace
119+
dave
120+
ivan
121+
carol
122+
frank
123+
124+
# Sort by column not in output with LIMIT
125+
query I
126+
SELECT name FROM s1.order_test ORDER BY val DESC LIMIT 3;
127+
----
128+
ivan
129+
dave
130+
grace
131+
132+
# Sort by one column, output different column
133+
query I
134+
SELECT id FROM s1.order_test ORDER BY name;
135+
----
136+
1
137+
2
138+
3
139+
4
140+
5
141+
6
142+
7
143+
8
144+
9
145+
10
146+
147+
# WHERE + ORDER BY on non-output column + LIMIT
148+
query I
149+
SELECT name FROM s1.order_test WHERE val IS NOT NULL ORDER BY val LIMIT 5;
150+
----
151+
heidi
152+
alice
153+
eve
154+
bob
155+
judy
156+
157+
# Output multiple columns, sort by non-output column
158+
query II
159+
SELECT id, name FROM s1.order_test ORDER BY val, id LIMIT 5;
160+
----
161+
8 heidi
162+
1 alice
163+
5 eve
164+
2 bob
165+
10 judy

0 commit comments

Comments
 (0)