Skip to content

Async, order-preserving batch INSERTs#191

Open
guillesd wants to merge 9 commits into
duckdb:mainfrom
guillesd:feat/async-send-data
Open

Async, order-preserving batch INSERTs#191
guillesd wants to merge 9 commits into
duckdb:mainfrom
guillesd:feat/async-send-data

Conversation

@guillesd

@guillesd guillesd commented Jun 30, 2026

Copy link
Copy Markdown
Contributor

#189 moved the server-side append onto a real INSERT INTO t SELECT * FROM scan_data_from_quack_client('<id>') driven by PhysicalBatchInsert, but left the client deliberately dumb: it sent one chunk per QUACK_SEND_DATA_REQUEST and blocked the execution thread on each POST. That serializes the whole upload behind the network, and it throws away source order the moment more than one thread is producing.

This PR is the async/parallel client #189 said would come later, plus the server-side machinery needed to put the rows back in order. The upload now runs off the execution threads, multiple requests are in flight at once, and a remote INSERT ... SELECT lands rows in the same order it would locally. This supersedes #179 and is also 2 to 2.5x faster in localhost.

This builds on #189 and keeps the read path untouched.

Async upload

The sink no longer POSTs inline. A regular execution thread serializes its buffered chunks into a QUACK_SEND_DATA_REQUEST and registers the payload with a per-statement ManagedAsyncTaskQueue; an ASYNC-pool thread then does the blocking POST and checks the ack. So the producer threads keep scanning and serializing while the network work happens elsewhere, and several sends are outstanding at the same time.

To make that split clean I added QuackClient::PostRaw, which takes already-serialized bytes and returns the raw response body. The serialize step runs on the producer thread (where the ClientContext is live), and the POST runs on the async pool with context=nullptr, falling back to database-level params and logging. FINALIZE drains the queue before it returns, so an autocommit insert is still atomic on the server.

Ordering modes

Order preservation is decided at plan time in ConfigureOrdering, mirroring core's plan_insert.cpp, and stored on the operator as an AppendOrderMode:

  • UNORDERED (preserve_insertion_order=false): the fast path. No stamps, the server inserts chunks as they arrive.
  • PARALLEL_ORDERED: parallel sources (table/parquet scans). Each thread stamps its chunks with (batch_index, sequence_index, is_last_in_batch) taken from the executor's batch index, and the server reassembles.
  • SERIAL_ORDERED: single-threaded sources like range() that have no executor batch index. The lone producer mints a fresh batch per flush.

RequiredPartitionInfo only asks for executor batch indices in PARALLEL_ORDERED, so the executor's batch-index assertion never fires for sources that don't supply one. ParallelSink is true for everything except SERIAL_ORDERED.

Wire protocol

SEND_DATA_REQUEST now carries a vector of chunks instead of a single one (one message is one flush), plus the ordering stamp and a batch_watermark. FINALIZE carries a min_batch_watermark.

The watermark is the piece that makes ordered delivery safe. It is the minimum batch index that will ever appear in the stream, piggybacked on every ordered message. Without it the server cannot tell whether a low batch is still coming or simply never existed, so it could deliver batch 5 before batch 4 ever arrives and trip PhysicalBatchInsert's monotonicity check. With the watermark the server initialises its delivery cursor and only drains a batch once every batch below it is accounted for. FINALIZE repeats it as a safety net in case all messages arrived before any per-message watermark was valid.

Server side: reassembly and parallel insert

QuackDataStream grew an ordered-assembly stage. PushOrdered buffers (batch, seq) fragments and, as soon as a batch's full sequence range has arrived, drains it as one unit into a ready_batches_ queue in ascending batch order. Unordered streams skip all of that: PushUnordered drops one wire message's chunks straight onto the same queue.

On the scan side, scan_data_from_quack_client now raises MaxThreads to the live thread count. Each scan task claims one complete batch from ready_batches_ via TryPopBatch and owns it exclusively until it is drained, so PhysicalBatchInsert sees a stable, unique batch index per task and can write row groups in parallel without interleaving rows across a batch boundary. When no batch is ready the task returns BLOCKED with an ASYNC-pool wait task, same as before.

Unordered streams take a different operator entirely: the bind signals NO_ORDER, which makes the planner pick PhysicalInsert(parallel=true) instead of PhysicalBatchInsert. That sink ignores batch indices, so concurrent tasks can append freely. Both modes share the one ready_batches_ queue and the one scan loop; the only thing that differs is how the producer fills the queue and which insert operator drains it.

Follow-ups

  • ready_batches_ is unbounded for now. Real flow control / backpressure management (the server handing the client an accept budget through the SEND_DATA_RESPONSE placeholder) is out of scope for this PR.
  • Decoupling serialization from request (or a good abstraction that can be reused and does this) is a separate transport change that now is implemented partially for the purpose of async posting but could be reused in different parts (like the fetch path)
  • Also memory in QuackDataStream is not really tracked by the buffer manager. Maybe we'd like this to be the case although I am not sure. Maybe just doing a good job on the backpressure side is enough. Or as Mark mentioned, make CollumnDataCollection serializable and use this in the transport (ColumnDataCollection is buffer managed)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant