Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 9 additions & 12 deletions duckpipe-core/src/duckdb_flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ use std::time::Instant;
use duckdb::{Config, Connection};

use crate::types::{
fixed_bytes_for_oid, is_duckpipe_system_column, Change, ChangeType, ResolvedConfig, Value,
fixed_bytes_for_oid, is_duckpipe_system_column, Change, ChangeType, ResolvedConfig, SyncMode,
Value,
};

const DUCKLAKE_EXT_FILENAME: &str = "ducklake.duckdb_extension";
Expand Down Expand Up @@ -296,8 +297,8 @@ pub struct FlushWorker {
/// Injected as a SQL literal into INSERT statements and scopes
/// DELETE operations to this label for fan-in isolation.
source_label: String,
/// Sync mode: "upsert" (default) or "append" (immutable changelog).
sync_mode: String,
/// Sync mode: upsert (default) or append (immutable changelog).
sync_mode: SyncMode,
/// Cached high-water `_duckpipe_lsn` from the target table (append mode only).
/// Queried once on the first flush after restart, then reused across subsequent
/// flushes until all replayed WAL has been consumed. Zero means no dedup needed.
Expand All @@ -319,7 +320,7 @@ impl FlushWorker {
ducklake_schema: &str,
resolved_config: &ResolvedConfig,
source_label: String,
sync_mode: String,
sync_mode: SyncMode,
temp_dir: std::path::PathBuf,
) -> Result<Self, String> {
let db = open_ducklake_connection(pg_connstr, ducklake_schema)?;
Expand Down Expand Up @@ -423,7 +424,7 @@ impl FlushWorker {
)
}))
// Append mode: store per-change LSN for _duckpipe_lsn metadata
.chain((self.sync_mode == "append").then(|| "_lsn BIGINT".to_string()))
.chain((matches!(self.sync_mode, SyncMode::Append)).then(|| "_lsn BIGINT".to_string()))
.collect();

