Skip to content

Support parallel DuckDB threads for Postgres table scan #762

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Jun 16, 2025

Conversation

YuweiXiao
Copy link
Contributor

Currently, we use a single DuckDB thread for Postgres table scan, even though multiple Postgres workers will be initialized. This leads to a performance bottleneck when scanning large amounts of data.

This PR parallelizes the conversion from Postgres tuple to DuckDB data chunk. Below are benchmark results on a 5GB TPCH lineitem table.

  • Benchmark query: select * from lineitem order by 1 limit 1
  • Other GUC setups: duckdb.max_workers_per_postgres_scan = 2
Threads (duckdb.threads_for_postgres_scan) Costs (seconds)
1 15.8
2 8.7
4 5.8

Copy link
Collaborator

@JelteF JelteF left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very cool! The perf differences you report are very impressive. I think a bit more code comments would be quite helpful to make understanding this easier.

Similarly to #688 I'm postponing this until after 1.0 though, given it touches a very core part of pg_duckdb.

@@ -1505,6 +1505,7 @@ AppendString(duckdb::Vector &result, Datum value, idx_t offset, bool is_bpchar)

static void
AppendJsonb(duckdb::Vector &result, Datum value, idx_t offset) {
std::lock_guard<std::recursive_mutex> lock(GlobalProcessLock::GetLock());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these the only types that need this additional locking now? Would be good to explicitely state in a comment for each other type why they are thread safe. That way we won't forget to check when introducing support for new types.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, only JSON and LIST. Additionally, as mentioned in #750, both of these types have memory issues.

I will add a comment for ConvertPostgresToDuckValue. The overall rule is that it is thread-safe as long as it does not use Postgres MemContext.

Comment on lines 268 to 279
bool is_parallel_scan = local_state.global_state->MaxThreads() > 1;
if (!is_parallel_scan) {
std::lock_guard<std::recursive_mutex> lock(GlobalProcessLock::GetLock());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this difference in behaviour between 1 and more than one threads doesn't completely make sense. Even if max_threads_per_postgres_scan is 1, it's still possible to have two different postgres scans running in parallel. Those two concurrent postgres scans would still benefit from not holding a lock during InsertTupleIntoChunk.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After reading more I now realize this is probably important for the case where we don't use backgroundworkers for the scan. So maybe we should keep this functionality. But I think it's worth refactoring and/or commenting this a bit more, because now the logic is quite hard to follow.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, okay. I was trying to keep the original code unchanged for the single-thread case.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be nice to have the two cases share a bit more code. Now it's unclear if the places where they are different are on purpose or by accident.

void
SlotGetAllAttrsUnsafe(TupleTableSlot *slot) {
slot_getallattrs(slot);
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seemed scary to me, but looking closely at the implementation of slot_getallattrs it doesn't use memory contexts nor can it throw an error. The only place where it throws an error is:

	if (unlikely(attnum > slot->tts_tupleDescriptor->natts))
		elog(ERROR, "invalid attribute number %d", attnum);

But that can condition can never be false, due to the fact that slot->tts_tupleDescriptor->natts is passed as attnum.

Could you merge this function function with SlotGetAllAttrs? And add the above information in a code comment for it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, sure. I am using the term "unsafe" to refer to the fact that it is not protected by PostgresFunctionGuard, even though it does not actually require that protection :)

};

// Local State

#define LOCAL_STATE_SLOT_BATCH_SIZE 32
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why 32? Maybe we should we make this configurable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was concerned about burdening users with another GUC hyperparameter.

I tested batch sizes of 8, 16, 32, and 64, and found that 32 performs the best. BTW, the batch size helps to amortize the lock overhead across threads.

TupleTableSlot *InitTupleSlot();
bool
IsParallelScan() const {
return nworkers_launched > 0;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I cannot find the place where you change nworkers_launched.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is assigned during the initialization of Postgres parallel workers, which is part of the original code logic. I simply added 0 initialization and the interface.

SlotGetAllAttrs(slot);
InsertTupleIntoChunk(output, local_state, slot);
for (size_t j = 0; j < valid_slots; j++) {
MinimalTuple minmal_tuple = reinterpret_cast<MinimalTuple>(local_state.minimal_tuple_buffer[j].data());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Y-- I need your C++ knowledge. Is this a good way to keep a buffer of MinimalTuples?

One thought I had is that now we do two copies of the minimal tuple:

  1. Once from the stack into the buffer (in GetnextMinimalTuple)
  2. Once from the buffer back to the stack (here).

I think if we instead have an array of MinimalTuple that we realloc instead of using vectors of bytes, then we only need to copy once and we can pass the minimal tuple from the buffer directly into ExecStoreMinimalTupleUnsafe.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't do any copy (and thus doesn't extend/modify its lifetime).
It forces the compiler to accept that the bytes stored in the minimal_tuple_buffer[j] vector are a MinimalTuple (aka MinimalTupleData *, where MinimalTupleData is itself a struct).

I haven't read the code yet, but my first question would be: why are they vector<uint8_t> instead of vector<MinimalTuple> in the first place?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As @Y-- pointed out, only one copy occurs (from Postgres parallel workers' shared memory to the buffer).

One benefit of using vector<uint8_t> is that we have an off-the-shelf API to enlarge or shrink the buffer (i.e., resize). Additionally, there is no need to worry about memory leaks, as they are handled by RAII.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It forces the compiler to accept that the bytes stored in the minimal_tuple_buffer[j] vector are a MinimalTuple (aka MinimalTupleData *, where MinimalTupleData is itself a struct).

Sounds like that could cause alignment problems.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, yes. Let me confirm the alignment issue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Double-checked that there won't be any alignment issues. std::vector and MemoryContext (which internally uses malloc) follow the same rule to align to at least alignof(max_align_t).

}

