Skip to content

Commit 2fccd22

Browse files
committed
Don't sync MotherDuck metadata forever
Keeping a connection open to MotherDuck forever is undesirable from a resourcing perspective on the MotherDuck side. This changes that to behave similarly like the DuckDB CLI, after 5 minutes (by default) syncing will stop. The main difficulty is to restart the syncing. In the DuckDB CLI the syncing will start again after some activity is detected, but the background worker never triggers activity. So instead we need to make sure that activity in other connections triggers a restart of the syncing in the background worker. This is done using some very simple IPC in shared memory.
1 parent a9faa3c commit 2fccd22

File tree

6 files changed

+177
-41
lines changed

6 files changed

+177
-41
lines changed

include/pgduckdb/pgduckdb_background_worker.hpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
#pragma once
22

3-
void DuckdbInitBackgroundWorker(void);
4-
53
namespace pgduckdb {
64

7-
void SyncMotherDuckCatalogsWithPg(bool drop_with_cascade);
5+
void InitBackgroundWorker(void);
6+
void TriggerActivity(void);
7+
8+
extern bool is_background_worker;
89
extern bool doing_motherduck_sync;
910
extern char *current_duckdb_database_name;
1011
extern char *current_motherduck_catalog_version;

include/pgduckdb/pgduckdb_guc.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,4 @@ extern int duckdb_motherduck_enabled;
1515
extern char *duckdb_motherduck_token;
1616
extern char *duckdb_postgres_role;
1717
extern char *duckdb_motherduck_default_database;
18+
extern char *duckdb_motherduck_background_catalog_refresh_inactivity_timeout;

src/pgduckdb.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ int duckdb_motherduck_enabled = MotherDuckEnabled::MOTHERDUCK_AUTO;
2020
char *duckdb_motherduck_token = strdup("");
2121
char *duckdb_motherduck_postgres_database = strdup("postgres");
2222
char *duckdb_motherduck_default_database = strdup("");
23+
char *duckdb_motherduck_background_catalog_refresh_inactivity_timeout = strdup("3 minutes");
2324
char *duckdb_postgres_role = strdup("");
2425

2526
int duckdb_maximum_threads = -1;
@@ -44,7 +45,7 @@ _PG_init(void) {
4445
DuckdbInitGUC();
4546
DuckdbInitHooks();
4647
DuckdbInitNode();
47-
DuckdbInitBackgroundWorker();
48+
pgduckdb::InitBackgroundWorker();
4849
pgduckdb::RegisterDuckdbXactCallback();
4950
}
5051
} // extern "C"
@@ -186,4 +187,9 @@ DuckdbInitGUC(void) {
186187
DefineCustomVariable("duckdb.motherduck_default_database",
187188
"Which database in MotherDuck to designate as default (in place of my_db)",
188189
&duckdb_motherduck_default_database, PGC_POSTMASTER, GUC_SUPERUSER_ONLY);
190+
191+
DefineCustomVariable("duckdb.motherduck_background_catalog_refresh_inactivity_timeout",
192+
"When to stop syncing of the motherduck catalog when no activity has taken place",
193+
&duckdb_motherduck_background_catalog_refresh_inactivity_timeout, PGC_POSTMASTER,
194+
GUC_SUPERUSER_ONLY);
189195
}

src/pgduckdb_background_worker.cpp

