-
Notifications
You must be signed in to change notification settings - Fork 99
Expand file tree
/
Copy pathsnapshot.patch
More file actions
114 lines (108 loc) · 4.52 KB
/
snapshot.patch
File metadata and controls
114 lines (108 loc) · 4.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
diff --git a/src/include/postgres_scanner.hpp b/src/include/postgres_scanner.hpp
index 3e0162c..7c8fa19 100644
--- a/src/include/postgres_scanner.hpp
+++ b/src/include/postgres_scanner.hpp
@@ -50,6 +50,8 @@ public:
bool use_text_protocol = false;
idx_t max_threads = 1;
+ string snapshot;
+
public:
void SetTablePages(idx_t approx_num_pages);
diff --git a/src/postgres_query.cpp b/src/postgres_query.cpp
index f999617..771427c 100644
--- a/src/postgres_query.cpp
+++ b/src/postgres_query.cpp
@@ -44,6 +44,9 @@ static unique_ptr<FunctionData> PGQueryBind(ClientContext &context, TableFunctio
if (kv.first == "use_transaction") {
use_transaction = BooleanValue::Get(kv.second);
}
+ if (kv.first == "snapshot") {
+ result->snapshot = kv.second.GetValue<string>();
+ }
}
vector<Value> param_values;
@@ -123,6 +126,7 @@ PostgresQueryFunction::PostgresQueryFunction()
: TableFunction("postgres_query", {LogicalType::VARCHAR, LogicalType::VARCHAR}, nullptr, PGQueryBind) {
named_parameters["use_transaction"] = LogicalType::BOOLEAN;
named_parameters["params"] = LogicalType::ANY;
+ named_parameters["snapshot"] = LogicalType::VARCHAR;
PostgresScanFunction scan_function;
init_global = scan_function.init_global;
init_local = scan_function.init_local;
diff --git a/src/postgres_scanner.cpp b/src/postgres_scanner.cpp
index eb4360a..3de7dbe 100644
--- a/src/postgres_scanner.cpp
+++ b/src/postgres_scanner.cpp
@@ -67,9 +67,10 @@ private:
static void PostgresGetSnapshot(ClientContext &context, PostgresVersion version, const PostgresBindData &bind_data,
PostgresGlobalState &gstate) {
unique_ptr<PostgresResult> result;
- // by default disable snapshotting
- gstate.snapshot = string();
- if (gstate.max_threads <= 1) {
+ gstate.snapshot = bind_data.snapshot;
+
+ // nothing to do if there is a single thread or a manually specified snapshot
+ if (gstate.max_threads <= 1 || !gstate.snapshot.empty()) {
return;
}
if (version.type_v == PostgresInstanceType::AURORA) {
@@ -93,7 +94,6 @@ static void PostgresGetSnapshot(ClientContext &context, PostgresVersion version,
context, "SELECT pg_is_in_recovery(), pg_export_snapshot(), (select count(*) from pg_stat_wal_receiver)");
if (result) {
auto in_recovery = result->GetBool(0, 0) || result->GetInt64(0, 2) > 0;
- gstate.snapshot = "";
if (!in_recovery) {
gstate.snapshot = result->GetString(0, 1);
}
@@ -182,6 +182,15 @@ static unique_ptr<FunctionData> PostgresBind(ClientContext &context, TableFuncti
bind_data->table_name = input.inputs[2].GetValue<string>();
bind_data->attach_path = bind_data->dsn;
+ for (auto &kv : input.named_parameters) {
+ if (kv.second.IsNull()) {
+ throw BinderException("Cannot use NULL as function argument");
+ }
+ if (kv.first == "snapshot") {
+ bind_data->snapshot = kv.second.GetValue<string>();
+ }
+ }
+
auto con = PostgresConnection::Open(bind_data->dsn, bind_data->attach_path);
auto version = con.GetPostgresVersion(context);
// query the table schema so we can interpret the bits in the pages
@@ -315,6 +324,9 @@ static unique_ptr<GlobalTableFunctionState> PostgresInitGlobalState(ClientContex
auto &bind_data = input.bind_data->Cast<PostgresBindData>();
auto result = make_uniq<PostgresGlobalState>(PostgresMaxThreads(context, input.bind_data.get()));
auto pg_catalog = bind_data.GetCatalog();
+
+ result->snapshot = bind_data.snapshot;
+
if (pg_catalog) {
auto &transaction = Transaction::Get(context, *pg_catalog).Cast<PostgresTransaction>();
auto &con =
@@ -323,7 +335,7 @@ static unique_ptr<GlobalTableFunctionState> PostgresInitGlobalState(ClientContex
} else {
auto con = PostgresConnection::Open(bind_data.dsn, bind_data.attach_path);
if (bind_data.use_transaction) {
- PostgresScanConnect(context, con, string());
+ PostgresScanConnect(context, con, result->snapshot);
}
result->SetConnection(std::move(con));
}
@@ -559,6 +571,8 @@ PostgresScanFunction::PostgresScanFunction()
get_bind_info = PostgresGetBindInfo;
projection_pushdown = true;
global_initialization = TableFunctionInitialization::INITIALIZE_ON_SCHEDULE;
+
+ named_parameters["snapshot"] = LogicalType::VARCHAR;
}
PostgresScanFunctionFilterPushdown::PostgresScanFunctionFilterPushdown()
@@ -574,6 +588,8 @@ PostgresScanFunctionFilterPushdown::PostgresScanFunctionFilterPushdown()
projection_pushdown = true;
filter_pushdown = true;
global_initialization = TableFunctionInitialization::INITIALIZE_ON_SCHEDULE;
+
+ named_parameters["snapshot"] = LogicalType::VARCHAR;
}
} // namespace duckdb