let create_buf = format!("CREATE TABLE buffer ({})", buf_cols.join(", "));
Expand Down Expand Up @@ -478,7 +479,7 @@ impl FlushWorker {
let mut seq = seq_start;
let fixed_row_bytes = self.cached_fixed_row_bytes;
let mut var_total: usize = 0;
let is_append = self.sync_mode == "append";
let is_append = matches!(self.sync_mode, SyncMode::Append);

{
let mut appender = self
Expand All @@ -488,11 +489,7 @@ impl FlushWorker {

for change in changes {
seq += 1;
let op_type: i32 = match change.change_type {
ChangeType::Insert => 0,
ChangeType::Update => 1,
ChangeType::Delete => 2,
};
let op_type = change.change_type.as_i32();

// +2 = _seq, _op_type; +1 more for _lsn in append mode
let extra = if is_append { 3 } else { 2 };
Expand Down Expand Up @@ -570,7 +567,7 @@ impl FlushWorker {
.execute_batch(&format!("SET memory_limit = '{}'", self.flush_memory_limit))
.map_err(|e| format!("duckdb raise memory_limit: {}", e))?;

if self.sync_mode == "append" {
if matches!(self.sync_mode, SyncMode::Append) {
self.flush_append(target_key, applied_count, &flush_start)?;
} else {
self.flush_upsert(target_key, applied_count, &flush_start)?;
Expand Down
8 changes: 4 additions & 4 deletions duckpipe-core/src/flush_coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::time::{Duration, Instant};
use crate::duckdb_flush::FlushWorker;
use crate::flush_worker;
use crate::metadata::ERRORED_THRESHOLD;
use crate::types::{Change, ResolvedConfig, TableConfig};
use crate::types::{Change, ResolvedConfig, SyncMode, TableConfig};

/// In-memory group-level metrics from FlushCoordinator.
#[derive(Clone, Copy, Default)]
Expand Down Expand Up @@ -52,7 +52,7 @@ pub struct QueueMeta {
pub key_attrs: Vec<usize>,
pub atttypes: Vec<u32>,
pub source_label: String,
pub sync_mode: String,
pub sync_mode: SyncMode,
}

/// Barrier command: DDL (ADD/DROP/RENAME COLUMN) or TRUNCATE (DELETE by source).
Expand Down Expand Up @@ -450,7 +450,7 @@ impl FlushCoordinator {
atttypes: Vec<u32>,
paused: bool,
source_label: String,
sync_mode: String,
sync_mode: SyncMode,
table_config: &TableConfig,
) {
// Seed in-memory LSN from the persistent PG value if we haven't seen this table yet.
Expand Down Expand Up @@ -1102,7 +1102,7 @@ fn flush_thread_main(
ducklake_schema,
&resolved_config,
meta.source_label.clone(),
meta.sync_mode.clone(),
meta.sync_mode,
temp_dir.clone(),
) {
Ok(w) => worker = Some(w),
Expand Down
30 changes: 19 additions & 11 deletions duckpipe-core/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,25 @@ use std::collections::HashMap;

use tokio_postgres::{Client, Row};

use crate::state::SyncState;
use crate::types::{
format_lsn, parse_lsn, GroupConfig, GroupMode, SyncGroup, TableConfig, TableMapping,
format_lsn, parse_lsn, GroupConfig, GroupMode, SyncGroup, SyncMode, TableConfig, TableMapping,
};

/// Parse a `tokio_postgres::Row` from the standard 14-column TableMapping SELECT into a `TableMapping`.
/// Column order: id, source_schema, source_table, target_schema, target_table,
/// state, snapshot_lsn::text, enabled, source_oid, error_message,
/// applied_lsn::text, source_label, sync_mode, config::text
fn table_mapping_from_row(row: &Row) -> TableMapping {
let state_str: String = row.get(5);
let sync_mode_str: Option<String> = row.get(13);
TableMapping {
id: row.get(0),
source_schema: row.get(1),
source_table: row.get(2),
target_schema: row.get(3),
target_table: row.get(4),
state: row.get(5),
state: SyncState::from_str(&state_str).unwrap_or(SyncState::Pending),
snapshot_lsn: row
.get::<_, Option<String>>(6)
.map(|s| parse_lsn(&s))
Expand All @@ -33,9 +36,9 @@ fn table_mapping_from_row(row: &Row) -> TableMapping {
.unwrap_or(0),
target_oid: row.get::<_, Option<i64>>(11).unwrap_or(0),
source_label: row.get::<_, Option<String>>(12).unwrap_or_default(),
sync_mode: row
.get::<_, Option<String>>(13)
.unwrap_or_else(|| "upsert".to_string()),
sync_mode: sync_mode_str
.and_then(|s| s.parse().ok())
.unwrap_or(SyncMode::Upsert),
config: row
.get::<_, Option<String>>(14)
.and_then(|s| TableConfig::from_json_str(&s).ok())
Expand Down Expand Up @@ -328,7 +331,7 @@ impl<'a> MetadataClient<'a> {
pub async fn retry_errored_table(
&self,
mapping_id: i32,
) -> Result<Option<String>, tokio_postgres::Error> {
) -> Result<Option<SyncState>, tokio_postgres::Error> {
let rows = self
.client
.query(
Expand All @@ -340,7 +343,10 @@ impl<'a> MetadataClient<'a> {
&[&mapping_id],
)
.await?;
Ok(rows.first().map(|r| r.get::<_, String>(0)))
Ok(rows.first().map(|r| {
let s: String = r.get(0);
SyncState::from_str(&s).unwrap_or(SyncState::Pending)
}))
}

/// Update source schema and table name (e.g., after a table rename detected via OID match).
Expand Down Expand Up @@ -606,7 +612,8 @@ impl<'a> MetadataClient<'a> {
source_label: row.get::<_, Option<String>>(5).unwrap_or_default(),
sync_mode: row
.get::<_, Option<String>>(6)
.unwrap_or_else(|| "upsert".to_string()),
.and_then(|s| s.parse().ok())
.unwrap_or(SyncMode::Upsert),
})
.collect())
}
Expand Down Expand Up @@ -644,7 +651,7 @@ impl<'a> MetadataClient<'a> {
pub async fn get_mapping_enabled_states(
&self,
ids: &[i32],
) -> Result<HashMap<i32, (bool, String)>, tokio_postgres::Error> {
) -> Result<HashMap<i32, (bool, SyncState)>, tokio_postgres::Error> {
let rows = self
.client
.query(
Expand All @@ -659,7 +666,8 @@ impl<'a> MetadataClient<'a> {
.map(|row| {
let id: i32 = row.get(0);
let enabled: bool = row.get(1);
let state: String = row.get(2);
let state_str: String = row.get(2);
let state = SyncState::from_str(&state_str).unwrap_or(SyncState::Pending);
(id, (enabled, state))
})
.collect())
Expand Down Expand Up @@ -706,7 +714,7 @@ pub struct SnapshotTask {
pub target_schema: String,
pub target_table: String,
pub source_label: String,
pub sync_mode: String,
pub sync_mode: SyncMode,
}

/// Load column names and primary key attribute indices from any PG connection.
Expand Down
44 changes: 24 additions & 20 deletions duckpipe-core/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ use crate::flush_coordinator::{
use crate::metadata::{compute_backoff_secs, MetadataClient, ERRORED_THRESHOLD};
use crate::slot_consumer::SlotConsumer;
use crate::snapshot_manager::SnapshotManager;
use crate::types::{Change, ChangeType, RelCacheEntry, SyncGroup, TableMapping, Value};
use crate::state::SyncState;
use crate::types::{Change, ChangeType, RelCacheEntry, SyncGroup, SyncMode, TableMapping, Value};

/// Configuration for the DuckPipe service.
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -142,7 +143,7 @@ async fn ensure_coordinator_queue(
final_attrs
};

let paused = mapping.state == "SNAPSHOT";
let paused = matches!(mapping.state, SyncState::Snapshot);
coordinator.ensure_queue(
target_key,
mapping.id,
Expand All @@ -152,7 +153,7 @@ async fn ensure_coordinator_queue(
atttypes.clone(),
paused,
mapping.source_label.clone(),
mapping.sync_mode.clone(),
mapping.sync_mode,
&mapping.config,
);
Ok(())
Expand All @@ -162,15 +163,18 @@ async fn ensure_coordinator_queue(
/// not yet caught up (CATCHUP with lsn <= snapshot_lsn), or already applied
/// in append mode (lsn <= applied_lsn).
fn should_skip_change(mapping: &TableMapping, lsn: u64) -> bool {
if !mapping.enabled || mapping.state == "ERRORED" {
if !mapping.enabled || matches!(mapping.state, SyncState::Errored) {
return true;
}
if mapping.state == "CATCHUP" && mapping.snapshot_lsn != 0 && lsn <= mapping.snapshot_lsn {
if matches!(mapping.state, SyncState::Catchup)
&& mapping.snapshot_lsn != 0
&& lsn <= mapping.snapshot_lsn
{
return true;
}
// Append-mode exactly-once: skip already-applied WAL changes
if mapping.sync_mode == "append"
&& mapping.state == "STREAMING"
if matches!(mapping.sync_mode, SyncMode::Append)
&& matches!(mapping.state, SyncState::Streaming)
&& mapping.applied_lsn != 0
&& lsn <= mapping.applied_lsn
{
Expand Down Expand Up @@ -353,7 +357,7 @@ async fn process_one_wal_message(
// Detect schema changes by comparing with the cached RELATION entry.
if let Some(old_cached) = rel_cache.get(&rel_id) {
if let Some(ref mapping) = old_cached.mapping {
if mapping.enabled && mapping.state != "ERRORED" {
if mapping.enabled && !matches!(mapping.state, SyncState::Errored) {
let catalog_client = source_client.unwrap_or(meta.client());

// Detect column-level schema changes
Expand Down Expand Up @@ -384,7 +388,7 @@ async fn process_one_wal_message(
key_attrs: new_key_attrs,
atttypes: new_entry.atttypes.clone(),
source_label: mapping.source_label.clone(),
sync_mode: mapping.sync_mode.clone(),
sync_mode: mapping.sync_mode,
};

coordinator.set_pending_ddl(
Expand Down Expand Up @@ -538,7 +542,7 @@ async fn process_one_wal_message(
extract_key_values(&new_values, pk_attrs)
};

if mapping.sync_mode == "append" {
if matches!(mapping.sync_mode, SyncMode::Append) {
// Append mode: push a single Update change (preserves raw op type)
let final_values = if has_unchanged && old_marker == 'O' {
fill_toast_columns(&new_values, &old_values, &new_unchanged)
Expand Down Expand Up @@ -696,8 +700,8 @@ async fn process_one_wal_message(
if let Some(cached) = rel_cache.get(&rel_id) {
let mapping = resolve_mapping(meta, group.id, rel_id, &cached.entry).await?;
if let Some(mapping) = mapping {
if mapping.enabled && mapping.state != "ERRORED" {
if mapping.sync_mode == "append" {
if mapping.enabled && !matches!(mapping.state, SyncState::Errored) {
if matches!(mapping.sync_mode, SyncMode::Append) {
// Append mode: skip DELETE, log that TRUNCATE was received.
// The changelog is immutable — TRUNCATE is not propagated.
tracing::info!(
Expand Down Expand Up @@ -794,18 +798,18 @@ async fn run_heartbeat(
}
Ok(Some(new_state)) => {
tracing::info!(
"pg_duckpipe: auto-retrying errored table {}.{} {}",
"pg_duckpipe: auto-retrying errored table {}.{} -> {}",
table.source_schema,
table.source_table,
new_state
);
if new_state == "STREAMING" {
// Mirror ERRORED STREAMING in the rel_cache so routing resumes
if matches!(new_state, SyncState::Streaming) {
// Mirror ERRORED -> STREAMING in the rel_cache so routing resumes
// immediately without waiting for the next periodic refresh.
for cached in rel_cache.values_mut() {
if let Some(ref mut mapping) = cached.mapping {
if mapping.id == table.id {
mapping.state = "STREAMING".to_string();
mapping.state = SyncState::Streaming;
break;
}
}
Expand All @@ -825,11 +829,11 @@ async fn run_heartbeat(
// Mirror the transition immediately in rel_cache.
for cached in rel_cache.values_mut() {
if let Some(ref mut mapping) = cached.mapping {
if mapping.state == "CATCHUP"
if matches!(mapping.state, SyncState::Catchup)
&& mapping.snapshot_lsn != 0
&& mapping.snapshot_lsn <= group.pending_lsn
{
mapping.state = "STREAMING".to_string();
mapping.state = SyncState::Streaming;
}
}
}
Expand Down Expand Up @@ -1345,9 +1349,9 @@ pub async fn run_group_sync_cycle(
Ok(meta_map) => {
for cached in slot_state.rel_cache.values_mut() {
if let Some(ref mut mapping) = cached.mapping {
if let Some((enabled, state_str)) = meta_map.get(&mapping.id) {
if let Some((enabled, state)) = meta_map.get(&mapping.id) {
mapping.enabled = *enabled;
mapping.state = state_str.clone();
mapping.state = *state;
}
}
}
Expand Down
12 changes: 7 additions & 5 deletions duckpipe-core/src/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ use std::time::Instant;

use duckdb::Connection;

use crate::types::SyncMode;

/// Guard that DETACHes the DuckLake database on drop.
struct DetachOnDrop(Connection);

Expand Down Expand Up @@ -77,7 +79,7 @@ pub async fn process_snapshot_task(
task_id: i32,
group_name: &str,
source_label: String,
sync_mode: String,
sync_mode: SyncMode,
) -> Result<(u64, u64, u64), String> {
let table_start = Instant::now();

Expand Down Expand Up @@ -165,7 +167,7 @@ pub async fn process_snapshot_task(
let consumer_target_table = target_table.to_string();
let consumer_timing = timing;
let consumer_source_label = source_label.clone();
let consumer_sync_mode = sync_mode.clone();
let consumer_sync_mode = sync_mode;

let consumer_handle = tokio::task::spawn_blocking(move || {
run_duckdb_consumer(
Expand All @@ -176,7 +178,7 @@ pub async fn process_snapshot_task(
&consumer_target_table,
consumer_timing,
&consumer_source_label,
&consumer_sync_mode,
consumer_sync_mode,
)
});

Expand Down Expand Up @@ -383,7 +385,7 @@ fn run_duckdb_consumer(
target_table: &str,
timing: bool,
source_label: &str,
sync_mode: &str,
sync_mode: SyncMode,
) -> Result<u64, String> {
let t_start = if timing { Some(Instant::now()) } else { None };

Expand Down Expand Up @@ -500,7 +502,7 @@ fn run_duckdb_consumer(

// Build INSERT: source columns from CSV + system column literals
let source_value = format!("'{}'", source_label.replace('\'', "''"));
let insert_sql = if sync_mode == "append" {
let insert_sql = if matches!(sync_mode, SyncMode::Append) {
// Append mode: inject _duckpipe_op='I' and _duckpipe_lsn=0 for snapshot rows
format!(
"INSERT INTO {} ({}) SELECT {}, {}, 'I', 0 FROM read_csv('{}', header=true, nullstr='__DUCKPIPE_NULL__')",
Expand Down
Loading
Loading