Async appends for quack #179

Open: guillesd wants to merge 18 commits into duckdb:main from guillesd:feat/async-append

@guillesd guillesd commented Jun 25, 2026

Contributor

This PR uses the ManagedAsyncTaskQueue from duckdb/duckdb#23404 to offload the APPEND_REQUEST messages to our async IO thread pool. It follows up on #171, which batched APPEND_REQUEST messages into vectors of data chunks.

After implementing this, I optimised it further by parallelising the requests using our regular thread pool. This was already a massive speedup vs main. This is what I call the optimized branch (the "optimize" column) in the follow-up benchmarks below.

This PR introduces our async thread pool, which gets tasks handed off by ManagedAsyncTaskQueue. Regular threads read data, crunch, and serialize, and the async threads just post over the wire. This is a clean way of getting "async-like" performance without going purely async on the protocol side (non-blocking event loop or multiplexed protocol). So to clarify, the protocol is not becoming async; it is just being used more efficiently.
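As a rough illustration of that hand-off (not the actual ManagedAsyncTaskQueue API), here is a minimal Python sketch. `FakeConnection`, `serialize`, and `append_all` are hypothetical names, and `pickle` stands in for the real serialization: worker threads do the CPU-bound serialization, then submit the blocking send to a separate IO pool.

```python
import pickle
import threading
from concurrent.futures import ThreadPoolExecutor


def serialize(chunk):
    # CPU-bound step done by the "regular" worker threads (pickle is a stand-in).
    return pickle.dumps(chunk)


class FakeConnection:
    """Hypothetical stand-in for the wire; it records what was posted."""

    def __init__(self):
        self._lock = threading.Lock()
        self.received = []

    def post(self, payload):
        with self._lock:
            self.received.append(pickle.loads(payload))


def append_all(chunks, conn, worker_threads=4, io_threads=2):
    with ThreadPoolExecutor(io_threads) as io_pool, \
            ThreadPoolExecutor(worker_threads) as workers:

        def produce(chunk):
            payload = serialize(chunk)
            # Hand the blocking send to the IO pool; this worker is now free
            # to pick up the next chunk.
            return io_pool.submit(conn.post, payload)

        send_futures = list(workers.map(produce, chunks))
        for future in send_futures:
            future.result()  # re-raise any send error
```

Note that this sketch makes no ordering guarantee: payloads reach the connection in whatever order the IO threads run.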

Note that we use rows as the upper limit for the batches. The reason is that a query such as INSERT INTO t FROM t2 WHERE col = 2 can produce incomplete data chunks, so the number of chunks says little about how much data a batch holds. Instead, we count the actual rows and flush based on that. To be fair, we could also use size. In any case, I feel all of this should go into a cost model at some point.
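A minimal sketch of row-based flushing, assuming a hypothetical `RowBatcher` that flushes as soon as the buffered row count reaches the threshold (the real flush rule may differ, for example by never exceeding the limit):

```python
class RowBatcher:
    """Buffers chunks and flushes based on buffered rows, not chunk count."""

    def __init__(self, flush_rows, send):
        self.flush_rows = flush_rows
        self.send = send          # callback that receives a list of chunks
        self.pending = []
        self.pending_rows = 0

    def add(self, chunk):
        self.pending.append(chunk)
        self.pending_rows += len(chunk)  # partially filled chunks count as-is
        if self.pending_rows >= self.flush_rows:
            self.flush()

    def flush(self):
        if self.pending:
            self.send(self.pending)
            self.pending = []
            self.pending_rows = 0
```

With a filter producing chunks of 3, 1, 2, 5 and 1 rows and `flush_rows=4`, this sends batches of 4, 7 and 1 rows (the last one from a final explicit `flush()`), whereas a fixed chunk count per batch would send very uneven amounts of data.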

The numbers

I ran two main benchmarks, one with multiple smaller batches (micro-batching) and one with a bigger batch of 1M rows.

Single 1M row batch

| Env | main | optimize | async | async vs main | async vs optimize |
|---|---|---|---|---|---|
| local (~0.05ms) | 0.945 | 0.024 | 0.014 | 67× | 1.7× |
| ec2→ec2 (~3ms) | 1.839 | 0.056 | 0.034 | 54× | 1.65× |
| WAN (~18ms) | 10.11 | 0.526 | 0.159 | 64× | 3.3× |

As you can see, the append path was not really optimized, so right out of the gate we get massive performance gains. Async is considerably better over WAN (which is great: it shows that we are covering the network costs). It is also better EC2 to EC2, which I believe is the common use case (ETL using DuckDB inserting into another DuckDB over Quack).
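For anyone who wants to re-derive the two ratio columns, a small sketch with the timings copied from the table above (units as reported there; the dictionary and function names are just for illustration):

```python
# Timings from the single-batch table: (main, optimize, async) per environment.
timings = {
    "local": (0.945, 0.024, 0.014),
    "ec2->ec2": (1.839, 0.056, 0.034),
    "WAN": (10.11, 0.526, 0.159),
}


def speedups(main, optimize, async_):
    """Return (async vs main, async vs optimize) as plain ratios."""
    return main / async_, optimize / async_


for env, row in timings.items():
    vs_main, vs_optimize = speedups(*row)
    print(f"{env}: {vs_main:.1f}x vs main, {vs_optimize:.2f}x vs optimize")
```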

Multi-batch (20 x 50k row inserts)

| Env | main | optimize | async |
|---|---|---|---|
| local | 0.972 | 0.060 | 0.062 |
| ec2→ec2 | 1.897 | 0.140 | 0.144 |
| WAN | 10.25 | 1.30 | 1.25 |

This is considerably better than main too (roughly 8x to 16x), but as you can see, for relatively small inserts (almost no round trips) what matters is the batching of serialization costs (multiple chunks in one go) and the parallelization. With only a couple of round trips, the network cost is negligible.

Future plans

I want to do a similar optimization in the read path, decoupling execution threads from the async threads doing the fetch requests. This will make pre-fetching easier to implement, using the machinery exposed by ManagedAsyncTaskQueue.

Then, for both paths, we can potentially use a cost model similar to what pdet implemented for parquet async (fetching optimal sizes based on network bandwidth).

cc: @lnkuiper

@lnkuiper

Member

Awesome performance numbers! Changes look clean too :)

Codex found two issues:

# name: test/temp.test
# description: quack async append should preserve insert order and client-side append logging
# group: [temp]

require quack

require httpfs

test-env QUACK_SERVER quack:localhost

set ignore_error_messages

statement ok
LOAD quack;

statement ok quack_db:quack_server_con
set variable actual_listen_uri = (select listen_uri from quack_serve('{QUACK_SERVER}:0', token='asdf'));

query I quack_db:quack_server_con
set variable listen_uri = concat('''', getvariable('actual_listen_uri'), '''');

set variable server <variable:quack_server_con:listen_uri>

statement ok
ATTACH {server} AS rpc (token 'asdf');

# Client-side append requests should be logged. The async append path must keep
# using the Quack request logging path, not only the raw HTTP transport path.
statement ok
CALL enable_logging('Quack', storage='memory');

statement ok
CREATE TABLE local_log_probe(i INTEGER);

statement ok
INSERT INTO local_log_probe VALUES (1), (2), (3);

statement ok
CREATE TABLE rpc.append_log_probe(i INTEGER);

statement ok
INSERT INTO rpc.append_log_probe SELECT * FROM local_log_probe;

statement ok
CREATE TEMP TABLE append_log_result AS
SELECT count(*) AS client_append_logs
FROM duckdb_logs_parsed('Quack')
WHERE message_type = 'APPEND_REQUEST'
  AND server IS NOT NULL
  AND server != '';

# Parallel inserts into Quack tables must respect preserve_insertion_order. The
# low flush threshold forces multiple async append batches to expose reordering.
statement ok
SET preserve_insertion_order=true;

statement ok
SET threads=4;

statement ok
SET quack_append_flush_rows=2048;

statement ok
CREATE TABLE rpc.order_probe(i BIGINT);

statement ok
INSERT INTO rpc.order_probe SELECT * FROM range(1000000) t(i);

statement ok
CREATE TEMP TABLE order_result AS
SELECT count(*) AS order_inversions
FROM (
	SELECT i, row_number() OVER () - 1 AS rn
	FROM rpc.order_probe
)
WHERE i != rn;

statement ok quack_db:quack_server_con
call quack_stop({server});

# Expected baseline behavior: the small append emits one client-side
# APPEND_REQUEST log, and the parallel append preserves row order exactly.
query II
SELECT client_append_logs, order_inversions
FROM append_log_result, order_result;
----
1	0

@guillesd guillesd force-pushed the feat/async-append branch from 263dd87 to 64e30d5 Compare June 25, 2026 20:19
@guillesd guillesd force-pushed the feat/async-append branch from 64e30d5 to 52e1954 Compare June 25, 2026 20:21
@guillesd

Contributor Author

Quick follow-up on this: while testing the async path I noticed we weren't actually preserving insertion order. With preserve_insertion_order=true (the default), inserting from a parallel source like a parquet scan or a table scan with threads>1 was scrambling the rows. Turns out the parallel producers were already doing this even at async_threads=0, the async pool just added a second layer of reordering on top. range() hid it because it feeds the sink serially, so it looked fine there.

My first instinct was to stamp each APPEND_REQUEST with the executor's batch_index and let the server reorder by it, basically mirroring the read path. But that doesn't really hold up: the executor batch index is sparse and the offsets are source-dependent (a table scan starts its counter at 1, parquet at 0), and a parallel parquet scan even hands out row groups out of order. So the server has no reliable way to tell where the sequence starts or which gaps are real vs still in flight.

Then I went and looked at how PhysicalBatchCopyToFile handles this in core (the ordered COPY-to-parquet path), and it hits the exact same problem, except it re-maps locally instead of asking a remote party to reorder. Finished batches go into a sorted map keyed by the executor batch index, and as the min_batch_index watermark finalizes them they get released in order, each getting the next dense 0-based index. So I did the same on the client: a small coordinator that hands out a contiguous source-order sequence, and then the server just applies them in order from 0, no watermark guessing, no gaps to skip server-side.
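The re-mapping described above can be sketched roughly as follows. This is a simplified Python illustration, not the code in this PR: `SequenceCoordinator` and its methods are hypothetical names, and it assumes that every batch below the reported minimum batch index is final.

```python
import threading


class SequenceCoordinator:
    """Maps sparse executor batch indexes to a dense 0-based sequence."""

    def __init__(self):
        self._lock = threading.Lock()
        self._finished = {}  # sparse executor batch index -> payload
        self._next_seq = 0

    def add_finished(self, batch_index, payload):
        # Called by a producer once a batch is complete.
        with self._lock:
            self._finished[batch_index] = payload

    def release_below(self, min_batch_index):
        # Everything below the watermark is final: hand it out in source
        # order, each batch getting the next dense sequence number.
        with self._lock:
            ready = sorted(i for i in self._finished if i < min_batch_index)
            released = []
            for i in ready:
                released.append((self._next_seq, self._finished.pop(i)))
                self._next_seq += 1
            return released
```

For example, batches finishing with indexes 9, 1 and 4 (sparse, starting at 1, out of order) are released as sequence numbers 0, 1 and 2 in source order once the watermark passes them. Only the quick hand-off happens under the lock; serializing and sending the released payloads would happen outside it.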

Couple of nice side effects: the scan stays fully parallel (the re-map is just a quick locked hand-off, the heavy serialize/send stays outside the lock), and we only ever buffer the in-flight window on both sides instead of the whole insert. Performance is roughly the same as the unordered path for a 20M parquet file copy over the wire.
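On the receiving side, applying a dense sequence in order only needs a small reorder buffer. A minimal sketch, assuming a hypothetical `OrderedApplier` with an `apply` callback (the real server code may look quite different):

```python
class OrderedApplier:
    """Applies batches strictly in sequence order, starting from 0."""

    def __init__(self, apply):
        self._apply = apply
        self._next = 0
        self._waiting = {}  # sequence number -> batch that arrived early

    def receive(self, seq, batch):
        self._waiting[seq] = batch
        # Drain every batch that is now contiguous with what was applied.
        while self._next in self._waiting:
            self._apply(self._waiting.pop(self._next))
            self._next += 1

    @property
    def buffered(self):
        return len(self._waiting)
```

Because the sequence is dense and starts at 0, a missing number always means "still in flight", so the buffer never holds more than the batches that arrived ahead of the next expected one.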

async_threads doesn't change any of this btw, it only decides whether the (still unordered) sends go out synchronously or through the pool; the ordering is handled the same way regardless. Tested across parquet, table scan, range and filtered sources with real batch gaps, sweeping async_threads 0/1/8, all preserving order.

@lnkuiper

Member

Thanks for the changes!

Codex still finds some issues with regard to ordering, plus an issue where a failed append leaves partial rows:

# name: test/temp.test
# description: quack async append order and failed-statement atomicity repros
# group: [temp]

require quack

require httpfs

test-env QUACK_SERVER quack:localhost

set ignore_error_messages

statement ok
LOAD quack;

statement ok quack_db:quack_server_con
set variable actual_listen_uri = (select listen_uri from quack_serve('{QUACK_SERVER}:0', token='asdf'));

query I quack_db:quack_server_con
set variable listen_uri = concat('''', getvariable('actual_listen_uri'), '''');

set variable server <variable:quack_server_con:listen_uri>

statement ok
ATTACH {server} AS rpc (token 'asdf');

statement ok
CREATE TEMP TABLE repro_results(name VARCHAR, failures BIGINT);

# Parallel table-scan appends should preserve source order when preserve_insertion_order is true.
statement ok
SET preserve_insertion_order=true;

statement ok
SET threads=4;

statement ok
CREATE TABLE src AS SELECT * FROM range(300000) t(i);

statement ok
CREATE TABLE rpc.t_scan(i BIGINT);

statement ok
INSERT INTO rpc.t_scan SELECT i FROM src;

# This currently records 300000: the server table starts at row 245760, not 0.
statement ok
INSERT INTO repro_results
SELECT 'table_scan_order_inversions', count(*)
FROM (
	SELECT i, row_number() OVER () - 1 AS rn
	FROM rpc.t_scan
)
WHERE i != rn;

# A failed append statement should not leave earlier async batches committed.
statement ok
CREATE TABLE rpc.checked(i BIGINT CHECK (i < 250000));

statement error
INSERT INTO rpc.checked SELECT * FROM range(500000) t(i);
----
CHECK constraint failed

# This currently records 204800: one default append flush batch remains visible.
statement ok
INSERT INTO repro_results
SELECT 'partial_rows_after_failed_insert', count(*)
FROM rpc.checked;

statement ok quack_db:quack_server_con
call quack_stop({server});

query TI
SELECT name, failures
FROM repro_results
ORDER BY name;
----
partial_rows_after_failed_insert	0
table_scan_order_inversions	0

@guillesd

Contributor Author

@lnkuiper the atomicity bug is pre-existing in main. I can take a look if it's an easy fix

The other one seems fair!

@guillesd

Contributor Author

I can't reproduce bug 1 locally. It does preserve order. I added the test!

@guillesd guillesd force-pushed the feat/async-append branch from e6347fd to 3b38ba6 Compare June 26, 2026 08:59
statement ok
SET threads=4;

# --- EXECUTOR path: parallel table-scan source must land in source order despite parallel producers + async.

Contributor Author

This test is passing @lnkuiper

@lnkuiper lnkuiper left a comment

Member

Looks good now, thanks!
