Commit 5dfec37

YuweiXiao and claude authored
refactor: non-blocking TRUNCATE via barrier queue (#61)
## Summary

- TRUNCATE now uses the same non-blocking `PendingDdl` barrier queue as DDL sync (ADD/DROP/RENAME COLUMN), instead of blocking the WAL consumer for up to 30s via `drain_and_wait_table()`
- The flush thread autonomously drains pending changes then executes `DELETE FROM` on the target — the WAL consumer enqueues the barrier and continues immediately
- For truncate-only barriers the DuckDB worker stays alive (schema unchanged), avoiding unnecessary reconnection overhead
- Removes the now-dead `drain_and_wait_table()` method (shutdown uses the separate `drain_and_wait_all()`)

## Test plan

- [x] `make installcheck` — all 42 regression tests pass, including `truncate` and `fan_in` which exercise TRUNCATE propagation, fan-in scoped DELETE, and INSERT-after-TRUNCATE semantics
- [x] `cargo check` — clean compilation (no new warnings)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 69b7bd9 commit 5dfec37
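
The barrier pattern described in the summary can be illustrated with a minimal, self-contained sketch. This is not the project's code: `Event`, `TableQueue`, and `run_flush_thread` are hypothetical names, the barrier travels inline in the queue rather than in a separate `PendingDdl` slot, and an in-memory `Vec` stands in for the target table. It only demonstrates the ordering guarantee: the producer never waits, and rows queued before the barrier are flushed and then cleared before any later rows land.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Hypothetical stand-ins for the real change and barrier types.
#[derive(Debug, Clone, PartialEq)]
pub enum Event {
    Insert(i64),
    /// Barrier: everything queued before it must be applied first.
    Truncate,
}

#[derive(Default)]
struct Inner {
    queue: VecDeque<Event>,
    closed: bool,
}

/// Shared per-table queue (assumption: one producer, one flush thread).
#[derive(Clone, Default)]
pub struct TableQueue {
    inner: Arc<(Mutex<Inner>, Condvar)>,
}

impl TableQueue {
    /// Producer side: enqueue and return immediately, never waiting for a flush.
    pub fn push(&self, ev: Event) {
        let (lock, cvar) = &*self.inner;
        lock.lock().unwrap().queue.push_back(ev);
        cvar.notify_one();
    }

    /// Signal that no more events will arrive.
    pub fn close(&self) {
        let (lock, cvar) = &*self.inner;
        lock.lock().unwrap().closed = true;
        cvar.notify_one();
    }
}

/// Consumer side: applies events in order and returns the final "table" contents.
pub fn run_flush_thread(q: TableQueue) -> thread::JoinHandle<Vec<i64>> {
    thread::spawn(move || {
        let mut target: Vec<i64> = Vec::new(); // stands in for the target table
        let mut buffer: Vec<i64> = Vec::new(); // rows not yet flushed
        let (lock, cvar) = &*q.inner;
        loop {
            let mut guard = lock.lock().unwrap();
            while guard.queue.is_empty() && !guard.closed {
                guard = cvar.wait(guard).unwrap();
            }
            let batch: Vec<Event> = guard.queue.drain(..).collect();
            let closed = guard.closed;
            drop(guard); // do the work outside the lock

            for ev in batch {
                match ev {
                    Event::Insert(v) => buffer.push(v),
                    Event::Truncate => {
                        target.append(&mut buffer); // 1. flush pre-barrier rows
                        target.clear(); // 2. the "DELETE FROM" step
                    }
                }
            }
            target.append(&mut buffer); // regular flush of post-barrier rows
            if closed {
                return target;
            }
        }
    })
}
```

Pushing `Insert(1)`, `Insert(2)`, `Truncate`, `Insert(3)` and then closing the queue leaves only the post-barrier row in the target, regardless of how the events happen to be batched.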

3 files changed

Lines changed: 52 additions & 57 deletions

doc/CODE_WALKTHROUGH.md

Lines changed: 3 additions & 3 deletions
@@ -517,7 +517,7 @@ Fairness: threads take a ticket on arrival and wait in strict FIFO order — no
 - `ensure_queue(target_key, mapping_id, attnames, key_attrs, atttypes)` — create queue + spawn flush thread if new (or respawn if dead); passes `Arc<FlushGate>` to thread
 - `push_change(target_key, change)` — lock queue, push change, notify condvar, increment backpressure counter
 - `is_backpressured() -> bool` — true when total_queued >= max_queued
-- `drain_and_wait_table(target_key)` — per-table synchronous drain (used for TRUNCATE)
+- `get_meta(mapping_id) -> Option<QueueMeta>` — clone current queue metadata (used to build TRUNCATE barriers)
 - `drain_and_wait_all() -> Vec<FlushThreadResult>` — synchronous barrier (retained for shutdown only)
 - `collect_results() -> Vec<FlushThreadResult>` — non-blocking drain of mpsc result channel
 - `set_max_concurrent_flushes(n)` — dynamically update gate limit (for runtime GUC changes)
@@ -643,7 +643,7 @@ Iterates over `(lsn, data)` tuples, parsing each pgoutput binary message:
 | `'D'` (DELETE) | Decode old key values, push `Delete` change |
 | `'B'` (BEGIN) | Parse but no action (LSN tracked from COMMIT) |
 | `'C'` (COMMIT) | Update `group.pending_lsn = end_lsn` |
-| `'T'` (TRUNCATE) | Per-table drain (`drain_and_wait_table`), then `DELETE FROM` each target table |
+| `'T'` (TRUNCATE) | Enqueue barrier (`DdlCommand::Truncate`); flush thread drains then executes `DELETE FROM` |
 
 After the message loop:
 1. `coordinator.collect_results()` — non-blocking drain of flush results for error logging (no synchronous barrier)
@@ -660,7 +660,7 @@ After the message loop:
 - On failure: `FlushWorker` is dropped (lazily recreated), flush thread records error in PG, transitions to ERRORED after 3 consecutive failures
 - Per-table error isolation: one table failing doesn't block others
 - Backpressure: WAL consumer pauses when total queued changes exceed `max_queued_changes`
-- TRUNCATE: uses `drain_and_wait_table()` for per-table synchronous drain before DELETE
+- TRUNCATE: non-blocking barrier queue (same as DDL); flush thread drains pending changes then executes DELETE
 - Crash safety: replication slot only advances past what all tables have durably flushed (confirmed_lsn = min(applied_lsn) read from PG)
 
 ### 5.1. Schema DDL Sync

duckpipe-core/src/flush_coordinator.rs

Lines changed: 31 additions & 38 deletions
@@ -5,8 +5,8 @@
 //! - Handle PG metadata updates (applied_lsn, metrics, error state) independently
 //! - Backpressure via AtomicI64 total_queued prevents unbounded memory growth
 //!
-//! `drain_and_wait_all()` is retained for shutdown. `drain_and_wait_table()` is
-//! used for TRUNCATE (must flush before DELETE).
+//! `drain_and_wait_all()` is retained for shutdown. TRUNCATE uses the same
+//! non-blocking barrier queue as DDL (see `DdlCommand::Truncate`).
 
 use std::collections::{HashMap, HashSet};
 use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
@@ -55,7 +55,7 @@ pub struct QueueMeta {
     pub sync_mode: String,
 }
 
-/// DDL command to apply to the DuckLake target table.
+/// Barrier command: DDL (ADD/DROP/RENAME COLUMN) or TRUNCATE (DELETE by source).
 #[derive(Debug, Clone)]
 pub enum DdlCommand {
     AddColumn {
@@ -74,10 +74,13 @@ pub enum DdlCommand {
         old_type: String,
         new_type: String,
     },
+    Truncate {
+        source_label: String,
+    },
 }
 
-/// DDL barrier: set by the WAL consumer when a schema change is detected.
-/// The flush thread drains old-schema changes, flushes, applies DDL, then continues.
+/// Barrier: set by the WAL consumer when a schema change or TRUNCATE is detected.
+/// The flush thread drains old changes, flushes, applies commands, then continues.
 pub struct PendingDdl {
     pub commands: Vec<DdlCommand>,
     pub new_meta: QueueMeta,
@@ -604,6 +607,14 @@ impl FlushCoordinator {
         }
     }
 
+    /// Clone the current QueueMeta for a table (used to construct barriers
+    /// where the schema is unchanged, e.g. TRUNCATE).
+    pub fn get_meta(&self, mapping_id: i32) -> Option<QueueMeta> {
+        self.threads
+            .get(&mapping_id)
+            .map(|entry| entry.queue_handle.inner.lock().unwrap().meta.clone())
+    }
+
     /// Check if backpressure should pause WAL consumption.
     /// Excludes changes queued for paused (SNAPSHOT) tables so that buffered
     /// snapshot WAL changes don't block streaming for other tables.
@@ -800,28 +811,6 @@ impl FlushCoordinator {
             .retain(|id, _| active_mapping_ids.contains(id));
     }
 
-    /// Per-table synchronous drain: signal one flush thread to drain, wait for completion.
-    /// Used for TRUNCATE — must flush pending changes before DELETE.
-    pub fn drain_and_wait_table(&mut self, mapping_id: i32) {
-        if let Some(entry) = self.threads.get(&mapping_id) {
-            // Reset drain_complete flag
-            {
-                let mut done = entry.drain_complete.0.lock().unwrap();
-                *done = false;
-            }
-            entry.control.drain_requested.store(true, Ordering::Release);
-            entry.queue_handle.condvar.notify_one();
-
-            // Wait for completion
-            let (lock, cvar) = &*entry.drain_complete;
-            let guard = lock.lock().unwrap();
-            let _guard = cvar
-                .wait_timeout_while(guard, Duration::from_secs(30), |done| !*done)
-                .unwrap()
-                .0;
-        }
-    }
-
     /// Synchronous barrier: signal all flush threads to drain their queues,
     /// wait for all to complete, then collect results.
     /// Retained for shutdown.
@@ -1208,7 +1197,7 @@ fn flush_thread_main(
                 guard.meta = new_meta.clone();
                 drop(guard);
 
-                // 1. Flush old-schema buffer if any data is buffered
+                // 1. Flush old buffer if any data is buffered
                 if buffered_count > 0 {
                     if let Some(meta) = buffered_meta.as_ref() {
                         do_flush_buffer(
@@ -1229,20 +1218,17 @@ fn flush_thread_main(
                     pending_local.store(0, Ordering::Relaxed);
                 }
 
-                // 2. Apply DDL via pg_ducklake ALTER TABLE.
-                // The DDL must go through PG so that both the PG catalog
-                // (pg_attribute) and the DuckLake catalog are updated.
-                //
-                worker = None; // DETACH DuckLake before PG DDL
+                // 2. Apply barrier commands via PG.
+                worker = None;
                 if let Err(e) = rt.block_on(apply_ddl_commands(
                     &commands, target_oid, pg_connstr, group_name,
                 )) {
                     tracing::error!(
-                        "pg_duckpipe: DDL sync failed for {}: {}",
+                        "pg_duckpipe: barrier command failed for {}: {}",
                         new_meta.target_key,
                         e
                     );
-                    let error_msg = format!("DDL sync failed: {}", e);
+                    let error_msg = format!("barrier command failed: {}", e);
                     // For unsupported DDL (e.g. ALTER COLUMN TYPE), transition to
                     // ERRORED immediately (threshold=1) instead of waiting for 3 failures.
                     let threshold = if commands
@@ -1275,7 +1261,7 @@ fn flush_thread_main(
                 next_seq = 0;
                 buffered_lsn = 0;
                 last_flush = Instant::now();
-                continue; // process new-schema changes on next iteration
+                continue; // process post_changes on next iteration
             } else {
                 drop(guard);
             }
@@ -1552,12 +1538,19 @@ async fn apply_ddl_commands(
                     new_name.replace('"', "\"\""),
                 )
             }
+            DdlCommand::Truncate { source_label } => {
+                format!(
+                    "DELETE FROM {} WHERE \"_duckpipe_source\" = '{}'",
+                    target_ref,
+                    source_label.replace('\'', "''"),
+                )
+            }
         };
-        tracing::info!("pg_duckpipe: DDL sync (PG): {}", sql);
+        tracing::info!("pg_duckpipe: barrier exec (PG): {}", sql);
         client
             .execute(&sql, &[])
             .await
-            .map_err(|e| format!("DDL exec '{}': {}", sql, e))?;
+            .map_err(|e| format!("barrier exec '{}': {}", sql, e))?;
     }
 
     drop(client);
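
The source-scoped `DELETE` built in the `Truncate` arm relies on two quoting rules: double any `"` inside identifiers and double any `'` inside string literals. A small standalone sketch of that string construction follows. The helper names `quote_ident`, `quote_literal`, and `build_scoped_delete` are hypothetical and do not appear in the commit; the statement shape mirrors the inline version this commit removes from `service.rs`, with schema and table quoted separately instead of using a precomputed `target_ref`.

```rust
/// Quote a SQL identifier: wrap in double quotes and double any embedded `"`.
fn quote_ident(s: &str) -> String {
    format!("\"{}\"", s.replace('"', "\"\""))
}

/// Quote a SQL string literal: wrap in single quotes and double any embedded `'`.
/// Assumption: the server uses standard-conforming strings, so backslashes
/// inside the literal are not treated as escapes.
fn quote_literal(s: &str) -> String {
    format!("'{}'", s.replace('\'', "''"))
}

/// Build a DELETE that removes only the rows that came from one source.
/// (Hypothetical helper; `_duckpipe_source` is the column named in the diff.)
pub fn build_scoped_delete(schema: &str, table: &str, source_label: &str) -> String {
    format!(
        "DELETE FROM {}.{} WHERE \"_duckpipe_source\" = {}",
        quote_ident(schema),
        quote_ident(table),
        quote_literal(source_label),
    )
}
```

Scoping by source label is what keeps a TRUNCATE on one source from wiping rows contributed by other sources in a fan-in target. Where the driver allows it, binding the label as a query parameter would avoid manual literal quoting altogether.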

duckpipe-core/src/service.rs

Lines changed: 18 additions & 16 deletions
@@ -684,7 +684,8 @@ async fn process_one_wal_message(
             return Ok(true);
         }
         'T' => {
-            // TRUNCATE — flush per-table pending queues, then DELETE all from targets.
+            // TRUNCATE — enqueue a barrier so the flush thread drains pending
+            // changes, then executes DELETE FROM on the target (non-blocking).
             // Uses DELETE FROM instead of TRUNCATE because DuckLake tables
             // (via pg_duckdb) silently ignore TRUNCATE — it succeeds but
             // doesn't actually remove rows.
@@ -704,23 +705,24 @@ async fn process_one_wal_message(
                             mapping.source_schema,
                             mapping.source_table
                         );
+                    } else if let Some(meta) = coordinator.get_meta(mapping.id) {
+                        coordinator.set_pending_ddl(
+                            mapping.id,
+                            PendingDdl {
+                                commands: vec![DdlCommand::Truncate {
+                                    source_label: mapping.source_label.clone(),
+                                }],
+                                new_meta: meta,
+                                target_oid: mapping.target_oid,
+                                post_changes: Vec::new(),
+                            },
+                        );
                     } else {
-                        coordinator.drain_and_wait_table(mapping.id);
-                        // Scope DELETE by _duckpipe_source (always set)
-                        let delete_sql = format!(
-                            "DELETE FROM \"{}\".\"{}\" WHERE \"_duckpipe_source\" = '{}'",
-                            mapping.target_schema.replace('"', "\"\""),
-                            mapping.target_table.replace('"', "\"\""),
-                            mapping.source_label.replace('\'', "''")
+                        tracing::warn!(
+                            "pg_duckpipe: TRUNCATE on {}.{} — no flush queue yet, skipping",
+                            mapping.source_schema,
+                            mapping.source_table
                         );
-                        if let Err(e) = client.execute(&delete_sql, &[]).await {
-                            tracing::error!(
-                                "pg_duckpipe: failed to clear target table {}.{}: {}",
-                                mapping.target_schema,
-                                mapping.target_table,
-                                e
-                            );
-                        }
                     }
                 }
             }
