Skip to content

Commit

Permalink
Don't sync MotherDuck metadata forever
Browse files Browse the repository at this point in the history
Keeping a connection open to MotherDuck forever is undesirable from a
resourcing perspective on the MotherDuck side. This changes that to
behave similarly to the DuckDB CLI: after 5 minutes (by default) of
inactivity, syncing will stop. The main difficulty is restarting the syncing.
In the DuckDB CLI the syncing starts again after some activity is
detected, but the background worker never triggers activity by itself. So
instead we need to make sure that activity in other connections triggers a
restart of the syncing in the background worker. This is done using some
very simple IPC in shared memory.
  • Loading branch information
JelteF committed Feb 7, 2025
1 parent a9faa3c commit 892f6c7
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 35 deletions.
7 changes: 4 additions & 3 deletions include/pgduckdb/pgduckdb_background_worker.hpp
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
#pragma once

void DuckdbInitBackgroundWorker(void);

namespace pgduckdb {

void SyncMotherDuckCatalogsWithPg(bool drop_with_cascade);
void InitBackgroundWorker(void);
void TriggerActivity(void);

extern bool is_background_worker;
extern bool doing_motherduck_sync;
extern char *current_duckdb_database_name;
extern char *current_motherduck_catalog_version;
Expand Down
1 change: 1 addition & 0 deletions include/pgduckdb/pgduckdb_guc.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ extern int duckdb_motherduck_enabled;
extern char *duckdb_motherduck_token;
extern char *duckdb_postgres_role;
extern char *duckdb_motherduck_default_database;
extern char *duckdb_motherduck_background_catalog_refresh_inactivity_timeout;
8 changes: 7 additions & 1 deletion src/pgduckdb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ int duckdb_motherduck_enabled = MotherDuckEnabled::MOTHERDUCK_AUTO;
char *duckdb_motherduck_token = strdup("");
char *duckdb_motherduck_postgres_database = strdup("postgres");
char *duckdb_motherduck_default_database = strdup("");
char *duckdb_motherduck_background_catalog_refresh_inactivity_timeout = strdup("5 minutes");
char *duckdb_postgres_role = strdup("");

int duckdb_maximum_threads = -1;
Expand All @@ -44,7 +45,7 @@ _PG_init(void) {
DuckdbInitGUC();
DuckdbInitHooks();
DuckdbInitNode();
DuckdbInitBackgroundWorker();
pgduckdb::InitBackgroundWorker();
pgduckdb::RegisterDuckdbXactCallback();
}
} // extern "C"
Expand Down Expand Up @@ -186,4 +187,9 @@ DuckdbInitGUC(void) {
DefineCustomVariable("duckdb.motherduck_default_database",
"Which database in MotherDuck to designate as default (in place of my_db)",
&duckdb_motherduck_default_database, PGC_POSTMASTER, GUC_SUPERUSER_ONLY);

DefineCustomVariable("duckdb.motherduck_background_catalog_refresh_inactivity_timeout",
"When to stop syncing of the motherduck catalog when no activity has taken place",
&duckdb_motherduck_background_catalog_refresh_inactivity_timeout, PGC_POSTMASTER,
GUC_SUPERUSER_ONLY);
}
151 changes: 128 additions & 23 deletions src/pgduckdb_background_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,28 @@ extern "C" {
#include "pgduckdb/pgduckdb_background_worker.hpp"
#include "pgduckdb/pgduckdb_metadata_cache.hpp"

static bool is_background_worker = false;
static std::unordered_map<std::string, std::string> last_known_motherduck_catalog_versions;
static uint64 initial_cache_version = 0;

namespace pgduckdb {

bool is_background_worker = false;

static void SyncMotherDuckCatalogsWithPg(bool drop_with_cascade, duckdb::ClientContext &context);
static void SyncMotherDuckCatalogsWithPg_Cpp(bool drop_with_cascade, duckdb::ClientContext *context);

/*
 * State shared between the background worker and the regular backends. It is
 * used by backends to tell the background worker that some activity happened.
 */
struct BackgoundWorkerShmemStruct {
	/* Latch of the background worker; the worker stores MyLatch here once it has started */
	Latch *bgw_latch;

	/*
	 * Guards activity_count. The code in this file also takes it whenever
	 * bgw_latch is read or written.
	 */
	slock_t lock;

	/* Number of times other backends have reported activity */
	int64 activity_count;
};

static BackgoundWorkerShmemStruct *BgwShmemStruct;

} // namespace pgduckdb

extern "C" {
PGDLLEXPORT void
pgduckdb_background_worker_main(Datum /* main_arg */) {
Expand All @@ -66,9 +84,15 @@ pgduckdb_background_worker_main(Datum /* main_arg */) {
BackgroundWorkerUnblockSignals();

BackgroundWorkerInitializeConnection(duckdb_motherduck_postgres_database, NULL, 0);
SpinLockAcquire(&pgduckdb::BgwShmemStruct->lock);
pgduckdb::BgwShmemStruct->bgw_latch = MyLatch;
int64 last_activity_count = pgduckdb::BgwShmemStruct->activity_count;
SpinLockRelease(&pgduckdb::BgwShmemStruct->lock);

pgduckdb::doing_motherduck_sync = true;
is_background_worker = true;
pgduckdb::is_background_worker = true;

duckdb::unique_ptr<duckdb::Connection> connection;

while (true) {
// Initialize SPI (Server Programming Interface) and connect to the database
Expand All @@ -78,12 +102,23 @@ pgduckdb_background_worker_main(Datum /* main_arg */) {
PushActiveSnapshot(GetTransactionSnapshot());

if (pgduckdb::IsExtensionRegistered()) {
if (!connection) {
connection = pgduckdb::DuckDBManager::Get().CreateConnection();
}
SpinLockAcquire(&pgduckdb::BgwShmemStruct->lock);
int64 new_activity_count = pgduckdb::BgwShmemStruct->activity_count;
SpinLockRelease(&pgduckdb::BgwShmemStruct->lock);
if (last_activity_count != new_activity_count) {
last_activity_count = new_activity_count;
/* Trigger some activity to restart the syncing */
pgduckdb::DuckDBQueryOrThrow(*connection, "FROM duckdb_tables() limit 0");
}
/*
 * If the extension is not registered this loop is a no-op, which
 * means we essentially keep polling until the extension is
 * installed
 */
pgduckdb::SyncMotherDuckCatalogsWithPg(false);
pgduckdb::SyncMotherDuckCatalogsWithPg(false, *connection->context);
}

// Commit the transaction
Expand All @@ -108,11 +143,20 @@ force_motherduck_sync(PG_FUNCTION_ARGS) {
Datum drop_with_cascade = PG_GETARG_BOOL(0);
/* clear the cache of known catalog versions to force a full sync */
last_known_motherduck_catalog_versions.clear();

/*
 * We don't use GetConnection, because we want to be able to precisely
 * control the transaction lifecycle. We commit Postgres transactions
 * throughout this function, and the lifecycle of the cached connection
 * that GetConnection returns would be linked to those Postgres
 * transactions, which we don't want.
 */
auto connection = pgduckdb::DuckDBManager::Get().CreateConnection();
SPI_connect_ext(SPI_OPT_NONATOMIC);
PG_TRY();
{
pgduckdb::doing_motherduck_sync = true;
pgduckdb::SyncMotherDuckCatalogsWithPg(drop_with_cascade);
pgduckdb::SyncMotherDuckCatalogsWithPg(drop_with_cascade, *connection->context);
}
PG_FINALLY();
{
Expand All @@ -124,8 +168,56 @@ force_motherduck_sync(PG_FUNCTION_ARGS) {
}
}

namespace pgduckdb {
static shmem_request_hook_type prev_shmem_request_hook = NULL;
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;

/*
* shmem_request hook: request additional shared resources. We'll allocate or
* attach to the shared resources in pgss_shmem_startup().
*/
static void
ShmemRequest(void) {
if (prev_shmem_request_hook)
prev_shmem_request_hook();

RequestAddinShmemSpace(sizeof(BackgoundWorkerShmemStruct));
}

/*
* CheckpointerShmemInit
* Allocate and initialize checkpointer-related shared memory
*/
static void
ShmemStartup(void) {
if (prev_shmem_startup_hook)
prev_shmem_startup_hook();

Size size = sizeof(BackgoundWorkerShmemStruct);
bool found;

/*
* Create or attach to the shared memory state, including hash table
*/
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);

BgwShmemStruct = (BackgoundWorkerShmemStruct *)ShmemInitStruct("DuckdbBackgroundWorker Data", size, &found);

if (!found) {
/*
* First time through, so initialize. Note that we zero the whole
* requests array; this is so that CompactCheckpointerRequestQueue can
* assume that any pad bytes in the request structs are zeroes.
*/
MemSet(BgwShmemStruct, 0, size);
SpinLockInit(&BgwShmemStruct->lock);
}

LWLockRelease(AddinShmemInitLock);
}

void
DuckdbInitBackgroundWorker(void) {
InitBackgroundWorker(void) {
if (!pgduckdb::IsMotherDuckEnabledAnywhere()) {
return;
}
Expand All @@ -143,9 +235,27 @@ DuckdbInitBackgroundWorker(void) {

// Register the worker
RegisterBackgroundWorker(&worker);

/* Set up the shared memory hooks */
prev_shmem_request_hook = shmem_request_hook;
shmem_request_hook = ShmemRequest;
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = ShmemStartup;
}

void
TriggerActivity(void) {
if (!IsMotherDuckEnabled()) {
return;
}

SpinLockAcquire(&BgwShmemStruct->lock);
BgwShmemStruct->activity_count++;
/* Force wake up the background worker */
SetLatch(BgwShmemStruct->bgw_latch);
SpinLockRelease(&BgwShmemStruct->lock);
}

namespace pgduckdb {
/* Global variables that are used to communicate with our event triggers so
* they handle DDL of syncing differently than user-initiated DDL */
bool doing_motherduck_sync;
Expand Down Expand Up @@ -546,30 +656,25 @@ CreateSchemaIfNotExists(const char *postgres_schema_name, bool is_default_db) {
return true;
}

void SyncMotherDuckCatalogsWithPg_Cpp(bool drop_with_cascade);

void
SyncMotherDuckCatalogsWithPg(bool drop_with_cascade) {
InvokeCPPFunc(SyncMotherDuckCatalogsWithPg_Cpp, drop_with_cascade);
static void
SyncMotherDuckCatalogsWithPg(bool drop_with_cascade, duckdb::ClientContext &context) {
/*
* TODO: Passing a reference through InvokeCPPFunc doesn't work here
* for some reason. So to work around that we use a pointer instead.
* We should fix the underlying problem instead.
*/
InvokeCPPFunc(SyncMotherDuckCatalogsWithPg_Cpp, drop_with_cascade, &context);
}

void
SyncMotherDuckCatalogsWithPg_Cpp(bool drop_with_cascade) {
static void
SyncMotherDuckCatalogsWithPg_Cpp(bool drop_with_cascade, duckdb::ClientContext *_context) {
if (!pgduckdb::IsMotherDuckEnabled()) {
throw std::runtime_error("MotherDuck support is not enabled");
}

initial_cache_version = pgduckdb::CacheVersion();
auto &context = *_context;

/*
* We don't use GetConnection, because we want to be able to precisely
* control the transaction lifecycle. We commit Postgres connections
* throughout this function, and the GetConnect its cached connection its
* lifecycle would be linked to those postgres transactions, which we
* don't want.
*/
auto connection = pgduckdb::DuckDBManager::Get().CreateConnection();
auto &context = *connection->context;
initial_cache_version = pgduckdb::CacheVersion();

auto &db_manager = duckdb::DatabaseManager::Get(context);
const auto &default_db = db_manager.GetDefaultDatabase(context);
Expand Down
12 changes: 4 additions & 8 deletions src/pgduckdb_duckdb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "duckdb/main/extension_util.hpp"
#include "duckdb/parser/parsed_data/create_table_function_info.hpp"

#include "pgduckdb/pgduckdb_background_worker.hpp"
#include "pgduckdb/catalog/pgduckdb_storage.hpp"
#include "pgduckdb/scan/postgres_scan.hpp"
#include "pgduckdb/pg/transactions.hpp"
Expand Down Expand Up @@ -191,14 +192,9 @@ DuckDBManager::Initialize() {
pgduckdb::DuckDBQueryOrThrow(context, "ATTACH DATABASE ':memory:' AS pg_temp;");

if (pgduckdb::IsMotherDuckEnabled()) {
/*
* Workaround for MotherDuck catalog sync that turns off automatically,
* in case of no queries being sent to MotherDuck. Since the background
* worker never sends any query to MotherDuck we need to turn this off.
* So we set the timeout to an arbitrary very large value.
*/
pgduckdb::DuckDBQueryOrThrow(context,
"SET motherduck_background_catalog_refresh_inactivity_timeout='99 years'");
pgduckdb::DuckDBQueryOrThrow(context, "SET motherduck_background_catalog_refresh_inactivity_timeout=" +
duckdb::KeywordHelper::WriteQuoted(
duckdb_motherduck_background_catalog_refresh_inactivity_timeout));
}

LoadFunctions(context);
Expand Down
31 changes: 31 additions & 0 deletions src/pgduckdb_hooks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ extern "C" {
#include "pgduckdb/pgduckdb_metadata_cache.hpp"
#include "pgduckdb/pgduckdb_ddl.hpp"
#include "pgduckdb/pgduckdb_table_am.hpp"
#include "pgduckdb/pgduckdb_background_worker.hpp"
#include "pgduckdb/utility/copy.hpp"
#include "pgduckdb/vendor/pg_explain.hpp"
#include "pgduckdb/vendor/pg_list.hpp"
Expand Down Expand Up @@ -189,10 +190,12 @@ static PlannedStmt *
DuckdbPlannerHook_Cpp(Query *parse, const char *query_string, int cursor_options, ParamListInfo bound_params) {
if (pgduckdb::IsExtensionRegistered()) {
if (NeedsDuckdbExecution(parse)) {
pgduckdb::TriggerActivity();
IsAllowedStatement(parse, true);

return DuckdbPlanNode(parse, query_string, cursor_options, bound_params, true);
} else if (duckdb_force_execution && IsAllowedStatement(parse) && ContainsFromClause(parse)) {
pgduckdb::TriggerActivity();
PlannedStmt *duckdbPlan = DuckdbPlanNode(parse, query_string, cursor_options, bound_params, false);
if (duckdbPlan) {
return duckdbPlan;
Expand Down Expand Up @@ -353,6 +356,18 @@ DuckdbExplainOneQueryHook(Query *query, int cursorOptions, IntoClause *into, Exp
prev_explain_one_query_hook(query, cursorOptions, into, es, queryString, params, queryEnv);
}

/*
 * Returns true when the given SQLSTATE error code is one of the "undefined
 * column/schema/table" codes, and false for every other code.
 */
static bool
IsOutdatedMotherduckCatalogErrcode(int error_code) {
	return error_code == ERRCODE_UNDEFINED_COLUMN || error_code == ERRCODE_UNDEFINED_SCHEMA ||
	       error_code == ERRCODE_UNDEFINED_TABLE;
}

static void
DuckdbEmitLogHook(ErrorData *edata) {
if (prev_emit_log_hook) {
Expand Down Expand Up @@ -385,6 +400,22 @@ DuckdbEmitLogHook(ErrorData *edata) {
"If you use DuckDB functions like read_parquet, you need to use the r['colname'] syntax introduced "
"in pg_duckdb 0.3.0. It seems like you might be using the outdated \"AS (colname coltype, ...)\" syntax");
}

/*
 * The background worker stops syncing the catalogs after the
 * motherduck_background_catalog_refresh_inactivity_timeout has been
 * reached. This means that the table metadata that Postgres knows about
 * could be out of date, which could then easily result in errors about
 * missing columns, schemas or tables from the Postgres parser, because it
 * cannot understand the query.
 *
 * This mitigates the impact of that by triggering a restart of the catalog
 * syncing when one of those errors occurs AND the current user can
 * actually use DuckDB.
 */
if (IsOutdatedMotherduckCatalogErrcode(edata->sqlerrcode) && pgduckdb::IsExtensionRegistered() &&
pgduckdb::IsDuckdbExecutionAllowed()) {
pgduckdb::TriggerActivity();
}
}

void
Expand Down

0 comments on commit 892f6c7

Please sign in to comment.