TupleTableSlot *
ExecStoreMinimalTupleUnsafe(MinimalTuple minmal_tuple, TupleTableSlot *slot, bool shouldFree) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly to the comment I left above. Let's add a comment why this is safe to use without the lock. Something like:

It's safe to call ExecStoreMinimalTuple without the PostgresFunctionGuard because it does not allocate in memory contexts and the only error it can throw is when the slot is not a minimal slot. That error is an obvious programming error so we can ignore it here.

And just like the function above let's drop the Unsafe from the name. (you probably need to change the body to call the original like ::ExecStoreMinimalTuple(...))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ExecStoreMinimalTuple might do pfree if the slot is owned by the tuple (TTS_SHOULDFREE(slot)). I added the comment to it.

@JelteF JelteF added this to the 1.1.0 milestone May 7, 2025
@YuweiXiao
Copy link
Contributor Author

@JelteF Thanks for the review! YES, go for 1.1.0 is reasonable.

@JelteF
Copy link
Collaborator

JelteF commented May 30, 2025

Do you plan on addressing the review feedback, I'm considering maybe merging this in for 1.0 anyway if it's in a good state.

@YuweiXiao
Copy link
Contributor Author

Yeah, that would be nice! Let me resolve the conflict first.

@YuweiXiao YuweiXiao force-pushed the issue_parallel_postgres_scan branch from 5adfaf6 to 88b7d36 Compare May 30, 2025 13:26
@YuweiXiao YuweiXiao requested a review from JelteF May 30, 2025 13:34
MinimalTuple minmal_tuple = reinterpret_cast<MinimalTuple>(local_state.minimal_tuple_buffer[j].data());
local_state.slot = ExecStoreMinimalTupleUnsafe(minmal_tuple, local_state.slot, false);
SlotGetAllAttrs(local_state.slot);
InsertTupleIntoChunk(output, local_state, local_state.slot);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're not switching the to the tuple memory context here. That will cause the leaks again. I think we probably want to pass the memory context into InsertTupleIntoChunk, because we only want to switch to it when we need to for the type.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is switched before the for loop. But you reminds me that it is not thread safe to do the switch. Let me check how we can resolve the leak here.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't set it before the loop, because then it will also be used when getting the next tuple, which caused problems here: #805

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Passing the memory context seems to be the only way, in order to maintain parallelism. Should we fallback to single-thread processing at the very beginning when we encounter JSON/LIST? This would eliminate the need for switching here. In that case for LIST/JSON, parallelism does not help much as we keep locking in the middle of something.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I guess maybe it'd okay, because that was only a problem when we were not using background workers to do the actual reading. And this threading logic only kicks in when we do use background workers right? Still it seems nice to align behaviour for threaded and non threaded code for easy maintainability and understanding.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

YES, threading comes along with bg scan worker. Let me try re-org these two part of code.

* GlobalProcessLock should be held before calling this.
*/
bool
PostgresTableReader::GetNextMinimalTuple(std::vector<uint8_t> &minimal_tuple_buffer) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's unclear that this requires using background workers for the reading. Let's change the name and update the comment.