Lines changed: 130 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,28 @@ extern "C" {
5353
#include "pgduckdb/pgduckdb_background_worker.hpp"
5454
#include "pgduckdb/pgduckdb_metadata_cache.hpp"
5555

56-
static bool is_background_worker = false;
5756
static std::unordered_map<std::string, std::string> last_known_motherduck_catalog_versions;
5857
static uint64 initial_cache_version = 0;
5958

59+
namespace pgduckdb {
60+
61+
bool is_background_worker = false;
62+
63+
static void SyncMotherDuckCatalogsWithPg(bool drop_with_cascade, duckdb::ClientContext &context);
64+
static void SyncMotherDuckCatalogsWithPg_Cpp(bool drop_with_cascade, duckdb::ClientContext *context);
65+
66+
typedef struct BackgoundWorkerShmemStruct {
67+
Latch *bgw_latch; /* The latch of the background worker */
68+
69+
slock_t lock; /* protects all the fields below */
70+
71+
int64 activity_count; /* the number of times activity was triggered by other backends */
72+
} BackgoundWorkerShmemStruct;
73+
74+
static BackgoundWorkerShmemStruct *BgwShmemStruct;
75+
76+
} // namespace pgduckdb
77+
6078
extern "C" {
6179
PGDLLEXPORT void
6280
pgduckdb_background_worker_main(Datum /* main_arg */) {
@@ -66,9 +84,15 @@ pgduckdb_background_worker_main(Datum /* main_arg */) {
6684
BackgroundWorkerUnblockSignals();
6785

6886
BackgroundWorkerInitializeConnection(duckdb_motherduck_postgres_database, NULL, 0);
87+
SpinLockAcquire(&pgduckdb::BgwShmemStruct->lock);
88+
pgduckdb::BgwShmemStruct->bgw_latch = MyLatch;
89+
int64 last_activity_count = pgduckdb::BgwShmemStruct->activity_count;
90+
SpinLockRelease(&pgduckdb::BgwShmemStruct->lock);
6991

7092
pgduckdb::doing_motherduck_sync = true;
71-
is_background_worker = true;
93+
pgduckdb::is_background_worker = true;
94+
95+
duckdb::unique_ptr<duckdb::Connection> connection;
7296

7397
while (true) {
7498
// Initialize SPI (Server Programming Interface) and connect to the database
@@ -78,12 +102,23 @@ pgduckdb_background_worker_main(Datum /* main_arg */) {
78102
PushActiveSnapshot(GetTransactionSnapshot());
79103

80104
if (pgduckdb::IsExtensionRegistered()) {
105+
if (!connection) {
106+
connection = pgduckdb::DuckDBManager::Get().CreateConnection();
107+
}
108+
SpinLockAcquire(&pgduckdb::BgwShmemStruct->lock);
109+
int64 new_activity_count = pgduckdb::BgwShmemStruct->activity_count;
110+
SpinLockRelease(&pgduckdb::BgwShmemStruct->lock);
111+
if (last_activity_count != new_activity_count) {
112+
last_activity_count = new_activity_count;
113+
/* Trigger some activity to restart the syncing */
114+
pgduckdb::DuckDBQueryOrThrow(*connection, "FROM duckdb_tables() limit 0");
115+
}
81116
/*
82117
* If the extension is not registerid this loop is a no-op, which
83118
* means we essentially keep polling until the extension is
84119
* installed
85120
*/
86-
pgduckdb::SyncMotherDuckCatalogsWithPg(false);
121+
pgduckdb::SyncMotherDuckCatalogsWithPg(false, *connection->context);
87122
}
88123

89124
// Commit the transaction
@@ -108,24 +143,79 @@ force_motherduck_sync(PG_FUNCTION_ARGS) {
108143
Datum drop_with_cascade = PG_GETARG_BOOL(0);
109144
/* clear the cache of known catalog versions to force a full sync */
110145
last_known_motherduck_catalog_versions.clear();
146+
147+
/*
148+
* We don't use GetConnection, because we want to be able to precisely
149+
* control the transaction lifecycle. We commit Postgres connections
150+
* throughout this function, and the GetConnect its cached connection its
151+
* lifecycle would be linked to those postgres transactions, which we
152+
* don't want.
153+
*/
154+
auto connection = pgduckdb::DuckDBManager::Get().CreateConnection();
111155
SPI_connect_ext(SPI_OPT_NONATOMIC);
112156
PG_TRY();
113157
{
114158
pgduckdb::doing_motherduck_sync = true;
115-
pgduckdb::SyncMotherDuckCatalogsWithPg(drop_with_cascade);
159+
pgduckdb::SyncMotherDuckCatalogsWithPg(drop_with_cascade, *connection->context);
116160
}
117161
PG_FINALLY();
118-
{
119-
pgduckdb::doing_motherduck_sync = false;
120-
}
162+
{ pgduckdb::doing_motherduck_sync = false; }
121163
PG_END_TRY();
122164
SPI_finish();
123165
PG_RETURN_VOID();
124166
}
125167
}
126168

169+
namespace pgduckdb {
170+
static shmem_request_hook_type prev_shmem_request_hook = NULL;
171+
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
172+
173+
/*
174+
* shmem_request hook: request additional shared resources. We'll allocate or
175+
* attach to the shared resources in pgss_shmem_startup().
176+
*/
177+
static void
178+
ShmemRequest(void) {
179+
if (prev_shmem_request_hook)
180+
prev_shmem_request_hook();
181+
182+
RequestAddinShmemSpace(sizeof(BackgoundWorkerShmemStruct));
183+
}
184+
185+
/*
186+
* CheckpointerShmemInit
187+
* Allocate and initialize checkpointer-related shared memory
188+
*/
189+
static void
190+
ShmemStartup(void) {
191+
if (prev_shmem_startup_hook)
192+
prev_shmem_startup_hook();
193+
194+
Size size = sizeof(BackgoundWorkerShmemStruct);
195+
bool found;
196+
197+
/*
198+
* Create or attach to the shared memory state, including hash table
199+
*/
200+
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
201+
202+
BgwShmemStruct = (BackgoundWorkerShmemStruct *)ShmemInitStruct("DuckdbBackgroundWorker Data", size, &found);
203+
204+
if (!found) {
205+
/*
206+
* First time through, so initialize. Note that we zero the whole
207+
* requests array; this is so that CompactCheckpointerRequestQueue can
208+
* assume that any pad bytes in the request structs are zeroes.
209+
*/
210+
MemSet(BgwShmemStruct, 0, size);
211+
SpinLockInit(&BgwShmemStruct->lock);
212+
}
213+
214+
LWLockRelease(AddinShmemInitLock);
215+
}
216+
127217
void
128-
DuckdbInitBackgroundWorker(void) {
218+
InitBackgroundWorker(void) {
129219
if (!pgduckdb::IsMotherDuckEnabledAnywhere()) {
130220
return;
131221
}
@@ -143,9 +233,27 @@ DuckdbInitBackgroundWorker(void) {
143233

144234
// Register the worker
145235
RegisterBackgroundWorker(&worker);
236+
237+
/* Set up the shared memory hooks */
238+
prev_shmem_request_hook = shmem_request_hook;
239+
shmem_request_hook = ShmemRequest;
240+
prev_shmem_startup_hook = shmem_startup_hook;
241+
shmem_startup_hook = ShmemStartup;
242+
}
243+
244+
void
245+
TriggerActivity(void) {
246+
if (!IsMotherDuckEnabled()) {
247+
return;
248+
}
249+
250+
SpinLockAcquire(&BgwShmemStruct->lock);
251+
BgwShmemStruct->activity_count++;
252+
/* Force wake up the background worker */
253+
SetLatch(BgwShmemStruct->bgw_latch);
254+
SpinLockRelease(&BgwShmemStruct->lock);
146255
}
147256

148-
namespace pgduckdb {
149257
/* Global variables that are used to communicate with our event triggers so
150258
* they handle DDL of syncing differently than user-initiated DDL */
151259
bool doing_motherduck_sync;
@@ -279,9 +387,7 @@ SPI_run_utility_command(const char *query) {
279387
*/
280388
BeginInternalSubTransaction(NULL);
281389
PG_TRY();
282-
{
283-
ret = SPI_exec(query, 0);
284-
}
390+
{ ret = SPI_exec(query, 0); }
285391
PG_CATCH();
286392
{
287393
MemoryContextSwitchTo(old_context);
@@ -546,30 +652,25 @@ CreateSchemaIfNotExists(const char *postgres_schema_name, bool is_default_db) {
546652
return true;
547653
}
548654

549-
void SyncMotherDuckCatalogsWithPg_Cpp(bool drop_with_cascade);
550-
551-
void
552-
SyncMotherDuckCatalogsWithPg(bool drop_with_cascade) {
553-
InvokeCPPFunc(SyncMotherDuckCatalogsWithPg_Cpp, drop_with_cascade);
655+
static void
656+
SyncMotherDuckCatalogsWithPg(bool drop_with_cascade, duckdb::ClientContext &context) {
657+
/*
658+
* TODO: Passing a reference through InvokeCPPFunc doesn't work here
659+
* for some reason. So to work around that we use a pointer instead.
660+
* We should fix the underlying problem instead.
661+
*/
662+
InvokeCPPFunc(SyncMotherDuckCatalogsWithPg_Cpp, drop_with_cascade, &context);
554663
}
555664

556-
void
557-
SyncMotherDuckCatalogsWithPg_Cpp(bool drop_with_cascade) {
665+
static void
666+
SyncMotherDuckCatalogsWithPg_Cpp(bool drop_with_cascade, duckdb::ClientContext *_context) {
558667
if (!pgduckdb::IsMotherDuckEnabled()) {
559668
throw std::runtime_error("MotherDuck support is not enabled");
560669
}
561670

562-
initial_cache_version = pgduckdb::CacheVersion();
671+
auto &context = *_context;
563672

564-
/*
565-
* We don't use GetConnection, because we want to be able to precisely
566-
* control the transaction lifecycle. We commit Postgres connections
567-
* throughout this function, and the GetConnect its cached connection its
568-
* lifecycle would be linked to those postgres transactions, which we
569-
* don't want.
570-
*/
571-
auto connection = pgduckdb::DuckDBManager::Get().CreateConnection();
572-
auto &context = *connection->context;
673+
initial_cache_version = pgduckdb::CacheVersion();
573674

574675
auto &db_manager = duckdb::DatabaseManager::Get(context);
575676
const auto &default_db = db_manager.GetDefaultDatabase(context);

src/pgduckdb_duckdb.cpp

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include "duckdb/main/extension_util.hpp"
77
#include "duckdb/parser/parsed_data/create_table_function_info.hpp"
88

9+
#include "pgduckdb/pgduckdb_background_worker.hpp"
910
#include "pgduckdb/catalog/pgduckdb_storage.hpp"
1011
#include "pgduckdb/scan/postgres_scan.hpp"
1112
#include "pgduckdb/pg/transactions.hpp"
@@ -191,14 +192,9 @@ DuckDBManager::Initialize() {
191192
pgduckdb::DuckDBQueryOrThrow(context, "ATTACH DATABASE ':memory:' AS pg_temp;");
192193

193194
if (pgduckdb::IsMotherDuckEnabled()) {
194-
/*
195-
* Workaround for MotherDuck catalog sync that turns off automatically,
196-
* in case of no queries being sent to MotherDuck. Since the background
197-
* worker never sends any query to MotherDuck we need to turn this off.
198-
* So we set the timeout to an arbitrary very large value.
199-
*/
200-
pgduckdb::DuckDBQueryOrThrow(context,
201-
"SET motherduck_background_catalog_refresh_inactivity_timeout='99 years'");
195+
pgduckdb::DuckDBQueryOrThrow(context, "SET motherduck_background_catalog_refresh_inactivity_timeout=" +
196+
duckdb::KeywordHelper::WriteQuoted(
197+
duckdb_motherduck_background_catalog_refresh_inactivity_timeout));
202198
}
203199

204200
LoadFunctions(context);

src/pgduckdb_hooks.cpp

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ extern "C" {
2727
#include "pgduckdb/pgduckdb_metadata_cache.hpp"
2828
#include "pgduckdb/pgduckdb_ddl.hpp"
2929
#include "pgduckdb/pgduckdb_table_am.hpp"
30+
#include "pgduckdb/pgduckdb_background_worker.hpp"
3031
#include "pgduckdb/utility/copy.hpp"
3132
#include "pgduckdb/vendor/pg_explain.hpp"
3233
#include "pgduckdb/vendor/pg_list.hpp"
@@ -189,10 +190,12 @@ static PlannedStmt *
189190
DuckdbPlannerHook_Cpp(Query *parse, const char *query_string, int cursor_options, ParamListInfo bound_params) {
190191
if (pgduckdb::IsExtensionRegistered()) {
191192
if (NeedsDuckdbExecution(parse)) {
193+
pgduckdb::TriggerActivity();
192194
IsAllowedStatement(parse, true);
193195

194196
return DuckdbPlanNode(parse, query_string, cursor_options, bound_params, true);
195197
} else if (duckdb_force_execution && IsAllowedStatement(parse) && ContainsFromClause(parse)) {
198+
pgduckdb::TriggerActivity();
196199
PlannedStmt *duckdbPlan = DuckdbPlanNode(parse, query_string, cursor_options, bound_params, false);
197200
if (duckdbPlan) {
198201
return duckdbPlan;
@@ -353,6 +356,18 @@ DuckdbExplainOneQueryHook(Query *query, int cursorOptions, IntoClause *into, Exp
353356
prev_explain_one_query_hook(query, cursorOptions, into, es, queryString, params, queryEnv);
354357
}
355358

359+
static bool
360+
IsOutdatedMotherduckCatalogErrcode(int error_code) {
361+
switch (error_code) {
362+
case ERRCODE_UNDEFINED_COLUMN:
363+
case ERRCODE_UNDEFINED_SCHEMA:
364+
case ERRCODE_UNDEFINED_TABLE:
365+
return true;
366+
default:
367+
return false;
368+
}
369+
}
370+
356371
static void
357372
DuckdbEmitLogHook(ErrorData *edata) {
358373
if (prev_emit_log_hook) {
@@ -385,6 +400,22 @@ DuckdbEmitLogHook(ErrorData *edata) {
385400
"If you use DuckDB functions like read_parquet, you need to use the r['colname'] syntax introduced "
386401
"in pg_duckdb 0.3.0. It seems like you might be using the outdated \"AS (colname coltype, ...)\" syntax");
387402
}
403+
404+
/*
405+
* The background worker stops syncing the catalogs after the
406+
* motherduck_background_catalog_refresh_inactivity_timeout has been
407+
* reached. This means that the table metadata that Postgres knows about
408+
* could be out of date, which could then easily result in errors about
409+
* missing from the Postgres parser because it cannot understand the query.
410+
*
411+
* This mitigates the impact of that by triggering a restart of the catalog
412+
* syncing when one of those errors occurs AND the current user can
413+
* actually use DuckDB.
414+
*/
415+
if (IsOutdatedMotherduckCatalogErrcode(edata->sqlerrcode) && pgduckdb::IsExtensionRegistered() &&
416+
pgduckdb::IsDuckdbExecutionAllowed()) {
417+
pgduckdb::TriggerActivity();
418+
}
388419
}
389420

390421
void

0 commit comments

Comments
 (0)