Commit 69b7bd9

YuweiXiao and claude authored
feat: schema DDL sync — ADD/DROP/RENAME COLUMN, block ALTER TYPE (#39)
## Summary

- **Automatic DDL propagation** from source to DuckLake target tables for ADD COLUMN, DROP COLUMN, and RENAME COLUMN
- **Detection via RELATION message diffing**: pgoutput sends a RELATION message before the first DML after a schema change; comparing with the cached entry detects column additions, removals, and renames
- **Non-blocking DDL barrier queue**: DDL is a `PendingDdl` in a per-table `VecDeque`; the WAL consumer sets the barrier and continues immediately while the flush thread drains old-schema data, applies ALTER TABLE via a short-lived PG connection, then resumes with the new schema. Multiple barriers are queued (not merged) so each batch is processed with the correct column layout.
- **OID-based target resolution**: `target_oid` stored in `table_mappings`; `apply_ddl_commands()` resolves the current target name from `pg_class` so the pipeline survives user-initiated target table renames
- **Source rename does NOT rename target**: source table renames update `source_schema`/`source_table` metadata only; target table name stays unchanged
- **ALTER COLUMN TYPE blocked**: same-name-different-OID columns are detected and the table is transitioned to ERRORED immediately (threshold=1) with a resync hint, preventing silent data corruption from stale column types
- **Fix: preserve error_message in ERRORED state**: `clear_error_on_success()` now skips tables in ERRORED state so a subsequent successful flush doesn't wipe the diagnostic message
- **New types**: `DdlCommand` enum (with `UnsupportedAlterColumnType` variant), `PendingDdl` struct
- **Metadata helpers**: `get_column_type()`, `update_source_name()`, `update_target_oid()`, `pg_oid_to_type_name()`

## Test plan

- [x] `make check-regression TEST=ddl_sync`: ADD COLUMN, DROP COLUMN, RENAME COLUMN propagate correctly; multi-DDL barrier queuing works; ALTER COLUMN TYPE errors the pipeline
- [x] `make check-regression TEST=rename_table`: source rename does NOT rename target, metadata updated, UPDATE/DELETE work after rename
- [x] `make installcheck`: all 42 tests pass

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent aab32d2 commit 69b7bd9

14 files changed

Lines changed: 997 additions & 60 deletions

PROGRESS.md

Lines changed: 2 additions & 1 deletion
@@ -16,8 +16,9 @@
 - [ ] Mixed DML replication lag 50-100x append — WAL amplification + Parquet-scan DELETE phase

 ### Features
+- [x] Schema DDL sync — ALTER TABLE ADD/DROP/RENAME COLUMN propagation (OID-based target resolution)
+- [ ] TRUNCATE as DDL barrier — route TRUNCATE through the DDL barrier queue (DELETE FROM target), removing drain logic from the WAL consumer side
 - [ ] Per-table config JSONB column on `table_mappings` — consolidate `routing_enabled` and future per-table settings into a single `config JSONB` column (like `sync_groups.config`), avoid schema sprawl
-- [ ] Schema DDL sync — ALTER TABLE ADD/DROP COLUMN propagation
 - [ ] Staged storage — durable delta layer between WAL and DuckLake to decouple CDC from file proliferation
 - [ ] Explicit `Value` variants for more PG types (date, timestamp, uuid, numeric, interval, json)
 - [x] Sync tables with no PK (append mode; upsert requires PK)

doc/CODE_WALKTHROUGH.md

Lines changed: 23 additions & 1 deletion
@@ -637,7 +637,7 @@ Iterates over `(lsn, data)` tuples, parsing each pgoutput binary message:

 | Message | Action |
 |---------|--------|
-| `'R'` (RELATION) | Cache schema info in `rel_cache` |
+| `'R'` (RELATION) | Cache schema info in `rel_cache`; detect DDL changes via diffing (see §5.1) |
 | `'I'` (INSERT) | Decode typed values, resolve mapping, skip if disabled/ERRORED/CATCHUP, push to queue |
 | `'U'` (UPDATE) | Decode old+new, if TOAST unchanged → `Update` change, else → `Delete` + `Insert` |
 | `'D'` (DELETE) | Decode old key values, push `Delete` change |
@@ -663,6 +663,28 @@ After the message loop:
 - TRUNCATE: uses `drain_and_wait_table()` for per-table synchronous drain before DELETE
 - Crash safety: replication slot only advances past what all tables have durably flushed (confirmed_lsn = min(applied_lsn) read from PG)

+### 5.1. Schema DDL Sync
+
+Schema changes (ADD/DROP/RENAME COLUMN) are automatically propagated from source to DuckLake target tables. Source table renames are tracked (metadata updated) but do **not** rename the target — the target table name is stable and independent of the source name.
+
+**Detection** — pgoutput sends a RELATION message before the first DML after a schema change. The `'R'` handler in `process_one_wal_message()` compares the new RELATION with the cached entry via `detect_schema_changes()`:
+
+- **ADD COLUMN**: column names in new but not in old → query `pg_attribute` for type
+- **DROP COLUMN**: column names in old but not in new
+- **RENAME COLUMN**: same position, different name, same type OID
+
+**Target resolution** — `apply_ddl_commands()` resolves the current target table name from `target_oid` via `pg_class` rather than using a stored name string. This means the pipeline survives user-initiated renames of the target table. For old mappings without `target_oid`, it falls back to the name stored in metadata.
+
+**Propagation** — DDL is treated as a non-blocking barrier event in the per-table queue (`PendingDdl`). The WAL consumer sets the barrier and continues immediately; the flush thread handles it autonomously:
+
+1. WAL consumer detects schema diff → calls `coordinator.set_pending_ddl()` with `DdlCommand` list and new `QueueMeta`
+2. While barrier is set, `push_change()` routes new-schema changes to `pending_after_ddl`
+3. Flush thread drains and flushes old-schema changes from the buffer
+4. Flush thread applies `ALTER TABLE` commands to the DuckLake target via a short-lived PG connection (`apply_ddl_commands()`)
+5. Flush thread resets `FlushWorker`, merges `pending_after_ddl` into main queue with updated `QueueMeta`, continues normally
+
+The barrier ensures old-schema data is flushed before ALTER TABLE, preventing column mismatch errors or data loss.
+
 ---

 ## 6. duckpipe-pg: The PostgreSQL Extension
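
The detection rules added to the walkthrough can be illustrated with a small standalone diff over two column lists. This is only a sketch: the body of `detect_schema_changes()` is not part of this excerpt, and `Column`, `SchemaChange`, and `diff_columns` are hypothetical names chosen for illustration. It assumes a RELATION message yields an ordered list of (name, type OID) pairs, and it adds the same-name-different-OID check that the commit message describes for ALTER COLUMN TYPE.

```rust
/// One column as carried by a RELATION message (hypothetical type).
#[derive(Debug, Clone, PartialEq)]
pub struct Column {
    pub name: String,
    pub type_oid: u32,
}

/// Hypothetical result type; the commit's real type is `DdlCommand`.
#[derive(Debug, Clone, PartialEq)]
pub enum SchemaChange {
    Add { name: String, type_oid: u32 },
    Drop { name: String },
    Rename { old_name: String, new_name: String },
    TypeChanged { name: String, old_oid: u32, new_oid: u32 },
}

/// Compare the cached column list with the newly received one.
pub fn diff_columns(old: &[Column], new: &[Column]) -> Vec<SchemaChange> {
    let mut changes = Vec::new();
    let mut renamed_old: Vec<&str> = Vec::new();
    let mut renamed_new: Vec<&str> = Vec::new();

    // RENAME: same position, different name, same type OID, and the old
    // name vanished while the new name did not exist before.
    for (o, n) in old.iter().zip(new.iter()) {
        let old_gone = !new.iter().any(|c| c.name == o.name);
        let new_fresh = !old.iter().any(|c| c.name == n.name);
        if o.name != n.name && o.type_oid == n.type_oid && old_gone && new_fresh {
            changes.push(SchemaChange::Rename {
                old_name: o.name.clone(),
                new_name: n.name.clone(),
            });
            renamed_old.push(o.name.as_str());
            renamed_new.push(n.name.as_str());
        }
    }

    // ALTER COLUMN TYPE: same name, different type OID.
    for n in new {
        if let Some(o) = old.iter().find(|c| c.name == n.name) {
            if o.type_oid != n.type_oid {
                changes.push(SchemaChange::TypeChanged {
                    name: n.name.clone(),
                    old_oid: o.type_oid,
                    new_oid: n.type_oid,
                });
            }
        }
    }

    // DROP COLUMN: names in old but not in new (and not renamed).
    for o in old {
        let still_there = new.iter().any(|c| c.name == o.name);
        if !still_there && !renamed_old.contains(&o.name.as_str()) {
            changes.push(SchemaChange::Drop { name: o.name.clone() });
        }
    }

    // ADD COLUMN: names in new but not in old (and not a rename target).
    for n in new {
        let existed = old.iter().any(|c| c.name == n.name);
        if !existed && !renamed_new.contains(&n.name.as_str()) {
            changes.push(SchemaChange::Add {
                name: n.name.clone(),
                type_oid: n.type_oid,
            });
        }
    }

    changes
}
```

A name-and-position heuristic like this cannot tell a rename apart from dropping the last column and adding a new one of the same type in the same position; the sketch reports that case as a rename.
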

duckpipe-core/src/flush_coordinator.rs

Lines changed: 246 additions & 10 deletions
@@ -45,21 +45,58 @@ pub struct TableMetrics {

 /// Cloneable table schema info used by the flush thread for DuckDB buffer operations.
 #[derive(Clone, Debug)]
-struct QueueMeta {
-    target_key: String,
-    mapping_id: i32,
-    attnames: Vec<String>,
-    key_attrs: Vec<usize>,
-    atttypes: Vec<u32>,
-    source_label: String,
-    sync_mode: String,
+pub struct QueueMeta {
+    pub target_key: String,
+    pub mapping_id: i32,
+    pub attnames: Vec<String>,
+    pub key_attrs: Vec<usize>,
+    pub atttypes: Vec<u32>,
+    pub source_label: String,
+    pub sync_mode: String,
+}
+
+/// DDL command to apply to the DuckLake target table.
+#[derive(Debug, Clone)]
+pub enum DdlCommand {
+    AddColumn {
+        col_name: String,
+        col_type: String,
+    },
+    DropColumn {
+        col_name: String,
+    },
+    RenameColumn {
+        old_name: String,
+        new_name: String,
+    },
+    UnsupportedAlterColumnType {
+        col_name: String,
+        old_type: String,
+        new_type: String,
+    },
+}
+
+/// DDL barrier: set by the WAL consumer when a schema change is detected.
+/// The flush thread drains old-schema changes, flushes, applies DDL, then continues.
+pub struct PendingDdl {
+    pub commands: Vec<DdlCommand>,
+    pub new_meta: QueueMeta,
+    /// Target table OID for resolving current name via pg_class.
+    pub target_oid: i64,
+    /// Changes received after this DDL but before the next DDL (or present).
+    /// Populated by `push_change()` routing and by `set_pending_ddl()` when
+    /// a subsequent barrier arrives.
+    pub post_changes: Vec<Change>,
 }

 /// Shared queue data protected by Mutex.
 struct SharedTableQueue {
     meta: QueueMeta,
     changes: Vec<Change>,
     last_lsn: u64,
+    /// DDL barrier queue. When non-empty, push_change routes to the last
+    /// barrier's `post_changes`. Processed FIFO by the flush thread.
+    pending_ddls: std::collections::VecDeque<PendingDdl>,
 }

 /// Arc-shared handle between producer (main thread) and consumer (flush thread).
@@ -455,6 +492,7 @@ impl FlushCoordinator {
                     meta: meta.clone(),
                     changes: Vec::new(),
                     last_lsn: 0,
+                    pending_ddls: std::collections::VecDeque::new(),
                 }),
                 condvar: Condvar::new(),
             });
@@ -525,13 +563,20 @@ impl FlushCoordinator {
     ///
     /// The flush thread wakes on its own via `drain_poll_ms` condvar timeout,
     /// so no explicit notification is needed after pushing changes.
+    ///
+    /// When a DDL barrier is set, changes are routed to the last barrier's
+    /// `post_changes` so the flush thread can drain old-schema changes first.
     pub fn push_change(&self, mapping_id: i32, change: Change) {
         if let Some(entry) = self.threads.get(&mapping_id) {
             let mut guard = entry.queue_handle.inner.lock().unwrap();
             if change.lsn > guard.last_lsn {
                 guard.last_lsn = change.lsn;
             }
-            guard.changes.push(change);
+            if let Some(last_ddl) = guard.pending_ddls.back_mut() {
+                last_ddl.post_changes.push(change);
+            } else {
+                guard.changes.push(change);
+            }
             self.backpressure
                 .total_queued
                 .fetch_add(1, Ordering::Relaxed);
@@ -543,6 +588,22 @@ impl FlushCoordinator {
         }
     }

+    /// Set a DDL barrier on the queue for a target table.
+    ///
+    /// The WAL consumer calls this when it detects a schema change from a RELATION message.
+    /// New changes after this call are routed to the latest barrier's `post_changes`.
+    /// The flush thread processes barriers FIFO: drain old-schema changes, flush, apply
+    /// DDL, move post_changes to main queue, reset worker, then check for next barrier.
+    ///
+    /// Multiple barriers are queued (not merged) so that each batch of post_changes is
+    /// processed with the correct schema after its corresponding DDL is applied.
+    pub fn set_pending_ddl(&self, mapping_id: i32, ddl: PendingDdl) {
+        if let Some(entry) = self.threads.get(&mapping_id) {
+            let mut guard = entry.queue_handle.inner.lock().unwrap();
+            guard.pending_ddls.push_back(ddl);
+        }
+    }
+
     /// Check if backpressure should pause WAL consumption.
     /// Excludes changes queued for paused (SNAPSHOT) tables so that buffered
     /// snapshot WAL changes don't block streaming for other tables.
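
The routing in `push_change()` and `set_pending_ddl()` can be modelled without the mutex, threads, or backpressure counters. The sketch below is a simplified, single-threaded stand-in, not the coordinator itself: `TableQueue`, `Barrier`, `drain`, and `advance` are hypothetical names, and a plain `schema_version` number stands in for `QueueMeta`. Note that the walkthrough text calls the holding area `pending_after_ddl`, while the code in this commit stores it as `post_changes` on each `PendingDdl`; the sketch follows the code.

```rust
use std::collections::VecDeque;

/// Minimal stand-in for the real `Change` type.
#[derive(Debug, Clone, PartialEq)]
pub struct Change {
    pub lsn: u64,
}

/// Stand-in for `PendingDdl`: the schema to switch to, plus the changes
/// that arrived after this barrier was set.
pub struct Barrier {
    pub schema_version: u32,
    pub post_changes: Vec<Change>,
}

/// Stand-in for `SharedTableQueue` (hypothetical, single-threaded).
#[derive(Default)]
pub struct TableQueue {
    pub changes: Vec<Change>,
    pub schema_version: u32,
    pub pending: VecDeque<Barrier>,
}

impl TableQueue {
    /// Producer side: route to the newest barrier if one exists,
    /// otherwise to the main queue.
    pub fn push_change(&mut self, change: Change) {
        match self.pending.back_mut() {
            Some(last) => last.post_changes.push(change),
            None => self.changes.push(change),
        }
    }

    /// Producer side: queue a new barrier; earlier barriers are kept, not merged.
    pub fn set_barrier(&mut self, schema_version: u32) {
        self.pending.push_back(Barrier {
            schema_version,
            post_changes: Vec::new(),
        });
    }

    /// Consumer side: take everything queued under the current schema.
    pub fn drain(&mut self) -> Vec<Change> {
        std::mem::take(&mut self.changes)
    }

    /// Consumer side: once the main queue is empty, pop the oldest barrier,
    /// promote its `post_changes`, and report the schema to apply.
    pub fn advance(&mut self) -> Option<u32> {
        if !self.changes.is_empty() {
            return None;
        }
        let barrier = self.pending.pop_front()?;
        self.changes = barrier.post_changes;
        self.schema_version = barrier.schema_version;
        Some(barrier.schema_version)
    }
}
```

Because each barrier owns its own `post_changes`, two schema changes in quick succession yield three separately drained batches, each paired with the column layout that was current when its rows were decoded.
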
@@ -692,6 +753,11 @@ impl FlushCoordinator {
             let shared = {
                 let guard = entry.queue_handle.inner.lock().unwrap();
                 guard.changes.len() as i64
+                    + guard
+                        .pending_ddls
+                        .iter()
+                        .map(|d| d.post_changes.len() as i64)
+                        .sum::<i64>()
             };
             let local = entry.pending_local.load(Ordering::Relaxed);
             let abandoned = shared + local;
@@ -980,8 +1046,9 @@ fn flush_thread_main(
         {
             let mut guard = queue_handle.inner.lock().unwrap();

-            // Wait with timeout for new changes, drain request, or shutdown
+            // Wait with timeout for new changes, DDL barrier, drain request, or shutdown
             if guard.changes.is_empty()
+                && guard.pending_ddls.is_empty()
                 && !control.shutdown.load(Ordering::Acquire)
                 && !control.drain_requested.load(Ordering::Acquire)
                 && wait_timeout > Duration::ZERO
@@ -1126,6 +1193,89 @@ fn flush_thread_main(
                     }
                 }
                 // Local Vec<Change> is dropped here — Rust memory freed.
+            } else if !guard.pending_ddls.is_empty() {
+                // All old-schema changes drained, DDL barrier(s) present.
+                // Process the first barrier only — its post_changes become the
+                // new main queue. If more barriers remain, they'll be processed
+                // on subsequent iterations (each with its own schema).
+                let PendingDdl {
+                    commands,
+                    new_meta,
+                    target_oid,
+                    post_changes,
+                } = guard.pending_ddls.pop_front().unwrap();
+                guard.changes = post_changes;
+                guard.meta = new_meta.clone();
+                drop(guard);
+
+                // 1. Flush old-schema buffer if any data is buffered
+                if buffered_count > 0 {
+                    if let Some(meta) = buffered_meta.as_ref() {
+                        do_flush_buffer(
+                            &mut worker,
+                            buffered_count,
+                            buffered_bytes,
+                            buffered_lsn,
+                            meta,
+                            pg_connstr,
+                            group_name,
+                            &result_tx,
+                            &rt,
+                        );
+                    }
+                    backpressure
+                        .total_queued
+                        .fetch_sub(buffered_count, Ordering::Relaxed);
+                    pending_local.store(0, Ordering::Relaxed);
+                }
+
+                // 2. Apply DDL via pg_ducklake ALTER TABLE.
+                // The DDL must go through PG so that both the PG catalog
+                // (pg_attribute) and the DuckLake catalog are updated.
+                //
+                worker = None; // DETACH DuckLake before PG DDL
+                if let Err(e) = rt.block_on(apply_ddl_commands(
+                    &commands, target_oid, pg_connstr, group_name,
+                )) {
+                    tracing::error!(
+                        "pg_duckpipe: DDL sync failed for {}: {}",
+                        new_meta.target_key,
+                        e
+                    );
+                    let error_msg = format!("DDL sync failed: {}", e);
+                    // For unsupported DDL (e.g. ALTER COLUMN TYPE), transition to
+                    // ERRORED immediately (threshold=1) instead of waiting for 3 failures.
+                    let threshold = if commands
+                        .iter()
+                        .any(|c| matches!(c, DdlCommand::UnsupportedAlterColumnType { .. }))
+                    {
+                        1
+                    } else {
+                        ERRORED_THRESHOLD
+                    };
+                    let _ = result_tx.send(FlushThreadResult::Error {
+                        target_key: new_meta.target_key.clone(),
+                        mapping_id: new_meta.mapping_id,
+                        error: error_msg.clone(),
+                    });
+                    let _ = rt.block_on(flush_worker::update_error_state(
+                        pg_connstr,
+                        new_meta.mapping_id,
+                        &error_msg,
+                        threshold,
+                        group_name,
+                    ));
+                }
+
+                // 3. Reset FlushWorker (forces re-discovery of lake_info)
+                worker = None;
+                buffered_meta = None;
+                buffered_count = 0;
+                buffered_bytes = 0;
+                next_seq = 0;
+                buffered_lsn = 0;
+                last_flush = Instant::now();
+                continue; // process new-schema changes on next iteration
             } else {
                 drop(guard);
             }
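
The threshold choice in step 2 can be isolated into a pure function, which makes the rule easy to test on its own. This is a sketch rather than code from the commit: `errored_threshold` is a hypothetical helper, and the value 3 for `ERRORED_THRESHOLD` is an assumption inferred from the "instead of waiting for 3 failures" comment in the hunk above. The enum mirrors the `DdlCommand` definition added earlier in this file.

```rust
/// Assumed value; the hunk's comment mentions "3 failures".
pub const ERRORED_THRESHOLD: u32 = 3;

/// Mirror of the `DdlCommand` enum introduced by this commit.
#[derive(Debug, Clone)]
pub enum DdlCommand {
    AddColumn { col_name: String, col_type: String },
    DropColumn { col_name: String },
    RenameColumn { old_name: String, new_name: String },
    UnsupportedAlterColumnType {
        col_name: String,
        old_type: String,
        new_type: String,
    },
}

/// Hypothetical helper: how many consecutive failures are tolerated
/// before the table is moved to ERRORED for this batch of commands.
pub fn errored_threshold(commands: &[DdlCommand]) -> u32 {
    let has_unsupported = commands
        .iter()
        .any(|c| matches!(c, DdlCommand::UnsupportedAlterColumnType { .. }));
    if has_unsupported {
        // Retrying cannot help: a type change needs a manual resync.
        1
    } else {
        // Other DDL failures may be transient (for example a lost connection).
        ERRORED_THRESHOLD
    }
}
```
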
@@ -1329,3 +1479,89 @@ fn signal_drain_complete(
     control.drain_requested.store(false, Ordering::Release);
     cvar.notify_one();
 }
+
+/// Apply DDL commands to the DuckLake target table via a short-lived PG connection.
+///
+/// Resolves the current target table name from `target_oid` via `pg_class`, so the
+/// pipeline survives user-initiated renames of the target table.
+async fn apply_ddl_commands(
+    commands: &[DdlCommand],
+    target_oid: i64,
+    pg_connstr: &str,
+    group_name: &str,
+) -> Result<(), String> {
+    let app_name = crate::connstr::app_name(group_name, "ddl");
+    let (client, conn_handle) = crate::connstr::pg_connect_with_app_name(pg_connstr, &app_name)
+        .await
+        .map_err(|e| format!("DDL connect: {}", e))?;
+
+    // Resolve current target table name from OID
+    let rows = client
+        .query(
+            "SELECT n.nspname, c.relname FROM pg_class c \
+             JOIN pg_namespace n ON n.oid = c.relnamespace \
+             WHERE c.oid = $1::bigint::oid",
+            &[&target_oid],
+        )
+        .await
+        .map_err(|e| format!("OID resolve: {}", e))?;
+    if rows.is_empty() {
+        return Err(format!("target OID {} not found in pg_class", target_oid));
+    }
+    let schema: String = rows[0].get(0);
+    let table: String = rows[0].get(1);
+    let target_ref = format!(
+        "\"{}\".\"{}\"",
+        schema.replace('"', "\"\""),
+        table.replace('"', "\"\""),
+    );
+
+    for cmd in commands {
+        let sql = match cmd {
+            DdlCommand::UnsupportedAlterColumnType {
+                col_name,
+                old_type,
+                new_type,
+            } => {
+                return Err(format!(
+                    "ALTER COLUMN TYPE on \"{}\" ({} → {}) is not supported by DDL sync. \
+                     Run SELECT duckpipe.resync('schema.table') to re-snapshot the table.",
+                    col_name, old_type, new_type
+                ));
+            }
+            DdlCommand::AddColumn { col_name, col_type } => {
+                format!(
+                    "ALTER TABLE {} ADD COLUMN \"{}\" {}",
+                    target_ref,
+                    col_name.replace('"', "\"\""),
+                    col_type,
+                )
+            }
+            DdlCommand::DropColumn { col_name } => {
+                format!(
+                    "ALTER TABLE {} DROP COLUMN \"{}\"",
+                    target_ref,
+                    col_name.replace('"', "\"\""),
+                )
+            }
+            DdlCommand::RenameColumn { old_name, new_name } => {
+                format!(
+                    "ALTER TABLE {} RENAME COLUMN \"{}\" TO \"{}\"",
+                    target_ref,
+                    old_name.replace('"', "\"\""),
+                    new_name.replace('"', "\"\""),
+                )
+            }
+        };
+        tracing::info!("pg_duckpipe: DDL sync (PG): {}", sql);
+        client
+            .execute(&sql, &[])
+            .await
+            .map_err(|e| format!("DDL exec '{}': {}", sql, e))?;
+    }
+
+    drop(client);
+    let _ = conn_handle.await;
+
+    Ok(())
+}
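
`apply_ddl_commands()` quotes every identifier by wrapping it in double quotes and doubling any embedded double quote, which is the standard PostgreSQL rule for quoted identifiers. The same logic could be factored into small helpers, sketched below. `quote_ident`, `qualified_name`, and `rename_column_sql` are hypothetical names that do not appear in the commit; they only restate the inline `replace('"', "\"\"")` calls.

```rust
/// Quote a PostgreSQL identifier: wrap in double quotes and double any
/// embedded double quote.
pub fn quote_ident(ident: &str) -> String {
    format!("\"{}\"", ident.replace('"', "\"\""))
}

/// Build a schema-qualified, quoted table reference.
pub fn qualified_name(schema: &str, table: &str) -> String {
    format!("{}.{}", quote_ident(schema), quote_ident(table))
}

/// Build the RENAME COLUMN statement for an already-quoted table reference.
pub fn rename_column_sql(target_ref: &str, old_name: &str, new_name: &str) -> String {
    format!(
        "ALTER TABLE {} RENAME COLUMN {} TO {}",
        target_ref,
        quote_ident(old_name),
        quote_ident(new_name),
    )
}
```

In the ADD COLUMN branch `col_type` is interpolated without quoting, since a type name such as `numeric(10,2)` is not an identifier. That string therefore has to come from a trusted source; the commit message lists a `pg_oid_to_type_name()` helper, which is presumably where it originates.
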

duckpipe-core/src/flush_worker.rs

Lines changed: 11 additions & 3 deletions
@@ -73,6 +73,8 @@ pub async fn update_error_state(
 }

 /// Clear error state on successful flush: reset consecutive_failures and error_message.
+/// Skips tables in ERRORED state — those retain their error_message for user visibility
+/// and are recovered only through the auto-retry mechanism or manual resync.
 pub async fn clear_error_on_success(
     connstr: &str,
     mapping_id: i32,
@@ -83,9 +85,15 @@ pub async fn clear_error_on_success(
         .await
         .map_err(|e| format!("clear_error connect: {}", e))?;

-    let meta = MetadataClient::new(&client);
-    let _ = meta.clear_consecutive_failures(mapping_id).await;
-    let _ = meta.record_error_message(mapping_id, "").await;
+    client
+        .execute(
+            "UPDATE duckpipe.table_mappings \
+             SET consecutive_failures = 0, error_message = '' \
+             WHERE id = $1 AND state != 'ERRORED'",
+            &[&mapping_id],
+        )
+        .await
+        .map_err(|e| format!("clear_error: {}", e))?;

     drop(client);
     let _ = conn_handle.await;
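
The effect of the new `WHERE ... state != 'ERRORED'` guard can be shown with an in-memory model of one `table_mappings` row. This is an illustrative sketch only: `Mapping`, `State`, and the `Streaming` variant are hypothetical, and the real function performs the check inside a single SQL UPDATE rather than in Rust.

```rust
/// Hypothetical subset of mapping states; only ERRORED matters here.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum State {
    Streaming,
    Errored,
}

/// In-memory stand-in for one row of `duckpipe.table_mappings`.
#[derive(Debug, Clone, PartialEq)]
pub struct Mapping {
    pub state: State,
    pub consecutive_failures: u32,
    pub error_message: String,
}

/// Model of the guarded UPDATE: returns true if the row was modified.
pub fn clear_error_on_success(row: &mut Mapping) -> bool {
    if row.state == State::Errored {
        // Keep the diagnostic message visible until retry or resync.
        return false;
    }
    row.consecutive_failures = 0;
    row.error_message.clear();
    true
}
```

Before this change the two metadata calls ran unconditionally, so a successful flush could blank the message on a table that had already been moved to ERRORED.
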