Suggested change
PostgresTableReader::GetNextMinimalTuple(std::vector<uint8_t> &minimal_tuple_buffer) {
PostgresTableReader::GetNextMinimalWorkerTuple(std::vector<uint8_t> &minimal_tuple_buffer) {

@YuweiXiao YuweiXiao requested a review from JelteF May 31, 2025 09:19
@@ -1868,6 +1869,7 @@ ConvertPostgresToDuckValue(Oid attr_type, Datum value, duckdb::Vector &result, i
break;
}
case duckdb::LogicalTypeId::LIST: {
std::lock_guard<std::recursive_mutex> lock(GlobalProcessLock::GetLock());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe it is not necessary since we fallback to single-thread for LIST/JSON?

@@ -145,9 +146,12 @@ InitGUC() {
DefineCustomVariable("duckdb.log_pg_explain", "Logs the EXPLAIN plan of a Postgres scan at the NOTICE log level",
&duckdb_log_pg_explain);

DefineCustomVariable("duckdb.threads_for_postgres_scan",
"Maximum number of DuckDB threads used for a single Postgres scan",
&duckdb_threads_for_postgres_scan, 1, MAX_PARALLEL_WORKER_LIMIT, PGC_SUSET);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to make this PGC_SUSET.

Suggested change
&duckdb_threads_for_postgres_scan, 1, MAX_PARALLEL_WORKER_LIMIT, PGC_SUSET);
&duckdb_threads_for_postgres_scan, 1, MAX_PARALLEL_WORKER_LIMIT);

}

SlotGetAllAttrs(slot);
// This memory context is use as a scratchpad space for any allocation required to add the tuple
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// This memory context is use as a scratchpad space for any allocation required to add the tuple
// This memory context is used as a scratchpad space for any allocation required to add the tuple

MinimalTuple
PostgresTableReader::GetNextWorkerTuple() {
int nvisited = 0;
TupleQueueReader *reader = NULL;
MinimalTuple minimal_tuple = NULL;
bool readerdone = false;
for (;;) {
for (; next_parallel_reader < nreaders;) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this suddenly needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the multi-threaded scenario, one thread might read all worker tuples, perform cleanup, and then release the global lock. When other threads call this function afterward, they will attempt to index an empty array, potentially causing a segmentation fault.

@@ -36,15 +36,18 @@ struct PostgresScanGlobalState : public duckdb::GlobalTableFunctionState {
std::ostringstream scan_query;
duckdb::shared_ptr<PostgresTableReader> table_reader_global_state;
MemoryContext duckdb_scan_memory_ctx;
int max_threads;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit- should we use idx_t here? (since this is what we return in MaxThreads above?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

YES!

if (cleaned_up) {
return;
return NULL;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In which case can the InitTupleSlot be called when the reader was cleaned up?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I encountered a case where one thread read all tuples and performed cleanup while another thread was still initializing.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks - maybe worth a comment? I'm sure I will forget in approximatively 10 minutes :-)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure thing

@YuweiXiao YuweiXiao requested a review from JelteF June 3, 2025 10:58
@JelteF JelteF modified the milestones: 1.1.0, 1.0.0 Jun 3, 2025
@YuweiXiao YuweiXiao force-pushed the issue_parallel_postgres_scan branch 2 times, most recently from d65fb9d to 1fd2abf Compare June 4, 2025 11:55
@YuweiXiao YuweiXiao requested a review from Y-- June 4, 2025 11:55
Comment on lines 410 to 412
// - The scan includes JSON or LIST columns, since parallelism is inefficient for these types. This is because
// converting these types requires calling Postgres functions, which use the Postgres memory context and
// require holding the global lock, limiting parallel efficiency.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering whether this really makes sense. If such columns are NULL there's no need for locking. And even if they are not, there might still be enough other columns that don't need locking. Also the locking is only needed for JSONB columns, not for regular JSON columns.

Copy link
Collaborator

@JelteF JelteF Jun 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, whichever route we go. We should do the same for the varbit/bit type too. That one also allocates internally.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Uh, you are right. varbit/bit should be handled. Not sure if there is a way to guard us from unexpected palloc calls in a multi-threading setup. It is hard to maintain for cases like adding new types or changing the impl for conversion.

The alternative solution, passing down the memory context and doing locking for these types makes the whole protect logic scattered around the code. It is not maintainable either.

I am having an idea to make the conversion column-based (only when multi-threading) and do locking if necessary based on type checking. Something looks like:

for (size_t i = 0; i < valid_slots; ++i) {
      // construct slots from tuple buffer
      buffer_slots[i] = ...
}
for (size_t i = 0; i < num_columns; ++i) {
      bool unsafe = is_type_unsafe(desc[i])
      if (unsafe)
           locking & setup memory
      InsertTupleIntoChunkColumns(output, local_state, buffer_slots, valid_slots, i);
      if (unsafe)
           unlock & reset memory
}

Copy link
Collaborator

@JelteF JelteF left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is close to being merge-able. I left a final comment about the JSON/LIST/VARBIT stuff. And apart from that this needs merge conflicts resolved. But other than that I think this is good.

@YuweiXiao YuweiXiao force-pushed the issue_parallel_postgres_scan branch from 1fd2abf to 47f6805 Compare June 9, 2025 07:03
@YuweiXiao
Copy link
Contributor Author

YuweiXiao commented Jun 9, 2025

@JelteF Hey, restrictions on unsafe types like JSON/LIST have been removed by converting Postgres slots into DuckDB data chunks in a columnar fashion. If any other unsafe type is supported in the future, one only needs to add it to IsThreadSafeTypeForPostgresToDuckDB.

btw, the columnar conversion can be optimized by eliminating if-else branch (also switch statement). This may involve a large amount of code refactoring.

@YuweiXiao YuweiXiao requested a review from JelteF June 10, 2025 00:43
@JelteF JelteF merged commit d8f548b into duckdb:main Jun 16, 2025
6 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants