@@ -13,7 +13,9 @@ use std::time::Instant;
1313
1414use duckdb:: { Config , Connection } ;
1515
16- use crate :: types:: { fixed_bytes_for_oid, Change , ChangeType , ResolvedConfig , Value } ;
16+ use crate :: types:: {
17+ fixed_bytes_for_oid, is_duckpipe_system_column, Change , ChangeType , ResolvedConfig , Value ,
18+ } ;
1719
1820const DUCKLAKE_EXT_FILENAME : & str = "ducklake.duckdb_extension" ;
1921
@@ -180,11 +182,11 @@ fn discover_lake_table_info(
180182
181183 // Build column_types aligned to expected_attnames order.
182184 // The DuckLake catalog columns should match pgoutput attnames.
183- // Filter out _duckpipe_source — it's a system column managed by duckpipe,
184- // not from pgoutput, so it should not appear in the expected alignment.
185+ // Filter out duckpipe system columns — they're managed by duckpipe,
186+ // not from pgoutput, so they should not appear in the expected alignment.
185187 let lake_col_map: std:: collections:: HashMap < String , String > = col_rows
186188 . into_iter ( )
187- . filter ( |( name, _) | name . to_lowercase ( ) != "_duckpipe_source" )
189+ . filter ( |( name, _) | ! is_duckpipe_system_column ( name ) )
188190 . map ( |( name, dtype) | ( name. to_lowercase ( ) , dtype) )
189191 . collect ( ) ;
190192
@@ -290,6 +292,12 @@ pub struct FlushWorker {
290292 /// Injected as a SQL literal into INSERT statements and scopes
291293 /// DELETE operations to this label for fan-in isolation.
292294 source_label : String ,
295+ /// Sync mode: "upsert" (default) or "append" (immutable changelog).
296+ sync_mode : String ,
297+ /// Cached high-water `_duckpipe_lsn` from the target table (append mode only).
298+ /// Queried once on the first flush after restart, then reused across subsequent
299+ /// flushes until all replayed WAL has been consumed. Zero means no dedup needed.
300+ append_dedup_lsn : i64 ,
293301}
294302
295303impl FlushWorker {
@@ -302,6 +310,7 @@ impl FlushWorker {
302310 ducklake_schema : & str ,
303311 resolved_config : & ResolvedConfig ,
304312 source_label : String ,
313+ sync_mode : String ,
305314 ) -> Result < Self , String > {
306315 let db = open_ducklake_connection ( pg_connstr, ducklake_schema) ?;
307316
@@ -328,6 +337,8 @@ impl FlushWorker {
328337 flush_memory_limit,
329338 cached_fixed_row_bytes : 0 ,
330339 source_label,
340+ sync_mode,
341+ append_dedup_lsn : 0 ,
331342 } )
332343 }
333344
@@ -388,6 +399,10 @@ impl FlushWorker {
388399 lake_info. column_types[ i]
389400 ) ) ;
390401 }
402+ // Append mode: store per-change LSN for _duckpipe_lsn metadata
403+ if self . sync_mode == "append" {
404+ buf_cols. push ( "_lsn BIGINT" . to_string ( ) ) ;
405+ }
391406
392407 let create_buf = format ! ( "CREATE TABLE buffer ({})" , buf_cols. join( ", " ) ) ;
393408 self . db
@@ -441,6 +456,7 @@ impl FlushWorker {
441456 let mut seq = seq_start;
442457 let fixed_row_bytes = self . cached_fixed_row_bytes ;
443458 let mut var_total: usize = 0 ;
459+ let is_append = self . sync_mode == "append" ;
444460
445461 {
446462 let mut appender = self
@@ -456,8 +472,9 @@ impl FlushWorker {
456472 ChangeType :: Delete => 2 ,
457473 } ;
458474
459- // +2 = _seq, _op_type
460- let mut row: Vec < Box < dyn duckdb:: ToSql > > = Vec :: with_capacity ( 2 + ncols) ;
475+ // +2 = _seq, _op_type; +1 more for _lsn in append mode
476+ let extra = if is_append { 3 } else { 2 } ;
477+ let mut row: Vec < Box < dyn duckdb:: ToSql > > = Vec :: with_capacity ( extra + ncols) ;
461478 row. push ( Box :: new ( seq) ) ;
462479 row. push ( Box :: new ( op_type) ) ;
463480
@@ -482,6 +499,11 @@ impl FlushWorker {
482499 }
483500 }
484501
502+ // Append mode: store LSN for _duckpipe_lsn metadata
503+ if is_append {
504+ row. push ( Box :: new ( change. lsn as i64 ) ) ;
505+ }
506+
485507 let refs: Vec < & dyn duckdb:: ToSql > = row. iter ( ) . map ( |b| b. as_ref ( ) ) . collect ( ) ;
486508 appender
487509 . append_row ( refs. as_slice ( ) )
@@ -497,8 +519,8 @@ impl FlushWorker {
497519 Ok ( ( seq, batch_bytes) )
498520 }
499521
500- /// Compact the buffer (dedup by PK), apply DELETE+INSERT to DuckLake in a
501- /// transaction, then drop the buffer table .
522+ /// Flush the buffer to DuckLake. Dispatches to the appropriate strategy
523+ /// based on `sync_mode`: append (immutable changelog) or upsert (compact + replace) .
502524 ///
503525 /// Returns the flush result with timing and memory metrics.
504526 pub fn flush_buffer (
@@ -521,28 +543,149 @@ impl FlushWorker {
521543
522544 let flush_start = Instant :: now ( ) ;
523545
524- let target_table = self
525- . target_table
526- . as_ref ( )
527- . ok_or ( "target_table not cached — ensure_buffer should have been called" ) ?;
528- let lake_info = self . lake_info . as_ref ( ) . ok_or_else ( || {
529- "lake_info not available — ensure_buffer should have been called" . to_string ( )
546+ // Raise memory limit for flush phase (compaction + DuckLake writes)
547+ self . db
548+ . execute_batch ( & format ! ( "SET memory_limit = '{}'" , self . flush_memory_limit) )
549+ . map_err ( |e| format ! ( "duckdb raise memory_limit: {}" , e) ) ?;
550+
551+ if self . sync_mode == "append" {
552+ self . flush_append ( target_key, applied_count, & flush_start) ?;
553+ } else {
554+ self . flush_upsert ( target_key, applied_count, & flush_start) ?;
555+ }
556+
557+ let memory_bytes = query_memory_usage ( & self . db ) ;
558+ let flush_duration_ms = flush_start. elapsed ( ) . as_millis ( ) as i64 ;
559+
560+ Ok ( DuckDbFlushResult {
561+ target_key : target_key. to_string ( ) ,
562+ mapping_id,
563+ applied_count,
564+ memory_bytes,
565+ flush_duration_ms,
566+ buffered_bytes,
567+ } )
568+ }
569+
570+ /// Append-mode flush: INSERT all buffered rows with metadata columns.
571+ /// No compaction, no DELETE — the target is an immutable changelog.
572+ ///
573+ /// Crash-recovery dedup: on the first flush after worker creation, queries
574+ /// `MAX(_duckpipe_lsn)` from the target and caches it in `append_dedup_lsn`.
575+ /// All subsequent flushes within this worker's lifetime add
576+ /// `WHERE _lsn > {dedup_lsn}` to skip already-committed rows.
577+ ///
578+ /// The WHERE filter is on the small in-memory buffer table (not the target),
579+ /// so the cost is negligible. Workers are dropped after each flush cycle and
580+ /// recreated with a fresh `append_dedup_lsn = 0`, so the MAX query runs at
581+ /// most once per flush cycle. In steady state (no crash), `MAX()` returns
582+ /// a value below all incoming LSNs, so the WHERE clause filters nothing.
583+ fn flush_append (
584+ & mut self ,
585+ target_key : & str ,
586+ applied_count : i64 ,
587+ flush_start : & Instant ,
588+ ) -> Result < ( ) , String > {
589+ let ( target_ref, source_literal, all_cols) = self . flush_refs ( ) ?;
590+
591+ // Query the target's high-water LSN once and cache it for this worker's
592+ // lifetime. Reused across multiple flushes if the worker survives
593+ // (e.g., drain_poll batching splits a large backlog into chunks).
594+ if self . append_dedup_lsn == 0 && self . may_have_conflicts {
595+ let sql = format ! (
596+ "SELECT COALESCE(MAX(\" _duckpipe_lsn\" ), 0) FROM {} \
597+ WHERE \" _duckpipe_source\" = {}",
598+ target_ref, source_literal
599+ ) ;
600+ let mut stmt = self
601+ . db
602+ . prepare ( & sql)
603+ . map_err ( |e| format ! ( "dedup max_lsn prepare: {}" , e) ) ?;
604+ self . append_dedup_lsn = stmt
605+ . query_row ( [ ] , |row| row. get ( 0 ) )
606+ . map_err ( |e| format ! ( "dedup max_lsn query: {}" , e) ) ?;
607+ if self . append_dedup_lsn > 0 {
608+ tracing:: info!(
609+ "DuckPipe: append dedup for {} — high-water _lsn = {} (crash recovery)" ,
610+ target_key,
611+ self . append_dedup_lsn
612+ ) ;
613+ }
614+ }
615+
616+ let where_clause = if self . append_dedup_lsn > 0 {
617+ format ! ( " WHERE _lsn > {}" , self . append_dedup_lsn)
618+ } else {
619+ String :: new ( )
620+ } ;
621+
622+ let t_phase = Instant :: now ( ) ;
623+ self . db
624+ . execute_batch ( "BEGIN" )
625+ . map_err ( |e| format ! ( "duckdb begin: {}" , e) ) ?;
626+ let t_begin_ms = t_phase. elapsed ( ) . as_secs_f64 ( ) * 1000.0 ;
627+
628+ let t_phase = Instant :: now ( ) ;
629+ let insert_sql = format ! (
630+ "INSERT INTO {target_ref} ({cols}, \" _duckpipe_source\" , \" _duckpipe_op\" , \" _duckpipe_lsn\" ) \
631+ SELECT {cols}, {source_literal}, \
632+ CASE _op_type WHEN 0 THEN 'I' WHEN 1 THEN 'U' WHEN 2 THEN 'D' ELSE 'I' END, \
633+ _lsn \
634+ FROM buffer{where_clause} ORDER BY _seq",
635+ target_ref = target_ref,
636+ cols = all_cols,
637+ source_literal = source_literal,
638+ where_clause = where_clause
639+ ) ;
640+ self . db . execute_batch ( & insert_sql) . map_err ( |e| {
641+ let _ = self . db . execute_batch ( "ROLLBACK" ) ;
642+ format ! ( "duckdb append insert into {}: {}" , target_key, e)
530643 } ) ?;
644+ let t_insert_ms = t_phase. elapsed ( ) . as_secs_f64 ( ) * 1000.0 ;
645+
646+ let t_phase = Instant :: now ( ) ;
647+ self . db
648+ . execute_batch ( "COMMIT" )
649+ . map_err ( |e| format ! ( "duckdb commit: {}" , e) ) ?;
650+ let t_commit_ms = t_phase. elapsed ( ) . as_secs_f64 ( ) * 1000.0 ;
651+
652+ let t_phase = Instant :: now ( ) ;
653+ self . db
654+ . execute_batch ( "DROP TABLE IF EXISTS buffer;" )
655+ . map_err ( |e| format ! ( "duckdb cleanup: {}" , e) ) ?;
656+ self . buffer_exists = false ;
657+ self . has_non_inserts = false ;
658+ let t_cleanup_ms = t_phase. elapsed ( ) . as_secs_f64 ( ) * 1000.0 ;
659+
660+ tracing:: info!(
661+ "DuckPipe timing: action=duckdb_flush_append target={} rows={} dedup_lsn={} \
662+ begin_ms={:.1} insert_ms={:.1} commit_ms={:.1} cleanup_ms={:.1} total_ms={:.1}",
663+ target_key,
664+ applied_count,
665+ self . append_dedup_lsn,
666+ t_begin_ms,
667+ t_insert_ms,
668+ t_commit_ms,
669+ t_cleanup_ms,
670+ flush_start. elapsed( ) . as_secs_f64( ) * 1000.0 ,
671+ ) ;
672+ Ok ( ( ) )
673+ }
674+
675+ /// Upsert-mode flush: compact by PK (dedup), DELETE matching rows, INSERT latest values.
676+ fn flush_upsert (
677+ & mut self ,
678+ target_key : & str ,
679+ applied_count : i64 ,
680+ flush_start : & Instant ,
681+ ) -> Result < ( ) , String > {
682+ let ( target_ref, source_literal, all_cols) = self . flush_refs ( ) ?;
531683 let pk_cols = self
532684 . cached_pk_cols
533685 . as_ref ( )
534686 . ok_or ( "pk_cols not cached — ensure_buffer should have been called" ) ?;
535- let all_cols = self
536- . cached_all_cols
537- . as_ref ( )
538- . ok_or ( "all_cols not cached — ensure_buffer should have been called" ) ?;
539687 let has_non_inserts = self . has_non_inserts ;
540688
541- // Raise memory limit for flush phase (compaction + DuckLake writes)
542- self . db
543- . execute_batch ( & format ! ( "SET memory_limit = '{}'" , self . flush_memory_limit) )
544- . map_err ( |e| format ! ( "duckdb raise memory_limit: {}" , e) ) ?;
545-
546689 // Step 1: Compact — deduplicate by PK, keep last operation (highest seq).
547690 let t_phase = Instant :: now ( ) ;
548691 let compact_sql = format ! (
@@ -558,28 +701,19 @@ impl FlushWorker {
558701 . map_err ( |e| format ! ( "duckdb compact: {}" , e) ) ?;
559702 let t_compact_ms = t_phase. elapsed ( ) . as_secs_f64 ( ) * 1000.0 ;
560703
561- // Step 2: Apply changes to DuckLake target
562- let target_ref = format ! (
563- "lake.\" {}\" .\" {}\" " ,
564- lake_info. lake_schema. replace( '"' , "\" \" " ) ,
565- target_table. replace( '"' , "\" \" " )
566- ) ;
567-
568- // Wrap DELETE+INSERT in a transaction for atomicity.
704+ // Step 2: DELETE+INSERT in a transaction for atomicity.
569705 let t_phase = Instant :: now ( ) ;
570706 self . db
571707 . execute_batch ( "BEGIN" )
572708 . map_err ( |e| format ! ( "duckdb begin: {}" , e) ) ?;
573709 let t_begin_ms = t_phase. elapsed ( ) . as_secs_f64 ( ) * 1000.0 ;
574710
575- // Step 2b: DELETE
576711 let skip_delete = !has_non_inserts && !self . may_have_conflicts ;
577712
578713 let pk_where: Vec < String > = pk_cols
579714 . iter ( )
580715 . map ( |c| format ! ( "{target_ref}.{c} = compacted.{c}" ) )
581716 . collect ( ) ;
582- // Scope DELETE to only rows from this source (fan-in safe).
583717 let source_scope = format ! (
584718 " AND {target_ref}.\" _duckpipe_source\" = '{}'" ,
585719 self . source_label. replace( '\'' , "''" )
@@ -608,14 +742,13 @@ impl FlushWorker {
608742 self . may_have_conflicts = false ;
609743 }
610744
611- // Step 2c : INSERT — inject _duckpipe_source as a literal (not stored in buffer)
745+ // Step 3 : INSERT non-delete rows with _duckpipe_source
612746 let t_phase = Instant :: now ( ) ;
613- let source_literal = format ! ( "'{}'" , self . source_label. replace( '\'' , "''" ) ) ;
614747 let insert_sql = format ! (
615748 "INSERT INTO {target_ref} ({cols}, \" _duckpipe_source\" ) \
616749 SELECT {cols}, {source_literal} FROM compacted WHERE _op_type IN (0, 1)",
617750 target_ref = target_ref,
618- cols = all_cols. join ( ", " ) ,
751+ cols = all_cols,
619752 source_literal = source_literal
620753 ) ;
621754 self . db . execute_batch ( & insert_sql) . map_err ( |e| {
@@ -630,7 +763,6 @@ impl FlushWorker {
630763 . map_err ( |e| format ! ( "duckdb commit: {}" , e) ) ?;
631764 let t_commit_ms = t_phase. elapsed ( ) . as_secs_f64 ( ) * 1000.0 ;
632765
633- // Cleanup
634766 let t_phase = Instant :: now ( ) ;
635767 self . db
636768 . execute_batch ( "DROP TABLE IF EXISTS compacted; DROP TABLE IF EXISTS buffer;" )
@@ -639,9 +771,6 @@ impl FlushWorker {
639771 self . has_non_inserts = false ;
640772 let t_cleanup_ms = t_phase. elapsed ( ) . as_secs_f64 ( ) * 1000.0 ;
641773
642- // No need to restore buffer-phase memory limit — the worker is dropped
643- // after each flush cycle and recreated with buffer_memory_limit on next use.
644-
645774 tracing:: debug!(
646775 "DuckPipe perf: action=duckdb_flush target={} rows={} \
647776 compact_ms={:.1} begin_ms={:.1} delete_ms={:.1} insert_ms={:.1} \
@@ -667,18 +796,33 @@ impl FlushWorker {
667796 self . may_have_conflicts,
668797 flush_start. elapsed( ) . as_secs_f64( ) * 1000.0 ,
669798 ) ;
799+ Ok ( ( ) )
800+ }
670801
671- let memory_bytes = query_memory_usage ( & self . db ) ;
672- let flush_duration_ms = flush_start. elapsed ( ) . as_millis ( ) as i64 ;
802+ /// Extract shared references needed by both flush paths:
803+ /// `(target_ref, source_literal, all_cols_joined)`.
804+ fn flush_refs ( & self ) -> Result < ( String , String , String ) , String > {
805+ let target_table = self
806+ . target_table
807+ . as_ref ( )
808+ . ok_or ( "target_table not cached — ensure_buffer should have been called" ) ?;
809+ let lake_info = self . lake_info . as_ref ( ) . ok_or_else ( || {
810+ "lake_info not available — ensure_buffer should have been called" . to_string ( )
811+ } ) ?;
812+ let all_cols = self
813+ . cached_all_cols
814+ . as_ref ( )
815+ . ok_or ( "all_cols not cached — ensure_buffer should have been called" ) ?;
673816
674- Ok ( DuckDbFlushResult {
675- target_key : target_key. to_string ( ) ,
676- mapping_id,
677- applied_count,
678- memory_bytes,
679- flush_duration_ms,
680- buffered_bytes,
681- } )
817+ let target_ref = format ! (
818+ "lake.\" {}\" .\" {}\" " ,
819+ lake_info. lake_schema. replace( '"' , "\" \" " ) ,
820+ target_table. replace( '"' , "\" \" " )
821+ ) ;
822+ let source_literal = format ! ( "'{}'" , self . source_label. replace( '\'' , "''" ) ) ;
823+ let all_cols_joined = all_cols. join ( ", " ) ;
824+
825+ Ok ( ( target_ref, source_literal, all_cols_joined) )
682826 }
683827
684828 /// Drop the buffer table if it exists (used on shutdown/error).
0 commit comments