From 892f6c7de215c77966406d3384e18cfd6fc8ef23 Mon Sep 17 00:00:00 2001 From: Jelte Fennema-Nio Date: Tue, 28 Jan 2025 18:52:32 +0100 Subject: [PATCH 1/5] Don't sync MotherDuck metadata forever Keeping a connection open to MotherDuck forever is undesirable from a resourcing perspective on the MotherDuck side. This changes that to behave similarly like the DuckDB CLI, after 5 minutes (by default) syncing will stop. The main difficulty is to restart the syncing. In the DuckDB CLI the syncing will start again after some activity is detected, but the background worker never triggers activity. So instead we need to make sure that activity in other connections triggers a restart of the syncing in the background worker. This is done using some very simple IPC in shared memory. --- .../pgduckdb/pgduckdb_background_worker.hpp | 7 +- include/pgduckdb/pgduckdb_guc.h | 1 + src/pgduckdb.cpp | 8 +- src/pgduckdb_background_worker.cpp | 151 +++++++++++++++--- src/pgduckdb_duckdb.cpp | 12 +- src/pgduckdb_hooks.cpp | 31 ++++ 6 files changed, 175 insertions(+), 35 deletions(-) diff --git a/include/pgduckdb/pgduckdb_background_worker.hpp b/include/pgduckdb/pgduckdb_background_worker.hpp index f6ab19c7..8752404e 100644 --- a/include/pgduckdb/pgduckdb_background_worker.hpp +++ b/include/pgduckdb/pgduckdb_background_worker.hpp @@ -1,10 +1,11 @@ #pragma once -void DuckdbInitBackgroundWorker(void); - namespace pgduckdb { -void SyncMotherDuckCatalogsWithPg(bool drop_with_cascade); +void InitBackgroundWorker(void); +void TriggerActivity(void); + +extern bool is_background_worker; extern bool doing_motherduck_sync; extern char *current_duckdb_database_name; extern char *current_motherduck_catalog_version; diff --git a/include/pgduckdb/pgduckdb_guc.h b/include/pgduckdb/pgduckdb_guc.h index 20e965f4..8682ca77 100644 --- a/include/pgduckdb/pgduckdb_guc.h +++ b/include/pgduckdb/pgduckdb_guc.h @@ -15,3 +15,4 @@ extern int duckdb_motherduck_enabled; extern char *duckdb_motherduck_token; extern char *duckdb_postgres_role; extern char *duckdb_motherduck_default_database; +extern char *duckdb_motherduck_background_catalog_refresh_inactivity_timeout; diff --git a/src/pgduckdb.cpp b/src/pgduckdb.cpp index 973798b8..d8e02afd 100644 --- a/src/pgduckdb.cpp +++ b/src/pgduckdb.cpp @@ -20,6 +20,7 @@ int duckdb_motherduck_enabled = MotherDuckEnabled::MOTHERDUCK_AUTO; char *duckdb_motherduck_token = strdup(""); char *duckdb_motherduck_postgres_database = strdup("postgres"); char *duckdb_motherduck_default_database = strdup(""); +char *duckdb_motherduck_background_catalog_refresh_inactivity_timeout = strdup("5 minutes"); char *duckdb_postgres_role = strdup(""); int duckdb_maximum_threads = -1; @@ -44,7 +45,7 @@ _PG_init(void) { DuckdbInitGUC(); DuckdbInitHooks(); DuckdbInitNode(); - DuckdbInitBackgroundWorker(); + pgduckdb::InitBackgroundWorker(); pgduckdb::RegisterDuckdbXactCallback(); } } // extern "C" @@ -186,4 +187,9 @@ DuckdbInitGUC(void) { DefineCustomVariable("duckdb.motherduck_default_database", "Which database in MotherDuck to designate as default (in place of my_db)", &duckdb_motherduck_default_database, PGC_POSTMASTER, GUC_SUPERUSER_ONLY); + + DefineCustomVariable("duckdb.motherduck_background_catalog_refresh_inactivity_timeout", + "When to stop syncing of the motherduck catalog when no activity has taken place", + &duckdb_motherduck_background_catalog_refresh_inactivity_timeout, PGC_POSTMASTER, + GUC_SUPERUSER_ONLY); } diff --git a/src/pgduckdb_background_worker.cpp b/src/pgduckdb_background_worker.cpp index 49a00422..f0ea949c 100644 --- a/src/pgduckdb_background_worker.cpp +++ b/src/pgduckdb_background_worker.cpp @@ -53,10 +53,28 @@ extern "C" { #include "pgduckdb/pgduckdb_background_worker.hpp" #include "pgduckdb/pgduckdb_metadata_cache.hpp" -static bool is_background_worker = false; static std::unordered_map last_known_motherduck_catalog_versions; static uint64 initial_cache_version = 0; +namespace pgduckdb { + +bool is_background_worker = false; + +static void SyncMotherDuckCatalogsWithPg(bool drop_with_cascade, duckdb::ClientContext &context); +static void SyncMotherDuckCatalogsWithPg_Cpp(bool drop_with_cascade, duckdb::ClientContext *context); + +typedef struct BackgoundWorkerShmemStruct { + Latch *bgw_latch; /* The latch of the background worker */ + + slock_t lock; /* protects all the fields below */ + + int64 activity_count; /* the number of times activity was triggered by other backends */ +} BackgoundWorkerShmemStruct; + +static BackgoundWorkerShmemStruct *BgwShmemStruct; + +} // namespace pgduckdb + extern "C" { PGDLLEXPORT void pgduckdb_background_worker_main(Datum /* main_arg */) { @@ -66,9 +84,15 @@ pgduckdb_background_worker_main(Datum /* main_arg */) { BackgroundWorkerUnblockSignals(); BackgroundWorkerInitializeConnection(duckdb_motherduck_postgres_database, NULL, 0); + SpinLockAcquire(&pgduckdb::BgwShmemStruct->lock); + pgduckdb::BgwShmemStruct->bgw_latch = MyLatch; + int64 last_activity_count = pgduckdb::BgwShmemStruct->activity_count; + SpinLockRelease(&pgduckdb::BgwShmemStruct->lock); pgduckdb::doing_motherduck_sync = true; - is_background_worker = true; + pgduckdb::is_background_worker = true; + + duckdb::unique_ptr connection; while (true) { // Initialize SPI (Server Programming Interface) and connect to the database @@ -78,12 +102,23 @@ pgduckdb_background_worker_main(Datum /* main_arg */) { PushActiveSnapshot(GetTransactionSnapshot()); if (pgduckdb::IsExtensionRegistered()) { + if (!connection) { + connection = pgduckdb::DuckDBManager::Get().CreateConnection(); + } + SpinLockAcquire(&pgduckdb::BgwShmemStruct->lock); + int64 new_activity_count = pgduckdb::BgwShmemStruct->activity_count; + SpinLockRelease(&pgduckdb::BgwShmemStruct->lock); + if (last_activity_count != new_activity_count) { + last_activity_count = new_activity_count; + /* Trigger some activity to restart the syncing */ + pgduckdb::DuckDBQueryOrThrow(*connection, "FROM duckdb_tables() limit 0"); + } /* * If the extension is not registerid this loop is a no-op, which * means we essentially keep polling until the extension is * installed */ - pgduckdb::SyncMotherDuckCatalogsWithPg(false); + pgduckdb::SyncMotherDuckCatalogsWithPg(false, *connection->context); } // Commit the transaction @@ -108,11 +143,20 @@ force_motherduck_sync(PG_FUNCTION_ARGS) { Datum drop_with_cascade = PG_GETARG_BOOL(0); /* clear the cache of known catalog versions to force a full sync */ last_known_motherduck_catalog_versions.clear(); + + /* + * We don't use GetConnection, because we want to be able to precisely + * control the transaction lifecycle. We commit Postgres connections + * throughout this function, and the GetConnect its cached connection its + * lifecycle would be linked to those postgres transactions, which we + * don't want. + */ + auto connection = pgduckdb::DuckDBManager::Get().CreateConnection(); SPI_connect_ext(SPI_OPT_NONATOMIC); PG_TRY(); { pgduckdb::doing_motherduck_sync = true; - pgduckdb::SyncMotherDuckCatalogsWithPg(drop_with_cascade); + pgduckdb::SyncMotherDuckCatalogsWithPg(drop_with_cascade, *connection->context); } PG_FINALLY(); { @@ -124,8 +168,56 @@ force_motherduck_sync(PG_FUNCTION_ARGS) { } } +namespace pgduckdb { +static shmem_request_hook_type prev_shmem_request_hook = NULL; +static shmem_startup_hook_type prev_shmem_startup_hook = NULL; + +/* + * shmem_request hook: request additional shared resources. We'll allocate or + * attach to the shared resources in pgss_shmem_startup(). + */ +static void +ShmemRequest(void) { + if (prev_shmem_request_hook) + prev_shmem_request_hook(); + + RequestAddinShmemSpace(sizeof(BackgoundWorkerShmemStruct)); +} + +/* + * CheckpointerShmemInit + * Allocate and initialize checkpointer-related shared memory + */ +static void +ShmemStartup(void) { + if (prev_shmem_startup_hook) + prev_shmem_startup_hook(); + + Size size = sizeof(BackgoundWorkerShmemStruct); + bool found; + + /* + * Create or attach to the shared memory state, including hash table + */ + LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); + + BgwShmemStruct = (BackgoundWorkerShmemStruct *)ShmemInitStruct("DuckdbBackgroundWorker Data", size, &found); + + if (!found) { + /* + * First time through, so initialize. Note that we zero the whole + * requests array; this is so that CompactCheckpointerRequestQueue can + * assume that any pad bytes in the request structs are zeroes. + */ + MemSet(BgwShmemStruct, 0, size); + SpinLockInit(&BgwShmemStruct->lock); + } + + LWLockRelease(AddinShmemInitLock); +} + void -DuckdbInitBackgroundWorker(void) { +InitBackgroundWorker(void) { if (!pgduckdb::IsMotherDuckEnabledAnywhere()) { return; } @@ -143,9 +235,27 @@ DuckdbInitBackgroundWorker(void) { // Register the worker RegisterBackgroundWorker(&worker); + + /* Set up the shared memory hooks */ + prev_shmem_request_hook = shmem_request_hook; + shmem_request_hook = ShmemRequest; + prev_shmem_startup_hook = shmem_startup_hook; + shmem_startup_hook = ShmemStartup; +} + +void +TriggerActivity(void) { + if (!IsMotherDuckEnabled()) { + return; + } + + SpinLockAcquire(&BgwShmemStruct->lock); + BgwShmemStruct->activity_count++; + /* Force wake up the background worker */ + SetLatch(BgwShmemStruct->bgw_latch); + SpinLockRelease(&BgwShmemStruct->lock); } -namespace pgduckdb { /* Global variables that are used to communicate with our event triggers so * they handle DDL of syncing differently than user-initiated DDL */ bool doing_motherduck_sync; @@ -546,30 +656,25 @@ CreateSchemaIfNotExists(const char *postgres_schema_name, bool is_default_db) { return true; } -void SyncMotherDuckCatalogsWithPg_Cpp(bool drop_with_cascade); - -void -SyncMotherDuckCatalogsWithPg(bool drop_with_cascade) { - InvokeCPPFunc(SyncMotherDuckCatalogsWithPg_Cpp, drop_with_cascade); +static void +SyncMotherDuckCatalogsWithPg(bool drop_with_cascade, duckdb::ClientContext &context) { + /* + * TODO: Passing a reference through InvokeCPPFunc doesn't work here + * for some reason. So to work around that we use a pointer instead. + * We should fix the underlying problem instead. + */ + InvokeCPPFunc(SyncMotherDuckCatalogsWithPg_Cpp, drop_with_cascade, &context); } -void -SyncMotherDuckCatalogsWithPg_Cpp(bool drop_with_cascade) { +static void +SyncMotherDuckCatalogsWithPg_Cpp(bool drop_with_cascade, duckdb::ClientContext *_context) { if (!pgduckdb::IsMotherDuckEnabled()) { throw std::runtime_error("MotherDuck support is not enabled"); } - initial_cache_version = pgduckdb::CacheVersion(); + auto &context = *_context; - /* - * We don't use GetConnection, because we want to be able to precisely - * control the transaction lifecycle. We commit Postgres connections - * throughout this function, and the GetConnect its cached connection its - * lifecycle would be linked to those postgres transactions, which we - * don't want. - */ - auto connection = pgduckdb::DuckDBManager::Get().CreateConnection(); - auto &context = *connection->context; + initial_cache_version = pgduckdb::CacheVersion(); auto &db_manager = duckdb::DatabaseManager::Get(context); const auto &default_db = db_manager.GetDefaultDatabase(context); diff --git a/src/pgduckdb_duckdb.cpp b/src/pgduckdb_duckdb.cpp index ac417fc3..a66de116 100644 --- a/src/pgduckdb_duckdb.cpp +++ b/src/pgduckdb_duckdb.cpp @@ -6,6 +6,7 @@ #include "duckdb/main/extension_util.hpp" #include "duckdb/parser/parsed_data/create_table_function_info.hpp" +#include "pgduckdb/pgduckdb_background_worker.hpp" #include "pgduckdb/catalog/pgduckdb_storage.hpp" #include "pgduckdb/scan/postgres_scan.hpp" #include "pgduckdb/pg/transactions.hpp" @@ -191,14 +192,9 @@ DuckDBManager::Initialize() { pgduckdb::DuckDBQueryOrThrow(context, "ATTACH DATABASE ':memory:' AS pg_temp;"); if (pgduckdb::IsMotherDuckEnabled()) { - /* - * Workaround for MotherDuck catalog sync that turns off automatically, - * in case of no queries being sent to MotherDuck. Since the background - * worker never sends any query to MotherDuck we need to turn this off. - * So we set the timeout to an arbitrary very large value. - */ - pgduckdb::DuckDBQueryOrThrow(context, - "SET motherduck_background_catalog_refresh_inactivity_timeout='99 years'"); + pgduckdb::DuckDBQueryOrThrow(context, "SET motherduck_background_catalog_refresh_inactivity_timeout=" + + duckdb::KeywordHelper::WriteQuoted( + duckdb_motherduck_background_catalog_refresh_inactivity_timeout)); } LoadFunctions(context); diff --git a/src/pgduckdb_hooks.cpp b/src/pgduckdb_hooks.cpp index e9e0d1cb..c5583668 100644 --- a/src/pgduckdb_hooks.cpp +++ b/src/pgduckdb_hooks.cpp @@ -27,6 +27,7 @@ extern "C" { #include "pgduckdb/pgduckdb_metadata_cache.hpp" #include "pgduckdb/pgduckdb_ddl.hpp" #include "pgduckdb/pgduckdb_table_am.hpp" +#include "pgduckdb/pgduckdb_background_worker.hpp" #include "pgduckdb/utility/copy.hpp" #include "pgduckdb/vendor/pg_explain.hpp" #include "pgduckdb/vendor/pg_list.hpp" @@ -189,10 +190,12 @@ static PlannedStmt * DuckdbPlannerHook_Cpp(Query *parse, const char *query_string, int cursor_options, ParamListInfo bound_params) { if (pgduckdb::IsExtensionRegistered()) { if (NeedsDuckdbExecution(parse)) { + pgduckdb::TriggerActivity(); IsAllowedStatement(parse, true); return DuckdbPlanNode(parse, query_string, cursor_options, bound_params, true); } else if (duckdb_force_execution && IsAllowedStatement(parse) && ContainsFromClause(parse)) { + pgduckdb::TriggerActivity(); PlannedStmt *duckdbPlan = DuckdbPlanNode(parse, query_string, cursor_options, bound_params, false); if (duckdbPlan) { return duckdbPlan; @@ -353,6 +356,18 @@ DuckdbExplainOneQueryHook(Query *query, int cursorOptions, IntoClause *into, Exp prev_explain_one_query_hook(query, cursorOptions, into, es, queryString, params, queryEnv); } +static bool +IsOutdatedMotherduckCatalogErrcode(int error_code) { + switch (error_code) { + case ERRCODE_UNDEFINED_COLUMN: + case ERRCODE_UNDEFINED_SCHEMA: + case ERRCODE_UNDEFINED_TABLE: + return true; + default: + return false; + } +} + static void DuckdbEmitLogHook(ErrorData *edata) { if (prev_emit_log_hook) { @@ -385,6 +400,22 @@ DuckdbEmitLogHook(ErrorData *edata) { "If you use DuckDB functions like read_parquet, you need to use the r['colname'] syntax introduced " "in pg_duckdb 0.3.0. It seems like you might be using the outdated \"AS (colname coltype, ...)\" syntax"); } + + /* + * The background worker stops syncing the catalogs after the + * motherduck_background_catalog_refresh_inactivity_timeout has been + * reached. This means that the table metadata that Postgres knows about + * could be out of date, which could then easily result in errors about + * missing from the Postgres parser because it cannot understand the query. + * + * This mitigates the impact of that by triggering a restart of the catalog + * syncing when one of those errors occurs AND the current user can + * actually use DuckDB. + */ + if (IsOutdatedMotherduckCatalogErrcode(edata->sqlerrcode) && pgduckdb::IsExtensionRegistered() && + pgduckdb::IsDuckdbExecutionAllowed()) { + pgduckdb::TriggerActivity(); + } } void From 8895ca0859913e00c0819a9b1500995889649e83 Mon Sep 17 00:00:00 2001 From: Jelte Fennema-Nio Date: Sat, 8 Feb 2025 12:31:55 +0100 Subject: [PATCH 2/5] Make PG14 work --- src/pgduckdb_background_worker.cpp | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/pgduckdb_background_worker.cpp b/src/pgduckdb_background_worker.cpp index f0ea949c..01610ae4 100644 --- a/src/pgduckdb_background_worker.cpp +++ b/src/pgduckdb_background_worker.cpp @@ -169,7 +169,9 @@ force_motherduck_sync(PG_FUNCTION_ARGS) { } namespace pgduckdb { +#if PG_VERSION_NUM >= 150000 static shmem_request_hook_type prev_shmem_request_hook = NULL; +#endif static shmem_startup_hook_type prev_shmem_startup_hook = NULL; /* @@ -178,8 +180,10 @@ static shmem_startup_hook_type prev_shmem_startup_hook = NULL; */ static void ShmemRequest(void) { +#if PG_VERSION_NUM >= 150000 if (prev_shmem_request_hook) prev_shmem_request_hook(); +#endif RequestAddinShmemSpace(sizeof(BackgoundWorkerShmemStruct)); } @@ -237,8 +241,12 @@ InitBackgroundWorker(void) { RegisterBackgroundWorker(&worker); /* Set up the shared memory hooks */ +#if PG_VERSION_NUM >= 150000 prev_shmem_request_hook = shmem_request_hook; shmem_request_hook = ShmemRequest; +#else + ShmemRequest(); +#endif prev_shmem_startup_hook = shmem_startup_hook; shmem_startup_hook = ShmemStartup; } From 488e2cfef461b8f88c951a4e913f268c5006cb09 Mon Sep 17 00:00:00 2001 From: Jelte Fennema-Nio Date: Tue, 11 Feb 2025 14:34:39 +0100 Subject: [PATCH 3/5] Update src/pgduckdb_hooks.cpp Co-authored-by: Y. --- src/pgduckdb_hooks.cpp | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/src/pgduckdb_hooks.cpp b/src/pgduckdb_hooks.cpp index c5583668..f18a07e5 100644 --- a/src/pgduckdb_hooks.cpp +++ b/src/pgduckdb_hooks.cpp @@ -358,14 +358,9 @@ DuckdbExplainOneQueryHook(Query *query, int cursorOptions, IntoClause *into, Exp static bool IsOutdatedMotherduckCatalogErrcode(int error_code) { - switch (error_code) { - case ERRCODE_UNDEFINED_COLUMN: - case ERRCODE_UNDEFINED_SCHEMA: - case ERRCODE_UNDEFINED_TABLE: - return true; - default: - return false; - } + return error_code == ERRCODE_UNDEFINED_COLUMN || + error_code == ERRCODE_UNDEFINED_SCHEMA || + error_code == ERRCODE_UNDEFINED_TABLE } static void From a358986866175284cdb2efe37b15935a31f84322 Mon Sep 17 00:00:00 2001 From: Jelte Fennema-Nio Date: Tue, 11 Feb 2025 14:55:02 +0100 Subject: [PATCH 4/5] Revert "Update src/pgduckdb_hooks.cpp" This reverts commit 488e2cfef461b8f88c951a4e913f268c5006cb09. --- src/pgduckdb_hooks.cpp | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/pgduckdb_hooks.cpp b/src/pgduckdb_hooks.cpp index f18a07e5..c5583668 100644 --- a/src/pgduckdb_hooks.cpp +++ b/src/pgduckdb_hooks.cpp @@ -358,9 +358,14 @@ DuckdbExplainOneQueryHook(Query *query, int cursorOptions, IntoClause *into, Exp static bool IsOutdatedMotherduckCatalogErrcode(int error_code) { - return error_code == ERRCODE_UNDEFINED_COLUMN || - error_code == ERRCODE_UNDEFINED_SCHEMA || - error_code == ERRCODE_UNDEFINED_TABLE + switch (error_code) { + case ERRCODE_UNDEFINED_COLUMN: + case ERRCODE_UNDEFINED_SCHEMA: + case ERRCODE_UNDEFINED_TABLE: + return true; + default: + return false; + } } static void From 7783fb34aef2a1bb88c62371ce59a1d0c904d8c6 Mon Sep 17 00:00:00 2001 From: Jelte Fennema-Nio Date: Tue, 11 Feb 2025 15:04:59 +0100 Subject: [PATCH 5/5] Default to MotherDuck provided refresh timeout --- src/pgduckdb.cpp | 2 +- src/pgduckdb_duckdb.cpp | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/pgduckdb.cpp b/src/pgduckdb.cpp index d8e02afd..9fcb3290 100644 --- a/src/pgduckdb.cpp +++ b/src/pgduckdb.cpp @@ -20,7 +20,7 @@ int duckdb_motherduck_enabled = MotherDuckEnabled::MOTHERDUCK_AUTO; char *duckdb_motherduck_token = strdup(""); char *duckdb_motherduck_postgres_database = strdup("postgres"); char *duckdb_motherduck_default_database = strdup(""); -char *duckdb_motherduck_background_catalog_refresh_inactivity_timeout = strdup("5 minutes"); +char *duckdb_motherduck_background_catalog_refresh_inactivity_timeout = strdup(""); char *duckdb_postgres_role = strdup(""); int duckdb_maximum_threads = -1; diff --git a/src/pgduckdb_duckdb.cpp b/src/pgduckdb_duckdb.cpp index a66de116..bf32e3b4 100644 --- a/src/pgduckdb_duckdb.cpp +++ b/src/pgduckdb_duckdb.cpp @@ -191,7 +191,8 @@ DuckDBManager::Initialize() { pgduckdb::DuckDBQueryOrThrow(context, "ATTACH DATABASE 'pgduckdb' (TYPE pgduckdb)"); pgduckdb::DuckDBQueryOrThrow(context, "ATTACH DATABASE ':memory:' AS pg_temp;"); - if (pgduckdb::IsMotherDuckEnabled()) { + if (pgduckdb::IsMotherDuckEnabled() && + strlen(duckdb_motherduck_background_catalog_refresh_inactivity_timeout) > 0) { pgduckdb::DuckDBQueryOrThrow(context, "SET motherduck_background_catalog_refresh_inactivity_timeout=" + duckdb::KeywordHelper::WriteQuoted( duckdb_motherduck_background_catalog_refresh_inactivity_timeout));