1+ <div align =" center " >
2+
13# pg_duckpipe
24
3- PostgreSQL extension and daemon for CDC synchronization from heap tables (row store) to DuckLake tables (column store), enabling HTAP in one system.
5+ PostgreSQL extension for real-time CDC to pg_ducklake
46
5- ## Overview
7+ [ ![ dockerhub] ( https://img.shields.io/docker/pulls/pgducklake/pgduckpipe?logo=docker )] ( https://hub.docker.com/r/pgducklake/pgduckpipe )
8+ [ ![ License] ( https://img.shields.io/badge/License-MIT-blue )] ( https://github.com/YuweiXIAO/pg_duckpipe/blob/main/LICENSE )
69
7- ` pg_duckpipe ` tails PostgreSQL WAL via logical replication ( ` pgoutput ` ), decodes row changes, and applies them to DuckLake targets managed by ` pg_duckdb ` .
10+ </ div >
811
9- ```
10- Heap table writes -> WAL -> replication slot (pgoutput) -> duckpipe worker -> DuckLake table
11- ```
12+ ## Overview
1213
13- The project is a Rust workspace:
14+ ` pg_duckpipe ` brings real-time change data capture (CDC) to PostgreSQL, enabling HTAP by continuously syncing your row tables to [ DuckLake ] ( https://github.com/relytcloud/pg_ducklake/ ) columnar tables. Run transactional and analytical workloads in a single database.
1415
15- - ` duckpipe-pg ` : ` pgrx ` extension (` CREATE EXTENSION pg_duckpipe ` )
16- - ` duckpipe-core ` : shared CDC engine
17- - ` duckpipe-daemon ` : standalone binary using the same core engine
16+ ![ pg_duckpipe architecture] ( images/arch.png )
1817
1918## Key Features
2019
21- - Streaming replication path (` START_REPLICATION ` ) with crash-safe slot advancement
22- - Per-table state machine: ` SNAPSHOT -> CATCHUP -> STREAMING ` (+ ` ERRORED ` )
23- - Sync groups: multiple tables share one publication + one replication slot
24- - Automatic target creation (` USING ducklake ` ) via ` duckpipe.add_table(...) `
25- - TOAST-unchanged UPDATE preservation (no NULL corruption on unchanged TOAST columns)
26- - Per-table error isolation with exponential backoff auto-retry
27- - Backpressure when queued changes exceed a configured threshold
28- - Rename safety via source-table OID tracking
29- - Dynamic background worker auto-start on ` add_table() `
30-
31- ## Quick Start (Extension Mode)
32-
33- ``` sql
34- -- Install extension (requires pg_duckdb)
35- CREATE EXTENSION pg_duckpipe CASCADE;
20+ - ** One-command setup** : ` duckpipe.add_table() ` creates the columnar target and starts syncing automatically
21+ - ** Real-time CDC** : streams WAL changes continuously with ~ 5 s default flush latency (tunable)
22+ - ** Sync groups** : group multiple tables to share a single publication and replication slot
3623
37- -- Source table must have a primary key
38- CREATE TABLE public .orders (
39- id BIGSERIAL PRIMARY KEY ,
40- customer_id BIGINT NOT NULL ,
41- amount NUMERIC NOT NULL
42- );
24+ ## Quick Start
4325
44- -- Add table to sync (auto-creates public.orders_ducklake, starts worker if needed)
45- SELECT duckpipe .add_table (' public.orders' );
46-
47- -- Write OLTP data
48- INSERT INTO public .orders (customer_id, amount) VALUES (101 , 42 .50 );
26+ ``` bash
27+ # Start PostgreSQL with pg_duckpipe (includes pg_duckdb + pg_ducklake)
28+ docker run -d --name duckpipe -p 5432:5432 -e POSTGRES_PASSWORD=duckdb pgducklake/pgduckpipe:18-main
4929
50- -- Observe sync state
51- SELECT source_table, state, rows_synced, applied_lsn
52- FROM duckpipe .status ();
30+ # Connect
31+ psql -h localhost -U postgres
5332```
5433
55- ## SQL API
56-
57- ### Sync groups
58-
5934``` sql
60- duckpipe .create_group (name, [publication], [slot_name]) - > text
61- duckpipe .drop_group (name, [drop_slot])
62- duckpipe .enable_group (name)
63- duckpipe .disable_group (name)
64- ```
35+ CREATE TABLE orders (id BIGSERIAL PRIMARY KEY , customer_id BIGINT , amount NUMERIC );
6536
66- ### Table management
37+ SELECT duckpipe . add_table ( ' public.orders ' ); -- start CDC to DuckLake
6738
68- ``` sql
69- duckpipe .add_table (source_table, [target_table], [sync_group], [copy_data])
70- duckpipe .remove_table (source_table, [drop_target])
71- duckpipe .move_table (source_table, new_group)
72- duckpipe .resync_table (source_table)
73- ```
39+ INSERT INTO orders(customer_id, amount) VALUES (101 , 42 .50 ), (102 , 99 .00 );
7440
75- ### Worker control
76-
77- ``` sql
78- duckpipe .start_worker ()
79- duckpipe .stop_worker ()
41+ SELECT * FROM orders_ducklake; -- query the columnar copy
42+ SELECT source_table, state, rows_synced FROM duckpipe .status ();
8043```
8144
82- ### Monitoring
83-
84- ``` sql
85- duckpipe .groups ()
86- duckpipe .tables ()
87- duckpipe .status ()
88- duckpipe .worker_status ()
89- ```
90-
91- ` duckpipe.status() ` includes per-table ` state ` , ` queued_changes ` , ` error_message ` ,
92- ` consecutive_failures ` , ` retry_at ` , and ` applied_lsn ` .
93-
94- ## Configuration (GUCs)
95-
96- | GUC | Default | Context | Notes |
97- | -----| ---------| ---------| -------|
98- | ` duckpipe.poll_interval ` | ` 1000 ` | ` SIGHUP ` | Poll interval in ms (100..3600000) |
99- | ` duckpipe.batch_size_per_group ` | ` 100000 ` | ` SIGHUP ` | Max WAL messages per group per cycle |
100- | ` duckpipe.enabled ` | ` on ` | ` SIGHUP ` | Enable/disable worker loop |
101- | ` duckpipe.debug_log ` | ` off ` | ` SIGHUP ` | Emit critical-path timing logs |
102- | ` duckpipe.flush_interval ` | ` 1000 ` | ` SIGHUP ` | Flush interval in ms (100..60000) |
103- | ` duckpipe.flush_batch_threshold ` | ` 10000 ` | ` SIGHUP ` | Queue size that triggers immediate flush |
104- | ` duckpipe.max_queued_changes ` | ` 500000 ` | ` SIGHUP ` | Backpressure threshold |
105- | ` duckpipe.data_inlining_row_limit ` | ` 0 ` | ` USERSET ` | DuckLake data inlining row limit |
106-
107- Example:
108-
109- ``` sql
110- ALTER SYSTEM SET duckpipe .flush_interval = 200 ;
111- ALTER SYSTEM SET duckpipe .flush_batch_threshold = 1000 ;
112- SELECT pg_reload_conf();
113- ```
45+ > Building from source? ` CREATE EXTENSION pg_duckpipe CASCADE; ` first, then the same SQL above.
11446
11547## Requirements
11648
117- - PostgreSQL 14+
118- - ` pg_duckdb ` extension available
119- - Source tables require a ` PRIMARY KEY `
120- - Logical replication enabled (` wal_level=logical ` , slots/senders configured)
49+ > The Docker image ships everything preconfigured. The notes below apply when installing from source.
50+
51+ - ** PostgreSQL 18** — currently the only tested version; older versions may work but are unsupported
52+ - ** pg_ducklake** (which bundles pg_duckdb and libduckdb) must be installed
53+ - ` wal_level = logical ` with sufficient ` max_replication_slots ` and ` max_wal_senders `
54+ - Both extensions must be preloaded: ` shared_preload_libraries = 'pg_duckdb, pg_duckpipe' `
55+ - Source tables must have a ** PRIMARY KEY** (required by logical replication to identify rows)
12156
122- ## Build
57+ ## Build (From Source)
12358
12459``` bash
12560make
@@ -129,18 +64,17 @@ make install
12964## Test
13065
13166``` bash
132- make installcheck
133- make check-regression TEST=api
67+ make installcheck # Build + install + run all regression tests
68+ make check-regression TEST=api # Run a single test
13469```
13570
136- Current regression schedule contains 20 tests under ` test/regression/ ` .
137-
13871## Documentation
13972
140- - [ doc/USAGE.md] ( doc/USAGE.md ) : SQL usage and operations guide
141- - [ doc/CODE_WALKTHROUGH.md] ( doc/CODE_WALKTHROUGH.md ) : source-level architecture walkthrough
142- - [ doc/DESIGN_V2.md] ( doc/DESIGN_V2.md ) : historical v2 design notes
143- - [ benchmark/README.md] ( benchmark/README.md ) : benchmark harness
73+ | Doc | Description |
74+ | -----| -------------|
75+ | [ doc/USAGE.md] ( doc/USAGE.md ) | SQL usage, monitoring, ** configuration (GUCs)** , and tuning |
76+ | [ doc/DESIGN_V2.md] ( doc/DESIGN_V2.md ) | Historical v2 design notes |
77+ | [ benchmark/README.md] ( benchmark/README.md ) | Benchmark harness |
14478
14579## License
14680
0 commit comments