Skip to content

Support parallel DuckDB threads for Postgres table scan #762

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Jun 16, 2025
Merged
2 changes: 2 additions & 0 deletions include/pgduckdb/pg/relations.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ bool TupleIsNull(TupleTableSlot *slot);

void SlotGetAllAttrs(TupleTableSlot *slot);

TupleTableSlot *ExecStoreMinimalTupleUnsafe(MinimalTuple minmal_tuple, TupleTableSlot *slot, bool shouldFree);

double EstimateRelSize(Relation rel);

Oid GetRelidFromSchemaAndTable(const char *, const char *);
Expand Down
1 change: 1 addition & 0 deletions include/pgduckdb/pgduckdb_guc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ extern bool duckdb_allow_community_extensions;
extern bool duckdb_allow_unsigned_extensions;
extern bool duckdb_autoinstall_known_extensions;
extern bool duckdb_autoload_known_extensions;
extern int duckdb_threads_for_postgres_scan;
extern int duckdb_max_workers_per_postgres_scan;
extern char *duckdb_postgres_role;
extern char *duckdb_motherduck_session_hint;
Expand Down
2 changes: 2 additions & 0 deletions include/pgduckdb/pgduckdb_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,7 @@ duckdb::Value ConvertPostgresParameterToDuckValue(Datum value, Oid postgres_type
void ConvertPostgresToDuckValue(Oid attr_type, Datum value, duckdb::Vector &result, uint64_t offset);
bool ConvertDuckToPostgresValue(TupleTableSlot *slot, duckdb::Value &value, uint64_t col);
void InsertTupleIntoChunk(duckdb::DataChunk &output, PostgresScanLocalState &scan_local_state, TupleTableSlot *slot);
void InsertTuplesIntoChunk(duckdb::DataChunk &output, PostgresScanLocalState &scan_local_state, TupleTableSlot **slots,
int num_slots);

} // namespace pgduckdb
10 changes: 8 additions & 2 deletions include/pgduckdb/scan/postgres_scan.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ struct PostgresScanGlobalState : public duckdb::GlobalTableFunctionState {
~PostgresScanGlobalState();
idx_t
MaxThreads() const override {
return 1;
return max_threads;
}
void ConstructTableScanQuery(const duckdb::TableFunctionInitInput &input);
bool RegisterLocalState();
void UnregisterLocalState();

private:
int ExtractQueryFilters(duckdb::TableFilter *filter, const char *column_name, duckdb::string &filters,
Expand All @@ -35,18 +37,22 @@ struct PostgresScanGlobalState : public duckdb::GlobalTableFunctionState {
bool count_tuples_only;
duckdb::vector<AttrNumber> output_columns;
std::atomic<std::uint32_t> total_row_count;
std::atomic<std::int32_t> registered_local_states;
std::ostringstream scan_query;
duckdb::shared_ptr<PostgresTableReader> table_reader_global_state;
MemoryContext duckdb_scan_memory_ctx;
idx_t max_threads;
};

// Local State

#define LOCAL_STATE_SLOT_BATCH_SIZE 32
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why 32? Maybe we should make this configurable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was concerned about burdening users with another GUC hyperparameter.

I tested batch sizes of 8, 16, 32, and 64, and found that 32 performs the best. BTW, the batch size helps to amortize the lock overhead across threads.

struct PostgresScanLocalState : public duckdb::LocalTableFunctionState {
PostgresScanLocalState(PostgresScanGlobalState *global_state);
~PostgresScanLocalState() override;

PostgresScanGlobalState *global_state;
TupleTableSlot *slots[LOCAL_STATE_SLOT_BATCH_SIZE];
std::vector<uint8_t> minimal_tuple_buffer[LOCAL_STATE_SLOT_BATCH_SIZE];

size_t output_vector_size;
bool exhausted_scan;
Expand Down
8 changes: 8 additions & 0 deletions include/pgduckdb/scan/postgres_table_reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

#include "pgduckdb/pg/declarations.hpp"

#include <vector>

#include "pgduckdb/utility/cpp_only_file.hpp" // Must be last include.

namespace pgduckdb {
Expand All @@ -13,6 +15,12 @@ class PostgresTableReader {
TupleTableSlot *GetNextTuple();
void Init(const char *table_scan_query, bool count_tuples_only);
void Cleanup();
bool GetNextMinimalWorkerTuple(std::vector<uint8_t> &minimal_tuple_buffer);
TupleTableSlot *InitTupleSlot();
// Read-only accessor for the reader's nworkers_launched value.
// NOTE(review): presumably the number of Postgres parallel workers that were actually started for this scan
// -- confirm against where nworkers_launched is assigned.
int
NumWorkersLaunched() const {
return nworkers_launched;
}

private:
PostgresTableReader(const PostgresTableReader &) = delete;
Expand Down
13 changes: 12 additions & 1 deletion src/pg/relations.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,18 @@ TupleIsNull(TupleTableSlot *slot) {

void
SlotGetAllAttrs(TupleTableSlot *slot) {
PostgresFunctionGuard(slot_getallattrs, slot);
// It is safe to call slot_getallattrs directly without the PostgresFunctionGuard because the function doesn't
// perform any memory allocations. Assertions or errors are guaranteed not to occur for minimal slots.
slot_getallattrs(slot);
}

TupleTableSlot *
ExecStoreMinimalTupleUnsafe(MinimalTuple minmal_tuple, TupleTableSlot *slot, bool shouldFree) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to the comment I left above: let's add a comment explaining why this is safe to use without the lock. Something like:

It's safe to call ExecStoreMinimalTuple without the PostgresFunctionGuard because it does not allocate in memory contexts and the only error it can throw is when the slot is not a minimal slot. That error is an obvious programming error so we can ignore it here.

And just like the function above let's drop the Unsafe from the name. (you probably need to change the body to call the original like ::ExecStoreMinimalTuple(...))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ExecStoreMinimalTuple might do pfree if the slot is owned by the tuple (TTS_SHOULDFREE(slot)). I added the comment to it.

// It's safe to call ExecStoreMinimalTuple without the PostgresFunctionGuard as long as the slot is not "owned" by
// the tuple, i.e., TTS_SHOULDFREE(slot) is false. This is because it does not allocate in memory contexts and the
// only error it can throw is when the slot is not a minimal slot. That error is an obvious programming error so we
// can ignore it here.
return ::ExecStoreMinimalTuple(minmal_tuple, slot, shouldFree);
}

Relation
Expand Down
1 change: 1 addition & 0 deletions src/pgduckdb_detoast.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ ToastFetchDatum(struct varlena *attr) {
return result;
}

// This function is thread-safe and does not utilize the PostgreSQL memory context.
Datum
DetoastPostgresDatum(struct varlena *attr, bool *should_free) {
struct varlena *toasted_value = nullptr;
Expand Down
6 changes: 5 additions & 1 deletion src/pgduckdb_guc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ bool duckdb_force_execution = false;
bool duckdb_unsafe_allow_mixed_transactions = false;
bool duckdb_convert_unsupported_numeric_to_double = false;
bool duckdb_log_pg_explain = false;
int duckdb_threads_for_postgres_scan = 2;
int duckdb_max_workers_per_postgres_scan = 2;
char *duckdb_motherduck_session_hint = strdup("");
char *duckdb_postgres_role = strdup("");
Expand Down Expand Up @@ -146,9 +147,12 @@ InitGUC() {
DefineCustomVariable("duckdb.log_pg_explain", "Logs the EXPLAIN plan of a Postgres scan at the NOTICE log level",
&duckdb_log_pg_explain);

DefineCustomVariable("duckdb.threads_for_postgres_scan",
"Maximum number of DuckDB threads used for a single Postgres scan",
&duckdb_threads_for_postgres_scan, 1, MAX_PARALLEL_WORKER_LIMIT);
DefineCustomVariable("duckdb.max_workers_per_postgres_scan",
"Maximum number of PostgreSQL workers used for a single Postgres scan",
&pgduckdb::duckdb_max_workers_per_postgres_scan, 0, MAX_PARALLEL_WORKER_LIMIT);
&duckdb_max_workers_per_postgres_scan, 0, MAX_PARALLEL_WORKER_LIMIT);

DefineCustomVariable("duckdb.postgres_role",
"Which postgres role should be allowed to use DuckDB execution, use the secrets and create "
Expand Down
81 changes: 79 additions & 2 deletions src/pgduckdb_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "pgduckdb/pgduckdb_utils.hpp"
#include "pgduckdb/pgduckdb_metadata_cache.hpp"
#include "pgduckdb/scan/postgres_scan.hpp"
#include "pgduckdb/pg/memory.hpp"
#include "pgduckdb/pg/types.hpp"

extern "C" {
Expand Down Expand Up @@ -355,8 +356,8 @@ ConvertTimestampTzDatum(const duckdb::Value &value) {
if (!ValidTimestampOrTimestampTz(rawValue))
throw duckdb::OutOfRangeException(
"The TimestampTz value should be between min and max value (%s <-> %s)",
duckdb::Timestamp::ToString(static_cast<duckdb::timestamp_t>(PGDUCKDB_MIN_TIMESTAMP_VALUE)),
duckdb::Timestamp::ToString(static_cast<duckdb::timestamp_t>(PGDUCKDB_MAX_TIMESTAMP_VALUE)));
duckdb::Timestamp::ToString(static_cast<duckdb::timestamp_tz_t>(PGDUCKDB_MIN_TIMESTAMP_VALUE)),
duckdb::Timestamp::ToString(static_cast<duckdb::timestamp_tz_t>(PGDUCKDB_MAX_TIMESTAMP_VALUE)));

return TimestampTzGetDatum(rawValue - pgduckdb::PGDUCKDB_DUCK_TIMESTAMP_OFFSET);
}
Expand Down Expand Up @@ -1984,6 +1985,82 @@ InsertTupleIntoChunk(duckdb::DataChunk &output, PostgresScanLocalState &scan_loc
scan_global_state->total_row_count++;
}

/*
 * Tells whether a Postgres datum of type `attr_type` can be converted into a DuckDB vector of type
 * `duckdb_type` without calling Postgres-specific functions or allocating Postgres memory (e.g. palloc).
 *
 * Unsafe combinations are: JSONB columns mapped to VARCHAR, and any column mapped to LIST or BIT.
 * Everything else is reported as safe.
 */
static bool
IsThreadSafeTypeForPostgresToDuckDB(Oid attr_type, duckdb::LogicalTypeId duckdb_type) {
	switch (duckdb_type) {
	case duckdb::LogicalTypeId::VARCHAR:
		// VARCHAR is safe unless the underlying Postgres column is JSONB.
		return attr_type != JSONBOID;
	case duckdb::LogicalTypeId::LIST:
	case duckdb::LogicalTypeId::BIT:
		return false;
	default:
		return true;
	}
}

/*
 * Insert a batch of tuples into the output chunk. This function is thread-safe and is meant for
 * multi-threaded scans.
 *
 * The batch is written column by column. For columns whose conversion is not thread-safe (see
 * IsThreadSafeTypeForPostgresToDuckDB, e.g. JSONB/LIST/VARBIT) the global process lock is held and the
 * scan memory context is made current for the duration of that column; the context is reset afterwards.
 *
 * output           - chunk receiving the converted values; column i of the slots goes to output.data[i].
 * scan_local_state - per-thread scan state; output_vector_size is the first free row and is advanced by
 *                    num_slots on return.
 * slots            - batch of slots to convert; the tuple descriptor of slots[0] is used for the whole batch.
 * num_slots        - number of entries in `slots`; 0 is a no-op.
 */
void
InsertTuplesIntoChunk(duckdb::DataChunk &output, PostgresScanLocalState &scan_local_state, TupleTableSlot **slots,
                      int num_slots) {
	if (num_slots == 0) {
		return;
	}

	auto scan_global_state = scan_local_state.global_state;
	D_ASSERT(!scan_global_state->count_tuples_only);

	// Loop invariants: the descriptor of the first slot describes the whole batch, and the batch is written
	// to rows [base_row, base_row + num_slots) of every output column.
	const auto tuple_desc = slots[0]->tts_tupleDescriptor;
	const int natts = tuple_desc->natts;
	const auto base_row = scan_local_state.output_vector_size;

	for (int duckdb_output_index = 0; duckdb_output_index < natts; duckdb_output_index++) {
		auto &result = output.data[duckdb_output_index];
		// Bind by reference: the attribute entry lives in the descriptor, there is no need to copy it per column.
		const auto &attr = tuple_desc->attrs[duckdb_output_index];
		const bool is_safe_type = IsThreadSafeTypeForPostgresToDuckDB(attr.atttypid, result.GetType().id());

		// Deferred lock: only acquired for unsafe types, released automatically at the end of this iteration.
		std::unique_lock<std::recursive_mutex> lock(GlobalProcessLock::GetLock(), std::defer_lock);
		MemoryContext old_ctx = nullptr;
		if (!is_safe_type) {
			lock.lock();
			old_ctx = pg::MemoryContextSwitchTo(scan_global_state->duckdb_scan_memory_ctx);
		}

		// NOTE(review): if a conversion below throws, the memory context switch is not undone and a detoasted
		// buffer may not be freed -- confirm whether callers rely on conversions not throwing.
		for (int row = 0; row < num_slots; row++) {
			const auto output_row = base_row + row;

			if (slots[row]->tts_isnull[duckdb_output_index]) {
				auto &array_mask = duckdb::FlatVector::Validity(result);
				array_mask.SetInvalid(output_row);
				continue;
			}

			if (attr.attlen == -1) {
				// Variable-length attribute: detoast first, and free the detoasted copy if one was made.
				bool should_free = false;
				Datum detoasted_value = DetoastPostgresDatum(
				    reinterpret_cast<varlena *>(slots[row]->tts_values[duckdb_output_index]), &should_free);
				ConvertPostgresToDuckValue(attr.atttypid, detoasted_value, result, output_row);
				if (should_free) {
					duckdb_free(reinterpret_cast<void *>(detoasted_value));
				}
			} else {
				ConvertPostgresToDuckValue(attr.atttypid, slots[row]->tts_values[duckdb_output_index], result,
				                           output_row);
			}
		}

		if (!is_safe_type) {
			pg::MemoryContextSwitchTo(old_ctx);
			pg::MemoryContextReset(scan_global_state->duckdb_scan_memory_ctx);
			// The lock is released when `lock` goes out of scope at the end of this iteration.
		}
	}

	scan_local_state.output_vector_size += num_slots;
	scan_global_state->total_row_count += num_slots;
}

NumericVar
FromNumeric(Numeric num) {
NumericVar dest;
Expand Down
Loading